rDT8Lc
Flink 根据使用的便捷性和表达能力的强弱提供了 3 层 API,由上到下,表达能力逐渐增强,比如 processFunction,是最底层的 API,表达能力最强,我们可以用他来操作 state 和 timer 等复杂功能。
Datastream API 相对于 processFunction 来说,又进行了进一步封装,提供了很多标准的语义算子给大家使用,比如我们常用的 window 算子(包括 Tumble, slide,session 等)。
最上面的 SQLTable API 使用最为便捷,具有自身的很多特点:
YZGCfu

我们从DataStream API、Table API、SQL依次看看使用方式

DataStream API

构成步骤

  • 一、获取执行环境(execution environment)
  • 二、读取数据源(source)
  • 三、定义给予数据的转换操作(transformations)
  • 四、定义计算结果的输出位置(sink)
  • 五、触发程序执行(execute)

示例1:不同的数据来源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.race.wc;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;

public class sourceTest {
public static void main(String[] args) throws Exception {

// 一、获取执行环境(execution environment)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// 二、读取数据源(source)
// ①. 从文件中读取数据
DataStreamSource<String> stream1 = env.readTextFile("input/clicks.txt");

// ②. 从集合中读取数据
ArrayList<Integer> nums = new ArrayList<>();
nums.add(2);
nums.add(5);
DataStreamSource<Integer> numStream = env.fromCollection(nums);

ArrayList<Event> events = new ArrayList<>();
events.add(new Event("Marry", "./home", 1000L));
DataStreamSource<Event> stream2 = env.fromCollection(events);

// ③. 从元素读取数据
DataStreamSource<Event> stream3 = env.fromElements(
new Event("Marry", "./home", 1000L),
new Event("Marry", "./home", 1000L),
new Event("Marry", "./home", 1000L)
);

// ④. 从socket文本流获取数据 需打开nc -l 9999
DataStreamSource<String> socketStream = env.socketTextStream("localhost", 9999);

// ⑤. 从kafka读取数据
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("test")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();

DataStreamSource<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

// 三、定义给予数据的转换操作(transformations)

// 四、定义计算结果的输出位置(sink)
stream1.print("1");
numStream.print("nums");
stream2.print("2");
stream3.print("3");
socketStream.print("socket");
kafkaStream.print("kafka");

// 五、触发程序执行(execute)
env.execute();
}
}

示例2:定义给予数据的转换操作(transformations)

1、演示 flatMap、keyBy、sum的转换操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.race.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 一、获取执行环境(execution environment)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 二、读取数据源(source)
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
// 三、定义给予数据的转换操作(transformations)
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);

// 四、定义计算结果的输出位置(sink)
dataStream.print();

// 五、触发程序执行(execute)
env.execute("Window WordCount");
}

public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}

flatMap、keyBy都可以使用 Lambda 表达式,更直观简洁
注意 :使用 lambda表达式申明Java泛型时,需要显式声明类型信息 .returns(Types.TUPLE(Types.STRING, Types.LONG))

E4f7ct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap((String sentence, Collector<Tuple2<String, Integer>> out)->{
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG)) // // 显式提供类型信息
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);

dataStream.print();

env.execute("Window WordCount");
}

2、演示 max,maxBy,min,minby的转换操作

  • max返回最大值
    如果使用max,除了keyBy的字段和参与比较大小的字段,如果还有其他字段,为了返回结果对齐,返回的结果的其他字段就是第一次出现的字段
  • maxBy 把最大值对应的元素全部返回
    使用maxBy,除了keyBy的字段和参与比较大小的字段,如果还有其他字段,会返回最大值所在数据全部的数据,另外,maxBy有第二个的参数,用来确定当比较字段出现相同时,返回之前的还是现在的,默认返回之前的,设置为false则返回新的,设置为true返回之前的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class TSource {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

List list = new ArrayList<Tuple3<Integer, Integer, String>>();
list.add(new Tuple3<>(0, 1, "a"));
list.add(new Tuple3<>(0, 3, "b"));
list.add(new Tuple3<>(0, 2, "c"));
list.add(new Tuple3<>(0, 4, "d"));
list.add(new Tuple3<>(1, 5, "a"));
list.add(new Tuple3<>(1, 2, "b"));
list.add(new Tuple3<>(1, 7, "c"));

DataStreamSource<Tuple3<Integer, Integer, String>> stringDataStreamSource = env.fromCollection(list);

KeyedStream<Tuple3<Integer, Integer, String>, Integer> result = stringDataStreamSource
.keyBy(0);

result.max(1).print("max最大值");
result.maxBy(1).print("maxBy元素");

// min,minBy同理

env.execute("测试");
}
}

原数据:

原数据:3> (0,1,a)
原数据:3> (0,3,b)
原数据:3> (0,2,c)
原数据:3> (0,4,d)
原数据:3> (1,5,a)
原数据:3> (1,2,b)
原数据:3> (1,7,c)


返回结果:
max最大值:3> (0,1,a)
max最大值:3> (0,3,a)
max最大值:3> (0,3,a)
max最大值:3> (0,4,a)
max最大值:3> (1,5,a)
max最大值:3> (1,5,a)
max最大值:3> (1,7,a)

maxBy元素:3> (0,1,a)
maxBy元素:3> (0,3,b)
maxBy元素:3> (0,3,b)
maxBy元素:3> (0,4,d)
maxBy元素:3> (1,5,a)
maxBy元素:3> (1,5,a)
maxBy元素:3> (1,7,c)

Table API & SQL

构成步骤

  • 一、创建表环境
  • 二、创建输入表 source
  • 三、创建输出表 sink
  • 四、查询转换得到一个新的表
  • 五、写入输出表

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 创建表环境
//方式1,使用setting的方式创建
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode() // 使用流处理模式
.build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

// 方式2, 使用StreamExecutionEnvironment来构建table env
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 创建输入表,连接外部系统读取数据
tableEnv.executeSql("CREATE TEMPORARY TABLE inputTable ... WITH ( 'connector' = ... )");

// 注册一个表,连接到外部系统,用于输出
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");

// 方式1、执行 SQL 对表进行查询转换,得到一个新的表
// Table table1 = tableEnv.sqlQuery("SELECT ... FROM inputTable... ");

// 方式2、使用 Table API 对表进行查询转换,得到一个新的表
Table table2 = tableEnv.from("inputTable").select(...);

// 将得到的结果写入输出表
TableResult tableResult = table1.executeInsert("outputTable");

背景

1
2
3
4
5
6
7
8
9
10
11
12
version: '3'
services:
mysql:
restart: always
image: mysql:5.7.23
container_name: mysql
ports:
- 3306:3306
volumes:
- ./conf:/etc/mysql/conf.d
- ./logs:/var/log/mysql
- ./data:/var/lib/mysql

部署mysql的时候,将mysql目录进行挂载时提示

1
cannot read directory '/var/lib/mysql/': Permission denied

原因

我们聚集到entrypoint.sh这个文件,因为docker的镜像变成运行态的时候,也就是容器的时候,是需要命令来启动的,这个启动的命令就是entrypoint.sh,也就是说,启动的这个文件肯定是篡改了原来的目录,下面具体看下这个文件的某个关键代码片段

1
2
3
4
5
6
7
8
# allow the container to be started with `--user`
if [ "$1" = 'mysqld' -a -z "$wantHelp" -a "$(id -u)" = '0' ]; then
_check_config "$@"
DATADIR="$(_get_config 'datadir' "$@")"
mkdir -p "$DATADIR"
chown -R mysql:mysql "$DATADIR"
exec gosu mysql "$BASH_SOURCE" "$@"
fi

这段代码的意思是说,该程序运行指定用户来启动mysql,但是如果你不指定用户,他会默认用mysql用户来给DATADIR赋权限,并用mysql用户来执行脚本,所以实际上,用户就变mysql了,但是为啥在宿主机上,却是polkitd呢?

F5qwr7

1
2
// 容器内查看用户名为`mysql` 对应的用户ID和组ID为`999`
cat /etc/passwd | grep mysql

WlHpMT

1
2
// 宿主机器查看999所对应的用户名
cat /etc/passwd | grep 999

如上图,可以看到,在容器内部,他的用户为mysql,他的用户ID为999,然后退出容器,在宿主机上,可以看到ID为999的用户ID对应的用户变成了polkitd,所以,到了这里就明白了,实际上容器内部和外部是用的同一套用户,名字可能不同,但是ID用的是同一个,从而导致,ID虽然相同,但是用户不一致,从而权限也出现了差别。

1、其实在操作系统中,真正决定用户和用户组的东西并不是用户名和组名,而是相应的用户id 和 对应的组id,当我们刚创建用户时系统就会给用户分配对应的用户id 和 组id,可以在/etc/passwd中查看

2、在遇到的问题描述中,因为容器中只创建了一个mysql用户和组,因此容器中查看到mysql用户id和组id为999:999,因此给/var/lib/mysql目录用户权限时其实是给予的用户id和组id为999:999权限,因此,在宿主机中./data此目录的用户权限也变为999:999,因为宿主机中此用户id和组id对应的为polkitd,因此就显示为polkitd。

解决

方法1 推荐

在配置my.cnf时,指定error-log的位置在/var/log/下,否则error的默认位置为例如/var/lib/mysql下的mysqld.log文件因为目录映射后有权限问题,写入不了日志。

方法2

宿主机器更改与docker相同的用户ID和组id
chown 999

方法3

同步宿主/etc/passwd到容器内,从而达到宿主和容器内相同的用户id和组ID

1
2
3
4
5
6
7
8
9
10
11
[root@localhost ~]# getenforce
Enforcing
[root@localhost ~]# setenforce 0
[root@localhost ~]# getenforce
Permissive
[root@localhost ~]# systemctl restart docker
[root@localhost ~]# vi /etc/selinux/config
#修改 SELINUX=disabled
[root@localhost ~]# init 6
#重启系统

实验

一直以来比较困惑容器挂载卷的文件权限问题,今天特地梳理下docker的挂载卷权限问题

首先,本地挂载点新建测试文件test.txt,然后将该目录挂载到容器的test目录
e7n7BA
可以看到我们在容器外新建的test.txt文件在容器内的所有者变成了1000,然后我们在容器内新建文件查看容器外的权限,发现容器中新建的test1.txt所有者变成了root
AfaH0v

接着查看下UID:1000所属用户
olz9nC
最后我们在指定wag用户启动docker,然后在容器内新建test3.txt文件,查看文件所有者
4fr3ct

从以上可以得出,docker启动容器如果不指定用户,会默认以root(UID=0)方式运行,导致其中新建的文件所有者映射到容器外为root,容器外新建的文件映射到容器内所有者UID不变。

制作Doris镜像

我们在OLAP之Doris编译的基础上,开始制作docker镜像

WxlWez

我们为我们的目录增加Dockerfile_feDockerfile_be两个Dockerfile文件

  • fe镜像Dockerfile(cd到编译好的output目录)

Dockerfile_fe

1
2
3
4
5
6
FROM primetoninc/jdk:1.8
# RUN yum install net-tools -y
COPY fe /opt/fe
WORKDIR /opt/fe
EXPOSE 8030 9030
ENTRYPOINT ["/opt/fe/bin/start_fe.sh"]

构建fe镜像,创建并配置镜像映射文件doris-meta和conf,启动容器

1
docker build -t doris-fe:0.15.0 -f Dockerfile_fe .
  • be镜像Dockerfile

Dockerfile_be

1
2
3
4
5
6
FROM primetoninc/jdk:1.8
# RUN yum install net-tools -y
COPY be /opt/be
WORKDIR /opt/be
EXPOSE 9050
ENTRYPOINT ["/opt/be/bin/start_be.sh"]

构建be镜像,配置be镜像映射文件storage,启动3个be容器组成集群。Doris默认至少安装3个be实例。

1
docker build -t doris-be:0.15.0 -f Dockerfile_be .

需要使用镜像可从官方pull,已经上传最新

1
2
docker pull bulolo/doris-fe:0.15.0
docker pull bulolo/doris-be:0.15.0

docker运行

docker run

不推荐使用docker run,因为还要进入容器内查看IP,再添加对应ip的backends

FE 运行

1
docker run -itd --name fe_1 -p 8030:8030 -p 9030:9030 -v <LOCAL_PATH>/fe_1/conf:/opt/fe/conf -v <LOCAL_PATH>/fe_1/log:/opt/fe/log -v <LOCAL_PATH>/fe_1/doris-meta:/opt/fe/doris-meta doris-fe:0.15.0

BE 运行

1
2
3
docker run -itd --name be_1 -p 9150:9050 -v <LOCAL_PATH>/be_1/conf:/opt/be/conf -v <LOCAL_PATH>/be_1/storage:/opt/be/storage doris-be:0.15.0
docker run -itd --name be_2 -p 9250:9050 -v <LOCAL_PATH>/be_2/conf:/opt/be/conf -v <LOCAL_PATH>/be_2/storage:/opt/be/storage doris-be:0.15.0
docker run -itd --name be_3 -p 9350:9050 -v <LOCAL_PATH>/be_3/conf:/opt/be/conf -v <LOCAL_PATH>/be_3/storage:/opt/be/storage doris-be:0.15.0

docker-compose

推荐使用docker-compose

docker-compose.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
version: '3.7'

services:
doris-fe1:
image: doris-fe:0.15.0
container_name: doris-fe1
ports:
- 8030:8030
- 9030:9030
ulimits:
nofile:
soft: "65536"
hard: "65536"
volumes:
- ./fe_1/conf:/opt/fe/conf
- ./fe_1/log:/opt/fe/log
- ./fe_1/doris-meta:/opt/fe/doris-meta
networks:
doris-network :
ipv4_address: 172.66.0.100
doris-be1:
image: doris-be:0.15.0
container_name: doris-be1
ports:
- 9150:9050
ulimits:
nofile:
soft: "65536"
hard: "65536"
volumes:
- ./be_1/conf:/opt/be/conf
- ./be_1/log:/opt/be/log
- ./be_1/storage:/opt/be/storage
networks:
doris-network :
ipv4_address: 172.66.0.101
doris-be2:
image: doris-be:0.15.0
container_name: doris-be2
ports:
- 9250:9050
ulimits:
nofile:
soft: "65536"
hard: "65536"
volumes:
- ./be_2/conf:/opt/be/conf
- ./be_2/log:/opt/be/log
- ./be_2/storage:/opt/be/storage
networks:
doris-network :
ipv4_address: 172.66.0.102
doris-be3:
image: doris-be:0.15.0
container_name: doris-be3
ports:
- 9350:9050
ulimits:
nofile:
soft: "65536"
hard: "65536"
volumes:
- ./be_3/conf:/opt/be/conf
- ./be_3/log:/opt/be/log
- ./be_3/storage:/opt/be/storage
networks:
doris-network :
ipv4_address: 172.66.0.103

networks:
doris-network:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.66.0.0/16
gateway: 172.66.0.1

说明:

ipv4_address:容器绑定固定ip
ulimits:设置系统最大打开文件句柄数,就是:

1
2
3
vi /etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536

fe.conf
根据ip修改priority_networks

1
priority_networks = 172.66.0.100/16

be.conf
根据ip修改priority_networks

1
priority_networks = 172.66.0.101/16

执行docker-compose up -d

在 FE 中添加所有 BE 节点。本地需要安装mysql,Doris实现mysql协议,使用mysql客户端登录fe,默认用root密码为空。

1
2
3
4
mysql -P9030 -uroot -p
ALTER SYSTEM ADD BACKEND "172.66.0.101:9050";
ALTER SYSTEM ADD BACKEND "172.66.0.102:9050";
ALTER SYSTEM ADD BACKEND "172.66.0.103:9050";

修改密码

1
SET PASSWORD FOR 'root' = PASSWORD('123456'); 

使用 mysql-client 连接到 FE,并执行 SHOW PROC '/backends'; 查看 BE 运行情况。如一切正常,isAlive 列应为 true。
1d9Van
查看 Follower 或 Observer 运行状态。使用 mysql-client 连接到任一已启动的 FE,并执行:SHOW PROC '/frontends'; 可以查看当前已加入集群的 FE 及其对应角色。
vinOjZ
至此Doris安装完成,portal页面地址:http://localhost:8030/

打开后输入root和密码进入,如果没有修改过密码,则不填写密码

b03x7C

查看FE

mqdZ1G

查看BE

Is2zh8

基础使用

  • 添加删除查看FE
    1
    2
    3
    4
    ALTER SYSTEM ADD FOLLOWER "hostname:9050";
    ALTER SYSTEM DROPP FOLLOWER "hostname:9050";
    SHOW PROC '/frontends';
    show backends \G
  • 增加删除查看BE
    1
    2
    3
    4
    5
    ALTER SYSTEM ADD BACKEND "hostname:9050";
    ALTER SYSTEM DROPP BACKEND "hostname:9050"; // 不推荐
    ALTER SYSTEM DECOMMISSION BACKEND "hostname:9050"; 推荐
    SHOW PROC '/backends';
    SHOW PROC '/backends'\G
  • 创建数据库
    1
    create database doris;
  • 创建用户
    1
    create user 'doris' identified by 'password';
  • 赋权
    1
    grant all on doris to doris;
  • 创建表
    1
    2
    3
    4
    5
    6
    7
    8
    9
    create table table1(
    id int default '0',
    name varchar(32) default '',
    city_code smallint,
    pv bigint sum default '0'
    )
    aggregate key(id, name, city_code)
    distributed by hash(id) buckets 10
    properties('replication_num' = '3');
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    create table table2(
    id int default '0',
    name varchar(32) default '',
    city_code smallint,
    event_day date,
    pv bigint sum default '0'
    )
    aggregate key(id, name, city_code, event_day)
    partition by range(event_day)
    (
    partition p202107 values less than ('2021-08-01'),
    partition p202108 values less than ('2021-09-01'),
    partition p202109 values less than ('2021-10-01')
    )
    distributed by hash(id) buckets 10
    properties('replication_num' = '3');
  • 插入表
    1
    insert into table1(id, name, city_code, pv) values(2, 'grace', 1, 2),(5, 'helen', 3, 3),(3, 'tom', 2, 2);

相关链接

OLAP之Doris编译

背景及目标

1.尝试 CDC mysql 数据到doris
2.通过网站访问pv,使用doris实践对数据的PV进行预聚合

源表

  • 创建Mysql数据库表

    1
    2
    3
    4
    5
    6
    CREATE TABLE `mysql_pv` (
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT DEFAULT '0'
    );
  • 创建doris表

    这里对PV进行SUM预聚合

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    CREATE TABLE doris_pv
    (
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
    )
    AGGREGATE KEY(siteid, citycode, username)
    DISTRIBUTED BY HASH(siteid) BUCKETS 10
    PROPERTIES("replication_num" = "1");

映射表

source

mysql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE TABLE mysql_pv_source ( 
siteid INT,
citycode SMALLINT,
username STRING,
pv BIGINT
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'demo',
'table-name' = 'offices'
);

sink

doris

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE doris_pv_sink (
siteid INT,
citycode SMALLINT,
username STRING,
pv BIGINT
)
WITH (
'connector' = 'doris',
'fenodes' = 'localhost:8030',
'table.identifier' = 'db_audit.doris_pv',
'sink.batch.size' = '2',
'sink.batch.interval'='1',
'username' = 'root',
'password' = ''
)

执行

1
INSERT INTO doris_pv_sink select siteid,citycode,username,pv from mysql_pv_source

相关链接

使用 Flink CDC 实现 MySQL 数据实时入 Apache Doris

背景

OLAP下,我选doris,来试试如何安装。

编译

6cm3Lk
根据自己的情况选择对应的环境,我这里按照0.15为例

docker pull apache/incubator-doris:build-env-for-0.15.0

运行镜像

方式一:提前下载好源码,挂载到镜像中

1
docker run -it -v /your/local/.m2:/root/.m2 -v /your/local/incubator-doris-DORIS-x.x.x-release/:/root/incubator-doris-DORIS-x.x.x-release/ apache/incubator-doris:build-env-for-0.15.0

这条命令挂载了源码,和maven的本地库,可以避免一些包每次都要下载,其中:
/your/local/.m2: 本地maven仓库的地址
/your/local/incubator-doris-DORIS-x.x.x-release/:源码的路径
/root/incubator-doris-DORIS-x.x.x-release/:源码挂载后的目录名
apache/incubator-doris:build-env-for-0.15.0:刚才拉取的环境

方式二:在docker中拉取源码

1
2
3
4
5
6
7
8
9
① docker run -it apache/incubator-doris:build-env-for-0.15.0

②wget wget https://dist.apache.org/repos/dist/dev/incubator/doris/0.15/0.15.0-rc04/apache-doris-0.15.0-incubating-src.tar.gz    

这个地址只是0.15版本的doris

或者还可以视同git拉取指定版本的源码

git clone --branch branch-0.15 https://github.com/apache/incubator-doris.git

如果是最新主干版本代码,使用 apache/incubator-doris:build-env-latest进行编译
2grt5S

更改jdk版本

从 build-env-1.3.1 的docker镜像起,同时包含了 OpenJDK 8 和 OpenJDK 11,并且默认使用 OpenJDK 11 编译。请确保编译使用的 JDK 版本和运行时使用的 JDK 版本一致,否则会导致非预期的运行错误。你可以使用在进入编译镜像的容器后,使用以下命令切换默认 JDK 版本:
切换到 JDK 8:

1
2
3
$ alternatives --set java java-1.8.0-openjdk.x86_64
$ alternatives --set javac java-1.8.0-openjdk.x86_64
$ export JAVA_HOME=/usr/lib/jvm/java-1.8.0

切换到 JDK 11:

1
2
3
$ alternatives --set java java-11-openjdk.x86_64
$ alternatives --set javac java-11-openjdk.x86_64
$ export JAVA_HOME=/usr/lib/jvm/java-11

编译

1
sh build.sh

注意:
如果你是第一次使用 build-env-for-0.15.0 或之后的版本,第一次编译的时候要使用如下命令:
sh build.sh --clean --be --fe --ui
这是因为 build-env-for-0.15.0 版本镜像升级了 thrift(0.9 -> 0.13),需要通过 –clean 命令强制使用>新版本的 thrift 生成代码文件,否则会出现不兼容的代码。

注:编译整个过程大概2小时以上,是极为痛苦的。T T,本人编译超过10次以上,2台电脑开docker编译,编译过程,遇到多次fail退出,大部分问题是docker内存不足导致,所以务必将docker的内存调到至少10GB,否则一旦编译内存不足就会失败。

2017款macbook pro 编译成功
2022款macbookpro M1 pro 编译失败

痛哭流涕,终于编译完成,见到这个编译完成命令。

eBvN36

进入到output目录,可以看到be,fe,udf三个文件夹,接下来就是安装部署了

我们看到整个doris社区很多人想体验Doris进行尝鲜,但是苦于环境以及服务器的限制,那么我们来制作一下Doris的镜像及尝试用docker搭建集群

OLAP之Doris的docker镜像和集群搭建

Sf0NUr

背景

在OLAP的选型上,陆陆续续有1年多了。做个总结。

OLTP

OLTP:On-Line Transaction Processing,联机(线上)事务处理。指处理事务型应用的场景,事务这个词的英文是 Transaction,其实就是交易(一般指金融交易)。这种场景有个最常见的业务,就是转账,从一个账户转到另一个账户。一般要求实时处理,对响应的速度要求很高,并且要保证事务的 ACID 特性。面向 OLTP 场景的数据库管理系统就叫做 OLTP DBMS。一般涉及大量的增删改操作。

OLTP 与 NSM(N-ary storage model)行存储: OLTP 的场景一般需要一次操作一个对象的多个属性,比如查询一个人的姓名、银行账号、余额等。而 NSM 这种将一个对象的多个属性连续存储的行式存储模型就很适合 OLTP 的场景了。同时 NSM 也适用于写密集场景,一个对象的写入只需要一次写磁盘就能完成。

传统的关系数据库都是面向 OLTP 场景的,如 Oracle 通常用在银行系统、医疗系统等对操作的响应速度要求很高的场景。

OLAP

OLAP:On-Line Analytical Processing,联机(线上)分析处理。指处理分析型应用的场景。进入大数据时代,数据多了,计算机计算能力增强,并出现了分布式存储、分布式计算等技术,人们开始对大量的数据有分析的需求了。这种分析型的场景一般需要查询大量的数据进行分析,对速度的要求没有 OLTP 高,每天晚上或每周做一次,慢慢分析就好了。一般涉及大量的查询操作,对数据的修改需求不高。

列存数据库等就是面向 OLAP 的,因此,列式存储在大数据时代这种分析型场景中火了一把,如数据仓库 HBase。

OLAP的12准则

  • 准则1 OLAP模型必须提供多维概念视图
  • 准则2 透明性准则
  • 准则3 存取能力准则
  • 准则4 稳定的报表能力
  • 准则5 客户/服务器体系结构
  • 准则6 维的等同性准则
  • 准则7 动态的稀疏矩阵处理准则
  • 准则8 多用户支持能力准则
  • 准则9 非受限的跨维操作
  • 准则10 直观的数据操纵
  • 准则11 灵活的报表生成
  • 准则12 不受限的维与聚集层次

OLAP场景的关键特征

  • 大多数是读请求
  • 数据总是以相当大的批(> 1000 rows)进行写入
  • 不修改已添加的数据
  • 每次查询都从数据库中读取大量的行,但是同时又仅需要少量的列
  • 宽表,即每个表包含着大量的列
  • 较少的查询(通常每台服务器每秒数百个查询或更少)
  • 对于简单查询,允许延迟大约50毫秒
  • 列中的数据相对较小:数字和短字符串(例如,每个URL 60个字节)
  • 处理单个查询时需要高吞吐量(每个服务器每秒高达数十亿行)
  • 事务不是必须的
  • 对数据一致性要求低
  • 每一个查询除了一个大表外都很小
  • 查询结果明显小于源数据,换句话说,数据被过滤或聚合后能够被盛放在单台服务器的内存中

场景示例:

  • 市场营销:当 OLAP 用于营销时,它使营销分析师能够更多地了解他们的客户、哪些产品是有价值的、区域和季节性趋势等等。
  • 医疗保健:医疗保健数据仓库使用 OLAP 可用于预测健康风险和结果、与保险公司共享信息并生成报告。
  • 金融服务:公司首席财务官可以利用 OLAP 多维数据集为他们提供将数据转换为所需信息的方法,同时允许他们轻松生成定制财务报告。

OLAP分析的分类:ROLAP与MOLAP

  • ROLAP(RelationalOLAP)

这是一种通过在RDMS后端服务和客户前端之间建立中间层的OLAP实现方式。通过RDMS来存储和管理数据仓库数据,而通过OLAP中间件来实现多维数据上的操作映射为标准关系操作。其优点在于可以利用RMDS中本身固有的一些功能,例如: 本质上来讲 “slicing and dicing”的操作等同于在SQL语句中添加“WHERE”子句。

  • MOLAP(MultidimensionalOLAP)

这些服务器通过基于数据的多维存储引擎,支持数据的多维视图。能够将多维视图直接映射到数据立方体数组结构。其数据都存在多维数据立方体(multidimensional cube)中,以专有的格式存储。使用数据立方体的优点是能够对预计算的汇总数据进行快速索引,尤其是对”slicing and dicing”有着非常优秀的支持。

  • HOLAP(HybridOLAP)混合型OLAP

HOLAP结合了ROLAP和MOALP技术,从而继承了ROLAP的伸缩性强和MOLAP快速计算的优点。例如HOLAP利用多维数据集技术来提高性能,而当需要详细数据时,HOALP可以从多维数据“钻取”到底层的RDMS中去获取数据。

名称 描述 细节数据存储位置 聚合后的数据存储位置
ROLAP(Relational OLAP) 基于关系数据库的OLAP实现 关系型数据库 关系型数据库
MOLAP(Multidimensional OLAP) 基于多维数据组织的OLAP实现 数据立方体 数据立方体
HOLAP(Hybrid OLAP) 基于混合数据组织的OLAP实现 关系型数据库 数据立方体

对比

o0tsmp

联机分析处理(OLAP,On-line Analytical Processing),数据量大,DML少。使用数据仓库模板
联机事务处理(OLTP,On-line Transaction Processing),数据量少,DML频繁。使用一般用途或事务处理模板

OLTP与OLAP 不同的是,OLTP系统强调数据库内存效率,强调内存各种指标的命令率,强调绑定变量,强调并发操作,强调事务性。
OLAP系统则强调数据分析,强调SQL执行时长,强调磁盘I/O,强调分区。

总的来说,可以认为OLAP的产生是因为一些特性OLTP无法满足,所以一些OLTP异变了一些特性,变成了OLAP,OLAP可以看做是OLTP的一种延展,一个让OLTP产生的数据发现价值的过程。

OLAP数据库选型

n2LrFd
常见的OLTP如,mysql、PostgreSQL、Oracle等,不需要多讲,但是OLAP随着这些年层出不穷,我们汇总选型一下,并看看他们的特点。

目前市面上主流的开源OLAP引擎包含不限于:Hive、Spark SQL、Presto、Kylin、Impala、Druid、Clickhouse、Greeplum等,可以说目前没有一个引擎能在数据量,灵活程度和性能上做到完美,用户需要根据自己的需求进行选型。

  • Hive
  • Spark SQ
  • Presto
  • Elasticsearch
  • Impala
  • Druid
  • Clickhouse
  • Greeplum
  • Kylin
  • Drios
  • StarRocks

按数据量划分

CM5qTF

按建模类型划分

1、ROLAP

Elasticsearch
solr
ClickHouse
Druid
GreenPlum
Drios
StarRocks

2、MOLAP

3、HOLAP

I3OVQU

背景

我们之前在Flink CDC同步数据实践 快速的体验了从mysql和pg获取数据,最后在es打成宽表数据。但是对于其他的数据库如sql server等source表的实际能力还未可知,本次就一次的调研实践一次。

安装:从代码编译

正常情况下直接下载编译好的即可

因为flink cdc新特性在master分支,并且没有release,比如新版sql server在master,未发布release-2.2,我们来从源码编译

1
2
3
git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests

常见FlinkSql命令

首先你得启动吧

1
2
3
4
// 启动集群
bin/start-cluster.sh
// 停止集群
bin/stop-cluster.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 启动集群后,进入flink sql 客户端命令行界面
bin/sql-client.sh embedded

// 表格模式
SET 'sql-client.execution.result-mode' = 'table';
// 变更日志模式
SET 'sql-client.execution.result-mode' = 'changelog';
// Tableau模式
SET 'sql-client.execution.result-mode' = 'tableau';

// 查看当前运行的jobs
bin/flink list
// 查看所有的任务,包括失败、成功、取消的
bin/flink list -a
// 取消命令
bin/flink cancel jobID
1
2
3
4
5
6
7
8
9
SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;
SHOW VIEWS;
SHOW FUNCTIONS;
SELECT CURRENT_TIMESTAMP;
RESET table.planner;
RESET
quit

Source表

DataGen ☑️ 测试通过

在flink 1.11中,内置提供了一个DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
-- optional options --
'rows-per-second'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='1000',
'fields.f_random.min'='1',
'fields.f_random.max'='1000',
'fields.f_random_str.length'='10'
);

select * from datagen;

filesystem ☑️ 测试通过

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE employee_information (
emp_id INT,
name VARCHAR,
dept_id INT
) WITH (
'connector' = 'filesystem',
'path' = '/path/to/something.csv',
'format' = 'csv'
);

SELECT * from employee_information WHERE dept_id = 1;

mongodb ☑️ 测试通过

先决条件:副本集要求,你懂的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE TABLE offices (
_id STRING,
name STRING,
addr STRING,
status BOOLEAN,
PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = '10.8.99.44:27011',
'username' = 'root',
'password' = '@junyao2022',
'database' = 'biocitydb',
'collection' = 'offices'
);

select * from offices;

mysql ☑️ 测试通过

先决条件:binlog开启,你懂的

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE products (
id INT,
name STRING,
description STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products'
);

postgres ☑️ 测试通过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'public',
'table-name' = 'shipments'
);

sql Server ☑️ 测试通过

先决条件:需要先开启sql server的cdc能力:EXEC sys.sp_cdc_enable_db;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE TABLE material (
FMATERIALID INT,
FNUMBER STRING,
PRIMARY KEY (FMATERIALID) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc',
'hostname' = '10.8.99.31',
'port' = '1433',
'username' = 'sa',
'password' = 'XXXsa1999$',
'database-name' = 'dkw',
'schema-name' = 'dbo',
'table-name' = 'T_BD_MATERIAL'
);

select * from material;

Sink表

elasticsearch

http验证通过 ✅
https、ssl,带证书未知如何配置 ❎

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP_LTZ(3),
customer_id INT,
price DECIMAL(10, 5),
product ROW<name STRING, description STRING>,
order_status BOOLEAN,
customer_name STRING,
customer_address STRING,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders',
'username' = 'root',
'password' = 'password'
);

Doris

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE doris_test_sink (
id INT,
name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = 'localhost:8030',
'table.identifier' = 'db_audit.doris_test',
'sink.batch.size' = '2',
'sink.batch.interval'='1',
'username' = 'root',
'password' = ''
)

执行插入Sink表

1
2
3
4
5
INSERT INTO department_counts
SELECT
dept_id,
COUNT(*) as emp_count
FROM employee_information;

相关链接

flink-cdc-connectors

在未使用mongodb副本集引入事务能力前,我们来通过一些例子看看mongodb在没有事务的情况下的影响,并再一次从案例去验证强事务的系统是否适合使用mongodb,以及判断在引入副本集后实际的事务能力。

事务的原子性(Atomic)

背景

某一天,突然发现,我们的一个上传excel需求,上传后提示报错,但是数据正常上传成功了。

数据结构如下:文件表 + 订单表,订单表关联文件表ID

文件表

1
2
3
4
5
6
7
8

{
"_id": ObjectId("61ea6ee776d12fd2c261c105"),
"fileName": "2.xlsx",
"creator": ObjectId("5eba176654c70a2bc8df0719"),
"uploadDate": ISODate("2022-01-21T08:29:27.823Z"),
"creatorName": "张云",
}

订单表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"_id": ObjectId("61ea6d78703190d0efca84df"),
"org": "DKWSH",
"contact": "翁文斌",
"salesId": ObjectId("5ef3214dc15bc674d6976365"),
"remark": null,
"items": [
{
"detailId": "MX21305683",
"materialName": "PerCP anti-human CD11c",
"spec": "100 tests",
"itemNum": "337234",
"batchNumber": "B332675",
"amount": NumberInt("6"),
"manufacturer": "Biolegend"
}
],
"file": ObjectId("61ea6ee776d12fd2c261c105"),
}
1
2
3
4
5
6
7
8
9
// 创建文件
const file = await ctx.model.Delivery.File.create(files)

// 创建订单
const orders = await ctx.model.Delivery.VirtualOrder.create(orders)

// 发送通知
const sendRes = await ctx.app.noticeQueue.addBulk(datas)

排查

经过日志排查,发现 创建文件 创建订单都成功了,但是发送通知失败了,错误原因为redis版本过低导致异常无法正常发送。

理论

什么是事务的原子性

  • 一个事务包含多个操作,这些操作要么全都执行,要么全都不执行。
  • 实现事务的原子性,要支持回滚操作,在某个操作失败后,回滚到事务执行前的状态。

结论

我们可以理解为 创建文件 创建订单 发送通知这三个步骤是一个事务,要么全部成功,要们全部不执行,
发送通知失败的时候,我们应当将创建文件 创建订单进行回滚,从而达到 创建文件 创建订单 发送通知这三个步骤都不执行。

事务的隔离性(Isolation)

背景

某一天,销售反馈,我的确认操作无法提交了。

数据表如下:订单表 + 发票表 + 发票池表

订单确认操作过程
1、校验订单是否开过发票发票池
2、创建发票池表数据,而后创建发票表

发票表中invoiceNum具有唯一索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 1.校验是否开过发票和发票池

// 2.创建开票池
const invoiceItemOpts = {
org: 'XXX',
order: 'XXX',
detailId: 'XXX',
materialName: 'XXX',
}
const invoiceItems = await ctx.model.Delivery.InvoiceItem.create(
invoiceItemOpts
)

// 3.创建发票
let invoiceOpts = {
invoiceNum: ctx.helper.createInvoiceNum('P', parseInt(next_invoice_num)),
}
const invoice = await ctx.model.Delivery.Invoice.create(invoiceOpts)

排查

经过排查,我们先发现数据库发票编号invoiceNum唯一索引报错了。说明多个请求拿到了同一个invoiceNum发票编号

1
2022-02-21 10:58:17,497 ERROR 30265 [-/116.233.76.38/-/25ms POST /orders/6212ff47fa481876394ee21c/status] error_handler: MongoError: E11000 duplicate key error collection: biocitydb.sys_invoices index: invoiceNum_1 dup key: { invoiceNum: "P2202211321" }

我们继续排查发现一共有2次请求,拿到了同一个invoiceNum发票编号,说明出现了并发问题

第一次请求,销售员王璐成功使用P2202211296发票编号创建了发票,未遇到唯一索引

第二次请求,销售员沈梦婷,因为在几乎同一时刻与销售员王璐发出请求,发票编号未有事务加锁,导致发生了脏读

注意看请求的时间与invoiceNum,发现请求时间几乎同一时刻,相同的发票编号。

1
2
3
4
2022-02-21 10:57:38,020 INFO 30265 发票invoiceOpts {
invoiceNum: 'P2202211296'
saleName: '王璐',
}
1
2
3
4
2022-02-21 10:57:38,022 INFO 30265 发票invoiceOpts {
invoiceNum: 'P2202211296',
saleName: '沈梦婷',
}

并发脏读图解:

T1 王璐 T2 沈梦婷
(1)读发票编号P2202211296
(2)创建发票池 (1)读发票编号P2202211296 -> T1未完成就读取现在的发票编号,导致脏读
(3)创建发票 (2)创建发票池
(4)更新当前发票自增编号 (3)创建发票
(4)更新当前发票自增编号

理论

  • 脏读
    事务A修改了一个数据,但未提交,事务B读到了事务A未提交的更新结果,如果事务A提交失败,事务B读到的就是脏数据。

  • 不可重复读
    同一事务中,对于同一份数据读取到的结果不一致。如事务B在事务A提交前后读取的数据不一致。
    原因:事务并发修改记录。
    解决:加锁。但这会导致锁竞争加剧,影响性能。另一种方法是通过MVCC可以在无锁的情况下,避免不可重复读。

  • 幻读
    同一事务中,同一个查询多次返回的结果不一致。如事务B在事务A提交前后查询到的数据记录变多了。
    原因:并发事务增加记录。
    解决:串行。

事务的隔离级别从低到高有:

  • Read Uncommitted

最低的隔离级别,什么都不需要做,一个事务可以读到另一个事务未提交的结果。所有的并发事务问题都会发生。

  • Read Committed

只有在事务提交后,其更新结果才会被其他事务看见。可以解决脏读问题。

  • Repeated Read

在一个事务中,对于同一份数据的读取结果总是相同的,无论是否有其他事务对这份数据进行操作,以及这个事务是否提交。可以解决脏读、不可重复读

  • Serialization

事务串行化执行,隔离级别最高,牺牲了系统的并发性。可以解决并发事务的所有问题

结论

  • 2个并发请求,导致出现事务的脏读问题,2个并发同时拿到了同一个自增编号(发票编号),mongodb支持的锁机制弱,无法使用悲观锁,虽然乐观锁无法解决脏读,但是可以使用乐观锁+事务回滚。可查看了没有mongodb事务的支持下,我这种思路的解决:分布式锁设计实践
  • 出现脏读问题后,因为数据库有唯一索引,创建失败后,出现多表操作的原子性问题。

事务的一致性(Consistency)

todo

事务的持久性(Durability)

todo

monstache实践

背景

我们已经通过 Enterprise Search 企业搜索实践快速搭建起了搜索引擎,
并且通过评估 mongodb同步elasticSearch方案评估,了解到社区和行业主流monstache同步方案。

我们按照Enterprise Search 企业搜索实践,先创建Engine Schema,提前设置好mapping字段。

设置字段
IRy42Q
查看字段数据
BivxFf

我们来实践一下monstache

monstache配置

假设我们已经有了mongodb和elasticsearch,我们来配置同步设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# 启用调试日志
verbose = true

mongo-url = "mongodb://root:<password>@10.8.99.44:27011/?authSource=admin"
elasticsearch-urls = ["https://<host>:9200"]

# index GridFS files inserted into the following collections
file-namespaces = ["biocitydb.materials"]
# 此选项允许你直接将集合从 MongoDB 复制到 Elasticsearch。 Monstache 允许过滤实际索引到 Elasticsearch 的数据,因此你不一定需要复制整个集合。 在上面,我们同步数据库 test 中的 mycol 集合。
direct-read-namespaces = ["biocitydb.materials"]
# 实时通知以告知 Elasticsearch 所有写入文档,包括指定集合中的删除和更新。
change-stream-namespaces = ["biocitydb.materials"]

namespace-regex = '^biocitydb\.materials$'

# 压缩请求到es
gzip = true

# generate indexing statistics
stats = true

# index statistics into Elasticsearch
index-stats = true

elasticsearch-user = "elastic"
elasticsearch-password = "<password>"

#monstache最多开几个线程同步到es,默认为4
elasticsearch-max-conns = 2

# 证书文件
elasticsearch-pem-file = "/monstache/client.crt.pem"
elasticsearch-validate-pem-file = false

# mongodb删除集合或库时是否同步删除es中的索引
dropped-collections = true
dropped-databases = false

# 更新es而不是覆盖
index-as-update = true

replay = false

# 记录同步位点,便于下次从该位置同步
resume = true

# do not validate that progress timestamps have been saved
resume-write-unsafe = false

# 需要es ingest-attachment
index-files = false

# turn on search result highlighting of GridFS content
file-highlighting = true

# 高可用模式下需要配置集群名称,集群名称一样的进程会自动加入一个集群内,这个是monstance的集群,不是es
cluster-name = '<name>'

# do not exit after full-sync, rather continue tailing the oplog
exit-after-direct-reads = false

# 生产环境以日志文件输出,默认以命令行输出
# [logs]
# info = "./logs/info.log"
# warn = "./logs/wran.log"
# error = "./logs/error.log"
# trace = "./logs/trace.log"

# mapping定义mongodb数据到es的索引名称和type,namespace是库名.集合名
# 这里需要注意一件事:最好是在es中创建好你要的索引结构,关闭es的自动创建索引功能
[[mapping]]
namespace = "biocitydb.materials"
index = "materials"

[[script]]
namespace = "biocitydb.materials"
path = "./scripts/materials.js"
routing = true
  • [logs]: 记录错误信息
  • [[mapping]]: 改写默认的索引名称。在上面我们的索引名称为 mongodb
  • **[[script]]**:是一种中间件,能够转换,删除文档或定义索引元数据。 可以使用 Javascript 或 Golang 作为插件编写该中间件。

用于转换文档的脚本示例

1
2
3
4
5
module.exports = function (doc) {
delete doc._id;
//TODO
return doc;
}

同步完后,我们来看看同步的数据情况

正确同步了所有数据
8wwon6
正常搜索
nV8qgF

我们同时也评估了使用flinkCDC同步,可查看
Flink CDC实践mongodb到es

相关链接

同步 MongoDB 数据到 Elasticsearch

0%