refactoring

pull/4061/head
Nikita Koksharov 3 years ago
parent 2acb278f5f
commit a899702b0e

@ -24,11 +24,10 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.CompletableFutureWrapper;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
@ -278,13 +277,11 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
RPromise<Void> result = new RedissonPromise<>();
CountableListener<Void> listener = new CountableListener<>(result, null, 3);
RPatternTopic setTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:set");
setTopic.removeListenerAsync(listenerId).onComplete(listener);
removeListenersAsync(listenerId, listener);
return result;
RFuture<Void> f1 = setTopic.removeListenerAsync(listenerId);
RFuture<Void> f2 = super.removeListenerAsync(listenerId);
CompletableFuture<Void> f = CompletableFuture.allOf(f1.toCompletableFuture(), f2.toCompletableFuture());
return new CompletableFutureWrapper<>(f);
}
}

@ -31,11 +31,12 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.iterator.RedissonListIterator;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.CountableListener;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import static org.redisson.client.protocol.RedisCommands.*;
@ -965,27 +966,26 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
RPromise<Void> result = new RedissonPromise<>();
CountableListener<Void> listener = new CountableListener<>(result, null, 5);
RPatternTopic addTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:rpush");
addTopic.removeListenerAsync(listenerId).onComplete(listener);
RFuture<Void> f1 = addTopic.removeListenerAsync(listenerId);
RPatternTopic remTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:lrem");
remTopic.removeListenerAsync(listenerId).onComplete(listener);
RFuture<Void> f2 = remTopic.removeListenerAsync(listenerId);
RPatternTopic trimTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:ltrim");
trimTopic.removeListenerAsync(listenerId).onComplete(listener);
RFuture<Void> f3 = trimTopic.removeListenerAsync(listenerId);
RPatternTopic setTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:lset");
setTopic.removeListenerAsync(listenerId).onComplete(listener);
RFuture<Void> f4 = setTopic.removeListenerAsync(listenerId);
RPatternTopic insertTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:linsert");
insertTopic.removeListenerAsync(listenerId).onComplete(listener);
RFuture<Void> f5 = insertTopic.removeListenerAsync(listenerId);
removeListenersAsync(listenerId, listener);
RFuture<Void> f6 = super.removeListenerAsync(listenerId);
return result;
CompletableFuture<Void> f = CompletableFuture.allOf(f1.toCompletableFuture(), f2.toCompletableFuture(), f3.toCompletableFuture(),
f4.toCompletableFuture(), f5.toCompletableFuture(), f5.toCompletableFuture(), f6.toCompletableFuture());
return new CompletableFutureWrapper<>(f);
}
@Override

@ -22,12 +22,11 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CountableListener;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.Hash;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
@ -430,18 +429,13 @@ public abstract class RedissonObject implements RObject {
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
RPromise<Void> result = new RedissonPromise<>();
CountableListener<Void> listener = new CountableListener<>(result, null, 2);
removeListenersAsync(listenerId, listener);
return result;
}
protected final void removeListenersAsync(int listenerId, CountableListener<Void> listener) {
RPatternTopic expiredTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:expired");
expiredTopic.removeListenerAsync(listenerId).onComplete(listener);
RFuture<Void> f1 = expiredTopic.removeListenerAsync(listenerId);
RPatternTopic deletedTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:del");
deletedTopic.removeListenerAsync(listenerId).onComplete(listener);
RFuture<Void> f2 = deletedTopic.removeListenerAsync(listenerId);
CompletableFuture<Void> f = CompletableFuture.allOf(f1.toCompletableFuture(), f2.toCompletableFuture());
return new CompletableFutureWrapper<>(f);
}
}

@ -26,13 +26,13 @@ import org.redisson.client.protocol.ScoredEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RedissonPromise;
import java.math.BigDecimal;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
@ -1570,13 +1570,11 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
RPromise<Void> result = new RedissonPromise<>();
CountableListener<Void> listener = new CountableListener<>(result, null, 3);
RPatternTopic setTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:zadd");
setTopic.removeListenerAsync(listenerId).onComplete(listener);
removeListenersAsync(listenerId, listener);
return result;
RFuture<Void> f1 = setTopic.removeListenerAsync(listenerId);
RFuture<Void> f2 = super.removeListenerAsync(listenerId);
CompletableFuture<Void> f = CompletableFuture.allOf(f1.toCompletableFuture(), f2.toCompletableFuture());
return new CompletableFutureWrapper<>(f);
}
}

@ -1,74 +0,0 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
/**
*
* @author Nikita Koksharov
*
*/
public class CountableListener<T> implements BiConsumer<Object, Throwable> {
protected final AtomicInteger counter = new AtomicInteger();
protected final RPromise<T> result;
protected final T value;
public CountableListener() {
this(null, null);
}
public CountableListener(RPromise<T> result, T value) {
this(result, value, 0);
}
public CountableListener(RPromise<T> result, T value, int count) {
this.result = result;
this.value = value;
this.counter.set(count);
}
public void setCounter(int newValue) {
counter.set(newValue);
}
public void decCounter() {
if (counter.decrementAndGet() == 0) {
onSuccess(value);
if (result != null) {
result.trySuccess(value);
}
}
}
protected void onSuccess(T value) {
}
@Override
public void accept(Object t, Throwable u) {
if (u != null) {
if (result != null) {
result.tryFailure(u);
}
return;
}
decCounter();
}
}
Loading…
Cancel
Save