多线程OIO

ServerSocket类用于建立套接字。所有标准服务都不使用端口号8189。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
try {
    //建立一个负责监控端口8189的服务器
    ServerSocket s = new ServerSocket(8189);
    //返回一个已建立连接的Socket对象
    Socket incoming = s.accept();
    InputStream inStream = incoming.getInputStream();
    OutputStream outStream = incoming.getOutputStream();
    //关闭连接的套接字
    incoming.close();
} catch (IOException e) {
    e.printStackTrace();
}

传统的网络服务器程序:

1
2
3
4
5
while(true)
{
    socket = accept(); //阻塞,接收连接
    handle(socket); //读取数据、业务处理、写入结果
}

Connection Per Thread(一个线程处理一个连接):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class ConnectionPerThread implements Runnable {
    public void run() {
        try {
            ServerSocket serverSocket =
                    new ServerSocket(NioDemoConfig.SOCKET_SERVER_PORT);
            while (!Thread.interrupted()) {
                Socket socket = serverSocket.accept();
                Handler handler = new Handler(socket);
                //创建新线程来handle
                //或者,使用线程池来处理
                new Thread(handler).start();
            }

        } catch (IOException ex) { /* 处理异常 */ }
    }

    static class Handler implements Runnable {
        final Socket socket;

        Handler(Socket s) {
            socket = s;
        }

        public void run() {
            while (true) {
                try {
                    byte[] input = new byte[NioDemoConfig.SERVER_BUFFER_SIZE];
                    /* 读取数据 */
                    socket.getInputStream().read(input);
                    /* 处理业务逻辑,获取处理结果 */
                    byte[] output = null;
                    /* 写入结果 */
                    socket.getOutputStream().write(output);
                } catch (IOException ex) { /*处理异常*/ }
            }
        }

    }
}

对于每一个新的网络连接都分配给一个线程。每个线程都独自处理自己负责的socket连接的输入和输出。早期版本的Tomcat服务器就是这样实现的。

Connection Per Thread模式的缺点:对应于大量的连接,需要耗费大量的线程资源,对线程资源要求太高。

单线程Reactor模式

在Reactor模式中有Reactor和Handler两个组件:

1、Reactor:负责查询IO事件,当检测到一个IO事件时将其发送给相应的Handler处理器去处理。这里的IO事件就是NIO中选择器查询出来的通道IO事件

2、Handler:与IO事件(或者SelectionKey选择键)绑定,负责IO事件的处理,完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写到通道等。

单线程Reactor模式:Reactor和Handlers处于一个线程中执行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
//反应器
class EchoServerReactor implements Runnable {
    Selector selector;
    ServerSocketChannel serverSocket;

    EchoServerReactor() throws IOException {
        //Reactor初始化
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();

        InetSocketAddress address =
                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                        NioDemoConfig.SOCKET_SERVER_PORT);
        serverSocket.socket().bind(address);
        Logger.info("服务端已经开始监听:" + address);
        //非阻塞
        serverSocket.configureBlocking(false);

        //分步处理,第一步,接收accept事件
        SelectionKey sk =
                serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        //attach callback object, AcceptorHandler
        sk.attach(new AcceptorHandler());
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext()) {
                    //Reactor负责dispatch收到的事件
                    SelectionKey sk = it.next();
                    dispatch(sk);
                }
                selected.clear();
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    void dispatch(SelectionKey sk) {
        Runnable handler = (Runnable) sk.attachment();
        //调用之前attach绑定到选择键的handler处理器对象
        if (handler != null) {
            handler.run();
        }
    }

    // Handler:新连接处理器
    class AcceptorHandler implements Runnable {
        public void run() {
            try {
                SocketChannel channel = serverSocket.accept();
                Logger.info("接收到一个连接");
                if (channel != null)
                    new EchoHandler(selector, channel);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new Thread(new EchoServerReactor()).start();
    }
}
1
2
3
4
5
6
7
//分步处理,第一步,接收accept事件
//为ServerSocket注册新连接accept接收事件
SelectionKey sk =
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//attach callback object, AcceptorHandler
//将AcceptHandler新连接处理器作为附件,绑定到SelectionKey选择键
sk.attach(new AcceptorHandler());

当新连接事件发生后,取出之前附加到SelectionKey中的Handler业务处理器进行socket的各种IO处理。

1
2
3
4
5
6
7
void dispatch(SelectionKey sk) {
    Runnable handler = (Runnable) sk.attachment();
    //调用之前attach绑定到选择键的handler处理器对象
    if (handler != null) {
        handler.run();
    }
}

AcceptorHandler处理器是一个内部类。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Handler:新连接处理器
class AcceptorHandler implements Runnable {
    public void run() {
        try {
            SocketChannel channel = serverSocket.accept();
            Logger.info("接收到一个连接");
            if (channel != null)
                new EchoHandler(selector, channel);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

AcceptorHandler处理器的两大职责:

  • 完成新连接的接收工作

  • 为新连接创建一个负责数据传输的Handler

单线程Rector模式的EchoServer

EchoServer的功能很简单:读取客户端的输入并回显到客户端,即回显服务器。

基于Reactor模式来实现,设计三个重要的类:

1、设计一个反应器类:EchoServerReactor类。

2、设计两个处理器类:AcceptorHandler新连接处理器、EchoHandler回显处理器。

EchoHandler回显处理器:传输处理器,主要完成客户端的内容读取和回显。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class EchoHandler implements Runnable {
    final SocketChannel channel;
    final SelectionKey sk;
    final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    static final int RECIEVING = 0, SENDING = 1;
    int state = RECIEVING;

    EchoHandler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        c.configureBlocking(false);
        //仅仅取得选择键,后设置感兴趣的IO事件
        sk = channel.register(selector, 0);

        //将Handler作为选择键的附件
        sk.attach(this);

        //第二步,注册Read就绪事件
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    public void run() {
        try {
            if (state == SENDING) {
                //写入通道
                channel.write(byteBuffer);
                //写完后,准备开始从通道读,byteBuffer切换成写模式
                byteBuffer.clear();
                //写完后,注册read就绪事件
                sk.interestOps(SelectionKey.OP_READ);
                //写完后,进入接收的状态
                state = RECIEVING;
            } else if (state == RECIEVING) {
                //从通道读
                int length = 0;
                while ((length = channel.read(byteBuffer)) > 0) {
                    Logger.info(new String(byteBuffer.array(), 0, length));
                }
                //读完后,准备开始写入通道,byteBuffer切换成读模式
                byteBuffer.flip();
                //读完后,注册write就绪事件
                sk.interestOps(SelectionKey.OP_WRITE);
                //读完后,进入发送的状态
                state = SENDING;
            }
            //处理结束了, 这里不能关闭select key,需要重复使用
            //sk.cancel();
        } catch (IOException ex) {
            ex.printStackTrace();
            sk.cancel();
            try {
                channel.finishConnect();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
}

启动回显服务器:EchoServerReactor类中的main()方法。

启动客户端:EchoClient类中的main()方法。EchoClient用于数据的发送。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
public class EchoClient {

    public void start() throws IOException {

        InetSocketAddress address =
                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                        NioDemoConfig.SOCKET_SERVER_PORT);

        // 1、获取通道(channel)
        SocketChannel socketChannel = SocketChannel.open(address);
        Logger.info("客户端连接成功");
        // 2、切换成非阻塞模式
        socketChannel.configureBlocking(false);
        socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
        //不断的自旋、等待连接完成,或者做一些其他的事情
        while (!socketChannel.finishConnect()) {

        }
        Logger.tcfo("客户端启动成功!");

        //启动接受线程
        Processor processor = new Processor(socketChannel);
        Commander commander = new Commander(processor);
        new Thread(commander).start();
        new Thread(processor).start();

    }

    static class Commander implements Runnable {
        Processor processor;

        Commander(Processor processor) throws IOException {
            //Reactor初始化
            this.processor = processor;
        }

        public void run() {
            while (!Thread.interrupted()) {

                ByteBuffer buffer = processor.getSendBuffer();

                Scanner scanner = new Scanner(System.in);
                while (processor.hasData.get()) {
                    Logger.tcfo("还有消息没有发送完,请稍等");
                    ThreadUtil.sleepMilliSeconds(1000);

                }
                Logger.tcfo("请输入发送内容:");
                if (scanner.hasNext()) {

                    String next = scanner.next();
                    buffer.put((Dateutil.getNow() + " >>" + next).getBytes());

                    processor.hasData.set(true);
                }

            }
        }
    }


    @Data
    static class Processor implements Runnable {
        ByteBuffer sendBuffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);

        ByteBuffer readBuffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);

        protected AtomicBoolean hasData = new AtomicBoolean(false);

        final Selector selector;
        final SocketChannel channel;

        Processor(SocketChannel channel) throws IOException {
            //Reactor初始化
            selector = Selector.open();

            this.channel = channel;
            channel.register(selector,
                    SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }

        public void run() {
            try {
                while (!Thread.interrupted()) {
                    selector.select();
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        SelectionKey sk = it.next();
                        if (sk.isWritable()) {

                            if (hasData.get()) {
                                SocketChannel socketChannel = (SocketChannel) sk.channel();
                                sendBuffer.flip();
                                // 操作三:发送数据
                                socketChannel.write(sendBuffer);
                                sendBuffer.clear();
                                hasData.set(false);
                            }

                        }
                        if (sk.isReadable()) {
                            // 若选择键的IO事件是“可读”事件,读取数据
                            SocketChannel socketChannel = (SocketChannel) sk.channel();

                            int length = 0;
                            while ((length = socketChannel.read(readBuffer)) > 0) {
                                readBuffer.flip();
                                Logger.info("server echo:" + new String(readBuffer.array(), 0, length));
                                readBuffer.clear();
                            }

                        }
                        //处理结束了, 这里不能关闭select key,需要重复使用
                        //selectionKey.cancel();
                    }
                    selected.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new EchoClient().start();
    }
}

运行结果:

EchoClient

EchoServerReactor

单线程Reactor模式的缺点

在单线程Reactor模式中,Reactor和Handler都在同一条线程中执行。因此

  • 当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。
  • 单线程Reactor模式模型不能充分利用多核资源。

总之,在高性能服务器应用场景中,单线程Reactor模式实际使用的很少。

多线程Reactor模式

多线程Reactor的演进分为两个方面:

1、升级Handler。既要使用多线程,又要尽可能高效率,则可以考虑使用线程池。

2、升级Reactor。可以考虑引入多个Selector(选择器),提升选择大量通道的能力。

总体来说,多线程版本的Reactor模式大致如下:

1、将负责数据传输处理的IOHandler处理器的执行放入独立的线程池中。这样,业务处理线程与负责新连接监听的反应器线程就能相互隔离,避免服务器的连接监听受到阻塞。

2、如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程;同时,引入多个选择器,并且为每一个SubReactor引入一个线程,一个线程负责一个选择器的事件轮询。这样充分释放了系统资源的能力,也大大提升了反应器管理大量连接或者监听大量传输通道的能力。

MultiThreadEchoServerReactor

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
//多线程版本反应器
class MultiThreadEchoServerReactor {
    ServerSocketChannel serverSocket;
    AtomicInteger next = new AtomicInteger(0);
    Selector bossSelector = null;
    Reactor bossReactor = null;
    //selectors集合,引入多个selector选择器
    Selector[] workSelectors = new Selector[2];
    //引入多个子反应器
    Reactor[] workReactors = null;

    MultiThreadEchoServerReactor() throws IOException {

        bossSelector = Selector.open();
        //初始化多个selector选择器
        workSelectors[0] = Selector.open();
        workSelectors[1] = Selector.open();
        serverSocket = ServerSocketChannel.open();

        InetSocketAddress address =
                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                        NioDemoConfig.SOCKET_SERVER_PORT);
        serverSocket.socket().bind(address);
        //非阻塞
        serverSocket.configureBlocking(false);

        //第一个selector,负责监控新连接事件
        SelectionKey sk =
                serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);
        //附加新连接处理handler处理器到SelectionKey(选择键)
        sk.attach(new AcceptorHandler());

        //处理新连接的反应器
        bossReactor = new Reactor(bossSelector);

        //第一个子反应器,一子反应器负责一个选择器
        Reactor subReactor1 = new Reactor(workSelectors[0]);
        //第二个子反应器,一子反应器负责一个选择器
        Reactor subReactor2 = new Reactor(workSelectors[1]);
        workReactors = new Reactor[]{subReactor1, subReactor2};
    }

    private void startService() {
        new Thread(bossReactor).start();
        // 一子反应器对应一条线程
        new Thread(workReactors[0]).start();
        new Thread(workReactors[1]).start();
    }

    //反应器
    class Reactor implements Runnable {
        //每条线程负责一个选择器的查询
        final Selector selector;

        public Reactor(Selector selector) {
            this.selector = selector;
        }

        public void run() {
            try {
                while (!Thread.interrupted()) {
                    //单位为毫秒
                    selector.select(1000);
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    if (null == selectedKeys || selectedKeys.size() == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> it = selectedKeys.iterator();
                    while (it.hasNext()) {
                        //Reactor负责dispatch收到的事件
                        SelectionKey sk = it.next();
                        dispatch(sk);
                    }
                    selectedKeys.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }


        void dispatch(SelectionKey sk) {
            Runnable handler = (Runnable) sk.attachment();
            //调用之前attach绑定到选择键的handler处理器对象
            if (handler != null) {
                handler.run();
            }
        }
    }

    // Handler:新连接处理器
    class AcceptorHandler implements Runnable {
        public void run() {
            try {
                SocketChannel channel = serverSocket.accept();
                Logger.info("接收到一个新的连接");

                if (channel != null) {
                    int index = next.get();
                    Logger.info("选择器的编号:" + index);
                    Selector selector = workSelectors[index];
                    new MultiThreadEchoHandler(selector, channel);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (next.incrementAndGet() == workSelectors.length) {
                next.set(0);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        MultiThreadEchoServerReactor server =
                new MultiThreadEchoServerReactor();
        server.startService();
    }

}

两个选择器:

  • 第一个选择器负责查询和分发新连接事件
  • 第二个选择器负责查询和分发IO传输事件

sdf

有两条事件轮询线程

  • 第一条线程为新连接事件轮询线程,专门轮询第一个选择器

  • 第二条线程为IO事件轮询线程,专门轮询第二个选择器

服务端的监听通道注册到第一个选择器,而所有的Socket传输通道都注册到第二个选择器,从而实现了新连接监听和IO读写事件监听的线程分离。

MultiThreadEchoHandler

MultiThreadEchoHandler回显处理器,引入了一个线程池(ThreadPool),使得数据传输业务处理的代码执行在独立的线程池中,彻底地做到IO处理以及业务处理线程和反应器IO事件轮询线程的完全隔离。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class MultiThreadEchoHandler implements Runnable {
    final SocketChannel channel;
    final SelectionKey sk;
    final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    static final int RECIEVING = 0, SENDING = 1;
    int state = RECIEVING;
    //引入线程池
    static ExecutorService pool = Executors.newFixedThreadPool(4);

    MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        channel.configureBlocking(false);
        channel.setOption(StandardSocketOptions.TCP_NODELAY, true);

        //唤醒选择,防止register时 boss线程被阻塞,netty 处理方式比较优雅,会在同一个线程注册事件,避免阻塞boss
//        selector.wakeup();

        //仅仅取得选择键,后设置感兴趣的IO事件
        sk = channel.register(selector, 0);
        //将本Handler作为sk选择键的附件,方便事件dispatch
        sk.attach(this);
        //向sk选择键注册Read就绪事件
        sk.interestOps(SelectionKey.OP_READ);

        //唤醒选择,是的OP_READ生效
        selector.wakeup();
        Logger.info("新的连接 注册完成");

    }

    public void run() {
        //异步任务,在独立的线程池中执行
        pool.execute(new AsyncTask());
    }

    //异步任务,不在Reactor线程中执行
    public synchronized void asyncRun() {
        try {
            if (state == SENDING) {
                //写入通道
                channel.write(byteBuffer);

                //写完后,准备开始从通道读,byteBuffer切换成写模式
                byteBuffer.clear();
                //写完后,注册read就绪事件
                sk.interestOps(SelectionKey.OP_READ);
                //写完后,进入接收的状态
                state = RECIEVING;
            } else if (state == RECIEVING) {
                //从通道读
                int length = 0;
                while ((length = channel.read(byteBuffer)) > 0) {
                    Logger.info(new String(byteBuffer.array(), 0, length));
                }
                //读完后,准备开始写入通道,byteBuffer切换成读模式
                byteBuffer.flip();
                //读完后,注册write就绪事件
                sk.interestOps(SelectionKey.OP_WRITE);
                //读完后,进入发送的状态
                state = SENDING;
            }
            //处理结束了, 这里不能关闭select key,需要重复使用
            //sk.cancel();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    //异步任务的内部类
    class AsyncTask implements Runnable {
        public void run() {
            MultiThreadEchoHandler.this.asyncRun();
        }
    }

}

IO操作和业务处理被提交到线程池中异步执行,为了避免发送和读取的状态混乱,需要进行线程安全处理,这里在asyncRun()方法的前面加上synchronized同步修饰符。

运行结果与单线程版本的EchoServer运行输出是一样的。

Reactor模式的优缺点

和生产者消费者模式对比

同:

在一定程度上,Reactor模式有点类似生产者消费者模式。

在生产者消费者模式中,一个或多个生产者将事件加入一个队列中,一个或多个消费者主动从这个队列中拉取(Pull)事件来处理。

异:

Reactor模式是基于查询的,没有专门的队列去缓冲存储IO事件,查询到IO事件之后,反应器会根据不同IO选择键(事件)将其分发给对应的Handler来处理。

和观察者模式对比

同:

在Reactor模式中,当查询到IO事件后,服务处理程序使用单路/多路分发(Dispatch)策略,同步分发这些IO事件。

观察者模式(Observer Pattern)也被称作发布/订阅模式,它定义了一种依赖关系,让多个观察者同时监听某一个主题(Topic)。这个主题对象在状态发生变化时会通知所有观察者,它们能够执行相应的处理。

异:

在Reactor模式中,Handler实例和IO事件(选择键)的订阅关系基本上是一个事件绑定到一个Handler,每一个IO事件(选择键)被查询后,反应器会将事件分发给所绑定的Handler,也就是一个事件只能被一个Handler处理;

在观察者模式中,同一时刻、同一主题可以被订阅过的多个观察者处理。

优缺点

作为高性能的IO模式,Reactor模式的优点:

  • 响应快,虽然同一反应器线程本身是同步的,但是不会被单个连接的IO操作所阻塞。
  • 编程相对简单,最大限度避免了复杂的多线程同步,也避免了多线程各个进程之间切换的开销。
  • 可扩展,可以方便地通过增加反应器线程的个数来充分利用CPU资源。

Reactor模式的缺点:

  • Reactor模式增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
  • Reactor模式依赖于操作系统底层的IO多路复用系统调用的支持,如Linux中的epoll系统调用。如果操作系统的底层不支持IO多路复用,Reactor模式不会那么高效。
  • 在同一个Handler业务线程中,如果出现一个长时间的数据读写,就会影响这个反应器中其他通道的IO处理。例如,在大文件传输时,IO操作就会影响其他客户端的响应时间。对于这种操作,还需要进一步对Reactor模式进行改进。