首页 理论教育 数据处理和分析技巧

数据处理和分析技巧

时间:2023-07-02 理论教育 版权反馈
【摘要】:GroupData类在Spark 2.0.x版本改为RelationalGroupedDataset。

数据处理和分析技巧

这部分内容将对员工信息文件及部门信息文件进行数据处理和分析,下面是具体的操作过程。

1.修改日志等级

将日志等级设置为Level WARN,是为了简化界面的输出信息。

2.加载文件

上面示范了两种方式,分别加载HDFS上的员工信息文件和部门信息文件,得到了两个DataFrame实例:people和dept。

3.以表格形式查看people信息

通过show方法,可以以表格的形式输出各个DataFrame的内容。

4.DataFrame基本信息的查询

以上是针对员工信息的DataFrame进行一些基本信息的查询操作:

●使用DataFrame的columns方法,查询people包含的全部列信息,以数组形式返回列名组。

●使用DataFrame的count方法,统计people包含的记录条数,即员工个数。

●使用DataFramed的take方法,获取前3条员工记录信息,并以数组形式呈现出来。

●最后使用DataFrame的toJSON方法,将people转换为JSONRDD类型,并使用RDD的collect方法返回其包含的员工信息。

5.对员工信息进行条件查询,并输出结果

在上述示例中,针对员工信息的DataFrame,进行了一些条件查询操作:

●使用count方法统计了“gender”列为“male”的员工数。

●基于“age”和“gender”两列,使用不同的查询条件,不同的DataFrame API,即where和filter方法,对员工信息进行过滤。

●最后仍然使用show方法,将查询结果以表格的形式呈现出来。

●在各个例子中,使用了几种不同的方式,作为查询条件的参数。特别注意上面查询条件表达式中的单引号及$符号

6.根据指定的列名,以不同方式进行排序

在上述示例中,针对员工信息DataFrame,基于“job number”和“depId”两列,使用sort方法,以不同方式进行排序,并输出结果,具体包含:

●先以“job number”列升序,然后再按“depId”列降序的方式,对people进行排序,并输出排序后的内容;这里给出了两种指定列的方式。

●以“job number”列进行默认排序(升序),并显示排序后的前3条记录。

●以“job number”列指定降序方式排序,并显示排序后的前3条记录。

7.为员工信息增加一列:等级(“level”)

在上述示例中,通过withColumns方法增加了新的一列等级信息,列名为“level”。在withColumns方法中:

第一个参数“level”指定了新增列的列名。

第二个参数people("age")/10,指定了该列的实例,通过转换得到新列,people("age")调用了DataFrame的apply方法,返回“age”列名所对应的列。

8.修改工号列名

在上述示例中,通过withColumnRenamed方法修改列名,示例将people的“job number”列名修改为“jobId”,通过交互式输出信息可以看到列名已经被修改。

注意,修改的列名如果不存在,不会报错,但列名不会修改,如下所示:

在该示例中,指定修改的“job numbe”列名拼写错误(少了一个字母r),所以正确的列名“job number”并没有修改成功。

9.增加新员工

在上述示例中,使用jsonFile方法加载了新员工信息的文件,然后调用people的union-All方法,将新加载的newPeople合并进来。

注意:因为加载文件是lazy性质的,由于没有对DataFrame进行缓存,因此最终合并时会重新加载新旧两个员工信息文件。

10.查同名员工

在上述示例中,首先通过unionAll方法将people和newPeople两个文件进行合并,然后使用groupBy方法将合并后的DataFrame按照“name”列进行分组,得到GroupData类的实例,实例会自动带上分组的列,以及“count”列。

GroupData类提供了一组非常有用的统计操作,这里调用它的count方法,最终实现对员工名字的分组统计。(www.xing528.com)

GroupData类在Spark 2.0.x版本改为RelationalGroupedDataset

11.分组统计信息

在上述示例中,首先针对people的“depId”进行分组,再对分组后得到的GroupData实例继续调用agg方法,分别对“age”列求最大值,对“gender”进行分组统计,返回DataFrame对象实例depAgg。depAgg的schema为[depId:bigint,max(age):bigint,count(gen-der):bigint],即除了带上分组用的“depId”列外,还带上列聚合操作后的两列信息。

12.名字去重

在上述示例中,首先显示新旧员工信息合并后的“name”列,作为后续去重的比较对象。通过unionAll新旧员工信息,并只选择其中的“name”列信息后,出现的“name”信息就出现列重复,通过继续调用DataFrame的distinct去重方法后,可以去除重复的记录数据。

13.对比新旧员工表

在上述示例中,包含了对people和newPeople两个员工信息文件中“name”列的两种比较方式,具体如下:

第一种:分别选取people和newPeople两个员工信息文件中的“name”列,然后通过调用except方法,获取在people中出现但不在newPeople中出现的“name”信息,最后以表格形式呈现结果。

第二种:求“name”的交集,即分别选取people和newPeople两个员工信息文件中的“name”列,然后通过调用intersect方法,获取在people中出现但同时又在newPeople中出现的“name”信息,最后以表格形式呈现结果。

14.关联两个DataFrame实例

本实例查询员工信息及员工所属的部门:员工信息people的DataFrame中包括年龄、部门ID、性别、工号、姓名、薪酬等信息;部门信息dept的DataFrame中包括部门ID、部门名称等信息;员工信息people和部门信息dept根据部门ID号进行关联。

在上述示例中,通过调用join方法,把people中的“depId”列与dept中的“depId”列进行outer join关联操作。

DataFrame实例间的join关联操作包括innerouterleft outerright outerleft semi

●inner join等值连接只返回两个表中联结字段相等的行

●outer join包含左右两个表的全部行不管另外一边的表中是否存在与它们匹配的行

●left_outer join如果右边有多行和左边表对应就每一行都映射输出如果右边没有行与左边行对应

就输出左边行右边表字段为NULL

right_outer join如果左边有多行和右边表对应就每一行都映射输出如果左边没有行与右边行对应

就输出右边行左边表字段为NULL

leftsemi join相当于SQL的in语句如果右边有多行和左边表对应重复的多条记录不输出只输出

一条记录如果右边没有行与左边行对应不输出记录

由于people与dept的两个DataFrame中用于关联的列名相同,都是“depId”,因此,指定关联条件表达式时,需要指出列所属的具体DataFrame实例,否则会报错。但是,如果两个列名不同,则可以直接使用列名,表达式会更加精简,比如:

15.关联操作后按部门名分组统计

在上述示例中,joinP是关联操作示例中people与rnDept进行join的结果,对joinP对象调用其groupBy方法,根据dept的“name”列进行分组,并在分组后对指定的列执行指定的聚合操作,这里对“age”列求最大值,对“count”列进行计数。joinGP是joinP根据部门名称进行分组,然后计算最大年龄和性别计数以后生成的DataFrame。

16.保存为表

在对各个DataFrame实例进行操作后,获取了目标信息,如果后续需要这些信息的话,就必须执行持久化操作,即将文件保存到存储系统或表中。

下面给出几种持久化的示例。

1)首先,将实例持久化到表中。

DataFrame相关的Save操作还有registerTempTable。

在Spark 2.0.X版本中DataFrame类下没有saveAsTable方法该方法被放在DataFrameWriter类下

2)保存为JSON文件。

scala>people.save("hdfs:/library/SparkSQL/Data/peoplesave.json","json")

这里使用save方法,在方法中指定数据源格式为“json”,可以将DataFrame实例持久化到指定的路径上。通过HadoopWeb Interface界面可以查看到JSON文件。

3)保存为parquet文件。

scala>people.save("library/SparkSQL/Data/hsqlDF.parquet","parquet")

这里同样使用save方法,在方法中指定数据源格式为“parquet”,可以将DataFrame实例持久化到指定的路径上。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈