跳至主要內容

Java NIO案列实战

fangzhipeng约 2728 字大约 9 分钟

SocketChannel介绍

SocketChannel 是 Java NIO 中用于 TCP 网络通信的通道之一。它提供了非阻塞的 TCP 客户端和服务器端的实现,并且可以与选择器(Selector)一起使用,实现高效的多路复用 I/O。通过 SocketChannel,可以进行连接的建立、数据的读写以及连接的关闭操作。

在使用 SocketChannel 时,通常需要遵循以下步骤:

  1. 打开 SocketChannel:通过调用静态的 open() 方法来打开一个 SocketChannel 对象。
  2. 连接远程服务器:调用 SocketChannel 的 connect() 方法来连接远程服务器。
  3. 读写数据:通过 SocketChannel 对象的 read() 方法进行数据的读取,通过 write() 方法进行数据的写入。
  4. 非阻塞模式:SocketChannel 可以通过 configureBlocking(false) 方法设置为非阻塞模式,配合选择器一起使用可以实现高效的多路复用 I/O。
  5. 关闭通道:通信结束后,调用 SocketChannel 的 close() 方法关闭通道。

示例代码如下:

// 打开 SocketChannel
SocketChannel socketChannel = SocketChannel.open();

// 连接远程服务器
socketChannel.connect(new InetSocketAddress("服务器IP", 8080));

// 读取数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = socketChannel.read(buffer);

// 写入数据
String data = "Hello, Server!";
buffer.clear();
buffer.put(data.getBytes());
buffer.flip();
while (buffer.hasRemaining()) {
    socketChannel.write(buffer);
}

// 非阻塞模式
socketChannel.configureBlocking(false);

// 关闭通道
socketChannel.close();

SocketChannel 提供了灵活而高效的 TCP 网络通信能力,适用于需要使用非阻塞 I/O 和高并发处理的网络编程场景。

ServerSocketChannel介绍

ServerSocketChannel 是 Java NIO 中用于 TCP 服务器端的通道之一。它用于监听客户端的连接请求,并在接受到连接请求时创建对应的 SocketChannel 与客户端进行通信。ServerSocketChannel 可以与选择器(Selector)一起使用,实现高效的多路复用 I/O。通过 ServerSocketChannel,可以进行服务器的初始化、接受连接、数据的读写以及关闭操作。

在使用 ServerSocketChannel 时,通常需要遵循以下步骤:

  1. 打开 ServerSocketChannel:通过调用静态的 open() 方法来打开一个 ServerSocketChannel 对象。
  2. 绑定端口:通过 ServerSocketChannel 对象的 bind() 方法绑定服务器端口,并设置为非阻塞模式。
  3. 接受连接:调用 accept() 方法来接受客户端的连接请求,返回一个对应的 SocketChannel 用于与客户端进行通信。
  4. 读写数据:通过返回的 SocketChannel 对象进行数据的读取和写入。
  5. 非阻塞模式:ServerSocketChannel 可以通过 configureBlocking(false) 方法设置为非阻塞模式,配合选择器一起使用可以实现高效的多路复用 I/O。
  6. 关闭通道:通信结束后,关闭 ServerSocketChannel。

示例代码如下:

// 打开 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

// 绑定端口并设置非阻塞模式
serverSocketChannel.bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);

// 接受连接
SocketChannel socketChannel = serverSocketChannel.accept();

// 读取数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = socketChannel.read(buffer);

// 写入数据
String data = "Hello, Client!";
buffer.clear();
buffer.put(data.getBytes());
buffer.flip();
while (buffer.hasRemaining()) {
    socketChannel.write(buffer);
}

// 关闭连接
socketChannel.close();

// 关闭 ServerSocketChannel
serverSocketChannel.close();

ServerSocketChannel 提供了灵活而高效的 TCP 服务器端编程能力,适用于需要使用非阻塞 I/O 和高并发处理的网络服务器编程场景。

Selector介绍

Selector 是 Java NIO 中非常重要的组件,用于多路复用非阻塞 I/O。使用 Selector 可以用单线程处理多个 Channel,这样就可以管理多个网络连接,并且在有数据可读或可写的通道上进行处理,提高了 I/O 操作的效率。

Selector 的主要作用包括:

  1. 监控多个 Channel 的事件:比如读、写、连接、接受连接等事件。
  2. 管理注册的 Channel:可以将多个 Channel 注册到一个 Selector 上,并根据事件类型进行监听。
  3. 非阻塞 I/O 多路复用:在一个线程内处理多个 Channel 的 I/O 事件,提高了系统的并发处理能力。

在使用 Selector 时,通常需要遵循以下步骤:

  1. 打开 Selector:通过调用 Selector.open() 方法创建一个 Selector 对象。
  2. 将 Channel 注册到 Selector 上:通过调用 Channel 的 register() 方法将要监听的事件和对应的事件处理器注册到 Selector 上。
  3. 轮询事件:通过调用 Selector 的 select() 方法来轮询已注册的 Channel 上是否有对应的 I/O 事件发生。
  4. 处理事件:根据 select() 方法的返回值,处理具体的 I/O 事件。

示例代码如下:

// 打开 Selector
Selector selector = Selector.open();

// 将 Channel 注册到 Selector 上
channel1.register(selector, SelectionKey.OP_READ);
channel2.register(selector, SelectionKey.OP_WRITE);

while (true) {
    // 轮询事件
    int readyChannels = selector.select();

    if (readyChannels == 0) {
        continue;
    }

    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

    while (keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();

        if (key.isAcceptable()) {
            // 处理接受连接事件
        } else if (key.isReadable()) {
            // 处理读事件
        } else if (key.isWritable()) {
            // 处理写事件
        }
        keyIterator.remove();
    }
}

Selector 提供了高效的事件多路复用能力,适用于需要同时处理多个 Channel 的网络编程场景,能够实现高性能的并发 I/O 操作。

案列介绍

  • 服务端使用ServerSocketChannel和Selector
  • 客户端使用SocketChannel
  • 服务器端不关闭不停接收客户端的数据,并返回接收结果给客户端 。
  • ClientA不关闭不断发送数据给服务端

服务端代码

使用 Java NIO(New I/O)包实现的简单的基于 NIO 的服务器端程序,示例代码如下:

package io.github.forezp.javabasiclab.nio;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class ServerConnect {
    private static final int BUF_SIZE = 1024;
    private static final int PORT = 8080;
    private static final int TIMEOUT = 3000;

    public static void main(String[] args) {
        selector();
    }

    public static void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
        SocketChannel sc = ssChannel.accept();
        sc.configureBlocking(false);
        sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(BUF_SIZE));
    }

    public static void handleRead(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        long bytesRead = sc.read(buf);
        while (bytesRead > 0) {
            buf.flip();
            while (buf.hasRemaining()) {
                System.out.print((char) buf.get());
            }
            System.out.println();
            buf.clear();
            bytesRead = sc.read(buf);
        }
        buf.put(("服务器端成功接收信息.").getBytes());
        buf.flip();
        sc.write(buf);
        buf.clear();
        if (bytesRead == -1) {
            sc.close();
        }
    }

    public static void handleWrite(SelectionKey key) throws IOException {
        System.out.println("write to client");
        ByteBuffer buf = (ByteBuffer) key.attachment();
        buf.flip();
        String data = "Hello, Client!";
        buf.put(data.getBytes(StandardCharsets.UTF_8));
        SocketChannel sc = (SocketChannel) key.channel();
        while (buf.hasRemaining()) {
            sc.write(buf);
        }
        buf.compact();
    }

    public static void selector() {
        Selector selector = null;
        ServerSocketChannel ssc = null;
        try {
            selector = Selector.open();
            ssc = ServerSocketChannel.open();
            ssc.socket().bind(new InetSocketAddress(PORT));
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                if (selector.select(TIMEOUT) == 0) {
                    System.out.println("==");
                    continue;
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    if (key.isAcceptable()) {
                        handleAccept(key);
                    }
                    if (key.isReadable()) {
                        handleRead(key);
                    }
                    if (key.isWritable() && key.isValid()) {
                        handleWrite(key);
                    }
                    if (key.isConnectable()) {
                        System.out.println("isConnectable = true");
                    }
                    iter.remove();
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (selector != null) {
                    selector.close();
                }
                if (ssc != null) {
                    ssc.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}


这个程序使用 Selector 来监听多个 Channel,并根据不同的事件类型进行处理。

主要功能包括:

  • selector() 方法中,首先打开一个 Selector,并创建一个 ServerSocketChannel,并将其注册到 Selector 上,并且设置为非阻塞模式。然后进入一个无限循环,不断轮询 Selector 上注册的事件。

  • handleAccept(SelectionKey key) 方法中,当有新连接到来时,调用 ServerSocketChannel.accept() 方法接收连接,并将其设置为非阻塞模式,并注册到 Selector 上以监听读事件。

  • handleRead(SelectionKey key) 方法中,处理读事件。它先创建一个 ByteBuffer 对象,用来读取数据。然后通过 SocketChannel 的 read 方法从 Channel 中读取数据,将其打印到控制台,并向客户端发送"服务器端成功接收信息"。

  • handleWrite(SelectionKey key) 方法中,处理写事件。它首先创建一个 ByteBuffer 对象,并向其中放入要发送给客户端的数据"Hello, Client!",然后通过 SocketChannel 的 write 方法向客户端写入数据。

  • selector() 方法中,通过无限循环不断调用 Selector.select() 等待事件就绪,一旦有就绪的事件,就遍历处理相应的事件类型。

这段代码实现了一个基于 Java NIO 的简单的非阻塞服务器端程序,能够同时处理多个客户端连接,并实现了事件的注册、轮询和处理。

客户端代码

使用 Java NIO(New I/O)包实现了基于 NIO 的客户端程序,示例代码如下:

package io.github.forezp.javabasiclab.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;

public class ClientConnect {

    public static void main(String[] args) {
        try {
            client();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void client() throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        SocketChannel socketChannel = null;
        try {
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));

            if (socketChannel.finishConnect()) {
                int i = 0;
                while (true) {

                    TimeUnit.SECONDS.sleep(1);
                    String info = "I'm " + i++ + "-th information from client";
                    buffer.clear();
                    buffer.put(info.getBytes());
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        //System.out.println(new String(buffer.array()));
                        socketChannel.write(buffer);
                    }
                    //
                    buffer.clear();
                    int len = socketChannel.read(buffer);
                    buffer.flip();
                    byte[] bytes = new byte[buffer.remaining()];
                    buffer.get(bytes);
                    System.out.println(new String(bytes, 0 , len));
                    buffer.clear();
                }
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if (socketChannel != null) {
                    socketChannel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

代码的主要功能包括:

  • client() 方法中,首先创建一个 ByteBuffer 对象用于读写数据,然后创建一个 SocketChannel,并将其设置为非阻塞模式。接着通过 socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080)) 进行连接服务器端。
  • 使用 socketChannel.finishConnect() 来确保连接已经建立,然后进入一个无限循环,向服务器端发送消息,并接收服务器端的响应。
  • 在while循环中,每隔一秒向服务器端发送一条消息,并且接收服务器端发送的反馈消息。
  • finally 块中关闭 SocketChannel。

这段代码实现了一个基于 Java NIO 的简单的非阻塞客户端程序,能够与服务器端建立连接,进行数据的发送和接收。

依次启动服务端程序和客户端程序,服务端打印如下:

==
I'm 0-th information from client
I'm 1-th information from client
I'm 2-th information from client
I'm 3-th information from client

客户端打印如下:

服务器端成功接收信息.
服务器端成功接收信息.
服务器端成功接收信息.

NIO底层原理

NIO的特点是由底层原理决定的,如图:

nio-jieshao
nio-jieshao

总结来说,和阻塞 I/O(BIO)相比,使用多路复用器(比如 Selector)和非阻塞 I/O(NIO)的方式能够更高效地处理多个连接的读写事件。

在使用 NIO 的情况下,当调用 select() 启动多路复用器时,连接事件会被交由多路复用器管理,这会触发内核中的 epoll() 函数,将网卡中所有连接的读写事件数据以链表的形式一次性拷贝到内存中的特定区域。这样一来,当 Selector 轮询到连接事件时便会触发内核中的 recvFrom(NOBlocking…) 函数,将数据从特定区域零拷贝到用户空间,实现了零拷贝的优化功能。

与阻塞 I/O 不同的是,使用 NIO 和 Selector 能够保证即使某个连接阻塞,也不会影响其他连接的数据已就绪的读写事件。这是因为在 Selector 的轮询机制下,各个客户端的数据已经提前拷贝到内存中,所以不会因为某个连接的阻塞而影响其他连接的读写事件的处理。

因此,使用 NIO 和 Selector 能够更高效地处理多个连接的读写事件,并且能够避免阻塞影响其他连接的特点成为了非常重要的优势。

方志朋_官方公众号
方志朋_官方公众号