From 89ceaad0becb96d19d36fcb62301cd1d50860013 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 13 Dec 2023 11:43:35 +0300 Subject: [PATCH] Improvement - Virtual Threads compatibility #5499 --- .../RedissonReactiveSubscription.java | 58 ++++++++----------- .../RedissonReactiveSubscription.java | 57 ++++++++---------- .../RedissonReactiveSubscription.java | 57 ++++++++---------- .../RedissonReactiveSubscription.java | 57 ++++++++---------- .../RedissonReactiveSubscription.java | 57 ++++++++---------- .../RedissonReactiveSubscription.java | 57 ++++++++---------- .../RedissonReactiveSubscription.java | 57 ++++++++---------- .../RedissonReactiveSubscription.java | 57 ++++++++---------- .../RedissonReactiveSubscription.java | 57 ++++++++---------- .../RedissonReactiveSubscription.java | 57 ++++++++---------- 10 files changed, 248 insertions(+), 323 deletions(-) diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index 4aabf9439..6ea3fc913 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -33,11 +33,12 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** - * + * * @author Nikita Koksharov * */ @@ -45,19 +46,16 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { public static class ListenableCounter { - private int state; + private final AtomicInteger state = new AtomicInteger(); private Runnable r; - public synchronized void acquire() { - state++; + public void acquire() { + state.incrementAndGet(); } public void release() { - synchronized (this) { - state--; - if (state != 0) { - return; - } + if (state.decrementAndGet() != 0) { + return; } if (r != null) { @@ -66,12 +64,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } } - public synchronized void addListener(Runnable r) { - synchronized (this) { - if (state != 0) { - this.r = r; - return; - } + public void addListener(Runnable r) { + if (state.get() != 0) { + this.r = r; + return; } r.run(); @@ -85,7 +81,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { private final ListenableCounter monosListener = new ListenableCounter(); private final PublishSubscribeService subscribeService; - + public RedissonReactiveSubscription(ConnectionManager connectionManager) { this.subscribeService = connectionManager.getSubscribeService(); } @@ -108,7 +104,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { }); return Mono.fromFuture(future); }); - } protected ChannelName toChannelName(ByteBuffer channel) { @@ -149,17 +144,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.channels) { - Collection entries = RedissonReactiveSubscription.this.channels.get(cn); - for (PubSubConnectionEntry entry : entries) { - if (!entry.hasListeners(cn)) { - entries.remove(entry); - if (entries.isEmpty()) { - RedissonReactiveSubscription.this.channels.remove(cn); - } - } + RedissonReactiveSubscription.this.channels.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; } - } + return entries; + }); }); futures.add(f); } @@ -186,12 +177,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.patterns) { - Collection entries = RedissonReactiveSubscription.this.patterns.get(cn); - entries.stream() - .filter(en -> en.hasListeners(cn)) - .forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn)); - } + RedissonReactiveSubscription.this.patterns.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; + } + return entries; + }); }); futures.add(f); } diff --git a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index e8aae680d..6ea3fc913 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -33,11 +33,12 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** - * + * * @author Nikita Koksharov * */ @@ -45,19 +46,16 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { public static class ListenableCounter { - private int state; + private final AtomicInteger state = new AtomicInteger(); private Runnable r; - public synchronized void acquire() { - state++; + public void acquire() { + state.incrementAndGet(); } public void release() { - synchronized (this) { - state--; - if (state != 0) { - return; - } + if (state.decrementAndGet() != 0) { + return; } if (r != null) { @@ -66,12 +64,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } } - public synchronized void addListener(Runnable r) { - synchronized (this) { - if (state != 0) { - this.r = r; - return; - } + public void addListener(Runnable r) { + if (state.get() != 0) { + this.r = r; + return; } r.run(); @@ -85,7 +81,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { private final ListenableCounter monosListener = new ListenableCounter(); private final PublishSubscribeService subscribeService; - + public RedissonReactiveSubscription(ConnectionManager connectionManager) { this.subscribeService = connectionManager.getSubscribeService(); } @@ -148,17 +144,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.channels) { - Collection entries = RedissonReactiveSubscription.this.channels.get(cn); - for (PubSubConnectionEntry entry : entries) { - if (!entry.hasListeners(cn)) { - entries.remove(entry); - if (entries.isEmpty()) { - RedissonReactiveSubscription.this.channels.remove(cn); - } - } + RedissonReactiveSubscription.this.channels.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; } - } + return entries; + }); }); futures.add(f); } @@ -185,12 +177,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.patterns) { - Collection entries = RedissonReactiveSubscription.this.patterns.get(cn); - entries.stream() - .filter(en -> en.hasListeners(cn)) - .forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn)); - } + RedissonReactiveSubscription.this.patterns.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; + } + return entries; + }); }); futures.add(f); } diff --git a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index e8aae680d..6ea3fc913 100644 --- a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -33,11 +33,12 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** - * + * * @author Nikita Koksharov * */ @@ -45,19 +46,16 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { public static class ListenableCounter { - private int state; + private final AtomicInteger state = new AtomicInteger(); private Runnable r; - public synchronized void acquire() { - state++; + public void acquire() { + state.incrementAndGet(); } public void release() { - synchronized (this) { - state--; - if (state != 0) { - return; - } + if (state.decrementAndGet() != 0) { + return; } if (r != null) { @@ -66,12 +64,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } } - public synchronized void addListener(Runnable r) { - synchronized (this) { - if (state != 0) { - this.r = r; - return; - } + public void addListener(Runnable r) { + if (state.get() != 0) { + this.r = r; + return; } r.run(); @@ -85,7 +81,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { private final ListenableCounter monosListener = new ListenableCounter(); private final PublishSubscribeService subscribeService; - + public RedissonReactiveSubscription(ConnectionManager connectionManager) { this.subscribeService = connectionManager.getSubscribeService(); } @@ -148,17 +144,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.channels) { - Collection entries = RedissonReactiveSubscription.this.channels.get(cn); - for (PubSubConnectionEntry entry : entries) { - if (!entry.hasListeners(cn)) { - entries.remove(entry); - if (entries.isEmpty()) { - RedissonReactiveSubscription.this.channels.remove(cn); - } - } + RedissonReactiveSubscription.this.channels.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; } - } + return entries; + }); }); futures.add(f); } @@ -185,12 +177,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.patterns) { - Collection entries = RedissonReactiveSubscription.this.patterns.get(cn); - entries.stream() - .filter(en -> en.hasListeners(cn)) - .forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn)); - } + RedissonReactiveSubscription.this.patterns.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; + } + return entries; + }); }); futures.add(f); } diff --git a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index e8aae680d..6ea3fc913 100644 --- a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -33,11 +33,12 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** - * + * * @author Nikita Koksharov * */ @@ -45,19 +46,16 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { public static class ListenableCounter { - private int state; + private final AtomicInteger state = new AtomicInteger(); private Runnable r; - public synchronized void acquire() { - state++; + public void acquire() { + state.incrementAndGet(); } public void release() { - synchronized (this) { - state--; - if (state != 0) { - return; - } + if (state.decrementAndGet() != 0) { + return; } if (r != null) { @@ -66,12 +64,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } } - public synchronized void addListener(Runnable r) { - synchronized (this) { - if (state != 0) { - this.r = r; - return; - } + public void addListener(Runnable r) { + if (state.get() != 0) { + this.r = r; + return; } r.run(); @@ -85,7 +81,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { private final ListenableCounter monosListener = new ListenableCounter(); private final PublishSubscribeService subscribeService; - + public RedissonReactiveSubscription(ConnectionManager connectionManager) { this.subscribeService = connectionManager.getSubscribeService(); } @@ -148,17 +144,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.channels) { - Collection entries = RedissonReactiveSubscription.this.channels.get(cn); - for (PubSubConnectionEntry entry : entries) { - if (!entry.hasListeners(cn)) { - entries.remove(entry); - if (entries.isEmpty()) { - RedissonReactiveSubscription.this.channels.remove(cn); - } - } + RedissonReactiveSubscription.this.channels.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; } - } + return entries; + }); }); futures.add(f); } @@ -185,12 +177,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.patterns) { - Collection entries = RedissonReactiveSubscription.this.patterns.get(cn); - entries.stream() - .filter(en -> en.hasListeners(cn)) - .forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn)); - } + RedissonReactiveSubscription.this.patterns.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; + } + return entries; + }); }); futures.add(f); } diff --git a/redisson-spring-data/redisson-spring-data-25/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-25/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index e8aae680d..6ea3fc913 100644 --- a/redisson-spring-data/redisson-spring-data-25/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-25/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -33,11 +33,12 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** - * + * * @author Nikita Koksharov * */ @@ -45,19 +46,16 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { public static class ListenableCounter { - private int state; + private final AtomicInteger state = new AtomicInteger(); private Runnable r; - public synchronized void acquire() { - state++; + public void acquire() { + state.incrementAndGet(); } public void release() { - synchronized (this) { - state--; - if (state != 0) { - return; - } + if (state.decrementAndGet() != 0) { + return; } if (r != null) { @@ -66,12 +64,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } } - public synchronized void addListener(Runnable r) { - synchronized (this) { - if (state != 0) { - this.r = r; - return; - } + public void addListener(Runnable r) { + if (state.get() != 0) { + this.r = r; + return; } r.run(); @@ -85,7 +81,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { private final ListenableCounter monosListener = new ListenableCounter(); private final PublishSubscribeService subscribeService; - + public RedissonReactiveSubscription(ConnectionManager connectionManager) { this.subscribeService = connectionManager.getSubscribeService(); } @@ -148,17 +144,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.channels) { - Collection entries = RedissonReactiveSubscription.this.channels.get(cn); - for (PubSubConnectionEntry entry : entries) { - if (!entry.hasListeners(cn)) { - entries.remove(entry); - if (entries.isEmpty()) { - RedissonReactiveSubscription.this.channels.remove(cn); - } - } + RedissonReactiveSubscription.this.channels.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; } - } + return entries; + }); }); futures.add(f); } @@ -185,12 +177,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.patterns) { - Collection entries = RedissonReactiveSubscription.this.patterns.get(cn); - entries.stream() - .filter(en -> en.hasListeners(cn)) - .forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn)); - } + RedissonReactiveSubscription.this.patterns.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; + } + return entries; + }); }); futures.add(f); } diff --git a/redisson-spring-data/redisson-spring-data-26/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-26/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index 86f9c1089..dddedbc8a 100644 --- a/redisson-spring-data/redisson-spring-data-26/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-26/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -36,11 +36,12 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** - * + * * @author Nikita Koksharov * */ @@ -48,19 +49,16 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { public static class ListenableCounter { - private int state; + private final AtomicInteger state = new AtomicInteger(); private Runnable r; - public synchronized void acquire() { - state++; + public void acquire() { + state.incrementAndGet(); } public void release() { - synchronized (this) { - state--; - if (state != 0) { - return; - } + if (state.decrementAndGet() != 0) { + return; } if (r != null) { @@ -69,12 +67,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } } - public synchronized void addListener(Runnable r) { - synchronized (this) { - if (state != 0) { - this.r = r; - return; - } + public void addListener(Runnable r) { + if (state.get() != 0) { + this.r = r; + return; } r.run(); @@ -89,7 +85,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { private final RedisPubSubListener subscriptionListener; private final PublishSubscribeService subscribeService; - + public RedissonReactiveSubscription(ConnectionManager connectionManager, SubscriptionListener subscriptionListener) { this.subscribeService = connectionManager.getSubscribeService(); this.subscriptionListener = new RedisPubSubListener() { @@ -175,17 +171,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.channels) { - Collection entries = RedissonReactiveSubscription.this.channels.get(cn); - for (PubSubConnectionEntry entry : entries) { - if (!entry.hasListeners(cn)) { - entries.remove(entry); - if (entries.isEmpty()) { - RedissonReactiveSubscription.this.channels.remove(cn); - } - } + RedissonReactiveSubscription.this.channels.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; } - } + return entries; + }); }); futures.add(f); } @@ -212,12 +204,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.patterns) { - Collection entries = RedissonReactiveSubscription.this.patterns.get(cn); - entries.stream() - .filter(en -> en.hasListeners(cn)) - .forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn)); - } + RedissonReactiveSubscription.this.patterns.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; + } + return entries; + }); }); futures.add(f); } diff --git a/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index 86f9c1089..dddedbc8a 100644 --- a/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-27/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -36,11 +36,12 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** - * + * * @author Nikita Koksharov * */ @@ -48,19 +49,16 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { public static class ListenableCounter { - private int state; + private final AtomicInteger state = new AtomicInteger(); private Runnable r; - public synchronized void acquire() { - state++; + public void acquire() { + state.incrementAndGet(); } public void release() { - synchronized (this) { - state--; - if (state != 0) { - return; - } + if (state.decrementAndGet() != 0) { + return; } if (r != null) { @@ -69,12 +67,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } } - public synchronized void addListener(Runnable r) { - synchronized (this) { - if (state != 0) { - this.r = r; - return; - } + public void addListener(Runnable r) { + if (state.get() != 0) { + this.r = r; + return; } r.run(); @@ -89,7 +85,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { private final RedisPubSubListener subscriptionListener; private final PublishSubscribeService subscribeService; - + public RedissonReactiveSubscription(ConnectionManager connectionManager, SubscriptionListener subscriptionListener) { this.subscribeService = connectionManager.getSubscribeService(); this.subscriptionListener = new RedisPubSubListener() { @@ -175,17 +171,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.channels) { - Collection entries = RedissonReactiveSubscription.this.channels.get(cn); - for (PubSubConnectionEntry entry : entries) { - if (!entry.hasListeners(cn)) { - entries.remove(entry); - if (entries.isEmpty()) { - RedissonReactiveSubscription.this.channels.remove(cn); - } - } + RedissonReactiveSubscription.this.channels.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; } - } + return entries; + }); }); futures.add(f); } @@ -212,12 +204,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.patterns) { - Collection entries = RedissonReactiveSubscription.this.patterns.get(cn); - entries.stream() - .filter(en -> en.hasListeners(cn)) - .forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn)); - } + RedissonReactiveSubscription.this.patterns.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; + } + return entries; + }); }); futures.add(f); } diff --git a/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index 86f9c1089..dddedbc8a 100644 --- a/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-30/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -36,11 +36,12 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** - * + * * @author Nikita Koksharov * */ @@ -48,19 +49,16 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { public static class ListenableCounter { - private int state; + private final AtomicInteger state = new AtomicInteger(); private Runnable r; - public synchronized void acquire() { - state++; + public void acquire() { + state.incrementAndGet(); } public void release() { - synchronized (this) { - state--; - if (state != 0) { - return; - } + if (state.decrementAndGet() != 0) { + return; } if (r != null) { @@ -69,12 +67,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } } - public synchronized void addListener(Runnable r) { - synchronized (this) { - if (state != 0) { - this.r = r; - return; - } + public void addListener(Runnable r) { + if (state.get() != 0) { + this.r = r; + return; } r.run(); @@ -89,7 +85,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { private final RedisPubSubListener subscriptionListener; private final PublishSubscribeService subscribeService; - + public RedissonReactiveSubscription(ConnectionManager connectionManager, SubscriptionListener subscriptionListener) { this.subscribeService = connectionManager.getSubscribeService(); this.subscriptionListener = new RedisPubSubListener() { @@ -175,17 +171,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.channels) { - Collection entries = RedissonReactiveSubscription.this.channels.get(cn); - for (PubSubConnectionEntry entry : entries) { - if (!entry.hasListeners(cn)) { - entries.remove(entry); - if (entries.isEmpty()) { - RedissonReactiveSubscription.this.channels.remove(cn); - } - } + RedissonReactiveSubscription.this.channels.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; } - } + return entries; + }); }); futures.add(f); } @@ -212,12 +204,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.patterns) { - Collection entries = RedissonReactiveSubscription.this.patterns.get(cn); - entries.stream() - .filter(en -> en.hasListeners(cn)) - .forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn)); - } + RedissonReactiveSubscription.this.patterns.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; + } + return entries; + }); }); futures.add(f); } diff --git a/redisson-spring-data/redisson-spring-data-31/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-31/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index 86f9c1089..dddedbc8a 100644 --- a/redisson-spring-data/redisson-spring-data-31/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-31/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -36,11 +36,12 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** - * + * * @author Nikita Koksharov * */ @@ -48,19 +49,16 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { public static class ListenableCounter { - private int state; + private final AtomicInteger state = new AtomicInteger(); private Runnable r; - public synchronized void acquire() { - state++; + public void acquire() { + state.incrementAndGet(); } public void release() { - synchronized (this) { - state--; - if (state != 0) { - return; - } + if (state.decrementAndGet() != 0) { + return; } if (r != null) { @@ -69,12 +67,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } } - public synchronized void addListener(Runnable r) { - synchronized (this) { - if (state != 0) { - this.r = r; - return; - } + public void addListener(Runnable r) { + if (state.get() != 0) { + this.r = r; + return; } r.run(); @@ -89,7 +85,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { private final RedisPubSubListener subscriptionListener; private final PublishSubscribeService subscribeService; - + public RedissonReactiveSubscription(ConnectionManager connectionManager, SubscriptionListener subscriptionListener) { this.subscribeService = connectionManager.getSubscribeService(); this.subscriptionListener = new RedisPubSubListener() { @@ -175,17 +171,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.channels) { - Collection entries = RedissonReactiveSubscription.this.channels.get(cn); - for (PubSubConnectionEntry entry : entries) { - if (!entry.hasListeners(cn)) { - entries.remove(entry); - if (entries.isEmpty()) { - RedissonReactiveSubscription.this.channels.remove(cn); - } - } + RedissonReactiveSubscription.this.channels.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; } - } + return entries; + }); }); futures.add(f); } @@ -212,12 +204,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.patterns) { - Collection entries = RedissonReactiveSubscription.this.patterns.get(cn); - entries.stream() - .filter(en -> en.hasListeners(cn)) - .forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn)); - } + RedissonReactiveSubscription.this.patterns.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; + } + return entries; + }); }); futures.add(f); } diff --git a/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index 86f9c1089..6f4d99558 100644 --- a/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-32/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -36,6 +36,7 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -48,19 +49,16 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { public static class ListenableCounter { - private int state; + private final AtomicInteger state = new AtomicInteger(); private Runnable r; - public synchronized void acquire() { - state++; + public void acquire() { + state.incrementAndGet(); } public void release() { - synchronized (this) { - state--; - if (state != 0) { - return; - } + if (state.decrementAndGet() != 0) { + return; } if (r != null) { @@ -69,12 +67,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } } - public synchronized void addListener(Runnable r) { - synchronized (this) { - if (state != 0) { - this.r = r; - return; - } + public void addListener(Runnable r) { + if (state.get() != 0) { + this.r = r; + return; } r.run(); @@ -107,10 +103,6 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { } } - @Override - public void onPatternMessage(CharSequence pattern, CharSequence channel, Object message) { - } - @Override public void onMessage(CharSequence channel, Object msg) { } @@ -175,17 +167,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.UNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.channels) { - Collection entries = RedissonReactiveSubscription.this.channels.get(cn); - for (PubSubConnectionEntry entry : entries) { - if (!entry.hasListeners(cn)) { - entries.remove(entry); - if (entries.isEmpty()) { - RedissonReactiveSubscription.this.channels.remove(cn); - } - } + RedissonReactiveSubscription.this.channels.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; } - } + return entries; + }); }); futures.add(f); } @@ -212,12 +200,13 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { ChannelName cn = toChannelName(channel); CompletableFuture f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE); f = f.whenComplete((res, e) -> { - synchronized (RedissonReactiveSubscription.this.patterns) { - Collection entries = RedissonReactiveSubscription.this.patterns.get(cn); - entries.stream() - .filter(en -> en.hasListeners(cn)) - .forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn)); - } + RedissonReactiveSubscription.this.patterns.computeIfPresent(cn, (key, entries) -> { + entries.removeIf(entry -> !entry.hasListeners(cn)); + if (entries.isEmpty()) { + return null; + } + return entries; + }); }); futures.add(f); }