当前位置: 首页 > news >正文

(undone) MIT6.824 Lecture 02 - RPC and Threads

知乎专栏:https://zhuanlan.zhihu.com/p/641105196

原视频:https://www.bilibili.com/video/BV16f4y1z7kn?spm_id_from=333.788.videopod.episodes&vd_source=7a1a0bc74158c6993c7355c5490fc600&p=2


看知乎专栏

一、Why we choose go?

  • 对于线程和RPC调用支持非常好
  • 有一个好的垃圾回收机制,线程不需要去手动释放分配的空间
  • 编译型语言,运行时开销不大

二、What is Thread and Why We need?

  • 线程,多线程程序有多个执行点,共享地址空间,能够访问相同的数据。
  • Why?:
    • I/O 并发性 —— 同时发起多个网络请求
    • 允许多核并行 —— 让不同的goroutine在不同的核上运行
    • 便捷性 —— 可以在后台定期执行一些事情,可以启动一个线程

三、Thread Challenges:

  • 竞态条件( Race Conditions )
    • 解决办法1:避免共享内存(go推荐使用信道传值,而非直接共享)
    • 解决办法2:使用锁,让操作变成原子的。(go可以启用竞争检测器,-race参数,检测到大部分的竞态条件)
  • 合作 (coordination)/协作:
    • 解决办法1:信道,可以用于协调和共享
    • 解决办法2:条件变量
  • 死锁问题:不同的线程互相等待导致死锁
    • 最简单的死锁就是当你只有一个线程,且你在往某个信道里写数据的时候,由于没有线程从该信道中读取数据,所以会导致主线程阻塞,导致死锁发生。

四、Go 通过什么来应对这些挑战?

  • go可以启用竞争检测器,使用-race参数,检测到大部分的竞态条件
  • 当多个线程去共享一个变量的时候,你就要注意是否有竞态条件发生了
  • 示例程序:(有竞态发生的投票程序)
    • 在 main() 函数中,程序使用 rand.Seed() 函数初始化随机数生成器,以确保每次运行程序时生成的随机数序列都不同。
    • 然后,程序使用 for 循环启动了 10 个 goroutine(Go 语言中的轻量级线程),每个 goroutine 都会调用 requestVote() 函数,并根据返回值更新计数器 count 和 finished
    • requestVote() 函数会休眠一段随机时间来模拟远程调用,然后随机返回一个布尔值,模拟投票的过程。
    • 最后,程序使用 for 循环和条件语句等待投票结果。如果收到了至少 5 个投票,程序输出 “received 5+ votes!”;否则,程序输出 “lost”。
package mainimport "time"
import "math/rand"func main() {rand.Seed(time.Now().UnixNano())count := 0finished := 0// 使用 for 循环启动了 10 个 goroutine(Go 语言中的轻量级线程),每个 goroutine 都会调用 requestVote() 函数,// 并根据返回值更新计数器 count 和 finishedfor i := 0; i < 10; i++ {go func() {vote := requestVote()if vote {count++}finished++}()}// 等待得到 5 个 count,或者 finished == 10 为止for count < 5 && finished != 10 {// wait}if count >= 5 {println("received 5+ votes!")} else {println("lost")}
}// 随机睡眠一段时间,然后随机返回 0/1
func requestVote() bool {time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)return rand.Int() % 2 == 0
}
1)通过 锁和条件变量(在需要共享内存时适合使用)
  • 注意上述代码是有竞态条件的,多个线程会共享访问count和finished变量,所以我们可以加锁
    • 下面代码就是在访问变量前获取一个同步锁来解决问题
var mu sync.Mutex
for i := 0; i < 10; i++ {go func() {vote := requestVote()mu.Lock()defer mu.Unlock()if vote {count++}finished++}()
}for {mu.Lock() // TODO: 感觉这里不用加锁if count >= 5 || finished == 10 {break}mu.Unlock()
}
  • 然而,我们又发现,在判断count和finished是否满足条件的地方,实际上是一个不停的for循环,即在等待过程中是一个不停的获取锁释放锁的过程,我们称之为自旋,这会浪费CPU资源。一种方法是可以在每次for循环结束前Sleep一段时间,另一种方法就是使用条件变量,如下:
    • cond有两个原语:Signal和Broadcast,一个对应单独通知等待者,一个对应广播通知等待者,此处我们只有一个等待者所以两者效果一样。
    • Wait原语:使线程陷入睡眠,并释放和该条件变量相关联的锁,等待被唤醒。当其被唤醒时,重新拿到和该条件变量相关联的锁。
    • 下面的代码定义了一个同步锁,一个条件变量,条件变量和锁绑定。每次goroutine执行完毕后,会使用条件变量广播等待者。在后续循环时候,主线程先获得锁,然后条件变量调用wait原语,主线程陷入睡眠并释放和该条件变量相关联的锁,等待被唤醒。
var mu sync.Mutex
cond := sync.NewCond(&mu)for i := 0; i < 10; i++ {go func() {vote := requestVote()mu.Lock()defer mu.Unlock()if vote {count++}finished++cond.Broadcast()}()
}
mu.Lock()
for count < 5 && finished != 10 {cond.Wait()
}
2)通过 信道 Channels(在不需要共享内存时适合使用)
  • 如果使用信道来书写这个代码的话,就意味着竞态条件不存在了,因为修改count和finished变量的只有主线程,goroutine只会向信道发送变量值,然后主线程通过信道接受值并且修改count和finished变量的值。
ch := make(chan bool)
for i := 0; i < 10; i++ {go func() {ch <- requestVote()}()
}
for count < 5 && finished < 10 {v := <-ch // 主线程会在读取通道处阻塞,直到 go线程 向通道发送数据if v {count += 1}finished += 1
}

五、爬虫程序示例:

  • 目标:
    • I/O 并发性
    • 正确性:对于单个页面仅爬取一次
    • 多核并发性能
1、顺序执行版本
// 定义了一个接口,要求实现 Fetch 方法,用于获取某个 URL 下的所有链接。
type Fetcher interface {// Fetch returns a slice of URLs found on the page.// urls: 该页面包含的所有 URL([]string)// err: 如果发生错误(如 URL 不存在),返回错误信息。Fetch(url string) (urls []string, err error)
}// fakeFetcher is Fetcher that returns canned results. (模拟抓取器)
// fakeFetcher 是一个 map,键是 URL(string),值是指向 fakeResult 的指针。
type fakeFetcher map[string]*fakeResult// body: 模拟网页的内容(字符串)。
// urls: 该页面包含的所有链接([]string)。
type fakeResult struct {body stringurls []string
}// 检查 fakeFetcher 是否包含给定的 url:
func (f fakeFetcher) Fetch(url string) ([]string, error) {// 如果存在,返回该 URL 对应的 fakeResult.urls(所有子链接)。if res, ok := f[url]; ok {fmt.Printf("found:   %s\\n", url)return res.urls, nil}// 如果不存在,返回错误 not found: <url>。fmt.Printf("missing: %s\\n", url)return nil, fmt.Errorf("not found: %s", url)
}// fetcher 实际上是一个被填充了内容的 fakeFetcher
// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{"<http://golang.org/>": &fakeResult{"The Go Programming Language",[]string{"<http://golang.org/pkg/>","<http://golang.org/cmd/>",},},"<http://golang.org/pkg/>": &fakeResult{"Packages",[]string{"<http://golang.org/>","<http://golang.org/cmd/>","<http://golang.org/pkg/fmt/>","<http://golang.org/pkg/os/>",},},"<http://golang.org/pkg/fmt/>": &fakeResult{"Package fmt",[]string{"<http://golang.org/>","<http://golang.org/pkg/>",},},"<http://golang.org/pkg/os/>": &fakeResult{"Package os",[]string{"<http://golang.org/>","<http://golang.org/pkg/>",},},
}// 单线程顺序执行爬虫
func Serial(url string, fetcher Fetcher, fetched map[string]bool) {// 如果这个 url 已经访问过,直接 returnif fetched[url] {return}// 更新 map,表示这个 url 已经访问过fetched[url] = true// 调用 Fetch 爬取 url 中的 urlsurls, err := fetcher.Fetch(url)// 如果出错,返回if err != nil {return}// 若不出错,则对 urls 递归调用调用 Serialfor _, u := range urls {Serial(u, fetcher, fetched)}return
}func main() {fmt.Printf("=== Serial===\\n") // 打印序列字符串// 第一个参数是要爬取的网址// 第二个参数是 模拟的网络内容物// 第三个参数是一个空 map (字典)Serial("<http://golang.org/>", fetcher, make(map[string]bool))
}
2、并发执行版本:(使用互斥锁)
  • 使用互斥锁来保护对fetched变量的并发访问保护
  • 除了使用互斥锁以外,和顺序版本差不多,比较核心的是使用了WaitGroup,
    • 在每个线程开始时,调用done.Add(1)
    • 每个线程结束时,调用done.Done()
    • 然后在主线程里调用done.wait(),等待所有的线程返回后主线程才会结束
type fetchState struct {mu      sync.Mutexfetched map[string]bool
}
func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {f.mu.Lock()already := f.fetched[url]f.fetched[url] = truef.mu.Unlock()if already {return}urls, err := fetcher.Fetch(url)if err != nil {return}var done sync.WaitGroup  //使用waitGroup跟踪你有多少活动的进程,何时可以中止for _, u := range urls {done.Add(1)go func(u string) {defer done.Done()ConcurrentMutex(u, fetcher, f)}(u)  // (u) 是立即调用匿名函数并传递参数 u 的语法。// 如果不加 (u) 直接使用循环变量 u,所有 goroutine 会共享同一个 u 的引用}done.Wait()return
}func main() {fmt.Printf("=== ConcurrentMutex ===\\n")ConcurrentMutex("<http://golang.org/>", fetcher, makeState())
}
3、并发执行版本(使用信道)
  • 该版本采用了类似mapreduce的框架,使用协调者和工作者的结构
  • ConcurrentChannel函数,首先创建一个接收字符串数组的信道,并将初始要爬取的字符串数组赋值给信道,并且调用协调者函数,协调者函数创建一个fetched映射,然后第一个循环代表不断的从信道里获取url数组,如果信道没有东西则会阻塞。
  • 内部的循环,对于每一个url都启动一个worker线程去进行爬取,worker线程爬取完后还会将新获得的需要爬取的网址塞入信道中,让主线程的coordinator获取,同时,coordinator中使用n来跟踪当前正在执行的worker的数量,如果所有worker都结束执行了,则程序返回。
//
// Concurrent crawler with channels
//func worker(url string, ch chan []string, fetcher Fetcher) {// 获取参数 url 下的所有 urls,并返回urls, err := fetcher.Fetch(url)if err != nil {ch <- []string{}} else {ch <- urls}
}func coordinator(ch chan []string, fetcher Fetcher) {n := 1// 创建一个fetched映射fetched := make(map[string]bool)// 第一个循环代表不断的从信道里获取url数组,如果信道没有东西则会阻塞。for urls := range ch {for _, u := range urls {// 只爬取已经被爬取过的 urlif fetched[u] == false {// 遇到没被爬取过的,标记为 true,表示已被爬取fetched[u] = true// 表示正在被爬取的 url 数量 + 1n += 1// 对于每一个url都启动一个worker线程去进行爬取go worker(u, ch, fetcher)}}// 表示正在被爬取的 url 数量 - 1n -= 1// 当没有正在被爬取的 url 时,退出循环if n == 0 {break}}
}func ConcurrentChannel(url string, fetcher Fetcher) {// 创建一个接收字符串数组的信道 channelch := make(chan []string)// 将要爬取的第一个字符串数组赋值给信道go func() {ch <- []string{url}}()// 调用协调者函数coordinator(ch, fetcher)
}func main() {// 打印日志fmt.Printf("=== ConcurrentChannel ===\\n")// 调用并发爬虫,第一个参数是要爬取的网页,第二个参数是模拟网页库ConcurrentChannel("<http://golang.org/>", fetcher)
}

六、RPC:(TODO: here)

1、目标:

  • 调用者能够像调用本地栈内的过程函数一样,调用远程服务器上的函数。

2、如何工作?

  • 客户端有一个Stub,负责将调用的函数,参数等进行序列化,并传输给服务器上的Stub
  • 服务器上的Stub进行反序列化,然后调用服务器本地的函数,并通过反向的类似的过程返回给客户端的Stub,Stub解析后返回给调用者。

在这里插入图片描述

3、在Go中如何使用?(以Key/Value 存储为例)

  • 这段代码实现了一个简单的分布式键值存储系统,其中包括客户端和服务器端两部分。客户端可以通过 RPC 调用服务器端的方法来进行数据存储和查询。
  • 客户端侧:
    • 定义了 PutArgs 和 PutReply 两个类型,分别表示客户端调用 Put 方法的参数和返回值;GetArgs 和 GetReply 两个类型,分别表示客户端调用 Get 方法的参数和返回值。
    • 实现了 connect、get 和 put 三个函数。connect 函数用于连接服务器端的 RPC 服务;get 和 put 函数分别用于从服务器端获取数据和向服务器端存储数据。
  • 服务器侧:
    • 代码实现了一个 KV 类型,它包含一个互斥锁和一个 map 类型的数据成员,表示存储的键值对数据。KV 类型实现了 Get 和 Put 两个方法,分别用于获取和存储数据。这两个方法都使用互斥锁来保证并发访问时的数据安全。
    • 接着,代码实现了一个 server 函数,它用于创建一个 RPC 服务器并注册 KV 对象。server 函数使用 TCP 协议监听端口 1234,并在收到客户端请求时调用 rpcs.ServeConn 方法处理请求。
  • 最后,在 main 函数中,代码调用 server 函数启动服务器端,并向服务器端存储了一个键值对 “subject”-“6.824”。然后,代码调用 get 函数从服务器端获取键值对 “subject” 的值,并将其输出到控制台。
package main// 导入所需的相关库
import ("fmt""log""net""net/rpc""sync"
)//
// Common RPC request/reply definitions
//
// 以下是比较 common 的 客户端 request/reply 相关定义,这里使用 KV 键值对操作
// Put 操作参数有两个,key: string 和 value: string
type PutArgs struct {Key   stringValue string
}// Put 操作没有返回值
type PutReply struct {
}// Get 操作参数有一个,key: string
type GetArgs struct {Key string
}// Get 操作有一个返回值 value: string
type GetReply struct {Value string
}//
// Client
//
// TODO: here
- 客户端侧:- 定义了 PutArgs 和 PutReply 两个类型,分别表示客户端调用 Put 方法的参数和返回值;GetArgs 和 GetReply 两个类型,分别表示客户端调用 Get 方法的参数和返回值。- 实现了 connect、get 和 put 三个函数。connect 函数用于连接服务器端的 RPC 服务;get 和 put 函数分别用于从服务器端获取数据和向服务器端存储数据。
func connect() *rpc.Client {client, err := rpc.Dial("tcp", ":1234")if err != nil {log.Fatal("dialing:", err)}return client
}func get(key string) string {client := connect()args := GetArgs{"subject"}reply := GetReply{}err := client.Call("KV.Get", &args, &reply)if err != nil {log.Fatal("error:", err)}client.Close()return reply.Value
}func put(key string, val string) {client := connect()args := PutArgs{"subject", "6.824"}reply := PutReply{}err := client.Call("KV.Put", &args, &reply)if err != nil {log.Fatal("error:", err)}client.Close()
}//
// Server
//
// 
// 定义了一个 KV 数据类型,里面包含一个互斥锁,和一个 K/V map 数据结构
type KV struct {mu   sync.Mutexdata map[string]string
}
TODO: here
- 服务器侧:- 代码实现了一个 KV 类型,它包含一个互斥锁和一个 map 类型的数据成员,表示存储的键值对数据。KV 类型实现了 Get 和 Put 两个方法,分别用于获取和存储数据。这两个方法都使用互斥锁来保证并发访问时的数据安全。- 接着,代码实现了一个 server 函数,它用于创建一个 RPC 服务器并注册 KV 对象。server 函数使用 TCP 协议监听端口 1234,并在收到客户端请求时调用 rpcs.ServeConn 方法处理请求。
func server() {// 初始化一个 KV 类型kv := new(KV)// 初始化 kv.data 数据结构kv.data = map[string]string{}// 注册一个 RPC 服务,里面装着 kv 数据rpcs := rpc.NewServer()rpcs.Register(kv)// 监听 1234 端口l, e := net.Listen("tcp", ":1234")if e != nil {log.Fatal("listen error:", e)}// 开启一个新的 go 线程go func() {for {conn, err := l.Accept()if err == nil {go rpcs.ServeConn(conn)} else {// 如果发生错误,停止 server 线程break}}l.Close()}()
}func (kv *KV) Get(args *GetArgs, reply *GetReply) error {kv.mu.Lock()defer kv.mu.Unlock()reply.Value = kv.data[args.Key]return nil
}func (kv *KV) Put(args *PutArgs, reply *PutReply) error {kv.mu.Lock()defer kv.mu.Unlock()kv.data[args.Key] = args.Valuereturn nil
}//
// main
//// 这段代码实现了一个简单的分布式键值存储系统,其中包括客户端和服务器端两部分。
// 客户端可以通过 RPC 调用服务器端的方法来进行数据存储和查询。// 在 main 函数中,代码调用 server 函数启动服务器端,并向服务器端存储了一个键值对 "subject"-"6.824"。
// 然后,代码调用 get 函数从服务器端获取键值对 "subject" 的值,并将其输出到控制台。
func main() {server()put("subject", "6.824")fmt.Printf("Put(subject, 6.824) done\\n")fmt.Printf("get(subject) -> %s\\n", get("subject"))
}

4、RPC semantics under failures:

  • at-least-once:客户端重复尝试请求
  • at-most-once:客户端最多请求1次,服务器执行0次或1次(Go的RPC系统是这种)
  • exactly-once:非常困难,开销较大,需要状态管理

看原视频补充

在这里插入图片描述
Go 的线程运行在一个运行时环境里,所有线程共享一块内存,每个线程有自己的 PC, stack, registers 等 (这些是存放在内存中的)。
此外,go 语言支持线程的 start/go, exit, stop, resume 等操作。

在这里插入图片描述
为什么要在 go 中使用 threads ? 是为了表达并行执行,包括:
1.I/O 并行
2.多核并行
3.方便地表达并行

在这里插入图片描述
编写 go 线程可能会遇到的挑战:

  • Race conditions 数据竞争
    • 解决方案1:避免数据共享
    • 方案2:使用锁
    • 方案3:使用 go 提供的 race detector
  • Coordination 协调 (比如 一个线程等待另一个线程完成)
    • 通道或者条件变量
  • 可能的死锁情况

在这里插入图片描述
通常而言,为了解决上述挑战,通常采用两种方案:
1.通道
2.锁 + 条件变量

TODO: here


http://www.mrgr.cn/news/96496.html

相关文章:

  • Windows10上部署DeepSeek+RAG知识库操作详解(Dify方式)之1
  • 漏洞挖掘---顺景ERP-GetFile任意文件读取漏洞
  • C++学习之Linux文件编译、调试及库制作
  • 红宝书第二十一讲:详解JavaScript的模块化(CommonJS与ES Modules)
  • C++学习之路:指针基础
  • Windows11系统下python虚拟环境管理独家心得
  • uniapp选择文件使用formData格式提交数据
  • opencv图像处理之指纹验证
  • Java EE(18)——网络原理——应用层HTTP协议
  • leetcode 28 Find the Index of the First Occurrence in a String
  • Jmeter的压测使用
  • 从PDF到精准答案:Coze助力RAGFlow框架提升数据召回率
  • 源码刨析与插入实现:RBT比AVL强在何处?
  • SpringBoot实现RBAC权限校验模型
  • C++进阶——位图+布隆过滤器+海量数据处理
  • 小学数学解题方法专题3-列表法-提升2
  • 3.27学习总结 爬虫+二维数组+Object类常用方法
  • RocketMQ - 从消息可靠传输谈高可用
  • 在Qt中判断输入的js脚本是否只包含函数
  • fluent_UDF学习笔记