SwiftUI 中的 Stacks 类似于 UIKit 中的 stack views。通过组合水平和垂直的方式排列视图构建更复杂的应用界面。Stacks 有 3 种类型:HStack、VStack 和 ZStack。

VStack

VFyRLT

可以通过 VStack 从上到下垂直堆叠视图,同时我们可以进一步添加alignment(对齐方式)或间距(spacing)来进一步自定义视图。

1
2
3
4
5
6
VStack(alignment: .leading, spacing: 16) {
Text("Hello, world!")
.font(.title)
Spacer()
Text("Second line")
}

HStack

CE5YPH
HStack 用于水平堆叠视图。就像 VStack 一样,您可以设置对齐方式和间距进一步自定义视图

1
2
3
4
5
6
HStack(alignment: .bottom, spacing: 16) {
Text("Hello, world!")
.font(.title)
Spacer()
Text("Second line")
}

ZStack

zPp77c
ZStack 类似设计软件的层概念,元素都是在一个视图上进行堆叠的,类似在三维立体空间堆叠视图,由于元素可以相互浮动,因此 ZStack 的对齐方式会将所有项目移到一个位置。

1
2
3
4
5
6
7
8
9
10
ZStack(alignment: .topLeading) {
Rectangle()
.foregroundColor(.blue)
Text("Hello, world!")
.font(.title)
Spacer()
Text("Second line")
}
.padding()
.frame(width: 320)

在前面的文章我已用过@State属性,使用@State修饰某个属性后,SwiftUI将会把该属性存储到一个特殊的内存区域内,并且这个区域和View struct是隔离的;
当@State修饰的属性的值发生变化后,SwiftUI会根据该属性重新绘制视图;

在开发中,我们需要把一个View的属性,传递到一个子View中;
Swift中,值传递的形式是值传递,也就是说,传个子View的是值的拷贝;子视图对这个值进行了修改后,不会影响父视图;
使用@Binding修饰后,属性就变成了一个引用类型,这样子视图对值进行了修改后,父视图中的值也会发生变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
struct customButton:View {
@Binding var tapCount:Int
var body: some View {
Button {
self.tapCount += 1

} label: {
Text("Hello World 第\(tapCount)次点击")
} //渲染颜色
.tint(.purple)
// 按钮样式 .bordered、.borderless 和 .plain
.buttonStyle(.bordered)
//按钮边框样式
.buttonBorderShape(.capsule)
//按钮预设大小
.controlSize(.large)
}
}

struct ContentView: View {
@State private var tapCount = 0
var body: some View {

VStack{
customButton(tapCount: $tapCount)
}
}
}

我们看到我们将tapCount 作为参数传入customButton中,当customButton这个subview内进行点击修改的时候,我们的parentView也会随着变化。

在我之前@State研究中我们探讨过@State,通过它,我们可以方便的将值类型数据作为View的Source of truth(单一数据源)。在SwiftUI 1.0时代,如果想将引用类型作为source of truth,通常的方法是使用@EnvironmentObject或者 @ObservedObject。

@StateObject 是在 SwiftUI 2.0 中才添加的属性包装器,它的出现解决了在某些情况下使用 @ObservedObject 视图会出现超预期的问题

示例:
user类

1
2
3
class User: ObservableObject {
var username = "@twostraws"
}

如果要在各种视图中使用它,则需要在 SwiftUI 外部创建它并将其注入,或者在其中一个 SwiftUI 视图中创建它并使用 @StateObject,如下所示:

1
2
3
4
5
6
7
struct ContentView: View {
@StateObject var user = User()

var body: some View {
Text("Username: \(user.username)")
}
}

这将确保视图更新时不会破坏 User 实例

以前,可能曾经使用 @ObservedObject 来获得相同的结果,但这很危险–有时且仅在某些情况下,@ObservedObject 可能会意外释放其存储的对象,因为它并不是被设计为最终的真理来源。 目的。 @StateObject 不会发生这种情况,因此应该改用它。

推荐阅读

@StateObject 和 @ObservedObject 的区别和使用

安裝环境

1
2
3
4
5
6
7
8
9
brew install cmake protobuf rust git wget

# 使用 brew 安裝
brew install python@3.10

# 使用 pyenv 安裝
pyenv install 3.10.6
# 啟動 3.10.6
pyenv local 3.10.6

初始化 Stable Diffusion WebUI

1
2
3
git clone https://github.com/AUTOMATIC1111/stable-diffusion-webui
cd stable-diffusion-webui
./webui.sh

Stable Diffusion之模型篇

背景

4AxA4q

如上图,是我们各个业务系统进行SSO登录的流程,我们之前已经通过OIDC搭建之Ory Hydra 2.0实践,进行了登录流程的构建,那么本次我们进行一下登出流程的实践

首先我们的登出需求分为以下几种:

  • 1、所有应用统一登出: 账号在 pc1:admin、wms pc2:admin、pms 全部退出

  • 2、单应用统一登出: 账号在 pc1:admin pc2:admin 退出

  • 3、单token登出: 账号在 pc1:admin 退出,pc2:admin 不退出 (仅支持业务系统实现)

最终取决于 this.hydraAdminService.revokeOAuth2ConsentSessions的参数,具体实现如下

实现方式

业务系统 实现

在各个应用端添加退出接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 手工logout
async logout(user) {
const access_token = user.access_token
const subject = user.mongo_id
const client_id = user.app.client_id

// 清除 login session
const { data: revokeOAuth2LoginSessions } = await this.hydraAdminService.revokeOAuth2LoginSessions({ subject: subject })
console.log("revokeOAuth2LoginSessions", revokeOAuth2LoginSessions)

// 所有应用或单应用登出,清除 consent session 注销所有token
// 通过client单应用登录后通过all全应用退出。
const { data: revokeOAuth2ConsentSessions } = await this.hydraAdminService.revokeOAuth2ConsentSessions({ subject: subject, client: client_id, all: false, })
console.log("revokeOAuth2ConsentSessions", revokeOAuth2ConsentSessions)

// 单token登出:只注销当前token
// const headers = { 'Authorization': `Basic ${Buffer.from(`${this.config.get('app').client_id}:${this.config.get('app').client_secret}`).toString('base64')}` }
// console.log("headers", headers)
// const revokeOAuth2Token = await this.hydraPublicService.revokeOAuth2Token({ token: access_token }, { headers: headers })
// console.log("revokeOAuth2Token", revokeOAuth2Token)
// return
}

统一授权认证中心 实现

86IxXC

  • 要求
    登出链接:http://oauth2.dev.dakewe.com/oauth2/sessions/logout?id_token_hint=XXX

可选参数,id_token_hint其实是用来找到你对应的应用。建议都传
可选参数post_logout_redirect_uris,如需要退出后,返回业务系统。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
async getLogout(dto: GetLogoutDto) {
const logout_challenge = dto.logout_challenge
try {
const { data: getLogoutRequest } =
await this.hydraAdminService.getOAuth2LogoutRequest({ logoutChallenge: logout_challenge })
console.log('getLogoutRequest', getLogoutRequest)
return getLogoutRequest
} catch (error) {
if (error.isAxiosError) {
console.log(error.response)
throw new HttpException(`${error.response.data.error},${error.response.data.error_description}`, error.response.status);
} else {
throw new HttpException(error, 500);
}
}
}

async acceptLogout(acceptLogout: AcceptLogoutDto) {
const { logout_challenge, is_all = false } = acceptLogout
console.log("接受退出流程")
let getOAuth2LogoutRequest
try {
const { data: data } =
await this.hydraAdminService.getOAuth2LogoutRequest({ logoutChallenge: logout_challenge })
getOAuth2LogoutRequest = data
} catch (error) {
if (error.isAxiosError) {
console.log(error.response)
throw new HttpException(`${error.response.data.error},${error.response.data.error_description}`, error.response.status);
} else {
throw new HttpException(error, 500);
}
}
console.log('getOAuth2LogoutRequest', getOAuth2LogoutRequest)
try {
const subject = getOAuth2LogoutRequest.subject
const client = getOAuth2LogoutRequest.client
console.log('-------client_id', client?.client_id)
console.log('-------subject', subject)
// 清楚 logout session
const { data: acceptLogoutRequest } =
await this.hydraAdminService.acceptOAuth2LogoutRequest({ logoutChallenge: logout_challenge })
console.log('acceptLogoutRequest', acceptLogoutRequest)
// 清除 consent session 注销所有token
const { data: revokeOAuth2ConsentSessions } = await this.hydraAdminService.revokeOAuth2ConsentSessions({ subject: subject, client: client && is_all === false ? client.client_id : undefined, all: is_all, })
console.log("revokeOAuth2ConsentSessions", revokeOAuth2ConsentSessions)

return acceptLogoutRequest.redirect_to
} catch (error) {
if (error.isAxiosError) {
console.log(error.response)
throw new HttpException(`${error.response.data.error},${error.response.data.error_description}`, error.response.status);
} else {
throw new HttpException(error, 500);
}
}
}

相关链接

Implementing the OIDC logout endpoint & UI
Logout logic diagram

看着15年的swift笔记,想当初自己也是一名swifter呢,今天正式回归到swift开发,完成一个桌面端的AI产品。当前swift已经使用swiftUI了,那么就从swiftUI重新开始吧

SwiftUI 提供了许多属性包装器,可用于更新/观察数据和重新加载视图。这些属性包装器为视图和可变数据之间的交互提供了多种方式。为了在 SwiftUI 中构建一个很棒的应用程序,对这些包装器有一个清晰的理解非常重要。

@State 基础

@State 是一个简单的属性,应该只用于整数、字符串、布尔值等原始类型。@State 包装器是更新 视图变量状态的最简单方法。如果我们在带有 @State 属性包装器的视图中创建一个属性,SwiftUI 会以不同的方式管理该属性的内存。只要视图存在,它就会将变量的值保留在内存中。每当状态发生更改时,SwiftUI 会自动使用更新的信息重新加载视图。

示例

确保使用@State 的对象属于单个视图,并且它们被标记为私有。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import SwiftUI
struct ContentView: View {
@State private var num: Int = 0
@State private var isIncreasing: Bool = false
var body: some View {
HStack {
Button(action: {
num -= 1
isIncreasing = false
}, label: {
Text("-")
})
.frame(width: 10, height: 10)
Text(isIncreasing ? "增加 to \(num)" : "减小 to \(num)")
.foregroundColor(.black)
Button(action: {
num += 1
isIncreasing = true
}, label: {
Text("+")
})
.frame(width: 12, height: 10)
}
}
}

在上面的例子中,有两个按钮’ + ‘和’-‘,分别增加和减少’ num ‘变量的值。状态变量 (num) 存储其先前的值并根据其状态更新视图。

简单理解就是 属性状态发生变化是,会自动更新UI

在有状态的流处理中,当开发人员启用了 Flink 中的 checkpoint 机制,那么状态将会持久化以防止数据的丢失并确保发生故障时能够完全恢复。选择何种状态后端,将决定状态持久化的方式和位置。

Flink 提供了三种可用的状态后端:MemoryStateBackendFsStateBackend,和RocksDBStateBackend

IAXXfM

MemoryStateBackend

MemoryStateBackend 是将状态维护在 Java 堆上的一个内部状态后端。键值状态和窗口算子使用哈希表来存储数据(values)和定时器(timers)。当应用程序 checkpoint 时,此后端会在将状态发给 JobManager 之前快照下状态,JobManager 也将状态存储在 Java 堆上。默认情况下,MemoryStateBackend 配置成支持异步快照。异步快照可以避免阻塞数据流的处理,从而避免反压的发生。

使用 MemoryStateBackend 时的注意点:

  • 默认情况下,每一个状态的大小限制为 5 MB。可以通过 MemoryStateBackend 的构造函数增加这个大小。

  • 状态大小受到 akka 帧大小的限制,所以无论怎么调整状态大小配置,都不能大于 akka 的帧大小。也可以通过 akka.framesize 调整 akka 帧大小(通过配置文档了解更多)。

  • 状态的总大小不能超过 JobManager 的内存。
    何时使用 MemoryStateBackend:

  • 本地开发或调试时建议使用 MemoryStateBackend,因为这种场景的状态大小的是有限的。

  • MemoryStateBackend 最适合小状态的应用场景。例如 Kafka consumer,或者一次仅一记录的函数 (Map, FlatMap,或 Filter)。

FsStateBackend

FsStateBackend 需要配置的主要是文件系统,如 URL(类型,地址,路径)。举个例子,比如可以是:

  • “hdfs://namenode:40010/flink/checkpoints” 或
  • “s3://flink/checkpoints”
    当选择使用 FsStateBackend 时,正在进行的数据会被存在 TaskManager 的内存中。在 checkpoint 时,此后端会将状态快照写入配置的文件系统和目录的文件中,同时会在 JobManager 的内存中(在高可用场景下会存在 Zookeeper 中)存储极少的元数据。

默认情况下,FsStateBackend 配置成提供异步快照,以避免在状态 checkpoint 时阻塞数据流的处理。该特性可以实例化 FsStateBackend 时传入 false 的布尔标志来禁用掉,例如:

1
new FsStateBackend(path, false);

使用 FsStateBackend 时的注意点:

  • 当前的状态仍然会先存在 TaskManager 中,所以状态的大小不能超过 TaskManager 的内存。
    何时使用 FsStateBackend:

  • FsStateBackend 适用于处理大状态,长窗口,或大键值状态的有状态处理任务。

  • FsStateBackend 非常适合用于高可用方案。

RocksDBStateBackend

RocksDBStateBackend 的配置也需要一个文件系统(类型,地址,路径),如下所示:

  • “hdfs://namenode:40010/flink/checkpoints” 或
  • “s3://flink/checkpoints”
    RocksDB 是一种嵌入式的本地数据库。RocksDBStateBackend 将处理中的数据使用 RocksDB 存储在本地磁盘上。在 checkpoint 时,整个 RocksDB 数据库会被存储到配置的文件系统中,或者在超大状态作业时可以将增量的数据存储到配置的文件系统中。同时 Flink 会将极少的元数据存储在 JobManager 的内存中,或者在 Zookeeper 中(对于高可用的情况)。RocksDB 默认也是配置成异步快照的模式。

使用 RocksDBStateBackend 时的注意点:

  • RocksDB 支持的单 key 和单 value 的大小最大为每个 2^31 字节。这是因为 RocksDB 的 JNI API 是基于 byte[] 的。

  • 我们需要强调的是,对于使用具有合并操作的状态的应用程序,例如 ListState,随着时间可能会累积到超过 2^31 字节大小,这将会导致在接下来的查询中失败。
    何时使用 RocksDBStateBackend:

  • RocksDBStateBackend 最适合用于处理大状态,长窗口,或大键值状态的有状态处理任务。

  • RocksDBStateBackend 非常适合用于高可用方案。

  • RocksDBStateBackend 是目前唯一支持增量 checkpoint 的后端。增量 checkpoint 非常使用于超大状态的场景。
    当使用 RocksDB 时,状态大小只受限于磁盘可用空间的大小。这也使得 RocksDBStateBackend 成为管理超大状态的最佳选择。使用 RocksDB 的权衡点在于所有的状态相关的操作都需要序列化(或反序列化)才能跨越 JNI 边界。与上面提到的堆上后端相比,这可能会影响应用程序的吞吐量。

不同状态后端满足不同场景的需求,在开始开发应用程序之前应该仔细考虑和规划后选择。这可确保选择了正确的状态后端以最好地满足应用程序和业务需求。

背景

UDF 实现的几种方式

udf 提供 5 种自定义方式,在实际业务中,我们根据业务需求继承对应的 Function,并实现对应的方法。

  • 标量函数 将标量值转换成一个新标量值;简单理解就是 一对一模式,比如输入一个身份证号码,输出一个加密的字符串,实现(ScalarFunction),重写 eval() 方法。

  • 表值函数 将标量值转换成新的行数据;简单理解就是一对多模式,比如一个字段存储为 逗号隔开的字符串,可通过(TableFunction)进行 split,输出多条数据。

  • 聚合函数 将多行数据里的标量值转换成一个新标量值; 多对一模式,通过聚合操作把多行输出为一个值。(AggregateFunction)

  • 表聚合函数将多行的标量值映射到新行。

  • 异步表函数是用于执行查找的表源的特殊函数。

以下主要介绍标量(Scalar)UDF的实现

实践:截取字符串

实现自定义的 UDF 功能主要分为以下几个步骤:

1、自定义 UDF 函数的实现逻辑,打包成 jar 包
2、上传 jar 包 到 flink 工程的 lib 目录下
3、使用 Flink 中的 SQL API 直接进行使用

1、自定义 UDF 函数

  • 创建 maven 工程 ,工程需要引用以下pom 依赖

    1
    2
    3
    4
    5
    6
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
  • 定义一个对字符串进行截取的函数,比如 5个字符截取1到 3位的字符

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    package org.example;
    import org.apache.flink.table.functions.ScalarFunction;
    /**
    * classname SubstringFunction
    * description 标量函数
    */
    public class SubstringFunction extends ScalarFunction {
    public String eval(String s, Integer begin, Integer end) {
    return s.substring(begin, end);
    }
    }

  • 使用 maven 进行打包

    1
    mvn clean packagen

我是使用streamPark,直接上传jar包即可

tbrmMR

3、使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE TABLE products(
  id INT,
  name STRING,
  description STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://127.0.0.1:3306/demo',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'products'
);
//创建 function 先需要声明函数
CREATE FUNCTION SubstringFunction as 'org.example.SubstringFunction';
//使用
SELECT id,name,SubstringFunction(name,1,3) AS substr from products;

相关链接

示例

上次我们试了下标量函数flink sql 自定义udf实践之标量函数
这次我们来试一下表值函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package org.example;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/**
* classname SplitFunction
* description 表值函数
*/
@FunctionHint(output = @DataTypeHint("Row<word String, length INT>"))
public class SplitFunction extends TableFunction<Row> {
public void eval(String str) {
collect(Row.of(str, str.length()));
}
}

背景

方案待验证

业务上需要对一张含有商品ID的表进行打宽,把商品更多属性打宽到es供上下游es查询,遇到如下问题

  • 1、商品表将作为1张维表,而这张维表数据量达到了200W+,对储存、计算(内存)存在压力

  • 2、商品表在mongodb,而生态上没有source的connecter能支持到lookup join

  • 3、首次发现mongodb Temporal join 仅支持主键_id

以前基本都是mysql cdc,没太关注mongo,但这几个问题的出现,让我首次关注到mongodb在flink生态的支持程度。那么我们就换一个思路,引入flink table store湖仓,来解决ODS到DWD再到ADS这些问题:

  • 1、使得商品表作为一张可复用的维表
  • 2、解决商品表能lookup join,且不仅仅支持主键join,还要能支持非主键join
  • 3、降低对储存、计算(内存)存在压力

DjJ3MP

模拟场景:假设我的订单明细表(order_item)有product_id,一般产品的更多产品参数信息是不会都存到订单上,我需要将我的订单明细表通过产品表(product)这个维表进行打宽,然后写到es,供上下游进行根据商品信息搜索订单,或者进行聚合统计商品订购top10.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
-- 创建并使用 FTS Catalog
CREATE CATALOG `product_catalog` WITH (
'type' = 'table-store',
'warehouse' = '/tmp/table-store-101'
);

USE CATALOG `product_catalog`;


-- ODS table schema
-- 注意在 FTS Catalog 下,创建使用其它连接器的表时,需要将表声明为临时表
-- 产品源表ods
CREATE TEMPORARY TABLE ods_product (
_id STRING,
created TIMESTAMP_LTZ(3),
mfrId STRING,
mfrName STRING,
name STRING,
ras STRING,
sn STRING,
spec STRING,
status BOOLEAN,
taxrate INT,
unit STRING,
updated TIMESTAMP_LTZ(3),
price DECIMAL(10, 5),
taxcode STRING,
clone STRING,
lastOrderAt TIMESTAMP_LTZ(3),
manual STRING,
pn STRING,
cumulativeSales INT,
isDeprecated BOOLEAN,
ship STRING,
storage STRING,
isPublic BOOLEAN,
invtCode STRING,
PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017',
'username' = 'XXX',
'password' = 'XXX',
'database' = 'biocitydb',
'collection' = 'product'
);


-- DWD table schema
-- Create a table in table-store catalog
-- 产品入湖表dwd
CREATE TABLE `dwd_product` (
_id STRING,
created TIMESTAMP_LTZ(3),
mfrId STRING,
mfrName STRING,
name STRING,
ras STRING,
sn STRING,
spec STRING,
status BOOLEAN,
taxrate INT,
unit STRING,
updated TIMESTAMP_LTZ(3),
price DECIMAL(10, 5),
taxcode STRING,
clone STRING,
lastOrderAt TIMESTAMP_LTZ(3),
manual STRING,
pn STRING,
cumulativeSales INT,
isDeprecated BOOLEAN,
ship STRING,
storage STRING,
isPublic BOOLEAN,
invtCode STRING,
PRIMARY KEY (_id) NOT ENFORCED
)


-- ods to dwd
-- 源表入湖
INSERT INTO
dwd_product
select
_id,
created,
mfrId,
mfrName,
name,
ras,
sn,
spec,
status,
taxrate,
unit,
updated,
price,
taxcode,
clone,
lastOrderAt,
manual,
pn,
cumulativeSales,
isDeprecated,
ship,
storage,
isPublic,
invtCode
from
ods_product;

这时候我们在flink table store 创造了一个CDC出来的维表。

任务2:订单明细表ods_order_item 与维表 dwd_product 进行 lookup join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91

USE CATALOG `product_catalog`;

-- DWD table schema
-- 产品入湖表dwd(维表)
CREATE TABLE `dwd_product` (
_id STRING,
created TIMESTAMP_LTZ(3),
mfrId STRING,
mfrName STRING,
name STRING,
ras STRING,
sn STRING,
spec STRING,
status BOOLEAN,
taxrate INT,
unit STRING,
updated TIMESTAMP_LTZ(3),
price DECIMAL(10, 5),
taxcode STRING,
clone STRING,
lastOrderAt TIMESTAMP_LTZ(3),
manual STRING,
pn STRING,
cumulativeSales INT,
isDeprecated BOOLEAN,
ship STRING,
storage STRING,
isPublic BOOLEAN,
invtCode STRING,
PRIMARY KEY (_id) NOT ENFORCED
)

-- ODS table schema
-- 注意在 FTS Catalog 下,创建使用其它连接器的表时,需要将表声明为临时表
-- 订单源表
CREATE TEMPORARY TABLE `ods_order_item` (
order_id INT,
status INT,
price INT,
order_date DATE,
product_id STRING,
proc_time AS PROCTIME(),
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017',
'username' = 'XXX',
'password' = 'XXX',
'database' = 'biocitydb',
'collection' = 'order_item'
);


-- ADS table schema
-- es明细大宽表
CREATE TEMPORARY TABLE ads_es_enrich_order_item (
_id INT,
order_id STRING,
status STRING,
price DECIMAL(15,2),
order_date DATE,
product_id STRING,
mfr_name STRING, -- 打宽产品表
product_name STRING,
ras STRING,
sn STRING,
PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'es_enrich_order_item'
);


-- 打宽
INSERT INTO
ads_es_enrich_order_item
SELECT
o.order_id,
o.status,
o.price,
o.order_date,
o.product_id,
p.mfrName,
p.name,
p.ras,
p.sn
FROM ods_order_item AS o
JOIN dwd_product FOR SYSTEM_TIME AS OF o.proc_time AS p
ON o.product_id = p._id; -- 这里不仅仅支持_id主键lookup join,也支持非主键

相关链接

flink table store Lookup Join
flink-table-store-101
Flink Table Store 0.3 构建流式数仓最佳实践

example for auto-create and connect table store

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

CREATE TEMPORARY TABLE word_count (
word STRING PRIMARY KEY NOT ENFORCED,
cnt BIGINT
) WITH (
'connector'='table-store',
'path'='file:/tmp/word',
'auto-create'='true'
);


CREATE TEMPORARY TABLE word_table (
word STRING
) WITH (
'connector' = 'datagen',
'fields.word.length' = '1'
);

SET 'execution.checkpointing.interval' = '10 s';


INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;


SET 'sql-client.execution.result-mode' = 'tableau';
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM word_count;
0%