flink sql 自定义udf实践之标量函数

背景

UDF 实现的几种方式

udf 提供 5 种自定义方式,在实际业务中,我们根据业务需求继承对应的 Function,并实现对应的方法。

  • 标量函数 将标量值转换成一个新标量值;简单理解就是 一对一模式,比如输入一个身份证号码,输出一个加密的字符串,实现(ScalarFunction),重写 eval() 方法。

  • 表值函数 将标量值转换成新的行数据;简单理解就是一对多模式,比如一个字段存储为 逗号隔开的字符串,可通过(TableFunction)进行 split,输出多条数据。

  • 聚合函数 将多行数据里的标量值转换成一个新标量值; 多对一模式,通过聚合操作把多行输出为一个值。(AggregateFunction)

  • 表聚合函数将多行的标量值映射到新行。

  • 异步表函数是用于执行查找的表源的特殊函数。

以下主要介绍标量(Scalar)UDF的实现

实践:截取字符串

实现自定义的 UDF 功能主要分为以下几个步骤:

1、自定义 UDF 函数的实现逻辑,打包成 jar 包
2、上传 jar 包 到 flink 工程的 lib 目录下
3、使用 Flink 中的 SQL API 直接进行使用

1、自定义 UDF 函数

  • 创建 maven 工程 ,工程需要引用以下pom 依赖

    1
    2
    3
    4
    5
    6
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
  • 定义一个对字符串进行截取的函数,比如 5个字符截取1到 3位的字符

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    package org.example;
    import org.apache.flink.table.functions.ScalarFunction;
    /**
    * classname SubstringFunction
    * description 标量函数
    */
    public class SubstringFunction extends ScalarFunction {
    public String eval(String s, Integer begin, Integer end) {
    return s.substring(begin, end);
    }
    }

  • 使用 maven 进行打包

    1
    mvn clean packagen

我是使用streamPark,直接上传jar包即可

tbrmMR

3、使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE TABLE products(
  id INT,
  name STRING,
  description STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://127.0.0.1:3306/demo',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'products'
);
//创建 function 先需要声明函数
CREATE FUNCTION SubstringFunction as 'org.example.SubstringFunction';
//使用
SELECT id,name,SubstringFunction(name,1,3) AS substr from products;

相关链接

示例