Java高级特性系列(三):好好说说IO其一

【引言】也是经历过多次的被鄙视的经历,逐渐了解到IO还有不同的模式(比如:BIO、NIO、AIO),不同的IO之间有什么区别?不同的IO的实现原理有什么不同?性能和使用的差异如何?种种的问题,总结一下,查阅也更便捷,对自己也是一个微模块技能的提升。


基本概念

用户空间与内核空间

  现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。
  在Linux世界,进程不能直接访问硬件设备,当进程需要访问硬件设备(比如读取磁盘文件,接收网络数据等等)时,必须由用户态模式切换至内核态模式,通过系统调用访问硬件设备。

文件描述符fd

  文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。
  文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

同步和异步

  同步(synchronous)和异步(asynchronous)的概念描述的是用户线程与内核的交互方式:同步是指用户线程发起IO请求后需要等待或者轮询内核IO操作完成后才能继续执行;而异步是指用户线程发起IO请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数。POSIX标准对同步和异步的定义如下:

  • A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
  • An asynchronous I/O operation does not cause the requesting process to be blocked;

  按照POSIX标准的定义,本文提到的五种IO模型,只有最后一种才是真正意义上的异步IO模型,其他四种都是不同实现类型的同步IO。

阻塞和非阻塞

  阻塞和非阻塞的概念描述的是用户线程调用内核IO操作的方式:阻塞是指IO操作需要彻底完成后才返回到用户空间;而非阻塞是指IO操作被调用后立即返回给用户一个状态值,无需等到IO操作彻底完成。

I/O(Input/Output)

  单单从字面上理解,I=input,O=output;显而易见,这个概念是与输入输出相关的,当然,对我们码农来说,IO对应的就是针对编程过程中与硬件交互的输入和输出操作;比如学习java基础时会提到的最常见的概念:字节流和字符流;总之呢,可以认为IO=输入输出,具体是怎么输入输出的,我们会详细的来探究。
  在Linux/UNIX系统中,会将所有的外部设备都看作一个文件来看待,I/O输入操作(例如标准输入或者套接字的输入)通常包含以下两个不同的阶段:

[1] Waiting for the data to be ready - 等待数据准备好
[2] Copying the data from the kernel to the process - 将数据从内核空间的buffer拷贝到用户空间进程的buffer

  比如对于套接字(也就是socket)的输入,第一步是等待数据从网络中到达,当所等待的数据到达时,数据被复制到内核中的缓冲区。第二步则是把数据从内核缓冲区复制到应用进程的缓冲区。根据在这两个不同阶段处理的不同,可以将I/O模型划分为以下五种类型,其中前面4种IO都可以归类为synchronous IO - 同步IO;只有最后一种才是asynchronous IO - 异步IO:

  • 同步阻塞式I/O(Blocking IO):即传统的IO模型。
  • 同步非阻塞式I/O(Non-blocking IO):默认创建的socket都是阻塞的,非阻塞IO要求socket被设置为NONBLOCK。注意这里所说的NIO并非Java的NIO(New IO)库。
  • IO多路复用(IO Multiplexing):即经典的Reactor设计模式,有时也称为异步阻塞IO,Java中的Selector和Linux中的epoll都是这种模型。
  • 信号驱动式I/O(Signal Driven IO):信号驱动I/O模型用得比较少,一般是通过套接字的SIGIO信号来驱动IO操作的,下图也没有单独涉及这个分类。
  • 异步I/O(Asynchronous IO):即经典的Proactor设计模式,也称为异步非阻塞IO。

IO模型详解

本章节内容参考自网络博客,感谢大牛们的分享。

  为了理解的简单一些,这里以UDP套接字中的recvfrom函数作为系统调用来说明I/O模型。recvfrom函数类似于标准的read函数,它的作用是从指定的套接字中读取数据报。recvfrom会从应用进程空间运行切换到内核空间中运行,一段时间后会再切换回来。

同步阻塞式I/O(Blocking IO)

blocking IO的特点就是在IO执行的两个阶段都被block了。

  同步阻塞式IO就是我们传统的IO,它特点从命名上即可分析,这个IO的执行过程是阻塞的(也可以理解为同步的),实际就是只要发起了请求,要么整个流程成功返回,要么中途异常返回,在数据没有准备好之前,整个流程是不可插入其他操作的,是处于一种等待状态的。这种模型是最简单的也是效率最差的一种。
  比如下图,阻塞式IO在执行时,由应用进程调用recvfrom,然后会切换到内核空间中运行,直到数据报到达且被复制到应用进程缓冲区中整个流程才算完成,才会返回。

同步非阻塞式I/O(Non-blocking IO)

nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。

  非阻塞的概念,实际是和阻塞相反的,同步非阻塞IO就是在同步阻塞IO的基础上,将socket设置为NONBLOCK。当请求IO时,进程或者操作并不会阻塞在那里等待,相当于会发起一个尝试IO的请求,数据若是准备好了,则顺利完成IO,数据若是没有准备好,那么就返回一个标记(比如错误码之类的)告诉调用端,由调用端决定继续重新发起请求(一般称为轮询,应用进程会持续轮询内核,实际应用时不建议这么操作,因为会导致CPU开销过大)还是切换到其他操作上去。
  比如下图,在前两次调用recvfrom时由于数据报没准备好,因此内核马上返回一个系统调用错误。第3次调用recvfrom时,数据报已准备好,数据报被复制到应用进程的缓冲区,接着recvfrom成功返回。

IO多路复用(IO Multiplexing)

I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。

  常用的select和poll函数(都是内核提供的多路分离函数)使用了I/O复用模型,比如当我们调用select函数时,将会阻塞于此函数,等待数据报套接字变为可读。当等待的多个套接字中的其中一个或者多个变得可读时,我们调用recvfrom把数据报复制到应用进程缓冲区。
  单纯从概念上看的话,和同步阻塞式IO比较,I/O复用模型好像也没什么优势,而且应用进程为了获取数据报,还得增加了一个额外的select系统调用。不过I/O复用模型的优势在于可以同时等待多个(而不只是一个)套接字描述符就绪。所谓的多路也就是这个意思。

信号驱动式I/O(Signal Driven IO)

  为了使用该I/O模型,需要开启套接字的信号驱动I/O功能,并通过sigaction系统调用安装一个信号处理函数。sigaction函数立即返回,我们的进程继续工作,即进程没有被阻塞。当数据报准备好时,内核会为该进程产生一个SIGIO信号,这样我们可以在信号处理函数中调用recvfrom读取数据报,也可以在主循环中读取数据报。无论如何处理SIGIO信号,这种模型的优势在于等待数据报到达期间不被阻塞。

异步I/O(Asynchronous IO)

asynchronous IO的特点就是在IO执行的两个阶段都不会被block。

  异步I/O模型的工作机制是,启动某个操作,并让内核在整个操作(包括等待数据和将数据从内核复制到用户空间)完成后通知应用进程。异步I/O模型与信号驱动式I/O的区别在于:信号驱动式I/O在数据报准备好时就通知应用进程,应用进程还需要将数据报从内核复制到用户进程缓冲区;而异步I/O模型则是整个操作完成才通知应用进程,应用进程在整个操作期间都不会被阻塞。

五种模型对比


  某博客上面给了个很好的比喻,个人觉得理解起来相当的生动,所以,在这里借用一下。
  话说有A,B,C,D,E五个人在钓鱼,每个人的路数都不一样。 A使用了最古老的鱼竿,所以开始钓鱼后,就一直守着,直接鱼上钩了再拉竿;B由于着急想知道有没鱼上钩,所以隔一会就看一次鱼竿看有没鱼上钩,直到看到鱼上钩后,再拉竿;C同时使用了N支鱼竿来钩鱼,然后等着,只要有其中一支鱼竿有鱼上钩,就将对应的鱼竿拉起来;D的鱼竿比较高级,当有鱼上钩后,会发出警报提示,所以D开始钓鱼后不用一直守着,一旦鱼竿发出警报,D再回来拉竿即可;E为了更省事,直接雇个佣人给他钓鱼,当佣人钓起鱼后,再通知E去取鱼即可。

I/O 多路复用

select,poll,epoll三个都是Linux的IO多路复用的机制,可以监视多个描述符的读/写等事件,一旦某个描述符就绪(一般是读或者写事件发生了),就能够将发生的事件通知给关心的应用程序去处理该事件。但本质上,select、poll、epoll本质上都是同步I/O。因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

select

1
2
3
4
5
6
7
8
9
10
int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

// 以下是select实现的伪代码
while true {
select(streams[])
for i in streams[] {
if i has data
read until unavailable
}
}

  select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述符就绪(有数据可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。
  select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。
  如果没有I/O事件产生,我们的程序就会阻塞在select处。但是我们从select那里仅仅知道了,有I/O事件发生了,但却并不知道是那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。这里我们有O(n)的无差别轮询复杂度,同时处理的流越多,每一次无差别轮询时间就越长。

poll

1
2
3
4
5
6
7
8
int poll (struct pollfd *fds, unsigned int nfds, int timeout);

// 不同与select使用三个位图来表示三个fdset的方式,poll使用一个 pollfd的指针实现。
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events to watch */
short revents; /* returned events witnessed */
};

  pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。
  poll与select不同,通过一个pollfd数组向内核传递需要关注的事件,故没有描述符个数的限制,pollfd中的events字段和revents分别用于标示关注的事件和发生的事件,故pollfd数组只需要被初始化一次。

epoll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int epoll_create(int size);//创建一个epoll的fd句柄,size用来告诉内核这个监听的数目一共有多大
// --------------------------------------------------------------------------------------------

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 部分定义
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

//events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
// --------------------------------------------------------------------------------------------

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
// 伪代码
while true {
active_stream[] = epoll_wait(epollfd)
for i in active_stream[] {
read or write till
}
}

  epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。
  epoll可以理解为event poll,不同于忙轮询和无差别轮询,epoll之会把哪个流发生了怎样的I/O事件通知我们。此时我们对这些流的操作都是有意义的。(复杂度降低到了O(1)或者O(k))
  epoll是poll的一种优化,返回后不需要对所有的fd进行遍历,在内核中维持了fd的列表。select和poll是将这个内核列表维持在用户态,然后传递到内核中。与poll/select不同,epoll不再是一个单独的系统调用,而是由epoll_create / epoll_ctl / epoll_wait三个系统调用组成,后面将会看到这样做的好处。epoll在2.6以后的内核才支持。

Reactor 和 Proactor

Reactor 和 Proactor 是基于事件驱动,在网络编程中经常用到两种设计模式,不同的IO模型也用到了这两种设计模式,这里就简单说说两者的特性。

Reactor设计模式(IO多路复用)

概念

  Reactor,即反应堆(“反应”即“倒置”,“控制逆转”)。Reactor 的一般工作过程是首先在 Reactor 中注册(Reactor)感兴趣事件,并在注册时候指定某个已定义的回调函数(callback);当客户端发送请求时,在 Reactor 中会触发刚才注册的事件,并调用对应的处理函数。在这一个处理回调函数中,一般会有数据接收、处理、回复请求等操作。

角色分类

  • Handle 句柄;用来标识socket连接或是打开文件;
  • Synchronous Event Demultiplexer:同步事件多路分解器:由操作系统内核实现的一个函数;用于阻塞等待发生在句柄集合上的一个或多个事件;(如select/epoll;)
  • Event Handler:事件处理接口
  • Concrete Event HandlerA:实现应用程序所提供的特定事件处理逻辑;
  • Reactor:反应器,定义一个接口,实现以下功能:
    • 供应用程序注册和删除关注的事件句柄;
    • 运行事件循环;
    • 有就绪事件到来时,分发事件到之前注册的回调函数上处理;

时序图

  • 应用启动,将关注的事件handle注册到Reactor中;
  • 调用Reactor,进入无限事件循环,等待注册的事件到来;
  • 事件到来,select返回,Reactor将事件分发到之前注册的回调函数中处理;

关键参与者

  • 描述符(handle):由操作系统提供的资源,用于识别每一个事件,如Socket描述符、文件描述符、信号的值等。在Linux中,它用一个整数来表示。事件可以来自外部,如来自客户端的连接请求、数据等。事件也可以来自内部,如信号、定时器事件。
  • 同步事件多路分离器(event demultiplexer):事件的到来是随机的、异步的,无法预知程序何时收到一个客户连接请求或收到一个信号。所以程序要循环等待并处理事件,这就是事件循环。在事件循环中,等待事件一般使用I/O复用技术实现。在linux系统上一般是select、poll、epol_waitl等系统调用,用来等待一个或多个事件的发生。I/O框架库一般将各种I/O复用系统调用封装成统一的接口,称为事件多路分离器。调用者会被阻塞,直到分离器分离的描述符集上有事件发生。
  • 事件处理器(event handler):I/O框架库提供的事件处理器通常是由一个或多个模板函数组成的接口。这些模板函数描述了和应用程序相关的对某个事件的操作,用户需要继承它来实现自己的事件处理器,即具体事件处理器。因此,事件处理器中的回调函数一般声明为虚函数,以支持用户拓展。
  • 具体的事件处理器(concrete event handler):是事件处理器接口的实现。它实现了应用程序提供的某个服务。每个具体的事件处理器总和一个描述符相关。它使用描述符来识别事件、识别应用程序提供的服务。
  • Reactor 管理器(reactor):定义了一些接口,用于应用程序控制事件调度,以及应用程序注册、删除事件处理器和相关的描述符。它是事件处理器的调度核心。 Reactor管理器使用同步事件分离器来等待事件的发生。一旦事件发生,Reactor管理器先是分离每个事件,然后调度事件处理器,最后调用相关的模 板函数来处理这个事件。

如何使用Reactor模式

Classic Service Design(传统型)


  传统的服务设计,一般是来一个请求系统都会分配一个线程去处理,粗一看似乎合情合理,但是一旦并发量上来之后,系统的支撑能力、处理能力将急剧下降;所以一般情况下,不建议使用这种简单粗暴的服务设计模式。
  当然,很多人会想到说我们可以使用线程池技术来避免不断的创建和分配新线程,但实际上线程池也并不能很好满足高并发的线程需求,当海量请求到来时,线程池中的工作线程达到饱和状态,这时可能就导致请求被抛弃、阻塞,也无法支撑客户端的业务需求。
  传统的服务设计代码实现粗略流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Server implements Runnable {
public void run() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
} catch (IOException ex) {
/* ... */ }
}

static class Handler implements Runnable {
final Socket socket;

Handler(Socket s) {
socket = s;
}

public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) {
/* ... */ }
}

private byte[] process(byte[] cmd) {
/* ... */ }
}
}

  进一步思考,我们可以将一次完整的请求切分成几个小的任务,每一个小任务都是非阻塞的;对于读写操作,使用NIO对其进行读写;不同的任务将被分配到相关联的处理器上进行处理,每个处理器都是通过异步回调机制实现。这样就可以大大提高系统吞吐量,减少响应时间。而这就是下面将要说到的Reactor模式的两种服务设计方式。

Basic Reactor Design(单线程)

  单线程版的Reactor模式如下图所示。对于客户端的所有请求,都有一个专门的线程去进行处理,这个线程无限循环去监听是否有客户的请求到来,一旦收到客户端的请求,就将其分发给响应的处理器进行处理。

Reactor

  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;

/**
* Reactor,首先需要创建一个Selector和一个ServerSocketChannel ,将监听的端口绑定到Channel中
* 还需要设置Channel为非阻塞,并在Selector上注册自己感兴趣的时事件,可以是连接事件,也可以是读写事件。
*/
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());
}
/*
* Alternatively, use explicit SPI provider: SelectorProvider p =
* SelectorProvider.provider(); selector = p.openSelector();
* serverSocket = p.openServerSocketChannel();
*/

// class Reactor continued
/**
* boss线程的主要处理逻辑,它负责接收请求并安排给对应的handle处理;
* 只要当前线程不中断就会一直监听,其中selector.select()是阻塞的,
* 一旦又请求到来时,就会从selector中获取到对应的SelectionKey ,然后将其下发给后续处理程序(工作线程)进行处理。
*/
public void run() { // normally in a new Thread
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey)(it.next());
selected.clear();
}
} catch (IOException ex) {
/* ... */ }
}

}

void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null)
r.run();
}
}

Acceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// class Reactor continued
/**
* Acceptor也是一个线程,在其run方法中,通过判断serverSocket.accept()方法来获取SocketChannel
* 只要SocketChannel 不为空,则创建一个handler进行相应处理。
*/
class Acceptor implements Runnable { // inner
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
} catch (IOException ex) {
/* ... */ }
}
}
Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* 一个handler就是一个线程,其中的SocketChannel 被设置成非阻塞。默认在Selector上注册了读事件并绑定到SocketChannel 上。
*/
final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;

Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
// Optionally try first read now
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}

boolean inputIsComplete() {
/* ... */ }

boolean outputIsComplete() {
/* ... */ }

void process() {
/* ... */ }

// class Handler continued
public void run() {
try {
if (state == READING)
read();
else if (state == SENDING)
send();
} catch (IOException ex) {
/* ... */ }
}

void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
}
}

void send() throws IOException {
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
}

Worker Thread Pools for Reactor(多线程)

Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 考虑到工作线程的复用,将工作线程设计为线程池;在handler中使用线程池来处理任务。
* Reactor分成两部分,mainReactor负责监听并accept新连接,然后将建立的socket通过多路复用器(Acceptor)分派给subReactor。
* subReactor负责多路分离已连接的socket,读写网络数据;
* 业务处理功能,其交给worker线程池完成。通常,subReactor个数上可与CPU个数等同。
*/
class Handler implements Runnable {
// uses util.concurrent thread pool
static PooledExecutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;

// ...
synchronized void read() { // ...
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
pool.execute(new Processer());
}
}

synchronized void processAndHandOff() {
process();
state = SENDING; // or rebind attachment
sk.interest(SelectionKey.OP_WRITE);
}

class Processer implements Runnable {
public void run() {
processAndHandOff();
}
}
}

Proactor设计模式(异步IO)

概念

  从上面 Reactor 模式中,发现服务端数据的接收和发送都占用了用户状态(还有一种内核态),这样服务器的处理操作就在数据的读写上阻塞花费了时间,节省这些时间的办法是借助操作系统的异步读写;异步读写在调用的时候可以传递回调函数或者回送信号,当异步操作完毕,内核会自动调用回调函数或者发送信号。Proactor 就是这么做的,所以Proataor很依赖操作系统。

角色分类

  • Handle 句柄;用来标识socket连接或是打开文件;
  • Asynchronous Operation Processor:异步操作处理器;负责执行异步操作,一般由操作系统内核实现;
  • Asynchronous Operation:异步操作
  • Completion Event Queue:完成事件队列;异步操作完成的结果放到队列中等待后续使用
  • Proactor:主动器;为应用程序进程提供事件循环;从完成事件队列中取出异步操作的结果,分发调用相应的后续处理逻辑;
  • Completion Handler:完成事件接口;一般是由回调函数组成的接口;
  • Concrete Completion Handler:完成事件处理逻辑;实现接口定义特定的应用处理逻辑;

时序图

  • 应用程序启动,调用异步操作处理器提供的异步操作接口函数,调用之后应用程序和异步操作处理就独立运行;应用程序可以调用新的异步操作,而其它操作可以并发进行;
  • 应用程序启动Proactor主动器,进行无限的事件循环,等待完成事件到来;
  • 异步操作处理器执行异步操作,完成后将结果放入到完成事件队列;
  • 主动器从完成事件队列中取出结果,分发到相应的完成事件回调函数处理逻辑中;

参考文献



  以上两幅骚气的截图均来自Douglas C. Schmidt的Proactor说明文档,如果有时间想详细研究,可以通过谷歌或者度娘搜索细做研究。

两种模式的区别

主动和被动

  • Reactor将handle放到select(),等待可写就绪,然后调用write()写入数据;
  • 写完处理后续逻辑;Proactor调用aoi_write后立刻返回,由内核负责写操作,写完后调用相应的回调函数处理后续逻辑;
  • Reactor被动的等待指示事件的到来并做出反应;它有一个等待的过程,做什么都要先放入到监听事件集合中等待handler可用时再进行操作;
  • Proactor直接调用异步读写操作,调用完后立刻返回;

实现

  • Reactor实现了一个被动的事件分离和分发模型,服务等待请求事件的到来,再通过不受间断的同步处理事件,从而做出反应;
  • Proactor实现了一个主动的事件分离和分发模型;这种设计允许多个任务并发的执行,从而提高吞吐量;并可执行耗时长的任务(各个任务间互不影响)

优点

  • Reactor实现相对简单,对于耗时短的处理场景处理高效;
  • 操作系统可以在多个事件源上等待,并且避免了多线程编程相关的性能开销和编程复杂性;
  • 事件的串行化对应用是透明的,可以顺序的同步执行而不需要加锁;
  • 事务分离:将与应用无关的多路分解和分配机制和与应用相关的回调函数分离开来,

Proactor性能更高,能够处理耗时长的并发场景;

缺点

  • Reactor处理耗时长的操作会造成事件分发的阻塞,影响到后续事件的处理;
  • Proactor实现逻辑复杂;依赖操作系统对异步的支持,目前实现了纯异步操作的操作系统少,实现优秀的如windows IOCP,但由于其windows系统用于服务器的局限性,目前应用范围较小;而Unix/Linux系统对纯异步的支持有限,应用事件驱动的主流还是通过select/epoll来实现;

适用场景

  • Reactor:同时接收多个服务请求,并且依次同步的处理它们的事件驱动程序;
  • Proactor:异步接收和同时处理多个服务请求的事件驱动程序;
------2019 Lin.C ------