首页>>后端>>java->Netty源码篇6

Netty源码篇6

时间:2023-12-06 本站 点击:0

欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

NioEventLoop 类 的run() 代码 ,无限循环,在服务器端运行

1. 源码demo和基本理解

1.1 服务器启动类源码

public final class EchoServer {    static final boolean SSL = System.getProperty("ssl") != null;    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));    public static void main(String[] args) throws Exception {        // Configure SSL.        final SslContext sslCtx;        if (SSL) {            SelfSignedCertificate ssc = new SelfSignedCertificate();            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();        } else {            sslCtx = null;        }        // Configure the server.        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)                .channel(NioServerSocketChannel.class)                .option(ChannelOption.SO_BACKLOG, 100)                .handler(new LoggingHandler(LogLevel.INFO))                .childHandler(new ChannelInitializer<SocketChannel>() {                    @Override                    public void initChannel(SocketChannel ch) throws Exception {                        ChannelPipeline p = ch.pipeline();                        if (sslCtx != null) {                            p.addLast(sslCtx.newHandler(ch.alloc()));                        }                        //p.addLast(new LoggingHandler(LogLevel.INFO));                        p.addLast(new EchoServerHandler());                    }                });            // Start the server.            ChannelFuture f = b.bind(PORT).sync();            // Wait until the server socket is closed.            f.channel().closeFuture().sync();        } finally {            // Shut down all event loops to terminate all threads.            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}

说明:

先看启动类:main 方法中,首先创建了关于 SSL 的配置类。

重点分析下 创建了两个 EventLoopGroup 对象

 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();

会创建 EventExecutor 数组 children = new EventExecutor[nThreads]; //debug 一下

每个元素的类型就是 NIOEventLoop, NIOEventLoop 实现了 EventLoop 接口 和 Executor 接口

try 块中创建了一个 ServerBootstrap 对象,他是一个引导类,用于启动服务器和引导整个程序的初始化( 看下源码 allows easy bootstrap of {@link ServerChannel} )。和 它和 ServerChannel , 关联, 而 而 ServerChannel 继承了Channel法 ,有一些方法 remoteAddress 等

随后,变量 b 调用了 group 方法将两个 group 放入了自己的字段中,用于后期引导使用【debug 下 group 方法】

(1) 这两个对象是整个 Netty 的核心对象,可以说,整个 Netty 的运作都依赖于他们。bossGroup 用于接受Tcp 请求,他会将请求交给 workerGroup ,workerGroup 会获取到真正的连接,然后和连接进行通信,比如读写解码编码等操作。

(2) EventLoopGroup 是 事件循环组(线程组) 含有多个 EventLoop,可以注册 channel ,用于在事件循环中去进行选择(和选择器相关)

(3) new NioEventLoopGroup(1); 这个 1 表示 bossGroup 事件组有 1 个线程你可以指定,如果 new NioEventLoopGroup() 会含有默认个线程 cpu 核数 * 2, 即可以充分的利用多核的优势

  DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(“io.netty.eventLoopThreads”, NettyRuntime.availableProcessors() * 2));

(4) 然后添加了一个 channel,其中参数一个 Class 对象,引导类将通过这个 Class 对象反射创建ChannelFactory。然后添加了一些 TCP 的参数。[说明:Channel 的创建在 bind 方法,可以 Debug 下 bind ,会找到 channel = channelFactory.newChannel(); ]

(5) 再添加了一个服务器专属的日志处理器 handler。

(6) 再添加一个 SocketChannel(不是 ServerSocketChannel)的 handler。

(7) 然后绑定端口并阻塞至连接成功。

(8) 最后 main 线程阻塞等待关闭。

(9) finally 块中的代码将在服务器关闭时优雅关闭所有资源

1.2 服务器端处理器源码

@Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        ctx.write(msg);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) {        ctx.flush();    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        // Close the connection when an exception is raised.        cause.printStackTrace();        ctx.close();    }}

说明:

这是一个普通的处理器类,用于处理客户端发送来的消息,在我们这里,我们简单的解析出客户端传过来的内容,然后打印,最后发送字符串给客户端。

2. EventLoopGroup 的过程

构造器方法

public NioEventLoopGroup(int nThreads) {    this(nThreads, (Executor) null);}// 上面的 this(nThreads, (Executor) null); 调用构造器 ( 通过 alt+d 看即可)public NioEventLoopGroup(int nThreads, Executor executor) {    this(nThreads, executor, SelectorProvider.provider());}//上面的 this(nThreads, executor, SelectorProvider.provider()); 调用下面构造器public NioEventLoopGroup(    int nThreads, Executor executor, final SelectorProvider selectorProvider) {    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);}// 上面的 this ()... 调用构造器(alt+d)public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());}//上面的 super() .. 的方法是父类: MultithreadEventLoopGroupprotected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}

追踪到源码抽象类 MultithreadEventExecutorGroup 的构造器方法 MultithreadEventExecutorGroup 才是 NioEventLoopGroup 真正的构造方法, 这里可以看成是一个模板方法,使用了设计模式的模板模式。

3. MultithreadEventExecutorGroup

参数说明@param nThreads 使用的线程数,默认为 core *2 [可以追踪源码]@param executor执行器 : 如果传入 null,则采用 Netty 默认的线程工厂和默认的执行器ThreadPerTaskExecutor@param chooserFactory 单例 new DefaultEventExecutorChooserFactory()@param args args 在创建执行器的时候传入固定参数protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {    // 1.初始化线程池    if (nThreads <= 0) {        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));    }    if (executor == null) { //如果传入的执行器是空的则采用默认的线程工厂和默认的执行器        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());    }    //创建指定线程数的执行器数组    children = new EventExecutor[nThreads];    //初始化线程数组    for (int i = 0; i < nThreads; i ++) {        boolean success = false;        try {            // 创建 new NioEventLoop            children[i] = newChild(executor, args);            success = true;        } catch (Exception e) {            // TODO: Think about if this is a good exception type            throw new IllegalStateException("failed to create a child event loop", e);        } finally {            // 如果创建失败,优雅关闭            if (!success) {                for (int j = 0; j < i; j ++) {                    children[j].shutdownGracefully();                }                for (int j = 0; j < i; j ++) {                    EventExecutor e = children[j];                    try {                        while (!e.isTerminated()) {                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);                        }                    } catch (InterruptedException interrupted) {                        // Let the caller handle the interruption.                        Thread.currentThread().interrupt();                        break;                    }                }            }        }    }    /** 2.实例化线程⼯⼚执⾏器选择器: 根据children获取选择器 */    chooser = chooserFactory.newChooser(children);    /** 3.为每个EventLoop线程添加线程终⽌监听器*/    final FutureListener<Object> terminationListener = new FutureListener<Object>() {        @Override        public void operationComplete(Future<Object> future) throws Exception {            if (terminatedChildren.incrementAndGet() == children.length) {                terminationFuture.setSuccess(null);            }        }    };    //为每一个单例线程池添加一个关闭监听器    for (EventExecutor e: children) {        e.terminationFuture().addListener(terminationListener);    }    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);    //将所有的单例线程池添加到一个 HashSet 中。    Collections.addAll(childrenSet, children);    readonlyChildren = Collections.unmodifiableSet(childrenSet);}

说明:

如果 executor 是 null,创建一个默认的 ThreadPerTaskExecutor,使用 Netty 默认的线程工厂。

根据传入的线程数(CPU * 2)创建一个线程池(单例线程池)数组。

循环填充数组中的元素。如果异常,则关闭所有的单例线程池。

根据线程选择工厂创建一个 线程选择器

为每一个单例线程池添加一个关闭监听器。

将所有的单例线程池添加到一个 HashSet 中。

4. ServerBootstrap 创建和构造过程

ServerBootstrap 是个空构造,但是有默认的成员变量

private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();//config 对象,会在后面起很大作用private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);private volatile EventLoopGroup childGroup;private volatile ChannelHandler childHandler;

ServerBootstrap 基本使用情况:

ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)    .channel(NioServerSocketChannel.class)    .option(ChannelOption.SO_BACKLOG, 100)    .handler(new LoggingHandler(LogLevel.INFO))    .childHandler(new ChannelInitializer<SocketChannel>() {        @Override        public void initChannel(SocketChannel ch) throws Exception {            ChannelPipeline p = ch.pipeline();            if (sslCtx != null) {                p.addLast(sslCtx.newHandler(ch.alloc()));            }            //p.addLast(new LoggingHandler(LogLevel.INFO));            p.addLast(new EchoServerHandler());        }    });

说明:

链式调用:group 方法,将 boss 和 worker 传入,boss 赋值给 parentGroup 属性,worker 赋值给 childGroup属性

channel 方法传入 NioServerSocketChannel class 对象。会根据这个 class 创建 channel 对象。

option 方法传入 TCP 参数,放在一个 LinkedHashMap 中。

handler 方法传入一个 handler 中,这个 hanlder 只专属于 ServerSocketChannel 而不是SocketChannel

childHandler 传入一个 hanlder ,这个 handler 将会在每个客户端连接的时候调用。供 SocketChannel 使用

5. 绑定端口

服务器就是在这个 bind 方法里启动完成的

bind 方法代码, 追踪到创建了一个端口对象,并做了一些空判断, 核心代码 doBind

public ChannelFuture bind(SocketAddress localAddress) {    validate();    if (localAddress == null) {        throw new NullPointerException("localAddress");    }    return doBind(localAddress);}

doBind 源码剖析, 核心是两个方法 initAndRegister 和 doBind0

private ChannelFuture doBind(final SocketAddress localAddress) {    // todo 初始化和注册    final ChannelFuture regFuture = initAndRegister();    final Channel channel = regFuture.channel();    if (regFuture.cause() != null) {        return regFuture;    }    if (regFuture.isDone()) {        // 如果注册已经成功        // At this point we know that the registration was complete and successful.        ChannelPromise promise = channel.newPromise();        // todo 执行doBind()方法,完成对端口的绑定        doBind0(regFuture, channel, localAddress, promise);        return promise;    } else {        // 如果注册尚未完成        // Registration future is almost always fulfilled already, but just in case it's not.        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);        regFuture.addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                Throwable cause = future.cause();                if (cause != null) {                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an                    // IllegalStateException once we try to access the EventLoop of the Channel.                    promise.setFailure(cause);                } else {                    // Registration was successful, so set the correct executor to use.                    // See https://github.com/netty/netty/issues/2586                    promise.registered();                    doBind0(regFuture, channel, localAddress, promise);                }            }        });        return promise;    }}

6. initAndRegister

 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();0

说明:

基本说明: initAndRegister() 初始化 NioServerSocketChannel 通道并注册各个 handler,返回一个future

通过 ServerBootstrap 的通道工厂反射创建一个 NioServerSocketChannel。

init 初始化这个 NioServerSocketChannel。

config().group().register(channel) 通过 ServerBootstrap 的 bossGroup 注册NioServerSocketChannel。

最后,返回这个异步执行的占位符即 regFuture。

init方法 会调用 addLast, 现在进入到 addLast 方法内查看

 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();1

说明:

addLast 方法,在 DefaultChannelPipeline 类中

addLast 方法这就是 pipeline 方法的核心

检查该 handler 是否符合标准。

创 建 一 个 AbstractChannelHandlerContext 对 象 ,这 里 说 一 下 ,ChannelHandlerContext 对 象 是ChannelHandler 和 ChannelPipeline 之间的关联,每当有 ChannelHandler 添加到 Pipeline 中时,都会创建Context。Context 的主要功能是管理他所关联的 Handler 和同一个 Pipeline 中的其他 Handler 之间的交互。

将 Context 添加到链表中。也就是追加到 tail 节点的前面。

最后,同步或者异步或者晚点异步的调用 callHandlerAdded0 方法

前面说了 dobind 方法有 2 个重要的步骤,initAndRegister 说完,接下来看 doBind0 方法, 代码如下

 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();2

说明:

该方法的参数为 initAndRegister 的 future,NioServerSocketChannel,端口地址,NioServerSocketChannel 的promise

这里就可以根据前面下的断点,一直 debug:

 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();3

继续追踪 AbstractChannel  public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { //.... try { //!!!!旗 小红旗 是 可以看到,这里最终的方法就是 doBind 方法,执行成功后,执行通道的 fireChannelActive 的 方法,告诉所有的 handler ,已经成功绑定。 doBind(localAddress);// } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { // bind方法执行成功后,执行通道的fireChannelActive方法 // 也就是所有handler 的channelActive()方法 pipeline.fireChannelActive(); } }); } // safeSetSuccess(promise),告诉 promise 任务成功了。其可以执行监听器的方法了。 safeSetSuccess(promise); }

 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();4

回到 bind 方法

最后一步:safeSetSuccess(promise),告诉 promise 任务成功了。其可以执行监听器的方法了。

继续,服务器就回进入到(NioEventLoop 类)一个循环代码,进行监听

 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();5

7. Netty 启动过程梳理

创建 2 个 EventLoopGroup 线程池数组。数组默认大小 CPU* 2,方便 chooser 选择线程池时提高性能

BootStrap 将 boss 设置为 group 属性,将 worker 设置为 childer 属性

通过 bind 方法启动,内部重要方法为 initAndRegister 和 dobind 方法

initAndRegister 方法会反射创建 NioServerSocketChannel 及其相关的 NIO 的对象, pipeline , unsafe,同时也为 pipeline 初始了 head 节点和 tail 节点。

在 register0 方法成功以后调用在 dobind 方法中调用 doBind0 方法,该方法会 调用 NioServerSocketChannel的 doBind 方法对 JDK 的 channel 和端口进行绑定,完成 Netty 服务器的所有启动,并开始监听连接事件

参考文档

Netty学习和源码分析github地址Netty从入门到精通视频教程(B站) Netty权威指南 第二版

原文:https://juejin.cn/post/7101667677873127432


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/15941.html