headscale保底设施之DERP中继服务器自建
背景
上次我们完成了headscale私有部署,但是为了保证headscale组网中的稳定和可靠性,为设备p2p连接设置一个保底措施,我们需要自建derp服务器。
什么是DERP
DERP 即 Detoured Encrypted Routing Protocol,这是 Tailscale 自研的一个协议:
- 它是一个通用目的包中继协议,运行在 HTTP 之上,而大部分网络都是允许 HTTP 通信的。
- 它根据目的公钥(destination’s public key)来中继加密的流量(encrypted payloads)。
Tailscale 会自动选择离目标节点最近的 DERP server 来中继流量
Tailscale 使用的算法很有趣: __所有客户端之间的连接都是先选择 DERP 模式(中继模式),这意味着连接立即就能建立(优先级最低但 100% 能成功的模式),用户不用任何等待。然后开始并行地进行路径发现,通常几秒钟之后,我们就能发现一条更优路径,然后将现有连接透明升级(upgrade)过去,变成点对点连接(直连)__。
因此, DERP 既是 Tailscale 在 NAT 穿透失败时的保底通信方式(此时的角色与 TURN 类似),也是在其他一些场景下帮助我们完成 NAT 穿透的旁路信道。换句话说,它既是我们的保底方式,也是有更好的穿透链路时,帮助我们进行连接升级(upgrade to a peer-to-peer connection)的基础设施。
Tailscale 官方内置了很多 DERP 服务器,分步在全球各地,惟独不包含中国大陆,原因你懂得。这就导致了一旦流量通过 DERP 服务器进行中继,延时就会非常高。而且官方提供的 DERP 服务器是万人骑,存在安全隐患。
官方内置了很多 DERP 服务器
1 | - tok: 96.6ms (Tokyo) |
为了实现低延迟、高安全性,我们可以参考 Tailscale 官方文档自建私有的 DERP 服务器。
Custom DERP Servers
部署DERP
还是docker
1 | version: '3.5' |
- stunport: 3478 默认情况下也会开启 STUN 服务,UDP 端口是 3478
- derpport: 23479
1 | 2022/09/06 01:21:27 no config path specified; using /var/lib/derper/derper.key |
部署好 derper 之后,就可以修改 Headscale 的配置来使用自定义的 DERP 服务器了。Headscale 可以通过两种形式的配置来使用自定义 DERP:
一种是在线 URL,格式是 JSON,与 Tailscale 官方控制服务器使用的格式和语法相同。
另一种是本地文件,格式是 YAML。
我们可以直接使用本地的 YAML 配置文件,内容如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14# /etc/headscale/derp.yaml
regions:
901:
regionid: 901
regioncode: gz
regionname: Tencent Guangzhou
nodes:
- name: 901a
regionid: 901
hostname: '实际域名'
ipv4: '可不需要'
stunport: 3478
stunonly: false
derpport: 23479regions 是 YAML 中的对象,下面的每一个对象表示一个可用区,每个可用区里面可设置多个 DERP 节点,即 nodes。
每个可用区的 regionid 不能重复。每个 node 的 name 不能重复。
regionname 一般用来描述可用区,regioncode 一般设置成可用区的缩写。
ipv4 字段不是必须的,如果你的域名可以通过公网解析到你的 DERP 服务器地址,这里可以不填。如果你使用了一个二级域名,而这个域名你并没有在公共 DNS server 中添加相关的解析记录,那么这里就需要指定 IP(前提是你的证书包含了这个二级域名,这个很好支持,搞个泛域名证书就行了)。
stunonly: false 表示除了使用 STUN 服务,还可以使用 DERP 服务。
hostname和 ipv4部分根据你的实际情况填写。
接下来还需要修改 Headscale 的配置文件,引用上面的自定义 DERP 配置文件。需要修改的配置项如下:
1 | # /etc/headscale/config.yaml |
在 Tailscale 客户端上使用以下命令查看目前可以使用的 DERP 服务器:
1 | $ tailscale netcheck |
tailscale netcheck 实际上只检测 3478/udp 的端口, 就算 netcheck 显示能连,也不一定代表 23479 端口可以转发流量。最简单的办法是直接打开 DERP 服务器的 URL:https://derp.XXXX.cn:23479,如果看到如下页面,且地址栏的 SSL 证书标签显示正常可用,那才是真没问题了。
Tailscale 命令行工具来测试:
查看与通信对端的连接方式及状态
1 | tailscale status |
ping一下
1 | tailscale ping 100.64.0.2 |
这个tailscale ping更加友好一点,会直接告诉你是通过DERP中继服务器还是点对点来和对方通信的。
DERP安全
我们总要设置一下DERP的访问权限,不能成为万人骑
步骤:
1、在 DERP 服务器上安装 Tailscale。
2、derper 启动时加上参数 –verify-clients
tip:因为derp是通过本地tailscale进行检测,所以docker部署的derper必须在容器内启有Tailscale客户端的实例,我们把服务器的
/var/run/tailscale/tailscaled.sock
映射进入derp容器目录中,让derp能检测到tailscale
tip:如果是使用docker安装的tailscale,请将/var/run/tailscale/tailscaled.sock映射出来,然后再映射进入derp,即让derp和tailscale容器共享volume(/var/run/tailscale/tailscaled.sock)
1 | version: '3.5' |
headscale私有部署
需求
办公需求:
设备:
1、synology DS918+
2、macbook pro m1
3、mac mini
问题:访问家里nas慢的问题,及工作效率能快速访问的问题公司跨机房需求:
1、erp金蝶系统-sqlServer服务-公司内部机房
2、采购系统-mongodb服务-阿里云服务器
问题:采购系统上访问ERP速度慢的问题,及双流cdc的需求
设想这样一个问题:在北京和上海各有一台局域网的机器
(例如一台是家里的台式机,一 台是连接到星巴克 WiFi 的笔记本),二者都是私网 IP 地址,但可以访问公网, 如何让这两台机器通信呢
?
既然二者都能访问公网,那最简单的方式当然是在公网上架设一个中继服务器: 两台机器分别连接到中继服务,后者完成双向转发。这种方式显然有很大的性能开销,而 且中继服务器很容易成为瓶颈。
有没有办法不用中继,让两台机器直接通信呢
?
那就是:跨过公网(互联网)实现端到端
直连。
TailScale 是什么, Headscale又是什么
TailScale
TailScale 你可以理解为 VPN,或者说 Wireguard 外面包了一层壳子。它可以将一些设备连接起来,形成一个虚拟局域网。一个很简单的例子,你可以在星巴克里,读取家里任意电脑上的文件。
比如最常见的需求就是,公司有一个内网办公环境,当你外出办公时,也希望你的电脑能够接入办公网络。 因为外网的机器和内网的机器不能互联,所以一般会有一个中心服务器, 所有的子节点都和中心服务器相连,然后中心服务器转发所有的流量。
这样做的缺点显而易见,首先是中心服务器(hub)会成为瓶颈
。 其次,某种极端情况下,如果节点 A 和 节点 B 距离非常近,但是都离 hub 很远, 这样就会导致非常高的延迟。
那么我们就会想,能不能让节点间直接互联呢? 这就是 mesh VPN,其实现就是 wireguard。
wireguard 的每一个节点都会存储其他所有节点的信息,并且和其他所有的节点都建立 tls 连接。 如果涉及到内网穿透的话,那么你需要找到一台处于网关位置的节点(内外网都可达),将其设置为 coordinator, 扮演类似于 hub 的角色, 分发穿透内外网的流量。
wireguard 的缺点在于:
- 配置比较繁琐
- 维护也比较困难,增删节点都需要改动所有节点的配置
基于上述这些痛点,TailScale 做了一些改进:
1、在原有的 ICE、STUN 等 UDP 协议外,实现了 DERP TCP 协议来实现 NAT 穿透
2、基于公网的 coordinator 服务器下发 ACL 和配置,实现节点动态更新
3、通过第三方(如 Google) SSO 服务生成用户和私钥,实现身份认证
对我来说 Tailscale 相对于 Wireguard 最大的优势有 3 点:
1、配置简单
2、支持 P2P 和中继自动切换
3、支持自建中继节点
作为一种搭建虚拟局域网的工具,相比于传统VPN而言,所有节点之间都可以进行P2P连接,也就是全互联模式,效率更高。
为了基于wireguard实现更完美的VPN工具,现在已经出现了很多项目,如Netmaker,通过可视化界面来配置全互联模式,并支持UDP打洞、多租户等高端功能,几乎适配所有平台。然而现实世界是复杂的,无法保证所有的NAT都能打洞成功,而且Netmaker目前不支持fallback机制,如打洞失败,无法fallback中继节点。而Tailscale支持fallback,可以尽最大努力实现全互联模式,部分节点即使打洞不成功,也可通过中继节点在虚拟局域网内畅通无阻。
简而言之,我们可以认为 TailScale 是更为易用版的、功能封装更为完善的 wireguard。
Headscale
Headscale看名字就知道是和 Tailscale 对着干的,Tailscale 的客户端是不收费的,服务端是不开源的,超过 20 个设备就需要付费了,并且Tailscale的服务不在国内,我们不可能把服务端安全的交给Tailscale,私有自建才是出入。Headscale 是个第三方开源版本的 Tailscale 的服务端,除了「网站界面」之外该有的功能都有,因此我们使用Headscale自建私有服务。
解决方案
所以本次的方案:
1、阿里云服务器 部署 Headscale 服务端
2、自己的 Mac 和 Nas 上,使用 Tailscale 作为客户端
或公司,mongodb所在的服务器与sqlServer所在的服务器组网
服务端
Docker 部署
1 | version: '3.5' |
手动新建一下配置文件,放入./config
1 | wget https://github.com/juanfont/headscale/raw/main/config-example.yaml -O config.yaml |
编辑一下它:
- server_url 这个东西是一个简单的「HTTP 设备认证页面」,后面需要暴露在公网上,其他设备如果想加入到你的网络里面,需要访问这个地址,拿到一个 Token。有域名的话推荐用域名+nginx/caddy 反向代理没域名的话用 ip+端口。
- ip_prefixes 可以根据自己喜好来改,留默认的当然也行
docker-compose up -d
启动 docker
下一步,创建一个「租户」,你可以理解为一个用户。进到 headscale 的 docker 里面,执行
1 | headscale namespaces create SOME_NAME |
SOME_NAME随意替换
客户端
并非全部客户端都支持(或全部支持)将 Headscale 作为后端,目前适配情况如下:
操作系统 | 支持情况 | 解决方案 |
---|---|---|
Linux | ✅ | 原生支持 |
Windows | ✅ | 修改注册表 |
macOS | ✅ | 需要添加描述文件 |
Android | ✅ | |
iOS | ❌ | 目前不支持 |
Mac 端:AppStore 版客户端
由于是使用的「非官方版控制器」,所以我们需要稍微 hack 一下,将软件里面默认的「tailscale 官方控制服务器地址」改为我们自己搭建的 Headscale 地址。
访问 http://server_url/apple
,下载并安装一个描述文件。
然后打开 Tailscale,点击登录,会看到一个英文界面,里面有一行命令
1 | headscale -n NAMESPACE nodes register --key SOME_HEX_VALUE |
将里面的 NAMESPACE 换成你创建的租户名称,然后去服务端 docker 里面,执行它。你就会发现,你的 mac 已经登录成功了。
Linux端:Docker 版客户端
1 | version: '3.3' |
启动容器后,需要进入容器,输入这个东西进行登录:
1 | tailscale up --login-server=http://server_url --accept-routes=true --accept-dns=false |
如果没问题,那么会提示你访问一个网址,拿到 Token,回到 Server 端进行登录就好。
synology
Access Synology NAS from anywhere
Tailscale Packages
tailscale up 参数
- –hostname
设置名称 - –accept-routes 接受服务端配置的路由
- –accept-dns 推荐将 DNS 功能关闭,因为它会覆盖系统的默认 DNS
- –advertise-routes
申请路由到该节点,Tailscale 实现「出口节点」,即打通局域网内部的路由访问,这个网段的所有设备都可以被访问 - –advertise-exit-node 可以建立数据导向节点exit node,即本机访问互联网皆通过此节点 bolean
- –exit-node
指定出口节点,导向所有流量经这出口节点
headscale命令
通过服务端授权客户端
1 | headscale -n default nodes register --key 905cf165204800247fbd33989dbc22be95c987286c45aac3033937041150d846 |
查看注册的节点
1 | headscale nodes list |
通过 Pre-Authkeys 接入
前面的接入方法都需要服务端同意,步骤比较烦琐,其实还有更简单的方法,可以直接接入,不需要服务端同意。
首先在服务端生成 pre-authkey 的 token,有效期可以设置为 24 小时:
1 | headscale preauthkeys create -e 24h -n default |
查看已经生成的 key:
1 | headscale -n default preauthkeys list |
现在新节点就可以无需服务端同意直接接入了:
1 | tailscale up --login-server=http://<HEADSCALE_PUB_IP>:8080 --accept-routes=true --accept-dns=false --authkey $KEY |
打通局域网
假设你的家庭内网有一台 Linux 主机(比如 Nas)安装了 Tailscale 客户端,我们希望其他 Tailscale 客户端可以直接通过家中的局域网 IP(例如 192.168.100.0/24) 访问家庭内网的任何一台设备。
官方文档:Subnet routers and traffic relay nodes
群辉具体操作见:headscale组网打通群辉局域网内部访问
linux 配置方法很简单,首先需要设置 IPv4 与 IPv6 路由转发:
1 | echo 'net.ipv4.ip_forward = 1' | tee /etc/sysctl.d/ipforwarding.conf |
客户端修改注册节点的命令,在原来命令的基础上加上参数 --advertise-routes=192.168.100.0/24
。
1 | tailscale up --login-server=http://<HEADSCALE_PUB_IP>:8080 --accept-routes=true --accept-dns=false --advertise-routes=192.168.100.0/24 |
在 Headscale 端查看路由,可以看到相关路由是关闭的。
1 | headscale nodes list|grep nas |
开启路由:
1 | headscale routes enable -i 6 -r "192.168.100.0/24" |
其他节点查看路由结果:
1 | ip route |
现在你在任何一个 Tailscale 客户端所在的节点都可以 ping 通家庭内网的机器了
为了能让headscale稳定,我们可以自建中继服务器
headscale之DERP中继服务器自建
相关链接
Flink窗口之增量(滚动)聚合算子与全量聚合算子
- 增量(滚动)聚合算子:如min、max、minBy、maxBy、sum、reduce、aggregate
一次取一条数据,用聚合函数对中间累加器更新;窗口触发时,取累加器输出结果;
优点:实时性提升,性能比较好,数据一进入窗口就计算,仅仅缓存计算中间值
缺点:窗口数据无法排序,无法获取窗口信息 - 全量聚合蒜子:如apply、process
数据”攒”在状态容器中,窗口触发时,把整个窗口的数据交给聚合函数
优点:对窗口中所有数据排序、获取窗口信息,比如时间窗口,获取窗口开始时间和结束时间
缺点:窗口数据量较大时,数据可能很多,一起处理,比较耗时
增量聚合示例
简单聚合算子
1 | keyedStream.countWindow(5,2) |
reduce 聚合算子
1 | source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))) |
aggregate 聚合算子
1 | watermarkedBeanStream |
全量聚合示例
apply 聚合算子
1 | watermarkedBeanStream |
process 聚合算子(richFunction)
1 | watermarkedBeanStream |
代码示例
1 | package cn.doitedu.flink.java.demos; |
Flink流平台-实践sqlServer单数据源CDC之双流join
背景准备
曾经在Flink CDC同步mysql和pg关联数据到es实践尝试使用flink进行多数据源双流join到es,本次实践使用流平台单数据源双流join。
我们快速用docker搭建起来sqlserver,
记住开启sqlserver cdc:
sys.sp_cdc_enable_table
docker-compose
1 | version: '2.1' |
并创建一些数据
1 | -- Sqlserver |
使用流平台快速搭建数据流
本次我们从sqlserver获取数据,并通过print直接输出到终端
1 | CREATE TABLE products ( |
启动sqlServer_cdc_print
Flink流平台-实践mongodb CDC
背景
半年前提笔写下了monstache实践mongodb同步es,半年后因为搭建完实时数据流平台,我将用flink sql实时流平台来尝试本次的实践,对标 monstache实践mongodb同步es.
monstache | Flink CDC | |
---|---|---|
近实时 | ☑️ | ☑️ |
支持(旧数据)全量同步 | ☑️ | ☑️ |
支持增量同步(增删改) | ☑️ | ☑️ |
是否现在社区主流 | ☑️是 | ☑️未来主流 |
同步方式 | 数据层oplog | 数据层oplog流处理 |
实践
mongodb-material表数据结构
1 | { |
添加实时流任务
Flink SQL:
1 | CREATE TABLE material ( |
添加Maven pom 或者Upload Jar
- flink-sql-connector-elasticsearch7_2.11-1.13.2.jar
- flink-sql-connector-mongodb-cdc-2.2.1.jar
1 | <dependency> |
LDAP
概念
DC、UID、OU、CN、SN、DN、RDN
关键字 | 英文全称 | 含义 |
---|---|---|
dc | Domain Component | 域名的部分,其格式是将完整的域名分成几部分,如域名为example.com变成dc=example,dc=com(一条记录的所属位置) |
uid | User Id | 用户ID songtao.xu(一条记录的ID) |
ou | Organization Unit | 组织单位,组织单位可以包含其他各种对象(包括其他组织单元),如“oa组”(一条记录的所属组织) |
cn | Common Name | 公共名称,如“Thomas Johansson”(一条记录的名称) |
sn | Surname | 姓,如“许” |
dn | Distinguished Name | “uid=songtao.xu,ou=oa组,dc=example,dc=com”,一条记录的位置(唯一) |
rdn | Relative dn | 相对辨别名,类似于文件系统中的相对路径,它是与目录树结构无关的部分,如“uid=tom”或“cn= Thomas Johansson” |
Flink之process、广播、主流、侧流、分组、分组最大值、sink实践
需求
流1
eventCtn
id eventId cnt 1 event01 3 1 event02 2 2 event02 4 流2
userInfo
id gender city 1 male shanghai 2 female beijing
- 将流1的数据展开
比如,一条数据:1,event01,3
需要展开成3条:1,event01,随机数1
1,event01,随机数2
1,event01,随机数3
- 流1的数据,还需要关联上流2的数据(性别,城市)
- 并且把关联失败的流1的数据,写入一个测流,否则输出到主流
- 对主流数据按照性别分组,取最大随机数所在的那一条数据,作为结果输出
- 把测流处理结果,写入文件系统,并写成parquet格式
- 把主流处理结果,写入mysql,并实现幂等更新
实践
1 | package com.race.wc.exercise; |
Flink之广播BroadcastStream
背景
对于小变量,小数据集,需要和大数据集,大流进行联合计算的时候,往往把小数据集广播出去,整体直接和大数据集(流)的分布式最小粒度数据进行计算,最后把计算结果合并,这样效率更高,省去分布式节点之间的数据传输及二次计算。
例如:在Flink使用场景中,外部的配置文件或计算规则及维表等进行预加载,并定期更新,流式计算中广播小变量等场景。
场景预设
- 流s1:用户行为日志(持续不断,同一个人会反复出现,次数不定)
- 流s2:用户信息(姓名、年龄等信息,同一个数据只有一次,作为广播流)
1 | package com.race.wc; |
总结
Flink之New York City Taxi
背景
共享单车及打车软件在国内较为普遍,如果数据完善,不失为一份分析的好素材,但是国内的数据确实太难寻觅,这里只找到了两份纽约的的行程数据
共享单车行程数据集:Citi Bikers
出租车行程数据集:New York City Taxi & Limousine Commission
本次以New York City Taxi作为示例数据进行分析
数据集
网站New York City Taxi & Limousine Commission提供了关于纽约市从2009-2015年关于出租车驾驶的公共数据集。
数据集转成mysql文件(可直接导入mysql)
nyc.sql.zip
nycTaxiRides.gz
TaxiRides 行程信息。每次出行包含两条记录。type标识为 行程开始start 和 行程结束end。
数据集结构
1 | rideId : int(10) // 唯一行程id |
TaxiRides 数据示例
rideId,type,startTime,endTime,startLon,startLat,endLon,endLat,passengerCnt,driverId,taxiId
nycTaxiFares.gz
TaxiFares 费用信息。 与上面行程信息对应
1 | rideId : int(10) // 唯一行程id |
TaxiFares 数据示例
rideId,taxiId,driverId,startTime,paymentType,tip,tolls,totalFare
目标
1、将每次车程的 TaxiRide 和 TaxiFare 记录依据相同的rideId连接在一起
2、对于每个不同的 rideId,恰好有三个事件:
- TaxiRide START 事件
- TaxiRide END 事件
- 一个 TaxiFare 事件(其时间戳恰好与开始时间匹配)
最终完成一个 RideAndFare 的输出
3、对这个输出进行统计过滤聚合完成案例要求
我们暂时假设数据流是通过kafka进行传输
案例
基础操作:生成数据流
1 | // 定义出租车-车程数据源 |
基础操作:过滤和连接
过滤
例如我们现在只想查看发生在纽约的行车记录。
1 | public class GeoUtils { |
过滤器
1 | private static class NYCFilter implements FilterFunction<TaxiRide> { |
连接
我们需要把TaxiRide和TaxiFare两者的数据记录结合。在这个过程中,我们要同时处理两个source的流数据。
1 | package com.example.datastream.rideandfare; |
案例3:每种乘客数量的行车事件数
我们的另一个需求是计算搭载每种乘客数量的行车事件数。也就是搭载1个乘客的行车数、搭载2个乘客的行车… 当然,我们仍然只关心纽约的行车事件。
TODO:
案例4:每5分钟的进入的车辆数
为了持续地监测纽约的交通流量,需要计算出每个区块每5分钟的进入的车辆数。我们只关心至少有5辆车子进入的区块。
TODO:
案例5:收入最高出租车司机
我们想统计每个小时收入topN的司机
TODO:
相关链接
flink-training
ververica
flink-learning-in-action
http://wuchong.me/blog/2019/08/20/flink-sql-training/
https://www.cnblogs.com/bjwu/p/9973521.html
https://article.itxueyuan.com/0Kp2pR
https://www.cnblogs.com/luxh/p/16427196.html