Java NIO

Java NIO属于IO多路复用模型。

IO多路复用

为了提高性能,操作系统引入了一种新的系统调用,专门用于查询IO文件描述符(含socket连接)的就绪状态。

目前支持IO多路复用技术有:

  • Linux: select、poll、epoll
  • Mac: kqueue
  • Windows: select

通过系统调用,一个用户进程/线程可以监视多个文件描述符,一旦某个描述符就绪(一般是内核缓冲区可读/可写),内核就能够将文件描述符的就绪状态返回给用户进程/线程,用户空间可以根据文件描述符的就绪状态进行相应的IO系统调用。

IO多路复用(IO Multiplexing)属于一种经典的Reactor模式实现,有时也称为异步阻塞IO,Java中的Selector属于这种模型。

发起一个多路复用IO的read操作的系统调用,流程如下:

1、选择器注册。首先,将需要read操作的目标文件描述符(socket连接)提前注册到Linux的select/epoll选择器中,在Java中所对应的选择器类是Selector类。然后,开启整个IO多路复用模型的轮询流程。

2、就绪状态的轮询。通过选择器的查询方法,查询所有提前注册过的目标文件描述符(socket连接)的IO就绪状态。通过查询的系统调用,内核会返回一个就绪的socket列表。当任何一个注册过的socket中的数据准备好或者就绪了就说明内核缓冲区有数据了,内核将该socket加入就绪的列表中,并且返回就绪事件。

3、用户线程获得了就绪状态的列表后,根据其中的socket连接发起read系统调用,用户线程阻塞。内核开始复制数据,将数据从内核缓冲区复制到用户缓冲区。

4、复制完成后,内核返回结果,用户线程才会解除阻塞的状态,用户线程读取到了数据,继续执行。

通过JDK的源码可以看出,Java语言的NIO组件在Linux系统上是使用epoll系统调用实现的。所以,Java语言的NIO组件所使用的就是IO多路复用模型。

Java NIO类库包含以下三个核心组件:

  • Channel(通道)
  • Buffer(缓冲区)
  • Selector(选择器)

NIO VS OIO

在Java中,NIO和OIO的区别主要体现在三个方面:

1、OIO是面向流(Stream Oriented)的,NIO是面向缓冲区(Buffer Oriented)的。

在一般的OIO操作中,面向字节流或字符流的IO操作总是以流式的方式顺序地从一个流(Stream)中读取一个或多个字节,因此,我们不能随意改变读取指针的位置。

在NIO操作中则不同,NIO中引入了Channel和Buffer的概念。面向缓冲区的读取和写入只需要从通道读取数据到缓冲区中,或将数据从缓冲区写入通道中。NIO不像OIO那样是顺序操作,它可以随意读取Buffer中任意位置的数据。

2、OIO的操作是阻塞的,而NIO的操作是非阻塞的。

OIO操作都是阻塞的。例如,调用一个read方法读取一个文件的内容,调用read的线程就会被阻塞,直到read操作完成。

在NIO模式中,当调用read方法时,如果此时有数据,则read读取数据并返回;如果此时没有数据,则read也会直接返回,而不会阻塞当前线程。

NIO的非阻塞是通过通道和通道的多路复用技术实现的。

3、OIO没有选择器(Selector)的概念,而NIO有选择器的概念。

NIO的实现是基于底层选择器的系统调用的,所以NIO需要底层操作系统提供支持;而OIO不需要用到选择器。

IO多路复用编程的步骤:

1、把通道注册到选择器中

2、通过选择器所提供的事件查询(select)方法来查询这些注册的通道是否有已经就绪的IO事件(例如可读、可写、网络连接完成等)。

NIO Buffer

Buffer类是一个非线程安全类。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
package java.nio;

import java.util.Spliterator;

public abstract class Buffer {

    /**
     * The characteristics of Spliterators that traverse and split elements
     * maintained in Buffers.
     */
    static final int SPLITERATOR_CHARACTERISTICS =
        Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED;

    // Invariants: mark <= position <= limit <= capacity
    private int mark = -1;
    private int position = 0;
    private int limit;
    private int capacity;
    
}

使用Java NIO Buffer类的基本步骤如下:

1、使用创建子类实例对象的allocate()方法创建一个Buffer类的实例对象。

2、调用put()方法将数据写入缓冲区中。

3、写入完成后,在开始读取数据前调用Buffer.flip()方法,将缓冲区转换为读模式。

4、调用get()方法,可以从缓冲区中读取数据。

5、读取完成后,调用Buffer.clear()方法或Buffer.compact()方法,将缓冲区转换为写模式,可以继续写入。

NIO Channel

在OIO中,同一个网络连接会关联到两个流:

  • 输入流(Input Stream)

  • 输出流(Output Stream)。

Java应用程序通过这两个流不断地进行输入和输出的操作。

在NIO中,一个网络连接使用一个通道表示,所有NIO的IO操作都是通过连接通道完成的。一个通道类似于OIO中两个流的结合体,既可以从通道读取数据,也可以向通道写入数据。

四种Channel实现:FileChannel、SocketChannel、ServerSocketChannel、DatagramChannel。

NIO Selector

选择器可以完成IO的多路复用,其主要工作是通道的注册、监听、事件查询。一个通道代表一条连接通路,通过选择器可以同时监控多个通道的IO(输入输出)状况。选择器和通道的关系是监控和被监控的关系。

通道和选择器之间的关联通过register(注册)的方式完成。调用通道的Channel.register(Selector sel,int ops)方法,可以将通道实例注册到一个选择器中。

register方法有两个参数:第一个参数sel指定通道注册到的选择器实例;第二个参数ops指定选择器要监控的IO事件类型。

通道IO事件

可供选择器监控的通道IO事件类型包括以下四种:

  • 可读:SelectionKey.OP_READ。
  • 可写:SelectionKey.OP_WRITE。
  • 连接:SelectionKey.OP_CONNECT。
  • 接收:SelectionKey.OP_ACCEPT。

这里的IO事件不是对通道的IO操作,而是通道处于某个IO操作就绪状态,表示通道具备执行某个IO操作的条件。

某个SocketChannel传输通道如果完成了和对端的三次握手过程,就会发生“连接就绪”(OP_CONNECT)事件;

某个ServerSocketChannel服务器连接监听通道,在监听到一个新连接到来时,则会发生“接收就绪”(OP_ACCEPT)事件;

一个SocketChannel通道有数据可读,就会发生“读就绪”(OP_READ)事件;

一个SocketChannel通道等待数据写入,就会发生“写就绪”(OP_WRITE)事件。

SelectableChannel

并不是所有的通道都是可以被选择器监控或选择的。

例如,FileChannel就不能被选择器复用。判断一个通道能否被选择器监控或选择有一个前提:判断它是否继承了抽象类SelectableChannel(可选择通道),如果是,就可以被选择,否则不能被选择。

SelectChannel类提供了实现通道可选择性所需要的公共方法。Java NIO中所有网络连接socket通道都继承了SelectableChannel类,都是可选择的。FileChannel并没有继承SelectableChannel,因此不是可选择通道。

Selector选择器使用

Selector选择器使用步骤:

1、获取选择器实例。调用静态工厂方法open()来获取Selector实例。

Java源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
/**
    * Opens a selector.
    *
    * <p> The new selector is created by invoking the {@link
    * java.nio.channels.spi.SelectorProvider#openSelector openSelector} method
    * of the system-wide default {@link
    * java.nio.channels.spi.SelectorProvider} object.  </p>
    *
    * @return  A new selector
    *
    * @throws  IOException
    *          If an I/O error occurs
    */
public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}
1
2
//调用静态工厂方法open()来获取Selector实例
Selector selector = Selector.open();

Selector的类方法open()的内部是向选择器SPI发出请求,通过默认的SelectorProvider(选择器提供者)对象获取一个新的选择器实例。

Java中的SPI(Service Provider Interface,服务提供者接口)是一种可以扩展的服务提供和发现机制。Java通过SPI的方式提供选择器的默认实现版本。也就是说,其他的服务提供者可以通过SPI的方式提供定制化版本的选择器的动态替换或者扩展。

2、将Channel通道注册到Selector选择器实例

1
2
3
4
5
6
7
8
//获取Channel通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
//绑定连接
serverSocketChannel.bind(new InetSocketAddress(18899));
//将Channel通道注册到Selector选择器上,并指定监听事件为"接收就绪"
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

注册到选择器的通道必须处于非阻塞模式下,否则将抛出IllegalBlockingModeException异常。这意味着,FileChannel不能与选择器一起使用,因为FileChannel只有阻塞模式,不能切换到非阻塞模式;而socket相关的所有通道都可以。

一个通道并不一定支持所有的四种IO事件。可以在注册之前通过通道的validOps()方法来获取该通道支持的所有IO事件集合。

服务器监听通道ServerSocketChannel仅支持Accept(接收到新连接)IO事件.

Java源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// ServerSocketChannel

/**
* Returns an operation set identifying this channel's supported
* operations.
*
* <p> Server-socket channels only support the accepting of new
* connections, so this method returns {@link SelectionKey#OP_ACCEPT}.
* </p>
*
* @return  The valid-operation set
*/
public final int validOps() {
    return SelectionKey.OP_ACCEPT;
}

传输通道SocketChannel不支持Accept类型的IO事件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
//SocketChannel

/**
* Returns an operation set identifying this channel's supported
* operations.
*
* <p> Socket channels support connecting, reading, and writing, so this
* method returns <tt>(</tt>{@link SelectionKey#OP_CONNECT}
* <tt>|</tt>&nbsp;{@link SelectionKey#OP_READ} <tt>|</tt>&nbsp;{@link
* SelectionKey#OP_WRITE}<tt>)</tt>.  </p>
*
* @return  The valid-operation set
*/
public final int validOps() {
    return (SelectionKey.OP_READ
            | SelectionKey.OP_WRITE
            | SelectionKey.OP_CONNECT);
}

3、选出感兴趣的IO就绪事件(选择键集合)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
//轮询,选择感兴趣的IO事件(选择键集合)
while (selector.select() > 0) {
    Set selectKeys = selector.selectedKeys();
    Iterator keyIterator = selectKeys.iterator();
    while (keyIterator.hasNext()) {
        SelectionKey key = (SelectionKey) keyIterator.next();
        //根据具体的IO事件类型执行对应的业务操作
        if (key.isAcceptable()) {
            //IO事件:ServerSocketChannel服务器监听通道有新连接
        } else if (key.isConnectable()) {
            //IO事件:传输通道连接成功
        } else if (key.isReadable()) {
            //IO事件:传输通道可读
        } else if (key.isWritable()) {
            //IO事件:传输通道可写
        }
        //传输完成后,移除选择键
        keyIterator.remove();
    }
}

处理完成后,需要将选择键从SelectionKey集合中移除,以防止下一次循环时被重复处理。

SelectionKey集合不能添加元素,如果试图向SelectionKey中添加元素,则将抛出java.lang.UnsupportedOperationException异常。

用于选择就绪的IO事件的select()方法有多个重载的实现版本,具体如下:

1、select():阻塞调用,直到至少有一个通道发生了注册的IO事件。

2、select(long timeout):和select()一样,但最长阻塞时间为timeout指定的毫秒数

3、selectNow():非阻塞,不管有没有IO事件都会立刻返回。

select()方法的返回值是整数类型(int),表示发生了IO事件的数量,即从上一次select到这一次select之间有多少通道发生了IO事件,更加准确地说是发生了选择器感兴趣(注册过)的IO事件数。

使用NIO实现Discard服务器

NioDiscardServer:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class NioDiscardServer {

    public static void startServer() throws IOException {

        // 1、创建一个 Selector选择器
        Selector selector = Selector.open();

        // 2、获取通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 3.设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        // 4、绑定连接
        serverSocketChannel.bind(new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_PORT));
        Logger.info("服务器启动成功");

        // 5、将通道注册到选择器上,并注册的IO事件为:“接收新连接”
        SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 6、轮询感兴趣的I/O就绪事件(选择键集合)
        while (selector.select() > 0) {
            // 7、获取选择键集合
            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();

            while (selectedKeys.hasNext()) {
                // 8、获取单个的选择键,并处理
                SelectionKey selectedKey = selectedKeys.next();

                // 9、判断key是具体的什么事件
                if (selectedKey.isAcceptable()) {
                    // 10、若选择键的IO事件是“连接就绪”事件,就获取客户端连接
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    // 11、切换为非阻塞模式
                    socketChannel.configureBlocking(false);
                    // 12、将该通道注册到selector选择器上
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (selectedKey.isReadable()) {
                    // 13、若选择键的IO事件是“可读”事件,读取数据
                    SocketChannel socketChannel = (SocketChannel) selectedKey.channel();

                    // 14、读取数据
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    int length = 0;
                    while ((length = socketChannel.read(byteBuffer)) > 0) {
                        byteBuffer.flip();
                        Logger.info(new String(byteBuffer.array(), 0, length));
                        byteBuffer.clear();
                    }
                    socketChannel.close();
                }
                // 15、移除选择键
                selectedKeys.remove();
            }
        }

        // 7、关闭连接
        serverSocketChannel.close();
    }

    public static void main(String[] args) throws IOException {
        startServer();
    }

}

NioDiscardClient:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class NioDiscardClient {
    /**
     * 客户端
     */
    public static void startClient() throws IOException {
        InetSocketAddress address =
                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                        NioDemoConfig.SOCKET_SERVER_PORT);

        // 1、获取通道(channel)
        SocketChannel socketChannel = SocketChannel.open(address);
        // 2、切换成非阻塞模式
        socketChannel.configureBlocking(false);
        //不断的自旋、等待连接完成,或者做一些其他的事情
        while (!socketChannel.finishConnect()) {

        }

        Logger.info("客户端连接成功");
        // 3、分配指定大小的缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        byteBuffer.put("hello world".getBytes());
        byteBuffer.flip();
        socketChannel.write(byteBuffer);
        socketChannel.shutdownOutput();
        socketChannel.close();
    }


    public static void main(String[] args) throws IOException {
        startClient();
    }

}

首先执行NioDiscardServer服务端程序,然后多次执行NioDiscardClient客户端程序。运行效果如下:

NioDiscardClient

NioDiscardServer