ConcurrentSkipListMap指南
1.概述
在這篇快速文章中,我們將研究java.util.concurrent包中的ConcurrentSkipListMap類。
這種構造使我們能夠以無鎖方式創建線程安全邏輯。當我們想為數據做一個不變的快照而其他線程仍將數據插入到映射中時,它是解決問題的理想選擇。
我們將解決一個問題,即使用該構造對事件流進行排序並獲取過去60秒內到達的事件的快照。
2.流排序邏輯
假設我們有一個事件流,這些事件不斷地來自多個線程。我們需要能夠記錄最近60秒內的事件以及60秒之前的事件。
首先,讓我們定義事件數據的結構:
public class Event {
private ZonedDateTime eventTime;
private String content;
// standard constructors/getters
}
我們希望使用eventTime字段對事件進行排序。為了使用ConcurrentSkipListMap實現此目的,我們需要在創建比較器的同時將Comparator傳遞給其構造器:
ConcurrentSkipListMap<ZonedDateTime, String> events
= new ConcurrentSkipListMap<>(
Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));
我們將使用它們的時間戳比較所有到達的事件。我們正在使用comparingLong()方法並傳遞提取函數,該函數可能會花費來自ZonedDateTime**較長的時間戳。
當事件到達時,我們只需要使用put()方法將它們添加到地圖中。請注意,此方法不需要任何顯式同步:
public void acceptEvent(Event event) {
events.put(event.getEventTime(), event.getContent());
}
ConcurrentSkipListMap將使用在構造函數中傳遞給它的Comparator在下面處理這些事件的排序。
ConcurrentSkipListMap最著名的優點是可以無鎖方式對其數據進行不可變快照的方法。要獲取過去一分鐘內到達的所有事件,我們可以使用tailMap()方法並傳遞要獲取元素的時間:
public ConcurrentNavigableMap<ZonedDateTime, String> getEventsFromLastMinute() {
return events.tailMap(ZonedDateTime.now().minusMinutes(1));
}
它將返回過去一分鐘的所有事件。這將是一個不變的快照,最重要的是其他編寫線程可以將新事件添加到ConcurrentSkipListMap,而無需進行顯式鎖定。
現在,我們可以使用headMap()方法獲取從現在起一分鐘後到達的所有事件:
public ConcurrentNavigableMap<ZonedDateTime, String> getEventsOlderThatOneMinute() {
return events.headMap(ZonedDateTime.now().minusMinutes(1));
}
這將返回所有早於一分鐘的事件的不變快照。以上所有方法都屬於EventWindowSort類,我們將在下一節中使用它。
3.測試排序流邏輯
一旦使用ConcurrentSkipListMap實現了排序邏輯,我們現在就可以通過創建兩個寫入器線程來進行測試,每個寫入器線程將發送一百個事件:
ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;
Runnable producer = () -> IntStream
.rangeClosed(0, 100)
.forEach(index -> eventWindowSort.acceptEvent(
new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString()))
);
for (int i = 0; i < numberOfThreads; i++) {
executorService.execute(producer);
}
每個線程都在調用acceptEvent()方法,將具有eventTime的事件從現在發送到“現在負一百秒”。
同時,我們可以調用getEventsFromLastMinute()方法,該方法將返回一分鐘窗口內事件的快照:
ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute
= eventWindowSort.getEventsFromLastMinute();
在每個測試運行中, eventsFromLastMinute中的事件數將有所不同,具體取決於生產者線程將事件發送到EventWindowSort的速度。我們可以斷言在返回的快照中沒有一個事件早於一分鐘:
long eventsOlderThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
.count();
assertEquals(eventsOlderThanOneMinute, 0);
一分鐘窗口內快照中的事件超過零:
long eventYoungerThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
.count();
assertTrue(eventYoungerThanOneMinute > 0);
我們的getEventsFromLastMinute()使用下面的tailMap() 。
現在讓我們測試使用ConcurrentSkipListMap中的headMap()方法的getEventsOlderThatOneMinute() :
ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute
= eventWindowSort.getEventsOlderThatOneMinute();
這次,我們獲得了超過一分鐘的事件的快照。我們可以斷言,此類事件的數量不超過零:
long eventsOlderThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
.count();
assertTrue(eventsOlderThanOneMinute > 0);
接下來,沒有一個事件是在最後一分鐘之內發生的:
long eventYoungerThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
.count();
assertEquals(eventYoungerThanOneMinute, 0);
需要注意的最重要的事情是,我們可以在其他線程仍向**ConcurrentSkipListMap添加新值的同時獲取數據快照**。
4。結論
在本快速教程中,我們了解了ConcurrentSkipListMap的基礎知識以及一些實際示例。
我們利用ConcurrentSkipListMap的高性能來實現非阻塞算法,該算法可以為我們提供數據的不變快照,即使同時有多個線程正在更新映射。
所有這些示例和代碼片段的實現都可以在GitHub項目中找到;這是一個Maven項目,因此應該很容易直接導入和運行。