多线程OIO
ServerSocket类用于建立套接字。所有标准服务都不使用端口号8189。
1
2
3
4
5
6
7
8
9
10
11
12
|
try {
//建立一个负责监控端口8189的服务器
ServerSocket s = new ServerSocket(8189);
//返回一个已建立连接的Socket对象
Socket incoming = s.accept();
InputStream inStream = incoming.getInputStream();
OutputStream outStream = incoming.getOutputStream();
//关闭连接的套接字
incoming.close();
} catch (IOException e) {
e.printStackTrace();
}
|
传统的网络服务器程序:
1
2
3
4
5
|
while(true)
{
socket = accept(); //阻塞,接收连接
handle(socket); //读取数据、业务处理、写入结果
}
|
Connection Per Thread(一个线程处理一个连接):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
class ConnectionPerThread implements Runnable {
public void run() {
try {
ServerSocket serverSocket =
new ServerSocket(NioDemoConfig.SOCKET_SERVER_PORT);
while (!Thread.interrupted()) {
Socket socket = serverSocket.accept();
Handler handler = new Handler(socket);
//创建新线程来handle
//或者,使用线程池来处理
new Thread(handler).start();
}
} catch (IOException ex) { /* 处理异常 */ }
}
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) {
socket = s;
}
public void run() {
while (true) {
try {
byte[] input = new byte[NioDemoConfig.SERVER_BUFFER_SIZE];
/* 读取数据 */
socket.getInputStream().read(input);
/* 处理业务逻辑,获取处理结果 */
byte[] output = null;
/* 写入结果 */
socket.getOutputStream().write(output);
} catch (IOException ex) { /*处理异常*/ }
}
}
}
}
|
对于每一个新的网络连接都分配给一个线程。每个线程都独自处理自己负责的socket连接的输入和输出。早期版本的Tomcat服务器就是这样实现的。
Connection Per Thread模式的缺点:对应于大量的连接,需要耗费大量的线程资源,对线程资源要求太高。
单线程Reactor模式
在Reactor模式中有Reactor和Handler两个组件:
1、Reactor:负责查询IO事件,当检测到一个IO事件时将其发送给相应的Handler处理器去处理。这里的IO事件就是NIO中选择器查询出来的通道IO事件
2、Handler:与IO事件(或者SelectionKey选择键)绑定,负责IO事件的处理,完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写到通道等。
单线程Reactor模式:Reactor和Handlers处于一个线程中执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
//反应器
class EchoServerReactor implements Runnable {
Selector selector;
ServerSocketChannel serverSocket;
EchoServerReactor() throws IOException {
//Reactor初始化
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
serverSocket.socket().bind(address);
Logger.info("服务端已经开始监听:" + address);
//非阻塞
serverSocket.configureBlocking(false);
//分步处理,第一步,接收accept事件
SelectionKey sk =
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//attach callback object, AcceptorHandler
sk.attach(new AcceptorHandler());
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
//Reactor负责dispatch收到的事件
SelectionKey sk = it.next();
dispatch(sk);
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKey sk) {
Runnable handler = (Runnable) sk.attachment();
//调用之前attach绑定到选择键的handler处理器对象
if (handler != null) {
handler.run();
}
}
// Handler:新连接处理器
class AcceptorHandler implements Runnable {
public void run() {
try {
SocketChannel channel = serverSocket.accept();
Logger.info("接收到一个连接");
if (channel != null)
new EchoHandler(selector, channel);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new Thread(new EchoServerReactor()).start();
}
}
|
1
2
3
4
5
6
7
|
//分步处理,第一步,接收accept事件
//为ServerSocket注册新连接accept接收事件
SelectionKey sk =
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//attach callback object, AcceptorHandler
//将AcceptHandler新连接处理器作为附件,绑定到SelectionKey选择键
sk.attach(new AcceptorHandler());
|
当新连接事件发生后,取出之前附加到SelectionKey中的Handler业务处理器进行socket的各种IO处理。
1
2
3
4
5
6
7
|
void dispatch(SelectionKey sk) {
Runnable handler = (Runnable) sk.attachment();
//调用之前attach绑定到选择键的handler处理器对象
if (handler != null) {
handler.run();
}
}
|
AcceptorHandler处理器是一个内部类。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// Handler:新连接处理器
class AcceptorHandler implements Runnable {
public void run() {
try {
SocketChannel channel = serverSocket.accept();
Logger.info("接收到一个连接");
if (channel != null)
new EchoHandler(selector, channel);
} catch (IOException e) {
e.printStackTrace();
}
}
}
|
AcceptorHandler处理器的两大职责:
-
完成新连接的接收工作
-
为新连接创建一个负责数据传输的Handler
单线程Rector模式的EchoServer
EchoServer的功能很简单:读取客户端的输入并回显到客户端,即回显服务器。
基于Reactor模式来实现,设计三个重要的类:
1、设计一个反应器类:EchoServerReactor类。
2、设计两个处理器类:AcceptorHandler新连接处理器、EchoHandler回显处理器。
EchoHandler回显处理器:传输处理器,主要完成客户端的内容读取和回显。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
class EchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
EchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
//仅仅取得选择键,后设置感兴趣的IO事件
sk = channel.register(selector, 0);
//将Handler作为选择键的附件
sk.attach(this);
//第二步,注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
public void run() {
try {
if (state == SENDING) {
//写入通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer切换成写模式
byteBuffer.clear();
//写完后,注册read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) {
//从通道读
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer切换成读模式
byteBuffer.flip();
//读完后,注册write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
//读完后,进入发送的状态
state = SENDING;
}
//处理结束了, 这里不能关闭select key,需要重复使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
sk.cancel();
try {
channel.finishConnect();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
|
启动回显服务器:EchoServerReactor类中的main()方法。
启动客户端:EchoClient类中的main()方法。EchoClient用于数据的发送。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
public class EchoClient {
public void start() throws IOException {
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
// 1、获取通道(channel)
SocketChannel socketChannel = SocketChannel.open(address);
Logger.info("客户端连接成功");
// 2、切换成非阻塞模式
socketChannel.configureBlocking(false);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
//不断的自旋、等待连接完成,或者做一些其他的事情
while (!socketChannel.finishConnect()) {
}
Logger.tcfo("客户端启动成功!");
//启动接受线程
Processor processor = new Processor(socketChannel);
Commander commander = new Commander(processor);
new Thread(commander).start();
new Thread(processor).start();
}
static class Commander implements Runnable {
Processor processor;
Commander(Processor processor) throws IOException {
//Reactor初始化
this.processor = processor;
}
public void run() {
while (!Thread.interrupted()) {
ByteBuffer buffer = processor.getSendBuffer();
Scanner scanner = new Scanner(System.in);
while (processor.hasData.get()) {
Logger.tcfo("还有消息没有发送完,请稍等");
ThreadUtil.sleepMilliSeconds(1000);
}
Logger.tcfo("请输入发送内容:");
if (scanner.hasNext()) {
String next = scanner.next();
buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
processor.hasData.set(true);
}
}
}
}
@Data
static class Processor implements Runnable {
ByteBuffer sendBuffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
ByteBuffer readBuffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
protected AtomicBoolean hasData = new AtomicBoolean(false);
final Selector selector;
final SocketChannel channel;
Processor(SocketChannel channel) throws IOException {
//Reactor初始化
selector = Selector.open();
this.channel = channel;
channel.register(selector,
SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
if (sk.isWritable()) {
if (hasData.get()) {
SocketChannel socketChannel = (SocketChannel) sk.channel();
sendBuffer.flip();
// 操作三:发送数据
socketChannel.write(sendBuffer);
sendBuffer.clear();
hasData.set(false);
}
}
if (sk.isReadable()) {
// 若选择键的IO事件是“可读”事件,读取数据
SocketChannel socketChannel = (SocketChannel) sk.channel();
int length = 0;
while ((length = socketChannel.read(readBuffer)) > 0) {
readBuffer.flip();
Logger.info("server echo:" + new String(readBuffer.array(), 0, length));
readBuffer.clear();
}
}
//处理结束了, 这里不能关闭select key,需要重复使用
//selectionKey.cancel();
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new EchoClient().start();
}
}
|
运行结果:
单线程Reactor模式的缺点
在单线程Reactor模式中,Reactor和Handler都在同一条线程中执行。因此
- 当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。
- 单线程Reactor模式模型不能充分利用多核资源。
总之,在高性能服务器应用场景中,单线程Reactor模式实际使用的很少。
多线程Reactor模式
多线程Reactor的演进分为两个方面:
1、升级Handler。既要使用多线程,又要尽可能高效率,则可以考虑使用线程池。
2、升级Reactor。可以考虑引入多个Selector(选择器),提升选择大量通道的能力。
总体来说,多线程版本的Reactor模式大致如下:
1、将负责数据传输处理的IOHandler处理器的执行放入独立的线程池中。这样,业务处理线程与负责新连接监听的反应器线程就能相互隔离,避免服务器的连接监听受到阻塞。
2、如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程;同时,引入多个选择器,并且为每一个SubReactor引入一个线程,一个线程负责一个选择器的事件轮询。这样充分释放了系统资源的能力,也大大提升了反应器管理大量连接或者监听大量传输通道的能力。
MultiThreadEchoServerReactor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
//多线程版本反应器
class MultiThreadEchoServerReactor {
ServerSocketChannel serverSocket;
AtomicInteger next = new AtomicInteger(0);
Selector bossSelector = null;
Reactor bossReactor = null;
//selectors集合,引入多个selector选择器
Selector[] workSelectors = new Selector[2];
//引入多个子反应器
Reactor[] workReactors = null;
MultiThreadEchoServerReactor() throws IOException {
bossSelector = Selector.open();
//初始化多个selector选择器
workSelectors[0] = Selector.open();
workSelectors[1] = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
serverSocket.socket().bind(address);
//非阻塞
serverSocket.configureBlocking(false);
//第一个selector,负责监控新连接事件
SelectionKey sk =
serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);
//附加新连接处理handler处理器到SelectionKey(选择键)
sk.attach(new AcceptorHandler());
//处理新连接的反应器
bossReactor = new Reactor(bossSelector);
//第一个子反应器,一子反应器负责一个选择器
Reactor subReactor1 = new Reactor(workSelectors[0]);
//第二个子反应器,一子反应器负责一个选择器
Reactor subReactor2 = new Reactor(workSelectors[1]);
workReactors = new Reactor[]{subReactor1, subReactor2};
}
private void startService() {
new Thread(bossReactor).start();
// 一子反应器对应一条线程
new Thread(workReactors[0]).start();
new Thread(workReactors[1]).start();
}
//反应器
class Reactor implements Runnable {
//每条线程负责一个选择器的查询
final Selector selector;
public Reactor(Selector selector) {
this.selector = selector;
}
public void run() {
try {
while (!Thread.interrupted()) {
//单位为毫秒
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (null == selectedKeys || selectedKeys.size() == 0) {
continue;
}
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
//Reactor负责dispatch收到的事件
SelectionKey sk = it.next();
dispatch(sk);
}
selectedKeys.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKey sk) {
Runnable handler = (Runnable) sk.attachment();
//调用之前attach绑定到选择键的handler处理器对象
if (handler != null) {
handler.run();
}
}
}
// Handler:新连接处理器
class AcceptorHandler implements Runnable {
public void run() {
try {
SocketChannel channel = serverSocket.accept();
Logger.info("接收到一个新的连接");
if (channel != null) {
int index = next.get();
Logger.info("选择器的编号:" + index);
Selector selector = workSelectors[index];
new MultiThreadEchoHandler(selector, channel);
}
} catch (IOException e) {
e.printStackTrace();
}
if (next.incrementAndGet() == workSelectors.length) {
next.set(0);
}
}
}
public static void main(String[] args) throws IOException {
MultiThreadEchoServerReactor server =
new MultiThreadEchoServerReactor();
server.startService();
}
}
|
两个选择器:
- 第一个选择器负责查询和分发新连接事件
- 第二个选择器负责查询和分发IO传输事件
sdf
有两条事件轮询线程
服务端的监听通道注册到第一个选择器,而所有的Socket传输通道都注册到第二个选择器,从而实现了新连接监听和IO读写事件监听的线程分离。
MultiThreadEchoHandler
MultiThreadEchoHandler回显处理器,引入了一个线程池(ThreadPool),使得数据传输
和业务处理
的代码执行在独立的线程池中,彻底地做到IO处理以及业务处理线程和反应器IO事件轮询线程的完全隔离。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
class MultiThreadEchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
//引入线程池
static ExecutorService pool = Executors.newFixedThreadPool(4);
MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
channel.configureBlocking(false);
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
//唤醒选择,防止register时 boss线程被阻塞,netty 处理方式比较优雅,会在同一个线程注册事件,避免阻塞boss
// selector.wakeup();
//仅仅取得选择键,后设置感兴趣的IO事件
sk = channel.register(selector, 0);
//将本Handler作为sk选择键的附件,方便事件dispatch
sk.attach(this);
//向sk选择键注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//唤醒选择,是的OP_READ生效
selector.wakeup();
Logger.info("新的连接 注册完成");
}
public void run() {
//异步任务,在独立的线程池中执行
pool.execute(new AsyncTask());
}
//异步任务,不在Reactor线程中执行
public synchronized void asyncRun() {
try {
if (state == SENDING) {
//写入通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer切换成写模式
byteBuffer.clear();
//写完后,注册read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) {
//从通道读
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer切换成读模式
byteBuffer.flip();
//读完后,注册write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
//读完后,进入发送的状态
state = SENDING;
}
//处理结束了, 这里不能关闭select key,需要重复使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
//异步任务的内部类
class AsyncTask implements Runnable {
public void run() {
MultiThreadEchoHandler.this.asyncRun();
}
}
}
|
IO操作和业务处理被提交到线程池中异步执行,为了避免发送和读取的状态混乱,需要进行线程安全处理,这里在asyncRun()方法的前面加上synchronized同步修饰符。
运行结果与单线程版本的EchoServer运行输出是一样的。
Reactor模式的优缺点
和生产者消费者模式对比
同:
在一定程度上,Reactor模式有点类似生产者消费者模式。
在生产者消费者模式中,一个或多个生产者将事件加入一个队列中,一个或多个消费者主动从这个队列中拉取(Pull)事件来处理。
异:
Reactor模式是基于查询的,没有专门的队列去缓冲存储IO事件,查询到IO事件之后,反应器会根据不同IO选择键(事件)将其分发给对应的Handler来处理。
和观察者模式对比
同:
在Reactor模式中,当查询到IO事件后,服务处理程序使用单路/多路分发(Dispatch)策略,同步分发这些IO事件。
观察者模式(Observer Pattern)也被称作发布/订阅模式,它定义了一种依赖关系,让多个观察者同时监听某一个主题(Topic)。这个主题对象在状态发生变化时会通知所有观察者,它们能够执行相应的处理。
异:
在Reactor模式中,Handler实例和IO事件(选择键)的订阅关系基本上是一个事件绑定到一个Handler,每一个IO事件(选择键)被查询后,反应器会将事件分发给所绑定的Handler,也就是一个事件只能被一个Handler处理;
在观察者模式中,同一时刻、同一主题可以被订阅过的多个观察者处理。
优缺点
作为高性能的IO模式,Reactor模式的优点:
- 响应快,虽然同一反应器线程本身是同步的,但是不会被单个连接的IO操作所阻塞。
- 编程相对简单,最大限度避免了复杂的多线程同步,也避免了多线程各个进程之间切换的开销。
- 可扩展,可以方便地通过增加反应器线程的个数来充分利用CPU资源。
Reactor模式的缺点:
- Reactor模式增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
- Reactor模式依赖于操作系统底层的IO多路复用系统调用的支持,如Linux中的epoll系统调用。如果操作系统的底层不支持IO多路复用,Reactor模式不会那么高效。
- 在同一个Handler业务线程中,如果出现一个长时间的数据读写,就会影响这个反应器中其他通道的IO处理。例如,在大文件传输时,IO操作就会影响其他客户端的响应时间。对于这种操作,还需要进一步对Reactor模式进行改进。
Author
Anjana
LastMod
2022-03-28
License
原创文章,如需转载请注明作者和出处。谢谢!