@@ -620,66 +620,6 @@ abstract class DStream[T: ClassTag] (
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
   }
 
-  // TODO: move pyprint to PythonDStream and execute it via a py4j callback
-  /**
-   * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an
-   * output operator, so this PythonDStream will be registered as an output stream and
-   * materialized there. Since the serialized Python objects are only readable by Python,
-   * pyprint writes the binary data out to a temporary file and runs a Python script to
-   * deserialize and print the first ten elements.
-   *
-   * This currently calls the Python script directly; we should avoid that.
-   */
-  private[streaming] def pyprint() {
-    def foreachFunc = (rdd: RDD[T], time: Time) => {
-      val iter = rdd.take(11).iterator
-
-      // Generate a temporary file
-      val prefix = "spark"
-      val suffix = ".tmp"
-      val tempFile = File.createTempFile(prefix, suffix)
-      val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
-      // Write the serialized Python objects out to the temporary file
-      PythonRDD.writeIteratorToStream(iter, tempFileStream)
-      tempFileStream.close()
-
-      // pythonExec should be passed in from Python; move pyprint to PythonDStream
-      val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
-
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      // Call the Python script to deserialize the data and print the result to stdout
-      val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath)
-      val workerEnv = pb.environment()
-
-      // envVars should also be passed in from Python
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
-      val worker = pb.start()
-      val is = worker.getInputStream()
-      val isr = new InputStreamReader(is)
-      val br = new BufferedReader(isr)
-
-      println("-------------------------------------------")
-      println("Time: " + time)
-      println("-------------------------------------------")
-
-      // Print the values read from the Python process's stdout
-      var line = ""
-      breakable {
-        while (true) {
-          line = br.readLine()
-          if (line == null) break()
-          println(line)
-        }
-      }
-      // Delete the temporary file
-      tempFile.delete()
-      println()
-
-    }
-    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
-  }
-
-
   /**
    * Return a new DStream in which each RDD contains all the elements seen in a
    * sliding window of time over this DStream. The new DStream generates RDDs with
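For context on the pattern the removed method imitated: the JVM-side `DStream.print()` does the same take-ten-and-print work directly on each RDD, with no temp file and no external Python process. Below is a minimal sketch of that pattern, reusing the names visible in the surrounding `DStream.scala` (`ForEachDStream`, `context.sparkContext.clean`, `register()`); it illustrates the structure rather than reproducing the upstream implementation line for line.

```scala
// Sketch: register a ForEachDStream whose function takes the first
// elements of each generated RDD and prints them on the driver.
def print() {
  def foreachFunc = (rdd: RDD[T], time: Time) => {
    // take(11) so we can tell whether more than ten elements exist
    val first11 = rdd.take(11)
    println("-------------------------------------------")
    println("Time: " + time)
    println("-------------------------------------------")
    first11.take(10).foreach(println)
    if (first11.size > 10) println("...")
    println()
  }
  new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}
```

Once pyprint moves into PythonDStream as the TODO suggests, the Python side can register an equivalent foreachRDD-style callback through py4j instead of shelling out to `pyprint.py`, which removes the temp file, the subprocess, and the hard-coded `SPARK_HOME`/`PYSPARK_PYTHON` lookups above.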