首页>>后端>>java->netty系列:聊聊Netty中的超时处理与心跳机制

netty系列:聊聊Netty中的超时处理与心跳机制

时间:2023-11-30 本站 点击:0

在网络通信中,网络链路是不稳定的,会经常发生异常,而异常的表现就是请求超时或者响应超时。这类异常对系统的可靠性产生重大影响。那么怎么监测通信异常呢?监测异常后又怎么处理呢?这本就来聊聊超时处理这个问题。

超时监测

Netty 的超时类型 IdleState 主要分为以下3类:

ALL_IDLE : 一段时间内没有数据接收或者发送。

READER_IDLE : 一段时间内没有数据接收。

WRITER_IDLE : 一段时间内没有数据发送。

针对上面的 3 类超时异常,Netty 提供了 3 类ChannelHandler来进行监测。

IdleStateHandler : 当 Channel 一段时间未执行读取、写入或者两者都未执行时,触发 IdleStateEvent 事件。

ReadTimeoutHandler :在一定时间内未读取任何数据时,引发 ReadTimeoutEvent 事件。

WriteTimeoutHandler :当写操作在一定时间内无法完成时,引发 WriteTimeoutEvent 事件。

IdleStateHandler类

IdleStateHandler 包括了读\写超时状态处理,观察以下 IdleStateHandler 类的构造函数源码。

publicIdleStateHandler(intreaderIdleTimeSeconds,intwriterIdleTimeSeconds,intallIdleTimeSeconds){this((long)readerIdleTimeSeconds,(long)writerIdleTimeSeconds,(long)allIdleTimeSeconds,TimeUnit.SECONDS);}publicIdleStateHandler(longreaderIdleTime,longwriterIdleTime,longallIdleTime,TimeUnitunit){this(false,readerIdleTime,writerIdleTime,allIdleTime,unit);}publicIdleStateHandler(booleanobserveOutput,longreaderIdleTime,longwriterIdleTime,longallIdleTime,TimeUnitunit){this.writeListener=newChannelFutureListener(){publicvoidoperationComplete(ChannelFuturefuture)throwsException{IdleStateHandler.this.lastWriteTime=IdleStateHandler.this.ticksInNanos();IdleStateHandler.this.firstWriterIdleEvent=IdleStateHandler.this.firstAllIdleEvent=true;}};this.firstReaderIdleEvent=true;this.firstWriterIdleEvent=true;this.firstAllIdleEvent=true;ObjectUtil.checkNotNull(unit,"unit");this.observeOutput=observeOutput;if(readerIdleTime<=0L){this.readerIdleTimeNanos=0L;}else{this.readerIdleTimeNanos=Math.max(unit.toNanos(readerIdleTime),MIN_TIMEOUT_NANOS);}if(writerIdleTime<=0L){this.writerIdleTimeNanos=0L;}else{this.writerIdleTimeNanos=Math.max(unit.toNanos(writerIdleTime),MIN_TIMEOUT_NANOS);}if(allIdleTime<=0L){this.allIdleTimeNanos=0L;}else{this.allIdleTimeNanos=Math.max(unit.toNanos(allIdleTime),MIN_TIMEOUT_NANOS);}}

在上述源码中,构造函数可以接收以下参数:

readerIdleTimeSecond:指定读超时时间,指定 0 表明为禁用。

writerIdleTimeSecond:指定写超时时间,指定 0 表明为禁用。

allIdleTimeSecond:在指定读写超时时间,指定 0 表明为禁用。

IdleStateHandler 使用示例:

publicclassMyChannelInitializerextendsChannelInitializer<Channel>{@OverrideprotectedvoidinitChannel(Channelchannel)throwsException{channel.pipeline().addLast("idleStateHandler",newIdleStateHandler(60,30,0));channel.pipeline().addLast("myHandler",newMyHandler());}}publicclassMyHandlerextendsChannelDuplexHandler{@OverridepublicvoiduserEventTriggered(ChannelHandlerContextctx,Objectevt)throwsException{if(evtinstanceofIdleStateEvent){IdleStateEvente=(IdleStateEvent)evt;if(e.state()==IdleState.READER_IDLE){ctx.close();}elseif(e.state()==IdleState.WRITER_IDLE){ctx.writeAndFlush(newPingMessage());}}}}

在上述示例中,IdleStateHandler 设置了读超时时间为 60 秒,写超时时间为 30 秒。MyHandler 是针对超时事件 IdleStateEvent 的处理。

如果 30 秒内没有出站流量(写超时)时发送 ping 消息的示例。

如果 60 秒内没有入站流量(读超时)时,连接关闭。

ReadTimeoutHandler类

ReadTimeoutHandler 类包括了读超时状态处理。ReadTimeoutHandler 类的源码如下:

publicclassReadTimeoutHandlerextendsIdleStateHandler{privatebooleanclosed;publicReadTimeoutHandler(inttimeoutSeconds){this((long)timeoutSeconds,TimeUnit.SECONDS);}publicReadTimeoutHandler(longtimeout,TimeUnitunit){super(timeout,0L,0L,unit);//禁用了写超时、读写超时}protectedfinalvoidchannelIdle(ChannelHandlerContextctx,IdleStateEventevt)throwsException{assertevt.state()==IdleState.READER_IDLE;//只处理读超时this.readTimedOut(ctx);}protectedvoidreadTimedOut(ChannelHandlerContextctx)throwsException{if(!this.closed){ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);//引发异常ctx.close();this.closed=true;}}}

从上述源码可以看出,ReadTimeoutHandler 继承自 IdleStateHandler,并在构造函数中禁用了写超时、读写超时,而且在处理超时时,只会针对 READER_IDLE状态进行处理,并引发 ReadTimeoutException 异常。

ReadTimeoutHandler 的使用示例如下:

publicclassMyChannelInitializerextendsChannelInitializer<Channel>{@OverrideprotectedvoidinitChannel(Channelchannel)throwsException{channel.pipeline().addLast("readTimeoutHandler",newReadTimeoutHandler(30));channel.pipeline().addLast("myHandler",newMyHandler());}}//处理器处理ReadTimeoutExceptionpublicclassMyHandlerextendsChannelDuplexHandler{@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{if(causeinstanceofReadTimeoutException){//...}else{super.exceptionCaught(ctx,cause);}}}

在上述示例中,ReadTimeoutHandler 设置了读超时时间是 30 秒。

WriteTimeoutHandler类

WriteTimeoutHandler 类包括了写超时状态处理。WriteTimeoutHandler 类的源码如下:

publicclassWriteTimeoutHandlerextendsChannelOutboundHandlerAdapter{privatestaticfinallongMIN_TIMEOUT_NANOS;privatefinallongtimeoutNanos;privateWriteTimeoutHandler.WriteTimeoutTasklastTask;privatebooleanclosed;publicWriteTimeoutHandler(inttimeoutSeconds){this((long)timeoutSeconds,TimeUnit.SECONDS);}publicWriteTimeoutHandler(longtimeout,TimeUnitunit){ObjectUtil.checkNotNull(unit,"unit");if(timeout<=0L){this.timeoutNanos=0L;}else{this.timeoutNanos=Math.max(unit.toNanos(timeout),MIN_TIMEOUT_NANOS);}}publicvoidwrite(ChannelHandlerContextctx,Objectmsg,ChannelPromisepromise)throwsException{if(this.timeoutNanos>0L){promise=promise.unvoid();this.scheduleTimeout(ctx,promise);}ctx.write(msg,promise);}publicvoidhandlerRemoved(ChannelHandlerContextctx)throwsException{WriteTimeoutHandler.WriteTimeoutTasktask=this.lastTask;WriteTimeoutHandler.WriteTimeoutTaskprev;for(this.lastTask=null;task!=null;task=prev){task.scheduledFuture.cancel(false);prev=task.prev;task.prev=null;task.next=null;}}privatevoidscheduleTimeout(ChannelHandlerContextctx,ChannelPromisepromise){WriteTimeoutHandler.WriteTimeoutTasktask=newWriteTimeoutHandler.WriteTimeoutTask(ctx,promise);task.scheduledFuture=ctx.executor().schedule(task,this.timeoutNanos,TimeUnit.NANOSECONDS);if(!task.scheduledFuture.isDone()){this.addWriteTimeoutTask(task);promise.addListener(task);}}privatevoidaddWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTasktask){if(this.lastTask!=null){this.lastTask.next=task;task.prev=this.lastTask;}this.lastTask=task;}privatevoidremoveWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTasktask){if(task==this.lastTask){asserttask.next==null;this.lastTask=this.lastTask.prev;if(this.lastTask!=null){this.lastTask.next=null;}}else{if(task.prev==null&&task.next==null){return;}if(task.prev==null){task.next.prev=null;}else{task.prev.next=task.next;task.next.prev=task.prev;}}task.prev=null;task.next=null;}protectedvoidwriteTimedOut(ChannelHandlerContextctx)throwsException{if(!this.closed){ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);ctx.close();this.closed=true;}}//...}

从上述源码可以看出,WriteTimeoutHandler 在处理超时时,引发了 WriteTimeoutException 异常。

WriteTimeoutHandler 的使用示例如下:

publicclassMyChannelInitializerextendsChannelInitializer<Channel>{@OverrideprotectedvoidinitChannel(Channelchannel)throwsException{channel.pipeline().addLast("writeTimeoutHandler",newWriteTimeoutHandler(30));channel.pipeline().addLast("myHandler",newMyHandler());}}//处理器处理ReadTimeoutExceptionpublicclassMyHandlerextendsChannelDuplexHandler{@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{if(causeinstanceofWriteTimeoutException){//...}else{super.exceptionCaught(ctx,cause);}}}

在上述示例中,WriteTimeoutHandler 设置了写超时时间是 30 秒。

实现心跳机制

针对超时的解决方案——心跳机制。

在程序开发中,心跳机制是非常常见的。其原理是,当连接闲置时可以发送一个心跳来维持连接。一般而言,心跳就是一段小的通信。

1.定义心跳处理器

publicclassHeartbeatServerHandlerextendsChannelInboundHandlerAdapter{//(1)心跳内容privatestaticfinalByteBufHEARTBEAT_SEQUENCE=Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",CharsetUtil.UTF_8));@OverridepublicvoiduserEventTriggered(ChannelHandlerContextctx,Objectevt)throwsException{//(2)判断超时类型if(evtinstanceofIdleStateEvent){IdleStateEventevent=(IdleStateEvent)evt;Stringtype="";if(event.state()==IdleState.READER_IDLE){type="readidle";}elseif(event.state()==IdleState.WRITER_IDLE){type="writeidle";}elseif(event.state()==IdleState.ALL_IDLE){type="allidle";}//(3)发送心跳ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);System.out.println(ctx.channel().remoteAddress()+"超时类型:"+type);}else{super.userEventTriggered(ctx,evt);}}}

对上述代码说明:

定义了心跳时,要发送的内容。

判断是不是 IdleStateEvent 事件,是则处理。

将心跳内容发送给客户端。

2.定义 ChannelInitializer

HeartbeatHandlerInitializer用于封装各类ChannelHandler,代码如下:

publicclassHeartbeatHandlerInitializerextendsChannelInitializer<Channel>{privatestaticfinalintREAD_IDEL_TIME_OUT=4;//读超时privatestaticfinalintWRITE_IDEL_TIME_OUT=5;//写超时privatestaticfinalintALL_IDEL_TIME_OUT=7;//所有超时@OverrideprotectedvoidinitChannel(Channelch)throwsException{ChannelPipelinepipeline=ch.pipeline();pipeline.addLast(newIdleStateHandler(READ_IDEL_TIME_OUT,WRITE_IDEL_TIME_OUT,ALL_IDEL_TIME_OUT,TimeUnit.SECONDS));//(1)pipeline.addLast(newHeartbeatServerHandler());//(2)}}

对上述代码说明如下:

添加了一个IdleStateHandler到 ChannelPipeline,并分别设置了读、写超时的时间。为了方便演示,将超时时间设置的比较短。

添加了HeartbeatServerHandler,用来处理超时时,发送心跳。

3.编写服务器

服务器代码比较简单,启动后侦听 8083 端口。

publicfinalclassHeartbeatServer{staticfinalintPORT=8083;publicstaticvoidmain(String[]args)throwsException{//配置服务器EventLoopGroupbossGroup=newNioEventLoopGroup(1);EventLoopGroupworkerGroup=newNioEventLoopGroup();try{ServerBootstrapb=newServerBootstrap();b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,100).handler(newLoggingHandler(LogLevel.INFO)).childHandler(newHeartbeatHandlerInitializer());//启动ChannelFuturef=b.bind(PORT).sync();f.channel().closeFuture().sync();}finally{bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}

4.测试

首先启动 HeartbeatServer,客户端用操作系统自带的 Telnet 程序即可:

telnet127.0.0.18083

可以看到客户端与服务器的交互效果如下图。

结语

文章如果对你有帮助,看完记得点赞、关注、收藏哟。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/4953.html