当前位置: 首页 > news >正文

Netty初体验-1-NIO基础补漏

文章目录

  • 学习netty之前先学习NIO基础
    • NIO基础
      • 1.三大组件(Channel,Buffer,Selector)
      • 2. ByteBuffer
      • 3.文件编程
      • 4.网络编程
        • 网络编程小结:
          • 非阻塞 VS 阻塞
          • 1.1 阻塞
          • 1.2 非阻塞
          • 1.3 多路复用
          • selector何时不阻塞
          • 监听channel事件
          • 更进一步优化
      • 5. NIO vs BIO
        • 5.1 stream VS channel
        • 5.2 IO模型(下面的几个IO概念可能不清晰,有个大概,如果要详细看可以百度认真了解IO概念)
        • 5.3零拷贝(针对java里面不用拷贝)
          • 传统IO问题
          • NIO优化
          • 进一步优化
          • 进一步优化(linux2.4)
        • 5.4 AIO
          • 文件AIO
          • 网络AIO

学习netty之前先学习NIO基础

NIO基础

1.三大组件(Channel,Buffer,Selector)

  • 1.1 Channel (读写双向管道)和 Buffer(可以将Channel的数据读入Buffer,也可将buffer的数据写入channel)
    • 常见的Channel有
      • 1.FileChannel (使用)
      • 2.DatagramChannel (使用UDP网络时候)
      • 3.SocketChannel (使用TCP的时候,服务器和客户端都可以用)
      • 4.ServerSocketChannel (使用TCP的时候,专用于服务器的时候)
    • 常见的Buffer缓冲区
      • ByteBuffer(常用)
        • MappedByteBuffer
        • DirectByteBuffer
        • HeapByteBuffer
      • ShortBuffer
      • IntBuffer
      • LongBuffer
      • FloatBuffer
      • DoubleBuffer
      • CharBuffer
  • 1.2 Selector
    • 多线程版本设计,一个客户端一个socket线程,多了的话,可能造成内存溢出。缺点内存占用高;线程上下文切换成本高;只适合连接数少的场景。
    • 线程池版本的设计,缺点:阻塞模式下,线程仅能处理一个socket连接;仅适合短链接的场景。(适合http请求)
    • Selector版本的设计。selector的作用就是配合一个线程来管理多个channel,获取这些channel上发生的事件,这些channel工作在非阻塞模式下,不会让线程吊死在一个channel上,适合连接数多,流量低的场景(low traffic);调用selector的select()方法会阻塞直到发生了读写就绪事件,一旦这些事件发生,selector方法就会返回这些事件交给thread处理。

2. ByteBuffer

    1. ByteBuffer案例:
package com.dapeng.netty;import lombok.extern.slf4j.Slf4j;import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;@Slf4j
public class TestByteBuffer {public static void main(String[] args) {
//        FileChannel
//        1.输入输出流 2.RandomAccessFIletry (FileChannel channel = new FileInputStream("data.txt").getChannel()) {
//            准备缓冲区ByteBuffer buffer = ByteBuffer.allocate(10);// 初始化容量为10bytewhile (true){
//                从channel读取,向缓冲区buffer写入int len = channel.read(buffer);// 当返回-1表示读取完成log.debug("读取到的字节数为:{}",len);if (len == -1){break;}
//                打印buffer内容buffer.flip();// 切换读模式while (buffer.hasRemaining()){byte b = buffer.get();// get()默认读取一个log.debug("读取到的实际字节:{}",(char)b);}// 再切入到写模式,上面可能要写入buffer.clear(); // 切换到写}} catch (IOException e) {}}
}
  • 2.ByteBuffer正确使用姿势
    • 2.1向buffer写入数据的时候,例如调用channel.read(buffer)。
    • 2.2 调用buffer.filp(); 切换为读模式
    • 2.3 从buffer读取数据,例如调用buffer.get();获取单个字节
    • 2.4 调用buffer.clear(),或者compact() 切换至 写模式
    • 2.5 重复1-4步骤。
  • 3.ByteBuffer结构
    • 3.1ByteBuffer有一下几个重要属性:
      • Capacity(容量)
      • Position(当前位置)
      • LImit(写入限制)
        如下图所示:
    • 3.2 读写切换
      • 1.刚开始为写模式:在这里插入图片描述
        在这里插入图片描述
      • 2 切换读的时候,调用filp()
        在这里插入图片描述
      • 3.读取完后,如图: 在这里插入图片描述
        1. 切换回来为写模式,调用clear()方法。
          在这里插入图片描述
      • 5.特殊的compact的方法,在读取一部分或者因为默写情况未读完,则需要在未读的数据后面追加写入。从未读完的那里开始,重新写入。
        在这里插入图片描述
        读写实例:
        (以下所有代码需要用到该类)首先拿到工具类:
package com.dapeng.netty.utils;import java.nio.ByteBuffer;
import static io.netty.util.internal.StringUtil.NEWLINE;
import static io.netty.util.internal.MathUtil.isOutOfBounds;
import io.netty.util.internal.StringUtil;public class ByteBufferUtil {private static final char[] BYTE2CHAR = new char[256];private static final char[] HEXDUMP_TABLE = new char[256 * 4];private static final String[] HEXPADDING = new String[16];private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];private static final String[] BYTE2HEX = new String[256];private static final String[] BYTEPADDING = new String[16];static {final char[] DIGITS = "0123456789abcdef".toCharArray();for (int i = 0; i < 256; i++) {HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];}int i;// Generate the lookup table for hex dump paddingsfor (i = 0; i < HEXPADDING.length; i++) {int padding = HEXPADDING.length - i;StringBuilder buf = new StringBuilder(padding * 3);for (int j = 0; j < padding; j++) {buf.append("   ");}HEXPADDING[i] = buf.toString();}// Generate the lookup table for the start-offset header in each row (up to 64KiB).for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {StringBuilder buf = new StringBuilder(12);buf.append(NEWLINE);buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));buf.setCharAt(buf.length() - 9, '|');buf.append('|');HEXDUMP_ROWPREFIXES[i] = buf.toString();}// Generate the lookup table for byte-to-hex-dump conversionfor (i = 0; i < BYTE2HEX.length; i++) {BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);}// Generate the lookup table for byte dump paddingsfor (i = 0; i < BYTEPADDING.length; i++) {int padding = BYTEPADDING.length - i;StringBuilder buf = new StringBuilder(padding);for (int j = 0; j < padding; j++) {buf.append(' ');}BYTEPADDING[i] = buf.toString();}// Generate the lookup table for byte-to-char conversionfor (i = 0; i < BYTE2CHAR.length; i++) {if (i <= 0x1f || i >= 0x7f) {BYTE2CHAR[i] = '.';} else {BYTE2CHAR[i] = (char) i;}}}/*** 打印所有内容* @param buffer*/public static void debugAll(ByteBuffer buffer) {int oldlimit = buffer.limit();buffer.limit(buffer.capacity());StringBuilder origin = new StringBuilder(256);appendPrettyHexDump(origin, buffer, 0, buffer.capacity());System.out.println("+--------+-------------------- all ------------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);System.out.println(origin);buffer.limit(oldlimit);}/*** 打印可读取内容* @param buffer*/public static void debugRead(ByteBuffer buffer) {StringBuilder builder = new StringBuilder(256);appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());System.out.println("+--------+-------------------- read -----------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());System.out.println(builder);}private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {if (isOutOfBounds(offset, length, buf.capacity())) {throw new IndexOutOfBoundsException("expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length+ ") <= " + "buf.capacity(" + buf.capacity() + ')');}if (length == 0) {return;}dump.append("         +-------------------------------------------------+" +NEWLINE + "         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |" +NEWLINE + "+--------+-------------------------------------------------+----------------+");final int startIndex = offset;final int fullRows = length >>> 4;final int remainder = length & 0xF;// Dump the rows which have 16 bytes.for (int row = 0; row < fullRows; row++) {int rowStartIndex = (row << 4) + startIndex;// Per-row prefix.appendHexDumpRowPrefix(dump, row, rowStartIndex);// Hex dumpint rowEndIndex = rowStartIndex + 16;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(" |");// ASCII dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append('|');}// Dump the last row which has less than 16 bytes.if (remainder != 0) {int rowStartIndex = (fullRows << 4) + startIndex;appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);// Hex dumpint rowEndIndex = rowStartIndex + remainder;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(HEXPADDING[remainder]);dump.append(" |");// Ascii dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append(BYTEPADDING[remainder]);dump.append('|');}dump.append(NEWLINE +"+--------+-------------------------------------------------+----------------+");}private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {if (row < HEXDUMP_ROWPREFIXES.length) {dump.append(HEXDUMP_ROWPREFIXES[row]);} else {dump.append(NEWLINE);dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));dump.setCharAt(dump.length() - 9, '|');dump.append('|');}}public static short getUnsignedByte(ByteBuffer buffer, int index) {return (short) (buffer.get(index) & 0xFF);}}
  • 案例1
package com.dapeng.netty;import java.nio.ByteBuffer;import static com.dapeng.netty.utils.ByteBufferUtil.debugAll;public class TestByteBufferReadWrite {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put((byte) 0x61); // 'a'debugAll(buffer); // 打印当前buffer里面的内容。buffer.put(new byte[]{ 0x62, 0x63, 0x64});debugAll(buffer);// 切换读模式buffer.flip();System.out.println("读模式" + buffer.get());debugAll(buffer);// 切换到compact模式buffer.compact();debugAll(buffer);// 切换到compact后,会往前追加未读的数据,并且会残留最后一位数据,下次写的时候直接覆盖。buffer.put(new byte[]{ 0x65, 0x6f});// 继续写debugAll(buffer);}
}
  • 4.ByteBuffer常见的方法
    • 4.1 Allocate()
package com.dapeng.netty;import java.nio.ByteBuffer;public class TestByteBufferAllocate {public static void main(String[] args) {System.out.println(ByteBuffer.allocate(16).getClass());System.out.println(ByteBuffer.allocateDirect(16).getClass());//       class java.nio.HeapByteBuffer  - java 堆内存,读写效率较低,受到GC影响
//       class java.nio.DirectByteBuffer  - 直接内存,读写效率高(少一次拷贝),不会收到GC影响,分配效率低,使用过不当会造成内存泄露。}
}
  • 4.2 从buffer写入数据
    • 调用channel的read方法。
    • 调用buffer的put方法。
	int readBytes = channel.read(buf);//返回读取的字节数buf.put((byte)127);
  • 4.3 从buffer读取数据
    • 调用channel的write方法
    • 调用channel的get方法
 int writeBytes = channel.write(buf);// 返回读取到的字节的ascII码。buf.get();//获取单个字节

在这里插入图片描述

  • 4.4 读取案例:(没有工具在上面有一个工具找一下)
package com.dapeng.netty;import java.nio.ByteBuffer;import static com.dapeng.netty.utils.ByteBufferUtil.debugAll;/*** 演示buffer的读取*/
public class TestBufferRead {public static void main(String[] args) {ByteBuffer buf = ByteBuffer.allocate(10);buf.put(new byte[]{'a','b','c','d'});buf.flip();//        buf.get(new byte[4]);
//        debugAll(buf); // 读取4个长度
//        buf.rewind();// 从头开始读
//        System.out.println((char)buf.get());// 获取第一个//        debugAll(buf);
//        System.out.println((char)buf.get());// 获取'a'
//        System.out.println((char)buf.get());// 获取'b'
//        buf.mark();// 在下标为2的地方加标记
//        System.out.println((char)buf.get());//获取'c'
//        System.out.println((char)buf.get());// 获取'd'
//        buf.reset();
//        System.out.println((char)buf.get());// 获取下标为2的,'c'
//        System.out.println((char)buf.get());// 获取'd'System.out.println((char) buf.get(2));// 获取'c' 通过get并不会改索引的位置debugAll(buf);}
}
  • 4.5 String和ByteBuffer互相转换
    需要注意,使用allocate定义的bytebuffer在从字符串转byte的时候需要把buffer切换为读模式。(本次案例中先使用写模式,再切换为读模式并输出字符串)
package com.dapeng.netty;import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;import static com.dapeng.netty.utils.ByteBufferUtil.debugAll;public class TestByteBufferStringToBuffer {public static void main(String[] args) {
//        1.字符串转bufferByteBuffer buffer = ByteBuffer.allocate(16);buffer.put("hello".getBytes());debugAll(buffer);//      2.CharSetByteBuffer buffer2 = StandardCharsets.UTF_8.encode("hello");debugAll(buffer2);//      3.wrapByteBuffer buffer3 = ByteBuffer.wrap("hello".getBytes());debugAll(buffer3);//        buffer转字符串// 2和3已经切换为读模式了,所以可以直接decode,像第一种方式,需要进行读模式的切换,才能够正确decodeString s = StandardCharsets.UTF_8.decode(buffer2).toString();System.out.println(s);buffer.flip();// 切换读String s2 = StandardCharsets.UTF_8.decode(buffer).toString();System.out.println(s2);}
}
  • 4.6 ScatteringRead分散读(读取指定字节数)
    -----不考虑分散读,需要先进行,完整的读取文件内容,再对单词进行分割,再把每个分割的放到一个ByteBuffer。
    -----分散读目的为了少拷贝,提升效率。这是一种思想
    案例:
package com.dapeng.netty;import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;import static com.dapeng.netty.utils.ByteBufferUtil.debugAll;public class TestSactteringRead {public static void main(String[] args) {
//        使用RandomAccessFile读取(之前使用InputStream)try (FileChannel channel = new RandomAccessFile("words.txt","r").getChannel()) {
//            来三个ByteBuffer,分散读取已知的长度ByteBuffer b1 = ByteBuffer.allocate(3);ByteBuffer b2 = ByteBuffer.allocate(3);ByteBuffer b3 = ByteBuffer.allocate(5);channel.read(new ByteBuffer[]{b1, b2, b3});// 写入到ByteBuffer里面//            切换ByteBuffer写模式到读模式b1.flip();b2.flip();b3.flip();debugAll(b1);debugAll(b2);debugAll(b3);} catch (IOException e) {}}
}
  • 4.7 集中写GatheringWrite
package com.dapeng.netty;import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;public class TestGatheringWrite {public static void main(String[] args) {ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello");ByteBuffer b2 = StandardCharsets.UTF_8.encode("world");ByteBuffer b3 = StandardCharsets.UTF_8.encode("你好");//        先来一个channeltry (FileChannel channel = new RandomAccessFile("write2.txt","rw").getChannel()) {//        集中写入一个ByteBufferchannel.write(new ByteBuffer[]{b1, b2, b3});} catch (IOException e) {}}
}
  • 4.8黏包,半包处理。
    • 黏包:我理解的是客户端发送数据时,将含有\n或者别的字符一起发送,导致连在一起。
    • 半包:当缓冲区ByteBuffer设置不足,会导致缓冲区满了,然后等下次接收导致接受的数据会断开。
      案例
package com.dapeng.netty;import io.netty.buffer.ByteBuf;import java.nio.ByteBuffer;import static com.dapeng.netty.utils.ByteBufferUtil.debugAll;public class TestBufferExam {public static void main(String[] args) {/*网络上有多条数据发送给服务器,数据之间使用 \n 进行分割但由于某种原因,这些数据在接收的时候,被重新组合,例如三条数据Hello,world\nI'm Zhangsan\nHow are you?\n变成了下面:Hello,world\nI'm Zhangsan\nHow are you?\n现在要求编写程序,将错乱的数据恢复成原样,按 \n 分割的数据。*///        模拟接收到的数据并处理ByteBuffer source = ByteBuffer.allocate(32);source.put("Hello,world\nI'm Zhangsan\nHo".getBytes());split(source); // 分割数据source.put("w are you?\n".getBytes());split(source);}private static void split(ByteBuffer source){
//        先切换到读模式source.flip();
//        对数据进行处理for (int i = 0; i < source.limit(); i++) {
//            找到一条完整的信息if (source.get(i) == '\n') { // 字符'\n'
//                如果遇到了\n,则说明是一条完整的信息
//                重新新建一个ByteBuffer来接收int length = i + 1 - source.position();ByteBuffer target = ByteBuffer.allocate(length);
//                从source读,像target写for (int j = 0; j < length; j++) {target.put(source.get());}debugAll(target);}}//        处理完以后切换到写模式source.compact();}
}

3.文件编程

注意
FileChannel只能工作在阻塞模式下

  • 3.1获取FileChannel
    在这里插入图片描述
    在RandomAccessFIle里面获取的时候需要指定模式,第二个参数为读取模式,读写模式:‘rw’
  • 3.2 读取:
    会从Channel读取数据填充ByteBuffer,返回值代表读到了多少字节,-1表示到了文件的末尾。
   int readByte = channel.read(buffer);
  • 3.3 写入
    正确姿势:
ByteBuffer buffer = ...;
buffer.put("...");// 给buffer填充内容
buffer.flip();// 切换读
whie(buffer.hasRemaining){channel.write(buffer);//在while中调用write,因为channel一次write可能写不完,所以先判断buffer是否还有剩余,后续的socketchannel也用此模式。
}
  • 3.4 当前位置
long pos = channel.position();// 设置当前位置:
long newPos = ...;
channel.position(newPos);

设置当前位置时,如果设置在文件的末尾
- 这时候读取就会返回-1,
- 这时候写入,会追加内容,注意如果position超过了文件的末尾,再写入新的内容和原来的内容会有空洞(00)

  • 3.5 获取当前文件大小
channel.size();// 获取文件大小
  • 3.6 强制写入
    操作系统出于性能的考虑,会将FileChannel的数据存到系统的缓存当中,不是立刻写入磁盘,FileChannel可以调用force(true),方法来将文件内容和元数据(文件的权限信息等)立刻写入磁盘。
  • 3.7 TransferTo方法,将一个文件的内容复制到另一个文件里面
    此方法只能传输2g内容,如果超过2g,利用循环重复传输
package com.dapeng.netty;import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;public class TestTansferTo {public static void main(String[] args) {// 定义两个channeltry (FileChannel from = new FileInputStream("data.txt").getChannel();FileChannel to = new FileOutputStream("to.txt").getChannel();
;) {// 使用TransferTo方法// 效率高,底层使用操作系统的零拷贝方法。//--改进long size = from.size();// left代表剩余的大小for(long left = size; size > 0;){          // size-left 代表,从此位置开始,传输left大小的长度,到to的channel里面。   	// transferto返回转移多少大小left -= from.transferTo((size - left),left,to);}} catch (IOException e) {e.printStackTrace();}}
}
  • 3.8 Path 和 Paths
    在这里插入图片描述
    在这里插入图片描述
  • 3.9 Files
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    如果目录中含有内容,会抛出异常DirectoryNotEmptyException
  • 3.10 遍历目录Files.walkFileTree(Paths.get(“…”),new SimpleFileVisitor(){})
package com.dapeng.netty;import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.atomic.AtomicInteger;public class TestWalkFile {public static void main(String[] args) throws IOException {// 定义两个计数器:AtomicInteger dirCount = new AtomicInteger();AtomicInteger filCount = new AtomicInteger();Files.walkFileTree(Paths.get("...../jdk-1.8.jdk"),new SimpleFileVisitor<Path>(){@Override// 此方法是便利目录前看看public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {System.out.println("====>当前目录为:"  + dir);dirCount.incrementAndGet(); // 加1return super.preVisitDirectory(dir, attrs);}@Override// 遍历到文件后怎么做public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {System.out.println("当前文件为:" + file);filCount.incrementAndGet();// +1return super.visitFile(file, attrs);}});System.out.println("目录总数为:"  + dirCount);System.out.println("文件总数为:"  + filCount);}
}
//========
如果查看jar包,如下AtomicInteger jarCount = new AtomicInteger();Files.walkFileTree(Paths.get("/Library/Java/JavaVirtualMachines/jdk-1.8.jdk"),new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {if (file.toString().endsWith(".jar")) {System.out.println("jar file ===>" + file);jarCount.incrementAndGet();}return super.visitFile(file, attrs);}});System.out.println("jarCount==>" + jarCount);
  • 3.11 文件目录的删除
    ==注意删除不可逆,路径名一定谨慎填写 ==
package com.dapeng.netty;import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;public class TestWalkFileTreeDelete {public static void main(String[] args) throws IOException {Files.walkFileTree(Paths.get("路径名"),new SimpleFileVisitor<Path>(){@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {// 在访问文件的时候,执行删除文件操作Files.delete(file);return super.visitFile(file, attrs);}@Overridepublic FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {// 在退出目录的时候,执行删除目录操作Files.delete(dir);return super.postVisitDirectory(dir, exc);}});}
}
  • 3-12 文件的复制(从一个文件夹复制到另一个文件夹,所有内容)
package com.dapeng.netty;import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;public class TestFileCopy {public static void main(String[] args) throws IOException {String source = "路径1";// 已存在String target  = "路径2";// 不存在Files.walk(Paths.get(source)).forEach(path -> {// 替换掉路径1中目录的前缀的路径try {String targetName = path.toString().replace(source,target);if (Files.isDirectory(path)){// 如果是目录,新建目录Files.createDirectory(Paths.get(targetName));}else if (Files.isRegularFile(path)){// 如果是文件,则复制Files.copy(path,Paths.get(targetName));}} catch (IOException e) {e.printStackTrace();}});}
}

4.网络编程

  • 1.阻塞和非阻塞
    在这里插入图片描述
  • 2.阻塞案例:
    服务端
package com.dapeng.netty.c4;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;import static com.dapeng.netty.utils.ByteBufferUtil.debugRead;@Slf4j
public class Server {public static void main(String[] args) throws IOException {
//        使用nio的阻塞模式//        0.创建ByteBufferByteBuffer buffer = ByteBuffer.allocate(16);//        1.创建服务器ServerSocketChannel ssc = ServerSocketChannel.open();//        2.绑定端口ssc.bind(new InetSocketAddress(8080));//        3.连接集合List<SocketChannel> channels = new ArrayList<>();while (true){
//            4.accept 建立与客户端的连接,SocketChannel用于与客户端之间通信log.debug("连接中....   connecting...");SocketChannel sc  =ssc.accept();// 阻塞方法,线程停止运行log.debug("连接成功 connected {}",sc);channels.add(sc);for (SocketChannel channel : channels) {// 5.接收客户端发送的消息log.debug("before read...  读取数据之前... {}",channel);channel.read(buffer); // 阻塞方法,线程停止运行
//              6.切换为读模式buffer.flip();debugRead(buffer);//7.切换为写模式buffer.clear();log.debug("after read... 读取数据后...{}",channel);}}}
}

客户端
通过Debug模式,然后调用sc.write(Charset.defaultCharset.encode(str));来向服务端发送消息。

package com.dapeng.netty.c4;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;public class Client {public static void main(String[] args) throws IOException {SocketChannel sc =SocketChannel.open();sc.connect(new InetSocketAddress("localhost",8080));System.out.println("waiting ......");}
}

小结
阻塞模式下,某个方法的执行,都会影响另一个方法的执行,比如上面案例的ssc.accept()会阻断后面代码的运行,一直等待连接,channel.read(buffer),也会阻断线程,让线程停止运行,等待客户端发送数据。

  • 非阻塞案例,使用单线程(在阻塞案例上面做了一点修改)
package com.dapeng.netty.c4;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;import static com.dapeng.netty.utils.ByteBufferUtil.debugRead;@Slf4j
public class Server {public static void main(String[] args) throws IOException {
//        使用nio的阻塞模式//        0.创建ByteBufferByteBuffer buffer = ByteBuffer.allocate(16);//        1.创建服务器ServerSocketChannel ssc = ServerSocketChannel.open();// 设置非阻塞模式--------修改的地方ssc.configureBlocking(false);// 默认阻塞模式,false代表非阻塞。//        2.绑定端口ssc.bind(new InetSocketAddress(8080));//        3.连接集合List<SocketChannel> channels = new ArrayList<>();while (true){
//            4.accept 建立与客户端的连接,SocketChannel用于与客户端之间通信
//            log.debug("连接中....   connecting...");SocketChannel sc  = ssc.accept();// 阻塞方法,线程停止运行,非阻塞模式下如果没有连接则为空//--------修改的地方if (sc != null){log.debug("连接成功 connected {}",sc);// 这里SocketChannel也要设置为非阻塞模式,即连接的客户端sc.configureBlocking(false);// socketChannel设置为非阻塞模式下,read默认0channels.add(sc);}for (SocketChannel channel : channels) {// 5.接收客户端发送的消息
//                log.debug("before read...  读取数据之前... {}",channel);int read = channel.read(buffer);// 阻塞方法,线程停止运行,非阻塞模式下,默认0if (read > 0){
//                  6.切换为读模式buffer.flip();debugRead(buffer);//7.切换为写模式buffer.clear();log.debug("after read... 读取数据后...{}",channel);}}}}
}
    1. Selector
      selector在事件未处理时,不会阻塞,会一直循环,要么处理,要么取消,例如下面代码
package com.dapeng.netty.c4;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Iterator;@Slf4j
public class ServerSelector {public static void main(String[] args) throws IOException {
//        1.创建selector,管理多个channelSelector selector = Selector.open(); // 通过静态方法调用ServerSocketChannel ssc = ServerSocketChannel.open();// 创建serversocketchannelssc.configureBlocking(false);// 设置非阻塞//        2.建立selector和selector的联系(注册),0代表不监听任何事件SelectionKey sscKey = ssc.register(selector, 0, null);
//        key只关注监听事件sscKey.interestOps(SelectionKey.OP_ACCEPT);log.debug("register key {}",sscKey);ssc.bind(new InetSocketAddress(8080));while (true) {
//            3.selector方法,没有事件发生的时候,阻塞,有事件的时候线程才会恢复。
//            如果select事件没有被处理,例如channel.accept();会直接把未处理事件加入到selectorKeys的set集合里面,继续循环。直到处理selector.select();//阻塞
//            4.处理事件Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();if (iterator.hasNext()) {SelectionKey key = iterator.next();log.debug("循环的key: {}",key);ServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel sc = channel.accept();log.debug("连接的 Channel {}",sc);// 如果不处理,必须取消掉// key.cancel();}}}
}
    1. 为了防止死循环,下面做了if判断,还有就是selectionKeys和selectedKeys的区别,注册时候把所有的channel注册到selectionKeys中,当发生事件的时候,会把对象复制到selectedKeys里面,当事件发生完后,此时的selectedkey会被标记为已处理,但是还留在selectedKeys里面,需要手动删除。在迭代器中调用iterator.remove()方法。
package com.dapeng.netty.c4;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;import static com.dapeng.netty.utils.ByteBufferUtil.debugRead;@Slf4j
public class ServerSelector {public static void main(String[] args) throws IOException {
//        1.创建selector,管理多个channelSelector selector = Selector.open(); // 通过静态方法调用ServerSocketChannel ssc = ServerSocketChannel.open();// 创建serversocketchannelssc.configureBlocking(false);// 设置非阻塞//        2.建立selector和selector的联系(注册),0代表不监听任何事件SelectionKey sscKey = ssc.register(selector, 0, null);
//        key只关注监听事件sscKey.interestOps(SelectionKey.OP_ACCEPT);log.debug("register key {}",sscKey);ssc.bind(new InetSocketAddress(8080));while (true) {
//            3.selector方法,没有事件发生的时候,阻塞,有事件的时候线程才会恢复。
//            如果select事件没有被处理,例如channel.accept();会直接把未处理事件加入到selectorKeys的set集合里面,继续循环。直到处理selector.select();//阻塞
//            4.处理事件Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();if (iterator.hasNext()) {SelectionKey key = iterator.next();// 处理key时候,要从selectedkeys里面把已经处理过的key给删除,防止下次循环出现问题。iterator.remove();log.debug("循环的key: {}",key);
//              5.区分事件类型if(key.isAcceptable()){// 如果是acceptServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel sc = channel.accept();// 如果建立了连接,就会被标记位处理过了,他的事件会被移除,但是key不会从selectedKey中移除。sc.configureBlocking(false);// 设置非阻塞SelectionKey scKey = sc.register(selector, 0, null);// 给客户端连接channel注册读事件scKey.interestOps(SelectionKey.OP_READ);log.debug("{}",sc);log.debug("scKey {}",scKey);}else if (key.isReadable()){SocketChannel channel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(16);channel.read(buffer);buffer.flip();// 切换读debugRead(buffer);}}}}
}

红色为所有注册到seleector的集合,绿色为发生事件时候把红色中的给复制过来,来发生事件,发生完了后会被标记,但不会删除,需要手动删除。
在这里插入图片描述
处理客户端的异常情况,和防止服务端终止

package com.dapeng.netty.c4;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;import static com.dapeng.netty.utils.ByteBufferUtil.debugRead;@Slf4j
public class ServerSelector {public static void main(String[] args) throws IOException {
//        1.创建selector,管理多个channelSelector selector = Selector.open(); // 通过静态方法调用ServerSocketChannel ssc = ServerSocketChannel.open();// 创建serversocketchannelssc.configureBlocking(false);// 设置非阻塞//        2.建立selector和selector的联系(注册),0代表不监听任何事件SelectionKey sscKey = ssc.register(selector, 0, null);
//        key只关注监听事件sscKey.interestOps(SelectionKey.OP_ACCEPT);log.debug("register key {}",sscKey);ssc.bind(new InetSocketAddress(8080));while (true) {
//            3.selector方法,没有事件发生的时候,阻塞,有事件的时候线程才会恢复。
//            如果select事件没有被处理,例如channel.accept();会直接把未处理事件加入到selectorKeys的set集合里面,继续循环。直到处理selector.select();//阻塞
//            4.处理事件Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();if (iterator.hasNext()) {SelectionKey key = iterator.next();// 处理key时候,要从selectedkeys里面把已经处理过的key给删除,防止下次循环出现问题。iterator.remove();log.debug("循环的key: {}",key);
//              5.区分事件类型if(key.isAcceptable()){// 如果是acceptServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel sc = channel.accept();// 如果建立了连接,就会被标记位处理过了,他的事件会被移除,但是key不会从selectedKey中移除。sc.configureBlocking(false);// 设置非阻塞SelectionKey scKey = sc.register(selector, 0, null);// 给客户端连接channel注册读事件scKey.interestOps(SelectionKey.OP_READ);log.debug("{}",sc);log.debug("scKey {}",scKey);}else if (key.isReadable()){try {SocketChannel channel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(16);int read = channel.read(buffer);// 如果正常断开,read方法返回-1;if (read == -1){key.cancel();}else {buffer.flip();// 切换读debugRead(buffer);}} catch (IOException e) {e.printStackTrace();// 因为客户端断开了,因此需要将key从selector 的key的集合中真正删除掉keykey.cancel();}}}}}
}
    1. 处理消息的边界问题(汉字占win10占3个字节,导致出现乱码)
      在这里插入图片描述
    • 5.1按照约定的规则来,但有可能会造成空间的浪费:(比如约定好都是1024个字节,然后第一个客户端发了满满的,第二个客户端只发了100个字节,后面的空白会造成带宽的浪费)
      在这里插入图片描述

    • 5.2 第二种方式,通过\n来分割,但有问题的是如果传输的内容长度,大于临时的ByteBuffer,会造成读不全,然后得考虑扩容问题。还有就是效率低下。
      在这里插入图片描述

    • 5.3 (Http 1.0 是TLV格式的,Http 2.0 是LTV格式,L:length,T:type,V:value)第三种方式,比较常用的,先读取一个整形,发现需要4byte,然后分配一个4字节长度的ByteBuffer,然后在读一个整形,发现需要8byte长度的ByteBuffer,然后按需分配就行。
      在这里插入图片描述

    • 5.4 使用附属品(attachment)来解决各个channel之间ByteBuffer的隔离。并使用扩容处理消息边界问题。
      在这里插入图片描述

    通过在处理读取事件的时候,注册selecetor上面的同时添加一个attachment,把ByteBuffer注册到上面,然后再到后面处理数据的时候,使用自己的split(Bytebuffer buffer )方法来打印处理,后面做判断buffer的position和limit相等的时候说明buffer满了,然后进行扩容,拷贝到新ByteBuffer上面。

	package com.dapeng.netty.c4;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;import static com.dapeng.netty.utils.ByteBufferUtil.debugAll;
import static com.dapeng.netty.utils.ByteBufferUtil.debugRead;@Slf4j
public class ServerSelector {private static void split(ByteBuffer source){
//        先切换到读模式source.flip();
//        对数据进行处理for (int i = 0; i < source.limit(); i++) {
//            找到一条完整的信息if (source.get(i) == '\n') { // 字符'\n'
//                如果遇到了\n,则说明是一条完整的信息
//                重新新建一个ByteBuffer来接收int length = i + 1 - source.position();ByteBuffer target = ByteBuffer.allocate(length);
//                从source读,像target写for (int j = 0; j < length; j++) {target.put(source.get());}debugAll(target);}}//        处理完以后切换到写模式,如果压缩后的position和limit相等,compact方法会将未读的数据往前压缩,然后令positoin等于未读的位置,比如16字节position = 16 ,limit = 16.source.compact();}public static void main(String[] args) throws IOException {
//        1.创建selector,管理多个channelSelector selector = Selector.open(); // 通过静态方法调用ServerSocketChannel ssc = ServerSocketChannel.open();// 创建serversocketchannelssc.configureBlocking(false);// 设置非阻塞//        2.建立selector和selector的联系(注册),0代表不监听任何事件SelectionKey sscKey = ssc.register(selector, 0, null);
//        key只关注监听事件sscKey.interestOps(SelectionKey.OP_ACCEPT);log.debug("register key {}",sscKey);ssc.bind(new InetSocketAddress(8080));while (true) {
//            3.selector方法,没有事件发生的时候,阻塞,有事件的时候线程才会恢复。
//            如果select事件没有被处理,例如channel.accept();会直接把未处理事件加入到selectorKeys的set集合里面,继续循环。直到处理selector.select();//阻塞
//            4.处理事件Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();if (iterator.hasNext()) {SelectionKey key = iterator.next();// 处理key时候,要从selectedkeys里面把已经处理过的key给删除,防止下次循环出现问题。iterator.remove();log.debug("循环的key: {}",key);
//              5.区分事件类型if(key.isAcceptable()){// 如果是acceptServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel sc = channel.accept();// 如果建立了连接,就会被标记位处理过了,他的事件会被移除,但是key不会从selectedKey中移除。sc.configureBlocking(false);// 设置非阻塞// 第三个参数为附件,添加attachmentByteBuffer buffer = ByteBuffer.allocate(16);SelectionKey scKey = sc.register(selector, 0, buffer);// 给客户端连接channel注册读事件,
//                    SelectionKey scKey = sc.register(selector, 0, null);// 给客户端连接channel注册读事件,scKey.interestOps(SelectionKey.OP_READ);log.debug("{}",sc);log.debug("scKey {}",scKey);}else if (key.isReadable()){try {SocketChannel channel = (SocketChannel) key.channel();
//                        ByteBuffer buffer = ByteBuffer.allocate(16);// 1.把从注册到selector上面的channelkey的附件拿到ByteBuffer buffer = (ByteBuffer) key.attachment();int read = channel.read(buffer);// 如果正常断开,read方法返回-1;if (read == -1){key.cancel();}else {
//                            buffer.flip();// 切换读
//                            debugRead(buffer);split(buffer);// 如果buffer的position和limit相等,则说明buffer满了,需要处理扩容if (buffer.position() == buffer.limit()){ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);buffer.flip();// 切换读newBuffer.put(buffer);key.attach(newBuffer);// 把新的buffer作为附件传送上去。}}} catch (IOException e) {e.printStackTrace();// 因为客户端断开了,因此需要将key从selector 的key的集合中真正删除掉keykey.cancel();}}}}}
}
    1. ByteBuffer大小分配
      在这里插入图片描述
  • 7.处理可写事件。
    服务端
package com.dapeng.netty.c4;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;public class WriteServer {public static void main(String[] args) throws IOException {// 1.建立服务器channelServerSocketChannel ssc = ServerSocketChannel.open();// 2. 设置非阻塞ssc.configureBlocking(false);// 3. 建立selector选择器,防止服务器循环轮询Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT); // 4.设置监听连接// 5.绑定端口ssc.bind(new InetSocketAddress(8080));while (true) {// 6.通过selector阻塞selector.select();// 7.迭代所有的selectorkeysIterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();// 8.去除掉已经标记完的selectedKeyiterator.remove();// 9.如果为可连接的keyif (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);// 连接的客户端让其读写为非阻塞SelectionKey sckey = sc.register(selector, 0, null);// 把客户端绑定到selectorsckey.interestOps(SelectionKey.OP_READ);// 绑定可读事件// 向客户端发送大量数据StringBuilder builder = new StringBuilder();for (int i = 0; i < 30000000; i++) {builder.append("a");}ByteBuffer buffer = Charset.defaultCharset().encode(builder.toString());int write = sc.write(buffer);// 返回值代表写入的字节数System.out.println(write);if (buffer.hasRemaining()) {//关注可读可写事件。sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
//                      sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE;// 采用附件方式,把未写完的buffer放到附件中sckey.attach(buffer);}} else if (key.isWritable()) {ByteBuffer buffer = (ByteBuffer) key.attachment();SocketChannel channel = (SocketChannel) key.channel();// 继续向客户端发送消息int write = channel.write(buffer);System.out.println(write);// 清理操作,buffer写完后再去把buffer清理掉,防止占内存。还有所关注的可写事件if ( !buffer.hasRemaining()){key.attach(null);// 清除bufferkey.interestOps(key.interestOps() - SelectionKey.OP_WRITE);// 不需要关注可写事件}}}}}
}

客户端

package com.dapeng.netty.c4;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;public class WriteClient {public static void main(String[] args) throws IOException {SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress(8080));int count = 0;// 接收数据while (true){ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);int read = sc.read(buffer);// 返回读取的字节数count += read;System.out.println(count);buffer.clear();// 清除buffer,下次继续写入buffer。/切换到写。}}
}
网络编程小结:
非阻塞 VS 阻塞
1.1 阻塞

在这里插入图片描述

1.2 非阻塞

在这里插入图片描述

1.3 多路复用

在这里插入图片描述

selector何时不阻塞

在这里插入图片描述

监听channel事件

在这里插入图片描述

更进一步优化

利用多线程优化(现在都是多核cpu,设计时要充分考虑别让cpu的力量白白浪费)

前面的代码只有一个Selector选择器,没有充分利用到多核cpu,该如何改进呢?

分两组选择器

  • 单线程配一个选择器,专门处理accept事件
  • 创建cpu核心数的线程,每个线程配一个选择器,轮流处理read事件
    在这里插入图片描述
    关于多线程,一个boss里面开启worker线程案例:(需要注意顺序问题,这里使用队列 ConcurrentLinkedDeque ,通过主动唤醒selector来解决顺序问题。)
package com.dapeng.netty.c5;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;import static com.dapeng.netty.utils.ByteBufferUtil.debugAll;@Slf4j
public class ChannelThreadDemo {public static void main(String[] args) throws IOException {Thread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);//设置非阻塞模式Selector boss = Selector.open(); // 打开选择器SelectionKey bosskey = ssc.register(boss, 0, null);bosskey.interestOps(SelectionKey.OP_ACCEPT);// 监听链接ssc.bind(new InetSocketAddress(8080)); // 绑定端口号// 1. 创建固定数量的worker 并初始化Worker worker = new Worker("worker-0");while (true) {boss.select();// 阻塞监听等待连接Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false); // 设置读非阻塞。log.debug("连接成功。。。。{}", sc.getRemoteAddress());// 2.在这里sc需要注册到workorder的selector上面。log.debug("关联workorder之前");worker.register(sc); //boss调用,初始化selecotr,启动workorder-0;log.debug("关联workorder之后");}}}}static class Worker implements Runnable {private Thread thread; // 线程private Selector selector; // worker选择器private String name; // 线程名private volatile boolean start = false; //还未初始化 volatile保证可见性(A改了B可看见)private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();public Worker(String name) {this.name = name;}// 初始化线程和selectorpublic void register(SocketChannel sc) throws IOException {if (!start) {selector = Selector.open();// 开启Selector选择器thread = new Thread(this, name);thread.start();start = true;}// 向队列添加了任务,但这个任务并没有立即执行。queue.add(() -> {try {sc.register(selector, SelectionKey.OP_READ, null);} catch (ClosedChannelException e) {e.printStackTrace();}});// 唤醒selectorselector.wakeup();// 还有一种方法:去掉上面的队列,// selector.wakeup();// sc.register(selector, SelectionKey.OP_READ, null);}@Overridepublic void run() {while (true) {try {selector.select();// 监听连接 先等待连接建立SocketChannelRunnable task = queue.poll();if (task != null) {task.run(); //sc.register(selector, SelectionKey.OP_READ, null);}Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();log.debug("读取数据:。。。{}", channel.getRemoteAddress());channel.read(buffer);buffer.flip();debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}}
}

多线程优化,多个worker
有个重要的点:使用Round Robin轮询调度(RR),使用计数器和worker取余,来进行轮询。(–当前小节-- 为修改的部分)

package com.dapeng.netty.c5;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;import static com.dapeng.netty.utils.ByteBufferUtil.debugAll;@Slf4j
public class ChannelThreadDemo {public static void main(String[] args) throws IOException {Thread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);//设置非阻塞模式Selector boss = Selector.open(); // 打开选择器SelectionKey bosskey = ssc.register(boss, 0, null);bosskey.interestOps(SelectionKey.OP_ACCEPT);// 监听链接ssc.bind(new InetSocketAddress(8080)); // 绑定端口号// 1. 创建固定数量的worker 并初始化
//        Worker worker = new Worker("worker-0");// --当前小节-- 1.2多线程的优化,创建多个线程,创建多个固定数量的worker,并初始化Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];// 获取该硬件的核心数量,部署在docker里面会有问题,会直接读取物理硬件,并不会读取docker容器里面的核心数,jdk10版本才会修复此内容。for (int i = 0; i < workers.length; i++) {// 给每个线程起名字workers[i] = new Worker("worker-" + i);}// 定义一个计数器AtomicInteger index = new AtomicInteger();while (true) {boss.select();// 阻塞监听等待连接Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false); // 设置读非阻塞。log.debug("连接成功。。。。{}", sc.getRemoteAddress());// 2.在这里sc需要注册到workorder的selector上面。log.debug("关联workorder之前");
//                    worker.register(sc); //boss调用,初始化selecotr,启动workorder-0;// --当前小节--   优化,使用轮询算法,round robinworkers[index.incrementAndGet() % workers.length].register(sc); //boss调用,初始化selecotr,启动workorder-0;log.debug("关联workorder之后");}}}}static class Worker implements Runnable {private Thread thread; // 线程private Selector selector; // worker选择器private String name; // 线程名private volatile boolean start = false; //还未初始化private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();public Worker(String name) {this.name = name;}// 初始化线程和selectorpublic void register(SocketChannel sc) throws IOException {if (!start) {selector = Selector.open();// 开启Selector选择器thread = new Thread(this, name);thread.start();start = true;}// 向队列添加了任务,但这个任务并没有立即执行。// 队列queue.add(() -> {try {sc.register(selector, SelectionKey.OP_READ, null);} catch (ClosedChannelException e) {e.printStackTrace();}});// 唤醒selectorselector.wakeup();// 还有一种方法:去掉上面的队列,// selector.wakeup();// sc.register(selector, SelectionKey.OP_READ, null);}@Overridepublic void run() {while (true) {try {selector.select();// 监听连接 先等待连接建立SocketChannelRunnable task = queue.poll();if (task != null) {task.run(); //sc.register(selector, SelectionKey.OP_READ, null);}Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();log.debug("读取数据:。。。{}", channel.getRemoteAddress());channel.read(buffer);buffer.flip();debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}}
}

5. NIO vs BIO

5.1 stream VS channel

在这里插入图片描述

5.2 IO模型(下面的几个IO概念可能不清晰,有个大概,如果要详细看可以百度认真了解IO概念)

(同步阻塞,同步非阻塞,多路复用,异步阻塞(不存在),异步非阻塞)
1.阻塞IO,如下图:【同步阻塞】
阻塞IO会在用户调用read后,会等待内核将数据等待接手后,并复制一份数据再返回到用户。
在这里插入图片描述
2.非阻塞IO,如下图:【同步非阻塞】
之前的案例:while(true)的时候不停地循环读取,当从网络上得到数据后,需要阻塞住,等待数据复制完成才会继续非阻塞,并返回用户数据。(每次系统频繁调用read都会影响系统性能)
在这里插入图片描述
3.多路复用,如下图:【【同步】多路复用】
使用Selector的select无参的方法来阻塞住,等待有事件发生,一旦有事件发生,内核这边会告诉用户有可读事件了,用户会调用read方法去内核读取数据,内核需要阻塞住等待数据的复制完成,再将结果返回给用户。
在这里插入图片描述

【阻塞IO下】:
在这里插入图片描述
在这里插入图片描述
【多路复用下】
多路复用可以在selector下用一个循环把所有事件给处理掉。
在这里插入图片描述
同步和异步

  • 同步:线程自己去获得结果(单个线程)。【严格按照顺序执行】
  • 异步:线程自己不会获得结果,而是由其他线程送结果(至少两个线程)。【遇到需要等待结果的,不用等待直接执行后面程序。】

4.异步,异步情况下,不存在异步阻塞,所以只有【异步非阻塞】
用户有两个线程,线程1负责调用read方法,并立即返回结果。线程2用于等待内核数据的复制完成后,将数据作为参数,调用线程1 的回调方法,返回给用户。这样两个线程就可以实现数据的传输。
在这里插入图片描述

5.3零拷贝(针对java里面不用拷贝)
传统IO问题

传统的IO将一个文件通过socket写出
在这里插入图片描述
内部的工作流程:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

NIO优化

(HeapByteBuffer使用Java内存,跟系统隔离的,使用DirectByteBuffer是跟操作系统共用的,减少了一次拷贝)
在这里插入图片描述
在这里插入图片描述

进一步优化

FileChannel有transferTo方法和transferFrom。
SocketChannel没有
Java里面的transerto 和 transeferfrom对应linux里面的sendFile方法。
在这里插入图片描述
在这里插入图片描述

进一步优化(linux2.4)

在这里插入图片描述
在这里插入图片描述

5.4 AIO

在这里插入图片描述
Netty在5.0版本发现异步IO性能没优势,而且提高了复杂度,所以废弃了5.0版本,最高是4.几的版本。
在这里插入图片描述

文件AIO
package com.dapeng.netty.AIO;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;import static com.dapeng.netty.utils.ByteBufferUtil.debugAll;@Slf4j
public class AioFileChannel {//    多线程才能异步public static void main(String[] args) throws IOException {try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)) {//            参数1. ByteBuffer(读取数据到该buffer)
//            参数2. 读取的起始位置
//            参数3. 附件
//            参数4. 回调对象 CompletionHandlerByteBuffer buffer = ByteBuffer.allocate(16);log.debug("读取之前----read begin...");channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {@Override // 读取成功时候调用public void completed(Integer result, ByteBuffer attachment) {log.debug("读取成功---- read completed...{}",result);attachment.flip();// 切换读模式debugAll(attachment);}@Override // 读取失败public void failed(Throwable exc, ByteBuffer attachment) {log.error("读取失败");exc.printStackTrace();}});log.debug("读取完成----read finished...");} catch (IOException e) {e.printStackTrace();}//.twr用来写try catchSystem.in.read(); // 控制台输入,用来阻塞主线程,来等待子守护线程的读取结果,如果不阻塞,会导致回调对象新开的线程也会结束}
}
网络AIO

暂时没有相关资料,可以百度搜搜


http://www.mrgr.cn/news/55971.html

相关文章:

  • 如何将 Docker 镜像的 tar 文件迁移到另一台服务器并运行容器
  • 将 Ubuntu 系统中的 **swap** 空间从 2GB 扩展到 16GB
  • STM32启动文件浅析
  • 软件I2C的代码
  • Redis入门:在Java程序中高效使用Redis
  • es 全文文本分词查询
  • 十行代码实现命令行书签
  • Linux使用nc(netcat)命令检测网络端口是否畅通以及Linux查看CPU架构命令arch及CentOS中取版本的问题
  • Spring AI : Java写人工智能的应用框架
  • 正大金融市场的跨境投资机遇与挑战分析
  • 【数字IC】【低功耗】UPF/CPF
  • 郑州网站制作优化你的网站以吸引流量
  • 机器人学习仿真框架
  • 骨传导耳机哪个牌子最好?真实测评五大年度热门单品机型
  • 【直播回放】达索系统赋能新电池产业链数字仿真一体化协同解决方案
  • 软件源码,招投标管理系统,询价管理系统,供应商管理系统,一体化管理系统,供应链管理(springboot+vue+mysql)
  • 2024年墨西哥金融科技报告解读(上)| 从基础到前沿(附下载)
  • sdads
  • SiLM266x系列SiLM2660CD-DG 可配置的电池组电压检测功能 高压电池组前端充/放电高边NFET驱动器
  • 宠物电商新篇章:SpringBoot驱动的在线交易网站
  • comfyui替换电商模特工作流,模特们要真的要失业了吗?
  • 歌曲伴奏去哪里找?轻松获取你喜欢的伴奏
  • 10.17 多进程编程
  • 2024年9月青少年软件编程Python等级考试(一级)真题试卷及答案
  • 离线安装lrzsz
  • 代购系统搭建涉及到哪几方面❓