18.2.2 ConcurrentHashMap线程安全实现机制
1. ConcurrentHashMap概述
1.1 设计目标
ConcurrentHashMap是Java并发包中提供的线程安全的哈希表实现,旨在解决HashMap在多线程环境下的安全问题,同时提供比Hashtable更好的并发性能。
1.2 核心特点
- 线程安全,支持高并发访问
- 分段锁机制(JDK7)/ CAS + synchronized(JDK8)
- 不允许null键和null值
- 支持完全并发的读操作
- 支持可配置的并发写操作
2. JDK7中的ConcurrentHashMap
2.1 分段锁机制
JDK7采用分段锁(Segment)机制,将整个哈希表分成多个段,每个段独立加锁。
// JDK7 ConcurrentHashMap核心结构 public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V> { // 默认并发级别 static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 段数组 final Segment<K,V>[] segments; // Segment继承ReentrantLock,每个段都是一个独立的锁 static final class Segment<K,V> extends ReentrantLock implements Serializable { // 段内的哈希表 transient volatile HashEntry<K,V>[] table; // 段内元素数量 transient int count; // 哈希表节点 static final class HashEntry<K,V> { final int hash; final K key; volatile V value; // volatile保证可见性 volatile HashEntry<K,V> next; // volatile保证链表结构的可见性 } } }
2.2 JDK7的put操作流程
public class JDK7ConcurrentHashMapPut { public V put(K key, V value) { // 1. 计算hash值定位到具体的段 int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; // 2. 获取段,如果不存在则创建 Segment<K,V> s = ensureSegment(j); // 3. 在段内执行put操作 return s.put(key, hash, value, false); } // Segment内的put方法 final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 尝试获取锁,如果获取失败则自旋等待 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); // 遍历链表查找或插入 for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { // 插入新节点 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); // 释放锁 } return oldValue; } }
2.3 JDK7的get操作特点
public class JDK7ConcurrentHashMapGet { public V get(Object key) { // 无需加锁,通过volatile保证可见性 int hash = hash(key); long u = (((hash >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 通过UNSAFE直接访问内存 Segment<K,V> s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u); if (s != null && s.table != null) { // 通过volatile读取链表头节点 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (s.table, ((long)(((s.table.length - 1) & hash)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) return e.value; } } return null; } }
3. JDK8中的ConcurrentHashMap
3.1 数据结构变化
JDK8废弃了分段锁,采用CAS + synchronized的方式实现线程安全。
// JDK8 ConcurrentHashMap核心结构 public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V> { // 存储数据的数组 transient volatile Node<K,V>[] table; // 控制表初始化和扩容的标志 private transient volatile int sizeCtl; // 元素个数,使用LongAdder的思想 private transient volatile long baseCount; private transient volatile CounterCell[] counterCells; // 普通节点 static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; } // 红黑树节点 static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; TreeNode<K,V> left; TreeNode<K,V> right; boolean red; } // 转发节点,用于扩容 static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } } }
3.2 JDK8的put操作流程
public class JDK8ConcurrentHashMapPut { final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i; // 1. 如果表为空,初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 2. 如果桶为空,使用CAS插入 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // CAS成功,跳出循环 } // 3. 如果正在扩容,帮助扩容 else if (f.hash == MOVED) tab = helpTransfer(tab, f); // 4. 桶不为空,需要加锁处理 else { V oldVal = null; synchronized (f) { // 锁住链表头节点 if (tabAt(tab, i) == f) { if (f.hash >= 0) { // 链表处理 binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, val
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java面试圣经 文章被收录于专栏
Java面试圣经,带你练透java圣经