@@ -835,9 +835,9 @@ class MQTTStreamTests(PySparkStreamingTestCase):
835835 def setUp (self ):
836836 super (MQTTStreamTests , self ).setUp ()
837837
838- utilsClz = self .ssc ._jvm .java .lang .Thread .currentThread ().getContextClassLoader () \
838+ MQTTTestUtilsClz = self .ssc ._jvm .java .lang .Thread .currentThread ().getContextClassLoader () \
839839 .loadClass ("org.apache.spark.streaming.mqtt.MQTTTestUtils" )
840- self ._utils = utilsClz .newInstance ()
840+ self ._MQTTTestUtils = MQTTTestUtilsClz .newInstance ()
841841 self ._MQTTTestUtils .setup ()
842842
843843 def tearDown (self ):
@@ -850,7 +850,7 @@ def tearDown(self):
850850 def _randomTopic (self ):
851851 return "topic-%d" % random .randint (0 , 10000 )
852852
853- def _validateStreamResult (self , sendData , stream ):
853+ def _validateStreamResult (self , sendData , dstream ):
854854 result = []
855855
856856 def get_output (_ , rdd ):
@@ -862,16 +862,15 @@ def get_output(_, rdd):
862862 self .assertEqual (sendData , receiveData )
863863
864864 def test_mqtt_stream (self ):
865- """Test the Python Kafka stream API."""
865+ """Test the Python MQTT stream API."""
866866 topic = self ._randomTopic ()
867867 sendData = "MQTT demo for spark streaming"
868868 ssc = self .ssc
869869
870- self ._MQTTTestUtils .createTopic (topic )
871870 self ._MQTTTestUtils .waitForReceiverToStart (ssc )
872871 self ._MQTTTestUtils .publishData (topic , sendData )
873872
874- stream = MQTTUtils .createStream (ssc , "tcp://" + MQTTTestUtils . brokerUri , topic )
873+ stream = MQTTUtils .createStream (ssc , "tcp://" + self . _MQTTTestUtils . brokerUri () , topic )
875874 self ._validateStreamResult (sendData , stream )
876875
877876
0 commit comments