CS144_01
CS144—01
TCP协议解决什么问题
因为网络延迟或者路由转发等原因,导致无法保证package的有序到达, TCP协议是一个工作在传输层的可靠数据传输服务; 它可以确保接收端的网络包是无损坏的、无间隔、无冗余
- 可靠传输:TCP确保数据从一个网络设备传输到另一个网络设备时不丢失、不重复,并且保持顺序。它通过确认(ACK)和重传机制来保证数据的可靠传输
- 流量控制:TCP使用滑动窗口算法来控制每次传输的数据量,避免发送方发送数据过快,接收方来不及处理的情况,从而避免网络拥塞和数据丢失
- 拥塞控制:TCP通过一系列算法(如慢启动、拥塞避免、快速重传和快速恢复)来检测网络拥塞,并相应地调整数据传输速率,以减少或避免网络拥塞的发生
- 数据包顺序:TCP为每个数据包分配一个序列号,确保数据按照发送的顺序到达接收方。如果数据包到达的顺序错乱,TCP协议会重新排序
- 错误检测:TCP头部包含一个校验和字段,用于检测数据在传输过程中是否发生错误
- 连接的建立与终止:TCP使用三次握手过程来建立连接,确保双方都准备好数据交换,并使用四次挥手过程来终止连接,确保所有数据都已经传输完成
first
搭建一个可靠的字节流
通俗来说:需要实现一个可以支持一些读写操作的buffer
-
读写指针:缓冲区应该有两个指针,一个用于写入操作(写指针),一个用于读取操作(读指针)。这两个指针可以帮助管理缓冲区中的数据
-
数据存储:缓冲区需要有足够的空间来存储字节序列,通常是一个字节数组
-
读写操作:
- 写入操作:能够将数据追加到缓冲区的末尾,并移动写指针。
- 读取操作:能够从缓冲区的开始读取数据,并移动读指针
-
空间管理:
- 空闲空间查询:能够查询缓冲区中还有多少空闲空间可用于写入。
- 已用空间查询:能够查询缓冲区中已存储了多少数据
-
缓冲区满/空状态检测:
- 当缓冲区满时,写入操作应该能够等待或者通知调用者。
- 当缓冲区为空时,读取操作应该能够等待或者通知调用者
1、编辑/libsponge/byte_stream.hh中的字节流类
依据上面的可靠字节流的性质
成员变量:
class ByteStream {private:// Your code here -- add private members as necessary.std::deque<char>buf{}; // 用于存储实际数组的缓冲,使用双端队列存储缓存size_t _capacity = 0; // 字节流最大容量size_t _bytes_written = 0; 从流中读取的字节数size_t _bytes_read = 0; 写入流中的字节数bool _input_ended = false; 标志位,表示输入是否已经结束bool _error = false; 标志位,表示流是否遇到错误ByteStream(const size_t capacity);: 构造函数,初始化流的最大容量size_t write(const std::string &data);: 将一个字符串写入流中,返回实际写入的字节数。size_t remaining_capacity() const;: 返回流还能容纳多少额外字节。void end_input();: 标记流输入结束。void set_error();: 设置错误标志std::string peek_output(const size_t len) const;: 查看流中接下来的 len 个字节,但不从缓冲区中移除它们。void pop_output(const size_t len);: 从缓冲区中移除 len 个字节。std::string read(const size_t len);: 读取并移除流中的 len 个字节,返回读取的数据bool input_ended() const;: 检查输入是否结束。bool error() const;: 检查流是否遇到错误。size_t buffer_size() const;: 返回当前可以从流中读取的最大字节数。bool buffer_empty() const;: 检查缓冲区是否为空。bool eof() const;: 检查输出是否已到达末尾size_t bytes_written() const;: 返回写入流中的总字节数。size_t bytes_read() const;: 返回从流中读取的总字节数
具体实现:
构造函数;给缓存区设置 字节容量大小
ByteStream::ByteStream(const size_t capacity) : _capacity(capacity) {}获取待写入的数据的长度,判断是否大于缓冲区剩余容量大小,如果大于了,就只取剩余缓冲区大小
size_t ByteStream::write(const string &data) {size_t len = data.length();if (len > _capacity - _buffer.size()) {len = _capacity - _buffer.size();}_write_count += len;string s;s.assign(data.begin(), data.begin() + len);_buffer.append(BufferList(move(s)));return len;
}
更新_write_count写入计数,该变量用于跟踪已经写入 ByteStream 的总字节数
提取待写入字符串的len个字节大小的子串
将字符串输入存入_buffer缓存中返回实际写入的字节数
查看队列中len个字节的字符串
string Bytestream::peek output(const size_t len) const {size_t peeked len = min(len, buf.size());return string(buf.begin(), buf.begin() + peeked len);
弹出len长度的字节
void ByteStream::pop output(const size_t len) {size_t popped_len = min(len, _buf.size());_buf.erase(buf.begin(), _buf.begin() + popped_len);_bytes_read += _popped_len;
}
读取len长度的字节
string ByteStream::read(const size_t len) {if (len == 0) {return "";}string res = peek_output(len);pop_output(res.size());return res;
}
重组乱序到达的包
1、已经被上层应用所读取的包
这种形态的包已经通过了网络协议栈的所有层,并最终被上层应用程序读取和处理数据包的内容已经被应用程序消费。
数据包在缓冲区中的存在形式可能是原始的字节数据,但通常会被解析成更高级别的数据结构(如HTTP请求、文件数据等)。
缓冲区中的这部分数据通常会被清除,以释放空间供后续数据包使用
2、存放在byteStream中的包
这种形态的包指的是已经通过网络层到达传输层(如TCP),并且被放入一个 ByteStream 类型的缓冲区中,等待被应用程序读取数据包可能是一个完整的TCP段,或者是多个TCP段的组合。
数据包在 ByteStream 中是以字节流的形式存在,可能包含一个或多个网络层和应用层的数据包。
数据包在 ByteStream 中按照到达顺序排列,但可能尚未被应用程序读取。
ByteStream 负责维护数据的完整性,确保应用程序读取到的数据是按序且完整的
3、缓冲区未被组装的包
这种形态的包通常指的是在网络层或传输层,由于网络延迟、丢包、乱序等问题,数据包尚未被完全组装成完整数据的一部分数据包可能是网络层的一个分片,或者是传输层的一个段。
这些数据包可能需要通过序列号或其他标识符来重新组装成一个完整的数据包。
在缓冲区中,这些数据包可能是分散的,等待其他分片或段的到达。
缓冲区管理这些分片或段的存储,并在所有分片或段到达后进行组装
capacity由2、3两种共同组成的
class StreamReassembler {private:// Your code here -- add private members as necessary.ByteStream _output; //!< The reassembled in-order byte streamsize_t _capacity; //!< The maximum number of bytesstd::map<size_t, std::string> _wait_map{};size_t _wait_index;size_t _eof_index;std::string truncate_data(const std::string &data, uint64_t index);void update_waitmap();void checked_insert(const std::string &data, uint64_t index);_output:类型为 ByteStream,用于保存最终重组的有序字节流。这个 ByteStream 对象将接收按正确顺序插入的数据片段。
_capacity:表示 StreamReassembler 的容量,即可以存储的最大字节数。它限制了字节流的大小,避免占用过多内存。
_wait_map:一个 std::map,用于存储等待重组的数据片段。键是数据片段在流中的起始索引,值是数据片段(字符串)。_wait_map 的设计使得每个数据片段可以以键值对的方式存储,方便按索引顺序访问。
_wait_index:记录当前等待接收数据的起始索引。也就是说,_wait_index 表示流中下一个应该重组的字节的索引位置。
_eof_index:记录字节流结束的位置索引,表示重组的字节流的终止位置。此变量通常在接收到流的结束标记(EOF)后设置,用于检查数据是否已完整重组
插入数据时,拦截StreamReassembler的容量限制
string StreamReassembler::truncate_data(const string &data,uint64_t index){size_t trunc_size = min(data.size(),_capacity+_output.bytes_read()-index);trunc_size = min(trunc_size,_capacity - _output.buffer_size()-unassembled_bytes());return data.substr(0,trunc_size);
}
清理那些已经被处理或部分被处理的数据片段,并重新插入那些仍有未处理部分的片段,以便后续的重组过程能够顺利进行
void StreamReassembler::update_waitmap(){for(auto it = _wait_map.begin();it !=_wait_map.end();){size_t index = (*it).first;if (index< _wait_index){string data=(*it).second;it = _wait_map.erase(it);if(index+data.size()>_wait_index){data=data.substr(_wait_index - index);index=_wait_index;checked_insert(data,index);}}else{++it;}}
}
checked_insert操作
void StreamReassembler::checked_insert(const string &data,uint64_t index){string ins_data =data;size_t ins_start=index,ins_end=index+ins_data.size()-1;for(auto it=_wait_map.begin();it!=_wait_map.end();){const string &elem_data=(*it).second;size_t elem_start=(*it).first,elem_end=elem_start+elem_data.size()-1;if(ins_start<=elem_end&&elem_start<=ins_end){if(ins_start<=elem_start&&ins_end>=elem_end){it=_wait_map.erase(it);}else if(elem_start<=ins_start&&elem_end>=ins_end){ins_data.clear();++it;}else{if(ins_start<=elem_start){ins_data+=elem_data.substr(ins_end+1-elem_start,elem_end-ins_end);}else{index=elem_start;ins_data.insert(0,elem_data.substr(0,ins_start-elem_start));}it=_wait_map.erase(it);}}else{++it;}}if(!ins_data.empty()){_wait_map.insert(make_pair(index,truncate_data(ins_data,index)));}
}
将合并好的片段,进行组合,然后删除在_wait_map哈希表中的片段,并且生成新的片段
eg:
片段 1 到达:"World" 从位置 7 开始data = "World", index = 7此时 _wait_map 为空,所以直接插入 "World"_wait_map = { {7, "World"} }片段 2 到达:"Hello, " 从位置 0 开始data = "Hello, ", index = 0遍历 _wait_map,发现 ins_data("Hello, ")的结束位置 ins_end = 6 与 _wait_map 中的片段 "World" 的起始位置 elem_start = 7 是连续的此时合并 ins_data 和 _wait_map 的片段,得到 "Hello, World"清空原有片段并插入合并后的数据_wait_map = { {0, "Hello, World"} }片段 3 到达:"! " 从位置 12 开始输入:data = "! ", index = 12遍历 _wait_map,发现 ins_data("! ")的起始位置 ins_start = 12 在当前已存储片段 "Hello, World" 的结束位置 elem_end = 11 的后面,且它们是连续的将 "! " 合并到现有片段中_wait_map = { {0, "Hello, World!"} }经过三次插入操作后,_wait_map 中包含了完整的字符串 "Hello, World!",按照正确的顺序存储,且没有冗余和重叠
阶段梳理
ByteStream
作用:
ByteStream
是一个字节流缓冲区,存储从网络或其他输入源接收到的字节数据,以便上层应用可以顺序读取- 可以将它理解成一个连续的字节队列。数据从一端写入,从另一端读取
ByteStream
使用双端队列来管理存储的数据块。每个数据块是一个字符串,用来保存接收的字节序列ByteStream
的上层应用通常是网络或文件数据的消费者,例如文件解析器、流媒体播放器等,它们希望能够顺序读取连续的数据段
为什么需要 StreamReassembler
?
- 实际网络传输中,数据可能会 乱序 到达。TCP 协议在传输数据时,可能因为网络延迟、丢包或其他原因,导致数据包的到达顺序与发送顺序不一致
StreamReassembler
的任务就是将这些乱序的数据重新组装成有序的字节流,并提供给ByteStream
- 重组的是乱序到达的字节片段,使它们变成一段连续、顺序的流数据
举个形象的例子
假设有一个字节流数据需要传输:“Hello, World!”,但数据在传输过程中被拆分成多个片段,乱序到达
-
分片端:
- 片段 A: “World”(从位置 7 开始)
- 片段 B: "Hello, "(从位置 0 开始)
- 片段 C: “!”(从位置 12 开始)
-
这些片段到达
StreamReassembler
的顺序可能是 A -> C -> B(而不是按发送顺序 B -> A -> C) -
StreamReassembler
重组流程:当片段 A 到达,发现它是从位置 7 开始的,而数据流起始位置是 0,于是暂时存放在 _wait_map 中等待。片段 C 到达,也放入 _wait_map,因为还无法将其拼接到数据流上。片段 B 到达,它正好从位置 0 开始。StreamReassembler 将 B 的数据 "Hello, " 写入 ByteStream,并更新 _wait_index(当前等待的下一个字节位置)。之后,StreamReassembler 检查 _wait_map,发现片段 A 可以拼接到当前流上,紧接着写入 "World"。再一次更新 _wait_index。最后,将片段 C 中的 "!" 拼接完成
-
update_waitmap
的作用上面的流程中,update_waitmap 的作用是在每次更新 _wait_index 后,清理 _wait_map 中那些已经不再需要或已经部分使用的片段。比如
- 当片段 B 写入后,
_wait_index
更新为 7 - update_waitmap 会清除 _wait_map 中所有小于 7 的片段
- 此过程避免了数据冗余,并确保了只保留那些未来可能用得上的片段
- 当片段 B 写入后,
lab0 lab1结束
代码主要参考公众号:科创视野 大佬所提供的实验笔记
实验难度还是比较大的