首页 理论教育 DataFrameAPI的应用案例与分析

DataFrameAPI的应用案例与分析

时间:2023-06-25 理论教育 版权反馈
【摘要】:针对相同功能的API进行分组,基本上以官方网站上API的顺序给出应用案例。十六、agg1.定义2.功能描述agg这一系列的方法,为DataFrame提供数据列不需要经过groups就可以执行的统计操作。这里分别统计了age的最大值和salary的平均值。相当于两个DataFrame做减法。

DataFrameAPI的应用案例与分析

针对相同功能的API进行分组,基本上以官方网站上API的顺序给出应用案例。

一、collect与collectAsList

1.定义

978-7-111-51909-6-Chapter03-69.jpg

2.功能描述

collect返回一个数组,包含DataFrame中包含的全部Rows。

collectAsList返回一个Java List,包含DataFrame中包含的全部Rows。

3.示例

978-7-111-51909-6-Chapter03-70.jpg

4.示例解析

案例中首先加载列people.json文件,然后以表格形式呈现其内容。

两个collect型的方法都可以获取df的全部Rows数据,只是返回的类型不同。调用col-lect方法,返回的是数组,数组元素的类型为org.apache.spark.sql.Row;调用collectAslist,返回类型java.util.List[org.apache.spark.sql.Row]。

二、count

1.定义

978-7-111-51909-6-Chapter03-71.jpg

2.功能描述

返回DataFrame的rows个数。

3.示例

978-7-111-51909-6-Chapter03-72.jpg

三、first

1.定义

def first():Row

2.功能描述

返回DataFrame的第一个row。

3.示例

978-7-111-51909-6-Chapter03-73.jpg

四、head

1.定义

978-7-111-51909-6-Chapter03-74.jpg

2.功能描述

不带参数的head方法,返回DataFrame的第一个Row。指定参数n时,则返回前n个Rows。

3.示例

978-7-111-51909-6-Chapter03-75.jpg

五、show

1.定义

978-7-111-51909-6-Chapter03-76.jpg

2.功能描述

不带参数的show方法,以表格形式显示DataFrame的前20个Rows。指定参数numRows时,则显示指定numRows个数的Rows。

3.示例

978-7-111-51909-6-Chapter03-77.jpg

六、take

1.定义

978-7-111-51909-6-Chapter03-78.jpg

2.功能描述

返回DataFrame中指定的前n个Rows。

3.示例

978-7-111-51909-6-Chapter03-79.jpg

七、cache

1.定义

978-7-111-51909-6-Chapter03-80.jpg

2.功能描述

将DataFrame缓存到内存中。

3.示例

978-7-111-51909-6-Chapter03-81.jpg

978-7-111-51909-6-Chapter03-82.jpg

可以看到,当执行了cache操作后,对DataFrame执行操作时,不需要再从原始数据源加载数据。

八、Columns

1.定义

978-7-111-51909-6-Chapter03-83.jpg

2.API的功能描述

以数组形式返回DataFrame的全部列名。

3.示例

978-7-111-51909-6-Chapter03-84.jpg

九、dtypes

1.定义

978-7-111-51909-6-Chapter03-85.jpg

2.功能描述

以数组形式返回DataFrame的所有列名及其对应数据类型。

3.示·例

978-7-111-51909-6-Chapter03-86.jpg

十、explain

1.定义

978-7-111-51909-6-Chapter03-87.jpg

2.功能描述

这两个方法用于调试目的,不带参数时,仅将DataFrame的物理计划打印到控制台上;当指定参数extended为true时,打印所有计划到控制台上,包括物理计划、逻辑计划。

3.示例

978-7-111-51909-6-Chapter03-88.jpg

978-7-111-51909-6-Chapter03-89.jpg

由“Code Generation:false”可以看到,当前代码生成(CG)优化没有开启。

十一、isLocal

1.定义

978-7-111-51909-6-Chapter03-90.jpg

2.功能描述

如果collect和take方法可以在本地运行(即不需要任何Spark Executors)时,返回true。

3.示例

978-7-111-51909-6-Chapter03-91.jpg

十二、printSchema

1.定义

978-7-111-51909-6-Chapter03-92.jpg

2.功能描述

以树型结构将DataFrame的Schema信息打印到控制台上。

3.示例

978-7-111-51909-6-Chapter03-93.jpg

978-7-111-51909-6-Chapter03-94.jpg

树节点由列名及其数据类型组成,其中nullable表示该列是否可以取null值。

十三、registerTempTable

1.定义

978-7-111-51909-6-Chapter03-95.jpg

2.功能描述

将DataFrame注册为指定名字的临时表。

3.示例

978-7-111-51909-6-Chapter03-96.jpg

注册成临时表之后,可以使用SQLContext的sql方法,执行SQL语句。

十四、schema

1.定义

978-7-111-51909-6-Chapter03-97.jpg

2.功能描述

返回DataFrame的Schema信息,对应类型为StructType。

3.示例

978-7-111-51909-6-Chapter03-98.jpg

十五、toDF

1.定义

978-7-111-51909-6-Chapter03-99.jpg

2.功能描述

不带参数的toDF返回它本身,带字符串数组的参数时,返回新的DataFrame,该Dat-aFrame重命名了各列名。

3.示例

978-7-111-51909-6-Chapter03-100.jpg

注意:在调用带参的toDF方法时,参数个数必须和调用者DataFrame的列个数一样。

十六、agg

1.定义

978-7-111-51909-6-Chapter03-101.jpg

2.功能描述

agg这一系列的方法,为DataFrame提供数据列不需要经过groups就可以执行的统计操作。

3.示例

978-7-111-51909-6-Chapter03-102.jpg

可以看到,直接用agg方法和先用groupBy分组再调用agg方法的结果是一样的。这里分别统计了age的最大值和salary的平均值。

978-7-111-51909-6-Chapter03-103.jpg

978-7-111-51909-6-Chapter03-104.jpg

这是使用Map作为参数的示例,分别统计列age的最小值和salary的平均值。

978-7-111-51909-6-Chapter03-105.jpg

这是使用二元组重复参数作为参数的示例,分别统计列age的最小值和salary的平均值。

十七、apply

1.定义

978-7-111-51909-6-Chapter03-106.jpg

2.功能描述

这两个方法都可以根据指定列名返回DataFrame的列,其类型为Column。

3.示例

978-7-111-51909-6-Chapter03-107.jpg

十八、as

1.定义

978-7-111-51909-6-Chapter03-108.jpg

2.功能描述

调用as方法后,使用别名构建DataFrame。

3.解析

为了分析这个方法的作用,查看带as方法和不带的两种情况。

首先修改调试日志的级别,方便查看调试信息:

978-7-111-51909-6-Chapter03-109.jpg

不带as方法时的调试信息:

978-7-111-51909-6-Chapter03-110.jpg

978-7-111-51909-6-Chapter03-111.jpg

带as方法时的调试信息:

978-7-111-51909-6-Chapter03-112.jpg

978-7-111-51909-6-Chapter03-113.jpg

调用as方法后,仅在解析逻辑计划时,解析的第一步使用了别名:Subquery alias(在Parsed Logical Plan处)。

十九、distinct

1.定义

978-7-111-51909-6-Chapter03-114.jpg

2.功能描述

返回对DataFrame的Rows去重后的DataFrame。

3.示例

978-7-111-51909-6-Chapter03-115.jpg

示例中加载newPeople.json文件,构建了newPeople(是个DataFrame),通过unionAll方法合并df与newPeople,然后选择有重复Rows的“name”列,最后调用distinct方法进行去重。

二十、except

1.定义

978-7-111-51909-6-Chapter03-116.jpg

2.功能描述

返回DataFrame,包含当前Frame的Rows,同时这些Rows不在另一个Frame中。相当于两个DataFrame做减法。

3.示例

978-7-111-51909-6-Chapter03-117.jpg

二十一、explode

1.定义

978-7-111-51909-6-Chapter03-118.jpg

2.功能描述

返回一个新的DataFrame,其中原来的每一列都被指定的函数扩展成零行或多行。

3.示例

978-7-111-51909-6-Chapter03-119.jpg

978-7-111-51909-6-Chapter03-120.jpg

二十二、filter

1.定义

978-7-111-51909-6-Chapter03-121.jpg

2.功能描述

按参数指定的SQL表达式的条件过滤DataFrame。

3.示例(www.xing528.com)

978-7-111-51909-6-Chapter03-122.jpg

二十三、groupBy

1.定义

978-7-111-51909-6-Chapter03-123.jpg

2.功能描述

使用一个或多个指定的列对DataFrame进行分组,以便对它们执行聚合操作。

3.示例

978-7-111-51909-6-Chapter03-124.jpg

示例中先根据“gender”列对df进行分组,分组后再求“age”的最大值和“salary”列的平均值。

二十四、intersect

1.定义

978-7-111-51909-6-Chapter03-125.jpg

2.功能描述

取两个DataFrame中同时存在的Rows,返回DataFrame。

3.示例

978-7-111-51909-6-Chapter03-126.jpg

二十五、join

1.定义

978-7-111-51909-6-Chapter03-127.jpg

2.功能描述

对两个DataFrame求join操作。不带参数时取笛卡儿积,仅带join Exprs时默认为Inner Join,第三个join参数joinType可以指定具体的join操作。

3.示例

1)与前面的案例一样,加载测试文件:

978-7-111-51909-6-Chapter03-128.jpg

978-7-111-51909-6-Chapter03-129.jpg

2)将部门信息和人员信息做外联操作:

978-7-111-51909-6-Chapter03-130.jpg

二十六、limit

1.定义

978-7-111-51909-6-Chapter03-131.jpg

2.功能描述

返回DataFrame的前n个Rows。

3.示例

978-7-111-51909-6-Chapter03-132.jpg

二十七、orderBy和sort

1.定义

978-7-111-51909-6-Chapter03-133.jpg

2.功能描述

按指定的一列或多列进行排序,分别支持字符串或Column的参数列表。

3.示例

978-7-111-51909-6-Chapter03-134.jpg

978-7-111-51909-6-Chapter03-135.jpg

二十八、sample

1.定义

978-7-111-51909-6-Chapter03-136.jpg

2.功能描述

按指定因子对DataFrame的Rows进行取样,如果指定withReplacement为true时,使用指定的种子或随机的种子进行替换。

3.示例

978-7-111-51909-6-Chapter03-137.jpg

978-7-111-51909-6-Chapter03-138.jpg

当withReplacement为false时,指定的种子无效,为true时,会根据指定的种子,对应Rows的序号进行替换。

二十九、Select系列

1.定义

978-7-111-51909-6-Chapter03-139.jpg

2.功能描述

从DataFrame选取指定的列,返回DataFrame。指定列有三种方式,可以用列名字符串的重复参数,或Column重复参数及列名表达式的多个参数来指定。

3.示例

978-7-111-51909-6-Chapter03-140.jpg

978-7-111-51909-6-Chapter03-141.jpg

三十、unionAll

1.定义

978-7-111-51909-6-Chapter03-142.jpg

2.功能描述

联合调用者和参数这两个DataFrame的Rows。

3.示例

978-7-111-51909-6-Chapter03-143.jpg

978-7-111-51909-6-Chapter03-144.jpg

三十一、withColumn和withColumnRenamed

1.定义

978-7-111-51909-6-Chapter03-145.jpg

2.功能描述

对DataFrame列进行操作,withColumn增加DataFrame的列信息,withColumnRenamed则是对DataFrame的列进行重命名。

3.示例

978-7-111-51909-6-Chapter03-146.jpg

三十二、insertInto、insertIntoJDBC和createJDBCTable

1.定义

978-7-111-51909-6-Chapter03-147.jpg

2.功能描述

insert系列的方法:向指定表中增加DataFrame的Rows数据。带参数overwrite且为true时,insert into会导致覆写原表的数据(即插入前先truncate表)。参数url用来指定数据库信息。

createJDBCTable用于创建外部数据库的表,参数包含数据库连接的url信息,表名ta-ble,以及allowExisting表示是否允许表已存在。如果allowExisting为true,会在create表之前先delete表。

3.示例

978-7-111-51909-6-Chapter03-148.jpg

978-7-111-51909-6-Chapter03-149.jpg

示例中首先构建了两个DataFrame,用其中一个调用createJDBCTable方法构建了一个表:“TEST_JDBC”,这里createJDBCTable的第三个参数设置为true,当表存在时会先drop表,然后再create表。

创建表之后,使用insertIntoJDBC方法,将第二个DataFrame插入到刚创建的表“TEST_JDBC”中,其中,insertIntoJDBC的第三个参数选择了false,因此不会覆盖原有的表数据。

三十三、save

1.定义

978-7-111-51909-6-Chapter03-150.jpg

2.功能描述

将DataFrame的数据保存到指定路径下,其中path为数据存储路径,source为数据源标识,mode为保存模型,各个模型的具体信息可以参见章节3.3.1通用的加载/保存功能的案例与解析部分的保存模型的内容。

3.示例

978-7-111-51909-6-Chapter03-151.jpg

通过hdfs命令查看/user/harli/allword.json:

978-7-111-51909-6-Chapter03-152.jpg

三十四、saveAsParquetFile

1.定义

978-7-111-51909-6-Chapter03-153.jpg

2.功能描述

将DataFrame保存到数据源为“parquet”的指定路径下。

3.示例

978-7-111-51909-6-Chapter03-154.jpg

978-7-111-51909-6-Chapter03-155.jpg

通过hdfs命令查看路径“/user/harli/allword.parquet”:

978-7-111-51909-6-Chapter03-156.jpg

三十五、saveAsTable

1.定义

978-7-111-51909-6-Chapter03-157.jpg

2.功能描述

将DataFrame保存到表中,参数和save方法一样。

3.示例

978-7-111-51909-6-Chapter03-158.jpg

978-7-111-51909-6-Chapter03-159.jpg

三十六、flatMap

1.定义

978-7-111-51909-6-Chapter03-160.jpg

2.功能描述

对DataFrame中Rows进行处理,并且将处理结果。

3.示例

978-7-111-51909-6-Chapter03-161.jpg

示例中将Row转换为由每一列组成的List。

三十七、foreach

1.定义

978-7-111-51909-6-Chapter03-162.jpg

2.功能描述

foreach方法上对DataFrame中的Rows进行处理。foreachPartition方法则是对应分区中的Rows进行处理,即Iterator[Row],使用方法类似。

3.示例

978-7-111-51909-6-Chapter03-163.jpg

由于这是分布式计算,因此需要到Executor所在节点查看输出信息,查看Web Interface界面(http∶//master∶8080),获取输出信息,依次跳转界面,如图3.8所示。

978-7-111-51909-6-Chapter03-164.jpg

图3.8 Spark监控界面上的Application信息

监控界面上的特定Executor的日志信息如图3.9所示。

978-7-111-51909-6-Chapter03-165.jpg

图3.9 Spark监控界面上特定Executor的日志信息

单击Logs下的stdout,可以查看到输出信息,如图3.10所示。

978-7-111-51909-6-Chapter03-166.jpg

图3.10 Spark监控界面上特定Executor的stdout日志信息

由于当前执行了两次,因此stdout上有两行输出信息。

注意:这是在另一个集群中运行,cluster01是运行Spark的Master进程的节点。

三十八、map和mapPartitions

1.定义

978-7-111-51909-6-Chapter03-167.jpg

2.功能描述

map方法将DataFrame的Row按指定的函数参数映射成R实例,并返回以R为元素类型的RDD实例。

mapPartitions方法和map类似,只是函数参数作用在Iterator[Row]。

3.示例

978-7-111-51909-6-Chapter03-168.jpg

978-7-111-51909-6-Chapter03-169.jpg

4.应用场景

这里重点分析mapPartitions的应用场景,该API对大数据量进行处理时,如果应用得当,可以极大提高计算性能,通过查看源码,可以看到许多性能优化都是通过直接调用该方法来实现的。

具体的应用场景,比如求TopN型的场景,如果基于大数据量进行排序然后取topN,这在性能上是不可接受的,参看源码中使用了mapPartitions方法的takeOrdered方法:

978-7-111-51909-6-Chapter03-170.jpg

可以看到,该方法将大数据量的聚合转变成了分区小数据量的聚合操作,这里的聚合是分区的topN操作,原理上,和aggregate型的API是一样的,只是在对分区聚合结果上的处理有点差异,aggregate型的API是对分区的聚合结果再次进行二次聚合,然后封装成RDD类型返回,而takeOrdered方法,只需要对分区的聚合结果进行二次聚合,也就是上面源码中的mapRDDs.reduce{(queue1,queue2)…部分,二次聚合后直接把得到的新的topN数组返回即可,不需要再封装成RDD。

从上面的分析可以看到,对分布式的计算,基本原则就是化整为零,然后根据具体应用场景,对细节进行优化。因此,当需要实现类似于TopN的场景时,可以借鉴takeOrdered方法,但后面的具体细节处理,可以根据应用场景进行优化。

需要注意的是,takeOrdered方法在分区聚合结果的处理上,是基于N的值比较小的情况下,如果分区数为K的话,那么mapRDDs.reduce{(queue1,queue2)…}这一步得到的数据集的大小就是K∗N(在每个分区都有N个的情况下),如果K∗N数据量太大,超出内存装载能力,那就可能出现OMM的问题了。解决方法还是一样的,就是根据前面讲的,根据具体场景的实际情况,在细节的处理上进行优化。比如:

1)先用aggregate型的API方法进行初步聚合,然后在得到的结果RDD上,重分区,合并各个分区的TopN值,这样K就减少成了M,最终Driver Program端再进行sort时,数据集就可以从K∗N变成了M∗N,可以有效避免内存不足(Out of Memory)的问题。其中减少分区的方法可以使用coalesce方法进行重分区,参数shuffle设置为false,避免shuffle的过程。

2)也可以用另一种方法,修改mapRDDs.reduce{(queue1,queue2)…}方法的聚合操作,不再使用“queue1++=queue2”,而是将后面的toArray.sorted(ord)移入reduce中,每次reduce后取queue1、queue2的TopN,这种方法其实就是再次利用val mapRDDs=mapPar-titions{…}.中对分区排序的处理方式。实际上这种方法是没有必要的,如果数据量小,直接合并后再排序效率会更高,如果数据量大,那么在Driver Program端进行排序就失去了分布式并行计算的优势了。

这里使用“queue1++=queue2”是由于大部分场景下,TopN的N值都不会太大,即使“++”也不会造成OOM的问题,而上面两种解决方法,都是一种用性能换内存的权衡结果,大部分场景下是不需要的。

三十九、repartition

1.定义

978-7-111-51909-6-Chapter03-171.jpg

2.功能描述

返回一个DataFrame,该DataFrame按指定numPartitioins对原DataFrame进行重分区。

3.示例

978-7-111-51909-6-Chapter03-172.jpg

示例中,重分区后返回的DataFrame对应的RDD的分区个数已经改为1。

DataFrame的分区,实际上对应其RDD的数据分区。因此分区个数也对应了RDD的分区个数。

四十、toJSON

1.定义

978-7-111-51909-6-Chapter03-173.jpg

2.功能描述

把DataFrame的内容用包含JSON字符串的RDD返回。

3.示例

978-7-111-51909-6-Chapter03-174.jpg

978-7-111-51909-6-Chapter03-175.jpg

四十一、queryExecution

1.定义

978-7-111-51909-6-Chapter03-176.jpg

2.功能描述

返回DataFrame的查询执行语句,包含逻辑计划和物理计划。

3.示例

978-7-111-51909-6-Chapter03-177.jpg

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈