首页 理论教育 如何优化批处理时间?

如何优化批处理时间?

时间:2023-06-25 理论教育 版权反馈
【摘要】:减少批处理时间,在调优指南上也对细节做了一定的分析,这里着重分析比较重要的几个方面。因此,tasks的并行度大致等于“批数据的时间片/接收块的时间间隔”。以减少由于序列化引起的CPU开销,进而提高性能。可以通过以下修改来减低这种开销:1.task的序列化:使用Kryo序列化机制来序列化task,来减少task的大小,因此也减少了向Slaves发送的时间了。这些修改,可以将批处理时间减少到几百毫秒,进而达到亚秒级的处理。

如何优化批处理时间?

减少批处理时间,在调优指南上也对细节做了一定的分析,这里着重分析比较重要的几个方面。

一、在数据接收上的并行度

1.Input DStream的并行度

通过网络从各种数据源(如Kafka,Flume,Socket等)接收数据时,会要求将数据反序列化并存储到Spark。如果数据接收这块变成系统瓶颈的话,就应该考虑提高数据接收的并行度了。

注意:每个Input DStream创建一个Receiver(运行在每个Worker节点上),因此只接收一个数据流,因此可以通过构建多个Input DStreams,并且进行配置,让它们从来自数据源的流数据的不同分区接收数据,相应地,数据源提供的分区个数也就成了Input DStream并行度的最大值了。

最后可以将创建的多个DStream合并为单个DStream进行处理。

2.任务的并行度——对应RDD的分区数

与并行度相关的另一个需要考虑的配置参数是spark.streaming.blockInterval,这是Re-ceiver的块时间间隔。对大部分的Receivers,接收到的数据在存储到Spark内存之前,会先合并到blocks中。而这个块的个数,就对应了批数据,也就是RDD的分区数,也就是RDD的并行任务(task)数了。

因此,tasks的并行度大致等于“批数据的时间片/接收块的时间间隔”。比如,块间隔时间为200毫秒,时间片为2秒时,对应的tasks就是2000毫秒/200毫秒,也就是并行的tasks个数为10。如果这里并行的tasks数远小于集群可用的内核数,则效率较低。因此,在给定的批数据时间片前提下,需要修改块的时间间隔,也就是spark.streaming.blockInterval,来提高tasks的并行度。

一般建议将块的时间的最小值设置为50毫秒,如果再低的话,task启动的开销就会增加。

3.显示修改并行度

另一个可选的方法是,从多输入流Receivers接收数据时,显示地调用重分区的方法(inputStream.repartition方法)。这样可以在进一步处理数据之前,先把在指定数量的节点上接收到的数据分发到集群上。

二、在数据处理上的并行度

如果计算过程中任何一个Stage的任务并行度不够高的话,可能会导致集群资源没有被充分地利用起来。

比如,针对Key-Value类型的DStream的一些聚合操作,如reduceByKey和reduce-ByKeyAndWindow等(与RDD类似,对应地在隐式转化的PairDStreamFunctions类中),其分区器使用的是默认的分区器,默认分区器的构建可以参考章节2.2.9分区数设置的案例与源码解析部分,可以通过配置spark.default.parallelism属性,来提高分区器的分区个数,当然,也可以通过指定API中的并行度参数来显示设置。

三、DataSerialization(www.xing528.com)

可以通过优化序列化的格式来减少数据序列化的开销。在Spark Streaming中,有两类数据会被序列化。

1.输入数据

默认情况下,通过Receivers接收的输入数据会被存储在Executor的内存中,默认的存储级别为StorageLevel.MEMORY_AND_DISK_SER_2。这是因为,将数据序列化(_SER)成bytes可以减少GC的开销,而数据的备份(_2)是为了针对Executor故障的容错性。

序列化数据明显是有一定的开销的,首先Receiver必须将接收到的数据反序列化,然后再使用Spark指定的序列化格式将数据序列化。

2.在Spark Streaming操作中的RDD持久化

流计算过程中产生的RDD可能会被持久化到内存中。比如,窗口类的操作为了重复处理数据,会将数据持久化到内存中。这和Spark内核中RDD默认的持久化是不一样的,RDD默认是使用StorageLevel.MEMORY_ONLY进行持久化。

另外,在窗口类的操作过程中,是默认进行持久化的,而RDD的持久化是需要人为触发的。

这两种情况下的持久化一般都应该使用Kryo序列化,这样可以减少CPU和内存的开销。

比较特殊的情况下,可以将数据以反序列化对象进行持久化,只要不会引起高昂的GC开销即可。比如,如果批间隔只设置成几秒(对应数据量比较小),就可以通过显示地设定存储级别,去除数据持久化中的序列化。以减少由于序列化引起的CPU开销,进而提高性能。

四、TaskLaunchingOverheads

如果每秒钟启动的task数过高(比如,每秒启动50次或更多时),相应地,向Slaves发送tasks的开销就比较大了,这会导致很难实现亚秒级的延迟。可以通过以下修改来减低这种开销:

1.task的序列化:使用Kryo序列化机制来序列化task,来减少task的大小,因此也减少了向Slaves发送的时间了。

2.执行模式(Execution Mode):task在Standalone模式或coarse-grained Mesos模式下的启动时间,比在fine-grained Mesos模式下要少很多。具体可以从官方网站上,在Mesos上运行的指南中获取更详细的信息。

这些修改,可以将批处理时间减少到几百毫秒,进而达到亚秒级的处理。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈