背景

上次我们完成了headscale私有部署,但是为了保证headscale组网中的稳定和可靠性,为设备p2p连接设置一个保底措施,我们需要自建derp服务器。

什么是DERP

DERP 即 Detoured Encrypted Routing Protocol,这是 Tailscale 自研的一个协议:

  • 它是一个通用目的包中继协议,运行在 HTTP 之上,而大部分网络都是允许 HTTP 通信的。
  • 它根据目的公钥(destination’s public key)来中继加密的流量(encrypted payloads)。

Tailscale 会自动选择离目标节点最近的 DERP server 来中继流量
Tailscale 使用的算法很有趣: __所有客户端之间的连接都是先选择 DERP 模式(中继模式),这意味着连接立即就能建立(优先级最低但 100% 能成功的模式),用户不用任何等待。然后开始并行地进行路径发现,通常几秒钟之后,我们就能发现一条更优路径,然后将现有连接透明升级(upgrade)过去,变成点对点连接(直连)__。

因此, DERP 既是 Tailscale 在 NAT 穿透失败时的保底通信方式(此时的角色与 TURN 类似),也是在其他一些场景下帮助我们完成 NAT 穿透的旁路信道。换句话说,它既是我们的保底方式,也是有更好的穿透链路时,帮助我们进行连接升级(upgrade to a peer-to-peer connection)的基础设施。

Tailscale 官方内置了很多 DERP 服务器,分步在全球各地,惟独不包含中国大陆,原因你懂得。这就导致了一旦流量通过 DERP 服务器进行中继,延时就会非常高。而且官方提供的 DERP 服务器是万人骑,存在安全隐患。

官方内置了很多 DERP 服务器

1
2
3
4
5
6
7
8
9
10
11
12
- tok: 96.6ms  (Tokyo)
- sfo: 179.8ms (San Francisco)
- sea: 181.3ms (Seattle)
- dfw: 218.6ms (Dallas)
- ord: 229.7ms (Chicago)
- nyc: 233.8ms (New York City)
- fra: 257.7ms (Frankfurt)
- lhr: 259.1ms (London)
- sin: 265ms (Singapore)
- syd: 317.1ms (Sydney)
- sao: 352.8ms (São Paulo)
- blr: 368.5ms (Bangalore)

为了实现低延迟、高安全性,我们可以参考 Tailscale 官方文档自建私有的 DERP 服务器。
Custom DERP Servers

部署DERP

还是docker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
version: '3.5'
services:
derper:
container_name: derper
image: fredliang/derper
restart: always
volumes:
- ./cert:/cert
ports:
- 3478:3478/udp
- 23479:23479
environment:
DERP_DOMAIN: derp.XXXX.cn
DERP_ADDR: ":23479"
DERP_CERT_MODE: manual
DERP_CERT_DIR: /cert
  • stunport: 3478 默认情况下也会开启 STUN 服务,UDP 端口是 3478
  • derpport: 23479
1
2
3
2022/09/06 01:21:27 no config path specified; using /var/lib/derper/derper.key
2022/09/06 01:21:27 derper: serving on :23479 with TLS
2022/09/06 01:21:27 running STUN server on [::]:3478

部署好 derper 之后,就可以修改 Headscale 的配置来使用自定义的 DERP 服务器了。Headscale 可以通过两种形式的配置来使用自定义 DERP:

  • 一种是在线 URL,格式是 JSON,与 Tailscale 官方控制服务器使用的格式和语法相同。

  • 另一种是本地文件,格式是 YAML。
    我们可以直接使用本地的 YAML 配置文件,内容如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    # /etc/headscale/derp.yaml
    regions:
    901:
    regionid: 901
    regioncode: gz
    regionname: Tencent Guangzhou
    nodes:
    - name: 901a
    regionid: 901
    hostname: '实际域名'
    ipv4: '可不需要'
    stunport: 3478
    stunonly: false
    derpport: 23479
  • regions 是 YAML 中的对象,下面的每一个对象表示一个可用区,每个可用区里面可设置多个 DERP 节点,即 nodes。
    每个可用区的 regionid 不能重复。

  • 每个 node 的 name 不能重复。

  • regionname 一般用来描述可用区,regioncode 一般设置成可用区的缩写。

  • ipv4 字段不是必须的,如果你的域名可以通过公网解析到你的 DERP 服务器地址,这里可以不填。如果你使用了一个二级域名,而这个域名你并没有在公共 DNS server 中添加相关的解析记录,那么这里就需要指定 IP(前提是你的证书包含了这个二级域名,这个很好支持,搞个泛域名证书就行了)。

  • stunonly: false 表示除了使用 STUN 服务,还可以使用 DERP 服务。

  • hostname和 ipv4部分根据你的实际情况填写。

接下来还需要修改 Headscale 的配置文件,引用上面的自定义 DERP 配置文件。需要修改的配置项如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# /etc/headscale/config.yaml
derp:
# List of externally available DERP maps encoded in JSON
urls:
- https://controlplane.tailscale.com/derpmap/default

# Locally available DERP map files encoded in YAML
#
# This option is mostly interesting for people hosting
# their own DERP servers:
# https://tailscale.com/kb/1118/custom-derp-servers/
#
# paths:
# - /etc/headscale/derp-example.yaml
paths:
- /etc/headscale/derp.yaml

# If enabled, a worker will be set up to periodically
# refresh the given sources and update the derpmap
# will be set up.
auto_update_enabled: true

# How often should we check for DERP updates?
update_frequency: 24h

在 Tailscale 客户端上使用以下命令查看目前可以使用的 DERP 服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ tailscale netcheck
Report:
* UDP: true
* IPv4: yes, 14.154.29.252:50713
* IPv6: yes, [::ffff:14.154.29.252]:50713
* MappingVariesByDestIP: false
* HairPinning: false
* PortMapping: UPnP, NAT-PMP, PCP
* Nearest DERP: Tencent Guangzhou
* DERP latency:
- gz: 8.5ms (Tencent Guangzhou) -----> 这就是我们刚建的derp服务器
- tok: 96.6ms (Tokyo)
- sfo: 179.8ms (San Francisco)
- sea: 181.3ms (Seattle)
- dfw: 218.6ms (Dallas)
- ord: 229.7ms (Chicago)
- nyc: 233.8ms (New York City)
- fra: 257.7ms (Frankfurt)
- lhr: 259.1ms (London)
- sin: 265ms (Singapore)
- syd: 317.1ms (Sydney)
- sao: 352.8ms (São Paulo)
- blr: 368.5ms (Bangalore)

tailscale netcheck 实际上只检测 3478/udp 的端口, 就算 netcheck 显示能连,也不一定代表 23479 端口可以转发流量。最简单的办法是直接打开 DERP 服务器的 URL:https://derp.XXXX.cn:23479,如果看到如下页面,且地址栏的 SSL 证书标签显示正常可用,那才是真没问题了。
NUGpWd

Tailscale 命令行工具来测试:

查看与通信对端的连接方式及状态

1
2
3
4
tailscale status

100.64.0.1 junyao-lxgc8rs7 junyao macOS -
100.64.0.2 homeds-2xqvj919 junyao linux active; direct [::ffff:14.154.29.252]:41641, tx 518960 rx 7674032

ping一下

1
2
3
4
tailscale ping 100.64.0.2

pong from homeds-2xqvj919 (100.64.0.2) via [::ffff:14.154.29.252]:41641 in 6ms --->通过点对点
pong from homeds-2xqvj919 (100.64.0.2) via DERP(thk) in 104ms --->通过DERP

这个tailscale ping更加友好一点,会直接告诉你是通过DERP中继服务器还是点对点来和对方通信的。

DERP安全

我们总要设置一下DERP的访问权限,不能成为万人骑

步骤:
1、在 DERP 服务器上安装 Tailscale。
2、derper 启动时加上参数 –verify-clients

tip:因为derp是通过本地tailscale进行检测,所以docker部署的derper必须在容器内启有Tailscale客户端的实例,我们把服务器的/var/run/tailscale/tailscaled.sock映射进入derp容器目录中,让derp能检测到tailscale
tip:如果是使用docker安装的tailscale,请将/var/run/tailscale/tailscaled.sock映射出来,然后再映射进入derp,即让derp和tailscale容器共享volume(/var/run/tailscale/tailscaled.sock)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
version: '3.5'
services:
derper:
container_name: derper
image: fredliang/derper
restart: always
volumes:
- ./cert:/cert
- /var/run/tailscale/tailscaled.sock:/var/run/tailscale/tailscaled.sock
ports:
- 3478:3478/udp
- 23479:23479
environment:
DERP_DOMAIN: derp.webhunt.cn
DERP_ADDR: ":23479"
DERP_CERT_MODE: manual
DERP_CERT_DIR: /cert
DERP_VERIFY_CLIENTS: "true"

需求

  • 办公需求:
    设备:
    1、synology DS918+
    2、macbook pro m1
    3、mac mini
    问题:访问家里nas慢的问题,及工作效率能快速访问的问题

  • 公司跨机房需求:
    1、erp金蝶系统-sqlServer服务-公司内部机房
    2、采购系统-mongodb服务-阿里云服务器
    问题:采购系统上访问ERP速度慢的问题,及双流cdc的需求

设想这样一个问题:在北京和上海各有一台局域网的机器(例如一台是家里的台式机,一 台是连接到星巴克 WiFi 的笔记本),二者都是私网 IP 地址,但可以访问公网, 如何让这两台机器通信呢

既然二者都能访问公网,那最简单的方式当然是在公网上架设一个中继服务器: 两台机器分别连接到中继服务,后者完成双向转发。这种方式显然有很大的性能开销,而 且中继服务器很容易成为瓶颈。

有没有办法不用中继,让两台机器直接通信呢

那就是:跨过公网(互联网)实现端到端直连。

TailScale 是什么, Headscale又是什么

TailScale

TailScale 你可以理解为 VPN,或者说 Wireguard 外面包了一层壳子。它可以将一些设备连接起来,形成一个虚拟局域网。一个很简单的例子,你可以在星巴克里,读取家里任意电脑上的文件。

比如最常见的需求就是,公司有一个内网办公环境,当你外出办公时,也希望你的电脑能够接入办公网络。 因为外网的机器和内网的机器不能互联,所以一般会有一个中心服务器, 所有的子节点都和中心服务器相连,然后中心服务器转发所有的流量。
5Z0qxl

这样做的缺点显而易见,首先是中心服务器(hub)会成为瓶颈。 其次,某种极端情况下,如果节点 A 和 节点 B 距离非常近,但是都离 hub 很远, 这样就会导致非常高的延迟。
Rje9yw

那么我们就会想,能不能让节点间直接互联呢? 这就是 mesh VPN,其实现就是 wireguard。
pQ8X8e
wireguard 的每一个节点都会存储其他所有节点的信息,并且和其他所有的节点都建立 tls 连接。 如果涉及到内网穿透的话,那么你需要找到一台处于网关位置的节点(内外网都可达),将其设置为 coordinator, 扮演类似于 hub 的角色, 分发穿透内外网的流量。

wireguard 的缺点在于:

  • 配置比较繁琐
  • 维护也比较困难,增删节点都需要改动所有节点的配置

基于上述这些痛点,TailScale 做了一些改进:

1、在原有的 ICE、STUN 等 UDP 协议外,实现了 DERP TCP 协议来实现 NAT 穿透
2、基于公网的 coordinator 服务器下发 ACL 和配置,实现节点动态更新
3、通过第三方(如 Google) SSO 服务生成用户和私钥,实现身份认证

对我来说 Tailscale 相对于 Wireguard 最大的优势有 3 点:
1、配置简单
2、支持 P2P 和中继自动切换
3、支持自建中继节点

作为一种搭建虚拟局域网的工具,相比于传统VPN而言,所有节点之间都可以进行P2P连接,也就是全互联模式,效率更高。

为了基于wireguard实现更完美的VPN工具,现在已经出现了很多项目,如Netmaker,通过可视化界面来配置全互联模式,并支持UDP打洞、多租户等高端功能,几乎适配所有平台。然而现实世界是复杂的,无法保证所有的NAT都能打洞成功,而且Netmaker目前不支持fallback机制,如打洞失败,无法fallback中继节点。而Tailscale支持fallback,可以尽最大努力实现全互联模式,部分节点即使打洞不成功,也可通过中继节点在虚拟局域网内畅通无阻。

简而言之,我们可以认为 TailScale 是更为易用版的、功能封装更为完善的 wireguard。

Headscale

Headscale看名字就知道是和 Tailscale 对着干的,Tailscale 的客户端是不收费的,服务端是不开源的,超过 20 个设备就需要付费了,并且Tailscale的服务不在国内,我们不可能把服务端安全的交给Tailscale,私有自建才是出入。Headscale 是个第三方开源版本的 Tailscale 的服务端,除了「网站界面」之外该有的功能都有,因此我们使用Headscale自建私有服务。

解决方案

所以本次的方案:
1、阿里云服务器 部署 Headscale 服务端
2、自己的 Mac 和 Nas 上,使用 Tailscale 作为客户端

或公司,mongodb所在的服务器与sqlServer所在的服务器组网

服务端

Docker 部署

1
2
3
4
5
6
7
8
9
10
11
12
version: '3.5'
services:
headscale:
image: headscale/headscale:latest-alpine
container_name: headscale
volumes:
- ./config:/etc/headscale
- ./data/data:/var/lib/headscale
ports:
- 8080:8080
command: headscale serve
restart: unless-stopped

手动新建一下配置文件,放入./config

1
wget https://github.com/juanfont/headscale/raw/main/config-example.yaml -O config.yaml

编辑一下它:

  • server_url 这个东西是一个简单的「HTTP 设备认证页面」,后面需要暴露在公网上,其他设备如果想加入到你的网络里面,需要访问这个地址,拿到一个 Token。有域名的话推荐用域名+nginx/caddy 反向代理没域名的话用 ip+端口。
  • ip_prefixes 可以根据自己喜好来改,留默认的当然也行

docker-compose up -d启动 docker
下一步,创建一个「租户」,你可以理解为一个用户。进到 headscale 的 docker 里面,执行

1
headscale namespaces create SOME_NAME

SOME_NAME随意替换

客户端

并非全部客户端都支持(或全部支持)将 Headscale 作为后端,目前适配情况如下:

操作系统 支持情况 解决方案
Linux 原生支持
Windows 修改注册表
macOS 需要添加描述文件
Android
iOS 目前不支持

Mac 端:AppStore 版客户端

由于是使用的「非官方版控制器」,所以我们需要稍微 hack 一下,将软件里面默认的「tailscale 官方控制服务器地址」改为我们自己搭建的 Headscale 地址。

访问 http://server_url/apple,下载并安装一个描述文件。

然后打开 Tailscale,点击登录,会看到一个英文界面,里面有一行命令

1
headscale -n NAMESPACE nodes register --key SOME_HEX_VALUE

将里面的 NAMESPACE 换成你创建的租户名称,然后去服务端 docker 里面,执行它。你就会发现,你的 mac 已经登录成功了。

Linux端:Docker 版客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
version: '3.3'
services:
tailscaled:
container_name: tailscaled
image: tailscale/tailscale
network_mode: host
privileged: true
restart: always
cap_add:
- net_admin
- sys_module
volumes:
- ./lib:/var/lib
- /dev/net/tun:/dev/net/tun
command: sh -c "mkdir -p /var/run/tailscale && ln -s /tmp/tailscaled.sock /var/run/tailscale/tailscaled.sock && tailscaled"

启动容器后,需要进入容器,输入这个东西进行登录:

1
2
tailscale up --login-server=http://server_url --accept-routes=true --accept-dns=false

如果没问题,那么会提示你访问一个网址,拿到 Token,回到 Server 端进行登录就好。

synology

Access Synology NAS from anywhere
Tailscale Packages

tailscale up 参数

  • –hostname 设置名称
  • –accept-routes 接受服务端配置的路由
  • –accept-dns 推荐将 DNS 功能关闭,因为它会覆盖系统的默认 DNS
  • –advertise-routes 申请路由到该节点,Tailscale 实现「出口节点」,即打通局域网内部的路由访问,这个网段的所有设备都可以被访问
  • –advertise-exit-node 可以建立数据导向节点exit node,即本机访问互联网皆通过此节点 bolean
  • –exit-node 指定出口节点,导向所有流量经这出口节点

headscale命令

通过服务端授权客户端

1
headscale -n default nodes register --key 905cf165204800247fbd33989dbc22be95c987286c45aac3033937041150d846

查看注册的节点

1
2
3
4
headscale nodes list

ID | Name | NodeKey | Namespace | IP addresses | Ephemeral | Last seen | Online | Expired
1 | coredns | [Ew3RB] | default | 10.1.0.1 | false | 2022-03-20 09:08:58 | online | no

通过 Pre-Authkeys 接入

前面的接入方法都需要服务端同意,步骤比较烦琐,其实还有更简单的方法,可以直接接入,不需要服务端同意。

首先在服务端生成 pre-authkey 的 token,有效期可以设置为 24 小时:

1
headscale preauthkeys create -e 24h -n default

查看已经生成的 key:

1
headscale -n default preauthkeys list

现在新节点就可以无需服务端同意直接接入了:

1
tailscale up --login-server=http://<HEADSCALE_PUB_IP>:8080 --accept-routes=true --accept-dns=false --authkey $KEY

打通局域网

WZtD67
假设你的家庭内网有一台 Linux 主机(比如 Nas)安装了 Tailscale 客户端,我们希望其他 Tailscale 客户端可以直接通过家中的局域网 IP(例如 192.168.100.0/24) 访问家庭内网的任何一台设备。

官方文档:Subnet routers and traffic relay nodes
群辉具体操作见:headscale组网打通群辉局域网内部访问

linux 配置方法很简单,首先需要设置 IPv4 与 IPv6 路由转发:

1
2
3
echo 'net.ipv4.ip_forward = 1' | tee /etc/sysctl.d/ipforwarding.conf
echo 'net.ipv6.conf.all.forwarding = 1' | tee -a /etc/sysctl.d/ipforwarding.conf
sysctl -p /etc/sysctl.d/ipforwarding.conf

客户端修改注册节点的命令,在原来命令的基础上加上参数 --advertise-routes=192.168.100.0/24

1
tailscale up --login-server=http://<HEADSCALE_PUB_IP>:8080 --accept-routes=true --accept-dns=false --advertise-routes=192.168.100.0/24

在 Headscale 端查看路由,可以看到相关路由是关闭的。

1
2
3
4
5
6
7
headscale nodes list|grep nas
6 | nas | [7LdVc] | default | 10.1.0.6 | false | 2022-03-20 15:50:46 | online | no

headscale routes list -i 6
Route | Enabled

192.168.100.0/24 | false

开启路由:

1
2
3
headscale routes enable -i 6 -r "192.168.100.0/24"
Route | Enabled
192.168.100.0/24 | true

其他节点查看路由结果:

1
2
ip route
192.168.100.0/24 dev tailscale0

现在你在任何一个 Tailscale 客户端所在的节点都可以 ping 通家庭内网的机器了

为了能让headscale稳定,我们可以自建中继服务器
headscale之DERP中继服务器自建

相关链接

NAT 穿透是如何工作的:技术原理及企业级实践

  • 增量(滚动)聚合算子:如min、max、minBy、maxBy、sum、reduce、aggregate
    一次取一条数据,用聚合函数对中间累加器更新;窗口触发时,取累加器输出结果;
    优点:实时性提升,性能比较好,数据一进入窗口就计算,仅仅缓存计算中间值
    缺点:窗口数据无法排序,无法获取窗口信息
  • 全量聚合蒜子:如apply、process
    数据”攒”在状态容器中,窗口触发时,把整个窗口的数据交给聚合函数
    优点:对窗口中所有数据排序、获取窗口信息,比如时间窗口,获取窗口开始时间和结束时间
    缺点:窗口数据量较大时,数据可能很多,一起处理,比较耗时

pq0peM

增量聚合示例

简单聚合算子

1
2
3
4
5
6
keyedStream.countWindow(5,2)
// .max('score') // 得到的结果中,除了score是符合逻辑的结果外,其他字段是窗口中的第一条的值
// .min('score')
// .maxBy('score') // 得到结果是:最大score所在的那一行数据
// .minBy('score') // 得到结果是:最小score所在的那一行数据
// .sum('score') // 得到的结果中,除了score是符合逻辑(score之和)的结果外,其他字段是不可预料的,一直在更新

reduce 聚合算子

1
2
3
4
5
6
7
source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new ReduceFunction<long>(){
@override
public Long reduce(Long value1,Long value2) throws exception{
return null
}
})

aggregate 聚合算子

1
2
3
4
watermarkedBeanStream
.keyBy(EventBean::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30),Time.seconds(10)))
.aggregate(new AggregateFunction......)

全量聚合示例

apply 聚合算子

1
2
3
4
watermarkedBeanStream
.keyBy(EventBean::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30),Time.seconds(10)))
.appply(new WindowFunction...)

process 聚合算子(richFunction)

1
2
3
4
watermarkedBeanStream
.keyBy(EventBean::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30),Time.seconds(10)))
.process(new WindowFunction...)

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
package cn.doitedu.flink.java.demos;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.*;

/**
* @Author: deep as the sea
* @Site: <a href="www.51doit.com">多易教育</a>
* @QQ: 657270652
* @Date: 2022/5/2
* @Desc:
*
* 测试数据 :
* 1,e01,10000,p01,10
* 1,e02,11000,p02,20
* 1,e02,12000,p03,40
* 1,e03,20000,p02,10
* 1,e01,21000,p03,50
* 1,e04,22000,p04,10
* 1,e06,28000,p05,60
* 1,e07,30000,p02,10
**/
public class _20_Window_Api_Demo1 {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

// 1,e01,3000,pg02
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

SingleOutputStreamOperator<EventBean2> beanStream = source.map(s -> {
String[] split = s.split(",");
return new EventBean2(Long.parseLong(split[0]), split[1], Long.parseLong(split[2]), split[3], Integer.parseInt(split[4]));
}).returns(EventBean2.class);


// 分配 watermark ,以推进事件时间
SingleOutputStreamOperator<EventBean2> watermarkedBeanStream = beanStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<EventBean2>forBoundedOutOfOrderness(Duration.ofMillis(0))
.withTimestampAssigner(new SerializableTimestampAssigner<EventBean2>() {
@Override
public long extractTimestamp(EventBean2 eventBean, long recordTimestamp) {
return eventBean.getTimeStamp();
}
})
);

/**
* 滚动聚合api使用示例
* 需求 一 : 每隔10s,统计最近 30s 的数据中,每个用户的行为事件条数
* 使用aggregate算子来实现
*/
SingleOutputStreamOperator<Integer> resultStream = watermarkedBeanStream
.keyBy(EventBean2::getGuid)
// 参数1: 窗口长度 ; 参数2:滑动步长
.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
// reduce :滚动聚合算子,它有个限制 ,聚合结果的数据类型 与 数据源中的数据类型 ,是一致
/*.reduce(new ReduceFunction<EventBean>() {
@Override
public EventBean reduce(EventBean value1, EventBean value2) throws Exception {
return null;
}
})*/
.aggregate(new AggregateFunction<EventBean2, Integer, Integer>() {
/**
* 初始化累加器
* @return
*/
@Override
public Integer createAccumulator() {
return 0;
}

/**
* 滚动聚合的逻辑(拿到一条数据,如何去更新累加器)
* @param value The value to add
* @param accumulator The accumulator to add the value to
* @return
*/
@Override
public Integer add(EventBean2 value, Integer accumulator) {
return accumulator + 1;
}

/**
* 从累加器中,计算出最终要输出的窗口结算结果
* @param accumulator The accumulator of the aggregation
* @return
*/
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}

/**
* 批计算模式下,可能需要将多个上游的局部聚合累加器,放在下游进行全局聚合
* 因为需要对两个累加器进行合并
* 这里就是合并的逻辑
* 流计算模式下,不用实现!
* @param a An accumulator to merge
* @param b Another accumulator to merge
* @return
*/
@Override
public Integer merge(Integer a, Integer b) {
return a + b;
}
});
/*resultStream.print();*/


/**
* 需求 二 : 每隔10s,统计最近 30s 的数据中,每个用户的平均每次行为时长
* 要求用 aggregate 算子来做聚合
* 滚动聚合api使用示例
*/
SingleOutputStreamOperator<Double> resultStream2 = watermarkedBeanStream
.keyBy(EventBean2::getGuid)
/*.window(TumblingProcessingTimeWindows.of(Time.seconds(30)))*/
.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.milliseconds(10)))
// 泛型1: 输入的数据的类型 ; 泛型2: 累加器的数据类型 ; 泛型3: 最终结果的类型
.aggregate(new AggregateFunction<EventBean2, Tuple2<Integer, Integer>, Double>() {
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return Tuple2.of(0, 0);
}

@Override
public Tuple2<Integer, Integer> add(EventBean2 eventBean, Tuple2<Integer, Integer> accumulator) {
// accumulator.setField(accumulator.f0+1,0);
// accumulator.setField(accumulator.f1+eventBean.getActTimelong(),1);
// return accumulator;

return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + eventBean.getActTimelong());
}

@Override
public Double getResult(Tuple2<Integer, Integer> accumulator) {

return accumulator.f1 / (double) accumulator.f0;
}

/**
* 在批计算模式中,shuffle的上游可以做局部聚合,然后会把局部聚合结果交给下游去做全局聚合
* 因此,就需要提供 两个局部聚合结果进行合并的逻辑
*
* 在流式计算中,不存在这种 上游局部聚合和交给下游全局聚合的机制!
* 所以,在流式计算模式下,不用实现下面的方法
* @param a An accumulator to merge
* @param b Another accumulator to merge
* @return
*/
@Override
public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
});
/*resultStream2.print();*/


/**
* TODO 补充练习 1
* 需求 一 : 每隔10s,统计最近 30s 的数据中,每个用户的行为事件条数
* 滚动聚合api使用示例
* 使用sum算子来实现
*/
watermarkedBeanStream
.map(bean->Tuple2.of(bean,1)).returns(new TypeHint<Tuple2<EventBean2, Integer>>() {})
.keyBy(tp->tp.f0.getGuid())
.window(SlidingEventTimeWindows.of(Time.seconds(30),Time.seconds(10)))
// 数据: Tuple2<Bean,1>
.sum("f1")
/*.print()*/;


/**
* TODO 补充练习 2
* 需求 一 : 每隔10s,统计最近 30s 的数据中,每个用户的最大行为时长
* 滚动聚合api使用示例
* 用max算子来实现
*/
watermarkedBeanStream
.keyBy(EventBean2::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30),Time.seconds(10)))
.max("actTimelong")
/*.print()*/;



/**
* TODO 补充练习 3
* 需求 一 : 每隔10s,统计最近 30s 的数据中,每个用户的最大行为时长及其所在的那条行为记录
* 滚动聚合api使用示例
* 用maxBy算子来实现
*/
watermarkedBeanStream
.keyBy(EventBean2::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30),Time.seconds(10)))
.maxBy("actTimelong")
/*.print()*/;



/**
* TODO 补充练习 4
* 需求 一 : 每隔10s,统计最近 30s 的数据中,每个页面上发生的行为中,平均时长最大的前2种事件及其平均时长
* 用 process算子来实现
*/
watermarkedBeanStream
.keyBy(bean->bean.getPageId())
.window(SlidingEventTimeWindows.of(Time.seconds(30),Time.seconds(10)))
.process(new ProcessWindowFunction<EventBean2, Tuple3<String,String,Double>, String, TimeWindow>() {
@Override
public void process(String key, ProcessWindowFunction<EventBean2, Tuple3<String,String,Double>, String, TimeWindow>.Context context, Iterable<EventBean2> elements, Collector<Tuple3<String,String,Double>> out) throws Exception {
// 构造一个hashmap来记录每一个事件的发生总次数,和行为总时长
HashMap<String, Tuple2<Integer, Long>> tmpMap = new HashMap<>();

// 遍历窗口中的每一条数据
for (EventBean2 element : elements) {
String eventId = element.getEventId();
Tuple2<Integer, Long> countAndTimelong = tmpMap.getOrDefault(eventId,Tuple2.of(0,0L));

tmpMap.put(eventId,Tuple2.of(countAndTimelong.f0+1,countAndTimelong.f1+element.getActTimelong()) );
}

// 然后,从tmpMap中,取到 平均时长 最大的前两个事件
ArrayList<Tuple2<String, Double>> tmpList = new ArrayList<>();
for (Map.Entry<String, Tuple2<Integer, Long>> entry : tmpMap.entrySet()) {
String eventId = entry.getKey();
Tuple2<Integer, Long> tuple = entry.getValue();
double avgTimelong = tuple.f1/ (double)tuple.f0;
tmpList.add(Tuple2.of(eventId,avgTimelong));
}

// 然后对tmpList按平均时长排序
Collections.sort(tmpList, new Comparator<Tuple2<String, Double>>() {
@Override
public int compare(Tuple2<String, Double> tp1, Tuple2<String, Double> tp2) {
/* return tp2.f1.compareTo(tp1.f1);*/
return Double.compare(tp2.f1,tp1.f1);
}
});

// 输出前2个
for(int i=0;i<Math.min(tmpList.size(),2);i++){
out.collect(Tuple3.of(key,tmpList.get(i).f0,tmpList.get(i).f1));
}
}
})
.print();



/**
* 全窗口计算api使用示例
* 需求 三 : 每隔10s,统计最近 30s 的数据中,每个用户的行为事件中,行为时长最长的前2条记录
* 要求用 apply 或者 process 算子来实现
*
*/
// 1. 用apply算子来实现需求
SingleOutputStreamOperator<EventBean2> resultStream3 = watermarkedBeanStream.keyBy(EventBean2::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))

// 泛型1: 输入数据类型; 泛型2:输出结果类型; 泛型3: key的类型, 泛型4:窗口类型
.apply(new WindowFunction<EventBean2, EventBean2, Long, TimeWindow>() {
/**
*
* @param key 本次传给咱们的窗口是属于哪个key的
* @param window 本次传给咱们的窗口的各种元信息(比如本窗口的起始时间,结束时间)
* @param input 本次传给咱们的窗口中所有数据的迭代器
* @param out 结果数据输出器
* @throws Exception
*/
@Override
public void apply(Long key, TimeWindow window, Iterable<EventBean2> input, Collector<EventBean2> out) throws Exception {

// low bi写法: 从迭代器中迭代出数据,放入一个arraylist,然后排序,输出前2条
ArrayList<EventBean2> tmpList = new ArrayList<>();

// 迭代数据,存入list
for (EventBean2 eventBean2 : input) {
tmpList.add(eventBean2);
}
// 排序
Collections.sort(tmpList, new Comparator<EventBean2>() {
@Override
public int compare(EventBean2 o1, EventBean2 o2) {
return o2.getActTimelong() - o1.getActTimelong();
}
});

// 输出前2条
for (int i = 0; i < Math.min(tmpList.size(), 2); i++) {
out.collect(tmpList.get(i));
}

}
});
/*resultStream3.print();*/


// 2. 用process算子来实现需求
SingleOutputStreamOperator<String> resultStream4 = watermarkedBeanStream.keyBy(EventBean2::getGuid)
.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)))
.process(new ProcessWindowFunction<EventBean2, String, Long, TimeWindow>() {
@Override
public void process(Long aLong, ProcessWindowFunction<EventBean2, String, Long, TimeWindow>.Context context, Iterable<EventBean2> input, Collector<String> out) throws Exception {

// 本次窗口的元信息
TimeWindow window = context.window();
long maxTimestamp = window.maxTimestamp();// 本窗口允许的最大时间戳 [1000,2000) ,其中 1999就是允许的最大时间戳; 2000就是窗口的end
long windowStart = window.getStart();
long windowEnd = window.getEnd();


// low bi写法: 从迭代器中迭代出数据,放入一个arraylist,然后排序,输出前2条
ArrayList<EventBean2> tmpList = new ArrayList<>();

// 迭代数据,存入list
for (EventBean2 eventBean2 : input) {
tmpList.add(eventBean2);
}
// 排序
Collections.sort(tmpList, new Comparator<EventBean2>() {
@Override
public int compare(EventBean2 o1, EventBean2 o2) {
return o2.getActTimelong() - o1.getActTimelong();
}
});

// 输出前2条
for (int i = 0; i < Math.min(tmpList.size(), 2); i++) {
EventBean2 bean = tmpList.get(i);
out.collect( "窗口start:"+windowStart + "," +"窗口end:"+ windowEnd + "," + bean.getGuid() + "," + bean.getEventId() + "," + bean.getTimeStamp() + "," +bean.getPageId() + "," +bean.getActTimelong());
}
}

});
/*resultStream4.print();*/
env.execute();
}
}

背景准备

曾经在Flink CDC同步mysql和pg关联数据到es实践尝试使用flink进行多数据源双流join到es,本次实践使用流平台单数据源双流join。

我们快速用docker搭建起来sqlserver,

记住开启sqlserver cdc:sys.sp_cdc_enable_table

docker-compose

1
2
3
4
5
6
7
8
9
10
11
12
version: '2.1'
services:
sqlserver:
image: mcr.microsoft.com/mssql/server:2019-latest
container_name: sqlserver
ports:
- "1433:1433"
environment:
- "MSSQL_AGENT_ENABLED=true"
- "MSSQL_PID=Standard"
- "ACCEPT_EULA=Y"
- "SA_PASSWORD=Password!"

并创建一些数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
-- Sqlserver
CREATE DATABASE inventory;
GO
USE inventory;
EXEC sys.sp_cdc_enable_db;

-- Create and populate our products using a single insert with many rows
CREATE TABLE products (
id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
INSERT INTO products(name,description,weight)
VALUES ('scooter','Small 2-wheel scooter',3.14);
INSERT INTO products(name,description,weight)
VALUES ('car battery','12V car battery',8.1);
INSERT INTO products(name,description,weight)
VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
INSERT INTO products(name,description,weight)
VALUES ('hammer','12oz carpenter''s hammer',0.75);
INSERT INTO products(name,description,weight)
VALUES ('hammer','14oz carpenter''s hammer',0.875);
INSERT INTO products(name,description,weight)
VALUES ('hammer','16oz carpenter''s hammer',1.0);
INSERT INTO products(name,description,weight)
VALUES ('rocks','box of assorted rocks',5.3);
INSERT INTO products(name,description,weight)
VALUES ('jacket','water resistent black wind breaker',0.1);
INSERT INTO products(name,description,weight)
VALUES ('spare tire','24 inch spare tire',22.2);
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0;
-- Create some very simple orders
CREATE TABLE orders (
id INTEGER IDENTITY(10001,1) NOT NULL PRIMARY KEY,
order_date DATE NOT NULL,
purchaser INTEGER NOT NULL,
quantity INTEGER NOT NULL,
product_id INTEGER NOT NULL,
FOREIGN KEY (product_id) REFERENCES products(id)
);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('16-JAN-2016', 1001, 1, 102);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('17-JAN-2016', 1002, 2, 105);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('19-FEB-2016', 1002, 2, 106);
INSERT INTO orders(order_date,purchaser,quantity,product_id)
VALUES ('21-FEB-2016', 1003, 1, 107);
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'orders', @role_name = NULL, @supports_net_changes = 0;
GO

使用流平台快速搭建数据流

ruV7yp
本次我们从sqlserver获取数据,并通过print直接输出到终端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc',
'hostname' = '192.168.31.6',
'port' = '1433',
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'schema-name' = 'dbo',
'table-name' = 'products'
);


CREATE TABLE orders (
id INT,
order_date DATE,
purchaser INT,
quantity INT,
product_id INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc',
'hostname' = '192.168.31.6',
'port' = '1433',
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'schema-name' = 'dbo',
'table-name' = 'orders'
);


CREATE TABLE enriched_orders (
order_id INT,
order_date DATE,
purchaser INT,
quantity INT,
product_name STRING,
product_description STRING
) WITH ('connector' = 'print');


INSERT INTO
enriched_orders
SELECT
o.id,
o.order_date,
o.purchaser,
o.quantity,
p.name,
p.description
FROM
orders AS o
LEFT JOIN products AS p ON o.product_id = p.id;

启动sqlServer_cdc_print
dkaq5E

背景

半年前提笔写下了monstache实践mongodb同步es,半年后因为搭建完实时数据流平台,我将用flink sql实时流平台来尝试本次的实践,对标 monstache实践mongodb同步es.

monstache Flink CDC
近实时 ☑️ ☑️
支持(旧数据)全量同步 ☑️ ☑️
支持增量同步(增删改) ☑️ ☑️
是否现在社区主流 ☑️是 ☑️未来主流
同步方式 数据层oplog 数据层oplog流处理

实践

mongodb-material表数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
"_id": ObjectId("5f601cd6afef2b4993dc7afd"),
"created": ISODate("2020-09-15T01:45:44.753Z"),
"mfrId": ObjectId("5ea279be76860394d14a4982"),
"mfrName": "Biolegend",
"name": "FITC anti-mouse CD2",
"ras": "100105-BLG",
"sn": "100105",
"spec": "50 μg",
"status": true,
"taxrate": 13,
"unit": "EA",
"updated": ISODate("2021-01-15T08:55:45.668Z"),
"price": NumberInt("980"),
"taxcode": "107030710",
"clone": "RM2-5",
"lastOrderAt": ISODate("2021-08-05T03:09:47.577Z"),
"manual": "https://www.biolegend.com/Default.aspx?ID=6664&productid=472",
"pn": "472",
"cumulativeSales": NumberInt("0"),
"isDeprecated": false,
"ship": "蓝冰",
"storage": "2°C-8°C",
"isPublic": true,
"invtCode": "1405.01"
}

添加实时流任务
Yl6y6J
Flink SQL:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
CREATE TABLE material (
_id STRING,
created TIMESTAMP_LTZ(3),
mfrId STRING,
mfrName STRING,
name STRING,
ras STRING,
sn STRING,
spec STRING,
status BOOLEAN,
taxrate INT,
unit STRING,
updated TIMESTAMP_LTZ(3),
price DECIMAL(10, 5),
taxcode STRING,
clone STRING,
lastOrderAt TIMESTAMP_LTZ(3),
manual STRING,
pn STRING,
cumulativeSales INT,
isDeprecated BOOLEAN,
ship STRING,
storage STRING,
isPublic BOOLEAN,
invtCode STRING,
PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017',
'username' = 'XXX',
'password' = 'XXX',
'database' = 'biocitydb',
'collection' = 'material'
);
CREATE TABLE es_material (
_id STRING,
created TIMESTAMP_LTZ(3),
mfrId STRING,
mfrName STRING,
name STRING,
ras STRING,
sn STRING,
spec STRING,
status BOOLEAN,
taxrate INT,
unit STRING,
updated TIMESTAMP_LTZ(3),
price DECIMAL(10, 5),
taxcode STRING,
clone STRING,
lastOrderAt TIMESTAMP_LTZ(3),
manual STRING,
pn STRING,
cumulativeSales INT,
isDeprecated BOOLEAN,
ship STRING,
storage STRING,
isPublic BOOLEAN,
invtCode STRING,
PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'es_material'
);
INSERT INTO
es_material
select
_id,
created,
mfrId,
mfrName,
name,
ras,
sn,
spec,
status,
taxrate,
unit,
updated,
price,
taxcode,
clone,
lastOrderAt,
manual,
pn,
cumulativeSales,
isDeprecated,
ship,
storage,
isPublic,
invtCode
from
material;

添加Maven pom 或者Upload Jar

  • flink-sql-connector-elasticsearch7_2.11-1.13.2.jar
  • flink-sql-connector-mongodb-cdc-2.2.1.jar
1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.14.4</version>
</dependency>

C6zfRh

概念

DC、UID、OU、CN、SN、DN、RDN

关键字 英文全称 含义
dc Domain Component 域名的部分,其格式是将完整的域名分成几部分,如域名为example.com变成dc=example,dc=com(一条记录的所属位置)
uid User Id 用户ID songtao.xu(一条记录的ID)
ou Organization Unit 组织单位,组织单位可以包含其他各种对象(包括其他组织单元),如“oa组”(一条记录的所属组织)
cn Common Name 公共名称,如“Thomas Johansson”(一条记录的名称)
sn Surname 姓,如“许”
dn Distinguished Name “uid=songtao.xu,ou=oa组,dc=example,dc=com”,一条记录的位置(唯一)
rdn Relative dn 相对辨别名,类似于文件系统中的相对路径,它是与目录树结构无关的部分,如“uid=tom”或“cn= Thomas Johansson”

需求

  • 流1 eventCtn

    id eventId cnt
    1 event01 3
    1 event02 2
    2 event02 4
  • 流2 userInfo

    id gender city
    1 male shanghai
    2 female beijing
  1. 将流1的数据展开
    比如,一条数据:1,event01,3需要展开成3条:
    • 1,event01,随机数1
    • 1,event01,随机数2
    • 1,event01,随机数3
  2. 流1的数据,还需要关联上流2的数据(性别,城市)
    • 并且把关联失败的流1的数据,写入一个测流,否则输出到主流
  3. 对主流数据按照性别分组,取最大随机数所在的那一条数据,作为结果输出
  4. 把测流处理结果,写入文件系统,并写成parquet格式
  5. 把主流处理结果,写入mysql,并实现幂等更新

实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.race.wc.exercise;

import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Exercise {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 创建流s1
DataStream<String> ds1 = env
.socketTextStream("localhost", 9991);
SingleOutputStreamOperator<EventCount> s1 = ds1.map(s -> {
String[] arr = s.split(",");
return new EventCount(Integer.parseInt(arr[0]), arr[1], Integer.parseInt(arr[2]));
});

// 创建s2
DataStream<String> ds2 = env
.socketTextStream("localhost", 9991);
SingleOutputStreamOperator<UserInfo> s2 = ds2.map(s -> {
String[] arr = s.split(",");
return new UserInfo(Integer.parseInt(arr[0]), arr[1], arr[1]);
});
// 将流1的数据展开
//比如,一条数据:1,event01,3需要展开成3条:
//1,event01,随机数1
//1,event01,随机数2
//1,event01,随机数3
SingleOutputStreamOperator<EventCount> flattened = s1.process(new ProcessFunction<EventCount, EventCount>() {
@Override
public void processElement(EventCount eventCount, ProcessFunction<EventCount, EventCount>.Context context, Collector<EventCount> collector) {
// 去除count
int cnt = eventCount.getCnt();
for (int i = 1; i < cnt; i++) {
collector.collect(new EventCount(eventCount.getId(), eventCount.getEventId(), RandomUtils.nextInt(10, 100)));
}
}
});
// 广播流2
// 准备一个广播状态描述器
MapStateDescriptor<Integer, UserInfo> stateDescriptor = new MapStateDescriptor<>("s", Integer.class, UserInfo.class);
// 准备一个测流输出标签
OutputTag<EventCount> cOutputTag = new OutputTag<>("c", TypeInformation.of(EventCount.class));
BroadcastStream<UserInfo> broadcast2 = s2.broadcast(stateDescriptor);
// 连接s1和s2广播流
// 流1的数据,还需要关联上流2的数据(性别,城市)
// 并且把关联失败的流1的数据,写入一个测流,否则输出到主流
BroadcastConnectedStream<EventCount, UserInfo> connectedStream = flattened.connect(broadcast2);
SingleOutputStreamOperator<EventUserInfo> joinedResult = connectedStream.process(new BroadcastProcessFunction<EventCount, UserInfo, EventUserInfo>() {
// 主流
@Override
public void processElement(EventCount eventCount, BroadcastProcessFunction<EventCount, UserInfo, EventUserInfo>.ReadOnlyContext readOnlyContext, Collector<EventUserInfo> collector) throws Exception {
ReadOnlyBroadcastState<Integer, UserInfo> broadcastState = readOnlyContext.getBroadcastState(stateDescriptor);
UserInfo userInfo;
if (broadcastState != null && (userInfo = broadcastState.get(eventCount.getId())) != null) {
collector.collect(new EventUserInfo(eventCount.getId(), eventCount.getEventId(), eventCount.getCnt(), userInfo.getGender(), userInfo.getCity()));
} else {
// 关联失败的,输出测流
readOnlyContext.output(cOutputTag, eventCount);
}
}
// 广播流处理方法
@Override
public void processBroadcastElement(UserInfo userInfo, BroadcastProcessFunction<EventCount, UserInfo, EventUserInfo>.Context context, Collector<EventUserInfo> collector) throws Exception {
// 数据放入广播状态
BroadcastState<Integer, UserInfo> broadcastState = context.getBroadcastState(stateDescriptor);
broadcastState.put(userInfo.getId(), userInfo);
}
});
// 对主流数据按照性别分组,取最大随机数所在的那一条数据,作为结果输出
SingleOutputStreamOperator<EventUserInfo> mainResult = joinedResult
.keyBy(EventUserInfo::getGender)
.maxBy("cnt");

mainResult.print("main");
joinedResult.getSideOutput(cOutputTag).print("side");
env.execute();

}
}

背景

对于小变量,小数据集,需要和大数据集,大流进行联合计算的时候,往往把小数据集广播出去,整体直接和大数据集(流)的分布式最小粒度数据进行计算,最后把计算结果合并,这样效率更高,省去分布式节点之间的数据传输及二次计算。

例如:在Flink使用场景中,外部的配置文件或计算规则及维表等进行预加载,并定期更新,流式计算中广播小变量等场景。

场景预设

  • 流s1:用户行为日志(持续不断,同一个人会反复出现,次数不定)
  • 流s2:用户信息(姓名、年龄等信息,同一个数据只有一次,作为广播流)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package com.race.wc;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class broadcast {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// * 流s1:用户行为日志(持续不断,同一个人会反复出现,次数不定)
// * 流s2:用户信息(姓名、年龄等信息,同一个数据只有一次,作为广播刘)

// s1,行为日志流 id,eventId
DataStream<String> stream1 = env
.socketTextStream("localhost", 9999);

SingleOutputStreamOperator<Tuple2<String, String>> s1 = stream1.map(s -> {
String[] arr = s.split(",");
return Tuple2.of(arr[0], arr[1]);
}).returns(new TypeHint<Tuple2<String, String>>() {
});

// s2,用户信息流 id,age,city
DataStream<String> stream2 = env
.socketTextStream("localhost", 9999);

SingleOutputStreamOperator<Tuple3<String, String, String>> s2 = stream2.map(s -> {
String[] arr = s.split(",");
return Tuple3.of(arr[0], arr[1], arr[2]);
}).returns(new TypeHint<Tuple3<String, String, String>>() {
});

// 将s2转换为广播流 MapStateDescriptor->hashMap

MapStateDescriptor<String, Tuple2<String, String>> userInfoStateDesc = new MapStateDescriptor<>("userInfoState", TypeInformation.of(String.class), TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
}));

BroadcastStream<Tuple3<String, String, String>> s2BroadcastStream = s2.broadcast(userInfoStateDesc);

// s1连接s2广播流
BroadcastConnectedStream<Tuple2<String, String>, Tuple3<String, String, String>> connect = s1.connect(s2BroadcastStream);

// s1 是否keyedStream 决定用 KeyedBroadcastProcessFunction 还是 BroadcastProcessFunction
// 左流、右流的处理
SingleOutputStreamOperator<String> resultStream = connect.process(new BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>() {


/**
* 方法:s1流每来一条处理一次
* @param element 左流数据
* @param readOnlyContext 上下文,只读形式不能做修改,安全
* @param collector 输出器
*/
@Override
public void processElement(Tuple2<String, String> element, BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {

// 取到广播状态,只读
ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState = readOnlyContext.getBroadcastState(userInfoStateDesc);

if (broadcastState != null) {
// 获取广播状态的用户信息
Tuple2<String, String> userInfo = broadcastState.get(element.f0);
// 返回组装数据
collector.collect(element.f0 + "," + element.f1 + "," + (userInfo == null ? null : userInfo.f0) + "," + (userInfo == null ? null : userInfo.f1));
} else {
collector.collect(element.f0 + "," + element.f1 + "," + null + "," + null);
}
}

/**
* 方法:s2广播流处理的一条数据
* @param element 左流数据
* @param context 上下文
* @param collector 输出器
*/
@Override
public void processBroadcastElement(Tuple3<String, String, String> element, BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>.Context context, Collector<String> collector) throws Exception {

// 从上下文中,获取广播状态
BroadcastState<String, Tuple2<String, String>> broadcastState = context.getBroadcastState(userInfoStateDesc);
// 然后将获得的数据 装入广播状态
broadcastState.put(element.f0, Tuple2.of(element.f1, element.f2));

}
});

resultStream.print();

env.execute();
}
}

总结

背景

共享单车及打车软件在国内较为普遍,如果数据完善,不失为一份分析的好素材,但是国内的数据确实太难寻觅,这里只找到了两份纽约的的行程数据

共享单车行程数据集:Citi Bikers
出租车行程数据集:New York City Taxi & Limousine Commission

本次以New York City Taxi作为示例数据进行分析

数据集

网站New York City Taxi & Limousine Commission提供了关于纽约市从2009-2015年关于出租车驾驶的公共数据集。

nycTaxiRides.gz

TaxiRides 行程信息。每次出行包含两条记录。type标识为 行程开始start 和 行程结束end。
数据集结构

1
2
3
4
5
6
7
8
9
10
11
rideId         : int(10)         // 唯一行程id
taxiId : int(10) // 出租车唯一id
driverId : int(10) // 出租车司机唯一id
type : varchar(5) // START行程开始,END行程结束 每次出行包含两条记录
startTime : datetime(6) // 行程开始时间
endTime : datetime(6) // 行程结束时间 对于行程开始记录该值为 1970-01-01 00:00:00
startLon : decimal(10,6) // 开始行程的经度
startLat : decimal(10,6) // 开始行程的纬度
endLon : decimal(10,6) // 结束的经度
endLat : decimal(10,6) // 结束的纬度
passengerCnt : int(3) // 乘客数量

TaxiRides 数据示例

rideId,type,startTime,endTime,startLon,startLat,endLon,endLat,passengerCnt,driverId,taxiId

8cUzkd

nycTaxiFares.gz

TaxiFares 费用信息。 与上面行程信息对应

1
2
3
4
5
6
7
8
rideId         : int(10)      // 唯一行程id
taxiId : int(10) // 出租车唯一id
driverId : int(10) // 出租车司机唯一id
startTime : datetime(6) // 行程开始时间
paymentType : varchar(6) // 现金(CASH)或刷卡(CARD)
tip : float // 小费
tolls : float // 过路费
totalFare : float // 总计车费

TaxiFares 数据示例

rideId,taxiId,driverId,startTime,paymentType,tip,tolls,totalFare

k0RZE3

目标

1、将每次车程的 TaxiRide 和 TaxiFare 记录依据相同的rideId连接在一起
2、对于每个不同的 rideId,恰好有三个事件:

  • TaxiRide START 事件
  • TaxiRide END 事件
  • 一个 TaxiFare 事件(其时间戳恰好与开始时间匹配)

最终完成一个 RideAndFare 的输出

3、对这个输出进行统计过滤聚合完成案例要求

我们暂时假设数据流是通过kafka进行传输
FRPL8q

案例

基础操作:生成数据流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 定义出租车-车程数据源
KafkaSource<TaxiRide> rideSource = KafkaSource.<TaxiRide>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_RIDE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("ride")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiRideDeserialization())
.build();

// 定义出租车-车费数据源
KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_FARE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("fare")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiFareDeserialization())
.build();

基础操作:过滤和连接

过滤

例如我们现在只想查看发生在纽约的行车记录。

1
2
3
4
5
6
7
8
9
10
11
public class GeoUtils {
// geo boundaries of the area of NYC
public static double LON_EAST = -73.7;
public static double LON_WEST = -74.05;
public static double LAT_NORTH = 41.0;
public static double LAT_SOUTH = 40.5;
public static boolean isInNYC(double lat, double lon) {
return !(lon > LON_EAST || lon < LON_WEST) &&
!(lat > LAT_NORTH || lat < LAT_SOUTH);
}
}

过滤器

1
2
3
4
5
6
7
private static class NYCFilter implements FilterFunction<TaxiRide> {  
@Override
public boolean filter(TaxiRide taxiRide) throws Exception {
return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) &&
GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
}
}
连接

我们需要把TaxiRide和TaxiFare两者的数据记录结合。在这个过程中,我们要同时处理两个source的流数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.example.datastream.rideandfare;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;


public class RideAndFareJob {

public static void main(String[] args) throws Exception {

// 初始化环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(5000L);
env.setStateBackend(new FsStateBackend("file:///mnt/data/checkpoints"));

env.setParallelism(2);

// 定义出租车-车程数据源
KafkaSource<TaxiRide> rideSource = KafkaSource.<TaxiRide>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_RIDE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("ride")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiRideDeserialization())
.build();

// 定义出租车-车费数据源
KafkaSource<TaxiFare> fareSource = KafkaSource.<TaxiFare>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_FARE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("fare")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiFareDeserialization())
.build();


// 从车程事件中过滤类型为Start的车程数据,并按车程标识 rideId 分组
KeyedStream<TaxiRide, Long> rideStream = env.fromSource(rideSource, WatermarkStrategy.noWatermarks(), "ride source")
.filter(ride -> ride.type==='START').keyBy(TaxiRide::getRideId);

// 付车费事件按行程标识 rideId 分组
KeyedStream<TaxiFare, Long> fareStream = env.fromSource(fareSource, WatermarkStrategy.noWatermarks(), "fare source")
.keyBy(TaxiFare::getRideId);

rideStream.connect(fareStream).flatMap(new EnrichmentFunction())
.uid("enrichment") // uid for this operator's state
.name("enrichment") // name for this operator in the web UI
.addSink(new PrintSinkFunction<>());


env.execute("Join Rides with Fares");
}
}

案例3:每种乘客数量的行车事件数

我们的另一个需求是计算搭载每种乘客数量的行车事件数。也就是搭载1个乘客的行车数、搭载2个乘客的行车… 当然,我们仍然只关心纽约的行车事件。
TODO:

案例4:每5分钟的进入的车辆数

为了持续地监测纽约的交通流量,需要计算出每个区块每5分钟的进入的车辆数。我们只关心至少有5辆车子进入的区块。
TODO:

案例5:收入最高出租车司机

我们想统计每个小时收入topN的司机
TODO:

相关链接

flink-training
ververica
flink-learning-in-action

http://wuchong.me/blog/2019/08/20/flink-sql-training/
https://www.cnblogs.com/bjwu/p/9973521.html
https://article.itxueyuan.com/0Kp2pR
https://www.cnblogs.com/luxh/p/16427196.html

背景

Flink中的时间概念

Flink在流处理程序支持不同的时间概念。分别为Event Time/Processing Time/Ingestion Time,也就是事件时间、处理时间、提取时间。

从时间序列角度来说,发生的先后顺序是:

1
事件时间(Event Time)----> 提取时间(Ingestion Time)----> 处理时间(Processing Time)
  • Event Time 是事件在现实世界中发生的时间,它通常由事件中的时间戳描述。
  • Ingestion Time 是数据进入Apache Flink流处理系统的时间,也就是Flink读取数据源时间。
  • Processing Time 是数据流入到具体某个算子 (消息被计算处理) 时候相应的系统时间。也就是Flink程序处理该事件时当前系统时间。

mU8ra5

处理时间

是数据流入到具体某个算子时候相应的系统时间。

这个系统时间指的是执行相应操作的机器的系统时间。当一个流程序通过处理时间来运行时,所有基于时间的操作(如: 时间窗口)将使用各自操作所在的物理机的系统时间。

提取时间

提取时间在概念上位于事件时间和处理时间之间。IngestionTime是数据进入Apache Flink框架的时间,提取时间在概念上位于事件时间和处理时间之间。

事件时间

事件时间就是事件在真实世界的发生时间,即每个事件在产生它的设备上发生的时间(当地时间)。比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间。在进入Apache Flink框架之前EventTime通常要嵌入到记录中,并且EventTime也可以从记录中提取出来。在实际的网上购物订单等业务场景中,大多会使用EventTime来进行数据计算。
基于事件时间处理的强大之处在于即使在乱序事件,延迟事件,历史数据以及从备份或持久化日志中的重复数据也能获得正确的结果。对于事件时间,时间的进度取决于数据,而不是任何时钟。

事件时间程序必须指定如何生成事件时间的Watermarks,这是表示事件时间进度的机制。

现在假设我们正在创建一个排序的数据流。这意味着应用程序处理流中的乱序到达的事件,并生成同样事件但按时间戳(事件时间)排序的新数据流。

比如:

1
2
3
有1~10个事件。
乱序到达的序列是:1,2,4,5,6,3,8,9,10,7
经过按 事件时间 处理后的序列是:1,2,3,4,5,6,7,8,9,10

为了处理事件时间,Flink需要知道事件的时间戳,这意味着流中的每条数据都需要分配其事件时间戳。这通常通过提取每条数据中的固定字段来完成时间戳的获取。

watermark(水位线)

Watermark是一种告诉Flink一个消息延迟多少的方式。它定义了什么时候不再等待更早的数据。

Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。watermark是用于处理乱序事件或延迟数据的,这通常用watermark机制结合window来实现(Watermarks用来触发window窗口计算)。

比如对于late element,我们不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。 可以把Watermark看作是一种告诉Flink一个消息延迟多少的方式。定义了什么时候不再等待更早的数据。

简而言之,只要属于此窗口的第一个元素到达,就会创建一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定的允许延迟时,窗口将被完全删除。

例如:

1
2
3
4
5
使用基于事件时间的窗口策略,每5分钟创建一个不重叠(或翻滚)的窗口并允许延迟1分钟。

假定目前是12:00。

当具有落入该间隔的时间戳的第一个元素到达时,Flink将为12:00到12:05之间的间隔创建一个新窗口,当水位线(watermark)到12:06时间戳时将删除它。

window(窗口)

  • 滚动窗口(Tumbling Windows)
    滚动窗口分配器将每个元素分配给固定窗口大小的窗口。滚动窗口大小固定的并且不重叠。例如,如果指定大小为5分钟的滚动窗口,则将执行当前窗口,并且每五分钟将启动一个新窗口。

  • 滑动窗口(Sliding Windows)
    滑动窗口与滚动窗口的区别就是滑动窗口有重复的计算部分。
    例如,你可以使用窗口大小为10分钟的窗口,滑动大小为5分钟。这样,每5分钟会生成一个窗口,包含最后10分钟内到达的事件。

  • 会话窗口(Session Window)
    会话窗口分配器通过活动会话分组元素。与滚动窗口和滑动窗口相比,会话窗口不会重叠,也没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到元素时会关闭。
    例如,不活动的间隙时。会话窗口分配器配置会话间隙,定义所需的不活动时间长度(defines how long is the required period of inactivity)。当此时间段到期时,当前会话关闭,后续元素被分配到新的会话窗口。

  • 全局窗口(Global Window)

如何处理watermark & window 的触发

对于out-of-order(乱序)及正常的数据而言

  • watermark的时间戳 > = window endTime
  • 在 [window_start_time,window_end_time] 中有数据存在。

对于late element太多的数据而言

  • Event Time > watermark的时间戳

看看如何触发窗口
我们明白了窗口的触发机制,这里我们添加了水位线,到底是个怎么个情况?我们来看下面
假如我们设置10s的时间窗口(window),那么0-10s,10-20s都是一个窗口,以0~10s为例,0为start-time,10为end-time。
假如有4个数据的event-time分别是8(A),12.5(B),9(C),13.5(D),我们设置Watermarks为当前所有到达数据event-time的最大值减去延迟值3.5秒

1
2
3
4
当A到达的时候,Watermarks为max{8}-3.5=8-3.5 = 4.5 < 10,不会触发计算
当B到达的时候,Watermarks为max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不会触发计算
当C到达的时候,Watermarks为max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不会触发计算
当D到达的时候,Watermarks为max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,触发计算

触发计算的时候,会将A,C(因为他们都小于10)都计算进去,其中C是迟到的。
max这个很关键,就是当前窗口内,所有事件的最大事件。
这里的延迟3.5s是我们假设一个数据到达的时候,比他早3.5s的数据肯定也都到达了,这个是需要根据经验推算。假设加入D到达以后有到达了一个E,event-time=6,但是由于0~10的时间窗口已经开始计算了,所以E就丢了。
从这里上面E的丢失说明,水位线也不是万能的,但是如果根据我们自己的生产经验+侧道输出等方案,可以做到数据不丢失。

时间语义示例

1.12之前的语义(已deprecated)

1.12以前,flink默认以processing time作为默认的时间语义,可以在env上设置所想要的时间语义;

1
2
3
4
5
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

1.12以后的语义

1.12及以后,flink默认以上述的event time 作为默认的时间语义(已deprecated)
再需要指定时间语义的相关操作(如时间窗口)时,可以通过显式的api来使用特定的时间语义

  • 滚动窗口
    1
    2
    keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
    keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  • 滑动窗口
    1
    2
    keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(1)))
    keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(1)))

如果要禁用event time机制,则可以通过设置watermark生成评率间隔来实现

1
ExecutionConfig.setAutoWatermarkinterval(long);

watermark生成策略

1.12之前(已过期)

  • AssignerWithPeriodicWatermarks 周期性生成watermark
  • AssignerWithPunctuatedWatermarks 按指定标记性时间生成watermark

1.12之后,新版api内置的watermark策略

  • 单调递增的watermark生成策略 (完全不容忍乱序)
    czSNzw
    WatermarkStrategy.forMonotonousTimestamps();
  • 允许乱序的watermark生成策略
    W2gSLW
    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))
  • 自定义watermark生成策略
    WatermarkStrategy.forGenerator(new WatermarkGenerator(){...})

watermark策略的代码模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 1,e01,16876768678,pg01
DataStreamSource<String> s1 = env.socketTextStream( hostname: "localhost",port: 9999) ;

//给上面的source 算子,添加watermark生成策略

//策略1: WatermarkStrategy.nowatermarks() 不生成watermark,禁用了事件时间的推进机制
//策略2: WatermarkStrategy. forMonotonousTimestamps() 有序,紧跟最大事件事件
//策略3: WatermarkStrategy.forBoundedOutOfOrderness() 无序
//策略4: WatermarkStrategy.forGenerator() 自定义

s1.assignTimestampsAndWatermarks(
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMillis(0))
.withTimestampAssigner((element, timestamp) -> Long.parseLong(element.split(regex:",")[2]));
)

窗口的代码模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 一、各种全局窗口开窗api
*/

// 全局 计数滚动窗口
beanStream.countWindowAll(10) // 10条数据一个窗口
.apply(new AllWindowFunction<EventBean2, String, GlobalWindow>() {
@Override
public void apply(GlobalWindow window, Iterable<EventBean2> values, Collector<String> out) throws Exception {

}
});


// 全局 计数滑动窗口
beanStream.countWindowAll(10, 2); // 窗口长度为10条数据,滑动步长为2条数据
/*.apply()*/


// 全局 事件时间滚动窗口
beanStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(30))) // 窗口长度为30s的滚动窗口
.apply(new AllWindowFunction<EventBean2, String, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<EventBean2> values, Collector<String> out) throws Exception {

}
});


// 全局 事件时间滑动窗口
beanStream.windowAll(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))); // 窗口长度:30s,滑动步长:10s
/*.apply()*/

// 全局 事件时间会话窗口
beanStream.windowAll(EventTimeSessionWindows.withGap(Time.seconds(30))); // 前后两个事件的间隙超过30s就划分窗口
/*.apply()*/

// 全局 处理时间滚动窗口
beanStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30)));


// 全局 处理时间滑动窗口
beanStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)));

// 全局 处理间会话窗口
beanStream.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(30)));


/**
* 二、各种Keyed窗口开窗api
*/

KeyedStream<EventBean2, Long> keyedStream = beanStream.keyBy(EventBean2::getGuid);

// Keyed 计数滚动窗口
keyedStream.countWindow(10);


// Keyed 计数滑动窗口
keyedStream.countWindow(10, 2);


// Keyed 事件时间滚动窗口
keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(30)));


// Keyed 事件时间滑动窗口
keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10)));


// Keyed 事件时间会话窗口
keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(30)));


// Keyed 处理时间滚动窗口
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(30)));


// Keyed 处理时间滑动窗口
keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)));


// Keyed 处理时间会话窗口
keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)));
0%