并发编程常用核心类示例

Java并发编程常用核心类 完整示例代码

1、线程创建 4 种方式

1.1 继承Thread类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("线程执行:" + Thread.currentThread().getName());
    }
}

// 测试
public class Demo {
    public static void main(String[] args) {
        new MyThread().start();
    }
}

1.2 实现Runnable接口

1
2
Runnable runnable = () -> System.out.println("Runnable线程");
new Thread(runnable).start();

1.3 Callable + FutureTask 有返回值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

Callable<Integer> callable = () -> {
    Thread.sleep(1000);
    return 100 + 200;
};
FutureTask<Integer> futureTask = new FutureTask<>(callable);
new Thread(futureTask).start();

// 获取返回值
Integer result = futureTask.get();
System.out.println("线程返回值:" + result);

1.4 线程池创建

1
2
3
4
5
6
7
8
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

ExecutorService pool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
    pool.execute(() -> System.out.println("线程池任务:" + Thread.currentThread().getName()));
}
pool.shutdown();

2、线程池 手动创建(阿里规范,禁止Executors)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import java.util.concurrent.*;

public class ThreadPoolDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                      // 核心线程数
                5,                     // 最大线程数
                10L,                   // 空闲线程存活时间
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10), // 阻塞队列
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
        );

        for (int i = 0; i < 8; i++) {
            pool.execute(() -> System.out.println("自定义线程池任务"));
        }
        pool.shutdown();
    }
}

3、CountDownLatch 倒计时闭锁

作用:主线程等待所有子线程执行完再继续

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);

        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                System.out.println("子线程执行");
                latch.countDown(); // 计数-1
            }).start();
        }
        latch.await(); // 等待计数归0
        System.out.println("所有子线程执行完毕,主线程继续");
    }
}

4、CyclicBarrier 循环栅栏

作用:一组线程互相等待,凑齐数量再一起执行,可循环复用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3, () ->
                System.out.println("所有人就绪,统一开始执行")
        );

        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    System.out.println("线程准备就绪");
                    barrier.await(); // 等待凑齐3个
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

5、Semaphore 信号量

作用:控制同时访问资源的线程数量,做限流、池化资源控制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) {
        // 只允许2个线程同时执行
        Semaphore semaphore = new Semaphore(2);

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取许可
                    System.out.println("进入资源执行业务");
                    Thread.sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release(); // 释放许可
                }
            }).start();
        }
    }
}

6、Atomic 原子类(解决并发变量安全)

AtomicInteger 示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicDemo {
    private static AtomicInteger num = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    num.incrementAndGet(); // 原子自增
                }
            }).start();
        }
        Thread.sleep(2000);
        System.out.println("最终结果:" + num.get()); // 正确10000
    }
}

7、ReentrantLock 可重入锁

作用:替代synchronized,可尝试获取锁、可超时、可公平锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import java.util.concurrent.locks.ReentrantLock;

public class LockDemo {
    private static ReentrantLock lock = new ReentrantLock();
    private static int count = 0;

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    count++;
                    System.out.println("count = " + count);
                } finally {
                    lock.unlock(); // 必须在finally释放锁
                }
            }).start();
        }
    }
}

8、ReadWriteLock 读写锁

规则:读共享、写独占;读写互斥、写写互斥

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {
    private static ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private static int value = 0;

    // 读
    public static void read() {
        rwLock.readLock().lock();
        try {
            System.out.println("读取值:" + value);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    // 写
    public static void write(int newValue) {
        rwLock.writeLock().lock();
        try {
            value = newValue;
            System.out.println("写入值:" + value);
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}

9、ThreadLocal 线程本地变量

作用:每个线程独有一份变量,线程间隔离

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public class ThreadLocalDemo {
    private static ThreadLocal<String> tl = new ThreadLocal<>();

    public static void main(String[] args) {
        new Thread(() -> {
            tl.set("线程A数据");
            System.out.println(Thread.currentThread().getName() + ":" + tl.get());
        }).start();

        new Thread(() -> {
            tl.set("线程B数据");
            System.out.println(Thread.currentThread().getName() + ":" + tl.get());
        }).start();
        
        // 用完必须remove,防止内存泄漏
        tl.remove();
    }
}

10、CompletableFuture 异步任务编排

作用:替代Future,支持回调、串行、并行、异常处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CompletableFutureDemo {
    public static void main(String[] args) throws Exception {
        // 异步执行
        CompletableFuture.supplyAsync(() -> {
            TimeUnit.SECONDS.sleep(1);
            return 100;
        }).thenAccept(res -> {
            // 回调处理结果
            System.out.println("异步结果:" + res);
        });

        TimeUnit.SECONDS.sleep(2);
    }
}

小结

  1. CountDownLatch:一次等待,主线程等子线程完事;
  2. CyclicBarrier:凑齐数量一起执行,可循环;
  3. Semaphore:限流控制并发线程数;
  4. Atomic:无锁CAS保证变量原子安全;
  5. ReentrantLock:可重入、可公平、可超时,替代synchronized;
  6. ReadWriteLock:读多写少场景,读共享写独占;
  7. ThreadLocal:线程变量隔离,注意用完remove防泄漏;
  8. CompletableFuture:异步任务回调、多任务编排。