线程池使用InheritableThreadLocal出现数据脏乱分析和解决方案

线程池使用InheritableThreadLocal出现数据脏乱分析和解决方案

背景

  在测试环境上遇到一个诡异的问题,部分业务逻辑会记录用户ID到数据库,但记录的数据会串,比如当前用户的操作记录会被其他用户覆盖, 而且这个现象是每次重启后一小段时间内就正常

问题

  在线程池内部使用了InheritableThreadLocal存放用户登录信息,再获取用户信息后,由于没有及时remove,导致下次请求还是得到旧的用户数据

 

排查过程

  1.通过临时打印日志,确认整个链路中用户ID是否一致

  2.确认写日志方法是否有被修改,最后确认是写日志这块开了线程池后导致问题

 

思考

1.为什么在之前测试过程没有复现

  之前依赖的日志记录二方jar包进行过一次升级,原先的版本在记录日志没有开启线程池,后面升级版本后,加入了线程池

 

2.分析为什么用了InheritableThreadLocal还会出现数据错乱(覆盖)

   ThreadLocalThreadLocalMap 中是以一个弱引用身份被Entry中的Key引用的,因此如果ThreadLocal没有外部强引用来引用它,那么ThreadLocal会在下次JVM垃圾收集时被回收。这个时候就会出现Entry中Key已经被回收,出现一个null Key的情况,外部读取ThreadLocalMap中的元素是无法通过null Key来找到Value的。因此如果当前线程的生命周期很长(在本篇案例中,由于线程池的核心线程没有被回收,一直存在),那么其内部的ThreadLocalMap对象也一直生存下来,这些null key就存在一条强引用链的关系一直存在:Thread --> ThreadLocalMap-->Entry-->Value,这条强引用链会导致Entry不会回收,Value也不会回收;所以出现数据错乱的原因在于核心线程一直没有被回收,然后 InheritableThreadLocal 也未及时remove,导致核心线程一直存放着老

 

3.本地求证  

  为了能复现测试环境问题,我写了一个demo在本地去复现这个问题,开启了一个线程器并设置了2个核心线程,调用4次

 

 

 

 

根据上面的运行日志结果,可以发现第二次的设值并没有真正改变

 

分析线程池的核心线程是如何不被回收的,源码跟踪如下

1.跟进线程池execute主方法入口

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))  //未超过核心线程数,则新增 Worker 对象,true表示核心线程
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

由上可知核心线程通过addWorker方法创建

 

2.跟进addWorker方法

  private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start(); //执行核心线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

 由上可知核心线程在addWorker方法内部通过 t.start()执行

 

3.跟进Work类里面的run方法

 由上可知通过While去不断轮询getTask方法来保证核心线程被创建后一直处于阻塞状态,所以可知核心线程数创建后会通过阻塞来保证不被回收

 

总结

  1.每次用完后ThreadLocal后及时remove

  2.尽量不要在线程池里使用 ThreadLocal,很多时候开发关注的点比较多,疏忽的过程也在所难免,例如本次解决方案是我先把用户ID在线程池调用前查询出来,在传进去

原文地址:https://www.cnblogs.com/zdd-java/archive/2022/05/17/16267188.html