Merge pull request #5100 from wynn5a/master

Fix ClassCastException from MonoNext to Flux
pull/5108/head
Nikita Koksharov 2 years ago committed by GitHub
commit 66fcfa5642
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -172,7 +172,7 @@ public class RedissonLock extends RedissonBaseLock {
return new CompletableFutureWrapper<>(f);
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);

@ -21,6 +21,8 @@ import java.util.concurrent.Callable;
import org.redisson.api.RFuture;
import org.redisson.misc.ProxyBuilder;
import org.redisson.misc.ProxyBuilder.Callback;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
*
@ -37,7 +39,12 @@ public class ReactiveProxyBuilder {
return ProxyBuilder.create(new Callback() {
@Override
public Object execute(Callable<RFuture<Object>> callable, Method instanceMethod) {
return commandExecutor.reactive(callable);
Mono<Object> result = commandExecutor.reactive(callable);
if (instanceMethod.getReturnType().isAssignableFrom(Flux.class)) {
Mono<Iterable> monoListResult = result.cast(Iterable.class);
return monoListResult.flatMapMany(Flux::fromIterable);
}
return result;
}
}, instance, implementation, clazz, commandExecutor.getServiceManager());
}

@ -124,4 +124,21 @@ public class RedissonDequeReactiveTest extends BaseReactiveTest {
assertThat(toIterator(queue.descendingIterator())).toIterable().containsExactly(3, 2, 1);
}
@Test
public void testPollLast() {
final RDequeReactive<Integer> queue = redisson.getDeque("deque");
sync(queue.addAll(Arrays.asList(1, 2, 3)));
assertThat(toIterator(queue.pollLast(2))).toIterable().containsExactly(3, 2);
}
@Test
public void testPollFirst() {
final RDequeReactive<Integer> queue = redisson.getDeque("deque");
sync(queue.addAll(Arrays.asList(1, 2, 3)));
assertThat(toIterator(queue.pollFirst(2))).toIterable().containsExactly(1, 2);
}
}

Loading…
Cancel
Save