Ax70lp

什么是Elasticsearch 什么是 MongoDB?
自 2010 年发布以来,Elasticsearch 已成为全球人气排名前十的数据库之一。最初基于 Apache 的 Lucene 搜索引擎,它仍然是一个开源产品,使用 Java 构建,并以非结构化 NoSQL 格式存储数据。 Elasticsearch 专为搜索而构建,并提供高级数据索引功能。对于数据分析,它与 Kibana 和 Logstash 一起运行以形成 ELK 堆栈。 MongoDB 是一个开源的 NoSQL 数据库管理程序,可用于管理分布式架构中的大量数据。它是世界上最受欢迎的 文档存储,并且在一般最受欢迎的数据库中排名前 5 位。 MongoDB 允许您管理、存储和检索面向文档的信息。它提供了快速即席查询、索引、负载平衡、数据聚合和服务器端 JavaScript 执行等功能。

如何选择数据同步工具

同步方式

搜索功能是App必不可少的一部分,我们使用目前比较流行的Elasticsearch进行全文检索。我们的数据主要存储在MongoDB中,如何将这些数据导入到Elasticsearch中,并能一直保持同步呢?做法大致分为两种:

  • 1.应用层(以nodejs举例)
    简单来讲就是我在mongodb添加一条,我同时也对es添加一条,更新删除同理。
    有:mongoosastic、mongoose-elasticsearch-xp

  • 2.数据层
    有通过oplog同步数据,也有通过定期轮询新数据
    有:monstache、mongo-connector、LogStash、Flink CDC

要求

需要支持旧数据全量同步、增量同步(增删改),且近乎实时

选择建议

monstache Flink CDC LogStash mongoosastic mongoose-elasticsearch-xp
近实时 ☑️ ☑️ ☑️根据配置采集速度 ☑️ ☑️
支持(旧数据)全量同步 ☑️ ☑️ ☑️ ✖️ ☑️ esSynchronise
支持增量同步(增删改) ☑️ ☑️ ✖️不支持删 ✖️不支持删改 ✖️不支持批量删改,仅支持通过findOneAndUpdate的{new: true}支持改
是否现在社区主流 ☑️是 ☑️未来主流,可多数据源打成宽表 ✖️否,更适用于无需删改的日志数据 ✖️否 ✖️否
同步方式 数据层oplog 数据层oplog流处理 数据层定期轮询是否有数据 应用层mongoose插件 应用层mongoose插件

排除原则

1、mongo-connector太久了,支持es版本有限,且问题非常多,排除
2、mongoosastic不支持全量同步,不支持删改,排除
3、mongoose-elasticsearch-xp是在mongoosastic基础上改进的,不支持删,排除
4、LogStash不支持删同步,排除
5、一般程序构架为了解耦且并非原子操作,均不会采用在在应用层上做数据同步,排除mongoosastic、mongoose-elasticsearch-xp

实践

monstache实践mongodb同步es
Flink CDC实践mongodb到es

阿里云推荐使用:monstache
通过Monstache实时同步MongoDB数据到阿里云ES

Elastic 企业搜索中包含Workplace Search、App Search、Site Search
我们来实践一下App Search

DwyI50

docker-compose.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
version: '3.0'
services:
ent-search:
image: docker.elastic.co/enterprise-search/enterprise-search:7.13.0
container_name: ent-search
environment:
- "JAVA_OPTS=-Xms2048m -Xmx2048m"
- "ENT_SEARCH_DEFAULT_PASSWORD=XXX"
volumes:
- ./enterprise-search/config/enterprise-search.yml:/usr/share/enterprise-search/config/enterprise-search.yml
- ./enterprise-search/config/certs:/usr/share/enterprise-search/config/certs
ports:
- 3002:3002
networks:
default:
external:
name: dakewe

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
secret_management.encryption_keys: [f70aa30d98a4ebf1570f3d0587b10d4712ae17ec6e9d114d9615c6d38588007f]

ent_search.listen_host: 0.0.0.0
ent_search.auth.default.source: standard

ent_search.external_url: http://103.39.231.XXX:3002

# ent_search 连接 Elasticsearch
elasticsearch.host: https://es01:9200
elasticsearch.username: elastic
elasticsearch.password: "123456"
elasticsearch.ssl.enabled: true
elasticsearch.ssl.certificate_authority: "/usr/share/enterprise-search/config/certs/client-ca.crt"
elasticsearch.ssl.certificate: "/usr/share/enterprise-search/config/certs/client.crt"
elasticsearch.ssl.key: "/usr/share/enterprise-search/config/certs/client.key"
elasticsearch.ssl.verify: false

elasticsearch.startup_retry.enabled: true
elasticsearch.startup_retry.interval: 15

# 允许操作es settings
allow_es_settings_modification: true

kibana.external_url: http://kibana:5601

生成PKI客户端证书,供组件到ES的校验使用

1
2
3
4
5
6
# Private Key 私钥
openssl pkcs12 -in elastic-certificates.p12 -out client.crt -nokeys
# Public Certificate 公共证书
openssl pkcs12 -in elastic-certificates.p12 -out client.key -nodes -nocerts
# CA Certificate 签署公共证书的CA
openssl pkcs12 -in elastic-certificates.p12 -cacerts -nokeys -out client-ca.crt

1
2
3
4
5
6
7
# Private Key 私钥
openssl pkcs12 -in elastic-certificates.p12 -nocerts -nodes > client.key
# Public Certificate 公共证书
openssl pkcs12 -in elastic-certificates.p12 -clcerts -nokeys > client.cer
# CA Certificate 签署公共证书的CA
openssl pkcs12 -in elastic-certificates.p12 -cacerts -nokeys -chain > client-ca.cer

注意生成后的文件 删除 Bag attributes ,本人在这里卡了2天,原来是个bug

bug缘由
为此特别整理了SSL/TLS 加密传输与数字证书

实践

KitmEZ
agQyLc

IRy42Q
BivxFf

通过 monstache实践mongodb同步es,将数据从mongodb同步到es
当然你也可以使用不同的方式去得到搜索引擎的数据

在app search尝试进行搜索和数据分析

8wwon6

nV8qgF

相关链接:
生成密钥

官方文档

Programming language clients

一个MongDB的MapReduce执行的过程如下所示。

20171107203625923

执行顺序

1、执行query筛选出特定数据
2、执行map返回键值对,这里的值可以是一个list
3、执行reduce对value求sum
4、得到名为order_totals的结果

runCommand

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
db.runCommand(
{
mapReduce: <collection>,
map: <function>,
reduce: <function>,
finalize: <function>,
out: <output>,
query: <document>,
sort: <document>,
limit: <number>,
scope: <document>,
jsMode: <boolean>,
verbose: <boolean>,
bypassDocumentValidation: <boolean>,
}
)
1
2
3
4
5
6
7
8
9
10
11
12
13
db.getCollection("customers").mapReduce(function () {
emit(this.province, this.coName);
}, function (key, values) {
return values.length;
}, {
out: {
inline: 1
},
query: {
status: true
},
verbose: true
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// 1
{
"results": [
{
"_id": "贵州省",
"value": 131
},
{
"_id": "山东省",
"value": 1796
},
{
"_id": "河北省",
"value": 196
},
{
"_id": "江苏省",
"value": 4052
},
{
"_id": "福建省",
"value": 232
},
{
"_id": "安徽省",
"value": 585
},
{
"_id": "黑龙江省",
"value": 401
},
{
"_id": "香港特别行政区",
"value": 1491
},
{
"_id": "浙江省",
"value": 2749
},
{
"_id": "青海省",
"value": 8
},
{
"_id": "台湾省",
"value": 3
},
{
"_id": "天津市",
"value": 1096
},
{
"_id": "西藏自治区",
"value": 1
},
{
"_id": "江西省",
"value": 116
},
{
"_id": "甘肃省",
"value": 176
},
{
"_id": "广东省",
"value": 7965
},
{
"_id": "吉林省",
"value": 664
},
{
"_id": "宁夏回族自治区",
"value": 68
},
{
"_id": "重庆市",
"value": 1632
},
{
"_id": "上海市",
"value": 8758
},
{
"_id": "海外",
"value": 1
},
{
"_id": "新疆维吾尔自治区",
"value": 68
},
{
"_id": "辽宁省",
"value": 894
},
{
"_id": "陕西省",
"value": 741
},
{
"_id": "河南省",
"value": 702
},
{
"_id": "山西省",
"value": 96
},
{
"_id": "北京市",
"value": 10177
},
{
"_id": "澳门特别行政区",
"value": 80
},
{
"_id": "云南省",
"value": 389
},
{
"_id": "广西壮族自治区",
"value": 183
},
{
"_id": "内蒙古自治区",
"value": 57
},
{
"_id": "湖南省",
"value": 721
},
{
"_id": "四川省",
"value": 1990
},
{
"_id": "湖北省",
"value": 3515
},
{
"_id": "海南省",
"value": 49
}
],
"ok": 1
}


背景

调研一下remix这个SSR框架,顺便把市面上的vue和react的SSR框架都评估一下。

SSR解决什么问题

  • 更好的SEO
    因为SPA页面的内容是通过Ajax获取,而搜索引擎爬取工具并不会等待Ajax异步完成后再抓取页面内容,所以在SPA中是抓取不到页面通过Ajax获取到的内容的;而SSR是直接由服务端返回已经渲染好的页面(数据已经包含在页面中),所以搜索引擎爬取工具可以抓取渲染好的页面;
  • 更利于首屏渲染
    首屏的渲染是node发送过来的html字符串,并不依赖于js文件了,这就会使用户更快的看到页面的内容。尤其是针对大型单页应用,打包后文件体积比较大,普通客户端渲染加载所有所需文件时间较长,首页就会有一个很长的白屏等待时间。

概念

  • FCP: FCP (First Contentful Paint) 首次内容绘制 标记浏览器渲染来自 DOM 第一位内容的时间点,该内容可能是文本、图像、SVG 甚至 元素.
  • TTI: TTI (Time to Interactive) 可交互时间: 指标用于标记应用已进行视觉渲染并能可靠响应用户输入的时间点.

CSR客户端渲染

VP7Vx0

SSR服务端渲染

xFBvDs

服务端渲染效果

客户端渲染效果

从上面几张图片,我们可以看到:

  1. 首屏渲染CSR比SSR要慢很多
  2. SEO提供给搜索引擎的内容SSR比CSR要丰富得多
  3. 数据的获取CSR在前端通过接口可查看,而SSR在服务端不可查看

SSR框架

Vue:

  • Nuxt.js

React:

  • Next.js
  • Remix.js

Nuxt.js 对标 Next.js
2016 年 10 月 25 日,zeit.co背后的团队对外发布了Next.js,一个 React 的服务端渲染应用框架。几小时后,与 Next.js 异曲同工,一个基于Vue.js的服务端渲染应用框架应运而生,我们称之为:Nuxt.js。

我的关注点对比

Next.js(react) Nuxt.js(vue) Remix.js(react)
静态站点生成 ☑️内置 next export ☑️内置 nuxt generate 🚫不支持
请求接口 ☑️fetch ☑️axios ☑️Fetch API Request 和 Response 接口
数据库访问 ☑️支持,更倾向api接口获取 ☑️支持,更倾向api接口获取 ☑️支持
访问路由 Routing 基于文件系统的路由 基于文件系统的路由,可根据文件目录自动生成路由配置 基于文件系统的路由
api路由 API Routes pages/api目录下 自定义路由 自定义路由
数据加载 Data Fetching ☑️内置 通过 getServerSideProps ☑️内置 通过 asyncData ☑️内置 通过 loader

路由

Remix.js

路由地址 组件
/ App.js > routes/index.js
/invoices App.js > routes/invoices.js > routes/invoices/index.js
/invoices/late App.js > routes/invoices.js > routes/invoices/late.js
/invoices/123 App.js > routes/invoices.js > routes/invoices/$id.js
/invoices/123/edit App.js > routes/invoices.js > routes/invoices/$id.edit.js
/invoices/no/match App.js > routes/404.js
/invoices/new App.js > routes/invoices.new.js
/contact App.js > routes/contact.js

nuxt.js

Nuxt.js 依据 pages 目录结构自动生成 vue-router 模块的路由配置

目录

1
2
3
4
5
pages/
--| user/
-----| index.vue
-----| one.vue
--| index.vue

自动生成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
router: {
routes: [
{
name: 'index',
path: '/',
component: 'pages/index.vue'
},
{
name: 'user',
path: '/user',
component: 'pages/user/index.vue'
},
{
name: 'user-one',
path: '/user/one',
component: 'pages/user/one.vue'
}
]
}

next.js

1
2
pages/index.js → /
pages/blog/index.js → /blog

数据加载对比

Remix.js

每个路由模块都可以导出一个组件和一个loader. useLoaderData将加载器的数据提供给您的组件

useLoaderData这个钩子从你的路由的loader函数返回JSON解析数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import { useLoaderData } from "remix";
import type { LoaderFunction } from "remix";

export let loader: LoaderFunction = () => {
return fetch('https://.../products') // -> 从接口获取
// return Db.Product.findAll() -> 从数据库获取
// return [{ name: "Pants" }, { name: "Jacket" }]; -> 从静态数据获取
};

export default function Products() {
let products = useLoaderData();
return (
<div>
<h1>Products</h1>
{products.map(product => (
<div>{product.name}</div>
))}
</div>
);
}

nuxt.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<template>
<div>
<h1>{{ title }}</h1>
<NLink to="/product">
About Product
</NLink>
</div>
</template>

<script>
export default {
data() {
return { project: 'default' }
},
async asyncData({ params }) {
const { data } = await axios.get(`https://my-api/products/${params.id}`)
// return Db.Product.findAll()
// return [{ name: "Pants" }, { name: "Jacket" }];
return { title: data.title }

}
}
</script>

next.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
function Product({ products }) {
return (
<ul>
{products.map((product) => (
<li>{product.title}</li>
))}
</ul>
)
}

export async function getServerSideProps() {
const res = await fetch('https://.../products')
// return Db.Product.findAll()
// return [{ name: "Pants" }, { name: "Jacket" }];
const products = await res.json()

return {
props: {
products,
},
}
}

export default Product

相关链接

Remix vs. Next: Which React Meta-Framework Should You Use?

Next.js
Nuxt.js
Remix.js

背景

因为要使用es的告警功能,而告警功能是收费版本,那么就破解白金级吧。
ttBdLf
MkPJeq

原理

license中有个signature字段,ES会根据这个字段判断License是否被篡改。只要取消ES的这个判断逻辑,就可以随便篡改License,达到激活的目的了。

我是基于 官方 ES Docker 镜像 7.13.0 版本进行破解的。原则上支持任意版本破解

破解

提取文件

1
ES_HOME/modules/x-pack-core/x-pack-core-7.13.0.jar

获取Jar包查看工具Luyten,你可以可以使用其他的工具,GitHub

然后打开x-pack-core-7.13.0.jar这个文件:

定位到两个文件:然后点击File–Save As 另存为java源码文件:
A4wAn0

修改源码

org.elasticsearch.license/LicenseVerifier.class 另存后:LicenseVerifier.java
OkpSiU

LicenseVerifier.java 修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package org.elasticsearch.license;

import java.nio.*;
import org.elasticsearch.common.bytes.*;
import java.security.*;
import java.util.*;
import org.elasticsearch.common.xcontent.*;
import org.apache.lucene.util.*;
import org.elasticsearch.core.internal.io.*;
import java.io.*;

public class LicenseVerifier
{
public static boolean verifyLicense(final License license, final byte[] publicKeyData) {
/* 注释掉这一大段
byte[] signedContent = null;
byte[] publicKeyFingerprint = null;
try {
final byte[] signatureBytes = Base64.getDecoder().decode(license.signature());
final ByteBuffer byteBuffer = ByteBuffer.wrap(signatureBytes);
final int version = byteBuffer.getInt();
final int magicLen = byteBuffer.getInt();
final byte[] magic = new byte[magicLen];
byteBuffer.get(magic);
final int hashLen = byteBuffer.getInt();
publicKeyFingerprint = new byte[hashLen];
byteBuffer.get(publicKeyFingerprint);
final int signedContentLen = byteBuffer.getInt();
signedContent = new byte[signedContentLen];
byteBuffer.get(signedContent);
final XContentBuilder contentBuilder = XContentFactory.contentBuilder(XContentType.JSON);
license.toXContent(contentBuilder, (ToXContent.Params)new ToXContent.MapParams((Map)Collections.singletonMap("license_spec_view", "true")));
final Signature rsa = Signature.getInstance("SHA512withRSA");
rsa.initVerify(CryptUtils.readPublicKey(publicKeyData));
final BytesRefIterator iterator = BytesReference.bytes(contentBuilder).iterator();
BytesRef ref;
while ((ref = iterator.next()) != null) {
rsa.update(ref.bytes, ref.offset, ref.length);
}
return rsa.verify(signedContent);
}
catch (IOException ex) {}
catch (NoSuchAlgorithmException ex2) {}
catch (SignatureException ex3) {}
catch (InvalidKeyException e) {
throw new IllegalStateException(e);
}
finally {
if (signedContent != null) {
Arrays.fill(signedContent, (byte)0);
}
}
*/
return true; // 增加这行
}

public static boolean verifyLicense(final License license) {
/* 注释掉这一大段
byte[] publicKeyBytes;
try {
final InputStream is = LicenseVerifier.class.getResourceAsStream("/public.key");
try {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(is, (OutputStream)out);
publicKeyBytes = out.toByteArray();
if (is != null) {
is.close();
}
}
catch (Throwable t) {
if (is != null) {
try {
is.close();
}
catch (Throwable t2) {
t.addSuppressed(t2);
}
}
throw t;
}
}
catch (IOException ex) {
throw new IllegalStateException(ex);
}
return verifyLicense(license, publicKeyBytes);
*/
return true; // 增加这行
}
}

org.elasticsearch.xpack.core/XPackBuild.class 另存后:XPackBuild.java
LAFsVF

XPackBuild.java 修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package org.elasticsearch.xpack.core;

import org.elasticsearch.common.io.*;
import java.net.*;
import org.elasticsearch.common.*;
import java.nio.file.*;
import java.io.*;
import java.util.jar.*;

public class XPackBuild
{
public static final XPackBuild CURRENT;
private String shortHash;
private String date;

@SuppressForbidden(reason = "looks up path of xpack.jar directly")
static Path getElasticsearchCodebase() {
final URL url = XPackBuild.class.getProtectionDomain().getCodeSource().getLocation();
try {
return PathUtils.get(url.toURI());
}
catch (URISyntaxException bogus) {
throw new RuntimeException(bogus);
}
}

XPackBuild(final String shortHash, final String date) {
this.shortHash = shortHash;
this.date = date;
}

public String shortHash() {
return this.shortHash;
}

public String date() {
return this.date;
}

static {
final Path path = getElasticsearchCodebase();
String shortHash = null;
String date = null;
Label_0109: {
/* 注释掉这一大段即可
if (path.toString().endsWith(".jar")) {
try {
final JarInputStream jar = new JarInputStream(Files.newInputStream(path, new OpenOption[0]));
try {
final Manifest manifest = jar.getManifest();
shortHash = manifest.getMainAttributes().getValue("Change");
date = manifest.getMainAttributes().getValue("Build-Date");
jar.close();
}
catch (Throwable t) {
try {
jar.close();
}
catch (Throwable t2) {
t.addSuppressed(t2);
}
throw t;
}
break Label_0109;
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
*/
shortHash = "Unknown";
date = "Unknown";
}
CURRENT = new XPackBuild(shortHash, date);
}
}

java源代码已经更改完毕,下面就是生成class文件,然后替换原来的class文件即可:

生成Class文件

执行这段脚本,就可以得到2个Java代码对应的class文件

1
2
3
4
5
6
7
8
ES_HOME="/usr/share/elasticsearch"
ES_JAR=$(cd $ES_HOME && ls lib/elasticsearch-[0-9]*.jar)
ESCORE_JAR=$(cd $ES_HOME && ls lib/elasticsearch-core-*.jar)
LUCENE_JAR=$(cd $ES_HOME && ls lib/lucene-core-*.jar)
XPACK_JAR=$(cd $ES_HOME && ls modules/x-pack-core/x-pack-core-*.jar)

javac -cp "${ES_HOME}/${ES_JAR}:${ES_HOME}/${LUCENE_JAR}:${ES_HOME}/${XPACK_JAR}:${ES_HOME}/${ESCORE_JAR}" LicenseVerifier.java
javac -cp "${ES_HOME}/${ES_JAR}:${ES_HOME}/${LUCENE_JAR}:${ES_HOME}/${XPACK_JAR}:${ES_HOME}/${ESCORE_JAR}" XPackBuild.java

解压重新替换并打包

我们把$ES_HOME/modules/x-pack-core/x-pack-core-7.13.0.jar提取出来,放到一个临时的/elk/x-pack目录中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ export ES_HOME="/usr/share/elasticsearch"
$ cp $ES_HOME/modules/x-pack-core/x-pack-core-7.13.0.jar /elk/x-pack
$ cd /elk/x-pack
# 解压x-pack-core-7.13.0.jar
$ jar -xvf x-pack-core-7.13.0.jar

# 替换.class文件
$ cp /root/XPackBuild.class /elk/x-pack/org/elasticsearch/xpack/core/
$ cp /root/LicenseVerifier.class /elk/x-pack/org/elasticsearch/license/

# 重新打包生成x-pack-core-7.13.0.jar文件
$ cd /elk/x-pack
$ rm -rf x-pack-core-7.13.0..jar # 删除临时拷贝过来的源文件
$ jar cvf x-pack-core-7.13.0..jar .

申请License并修改导入

去官网申请License

我们将申请下来的License中的type改为platinum,将expiry_date_in_millis延长N年时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"license": {
"uid": "92c6b41e-59f9-4674-b227-77063c5fa8b0",
"type": "platinum",
"issue_date_in_millis": 1642291200000,
"expiry_date_in_millis": 3107746200000,
"max_nodes": 100,
"issued_to": "junyao hong (race)",
"issuer": "Web Form",
"signature": "AAAAAwAAAA0kxge9SLSAvWWnMgDEAAABmC9ZN0hjZDBGYnVyRXpCOW5Bb3FjZDAxOWpSbTVoMVZwUzRxVk1PSmkxaktJRVl5MUYvUWh3bHZVUTllbXNPbzBUemtnbWpBbmlWRmRZb25KNFlBR2x0TXc2K2p1Y1VtMG1UQU9TRGZVSGRwaEJGUjE3bXd3LzRqZ05iLzRteWFNekdxRGpIYlFwYkJiNUs0U1hTVlJKNVlXekMrSlVUdFIvV0FNeWdOYnlESDc3MWhlY3hSQmdKSjJ2ZTcvYlBFOHhPQlV3ZHdDQ0tHcG5uOElCaDJ4K1hob29xSG85N0kvTWV3THhlQk9NL01VMFRjNDZpZEVXeUtUMXIyMlIveFpJUkk2WUdveEZaME9XWitGUi9WNTZVQW1FMG1DenhZU0ZmeXlZakVEMjZFT2NvOWxpZGlqVmlHNC8rWVVUYzMwRGVySHpIdURzKzFiRDl4TmM1TUp2VTBOUlJZUlAyV0ZVL2kvVk10L0NsbXNFYVZwT3NSU082dFNNa2prQ0ZsclZ4NTltbU1CVE5lR09Bck93V2J1Y3c9PQAAAQBAD9GxJeiZQonVdEVrn5+frA3tMD18Stcp3fiHVGVdXRzbHQd3N23tTXSyXlqQo0lB/dDt1A4iKh8/Wotp38mFkYq/W/HbJC3hYkJaOQwBPO0aelWYTi4hAxw7c8HSjLf2S4J0dK7LYRW9vfuaK/YrCr42fOGsZ3GX+9WcwbBWT6ONnaJa2dMQRnDsrmcE599LiEz++8GvICWhzfGxjcHk4lsEGmFBC1FajDQsGf/d7oCI3EiNodgSMHtP3u6DZCt8h036wn4gyv5XdH3YauUltsKDmYqGFfD/Udy4kmiKR5qExX4i/K+7q+p4TVJ3GHqgVwtdXGkKiq32qXEqktj6",
"start_date_in_millis": 1642291200000
}
}

好了license.json文件已经OK了.

接下来导入License请确保

1
2
xpack.security.enabled: false
xpack.security.transport.ssl.enabled: false

等更新完升级为白金后再开启配置。

然后加载License到ES中:

1
2
3
$ curl -XPUT -u elastic 'http://localhost:9200/_xpack/license' -H "Content-Type: application/json" -d @license.json
Enter host password for user 'elastic': # 输入elastic用户密码
{"acknowledged":true,"license_status":"valid"} # license写入成功

查看License:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ curl -XGET -uelastic http://localhost:9200/_license
Enter host password for user 'elastic':
{
"license" : {
"status" : "active",
"uid" : "92c6b41e-59f9-4674-b227-77063c5fa8b0",
"type" : "platinum",
"issue_date" : "2019-11-29T00:00:00.000Z",
"issue_date_in_millis" : 1558051200000,
"expiry_date" : "2068-06-24T14:50:00.999Z",
"expiry_date_in_millis" : 2524579200999,
"max_nodes" : 1000,
"issued_to" : "pyker",
"issuer" : "Web Form",
"start_date_in_millis" : 1558051200000
}
}

最后,确保 elasticsearch 和 kibana均重启。单独elasticsearch,不重启kibana,会导致进入kibana时候提示 license无效。

kibana查看许可

RyWe4z

专题目录

ElasticStack-安装篇
ElasticStack-elasticsearch篇
ElasticStack-logstash篇
elasticSearch-mapping相关
elasticSearch-分词器介绍
elasticSearch-分词器实践笔记
elasticSearch-同义词分词器自定义实践
docker-elk集群实践
filebeat与logstash实践
filebeat之pipeline实践
Elasticsearch 7.x 白金级 破解实践
elk的告警调研与实践

背景

我们上次讲filebeat之pipeline实践,用filebeat采集到了es,那么错误日志是不断实时采集上来了,可是能否在出现某种异常的时候能通知告警一下呢,比如通过企业微信机器人通知我们一下,通过短信邮箱通知我们一下?那么我们来调研实践一下elk的告警功能。

kibana Alerting

收费功能,在kibana中现在已经集成了 kibana Alerting功能
破解可查看 Elasticsearch 7.x 白金级 破解实践

2Kpl30

  • Alerts and Actions(规则和连接器)
    Alerts 是运行在 Kibana 的服务, 把一些复杂的条件都隐藏起来功能也较简单,Watcher 提供更复杂条件查找,也可以通过 DSL 设置更复杂的条件。
  • Watcher(监听器)
    Watcher 是运行于 Elasticsearch

Alerts and Actions(规则和连接器)

因为只支持简单的可视化添加规则,暂不做深入。

Watcher(监听器)

一个 watcher 由5个部分组成

1
2
3
4
5
6
7
{
"trigger": {},
"input": {},
"condition": {},
"transform" {},
"actions": {}
}

trigger

这个定义多长时间 watcher 运行一次。比如我们可以定义如下:

1
2
3
4
5
6
7
8
9
"trigger": {
"schedule": {
"daily": {
"at": [
"9:45" //  其实是东八区 17:45
]
}
}
}

这里要注意一下,如果定义的是cron或者具体某个时间,请务必采用UTC时间定义。也就是当前时间-8小时。因为trigger目前只支持utc时间

lMSz6I
2ARF75
相关链接
https://www.elastic.co/guide/en/elasticsearch/reference/7.16/trigger-schedule.html
https://github.com/elastic/elasticsearch/issues/34659

input

input 获取你要评估的数据。要定期搜索日志数据,如查询当天的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
"input": {
"search": {
"request": {
"search_type": "query_then_fetch",
"indices": [
"<vpn-log-{now/d{YYYY-MM-dd}}>"
],
"rest_total_hits_as_int": true,
"body": {
"size": 0,
"query": {
"bool": {
"filter": {
"range": {
"@timestamp": {
"gte": "now/d",
"lte": "now",
"time_zone": "+08:00"
}
}
}
}
}
}
}
}
}

condition

condition 评估你加载到 watch 中的数据的触发要求,不如总数大于0

1
2
3
4
5
6
7
"condition": {
"compare": {
"ctx.payload.hits.total": {
"gt": 0
}
}
},

transform

讲transform的数据装载到ctx.payload,可以不与input一样,这样我们就能在action去拿到我们要进行通知的内容了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
"transform": {
"search": {
"request": {
"search_type": "query_then_fetch",
"indices": [
"<vpn-log-{now/d{YYYY-MM-dd}}>"
],
"rest_total_hits_as_int": true,
"body": {
"query": {
"bool": {
"filter": {
"range": {
"@timestamp": {
"gte": "now/d",
"lte": "now",
"time_zone": "+08:00"
}
}
}
}
},
"aggs": {
"topn": {
"terms": {
"field": "tags"
},
"aggs": {
"source_ip_topn": {
"terms": {
"field": "source_ip"
}
}
}
}
}
}
}
}
}

actions

但是 Watcher 真正的强大在于能够在满足 watch 条件的时候做一些事情。 watch 的操作定义了当 watch 条件评估为真时要做什么。 你可以发送电子邮件、调用第三方 webhook、将文档写入 Elasticsearch 索引或将消息记录到标准 Elasticsearch 日志文件中。这里我们来发一个企业微信机器人webhook

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
"actions": {
"wecom_webhook": {
"webhook": {
"scheme": "https",
"host": "qyapi.weixin.qq.com",
"port": 443,
"method": "post",
"path": "/cgi-bin/webhook/send",
"params": {
"key": "XXX"
},
"headers": {
"Content-Type": "application/json"
},
"body": """{"msgtype":"text","text":{"content":"【vpn监控-每日异常汇总】 - 今日当前共{{ctx.payload.hits.total}}条错误异常\n\n 问题排行:\n\n{{#ctx.payload.aggregations.topn.buckets}} - {{key}} {{doc_count}}次\n{{#source_ip_topn.buckets}} \t {{key}} {{doc_count}}次\n{{/source_ip_topn.buckets}}\n{{/ctx.payload.aggregations.topn.buckets}}\n\n请查看Dashboard定位问题:http://it.dakewe.com/goto/fc2c30d43913c3bc066fd5b470b47953\n账号/密码:public_viewer"}}"""
}
}
}

完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
{
"trigger": {
"schedule": {
"daily": {
"at": [
"9:45"
]
}
}
},
"input": {
"search": {
"request": {
"search_type": "query_then_fetch",
"indices": [
"<vpn-log-{now/d{YYYY-MM-dd}}>"
],
"rest_total_hits_as_int": true,
"body": {
"size": 0,
"query": {
"bool": {
"filter": {
"range": {
"@timestamp": {
"gte": "now/d",
"lte": "now",
"time_zone": "+08:00"
}
}
}
}
}
}
}
}
},
"condition": {
"compare": {
"ctx.payload.hits.total": {
"gt": 0
}
}
},
"actions": {
"wecom_webhook": {
"webhook": {
"scheme": "https",
"host": "qyapi.weixin.qq.com",
"port": 443,
"method": "post",
"path": "/cgi-bin/webhook/send",
"params": {
"key": "XXX"
},
"headers": {
"Content-Type": "application/json"
},
"body": """{"msgtype":"text","text":{"content":"【vpn监控-每日异常汇总】 - 今日当前共{{ctx.payload.hits.total}}条错误异常\n\n 问题排行:\n\n{{#ctx.payload.aggregations.topn.buckets}} - {{key}} {{doc_count}}次\n{{#source_ip_topn.buckets}} \t {{key}} {{doc_count}}次\n{{/source_ip_topn.buckets}}\n{{/ctx.payload.aggregations.topn.buckets}}\n\n请查看Dashboard定位问题:http://it.dakewe.com/goto/fc2c30d43913c3bc066fd5b470b47953\n账号/密码:public_viewer"}}"""
}
}
},
"transform": {
"search": {
"request": {
"search_type": "query_then_fetch",
"indices": [
"<vpn-log-{now/d{YYYY-MM-dd}}>"
],
"rest_total_hits_as_int": true,
"body": {
"query": {
"bool": {
"filter": {
"range": {
"@timestamp": {
"gte": "now/d",
"lte": "now",
"time_zone": "+08:00"
}
}
}
}
},
"aggs": {
"topn": {
"terms": {
"field": "tags"
},
"aggs": {
"source_ip_topn": {
"terms": {
"field": "source_ip"
}
}
}
}
}
}
}
}
}
}

添加和模拟 Watcher

我们可以从kibana进行watcher的创建和模拟。

1OrAOs
UWAQg9

专题目录

ElasticStack-安装篇
ElasticStack-elasticsearch篇
ElasticStack-logstash篇
elasticSearch-mapping相关
elasticSearch-分词器介绍
elasticSearch-分词器实践笔记
elasticSearch-同义词分词器自定义实践
docker-elk集群实践
filebeat与logstash实践
filebeat之pipeline实践
Elasticsearch 7.x 白金级 破解实践
elk的告警调研与实践

由于新版本的gitlab默认使用puma代替unicorn。如果你的配置文件里面以前启动了uncorn的设置,那么就会出现puma和unicorn冲突的问题。解决方法就是把gitlab.rb中的unicorn的配置改为puma相关配置即可。

旧版:

1
2
3
4
5
6
7
8
9
10
11
unicorn['enable'] = true
unicorn['worker_timeout'] = 60
unicorn['worker_processes'] = 2

unicorn['worker_memory_limit_min'] = "300 * 1 << 20"
unicorn['worker_memory_limit_max'] = "500 * 1 << 20"

sidekiq['concurrency'] = 16

postgresql['shared_buffers'] = "256MB"
postgresql['max_worker_processes'] = 8

新版

1
2
3
4
5
6
7
8
puma['enable'] = true
puma['worker_timeout'] = 60
puma['worker_processes'] = 2
puma['max_threads'] = 4
puma['per_worker_max_memory_mb'] = 1024
sidekiq['max_concurrency'] = 16
postgresql['shared_buffers'] = "256MB"
postgresql['max_worker_processes'] = 8

参照表

jira

JIRA 是一个缺陷跟踪管理系统,为针对缺陷管理、任务追踪和项目管理的商业性应用软件,开发者是澳大利亚的Atlassian。JIRA这个名字并不是一个缩写,而是截取自“Gojira”,日文的哥斯拉发音。 官网

1
2
3
4
5
6
7
8
9
FROM cptactionhank/atlassian-jira-software:8.1.0

USER root

# 将代理破解包加入容器
COPY "atlassian-agent.jar" /opt/atlassian/jira/

# 设置启动加载代理包
RUN echo 'export CATALINA_OPTS="-javaagent:/opt/atlassian/jira/atlassian-agent.jar ${CATALINA_OPTS}"' >> /opt/atlassian/jira/bin/setenv.sh

目录

1
2
3
- JIRA
--Dockerfile
--atlassian-agent.jar

打包运行docker

1
2
3
4
docker build -t dakewe/jira:v8.1.0 .

docker run --name jira --detach --publish 8080:8080 dakewe/jira:v8.1.0

软件运行

Qm3O8C

LACZDG

破解

1
2
3
4
5
6
7
8
9
java -jar atlassian-agent.jar -d -m 82607314@qq.com -n dakewe -p jira -o http://localhost:8090 -s B5WY-IHN3-GITJ-FAA7

// 或
docker exec confluence java -jar /opt/atlassian/confluence/atlassian-agent.jar \
-p jira \
-m 82607314@qq.com \
-n 82607314@qq.com \
-o http://localhost:8090 \
-s BNZF-ITXS-8W2V-IWQP

confluence

Atlassian Confluence(简称Confluence)是一个专业的wiki程序。它是一个知识管理的工具,通过它可以实现团队成员之间的协作和知识共享。官网

1
2
3
4
5
6
7
8
9
FROM cptactionhank/atlassian-confluence:7.9.3

USER root

# 将代理破解包加入容器
COPY "atlassian-agent.jar" /opt/atlassian/confluence/

# 设置启动加载代理包
RUN echo 'export CATALINA_OPTS="-javaagent:/opt/atlassian/confluence/atlassian-agent.jar ${CATALINA_OPTS}"' >> /opt/atlassian/confluence/bin/setenv.sh

目录

1
2
3
- Confluence
--Dockerfile
--atlassian-agent.jar

打包运行

1
2
3
docker build -f Dockerfile -t dakewe/confluence:7.9.3 .

docker run --name confluence --detach --publish 8081:8090 dakewe/confluence:7.9.3

破解

1
2
3
4
5
6
7
8
9
java -jar atlassian-agent.jar -d -m 82607314@qq.com -n DAKEWE -p conf -o http://103.39.231.195 -s BJRF-N4SL-YE98-QPJA

// 或
docker exec confluence java -jar /opt/atlassian/confluence/atlassian-agent.jar \
-p conf \
-m 82607314@qq.com \
-n 82607314@qq.com \
-o http://localhost:8090 \
-s BNZF-ITXS-8W2V-IWQP

mysql示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
--创建jira数据库及用户
create database jiradb character set 'UTF8';
create user jirauser identified by 'jira';
grant all privileges on *.* to 'jirauser'@'%' identified by 'jira' with grant option;
grant all privileges on *.* to 'jirauser'@'localhost' identified by 'jira' with grant option;
flush privileges;


--创建confluence数据库及用户
create database confdb character set 'UTF8';
create user confuser identified by 'conf';
grant all privileges on *.* to 'confuser'@'%' identified by 'conf' with grant option;
grant all privileges on *.* to 'confuser'@'localhost' identified by 'conf' with grant option;
flush privileges;

-- 设置confdb事务级别
show variables like 'tx%';
set session transaction isolation level read committed;
show variables like 'tx%';

相关下载

破解文件

背景

上一次我们使用filebeat进行数据采集 filebeat与logstash实践,传输到logstash,并使用的logstash进行数据的过滤处理,本着能减少一个环节就是一个环节,这次我们就省去logstash这个环节,使用filebeats的pipeline的功能来做一次数据处理,并直接入库到es.

本次,我们依然使用的上一次的示例日志数据,整个过程如下

  • 通过kibana的开发工具,在es中进行添加 pipeline,当然你也可以使用es的api进行添加。
  • 在filebeat中定义使用的pipeline名称。
  • 当filebeat采集到数据后,直接发往es,es进行入库前的pipeline处理。

实践

filebeat 定义输出 elasticsearch和pipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
filebeat.inputs:
- type: filestream
enabled: true
paths:
- /usr/share/filebeat/logfiles/*.log
include_lines: ['A large volume of broadcast packets has been detected']
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: true
reload.period: 10s
setup.ilm.enabled: false
setup.template.name: "vpn-log"
setup.template.pattern: "vpn-log-*"
output.elasticsearch:
index: "vpn-log-%{+yyyy-MM-dd}"
hosts: ["10.8.99.34:9200"]
username: "elastic"
password: "XXX"
pipeline: "vpn_log_pipeline"
processors:
- drop_fields:
fields:
- agent.ephemeral_id
- agent.hostname
- agent.id
- agent.type
- agent.version
- ecs.version
- input.type
- log.offset
- version

在kibana的开发工具进行创建pipeline处理

先测试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 模拟测试pipeline
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description" : "vpn_log_pipeline",
"processors" : [
{
"grok" : {
"field" : "message",
"patterns" : [
"""%{TIMESTAMP_ISO8601:error_time} \[HUB "%{NOTSPACE:hub}"\] Session "%{NOTSPACE:session}": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is %{NOTSPACE:mac_address}, the source IP address is %{IPV4:source_ip}, the destination IP address is %{IPV4:destination_ip}. The number of broadcast packets is equal to or larger than %{NUMBER:items_per_second} items per 1 second """
],
"ignore_failure" : true
},
"convert" : {
"field" : "items_per_second",
"type" : "integer",
"ignore_failure" : true
}
},
{
"date" : {
"field" : "error_time",
"target_field" : "@timestamp",
"formats" : [
"yyyy-MM-dd HH:mm:ss.SSS"
],
"timezone" : "Asia/Shanghai"
}
}
]
},
"docs": [
{
"_source": {
"message": """2022-01-17 14:19:07.047 [HUB "hub_dkwbj"] Session "SID-BRIDGE-20": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is 70-B5-E8-2F-C9-5C, the source IP address is 192.168.9.134, the destination IP address is 0.0.0.0. The number of broadcast packets is equal to or larger than 34 items per 1 second (note this information is the result of mechanical analysis of part of the packets and could be incorrect)."""
}
}
]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 增加pipeline
PUT _ingest/pipeline/vpn_log_pipeline
{
"description": "vpn_log_pipeline",
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{TIMESTAMP_ISO8601:error_time} \\[HUB \"%{NOTSPACE:hub}\"\\] Session \"%{NOTSPACE:session}\": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is %{NOTSPACE:mac_address}, the source IP address is %{IP:source_ip}, the destination IP address is %{IP:destination_ip}. The number of broadcast packets is equal to or larger than %{NUMBER:items_per_second} items per 1 second "
],
"ignore_failure": true
},
"convert": {
"field": "items_per_second",
"type": "integer",
"ignore_failure": true
}
},
{
"date": {
"field": "error_time",
"target_field": "@timestamp",
"formats": [
"yyyy-MM-dd HH:mm:ss.SSS"
],
"timezone": "Asia/Shanghai"
}
}
]
}

// 其他操作
GET _ingest/pipeline/vpn_log_pipeline
DELETE _ingest/pipeline/vpn_log_pipeline
1
2
3
4
5
6
7
8
9
10
11
//模拟测试添加的pipeline
GET _ingest/pipeline/vpn_log_pipeline/_simulate
{
"docs": [
{
"_source": {
"message": """2022-01-17 14:19:07.047 [HUB "hub_dkwbj"] Session "SID-BRIDGE-20": A large volume of broadcast packets has been detected. There are cases where packets are discarded based on the policy. The source MAC address is 70-B5-E8-2F-C9-5C, the source IP address is 192.168.9.134, the destination IP address is 0.0.0.0. The number of broadcast packets is equal to or larger than 34 items per 1 second (note this information is the result of mechanical analysis of part of the packets and could be incorrect)."""
}
}
]
}

开启filebeat,日志文件将传输到es并通过pipeline进行处理,其中pipeline中使用了processors的grok,与logstash相似。

专题目录

ElasticStack-安装篇
ElasticStack-elasticsearch篇
ElasticStack-logstash篇
elasticSearch-mapping相关
elasticSearch-分词器介绍
elasticSearch-分词器实践笔记
elasticSearch-同义词分词器自定义实践
docker-elk集群实践
filebeat与logstash实践
filebeat之pipeline实践
Elasticsearch 7.x 白金级 破解实践
elk的告警调研与实践

背景

开发时,碰到互斥问题,需要保证数据的一致性,避免重复性操作,我们就需要用锁来解决,简单的讲锁其实就是为了解决并发所带来的问题。

常见问题可查看从线上事故看mongodb事务ACID强弱

举个例子:库存,并发情况下,我们通过ab模拟,一次10个请求,假设我们下单去核销库存数量,先查询订单数量,后扣除库存,那么在10个请求并发下,我们就可能获取到了错误的库存数量。这在事务中我们认为是脏读或幻读。

数据库的并发控制机制不外乎就两种情况:

  • 悲观锁:假定会发生并发冲突,屏蔽一切可能违反数据完整性的操作。

悲观锁假定其他用户企图访问或者改变你正在访问、更改的对象的概率是很高的,因此在悲观锁的环境中,在你开始改变此对象之前就将该对象锁住,并且直到你提交了所作的更改之后才释放锁。悲观的缺陷是不论是页锁还是行锁,加锁的时间可能会很长,这样可能会长时间的限制其他用户的访问,也就是说悲观锁的并发访问性不好。

  • 乐观锁:假设不会发生并发冲突,只在提交操作时检查是否违反数据完整性,乐观锁不能解决脏读和多读的问题。

乐观锁则认为其他用户企图改变你正在更改的对象的概率是很小的,因此乐观锁直到你准备提交所作的更改时才将对象锁住,当你读取以及改变该对象时并不加锁。可见乐观锁加锁的时间要比悲观锁短,乐观锁可以用较大的锁粒度获得较好的并发访问性能。但是如果第二个用户恰好在第一个用户提交更改之前读取了该对象,那么当他完成了自己的更改进行提交时,数据库就会发现该对象已经变化了,这样,第二个用户不得不重新读取该对象并作出更改。这说明在乐观锁环境中,会增加并发用户读取对象的次数。

我们经常用的lock就是一种悲观锁。

乐观锁

todo

悲观锁

基于redis的分布式锁

关于redis的分布式锁,redis官方引出了一个算法,命名为redlock。
同时,提供了各类的实现可供使用,例如Redlock-rb for Ruby、Redlock-py for Python、Redisson for Java等。
因此,深入了解Redis分布锁的运用同时分析下node-redlock

0%