并发编程(JUC)
基本概念
JUC
JUC 就是指 java.util.concurrent 包,这个包提供了一系列的并发工具类及接口,可以简化多线程编程的过程,提高开发效率
进程
process,是正在执行的程序实例,是操作系统进行资源分配和调度的独立单位,每个进程都有自己独立的内存空间、系统资源以及一个唯一的进程标识符(PID)。
- 每个进程都有独立的内存空间(代码段、数据段、堆栈等)。
- 进程之间的通信需要通过进程间通信机制(IPC),如管道、共享内存、消息队列等。
- 进程的创建成本相对较高,因为需要分配独立的内存空间和系统资源。
线程
thread,是进程中执行的一个实体,是CPU调度和分派的基本单位。一个进程可以包含多个线程,这些线程共享进程的资源,如内存地址空间、打开的文件等。
- 同一个进程内的所有线程共享该进程的内存空间和其他资源,这使得线程间通信更加高效。
- 创建线程的成本比创建进程低得多,因为线程之间不需要单独分配内存地址空间。
- 多线程编程可以用来实现并发处理,提高程序的执行效率,特别是在I/O操作频繁的情况下。
线程分为用户线程和守护线程,守护线程会在用户线程运行期间一直运行,但一旦所有用户线程都结束守护线程会自动死亡。
区别
- 资源开销:启动一个新的进程比启动一个新的线程需要更多的资源和时间。
- 内存空间:每个进程都有自己独立的内存空间,而线程共享其所属进程的内存空间。
- 数据共享与通信:由于线程共享同一进程的资源,因此线程间的通信比进程间通信更简单高效。
- 系统影响:线程切换的开销比进程切换小,因为线程不涉及内存管理方面的切换。
进程是资源分配的基本单位,线程是CPU调度的基本单位。
一个进程可以有多个线程,线程比进程更轻量、切换更快、通信更方便。
JAVA默认具有两个线程,main 和 gc
锁
锁(Lock)是并发编程中的一种同步机制,用于管理对共享资源的访问。在多线程环境中,多个线程可能同时尝试访问或修改同一个资源,这可能会导致数据不一致或其他并发问题。锁提供了一种手段来确保在任何时刻只有一个线程能够访问特定的资源或执行一段代码,从而避免这些问题。
锁具有三个基本性质:互斥性、原子性、可见性
锁的种类有:互斥锁、读写锁、自旋锁、可重入锁
锁就是用于控制多线程并发访问时对共享资源的访问权限,确保同一时间只有一个线程可以操作资源,从而保证线程安全。
并发和并行
并发 | Concurrency | 多个任务在重叠的时间段内执行,不一定同时发生,强调任务的调度与交替执行。 |
并行 | Parallelism | 多个任务真正的同时执行,通常发生在多核 CPU 或多台机器上,强调任务的同时执行。 |
并发是一个时间段内多个任务交替执行(可以是单核),并行是同一时刻多个任务真正同时执行(需要多核)。
并发编程
并发编程是编写可以同时处理多个任务的程序的技术和方法。
核心目的是:充分利用CPU资源,提高程序响应性和交互性。
线程的生命周期
NEW | 线程对象已创建,但还未调用 start() 方法 |
RUNNABLE | 线程正在 JVM 中运行(可能在运行、也可能等待操作系统调度) |
BLOCKED | 线程正在等待获取一个监视器锁以进入同步块/方法 |
WAITING | 线程无限等待另一个线程执行特定操作(如 join() 、wait() ) |
TIMED_WAITING | 线程在限时等待(如调用了 sleep(long) 、wait(long) 、join(long) ) |
TERMINATED | 线程已经执行完毕 |
注意:一个线程一旦变为TERMINATED就不可再重新执行,必须要重新创建线程
wait和sleep
所属类 | Object |
Thread |
是否需要同步上下文 | 是 | 否 |
是否释放锁 | 是 | 否 |
如何唤醒 | notify() /notifyAll() 或超时 |
时间到期 |
创建线程
本质只有一种方式:new Thread()
Thread
package xyz.ssydx.thread;
public class Thread1 extends Thread {
private Integer cnt;
public Thread1(Integer cnt) {
this.cnt = cnt;
}
@Override
public void run() {
for (int i = 0; i < cnt; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(i + " " + Thread.currentThread().getName());
}
}
public static void main(String[] args) throws InterruptedException {
Thread1 thread1 = new Thread1(10);
thread1.setName("Thread1");
thread1.start();
for (int i = 0; i < 10; i++) {
Thread.sleep(10);
System.out.println(i + " " + Thread.currentThread().getName());
}
}
}
Runnable
package xyz.ssydx.thread;
public class Thread2 implements Runnable {
private Integer cnt;
public Thread2(Integer cnt) {
this.cnt = cnt;
}
@Override
public void run() {
for (int i = 0; i < cnt; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(i + " " + Thread.currentThread().getName());
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread2 = new Thread(new Thread2(10), "Thread2");
thread2.start();
for (int i = 0; i < 10; i++) {
Thread.sleep(10);
System.out.println(i + " " + Thread.currentThread().getName());
}
}
}
Runnable函数式接口形式(Lambda表达式)
package xyz.ssydx.thread;
public class Thread3 {
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(i + " " + Thread.currentThread().getName());
}
}, "Thread3").start();
for (int i = 0; i < 10; i++) {
Thread.sleep(10);
System.out.println(i + " " + Thread.currentThread().getName());
}
}
}
Callable
和Runnable不同,它可以抛出异常,也可以有返回值
package xyz.ssydx.thread;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
public class Thread4 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyCall myCall = new MyCall();
// 由于Callable具备返回值, 具有异步特性(执行过程是异步的, 但获取结果是阻塞的), 而Thread 本身只接收 Runnable, 必须借助适配器(这里是 FutureTask) 进行适配
FutureTask<String> futureTask = new FutureTask<>(myCall);
new Thread(futureTask).start();
System.out.println("hello ssydx");
// 可以指定等待时间, 超时会出现超时异常, 注意该方法会阻塞后续的操作直到获取返回值
String string = futureTask.get();
System.out.println(string);
System.out.println("hello world");
}
}
class MyCall implements Callable<String> {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(1);
int sum = 0;
for (int i = 0; i < 10; i++) {
sum += i;
}
return String.valueOf(sum);
}
}
synchronized 和 ReentrantLock
synchronized 修饰方法时表示的是给实例或类加锁,有时未必是共享资源,因此最好采用同步块方式加锁
单线程
package xyz.ssydx.lock;
public class Single {
public static void main(String[] args) {
Ticket1 ticket = new Ticket1();
for (int i = 0; i < 60; i++) ticket.sale();
}
}
// 另一主类, 实际不在同一文件中
public class Multi {
public static void main(String[] args) {
Ticket1 ticket = new Ticket1();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "A").start();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "B").start();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "C").start();
}
}
class Ticket1 {
private int num = 50;
public void sale() {
if (num > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " " + (50 - (--num)));
}
}
}
synchronized
package xyz.ssydx.lock;
public class Synch {
public static void main(String[] args) {
Ticket2 ticket = new Ticket2();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "A").start();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "B").start();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "C").start();
}
}
class Ticket2 {
private int num = 50;
public synchronized void sale() {
if (num > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " " + (50 - (--num)));
}
}
}
ReentrantLock
package xyz.ssydx.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Reentrant {
public static void main(String[] args) {
Ticket3 ticket = new Ticket3();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "A").start();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "B").start();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "C").start();
}
}
class Ticket3 {
private int num = 50;
Lock lock = new ReentrantLock();
public void sale() {
lock.lock();
try {
if (num > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " " + (50 - (--num)));
}
} finally {
lock.unlock();
}
}
}
总结
语法 | 内置语言特性,使用简单(方法或代码块级别)。 | 需要显式创建 Lock 对象,并手动调用 lock() 和 unlock() 方法。 |
自动释放锁 | 当执行流离开同步区域时,锁会自动释放,即使发生异常也会正确释放锁。 | 必须在 finally 块中手动释放锁 (unlock() ),以确保即使发生异常也能正确释放锁。 |
尝试获取锁 | 不支持尝试获取锁。 | 支持通过 tryLock() 尝试获取锁,可以选择带超时的版本。 |
可中断获取锁 | 不支持可中断获取锁。 | 支持通过 lockInterruptibly() 获取锁,允许在等待时响应中断。 |
公平性 | 不支持设置公平性,默认行为类似于非公平锁。 | 可以通过构造函数参数设置为公平锁或非公平锁(默认是非公平)。 |
条件变量 | 使用 wait() 、notify() 和 notifyAll() 实现线程间的通信。 |
提供了 newCondition() 方法来创建 Condition 对象,提供了更灵活的线程间通信机制。 |
性能 | 在 Java 6 之后,性能得到了显著提升,在某些情况下与 ReentrantLock 相近。 |
性能通常较好,特别是在需要高级特性(如公平锁、尝试获取锁等)时表现更优。 |
重入性 | 支持重入,即同一个线程可以多次获得同一个锁。 | 同样支持重入,且提供了更多的功能来管理锁的状态和行为。 |
监控和管理 | JVM 提供了一些工具用于监控和管理 synchronized 锁的状态。 |
提供了更多方法来检查锁的状态(如是否持有锁、是否有等待队列等)。 |
获取锁状态判断 | 不直接提供获取锁状态的方法。可以通过反射和JVM内部API间接实现,但不推荐。 | 提供了多种方法来查询锁的状态,例如 isHeldByCurrentThread() 、getHoldCount() 等。 |
公平锁和非公平锁
是否保证顺序 | 是 | 否 |
是否支持插队 | 否 | 是 |
是否容易出现饥饿 | 不容易 | 可能出现 |
性能 | 相对较低(需维护有序队列) | 相对较高(减少上下文切换) |
适用场景 | 对公平性要求高,如银行交易排队系统 | 对性能要求高,如Web服务器处理请求 |
公平锁:按照线程请求锁的顺序进行分配,即先请求的线程先获得锁。可以避免饥饿现象,确保每个线程都有机会执行,但执行效率低(需要维护一个有序的线程队列)
设想存在一个线程A执行需要3分钟,另一线程B只需3秒钟,此时假如A先请求锁,其必然先获得锁,但此时B的短时任务却需要等待3分钟,这有时并不合理(降低任务吞吐量),因此往往采用非公平锁
非公平锁:不按照请求锁的顺序进行分配,而是随机选取一个请求锁的线程,客观减少了短任务的等待时长,但也可能出现部分任务长时间获取不到锁(饥饿现象)
消费者和生产者(线程通信)
流程是:条件等待,执行业务,唤醒其他
虚假唤醒
线程没有被notify却醒来叫做虚假唤醒,虚假唤醒本质是JVM或操作系统的底层实现导致的,是合法行为,应该在代码层次予以控制,一般来说就是在条件等待中,不要使用if进行判断,而是使用while进行判断
注意:虚假唤醒和java代码本身无关,不是notify(随机唤醒单个线程)或notifyAll(唤醒所有)造成的,也和线程数量无关
为什么使用while可以避免?它保证了即使被虚假唤醒后依然会重新进行while循环进行判断是否接着等待,而if在休眠期被唤醒后将不再重新进行休眠判断
synchronized 1
package xyz.ssydx.pc;
public class Pc1 {
public static void main(String[] args) {
Data1 data1 = new Data1();
new Thread(() -> { for (int i = 0; i < 10; i++) data1.increment(); }, "P1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data1.decrement(); }, "C1").start();
}
}
class Data1 {
private int data = 0;
public synchronized void increment() {
if (data != 0) {
try {
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + ++data);
this.notifyAll();
}
public synchronized void decrement() {
if (data == 0) {
try {
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + --data);
this.notifyAll();
}
}
synchronized 2
package xyz.ssydx.pc;
public class Pc2 {
public static void main(String[] args) {
Data2 data2 = new Data2();
new Thread(() -> { for (int i = 0; i < 10; i++) data2.increment(); }, "P1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data2.increment(); }, "P2").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data2.decrement(); }, "C1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data2.decrement(); }, "C2").start();
}
}
class Data2 {
private int data = 0;
public synchronized void increment() {
while (data != 0) {
try {
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + ++data);
this.notifyAll();
}
public synchronized void decrement() {
while (data == 0) {
try {
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + --data);
this.notifyAll();
}
}
ReentrantLock 1
package xyz.ssydx.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Pc3 {
public static void main(String[] args) {
Data3 data3 = new Data3();
new Thread(() -> { for (int i = 0; i < 10; i++) data3.increment(); }, "P1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data3.increment(); }, "P2").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data3.decrement(); }, "C1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data3.decrement(); }, "C2").start();
}
}
class Data3 {
private int data = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void increment() {
lock.lock();
try {
while (data != 0) {
try {
condition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + ++data);
condition.signal();
} finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
while (data == 0) {
try {
condition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + --data);
condition.signal();
} finally {
lock.unlock();
}
}
}
ReentrantLock 2
package xyz.ssydx.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Pc4 {
public static void main(String[] args) {
Data4 data4 = new Data4();
new Thread(() -> { for (int i = 0; i < 10; i++) data4.increment(); }, "P1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data4.increment(); }, "P2").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data4.decrement(); }, "C1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data4.decrement(); }, "C2").start();
}
}
class Data4 {
private int data = 0;
Lock lock = new ReentrantLock();
Condition producer = lock.newCondition();
Condition consumer = lock.newCondition();
public void increment() {
lock.lock();
try {
while (data != 0) {
try {
producer.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + ++data);
consumer.signal();
} finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
while (data == 0) {
try {
consumer.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + --data);
producer.signal();
} finally {
lock.unlock();
}
}
}
ReentrantLock 3
package xyz.ssydx.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Pc5 {
public static void main(String[] args) {
Data5 data5 = new Data5();
new Thread(() -> { for (int i = 0; i < 10; i++) data5.increment(0); }, "P1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data5.increment(2); }, "P2").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data5.decrement(1); }, "C1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data5.decrement(3); }, "C2").start();
}
}
class Data5 {
private int data = 0;
private final Lock lock = new ReentrantLock();
private final Condition[] conditions = new Condition[4];
private int state = 0; // 当前允许运行的线程序号
public Data5() {
for (int i = 0; i < 4; i++) {
conditions[i] = lock.newCondition();
}
}
// 生产方法
public void increment(int threadId) {
lock.lock();
try {
while (state != threadId) {
conditions[threadId].await(); // 等待轮到自己
}
System.out.println(Thread.currentThread().getName() + ":" + ++data);
// 更新状态并唤醒下一个线程
state = (state + 1) % 4;
conditions[state].signal();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
// 消费方法
public void decrement(int threadId) {
lock.lock();
try {
while (state != threadId) {
conditions[threadId].await(); // 等待轮到自己
}
System.out.println(Thread.currentThread().getName() + ":" + --data);
// 更新状态并唤醒下一个线程
state = (state + 1) % 4;
conditions[state].signal();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
}
锁的是谁?
同一实例的多个非静态同步方法共用一把锁
不同实例不共用锁
同一类的多个静态同步方法共用一把锁
同一实例 + 多个 synchronized 方法 |
this (实例对象) |
是 |
不同实例 + synchronized 方法 |
各自的 this |
否 |
static synchronized 方法 |
类.class 对象 |
是(所有该类的实例共享) |
测试1
package xyz.ssydx.lockpro;
import java.util.concurrent.TimeUnit;
// 非静态方法的 synchronized 锁的是 this, 即实例
// A先获得实例的锁, 所以 A 先执行
// send msg
// call
public class Test1 {
public static void main(String[] args) throws InterruptedException {
Phone1 phone = new Phone1();
new Thread(()-> phone.sendMsg(),"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()-> phone.call(),"B").start();
}
}
class Phone1 {
public synchronized void sendMsg() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("send msg");
}
public synchronized void call() {
System.out.println("call");
}
}
测试2
package xyz.ssydx.lockpro;
import java.util.concurrent.TimeUnit;
// 静态方法的 synchronized 锁的是 Class, 即类
// A先获得类的锁, 所以 A 先执行
// send msg
// call
public class Test2 {
public static void main(String[] args) throws InterruptedException {
new Thread(()-> Phone2.sendMsg(),"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()-> Phone2.call(),"B").start();
}
}
class Phone2 {
public static synchronized void sendMsg() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("send msg");
}
public static synchronized void call() {
System.out.println("call");
}
}
测试3
package xyz.ssydx.lockpro;
import java.util.concurrent.TimeUnit;
// 非静态方法的 synchronized 锁的是 this, 即实例
// 静态方法的 synchronized 锁的是 Class, 即类
// A先获得实例的锁, 睡眠时间 2s 后输出, B睡眠 1s 后获得类的锁并输出
// 因此 B 先于 A
// call
// send msg
public class Test3 {
public static void main(String[] args) throws InterruptedException {
Phone3 phone = new Phone3();
new Thread(()-> phone.sendMsg(),"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()-> Phone3.call(),"B").start();
}
}
class Phone3 {
public synchronized void sendMsg() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("send msg");
}
public static synchronized void call() {
System.out.println("call");
}
}
测试4
package xyz.ssydx.lockpro;
import java.util.concurrent.TimeUnit;
// 非静态方法的 synchronized 锁的是 this, 即实例
// A先获得实例的锁, 睡眠时间 2s 后输出, B睡眠 1s 后无需实例的锁就输出
// 因此 B 先于 A
// call
// send msg
public class Test4 {
public static void main(String[] args) throws InterruptedException {
Phone4 phone = new Phone4();
new Thread(()-> phone.sendMsg(),"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()-> phone.call(),"B").start();
}
}
class Phone4 {
public synchronized void sendMsg() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("send msg");
}
public void call() {
System.out.println("call");
}
}
测试5
package xyz.ssydx.lockpro;
import java.util.concurrent.TimeUnit;
// 静态方法的 synchronized 锁的是 Class, 即类
// A先获得类的锁, 睡眠时间 2s 后输出, B睡眠 1s 后无需类的锁就输出
// 因此 B 先于 A
// call
// send msg
public class Test5 {
public static void main(String[] args) throws InterruptedException {
Phone5 phone = new Phone5();
new Thread(()-> phone.sendMsg(),"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()-> phone.call(),"B").start();
}
}
class Phone5 {
public synchronized void sendMsg() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("send msg");
}
public void call() {
System.out.println("call");
}
}
测试6
package xyz.ssydx.lockpro;
import java.util.concurrent.TimeUnit;
// 非静态方法的 synchronized 锁的是 this, 即实例
// A先获得实例phone1的锁, 睡眠时间 2s 后输出, B睡眠 1s 后获得实例phone2的锁就输出, 两个实例不共用锁
// 因此 B 先于 A
// call
// send msg
public class Test6 {
public static void main(String[] args) throws InterruptedException {
Phone6 phone1 = new Phone6();
Phone6 phone2 = new Phone6();
new Thread(()-> phone1.sendMsg(),"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()-> phone2.call(),"B").start();
}
}
class Phone6 {
public synchronized void sendMsg() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("send msg");
}
public synchronized void call() {
System.out.println("call");
}
}
并发集合
如果有数据结构和操作系统相关知识,这部分是比较好理解的
CopyOnWriteArrayList(含底层探究)
package xyz.ssydx.collections;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
// 列表不安全
public class TestList {
public static void main(String[] args) {
// // 不安全, 并发读写存在问题, 底层采用了类似乐观锁(fail-fast)的机制, 读取(仅限迭代器遍历)或写入时都会检查版本号 modCount, 不一致则报错
// List<String> list = new ArrayList<>();
// // 安全, 原因是其读写方法都使用了 Synchronized 关键字添加了隐式锁(逐渐被淘汰也主要是这个原因, 读写都加锁造成性能不佳)
// Vector<String> list = new Vector<>();
// // 安全, 原因是被转换为了同步类(把读写方法包装为了同步方法), 原理基本同 Vector
// List<String> list = Collections.synchronizedList(new ArrayList<String>());
// 写时复制, 每次写入都是复制一个新的数组(写入操作添加显式锁 ReentrantLock), 修改后把原数组指针指向这个新的数组, 确保了不会同时写入, 同时不影响读取操作
// 内部使用 volatile 关键字用于确保数组的可见性(即每次写入都立即刷新到内存, 每次读取都从内存中读取最新版本)
// 内部使用 ReentrantLock 对写入操作加锁
// 由于使用写时复制, 其写入成本较高, 适合读多写少(这也是最常见的场景)
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString());
// 增加打印的操作(会遍历列表), 否则可能不易发生并发修改异常
System.out.println(list);
}).start();
}
}
}
ArrayList 为什么要在遍历读取时验证版本号?
首先搞清楚为什么要验证版本号,是为了避免读取过程中并发出现的写入操作,这会造成读取结果的未知(索引越界、链式结构死循环、重复读取或遗漏等问题)
其次为什么只在遍历读取时验证,因为单个读取的结果是确定的,要么存在要么不存在,没有遍历读取时那么混乱的情况
注:modCount记录的是数据结构变更次数,说人话就是记录增删操作,不记录更新操作
乐观锁机制是什么?
乐观锁本身只是一种并发策略,它总是假设并发冲突的概率很低,在提交操作时才进行检查,通常借助版本号实现
fail-fast机制是一种乐观锁,但其在版本号不一致时会报错
CAS(Compare And Swap)机制本质也是借助版本号的乐观锁(版本号就是其自身的旧值),但其在版本号不一致时会进行重试
版本号是自身旧值和单独变量负责有什么区别?前者只能确保值相同(值相同也可能是修改过又修改回来,即ABA问题),后者才从根本上确保值未被修改
为什么要读写分离?
实践中读取操作通常远多于写入操作,因此 CopyOnWriteArrayList 对读操作和写操作进行了分别对待
场景 读写比(估计) 说明 缓存系统(如本地缓存) 99%+ 读 写入只发生在缓存未命中或刷新时 配置中心 95%+ 读 配置极少变更,频繁读取 事件监听器列表(如 GUI、Spring 事件) 90%+ 读 添加/删除监听器较少,触发事件频繁 注册表(服务发现、Bean 容器) 80~90% 读 初始化后基本稳定 日志记录器 70~80% 读 多线程日志输出频繁,但配置变动少 并发任务调度器 60~70% 读 每次调度前读取状态,更新较少
写时复制不是也无法保证数据一致性吗?
写时复制确实不能保证数据的强一致性,因为它读取的总是旧版本,但它避免了读取时的不确定性,起码读取到的是一个确定的结果,且不影响后续写入的一致性
根据性能取舍,虽然它不如读写全部采用同步方法的一致性强,但为了效率是可以接收的,实践中读取到旧版本影响并不是很大
CopyOnWriteArraySet
package xyz.ssydx.collections;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
// 集合不安全
public class TestSet {
public static void main(String[] args) {
// // HashSet 的底层是 HashMap, 仅仅是把值都设置new Object(), 不安全的原因同 ArrayList
// HashSet<String> set = new HashSet<>();
// // 同理
// Set<Object> set = Collections.synchronizedSet(new HashSet<>());
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString());
System.out.println(set);
}).start();
}
}
}
ConcurrentHashMap(含底层探究)
package xyz.ssydx.collections;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
// 映射不安全
public class TestMap {
public static void main(String[] args) {
// Map<String, Integer> map = new HashMap<>();
// Map<Object, Object> map = Collections.synchronizedMap(new HashMap<>());
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
for (int i = 1; i <= 10; i++) {
int finalI = i;
new Thread(() -> {
map.put(UUID.randomUUID().toString(), finalI);
System.out.println(map);
}).start();
}
}
}
HashMap是如何实现的?
HashMap是由数组和链表组成的,数组的每个元素叫桶,数组的长度就是HashMap的容量
桶中是一个链表或红黑树结构,JDK8及以后在链表长度超过8时转为红黑树,JDK7及之前只有链表
当添加一个新键值对时(假定它的键不与已知键重复),会先对键的哈希码进行扰动处理,再利用位运算对进行取模,然后决定放到哪个桶中,放入后会追加到对应的链表或红黑树中
如果达到扩容阈值,就会扩充桶数,并重新进行散列,这是一个耗时操作,时间复杂度为O(n)
扰动处理是为了避免哈希码的高位或低位变化少产生过多的哈希冲突
位运算取模 (桶数 - 1) & 哈希码 也是把容量设置2的次方的原因
HashMap的两个参数是干什么的?
HashMap有两个默认参数,一个是初始容量,一个是负载因子
初始容量是16,建议总是使其为2的次方(实际上如果不是,内部也会自动调整为2的次方)
负载因子是0.75,它决定了当键值对的数量达到桶数的多少后进行扩容
过大的容量会导致空间浪费,过小的容量会导致频繁扩容
较高的负载因子节省空间但更易造成哈希冲突进而拖慢查找效率(如果出现冲突会加入链表或红黑树中,这两者的查找时间复杂度为O(N)和O(logN),低于数组的O(1)),较低的负载因子浪费空间但降低了哈希冲突的概率
0.75(官方推荐)被视为折中的选择,即桶数稍大于元素个数
核心辅助类
CountDownLatch
package xyz.ssydx.other;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 计数器, 确保只有当计数器归零后countDownLatch.await();后的操作才可以执行
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName());
countDownLatch.countDown();
},String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println("hello world");
}
}
CyclicBarrier
package xyz.ssydx.other;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) throws InterruptedException {
// 用于确保每当指定数量的线程调用cyclicBarrier.await();就阻塞这些线程直到执行这个屏障点后再继续执行这些线程的后续操作
CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()->{
System.out.println("hello world");
});
for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(() -> {
try {
System.out.println("第一阶段" + finalI);
cyclicBarrier.await();
System.out.println("第二阶段" + finalI);
cyclicBarrier.await();
System.out.println("第三阶段" + finalI);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
},String.valueOf(i)).start();
}
}
}
Semaphore
package xyz.ssydx.other;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
// 限制并发线程数量
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
semaphore.release();
}).start();
}
}
}
总结
CountDownLatch |
等待一组线程完成后再继续执行 | 不可重用 | 不支持 | 主线程等待子线程完成 |
CyclicBarrier |
多个线程互相等待,全部到达后继续 | 可重用 | 支持 | 分阶段协同任务、多线程同步 |
Semaphore |
控制并发线程数量(资源访问控制) | 可重用 | 不支持 | 资源池、限流、互斥 |
读写锁
package xyz.ssydx.readwrite;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Test {
public static void main(String[] args) throws InterruptedException {
Data data = new Data();
for (int i = 0; i < 3; i++) {
int finalI = i;
new Thread(() -> {
data.add(UUID.randomUUID().toString());
}, "write" + i).start();
TimeUnit.MILLISECONDS.sleep(100);
new Thread(() -> {
data.get(finalI);
}, "read" + i).start();
}
/*
控制台输出如下(仅供示例, 实际上这种效果有时并不容易复现):
该结果说明了读-写锁不可共存, 但写-写锁可共存, 写-写锁也不可共存
read0 读取开始
read0 读取完成
write0 写入开始
write0 写入完成
write1 写入开始
write1 写入完成
write2 写入开始
write2 写入完成
read1 读取开始
read2 读取开始
read2 读取完成
read1 读取完成
*/
}
}
class Data {
private List<String> list = new ArrayList<>();
// 读写锁, 读锁(共享锁, 几个线程可以同时获得), 写锁(独占锁, 一次只能一个线程获得), 读锁和写锁是互相排斥的
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void add(String str) {
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 写入开始");
list.add(str);
System.out.println(Thread.currentThread().getName() + " 写入完成");
} finally {
lock.writeLock().unlock();
}
}
public String get(int index) {
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 读取开始");
String string = "";
if (index < list.size()) {
string = list.get(index);
}
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " 读取完成");
return string;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.readLock().unlock();
}
}
}
阻塞队列
无限等待 | put | take | |
限时等待 | offer | poll | |
抛出异常 | add、addAll | remove、removeAll、RemoveIf | element |
不抛出异常 | offer | poll | peek |
同步队列
package xyz.ssydx.blocking;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class Test4 {
public static void main(String[] args) throws InterruptedException {
// 虽然其看起来像容量为1的阻塞队列, 但本质并不是, 它在执行插入操作时会阻塞, 直到有另一线程进行获取, 反之亦然
SynchronousQueue<String> strings = new SynchronousQueue<>();
new Thread(() -> {
for (String s : Arrays.asList("a", "b", "c")) {
try {
System.out.println("插入: " + s);
strings.put(s);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("取出: " + strings.take());
TimeUnit.SECONDS.sleep(1);
System.out.println("取出: " + strings.take());
TimeUnit.SECONDS.sleep(1);
System.out.println("取出: " + strings.poll());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
}
池化
通过复用减少频繁的创建和销毁,减少资源消耗,提高响应速度,提供统一管理
池化的应用有连接池、线程池等
默认线程池
package xyz.ssydx.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test1 {
public static void main(String[] args) {
// ExecutorService executorService = Executors.newSingleThreadExecutor(); // 只有一个线程的线程池
// ExecutorService executorService = Executors.newFixedThreadPool(3); // 固定大小的线程池
ExecutorService executorService = Executors.newCachedThreadPool(); // 动态伸缩的线程池
try {
for (int i = 0; i < 100; i++) {
executorService.execute(() -> System.out.println(Thread.currentThread().getName()));
}
} finally {
executorService.shutdown();
}
}
}
自定义线程池
package xyz.ssydx.pool;
import java.util.concurrent.*;
public class Test2 {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
// 初始容量, 最大容量
3, 5,
// 线程存活期, 针对动态扩展的线程, 即最大容量减初始容量, 多久没被使用后自动销毁
60L, TimeUnit.SECONDS,
// 等待区容量
new ArrayBlockingQueue<Runnable>(5),
// 线程工厂, 较少更改
Executors.defaultThreadFactory(),
// 当任务数量 > 线程池的最大容量+阻塞队列的容量, 就需要考虑策略问题了
// 策略: 默认报错, 即 AbortPolicy;
// 原路返回, 即 CallerRunsPolicy;
// 直接抛弃, 即 DiscardPolicy;
// 抛弃队列中最老的任务(即队首的), 即 DiscardOldestPolicy
// 直接抛弃和抛弃最老的形象理解: 前者挤不进等候队列会直接抛弃, 后者则强行挤入队尾同时造成队首的被挤出抛弃
new ThreadPoolExecutor.DiscardOldestPolicy()
);
try {
for (int i = 0; i < 11; i++) {
int finalI = i;
threadPoolExecutor.execute(() -> System.out.println(Thread.currentThread().getName() + " " + finalI));
}
} finally {
threadPoolExecutor.shutdown();
}
}
}
死锁
问题
某一个同步块拥有两个及以上的监视对象,就可能出现两个线程各自持有一个对象的锁,但都无法获取到另一个对象的锁,造成僵持
必要条件
死锁发生必须同时满足四个条件,又称Coffman条件,打破其一即可避免死锁
互斥条件(Mutual Exclusion)
某些资源只能被一个线程占用,即一次只有一个线程可以使用该资源。如果另一个线程请求该资源,则必须等待,直到该资源被释放。
占有并等待条件(Hold and Wait)
一个线程已经占有了至少一个资源,但又提出了对新资源的请求。然而,这个新资源已经被其他线程占有,导致当前线程处于等待状态,同时它也不愿意释放自己已占有的资源。
不可剥夺条件(No Preemption)
资源不能被强制从某个线程中夺走,只能由该线程自行释放。这意味着一旦一个线程获得了某些资源,除非该线程主动放弃这些资源(比如完成任务后正常释放),否则这些资源将一直保持被该线程占用的状态。
循环等待条件(Circular Wait)
存在一个线程的环形等待序列,其中每个线程都在等待下一个线程所持有的资源。具体来说,存在一组等待获取资源的线程 {T0, T1, ..., Tn},使得 T0 等待 T1 占有的资源,T1 等待 T2 占有的资源,...,Tn-1 等待 Tn 占有的资源,而 Tn 又等待 T0 占有的资源。
方案
打破互斥条件
尽量减少使用独占性资源。对于一些资源,如果可以设计成允许共享访问,则可以避免这个问题。但是,在很多情况下,这是不可能的,因为有些资源本质上就是不可共享的(例如打印机)。
打破占有并等待条件
一种方法是要求所有线程在开始执行之前一次性申请所有需要的资源。
另一种方法是实现一种机制,允许操作系统或运行时环境抢占资源(但这可能导致复杂性和潜在的数据一致性问题)。
打破不可剥夺条件
允许系统强制回收资源。例如,当一个线程请求一个新的资源而暂时无法获得时,它可以释放所有已经拥有的资源,并在稍后重新申请它们。这种方法可能会引入额外的开销和复杂性。
打破循环等待条件
通过给资源分配顺序编号,并要求线程按照编号从小到大的顺序请求资源,这样就可以避免形成循环等待链。这是一种常见的策略,虽然简单但有效。
实践
始终按照相同的顺序获取锁:确保所有线程都以固定的、相同的顺序获取锁。这可以防止循环等待条件的发生,而循环等待正是死锁的一个必要条件。 尝试使用超时机制:在尝试获取锁时设置一个超时时间。如果在指定时间内未能获取到锁,则放弃尝试并进行适当的错误处理。 尽量减少锁的范围:只在必要的时候持有锁,并尽快释放它。这样可以减少并发冲突的机会。 使用显式的锁对象如 ReentrantLock:ReentrantLock 提供了比内置的 synchronized 更灵活的锁定机制,包括可中断的锁等待和定时锁等待等特性。 使用更高层次的并发工具:例如 java.util.concurrent 包中的类,如 ConcurrentHashMap 或者 ReadWriteLock,它们可以在很多情况下帮助你避免直接管理锁带来的复杂性。
#java##并发编程#本专栏包含Java、MySQL、JavaWeb、Spring、Redis、Docker等等,作为个人学习记录及知识总结,将长期进行更新! 如果浏览、点赞、收藏、订阅过少,考虑停更或删除了(😄有点打击积极性)