Kafka

概念名词

kafka [消息列队]
Kafka™ 是一个分布式流处理系统,这是什么意思呢?
我们认为一个流数据平台具有三个主要功能

1.它允许您发布和订阅流记录。在这方面,它类似于一个消息队列或企业消息传递系统。
2.它能让你以容错方式进行流数据的存储。
3.数据产生时你就可以进行流数据处理。

Kafka擅长哪些地方?

它被用于两大类别的应用程序

1.建立实时流数据通道,这个通道可以可靠的获取在系统或应用间的数据。

2.建立实时流媒体应用来转换流数据或对流数据做出反应

首先是几个概念:

kafka作为集群运行在一台或多台服务器。

Kafka群集存储流记录的类别称为主题(topics)

Kafka的每条记录包含一个键,一个值和一个时间戳。

Kafka 有个核心API:

Producer API 允许应用推送流记录到一个或多个Kafka主题上。

Consumer API 允许应用程序订阅一个或多个主题并且并处理产生的流记录

Streams API 允许应用程序作为一个流处理器,从一个或多个主题获取流数据,然后输出流数据到一个或多个主题,有效地将输入流转换为输出流。

Connector API 允许构建和运行可重用的生产者(Producer)
或消费者(Consumer)连接Kafka与现有应用程序或数据系统。例如,一个连接器(connector)在关系数据库中可能获取每个变化的表。

实践

配置
1
2
3
4
5
6
7
8
9
10
11
12
exports.kafka = {
host: 'localhost:9092',
producerConfig: {
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
partitionerType: 1,
},
consumerTopics: [{
topic: 'test',
partition: 0,
},
],
};
订阅消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
'use strict'

const kafka = require('kafka-node')
module.exports = async app => {
app.beforeStart(async () => {
const ctx = app.createAnonymousContext()
const Producer = kafka.Producer
const client = new kafka.KafkaClient({
kafkaHost: app.config.kafka.host,
})
const producer = new Producer(client, app.config.kafka.producerConfig)
producer.on('error', function(err) {
console.error('ERROR: [Producer] ' + err)
})
producer.on('ready', function() {
app.logger.warn('【启动订阅者成功】')
})
app.producer = producer

const consumer = new kafka.Consumer(client, app.config.kafka.consumerTopics, {
autoCommit: true,
})
consumer.on('message', async function(message) {
app.logger.warn('【收到消息】:', message.value)
try {
// 处理接收到逻辑
// ctx.runInBackground(async () => {
// await ctx.service.log.insert(JSON.parse(message.value))
// })
consumer.commit(true, (err, data) => {
if (err) {
console.error('commit>>err:', err)
} else {
// console.log('commit>>data:', data)
}
})
} catch (error) {
console.error('ERROR: [GetMessage] ', message, error)
}
})
consumer.on('error', function(err) {
console.error('ERROR: [Consumer] ' + err)
})
})
}

发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async sendKafka() {
const {
ctx,
app,
service,
} = this;
console.log('发送消息队列->');
const payloads = [{
topic: 'test',
messages: '2 ' + new Date(),
partition: 0,
},
{
topic: 'test',
messages: [ '3 ' + new Date(), '4 ' + new Date() ],
},
];
app.producer.send(payloads, function(err, data) {
console.log('data', data);
if (!err) {
ctx.helper.success({
ctx,
res: data,
});
}
});
}

基本命令

安装:
kafka依赖java环境,因此你可能需要先安装好java环境。

1
2
3
4
5
// mac 环境使用brew直接安装kafka
brew install kafka

// 卸载
brew uninstall kafka

mac 安装kafka过程中会自动的安装好zookeeper。

启动:

1
2
3
4
5
6
7
// 启动
brew services start kafka
brew services start zookeeper

// 重启
brew services restart kafka
brew services restart zookeeper

查看kafaka版本:

1
2
3
4
5
6
7
cd kafka

# 执行以下命令
find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'

drwxrwxr-x. 6 root root 117 May 18 2016 kafka_2.11-0.10.0.0
获得了版本为2.11-0.10.0.0。

前台启动Zookeeper 和kafka

1
2
3
bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

后台启动Zookeeper 和kafka

1
2
3
4
5
6
7
bin/zookeeper-server-start.sh config/zookeeper.properties 1>/dev/null  2>&1  &

bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

bin/kafka-server-start.sh -daemon config/server.properties

其中1>/dev/null 2>&1 是将命令产生的输入和错误都输入到空设备,也就是不输出的意思。/dev/null代表空设备。

关闭命令

1
2
3
bin/zookeeper-server-stop.sh config/zookeeper.properties

bin/kafka-server-stop.sh config/server.properties

kafka配置说明:
配置文件server.properties

1
2
3
4
5
// mac 电脑配置文件地址
vim /usr/local/etc/kafka/server.properties

// Linux 配置文件地址
vim /usr/src/kafka/config/server.properties

备注: 尽量配置host.name, 例如本地的配置

1
2
host.name = 127.0.0.1
port=9092

常用基本命令:
创建一个主题(topic)

1
2
3
4
5
// mac
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

// linux
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

删除topic

1
2
3
4
5
// mac 
kafka-topics --delete --zookeeper localhost:2181 --topic 【topic name】

// linux
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic 【topic name】

查看创建的topic list

1
2
3
4
5
// mac
kafka-topics --list --zookeeper localhost:2181

// linux
bin/kafka-topics.sh --list --zookeeper localhost:2181

查看详情

1
2
// linux
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic

生产消息

1
2
3
4
5
// mac 
kafka-console-producer --broker-list localhost:9092 --topic test

// linux
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

消费消息

1
2
3
4
5
// mac 
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

// linux
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

zookeeper

安装
1
sudo apt-get install zookeeper

默认信息

1
2
3
4
#安装路径
/usr/share/zookeeper
#配置文件
/etc/zookeeper/conf/zoo.cfg
卸载
1
2
3
4
sudo apt-get remove zookeeper
sudo apt-get remove --auto-remove zookeeper
sudo apt-get purge zookeeper
sudo apt-get purge --auto-remove zookeeper

安装

1
2
npm install xlsx --save
npm install file-saver --save
使用
1
2
import XLSX from 'xlsx';
import FileSaver from 'file-saver';

概念名词

workbook [excel文档]
首先是workbook,可以把它看作是一个excel文档。
我们要用xlsx导出excel,就是要按规范创建workbook
一个简单的workbook对象是这样的

1
2
3
4
const workbook = {
SheetNames: [], //工作表名数组
Sheets: {} //工作表对象 键名对应SheetNames的key
}

sheet [工作表]
从workbook对象下面延伸出来的是sheet对象,很容易理解,它对应这个excel文档下面的工作表
一个workbook可以有多个sheet,就像下面这个对象这样,等于是有三个工作表的excel文档
sheet的常用配置项:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
const workbook = {
SheetNames: [ //工作表名数组
'Sheet1', 'Sheet2', 'Sheet2',
],
Sheets: { //工作表对象
Sheet1: { //工作表1
'!ref': 'A1:C2', //工作表的范围 如 必须 否则不显示
'!merges': [ //工作表单元格合并配置项 可选
{
s: { //s start 开始
c: 1,//cols 开始列
r: 0 //rows 开始行
},
e: {//e end 结束
c: 4,//cols 结束列
r: 0 //rows 结束行
}
}
],
'!cols': [ //工作表列宽配置项 可选
{
/* visibility */
hidden? : boolean, // if true, the column is hidden
/* column width is specified in one of the following ways: */
wpx? : number, // width in screen pixels
width? : number, // width in Excel's "Max Digit Width", width*256 is integral
wch? : number, // width in characters
/* other fields for preserving features from files */
MDW? : number, // Excel's "Max Digit Width" unit, always integral
}
],
'!rows': [ //工作表列高配置项 可选
{
/* visibility */
hidden? : boolean, // if true, the row is hidden
/* row height is specified in one of the following ways: */
hpx? : number, // height in screen pixels
hpt? : number, // height in points
level? : number, // 0-indexed outline / group level
}
],
},
Sheet2: {}, //工作表2
Sheet3: {} //工作表3
}
}

cell [单元格]
从sheet 对象下面延伸出来的是cell 对象,对应的是工作表下的单元格,cell 对象键名是A1,B1这样与excel一致的键名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const workbook = {
FileName: 'export.xlsx',
SheetNames: ['Sheet1'],
Sheets: {
Sheet1:{
'!ref': 'A1:B2',
A1:{},
A2:{},
B1:{},
B2:{}
}
}
}

js-xlsx提供了一个方法XLSX.utils.encode_cell来转换键名

1
2
3
4
XLSX.utils.encode_cell({
c: 0, //cols 列
r: 0 //rows 行
};

知道相应的行列值就能方便的转换成键名了,需要注意的是行列是从0开始不是从1开始

1
2
3
4
5
XLSX.utils.encode_cell({c: 0, r: 0});//A1
XLSX.utils.encode_cell({c: 0, r: 1});//A2
XLSX.utils.encode_cell({c: 1, r: 0});//B1
XLSX.utils.encode_cell({c: 1, r: 1});//B2
XLSX.utils.encode_cell({c: 100, r: 100});//CW101
cell单元格对象的配置项
键名 作用
v 初始值 (请参见数据类型t部分)
w 格式化文本 (如果适用)
t 单元格数据类型: b Boolean, n Number, e error, s String, d Date
f 单元格编码为A1样式的字符串(如果适用)
F 如果公式为数组公式,则包含数组的范围(如适用)
r 富文本编码(如适用)
h HTML 富文本呈现(如适用)
c 单元格注释
z 与单元格关联的数字格式字符串(如果请求)
l 单元格超链接对象(.Target holds link, .Tooltip is tooltip)
s 单元格的样式/主题(如果适用)
XLSX.write(workbook, options) 生成excel数据

第一个参数传入写好的workbook,第二个参数是配置项

write options 配置项
键名 默认值 描述
type 输出数据编码(见下表Output Type)
cellDates false 存储日期为d类型(默认为n)
bookSST false 生成共享字符串表
bookType “xlsx” 文档格式类型
sheet “” 单页格式的工作表名称
compression false 对基于ZIP的格式使用ZIP压缩
Props 重写工作簿时重写工作簿属性
themeXLSX Override theme XML when writing XLSX/XLSB/XLSM
Output Type
type output
“base64” string: Base64 encoding of the file
“binary” string: binary string (byte n is data.charCodeAt(n))
“string” string: JS string (characters interpreted as UTF8)
“buffer” nodejs Buffer
“file” string: path of file that will be created (nodejs only)

感觉上面学习的内容已经足够导出一个简单的excel了,我先来试下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
const workbook = {
SheetNames: ['Sheet1'],
Sheets: {
Sheet1: {
'!ref': 'A1:C2',
A1: {v: '标题1', t: 's'},
B1: {v: '标题2', t: 's'},
A2: {v: '第2行第1列', t: 's'},
B2: {v: '第2行第2列', t: 's'},
}
}
};

const workbookOut = XLSX.write(workbook, {
bookType: 'xlsx',
bookSST: false,
type: 'binary'
});

FileSaver.saveAs(new Blob([s2ab(workbookOut)], {
type: 'application/octet-stream'
}), 'export.xlsx')

运行 顺利生成export.xlsx

前提:

1.服务器:阿里云OSS
2.前端:vue或小程序
3.后端:egg

一、上传概述

场景:小程序+阿里云OSS
现在阿里云上传权限分两类 RAM和STS,所以针对两者的上传方式进行一次全解析。

二、上传文件的几种方式

2.1.客户端直传:

通过客户端直接签名的方式,进行直传,具体见例子: web前端直传,文档中已经讲的很详细了,这里就不赘述了,我们说一下它的缺点,缺点很明显:就是因为账户和秘钥都保存在js中,而用户可以通过前端debug(按F12)查阅到,秘钥完全暴露非常不安全。

2.2.上传到服务器再通过服务器直传

当然这种情况不在我们的考虑范围内,今天主要讲WEB直传的方式,但是基于node我们还是提一下。我在查询资料的时候,发现有人先把文件上传到服务器,再通过服务器后端直传到OSS系统,然后再把服务器上的文件删除。详见上传文件

这样的确会让上传安全很多,但是随之而来的问题就是会加大服务器的压力,每一次上传都要删除本地的文件。很占用带宽,如果上传量大的时候,容易出现问题。

2.3.服务器端签名后直传

这种情况相对于上一种,较安全,秘钥是保存在服务器端。通过服务器向阿里云请求获取签名(临时token)返回给前端,签名正确后直传OSS系统。这种情况下相较于前两种方式是比较好的。既可以保证安全性,又可以减轻服务器端的压力。

三、OSS实践

为什么要自动化部署,我这里就不多赘述了。

基于shipit-deploy的自动化部署,可以实现以下效果

一键部署多台服务器。
一键回滚多台服务器。
本地操作,不需要登录服务器。
方便定制,方便扩展,实现自动化。

使用

1.下载安装

1
2
npm install --save-dev shipit-cli
npm install --save-dev shipit-deploy

2.与服务器建立信任关系

1
ssh-copy-id USER@REMOTE_HOST

3.配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
module.exports = function (shipit) {
require('shipit-deploy')(shipit);
require('shipit-pm2')(shipit);
require('shipit-cnpmjs')(shipit);

shipit.initConfig({
default: {
workspace: '/tmp/github-monitor',
deployTo: '/tmp/deploy_to', //服务器的目标路径
repositoryUrl: 'https://github.com/user/repo.git', //git仓库地址
ignores: ['.git', 'node_modules'], //排除的文件
keepReleases: 2, //发布保留的版本数量
deleteOnRollback: false,
key: '/path/to/key',
shallowClone: true,
cnpm: {
remote: false
}
},
dev: { //开发服务器部署
servers: ['user@devServer1', 'user@devServer1'],
branch: 'dev' //需要发布的git分支,
pm2: {
json: 'pm2-dev-app.json' //开发环境的pm2启动配置
}
},
prod: { //生产服务器部署
servers: ['user@prodServer1', 'user@prodServer2'],
branch: 'master' //需要发布的git分支,
pm2: {
json: 'pm2-prod-app.json' //生产jam环境的pm2启动配置
}
}
});
};
pm2-prod-app.json 示例:

{
"apps": [
{
"name": "frontend_name",
"script": "app.js",
"args": "--env=production",
"instances": 1,
"cwd": "/tmp/production_path/current",
"env": {
"NODE_ENV": "production",
"PORT": "9001"
}
}
]
}

当然,我们也可以使用以下脚本来启动项目

1
pm2 startOrRestart pm2-prod-app.json

发布

1
2
3
4
5
shipit dev deploy //开发环境发布
shipit dev rollback //回滚

shipit prod deploy //生产环境发布
shipit prod rollback //回滚

1. 生成公钥和私钥

ssh-keygen -t rsa
按照提示输入完后,会在~/.ssh目录下生成id_rsa和id_rsa.pub这两个文件

2.与服务器建立联系

ssh-copy-id root@192.168.0.100 //示例ip
接下来会要求输入连接密码,验证成功后

3.无密码登陆服务器

尝试以下命令,看是不是直接登陆成功了:
ssh root@192.168.0.100
就是这么简单

include 不对parent造成过滤

1
2
3
4
5
6
7
8
9
10
11
12
include: [
{
attributes: ['id'],
model: models.Post,
where: {
isActive: 1
},
required: false
// 解决的代码就是这一行,加上 required: false.
//默认required=true,include 会去对 parent 造成过滤
}
]

findAllAndCount在有include的时候count条数不对


Sequelize查询的数据表只有10条数据,但查询结果是20,只要不加include条件就是正确的10,是带上了include中的model表的数据的10条

1
2
distinct:true 
//这一句可以去重,它返回的 count 不会把你的 include 的数量算进去

说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
const res = await ctx.model.Guarantee.findAll({
// where,
// where: {
// guarantee_num: '111',
// '$project.project_name$': '龙岗街道育贤小学教学楼等楼宇整治改造工程'
// },
where: {
guarantee_num: '111',
// '$project.project_name$': '龙岗街道育贤小学教学楼等楼宇整治改造工程'
},
limit,
offset,
include: [{
model: ctx.model.Project,
include: [{
//展示所有嵌套
all: true,
//深度递归嵌套,出来嵌套内容
// nested: true
}],
}],
// 用于返回唯一不同的值
distinct: true,
//如果 plain 是 TRUE ,则 sequelize 将只返回结果集的第一条记录。如果是 FALSE, 则是全部记录。
plain: false,
// 设置为 true,即可返回源数据
raw: false,
order: [
['created_at', 'DESC']
]
})

前提:

1.谷歌云
2.node环境、ubuntu

安装node环境

1
2
3
sudo apt install npm
sudo npm install n -g
sudo n stable

安装 shadowsocks

1
npm install -g shadowsocks

找到shadowsocks配置文件路径

linux环境下缺省shadowsocks配置文件路径
/usr/local/lib/node_modules/shadowsocks/config.json

修改配置config.json

1
2
3
4
5
6
7
8
{  
"server":"0.0.0.0",#更改服务器IP
"server_port":9000, #更改连接端口
"local_port":1080,
"password":"pass",#更改密码
"timeout":600,
"method":"aes-256-cfb"
}

启动shadowsocks

1
$ pm2 start ssserver

我有很多自己的个人代码是跑在家里的服务器中的.一般来说都是自动化的处理我的一些生活问题,与数据收集等相关活动.
所以一般没有客户端访问家里服务器的需求.
但是很多时候脑子突发奇想.有一些小点子,一些小的bug或者小的优化,代码修改完成之后,有一个部署到需求.
因为家里服务器没有一个稳定的外网IP的,外网不能直接连接服务器.
所以我们需要 内网穿透

ngrok

选择ngrok的原因很简单,配置方便,并且支持tcp协议.
支持tcp协议代表,我可以直接在外面用SSH来访问家里的机器.

用法

首先你需要去官网注册一个账号
下载ngrok,并且解压到一个你喜欢的目录下面
去官网复制你的授权码
授权ngrok
ngrok authtoken 授权码

http

ngrok http 8080

tcp

ngrok tcp 22

最终你会得到,一个外网可以访问的地址.
用这个地址就可以直接访问到你本机的端口了.

当我们拥有这样一个公网地址之后,我们就可以ssh来控制家里的机器
或者使用github的webhook来做一切你想要做的事情.

环境

1、代码通过git管理,放在了coding上(放在github或者其他平台也都可以)
2、服务器用的阿里云
3、服务器用的ubuntu 16.04
4、客户端用的mac
5、docker镜像平台为dockerhub

准备

1、服务器开通SSH
2、服务器安装Docker
3、掌握基本的shell命令

开始

1、服务器 安装docker
https://docs.docker.com/install/
2、服务器拉取docker jenkins镜像:

1
docker pull jenkins:latest

3.服务器运行dokcer jenkins容器:

1
sudo docker run -d --name jenkins_node -p 49002:8080 -v /var/jenkins_node:/var/jenkins_home jenkins:lates

Map command 键

Mac 和 win的键盘不一样,但使用 Mac 一年时间感觉 Mac 键盘部局还是比较好用的。为了按键匹配把 Alt 和 Win 键换一下,可以直接在 Mac的设置中设置:

Setting -> Keyboard -> Modifier Keys… -> Select Keyboard (choose USB Keyboard) -> switch Option Key and Command Key

这样设置完之后 Win键就和 Command 对应了,Command + C, Command + V 用起来习惯多了。

设置多媒体键

经常听音乐,所以常需要调节音量,但 F1-F12已经被我改为普通功能了,所以不能直接改过来用作多媒体控制(我主要用来控制音量)在 zhihu 上找到有人推荐Karabiner 下载了一个来玩,感觉还是蛮好用的。

这里我将87键键盘的右上角三个键 Ps/SR, ScLk, Pa/Br 键对应到音量静音、减、增三个键,这个功能有预置,可以直接搜索到并 enable 即可:

另外也可以自定义按键对应:

打开 private.xml 输入需要定制的内容

1
2
3
4
5
6
7
8
9
<?xml version="1.0"?>
<!-- 这里我把 Insert 键换成 Launchpad 键-->
<root>
<item>
<name>Insert Key Launchpad</name>
<identifier>private_pc_insert_to_launchpad</identifier>
<autogen>__KeyToKey__ KeyCode::PC_INSERT,KeyCode::LAUNCHPAD</autogen>
</item>
</root>

保存后 reload 即可。

更多例子参考官方样例中的代码

有了高效的工具就剩下好好工作了~

0%