前言
随着容器和 Kubernetes 的兴起,微服务逐渐受到很多企业客户的关注。与此同时,微服务之间的通信也成为了企业必要考虑的问题。本文将着重分析
ActiveMQ 和 Kafka 这两款优秀的消息中间件的架构以及在 OpenShift
上的实现。在正式介绍之前,我们先简单介绍服务之间的通信机制。
服务之间的通信
通信方式的划分
服务之间的通信遵循 IPC 标准(Inter-Process
Communication,进程间通信)。它们之间的通信方式,可以按照以下两个维度进行分析:
服务之间同步通信,可以采取基于 gRPC 的方式或 http 的 Rest 方式,如下图 1 所示。在 Rest 方式中,调用 http
的方法进行同步访问,如:GET、POST、PUT、DELETE。
服务之间异步通信,需要借助于标准消息队列或数据流平台。
图 1. 服务之间通信
从同步或异步方式分析了服务之间的通信方式后,我们从另外一个维度来看服务间的通信,即一对一或一对多,如下图 2 所示:
图 2. 服务之间通信
程序之间一对一通信的情况下,有同步和异步两种通信方式:
程序之间一对多的通信,只有异步通信的方式,主要通过发布/订阅模式实现。即:客户端发布通知消息,被一个或者多个相关联(订阅了主题)的服务消费。
接下来,我们介绍服务之间异步通信的实现方式。
异步通信实现
在一个分布式系统中,服务之间相互异步通信最常见的方式是发送消息。我们把负责将发送方的正式消息传递协议转换为接收方的正式消息传递协议的工具叫做
Message Broker(消息代理)。市面上有不少 Message Broker 软件,如 ActiveMQ、RabbitMQ、Kafka
等。
服务之间消息传递主要有两种模式:队列(Queue)和主题(Topic)。
Queue 模式是一种一对一的传输模式。在这种模式下,消息的生产者(Producer)将消息传递的目的地类型是 Queue。Queue
中一条消息只能传递给一个消费者(Consumer),如果没有消费者在监听队列,消息将会一直保留在队列中,直至消息消费者连接到队列为止,消费者会从队列中请求获得消息。
Topic 是一种一对多的消息传输模式。在这种模式下,消息的生产者(Producer)将消息传递的目的地类型是 Topic。消息到达
Topic 后,消息服务器将消息发送至所有订阅此主题的消费者。
消息的分类
严格意义上讲,服务之间发送的消息通常有三种:普通 Messages(消息)、Events(事件)、Commands(命令),如下图 3 所示:
图 3. 消息的分类
Messages 是服务之间沟通的基本单位,消息可以是
ID、字符串、对象、命令、事件等。消息是通用的,它没有特殊意图,也没有特殊的目的和含义。也正是由于这个原因,Events 和 Commands
才应运而生。
Events 是一种 Messages,它的目的是向监听者(Listeners)通知发生了什么事情。Events 由 Producer
发送,Producer 不关心 Events 的 Consumers,如下图 4 所示。
图 4. Event 的传递
举例说明:有一个电商系统,当消费者下订单时电商平台会发布
OrderSubmittedEvent,以通知其他系统(例如物流系统)新订单的信息。此时,电商平台作为 Event 的 Producer,并不关心
Event 的 Consumer。只有订阅了此 Topic 的消费者才能获取到这个 OrderSubmittedEvent。
Commands 是 Producer 给 Consumer 发送的一对一的指令。Producer 会将 Command 发送到消息队列,然后
Consumer 主动从队列获取 Commands。与 Events 不同,Commands 触发将要发生的事情,如下图 5 所示:
图 5. Command
的传递
举例说明 Events:在电商系统中客户下订单后,电商将 BillCustomerCommand 命令发送到计费系统以触发发送发票动作。
在介绍了服务之间的异步通信实现和消息的分类后,我们接下来介绍一种消息中间件 ActiveMQ。
AMQ 在 OpenShift 上的企业级实现
在本小节中,我们先介绍标准消息中间件的规范,然后介绍 ActiveMQ 在 RHEL 操作系统和 OpenShift 上的企业级实现。
标准消息中间件规范
目前业内主要的异步消息传递协议有:
Java 消息传递服务(Java Messaging Service (JMS)),它面向 Java 平台的标准消息传递 API。
面向流文本的消息传输协议(Streaming Text Oriented Messaging Protocol (STOMP)):
WebSocket 通信标准。
高级消息队列协议(Advanced Message Queueing Protocol
(AMQP)):独立于平台的底层消息传递协议,用于集成多平台应用程序。
目前市面上消息中间件的种类很多,Apache ActiveMQ Artemis 是最流行的开源、基于 Java 的消息服务器。ActiveMQ
Artemis 支持点对点的消息传递(Queue)和订阅/发布模式(Topic)。
ActiveMQ Artemis 支持标准 Java NIO(New I/O,同步非阻塞模式)和 Linux AIO 库(Asynchronous
I/O,异步非阻塞 I/O 模型)。AMQ 支持多种协议,包括 MQTT、STOMP、AMQP 1.0、JMS 1.1 和 TCP。
红帽 JBoss AMQ 7 Broker 是基于 Apache ActiveMQ Artemis
项目的企业级消息中间件,在客户生产系统被大量使用。功能上 JBoss AMQ 7 Broker 与 Apache ActiveMQ Artemis
是一一对应的,其支持的协议如下图 6 所示。因为本文主要是面向生产环境的进行介绍,因此下文我们使用 JBoss AMQ 7 Broker
进行功能验证,部署过程不进行赘述。
图 6. AMQ 支持的协议
在 RHEL 中配置 AMQ7
接下来,我们在 RHEL 上配置 JBoss AMQ7。
首先创建目录:
1 2 | $sudo mkdir -p /opt/install/amq7
$sudo chown -R jboss:jboss /opt/install/amq7
|
创建名为 broker0 的 JBoss AMQ Broker 实例,并指定 guest 用户和角色的凭据,执行结果如下图 7 所示:
1 2 3 | $ ./bin/artemis create \
> --user jboss --password jboss --role amq --allow-anonymous \
> /opt/install/amq7/broker0
|
图 7. 创建 AMQ
Broker
执行完上述所述命令后,RHEL 中/opt/install/amq7/broker0 目录中有一个 broker instance ,其中包含运行
JBoss AMQ 7 所需的库,配置和实用程序,如下图 8 所示:
图 8. AMQ Broker
目录结构
执行命令启动 broker0:
1 2 3 | $ "/opt/install/amq7/broker0/bin/artemis-service" start
Starting artemis-service
artemis-service is now running (20240)
|
通过 ps 命令行确认 Broker 已经启动,如下图 9 所示:
图 9. 确认 Broker
已经启动
在上图的执行结果中,我们需要到如下参数:
查看 AMQ 的多协议支持
AMQ Broker 默认启动多种协议侦听与其通信的客户端连接(CORE、MQTT、AMQP、STOMP、HORNETQ 和
OPENWIRE)。
首先查看系统的 Java 监听端口,如下图 10 所示:
图 10. 查看 AMQ
的监听端口
从上图可以看出,Broker 正在监听如下端口:61613、61616、1883、8161、5445 和 5672。
通过查看 Broker 日志文件,确认每个端口侦听对应的协议。例如 AMQP 的监听端口是 5672,如清单 1 所示:
清单 1. 查看 AMQ 日志
1 2 3 4 5 6 7 8 | $ cat /opt/install/amq7/broker0/log/artemis.log
00:25:40,638 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started EPOLL Acceptor at 0.0.0.0:61616 for protocols [CORE,MQTT,AMQP,STOMP,HORNETQ,OPENWIRE]
00:25:40,640 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started EPOLL Acceptor at 0.0.0.0:5445 for protocols [HORNETQ,STOMP]
00:25:40,664 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started EPOLL Acceptor at 0.0.0.0:5672 for protocols [AMQP]
00:25:40,666 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started EPOLL Acceptor at 0.0.0.0:1883 for protocols [MQTT]
00:25:40,668 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started EPOLL Acceptor at 0.0.0.0:61613 for protocols [STOMP]
00:25:40,671 INFO [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
00:25:40,672 INFO [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 2.0.0.amq-700008-redhat-2 [0.0.0.0, nodeID=3b75e2e5-c170-11e9-a53c-080027f33d64]
|
创建持久队列
接下来,使用 JBoss AMQ 7 提供的 artemis 建持久的消息传递地址和队列。
如前文所述:AMQ 支持队列和主题模式。JBOSS AMQ7 中的 Topic 是用 Queue 实现的。在创建 Queue
时通过可以指定参数实现。Multicast 方式是 Topic,Anycast 就是 Queue。
创建名为 DavidAddress 的 Anycast 地址,执行结果如下图 11 所示:
1 | $ ./bin/artemis address create --name DavidAddress --anycast --no-multicast
|
图 11. 创建 AMQ
Anycast 消息队列
接下来,创建持久的 Anycast 队列与此前创建的 Anycast 地址相关联:
1 2 | $ ./bin/artemis queue create --name gpteQueue --address DavidAddress --anycast --durable
--purge-on-no-consumers --auto-create-address
|
Queue 创建成功以后,可以在 AMQ7 管理控制台中查看新创建的 Address 和 Queue,如下图 12 所示:
图 12. AMQ Console
查看消息队列
截止到目前,我们已经创建了消息地址和队列。我们关注队列的几个数值。
Consumer Count:访问此队列 Consumer 的数量
Message Count:队列中的消息数。
Messages Acknowledged:队列中的消耗的消息数。
Messages Added:队列中被添加消息数。
目前这几个数值均为零,如下图 13 所示:
图 13. AMQ Console
查看消息队列
接下来,使用 artemis 创建两个并行线程,向消息队列发送总共 20 个 10 字节消息,每条消息之间有一秒休眠,执行结果如下图 14
所示:
1 2 | $./bin/artemis producer --destination gpteAddress --message-count 10 --message-size 10
--sleep 1000 --threads 2 --url tcp://localhost:61616
|
图 14. 向队列发送 20
条消息
启动两个 Consumer,第一个 Consumer 读取队列中一条消息,第二个 Consumer 读取队列中其余的消息,执行结果如下图 15
所示:
启动第一个 Consumer,执行结果如下图所示,获取到一条消息后 Consumer 程序退出。
1 2 | $ ./bin/artemis consumer --destination gpteQueue --url tcp://localhost:61616
--message-count 1
|
图 15. 启动第一个
Consumer
启动第二个 Consumer,执行结果如下图 16 所示,获取到 19 条消息后 Consumer 程序退出。
1 | $ ./bin/artemis consumer --destination gpteQueue --url tcp://localhost:61616
|
图 16. 启动第二个
Consumer
我们再次查看 AMQ Console,如下图 17 所示:
Consumer Count: 访问此队列 Consumer 的数量,数值为 2;
Message Count:队列中的消息数,数值为 0,因为消息已经被读取;
Messages Acknowledged:队列中的消耗的消息数,数值为 20,队列中被消耗了 20 条消息;
Messages Added:队列中被添加消息数,数值为 20,队列中被发送了 20 条消息。
图 17. AMQ Console
查看消息队列
从 AMQ Console 我们得到的反馈结果,符合我们的预期。
AMQ 在 OpenShift 上的部署
在 OpenShift 上我们可以直接用现成的模板部署 AMQ7。针对生产环境我们需要注意以下三点:
AMQ HA 的实现
在 OpenShift 中,AMQ 的高可用是通过对一个 AMQ 创建多个 pod 来实现的,而无需通过 AMQ
本身的 HA 机制。当 AMQ 的一个 pod 出现问题,OpenShift 的 scaledown controller 可以实现
AMQ 的 HA。
在 OpenShift 中,StatefulSet 用于保证有状态应用。当 StatefulSet 发生 scaled
down,有状态 pod 实例时会被删除,与被删除 pod 关联的 PersistentVolumeClaim 和
PersistentVolume 将保持不变。当 pod 新创建以后,PVC 会被重新挂在到 Pod
中,之前的数据可以访问。
但如果 AMQ 使用 Queue sharding,StatefulSet 的这种方式就不太合适。使用 sharding
的应用,当出现应用实例减少时,会要求将数据重新分发到剩余的应用程序实例上,而不是一直等待故障 pod 的重启。这种情况下就需要用到
StatefulSet Scale-Down Controller。StatefulSet Scale-Down
Controller 允许我们在 StatefulSet 规范中指定 cleanup pod 的
template,该模板将用于创建一个新的 cleanup pod,该 cleanup pod 将挂在被删除的 pod 释放的
PersistentVolumeClain。cleanup pod 可以访问已删除的 pod 实例的数据,并且可以执行 app
所需的任何操作。cleanup pod 完成任务后,控制器将删除 pod 和 PersistentVolumeClaim,释放
PersistentVolume。
AMQ 的访问方式
如果访问 AMQ 的客户端在 OpenShift 内部,那么使用 AMQ 的 Service IP 和端口号即可。如果访问 AMQ 的客户端在
OpenShift 外部,则需要考虑使用 NodePort 的方式。
在 OpenShift 上部署一个 AMQ 单实例
首先,我们先基于一个基础模板部署一个单实例的 AMQ。这种部署方式适合于测试环境。
使用模板创建 AMQ:
1 2 3 | # oc new-app --template=amq-broker-72-basic -e
AMQ_PROTOCOL=openwire,amqp,stomp,mqtt,hornetq -e AMQ_QUEUES=demoQueue -e
AMQ_ADDRESSES=demoTopic -e AMQ_USER=amq-demo-user -e ADMIN_PASSWORD=password
|
查看 pod,AMQ Broker 已经创建成功:
1 2 3 | [root@workstation-46de ~]# oc get pods
NAME READY STATUS RESTARTS AGE
broker-amq-1-94zkz 1/1 Running 0 4m
|
登陆 AMQ pod,通过 Producer 向队列中发送消息,可以成功,如清单 2 所示:
清单 2. 向队列发送消息
1 2 3 4 5 6 7 8 | # oc rsh broker-amq-1-94zkz
sh-4.2$ ./broker/bin/artemis producer
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Producer ActiveMQQueue[TEST], thread=0 Started to calculate elapsed time ...
Producer ActiveMQQueue[TEST], thread=0 Produced: 1000 messages
Producer ActiveMQQueue[TEST], thread=0 Elapsed time in second : 2 s
Producer ActiveMQQueue[TEST], thread=0 Elapsed time in milli second : 2072 milli seconds
|
AMQ Console 用于图形化管理。我们为 AMQ Console 在 OpenShift 中创建路由,首先创建路配置文件,如清单 3
所示:
清单 3. AMQ Console
路由配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | # cat console.yaml
apiVersion: v1
kind: Route
metadata:
labels:
app: broker-amq
application: broker-amq
name: console-jolokia
spec:
port:
targetPort: console-jolokia
to:
kind: Service
name: broker-amq-headless
weight: 100
wildcardPolicy: Subdomain
host: star.broker-amq-headless.amq-demo.svc
|
应用配置如下:
1 2 | [root@workstation-46de ~]# oc apply -f console.yaml
route.route.openshift.io/console-jolokia created
|
查看新创建的 Console 路由:
1 2 3 4 | # oc get route
NAME HOST/PORT PATH SERVICES PORT TERMINATION WILDCARD
amq console-amq-demo.apps-46de.generic.opentlc.com broker-amq-amqp < all > passthrough
None
|
通过浏览器可以访问 Console,如下图 18 所示:
图 18. AMQ Console
查看消息队列
在 OpenShift 上部署 AMQ Cluster
OpenShift 提供面向生产的 AMQ Cluster 部署模板,这个模板创建的 AMQ 包含持久化存储:
查看模板:
1 2 3 4 | # oc get template
NAME DESCRIPTION PARAMETERS OBJECTS
amq-broker-72-persistence-clustered Application template for Red Hat AMQ brokers. This
template doesn't feature S... 21 (6 blank) 6
|
接下来,通过命令行或者 OpenShift UI 界面,使用模板创建 AMQ 集群。以使用 OpenShift UI 为例如下图 19 所示:
图 19. 选择 AMQ
Cluster 模板
填写 AMQ 集群相应的参数,如下图 20 所示:
图 20. 输入 AMQ
集群参数
部署成功以后,将 AMQ 的 statefulset 增加到 3 个,设置完成后,OpenShift 中会部署 3 个 AMQ Broker 的
pod,如下图 21 所示:
1 2 | # oc scale statefulset broker-amq --replicas=3
statefulset.apps/broker-amq scaled
|
图 21. 查看 AMQ
Pod
为了让 AMQ 集群能够被外部客户端访问,我们为 AMQ Broker 配置 NodePort。配置文件如清单 4 所示:
清单 4. AMQ Console
路由配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | # cat port.yaml
apiVersion: v1
kind: Service
metadata:
annotations:
description: The broker's OpenWire port.
service.alpha.openshift.io/dependencies: >-
[{"name": "broker-amq-amqp", "kind": "Service"},{"name":
"broker-amq-mqtt", "kind": "Service"},{"name": "broker-amq-stomp", "kind":
"Service"}]
creationTimestamp: '2018-08-29T14:46:33Z'
labels:
application: broker
template: amq-broker-72-statefulset-clustered
xpaas: 1.4.12
name: broker-external-tcp
namespace: amq-demo
resourceVersion: '2450312'
selfLink: /api/v1/namespaces/amq-demo/services/broker-amq-tcp
uid: 52631fa0-ab9a-11e8-9380-c280f77be0d0
spec:
externalTrafficPolicy: Cluster
ports:
- nodePort: 30001
port: 61616
protocol: TCP
targetPort: 61616
selector:
deploymentConfig: broker-amq
sessionAffinity: None
type: NodePort
status:
loadBalancer: {}
|
应用配置文件:
1 2 | # oc apply -f port.yaml
service/broker-external-tcp created
|
在 AMQ 的 pod 中发起 Producer,向 demoQueue 中发送消息:
1 2 | artemis producer --url tcp:// 129.146.152.123:30001 --message-count 300 --destination
queue://demoQueue
|
客户端启动 Consumer,连接 demoQueue,可以读取到消息。如清单 5 所示。
清单 5. Consumer
读取消息队列信息
1 2 3 4 5 | #./artemis consumer --url tcp://129.146.152.123:30001 --message-count 100 --destination queue://demoQueue
Consumer:: filter = null
Consumer ActiveMQQueue[demoQueue], thread=0 wait until 100 messages are consumed
Consumer ActiveMQQueue[demoQueue], thread=0 Consumed: 100 messages
Consumer ActiveMQQueue[demoQueue], thread=0 Consumer thread finished
|
至此,ActiveMQ 在 OpenShift 上成功实现。
接下来,我们介绍 Kafka 的架构以及在 OpenShift 上的实现。
从分布式消息平台到数据流平台
在本小节中,我们主要介绍了 ActiveMQ 的架构以及其在 OpenShift 上的实现。我们知道,传统的应用大多使用 Java EE
架构。这种框架下开发的应用之间的异步通信大多采用 JMS 的方式、以一对一通信为主,因此使用 ActiveMQ Queue
的方式被大量使用。
随着微服务的兴起,一个单体应用被拆分成多个微服务、服务之间的调用骤然增多、服务之间交换的信息量也迅速增加,要求的吞吐量比传统模式下高的多。例如,在常见的日志收集系统
EFK 中,Fluentd 作为数据收集器会收集多个对象(如操作系统、容器等)的大量日志,然后然后将数据传递到 Elasticsearch
集群。多个 Fluentd 收集的大量日志,在向 Elasticsearch 传递时往往就会造成大量阻塞,这时我们使用 ActiveMQ
的消息队列就不是很合适,就需要使用到数据流平台。而在数据流平台中,Kafka 是性能很高、被大量使用的一个开源解决方案。
Kafka 在 OpenShift 上的实现
Kafka 的基本概念
Kafka 是一个分布式数据流平台,开发它的目的是使微服务和其他应用程序组件尽可能快地交换大量数据,以进行实时事件处理。
Kafka 是提供了发布/订阅功能的分布式数据流平台。它适用于两类应用:
Kafka 有四个核心 API,他们的作用如下:
Producer API:应用将记录流发布到一个或多个 Topic 上。
Consumer API:应用订阅一个或多个 Topic 并处理生成的记录流。
Streams API:应用程序充当流处理器,消耗来自一个或多个 Topic 的输入流并生成到一个或多个输出 Topic
的输出流,从而有效地将输入流转换为输出流。
Connector API:允许将 Kafka Topic 连接到现有应用程序或数据系统的、可重用 Producer 或
Consumer。例如,关系数据库的 Connector 可以捕获对表的每个更改。
Kafka 中每个 topic 可以有一个或多个
partition(分区)。不同的分区对应着不同的数据文件,不同的数据文件可以存放到不同的节点上,以实现数据的高可用。此外,Kafka
使用分区支持物理上的并发写入和读取,从而大大提高了吞吐量,如下图 22 所示:
图 22. Kafka 的
partition
Kafka 集群在 OpenShift 集群上的实现方式
随着越来越多的应用程序部署到 OpenShift 上,Kafka 集群运行 OpenShift 的场景也越来越多。将 Kafka 部署到
OpenShift 集群上主要有如下两个好处:
Topic 是本身无状态的,但 Kafka 作为分布式数据流平台,需要实现状态保持。因此需要实现以下几点:
Kafka 集群中多个 Broker 之间的通信
Broker 的状态持久化(即消息)
Broker 出现问题后可以快速恢复
原生 Kubernetes 实现 Kafka 的有状态化较为繁琐,需要使用 Stateful Sets 和 Persistent Volumes。在
OpenShift 中可以通过 Operator 来方便地管理 Kafka 集群,实现状态化。
OpenShift 通过 Operator 管理 Kafka 集群会用到 Strimzi。Strimzi 是一个开源项目,它为在 Kubernetes
和 OpenShift 上运行 Kafka 提供了 Container Image 和 Operator。
Kafka 使用 Zookeeper:
对管理 Kafka Broker 和 Topic Partition 的 Leader 选举。
管理构成集群的 Kafka Brokers 服务发现。
将拓扑更改发送到 Kafka,因此集群中的每个节点都知道新 Broker 何时加入、退出;Topic 被删除或被添加等。
如下图 23 所示:
图 23. 在
OpenShift 集群上实现 Kafka 集群
一个 Kafka 集群可以配置很多个 Broker。生产环境中,至少部署 3 个 Broker。接下来,我们展示在 OpenShift 上部署
Kafka 集群。
在 OpenShift 上部署 Kafka 集群
首先下载 Kafka 在 OpenShift 上的安装文件,解压缩后如下图
24、25 所示:
图 24. Kafka 安装包
图 25. Kafka 安装包
由于篇幅有限,我们不会详细介绍配置文件的内容。这些配置文件是在 OpenShift 集群上设置 AMQ Streams
所需的完整资源集合。文件包括:
在安装集群 Operator 之前,我们需要配置它运行的 Namespace。我们将通过修改 RoleBinding.yaml 文件以指向新创建的项目
amq-streams 来完成此操作。只需通过 sed 编辑所有文件即可完成此操作。
1 2 | [root@workstation-46de kafka]# sed -i 's/namespace: .*/namespace: amq-streams/'
install/cluster-operator/*RoleBinding*.yaml< br >
|
确认更改生效,如清单 6 所示:
清单 6. Consumer
读取消息队列信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | # more install/cluster-operator/020-RoleBinding-strimzi-cluster-operator.yaml
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: RoleBinding
metadata:
name: strimzi-cluster-operator
labels:
app: strimzi
subjects:
- kind: ServiceAccount
name: strimzi-cluster-operator
namespace: amq-streams
roleRef:
kind: ClusterRole
name: strimzi-cluster-operator-namespaced
apiGroup: rbac.authorization.k8s.io
|
接下来,部署 Kafka Operator,执行结果如下图 26 所示:
1 2 | # oc new-project amq-demo
# oc apply -f install/cluster-operator
|
图 26. 部署 Kafka
Operator
查看部署结果,如下图 27 所示:
图 27. 确认部署
Kafka Operator 部署成功
我们通过配置文件部署 Kafka 集群,如清单所示。
配置文件中包含如下设置:
3 个 Kafka broker- 这是生产部署的最小建议数值
3 ZooKeeper 节点 - 这是生产部署的最小建议数量
持久声明存储,确保将持久卷分配给 Kafka 和 ZooKeeper 实例
清单 7. Kafka 集群配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | # cat production-ready.yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
name: production-ready
spec:
kafka:
replicas: 3
listeners:
plain: {}
tls: {}
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
storage:
type: persistent-claim
size: 3Gi
deleteClaim: false
zookeeper:
replicas: 3
storage:
type: persistent-claim
size: 1Gi
deleteClaim: false
entityOperator:
topicOperator: {}
userOperator: {}
|
应用配置,查看部署结果如下图 28 所示:
1 2 | # oc apply -f production-ready.yaml
kafka.kafka.strimzi.io/production-ready created
|
图 28. Kafka
集群部署成功
通过配置文件创建 Kafka Topic,配置文件如清单 8 所示:
清单 8. Topic 配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 | # cat lines.yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaTopic
metadata:
name: lines
labels:
strimzi.io/cluster: production-ready
spec:
partitions: 2
replicas: 2
config:
retention.ms: 7200000
segment.bytes: 1073741824
|
在上面的配置中:
metadata.name,这是 topic 的名称
metadata.labels [strimzi.io/cluster]是 topic 的目标集群
spec.partitions 是 topic 的分区数
spec.replicas,即每个分区的副本数
spec.config 包含其他配置选项,例如保留时间和段大小
应用配置:
1 2 | # oc apply -f lines.yaml
kafkatopic.kafka.strimzi.io/lines created
|
我们来获取 Topic 信息。
首先登陆 Kafka 的 pod,然后利用 kafka-topics.sh 脚本查看 Topic 的信息,如清单 9
所示,我们可以看到分区是两个:
清单 9. 查看 Topic 信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | # oc rsh production-ready-kafka-0
Defaulting container name to kafka.
Use 'oc describe pod/production-ready-kafka-0 -n amq-streams' to see all of the containers in this pod.
sh-4.2$ cat bin/kafka-topics.sh
#!/bin/bash
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
sh-4.2$ bin/kafka-topics.sh --zookeeper localhost:2181 --topic lines –describe
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Topic:lines PartitionCount:2 ReplicationFactor:2 Configs:segment.bytes=1073741824,retention.ms=7200000
Topic: lines Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: lines Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2
|
增加 Partition 数量,首先需要修改配置文件,如清单 10 所示
清单 10. 修改 Topic 配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 | # cat lines-10.yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaTopic
metadata:
name: lines
labels:
strimzi.io/cluster: production-ready
spec:
partitions: 10
replicas: 2
config:
retention.ms: 14400000
segment.bytes: 1073741824
|
应用配置:
1 2 | # oc apply -f lines-10.yaml
kafkatopic.kafka.strimzi.io/lines configured
|
首先登陆 Kafka 的 pod,然后利用 kafka-topics.sh 脚本查看 Topic 的信息,分区数量已经从两个增加到了十个,如清单 11
所示
清单 11. 查看 Topic 信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | [root@workstation-46de kafka]# oc rsh production-ready-kafka-0
Defaulting container name to kafka.
Use 'oc describe pod/production-ready-kafka-0 -n amq-streams' to see all of the containers in this pod.
sh-4.2$ bin/kafka-topics.sh --zookeeper localhost:2181 --topic lines --describe
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Topic:lines PartitionCount:10 ReplicationFactor:2 Configs:segment.bytes=1073741824,retention.ms=14400000
Topic: lines Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: lines Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: lines Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: lines Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: lines Partition: 4 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: lines Partition: 5 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: lines Partition: 6 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: lines Partition: 7 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: lines Partition: 8 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: lines Partition: 9 Leader: 2 Replicas: 2,1 Isr: 2,1
|
接下来,通过 kafka-console-producer.sh 脚本启动 Producer,如清单所示:
清单 12. 启动 Producer
1 2 3 4 5 6 7 8 9 10 | sh-4.2$ cat bin/kafka-console-producer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
启动一个 Producer:
sh-4.2$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
>
|
我们发送信息到到 Topic 中,第一次发送"david wei",第二次发送"is doing the test!!!"如下图 29 所示:
图 29. 向 Topic
中发送信息
接下来,使用 kafka-console-consumer.sh 脚本启动 Consumer 监听 Topic,如清单 13 所示:
清单 13 启动 Producer
1 2 3 4 5 6 7 8 | sh-4.2$ cat bin/kafka-console-consumer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
sh-4.2$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
|
Consumer 可以读取到 Topic 中此前 Producer 发送的消息,如图 30 所示:
图 30. Consumer
读取 Topic 信息
配置 Kafka 外部访问
在某些情况下,需要从外部访问部署在 OpenShift 中的 Kafka 群集。接下来我们介绍配置的方法。首先,使用包含外部 Listener
配置内容的文件重新部署 Kafka 集群,如清单 14 所示:
清单 14. Kafka
集群配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | [root@workstation-46de ~]# cat production-ready-external-routes.yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
name: production-ready
spec:
kafka:
replicas: 3
listeners:
plain: {}
tls: {}
external:
type: route
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
storage:
type: persistent-claim
size: 3Gi
deleteClaim: false
zookeeper:
replicas: 3
storage:
type: persistent-claim
size: 1Gi
deleteClaim: false
entityOperator:
topicOperator: {}
userOperator: {}
|
应用配置文件:
1 2 | # oc apply -f production-ready-external-routes.yaml
kafka.kafka.strimzi.io/production-ready configured
|
应用配置文件以后,Operator 会对 Kafka 集群上启动滚动更新,逐个重新启动每个代理以更新其配置,如图 31 所示,以便在
OpenShift 中的每个 Broker 都有一个路由。
图 31. Kafka
集群滚动升级
查看 Kafka Broker 的路由,如清单 15 所示:
清单 15. Kafka 集群路由
1 2 3 4 5 6 | [root@workstation-46de ~]# oc get routes
NAME HOST/PORT PATH SERVICES PORT TERMINATION WILDCARD
production-ready-kafka-0 production-ready-kafka-0-amq-streams.apps-46de.generic.opentlc.com production-ready-kafka-0 9094 passthrough None
production-ready-kafka-1 production-ready-kafka-1-amq-streams.apps-46de.generic.opentlc.com production-ready-kafka-1 9094 passthrough None
production-ready-kafka-2 production-ready-kafka-2-amq-streams.apps-46de.generic.opentlc.com production-ready-kafka-2 9094 passthrough None
production-ready-kafka-bootstrap production-ready-kafka-bootstrap-amq-streams.apps-46de.generic.opentlc.com production-ready-kafka-external-bootstrap 9094 passthrough None
|
接下来,我们在 OpenShift 集群外部配置代理与 OpenShift 中的 Kafka 进行交互,外部客户端必须使用
TLS。首先,我们需要提取服务器的证书。
1 2 3 | # oc extract secret/production-ready-cluster-ca-cert --keys=ca.crt --to=-
>certificate.crt
# ca.crt
|
然后,我们需要将其安装到 Java keystore。
1 2 3 | # keytool -import -trustcacerts -alias root -file certificate.crt -keystore keystore.jks
-storepass password -noprompt
Certificate was added to keystore
|
使用此证书 Producer 和 Consumer 两个应用,下载两个 JAR。
1 2 3 4 | #wget -O log-consumer.jar
https://github.com/RedHatWorkshops/workshop-amq-streams/blob/master/bin/log-consumer.jar?raw=true
#wget -O timer-producer.jar
https://github.com/RedHatWorkshops/workshop-amq-streams/blob/master/bin/timer-producer.jar?raw=true
|
使用新的配置设置启动两个应用程序。
首先启动 Consumer 应用,访问 OpenShift 内部的 Kafka Broker,执行结果如下所示:
1 2 3 4 5 | java -jar log-consumer.jar \
--camel.component.kafka.configuration.brokers=production-ready-kafka-bootstrap-amq-streams.apps-46de.generic.opentlc.com:443 \
--camel.component.kafka.configuration.security-protocol=SSL \
--camel.component.kafka.configuration.ssl-truststore-location=keystore.jks \
--camel.component.kafka.configuration.ssl-truststore-password=password
|
启动 Producer 应用:
1 2 3 4 5 | java -jar timer-producer.jar \
--camel.component.kafka.configuration.brokers=production-ready-kafka-bootstrap-amq-streams.apps-46de.generic.opentlc.com:443 \
--camel.component.kafka.configuration.security-protocol=SSL \
--camel.component.kafka.configuration.ssl-truststore-location=keystore.jks \
--camel.component.kafka.configuration.ssl-truststore-password=password --server.port=0
|
两个应用都启动后,我们观察日志可以看到,Producer 从外部通过 OpenShift 上 Kafka 的路由向 Topic
中发送消息,Consumer 应用从外部通过 OpenShift 上 Kafka 的路由监听 Topic,获取到 Topic 中的信息。如下图
32、33 所示:
图 32. Consumer
应用
图 33. Producer
应用
在 OpenShift 集群中启动 Consumer 监听 Topic,可以看到结果如图 34 所示,与外部 Consumer 看到信息一致:
1 2 | sh-4.2$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic lines
--from-beginning
|
图 34. 获取 Topic
中消息
配置 Mirror Maker
很多情况下,应用程序需要跨 Kafka 集群在彼此之间进行通信。数据可能在数据中心的 Kafka
中被摄取并在另一个数据中心中被消费。接下来,我们将展示如何使用 MirrorMaker 在 Kafka 集群之间复制数据,如图 35 所示:
图 35. Kafka
集中的 MirrorMaker
部署第二套 Kafka 集群:Production-ready-target,作为现有 Production-ready Kafka
集群的目标端。集群部署配置文件如清单 16 所示:
清单 16. Kafka 集群配置文件