基于Flume风格的推送数据方式,本质上是Spark Streaming在Flume中建立一个Avro agent的接收器,用于接收流数据,比如企业要处理的海量日志数据,然后在Flume收集到数据后推送到Spark进行处理。
一般需要满足以下两个要求:
1)当“Flume+Spark Streaming”应用程序启动时,其中一台Spark Worker节点必须是Flume推送数据的那台机器。
2)同时,Flume必须可以向这台机器的指定端口推送数据。
一、配置Flume
基于推送方式的流数据应用,需要为Flume Agent配置一个具有如下格式的Avro sink配置信息,如下所示:

其中,agent是配置的Flume Agent的名字,avroSink为接收数据的Sink的名字,memo-ryChannel为该Sink关联的Channel的名字,hostname和port指定Sink所在的机器信息。
当前“Flume+Spark Streaming”应用程序需要添加的依赖包:

可以在SBT或Maven的构建文件中添加该依赖信息,或者在IDEA中将该jar包添加到Libraries中。
注意:配置信息中的hostname必须和集群的资源管理器节点的hostname一样,这样在资源调度时,才能匹配名字并在对应的机器节点上启动Receiver。
部署时,可以将spark-submit没有提供的依赖包一起打包到应用程序的jar包中,这样在spark-submit提交时,可以不需要通过--jars等方式增加外部依赖包。
二、Flume+SparkStreaming实践案例与解析
(一)FlumeAgent配置案例与解析
配置文件avro.conf的案例与解析:


对应的数据流如图4.40所示。

图4.40 当前配置对应的数据流图
配置项解析的具体内容参考表4.2。
表4.2 配置项及其说明

(二)应用程序案例与解析——对应单Receiver场景
应用程序代码:


(三)开始案例实践与解析
1.使用avro.conf配置属性文件,启动Flume Agent,命令如下:

其中:
1)-c:指定当前配置文件的目录,设置为conf。
2)-f:指定当前使用的配置文件。
3)-n:指定配置文件中要启动的Agent名字。
当还没有启动Spark Streaming应用服务时,Agent向avroSink指定的地址发送信息会失败,错误信息类似于:(https://www.xing528.com)


可以先忽略,等Spark Streaming应用程序启动后,打开Receiver开始接收数据后,A-gent的推送数据就不会出现连接失败的错误了。
2.Avro客户端模拟
Flume内部提供了一个Avro Client的实现,可以通过avro-client向Agent提供数据,发送数据的avro-client命令,指定source-r1的hostname和port:

3.启动Spark Streaming应用

注意:这里的host和port,就是Flume Agent的avroSink要发送到的目的地址,也就是Worker节点上,构建Receiver所需要的地址。
启动后,打开Web Interface界面(http∶//wxx225∶4040),其中,wxx225是启动应用的Driver Program所在节点。可以看到Spark Streaming应用增加了Streaming的信息,如图4.41所示。
这是总体统计信息,包含了以下内容:
1)Receiver个数,因为当前只设置了一个avroSink,同时其对应的port仅配置了一个,因此这里只有一个Receiver。
2)Batch interval:对应批数据的时间片大小,这里是代码中设置的10秒。
3)Processed batches:这表示到目前为止一共处理了多少批数据。
4)Waiting batches:等待处理的批数据个数。
5)Received records:接收到的记录条数。
6)Processed records:已经处理的记录条数。

图4.41 Spark Streaming应用的Streaming界面
再下面是一些统计信息,如图4.42所示。

图4.42 Spark Streaming的Streaming界面统计信息
这是Executor页面的信息,如图4.43所示。

图4.43 Spark Streaming的Executor页面
可以看到,当前使用了3个Executor。其中Active Tasks列,对应了Receiver所在的节点。也是我们在配置文件中指定的数据avroSink的hostname对应的节点。
4.启动Flume Avro Client发送数据
这时,对应的Agent界面(即k1这个Sink)输出内容如下:

而在Spark Streaming的Driver节点上,也就是当前启动应用的wxx225节点上,也会有对应的界面输出信息,这是对avroSink收集数据的处理结果。
为了测试Flume与Spark间的收发数据,先停止应用程序,停止期间使用Flume Avro Client工具发送两次文件(一个文件对应有19条Event记录),然后,开始启动应用程序,完全启动后,再使用Flume avroclient工具发送一次文件,这时候对应的界面信息如下:


即,Flume会在Channel中缓存数据直到Sink成功将数据推送到Spark。
在Spark 1.3版本中已经将Flume访问放到External部分,在实际执行时,不用将依赖的Flume打包进来就可以直接提交应用。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。
