Skip to content

动态线程池

约 2489 字大约 8 分钟

2025-07-09

什么是线程池

线程池是一种基于池化思想管理线程的工具,用来降低资源消耗、提高响应速度、提高线程管理。池化技术的引入,可以有效的减少线程频繁申请/销毁和调度带来的额外开销。

ThreadPoolExecutor

image-20250709143401138
image-20250709143401138

线程池在内部实际上构建了⼀个⽣产者消费者模型,将线程和任务两者解耦,并不直接关联,从⽽良好的缓冲任 务,复⽤线程。线程池的运⾏主要分成两部分:任务管理、线程管理。

任务管理部分充当⽣产者的⻆⾊,当任务提交后,线程池会判断该任务后续的流转:

  1. 直接申请线程执⾏该任务;

  2. 缓冲到队列中等待线程执⾏;

  3. 拒绝该任务。

    线程管理部分是消费者,它们被统⼀维护在线程池内,根据任务请求进⾏线程的分配,当线程

    执⾏完任务后则会继续获取新的任务去执⾏,最终当线程获取不到任务的时候,线程就会被回收。

生命周期

线程池的内部使用运行状态线程数量来维护,而不是由用户主动操作。

image-20250709143910051
image-20250709143910051

任务调度流程

资料:Java线程池实现原理及其在美团业务中的实践 Image[5]
资料:Java线程池实现原理及其在美团业务中的实践 Image[5]

任务阻塞队列

阻塞队列(BlockingQueue)是⼀个⽀持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线 程会等待队列变为⾮空。当队列满时,存储元素的线程会等待队列可⽤。阻塞队列常⽤于⽣产者和消费者的场景, ⽣产者是往队列⾥添加元素的线程,消费者是从队列⾥拿元素的线程。阻塞队列就是⽣产者存放元素的容器,⽽消 费者也只从容器⾥拿元素。

资料:Java线程池实现原理及其在美团业务中的实践 Image[6]
资料:Java线程池实现原理及其在美团业务中的实践 Image[6]

任务拒绝

资料:Java线程池实现原理及其在美团业务中的实践 Image[8]
资料:Java线程池实现原理及其在美团业务中的实践 Image[8]

以上内容参考

Java线程池实现原理及其在美团业务中的实践

自定义简易获取线程池

我们需要创建两个项目,一个是动态线程池SDK,另一个是测试项目。

动态线程池SDK

只要其他项目引入,并且配置了ThreadPoolExecutor,那么这个方法就能拦截

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-autoconfigure</artifactId>
</dependency>

动态配置入口

/**
 * 动态配置入口
 */
@Configuration
public class DynamicThreadPoolAutoConfig {

    private final Logger logger = LoggerFactory.getLogger(DynamicThreadPoolAutoConfig.class);

    @Bean("dynamicThreadPoolService")
    public String dynamicThreadPoolService(ApplicationContext applicationContext, Map<String, ThreadPoolExecutor> threadPoolExecutor) {
        String applicationName = applicationContext.getEnvironment().getProperty("spring.application.name");
        if (StringUtils.isBlank(applicationName)) {
            applicationName = "default";
            logger.error("动态线程池启动,SpringBoot未配置名称");
        }
        logger.info("线程池信息{}", JSON.toJSONString(threadPoolExecutor));
        return new String();
    }
}

我们需要在resources配置的META-INF/spring.foctories,实现自动装配,配置到当前配置类位置

org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.flycode.dynamic.thread.pool.sdk.config.DynamicThreadPoolAutoConfig

将此项目打包

测试动态线程池

我们需要将线程核心参数抽离出来,而不是写死,我们只需要在yml中配置响应参数即可。

@Component
@Data
@ConfigurationProperties(prefix = "thread.pool.executor.config", ignoreUnknownFields = true)
public class ThreadPoolConfigProperties {
    /**
     * 核心线程数
     */
    private int corePoolSize = 20;
    /**
     * 最大线程数
     */
    private int maximumPoolSize = 100;
    /**
     * 线程存活时间
     */
    private long keepAliveTime = 10L;
    /**
     * 阻塞队列长度
     */
    private int blockQueueSize = 5000;
    /**
     * 丢弃任务方式
     */
    private String policy = "AbortPolicy";
}

我们可以创建两个线程池,便于看出区别

@Slf4j
@EnableAsync
@Configuration
@EnableConfigurationProperties({ThreadPoolConfigProperties.class})
public class ThreadPoolConfig {

    @Bean("threadPool01")
    public ThreadPoolExecutor threadPool01(ThreadPoolConfigProperties threadPoolConfigProperties) {
        RejectedExecutionHandler rejectedExecutionHandler;
        switch (threadPoolConfigProperties.getPolicy()) {
            case "AbortPolicy":
                rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
                break;
            case "DiscardPolicy":
                rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy();
                break;
            case "DiscardOldestPolicy":
                rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
                break;
            default:
                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
        }

        // 创建线程池
        return new ThreadPoolExecutor(
                threadPoolConfigProperties.getCorePoolSize(),
                threadPoolConfigProperties.getMaximumPoolSize(),
                threadPoolConfigProperties.getKeepAliveTime(),
                TimeUnit.MINUTES,
                new LinkedBlockingDeque<>(threadPoolConfigProperties.getBlockQueueSize()),
                Executors.defaultThreadFactory(),
                rejectedExecutionHandler
        );
    }


    @Bean("threadPool02")
    public ThreadPoolExecutor threadPool02(ThreadPoolConfigProperties threadPoolConfigProperties) {
        RejectedExecutionHandler rejectedExecutionHandler;
        switch (threadPoolConfigProperties.getPolicy()) {
            case "AbortPolicy":
                rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
                break;
            case "DiscardPolicy":
                rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy();
                break;
            case "DiscardOldestPolicy":
                rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
                break;
            default:
                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
        }

        // 创建线程池
        return new ThreadPoolExecutor(
                threadPoolConfigProperties.getCorePoolSize(),
                threadPoolConfigProperties.getMaximumPoolSize(),
                threadPoolConfigProperties.getKeepAliveTime(),
                TimeUnit.MINUTES,
                new LinkedBlockingDeque<>(threadPoolConfigProperties.getBlockQueueSize()),
                Executors.defaultThreadFactory(),
                rejectedExecutionHandler
        );
    }
}

测试

当测试项目启动后,打上断点,我们可以看到当前已经读取到所有的线程池,便于我们之后动态的修改线程数量。

image-20250709164936223
image-20250709164936223
image-20250709165243088
image-20250709165243088

动态线程池数据上报

我们需要构造如下几个方法

  1. 获取当前所有的线程池信息
  2. 根据线程池名称获取指定线程池信息
  3. 根据线程池配置更新线程池信息

动态线程池服务实现类

/**
 * 动态线程池服务实现类
 *
 * @author flycode
 */
public class DynamicThreadPoolServiceImpl implements IDynamicThreadPoolService {


    private final Logger logger = LoggerFactory.getLogger(DynamicThreadPoolServiceImpl.class);

    private final Map<String, ThreadPoolExecutor> threadPoolExecutorMap;
    private final String applicationName;

    public DynamicThreadPoolServiceImpl(String applicationName, Map<String, ThreadPoolExecutor> threadPoolExecutorMap) {
        this.threadPoolExecutorMap = threadPoolExecutorMap;
        this.applicationName = applicationName;
    }


    @Override
    public List<ThreadPoolConfigEntity> listAllThreadPool() {
        Set<String> keys = threadPoolExecutorMap.keySet();
        List<ThreadPoolConfigEntity> threadPoolConfigList = new ArrayList<>(keys.size());
        keys.forEach(threadPoolName -> {
            ThreadPoolConfigEntity threadPoolConfigEntity = queryThreadPoolBythreadPoolName(threadPoolName);
            threadPoolConfigList.add(threadPoolConfigEntity);
        });
        return threadPoolConfigList;
    }

    @Override
    public ThreadPoolConfigEntity queryThreadPoolBythreadPoolName(String threadPoolName) {
        ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(threadPoolName);
        if (threadPoolExecutor == null) {
            return new ThreadPoolConfigEntity(applicationName, threadPoolName);
        }
        ThreadPoolConfigEntity threadPoolConfigEntity = new ThreadPoolConfigEntity(applicationName, threadPoolName);
        threadPoolConfigEntity.setCorePoolSize(threadPoolExecutor.getCorePoolSize());
        threadPoolConfigEntity.setMaximumPoolSize(threadPoolExecutor.getMaximumPoolSize());
        threadPoolConfigEntity.setActiveCount(threadPoolExecutor.getActiveCount());
        threadPoolConfigEntity.setPoolSize(threadPoolExecutor.getPoolSize());
        threadPoolConfigEntity.setQueueType(threadPoolExecutor.getQueue().getClass().getSimpleName());
        threadPoolConfigEntity.setQueueSize(threadPoolExecutor.getQueue().size());
        threadPoolConfigEntity.setRemainCapacity(threadPoolExecutor.getQueue().remainingCapacity());
        if (logger.isDebugEnabled()) {
            logger.debug("动态线程池,配置信息如下,应用名:{},线程名:{},线程配置{}", applicationName, threadPoolName, JSON.toJSONString(threadPoolConfigEntity));
        }
        return threadPoolConfigEntity;
    }

    @Override
    public void updateThreadPool(ThreadPoolConfigEntity threadPoolConfigEntity) {
        if (null == threadPoolConfigEntity || !applicationName.contains(threadPoolConfigEntity.getApplicationName())) {
            return;
        }
        ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(threadPoolConfigEntity.getThreadPoolName());
        if (null == threadPoolExecutor) {
            return;
        }
   		// 一定需要先设置最大线程数,再设置核心线程
        threadPoolExecutor.setMaximumPoolSize(threadPoolConfigEntity.getMaximumPoolSize());
  		threadPoolExecutor.setCorePoolSize(threadPoolConfigEntity.getCorePoolSize());
    }
}

动态线程池注册中心

我们可以使用Redis等来作为注册中心,需要提供两个方法:

  • 将当前所有的线程池信息写入Redis
  • 将单个线程池写入Redis
public class RedisRegisterService implements IRegister {
    private final RedissonClient redissonClient;

    public RedisRegisterService(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }


    @Override
    public void reportThreadPool(List<ThreadPoolConfigEntity> threadPoolConfigEntities) {
        RList<ThreadPoolConfigEntity> list = redissonClient.getList(RegistryEnumVO.THREAD_POOL_CONFIG_LIST_KEY.getKey());
        list.delete();
        list.addAll(threadPoolConfigEntities);
    }

    @Override
    public void reportThreadPoolConfigParameter(ThreadPoolConfigEntity threadPoolConfigEntity) {
        String cacheKey = RegistryEnumVO.THREAD_POOL_CONFIG_LIST_KEY.getKey() + "_" + threadPoolConfigEntity.getApplicationName() + "_" + threadPoolConfigEntity.getThreadPoolName();
        RBucket<Object> bucket = redissonClient.getBucket(cacheKey);
        bucket.set(threadPoolConfigEntity, Duration.ofDays(30));
    }
}

动态线程池定时任务

可以定时获取线程池信息,写入Redis缓存中。

public class ThreadPoolDataReportJob {
    private final Logger logger = LoggerFactory.getLogger(ThreadPoolDataReportJob.class);

    private final DynamicThreadPoolServiceImpl dynamicThreadPoolService;
    private final IRegister redisRegister;


    public ThreadPoolDataReportJob(DynamicThreadPoolServiceImpl dynamicThreadPoolService, IRegister redisRegister) {
        this.dynamicThreadPoolService = dynamicThreadPoolService;
        this.redisRegister = redisRegister;
    }

    @Scheduled(cron = "0 */20 * * * *")
    public void report() {
        List<ThreadPoolConfigEntity> threadPoolConfigEntities = dynamicThreadPoolService.listAllThreadPool();
        redisRegister.reportThreadPool(threadPoolConfigEntities);
        logger.info("动态配置,线程池参数{}", JSON.toJSONString(threadPoolConfigEntities));

        threadPoolConfigEntities.forEach(threadPoolConfigEntity -> {
            redisRegister.reportThreadPoolConfigParameter(threadPoolConfigEntity);
            logger.info("动态配置,线程池参数{}", JSON.toJSONString(threadPoolConfigEntity));
        });

    }
}

动态配置入口

我们需要将涉及到的服务都写入当前的配置中,当用户引入后使用,会自动的执行这些服务

  • redis服务配置
  • 线程池初始化
  • 线程池注册
  • 定时任务
/**
 * 动态配置入口
 * @author flycode
 */
@Configuration
@EnableScheduling
public class DynamicThreadPoolAutoConfig {

    private final Logger logger = LoggerFactory.getLogger(DynamicThreadPoolAutoConfig.class);

    @Bean("redissonClient")
    public RedissonClient redissonClient(DynamicThreadPoolAutoProperties dynamicThreadPoolAutoProperties) {
        Config config = new Config();
        // 设置编码
        config.setCodec(JsonJacksonCodec.INSTANCE);
        config.useSingleServer()
                .setAddress("redis://" + dynamicThreadPoolAutoProperties.getHost() + ":" + dynamicThreadPoolAutoProperties.getPort())
                .setConnectionPoolSize(dynamicThreadPoolAutoProperties.getPoolSize())
                .setPassword(dynamicThreadPoolAutoProperties.getPassword())
                .setConnectionMinimumIdleSize(dynamicThreadPoolAutoProperties.getMinIdleSize())
                .setConnectTimeout(dynamicThreadPoolAutoProperties.getConnectTimeout())
                .setRetryAttempts(dynamicThreadPoolAutoProperties.getRetryAttempts())
                .setRetryInterval(dynamicThreadPoolAutoProperties.getRetryInterval())
                .setPingConnectionInterval(dynamicThreadPoolAutoProperties.getPingInterval())
                .setKeepAlive(dynamicThreadPoolAutoProperties.isKeepAlive())
        ;

        RedissonClient redissonClient = Redisson.create(config);
        logger.info("动态线程池,注册器(Redis)初始化完成,{}-{}-{}", dynamicThreadPoolAutoProperties.getHost(), dynamicThreadPoolAutoProperties.getPort(), dynamicThreadPoolAutoProperties.getPoolSize());
        return redissonClient;
    }

    /**
     * 动态线程池初始化
     *
     * @param applicationContext
     * @param threadPoolExecutorMap
     * @return
     */
    @Bean("dynamicThreadPoolService")
    public DynamicThreadPoolServiceImpl dynamicThreadPoolService(ApplicationContext applicationContext, Map<String, ThreadPoolExecutor> threadPoolExecutorMap) {
        String applicationName = applicationContext.getEnvironment().getProperty("spring.application.name");
        if (StringUtils.isBlank(applicationName)) {
            applicationName = "default";
            logger.error("动态线程池启动,SpringBoot未配置名称");
        }
        logger.info("线程池信息{}", JSON.toJSONString(threadPoolExecutorMap));
        return new DynamicThreadPoolServiceImpl(applicationName, threadPoolExecutorMap);
    }

    /**
     * 引入Redis注册中心
     *
     * @param redissonClient
     * @return
     */
    @Bean
    public IRegister redisRegister(RedissonClient redissonClient) {
        return new RedisRegisterService(redissonClient);
    }

    /**
     * 定时任务
     *
     * @param dynamicThreadPoolService
     * @param redisRegister
     * @return
     */
    @Bean
    public ThreadPoolDataReportJob threadPoolDataReportJob(DynamicThreadPoolServiceImpl dynamicThreadPoolService, IRegister redisRegister) {
        return new ThreadPoolDataReportJob(dynamicThreadPoolService, redisRegister);
    }
}

测试

通过测试,我们可以看到已经获取了所有的线程池信息,并且将信息写入了redis

image-20250710152014517
image-20250710152014517
image-20250710152110048
image-20250710152110048

动态线程池发布订阅

我们需要新建一个监听器,,用来实现数据上报给注册中心

Redis监听器

使用MessageListener来监听Redis数据变化

public class ThreadPoolListener implements MessageListener<ThreadPoolConfigEntity> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final DynamicThreadPoolServiceImpl dynamicThreadPoolService;

    private final IRegister register;

    public ThreadPoolListener(DynamicThreadPoolServiceImpl dynamicThreadPoolService, IRegister register) {
        this.dynamicThreadPoolService = dynamicThreadPoolService;
        this.register = register;
    }

    @Override
    public void onMessage(CharSequence charSequence, ThreadPoolConfigEntity threadPoolConfigEntity) {
        logger.info("动态线程池:线程名称={},核心线程数={},最大线程数={}",threadPoolConfigEntity.getThreadPoolName(),threadPoolConfigEntity.getCorePoolSize(),threadPoolConfigEntity.getMaximumPoolSize());
        // 更新数据
        dynamicThreadPoolService.updateThreadPool(threadPoolConfigEntity);

        // 上报列表数据
        List<ThreadPoolConfigEntity> threadPoolConfigEntities = dynamicThreadPoolService.listAllThreadPool();
        register.reportThreadPool(threadPoolConfigEntities);
        // 上报单个数据
        ThreadPoolConfigEntity threadPoolConfig = dynamicThreadPoolService.queryThreadPoolBythreadPoolName(threadPoolConfigEntity.getThreadPoolName());
        register.reportThreadPoolConfigParameter(threadPoolConfig);
        logger.info("动态线程池,上报线程池配置{}", JSON.toJSONString(threadPoolConfig));
    }
}

添加监听器bean

在DynamicThreadPoolAutoConfig里面加入监听器,使用RTopic实现发布订阅功能

    @Bean
    public ThreadPoolListener threadPoolListener(DynamicThreadPoolServiceImpl dynamicThreadPoolService, IRegister redisRegister) {
        return new ThreadPoolListener(dynamicThreadPoolService, redisRegister);
    }

    @Bean(name = "dynamicThreadPoolRedisTopic")
    public RTopic threadPoolListener(RedissonClient redissonClient, ThreadPoolListener threadPoolListener){
        RTopic topic = redissonClient.getTopic(RegistryEnumVO.DYNAMIC_THREAD_POOL_REDIS_TOPIC.getKey() + "-" + applicationName);
        topic.addListener(ThreadPoolConfigEntity.class,threadPoolListener);
        return topic;
    }

测试

默认线程池参数如下

server:
  port: 8080

thread:
  pool:
    executor:
      config:
        core-pool-size: 20
        maximum-pool-size: 50
        keep-alive-time: 10
        block-queue-size: 5000
        policy: AbortPolicy

spring:
  application:
    name: thread-pool-test

# 动态线程池管理配置



dynamic:
  thread:
    config:
      pool:
        # 状态;true = 开启、false 关闭
        enable: true
        # redis host
        host: localhost
        # redis port
        port: 6379

我们之前已经定义了两个线程池,我们修改threadPool01线程池的参数

@SpringBootTest
@Slf4j
@ActiveProfiles("local")
public class ApiTest {
    @Resource
    private RTopic dynamicThreadPoolRedisTopic;


    @Test
    public void test_dynamic_thread_pool_RedisTopic() throws InterruptedException {
        ThreadPoolConfigEntity threadPoolConfigEntity =new ThreadPoolConfigEntity();
        threadPoolConfigEntity.setApplicationName("thread-pool-test");
        threadPoolConfigEntity.setThreadPoolName("threadPool01");
        threadPoolConfigEntity.setMaximumPoolSize(100);
        threadPoolConfigEntity.setCorePoolSize(100);
        dynamicThreadPoolRedisTopic.publish(threadPoolConfigEntity);
        new CountDownLatch(1).await();
    }
}
image-20250715111607192
image-20250715111607192

可以看到这个线程核心线程数变成了100,最大线程数变成了100,实现了发布订阅功能,主动推动更新数据

贡献者

  • flycodeuflycodeu

公告板

2025-03-04正式迁移知识库到此项目