Spark大家都知道是什么,不知道可以看 Spark入门。
Parquet仅仅是一种存储格式,它是语言、平台无关的,并且不需要和任何一种数据处理框架绑定,目前能够和Parquet适配的组件包括下面这些,可以看出基本上通常使用的查询引擎和计算框架都已适配,并且可以很方便的将其它序列化工具生成的数据转换成Parquet格式。
查询引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
计算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
数据模型: Avro, Thrift, Protocol Buffers, POJOs
parquet是一种广泛应用的列式存储结构,spark sql 提供了 parquet 的读写并自动保存schema信息。当写 parquet 文件时,为保证兼容性,所有的字段都会默认设置成可以为空。
// spark隐式转换
import spark.implicits._
val peopleDF = spark.read.json("people.json")
// 保存
peopleDF.write.parquet("people.parquet")
// 读取,内部自动包含shema信息
val parquetFileDF = spark.read.parquet("people.parquet")
// 使用
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
表分区是Hive中常见的优化方法,在分区表,数据会自动存储到不同的目录中,每个目录使用分区字段的值进行标识。所有内置的文件类型,都可以使用自动发现分区特性,如 text, csv, json, orc, parquet等。比如可以按照数据最常用的字段进行分区,下面就是一个使用性别和国家进行多级分区的例子:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
当在 spark.read.load 或 spark.read.parquet 中指定路径 path/to/table后,spark会自动抽取路径中的信息作为字段值,形成完整的数据结构:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
// 开启分区字段类型推断
spark.sql.sources.partitionColumnTypeInference.enabled
注意分区字段中的类型是自动推断的,目前支持 数字 date timestamp 字符串等。如果用户不想开启自动推断,可以配置参数。该参数默认为 true, 当类型推断禁用后,分区字段都会为string类型。
从Spark1.6开始只能发现指定路径下的分区内容,比如如果指定的路径是: path/to/table/gender=male 那么gender这个字段默认并不会被解析出来,但是如果配置了basePath为 path/to/table 性别字段就还可以识别出来。如:
spark.read
.option("basePath", basePath)
.parquet("")
与protocal buffer, avro, thrift 类似,parquet 也支持 schema 的兼容。用户可以刚开始使用一个很简单的 shcema , 然后不断的新增字段。这样用户就可以基于不同 schema 的文件,最终得到兼容的结果。由于 schema 合并是代价比较大的操作,而且并不是很常见的场景,因此从1.5.0开始默认都是关闭的。可以通过下面方法开启: - 配置option属性,mergeSchema为true - 配置全局的spark.sql.parquet.mergeSchema为true
代码演示
val squaresDF = spark.sparkContext
.makeRDD(1 to 5)
.map(i => (i, i * i))
.toDF("value", "square")
squaresDF.write
.parquet("data/test_table/key=1")
val cubesDF = spark.sparkContext
.makeRDD(6 to 10)
.map(i => (i, i * i * i))
.toDF("value", "cube")
cubesDF.write
.parquet("data/test_table/key=2")
val mergedDF = spark.read
.option("mergeSchema", "true")
.parquet("data/test_table")
mergedDF.printSchema()
// 输出为
root
|-- value: integer (nullable = true)
|-- square: integer (nullable = true)
|-- cube: integer (nullable = true)
|-- key: integer (nullable = true)
// 如果不加mergeSchema属性,则为
root
|-- value: integer (nullable = true)
|-- square: integer (nullable = true)
|-- key: integer (nullable = true)
当从Hive元数据中读取parquet表时,Spark会尝试使用自己的parquet解析器来代替Hive原生,从而提升性能。该操作通过属性 spark.sql.hive.convertMetastoreParquet 来控制,默认是开启的。
在表处理时,Hive和Parquet有两个关键的不同点: - Hive是大小写敏感的,而parquet不是 - Hive认为所有的列都是可以空的,但是是否为空对于Parquet影响很大
考虑到上面的因素,当使用spark sql 读取 hive 原生的 parquet 时,就需要对 hive 的 schema 和 parquet 的schema做兼容处理: - 相同名称的字段需要有相同的数据类型,并忽略是否为空的配置。当冲突时,会使用parquet中的类型。 - 冲突的字段包含: 仅在parquet中存在的字段,将会被忽略;仅在hive中存在的字段将被标记为null。
spark sql会缓存parquet的元数据信息,因此当hive中的表定义发生变更时,需要进行同步。可以使用下面的命令:
spark.catalog.refreshTable("my_table")
本文系作者在时代Java发表,未经许可,不得转载。
如有侵权,请联系nowjava@qq.com删除。