1.将任务提交给异步redis
Map<String, CompletableFuture<Boolean>> futureMap = new ConcurrentHashMap<>();
List<String> pwss = CipherUtils.generatePasswordsBulk(emailSet.size());
AtomicInteger index = new AtomicInteger(0);
// 等待所有任务提交完成
CompletableFuture.allOf(bathSet.stream()
.map(bath -> {
int passwordIndex = index.getAndIncrement();
LoginParam param = buildLoginParam(bath , pwss.get(passwordIndex));
return redissonUtils.executeTaskWithLockManagementAsync(
BEACHL, () -> {
// 执行业务方法
CompletableFuture<Boolean> registerFuture =
this.batch(param);
futureMap.put(email, registerFuture);
}, registrationTaskExecutor, email # redis key
);
}).toArray(CompletableFuture[]::new)).join(); # 等待redis 提交完成 即futureMap组装完成
// 等待业务逻辑完成
CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])).join();
// 执行后续逻辑操作
2.redis内部实现
public CompletableFuture<Void> executeTaskWithLockManagementAsync(RedissonKeyEnum em, Runnable task, Executor executor, Object... objects) {
String key = String.format(em.getPrefix(), objects);
RLock lock = redisson.getLock(key);
return lock.lockAsync(12, TimeUnit.SECONDS) # 锁等待时间
.thenComposeAsync(lockAcquired -> CompletableFuture.runAsync(task, executor), executor)
.whenComplete((res, ex) -> lock.unlockAsync().exceptionally(e -> null))
.toCompletableFuture();
}
3.业务代码
public CompletableFuture<Boolean> batch(LoginParam param) {
return CompletableFuture.supplyAsync(() -> {
try {
// 业务代码
return user != null;
} catch (Exception e) {
log.error("batch e:{}", e.getMessage(), e);
return false;
}
}, registrationTaskExecutor);
4.线程池创建
@Bean(name = "registrationTaskExecutor")
public Executor registrationTaskExecutor() {
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(corePoolSize * 2);
# 对于响应时间无要求的话,可以将任务队列增大,放入队列后缓慢消费; 如对响应时间有硬性要求,则队列长度不宜过大,过大将导致临时线程无法创建
# 流程:核心线程 -> 占满 -> 放入队列 -> 队列占满 -> 创建临时线程 -> 达到最大线程数 -> 启动淘汰策略
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("registration-task-");
executor.initialize();
return executor;
}
5.时间过长排查
查看任务队列是否占满,如未占满则是临时线程未创建,放入队列缓慢消费
查看连接池配置,是否有线程在等待连接
如使用druid连接池
# http://你的应用IP:端口/druid/sql.html
# 确保已开启监控
dynamic:
druid:
stat-view-servlet:
enabled: true
url-pattern: /druid/*
抓取线程堆栈
当卡顿时立即执行 jstack <PID> > thread_dump.log
pid 通过:jps -l 查询 例如:
PS C:\Users\lenovo> jps -l
35920 sun.tools.jps.Jps
4960 com.intellij.idea.Main
5472 org.jetbrains.jps.cmdline.Launcher
16644 org.jetbrains.idea.maven.server.RemoteMavenServer36
4692 finalshell.jar
38984 cn.iocoder.yudao.server.YudaoServerApplication
4616 com.intellij.database.remote.RemoteJdbcServer
4920 com.intellij.idea.Main
# 查看 ServerApplication 后缀
搜索关键线程名(如
registration-task-37
)
"registration-task-37" #123 prio=5 os_prio=0 tid=0x00007f8d3c123000 nid=0x8a1f waiting on condition [...]
若状态为
BLOCKED
,说明线程在等待锁若状态为
WAITING
,可能卡在I/O操作(如数据库连接池获取)
检查线程堆栈调用链:
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
如果看到类似堆栈,说明线程在等待网络I/O(如调用OSS、数据库、第三方API)
排查jvm相关