1. 源码剖析
说明:
从之前服务器启动的源码中,我们得知,服务器最终注册了一个 Accept 事件等待客户端的连接。NioServerSocketChannel 将自己注册到了 boss 单例线程池(reactor 线程)上,也就是 EventLoop
EventLoop 的作用是一个死循环,而这个循环中做 3 件事情:
有条件的等待 Nio 事件。
处理 Nio 事件。
处理消息队列中的任务。
仍用前面的项目来分析:进入到 NioEventLoop 源码中后,在 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) 方法开始调试最终我们要分析到 AbstractNioChannel 的 doBeginRead 方法, 当到这个方法时,针对于这个客户端的连接就完成了,接下来就可以监听读事件了
2. 源码分析过程
断点位置 NioEventLoop 的如下方法 processSelectedKey
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); //断点位置 }
执行浏览器 http://localhost:8007/, 客户端发出请求
从的断点我们可以看到, readyOps 是 16 ,也就是 Accept 事件。说明浏览器的请求已经进来了
这个 unsafe 是 boss 线程中 NioServerSocketChannel 的 AbstractNioMessageChannel $ NioMessageUnsafe 对象。进入到 AbstractNioMessageChannel $ NioMessageUnsafe 的 read 方法中
read 方法代码并分析
@Overridepublic void read() { // 1.检查改eventLoop是否是当前线程 assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { // todo 2.执行doReadMessages方法,传入一个readBuf变量(list容器) int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (continueReading(allocHandle)); } catch (Throwable t) { exception = t; } int size = readBuf.size(); // 3. 循环容器,执行 pipeline.fireChannelRead()方法 for (int i = 0; i < size; i ++) { readPending = false; // todo pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); ... } ...}
说明:
检查该 eventloop 线程是否是当前线程。assert eventLoop().inEventLoop()
执行 doReadMessages 方法
,并传入一个 readBuf 变量,这个变量是一个 List,也就是容器。
循环容器,执行 pipeline.fireChannelRead(readBuf.get(i));
doReadMessages 是读取 boss 线程中的 NioServerSocketChannel 接受到的请求。并把这些请求放进容器,一会我们 debug 下 doReadMessages 方法
.
循环遍历 容器中的所有请求,调用 pipeline 的 fireChannelRead 方法,用于处理这些接受的请求或者其他事件,在 read 方法中,循环调用 ServerSocket 的 pipeline 的 fireChannelRead 方法, 开始执行 管道中的handler 的 ChannelRead 方法(debug 进入)
doReadMessages 方法
//接收客户端的请求会调用到的方法@Overrideprotected int doReadMessages(List<Object> buf) throws Exception { //调用java底层的accept方法 SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { //可以看出这里新建了个NioSocketChannel对象,并把当前对象 当成新创建对象的父对象 buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0;}
说明:
通过工具类,调用 NioServerSocketChannel 内部封装的 serverSocketChannel 的 accept 方法,这是 Nio 做法。
获取到一个 JDK 的 SocketChannel,然后,使用 NioSocketChannel 进行封装。最后添加到容器中
这样容器 buf 中就有了 NioSocketChannel
回到 read 方法,继续分析 循环执行 pipeline.fireChannelRead 方法
前面分析 doReadMessages 方法的作用是通过 ServerSocket 的 accept 方法获取到 Tcp 连接,然后封装成Netty 的 NioSocketChannel 对象。最后添加到 容器中
在 read 方法中,循环调用 ServerSocket 的 pipeline 的 fireChannelRead 方法, 开始执行 管道中的 handler的 ChannelRead 方法(debug 进入)
经过 dubug (多次),可以看到会反复执行多个 handler 的 ChannelRead ,我们知道,pipeline 里面又 4 个handler ,分别是 Head ,LoggingHandler ,ServerBootstrapAcceptor ,Tail
。
重点看看 ServerBootstrapAcceptor。debug 之后,断点会进入到 ServerBootstrapAcceptor 中来。我们来看看 ServerBootstrapAcceptor 的 channelRead 方法(要多次 debug 才可以)
@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { // 1. msg强转成Channel,实际上就是好NioSocketChannel final Channel child = (Channel) msg; // 2. 添加NioSocketChannel 的pipeline 的handler,就是我们main方法里面 // 设置的childHandler child.pipeline().addLast(childHandler); // 3. 设置NioSocketChannel各种属性和参数 setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { // todo 将客户端注册到work线程池 // 4.将该NioSocketChannel注册到childGroup中的一个EventLoop上,并添加一个监听器 // 这个childGroup 就是我们main方法创建的数组 workGroup childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
说明:
msg 强转成 Channel ,实际上就是 NioSocketChannel 。
添加 NioSocketChannel 的 pipeline 的 handler ,就是我们 main 方法里面设置的 childHandler 方法里的。
设置 NioSocketChannel 的各种属性。
将该 NioSocketChannel 注册到 childGroup 中的一个 EventLoop 上,并添加一个监听器。
这个 childGroup 就是我们 main 方法创建的数组 workerGroup。
register 方法
@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise);//进入到这里 } }); }}
最终会调用 doBeginRead 方法,也就是 AbstractNioChannel 类的方法
//通过SelectionKey 设置注册读事件@Overrideprotected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); }}
这个地方调试时,请把前面的断点都去掉,然后启动服务器就会停止在 doBeginRead(需要先放过该断点,然后浏览器请求,才能看到效果)
执行到这里时,针对于这个客户端的连接就完成了,接下来就可以监听读事件
了
3. Netty 接收请求过程梳理
总体流程:接受连接----->创建一个新的 NioSocketChannel------>注册到一个 worker EventLoop 上---->注册 selecot Read 事件
服务器轮询 Accept 事件,获取事件后调用 unsafe 的 read 方法,这个 unsafe 是 ServerSocket 的内部类,该方法内部由 2 部分组成
doReadMessages 用于创建 NioSocketChannel 对象,该对象包装 JDK 的 Nio Channel 客户端。该方法会像创建 ServerSocketChanel 类似创建相关的 pipeline , unsafe,config
随后执行 执行 pipeline.fireChannelRead 方法,并将自己绑定到一个 chooser 选择器选择的 workerGroup 中的一个 EventLoop。并且注册一个读(1)事件。
参考文档
Netty学习和源码分析github地址Netty从入门到精通视频教程(B站) Netty权威指南 第二版
原文:https://juejin.cn/post/7101668207475294216