Spark读写Parquet。

0 简介

Spark大家都知道是什么,不知道可以看 Spark入门

Parquet仅仅是一种存储格式,它是语言、平台无关的,并且不需要和任何一种数据处理框架绑定,目前能够和Parquet适配的组件包括下面这些,可以看出基本上通常使用的查询引擎和计算框架都已适配,并且可以很方便的将其它序列化工具生成的数据转换成Parquet格式。
    查询引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
    计算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
    数据模型: Avro, Thrift, Protocol Buffers, POJOs

1 spark操作parquet

parquet是一种广泛应用的列式存储结构,spark sql 提供了 parquet 的读写并自动保存schema信息。当写 parquet 文件时,为保证兼容性,所有的字段都会默认设置成可以为空。

自动加载数据

// spark隐式转换
import spark.implicits._

val peopleDF = spark.read.json("people.json")

// 保存
peopleDF.write.parquet("people.parquet")

// 读取,内部自动包含shema信息
val parquetFileDF = spark.read.parquet("people.parquet")

// 使用
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()

2 分区自动发现

表分区是Hive中常见的优化方法,在分区表,数据会自动存储到不同的目录中,每个目录使用分区字段的值进行标识。所有内置的文件类型,都可以使用自动发现分区特性,如 text, csv, json, orc, parquet等。比如可以按照数据最常用的字段进行分区,下面就是一个使用性别和国家进行多级分区的例子:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

当在 spark.read.load 或 spark.read.parquet 中指定路径 path/to/table后,spark会自动抽取路径中的信息作为字段值,形成完整的数据结构:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

// 开启分区字段类型推断
spark.sql.sources.partitionColumnTypeInference.enabled

注意分区字段中的类型是自动推断的,目前支持 数字 date timestamp 字符串等。如果用户不想开启自动推断,可以配置参数。该参数默认为 true, 当类型推断禁用后,分区字段都会为string类型。

从Spark1.6开始只能发现指定路径下的分区内容,比如如果指定的路径是: path/to/table/gender=male 那么gender这个字段默认并不会被解析出来,但是如果配置了basePath为 path/to/table 性别字段就还可以识别出来。如:

spark.read
  .option("basePath", basePath)
  .parquet("")

3 schema兼容

与protocal buffer, avro, thrift 类似,parquet  也支持 schema 的兼容。用户可以刚开始使用一个很简单的 shcema , 然后不断的新增字段。这样用户就可以基于不同 schema 的文件,最终得到兼容的结果。由于 schema 合并是代价比较大的操作,而且并不是很常见的场景,因此从1.5.0开始默认都是关闭的。可以通过下面方法开启: - 配置option属性,mergeSchema为true - 配置全局的spark.sql.parquet.mergeSchema为true

代码演示

val squaresDF = spark.sparkContext
  .makeRDD(1 to 5)
  .map(i => (i, i * i))
  .toDF("value", "square")
squaresDF.write
  .parquet("data/test_table/key=1")

val cubesDF = spark.sparkContext
  .makeRDD(6 to 10)
  .map(i => (i, i * i * i))
  .toDF("value", "cube")
cubesDF.write
  .parquet("data/test_table/key=2")

val mergedDF = spark.read
  .option("mergeSchema", "true")
  .parquet("data/test_table")

mergedDF.printSchema()

// 输出为
root
 |-- value: integer (nullable = true)
 |-- square: integer (nullable = true)
 |-- cube: integer (nullable = true)
 |-- key: integer (nullable = true)

// 如果不加mergeSchema属性,则为
root
 |-- value: integer (nullable = true)
 |-- square: integer (nullable = true)
 |-- key: integer (nullable = true)

4 Hive与Parquet兼容

当从Hive元数据中读取parquet表时,Spark会尝试使用自己的parquet解析器来代替Hive原生,从而提升性能。该操作通过属性 spark.sql.hive.convertMetastoreParquet 来控制,默认是开启的。

Hive/Parquet的Schema兼容

在表处理时,Hive和Parquet有两个关键的不同点: - Hive是大小写敏感的,而parquet不是 - Hive认为所有的列都是可以空的,但是是否为空对于Parquet影响很大

考虑到上面的因素,当使用spark sql 读取 hive 原生的 parquet 时,就需要对 hive 的 schema 和 parquet 的schema做兼容处理: - 相同名称的字段需要有相同的数据类型,并忽略是否为空的配置。当冲突时,会使用parquet中的类型。 - 冲突的字段包含: 仅在parquet中存在的字段,将会被忽略;仅在hive中存在的字段将被标记为null。

元数据刷新

spark sql会缓存parquet的元数据信息,因此当hive中的表定义发生变更时,需要进行同步。可以使用下面的命令:

spark.catalog.refreshTable("my_table")
展开阅读全文

本文系作者在时代Java发表,未经许可,不得转载。

如有侵权,请联系nowjava@qq.com删除。

编辑于

关注时代Java

关注时代Java