future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName());
+ Long res = get(future);
return res.intValue();
}
diff --git a/redisson/src/main/java/org/redisson/api/RKeys.java b/redisson/src/main/java/org/redisson/api/RKeys.java
index f3445184b..56bd641b6 100644
--- a/redisson/src/main/java/org/redisson/api/RKeys.java
+++ b/redisson/src/main/java/org/redisson/api/RKeys.java
@@ -236,7 +236,7 @@ public interface RKeys extends RKeysAsync {
*
* Requires Redis 4.0+
*
- * @param keys
+ * @param keys of objects
* @return number of removed keys
*/
long unlink(String ... keys);
diff --git a/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java b/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java
new file mode 100644
index 000000000..f9deb34e6
--- /dev/null
+++ b/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java
@@ -0,0 +1,179 @@
+/**
+ * Copyright 2016 Nikita Koksharov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.redisson.api;
+
+import java.util.concurrent.TimeUnit;
+
+import org.reactivestreams.Publisher;
+
+/**
+ *
+ * @author Nikita Koksharov
+ *
+ */
+public interface RSemaphoreReactive extends RExpirableReactive {
+
+ /**
+ * Acquires a permit only if one is available at the
+ * time of invocation.
+ *
+ *
Acquires a permit, if one is available and returns immediately,
+ * with the value {@code true},
+ * reducing the number of available permits by one.
+ *
+ *
If no permit is available then this method will return
+ * immediately with the value {@code false}.
+ *
+ * @return {@code true} if a permit was acquired and {@code false}
+ * otherwise
+ */
+ Publisher tryAcquire();
+
+ /**
+ * Acquires the given number of permits only if all are available at the
+ * time of invocation.
+ *
+ * Acquires a permits, if all are available and returns immediately,
+ * with the value {@code true},
+ * reducing the number of available permits by given number of permits.
+ *
+ *
If no permits are available then this method will return
+ * immediately with the value {@code false}.
+ *
+ * @param permits the number of permits to acquire
+ * @return {@code true} if a permit was acquired and {@code false}
+ * otherwise
+ */
+ Publisher tryAcquire(int permits);
+
+ /**
+ * Acquires a permit from this semaphore.
+ *
+ * Acquires a permit, if one is available and returns immediately,
+ * reducing the number of available permits by one.
+ *
+ * @return void
+ *
+ */
+ Publisher acquire();
+
+ /**
+ * Acquires the given number of permits, if they are available,
+ * and returns immediately, reducing the number of available permits
+ * by the given amount.
+ *
+ * @param permits the number of permits to acquire
+ * @throws IllegalArgumentException if {@code permits} is negative
+ * @return void
+ */
+ Publisher acquire(int permits);
+
+ /**
+ * Releases a permit, returning it to the semaphore.
+ *
+ * Releases a permit, increasing the number of available permits by
+ * one. If any threads of Redisson client are trying to acquire a permit,
+ * then one is selected and given the permit that was just released.
+ *
+ *
There is no requirement that a thread that releases a permit must
+ * have acquired that permit by calling {@link #acquire()}.
+ * Correct usage of a semaphore is established by programming convention
+ * in the application.
+ *
+ * @return void
+ */
+ Publisher release();
+
+ /**
+ * Releases the given number of permits, returning them to the semaphore.
+ *
+ * Releases the given number of permits, increasing the number of available permits by
+ * the given number of permits. If any threads of Redisson client are trying to
+ * acquire a permits, then next threads is selected and tries to acquire the permits that was just released.
+ *
+ *
There is no requirement that a thread that releases a permits must
+ * have acquired that permit by calling {@link #acquire()}.
+ * Correct usage of a semaphore is established by programming convention
+ * in the application.
+ *
+ * @param permits amount
+ * @return void
+ */
+ Publisher release(int permits);
+
+ /**
+ * Sets number of permits.
+ *
+ * @param permits - number of permits
+ * @return true
if permits has been set successfully, otherwise false
.
+ */
+ Publisher trySetPermits(int permits);
+
+ /**
+ * Acquires a permit, if one is available and returns immediately,
+ * with the value {@code true},
+ * reducing the number of available permits by one.
+ *
+ *
If a permit is acquired then the value {@code true} is returned.
+ *
+ *
If the specified waiting time elapses then the value {@code false}
+ * is returned. If the time is less than or equal to zero, the method
+ * will not wait at all.
+ *
+ * @param waitTime the maximum time to wait for a permit
+ * @param unit the time unit of the {@code timeout} argument
+ * @return {@code true} if a permit was acquired and {@code false}
+ * if the waiting time elapsed before a permit was acquired
+ */
+ Publisher tryAcquire(long waitTime, TimeUnit unit);
+
+ /**
+ * Acquires the given number of permits only if all are available
+ * within the given waiting time.
+ *
+ * Acquires a permits, if all are available and returns immediately,
+ * with the value {@code true},
+ * reducing the number of available permits by one.
+ *
+ *
If a permits is acquired then the value {@code true} is returned.
+ *
+ *
If the specified waiting time elapses then the value {@code false}
+ * is returned. If the time is less than or equal to zero, the method
+ * will not wait at all.
+ *
+ * @param permits amount
+ * @param waitTime the maximum time to wait for a available permits
+ * @param unit the time unit of the {@code timeout} argument
+ * @return {@code true} if a permit was acquired and {@code false}
+ * if the waiting time elapsed before a permit was acquired
+ */
+ Publisher tryAcquire(int permits, long waitTime, TimeUnit unit);
+
+ /**
+ * Shrinks the number of available permits by the indicated
+ * reduction. This method can be useful in subclasses that use
+ * semaphores to track resources that become unavailable. This
+ * method differs from {@link #acquire()} in that it does not block
+ * waiting for permits to become available.
+ *
+ * @param permits - reduction the number of permits to remove
+ * @return void
+ * @throws IllegalArgumentException if {@code reduction} is negative
+ */
+ Publisher reducePermits(int permits);
+
+
+}
diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java
index 1105eb7a4..31689bf14 100644
--- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java
+++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java
@@ -30,6 +30,14 @@ import org.redisson.config.Config;
*/
public interface RedissonReactiveClient {
+ /**
+ * Returns semaphore instance by name
+ *
+ * @param name - name of object
+ * @return Semaphore object
+ */
+ RSemaphoreReactive getSemaphore(String name);
+
/**
* Returns readWriteLock instance by name.
*
diff --git a/redisson/src/main/java/org/redisson/client/RedisClient.java b/redisson/src/main/java/org/redisson/client/RedisClient.java
index e7e8a35ff..ce0e0b9d8 100644
--- a/redisson/src/main/java/org/redisson/client/RedisClient.java
+++ b/redisson/src/main/java/org/redisson/client/RedisClient.java
@@ -175,6 +175,9 @@ public class RedisClient {
this.commandTimeout = config.getCommandTimeout();
}
+ public String getIpAddr() {
+ return addr.getAddress().getHostAddress() + ":" + addr.getPort();
+ }
public InetSocketAddress getAddr() {
return addr;
diff --git a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java
index 17dd7bbdb..652b75261 100644
--- a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java
+++ b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java
@@ -39,6 +39,11 @@ import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
+/**
+ *
+ * @author Nikita Koksharov
+ *
+ */
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -82,12 +87,16 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
return;
}
- timer.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- tryReconnect(connection, Math.min(BACKOFF_CAP, attempts + 1));
- }
- }, timeout, TimeUnit.MILLISECONDS);
+ try {
+ timer.newTimeout(new TimerTask() {
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ tryReconnect(connection, Math.min(BACKOFF_CAP, attempts + 1));
+ }
+ }, timeout, TimeUnit.MILLISECONDS);
+ } catch (IllegalStateException e) {
+ // skip
+ }
}
private void tryReconnect(final RedisConnection connection, final int nextAttempt) {
diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java
index e18b37d04..bb3e7d352 100644
--- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java
+++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java
@@ -76,13 +76,12 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private ScheduledFuture> monitorFuture;
private volatile URI lastClusterNode;
-
+
public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
super(config);
this.config = create(cfg);
initTimer(this.config);
- init(this.config);
Throwable lastException = null;
List failedMasters = new ArrayList();
@@ -92,13 +91,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
List nodes = connection.sync(RedisCommands.CLUSTER_NODES);
- if (log.isDebugEnabled()) {
- StringBuilder nodesValue = new StringBuilder();
- for (ClusterNodeInfo clusterNodeInfo : nodes) {
- nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n");
- }
- log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
+ StringBuilder nodesValue = new StringBuilder();
+ for (ClusterNodeInfo clusterNodeInfo : nodes) {
+ nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n");
}
+ log.info("Redis cluster nodes configuration got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
lastClusterNode = addr;
@@ -185,23 +182,19 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
RedisConnection connection = future.getNow();
- if (connection.isActive()) {
- nodeConnections.put(addr, connection);
- result.trySuccess(connection);
- } else {
- connection.closeAsync();
- result.tryFailure(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!"));
- }
- }
- });
+ if (connection.isActive()) {
+ nodeConnections.put(addr, connection);
+ result.trySuccess(connection);
+ } else {
+ connection.closeAsync();
+ result.tryFailure(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!"));
+ }
+ }
+ });
return result;
}
- @Override
- protected void initEntry(MasterSlaveServersConfig config) {
- }
-
private RFuture>> addMasterEntry(final ClusterPartition partition, final ClusterServersConfig cfg) {
if (partition.isMasterFail()) {
RedisException e = new RedisException("Failed to add master: " +
@@ -253,7 +246,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
final MasterSlaveEntry e;
List> futures = new ArrayList>();
- if (config.getReadMode() == ReadMode.MASTER) {
+ if (config.checkSkipSlavesInit()) {
e = new SingleEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config);
} else {
config.setSlaveAddresses(partition.getSlaveAddresses());
@@ -426,7 +419,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
aliveSlaves.removeAll(newPart.getFailedSlaveAddresses());
for (URI uri : aliveSlaves) {
currentPart.removeFailedSlaveAddress(uri);
- if (entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
+ if (entry.slaveUp(uri, FreezeReason.MANAGER)) {
log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges());
}
}
@@ -435,7 +428,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
failedSlaves.removeAll(currentPart.getFailedSlaveAddresses());
for (URI uri : failedSlaves) {
currentPart.addFailedSlaveAddress(uri);
- if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
+ if (entry.slaveDown(uri, FreezeReason.MANAGER)) {
log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges());
}
}
@@ -448,7 +441,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (URI uri : removedSlaves) {
currentPart.removeSlaveAddress(uri);
- if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
+ if (entry.slaveDown(uri, FreezeReason.MANAGER)) {
log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges());
}
}
@@ -466,7 +459,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
currentPart.addSlaveAddress(uri);
- entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER);
+ entry.slaveUp(uri, FreezeReason.MANAGER);
log.info("slave: {} added for slot ranges: {}", uri, currentPart.getSlotRanges());
}
});
@@ -510,8 +503,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
ClusterPartition newMasterPart = find(newPartitions, slot);
// does partition has a new master?
if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
- log.info("changing master from {} to {} for {}",
- currentPart.getMasterAddress(), newMasterPart.getMasterAddress(), slot);
URI newUri = newMasterPart.getMasterAddress();
URI oldUri = currentPart.getMasterAddress();
diff --git a/redisson/src/main/java/org/redisson/codec/CompositeCodec.java b/redisson/src/main/java/org/redisson/codec/CompositeCodec.java
new file mode 100644
index 000000000..51750c5b5
--- /dev/null
+++ b/redisson/src/main/java/org/redisson/codec/CompositeCodec.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright 2016 Nikita Koksharov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.redisson.codec;
+
+import org.redisson.client.codec.Codec;
+import org.redisson.client.protocol.Decoder;
+import org.redisson.client.protocol.Encoder;
+
+/**
+ *
+ * @author Nikita Koksharov
+ *
+ */
+public class CompositeCodec implements Codec {
+
+ private final Codec mapKeyCodec;
+ private final Codec mapValueCodec;
+ private final Codec valueCodec;
+
+ public CompositeCodec(Codec mapKeyCodec, Codec mapValueCodec) {
+ this(mapKeyCodec, mapValueCodec, null);
+ }
+
+ public CompositeCodec(Codec mapKeyCodec, Codec mapValueCodec, Codec valueCodec) {
+ super();
+ this.mapKeyCodec = mapKeyCodec;
+ this.mapValueCodec = mapValueCodec;
+ this.valueCodec = valueCodec;
+ }
+
+ @Override
+ public Decoder