源码分析之ThreadLocal


声明:本文转载自https://my.oschina.net/qixiaobo025/blog/1549815,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

前言

Java项目中通常为了并发数据准确性经常使用Lock或者synchronized来作为并发的手段。

也就是说作为共享资源必然需要通过同步等手段来实现。那么转换一下思路,

我们确实在每个地方都需要用到共享资源么?

如果我们所有的变量都是私有的 那自然不需要同步就是thread-safe的

ThreadLocal就是这样应运而生。人如其名就是线程私有对象。

实现

ThreadLocal

/* <p>Each thread holds an implicit reference to its copy of a thread-local  * variable as long as the thread is alive and the <tt>ThreadLocal</tt>  * instance is accessible; after a thread goes away, all of its copies of  * thread-local instances are subject to garbage collection (unless other  * references to these copies exist).  *  * @author  Josh Bloch and Doug Lea  * @since   1.2  */ public class ThreadLocal<T> {     /**      * ThreadLocals rely on per-thread linear-probe hash maps attached      * to each thread (Thread.threadLocals and      * inheritableThreadLocals).  The ThreadLocal objects act as keys,      * searched via threadLocalHashCode.  This is a custom hash code      * (useful only within ThreadLocalMaps) that eliminates collisions      * in the common case where consecutively constructed ThreadLocals      * are used by the same threads, while remaining well-behaved in      * less common cases.      */     private final int threadLocalHashCode = nextHashCode();       /**      * The next hash code to be given out. Updated atomically. Starts at      * zero.      */     private static AtomicInteger nextHashCode =         new AtomicInteger();       /**      * The difference between successively generated hash codes - turns      * implicit sequential thread-local IDs into near-optimally spread      * multiplicative hash values for power-of-two-sized tables.      */     private static final int HASH_INCREMENT = 0x61c88647;       /**      * Returns the next hash code.      */     private static int nextHashCode() {         return nextHashCode.getAndAdd(HASH_INCREMENT);     }       /**      * Returns the current thread's "initial value" for this      * thread-local variable.  This method will be invoked the first      * time a thread accesses the variable with the {@link #get}      * method, unless the thread previously invoked the {@link #set}      * method, in which case the <tt>initialValue</tt> method will not      * be invoked for the thread.  Normally, this method is invoked at      * most once per thread, but it may be invoked again in case of      * subsequent invocations of {@link #remove} followed by {@link #get}.      *      * <p>This implementation simply returns <tt>null</tt>; if the      * programmer desires thread-local variables to have an initial      * value other than <tt>null</tt>, <tt>ThreadLocal</tt> must be      * subclassed, and this method overridden.  Typically, an      * anonymous inner class will be used.      *      * @return the initial value for this thread-local      */     protected T initialValue() {         return null;     }       /**      * Creates a thread local variable.      */     public ThreadLocal() {     }       /**      * Returns the value in the current thread's copy of this      * thread-local variable.  If the variable has no value for the      * current thread, it is first initialized to the value returned      * by an invocation of the {@link #initialValue} method.      *      * @return the current thread's value of this thread-local      */     public T get() {         Thread t = Thread.currentThread();         ThreadLocalMap map = getMap(t);         if (map != null) {             ThreadLocalMap.Entry e = map.getEntry(this);             if (e != null)                 return (T)e.value;         }         return setInitialValue();     }       /**      * Variant of set() to establish initialValue. Used instead      * of set() in case user has overridden the set() method.      *      * @return the initial value      */     private T setInitialValue() {         T value = initialValue();         Thread t = Thread.currentThread();         ThreadLocalMap map = getMap(t);         if (map != null)             map.set(this, value);         else             createMap(t, value);         return value;     }       /**      * Sets the current thread's copy of this thread-local variable      * to the specified value.  Most subclasses will have no need to      * override this method, relying solely on the {@link #initialValue}      * method to set the values of thread-locals.      *      * @param value the value to be stored in the current thread's copy of      *        this thread-local.      */     public void set(T value) {         Thread t = Thread.currentThread();         ThreadLocalMap map = getMap(t);         if (map != null)             map.set(this, value);         else             createMap(t, value);     }       /**      * Removes the current thread's value for this thread-local      * variable.  If this thread-local variable is subsequently      * {@linkplain #get read} by the current thread, its value will be      * reinitialized by invoking its {@link #initialValue} method,      * unless its value is {@linkplain #set set} by the current thread      * in the interim.  This may result in multiple invocations of the      * <tt>initialValue</tt> method in the current thread.      *      * @since 1.5      */      public void remove() {          ThreadLocalMap m = getMap(Thread.currentThread());          if (m != null)              m.remove(this);      }       /**      * Get the map associated with a ThreadLocal. Overridden in      * InheritableThreadLocal.      *      * @param  t the current thread      * @return the map      */     ThreadLocalMap getMap(Thread t) {         return t.threadLocals;     }       /**      * Create the map associated with a ThreadLocal. Overridden in      * InheritableThreadLocal.      *      * @param t the current thread      * @param firstValue value for the initial entry of the map      * @param map the map to store.      */     void createMap(Thread t, T firstValue) {         t.threadLocals = new ThreadLocalMap(this, firstValue);     } }

从上述代码可以看到ThreadLocal是存放在当前线程中

Thread t = Thread.currentThread();

通过上述代码获取到当前线程。而线程存在一个字段threadLocals这个

这个存放了一个ThreadLocalMap(名字是map但是没有实现map的接口 实际确实也是干的map的事情)

其实可以粗略的认为每个线程存在一个HashMap key是ThreadLocal变量 value为对应泛型的值

那么获取对应数据只要使用get即可(也提供了初始化变量功能)

放入数据直接调用set即可

ThreadLocalMap

/**  * ThreadLocalMap is a customized hash map suitable only for  * maintaining thread local values. No operations are exported  * outside of the ThreadLocal class. The class is package private to  * allow declaration of fields in class Thread.  To help deal with  * very large and long-lived usages, the hash table entries use  * WeakReferences for keys. However, since reference queues are not  * used, stale entries are guaranteed to be removed only when  * the table starts running out of space.  */ static class ThreadLocalMap {       /**      * The entries in this hash map extend WeakReference, using      * its main ref field as the key (which is always a      * ThreadLocal object).  Note that null keys (i.e. entry.get()      * == null) mean that the key is no longer referenced, so the      * entry can be expunged from table.  Such entries are referred to      * as "stale entries" in the code that follows.      */     static class Entry extends WeakReference<ThreadLocal> {         /** The value associated with this ThreadLocal. */         Object value;           Entry(ThreadLocal k, Object v) {             super(k);             value = v;         }     }       /**      * The initial capacity -- MUST be a power of two.      */     private static final int INITIAL_CAPACITY = 16;       /**      * The table, resized as necessary.      * table.length MUST always be a power of two.      */     private Entry[] table;       /**      * The number of entries in the table.      */     private int size = 0;       /**      * The next size value at which to resize.      */     private int threshold; // Default to 0       /**      * Set the resize threshold to maintain at worst a 2/3 load factor.      */     private void setThreshold(int len) {         threshold = len * 2 / 3;     }       /**      * Increment i modulo len.      */     private static int nextIndex(int i, int len) {         return ((i + 1 < len) ? i + 1 : 0);     }       /**      * Decrement i modulo len.      */     private static int prevIndex(int i, int len) {         return ((i - 1 >= 0) ? i - 1 : len - 1);     }       /**      * Construct a new map initially containing (firstKey, firstValue).      * ThreadLocalMaps are constructed lazily, so we only create      * one when we have at least one entry to put in it.      */     ThreadLocalMap(ThreadLocal firstKey, Object firstValue) {         table = new Entry[INITIAL_CAPACITY];         int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);         table[i] = new Entry(firstKey, firstValue);         size = 1;         setThreshold(INITIAL_CAPACITY);     }       /**      * Construct a new map including all Inheritable ThreadLocals      * from given parent map. Called only by createInheritedMap.      *      * @param parentMap the map associated with parent thread.      */     private ThreadLocalMap(ThreadLocalMap parentMap) {         Entry[] parentTable = parentMap.table;         int len = parentTable.length;         setThreshold(len);         table = new Entry[len];           for (int j = 0; j < len; j++) {             Entry e = parentTable[j];             if (e != null) {                 ThreadLocal key = e.get();                 if (key != null) {                     Object value = key.childValue(e.value);                     Entry c = new Entry(key, value);                     int h = key.threadLocalHashCode & (len - 1);                     while (table[h] != null)                         h = nextIndex(h, len);                     table[h] = c;                     size++;                 }             }         }     } }

看一下ThreadLocalMap 有一个很关键的静态类Entry

private static class Entry<K,V> extends WeakReference<Object> implements Map.Entry<K,V> {     V value;     int hash;     Entry<K,V> next;       /**      * Creates new entry.      */     Entry(Object key, V value,           ReferenceQueue<Object> queue,           int hash, Entry<K,V> next) {         super(key, queue);         this.value = value;         this.hash  = hash;         this.next  = next;     }       @SuppressWarnings("unchecked")     public K getKey() {         return (K) WeakHashMap.unmaskNull(get());     }       public V getValue() {         return value;     }       public V setValue(V newValue) {         V oldValue = value;         value = newValue;         return oldValue;     }       public boolean equals(Object o) {         if (!(o instanceof Map.Entry))             return false;         Map.Entry<?,?> e = (Map.Entry<?,?>)o;         K k1 = getKey();         Object k2 = e.getKey();         if (k1 == k2 || (k1 != null && k1.equals(k2))) {             V v1 = getValue();             Object v2 = e.getValue();             if (v1 == v2 || (v1 != null && v1.equals(v2)))                 return true;         }         return false;     }       public int hashCode() {         K k = getKey();         V v = getValue();         return ((k==null ? 0 : k.hashCode()) ^                 (v==null ? 0 : v.hashCode()));     }       public String toString() {         return getKey() + "=" + getValue();     } }

WeakReference可能大部分开发并没有注意过(Android开发者可能经常会使用)

 Java从1.2版本开始引入了4种引用,这4种引用的级别由高到低依次为:

   强引用  >  软引用  >  弱引用  >  虚引用

⑴强引用(StrongReference)
    强引用是使用最普遍的引用。如果一个对象具有强引用,那垃圾回收器绝不会回收它。当内存空间不足,Java虚拟机宁愿抛出OutOfMemoryError错误,使程序异常终止,也不会靠随意回收具有强引用的对象来解决内存不足的问题。

⑵软引用(SoftReference)

    如果一个对象只具有软引用,则内存空间足够,垃圾回收器就不会回收它;如果内存空间不足了,就会回收这些对象的内存。只要垃圾回收器没有回收它,该对象就可以被程序使用。软引用可用来实现内存敏感的高速缓存。

    软引用可以和一个引用队列(ReferenceQueue)联合使用,如果软引用所引用的对象被垃圾回收器回收,Java虚拟机就会把这个软引用加入到与之关联的引用队列中。

⑶弱引用(WeakReference)

    弱引用与软引用的区别在于:只具有弱引用的对象拥有更短暂的生命周期。在垃圾回收器线程扫描它所管辖的内存区域的过程中,一旦发现了只具有弱引用的对象,不管当前内存空间足够与否,都会回收它的内存。不过,由于垃圾回收器是一个优先级很低的线程,因此不一定会很快发现那些只具有弱引用的对象。

    弱引用可以和一个引用队列(ReferenceQueue)联合使用,如果弱引用所引用的对象被垃圾回收,Java虚拟机就会把这个弱引用加入到与之关联的引用队列中。

⑷虚引用(PhantomReference)

    “虚引用”顾名思义,就是形同虚设,与其他几种引用都不同,虚引用并不会决定对象的生命周期。如果一个对象仅持有虚引用,那么它就和没有任何引用一样,在任何时候都可能被垃圾回收器回收。

    虚引用主要用来跟踪对象被垃圾回收器回收的活动。虚引用与软引用和弱引用的一个区别在于:虚引用必须和引用队列 (ReferenceQueue)联合使用。当垃圾回收器准备回收一个对象时,如果发现它还有虚引用,就会在回收对象的内存之前,把这个虚引用加入到与之 关联的引用队列中。

ThreadLocalMap中使用Entry来作为迭代元素,其中WeakReference修饰了ThreadLocal,那么当某个ThreadLocal变量没有强引用那么当GC扫描到就会回收该Entry,那么就无内存泄漏之虞了。

当然我们线程中一定会保留ThreadLocal对象的强引用,因此这边也不会回收。因此一个合理的使用每次线程池归还一定要调用remove(方便释放同时也不会对下一次borrow线程造成影响)

/**  * Set the value associated with key.  *  * @param key the thread local object  * @param value the value to be set  */ private void set(ThreadLocal key, Object value) {       // We don't use a fast path as with get() because it is at     // least as common to use set() to create new entries as     // it is to replace existing ones, in which case, a fast     // path would fail more often than not.       Entry[] tab = table;     int len = tab.length;     int i = key.threadLocalHashCode & (len-1);       for (Entry e = tab[i];          e != null;          e = tab[i = nextIndex(i, len)]) {         ThreadLocal k = e.get();           if (k == key) {             e.value = value;             return;         }           if (k == null) {             replaceStaleEntry(key, value, i);             return;         }     }       tab[i] = new Entry(key, value);     int sz = ++size;     if (!cleanSomeSlots(i, sz) && sz >= threshold)         rehash(); }   /**  * Re-pack and/or re-size the table. First scan the entire  * table removing stale entries. If this doesn't sufficiently  * shrink the size of the table, double the table size.  */ private void rehash() {     expungeStaleEntries();       // Use lower threshold for doubling to avoid hysteresis     if (size >= threshold - threshold / 4)         resize(); }   /**  * Double the capacity of the table.  */ private void resize() {     Entry[] oldTab = table;     int oldLen = oldTab.length;     int newLen = oldLen * 2;     Entry[] newTab = new Entry[newLen];     int count = 0;       for (int j = 0; j < oldLen; ++j) {         Entry e = oldTab[j];         if (e != null) {             ThreadLocal k = e.get();             if (k == null) {                 e.value = null; // Help the GC             } else {                 int h = k.threadLocalHashCode & (newLen - 1);                 while (newTab[h] != null)                     h = nextIndex(h, newLen);                 newTab[h] = e;                 count++;             }         }     }       setThreshold(newLen);     size = count;     table = newTab; }

这边数据的策略比较常见,当发生hash冲突直接将对应数据放入下一个不为空节点即可。当然涉及到HashMap(类似)免不了的就是扩容和rehash等操作

当然数据在删除的时候可能要将后面不为空的节点重新 计算需要挪动到指定的位置

/**  * Remove the entry for key.  */ private void remove(ThreadLocal key) {     Entry[] tab = table;     int len = tab.length;     int i = key.threadLocalHashCode & (len-1);     for (Entry e = tab[i];          e != null;          e = tab[i = nextIndex(i, len)]) {         if (e.get() == key) {             e.clear();             expungeStaleEntry(i);             return;         }     } } /**  * Expunge a stale entry by rehashing any possibly colliding entries  * lying between staleSlot and the next null slot.  This also expunges  * any other stale entries encountered before the trailing null.  See  * Knuth, Section 6.4  *  * @param staleSlot index of slot known to have null key  * @return the index of the next null slot after staleSlot  * (all between staleSlot and this slot will have been checked  * for expunging).  */ private int expungeStaleEntry(int staleSlot) {     Entry[] tab = table;     int len = tab.length;       // expunge entry at staleSlot     tab[staleSlot].value = null;     tab[staleSlot] = null;     size--;       // Rehash until we encounter null     Entry e;     int i;     for (i = nextIndex(staleSlot, len);          (e = tab[i]) != null;          i = nextIndex(i, len)) {         ThreadLocal k = e.get();         if (k == null) {             e.value = null;             tab[i] = null;             size--;         } else {             int h = k.threadLocalHashCode & (len - 1);             if (h != i) {                 tab[i] = null;                   // Unlike Knuth 6.4 Algorithm R, we must scan until                 // null because multiple entries could have been stale.                 while (tab[h] != null)                     h = nextIndex(h, len);                 tab[h] = e;             }         }     }     return i; }

当然为了线程互不干扰我们可以需要调用ThreadLocal.remove方法移除数据

/**  * Removes the current thread's value for this thread-local  * variable.  If this thread-local variable is subsequently  * {@linkplain #get read} by the current thread, its value will be  * reinitialized by invoking its {@link #initialValue} method,  * unless its value is {@linkplain #set set} by the current thread  * in the interim.  This may result in multiple invocations of the  * <tt>initialValue</tt> method in the current thread.  *  * @since 1.5  */  public void remove() {      ThreadLocalMap m = getMap(Thread.currentThread());      if (m != null)          m.remove(this);  }

InheritableThreadLocal

对于ThreadLocal各位可能会有些问题,比如将对应的数据封装到了线程中,但是后面调用比如异步任务之后就会发现对应的ThreadLocal变量取不出数据了。

这个场景很正常  比如在线程池起个线程发送消息等等

那么我们可以使用InheritableThreadLocal来实现,顾名思义,从这个线程中起的子线程将会可以继承对应变量。

/**  * This class extends <tt>ThreadLocal</tt> to provide inheritance of values  * from parent thread to child thread: when a child thread is created, the  * child receives initial values for all inheritable thread-local variables  * for which the parent has values.  Normally the child's values will be  * identical to the parent's; however, the child's value can be made an  * arbitrary function of the parent's by overriding the <tt>childValue</tt>  * method in this class.  *  * <p>Inheritable thread-local variables are used in preference to  * ordinary thread-local variables when the per-thread-attribute being  * maintained in the variable (e.g., User ID, Transaction ID) must be  * automatically transmitted to any child threads that are created.  *  * @author  Josh Bloch and Doug Lea  * @see     ThreadLocal  * @since   1.2  */   public class InheritableThreadLocal<T> extends ThreadLocal<T> {     /**      * Computes the child's initial value for this inheritable thread-local      * variable as a function of the parent's value at the time the child      * thread is created.  This method is called from within the parent      * thread before the child is started.      * <p>      * This method merely returns its input argument, and should be overridden      * if a different behavior is desired.      *      * @param parentValue the parent thread's value      * @return the child thread's initial value      */     protected T childValue(T parentValue) {         return parentValue;     }       /**      * Get the map associated with a ThreadLocal.      *      * @param t the current thread      */     ThreadLocalMap getMap(Thread t) {        return t.inheritableThreadLocals;     }       /**      * Create the map associated with a ThreadLocal.      *      * @param t the current thread      * @param firstValue value for the initial entry of the table.      * @param map the map to store.      */     void createMap(Thread t, T firstValue) {         t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);     } }

women看一下Thread的初始化

/**  * Initializes a Thread.  *  * @param g the Thread group  * @param target the object whose run() method gets called  * @param name the name of the new Thread  * @param stackSize the desired stack size for the new thread, or  *        zero to indicate that this parameter is to be ignored.  */ private void init(ThreadGroup g, Runnable target, String name,                   long stackSize) {     if (name == null) {         throw new NullPointerException("name cannot be null");     }       Thread parent = currentThread();     SecurityManager security = System.getSecurityManager();     if (g == null) {         /* Determine if it's an applet or not */           /* If there is a security manager, ask the security manager            what to do. */         if (security != null) {             g = security.getThreadGroup();         }           /* If the security doesn't have a strong opinion of the matter            use the parent thread group. */         if (g == null) {             g = parent.getThreadGroup();         }     }       /* checkAccess regardless of whether or not threadgroup is        explicitly passed in. */     g.checkAccess();       /*      * Do we have the required permissions?      */     if (security != null) {         if (isCCLOverridden(getClass())) {             security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);         }     }       g.addUnstarted();       this.group = g;     this.daemon = parent.isDaemon();     this.priority = parent.getPriority();     this.name = name.toCharArray();     if (security == null || isCCLOverridden(parent.getClass()))         this.contextClassLoader = parent.getContextClassLoader();     else         this.contextClassLoader = parent.contextClassLoader;     this.inheritedAccessControlContext = AccessController.getContext();     this.target = target;     setPriority(priority);     if (parent.inheritableThreadLocals != null)         this.inheritableThreadLocals =             ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);     /* Stash the specified stack size in case the VM cares */     this.stackSize = stackSize;       /* Set thread ID */     tid = nextThreadID(); }

很明显可以看到当parent的inheritableThreadLocals变量不为空将会传递到子线程中。

应用

通常会定义ThreadLocal变量为static final

比如我们会在定义

private static final ThreadLocal<List<String>> SQL_LIST_TL = new ThreadLocal<>(); private static final ThreadLocal<String> IP_TL = new ThreadLocal<>(); private static final ThreadLocal<String> USER_TL = new ThreadLocal<>(); private static final ThreadLocal<String> ORG_TL = new ThreadLocal<>(); private static final ThreadLocal<String> MAIN_ORG_TL = new ThreadLocal<>(); private static final ThreadLocal<Set<String>> IDS_OWN_ORG_TL = new ThreadLocal<>(); private static final ThreadLocal<Set<String>> PERMISSION_IDS_OWN_ORG_TL = new ThreadLocal<>(); private static final ThreadLocal<String> DATASOURCE_ROUTING_TL = new ThreadLocal<>(); private static final ThreadLocal<String> ACTION_TL = new ThreadLocal<>(); private static final ThreadLocal<ActionType> TYPE_TL = new ThreadLocal<>(); private static final ThreadLocal<String> CUSTOMER_FOR_SUPPLIER_TL = new ThreadLocal<>(); private static final ThreadLocal<Channel> CHANNEL_TL = new ThreadLocal<>(); private static final ThreadLocal<Boolean> SECURITY_ENABLE_TL = new ThreadLocal<>(); private static final ThreadLocal<Boolean> COUNT_ENABLE_TL = new ThreadLocal<>(); private static final List<ThreadLocal> THREAD_LOCAL_LIST = new ArrayList<>();   private static final ThreadLocal<LoginDomain> LOGIN_DOMAIN_TL=new ThreadLocal<>();     static {     THREAD_LOCAL_LIST.add(SQL_LIST_TL);     THREAD_LOCAL_LIST.add(IP_TL);     THREAD_LOCAL_LIST.add(USER_TL);     THREAD_LOCAL_LIST.add(ORG_TL);     THREAD_LOCAL_LIST.add(IDS_OWN_ORG_TL);     THREAD_LOCAL_LIST.add(PERMISSION_IDS_OWN_ORG_TL);     THREAD_LOCAL_LIST.add(DATASOURCE_ROUTING_TL);     THREAD_LOCAL_LIST.add(ACTION_TL);     THREAD_LOCAL_LIST.add(TYPE_TL);     THREAD_LOCAL_LIST.add(CUSTOMER_FOR_SUPPLIER_TL);     THREAD_LOCAL_LIST.add(CHANNEL_TL);     THREAD_LOCAL_LIST.add(SECURITY_ENABLE_TL);     THREAD_LOCAL_LIST.add(COUNT_ENABLE_TL);     THREAD_LOCAL_LIST.add(MAIN_ORG_TL);     THREAD_LOCAL_LIST.add(LOGIN_DOMAIN_TL); } public static void clearThreadLocal() {     for (ThreadLocal tl : THREAD_LOCAL_LIST) {         if (tl != null) {             tl.remove();         }     } }
clearThreadLocal会由统一释放切面的地方进行调用。这样完成threadLocal的清理

本文发表于2017年10月12日 18:34
(c)注:本文转载自https://my.oschina.net/qixiaobo025/blog/1549815,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 1991 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1