首页 理论教育 Window操作实例演示

Window操作实例演示

时间:2023-06-20 理论教育 版权反馈
【摘要】:现在要实现一个让窗口不断地移动,在窗口移动的过程中统计最近的word 单词个数。在SparkStreamingExample工程下新建一个用于Window操作的Scala文件Window-WordCountDemo.scala如下所示。WindowWordCountDemo的具体实现代码如下。3)最后使用words.map.reduceByKeyAndWindow进行Windows操作。配置运行参数来运行WindowWordCountDemo.scala,配置参数如图7-22所示。由于5s切分一次数据,我们设置每30s监听一次Window窗口,所以30s就会产生6个RDD。这也就是说window(窗口)操作是以特定时间段并以特定时间间隔为单位进行的滑动操作,此操作也是Spark Streaming的主要运行场景之一。

Window操作实例演示

现在要实现一个让窗口不断地移动,在窗口(Window)移动的过程中统计最近的word 单词个数。这种场景类似于博客里面统计最近的一段时间(例如最近的一分钟)的最热门词汇。

(1)在SparkStreamingExample工程下新建一个用于Window操作的Scala文件Window-WordCountDemo.scala如下所示。

978-7-111-52860-9-Chapter07-136.jpg

(2)WindowWordCountDemo的具体实现代码如下。

978-7-111-52860-9-Chapter07-137.jpg

978-7-111-52860-9-Chapter07-138.jpg

(3)上述代码解析如下。

1)首先创建SparkContext的实例sc和StreamingContext的实例ssc,参数Seconds(5)表示5s切分一次数据,然后使用ssc.checkpoint(".")设置checkpoint。

2)紧接着使用ssc.socketTextStream(args(0)args(1).toInt,StorageLevel.MEMORY_ONLY_SER)从数据流里面获取数据,参数args(0)表示服务器名称,args(1)表示服务器的端口号,读入数据后使用lines.flatMap(_.split(","))进行flatMap操作。

3)最后使用words.map(x=>(x,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+ b),Seconds(args(2).toInt),Seconds(args(3).toInt))进行Windows操作。因为要进行reduce-ByKeyAndWindow操作(也就是要进行reduceByKey操作),所以要先进行map操作,把数据映射成(x,1),映射完成后就开始使用reduceByKeyAndWindow((a:Int,b:Int)=>(a+b)将上一次和本次的数据进行累加。这里的参数Seconds(args(2).toInt)表示Windows的监听时间间隔,Seconds(args(2).toInt)的值必须是切分数据时设置的时间间隔的倍数(也就是必须是5的倍数),参数Seconds(args(3).toInt)表示每次窗口滑动的时间间隔,Seconds(args(3).toInt)的值也必须是切割数据的倍数。所以words.map(x=>(x,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(args(2).toInt),Seconds(args(3). toInt))表示的意思是每隔Seconds(args(3).toInt)s的时间就会对前Seconds(args(2).toInt)s的时间进行一次单词计数的操作。

(4)配置运行参数来运行WindowWordCountDemo.scala,配置参数如图7-22所示。

Program arguments中一共有四个参数,第一个参数localhost表示服务器名称,由于使用本地模式运行,所以填的是localhost;第二个参数8888表示服务器的端口号;第三个参数30表示表示Window的监听时间间隔,由于每隔5s中对数据进行一次RDD切割,所以30s内进行了6次数据切割,这样就形成了6个RDD,这6个RDD形成了一个Window;第四个参数10表示窗口每次滑动的时间间隔,它也是每次切割数据的时间5的倍数,这样每切割两次数据窗口就会移动一次。每隔10s对前30s的数据进行一次单词统计的操作。

978-7-111-52860-9-Chapter07-139.jpg(www.xing528.com)

图7-22 配置运行参数

(5)接下来启动服务器,在命令行中进入/usr/local/spark/spark-1.1.0-bin-ha-doop2.4,然后输入java-classpath/usr/local/spark/spark-1.1.0-bin-hadoop2.4/SparkStre-amingExample.jar spark_streaming_example.SaleDeviceSimulation/root/user/local/idea/network-data.txt 88882000,如下所示。

978-7-111-52860-9-Chapter07-140.jpg

这里使用的是Java的运行模式,首先指定了jar包的classpath路径为/usr/local/spark/ spark-1.1.0-bin-hadoop2.4/SparkStreamingExample.jar;然后指定运行jar包中treaming_ example包下的SaleDeviceSimulation.scala文件;接着使用/root/user/local/idea/networkda-ta.txt指定服务器端向客户端发送的文本文件;紧接着用8888指定服务器端口;最后用2000指定每隔2s向客户端发送一行数据。回车后服务器端一直处于等待监听是否有客户端连接上来的状态,如果有客户端连接上来之后服务器就每隔2s向连接上来的服务器发送一次文本。

(6)运行WindowWordCountDemo.scala,结果如下。

978-7-111-52860-9-Chapter07-141.jpg

978-7-111-52860-9-Chapter07-142.jpg

978-7-111-52860-9-Chapter07-143.jpg

从时间Time:1430067825000ms、Time:1430067835000ms、Time:1430067845000ms、Time:1430067855000ms、Time:1430067865000ms、Time:1430067875000ms、Time:1430-067885000ms可以看出,每隔10s切割了一次数据。由于5s切分一次数据,我们设置每30s监听一次Window窗口,所以30s就会产生6个RDD。由于服务器每隔2s发送一次数据,所以30s内就应该产生15条记录。

978-7-111-52860-9-Chapter07-144.jpg

978-7-111-52860-9-Chapter07-145.jpg

从最后的运行结果中我们可以看出记录的总数2+1+2+4+1+1+1+1+2和3+3+3+1+1+1+3都等于15,即使再往下记录依然是15。这也就是说window(窗口)操作是以特定时间段并以特定时间间隔为单位进行的滑动操作,此操作也是Spark Streaming的主要运行场景之一。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈