首页 理论教育 SparkSQL内置函数的使用实例

SparkSQL内置函数的使用实例

时间:2023-07-02 理论教育 版权反馈
【摘要】:本节介绍两个Spark SQL内置函数应用案例:●商品订单交易查询案例,以Scala本地编程的方式实现。2)创建SparkContext对象,构建Spark SQL的上下文。5)使用Spark SQL提供的内置函数对DataFrame进行操作。这里使用的Spark SQL内置函数:●countDistinct函数:该函数是内置函数中的聚合函数,返回组中不同项的个数。2)使用Spark SQL基于Hive数据库表进行查询统计:查询订单表tbStock中的记录数,查询订单明细表tbStockDetail中的记录数。

SparkSQL内置函数的使用实例

本节介绍两个Spark SQL内置函数应用案例:

●商品订单交易查询案例,以Scala本地编程的方式实现。

电商交易项目综合案例,以Spark Shell编程方式实现。

1.商品订单交易查询案例

下面我们通过Scala编程的方式,以一定时间范围内商品订单交易为例,在集成开发环境中熟悉Spark内置函数的简单使用。我们这里采用的是Scala IDE,当然也可以采用IntelliJ IDEA等其他集成开发工具。

1)创建Spark配置对象SparkConf,通过setAppName方法设置应用程序名称,setMaster方法设置程序要链接的Spark集群的Master URL,这里设置为Local模式,这就代表Spark程序在本地运行。

2)创建SparkContext对象,构建Spark SQL的上下文

val sc=newSparkContext(conf)

val sqlContext=new SQLContext(sc)

这里需要注意的是,使用Spark SQL内置函数,就需要以import sqlContext.implicits._的方式导入SQLContext下的隐式转换的内容。

3)这里我们手动创建一些数据来模拟一定时间范围内商品订单交易信息,以(交易日期,订单编号,商品编号,订单总额)为模型,在实际情况下会比模拟的数据复杂很多,最后通过parallelize的方式构建出订单RDD分布式集合对象。

4)对业务数据进行预处理生成DataFrame,将创建的RDD转换为DataFrame,这里把数组中的每个String类型用逗号切分生成Row类型。

5)使用Spark SQL提供的内置函数对DataFrame进行操作。需要注意的是,内置函数生成的是Column对象并且采用字节码生成技术(Bytecode Generaction,BG)的方式,在Spark SQL执行物理计划的时候对匹配的表达式采用特定的代码,动态编辑,然后运行。

这里使用的Spark SQL内置函数:

●countDistinct函数:该函数是内置函数中的聚合函数,返回组中不同项的个数。

●agg函数:这里通过指定一系列聚合列计算聚合。

然后基于构建的DataFrame对数据进行分析,我们按日期统计每天成交多少种商品:

运行代码并查看结果:

我们再统计一下每天订单成交总额:

运行代码并查看结果:

注意:由于Spark的版本的不断升级,对于聚合函数参数的写法都有所不同,这里归纳出几种常见的写法:

上述写法中的内置函数包括:

●min函数:内置函数中的聚合函数,返回组中表达式的最小值。

●agg函数:这里通过指定列名的映射来计算聚合的方法。由此产生的DataFrame将包含分组列。可用的聚合方法是avg,max,min,sum,count(“平均值”“最大值”“最小值”“总数”“计数”)。

2.电商交易项目综合案例:

本案例进行电商交易项目综合查询应用:

1)在Hive中创建数据库及数据库表,在tbDate日期分类表、订单表tbStock、订单明细表tbStockDetail中加载相应的数据记录。

2)使用Spark SQL基于Hive数据库表进行查询统计:查询订单表tbStock中的记录数,查询订单明细表tbStockDetail中的记录数。

3)在复杂统计中,我们使用内置函数sum函数来统计每年每个商品的销量总额并注册为临时表T1。

4)在此基础上,我们使用内置函数max函数查询出每年的最大销售总额,并注册为临时表T2。(www.xing528.com)

5)基于上两步的临时表,通过年份(T1表中的theyear)、商品销量总额(T1表中的year_amount),T2表中的最大销售总额(max_year_amount),关联查询出所有订单中每年的畅销商品。

1)除去过多的日志,通过Apache的Log4J设置日志消息的级别。

2)初始化HiveContext,这里需要注意两点:当前Spark的版本是否支持Hive,其次Hive的配置文件hive-site.xml已经存放到Spark主目录的conf目录下。

val hiveContext=new org.apache.spark.sql.hive.HiveContext(sc)

3)创建数据库和数据表

在Hive中创建一个数据库SALEDATA,用于存放数据表。

使用这个数据库SALEDATA:

创建表tbDate,这个表定义了日期的分类:

创建表tbStock,这个表定义了订单的信息:

创建表tbStockDetail,这个表定义了订单的明细:

4)分别加载tbDate.txt、tbStock.txt、tbStockDetail.txt这3个数据文件至tbDate、tb-Stock、tbStockDetail表中。

注意:这里通过OVERWRITE INTO实现覆盖表原有的数据的目的。

5)查询统计。

在订单表tbStock中的记录数以及订单明细表tbStockDetail中的记录数的查询中没使用到Spark SQL内置函数,我们通过普通的select查询语句进行查询。

①查询订单表tbStock中的记录数:

②查询订单明细表tbStockDetail中的记录数:

6)复杂统计。

本节中我们将使用内置函数sum函数来统计每年每个商品的销量总额;使用内置函数max函数查询出每年的最大销售总额。

①查询出每年每个商品的销量总额。

每个订单可能对应不同的商品,所以需要通过订单表tbStock和订单明细表tbStockDetail中的ordernumber字段进行关联。

时间维度上需要根据年来统计销量总额,而tbDate定义了日期的分类信息,如年、月、日等,同时为保证后续按照时间维度进行查询,所以需要根据订单表tbStock中的dateID下单时间ID和tbDate进行关联,这样就需要关联3张表。

我们将这个DataFrame注册为临时表T1以供后续使用,其中year_amount为每年每个商品的销量总额。

②查询出每年的最大销售总额。

在上一步查询的基础之上对theyear进行分组,运用max聚合计算出每年最大销售总额,代码如下:

我们将这个DataFrame注册为临时表T2以供后续使用,其中max_year_amount为每年的最大销售总额,代码如下:

③查找出所有订单中每年的畅销商品。

基于上两步的临时表通过年份(T1表中的theyear)和统计出的商品销量总额(T1表中的year_amount和T2表中的max_year_amount)

将T1和T2进行关联,目的是为了查找商品的信息,代码如下:

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈