refactoring

pull/970/merge
Nikita Koksharov 5 years ago
parent ac8210ae10
commit 34a913616c

@ -84,12 +84,12 @@ public class RedissonBinaryStream extends RedissonBucket<byte[]> implements RBin
}
@Override
public void mark(int readlimit) {
public synchronized void mark(int readlimit) {
mark = index;
}
@Override
public void reset() throws IOException {
public synchronized void reset() throws IOException {
index = mark;
}

@ -105,7 +105,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
return;
}
log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr(), connection);
log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr());
try {
bootstrap.connect(connection.getRedisClient().getAddr()).addListener(new ChannelFutureListener() {

@ -16,18 +16,11 @@
package org.redisson.reactive;
import org.reactivestreams.Publisher;
import org.redisson.RedissonSetCache;
import org.redisson.RedissonObject;
import org.redisson.ScanIterator;
import org.redisson.api.RFuture;
import org.redisson.api.RLockReactive;
import org.redisson.api.RPermitExpirableSemaphoreReactive;
import org.redisson.api.RReadWriteLockReactive;
import org.redisson.api.RSemaphoreReactive;
import org.redisson.api.RSetCache;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.*;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import reactor.core.publisher.Flux;
/**
@ -65,27 +58,27 @@ public class RedissonSetCacheReactive<V> {
}
public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(V value) {
String name = ((RedissonSetCache<V>) instance).getLockByValue(value, "permitexpirablesemaphore");
String name = ((RedissonObject) instance).getLockByValue(value, "permitexpirablesemaphore");
return redisson.getPermitExpirableSemaphore(name);
}
public RSemaphoreReactive getSemaphore(V value) {
String name = ((RedissonSetCache<V>) instance).getLockByValue(value, "semaphore");
String name = ((RedissonObject) instance).getLockByValue(value, "semaphore");
return redisson.getSemaphore(name);
}
public RLockReactive getFairLock(V value) {
String name = ((RedissonSetCache<V>) instance).getLockByValue(value, "fairlock");
String name = ((RedissonObject) instance).getLockByValue(value, "fairlock");
return redisson.getFairLock(name);
}
public RReadWriteLockReactive getReadWriteLock(V value) {
String name = ((RedissonSetCache<V>) instance).getLockByValue(value, "rw_lock");
String name = ((RedissonObject) instance).getLockByValue(value, "rw_lock");
return redisson.getReadWriteLock(name);
}
public RLockReactive getLock(V value) {
String name = ((RedissonSetCache<V>) instance).getLockByValue(value, "lock");
String name = ((RedissonObject) instance).getLockByValue(value, "lock");
return redisson.getLock(name);
}

@ -15,21 +15,15 @@
*/
package org.redisson.rx;
import io.reactivex.Flowable;
import io.reactivex.Single;
import org.reactivestreams.Publisher;
import org.redisson.RedissonObject;
import org.redisson.RedissonSet;
import org.redisson.api.RFuture;
import org.redisson.api.RLockRx;
import org.redisson.api.RPermitExpirableSemaphoreRx;
import org.redisson.api.RReadWriteLockRx;
import org.redisson.api.RSemaphoreRx;
import org.redisson.api.RSet;
import org.redisson.api.RedissonRxClient;
import org.redisson.api.*;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import io.reactivex.Flowable;
import io.reactivex.Single;
/**
* Distributed and concurrent implementation of {@link java.util.Set}
*
@ -78,27 +72,27 @@ public class RedissonSetRx<V> {
}
public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(V value) {
String name = ((RedissonSet<V>) instance).getLockByValue(value, "permitexpirablesemaphore");
String name = ((RedissonObject) instance).getLockByValue(value, "permitexpirablesemaphore");
return redisson.getPermitExpirableSemaphore(name);
}
public RSemaphoreRx getSemaphore(V value) {
String name = ((RedissonSet<V>) instance).getLockByValue(value, "semaphore");
String name = ((RedissonObject) instance).getLockByValue(value, "semaphore");
return redisson.getSemaphore(name);
}
public RLockRx getFairLock(V value) {
String name = ((RedissonSet<V>) instance).getLockByValue(value, "fairlock");
String name = ((RedissonObject) instance).getLockByValue(value, "fairlock");
return redisson.getFairLock(name);
}
public RReadWriteLockRx getReadWriteLock(V value) {
String name = ((RedissonSet<V>) instance).getLockByValue(value, "rw_lock");
String name = ((RedissonObject) instance).getLockByValue(value, "rw_lock");
return redisson.getReadWriteLock(name);
}
public RLockRx getLock(V value) {
String name = ((RedissonSet<V>) instance).getLockByValue(value, "lock");
String name = ((RedissonObject) instance).getLockByValue(value, "lock");
return redisson.getLock(name);
}

Loading…
Cancel
Save