diff --git a/redisson/src/main/java/org/redisson/RedissonBinaryStream.java b/redisson/src/main/java/org/redisson/RedissonBinaryStream.java index 49bced277..988805d22 100644 --- a/redisson/src/main/java/org/redisson/RedissonBinaryStream.java +++ b/redisson/src/main/java/org/redisson/RedissonBinaryStream.java @@ -84,12 +84,12 @@ public class RedissonBinaryStream extends RedissonBucket<byte[]> implements RBin } @Override - public void mark(int readlimit) { + public synchronized void mark(int readlimit) { mark = index; } @Override - public void reset() throws IOException { + public synchronized void reset() throws IOException { index = mark; } diff --git a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index 7d3b889eb..15252b755 100644 --- a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -105,7 +105,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { return; } - log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr(), connection); + log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr()); try { bootstrap.connect(connection.getRedisClient().getAddr()).addListener(new ChannelFutureListener() { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index d18e38015..ed6365c10 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -16,18 +16,11 @@ package org.redisson.reactive; import org.reactivestreams.Publisher; -import org.redisson.RedissonSetCache; +import org.redisson.RedissonObject; import org.redisson.ScanIterator; -import org.redisson.api.RFuture; -import org.redisson.api.RLockReactive; -import org.redisson.api.RPermitExpirableSemaphoreReactive; -import org.redisson.api.RReadWriteLockReactive; -import org.redisson.api.RSemaphoreReactive; -import org.redisson.api.RSetCache; -import org.redisson.api.RedissonReactiveClient; +import org.redisson.api.*; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; - import reactor.core.publisher.Flux; /** @@ -65,27 +58,27 @@ public class RedissonSetCacheReactive<V> { } public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(V value) { - String name = ((RedissonSetCache<V>) instance).getLockByValue(value, "permitexpirablesemaphore"); + String name = ((RedissonObject) instance).getLockByValue(value, "permitexpirablesemaphore"); return redisson.getPermitExpirableSemaphore(name); } public RSemaphoreReactive getSemaphore(V value) { - String name = ((RedissonSetCache<V>) instance).getLockByValue(value, "semaphore"); + String name = ((RedissonObject) instance).getLockByValue(value, "semaphore"); return redisson.getSemaphore(name); } public RLockReactive getFairLock(V value) { - String name = ((RedissonSetCache<V>) instance).getLockByValue(value, "fairlock"); + String name = ((RedissonObject) instance).getLockByValue(value, "fairlock"); return redisson.getFairLock(name); } public RReadWriteLockReactive getReadWriteLock(V value) { - String name = ((RedissonSetCache<V>) instance).getLockByValue(value, "rw_lock"); + String name = ((RedissonObject) instance).getLockByValue(value, "rw_lock"); return redisson.getReadWriteLock(name); } public RLockReactive getLock(V value) { - String name = ((RedissonSetCache<V>) instance).getLockByValue(value, "lock"); + String name = ((RedissonObject) instance).getLockByValue(value, "lock"); return redisson.getLock(name); } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java b/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java index d171d0830..87e6939d6 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java @@ -15,21 +15,15 @@ */ package org.redisson.rx; +import io.reactivex.Flowable; +import io.reactivex.Single; import org.reactivestreams.Publisher; +import org.redisson.RedissonObject; import org.redisson.RedissonSet; -import org.redisson.api.RFuture; -import org.redisson.api.RLockRx; -import org.redisson.api.RPermitExpirableSemaphoreRx; -import org.redisson.api.RReadWriteLockRx; -import org.redisson.api.RSemaphoreRx; -import org.redisson.api.RSet; -import org.redisson.api.RedissonRxClient; +import org.redisson.api.*; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; -import io.reactivex.Flowable; -import io.reactivex.Single; - /** * Distributed and concurrent implementation of {@link java.util.Set} * @@ -78,27 +72,27 @@ public class RedissonSetRx<V> { } public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(V value) { - String name = ((RedissonSet<V>) instance).getLockByValue(value, "permitexpirablesemaphore"); + String name = ((RedissonObject) instance).getLockByValue(value, "permitexpirablesemaphore"); return redisson.getPermitExpirableSemaphore(name); } public RSemaphoreRx getSemaphore(V value) { - String name = ((RedissonSet<V>) instance).getLockByValue(value, "semaphore"); + String name = ((RedissonObject) instance).getLockByValue(value, "semaphore"); return redisson.getSemaphore(name); } public RLockRx getFairLock(V value) { - String name = ((RedissonSet<V>) instance).getLockByValue(value, "fairlock"); + String name = ((RedissonObject) instance).getLockByValue(value, "fairlock"); return redisson.getFairLock(name); } public RReadWriteLockRx getReadWriteLock(V value) { - String name = ((RedissonSet<V>) instance).getLockByValue(value, "rw_lock"); + String name = ((RedissonObject) instance).getLockByValue(value, "rw_lock"); return redisson.getReadWriteLock(name); } public RLockRx getLock(V value) { - String name = ((RedissonSet<V>) instance).getLockByValue(value, "lock"); + String name = ((RedissonObject) instance).getLockByValue(value, "lock"); return redisson.getLock(name); }