A Practical Approach to Disconnect and Reconnect (How Netty Reconnection Works)

Heartbeat Mechanism

What Is a Heartbeat?

A heartbeat is a special packet sent periodically between the client and the server over a long-lived TCP connection, telling the other side "I am still online" and keeping the TCP connection verifiably alive.

Note: heartbeat packets serve a second, often overlooked purpose: if a connection stays idle for too long, a firewall or router along the path may drop it. Periodic heartbeats keep the connection from being reclaimed.

How to Implement It

The Core Handler: IdleStateHandler

In Netty, the key to implementing a heartbeat mechanism is IdleStateHandler. How is this handler used? First, look at one of its constructors:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long) readerIdleTimeSeconds, (long) writerIdleTimeSeconds, (long) allIdleTimeSeconds, TimeUnit.SECONDS);
}

The three parameters mean the following:

  • readerIdleTimeSeconds: read timeout. If no data is read from the Channel within this interval, a READER_IDLE IdleStateEvent is fired.
  • writerIdleTimeSeconds: write timeout. If no data is written to the Channel within this interval, a WRITER_IDLE IdleStateEvent is fired.
  • allIdleTimeSeconds: read/write timeout. If neither a read nor a write happens within this interval, an ALL_IDLE IdleStateEvent is fired.

Note: these three parameters default to seconds. If another time unit is needed, use the other constructor: IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
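
For illustration, here is a minimal sketch of that TimeUnit-based constructor in use (the class name MillisIdleInitializer and the 5-minute read timeout are made up for this example):

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;

public class MillisIdleInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        // Read timeout of 300_000 ms (5 minutes); write and all-idle checks
        // disabled (0). The first argument, observeOutput, is false here,
        // matching the behavior of the simpler seconds-based constructor.
        ch.pipeline().addLast(new IdleStateHandler(false, 300_000, 0, 0, TimeUnit.MILLISECONDS));
    }
}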

Before reading the implementation below, it is worth first understanding how IdleStateHandler itself works.

The code follows directly; anything that needs attention is called out in comments inside the code.

Implementing Heartbeats with IdleStateHandler

Below, IdleStateHandler is used to implement the heartbeat. After the Client connects to the Server, it loops one task: wait a random few seconds, then ping the Server, i.e. send a heartbeat packet. When the wait exceeds the allowed window, the send fails, because by then the Server has already closed the connection on its side. The code follows:

Client Side

ClientIdleStateTrigger: the heartbeat trigger

ClientIdleStateTrigger is itself a Handler; it only overrides userEventTriggered to catch the IdleState.WRITER_IDLE event (no data sent to the server within the configured window) and then sends a heartbeat packet to the Server.

/**
 * <p>
 * Catches the {@link IdleState#WRITER_IDLE} event (no data was sent to the
 * server within the configured window) and sends a heartbeat packet to the
 * <code>Server</code>.
 * </p>
 */
public class ClientIdleStateTrigger extends ChannelInboundHandlerAdapter {

    public static final String HEART_BEAT = "heart beat!";

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                // Write a heartbeat to the server.
                ctx.writeAndFlush(HEART_BEAT);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

Pinger: the heartbeat emitter

/**
 * <p>After the client connects to the server, it loops one task: wait a random
 * few seconds, then ping the Server, i.e. send one heartbeat packet.</p>
 */
public class Pinger extends ChannelInboundHandlerAdapter {

    private final Random random = new Random();
    private final int baseRandom = 8;
    private Channel channel;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.channel = ctx.channel();
        ping(ctx.channel());
    }

    private void ping(Channel channel) {
        int second = Math.max(1, random.nextInt(baseRandom));
        System.out.println("next heart beat will send after " + second + "s.");
        ScheduledFuture<?> future = channel.eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                if (channel.isActive()) {
                    System.out.println("sending heart beat to the server...");
                    channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
                } else {
                    System.err.println("The connection had broken, cancel the task that will send a heart beat.");
                    // Fail the task on purpose: the listener below reschedules
                    // only when the task completed successfully.
                    throw new RuntimeException("connection is down, stop pinging");
                }
            }
        }, second, TimeUnit.SECONDS);
        future.addListener(new GenericFutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
                if (future.isSuccess()) {
                    ping(channel);
                }
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Writing to a Channel that has already been closed throws an
        // exception, which lands here.
        cause.printStackTrace();
        ctx.close();
    }
}

ClientHandlersInitializer: initializing the client-side handlers

public class ClientHandlersInitializer extends ChannelInitializer<SocketChannel> {

    private ReconnectHandler reconnectHandler;
    private EchoHandler echoHandler;

    public ClientHandlersInitializer(TcpClient tcpClient) {
        Assert.notNull(tcpClient, "TcpClient can not be null.");
        this.reconnectHandler = new ReconnectHandler(tcpClient);
        this.echoHandler = new EchoHandler();
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new Pinger());
    }
}

Note: in the handler list above, everything except Pinger is a codec or length-field framing handler (dealing with TCP packet splitting and coalescing) and can be ignored here. ReconnectHandler is introduced in the reconnection section further below; EchoHandler is a plain echo handler whose definition is not shown in this article.
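
As an aside, here is a minimal, self-contained sketch (using Netty's EmbeddedChannel test helper; the class name and buffer contents are made up for illustration) of what the framing pair does: LengthFieldPrepender(4) writes a 4-byte length before each outgoing message, and LengthFieldBasedFrameDecoder strips it from each incoming one.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.CharsetUtil;

public class FramingDemo {
    public static void main(String[] args) {
        EmbeddedChannel ch = new EmbeddedChannel(
                new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        byte[] payload = "heart beat!".getBytes(CharsetUtil.UTF_8);
        ByteBuf framed = Unpooled.buffer();
        framed.writeInt(payload.length); // the 4-byte length prefix
        framed.writeBytes(payload);
        ch.writeInbound(framed);
        // initialBytesToStrip = 4, so the decoded frame is the bare payload.
        ByteBuf decoded = ch.readInbound();
        System.out.println("decoded: " + decoded.toString(CharsetUtil.UTF_8)); // heart beat!
    }
}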

TcpClient: the TCP client

public class TcpClient {

    private String host;
    private int port;
    private Bootstrap bootstrap;
    /** Keep a reference to the <code>Channel</code> so data can be sent from places other than handlers. */
    private Channel channel;

    public TcpClient(String host, int port) {
        this(host, port, new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000));
    }

    public TcpClient(String host, int port, RetryPolicy retryPolicy) {
        this.host = host;
        this.port = port;
        // retryPolicy is stored and used in the extended version of this
        // class shown in the reconnection section below.
        init();
    }

    /**
     * Request a connection to the remote TCP server.
     */
    public void connect() {
        synchronized (bootstrap) {
            ChannelFuture future = bootstrap.connect(host, port);
            this.channel = future.channel();
        }
    }

    private void init() {
        EventLoopGroup group = new NioEventLoopGroup();
        // The bootstrap is reusable; it only needs to be initialized when the TcpClient is instantiated.
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                 .channel(NioSocketChannel.class)
                 .handler(new ClientHandlersInitializer(TcpClient.this));
    }

    public static void main(String[] args) {
        TcpClient tcpClient = new TcpClient("localhost", 2222);
        tcpClient.connect();
    }
}

Server Side

ServerIdleStateTrigger: the disconnect trigger

/**
 * <p>If no data packet is received from the client within the allotted time,
 * the server proactively closes the connection.</p>
 */
public class ServerIdleStateTrigger extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                // No upstream data from the client within the allotted time;
                // close the connection proactively.
                ctx.disconnect();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

ServerBizHandler: the server-side business handler

/**
 * <p>Prints each data packet received from the client to the console.</p>
 */
@ChannelHandler.Sharable
public class ServerBizHandler extends SimpleChannelInboundHandler<String> {

    private final String REC_HEART_BEAT = "I had received the heart beat!";

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String data) throws Exception {
        try {
            System.out.println("receive data: " + data);
            // ctx.writeAndFlush(REC_HEART_BEAT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Established connection with the remote client.");
        // do something
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Disconnected with the remote client.");
        // do something
        ctx.fireChannelInactive();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ServerHandlerInitializer: initializing the server-side handlers

/**
 * <p>Initializes every <code>Handler</code> involved on the server side.</p>
 */
public class ServerHandlerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(5, 0, 0));
        ch.pipeline().addLast("idleStateTrigger", new ServerIdleStateTrigger());
        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(4));
        ch.pipeline().addLast("decoder", new StringDecoder());
        ch.pipeline().addLast("encoder", new StringEncoder());
        ch.pipeline().addLast("bizHandler", new ServerBizHandler());
    }
}

Note: new IdleStateHandler(5, 0, 0) means that if no data packet at all (heartbeats included) is received from a client within 5 seconds, the server will proactively close that client's connection.

TcpServer: the server

public class TcpServer {

    private int port;
    private ServerHandlerInitializer serverHandlerInitializer;

    public TcpServer(int port) {
        this.port = port;
        this.serverHandlerInitializer = new ServerHandlerInitializer();
    }

    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                     .channel(NioServerSocketChannel.class)
                     .childHandler(this.serverHandlerInitializer);
            // Bind the port and start accepting incoming connections.
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("Server start listen at " + port);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Shut the event loops down whether the server exits normally or not.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 2222;
        new TcpServer(port).start();
    }
}

With this, all the code is in place.

Testing

Start the server first, then the client (there is no reconnect logic yet, so the server must be up before the client connects). Once both are running, the client console prints logs similar to the following:

[Figure: heartbeat test, logs printed on the client console]

On the server side, the console prints logs similar to the following:

[Figure: heartbeat test, logs printed on the server console]

As the logs show, the client sent 4 heartbeat packets; before the 5th, the random wait was so long that by the time the packet was actually due to be sent, the connection had already been broken. The server, after receiving the client's 4 heartbeats, waited in vain for the next packet and decisively closed the connection.

An Exceptional Case

During testing, the following situation may occur:

[Figure: exception stack trace printed on the client console]

The cause: a heartbeat packet was sent while the connection was already down. Although channel.isActive() is checked before each send, the check and the send are not atomic: the connection may be alive at the moment of the check and gone an instant later, just before the packet is written.

I have not yet found an elegant way to handle this; if any reader has a good solution, please do share it. Many thanks!
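
One common mitigation, sketched below for Pinger's scheduled task (the error handling is illustrative, not a complete fix), is to stop trusting isActive() as a guarantee and instead attach a listener to the write itself, treating a failed flush as the definitive signal that the connection is gone:

// Inside Pinger's scheduled task: send first, then react to failure.
channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT)
        .addListener((ChannelFutureListener) f -> {
            if (!f.isSuccess()) {
                System.err.println("heart beat was not delivered: " + f.cause());
                // Close the channel so channelInactive fires and the reconnect
                // logic in the next section takes over.
                f.channel().close();
            }
        });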

Disconnect and Reconnect

Reconnection itself needs little introduction; most readers will know what it is. Here is the general approach, followed directly by the code.

Approach

When the client detects that the connection to the server has dropped, or fails to connect in the first place, it reconnects using a configurable retry policy, until the connection is re-established or the retry budget is exhausted.

Detecting a broken connection is done by overriding ChannelInboundHandler#channelInactive: when the connection becomes unavailable, this method is triggered, so the reconnect work only needs to happen there.

Implementation

Note: the code below modifies and extends the heartbeat code above.

Since reconnection is the client's job, only the client code needs to change.

Retry Policy

RetryPolicy: the retry-policy interface

public interface RetryPolicy {

    /**
     * Called when an operation has failed for some reason. This method should
     * return true to make another attempt.
     *
     * @param retryCount the number of times retried so far (0 the first time)
     * @return true/false
     */
    boolean allowRetry(int retryCount);

    /**
     * Get the sleep time in ms for the current retry count.
     *
     * @param retryCount current retry count
     * @return the time to sleep
     */
    long getSleepTimeMs(int retryCount);
}

ExponentialBackOffRetry: the default retry policy

/**
 * <p>Retry policy that retries a set number of times with increasing sleep time between retries.</p>
 */
public class ExponentialBackOffRetry implements RetryPolicy {

    private static final int MAX_RETRIES_LIMIT = 29;
    private static final int DEFAULT_MAX_SLEEP_MS = Integer.MAX_VALUE;

    private final Random random = new Random();
    private final long baseSleepTimeMs;
    private final int maxRetries;
    private final int maxSleepMs;

    public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries) {
        this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS);
    }

    public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) {
        this.maxRetries = maxRetries;
        this.baseSleepTimeMs = baseSleepTimeMs;
        this.maxSleepMs = maxSleepMs;
    }

    @Override
    public boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    @Override
    public long getSleepTimeMs(int retryCount) {
        if (retryCount < 0) {
            throw new IllegalArgumentException("retry count must be non-negative.");
        }
        // Cap the exponent so (1 << retryCount) cannot overflow.
        if (retryCount > MAX_RETRIES_LIMIT) {
            System.out.println(String.format("retryCount too large (%d). Pinning to %d", retryCount, MAX_RETRIES_LIMIT));
            retryCount = MAX_RETRIES_LIMIT;
        }
        long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << retryCount));
        if (sleepMs > maxSleepMs) {
            System.out.println(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
            sleepMs = maxSleepMs;
        }
        return sleepMs;
    }
}
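
To get a feel for how the backoff grows, here is a small illustrative driver (the class name BackOffDemo is made up, and the exact values vary between runs because of the random factor):

public class BackOffDemo {
    public static void main(String[] args) {
        RetryPolicy policy = new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000);
        for (int i = 0; i < 8; i++) {
            // Each sleep is baseSleepTimeMs times a random factor between 1
            // and 2^retryCount - 1, clamped to maxSleepMs (60s here).
            System.out.println("retry " + i + " -> sleep " + policy.getSleepTimeMs(i) + "ms");
        }
    }
}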

ReconnectHandler: the reconnect handler

@ChannelHandler.Sharable
public class ReconnectHandler extends ChannelInboundHandlerAdapter {

    private int retries = 0;
    private RetryPolicy retryPolicy;
    private TcpClient tcpClient;

    public ReconnectHandler(TcpClient tcpClient) {
        this.tcpClient = tcpClient;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Successfully established a connection to the server.");
        retries = 0;
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (retries == 0) {
            System.err.println("Lost the TCP connection with the server.");
            ctx.close();
        }
        boolean allowRetry = getRetryPolicy().allowRetry(retries);
        if (allowRetry) {
            long sleepTimeMs = getRetryPolicy().getSleepTimeMs(retries);
            System.out.println(String.format("Try to reconnect to the server after %dms. Retry count: %d.", sleepTimeMs, ++retries));
            final EventLoop eventLoop = ctx.channel().eventLoop();
            eventLoop.schedule(() -> {
                System.out.println("Reconnecting ...");
                tcpClient.connect();
            }, sleepTimeMs, TimeUnit.MILLISECONDS);
        }
        ctx.fireChannelInactive();
    }

    private RetryPolicy getRetryPolicy() {
        if (this.retryPolicy == null) {
            this.retryPolicy = tcpClient.getRetryPolicy();
        }
        return this.retryPolicy;
    }
}

ClientHandlersInitializer

Compared with the earlier version, the reconnect handler ReconnectHandler is now added to the pipeline.

public class ClientHandlersInitializer extends ChannelInitializer<SocketChannel> {

    private ReconnectHandler reconnectHandler;
    private EchoHandler echoHandler;

    public ClientHandlersInitializer(TcpClient tcpClient) {
        Assert.notNull(tcpClient, "TcpClient can not be null.");
        this.reconnectHandler = new ReconnectHandler(tcpClient);
        this.echoHandler = new EchoHandler();
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(this.reconnectHandler);
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new Pinger());
    }
}

TcpClient

Compared with the earlier version, support for reconnection and the retry policy is added.

public class TcpClient {

    private String host;
    private int port;
    private Bootstrap bootstrap;
    /** The retry policy. */
    private RetryPolicy retryPolicy;
    /** Keep a reference to the <code>Channel</code> so data can be sent from places other than handlers. */
    private Channel channel;

    public TcpClient(String host, int port) {
        this(host, port, new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000));
    }

    public TcpClient(String host, int port, RetryPolicy retryPolicy) {
        this.host = host;
        this.port = port;
        this.retryPolicy = retryPolicy;
        init();
    }

    /**
     * Request a connection to the remote TCP server.
     */
    public void connect() {
        synchronized (bootstrap) {
            ChannelFuture future = bootstrap.connect(host, port);
            future.addListener(getConnectionListener());
            this.channel = future.channel();
        }
    }

    public RetryPolicy getRetryPolicy() {
        return retryPolicy;
    }

    private void init() {
        EventLoopGroup group = new NioEventLoopGroup();
        // The bootstrap is reusable; it only needs to be initialized when the TcpClient is instantiated.
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                 .channel(NioSocketChannel.class)
                 .handler(new ClientHandlersInitializer(TcpClient.this));
    }

    private ChannelFutureListener getConnectionListener() {
        return new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    // A failed connect never fires channelInactive on its own;
                    // fire it manually so ReconnectHandler schedules a retry.
                    future.channel().pipeline().fireChannelInactive();
                }
            }
        };
    }

    public static void main(String[] args) {
        TcpClient tcpClient = new TcpClient("localhost", 2222);
        tcpClient.connect();
    }
}

Testing

Before testing, to sidestep the Connection reset by peer exception, Pinger's ping() method can be tweaked slightly by adding an if (second == 5) guard (waiting exactly 5 seconds races with the server's 5-second read timeout, so the wait is bumped to 6). As follows:

private void ping(Channel channel) {
    int second = Math.max(1, random.nextInt(baseRandom));
    // Waiting exactly 5s races with the server's 5s read timeout; skip it.
    if (second == 5) {
        second = 6;
    }
    System.out.println("next heart beat will send after " + second + "s.");
    ScheduledFuture<?> future = channel.eventLoop().schedule(new Runnable() {
        @Override
        public void run() {
            if (channel.isActive()) {
                System.out.println("sending heart beat to the server...");
                channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
            } else {
                System.err.println("The connection had broken, cancel the task that will send a heart beat.");
                // Fail the task on purpose: the listener below reschedules
                // only when the task completed successfully.
                throw new RuntimeException("connection is down, stop pinging");
            }
        }
    }, second, TimeUnit.SECONDS);
    future.addListener(new GenericFutureListener() {
        @Override
        public void operationComplete(Future future) throws Exception {
            if (future.isSuccess()) {
                ping(channel);
            }
        }
    });
}

Starting the Client

Start only the client first and watch the console; logs similar to the following appear:

[Figure: reconnect test, client console output]

As the figure shows, the client cannot reach the server, so it keeps retrying. As the retry count grows, the interval between retries grows with it; since it should not grow without bound, a ceiling is needed, 60s here. When the next computed interval exceeds 60s, the client prints Sleep extension too large (*). Pinning to 60000 (in ms): the computed sleep crossed the threshold (60s), so the actual sleep is clamped back to that threshold.

Starting the Server

Next, start the server and keep watching the client console.

[Figure: reconnect test, client console output after the server starts]

As shown, the 9th retry failed and the server was started before the 10th attempt, so the 10th attempt prints Successfully established a connection to the server., i.e. the connection succeeds. From then on, because the client still pings the server at random intervals, the disconnect-and-reconnect cycle plays out repeatedly.
