ConcurrentMap指南

1.概述

Maps自然是Java集合中使用最廣泛的樣式之一。

而且,重要的是, HashMap不是線程安全的實現,而Hashtable確實通過同步操作來提供線程安全。

即使Hashtable是線程安全的,它也不是很有效。另一個完全同步的Maps Collections.synchronizedMap也不具有很高的效率。如果我們想要在高並發下實現高吞吐量的線程安全,那麼這些實現方法就不可行了。

為了解決該問題, Java Collections Framework**Java 1.5中引入了ConcurrentMap 。**

以下討論基於Java 1.8

2. ConcurrentMap

ConcurrentMapMap接口的擴展。它旨在提供一種結構和指南,以解決使吞吐量與線程安全協調的問題。

通過重寫幾種接口默認方法, ConcurrentMap為有效實現提供了指導,以提供線程安全性和內存一致性的原子操作。

重寫了幾個默認的實現,從而禁用了鍵/值支持:

  • getOrDefault
  • forEach
  • replaceAll
  • computeIfAbsent
  • computeIfPresent
  • compute
  • merge

在沒有默認接口實現的情況下,以下API也被覆蓋以支持原子性:

  • putIfAbsent
  • remove
  • replace(key, oldValue, newValue)
  • replace(key, value)

其餘動作直接繼承Map,與Map基本一致。

3. ConcurrentHashMap

ConcurrentHashMap是現成的現成ConcurrentMap實現。

為了獲得更好的性能,它由引擎蓋下的表存儲桶(以前是Java 8之前的表段)的節點數組組成,並且在更新期間主要使用CAS操作。

第一次插入時,表存儲區會延遲初始化。通過鎖定存儲桶中的第一個節點,可以獨立鎖定每個存儲桶。讀取操作不會阻塞,並且更新爭用會最小化。

所需的段數與訪問表的線程數有關,因此每個段正在進行的更新最多不會超過一個時間。

Java 8之前,所需的“段”數是相對於訪問表的線程數而言的,因此每個段正在進行的更新最多不會超過一個時間。

因此,與HashMap相比,構造函數提供了額外的concurrencyLevel參數來控制要使用的估計線程數:

public ConcurrentHashMap(
public ConcurrentHashMap(

 int initialCapacity, float loadFactor, int concurrencyLevel)

另外兩個參數: initialCapacityloadFactor的工作原理HashMap相同。

但是,從Java 8開始,僅提供構造函數是為了向後兼容:參數僅會影響map的初始大小

3.1。線程安全

ConcurrentMap保證多線程環境中鍵/值操作的內存一致性。

在將對像作為鍵或值放入ConcurrentMap之前,線程中的操作發生在訪問或刪除另一個線程中的對象之後的操作之前

為了確認,讓我們看一下內存不一致的情況:

@Test

 public void givenHashMap_whenSumParallel_thenError() throws Exception {

 Map<String, Integer> map = new HashMap<>();

 List<Integer> sumList = parallelSum100(map, 100);



 assertNotEquals(1, sumList

 .stream()

 .distinct()

 .count());

 long wrongResultCount = sumList

 .stream()

 .filter(num -> num != 100)

 .count();



 assertTrue(wrongResultCount > 0);

 }



 private List<Integer> parallelSum100(Map<String, Integer> map,

 int executionTimes) throws InterruptedException {

 List<Integer> sumList = new ArrayList<>(1000);

 for (int i = 0; i < executionTimes; i++) {

 map.put("test", 0);

 ExecutorService executorService =

 Executors.newFixedThreadPool(4);

 for (int j = 0; j < 10; j++) {

 executorService.execute(() -> {

 for (int k = 0; k < 10; k++)

 map.computeIfPresent(

 "test",

 (key, value) -> value + 1

 );

 });

 }

 executorService.shutdown();

 executorService.awaitTermination(5, TimeUnit.SECONDS);

 sumList.add(map.get("test"));

 }

 return sumList;

 }

對於每個並行的map.computeIfPresent動作, HashMap均未提供應為當前整數值的一致視圖,從而導致不一致和不良的結果。

至於ConcurrentHashMap ,我們可以獲得一致且正確的結果:

@Test

 public void givenConcurrentMap_whenSumParallel_thenCorrect()

 throws Exception {

 Map<String, Integer> map = new ConcurrentHashMap<>();

 List<Integer> sumList = parallelSum100(map, 1000);



 assertEquals(1, sumList

 .stream()

 .distinct()

 .count());

 long wrongResultCount = sumList

 .stream()

 .filter(num -> num != 100)

 .count();



 assertEquals(0, wrongResultCount);

 }

3.2。null鍵/值

ConcurrentMap提供的大多數API都不允許鍵或值,例如:

@Test(expected = NullPointerException.class)

 public void givenConcurrentHashMap_whenPutWithNullKey_thenThrowsNPE() {

 concurrentMap.put(null, new Object());

 }



 @Test(expected = NullPointerException.class)

 public void givenConcurrentHashMap_whenPutNullValue_thenThrowsNPE() {

 concurrentMap.put("test", null);

 }

但是,對於compute *merge動作,計算值可以為null ,這表示如果存在鍵值映射,則將其刪除;如果先前不存在,則將其保留

@Test

 public void givenKeyPresent_whenComputeRemappingNull_thenMappingRemoved() {

 Object oldValue = new Object();

 concurrentMap.put("test", oldValue);

 concurrentMap.compute("test", (s, o) -> null);



 assertNull(concurrentMap.get("test"));

 }

3.3. Stream流支持

Java 8ConcurrentHashMap中也提供了Stream支持。

與大多數流方法不同,批量(順序和並行)操作允許安全地進行並發修改。 ConcurrentModificationException不會引發,這也適用於其迭代器。與流相關,還添加了一些forEach *search和*reduce **方法,以支持更豐富的遍歷和map-reduce操作。

3.4。性能

在後台, ConcurrentHashMap類似於HashMap ,它基於哈希表進行數據訪問和更新(儘管更複雜)。

當然,在大多數並發情況下, ConcurrentHashMap的數據檢索和更新性能應更高。

讓我們為獲取放置性能編寫一個快速的微基準測試,並將其與HashtableCollections.synchronizedMap進行比較,在4個線程中將這兩項操作運行500,000次。

@Test

 public void givenMaps_whenGetPut500KTimes_thenConcurrentMapFaster()

 throws Exception {

 Map<String, Object> hashtable = new Hashtable<>();

 Map<String, Object> synchronizedHashMap =

 Collections.synchronizedMap(new HashMap<>());

 Map<String, Object> concurrentHashMap = new ConcurrentHashMap<>();



 long hashtableAvgRuntime = timeElapseForGetPut(hashtable);

 long syncHashMapAvgRuntime =

 timeElapseForGetPut(synchronizedHashMap);

 long concurrentHashMapAvgRuntime =

 timeElapseForGetPut(concurrentHashMap);



 assertTrue(hashtableAvgRuntime > concurrentHashMapAvgRuntime);

 assertTrue(syncHashMapAvgRuntime > concurrentHashMapAvgRuntime);

 }



 private long timeElapseForGetPut(Map<String, Object> map)

 throws InterruptedException {

 ExecutorService executorService =

 Executors.newFixedThreadPool(4);

 long startTime = System.nanoTime();

 for (int i = 0; i < 4; i++) {

 executorService.execute(() -> {

 for (int j = 0; j < 500_000; j++) {

 int value = ThreadLocalRandom

 .current()

 .nextInt(10000);

 String key = String.valueOf(value);

 map.put(key, value);

 map.get(key);

 }

 });

 }

 executorService.shutdown();

 executorService.awaitTermination(1, TimeUnit.MINUTES);

 return (System.nanoTime() - startTime) / 500_000;

 }

請記住,微基準測試僅針對單個場景,並不總是能很好地反映現實情況。

就是說,在具有平均開發系統的OS X系統上,我們看到100次連續運行(以納秒為單位)的平均樣本結果:

Hashtable: 1142.45

 SynchronizedHashMap: 1273.89

 ConcurrentHashMap: 230.2

在多線程環境中,期望有多個線程訪問同一個Map ,顯然ConcurrentHashMap是更可取的。

但是,當只能通過單個線程訪問Map時, HashMap可以因為其簡單性和可靠的性能而成為更好的選擇。

3.5. 陷阱

檢索操作通常不會在ConcurrentHashMap中阻塞,並且可能與更新操作重疊。因此,為了獲得更好的性能,它們僅反映最新的更新操作的結果,如官方Javadoc中所述

還有其他一些事實需要牢記:

  • 聚合狀態方法(包括sizeisEmptycontainsValue)的結果通常僅在映射未在其他線程中進行並發更新時才有用:
@Test

 public void givenConcurrentMap_whenUpdatingAndGetSize_thenError()

 throws InterruptedException {

 Runnable collectMapSizes = () -> {

 for (int i = 0; i < MAX_SIZE; i++) {

 mapSizes.add(concurrentMap.size());

 }

 };

 Runnable updateMapData = () -> {

 for (int i = 0; i < MAX_SIZE; i++) {

 concurrentMap.put(String.valueOf(i), i);

 }

 };

 executorService.execute(updateMapData);

 executorService.execute(collectMapSizes);

 executorService.shutdown();

 executorService.awaitTermination(1, TimeUnit.MINUTES);



 assertNotEquals(MAX_SIZE, mapSizes.get(MAX_SIZE - 1).intValue());

 assertEquals(MAX_SIZE, concurrentMap.size());

 }

如果並發更新受到嚴格控制,則聚合狀態仍將是可靠的。

儘管這些匯總狀態方法不能保證實時準確性,但它們可能足以用於監視或估計目的

請注意, ConcurrentHashMapsize()的使用應替換為mappingCount() ,因為後一種方法返回的計數很長,儘管從深處看,它們是基於相同的估計。

  • hashCode很重要:請注意,使用許多具有完全相同的hashCode()的鍵是降低任何哈希表性能的肯定方法。

為了改善鍵可比較時的影響, ConcurrentHashMap可以在鍵之間使用比較順序來幫助打破關係。儘管如此,我們仍應盡可能避免使用相同的hashCode()

  • 迭代器僅設計為在單個線程中使用,因為它們提供了較弱的一致性,而不是快速失敗遍歷,並且它們永遠不會引發ConcurrentModificationException。
  • 默認初始表容量為16,並根據指定的並發級別進行調整:
public ConcurrentHashMap(

 int initialCapacity, float loadFactor, int concurrencyLevel) {



 //...

 if (initialCapacity < concurrencyLevel) {

 initialCapacity = concurrencyLevel;

 }

 //...

 }
  • 關於重新映射函數的警告:儘管我們可以使用提供的compute和*merge **方法進行重新映射操作,但我們應使其保持快速,簡短和簡單,並專注於當前映射以避免意外的阻塞。
  • ConcurrentHashMap中的鍵不是按排序順序排列的,因此對於需要排序的情況, ConcurrentSkipListMap是一個合適的選擇。

4. ConcurrentNavigableMap

對於需要對鍵進行排序的情況,我們可以使用ConcurrentSkipListMap ,它是TreeMap的並發版本。

作為ConcurrentMap的補充, ConcurrentNavigableMap支持其鍵的總排序(默認情況下為升序),並且可以同時導航。為了並發兼容性,將覆蓋返回地圖視圖的方法:

  • subMap
  • headMap
  • tailMap
  • subMap
  • headMap
  • tailMap
  • descendingMap

弱內存一致性增強了*keySet()*視圖的迭代器和分離器:

  • navigableKeySet
  • keySet
  • descendingKeySet

5. ConcurrentSkipListMap

之前,我們介紹了NavigableMap接口及其實現TreeMap 。可以看到ConcurrentSkipListMapTreeMap的可伸縮並發版本。

實際上,在Java中沒有並發實現紅黑樹。在ConcurrentSkipListMap中實現了SkipLists的並髮變體,它為containsKeygetputremove操作及其變體提供了預期的平均log(n)時間成本。

除了TreeMap的功能外,線程安全性還可以保證鍵的插入,刪除,更新和訪問操作。這是同時導航時與TreeMap的比較:

@Test

 public void givenSkipListMap_whenNavConcurrently_thenCountCorrect()

 throws InterruptedException {

 NavigableMap<Integer, Integer> skipListMap

 = new ConcurrentSkipListMap<>();

 int count = countMapElementByPollingFirstEntry(skipListMap, 10000, 4);



 assertEquals(10000 * 4, count);

 }



 @Test

 public void givenTreeMap_whenNavConcurrently_thenCountError()

 throws InterruptedException {

 NavigableMap<Integer, Integer> treeMap = new TreeMap<>();

 int count = countMapElementByPollingFirstEntry(treeMap, 10000, 4);



 assertNotEquals(10000 * 4, count);

 }



 private int countMapElementByPollingFirstEntry(

 NavigableMap<Integer, Integer> navigableMap,

 int elementCount,

 int concurrencyLevel) throws InterruptedException {



 for (int i = 0; i < elementCount * concurrencyLevel; i++) {

 navigableMap.put(i, i);

 }



 AtomicInteger counter = new AtomicInteger(0);

 ExecutorService executorService

 = Executors.newFixedThreadPool(concurrencyLevel);

 for (int j = 0; j < concurrencyLevel; j++) {

 executorService.execute(() -> {

 for (int i = 0; i < elementCount; i++) {

 if (navigableMap.pollFirstEntry() != null) {

 counter.incrementAndGet();

 }

 }

 });

 }

 executorService.shutdown();

 executorService.awaitTermination(1, TimeUnit.MINUTES);

 return counter.get();

 }

對幕後性能問題的完整解釋超出了本文的範圍。詳細信息可以在ConcurrentSkipListMap的Javadoc中找到,該文件位於src.zip文件中的java / util / concurrent下。

六,結論

在這篇文章中,我們主要介紹了ConcurrentMap接口和ConcurrentHashMap中的功能和覆蓋在ConcurrentNavigableMap是所需的關鍵排序。

本文中使用的所有示例的完整源代碼都可以在GitHub項目中找到。