From 4726cda68a6a8404aa9de61bd005d949cf7e1a9f Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 9 Oct 2018 12:23:58 +0300 Subject: [PATCH] refactoring --- .../java/org/redisson/api/RReadWriteLock.java | 4 +- .../redisson/api/RReadWriteLockReactive.java | 2 +- .../command/CommandReactiveExecutor.java | 10 - .../command/CommandReactiveService.java | 35 --- .../reactive/RedissonExpirableReactive.java | 94 ------ .../reactive/RedissonObjectReactive.java | 269 ------------------ .../RedissonReadWriteLockReactive.java | 7 +- 7 files changed, 8 insertions(+), 413 deletions(-) delete mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java delete mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java diff --git a/redisson/src/main/java/org/redisson/api/RReadWriteLock.java b/redisson/src/main/java/org/redisson/api/RReadWriteLock.java index 3606c71f0..7ffe41af3 100644 --- a/redisson/src/main/java/org/redisson/api/RReadWriteLock.java +++ b/redisson/src/main/java/org/redisson/api/RReadWriteLock.java @@ -31,13 +31,14 @@ import java.util.concurrent.locks.ReadWriteLock; * @author Nikita Koksharov * */ -public interface RReadWriteLock extends ReadWriteLock, RExpirable { +public interface RReadWriteLock extends ReadWriteLock { /** * Returns the lock used for reading. * * @return the lock used for reading */ + @Override RLock readLock(); /** @@ -45,6 +46,7 @@ public interface RReadWriteLock extends ReadWriteLock, RExpirable { * * @return the lock used for writing */ + @Override RLock writeLock(); } diff --git a/redisson/src/main/java/org/redisson/api/RReadWriteLockReactive.java b/redisson/src/main/java/org/redisson/api/RReadWriteLockReactive.java index ea68964c0..4c41410aa 100644 --- a/redisson/src/main/java/org/redisson/api/RReadWriteLockReactive.java +++ b/redisson/src/main/java/org/redisson/api/RReadWriteLockReactive.java @@ -30,7 +30,7 @@ import java.util.concurrent.locks.Lock; * @author Nikita Koksharov * */ -public interface RReadWriteLockReactive extends RExpirableReactive { +public interface RReadWriteLockReactive { /** * Returns the lock used for reading. diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java b/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java index ab4b6930a..2180e989c 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java @@ -15,12 +15,8 @@ */ package org.redisson.command; -import java.util.List; - import org.reactivestreams.Publisher; import org.redisson.api.RFuture; -import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommand; import reactor.fn.Supplier; @@ -33,10 +29,4 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor { Publisher reactive(Supplier> supplier); - Publisher evalWriteReactive(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); - - Publisher writeReactive(String key, Codec codec, RedisCommand command, Object ... params); - - Publisher readReactive(String key, Codec codec, RedisCommand command, Object ... params); - } diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveService.java b/redisson/src/main/java/org/redisson/command/CommandReactiveService.java index 785e7f8b7..dc36e7cd0 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveService.java +++ b/redisson/src/main/java/org/redisson/command/CommandReactiveService.java @@ -15,12 +15,8 @@ */ package org.redisson.command; -import java.util.List; - import org.reactivestreams.Publisher; import org.redisson.api.RFuture; -import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; import org.redisson.reactive.NettyFuturePublisher; @@ -41,35 +37,4 @@ public class CommandReactiveService extends CommandAsyncService implements Comma return new NettyFuturePublisher(supplier); } - @Override - public Publisher writeReactive(final String key, final Codec codec, final RedisCommand command, final Object ... params) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return writeAsync(key, codec, command, params); - }; - }); - } - - @Override - public Publisher readReactive(final String key, final Codec codec, final RedisCommand command, final Object ... params) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return readAsync(key, codec, command, params); - }; - }); - } - - @Override - public Publisher evalWriteReactive(final String key, final Codec codec, final RedisCommand evalCommandType, - final String script, final List keys, final Object... params) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return evalWriteAsync(key, codec, evalCommandType, script, keys, params); - }; - }); - } - } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java deleted file mode 100644 index 604395f12..000000000 --- a/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.reactive; - -import java.util.Date; -import java.util.concurrent.TimeUnit; - -import org.reactivestreams.Publisher; -import org.redisson.api.RExpirableAsync; -import org.redisson.api.RExpirableReactive; -import org.redisson.api.RFuture; -import org.redisson.client.codec.Codec; -import org.redisson.command.CommandReactiveExecutor; - -import reactor.fn.Supplier; - -/** - * - * @author Nikita Koksharov - * - */ -abstract class RedissonExpirableReactive extends RedissonObjectReactive implements RExpirableReactive { - - protected final RExpirableAsync instance; - - RedissonExpirableReactive(CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) { - super(connectionManager, name, instance); - this.instance = instance; - } - - RedissonExpirableReactive(Codec codec, CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) { - super(codec, connectionManager, name, instance); - this.instance = instance; - } - - @Override - public Publisher expire(final long timeToLive, final TimeUnit timeUnit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.expireAsync(timeToLive, timeUnit); - } - }); - } - - @Override - public Publisher expireAt(final long timestamp) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.expireAtAsync(timestamp); - } - }); - } - - @Override - public Publisher expireAt(Date timestamp) { - return expireAt(timestamp.getTime()); - } - - @Override - public Publisher clearExpire() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.clearExpireAsync(); - } - }); - } - - @Override - public Publisher remainTimeToLive() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.remainTimeToLiveAsync(); - } - }); - } - -} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java deleted file mode 100644 index 1da539cd2..000000000 --- a/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java +++ /dev/null @@ -1,269 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.reactive; - -import java.io.IOException; -import java.util.Collection; -import java.util.concurrent.TimeUnit; - -import org.reactivestreams.Publisher; -import org.redisson.RedissonReference; -import org.redisson.api.RFuture; -import org.redisson.api.RObjectAsync; -import org.redisson.api.RObjectReactive; -import org.redisson.client.codec.Codec; -import org.redisson.command.CommandReactiveExecutor; -import org.redisson.misc.RedissonObjectFactory; - -import io.netty.buffer.ByteBuf; -import reactor.fn.Supplier; -import reactor.rx.Stream; -import reactor.rx.Streams; - -/** - * Base Redisson object - * - * @author Nikita Koksharov - * - */ -abstract class RedissonObjectReactive implements RObjectReactive { - - final CommandReactiveExecutor commandExecutor; - private final String name; - final Codec codec; - protected RObjectAsync instance; - - public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RObjectAsync instance) { - this.codec = codec; - this.name = name; - this.commandExecutor = commandExecutor; - this.instance = instance; - } - - public Publisher reactive(Supplier> supplier) { - return commandExecutor.reactive(supplier); - } - - public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name, RObjectAsync instance) { - this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, instance); - } - - protected Stream newSucceeded(V result) { - return Streams.just(result); - } - - @Override - public String getName() { - return name; - } - - @Override - public Codec getCodec() { - return codec; - } - - protected void encode(Collection params, Collection values) { - for (Object object : values) { - params.add(encode(object)); - } - } - - protected ByteBuf encode(Object value) { - if (commandExecutor.isRedissonReferenceSupportEnabled()) { - RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value); - if (reference != null) { - value = reference; - } - } - - try { - return codec.getValueEncoder().encode(value); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } - } - - protected ByteBuf encodeMapKey(Object value) { - if (commandExecutor.isRedissonReferenceSupportEnabled()) { - RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value); - if (reference != null) { - value = reference; - } - } - - try { - return codec.getMapKeyEncoder().encode(value); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } - } - - protected ByteBuf encodeMapValue(Object value) { - if (commandExecutor.isRedissonReferenceSupportEnabled()) { - RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value); - if (reference != null) { - value = reference; - } - } - - try { - return codec.getMapValueEncoder().encode(value); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public Publisher restore(final byte[] state) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.restoreAsync(state); - } - }); - } - - @Override - public Publisher restore(final byte[] state, final long timeToLive, final TimeUnit timeUnit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.restoreAsync(state, timeToLive, timeUnit); - } - }); - } - - @Override - public Publisher restoreAndReplace(final byte[] state) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.restoreAndReplaceAsync(state); - } - }); - } - - @Override - public Publisher restoreAndReplace(final byte[] state, final long timeToLive, final TimeUnit timeUnit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.restoreAndReplaceAsync(state, timeToLive, timeUnit); - } - }); - } - - @Override - public Publisher dump() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.dumpAsync(); - } - }); - } - - @Override - public Publisher touch() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.touchAsync(); - } - }); - } - - @Override - public Publisher unlink() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.unlinkAsync(); - } - }); - } - - @Override - public Publisher copy(final String host, final int port, final int database, final long timeout) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.copyAsync(host, port, database, timeout); - } - }); - } - - @Override - public Publisher rename(final String newName) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.renameAsync(newName); - } - }); - } - - @Override - public Publisher migrate(final String host, final int port, final int database, final long timeout) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.migrateAsync(host, port, database, timeout); - } - }); - } - - @Override - public Publisher move(final int database) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.moveAsync(database); - } - }); - } - - @Override - public Publisher renamenx(final String newName) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.renamenxAsync(newName); - } - }); - } - - @Override - public Publisher delete() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.deleteAsync(); - } - }); - } - - @Override - public Publisher isExists() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.isExistsAsync(); - } - }); - } - -} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java index 84a46fb77..2eaeecb47 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java @@ -26,13 +26,14 @@ import org.redisson.command.CommandReactiveExecutor; * @author Nikita Koksharov * */ -public class RedissonReadWriteLockReactive extends RedissonExpirableReactive implements RReadWriteLockReactive { +public class RedissonReadWriteLockReactive implements RReadWriteLockReactive { private final RReadWriteLock instance; + private final CommandReactiveExecutor commandExecutor; public RedissonReadWriteLockReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name, new RedissonReadWriteLock(commandExecutor, name)); - this.instance = (RReadWriteLock) super.instance; + this.commandExecutor = commandExecutor; + this.instance = new RedissonReadWriteLock(commandExecutor, name); } @Override