以下代码只兼容Java 7及以上版本,对于一些关键地方请看注释说明。
公共类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package
com.stevex.app.nio;
import
java.nio.ByteBuffer;
import
java.nio.CharBuffer;
import
java.nio.charset.CharacterCodingException;
import
java.nio.charset.Charset;
import
java.nio.charset.CharsetDecoder;
import
java.nio.charset.CharsetEncoder;
public
class
CharsetHelper {
private
static
final
String UTF_8 =
"UTF-8"
;
private
static
CharsetEncoder encoder = Charset.forName(UTF_8).newEncoder();
private
static
CharsetDecoder decoder = Charset.forName(UTF_8).newDecoder();
public
static
ByteBuffer encode(CharBuffer in)
throws
CharacterCodingException{
return
encoder.encode(in);
}
public
static
CharBuffer decode(ByteBuffer in)
throws
CharacterCodingException{
return
decoder.decode(in);
}
}
|
服务器代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
package
com.stevex.app.nio;
import
java.io.IOException;
import
java.net.InetSocketAddress;
import
java.nio.ByteBuffer;
import
java.nio.CharBuffer;
import
java.nio.channels.ClosedChannelException;
import
java.nio.channels.SelectionKey;
import
java.nio.channels.Selector;
import
java.nio.channels.ServerSocketChannel;
import
java.nio.channels.SocketChannel;
import
java.util.Iterator;
public
class
XiaoNa {
private
ByteBuffer readBuffer;
private
Selector selector;
public
static
void
main(String[] args){
XiaoNa xiaona =
new
XiaoNa();
xiaona.init();
xiaona.listen();
}
private
void
init(){
readBuffer = ByteBuffer.allocate(
1024
);
ServerSocketChannel servSocketChannel;
try
{
servSocketChannel = ServerSocketChannel.open();
servSocketChannel.configureBlocking(
false
);
//绑定端口
servSocketChannel.socket().bind(
new
InetSocketAddress(
8383
));
selector = Selector.open();
servSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
catch
(IOException e) {
e.printStackTrace();
}
}
private
void
listen() {
while
(
true
){
try
{
selector.select();
Iterator ite = selector.selectedKeys().iterator();
while
(ite.hasNext()){
SelectionKey key = (SelectionKey) ite.next();
ite.remove();
//确保不重复处理
handleKey(key);
}
}
catch
(Throwable t){
t.printStackTrace();
}
}
}
private
void
handleKey(SelectionKey key)
throws
IOException, ClosedChannelException {
SocketChannel channel =
null
;
try
{
if
(key.isAcceptable()){
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
channel = serverChannel.accept();
//接受连接请求
channel.configureBlocking(
false
);
channel.register(selector, SelectionKey.OP_READ);
}
else
if
(key.isReadable()){
channel = (SocketChannel) key.channel();
readBuffer.clear();
/*当客户端channel关闭后,会不断收到read事件,但没有消息,即read方法返回-1
* 所以这时服务器端也需要关闭channel,避免无限无效的处理*/
int count = channel.read(readBuffer);
if(count > 0){
//一定需要调用flip函数,否则读取错误数据
readBuffer.flip();
/*使用CharBuffer配合取出正确的数据
String question = new String(readBuffer.array());
可能会出错,因为前面readBuffer.clear();并未真正清理数据
只是重置缓冲区的position, limit, mark,
而readBuffer.array()会返回整个缓冲区的内容。
decode方法只取readBuffer的position到limit数据。
例如,上一次读取到缓冲区的是"where", clear后position为0,limit为 1024,
再次读取“bye"到缓冲区后,position为3,limit不变,
flip后position为0,limit为3,前三个字符被覆盖了,但"re"还存在缓冲区中,
所以 new String(readBuffer.array()) 返回 "byere",
而decode(readBuffer)返回"bye"。
*/
CharBuffer charBuffer = CharsetHelper.decode(readBuffer);
String question = charBuffer.toString();
String answer = getAnswer(question);
channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));
}
else
{
//这里关闭channel,因为客户端已经关闭channel或者异常了
channel.close();
}
}
}
catch
(Throwable t){
t.printStackTrace();
if
(channel !=
null
){
channel.close();
}
}
}
private
String getAnswer(String question){
String answer =
null
;
switch
(question){
case
"who"
:
answer =
"我是小娜\n"
;
break
;
case
"what"
:
answer =
"我是来帮你解闷的\n"
;
break
;
case
"where"
:
answer =
"我来自外太空\n"
;
break
;
case
"hi"
:
answer =
"hello\n"
;
break
;
case
"bye"
:
answer =
"88\n"
;
break
;
default
:
answer =
"请输入 who, 或者what, 或者where"
;
}
return
answer;
}
}
|
客户端代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
package
com.stevex.app.nio;
import
java.io.IOException;
import
java.net.InetSocketAddress;
import
java.nio.ByteBuffer;
import
java.nio.CharBuffer;
import
java.nio.channels.SelectionKey;
import
java.nio.channels.Selector;
import
java.nio.channels.SocketChannel;
import
java.util.Iterator;
import
java.util.Random;
import
java.util.concurrent.ArrayBlockingQueue;
import
java.util.concurrent.BlockingQueue;
import
java.util.concurrent.TimeUnit;
public
class
Client
implements
Runnable{
private
BlockingQueue<String> words;
private
Random random;
public
static
void
main(String[] args) {
//种多个线程发起Socket客户端连接请求
for
(
int
i=
0
; i<
10
; i++){
Client c =
new
Client();
c.init();
new
Thread(c).start();
}
}
@Override
public
void
run() {
SocketChannel channel =
null
;
Selector selector =
null
;
try
{
channel = SocketChannel.open();
channel.configureBlocking(
false
);
//请求连接
channel.connect(
new
InetSocketAddress(
"localhost"
,
8383
));
selector = Selector.open();
channel.register(selector, SelectionKey.OP_CONNECT);
boolean
isOver =
false
;
while
(! isOver){
selector.select();
Iterator ite = selector.selectedKeys().iterator();
while
(ite.hasNext()){
SelectionKey key = (SelectionKey) ite.next();
ite.remove();
if
(key.isConnectable()){
if
(channel.isConnectionPending()){
if
(channel.finishConnect()){
//只有当连接成功后才能注册OP_READ事件
key.interestOps(SelectionKey.OP_READ);
channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));
sleep();
}
else
{
key.cancel();
}
}
}
else
if
(key.isReadable()){
ByteBuffer byteBuffer = ByteBuffer.allocate(
128
);
channel.read(byteBuffer);
byteBuffer.flip();
CharBuffer charBuffer = CharsetHelper.decode(byteBuffer);
String answer = charBuffer.toString();
System.out.println(Thread.currentThread().getId() +
"---"
+ answer);
String word = getWord();
if
(word !=
null
){
channel.write(CharsetHelper.encode(CharBuffer.wrap(word)));
}
else
{
isOver =
true
;
}
sleep();
}
}
}
}
catch
(IOException e) {
e.printStackTrace();
}
finally
{
if
(channel !=
null
){
try
{
channel.close();
}
catch
(IOException e) {
e.printStackTrace();
}
}
if
(selector !=
null
){
try
{
selector.close();
}
catch
(IOException e) {
e.printStackTrace();
}
}
}
}
private
void
init() {
words =
new
ArrayBlockingQueue<String>(
5
);
try
{
words.put(
"hi"
);
words.put(
"who"
);
words.put(
"what"
);
words.put(
"where"
);
words.put(
"bye"
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
random =
new
Random();
}
private
String getWord(){
return
words.poll();
}
private
void
sleep() {
try
{
TimeUnit.SECONDS.sleep(random.nextInt(
3
));
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
private
void
sleep(
long
l) {
try
{
TimeUnit.SECONDS.sleep(l);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
|
本文转自sarchitect 51CTO博客,原文链接:http://blog.51cto.com/stevex/1581376
,如需转载请自行联系原作者