Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
Kafka 是一种高吞吐量的分布式发布订阅消息系统,有如下特性:
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量 :即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
支持通过Kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。
如果你是一名后端工程师,设计的应用正常的线上运行,某次秒杀活动,突然间把系统搞崩了,排查系统发现很多的流量没有处理,导致系统挂了,这个时候有两种思路: 1. nginx 反向代理,把更多的请求转发给内部网络的服务器上进行处理,达到一个负载均衡的目的 2. 使用消息系统,将更多的请求使用中间件“缓存”起来,再从这个系统中不断的取到缓存的请求,进行进一步的处理。
后者使用到的消息系统,就是kafka 的一个使用场景。
那么什么是 kafka?
kafka 是一个分布式消息系统,目前已定位为分布式流式处理平台。
简单的说一个系统A 将消息发给消息系统,一个系统B 再从消息系统中取到消息,进行后续的处理。
常见的用来描述 kafka 应用场景的一个词是:削峰填谷,削减波峰流量,填充波谷流量,使系统尽量的平滑。
由此得处:kafka 的三个典型应用场景
消息系统
存储系统
分布式流式处理平台
消息系统是目前最广泛的应用;消息传输需要存储起来,供后续系统拉取,故也可以当作存储系统;拉取消息之后,其实也是供后续系统处理,那么为什么不把数据处理也包含再kafka 系统中?分布式流式处理平台,大概就是这个意思。
下文陈述最核心的应用:消息系统
一条消息由系统A 产生,发往消息系统,系统B 从消息系统中拉取,这其中涉及到很多的概念。
系统A 称为生产者 producer,目的是发送消息
消息系统称为 broker,本质是服务进程目的是接受生产者的消息、消费者的消息拉取请求、持久化
系统B 称为消费者 consumer, 目的是拉取消息系统中的消息
针对生产者、消费者有不同的设置参数,决定了生产者、消费者的不同行为。
生产者要发送消息,首先要知道发往何处,即要知道 broker 的地址,知道 broker 的地址,broker(kafka server) 的设置约束了持久化存储的地址及其他行为,除此之外,如何区分发的消息的类型不同呢?kafka 系统给这个区分消息的概念取了个逻辑概念:Topic , 即生产者指定的 Topic 不同,存储的地址就不同。
针对 Topic,简单的场景是,不断的往里面发内容,持久化存储就不断以追加的模式存储,简单场景没什么问题,问题是消息数据过多的话,不利于系统消费,很简单的想法,分不同的“文件”追加存储,把整体规模缩小,这个概念在 kafka 中称之为分区:partition. 消息可以不断的以追加的模式不断的发往分区内,分区有编号,起始位 0 ,消息追加模式存储在分区内,会给一个编号 offset
消费者从 broker 系统中拉取消息,首先要知道broker 地址,其次需要知道 Topic,更细化的还可以设置哪个分区,哪个偏移量 offset 开始,消费消息。
那消息万一丢了咋整?一个简单的做法就是冗余备份:Replication,多份备份,其中有一个是 Leader , 其他的是 follower, leader 的作用是和消息对接,follower 不直接和消息对接,只负责和 leader 对接,不断的同步数据。
多个 broker 构成 kafka 集群,万一一个挂了 kafka 系统依靠 zookeeper 进行重新选举产生新leader。
kafka cluster:
kafka topic: 分区概念
kafka 集群:
基于上述概念:那么如何构建一个Kafka 服务,完成消息系统呢?
启动服务进程:broker
伪代码:
type Broker struct{ Addr Config ... }
生产者连接 broker
伪代码:
type Producer struct{ Config Message ... }
消费者连接 broker
伪代码
type Consumer strcut{ Config Topic Partitions Offset ... }
基本的思路:
启动kafka服务
系统A 连接服务,发送消息
系统B 连接服务,消费消息
结合官网的示例:如何完成最基本的消息收发。
下载安装包:kafka_2.12-2.3.0.tgz
2.12 指编译器版本
2.3.0 指kafka 版本
解压之后,最重要的有两目录:
bin : 一系列的脚本,比如启动 zookeeper 服务,创建 topic,生产者生产消息,消费者消费消息等
zookeeper-server-start.sh zookeeper-server-stop.sh kafka-configs.sh kafka-console-consumer.sh kafka-console-producer.sh kafka-consumer-groups.sh kafka-topics.sh kafka-server-start.sh kafka-server-stop.sh ...
config: 配置文件:比如配置 zookeeper 端口,配置kafka 日志存储目录、对外端口,消息最大容量,保存时常等
zookeeper.properties server.properties producer.properties consumer.properties ...
大概200多个参数吧,不好意思,我记不住。那怎么办?不学了吗,那挣不了钱,涨不了工资啊。
基本默认设置,部分按分类设置:
zookeeper.properties
kafka 依赖于 zookeeper 分布式协调
dataDir=/tmp/zookeeper clientPort=2181
记住这个默认的 clientPort=2181
server.properties
kafka server 服务
log.dirs=/tmp/kafka-logs //日志存储目录 log.retention.hours=168 // 日志存储时长 broker.id=0 // 默认 broker id,集群方式的 kafka 设置,给每个 broker 编号 listeners=PLAINTEXT://:9092 // 对外提供的服务入口地址 zookeeper.connect=localhost:2181 // ZooKeeper集群地址 ...
producer.properties
约定消息等的内容
consumer.properties
约定消费消息等的内容
配置好配置参数后:
启动 zookeeper
> bin/zookeeper-server-start.sh config/zookeeper.properties
启动 kafka 服务进程
> bin/kafka-server-start.sh config/server.properties
创建topic, 查询 topic 等可以使用:kafka-topics.sh
生产者生产消息可以使用:kafka-console-producer.sh
消费者消费消息可以使用:kafka-console-consumer.sh
当然,这些操作,一般只供测试使用,实际的使用是使用对应变成语言的客户端。
kafka go版本客户端:
下载安装:
go get -u -v github.com/Shopify/sarama
系统 A
生产者
type KafkaAction struct { DataSyncProducer sarama.SyncProducer DataAsyncProducer sarama.AsyncProducer } // 同步方式 func newDataSyncProducer(brokerList []string) sarama.SyncProducer { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message config.Producer.Retry.Max = 5 // Retry up to 10 times to produce the message config.Producer.Return.Successes = true config.Producer.Partitioner = sarama.NewRoundRobinPartitioner producer, err := sarama.NewSyncProducer(brokerList, config) if err != nil { log.Fatalln("Failed to start Sarama producer1:", err) } return producer } // 异步方式 func newDataAsyncProducer(brokerList []string) sarama.AsyncProducer { config := sarama.NewConfig() sarama.Logger = log.New(os.Stdout, "[KAFKA] ", log.LstdFlags) config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack config.Producer.Compression = sarama.CompressionSnappy // Compress messages config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms config.Producer.Partitioner = sarama.NewRoundRobinPartitioner producer, err := sarama.NewAsyncProducer(brokerList, config) if err != nil { log.Fatalln("Failed to start Sarama producer2:", err) } go func() { for err := range producer.Errors() { log.Println("Failed to write access log entry:", err) } }() return producer }
还记得生产者有一系列配置参数吗?config 就这这个作用,有默认值,可以自己设置对应的值。
比如:压缩算法
config.Producer.Compression = sarama.CompressionSnappy
常用的压缩算法有:
gzip
snappy
lz4
zstd
不同的压缩算法主要在压缩比和吞吐量不同。
比如分区规则
config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
常用的分区规则:
轮询机制
随机分区
按 key 分区
比如:发送消息是否返回成功与否
onfig.Producer.RequiredAcks = sarama.WaitForLocal
消息:生产者只传递字节组数据。
接口
type Encoder interface { Encode() ([]byte, error) Length() int }
发送的消息需要实现Encoder 接口,即定义的消息结构体需要实现 Encode 和 Length 方法。
type SendMessage struct { Method string `json:"method"` URL string `json:"url"` Value string `json:"value"` Date string `json:"date"` encoded []byte err error } func (S *SendMessage) Length() int { b, e := json.Marshal(S) S.encoded = b S.err = e return len(string(b)) } func (S *SendMessage) Encode() ([]byte, error) { return S.encoded, S.err }
发送消息
func (K *KafkaAction) Do(v interface{}) { message := v.(SendMessage) // 发送的消息返回分区和偏移量 partition, offset, err := K.DataSyncProducer.SendMessage(&sarama.ProducerMessage{ Topic: TOPIC, Value: &message, }) if err != nil { log.Println(err) return } value := map[string]string{ "method": message.Method, "url": message.URL, "value": message.Value, "date": message.Date, } fmt.Println(fmt.Sprintf("/%d/%d/%+v", partition, offset, value)) }
比如我们按照上面的配置发送消息:topic: topic-golang
partition/offset/value
/0/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /0/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /0/2/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /0/3/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
上文只有一个 partition , offset值不断增加。
创建另外一个 topic, 分10个区。topic: topic-python
在日志中显示成咋样的呢?
// cd log.dirs ; server.properties 中的设置 topic-golang-0 topic-python-0 topic-python-1 topic-python-2 topic-python-3 topic-python-4 topic-python-5 topic-python-6 topic-python-7 topic-python-8 topic-python-9
往 topic-python 中发送日志,分区规则轮询:
/0/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /1/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /2/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /3/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /4/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /5/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /6/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /7/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /8/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /9/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /0/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /1/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
轮询,不断的往分区内存消息。
系统 B
func main() { config := sarama.NewConfig() config.Consumer.Return.Errors = true brokers := []string{"127.0.0.1:9092"} master, err := sarama.NewConsumer(brokers, config) if err != nil { panic(err) } defer func() { if err := master.Close(); err != nil { panic(err) } }() _, e := master.Partitions("topic-python") if e != nil { log.Println(e) } consumer, err := master.ConsumePartition("topic-python", 0, sarama.OffsetOldest) if err != nil { panic(err) } signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) doneCh := make(chan struct{}) go func() { for { select { case err := <-consumer.Errors(): fmt.Println(err) case msg := <-consumer.Messages(): fmt.Println("Received messages", string(msg.Key), string(msg.Value), msg.Topic) case <-signals: fmt.Println("Interrupt is detected") doneCh <- struct{}{} } } }() <-doneCh }
消费者指定了 topic: topic-python
消费者指定了 partition: 0
还记得生产者向 topic-python 内发送的消息吗?
partition/offset/value
/0/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /1/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /2/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /3/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /4/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /5/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /6/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /7/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /8/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /9/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /0/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4] /1/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
可以看出:partition: 0 中有两条消息。那么消费者指定了分区,只能消费这两条消息。
Received messages {"method":"get5","url":"www.baidu.com4","value":"da4","date":"12344"} topic-python Received messages {"method":"get5","url":"www.baidu.com4","value":"da4","date":"12344"} topic-python
使用 kafka 客户端 ,那么我们还需要哪些功能?
关于 Topic 的创建、描述、删除等
消费者组描述等
元信息:metadata
本文系作者在时代Java发表,未经许可,不得转载。
如有侵权,请联系nowjava@qq.com删除。