Skip to content

Commit 27b5f31

Browse files
Davies Liu authored and rxin committed
[SPARK-11836][SQL] use existing SQLContext for udf/cast (1.5 branch)
udf/cast should use existing SQLContext. Author: Davies Liu <[email protected]> Closes #9915 from davies/create_1.5.
1 parent e9ae1fd commit 27b5f31

File tree

3 files changed

+9
-5
lines changed

3 files changed

+9
-5
lines changed

python/pyspark/sql/column.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,9 @@ def cast(self, dataType):
318318
if isinstance(dataType, basestring):
319319
jc = self._jc.cast(dataType)
320320
elif isinstance(dataType, DataType):
321-
sc = SparkContext._active_spark_context
322-
ssql_ctx = sc._jvm.SQLContext(sc._jsc.sc())
323-
jdt = ssql_ctx.parseDataType(dataType.json())
321+
from pyspark.sql import SQLContext
322+
ctx = SQLContext._instantiatedContext
323+
jdt = ctx._ssql_ctx.parseDataType(dataType.json())
324324
jc = self._jc.cast(jdt)
325325
else:
326326
raise TypeError("unexpected type: %s" % type(dataType))

python/pyspark/sql/context.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ class SQLContext(object):
7575
SQLContext in the JVM, instead we make all calls to this object.
7676
"""
7777

78+
_instantiatedContext = None
79+
7880
@ignore_unicode_prefix
7981
def __init__(self, sparkContext, sqlContext=None):
8082
"""Creates a new SQLContext.
@@ -99,6 +101,7 @@ def __init__(self, sparkContext, sqlContext=None):
99101
self._scala_SQLContext = sqlContext
100102
_monkey_patch_RDD(self)
101103
install_exception_handler()
104+
SQLContext._instantiatedContext = self
102105

103106
@property
104107
def _ssql_ctx(self):

python/pyspark/sql/functions.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1422,14 +1422,15 @@ def __init__(self, func, returnType, name=None):
14221422
self._judf = self._create_judf(name)
14231423

14241424
def _create_judf(self, name):
1425+
from pyspark.sql import SQLContext
14251426
f, returnType = self.func, self.returnType # put them in closure `func`
14261427
func = lambda _, it: map(lambda x: returnType.toInternal(f(*x)), it)
14271428
ser = AutoBatchedSerializer(PickleSerializer())
14281429
command = (func, None, ser, ser)
14291430
sc = SparkContext._active_spark_context
14301431
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
1431-
ssql_ctx = sc._jvm.SQLContext(sc._jsc.sc())
1432-
jdt = ssql_ctx.parseDataType(self.returnType.json())
1432+
ctx = SQLContext._instantiatedContext
1433+
jdt = ctx._ssql_ctx.parseDataType(self.returnType.json())
14331434
if name is None:
14341435
name = f.__name__ if hasattr(f, '__name__') else f.__class__.__name__
14351436
judf = sc._jvm.UserDefinedPythonFunction(name, bytearray(pickled_command), env, includes,

0 commit comments

Comments (0)