当前位置: 首页 > news >正文

并发编程---多线程不安全示例以及解决,多线程创建方式

文章目录

  • 并发
  • 并行
  • 多线程
  • 为什么需要多线程
  • 线程不安全示例
  • 并发出现问题的根源: 并发三要素
    • 可见性: CPU 缓存引起
    • 原子性:分时复用引起
    • 有序性: 重排序引起
  • 线程不安全示例的解决方法
    • 使用AtomicLong类
    • 使用synchronized 关键字
  • 改进代码避免不必要的延迟
    • join()方法
      • 为什么使用 `join()`?
    • CountDownLatch类
      • 基本原理:
      • 常用方法:
  • 通过实现Runnable接口完成线程
  • 通过继承Thread类完成线程
  • 通过实现Callable接口完成接口
    • 手动创建线程,**使用 `FutureTask` 来处理 `Callable` 任务**
    • 使用线程池


并发

并发指的是系统在同一时间段内处理多个任务的能力,但并不要求这些任务同时执行。它侧重于任务的 逻辑上的同时进行,即在某个时间段内,系统切换多个任务的执行,这样看起来就像是多个任务“同时”运行,但实际上是在有限的资源上交替执行。

并发的特点

  • 逻辑上的同时:多个任务在时间上交替进行,给人一种“同时”执行的错觉,但它们通常是在一个处理器上通过快速切换执行的。
  • 时间分片:系统通过操作系统的调度器(例如时间片轮转)让任务看起来是并行的。
  • 可能不是物理同时执行:在单核 CPU 上,多个任务并发执行,但它们并不会真正同时执行。它们通过时间片的轮流执行来实现“并发”。

并发的例子

  • 单核 CPU 上运行多个任务。任务 A 和任务 B 不会同时执行,但它们会交替执行,任务 A 一会儿执行,接着任务 B 执行。
  • 例如,一个网站需要处理多个用户的请求,尽管每次只有一个请求在处理,但多个请求是并发的,因为它们会在时间上交替被处理。

并行

并行是指多个任务 同时 在多个处理器(或多个核心)上执行。它是并发的一种特殊情况,但必须具备 多核处理器多处理器系统 的支持,允许任务真正地同时执行。

并行的特点

  • 物理上的同时执行:多个任务在多个处理器或处理器核心上同时执行,任务间不需要交替执行。
  • 需要多核或多 CPU:并行计算通常依赖于硬件支持,例如多核 CPU 或分布式计算系统。
  • 速度更快:由于任务真正同时执行,通常比并发要高效,尤其是对于计算密集型任务。

并行的例子

  • 多核 CPU 上的并行任务。任务 A 和任务 B 可以在不同的核心上同时执行,不需要交替执行。
  • 例如,图像处理程序可以将图像划分为多个部分,然后在不同的处理器核心上同时处理每个部分,从而大大加快处理速度。

多线程

多线程是并发编程的一种方式,指的是一个程序内部可以有多个线程,这些线程共享程序的资源(如内存、文件描述符等),并且可以并行或并发地执行。每个线程都是操作系统调度的基本单位。

多线程的特点

  • 多线程是实现并发的一种方式,具体指的是在同一个进程中,通过线程来执行多个任务。
  • 在支持多核处理器的机器上,线程可以在多个核心上同时执行,这就实现了真正的并行。
  • 线程之间共享内存,因此可以快速地交换信息,但也会带来线程安全的问题。

为什么需要多线程

众所周知,CPU、内存、I/O 设备的速度是有极大差异的,为了合理利用 CPU 的高性能,平衡这三者的速度差异,计算机体系结构、操作系统、编译程序都做出了贡献,主要体现为:

  • CPU 增加了缓存,以均衡与内存的速度差异;// 导致 可见性 问题
  • 操作系统增加了进程、线程,以分时复用 CPU,进而均衡 CPU 与 I/O 设备的速度差异;// 导致 原子性 问题
  • 编译程序优化指令执行次序,使得缓存能够得到更加合理地利用。// 导致 有序性 问题

线程不安全示例

如果多个线程对同一个共享数据进行访问而不采取同步操作的话,那么操作的结果是不一致的。

这段代码演示了 10000 个线程并发执行计算任务,每个线程计算从 0100 的累加和并将结果加到共享变量 ThreadLearn.result 中。操作结束之后它的值有可能小于 50500000。

package thread;public class ThreadLearn {public static long result; // 多个线程共享的,用于存储最终的计算结果public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 10000; i++) { // 通过for循环启动10000个线程,每个线程通过MyThread类来执行任务MyThread myThread = new MyThread(100); // 每个线程都会计算从0到100的和// 启动线程,让线程并发执行Thread thread = new Thread(myThread);thread.start(); // start() 会异步调用run()方法}Thread.sleep(10*1000);System.out.println(result);}
}class MyThread implements Runnable {private int count;public MyThread(int count) {this.count = count;}@Overridepublic void run() {int sum = 0;for (int i = 0; i <= count; i++) {sum += i;}ThreadLearn.result += sum; // 将每个线程的sum累加到result变量中。}
}
49833400 // 结果总是小于50500000

并发出现问题的根源: 并发三要素

上述代码输出为什么不是 50500000? 并发出现问题的根源是什么?

可见性: CPU 缓存引起

可见性:一个线程对共享变量的修改,另外一个线程能够立刻看到。

举个简单的例子,看下面这段代码:

//线程1执行的代码
int i = 0;
i = 10;//线程2执行的代码
j = i;

假若执行线程 1 的是 CPU1,执行线程 2 的是 CPU2。由上面的分析可知,当线程 1 执行 i = 10 这句时,会先把 i 的初始值加载到 CPU1 的高速缓存中,然后赋值为 10,那么在 CPU1 的高速缓存当中 i 的值变为 10 了,却没有立即写入到主存当中。

此时线程 2 执行 j = i,它会先去主存读取 i 的值并 i 加载到 CPU2 的缓存当中,注意此时内存当中 i 的值还是 0,i 那么就会使得 j 的值为 0,而不是 10.

这就是可见性问题,线程 1 对变量 i 修改了之后,线程 2 没有立即看到线程 1 修改的值。


原子性:分时复用引起

原子性:即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。

举个简单的例子,看下面这段代码:

int i = 1;// 线程1执行
i += 1;// 线程2执行
i += 1;

这里需要注意的是:i += 1 需要三条 CPU 指令

  1. 将变量 i 从内存读取到 CPU 寄存器;
  2. 在 CPU 寄存器中执行 i + 1 操作;
  3. 将最后的结果 i 写入内存(缓存机制导致可能写入的是 CPU 缓存而不是内存)。

由于 CPU 分时复用(线程切换)的存在,线程 1 执行了第一条指令后,就切换到线程 2 执行,假如线程 2 执行了这三条指令后,再切换会线程 1 执行后续两条指令,将造成最后写到内存中的 i 值是 2 而不是 3。


有序性: 重排序引起

有序性:即程序执行的顺序按照代码的先后顺序执行。举个简单的例子,看下面这段代码:

int a = 1; // 指令1
int b = 2; // 指令2

编译器可能会将其重排序为:

int b = 2; // 指令2
int a = 1; // 指令1

如果这些变量是共享变量,且多个线程依赖它们的顺序,就会导致问题。


线程不安全示例的解决方法

使用AtomicLong类

package thread;public class ThreadLearn {// AtomicLong 是 Java 提供的一个类,用于处理长整型(long)数据类型的原子操作public static AtomicLong result = new AtomicLong(0);public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 10000; i++) { // 通过for循环启动10000个线程,每个线程通过MyThread类来执行任务MyThread myThread = new MyThread(100); // 每个线程都会计算从0到100的和// 启动线程,让线程并发执行Thread thread = new Thread(myThread);thread.start(); // start() 会异步调用run()方法}Thread.sleep(10*1000);System.out.println(result);}
}class MyThread implements Runnable {private int count;public MyThread(int count) {this.count = count;}@Overridepublic void run() {int sum = 0;for (int i = 0; i <= count; i++) {sum += i;}ThreadLearn.result.addAndGet(sum);// AtomicLong 提供了原子操作 addAndGet(),这个方法将 sum 加到 result 上,并返回更新后的值。}
}
50500000

AtomicLong 的作用: 使用 AtomicLongaddAndGet(sum) 方法,能够保证在多线程环境下,对 result 的累加操作是 原子 的,即使多个线程同时执行相同操作,也能保证每次操作都是安全的,不会产生竞争条件。


使用synchronized 关键字

package thread;public class ThreadLearn {public static long result; // 多个线程共享的,用于存储最终的计算结果public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 10000; i++) { // 通过for循环启动10000个线程,每个线程通过MyThread类来执行任务MyThread myThread = new MyThread(100); // 每个线程都会计算从0到100的和// 启动线程,让线程并发执行Thread thread = new Thread(myThread);thread.start(); // start() 会异步调用run()方法}Thread.sleep(10*1000);System.out.println(result);}
}class MyThread implements Runnable {private int count;public MyThread(int count) {this.count = count;}@Overridepublic void run() {int sum = 0;for (int i = 0; i <= count; i++) {sum += i;}synchronized (ThreadLearn.class) {// synchronized关键字来保证同一时刻只有一个线程可以更新result的值// 通过加锁ThreadLearn.class(即类锁),使得对 result 的修改是线程安全的。ThreadLearn.result += sum;}}
}

synchronized 关键字确保同一时刻只有一个线程能进入被 synchronized 修饰的代码块。当一个线程正在执行被加锁的代码时,其他线程会被阻塞,直到该线程执行完毕并释放锁。这就避免了多个线程在同一时刻访问共享资源的情况,确保了对共享资源的修改是有序的,避免了数据竞争。

在这个代码中的使用

synchronized (ThreadLearn.class) {ThreadLearn.result += sum;
}

加锁对象synchronized 关键字后面跟的 ThreadLearn.class 表示加锁的是 ThreadLearn 类的对象锁。也就是说,所有线程在执行这个同步代码块时,必须先获取到 ThreadLearn 类的锁,才能执行 result += sum 操作。

保证顺序执行:通过加锁,当多个线程同时执行 run() 方法时,只有一个线程能进入 synchronized 块并修改 result。其他线程必须等待该线程执行完毕并释放锁后,才能继续修改 result。这样就保证了 result 的更新操作不会被多个线程同时执行,避免了并发问题。


改进代码避免不必要的延迟

join()方法

使用 join() 方法来确保所有线程执行完毕后再继续执行主线程,可以避免通过 Thread.sleep() 等待线程执行的潜在问题。以下是改进后的代码,使用了 join() 来确保主线程等待所有子线程执行完毕后再输出最终结果。

package thread;import java.util.concurrent.atomic.AtomicLong;public class ThreadLearn {// 使用 AtomicLong 提供原子操作public static AtomicLong result = new AtomicLong(0);public static void main(String[] args) throws InterruptedException {long start = System.currentTimeMillis(); // 计时器int threadCount = 10000;Thread[] threads = new Thread[threadCount]; // 创建线程数组// 创建并启动 10000 个线程for (int i = 0; i < threadCount; i++) {MyThread myThread = new MyThread(100);threads[i] = new Thread(myThread);threads[i].start(); // 启动线程}// 使用 join() 等待所有线程执行完成for (Thread thread : threads) {thread.join(); // 确保主线程等待所有子线程执行完毕}// 打印最终结果和执行时间System.out.println(result);System.out.println("耗时:" + (System.currentTimeMillis() - start));}
}class MyThread implements Runnable {private int count;public MyThread(int count) {this.count = count;}@Overridepublic void run() {int sum = 0;for (int i = 0; i <= count; i++) {sum += i; // 计算 0 到 count 的和}ThreadLearn.result.addAndGet(sum);}
}

thread.join()join() 方法用于让主线程等待每个子线程执行完毕。主线程会依次调用 join(),直到所有的子线程执行完成。这样可以确保 result 输出的值是正确的。

为什么使用 join()

  • 等待所有线程完成join() 会让主线程等待当前线程完成。在这种情况下,主线程会等所有子线程执行完毕,避免提前输出结果。
  • 防止线程还没执行完毕时输出结果:如果没有使用 join(),主线程可能会在子线程执行之前就打印出结果,从而导致 result 的值不准确。

CountDownLatch类

package thread;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;public class ThreadLearn {// 使用 AtomicLong 提供原子操作public static AtomicLong result = new AtomicLong(0);static int count = 10000;  // 线程数public static CountDownLatch countDownLatch = new CountDownLatch(count);  // 初始化计数器public static void main(String[] args) throws InterruptedException {long start = System.currentTimeMillis(); // 计时器Thread[] threads = new Thread[count]; // 创建线程数组,大小为 count// 创建并启动 10000 个线程for (int i = 0; i < count; i++) {MyThread myThread = new MyThread(100);  // 每个线程都会计算 0 到 100 的和threads[i] = new Thread(myThread);threads[i].start();  // 启动线程}// 等待所有线程完成countDownLatch.await(); // 主线程会在这里等待,直到计数器为 0// 打印最终结果和执行时间System.out.println(result);System.out.println("耗时:" + (System.currentTimeMillis() - start));}
}class MyThread implements Runnable {private int count;public MyThread(int count) {this.count = count;}@Overridepublic void run() {int sum = 0;// 计算 0 到 count 的和for (int i = 0; i <= count; i++) {sum += i;}// 使用 AtomicLong 提供的原子操作进行更新ThreadLearn.result.addAndGet(sum);  // 线程安全的累加// 完成任务,计数器减 1ThreadLearn.countDownLatch.countDown();}
}

基本原理:

CountDownLatch 通过一个计数器来控制线程的等待,计数器的初始值是一个正整数。每当一个线程完成了某项工作,它会调用 countDown() 方法将计数器减 1。当计数器值减到 0 时,所有等待的线程(调用了 await() 方法的线程)都会被唤醒。

常用方法:

  • countDown():将计数器的值减 1。当计数器值为 0 时,所有等待的线程会被唤醒。
  • await():使当前线程等待,直到计数器的值为 0。该方法会阻塞当前线程,直到其他线程调用 countDown() 并使计数器减为 0。

通过实现Runnable接口完成线程

package thread;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;public class ThreadLearn {// 使用 AtomicLong 提供原子操作public static AtomicLong result = new AtomicLong(0);static int count = 10000;  // 线程数public static CountDownLatch countDownLatch = new CountDownLatch(count);  // 初始化计数器public static void main(String[] args) throws InterruptedException {long start = System.currentTimeMillis(); // 计时器Thread[] threads = new Thread[count]; // 创建线程数组,大小为 count// 创建并启动 10000 个线程for (int i = 0; i < count; i++) {MyThread myThread = new MyThread(100);  // 每个线程都会计算 0 到 100 的和threads[i] = new Thread(myThread);threads[i].start();  // 启动线程}// 等待所有线程完成countDownLatch.await(); // 主线程会在这里等待,直到计数器为 0// 打印最终结果和执行时间System.out.println(result);System.out.println("耗时:" + (System.currentTimeMillis() - start));}
}class MyThread implements Runnable {private int count;public MyThread(int count) {this.count = count;}@Overridepublic void run() {int sum = 0;// 计算 0 到 count 的和for (int i = 0; i <= count; i++) {sum += i;}// 使用 AtomicLong 提供的原子操作进行更新ThreadLearn.result.addAndGet(sum);  // 线程安全的累加// 完成任务,计数器减 1ThreadLearn.countDownLatch.countDown();}
}

通过继承Thread类完成线程

public class ThreadLearn {// 使用 AtomicLong 提供原子操作public static AtomicLong result = new AtomicLong(0);static int count = 10000;  // 线程数public static CountDownLatch countDownLatch = new CountDownLatch(count);  // 初始化计数器public static void main(String[] args) throws InterruptedException {long start = System.currentTimeMillis(); // 计时器Thread[] threads = new Thread[count]; // 创建线程数组,大小为 count// 创建并启动 10000 个线程for (int i = 0; i < count; i++) {MyThread1 myThread = new MyThread1(100);  // 每个线程都会计算 0 到 100 的和threads[i] = new Thread(myThread);threads[i].start();  // 启动线程}// 等待所有线程完成countDownLatch.await(); // 主线程会在这里等待,直到计数器为 0// 打印最终结果和执行时间System.out.println(result);System.out.println("耗时:" + (System.currentTimeMillis() - start));}
}class MyThread1 extends Thread {private long count;public MyThread1(int count) {this.count = count;}@Overridepublic void run() {super.run();int sum = 0;// 计算 0 到 count 的和for (int i = 0; i <= count; i++) {sum += i;}// 使用 AtomicLong 提供的原子操作进行更新ThreadLearn.result.addAndGet(sum);  // 线程安全的累加// 完成任务,计数器减 1ThreadLearn.countDownLatch.countDown();}
}

通过实现Callable接口完成接口

手动创建线程,使用 FutureTask 来处理 Callable 任务

public class ThreadLearn {// 使用 AtomicLong 提供原子操作public static AtomicLong result = new AtomicLong(0);static int count = 10000;  // 线程数public static CountDownLatch countDownLatch = new CountDownLatch(count);  // 初始化计数器public static void main(String[] args) throws InterruptedException, ExecutionException {FutureTask<Integer> future = new FutureTask<Integer>(new MyThread2(100));Thread thread = new Thread(future);thread.start();Integer o = future.get();System.out.println(o);}
}class MyThread2 implements Callable<Integer> {private int count;public MyThread2(int count) {this.count = count;}@Overridepublic Integer call() throws Exception {int sum = 0;// 计算 0 到 count 的和for (int i = 0; i <= count; i++) {sum += i;}// 使用 AtomicLong 提供的原子操作进行更新ThreadLearn.result.addAndGet(sum);  // 线程安全的累加// 完成任务,计数器减 1ThreadLearn.countDownLatch.countDown();return sum;}
}

使用线程池

public class ThreadLearn {// 使用 AtomicLong 提供原子操作public static AtomicLong result = new AtomicLong(0);public static CountDownLatch countDownLatch = new CountDownLatch(count);  // 初始化计数器public static ExecutorService executor = Executors.newFixedThreadPool(10);public static void main(String[] args) throws InterruptedException, ExecutionException {Future<Integer> submit = executor.submit(new MyThread2(100));Integer i = submit.get();System.out.println(i);}
}class MyThread2 implements Callable<Integer> {private int count;public MyThread2(int count) {this.count = count;}@Overridepublic Integer call() throws Exception {int sum = 0;// 计算 0 到 count 的和for (int i = 0; i <= count; i++) {sum += i;}// 使用 AtomicLong 提供的原子操作进行更新ThreadLearn.result.addAndGet(sum);  // 线程安全的累加// 完成任务,计数器减 1ThreadLearn.countDownLatch.countDown();return sum;}
}

http://www.mrgr.cn/news/90521.html

相关文章:

  • AIGC视频可控性提升:如何用数字人替代传统UGC、PGC生产工具
  • Qt 控件整理 —— 按钮类
  • 算法02-各种排序算法
  • 基于 PyTorch 的树叶分类任务:从数据准备到模型训练与测试
  • Python----PyQt开发(PyQt基础,环境搭建,Pycharm中PyQttools工具配置,第一个PyQt程序)
  • win11 python opencv作图像匹配小结
  • 【嵌入式Linux应用开发基础】read函数与write函数
  • 【工业安全】-CVE-2019-17621-D-Link Dir-859L 路由器远程代码执行漏洞
  • 自然语言处理NLP入门 -- 第三节词袋模型与 TF-IDF
  • haproxy+nginx负载均衡实验
  • 解锁大语言模型潜能:KITE 提示词框架全解析
  • DeepSeek-V3 技术报告
  • 设计模式全解(含代码实例)
  • 【科技革命】颠覆性力量与社会伦理的再平衡
  • web前端布局--使用element中的Container布局容器
  • C++基础学习记录—this指针
  • c#中“事件-event”的经典示例与理解
  • c++ 多线程知识汇总
  • 开发一个类似小红书的社交电商平台需要综合技术、产品和运营能力
  • 自然语言处理NLP入门 -- 第一节基础概念
  • 黑马Mistral Le chat逆转deepseek
  • 【NLP 21、实践 ③ 全切分函数切分句子】
  • lvsDR模式实现
  • Elasticjob在同一个实例上运行的多个分片任务
  • Flask Web开发的重要概念和示例
  • 洛谷 P4552 [Poetize6] IncDec Sequence