当前位置: 首页 > news >正文

多人聊天室 NIO模型实现

NIO编程模型

在这里插入图片描述

  • Selector监听客户端不同的zhuangtai
  • 不同客户端触发不同的状态后,交由相应的handles处理
  • Selector和对应的处理handles都是在同一线程上实现的

I/O多路复用

在Java中,I/O多路复用是一种技术,它允许单个线程处理多个输入/输出(I/O)源,而不需要为每个I/O源创建一个线程。这种技术可以显著提高性能,因为它减少了线程创建和上下文切换的开销。I/O多路复用的核心思想是使用一个机制来监控多个I/O通道,一旦某个通道有数据可读或可写,就通知应用程序进行相应的操作。

NIO模型 + Selector监听通道 == 经典的I/O多路复用

同步式I/O和异步I/O概念及分类

概念:

  • 同步式I/O(Synchronous I/O)
    定义:在同步I/O模型中,当一个线程发起一个I/O请求时,它会阻塞,直到I/O操作完成。也就是说,线程会一直等待直到数据被读取或写入完毕。
    特点:阻塞性:线程在I/O操作完成之前不能执行其他任务。
    资源消耗:每个I/O操作都需要一个线程或进程,可能导致资源消耗较大,特别是在高并发场景下。
  • 异步I/O(Asynchronous I/O)
    定义:在异步I/O模型中,当一个线程发起一个I/O请求后,它不会被阻塞,而是可以继续执行其他任务。I/O操作在后台进行,当操作完成时,系统会通知发起请求的线程。
    特点:非阻塞性:线程不需要等待I/O操作完成,可以继续执行其他任务。
    并发性:可以提高系统的并发处理能力,适用于高并发场景。

分类:

  • BIO(Blocking I/O):
    类型:同步I/O。
    特点:在BIO模型中,当线程执行I/O操作时,如果数据还没有准备好,它会一直等待直到数据准备完成。在这个过程中,线程被阻塞,不能执行其他任务。
  • NIO(Non-blocking I/O):
    类型:非阻塞I/O,可以用于同步或异步操作。
    特点:NIO模型中的I/O操作是非阻塞的,这意味着当数据没有准备好时,线程可以立即返回,去做其他事情。NIO本身提供了非阻塞的能力,但是它既可以用于同步编程(通过在while循环中检查并处理I/O事件),也可以与异步I/O(如Java 7引入的NIO.2,也称为Asynchronous I/O)结合使用。
  • I/O多路复用(I/O Multiplexing):
    类型:同步I/O。
    特点:I/O多路复用模型允许单个线程监控多个I/O通道,但是当线程执行I/O操作时,如果数据没有准备好,线程仍然会被阻塞。最常见的I/O多路复用技术是select/poll系统调用。在Java中,可以通过Selector和Channel实现I/O多路复用。

总结:

  • NIO模型+Selector实现的I/O多路复用是同步式I/O,因为服务器端需要多次调用selector.select()来查看是否有新的事件发生。如果服务器端不通过多次调用selector.select(),也没有其他线程会通知主线程有新的事件发生,主线程就会持续阻塞。
  • AIO异步I/O则是当主线程查看发现没有新事件发生时立刻返回处理其他事件,当有新事件发生时主线程会被通知,并来处理。

ChatServer实现

package server;import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Set;public class ChatServer {private static final int DEFAULT_PORT = 8888;private static final String QUIT = "quit";private static final int BUFFER = 1024;private ServerSocketChannel server;private Selector selector;private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);//统一编码,解码方法private Charset charset = Charset.forName("UTF-8");//可以自定义服务器端的端口private int port;public ChatServer() {this(DEFAULT_PORT);}public ChatServer(int port) {this.port = port;}private void start() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.socket().bind(new InetSocketAddress(port));selector = Selector.open();//将ServerSocketChannel的Accept事件注册到selector上//一旦ServerSocketChannel接收到了客户端的连接请求,selector就会得知server.register(selector, SelectionKey.OP_ACCEPT);System.out.println("启动服务器, 监听端口:" + port + "...");while (true) {//有事件被触发了select()函数才会有返回selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey key : selectionKeys) {// 处理被触发的事件handles(key);}//清空集合,防止重复处理selectionKeys.clear();}} catch (IOException e) {e.printStackTrace();} finally {close(selector);}}private void handles(SelectionKey key) throws IOException {// ACCEPT事件 - 和客户端建立了连接if (key.isAcceptable()) {ServerSocketChannel server = (ServerSocketChannel) key.channel();SocketChannel client = server.accept();client.configureBlocking(false);// 在selector上注册可能发生的Read事件client.register(selector, SelectionKey.OP_READ);System.out.println(getClientName(client) + "已连接");}// READ事件 - 客户端发送了消息else if (key.isReadable()) {SocketChannel client = (SocketChannel) key.channel();String fwdMsg = receive(client);if (fwdMsg.isEmpty()) {// 客户端异常// 取消掉key的注册,以后不再响应Read的事件// selector的key注销掉以后通常搭配selector.wakeup(); (是个好习惯)立刻唤醒selector,判断当前发生的事件key.cancel();selector.wakeup();} else {System.out.println(getClientName(client) + ":" + fwdMsg);forwardMessage(client, fwdMsg);// 检查用户是否退出if (readyToQuit(fwdMsg)) {key.cancel();selector.wakeup();System.out.println(getClientName(client) + "已断开");}}}}private void forwardMessage(SocketChannel client, String fwdMsg) throws IOException {for (SelectionKey key: selector.keys()) {Channel connectedClient = key.channel();//如果遍历到了服务器的监听Socket,则跳过(不需要将消息转发给服务器)if (connectedClient instanceof ServerSocketChannel) {continue;}// key是否有效(key对应的Channel和selector都在运行没有关闭) && 不是发消息的客户端他自己if (key.isValid() && !client.equals(connectedClient)) {// 写Buffer前先将Buffer清空wBuffer.clear();// 写入Buffer消息wBuffer.put(charset.encode(getClientName(client) + ":" + fwdMsg));// 将Buffer从写状态反转成读状态wBuffer.flip();// 将Buffer中的数据写入通道中while (wBuffer.hasRemaining()) {((SocketChannel)connectedClient).write(wBuffer);}}}}private String receive(SocketChannel client) throws IOException {// 在每次新的读取前先把buffer清空rBuffer.clear();// 将channel中的信息读入rBuffer中,直到读不出文件while(client.read(rBuffer) > 0);// 将rBuffer从写模式从转换成读模式rBuffer.flip();return String.valueOf(charset.decode(rBuffer));}private String getClientName(SocketChannel client) {return "客户端[" + client.socket().getPort() + "]";}private boolean readyToQuit(String msg) {return QUIT.equals(msg);}private void close(Closeable closable) {if (closable != null) {try {closable.close();} catch (IOException e) {e.printStackTrace();}}}public static void main(String[] args) {ChatServer chatServer = new ChatServer(7777);chatServer.start();}
}
  • 将Channel中的事件注册在Selector
  • 用Selector监控事件的发生,实现了在同一线程处理多个客户端输入
  • 极大提高了线程的使用效率,使得服务器端能够处理大量的客户端连接

实现ChatClient

ChatClient

package client;import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;public class ChatClient {private static final String DEFAULT_SERVER_HOST = "127.0.0.1";private static final int DEFAULT_SERVER_PORT = 8888;private static final String QUIT = "quit";private static final int BUFFER = 1024;private String host;private int port;private SocketChannel client;private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);private Selector selector;private Charset charset = Charset.forName("UTF-8");public ChatClient() {this(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);}public ChatClient(String host, int port) {this.host = host;this.port = port;}public boolean readyToQuit(String msg) {return QUIT.equals(msg);}private void close(Closeable closable) {if (closable != null) {try {closable.close();} catch (IOException e) {e.printStackTrace();}}}private void start() {try {client = SocketChannel.open();client.configureBlocking(false);selector = Selector.open();client.register(selector, SelectionKey.OP_CONNECT);client.connect(new InetSocketAddress(host, port));while (true) {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();for (SelectionKey key : selectionKeys) {handles(key);}selectionKeys.clear();}} catch (IOException e) {e.printStackTrace();} catch (ClosedSelectorException e) {// 用户正常退出} finally {close(selector);}}private void handles(SelectionKey key) throws IOException {// CONNECT事件 - 连接就绪事件if (key.isConnectable()) {SocketChannel client = (SocketChannel) key.channel();// 判断连接是否建立完全if (client.isConnectionPending()) {// 调用finishConnect方法正式建立连接client.finishConnect();// 创建一个新的线程来处理用户的输入new Thread(new UserInputHandler(this)).start();}// 将Read事件注册在selector上面client.register(selector, SelectionKey.OP_READ);}// READ事件 -  服务器转发消息else if (key.isReadable()) {SocketChannel client = (SocketChannel) key.channel();String msg = receive(client);if (msg.isEmpty()) {// 服务器异常close(selector);} else {System.out.println(msg);}}}public void send(String msg) throws IOException {if (msg.isEmpty()) {return;}wBuffer.clear();wBuffer.put(charset.encode(msg));wBuffer.flip();while (wBuffer.hasRemaining()) {client.write(wBuffer);}// 检查用户是否准备退出if (readyToQuit(msg)) {close(selector);}}private String receive(SocketChannel client) throws IOException {rBuffer.clear();while (client.read(rBuffer) > 0);rBuffer.flip();return String.valueOf(charset.decode(rBuffer));}public static void main(String[] args) {ChatClient client = new ChatClient("127.0.0.1", 7777);client.start();}
}
  • ChatClient仍然需要通过创建新的线程来处理用户输入

UserInputHanlder

package client;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;public class UserInputHandler implements Runnable {private ChatClient chatClient;public UserInputHandler(ChatClient chatClient) {this.chatClient = chatClient;}@Overridepublic void run() {try {// 等待用户输入消息BufferedReader consoleReader =new BufferedReader(new InputStreamReader(System.in));while (true) {String input = consoleReader.readLine();// 向服务器发送消息chatClient.send(input);// 检查用户是否准备退出if (chatClient.readyToQuit(input)) {break;}}} catch (IOException e) {e.printStackTrace();}}
}
  • UserInputHandler仍然需要阻塞式的等待用户的输入
  • 用户的输入延迟应非常小,所以线程必须时刻等待着用户的输入以便第一时间处理

总结

与用BIO模型实现的多人聊天室有什么区别

  • 使用Channel代替Stream
  • 使用Selector监控多条Channel
  • 可以在一个线程里处理多个Channel I/O

可能的面试题

1. NIO模型理解

  • NIO模型与BIO模型的主要区别是什么?

    • NIO模型是非阻塞的,支持多路复用,允许单个线程处理多个I/O通道,而BIO模型是阻塞的,每个连接都需要一个线程处理。NIO使用缓冲区和通道,而BIO使用流。
  • 在NIO模型中,Selector的作用是什么?它是如何工作的?

    • Selector是NIO模型的核心,用于监听多个通道的事件(如连接打开、数据到达)。Selector通过选择已准备好的通道集合来工作,这意味着线程可以等待直到一个或多个通道准备好进行I/O操作。

2. I/O多路复用

  • 解释一下I/O多路复用的概念及其在NIO中的应用。

    • I/O多路复用是指单个线程可以管理多个I/O通道的能力。在NIO中,通过Selector实现,它可以监控多个通道,当通道准备就绪时进行通知。
  • 在您的项目中,是如何实现I/O多路复用的?

    • 通过创建一个Selector并注册ServerSocketChannelSocketChannel的事件,如OP_ACCEPTOP_READ,然后在一个循环中调用selector.select()来处理所有事件。

3. 同步式I/O和异步I/O

  • 同步式I/O和异步I/O有什么区别?

    • 同步I/O需要等待I/O操作完成,而异步I/O不需要。在异步I/O中,I/O操作在后台执行,完成后通过回调、事件、或其他机制通知。
  • 在您的项目中,是如何处理同步和异步I/O的?

    • 项目主要使用同步I/O,通过在循环中检查I/O事件并处理它们。虽然NIO支持异步I/O,但在这个项目中没有使用Java的AIO。

4. 代码实现细节

  • ChatServer类中,您是如何注册ServerSocketChannelSocketChannelSelector的?

    • ServerSocketChannel注册到Selector上,设置为OP_ACCEPTSocketChannel注册为OP_READ
  • Selector.select()方法在您的项目中是如何使用的?

    • 在一个无限循环中调用selector.select(),等待I/O事件的发生,并处理这些事件。
  • 当一个新的客户端连接时,您是如何处理OP_ACCEPTOP_READ事件的?

    • 对于OP_ACCEPT,接受连接并将新的SocketChannel注册到Selector上。对于OP_READ,读取数据并转发给其他客户端。

5. 消息转发机制

  • forwardMessage方法是如何工作的?

    • 该方法遍历所有注册到SelectorSocketChannel,并将消息写入每个通道,除了发送消息的客户端。
  • 您是如何确保消息正确地从ByteBuffer中读取和写入的?

    • 使用ByteBufferclearflip方法在读写之间切换,确保数据正确处理。

6. 客户端实现

  • ChatClient类中,您是如何处理连接建立和消息发送的?

    • 客户端尝试连接到服务器,并将自身注册到Selector上以监听OP_CONNECTOP_READ事件。发送消息时,将数据写入ByteBuffer并发送。
  • 客户端是如何注册到Selector并监听服务器消息的?

    • 客户端在连接建立后将自身注册为OP_READ,以便接收服务器的消息。

7. 异常和资源管理

  • 您的项目是如何处理I/O异常的?

    • 通过捕获IOException并打印堆栈跟踪,同时确保资源被正确关闭。
  • 在关闭资源时,您采取了哪些措施以确保所有资源都被正确释放?

    • 使用try-with-resources语句和finally块确保SelectorSocketChannel被关闭。

8. 性能和优化

  • 您的项目在高并发情况下表现如何?是否有进行过性能测试?

    • 项目设计为处理高并发,但具体的性能测试还未进行。
  • 您认为项目中哪些部分还有优化空间?

    • 可以考虑引入更高效的编码方式,或者使用Java的AIO来进一步提高性能。

9. 安全性和编码

  • 您的聊天室项目是否考虑了安全性问题,比如数据加密?

    • 目前项目中没有实现数据加密,这是一个潜在的改进点。
  • 您在项目中使用了哪种字符编码?为什么选择这种编码?

    • 使用了UTF-8编码,因为它支持国际化并且是Java的默认编码。

10. 扩展性和维护性

  • 如果需要添加新功能,比如私聊或群组聊天,您会如何修改现有的架构?

    • 可以通过添加新的频道和事件处理逻辑来实现这些功能。
  • 您如何确保代码的可维护性和扩展性?

    • 通过模块化设计和清晰的代码结构来确保代码的可维护性和扩展性。

http://www.mrgr.cn/news/79681.html

相关文章:

  • CSS在线格式化 - 加菲工具
  • 【AI日记】24.12.12 kaggle 比赛 2-2 EDA
  • 设置笔记本同时连接内外网
  • OBS + SRS:打造专业级直播环境的入门指南
  • NocoBase搭建(下):安装NocoBase
  • C++的一些经典算法
  • 1.1.Flink的项目初始化和Hello-world
  • 在Ubuntu22.04 jammy下用qemu模型riscv32环境装鸿蒙(未完成,待续)
  • PDF处理的创新工具:福昕低代码平台尝鲜
  • leecode中的面试100题
  • 【推荐算法】单目标精排模型——DIN
  • 阿里云ack部署rabbitmq集群
  • 【sgUploadList】自定义组件:基于elementUI的el-upload封装的上传列表组件,适用于上传附件时
  • 第4章:颜色和背景 --[CSS零基础入门]
  • unity中的UI介绍
  • 外包干了5天,技术明显退步。。。。。
  • 《蓝桥杯比赛规划》
  • unity 2D像素种田游戏学习记录(自用)
  • LearnOpenGL学习(高级OpenGL --> 帧缓冲,立方体贴图,高级数据)
  • kubeadm安装K8s集群之基础环境配置
  • VCU——matlab/simulink软件建模
  • ubuntu22.04 使用可以用的镜像源获取你要的镜像
  • Redisson分布式锁
  • Win10环境vscode+latex+中文快速配置
  • Java-JMX 组件架构即详解
  • Oracle DataGuard启动与关闭顺序