Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。
Hive是基于Hadoop的一个数据仓库工具,用来进行数据提取、转化、加载,这是一种可以存储、查询和分析存储在Hadoop中的大规模数据的机制。hive数据仓库工具能将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,能将SQL语句转变成MapReduce任务来执行。Hive的优点是学习成本低,可以通过类似SQL语句实现快速MapReduce统计,使MapReduce变得更加简单,而不必开发专门的MapReduce应用程序。hive是十分适合数据仓库的统计分析和Windows注册表文件。
Spark SQL支持读写Hive,不过Hive本身包含了大量的依赖,这些依赖spark默认是没有的。如果Hive的依赖在Classpath中,那么Spark可以自动加载(注意Spark的worker节点也需要提供这些依赖)。默认配置Hive只需要把相关的hive-site.xml core-site.xml hdfs-site.xml 放到conf目录下即可。
当使用hive时,需要在 SparkSession 中开启hive,从而获得hive相关的serdes以及函数。如果没有现成的Hive环境们也可以使用,spark会自动在当前目录创建metastore_db,目录的位置可以通过参数 spark.sql.warehouse.dir 指定, 默认是启动Spark应用程序的目录。 注意在spark2.0之前使用的参数hive.metastore.warehouse.dir属性,已经废弃。 另外不要忘记赋予spark程序读写对应目录的权限。
// 创建spark session 并 指定hive地址
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
// 执行hive操作
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// 查询hive表
sql("SELECT * FROM src").show()
// 执行聚合操作
sql("SELECT COUNT(*) FROM src").show()
// sql转换DataFrame
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// 在sparksession中创建虚拟表
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// 使用hive命令创建表
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
sql("SELECT * FROM hive_records").show()
// 创建hive外部表
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
sql("SELECT * FROM hive_bigints").show()
// 动态配置hive属性
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
df.write
.partitionBy("key")
.format("hive")
.saveAsTable("hive_part_tbl")
当创建hive表时,需要定义表如何读写文件系统,比如 input format 和 output format,还需要配置如何序列化反序列化每一行数据,比如 serde。下面的命令可以用于指定存储的serde, input format, output format, 比如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet') 。默认会把文件当做普通文本读写。注意hive storage handler不能再DDL中配置,目前可以在hive端定义,然后使用spark sql来读取。
相关的配置属性如下:
存储的格式,包含serde, input format, output format。目前支持6中文件格式,sequencefile, rcfile, orc, parquet, textfile, avro。
这两个选项用于配置hive表的InputFormat和OutputFormat类,如 org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。 这两个选项必须成对出现,如果使用了fileFormat就不能再配置这两个属性了。
配置序列化反序列化类,如果配置fileFormat,也不能配置该属性。目前 sequencefile textfile rcfile 不包含serde,因此可以使用这个选项进行配置。
这些选项用于textfile文件格式时,配置相关的分隔符。
Spark SQL与Hive metastore交互是很常见的使用场景,这样spark就可以直接操作hive中的元数据了。从spark 1.4开始,spark sql可以与不同的hive版本交互。默认spark使用的是hive 1.2.1进行编译,包含对应的serde, udf, udaf等。
hive版本,默认是1.2.1。支持从0.12.0到2.3.3。
HiveMetastoreClient相关的jar包地址,默认是buildin。可以配置成三种属性: - builtin, 使用hive 1.2.1, 在spark编译是,使用-Phive开启。当关闭时,需要指定spark.sql.hive.metastore.version为1.2.1 - maven, 从maven仓库下载编译,这个选项不推荐在生产环境使用。 - jvm中的classpath,这个路径需要包含hive以及对应的依赖,以及hadoop对应版本的依赖。这些资源只需要在driver端提供,如果使用yarn cluster模式,需要保证相关的资源都打包到应用jar中。
在Spark SQL和hive中共享的jar包前缀,默认com.mysql.jdbc, org.postgresql, com.microsoft.sqlserver, oracle.jdbc。典型的例子就是与metastore沟通的JDBC驱动相关的jar。其他需要共享的类,如 log4j中的自定义appender。
配置Spark SQL使用时需要重新加载的类,如hive中使用共享包名定义的udf。
Spark SQL设计的时候就考虑了与Hive元数据、SerDes、UDF的兼容性。
Spark SQL thrift JDBC服务器被设计成开箱即用,无需修改任何Hive的配置就可以在Spark SQL中使用。
Spark SQL支持很多Hive的特性,比如:
下面是Hive在Spark中不支持的特性,大部分特性再Hive中也很少使用。
分桶表,桶是基于hive分区表再次进行hash分区。Spark SQL目前不支持(可以通过spark api进行分桶)。
这里再回顾下分桶表和分区表。分区表是基于目录对数据进行物理的划分,查询的时候可以通过分区字段快速过滤不满足的数据,提升扫表的速度。分桶表是基于某个字段进行hash打散到固定的几个桶里,每个桶对应一个物理文件,他们都存储在同一个文件夹中。当两个表join的时候可以通过分桶的hash信息快速知道那两个文件需要放在一起join(包含相同列值的需要在一起进行join操作,列值通过分桶文件可以快速进行区分)。如果分桶操作搭配上排序操作,那么整个表的存储就可以看做是 桶排序的一种实现,当需要顺序合并所有的数据时,也会提升排序效率。
Hive CLI的格式,专门用于在CLI中进行展示的格式,目前Spark仅支持TextOutputFormat。Hadoop存档。
本文系作者在时代Java发表,未经许可,不得转载。
如有侵权,请联系nowjava@qq.com删除。