Merge branch 'master' into 3.0.0

pull/1821/head
Nikita Koksharov 6 years ago
commit 9af57b7b81

@ -50,6 +50,7 @@ import org.redisson.api.RSetCacheRx;
import org.redisson.api.RSetMultimapRx;
import org.redisson.api.RSetRx;
import org.redisson.api.RStreamRx;
import org.redisson.api.RTopic;
import org.redisson.api.RTopicRx;
import org.redisson.api.RTransactionRx;
import org.redisson.api.RedissonRxClient;
@ -75,6 +76,7 @@ import org.redisson.rx.RedissonScoredSortedSetRx;
import org.redisson.rx.RedissonSetCacheRx;
import org.redisson.rx.RedissonSetMultimapRx;
import org.redisson.rx.RedissonSetRx;
import org.redisson.rx.RedissonTopicRx;
import org.redisson.rx.RedissonTransactionRx;
import org.redisson.rx.RxProxyBuilder;
@ -282,12 +284,14 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RTopicRx getTopic(String name) {
return RxProxyBuilder.create(commandExecutor, new RedissonTopic(commandExecutor, name), RTopicRx.class);
RTopic topic = new RedissonTopic(commandExecutor, name);
return RxProxyBuilder.create(commandExecutor, topic, new RedissonTopicRx(topic), RTopicRx.class);
}
@Override
public RTopicRx getTopic(String name, Codec codec) {
return RxProxyBuilder.create(commandExecutor, new RedissonTopic(codec, commandExecutor, name), RTopicRx.class);
RTopic topic = new RedissonTopic(codec, commandExecutor, name);
return RxProxyBuilder.create(commandExecutor, topic, new RedissonTopicRx(topic), RTopicRx.class);
}
@Override

@ -28,12 +28,12 @@ import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
@ -190,6 +190,61 @@ public class RedissonTopic implements RTopic {
}
@Override
public RFuture<Void> removeListenerAsync(final MessageListener<?> listener) {
final RPromise<Void> promise = new RedissonPromise<Void>();
final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
semaphore.acquire(new Runnable() {
@Override
public void run() {
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
if (entry == null) {
semaphore.release();
promise.trySuccess(null);
return;
}
entry.removeListener(channelName, listener);
if (!entry.hasListeners(channelName)) {
subscribeService.unsubscribe(channelName, semaphore)
.addListener(new TransferListener<Void>(promise));
} else {
semaphore.release();
promise.trySuccess(null);
}
}
});
return promise;
}
@Override
public RFuture<Void> removeListenerAsync(final int listenerId) {
final RPromise<Void> promise = new RedissonPromise<Void>();
final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
semaphore.acquire(new Runnable() {
@Override
public void run() {
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
if (entry == null) {
semaphore.release();
promise.trySuccess(null);
return;
}
entry.removeListener(channelName, listenerId);
if (!entry.hasListeners(channelName)) {
subscribeService.unsubscribe(channelName, semaphore)
.addListener(new TransferListener<Void>(promise));
} else {
semaphore.release();
promise.trySuccess(null);
}
}
});
return promise;
}
@Override
public void removeListener(int listenerId) {
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);

@ -55,5 +55,21 @@ public interface RTopicAsync {
* @see org.redisson.api.listener.MessageListener
*/
<M> RFuture<Integer> addListenerAsync(Class<M> type, MessageListener<M> listener);
/**
* Removes the listener by <code>id</code> for listening this topic
*
* @param listenerId - listener id
* @return void
*/
RFuture<Void> removeListenerAsync(int listenerId);
/**
* Removes the listener by its instance
*
* @param listener - listener instance
* @return void
*/
RFuture<Void> removeListenerAsync(MessageListener<?> listener);
}

@ -73,4 +73,13 @@ public interface RTopicRx {
* @param listenerId - listener id
*/
void removeListener(int listenerId);
/**
* Returns stream of messages.
*
* @param type - type of message to listen
* @return stream of messages
*/
<M> Flowable<M> getMessages(Class<M> type);
}

@ -24,6 +24,7 @@ import org.redisson.connection.ConnectionManager;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.reactivex.Flowable;
import io.reactivex.functions.Action;
import io.reactivex.functions.LongConsumer;
import io.reactivex.processors.ReplayProcessor;
@ -44,15 +45,23 @@ public class CommandRxService extends CommandAsyncService implements CommandRxEx
return p.doOnRequest(new LongConsumer() {
@Override
public void accept(long t) throws Exception {
supplier.call().addListener(new FutureListener<R>() {
RFuture<R> future = supplier.call();
future.addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
public void operationComplete(final Future<R> future) throws Exception {
if (!future.isSuccess()) {
p.onError(future.cause());
return;
}
p.doOnCancel(new Action() {
@Override
public void run() throws Exception {
future.cancel(true);
}
});
if (future.getNow() != null) {
p.onNext(future.getNow());
}

@ -0,0 +1,49 @@
package org.redisson.rx;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RTopicRx;
import io.reactivex.Flowable;
public class RedissonTopicRxTest extends BaseRxTest {
@Test
public void testLong() throws InterruptedException {
RTopicRx topic = redisson.getTopic("test");
Flowable<String> messages = topic.getMessages(String.class);
List<String> list = new ArrayList<>();
messages.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(10);
}
@Override
public void onNext(String t) {
list.add(t);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
for (int i = 0; i < 15; i++) {
sync(topic.publish("" + i));
}
assertThat(list).containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
}
}
Loading…
Cancel
Save