Netty is an asynchronous event-driven network application frameworkfor rapid development of maintainable high performance protocol servers & clients.
Netty是 一个异步事宜驱动的网络运用程序框架,用于快速开拓可掩护的高性能协议做事器和客户端。
从官网上先容,Netty是一个网络运用程序框架,开拓做事器和客户端。也便是用于网络编程的一个框架。既然是网络编程,Socket就不谈了,为什么不用NIO呢?

2.1 NIO的缺陷
对付这个问题,之前我写了一篇文章《NIO入门》对NIO有比较详细的先容,NIO的紧张问题是:
NIO的类库和API繁杂,学习本钱高,你须要闇练节制Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。须要熟习Java多线程编程。这是由于NIO编程涉及到Reactor模式,你必须对多线程和网络编程非常熟习,才能写出高质量的NIO程序。臭名昭著的epoll bug。它会导致Selector空轮询,终极导致CPU 100%。直到JDK1.7版本依然没得到根本性的办理。2.2 Netty的优点
相对地,Netty的优点有很多:
API利用大略,学习本钱低。功能强大,内置了多种解码编码器,支持多种协议。性能高,比拟其他主流的NIO框架,Netty的性能最优。社区生动,创造BUG会及时修复,迭代版本周期短,不断加入新的功能。Dubbo、Elasticsearch都采取了Netty,质量得到验证。三、架构图上面这张图便是在官网首页的架构图,我们从上到下剖析一下。
绿色的部分Core核心模块,包括零拷贝、API库、可扩展的事宜模型。
橙色部分Protocol Support协议支持,包括Http协议、webSocket、SSL(安全套接字协议)、谷歌Protobuf协议、zlib/gzip压缩与解压缩、Large File Transfer大文件传输等等。
赤色的部分Transport Services传输做事,包括Socket、Datagram、Http Tunnel等等。
以上可看出Netty的功能、协议、传输办法都比较全,比较强大。
四、永久的Hello Word首先搭建一个HelloWord工程,先熟习一下API,还有为后面的学习做铺垫。以下面这张图为依据:
4.1 引入Maven依赖
利用的版本是4.1.20,相比拟较稳定的一个版本。
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.20.Final</version></dependency>
4.2 创建做事端启动类
public class MyServer { public static void main(String[] args) throws Exception { //创建两个线程组 boosGroup、workerGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建做事真个启动工具,设置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //设置两个线程组boosGroup和workerGroup bootstrap.group(bossGroup, workerGroup) //设置做事端通道实现类型 .channel(NioServerSocketChannel.class) //设置线程行列步队得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) //设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) //利用匿名内部类的形式初始化通道工具 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //给pipeline管道设置处理器 socketChannel.pipeline().addLast(new MyServerHandler()); } });//给workerGroup的EventLoop对应的管道设置处理器 System.out.println("java技能爱好者的做事端已经准备就绪..."); //绑定端口号,启动做事端 ChannelFuture channelFuture = bootstrap.bind(6666).sync(); //对关闭通道进行监听 channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
4.3 创建做事端处理器
/ 自定义的Handler须要继续Netty规定好的HandlerAdapter 才能被Netty框架所关联,有点类似SpringMVC的适配器模式 /public class MyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取客户端发送过来的 ByteBuf byteBuf = (ByteBuf) msg; System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的:" + byteBuf.toString(CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //发送给客户端 ctx.writeAndFlush(Unpooled.copiedBuffer("做事端已收到,并给你发送一个问号?", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //发生非常,关闭通道 ctx.close(); }}
4.4 创建客户端启动类
public class MyClient { public static void main(String[] args) throws Exception { NioEventLoopGroup eventExecutors = new NioEventLoopGroup(); try { //创建bootstrap工具,配置参数 Bootstrap bootstrap = new Bootstrap(); //设置线程组 bootstrap.group(eventExecutors) //设置客户真个通道实现类型 .channel(NioSocketChannel.class) //利用匿名内部类初始化通道 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //添加客户端通道的处理器 ch.pipeline().addLast(new MyClientHandler()); } }); System.out.println("客户端准备就绪,随时可以起飞~"); //连接做事端 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync(); //对通道关闭进行监听 channelFuture.channel().closeFuture().sync(); } finally { //关闭线程组 eventExecutors.shutdownGracefully(); } }}
4.5 创建客户端处理器
public class MyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //发送到做事端 ctx.writeAndFlush(Unpooled.copiedBuffer("歪比巴卜~茉莉~Are you good~马来西亚~", CharsetUtil.UTF_8)); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //吸收做事端发送过来的 ByteBuf byteBuf = (ByteBuf) msg; System.out.println("收到做事端" + ctx.channel().remoteAddress() + "的:" + byteBuf.toString(CharsetUtil.UTF_8)); }}
4.6 测试
先启动做事端,再启动客户端,就可以看到结果:
MyServer打印结果:
MyClient打印结果:
五、Netty的特性与主要组件
5.1 taskQueue任务行列步队
如果Handler处理器有一些永劫光的业务处理,可以交给taskQueue异步处理。怎么用呢,请看代码演示:
public class MyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取到线程池eventLoop,添加线程,实行 ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { //永劫光操作,不至于永劫光的业务操作导致Handler壅塞 Thread.sleep(1000); System.out.println("永劫光的业务处理"); } catch (Exception e) { e.printStackTrace(); } } }); }}
我们打一个debug调试,是可以看到添加进去的taskQueue有一个任务。
5.2 scheduleTaskQueue延时任务行列步队
延时任务行列步队和上面先容的任务行列步队非常相似,只是多了一个可延迟一定韶光再实行的设置,请看代码演示:
ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() { try { //永劫光操作,不至于永劫光的业务操作导致Handler壅塞 Thread.sleep(1000); System.out.println("永劫光的业务处理"); } catch (Exception e) { e.printStackTrace(); } }},5, TimeUnit.SECONDS);//5秒后实行
依然打开debug进行调试查看,我们可以有一个scheduleTaskQueue任务待实行中
5.3 Future异步机制
在搭建HelloWord工程的时候,我们看到有一行这样的代码:
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666);
很多操作都返回这个ChannelFuture工具,究竟这个ChannelFuture工具是用来做什么的呢?
ChannelFuture供应操作完成时一种异步关照的办法。一样平常在Socket编程中,等待相应结果都是同步壅塞的,而Netty则不会造成壅塞,由于ChannelFuture是采纳类似不雅观察者模式的形式进行获取结果。请看一段代码演示:
//添加监听器channelFuture.addListener(new ChannelFutureListener() { //利用匿名内部类,ChannelFutureListener接口 //重写operationComplete方法 @Override public void operationComplete(ChannelFuture future) throws Exception { //判断是否操作成功 if (future.isSuccess()) { System.out.println("连接成功"); } else { System.out.println("连接失落败"); } }});
5.4 Bootstrap与ServerBootStrap
Bootstrap和ServerBootStrap是Netty供应的一个创建客户端和做事端启动器的工厂类,利用这个工厂类非常便利地创建启动类,根据上面的一些例子,实在也看得出来能大大地减少了开拓的难度。首先看一个类图:
可以看出都是继续于AbstractBootStrap抽象类,以是大致上的配置方法都相同。
一样平常来说,利用Bootstrap创建启动器的步骤可分为以下几步:
5.4.1 group()
在上一篇文章《Reactor模式》中,我们就讲过做事端要利用两个线程组:
bossGroup 用于监听客户端连接,专门卖力与客户端创建连接,并把连接注册到workerGroup的Selector中。workerGroup用于处理每一个连接发生的读写事宜。一样平常创建线程组直策应用以下new就完事了:
EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();
有点好奇的是,既然是线程组,那线程数默认是多少呢?深入源码:
//利用一个常量保存 private static final int DEFAULT_EVENT_LOOP_THREADS; static { //NettyRuntime.availableProcessors() 2,cpu核数的两倍赋值给常量 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { //如果不传入,则利用常量的值,也便是cpu核数的两倍 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
通过源码可以看到,默认的线程数是cpu核数的两倍。假设想自定义线程数,可以利用有参布局器:
//设置bossGroup线程数为1EventLoopGroup bossGroup = new NioEventLoopGroup(1);//设置workerGroup线程数为16EventLoopGroup workerGroup = new NioEventLoopGroup(16);
5.4.2 channel()
这个方法用于设置通道类型,当建立连接后,会根据这个设置创建对应的Channel实例。
利用debug模式可以看到
通道类型有以下:
NioSocketChannel: 异步非壅塞的客户端 TCP Socket 连接。
NioServerSocketChannel: 异步非壅塞的做事器端 TCP Socket 连接。
常用的便是这两个通道类型,由于是异步非壅塞的。所以是首选。
OioSocketChannel: 同步壅塞的客户端 TCP Socket 连接。
OioServerSocketChannel: 同步壅塞的做事器端 TCP Socket 连接。
轻微在本地调试过,用起来和Nio有一些不同,是壅塞的,以是API调用也不一样。由于是壅塞的IO,险些没什么人会选择利用Oio,以是也很难找到例子。我轻微琢磨了一下,经由几次报错之后,总算调通了。代码如下:
//server端代码,跟上面险些一样,只需改三个地方//这个地方利用的是OioEventLoopGroupEventLoopGroup bossGroup = new OioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup)//只须要设置一个线程组boosGroup .channel(OioServerSocketChannel.class)//设置做事端通道实现类型//client端代码,只需改两个地方//利用的是OioEventLoopGroupEventLoopGroup eventExecutors = new OioEventLoopGroup();//通道类型设置为OioSocketChannelbootstrap.group(eventExecutors)//设置线程组 .channel(OioSocketChannel.class)//设置客户真个通道实现类型
NioSctpChannel: 异步的客户端 Sctp(Stream Control Transmission Protocol,流掌握传输协议)连接。
NioSctpServerChannel: 异步的 Sctp 做事器端连接。
本地没启动成功,网上看了一些网友的评论,说是只能在linux环境下才可以启动。从报错信息看:SCTP not supported on this platform,不支持这个平台。由于我电脑是window系统,以是网友说的有点道理。
5.4.3 option()与childOption()
首先说一下这两个的差异。
option()设置的是做事端用于吸收进来的连接,也便是boosGroup线程。
childOption()是供应给父管道吸收到的连接,也便是workerGroup线程。
搞清楚了之后,我们看一下常用的一些设置有哪些:
SocketChannel参数,也便是childOption()常用的参数:
SO_RCVBUF Socket参数,TCP数据吸收缓冲区大小。TCP_NODELAY TCP参数,立即发送数据,默认值为Ture。SO_KEEPALIVE Socket参数,连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。
ServerSocketChannel参数,也便是option()常用参数:
SO_BACKLOG Socket参数,做事端接管连接的行列步队长度,如果行列步队已满,客户端连接将被谢绝。默认值,Windows为200,其他为128。
由于篇幅限定,其他就不列举了,大家可以去网上找资料看看,理解一下。
5.4.4 设置流水线(重点)
ChannelPipeline是Netty处理要求的任务链,ChannelHandler则是详细处理要求的处理器。实际上每一个channel都有一个处理器的流水线。
在Bootstrap中childHandler()方法须要初始化通道,实例化一个ChannelInitializer,这时候须要重写initChannel()初始化通道的方法,装置流水线便是在这个地方进行。代码演示如下:
//利用匿名内部类的形式初始化通道工具bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //给pipeline管道设置自定义的处理器 socketChannel.pipeline().addLast(new MyServerHandler()); }});
处理器Handler紧张分为两种:
ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)
入站指的是数据从底层java NIO Channel到Netty的Channel。
出站指的是通过Netty的Channel来操作底层的java NIO Channel。
ChannelInboundHandlerAdapter处理器常用的事宜有:
注册事宜 fireChannelRegistered。连接建立事宜 fireChannelActive。读事宜和读完成事宜 fireChannelRead、fireChannelReadComplete。非常关照事宜 fireExceptionCaught。用户自定义事宜 fireUserEventTriggered。Channel 可写状态变革事宜 fireChannelWritabilityChanged。连接关闭事宜 fireChannelInactive。ChannelOutboundHandler处理器常用的事宜有:
端口绑定 bind。连接做事端 connect。写事宜 write。刷新韶光 flush。读事宜 read。主动断开连接 disconnect。关闭 channel 事宜 close。还有一个类似的handler(),紧张用于装置parent通道,也便是bossGroup线程。一样平常情形下,都用不上这个方法。
5.4.5 bind()
供应用于做事端或者客户端绑定做事器地址和端口号,默认是异步启动。如果加上sync()方法则是同步。
有五个同名的重载方法,浸染都是用于绑定地址端口号。不一一先容了。
5.4.6 优雅地关闭EventLoopGroup
//开释掉所有的资源,包括创建的线程bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();
会关闭所有的child Channel。关闭之后,开释掉底层的资源。
5.5 Channel
Channel是什么?不妨看一下官方文档的解释:
A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind
翻译大意:一种连接到网络套接字或能进行读、写、连接和绑定等I/O操作的组件。
如果上面这段解释比较抽象,下面还有一段解释:
A channel provides a user:
the current state of the channel (e.g. is it open? is it connected?),the configuration parameters of the channel (e.g. receive buffer size),the I/O operations that the channel supports (e.g. read, write, connect, and bind), andthe ChannelPipeline which handles all I/O events and requests associated with the channel.
翻译大意:
channel为用户供应:
通道当前的状态(例如它是打开?还是已连接?)channel的配置参数(例如吸收缓冲区的大小)channel支持的IO操作(例如读、写、连接和绑定),以及处理与channel干系联的所有IO事宜和要求的ChannelPipeline。5.5.1 获取channel的状态
boolean isOpen(); //如果通道打开,则返回trueboolean isRegistered();//如果通道注册到EventLoop,则返回trueboolean isActive();//如果通道处于活动状态并且已连接,则返回trueboolean isWritable();//当且仅当I/O线程将立即实行要求的写入操作时,返回true。
以上便是获取channel的四种状态的方法。
5.5.2 获取channel的配置参数
获取单条配置信息,利用getOption(),代码演示:
ChannelConfig config = channel.config();//获取配置参数//获取ChannelOption.SO_BACKLOG参数,Integer soBackLogConfig = config.getOption(ChannelOption.SO_BACKLOG);//由于我启动器配置的是128,以是我这里获取的soBackLogConfig=128
获取多条配置信息,利用getOptions(),代码演示:
ChannelConfig config = channel.config();Map<ChannelOption<?>, Object> options = config.getOptions();for (Map.Entry<ChannelOption<?>, Object> entry : options.entrySet()) { System.out.println(entry.getKey() + " : " + entry.getValue());}/SO_REUSEADDR : falseWRITE_BUFFER_LOW_WATER_MARK : 32768WRITE_BUFFER_WATER_MARK : WriteBufferWaterMark(low: 32768, high: 65536)SO_BACKLOG : 128以下省略.../
5.5.3 channel支持的IO操作
写操作,这里演示从做事端写发送到客户端:
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.channel().writeAndFlush(Unpooled.copiedBuffer("这波啊,这波是肉蛋葱鸡~", CharsetUtil.UTF_8));}
客户端掌握台:
//收到做事端/127.0.0.1:6666的:这波啊,这波是肉蛋葱鸡~
连接操作,代码演示:
ChannelFuture connect = channelFuture.channel().connect(new InetSocketAddress("127.0.0.1", 6666));//一样平常利用启动器,这种办法不常用
通过channel获取ChannelPipeline,并做干系的处理:
//获取ChannelPipeline工具ChannelPipeline pipeline = ctx.channel().pipeline();//往pipeline中添加ChannelHandler处理器,装置流水线pipeline.addLast(new MyServerHandler());
5.6 Selector
在NioEventLoop中,有一个成员变量selector,这是nio包的Selector,在之前《NIO入门》中,我已经讲过Selector了。
Netty中的Selector也和NIO的Selector是一样的,便是用于监听事宜,管理注册到Selector中的channel,实现多路复用器。
5.7 PiPeline与ChannelPipeline
在前面先容Channel时,我们知道可以在channel中装置ChannelHandler流水线处理器,那一个channel不可能只有一个channelHandler处理器,肯定是有很多的,既然是很多channelHandler在一个流水线事情,肯定是有顺序的。
于是pipeline就涌现了,pipeline相称于处理器的容器。初始化channel时,把channelHandler按顺序装在pipeline中,就可以实现按序实行channelHandler了。
在一个Channel中,只有一个ChannelPipeline。该pipeline在Channel被创建的时候创建。ChannelPipeline包含了一个ChannelHander形成的列表,且所有ChannelHandler都会注册到ChannelPipeline中。
5.8 ChannelHandlerContext
在Netty中,Handler处理器是有我们定义的,上面讲过通过集成入站处理器或者出站处理器实现。这时如果我们想在Handler中获取pipeline工具,或者channel工具,怎么获取呢。
于是Netty设计了这个ChannelHandlerContext高下文工具,就可以拿到channel、pipeline等工具,就可以进行读写等操作。
通过类图,ChannelHandlerContext是一个接口,下面有三个实现类。
实际上ChannelHandlerContext在pipeline中是一个链表的形式。看一段源码就明白了:
//ChannelPipeline实现类DefaultChannelPipeline的布局器方法protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); //设置头结点head,尾结点tail tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head;}
下面我用一张图来表示,会更加清晰一点:
5.9 EventLoopGroup
我们先看一下EventLoopGroup的类图:
个中包括了常用的实现类NioEventLoopGroup。OioEventLoopGroup在前面的例子中也有利用过。
从Netty的架构图中,可以知道做事器是须要两个线程组进行合营事情的,而这个线程组的接口便是EventLoopGroup。
每个EventLoopGroup里包括一个或多个EventLoop,每个EventLoop中掩护一个Selector实例。
5.9.1 轮询机制的实现事理
我们不妨看一段DefaultEventExecutorChooserFactory的源码:
private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;@Overridepublic EventExecutor next() { //idx.getAndIncrement()相称于idx++,然后对任务长度取模 return executors[idx.getAndIncrement() & executors.length - 1];}
这段代码可以确定实行的办法是轮询机制,接下来debug调试一下:
它这里还有一个判断,如果线程数不是2的N次方,则采取取模算法实现。
@Overridepublic EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)];}
能力有限,如果有什么缺点或者不当之处,请大家批评示正,一起学习互换!
本文为阿里云原创内容,未经许可不得转载。