flink sql 自定义udf实践之表值函数
上次我们试了下标量函数flink sql 自定义udf实践之标量函数
这次我们来试一下表值函数
1 | package org.example; |
上次我们试了下标量函数flink sql 自定义udf实践之标量函数
这次我们来试一下表值函数
1 | package org.example; |
方案待验证
业务上需要对一张含有商品ID的表进行打宽,把商品更多属性打宽到es供上下游es查询,遇到如下问题
1、商品表将作为1张维表,而这张维表数据量达到了200W+,对储存、计算(内存)存在压力
2、商品表在mongodb,而生态上没有source的connecter能支持到lookup join
3、首次发现mongodb Temporal join 仅支持主键_id
。
以前基本都是mysql cdc,没太关注mongo,但这几个问题的出现,让我首次关注到mongodb在flink生态的支持程度。那么我们就换一个思路,引入flink table store湖仓,来解决ODS到DWD再到ADS这些问题:
模拟场景:假设我的订单明细表(order_item)有product_id,一般产品的更多产品参数信息是不会都存到订单上,我需要将我的订单明细表通过产品表(product)这个维表进行打宽,然后写到es,供上下游进行根据商品信息搜索订单,或者进行聚合统计商品订购top10.
1 | -- 创建并使用 FTS Catalog |
这时候我们在flink table store 创造了一个CDC出来的维表。
1 |
|
flink table store Lookup Join
flink-table-store-101
Flink Table Store 0.3 构建流式数仓最佳实践
1 |
|
Kubernetes Ingress, Istio Ingressgateway还是 Gateway API? 之 Kubernetes Gateway API
篇
随着Istio 1.16.0的正式发布,也宣布了 Istio 基于Kubernetes Gateway API的实现进入到了 Beta 阶段,这意味着 Istio 中所有南北向(Ingress)流量管理都可以迁移至 Kubernetes Gateway API。未来随着 Kubernetes Gateway API 的发展和成熟,Istio 东西向(Mesh)流量管理 API 也会被其慢慢代替。
不得不说这一新版本的Gatway API 连安装也变了。
如果要使用 Kubernetes Gateway API 进行流量管理,需要先满足以下条件:
任务:通过gateway把域名httpbin.example.com指向k8s服务httpbin,并配置https
核心重点:Gateway和HTTPRoute使用
这次连以前的安装方式都改了
1 | kubectl apply -f https://github.com/kubernetes-sigs/gateway-api/releases/download/v0.5.1/standard-install.yaml |
安装 Istio。下载 Istio 1.16.1 版本,并使用 minimal配置文件进行安装,此配置仅会安装控制平面组件
1 | $ curl -L https://istio.io/downloadIstio | sh - |
1.使用 Istio 提供的 samples 模板部署 httpbin 应用程序:
1 | $ kubectl apply -f samples/httpbin/httpbin.yaml |
2.创建 istio-ingress namespace 并部署 Gateway 和 HTTPRoute,将访问 httpbin.example.com/get/* 的流量导入 httpbin 服务的 8000 端口:
将上面文件放进去istio的 samples/httpbin/gateway-api/httpbin-gateway.yaml
1 | apiVersion: gateway.networking.k8s.io/v1beta1 |
1 | $ kubectl create namespace istio-ingress |
等待 Gateway 部署完成后设置 Ingress Host 环境变量:
1 | $ kubectl wait -n istio-ingress --for=condition=ready gateways.gateway.networking.k8s.io gateway |
使用 curl 访问 httpbin 服务:
1 | $ curl -s -I -H Host:httpbin.example.com "http://$INGRESS_HOST/get" |
测试一下访问没有配置过的路由 headers,会返回 HTTP 404 错误:
1 | $ curl -s -I -H Host:httpbin.example.com "http://$INGRESS_HOST/headers" |
更新路由规则,添加 headers 路由配置,并为请求加上自定义 Header:
1 | apiVersion: gateway.networking.k8s.io/v1beta1 |
再次访问 headers 路由,可以正常访问,请求头中也被加上了 “My-Added-Header”:
1 | $ curl -s -H Host:httpbin.example.com "http://$INGRESS_HOST/headers" |
Kubernetes Ingress, Istio Ingressgateway还是 Gateway API? 之 Istio Ingressgateway
篇
任务:通过gateway把域名httpbin.example.com指向k8s服务httpbin,并配置https
核心重点:istio的Gateway和VirtualService使用
前置条件:
1 | apiVersion: v1 |
接下来,我们需要配置Gateway和VirtualService,如果你用过nginx,我们可以粗略的类比为server。其中Gateway相当于配置了server的基本信息,而VirtualService相当于配置了location。前者主要是配置域名端口等,后者配路由规则。
1 | --- |
已为 httpbin 服务创建了Virtual Service配置,包含两个路由规则,允许流量流向路径 /status 和 /delay。
Gateways 列表规约了哪些请求允许通 httpbin-gateway 网关。 所有其他外部请求均被拒绝并返回 404 响应。
1 | kubectl patch service istio-ingressgateway -n istio-system -p '{"spec":{"type":"NodePort"}}' |
1 | 查看80绑定的端口为30984 |
1 | ## 因为VirtualService 有匹配路由/status,所以能正常访问 |
1 | ## 因为VirtualService 没有匹配路由/headers,所以能提示404 Not Found |
现在Https基本上已经是标配,这里也少不了,我们需要做三件事:
创建一个Kubernetes sceret对象,用于保存服务器的证书和私钥。具体说来就是使用 kubectl 命令在命名空间 istio-system 中创建一个 secret 对象,命名为istio-ingressgateway-certs。Istio 网关会自动载入这个 secret。
为了服务多个域名,我使用了Opaque类型,命名规则使用了{domain}.crt和{domain}.key。需要注意的是,对应的内容需要使用base64编码!
1 | apiVersion: v1 |
把证书和私钥放于目录/etc/istio/ingressgateway-certs,需要注意的是,这里指的是宿主机器(集群的Master),又或者是你执行kubectl命令的机器,因为我是在Master执行的命令,非本机上没验证过。
把证书和私钥放于目录/etc/istio/ingressgateway-certs下,命名规则和第一步一致,如下:
注意,这里只需要把文件原封不动的放进来,不需要base64编码
1 | . |
在Gateway上增加对https的配置声明。
1 | --- |
在之前做很多的准备工作去搭建OIDC标准,就是为了通过此标准去对接三方系统,这一次我们尝试一下gitlab使用openid connect标准的SSO
搭建过程可查看:OIDC搭建之Ory Hydar 2.0实践
OIDC标准,很多填写内容可在 {{baseUrl}}/.well-known/openid-configuration
查看
1 | gitlab_rails['omniauth_enabled'] = true # 开启omniauth |
1 | gitlab_rails['omniauth_providers'] = [ |
1 | sudo gitlab-ctl reconfigure |
user_response_structure
此处的配置是映射你的sso服务 user_info_url接口返回的用户信息。 如果你的用户信息接口返回的结构为
1 | { |
那么 root_path 可以不用配置
id_path建议配置成用户的唯一标识
更多详细注释请参考gitlab官网:omniauth-oauth2-generic
为什么K8S中选择Gateway是一个纠结的选项,汇总一下可选项吧
那么怎么选呢?这里有两种流派:
1、把k8s作为部署平台,不跟他耦合,所有的业务在自己的代码里,包括路由等基本gateway能力、以及熔断等高级gateway逻辑
2、把k8s作为应用的一部分,将API路由、熔断、等等交给k8s或者istio来承载
不管如何,我们还是都走一下吧。
我们尝试使用部署 httpbin 服务,然后分别使用3种网关,来试试3者的不同。
本次我们部署1.25.4最新版本,Kubernetes实践-部署 18年的笔记作为参考。不同之处大概就是docker换成了containerd.
Host | Ip | Description |
---|---|---|
k8s | 10.8.111.200 | CentOS7 模板机,用于克隆下面几个节点 |
k8s-master1 | 10.8.111.202 | CentOS7 |
k8s-node1 | 10.8.111.203 | CentOS7 |
k8s-node2 | 10.8.111.204 | CentOS7 |
本次搭建采用虚拟机,先制作了一个k8s的虚拟机,完成了所有节点都要操作的内容,然后进行克隆3台进行修改,分别配置每台需要操作的内容
1 | vi /etc/sysconfig/network-scripts/ifcfg-ensXXX |
模板机
1 | ONBOOT="yes" |
各节点根据各自ip规划
1 | systemctl restart network |
1 | # 在10.8.111.200执行 |
1 | 10.8.111.200 k8s |
1 | # 导入elrepo gpg key |
1 |
|
1 | yum install chrony -y |
1 | systemctl stop firewalld |
1 | # 临时关闭;关闭swap主要是为了性能考虑 |
1 | # 临时关闭 |
为了让 Linux 节点的 iptables 能够正确查看桥接流量,请确认 sysctl 配置中的 net.bridge.bridge-nf-call-iptables 设置为 1。例如:
1 | cat <<EOF | sudo tee /etc/modules-load.d/k8s.conf |
安装 containerd
1 |
|
配置containerd,修改sandbox_image 镜像源
1 | # 导出默认配置,config.toml这个文件默认是不存在的 |
配置containerd cgroup 驱动程序systemd
kubernets自v1.24.0后,就不再使用docker.shim,替换采用containerd作为容器运行时端点
1 | # 把SystemdCgroup = false修改为:SystemdCgroup = true, |
Containerd配置镜像加速
endpoint位置添加阿里云的镜像源
1 | $ vi /etc/containerd/config.toml |
重启 containerd
1 | systemctl daemon-reload |
1 | cat > /etc/yum.repos.d/kubernetes.repo << EOF |
1 | # 不指定版本就是最新版本,当前最新版就是1.25.4 |
查看日志,发现有报错,报错如下:
1 | Nov 30 06:02:22 k8s-200 kubelet[1922]: E1130 06:02:22.353853 1922 run.go:74] "command failed" err="failed to load kubelet config file, error: failed to load Kubelet config |
解释:未经过 kubeadm init 或者 kubeadm join 后,kubelet 会不断重启,这个是正常现象……,执行 init 或 join 后问题会自动解决,对此官网有如下描述,也就是此时不用理会 kubelet.service。
查看版本
1 | kubectl version |
查看 Kubernetes 初始化所需镜像
1 | kubeadm config images list --kubernetes-version v1.25.4 |
集群初始化
1 | kubeadm init \ |
–image-repository
string: 这个用于指定从什么位置来拉取镜像(1.13版本才有的),默认值是k8s.gcr.io,我们将其指定为国内镜像地址:registry.aliyuncs.com/google_containers
–kubernetes-version
string: 指定kubenets版本号,默认值是stable-1,会导致从https://dl.k8s.io/release/stable-1.txt
下载最新的版本号,我们可以将其指定为固定版本(v1.25.4)来跳过网络请求。–apiserver-advertise-address
指明用 Master 的哪个 interface 与 Cluster 的其他节点通信。如果 Master 有多个 interface,建议明确指定,如果不指定,kubeadm 会自动选择有默认网关的 interface。这里的ip为master节点ip,记得更换。–pod-network-cidr
指定 Pod 网络的范围。Kubernetes 支持多种网络方案,而且不同网络方案对 –pod-network-cidr有自己的要求,这里设置为10.244.0.0/16 是因为我们将使用 flannel 网络方案,必须设置成这个 CIDR。--control-plane-endpoint
cluster-endpoint 是映射到该 IP 的自定义 DNS 名称,这里配置hosts映射:10.8.111.202 cluster-endpoint。 这将允许你将–control-plane-endpoint=cluster-endpoint 传递给 kubeadm init,并将相同的 DNS 名称传递给 kubeadm join。 稍后你可以修改 cluster-endpoint 以指向高可用性方案中的负载均衡器的地址。--service-cidr
集群内部虚拟网络,Pod统一访问入口【温馨提示】kubeadm 不支持将没有 –control-plane-endpoint 参数的单个控制平面集群转换为高可用性集群
重置再初始化
1 | kubeadm reset |
成功后
1 | Your Kubernetes control-plane has initialized successfully! |
根据成功后的提示,做kubectl认证,配置环境变量
1 | mkdir -p $HOME/.kube |
发现节点还是有问题,查看日志 cat /var/log/messages
,因为没有安装网络插件
“Container runtime network not ready” networkReady=”NetworkReady=false reason:NetworkPluginNotReady message:Network plugin returns error: cni plugin not initialized”
我们先让node加入进来集群,然后安装 Pod 网络插件
Host | Ip | Description |
---|---|---|
k8s | 10.8.111.200 | CentOS7 模板机,用于克隆下面几个节点 |
k8s-master1 | 10.8.111.202 | CentOS7 |
k8s-node1 | 10.8.111.203 | CentOS7 |
k8s-node2 | 10.8.111.204 | CentOS7 |
我们分别将k8s-node1
、k8s-node2
部署加入集群
先安装 kubelet
1 | yum install -y kubelet-1.25.4 kubeadm-1.25.4 kubectl-1.25.4 --disableexcludes=kubernetes |
加入集群
1 | kubeadm join cluster-endpoint:6443 --token 2gaeoh.fq98xja5pkj7n98g \ |
如果没有令牌,可以通过在控制平面节点上运行以下命令来获取令牌:
1 | kubeadm token list |
默认情况下,令牌会在24小时后过期。如果要在当前令牌过期后将节点加入集群, 则可以通过在控制平面节点上运行以下命令来创建新令牌:
1 | kubeadm token create |
如果你没有 –discovery-token-ca-cert-hash 的值,则可以通过在控制平面节点上执行以下命令链来获取它:
1 | openssl x509 -pubkey -in /etc/kubernetes/pki/ca.crt | openssl rsa -pubin -outform der 2>/dev/null | openssl dgst -sha256 -hex | sed 's/^.* //' |
如果执行 kubeadm init 时没有记录下加入集群的命令,可以通过以下命令重新创建(推荐)一般不用上面的分别获取 token 和 ca-cert-hash 方式,执行以下命令一气呵成:
1 | kubeadm token create --print-join-command |
查看节点:
1 | kubectl get pod -n kube-system |
你必须部署一个基于 Pod 网络插件的 容器网络接口 (CNI),以便你的 Pod 可以相互通信。
一般来说,在初期使用Flannel是一个稳妥安全的选择,直到你开始需要一些它无法提供的东西。
wget https://raw.githubusercontent.com/flannel-io/flannel/master/Documentation/kube-flannel.yml
1 | kubectl apply -f kube-flannel.yml |
因为墙原因,应该是会安装失败,我们可以ctr image pull
将镜像拉下来先,
1 | ctr image pull docker.io/rancher/mirrored-flannelcni-flannel-cni-plugin:v1.1.0 |
或者ctr image import
导入准备好的离线文件。
Calico是一个纯三层的数据中心网络方案,Calico支持广泛的平台,包括Kubernetes、OpenStack等。
Calico 在每一个计算节点利用 Linux Kernel 实现了一个高效的虚拟路由器( vRouter) 来负责数据转发,而每个 vRouter 通过 BGP 协议负责把自己上运行的 workload 的路由信息向整个 Calico 网络内传播。
此外,Calico 项目还实现了 Kubernetes 网络策略,提供ACL功能。
1.下载Calico
wget https://docs.projectcalico.org/manifests/calico.yaml --no-check-certificate
vim calico.yaml
1 | - name: CALICO_IPV4POOL_CIDR |
1 | kubectl apply -f calico.yaml |
问题:
1 | Warning FailedScheduling 80s (x13 over 61m) default-scheduler 0/1 nodes are available: 1 node(s) had untolerated taint {node.kubernetes.io/not-ready: }. preemption: 0/1 nodes are available: 1 Preemption is not helpful for scheduling. |
使用kubeadm初始化的集群,出于安全考虑Pod不会被调度到Master Node上,不参与工作负载。允许master节点部署pod即可解决问题,命令如下:
1 | kubectl taint nodes --all node-role.kubernetes.io/master- |
实际加入node节点即解决了。不建议只有master
注意:以下操作都是在master下操作。
一:先将节点设置为维护模式(k8s-node1是节点名称)
1 | kubectl drain k8s-node1 --delete-local-data --force --ignore-daemonsets node/k8s-node1 |
二:删除节点
1 | kubectl delete node k8s-node1 |
三:确认是否已经删除
1 | kubectl get nodes |
三:生成永久Token(node加入的时候会用到)
1 | kubeadm token create --ttl 0 --print-join-command |
四:查看Token确认
1 | kubeadm token list |
注意:以下操作在node下操作
一:停掉kubelet
1 | systemctl stop kubelet |
二:删除之前的相关文件
1 | rm -rf /etc/kubernetes/* |
三:加入集群
1 | kubeadm join 192.168.233.3:6443 --token rpi151.qx3660ytx2ixq8jk --discovery-token-ca-cert-hash sha256:5cf4e801c903257b50523af245f2af16a88e78dc00be3f2acc154491ad4f32a4 |
保姆级 Kubernetes 1.24 高可用集群部署中文指南
【云原生】K8S master节点更换IP以及master高可用故障模拟测试
1 | dapr init -k |
计算器calculator微服务部署之dapr,对标 sidecar构架之istio-k8s部署
后端服务
前端服务
上图为该示例应用各个组件的组成和服务架构。
我们可以随便查看一个微服务的部署清单,位于 deploy/ 目录下面,比如 go-adder.yaml:
1 | # go-adder.yaml |
1 | kubectl apply -f deploy/ |
部署完成后我们可以通过 dapr configurations 命令查看当前集群中的所有配置信息:
1 | ➜ dapr configurations -k -A |
应用部署完成后查看 Pod 的状态:
1 | ➜ kubectl get pods |
部署完成后我们可以通过 calculator-front-end
这个 LoadBalancer 类型的 Service 去访问计算器的前端应用,我们这里分配的 EXTERNAL-IP 地址为
1 | kubectl port-forward service/calculator-front-end 8000:80 |
operands.json
1 | {"operandOne":"52","operandTwo":"34"} |
persist.json
1 | [{"key":"calculatorState","value":{"total":"54","next":null,"operation":null}}] |
1 | curl -s http://localhost:8080/calculate/add -H Content-Type:application/json --data @operands.json |
结果
1 | 86 |
当前端服务器调用各自的操作服务时(见下面的server.js代码),它不需要知道它们所在的IP地址或它们是如何构建的。相反,它通过名称调用其本地 dapr side-car,它知道如何调用服务上的方法,利用平台的服务发现机制,在本例中为 Kubernetes DNS 解析。
${daprUrl}/${微服务名称}/method/${服务接口路由}
1 | 如:${daprUrl}/addapp/method/add |
1 | const daprUrl = `http://localhost:${daprPort}/v1.0/invoke`; |
1 | curl -L https://istio.io/downloadIstio | sh - |
安装目录包含:
把bin目录下的istioctl添加到PATH,方便后续使用istioctl命令操作。
1 | export PATH=$PWD/bin:$PATH |
安装istio:
1 | istioctl install --set profile=demo -y |
卸载
1 | istioctl uninstall --purge |
给命名空间添加标签,指示 Istio 在部署应用的时候,自动注入 Envoy 边车代理:
1 | kubectl label namespace default istio-injection=enabled |
1 | kubectl apply -f samples/bookinfo/platform/kube/bookinfo.yaml |
1 | kubectl patch service istio-ingressgateway -n istio-system -p '{"spec":{"type":"NodePort"}}' |
1 | kubectl -n istio-system get service istio-ingressgateway |
1 | export INGRESS_HOST=127.0.0.1 |
1 | export GATEWAY_URL=$INGRESS_HOST:$INGRESS_PORT |
我们前期有使用过
Ory Hydra之OAuth 2.0 Authorize Code Flow
Ory Hydra之Oauth 2.0 Client Credentials flow
当时采用的并非2.0,本次完整的使用2.0完整的走一遍,并完整的讲解,如何在授权认证流程对接自己的用户系统。
./docker-compose.yml
1 | version: "3.7" |
./config/hydra.yml
1 | serve: |
2.0开始,不需要client_id,自动生成一个uuid,client_secret不填写,会自动生成。
http://localhost:4445/admin/clients
1 | { |
1 | { |
普遍的流程为以下三个步骤
1、授权请求 Authorization Request 浏览器打开
1 | GET {认证终点} |
2、授权响应 Authorization Response 获取code
1 | HTTP/1.1 302 Found |
3、令牌请求 Access Token Request code换token
1 | POST {令牌终点} HTTP/1.1 |
我们按照上面这三个步骤来讲解一下Hydra是怎么做的。
浏览器打开
1 | GET http://127.0.0.1:4444/oauth2/auth |
http://127.0.0.1:4444/oauth2/auth?response_type=code&client_id=a9ea2e4c-5c9e-4edd-8a53-09124b870477&scope=openid offline&state=nqvresaazswwbofkeztgnvfs
打开后我们发现,我们被重定向到了http://127.0.0.1:3000/login?login_challenge=9ba37003126244608ab2d4501f9b32f5
Hydra通过步骤1链接
到步骤2
获取code,抽象为两个流程:Login和Consent,这两个流程便于我们对接我们自己系统的用户授权认证.Login流程主要为 登录认证
,Consent流程主要为 授权
我来看看./config/hydra.yml
中的配置
1 | consent: http://127.0.0.1:3000/consent // 授权(前端) |
我们发现重定向的位置就是配置中的login,Login流程是一个登录认证服务(前后端),需要我们在自己业务中实现,链接中还携带了login_challenge,
前端
此时,我们把账号密码以及login_challenge通过接口发往我们后端
后端
后端拿到用户名密码和login_challenge,做如下2件事
1、自己业务系统的用户名密码校验
2、携带用户信息和login_challenge
调用acceptLoginRequest
登录请求
登录请求
请求地址:http://127.0.0.1:4445/admin/oauth2/auth/requests/login/accept?login_challenge=66cc8259bf0c4a3880e26c189968bbd6
请求方式:PUT
请求类型:application/json
请求参数:
1 | { |
请求成功返回:
1 | { |
打开这个重定向,就进入了Consent流程
前端
此时,我们把用户授权以及consent_challenge通过接口发往我们后端
后端
后端拿到用户授权以及consent_challenge,做如下2件事
1、自己业务系统的授权
2、consent_challenge调用acceptLoginRequest
认证请求
认证请求
请求地址:http://127.0.0.1:4445/admin/oauth2/auth/requests/consent/accept?consent_challenge=xxxxxx
请求方式:PUT
请求类型:application/json
请求参数:
说明下,session是可以写你想要放进id_token里面的东西,但是但是!请不要有中文,比如说:”name”:”小白”,这样Hydra也无法识别
1 | { |
请求成功返回:
1 | { |
重定向打开后,完成Consent流程,获取到code,此时会将code携带到我们创建应用时候的redirect_uris=http://127.0.0.1:5555/callback
1 | http://127.0.0.1:5555/callback?code=ory_ac_0T0UehFyo-BVCDcdiu2qUuxLw4jNLpwFDjqkC157-ms.eUZpm0ZokBUBdxgEI5y5w8BTjf1URAzMwwXddW3gf4Q&scope=openid+offline&state=nqvresaazswwbofkeztgnvfs |
获取令牌、刷新令牌
请求地址:http://127.0.0.1:5444/oauth2/token
请求方式:POST
请求类型:application/x-www-form-urlencoded
请求参数 | 参数类型 | 参数说明 |
---|---|---|
grant_type | 字符串 | 授予类型,必填项 |
code | 字符串 | 授权码 |
refresh_token | 字符串 | 刷新令牌 |
client_id | 字符串 | 客户端id,必填项 |
client-secret | 字符串 | 客户端秘钥,必填项 |
redirect_uri | 字符串 | 重定向uri |
Authorization使用 Basic Auth 将client_id和client_secret写入
1 | { |
返回
1 | { |
最后我们拿着idToken去JWT解析看看效果可以看到,idToken解析出来你需要的信息。
至此我们就获取到了访问令牌access_token,前端可以将令牌缓存在cookie或session中,相应的后台也会缓存,后面前端调用其他服务时携带令牌调用接口,后台校验根据token来判断是否放行。
cli 创建客户端
1 | code_client=$(docker-compose -f docker-compose.yml exec hydra \ |
使用hydra示例授权(Hydra 提供快速验证oauth授权流程)
1 | docker-compose -f docker-compose.yml exec hydra \ |