使用PySpark优化Pandas

佳境Shmily 2020-08-10 11:26:08
技术

前言

  Pandas一直是非常受欢迎的数据分析利器,它基于Numpy,专为解决数据分析任务。因其基于Python,只能单节点单核心运行,所以在大数据分析场景下,瓶颈很明显。PySpark是基于Spark JavaClient的上层接口,可以结合Python语言以及Spark分布式运行的特点,来解决Pandas在大数据下的瓶颈。本篇文章主要对比Pandas API与PySparkAPI,总结一些Pandas应用场景下使用PySpark提高效率的方案。
  本篇主要是对比Pandas和PySpark的API使用,但不能对它们众多API做一一对比介绍,所以对于PySpark的更多API使用请参考:pyspark.sql官方使用文档

对比

特点 Pandas PySpark
运行方式 单机单核 分布式
并行机制 不支持 支持
数据位置 单机内存 多节点内存和磁盘
大数据支持
数据处理方式 无懒加载 懒加载+优化无用操作
DataFrame 可变 不可变

基本原则

  1. 需要对大量数据进行分析的场景下,在大数据处理的源头必须使用PySpark
  2. 数据经过一系列操作、聚合后数据量减少,且迫不得已用Pandas的情况下再使用Pandas(用Pandas处理的数据尽量更少)
  3. 如果可以,尽量全程使用PySpark进行分析操作
  4. 需要对计算复杂且耗时的Sparkdataframe进行cache避免重算提高效率
  5. 尽可能将一段处理逻辑写到一段SQL中,而非得到多个Dataframe然后进行join

数据创建

文中所有Spark Dataframe对象简称df,Pandas的Dataframe对象简称pd_df

数据结构

数据显示

数据排序

交集并集差集

数据选择或切片

数据过滤

数据去重

取唯一值

分组聚合

数据计算

数据统计

数据合并

TODO:待完善测试

数据修改

对应pd.apply(f)方法 即给df的每一列应用函数f

空值处理

SQL支持

互相转换

透视表

透视表与逆透视表:
alt pyspark-pandas-01
透视Pivot:
按不需要转换的字段分组(groupBy) -> pivot函数进行透视,可选第二个参数指定输出字段数据项 -> 聚合汇总数据项得到结果
逆透视unpivot:列形式且无重复值的数据转成行形式且有重复值得数据

diff操作

数据保存

pd_df.to_csv("/data/path_to_file")   # 写本地csv文件

df.write.saveAsTable(“hive_table”, mode=”append”) # 直接写数据到hive表 无论表是否已经存在都可以 还有options,partitionBy,format等参数影响表结构
df.write.format(‘parquet’).bucketBy(100,’year’,’month’).sortBy(‘day’).mode(‘overwrite’).saveAsTable(‘sorted_bucketed_table’) # 数据排序分区存储成parquet
df.coalesce(1).write.save(path,format,mode,partitionBy,**Options) # 存储数据
df.coalesce(1).write.json(“file:///data/path”,mode=’overwrite’,) # 写数据到单个json文件

注:文件写到hdfs也不要紧,可以通过挂载NFS或者FUSE等方式将hdfs目录挂载到本地,同样方便后续处理

## 高级用法(优化)
* PySpark连续编写转换函数
```python
spark.table('ods_test.test').filter('age=22').where('dt="20200524"').groupBy('id').avg('age').registerTempTable('tmp')
for i in spark.sql("select id,'avg(age)' as avg_age from tmp").collect():
    print(i[0], i[1])

其他

Python三方库:SparklingPandas
SparklingPandas

参考

PySpark.sql module
pandas与pyspark对比
Spark:使用partitionColumn选项读取数据库原理
PySpark-DataFrame操作指南