Java IO使用的四种模式


声明:本文转载自https://my.oschina.net/zhangxufeng/blog/3014286,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

       对于Java IO,从大的种类上来分,可以分为BIO和NIO。BIO全称为Blocked IO,也即阻塞型IO,而NIO则是在jdk 1.4中引入的,一般称其为New IO,因为这是相对于1.4版本之前的阻塞型IO而言的,但是也有人称其为Non-blocked IO。相对而言,本人更喜欢第二种叫法,因为从字面上更符合其使用含义。本文则主要基于BIO和NIO讲解四种IO模式的基本使用方式,并且对这四种模式的优缺点进行对比。

1. 同步BIO模式

       最基本的BIO使用方式就是同步SocketChannel的方式,我们这里以经典的EchoClient/EchoServer模式来对其进行讲解,如下是一个示例:

public class EchoServer {
  @Test
  public void testServer() throws IOException {
    ServerSocketChannel server = ServerSocketChannel.open();  // 开启一个ServerSocketChannel
    server.bind(new InetSocketAddress(8080));  // 将服务器绑定到8080端口
    while (true) {  // 在无限循环中不断接收客户端的请求,并且进行处理
      SocketChannel channel = server.accept();  // 接收客户端请求,如果没有请求到来,则阻塞在这里
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      channel.read(buffer);  // 读取客户端请求Channel中的数据
      handle(buffer);  // 处理客户端请求channel的数据
      response(channel);  // 往客户端channel中写入数据,以返回响应
    }
  }
    
  private void handle(ByteBuffer buffer) {
    buffer.flip();  // 读取数据之后需要切换ByteBuffer的模式为读取模式
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    System.out.println("Server receives message: ");
    System.out.println(new String(bytes, StandardCharsets.UTF_8));  // 打印客户端发送的数据
  }

  private void response(SocketChannel channel) throws IOException {
    ByteBuffer buffer = ByteBuffer
        .wrap(("Hello, I'm server. It's: " + new Date()).getBytes());
    channel.write(buffer);  // 往客户端中写入当前数据
    channel.close();
  }
}

       这里可以看到,服务端处理主要是首先开启一个ServerSocketChannel,然后在一个无限循环中不断获取客户端连接,获取之后进行处理,并且写入响应信息。下面我们看看客户端代码:

public class EchoClient {
  @Test
  public void testClient() throws IOException {
    SocketChannel channel = SocketChannel.open();  // 开启一个客户端SocketChannel
    channel.connect(new InetSocketAddress("127.0.0.1", 8080));  // 连接服务器ip和端口
    request(channel);  // 发送客户端请求
    handleResponse(channel);  // 处理服务端响应
  }

  private void request(SocketChannel channel) throws IOException {
    ByteBuffer buffer = ByteBuffer.wrap("Hello, I'm client. ".getBytes());
    channel.write(buffer);  // 将客户端请求数据写入到Channel中
  }

  private void handleResponse(SocketChannel channel) throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    channel.read(buffer);  // 读取服务端响应数据
    buffer.flip();  // 这里需要切换模式,因为上面的read()操作相对于ByteBuffer而言是写入
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    System.out.println("Client receives message: ");
    System.out.println(new String(bytes, StandardCharsets.UTF_8));  // 打印服务端返回数据
  }
}

       如下分别是服务端和客户端打印的数据:

Server receives message: 
Hello, I'm client. 
Client receives message: 
Hello, I'm server. It's: Sun Feb 24 09:28:41 CST 2019

       可以看到,服务端和客户端都正常接收并且处理了对方的数据。关于同步BIO模式,这里主要存在以下几个问题:

  • 在服务端中,包括绑定端口,接收客户端连接,处理客户端请求数据,响应客户端等都是在同一个线程中进行的,也就是说服务器在同一时刻只能处理一个客户端链接,这极大的限制了服务器的响应效率;
  • ServerSocketChannel.accept()方法是一个阻塞型的方法,在接收客户端连接时,会受到网络环境的影响,对其效率产生很大影响;
  • SocketChannel.write()方法可能会因为数据的写入效率问题而对服务端线程产生影响,最终影响到服务器的性能;

2. 异步BIO模式

       上述BIO模式中,最主要的问题在于服务器同一时刻只能处理一个客户端请求,这会极大的限制服务器性能。这里可以采用异步BIO的模式解决这个问题,也就是上面的服务器主线程只负责接收客户端请求,在收到请求之后将客户端请求Channel委托到一个线程池中异步进行处理。这样服务器在同一时刻就可以同时建立多个连接,极大的提升了服务器的性能。如下是EchoServer的代码:

public class EchoServer {

  // 声明一个线程池,用于异步处理客户端请求
  private static final ExecutorService executor = Executors.newFixedThreadPool(10);

  @Test
  public void testServer() throws IOException {
    ServerSocketChannel server = ServerSocketChannel.open();
    server.bind(new InetSocketAddress(8080));
    while (true) {
      // 服务器通过accept()方法接收客户端请求,如果没有客户端请求,当前线程就会阻塞在这里
      SocketChannel channel = server.accept();
      // 在接收到客户端请求之后,将该请求委托到线程池中进行处理
      executor.execute(() -> {
        try {
          ByteBuffer buffer = ByteBuffer.allocate(1024);
          channel.read(buffer);
          handle(buffer);
          response(channel);
        } catch (Exception e) {
          e.printStackTrace();
        }
      });
    }
  }

  private void response(SocketChannel channel) throws IOException {
    ByteBuffer buffer = ByteBuffer
        .wrap(("Hello, I'm server. It's: " + new Date()).getBytes());
    channel.write(buffer);
    channel.close();
  }

  private void handle(ByteBuffer buffer) {
    buffer.flip();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    System.out.println("Server receives message: ");
    System.out.println(new String(bytes, StandardCharsets.UTF_8));
  }
}

       对于客户端代码,其与同步BIO模式中的一致,这里就不再赘述。可以看到,相对于同步BIO模式,异步BIO模式改进了其主要问题,因而可以在同一时刻接收到多个客户端请求。但是对于异步BIO模式,其存在的问题也非常明显:

  • 由于客户端连接都是异步放在线程池中进行处理,因而同一时刻能够接收到的客户端请求数量将严重受限于这里线程池的大小,而服务器的线程数量也不是可以无限增大的。这里比较典型的如Tomcat容器,其最初采用的就是这种模式,而其线程池大小则默认为200。那么在现在这种对服务器并发性能越来越高的情况下,一个Tomcat容器所能承载的TPS只能达到几千,如果要提升服务器整体负载,只能通过负载均衡的方式进行横向扩容;
  • 在服务器接收客户端请求时,可能由于并发量较大,导致ServerSocketChannel.accept()方法负载过高,而这里却不能通过使用线程池的方式来使用多个线程同时接收客户端连接;
  • 在线程池中线程处理完客户端数据,并且往客户端channel中写入数据时,这是在客户端线程中进行的,而SocketChannel.write()方法可能由于网络原因导致一定的阻塞,从而导致线程池线程长时间耗费在等待上,导致服务器响应降低。

3. NIO模式

       由于BIO存在的诸多问题,在jdk 1.4中,Java提供了一种非阻塞型的IO模型,也即NIO。NIO本质上采用的是IO多路复用模式,实际上就是一个事件驱动模型,简单的理解为一个服务器在绑定某个端口之后,其可以在一个线程了同时监听多个客户端连接,而且服务器可以对每个客户端分别设置对其哪些事件感兴趣。当客户端有对应的事件发生时,其就会通知服务器监听线程,服务器线程监听到对应的事件之后,其就会将其交由线程池处理对应的事件。实际上,Java的NIO模式在底层也是依赖于操作系统的多路复用模型,对于Linux系统,其底层是使用epoll模型实现的,而对于Mac os,其则是使用kqueue模型实现的。如下是一个IO多路复用的示意图:

image.png

       这里的Selector就可以理解为一个多路复用器,每个客户端连接就是一个SocketChannel,这些SocketChannel会在Selector上注册,并且设置对各个Channel感兴趣的事件。当Selector监听到对应的事件之后,其就会将事件交由下层的线程处理。如下是一个使用NIO处理客户端事件的示例:

public class EchoServer {
  @Test
  public void testServer() throws IOException {
    ServerSocketChannel server = ServerSocketChannel.open();  // 创建服务端ServerSocketChannel
    server.configureBlocking(false);  // 设置服务端channel为非阻塞模式
    server.bind(new InetSocketAddress(8080));  // 绑定服务器到8080端口

    Selector selector = Selector.open();  // 开启一个多路复用器
    // 对于服务端而言,其一开始只需要监听服务器的Accept事件,该事件表示服务器成功接收到一个客户端连接
    server.register(selector, SelectionKey.OP_ACCEPT);
    while (true) {
      // 这里select()方法会一直等待所有注册的channel发生其所感兴趣的事件。这里分为两种情况:
      // 1. 对于刚开始启动时,这里只会收到服务器接收到客户端连接的Accept事件,由于服务器接收到客户端
      //    连接并不一定代表客户端有数据发送过来,因而这里会将客户端Channel注册到Selector上,
      //    同时监听服务端和客户端的事件;
      // 2. 对于客户端而言,在其注册到Selector上之后,就会监听其read和write事件,从而进行数据读写。
      // 需要注意的是,如果没有可用事件,这里select()方法会一直阻塞,直到有感兴趣的事件到达
      int size = selector.select();
      Set<SelectionKey> keys = selector.selectedKeys();
      Iterator<SelectionKey> iterator = keys.iterator();
      while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // 这里一定要进行remove,因为Selector不会对已经产生的事件进行移除,
        // 否则下次循环该事件就会被重复处理
        iterator.remove();

        if (key.isAcceptable()) {
          accept(key, selector);  // 处理接收客户端请求事件
        } else if (key.isReadable()) {
          read(key, selector);  // 处理从客户端channel读取事件
        } else if (key.isWritable()) {
          write(key);  // 处理往客户端channel写入数据事件
        }
      }
    }
  }

  private void accept(SelectionKey key, Selector selector) throws IOException {
    // 这里由于只有ServerSocketChannel才会有accept事件,因而可以直接强转为ServerSocketChannel
    ServerSocketChannel server = (ServerSocketChannel) key.channel();
    SocketChannel socketChannel = server.accept();  // 获取客户端连接
    socketChannel.configureBlocking(false);  // 设置客户端channel为非阻塞模式
    socketChannel.register(selector, SelectionKey.OP_READ);  // 注册客户端channel到selector上
  }

  // 在客户端连接建立之后,客户端channel就会发送数据到服务端,从而产生read事件
  private void read(SelectionKey key, Selector selector) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    channel.read(buffer);  // 读取客户端channel数据
    handle(buffer);  // 处理读取的客户端数据
    channel.register(selector, SelectionKey.OP_WRITE);  // 读取完成之后注册write事件
  }

  // 处理客户端数据
  private void handle(ByteBuffer buffer) {
    buffer.flip();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    System.out.println("Server receives message: ");
    System.out.println(new String(bytes, StandardCharsets.UTF_8));
  }

  // 当客户端数据处理完成之后,就会注册写入事件,此时是可以往客户端写入数据的
  private void write(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    response(channel);  // 往客户端channel写入数据
  }

  private void response(SocketChannel channel) throws IOException {
    ByteBuffer buffer = ByteBuffer
        .wrap(("Hello, I'm server. It's: " + new Date()).getBytes());
    channel.write(buffer);
    channel.close();
  }
}

       在上述代码中,我们首先创建了一个ServerSocketChannel,并且通过其*configureBlocking()*方法将其设置为非阻塞模式,设置为这种模式之后,其accept()等方法就是非阻塞的。然后我们创建了一个Selector多路复用器,并且将ServerSocketChannel注册到该多路复用器上。接着通过多路复用器的select()方法阻塞当前线程,等待注册的Channel事件触发。在触发之后通过遍历SelectionKey对象来进行不同事件的处理。下面我们来看看使用Java NIO来实现客户端的代码:

public class EchoClient {
  @Test
  public void testClient() throws IOException {
    SocketChannel socketChannel = SocketChannel.open();  // 创建一个SocketChannel对象
    socketChannel.configureBlocking(false);  // 设置为非阻塞模式

    Selector selector = Selector.open();  // 开启一个Selector多路复用器,并且将其注册到多路复用器上
    socketChannel.register(selector, SelectionKey.OP_CONNECT);
    // 连接服务器,这里需要注意将其放在注册代码之后,否则就无法触注册的连接事件,因为注册时已连接完成
    socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
    while (true) {
      // 阻塞等待客户端channel事件的触发,这里需要注意的是,对于刚注册的SocketChannel,其只对
      // Connect事件进行了监听,因而只会触发Connect事件,而connect之后才会注册write事件,因为
      // 对于客户端而言,其注册成功之后就会往服务端发送数据,因而注册的是write事件。在write数据完成
      // 之后,就会将其切换为监听read事件,等待服务器的响应并且进行处理
      selector.select();
      Set<SelectionKey> keys = selector.selectedKeys();  // 获取触发的事件
      Iterator<SelectionKey> iterator = keys.iterator();
      while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();  // 与服务端一样,需要将已处理的事件进行移除

        if (key.isConnectable()) {
          connect(key, selector);  // 处理连接完成事件
        } else if (key.isWritable()) {
          write(key, selector);  // 处理写入事件
        } else if (key.isReadable()) {
          read(key);  // 处理读取事件
        }
      }
    }
  }

  // 处理连接事件,在连接完成之后重新注册写入事件以准备写入数据
  private void connect(SelectionKey key, Selector selector) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    channel.register(selector, SelectionKey.OP_WRITE);  // 注册写入事件
    channel.finishConnect();
  }

  private void write(SelectionKey key, Selector selector) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    request(channel);  // 向SocketChannel中写入数据
    channel.register(selector, SelectionKey.OP_READ); // 写入完成后切换为监听读取事件等待服务器响应
  }

  private void request(SocketChannel channel) throws IOException {
    ByteBuffer buffer = ByteBuffer.wrap("Hello, I'm client. ".getBytes());
    channel.write(buffer);  // 写入数据到SocketChannel
  }

  private void read(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    handleResponse(channel);  // 处理服务器响应的数据
    channel.close();
  }

  private void handleResponse(SocketChannel channel) throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    channel.read(buffer);  // 读取服务器响应的数据
    buffer.flip();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    System.out.println("Client receives message: ");
    System.out.println(new String(bytes, StandardCharsets.UTF_8));  // 打印服务器响应的数据
  }
}

       这里客户端的处理模式与服务端基本类似,只是客户端首先监听的是Connect事件;在连接成功后切换为监听写入事件,以写入数据发送到服务端;在发送完成后,又会切换为监听读取事件,以等待服务器发送数据并且进行处理。这里NIO模式相对于BIO主要有以下几个优点:

  • 因为是基于操作系统的多路复用模型,只需要监听客户端的各个事件即可,因而使用一个线程即可处理大量的客户端连接,非常适合高并发服务器的编排。并且这里如果客户端并发量非常大,那么是可以使用一个线程组来专门处理客户端channel的连接事件,然后将其余的读写事件向下分发到到相应的IO线程组里;
  • 由于是基于事件驱动模型处理相应的IO事件,因而这里对客户端的数据处理是非常高效的。

4. AIO模式

       对于AIO模式,其是在jdk 1.7中加入的,主要原因是NIO模式代码编写非常复杂,并且容易出错。AIO本质上还是使用的NIO的多路复用来实现的,只不过在模型上其使用的是一种事件回调的方式处理各个事件,这种方式更加符合NIO异步模型的概念,并且在编码难易程度上比NIO要小很多。

       在AIO中,所有的操作都是异步执行的,而每个事件都是通过一个回调函数来进行的,这里也就是一个CompletionHandler对象。这里我们以EchoClient和EchoServer为例看一下AIO模式的使用方式:

public class EchoServer {
  @Test
  public void testServer() throws IOException, InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    // 创建一个异步的ServerSocketChannel,然后绑定8080端口,并且处理其accept事件
    AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
    server.bind(new InetSocketAddress(8080));
    server.accept(server, new AcceptCompletionHandler());
    latch.await();
  }
}
public class AcceptCompletionHandler 
    implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {

  // 这里对于对象的传递,是通过Attachement的方式进行的,这样就可以将原始Channel传递到各个异步回调函数使用
  @Override
  public void completed(AsynchronousSocketChannel result,
    AsynchronousServerSocketChannel attachment) {
    attachment.accept(attachment, this);  // 在处理了accept事件之后,继续递归监听下一个accept事件
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    result.read(buffer, buffer, new ReadCompletionHandler(result));  // 处理客户端的数据
  }

  @Override
  public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
    exc.printStackTrace();
  }
}
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {

  private AsynchronousSocketChannel channel;

  public ReadCompletionHandler(AsynchronousSocketChannel channel) {
    this.channel = channel;
  }

  @Override
  public void completed(Integer result, ByteBuffer attachment) {
    attachment.flip();
    byte[] body = new byte[attachment.remaining()];
    attachment.get(body);  // 读取客户端发送的数据
    try {
      System.out.println("Server receives message: ");
      System.out.println(new String(body, CharsetUtil.UTF_8));  // 打印客户端数据
      doWrite();  // 往客户端Channel中写入数据作为服务端的响应
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  private void doWrite() {
    ByteBuffer buffer = ByteBuffer
        .wrap(("Hello, I'm server. It's: " + new Date()).getBytes());
    // 异步的写入服务端的数据,这里也是通过一个CompletionHandler来异步的写入数据
    channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
      @Override
      public void completed(Integer result, ByteBuffer attachment) {
        if (attachment.hasRemaining()) {
          channel.write(attachment, attachment, this);  // 将数据写入到服务器channel中
        }
      }

      @Override
      public void failed(Throwable exc, ByteBuffer attachment) {
        try {
          channel.close();
        } catch (IOException e) {
          // ignore
        }
      }
    });
  }

  @Override
  public void failed(Throwable exc, ByteBuffer attachment) {
    try {
      channel.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

       对于上述的AIO模式服务端的代码编写,可以看出来,AIO模式完全是基于异步线程池处理客户端事件的,而且对于每个事件的处理,其都是通过一个CompletionHandler进行处理的。对于服务端而言,其首先通过AcceptCompletionHandler处理的是accept事件,处理完成之后就监听客户端的read事件,然后通过ReadCompletionHandler处理客户端的数据读入事件;最后通过一个内部类(本质上也是一个CompletionHandler)往客户端写入数据。下面我们看一下客户端代码:

public class EchoClient {
  @Test
  public void testClient() throws IOException, InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    // 创建一个客户端AsynchronousSocketChannel
    AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
    // 异步连接服务器,并且将连接后的处理交由ConnectCompletionHandler进行
    socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080), 
       socketChannel, new ConnectCompletionHandler(latch));
    latch.await();
  }
}
public class ConnectCompletionHandler implements CompletionHandler<Void, AsynchronousSocketChannel> {

  private CountDownLatch latch;

  public ConnectCompletionHandler(CountDownLatch latch) {
    this.latch = latch;
  }

  @Override
  public void completed(Void result, AsynchronousSocketChannel channel) {
    ByteBuffer buffer = ByteBuffer.wrap("Hello, I'm client. ".getBytes());
    // 连接完成后,往Channel中写入数据,以发送给服务器
    channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
      @Override
      public void completed(Integer result, ByteBuffer buffer) {
        if (buffer.hasRemaining()) {
          channel.write(buffer, buffer, this);
        } else {
          ByteBuffer readBuffer = ByteBuffer.allocate(1024);
          // 写入完成后,这里异步监听服务器的响应数据
          channel.read(readBuffer, readBuffer, 
                       new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
              attachment.flip();
              byte[] bytes = new byte[attachment.remaining()];
              readBuffer.get(bytes);  // 读取服务器响应的数据,并且进行处理
              try {
                System.out.println("Client receives message: ");
                System.out.println(new String(bytes, StandardCharsets.UTF_8));
                latch.countDown();
              } catch (Exception e) {
                e.printStackTrace();
              }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
              try {
                channel.close();
                latch.countDown();
              } catch (IOException e) {
                e.printStackTrace();
              }
            }
          });
        }
      }

      @Override
      public void failed(Throwable exc, ByteBuffer attachment) {
        try {
          channel.close();
          latch.countDown();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    });
  }

  @Override
  public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
    // ignore
  }
}

       可以看到,客户端的处理方式与服务端基本类似,首先是连接服务器。连接完成后,通过ConnectionCompletionHandler进行后续处理,这里首先是异步往服务器写入数据,写入完成后监听服务器的数据响应,最后读取服务器数据并打印。可以看到AIO模式相较于NIO有如下优点:

  • 本质上是使用的IO多路复用模式,因而天然支持高并发模型;
  • 底层使用了异步IO的模型,因而无需用户使用线程池等工具来分离相关事件的处理,而NIO是需要用户手动创建线程池进行处理的;
  • 在编码模式上更加符合事件处理这一模型,更加符合用户习惯,降低了用户编码的难度。

5. 小结

       本文主要基于BIO和NIO讲解了Java IO的四种编码模式,并且循序渐进讲解了每种编码模式的基本使用方式和优缺点。

本文发表于2019年02月25日 18:00
(c)注:本文转载自https://my.oschina.net/zhangxufeng/blog/3014286,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1660 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1