1.将任务提交给异步redis

Map<String, CompletableFuture<Boolean>> futureMap = new ConcurrentHashMap<>();
List<String> pwss = CipherUtils.generatePasswordsBulk(emailSet.size());
AtomicInteger index = new AtomicInteger(0);
  // 等待所有任务提交完成
  CompletableFuture.allOf(bathSet.stream()
          .map(bath -> {
              int passwordIndex = index.getAndIncrement();
              LoginParam param = buildLoginParam(bath , pwss.get(passwordIndex));
              return redissonUtils.executeTaskWithLockManagementAsync(
                      BEACHL, () -> {
                          // 执行业务方法
                          CompletableFuture<Boolean> registerFuture =
                                  this.batch(param);
                          futureMap.put(email, registerFuture);
                      }, registrationTaskExecutor, email # redis key
              );
          }).toArray(CompletableFuture[]::new)).join(); # 等待redis 提交完成 即futureMap组装完成
  // 等待业务逻辑完成
  CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])).join();
// 执行后续逻辑操作

2.redis内部实现

public CompletableFuture<Void> executeTaskWithLockManagementAsync(RedissonKeyEnum em, Runnable task, Executor executor, Object... objects) {
        String key = String.format(em.getPrefix(), objects);
        RLock lock = redisson.getLock(key);
        return lock.lockAsync(12, TimeUnit.SECONDS) # 锁等待时间
                .thenComposeAsync(lockAcquired -> CompletableFuture.runAsync(task, executor), executor)
                .whenComplete((res, ex) -> lock.unlockAsync().exceptionally(e -> null))
                .toCompletableFuture();
    }

3.业务代码

public CompletableFuture<Boolean> batch(LoginParam param) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // 业务代码
                return user != null;
            } catch (Exception e) {
                log.error("batch e:{}", e.getMessage(), e);
                return false;
            }
        }, registrationTaskExecutor);

4.线程池创建

@Bean(name = "registrationTaskExecutor")
    public Executor registrationTaskExecutor() {
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(corePoolSize * 2);
        # 对于响应时间无要求的话,可以将任务队列增大,放入队列后缓慢消费; 如对响应时间有硬性要求,则队列长度不宜过大,过大将导致临时线程无法创建
        # 流程:核心线程 -> 占满 -> 放入队列 -> 队列占满 -> 创建临时线程 -> 达到最大线程数 -> 启动淘汰策略
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("registration-task-");
        executor.initialize();
        return executor;
    }

5.时间过长排查

  • 查看任务队列是否占满,如未占满则是临时线程未创建,放入队列缓慢消费

  • 查看连接池配置,是否有线程在等待连接

  • 如使用druid连接池

# http://你的应用IP:端口/druid/sql.html

# 确保已开启监控
dynamic:
  druid:
    stat-view-servlet:
      enabled: true
      url-pattern: /druid/*

列名

示例值

说明

N

6

SQL 编号(按执行次数排序)

SQL▼

INSERT INTO...

具体的 SQL 语句(可能被截断)

执行数

69

该 SQL 累计执行次数

执行时间

1,033

该 SQL 所有执行的总耗时(单位:ms),即所有69次插入共耗时1033ms

最慢

18

该 SQL 单次执行的最长时间(单位:ms),即某次插入耗时18ms

事务执行

69

该 SQL 在事务中执行的次数(若未开启事务则为0)

错误数

执行该 SQL 发生错误的次数

更新行数

69

该 SQL 累计影响的行数(对于INSERT,即插入69条数据)

读取行数

69

该 SQL 累计读取的行数(对于INSERT,此值通常为0,但示例可能统计异常)

执行中

7

当前正在执行该 SQL 的线程数

最大并发

[0,0,69,...]

不同耗时区间的执行次数分布,例如: [0-1ms, 1-10ms, 10-100ms, ...] 的统计

  • 抓取线程堆栈

  1. 当卡顿时立即执行 jstack <PID> > thread_dump.log

  2. pid 通过:jps -l 查询 例如:

PS C:\Users\lenovo> jps -l
35920 sun.tools.jps.Jps
4960 com.intellij.idea.Main
5472 org.jetbrains.jps.cmdline.Launcher
16644 org.jetbrains.idea.maven.server.RemoteMavenServer36
4692 finalshell.jar
38984 cn.iocoder.yudao.server.YudaoServerApplication
4616 com.intellij.database.remote.RemoteJdbcServer
4920 com.intellij.idea.Main

# 查看 ServerApplication 后缀
  • 搜索关键线程名(如 registration-task-37

"registration-task-37" #123 prio=5 os_prio=0 tid=0x00007f8d3c123000 nid=0x8a1f waiting on condition [...]
  1. 若状态为 BLOCKED,说明线程在等待锁

  2. 若状态为 WAITING,可能卡在I/O操作(如数据库连接池获取)

  • 检查线程堆栈调用链

at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)

如果看到类似堆栈,说明线程在等待网络I/O(如调用OSS、数据库、第三方API)

  • 排查jvm相关