首页>>互联网>>大数据->【图解源码】Zookeeper3.7源码剖析,Session的管理机制,Leader选举投票规则,集群数据同步流程

【图解源码】Zookeeper3.7源码剖析,Session的管理机制,Leader选举投票规则,集群数据同步流程

时间:2023-11-29 本站 点击:1

Zookeeper3.7源码剖析

能力目标

掌握Zookeeper中Session的管理机制

能基于Client进行Debug测试Session创建/刷新操作

能搭建Zookeeper集群源码配置

掌握集群环境下Leader选举启动过程

能说出Zookeeper选举过程中的概念

能说出Zookeeper选举投票规则

能画出Zookeeper集群数据同步流程

1 Session源码分析

客户端创建Socket连接后,会尝试连接,如果连接成功成功会调用到primeConnection方法用来发送ConnectRequest连接请求,这里便是设置session会话 ,关于客户端创建会话我们就不在这里做讲解了,我们直接讲解服务端Session会话处理流程。

1.1 服务端Session属性分析

Zookeeper服务端会话操作如下图:

在服务端通过SessionTrackerImplExpiryQueue来保存Session会话信息。

SessionTrackerImpl有以下属性:

1:sessionsById 用来存储ConcurrentHashMap<Long, SessionImpl> {sessionId:SessionImpl} 2:sessionExpiryQueue ExpiryQueue<SessionImpl>失效队列3:sessionsWithTimeout ConcurrentMap<Long, Integer>存储的是{sessionId: sessionTimeout} 4:nextSessionId 下一个sessionId

ExpiryQueue失效队列有以下属性:

1:elemMap ConcurrentHashMap<E, Long> 存储的是{SessionImpl: newExpiryTime} Session实例对象,失效时间。2:expiryMap ConcurrentHashMap<Long, Set<E>>存储的是{time: set<SessionImp>} 失效时间,当前失效时间的Session对象集合。3:nextExpirationTime 下一次失效时间 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval 当前系统时间毫秒值ms=System.nanoTime() / 1000000。 nextExpirationTime=当前系统时间毫秒值+expirationInterval(失效间隔)。4:expirationInterval 失效间隔,默认是10s,可以通过sessionlessCnxnTimeout修改。即是通过配置文件的tickTime修改。

1.2 Session创建

我们接着上一章的案例继续分析,假如客户端发起请求后,后端如何识别是第一次创建请求?在之前的案例源码NIOServerCnxn.readPayload()中有所体现,NIOServerCnxn.readPayload()部分关键源码如下:

此时如果initialized=false,表示第一次连接 需要创建Session(createSession),此处调用readConnectRequest()后,在readConnectRequest()方法中会将initialized设置为true,只有在处理完连接请求之后才会把initialized设置为true,才可以处理客户端其他命令。

上面方法还调用了processConnectRequest处理连接请求, processConnectRequest 第一次从请求中获取的sessionId=0,此时会把创建Session作为一个业务,会调用createSession()方法,processConnectRequest 方法部分关键代码如下:

创建会话调用createSession(),该方法会首先创建一个sessionId,并把该sessionId作为会话ID创建一个创建session会话的请求,并将该请求交给业务链作为一个业务处理,createSession()源码如下:

上面方法用到的sessionTracker.createSession(timeout)做了2个操作分别是创建sessionId和配置sessionId的跟踪信息,方法源码如下:

会话信息的跟踪其实就是将会话信息添加到队列中,任何地方可以根据会话ID找到会话信息,trackSession方法实现了Session创建、Session队列存储、Session过期队列存储,trackSession方法源码如下:

PrepRequestProcessorrun方法中调用pRequest2Txn,关键代码如下:

SyncRequestProcessor对txn(创建session的操作)进行持久化,在FinalRequestProcessor会对Session进行提交,其实就是把Session的ID和Timeout存到sessionsWithTimeout中去。

由于FinalRequestProcessor中调用链路太复杂,我们把调用链路写出来,大家可以按照这个顺序跟踪:

1:FinalRequestProcessor.applyRequest()        方法代码:ProcessTxnResult rc = zks.processTxn(request);2:ZooKeeperServer.processTxn(org.apache.zookeeper.server.Request)        方法代码:processTxnForSessionEvents(request, hdr, request.getTxn());

上面调用链路中processTxnForSessionEvents(request, hdr, request.getTxn());方法代码如下:

上面方法主要处理了OpCode.createSession并且将sessionId、TimeOut提交到sessionsWithTimeout中,而提交到sessionsWithTimeout的方法SessionTrackerImpl.commitSession()代码如下:

1.3 Session刷新

服务端无论接受什么请求命令(增删或ping等请求)都会更新Session的过期时间 。我们做增删或者ping命令的时候,都会经过RequestThrottlerRequestThrottler的run方法中调用zks.submitRequestNow(),而zks.submitRequestNow(request)中调用了touch(si.cnxn);,该方法源码如下:

touchSession()方法更新sessionExpiryQueue失效队列中的失效时间,源码如下:

update()方法会在当前时间的基础上增加timeout,并更新失效时间为newExpiryTime,关键源码如下:

1.4 Session过期

SessionTrackerImpl是一个线程类,继承了ZooKeeperCriticalThread,我们可以看它的run方法,它首先获取了下一个会话过期时间,并休眠等待会话过期时间到期,然后获取过期的客户端会话集合并循环关闭,源码如下:

上面方法中调用了sessionExpiryQueue.poll(),该方法代码主要是获取过期时间对应的客户端会话集合,源码如下:

上面的setSessionClosing()方法其实是把Session会话的isClosing状态设置为了true,方法源码如下:

而让客户端失效的方法expirer.expire(s);其实也是一个业务操作,主要调用了ZooKeeperServer.expire()方法,而该方法获取SessionId后,又创建了一个OpCode.closeSession的请求,并交给业务链处理,我们查看ZooKeeperServer.expire()方法源码如下:

PrepRequestProcessor.pRequest2Txn()方法中OpCode.closeSession操作里最后部分代理明确将会话Session的isClosing设置为了true,源码如下:

业务链处理对象FinalRequestProcessor.processRequest()方法调用了ZooKeeperServer.processTxn(),并且在processTxn()方法中执行了processTxnForSessionEvents,而processTxnForSessionEvents()方法正好移除了会话信息,方法源码如下:

移除会话的方法SessionTrackerImpl.removeSession()会移除会话ID以及过期会话对象,源码如下:

1.5 Zookeeper会话测试

为了让Zookeeper的会话理解更深刻,我们对会话流程做一个测试,首先测试会话创建,再测试会话刷新。

1)会话创建测试

我们打开NIOServerCnxn.readPayload()方法,跟踪首次创建会话,调试情况如下:

此时会建立远程连接并创建SessionID,我们调试到NIOServerCnxn.readConnectRequest()方法,此时建立链接,并且得到的sessionId=0。

当sessionId=0时,会执行Session创建,Session创建会调用SessionTrackerImpl.createSession()方法实现会话创建,并将会话存入跟踪队列,DEBUG测试如下:

会话创建代码如下:

跟踪测试后,控制台输出如下信息:

AcceptThread----------链接服务的IP:127.0.0.11:会话未连接,准备首次连接会话.....2:建立远程连接......2:第1次连接的sessionId=0使用SessionTrackerImpl创建会话,并将会话加入跟踪队列中3:sessionId=0,此时创建sessionId=72061099907219458

2)会话刷新测试

我们执行get /ZooKeeper指令,然后首先跟踪到RequestThrottler.run()方法,执行如下:

执行程序到达ZooKeeperServer.touch(),即将开始准备刷新会话了,我们测试效果如下:

调用SessionTrackerImpl.touchSession()的时候会先判断会话是否为空、会话是否已经关闭,如果都没有,才执行刷新会话操作,DEBUG跟踪如下:

刷新会话其实就是会话时间增加,增加会话时间DEBUG跟踪如下:

测试后效果如下:

a.当前请求并未过期,不需要删除,准备刷新会话b.准备调用SessionTrackerImpl.touchSession()刷新会话c.会话不为空,会话也未关闭,准备调用updateSessionExpiry()刷新会话d.剩余过期时间:54572178,增加过期时间:30000,刷新会话后过期时间:54604000

2 Zookeeper集群启动流程

我们先搭建Zookeeper集群,再来分析选举算法。

2.1 Zookeeper集群配置

如上图:

1:创建zoo1.cfg、zoo2.cfg、zoo3.cfg2:创建zkdata1、zkdata2、zkdata33:创建3个myid,值分别为1、2、3

配置3个启动类,如下图:

2.2 集群启动流程分析

如上图,上图是Zookeeper单机/集群启动流程,每个细节所做的事情都在上图有说明,我们接下来按照流程图对源码进行分析。

程序启动,运行流程启动集群模式,如下图:

quorumPeer.start()启动服务,如下代码:

quorumPeer.start()方法代码如下:

quorumPeer.start()方法启动的主要步骤:

1:loadDataBase()加载数据。2:startServerCnxnFactory 用来开启acceptThread、SelectorThread和workerPool线程池。3:开启Leader选举startLeaderElection。4:开启JVM监控线程startJvmPauseMonitor。5:调用父类super.start();进行Leader选举。

startLeaderElection()开启Leader选举方法做了2件事,首先创建初始化选票选自己,接着创建选举投票方式,源码如下:

createElectionAlgorithm()创建选举算法只有第3种,其他2种均已废弃,方法源码如下:

这个方法创建了以下三个对象:

①、创建QuorumCnxManager对象

②、QuorumCnxManager.Listener

③、FastLeaderElection

3 Zookeeper集群Leader选举

3.1 Paxos算法介绍

Zookeeper选举主要依赖于FastLeaderElection算法,其他算法均已淘汰,但FastLeaderElection算法又是典型的Paxos算法,所以我们要先学习下Paxos算法,这样更有助于掌握FastLeaderElection算法。

1)Paxos介绍

分布式事务中常见的事务模型有2PC和3PC,无论是2PC提交还是3PC提交都无法彻底解决分布式的一致性问题以及无法解决太过保守及容错性不好。Google Chubby的作者Mike Burrows说过,世上只有一种一致性算法,那就是Paxos,所有其他一致性算法都是Paxos算法的不完整版。Paxos算法是公认的晦涩,很难讲清楚,但是工程上也很难实现,所以有很多Paxos算法的工程实现,如Chubby, Raft,ZAB,微信的PhxPaxos等。这一篇会介绍这个公认为难于理解但是行之有效的Paxos算法。Paxos算法是莱斯利·兰伯特(Leslie Lamport)1990年提出的一种基于消息传递的一致性算法,它曾就此发表了《The Part-Time Parliament》,《Paxos Made Simple》,由于采用故事的方式来解释此算法,感觉还是很难理解。

2)Paxos算法背景 Paxos算法是基于消息传递且具有高度容错特性的一致性算法,是目前公认的解决分布式一致性问题最有效的算法之一,其解决的问题就是在分布式系统中如何就某个值(决议)达成一致。 面试的时候:不要把这个Paxos算法达到的目的和分布式事务联系起来,而是针对Zookeeper这样的master-slave集群对某个决议达成一致,也就是副本之间写或者leader选举达成一致。我觉得这个算法和狭义的分布式事务不是一样的。 在常见的分布式系统中,总会发生诸如机器宕机或网络异常(包括消息的延迟、丢失、重复、乱序,还有网络分区)(也就是会发生异常的分布式系统)等情况。Paxos算法需要解决的问题就是如何在一个可能发生上述异常的分布式系统中,快速且正确地在集群内部对某个数据的值达成一致。也可以理解成分布式系统中达成状态的一致性。

3)Paxos算法理解

Paxos 算法是分布式一致性算法用来解决一个分布式系统如何就某个值(决议)达成一致的问题。一个典型的场景是,在一个分布式数据库系统中,如果各节点的初始状态一致,每个节点都执行相同的操作序列,那么他们最后能得到一个一致的状态。为保证每个节点执行相同的命令序列,需要在每一条指令上执行一个”一致性算法”以保证每个节点看到的指令一致。 分布式系统中一般是通过多副本来保证可靠性,而多个副本之间会存在数据不一致的情况。所以必须有一个一致性算法来保证数据的一致,描述如下:   假如在分布式系统中初始是各个节点的数据是一致的,每个节点都顺序执行系列操作,然后每个节点最终的数据还是一致的。   Paxos算法就是解决这种分布式场景中的一致性问题。对于一般的开发人员来说,只需要知道paxos是一个分布式选举算法即可。多个节点之间存在两种通讯模型:共享内存(Shared memory)、消息传递(Messages passing),Paxos是基于消息传递的通讯模型的。

4)Paxos相关概念

在Paxos算法中,有三种角色:

Proposer

Acceptor

Learners

在具体的实现中,一个进程可能同时充当多种角色。比如一个进程可能既是Proposer又是Acceptor又是Learner。Proposer负责提出提案,Acceptor负责对提案作出裁决(accept与否),learner负责学习提案结果。 还有一个很重要的概念叫提案(Proposal)。最终要达成一致的value就在提案里。只要Proposer发的提案被Acceptor接受(半数以上的Acceptor同意才行),Proposer就认为该提案里的value被选定了。Acceptor告诉Learner哪个value被选定,Learner就认为那个value被选定。只要Acceptor接受了某个提案,Acceptor就认为该提案里的value被选定了。 为了避免单点故障,会有一个Acceptor集合,Proposer向Acceptor集合发送提案,Acceptor集合中的每个成员都有可能同意该提案且每个Acceptor只能批准一个提案,只有当一半以上的成员同意了一个提案,就认为该提案被选定了。

3.2 QuorumPeer工作流程

QuorumCnxManager:每台服务器在启动的过程中,会启动一个QuorumPeer,负责各台服务器之间的底层Leader选举过程中的网络通信对应的类就是QuorumCnxManager

Zookeeper对于每个节点QuorumPeer的设计相当的灵活,QuorumPeer主要包括四个组件:客户端请求接收器(ServerCnxnFactory)、数据引擎(ZKDatabase)、选举器(Election)、核心功能组件(Leader/Follower/Observer)。

1:ServerCnxnFactory负责维护与客户端的连接(接收客户端的请求并发送相应的响应);(1001行)2:ZKDatabase负责存储/加载/查找数据(基于目录树结构的KV+操作日志+客户端Session);(129行)3:Election负责选举集群的一个Leader节点;(998行)4:Leader/Follower/Observer确认是QuorumPeer节点应该完成的核心职责;(1270行)

QuorumPeer工作流程比较复杂,如下图:

QuorumPeer工作流程:

1:初始化配置2:加载当前存在的数据3:启动网络通信组件4:启动控制台5:开启选举协调者,并执行选举(这个过程是会持续,并不是一次操作就结束了)

3.3 QuorumCnxManager源码分析

QuorumCnxManager内部维护了一系列的队列,用来保存接收到的、待发送的消息以及消息的发送器,除接收队列以外,其他队列都按照SID分组形成队列集合,如一个集群中除了自身还有3台机器,那么就会为这3台机器分别创建一个发送队列,互不干扰。

QuorumCnxManager.Listener :为了能够相互投票,Zookeeper集群中的所有机器都需要建立起网络连接。QuorumCnxManager在启动时会创建一个ServerSocket来监听Leader选举的通信端口。开启监听后,Zookeeper能够不断地接收到来自其他服务器地创建连接请求,在接收到其他服务器地TCP连接请求时,会进行处理。为了避免两台机器之间重复地创建TCP连接,Zookeeper只允许SID大的服务器主动和其他机器建立连接,否则断开连接。在接收到创建连接请求后,服务器通过对比自己和远程服务器的SID值来判断是否接收连接请求,如果当前服务器发现自己的SID更大,那么会断开当前连接,然后自己主动和远程服务器将连接(自己作为“客户端”)。一旦连接建立,就会根据远程服务器的SID来创建相应的消息发送器SendWorker和消息发送器RecvWorker,并启动。

QuorumCnxManager.Listener监听启动可以查看QuorumCnxManager.Listenerrun方法,源代码如下,可以断点调试看到此时监听的正是我们所说的投票端口:

上面是监听器,各个服务之间进行通信我们需要开启ListenerHandler线程,在QuorumCnxManager.Listener.ListenerHandler的run方法中有一个方法acceptConnections()调用,该方法就是用于接受每次选举投票的信息,如果只有一个节点或者没有投票信息的时候,此时方法会阻塞,一旦执行选举,程序会往下执行,我们可以先启动1台服务,再启动第2台、第3台,此时会收到有客户端参与投票链接,程序会往下执行,源码如下:

我们启动2台服务,效果如下:

上面虽然能证明投票访问了当前监听的端口,但怎么知道是哪台服务呢?我们可以沿着receiveConnection()源码继续研究,源码如下:

receiveConnection()方法只是获取了数据流,并没做特殊处理,并且调用了handleConnection()方法,该方法源码如下:

通过网络连接获取数据sid,获取sid表示是哪一台连过来的,我们可以打印输出sid,测试输出如下数据:

参与投票的MyID=2参与投票的MyID=3

3.4 FastLeaderElection算法源码分析

Zookeeper集群中,主要分为三者角色,而每一个节点同时只能扮演一种角色,这三种角色分别是:

(1)Leader 接受所有Follower的提案请求并统一协调发起提案的投票,负责与所有的Follower进行内部的数据交换(同步);

(2)Follower 直接为客户端提供服务并参与提案的投票,同时与Leader进行数据交换(同步);

(3)Observer 直接为客户端服务但并不参与提案的投票,同时也与Leader进行数据交换(同步);

FastLeaderElection 选举算法是标准的 Fast Paxos 算法实现,可解决 LeaderElection 选举算法收敛速度慢的问题。

创建FastLeaderElection只需要new FastLeaderElection()即可,如下代码:

创建FastLeaderElection会调用starter()方法,该方法会创建sendqueuerecvqueue队列、Messenger对象,其中Messenger对象的作用非常关键,方法源码如下:

创建Messenger的时候,会创建WorkerSender并封装成wsThread线程,创建WorkerReceiver并封装成wrThread线程,看名字就很容易理解,wsThread用于发送数据,wrThread用于接收数据,Messenger创建源码如下:

创建完FastLeaderElection后接着会调用它的start()方法启动选举算法,代码如下:

启动选举算法会调用start()方法,start()方法如下:

1:elemMap ConcurrentHashMap<E, Long> 存储的是{SessionImpl: newExpiryTime} Session实例对象,失效时间。2:expiryMap ConcurrentHashMap<Long, Set<E>>存储的是{time: set<SessionImp>} 失效时间,当前失效时间的Session对象集合。3:nextExpirationTime 下一次失效时间 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval 当前系统时间毫秒值ms=System.nanoTime() / 1000000。 nextExpirationTime=当前系统时间毫秒值+expirationInterval(失效间隔)。4:expirationInterval 失效间隔,默认是10s,可以通过sessionlessCnxnTimeout修改。即是通过配置文件的tickTime修改。0

上面会执行messager.start(),也就是如下方法,也就意味着wsThreadwrThread线程都将启动,源码如下:

1:elemMap ConcurrentHashMap<E, Long> 存储的是{SessionImpl: newExpiryTime} Session实例对象,失效时间。2:expiryMap ConcurrentHashMap<Long, Set<E>>存储的是{time: set<SessionImp>} 失效时间,当前失效时间的Session对象集合。3:nextExpirationTime 下一次失效时间 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval 当前系统时间毫秒值ms=System.nanoTime() / 1000000。 nextExpirationTime=当前系统时间毫秒值+expirationInterval(失效间隔)。4:expirationInterval 失效间隔,默认是10s,可以通过sessionlessCnxnTimeout修改。即是通过配置文件的tickTime修改。1

wsThreadWorkerSender封装而来,此时会调用WorkerSenderrun方法,run方法会调用process()方法,源码如下:

process方法调用了managertoSend方法,此时是把对应的sid作为了消息发送出去,这里其实是发送投票信息,源码如下:

1:elemMap ConcurrentHashMap<E, Long> 存储的是{SessionImpl: newExpiryTime} Session实例对象,失效时间。2:expiryMap ConcurrentHashMap<Long, Set<E>>存储的是{time: set<SessionImp>} 失效时间,当前失效时间的Session对象集合。3:nextExpirationTime 下一次失效时间 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval 当前系统时间毫秒值ms=System.nanoTime() / 1000000。 nextExpirationTime=当前系统时间毫秒值+expirationInterval(失效间隔)。4:expirationInterval 失效间隔,默认是10s,可以通过sessionlessCnxnTimeout修改。即是通过配置文件的tickTime修改。2

投票可以投自己,也可以投别人,如果是选票选自己,只需要把投票信息添加到recvQueue中即可,源码如下:

WorkerReceiver.run方法中会从recvQueue中获取Message,并把发送给其他服务的投票封装到sendqueue队列中,交给WorkerSender发送处理,源码如下:

3.5 Zookeeper选举投票剖析

选举是个很复杂的过程,要考虑很多场景,而且选举过程中有很多概念需要理解。

3.5.1 选举概念

1)ZK服务状态:

1:elemMap ConcurrentHashMap<E, Long> 存储的是{SessionImpl: newExpiryTime} Session实例对象,失效时间。2:expiryMap ConcurrentHashMap<Long, Set<E>>存储的是{time: set<SessionImp>} 失效时间,当前失效时间的Session对象集合。3:nextExpirationTime 下一次失效时间 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval 当前系统时间毫秒值ms=System.nanoTime() / 1000000。 nextExpirationTime=当前系统时间毫秒值+expirationInterval(失效间隔)。4:expirationInterval 失效间隔,默认是10s,可以通过sessionlessCnxnTimeout修改。即是通过配置文件的tickTime修改。3

2)服务角色:

1:elemMap ConcurrentHashMap<E, Long> 存储的是{SessionImpl: newExpiryTime} Session实例对象,失效时间。2:expiryMap ConcurrentHashMap<Long, Set<E>>存储的是{time: set<SessionImp>} 失效时间,当前失效时间的Session对象集合。3:nextExpirationTime 下一次失效时间 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval 当前系统时间毫秒值ms=System.nanoTime() / 1000000。 nextExpirationTime=当前系统时间毫秒值+expirationInterval(失效间隔)。4:expirationInterval 失效间隔,默认是10s,可以通过sessionlessCnxnTimeout修改。即是通过配置文件的tickTime修改。4

3)投票消息广播:

1:elemMap ConcurrentHashMap<E, Long> 存储的是{SessionImpl: newExpiryTime} Session实例对象,失效时间。2:expiryMap ConcurrentHashMap<Long, Set<E>>存储的是{time: set<SessionImp>} 失效时间,当前失效时间的Session对象集合。3:nextExpirationTime 下一次失效时间 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval 当前系统时间毫秒值ms=System.nanoTime() / 1000000。 nextExpirationTime=当前系统时间毫秒值+expirationInterval(失效间隔)。4:expirationInterval 失效间隔,默认是10s,可以通过sessionlessCnxnTimeout修改。即是通过配置文件的tickTime修改。5

4)选票模型:

1:elemMap ConcurrentHashMap<E, Long> 存储的是{SessionImpl: newExpiryTime} Session实例对象,失效时间。2:expiryMap ConcurrentHashMap<Long, Set<E>>存储的是{time: set<SessionImp>} 失效时间,当前失效时间的Session对象集合。3:nextExpirationTime 下一次失效时间 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval 当前系统时间毫秒值ms=System.nanoTime() / 1000000。 nextExpirationTime=当前系统时间毫秒值+expirationInterval(失效间隔)。4:expirationInterval 失效间隔,默认是10s,可以通过sessionlessCnxnTimeout修改。即是通过配置文件的tickTime修改。6

5)消息发送对象:

1:elemMap ConcurrentHashMap<E, Long> 存储的是{SessionImpl: newExpiryTime} Session实例对象,失效时间。2:expiryMap ConcurrentHashMap<Long, Set<E>>存储的是{time: set<SessionImp>} 失效时间,当前失效时间的Session对象集合。3:nextExpirationTime 下一次失效时间 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval 当前系统时间毫秒值ms=System.nanoTime() / 1000000。 nextExpirationTime=当前系统时间毫秒值+expirationInterval(失效间隔)。4:expirationInterval 失效间隔,默认是10s,可以通过sessionlessCnxnTimeout修改。即是通过配置文件的tickTime修改。7

3.5.2 选举过程

QuorumPeer本身是个线程,在集群启动的时候会执行quorumPeer.start();,此时会调用它重写的start()方法,最后会调用父类的start()方法,所以该线程会启动执行,因此会执行它的run方法,而run方法正是选举流程的入口,我们看run方法关键源码如下:

所有节点初始状态都为LOOKING,会进入到选举流程,选举流程首先要获取算法,获取算法的方法是makeLEStrategy(),该方法返回的是FastLeaderElection实例,核心选举流程是FastLeaderElection中的lookForLeader()方法。

1:elemMap ConcurrentHashMap<E, Long> 存储的是{SessionImpl: newExpiryTime} Session实例对象,失效时间。2:expiryMap ConcurrentHashMap<Long, Set<E>>存储的是{time: set<SessionImp>} 失效时间,当前失效时间的Session对象集合。3:nextExpirationTime 下一次失效时间 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval 当前系统时间毫秒值ms=System.nanoTime() / 1000000。 nextExpirationTime=当前系统时间毫秒值+expirationInterval(失效间隔)。4:expirationInterval 失效间隔,默认是10s,可以通过sessionlessCnxnTimeout修改。即是通过配置文件的tickTime修改。8

lookForLeader()是选举过程的关键流程,源码分析如下:

上面多个地方都用到了过半数以上的方法hasAllQuorums()该方法用到了QuorumMaj类,代码如下:

QuorumMaj构造函数中体现了过半数以上的操作,代码如下:

3.5.3 投票规则

我们来看一下选票PK的方法totalOrderPredicate(),该方法其实就是Leader选举规则,规则有如下三个:

1:elemMap ConcurrentHashMap<E, Long> 存储的是{SessionImpl: newExpiryTime} Session实例对象,失效时间。2:expiryMap ConcurrentHashMap<Long, Set<E>>存储的是{time: set<SessionImp>} 失效时间,当前失效时间的Session对象集合。3:nextExpirationTime 下一次失效时间 {(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval 当前系统时间毫秒值ms=System.nanoTime() / 1000000。 nextExpirationTime=当前系统时间毫秒值+expirationInterval(失效间隔)。4:expirationInterval 失效间隔,默认是10s,可以通过sessionlessCnxnTimeout修改。即是通过配置文件的tickTime修改。9

源码如下:

4 Zookeeper集群数据同步

所有事务操作都将由leader执行,并且会把数据同步到其他节点,比如follower、observer,我们可以分析leader和follower的操作行为即可分析出数据同步流程。

4.1 Zookeeper同步流程说明

整体流程:

1:FinalRequestProcessor.applyRequest()        方法代码:ProcessTxnResult rc = zks.processTxn(request);2:ZooKeeperServer.processTxn(org.apache.zookeeper.server.Request)        方法代码:processTxnForSessionEvents(request, hdr, request.getTxn());0

我们回到QuorumPeer.run()方法,根据确认的不同角色执行不同操作展开分析。

4.2 Zookeeper Follower同步流程

Follower主要连接Leader实现数据同步,我们看看Follower做的事,我们仍然沿着QuorumPeer.run()展开学习,关键代码如下:

创建Follower的方法比较简单,代码如下:

我们看一下整个Follower在数据同步中做的所有操作follower.followLeader();,源码如下图:

上面源码中的follower.followLeader()方法主要做了如下几件事:

1:FinalRequestProcessor.applyRequest()        方法代码:ProcessTxnResult rc = zks.processTxn(request);2:ZooKeeperServer.processTxn(org.apache.zookeeper.server.Request)        方法代码:processTxnForSessionEvents(request, hdr, request.getTxn());1

我们对follower.followLeader()调用的其他方法进行剖析,其中findLeader()是寻找当前Leader节点的,源代码如下:

followLeader()中调用了registerWithLeader(Leader.FOLLOWERINFO);该方法是向Leader注册Follower,会将当前Follower节点信息发送给Leader节点,Follower节点信息发给Leader是必须的,是Leader同步数据个基础,源码如下:

followLeader()中最后读取数据包执行同步的方法中调用了readPacket(qp);,这个方法就是读取Leader的数据包的封装,源码如下:

4.3 Zookeeper Leader同步流程

我们查看QuorumPeer.run()方法的LEADING部分,可以看到先创建了Leader对象,并设置了Leader,然后调用了leader.lead()leader.lead()是执行的核心业务流程,源码如下:

leader.lead()方法是Leader执行的核心业务流程,源码如下:

leader.lead()方法会执行如下几个操作:

1:FinalRequestProcessor.applyRequest()        方法代码:ProcessTxnResult rc = zks.processTxn(request);2:ZooKeeperServer.processTxn(org.apache.zookeeper.server.Request)        方法代码:processTxnForSessionEvents(request, hdr, request.getTxn());2

lead()方法中会创建LearnerCnxAcceptor,该对象是一个线程,主要用于接收followers的连接,这里加了CountDownLatch根据配置的同步的地址的数量(例如:server.2=127.0.0.1:12881:13881 配置同步的端口是12881只有一个),LearnerCnxAcceptor的run方法源码如下:

LearnerCnxAcceptor的run方法中创建了LearnerCnxAcceptorHandler对象,在接收到链接后,就会调用LearnerCnxAcceptorHandler,而LearnerCnxAcceptorHandler是一个线程,它的run方法中调用了acceptConnections()方法,源码如下:

acceptConnections()方法会在这里阻塞接收followers的连接,当有连接过来会生成一个socket对象。然后根据当前socket生成一个LearnerHandler线程 ,每个Learner者都会开启一个LearnerHandler线程,方法源码如下:

LearnerHandler.run 这里就是读取或写数据包与Learner交换数据包。如果没有数据包读取,则会阻塞当前方法ia.readRecord(qp, "packet");,源码如下:

我们再回到leader.lead()方法,其中调用了getEpochToPropose()方法,该方法是判断connectingFollowers发给leader端的Epoch是否过半,如果过半则会解阻塞,不过半会一直阻塞着,直到Follower把自己的Epoch数据包发送过来并符合过半机制,源码如下:

lead()方法中,当发送的Epoch过半之后,把当前zxid设置到zk,并等待EpochAck,关键源码如下:

waitForEpochAck()方法也会等待超过一半的(Follower和Observer)获取了新的epoch,并且返回了Leader.ACKEPOCH,才会解除阻塞,否则会一直阻塞。等待EpochAck解阻塞后,把得到最新的epoch更新到当前服务,设置当前leader节点的zab状态是SYNCHRONIZATION,方法源码如下:

lead()方法中还需要等待超过一半的(Follower和Observer)进行数据同步成功,并且返回了Leader.ACK,程序才会解除阻塞,如下代码:

上面所有流程都走完之后,就证明数据已经同步成功了,会执行startZkServer();

4.4 LearnerHandler数据同步操作

LearnerHandler线程是对应于Learner连接Leader端后,建立的一个与Learner端交换数据的线程。每一个Learner端都会创建一个 LearnerHandler线程。

我们详细讲解LearnerHandler.run()方法。

readRecord读取数据包 不断从learner节点读数据,如果没读到将会阻塞readRecord

如果数据包类型不是Leader.FOLLOWERINFO或Leader.OBSERVERINFO将会返回,因为咱们这里本身就是Leader节点,读数据肯定是读非Leader节点数据。

获取learnerInfoData来获取sid和版本信息。

获取followerInfo和lastAcceptedEpoch,信息如下:

把Leader.NEWLEADER数据包放入到queuedPackets,并向其他节点发送,源码如下:

往期干货:

怎样才能快速成为一名架构师?

【图文并茂】这次一文讲透JVM架构、类文件结构、字节码结构!!

4859字,609行,一次讲清楚JVM运行数据区

搞不懂JVM的类加载机制,JVM性能优化从何谈起?

当你new了一个对象时,JVM底层都帮你干了啥?

13651个字,给你解释清楚 JVM对象销毁

【开悟篇】Java多线程之JUC从入门到精通

史上最全Spring Cloud Alibaba--Nacos教程(涵盖负载均衡、配置管理、多环境切换、配置共享/刷新、灰度、集群)

100003字,带你解密 双11、618电商大促场景下的系统架构体系

探针技术-JavaAgent 和字节码增强技术-Byte Buddy

本文由传智教育博学谷 - 狂野架构师教研团队发布 如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力 转载请注明出处!

原文:https://juejin.cn/post/7112681401601769486


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/BigData/1309.html