动态线程池
约 2489 字大约 8 分钟
2025-07-09
什么是线程池
线程池是一种基于池化思想管理线程的工具,用来降低资源消耗、提高响应速度、提高线程管理。池化技术的引入,可以有效的减少线程频繁申请/销毁和调度带来的额外开销。
ThreadPoolExecutor

线程池在内部实际上构建了⼀个⽣产者消费者模型,将线程和任务两者解耦,并不直接关联,从⽽良好的缓冲任 务,复⽤线程。线程池的运⾏主要分成两部分:任务管理、线程管理。
任务管理部分充当⽣产者的⻆⾊,当任务提交后,线程池会判断该任务后续的流转:
直接申请线程执⾏该任务;
缓冲到队列中等待线程执⾏;
拒绝该任务。
线程管理部分是消费者,它们被统⼀维护在线程池内,根据任务请求进⾏线程的分配,当线程
执⾏完任务后则会继续获取新的任务去执⾏,最终当线程获取不到任务的时候,线程就会被回收。
生命周期
线程池的内部使用运行状态和线程数量来维护,而不是由用户主动操作。

任务调度流程
![资料:Java线程池实现原理及其在美团业务中的实践 Image[5]](https://flycodeu-1314556962.cos.ap-nanjing.myqcloud.com/codeCenterImg/资料:Java线程池实现原理及其在美团业务中的实践 Image[5].jpg)
任务阻塞队列
阻塞队列(BlockingQueue)是⼀个⽀持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线 程会等待队列变为⾮空。当队列满时,存储元素的线程会等待队列可⽤。阻塞队列常⽤于⽣产者和消费者的场景, ⽣产者是往队列⾥添加元素的线程,消费者是从队列⾥拿元素的线程。阻塞队列就是⽣产者存放元素的容器,⽽消 费者也只从容器⾥拿元素。
![资料:Java线程池实现原理及其在美团业务中的实践 Image[6]](https://flycodeu-1314556962.cos.ap-nanjing.myqcloud.com/codeCenterImg/资料:Java线程池实现原理及其在美团业务中的实践 Image[6].jpg)
任务拒绝
![资料:Java线程池实现原理及其在美团业务中的实践 Image[8]](https://flycodeu-1314556962.cos.ap-nanjing.myqcloud.com/codeCenterImg/资料:Java线程池实现原理及其在美团业务中的实践 Image[8].jpg)
以上内容参考
自定义简易获取线程池
我们需要创建两个项目,一个是动态线程池SDK,另一个是测试项目。
动态线程池SDK
只要其他项目引入,并且配置了ThreadPoolExecutor,那么这个方法就能拦截
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
动态配置入口
/**
* 动态配置入口
*/
@Configuration
public class DynamicThreadPoolAutoConfig {
private final Logger logger = LoggerFactory.getLogger(DynamicThreadPoolAutoConfig.class);
@Bean("dynamicThreadPoolService")
public String dynamicThreadPoolService(ApplicationContext applicationContext, Map<String, ThreadPoolExecutor> threadPoolExecutor) {
String applicationName = applicationContext.getEnvironment().getProperty("spring.application.name");
if (StringUtils.isBlank(applicationName)) {
applicationName = "default";
logger.error("动态线程池启动,SpringBoot未配置名称");
}
logger.info("线程池信息{}", JSON.toJSONString(threadPoolExecutor));
return new String();
}
}
我们需要在resources配置的META-INF/spring.foctories,实现自动装配,配置到当前配置类位置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.flycode.dynamic.thread.pool.sdk.config.DynamicThreadPoolAutoConfig
将此项目打包
测试动态线程池
我们需要将线程核心参数抽离出来,而不是写死,我们只需要在yml中配置响应参数即可。
@Component
@Data
@ConfigurationProperties(prefix = "thread.pool.executor.config", ignoreUnknownFields = true)
public class ThreadPoolConfigProperties {
/**
* 核心线程数
*/
private int corePoolSize = 20;
/**
* 最大线程数
*/
private int maximumPoolSize = 100;
/**
* 线程存活时间
*/
private long keepAliveTime = 10L;
/**
* 阻塞队列长度
*/
private int blockQueueSize = 5000;
/**
* 丢弃任务方式
*/
private String policy = "AbortPolicy";
}
我们可以创建两个线程池,便于看出区别
@Slf4j
@EnableAsync
@Configuration
@EnableConfigurationProperties({ThreadPoolConfigProperties.class})
public class ThreadPoolConfig {
@Bean("threadPool01")
public ThreadPoolExecutor threadPool01(ThreadPoolConfigProperties threadPoolConfigProperties) {
RejectedExecutionHandler rejectedExecutionHandler;
switch (threadPoolConfigProperties.getPolicy()) {
case "AbortPolicy":
rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
break;
case "DiscardPolicy":
rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy();
break;
case "DiscardOldestPolicy":
rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
break;
default:
rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
}
// 创建线程池
return new ThreadPoolExecutor(
threadPoolConfigProperties.getCorePoolSize(),
threadPoolConfigProperties.getMaximumPoolSize(),
threadPoolConfigProperties.getKeepAliveTime(),
TimeUnit.MINUTES,
new LinkedBlockingDeque<>(threadPoolConfigProperties.getBlockQueueSize()),
Executors.defaultThreadFactory(),
rejectedExecutionHandler
);
}
@Bean("threadPool02")
public ThreadPoolExecutor threadPool02(ThreadPoolConfigProperties threadPoolConfigProperties) {
RejectedExecutionHandler rejectedExecutionHandler;
switch (threadPoolConfigProperties.getPolicy()) {
case "AbortPolicy":
rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
break;
case "DiscardPolicy":
rejectedExecutionHandler = new ThreadPoolExecutor.DiscardPolicy();
break;
case "DiscardOldestPolicy":
rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
break;
default:
rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
}
// 创建线程池
return new ThreadPoolExecutor(
threadPoolConfigProperties.getCorePoolSize(),
threadPoolConfigProperties.getMaximumPoolSize(),
threadPoolConfigProperties.getKeepAliveTime(),
TimeUnit.MINUTES,
new LinkedBlockingDeque<>(threadPoolConfigProperties.getBlockQueueSize()),
Executors.defaultThreadFactory(),
rejectedExecutionHandler
);
}
}
测试
当测试项目启动后,打上断点,我们可以看到当前已经读取到所有的线程池,便于我们之后动态的修改线程数量。


动态线程池数据上报
我们需要构造如下几个方法
- 获取当前所有的线程池信息
- 根据线程池名称获取指定线程池信息
- 根据线程池配置更新线程池信息
动态线程池服务实现类
/**
* 动态线程池服务实现类
*
* @author flycode
*/
public class DynamicThreadPoolServiceImpl implements IDynamicThreadPoolService {
private final Logger logger = LoggerFactory.getLogger(DynamicThreadPoolServiceImpl.class);
private final Map<String, ThreadPoolExecutor> threadPoolExecutorMap;
private final String applicationName;
public DynamicThreadPoolServiceImpl(String applicationName, Map<String, ThreadPoolExecutor> threadPoolExecutorMap) {
this.threadPoolExecutorMap = threadPoolExecutorMap;
this.applicationName = applicationName;
}
@Override
public List<ThreadPoolConfigEntity> listAllThreadPool() {
Set<String> keys = threadPoolExecutorMap.keySet();
List<ThreadPoolConfigEntity> threadPoolConfigList = new ArrayList<>(keys.size());
keys.forEach(threadPoolName -> {
ThreadPoolConfigEntity threadPoolConfigEntity = queryThreadPoolBythreadPoolName(threadPoolName);
threadPoolConfigList.add(threadPoolConfigEntity);
});
return threadPoolConfigList;
}
@Override
public ThreadPoolConfigEntity queryThreadPoolBythreadPoolName(String threadPoolName) {
ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(threadPoolName);
if (threadPoolExecutor == null) {
return new ThreadPoolConfigEntity(applicationName, threadPoolName);
}
ThreadPoolConfigEntity threadPoolConfigEntity = new ThreadPoolConfigEntity(applicationName, threadPoolName);
threadPoolConfigEntity.setCorePoolSize(threadPoolExecutor.getCorePoolSize());
threadPoolConfigEntity.setMaximumPoolSize(threadPoolExecutor.getMaximumPoolSize());
threadPoolConfigEntity.setActiveCount(threadPoolExecutor.getActiveCount());
threadPoolConfigEntity.setPoolSize(threadPoolExecutor.getPoolSize());
threadPoolConfigEntity.setQueueType(threadPoolExecutor.getQueue().getClass().getSimpleName());
threadPoolConfigEntity.setQueueSize(threadPoolExecutor.getQueue().size());
threadPoolConfigEntity.setRemainCapacity(threadPoolExecutor.getQueue().remainingCapacity());
if (logger.isDebugEnabled()) {
logger.debug("动态线程池,配置信息如下,应用名:{},线程名:{},线程配置{}", applicationName, threadPoolName, JSON.toJSONString(threadPoolConfigEntity));
}
return threadPoolConfigEntity;
}
@Override
public void updateThreadPool(ThreadPoolConfigEntity threadPoolConfigEntity) {
if (null == threadPoolConfigEntity || !applicationName.contains(threadPoolConfigEntity.getApplicationName())) {
return;
}
ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(threadPoolConfigEntity.getThreadPoolName());
if (null == threadPoolExecutor) {
return;
}
// 一定需要先设置最大线程数,再设置核心线程
threadPoolExecutor.setMaximumPoolSize(threadPoolConfigEntity.getMaximumPoolSize());
threadPoolExecutor.setCorePoolSize(threadPoolConfigEntity.getCorePoolSize());
}
}
动态线程池注册中心
我们可以使用Redis等来作为注册中心,需要提供两个方法:
- 将当前所有的线程池信息写入Redis
- 将单个线程池写入Redis
public class RedisRegisterService implements IRegister {
private final RedissonClient redissonClient;
public RedisRegisterService(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
@Override
public void reportThreadPool(List<ThreadPoolConfigEntity> threadPoolConfigEntities) {
RList<ThreadPoolConfigEntity> list = redissonClient.getList(RegistryEnumVO.THREAD_POOL_CONFIG_LIST_KEY.getKey());
list.delete();
list.addAll(threadPoolConfigEntities);
}
@Override
public void reportThreadPoolConfigParameter(ThreadPoolConfigEntity threadPoolConfigEntity) {
String cacheKey = RegistryEnumVO.THREAD_POOL_CONFIG_LIST_KEY.getKey() + "_" + threadPoolConfigEntity.getApplicationName() + "_" + threadPoolConfigEntity.getThreadPoolName();
RBucket<Object> bucket = redissonClient.getBucket(cacheKey);
bucket.set(threadPoolConfigEntity, Duration.ofDays(30));
}
}
动态线程池定时任务
可以定时获取线程池信息,写入Redis缓存中。
public class ThreadPoolDataReportJob {
private final Logger logger = LoggerFactory.getLogger(ThreadPoolDataReportJob.class);
private final DynamicThreadPoolServiceImpl dynamicThreadPoolService;
private final IRegister redisRegister;
public ThreadPoolDataReportJob(DynamicThreadPoolServiceImpl dynamicThreadPoolService, IRegister redisRegister) {
this.dynamicThreadPoolService = dynamicThreadPoolService;
this.redisRegister = redisRegister;
}
@Scheduled(cron = "0 */20 * * * *")
public void report() {
List<ThreadPoolConfigEntity> threadPoolConfigEntities = dynamicThreadPoolService.listAllThreadPool();
redisRegister.reportThreadPool(threadPoolConfigEntities);
logger.info("动态配置,线程池参数{}", JSON.toJSONString(threadPoolConfigEntities));
threadPoolConfigEntities.forEach(threadPoolConfigEntity -> {
redisRegister.reportThreadPoolConfigParameter(threadPoolConfigEntity);
logger.info("动态配置,线程池参数{}", JSON.toJSONString(threadPoolConfigEntity));
});
}
}
动态配置入口
我们需要将涉及到的服务都写入当前的配置中,当用户引入后使用,会自动的执行这些服务
- redis服务配置
- 线程池初始化
- 线程池注册
- 定时任务
/**
* 动态配置入口
* @author flycode
*/
@Configuration
@EnableScheduling
public class DynamicThreadPoolAutoConfig {
private final Logger logger = LoggerFactory.getLogger(DynamicThreadPoolAutoConfig.class);
@Bean("redissonClient")
public RedissonClient redissonClient(DynamicThreadPoolAutoProperties dynamicThreadPoolAutoProperties) {
Config config = new Config();
// 设置编码
config.setCodec(JsonJacksonCodec.INSTANCE);
config.useSingleServer()
.setAddress("redis://" + dynamicThreadPoolAutoProperties.getHost() + ":" + dynamicThreadPoolAutoProperties.getPort())
.setConnectionPoolSize(dynamicThreadPoolAutoProperties.getPoolSize())
.setPassword(dynamicThreadPoolAutoProperties.getPassword())
.setConnectionMinimumIdleSize(dynamicThreadPoolAutoProperties.getMinIdleSize())
.setConnectTimeout(dynamicThreadPoolAutoProperties.getConnectTimeout())
.setRetryAttempts(dynamicThreadPoolAutoProperties.getRetryAttempts())
.setRetryInterval(dynamicThreadPoolAutoProperties.getRetryInterval())
.setPingConnectionInterval(dynamicThreadPoolAutoProperties.getPingInterval())
.setKeepAlive(dynamicThreadPoolAutoProperties.isKeepAlive())
;
RedissonClient redissonClient = Redisson.create(config);
logger.info("动态线程池,注册器(Redis)初始化完成,{}-{}-{}", dynamicThreadPoolAutoProperties.getHost(), dynamicThreadPoolAutoProperties.getPort(), dynamicThreadPoolAutoProperties.getPoolSize());
return redissonClient;
}
/**
* 动态线程池初始化
*
* @param applicationContext
* @param threadPoolExecutorMap
* @return
*/
@Bean("dynamicThreadPoolService")
public DynamicThreadPoolServiceImpl dynamicThreadPoolService(ApplicationContext applicationContext, Map<String, ThreadPoolExecutor> threadPoolExecutorMap) {
String applicationName = applicationContext.getEnvironment().getProperty("spring.application.name");
if (StringUtils.isBlank(applicationName)) {
applicationName = "default";
logger.error("动态线程池启动,SpringBoot未配置名称");
}
logger.info("线程池信息{}", JSON.toJSONString(threadPoolExecutorMap));
return new DynamicThreadPoolServiceImpl(applicationName, threadPoolExecutorMap);
}
/**
* 引入Redis注册中心
*
* @param redissonClient
* @return
*/
@Bean
public IRegister redisRegister(RedissonClient redissonClient) {
return new RedisRegisterService(redissonClient);
}
/**
* 定时任务
*
* @param dynamicThreadPoolService
* @param redisRegister
* @return
*/
@Bean
public ThreadPoolDataReportJob threadPoolDataReportJob(DynamicThreadPoolServiceImpl dynamicThreadPoolService, IRegister redisRegister) {
return new ThreadPoolDataReportJob(dynamicThreadPoolService, redisRegister);
}
}
测试
通过测试,我们可以看到已经获取了所有的线程池信息,并且将信息写入了redis


动态线程池发布订阅
我们需要新建一个监听器,,用来实现数据上报给注册中心
Redis监听器
使用MessageListener来监听Redis数据变化
public class ThreadPoolListener implements MessageListener<ThreadPoolConfigEntity> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final DynamicThreadPoolServiceImpl dynamicThreadPoolService;
private final IRegister register;
public ThreadPoolListener(DynamicThreadPoolServiceImpl dynamicThreadPoolService, IRegister register) {
this.dynamicThreadPoolService = dynamicThreadPoolService;
this.register = register;
}
@Override
public void onMessage(CharSequence charSequence, ThreadPoolConfigEntity threadPoolConfigEntity) {
logger.info("动态线程池:线程名称={},核心线程数={},最大线程数={}",threadPoolConfigEntity.getThreadPoolName(),threadPoolConfigEntity.getCorePoolSize(),threadPoolConfigEntity.getMaximumPoolSize());
// 更新数据
dynamicThreadPoolService.updateThreadPool(threadPoolConfigEntity);
// 上报列表数据
List<ThreadPoolConfigEntity> threadPoolConfigEntities = dynamicThreadPoolService.listAllThreadPool();
register.reportThreadPool(threadPoolConfigEntities);
// 上报单个数据
ThreadPoolConfigEntity threadPoolConfig = dynamicThreadPoolService.queryThreadPoolBythreadPoolName(threadPoolConfigEntity.getThreadPoolName());
register.reportThreadPoolConfigParameter(threadPoolConfig);
logger.info("动态线程池,上报线程池配置{}", JSON.toJSONString(threadPoolConfig));
}
}
添加监听器bean
在DynamicThreadPoolAutoConfig里面加入监听器,使用RTopic实现发布订阅功能
@Bean
public ThreadPoolListener threadPoolListener(DynamicThreadPoolServiceImpl dynamicThreadPoolService, IRegister redisRegister) {
return new ThreadPoolListener(dynamicThreadPoolService, redisRegister);
}
@Bean(name = "dynamicThreadPoolRedisTopic")
public RTopic threadPoolListener(RedissonClient redissonClient, ThreadPoolListener threadPoolListener){
RTopic topic = redissonClient.getTopic(RegistryEnumVO.DYNAMIC_THREAD_POOL_REDIS_TOPIC.getKey() + "-" + applicationName);
topic.addListener(ThreadPoolConfigEntity.class,threadPoolListener);
return topic;
}
测试
默认线程池参数如下
server:
port: 8080
thread:
pool:
executor:
config:
core-pool-size: 20
maximum-pool-size: 50
keep-alive-time: 10
block-queue-size: 5000
policy: AbortPolicy
spring:
application:
name: thread-pool-test
# 动态线程池管理配置
dynamic:
thread:
config:
pool:
# 状态;true = 开启、false 关闭
enable: true
# redis host
host: localhost
# redis port
port: 6379
我们之前已经定义了两个线程池,我们修改threadPool01线程池的参数
@SpringBootTest
@Slf4j
@ActiveProfiles("local")
public class ApiTest {
@Resource
private RTopic dynamicThreadPoolRedisTopic;
@Test
public void test_dynamic_thread_pool_RedisTopic() throws InterruptedException {
ThreadPoolConfigEntity threadPoolConfigEntity =new ThreadPoolConfigEntity();
threadPoolConfigEntity.setApplicationName("thread-pool-test");
threadPoolConfigEntity.setThreadPoolName("threadPool01");
threadPoolConfigEntity.setMaximumPoolSize(100);
threadPoolConfigEntity.setCorePoolSize(100);
dynamicThreadPoolRedisTopic.publish(threadPoolConfigEntity);
new CountDownLatch(1).await();
}
}

可以看到这个线程核心线程数变成了100,最大线程数变成了100,实现了发布订阅功能,主动推动更新数据
贡献者
flycodeu
版权所有
版权归属:flycodeu