深入理解Netty粘包

一、什麼是粘包

網絡通信中消息的傳輸有兩個重要的問題,一個是粘包,一個是拆包。

粘包的概念就是發送方發送的多個小數據包被接收方一次性收到,這就像是把多個包“粘”在了一起。

造成粘包的原因是發送方發送數據時,由於網絡傳輸中存在緩存機制,當數據長度很小時會佔用緩存區,而只有當緩存區滿後才會將數據發送出去。

二、Netty的粘包解決方案

Netty提供了三種解決粘包的方案: MessageToMessageDecoder,LineBasedFrameDecoder和DelimiterBasedFrameDecoder。

1. MessageToMessageDecoder

MessageToMessageDecoder可以將一個ByteBuf轉換為另外一種格式的消息。


public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        byte[] bytes = new byte[in.readableBytes()];
        in.readBytes(bytes);
        String message = new String(bytes, "UTF-8");
        out.add(message);
    }
}

2. LineBasedFrameDecoder

LineBasedFrameDecoder使用回車換行符(\r\n)或者換行符(\n)作為消息結束的標誌。


public class LineBasedFrameDecoder extends ByteToMessageDecoder {
    private static final byte CR = 13;
    private static final byte LF = 10;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int index = in.indexOf(in.readerIndex(), in.writerIndex(), LF);
        if (index == -1) {
            return;
        }

        ByteBuf line = in.readSlice(index - in.readerIndex() + 1);
        ByteBuf message = line.slice(0, line.readableBytes() - 2);
        line.skipBytes(2);

        out.add(message);
    }
}

3. DelimiterBasedFrameDecoder

DelimiterBasedFrameDecoder使用自定義字符作為消息結束的標誌。


public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {
    private final ByteBuf delimiter;

    public DelimiterBasedFrameDecoder(ByteBuf delimiter) {
        this.delimiter = delimiter;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int index = in.indexOf(in.readerIndex(), in.writerIndex(), delimiter);
        if (index == -1) {
            return;
        }

        ByteBuf message = in.readSlice(index - in.readerIndex() + delimiter.readableBytes());
        in.skipBytes(delimiter.readableBytes());

        out.add(message);
    }
}

三、示例代碼

下面我們使用Netty提供的三種方式進行粘包解決:

1. MessageToMessageDecoder代碼示例

服務端:


public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new MessageDecoder());
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });

        ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
        channelFuture.channel().closeFuture().sync();

        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

客戶端:


public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new MessageEncoder());
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });

        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();

        for (int i = 0; i < 10; i++) {
            String message = "Hello Netty";
            channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(message.getBytes()));
        }

        channelFuture.channel().closeFuture().sync();

        group.shutdownGracefully();
    }
}

MessageDecoder:


public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        byte[] bytes = new byte[in.readableBytes()];
        in.readBytes(bytes);
        String message = new String(bytes, "UTF-8");
        out.add(message);
    }
}

ServerHandler:


public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg.toString());
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

MessageEncoder:


public class MessageEncoder extends MessageToByteEncoder<String> {

    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        out.writeBytes(msg.getBytes());
    }
}

ClientHandler:


public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg.toString());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("連接成功");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

2. LineBasedFrameDecoder代碼示例

服務端:


public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });

        ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
        channelFuture.channel().closeFuture().sync();

        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

客戶端:


public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });

        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();

        for (int i = 0; i < 10; i++) {
            String message = "Hello Netty\r\n";
            channelFuture.channel().writeAndFlush(message);
        }

        channelFuture.channel().closeFuture().sync();

        group.shutdownGracefully();
    }
}

ServerHandler:


public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg.toString());
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ClientHandler:


public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("連接成功");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg.toString());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3. DelimiterBasedFrameDecoder代碼示例

服務端:


public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ByteBuf delimiter = Unpooled.copiedBuffer(new byte[]{(byte) '$'});
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(delimiter));
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });

        ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
        channelFuture.channel().closeFuture().sync();

        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

客戶端:


public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });

        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();

        for (int i = 0; i < 10; i++) {
            String message = "Hello Netty$";
            channelFuture.channel().writeAndFlush(message);
        }

        channelFuture.channel().closeFuture().sync();

        group.shutdownGracefully();
    }
}

ServerHandler:


public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg.toString());
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ClientHandler:


public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("連接成功");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg.toString());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

原創文章,作者:MLAMX,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/368245.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
MLAMX的頭像MLAMX
上一篇 2025-04-12 01:13
下一篇 2025-04-12 01:13

相關推薦

  • gateway io.netty.buffer.poolchunk

    在本文中,我們將深入探討Netty中的一個基礎組件——PoolChunk,它是Netty中ByteBuf的一個關鍵實現,負責對ByteBuf進行緩存和管理。我們將從多個方面對該組件…

    編程 2025-04-28
  • 同時啟動兩個netty服務的實現方法

    本文將介紹如何同時啟動兩個netty服務的具體實現方法。 一、實現思路 為了同時啟動兩個netty服務,我們需要創建兩個不同的Channel,每個Channel都綁定到不同的服務端…

    編程 2025-04-27
  • 深入解析Vue3 defineExpose

    Vue 3在開發過程中引入了新的API `defineExpose`。在以前的版本中,我們經常使用 `$attrs` 和` $listeners` 實現父組件與子組件之間的通信,但…

    編程 2025-04-25
  • 深入理解byte轉int

    一、字節與比特 在討論byte轉int之前,我們需要了解字節和比特的概念。字節是計算機存儲單位的一種,通常表示8個比特(bit),即1字節=8比特。比特是計算機中最小的數據單位,是…

    編程 2025-04-25
  • 深入理解Flutter StreamBuilder

    一、什麼是Flutter StreamBuilder? Flutter StreamBuilder是Flutter框架中的一個內置小部件,它可以監測數據流(Stream)中數據的變…

    編程 2025-04-25
  • 深入探討OpenCV版本

    OpenCV是一個用於計算機視覺應用程序的開源庫。它是由英特爾公司創建的,現已由Willow Garage管理。OpenCV旨在提供一個易於使用的計算機視覺和機器學習基礎架構,以實…

    編程 2025-04-25
  • 深入了解scala-maven-plugin

    一、簡介 Scala-maven-plugin 是一個創造和管理 Scala 項目的maven插件,它可以自動生成基本項目結構、依賴配置、Scala文件等。使用它可以使我們專註於代…

    編程 2025-04-25
  • 深入了解LaTeX的腳註(latexfootnote)

    一、基本介紹 LaTeX作為一種排版軟件,具有各種各樣的功能,其中腳註(footnote)是一個十分重要的功能之一。在LaTeX中,腳註是用命令latexfootnote來實現的。…

    編程 2025-04-25
  • 深入了解Python包

    一、包的概念 Python中一個程序就是一個模塊,而一個模塊可以引入另一個模塊,這樣就形成了包。包就是有多個模塊組成的一個大模塊,也可以看做是一個文件夾。包可以有效地組織代碼和數據…

    編程 2025-04-25
  • 深入剖析MapStruct未生成實現類問題

    一、MapStruct簡介 MapStruct是一個Java bean映射器,它通過註解和代碼生成來在Java bean之間轉換成本類代碼,實現類型安全,簡單而不失靈活。 作為一個…

    編程 2025-04-25

發表回復

登錄後才能評論