背景

单独把身份认证服务部署起来,在调用其他服务的时候,先通过身份认证。

因为Hydra版本出现了大的变动,即将2.0,改动巨大,本篇暂时以v1.10.6进行

本次我们来走一遍 OAuth 2.0 Authorize Code Flow

部署

启动Hydra服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 创建一个独立的网段
docker network create hydraguide

# 拉取pg,当然也可使用mysql等其他数据库
docker pull postgres:9.6

# 拉取hydra
docker pull oryd/hydra:v1.10.6

# 运行数据库(帐号:hydra 密码:secret)
docker run \
--network hydraguide \
--name ory-hydra-example--postgres \
-e POSTGRES_USER=hydra \
-e POSTGRES_PASSWORD=secret \
-e POSTGRES_DB=hydra \
-d postgres:9.6

# 设置加密
export SECRETS_SYSTEM=$(export LC_CTYPE=C; cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 32 | head -n 1)
# 当然你也可以写死
export SECRETS_SYSTEM=SHARINGTOMMY123456789

# 创建临时的环境变量 DSN
export DSN=postgres://hydra:secret@ory-hydra-example--postgres:5432/hydra?sslmode=disable

# 初始化数据库
docker run -it --rm \
--network hydraguide \
oryd/hydra:v1.10.6 \
migrate sql --yes $DSN

# 启动一个Hydra服务
docker run -d \
--name ory-hydra-example--hydra \
--network hydraguide \
-p 5444:4444 \
-p 5445:4445 \
-e SECRETS_SYSTEM=$SECRETS_SYSTEM \
-e DSN=$DSN \
-e URLS_SELF_ISSUER=http://localhost:5444/ \
-e URLS_CONSENT=http://localhost:9020/consent \
-e URLS_LOGIN=http://localhost:9020/login \
oryd/hydra:v1.10.6 serve all --dangerous-force-http

# 验证(能看到正常启动日志)
docker logs ory-hydra-example--hydra

说明:
--network hydraguide 网络
-p 5444:4444 public API http://localhost:5444/
-p 5445:4445 Hydra’s administrative API http://localhost:5445/
-e SECRETS_SYSTEM=$SECRETS_SYSTEM 加密变量
-e DSN=$DSN 数据库变量
-e URLS_SELF_ISSUER=http://localhost:5444/ 是你的服务器地址
-e URLS_CONSENT=http://localhost:9020/consent 是你前端用户同意授权地址
-e URLS_LOGIN=http://localhost:9020/login 是前端用户登录地址
-e URLS_LOGOUT 是你退出登录地址
-e URLS_POST_LOGOUT_REDIRECT 是你退出登录成功后跳转到的地址
-e TTL_ID_TOKEN id_token 过期时间的设置单位 h m s,默认为1小时
-e TTL_ACCESS_TOKEN 配置刷新令牌有效的时间。默认值为720h。设置为-1可使刷新令牌永不过期。
-e TTL_REFRESH_TOKEN配置标识令牌有效的时间。默认为1小时。

–dangerous-force-http 加了这句话就是不需要 https
如果你不加的话,URLS_SELF_ISSUER=https://localhost:4444/ 这里就要加s
加了https,https会有证书等问题。

登录/授权样例网站启动

该部分一般就是我们的前端登录授权页面,只是hydra提供了一个示例

1
2
3
4
5
6
7
8
docker pull oryd/hydra-login-consent-node:v1.10.6
docker run -d \
--name ory-hydra-example--consent \
-p 9020:3000 \
--network hydraguide \
-e HYDRA_ADMIN_URL=http://ory-hydra-example--hydra:4445 \
-e NODE_TLS_REJECT_UNAUTHORIZED=0 \
oryd/hydra-login-consent-node:v1.10.6

-p 9020:3000暴露9020端口,这个端口就是URLS_CONSENTURLS_LOGIN(URLS_CONSENT=http://localhost:9020/consent, URLS_LOGIN=http://localhost:9020/login).
HYDRA_ADMIN_URL=http://ory-hydra-example--hydra:4445 Hydra后台管理接口
NODE_TLS_REJECT_UNAUTHORIZED=0 取消TLS校验

示例页面:
fGU37K

OexXJh

演示OAuth2.0流程

图解

3YxYQv
xCH2Wd
TvxaSN
iFMa3c
CKoZI4
v6NTz1
icA7m3
0N432n
ITxNk0
qJn2BQ
GAmN24
8P0SbL

请求与响应

  • 授权请求 Authorization Request 浏览器打开

    1
    2
    3
    4
    5
    6
    7
    8
    GET {认证终点}
    ?response_type=code // 必选项
    &client_id={客户端的ID} // 必选项
    &redirect_uri={重定向URI} // 可选项
    &scope={申请的权限范围} // 可选项
    &state={任意值} // 推荐
    HTTP/1.1
    HOST: {认证服务器}
  • 授权响应 Authorization Response 获取code

    1
    2
    3
    4
    HTTP/1.1 302 Found
    Location: {重定向URI}
    ?code={授权码} // 必填
    &state={任意文字} // 如果授权请求中包含 state的话那就是必填
  • 令牌请求 Access Token Request code换token

    1
    2
    3
    4
    5
    6
    7
    8
    POST {令牌终点} HTTP/1.1
    Host: {认证服务器}
    Content-Type: application/x-www-form-urlencoded

    grant_type=authorization_code // 必填
    &code={授权码} // 必填 必须是认证服务器响应给的授权码
    &redirect_uri={重定向URI} // 如果授权请求中包含 redirect_uri 那就是必填
    &code_verifier={验证码} // 如果授权请求中包含 code_challenge 那就是必填

根据具体情况有可能是向客户端服务器进行请求,这时候请加上 Basic 认证(Authorization 头部)或者是 参数 client_id & client_secret

  • 令牌响应 Access Token Response
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    HTTP/1.1 200 OK
    Content-Type: application/json;charset=UTF-8
    Cache-Control: no-store
    Pragma: no-cache

    {
    "access_token":"{访问令牌}", // 必填
    "token_type":"{令牌类型}", // 必填
    "expires_in":{过期时间}, // 任意
    "refresh_token":"{刷新令牌}", // 任意
    "scope":"{授权范围}" // 如果请求和响应的授权范围不一致就必填
    }

Hydra演示

创建一个facebook-photo-backup应用并获得id和secret

通过 Hydra CLI 命令创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
docker run --rm -it \
-e HYDRA_ADMIN_URL=http://ory-hydra-example--hydra:4445 \
--network hydraguide \
oryd/hydra:v1.10.6 \
clients create --skip-tls-verify \
--id facebook-photo-backup \
--secret some-secret \
--grant-types authorization_code,refresh_token,client_credentials,implicit \
--response-types token,code,id_token \
--scope openid,offline,photos.read \
--callbacks http://127.0.0.1:9010/callback


You should not provide secrets using command line flags, the secret might leak to bash history and similar systems
OAuth 2.0 Client ID: facebook-photo-backup

测试环境会有提示You should not provide secrets using command line flags, the secret might leak to bash history and similar systems,忽略即可。
此时我们得到

1
2
Client ID: facebook-photo-backup
Client Secret: some-secret
通过 rest api 创建

创建应用客户端
P3Hu1k

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
POST http://localhost:5445/clients
{
"client_id": "facebook-photo-backup1",
"client_secret": "some-secret",
"token_endpoint_auth_method": "client_secret_basic",
"redirect_uris": [
"http://127.0.0.1:9010/callback"
],
"scope": "openid offline photos.read",
"grant_types": [
"authorization_code",
"refresh_token",
"implicit",
"client_credentials"
],
"response_types": [
"code",
"id_token",
"token"
]
}

执行一个 OAuth 2.0 授权流程

使用CLI简易

以下示例将执行一个 OAuth 2.0 授权流程。为简化此操作,Hydra CLI 提供了一个名为 hydra token user 的辅助命令。

1
2
3
4
5
6
7
8
9
10
11
docker run --rm -it \
--network hydraguide \
-p 9010:9010 \
oryd/hydra:v1.10.6 \
token user --skip-tls-verify \
--port 9010 \
--auth-url http://localhost:5444/oauth2/auth \
--token-url http://ory-hydra-example--hydra:4444/oauth2/token \
--client-id facebook-photo-backup \
--client-secret some-secret \
--scope openid,offline,photos.read

上面这个服务的过程请参考 下面的 使用Hydra api,其实就是简化接口参数过程。

打开 http://127.0.0.1:9010/

ggvyYz

fGU37K

OexXJh

SCz661

这个时候我们使用Access Token去调用userinfo API,即可正常获取到用户信息.

1
2
3
4
5
6
7
curl -X GET \
http://localhost:5444/userinfo \
-H 'authorization: Bearer 6BboGKvjsyXG_ZFqX8NQboVi4v4JFqiLoKRZ2ex3QRI.fowTdOf3QqCojmmKTdm5MveKulsv-vcEkp2KdiROAMI' \
-H 'cache-control: no-cache' \
-H 'content-type: application/json' \
-H 'postman-token: fecb7032-db0a-bb1b-a61b-de22add7e5bc'

得到如下信息,sub就是用户的信息,为什么这里用户信息只有一个sub呢?因为他们实现ory-hydra-example–consent的时候什么都没加进去,根据自己需要的信息加入sub就可以了。

1
2
3
4
5
6
7
8
{
"subject": "foo@bar.com",
"acr": "labo",
"context": "<object>",
"force_subject_identifier": "ex fugiat aliquip amet dolore",
"remember": false,
"remember_for": -4068005
}
使用Hydra api

YljCLk
HTSmvp

整个过程最终目的就是要获取授权码,然后通过授权码去拿token,通过图可以看出获取授权码一共需要两个流程:LoginConsent

下面我们就开始完整的演示一遍获取授权码的接口流程:

浏览器打开:
http://localhost:5444/oauth2/auth?&client_id=facebook-photo-backup&response_type=code&scope=openid&state=nqvresaazswwbofkeztgnvfs

Hydra服务器会302重定向到你在创建的时候设定的前端登录地址:http://localhost:9020/login?login_challenge=xxxxxxxxxx,带着login_challenge回来,这个东西就是下面接口需要的东西

7Kg0sn
这时候,我们前端应该进行身份验证的提交,并携带login_challenge到后端服务,后端接收到,通过对用户名和密码的校验后,请求acceptLoginRequest

登录请求

请求地址:
http://localhost:5445/oauth2/auth/requests/login/accept?login_challenge=66cc8259bf0c4a3880e26c189968bbd6
请求方式:PUT
请求类型:application/json
请求参数:

1
2
3
4
5
6
7
8
{
"subject": "foo@bar.com",
"acr": "labo",
"context": "<object>",
"force_subject_identifier": "ex fugiat aliquip amet dolore",
"remember": false,
"remember_for": -4068005
}

请求成功返回:

1
2
3
{
"redirect_to": "http://localhost:5444/oauth2/auth?client_id=facebook-photo-backup&login_verifier=289c0dbac2eb49e28eebc4b8f208b8c8&response_type=code&scope=openid&state=nqvresaazswwbofkeztgnvfs"
}

lwBuzu

MpbDZl

我们再通过浏览器打开

Login认证流程就结束了,把请求成功返回的结果,继续丢到浏览器中,他会302重定向到你创建hydra的时候设定的consent地址,并携带consent_challenge,,我们再通过前端提交把consent_challenge 的值传递给后端,让后端拿到下面去继续请求

认证请求

请求地址:
http://localhost:5445/oauth2/auth/requests/consent/accept?consent_challenge=xxxxxx
请求方式:PUT
请求类型:application/json
请求参数:
说明下,session是可以写你想要放进id_token里面的东西,但是但是!请不要有中文,比如说:”name”:”小白”,这样Hydra也无法识别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"grant_access_token_audience": [],
"grant_scope": [
"openid"
],
"handled_at": "2019-04-16T04:45:05.685Z",
"remember": true,
"remember_for": -72766940,
"session": {
"access_token": {},
"id_token": {
"userId": "111"
}
}
}

请求成功返回:

1
2
3
{
"redirect_to": "http://localhost:4444/oauth2/auth?client_id=tommy&consent_verifier=373e5b86d4444fe2a78390df64efc9b1&prompt=&response_type=code&scope=openid&state=nqvresaazswwbofkeztgnvfs"
}

z5hXVM
把结果的验证接口地址继续放到浏览器中回车,hydra服务器会重定向到你创建应用时候设置的callback地址,并且后面带着code,如:http://127.0.0.1:9010/callback?code=xxxxxxxxx,拿到这个code到下面的接口,就可以请求获取到Token了。

Jd7Vsk

获取令牌、刷新令牌

请求地址:
http://localhost:5444/oauth2/token
请求方式:POST
请求类型:application/x-www-form-urlencoded

请求参数 参数类型 参数说明
grant_type 字符串 授予类型,必填项
code 字符串 授权码
refresh_token 字符串 刷新令牌
client_id 字符串 客户端id,必填项
client-secret 字符串 客户端秘钥,必填项
redirect_uri 字符串 重定向uri
1
2
3
4
5
6
{
"grant_type": "authorization_code",
"client_id": "facebook-photo-backup",
"redirect_uri": "http://localhost:9020/login"
"code": "Qk4jf3dZ_DSkAAtlbS9pTilVFTRCeAYHdPpUN"
}

返回

1
2
3
4
5
6
7
8
{
"access_token": "nRNAO1g8LNMI2i_FJXQDoLxvHL7aLz4sILhWoL_de4w.EcGjSbAlCmsBSPlE_KtNk99AcFVaE_eqye0Sh41dXSk",
"expires_in": 299,
"id_token": "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDk2N2ZkYi0yYTg0LTRhNjAtODRlZi02M2Q3MGFmNTRmNzIifQ.eyJhdF9oYXNoIjoia05wVkVkWGhTa2U4MmdvSk81SUFqUSIsImF1ZCI6WyJ6c3ctY2xvdWQtZGV2Il0sImF1dGhfdGltZSI6MTY1ODYyOTIxOSwiZXhwIjoxNjU4NjI5NTI2LCJpYXQiOjE2NTg2MjkyMjYsImlzcyI6Imh0dHBzOi8venN3LWh5ZHJhLmNvbS8iLCJqdGkiOiI1ZmIwMWM4NC1mNTZmLTQ0MGUtYjFmZC1iOTBjODNhY2ZkODIiLCJyYXQiOjE2NTg2MjkyMTIsInNpZCI6IjU2OWY2ZDczLTgzZDEtNDBlOS04YmMxLWFlMjFhY2QzOGYyZCIsInN1YiI6InpzdyJ9.OaAlvwFY84BU2_fF8RxsXK_ueoURmvIMl_Xa7xZ566laeZdJ8GyONzrlGDSLwNNhdKV8Mcl3U8aNoGZDb5w3DRca9C0rqaedo-r4zMrsAZ-YNUAXvuv_Ga-n_MDPA2FxLF0vz1Til48jkbWhQ0QmJnT_m6DvUo4veVjtbU6Ggbz2-rYO7adW2rp1gf4I_AwwUOjfBtQmqZRPNvQIkX-Md-bQfhqnGikMEkeoZdYuZP3ags6H1cm3E8eMLyJk4kGXGkMosSKLE8LFh1HrXYQfCDwCVpL1dy_-b0ZKyj20RVVdusBzdb97MV4QFeKleuyGIRBXHI0etW9EELOVjPWcz59tuE29uToSopiEArFpeCotsh4nllFxqtvqRM4zh5ZMjf6MIHpm74IW8nVlXdCVjBjzZp3Lg3th7iWEDrZm_9tZ1o0SmYYwf9IbjjttrIaBbph-iTm5aijN6WHrKM0HNOcrERrcK4REcSFKueL46-yHRKmOhwXNROJHZQu3mTpZRO8BnR3eWBsRuFmVGLt8BKi8s_fAR7AI__WN1y8rek2_34LnAVrh8CJQnzBAIB-9y6AeGH8a9t_tqxkJWeLa8ohXVH8VTceKkCMNm_7x9vvhhqlb8lyVau9ktvkIgoalyGRmBf66FZQkxpDFht0XiC7ZGq9IusI-fDSIcGRuJa8",
"refresh_token": "emWnXUsZ43Sb9_eR0HyxirTltitFX0rvv2ouwRHdZ6U.4zKlg3iCdoHco02jryMHj432xzz0yQrh41zaQejp52M",
"scope": "openid offline",
"token_type": "bearer"
}

最后我们拿着idToken去JWT解析看看效果可以看到,idToken解析出来你需要的信息。
至此我们就获取到了访问令牌access_token,前端可以将令牌缓存在cookie或session中,相应的后台也会缓存,后面前端调用其他服务时携带令牌调用接口,后台校验根据token来判断是否放行。

以上就是获取token的整个流程。

当然token也有过期的时候,下面说一下刷新令牌,接口和上面获取token是同一个http://localhost:5444/oauth2/token,只是传参不同

1
2
3
4
5
6
7
{
"grant_type": "refresh_token",
"client_id": "facebook-photo-backup",
"redirect_uri": "http://localhost:9020/login"
// refresh_token是获取token时的refresh_token,不是access_token
"refresh_token": "emWnXUsZ43Sb9_eR0HyxirTltitFX0rvv2ouwRHdZ6U.4zKlg3iCdoHco02jryMHj432xzz0yQrh41zaQejp52M"
}

可用于用户登出

请求地址:
http://localhost:5444/oauth2/revoke
请求方式:POST
请求类型:application/x-www-form-urlencoded

1
2
3
{
"token":""
}

可用于获取用户标识或校验token是否存活

请求地址:
http://localhost:5444/oauth2/introspect
请求方式:POST
请求类型:application/x-www-form-urlencoded

1
2
3
{
"token":""
}

相关链接

Run Ory Hydra in Docker
deprecate –dangerous-force-http flag
[简易图解]『 OAuth2.0』 猴子都能懂的图解
[简易图解]『 OAuth2.0』 『进阶』 授权模式总结
微服务Token方案之ORY Hydra授权中心_Java实现

背景

Dapr 允许通过链接一系列中间件组件来定义自定义处理管道。 请求在路由到用户代码之前经过所有已定义的中间件组件,然后在返回到客户机之前,按相反顺序经过已定义的中间件,如下图中所示。
DdyQpe

中间件示例

uppercase中间件示例

我们拿 sidecar构架之dapr的跨物理机负载均衡的部署来继续实践一下将http请求的body内容转换为大写。

配置dapr3虚拟机

因为我们是通过dapr3(service2) 去 调用dapr1和dapr2的(service1)服务,那么我们在入口dapr3机器上配置中间件

~/.dapr/components/uppercase.yaml

1
2
3
4
5
6
7
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: uppercase
spec:
type: middleware.http.uppercase
version: v1

~/.dapr/config.yaml

1
2
3
4
5
6
7
8
9
10
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: pipeline
namespace: default
spec:
httpPipeline:
handlers:
- name: uppercase
type: middleware.http.uppercase

增加接口

我们增加一个/word的post接口,接口返回目标机器的ip和我们body参数的英文字母

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
const express = require('express')
const os = require('os');
const app = express()
const bodyParser = require('body-parser')
const port = 3000
app.use(bodyParser.json());// 添加json解析
app.use(bodyParser.urlencoded({extended: false}));

function getIpAddress() {
var interfaces=os.networkInterfaces()

for (var dev in interfaces) {
let iface = interfaces[dev]

for (let i = 0; i < iface.length; i++) {
let {family, address, internal} = iface[i]

if (family === 'IPv4' && address !== '127.0.0.1' && !internal) {
return address
}
}
}
}

function randomCoding(){
//创建26个字母数组
var arr = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z'];
var idvalue ='';
const n = 4;
for(var i=0;i<n;i++){
idvalue+=arr[Math.floor(Math.random()*26)];
}
return idvalue;
}

app.post('/word', (req, res) => {
const ipAddress = getIpAddress()
const body = req.body
body.ip = ipAddress
res.send(body)
})

app.listen(port, () => {
console.log(`Example app listening on port ${port}`)
})

调用结果

我们来调用/word接口,发现我们的body请求参数abc,通过middleware.http.uppercase大小写中间件,转换为大写,并向下游的service1发送请求,下游2台负载均衡的service1服务,均接受到中间件改变的大写参数。
1JMjA6

lg1r3L

相关链接

OAuth2 client credentials

服务调用示意

dapr调用示意

ZidVEq

sidecar 之间的通信都是 gRPC (3、6)
application与sidecar之间的通信是http/grpc (1、7、4、5)

  1. 服务 A 对服务 B 发起HTTP/gRPC的调用。
  2. Dapr使用服务注册发现平台的名称解析组件发现服务B的位置。(例如:mDNS、consul等)
  3. Dapr 将消息转发至服务 B的 Dapr 边车
    注: Dapr 边车之间的所有调用考虑到性能都优先使用 gRPC。 仅服务与 Dapr 边车之间的调用可以是 HTTP 或 gRPC
  4. 服务 B的 Dapr 边车将请求转发至服务 B 上的特定端点 (或方法) 。 服务 B 随后运行其业务逻辑代码。
  5. 服务 B 发送响应给服务 A。 响应将转至服务 B 的边车。
  6. Dapr 将消息转发至服务 A 的 Dapr 边车。
  7. 服务 A 接收响应。

python和node服务示例

ipdCVT

  1. Node.js应用程序有一个app IDnodeapp的Dapr应用程序。 当python应用程序通过 POST http://localhost:3500/v1.0/invoke/nodeapp/method/neworder 调用 Node.js 应用程序的 neworder方法时, 首先会到达python app的本地dapr sidecar。
  2. Dapr 使用本地机器运行的名称解析组件(在这种情况下自动运行的 mDNS),发现 Node.js 应用的位置。
  3. Dapr 使用刚刚收到的位置将请求转发到 Node.js 应用的 sidecar。
  4. Node.js 应用的 sidecar 将请求转发到 Node.js 应用程序。 Node.js 应用执行其业务逻辑,记录收到的消息,然后将订单 ID 存储到 Redis (未在图表中显示)中
  5. Node.js应 用程序通过 Node.js sidecar 向 Python 应用程序发送一个响应。
  6. Dapr 转发响应到 Python 的 Dapr sidecar
  7. Python 应用程序收到响应。

API 和端口

Dapr runtime 对外提供两个 API,分别是 Dapr HTTP API 和 Dapr gRPC API。另外两个 dapr runtime 之间的通讯 (Dapr internal API) 固定用 gRPC 协议。

两个 Dapr API 对外暴露的端口

  • 3500: HTTP 端口,可以通过命令行参数 dapr-http-port 设置
  • 50001: gRPC 端口,可以通过命令行参数 dapr-grpc-port 设置
1
dapr run --app-id nodeapp --app-port 3000 --dapr-http-port 3500 --dapr-grpc-port 50001 node app.js

Dapr internal API 是内部端口,比较特殊,没有固定的默认值,而是取任意随机可用端口。也可以通过命令行参数 dapr-internal-grpc-port 设置。

为了向服务器端的应用发送请求,dapr 需要获知应用在哪个端口监听并处理请求,这个信息通过命令行参数 app-port 设置。Dapr 的示例中一般喜欢用 3000 端口。

实践

我们使用官方的教程来实验一下

启动服务

我们先开启node的订单服务

1
2
3
cd ./hello-world/node
npm install
dapr run --app-id nodeapp --app-port 3000 --dapr-http-port 3500 node app.js
1
ℹ️  Starting Dapr with id nodeapp. HTTP Port: 3500. gRPC Port: 54626

服务解析

启动后,我们先看看订单服务有什么操作
创建订单:/neworder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
app.post('/neworder', async (req, res) => {
const data = req.body.data;
const orderId = data.orderId;
console.log("Got a new order! Order ID: " + orderId);

const state = [{
key: "order",
value: data
}];

try {
const response = await fetch(stateUrl, {
method: "POST",
body: JSON.stringify(state),
headers: {
"Content-Type": "application/json"
}
})
if (!response.ok) {
throw "Failed to persist state.";
}
console.log("Successfully persisted state.");
res.status(200).send();
} catch (error) {
console.log(error);
res.status(500).send({message: error});
}
});

获取订单:/order

1
2
3
4
5
6
7
8
9
10
11
12
13
14
app.get('/order', async (_req, res) => {
try {
const response = await fetch(`${stateUrl}/order`)
if (!response.ok) {
throw "Could not get state.";
}
const orders = await response.text();
res.send(orders);
}
catch (error) {
console.log(error);
res.status(500).send({message: error});
}
});

服务调用

http调用方式

yMot68

使用 Dapr cli 调用
1
2
3
dapr invoke --app-id nodeapp --method neworder --data-file sample.json
// sample.json
{"data":{"orderId":"42"}}
1
2
3
4
// 创建订单
dapr invoke --app-id nodeapp --method neworder --data '{"data": { "orderId": "42" } }'
// 获取订单
dapr invoke --app-id nodeapp --method order --verb GET
使用curl调用

通过dapr的Endpoint

1
2
3
4
// 创建订单
curl -XPOST -d @sample.json -H Content-Type:application/json http://localhost:3500/v1.0/invoke/nodeapp/method/neworder
// 获取订单
curl http://localhost:3500/v1.0/invoke/nodeapp/method/order

通过Node程序自己的Endpoint,这样不通过Dapr Sidecar。

1
curl -XPOST -d @sample.json -H "Content-Type:application/json" http://localhost:3000/neworder
使用Postman调用

6iLdya
0cTGFL

使用SDK

默认HTTP

1
2
3
4
5
6
7
8
9
10
11
12
13
import { DaprClient } from "@dapr/dapr";
const client = new DaprClient(daprHost, daprPort);

const serviceAppId = "nodeapp";
const serviceMethod = "neworder";

// POST Request
const response = await client.invoker.invoke(serviceAppId , serviceMethod , HttpMethod.POST, { data: {"orderId":"42"} });

const serviceMethod2 = "order";
// GET Request
const response = await client.invoker.invoke(serviceAppId , serviceMethod2 , HttpMethod.GET);

另一个程序语言的服务调用(python)

接下来部署Python的程序。Python也得先装运行环境:

1
2
3
cd ./hello-world/python
sudo apt install python3-pip
dapr run --app-id pythonapp --dapr-http-port 3501 python3 app.py

启动成功了。因为Python自己不提供服务,所以–app-port不用指定。–dapr-http-port是3501,这是自己的Sidecar用的端口,不能跟别人的重了。再看看刚才Node的窗口,不停的有新的Request过来,就是Python程序来的每隔一秒的Request。
最后看一下dapr list的结果:

1
2
3
APP ID     HTTP PORT  GRPC PORT  APP PORT  COMMAND         AGE  CREATED              PID
nodeapp 3500 35485 3000 node app.js 41m 2020-12-27 00:54.54 18395
pythonapp 40175 33349 0 python3 app.py 1m 2020-12-27 01:36.27 31185

GRPC调用方式

ryUFBq

GRPC调用方式,在application得起一个GRPC服务,然后通过dapr调用application的GRPC服务

使用SDK
1
2
3
4
5
6
7
import { DaprClient, CommunicationProtocol } from "@dapr/dapr";
const client = new DaprClient(daprHost, daprPort, CommunicationProtocol.GRPC);

// Create a Proxy that allows us to use our gRPC code

const clientProxy = await client.proxy.create<GreeterClient>(GreeterClient);

前提

本次我们将实践dapr的mDns和consul的负载均衡

服务示例

  1. service1是一个获取ip的nodejs项目,分别使用dapr部署在dapr1dapr2的虚拟机上
  2. service2是另外一个服务,调用server1的getIp服务,部署在dapr3的虚拟机上

我们的目标是通过postman访问service2,调用部署在dapr1、dapr2的getIp服务,看看是否能达到负载均衡

service1核心代码:

index.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const express = require('express')
const os = require('os');
const app = express()
const port = 3000

function getIpAddress() {
var interfaces=os.networkInterfaces()

for (var dev in interfaces) {
let iface = interfaces[dev]

for (let i = 0; i < iface.length; i++) {
let {family, address, internal} = iface[i]

if (family === 'IPv4' && address !== '127.0.0.1' && !internal) {
return address
}
}
}
}

app.get('/getIp', (req, res) => {
const ipAddress = getIpAddress()
res.send(ipAddress)
})

app.listen(port, () => {
console.log(`Example app listening on port ${port}`)
})

mDns负载均衡(局域网内)

1
2
3
4
5
6
7
8
9
10
11
12

dapr1 192.168.10.201: dapr部署service1
dapr run --dapr-http-port 3600 --app-port 3000 --app-id service1 node index.js


dapr2 192.168.10.202: dapr部署service1
dapr run --dapr-http-port 3600 --app-port 3000 --app-id service1 node index.js


dapr3: 192.168.10.203: dapr部署service2
dapr run --dapr-http-port 3500 --app-id service2

多次访问,可看到达到了负载的效果

1
curl http://192.168.10.203:3500/v1.0/invoke/service1/method/getIp
1
2
3
4
192.168.10.202
192.168.10.201
192.168.10.202
192.168.10.201

consul的负载均衡(夸机房,不需要在同一局域网)

前面的负载我们使用了mDNS,官方还为我们提供了Consul名字解析组件,我们新建一个虚拟机Consul(192.168.10.204)来安装Consul
安装步骤可参考:Consul集群-服务自动发现实践

Q7hLRH

下面修改Dapr1和Dapr2机器上的~/.dapr/config.yaml配置,让其使用Consul来解析名字

1
2
3
4
5
6
7
8
9
10
11
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: daprConfig
spec:
nameResolution:
component: "consul"
configuration:
client:
address: "192.168.10.204:8500"
selfRegister: true

修改Dapr3机器上的~/.dapr/config.yaml配置

1
2
3
4
5
6
7
8
9
10
11
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: daprConfig
spec:
nameResolution:
component: "consul"
configuration:
client:
address: "10.8.99.45:8500"
selfRegister: false

当我们重新启动dapr1和dapr2的service1后,我们看到,已经注册到consul上去了。
fmRPJh

问题

待解决
consul如何自动剔除失效服务

Consul移除失效服务

1
curl -X PUT http://127.0.0.1:8500/v1/agent/service/deregister/{service_id}

Consul移除正常关机nodes节点

1
curl -X PUT http://127.0.0.1:8500/v1//agent/force-leave/{node}

目标

consul集群服务+consul-template+nginx实现nginx反向代理地址的自动更新

  • consul:自动发现、自动更新,为容器提供服务(添加、删除、生命周期)
  • registrator(自动发现+注册到consul-server端)
  • consul-template模板(更新)

核心机制: 后端每更新一个容器,会向registrator进行注册,控制consul完成更新操作,consul会触发consul-template模板进行热更新(reload)

启动Consul集群

启动4个consul,其中consul1 是主节点,consul2、consul3 是子节点。consul4是提供ui服务的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
version: '3.5'
services:
consul1:
image: consul:latest
container_name: consul1
restart: always
command: agent -server -client=0.0.0.0 -bootstrap-expect=3 -node=consul1
volumes:
- ./consul1/data:/consul/data
- ./consul1/config:/consul/config
consul2:
image: consul:latest
container_name: consul2
restart: always
command: agent -server -client=0.0.0.0 -retry-join=consul1 -node=consul2
volumes:
- ./consul2/data:/consul/data
- ./consul2/config:/consul/config
consul3:
image: consul:latest
container_name: consul3
restart: always
command: agent -server -client=0.0.0.0 -retry-join=consul1 -node=consul3
volumes:
- ./consul3/data:/consul/data
- ./consul3/config:/consul/config
consul4:
image: consul:latest
container_name: consul4
restart: always
ports:
- 8500:8500
command: agent -client=0.0.0.0 -retry-join=consul1 -ui -node=client1
volumes:
- ./consul4/data:/consul/data
- ./consul4/config:/consul/config

EVl7kb

  • -server:表示当前使用的server模式;如果没有指定,则表示是client模式。
  • -node:指定当前节点在集群中的名称。
  • -config-dir:指定配置文件路径,定义服务的;路径下面的所有.json结尾的文件都被访问;缺省值为:/consul/config。
  • -data-dir: consul存储数据的目录;缺省值为:/consul/data。
  • -datacenter:数据中心名称,缺省值为dc1。
  • -ui:使用consul自带的web UI界面 。
  • -join:加入到已有的集群中。
  • -enable-script-checks: 检查服务是否处于活动状态,类似开启心跳。
  • -bind: 绑定服务器的ip地址。
  • -client: 客户端可访问ip,缺省值为:“127.0.0.1”,即仅允许环回连接。
  • -bootstrap-expect:在一个datacenter中期望的server节点数目,consul启动时会一直等待直到达到这个数目的server才会引导整个集群。这个参数的值在同一个datacenter的所有server节点上必须保持一致。

consul 端口

端口 说明
TCP/8300 8300端口用于服务器节点。客户端通过该端口RPC协议调用服务端节点
TCP/UDP/8301 8301端口用于单个数据中心所有节点之间的互相通信, 即对LAN池信息的同步。她使得整个数据中心能够自动发现服务器地址,分布式检测节点故障,事件广播(如领导选举事件)
TCP/UDP/8302 8302端口用于单个或多个数据中心之间的服务器节点的信息同步,即对WAN池信息的同步。它针对互联网的高延迟进行了优化,能够实现跨数据中心请求
8500 8500端口基于HTTP协议,用于API接口或者WEB UI访问
8600 8600端口作为DNS服务器,它使得我们可以通过节点名查询节点信息

通过http api 获取集群信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 查看集群server成员
[root@localhost consul]# curl 127.0.0.1:8500/v1/status/peers
[
"172.23.0.5:8300",
"172.23.0.3:8300",
"172.23.0.4:8300"
]

# 查看集群Raf leader
[root@localhost consul]# curl 127.0.0.1:8500/v1/status/leader
"172.23.0.4:8300"

# 查看注册的所有服务
[root@localhost consul]# curl 127.0.0.1:8500/v1/catalog/services
{"consul":[]}

# 查看nginx服务的信息
[root@localhost consul]# curl 127.0.0.1:8500/v1/catalog/nginx

# 集群节点详细信息
[root@localhost consul]# curl 127.0.0.1:8500/v1/catalog/nodes
[{"ID":"dc6703d1-2324-c388-d5bc-226d7d79e733","Node":"client1","Address":"172.23.0.2","Datacenter":"dc1","TaggedAddresses":{"lan":"172.23.0.2","lan_ipv4":"172.23.0.2","wan":"172.23.0.2","wan_ipv4":"172.23.0.2"},"Meta":{"consul-network-segment":""},"CreateIndex":8,"ModifyIndex":11},{"ID":"f328a0ce-ba8b-b270-f2f0-850f2f762334","Node":"consul1","Address":"172.23.0.4","Datacenter":"dc1","TaggedAddresses":{"lan":"172.23.0.4","lan_ipv4":"172.23.0.4","wan":"172.23.0.4","wan_ipv4":"172.23.0.4"},"Meta":{"consul-network-segment":""},"CreateIndex":5,"ModifyIndex":10},{"ID":"324bef0c-fa7e-ae47-3bcc-6f06e45f8e4b","Node":"consul2","Address":"172.23.0.5","Datacenter":"dc1","TaggedAddresses":{"lan":"172.23.0.5","lan_ipv4":"172.23.0.5","wan":"172.23.0.5","wan_ipv4":"172.23.0.5"},"Meta":{"consul-network-segment":""},"CreateIndex":7,"ModifyIndex":13},{"ID":"d59cbbc9-d1b9-da27-5c9b-3e35eabae824","Node":"consul3","Address":"172.23.0.3","Datacenter":"dc1","TaggedAddresses":{"lan":"172.23.0.3","lan_ipv4":"172.23.0.3","wan":"172.23.0.3","wan_ipv4":"172.23.0.3"},"Meta":{"consul-network-segment":""},"CreateIndex":9,"ModifyIndex":12}]

部署registrator

1
2
3
4
5
6
7
8
9
10
version: "3.5"
services:
registrator:
image: gliderlabs/registrator:latest
container_name: registrator
restart: always
network_mode: host
volumes:
- "/var/run/docker.sock:/tmp/docker.sock"
command: consul://10.8.99.45:8500

consul-template自动加入nginx集群

1
2
3
4
5
cd /opt/
wget https://releases.hashicorp.com/consul-template/0.26.0/consul-template_0.26.0_linux_amd64.zip

# 解压到指定目录,推荐/usr/bin/目录下,可以直接使用
tar -xf consul-template_0.26.0_linux_amd64.zip -C /usr/bin/

配置模板文件

vim /data/template/nginx.ctmpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
upstream http_backend {         
{{range service "nginx"}}
server {{.Address}}:{{.Port}};
{{end}}
}

server {
listen 1216; ##监听consul的端口,这是代理端口,consul是作为一个代理服务,访问后端的容器服务
server_name localhost 10.8.99.45; ##监听本地地址,监听代理端的地址
access_log /var/log/nginx/access.log; ##nginx日志目录,如果是编译安装的nginx需要自行创建
index index.html index.php;
location / { ##反向代理的信息,代理的头部信息
proxy_set_header HOST $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Client-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_pass http://http_backend; ##跳转到服务器池的地址和端口
}
}

安装nginx

1
yum install -y nignx
1
2
3
4
cat /etc/nginx/nginx.conf
...
include /etc/nginx/conf.d/*.conf;
...

启动consul-template,指定template模板文件及生成路径

1
2
3
# 进入监控状态
[root@localhost ~]# consul-template -consul-addr 10.8.99.45:8500 -template \ "/data/template/nginx.ctmpl:/etc/nginx/conf.d/app.conf:nginx -s reload" \
--log-level=info
  • /data/template/nginx.ctmpl 模板文件
  • /etc/nginx/conf.d/app.conf 生成的配置文件
  • nginx -s reload,重载服务
  • -log-level=info,指定日志级别

查看生成后的文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
cat /etc/nginx/conf.d/app.conf

upstream http_backend {

server 10.8.99.45:81;

server 10.8.99.45:82;

server 10.8.99.45:83;

server 10.8.99.45:84;

}
server {
listen 1216;
server_name localhost 10.0.0.14;
access_log /var/log/nginx/access.log;
index index.html index.php;
location / {
proxy_set_header HOST $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Client-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_pass http://http_backend;
}
}

由此完成通过consul的实时新增删除服务,然后compose-template可以实现动态增加服务节点到nginx代理的配置文件,这样就形成了服务的自动化增减

上一次我们实践了 Local 模式和Standalone 模式 :Flink多种集群模式部署实践
我们来实践一下推荐生产环境部署的 Flink on yarn模式

这篇贼辛苦,因为涉及到 hadoop和flink的部署,不过收获也颇多。

在实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下:
Yarn的资源可以按需使用,提高集群的资源利用率
Yarn的任务有优先级,根据优先级运行作业
基于Yarn调度系统,能够自动化地处理各个角色的 Failover(容错)

  • JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控
  • 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器
  • 如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManage

Flink如何和Yarn进行交互?

sh7K78

  • 1.Client上传jar包和配置文件到HDFS集群上

  • 2.Client向Yarn ResourceManager提交任务并申请资源

  • 3.ResourceManager分配Container资源并启动ApplicationMaster,然后AppMaster加载Flink的Jar包和配置构建环境,启动JobManager

    JobManager和ApplicationMaster运行在同一个container上。一旦他们被成功启动,AppMaster就知道JobManager的地址(AM它自己所在的机器)。它就会为TaskManager生成一个新的Flink配置文件(他们就可以连接到JobManager)。这个配置文件也被上传到HDFS上。此外,AppMaster容器也提供了Flink的web服务接口。YARN所分配的所有端口都是临时端口,这允许用户并行执行多个Flink

  • 4.ApplicationMaster向ResourceManager申请工作资源,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager

  • 5.TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务

部署模式

首先明确下面两个术语:

  • Flink Application:Flink Application 是一个 Java 应用程序,它通过 main 方法(或其他方式)提交一个或多个 Flink Job。提交作业通常通过调用 execute 来完成。
  • Flink Job:是 Flink DAG 图在运行时的具体表现,一个 DAG 对应一个 Flink Job,一般是通过在 Flink Application 中调用 execute 来创建和提交的。

然后再来看 Flink 如何区分不同的部署模式:

  • 集群的生命周期和隔离性保证
  • application 的 main 方法在哪里执行

KpoPTd

Session Mode

多个job共享同一个集群<jobmanager/tastmanager>、job退出集群也不会退出,用户类的main方法再client端运行
适用:需要频繁提交小job,因为每次提交新job时候,不需要去向yarn注册应用

  1. Session模式是预分配资源的,也就是提前根据指定的资源参数初始化一个Flink集群,并常驻在YARN系统中,拥有固定数量的JobManager和TaskManager,该资源中JobManager有且只有一个。
  2. 提交到这个集群的作业可以直接运行,免去了每个提交作业都启动一个完整集群的资源开销,但是Session的资源总量有限,多个作业之间又不是隔离的,故可能会造成资源的争用,如果有一个作业因为异常导致TaskManager宕机,则它上面承载着的所有作业也都会受到影响。
  3. 另外,启动的作业越多,JobManager的负载也就越大。所以,Session模式一般用来部署那些对延迟非常敏感但运行时长较短的作业。

Per-Job Mode (1.15 deprecated).

每个job独享一个集群,job退出集群则退出,用户类的main方法再client端运行
适用:大job,运行时间很长,因为每起一个job,都要去向yarn申请容器启动jm,tm,比较耗时

  1. 为了提供更好的资源隔离保证,在Per-Job模式下,每个提交到YARN上的作业会各自形成单独的Flink集群,拥有专属的JobManager和TaskManager。
  2. 当作业完成时,分配给当前作业的集群将被销毁,所有缓存在集群中的资源(文件等)将被清除。作业之间的资源完全隔离,一个作业的TaskManager失败不会影响其他作业的运行。
  3. Per-Job模式一般用来部署那些长时间运行的作业。

每个job独享一个集群,job退出集群则退出,用户类的main方法再集群上运行

  1. application模式,在该模式下会为每个提交的应用创建一个集群,用户程序的 main 方法将在JobManager集群中而不是客户端运行。
  2. Application模式的会话集群,仅在特定应用程序的作业之间共享,并在应用程序完成时终止。
  3. 在这种体系结构中,Application 模式在不同应用之间提供了资源隔离和负载平衡保证。在特定一个应用程序上,JobManager 执行 main() 可以节省所需的 CPU 周期,还可以节省本地下载依赖项所需的带宽。

部署实践运行与测试

1
2
3
4
5
6
#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

需要说明的是,Flink on yarn模式部署时,实际上不需要对Flink做任何修改配置,只需要将其解压传输到各个节点之上
如果要实现高可用的方案,这个时候就需要到Flink相应的配置修改参数,具体的配置文件是FLINK_HOME/conf/flink-conf.yaml。

standalone模式还是需要配置 Flink多种模式部署实践中的Standalone 模式
对于Flink on yarn模式,我们并不需要在conf配置下配置 masters和slaves。因为在指定TM的时候可以通过参数“-n”来标识需要启动几个TM;Flink on yarn启动后,如果是在分离式模式你会发现,在所有的节点只会出现一个 YarnSessionClusterEntrypoint进程;如果是客户端模式会出现2个进程一个YarnSessionClusterEntrypoint和一个FlinkYarnSessionCli进程。

Session Mode

Session Mode的运行主要有两个步骤。第一步是启动yarn session,开辟资源,第二步是flink run运行job。
用yarn session启动集群时,有两种方式可以使用,分别是客户端模式和分离模式

客户端模式启动yarn session

使用如下启动命令

1
./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -s 2

r3hxgX

  • -n: TaskManager的数量,相当于executor的数量
  • -s: 每个JobManager的core的数量,executor-core,建议将slot的数量设置成每台机器处理器数量
  • -tm: 每个TaskManager的内存大小
  • -jm: JobManager的内存大小
  1. 上面的命令的意思是,同时向Yarn申请3个container,其中2个Container启动TaskManager(-n 2),每个TaskManager 拥有两个Task Slot(-s 2),并且向每个TaskManager的Container申请1024 M 的内存,以及一个ApplicationMaster(Job Manager)。

  2. 在客户端模式下,那作业提交后,资源的大小是由yarn的队列所决定的,多个job提交,资源的占用和竞争都是由yarn所控制。

  3. 客户端模式下,jps查看的进程有两个

    1
    2
    FlinkYarnSessionCli
    YarnSessionClusterEntrypoint

    wolnIQ

  4. hadoop application查看
    JsJP0d

  5. yarn命令行查看

    1
    2
    3
    4
    5
    [root@hadoop103 flink-1.15.2]# yarn application -list
    2022-09-28 17:13:04,195 INFO client.RMProxy: Connecting to ResourceManager at hadoop103/192.168.10.103:8032
    Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):1
    Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
    application_1664355973790_0001 Flink session cluster Apache Flink root default RUNNING UNDEFINED 100% http://192.168.10.103:39719
  6. 从flink daskboard查看
    aF0TY7

分离模式启动yarn session

使用启动命令如下

1
./bin/yarn-session.sh -d -n 2 -jm 1024 -tm 1024 -s 2 

tE8rCO

  • -d: 指定分离模式
  1. 分离模式在启动的时候会在yarn中常驻一个进程,并且已经确定了之后提交的job的内存等资源的大小,比如8G内存,如果某一个job把8G内存全部占完了,只能是第一个job执行完成把资源释放了,第二个job才能继续执行。

  2. 分离模式下,jps查看进程有一个

    1
    YarnSessionClusterEntrypoint

    KKrpSE

  3. hadoop application查看
    B7txuj

  4. yarn命令行查看

    1
    2
    3
    4
    5
    [root@hadoop102 flink-1.15.2]# yarn application -list
    2022-09-28 17:28:38,451 INFO client.RMProxy: Connecting to ResourceManager at hadoop103/192.168.10.103:8032
    Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):1
    Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
    application_1664357090524_0002 Flink session cluster Apache Flink root default RUNNING UNDEFINED 100% http://192.168.10.103:41395
  5. 从flink daskboard查看
    23S1K2

提交任务

1
2
./bin/flink run ./examples/batch/WordCount.jar
./bin/flink run ./examples/batch/WordCount.jar --input /opt/tmp/words --output /opt/tmp/test5.txt

Flink 作业提交后,若要在Flink WebUI上查看作业的详细可以通过如下操作

  • 方式1:从终端查看链接
    h6kefu
  • 方式2:从hadoop查看
    点击yarn集群管理器中启动的application_ID,进入如下页面,点击红色方框
    0UT1hK

rphVSR

eAJvh2

yarn session资源释放

1
yarn application -kill application_1664357090524_0002

kEWUT4

启动完整参数

1
./bin/yarn-session.sh 参数
  • -n: 指定TaskManager的数量;
  • -d: 以分离模式运行;
  • -id: 指定yarn的任务ID;
  • -j: Flink jar文件的路径;
  • -jm: JobManager容器的内存(默认值:MB);
  • -nl: 为YARN应用程序指定YARN节点标签;
  • -nm: 在YARN上为应用程序设置自定义名称;
  • -q: 显示可用的YARN资源(内存,内核);
  • -qu: 指定YARN队列;
  • -s: 指定TaskManager中slot的数量;
  • -st: 以流模式启动Flink;
  • -tm: 每个TaskManager容器的内存(默认值:MB);
  • -z: 命名空间,用于为高可用性模式创建Zookeeper子路径;

Per-Job Mode 单作业模式部署

启动

为了方便查看测试过程,我们使用flink包中的SocketWindowWordCount项目,我们需要提前启动socket服务及端口。
1、启动socket服务

1
nc -lk 9002

2、启动Per-Job命令并启动任务

1
./bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 examples/streaming/SocketWindowWordCount.jar --hostname hadoop102 --port 9002

官方:

1
./bin/flink run -t yarn-per-job  -yjm 1024 -ytm 1024 ./examples/streaming/SocketWindowWordCount.jar --hostname hadoop102 --port 9002
  • -t: 表示启动方式(–target),可取yarn-per-job、yarn-session、run-application等
  • -yjm: JobManager内存
  • -ytm: TaskManager内存大小申请

启动日志
m0T8UX
JPS查看
IBuAys
yarn集群查看
y4cGPP
从上图中我们可以看出,yarn cluster上管理了一个ID为application_1664357090524_0003、name为Flink per-job cluster的资源环境。鼠标点击ID,可计入详细信息界面。
dBLRnB
点击上图中的Tracking URL:ApplicationMaster,可以打开Flink web ui管理界面。
XpuUYQ

测试数据

dX69uU

OtTtLH

当前示例是一个实时流处理,只要socket服务和端口一直处于开启状态,Flink作业就会一直处于运行状态。我们在前面介绍中说过,Per-Job模式的作业,执行完成就会释放yarn资源,停掉socket作业和端口服务,当前yarn所管理的flink 作业和资源就会得到释放。

在socket服务界面 CTR+C 结束服务, Flink作业日志界面会打印如下信息
wQfSaZ

6Jc0Dk

Yarn集群
ATVVuX

Application Mode 应用模式部署

1
2
3
4
5
6
7
8
bin/flink run-application -t yarn-application -p 3\
-Dparallelism.default=3 \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dparallelism.default=10 \
-Dyarn.application.name="application_test" \
./exapmles/batch/WorldCount.jar
  • -t 支持yarn 上(-t yarn-application) 和 k8s 上(-t kubernetes-application)
  • -D 参数指定通用的 运行配置,比如 jobmanager/taskmanager 内存、checkpoint 时间间隔等
  1. jobmanager.memory.process.size=2048m,taskmanager.memory.process.size=4096m是JM和TM内存参数设置
  2. taskmanager.numberOfTaskSlots=2为设置TaskManager slots个数为3
  3. 指定并发还可以使用 -Dparallelism.default=3,-p的作用与这个相同,社区目前倾向使用-D+通用配置代替客户端命令参数-p
  4. yarn.provided.lib.dirs参数:可以预先上传 flink 客户端依赖包到远端共享存储中(一般是hdfs),flink运行时的依赖包省去了上传的过程,避免了网络和存储资源的浪费。同时运行在集群中的所有作业都会使用一份远端 flink 依赖包,并且每个 yarn nodemanager 都会缓存一份,提交速度也会大大提升,对于跨机房提交作业会有很大的优化。
    1
    -Dyarn.provided.lib.dirs="hdfs://flink/libs;hdfs://flink/hotfix" \
    5.应用程序也可以提前上传到hdfs的/flink/examples目录下,将上例中./exapmles/batch/WorldCount.jar修改为hdfs://flink/examples/WorldCount.jar,也可以将yarn.provided.lib.dirs 配置到 conf/flink-conf.yaml(如果没有增加一个)可参考官网配置说明,这时提交作业就和上面示例的普通作业没有区别了

常用命令

查看Flink Yarn Application

1
yarn application -list

停止Flink Yarn Application

1
yarn application -kill appID

相关链接

Flink 1.14.2 on Yarn 遇到的坑
flink1.10三节点集群之yarn-session模式

一、Local 模式

Local 模式是 Flink 提供的最简单部署模式,一般用来本地测试和演示使用。
进入 Flink 官网,下载 1.15.2 版本安装包 flink-1.15.2-bin-scala_2.12.tgz,注意此处选用对应 scala 版本为 scala 2.12 的安装包。
https://flink.apache.org/zh/downloads.html

将压缩包下载到本地,并且直接进行解压,使用 Flink 默认的端口配置,直接运行脚本启动:

1
tar -zxvf flink-1.15.2-bin-scala_2.12.tgz
1
./bin/start-cluster.sh

我们直接访问本地的 8081 端口,可以看到 Flink 的后台管理界面,验证 Flink 是否成功启动。

我们尝试提交一个测试任务:

1
./bin/flink run examples/batch/WordCount.jar

二、Standalone 模式

1.1 环境配置

Flink 安装部署的学习时,需要准备 3 台 Linux 机器。具体要求如下:

  • 系统环境为 CentOS 7.9 版本。
  • 安装 Java 8。
    1
    2
    3
    4
    [root@hadoop102 flink-1.15.2]# java -version
    java version "1.8.0_333"
    Java(TM) SE Runtime Environment (build 1.8.0_333-b02)
    Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)
  • 配置集群节点服务器间时间同步以及免密登录,关闭防火墙。

1.2 集群节点

Me0cWj

节点服务器 hadoop102 hadoop103 hadoop104
ip 192.168.10.102 192.168.10.103 192.168.10.104
角色 JobManager TaskManager TaskManager

1.3 下载并配置运行

进入 Flink 官网,下载 1.15.2 版本安装包 flink-1.15.2-bin-scala_2.12.tgz,注意此处选用对应 scala 版本为 scala 2.12 的安装包。
https://flink.apache.org/zh/downloads.html

  • 1、解压到/opt/module目录下

    1
    [root@hadoop102 flink]$ tar -zxvf flink-1.15.2-bin-scala_2.12.tgz -C /opt/module/
  • 2、进入/opt/module下 的Flink 目录下

    1
    [root@hadoop102 flink-1.15.2]$ cd /opt/module/flink-1.15.2
  • 3、进入conf目录中

    1
    [root@hadoop102 flink-1.15.2]$ cd /conf
  • 4、配置flink-conf.yaml文件

    1
    [root@hadoop102 conf]$ vim flink-conf.yaml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    # jobManager 的IP地址
    jobmanager.rpc.address: hadoop102
    # jobManager JVM heap内存大小
    jobmanager.heap.size: 1024m
    # jobManager 的rpc通信端口
    jobmanager.rpc.port: 6123
    # jobManager 绑定到的网络接口的地址
    jobmanager.bind-host: hadoop102
    # jobmanager进程使用的所有内存大小
    jobmanager.memory.process.size: 1600m
    # taskmanager服务地址 -> 不同主机改主机名称或ip 如:hadoop103、hadoop104
    taskmanager.bind-host: hadoop102/hadoop103/hadoop104
    # taskManager绑定到的网络接口的地址 -> 不同主机改主机名称或ip 如:hadoop103、hadoop104
    taskmanager.host: hadoop102/hadoop103/hadoop104
    # taskmanager进程使用的所有内存大小
    taskmanager.memory.process.size: 1728m
    # 每个TaskManager 提供的任务 slots 数量大小
    # 它的意思是当前task能够同时执行的线程数量 (实际生产环境建议是CPU核心-1,这里笔者写2)
    taskmanager.numberOfTaskSlots: 2
    # 默认并行度
    parallelism.default: 1
    # 重启策略
    jobmanager.execution.failover-strategy: region
    # 客户端应该用来连接到服务器的地址
    rest.address: hadoop102
    # web服务绑定的address
    rest.bind-address: hadoop102
  • 5、配置masters文件(根据实际环境设置)

    1
    hadoop102:8081
  • 6、配置workers文件(根据实际环境设置)

    1
    2
    hadoop103
    hadoop104
  • 7、分发其他机器

    1
    [root@hadoop102 conf]$ xsync /opt/module/flink-1.15.2
  • 8、启动集群

    1
    [root@hadoop102 flink-1.15.2]$ bin/start-cluster.sh 
  • 9、Web UI 默认端口 http://hadoop102:8081
    plfyWk

  • 10、关闭集群

    1
    [root@hadoop102 flink-1.15.2]$ bin/stop-cluster.sh 

1.4 问题

  • MacBook M1虚拟机
  • Centos7操作系统

Flink TaskManager启动报错,报错信息如下:

1
2
Error: VM option ‘UseG1GC’ is experimental and must be enabled via -XX:+UnlockExperimentalVMOptions.
Error: Could not create the Java Virtual Machine.

处理办法

  • 找到bin/taskmanager.sh文件
  • 搜索UseG1GC
  • 直接删除 -XX:+UseG1GC

三、on yarn 模式

on yarn中又分

  • Session Mode
  • Per-Job Mode (1.15 deprecated).
  • Application Mode(Flink 1.11引入)
    篇幅较大,单独:
    Flink on yarn

需求

  • 局域网A(家庭):群晖一台 192.168.199.186 、 路由器192.168.199.1 小米路由器;
  • 局域网B:我的macbook pro 10.8.99.45;

beoXra

tip: 图中100.x.y.z为macbook pro(junyao)群晖(homeDS)公司电脑(VM-0-2-centos)组网后的ip,图中192.168.0.x,为本例局域网A192.168.199.x内部设备局域网ip

假设家庭内网有一台群晖(homeDS)安装了 Tailscale 客户端,我们希望其他 Tailscale 客户端macbook pro(junyao)公司电脑(VM-0-2-centos)可以直接通过家中的局域网 IP段(例如 192.168.199.0/24)访问家庭内网的任何一台设备,

我们来尝试实现组网后macbook pro 10.8.99.45访问我的小米路由器192.168.199.1

搭建headscale服务端

我们在headscale私有部署 中已经将 macbook pro(junyao)群晖(homeDS)公司电脑(VM-0-2-centos)进行了组网,组网结果如下,
我们可以通过组网后的ip100.64.0.1100.64.0.4100.64.0.3 进行p2p互相访问

1
2
3
4
ID | Hostname      | Name                   | NodeKey | Namespace | IP addresses                  | Ephemeral | Last seen           | Online | Expired
1 | junyao | junyao-lxgc8rs7 | [kEPjO] | junyao | 100.64.0.1, fd7a:115c:a1e0::1 | false | 2022-09-15 02:39:21 | online | no
2 | homeDS | homeds-2xqvj919 | [96Nns] | junyao | 100.64.0.4, fd7a:115c:a1e0::4 | false | 2022-09-15 02:39:00 | online | no
3 | VM-0-2-centos | vm-0-2-centos-ieo6arqr | [1M6Gs] | junyao | 100.64.0.3, fd7a:115c:a1e0::3 | false | 2022-09-15 02:39:56 | online | no

此时我们要实现的就是 macbook pro(junyao) 访问 群晖(homeDS)内部局域网的设备访问,比如小米路由器。

配置群晖tailscale

  • 1、我们在群晖安装tailscale客户端
    DaY8pW
  • 2、进入ssh,在群晖系统 /etc/sysctl.conf 中增加
    1
    2
    net.ipv4.ip_forward=1
    net.ipv6.conf.all.forwarding=1
    让其支持 IPv4 与 IPv6 路由转发
  • 3、开启tailscale并设置--advertise-routes 让其打通局域网内部的路由访问,这个网段的所有设备都可以被访问
    1
    tailscale up --accept-dns=false --advertise-routes=192.168.199.0/24 --login-server=http://<ip>:8080
  • 4、headscale服务端设置
    在 headscale 端查看路由,可以看到相关路由是关闭的。
    1
    2
    3
    4
    5
    headscale nodes list|grep homeDS
    2 | homeDS | homeds-2xqvj919 | [96Nns] | junyao | 100.64.0.4, fd7a:115c:a1e0::4 | false | 2022-09-15 02:49:59 | online | no
    headscale routes list -i 2
    Route | Enabled
    192.168.199.0/24 | false
    开启群晖路由:
    1
    2
    3
    4
    headscale routes enable -i 2 -r "192.168.199.0/24"

    Route | Enabled
    192.168.199.0/24 | true

官方文档:Subnet routers and traffic relay nodes

现在你在任何一个 Tailscale 客户端所在的节点都可以 ping 通家庭内网的机器了
我们试试用macbook pro 10.8.99.45访问我的小米路由器192.168.199.1

2ELMDS

0%