Skip to content

Commit 84be4c8

Browse files
HyukjinKwonholdenk
authored andcommitted
[SPARK-19019][PYTHON][BRANCH-2.0] Fix hijacked collections.namedtuple and port cloudpickle changes for PySpark to work with Python 3.6.0
## What changes were proposed in this pull request? This PR proposes to backports #16429 to branch-2.0 so that Python 3.6.0 works with Spark 2.0.x. ## How was this patch tested? Manually, via ``` ./run-tests --python-executables=python3.6 ``` ``` Finished test(python3.6): pyspark.tests (124s) Finished test(python3.6): pyspark.accumulators (4s) Finished test(python3.6): pyspark.broadcast (4s) Finished test(python3.6): pyspark.conf (3s) Finished test(python3.6): pyspark.context (15s) Finished test(python3.6): pyspark.ml.classification (24s) Finished test(python3.6): pyspark.sql.tests (190s) Finished test(python3.6): pyspark.mllib.tests (190s) Finished test(python3.6): pyspark.ml.clustering (14s) Finished test(python3.6): pyspark.ml.linalg.__init__ (0s) Finished test(python3.6): pyspark.ml.recommendation (18s) Finished test(python3.6): pyspark.ml.feature (28s) Finished test(python3.6): pyspark.ml.evaluation (28s) Finished test(python3.6): pyspark.ml.regression (21s) Finished test(python3.6): pyspark.ml.tuning (17s) Finished test(python3.6): pyspark.streaming.tests (239s) Finished test(python3.6): pyspark.mllib.evaluation (15s) Finished test(python3.6): pyspark.mllib.classification (24s) Finished test(python3.6): pyspark.mllib.clustering (37s) Finished test(python3.6): pyspark.mllib.linalg.__init__ (0s) Finished test(python3.6): pyspark.mllib.fpm (19s) Finished test(python3.6): pyspark.mllib.feature (19s) Finished test(python3.6): pyspark.mllib.random (8s) Finished test(python3.6): pyspark.ml.tests (76s) Finished test(python3.6): pyspark.mllib.stat.KernelDensity (0s) Finished test(python3.6): pyspark.mllib.recommendation (21s) Finished test(python3.6): pyspark.mllib.linalg.distributed (27s) Finished test(python3.6): pyspark.mllib.regression (22s) Finished test(python3.6): pyspark.mllib.stat._statistics (11s) Finished test(python3.6): pyspark.mllib.tree (16s) Finished test(python3.6): pyspark.profiler (8s) Finished test(python3.6): pyspark.shuffle (1s) Finished test(python3.6): pyspark.mllib.util (17s) Finished test(python3.6): pyspark.serializers (12s) Finished test(python3.6): pyspark.rdd (18s) Finished test(python3.6): pyspark.sql.conf (4s) Finished test(python3.6): pyspark.sql.catalog (14s) Finished test(python3.6): pyspark.sql.column (13s) Finished test(python3.6): pyspark.sql.context (15s) Finished test(python3.6): pyspark.sql.group (26s) Finished test(python3.6): pyspark.sql.dataframe (31s) Finished test(python3.6): pyspark.sql.functions (32s) Finished test(python3.6): pyspark.sql.types (5s) Finished test(python3.6): pyspark.sql.streaming (11s) Finished test(python3.6): pyspark.sql.window (5s) Finished test(python3.6): pyspark.streaming.util (0s) Finished test(python3.6): pyspark.sql.session (15s) Finished test(python3.6): pyspark.sql.readwriter (34s) Tests passed in 376 seconds ``` Author: hyukjinkwon <[email protected]> Closes #17374 from HyukjinKwon/SPARK-19019-backport.
1 parent 24f6ef2 commit 84be4c8

File tree

2 files changed

+87
-31
lines changed

2 files changed

+87
-31
lines changed

python/pyspark/cloudpickle.py

Lines changed: 67 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from __future__ import print_function
4444

4545
import operator
46+
import opcode
4647
import os
4748
import io
4849
import pickle
@@ -53,6 +54,8 @@
5354
import itertools
5455
import dis
5556
import traceback
57+
import weakref
58+
5659

5760
if sys.version < '3':
5861
from pickle import Pickler
@@ -68,10 +71,10 @@
6871
PY3 = True
6972

7073
#relevant opcodes
71-
STORE_GLOBAL = dis.opname.index('STORE_GLOBAL')
72-
DELETE_GLOBAL = dis.opname.index('DELETE_GLOBAL')
73-
LOAD_GLOBAL = dis.opname.index('LOAD_GLOBAL')
74-
GLOBAL_OPS = [STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL]
74+
STORE_GLOBAL = opcode.opmap['STORE_GLOBAL']
75+
DELETE_GLOBAL = opcode.opmap['DELETE_GLOBAL']
76+
LOAD_GLOBAL = opcode.opmap['LOAD_GLOBAL']
77+
GLOBAL_OPS = (STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL)
7578
HAVE_ARGUMENT = dis.HAVE_ARGUMENT
7679
EXTENDED_ARG = dis.EXTENDED_ARG
7780

@@ -90,6 +93,43 @@ def _builtin_type(name):
9093
return getattr(types, name)
9194

9295

96+
if sys.version_info < (3, 4):
97+
def _walk_global_ops(code):
98+
"""
99+
Yield (opcode, argument number) tuples for all
100+
global-referencing instructions in *code*.
101+
"""
102+
code = getattr(code, 'co_code', b'')
103+
if not PY3:
104+
code = map(ord, code)
105+
106+
n = len(code)
107+
i = 0
108+
extended_arg = 0
109+
while i < n:
110+
op = code[i]
111+
i += 1
112+
if op >= HAVE_ARGUMENT:
113+
oparg = code[i] + code[i + 1] * 256 + extended_arg
114+
extended_arg = 0
115+
i += 2
116+
if op == EXTENDED_ARG:
117+
extended_arg = oparg * 65536
118+
if op in GLOBAL_OPS:
119+
yield op, oparg
120+
121+
else:
122+
def _walk_global_ops(code):
123+
"""
124+
Yield (opcode, argument number) tuples for all
125+
global-referencing instructions in *code*.
126+
"""
127+
for instr in dis.get_instructions(code):
128+
op = instr.opcode
129+
if op in GLOBAL_OPS:
130+
yield op, instr.arg
131+
132+
93133
class CloudPickler(Pickler):
94134

95135
dispatch = Pickler.dispatch.copy()
@@ -250,38 +290,34 @@ def save_function_tuple(self, func):
250290
write(pickle.TUPLE)
251291
write(pickle.REDUCE) # applies _fill_function on the tuple
252292

253-
@staticmethod
254-
def extract_code_globals(co):
293+
_extract_code_globals_cache = (
294+
weakref.WeakKeyDictionary()
295+
if sys.version_info >= (2, 7) and not hasattr(sys, "pypy_version_info")
296+
else {})
297+
298+
@classmethod
299+
def extract_code_globals(cls, co):
255300
"""
256301
Find all globals names read or written to by codeblock co
257302
"""
258-
code = co.co_code
259-
if not PY3:
260-
code = [ord(c) for c in code]
261-
names = co.co_names
262-
out_names = set()
263-
264-
n = len(code)
265-
i = 0
266-
extended_arg = 0
267-
while i < n:
268-
op = code[i]
303+
out_names = cls._extract_code_globals_cache.get(co)
304+
if out_names is None:
305+
try:
306+
names = co.co_names
307+
except AttributeError:
308+
# PyPy "builtin-code" object
309+
out_names = set()
310+
else:
311+
out_names = set(names[oparg]
312+
for op, oparg in _walk_global_ops(co))
269313

270-
i += 1
271-
if op >= HAVE_ARGUMENT:
272-
oparg = code[i] + code[i+1] * 256 + extended_arg
273-
extended_arg = 0
274-
i += 2
275-
if op == EXTENDED_ARG:
276-
extended_arg = oparg*65536
277-
if op in GLOBAL_OPS:
278-
out_names.add(names[oparg])
314+
# see if nested function have any global refs
315+
if co.co_consts:
316+
for const in co.co_consts:
317+
if type(const) is types.CodeType:
318+
out_names |= cls.extract_code_globals(const)
279319

280-
# see if nested function have any global refs
281-
if co.co_consts:
282-
for const in co.co_consts:
283-
if type(const) is types.CodeType:
284-
out_names |= CloudPickler.extract_code_globals(const)
320+
cls._extract_code_globals_cache[co] = out_names
285321

286322
return out_names
287323

python/pyspark/serializers.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,18 +370,38 @@ def _hijack_namedtuple():
370370
return
371371

372372
global _old_namedtuple # or it will put in closure
373+
global _old_namedtuple_kwdefaults # or it will put in closure too
373374

374375
def _copy_func(f):
375376
return types.FunctionType(f.__code__, f.__globals__, f.__name__,
376377
f.__defaults__, f.__closure__)
377378

379+
def _kwdefaults(f):
380+
# __kwdefaults__ contains the default values of keyword-only arguments which are
381+
# introduced from Python 3. The possible cases for __kwdefaults__ in namedtuple
382+
# are as below:
383+
#
384+
# - Does not exist in Python 2.
385+
# - Returns None in <= Python 3.5.x.
386+
# - Returns a dictionary containing the default values to the keys from Python 3.6.x
387+
# (See https://bugs.python.org/issue25628).
388+
kargs = getattr(f, "__kwdefaults__", None)
389+
if kargs is None:
390+
return {}
391+
else:
392+
return kargs
393+
378394
_old_namedtuple = _copy_func(collections.namedtuple)
395+
_old_namedtuple_kwdefaults = _kwdefaults(collections.namedtuple)
379396

380397
def namedtuple(*args, **kwargs):
398+
for k, v in _old_namedtuple_kwdefaults.items():
399+
kwargs[k] = kwargs.get(k, v)
381400
cls = _old_namedtuple(*args, **kwargs)
382401
return _hack_namedtuple(cls)
383402

384403
# replace namedtuple with new one
404+
collections.namedtuple.__globals__["_old_namedtuple_kwdefaults"] = _old_namedtuple_kwdefaults
385405
collections.namedtuple.__globals__["_old_namedtuple"] = _old_namedtuple
386406
collections.namedtuple.__globals__["_hack_namedtuple"] = _hack_namedtuple
387407
collections.namedtuple.__code__ = namedtuple.__code__

0 commit comments

Comments
 (0)