想要深入的学习Netty,Nio编程是必须要掌握的内容,为了更好的学习Netty,这里看一下Nio相关的内容。
案例
开始之前,先看一个NioServer端的案例。
publicclassMultiplexerTimeServer implementsRunnable{
privateSelectorselector;
privateServerSocketChannelservChannel;
privatevolatilebooleanstop;
/**
*初始化多路复用器、绑定监听端口
*
* @param port
*/
publicMultiplexerTimeServer(intport) {
try{
// 创建多路复用器Selector、ServerSocketChannel
selector = Selector.open();
servChannel = ServerSocketChannel.open();
// 配置为非阻塞模式
servChannel.configureBlocking(false);
// 绑定端口,将backlog设置为1024
servChannel.socket().bind(newInetSocketAddress(port),1024);
// 系统资源初始化成功后,把serverSocketChannel注册到Selector,监听SelectionKey.OP_ACCEPT操作位
servChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port : "+ port);
} catch(IOExceptione) {
e.printStackTrace();
System.exit(1);
}
}
publicvoidstop() {
this.stop= true;
}
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
@Override
publicvoidrun() {
while(!stop) {
try{
//循环体重遍历selector,超时时间为1s。
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKeykey = null;
while(it.hasNext()) {
key = it.next();
it.remove();
try{
handleInput(key);
} catch(Exceptione) {
if(key != null) {
key.cancel();
if(key.channel() != null)
key.channel().close();
}
}
}
} catch(Throwablet) {
t.printStackTrace();
}
}
// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if(selector != null)
try{
selector.close();
} catch(IOExceptione) {
e.printStackTrace();
}
}
privatevoidhandleInput(SelectionKeykey) throwsIOException{
if(key.isValid()) {
// 处理新接入的请求消息
// 根据操作位判断网络类型。
if(key.isAcceptable()) {
// Accept the new connection
ServerSocketChannelssc = (ServerSocketChannel) key.channel();
SocketChannelsc = ssc.accept();
sc.configureBlocking(false);
// Add the new connection to the selector
sc.register(selector, SelectionKey.OP_READ);
}
if(key.isReadable()) {
// 读取客户端的消息
SocketChannelsc = (SocketChannel) key.channel();
ByteBufferreadBuffer = ByteBuffer.allocate(1024);
intreadBytes = sc.read(readBuffer);
if(readBytes > 0) {
readBuffer.flip();
byte[] bytes = newbyte[readBuffer.remaining()];
readBuffer.get(bytes);
Stringbody =newString(bytes, "UTF-8");
System.out.println("The time server receive order : "
+ body);
StringcurrentTime = "QUERY TIME ORDER"
.equalsIgnoreCase(body) ? newjava.util.Date(
System.currentTimeMillis()).toString()
: "BAD ORDER";
doWrite(sc, currentTime);
} elseif(readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
; // 读到0字节,忽略
}
}
}
// 发送消息异步发给客户端。
privatevoiddoWrite(SocketChannelchannel, Stringresponse)
throwsIOException{
if(response != null&& response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBufferwriteBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}
然后看NioClient的案例
publicclassTimeClientHandle implementsRunnable{
privateStringhost;
privateintport;
privateSelectorselector;
privateSocketChannelsocketChannel;
privatevolatilebooleanstop;
publicTimeClientHandle(Stringhost, intport) {
this.host= host == null? "127.0.0.1": host;
this.port= port;
try{
// 打开SocketChannel
selector = Selector.open();
socketChannel = SocketChannel.open();
// 配置伪非阻塞模式
socketChannel.configureBlocking(false);
} catch(IOExceptione) {
e.printStackTrace();
System.exit(1);
}
}
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
@Override
publicvoidrun() {
try{
doConnect();
} catch(IOExceptione) {
e.printStackTrace();
System.exit(1);
}
while(!stop) {
try{
// 轮询Key
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKeykey = null;
while(it.hasNext()) {
key = it.next();
it.remove();
try{
handleInput(key);
} catch(Exceptione) {
if(key != null) {
key.cancel();
if(key.channel() != null)
key.channel().close();
}
}
}
} catch(Exceptione) {
e.printStackTrace();
System.exit(1);
}
}
// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if(selector != null)
try{
selector.close();
} catch(IOExceptione) {
e.printStackTrace();
}
}
privatevoidhandleInput(SelectionKeykey) throwsIOException{
if(key.isValid()) {
// 判断是否连接成功
SocketChannelsc = (SocketChannel) key.channel();
if(key.isConnectable()) {
if(sc.finishConnect()) {
sc.register(selector,SelectionKey.OP_READ);
doWrite(sc);
} else
System.exit(1);//连接失败,进程退出
}
if(key.isReadable()) {
ByteBufferreadBuffer = ByteBuffer.allocate(1024);
intreadBytes = sc.read(readBuffer);
if(readBytes > 0) {
readBuffer.flip();
byte[] bytes = newbyte[readBuffer.remaining()];
readBuffer.get(bytes);
Stringbody =newString(bytes, "UTF-8");
System.out.println("Now is : "+ body);
this.stop= true;
} elseif(readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
; // 读到0字节,忽略
}
}
}
privatevoiddoConnect() throwsIOException{
// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
if(socketChannel.connect(newInetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
privatevoiddoWrite(SocketChannelsc) throwsIOException{
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBufferwriteBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if(!writeBuffer.hasRemaining())
System.out.println("Send order 2 server succeed.");
}
}
运行结果
概念梳理
• Buffer,它是包含一些要读入或要写出的数据,在Nio库中,所有的数据都是用缓冲区处理的,读数据时直接读到缓冲区中,写数据时写入到缓冲区中。缓冲区通常是一个数组,最常用的是ByteBuffer(它提供了用于操作byte的数组),还有一些其他的Buffer。可以看继承这个类的类
• Channel,通道,可以通过它读取和写入数据,网络数据通过通过Channel读取和写入。通道和流的不同在于通道是双向的,而且通道可以同时用于读写。
• Selector,多路复用器。它是Nio编程的基础,熟练的掌握Selector对于掌握Nio编程至关重要。Selector会不断的轮询注册在上面的Channel,如果某个Channel上面有新的操作(连接,读写,写入事件),这个Channel就处于就绪状态,会被Selector轮询处理,然后通过SelectoionKey获取Channel集合,进行后续的操作。一个Selector可以同时轮询多个Channel,并且没有最大连接数的限制。
小结
Nio编程中客户端和服务端都会使用Selector轮询Channel的事件,不同的是客户端使用的是SocketChannel,服务端使用的是ServerSocketChannel,服务端要监听端口,而客户端进行连接。
Nio编程是Netty中很重要的基础,希望大家能掌握。
注:
本文独家发布自金蝶云社区