利用pageAccessSession_tab测试数据,我们按地域统计连续的两个访问用户之间的访问时间间隔不超过3分钟的的页面访问量(PV).
- SELECT
- region,
- SESSION_START(rowtime, INTERVAL '3' MINUTE) AS winStart,
- SESSION_END(rowtime, INTERVAL '3' MINUTE) AS winEnd,
- COUNT(region) AS pv
- FROM pageAccessSession_tab
- GROUP BY region, SESSION(rowtime, INTERVAL '3' MINUTE)
d. Result

十二、UDX
Apache Flink 除了提供了大部分ANSI-SQL的核心算子,也为用户提供了自己编写业务代码的机会,那就是User-Defined Function,目前支持如下三种 User-Defined Function:
- UDF - User-Defined Scalar Function
- UDTF - User-Defined Table Function
- UDAF - User-Defined Aggregate Funciton
UDX都是用户自定义的函数,那么Apache Flink框架为啥将自定义的函数分成三类呢?是根据什么划分的呢?Apache Flink对自定义函数进行分类的依据是根据函数语义的不同,函数的输入和输出不同来分类的,具体如下:

1. UDF
a. 定义
用户想自己编写一个字符串联接的UDF,我们只需要实现ScalarFunction#eval()方法即可,简单实现如下:
- object MyConnect extends ScalarFunction {
- @varargs
- def eval(args: String*): String = {
- val sb = new StringBuilder
- var i = 0
- while (i < args.length) {
- if (args(i) == null) {
- return null
- }
- sb.append(args(i))
- i += 1
- }
- sb.toString
- }}
b. 使用
- ...
- val fun = MyConnect
- tEnv.registerFunction("myConnect", fun)
- val sql = "SELECT myConnect(a, b) as str FROM tab"
- ...
2. UDTF
a. 定义
用户想自己编写一个字符串切分的UDTF,我们只需要实现TableFunction#eval()方法即可,简单实现如下:
(编辑:开发网_开封站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|