Hello World

```python
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("spark://node0:7077").setAppName("HelloWorld")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)
    stream = ssc.socketTextStream("localhost", 9999)
    stream.pprint()
    ssc.start()
    ssc.awaitTermination()
```
Data

```java
package cn.studybigdata.java.socket;

import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class StreamingHelloWorld {
    public static void main(String[] args) throws IOException, InterruptedException {
        ServerSocket sc = new ServerSocket(9999);
        System.out.println("Waiting for connection...");
        Socket accept = sc.accept();
        System.out.println("Client connected: " + accept.getLocalAddress());
        OutputStream outputStream = accept.getOutputStream();
        while (true) {
            String[] items = new String[]{"a", "b", "c", "d", "e", "f"};
            for (String item : items) {
                item = item + "\n";
                System.out.println("Writing: " + item);
                outputStream.write(item.getBytes(StandardCharsets.UTF_8));
                Thread.sleep(500);
            }
        }
    }
}
```
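Since the rest of this section is Python, the same kind of test data source can also be sketched in Python. The function below is an assumption (not part of the original material): it accepts one client and writes newline-terminated items, mirroring the Java server above.

```python
import socket
import time

def serve(host="localhost", port=9999, items=("a", "b", "c"), delay=0.0):
    """Accept one client connection and write each item as a newline-terminated line."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((host, port))
    srv.listen(1)
    conn, _ = srv.accept()          # blocks until a client (e.g. socketTextStream) connects
    for item in items:
        conn.sendall((item + "\n").encode("utf-8"))
        time.sleep(delay)           # pace the output, like Thread.sleep(500) above
    conn.close()
    srv.close()
```

Run `serve(delay=0.5)` in one terminal, then point `socketTextStream("localhost", 9999)` at it from the streaming job.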
NetCat

Install the build dependency:

```shell
[root@node0 netcat-0.7.1]# yum install gcc
```

Download:

```shell
[root@node0 zhangsan]# curl -O -L http://sourceforge.net/projects/netcat/files/netcat/0.7.1/netcat-0.7.1.tar.gz
```

Extract:

```shell
[root@node0 zhangsan]# tar -zxf netcat-0.7.1.tar.gz
[root@node0 zhangsan]# cd netcat-0.7.1
```

Configure:

```shell
[root@node0 netcat-0.7.1]# ./configure
```

Build:

```shell
[root@node0 netcat-0.7.1]# make
```

Install:

```shell
[root@node0 netcat-0.7.1]# make install
```

Use (listen on port 9999):

```shell
[root@node0 netcat-0.7.1]# netcat -lp 9999
```
Filter

```shell
(python37) [zhangsan@node0 ~]$ vim Streaming.py
```

```python
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    uncivilized_words = ["angry", "mad", "trash"]
    conf = SparkConf().setMaster("spark://node0:7077").setAppName("HelloWorld")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)
    stream = ssc.socketTextStream("localhost", 9999)
    # drop any word that appears in the blocklist
    stream.filter(lambda word: word not in uncivilized_words).pprint()
    ssc.start()
    ssc.awaitTermination()
```
WordCount

```python
record_list = stream.flatMap(lambda line: line.split(" ")) \
                    .map(lambda word: (word, 1)) \
                    .reduceByKey(lambda a, b: a + b)
record_list.pprint()
```
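On a single batch, the chained transformations behave like the plain-Python sketch below (no Spark required; the sample lines are made up for illustration):

```python
from collections import defaultdict

batch = ["a b a", "b c"]  # one micro-batch of input lines

# flatMap: split each line into words, flattening into one list
words = [w for line in batch for w in line.split(" ")]

# map: pair each word with an initial count of 1
pairs = [(w, 1) for w in words]

# reduceByKey: combine the counts for each distinct word
counts = defaultdict(int)
for word, n in pairs:
    counts[word] += n

print(dict(counts))  # {'a': 2, 'b': 2, 'c': 1}
```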
Tumbling window

```
data duration:   1s
batch interval:  2s
window duration: 2s
slide duration:  2s
```
Sliding window

```
data duration:   1s
batch interval:  1s
window duration: 2s
slide duration:  1s
```
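The difference between the two parameter sets can be checked with a small helper that lists which batches (by their end-times in seconds) each window covers. This is only an illustration of the timing, not a Spark API:

```python
def windows(num_batches, batch_interval, window_duration, slide_duration):
    """For each window, return the list of batch end-times it covers."""
    result = []
    t = window_duration                      # end-time of the first full window
    while t <= num_batches * batch_interval:
        start = t - window_duration
        result.append([b for b in range(batch_interval,
                                        num_batches * batch_interval + 1,
                                        batch_interval)
                       if start < b <= t])
        t += slide_duration
    return result

# Tumbling (window == slide): each batch appears in exactly one window
print(windows(4, 2, 2, 2))  # [[2], [4], [6], [8]]

# Sliding (slide < window): consecutive windows overlap by one batch
print(windows(4, 1, 2, 1))  # [[1, 2], [2, 3], [3, 4]]
```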
countByKey

```python
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("HelloWorld")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("file:///home/zhangsan/checkpoint")
    stream = ssc.socketTextStream("localhost", 9999)
    stream.flatMap(lambda line: line.split(" ")) \
          .map(lambda word: (word, 1)) \
          .reduceByKey(lambda a, b: a + b) \
          .saveAsTextFiles("file:///home/zhangsan/out")
    ssc.start()
    ssc.awaitTermination()
```
UpdateStateByKey

updateStateByKey requires a checkpoint directory to be set first.
```python
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def count_fun(new, old):
    # new: the list of values for this key in the current batch
    # old: the accumulated state from earlier batches (None the first time)
    if old is None:
        old = 0
    return sum(new, old)

if __name__ == '__main__':
    conf = SparkConf().setMaster("local[*]").setAppName("HelloWorld")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("file:///home/zhangsan/checkpoint")
    stream = ssc.socketTextStream("localhost", 9999)
    stream.flatMap(lambda line: line.split(" ")) \
          .map(lambda word: (word, 1)) \
          .reduceByKey(lambda a, b: a + b) \
          .updateStateByKey(count_fun) \
          .pprint()
    ssc.start()
    ssc.awaitTermination()
```
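How updateStateByKey applies count_fun can be simulated without Spark: for each key, Spark passes the list of new values seen in this batch plus the previous state (None on the key's first appearance). A rough sketch, with update_state standing in for the framework's bookkeeping:

```python
def count_fun(new, old):
    if old is None:
        old = 0
    return sum(new, old)  # fold this batch's counts onto the running total

def update_state(state, batch_pairs):
    """Group this batch's (word, 1) pairs by key, then apply count_fun per key."""
    grouped = {}
    for word, n in batch_pairs:
        grouped.setdefault(word, []).append(n)
    for word, new_values in grouped.items():
        state[word] = count_fun(new_values, state.get(word))
    return state

state = {}
update_state(state, [("a", 1), ("b", 1), ("a", 1)])  # batch 1
update_state(state, [("a", 1)])                      # batch 2
print(state)  # {'a': 3, 'b': 1}
```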
Output and persistence

pprint

```python
def pprint(self, num=10):
    """
    Print the first num elements of each RDD generated in this DStream.

    @param num: the number of elements from the first will be printed.
    """
```
pprint prints the first ten elements of each batch of data in the DStream, which is useful for development and debugging.
foreachRDD

foreachRDD is a very powerful DStream operation that lets you send data to external systems. Writing data to an external system requires creating a connection object, such as a TCP connection to a remote server or a handle to an external database.
Creating a connection object in the driver

```scala
dstream.foreachRDD { rdd =>
  val connection = createNewConnection()
  rdd.foreach { record =>
    connection.send(record)
  }
}
```
This is incorrect: it requires serializing the connection object and shipping it from the driver to the workers, but such connection objects cannot be transferred between machines. The mistake may show up as a serialization error (the connection object is not serializable) or an initialization error (the connection object should be initialized on the workers), and so on.
Creating a connection object in the worker

```scala
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}
```
This leads to another common mistake: a connection object is created for every single record. Creating a connection costs time and resources, so creating and destroying one per record incurs very high overhead and noticeably reduces the overall throughput of the system.
Creating one connection object per RDD partition

```scala
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
```

A better solution is rdd.foreachPartition: create a single connection object per RDD partition and use it to send all of that partition's records.
A static connection pool

```scala
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)
  }
}
```

Finally, this can be optimized further by reusing connection objects across multiple RDDs or batches: the developer keeps a static pool of connection objects and reuses them to push multiple batches of RDDs to the external system, saving even more overhead.
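In PySpark the per-partition pattern looks like the sketch below. SimpleConnectionPool and FakeConnection are hypothetical stand-ins for whatever client library your external system actually provides:

```python
import queue

class FakeConnection:
    """Hypothetical connection: records everything sent, instead of a real socket."""
    def __init__(self):
        self.sent = []
    def send(self, record):
        self.sent.append(record)

class SimpleConnectionPool:
    """A minimal static pool; a real pool should also handle timeouts and broken connections."""
    _pool = queue.Queue()

    @classmethod
    def get_connection(cls):
        try:
            return cls._pool.get_nowait()   # reuse an idle connection if available
        except queue.Empty:
            return FakeConnection()         # otherwise create a new one (hypothetical)

    @classmethod
    def return_connection(cls, conn):
        cls._pool.put(conn)                 # hand it back for the next partition/batch

def send_partition(records):
    # one connection per partition, returned to the pool afterwards
    conn = SimpleConnectionPool.get_connection()
    for record in records:
        conn.send(record)
    SimpleConnectionPool.return_connection(conn)

# In a streaming job this would be wired up as:
# dstream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))
```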
saveAsTextFiles

```python
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("helloworld").setMaster("local[*]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 3)
lines = ssc.socketTextStream("localhost", 9999)
words_counts = lines.flatMap(lambda line: line.split(" ")) \
                    .map(lambda word: (word, 1)) \
                    .reduceByKey(lambda a, b: a + b)
words_counts.saveAsTextFiles("out1")
ssc.start()
ssc.awaitTermination()
```