kafka消息列队

Kafka

概念名词

kafka [消息列队]
Kafka™ 是一个分布式流处理系统,这是什么意思呢?
我们认为一个流数据平台具有三个主要功能

1.它允许您发布和订阅流记录。在这方面,它类似于一个消息队列或企业消息传递系统。
2.它能让你以容错方式进行流数据的存储。
3.数据产生时你就可以进行流数据处理。

Kafka擅长哪些地方?

它被用于两大类别的应用程序

1.建立实时流数据通道,这个通道可以可靠的获取在系统或应用间的数据。

2.建立实时流媒体应用来转换流数据或对流数据做出反应

首先是几个概念:

kafka作为集群运行在一台或多台服务器。

Kafka群集存储流记录的类别称为主题(topics)

Kafka的每条记录包含一个键,一个值和一个时间戳。

Kafka 有个核心API:

Producer API 允许应用推送流记录到一个或多个Kafka主题上。

Consumer API 允许应用程序订阅一个或多个主题并且并处理产生的流记录

Streams API 允许应用程序作为一个流处理器,从一个或多个主题获取流数据,然后输出流数据到一个或多个主题,有效地将输入流转换为输出流。

Connector API 允许构建和运行可重用的生产者(Producer)
或消费者(Consumer)连接Kafka与现有应用程序或数据系统。例如,一个连接器(connector)在关系数据库中可能获取每个变化的表。

实践

配置
1
2
3
4
5
6
7
8
9
10
11
12
exports.kafka = {
host: 'localhost:9092',
producerConfig: {
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
partitionerType: 1,
},
consumerTopics: [{
topic: 'test',
partition: 0,
},
],
};
订阅消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
'use strict'

const kafka = require('kafka-node')
module.exports = async app => {
app.beforeStart(async () => {
const ctx = app.createAnonymousContext()
const Producer = kafka.Producer
const client = new kafka.KafkaClient({
kafkaHost: app.config.kafka.host,
})
const producer = new Producer(client, app.config.kafka.producerConfig)
producer.on('error', function(err) {
console.error('ERROR: [Producer] ' + err)
})
producer.on('ready', function() {
app.logger.warn('【启动订阅者成功】')
})
app.producer = producer

const consumer = new kafka.Consumer(client, app.config.kafka.consumerTopics, {
autoCommit: true,
})
consumer.on('message', async function(message) {
app.logger.warn('【收到消息】:', message.value)
try {
// 处理接收到逻辑
// ctx.runInBackground(async () => {
// await ctx.service.log.insert(JSON.parse(message.value))
// })
consumer.commit(true, (err, data) => {
if (err) {
console.error('commit>>err:', err)
} else {
// console.log('commit>>data:', data)
}
})
} catch (error) {
console.error('ERROR: [GetMessage] ', message, error)
}
})
consumer.on('error', function(err) {
console.error('ERROR: [Consumer] ' + err)
})
})
}

发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async sendKafka() {
const {
ctx,
app,
service,
} = this;
console.log('发送消息队列->');
const payloads = [{
topic: 'test',
messages: '2 ' + new Date(),
partition: 0,
},
{
topic: 'test',
messages: [ '3 ' + new Date(), '4 ' + new Date() ],
},
];
app.producer.send(payloads, function(err, data) {
console.log('data', data);
if (!err) {
ctx.helper.success({
ctx,
res: data,
});
}
});
}

基本命令

安装:
kafka依赖java环境,因此你可能需要先安装好java环境。

1
2
3
4
5
// mac 环境使用brew直接安装kafka
brew install kafka

// 卸载
brew uninstall kafka

mac 安装kafka过程中会自动的安装好zookeeper。

启动:

1
2
3
4
5
6
7
// 启动
brew services start kafka
brew services start zookeeper

// 重启
brew services restart kafka
brew services restart zookeeper

查看kafaka版本:

1
2
3
4
5
6
7
cd kafka

# 执行以下命令
find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'

drwxrwxr-x. 6 root root 117 May 18 2016 kafka_2.11-0.10.0.0
获得了版本为2.11-0.10.0.0。

前台启动Zookeeper 和kafka

1
2
3
bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

后台启动Zookeeper 和kafka

1
2
3
4
5
6
7
bin/zookeeper-server-start.sh config/zookeeper.properties 1>/dev/null  2>&1  &

bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

bin/kafka-server-start.sh -daemon config/server.properties

其中1>/dev/null 2>&1 是将命令产生的输入和错误都输入到空设备,也就是不输出的意思。/dev/null代表空设备。

关闭命令

1
2
3
bin/zookeeper-server-stop.sh config/zookeeper.properties

bin/kafka-server-stop.sh config/server.properties

kafka配置说明:
配置文件server.properties

1
2
3
4
5
// mac 电脑配置文件地址
vim /usr/local/etc/kafka/server.properties

// Linux 配置文件地址
vim /usr/src/kafka/config/server.properties

备注: 尽量配置host.name, 例如本地的配置

1
2
host.name = 127.0.0.1
port=9092

常用基本命令:
创建一个主题(topic)

1
2
3
4
5
// mac
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

// linux
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

删除topic

1
2
3
4
5
// mac 
kafka-topics --delete --zookeeper localhost:2181 --topic 【topic name】

// linux
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic 【topic name】

查看创建的topic list

1
2
3
4
5
// mac
kafka-topics --list --zookeeper localhost:2181

// linux
bin/kafka-topics.sh --list --zookeeper localhost:2181

查看详情

1
2
// linux
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic

生产消息

1
2
3
4
5
// mac 
kafka-console-producer --broker-list localhost:9092 --topic test

// linux
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

消费消息

1
2
3
4
5
// mac 
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

// linux
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

zookeeper

安装
1
sudo apt-get install zookeeper

默认信息

1
2
3
4
#安装路径
/usr/share/zookeeper
#配置文件
/etc/zookeeper/conf/zoo.cfg
卸载
1
2
3
4
sudo apt-get remove zookeeper
sudo apt-get remove --auto-remove zookeeper
sudo apt-get purge zookeeper
sudo apt-get purge --auto-remove zookeeper