1919
2020import os
2121import sys
22+ from threading import RLock , Timer
2223
2324from py4j .java_gateway import java_import , JavaObject
2425
3233__all__ = ["StreamingContext" ]
3334
3435
class Py4jCallbackConnectionCleaner(object):

    """
    A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
    It scans all callback connections periodically (every 30 seconds by default) and closes
    the dead connections.

    :param gateway: the Py4j gateway whose ``_callback_server`` connections are scanned.
    :param interval: seconds to wait between two scans (defaults to 30.0).
    """

    def __init__(self, gateway, interval=30.0):
        self._gateway = gateway
        self._interval = interval
        self._stopped = False
        self._timer = None
        # Re-entrant because start() holds the lock while calling _start_timer().
        self._lock = RLock()

    def start(self):
        """
        Schedule the periodic cleaning.

        Does nothing if the cleaner has been stopped or is already running, so
        calling it more than once never creates a second timer chain that
        stop() would be unable to cancel.
        """
        with self._lock:
            if self._stopped or self._timer is not None:
                return
            self._start_timer(self._run_once)

    def _run_once(self):
        """Run one cleaning pass, then schedule the next one."""
        try:
            self._clean_closed_connections()
        except Exception:
            # Best effort: report the failure but keep the cleaner running.
            import traceback
            traceback.print_exc()
        finally:
            # Always re-arm; _start_timer() is a no-op once stop() was called.
            self._start_timer(self._run_once)

    def _clean_closed_connections(self):
        """Close and forget every callback connection that is no longer alive."""
        callback_server = self._gateway._callback_server
        if not callback_server:
            return

        from py4j.java_gateway import quiet_close, quiet_shutdown

        with callback_server.lock:
            # NOTE(review): connections are assumed to be threading.Thread
            # subclasses; is_alive() replaces isAlive(), which was removed
            # in Python 3.9.
            dead_connections = [connection for connection in callback_server.connections
                                if not connection.is_alive()]
            for connection in dead_connections:
                quiet_close(connection.input)
                quiet_shutdown(connection.socket)
                quiet_close(connection.socket)
                callback_server.connections.remove(connection)

    def _start_timer(self, f):
        """
        Arm a daemon timer that calls ``f`` after the configured interval,
        unless the cleaner has been stopped.
        """
        with self._lock:
            if not self._stopped:
                self._timer = Timer(self._interval, f)
                # Daemon thread so a pending timer never blocks interpreter exit.
                self._timer.daemon = True
                self._timer.start()

    def stop(self):
        """Stop the cleaner and cancel the pending timer, if any."""
        with self._lock:
            self._stopped = True
            if self._timer:
                self._timer.cancel()
                self._timer = None
92+
3593class StreamingContext (object ):
3694 """
3795 Main entry point for Spark Streaming functionality. A StreamingContext
@@ -47,6 +105,9 @@ class StreamingContext(object):
47105 # Reference to a currently active StreamingContext
48106 _activeContext = None
49107
108+ # A cleaner that closes leaked callback server sockets every 30 seconds
109+ _py4j_cleaner = None
110+
50111 def __init__ (self , sparkContext , batchDuration = None , jssc = None ):
51112 """
52113 Create a new StreamingContext.
@@ -95,6 +156,8 @@ def _ensure_initialized(cls):
95156 jgws = JavaObject ("GATEWAY_SERVER" , gw ._gateway_client )
96157 # update the port of CallbackClient with real port
97158 gw .jvm .PythonDStream .updatePythonGatewayPort (jgws , gw ._python_proxy_port )
159+ _py4j_cleaner = Py4jCallbackConnectionCleaner (gw )
160+ _py4j_cleaner .start ()
98161
99162 # register serializer for TransformFunction
100163 # it happens before creating SparkContext when loading from checkpointing
0 commit comments