refactoring

pull/1888/head
Nikita Koksharov 6 years ago
parent b34a9da9f3
commit 53bd9dff68

@ -220,7 +220,7 @@ public class RedissonTopic implements RTopic {
} }
@Override @Override
public RFuture<Void> removeListenerAsync(final int listenerId) { public RFuture<Void> removeListenerAsync(final Integer... listenerIds) {
final RPromise<Void> promise = new RedissonPromise<Void>(); final RPromise<Void> promise = new RedissonPromise<Void>();
final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
semaphore.acquire(new Runnable() { semaphore.acquire(new Runnable() {
@ -233,7 +233,9 @@ public class RedissonTopic implements RTopic {
return; return;
} }
entry.removeListener(channelName, listenerId); for (int id : listenerIds) {
entry.removeListener(channelName, id);
}
if (!entry.hasListeners(channelName)) { if (!entry.hasListeners(channelName)) {
subscribeService.unsubscribe(channelName, semaphore) subscribeService.unsubscribe(channelName, semaphore)
.addListener(new TransferListener<Void>(promise)); .addListener(new TransferListener<Void>(promise));
@ -247,7 +249,7 @@ public class RedissonTopic implements RTopic {
} }
@Override @Override
public void removeListener(int listenerId) { public void removeListener(Integer... listenerIds) {
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
acquire(semaphore); acquire(semaphore);
@ -257,7 +259,9 @@ public class RedissonTopic implements RTopic {
return; return;
} }
entry.removeListener(channelName, listenerId); for (int id : listenerIds) {
entry.removeListener(channelName, id);
}
if (!entry.hasListeners(channelName)) { if (!entry.hasListeners(channelName)) {
subscribeService.unsubscribe(channelName, semaphore).syncUninterruptibly(); subscribeService.unsubscribe(channelName, semaphore).syncUninterruptibly();
} else { } else {

@ -77,7 +77,7 @@ public interface RTopic extends RTopicAsync {
* *
* @param listenerId - listener id * @param listenerId - listener id
*/ */
void removeListener(int listenerId); void removeListener(Integer... listenerIds);
/** /**
* Removes all listeners from this topic * Removes all listeners from this topic

@ -62,7 +62,7 @@ public interface RTopicAsync {
* @param listenerId - listener id * @param listenerId - listener id
* @return void * @return void
*/ */
RFuture<Void> removeListenerAsync(int listenerId); RFuture<Void> removeListenerAsync(Integer... listenerIds);
/** /**
* Removes the listener by its instance * Removes the listener by its instance

@ -16,9 +16,11 @@
package org.redisson.cache; package org.redisson.cache;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -278,12 +280,14 @@ public abstract class LocalCacheListener {
} }
public void remove() { public void remove() {
List<Integer> ids = new ArrayList<Integer>(2);
if (syncListenerId != 0) { if (syncListenerId != 0) {
invalidationTopic.removeListener(syncListenerId); ids.add(syncListenerId);
} }
if (reconnectionListenerId != 0) { if (reconnectionListenerId != 0) {
invalidationTopic.removeListener(reconnectionListenerId); ids.add(reconnectionListenerId);
} }
invalidationTopic.removeListenerAsync(ids.toArray(new Integer[ids.size()]));
} }
public String getUpdatesLogName() { public String getUpdatesLogName() {

Loading…
Cancel
Save