refactoring

pull/1705/head
Nikita 7 years ago
parent 48197a918c
commit 4726cda68a

@ -31,13 +31,14 @@ import java.util.concurrent.locks.ReadWriteLock;
* @author Nikita Koksharov
*
*/
public interface RReadWriteLock extends ReadWriteLock, RExpirable {
public interface RReadWriteLock extends ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
@Override
RLock readLock();
/**
@ -45,6 +46,7 @@ public interface RReadWriteLock extends ReadWriteLock, RExpirable {
*
* @return the lock used for writing
*/
@Override
RLock writeLock();
}

@ -30,7 +30,7 @@ import java.util.concurrent.locks.Lock;
* @author Nikita Koksharov
*
*/
public interface RReadWriteLockReactive extends RExpirableReactive {
public interface RReadWriteLockReactive {
/**
* Returns the lock used for reading.

@ -15,12 +15,8 @@
*/
package org.redisson.command;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import reactor.fn.Supplier;
@ -33,10 +29,4 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor {
<R> Publisher<R> reactive(Supplier<RFuture<R>> supplier);
<T, R> Publisher<R> evalWriteReactive(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> Publisher<R> writeReactive(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Publisher<R> readReactive(String key, Codec codec, RedisCommand<T> command, Object ... params);
}

@ -15,12 +15,8 @@
*/
package org.redisson.command;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import org.redisson.reactive.NettyFuturePublisher;
@ -41,35 +37,4 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
return new NettyFuturePublisher<R>(supplier);
}
@Override
public <T, R> Publisher<R> writeReactive(final String key, final Codec codec, final RedisCommand<T> command, final Object ... params) {
return reactive(new Supplier<RFuture<R>>() {
@Override
public RFuture<R> get() {
return writeAsync(key, codec, command, params);
};
});
}
@Override
public <T, R> Publisher<R> readReactive(final String key, final Codec codec, final RedisCommand<T> command, final Object ... params) {
return reactive(new Supplier<RFuture<R>>() {
@Override
public RFuture<R> get() {
return readAsync(key, codec, command, params);
};
});
}
@Override
public <T, R> Publisher<R> evalWriteReactive(final String key, final Codec codec, final RedisCommand<T> evalCommandType,
final String script, final List<Object> keys, final Object... params) {
return reactive(new Supplier<RFuture<R>>() {
@Override
public RFuture<R> get() {
return evalWriteAsync(key, codec, evalCommandType, script, keys, params);
};
});
}
}

@ -1,94 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.api.RExpirableAsync;
import org.redisson.api.RExpirableReactive;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
/**
*
* @author Nikita Koksharov
*
*/
abstract class RedissonExpirableReactive extends RedissonObjectReactive implements RExpirableReactive {
protected final RExpirableAsync instance;
RedissonExpirableReactive(CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) {
super(connectionManager, name, instance);
this.instance = instance;
}
RedissonExpirableReactive(Codec codec, CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) {
super(codec, connectionManager, name, instance);
this.instance = instance;
}
@Override
public Publisher<Boolean> expire(final long timeToLive, final TimeUnit timeUnit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.expireAsync(timeToLive, timeUnit);
}
});
}
@Override
public Publisher<Boolean> expireAt(final long timestamp) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.expireAtAsync(timestamp);
}
});
}
@Override
public Publisher<Boolean> expireAt(Date timestamp) {
return expireAt(timestamp.getTime());
}
@Override
public Publisher<Boolean> clearExpire() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.clearExpireAsync();
}
});
}
@Override
public Publisher<Long> remainTimeToLive() {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.remainTimeToLiveAsync();
}
});
}
}

@ -1,269 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.RedissonReference;
import org.redisson.api.RFuture;
import org.redisson.api.RObjectAsync;
import org.redisson.api.RObjectReactive;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.misc.RedissonObjectFactory;
import io.netty.buffer.ByteBuf;
import reactor.fn.Supplier;
import reactor.rx.Stream;
import reactor.rx.Streams;
/**
* Base Redisson object
*
* @author Nikita Koksharov
*
*/
abstract class RedissonObjectReactive implements RObjectReactive {
final CommandReactiveExecutor commandExecutor;
private final String name;
final Codec codec;
protected RObjectAsync instance;
public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RObjectAsync instance) {
this.codec = codec;
this.name = name;
this.commandExecutor = commandExecutor;
this.instance = instance;
}
public <R> Publisher<R> reactive(Supplier<RFuture<R>> supplier) {
return commandExecutor.reactive(supplier);
}
public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name, RObjectAsync instance) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, instance);
}
protected <V> Stream<V> newSucceeded(V result) {
return Streams.just(result);
}
@Override
public String getName() {
return name;
}
@Override
public Codec getCodec() {
return codec;
}
protected void encode(Collection<Object> params, Collection<?> values) {
for (Object object : values) {
params.add(encode(object));
}
}
protected ByteBuf encode(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
if (reference != null) {
value = reference;
}
}
try {
return codec.getValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
protected ByteBuf encodeMapKey(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
if (reference != null) {
value = reference;
}
}
try {
return codec.getMapKeyEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
protected ByteBuf encodeMapValue(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
if (reference != null) {
value = reference;
}
}
try {
return codec.getMapValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
@Override
public Publisher<Void> restore(final byte[] state) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.restoreAsync(state);
}
});
}
@Override
public Publisher<Void> restore(final byte[] state, final long timeToLive, final TimeUnit timeUnit) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.restoreAsync(state, timeToLive, timeUnit);
}
});
}
@Override
public Publisher<Void> restoreAndReplace(final byte[] state) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.restoreAndReplaceAsync(state);
}
});
}
@Override
public Publisher<Void> restoreAndReplace(final byte[] state, final long timeToLive, final TimeUnit timeUnit) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.restoreAndReplaceAsync(state, timeToLive, timeUnit);
}
});
}
@Override
public Publisher<byte[]> dump() {
return reactive(new Supplier<RFuture<byte[]>>() {
@Override
public RFuture<byte[]> get() {
return instance.dumpAsync();
}
});
}
@Override
public Publisher<Boolean> touch() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.touchAsync();
}
});
}
@Override
public Publisher<Boolean> unlink() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.unlinkAsync();
}
});
}
@Override
public Publisher<Void> copy(final String host, final int port, final int database, final long timeout) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.copyAsync(host, port, database, timeout);
}
});
}
@Override
public Publisher<Void> rename(final String newName) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.renameAsync(newName);
}
});
}
@Override
public Publisher<Void> migrate(final String host, final int port, final int database, final long timeout) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.migrateAsync(host, port, database, timeout);
}
});
}
@Override
public Publisher<Boolean> move(final int database) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.moveAsync(database);
}
});
}
@Override
public Publisher<Boolean> renamenx(final String newName) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.renamenxAsync(newName);
}
});
}
@Override
public Publisher<Boolean> delete() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.deleteAsync();
}
});
}
@Override
public Publisher<Boolean> isExists() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.isExistsAsync();
}
});
}
}

@ -26,13 +26,14 @@ import org.redisson.command.CommandReactiveExecutor;
* @author Nikita Koksharov
*
*/
public class RedissonReadWriteLockReactive extends RedissonExpirableReactive implements RReadWriteLockReactive {
public class RedissonReadWriteLockReactive implements RReadWriteLockReactive {
private final RReadWriteLock instance;
private final CommandReactiveExecutor commandExecutor;
public RedissonReadWriteLockReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name, new RedissonReadWriteLock(commandExecutor, name));
this.instance = (RReadWriteLock) super.instance;
this.commandExecutor = commandExecutor;
this.instance = new RedissonReadWriteLock(commandExecutor, name);
}
@Override

Loading…
Cancel
Save