Java并发编程常用核心类 完整示例代码
1、线程创建 4 种方式
1.1 继承Thread类
1
2
3
4
5
6
7
8
9
10
11
12
13
|
class MyThread extends Thread {
@Override
public void run() {
System.out.println("线程执行:" + Thread.currentThread().getName());
}
}
// 测试
public class Demo {
public static void main(String[] args) {
new MyThread().start();
}
}
|
1.2 实现Runnable接口
1
2
|
Runnable runnable = () -> System.out.println("Runnable线程");
new Thread(runnable).start();
|
1.3 Callable + FutureTask 有返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
Callable<Integer> callable = () -> {
Thread.sleep(1000);
return 100 + 200;
};
FutureTask<Integer> futureTask = new FutureTask<>(callable);
new Thread(futureTask).start();
// 获取返回值
Integer result = futureTask.get();
System.out.println("线程返回值:" + result);
|
1.4 线程池创建
1
2
3
4
5
6
7
8
|
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
ExecutorService pool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
pool.execute(() -> System.out.println("线程池任务:" + Thread.currentThread().getName()));
}
pool.shutdown();
|
2、线程池 手动创建(阿里规范,禁止Executors)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
import java.util.concurrent.*;
public class ThreadPoolDemo {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
10L, // 空闲线程存活时间
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10), // 阻塞队列
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
for (int i = 0; i < 8; i++) {
pool.execute(() -> System.out.println("自定义线程池任务"));
}
pool.shutdown();
}
}
|
3、CountDownLatch 倒计时闭锁
作用:主线程等待所有子线程执行完再继续
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
System.out.println("子线程执行");
latch.countDown(); // 计数-1
}).start();
}
latch.await(); // 等待计数归0
System.out.println("所有子线程执行完毕,主线程继续");
}
}
|
4、CyclicBarrier 循环栅栏
作用:一组线程互相等待,凑齐数量再一起执行,可循环复用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3, () ->
System.out.println("所有人就绪,统一开始执行")
);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
System.out.println("线程准备就绪");
barrier.await(); // 等待凑齐3个
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
|
5、Semaphore 信号量
作用:控制同时访问资源的线程数量,做限流、池化资源控制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
public static void main(String[] args) {
// 只允许2个线程同时执行
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println("进入资源执行业务");
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可
}
}).start();
}
}
}
|
6、Atomic 原子类(解决并发变量安全)
AtomicInteger 示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicDemo {
private static AtomicInteger num = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
num.incrementAndGet(); // 原子自增
}
}).start();
}
Thread.sleep(2000);
System.out.println("最终结果:" + num.get()); // 正确10000
}
}
|
7、ReentrantLock 可重入锁
作用:替代synchronized,可尝试获取锁、可超时、可公平锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
import java.util.concurrent.locks.ReentrantLock;
public class LockDemo {
private static ReentrantLock lock = new ReentrantLock();
private static int count = 0;
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
lock.lock();
try {
count++;
System.out.println("count = " + count);
} finally {
lock.unlock(); // 必须在finally释放锁
}
}).start();
}
}
}
|
8、ReadWriteLock 读写锁
规则:读共享、写独占;读写互斥、写写互斥
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockDemo {
private static ReadWriteLock rwLock = new ReentrantReadWriteLock();
private static int value = 0;
// 读
public static void read() {
rwLock.readLock().lock();
try {
System.out.println("读取值:" + value);
} finally {
rwLock.readLock().unlock();
}
}
// 写
public static void write(int newValue) {
rwLock.writeLock().lock();
try {
value = newValue;
System.out.println("写入值:" + value);
} finally {
rwLock.writeLock().unlock();
}
}
}
|
9、ThreadLocal 线程本地变量
作用:每个线程独有一份变量,线程间隔离
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public class ThreadLocalDemo {
private static ThreadLocal<String> tl = new ThreadLocal<>();
public static void main(String[] args) {
new Thread(() -> {
tl.set("线程A数据");
System.out.println(Thread.currentThread().getName() + ":" + tl.get());
}).start();
new Thread(() -> {
tl.set("线程B数据");
System.out.println(Thread.currentThread().getName() + ":" + tl.get());
}).start();
// 用完必须remove,防止内存泄漏
tl.remove();
}
}
|
10、CompletableFuture 异步任务编排
作用:替代Future,支持回调、串行、并行、异常处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
// 异步执行
CompletableFuture.supplyAsync(() -> {
TimeUnit.SECONDS.sleep(1);
return 100;
}).thenAccept(res -> {
// 回调处理结果
System.out.println("异步结果:" + res);
});
TimeUnit.SECONDS.sleep(2);
}
}
|
小结
- CountDownLatch:一次等待,主线程等子线程完事;
- CyclicBarrier:凑齐数量一起执行,可循环;
- Semaphore:限流控制并发线程数;
- Atomic:无锁CAS保证变量原子安全;
- ReentrantLock:可重入、可公平、可超时,替代synchronized;
- ReadWriteLock:读多写少场景,读共享写独占;
- ThreadLocal:线程变量隔离,注意用完remove防泄漏;
- CompletableFuture:异步任务回调、多任务编排。