Kafka
概念名词
kafka [消息列队]
Kafka™ 是一个分布式流处理系统,这是什么意思呢?
我们认为一个流数据平台具有三个主要功能
1.它允许您发布和订阅流记录。在这方面,它类似于一个消息队列或企业消息传递系统。
2.它能让你以容错方式进行流数据的存储。
3.数据产生时你就可以进行流数据处理。
Kafka擅长哪些地方?
它被用于两大类别的应用程序
1.建立实时流数据通道,这个通道可以可靠的获取在系统或应用间的数据。
2.建立实时流媒体应用来转换流数据或对流数据做出反应
首先是几个概念:
kafka作为集群运行在一台或多台服务器。
Kafka群集存储流记录的类别称为主题(topics)
Kafka的每条记录包含一个键,一个值和一个时间戳。
Kafka 有个核心API:
Producer API 允许应用推送流记录到一个或多个Kafka主题上。
Consumer API 允许应用程序订阅一个或多个主题并且并处理产生的流记录
Streams API 允许应用程序作为一个流处理器,从一个或多个主题获取流数据,然后输出流数据到一个或多个主题,有效地将输入流转换为输出流。
Connector API 允许构建和运行可重用的生产者(Producer)
或消费者(Consumer)连接Kafka与现有应用程序或数据系统。例如,一个连接器(connector)在关系数据库中可能获取每个变化的表。
实践
配置
1 2 3 4 5 6 7 8 9 10 11 12
| exports.kafka = { host: 'localhost:9092', producerConfig: { // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0 partitionerType: 1, }, consumerTopics: [{ topic: 'test', partition: 0, }, ], };
|
订阅消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| 'use strict'
const kafka = require('kafka-node') module.exports = async app => { app.beforeStart(async () => { const ctx = app.createAnonymousContext() const Producer = kafka.Producer const client = new kafka.KafkaClient({ kafkaHost: app.config.kafka.host, }) const producer = new Producer(client, app.config.kafka.producerConfig) producer.on('error', function(err) { console.error('ERROR: [Producer] ' + err) }) producer.on('ready', function() { app.logger.warn('【启动订阅者成功】') }) app.producer = producer
const consumer = new kafka.Consumer(client, app.config.kafka.consumerTopics, { autoCommit: true, }) consumer.on('message', async function(message) { app.logger.warn('【收到消息】:', message.value) try { // 处理接收到逻辑 // ctx.runInBackground(async () => { // await ctx.service.log.insert(JSON.parse(message.value)) // }) consumer.commit(true, (err, data) => { if (err) { console.error('commit>>err:', err) } else { // console.log('commit>>data:', data) } }) } catch (error) { console.error('ERROR: [GetMessage] ', message, error) } }) consumer.on('error', function(err) { console.error('ERROR: [Consumer] ' + err) }) }) }
|
发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| async sendKafka() { const { ctx, app, service, } = this; console.log('发送消息队列->'); const payloads = [{ topic: 'test', messages: '2 ' + new Date(), partition: 0, }, { topic: 'test', messages: [ '3 ' + new Date(), '4 ' + new Date() ], }, ]; app.producer.send(payloads, function(err, data) { console.log('data', data); if (!err) { ctx.helper.success({ ctx, res: data, }); } }); }
|
基本命令
安装:
kafka依赖java环境,因此你可能需要先安装好java环境。
1 2 3 4 5
| // mac 环境使用brew直接安装kafka brew install kafka
// 卸载 brew uninstall kafka
|
mac 安装kafka过程中会自动的安装好zookeeper。
启动:
1 2 3 4 5 6 7
| // 启动 brew services start kafka brew services start zookeeper
// 重启 brew services restart kafka brew services restart zookeeper
|
查看kafaka版本:
1 2 3 4 5 6 7
| cd kafka
# 执行以下命令 find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
drwxrwxr-x. 6 root root 117 May 18 2016 kafka_2.11-0.10.0.0 获得了版本为2.11-0.10.0.0。
|
前台启动Zookeeper 和kafka
1 2 3
| bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
|
后台启动Zookeeper 和kafka
1 2 3 4 5 6 7
| bin/zookeeper-server-start.sh config/zookeeper.properties 1>/dev/null 2>&1 &
bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
|
其中1>/dev/null 2>&1 是将命令产生的输入和错误都输入到空设备,也就是不输出的意思。/dev/null代表空设备。
关闭命令
1 2 3
| bin/zookeeper-server-stop.sh config/zookeeper.properties
bin/kafka-server-stop.sh config/server.properties
|
kafka配置说明:
配置文件server.properties
1 2 3 4 5
| // mac 电脑配置文件地址 vim /usr/local/etc/kafka/server.properties
// Linux 配置文件地址 vim /usr/src/kafka/config/server.properties
|
备注: 尽量配置host.name, 例如本地的配置
1 2
| host.name = 127.0.0.1 port=9092
|
常用基本命令:
创建一个主题(topic)
1 2 3 4 5
| // mac kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
// linux bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
|
删除topic
1 2 3 4 5
| // mac kafka-topics --delete --zookeeper localhost:2181 --topic 【topic name】
// linux bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic 【topic name】
|
查看创建的topic list
1 2 3 4 5
| // mac kafka-topics --list --zookeeper localhost:2181
// linux bin/kafka-topics.sh --list --zookeeper localhost:2181
|
查看详情
1 2
| // linux bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic
|
生产消息
1 2 3 4 5
| // mac kafka-console-producer --broker-list localhost:9092 --topic test
// linux bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
|
消费消息
1 2 3 4 5
| // mac kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
// linux bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
|
zookeeper
安装
1
| sudo apt-get install zookeeper
|
默认信息
1 2 3 4
| #安装路径 /usr/share/zookeeper #配置文件 /etc/zookeeper/conf/zoo.cfg
|
卸载
1 2 3 4
| sudo apt-get remove zookeeper sudo apt-get remove --auto-remove zookeeper sudo apt-get purge zookeeper sudo apt-get purge --auto-remove zookeeper
|