【p2p、分布式,区块链笔记 Torrent】WebTorrent 的lt_donthave插件
扩展实现
- https://github.com/webtorrent/lt_donthave/blob/master/index.js
/*! lt_donthave. MIT License. WebTorrent LLC <https://webtorrent.io/opensource> */// 导入所需模块
import arrayRemove from 'unordered-array-remove' // 用于从数组中删除元素的函数
import { EventEmitter } from 'events' // 导入事件发射器类
import debugFactory from 'debug' // 导入调试工具// 创建一个调试实例,用于记录调试信息
const debug = debugFactory('lt_donthave')// 导出一个函数,该函数返回 ltDontHave 类
export default () => {// 定义 ltDontHave 类,继承自 EventEmitterclass ltDontHave extends EventEmitter {constructor (wire) {super() // 调用父类构造函数// 初始化属性this._peerSupports = false // 标记对等体是否支持 'lt_donthave'this._wire = wire // 保存 wire 对象,表示与对等体的连接}// 当接收到扩展握手时调用,表示对等体支持 'lt_donthave'onExtendedHandshake () {this._peerSupports = true}// 处理接收到的消息onMessage (buf) {let index // 存储块索引try {// 创建 DataView 从接收到的缓冲区中读取数据const view = new DataView(buf.buffer)index = view.getUint32(0) // 获取消息中的块索引} catch (err) {// 如果消息无效,直接丢弃return}// 如果对等体没有该块,直接返回if (!this._wire.peerPieces.get(index)) returndebug('got donthave %d', index) // 记录调试信息this._wire.peerPieces.set(index, false) // 标记该块为不拥有this.emit('donthave', index) // 触发 'donthave' 事件this._failRequests(index) // 处理失败的请求}// 向远程对等体发送不拥有的块索引donthave (index) {if (!this._peerSupports) return // 如果对等体不支持,直接返回debug('donthave %d', index) // 记录调试信息const buf = new Uint8Array(4) // 创建一个 4 字节的缓冲区const view = new DataView(buf.buffer) // 创建 DataViewview.setUint32(0, index) // 将块索引写入缓冲区console.log(">>>>>>>>>>>>>>>>", index, ">>>>>>>>>>>>>>", buf) // 打印调试信息this._wire.extended('lt_donthave', buf) // 发送 'lt_donthave' 消息}// 处理失败的请求_failRequests (index) {const requests = this._wire.requests // 获取当前请求列表for (let i = 0; i < requests.length; i++) {const req = requests[i] // 获取当前请求if (req.piece === index) { // 如果请求的块索引与指定索引匹配arrayRemove(requests, i) // 从请求列表中删除该请求i -= 1 // 更新索引以检查新值this._wire._callback(req, new Error('peer sent donthave'), null) // 调用回调函数,报告请求失败}}}}// 设置扩展名称ltDontHave.prototype.name = 'lt_donthave'return ltDontHave // 返回定义的类
}
测试代码
import fixtures from 'webtorrent-fixtures'
import Protocol from 'bittorrent-protocol'
import test from 'tape'
import ltDontHave from './index.js'const { leaves } = fixturesconst id1 = Buffer.from('01234567890123456789')
const id2 = Buffer.from('12345678901234567890')const wire1 = new Protocol()
wire1.peerPieces.set(30, true) // 在 wire1 的 peerPieces 中标记块 30 为已拥有
wire1.peerPieces.set(29, false)
console.log("wire1 中的块 30 是否存在",wire1.peerPieces.get(30))const wire2 = new Protocol()wire1.pipe(wire2).pipe(wire1)// 将 wire1 和 wire2 连接起来,形成双向数据流
wire1.use(ltDontHave())
wire2.use(ltDontHave())// 设置 wire2 的握手事件处理程序
wire2.on('handshake', (infoHash, peerId, extensions) => {// 在握手完成后,调用 wire2 的 handshake 方法console.log("2. wire2.on('handshake') infoHash:",infoHash,"peerId:", peerId)console.log("3. 在握手完成后,调用 wire2 的 handshake 方法,infoHash:",leaves.parsedTorrent.infoHash,"peerid:", id2)wire2.handshake(leaves.parsedTorrent.infoHash, id2)
})// 设置 wire2 的扩展事件处理程序
wire2.on('extended', ext => {if (ext === 'handshake') { // 如果扩展事件是握手console.log("4. wire2 extended 发送 donthaven 消息,表示 wire2 不拥有块 30")wire2.lt_donthave.donthave(30)// 发送 "donthave" 消息,表示 wire2 不拥有块 30wire2.lt_donthave.donthave(29)}
})// 设置 wire1 的 "donthave" 消息事件处理程序(由protocol的index中的this._wire.extended('lt_donthave', buf)决定是否调用)
wire1.lt_donthave.on('donthave', (index) => {// 验证接收到的索引是否为 30console.log("5. wire1.lt_donthave.on('donthave') 接收到的索引为:",index)//t.equal(index, 30)// 检查 wire1 中的块 30 是否被清除console.log("6. wire1 中的块 30 是否存在",wire1.peerPieces.get(30))//t.notOk(wire1.peerPieces.get(30), 'piece 30 cleared in bitfield')console.log("6. wire1 中的块 29 是否存在",wire1.peerPieces.get(29))
})// 在 wire1 上执行握手,传入信息哈希和对等体 ID
console.log("1. 在 wire1 上执行握手,传入infoHash(",leaves.parsedTorrent.infoHash,") 和 peerId(", id1,")")
wire1.handshake(leaves.parsedTorrent.infoHash, id1)
测试输出
PS C:\Users\kingchuxing\Documents\MTGIT\lt_donthave-master> node .\mtest.js
wire1 中的块 30 是否存在 true
1. 在 wire1 上执行握手,传入infoHash( d2474e86c95b19b8bcfdb92bc12c9d44667cfa36 ) 和 peerId( <Buffer 30 31 32 33 34 35 36 37 38 39 30 31 32 33 34 35 36 37 38 39> )
2. wire2.on('handshake') infoHash: d2474e86c95b19b8bcfdb92bc12c9d44667cfa36 peerId: 3031323334353637383930313233343536373839
3. 在握手完成后,调用 wire2 的 handshake 方法,infoHash: d2474e86c95b19b8bcfdb92bc12c9d44667cfa36 peerid: <Buffer 31 32 33 34 35 36 37 38 39 30 31 32 33 34 35 36 37 38 39 30>
4. wire2 extended 发送 donthaven 消息,表示 wire2 不拥有块 30
>>>>>>>>>>>>>>>> 30 >>>>>>>>>>>>>> Uint8Array(4) [ 0, 0, 0, 30 ]
>>>>>>>>>>>>>>>> 29 >>>>>>>>>>>>>> Uint8Array(4) [ 0, 0, 0, 29 ]
5. wire1.lt_donthave.on('donthave') 接收到的索引为: 30
6. wire1 中的块 30 是否存在 false
6. wire1 中的块 29 是否存在 false
lt_donthave
插件的对某个index进行false标记过程:- wire1进行握手
wire1.handshake(leaves.parsedTorrent.infoHash, id1)
,执行Wire对象的handshake (infoHash, peerId, extensions)
函数,但是没有执行
if (this.peerExtensions.extended && !this._extendedHandshakeSent) {// Peer's handshake indicated support already// (incoming connection)this._sendExtendedHandshake()}
- 后续会执行
_onHandshake
,并在中途触发wire2.on('handshake'
,然后执行wire2的handshake。
// 设置 wire2 的握手事件处理程序
wire2.on('handshake', (infoHash, peerId, extensions) => {// 在握手完成后,调用 wire2 的 handshake 方法console.log("2. wire2.on('handshake') infoHash:",infoHash,"peerId:", peerId)console.log("3. 在握手完成后,调用 wire2 的 handshake 方法,infoHash:",leaves.parsedTorrent.infoHash,"peerid:", id2)wire2.handshake(leaves.parsedTorrent.infoHash, id2)
})
- 但是,与上次不同,这次的握手会调用
_sendExtendedHandshake()
,然后调用_onMessage
方法
_onMessage
_onMessage
方法用于解析并处理从远程对等体接收到的不同类型的消息。它根据消息的第一个字节(标识消息类型)来调用相应的处理函数,确保能够正确响应对等体的请求和状态变化。对于未知的消息类型,会记录调试信息并触发相应事件。
/*** Handle a message from the remote peer.* @param {Uint8Array} buffer // 接收到的消息缓冲区,类型为 Uint8Array*/
_onMessage (buffer) {// 解析消息的长度,调用 _onMessageLength 方法处理this._parse(4, this._onMessageLength)// 创建 DataView,用于从 buffer 中以不同格式读取数据const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength)// 根据消息的第一个字节决定处理的方式switch (buffer[0]) {case 0: // "choke" 消息return this._onChoke()case 1: // "unchoke" 消息return this._onUnchoke()case 2: // "interested" 消息return this._onInterested()case 3: // "uninterested" 消息return this._onUninterested()case 4: // "have" 消息return this._onHave(view.getUint32(1)) // 获取块索引并处理case 5: // "bitfield" 消息return this._onBitField(buffer.slice(1)) // 处理位图数据case 6: // "request" 消息return this._onRequest(view.getUint32(1), // 获取块索引view.getUint32(5), // 获取块的偏移view.getUint32(9) // 获取块的长度)case 7: // "piece" 消息return this._onPiece(view.getUint32(1), // 获取块索引view.getUint32(5), // 获取块的偏移buffer.slice(9) // 获取块数据)case 8: // "cancel" 消息return this._onCancel(view.getUint32(1), // 获取块索引view.getUint32(5), // 获取块的偏移view.getUint32(9) // 获取块的长度)case 9: // "port" 消息return this._onPort(view.getUint16(1)) // 获取端口号并处理case 0x0D: // "suggest" 消息return this._onSuggest(view.getUint32(1)) // 获取建议的块索引case 0x0E: // "have all" 消息return this._onHaveAll() // 处理拥有所有块的消息case 0x0F: // "have none" 消息return this._onHaveNone() // 处理不拥有任何块的消息case 0x10: // "reject" 消息return this._onReject(view.getUint32(1), // 获取被拒绝的块索引view.getUint32(5), // 获取偏移view.getUint32(9) // 获取长度)case 0x11: // "allowed fast" 消息return this._onAllowedFast(view.getUint32(1)) // 获取可以快速请求的块索引case 20: // "extended" 消息return this._onExtended(buffer[1], buffer.slice(2)) // 处理扩展消息default: // 未知消息类型this._debug('got unknown message') // 输出调试信息return this.emit('unknownmessage', buffer) // 触发未知消息事件}
}
_onExtended
_onExtended
方法处理来自远程对等体的扩展消息。首先,它检查是否为扩展握手消息(ext === 0
),然后解码并保存握手信息,处理扩展映射并调用相应的扩展处理器。如果消息是其他类型的扩展消息,则将其传递给相应的处理器。最后,它触发与扩展相关的事件,方便其他模块进行相应处理。
- 以下是代码实现:
_onExtended (ext, buf) {// 检查扩展类型是否为0,表示扩展握手消息if (ext === 0) {let infotry {// 尝试解码扩展握手消息的内容info = bencode.decode(buf)} catch (err) {// 如果解码失败,输出调试信息并忽略该消息this._debug('ignoring invalid extended handshake: %s', err.message || err)}// 如果信息为空,直接返回if (!info) return// 保存对等体的扩展握手信息this.peerExtendedHandshake = info// 检查握手信息中的扩展映射if (typeof info.m === 'object') {// 遍历每个扩展名,将其转换为数字并存储for (const name in info.m) {this.peerExtendedMapping[name] = Number(info.m[name].toString())}}// 遍历已注册的扩展for (const name in this._ext) {// 如果对等体支持该扩展,则调用其握手处理方法if (this.peerExtendedMapping[name]) {this._ext[name].onExtendedHandshake(this.peerExtendedHandshake)}}// 输出调试信息,表示收到了扩展握手this._debug('got extended handshake')// 触发扩展握手事件,即lt_donthave的wire2.on('extended', ext => {this.emit('extended', 'handshake', this.peerExtendedHandshake)} else {// 如果扩展类型不是0,则处理其他扩展消息if (this.extendedMapping[ext]) {// 将扩展类型转换为友好的名称ext = this.extendedMapping[ext] // 检查是否有注册的扩展处理器if (this._ext[ext]) {// 调用对应扩展的消息处理方法this._ext[ext].onMessage(buf)}}// 输出调试信息,显示收到的扩展消息类型this._debug('got extended message ext=%s', ext)// 触发扩展消息事件this.emit('extended', ext, buf)}
}
- 以上代码的第46行
this._ext[ext].onMessage(buf)
调用onMessage
方法 - lt_donthave的onMessage方法(扩展中只有这个函数操作了标记是否有数据的peerPieces对象):
onMessage (buf) {let indextry {const view = new DataView(buf.buffer)index = view.getUint32(0)} catch (err) {// drop invalid messagesreturn}if (!this._wire.peerPieces.get(index)) returndebug('got donthave %d', index)this._wire.peerPieces.set(index, false)this.emit('donthave', index)this._failRequests(index)}
使用示例
import BitField from 'bitfield'
import Protocol from 'bittorrent-protocol'
import net from 'net'net.createServer(socket => {var wire = new Protocol()socket.pipe(wire).pipe(socket)// handle handshakewire.on('handshake', (infoHash, peerId) => {wire.handshake(Buffer.from('my info hash'), Buffer.from('my peer id'))// advertise that we have all 10 pieces of the torrentconst bitfield = new BitField(10)for (let i = 0; i <= 10; i++) {bitfield.set(i, true)}wire.bitfield(bitfield)})}).listen(6881)
- npm install lt_donthave
import BitField from 'bitfield'
import Protocol from 'bittorrent-protocol'
import net from 'net'
import lt_donthave from 'lt_donthave'net.createServer(socket => {const wire = new Protocol()socket.pipe(wire).pipe(socket)// initialize the extensionwire.use(lt_donthave())// all `lt_donthave` functionality can now be accessed at wire.lt_donthavewire.on('request', (pieceIndex, offset, length, cb) => {// whoops, turns out we don't have any pieces after allwire.lt_donthave.donthave(pieceIndex)cb(new Error('not found'))})// 'donthave' event will fire when the remote peer indicates it no longer has a piecewire.lt_donthave.on('donthave', index => {// remote peer no longer has piece `index`})// handle handshakewire.on('handshake', (infoHash, peerId) => {wire.handshake(Buffer.from('my info hash'), Buffer.from('my peer id'))// advertise that we have all 10 pieces of the torrentconst bitfield = new BitField(10)for (let i = 0; i <= 10; i++) {bitfield.set(i, true)}wire.bitfield(bitfield)})}).listen(6881)