|
15 | 15 | # limitations under the License. |
16 | 16 | # |
17 | 17 |
|
18 | | -import time |
| 18 | +import sys |
| 19 | +from signal import signal, SIGTERM, SIGINT |
19 | 20 |
|
20 | 21 | from pyspark.conf import SparkConf |
21 | 22 | from pyspark.files import SparkFiles |
@@ -63,22 +64,31 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, |
63 | 64 |
|
64 | 65 | """ |
65 | 66 |
|
66 | | - # launch call back server |
67 | | - if not gateway: |
68 | | - gateway = launch_gateway() |
69 | | -# gateway.restart_callback_server() |
70 | | - |
71 | 67 | # Create the Python Sparkcontext |
72 | 68 | self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome, |
73 | 69 | pyFiles=pyFiles, environment=environment, batchSize=batchSize, |
74 | 70 | serializer=serializer, conf=conf, gateway=gateway) |
| 71 | + |
| 72 | + # Start py4j callback server |
| 73 | + SparkContext._gateway.restart_callback_server() |
| 74 | + self._clean_up_trigger() |
75 | 75 | self._jvm = self._sc._jvm |
76 | 76 | self._jssc = self._initialize_context(self._sc._jsc, duration._jduration) |
77 | 77 |
|
78 | 78 |     # Initialize the StreamingContext in a separate method to allow subclass-specific initialization
79 | 79 | def _initialize_context(self, jspark_context, jduration): |
80 | 80 | return self._jvm.JavaStreamingContext(jspark_context, jduration) |
81 | 81 |
|
| 82 | + def _clean_up_trigger(self): |
| 83 | + """Kill py4j callback server properly using signal lib""" |
| 84 | + |
| 85 | + def clean_up_handler(*args): |
| 86 | + SparkContext._gateway.shutdown() |
| 87 | + sys.exit(0) |
| 88 | + |
| 89 | + for sig in (SIGINT, SIGTERM): |
| 90 | + signal(sig, clean_up_handler) |
| 91 | + |
82 | 92 | def start(self): |
83 | 93 | """ |
84 | 94 | Start the execution of the streams. |
|
0 commit comments