Fixed - ClassCastException during RBatchReactive and RBatchRx execution in exectionMode = REDIS_WRITE_ATOMIC or REDIS_READ_ATOMIC. #1661

pull/2009/head
Nikita Koksharov 6 years ago
parent c0729ae51a
commit bd74e07fdb

@ -188,7 +188,7 @@ public class CommandBatchService extends CommandAsyncService {
AsyncSemaphore semaphore = new AsyncSemaphore(0); AsyncSemaphore semaphore = new AsyncSemaphore(0);
@Override @Override
protected <R> RPromise<R> createPromise() { public <R> RPromise<R> createPromise() {
if (isRedisBasedQueue()) { if (isRedisBasedQueue()) {
return new BatchPromise<R>(executed); return new BatchPromise<R>(executed);
} }

@ -15,11 +15,8 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.api.BatchOptions; import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult; import org.redisson.api.BatchResult;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
@ -32,7 +29,6 @@ import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
/** /**
@ -43,22 +39,35 @@ import reactor.core.publisher.Mono;
public class CommandReactiveBatchService extends CommandReactiveService { public class CommandReactiveBatchService extends CommandReactiveService {
private final CommandBatchService batchService; private final CommandBatchService batchService;
private final Queue<Publisher<?>> publishers = new ConcurrentLinkedQueue<Publisher<?>>();
public CommandReactiveBatchService(ConnectionManager connectionManager) { public CommandReactiveBatchService(ConnectionManager connectionManager, BatchOptions options) {
super(connectionManager); super(connectionManager);
batchService = new CommandBatchService(connectionManager); batchService = new CommandBatchService(connectionManager, options);
} }
@Override @Override
public <R> Mono<R> reactive(Supplier<RFuture<R>> supplier) { public <R> Mono<R> reactive(Supplier<RFuture<R>> supplier) {
Mono<R> publisher = super.reactive(supplier); Mono<R> mono = super.reactive(new Supplier<RFuture<R>>() {
publishers.add(publisher); volatile RFuture<R> future;
return publisher; @Override
public RFuture<R> get() {
if (future == null) {
synchronized (this) {
if (future == null) {
future = supplier.get();
}
}
}
return future;
}
});
mono.subscribe();
return mono;
} }
public <R> Publisher<R> superReactive(Supplier<RFuture<R>> supplier) { @Override
return super.reactive(supplier); protected <R> RPromise<R> createPromise() {
return batchService.createPromise();
} }
@Override @Override
@ -68,10 +77,6 @@ public class CommandReactiveBatchService extends CommandReactiveService {
} }
public RFuture<BatchResult<?>> executeAsync(BatchOptions options) { public RFuture<BatchResult<?>> executeAsync(BatchOptions options) {
for (Publisher<?> publisher : publishers) {
Flux.from(publisher).subscribe();
}
return batchService.executeAsync(options); return batchService.executeAsync(options);
} }

@ -16,7 +16,6 @@
package org.redisson.reactive; package org.redisson.reactive;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.redisson.RedissonAtomicDouble; import org.redisson.RedissonAtomicDouble;
import org.redisson.RedissonAtomicLong; import org.redisson.RedissonAtomicLong;
@ -51,7 +50,6 @@ import org.redisson.api.RBlockingDequeReactive;
import org.redisson.api.RBlockingQueueReactive; import org.redisson.api.RBlockingQueueReactive;
import org.redisson.api.RBucketReactive; import org.redisson.api.RBucketReactive;
import org.redisson.api.RDequeReactive; import org.redisson.api.RDequeReactive;
import org.redisson.api.RFuture;
import org.redisson.api.RGeoReactive; import org.redisson.api.RGeoReactive;
import org.redisson.api.RHyperLogLogReactive; import org.redisson.api.RHyperLogLogReactive;
import org.redisson.api.RKeysReactive; import org.redisson.api.RKeysReactive;
@ -91,7 +89,7 @@ public class RedissonBatchReactive implements RBatchReactive {
public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, CommandReactiveService commandExecutor, BatchOptions options) { public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, CommandReactiveService commandExecutor, BatchOptions options) {
this.evictionScheduler = evictionScheduler; this.evictionScheduler = evictionScheduler;
this.executorService = new CommandReactiveBatchService(connectionManager); this.executorService = new CommandReactiveBatchService(connectionManager, options);
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
this.options = options; this.options = options;
} }
@ -287,12 +285,7 @@ public class RedissonBatchReactive implements RBatchReactive {
@Override @Override
public Mono<BatchResult<?>> execute() { public Mono<BatchResult<?>> execute() {
return commandExecutor.reactive(new Supplier<RFuture<BatchResult<?>>>() { return commandExecutor.reactive(() -> executorService.executeAsync(options));
@Override
public RFuture<BatchResult<?>> get() {
return executorService.executeAsync(options);
}
});
} }
public RBatchReactive atomic() { public RBatchReactive atomic() {

@ -15,9 +15,7 @@
*/ */
package org.redisson.rx; package org.redisson.rx;
import java.util.Queue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.redisson.api.BatchOptions; import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult; import org.redisson.api.BatchResult;
@ -41,17 +39,16 @@ import io.reactivex.Flowable;
public class CommandRxBatchService extends CommandRxService { public class CommandRxBatchService extends CommandRxService {
private final CommandBatchService batchService; private final CommandBatchService batchService;
private final Queue<Flowable<?>> publishers = new ConcurrentLinkedQueue<>();
public CommandRxBatchService(ConnectionManager connectionManager) { public CommandRxBatchService(ConnectionManager connectionManager, BatchOptions options) {
super(connectionManager); super(connectionManager);
batchService = new CommandBatchService(connectionManager); batchService = new CommandBatchService(connectionManager, options);
} }
@Override @Override
public <R> Flowable<R> flowable(Callable<RFuture<R>> supplier) { public <R> Flowable<R> flowable(Callable<RFuture<R>> supplier) {
Flowable<R> flowable = super.flowable(new Callable<RFuture<R>>() { Flowable<R> flowable = super.flowable(new Callable<RFuture<R>>() {
RFuture<R> future; volatile RFuture<R> future;
@Override @Override
public RFuture<R> call() throws Exception { public RFuture<R> call() throws Exception {
if (future == null) { if (future == null) {
@ -64,10 +61,15 @@ public class CommandRxBatchService extends CommandRxService {
return future; return future;
} }
}); });
publishers.add(flowable); flowable.subscribe();
return flowable; return flowable;
} }
@Override
protected <R> RPromise<R> createPromise() {
return batchService.createPromise();
}
@Override @Override
public <V, R> void async(boolean readOnlyMode, NodeSource nodeSource, public <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect) { Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect) {
@ -75,10 +77,6 @@ public class CommandRxBatchService extends CommandRxService {
} }
public RFuture<BatchResult<?>> executeAsync(BatchOptions options) { public RFuture<BatchResult<?>> executeAsync(BatchOptions options) {
for (Flowable<?> element : publishers) {
element.subscribe();
}
return batchService.executeAsync(options); return batchService.executeAsync(options);
} }

@ -92,7 +92,7 @@ public class RedissonBatchRx implements RBatchRx {
public RedissonBatchRx(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, CommandRxExecutor commandExecutor, BatchOptions options) { public RedissonBatchRx(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, CommandRxExecutor commandExecutor, BatchOptions options) {
this.evictionScheduler = evictionScheduler; this.evictionScheduler = evictionScheduler;
this.executorService = new CommandRxBatchService(connectionManager); this.executorService = new CommandRxBatchService(connectionManager, options);
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
this.options = options; this.options = options;
} }

@ -53,7 +53,7 @@ public class RedissonBatchRxTest extends BaseRxTest {
public static Iterable<Object[]> data() { public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] { return Arrays.asList(new Object[][] {
{BatchOptions.defaults().executionMode(ExecutionMode.IN_MEMORY)}, {BatchOptions.defaults().executionMode(ExecutionMode.IN_MEMORY)},
// {BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC)} {BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC)}
}); });
} }
@ -131,7 +131,7 @@ public class RedissonBatchRxTest extends BaseRxTest {
} }
} }
// @Test @Test
public void testConnectionLeakAfterError() throws InterruptedException { public void testConnectionLeakAfterError() throws InterruptedException {
Config config = BaseTest.createConfig(); Config config = BaseTest.createConfig();
config.useSingleServer() config.useSingleServer()
@ -143,7 +143,7 @@ public class RedissonBatchRxTest extends BaseRxTest {
BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC); BatchOptions batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
RBatchRx batch = redisson.createBatch(batchOptions); RBatchRx batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 3000; i++) { for (int i = 0; i < 300000; i++) {
batch.getBucket("test").set(123); batch.getBucket("test").set(123);
} }
@ -227,7 +227,7 @@ public class RedissonBatchRxTest extends BaseRxTest {
} }
@Test @Test
public void testWriteTimeout() { public void testWriteTimeout() throws InterruptedException {
Config config = BaseTest.createConfig(); Config config = BaseTest.createConfig();
config.useSingleServer().setTimeout(3000); config.useSingleServer().setTimeout(3000);
RedissonRxClient redisson = Redisson.createRx(config); RedissonRxClient redisson = Redisson.createRx(config);
@ -236,9 +236,11 @@ public class RedissonBatchRxTest extends BaseRxTest {
RMapCacheRx<String, String> map = batch.getMapCache("test"); RMapCacheRx<String, String> map = batch.getMapCache("test");
int total = 200000; int total = 200000;
for (int i = 0; i < total; i++) { for (int i = 0; i < total; i++) {
Maybe<String> f = map.put("" + i, "" + i, 5, TimeUnit.MINUTES); map.put("" + i, "" + i, 5, TimeUnit.MINUTES);
if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) {
f.blockingGet(); if (i % 100 == 0) {
Thread.sleep(5);
}
} }
} }

Loading…
Cancel
Save