网站开发三层电子商务网站建设购物车

张小明 2026/1/11 18:42:22
网站开发三层,电子商务网站建设购物车,黄骅贴吧桃花路,网络营销应该这样做今天就来了解一下 Flink 的状态以及应用#xff0c;首先第一个问题是#xff1a;什么是有状态计算#xff1f;基本概念在数据流处理中#xff0c;大部分操作都是每次只处理一个事件#xff0c;比如对输入的数据进行结构化解析#xff0c;这类操作我们称为无状态计算。而有…今天就来了解一下 Flink 的状态以及应用首先第一个问题是什么是有状态计算基本概念在数据流处理中大部分操作都是每次只处理一个事件比如对输入的数据进行结构化解析这类操作我们称为无状态计算。而有些操作则需要记住多个事件并进行处理比如前面我们在窗口中对数据做的求和操作这类操作我们称之为有状态计算。在 Flink 中状态的另一个重要作用是用来做故障恢复故障恢复主要依赖于 checkpoint 和 savepoint。当我们使用状态时通常需要从 State Backend 读取。通过介绍有状态计算的基本概念我们又引出了 checkpoint、State Backend 等概念下面我们再来一一解释。状态分类Flink 状态分类可以参考下图状态分类首先是分为 Raw State 和 Managed State 两大类我们分别从管理方式、数据类型、适用场景这三个方面来看它们的区别Raw State Managed State管理方式 开发者自行管理需要手动序列化和反序列化 由 Flink Runtime 管理自动存储和恢复数据数据类型 仅支持 byte 数组 支持 value, list, map适用场景 需要自定义 Operator 支持大部分计算场景Managed State 又分为 Keyed State 和 Operator State 两类下面我们详细介绍这两类状态。Keyed StateKeyed State 只能用在 KeyedStream 上也就是在使用前要先对数据流进行 keyBy 操作。Keyed State 支持以下几种状态类型ValueState保存一个值可以通过 update() 方法更新通过 value() 方法获取保存的值。ListState保存一个 list可以通过 add() 或 addAll() 方法向 list 中添加元素也可以通过 update() 直接覆盖。使用 get() 方法获取整个列表。ReducingState保存一个值表示添加到状态所有值的聚合使用 add() 方法添加元素使用 get() 方法获取保存的值。AggregatingStateIN, OUT保存一个值与 ReducingState 不同的是输入和输出的元素类型可以不同。MapStateUK, UV保存一个 map可以使用 put() 或 putAll() 添加键值对使用 get() 获取值。在知道了各个类型的 Keyed State 怎么用之后我们再来看如何创建一个 Keyed State。以 ValueState 为例。ValueStateDescriptorTuple2Long, Long descriptor new ValueStateDescriptor(average,TypeInformation.of(new TypeHintTuple2Long, Long() {}));ValueStateTuple2Long, Long sum getRuntimeContext().getState(descriptor);要想创建一个 State必须先创建一个 StateDescriptor然后通过 RuntimeContext 来获取 State。每个 State 都对应一种 StateDescriptor。ValueStateT getState(ValueStateDescriptorT)ReducingStateT getReducingState(ReducingStateDescriptorT)ListStateT getListState(ListStateDescriptorT)AggregatingStateIN, OUT getAggregatingState(AggregatingStateDescriptorIN, ACC, OUT)MapStateUK, UV getMapState(MapStateDescriptorUK, UV)Operator State算子状态也称为非 keyed 状态是绑定到一个并行算子实例的状态。State 需要支持重新分布。 最典型的是 Kafka Connector 中维护了一个 topic partitions 和 offset 的 map 作为一个算子状态。和 Keyed State 类似想要创建一个 Operator State同样也需要一个 StateDescriptor同时需要实现 CheckpointedFunction它提供了两个方法分别是在 checkpoint 时 调用的 snapshotState() 和 自定义函数初始化时调用的 initializeState()。Talk is cheap, show me your code!我们来看 Flink 官方文档提供的 Demopublic class BufferingSinkimplements SinkFunctionTuple2String, Integer,CheckpointedFunction {private final int threshold;private transient ListStateTuple2String, Integer checkpointedState;private ListTuple2String, Integer bufferedElements;public BufferingSink(int threshold) {this.threshold threshold;this.bufferedElements new ArrayList();}Overridepublic void invoke(Tuple2String, Integer value, Context context) throws Exception {bufferedElements.add(value);if (bufferedElements.size() threshold) {for (Tuple2String, Integer element: bufferedElements) {// send it to the sink}bufferedElements.clear();}}Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.update(bufferedElements);}Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptorTuple2String, Integer descriptor new ListStateDescriptor(buffered-elements,TypeInformation.of(new TypeHintTuple2String, Integer() {}));checkpointedState context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {for (Tuple2String, Integer element : checkpointedState.get()) {bufferedElements.add(element);}}}}在这个例子中我们在 initializeState 方法中使用 getOperatorStateStore().getListState() 创建了一个 ListState然后将数据缓存到这个 list 中当缓存数据大小超过一个阈值时再统一发送到下游。这里还有一个方法值得注意就是 isRestored()它是用来判断当前任务是否是从故障中恢复的如果是我们需要执行故障恢复相关的逻辑。在这个例子中就是把 state 的数据恢复到本地的变量中。Broadcast State了解了如何创建和使用 Operator State 之后我们再来看一种特殊的 Operator State —— Broadcast State。Broadcast State 本身是类似于 Map 类型的格式使用时需要指定 key 和 value 的类型。它的作用是将一条数据流的数据广播到下游算子的各个节点。Broadcast State 的一个比较常见的作用就是大流关联小流。例如我们有一个订单流需要关联商品详情这时可以把商品详情的流作为 broadcast 流进行广播这样在每个 TaskManager 中会有一份商品详情数据订单流就可以直接查询 broadcast 的数据不需要再访问 MySQL 数据库来做查询操作。那么具体要怎么实现呢其实也很简单可以看下面这段代码MapStateDescriptorString, Product productStateDescriptor new MapStateDescriptor(productBroadcastState, String.class, Product.class);BroadcastStreamProduct broadcastProductStream productStream.broadcast(productStateDescriptor);BroadcastConnectedStreamOrder, Product connectedStreams orderStream.connect(broadcastProductStream);拿到 BroadcastConnectedStream 之后我们就可以调用 process 方法进行处理了。完整的代码我放到 GitHub 上了。感兴趣的可以查看。在使用 Broadcast State 的时需要注意目前 RocksDB 不支持保存 Broadcast State因此广播流吞吐量必须要小并且 Flink 任务要预留足够的内存。聊完了 Broadcast State我们再来看看 Operator State 是如何进行重新分布的。正常 Operator State 支持两种重新分布的方式按照不同的方式我们可以划分为 ListState 和 UnionListState。ListState所有的 element 均匀分布到 task 上UnionListState每个 element 都要在所有的 task 上OperatorStateResizeBroadcast State 由于本身就是广播状态因此重新分布后仍然是需要进行广播的。状态有效期最后再来扩展一个知识点就是状态的有效期。在 Flink 中只有 Keyed State 支持有效期。具体使用方法如下。StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Duration.ofSeconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptorString stateDescriptor new ValueStateDescriptor(text state, String.class);stateDescriptor.enableTimeToLive(ttlConfig);这里有三个属性我们分别来解释一下首先第一个是过期时间在调用 newBuilder 时就要传入。第二个是 UpdateType也就是更新策略默认是 OnCreateAndWrite表示在创建和写入时更新也可以设置为 OnReadAndWrite表示在读取和写入时更新。第三个是可见性默认是 NeverReturnExpired即不返回过期数据也可以设置为 ReturnExpiredIfNotCleanedUp表示会返回过期但未被清理的数据。状态数据清理策略也分为两种一种是做全量快照时进行清理创建 ttl 时调用 cleanupFullSnapshot() 方法即可。另一种是增量数据清理在访问或处理状态时状态后端保留一个所有状态的惰性迭代器每次清理时选择已经过期的数据进行清理。设置方法时在创建 ttl 时调用 cleanupIncrementally(10, true) 可以看到它提供两个参数第一个参数是设置每次检查的条数默认是5。第二个参数是是否在处理每条记录时都触发清理默认是 false。总结最后我们来总结一下本文我们主要介绍了 Flink 的状态及应用首先介绍有状态计算的概念。接着重点学习了 Keyed State 和 Operator State。我们通过一个表格来进行总结。Keyed State Operator State使用算子类型 只能被用于 KeyedStream 中的Operator 上 可以被用于所有 Operator状态分配 每个 Key 对应一个状态单个 Operator 中可以包含多个 Key 单个 Operator 对应一个状态创建和访问方式 重写 RichFunction通过访问 RuntimeContext 对象获取 实现 CheckpointedFunction 或 ListCheckpointed 接口横向拓展 状态随着 Key 自动在多个算子 Task 上迁移 有多种重新分配的方式均匀分布。将所有状态合并再分发到每个实例上支持数据类型 ValueState, ListState, ReducingState, AggregatingState, MapState ListState, UnionListState, Broadcast State最后我们又介绍了状态有效期的定义和使用方法。有了状态之后Flink 就可以为我们提供非常强大的容错能力了具体怎么做的我们后面再聊。
版权声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!

个人网站备案能做宣传用么seo关键词是怎么优化的

B站作为广受欢迎的视频平台,不仅聚合了大量娱乐与创意内容,还汇集了丰富的学习资源,涵盖语言学习、编程开发、考试备考等多个知识领域。在观看过程中,用户有时需要将视频保存至本地以便离线使用,目前已有多种便捷工具可…

张小明 2026/1/9 17:35:50 网站建设

网站死链如何处理网络文化经营许可证全国有多少张

PyTorch-CUDA-v2.7镜像支持GradCAM,解释CNN决策依据 在医疗影像诊断系统中,一个深度学习模型判断“肺部CT图像存在恶性结节”——这个结果本身或许准确,但医生不会轻易采信。他们真正关心的是:模型是基于哪些视觉依据做出这一判断…

张小明 2026/1/8 1:34:24 网站建设

淮安哪里做网站上海网站建设官方网站

第一章:C多线程同步机制概述在现代高性能计算和并发编程中,C多线程程序设计已成为提升应用效率的核心手段。然而,多个线程对共享资源的并发访问可能引发数据竞争、状态不一致等问题,因此必须引入有效的同步机制来协调线程行为。为…

张小明 2026/1/8 1:33:51 网站建设

东莞朝阳网站建设爱城市网app官方下载

FaceFusion镜像支持Grafana仪表盘展示:技术实现与监控可视化深度解析在AI生成内容(AIGC)应用快速落地的今天,人脸融合技术已不再局限于实验室或小众娱乐场景。从虚拟主播换脸到影视后期修复,再到个性化社交滤镜&#x…

张小明 2026/1/8 1:33:19 网站建设

成都学做网站简单班级网站模板

LabelPlus:重塑漫画翻译的智能化工作流 【免费下载链接】LabelPlus Easy tool for comic translation. 项目地址: https://gitcode.com/gh_mirrors/la/LabelPlus 在数字化内容创作蓬勃发展的今天,漫画翻译已从单纯的语言转换演变为集排版、设计、…

张小明 2026/1/11 17:50:27 网站建设

移动端网站的优势河源建设用地竞拍网站

2025终极pdfmake指南:10分钟从零掌握JavaScript PDF生成神器 【免费下载链接】pdfmake Client/server side PDF printing in pure JavaScript 项目地址: https://gitcode.com/gh_mirrors/pd/pdfmake 还在为复杂的PDF生成工具而头疼吗?想找一个既能…

张小明 2026/1/8 1:31:43 网站建设