Merge branch 'master' into 3.0.0

pull/1933/head
Nikita Koksharov 6 years ago
commit aff9557813

@ -1,4 +1,4 @@
Redisson: Redis based In-Memory Data Grid for Java.<br/> State of the Art Redis Java client
Redisson: Redis Java client and In-Memory Data Grid
====
[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.10.0) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Report an issue](https://github.com/redisson/redisson/issues/new) | **[Redisson PRO](https://redisson.pro)**

@ -917,7 +917,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
popTimeout = Long.valueOf(param.toString()) / 1000;
break;
}
if (param instanceof String) {
if ("BLOCK".equals(param)) {
found = true;
}
}
@ -1080,24 +1080,19 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return;
}
if (future.cause() instanceof RedisLoadingException) {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details);
return;
}
if (future.cause() instanceof RedisTryAgainException) {
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
}
}, 1, TimeUnit.SECONDS);
AsyncDetails.release(details);
return;
if (future.cause() instanceof RedisLoadingException
|| future.cause() instanceof RedisTryAgainException) {
if (details.getAttempt() < connectionManager.getConfig().getRetryAttempts()) {
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt() + 1, ignoreRedirect);
}
}, Math.min(connectionManager.getConfig().getTimeout(), 1000), TimeUnit.MILLISECONDS);
AsyncDetails.release(details);
return;
}
}
free(details.getParams());

@ -776,20 +776,18 @@ public class CommandBatchService extends CommandAsyncService {
execute(entry, nodeSource, mainPromise, slots, attempt, options);
return;
}
if (future.cause() instanceof RedisLoadingException) {
entry.clearErrors();
execute(entry, source, mainPromise, slots, attempt, options);
return;
}
if (future.cause() instanceof RedisTryAgainException) {
entry.clearErrors();
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
execute(entry, source, mainPromise, slots, attempt, options);
}
}, 1, TimeUnit.SECONDS);
return;
if (future.cause() instanceof RedisLoadingException
|| future.cause() instanceof RedisTryAgainException) {
if (details.getAttempt() < connectionManager.getConfig().getRetryAttempts()) {
entry.clearErrors();
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
execute(entry, source, mainPromise, slots, attempt + 1, options);
}
}, Math.min(connectionManager.getConfig().getTimeout(), 1000), TimeUnit.MILLISECONDS);
return;
}
}
free(entry);

@ -126,7 +126,6 @@ public class LoadBalancerManager {
return unfreeze(entry, freezeReason);
}
public boolean unfreeze(ClientConnectionsEntry entry, FreezeReason freezeReason) {
synchronized (entry) {
if (!entry.isFreezed()) {
@ -138,6 +137,9 @@ public class LoadBalancerManager {
entry.resetFirstFail();
entry.setFreezed(false);
entry.setFreezeReason(null);
slaveConnectionPool.initConnections(entry);
pubSubConnectionPool.initConnections(entry);
return true;
}
}

@ -83,6 +83,12 @@ abstract class ConnectionPool<T extends RedisConnection> {
return promise;
}
public RPromise<Void> initConnections(ClientConnectionsEntry entry) {
RPromise<Void> promise = new RedissonPromise<Void>();
initConnections(entry, promise, false);
return promise;
}
private void initConnections(final ClientConnectionsEntry entry, final RPromise<Void> initPromise, boolean checkFreezed) {
final int minimumIdleSize = getMinimumIdleSize(entry);
@ -408,16 +414,9 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
if (future.isSuccess() && "PONG".equals(future.getNow())) {
entry.resetFirstFail();
RPromise<Void> promise = new RedissonPromise<Void>();
promise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
masterSlaveEntry.slaveUp(entry, FreezeReason.RECONNECT);
log.info("slave {} has been successfully reconnected", entry.getClient().getAddr());
}
});
initConnections(entry, promise, false);
if (masterSlaveEntry.slaveUp(entry, FreezeReason.RECONNECT)) {
log.info("slave {} has been successfully reconnected", entry.getClient().getAddr());
}
} else {
scheduleCheck(entry);
}

@ -31,7 +31,7 @@ public class LogHelper {
private static final int MAX_COLLECTION_LOG_SIZE = Integer.valueOf(System.getProperty("redisson.maxCollectionLogSize", "10"));
private static final int MAX_STRING_LOG_SIZE = Integer.valueOf(System.getProperty("redisson.maxStringLogSize", "100"));
private static final int MAX_BYTEBUF_LOG_SIZE = Integer.valueOf(System.getProperty("redisson.maxByteBufLogSize", "10000"));
private static final int MAX_BYTEBUF_LOG_SIZE = Integer.valueOf(System.getProperty("redisson.maxByteBufLogSize", "1000"));
private LogHelper() {
}
@ -53,13 +53,14 @@ public class LogHelper {
return cd.getCommand() + ", params: " + LogHelper.toString(cd.getParams());
} else if (object instanceof ByteBuf) {
final ByteBuf byteBuf = (ByteBuf) object;
if (byteBuf.refCnt() > 0) {
if (byteBuf.writerIndex() > MAX_BYTEBUF_LOG_SIZE) {
return new StringBuilder(byteBuf.toString(0, MAX_BYTEBUF_LOG_SIZE, CharsetUtil.UTF_8)).append("...").toString();
} else {
return byteBuf.toString(0, byteBuf.writerIndex(), CharsetUtil.UTF_8);
}
}
// can't be used due to Buffer Leak error is appeared in log
// if (byteBuf.refCnt() > 0) {
// if (byteBuf.writerIndex() > MAX_BYTEBUF_LOG_SIZE) {
// return new StringBuilder(byteBuf.toString(0, MAX_BYTEBUF_LOG_SIZE, CharsetUtil.UTF_8)).append("...").toString();
// } else {
// return byteBuf.toString(0, byteBuf.writerIndex(), CharsetUtil.UTF_8);
// }
// }
return byteBuf.toString();
} else {
return String.valueOf(object);

@ -223,6 +223,42 @@ public class RedissonStreamTest extends BaseTest {
assertThat(s2.get("test1").keySet()).containsExactly(id12, id13);
assertThat(s2.get("test2").keySet()).containsExactly(id22, id23);
}
@Test
public void testReadGroupBlocking() {
RStream<String, String> stream = redisson.getStream("test");
StreamMessageId id0 = stream.add("0", "0");
stream.createGroup("testGroup", id0);
stream.add("1", "1");
stream.add("2", "2");
stream.add("3", "3");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1", 3, 5, TimeUnit.SECONDS);
assertThat(s.values().iterator().next().keySet()).containsAnyOf("1", "2", "3");
assertThat(s.size()).isEqualTo(3);
stream.removeGroup("testGroup");
stream.createGroup("testGroup", id0);
stream.add("1", "1");
stream.add("2", "2");
stream.add("3", "3");
RStream<String, String> stream2 = redisson.getStream("test2");
StreamMessageId id1 = stream2.add("0", "0");
stream2.createGroup("testGroup", id1);
// Map<String, Map<StreamMessageId, Map<String, String>>> s2 = stream.readGroup("testGroup", "consumer1", 3, 5, TimeUnit.SECONDS, id0, Collections.singletonMap("test2", id1));
// assertThat(s2.values().iterator().next().values().iterator().next().keySet()).containsAnyOf("1", "2", "3");
// assertThat(s2.size()).isEqualTo(3);
}
@Test
public void testReadGroup() {
@ -355,10 +391,22 @@ public class RedissonStreamTest extends BaseTest {
t.start();
long start = System.currentTimeMillis();
Map<StreamMessageId, Map<String, String>> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamMessageId(0));
Map<StreamMessageId, Map<String, String>> s = stream.read(2, 4, TimeUnit.SECONDS, new StreamMessageId(0));
assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L);
assertThat(s).hasSize(1);
assertThat(s.get(new StreamMessageId(1))).isEqualTo(entries1);
StreamMessageId id0 = stream.add("11", "11");
stream.add("22", "22");
RStream<String, String> stream2 = redisson.getStream("test2");
StreamMessageId id1 = stream2.add("33", "33");
stream2.add("44", "44");
Map<String, Map<StreamMessageId, Map<String, String>>> s2 = stream.read(5, TimeUnit.SECONDS, id0, Collections.singletonMap("test2", id1));
assertThat(s2.values().iterator().next().values().iterator().next().keySet()).containsAnyOf("11", "22", "33", "44");
assertThat(s2.keySet()).containsExactlyInAnyOrder("test", "test2");
}
@Test

Loading…
Cancel
Save