Spark 入门(1):集群部署

一、Spark 概述

Spark 是 UC Berkeley AMP Lab 开源的通用分布式并行计算框架,目前已成为 Apache 软件基金会的顶级开源项目。Spark 支持多种编程语言,包括 Java、Python、R 和 Scala,同时 Spark 也支持 Hadoop 的底层存储系统 HDFS,但 Spark 不依赖 Hadoop。

1.1 Spark 与 Hadoop

Spark 基于 Hadoop MapReduce 算法实现的分布式计算,拥有 Hadoop MapReduce 所具有的优点,并且具有更高的运算速度。Spark 能够比 Hadoop 运算更快,主要原因是:Hadoop 在一次 MapReduce 运算之后,会将数据的运算结果从内存写入到磁盘中,第二次 MapReduce 运算时在从磁盘中读取数据,两次对磁盘的操作,增加了多余的 IO 消耗;而 Spark 则是将数据一直缓存在内存中,运算时直接从内存读取数据,只有在必要时,才将部分数据写入到磁盘中。除此之外,Spark 使用最先进的 DAG(Directed Acyclic Graph,有向无环图)调度程序、查询优化器和物理执行引擎,在处理批量处理以及处理流数据时具有较高的性能。按照Spark 官网的说法,Spark 相对于 Hadoop 而言,Spark 能够达到 100 倍以上的运行负载。

(图片来源:Apache Spark™)

1.2 Spark 架构及生态

Spark 除了 Spark Core 外,还有其它由多个组件组成,目前主要有四个组件:Spark SQL、Spark Streaming、MLlib、GraphX。这四个组件加上 Spark Core 组成了 Spark 的生态。通常,我们在编写一个 Spark 应用程序,需要用到 Spark
Core 和其余 4 个组件中的至少一个。Spark 的整体构架图如下图所示:

Spark Core:是 Spark 的核心,主要负责任务调度等管理功能。Spark
Core 的实现依赖于 RDDs(Resilient Distributed Datasets,
弹性分布式数据集)的程序抽象概念。

Spark SQL:是 Spark 处理结构化数据的模块,该模块旨在将熟悉的 SQL 数据库查询与更复杂的基于算法的分析相结合,Spark
SQL 支持开源 Hive 项目及其类似 SQL 的 HiveQL 查询语法。Spark
SQL 还支持 JDBC 和 ODBC 连接,能够直接连接现有的数据库。

Spark Streaming:这个模块主要是对流数据的处理,支持流数据的可伸缩和容错处理,可以与 Flume(针对数据日志进行优化的一个系统)和 Kafka(针对分布式消息传递进行优化的流处理平台)等已建立的数据源集成。Spark Streaming 的实现,也使用 RDD 抽象的概念,使得在为流数据(如批量历史日志数据)编写应用程序时,能够更灵活,也更容易实现。

MLlib:主要用于机器学习领域,它实现了一系列常用的机器学习和统计算法,如分类、回归、聚类、主成分分析等算法。

GraphX:这个模块主要支持数据图的分析和计算,并支持图形处理的 Pregel API 版本。GraphX 包含了许多被广泛理解的图形算法,如 PageRank。

1.3 Spark 运行模式

Spark 有多种运行模式,由图 2 中,可以看到 Spark 支持本地运行模式(Local 模式)、独立运行模式(Standalone 模式)、Mesos、YARN(Yet Another Resource Negotiator)、Kubernetes 模式等。

本地运行模式是 Spark 中最简单的一种模式,也可称作伪分布式模式。

独立运行模式为 Spark 自带的一种集群管理模式,Mesos 及 YARN 两种模式也是比较常用的集群管理模式。相比较 Mesos 及 YARN 两种模式而言,独立运行模式是最简单,也最容易部署的一种集群运行模式。

Kubernetes 是一个用于自动化部署、扩展和管理容器化应用程序的开源系统。

Spark 底层还支持多种数据源,能够从其它文件系统读取数据,如 HDFS、Amazon S3、Hypertable、HBase 等。Spark 对这些文件系统的支持,同时也丰富了整个 Spark 生态的运行环境。

二、Spark 部署模式

Spark 支持多种分布式部署模式,主要支持三种部署模式,分别是:StandaloneSpark on YARNSpark on Mesos模式。

Standalone模式为 Spark 自带的一种集群管理模式,即独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。它是 Spark 实现的资源调度框架,其主要的节点有 Driver 节点、Master 节点和 Worker 节点。Standalone模式也是最简单最容易部署的一种模式。

Spark on YARN模式即 Spark 运行在Hadoop YARN框架之上的一种模式。Hadoop YARN(Yet Another Resource
Negotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度。

Spark on Mesos模式,即 Spark 运行在Apache Mesos框架之上的一种模式。Apache Mesos是一个更强大的分布式资源管理框架,负责集群资源的分配,它允许多种不同的框架部署在其上,包括YARN。它被称为是分布式系统的内核。

三种架构都采用了Master/Worker(Slave)的架构,Spark 分布式运行架构大致如下:

本文主要介绍 Spark 的Standalone模式的部署。

三、环境准备

出于学习的目的,本文将 Spark 部署在安装有 CentOS7 系统的 VirtualBox 虚拟机中。

搭建 Spark 集群,需要准备以下文件及环境:
jdk-8u211-linux-x64.tar.gz
spark-2.4.3-bin-hadoop2.7.tgz
3 个独立的 CentOS7 虚拟机系统,机器集群规划如下:

四、安装

4.1.    配置 jdk 环境

解压文件:

tar -zxf jdk-8u211-linux-x64.tar.gz

配置环境变量:

export JAVA_HOME=/path/to/jdk1.8.0_211
export PATH=\$PATH:\$JAVA_HOME/bin

4.2.    配置 Spark 环境

解压文件:

tar -xf **park-2.4.3-bin-hadoop2.7.tgz**

配置环境变量:

export SPARK_HOME=/path/to/spark-2.4.3-bin-hadoop2.7
export PATH=\$PATH:\$SPARK_HOME/bin

修改spark-env.sh 文件

cd spark-2.4.3-bin-hadoop2.7
cp conf/spark-env.sh.template conf/spark-env.sh
vim conf/spark-env.sh
\# 增加如下内容:
export JAVA_HOME=/path/to/jdk1.8.0_211
export SPARK_MASTER_HOST=192.168.56.106

修改slaves文件

cp conf/slaves.template conf/slaves
vim conf/slaves
# 增加如下内容:
192.168.56.106
192.168.56.107
192.168.56.108

4.3. 配置 ssh 免密登录

配置 ssh 免密登录,是为了能够在master机器上来启动所有worker节点,如果不配置免密登录,则在启动每个worker时,都需要输入一遍密码,会很麻烦。当然,如果机器少的话,也可以登录到worker节点上,手动一个一个启动worker

执行:ssh-keygen -t rsa,一直按回车即可。最后会生成类似这样的日志:

并且在用户目录下会自动生成.ssh目录执行ls ~/.ssh可以看到两个文件:

id_rsa生成的私钥文件

id_rsa.pub生成的公钥文件

id_rsa.pub复制到其它机器上,执行以下几条命令:

ssh-copy-id -i ~/.ssh/id_rsa.pub royran@192.168.56.106: # master所在的主机,如果master不做woker可以不需要。
ssh-copy-id -i ~/.ssh/id_rsa.pub royran@192.168.56.107:
ssh-copy-id -i ~/.ssh/id_rsa.pub royran@192.168.56.108:

4.4  配置其它 worker 节点

当前已在master节点配置好了环境,还需要在其它worker节点上配置相类似的环境。

配置其它worker节点很简单,只需要将jdk1.8.0_211spark-2.4.3-bin-hadoop2.7两个目录复制到其它worker节点机器上即可。但要注意,这两个目录在其它 worker 上的绝对路径需要与 master 上的绝对路径一致,不然无法直接在 master 上启动其它 worker 节点。

依次执行以下命令(如果已经配置好 ssh 免密,可以发现执行 scp 指令不需要两次输入密码):

scp -r /path/to/jdk1.8.0_211username@192.168.56.107:/path/to/jdk1.8.0_211
scp -r /path/to/jdk1.8.0_211username@192.168.56.108:/path/to/jdk1.8.0_211
scp -r /path/to/spark-2.4.3-bin-hadoop2.7username@192.168.56.107:/path/to/spark-2.4.3-bin-hadoop2.7
scp -r /path/to/spark-2.4.3-bin-hadoop2.7username@192.168.56.108:/path/to/spark-2.4.3-bin-hadoop2.7

4.5  启动master

执行:

sbin/start-master.sh

输入jps指令(该指令在<pre>不能识别此Latex公式: JAVA_HOME/bin 目录下)可以查看 java 进程名,如输入jps后,会显示这样的信息:

看到有Master字样,说明master进程启动成功了,启动master后,spark 默认会监听8080端口,并可以通过浏览器打开 web 界面,在地址栏输入http://192.168.56.106:8080,查看集群状态。如下图所示:

当前只启动了master,所以看不到任何worker信息。

4.6 启动 worker 节点

执行:

sbin/slaves.sh

会看到类似这样的输出:

再输入jps,会列出当前启动的java进程,显示Worker字样,说明worker进程启动成功了。

此时再刷新下打开的浏览器界面(http://192.168.56.106:8080),可以看到当前启动了三个Worker节点。

也许你会发现界面上显示的 Address 列,怎么是 10 开头的 ip 地址,并且都是一样的,而不是 192 开头的三个不同的 ip 地址。

这是因为虚拟机内有两块虚拟网卡,Spark 会读取环境变量SPARK_LOCAL_IP,如果没设置这个变量,Spark 就会使用getHostByName来获取 ip 地址,会得到10.0.2.15这个 ip 地址。

要解决这个问题,有两种方法:

(1)   将仅主机(Host-Only)网络设置为网卡 1,将网络地址转换(NAT)设置为网卡 2。不过如果使用这种方法,重启虚拟机后,如果是动态 ip,则 ip 地址会变化,会影响之前的配置。

(2)   另一种方法,可在conf/spark-env.sh中设置SPARK_LOCAL_IP这个变量,可以固定为一个 ip 地址,

vim conf/spark-env.sh
# 添加一行:
export SPARK_LOCAL_IP=192.168.56.106

在其他机器上同样需要手动添加这一行,不过要修改为对应的机器 ip。觉得这样有点麻烦。可以通过脚本动态获取本机 ip 地址,在conf/spark-env.sh中添加这两行:

SPARK_LOCAL_IP=`python -c "import socket;import fcntl;import struct;print([(socket.inet_ntoa(fcntl.ioctl(s.fileno(),0x8915,struct.pack('256s', 'enp0s8'))[20:24]), s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][0])"`
export SPARK_LOCAL_IP

这样就可以自动获取本机的enp0s8这块网卡的 ip 地址

最后将修改后的conf/spark-env.sh这个文件复制到其它机器上:

执行:

scp conf/spark-env.sh username@192.168.56.107:/path/to/spark-2.4.3-bin-hadoop2.7/conf/spark-env.sh
scp conf/spark-env.sh username@192.168.56.108:/path/to/spark-2.4.3-bin-hadoop2.7/conf/spark-env.sh

重新启动所有节点:

sbin/stop-all.sh

sbin/start-all.sh

最后刷新浏览器界面,可以看到有 3 个Woker启动了,并且在 Address 列也可以看到都变为 192 开头的 ip 地址了。

五、测试

</pre>{SPARK_HOME}/examples/src/main目录下,有一些 spark 自带的示例程序,有 java、python、r、scala 四种语言版本的程序。这里主要测试 python 版的计算PI的程序。

cd \${SPARK_HOME}/examples/src/main/python

pi.py程序提交到 spark 集群,执行:

spark-submit --master spark://192.168.56.106:7077 pi.py

最后可以看到输出这样的日志:

展开阅读全文

本文系作者在时代Java发表,未经许可,不得转载。

如有侵权,请联系nowjava@qq.com删除。

编辑于

关注时代Java

关注时代Java