在正式开始之前,我们先对上篇评论中的几个范例问题做一个大略的回答,不感兴趣的朋友可直接划过。
问题一:TCP存在粘包问题吗?先说答案:TCP 本身并没有粘包和半包一说,由于 TCP 实质上只是一个传输掌握协议(Transmission Control Protocol,TCP),它是一种面向连接的、可靠的、基于字节流的传输层通信协议,由 IETF 的 RFC 793 定义。
所谓的协议实质上是一个约定,就好比 Java 编程约定利用驼峰命名法一样,约定的意义是为了让通讯双方,能够正常的进行互换的,那粘包和半包问题又是如何产生的呢?

这是由于在 TCP 的交互中,数据因此字节流的形式进行传输的,而“流”的传输是没有边界的,由于没有边界以是就不能区分的归属,从而就会产生粘包和半包问题(粘包和半包的定义,详见上一篇)。以是说 TCP 协议本身并不存在粘包和半包问题,只是在利用中如果不能有效的确定流的边界就会产生粘包和半包问题。
问题二:分隔符是最优办理方案?坦白的说,经由评论区大家的耐心“开导”,我也意识到了以结束符作为终极的办理方案存在一定的局限性,比如当一条中间如果涌现了却束符就会造成半包的问题,以是如果是繁芜的字符串要对内容进行编码和解码处理,这样才能担保结束符的精确性。
问题三:Socket 高效吗?这个问题的答案是否定的,实在上文在开头已经描述了运用处景:「传统的 Socket 编程」,学习它的意义就在于理解更早期更底层的一些知识,当然作为补充本文会供应更加高效的通讯方案——Netty 通讯。
聊完了以上问题,接下来咱们先来补充一下上篇文章中提到的,将分为头和体的代码实现。
一、封装头和体在开始写做事器端和客户端之前,咱们先来编写一个的封装类,利用它可以将封装成头和体,如下图所示:
头中存储体的长度,从而确定了的边界,便办理粘包和半包问题。
1.封装类的封装类中供应了两个方法:一个是将转换成头 + 体的方法,另一个是读取消息头的方法,详细实当代码如下:
/ 封装类 /class SocketPacket { // 头存储的长度(占 8 字节) static final int HEAD_SIZE = 8; / 将协议封装为:协议头 + 协议体 @param context 体(String 类型) @return byte[] / public byte[] toBytes(String context) { // 协议体 byte 数组 byte[] bodyByte = context.getBytes(); int bodyByteLength = bodyByte.length; // 终极封装工具 byte[] result = new byte[HEAD_SIZE + bodyByteLength]; // 借助 NumberFormat 将 int 转换为 byte[] NumberFormat numberFormat = NumberFormat.getNumberInstance(); numberFormat.setMinimumIntegerDigits(HEAD_SIZE); numberFormat.setGroupingUsed(false); // 协议头 byte 数组 byte[] headByte = numberFormat.format(bodyByteLength).getBytes(); // 封装协议头 System.arraycopy(headByte, 0, result, 0, HEAD_SIZE); // 封装协议体 System.arraycopy(bodyByte, 0, result, HEAD_SIZE, bodyByteLength); return result; } / 获取消息头的内容(也便是体的长度) @param inputStream @return / public int getHeader(InputStream inputStream) throws IOException { int result = 0; byte[] bytes = new byte[HEAD_SIZE]; inputStream.read(bytes, 0, HEAD_SIZE); // 得到体的字节长度 result = Integer.valueOf(new String(bytes)); return result; }}
2.编写客户端
接下来我们来定义客户端,在客户端中我们添加一组待发送的,随机给做事器端发送一个,实当代码如下:
/ 客户端 /class MySocketClient { public static void main(String[] args) throws IOException { // 启动 Socket 并考试测验连接做事器 Socket socket = new Socket("127.0.0.1", 9093); // 发送合集(随机发送一条) final String[] message = {"Hi,Java.", "Hi,SQL~", "关注公众号|Java中文社群."}; // 创建协议封装工具 SocketPacket socketPacket = new SocketPacket(); try (OutputStream outputStream = socket.getOutputStream()) { // 给做事器端发送 10 次 for (int i = 0; i < 10; i++) { // 随机发送一条 String msg = message[new Random().nextInt(message.length)]; // 将内容封装为:协议头+协议体 byte[] bytes = socketPacket.toBytes(msg); // 发送 outputStream.write(bytes, 0, bytes.length); outputStream.flush(); } } }}
3.编写做事器端
做事器端我们利用线程池来处理每个客户真个业务要求,实当代码如下:
/ 做事器端 /class MySocketServer { public static void main(String[] args) throws IOException { // 创建 Socket 做事器端 ServerSocket serverSocket = new ServerSocket(9093); // 获取客户端连接 Socket clientSocket = serverSocket.accept(); // 利用线程池处理更多的客户端 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(100, 150, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000)); threadPool.submit(() -> { // 客户端处理 processMessage(clientSocket); }); } / 客户端处理 @param clientSocket / private static void processMessage(Socket clientSocket) { // Socket 封装工具 SocketPacket socketPacket = new SocketPacket(); // 获取客户端发送的工具 try (InputStream inputStream = clientSocket.getInputStream()) { while (true) { // 获取消息头(也便是体的长度) int bodyLength = socketPacket.getHeader(inputStream); // 体 byte 数组 byte[] bodyByte = new byte[bodyLength]; // 每次实际读取字节数 int readCount = 0; // 体赋值下标 int bodyIndex = 0; // 循环吸收头中定义的长度 while (bodyIndex <= (bodyLength - 1) && (readCount = inputStream.read(bodyByte, bodyIndex, bodyLength)) != -1) { bodyIndex += readCount; } bodyIndex = 0; // 成功吸收到客户真个并打印 System.out.println("吸收到客户真个信息:" + new String(bodyByte)); } } catch (IOException ioException) { System.out.println(ioException.getMessage()); } }}
以上程序的实行结果如下:
从上述结果可以看出,通讯正常,客户端和做事器真个交互中并没有涌现粘包和半包的问题。
二、利用 Netty 实现高效通讯以上的内容都是针对传统 Socket 编程的,但要实现更加高效的通讯和连接工具的复用就要利用 NIO(Non-Blocking IO,非壅塞 IO)或者 AIO(Asynchronous IO,异步非壅塞 IO)了。
传统的 Socket 编程是 BIO(Blocking IO,同步壅塞 IO),它和 NIO 和 AIO 的差异如下:
BIO 来自传统的 java.io 包,它是基于流模型实现的,交互的办法是同步、壅塞办法,也便是说在读入输入流或者输出流时,在读写动作完成之前,线程会一贯壅塞在那里,它们之间的调用是可靠的线性顺序。它的优点便是代码比较大略、直不雅观;缺陷便是 IO 的效率和扩展性很低,随意马虎成为运用性能瓶颈。NIO 是 Java 1.4 引入的 java.nio 包,供应了 Channel、Selector、Buffer 等新的抽象,可以构建多路复用的、同步非壅塞 IO 程序,同时供应了更靠近操作系统底层高性能的数据操作办法。AIO 是 Java 1.7 之后引入的包,是 NIO 的升级版本,供应了异步非堵塞的 IO 操作办法,因此人们叫它 AIO(Asynchronous IO),异步 IO 是基于事宜和回调机制实现的,也便是运用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会关照相应的线程进行后续的操作。PS:AIO 可以看作是 NIO 的升级,它也叫 NIO 2。
传统 Socket 的通讯流程:
NIO 的通讯流程:
利用 Netty 替代传统 NIO 编程
NIO 的设计思路虽然很好,但它的代码编写比较麻烦,比如 Buffer 的利用和 Selector 的编写等。并且在面对断线重连、包丢失和粘包等繁芜问题时手动处理的本钱都很大,因此我们常日会利用 Netty 框架来替代传统的 NIO。
Netty 是什么?Netty 是一个异步、事宜驱动的用来做高性能、高可靠性的网络运用框架,利用它可以快速轻松地开拓网络运用程序,极大的简化了网络编程的繁芜度。
Netty 紧张优点有以下几个:
框架设计优雅,底层模型随意切换适应不同的网络协议哀求;供应很多标准的协议、安全、编码解码的支持;简化了 NIO 利用中的诸多不便;社区非常生动,很多开源框架中都利用了 Netty 框架,如 Dubbo、RocketMQ、Spark 等。Netty 紧张包含以下 3 个部分,如下图所示:
这 3 个部分的功能先容如下。
1. Core 核心层Core 核心层是 Netty 最精华的内容,它供应了底层网络通信的通用抽象和实现,包括可扩展的事宜模型、通用的通信 API、支持零拷贝的 ByteBuf 等。
2. Protocol Support 协议支持层
协议支持层基本上覆盖了主流协议的编解码实现,如 HTTP、SSL、Protobuf、压缩、大文件传输、WebSocket、文本、二进制等主流协议,此外 Netty 还支持自定义运用层协议。Netty 丰富的协议支持降落了用户的开拓本钱,基于 Netty 我们可以快速开拓 HTTP、WebSocket 等做事。
3. Transport Service 传输做事层
传输做事层供应了网络传输能力的定义和实现方法。它支持 Socket、HTTP 隧道、虚拟机管道等传输办法。Netty 对 TCP、UDP 等数据传输做了抽象和封装,用户可以更聚焦在业务逻辑实现上,而不必关系底层数据传输的细节。
Netty 利用对 Netty 有了大概的认识之后,接下来我们用 Netty 来编写一个根本的通讯做事器,它包含两个端:做事器端和客户端,客户端卖力发送,做事器端卖力吸收并打印,详细的实现步骤如下。
1.添加 Netty 框架首先我们须要先添加 Netty 框架的支持,如果是 Maven 项目添加如下配置即可:
<!-- 添加 Netty 框架 --><!-- https://mvnrepository.com/artifact/io.netty/netty-all --><dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.56.Final</version></dependency>
Netty 版本解释
Netty 的 3.x 和 4.x 为主流的稳定版本,而最新的 5.x 已经是放弃的测试版了,因此推举利用 Netty 4.x 的最新稳定版。
2. 做事器端实当代码
按照官方的推举,这里将做事器真个代码分为以下 3 个部分:
MyNettyServer:做事器真个核心业务代码;ServerInitializer:做事器端通道(Channel)初始化;ServerHandler:做事器端吸收到信息之后的处理逻辑。PS:Channel 字面意思为“通道”,它是网络通信的载体。Channel 供应了基本的 API 用于网络 I/O 操作,如 register、bind、connect、read、write、flush 等。Netty 自己实现的 Channel 因此 JDK NIO Channel 为根本的,比较较于 JDK NIO,Netty 的 Channel 供应了更高层次的抽象,同时屏蔽了底层 Socket 的繁芜性,授予了 Channel 更加强大的功能,你在利用 Netty 时基本不须要再与 Java Socket 类直接打交道。
做事器真个实当代码如下:
// 定义做事器的端口号static final int PORT = 8007;/ 做事器端 /static class MyNettyServer { public static void main(String[] args) { // 创建一个线程组,用来卖力吸收客户端连接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 创建另一个线程组,用来卖力 I/O 的读写 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建一个 Server 实例(可理解为 Netty 的入门类) ServerBootstrap b = new ServerBootstrap(); // 将两个线程池设置到 Server 实例 b.group(bossGroup, workerGroup) // 设置 Netty 通道的类型为 NioServerSocket(非壅塞 I/O Socket 做事器) .channel(NioServerSocketChannel.class) // 设置建立连接之后的实行器(ServerInitializer 是我创建的一个自定义类) .childHandler(new ServerInitializer()); // 绑定端口并且进行同步 ChannelFuture future = b.bind(PORT).sync(); // 对关闭通道进行监听 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 资源关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}/ 做事端通道初始化 /static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串编码器和解码器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 做事器端连接之后的实行器(自定义的类) private static final ServerHandler SERVER_HANDLER = new ServerHandler(); / 初始化通道的详细实行方法 / @Override public void initChannel(SocketChannel ch) { // 通道 Channel 设置 ChannelPipeline pipeline = ch.pipeline(); // 设置(字符串)编码器和解码器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 做事器端连接之后的实行器,吸收到之后的业务处理 pipeline.addLast(SERVER_HANDLER); }}/ 做事器端吸收到之后的业务处理类 /static class ServerHandler extends SimpleChannelInboundHandler<String> { / 读取到客户真个 / @Override public void channelRead0(ChannelHandlerContext ctx, String request) { if (!request.isEmpty()) { System.out.println("接到客户真个:" + request); } } / 数据读取完毕 / @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } / 非常处理,打印非常并关闭通道 / @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); }}
客户真个代码实现也是分为以下 3 个部分:
MyNettyClient:客户端核心业务代码;ClientInitializer:客户端通道初始化;ClientHandler:吸收到之后的处理逻辑。客户真个实当代码如下:
/ 客户端 /static class MyNettyClient { public static void main(String[] args) { // 创建事宜循环线程组(客户真个线程组只有一个) EventLoopGroup group = new NioEventLoopGroup(); try { // Netty 客户端启动工具 Bootstrap b = new Bootstrap(); // 设置启动参数 b.group(group) // 设置通道类型 .channel(NioSocketChannel.class) // 设置启动实行器(卖力启动事宜的业务实行,ClientInitializer 为自定义的类) .handler(new ClientInitializer()); // 连接做事器端并同步通道 Channel ch = b.connect("127.0.0.1", 8007).sync().channel(); // 发送 ChannelFuture lastWriteFuture = null; // 给做事器端发送 10 条 for (int i = 0; i < 10; i++) { // 发送给做事器 lastWriteFuture = ch.writeAndFlush("Hi,Java."); } // 在关闭通道之前,同步刷新所有的 if (lastWriteFuture != null) { lastWriteFuture.sync(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { // 开释资源 group.shutdownGracefully(); } }}/ 客户端通道初始化类 /static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串编码器和解码器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客户端连接成功之后业务处理 private static final ClientHandler CLIENT_HANDLER = new ClientHandler(); / 初始化客户端通道 / @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 设置(字符串)编码器和解码器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客户端连接成功之后的业务处理 pipeline.addLast(CLIENT_HANDLER); }}/ 客户端连接成功之后的业务处理 /static class ClientHandler extends SimpleChannelInboundHandler<String> { / 读取到做事器真个 / @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.err.println("接到做事器的:" + msg); } / 非常处理,打印非常并关闭通道 / @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); }}
从以上代码可以看出,我们代码实现的功能是,客户端给做事器端发送 10 条。
编写完上述代码之后,我们就可以启动做事器端和客户端了,启动之后,它们的实行结果如下:
从上述结果中可以看出,虽然客户端和做事器端实现了通信,但在 Netty 的利用中依然存在粘包的问题,做事器端一次收到了 10 条,而不是每次只收到一条,因此接下来我们要办理掉 Netty 中的粘包问题。
三、办理 Netty 粘包问题在 Netty 中,办理粘包问题的常用方案有以下 3 种:
设置固定大小的长度,如果长度不敷则利用空字符填补,它的缺陷比较明显,比较花费网络流量,因此不建议利用;利用分隔符来确定的边界,从而避免粘包和半包问题的产生;将分为头和体,在头部中保存有当前全体的长度,只有在读取到足够长度的之后才算是读到了一个完全的。接下来我们分别来看后两种推举的办理方案。
1.利用分隔符办理粘包问题在 Netty 中供应了 DelimiterBasedFrameDecoder 类用来以分外符号作为的结束符,从而办理粘包和半包的问题。
它的核心实当代码是在初始化通道(Channel)时,通过设置 DelimiterBasedFrameDecoder 来分隔,须要在客户端和做事器端都进行设置,详细实当代码如下。
做事器端核心实当代码如下:
/ 做事端通道初始化 /static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串编码器和解码器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 做事器端连接之后的实行器(自定义的类) private static final ServerHandler SERVER_HANDLER = new ServerHandler(); / 初始化通道的详细实行方法 / @Override public void initChannel(SocketChannel ch) { // 通道 Channel 设置 ChannelPipeline pipeline = ch.pipeline(); // 19 行:设置结尾分隔符【核心代码】(参数1:为的最大长度,可自定义;参数2:分隔符[此处以换行符为分隔符]) pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter())); // 设置(字符串)编码器和解码器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 做事器端连接之后的实行器,吸收到之后的业务处理 pipeline.addLast(SERVER_HANDLER); }}
核心代码为第 19 行,代码中已经备注了方法的含义,这里就不再赘述。
客户真个核心实当代码如下:
/ 客户端通道初始化类 /static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串编码器和解码器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客户端连接成功之后业务处理 private static final ClientHandler CLIENT_HANDLER = new ClientHandler(); / 初始化客户端通道 / @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 17 行:设置结尾分隔符【核心代码】(参数1:为的最大长度,可自定义;参数2:分隔符[此处以换行符为分隔符]) pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter())); // 设置(字符串)编码器和解码器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客户端连接成功之后的业务处理 pipeline.addLast(CLIENT_HANDLER); }}
完全的做事器端和客户真个实当代码如下:
import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import io.netty.handler.codec.Delimiters;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;public class NettyExample { // 定义做事器的端口号 static final int PORT = 8007; / 做事器端 / static class MyNettyServer { public static void main(String[] args) { // 创建一个线程组,用来卖力吸收客户端连接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 创建另一个线程组,用来卖力 I/O 的读写 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建一个 Server 实例(可理解为 Netty 的入门类) ServerBootstrap b = new ServerBootstrap(); // 将两个线程池设置到 Server 实例 b.group(bossGroup, workerGroup) // 设置 Netty 通道的类型为 NioServerSocket(非壅塞 I/O Socket 做事器) .channel(NioServerSocketChannel.class) // 设置建立连接之后的实行器(ServerInitializer 是我创建的一个自定义类) .childHandler(new ServerInitializer()); // 绑定端口并且进行同步 ChannelFuture future = b.bind(PORT).sync(); // 对关闭通道进行监听 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 资源关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } / 做事端通道初始化 / static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串编码器和解码器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 做事器端连接之后的实行器(自定义的类) private static final ServerHandler SERVER_HANDLER = new ServerHandler(); / 初始化通道的详细实行方法 / @Override public void initChannel(SocketChannel ch) { // 通道 Channel 设置 ChannelPipeline pipeline = ch.pipeline(); // 设置结尾分隔符 pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter())); // 设置(字符串)编码器和解码器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 做事器端连接之后的实行器,吸收到之后的业务处理 pipeline.addLast(SERVER_HANDLER); } } / 做事器端吸收到之后的业务处理类 / static class ServerHandler extends SimpleChannelInboundHandler<String> { / 读取到客户真个 / @Override public void channelRead0(ChannelHandlerContext ctx, String request) { if (!request.isEmpty()) { System.out.println("接到客户真个:" + request); } } / 数据读取完毕 / @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } / 非常处理,打印非常并关闭通道 / @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } / 客户端 / static class MyNettyClient { public static void main(String[] args) { // 创建事宜循环线程组(客户真个线程组只有一个) EventLoopGroup group = new NioEventLoopGroup(); try { // Netty 客户端启动工具 Bootstrap b = new Bootstrap(); // 设置启动参数 b.group(group) // 设置通道类型 .channel(NioSocketChannel.class) // 设置启动实行器(卖力启动事宜的业务实行,ClientInitializer 为自定义的类) .handler(new ClientInitializer()); // 连接做事器端并同步通道 Channel ch = b.connect("127.0.0.1", PORT).sync().channel(); // 发送 ChannelFuture lastWriteFuture = null; // 给做事器端发送 10 条 for (int i = 0; i < 10; i++) { // 发送给做事器 lastWriteFuture = ch.writeAndFlush("Hi,Java.\n"); } // 在关闭通道之前,同步刷新所有的 if (lastWriteFuture != null) { lastWriteFuture.sync(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { // 开释资源 group.shutdownGracefully(); } } } / 客户端通道初始化类 / static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串编码器和解码器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客户端连接成功之后业务处理 private static final ClientHandler CLIENT_HANDLER = new ClientHandler(); / 初始化客户端通道 / @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 设置结尾分隔符 pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter())); // 设置(字符串)编码器和解码器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客户端连接成功之后的业务处理 pipeline.addLast(CLIENT_HANDLER); } } / 客户端连接成功之后的业务处理 / static class ClientHandler extends SimpleChannelInboundHandler<String> { / 读取到做事器真个 / @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.err.println("接到做事器的:" + msg); } / 非常处理,打印非常并关闭通道 / @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }}
终极的实行结果如下图所示:
从上述结果中可以看出,Netty 可以正常利用了,它已经不存在粘包和半包问题了。
2.封装办理粘包问题
此办理方案的核心是将分为头 + 体,在头中保存体的长度,从而确定一条的边界,这样就避免了粘包和半包问题了,它的实现过程如下图所示:
在 Netty 中可以通过 LengthFieldPrepender(编码)和 LengthFieldBasedFrameDecoder(解码)两个类实现的封装。和上一个办理方案类似,我们须要分别在做事器端和客户端通过设置通道(Channel)来办理粘包问题。
做事器真个核心代码如下:
/ 做事端通道初始化 /static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串编码器和解码器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 做事器端连接之后的实行器(自定义的类) private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler(); / 初始化通道的详细实行方法 / @Override public void initChannel(SocketChannel ch) { // 通道 Channel 设置 ChannelPipeline pipeline = ch.pipeline(); // 18 行:解码:读取消息头和体 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 20 行:编码:将封装为头和体,在前添加体的长度 pipeline.addLast(new LengthFieldPrepender(4)); // 设置(字符串)编码器和解码器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 做事器端连接之后的实行器,吸收到之后的业务处理 pipeline.addLast(SERVER_HANDLER); }}
个中核心代码是 18 行和 20 行,通过 LengthFieldPrepender 实现编码(将打包成头 + 体),通过 LengthFieldBasedFrameDecoder 实现解码(从封装的中取出的内容)。
LengthFieldBasedFrameDecoder 的参数解释如下:
参数 1:maxFrameLength - 发送的数据包最大长度;参数 2:lengthFieldOffset - 长度域偏移量,指的是长度域位于全体数据包字节数组中的下标;参数 3:lengthFieldLength - 长度域自己的字节数长度;参数 4:lengthAdjustment – 长度域的偏移量纠正。 如果长度域的值,除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,那么,就须要进行纠正。纠正的值为:包长 - 长度域的值 – 长度域偏移 – 长度域长;参数 5:initialBytesToStrip – 丢弃的起始字节数。丢弃处于有效数据前面的字节数量。比如前面有 4 个节点的长度域,则它的值为 4。LengthFieldBasedFrameDecoder(1024,0,4,0,4) 的意思是:数据包最大长度为 1024,长度域占首部的四个字节,在读数据的时候去掉首部四个字节(即长度域)。
客户真个核心实当代码如下:
/ 客户端通道初始化类 /static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串编码器和解码器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客户端连接成功之后业务处理 private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler(); / 初始化客户端通道 / @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 解码:读取消息头和体 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 编码:将封装为头和体,在相应字节数据前面添加体长度 pipeline.addLast(new LengthFieldPrepender(4)); // 设置(字符串)编码器和解码器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客户端连接成功之后的业务处理 pipeline.addLast(CLIENT_HANDLER); }}
完全的做事器端和客户真个实当代码如下:
import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.codec.LengthFieldPrepender;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/ 通过封装 Netty 来办理粘包 /public class NettyExample { // 定义做事器的端口号 static final int PORT = 8007; / 做事器端 / static class MyNettyServer { public static void main(String[] args) { // 创建一个线程组,用来卖力吸收客户端连接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 创建另一个线程组,用来卖力 I/O 的读写 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 创建一个 Server 实例(可理解为 Netty 的入门类) ServerBootstrap b = new ServerBootstrap(); // 将两个线程池设置到 Server 实例 b.group(bossGroup, workerGroup) // 设置 Netty 通道的类型为 NioServerSocket(非壅塞 I/O Socket 做事器) .channel(NioServerSocketChannel.class) // 设置建立连接之后的实行器(ServerInitializer 是我创建的一个自定义类) .childHandler(new NettyExample.ServerInitializer()); // 绑定端口并且进行同步 ChannelFuture future = b.bind(PORT).sync(); // 对关闭通道进行监听 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 资源关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } / 做事端通道初始化 / static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串编码器和解码器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 做事器端连接之后的实行器(自定义的类) private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler(); / 初始化通道的详细实行方法 / @Override public void initChannel(SocketChannel ch) { // 通道 Channel 设置 ChannelPipeline pipeline = ch.pipeline(); // 解码:读取消息头和体 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 编码:将封装为头和体,在相应字节数据前面添加体长度 pipeline.addLast(new LengthFieldPrepender(4)); // 设置(字符串)编码器和解码器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 做事器端连接之后的实行器,吸收到之后的业务处理 pipeline.addLast(SERVER_HANDLER); } } / 做事器端吸收到之后的业务处理类 / static class ServerHandler extends SimpleChannelInboundHandler<String> { / 读取到客户真个 / @Override public void channelRead0(ChannelHandlerContext ctx, String request) { if (!request.isEmpty()) { System.out.println("接到客户真个:" + request); } } / 数据读取完毕 / @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } / 非常处理,打印非常并关闭通道 / @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } / 客户端 / static class MyNettyClient { public static void main(String[] args) { // 创建事宜循环线程组(客户真个线程组只有一个) EventLoopGroup group = new NioEventLoopGroup(); try { // Netty 客户端启动工具 Bootstrap b = new Bootstrap(); // 设置启动参数 b.group(group) // 设置通道类型 .channel(NioSocketChannel.class) // 设置启动实行器(卖力启动事宜的业务实行,ClientInitializer 为自定义的类) .handler(new NettyExample.ClientInitializer()); // 连接做事器端并同步通道 Channel ch = b.connect("127.0.0.1", PORT).sync().channel(); // 发送 ChannelFuture lastWriteFuture = null; // 给做事器端发送 10 条 for (int i = 0; i < 10; i++) { // 发送给做事器 lastWriteFuture = ch.writeAndFlush("Hi,Java.\n"); } // 在关闭通道之前,同步刷新所有的 if (lastWriteFuture != null) { lastWriteFuture.sync(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { // 开释资源 group.shutdownGracefully(); } } } / 客户端通道初始化类 / static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串编码器和解码器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客户端连接成功之后业务处理 private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler(); / 初始化客户端通道 / @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 解码:读取消息头和体 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 编码:将封装为头和体,在相应字节数据前面添加体长度 pipeline.addLast(new LengthFieldPrepender(4)); // 设置(字符串)编码器和解码器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客户端连接成功之后的业务处理 pipeline.addLast(CLIENT_HANDLER); } } / 客户端连接成功之后的业务处理 / static class ClientHandler extends SimpleChannelInboundHandler<String> { / 读取到做事器真个 / @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.err.println("接到做事器的:" + msg); } / 非常处理,打印非常并关闭通道 / @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }}
以上程序的实行结果为:
四、总结
本文供应了传统 Socket 通讯将分为头和体的详细代码实现,然而传统的 Socket 在性能和复用性上表现一样平常,为了更加高效的实现通讯,我们可以利用 Netty 框架来替代传统的 Socket 和 NIO 编程,但 Netty 在利用时依然会涌现粘包的问题,于是我们供应了两种最常见的办理方案:通过分隔符或将封装的办理方案,个中末了一种办理方案的利用更加广泛。
参考 & 鸣谢《Netty 核心事理阐发与 RPC 实践》