首页 理论教育 准备工作:处理Kafka数据源

准备工作:处理Kafka数据源

时间:2023-06-25 理论教育 版权反馈
【摘要】:2)一种是Spark 1.3引入的一个试验性的方法,方法中不再使用Receivers,而是直接调用Kafka提供的低层次的API。Kafka的Topic的分区数,是Consumer可以读取的并行数的最高限制值,这里对应Spark Streaming并行读取的最大值。使用jps命令查看:其中,Kafka就是启动的broker服务的进程。

准备工作:处理Kafka数据源

Kafka是一种高吞吐量的分布式发布订阅消息系统,这里实现两种读取Kafka数据的方法:

1)一种是Spark 1.3版本之前的方法,使用Receivers和Kafka提供的高层次的API。

2)一种是Spark 1.3引入的一个试验性的方法,方法中不再使用Receivers,而是直接调用Kafka提供的低层次的API。

这两种方法有着不同的编程模型、性能特征和语义级的保证,在读取数据的细节上有所不同。

一、Kafka基础知识的准备

发布消息通常有两种模式:队列模式(Queuing)和发布-订阅模式(Publish-Sub-scribe)。队列模式中,Consumers可以同时从服务端读取消息,每个消息只被其中一个Con-sumer读到;发布-订阅模式中消息被广播到所有的Consumer中。

Kafka的Topic的分区数,是Consumer可以读取的并行数的最高限制值,这里对应Spark Streaming并行读取的最大值。

当Consumer使用相同的groupId去读取同一个Topic数据时,该Topic会将分区数据分发到各个Consumer,即队列模式的消息发布模式,如果Consumer使用不同的groupID去读取同一个Topic数据时,该Topic的分区数据会广播到各个Consumer上,即使用广播的消息发布模式。

二、Kafka集群的准备

为了简化Kafka集群的搭建,集中针对Spark Streaming对Kafka流数据处理的实践上,这里以尽可能简单地方式构建Kafka集群。这里使用kafka_2.10-0.8.2.1.tgz版本。

简单搭建步骤如下:

1.获取Kafka部署包,并解压到指定目录

可以到Kafka的官方网站http∶//kafka.apache.org/,下载部署包,这里使用wget命令下载,并解压:

当前在wxx集群的wxx215节点部署Kafka。

2.启用默认配置的Zookeeper服务,可以直接使用现有的Zookeeper集群

启动命令中-daemon选项用户设置启动脚本在后台运行。启动后,使用jps命令查看进程,会看到Zookeeper服务:

其中,QuorumPeerMain对应启动的Zookeeper服务。

3.启动Kafka服务

输入命令修改配置属性:

修改相关的属性,当前只修改下面两个属性:

当前Zookeeper和Broker都在wxx215机器上启动,后续可以在其他机器上添加Kafka的服务。

服务属性文件中:

其中:(www.xing528.com)

1)broker.id属性:配置信息是服务的全局唯一标识,当前为第一个服务,因此直接使用,不做修改,整个Kafka中服务的broker.id值必须唯一不能重复。

2)port属性:服务使用的端口号,如果是在单台机器上启动多个broker服务,那么需要使用不同的端口号。

3)log.dir属性:用于Kafka记录日志文件的目录,如果在单台机器上启动多个broker服务的话,应该设置成不同目录,避免多个broker服务在相同目录下生成目录文件。

修改完服务的属性文件后,启动服务:

其中,-daemon选项指定后台方式运行。

使用jps命令查看:

其中,Kafka就是启动的broker服务的进程。

假设重新启动一个新的broker,复制config/server.properties为config/server_1.properties,修改其中关键的三个属性为:

启动broker服务:

对应的进程如下:

如果停止服务可以启动bin/kafka-server-stop.sh或直接kill-9 pid方式,但是,脚本方式会kill掉当前所有的Kafka服务(具体可以查看脚本命令),因此如果在单机上启动了多个服务,而只需要停止其中某一个时,应该选用kill命令。

创建Kafka的Topic,为了简化,这里使用一个Topic,输入创建命令:

创建名为kafka_test的topic,复制因子设为2,同时分区数为4,注意,分区数是read parallelisms的最大值。

查询Kafka当前的Topic信息,输入命令:

指定--zookeeper选项的值为wxx215:2181,对应的Topic,即刚创建的。

创建Producer的提交脚本start-producer.sh:

该脚本对应的类的使用方法:

其中,参数metadataBrokerList的值为:wxx215∶9092,wxx215∶9093,即当前启动的Kafka服务(broker列表,逗号分隔);参数topic的值即刚才创建的Topic的名字kafka_test;参数messagesPerSec的值为20,即每个间隔时间发送的消息条数;参数wordsPerMessage的值为10,即每条消息中的单词个数。

创建Consumer的提交脚本start-consumer.sh:

该脚本对应的类的使用方法:

其中,参数zkQuorum的值为:wxx215∶2181,即当前启动的Zookeeper连接属性(Host∶port列表,逗号分隔);参数group的值是指定当前Consumer的groupId,这里设置为group1;参数topics的值是kafka_test,即刚才创建的Topic的名字kafka_test;参数numThreads的值是4,即读取Kafka流的线程数,当前设置成分区数的个数,对应的每个线程读取一个分区数据。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈