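I. What Is a Sticky Packet
Two important problems arise when messages are transmitted over a network: sticky packets and split packets.
A sticky packet occurs when several small packets sent by the sender are delivered to the receiver in a single read, as if the packets had been "stuck" together.
The cause is that TCP is a byte stream with no message boundaries, and data is buffered at both ends. Small writes can be held in the send buffer and coalesced before transmission (for example by Nagle's algorithm), and the receiver reads whatever has accumulated in its receive buffer, which may contain several messages.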
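The effect can be reproduced without a network. The following is a minimal sketch that uses a `ByteArrayOutputStream` as a stand-in for the connection's buffer; the class name `StickyPacketDemo` and its method are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class StickyPacketDemo {
    // Stand-in for a TCP connection: bytes written by the sender are appended,
    // and nothing records where each individual write started or ended.
    static String sendAndReceiveOnce(String... messages) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        for (String m : messages) {
            byte[] bytes = m.getBytes(StandardCharsets.UTF_8);
            stream.write(bytes, 0, bytes.length);
        }
        // The receiver performs one read and gets everything buffered so far.
        return new String(stream.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Three sends, one receive: prints "HelloNetty!"
        System.out.println(sendAndReceiveOnce("Hello", "Netty", "!"));
    }
}
```

The receiver cannot tell from the bytes alone that three messages were sent, which is why the protocol itself has to mark where each message ends.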
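II. Netty's Solutions to Sticky Packets
This article covers three Netty decoders: MessageToMessageDecoder, LineBasedFrameDecoder, and DelimiterBasedFrameDecoder.
1. MessageToMessageDecoder
MessageToMessageDecoder converts one message type into another, here a ByteBuf into a String. It only converts what it is handed and does not locate message boundaries itself, so it is normally placed after a frame decoder.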
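import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Copy the readable bytes out of the buffer and decode them as UTF-8 text.
        byte[] bytes = new byte[in.readableBytes()];
        in.readBytes(bytes);
        out.add(new String(bytes, StandardCharsets.UTF_8));
    }
}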
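A decoder like the one above turns each buffer it receives into exactly one String. The sketch below mimics that behaviour on plain byte arrays to show what happens when the network hands over chunks that do not line up with the messages; `PerChunkDecodeDemo` is a hypothetical name and the chunk contents are invented for the illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PerChunkDecodeDemo {
    // Mirrors a stateless converter: every incoming chunk becomes one String,
    // regardless of where the sender's messages actually began or ended.
    static List<String> decodeEachChunk(List<byte[]> chunks) {
        List<String> out = new ArrayList<>();
        for (byte[] chunk : chunks) {
            out.add(new String(chunk, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        // The sender wrote "Hello Netty" twice, but the bytes arrived split differently.
        List<byte[]> chunks = Arrays.asList(
                "Hello Ne".getBytes(StandardCharsets.UTF_8),
                "ttyHello Netty".getBytes(StandardCharsets.UTF_8));
        // Prints [Hello Ne, ttyHello Netty]
        System.out.println(decodeEachChunk(chunks));
    }
}
```

The output follows the chunk boundaries rather than the message boundaries, so some form of framing is still needed before the conversion step.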
2. LineBasedFrameDecoder
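LineBasedFrameDecoder uses a carriage return plus line feed (\r\n) or a bare line feed (\n) as the end-of-message marker. The class below is a simplified version that illustrates the idea.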
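import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;

public class LineBasedFrameDecoder extends ByteToMessageDecoder {
    private static final byte CR = 13;
    private static final byte LF = 10;
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int lf = in.indexOf(in.readerIndex(), in.writerIndex(), LF);
        if (lf == -1) {
            return; // no complete line yet; wait for more data
        }
        int length = lf - in.readerIndex();
        int delimiterLength = 1;
        // Treat "\r\n" as a two-byte line ending and a bare "\n" as a one-byte one.
        if (length > 0 && in.getByte(lf - 1) == CR) {
            length--;
            delimiterLength = 2;
        }
        // Retain the slice so it stays valid after the cumulation buffer is released.
        out.add(in.readRetainedSlice(length));
        in.skipBytes(delimiterLength);
    }
}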
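The same technique can be sketched with the JDK alone, which makes it easy to see how both merged and split chunks are handled. `LineFramer` below is a hypothetical helper, not a Netty class: it keeps unfinished bytes between calls and emits a line only once its terminator has arrived.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LineFramer {
    // Bytes of the line currently being assembled; survives across feed() calls.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Feed one received chunk and return every line completed by it.
    public List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                byte[] line = pending.toByteArray();
                int length = line.length;
                // Drop a trailing '\r' so "\r\n" and "\n" are both accepted.
                if (length > 0 && line[length - 1] == '\r') {
                    length--;
                }
                lines.add(new String(line, 0, length, StandardCharsets.UTF_8));
                pending.reset();
            } else {
                pending.write(b);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        LineFramer framer = new LineFramer();
        // Half a message: nothing is emitted yet. Prints []
        System.out.println(framer.feed("Hello Ne".getBytes(StandardCharsets.UTF_8)));
        // The rest plus a second message in one chunk. Prints [Hello Netty, Hello Netty]
        System.out.println(framer.feed("tty\r\nHello Netty\n".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Unlike this sketch, a production decoder would also cap the line length so that a peer that never sends a line ending cannot grow the buffer without limit.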
3. DelimiterBasedFrameDecoder
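DelimiterBasedFrameDecoder uses a custom delimiter (one or more bytes) as the end-of-message marker. The class below is a simplified version that illustrates the idea.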
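import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;

public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {
    private final ByteBuf delimiter;
    public DelimiterBasedFrameDecoder(ByteBuf delimiter) {
        this.delimiter = delimiter;
    }
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int n = delimiter.readableBytes();
        // Scan the readable region for the first occurrence of the delimiter.
        for (int i = in.readerIndex(); i <= in.writerIndex() - n; i++) {
            if (ByteBufUtil.equals(in, i, delimiter, delimiter.readerIndex(), n)) {
                // Emit the bytes before the delimiter, then skip the delimiter once.
                out.add(in.readRetainedSlice(i - in.readerIndex()));
                in.skipBytes(n);
                return;
            }
        }
        // No delimiter found yet; wait for more data.
    }
}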
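As a JDK-only illustration of the same idea, the sketch below frames a byte stream on a multi-byte delimiter, including the case where the delimiter itself is split across two chunks. `DelimiterFramer` and the delimiter `$_` are assumptions made for this example, not Netty names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DelimiterFramer {
    private final byte[] delimiter;
    // Bytes received so far that do not yet form a complete frame.
    private byte[] pending = new byte[0];

    public DelimiterFramer(String delimiter) {
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        this.delimiter = delimiter.getBytes(StandardCharsets.UTF_8);
    }

    // Feed one received chunk and return every frame completed by it.
    public List<String> feed(byte[] chunk) {
        byte[] data = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, data, pending.length, chunk.length);
        List<String> frames = new ArrayList<>();
        int start = 0;
        int i = 0;
        while (i <= data.length - delimiter.length) {
            if (matchesAt(data, i)) {
                frames.add(new String(data, start, i - start, StandardCharsets.UTF_8));
                i += delimiter.length;
                start = i;
            } else {
                i++;
            }
        }
        // Keep the unfinished tail (possibly a partial delimiter) for the next call.
        pending = Arrays.copyOfRange(data, start, data.length);
        return frames;
    }

    private boolean matchesAt(byte[] data, int offset) {
        for (int j = 0; j < delimiter.length; j++) {
            if (data[offset + j] != delimiter[j]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        DelimiterFramer framer = new DelimiterFramer("$_");
        // The delimiter is cut in half here, so nothing is emitted. Prints []
        System.out.println(framer.feed("Hello Netty$".getBytes(StandardCharsets.UTF_8)));
        // Prints [Hello Netty, Hello]
        System.out.println(framer.feed("_Hello$_par".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Re-scanning the pending bytes on every call keeps the sketch short; a real decoder would avoid repeated copying and would bound the frame size.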
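III. Example Code
The examples below apply each of the three approaches in turn:
1. MessageToMessageDecoder example
This pipeline contains no frame decoder, so the ten messages may still arrive merged; the example shows the ByteBuf-to-String conversion only.
Server: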
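import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MessageDecoder());
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads even if bind or sync throws.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}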
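Client:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MessageEncoder());
                            ch.pipeline().addLast(new ClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
            for (int i = 0; i < 10; i++) {
                String message = "Hello Netty";
                // Write the String itself so that MessageEncoder performs the encoding.
                channelFuture.channel().writeAndFlush(message);
            }
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}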
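MessageDecoder:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Copy the readable bytes out of the buffer and decode them as UTF-8 text.
        byte[] bytes = new byte[in.readableBytes()];
        in.readBytes(bytes);
        out.add(new String(bytes, StandardCharsets.UTF_8));
    }
}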
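ServerHandler:
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // msg is the String produced by MessageDecoder.
        String text = (String) msg;
        System.out.println(text);
        // The server pipeline has no encoder, so convert back to a ByteBuf before echoing.
        ctx.write(Unpooled.copiedBuffer(text, CharsetUtil.UTF_8));
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}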
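MessageEncoder:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.StandardCharsets;
public class MessageEncoder extends MessageToByteEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        // Use an explicit charset instead of the platform default.
        out.writeBytes(msg.getBytes(StandardCharsets.UTF_8));
    }
}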
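ClientHandler:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The echoed data arrives as a ByteBuf; decode it for printing, then release it.
        ByteBuf buf = (ByteBuf) msg;
        System.out.println(buf.toString(CharsetUtil.UTF_8));
        buf.release();
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Connected");
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}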
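2. LineBasedFrameDecoder example
Server:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;

public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Netty's own decoder; 1024 is the maximum frame length in bytes.
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}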
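Client:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.StandardCharsets;

public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8));
                            ch.pipeline().addLast(new ClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
            for (int i = 0; i < 10; i++) {
                // Every message ends with "\r\n" so the server can find its boundary.
                String message = "Hello Netty\r\n";
                channelFuture.channel().writeAndFlush(message);
            }
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}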
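ServerHandler:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Each msg is one complete line as a ByteBuf, with the line ending stripped.
        ByteBuf frame = (ByteBuf) msg;
        System.out.println(frame.toString(CharsetUtil.UTF_8));
        // Echo the frame back; the buffer is released once it has been written.
        ctx.write(frame);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}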
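ClientHandler:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Connected");
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The echoed data arrives as a ByteBuf; decode it for printing, then release it.
        ByteBuf buf = (ByteBuf) msg;
        System.out.println(buf.toString(CharsetUtil.UTF_8));
        buf.release();
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}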
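3. DelimiterBasedFrameDecoder example
Server:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer(new byte[]{(byte) '$'});
                            // Netty's decoder also takes a maximum frame length (1024 bytes here).
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}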
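Client:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.StandardCharsets;

public class Client {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8));
                            ch.pipeline().addLast(new ClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
            for (int i = 0; i < 10; i++) {
                // Every message ends with '$', the delimiter configured on the server.
                String message = "Hello Netty$";
                channelFuture.channel().writeAndFlush(message);
            }
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}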
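ServerHandler:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Each msg is one complete frame as a ByteBuf, with the delimiter stripped.
        ByteBuf frame = (ByteBuf) msg;
        System.out.println(frame.toString(CharsetUtil.UTF_8));
        // Echo the frame back; the buffer is released once it has been written.
        ctx.write(frame);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}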
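ClientHandler:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Connected");
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The echoed data arrives as a ByteBuf; decode it for printing, then release it.
        ByteBuf buf = (ByteBuf) msg;
        System.out.println(buf.toString(CharsetUtil.UTF_8));
        buf.release();
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}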
Original article by MLAMX. If you repost it, please credit the source: https://www.506064.com/zh-hant/n/368245.html