• ConcurrentHashmap
    • JDK1.7
      • Segment类
        • Put
        • scanAndLockForPut
        • remove
        • Clear
      • Put
      • Get
      • Size
    • JDK1.8
      • Put
      • Get

    ConcurrentHashmap

    JDK1.7

    ConcurrentHashMap的锁分段技术:假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

    ConcurrentHashMap不允许Key或者Value的值为NULL。ConcurrentMaps中不允许空值的主要原因是,在非并发映射中几乎不能容忍的模糊性是无法容纳的。主要的一点是如果map.get(key)返回null,则无法检测 key 是否显式映射为 null 或者 key 未映射。 在非并发映射中,您可以通过 map.contains(key) 进行检查,但在并发映射中,映射可能在调用之间发生了变化。

    Concurrenthashmap - 图1

    Segment类

    Put

    将一个HashEntry放入到该Segment中,使用自旋机制,减少了加锁的可能性。

    1. final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    2. HashEntry<K,V> node = tryLock() ? null :
    3. scanAndLockForPut(key, hash, value); //如果加锁失败,则调用该方法
    4. V oldValue;
    5. try {
    6. HashEntry<K,V>[] tab = table;
    7. int index = (tab.length - 1) & hash; //同hashMap相同的哈希定位方式
    8. HashEntry<K,V> first = entryAt(tab, index);
    9. for (HashEntry<K,V> e = first;;) {
    10. if (e != null) {
    11. //若不为null,则持续查找,知道找到key和hash值相同的节点,将其value更新
    12. K k;
    13. if ((k = e.key) == key ||
    14. (e.hash == hash && key.equals(k))) {
    15. oldValue = e.value;
    16. if (!onlyIfAbsent) {
    17. e.value = value;
    18. ++modCount;
    19. }
    20. break;
    21. }
    22. e = e.next;
    23. }
    24. else { //若头结点为null
    25. if (node != null) //在遍历key对应节点链时没有找到相应的节点
    26. node.setNext(first);
    27. //当前修改并不需要让其他线程知道,在锁退出时修改自然会
    28. //更新到内存中,可提升性能
    29. else
    30. node = new HashEntry<K,V>(hash, key, value, first);
    31. int c = count + 1;
    32. if (c > threshold && tab.length < MAXIMUM_CAPACITY)
    33. rehash(node); //如果超过阈值,则进行rehash操作
    34. else
    35. setEntryAt(tab, index, node);
    36. ++modCount;
    37. count = c;
    38. oldValue = null;
    39. break;
    40. }
    41. }
    42. } finally {
    43. unlock();
    44. }
    45. return oldValue;
    46. }

    scanAndLockForPut

    该操作持续查找key对应的节点链中是否已存在该节点,如果没有找到已存在的节点,则预创建一个新节点,并且尝试n次,直到尝试次数超出限制,才真正进入等待状态,即所谓的 自旋等待

    1. private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    2. //根据hash值找到segment中的HashEntry节点
    3. HashEntry<K,V> first = entryForHash(this, hash); //首先获取头结点
    4. HashEntry<K,V> e = first;
    5. HashEntry<K,V> node = null;
    6. int retries = -1; // negative while locating node
    7. while (!tryLock()) { //持续遍历该哈希链
    8. HashEntry<K,V> f; // to recheck first below
    9. if (retries < 0) {
    10. if (e == null) {
    11. if (node == null) //若不存在要插入的节点,则创建一个新的节点
    12. node = new HashEntry<K,V>(hash, key, value, null);
    13. retries = 0;
    14. }
    15. else if (key.equals(e.key))
    16. retries = 0;
    17. else
    18. e = e.next;
    19. }
    20. else if (++retries > MAX_SCAN_RETRIES) {
    21. //尝试次数超出限制,则进行自旋等待
    22. lock();
    23. break;
    24. }
    25. /*当在自旋过程中发现节点链的链头发生了变化,则更新节点链的链头,
    26. 并重置retries值为-1,重新为尝试获取锁而自旋遍历*/
    27. else if ((retries & 1) == 0 &&
    28. (f = entryForHash(this, hash)) != first) {
    29. e = first = f; // re-traverse if entry changed
    30. retries = -1;
    31. }
    32. }
    33. return node;
    34. }

    remove

    用于移除某个节点,返回移除的节点值。

    1. final V remove(Object key, int hash, Object value) {
    2. if (!tryLock())
    3. scanAndLock(key, hash);
    4. V oldValue = null;
    5. try {
    6. HashEntry<K,V>[] tab = table;
    7. int index = (tab.length - 1) & hash;
    8. //根据这种哈希定位方式来定位对应的HashEntry
    9. HashEntry<K,V> e = entryAt(tab, index);
    10. HashEntry<K,V> pred = null;
    11. while (e != null) {
    12. K k;
    13. HashEntry<K,V> next = e.next;
    14. if ((k = e.key) == key ||
    15. (e.hash == hash && key.equals(k))) {
    16. V v = e.value;
    17. if (value == null || value == v || value.equals(v)) {
    18. if (pred == null)
    19. setEntryAt(tab, index, next);
    20. else
    21. pred.setNext(next);
    22. ++modCount;
    23. --count;
    24. oldValue = v;
    25. }
    26. break;
    27. }
    28. pred = e;
    29. e = next;
    30. }
    31. } finally {
    32. unlock();
    33. }
    34. return oldValue;
    35. }

    Clear

    要首先对整个segment加锁,然后将每一个HashEntry都设置为null

    1. final void clear() {
    2. lock();
    3. try {
    4. HashEntry<K,V>[] tab = table;
    5. for (int i = 0; i < tab.length ; i++)
    6. setEntryAt(tab, i, null);
    7. ++modCount;
    8. count = 0;
    9. } finally {
    10. unlock();
    11. }
    12. }

    Put

    1. public V put(K key, V value) {
    2. Segment<K,V> s;
    3. if (value == null)
    4. throw new NullPointerException();
    5. int hash = hash(key); //求出key的hash值
    6. int j = (hash >>> segmentShift) & segmentMask;
    7. //求出key在segments数组中的哪一个segment中
    8. if ((s = (Segment<K,V>)UNSAFE.getObject
    9. (segments, (j << SSHIFT) + SBASE)) == null)
    10. s = ensureSegment(j); //使用unsafe操作取出该segment
    11. return s.put(key, hash, value, false); //向segment中put元素
    12. }

    Get

    1. public V get(Object key) {
    2. Segment<K,V> s;
    3. HashEntry<K,V>[] tab;
    4. int h = hash(key); //找出对应的segment的位置
    5. long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    6. if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
    7. (tab = s.table) != null) { //使用Unsafe获取对应的Segmen
    8. for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
    9. (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
    10. e != null; e = e.next) { //找出对应的HashEntry,从头开始遍历
    11. K k;
    12. if ((k = e.key) == key || (e.hash == h && key.equals(k)))
    13. return e.value;
    14. }
    15. }
    16. return null;
    17. }

    Size

    求出所有的HashEntry的数目,先尝试的遍历查找、计算2遍,如果两遍遍历过程中整个Map没有发生修改(即两次所有Segment实例中modCount值的和一致),则可以认为整个查找、计算过程中Map没有发生改变。否则,需要对所有segment实例进行加锁、计算、解锁,然后返回。

    1. public int size() {
    2. final Segment<K,V>[] segments = this.segments;
    3. int size;
    4. boolean overflow; // true if size overflows 32 bits
    5. long sum; // sum of modCounts
    6. long last = 0L; // previous sum
    7. int retries = -1; // first iteration isn't retry
    8. try {
    9. for (;;) {
    10. if (retries++ == RETRIES_BEFORE_LOCK) {
    11. for (int j = 0; j < segments.length; ++j)
    12. ensureSegment(j).lock(); // force creation
    13. }
    14. sum = 0L;
    15. size = 0;
    16. overflow = false;
    17. for (int j = 0; j < segments.length; ++j) {
    18. Segment<K,V> seg = segmentAt(segments, j);
    19. if (seg != null) {
    20. sum += seg.modCount;
    21. int c = seg.count;
    22. if (c < 0 || (size += c) < 0)
    23. overflow = true;
    24. }
    25. }
    26. if (sum == last)
    27. break;
    28. last = sum;
    29. }
    30. } finally {
    31. if (retries > RETRIES_BEFORE_LOCK) {
    32. for (int j = 0; j < segments.length; ++j)
    33. segmentAt(segments, j).unlock();
    34. }
    35. }
    36. return overflow ? Integer.MAX_VALUE : size;
    37. }

    JDK1.8

    在JDK1.8中对ConcurrentHashmap做了两个改进:

    • 取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率

    • 将原先 table数组+单向链表 的数据结构,变更为 table数组+单向链表+红黑树 的结构。对于 hash 表来说,最核心的能力在于将 key hash 之后能均匀的分布在数组中。如果 hash 之后散列的很均匀,那么 table 数组中的每个队列长度主要为 0 或者 1 。但实际情况并非总是如此理想,虽然 ConcurrentHashMap 类默认的加载因子为 0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为 O(n);因此,对于个数超过 8 (默认值)的链表,jdk1.8 中采用了红黑树的结构,那么查询的时间复杂度可以降低到 O(logN),可以改进性能。

    Put

    1. final V putVal(K key, V value, boolean onlyIfAbsent) {
    2. if (key == null || value == null) throw new NullPointerException();
    3. // 得到 hash 值
    4. int hash = spread(key.hashCode());
    5. // 用于记录相应链表的长度
    6. int binCount = 0;
    7. for (Node<K,V>[] tab = table;;) {
    8. Node<K,V> f; int n, i, fh;
    9. // 如果数组"空",进行数组初始化
    10. if (tab == null || (n = tab.length) == 0)
    11. // 初始化数组,后面会详细介绍
    12. tab = initTable();
    13. // 找该 hash 值对应的数组下标,得到第一个节点 f
    14. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    15. // 如果数组该位置为空,
    16. // 用一次 CAS 操作将这个新值放入其中即可,这个 put 操作差不多就结束了,可以拉到最后面了
    17. // 如果 CAS 失败,那就是有并发操作,进到下一个循环就好了
    18. if (casTabAt(tab, i, null,
    19. new Node<K,V>(hash, key, value, null)))
    20. break; // no lock when adding to empty bin
    21. }
    22. // hash 居然可以等于 MOVED,这个需要到后面才能看明白,不过从名字上也能猜到,肯定是因为在扩容
    23. else if ((fh = f.hash) == MOVED)
    24. // 帮助数据迁移,这个等到看完数据迁移部分的介绍后,再理解这个就很简单了
    25. tab = helpTransfer(tab, f);
    26. else { // 到这里就是说,f 是该位置的头结点,而且不为空
    27. V oldVal = null;
    28. // 获取数组该位置的头结点的监视器锁
    29. synchronized (f) {
    30. if (tabAt(tab, i) == f) {
    31. if (fh >= 0) { // 头结点的 hash 值大于 0,说明是链表
    32. // 用于累加,记录链表的长度
    33. binCount = 1;
    34. // 遍历链表
    35. for (Node<K,V> e = f;; ++binCount) {
    36. K ek;
    37. // 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
    38. if (e.hash == hash &&
    39. ((ek = e.key) == key ||
    40. (ek != null && key.equals(ek)))) {
    41. oldVal = e.val;
    42. if (!onlyIfAbsent)
    43. e.val = value;
    44. break;
    45. }
    46. // 到了链表的最末端,将这个新值放到链表的最后面
    47. Node<K,V> pred = e;
    48. if ((e = e.next) == null) {
    49. pred.next = new Node<K,V>(hash, key,
    50. value, null);
    51. break;
    52. }
    53. }
    54. }
    55. else if (f instanceof TreeBin) { // 红黑树
    56. Node<K,V> p;
    57. binCount = 2;
    58. // 调用红黑树的插值方法插入新节点
    59. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
    60. value)) != null) {
    61. oldVal = p.val;
    62. if (!onlyIfAbsent)
    63. p.val = value;
    64. }
    65. }
    66. }
    67. }
    68. if (binCount != 0) {
    69. // 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
    70. if (binCount >= TREEIFY_THRESHOLD)
    71. // 这个方法和 HashMap 中稍微有一点点不同,那就是它不是一定会进行红黑树转换,
    72. // 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树
    73. // 具体源码我们就不看了,扩容部分后面说
    74. treeifyBin(tab, i);
    75. if (oldVal != null)
    76. return oldVal;
    77. break;
    78. }
    79. }
    80. }
    81. //
    82. addCount(1L, binCount);
    83. return null;
    84. }

    Get

    • 计算 hash 值
    • 根据 hash 值找到数组对应位置: (n - 1) & h
    • 根据该位置处结点性质进行相应查找
      • 如果该位置为 null ,那么直接返回 null 就可以了
      • 如果该位置处的节点刚好就是我们需要的,返回该节点的值即可
      • 如果该位置节点的 hash 值小于 0,说明正在扩容,或者是红黑树,后面我们再介绍 find 方法
      • 如果以上 3 条都不满足,那就是链表,进行遍历比对即可