Spark压缩文件性能分析

Spark压缩文件性能分析

HDFS上分布式文件存储,成为大数据平台首选存储平台。而Spark往往以HDFS文件为输入,为保持兼容性,Spark支持多种格式文件读取,大数据场景下,性能瓶颈往往是IO,而不是CPU算力,所以对文件的压缩处理成为了很必要的手段。Spark为提供兼容性,同时支持多种压缩包直接读取,方便于用户使用,不用提前对压缩格式处理,但各种压缩格式各有优缺点,若不注意将导致Spark的能力无法发挥出来。故对Spark计算压缩文件做一个分析。

支持的压缩格式

首先来看一下Spark读取HDFS文件常用的压缩格式:
图片说明

执行对比分析

实验数据:同一个文件包,json格式文件数据
处理逻辑:增加列,然后发送到kafka中。
DAG逻辑划分:两个job(read动作一个job,foreach动作一个job),每个job下面各一个stage,每个stage下面task若干
程序执行参数:–master yarn –deploy-mode client –executor-cores 4 –executor-memory 4G –num-executors 4

非压缩文件

文件大小:33.7GB
运行时间:9min

read阶段:

图片说明
可以看到所有节点都在读取,分布式读取,速度很快。
图片说明

Stage里面共计分成了252个task,每个读取128MB数据。

foreach阶段

图片说明
依然并行全力计算
图片说明
每个执行节点上4个core都在并发运行。

GZIP

文件大小:10.6GB
运行时间:2.2h

###read阶段
图片说明
只有单节点读取

同时该节点上也只有一个核心在运行
图片说明
foreach阶段

也是只有单节点、单core运行

BZIP2

文件大小:7.7GB
运行时间:12min

read阶段

图片说明
与非压缩一致,并行进行

foreach阶段

图片说明
同样并发执行

SNAPPY

文件大小:16.5GB
运行时间:2.1h
这里直接采用的整文件压缩,所以文件不可分割。

read阶段

图片说明

单节点单核心读取,非并行。

foreach阶段

同上类似,单节点单核心运行
结果
图片说明
gzip和snappy无法采用并行计算,也就是说在spark平台上,这两种格式只能采用串行单进程执行,于本文开头表格对应,无法分割(splittable)的压缩格式只能顺序一个进程读取,而读取后多文件又在一个executor上,其他executor无文件导致无法并行的foreach。
bz2和非压缩格式支持分割,也就是说可以并行读取以及计算。
不可分割的压缩格式文件不可并行读取,完全无法发挥spark的并行计算优势,并且若压缩包过大,对单节点的物理性能要求较高。
建议
snappy采用分块压缩方式使其可以并行读取计算。
gzip格式最好提前进行分割成小文件或者换格式,因多个文件可以并行读取。另一个办法是read文件后调用repartition操作强制将读取多数据重新均匀分配到不同的executor上,但这个操作会导致大量单节点性能占用,因此该格式建议不在spark上使用。
bz2表现相同于非压缩,但解压操作需要耗费时间。
非压缩性能表现最佳,但会占用过大HDFS存储。
spark输出压缩文件
实际生产环境需要spark输出文件到HDFS,并且为了节省空间会使用压缩格式,以下介绍几种常用的压缩格式
文本文件压缩

bzip2

压缩率最高,压缩解压速度较慢,支持split。

1
2
3
import org.apache.hadoop.io.compress.BZip2Codec
//
rdd.saveAsTextFile("codec/bzip2",classOf[BZip2Codec])

snappy

json文本压缩率 38.2%,压缩和解压缩时间短。

1
2
3
4

import org.apache.hadoop.io.compress.SnappyCodec
//
rdd.saveAsTextFile("codec/snappy",classOf[SnappyCodec])

gzip

压缩率高,压缩和解压速度较快,不支持split,如果不对文件大小进行控制,下次分析可能可能会造成效率低下的问题。
json文本压缩率23.5%,适合使用率低,长期存储的文件。

1
2
3
4

import org.apache.hadoop.io.compress.GzipCodec
//
rdd.saveAsTextFile("codec/gzip",classOf[GzipCodec])

parquet文件压缩

列式存储布局(比如 Parquet)可以加速查询,因为它只检查所有需要的列并对它们的值执行计算,因此只读取一个数据文件或表的小部分数据。Parquet 还支持灵活的压缩选项,因此可以显著减少磁盘上的存储。
如果您在 HDFS 上拥有基于文本的数据文件或表,而且正在使用 Spark SQL 对它们执行查询,那么强烈推荐将文本数据文件转换为 Parquet 数据文件,以实现性能和存储收益。当然,转换需要时间,但查询性能的提升在某些情况下可能达到 30 倍或更高,存储的节省可高达 75%!
转换 1 TB 数据将花费多长时间?
50 分钟,在一个 6 数据节点的 Spark v1.5.1 集群上可达到约 20 GB/分的吞吐量。使用的总内存约为 500GB。HDFS 上最终的 Parquet 文件的格式为:

1
2
3
4
...
/user/spark/data/parquet/catalog_page/part-r-00000-9ff58e65-0674-440a-883d-256370f33c66.gz.parquet
/user/spark/data/parquet/catalog_page/part-r-00001-9ff58e65-0674-440a-883d-256370f33c66.gz.parquet
...

存储节省

以下 Linux 输出显示了 TEXT 和 PARQUET 在 HDFS 上的大小比较:

1
2
3
4
% hadoop fs -du -h -s /user/spark/hadoopds1000g
897.9 G /user/spark/hadoopds1000g
% hadoop fs -du -h -s /user/spark/data/parquet
231.4 G /user/spark/data/parquet

1 TB 数据的存储节省了将近 75%!
parquet为文件提供了列式存储,查询时只会取出需要的字段和分区,对IO性能的提升非常大,同时占用空间较小,即使是parquet的uncompressed存储方式也比普通的文本要小的多。
spark中通过对parquet文件进行存储,spark2.0后默认使用snappy压缩,1.6.3及以前版本默认使用的gzip压缩方式。

dataset.write().parquet(“path”);

可以通过
spark.sql.parquet.compression.codec

参数或是在代码中进行修改配置压缩方式。
sparkConf.set(“spark.sql.parquet.compression.codec”,”gzip”)

parquet存储提供了
lzo gzip snappy uncompressed

参考文章
https://zturn.cc/?p=24
https://blog.csdn.net/bajinsheng/article/details/100031359
https://www.ibm.com/developerworks/cn/analytics/blog/ba-parquet-for-spark-sql/index.html

文章目录
  1. 1. Spark压缩文件性能分析
  2. 2. 支持的压缩格式
  3. 3. 执行对比分析
    1. 3.1. 非压缩文件
      1. 3.1.1. read阶段:
      2. 3.1.2. foreach阶段
    2. 3.2. GZIP
    3. 3.3. BZIP2
      1. 3.3.1. read阶段
      2. 3.3.2. foreach阶段
    4. 3.4. SNAPPY
      1. 3.4.1. read阶段
      2. 3.4.2. foreach阶段
    5. 3.5. bzip2
    6. 3.6. snappy
    7. 3.7. gzip
    8. 3.8. parquet文件压缩
      1. 3.8.1. 存储节省
,