Skip to content

Executors

文件信息

  • 📄 原文件:Executors.java
  • 🔤 语言:java

Java 线程池与 Executor 本文件介绍 Java 中的线程池框架。

完整代码

java
import java.util.concurrent.*;
import java.util.*;

/**
 * ============================================================
 *                    Java 线程池与 Executor
 * ============================================================
 * 本文件介绍 Java 中的线程池框架。
 * ============================================================
 */
public class Executors {

    public static void main(String[] args) throws Exception {
        executorBasics();
        threadPoolTypes();
        callableAndFuture();
        completableFutureDemo();
        scheduledExecutor();
    }

    /**
     * ============================================================
     *                    1. Executor 基础
     * ============================================================
     */
    public static void executorBasics() throws InterruptedException {
        System.out.println("=".repeat(60));
        System.out.println("1. Executor 基础");
        System.out.println("=".repeat(60));

        System.out.println("""
            Executor 框架层次:
            Executor          - 执行任务的接口
            └── ExecutorService  - 管理生命周期
                └── ScheduledExecutorService  - 定时任务

            【优点】
            - 线程复用,减少创建销毁开销
            - 控制并发数量
            - 提供任务队列
            - 支持定时和周期任务
            """);

        // 【创建线程池】
        ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(3);

        // 【提交任务】
        System.out.println("--- 提交任务 ---");
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务 " + taskId + " 由 " +
                    Thread.currentThread().getName() + " 执行");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // 【关闭线程池】
        System.out.println("\n--- 关闭线程池 ---");
        executor.shutdown();  // 不再接受新任务,等待已提交任务完成
        boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("线程池已关闭: " + terminated);

        // 【shutdown vs shutdownNow】
        System.out.println("""

            shutdown()     - 平滑关闭,等待任务完成
            shutdownNow()  - 立即关闭,尝试中断正在执行的任务
            awaitTermination() - 等待关闭完成
            """);
    }

    /**
     * ============================================================
     *                    2. 线程池类型
     * ============================================================
     */
    public static void threadPoolTypes() throws InterruptedException {
        System.out.println("\n" + "=".repeat(60));
        System.out.println("2. 线程池类型");
        System.out.println("=".repeat(60));

        // 【FixedThreadPool】固定大小
        System.out.println("--- FixedThreadPool ---");
        System.out.println("固定数量的线程,任务队列无界");
        ExecutorService fixed = java.util.concurrent.Executors.newFixedThreadPool(2);
        submitTasks(fixed, 3);
        fixed.shutdown();
        fixed.awaitTermination(3, TimeUnit.SECONDS);

        // 【CachedThreadPool】缓存线程池
        System.out.println("\n--- CachedThreadPool ---");
        System.out.println("按需创建线程,空闲 60 秒回收");
        ExecutorService cached = java.util.concurrent.Executors.newCachedThreadPool();
        submitTasks(cached, 3);
        cached.shutdown();
        cached.awaitTermination(3, TimeUnit.SECONDS);

        // 【SingleThreadExecutor】单线程
        System.out.println("\n--- SingleThreadExecutor ---");
        System.out.println("单个工作线程,保证任务顺序执行");
        ExecutorService single = java.util.concurrent.Executors.newSingleThreadExecutor();
        submitTasks(single, 3);
        single.shutdown();
        single.awaitTermination(3, TimeUnit.SECONDS);

        // 【VirtualThreadPerTaskExecutor】虚拟线程(Java 21+)
        System.out.println("\n--- VirtualThreadPerTaskExecutor (Java 21+) ---");
        System.out.println("每个任务一个虚拟线程,适合大量 I/O 密集任务");

        // 【ThreadPoolExecutor 自定义】
        System.out.println("\n--- 自定义 ThreadPoolExecutor ---");
        ThreadPoolExecutor custom = new ThreadPoolExecutor(
            2,                      // 核心线程数
            4,                      // 最大线程数
            60L,                    // 空闲时间
            TimeUnit.SECONDS,       // 时间单位
            new ArrayBlockingQueue<>(10),  // 任务队列
            new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
        );

        System.out.println("""
            ThreadPoolExecutor 参数:
            - corePoolSize: 核心线程数
            - maximumPoolSize: 最大线程数
            - keepAliveTime: 空闲线程存活时间
            - workQueue: 任务队列
            - threadFactory: 线程工厂
            - handler: 拒绝策略

            拒绝策略:
            - AbortPolicy: 抛出异常(默认)
            - CallerRunsPolicy: 调用者线程执行
            - DiscardPolicy: 静默丢弃
            - DiscardOldestPolicy: 丢弃最旧的任务
            """);

        custom.shutdown();
    }

    private static void submitTasks(ExecutorService executor, int count) {
        for (int i = 1; i <= count; i++) {
            final int id = i;
            executor.submit(() -> {
                System.out.println("  任务 " + id + " -> " +
                    Thread.currentThread().getName());
            });
        }
    }

    /**
     * ============================================================
     *                    3. Callable 和 Future
     * ============================================================
     */
    public static void callableAndFuture() throws Exception {
        System.out.println("\n" + "=".repeat(60));
        System.out.println("3. Callable 和 Future");
        System.out.println("=".repeat(60));

        System.out.println("""
            Runnable vs Callable:
            - Runnable.run() 无返回值,不能抛出检查异常
            - Callable.call() 有返回值,可以抛出异常

            Future:表示异步计算的结果
            """);

        ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(2);

        // 【Callable 返回结果】
        System.out.println("--- Callable ---");
        Callable<Integer> task = () -> {
            Thread.sleep(1000);
            return 42;
        };

        Future<Integer> future = executor.submit(task);
        System.out.println("任务已提交,是否完成: " + future.isDone());

        // 阻塞获取结果
        Integer result = future.get();
        System.out.println("结果: " + result);
        System.out.println("是否完成: " + future.isDone());

        // 【带超时的 get】
        System.out.println("\n--- 带超时的 get ---");
        Future<String> slowFuture = executor.submit(() -> {
            Thread.sleep(3000);
            return "完成";
        });

        try {
            String r = slowFuture.get(1, TimeUnit.SECONDS);
            System.out.println("结果: " + r);
        } catch (TimeoutException e) {
            System.out.println("超时!取消任务: " + slowFuture.cancel(true));
        }

        // 【invokeAll】等待所有任务完成
        System.out.println("\n--- invokeAll ---");
        List<Callable<Integer>> tasks = List.of(
            () -> { Thread.sleep(500); return 1; },
            () -> { Thread.sleep(300); return 2; },
            () -> { Thread.sleep(100); return 3; }
        );

        List<Future<Integer>> futures = executor.invokeAll(tasks);
        System.out.print("所有结果: ");
        for (Future<Integer> f : futures) {
            System.out.print(f.get() + " ");
        }
        System.out.println();

        // 【invokeAny】返回第一个完成的
        System.out.println("\n--- invokeAny ---");
        Integer first = executor.invokeAny(tasks);
        System.out.println("最快完成的结果: " + first);

        executor.shutdown();
    }

    /**
     * ============================================================
     *                    4. CompletableFuture
     * ============================================================
     */
    public static void completableFutureDemo() throws Exception {
        System.out.println("\n" + "=".repeat(60));
        System.out.println("4. CompletableFuture");
        System.out.println("=".repeat(60));

        System.out.println("""
            CompletableFuture 优势:
            - 链式调用
            - 组合多个异步操作
            - 异常处理
            - 不阻塞主线程
            """);

        // 【创建 CompletableFuture】
        System.out.println("--- 创建 ---");

        // 异步执行,无返回值
        CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
            System.out.println("runAsync 执行");
        });

        // 异步执行,有返回值
        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        });

        System.out.println("supplyAsync 结果: " + cf2.get());

        // 【链式操作】
        System.out.println("\n--- 链式操作 ---");

        CompletableFuture<String> chain = CompletableFuture
            .supplyAsync(() -> "Hello")
            .thenApply(s -> s + " World")      // 转换
            .thenApply(String::toUpperCase);   // 再转换

        System.out.println("链式结果: " + chain.get());

        // 【thenAccept / thenRun】
        System.out.println("\n--- thenAccept / thenRun ---");

        CompletableFuture.supplyAsync(() -> "数据")
            .thenAccept(s -> System.out.println("thenAccept: " + s))  // 消费结果
            .thenRun(() -> System.out.println("thenRun: 完成"));      // 无参数

        Thread.sleep(100);

        // 【组合多个 Future】
        System.out.println("\n--- 组合操作 ---");

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

        // thenCombine: 两个都完成后合并
        CompletableFuture<String> combined = future1.thenCombine(future2,
            (s1, s2) -> s1 + " " + s2);
        System.out.println("thenCombine: " + combined.get());

        // allOf: 等待所有完成
        CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2);
        all.join();
        System.out.println("allOf 完成");

        // anyOf: 任一完成
        CompletableFuture<Object> any = CompletableFuture.anyOf(
            CompletableFuture.supplyAsync(() -> { sleep(100); return "A"; }),
            CompletableFuture.supplyAsync(() -> { sleep(200); return "B"; })
        );
        System.out.println("anyOf: " + any.get());

        // 【异常处理】
        System.out.println("\n--- 异常处理 ---");

        CompletableFuture<Integer> errorFuture = CompletableFuture
            .supplyAsync(() -> {
                if (true) throw new RuntimeException("模拟错误");
                return 1;
            })
            .exceptionally(ex -> {
                System.out.println("捕获异常: " + ex.getMessage());
                return -1;  // 返回默认值
            });

        System.out.println("异常处理结果: " + errorFuture.get());

        // handle: 同时处理正常和异常
        CompletableFuture<String> handled = CompletableFuture
            .supplyAsync(() -> "OK")
            .handle((result, ex) -> {
                if (ex != null) return "Error: " + ex.getMessage();
                return "Success: " + result;
            });

        System.out.println("handle: " + handled.get());
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * ============================================================
     *                    5. ScheduledExecutorService
     * ============================================================
     */
    public static void scheduledExecutor() throws InterruptedException {
        System.out.println("\n" + "=".repeat(60));
        System.out.println("5. ScheduledExecutorService");
        System.out.println("=".repeat(60));

        ScheduledExecutorService scheduler =
            java.util.concurrent.Executors.newScheduledThreadPool(2);

        // 【延迟执行】
        System.out.println("--- 延迟执行 ---");
        scheduler.schedule(() -> {
            System.out.println("延迟 1 秒后执行");
        }, 1, TimeUnit.SECONDS);

        // 【固定速率】
        System.out.println("\n--- 固定速率执行 ---");
        ScheduledFuture<?> fixedRate = scheduler.scheduleAtFixedRate(() -> {
            System.out.println("固定速率: " + System.currentTimeMillis() % 10000);
        }, 0, 500, TimeUnit.MILLISECONDS);

        Thread.sleep(2000);
        fixedRate.cancel(false);

        // 【固定延迟】
        System.out.println("\n--- 固定延迟执行 ---");
        System.out.println("""
            scheduleAtFixedRate: 按固定速率,不管任务耗时
            scheduleWithFixedDelay: 上个任务结束后等待固定时间
            """);

        scheduler.shutdown();
        scheduler.awaitTermination(3, TimeUnit.SECONDS);
    }
}

💬 讨论

使用 GitHub 账号登录后即可参与讨论

基于 MIT 许可发布