I3OVQU

背景

我们之前在Flink CDC同步数据实践 快速的体验了从mysql和pg获取数据,最后在es打成宽表数据。但是对于其他的数据库如sql server等source表的实际能力还未可知,本次就一次的调研实践一次。

安装:从代码编译

正常情况下直接下载编译好的即可

因为flink cdc新特性在master分支,并且没有release,比如新版sql server在master,未发布release-2.2,我们来从源码编译

1
2
3
git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests

常见FlinkSql命令

首先你得启动吧

1
2
3
4
// 启动集群
bin/start-cluster.sh
// 停止集群
bin/stop-cluster.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 启动集群后,进入flink sql 客户端命令行界面
bin/sql-client.sh embedded

// 表格模式
SET 'sql-client.execution.result-mode' = 'table';
// 变更日志模式
SET 'sql-client.execution.result-mode' = 'changelog';
// Tableau模式
SET 'sql-client.execution.result-mode' = 'tableau';

// 查看当前运行的jobs
bin/flink list
// 查看所有的任务,包括失败、成功、取消的
bin/flink list -a
// 取消命令
bin/flink cancel jobID
1
2
3
4
5
6
7
8
9
SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;
SHOW VIEWS;
SHOW FUNCTIONS;
SELECT CURRENT_TIMESTAMP;
RESET table.planner;
RESET
quit

Source表

DataGen ☑️ 测试通过

在flink 1.11中,内置提供了一个DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
-- optional options --
'rows-per-second'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='1000',
'fields.f_random.min'='1',
'fields.f_random.max'='1000',
'fields.f_random_str.length'='10'
);

select * from datagen;

filesystem ☑️ 测试通过

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE employee_information (
emp_id INT,
name VARCHAR,
dept_id INT
) WITH (
'connector' = 'filesystem',
'path' = '/path/to/something.csv',
'format' = 'csv'
);

SELECT * from employee_information WHERE dept_id = 1;

mongodb ☑️ 测试通过

先决条件:副本集要求,你懂的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE TABLE offices (
_id STRING,
name STRING,
addr STRING,
status BOOLEAN,
PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = '10.8.99.44:27011',
'username' = 'root',
'password' = '@junyao2022',
'database' = 'biocitydb',
'collection' = 'offices'
);

select * from offices;

mysql ☑️ 测试通过

先决条件:binlog开启,你懂的

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE products (
id INT,
name STRING,
description STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products'
);

postgres ☑️ 测试通过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'public',
'table-name' = 'shipments'
);

sql Server ☑️ 测试通过

先决条件:需要先开启sql server的cdc能力:EXEC sys.sp_cdc_enable_db;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE TABLE material (
FMATERIALID INT,
FNUMBER STRING,
PRIMARY KEY (FMATERIALID) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc',
'hostname' = '10.8.99.31',
'port' = '1433',
'username' = 'sa',
'password' = 'XXXsa1999$',
'database-name' = 'dkw',
'schema-name' = 'dbo',
'table-name' = 'T_BD_MATERIAL'
);

select * from material;

Sink表

elasticsearch

http验证通过 ✅
https、ssl,带证书未知如何配置 ❎

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP_LTZ(3),
customer_id INT,
price DECIMAL(10, 5),
product ROW<name STRING, description STRING>,
order_status BOOLEAN,
customer_name STRING,
customer_address STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders',
'username' = 'root',
'password' = 'password'
);

Doris

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE doris_test_sink (
id INT,
name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = 'localhost:8030',
'table.identifier' = 'db_audit.doris_test',
'sink.batch.size' = '2',
'sink.batch.interval'='1',
'username' = 'root',
'password' = ''
)

执行插入Sink表

1
2
3
4
5
INSERT INTO department_counts
SELECT
dept_id,
COUNT(*) as emp_count
FROM employee_information;

相关链接

flink-cdc-connectors

在未使用mongodb副本集引入事务能力前,我们来通过一些例子看看mongodb在没有事务的情况下的影响,并再一次从案例去验证强事务的系统是否适合使用mongodb,以及判断在引入副本集后实际的事务能力。

事务的原子性(Atomic)

背景

某一天,突然发现,我们的一个上传excel需求,上传后提示报错,但是数据正常上传成功了。

数据结构如下:文件表 + 订单表,订单表关联文件表ID

文件表

1
2
3
4
5
6
7
8

{
"_id": ObjectId("61ea6ee776d12fd2c261c105"),
"fileName": "2.xlsx",
"creator": ObjectId("5eba176654c70a2bc8df0719"),
"uploadDate": ISODate("2022-01-21T08:29:27.823Z"),
"creatorName": "张云",
}

订单表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"_id": ObjectId("61ea6d78703190d0efca84df"),
"org": "DKWSH",
"contact": "翁文斌",
"salesId": ObjectId("5ef3214dc15bc674d6976365"),
"remark": null,
"items": [
{
"detailId": "MX21305683",
"materialName": "PerCP anti-human CD11c",
"spec": "100 tests",
"itemNum": "337234",
"batchNumber": "B332675",
"amount": NumberInt("6"),
"manufacturer": "Biolegend"
}
],
"file": ObjectId("61ea6ee776d12fd2c261c105"),
}
1
2
3
4
5
6
7
8
9
// 创建文件
const file = await ctx.model.Delivery.File.create(files)

// 创建订单
const orders = await ctx.model.Delivery.VirtualOrder.create(orders)

// 发送通知
const sendRes = await ctx.app.noticeQueue.addBulk(datas)

排查

经过日志排查,发现 创建文件 创建订单都成功了,但是发送通知失败了,错误原因为redis版本过低导致异常无法正常发送。

理论

什么是事务的原子性

  • 一个事务包含多个操作,这些操作要么全都执行,要么全都不执行。
  • 实现事务的原子性,要支持回滚操作,在某个操作失败后,回滚到事务执行前的状态。

结论

我们可以理解为 创建文件 创建订单 发送通知这三个步骤是一个事务,要么全部成功,要们全部不执行,
发送通知失败的时候,我们应当将创建文件 创建订单进行回滚,从而达到 创建文件 创建订单 发送通知这三个步骤都不执行。

事务的隔离性(Isolation)

背景

某一天,销售反馈,我的确认操作无法提交了。

数据表如下:订单表 + 发票表 + 发票池表

订单确认操作过程
1、校验订单是否开过发票发票池
2、创建发票池表数据,而后创建发票表

发票表中invoiceNum具有唯一索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 1.校验是否开过发票和发票池

// 2.创建开票池
const invoiceItemOpts = {
org: 'XXX',
order: 'XXX',
detailId: 'XXX',
materialName: 'XXX',
}
const invoiceItems = await ctx.model.Delivery.InvoiceItem.create(
invoiceItemOpts
)

// 3.创建发票
let invoiceOpts = {
invoiceNum: ctx.helper.createInvoiceNum('P', parseInt(next_invoice_num)),
}
const invoice = await ctx.model.Delivery.Invoice.create(invoiceOpts)

排查

经过排查,我们先发现数据库发票编号invoiceNum唯一索引报错了。说明多个请求拿到了同一个invoiceNum发票编号

1
2022-02-21 10:58:17,497 ERROR 30265 [-/116.233.76.38/-/25ms POST /orders/6212ff47fa481876394ee21c/status] error_handler: MongoError: E11000 duplicate key error collection: biocitydb.sys_invoices index: invoiceNum_1 dup key: { invoiceNum: "P2202211321" }

我们继续排查发现一共有2次请求,拿到了同一个invoiceNum发票编号,说明出现了并发问题

第一次请求,销售员王璐成功使用P2202211296发票编号创建了发票,未遇到唯一索引

第二次请求,销售员沈梦婷,因为在几乎同一时刻与销售员王璐发出请求,发票编号未有事务加锁,导致发生了脏读

注意看请求的时间与invoiceNum,发现请求时间几乎同一时刻,相同的发票编号。

1
2
3
4
2022-02-21 10:57:38,020 INFO 30265 发票invoiceOpts {
invoiceNum: 'P2202211296'
saleName: '王璐',
}
1
2
3
4
2022-02-21 10:57:38,022 INFO 30265 发票invoiceOpts {
invoiceNum: 'P2202211296',
saleName: '沈梦婷',
}

并发脏读图解:

T1 王璐 T2 沈梦婷
(1)读发票编号P2202211296
(2)创建发票池 (1)读发票编号P2202211296 -> T1未完成就读取现在的发票编号,导致脏读
(3)创建发票 (2)创建发票池
(4)更新当前发票自增编号 (3)创建发票
(4)更新当前发票自增编号

理论

  • 脏读
    事务A修改了一个数据,但未提交,事务B读到了事务A未提交的更新结果,如果事务A提交失败,事务B读到的就是脏数据。

  • 不可重复读
    同一事务中,对于同一份数据读取到的结果不一致。如事务B在事务A提交前后读取的数据不一致。
    原因:事务并发修改记录。
    解决:加锁。但这会导致锁竞争加剧,影响性能。另一种方法是通过MVCC可以在无锁的情况下,避免不可重复读。

  • 幻读
    同一事务中,同一个查询多次返回的结果不一致。如事务B在事务A提交前后查询到的数据记录变多了。
    原因:并发事务增加记录。
    解决:串行。

事务的隔离级别从低到高有:

  • Read Uncommitted

最低的隔离级别,什么都不需要做,一个事务可以读到另一个事务未提交的结果。所有的并发事务问题都会发生。

  • Read Committed

只有在事务提交后,其更新结果才会被其他事务看见。可以解决脏读问题。

  • Repeated Read

在一个事务中,对于同一份数据的读取结果总是相同的,无论是否有其他事务对这份数据进行操作,以及这个事务是否提交。可以解决脏读、不可重复读

  • Serialization

事务串行化执行,隔离级别最高,牺牲了系统的并发性。可以解决并发事务的所有问题

结论

  • 2个并发请求,导致出现事务的脏读问题,2个并发同时拿到了同一个自增编号(发票编号),mongodb支持的锁机制弱,无法使用悲观锁,虽然乐观锁无法解决脏读,但是可以使用乐观锁+事务回滚。可查看了没有mongodb事务的支持下,我这种思路的解决:分布式锁设计实践
  • 出现脏读问题后,因为数据库有唯一索引,创建失败后,出现多表操作的原子性问题。

事务的一致性(Consistency)

todo

事务的持久性(Durability)

todo

monstache实践

背景

我们已经通过 Enterprise Search 企业搜索实践快速搭建起了搜索引擎,
并且通过评估 mongodb同步elasticSearch方案评估,了解到社区和行业主流monstache同步方案。

我们按照Enterprise Search 企业搜索实践,先创建Engine Schema,提前设置好mapping字段。

设置字段
IRy42Q
查看字段数据
BivxFf

我们来实践一下monstache

monstache配置

假设我们已经有了mongodb和elasticsearch,我们来配置同步设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# 启用调试日志
verbose = true

mongo-url = "mongodb://root:<password>@10.8.99.44:27011/?authSource=admin"
elasticsearch-urls = ["https://<host>:9200"]

# index GridFS files inserted into the following collections
file-namespaces = ["biocitydb.materials"]
# 此选项允许你直接将集合从 MongoDB 复制到 Elasticsearch。 Monstache 允许过滤实际索引到 Elasticsearch 的数据,因此你不一定需要复制整个集合。 在上面,我们同步数据库 test 中的 mycol 集合。
direct-read-namespaces = ["biocitydb.materials"]
# 实时通知以告知 Elasticsearch 所有写入文档,包括指定集合中的删除和更新。
change-stream-namespaces = ["biocitydb.materials"]

namespace-regex = '^biocitydb\.materials$'

# 压缩请求到es
gzip = true

# generate indexing statistics
stats = true

# index statistics into Elasticsearch
index-stats = true

elasticsearch-user = "elastic"
elasticsearch-password = "<password>"

#monstache最多开几个线程同步到es,默认为4
elasticsearch-max-conns = 2

# 证书文件
elasticsearch-pem-file = "/monstache/client.crt.pem"
elasticsearch-validate-pem-file = false

# mongodb删除集合或库时是否同步删除es中的索引
dropped-collections = true
dropped-databases = false

# 更新es而不是覆盖
index-as-update = true

replay = false

# 记录同步位点,便于下次从该位置同步
resume = true

# do not validate that progress timestamps have been saved
resume-write-unsafe = false

# 需要es ingest-attachment
index-files = false

# turn on search result highlighting of GridFS content
file-highlighting = true

# 高可用模式下需要配置集群名称,集群名称一样的进程会自动加入一个集群内,这个是monstance的集群,不是es
cluster-name = '<name>'

# do not exit after full-sync, rather continue tailing the oplog
exit-after-direct-reads = false

# 生产环境以日志文件输出,默认以命令行输出
# [logs]
# info = "./logs/info.log"
# warn = "./logs/wran.log"
# error = "./logs/error.log"
# trace = "./logs/trace.log"

# mapping定义mongodb数据到es的索引名称和type,namespace是库名.集合名
# 这里需要注意一件事:最好是在es中创建好你要的索引结构,关闭es的自动创建索引功能
[[mapping]]
namespace = "biocitydb.materials"
index = "materials"

[[script]]
namespace = "biocitydb.materials"
path = "./scripts/materials.js"
routing = true
  • [logs]: 记录错误信息
  • [[mapping]]: 改写默认的索引名称。在上面我们的索引名称为 mongodb
  • **[[script]]**:是一种中间件,能够转换,删除文档或定义索引元数据。 可以使用 Javascript 或 Golang 作为插件编写该中间件。

用于转换文档的脚本示例

1
2
3
4
5
module.exports = function (doc) {
delete doc._id;
//TODO
return doc;
}

同步完后,我们来看看同步的数据情况

正确同步了所有数据
8wwon6
正常搜索
nV8qgF

我们同时也评估了使用flinkCDC同步,可查看
Flink CDC实践mongodb到es

相关链接

同步 MongoDB 数据到 Elasticsearch

背景

配置elastic系列产品时,卡了2天在了Enterprise Search 企业搜索实践的证书ssl配置,说明对证书还是有不理解。数字证书、HTTPS、SSL/TLS、加密… 无数的词汇在脑海中席卷而来,这都是些啥啊?为了解答这些困惑,今天这篇文章,我将借此带大家走进 SSL/TLS 加密传输与数字证书,希望从此刻开始,令人眼花缭乱的证书格式不会再成为你的困扰。

本篇着重点在于自签证书

证书与加密

对于数字证书的第一印象,通常来自于 HTTPS 协议。大家都知道HTTP 协议是不需要数字证书的。对于HTTPS协议的理解,可以简单粗暴的认为它约等于 HTTP + SSL,所以,从这个协议诞生的那一刻起,加密算法与数字证书就密不可分,因为从本质上来讲,HTTPS协议就是为了解决如何在不安全的网络上、安全地传输数据的问题。事实上,HTTPS 协议的实现,背后依托 SSL/TLS、数字签名、对称/非对称加密等一系列的知识。也许,在读到这篇文章以前,对于 HTTPS 的理解,永远止步于 HTTP + SSL。那么,我希望下面的解释可以帮助到你,通常,HTTPS 认证可以分为 单向认证 和 双向认证 两种,这里我们以为以单向认证为例,来说明数字证书与加密算法两者间的联系:

S3HoH1

  • 如图所示,HTTPS 单向认证流程主要经历了下面 7 个步骤,它们分别是:
  • 客户端发起 HTTPS 请求
  • 服务器返回证书信息,本质上是公钥
  • 客户端/浏览器通过 CA 根证书验证公钥,如果验证失败,将会收到警告信息
  • 客户端随机生成一个对称密钥 Key,并利用公钥对 Key 进行加密
  • 服务器使用私钥解密获得对称密钥 Key
  • 通过对称密钥 Key 对确认报文进行加密
  • 双方开始通信

由此,我们可以看出,整个 HTTPS 单向认证流程,实际上是结合了 对称加密非对称加密 两种加密方式。
其中,
非对称加密主要用于客户端、服务器双方的“试探”环节,即证书验证部分;
对称加密主要用于客户端、服务器双方的“正式会话”阶段,即数据传输部分。
关于 对称加密 和 非对称加密 两者的区别,我们可以从下面的图中找到答案:
yyhKCA

因为客户端持有服务器端返回的公钥,所以,两者可以使用 非对称加密 对随机密钥 Key 进行加/解密。
同理,因为客户/服务器端使用相同的随机密钥,所以,两者可以使用 对称加密 对数据进行加/解密。

我们来看看天猫这个网站,来看看客户端是怎么对服务端的证书进行校验的。

48bvJ1

事实上,浏览器在对服务器端返回的证书进行校验时,主要关心下面这些信息:

  • 判断域名、有效期等信息是否正确:这些信息在证书中是公开的,可以非常容易地获得。
  • 判断证书是否被篡改:需要由 CA 服务器进行校验。
  • 判断证书来源是否合法:每一份签发的证书都可以按照证书链找到对应的根证书,所以,可以通过操作系统中安装的根证书对证书的来源进行验证。
  • 判断证书是否被吊销:需要由 CRL(Certificate Revocation List,即 证书注销列表)和 OCSP(Online Certificate Status Protocol, 即 在线证书状态协议) 来实现。

这里引入了一个新的概念,即 CA(Certification Authority)。那么,什么是 CA 呢? 通俗来讲,CA 就是一个负责签发、认证和管理证书的机构。可能有朋友会想,客户端和服务器端通过非对称加密相互校验证书就好了啊,为什么还需要这样一个第三方的机构呢?事实上,这相当于一种担保/信用体系,因为服务器端的公钥对任何人来说都是可见的,我们来考虑这样一种情形。假设客户端从服务器端获得了某个公钥,并且它认为这个公钥是可信的,此时,有一个不怀好意的中间人截获了这个公钥,它如法炮制伪造了一个相同的公钥并返回,那么,此时客户端会如何看待这个公钥呢?虽然这个中间人不可能伪造出与服务端相同的私钥,可这无疑会让客户端感到困惑,因为它没有办法判断这个证书的真假。

证书创建

目前,全球主流的 CA 机构有Comodo、Symantec、GeoTrust、DigiCert、Thawte、GlobalSign、RapidSSL 等,其中 Symantec、GeoTrust 都是 DigiCert 机构的子公司,占据数字证书体系中的垄断地位.实际操作中,通常有自签名证书和CA证书两种,两者唯一的差别就在于权威性不同.

CA证书

就是前往Comodo、Symantec、GeoTrust、DigiCert、Thawte、GlobalSign、RapidSSL进行签署,像阿里云腾讯云都有提供相关的证书申请。

自签名证书

所谓自签名证书,其实就是自建一个CA,然后利用这个CA对证书进行签名。
openssl

1
2
3
4
5
6
7
8
9
10
11
12
-new    :说明生成证书请求文件
-x509 :说明生成自签名证书
-key :指定已有的秘钥文件生成秘钥请求,只与生成证书请求选项-new配合。
-newkey :-newkey是与-key互斥的,-newkey是指在生成证书请求或者自签名证书的时候自动生成密钥,
然后生成的密钥名称由-keyout参数指定。当指定newkey选项时,后面指定rsa:bits说明产生
rsa密钥,位数由bits指定。 如果没有指定选项-key和-newkey,默认自动生成秘钥。
-out :-out 指定生成的证书请求或者自签名证书名称
-days :证书的有效期限;
-config :默认参数在ubuntu上为 /etc/ssl/openssl.cnf, 可以使用-config指定特殊路径的配置文件
-nodes :如果指定-newkey自动生成秘钥,那么-nodes选项说明生成的秘钥不需要加密,即不需要输入passphase.
-batch :指定非交互模式,直接读取config文件配置参数,或者使用默认参数值

创建根证书

用openssl x509来自签署。自签署时,使用“-req”选项明确表示输入文件为证书请求文件,否则将默认以为是证书文件,再使用“-signkey”提供自签署时使用的私钥。

1
2
3
4
5
6
// 生成私钥(key文件)
openssl genrsa -out ca.key 2048
// 使用私钥ca.key生成签名请求(csr文件) CSR 即证书签名申请(Certificate Signing Request)
openssl req -new -key ca.key -out ca.csr
// 使用签名请求生成根证书(crt文件)
openssl x509 -req -days 365 -in ca.csr -signkey ca.key -out ca.crt

或openssl req命令生成自签名证书。

1
2
3
4
// 生成私钥ca.key
openssl genrsa -out ca.key 2048
// 使用私钥ca.key请求并生成根证书pem格式
openssl req -x509 -new -nodes -key ca.key -sha256 -days 365 -out ca.pem

在这个过程中,OpenSSL 会要求我们提供下列信息:国家、省份、城市、组织 以及 全域名(FQDN)。在此之前,关于天猫的那个例子,实际上证书上的那些信息就是从这里来的。当我们有了这样一个自建的 CA 以后,我们就可以用这个自建的 CA 去签发证书,这就是自签名 CA 证书,如何生成这个证书呢?

签发证书
使用 CA 根证书签名服务器证书

1
2
3
4
5
6
// 生成私钥
openssl genrsa -out server.key 2048
// 生成证书请求文件
openssl req -new -key server.key -out server.csr
// 使用 CA 的根证书为服务器证书签名
openssl x509 -req -in server.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out server.crt -days 365 -sha256

证书的编码格式

X.509 标准的证书文件具有不同的编码格式,一般包括 PEM 和 DER 两种。

  • PEM: Privacy Enhanced Mail 的缩写,以文本的方式进行存储。它的文件结构以 —–BEGIN XXX—–,并以 —–END XXX—– 结尾,中间 Body 内容为 Base64 编码过的数据。
    例如,以 PEM 格式存储的证书结构大概如下:
1
2
3
4
5
-----BEGIN CERTIFICATE-----

Base64编码过的证书数据

-----END CERTIFICATE-----

一般 Apache 和 Nginx 服务器应用偏向于使用 PEM 这种编码格式。

  • DER: Distinguished Encoding Rules 的缩写,以二进制方式进行存储,文件结构无法直接预览,同样可以通过如下 OpenSSL 命令查看其证书内容:

一般 Java 和 Windows 服务器应用偏向于使用 DER 这种编码格式。

证书的几种文件扩展名

如上所述,对于 X.509 标准的证书两种不同编码格式,一般采用 PEM 编码就以 .pem 作为文件扩展名,若采用 DER 编码,就应以 .der 作为扩展名。但常见的证书扩展名还包括 .crt、.cer、.p12 等,他们采用的编码格式可能不同,内容也有所差别,但大多数都能互相转换,现总结如下:

  • .csr: Certificate Signing Request 的缩写,即证书签名请求,它并不是证书的格式,而是用于向权威证书颁发机构(Certificate Authority, CA)获得签名证书的申请,其核心内容包含一个 RSA 公钥和其他附带信息,在生成这个 .csr 申请的时候,同时也会生成一个配对 RSA 私钥,私钥通常需要严格保存于服务端,不能外泄。

  • .key: 通常用来存放一个 RSA 公钥或者私钥,它并非 X.509 证书格式,编码同样可能是 PEM,也可能是 DER,查看方式如下

  • .pem: 采用 PEM 编码格式的 X.509 证书的文件扩展名;

  • .der: 采用 DER 编码格式的 X.509 证书的文件扩展名;

  • .crt: 即 certificate 的缩写,常见于类 UNIX 系统,有可能是 PEM 编码,也有可能是 DER 编码,但绝大多数情况下此格式证书都是采用 PEM 编码;

  • .cer: 也是 certificate 的缩写,常见于 Windows 系统,同样地,可能是 PEM 编码,也可能是 DER 编码,但绝大多数情况下此格式证书都是采用 DER 编码;

  • .p12: 也写作 .pfx,全称:PKCS #12,是公钥加密标准(Public Key Cryptography Standards,PKCS)系列的一种,它定义了描述个人信息交换语法(Personal Information Exchange Syntax)的标准,可以用来将包含了公钥的 X.509 证书和证书对应的私钥以及其他相关信息打包,进行交换。简单理解:一份 .p12 文件 = X.509 证书+私钥;

1
2
3
4
5
6
7
8
9
10
11
// .pem -> .pfx
openssl pkcs12 -export -in cert.pem -out cert.pfx -inkey key.pem
// .pfx -> .cer
openssl pkcs12 -in server.pfx -out server.cer -nodes
// .cer -> .pem
openssl x509 -inform der -in server.cer -out server.pem
// PEM -> DER
openssl x509 -in server.pem -outform der -out server.der
// DER -> PEM
openssl x509 -in server.der -inform der -outform pem -out server.pem

Ax70lp

什么是Elasticsearch 什么是 MongoDB?
自 2010 年发布以来,Elasticsearch 已成为全球人气排名前十的数据库之一。最初基于 Apache 的 Lucene 搜索引擎,它仍然是一个开源产品,使用 Java 构建,并以非结构化 NoSQL 格式存储数据。 Elasticsearch 专为搜索而构建,并提供高级数据索引功能。对于数据分析,它与 Kibana 和 Logstash 一起运行以形成 ELK 堆栈。 MongoDB 是一个开源的 NoSQL 数据库管理程序,可用于管理分布式架构中的大量数据。它是世界上最受欢迎的 文档存储,并且在一般最受欢迎的数据库中排名前 5 位。 MongoDB 允许您管理、存储和检索面向文档的信息。它提供了快速即席查询、索引、负载平衡、数据聚合和服务器端 JavaScript 执行等功能。

如何选择数据同步工具

同步方式

搜索功能是App必不可少的一部分,我们使用目前比较流行的Elasticsearch进行全文检索。我们的数据主要存储在MongoDB中,如何将这些数据导入到Elasticsearch中,并能一直保持同步呢?做法大致分为两种:

  • 1.应用层(以nodejs举例)
    简单来讲就是我在mongodb添加一条,我同时也对es添加一条,更新删除同理。
    有:mongoosastic、mongoose-elasticsearch-xp

  • 2.数据层
    有通过oplog同步数据,也有通过定期轮询新数据
    有:monstache、mongo-connector、LogStash、Flink CDC

要求

需要支持旧数据全量同步、增量同步(增删改),且近乎实时

选择建议

monstache Flink CDC LogStash mongoosastic mongoose-elasticsearch-xp
近实时 ☑️ ☑️ ☑️根据配置采集速度 ☑️ ☑️
支持(旧数据)全量同步 ☑️ ☑️ ☑️ ✖️ ☑️ esSynchronise
支持增量同步(增删改) ☑️ ☑️ ✖️不支持删 ✖️不支持删改 ✖️不支持批量删改,仅支持通过findOneAndUpdate的{new: true}支持改
是否现在社区主流 ☑️是 ☑️未来主流,可多数据源打成宽表 ✖️否,更适用于无需删改的日志数据 ✖️否 ✖️否
同步方式 数据层oplog 数据层oplog流处理 数据层定期轮询是否有数据 应用层mongoose插件 应用层mongoose插件

排除原则

1、mongo-connector太久了,支持es版本有限,且问题非常多,排除
2、mongoosastic不支持全量同步,不支持删改,排除
3、mongoose-elasticsearch-xp是在mongoosastic基础上改进的,不支持删,排除
4、LogStash不支持删同步,排除
5、一般程序构架为了解耦且并非原子操作,均不会采用在在应用层上做数据同步,排除mongoosastic、mongoose-elasticsearch-xp

实践

monstache实践mongodb同步es
Flink CDC实践mongodb到es

阿里云推荐使用:monstache
通过Monstache实时同步MongoDB数据到阿里云ES

Elastic 企业搜索中包含Workplace Search、App Search、Site Search
我们来实践一下App Search

DwyI50

docker-compose.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
version: '3.0'
services:
ent-search:
image: docker.elastic.co/enterprise-search/enterprise-search:7.13.0
container_name: ent-search
environment:
- "JAVA_OPTS=-Xms2048m -Xmx2048m"
- "ENT_SEARCH_DEFAULT_PASSWORD=XXX"
volumes:
- ./enterprise-search/config/enterprise-search.yml:/usr/share/enterprise-search/config/enterprise-search.yml
- ./enterprise-search/config/certs:/usr/share/enterprise-search/config/certs
ports:
- 3002:3002
networks:
default:
external:
name: dakewe

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
secret_management.encryption_keys: [f70aa30d98a4ebf1570f3d0587b10d4712ae17ec6e9d114d9615c6d38588007f]

ent_search.listen_host: 0.0.0.0
ent_search.auth.default.source: standard

ent_search.external_url: http://103.39.231.XXX:3002

# ent_search 连接 Elasticsearch
elasticsearch.host: https://es01:9200
elasticsearch.username: elastic
elasticsearch.password: "123456"
elasticsearch.ssl.enabled: true
elasticsearch.ssl.certificate_authority: "/usr/share/enterprise-search/config/certs/client-ca.crt"
elasticsearch.ssl.certificate: "/usr/share/enterprise-search/config/certs/client.crt"
elasticsearch.ssl.key: "/usr/share/enterprise-search/config/certs/client.key"
elasticsearch.ssl.verify: false

elasticsearch.startup_retry.enabled: true
elasticsearch.startup_retry.interval: 15

# 允许操作es settings
allow_es_settings_modification: true

kibana.external_url: http://kibana:5601

生成PKI客户端证书,供组件到ES的校验使用

1
2
3
4
5
6
# Private Key 私钥
openssl pkcs12 -in elastic-certificates.p12 -out client.crt -nokeys
# Public Certificate 公共证书
openssl pkcs12 -in elastic-certificates.p12 -out client.key -nodes -nocerts
# CA Certificate 签署公共证书的CA
openssl pkcs12 -in elastic-certificates.p12 -cacerts -nokeys -out client-ca.crt

1
2
3
4
5
6
7
# Private Key 私钥
openssl pkcs12 -in elastic-certificates.p12 -nocerts -nodes > client.key
# Public Certificate 公共证书
openssl pkcs12 -in elastic-certificates.p12 -clcerts -nokeys > client.cer
# CA Certificate 签署公共证书的CA
openssl pkcs12 -in elastic-certificates.p12 -cacerts -nokeys -chain > client-ca.cer

注意生成后的文件 删除 Bag attributes ,本人在这里卡了2天,原来是个bug

bug缘由
为此特别整理了SSL/TLS 加密传输与数字证书

实践

KitmEZ
agQyLc

IRy42Q
BivxFf

通过 monstache实践mongodb同步es,将数据从mongodb同步到es
当然你也可以使用不同的方式去得到搜索引擎的数据

在app search尝试进行搜索和数据分析

8wwon6

nV8qgF

相关链接:
生成密钥

官方文档

Programming language clients

一个MongDB的MapReduce执行的过程如下所示。

20171107203625923

执行顺序

1、执行query筛选出特定数据
2、执行map返回键值对,这里的值可以是一个list
3、执行reduce对value求sum
4、得到名为order_totals的结果

runCommand

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
db.runCommand(
{
mapReduce: <collection>,
map: <function>,
reduce: <function>,
finalize: <function>,
out: <output>,
query: <document>,
sort: <document>,
limit: <number>,
scope: <document>,
jsMode: <boolean>,
verbose: <boolean>,
bypassDocumentValidation: <boolean>,
}
)
1
2
3
4
5
6
7
8
9
10
11
12
13
db.getCollection("customers").mapReduce(function () {
emit(this.province, this.coName);
}, function (key, values) {
return values.length;
}, {
out: {
inline: 1
},
query: {
status: true
},
verbose: true
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// 1
{
"results": [
{
"_id": "贵州省",
"value": 131
},
{
"_id": "山东省",
"value": 1796
},
{
"_id": "河北省",
"value": 196
},
{
"_id": "江苏省",
"value": 4052
},
{
"_id": "福建省",
"value": 232
},
{
"_id": "安徽省",
"value": 585
},
{
"_id": "黑龙江省",
"value": 401
},
{
"_id": "香港特别行政区",
"value": 1491
},
{
"_id": "浙江省",
"value": 2749
},
{
"_id": "青海省",
"value": 8
},
{
"_id": "台湾省",
"value": 3
},
{
"_id": "天津市",
"value": 1096
},
{
"_id": "西藏自治区",
"value": 1
},
{
"_id": "江西省",
"value": 116
},
{
"_id": "甘肃省",
"value": 176
},
{
"_id": "广东省",
"value": 7965
},
{
"_id": "吉林省",
"value": 664
},
{
"_id": "宁夏回族自治区",
"value": 68
},
{
"_id": "重庆市",
"value": 1632
},
{
"_id": "上海市",
"value": 8758
},
{
"_id": "海外",
"value": 1
},
{
"_id": "新疆维吾尔自治区",
"value": 68
},
{
"_id": "辽宁省",
"value": 894
},
{
"_id": "陕西省",
"value": 741
},
{
"_id": "河南省",
"value": 702
},
{
"_id": "山西省",
"value": 96
},
{
"_id": "北京市",
"value": 10177
},
{
"_id": "澳门特别行政区",
"value": 80
},
{
"_id": "云南省",
"value": 389
},
{
"_id": "广西壮族自治区",
"value": 183
},
{
"_id": "内蒙古自治区",
"value": 57
},
{
"_id": "湖南省",
"value": 721
},
{
"_id": "四川省",
"value": 1990
},
{
"_id": "湖北省",
"value": 3515
},
{
"_id": "海南省",
"value": 49
}
],
"ok": 1
}


背景

调研一下remix这个SSR框架,顺便把市面上的vue和react的SSR框架都评估一下。

SSR解决什么问题

  • 更好的SEO
    因为SPA页面的内容是通过Ajax获取,而搜索引擎爬取工具并不会等待Ajax异步完成后再抓取页面内容,所以在SPA中是抓取不到页面通过Ajax获取到的内容的;而SSR是直接由服务端返回已经渲染好的页面(数据已经包含在页面中),所以搜索引擎爬取工具可以抓取渲染好的页面;
  • 更利于首屏渲染
    首屏的渲染是node发送过来的html字符串,并不依赖于js文件了,这就会使用户更快的看到页面的内容。尤其是针对大型单页应用,打包后文件体积比较大,普通客户端渲染加载所有所需文件时间较长,首页就会有一个很长的白屏等待时间。

概念

  • FCP: FCP (First Contentful Paint) 首次内容绘制 标记浏览器渲染来自 DOM 第一位内容的时间点,该内容可能是文本、图像、SVG 甚至 元素.
  • TTI: TTI (Time to Interactive) 可交互时间: 指标用于标记应用已进行视觉渲染并能可靠响应用户输入的时间点.

CSR客户端渲染

VP7Vx0

SSR服务端渲染

xFBvDs

服务端渲染效果

客户端渲染效果

从上面几张图片,我们可以看到:

  1. 首屏渲染CSR比SSR要慢很多
  2. SEO提供给搜索引擎的内容SSR比CSR要丰富得多
  3. 数据的获取CSR在前端通过接口可查看,而SSR在服务端不可查看

SSR框架

Vue:

  • Nuxt.js

React:

  • Next.js
  • Remix.js

Nuxt.js 对标 Next.js
2016 年 10 月 25 日,zeit.co背后的团队对外发布了Next.js,一个 React 的服务端渲染应用框架。几小时后,与 Next.js 异曲同工,一个基于Vue.js的服务端渲染应用框架应运而生,我们称之为:Nuxt.js。

我的关注点对比

Next.js(react) Nuxt.js(vue) Remix.js(react)
静态站点生成 ☑️内置 next export ☑️内置 nuxt generate 🚫不支持
请求接口 ☑️fetch ☑️axios ☑️Fetch API Request 和 Response 接口
数据库访问 ☑️支持,更倾向api接口获取 ☑️支持,更倾向api接口获取 ☑️支持
访问路由 Routing 基于文件系统的路由 基于文件系统的路由,可根据文件目录自动生成路由配置 基于文件系统的路由
api路由 API Routes pages/api目录下 自定义路由 自定义路由
数据加载 Data Fetching ☑️内置 通过 getServerSideProps ☑️内置 通过 asyncData ☑️内置 通过 loader

路由

Remix.js

路由地址 组件
/ App.js > routes/index.js
/invoices App.js > routes/invoices.js > routes/invoices/index.js
/invoices/late App.js > routes/invoices.js > routes/invoices/late.js
/invoices/123 App.js > routes/invoices.js > routes/invoices/$id.js
/invoices/123/edit App.js > routes/invoices.js > routes/invoices/$id.edit.js
/invoices/no/match App.js > routes/404.js
/invoices/new App.js > routes/invoices.new.js
/contact App.js > routes/contact.js

nuxt.js

Nuxt.js 依据 pages 目录结构自动生成 vue-router 模块的路由配置

目录

1
2
3
4
5
pages/
--| user/
-----| index.vue
-----| one.vue
--| index.vue

自动生成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
router: {
routes: [
{
name: 'index',
path: '/',
component: 'pages/index.vue'
},
{
name: 'user',
path: '/user',
component: 'pages/user/index.vue'
},
{
name: 'user-one',
path: '/user/one',
component: 'pages/user/one.vue'
}
]
}

next.js

1
2
pages/index.js → /
pages/blog/index.js → /blog

数据加载对比

Remix.js

每个路由模块都可以导出一个组件和一个loader. useLoaderData将加载器的数据提供给您的组件

useLoaderData这个钩子从你的路由的loader函数返回JSON解析数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import { useLoaderData } from "remix";
import type { LoaderFunction } from "remix";

export let loader: LoaderFunction = () => {
return fetch('https://.../products') // -> 从接口获取
// return Db.Product.findAll() -> 从数据库获取
// return [{ name: "Pants" }, { name: "Jacket" }]; -> 从静态数据获取
};

export default function Products() {
let products = useLoaderData();
return (
<div>
<h1>Products</h1>
{products.map(product => (
<div>{product.name}</div>
))}
</div>
);
}

nuxt.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<template>
<div>
<h1>{{ title }}</h1>
<NLink to="/product">
About Product
</NLink>
</div>
</template>

<script>
export default {
data() {
return { project: 'default' }
},
async asyncData({ params }) {
const { data } = await axios.get(`https://my-api/products/${params.id}`)
// return Db.Product.findAll()
// return [{ name: "Pants" }, { name: "Jacket" }];
return { title: data.title }

}
}
</script>

next.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
function Product({ products }) {
return (
<ul>
{products.map((product) => (
<li>{product.title}</li>
))}
</ul>
)
}

export async function getServerSideProps() {
const res = await fetch('https://.../products')
// return Db.Product.findAll()
// return [{ name: "Pants" }, { name: "Jacket" }];
const products = await res.json()

return {
props: {
products,
},
}
}

export default Product

相关链接

Remix vs. Next: Which React Meta-Framework Should You Use?

Next.js
Nuxt.js
Remix.js

背景

因为要使用es的告警功能,而告警功能是收费版本,那么就破解白金级吧。
ttBdLf
MkPJeq

原理

license中有个signature字段,ES会根据这个字段判断License是否被篡改。只要取消ES的这个判断逻辑,就可以随便篡改License,达到激活的目的了。

我是基于 官方 ES Docker 镜像 7.13.0 版本进行破解的。原则上支持任意版本破解

破解

提取文件

1
ES_HOME/modules/x-pack-core/x-pack-core-7.13.0.jar

获取Jar包查看工具Luyten,你可以可以使用其他的工具,GitHub

然后打开x-pack-core-7.13.0.jar这个文件:

定位到两个文件:然后点击File–Save As 另存为java源码文件:
A4wAn0

修改源码

org.elasticsearch.license/LicenseVerifier.class 另存后:LicenseVerifier.java
OkpSiU

LicenseVerifier.java 修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package org.elasticsearch.license;

import java.nio.*;
import org.elasticsearch.common.bytes.*;
import java.security.*;
import java.util.*;
import org.elasticsearch.common.xcontent.*;
import org.apache.lucene.util.*;
import org.elasticsearch.core.internal.io.*;
import java.io.*;

public class LicenseVerifier
{
public static boolean verifyLicense(final License license, final byte[] publicKeyData) {
/* 注释掉这一大段
byte[] signedContent = null;
byte[] publicKeyFingerprint = null;
try {
final byte[] signatureBytes = Base64.getDecoder().decode(license.signature());
final ByteBuffer byteBuffer = ByteBuffer.wrap(signatureBytes);
final int version = byteBuffer.getInt();
final int magicLen = byteBuffer.getInt();
final byte[] magic = new byte[magicLen];
byteBuffer.get(magic);
final int hashLen = byteBuffer.getInt();
publicKeyFingerprint = new byte[hashLen];
byteBuffer.get(publicKeyFingerprint);
final int signedContentLen = byteBuffer.getInt();
signedContent = new byte[signedContentLen];
byteBuffer.get(signedContent);
final XContentBuilder contentBuilder = XContentFactory.contentBuilder(XContentType.JSON);
license.toXContent(contentBuilder, (ToXContent.Params)new ToXContent.MapParams((Map)Collections.singletonMap("license_spec_view", "true")));
final Signature rsa = Signature.getInstance("SHA512withRSA");
rsa.initVerify(CryptUtils.readPublicKey(publicKeyData));
final BytesRefIterator iterator = BytesReference.bytes(contentBuilder).iterator();
BytesRef ref;
while ((ref = iterator.next()) != null) {
rsa.update(ref.bytes, ref.offset, ref.length);
}
return rsa.verify(signedContent);
}
catch (IOException ex) {}
catch (NoSuchAlgorithmException ex2) {}
catch (SignatureException ex3) {}
catch (InvalidKeyException e) {
throw new IllegalStateException(e);
}
finally {
if (signedContent != null) {
Arrays.fill(signedContent, (byte)0);
}
}
*/
return true; // 增加这行
}

public static boolean verifyLicense(final License license) {
/* 注释掉这一大段
byte[] publicKeyBytes;
try {
final InputStream is = LicenseVerifier.class.getResourceAsStream("/public.key");
try {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, (OutputStream)out);
publicKeyBytes = out.toByteArray();
if (is != null) {
is.close();
}
}
catch (Throwable t) {
if (is != null) {
try {
is.close();
}
catch (Throwable t2) {
t.addSuppressed(t2);
}
}
throw t;
}
}
catch (IOException ex) {
throw new IllegalStateException(ex);
}
return verifyLicense(license, publicKeyBytes);
*/
return true; // 增加这行
}
}

org.elasticsearch.xpack.core/XPackBuild.class 另存后:XPackBuild.java
LAFsVF

XPackBuild.java 修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package org.elasticsearch.xpack.core;

import org.elasticsearch.common.io.*;
import java.net.*;
import org.elasticsearch.common.*;
import java.nio.file.*;
import java.io.*;
import java.util.jar.*;

public class XPackBuild
{
public static final XPackBuild CURRENT;
private String shortHash;
private String date;

@SuppressForbidden(reason = "looks up path of xpack.jar directly")
static Path getElasticsearchCodebase() {
final URL url = XPackBuild.class.getProtectionDomain().getCodeSource().getLocation();
try {
return PathUtils.get(url.toURI());
}
catch (URISyntaxException bogus) {
throw new RuntimeException(bogus);
}
}

XPackBuild(final String shortHash, final String date) {
this.shortHash = shortHash;
this.date = date;
}

public String shortHash() {
return this.shortHash;
}

public String date() {
return this.date;
}

static {
final Path path = getElasticsearchCodebase();
String shortHash = null;
String date = null;
Label_0109: {
/* 注释掉这一大段即可
if (path.toString().endsWith(".jar")) {
try {
final JarInputStream jar = new JarInputStream(Files.newInputStream(path, new OpenOption[0]));
try {
final Manifest manifest = jar.getManifest();
shortHash = manifest.getMainAttributes().getValue("Change");
date = manifest.getMainAttributes().getValue("Build-Date");
jar.close();
}
catch (Throwable t) {
try {
jar.close();
}
catch (Throwable t2) {
t.addSuppressed(t2);
}
throw t;
}
break Label_0109;
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
*/
shortHash = "Unknown";
date = "Unknown";
}
CURRENT = new XPackBuild(shortHash, date);
}
}

java源代码已经更改完毕,下面就是生成class文件,然后替换原来的class文件即可:

生成Class文件

执行这段脚本,就可以得到2个Java代码对应的class文件

1
2
3
4
5
6
7
8
ES_HOME="/usr/share/elasticsearch"
ES_JAR=$(cd $ES_HOME && ls lib/elasticsearch-[0-9]*.jar)
ESCORE_JAR=$(cd $ES_HOME && ls lib/elasticsearch-core-*.jar)
LUCENE_JAR=$(cd $ES_HOME && ls lib/lucene-core-*.jar)
XPACK_JAR=$(cd $ES_HOME && ls modules/x-pack-core/x-pack-core-*.jar)

javac -cp "${ES_HOME}/${ES_JAR}:${ES_HOME}/${LUCENE_JAR}:${ES_HOME}/${XPACK_JAR}:${ES_HOME}/${ESCORE_JAR}" LicenseVerifier.java
javac -cp "${ES_HOME}/${ES_JAR}:${ES_HOME}/${LUCENE_JAR}:${ES_HOME}/${XPACK_JAR}:${ES_HOME}/${ESCORE_JAR}" XPackBuild.java

解压重新替换并打包

我们把$ES_HOME/modules/x-pack-core/x-pack-core-7.13.0.jar提取出来,放到一个临时的/elk/x-pack目录中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ export ES_HOME="/usr/share/elasticsearch"
$ cp $ES_HOME/modules/x-pack-core/x-pack-core-7.13.0.jar /elk/x-pack
$ cd /elk/x-pack
# 解压x-pack-core-7.13.0.jar
$ jar -xvf x-pack-core-7.13.0.jar

# 替换.class文件
$ cp /root/XPackBuild.class /elk/x-pack/org/elasticsearch/xpack/core/
$ cp /root/LicenseVerifier.class /elk/x-pack/org/elasticsearch/license/

# 重新打包生成x-pack-core-7.13.0.jar文件
$ cd /elk/x-pack
$ rm -rf x-pack-core-7.13.0..jar # 删除临时拷贝过来的源文件
$ jar cvf x-pack-core-7.13.0..jar .

申请License并修改导入

去官网申请License

我们将申请下来的License中的type改为platinum,将expiry_date_in_millis延长N年时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"license": {
"uid": "92c6b41e-59f9-4674-b227-77063c5fa8b0",
"type": "platinum",
"issue_date_in_millis": 1642291200000,
"expiry_date_in_millis": 3107746200000,
"max_nodes": 100,
"issued_to": "junyao hong (race)",
"issuer": "Web Form",
"signature": "AAAAAwAAAA0kxge9SLSAvWWnMgDEAAABmC9ZN0hjZDBGYnVyRXpCOW5Bb3FjZDAxOWpSbTVoMVZwUzRxVk1PSmkxaktJRVl5MUYvUWh3bHZVUTllbXNPbzBUemtnbWpBbmlWRmRZb25KNFlBR2x0TXc2K2p1Y1VtMG1UQU9TRGZVSGRwaEJGUjE3bXd3LzRqZ05iLzRteWFNekdxRGpIYlFwYkJiNUs0U1hTVlJKNVlXekMrSlVUdFIvV0FNeWdOYnlESDc3MWhlY3hSQmdKSjJ2ZTcvYlBFOHhPQlV3ZHdDQ0tHcG5uOElCaDJ4K1hob29xSG85N0kvTWV3THhlQk9NL01VMFRjNDZpZEVXeUtUMXIyMlIveFpJUkk2WUdveEZaME9XWitGUi9WNTZVQW1FMG1DenhZU0ZmeXlZakVEMjZFT2NvOWxpZGlqVmlHNC8rWVVUYzMwRGVySHpIdURzKzFiRDl4TmM1TUp2VTBOUlJZUlAyV0ZVL2kvVk10L0NsbXNFYVZwT3NSU082dFNNa2prQ0ZsclZ4NTltbU1CVE5lR09Bck93V2J1Y3c9PQAAAQBAD9GxJeiZQonVdEVrn5+frA3tMD18Stcp3fiHVGVdXRzbHQd3N23tTXSyXlqQo0lB/dDt1A4iKh8/Wotp38mFkYq/W/HbJC3hYkJaOQwBPO0aelWYTi4hAxw7c8HSjLf2S4J0dK7LYRW9vfuaK/YrCr42fOGsZ3GX+9WcwbBWT6ONnaJa2dMQRnDsrmcE599LiEz++8GvICWhzfGxjcHk4lsEGmFBC1FajDQsGf/d7oCI3EiNodgSMHtP3u6DZCt8h036wn4gyv5XdH3YauUltsKDmYqGFfD/Udy4kmiKR5qExX4i/K+7q+p4TVJ3GHqgVwtdXGkKiq32qXEqktj6",
"start_date_in_millis": 1642291200000
}
}

好了license.json文件已经OK了.

接下来导入License请确保

1
2
xpack.security.enabled: false
xpack.security.transport.ssl.enabled: false

等更新完升级为白金后再开启配置。

然后加载License到ES中:

1
2
3
$ curl -XPUT -u elastic 'http://localhost:9200/_xpack/license' -H "Content-Type: application/json" -d @license.json
Enter host password for user 'elastic': # 输入elastic用户密码
{"acknowledged":true,"license_status":"valid"} # license写入成功

查看License:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ curl -XGET -uelastic http://localhost:9200/_license
Enter host password for user 'elastic':
{
"license" : {
"status" : "active",
"uid" : "92c6b41e-59f9-4674-b227-77063c5fa8b0",
"type" : "platinum",
"issue_date" : "2019-11-29T00:00:00.000Z",
"issue_date_in_millis" : 1558051200000,
"expiry_date" : "2068-06-24T14:50:00.999Z",
"expiry_date_in_millis" : 2524579200999,
"max_nodes" : 1000,
"issued_to" : "pyker",
"issuer" : "Web Form",
"start_date_in_millis" : 1558051200000
}
}

最后,确保 elasticsearch 和 kibana均重启。单独elasticsearch,不重启kibana,会导致进入kibana时候提示 license无效。

kibana查看许可

RyWe4z

专题目录

ElasticStack-安装篇
ElasticStack-elasticsearch篇
ElasticStack-logstash篇
elasticSearch-mapping相关
elasticSearch-分词器介绍
elasticSearch-分词器实践笔记
elasticSearch-同义词分词器自定义实践
docker-elk集群实践
filebeat与logstash实践
filebeat之pipeline实践
Elasticsearch 7.x 白金级 破解实践
elk的告警调研与实践

背景

我们上次讲filebeat之pipeline实践,用filebeat采集到了es,那么错误日志是不断实时采集上来了,可是能否在出现某种异常的时候能通知告警一下呢,比如通过企业微信机器人通知我们一下,通过短信邮箱通知我们一下?那么我们来调研实践一下elk的告警功能。

kibana Alerting

收费功能,在kibana中现在已经集成了 kibana Alerting功能
破解可查看 Elasticsearch 7.x 白金级 破解实践

2Kpl30

  • Alerts and Actions(规则和连接器)
    Alerts 是运行在 Kibana 的服务, 把一些复杂的条件都隐藏起来功能也较简单,Watcher 提供更复杂条件查找,也可以通过 DSL 设置更复杂的条件。
  • Watcher(监听器)
    Watcher 是运行于 Elasticsearch

Alerts and Actions(规则和连接器)

因为只支持简单的可视化添加规则,暂不做深入。

Watcher(监听器)

一个 watcher 由5个部分组成

1
2
3
4
5
6
7
{
"trigger": {},
"input": {},
"condition": {},
"transform" {},
"actions": {}
}

trigger

这个定义多长时间 watcher 运行一次。比如我们可以定义如下:

1
2
3
4
5
6
7
8
9
"trigger": {
"schedule": {
"daily": {
"at": [
"9:45" //  其实是东八区 17:45
]
}
}
}

这里要注意一下,如果定义的是cron或者具体某个时间,请务必采用UTC时间定义。也就是当前时间-8小时。因为trigger目前只支持utc时间

lMSz6I
2ARF75
相关链接
https://www.elastic.co/guide/en/elasticsearch/reference/7.16/trigger-schedule.html
https://github.com/elastic/elasticsearch/issues/34659

input

input 获取你要评估的数据。要定期搜索日志数据,如查询当天的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
"input": {
"search": {
"request": {
"search_type": "query_then_fetch",
"indices": [
"<vpn-log-{now/d{YYYY-MM-dd}}>"
],
"rest_total_hits_as_int": true,
"body": {
"size": 0,
"query": {
"bool": {
"filter": {
"range": {
"@timestamp": {
"gte": "now/d",
"lte": "now",
"time_zone": "+08:00"
}
}
}
}
}
}
}
}
}

condition

condition 评估你加载到 watch 中的数据的触发要求,不如总数大于0

1
2
3
4
5
6
7
"condition": {
"compare": {
"ctx.payload.hits.total": {
"gt": 0
}
}
},

transform

讲transform的数据装载到ctx.payload,可以不与input一样,这样我们就能在action去拿到我们要进行通知的内容了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
"transform": {
"search": {
"request": {
"search_type": "query_then_fetch",
"indices": [
"<vpn-log-{now/d{YYYY-MM-dd}}>"
],
"rest_total_hits_as_int": true,
"body": {
"query": {
"bool": {
"filter": {
"range": {
"@timestamp": {
"gte": "now/d",
"lte": "now",
"time_zone": "+08:00"
}
}
}
}
},
"aggs": {
"topn": {
"terms": {
"field": "tags"
},
"aggs": {
"source_ip_topn": {
"terms": {
"field": "source_ip"
}
}
}
}
}
}
}
}
}

actions

但是 Watcher 真正的强大在于能够在满足 watch 条件的时候做一些事情。 watch 的操作定义了当 watch 条件评估为真时要做什么。 你可以发送电子邮件、调用第三方 webhook、将文档写入 Elasticsearch 索引或将消息记录到标准 Elasticsearch 日志文件中。这里我们来发一个企业微信机器人webhook

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
"actions": {
"wecom_webhook": {
"webhook": {
"scheme": "https",
"host": "qyapi.weixin.qq.com",
"port": 443,
"method": "post",
"path": "/cgi-bin/webhook/send",
"params": {
"key": "XXX"
},
"headers": {
"Content-Type": "application/json"
},
"body": """{"msgtype":"text","text":{"content":"【vpn监控-每日异常汇总】 - 今日当前共{{ctx.payload.hits.total}}条错误异常\n\n 问题排行:\n\n{{#ctx.payload.aggregations.topn.buckets}} - {{key}} {{doc_count}}次\n{{#source_ip_topn.buckets}} \t {{key}} {{doc_count}}次\n{{/source_ip_topn.buckets}}\n{{/ctx.payload.aggregations.topn.buckets}}\n\n请查看Dashboard定位问题:http://it.dakewe.com/goto/fc2c30d43913c3bc066fd5b470b47953\n账号/密码:public_viewer"}}"""
}
}
}

完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
{
"trigger": {
"schedule": {
"daily": {
"at": [
"9:45"
]
}
}
},
"input": {
"search": {
"request": {
"search_type": "query_then_fetch",
"indices": [
"<vpn-log-{now/d{YYYY-MM-dd}}>"
],
"rest_total_hits_as_int": true,
"body": {
"size": 0,
"query": {
"bool": {
"filter": {
"range": {
"@timestamp": {
"gte": "now/d",
"lte": "now",
"time_zone": "+08:00"
}
}
}
}
}
}
}
}
},
"condition": {
"compare": {
"ctx.payload.hits.total": {
"gt": 0
}
}
},
"actions": {
"wecom_webhook": {
"webhook": {
"scheme": "https",
"host": "qyapi.weixin.qq.com",
"port": 443,
"method": "post",
"path": "/cgi-bin/webhook/send",
"params": {
"key": "XXX"
},
"headers": {
"Content-Type": "application/json"
},
"body": """{"msgtype":"text","text":{"content":"【vpn监控-每日异常汇总】 - 今日当前共{{ctx.payload.hits.total}}条错误异常\n\n 问题排行:\n\n{{#ctx.payload.aggregations.topn.buckets}} - {{key}} {{doc_count}}次\n{{#source_ip_topn.buckets}} \t {{key}} {{doc_count}}次\n{{/source_ip_topn.buckets}}\n{{/ctx.payload.aggregations.topn.buckets}}\n\n请查看Dashboard定位问题:http://it.dakewe.com/goto/fc2c30d43913c3bc066fd5b470b47953\n账号/密码:public_viewer"}}"""
}
}
},
"transform": {
"search": {
"request": {
"search_type": "query_then_fetch",
"indices": [
"<vpn-log-{now/d{YYYY-MM-dd}}>"
],
"rest_total_hits_as_int": true,
"body": {
"query": {
"bool": {
"filter": {
"range": {
"@timestamp": {
"gte": "now/d",
"lte": "now",
"time_zone": "+08:00"
}
}
}
}
},
"aggs": {
"topn": {
"terms": {
"field": "tags"
},
"aggs": {
"source_ip_topn": {
"terms": {
"field": "source_ip"
}
}
}
}
}
}
}
}
}
}

添加和模拟 Watcher

我们可以从kibana进行watcher的创建和模拟。

1OrAOs
UWAQg9

专题目录

ElasticStack-安装篇
ElasticStack-elasticsearch篇
ElasticStack-logstash篇
elasticSearch-mapping相关
elasticSearch-分词器介绍
elasticSearch-分词器实践笔记
elasticSearch-同义词分词器自定义实践
docker-elk集群实践
filebeat与logstash实践
filebeat之pipeline实践
Elasticsearch 7.x 白金级 破解实践
elk的告警调研与实践

0%