蒙恩招生网 蒙恩招生网4
2023-10-30
更新时间:2024-01-08 18:33:02作者:51data
从Flink入门到放弃的源代码分析系列
Flink组件和逻辑规划Flink执行规划生成作业管理器的基本组件(1)作业管理器的基本组件(2)作业管理器的基本组件(3)任务管理器运算符网络漫游
CheckPoint是一种防止flink丢失消息的机制,通过验证方式调整定时。 巴里是什么?
其实上一章介绍flink网络堆栈的时候,已经在消费端介绍了flink对不同的Barrier进行处理。 实际上,Barrier是校准checkpint的方式。 对于一个拓扑,只有上游算子的checkpoint结束,下游算子的cehckpoint才能开始并有意义。 同时,下游算子的消耗速度并不统一【有的信道快,有的信道慢】,Barrier就是这样协调上下游算子的结构。
JobManager将Barrier事件发送给源操作员,统一通知其向下游广播。 下游操作员收到此类事件后,会发现自己在两次checkpoint之间【新的checkpoint开始】
当下游运算符接收到所有InputChannel的Barrier事件时,它知道上游运算符的checkpoint已完成一次,并可以自己创建checkpoint。 完成后,继续向下游运算符广播checkpoint事件
在Exact-once语义下,消费方减缓消费,调整不同信道的消费费率。 这在flink网络堆栈一章中有详细描述。
2调整并启动检查点
上述检查点集成由作业管理器启动。 让我们来看看相关的逻辑。
检查点协调器
flink的checkpoint统一在CheckpointCoordinator中,通过向相关的tasks【源tasks】发送checkpoint命令事件,开始checkpoint,checkpoint
结构参数
现在我们需要了解一些重要的参数,以便更好地理解Checkpoint的详细策略
baseInternal :快照间隔checkpointTimeout :一次checkpoint超时时间,超时的checkpoint为maxconcurrentcheckpointation 触发分布式检查点的开始tasks或source tasksCheckpoint的开始
上一章介绍了ExecutionGraph【flink物理计划抽象】。 它具有一个核心界面enableSnapshotCheckpointing,该界面在作业管理器提交作业时运行。 具体请参见jobmanagerline 1238 frommethodsuuuting此接口的逻辑总结如下。
获取checkpoint的发起节点【源节点】,需要ack和commit的节点【所有节点】为、 关闭现有的checkpoint实例化CheckpointCoordinator及其侦听Akka系统checkpointcoordinatordeactivator。 将checkpointcoordinatordeactivator注册为EecutionGraph的监听器,当作业的执行状态变为RUNNING时, 在通知checkpointcoordinatordeactivator启动CheckpointCoordinator的checkpoint线程上,CheckpointCoordinator收到这样的消息后,会显示什么
timer task将开始并准时运行。时间是现在的系统时间。 因为全局只有一个检查点协调器,所以这个时间也在全局范围内增加,是唯一的。
//checkpointcoordinatorline 1228
权限finalclassscheduledtriggerimplementsrunnable {
@Override
公共语音运行( }
try {
trigger check point ( system.current time millis (,true ) );
}
catch(exceptione ) (
log.error ( exceptionwhiletriggeringcheckpointforjob { }.& amp; #039;job、e );
}
}
}
让我们具体分析一下checkpoint的核心行为
检查点触发器
//CheckpointCoordinator line394
publicbooleantriggercheckpoint (长时间,布尔表达式) )。
returntriggercheckpoint ( timestamp,checkpointProperties,null,isPeriodic ).isSuccess );
}
publiccheckpointtriggerresulttriggercheckpoint (
长时间,
检查点属性属性,
@ nullablestringexternalsavepointlocation、
布尔伊兹周期性) {
//make some eager pre-checks
同步( lock ) {
//abortifthecoordinatorhasbeenshutdowninthemeantime
if(shutdown ) {
returnnewcheckpointtriggerresult ( checkpointdeclinereason.coordinator _ shut down );
}
//don&; #039; tallowperiodiccheckpointifschedulinghasbeendisabled
周期性( if )! 周期性调度) {
returnnewcheckpointtriggerresult ( checkpointdeclinereason.periodic _ scheduler _ shut down );
}
.
归纳逻辑:
关闭或优先处理队列请求时,如果并发任务总数超过限制,此检查点将取消启动。 最小间隔时间不满足时,本次checkpointcheck的所有源节点【源节点】和其他节点进入RUNNING状态才发出checkpoint,将PendingCheckpoint
//CheckpointCoordinator line678
公共声明消息( declinecheckpointmessage )。
短按|||消息==空{
返回;
}
if (! job.equals(message.getjob ( ) ) ) )
thrownewillegalargumentexception ( receiveddeclinecheckpointmessageforjob )。
message.getjob&; #039; whilethiscoordinatorhandlesjob & amp; #039; 作业;
}
finallongcheckpointid=message.getcheckpointid (;
finalstringreason=( message.getreason )!=null message.getReason ( ).getMessage ):&; #039; &; #039; );
PendingCheckpoint checkpoint;
同步( lock ) {
//weneedtocheckinsidethelockforbeingshutdownaswell,otherwise we
//getracesandinvaliderrorlogmessages
if(shutdown ) {
返回;
}
.
总结其逻辑:
如果存在相应的PendingCheckpoint,则取消,如果随后存在其他checkpoint,则恢复对这些checkpoint任务Ack Checkpoint消息的处理
//CheckpointCoordinator line727
publicbooleanreceiveacknowledgemessage ( acknowledgecheckpointmessage ) throws CheckpointException {
短按|||消息==空{
返回假;
}
if (! job.equals(message.getjob ( ) ) ) )
log.error ( receivedwrongacknowledgecheckpointmessageforjob ( )、job、message );
返回假;
}
finallongcheckpointid=message.getcheckpointid (;
同步( lock ) {
//weneedtocheckinsidethelockforbeingshutdownaswell,otherwise we
//getracesandinvaliderrorlogmessages
if(shutdown ) {
返回假;
}
.
总结其逻辑:
在消息中的checkpoint id中找到对应的PendingCheckpoint,并记录下对应的作业版本下的某个执行版本的ack状态的PendingCheckpoint包括: 如果该checkpoint需要ack的所有执行版本在所有ack上都已完成,则清除PendingCheckpoint中维护的状态数据,并将句柄转换为CompletedCheckpoint 通知对应的任务管理器检查点已完成【检查点提交阶段】,用CompletedCheckpointStore序列化并保存CompletedCheckpoint,在高可用性模式下zompleted 【flink job manager基本组件】,将来恢复时,将各节点所需的句柄注入状态,然后在操作员启动时将状态数据附加到TaskDeploymentDescriptor上,发布到TaskManager进行CCC
上面说任务管理器接收并处理AbstractCheckpointMessage消息。 让我们来看看核心逻辑。
//任务管理器line 500
privatedefhandlecheckpointingmessage ( actor message:abstractcheckpointmessage ):Unit={
加速器消息匹配{
case message: TriggerCheckpoint=
valtaskexecutionid=message.gettaskexecutionid
valcheckpointid=message.getcheckpointid
val timestamp=message.get timestamp
valcheckpointoptions=message.getcheckpointoptions
log.debug(s&; #039; receivertriggercheckpoint $ check pointid @ $ timestamp for $ taskexecutionid.& amp; #039; )
val task=running tasks.get ( taskexecutionid ) )。
任务!=null ) {
task.triggercheckpointbarrier ( check pointid,timestamp,checkpointOptions ) )。
} else {
log.debug(s&; #039; taskmanagerreceivedacheckpointrequestforunknowntask $ taskexecutionid.& amp; #039; )
}
.
//Task.java line1140
publicvoidtriggercheckpointbarrier (
最终锁定检查点,
长检查点时间,
finalcheckpointoptionscheckpointoptions ) {
finalabstractinvokableinvokable=this.invokable;
finalcheckpointmetadatacheckpointmetadata=newcheckpointmetadata ( check pointid,checkpointTimestamp );
执行状态==执行状态. running invokable!=null ) {
//build a local closure
finalstringtaskname=tasknamewithsubtask;
finalsafetynetcloseableregistrysafetynetcloseableregistry=
filesystemsafetynet.getsafetynetcloseableregistryforthread (;
.
//流任务行612
privatebooleanperformcheckpoint (
checkpointmetadatacheckpointmetadata、
checkpointoptionscheckpointoptions、
checkpointmetricscheckpointmetrics ( throws exception {
log.debug ( starting check point ( { } ) ontask ),
check point metadata.getcheckpointid (,check point options.getcheckpointtype ),getName );
同步( lock ) {
if(isrunning ) {
//we can do a checkpoint
总结其逻辑:
首先,通过TaskManager进行消息路由,对于TriggerCheckpoint消息, 路由到对应的Task进行处理的Task对异步的Task进行checkpoint。在内部调用StreamTask的performCheckpoint方法的performCheckpoint内部,首先是这次的checkpoint的bask 当前支持的持久化方法是文件系统、内存和RocksDB,如果成功,它会通知作业管理器进行确认。 否则,如果取消这次的checkpoint,是ack消息,根据情况在对应的KVState上附上图片说明对话流程。
3检查点存储和恢复
检查点的存储和恢复全部通过AbstractStateBackend进行。 AbstractStateBackend有三个实现类,FsStateBackend通过HDFS存储检查点的状态。 继承关系如下。
让我们来看看最常见的FsStateBackend。 AbstractStateBackend内部用State管理状态数据。 根据状态数据的特性,将状态分为3类。
ValueState :在最简单的状态下,一个key一个值的value可以对应ListState的更新和删除。 一个密钥对应一个值列表状态,一个密钥对应一个值操作状态。 一个key对应一个key之后添加的值通过folding函数附加到第一个值。 在AbstractStateBackend内部,通过KvState接口管理用户自定义的kv数据。 让我们来看看FsValueState的继承关系。
那我们怎么得到这些State? flink抽象出另一组接口:使用StateDescriptor获取State,然后绑定并获取特定的StateBackend。 这种阶层的抽象化,将State的模型和作为基础的具体的存储装置解除结合。 请看StateDescriptor的继承关系:
那么,这些抽象如何协调工作呢?
//KvState的初始化和获取
//abstractkeyedceppatternoperator
公共语音打开( throws exception )。
if ( keys==空值) {
keys=new HashSet (;
}
nfaoperatorstate==null ( if ) {
nfaoperatorstate=getpartitionedstate (
new ValueStateDescriptor (
NFA_OPERATOR_STATE_NAME,
new NFA.Serializer ( )、
null );
}
//AbstractStreamOperator
protectedsgetpartitionedstate ( statedescriptorstatedescriptor ) throws Exception { (
returngetstatebackend (.getpartitionedstate ( null,VoidSerializer.INSTANCE,stateDescriptor );
}
//AbstractStateBackend
//具体的kvState取决于子类的具体实现
publicsgetpartitionedstate ( final namespace,finaltypeserializernamespaceserializer,finalstatedescriptorstatedescriptor ) )
密钥序列==空( if ) {
thrownewruntimeexception(&; #039; statekeyserializerhasnotbeenconfiguredintheconfig.& amp; #039;
&; #039; thisoperationcannotusepartitionedstate.& amp; #039;
}
if (! state descriptor.isserializerinitialized (
state descriptor.initializeserializerunlesset ( newexecutionconfig );
}
获取KvState后,用户经过了一些更新.接下来是快照的进程
//创建状态后退结束
//AbstractStreamOperator
try {
typeserializerkeyserializer=config.getstatekeyserializer ( getusercodeclassloader );
//ifthekeyserializerisnullwestillneedtocreatethestatebackend
//for the non-partitionedstatefeaturesitprovides,such as the state output streams
stringoperatoridentifier=getclass (.getsimplename ) ) &; #039; _&; #039; config.getVertexID ) &; #039; _&; #039; runtimecontext.getindexofier ) ) ) _ ) ruruntimecontext
state back end=container.createstatebackend ( operator identifier,keySerializer );
}catch(exceptione ) {
thrownewruntimeexception ( couldnotinitializestatebackend.& amp; #039;e );
}
//流任务
publicabstractstatebackendcreatestatebackendstringoperatoridentifier,TypeSerializer keySerializer ) throws Exception {
abstractstatebackendstatebackend=configuration.getstate back end (用户类加载器);
静态备份结束!=null ) {
//backendhasbeenconfiguredontheenvironment
log.info ( using user-definedstatebackend:& amp; #039; 状态后退结束);
}
//快照入口
//AbstractStreamOperator
publicstreamtaskstatesnapshotoperatorstate ( longcheckpointid,long timestamp ) throws Exception { )。
//here,wedealwithkey/valuestatesnapshots
streamtaskstatestate=newstreamtaskstate (;
静态备份结束!=null ) {
HashMap partitionedSnapshots=
state back end.snapshotpartitionedstate ( check pointid,timestamp );
分区快照!=null ) {
state.setkvstates ( partitioned snapshot s );
}
}
返回状态;
}
上面的快照行结束时,用户将获得KvStateSnapshot抽象。 如果为FsState,则信息(如文件句柄和序列化元数据)封装在内部,并提供用于恢复快照的接口。 抽象关系如下。
flink还将获取每个task的每个operator快照的KvStateSnapshot封装在StreamTaskState中,最终与一个StreamTaskStateList的一组task相对应
//运行时间环境
acknowledgecheckpointmessage=newacknowledgecheckpoint (
jobId、
执行id,
检查点,
串行状态,
状态;
作业管理器. tell ( message );
作业管理器将这些句柄的数据重新快照到本地和zk。 具体而言,请参阅作业管理器的基本组件。