首页 理论教育 处理HDFS文件数据源的案例与解析

处理HDFS文件数据源的案例与解析

时间:2023-06-25 理论教育 版权反馈
【摘要】:案例中以手动构建文件,并移入监控目录来简化外部数据源存入该目录。},通过判断RDD是否为空,来过滤空数据,从而避免相应的job提交。在企业级的实时流处理中往往会引入Kafka作为分布式消息系统,以及Flume作为各种数据的收集系统。下面分别给出Spark Streaming整合Kafka的案例与解析,以及整合Flume的案例与解析。

处理HDFS文件数据源的案例与解析

除了Sockets,StreamingContext API还提供了从其他基础数据源创建DStream实例的方法,这里以文件数据源作为例子,解析文件流的处理,并在此基础上,引入Spark SQL,结合Spark Streaming和Spark SQL给出案例。

当企业的数据从各种数据源获取后,存入某个文件存储系统时(一般使用HDFS),比如将从Flume数据源收集来的日志文件存入HDFS文件系统等,可以使用文件流的方式去处理,该方法可以监控某一目录下的创建文件,并对文件进行处理。

案例中以手动构建文件,并移入监控目录来简化外部数据源存入该目录。实际在企业中应用时,应该引入类似Flume等日志聚合系统负责数据收集。

这里换一种方式执行应用,即在IDEA中运行应用程序,使用local方式运行,这种方式下,可以方便代码的调试。

具体代码如下:

978-7-111-51909-6-Chapter04-57.jpg

978-7-111-51909-6-Chapter04-58.jpg

打开配置Run的窗口,如图4.31所示。

978-7-111-51909-6-Chapter04-59.jpg

图4.31 IDEA的应用的Run配置

进入Run配置的编辑界面,如图4.32所示。

978-7-111-51909-6-Chapter04-60.jpg

图4.32 IDEA的应用的Run配置的编辑菜单

输入具体配置信息,如图4.33所示。

978-7-111-51909-6-Chapter04-61.jpg

图4.33 IDEA的应用的Run配置的编辑界面

其中,在Program arguments部分,添加监控的路径,这里设置为本地文件系统下的监控目录“E∶\\bd\\spark-1.3.0-bin-hadoop2.4\\examples\\src\\main\\resources1”由于不是集群模式提交,因此Main class部分可以设置为当前的应用类,需要使用类的全路径。(www.xing528.com)

单击窗口的Run按钮,启动流处理。然后,手动将文件添加到监控目录下。

注意:

1.监控目录下的文件应该有一样的数据格式,避免在内部解析时报错。

2.文件必须是在监控目录下创建,可以通过原子性的移动或重命名操作,放入目录。

3.一旦移入目录,文件就不能再修改了,如果文件是持续写入的话,新的数据是无法读取的。

案例中,必须在启动后新建一个文件,然后移入目录,创建时间比启动早的文件,移入目录时不会处理。

案例中,使用的文件内容源自:E∶\bd\spark-1.3.0-bin-hadoop2.4\examples\src main\resources\kv1.txt,即Spark自带的kv文件。由于没有安装HDFS,所以默认的是本地文件系统,不需要添加file∶//的scheme信息。对应在HDFS系统上时,可以增加hdfs://的scheme信息。

输出结果为:

978-7-111-51909-6-Chapter04-62.jpg

Spark 1.3版本中,增加了EmptyRDD的定义,用于源数据输入为空时构建的RDD,这里的代码,添加了EmptyRDD的判断,即if(!rdd.isEmpty()){…},通过判断RDD是否为空,来过滤空数据,从而避免相应的job提交。添加判断后,输出界面会像上面那样,只有收到数据时,才会提交job,进行处理。

如果没有添加该判断的话,代码会一直提交任务,但没有执行具体的数据处理,对应的界面如下:

978-7-111-51909-6-Chapter04-63.jpg

另外,在代码中,MasterURL使用的是.setMaster("local[1]"),因为文件流不需要Receiver,也就不需要额外占用一个内核。

之前在spark-shell提交应用的方式下提到过spark-shell交互式中已经自动导入了SQLContext的隐式导入,因此不需要再自己添加,但对应的spark-submit方式提交应用时,必须手动在使用的代码中添加“import sqlContext.implicits._”这句隐式转换的导入语句,否则,后续的toDF等调用会编译失败。

在企业级的实时流处理中往往会引入Kafka作为分布式消息系统,以及Flume作为各种数据的收集系统。下面分别给出Spark Streaming整合Kafka的案例与解析,以及整合Flume的案例与解析。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈