太湖手机网站建设,兰州网站排名外包,网站开发招标文件范本,洪山网页设计1. PTF 是什么#xff1a;UDF 的“超集”
Process Table Functions#xff08;PTFs#xff09;是 Flink SQL Table API 中最强的函数类型#xff0c;可以实现接近内置算子的能力#xff1a;
输入#xff1a;零/一/多张表#xff08;也可混合 scalar 参数#xff…1. PTF 是什么UDF 的“超集”Process Table FunctionsPTFs是 Flink SQL Table API 中最强的函数类型可以实现接近内置算子的能力输入零/一/多张表也可混合 scalar 参数输出零/一/多行任意 Row 或结构化类型能力Flink 托管状态managed state、事件时间event time、Timer、底层 changelogCDC一句话PTF 让你用“函数”写一个可状态化、可计时、可处理更新的表算子。2. PTF 与 SQL:2016 PTF 的关系文档里提到 SQL:2016 的 Polymorphic Table Functions同样简称 PTF。Flink 的 Process Table Functions 在语义上对齐 SQL 标准的一些调用特征表参数、row/set 语义、descriptor 参数等但同时增强了 Flink 的流式能力状态管理Flink state backend时间与 watermarkTimer 服务运行时 Changelog 能力你可以理解为Flink 在 SQL 标准 PTF 上叠加了流式计算“必须的三件套”state、time、changelog。3. PTF 最核心的概念Row 语义 vs Set 语义PTF 的 eval() 不是“只接受一行”它可以接受一个“表参数”并声明该表如何被理解3.1 Row Semantics行语义认为每行彼此独立系统可自由分发每个虚拟处理器一次只看到当前行通常无状态或者不依赖历史示例给每个 name 加个 greeting逐行处理publicstaticclassGreetingextendsProcessTableFunctionString{publicvoideval(ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE)Rowinput){collect(Hello input.getFieldAs(name)!);}}3.2 Set Semantics集合语义认为行之间有关联需要按 key 聚合成一个“集合”调用时必须或可选指定 PARTITION BY允许状态同一个 key 下的历史行可通过 state 记忆示例同一个 name 来过几次publicstaticclassGreetingWithMemoryextendsProcessTableFunctionString{publicstaticclassCountState{publiclongcounter0L;}publicvoideval(StateHintCountStatestate,ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE)Rowinput){state.counter;collect(Hello input.getFieldAs(name), your state.counter time?);}}调用Table APIenv.fromValues(Bob,Alice,Bob).as(name).partitionBy($(name)).process(GreetingWithMemory.class).execute().print();4. Virtual Processor为什么 PTF 既能扩展又能有状态PTF 会把输入表分布到所谓“虚拟处理器virtual processor”上执行。你可以理解为一个 virtual processor 对应一个 key 的处理上下文或者 row 语义下随机分发。Row 语义processor 只看到当前 rowSet 语义processor 被 PARTITION BY key “圈定”同 key 的数据共定位state/timer 也都在这个 key 上生效这就是 PTF 既能 scale-out又能做到 per-key 状态机的根本原因。5. 调用语法隐式参数 on_time 与 uidPTF 调用时除了你定义的参数系统还会“隐式补两类参数”on_time用于事件时间语义DESCRIPTORuid用于 stateful query evolution保证 savepoint 恢复、fan-out 优化等推荐name-based调用方式后续演进更稳SQLSELECT*FROMTableFilter(inputTABLEt,threshold100,uidmy-ptf);Table APIenv.from(t).process(TableFilter.class,lit(100).asArgument(threshold),lit(my-ptf).asArgument(uid));6. 实现规则eval() 方法签名是“铁律”PTF 只支持一个 eval()不支持重载签名模式eval( context? , state entry* , call argument* )Context可选必须是第一个State entries 必须在用户参数之前eval 必须 public不能 static7. StatePTF 的灵魂含 TTL / 大状态7.1 基本 stateValue State通过StateHint声明一个可变参数作为 stateclassCountingFunctionextendsProcessTableFunctionString{publicstaticclassCountState{publiclongcount0L;}publicvoideval(StateHintCountStatememory,ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE)Rowinput){memory.count;collect(Seen rows: memory.count);}}7.2 State TTL建议默认就设计publicvoideval(Contextctx,StateHint(ttl1 day)SeenStatememory,ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE)Rowinput){...}TTL 基于 processing time能有效避免“开 keyspace”导致 state 无限增长。7.3 大状态ListView / MapView避免整块反序列化ListView列表 stateMapViewmap state按 key 读取更省classLargeHistoryFunctionextendsProcessTableFunctionString{publicvoideval(StateHintMapViewString,IntegerlargeMemory,ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE)Rowinput){StringeventIdinput.getFieldAs(eventId);IntegercountlargeMemory.get(eventId);largeMemory.put(eventId,countnull?1:count1);}}8. Time Timers让 PTF 变成“事件时间状态机”8.1 on_time 与 rowtime 输出声明 on_time 后PTF 输出会自动带一个 rowtime 列用于下游继续做时间计算。SQLSELECT*FROMPingLaterFunction(inputTABLEEventsPARTITIONBYid,on_timeDESCRIPTOR(ts));8.2 定时器使用模式eval 注册onTimer 响应典型例子最后一次事件后 1 分钟发 pingpublicstaticclassPingLaterFunctionextendsProcessTableFunctionString{publicvoideval(Contextctx,ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE,ArgumentTrait.REQUIRE_ON_TIME})Rowinput){TimeContextInstanttimeCtxctx.timeContext(Instant.class);timeCtx.registerOnTime(ping,timeCtx.time().plus(Duration.ofMinutes(1)));}publicvoidonTimer(OnTimerContextonTimerCtx){collect(ping);}}设计建议Timer 也会占 state尽量减少 timer 数量及时 clearAllTimers/clearAllState。9. 多表输入PTF 可以做“自定义 Join”PTF 可以同时接收多张表都必须 set semantics且 PARTITION BY 结构一致。一次 eval 只会有一个表参数非空通过 null 判断来源。示例访问表 Visits 购买表 Purchases按用户关联记住 last purchasepublicstaticclassGreetingWithLastPurchaseextendsProcessTableFunctionString{publicstaticclassLastItemState{publicStringlastItem;}publicvoideval(StateHintLastItemStatestate,ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE)Rowvisit,ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE)Rowpurchase){if(purchase!null){state.lastItempurchase.getFieldAs(item);}elseif(visit!null){if(state.lastItemnull){collect(Hello visit.getFieldAs(name), let me know if I can help!);}else{collect(Hello visit.getFieldAs(name), here to buy state.lastItem again?);}}}}注意多输入的到达顺序可能导致非确定性要么用 watermark 做“时间驱动”要么用条件缓冲来保证逻辑严谨。10. UIDPTF 独有的“状态化查询演进”能力PTF 是可持久化状态块周围 SQL 变了也可能恢复只要 state schema 不变。为此Flink 要求 set semantics 的 PTF 有唯一 UID未指定 uid默认用函数名同一个 statement 中只能出现一次多次调用必须手动指定 uid确保全局唯一同 uid优化器可做 fan-out共享一个 stateful PTF这对“一个状态机输出分流到多个 sink”非常重要。11. Changelog更新/撤回支持PTF 可以玩 CDC默认 PTF 假设输入是 append-onlyI输出也是 append-only这对 watermark 与时间语义最友好。若要接更新表必须声明SUPPORTS_UPDATES允许更新进入REQUIRE_UPDATE_BEFORE强制 retract 模式-U/UREQUIRE_FULL_DELETE强制 full delete-D 全字段示例把更新表转成 append-only把 RowKind 写进 payload输出始终 IDataTypeHint(ROWflag STRING, sum INT)publicstaticclassToChangelogFunctionextendsProcessTableFunctionRow{publicvoideval(ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE,ArgumentTrait.SUPPORTS_UPDATES})Rowinput){collect(Row.of(input.getKind().toString(),input.getField(sum)));}}更高级实现ChangelogFunction自己声明输出模式retract / upsert / delete 规则。但要非常谨慎输出 changelog 声明错了会导致整条 pipeline 行为未定义。12. 高级案例购物车状态机最典型 PTF购物车本质就是 per-user 状态机ADD/REMOVE/CHECKOUT REMINDER/TIMEOUT。PTF 用 state 存 cart用 timer 做 reminder/timeoutCHECKOUT 后 clear state——这就是 PTF 的“正确打开方式”。这类场景用传统 SQL UDF 很难优雅实现但 PTF 非常顺。13. 当前限制务必注意文档明确提到一些限制你贴的结尾也有PTF 不能跑 batch mode部分能力在早期阶段例如 broadcast state 等文档后面还会列更多限制如果 PTF 接 updates很多功能会受限例如 on_time 不支持等文档中也强调了建议PTF 目前适合“流式、事件驱动、状态机类”问题。14. 什么时候该用 PTF用一句很实际的话总结你只是做字段变换 → ScalarFunction一行拆多行 / 维表 lookup → TableFunction / AsyncTableFunction多行聚一值 → AggregateFunctionUDAGG多行聚多行 → TableAggregateFunctionUDTAGG你要状态机 timer 复杂 state 多表协同 处理更新 →PTF