refactoring

pull/2041/head
Nikita Koksharov 6 years ago
parent 3509434b25
commit d5a7a4ce84

@ -54,6 +54,7 @@ import io.netty.buffer.ByteBufOutputStream;
*/ */
public class FstCodec extends BaseCodec { public class FstCodec extends BaseCodec {
@SuppressWarnings("AvoidInlineConditionals")
static class FSTMapSerializerV2 extends FSTBasicObjectSerializer { static class FSTMapSerializerV2 extends FSTBasicObjectSerializer {
@Override @Override

@ -15,7 +15,7 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.util.function.Supplier; import java.util.concurrent.Callable;
import org.redisson.api.BatchOptions; import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult; import org.redisson.api.BatchResult;
@ -46,15 +46,15 @@ public class CommandReactiveBatchService extends CommandReactiveService {
} }
@Override @Override
public <R> Mono<R> reactive(Supplier<RFuture<R>> supplier) { public <R> Mono<R> reactive(Callable<RFuture<R>> supplier) {
Mono<R> mono = super.reactive(new Supplier<RFuture<R>>() { Mono<R> mono = super.reactive(new Callable<RFuture<R>>() {
volatile RFuture<R> future; volatile RFuture<R> future;
@Override @Override
public RFuture<R> get() { public RFuture<R> call() throws Exception {
if (future == null) { if (future == null) {
synchronized (this) { synchronized (this) {
if (future == null) { if (future == null) {
future = supplier.get(); future = supplier.call();
} }
} }
} }

@ -15,7 +15,7 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.util.function.Supplier; import java.util.concurrent.Callable;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
@ -29,6 +29,6 @@ import reactor.core.publisher.Mono;
*/ */
public interface CommandReactiveExecutor extends CommandAsyncExecutor { public interface CommandReactiveExecutor extends CommandAsyncExecutor {
<R> Mono<R> reactive(Supplier<RFuture<R>> supplier); <R> Mono<R> reactive(Callable<RFuture<R>> supplier);
} }

@ -15,7 +15,7 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.util.function.Supplier; import java.util.concurrent.Callable;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncService; import org.redisson.command.CommandAsyncService;
@ -36,10 +36,16 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
} }
@Override @Override
public <R> Mono<R> reactive(Supplier<RFuture<R>> supplier) { public <R> Mono<R> reactive(Callable<RFuture<R>> supplier) {
return Flux.<R>create(emitter -> { return Flux.<R>create(emitter -> {
emitter.onRequest(n -> { emitter.onRequest(n -> {
RFuture<R> future = supplier.get(); RFuture<R> future;
try {
future = supplier.call();
} catch (Exception e) {
emitter.error(e);
return;
}
future.onComplete((v, e) -> { future.onComplete((v, e) -> {
if (e != null) { if (e != null) {
emitter.error(e); emitter.error(e);

@ -16,7 +16,7 @@
package org.redisson.reactive; package org.redisson.reactive;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.function.Supplier; import java.util.concurrent.Callable;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.misc.ProxyBuilder; import org.redisson.misc.ProxyBuilder;
@ -37,15 +37,11 @@ public class ReactiveProxyBuilder {
return ProxyBuilder.create(new Callback() { return ProxyBuilder.create(new Callback() {
@Override @Override
public Object execute(Method mm, Object instance, Method instanceMethod, Object[] args) { public Object execute(Method mm, Object instance, Method instanceMethod, Object[] args) {
return commandExecutor.reactive(new Supplier<RFuture<Object>>() { return commandExecutor.reactive(new Callable<RFuture<Object>>() {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public RFuture<Object> get() { public RFuture<Object> call() throws Exception {
try {
return (RFuture<Object>) mm.invoke(instance, args); return (RFuture<Object>) mm.invoke(instance, args);
} catch (Exception e) {
throw new IllegalStateException(e);
}
} }
}); });
} }

@ -15,10 +15,7 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.util.function.Supplier;
import org.redisson.api.RBucketReactive; import org.redisson.api.RBucketReactive;
import org.redisson.api.RFuture;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RMapCache; import org.redisson.api.RMapCache;
import org.redisson.api.RMapCacheReactive; import org.redisson.api.RMapCacheReactive;
@ -118,22 +115,12 @@ public class RedissonTransactionReactive implements RTransactionReactive {
@Override @Override
public Mono<Void> commit() { public Mono<Void> commit() {
return executorService.reactive(new Supplier<RFuture<Void>>() { return executorService.reactive(() -> transaction.commitAsync());
@Override
public RFuture<Void> get() {
return transaction.commitAsync();
}
});
} }
@Override @Override
public Mono<Void> rollback() { public Mono<Void> rollback() {
return executorService.reactive(new Supplier<RFuture<Void>>() { return executorService.reactive(() -> transaction.rollbackAsync());
@Override
public RFuture<Void> get() {
return transaction.rollbackAsync();
}
});
} }
} }

@ -43,7 +43,13 @@ public class CommandRxService extends CommandAsyncService implements CommandRxEx
return p.doOnRequest(new LongConsumer() { return p.doOnRequest(new LongConsumer() {
@Override @Override
public void accept(long t) throws Exception { public void accept(long t) throws Exception {
RFuture<R> future = supplier.call(); RFuture<R> future;
try {
future = supplier.call();
} catch (Exception e) {
p.onError(e);
return;
}
future.onComplete((res, e) -> { future.onComplete((res, e) -> {
if (e != null) { if (e != null) {
p.onError(e); p.onError(e);

@ -15,7 +15,6 @@
*/ */
package org.redisson.rx; package org.redisson.rx;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.redisson.RedissonAtomicDouble; import org.redisson.RedissonAtomicDouble;
@ -51,7 +50,6 @@ import org.redisson.api.RBlockingDequeRx;
import org.redisson.api.RBlockingQueueRx; import org.redisson.api.RBlockingQueueRx;
import org.redisson.api.RBucketRx; import org.redisson.api.RBucketRx;
import org.redisson.api.RDequeRx; import org.redisson.api.RDequeRx;
import org.redisson.api.RFuture;
import org.redisson.api.RGeoRx; import org.redisson.api.RGeoRx;
import org.redisson.api.RHyperLogLogRx; import org.redisson.api.RHyperLogLogRx;
import org.redisson.api.RKeysRx; import org.redisson.api.RKeysRx;
@ -74,7 +72,6 @@ import org.redisson.api.RedissonRxClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler; import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.RedissonPromise;
import io.reactivex.Maybe; import io.reactivex.Maybe;
@ -298,16 +295,7 @@ public class RedissonBatchRx implements RBatchRx {
@Override @Override
public Maybe<BatchResult<?>> execute() { public Maybe<BatchResult<?>> execute() {
return commandExecutor.flowable(new Callable<RFuture<BatchResult<?>>>() { return commandExecutor.flowable(() -> executorService.executeAsync(options)).singleElement();
@Override
public RFuture<BatchResult<?>> call() {
try {
return executorService.executeAsync(options);
} catch (Exception e) {
return RedissonPromise.newFailedFuture(e);
}
}
}).singleElement();
} }
public RBatchRx atomic() { public RBatchRx atomic() {

@ -42,14 +42,9 @@ public class RxProxyBuilder {
@Override @Override
public Object execute(Method mm, Object instance, Method instanceMethod, Object[] args) { public Object execute(Method mm, Object instance, Method instanceMethod, Object[] args) {
Flowable<Object> flowable = commandExecutor.flowable(new Callable<RFuture<Object>>() { Flowable<Object> flowable = commandExecutor.flowable(new Callable<RFuture<Object>>() {
@SuppressWarnings("unchecked")
@Override @Override
public RFuture<Object> call() { public RFuture<Object> call() throws Exception {
try {
return (RFuture<Object>) mm.invoke(instance, args); return (RFuture<Object>) mm.invoke(instance, args);
} catch (Exception e) {
throw new IllegalStateException(e);
}
} }
}); });

Loading…
Cancel
Save