NIO编程
金蝶云社区-艾贺521
艾贺521
52人赞赏了该文章 620次浏览 未经作者许可,禁止转载编辑于2019年01月14日 13:40:06
封面


想要深入的学习Netty,Nio编程是必须要掌握的内容,为了更好的学习Netty,这里看一下Nio相关的内容。

 

案例

开始之前,先看一个NioServer端的案例。

image.png

 

publicclassMultiplexerTimeServer implementsRunnable{

    privateSelectorselector;

    privateServerSocketChannelservChannel;

    privatevolatilebooleanstop;

    /**
     *初始化多路复用器、绑定监听端口
     *
     * @param port
     */
    publicMultiplexerTimeServer(intport) {
        try{
            // 创建多路复用器Selector、ServerSocketChannel
           selector = Selector.open();
           servChannel = ServerSocketChannel.open();
            // 配置为非阻塞模式
           servChannel.configureBlocking(false);
            // 绑定端口,将backlog设置为1024
           servChannel.socket().bind(newInetSocketAddress(port),1024);
            // 系统资源初始化成功后,把serverSocketChannel注册到Selector,监听SelectionKey.OP_ACCEPT操作位
           servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The time server is start in port : "+ port);
        } catch(IOExceptione) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    publicvoidstop() {
        this.stop= true;
    }

    /*
     * (non-Javadoc)
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    publicvoidrun() {
        while(!stop) {
            try{
                //循环体重遍历selector,超时时间为1s。
               selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKeykey = null;
                while(it.hasNext()) {
                   key = it.next();
                   it.remove();
                   try{
                       handleInput(key);
                   } catch(Exceptione) {
                       if(key != null) {
                            key.cancel();
                            if(key.channel() != null)
                                key.channel().close();
                       }
                   }
                }
            } catch(Throwablet) {
                t.printStackTrace();
            }
        }

        // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if(selector != null)
            try{
                selector.close();
            } catch(IOExceptione) {
                e.printStackTrace();
            }
    }

    privatevoidhandleInput(SelectionKeykey) throwsIOException{

        if(key.isValid()) {
            // 处理新接入的请求消息
            // 根据操作位判断网络类型。
            if(key.isAcceptable()) {
                // Accept the new connection
                ServerSocketChannelssc = (ServerSocketChannel) key.channel();
                SocketChannelsc = ssc.accept();
                sc.configureBlocking(false);
                // Add the new connection to the selector
                sc.register(selector, SelectionKey.OP_READ);
            }
            if(key.isReadable()) {
                // 读取客户端的消息            
                SocketChannelsc = (SocketChannel) key.channel();
                ByteBufferreadBuffer = ByteBuffer.allocate(1024);
                intreadBytes = sc.read(readBuffer);
                if(readBytes > 0) {
                   readBuffer.flip();
                    byte[] bytes = newbyte[readBuffer.remaining()];
                   readBuffer.get(bytes);
                   Stringbody =newString(bytes, "UTF-8");
                   System.out.println("The time server receive order : "
                            + body);
                   StringcurrentTime = "QUERY TIME ORDER"
                            .equalsIgnoreCase(body) ? newjava.util.Date(
                            System.currentTimeMillis()).toString()
                            : "BAD ORDER";
                   doWrite(sc, currentTime);
                } elseif(readBytes < 0) {
                   // 对端链路关闭
                   key.cancel();
                   sc.close();
                } else
                   ; // 读到0字节,忽略
            }
        }
    }

    // 发送消息异步发给客户端。
    privatevoiddoWrite(SocketChannelchannel, Stringresponse)
            throwsIOException{
        if(response != null&& response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBufferwriteBuffer = ByteBuffer.allocate(bytes.length);
           writeBuffer.put(bytes);
           writeBuffer.flip();
           channel.write(writeBuffer);
        }
    }
}

 

然后看NioClient的案例

image.png

 

publicclassTimeClientHandle implementsRunnable{

    privateStringhost;
    privateintport;

    privateSelectorselector;
    privateSocketChannelsocketChannel;

    privatevolatilebooleanstop;

    publicTimeClientHandle(Stringhost, intport) {
        this.host= host == null? "127.0.0.1": host;
        this.port= port;
        try{
            // 打开SocketChannel
           selector = Selector.open();
           socketChannel = SocketChannel.open();
            // 配置伪非阻塞模式
            socketChannel.configureBlocking(false);
        } catch(IOExceptione) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    publicvoidrun() {
        try{
            doConnect();
        } catch(IOExceptione) {
            e.printStackTrace();
            System.exit(1);
        }
        while(!stop) {
            try{
                // 轮询Key
               selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKeykey = null;
                while(it.hasNext()) {
                   key = it.next();
                   it.remove();
                   try{
                       handleInput(key);
                   } catch(Exceptione) {
                       if(key != null) {
                            key.cancel();
                            if(key.channel() != null)
                                key.channel().close();
                       }
                   }
                }
            } catch(Exceptione) {
                e.printStackTrace();
                System.exit(1);
            }
        }

        // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if(selector != null)
            try{
               selector.close();
            } catch(IOExceptione) {
                e.printStackTrace();
            }

    }

    privatevoidhandleInput(SelectionKeykey) throwsIOException{

        if(key.isValid()) {
            // 判断是否连接成功
            SocketChannelsc = (SocketChannel) key.channel();
            if(key.isConnectable()) {
                if(sc.finishConnect()) {
                   sc.register(selector,SelectionKey.OP_READ);
                   doWrite(sc);
                } else
                   System.exit(1);//连接失败,进程退出
            }
            if(key.isReadable()) {
                ByteBufferreadBuffer = ByteBuffer.allocate(1024);
                intreadBytes = sc.read(readBuffer);
                if(readBytes > 0) {
                   readBuffer.flip();
                   byte[] bytes = newbyte[readBuffer.remaining()];
                   readBuffer.get(bytes);
                   Stringbody =newString(bytes, "UTF-8");
                   System.out.println("Now is : "+ body);
                   this.stop= true;
                } elseif(readBytes < 0) {
                   // 对端链路关闭
                   key.cancel();
                   sc.close();
                } else
                   ; // 读到0字节,忽略
            }
        }

    }

    privatevoiddoConnect() throwsIOException{
        // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
        if(socketChannel.connect(newInetSocketAddress(host, port))) {
           socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        } else
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
    }

    privatevoiddoWrite(SocketChannelsc) throwsIOException{
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBufferwriteBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
       writeBuffer.flip();
        sc.write(writeBuffer);
        if(!writeBuffer.hasRemaining())
            System.out.println("Send order 2 server succeed.");
    }

}

 

运行结果

image.png

 

image.png

 

 

 

概念梳理

•         Buffer,它是包含一些要读入或要写出的数据,在Nio库中,所有的数据都是用缓冲区处理的,读数据时直接读到缓冲区中,写数据时写入到缓冲区中。缓冲区通常是一个数组,最常用的是ByteBuffer(它提供了用于操作byte的数组),还有一些其他的Buffer。可以看继承这个类的类

image.png

 

 

•         Channel,通道,可以通过它读取和写入数据,网络数据通过通过Channel读取和写入。通道和流的不同在于通道是双向的,而且通道可以同时用于读写。

image.png

 

 

•         Selector,多路复用器。它是Nio编程的基础,熟练的掌握Selector对于掌握Nio编程至关重要。Selector会不断的轮询注册在上面的Channel,如果某个Channel上面有新的操作(连接,读写,写入事件),这个Channel就处于就绪状态,会被Selector轮询处理,然后通过SelectoionKey获取Channel集合,进行后续的操作。一个Selector可以同时轮询多个Channel,并且没有最大连接数的限制。

 

 

小结

Nio编程中客户端和服务端都会使用Selector轮询Channel的事件,不同的是客户端使用的是SocketChannel,服务端使用的是ServerSocketChannel,服务端要监听端口,而客户端进行连接。

 

Nio编程是Netty中很重要的基础,希望大家能掌握。

 

 

注:

本文独家发布自金蝶云社区

 


赞 52