本文主要分为三个部分:
2012~2013 年期间,阿里巴巴中间件团队自主研发并对外开源了第三代分布式消息引擎 RocketMQ,其高性能、低延迟、抗堆积的特性稳定支撑了阿里巴巴 双11 万亿级数据洪峰业务,其云产品 Aliware MQ 在微服务、流计算、IoT、异步解耦、数据同步等无数工况场景大放异彩。
2016 年,阿里巴巴向 Apache 软件基金会捐赠了 RocketMQ。次年,RocketMQ 顺利从基金会毕业,成为 Apache 顶级开源项目,与 Apache Hadoop,Apache Spark 一起为全球分布式、大数据领域的开发者带来福音。然而,在云原生时代的今天,RocketMQ 作为有状态的分布式服务系统,如何在大规模集群上做到极简运维,则是一个极具挑战和价值的问题。
RocketMQ 支持多种部署方式,以基本的双主双从架构为例,如下图所示。
RocketMQ 双主双从架构
这里面包括了一共 7 个 RocketMQ 服务实例:3 个 name server 实例,2 个 master broker 实例,以及 2 个 slave broker 实例。
传统的部署方式需要手动或编写脚本在每个节点上进行环境和文件配置。此外,随着用户业务的增加,存在对集群进行无缝扩容等需求。传统方式是运维人员访问不同节点,依赖操作手册和脚本按步骤进行操作来完成,耗费人力,且存在误操作的可能。一些公司可能会使用一些平台和工具如 Ansible 来帮助自动化运维,此外越来越多的公司开始集成和使用基于 Kubernetes 的云原生生态。
使用 Kubernetes 提供的 Deployment 和 StatefulSet 等原生资源可以很好地解决无状态应用的管理问题,但对于数据库和 RocketMQ 这类有状态应用,则存在很多局限性。例如对 RocketMQ 来说扩容不仅仅是拉起新的实例 Pod 就完成了,还需要同步复制 Broker 的状态信息包括 Topic 信息和订阅关系这些元数据,同时要正确配置新 Broker 的 config 参数,包括 brokerName 和 NameServer IP List 等,才能使得新扩容的 Broker 可用,而这些仅仅靠用户编写 StatefulSet,修改 size 或 replicas 然后 apply 是无法做到的。
实际上 Kubernetes 开发人员也发现了这些问题,因此引入了自定义资源和控制器的概念,让开发人员可以直接用 Go 语言调用 Kubernetes API,编写自定义资源和对应的控制器逻辑来解决复杂有状态应用的管理问题,提供特定应用相关的自定义资源的这类代码组件称之为 Operator。由具备 RocketMQ 领域知识的专家编写 Operator,屏蔽了应用领域的专业知识,让用户只需要关心和定义希望达到的集群终态,这也是 Kubernetes 声明式 API 的设计哲学。
Operator 是在 Kubernetes 基础上通过扩展 Kubernetes API,用来创建、配置和管理复杂的有状态应用,如分布式数据库等。Operator 基于 Kubernetes 1.7 版本以来引入的自定义控制器的概念,在自定义资源和控制器之上构建,同时又包含了应用程序特定的领域知识。实现一个 Operator 的关键是 CRD(自定义资源)和 Controller(控制器)的设计。
Operator 站在 Kubernetes 内部视角,为应用的云原生化打开了新世界的大门。自定义资源可以让开发人员扩展添加新功能,更新现有的功能,并且可以自动执行一些管理任务,这些自定义的控制器就像 Kubernetes 原生的组件一样,Operator 可以直接使用 Kubernetes API 进行开发,也就是说他们可以根据这些控制器编写的自定义规则来创建和更改 Pods / Services、对正在运行的应用进行扩缩容。
本文使用 RocketMQ Operator 0.2.1 版本,展示如何使用 RocketMQ Operator 在 Kubernetes 上快速创建部署一个 RocketMQ 服务集群。
$ git clone https://github.com/apache/rocketmq-operator.git
$ cd rocketmq-operator
$ ./install-operator.sh
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
rocketmq-operator-564b5d75d-jllzk 1/1 Running 0 108s
成功安装时,rocketmq-operator pod 处于类似上面例子的 running 状态。
应用 rocketmq-operator / example 中的 rocketmq_v1alpha1_rocketmq_cluster.yaml 文件,快速部署一个 RocketMQ 集群。rocketmq_v1alpha1_rocketmq_cluster.yaml 文件内容如下:
apiVersion: rocketmq.apache.org/v1alpha1
kind: Broker
metadata:
# name of broker cluster
name: broker
spec:
# size is the number of the broker cluster, each broker cluster contains a master broker and [replicaPerGroup] replica brokers.
size: 1
# nameServers is the [ip:port] list of name service
nameServers: ""
# replicationMode is the broker replica sync mode, can be ASYNC or SYNC
replicationMode: ASYNC
# replicaPerGroup is the number of each broker cluster
replicaPerGroup: 1
# brokerImage is the customized docker image repo of the RocketMQ broker
brokerImage: apacherocketmq/rocketmq-broker:4.5.0-alpine
# imagePullPolicy is the image pull policy
imagePullPolicy: Always
# resources describes the compute resource requirements and limits
resources:
requests:
memory: "2048Mi"
cpu: "250m"
limits:
memory: "12288Mi"
cpu: "500m"
# allowRestart defines whether allow pod restart
allowRestart: true
# storageMode can be EmptyDir, HostPath, StorageClass
storageMode: EmptyDir
# hostPath is the local path to store data
hostPath: /data/rocketmq/broker
# scalePodName is broker-[broker group number]-master-0
scalePodName: broker-0-master-0
# volumeClaimTemplates defines the storageClass
volumeClaimTemplates:
- metadata:
name: broker-storage
spec:
accessModes:
- ReadWriteOnce
storageClassName: rocketmq-storage
resources:
requests:
storage: 8Gi
---
apiVersion: rocketmq.apache.org/v1alpha1
kind: NameService
metadata:
name: name-service
spec:
# size is the the name service instance number of the name service cluster
size: 1
# nameServiceImage is the customized docker image repo of the RocketMQ name service
nameServiceImage: apacherocketmq/rocketmq-nameserver:4.5.0-alpine
# imagePullPolicy is the image pull policy
imagePullPolicy: Always
# hostNetwork can be true or false
hostNetwork: true
# Set DNS policy for the pod.
# Defaults to "ClusterFirst".
# Valid values are 'ClusterFirstWithHostNet', 'ClusterFirst', 'Default' or 'None'.
# DNS parameters given in DNSConfig will be merged with the policy selected with DNSPolicy.
# To have DNS options set along with hostNetwork, you have to specify DNS policy
# explicitly to 'ClusterFirstWithHostNet'.
dnsPolicy: ClusterFirstWithHostNet
# resources describes the compute resource requirements and limits
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "1024Mi"
cpu: "500m"
# storageMode can be EmptyDir, HostPath, StorageClass
storageMode: EmptyDir
# hostPath is the local path to store data
hostPath: /data/rocketmq/nameserver
# volumeClaimTemplates defines the storageClass
volumeClaimTemplates:
- metadata:
name: namesrv-storage
spec:
accessModes:
- ReadWriteOnce
storageClassName: rocketmq-storage
resources:
requests:
storage: 1Gi
注意到这个例子中 storageMode: EmptyDir,表示存储使用的是 EmptyDir,数据会随着 Pod 的删除而抹去,因此该方式仅供开发测试时使用。一般使用 HostPath 或 StorageClass 来对数据进行持久化存储。使用 HostPath 时,需要配置 hostPath,声明宿主机上挂载的目录。使用 storageClass 时,需要配置 volumeClaimTemplates,声明 PVC 模版。具体可参考 RocketMQ Operator 文档。
应用上面的 yaml 文件,输入命令:
$ kubectl apply -f example/rocketmq_v1alpha1_rocketmq_cluster.yaml
broker.rocketmq.apache.org/broker created
nameservice.rocketmq.apache.org/name-service created
查看集群 Pod 状态:
$ kubectl get pods -owide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
broker-0-master-0 1/1 Running 0 27s 10.1.2.27 docker-desktop <none> <none>
broker-0-replica-1-0 1/1 Running 0 27s 10.1.2.28 docker-desktop <none> <none>
name-service-0 1/1 Running 0 27s 192.168.65.3 docker-desktop <none> <none>
rocketmq-operator-76b4b9f4db-x52mz 1/1 Running 0 3h25m 10.1.2.17 docker-desktop <none> <none>
使用默认的 rocketmq_v1alpha1_rocketmq_cluster.yaml 文件配置,我们看到集群中拉起了 1 个 name server 服务(name-service-0)和 2 个 broker 服务(1 主 1 从)。
好啦!到这里你已经成功通过 Operator 提供的自定义资源部署了一个 RocketMQ 服务集群。
使用 RocketMQ 的 tools.sh 脚本运行 Producer example:
$ kubectl exec -it broker-0-master-0 bash
bash-4.4# sh ./tools.sh org.apache.rocketmq.example.quickstart.Producer
OpenJDK 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
06:56:29.145 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
SendResult [sendStatus=SEND_OK, msgId=0A0102CF007778308DB1206383920000, offsetMsgId=0A0102CF00002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-0, queueId=0], queueOffset=0]
...
06:56:51.120 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[10.1.2.207:10909] result: true
bash-4.4#
在另一个节点上运行 Consumer example:
$ kubectl exec -it name-service-0 bash
bash-4.4# sh ./tools.sh org.apache.rocketmq.example.quickstart.Consumer
OpenJDK 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
07:01:32.077 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=273, queueOffset=19845, sysFlag=0, bornTimestamp=1596768410268, bornHost=/30.4.165.204:53450, storeTimestamp=1596768410282, storeHost=/100.81.180.84:10911, msgId=6451B45400002A9F000014F96A0D6C65, commitLogOffset=23061458676837, bodyCRC=532471758, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=19844, TRACE_ON=true, eagleTraceId=1e04a5cc15967684102641001d0db0, MAX_OFFSET=19848, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1596783715858, UNIQ_KEY=1E04A5CC0DB0135FBAA421365A5F0000, WAIT=true, TAGS=TagA, eagleRpcId=9.1}, body=[72, 101, 108, 108, 111, 32, 77, 101, 116, 97, 81, 32, 48], transactionId='null'}]]
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=1, storeSize=273, queueOffset=19637, sysFlag=0, bornTimestamp=1596768410296, bornHost=/30.4.165.204:53450, storeTimestamp=1596768410298, storeHost=/100.81.180.84:10911, msgId=6451B45400002A9F000014F96A0D7141, commitLogOffset=23061458678081, bodyCRC=1757146968, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=19636, TRACE_ON=true, eagleTraceId=1e04a5cc15967684102961002d0db0, MAX_OFFSET=19638, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1596783715858, UNIQ_KEY=1E04A5CC0DB0135FBAA421365AB80001, WAIT=true, TAGS=TagA, eagleRpcId=9.1}, body=[72, 101, 108, 108, 111, 32, 77, 101, 116, 97, 81, 32, 49], transactionId='null'}]]
...
清除 RocketMQ 服务集群实例:
$ kubectl delete -f example/rocketmq_v1alpha1_rocketmq_cluster.yaml
清除 RocketMQ Operator:
$ ./purge-operator.sh
选择 Streaming & Messaging 类别,点击 RocketMQ Operator:
参考:OLM 安装文档。
$ make run-console-local
OperatorHub
已安装的 Operators 界面
RocketMQ Operator 介绍界面
通过 UI 界面创建 NameService 自定义资源
可以在 UI 中创建指定 Namespace 下的 NameService 和 Broker 实例,并对已创建的实例进行浏览和管理。我们也可以通过命令查看当前 K8s 集群中的 Pod 状态,例如:
本文系作者在时代Java发表,未经许可,不得转载。
如有侵权,请联系nowjava@qq.com删除。