Spark读写Hive与两者兼容。

0 Spark & Hive

Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。

Hive是基于Hadoop的一个数据仓库工具,用来进行数据提取、转化、加载,这是一种可以存储、查询和分析存储在Hadoop中的大规模数据的机制。hive数据仓库工具能将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,能将SQL语句转变成MapReduce任务来执行。Hive的优点是学习成本低,可以通过类似SQL语句实现快速MapReduce统计,使MapReduce变得更加简单,而不必开发专门的MapReduce应用程序。hive是十分适合数据仓库的统计分析和Windows注册表文件。

1 Hive相关的操作

Spark SQL支持读写Hive,不过Hive本身包含了大量的依赖,这些依赖spark默认是没有的。如果Hive的依赖在Classpath中,那么Spark可以自动加载(注意Spark的worker节点也需要提供这些依赖)。默认配置Hive只需要把相关的hive-site.xml core-site.xml hdfs-site.xml 放到conf目录下即可。

当使用hive时,需要在 SparkSession 中开启hive,从而获得hive相关的serdes以及函数。如果没有现成的Hive环境们也可以使用,spark会自动在当前目录创建metastore_db,目录的位置可以通过参数 spark.sql.warehouse.dir 指定, 默认是启动Spark应用程序的目录。 注意在spark2.0之前使用的参数hive.metastore.warehouse.dir属性,已经废弃。 另外不要忘记赋予spark程序读写对应目录的权限。

// 创建spark session 并 指定hive地址
val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

// 执行hive操作
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 查询hive表
sql("SELECT * FROM src").show()

// 执行聚合操作
sql("SELECT COUNT(*) FROM src").show()

// sql转换DataFrame
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()

// 在sparksession中创建虚拟表
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()

// 使用hive命令创建表
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
sql("SELECT * FROM hive_records").show()

// 创建hive外部表
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
sql("SELECT * FROM hive_bigints").show()

// 动态配置hive属性
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
df.write
  .partitionBy("key")
  .format("hive")
  .saveAsTable("hive_part_tbl")

2 Hive存储相关的配置

当创建hive表时,需要定义表如何读写文件系统,比如 input format 和 output format,还需要配置如何序列化反序列化每一行数据,比如 serde。下面的命令可以用于指定存储的serde, input format, output format, 比如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet') 。默认会把文件当做普通文本读写。注意hive storage handler不能再DDL中配置,目前可以在hive端定义,然后使用spark sql来读取。

相关的配置属性如下:

2.1 fileFormat

存储的格式,包含serde, input format, output format。目前支持6中文件格式,sequencefile, rcfile, orc, parquet, textfile, avro。

2.2 inputFormat, outputFormat

这两个选项用于配置hive表的InputFormat和OutputFormat类,如 org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。 这两个选项必须成对出现,如果使用了fileFormat就不能再配置这两个属性了。

2.3 serde

配置序列化反序列化类,如果配置fileFormat,也不能配置该属性。目前 sequencefile textfile rcfile 不包含serde,因此可以使用这个选项进行配置。

2.4 fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim

这些选项用于textfile文件格式时,配置相关的分隔符。

3 与不同版本的Hive metastore交互

Spark SQL与Hive metastore交互是很常见的使用场景,这样spark就可以直接操作hive中的元数据了。从spark 1.4开始,spark sql可以与不同的hive版本交互。默认spark使用的是hive 1.2.1进行编译,包含对应的serde, udf, udaf等。

3.1 spark.sql.hive.metastore.version

hive版本,默认是1.2.1。支持从0.12.0到2.3.3。

3.2 spark.sql.hive.metastore.jars

HiveMetastoreClient相关的jar包地址,默认是buildin。可以配置成三种属性: - builtin, 使用hive 1.2.1, 在spark编译是,使用-Phive开启。当关闭时,需要指定spark.sql.hive.metastore.version为1.2.1 - maven, 从maven仓库下载编译,这个选项不推荐在生产环境使用。 - jvm中的classpath,这个路径需要包含hive以及对应的依赖,以及hadoop对应版本的依赖。这些资源只需要在driver端提供,如果使用yarn cluster模式,需要保证相关的资源都打包到应用jar中。

3.3 spark.sql.hive.metastore.sharedPrefixes

在Spark SQL和hive中共享的jar包前缀,默认com.mysql.jdbc, org.postgresql, com.microsoft.sqlserver, oracle.jdbc。典型的例子就是与metastore沟通的JDBC驱动相关的jar。其他需要共享的类,如 log4j中的自定义appender。

3.4 spark.sql.hive.metastore.barrierPrefixes

配置Spark SQL使用时需要重新加载的类,如hive中使用共享包名定义的udf。


Spark SQL设计的时候就考虑了与Hive元数据、SerDes、UDF的兼容性。

4 与现有的Hive数仓集成

Spark SQL thrift JDBC服务器被设计成开箱即用,无需修改任何Hive的配置就可以在Spark SQL中使用。

5 支持的Hive特性

Spark SQL支持很多Hive的特性,比如:

  • Hive的查询,包括:SELECT, GROUP BY, ORDER BY, CLUSTER BY, STORT BY
  • 所有的Hive操作,包括:关系操作符,如 =,
  • 用户自定义函数, UDF
  • 用户自定义聚合函数,UDAF
  • 用户自定义序列化格式,SerDes
  • 窗口函数
  • 关联操作,如 JOIN, {LEFT | RIGHT | FULL} OUTER JOIN, LEFT SEMI JOIN, CROSS JOIN
  • 合并操作
  • 子查询,如 SELECT col FROM (SELECT a+b AS col from t1) t2
  • 抽样 sample
  • 解释执行计划 explain
  • 分区表 以及 动态分区插入
  • 视图 VIEW
  • DDL函数,如 CREATE TABLE, CREATE TABLE AS SELECT, ALTER TABLE
  • 支持大部分Hive的数据类型,包含:TINYINT, SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, STRING, BINARY, TIMESTAMP, DATE, ARRAY<>, MAP<>, STRUCT<>

6 不支持的Hive特性

下面是Hive在Spark中不支持的特性,大部分特性再Hive中也很少使用。

主要的Hive特性

分桶表,桶是基于hive分区表再次进行hash分区。Spark SQL目前不支持(可以通过spark api进行分桶)。

这里再回顾下分桶表和分区表。分区表是基于目录对数据进行物理的划分,查询的时候可以通过分区字段快速过滤不满足的数据,提升扫表的速度。分桶表是基于某个字段进行hash打散到固定的几个桶里,每个桶对应一个物理文件,他们都存储在同一个文件夹中。当两个表join的时候可以通过分桶的hash信息快速知道那两个文件需要放在一起join(包含相同列值的需要在一起进行join操作,列值通过分桶文件可以快速进行区分)。如果分桶操作搭配上排序操作,那么整个表的存储就可以看做是 桶排序的一种实现,当需要顺序合并所有的数据时,也会提升排序效率。

难以理解的Hive特性

  • UNION类型
  • 唯一JOIN
  • 列统计信息搜集,spark不能再后台去统计列的信息,只能依赖hive元数据进行统计。

Hive的输入输出格式

Hive CLI的格式,专门用于在CLI中进行展示的格式,目前Spark仅支持TextOutputFormat。Hadoop存档。

展开阅读全文

本文系作者在时代Java发表,未经许可,不得转载。

如有侵权,请联系nowjava@qq.com删除。

编辑于

关注时代Java

关注时代Java