博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
nio原理/netty简单应用
阅读量:6804 次
发布时间:2019-06-26

本文共 16760 字,大约阅读时间需要 55 分钟。

一、非阻塞IO模式原理

与阻塞模式对应的另一种模式叫非阻塞IO模式,在整个通信过程中读和写操作不会阻塞,当前处理线程不存在阻塞情况。从A机器到B机器它的通信过程是:A机器一条线程将通道设置为写事件后往下执行,而另外一条线程遍历到此通道有字节要写并往socket写数据,B机器一条线程遍历到此通道有字节要读,交给另外一条线程对socket读数据,处理完又把通道设置为写事件,遍历线程遍历到此通道有字节要写,又往socket写数据传往A机器,不断往下循环此操作直到完成通信。这个过程每台机器都有两类主要线程,一类是负责逻辑处理且将通道改为可写或可读事件的线程,另外一类是专门用于遍历通道并负责socket读写的线程,这种方式就是非阻塞IO模式。

在阻塞IO模式中,存在一个服务端套接字ServerSocket用于接收客户端连进来的Socket,而不管是阻塞还是非阻塞IO最终都需要获取socket才能进行读写操作,与阻塞模式对应,非阻塞模式用于接收客户端socket的对象是ServerSocketChannel,另外,阻塞模式直接使用Socket对象进行读写操作,而非阻塞模式则使用SocketChannel对象进行读写操作,但SocketChannel本质上最终也是通过Socket读取与写入,只是读取或写入时引入了缓冲区概念。最后,还有一个很重要的对象是选择器Selector,它提供对所有channel各种感兴趣事件的筛选功能,即哪些通道需要怎样的处理通过它选择出来的。

往下说说非阻塞模式实现的原理,如下图,ServerSocketChannel调用open()方法初始化封装在里面的socket服务并将ServerSocketChannel以OP_ACCEPT事件注册到Selector中,而操作系统则创建socket底层数据结构并监听客户端socket连接,对于客户端连接操作系统会统一放到一个队列中进行维护。接着是很重要的应用层轮询操作,不断执行Selector检索出感兴趣的事件,假如刚好有三个客户端socket连进来,Selector选择出三个OP_ACCEPT事件,调用ServerSocketChannel.accept()接收三个客户端通道SocketChannel对象,再将这三个客户端通道以OP_READ、OP_WRITE注册到Selector中以便后面进行读写操作,往下如果Selector遍历出OP_READ或OP_WRITE事件则可以对对应的channel进行读写操作了。

 

Selector在其中扮演最重要的角色,看看它是如何完成感兴趣的事件的筛选的。如上图,中间Selector便是它的大体结构,维护了registeredKeys、selectedKeys、cancelledKeys三个集合,还有一张channel与Key对应关系的表,而Key则包含了感兴趣事件集interestOps和已准备好的事件集readyOps。其中registeredKeys存放注册到Selector的所有key,而selectedKeys即是被选中的key,它是检测到registeredKeys中key感兴趣的事件发生后存放key的地方,cancelledKeys则是已经调用了cancel()方法待反注册的key。当应用层中Selector不断调用select()方法时,会先根据cancelledKeys去删除registeredKeys和selectedKeys对应的key以至取消对应的key,然后间接调用操作系统去做操作系统级别的select,一旦有registeredKeys感兴趣的事件则将对应事件的key添加到selectedKeys中,如selectedKeys已存在key了则将事件添加到key中的已准备好的事件集readyOps中。经过此番操作,当应用层调用Selector的selectedKeys()则取到被选中的key集,进而可以获取到感兴趣事件对应的channel,根据事件对channel进行操作。

    理解了非阻塞IO模式的原理有助于在实际场景中对网络IO的模式选型,一般在同时需要处理多个连接的高并发场景中会使用非阻塞NIO模式,它通过一个或少量线程去维护连接,而把具体的读写和逻辑处理交由其他线程处理,大大提高了机器的使用率,压榨机器CPU。而如果使用阻塞IO模式则可能线程都阻塞在IO而导致机器使用率较低。

二、java NIO服务端和客户端代码实现

为了更好地理解java NIO,下面贴出服务端和客户端的简单代码实现。

服务端:

1 package cn.nio;  2   3 import java.io.IOException;  4 import java.net.InetSocketAddress;  5 import java.nio.ByteBuffer;  6 import java.nio.channels.SelectionKey;  7 import java.nio.channels.Selector;  8 import java.nio.channels.ServerSocketChannel;  9 import java.nio.channels.SocketChannel; 10 import java.util.Iterator; 11  12 /** 13  * NIO服务端 14  * @author 小路 15  */ 16 public class NIOServer { 17     //通道管理器 18     private Selector selector; 19  20     /** 21      * 获得一个ServerSocket通道,并对该通道做一些初始化的工作 22      * @param port  绑定的端口号 23      * @throws IOException 24      */ 25     public void initServer(int port) throws IOException { 26         // 获得一个ServerSocket通道 27         ServerSocketChannel serverChannel = ServerSocketChannel.open(); 28         // 设置通道为非阻塞 29         serverChannel.configureBlocking(false); 30         // 将该通道对应的ServerSocket绑定到port端口 31         serverChannel.socket().bind(new InetSocketAddress(port)); 32         // 获得一个通道管理器 33         this.selector = Selector.open(); 34         //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后, 35         //当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。 36         serverChannel.register(selector, SelectionKey.OP_ACCEPT); 37     } 38  39     /** 40      * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理 41      * @throws IOException 42      */ 43     @SuppressWarnings("unchecked") 44     public void listen() throws IOException { 45         System.out.println("服务端启动成功!"); 46         // 轮询访问selector 47         while (true) { 48             //当注册的事件到达时,方法返回;否则,该方法会一直阻塞 49             selector.select(); 50             // 获得selector中选中的项的迭代器,选中的项为注册的事件 51             Iterator ite = this.selector.selectedKeys().iterator(); 52             while (ite.hasNext()) { 53                 SelectionKey key = (SelectionKey) ite.next(); 54                 // 删除已选的key,以防重复处理 55                 ite.remove(); 56                 // 客户端请求连接事件 57                 if (key.isAcceptable()) { 58                     ServerSocketChannel server = (ServerSocketChannel) key 59                             .channel(); 60                     // 获得和客户端连接的通道 61                     SocketChannel channel = server.accept(); 62                     // 设置成非阻塞 63                     channel.configureBlocking(false); 64  65                     //在这里可以给客户端发送信息哦 66                     channel.write(ByteBuffer.wrap(new String("向客户端发送了一条信息").getBytes())); 67                     //在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。 68                     channel.register(this.selector, SelectionKey.OP_READ); 69                      70                     // 获得了可读的事件 71                 } else if (key.isReadable()) { 72                         read(key); 73                 } 74  75             } 76  77         } 78     } 79     /** 80      * 处理读取客户端发来的信息 的事件 81      * @param key 82      * @throws IOException  83      */ 84     public void read(SelectionKey key) throws IOException{ 85         // 服务器可读取消息:得到事件发生的Socket通道 86         SocketChannel channel = (SocketChannel) key.channel(); 87         // 创建读取的缓冲区 88         ByteBuffer buffer = ByteBuffer.allocate(10); 89         channel.read(buffer); 90         byte[] data = buffer.array(); 91         String msg = new String(data).trim(); 92         System.out.println("服务端收到信息:"+msg); 93         ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes()); 94         channel.write(outBuffer);// 将消息回送给客户端 95     } 96      97     /** 98      * 启动服务端测试 99      * @throws IOException 100      */101     public static void main(String[] args) throws IOException {102         NIOServer server = new NIOServer();103         server.initServer(8000);104         server.listen();105     }106 107 }

客户端:

1 package cn.nio;  2   3 import java.io.IOException;  4 import java.net.InetSocketAddress;  5 import java.nio.ByteBuffer;  6 import java.nio.channels.SelectionKey;  7 import java.nio.channels.Selector;  8 import java.nio.channels.SocketChannel;  9 import java.util.Iterator; 10  11 /** 12  * NIO客户端 13  * @author 小路 14  */ 15 public class NIOClient { 16     //通道管理器 17     private Selector selector; 18  19     /** 20      * 获得一个Socket通道,并对该通道做一些初始化的工作 21      * @param ip 连接的服务器的ip 22      * @param port  连接的服务器的端口号          23      * @throws IOException 24      */ 25     public void initClient(String ip,int port) throws IOException { 26         // 获得一个Socket通道 27         SocketChannel channel = SocketChannel.open(); 28         // 设置通道为非阻塞 29         channel.configureBlocking(false); 30         // 获得一个通道管理器 31         this.selector = Selector.open(); 32          33         // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调 34         //用channel.finishConnect();才能完成连接 35         channel.connect(new InetSocketAddress(ip,port)); 36         //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。 37         channel.register(selector, SelectionKey.OP_CONNECT); 38     } 39  40     /** 41      * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理 42      * @throws IOException 43      */ 44     @SuppressWarnings("unchecked") 45     public void listen() throws IOException { 46         // 轮询访问selector 47         while (true) { 48             selector.select(); 49             // 获得selector中选中的项的迭代器 50             Iterator ite = this.selector.selectedKeys().iterator(); 51             while (ite.hasNext()) { 52                 SelectionKey key = (SelectionKey) ite.next(); 53                 // 删除已选的key,以防重复处理 54                 ite.remove(); 55                 // 连接事件发生 56                 if (key.isConnectable()) { 57                     SocketChannel channel = (SocketChannel) key 58                             .channel(); 59                     // 如果正在连接,则完成连接 60                     if(channel.isConnectionPending()){ 61                         channel.finishConnect(); 62                          63                     } 64                     // 设置成非阻塞 65                     channel.configureBlocking(false); 66  67                     //在这里可以给服务端发送信息哦 68                     channel.write(ByteBuffer.wrap(new String("向服务端发送了一条信息").getBytes())); 69                     //在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。 70                     channel.register(this.selector, SelectionKey.OP_READ); 71                      72                     // 获得了可读的事件 73                 } else if (key.isReadable()) { 74                         read(key); 75                 } 76  77             } 78  79         } 80     } 81     /** 82      * 处理读取服务端发来的信息 的事件 83      * @param key 84      * @throws IOException  85      */ 86     public void read(SelectionKey key) throws IOException{ 87         //和服务端的read方法一样 88     } 89      90      91     /** 92      * 启动客户端测试 93      * @throws IOException  94      */ 95     public static void main(String[] args) throws IOException { 96         NIOClient client = new NIOClient(); 97         client.initClient("localhost",8000); 98         client.listen(); 99     }100 101 }

三、Netty应用

netty框架对NIO进行了封装,下面是一个简单的netty应用例子的代码。

服务端(netty server):

一个NettyServer程序主要由两部分组成:
  • BootsTrapping:配置服务器端基本信息
  • ServerHandler:真正的业务逻辑处理

BootsTrapping的过程:

     1. 创建一个ServerBootstrap实例

     2. 创建一个EventLoopGroup来处理各种事件,如处理链接请求,发送接收数据等。
     3. 定义本地InetSocketAddress( port)好让Server绑定
     4. 创建childHandler来处理每一个链接请求   
     5. 所有准备好之后调用ServerBootstrap.bind()方法绑定Server

1 package NettyDemo.echo.server;   2    3 import io.netty.bootstrap.ServerBootstrap;   4 import io.netty.channel.ChannelFuture;   5 import io.netty.channel.ChannelInitializer;   6 import io.netty.channel.EventLoopGroup;   7 import io.netty.channel.nio.NioEventLoopGroup;   8 import io.netty.channel.socket.SocketChannel;   9 import io.netty.channel.socket.nio.NioServerSocketChannel;  10 import java.net.InetSocketAddress;  11 import NettyDemo.echo.handler.EchoServerHandler;  12 public class EchoServer {  13     private static final int port = 8080;  14     public void start() throws InterruptedException {  15         ServerBootstrap b = new ServerBootstrap();// 引导辅助程序  16         EventLoopGroup group = new NioEventLoopGroup();// 通过nio方式来接收连接和处理连接  17         try {  18             b.group(group);  19             b.channel(NioServerSocketChannel.class);// 设置nio类型的channel  20             b.localAddress(new InetSocketAddress(port));// 设置监听端口  21             b.childHandler(new ChannelInitializer
() {
//有连接到达时会创建一个channel 22 protected void initChannel(SocketChannel ch) throws Exception { 23 // pipeline管理channel中的Handler,在channel队列中添加一个handler来处理业务 24 ch.pipeline().addLast("myHandler", new EchoServerHandler()); 25 } 26 }); 27 ChannelFuture f = b.bind().sync();// 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功 28 System.out.println(EchoServer.class.getName() + " started and listen on " + f.channel().localAddress()); 29 f.channel().closeFuture().sync();// 应用程序会一直等待,直到channel关闭 30 } catch (Exception e) { 31 e.printStackTrace(); 32 } finally { 33 group.shutdownGracefully().sync();//关闭EventLoopGroup,释放掉所有资源包括创建的线程 34 } 35 } 36 public static void main(String[] args) { 37 try { 38 new EchoServer().start(); 39 } catch (InterruptedException e) { 40 e.printStackTrace(); 41 } 42 } 43 }

业务逻辑ServerHandler:

要想处理接收到的数据,我们必须继承ChannelInboundHandlerAdapter接口,重写里面的MessageReceive方法,每当有数据到达,此方法就会被调用(一般是Byte类型数组),我们就在这里写我们的业务逻辑:

1 package NettyDemo.echo.handler;   2    3 import io.netty.buffer.Unpooled;   4 import io.netty.channel.ChannelFutureListener;   5 import io.netty.channel.ChannelHandlerContext;   6 import io.netty.channel.ChannelInboundHandlerAdapter;   7 import io.netty.channel.ChannelHandler.Sharable;   8 /**  9  * Sharable表示此对象在channel间共享 10  * handler类是我们的具体业务类 11  * */  12 @Sharable//注解@Sharable可以让它在channels间共享  13 public class EchoServerHandler extends ChannelInboundHandlerAdapter{  14     public void channelRead(ChannelHandlerContext ctx, Object msg) {   15         System.out.println("server received data :" + msg);   16         ctx.write(msg);//写回数据,  17     }   18     public void channelReadComplete(ChannelHandlerContext ctx) {   19         ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) //flush掉所有写回的数据  20         .addListener(ChannelFutureListener.CLOSE); //当flush完成后关闭channel  21     }   22     public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) {   23         cause.printStackTrace();//捕捉异常信息  24         ctx.close();//出现异常时关闭channel   25     }     26 }
关于异常处理:
     我们在上面程序中也重写了exceptionCaught方法,这里就是对当异常出现时的处理。
 
客户端(nettyclient):
一般一个简单的Client会扮演如下角色:
  1. 连接到Server
  2. 向Server写数据
  3. 等待Server返回数据
  4. 关闭连接
BootsTrapping的过程:
     和Server端类似,只不过Client端要同时指定连接主机的IP和Port。
     1. 创建一个ServerBootstrap实例
     2. 创建一个EventLoopGroup来处理各种事件,如处理链接请求,发送接收数据等。
     3. 定义一个远程InetSocketAddress好让客户端连接
     4. 当连接完成之后,Handler会被执行一次   
     5. 所有准备好之后调用ServerBootstrap.connect()方法连接Server
1 package NettyDemo.echo.client;   2    3 import io.netty.bootstrap.Bootstrap;   4 import io.netty.channel.ChannelFuture;   5 import io.netty.channel.ChannelFutureListener;   6 import io.netty.channel.ChannelInitializer;   7 import io.netty.channel.EventLoopGroup;   8 import io.netty.channel.nio.NioEventLoopGroup;   9 import io.netty.channel.socket.SocketChannel;  10 import io.netty.channel.socket.nio.NioSocketChannel;  11   12 import java.net.InetSocketAddress;  13   14 import NettyDemo.echo.handler.EchoClientHandler;  15   16 public class EchoClient {  17     private final String host;  18     private final int port;  19   20     public EchoClient(String host, int port) {  21         this.host = host;  22         this.port = port;  23     }  24   25     public void start() throws Exception {  26         EventLoopGroup group = new NioEventLoopGroup();  27         try {  28             Bootstrap b = new Bootstrap();  29             b.group(group);  30             b.channel(NioSocketChannel.class);  31             b.remoteAddress(new InetSocketAddress(host, port));  32             b.handler(new ChannelInitializer
() { 33 34 public void initChannel(SocketChannel ch) throws Exception { 35 ch.pipeline().addLast(new EchoClientHandler()); 36 } 37 }); 38 ChannelFuture f = b.connect().sync(); 39 f.addListener(new ChannelFutureListener() { 40 41 public void operationComplete(ChannelFuture future) throws Exception { 42 if(future.isSuccess()){ 43 System.out.println("client connected"); 44 }else{ 45 System.out.println("server attemp failed"); 46 future.cause().printStackTrace(); 47 } 48 49 } 50 }); 51 f.channel().closeFuture().sync(); 52 } finally { 53 group.shutdownGracefully().sync(); 54 } 55 } 56 57 public static void main(String[] args) throws Exception { 58 59 new EchoClient("127.0.0.1", 3331).start(); 60 } 61 }
业务逻辑ClientHandler:
       我们同样继承一个SimpleChannelInboundHandler来实现我们的Client,我们需要重写其中的三个方法:
1 package NettyDemo.echo.handler;   2    3 import io.netty.buffer.ByteBuf;   4 import io.netty.buffer.ByteBufUtil;   5 import io.netty.buffer.Unpooled;   6 import io.netty.channel.ChannelHandlerContext;   7 import io.netty.channel.SimpleChannelInboundHandler;   8 import io.netty.channel.ChannelHandler.Sharable;   9 import io.netty.util.CharsetUtil;  10   11 @Sharable  12 public class EchoClientHandler extends SimpleChannelInboundHandler
{ 13 /** 14 *此方法会在连接到服务器后被调用 15 * */ 16 public void channelActive(ChannelHandlerContext ctx) { 17 ctx.write(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); 18 } 19 /** 20 *此方法会在接收到服务器数据后调用 21 * */ 22 public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { 23 System.out.println("Client received: " + ByteBufUtil.hexDump(in.readBytes(in.readableBytes()))); 24 } 25 /** 26 *捕捉到异常 27 * */ 28 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 29 cause.printStackTrace(); 30 ctx.close(); 31 } 32 33 }

其中需要注意的是channelRead0()方法,此方法接收到的可能是一些数据片段,比如服务器发送了5个字节数据,Client端不能保证一次全部收到,比如第一次收到3个字节,第二次收到2个字节。我们可能还会关心收到这些片段的顺序是否可发送顺序一致,这要看具体是什么协议,比如基于TCP协议的字节流是能保证顺序的。

    还有一点,在Client端我们的业务Handler继承的是SimpleChannelInboundHandler,而在服务器端继承的是ChannelInboundHandlerAdapter,那么这两个有什么区别呢?最主要的区别就是SimpleChannelInboundHandler在接收到数据后会自动release掉数据占用的Bytebuffer资源(自动调用Bytebuffer.release())。而为何服务器端不能用呢,因为我们想让服务器把客户端请求的数据发送回去,而服务器端有可能在channelRead方法返回前还没有写完数据,因此不能让它自动release。

转载于:https://www.cnblogs.com/nsxqf/p/7152577.html

你可能感兴趣的文章
IntelliJ IDEA 2016.1破解码一枚
查看>>
metasploit ***测试笔记(meterpreter篇)
查看>>
HTTP基础
查看>>
JavaSE学习笔记(五)——类与对象
查看>>
Android之高仿飞鸽传输热点创建与搜索模块
查看>>
Struts2、Spring和Hibernate应用实例(中)
查看>>
[转]MYSQL性能优化分享(分库分表)
查看>>
用php实现异步执行任务的队列(一)
查看>>
AngularJS表单验证操作例子分享
查看>>
RabbitMQ 的安装与工作模式
查看>>
视图的跳转,ViewController的使用 。试图出现启动消失过程
查看>>
博科300光纤交换机配置手册/操作方法/密码设置/用户指南大全
查看>>
HTML Dom
查看>>
Linux下为PHP添加扩展库的方法
查看>>
HBase(四):HBase API判断表是否存在,结果问题爆棚。。
查看>>
宏定义冲突
查看>>
cobbler-自动化部署
查看>>
我的友情链接
查看>>
tracepath
查看>>
java多线程基础复习
查看>>