欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
在前面的文章已经分析过了ZK集群数据同步(广播模式)建立通信连接的流程了,本次文章便分析一下建立通信连接的源码层面逻辑。
注:本篇基于ZK版本3.7分析的。
2. 建立通信相关角色及流程图
数据同步建立通信的流程可以看成是古代附属王朝向中央王朝进行朝贡的流程,即各个Follower主动向Leader发送连接请求,Leader接收到各个Follower机器的请求后生成对应的处理器,这些处理器可以看成是中央王朝派遣的接待使。当然这中间肯定是要经过一系列的逻辑确定Leader机器是否是真正合格的Leader,需要得到各个Follower的支持和校验,否则集群将会重新发起选举选出新的Leader,就像古代附属王朝都想篡位自己当皇上天子一样,当然ZK集群比这简单很多。
2.1 相关角色及交互关系图
大致通信流程相关角色及交互关系如下图:
对于其中的流程以及注意点需要特别说明下:
方框的代表普通类对象,角有圆形弧度的说明是线程对象,虚线代表启动线程对象;
QuorumPacket:集群通信的包对象,Leader和Follower之间的通信便是使用QuorumPacket类为载体进行发送与接收的,后面将其称为packet数据包;
Follower角色:在继承关系上会继承Learner,Observer也会继承Learner类,因此从某一角度来看ZK集群中会含有两种角色:Leader和Learner,但是本次只会分析Follower角色;
LearnerCnxAcceptor组件:简单的翻译下这个类名:Learner角色的连接接收器,顾名思义这个多线程类的作用便是用来接收各个Learner角色发送过来的连接请求,并进行相应的处理,从图中可以看出来接收到连接请求后将会使用接收到的Socket创建LearnerHandler线程对象;
LearnerHandler组件:用来维护与Learner的通信关系,负责接收和向Learner发送请求数据包,相当于前面提到过中央王朝(Leader)派遣的接待使,Leader不会直接与Learner进行通信,而是每个都有一个LearnerHandler来维护;
collection correspond:Leader与LearnerHandler之间的通信并不是直接交互的,如果接收到了Learner消息,LearnerHandler将会先放入到集合中,Leader则是直接判断集合数据,但Leader如果要做什么操作(其实只有ping操作,但留在后面再说)将会直接调用LearnerHandler对象的方法。
2.2 建立通信交互流程图
Leader和Follower之间建立通信主要是Socket通信比较多,大部分都是固定的异步即时通信,因此如果没有例子单看流程的话会比较晕,因为该流程的异步通信至少都有10个左右,更不用说如果是中途选举成功,还有一些旧数据需要同步的情况了。
本次建立通信交互流程依旧是原来的机器A、B和C,大致流程图如下:
整个流程较为复杂,因为涉及到两台机器的Socket即时通信,每次接收方法流程都会阻塞,因此需要在整体上了解整个流程,需要注意的是LearnerHandler对象和Follower对象间的Socket通信介质是QuorumPacket集群数据包对象,后面会统称为消息。
从图中可以看出来流程总共有26个,但总体上来看,整个流程总共有7个阶段:
第一阶段:Leader和Follower的准备阶段,如Leader创建LearnerCnxAcceptor对象,Follower开始连接Leader机器地址;
第二阶段:Leader机器验证集群内的FOLLOWERINFO类型消息,确保建立通信集群内确实是有超过半数的机器参与其中的,并且发送LEADERINFO类型消息将Leader机器的信息同步给各个Follower机器;
第三阶段:Leader机器验证集群内的ACKEPOCH类型消息,该消息是为了判断集群内各个机器是否已经收到了LEADERINFO类型消息并处理成功;
第四阶段:Leader机器将本机器日志信息尚未同步给集群内机器的请求再次以PROPOSAL-COMMIT对数据发送给各个Learner机器进行同步,并最终发送NEWLEADER类型消息,确保各个Follower已经完成消息同步并认可当前集群Leader机器;
第五阶段:Leader机器验证集群内的ACK类型消息,确定超过半数的机器是在集群内正常运行的;
第六阶段:Leader机器发送UPTODATE类型消息通知各个Follower可以退出和LearnerHandler对象的循环消息通信,开始准备对外处理请求服务了;
第七阶段:最终阶段,各个机器启动ZooKeeperServer对象准备处理ZK客户端请求,并且Leader进入心跳检测循环,用来监听集群的运行情况;LearnerHandler和Follower进入最终的循环消息通信,以便集群后续接收ZK客户端的请求后可以进行同步请求通信。
Leader和各个Follower建立通信的流程总共经过了26个步骤流程,需要注意的是在上面所说的超过过半机器响应消息是包括Leader机器的,也就是说如果是三台机器A、B和C,C只需要接收到A或者B的响应,再加上C机器本身的调用即可完成验证,具体流程作用前面已经分析过了,这里便忽略。
2.3 Follower和Leader状态同步源码解析
2.4 源码整体流程图
2.5 服务端Leader启动
2.6 服务端Follewer 启动
3. 建立通信连接源码分析
虽然这部分的代码总体不是很多,但碍于对象间的通信过多,因此本次源码分析将和以往的有些不一样,因为建立通信时本人将其分成了七个阶段来分析,因此在分析该流程时同样也会将其分为七个阶段,每个阶段单独分析,这样更便于理解,同时也可以对七个阶段的细节部分做一个详细分析。
建议在跟进源码时同时看上面的图片和阶段流程一起跟进,更便于理解当前进度和位置,以免丢失源码分析思路。
在集群中Follower和Observer统称为Learner,但是由于本次流程分析主要分析Follower,因此在源码中可能会把Learner统称为Follower。
注意:本源码分析和ZK的源码并不是一致的,为了方便理解在不影响总体逻辑的情况下会将一些代码顺序修改。
3.1 第一阶段:Leader和Follower准备工作
Leader和Follower的准备阶段,如Leader去创建LearnerCnxAcceptor对象用来接收其它机器的连接,而Follower等机器开始连接Leader机器的LearnerCnxAcceptor对象用来实例化和本机器对应的LearnerHandler对象。
3.1.1 Leader - 准备阶段
该阶段Leader将会进行加载日志文件以实例化LearnerCnxAcceptor对象等操作。关键源码如下:
public class Leader { // 记录FLE选举流程的开始和结束时间 public long start_fle, end_fle; // 用来接收Follower对象的请求连接对象 LearnerCnxAcceptor cnxAcceptor; // 代表集群对象,包含了集群的配置信息以及投票结果等信息 final QuorumPeer self; void lead() throws IOException, InterruptedException { // 当Leader开始进行领导集群时代表FLE选举流程结束 self.end_fle = System.currentTimeMillis(); // 归零时间记录 self.start_fle = 0; self.end_fle = 0; try { // tick记录归零该值会在最后的心跳检测每个tickTime时间便+1 self.tick = 0; // 加载本机器的日志文件信息,并生成lastProcessedZxid属性 zk.loadData(); // 实例化LearnerCnxAcceptor对象 cnxAcceptor = new LearnerCnxAcceptor(); // 启动线程对象 cnxAcceptor.start(); // L1流程结束,后面略... } }}
3.1.2 Follower - 连接Leader机器
从QuorumPeer对象中获取Leader配置的连接信息并进行连接,源码如下:
public class Follower extends Learner{ // 记录FLE选举流程的开始和结束时间 public long start_fle, end_fle; // 代表集群对象,包含了集群的配置信息以及投票结果等信息 final QuorumPeer self; // 设置同步连接的Socket对象是否禁用延时方式,默认true,Socket的nodelay // 属性设为true的作用便是可以即时的发送小包数据,设置为false则会等待到达 // 一定数据量后才会发送 static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); // 本Follower和Leader通信的Socket对象 protected Socket sock; // 和Leader机器通信的输入输出流对象 protected InputArchive leaderIs; protected OutputArchive leaderOs; protected BufferedOutputStream bufferedOutput; void followLeader() throws InterruptedException { // 当Follower开始跟随领导时代表FLE选举流程结束 self.end_fle = System.currentTimeMillis(); // 归零时间记录 self.start_fle = 0; self.end_fle = 0; try { // 从配置文件配置的信息中心获取Leader的连接信息 InetSocketAddress addr = findLeader(); try { // 开始连接Leader机器 connectToLeader(addr); // 后面略... } } } protected InetSocketAddress findLeader() { InetSocketAddress addr = null; // 从集群对象投票信息中获取当前Leader的信息 Vote current = self.getCurrentVote(); // 从server配置中获取和Leader信息一致的机器连接信息 for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { s.recreateSocketAddresses(); addr = s.addr; break; } } // 返回连接地址对象 return addr; } protected void connectToLeader(InetSocketAddress addr) throws IOException, ConnectException, InterruptedException { // 根据地址连接Leader机器 // 创建Socket对象 sock = new Socket(); // 创建Socket时先设置timeout为tickTime*initLimit,当连接成功后将会 // 设置成tickTime*syncLimit sock.setSoTimeout(self.tickTime * self.initLimit); ... // 使用Socket对象创建对应的输入输出流 leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }}
3.1.3 LearnerHandler - 诞生缘由
前面的Follower已经使用Socket开始连接Leader的SocketServer了,而SocketServer是由LearnerCnxAcceptor对象管理的,因此LearnerHandler对象的诞生便在LearnerCnxAcceptor线程对象中。源码如下:
class LearnerCnxAcceptor extends ZooKeeperThread{ // Leader机器维护的ServerSocket对象,用来接收其它机器发送的Socket连接请求 ServerSocket ss; // 用来标注LearnerCnxAcceptor是否已经被关闭,当Leader调用shutdown方法时 // 将会设置stop属性为true private volatile boolean stop = false; // 设置同步连接的Socket对象是否禁用延时方式,默认true,Socket的nodelay // 属性设为true的作用便是可以即时的发送小包数据,设置为false则会等待到达 // 一定数据量后才会发送 static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException { // LearnerCnxAcceptor对象是Leader的内部类,Leader对象在实例化时将会 // 实例化ServerSocket对象 this.self = self; try { // quorumListenOnAllIPs属性如果为true则会获取端口所有可用地址 if (self.getQuorumListenOnAllIPs()) { ss = new ServerSocket(self.getQuorumAddress().getPort()); } else { // 否则将只使用固定的地址 ss = new ServerSocket(); } ss.setReuseAddress(true); // 使用配置中固定的集群地址 if (!self.getQuorumListenOnAllIPs()) { ss.bind(self.getQuorumAddress()); } } catch (BindException e) { throw e; } this.zk=zk; } @Override public void run() { // 如果该线程对象未暂停则一直轮询 if (!stop.get() && !serverSockets.isEmpty()) { ExecutorService executor = Executors.newFixedThreadPool(serverSockets.size()); CountDownLatch latch = new CountDownLatch(serverSockets.size()); // todo serverSockets.forEach(serverSocket -> executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch))); try { latch.await(); } ... } } }class LearnerCnxAcceptorHandler implements Runnable { private ServerSocket serverSocket; private CountDownLatch latch; @Override public void run() { try { while (!stop.get()) { // todo acceptConnections(); } } ... } private void acceptConnections() throws IOException { Socket socket = null; boolean error = false; try { // ServerSocket接收到从Follower发送过来的连接请求 socket = serverSocket.accept(); // start with the initLimit, once the ack is processed // in LearnerHandler switch to the syncLimit // 首先设置为tickTime*initLimit socket.setSoTimeout(self.tickTime * self.initLimit); // 设置Socket的nodelay属性 socket.setTcpNoDelay(nodelay); BufferedInputStream is = new BufferedInputStream(socket.getInputStream()); // 当接收成功Socket之后便创建维护Socket通信的对象 // LearnerHandler,随后启动该线程对象 LearnerHandler fh = new LearnerHandler(socket, is, Leader.this); fh.start(); } ... }}
3.2 第二阶段:交换Leader和Follower信息
经过第一阶段后Leader已经通过LearnerCnxAcceptor线程对象接收到了Follower连接请求,并实例化了对应Socket对象的维护对象LearnerHandler,接下来便正式开始Leader机器和Follower机器间的建立通信连接流程。
3.2.1 Follower - 发送FOLLOWERINFO处理LEADERINFO消息
当Follower连接Leader机器的对象ServerSocket成功后,本对象将会进入正式的消息发送和接收流程。源码如下:
public class Follower extends Learner{ void followLeader() throws InterruptedException { // 在F1流程已经分析过了,接着续上 connectToLeader(addr); // 连接ServerSocket对象成功后本对象将会向其发送FOLLOWERINFO类型的消息 // 通知Leader本Follower的机器信息,如果是Observer则发送OBSERVERINFO // 类型的消息通知Leader本Observer的机器信息 long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); } protected long registerWithLeader(int pktType) throws IOException{ // 开始F2流程,先获取最新的zxid long lastLoggedZxid = self.getLastLoggedZxid(); // 创建包对象 QuorumPacket qp = new QuorumPacket(); // pktType如果是Follower对象则是FOLLOWERINFO,如果是Observer则是 // OBSERVERINFO类型 qp.setType(pktType); // 根据选举后接收的acceptedEpoch生成发送FOLLOWERINFO类型消息的zxid qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0)); // 生成LearnerInfo对象,需要注意包对象发送了protocolVersion属性 LearnerInfo li = new LearnerInfo(self.getId(), 0x10000); // 生成BinaryOutputArchive对象,并将LeanerInfo对象写入 ByteArrayOutputStream bsid = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); boa.writeRecord(li, "LearnerInfo"); // 将对象的字节数据放入包对象中 qp.setData(bsid.toByteArray()); // 使用leaderOs对象发送包数据,F2流程结束 writePacket(qp, true); // 开始F3流程,接收到了Leader机器的LearnerHandler回复的LEADERINFO类型 // 消息 readPacket(qp); // 获取Leader回复的zxid,并根据zxid获取epoch信息 final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); // 版本相同的情况下这里接收到的肯定是LEADERINFO类型消息 if (qp.getType() == Leader.LEADERINFO) { // 获取Leader回复的leaderProtocolVersion属性信息 leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt(); // 生成等下发送ACKEPOCH类型消息的消息体 byte epochBytes[] = new byte[4]; // 封装需要发送的消息体数据 final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes); // 如果Leader发送过来的epoch大于本机器的acceptedEpoch,则直接发送 // 本机器的currentEpoch属性,并设置acceptedEpoch信息 if (newEpoch > self.getAcceptedEpoch()) { // 放置currentEpoch属性 wrappedEpochBytes.putInt((int)self.getCurrentEpoch()); // 根据epoch信息设置acceptedEpoch属性 self.setAcceptedEpoch(newEpoch); } else if (newEpoch == self.getAcceptedEpoch()) { // 如果相等的说明选举获得的acceptedEpoch是没问题的 wrappedEpochBytes.putInt(-1); } else { // newEpoch是不可能比acceptedEpoch小的,小的话说明流程出现了 // 问题,直接抛出异常结束流程 throw new IOException(); } // 后面略... } else { // 估计是兼容老版本的逻辑,中途不经过ACKEPOCH过程 // 直接发送NEWLEADER类型消息 if (newEpoch > self.getAcceptedEpoch()) { // 如果大于则设置acceptedEpoch信息 self.setAcceptedEpoch(newEpoch); } // 不是NEWLEADER信息则说明老版本流程异常,抛出异常结束流程 if (qp.getType() != Leader.NEWLEADER) { throw new IOException(); } // 返回Leader数据包的zxid return qp.getZxid(); } } }
3.2.2 LearnerHandler - 处理FOLLOWERINFO并发送LEADERINFO消息
当LearnerHandler在LearnerCnxAcceptor对象中被创建之后,便只会一直维护对应的Socket对象,直到发生了异常。接下来便正式开始分析LearnerHandler和Leader对象还有Follower对象的交互流程,源码如下:
public class LearnerHandler extends ZooKeeperThread { // 保存对应Follower的Socket对象 protected final Socket sock; // 本对象对应的Leader对象,以方便获取Leader对象的信息 final Leader leader; // 截止到下次通信的结束时间,在建立通信连接阶段该属性是基于initLimit属性和 // syncLimit属性往上加的,当完成建立通信连接过程后将会以syncLimit属性往上 // 加,一旦超出这个属性就代表本对象维护的Socket对象连接机器已经失联了 volatile long tickOfNextAckDeadline; // 和对应的Follower进行通信的输入输出流对象 private BinaryInputArchive ia; private BinaryOutputArchive oa; private BufferedOutputStream bufferedOutput; // 对应Follower或者Observer对象的唯一表示,相当于Learner机器的myid属性 protected long sid = 0; // 当确认Learner是Follower或者Observer时将会设置成对应的值 private LearnerType learnerType = LearnerType.PARTICIPANT; @Override public void run() { try { // H2流程开始,先初始化tickOfNextAckDeadline属性,由于最开始创建时 // tick为0,因此刚开始tickOfNextAckDeadline属性就是initLimit加上 // syncLimit,但如果Follower是中途加入的,tick则不会是0 tickOfNextAckDeadline = leader.self.tick + leader.self.initLimit + leader.self.syncLimit; // 根据Socket实例化输入输出流对象 ia = BinaryInputArchive.getArchive( new BufferedInputStream(sock.getInputStream())); bufferedOutput = new BufferedOutputStream( sock.getOutputStream()); oa = BinaryOutputArchive.getArchive(bufferedOutput); // 实例化数据包对象 QuorumPacket qp = new QuorumPacket(); // 从F2流程中读取FOLLOWERINFO或者OBSERVERINFO类型消息 ia.readRecord(qp, "packet"); // 如果首先接受到的消息类型不是FOLLOWERINFO或OBSERVERINFO,则说明 // 流程有问题,直接结束LearnerHandler线程对象方法 if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){ return; } // 从包数据中获取learnerInfo的字节数组数据 byte learnerInfoData[] = qp.getData(); if (learnerInfoData != null) { // 如果learnerInfo数据不为空且长度为8则说明为老版本,包数据中 // 不包含protocolVersion版本属性信息,只有sid信息 if (learnerInfoData.length == 8) { // 读取sid信息 ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData); this.sid = bbsid.getLong(); } else { // 如果数据长度不为8则说明为新版本,发送了protocolVersion // 版本属性信息,根据字节数组初始化LearnerInfo对象 LearnerInfo li = new LearnerInfo(); ByteBufferInputStream.byteBuffer2Record( ByteBuffer.wrap(learnerInfoData), li); // 获取sid和protocolVersion版本信息 this.sid = li.getServerid(); this.version = li.getProtocolVersion(); } } else { // 如果learnerInfo信息发送过来的为空,则说明是更老的版本,直接 // 手动+1,在本机模拟对应Learner机器的sid this.sid = leader.followerCounter.getAndDecrement(); } // 设置成OBSERVER类型,但在本次分析中类型是FOLLOWERINFO if (qp.getType() == Leader.OBSERVERINFO) { learnerType = LearnerType.OBSERVER; } long peerLastZxid; StateSummary ss = null; // 获取包数据的zxid long zxid = qp.getZxid(); // 根据包对象中的zxid获取Follower对象的acceptedEpoch信息 long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid( qp.getZxid()); // 调用Leader的验证集群信息交换,验证集群是否有超过半数发送了 // 机器对应epoch和sid信息,调用这个方法就意味着进入了L2流程, // 当超过半数的机器发送了机器信息,将会继续往下走,否则阻塞 long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); if (this.getVersion() < 0x10000) { // 如果是老版本则根据包数据的zxid获取epoch信息 long epoch = ZxidUtils.getEpochFromZxid(zxid); // 生成对应的StateSummary对象 ss = new StateSummary(epoch, zxid); // 进入L3流程,但实际上如果集群的jar包版本都是3.4.8,将不会执行 // 到这里 leader.waitForEpochAck(this.getSid(), ss); } else { // 同版本情况下将会执行到这里 // 生成待发送消息体 byte ver[] = new byte[4]; // 将版本信息放入到消息体中 ByteBuffer.wrap(ver).putInt(0x10000); // 执行到这H2流程结束,开始H3流程 // 根据消息体和获取的最大集群epoch信息生成LEADERINFO类型消息 QuorumPacket newEpochPacket = new QuorumPacket( Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null); // 发送LEADERINFO类型消息,H3流程结束,进入F3流程 oa.writeRecord(newEpochPacket, "packet"); bufferedOutput.flush(); // 后面略... } } }}
3.2.3 Leader - 验证集群信息交换
Leader对象在实例化LearnerCnxAcceptor对象后将会进入验证集群信息交换的状态,用来验证集群内的机器是否已经成功交换完LEADERINFO和FOLLOWERINFO消息。源码如下:
public class Leader { // 记录最后面接收的zxid long lastProposed; // 记录新Leader需要发送的消息 Proposal newLeaderProposal = new Proposal(); // 标识是否需要等待确认新的集群epoch信息,如果为true代表集群响应 // 的FOLLOWERINFO或者OBSERVERINFO信息未过半,需要继续确认;为false // 则代表已经过半,无需再确认 boolean waitingForNewEpoch = true; // 确认FOLLOWERINFO类型消息时用来临时存储本次校验的临时epoch字段 long epoch = -1; // 用来接收各个机器响应FOLLOWERINFO或OBSERVERINFO消息中的sid信息 // 用来记录哪些机器已经发送过FOLLOWERINFO类型消息,作为判断集群 // 响应过半的依据 private HashSet<Long> connectingFollowers = new HashSet<Long>(); void lead() throws IOException, InterruptedException { cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start(); // 续着前面的L1流程,开始进行L2流程 // 传入Leader机器的sid和acceptedEpoch信息,模拟收到了本机器的机器信息 // 在这里会阻塞查询connectingFollowers集合用来判断集群是否有过半的机器 // 响应了FOLLOWERINFO机器信息 long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch()); // 超过半数的机器发送了机器信息到Leader上后将会停止阻塞 // 在这里根据epoch信息设置zxid zk.setZxid(ZxidUtils.makeZxid(epoch, 0)); synchronized(this){ // 记录zxid lastProposed = zk.getZxid(); } // 生成NEWLEADER信息,等待后续发送 newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null); // L2流程结束,后面略... } public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException { synchronized(connectingFollowers) { // 如果waitingForNewEpoch为true则代表需要进行后续的判断 // 否则直接返回epoch临时字段结果 if (!waitingForNewEpoch) { return epoch; } // 如果有机器的acceptedEpoch信息大于现有epoch临时字段则 // 使用新的acceptedEpoch值+1进行赋值 if (lastAcceptedEpoch >= epoch) { epoch = lastAcceptedEpoch+1; } // 保存发送机器信息过来的sid connectingFollowers.add(sid); // 获取集群的资格校验器 QuorumVerifier verifier = self.getQuorumVerifier(); // 判断connectingFollowers集合中是否含有本机器的sid,并且 // 响应的机器数量是否超过半数 if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) { // 进入到这里说明响应机器数量超过半数,将waitingForNewEpoch // 属性设置为false,说明无需再等待新的epoch机器信息 waitingForNewEpoch = false; // 将新的epoch信息设置到acceptedEpoch信息 self.setAcceptedEpoch(epoch); // 唤醒阻塞的线程 connectingFollowers.notifyAll(); } else { // 执行到这里说明响应机器尚未过半,需要阻塞并等待响应过半 // 唤醒本线程 // 记录阻塞开始时间 long start = System.currentTimeMillis(); long cur = start; // 获取等待结束时间,在start时间加上initLimit和tickTime属性 long end = start + self.getInitLimit()*self.getTickTime(); // 一直轮询其它机器的响应情况,直到有其它的线程接收到响应且过半 // 后将waitingForNewEpoch设置为false,或者cur当前时间已经超过 // 了end时间,代表响应已经超时了 while(waitingForNewEpoch && cur < end) { // 一直等待直到end时间节点到来 connectingFollowers.wait(end - cur); // 每次轮询更新cur为当前时间 cur = System.currentTimeMillis(); } // 如果循环结束后waitingForNewEpoch依旧是true则说明在规定时间 // 内集群没有过半的机器响应FOLLOWERINFO信息,说明同步机器epoch // 信息失败,需要抛出中断异常,并开始下一次的选举 if (waitingForNewEpoch) { throw new InterruptedException(); } } // 如果执行到这里说明已经有过半的机器响应了,且获得了集群内最新的 // epoch信息,随后返回 return epoch; } }}
3.3 第三阶段:同步确认Epoch信息
在刚刚那个阶段已经发送同步完各个机器信息的消息,接下来将会进行Epoch信息同步结果校验。
3.3.1 Follower - 发送ACKEPOCH信息
本流程十分简单,只是单纯的向Leader发送ACKEPOCH消息而已,源码如下:
public class Follower extends Learner{ protected long registerWithLeader(int pktType) throws IOException{ // F3的流程,略... if (qp.getType() == Leader.LEADERINFO) { // F3的流程,略... if (newEpoch > self.getAcceptedEpoch()) { // F3的流程,略... } else if (newEpoch == self.getAcceptedEpoch()) // F3的流程,略... } else { // F3的流程,略... } // F3流程结束,F4流程开始 // 直接根据F3流程获取的epoch字节数据响应ACKEPOCH类型消息 QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null); // 发送消息 writePacket(ackNewEpoch, true); // 返回新的epoch信息后F4流程结束 return ZxidUtils.makeZxid(newEpoch, 0); } else { // F3的流程,略... return qp.getZxid(); } } }
3.3.2 LearnerHandler - 处理ACKEPOCH信息
LearnerHandler的流程也同样很简单,接收到ACKEPOCH之后再调用Leader对象的确认ACKEPOCH信息的方法,完成超过半数判断,并继续后续流程。源码如下:
public class LearnerHandler extends ZooKeeperThread { @Override public void run() { // 发送LEADERINFO类型消息,H3流程结束,进入F3流程 oa.writeRecord(newEpochPacket, "packet"); bufferedOutput.flush(); // 续着之前H3流程,开始H4流程 QuorumPacket ackEpochPacket = new QuorumPacket(); // 读取F4流程发送过来的ACKEPOCH类型消息 ia.readRecord(ackEpochPacket, "packet"); // 如果消息类型不是ACKEPOCH则直接退出终止流程 if (ackEpochPacket.getType() != Leader.ACKEPOCH) { return; } // 从ACKEPOCH类型消息中获取epoch数据 ByteBuffer bbepoch = ByteBuffer.wrap( ackEpochPacket.getData()); // 根据epoch和zxid信息实例化StateSummary对象 ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid()); // 随后调用Leader对象的waitForEpochAck()方法,校验是否超过半数机器 // 回应了ACKEPOCH类型消息 leader.waitForEpochAck(this.getSid(), ss); }}
3.3.3 Leader - 验证Epoch信息交换
前面的Follower已经向LeanerHandler发送了ACKEPOCH类型的消息,接下来只需要Leader对象验证集群过半机器响应该类型消息便可以后续的流程。源码如下:
public class Leader { // 记录最后面接收的zxid long lastProposed; // 记录新Leader需要发送的消息 Proposal newLeaderProposal = new Proposal(); // 用来保存发送ACKEPOCH类型消息的机器sid作为判断集群响应机器过半的依据 private HashSet<Long> electingFollowers = new HashSet<Long>(); // 用来判断验证ACKEPOCH类型消息是否完成,如果完成会设置为true,否则为false private boolean electionFinished = false; void lead() throws IOException, InterruptedException { // 生成NEWLEADER信息,等待后续发送 newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null); // 续着L2流程的部分继续分析,L3流程开始 // 使用Leader机器的zxid信息和currentEpoch信息生成StateSummary对象 // 用来模拟本机器发送了ACKEPOCH消息 leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid()); // 在这里会阻塞查询electingFollowers集合,用来判断集群内是否有过半的 // 机器响应了ACKEPOCH信息 waitForEpochAck(self.getId(), leaderStateSummary); // 执行到这里说明集群内已经有过半的机器响应了ACKEPOCH类型消息 self.setCurrentEpoch(epoch); } public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException { synchronized(electingFollowers) { // 如果electionFinished为true说明ACKEPOCH信息校验已经完成,直接 // 退出方法返回 if (electionFinished) { return; } // epoch的初始值为-1,不为-1说明才是正常通信的 if (ss.getCurrentEpoch() != -1) { // isMoreRecentThan()方法会判断Follower响应ACKEPOCH消息 // 中的epoch等信息是否大于Leader的,如果大于则说明不符合正常 // 流程,抛出异常结束流程。正常流程Leader的epoch、zxid等肯定是 // 大于或等于Follower的 if (ss.isMoreRecentThan(leaderStateSummary)) { throw new IOException(); } // 如果ACKEPOCH响应的消息没有问题则将其添加到 // electingFollowers集合中,代表该机器已经响应成功 electingFollowers.add(id); } QuorumVerifier verifier = self.getQuorumVerifier(); if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) { // Leader的sid在集合内且响应的机器过半则会执行到这里 // 设置electionFinished属性为true,说明ACKEPOCH信息验证完成 electionFinished = true; // 唤醒阻塞的线程 electingFollowers.notifyAll(); } else { // 执行到这里说明响应机器尚未过半,需要阻塞并等待响应过半 // 唤醒本线程 // 记录阻塞开始时间 long start = System.currentTimeMillis(); long cur = start; // 获取等待结束时间,在start时间加上initLimit和tickTime属性 long end = start + self.getInitLimit()*self.getTickTime(); // 一直轮询其它机器的响应情况,直到有其它的线程接收到响应且过半 // 后将electionFinished设置为true,或者cur当前时间已经超过 // 了end时间,代表响应已经超时了 while(!electionFinished && cur < end) { // 一直等待直到end时间节点到来 electingFollowers.wait(end - cur); // 每次轮询更新cur为当前时间 cur = System.currentTimeMillis(); } // 如果循环结束后electionFinished依旧是false则说明在规定时间 // 内集群没有过半的机器响应ACKEPOCH类型信息,说明同步机器响应 // 信息失败,需要抛出中断异常,并开始下一次的选举 if (!electionFinished) { throw new InterruptedException(); } } } }}
3.4 第四阶段:同步日志数据
第四阶段的源码稍微长点,因为需要将以前的日志信息同步给Follower,但这个阶段Leader并不会参与其中。
3.4.1 LearnerHandler - 发送本机需同步日志数据
在本阶段中将会一次性分析完H5、H6和H7流程,会在代码中特别注释流程的开始和结束。源码如下:
public class LearnerHandler extends ZooKeeperThread { // 需要发送的包数据对象都会保存在这个集合中,直到线程对象轮询发送出去 final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>(); @Override public void run() { // H4流程结束的节点 leader.waitForEpochAck(this.getSid(), ss); // H5流程开始,需要注意的是在流程中zxid对象指的是Follower的zxid // newEpoch对象是经过协商后的集群epoch信息 // 经过前面ACKEPOCH信息后可以确认现在的zxid和epoch都是最新的,赋值给 // 集群最新zxid对象 peerLastZxid = ss.getLastZxid(); // 等下将要发送的数据包类型默认是SNAP类型 int packetToSend = Leader.SNAP; long zxidToSend = 0; long leaderLastZxid = 0; long updates = peerLastZxid; // todo boolean needSnap = syncFollower(peerLastZxid, learnerMaster); // 前面H5流程结束,从现在开始H6流程 // 使用新的epoch信息生成最新的NEWLEADER类型消息 // 如果是老版本则直接发送NEWLEADER类型信息 if (getVersion() < 0x10000) { QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null); oa.writeRecord(newLeaderQP, "packet"); } else { QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null); // 如果是3.7版本则添加到queuedPackets集合准备发送 queuedPackets.add(newLeaderQP); } bufferedOutput.flush(); // Start thread that blast packets in the queue to learner // 前面H6流程结束,开始H7流程 // 创建并启动线程对象,用来轮询queuedPackets集合,将需要发送的包对象 // 发送给维护的对应Follower对象 startSendingPackets(); ... } boolean syncFollower(long peerLastZxid, LearnerMaster learnerMaster) { boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0; // Keep track of the latest zxid which already queued long currentZxid = peerLastZxid; boolean needSnap = true; ZKDatabase db = learnerMaster.getZKDatabase(); boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled(); // 获取日志读锁,并进行上锁操作 ReentrantReadWriteLock lock = db.getLogLock(); ReadLock rl = lock.readLock(); try { rl.lock(); // 获取Leader的minCommittedLog-maxCommittedLog提交日志区间 // Leader的日志一定是最新的,如果上一代集群因为某些原因导致重新选举 // Leader的日志也一定是最多的,因为只有日志最多对的机器才能当上 // Leader,minCommittedLog-maxCommittedLog缓存区间为500 // 当然上述是正常情况,异常情况Follower接收到请求未能同步成功但是 // 有记录下来了,此时Follower的zxid会比Leader可能要大 long maxCommittedLog = db.getmaxCommittedLog(); long minCommittedLog = db.getminCommittedLog(); // 获取Leader的日志提交对象信息 long lastProcessedZxid = db.getDataTreeLastProcessedZxid(); LOG.info("Synchronizing with Learner sid: {} maxCommittedLog=0x{}" + " minCommittedLog=0x{} lastProcessedZxid=0x{}" + " peerLastZxid=0x{}", getSid(), Long.toHexString(maxCommittedLog), Long.toHexString(minCommittedLog), Long.toHexString(lastProcessedZxid), Long.toHexString(peerLastZxid)); if (db.getCommittedLog().isEmpty()) { /* * It is possible that committedLog is empty. In that case * setting these value to the latest txn in learnerMaster db * will reduce the case that we need to handle * * Here is how each case handle by the if block below * 1. lastProcessZxid == peerZxid -> Handle by (2) * 2. lastProcessZxid < peerZxid -> Handle by (3) * 3. lastProcessZxid > peerZxid -> Handle by (5) */ minCommittedLog = lastProcessedZxid; maxCommittedLog = lastProcessedZxid; } if (forceSnapSync) { // Force learnerMaster to use snapshot to sync with follower LOG.warn("Forcing snapshot sync - should not see this in production"); // 如果通过集群交换后的最新zxid和Leader日志文件最大的zxid是一致的 // 则说明经过重新选举之后没有数据丢失,无需同步。集群第一次启动时 // peerLastZxid和日志信息lastProcessedZxid属性肯定是相同的 } else if (lastProcessedZxid == peerLastZxid) { // Follower is already sync with us, send empty diff LOG.info( "Sending DIFF zxid=0x{} for peer sid: {}", Long.toHexString(peerLastZxid), getSid()); // 如果需要同步的消息为空,则发送DIFF类型日志 // 将最新的zxid赋值给待发送zxid对象 queueOpPacket(Leader.DIFF, peerLastZxid); needOpPacket = false; needSnap = false; } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) { // Newer than committedLog, send trunc and done LOG.debug( "Sending TRUNC to follower zxidToSend=0x{} for peer sid:{}", Long.toHexString(maxCommittedLog), getSid()); // 如果Follower机器的最新zxid比Leader的大,则直接发送 // TRUNC类型消息通知Follower需要把多余的部分截掉 queueOpPacket(Leader.TRUNC, maxCommittedLog); // 记录需要截掉的临界点 currentZxid = maxCommittedLog; needOpPacket = false; needSnap = false; // 如果集群的zxid和日志lastProcessedZxid不相等则判断信息的zxid准备需要同步的信息 } else if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) { // Follower is within commitLog range LOG.info("Using committedLog for peer sid: {}", getSid()); Iterator<Proposal> itr = db.getCommittedLog().iterator(); // 进入到这里面说明集群当前的zxid小于Leader日志信息的最大 // zxid,peerLastZxid-maxCommittedLog这个区间的请求信息 // 需要同步给其它的Follower机器 currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog); needSnap = false; } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) { Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit); if (txnLogItr.hasNext()) { LOG.info("Use txnlog and committedLog for peer sid: {}", getSid()); currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog); if (currentZxid < minCommittedLog) { queuedPackets.clear(); needOpPacket = true; } else { LOG.debug("Queueing committedLog 0x{}", Long.toHexString(currentZxid)); Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator(); currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog); needSnap = false; } } // closing the resources if (txnLogItr instanceof TxnLogProposalIterator) { TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr; txnProposalItr.close(); } } else { } if (needSnap) { currentZxid = db.getDataTreeLastProcessedZxid(); } // 将Leader运行期间接收到的请求从updates起的zxid一起同步给 // Follower,前面几个if判断同步的是日志文件中的历史请求信息,而 // startForwarding()方法则是将Leader最新接收且尚未处理完的转发 // 给对应的Follower leaderLastZxid = learnerMaster.startForwarding(this, currentZxid); } finally { // 读锁解锁 rl.unlock(); } ... return needSnap; } protected long queueCommittedProposals(Iterator<Proposal> itr, long peerLastZxid, Long maxZxid, Long lastCommittedZxid) { boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0; long queuedZxid = peerLastZxid; // as we look through proposals, this variable keeps track of previous // proposal Id. // 设置上次同步消息的prevProposalZxid long prevProposalZxid = -1; // 开始遍历待同步对象数组 while (itr.hasNext()) { Proposal propose = itr.next(); long packetZxid = propose.packet.getZxid(); // abort if we hit the limit if ((maxZxid != null) && (packetZxid > maxZxid)) { break; } // skip the proposals the peer already has // 这个判断是为了去掉区间 // minCommittedLog-peerLastZxid的消息 if (packetZxid < peerLastZxid) { prevProposalZxid = packetZxid; continue; } // If we are sending the first packet, figure out whether to trunc // or diff // 针对首个需要同步的消息做些额外处理 if (needOpPacket) { // Send diff when we see the follower's zxid in our history if (packetZxid == peerLastZxid) { LOG.info( "Sending DIFF zxid=0x{} for peer sid: {}", Long.toHexString(lastCommittedZxid), getSid()); queueOpPacket(Leader.DIFF, lastCommittedZxid); // 处理完之后便设置为false,后面将不会进入 needOpPacket = false; continue; } if (isPeerNewEpochZxid) { // Send diff and fall through if zxid is of a new-epoch LOG.info( "Sending DIFF zxid=0x{} for peer sid: {}", Long.toHexString(lastCommittedZxid), getSid()); queueOpPacket(Leader.DIFF, lastCommittedZxid); needOpPacket = false; } else if (packetZxid > peerLastZxid) { // Peer have some proposals that the learnerMaster hasn't seen yet // it may used to be a leader if (ZxidUtils.getEpochFromZxid(packetZxid) != ZxidUtils.getEpochFromZxid(peerLastZxid)) { // We cannot send TRUNC that cross epoch boundary. // The learner will crash if it is asked to do so. // We will send snapshot this those cases. LOG.warn("Cannot send TRUNC to peer sid: " + getSid() + " peer zxid is from different epoch"); return queuedZxid; } LOG.info( "Sending TRUNC zxid=0x{} for peer sid: {}", Long.toHexString(prevProposalZxid), getSid()); // 如果请求Leader没有则发送TRUNC类型消息 // 将那些日志请求信息都给截掉 // 因为该Follower后面都是需要截掉的消息 // 因此需要发送的zxid到此为止 queueOpPacket(Leader.TRUNC, prevProposalZxid); needOpPacket = false; } } if (packetZxid <= queuedZxid) { // We can get here, if we don't have op packet to queue // or there is a duplicate txn in a given iterator continue; } // Since this is already a committed proposal, we need to follow // it by a commit packet // 将PROPOSAL类型消息先放入待发送集合 // queuedPackets中 queuePacket(propose.packet); // 接着创建一个COMMIT类型消息放入queuedPackets // 集合中,形成PROPOSAL-COMMIT消息对 queueOpPacket(Leader.COMMIT, packetZxid); queuedZxid = packetZxid; } ... return queuedZxid; } protected void startSendingPackets() { if (!sendingThreadStarted) { // Start sending packets new Thread() { public void run() { // 设置线程名称 Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress()); try { // 调用方法轮询queuedPackets集合发送包数据 sendPackets(); } catch (InterruptedException e) { LOG.warn("Unexpected interruption", e); } } }.start(); sendingThreadStarted = true; } else { LOG.error("Attempting to start sending thread after it already started"); } }}
3.4.2 Follower - 接收保存同步日志数据
前面一口气分析完了LearnerHandler的H5、H6和H7流程,接下来便开始分析F5和F6流程。源码如下:
public class Follower extends Learner{ // 记录FLE选举流程的开始和结束时间 public long start_fle, end_fle; // 代表集群对象,包含了集群的配置信息以及投票结果等信息 final QuorumPeer self; // 设置同步连接的Socket对象是否禁用延时方式,默认true,Socket的nodelay // 属性设为true的作用便是可以即时的发送小包数据,设置为false则会等待到达 // 一定数据量后才会发送 static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); // 本Follower和Leader通信的Socket对象 protected Socket sock; // 和Leader机器通信的输入输出流对象 protected InputArchive leaderIs; protected OutputArchive leaderOs; protected BufferedOutputStream bufferedOutput; void followLeader() throws InterruptedException { // 当Follower开始跟随领导时代表FLE选举流程结束 self.end_fle = System.currentTimeMillis(); // 归零时间记录 self.start_fle = 0; self.end_fle = 0; try { // 从配置文件配置的信息中心获取Leader的连接信息 InetSocketAddress addr = findLeader(); try { // 开始连接Leader机器 connectToLeader(addr); // 后面略... } } } protected InetSocketAddress findLeader() { InetSocketAddress addr = null; // 从集群对象投票信息中获取当前Leader的信息 Vote current = self.getCurrentVote(); // 从server配置中获取和Leader信息一致的机器连接信息 for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { s.recreateSocketAddresses(); addr = s.addr; break; } } // 返回连接地址对象 return addr; } protected void connectToLeader(InetSocketAddress addr) throws IOException, ConnectException, InterruptedException { // 根据地址连接Leader机器 // 创建Socket对象 sock = new Socket(); // 创建Socket时先设置timeout为tickTime*initLimit,当连接成功后将会 // 设置成tickTime*syncLimit sock.setSoTimeout(self.tickTime * self.initLimit); ... // 使用Socket对象创建对应的输入输出流 leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }}0
3.5 第五阶段:响应ACK消息
经过前面的四个阶段后Leader已经将需要同步的消息发送给对应的Follower且Follower也已经进行过处理了,之后便需要各Follower机器进行ACK响应,说明本流程已经结束。
3.5.1 Follower - ACK确认
Follower直接发送ACK响应。源码如下:
public class Follower extends Learner{ // 记录FLE选举流程的开始和结束时间 public long start_fle, end_fle; // 代表集群对象,包含了集群的配置信息以及投票结果等信息 final QuorumPeer self; // 设置同步连接的Socket对象是否禁用延时方式,默认true,Socket的nodelay // 属性设为true的作用便是可以即时的发送小包数据,设置为false则会等待到达 // 一定数据量后才会发送 static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); // 本Follower和Leader通信的Socket对象 protected Socket sock; // 和Leader机器通信的输入输出流对象 protected InputArchive leaderIs; protected OutputArchive leaderOs; protected BufferedOutputStream bufferedOutput; void followLeader() throws InterruptedException { // 当Follower开始跟随领导时代表FLE选举流程结束 self.end_fle = System.currentTimeMillis(); // 归零时间记录 self.start_fle = 0; self.end_fle = 0; try { // 从配置文件配置的信息中心获取Leader的连接信息 InetSocketAddress addr = findLeader(); try { // 开始连接Leader机器 connectToLeader(addr); // 后面略... } } } protected InetSocketAddress findLeader() { InetSocketAddress addr = null; // 从集群对象投票信息中获取当前Leader的信息 Vote current = self.getCurrentVote(); // 从server配置中获取和Leader信息一致的机器连接信息 for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { s.recreateSocketAddresses(); addr = s.addr; break; } } // 返回连接地址对象 return addr; } protected void connectToLeader(InetSocketAddress addr) throws IOException, ConnectException, InterruptedException { // 根据地址连接Leader机器 // 创建Socket对象 sock = new Socket(); // 创建Socket时先设置timeout为tickTime*initLimit,当连接成功后将会 // 设置成tickTime*syncLimit sock.setSoTimeout(self.tickTime * self.initLimit); ... // 使用Socket对象创建对应的输入输出流 leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }}1
3.5.2 LearnerHandler - 接收ACK确认
当LearnerHandler接收到对应的Follower响应的ACK消息后将会进行相应的处理,并让Leader进行是否过半的校验。源码如下:
public class Follower extends Learner{ // 记录FLE选举流程的开始和结束时间 public long start_fle, end_fle; // 代表集群对象,包含了集群的配置信息以及投票结果等信息 final QuorumPeer self; // 设置同步连接的Socket对象是否禁用延时方式,默认true,Socket的nodelay // 属性设为true的作用便是可以即时的发送小包数据,设置为false则会等待到达 // 一定数据量后才会发送 static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); // 本Follower和Leader通信的Socket对象 protected Socket sock; // 和Leader机器通信的输入输出流对象 protected InputArchive leaderIs; protected OutputArchive leaderOs; protected BufferedOutputStream bufferedOutput; void followLeader() throws InterruptedException { // 当Follower开始跟随领导时代表FLE选举流程结束 self.end_fle = System.currentTimeMillis(); // 归零时间记录 self.start_fle = 0; self.end_fle = 0; try { // 从配置文件配置的信息中心获取Leader的连接信息 InetSocketAddress addr = findLeader(); try { // 开始连接Leader机器 connectToLeader(addr); // 后面略... } } } protected InetSocketAddress findLeader() { InetSocketAddress addr = null; // 从集群对象投票信息中获取当前Leader的信息 Vote current = self.getCurrentVote(); // 从server配置中获取和Leader信息一致的机器连接信息 for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { s.recreateSocketAddresses(); addr = s.addr; break; } } // 返回连接地址对象 return addr; } protected void connectToLeader(InetSocketAddress addr) throws IOException, ConnectException, InterruptedException { // 根据地址连接Leader机器 // 创建Socket对象 sock = new Socket(); // 创建Socket时先设置timeout为tickTime*initLimit,当连接成功后将会 // 设置成tickTime*syncLimit sock.setSoTimeout(self.tickTime * self.initLimit); ... // 使用Socket对象创建对应的输入输出流 leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }}2
3.5.3 Leader - 验证ACK信息
集群中的Follower响应了ACK类型消息,LearnerHandler接收后将会调用Leader的方法进行响应过半校验。源码如下:
public class Follower extends Learner{ // 记录FLE选举流程的开始和结束时间 public long start_fle, end_fle; // 代表集群对象,包含了集群的配置信息以及投票结果等信息 final QuorumPeer self; // 设置同步连接的Socket对象是否禁用延时方式,默认true,Socket的nodelay // 属性设为true的作用便是可以即时的发送小包数据,设置为false则会等待到达 // 一定数据量后才会发送 static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); // 本Follower和Leader通信的Socket对象 protected Socket sock; // 和Leader机器通信的输入输出流对象 protected InputArchive leaderIs; protected OutputArchive leaderOs; protected BufferedOutputStream bufferedOutput; void followLeader() throws InterruptedException { // 当Follower开始跟随领导时代表FLE选举流程结束 self.end_fle = System.currentTimeMillis(); // 归零时间记录 self.start_fle = 0; self.end_fle = 0; try { // 从配置文件配置的信息中心获取Leader的连接信息 InetSocketAddress addr = findLeader(); try { // 开始连接Leader机器 connectToLeader(addr); // 后面略... } } } protected InetSocketAddress findLeader() { InetSocketAddress addr = null; // 从集群对象投票信息中获取当前Leader的信息 Vote current = self.getCurrentVote(); // 从server配置中获取和Leader信息一致的机器连接信息 for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { s.recreateSocketAddresses(); addr = s.addr; break; } } // 返回连接地址对象 return addr; } protected void connectToLeader(InetSocketAddress addr) throws IOException, ConnectException, InterruptedException { // 根据地址连接Leader机器 // 创建Socket对象 sock = new Socket(); // 创建Socket时先设置timeout为tickTime*initLimit,当连接成功后将会 // 设置成tickTime*syncLimit sock.setSoTimeout(self.tickTime * self.initLimit); ... // 使用Socket对象创建对应的输入输出流 leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }}3
3.6 第六阶段:通知Follower更新状态
当第五阶段结束后LeaderHandler将会发送通知对应的Follower流程结束,更新机器状态的消息。
3.6.1 LearnerHandler - 发送UPTODATE消息
本次续着前面的H8流程,分析一下H9流程,LearnerHandler对象向对应的Follower发送UPTODATE类型消息的过程。源码如下:
public class Follower extends Learner{ // 记录FLE选举流程的开始和结束时间 public long start_fle, end_fle; // 代表集群对象,包含了集群的配置信息以及投票结果等信息 final QuorumPeer self; // 设置同步连接的Socket对象是否禁用延时方式,默认true,Socket的nodelay // 属性设为true的作用便是可以即时的发送小包数据,设置为false则会等待到达 // 一定数据量后才会发送 static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); // 本Follower和Leader通信的Socket对象 protected Socket sock; // 和Leader机器通信的输入输出流对象 protected InputArchive leaderIs; protected OutputArchive leaderOs; protected BufferedOutputStream bufferedOutput; void followLeader() throws InterruptedException { // 当Follower开始跟随领导时代表FLE选举流程结束 self.end_fle = System.currentTimeMillis(); // 归零时间记录 self.start_fle = 0; self.end_fle = 0; try { // 从配置文件配置的信息中心获取Leader的连接信息 InetSocketAddress addr = findLeader(); try { // 开始连接Leader机器 connectToLeader(addr); // 后面略... } } } protected InetSocketAddress findLeader() { InetSocketAddress addr = null; // 从集群对象投票信息中获取当前Leader的信息 Vote current = self.getCurrentVote(); // 从server配置中获取和Leader信息一致的机器连接信息 for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { s.recreateSocketAddresses(); addr = s.addr; break; } } // 返回连接地址对象 return addr; } protected void connectToLeader(InetSocketAddress addr) throws IOException, ConnectException, InterruptedException { // 根据地址连接Leader机器 // 创建Socket对象 sock = new Socket(); // 创建Socket时先设置timeout为tickTime*initLimit,当连接成功后将会 // 设置成tickTime*syncLimit sock.setSoTimeout(self.tickTime * self.initLimit); ... // 使用Socket对象创建对应的输入输出流 leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }}4
3.6.2 Follower - 接收处理UPTODATE消息
LeanerHandler发送UPTODATE类型消息后对应的Follower将会收到并进行处理。源码如下:
public class Follower extends Learner{ // 记录FLE选举流程的开始和结束时间 public long start_fle, end_fle; // 代表集群对象,包含了集群的配置信息以及投票结果等信息 final QuorumPeer self; // 设置同步连接的Socket对象是否禁用延时方式,默认true,Socket的nodelay // 属性设为true的作用便是可以即时的发送小包数据,设置为false则会等待到达 // 一定数据量后才会发送 static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); // 本Follower和Leader通信的Socket对象 protected Socket sock; // 和Leader机器通信的输入输出流对象 protected InputArchive leaderIs; protected OutputArchive leaderOs; protected BufferedOutputStream bufferedOutput; void followLeader() throws InterruptedException { // 当Follower开始跟随领导时代表FLE选举流程结束 self.end_fle = System.currentTimeMillis(); // 归零时间记录 self.start_fle = 0; self.end_fle = 0; try { // 从配置文件配置的信息中心获取Leader的连接信息 InetSocketAddress addr = findLeader(); try { // 开始连接Leader机器 connectToLeader(addr); // 后面略... } } } protected InetSocketAddress findLeader() { InetSocketAddress addr = null; // 从集群对象投票信息中获取当前Leader的信息 Vote current = self.getCurrentVote(); // 从server配置中获取和Leader信息一致的机器连接信息 for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { s.recreateSocketAddresses(); addr = s.addr; break; } } // 返回连接地址对象 return addr; } protected void connectToLeader(InetSocketAddress addr) throws IOException, ConnectException, InterruptedException { // 根据地址连接Leader机器 // 创建Socket对象 sock = new Socket(); // 创建Socket时先设置timeout为tickTime*initLimit,当连接成功后将会 // 设置成tickTime*syncLimit sock.setSoTimeout(self.tickTime * self.initLimit); ... // 使用Socket对象创建对应的输入输出流 leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }}5
3.7 第七阶段:循环常态化处理
经过前面六个阶段的通信,Leader终于把需要同步的信息同步完成,并且也完成了集群内各个Follower和Leader的zxid、epoch信息同步,确定了超过半数的机器可用等等操作,接下来只需要维持当前的通信状态即可。
3.7.1 Leader - 启动ZooKeeperServer并进入心跳检测
从Leader开始分析其进入循环常态化的代码,这部分代码的作用就是关于启动ZooKeeperServer对象和维持集群内机器的心跳检测。关键源码如下:
public class Follower extends Learner{ // 记录FLE选举流程的开始和结束时间 public long start_fle, end_fle; // 代表集群对象,包含了集群的配置信息以及投票结果等信息 final QuorumPeer self; // 设置同步连接的Socket对象是否禁用延时方式,默认true,Socket的nodelay // 属性设为true的作用便是可以即时的发送小包数据,设置为false则会等待到达 // 一定数据量后才会发送 static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); // 本Follower和Leader通信的Socket对象 protected Socket sock; // 和Leader机器通信的输入输出流对象 protected InputArchive leaderIs; protected OutputArchive leaderOs; protected BufferedOutputStream bufferedOutput; void followLeader() throws InterruptedException { // 当Follower开始跟随领导时代表FLE选举流程结束 self.end_fle = System.currentTimeMillis(); // 归零时间记录 self.start_fle = 0; self.end_fle = 0; try { // 从配置文件配置的信息中心获取Leader的连接信息 InetSocketAddress addr = findLeader(); try { // 开始连接Leader机器 connectToLeader(addr); // 后面略... } } } protected InetSocketAddress findLeader() { InetSocketAddress addr = null; // 从集群对象投票信息中获取当前Leader的信息 Vote current = self.getCurrentVote(); // 从server配置中获取和Leader信息一致的机器连接信息 for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { s.recreateSocketAddresses(); addr = s.addr; break; } } // 返回连接地址对象 return addr; } protected void connectToLeader(InetSocketAddress addr) throws IOException, ConnectException, InterruptedException { // 根据地址连接Leader机器 // 创建Socket对象 sock = new Socket(); // 创建Socket时先设置timeout为tickTime*initLimit,当连接成功后将会 // 设置成tickTime*syncLimit sock.setSoTimeout(self.tickTime * self.initLimit); ... // 使用Socket对象创建对应的输入输出流 leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }}6
3.7.2 LearnerHandler - 循环接收对应Follower消息
接下来为LearnerHandler的循环常态化代码,起作用就是维持Leader和各个Learner机器的通信。源码如下:
public class Follower extends Learner{ // 记录FLE选举流程的开始和结束时间 public long start_fle, end_fle; // 代表集群对象,包含了集群的配置信息以及投票结果等信息 final QuorumPeer self; // 设置同步连接的Socket对象是否禁用延时方式,默认true,Socket的nodelay // 属性设为true的作用便是可以即时的发送小包数据,设置为false则会等待到达 // 一定数据量后才会发送 static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); // 本Follower和Leader通信的Socket对象 protected Socket sock; // 和Leader机器通信的输入输出流对象 protected InputArchive leaderIs; protected OutputArchive leaderOs; protected BufferedOutputStream bufferedOutput; void followLeader() throws InterruptedException { // 当Follower开始跟随领导时代表FLE选举流程结束 self.end_fle = System.currentTimeMillis(); // 归零时间记录 self.start_fle = 0; self.end_fle = 0; try { // 从配置文件配置的信息中心获取Leader的连接信息 InetSocketAddress addr = findLeader(); try { // 开始连接Leader机器 connectToLeader(addr); // 后面略... } } } protected InetSocketAddress findLeader() { InetSocketAddress addr = null; // 从集群对象投票信息中获取当前Leader的信息 Vote current = self.getCurrentVote(); // 从server配置中获取和Leader信息一致的机器连接信息 for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { s.recreateSocketAddresses(); addr = s.addr; break; } } // 返回连接地址对象 return addr; } protected void connectToLeader(InetSocketAddress addr) throws IOException, ConnectException, InterruptedException { // 根据地址连接Leader机器 // 创建Socket对象 sock = new Socket(); // 创建Socket时先设置timeout为tickTime*initLimit,当连接成功后将会 // 设置成tickTime*syncLimit sock.setSoTimeout(self.tickTime * self.initLimit); ... // 使用Socket对象创建对应的输入输出流 leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }}7
3.7.3 Follower- 循环接收对应LearnerHandler消息
接下来为Follower执行F9和F10流程,开始启动ZooKeeperServer对象以及进入循环常态化来接收LearnerHandler发送过来的消息。源码如下:
public class Follower extends Learner{ // 记录FLE选举流程的开始和结束时间 public long start_fle, end_fle; // 代表集群对象,包含了集群的配置信息以及投票结果等信息 final QuorumPeer self; // 设置同步连接的Socket对象是否禁用延时方式,默认true,Socket的nodelay // 属性设为true的作用便是可以即时的发送小包数据,设置为false则会等待到达 // 一定数据量后才会发送 static final private boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); // 本Follower和Leader通信的Socket对象 protected Socket sock; // 和Leader机器通信的输入输出流对象 protected InputArchive leaderIs; protected OutputArchive leaderOs; protected BufferedOutputStream bufferedOutput; void followLeader() throws InterruptedException { // 当Follower开始跟随领导时代表FLE选举流程结束 self.end_fle = System.currentTimeMillis(); // 归零时间记录 self.start_fle = 0; self.end_fle = 0; try { // 从配置文件配置的信息中心获取Leader的连接信息 InetSocketAddress addr = findLeader(); try { // 开始连接Leader机器 connectToLeader(addr); // 后面略... } } } protected InetSocketAddress findLeader() { InetSocketAddress addr = null; // 从集群对象投票信息中获取当前Leader的信息 Vote current = self.getCurrentVote(); // 从server配置中获取和Leader信息一致的机器连接信息 for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { s.recreateSocketAddresses(); addr = s.addr; break; } } // 返回连接地址对象 return addr; } protected void connectToLeader(InetSocketAddress addr) throws IOException, ConnectException, InterruptedException { // 根据地址连接Leader机器 // 创建Socket对象 sock = new Socket(); // 创建Socket时先设置timeout为tickTime*initLimit,当连接成功后将会 // 设置成tickTime*syncLimit sock.setSoTimeout(self.tickTime * self.initLimit); ... // 使用Socket对象创建对应的输入输出流 leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }}8
至此,建立通信连接的整个26个流程便分析完毕,接下来的流程便是ZK集群接收ZK客户端的请求并在集群内进行同步的操作了,这个留在下篇文章进行分析,至此告一段落。
参考文章
zookeeper3.7版本github源码注释分析## zk源码分析系列Zookeeper原理和源码学习系列\ Zookeeper学习系列\ Zookeeper源码系列
原文:https://juejin.cn/post/7100931496461991973