首页 理论教育 程序编写准则优化方案

程序编写准则优化方案

时间:2023-06-29 理论教育 版权反馈
【摘要】:从根本上讲,Spark程序的性能取决于用户编写的程序。所以在讲解具体的优化方法之前,首先简单描述一下Spark程序编写需要注意的地方。因此在Shuffle过程中,可能会发生大量的磁盘文件读写操作,以及数据的网络传输操作,而这无疑也会降低程序的执行速度。

程序编写准则优化方案

从根本上讲,Spark程序的性能取决于用户编写的程序。所以在讲解具体的优化方法之前,首先简单描述一下Spark程序编写需要注意的地方。在开发过程中,要时刻牢记这些原则,根据具体的业务逻辑,将这些原则结合起来,灵活地运用它们。

1.准则一:从同一个数据源尽量只创建一个RDD,后续不同的业务逻辑可以复用该 RDD,而不是基于该数据源重新创建一个新的RDD

这是因为,从数据源创建RDD常常涉及数据的读取,而数据的读取速度一般都比数据计算的速度慢,所以要减少从外部数据源加载数据的次数,尽量复用RDD而不是重新创建新的RDD,这点在程序业务逻辑复杂冗长的情况下常常被忽视,一个错误的示例代码如下所示:

978-7-111-55442-4-Chapter09-21.jpg

978-7-111-55442-4-Chapter09-22.jpg

当然,由于Spark程序执行时的延迟执行和基于Lineage最大化的pipeline的特性,由于rdd1被执行了两次算子操作(一次map,一次reduce),在执行reduce操作时,还会再次从源头重新计算一次rdd1的数据,即再次重新从HDFS加载外部数据源的数据。所以,要想彻底解决重复读取外部数据源的问题,还需结合“准则二:如果需要对某个RDD进行多次不同的transformation和action操作以应用于不同的业务分析需求,可以考虑对该RDD进行持久化操作,以避免action操作触发作业时多次重复计算该RDD”,才能保证一个RDD被多次使用时只被计算一次。

2.准则二:如果需要对某个RDD进行多次不同的Transformation和Action操作以应用于不同的业务分析需求,可以考虑对该RDD进行持久化操作,以避免Action操作触发作业时多次重复计算该RDD

需要考虑该准则是因为Spark程序执行的特性,即延迟执行和基于Lineage最大化的pipeline。简单来说,Spark中由于对某个RDD的Action操作触发了作业时,会基于Lineage从后往前推,找到该RDD源头的RDD,然后从前往后计算出结果。很明显,如果对某个RDD执行了多次Transformation和Action操作,每次Action操作触发了作业时都会重新从源头RDD处计算一遍来获得该RDD,然后再对这个RDD执行相应的操作。这种方式的性能显然是很差的。

所以需要对多次使用的RDD进行持久化。持久化之后,Spark就会根据持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对该RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。

3.准则三:从数据源读取数据获得RDD后,要尽早进行filter过滤掉不需要的数据

Spark是居于内存的迭代计算模型,将不必要的数据尽早过滤掉(filter),减少内存的占用,从而提高程序的执行效率

需要说明的是,如果filter后会获得大量小文件,可能需要通过repartition或coalesce减小并行度。

4.准则四:尽量避免使用需要Shuffle的算子,且在必须Shuffle时尽量减少shuffle的数据量

如果有可能,要尽量避免使用Shuffle类算子。因为Spark作业运行过程中,最消耗性能的地方就是Shuffle过程。Shuffle过程就是数据洗牌,简单来说,就是将分布在集群中多个结点上的包含同一个key的数据,拉取到同一个结点上,然后进行聚合或join等操作。redu-ceByKey、join等算子都会触发Shuffle操作。

Shuffle过程中,各个结点上的相同key都会先写入本地磁盘文件中,然后其他结点需要通过网络传输拉取各个结点上的磁盘文件中的含有相同key的记录。在将这些含有相同key的数据都拉取到同一个结点进行聚合操作时,还有可能会因为一个结点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在Shuffle过程中,可能会发生大量的磁盘文件读写操作,以及数据的网络传输操作,而这无疑也会降低程序的执行速度。

因此在开发过程中,应当尽可能避免使用reduceByKey、join、distinct和repartition等会进行Shuffle的算子,而尽量使用Map类的非Shuffle算子。这样的话,没有Shuffle操作或者仅有较少Shuffle操作,程序的执行就可以大大减少性能开销。

如果因为业务需要,一定要使用Shuffle操作且无法用Map类的算子来替代时,就要尽量减少Shuffle的数据量,可以通过使用map-side预聚合的算子来减少Shuffle的数据量。

所谓的map-side预聚合,指的是在每个结点本地map时,对含有相同的key记录进行了聚合操作,这点类似于MapReduce中的本地combiner。map-side预聚合之后,每个结点本地就只会有一条含有相同的key的记录(因为多条含有相同的key的记录都被聚合起来)。其他结点在拉取所有结点上的含有相同的key的记录时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘I/O和网络传输开销。

这里以ReduceByKey/AggregateByKey与groupByKey的对比来说明map-side预聚合的重要性。ReduceByKey/aggregateByKey算子会使用用户自定义的函数对每个结点本地的含有相同key的记录进行预聚合。而groupByKey算子是不会进行预聚合的,全部的数据都会在集群的各个结点之间分发和传输,性能相对来说肯定比较差。

GroupByKey与ReduceByKey的原理图如图9-16和图9-17所示。

从图中我们可以看出,groupByKey不会进行map端的预聚合,而是将所有map端的数据shuffle到reduce端,然后在reduce端进行聚合操作。

从图中我们可以看出,reduceByKey会首先在map端进行预聚合操作,然后将聚合后的数据shuffle到reduce端,由于map端的预聚合操作会减小数据量,所以需要在网络中传输的数据量就减小了,效率也就比groupByKey更高。

使用ReduceByKey替代groupByKey的实例代码如下。

978-7-111-55442-4-Chapter09-23.jpg

图9-17 ReduceByKey原理图(www.xing528.com)

978-7-111-55442-4-Chapter09-24.jpg

5.准则五:熟悉各个算子的背后机制,选择使用高性能的算子

Spark的一大特性就是提高了丰富的算子以满足不同的业务需要。一个良好的程序员需要熟知这些算子的背后机制,能够选择使用高性能的算子满足业务需求。这里列举几个常见的实例。

相对来说,可以使用reduceByKey/aggregateByKey替代groupByKey。

相对来说,可以使用mapPartitions替代普通Map,因为mapPartitions类的算子,一次函数调用会处理一个Partition所有的数据,而不是一次函数调用处理一条,所有性能相对来说会高一些。但是有时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理一个Partition所有的数据,如果内存不够,垃圾回收时是无法回收太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重。

相对来说,可以使用foreachPartitions替代foreach,原理类似于“使用mapPartitions替代Map”,也是一次函数调用处理一个Partition的所有数据,而不是一次函数调用处理一条数据。这个实例一个常见的经典应用场景是在写记录到数据库时,如果是普通的foreach算子,每次函数调用都需要创建一个数据库连接,然后写一条数据,此时势必会频繁地创建和销毁数据库连接,性能非常低下;但是如果用foreachPartitions算子一次性处理一个Partition的数据,那么对于每个Partition只要创建一个数据库连接,然后执行批量插入操作,此时性能肯定是比较高的。

相对来说,可以使用repartitionAndSortWithinPartitions替代Repartition与Sort类操作。事实上,repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,如果在Repartition重分区之后还要进行排序,官方建议直接使用repartitionAndSortWithinPartitions算子,因为该算子可以一边进行重分区的Shuffle操作,一边进行排序。Shuffle与Sort两个操作同时进行,比先Shuffle再Sort来说,性能肯定是比较高的。

对一个RDD执行filter算子后,如果可能过滤掉RDD中较多的数据(比如30%以上的数据),就建议使用coalesce算子,手动减少RDD的Partition数量,将RDD中的数据压缩到更少的Partition中去,以减少并行度,避免过多开辟Task的开销。因为Filter之后,RDD的每个Partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,每个Task处理的Partition中的数据量并不是很多,而且此时开辟的Task越多,可能速度反而越慢,因为Task的开辟和销毁也是有开销的。因此可以用coalesce来减少partition数量,将RDD中的数据压缩到更少的Partition,只需使用更少的Task即可处理完所有的Partition。

6.准则六:对大变量考虑使用广播机制

有时在开发过程中,会遇到需要在算子函数中使用外部大变量的场景(比如100M的大集合),那么此时就可以考虑使用Spark的广播(Broadcast)功能来提升性能。

在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到Task中,此时每个Task都有一个变量副本。如果变量本身比较大的话(比如100 MB,甚至1 GB),那么大量的变量副本在网络中传输的性能开销,以及在各个结点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。

因此对于上述情况,如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量会保证每个Executor的内存中只驻留一份变量副本,而Executor中的Task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。

7.准则七:尽可能使用Kryo优化序列化性能

Spark默认的序列化器是org.apache.spark.serializer.JavaSerializer,但同时也支持使用Kryo序列化器org.apache.spark.serializer.KryoSerializer。由于默认的序列化器的性能和空间表现都比较差,而Kryo序列化器更快,压缩率也更高,所以我们应该优先使用Kryo序列化器而不是默认的序列化器。

8.准则八:使用优化的数据结构

Spark应用程序同普通程序一样需要考虑数据结构问题,由于Spark是优先基于内存的迭代计算模型,尤其需要考虑使用内存友好的数据结构。在可能及合适的情况下,应该使用占用内存较少的数据结构。

由于数据结构的知识是通用的,不单单只在Spark程序里需要考虑,这里仅简单描述一下常见的数据结构,更详细的情况请读者自行分析。

Java语言的特性导致有些数据结构会占用额外的空间,在Java中有以下3种类型的数据结构比较耗费内存。

1)Java的对象:每个Java对象都有对象头,对象头占用额外的16个字节(包含指向对象的指针等元数据信息),如果对象中只有一个int类型的变量,则此时会占据20个字节,也就是说对象的元数据占用了大部分的空间,所以在封装数据时尽量不要使用对象,可以使用JSON格式来封装数据。需要注意,Java中基本的数据类型会自动进行封箱操作,例如,int会自动变成Integer,这也会额外增加对象头的空间占用。

2)Java的字符串:每个字符串内部都有一个字符数组长度等额外信息,在实际占用内存方面要额外使用40个字节(每个字符串内部都使用字符数组来保存字符序列)。由于字符串中的每个字符占用2个字节(UTF-16编码),所以如果字符串内部有5个字符的话,实际上会占用50个字节。

3)Java中的集合类型(如HashMap、List等):集合的内部一般使用链表来实现,具体的每个数据则使用Entry等,这些也非常消耗内存。

所以Spark官方建议用户,在Spark编码实现中,特别是对于算子函数中的代码,尽量使用字符串替代对象,使用原始类型(如Int和Long)替代字符串,使用原生数组替代集合类型,这样会尽可能地减少内存占用,从而降低GC频率,提升性能。例如,List<Integer>list=new ArrayList<Integer>需要考虑改为使用int[]arrary=new int[]。

如果内存少于32 GB,可以在spark-env.sh中设置JVM参数-XX:+UseCompressedO-ops,以便使用4字节指针而不是8字节指针。与此同时,在Java 7或者更高版本中,设置JVM参数-XX:+UseCompressedStrings,以便采用8位来编码每一个ASCII字符。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈