并发编程(JUC)
2025-06-10更新
基本概念
JUC
JUC 就是指 java.util.concurrent 包,这个包提供了一系列的并发工具类及接口,可以简化多线程编程的过程,提高开发效率
进程
process,是正在执行的程序实例,是操作系统进行资源分配和调度的独立单位,每个进程都有自己独立的内存空间、系统资源以及一个唯一的进程标识符(PID)。
- 每个进程都有独立的内存空间(代码段、数据段、堆栈等)。
- 进程之间的通信需要通过进程间通信机制(IPC),如管道、共享内存、消息队列等。
- 进程的创建成本相对较高,因为需要分配独立的内存空间和系统资源。
线程
thread,是进程中执行的一个实体,是CPU调度和分派的基本单位。一个进程可以包含多个线程,这些线程共享进程的资源,如内存地址空间、打开的文件等。
- 同一个进程内的所有线程共享该进程的内存空间和其他资源,这使得线程间通信更加高效。
- 创建线程的成本比创建进程低得多,因为线程之间不需要单独分配内存地址空间。
- 多线程编程可以用来实现并发处理,提高程序的执行效率,特别是在I/O操作频繁的情况下。
线程分为用户线程和守护线程,守护线程会在用户线程运行期间一直运行,但一旦所有用户线程都结束守护线程会自动死亡。
区别
- 资源开销:启动一个新的进程比启动一个新的线程需要更多的资源和时间。
- 内存空间:每个进程都有自己独立的内存空间,而线程共享其所属进程的内存空间。
- 数据共享与通信:由于线程共享同一进程的资源,因此线程间的通信比进程间通信更简单高效。
- 系统影响:线程切换的开销比进程切换小,因为线程不涉及内存管理方面的切换。
进程是资源分配的基本单位,线程是CPU调度的基本单位。
一个进程可以有多个线程,线程比进程更轻量、切换更快、通信更方便。
JAVA默认具有两个线程,main 和 gc
锁
锁(Lock)是并发编程中的一种同步机制,用于管理对共享资源的访问。在多线程环境中,多个线程可能同时尝试访问或修改同一个资源,这可能会导致数据不一致或其他并发问题。锁提供了一种手段来确保在任何时刻只有一个线程能够访问特定的资源或执行一段代码,从而避免这些问题。
锁具有三个基本性质:互斥性、原子性、可见性
锁的种类有:互斥锁、读写锁、自旋锁、可重入锁等
锁就是用于控制多线程并发访问时对共享资源的访问权限,确保同一时间只有一个线程可以操作资源,从而保证线程安全。
并发和并行
并发 | Concurrency | 多个任务在重叠的时间段内执行,不一定同时发生,强调任务的调度与交替执行。 |
并行 | Parallelism | 多个任务真正的同时执行,通常发生在多核 CPU 或多台机器上,强调任务的同时执行。 |
并发是一个时间段内多个任务交替执行(可以是单核),并行是同一时刻多个任务真正同时执行(需要多核)。
并发编程
并发编程是编写可以同时处理多个任务的程序的技术和方法。
核心目的是:充分利用CPU资源,提高程序响应性和交互性。
线程的生命周期
NEW | 线程对象已创建,但还未调用 start() 方法 |
RUNNABLE | 线程正在 JVM 中运行(可能在运行、也可能等待操作系统调度) |
BLOCKED | 线程正在等待获取一个监视器锁以进入同步块/方法 |
WAITING | 线程无限等待另一个线程执行特定操作(如 join() 、wait() ) |
TIMED_WAITING | 线程在限时等待(如调用了 sleep(long) 、wait(long) 、join(long) ) |
TERMINATED | 线程已经执行完毕 |
注意:一个线程一旦变为TERMINATED就不可再重新执行,必须要重新创建线程
wait和sleep
所属类 | Object |
Thread |
是否需要同步上下文 | 是 | 否 |
是否释放锁 | 是 | 否 |
如何唤醒 | notify() /notifyAll() 或超时 |
时间到期 |
创建线程
本质只有一种方式:new Thread()
Thread
package xyz.ssydx.thread;
public class Thread1 extends Thread {
private Integer cnt;
public Thread1(Integer cnt) {
this.cnt = cnt;
}
@Override
public void run() {
for (int i = 0; i < cnt; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(i + " " + Thread.currentThread().getName());
}
}
public static void main(String[] args) throws InterruptedException {
Thread1 thread1 = new Thread1(10);
thread1.setName("Thread1");
thread1.start();
for (int i = 0; i < 10; i++) {
Thread.sleep(10);
System.out.println(i + " " + Thread.currentThread().getName());
}
}
}
Runnable
package xyz.ssydx.thread;
public class Thread2 implements Runnable {
private Integer cnt;
public Thread2(Integer cnt) {
this.cnt = cnt;
}
@Override
public void run() {
for (int i = 0; i < cnt; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(i + " " + Thread.currentThread().getName());
}
}
public static void main(String[] args) throws InterruptedException {
Thread thread2 = new Thread(new Thread2(10), "Thread2");
thread2.start();
for (int i = 0; i < 10; i++) {
Thread.sleep(10);
System.out.println(i + " " + Thread.currentThread().getName());
}
}
}
Runnable函数式接口形式(Lambda表达式)
package xyz.ssydx.thread;
public class Thread3 {
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(i + " " + Thread.currentThread().getName());
}
}, "Thread3").start();
for (int i = 0; i < 10; i++) {
Thread.sleep(10);
System.out.println(i + " " + Thread.currentThread().getName());
}
}
}
Callable
和Runnable不同,它可以抛出异常,也可以有返回值
package xyz.ssydx.thread;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
public class Thread4 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyCall myCall = new MyCall();
// 由于Callable具备返回值, 具有异步特性(执行过程是异步的, 但获取结果是阻塞的), 而Thread 本身只接收 Runnable, 必须借助适配器(这里是 FutureTask) 进行适配
FutureTask<String> futureTask = new FutureTask<>(myCall);
new Thread(futureTask).start();
System.out.println("hello ssydx");
// 可以指定等待时间, 超时会出现超时异常, 注意该方法会阻塞后续的操作直到获取返回值
String string = futureTask.get();
System.out.println(string);
System.out.println("hello world");
}
}
class MyCall implements Callable<String> {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(1);
int sum = 0;
for (int i = 0; i < 10; i++) {
sum += i;
}
return String.valueOf(sum);
}
}
synchronized 和 ReentrantLock
synchronized 修饰方法时表示的是给实例或类加锁,有时未必是共享资源,因此最好采用同步块方式加锁
单线程
package xyz.ssydx.lock;
public class Single {
public static void main(String[] args) {
Ticket1 ticket = new Ticket1();
for (int i = 0; i < 60; i++) ticket.sale();
}
}
// 另一主类, 实际不在同一文件中
public class Multi {
public static void main(String[] args) {
Ticket1 ticket = new Ticket1();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "A").start();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "B").start();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "C").start();
}
}
class Ticket1 {
private int num = 50;
public void sale() {
if (num > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " " + (50 - (--num)));
}
}
}
synchronized
package xyz.ssydx.lock;
public class Synch {
public static void main(String[] args) {
Ticket2 ticket = new Ticket2();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "A").start();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "B").start();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "C").start();
}
}
class Ticket2 {
private int num = 50;
public synchronized void sale() {
if (num > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " " + (50 - (--num)));
}
}
}
ReentrantLock
package xyz.ssydx.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Reentrant {
public static void main(String[] args) {
Ticket3 ticket = new Ticket3();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "A").start();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "B").start();
new Thread(() -> {for (int i = 0; i < 60; i++) ticket.sale();}, "C").start();
}
}
class Ticket3 {
private int num = 50;
Lock lock = new ReentrantLock();
public void sale() {
lock.lock();
try {
if (num > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " " + (50 - (--num)));
}
} finally {
lock.unlock();
}
}
}
总结
语法 | 内置语言特性,使用简单(方法或代码块级别)。 | 需要显式创建 Lock 对象,并手动调用 lock() 和 unlock() 方法。 |
自动释放锁 | 当执行流离开同步区域时,锁会自动释放,即使发生异常也会正确释放锁。 | 必须在 finally 块中手动释放锁 (unlock() ),以确保即使发生异常也能正确释放锁。 |
尝试获取锁 | 不支持尝试获取锁。 | 支持通过 tryLock() 尝试获取锁,可以选择带超时的版本。 |
可中断获取锁 | 不支持可中断获取锁。 | 支持通过 lockInterruptibly() 获取锁,允许在等待时响应中断。 |
公平性 | 不支持设置公平性,默认行为类似于非公平锁。 | 可以通过构造函数参数设置为公平锁或非公平锁(默认是非公平)。 |
条件变量 | 使用 wait() 、notify() 和 notifyAll() 实现线程间的通信。 |
提供了 newCondition() 方法来创建 Condition 对象,提供了更灵活的线程间通信机制。 |
性能 | 在 Java 6 之后,性能得到了显著提升,在某些情况下与 ReentrantLock 相近。 |
性能通常较好,特别是在需要高级特性(如公平锁、尝试获取锁等)时表现更优。 |
重入性 | 支持重入,即同一个线程可以多次获得同一个锁。 | 同样支持重入,且提供了更多的功能来管理锁的状态和行为。 |
监控和管理 | JVM 提供了一些工具用于监控和管理 synchronized 锁的状态。 |
提供了更多方法来检查锁的状态(如是否持有锁、是否有等待队列等)。 |
获取锁状态判断 | 不直接提供获取锁状态的方法。可以通过反射和JVM内部API间接实现,但不推荐。 | 提供了多种方法来查询锁的状态,例如 isHeldByCurrentThread() 、getHoldCount() 等。 |
公平锁和非公平锁
是否保证顺序 | 是 | 否 |
是否支持插队 | 否 | 是 |
是否容易出现饥饿 | 不容易 | 可能出现 |
性能 | 相对较低(需维护有序队列) | 相对较高(减少上下文切换) |
适用场景 | 对公平性要求高,如银行交易排队系统 | 对性能要求高,如Web服务器处理请求 |
公平锁:按照线程请求锁的顺序进行分配,即先请求的线程先获得锁。可以避免饥饿现象,确保每个线程都有机会执行,但执行效率低(需要维护一个有序的线程队列)
设想存在一个线程A执行需要3分钟,另一线程B只需3秒钟,此时假如A先请求锁,其必然先获得锁,但此时B的短时任务却需要等待3分钟,这有时并不合理(降低任务吞吐量),因此往往采用非公平锁
非公平锁:不按照请求锁的顺序进行分配,而是随机选取一个请求锁的线程,客观减少了短任务的等待时长,但也可能出现部分任务长时间获取不到锁(饥饿现象)
消费者和生产者(线程通信)
流程是:条件等待,执行业务,唤醒其他
虚假唤醒
线程没有被notify却醒来叫做虚假唤醒,虚假唤醒本质是JVM或操作系统的底层实现导致的,是合法行为,应该在代码层次予以控制,一般来说就是在条件等待中,不要使用if进行判断,而是使用while进行判断
注意:虚假唤醒和java代码本身无关,不是notify(随机唤醒单个线程)或notifyAll(唤醒所有)造成的,也和线程数量无关
为什么使用while可以避免?它保证了即使被虚假唤醒后依然会重新进行while循环进行判断是否接着等待,而if在休眠期被唤醒后将不再重新进行休眠判断
synchronized 1
package xyz.ssydx.pc;
public class Pc1 {
public static void main(String[] args) {
Data1 data1 = new Data1();
new Thread(() -> { for (int i = 0; i < 10; i++) data1.increment(); }, "P1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data1.decrement(); }, "C1").start();
}
}
class Data1 {
private int data = 0;
public synchronized void increment() {
if (data != 0) {
try {
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + ++data);
this.notifyAll();
}
public synchronized void decrement() {
if (data == 0) {
try {
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + --data);
this.notifyAll();
}
}
synchronized 2
package xyz.ssydx.pc;
public class Pc2 {
public static void main(String[] args) {
Data2 data2 = new Data2();
new Thread(() -> { for (int i = 0; i < 10; i++) data2.increment(); }, "P1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data2.increment(); }, "P2").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data2.decrement(); }, "C1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data2.decrement(); }, "C2").start();
}
}
class Data2 {
private int data = 0;
public synchronized void increment() {
while (data != 0) {
try {
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + ++data);
this.notifyAll();
}
public synchronized void decrement() {
while (data == 0) {
try {
this.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + --data);
this.notifyAll();
}
}
ReentrantLock 1
package xyz.ssydx.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Pc3 {
public static void main(String[] args) {
Data3 data3 = new Data3();
new Thread(() -> { for (int i = 0; i < 10; i++) data3.increment(); }, "P1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data3.increment(); }, "P2").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data3.decrement(); }, "C1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data3.decrement(); }, "C2").start();
}
}
class Data3 {
private int data = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void increment() {
lock.lock();
try {
while (data != 0) {
try {
condition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + ++data);
condition.signal();
} finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
while (data == 0) {
try {
condition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + --data);
condition.signal();
} finally {
lock.unlock();
}
}
}
ReentrantLock 2
package xyz.ssydx.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Pc4 {
public static void main(String[] args) {
Data4 data4 = new Data4();
new Thread(() -> { for (int i = 0; i < 10; i++) data4.increment(); }, "P1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data4.increment(); }, "P2").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data4.decrement(); }, "C1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data4.decrement(); }, "C2").start();
}
}
class Data4 {
private int data = 0;
Lock lock = new ReentrantLock();
Condition producer = lock.newCondition();
Condition consumer = lock.newCondition();
public void increment() {
lock.lock();
try {
while (data != 0) {
try {
producer.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + ++data);
consumer.signal();
} finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
while (data == 0) {
try {
consumer.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(Thread.currentThread().getName() + ":" + --data);
producer.signal();
} finally {
lock.unlock();
}
}
}
ReentrantLock 3
package xyz.ssydx.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Pc5 {
public static void main(String[] args) {
Data5 data5 = new Data5();
new Thread(() -> { for (int i = 0; i < 10; i++) data5.increment(0); }, "P1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data5.increment(2); }, "P2").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data5.decrement(1); }, "C1").start();
new Thread(() -> { for (int i = 0; i < 10; i++) data5.decrement(3); }, "C2").start();
}
}
class Data5 {
private int data = 0;
private final Lock lock = new ReentrantLock();
private final Condition[] conditions = new Condition[4];
private int state = 0; // 当前允许运行的线程序号
public Data5() {
for (int i = 0; i < 4; i++) {
conditions[i] = lock.newCondition();
}
}
// 生产方法
public void increment(int threadId) {
lock.lock();
try {
while (state != threadId) {
conditions[threadId].await(); // 等待轮到自己
}
System.out.println(Thread.currentThread().getName() + ":" + ++data);
// 更新状态并唤醒下一个线程
state = (state + 1) % 4;
conditions[state].signal();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
// 消费方法
public void decrement(int threadId) {
lock.lock();
try {
while (state != threadId) {
conditions[threadId].await(); // 等待轮到自己
}
System.out.println(Thread.currentThread().getName() + ":" + --data);
// 更新状态并唤醒下一个线程
state = (state + 1) % 4;
conditions[state].signal();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
}
锁的是谁?
同一实例的多个非静态同步方法共用一把锁
不同实例不共用锁
同一类的多个静态同步方法共用一把锁
同一实例 + 多个 synchronized 方法 |
this (实例对象) |
是 |
不同实例 + synchronized 方法 |
各自的 this |
否 |
static synchronized 方法 |
类.class 对象 |
是(所有该类的实例共享) |
测试1
package xyz.ssydx.lockpro;
import java.util.concurrent.TimeUnit;
// 非静态方法的 synchronized 锁的是 this, 即实例
// A先获得实例的锁, 所以 A 先执行
// send msg
// call
public class Test1 {
public static void main(String[] args) throws InterruptedException {
Phone1 phone = new Phone1();
new Thread(()-> phone.sendMsg(),"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()-> phone.call(),"B").start();
}
}
class Phone1 {
public synchronized void sendMsg() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("send msg");
}
public synchronized void call() {
System.out.println("call");
}
}
测试2
package xyz.ssydx.lockpro;
import java.util.concurrent.TimeUnit;
// 静态方法的 synchronized 锁的是 Class, 即类
// A先获得类的锁, 所以 A 先执行
// send msg
// call
public class Test2 {
public static void main(String[] args) throws InterruptedException {
new Thread(()-> Phone2.sendMsg(),"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()-> Phone2.call(),"B").start();
}
}
class Phone2 {
public static synchronized void sendMsg() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("send msg");
}
public static synchronized void call() {
System.out.println("call");
}
}
测试3
package xyz.ssydx.lockpro;
import java.util.concurrent.TimeUnit;
// 非静态方法的 synchronized 锁的是 this, 即实例
// 静态方法的 synchronized 锁的是 Class, 即类
// A先获得实例的锁, 睡眠时间 2s 后输出, B睡眠 1s 后获得类的锁并输出
// 因此 B 先于 A
// call
// send msg
public class Test3 {
public static void main(String[] args) throws InterruptedException {
Phone3 phone = new Phone3();
new Thread(()-> phone.sendMsg(),"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()-> Phone3.call(),"B").start();
}
}
class Phone3 {
public synchronized void sendMsg() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("send msg");
}
public static synchronized void call() {
System.out.println("call");
}
}
测试4
package xyz.ssydx.lockpro;
import java.util.concurrent.TimeUnit;
// 非静态方法的 synchronized 锁的是 this, 即实例
// A先获得实例的锁, 睡眠时间 2s 后输出, B睡眠 1s 后无需实例的锁就输出
// 因此 B 先于 A
// call
// send msg
public class Test4 {
public static void main(String[] args) throws InterruptedException {
Phone4 phone = new Phone4();
new Thread(()-> phone.sendMsg(),"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()-> phone.call(),"B").start();
}
}
class Phone4 {
public synchronized void sendMsg() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("send msg");
}
public void call() {
System.out.println("call");
}
}
测试5
package xyz.ssydx.lockpro;
import java.util.concurrent.TimeUnit;
// 静态方法的 synchronized 锁的是 Class, 即类
// A先获得类的锁, 睡眠时间 2s 后输出, B睡眠 1s 后无需类的锁就输出
// 因此 B 先于 A
// call
// send msg
public class Test5 {
public static void main(String[] args) throws InterruptedException {
Phone5 phone = new Phone5();
new Thread(()-> phone.sendMsg(),"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()-> phone.call(),"B").start();
}
}
class Phone5 {
public synchronized void sendMsg() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("send msg");
}
public void call() {
System.out.println("call");
}
}
测试6
package xyz.ssydx.lockpro;
import java.util.concurrent.TimeUnit;
// 非静态方法的 synchronized 锁的是 this, 即实例
// A先获得实例phone1的锁, 睡眠时间 2s 后输出, B睡眠 1s 后获得实例phone2的锁就输出, 两个实例不共用锁
// 因此 B 先于 A
// call
// send msg
public class Test6 {
public static void main(String[] args) throws InterruptedException {
Phone6 phone1 = new Phone6();
Phone6 phone2 = new Phone6();
new Thread(()-> phone1.sendMsg(),"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()-> phone2.call(),"B").start();
}
}
class Phone6 {
public synchronized void sendMsg() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("send msg");
}
public synchronized void call() {
System.out.println("call");
}
}
并发集合
如果有数据结构和操作系统相关知识,这部分是比较好理解的
CopyOnWriteArrayList(含底层探究)
package xyz.ssydx.collections;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
// 列表不安全
public class TestList {
public static void main(String[] args) {
// // 不安全, 并发读写存在问题, 底层采用了类似乐观锁(fail-fast)的机制, 读取(仅限迭代器遍历)或写入时都会检查版本号 modCount, 不一致则报错
// List<String> list = new ArrayList<>();
// // 安全, 原因是其读写方法都使用了 Synchronized 关键字添加了隐式锁(逐渐被淘汰也主要是这个原因, 读写都加锁造成性能不佳)
// Vector<String> list = new Vector<>();
// // 安全, 原因是被转换为了同步类(把读写方法包装为了同步方法), 原理基本同 Vector
// List<String> list = Collections.synchronizedList(new ArrayList<String>());
// 写时复制, 每次写入都是复制一个新的数组(写入操作添加显式锁 ReentrantLock), 修改后把原数组指针指向这个新的数组, 确保了不会同时写入, 同时不影响读取操作
// 内部使用 volatile 关键字用于确保数组的可见性(即每次写入都立即刷新到内存, 每次读取都从内存中读取最新版本)
// 内部使用 ReentrantLock 对写入操作加锁
// 由于使用写时复制, 其写入成本较高, 适合读多写少(这也是最常见的场景)
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString());
// 增加打印的操作(会遍历列表), 否则可能不易发生并发修改异常
System.out.println(list);
}).start();
}
}
}
ArrayList 为什么要在遍历读取时验证版本号?
首先搞清楚为什么要验证版本号,是为了避免读取过程中并发出现的写入操作,这会造成读取结果的未知(索引越界、链式结构死循环、重复读取或遗漏等问题)
其次为什么只在遍历读取时验证,因为单个读取的结果是确定的,要么存在要么不存在,没有遍历读取时那么混乱的情况
注:modCount记录的是数据结构变更次数,说人话就是记录增删操作,不记录更新操作
乐观锁机制是什么?
乐观锁本身只是一种并发策略,它总是假设并发冲突的概率很低,在提交操作时才进行检查,通常借助版本号实现
fail-fast机制是一种乐观锁,但其在版本号不一致时会报错
CAS(Compare And Swap)机制本质也是借助版本号的乐观锁(版本号就是其自身的旧值),但其在版本号不一致时会进行重试
版本号是自身旧值和单独变量负责有什么区别?前者只能确保值相同(值相同也可能是修改过又修改回来,即ABA问题),后者才从根本上确保值未被修改
为什么要读写分离?
实践中读取操作通常远多于写入操作,因此 CopyOnWriteArrayList 对读操作和写操作进行了分别对待
场景 读写比(估计) 说明 缓存系统(如本地缓存) 99%+ 读 写入只发生在缓存未命中或刷新时 配置中心 95%+ 读 配置极少变更,频繁读取 事件监听器列表(如 GUI、Spring 事件) 90%+ 读 添加/删除监听器较少,触发事件频繁 注册表(服务发现、Bean 容器) 80~90% 读 初始化后基本稳定 日志记录器 70~80% 读 多线程日志输出频繁,但配置变动少 并发任务调度器 60~70% 读 每次调度前读取状态,更新较少
写时复制不是也无法保证数据一致性吗?
写时复制确实不能保证数据的强一致性,因为它读取的总是旧版本,但它避免了读取时的不确定性,起码读取到的是一个确定的结果,且不影响后续写入的一致性
根据性能取舍,虽然它不如读写全部采用同步方法的一致性强,但为了效率是可以接收的,实践中读取到旧版本影响并不是很大
CopyOnWriteArraySet
package xyz.ssydx.collections;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
// 集合不安全
public class TestSet {
public static void main(String[] args) {
// // HashSet 的底层是 HashMap, 仅仅是把值都设置new Object(), 不安全的原因同 ArrayList
// HashSet<String> set = new HashSet<>();
// // 同理
// Set<Object> set = Collections.synchronizedSet(new HashSet<>());
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString());
System.out.println(set);
}).start();
}
}
}
ConcurrentHashMap(含底层探究)
package xyz.ssydx.collections;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
// 映射不安全
public class TestMap {
public static void main(String[] args) {
// Map<String, Integer> map = new HashMap<>();
// Map<Object, Object> map = Collections.synchronizedMap(new HashMap<>());
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
for (int i = 1; i <= 10; i++) {
int finalI = i;
new Thread(() -> {
map.put(UUID.randomUUID().toString(), finalI);
System.out.println(map);
}).start();
}
}
}
HashMap是如何实现的?
HashMap是由数组和链表组成的,数组的每个元素叫桶,数组的长度就是HashMap的容量
桶中是一个链表或红黑树结构,JDK8及以后在链表长度超过8时转为红黑树,JDK7及之前只有链表
当添加一个新键值对时(假定它的键不与已知键重复),会先对键的哈希码进行扰动处理,再利用位运算对进行取模,然后决定放到哪个桶中,放入后会追加到对应的链表或红黑树中
如果达到扩容阈值,就会扩充桶数,并重新进行散列,这是一个耗时操作,时间复杂度为O(n)
扰动处理是为了避免哈希码的高位或低位变化少产生过多的哈希冲突
位运算取模 (桶数 - 1) & 哈希码 也是把容量设置2的次方的原因
HashMap的两个参数是干什么的?
HashMap有两个默认参数,一个是初始容量,一个是负载因子
初始容量是16,建议总是使其为2的次方(实际上如果不是,内部也会自动调整为2的次方)
负载因子是0.75,它决定了当键值对的数量达到桶数的多少后进行扩容
过大的容量会导致空间浪费,过小的容量会导致频繁扩容
较高的负载因子节省空间但更易造成哈希冲突进而拖慢查找效率(如果出现冲突会加入链表或红黑树中,这两者的查找时间复杂度为O(N)和O(logN),低于数组的O(1)),较低的负载因子浪费空间但降低了哈希冲突的概率
0.75(官方推荐)被视为折中的选择,即桶数稍大于元素个数
核心辅助类
CountDownLatch
package xyz.ssydx.other;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 计数器, 确保只有当计数器归零后countDownLatch.await();后的操作才可以执行
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName());
countDownLatch.countDown();
},String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println("hello world");
}
}
CyclicBarrier
package xyz.ssydx.other;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) throws InterruptedException {
// 用于确保每当指定数量的线程调用cyclicBarrier.await();就阻塞这些线程直到执行这个屏障点后再继续执行这些线程的后续操作
CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()->{
System.out.println("hello world");
});
for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(() -> {
try {
System.out.println("第一阶段" + finalI);
cyclicBarrier.await();
System.out.println("第二阶段" + finalI);
cyclicBarrier.await();
System.out.println("第三阶段" + finalI);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
},String.valueOf(i)).start();
}
}
}
Semaphore
package xyz.ssydx.other;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
// 限制并发线程数量
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
semaphore.release();
}).start();
}
}
}
总结
CountDownLatch |
等待一组线程完成后再继续执行 | 不可重用 | 不支持 | 主线程等待子线程完成 |
CyclicBarrier |
多个线程互相等待,全部到达后继续 | 可重用 | 支持 | 分阶段协同任务、多线程同步 |
Semaphore |
控制并发线程数量(资源访问控制) | 可重用 | 不支持 | 资源池、限流、互斥 |
读写锁
package xyz.ssydx.readwrite;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Test {
public static void main(String[] args) throws InterruptedException {
Data data = new Data();
for (int i = 0; i < 3; i++) {
int finalI = i;
new Thread(() -> {
data.add(UUID.randomUUID().toString());
}, "write" + i).start();
TimeUnit.MILLISECONDS.sleep(100);
new Thread(() -> {
data.get(finalI);
}, "read" + i).start();
}
/*
控制台输出如下(仅供示例, 实际上这种效果有时并不容易复现):
该结果说明了读-写锁不可共存, 但写-写锁可共存, 写-写锁也不可共存
read0 读取开始
read0 读取完成
write0 写入开始
write0 写入完成
write1 写入开始
write1 写入完成
write2 写入开始
write2 写入完成
read1 读取开始
read2 读取开始
read2 读取完成
read1 读取完成
*/
}
}
class Data {
private List<String> list = new ArrayList<>();
// 读写锁, 读锁(共享锁, 几个线程可以同时获得), 写锁(独占锁, 一次只能一个线程获得), 读锁和写锁是互相排斥的
private ReadWriteLock lock = new ReentrantReadWriteLock();
public void add(String str) {
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 写入开始");
list.add(str);
System.out.println(Thread.currentThread().getName() + " 写入完成");
} finally {
lock.writeLock().unlock();
}
}
public String get(int index) {
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " 读取开始");
String string = "";
if (index < list.size()) {
string = list.get(index);
}
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " 读取完成");
return string;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.readLock().unlock();
}
}
}
阻塞队列
Test1
package xyz.ssydx.blocking;
import java.util.concurrent.ArrayBlockingQueue;
public class Test1 {
public static void main(String[] args) {
ArrayBlockingQueue<String> strings = new ArrayBlockingQueue<String>(3);
// strings.element(); // NoSuchElementException
System.out.println(strings.peek());
strings.add("a");
strings.add("b");
strings.add("c");
// strings.add("d"); // IllegalStateException
System.out.println(strings.offer("d"));
System.out.println(strings);
strings.remove();
strings.remove();
strings.remove();
// strings.remove(); // NoSuchElementException
System.out.println(strings.poll());
System.out.println(strings);
}
}
Test2
package xyz.ssydx.blocking;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<String> strings = new ArrayBlockingQueue<String>(3);
strings.offer("a");
strings.offer("b");
strings.offer("c");
strings.offer("d",1, TimeUnit.SECONDS);
System.out.println(strings);
System.out.println(strings.poll());
System.out.println(strings.poll());
System.out.println(strings.poll());
System.out.println(strings.poll(1, TimeUnit.SECONDS));
}
}
Test3
package xyz.ssydx.blocking;
import java.util.concurrent.ArrayBlockingQueue;
public class Test3 {
public static void main(String[] args) {
ArrayBlockingQueue<String> strings = new ArrayBlockingQueue<String>(3);
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
try {
strings.put("Hello World" + finalI);
System.out.println(Thread.currentThread().getName() + "插入成功");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + strings.take());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
}
}
总结对比
无限等待 | put | take | |
限时等待 | offer | poll | |
抛出异常 | add、addAll | remove、removeAll、RemoveIf | element |
不抛出异常 | offer | poll | peek |
同步队列
package xyz.ssydx.blocking;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class Test4 {
public static void main(String[] args) throws InterruptedException {
// 虽然其看起来像容量为1的阻塞队列, 但本质并不是, 它在执行插入操作时会阻塞, 直到有另一线程进行获取, 反之亦然
SynchronousQueue<String> strings = new SynchronousQueue<>();
new Thread(() -> {
for (String s : Arrays.asList("a", "b", "c")) {
try {
System.out.println("插入: " + s);
strings.put(s);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("取出: " + strings.take());
TimeUnit.SECONDS.sleep(1);
System.out.println("取出: " + strings.take());
TimeUnit.SECONDS.sleep(1);
System.out.println("取出: " + strings.poll());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
}
池化
通过复用减少频繁的创建和销毁,减少资源消耗,提高响应速度,提供统一管理
池化的应用有连接池、线程池等
默认线程池
package xyz.ssydx.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test1 {
public static void main(String[] args) {
// ExecutorService executorService = Executors.newSingleThreadExecutor(); // 只有一个线程的线程池
// ExecutorService executorService = Executors.newFixedThreadPool(3); // 固定大小的线程池
ExecutorService executorService = Executors.newCachedThreadPool(); // 动态伸缩的线程池
try {
for (int i = 0; i < 100; i++) {
executorService.execute(() -> System.out.println(Thread.currentThread().getName()));
}
} finally {
executorService.shutdown();
}
}
}
自定义线程池
ThreadPoolExecutor 的线程池模型
创建后就存在初始容量数量的线程且不会自动关闭,同时预留一定的扩展数量便于动态增删线程
即 实际可并发的线程数量为最大容量
此外存在一个任务队列(阻塞型单端队列),可以暂时容纳一定数量的任务
每个线程可接纳一个任务,而工作队列可以暂时容纳任务
即同一时刻线程池的最多可以容纳的任务数量为 线程最大容量+任务队列容量
package xyz.ssydx.pool;
import java.util.concurrent.*;
public class Test2 {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
// 初始容量, 最大容量
3, 5,
// 线程存活期, 针对动态扩展的线程, 即最大容量减初始容量, 多久没被使用后自动销毁
60L, TimeUnit.SECONDS,
// 等待区容量
new ArrayBlockingQueue<Runnable>(5),
// 线程工厂, 较少更改
Executors.defaultThreadFactory(),
// 当任务数量 > 线程池的最大容量+阻塞队列的容量, 就需要考虑策略问题了
// 策略: 默认报错, 即 AbortPolicy;
// 原路返回, 即 CallerRunsPolicy;
// 直接抛弃, 即 DiscardPolicy;
// 抛弃队列中最老的任务(即队首的), 即 DiscardOldestPolicy
// 直接抛弃和抛弃最老的形象理解: 前者挤不进等候队列会直接抛弃, 后者则强行挤入队尾同时造成队首的被挤出抛弃
new ThreadPoolExecutor.DiscardOldestPolicy()
);
try {
for (int i = 0; i < 11; i++) {
int finalI = i;
threadPoolExecutor.execute(() -> System.out.println(Thread.currentThread().getName() + " " + finalI));
}
} finally {
threadPoolExecutor.shutdown();
}
}
}
如何设置线程池?
线程池设计的核心原则在于评估任务的性质。对于 I/O 密集型任务,应适当增加线程数量,以充分利用 CPU 空闲时间;而对于 CPU 密集型任务,线程池大小应接近或略大于 CPU 核心数,以避免不必要的上下文切换开销。
一个计算公式:线程数 = N_cpu × U_cpu × (1 + W/C)
N_cpu为CPU数量,U_cpu 为预估CPU使用率,W/C为等待和计算的时间比值(本质就是IO和CPU两种任务的占比,只是通过CPU时间进行了宏观测量),这个公式本质上是在估算在等待 I/O 期间可以调度多少额外的线程来提高并发效率。
I/O 密集型任务通常涉及大量网络请求、磁盘读写等操作,这些过程会显著增加等待时间,导致 CPU 空闲。通过设置更大的线程池,可以在某个线程等待 I/O 时切换到其他任务,从而提升系统吞吐量。
CPU 密集型任务主要依赖 CPU 计算,执行效率高。若线程池过大会导致频繁的线程调度和上下文切换,反而增加系统开销。因此,线程池大小应尽量接近 CPU 核心数,以便每个核心高效处理一个线程,实现真正的并行计算。
注:CPU可以把线程调度到单独的核心执行,实现真正的并行
函数式接口
Function<T,R>
package xyz.ssydx.func_interface;
import java.util.function.Function;
public class Test1 {
public static void main(String[] args) {
// 函数式接口是指只有一个抽象方法的接口(@FunctionalInterface)
// Lambda 表达式本质上是为实现这类接口而设计的一种语法糖
// 单参有返回值的 Function 接口示例(不常用)
Function<String, String> function = new Function<String, String>() {
@Override
public String apply(String s) {
return "Hello World " + s;
}
};
System.out.println(function.apply("ssydx"));
// 实际的使用方式, 这也是如 JDBC、Stream 等常用的操作
String string = myFunc(5, (num) -> {
for (int i = 0; i < num; i++) {
System.out.println(i);
}
return "shutdown";
});
System.out.println(string);
}
private static String myFunc(Integer num, Function<Integer, String> function) {
System.out.println("myFunc");
return function.apply(num);
}
}
其他接口
package xyz.ssydx.func_interface;
import java.util.function.*;
public class Test2 {
public static void main(String[] args) {
// 所有函数式接口都不常采用直接 new 的方式, 此处仅作演示
// 函数式接口或者 Lambda 表达式本质扮演了 C 或C++语言中的回调函数的角色, 理解这点不难明白其作用之广泛
// 以下采用 Lambda 方式
// Consumer<T>, 对于有参的接口, 如果只有一个参数可以省略括号, 单个语句的执行体可以省略花括号
Consumer<String> consumer = s -> System.out.println("s=" + s);
consumer.accept("hello");
// Supplier<T>, 对于无参的接口, 括号不可省略, 单个语句的执行体返回值时可以同时省略花括号和 return
Supplier<String> supplier = () -> "hello";
System.out.println(supplier.get());
// Predicate<T>, 对于复合语句的执行体, 必须具有花括号
Predicate<String> predicate = s -> {
if (s.equals("hello")) {
return true;
} else {
return false;
}
};
System.out.println(predicate.test("hello"));
// Long版即返回值类型为Long的版本, 类比即可
// 对于无参的单个语句的执行体的甚至可以简化为方法引用的形式
LongSupplier longSupplier = System::currentTimeMillis;
System.out.println(longSupplier.getAsLong());
// ToDouble版即返回值类型为Double的版本, 类比即可
// 对于单参的单个语句的执行体的也可以简化为方法引用的形式
ToDoubleFunction<String> toDoubleFunction = Double::parseDouble;
System.out.println(toDoubleFunction.applyAsDouble("3.1415926"));
// 注意: 方法引用不支持多参, 无法自动推导实参和形参的对应关系
// 本质是Function<T, T>, 即参数类型和返回值类型相同
UnaryOperator<String> unaryOperator = s -> s + s;
System.out.println(unaryOperator.apply("hello"));
// 本质是BiFunction<T, T, T>, 即双参和返回值的类型都相同
BinaryOperator<String> binaryOperator = (s1, s2) -> s1 + s2;
System.out.println(binaryOperator.apply("hello", "world"));
// Bi版即两参版本, 类比即可
BiPredicate<String, String> biPredicate = (s1, s2) -> s1.equals(s2) ? true: false;
System.out.println(biPredicate.test("hello", "hello"));
}
}
自定义函数式接口
JDBC等其实也是采用的这种方式
package xyz.ssydx.func_interface;
import java.util.function.*;
public class Test3 {
public static void main(String[] args) {
MyClass myClass = new MyClass();
String str = myClass.myFunc("hello", 10L, 3.14, (s1, l1, d1) -> {
return s1 + " " + l1 + " " + d1;
});
System.out.println(str);
}
}
class MyClass {
public String myFunc(String s1, Long l1, Double d1, MyFuncInter<String, Long, Double, String> myFuncInter) {
return myFuncInter.func(s1, l1, d1);
}
}
// 自己完全也可以也写出类似的函数式接口, 但通常来说没有必要, 官方提供的足以使用
@FunctionalInterface
interface MyFuncInter<T1, T2, T3, R> {
R func(T1 t1, T2 t2, T3 t3 );
}
闭包机制
闭包(Closure)是指一个函数与其引用环境的组合,能够访问并记住其词法作用域,即使该函数在其作用域外执行。
Java 的 Lambda 表达式具备“有限的闭包特性”,可以访问外部作用域中的变量 —— 这就是所谓的 闭包捕获机制(Closure Capturing),这和Js不同,Js的闭包更加强大,不要求外部变量不可变
这个地方是值得深挖的,Java本质只是禁止的变量本身的改变(基本类型为值,引用类型为地址),也就是说如果是引用变量,你依然可以修改引用的对象本身,只是不可将地址重新指向其他对象
这个地方比较难懂,Java只考虑变量本身而不考虑其实际指向问题,未完全解决内外部同时修改可能导致的混乱
注:字符串虽然是引用类型,但其在每次修改后都会进行新建,造成地址修改。一句话总结:字符串变量除了本身是引用,其他性质基本等同于基本类型,它的特殊在JVM中也有很明显的体现
package xyz.ssydx.func_interface;
import java.util.function.Consumer;
import java.util.ArrayList;
import java.util.List;
public class Test4 {
public static void main(String[] args) {
List<Consumer<Void>> tasks = new ArrayList<>();
for (int i = 0; i < 5; i++) {
// 必须复制一次,否则编译报错
// 因为 i 是循环中的变量, 值是可变的, Java闭包只支持捕获不可变的变量
// 通过复制一次, 保存了一个不可变的副本, 满足了不可变要求
// 进一步讲, Java 通过拷贝副本, 避免内部和外部同时修改可能产生的并发问题
// 但实质上并没完全避免, 假如变量是引用类型, 依然可能造成并发问题
int finalI = i;
tasks.add(v -> System.out.println("当前值: " + finalI));
}
// 后续执行这些 Consumer
for (Consumer<Void> task : tasks) {
task.accept(null);
}
}
}
Java和Js对比
是否支持闭包? | 支持(有限制) | 支持(无限制) |
Lambda 是否能访问外部变量? | 可以,但必须是 effectively final | 可以自由访问 |
是否能修改变量内容? | 引用类型的内部状态可以改,但不能重新赋值 | 完全可以修改 |
更安全还是更灵活? | 更安全(编译期防止错误) | 更灵活(但也更容易出错) |
是否适合并发? | 相对更适合(Lambda 不易引起副作用) | 更容易出错(需手动控制) |
effectively final指的是等效不可变。Java 要求变量一旦被 Lambda 捕获,就不能再被修改。即使没有加
final
,只要没改它,就当作是final
来处理
流式计算
前提是具备函数式接口(Lambda表达式)的知识
流式计算基本都涉及函数式接口,如果不了解函数式接口和Lambda表达式,是很难熟练使用流式计算的
package xyz.ssydx.stream;
import java.util.Arrays;
import java.util.List;
public class Test1 {
public static void main(String[] args) {
// 流式操作提供了类似 SQL 的数据处理功能, 甚至还有并行流等高级功能, 是 Java 中的精华
User zs = new User(1, "zs", 21);
User ls = new User(2, "ls", 22);
User ww = new User(3, "ww", 23);
User yq = new User(4, "xq", 24);
User qb = new User(5, "qb", 25);
User xq = new User(6, "yq", 26);
List<User> list = Arrays.asList(zs, ls, ww, yq, qb, xq);
// 把列表转为流以便进行流式计算, 此处实际可省略列表这个中间体, 直接使用 Stream.of(zs, ls, ww, yq, qb, xq)
list.stream()
// 过滤, 参数是Predicate<T>, 为真保留,为假筛除
// User{id=4, name='xq', age=24}
// User{id=6, name='yq', age=26}
.filter(user -> (user.getId() % 2 == 0) && (user.getAge() > 23))
// 转换, Function<T, R>, 在这一步你甚至可以通过映射改变对象的类型
// 但要注意后续操作都是基于新的类型进行的, 即map是一个可能导致副作用的操作
// User{id=4, name='XQ', age=24}
// User{id=6, name='YQ', age=26}
.map(user -> user.setName(user.getName().toUpperCase()))
// 排序, Comparator<T>, 此处为逆序
// User{id=6, name='YQ', age=26}
// User{id=4, name='XQ', age=24}
.sorted((user1, user2) -> user2.getName().compareTo(user1.getName()))
// 跳过
// User{id=4, name='XQ', age=24}
.skip(1)
// 取出
// User{id=4, name='XQ', age=24}
.limit(1)
// 遍历, 注意遍历是一个终止操作, 后续不可继续操作, 相同的还有 collect(把流重新转换为集合) 等操作
.forEach(System.out::println);
}
}
ForkJoin(含底层探究)
什么是ForkJoin?
分支合并,基于分而治之的思想
ForkJoin的通用线程池(通过ForkJoinPool.commonPool();获得)有 Runtime.getRuntime().availableProcessors() - 1 或至少1个线程,(availableProcessors() 获取的是当前 JVM 可用的处理器(核心)数量)
为什么减1?避免占用全部CPU资源导致和其他线程或线程池的资源抢夺(会造成频繁的上下文切换,从而降低性能)
ForkJoin 的线程池模型不同于 ThreadPoolExecutor
ForkJoin的每个线程都有一个单独的工作队列(阻塞型双端队列)
初始执行时只有一个任务,分配给其中一个线程A,线程在执行任务过程中会创建出子任务,子任务从头部加入其任务队列,这时,其他线程没有任务,又发现A有未执行的任务,就会从队尾窃取任务执行,进而分裂出更多任务,更多没有任务线程加入到窃取的过程,如此往复到每个线程都有任务可执行,这期间如果某个线程提前完成了自己所有任务又会进行窃取,这就是工作窃取,不难看出这个过程是递归的
为什么要头部入队尾部窃取?
头部入队保证该线程执行完当前任务后取出的总是最新创建的任务,有利于提高缓存命中率
尾部窃取则避免了与当前任务队列所属的线程进行取任务操作时造成冲突
实际上任务代码本身也是递归的(不然怎么能在执行过程中分裂出子任务?),这个模型本质是双重递归,即任务代码递归,任务入队递归
总结:ForkJoin 框架通过“分而治之”、“递归分解”和“工作窃取”三大核心思想,实现了任务的高效并行处理,尤其适用于 CPU 密集型、可拆分的大规模计算任务。
注:ForkJoin的默认线程池是默认存在的,并不需要显式创建,因为所有并行计算、异步回调等底层都会使用
一个简单的示例
package xyz.ssydx.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) {
test1();
test2();
test3();
}
private static void test1() {
Long start = System.currentTimeMillis();
long sum = 0;
for (long i = 1; i <= 10_0000_0000; i++) {
sum += i;
}
System.out.println(sum);
Long end = System.currentTimeMillis();
System.out.println(end - start);
}
private static void test2() {
Long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinPool forkJoinPool1 = ForkJoinPool.commonPool();
Sum sum = new Sum(1, 10_0000_0000);
forkJoinPool.execute(sum);
try {
Long l = sum.get();
System.out.println(l);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
Long end = System.currentTimeMillis();
System.out.println(end - start);
}
private static void test3() {
Long start = System.currentTimeMillis();
System.out.println(LongStream.rangeClosed(1, 10_0000_0000).parallel().reduce(0L, Long::sum));
Long end = System.currentTimeMillis();
System.out.println(end - start);
}
}
class Sum extends RecursiveTask<Long> {
private final long start;
private final long end;
private final int range;
public Sum(long start, long end) {
this(start, end, 10000);
}
public Sum(long start, long end, int range) {
this.start = start;
this.end = end;
this.range = range;
}
@Override
protected Long compute() {
if (end - start <= range) {
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
long middle = (start + end) / 2;
Sum left = new Sum(start, middle);
left.fork();
Sum right = new Sum(middle + 1, end);
right.fork();
return left.join() + right.join();
}
}
}
并行流底层也是 ForkJoin,因此此处和自行实现的也许时间近似,但都比单线程计算要快很多
ForkJoinPool 和 ThreadPoolExecutor 对比
任务类型 | Runnable , Callable |
RecursiveTask , RecursiveAction |
线程数量控制 | 初始线程数、核心线程数、最大线程数 | 固定并行度(默认 CPU 核心数 - 1) |
任务队列 | 单个共享的阻塞队列(如 LinkedBlockingQueue) | 每个线程私有的双端队列(Deque) |
调度机制 | 线程从共享队列中争抢任务 | 工作窃取机制:线程优先执行本地任务,空闲时从其他线程 Deque 尾部“偷”任务 |
是否支持 fork/join | 不支持 | 支持 |
任务生成方式 | 提交前必须准备好所有任务 | 动态生成子任务(递归分裂) |
异步回调
一个简单的示例
package xyz.ssydx.future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 线程计数器
// 为啥此处要用线程计数器
// 因为ForkJoin的线程池默认是守护线程, 如果在异步回调完成前主线程已经结束, 那回调任务所在的守护线程会自动死亡
CountDownLatch countDownLatch = new CountDownLatch(1);
// 此处使用 ForkJoin 的默认线程池
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// // 用于测试任务内容错误
// int i = 1 / 0;
System.out.println("执行完毕");
System.out.println(Thread.currentThread().getName());
return "hello world";
});
System.out.println("主线程1");
// // 虽然过程异步, 但该get()方法是同步的, 会阻塞
// String string = future.get();
// System.out.println(string);
future.whenComplete((s, t) -> {
// 操作前要注意对象是否为空, 避免造成空指针错误, 从而使得线程计数器无法自减进而造成程序无法自动终止
if (s != null) {
System.out.println(s);
}
if (t != null) {
System.out.println(t.getMessage() + "任务内容错误");
}
// 任务完成后才自减
countDownLatch.countDown();
}).exceptionally((t) -> {
return t.getMessage() + "任务本身错误";
});
System.out.println("主线程2");
// 等待线程都结束
countDownLatch.await();
// 除了线程计数器外, 阻塞方法get或让主线程休眠一会儿也可以做到(不推荐)
}
}
诸如ForkJoinPool或ThreadPoolExecutor,其线程默认是守护线程,自己通过new Thread()则是非守护线程
此示例展示了 CountDownLatch 的实际使用价值
推荐使用 CountDownLatch:对于大多数需要等待异步任务完成的场景,尤其是当有多个任务需要协调时,CountDownLatch 提供了一种高效且灵活的方式来确保主线程不会过早退出。
谨慎使用阻塞方法:仅在简单的单任务同步场景中考虑使用 .get() 方法,但要注意其可能导致的阻塞问题。
自定义线程池需慎重:虽然提供了极大的灵活性,但同时也带来了额外的复杂性,通常只在有明确的需求时才建议使用。
Promise 和 Future
如果有 Js 的 Promise 相关知识,Java 的 CompletableFuture 是比较好理解的,两者的使用方式是近似一致的
基本作用 | 表示一个异步操作的最终结果(成功或失败) | 提供更强大的 API 来控制和组合异步任务 |
是否可手动完成 | 不可手动 resolve/reject(除非使用 deferred) | 支持 complete() 、completeExceptionally() |
链式调用 | 使用 .then() 和 .catch() 链式调用 |
支持 .thenApply() , .thenAccept() , .exceptionally() 等 |
异常处理 | .catch() 捕获错误 |
.exceptionally() 提供默认值 |
组合多个任务 | 使用 Promise.all() , Promise.race() 等 |
支持 thenCompose() , thenCombine() , allOf() , anyOf() |
是否阻塞 | 非阻塞(事件循环机制) | 默认非阻塞,但 .get() 同样阻塞 |
回调注册时机 | 即使 Promise 已完成,后续添加的 .then() 也能拿到结果 |
回调注册不依赖状态,始终有效 |
是否是标准 API | ES6 标准 | Java 8+ |
线程模型 | 单线程(事件循环) | 多线程,推荐配合自定义线程池 |
典型使用场景 | Web 前端、Node.js 异步请求 | 复杂异步流程、多任务编排、链式调用 |
JMM再理解
JVM篇中有简单的讲解
volatile
TODO
可见性
原子性
指令重排
深入单例模式
CAS和ABA
TODO
锁的种类
TODO
可重入锁
自旋锁
读写锁
死锁问题
问题
某一个同步块拥有两个及以上的监视对象,就可能出现两个线程各自持有一个对象的锁,但都无法获取到另一个对象的锁,造成僵持
必要条件
死锁发生必须同时满足四个条件,又称Coffman条件,打破其一即可避免死锁
互斥条件(Mutual Exclusion)
某些资源只能被一个线程占用,即一次只有一个线程可以使用该资源。如果另一个线程请求该资源,则必须等待,直到该资源被释放。
占有并等待条件(Hold and Wait)
一个线程已经占有了至少一个资源,但又提出了对新资源的请求。然而,这个新资源已经被其他线程占有,导致当前线程处于等待状态,同时它也不愿意释放自己已占有的资源。
不可剥夺条件(No Preemption)
资源不能被强制从某个线程中夺走,只能由该线程自行释放。这意味着一旦一个线程获得了某些资源,除非该线程主动放弃这些资源(比如完成任务后正常释放),否则这些资源将一直保持被该线程占用的状态。
循环等待条件(Circular Wait)
存在一个线程的环形等待序列,其中每个线程都在等待下一个线程所持有的资源。具体来说,存在一组等待获取资源的线程 {T0, T1, ..., Tn},使得 T0 等待 T1 占有的资源,T1 等待 T2 占有的资源,...,Tn-1 等待 Tn 占有的资源,而 Tn 又等待 T0 占有的资源。
方案
打破互斥条件
尽量减少使用独占性资源。对于一些资源,如果可以设计成允许共享访问,则可以避免这个问题。但是,在很多情况下,这是不可能的,因为有些资源本质上就是不可共享的(例如打印机)。
打破占有并等待条件
一种方法是要求所有线程在开始执行之前一次性申请所有需要的资源。
另一种方法是实现一种机制,允许操作系统或运行时环境抢占资源(但这可能导致复杂性和潜在的数据一致性问题)。
打破不可剥夺条件
允许系统强制回收资源。例如,当一个线程请求一个新的资源而暂时无法获得时,它可以释放所有已经拥有的资源,并在稍后重新申请它们。这种方法可能会引入额外的开销和复杂性。
打破循环等待条件
通过给资源分配顺序编号,并要求线程按照编号从小到大的顺序请求资源,这样就可以避免形成循环等待链。这是一种常见的策略,虽然简单但有效。
实践
始终按照相同的顺序获取锁:确保所有线程都以固定的、相同的顺序获取锁。这可以防止循环等待条件的发生,而循环等待正是死锁的一个必要条件。 尝试使用超时机制:在尝试获取锁时设置一个超时时间。如果在指定时间内未能获取到锁,则放弃尝试并进行适当的错误处理。 尽量减少锁的范围:只在必要的时候持有锁,并尽快释放它。这样可以减少并发冲突的机会。 使用显式的锁对象如 ReentrantLock:ReentrantLock 提供了比内置的 synchronized 更灵活的锁定机制,包括可中断的锁等待和定时锁等待等特性。 使用更高层次的并发工具:例如 java.util.concurrent 包中的类,如 ConcurrentHashMap 或者 ReadWriteLock,它们可以在很多情况下帮助你避免直接管理锁带来的复杂性。
#java##并发编程#本专栏包含Java、MySQL、JavaWeb、Spring、Redis、Docker等等,作为个人学习记录及知识总结,将长期进行更新! 如果浏览、点赞、收藏、订阅过少,考虑停更或删除了(😄有点打击积极性)