《Netty权威指南》
一、异步和事件驱动
1.Java网络编程
- 阻塞I/O -- socket
- 非阻塞I/O -- NIO
2.Netty简介
代码清单 1-3 展示了一个 Netty所做的是事情和很好的例子。 这里,connect()方法将会直接返回,而不会阻塞,该调用将会在后台完成。这究竟什么时候会发生 则取决于若干的因素,但这个关注点已经从代码中抽象出来了。因为线程不用阻塞以等待对应的 操作完成,所以它可以同时做其他的工作,从而更加有效地利用资源。
代码清单 1-3 异步地建立连接
Channel channel = ...; // Does not block ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1", 25));
1. 导读
在这一章中,我们介绍了 Netty 框架的背景知识,包括 Java 网络编程 API 的演变过程,阻塞 和非阻塞网络操作之间的区别,以及异步 I/O 在高容量、高性能的网络编程中的优势。
然后,我们概述了 Netty 的特性、设计和优点,其中包括 Netty 异步模型的底层机制,包括 回调、Future 以及它们的结合使用。我们还谈到了事件是如何产生的以及如何拦截和处理它们。
3. Netty 的核心组件
- Channel;
- Channel 是 Java NIO 的一个基本构造。
- 它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的I/O操作的程序组件)的开放连接,如读操作和写操作 1。
- 目前,可以把 Channel 看作是传入(入站)或者传出(出站)数据的载体。因此,它可以被打开或者被关闭,连接或者断开连接。
- 回调;
- Future;
- Future 提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操 作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。
- JDK 预置了 interface java.util.concurrent.Future,但是其所提供的实现,只 允许手动检查对应的操作是否已经完成,或者一直阻塞直到它完成。这是非常繁琐的,所以 Netty 提供了它自己的实现——ChannelFuture,用于在执行异步操作的时候使用。
- ChannelFuture提供了几种额外的方法,这些方法使得我们能够注册一个或者多个 ChannelFutureListener实例。监听器的回调方法operationComplete(),将会在对应的 操作完成时被调用 1。然后监听器可以判断该操作是成功地完成了还是出错了。如果是后者,我 们可以检索产生的Throwable。简而 言之 ,由ChannelFutureListener提供的通知机制消除 了手动检查对应的操作是否完成的必要。
- 每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuture;也就是说,它们都不会阻塞。 正如我们前面所提到过的一样,Netty 完全是异步和事件驱动的。
- 如果你把 ChannelFutureListener 看作是回调的一个更加精细的版本,那么你是对的。 事实上,回调和 Future 是相互补充的机制;它们相互结合,构成了 Netty 本身的关键构件块之一。
- 事件和 ChannelHandler。
- Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经 发生的事件来触发适当的动作。这些动作可能是:
- 记录日志;
- 数据转换;
- 流控制;
- 应用程序逻辑。
- Netty 是一个网络编程框架,所以事件是按照它们与入站或出站数据流的相关性进行分类的。
可能由入站数据或者相关的状态更改而触发的事件包括:
- 连接已被激活或者连接失活;
- 数据读取;
- 用户事件;
- 错误事件。
- 出站事件是未来将会触发的某个动作的操作结果,这些动作包括:
- 打开或者关闭到远程节点的连接;
- 将数据写到或者冲刷到套接字。
- 每个事件都可以被分发给 ChannelHandler 类中的某个用户实现的方法。这是一个很好的 将事件驱动范式直接转换为应用程序构件块的例子。图 1-3 展示了一个事件是如何被一个这样的 ChannelHandler 链处理的。
- Netty 的 ChannelHandler 为处理器提供了基本的抽象,如图 1-3 所示的那些。我们会 在适当的时候对 ChannelHandler 进行更多的说明,但是目前你可以认为每个 Channel- Handler 的实例都类似于一种为了响应特定事件而被执行的回调。
- Netty 提供了大量预定义的可以开箱即用的 ChannelHandler 实现,包括用于各种协议 (如 HTTP 和 SSL/TLS)的 ChannelHandler。在内部,ChannelHandler 自己也使用了事件
和 Future,使得它们也成为了你的应用程序将使用的相同抽象的消费者。
3. 把他们放在一起
1.Future、回调和 ChannelHandler
- Netty 的异步编程模型是建立在 Future 和回调的概念之上的,而将事件派发到 ChannelHandler 的方法则发生在更深的层次上。结合在一起,这些元素就提供了一个处理环境,使你的应用程序逻 辑可以独立于任何网络操作相关的顾虑而独立地演变。这也是 Netty 的设计方式的一个关键目标。
- 拦截操作以及高速地转换入站数据和出站数据,都只需要你提供回调或者利用操作所返回的 Future。这使得链接操作变得既简单又高效,并且促进了可重用的通用代码的编写。
2.选择器、事件和 EventLoop
- Netty 通过触发事件将 Selector 从应用程序中抽象出来,消除了所有本来将需要手动编写 的派发代码。在内部,将会为每个 Channel 分配一个 EventLoop,用以处理所有事件,包括:
- 注册感兴趣的事件;
- 将事件派发给 ChannelHandler;
- 安排进一步的动作。
- EventLoop 本身只由一个线程驱动,其处理了一个 Channel 的所有 I/O 事件,并且在该
EventLoop 的整个生命周期内都不会改变。
- 这个简单而强大的设计消除了你可能有的在ChannelHandler 实现中需要进行同步的任何顾虑,
因此,你可以专注于提供正确的逻辑,用来在有感兴趣的数据要处理的时候执行。如同我们在详
细探讨 Netty 的线程模型时将会看到的,该 API 是简单而紧凑的。
二、第一款Netty应用程序
1.编写 Echo 服务器
- 所有的 Netty 服务器都需要以下两部分。
- 至少一个ChannelHandler — 服务器对从客户端接收的数据的处理,即它的业务逻辑。
- 引导 — 这是配置服务器的启动代码。eg: 将服务器绑定到它要监听连接请求的端口上。
- 在第 1 章中,我们介绍了 Future 和回调,并且阐述了它们在事件驱动设计中的应用。我们 还讨论了 ChannelHandler,它是一个接口族的父接口,它的实现负责接收并响应事件通知。 在 Netty 应用程序中,所有的数据处理逻辑都包含在这些核心抽象的实现中。
- 因为你的 Echo 服务器会响应传入的消息,所以它需要实现 ChannelInboundHandler 接口,用 来定义响应入站事件的方法。这个简单的应用程序只需要用到少量的这些方法,所以继承 Channel- InboundHandlerAdapter 类也就足够了,它提供了 ChannelInboundHandler 的默认实现。
我们感兴趣的方法是:
channelRead()— 对于每个传入的消息都要调用;
channelReadComplete()— 通知ChannelInboundHandler最后一次对channel-
Read()的调用是当前批量读取中的最后一条消息;
exceptionCaught()— 在读取操作期间,有异常抛出时会调用。
该 Echo 服务器的 ChannelHandler 实现是 EchoServerHandler,如代码清单 2-1 所示。
/** * 代码清单 2-1 */ @Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8)); // 将接收到的消息 写给发送者,而 不冲刷出站消息 ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { // 将未决消息冲刷到 远程节点,并且关 闭该 Channel,释放消息 ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 打印异常 栈跟踪并关闭该Channel cause.printStackTrace(); ctx.close(); } }
除了 ChannelInboundHandlerAdapter 之外,还有很多需要学习的 ChannelHandler 的 子类型和实现,我们将在第 6 章和第 7 章中对它们进行详细的阐述。目前,请记住下面这些关键点:
针对不同类型的事件来调用 ChannelHandler; 应用程序通过实现或者扩展 ChannelHandler 来挂钩到事件的生命周期,并且提供自
定义的应用程序逻辑; 在架构上,ChannelHandler 有助于保持业务逻辑与网络处理代码的分离。这简化了开
发过程,因为代码必须不断地演化以响应不断变化的需求。
2. 引导服务器
在讨论过由 EchoServerHandler 实现的核心业务逻辑之后,我们现在可以探讨引导服务 器本身的过程了,具体涉及以下内容:
- 绑定到服务器端口, 监听并接受传入连接请求;
- 配置 Channel,以将有关的入站消息通知给 EchoServerHandler 实例。
/** * 代码清单2-2 EchoServer类 */ public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[] args) throws Exception { if (args.length != 1) { System.err.println( "Usage: " + EchoServer.class.getSimpleName() + " <port>"); } int port = Integer.parseInt(args[0]); new EchoServer(port).start(); } public void start() throws Exception { final EchoServerHandler serverHandler = new EchoServerHandler(); // 使用的是 NIO 传输,所以指定 了NioEventLoopGroup来接受和处理新的连接 EventLoopGroup group = new NioEventLoopGroup(); try { // 创建 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(group) // 指定所使用的 NIO 传输 Channel .channel(NioServerSocketChannel.class) //使用指定的 端口设置套 接字地址 .localAddress(new InetSocketAddress(port)) //使用了一个特殊的类——ChannelInitializer。这是关键。当一个新的连接被接收时, // 一个新的子 Channel 将会被创建,而 ChannelInitializer 将会把一个你的 // EchoServerHandler 的实例添加到该 Channel 的 ChannelPipeline 中。 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(serverHandler); } }); //异步地绑定服务器; 调用 sync()方法阻塞 等待直到绑定完成 ChannelFuture f = b.bind().sync(); //获取 Channel 的 CloseFuture,并 且阻塞当前线 程直到它完成 f.channel().closeFuture().sync(); } finally { // 关闭 EventLoopGroup, 释放所有的资源 group.shutdownGracefully().sync(); } } }
在这个时候,服务器已经初始化,并且已经就绪能被使用了。这个示例使用了 NIO,因为得益于它的可扩展性和彻底的异步性,它是目前使用最广泛的传 输。但是也可以使用一个不同的传输实现。如果你想要在自己的服务器中使用 OIO 传输,将需 要指定 OioServerSocketChannel 和 OioEventLoopGroup。
让我们回顾一下你刚完成的服务器实现中的重要步骤。下面这些是服务器的主要代码组件:
- EchoServerHandler 实现了业务逻辑;
- main()方法引导了服务器;
引导过程中所需要的步骤如下:
- 创建一个 ServerBootstrap 的实例以引导和绑定服务器;
- 创建并分配一个 NioEventLoopGroup 实例以进行事件的处理,如接受新连接以及读/写数据;
- 指定服务器绑定的本地的 InetSocketAddress;
- 使用一个 EchoServerHandler 的实例初始化每一个新的 Channel;
- 调用 ServerBootstrap.bind()方法以绑定服务器。
3.编写 Echo 客户端
Echo 客户端将会:
(1)连接到服务器;
(2)发送一个或者多个消息; (3)对于每个消息,等待并接收从服务器发回的相同的消息; (4)关闭连接。
编写客户端所涉及的两个主要代码部分也是业务逻辑和引导,和你在服务器中看到的一样。
如同服务器,客户端将拥有一个用来处理数据的 ChannelInboundHandler。在这 个场景 下,你将扩展 SimpleChannelInboundHandler 类以处理所有必须的任务,如代码清单 2-3 所示。这要求重写下面的方法:
channelActive()——在到服务器的连接已经建立之后将被调用; channelRead0()1——当从服务器接收到一条消息时被调用; exceptionCaught()——在处理过程中引发异常时被调用。
代码清单 2-3 客户端的 ChannelHandler
4 引导客户端
如同将在代码清单 2-4 中所看到的,引导客户端类似于引导服务器,不同的是,客户端是使 用主机和端口参数来连接远程地址
/** * 代码清单 2-4 客户端的主类 */ public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) // 设置服务器的InetSocketAddress .remoteAddress(new InetSocketAddress(host, port)) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println( "Usage: " + EchoClient.class.getSimpleName() + " <host> <port>"); return; } String host = args[0]; int port = Integer.parseInt(args[1]); new EchoClient(host, port).start(); } }
让我们回顾一下这一节中所介绍的要点:
为初始化客户端,创建了一个 Bootstrap 实例;
为进行事件处理分配了一个 NioEventLoopGroup 实例,其中事件处理包括创建新的 连接以及处理入站和出站数据;
为服务器连接创建了一个 InetSocketAddress 实例;
当连接被建立时,一个 EchoClientHandler 实例会被安装到(该 Channel 的)
ChannelPipeline 中;
在一切都设置完成后,调用 Bootstrap.connect()方法连接到远程节点; 完成了客户端,你便可以着手构建并测试该系统了。
三、Netty3组件和设计 本章主要内容
Netty 的技术和体系结构方面的内容
Channel、EventLoop 和 ChannelFuture
ChannelHandler 和 ChannelPipeline
引导
- 我们将从两个不同的但却又密切相 关的视角来探讨 Netty: 类库的视角以及框架的视角。对于使用 Netty 编写高效的、可重用的和 可维护的代码来说,两者缺一不可。
- 从高层次的角度来看,Netty 解决了两个相应的关注领域,我们可将其大致标记为技术的和 体系结构的。首先,它的基于 Java NIO 的异步的和事件驱动的实现,保证了高负载下应用程序 性能的最大化和可伸缩性。其次,Netty 也包含了一组设计模式,将应用程序逻辑从网络层解耦, 简化了开发过程,同时也最大限度地提高了可测试性、模块化以及代码的可重用性。
- 在我们更加详细地研究 Netty 的各个组件时,我们将密切关注它们是如何协作来支撑这 些体系结构上的最佳实践的。通过遵循同样的原则,我们便可获得 Netty 所提供的所有益处。
3.1 Channel、EventLoop 和 ChannelFuture
这些类合在一起,可以被认为是 Netty 网络抽象的代表:
Channel— Socket;
EventLoop— 控制流、多线程处理、并发;
ChannelFuture— 异步通知。
- Channel
- 基本的 I/O 操作(bind()、connect()、read()和 write())依赖于底层网络传输所提 供的原语。
- 在基于 Java 的网络编程中,其基本的构造是 class Socket。Netty 的 Channel 接 口所提供的 API,大大地降低了直接使用 Socket 类的复杂性。
- 此外,Channel 也是拥有许多 预定义的、专门化实现的广泛类层次结构的根
- EventLoop
- EventLoop 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。
目前,图 3-1 在高层次上说明了 Channel、EventLoop、Thread 以及 EventLoopGroup
之间的关系。
一个 EventLoopGroup 包含一个或者多个 EventLoop;
一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
一个 Channel 在它的生命周期内只注册于一个 EventLoop;
一个 EventLoop 可能会被分配给一个或多个 Channel。
注意,在这种设计中,一个给定Channel 的 I/O 操作都是由相同的 Thread 执行的,实际
上消除了对于同步的需要。
- ChannelFuture
正如我们已经解释过的那样,Netty 中所有的 I/O 操作都是异步的。因为一个操作可能不会
立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。为此,Netty 提供了
ChannelFuture接口,其addListener()方法注册了一个ChannelFutureListener,以
便在某个操作完成时(无论是否成功)得到通知。
关于 ChannelFuture 的更多讨论 可以将 ChannelFuture 看作是将来要执行的操作的结果的 占位符。它究竟什么时候被执行则可能取决于若干的因素,因此不可能准确地预测,但是可以肯
定的是它将会被执行。此外,所有属于同一个 Channel 的操作都被保证其将以它们被调用的顺序
被执行。
3.2 ChannelHandler 和 ChannelPipeline
- ChannelHandler 接口
- 从应用程序开发人员的角度来看,Netty 的主要组件是 ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。
- 因为 ChannelHandler 的方法是 由网络事件(其中术语“事件”的使用非常广泛)触发的。
- 事实上,ChannelHandler 可专 门用于几乎任何类型的动作,例如将数据从一种格式转换为另外一种格式,或者处理转换过程 中所抛出的异常。
- 举例来说,ChannelInboundHandler 是一个你将会经常实现的子接口。这种类型的 ChannelHandler 接收入站事件和数据,这些数据随后将会被你的应用程序的业务逻辑所处 理。当你要给连接的客户端发送响应时,也可以从 ChannelInboundHandler 冲刷数据。你 的应用程序的业务逻辑通常驻留在一个或者多个 ChannelInboundHandler 中。
- ChannelPipeline 接口
- 使得事件流经 ChannelPipeline 是 ChannelHandler 的工作,它们是在应用程序的初 始化或者引导阶段被安装的。
- 这些对象接收事件、执行它们所实现的处理逻辑,并将数据传递给 链中的下一个 ChannelHandler。
- 它们的执行顺序是由它们被添加的顺序所决定的。实际上, 被我们称为 ChannelPipeline 的是这些 ChannelHandler 的编排顺序。
- 图 3-3 说明了一个 Netty 应用程序中入站和出站数据流之间的区别。从一个客户端应用程序 的角度来看,如果事件的运动方向是从客户端到服务器端,那么我们称这些事件为出站的,反之 则称为入站的。
- 图 3-3 也显示了入站和出站 ChannelHandler 可以被安装到同一个 ChannelPipeline 中。
- 如果一个消息或者任何其他的入站事件被读取,那么它会从 ChannelPipeline 的头部 开始流动,并被传递给第一个 ChannelInboundHandler。这个 ChannelHandler 不一定 会实际地修改数据,具体取决于它的具体功能,在这之后,数据将会被传递给链中的下一个 ChannelInboundHandler。最终,数据将会到达 ChannelPipeline 的尾端,届时,所有 处理就都结束了。
- 数据的出站运动(即正在被写的数据)在概念上也是一样的。
关于入站和出站 ChannelHandler 的更多讨论
通过使用作为参数传递到每个方法的 ChannelHandlerContext,事件可以被传递给当前
ChannelHandler 链中的下一个 ChannelHandler。因为你有时会忽略那些不感兴趣的事件,所以 Netty 提供了抽象基类 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter。通过调 用 ChannelHandlerContext 上的对应方法,每个都提供了简单地将事件传递给下一个 ChannelHandler 的方法的实现。随后,你可以通过重写你所感兴趣的那些方法来扩展这些类。
在Netty中,有两种发送消息的方式。
- 你可以直接写到Channel中,
- 也可以 写到和Channel- Handler 相关联的 ChannelHandlerContext 对象中。
前一种方式将会导致消息从 Channel- Pipeline 的尾端开始流动,而后者将导致消息从 ChannelPipeline 中的下一个 Channel- Handler 开始流动。
3. 更加深入地了解 ChannelHandler
正如我们之前所说的,有许多不同类型的 ChannelHandler,它们各自的功能主要取决于 它们的超类。Netty 以适配器类的形式提供了大量默认的 ChannelHandler 实现,其旨在简化 应用程序处理逻辑的开发过程。你已经看到了,ChannelPipeline 中的每个 ChannelHandler 将负责把事件转发到链中的下一个 ChannelHandler。这些适配器类(及它们的子类)将自动 执行这个操作,所以你可以只重写那些你想要特殊处理的方法和事件。
为什么需要适配器类
有一些适配器类可以将编写自定义的 ChannelHandler 所需要的努力降到最低限度,因为它们提 供了定义在对应接口中的所有方法的默认实现。
下面这些是编写自定义 ChannelHandler 时经常会用到的适配器类:
ChannelHandlerAdapter
ChannelInboundHandlerAdapter
ChannelOutboundHandlerAdapter
ChannelDuplexHandler
4.编码器和解码器
当你通过 Netty 发送或者接收一个消息的时候,就将会发生一次数据转换。入站消息会被解 码;也就是说,从字节转换为另一种格式,通常是一个 Java 对象。
如果是出站消息,则会发生 相反方向的转换:它将从它的当前格式被编码为字节。这两种方向的转换的原因很简单:网络数 据总是一系列的字节。
对应于特定的需要,Netty为编码器和解码器提供了不同类型的抽象类。例如,你的应用程序可能使用了一种中间格式,而不需要立即将消息转换成字节。你将仍然需要一个编码器,但是 它将派生自一个不同的超类。为了确定合适的编码器类型,你可以应用一个简单的命名约定。
通常来说,这些基类的名称将类似于 ByteToMessageDecoder 或 MessageToByte- Encoder。对于特殊的类型,你可能会发现类似于 ProtobufEncoder 和 ProtobufDecoder 这样的名称——预置的用来支持 Google 的 Protocol Buffers。
5.抽象类 SimpleChannelInboundHandler
在这种类型的 ChannelHandler 中,最重要的方法是 channelRead0(Channel- HandlerContext,T)。除了要求不要阻塞当前的 I/O 线程之外,其具体实现完全取决于你。
3.3 引导
- Netty 的引导类为应用程序的网络层配置提供了容器,这涉及将一个进程绑定到某个指定的 端口,或者将一个进程连接到另一个运行在某个指定主机的指定端口上的进程。
- 通常来说,我们把前面的用例称作引导一个服务器,后面的用例称作引导一个客户端。虽然 这个术语简单方便,但是它略微掩盖了一个重要的事实,即“服务器”和“客户端”实际上表示 了不同的网络行为; 换句话说,是监听传入的连接还是建立到一个或者多个进程的连接。
- 因此,有两种类型的引导: 一种用于客户端(简单地称为 Bootstrap),而另一种 (ServerBootstrap)用于服务器。
表 3-1 比较了这两种 类型的引导类。
引导一个客户端只需要一个 EventLoopGroup,但是一个 ServerBootstrap 则需要两个(也可以是同一个实例)。为什么呢?
因为服务器需要两组不同的 Channel。
- 第一组将只包含一个 ServerChannel,代表服务 器自身的已绑定到某个本地端口的正在监听的套接字。
- 而第二组将包含所有已创建的用来处理传 入客户端连接(对于每个服务器已经接受的连接都有一个)的 Channel。
- 图 3-4 说明了这个模 型,并且展示了为何需要两个不同的 EventLoopGroup。
与 ServerChannel 相关联的 EventLoopGroup 将负责分配一个为连接请求创建 Channel 的 EventLoop。一旦连接被接受,第二个 EventLoopGroup 就会给它的 Channel 分配一个 EventLoop。
四、传输
在本章中,我们将研究:
1.Netty传输、它们的实现和使用,以及 Netty 是如何将它们呈现给开发者的。
2.深入探讨了 Netty 预置的传输,并且解释了它们的行为。
3.如何匹配不同的传输和特定用例的需求。
本章主要内容
OIO——阻塞传输
NIO——异步传输
Local——JVM 内部的异步通信
Embedded——测试你的ChannelHandler
流经网络的数据总是具有相同的类型:字节。这些字节是如何流动的主要取决于我们所说的 网络传输— 一个帮助我们抽象底层数据传输机制的概念。用户并不关心这些细节;他们只想确 保他们的字节被可靠地发送和接收。
如果你有 Java 网络编程的经验,那么你可能已经发现,在某些时候,你需要支撑比预期多 很多的并发连接。如果你随后尝试从阻塞传输切换到非阻塞传输,那么你可能会因为这两种网络 API 的截然不同而遇到问题。
然而,Netty 为它所有的传输实现提供了一个通用 API,这使得这种转换比你直接使用 JDK 所能够达到的简单得多。所产生的代码不会被实现的细节所污染,而你也不需要在你的整个代码 库上进行广泛的重构。简而言之,你可以将时间花在其他更有成效的事情上。
4.1 案例研究:传输迁移
1.不通过 Netty 使用 OIO 和 NIO
/** * 代码清单 4-1 未使用 Netty 的阻塞网络编程 * * @author xuxh * @date 2021/03/07 11:27 */ public class PlainOioServer { public void serve(int port) throws IOException { final ServerSocket socket = new ServerSocket(port); try { for (; ; ) { final Socket clientSocket = socket.accept(); System.out.println( "Accepted connection from " + clientSocket); new Thread(new Runnable() { @Override public void run() { OutputStream out; try { out = clientSocket.getOutputStream(); out.write("Hi!\r\n".getBytes( Charset.forName("UTF-8"))); out.flush(); clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { clientSocket.close(); } catch (IOException ex) { // ignore on close } } } }).start(); } } catch ( IOException e) { e.printStackTrace(); } } }
/** * 代码清单 4-2 未使用 Netty 的异步网络编程 * * @author xuxh * @date 2021/03/07 11:32 */ public class PlainNioServer { public void serve(int port) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); ServerSocket ssocket = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); ssocket.bind(address); Selector selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes()); for (; ; ) { try { // 等待需要处理的新事 件;阻塞 将一直持续到 下一个传入事件 selector.select(); } catch (IOException ex) { ex.printStackTrace(); // handle exception break; } // 获取所有接 收事件的 Selection- Key 实例 Set<SelectionKey> readyKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { // 检查事件是否是一 个新的已经就绪可 以被接受的连接 if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); // 接受客户端,并将它注册到选择器 client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate()); System.out.println("Accepted connection from " + client); } // 检查套接字是否已经准备好写数据 if (key.isWritable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); while (buffer.hasRemaining()) { // 将数据写到已连接的客户端 if (client.write(buffer) == 0) { break; } } client.close(); } } catch (IOException ex) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { // ignore on close } } } } } }
如同你所看到的,虽然这段代码所做的事情与之前的版本完全相同,但是代码却截然不同。 如果为了用于非阻塞 I/O 而重新实现这个简单的应用程序,都需要一次完全的重写的话,那么不 难想象,移植真正复杂的应用程序需要付出什么样的努力。
鉴于此,让我们来看看使用 Netty 实现该应用程序将会是什么样子吧。
4.1.2 通过 Netty 使用 OIO 和 NIO
/** * 代码清单 4-3 使用 Netty 的阻塞网络处理 * * @author xuxh * @date 2021/03/07 21:15 */ public class NettyOioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8); EventLoopGroup group = new OioEventLoopGroup(); try { // 创建 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(group) // 使用 OioEventLoopGroup 以允许阻塞模式 .channel(OioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) // 指定 Channel- Initializer,对于 每个已接受的 连接都调用它 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( // 添加一个 Channel- InboundHandler- Adapter 以拦截和 处理事件 new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 将消息写到客户端,并添 加 ChannelFutureListener, 以便消息一被写完就关闭 连接 ctx.writeAndFlush(buf.duplicate()) .addListener(ChannelFutureListener.CLOSE); } }); } }); //绑定服务器 以接受连接 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { // 释放所有的资源 group.shutdownGracefully().sync(); } } }
/** * 代码清单 4-4 使用 Netty 的异步网络处理 * * @author xuxh * @date 2021/03/07 21:40 */ public class NettyNioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8); // 使用的是 NIO 传输,所以指定 了NioEventLoopGroup来接受和处理新的连接 EventLoopGroup group = new NioEventLoopGroup(); try { // 创建 ServerBootstrap ServerBootstrap b = new ServerBootstrap(); b.group(group) // 使用 NioEventLoopGroup 非阻塞模式 .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) // 指定 Channel- Initializer,对于 每个已接受的 连接都调用它 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( // 添加一个 Channel- InboundHandler- Adapter 以拦截和 处理事件 new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 将消息写到客户端,并添 加 ChannelFutureListener, 以便消息一被写完就关闭 连接 ctx.writeAndFlush(buf.duplicate()) .addListener(ChannelFutureListener.CLOSE); } }); } }); //绑定服务器 以接受连接 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { // 释放所有的资源 group.shutdownGracefully().sync(); } } }
因为 Netty 为每种传输的实现都暴露了相同的 API,所以无论选用哪一种传输的实现,你的 代码都仍然几乎不受影响。在所有的情况下,传
4.2 传输 API
传输 API 的核心是 interface Channel,它被用于所有的 I/O 操作。Channel 类的层次结构如图 4-1 所示。
如图所示,每个 Channel 都将会被分配一个 ChannelPipeline 和 ChannelConfig。
- ChannelConfig 包含了该 Channel 的所有配置设置,并且支持热更新。由于特定的传输可能具有独特的设置,所以它可能会实现一个 ChannelConfig 的子类型。(请参考 ChannelConfig 实现对应的 Javadoc。)
- ChannelPipeline 持有所有应用于入站数据和出站数据以及事件的 ChannelHandler 实 例,这些 ChannelHandler 实现了应用程序处理状态变化以及数据处理的逻辑。
- 由于 Channel 是独一无二的,所以为了保证顺序将 Channel 声明为 java.lang. Comparable 的一个子接口。因此,如果两个不同的 Channel 实例都返回了相同的散列码,那 么 AbstractChannel 中的 compareTo()方法的实现将会抛出一个 Error。
ChannelHandler 的典型用途包括:
将数据从一种格式转换为另一种格式;
提供异常的通知;
提供 Channel 变为活动的或者非活动的通知;
提供当 Channel 注册到 EventLoop 或者从 EventLoop 注销时的通知;
提供有关用户自定义事件的通知。
你也可以根据需要通过添加或者移除ChannelHandler实例来修改ChannelPipeline。通过利用Netty的这项能力可以构建出高度灵活的应用程序。
除了访问所分配的 ChannelPipeline 和 ChannelConfig 之外,也可以利用 Channel 的其他方法,其中最重要的列举在表 4-1 中。
稍后我们将进一步深入地讨论所有这些特性的应用。目前,请记住,Netty 所提供的广泛功 能只依赖于少量的接口。这意味着,你可以对你的应用程序逻辑进行重大的修改,而又无需大规 模地重构你的代码库。
Netty 的 Channel 实现是线程安全的,因此你可以存储一个到 Channel 的引用,并且每当 你需要向远程节点写数据时,都可以使用它,即使当时许多线程都在使用它。
4.3 内置的传输
Netty 内置了一些可开箱即用的传输。因为并不是它们所有的传输都支持每一种协议,所以 你必须选择一个和你的应用程序所使用的协议相容的传输。
在本节中我们将讨论这些关系。表 4-2 显示了所有 Netty 提供的传输。
4.3.1 NIO——非阻塞 I/O
NIO 提供了一个所有 I/O 操作的全异步的实现。它利用了自 NIO 子系统被引入 JDK 1.4 时便 可用的基于选择器的 API。
选择器背后的基本概念是充当一个注册表,,当 Channel 的状态发生变化时, 在选择器可以得到通知。
Channel可能的状态变化有:
新的 Channel 已被接受并且就绪;
Channel 连接已经完成;
Channel 有已经就绪的可供读取的数据;
Channel 可用于写数据。
选择器运行在一个检查状态变化并对其做出相应响应的线程上,在应用程序对状态的变化做出响应之后,选择器将会被重置,并将重复这个过程。
表 4-3 中的常量值代表了由 class java.nio.channels.SelectionKey 定义的位模式。这些位模式可以组合起来定义一组应用程序正在请求通知的状态变化集。
对于所有传输都共有的用户级别 API ,Netty完全地隐藏了这些 NIO 的内部细节。 图 4-2 展示了该处理流程。
零拷贝
零拷贝(zero-copy)是一种目前只有在使用 NIO 和 Epoll 传输时才可使用的特性。它使你可以快速 高效地将数据从文件系统移动到网络接口,而不需要将其从内核空间复制到用户空间,在像 FTP 或者 HTTP 这样的协议中可以显著地提升性能。但是,并不是所有的操作系统都支持这一特性。特别地,它对于实现了数据加密或者压缩的文件系统是不可用的——只能传输文件的原始内容。
4.3.2 Epoll — 用于 Linux 的本地非阻塞传输4.3.2 Epoll— 用于 Linux 的本地非阻塞传输
正如我们之前所说的,Netty 的 NIO 传输基于 Java 提供的异步/非阻塞网络编程的通用抽象。 虽然这保证了 Netty 的非阻塞 API 可以在任何平台上使用,但它也包含了相应的限制,因为 JDK 为了在所有系统上提供相同的功能,必须做出妥协。
Linux作为高性能网络编程的平台,其重要性与日俱增,这催生了大量先进特性的开发,其中包括epoll——一个高度可扩展的I/O事件通知特性。这个API自Linux内核版本 2.5.44(2002)被 引入,提供了比旧的POSIX select和poll系统调用 1更好的性能,同时现在也是Linux上非阻 塞网络编程的事实标准。Linux JDK NIO API使用了这些epoll调用。
Netty为Linux提供了一组NIO API,它以一种和它本身的设计更加一致的方式使用epoll,并且以一种更加轻量的方式使用中断。如果你的应用程序只运行于Linux系统,那么请考虑利用 这个版本的传输; 你将发现在高负载下它的性能要优于JDK的NIO实现。
这个传输的语义与在图 4-2 所示的完全相同,而且它的用法也是简单直接的。相关示例参照 代码清单 4-4。如果要在那个代码清单中使用 epoll 替代 NIO,只需要将 NioEventLoopGroup 替换为 EpollEventLoopGroup,并且将 NioServerSocketChannel.class 替换为 EpollServerSocketChannel.class 即可。
4.3.3 OIO — 旧的阻塞 I/O
Netty 的 OIO 传输实现代表了一种折中: 它可以通过常规的传输 API 使用,但是由于它是建立在 java.net 包的阻塞实现之上的,所以它不是异步的。但是,它仍然非常适合于某些用途。
有了这个背景,你可能会想,Netty是如何能够使用和用于异步传输相同的API来支持OIO的呢。 答案就是,Netty利用了SO_TIMEOUT这个Socket标志,它指定了等待一个I/O操作完成的最大毫秒数。如果操作在指定的时间间隔内没有完成,则将会抛出一个SocketTimeout Exception。Netty 将捕获这个异常并继续处理循环。在EventLoop下一次运行时,它将再次尝试。这实际上也是类似于Netty这样的异步框架能够支持OIO的唯一方式。
这种方式的一个问题是,当一个SocketTimeoutException被抛出时填充栈跟踪所需要的时间,其对于性能来说代价很大。
图 4-3 说明了这个逻辑。