flink sql 自定义udf实践之标量函数
背景
UDF 实现的几种方式
udf 提供 5 种自定义方式,在实际业务中,我们根据业务需求继承对应的 Function,并实现对应的方法。
标量函数 将标量值转换成一个新标量值;简单理解就是 一对一模式,比如输入一个身份证号码,输出一个加密的字符串,实现(ScalarFunction),重写 eval() 方法。
表值函数 将标量值转换成新的行数据;简单理解就是一对多模式,比如一个字段存储为 逗号隔开的字符串,可通过(TableFunction)进行 split,输出多条数据。
聚合函数 将多行数据里的标量值转换成一个新标量值; 多对一模式,通过聚合操作把多行输出为一个值。(AggregateFunction)
表聚合函数将多行的标量值映射到新行。
异步表函数是用于执行查找的表源的特殊函数。
以下主要介绍标量(Scalar)UDF的实现
实践:截取字符串
实现自定义的 UDF 功能主要分为以下几个步骤:
1、自定义 UDF 函数的实现逻辑,打包成 jar 包
2、上传 jar 包 到 flink 工程的 lib 目录下
3、使用 Flink 中的 SQL API 直接进行使用
1、自定义 UDF 函数
创建 maven 工程 ,工程需要引用以下pom 依赖
1
2
3
4
5
6<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>定义一个对字符串进行截取的函数,比如 5个字符截取1到 3位的字符
1
2
3
4
5
6
7
8
9
10
11
12package org.example;
import org.apache.flink.table.functions.ScalarFunction;
/**
* classname SubstringFunction
* description 标量函数
*/
public class SubstringFunction extends ScalarFunction {
public String eval(String s, Integer begin, Integer end) {
return s.substring(begin, end);
}
}使用 maven 进行打包
1
mvn clean packagen
2、上传 jar 包 到 flink 工程的 lib 目录下,需要重启flink 集群
我是使用streamPark,直接上传jar包即可
3、使用
1 | CREATE TABLE products( |