背景

对于小变量,小数据集,需要和大数据集,大流进行联合计算的时候,往往把小数据集广播出去,整体直接和大数据集(流)的分布式最小粒度数据进行计算,最后把计算结果合并,这样效率更高,省去分布式节点之间的数据传输及二次计算。

例如:在Flink使用场景中,外部的配置文件或计算规则及维表等进行预加载,并定期更新,流式计算中广播小变量等场景。

场景预设

  • 流s1:用户行为日志(持续不断,同一个人会反复出现,次数不定)
  • 流s2:用户信息(姓名、年龄等信息,同一个数据只有一次,作为广播流)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package com.race.wc;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class broadcast {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// * 流s1:用户行为日志(持续不断,同一个人会反复出现,次数不定)
// * 流s2:用户信息(姓名、年龄等信息,同一个数据只有一次,作为广播刘)

// s1,行为日志流 id,eventId
DataStream<String> stream1 = env
.socketTextStream("localhost", 9999);

SingleOutputStreamOperator<Tuple2<String, String>> s1 = stream1.map(s -> {
String[] arr = s.split(",");
return Tuple2.of(arr[0], arr[1]);
}).returns(new TypeHint<Tuple2<String, String>>() {
});

// s2,用户信息流 id,age,city
DataStream<String> stream2 = env
.socketTextStream("localhost", 9999);

SingleOutputStreamOperator<Tuple3<String, String, String>> s2 = stream2.map(s -> {
String[] arr = s.split(",");
return Tuple3.of(arr[0], arr[1], arr[2]);
}).returns(new TypeHint<Tuple3<String, String, String>>() {
});

// 将s2转换为广播流 MapStateDescriptor->hashMap

MapStateDescriptor<String, Tuple2<String, String>> userInfoStateDesc = new MapStateDescriptor<>("userInfoState", TypeInformation.of(String.class), TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
}));

BroadcastStream<Tuple3<String, String, String>> s2BroadcastStream = s2.broadcast(userInfoStateDesc);

// s1连接s2广播流
BroadcastConnectedStream<Tuple2<String, String>, Tuple3<String, String, String>> connect = s1.connect(s2BroadcastStream);

// s1 是否keyedStream 决定用 KeyedBroadcastProcessFunction 还是 BroadcastProcessFunction
// 左流、右流的处理
SingleOutputStreamOperator<String> resultStream = connect.process(new BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>() {


/**
* 方法:s1流每来一条处理一次
* @param element 左流数据
* @param readOnlyContext 上下文,只读形式不能做修改,安全
* @param collector 输出器
*/
@Override
public void processElement(Tuple2<String, String> element, BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {

// 取到广播状态,只读
ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState = readOnlyContext.getBroadcastState(userInfoStateDesc);

if (broadcastState != null) {
// 获取广播状态的用户信息
Tuple2<String, String> userInfo = broadcastState.get(element.f0);
// 返回组装数据
collector.collect(element.f0 + "," + element.f1 + "," + (userInfo == null ? null : userInfo.f0) + "," + (userInfo == null ? null : userInfo.f1));
} else {
collector.collect(element.f0 + "," + element.f1 + "," + null + "," + null);
}
}

/**
* 方法:s2广播流处理的一条数据
* @param element 左流数据
* @param context 上下文
* @param collector 输出器
*/
@Override
public void processBroadcastElement(Tuple3<String, String, String> element, BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>.Context context, Collector<String> collector) throws Exception {

// 从上下文中,获取广播状态
BroadcastState<String, Tuple2<String, String>> broadcastState = context.getBroadcastState(userInfoStateDesc);
// 然后将获得的数据 装入广播状态
broadcastState.put(element.f0, Tuple2.of(element.f1, element.f2));

}
});

resultStream.print();

env.execute();
}
}

总结

背景

共享单车及打车软件在国内较为普遍,如果数据完善,不失为一份分析的好素材,但是国内的数据确实太难寻觅,这里只找到了两份纽约的的行程数据

共享单车行程数据集:Citi Bikers
出租车行程数据集:New York City Taxi & Limousine Commission

本次以New York City Taxi作为示例数据进行分析

数据集

网站New York City Taxi & Limousine Commission提供了关于纽约市从2009-2015年关于出租车驾驶的公共数据集。

nycTaxiRides.gz

TaxiRides 行程信息。每次出行包含两条记录。type标识为 行程开始start 和 行程结束end。
数据集结构

1
2
3
4
5
6
7
8
9
10
11
rideId         : int(10)         // 唯一行程id
taxiId : int(10) // 出租车唯一id
driverId : int(10) // 出租车司机唯一id
type : varchar(5) // START行程开始,END行程结束 每次出行包含两条记录
startTime : datetime(6) // 行程开始时间
endTime : datetime(6) // 行程结束时间 对于行程开始记录该值为 1970-01-01 00:00:00
startLon : decimal(10,6) // 开始行程的经度
startLat : decimal(10,6) // 开始行程的纬度
endLon : decimal(10,6) // 结束的经度
endLat : decimal(10,6) // 结束的纬度
passengerCnt : int(3) // 乘客数量

TaxiRides 数据示例

rideId,type,startTime,endTime,startLon,startLat,endLon,endLat,passengerCnt,driverId,taxiId

8cUzkd

nycTaxiFares.gz

TaxiFares 费用信息。 与上面行程信息对应

1
2
3
4
5
6
7
8
rideId         : int(10)      // 唯一行程id
taxiId : int(10) // 出租车唯一id
driverId : int(10) // 出租车司机唯一id
startTime : datetime(6) // 行程开始时间
paymentType : varchar(6) // 现金(CASH)或刷卡(CARD)
tip : float // 小费
tolls : float // 过路费
totalFare : float // 总计车费

TaxiFares 数据示例

rideId,taxiId,driverId,startTime,paymentType,tip,tolls,totalFare

k0RZE3

目标

1、将每次车程的 TaxiRide 和 TaxiFare 记录依据相同的rideId连接在一起
2、对于每个不同的 rideId,恰好有三个事件:

  • TaxiRide START 事件
  • TaxiRide END 事件
  • 一个 TaxiFare 事件(其时间戳恰好与开始时间匹配)

最终完成一个 RideAndFare 的输出

3、对这个输出进行统计过滤聚合完成案例要求

我们暂时假设数据流是通过kafka进行传输
FRPL8q

案例

基础操作:生成数据流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 定义出租车-车程数据源
KafkaSource<TaxiRide> rideSource = KafkaSource.<TaxiRide>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_RIDE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("ride")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiRideDeserialization())
.build();

// 定义出租车-车费数据源
KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_FARE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("fare")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiFareDeserialization())
.build();

基础操作:过滤和连接

过滤

例如我们现在只想查看发生在纽约的行车记录。

1
2
3
4
5
6
7
8
9
10
11
public class GeoUtils {
// geo boundaries of the area of NYC
public static double LON_EAST = -73.7;
public static double LON_WEST = -74.05;
public static double LAT_NORTH = 41.0;
public static double LAT_SOUTH = 40.5;
public static boolean isInNYC(double lat, double lon) {
return !(lon > LON_EAST || lon < LON_WEST) &&
!(lat > LAT_NORTH || lat < LAT_SOUTH);
}
}

过滤器

1
2
3
4
5
6
7
private static class NYCFilter implements FilterFunction<TaxiRide> {  
@Override
public boolean filter(TaxiRide taxiRide) throws Exception {
return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) &&
GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
}
}
连接

我们需要把TaxiRide和TaxiFare两者的数据记录结合。在这个过程中,我们要同时处理两个source的流数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.example.datastream.rideandfare;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;


public class RideAndFareJob {

public static void main(String[] args) throws Exception {

// 初始化环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(5000L);
env.setStateBackend(new FsStateBackend("file:///mnt/data/checkpoints"));

env.setParallelism(2);

// 定义出租车-车程数据源
KafkaSource<TaxiRide> rideSource = KafkaSource.<TaxiRide>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_RIDE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("ride")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiRideDeserialization())
.build();

// 定义出租车-车费数据源
KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_FARE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("fare")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiFareDeserialization())
.build();


// 从车程事件中过滤类型为Start的车程数据,并按车程标识 rideId 分组
KeyedStream<TaxiRide, Long> rideStream = env.fromSource(rideSource, WatermarkStrategy.noWatermarks(), "ride source")
.filter(ride -> ride.type==='START').keyBy(TaxiRide::getRideId);

// 付车费事件按行程标识 rideId 分组
KeyedStream<TaxiFare, Long> fareStream = env.fromSource(fareSource, WatermarkStrategy.noWatermarks(), "fare source")
.keyBy(TaxiFare::getRideId);

rideStream.connect(fareStream).flatMap(new EnrichmentFunction())
.uid("enrichment") // uid for this operator's state
.name("enrichment") // name for this operator in the web UI
.addSink(new PrintSinkFunction<>());


env.execute("Join Rides with Fares");
}
}

案例3:每种乘客数量的行车事件数

我们的另一个需求是计算搭载每种乘客数量的行车事件数。也就是搭载1个乘客的行车数、搭载2个乘客的行车… 当然,我们仍然只关心纽约的行车事件。
TODO:

案例4:每5分钟的进入的车辆数

为了持续地监测纽约的交通流量,需要计算出每个区块每5分钟的进入的车辆数。我们只关心至少有5辆车子进入的区块。
TODO:

案例5:收入最高出租车司机

我们想统计每个小时收入topN的司机
TODO:

相关链接

flink-training
ververica
flink-learning-in-action

http://wuchong.me/blog/2019/08/20/flink-sql-training/
https://www.cnblogs.com/bjwu/p/9973521.html
https://article.itxueyuan.com/0Kp2pR
https://www.cnblogs.com/luxh/p/16427196.html

背景

Flink中的时间概念

Flink在流处理程序支持不同的时间概念。分别为Event Time/Processing Time/Ingestion Time,也就是事件时间、处理时间、提取时间。

从时间序列角度来说,发生的先后顺序是:

1
事件时间(Event Time)----> 提取时间(Ingestion Time)----> 处理时间(Processing Time)
  • Event Time 是事件在现实世界中发生的时间,它通常由事件中的时间戳描述。
  • Ingestion Time 是数据进入Apache Flink流处理系统的时间,也就是Flink读取数据源时间。
  • Processing Time 是数据流入到具体某个算子 (消息被计算处理) 时候相应的系统时间。也就是Flink程序处理该事件时当前系统时间。

mU8ra5

处理时间

是数据流入到具体某个算子时候相应的系统时间。

这个系统时间指的是执行相应操作的机器的系统时间。当一个流程序通过处理时间来运行时,所有基于时间的操作(如: 时间窗口)将使用各自操作所在的物理机的系统时间。

提取时间

提取时间在概念上位于事件时间和处理时间之间。IngestionTime是数据进入Apache Flink框架的时间,提取时间在概念上位于事件时间和处理时间之间。

事件时间

事件时间就是事件在真实世界的发生时间,即每个事件在产生它的设备上发生的时间(当地时间)。比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间。在进入Apache Flink框架之前EventTime通常要嵌入到记录中,并且EventTime也可以从记录中提取出来。在实际的网上购物订单等业务场景中,大多会使用EventTime来进行数据计算。
基于事件时间处理的强大之处在于即使在乱序事件,延迟事件,历史数据以及从备份或持久化日志中的重复数据也能获得正确的结果。对于事件时间,时间的进度取决于数据,而不是任何时钟。

事件时间程序必须指定如何生成事件时间的Watermarks,这是表示事件时间进度的机制。

现在假设我们正在创建一个排序的数据流。这意味着应用程序处理流中的乱序到达的事件,并生成同样事件但按时间戳(事件时间)排序的新数据流。

比如:

1
2
3
有1~10个事件。
乱序到达的序列是:1,2,4,5,6,3,8,9,10,7
经过按 事件时间 处理后的序列是:1,2,3,4,5,6,7,8,9,10

为了处理事件时间,Flink需要知道事件的时间戳,这意味着流中的每条数据都需要分配其事件时间戳。这通常通过提取每条数据中的固定字段来完成时间戳的获取。

watermark(水位线)

Watermark是一种告诉Flink一个消息延迟多少的方式。它定义了什么时候不再等待更早的数据。

Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。watermark是用于处理乱序事件或延迟数据的,这通常用watermark机制结合window来实现(Watermarks用来触发window窗口计算)。

比如对于late element,我们不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。 可以把Watermark看作是一种告诉Flink一个消息延迟多少的方式。定义了什么时候不再等待更早的数据。

简而言之,只要属于此窗口的第一个元素到达,就会创建一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定的允许延迟时,窗口将被完全删除。

例如:

1
2
3
4
5
使用基于事件时间的窗口策略,每5分钟创建一个不重叠(或翻滚)的窗口并允许延迟1分钟。

假定目前是12:00。

当具有落入该间隔的时间戳的第一个元素到达时,Flink将为12:00到12:05之间的间隔创建一个新窗口,当水位线(watermark)到12:06时间戳时将删除它。

window(窗口)

  • 滚动窗口(Tumbling Windows)
    滚动窗口分配器将每个元素分配给固定窗口大小的窗口。滚动窗口大小固定的并且不重叠。例如,如果指定大小为5分钟的滚动窗口,则将执行当前窗口,并且每五分钟将启动一个新窗口。

  • 滑动窗口(Sliding Windows)
    滑动窗口与滚动窗口的区别就是滑动窗口有重复的计算部分。
    例如,你可以使用窗口大小为10分钟的窗口,滑动大小为5分钟。这样,每5分钟会生成一个窗口,包含最后10分钟内到达的事件。

  • 会话窗口(Session Window)
    会话窗口分配器通过活动会话分组元素。与滚动窗口和滑动窗口相比,会话窗口不会重叠,也没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到元素时会关闭。
    例如,不活动的间隙时。会话窗口分配器配置会话间隙,定义所需的不活动时间长度(defines how long is the required period of inactivity)。当此时间段到期时,当前会话关闭,后续元素被分配到新的会话窗口。

  • 全局窗口(Global Window)

如何处理watermark & window 的触发

对于out-of-order(乱序)及正常的数据而言

  • watermark的时间戳 > = window endTime
  • 在 [window_start_time,window_end_time] 中有数据存在。

对于late element太多的数据而言

  • Event Time > watermark的时间戳

看看如何触发窗口
我们明白了窗口的触发机制,这里我们添加了水位线,到底是个怎么个情况?我们来看下面
假如我们设置10s的时间窗口(window),那么0-10s,10-20s都是一个窗口,以0~10s为例,0为start-time,10为end-time。
假如有4个数据的event-time分别是8(A),12.5(B),9(C),13.5(D),我们设置Watermarks为当前所有到达数据event-time的最大值减去延迟值3.5秒

1
2
3
4
当A到达的时候,Watermarks为max{8}-3.5=8-3.5 = 4.5 < 10,不会触发计算
当B到达的时候,Watermarks为max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不会触发计算
当C到达的时候,Watermarks为max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不会触发计算
当D到达的时候,Watermarks为max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,触发计算

触发计算的时候,会将A,C(因为他们都小于10)都计算进去,其中C是迟到的。
max这个很关键,就是当前窗口内,所有事件的最大事件。
这里的延迟3.5s是我们假设一个数据到达的时候,比他早3.5s的数据肯定也都到达了,这个是需要根据经验推算。假设加入D到达以后有到达了一个E,event-time=6,但是由于0~10的时间窗口已经开始计算了,所以E就丢了。
从这里上面E的丢失说明,水位线也不是万能的,但是如果根据我们自己的生产经验+侧道输出等方案,可以做到数据不丢失。

时间语义示例

1.12之前的语义(已deprecated)

1.12以前,flink默认以processing time作为默认的时间语义,可以在env上设置所想要的时间语义;

1
2
3
4
5
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

1.12以后的语义

1.12及以后,flink默认以上述的event time 作为默认的时间语义(已deprecated)
再需要指定时间语义的相关操作(如时间窗口)时,可以通过显式的api来使用特定的时间语义

  • 滚动窗口
    1
    2
    keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
    keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  • 滑动窗口
    1
    2
    keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(1)))
    keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(1)))

如果要禁用event time机制,则可以通过设置watermark生成评率间隔来实现

1
ExecutionConfig.setAutoWatermarkinterval(long);

watermark生成策略

1.12之前(已过期)

  • AssignerWithPeriodicWatermarks 周期性生成watermark
  • AssignerWithPunctuatedWatermarks 按指定标记性时间生成watermark

1.12之后,新版api内置的watermark策略

  • 单调递增的watermark生成策略 (完全不容忍乱序)
    czSNzw
    WatermarkStrategy.forMonotonousTimestamps();
  • 允许乱序的watermark生成策略
    W2gSLW
    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))
  • 自定义watermark生成策略
    WatermarkStrategy.forGenerator(new WatermarkGenerator(){...})

watermark策略的代码模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 1,e01,16876768678,pg01
DataStreamSource<String> s1 = env.socketTextStream( hostname: "localhost",port: 9999) ;

//给上面的source 算子,添加watermark生成策略

//策略1: WatermarkStrategy.nowatermarks() 不生成watermark,禁用了事件时间的推进机制
//策略2: WatermarkStrategy. forMonotonousTimestamps() 有序,紧跟最大事件事件
//策略3: WatermarkStrategy.forBoundedOutOfOrderness() 无序
//策略4: WatermarkStrategy.forGenerator() 自定义

s1.assignTimestampsAndWatermarks(
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMillis(0))
.withTimestampAssigner((element, timestamp) -> Long.parseLong(element.split(regex:",")[2]));
)

窗口的代码模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 一、各种全局窗口开窗api
*/

// 全局 计数滚动窗口
beanStream.countWindowAll(10) // 10条数据一个窗口
.apply(new AllWindowFunction<EventBean2, String, GlobalWindow>() {
@Override
public void apply(GlobalWindow window, Iterable<EventBean2> values, Collector<String> out) throws Exception {

}
});


// 全局 计数滑动窗口
beanStream.countWindowAll(10, 2); // 窗口长度为10条数据,滑动步长为2条数据
/*.apply()*/


// 全局 事件时间滚动窗口
beanStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(30))) // 窗口长度为30s的滚动窗口
.apply(new AllWindowFunction<EventBean2, String, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<EventBean2> values, Collector<String> out) throws Exception {

}
});


// 全局 事件时间滑动窗口
beanStream.windowAll(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))); // 窗口长度:30s,滑动步长:10s
/*.apply()*/

// 全局 事件时间会话窗口
beanStream.windowAll(EventTimeSessionWindows.withGap(Time.seconds(30))); // 前后两个事件的间隙超过30s就划分窗口
/*.apply()*/

// 全局 处理时间滚动窗口
beanStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30)));


// 全局 处理时间滑动窗口
beanStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)));

// 全局 处理间会话窗口
beanStream.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(30)));


/**
* 二、各种Keyed窗口开窗api
*/

KeyedStream<EventBean2, Long> keyedStream = beanStream.keyBy(EventBean2::getGuid);

// Keyed 计数滚动窗口
keyedStream.countWindow(10);


// Keyed 计数滑动窗口
keyedStream.countWindow(10, 2);


// Keyed 事件时间滚动窗口
keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(30)));


// Keyed 事件时间滑动窗口
keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)));


// Keyed 事件时间会话窗口
keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(30)));


// Keyed 处理时间滚动窗口
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(30)));


// Keyed 处理时间滑动窗口
keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)));


// Keyed 处理时间会话窗口
keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)));

rDT8Lc
Flink 根据使用的便捷性和表达能力的强弱提供了 3 层 API,由上到下,表达能力逐渐增强,比如 processFunction,是最底层的 API,表达能力最强,我们可以用他来操作 state 和 timer 等复杂功能。
Datastream API 相对于 processFunction 来说,又进行了进一步封装,提供了很多标准的语义算子给大家使用,比如我们常用的 window 算子(包括 Tumble, slide,session 等)。
最上面的 SQLTable API 使用最为便捷,具有自身的很多特点:
YZGCfu

我们从DataStream API、Table API、SQL依次看看使用方式

DataStream API

构成步骤

  • 一、获取执行环境(execution environment)
  • 二、读取数据源(source)
  • 三、定义给予数据的转换操作(transformations)
  • 四、定义计算结果的输出位置(sink)
  • 五、触发程序执行(execute)

示例1:不同的数据来源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.race.wc;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;

public class sourceTest {
public static void main(String[] args) throws Exception {

// 一、获取执行环境(execution environment)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// 二、读取数据源(source)
// ①. 从文件中读取数据
DataStreamSource<String> stream1 = env.readTextFile("input/clicks.txt");

// ②. 从集合中读取数据
ArrayList<Integer> nums = new ArrayList<>();
nums.add(2);
nums.add(5);
DataStreamSource<Integer> numStream = env.fromCollection(nums);

ArrayList<Event> events = new ArrayList<>();
events.add(new Event("Marry", "./home", 1000L));
DataStreamSource<Event> stream2 = env.fromCollection(events);

// ③. 从元素读取数据
DataStreamSource<Event> stream3 = env.fromElements(
new Event("Marry", "./home", 1000L),
new Event("Marry", "./home", 1000L),
new Event("Marry", "./home", 1000L)
);

// ④. 从socket文本流获取数据 需打开nc -l 9999
DataStreamSource<String> socketStream = env.socketTextStream("localhost", 9999);

// ⑤. 从kafka读取数据
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("test")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();

DataStreamSource<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

// 三、定义给予数据的转换操作(transformations)

// 四、定义计算结果的输出位置(sink)
stream1.print("1");
numStream.print("nums");
stream2.print("2");
stream3.print("3");
socketStream.print("socket");
kafkaStream.print("kafka");

// 五、触发程序执行(execute)
env.execute();
}
}

示例2:定义给予数据的转换操作(transformations)

1、演示 flatMap、keyBy、sum的转换操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.race.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 一、获取执行环境(execution environment)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 二、读取数据源(source)
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
// 三、定义给予数据的转换操作(transformations)
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);

// 四、定义计算结果的输出位置(sink)
dataStream.print();

// 五、触发程序执行(execute)
env.execute("Window WordCount");
}

public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}

flatMap、keyBy都可以使用 Lambda 表达式,更直观简洁
注意 :使用 lambda表达式申明Java泛型时,需要显式声明类型信息 .returns(Types.TUPLE(Types.STRING, Types.LONG))

E4f7ct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap((String sentence, Collector<Tuple2<String, Integer>> out)->{
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG)) // // 显式提供类型信息
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);

dataStream.print();

env.execute("Window WordCount");
}

2、演示 max,maxBy,min,minby的转换操作

  • max返回最大值
    如果使用max,除了keyBy的字段和参与比较大小的字段,如果还有其他字段,为了返回结果对齐,返回的结果的其他字段就是第一次出现的字段
  • maxBy 把最大值对应的元素全部返回
    使用maxBy,除了keyBy的字段和参与比较大小的字段,如果还有其他字段,会返回最大值所在数据全部的数据,另外,maxBy有第二个的参数,用来确定当比较字段出现相同时,返回之前的还是现在的,默认返回之前的,设置为false则返回新的,设置为true返回之前的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class TSource {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

List list = new ArrayList<Tuple3<Integer, Integer, String>>();
list.add(new Tuple3<>(0, 1, "a"));
list.add(new Tuple3<>(0, 3, "b"));
list.add(new Tuple3<>(0, 2, "c"));
list.add(new Tuple3<>(0, 4, "d"));
list.add(new Tuple3<>(1, 5, "a"));
list.add(new Tuple3<>(1, 2, "b"));
list.add(new Tuple3<>(1, 7, "c"));

DataStreamSource<Tuple3<Integer, Integer, String>> stringDataStreamSource = env.fromCollection(list);

KeyedStream<Tuple3<Integer, Integer, String>, Integer> result = stringDataStreamSource
.keyBy(0);

result.max(1).print("max最大值");
result.maxBy(1).print("maxBy元素");

// min,minBy同理

env.execute("测试");
}
}

原数据:

原数据:3> (0,1,a)
原数据:3> (0,3,b)
原数据:3> (0,2,c)
原数据:3> (0,4,d)
原数据:3> (1,5,a)
原数据:3> (1,2,b)
原数据:3> (1,7,c)


返回结果:
max最大值:3> (0,1,a)
max最大值:3> (0,3,a)
max最大值:3> (0,3,a)
max最大值:3> (0,4,a)
max最大值:3> (1,5,a)
max最大值:3> (1,5,a)
max最大值:3> (1,7,a)

maxBy元素:3> (0,1,a)
maxBy元素:3> (0,3,b)
maxBy元素:3> (0,3,b)
maxBy元素:3> (0,4,d)
maxBy元素:3> (1,5,a)
maxBy元素:3> (1,5,a)
maxBy元素:3> (1,7,c)

Table API & SQL

构成步骤

  • 一、创建表环境
  • 二、创建输入表 source
  • 三、创建输出表 sink
  • 四、查询转换得到一个新的表
  • 五、写入输出表

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 创建表环境
//方式1,使用setting的方式创建
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode() // 使用流处理模式
.build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

// 方式2, 使用StreamExecutionEnvironment来构建table env
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 创建输入表,连接外部系统读取数据
tableEnv.executeSql("CREATE TEMPORARY TABLE inputTable ... WITH ( 'connector' = ... )");

// 注册一个表,连接到外部系统,用于输出
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");

// 方式1、执行 SQL 对表进行查询转换,得到一个新的表
// Table table1 = tableEnv.sqlQuery("SELECT ... FROM inputTable... ");

// 方式2、使用 Table API 对表进行查询转换,得到一个新的表
Table table2 = tableEnv.from("inputTable").select(...);

// 将得到的结果写入输出表
TableResult tableResult = table1.executeInsert("outputTable");

背景

1
2
3
4
5
6
7
8
9
10
11
12
version: '3'
services:
mysql:
restart: always
image: mysql:5.7.23
container_name: mysql
ports:
- 3306:3306
volumes:
- ./conf:/etc/mysql/conf.d
- ./logs:/var/log/mysql
- ./data:/var/lib/mysql

部署mysql的时候,将mysql目录进行挂载时提示

1
cannot read directory '/var/lib/mysql/': Permission denied

原因

我们聚集到entrypoint.sh这个文件,因为docker的镜像变成运行态的时候,也就是容器的时候,是需要命令来启动的,这个启动的命令就是entrypoint.sh,也就是说,启动的这个文件肯定是篡改了原来的目录,下面具体看下这个文件的某个关键代码片段

1
2
3
4
5
6
7
8
# allow the container to be started with `--user`
if [ "$1" = 'mysqld' -a -z "$wantHelp" -a "$(id -u)" = '0' ]; then
_check_config "$@"
DATADIR="$(_get_config 'datadir' "$@")"
mkdir -p "$DATADIR"
chown -R mysql:mysql "$DATADIR"
exec gosu mysql "$BASH_SOURCE" "$@"
fi

这段代码的意思是说,该程序运行指定用户来启动mysql,但是如果你不指定用户,他会默认用mysql用户来给DATADIR赋权限,并用mysql用户来执行脚本,所以实际上,用户就变mysql了,但是为啥在宿主机上,却是polkitd呢?

F5qwr7

1
2
// 容器内查看用户名为`mysql` 对应的用户ID和组ID为`999`
cat /etc/passwd | grep mysql

WlHpMT

1
2
// 宿主机器查看999所对应的用户名
cat /etc/passwd | grep 999

如上图,可以看到,在容器内部,他的用户为mysql,他的用户ID为999,然后退出容器,在宿主机上,可以看到ID为999的用户ID对应的用户变成了polkitd,所以,到了这里就明白了,实际上容器内部和外部是用的同一套用户,名字可能不同,但是ID用的是同一个,从而导致,ID虽然相同,但是用户不一致,从而权限也出现了差别。

1、其实在操作系统中,真正决定用户和用户组的东西并不是用户名和组名,而是相应的用户id 和 对应的组id,当我们刚创建用户时系统就会给用户分配对应的用户id 和 组id,可以在/etc/passwd中查看

2、在遇到的问题描述中,因为容器中只创建了一个mysql用户和组,因此容器中查看到mysql用户id和组id为999:999,因此给/var/lib/mysql目录用户权限时其实是给予的用户id和组id为999:999权限,因此,在宿主机中./data此目录的用户权限也变为999:999,因为宿主机中此用户id和组id对应的为polkitd,因此就显示为polkitd。

解决

方法1 推荐

在配置my.cnf时,指定error-log的位置在/var/log/下,否则error的默认位置为例如/var/lib/mysql下的mysqld.log文件因为目录映射后有权限问题,写入不了日志。

方法2

宿主机器更改与docker相同的用户ID和组id
chown 999

方法3

同步宿主/etc/passwd到容器内,从而达到宿主和容器内相同的用户id和组ID

1
2
3
4
5
6
7
8
9
10
11
[root@localhost ~]# getenforce
Enforcing
[root@localhost ~]# setenforce 0
[root@localhost ~]# getenforce
Permissive
[root@localhost ~]# systemctl restart docker
[root@localhost ~]# vi /etc/selinux/config
#修改 SELINUX=disabled
[root@localhost ~]# init 6
#重启系统

实验

一直以来比较困惑容器挂载卷的文件权限问题,今天特地梳理下docker的挂载卷权限问题

首先,本地挂载点新建测试文件test.txt,然后将该目录挂载到容器的test目录
e7n7BA
可以看到我们在容器外新建的test.txt文件在容器内的所有者变成了1000,然后我们在容器内新建文件查看容器外的权限,发现容器中新建的test1.txt所有者变成了root
AfaH0v

接着查看下UID:1000所属用户
olz9nC
最后我们在指定wag用户启动docker,然后在容器内新建test3.txt文件,查看文件所有者
4fr3ct

从以上可以得出,docker启动容器如果不指定用户,会默认以root(UID=0)方式运行,导致其中新建的文件所有者映射到容器外为root,容器外新建的文件映射到容器内所有者UID不变。

制作Doris镜像

我们在OLAP之Doris编译的基础上,开始制作docker镜像

WxlWez

我们为我们的目录增加Dockerfile_feDockerfile_be两个Dockerfile文件

  • fe镜像Dockerfile(cd到编译好的output目录)

Dockerfile_fe

1
2
3
4
5
6
FROM primetoninc/jdk:1.8
# RUN yum install net-tools -y
COPY fe /opt/fe
WORKDIR /opt/fe
EXPOSE 8030 9030
ENTRYPOINT ["/opt/fe/bin/start_fe.sh"]

构建fe镜像,创建并配置镜像映射文件doris-meta和conf,启动容器

1
docker build -t doris-fe:0.15.0 -f Dockerfile_fe .
  • be镜像Dockerfile

Dockerfile_be

1
2
3
4
5
6
FROM primetoninc/jdk:1.8
# RUN yum install net-tools -y
COPY be /opt/be
WORKDIR /opt/be
EXPOSE 9050
ENTRYPOINT ["/opt/be/bin/start_be.sh"]

构建be镜像,配置be镜像映射文件storage,启动3个be容器组成集群。Doris默认至少安装3个be实例。

1
docker build -t doris-be:0.15.0 -f Dockerfile_be .

需要使用镜像可从官方pull,已经上传最新

1
2
docker pull bulolo/doris-fe:0.15.0
docker pull bulolo/doris-be:0.15.0

docker运行

docker run

不推荐使用docker run,因为还要进入容器内查看IP,再添加对应ip的backends

FE 运行

1
docker run -itd --name fe_1 -p 8030:8030 -p 9030:9030 -v <LOCAL_PATH>/fe_1/conf:/opt/fe/conf -v <LOCAL_PATH>/fe_1/log:/opt/fe/log -v <LOCAL_PATH>/fe_1/doris-meta:/opt/fe/doris-meta doris-fe:0.15.0

BE 运行

1
2
3
docker run -itd --name be_1 -p 9150:9050 -v <LOCAL_PATH>/be_1/conf:/opt/be/conf -v <LOCAL_PATH>/be_1/storage:/opt/be/storage doris-be:0.15.0
docker run -itd --name be_2 -p 9250:9050 -v <LOCAL_PATH>/be_2/conf:/opt/be/conf -v <LOCAL_PATH>/be_2/storage:/opt/be/storage doris-be:0.15.0
docker run -itd --name be_3 -p 9350:9050 -v <LOCAL_PATH>/be_3/conf:/opt/be/conf -v <LOCAL_PATH>/be_3/storage:/opt/be/storage doris-be:0.15.0

docker-compose

推荐使用docker-compose

docker-compose.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
version: '3.7'

services:
doris-fe1:
image: doris-fe:0.15.0
container_name: doris-fe1
ports:
- 8030:8030
- 9030:9030
ulimits:
nofile:
soft: "65536"
hard: "65536"
volumes:
- ./fe_1/conf:/opt/fe/conf
- ./fe_1/log:/opt/fe/log
- ./fe_1/doris-meta:/opt/fe/doris-meta
networks:
doris-network :
ipv4_address: 172.66.0.100
doris-be1:
image: doris-be:0.15.0
container_name: doris-be1
ports:
- 9150:9050
ulimits:
nofile:
soft: "65536"
hard: "65536"
volumes:
- ./be_1/conf:/opt/be/conf
- ./be_1/log:/opt/be/log
- ./be_1/storage:/opt/be/storage
networks:
doris-network :
ipv4_address: 172.66.0.101
doris-be2:
image: doris-be:0.15.0
container_name: doris-be2
ports:
- 9250:9050
ulimits:
nofile:
soft: "65536"
hard: "65536"
volumes:
- ./be_2/conf:/opt/be/conf
- ./be_2/log:/opt/be/log
- ./be_2/storage:/opt/be/storage
networks:
doris-network :
ipv4_address: 172.66.0.102
doris-be3:
image: doris-be:0.15.0
container_name: doris-be3
ports:
- 9350:9050
ulimits:
nofile:
soft: "65536"
hard: "65536"
volumes:
- ./be_3/conf:/opt/be/conf
- ./be_3/log:/opt/be/log
- ./be_3/storage:/opt/be/storage
networks:
doris-network :
ipv4_address: 172.66.0.103

networks:
doris-network:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.66.0.0/16
gateway: 172.66.0.1

说明:

ipv4_address:容器绑定固定ip
ulimits:设置系统最大打开文件句柄数,就是:

1
2
3
vi /etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536

fe.conf
根据ip修改priority_networks

1
priority_networks = 172.66.0.100/16

be.conf
根据ip修改priority_networks

1
priority_networks = 172.66.0.101/16

执行docker-compose up -d

在 FE 中添加所有 BE 节点。本地需要安装mysql,Doris实现mysql协议,使用mysql客户端登录fe,默认用root密码为空。

1
2
3
4
mysql -P9030 -uroot -p
ALTER SYSTEM ADD BACKEND "172.66.0.101:9050";
ALTER SYSTEM ADD BACKEND "172.66.0.102:9050";
ALTER SYSTEM ADD BACKEND "172.66.0.103:9050";

修改密码

1
SET PASSWORD FOR 'root' = PASSWORD('123456'); 

使用 mysql-client 连接到 FE,并执行 SHOW PROC '/backends'; 查看 BE 运行情况。如一切正常,isAlive 列应为 true。
1d9Van
查看 Follower 或 Observer 运行状态。使用 mysql-client 连接到任一已启动的 FE,并执行:SHOW PROC '/frontends'; 可以查看当前已加入集群的 FE 及其对应角色。
vinOjZ
至此Doris安装完成,portal页面地址:http://localhost:8030/

打开后输入root和密码进入,如果没有修改过密码,则不填写密码

b03x7C

查看FE

mqdZ1G

查看BE

Is2zh8

基础使用

  • 添加删除查看FE
    1
    2
    3
    4
    ALTER SYSTEM ADD FOLLOWER "hostname:9050";
    ALTER SYSTEM DROPP FOLLOWER "hostname:9050";
    SHOW PROC '/frontends';
    show backends \G
  • 增加删除查看BE
    1
    2
    3
    4
    5
    ALTER SYSTEM ADD BACKEND "hostname:9050";
    ALTER SYSTEM DROPP BACKEND "hostname:9050"; // 不推荐
    ALTER SYSTEM DECOMMISSION BACKEND "hostname:9050"; 推荐
    SHOW PROC '/backends';
    SHOW PROC '/backends'\G
  • 创建数据库
    1
    create database doris;
  • 创建用户
    1
    create user 'doris' identified by 'password';
  • 赋权
    1
    grant all on doris to doris;
  • 创建表
    1
    2
    3
    4
    5
    6
    7
    8
    9
    create table table1(
    id int default '0',
    name varchar(32) default '',
    city_code smallint,
    pv bigint sum default '0'
    )
    aggregate key(id, name, city_code)
    distributed by hash(id) buckets 10
    properties('replication_num' = '3');
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    create table table2(
    id int default '0',
    name varchar(32) default '',
    city_code smallint,
    event_day date,
    pv bigint sum default '0'
    )
    aggregate key(id, name, city_code, event_day)
    partition by range(event_day)
    (
    partition p202107 values less than ('2021-08-01'),
    partition p202108 values less than ('2021-09-01'),
    partition p202109 values less than ('2021-10-01')
    )
    distributed by hash(id) buckets 10
    properties('replication_num' = '3');
  • 插入表
    1
    insert into table1(id, name, city_code, pv) values(2, 'grace', 1, 2),(5, 'helen', 3, 3),(3, 'tom', 2, 2);

相关链接

OLAP之Doris编译

背景及目标

1.尝试 CDC mysql 数据到doris
2.通过网站访问pv,使用doris实践对数据的PV进行预聚合

源表

  • 创建Mysql数据库表

    1
    2
    3
    4
    5
    6
    CREATE TABLE `mysql_pv` (
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT DEFAULT '0'
    );
  • 创建doris表

    这里对PV进行SUM预聚合

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    CREATE TABLE doris_pv
    (
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
    )
    AGGREGATE KEY(siteid, citycode, username)
    DISTRIBUTED BY HASH(siteid) BUCKETS 10
    PROPERTIES("replication_num" = "1");

映射表

source

mysql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE TABLE mysql_pv_source ( 
siteid INT,
citycode SMALLINT,
username STRING,
pv BIGINT
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'demo',
'table-name' = 'offices'
);

sink

doris

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE TABLE doris_pv_sink (
siteid INT,
citycode SMALLINT,
username STRING,
pv BIGINT
)
WITH (
'connector' = 'doris',
'fenodes' = 'localhost:8030',
'table.identifier' = 'db_audit.doris_pv',
'sink.batch.size' = '2',
'sink.batch.interval'='1',
'username' = 'root',
'password' = ''
)

执行

1
INSERT INTO doris_pv_sink select siteid,citycode,username,pv from mysql_pv_source

相关链接

使用 Flink CDC 实现 MySQL 数据实时入 Apache Doris

背景

OLAP下,我选doris,来试试如何安装。

编译

6cm3Lk
根据自己的情况选择对应的环境,我这里按照0.15为例

docker pull apache/incubator-doris:build-env-for-0.15.0

运行镜像

方式一:提前下载好源码,挂载到镜像中

1
docker run -it -v /your/local/.m2:/root/.m2 -v /your/local/incubator-doris-DORIS-x.x.x-release/:/root/incubator-doris-DORIS-x.x.x-release/ apache/incubator-doris:build-env-for-0.15.0

这条命令挂载了源码,和maven的本地库,可以避免一些包每次都要下载,其中:
/your/local/.m2: 本地maven仓库的地址
/your/local/incubator-doris-DORIS-x.x.x-release/:源码的路径
/root/incubator-doris-DORIS-x.x.x-release/:源码挂载后的目录名
apache/incubator-doris:build-env-for-0.15.0:刚才拉取的环境

方式二:在docker中拉取源码

1
2
3
4
5
6
7
8
9
① docker run -it apache/incubator-doris:build-env-for-0.15.0

②wget wget https://dist.apache.org/repos/dist/dev/incubator/doris/0.15/0.15.0-rc04/apache-doris-0.15.0-incubating-src.tar.gz    

这个地址只是0.15版本的doris

或者还可以视同git拉取指定版本的源码

git clone --branch branch-0.15 https://github.com/apache/incubator-doris.git

如果是最新主干版本代码,使用 apache/incubator-doris:build-env-latest进行编译
2grt5S

更改jdk版本

从 build-env-1.3.1 的docker镜像起,同时包含了 OpenJDK 8 和 OpenJDK 11,并且默认使用 OpenJDK 11 编译。请确保编译使用的 JDK 版本和运行时使用的 JDK 版本一致,否则会导致非预期的运行错误。你可以使用在进入编译镜像的容器后,使用以下命令切换默认 JDK 版本:
切换到 JDK 8:

1
2
3
$ alternatives --set java java-1.8.0-openjdk.x86_64
$ alternatives --set javac java-1.8.0-openjdk.x86_64
$ export JAVA_HOME=/usr/lib/jvm/java-1.8.0

切换到 JDK 11:

1
2
3
$ alternatives --set java java-11-openjdk.x86_64
$ alternatives --set javac java-11-openjdk.x86_64
$ export JAVA_HOME=/usr/lib/jvm/java-11

编译

1
sh build.sh

注意:
如果你是第一次使用 build-env-for-0.15.0 或之后的版本,第一次编译的时候要使用如下命令:
sh build.sh --clean --be --fe --ui
这是因为 build-env-for-0.15.0 版本镜像升级了 thrift(0.9 -> 0.13),需要通过 –clean 命令强制使用>新版本的 thrift 生成代码文件,否则会出现不兼容的代码。

注:编译整个过程大概2小时以上,是极为痛苦的。T T,本人编译超过10次以上,2台电脑开docker编译,编译过程,遇到多次fail退出,大部分问题是docker内存不足导致,所以务必将docker的内存调到至少10GB,否则一旦编译内存不足就会失败。

2017款macbook pro 编译成功
2022款macbookpro M1 pro 编译失败

痛哭流涕,终于编译完成,见到这个编译完成命令。

eBvN36

进入到output目录,可以看到be,fe,udf三个文件夹,接下来就是安装部署了

我们看到整个doris社区很多人想体验Doris进行尝鲜,但是苦于环境以及服务器的限制,那么我们来制作一下Doris的镜像及尝试用docker搭建集群

OLAP之Doris的docker镜像和集群搭建

Sf0NUr

背景

在OLAP的选型上,陆陆续续有1年多了。做个总结。

OLTP

OLTP:On-Line Transaction Processing,联机(线上)事务处理。指处理事务型应用的场景,事务这个词的英文是 Transaction,其实就是交易(一般指金融交易)。这种场景有个最常见的业务,就是转账,从一个账户转到另一个账户。一般要求实时处理,对响应的速度要求很高,并且要保证事务的 ACID 特性。面向 OLTP 场景的数据库管理系统就叫做 OLTP DBMS。一般涉及大量的增删改操作。

OLTP 与 NSM(N-ary storage model)行存储: OLTP 的场景一般需要一次操作一个对象的多个属性,比如查询一个人的姓名、银行账号、余额等。而 NSM 这种将一个对象的多个属性连续存储的行式存储模型就很适合 OLTP 的场景了。同时 NSM 也适用于写密集场景,一个对象的写入只需要一次写磁盘就能完成。

传统的关系数据库都是面向 OLTP 场景的,如 Oracle 通常用在银行系统、医疗系统等对操作的响应速度要求很高的场景。

OLAP

OLAP:On-Line Analytical Processing,联机(线上)分析处理。指处理分析型应用的场景。进入大数据时代,数据多了,计算机计算能力增强,并出现了分布式存储、分布式计算等技术,人们开始对大量的数据有分析的需求了。这种分析型的场景一般需要查询大量的数据进行分析,对速度的要求没有 OLTP 高,每天晚上或每周做一次,慢慢分析就好了。一般涉及大量的查询操作,对数据的修改需求不高。

列存数据库等就是面向 OLAP 的,因此,列式存储在大数据时代这种分析型场景中火了一把,如数据仓库 HBase。

OLAP的12准则

  • 准则1 OLAP模型必须提供多维概念视图
  • 准则2 透明性准则
  • 准则3 存取能力准则
  • 准则4 稳定的报表能力
  • 准则5 客户/服务器体系结构
  • 准则6 维的等同性准则
  • 准则7 动态的稀疏矩阵处理准则
  • 准则8 多用户支持能力准则
  • 准则9 非受限的跨维操作
  • 准则10 直观的数据操纵
  • 准则11 灵活的报表生成
  • 准则12 不受限的维与聚集层次

OLAP场景的关键特征

  • 大多数是读请求
  • 数据总是以相当大的批(> 1000 rows)进行写入
  • 不修改已添加的数据
  • 每次查询都从数据库中读取大量的行,但是同时又仅需要少量的列
  • 宽表,即每个表包含着大量的列
  • 较少的查询(通常每台服务器每秒数百个查询或更少)
  • 对于简单查询,允许延迟大约50毫秒
  • 列中的数据相对较小:数字和短字符串(例如,每个URL 60个字节)
  • 处理单个查询时需要高吞吐量(每个服务器每秒高达数十亿行)
  • 事务不是必须的
  • 对数据一致性要求低
  • 每一个查询除了一个大表外都很小
  • 查询结果明显小于源数据,换句话说,数据被过滤或聚合后能够被盛放在单台服务器的内存中

场景示例:

  • 市场营销:当 OLAP 用于营销时,它使营销分析师能够更多地了解他们的客户、哪些产品是有价值的、区域和季节性趋势等等。
  • 医疗保健:医疗保健数据仓库使用 OLAP 可用于预测健康风险和结果、与保险公司共享信息并生成报告。
  • 金融服务:公司首席财务官可以利用 OLAP 多维数据集为他们提供将数据转换为所需信息的方法,同时允许他们轻松生成定制财务报告。

OLAP分析的分类:ROLAP与MOLAP

  • ROLAP(RelationalOLAP)

这是一种通过在RDMS后端服务和客户前端之间建立中间层的OLAP实现方式。通过RDMS来存储和管理数据仓库数据,而通过OLAP中间件来实现多维数据上的操作映射为标准关系操作。其优点在于可以利用RMDS中本身固有的一些功能,例如: 本质上来讲 “slicing and dicing”的操作等同于在SQL语句中添加“WHERE”子句。

  • MOLAP(MultidimensionalOLAP)

这些服务器通过基于数据的多维存储引擎,支持数据的多维视图。能够将多维视图直接映射到数据立方体数组结构。其数据都存在多维数据立方体(multidimensional cube)中,以专有的格式存储。使用数据立方体的优点是能够对预计算的汇总数据进行快速索引,尤其是对”slicing and dicing”有着非常优秀的支持。

  • HOLAP(HybridOLAP)混合型OLAP

HOLAP结合了ROLAP和MOALP技术,从而继承了ROLAP的伸缩性强和MOLAP快速计算的优点。例如HOLAP利用多维数据集技术来提高性能,而当需要详细数据时,HOALP可以从多维数据“钻取”到底层的RDMS中去获取数据。

名称 描述 细节数据存储位置 聚合后的数据存储位置
ROLAP(Relational OLAP) 基于关系数据库的OLAP实现 关系型数据库 关系型数据库
MOLAP(Multidimensional OLAP) 基于多维数据组织的OLAP实现 数据立方体 数据立方体
HOLAP(Hybrid OLAP) 基于混合数据组织的OLAP实现 关系型数据库 数据立方体

对比

o0tsmp

联机分析处理(OLAP,On-line Analytical Processing),数据量大,DML少。使用数据仓库模板
联机事务处理(OLTP,On-line Transaction Processing),数据量少,DML频繁。使用一般用途或事务处理模板

OLTP与OLAP 不同的是,OLTP系统强调数据库内存效率,强调内存各种指标的命令率,强调绑定变量,强调并发操作,强调事务性。
OLAP系统则强调数据分析,强调SQL执行时长,强调磁盘I/O,强调分区。

总的来说,可以认为OLAP的产生是因为一些特性OLTP无法满足,所以一些OLTP异变了一些特性,变成了OLAP,OLAP可以看做是OLTP的一种延展,一个让OLTP产生的数据发现价值的过程。

OLAP数据库选型

n2LrFd
常见的OLTP如,mysql、PostgreSQL、Oracle等,不需要多讲,但是OLAP随着这些年层出不穷,我们汇总选型一下,并看看他们的特点。

目前市面上主流的开源OLAP引擎包含不限于:Hive、Spark SQL、Presto、Kylin、Impala、Druid、Clickhouse、Greeplum等,可以说目前没有一个引擎能在数据量,灵活程度和性能上做到完美,用户需要根据自己的需求进行选型。

  • Hive
  • Spark SQ
  • Presto
  • Elasticsearch
  • Impala
  • Druid
  • Clickhouse
  • Greeplum
  • Kylin
  • Drios
  • StarRocks

按数据量划分

CM5qTF

按建模类型划分

1、ROLAP

Elasticsearch
solr
ClickHouse
Druid
GreenPlum
Drios
StarRocks

2、MOLAP

3、HOLAP

0%