深入理解Netty粘包

一、什么是粘包

网络通信中消息的传输有两个重要的问题,一个是粘包,一个是拆包。

粘包的概念就是发送方发送的多个小数据包被接收方一次性收到,这就像是把多个包“粘”在了一起。

造成粘包的原因是发送方发送数据时,由于网络传输中存在缓存机制,当数据长度很小时会占用缓存区,而只有当缓存区满后才会将数据发送出去。

二、Netty的粘包解决方案

Netty提供了三种解决粘包的方案: MessageToMessageDecoder,LineBasedFrameDecoder和DelimiterBasedFrameDecoder。

1. MessageToMessageDecoder

MessageToMessageDecoder可以将一个ByteBuf转换为另外一种格式的消息。


public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        byte[] bytes = new byte[in.readableBytes()];
        in.readBytes(bytes);
        String message = new String(bytes, "UTF-8");
        out.add(message);
    }
}

2. LineBasedFrameDecoder

LineBasedFrameDecoder使用回车换行符(\r\n)或者换行符(\n)作为消息结束的标志。


public class LineBasedFrameDecoder extends ByteToMessageDecoder {
    private static final byte CR = 13;
    private static final byte LF = 10;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int index = in.indexOf(in.readerIndex(), in.writerIndex(), LF);
        if (index == -1) {
            return;
        }

        ByteBuf line = in.readSlice(index - in.readerIndex() + 1);
        ByteBuf message = line.slice(0, line.readableBytes() - 2);
        line.skipBytes(2);

        out.add(message);
    }
}

3. DelimiterBasedFrameDecoder

DelimiterBasedFrameDecoder使用自定义字符作为消息结束的标志。


public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {
    private final ByteBuf delimiter;

    public DelimiterBasedFrameDecoder(ByteBuf delimiter) {
        this.delimiter = delimiter;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int index = in.indexOf(in.readerIndex(), in.writerIndex(), delimiter);
        if (index == -1) {
            return;
        }

        ByteBuf message = in.readSlice(index - in.readerIndex() + delimiter.readableBytes());
        in.skipBytes(delimiter.readableBytes());

        out.add(message);
    }
}

三、示例代码

下面我们使用Netty提供的三种方式进行粘包解决:

1. MessageToMessageDecoder代码示例

服务端:


public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new MessageDecoder());
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });

        ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
        channelFuture.channel().closeFuture().sync();

        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

客户端:


public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new MessageEncoder());
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });

        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();

        for (int i = 0; i < 10; i++) {
            String message = "Hello Netty";
            channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(message.getBytes()));
        }

        channelFuture.channel().closeFuture().sync();

        group.shutdownGracefully();
    }
}

MessageDecoder:


public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        byte[] bytes = new byte[in.readableBytes()];
        in.readBytes(bytes);
        String message = new String(bytes, "UTF-8");
        out.add(message);
    }
}

ServerHandler:


public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg.toString());
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

MessageEncoder:


public class MessageEncoder extends MessageToByteEncoder<String> {

    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        out.writeBytes(msg.getBytes());
    }
}

ClientHandler:


public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg.toString());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接成功");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

2. LineBasedFrameDecoder代码示例

服务端:


public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });

        ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
        channelFuture.channel().closeFuture().sync();

        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

客户端:


public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });

        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();

        for (int i = 0; i < 10; i++) {
            String message = "Hello Netty\r\n";
            channelFuture.channel().writeAndFlush(message);
        }

        channelFuture.channel().closeFuture().sync();

        group.shutdownGracefully();
    }
}

ServerHandler:


public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg.toString());
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ClientHandler:


public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接成功");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg.toString());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3. DelimiterBasedFrameDecoder代码示例

服务端:


public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ByteBuf delimiter = Unpooled.copiedBuffer(new byte[]{(byte) '$'});
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(delimiter));
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });

        ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
        channelFuture.channel().closeFuture().sync();

        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

客户端:


public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });

        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();

        for (int i = 0; i < 10; i++) {
            String message = "Hello Netty$";
            channelFuture.channel().writeAndFlush(message);
        }

        channelFuture.channel().closeFuture().sync();

        group.shutdownGracefully();
    }
}

ServerHandler:


public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg.toString());
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

ClientHandler:


public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接成功");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg.toString());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

原创文章,作者:MLAMX,如若转载,请注明出处:https://www.506064.com/n/368245.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
MLAMXMLAMX
上一篇 2025-04-12 01:13
下一篇 2025-04-12 01:13

相关推荐

  • gateway io.netty.buffer.poolchunk

    在本文中,我们将深入探讨Netty中的一个基础组件——PoolChunk,它是Netty中ByteBuf的一个关键实现,负责对ByteBuf进行缓存和管理。我们将从多个方面对该组件…

    编程 2025-04-28
  • 同时启动两个netty服务的实现方法

    本文将介绍如何同时启动两个netty服务的具体实现方法。 一、实现思路 为了同时启动两个netty服务,我们需要创建两个不同的Channel,每个Channel都绑定到不同的服务端…

    编程 2025-04-27
  • 深入解析Vue3 defineExpose

    Vue 3在开发过程中引入了新的API `defineExpose`。在以前的版本中,我们经常使用 `$attrs` 和` $listeners` 实现父组件与子组件之间的通信,但…

    编程 2025-04-25
  • 深入理解byte转int

    一、字节与比特 在讨论byte转int之前,我们需要了解字节和比特的概念。字节是计算机存储单位的一种,通常表示8个比特(bit),即1字节=8比特。比特是计算机中最小的数据单位,是…

    编程 2025-04-25
  • 深入理解Flutter StreamBuilder

    一、什么是Flutter StreamBuilder? Flutter StreamBuilder是Flutter框架中的一个内置小部件,它可以监测数据流(Stream)中数据的变…

    编程 2025-04-25
  • 深入探讨OpenCV版本

    OpenCV是一个用于计算机视觉应用程序的开源库。它是由英特尔公司创建的,现已由Willow Garage管理。OpenCV旨在提供一个易于使用的计算机视觉和机器学习基础架构,以实…

    编程 2025-04-25
  • 深入了解scala-maven-plugin

    一、简介 Scala-maven-plugin 是一个创造和管理 Scala 项目的maven插件,它可以自动生成基本项目结构、依赖配置、Scala文件等。使用它可以使我们专注于代…

    编程 2025-04-25
  • 深入了解LaTeX的脚注(latexfootnote)

    一、基本介绍 LaTeX作为一种排版软件,具有各种各样的功能,其中脚注(footnote)是一个十分重要的功能之一。在LaTeX中,脚注是用命令latexfootnote来实现的。…

    编程 2025-04-25
  • 深入探讨冯诺依曼原理

    一、原理概述 冯诺依曼原理,又称“存储程序控制原理”,是指计算机的程序和数据都存储在同一个存储器中,并且通过一个统一的总线来传输数据。这个原理的提出,是计算机科学发展中的重大进展,…

    编程 2025-04-25
  • 深入了解Python包

    一、包的概念 Python中一个程序就是一个模块,而一个模块可以引入另一个模块,这样就形成了包。包就是有多个模块组成的一个大模块,也可以看做是一个文件夹。包可以有效地组织代码和数据…

    编程 2025-04-25

发表回复

登录后才能评论