实现线程同步的方法
实现线程同步的方法
文章目录
1.同步概念
同步
所谓同步,即同时起步,协调一致。
不同的对象,对“同步”的理解方式略有不同。如,设备同步,是指在两个设备之间规定一个共同的时间参考;数据库同步,是指让两个或多个数据库内容保持一致,或者按需要部分保持一致;文件同步,是指让两个或多个文件夹里的文件保持一致。等等。
而,编程中、通信中所说的同步与生活中大家印象中的同步概念略有差异。“同”字应是指协同、协助、互相配合。主旨在协同步调,按预定的先后次序运行。
线程同步
线程同步,指一个线程发出某一功能调用时,在没有得到结果之前,该调用不返回。同时其它线程为保证数据
一致性,不能调用该功能。
举例1:银行存款 5000。柜台,折:取3000;提款机,卡:取3000。剩余:2000
举例2:内存中100字节,线程T1欲填入全1,线程T2欲填入全0。但如果T1执行了50个字节失去cpu,T2
执行,会将T1写过的内容覆盖。当T1再次获得cpu继续 从失去cpu的位置向后写入1,当执行结束,内存中的
100字节,既不是全1,也不是全0。
产生的现象叫做“与时间有关的错误”(time related)。
为了避免这种数据混乱,线程需要同步。
“同步”的目的,是为了避免数据混乱,解决与时间有关的错误。实际上,不仅线程间需要同步,进程间、信
号间等等都需要同步机制。因此,所有“多个控制流,共同操作一个共享资源”的情况,都需要同步。
2.互斥锁
1.概述
数据混乱原因:
-
资源共享(独享资源则不会)
-
调度随机(意味着数据访问会出现竞争)
-
线程间缺乏必要的同步机制。
以上 3 点中,前两点不能改变,欲提高效率,传递数据,资源必须共享。只要共享资源,就一定会出现竞争。只要存在竞争关系,数据就很容易出现混乱。所以只能从第三点着手解决。使多个线程在访问共享资源的时候,出现互斥。
互斥锁(也称互斥量)用于保护关键代码段,以确保其独占式的访问,这有些像二进制信号量(信号量),当进入关键代码段时,我们需要获得互斥锁并将其加锁,这等价于二进制信号量的P操作;当离开关键代码段时,我们需要对互斥锁解锁,以唤醒其他等待该互斥锁的线程,这相当于二进制信号量的V操作。
注意:同一时刻,只能有一个线程持有该锁
当 A 线程对某个全局变量加锁访问,B 在访问前尝试加锁,拿不到锁,B 阻塞。C 线程不去加锁,而直接访问该全局变量,依然能够访问,但会出现数据混乱。
所以,互斥锁实质上是操作系统提供的一把“建议锁”(又称“协同锁”),建议程序中有多线程访问共享资源的时候使用该机制。但,并没有强制限定。
因此,即使有了 mutex,如果有线程不按规则来访问数据,依然会造成数据混乱。
2.互斥锁基础 API
#include <pthread.h>
int pthread_mutex_init ( pthread_mutex_t* mutex, /* 初始化互斥锁 */const pthread_mutexattr_t* mutexattr );
int pthread_mutex_destory ( pthread_mutex_t* mutex ); /* 销毁互斥锁 */
int pthread_mutex_lock ( pthread_mutex_t* mutex ); /* 以原子操作的方式给一个互斥锁加锁 */
int pthread_mutex_trylock ( pthread_mutex_t* mutex ); /* 相当于 pthread_mutex_lock 的非阻塞版本 ,非阻塞轮询*/
int pthread_mutex_unlock ( pthread_mutex_t* mutex ); /* 以原子操作的方式给一个互斥锁解锁 */
以上5个函数的返回值都是:成功返回0,失败返回错误号。
pthread_mutex_t 类型,其本质是一个结构体。为简化理解,应用时可忽略其实现细节,简单当成整数看待。
pthread_mutex_t mutex;
变量 mutex只有两种取值1、0
使用mutex(互斥量、互斥锁)一般步骤:
pthread_mutex_t 类型。 1. pthread_mutex_t lock; 创建锁2 pthread_mutex_init; 初始化 13. pthread_mutex_lock;加锁 1-- --> 04. 访问共享数据(stdout) 5. pthrad_mutext_unlock();解锁 0++ --> 16. pthead_mutex_destroy;销毁锁
1.初始化和销毁
int pthread_mutex_init ( pthread_mutex_t* mutex, /* 初始化互斥锁 */const pthread_mutexattr_t* mutexattr );
参数:
mutex:咱们创建的锁
mutexattr:锁的属性
int pthread_mutex_destory ( pthread_mutex_t* mutex ); /* 销毁互斥锁 */
参数同上
restrict关键字,用来限定指针变量。被该关键字限定的指针变量所指向的内存操作,必须由本指针完成。
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL); //动态初始化。
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //静态初始化。
2.加锁解锁
int pthread_mutex_lock ( pthread_mutex_t* mutex ); /* 以原子操作的方式给一个互斥锁加锁 */
int pthread_mutex_trylock ( pthread_mutex_t* mutex ); /* 相当于 pthread_mutex_lock 的非阻塞版本 ,非阻塞轮询*/
int pthread_mutex_unlock ( pthread_mutex_t* mutex ); /* 以原子操作的方式给一个互斥锁解锁 */
参数都是我们创建的锁
3.使用案例
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>pthread_mutex_t mutex;void err_thread(int ret, char *str)
{if (ret != 0) {fprintf(stderr, "%s:%s\n", str, strerror(ret));pthread_exit(NULL);}
}void *tfn(void *arg)
{srand(time(NULL));while (1) {pthread_mutex_lock(&mutex);printf("hello ");sleep(rand() % 3); /*模拟长时间操作共享资源,导致cpu易主,产生与时间有关的错误*/printf("world\n");pthread_mutex_unlock(&mutex);sleep(rand() % 3);}return NULL;
}int main(void)
{int flag = 5;pthread_t tid;srand(time(NULL));pthread_mutex_init(&mutex, NULL);pthread_create(&tid, NULL, tfn, NULL);while (flag--) {pthread_mutex_lock(&mutex);printf("HELLO ");sleep(rand() % 3);printf("WORLD\n");pthread_mutex_unlock(&mutex);sleep(rand() % 3);}pthread_cancel(tid); // 将子线程杀死,子线程中自带取消点pthread_join(tid, NULL);pthread_mutex_destroy(&mutex);return 0; //main中的return可以将整个进程退出
}
4.注意事项
1.尽量保证锁的粒度, 越小越好。(访问共享数据前,加锁。访问结束立即解锁。)
2.互斥锁,本质是结构体。 我们可以看成整数。 可以认为是初值为 1。(pthread_mutex_init() 函数调用成功。)
3.加锁: --操作, 阻塞线程。
4.解锁: ++操作, 换醒阻塞在锁上的线程。
5.try锁
int pthread_mutex_trylock ( pthread_mutex_t* mutex ); /* 相当于 pthread_mutex_lock 的非阻塞版本 ,非阻塞轮询*/
lock加锁失败会阻塞,等待锁释放
trylock不断尝试加锁,加锁失败直接返回错误号(如:EBUSY),不阻塞
3.互斥锁属性
pthread_mutex_t
结构体描述互斥锁的属性,线程库提供了一系列函数来操作pthread_mutexattr_t
类型的变量,以方便我们获取和设置互斥锁属性,以下是其中一些主要的函数:
#include <pthread.h>/* 初始化互斥锁属性对象 */
int pthread_mutexattr_init ( pthread_mutexattr_t* attr );/* 销毁互斥锁属性对象 */
inrt pthread_mutexattr_destroy ( pthread_mutexattr_t* attr );/* 获取和设置互斥锁的 pshared 属性 */
int pthread_mutexattr_getpshared ( const pthread_mutexattr_t* attr, int* pshared );
int pthread_muextattr_setpshared ( pthread_mutexattr_t* attr, int* pshared );/* 获取和设置互斥锁的 type 属性 */
int pthread_mutexattr_gettype ( const pthread_mutexattr_t* attr, int* type );
int pthread_mutexattr_settype ( pthread_mutexattr_t* attr, int* type );
本书仅讨论互斥锁的两种常用属性:pshared
和type
。
互斥锁属性pshared
指定是否允许跨进程共享互斥锁,其可选值为:
PTHREAD_PROCESS_SHARED
:互斥锁可以被跨进程共享。PTHREAD_PROCESS_PRIVATE
:互斥锁只能和锁的初始化线程隶属于同一个进程的线程共享。
互斥锁属性type
指定互斥锁的类型,Linux支持以下4种互斥锁:
PTHREAD_MUTEX_NORMAL
:普通锁,这是互斥锁的默认类型。当一个线程对一个普通锁加锁后,其余请求该锁的线程将形成一个等待队列,并在该锁解锁后按优先级获得它。这种锁类型保证了资源分配的公平性,但也容易引发问题:一个线程如果对一个已经加锁的普通锁再次加锁,将引发死锁;对一个已经被其他线程加锁的普通锁解锁,或者对一个已经解锁的普通锁再次解锁,将导致不可预期的后果。PTHREAD_MUTEX_ERRORCHECK
:检错锁。一个线程如果对一个自己加锁的检错锁再次加锁,则加锁操作返回EDEADLK
。对一个已经被其他线程加锁的检错锁解锁,或对一个已经解锁的检错锁再次解锁,则解锁操作返回EPERM
。PTHREAD_MUTEX_RECURSIVE
:嵌套锁。这种锁允许一个线程在释放锁前多次对它加锁而不发生死锁,但如果其他线程要获得这个锁,则当前锁的拥有者必须执行相应次数的解锁操作。对一个已经被其他线程加锁的嵌套锁解锁,或对一个已经解锁的嵌套锁再次解锁,则解锁操作将返回EPERM
。PTHREAD_MUTEX_DEFAULT
:默认锁。通常被映射为以上三种锁之一。
4.死锁
是使用锁不恰当造成的现象:
-
线程试图对同一个互斥量A加锁两次。
-
线程1拥有A锁,请求获得B锁;线程2拥有B锁,请求获得A锁
死锁使一个或多个线程被挂起而无法继续执行,且这种情况还不容易被发现。
在一个线程中对一个已经加锁的普通锁再次加锁将导致死锁。另外,如果两个线程按照不同顺序来申请两个互斥锁,也容易产生死锁,如以下代码所示:
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>int a = 0;
int b = 0;
pthread_mutex_t mutex_a;
pthread_mutex_t mutex_b;void *another (void *arg) {pthread_mutex_lock(&mutex_b); /* 子线程上锁 mutex_b */printf("in child thread, got mutex b, waiting for mutex a\n");sleep(5);++b;pthread_mutex_lock(&mutex_a); /* 子线程上锁 mutex_a */b += a++;pthread_mutex_unlock(&mutex_a); /* 解锁 */pthread_mutex_unlock(&mutex_b);pthread_exit(NULL);
}int main () {pthread_t id;pthread_mutex_init(&mutex_a, NULL); /* 初始化互斥锁 */pthread_mutex_init(&mutex_b, NULL);pthread_create(&id, NULL, another, NULL); /* 创建线程 */pthread_mutex_lock(&mutex_a); /* 主线程上锁 mutex_a */printf("in parent thread, got mutex a, waiting for mutex b\n");sleep(5);++a;pthread_mutex_lock(&mutex_b); /* 主线程上锁 mutex_b */a += b++;pthread_mutex_unlock(&mutex_b);pthread_mutex_unlock(&mutex_a);/* 主线程等待子线程结束,然后销毁互斥锁以释放资源 */pthread_join(id, NULL);pthread_mutex_destroy(&mutex_a);pthread_mutex_destroy(&mutex_b);return 0;
}
由于两个线程都在等待对方已经持有的锁释放,因此会发生死锁,两个线程都将永远等待下去。 为了避免死锁,应确保所有线程以相同的顺序获取互斥锁。
编译:-lpthread
选项确保链接了 POSIX 线程库。
g++ -o test test.cpp -lpthread
3.读写锁
1.原理
1.锁只有一把。以读方式给数据加锁,那锁就是读锁,以写方式给数据加锁,那锁就是写锁。
2.读共享,写独占。
3.写锁优先级高。
-
如果有五个进程同时请求锁,1个写请求4个读请求,那么优先给写锁。
-
如果4个读请求比写请求先到,并且已经加锁成功,那么不会断开读请求的进程给写请求的进程锁的。
-
**如果读锁和写锁在同一队列阻塞等待,那么优先给写锁:**如果进程1是读进程已经加锁成功在读了,后边同时来了3个进程,2,4进程写请求,3进程读请求,这个时候是这样的:1读完之后,2和4写,写完之后3再读(写优先级高)
2.特性
-
读写锁是“写模式加锁”时,解锁前,所有对该锁加锁的线程都会被阻塞。
-
读写锁是“读模式加锁”时,
-
读写锁是“读模式加锁”时,既有试图以写模式加锁的线程,也有试图以读模式加锁的线程。那么读写锁
会阻塞随后的读模式锁请求。优先满足写模式锁。读锁、写锁并行阻塞,写锁优先级高
**读写锁也叫共享-独占锁。**当读写锁以读模式锁住时,它是以共享模式锁住的;当它以写模式锁住时,它是以独
占模式锁住的。写独占、读共享。
读写锁非常适合于对数据结构读的次数远大于写的情况。
如果线程以读模式对其加锁会成功;如果线程以写模式加锁会阻塞。
相较于互斥量而言,当读线程多的时候,提高访问效率
3.对应函数
pthread_rwlock_t rwlock;pthread_rwlock_init(&rwlock, NULL);pthread_rwlock_rdlock(&rwlock);
pthread_rwlock_wrlock(&rwlock); pthread_rwlock_tryrdlock(&rwlock);
pthread_rwlock_trywrlock(&rwlock); pthread_rwlock_unlock(&rwlock);
pthread_rwlock_destroy(&rwlock);
以上都是成功返回0失败返回错误号
pthread_rwlock_t类型 用于定义一个读写锁变量。
pthread rwlock t rwlock;
1.初始化和销毁
pthread_rwlock_init 函数
初始化一把读写锁
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
参 2:attr 表读写锁属性,通常使用默认属性,传 NULL 即可。
pthread_rwlock_destroy 函数
销毁一把读写锁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
2.加锁解锁
加锁
pthread_rwlock_rdlock 函数
以读方式请求读写锁。(常简称为:请求读锁)
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
pthread_rwlock_wrlock 函数
以写方式请求读写锁。(常简称为:请求写锁)
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
pthread_rwlock_tryrdlock 函数
非阻塞以读方式请求读写锁(非阻塞请求读锁)
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
pthread_rwlock_trywrlock 函数
非阻塞以写方式请求读写锁(非阻塞请求写锁)
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
解锁
pthread_rwlock_unlock 函数
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
4.示例
/* 3个线程不定时 "写" 全局资源,5个线程不定时 "读" 同一全局资源 */#include <stdio.h>
#include <unistd.h>
#include <pthread.h>int counter; //全局资源
pthread_rwlock_t rwlock;void *th_write(void *arg)
{int t;int i = (int)arg;while (1) {t = counter; // 保存写之前的值usleep(1000);pthread_rwlock_wrlock(&rwlock);printf("=======write %d: %lu: counter=%d ++counter=%d\n", i, pthread_self(), t, ++counter);pthread_rwlock_unlock(&rwlock);usleep(9000); // 给 r 锁提供机会}return NULL;
}void *th_read(void *arg)
{int i = (int)arg;while (1) {pthread_rwlock_rdlock(&rwlock);printf("----------------------------read %d: %lu: %d\n", i, pthread_self(), counter);pthread_rwlock_unlock(&rwlock);usleep(2000); // 给写锁提供机会}return NULL;
}int main(void)
{int i;pthread_t tid[8];pthread_rwlock_init(&rwlock, NULL);for (i = 0; i < 3; i++)pthread_create(&tid[i], NULL, th_write, (void *)i);for (i = 0; i < 5; i++)pthread_create(&tid[i+3], NULL, th_read, (void *)i);for (i = 0; i < 8; i++)pthread_join(tid[i], NULL);pthread_rwlock_destroy(&rwlock); //释放读写琐return 0;
}
4.条件变量
如果说互斥锁是用于同步线程对共享数据的访问,那么条件变量则是用于在线程之间同步共享数据的值。条件变量提供了一种线程间的通知机制:当某个共享数据达到某个值时,唤醒等待这个共享数据的线程。
条件变量是多线程编程中用于同步的一种机制,它允许线程在某些条件未被满足时暂停执行,并在条件满足时被唤醒继续执行。条件变量通常与互斥锁(mutexes)一起使用,以协调对共享资源的访问。这是一种避免忙等(busy-waiting)并减少CPU资源浪费的有效方式。
1.工作原理
- 等待条件变量:当线程需要访问某个共享资源,但条件不满足时,它会通过互斥锁保护条件变量,并在该条件变量上等待。在这个等待过程中,线程会释放互斥锁,以便其他线程可以修改这个条件。
- 唤醒等待的线程:其他线程在修改了条件之后,可以通过条件变量来唤醒一个或多个正在等待这个条件的线程。
- 重新检查条件:被唤醒的线程会重新获取互斥锁,并再次检查条件是否满足。如果条件满足,线程继续执行;如果不满足,线程可能会再次等待。
2.对应函数
1.总览
条件变量的相关函数如下:
#incldue <pthread.h>/* 初始化条件变量 */
int pthread_cond_init (pthread_cond_t* cond, const pthread_condattr_t* cond_attr);/* 销毁条件变量 */
int pthread_cond_destroy (pthread_cond_t* cond);/* 以广播方式唤醒所有等待目标条件的线程 */
int pthread_cond_broadcast (pthread_cond_t* cond);/* 唤醒一个等待目标条件变量的线程,唤醒哪个线程取决于线程的优先级和调度策略 */
int pthread_cond_signal (pthread_cond_t* cond);/* 等待目标条件变量 */
int pthread_cond_wait (ptread_cond_t* cond, pthread_mutex_t* mutex);/*限时等待一个条件变量*/
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
pthread_cond_t 类型 用于定义条件变量
pthread_cond_t cond;
成功返回0失败直接返回错误号
pthread_cond_signal(): 唤醒阻塞在条件变量上的 (至少)一个线程。
pthread_cond_broadcast(): 唤醒阻塞在条件变量上的 所有线程。
2.创建和销毁
创建
/* 初始化条件变量 */
int pthread_cond_init (pthread_cond_t* cond, const pthread_condattr_t* cond_attr);
参 2:attr 表条件变量属性,通常为默认值,传 NULL 即可
也可以使用静态初始化的方法,初始化条件变量:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER
销毁
/* 销毁条件变量 */
int pthread_cond_destroy (pthread_cond_t* cond);
3.wait函数
阻塞等待一个条件变量
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
参数
cond:条件变量
mutex:互斥锁
函数作用:
-
阻塞等待条件变量 cond(参 1)满足
-
释放已掌握的互斥锁(解锁互斥量)相当于 pthread_mutex_unlock(&mutex);
**1.2.**两步为一个原子操作。
- 当被唤醒,pthread_cond_wait 函数返回时,解除阻塞并重新申请获取互斥锁 pthread_mutex_lock(&mutex);
这张图是对wait过程的说明:
1.锁是提前创建和初始化好的,然后加锁
2.调用pthread_cond_wait,看条件变量是否满足
3.不满足就阻塞等待,阻塞等待的时候就把锁给解了,让别人用去了(判断阻塞和解锁这两步是一个原子操作)
4.等到满足条件变量满足的时候,再申请重新加锁(重新加锁是条件变量内部实现,不需要咱们自己加锁)
4.pthread_cond_timedwait 函数
限时等待一个条件变量
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
参 3:
参看 man sem_timedwait 函数,查看 struct timespec 结构体。
struct timespec {
time_t tv_sec;
/* seconds */ 秒
long tv_nsec;
/* nanosecondes*/ 纳秒
}
形参 abstime:绝对时间。
如:time(NULL)返回的就是绝对时间。而 alarm(1)是相对时间,相对当前时间定时 1 秒钟。
struct timespec t = {1, 0};
pthread_cond_timedwait (&cond, &mutex, &t); 只能定时到 1970 年 1 月 1 日 00:00:01 秒(早已经过去)
正确用法:
time_t cur = time(NULL); //获取当前时间。struct timespec t; //定义 timespec 结构体变量 tt.tv_sec = cur+1; //定时 1 秒pthread_cond_timedwait (&cond, &mutex, &t); //传参
在讲解 setitimer 函数时我们还提到另外一种时间类型:
struct timeval {
time_t tv_sec; /* seconds */ 秒
susecods_t tv_usec; /* microseconds */ 微秒
};
3.使用条件变量模拟实现生产者—消费者问题
流程
完整代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>void err_thread(int ret, char *str)
{if (ret != 0) {fprintf(stderr, "%s:%s\n", str, strerror(ret));pthread_exit(NULL);}
}//魔方公共区域的链表
struct msg {int num;struct msg *next;
};struct msg *head;pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 定义/初始化一个互斥量
pthread_cond_t has_data = PTHREAD_COND_INITIALIZER; // 定义/初始化一个条件变量void *produser(void *arg)
{while (1) {struct msg *mp = malloc(sizeof(struct msg));mp->num = rand() % 1000 + 1; // 模拟生产一个数据`printf("--produce %d\n", mp->num);pthread_mutex_lock(&mutex); // 加锁 互斥量mp->next = head; // 写公共区域head = mp;pthread_mutex_unlock(&mutex); // 解锁 互斥量pthread_cond_signal(&has_data); // 唤醒阻塞在条件变量 has_data上的线程.sleep(rand() % 3);}return NULL;
}void *consumer(void *arg)
{while (1) {struct msg *mp;pthread_mutex_lock(&mutex); // 加锁 互斥量while (head == NULL) {pthread_cond_wait(&has_data, &mutex); // 阻塞等待条件变量, 解锁} // pthread_cond_wait 返回时, 重新加锁 mutexmp = head;head = mp->next;pthread_mutex_unlock(&mutex); // 解锁 互斥量printf("---------consumer id: %lu :%d\n", pthread_self(), mp->num);free(mp);sleep(rand()%3);}return NULL;
}int main(int argc, char *argv[])
{int ret;pthread_t pid, cid;srand(time(NULL));ret = pthread_create(&pid, NULL, produser, NULL); // 生产者if (ret != 0) err_thread(ret, "pthread_create produser error");ret = pthread_create(&cid, NULL, consumer, NULL); // 消费者if (ret != 0) err_thread(ret, "pthread_create consuer error");ret = pthread_create(&cid, NULL, consumer, NULL); // 消费者if (ret != 0) err_thread(ret, "pthread_create consuer error");ret = pthread_create(&cid, NULL, consumer, NULL); // 消费者if (ret != 0) err_thread(ret, "pthread_create consuer error");pthread_join(pid, NULL);pthread_join(cid, NULL);return 0;
}
在公共区域为空的时候,消费者都会阻塞等待hasdata这个条件变量,都会把锁释放掉,而生产者这时会拿到锁进行生产,生产者生产完唤醒消费者进行消费。
4.条件变量优势
相较于 mutex 而言,条件变量可以减少竞争。
如直接使用 mutex,除了生产者、消费者之间要竞争互斥量以外,消费者之间也需要竞争互斥量,但如果汇(链表)中没有数据,消费者之间竞争互斥锁是无意义的。有了条件变量机制以后,只有生产者完成生产,才会引起消费者之间的竞争。提高了程序效率。
5.POSIX 信号量
1.概述
在Linux上,信号量API有两组,一组是System V IPC信号量(信号量),另一组是我们要讨论的POSIX信号量。这两组接口很相似,且语义完全相同,但不保证能互换。
进化版的互斥锁(1 --> N)
由于互斥锁的粒度比较大,如果我们希望在多个线程间对某一对象的部分数据进行共享,使用互斥锁是没有办
法实现的,只能将整个数据对象锁住。这样虽然达到了多线程操作共享数据时保证数据正确性的目的,却无形中导
致线程的并发性下降。线程从并行执行,变成了串行执行。与直接使用单进程无异。
信号量,是相对折中的一种处理方式,既能保证同步,数据不混乱,又能提高线程并发
POSIX信号量函数的名字都以sem_
开头,不像大多线程函数那样以pthread_
开头。常用的POSIX信号量函数如下:
#include <semaphore.h>
int sem_init(sem_t* sem, int pshared, unsigned int value); /* 初始化一个未命名信号量 */
int sem_destory(sem_t* sem); /* 销毁信号量 */
int sem_wait(sem_t* sem); /* 以原子操作的方式将信号量的值减1 */
int sem_trywait(sem_t *sem); /* 相当于sem_wait函数的非阻塞版本 */
int sem_post(sem_t *sem); /* 以原子操作的方式将信号量的值加1 */
上图中函数的第一个参数sem
指向被操作的信号量。
sem_init
函数用于初始化一个未命名信号量(POSIX信号量API支持命名信号量,但本书不讨论)。pshared
参数指定信号量类型,如果值为0,就表示这个信号量是当前进程的局部信号量,否则该信号量可以在多个进程间共享。value
参数指定信号量的初始值。初始化一个已经被初始化的信号量将导致不可预期的结果。sem_destroy
函数用于销毁信号量,以释放其占用的内核资源,销毁一个正被其他线程等待的信号量将导致不可预期的结果。sem_wait
函数以原子操作的方式将信号量的值减1,如果信号量的值为0,则sem_wait
函数将被阻塞,直到这个信号量具有非0值。sem_trywait
函数与sem_wait
函数类似,但它始终立即返回,而不论被操作的信号量是否具有非0值,相当于sem_wait
函数的非阻塞版本。当信号量非0时,sem_trywait
函数对信号量执行减1操作,当信号量的值为0时,该函数返回-1并设置errno为EAGAIN
。sem_post
函数以原子操作的方式将信号量的值加1,当信号量的值从0变为1时,其他正在调用sem_wait
等待信号量的线程将被唤醒。
上图中的函数成功时返回0,失败则返回-1并设置errno。
2.对应函数
1.总览
#include<semaphore.h>
sem_t sem;
sem_init 函数
sem_destroy 函数
sem_wait 函数
sem_trywait 函数
sem_timedwait 函数
sem_post 函数
以上 6 个函数的返回值都是:成功返回 0, 失败返回-1,同时设置 errno。(注意,它们没有 pthread 前缀)
sem_t 类型,本质仍是结构体。但应用期间可简单看作为整数,忽略实现细节(类似于使用文件描述符)。
规定信号量 sem 不能 < 0。
信号量基本操作:
sem_wait:
1.信号量大于 0,则信号量-- (类比 pthread_mutex_lock)
2.信号量等于 0,造成线程阻塞
对应
sem_post:
将信号量++,同时唤醒阻塞在信号量上的线程 (类比 pthread_mutex_unlock)
但,由于 sem_t 的实现对用户隐藏,所以所谓的++、–操作只能通过函数来实现,而不能直接++、–符号。
信号量的初值,决定了占用信号量的线程的个数。
2.初始化和销毁
初始化一个信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
参 1:sem 信号量
参 2:pshared 取 0 用于线程间;取非 0(一般为 1)用于进程间
参 3:value 指定信号量初值
sem_destroy 函数
销毁一个信号量
int sem_destroy(sem_t *sem);
3.PV操作主要函数
sem_wait 函数
给信号量加锁 –
int sem_wait(sem_t *sem);
sem_post 函数
给信号量解锁 ++
int sem_post(sem_t *sem);
sem_trywait 函数
尝试对信号量加锁 – (与 sem_wait 的区别类比 lock 和 trylock)
int sem_trywait(sem_t *sem);
sem_timedwait 函数
限时尝试对信号量加锁 –
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
参 2:abs_timeout 采用的是绝对时间。
定时 1 秒:
time_t cur = time(NULL); 获取当前时间。
struct timespec t; 定义 timespec 结构体变量 t
t.tv_sec = cur+1; 定时 1 秒
t.tv_nsec = t.tv_sec +100;
sem_timedwait(&sem, &t); 传参
3.实现生产者消费者
流程
完整代码
/*信号量实现 生产者 消费者问题*/#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>#define NUM 5 int queue[NUM]; //全局数组实现环形队列
sem_t blank_number, product_number; //空格子信号量, 产品信号量void *producer(void *arg)
{int i = 0;while (1) {sem_wait(&blank_number); //生产者将空格子数--,为0则阻塞等待queue[i] = rand() % 1000 + 1; //生产一个产品printf("----Produce---%d\n", queue[i]); sem_post(&product_number); //将产品数++i = (i+1) % NUM; //借助下标实现环形sleep(rand()%1);}
}void *consumer(void *arg)
{int i = 0;while (1) {sem_wait(&product_number); //消费者将产品数--,为0则阻塞等待printf("-Consume---%d\n", queue[i]);queue[i] = 0; //消费一个产品 sem_post(&blank_number); //消费掉以后,将空格子数++i = (i+1) % NUM;sleep(rand()%3);}
}int main(int argc, char *argv[])
{pthread_t pid, cid;sem_init(&blank_number, 0, NUM); //初始化空格子信号量为5, 线程间共享 -- 0sem_init(&product_number, 0, 0); //产品数为0pthread_create(&pid, NULL, producer, NULL);pthread_create(&cid, NULL, consumer, NULL);pthread_join(pid, NULL);pthread_join(cid, NULL);sem_destroy(&blank_number);sem_destroy(&product_number);return 0;
}
6.线程同步机制包装类
为了充分复用代码,同时后文需要,我们将前面讨论的三种线程同步机制分别封装为三个类,实现在locker.h
头文件中:
#ifndef LOCKER_H
#define LOCKER_H#include <exception>
#include <pthread.h>
#include <semaphore.h>// 封装信号量的类
class sem {
public:// 创建并初始化信号量sem() {if (sem_init(&m_sem, 0, 0) != 0) {// 构造函数没有返回值,可通过抛出异常来报告错误throw std::exception();}}// 销毁信号量~sem() {sem_destroy(&m_sem);}// 等待信号量bool wait() {return sem_wait(&m_sem) == 0;}// 增加信号量bool post() {return sem_post(&m_sem) == 0;}private:sem_t m_sem;
};// 封装互斥锁的类
class locker {
public:// 创建并初始化互斥锁locker() {if (pthread_mutex_init(&m_mutex, NULL) != 0) {throw std::exception();}}// 销毁互斥锁~locker() {pthread_mutex_destroy(&m_mutex);}// 获取互斥锁bool lock() {return pthread_mutex_lock(&m_mutex) == 0;}// 释放互斥锁bool unlock() {return pthread_mutex_unlock(&m_mutex) == 0;}private:pthread_mutex_t m_mutex;
};// 封装条件变量的类
class cond {
public:// 创建并初始化条件变量cond() {if (pthread_mutex_init(&m_mutex, NULL) != 0) {throw std::exception();}if (pthread_cond_init(&m_cond, NULL) != 0) {// 构造函数中一旦出现问题,就应立即释放已经成功分配的资源pthread_mutex_destroy(&m_mutex);throw std::exception();}}// 销毁条件变量~cond() {pthread_mutex_destroy(&m_mutex);pthread_cond_destroy(&m_cond);}// 等待条件变量bool wait() {int ret = 0;// 作者在此处对互斥锁加锁,保护了什么?这导致其他人无法使用该封装类pthread_mutex_lock(&m_mutex);ret = pthread_cond_wait(&m_cond, &m_mutex);pthread_mutex_unlock(&m_mutex);return ret == 0;}// 唤醒等待条件变量的线程bool signal() {return pthread_cond_signal(&m_cond) == 0;}private:pthread_mutex_t m_mutex;pthread_cond_t m_cond;
};#endif