|
28 | 28 | import socket |
29 | 29 | from subprocess import Popen, PIPE |
30 | 30 | from tempfile import NamedTemporaryFile |
31 | | -from threading import Thread |
| 31 | +from threading import Thread, Lock |
32 | 32 | from collections import defaultdict |
33 | 33 | from itertools import chain |
34 | 34 | from functools import reduce |
|
55 | 55 |
|
56 | 56 | __all__ = ["RDD"] |
57 | 57 |
|
| 58 | +# Lock which will make sure that dependent broadcast variables are pickled along |
| 59 | +# with their PythonRDD wrapped function when using multiple threads (SPARK-12717). |
| 60 | +_lock = Lock() |
58 | 61 |
|
59 | 62 | def portable_hash(x): |
60 | 63 | """ |
@@ -2451,10 +2454,12 @@ def _jrdd(self): |
2451 | 2454 | else: |
2452 | 2455 | profiler = None |
2453 | 2456 |
|
2454 | | - wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer, |
2455 | | - self._jrdd_deserializer, profiler) |
2456 | | - python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func, |
2457 | | - self.preservesPartitioning) |
| 2457 | + with _lock: |
| 2458 | + wrapped_func = _wrap_function(self.ctx, self.func, |
| 2459 | + self._prev_jrdd_deserializer, self._jrdd_deserializer, profiler) |
| 2460 | + python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func, |
| 2461 | + self.preservesPartitioning) |
| 2462 | + |
2458 | 2463 | self._jrdd_val = python_rdd.asJavaRDD() |
2459 | 2464 |
|
2460 | 2465 | if profiler: |
|
0 commit comments