欢迎您访问南华自考网!

Flink源代码分析系列从进入到放弃-第7章(2)检查点

更新时间:2024-01-08 18:33:02作者:51data

从Flink入门到放弃的源代码分析系列

Flink组件和逻辑规划Flink执行规划生成作业管理器的基本组件(1)作业管理器的基本组件(2)作业管理器的基本组件(3)任务管理器运算符网络漫游

Flink源代码分析系列从进入到放弃-第7章(2)检查点

CheckPoint是一种防止flink丢失消息的机制,通过验证方式调整定时。 巴里是什么?

其实上一章介绍flink网络堆栈的时候,已经在消费端介绍了flink对不同的Barrier进行处理。 实际上,Barrier是校准checkpint的方式。 对于一个拓扑,只有上游算子的checkpoint结束,下游算子的cehckpoint才能开始并有意义。 同时,下游算子的消耗速度并不统一【有的信道快,有的信道慢】,Barrier就是这样协调上下游算子的结构。

JobManager将Barrier事件发送给源操作员,统一通知其向下游广播。 下游操作员收到此类事件后,会发现自己在两次checkpoint之间【新的checkpoint开始】

当下游运算符接收到所有InputChannel的Barrier事件时,它知道上游运算符的checkpoint已完成一次,并可以自己创建checkpoint。 完成后,继续向下游运算符广播checkpoint事件

在Exact-once语义下,消费方减缓消费,调整不同信道的消费费率。 这在flink网络堆栈一章中有详细描述。

2调整并启动检查点

上述检查点集成由作业管理器启动。 让我们来看看相关的逻辑。

检查点协调器

flink的checkpoint统一在CheckpointCoordinator中,通过向相关的tasks【源tasks】发送checkpoint命令事件,开始checkpoint,checkpoint

结构参数

现在我们需要了解一些重要的参数,以便更好地理解Checkpoint的详细策略

baseInternal :快照间隔checkpointTimeout :一次checkpoint超时时间,超时的checkpoint为maxconcurrentcheckpointation 触发分布式检查点的开始tasks或source tasksCheckpoint的开始

上一章介绍了ExecutionGraph【flink物理计划抽象】。 它具有一个核心界面enableSnapshotCheckpointing,该界面在作业管理器提交作业时运行。 具体请参见jobmanagerline 1238 frommethodsuuuting此接口的逻辑总结如下。

获取checkpoint的发起节点【源节点】,需要ack和commit的节点【所有节点】为、 关闭现有的checkpoint实例化CheckpointCoordinator及其侦听Akka系统checkpointcoordinatordeactivator。 将checkpointcoordinatordeactivator注册为EecutionGraph的监听器,当作业的执行状态变为RUNNING时, 在通知checkpointcoordinatordeactivator启动CheckpointCoordinator的checkpoint线程上,CheckpointCoordinator收到这样的消息后,会显示什么

timer task将开始并准时运行。时间是现在的系统时间。 因为全局只有一个检查点协调器,所以这个时间也在全局范围内增加,是唯一的。

//checkpointcoordinatorline 1228

权限finalclassscheduledtriggerimplementsrunnable {

@Override

公共语音运行( }

try {

trigger check point ( system.current time millis (,true ) );

}

catch(exceptione ) (

log.error ( exceptionwhiletriggeringcheckpointforjob { }.& amp; #039;job、e );

}

}

}

让我们具体分析一下checkpoint的核心行为

检查点触发器

//CheckpointCoordinator line394

publicbooleantriggercheckpoint (长时间,布尔表达式) )。

returntriggercheckpoint ( timestamp,checkpointProperties,null,isPeriodic ).isSuccess );

}

publiccheckpointtriggerresulttriggercheckpoint (

长时间,

检查点属性属性,

@ nullablestringexternalsavepointlocation、

布尔伊兹周期性) {

//make some eager pre-checks

同步( lock ) {

//abortifthecoordinatorhasbeenshutdowninthemeantime

if(shutdown ) {

returnnewcheckpointtriggerresult ( checkpointdeclinereason.coordinator _ shut down );

}

//don& #039; tallowperiodiccheckpointifschedulinghasbeendisabled

周期性( if )! 周期性调度) {

returnnewcheckpointtriggerresult ( checkpointdeclinereason.periodic _ scheduler _ shut down );

}

.

归纳逻辑:

关闭或优先处理队列请求时,如果并发任务总数超过限制,此检查点将取消启动。 最小间隔时间不满足时,本次checkpointcheck的所有源节点【源节点】和其他节点进入RUNNING状态才发出checkpoint,将PendingCheckpoint

//CheckpointCoordinator line678

公共声明消息( declinecheckpointmessage )。

短按|||消息==空{

返回;

}

if (! job.equals(message.getjob ( ) ) ) )

thrownewillegalargumentexception ( receiveddeclinecheckpointmessageforjob )。

message.getjob& #039; whilethiscoordinatorhandlesjob & amp; #039; 作业;

}

finallongcheckpointid=message.getcheckpointid (;

finalstringreason=( message.getreason )!=null message.getReason ( ).getMessage ):& #039; & #039; );

PendingCheckpoint checkpoint;

同步( lock ) {

//weneedtocheckinsidethelockforbeingshutdownaswell,otherwise we

//getracesandinvaliderrorlogmessages

if(shutdown ) {

返回;

}

.

总结其逻辑:

如果存在相应的PendingCheckpoint,则取消,如果随后存在其他checkpoint,则恢复对这些checkpoint任务Ack Checkpoint消息的处理

//CheckpointCoordinator line727

publicbooleanreceiveacknowledgemessage ( acknowledgecheckpointmessage ) throws CheckpointException {

短按|||消息==空{

返回假;

}

if (! job.equals(message.getjob ( ) ) ) )

log.error ( receivedwrongacknowledgecheckpointmessageforjob ( )、job、message );

返回假;

}

finallongcheckpointid=message.getcheckpointid (;

同步( lock ) {

//weneedtocheckinsidethelockforbeingshutdownaswell,otherwise we

//getracesandinvaliderrorlogmessages

if(shutdown ) {

返回假;

}

.

总结其逻辑:

在消息中的checkpoint id中找到对应的PendingCheckpoint,并记录下对应的作业版本下的某个执行版本的ack状态的PendingCheckpoint包括: 如果该checkpoint需要ack的所有执行版本在所有ack上都已完成,则清除PendingCheckpoint中维护的状态数据,并将句柄转换为CompletedCheckpoint 通知对应的任务管理器检查点已完成【检查点提交阶段】,用CompletedCheckpointStore序列化并保存CompletedCheckpoint,在高可用性模式下zompleted 【flink job manager基本组件】,将来恢复时,将各节点所需的句柄注入状态,然后在操作员启动时将状态数据附加到TaskDeploymentDescriptor上,发布到TaskManager进行CCC

上面说任务管理器接收并处理AbstractCheckpointMessage消息。 让我们来看看核心逻辑。

//任务管理器line 500

privatedefhandlecheckpointingmessage ( actor message:abstractcheckpointmessage ):Unit={

加速器消息匹配{

case message: TriggerCheckpoint=

valtaskexecutionid=message.gettaskexecutionid

valcheckpointid=message.getcheckpointid

val timestamp=message.get timestamp

valcheckpointoptions=message.getcheckpointoptions

log.debug(s& #039; receivertriggercheckpoint $ check pointid @ $ timestamp for $ taskexecutionid.& amp; #039; )

val task=running tasks.get ( taskexecutionid ) )。

任务!=null ) {

task.triggercheckpointbarrier ( check pointid,timestamp,checkpointOptions ) )。

} else {

log.debug(s& #039; taskmanagerreceivedacheckpointrequestforunknowntask $ taskexecutionid.& amp; #039; )

}

.

//Task.java line1140

publicvoidtriggercheckpointbarrier (

最终锁定检查点,

长检查点时间,

finalcheckpointoptionscheckpointoptions ) {

finalabstractinvokableinvokable=this.invokable;

finalcheckpointmetadatacheckpointmetadata=newcheckpointmetadata ( check pointid,checkpointTimestamp );

执行状态==执行状态. running invokable!=null ) {

//build a local closure

finalstringtaskname=tasknamewithsubtask;

finalsafetynetcloseableregistrysafetynetcloseableregistry=

filesystemsafetynet.getsafetynetcloseableregistryforthread (;

.

//流任务行612

privatebooleanperformcheckpoint (

checkpointmetadatacheckpointmetadata、

checkpointoptionscheckpointoptions、

checkpointmetricscheckpointmetrics ( throws exception {

log.debug ( starting check point ( { } ) ontask ),

check point metadata.getcheckpointid (,check point options.getcheckpointtype ),getName );

同步( lock ) {

if(isrunning ) {

//we can do a checkpoint

总结其逻辑:

首先,通过TaskManager进行消息路由,对于TriggerCheckpoint消息, 路由到对应的Task进行处理的Task对异步的Task进行checkpoint。在内部调用StreamTask的performCheckpoint方法的performCheckpoint内部,首先是这次的checkpoint的bask 当前支持的持久化方法是文件系统、内存和RocksDB,如果成功,它会通知作业管理器进行确认。 否则,如果取消这次的checkpoint,是ack消息,根据情况在对应的KVState上附上图片说明对话流程。

3检查点存储和恢复

检查点的存储和恢复全部通过AbstractStateBackend进行。 AbstractStateBackend有三个实现类,FsStateBackend通过HDFS存储检查点的状态。 继承关系如下。

让我们来看看最常见的FsStateBackend。 AbstractStateBackend内部用State管理状态数据。 根据状态数据的特性,将状态分为3类。

ValueState :在最简单的状态下,一个key一个值的value可以对应ListState的更新和删除。 一个密钥对应一个值列表状态,一个密钥对应一个值操作状态。 一个key对应一个key之后添加的值通过folding函数附加到第一个值。 在AbstractStateBackend内部,通过KvState接口管理用户自定义的kv数据。 让我们来看看FsValueState的继承关系。

那我们怎么得到这些State? flink抽象出另一组接口:使用StateDescriptor获取State,然后绑定并获取特定的StateBackend。 这种阶层的抽象化,将State的模型和作为基础的具体的存储装置解除结合。 请看StateDescriptor的继承关系:

那么,这些抽象如何协调工作呢?

//KvState的初始化和获取

//abstractkeyedceppatternoperator

公共语音打开( throws exception )。

if ( keys==空值) {

keys=new HashSet (;

}

nfaoperatorstate==null ( if ) {

nfaoperatorstate=getpartitionedstate (

new ValueStateDescriptor (

NFA_OPERATOR_STATE_NAME,

new NFA.Serializer ( )、

null );

}

//AbstractStreamOperator

protectedsgetpartitionedstate ( statedescriptorstatedescriptor ) throws Exception { (

returngetstatebackend (.getpartitionedstate ( null,VoidSerializer.INSTANCE,stateDescriptor );

}

//AbstractStateBackend

//具体的kvState取决于子类的具体实现

publicsgetpartitionedstate ( final namespace,finaltypeserializernamespaceserializer,finalstatedescriptorstatedescriptor ) )

密钥序列==空( if ) {

thrownewruntimeexception(& #039; statekeyserializerhasnotbeenconfiguredintheconfig.& amp; #039;

& #039; thisoperationcannotusepartitionedstate.& amp; #039;

}

if (! state descriptor.isserializerinitialized (

state descriptor.initializeserializerunlesset ( newexecutionconfig );

}

获取KvState后,用户经过了一些更新.接下来是快照的进程

//创建状态后退结束

//AbstractStreamOperator

try {

typeserializerkeyserializer=config.getstatekeyserializer ( getusercodeclassloader );

//ifthekeyserializerisnullwestillneedtocreatethestatebackend

//for the non-partitionedstatefeaturesitprovides,such as the state output streams

stringoperatoridentifier=getclass (.getsimplename ) ) & #039; _& #039; config.getVertexID ) & #039; _& #039; runtimecontext.getindexofier ) ) ) _ ) ruruntimecontext

state back end=container.createstatebackend ( operator identifier,keySerializer );

}catch(exceptione ) {

thrownewruntimeexception ( couldnotinitializestatebackend.& amp; #039;e );

}

//流任务

publicabstractstatebackendcreatestatebackendstringoperatoridentifier,TypeSerializer keySerializer ) throws Exception {

abstractstatebackendstatebackend=configuration.getstate back end (用户类加载器);

静态备份结束!=null ) {

//backendhasbeenconfiguredontheenvironment

log.info ( using user-definedstatebackend:& amp; #039; 状态后退结束);

}

//快照入口

//AbstractStreamOperator

publicstreamtaskstatesnapshotoperatorstate ( longcheckpointid,long timestamp ) throws Exception { )。

//here,wedealwithkey/valuestatesnapshots

streamtaskstatestate=newstreamtaskstate (;

静态备份结束!=null ) {

HashMap partitionedSnapshots=

state back end.snapshotpartitionedstate ( check pointid,timestamp );

分区快照!=null ) {

state.setkvstates ( partitioned snapshot s );

}

}

返回状态;

}

上面的快照行结束时,用户将获得KvStateSnapshot抽象。 如果为FsState,则信息(如文件句柄和序列化元数据)封装在内部,并提供用于恢复快照的接口。 抽象关系如下。

flink还将获取每个task的每个operator快照的KvStateSnapshot封装在StreamTaskState中,最终与一个StreamTaskStateList的一组task相对应

//运行时间环境

acknowledgecheckpointmessage=newacknowledgecheckpoint (

jobId、

执行id,

检查点,

串行状态,

状态;

作业管理器. tell ( message );

作业管理器将这些句柄的数据重新快照到本地和zk。 具体而言,请参阅作业管理器的基本组件。

相关文章

为您推荐

加州州长签署了一项法案,正式将中国农历新年定为法定假日。

极目新闻记者 孙喆据美国亚裔文化网站“NextShark”10月1日报道,当地时间9月30日午夜前,美国加利福尼亚州州长加文·纽森签署了一项法案,正式将中国农历新年定为该州的法定节假日。相关报道截图(来源:雅虎新闻)报道称,这项法律授权任何

2024-01-08 18:21

三亚市自然资源和规划局关于2020年事业单位公开招聘笔试有关事项的通知

三亚市自然资源和规划局2020年事业单位工作人员公开招聘笔试相关事项公告 三亚市自然资源和规划局2020年事业单位工作人员公开招聘报名资格初审工作已经结束,已于5月25日对资格初审合格人员进行公告,现将通过资格初审进入笔试人员名单及相关事项

2024-01-08 18:21

合肥一中、六中、八中联招线公布!普通高中各批次录取分数线为.

刚刚,合肥市教育考试院召开合肥市区2022年普通高中录取工作媒体会,发布各批次普通高中录取最低分数线。合肥一中(瑶海和滨湖校区)、六中、八中三校联招统招线为702 分。合肥二中统招线为659 分;合肥市第三中学 648分;合肥四中 681

2024-01-08 18:21

中国农历新年| 6月21日夏季凉茶

大伏天里,在户外行走劳作的人,在烈日的晒烤之下,会大汗淋漓,口干舌燥,甚至很容易中暑。所以,在往昔年代里,总有一些这样的好心人,在村头街角的路口,备好土碗和一大锅的凉茶,供过路的人消暑解渴。虽然只是一碗粗茶,却是大伏天里,一片积善成德的清凉

2024-01-08 18:21

2022年8月7日,立秋,老人说今年是“晚立秋,热死牛”。他有什么看法?

立秋,二十四节气中的第13个节气,也是秋季的第一个节气。感受着夏季炎热的温度之后,不少朋友期盼着立秋能尽快到来。大部分的年份,立秋之后,温度会逐渐降低。但是,也有部分年份,立秋之后还会经历一段时间的高温,也就是大家常说的出现了“秋老虎”。关

2024-01-08 18:21

2021高考陆续出炉!通过渠道检查和发布

2021高考陆续放榜!6.23(今日):内蒙古、上海、安徽、江西、广西、四川、云南、甘肃、宁夏等 9 省份的考生可以查询自己的高考成绩。6.24日:山西、吉林、贵州、陕西等地将公布成绩。6.25日:北京、天津、河北、江苏、河南、湖北、湖南、

2024-01-08 18:09

加载中...