Merge branch 'timezone_fix' of https://github.com/arpit728/redisson into timezone_fix

pull/1547/head
arpit728 7 years ago
commit 6ed77d97d2

@ -17,7 +17,6 @@ package org.redisson.api;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@ -121,7 +120,7 @@ public interface RScheduledExecutorService extends RExecutorService, ScheduledEx
* @param cronSchedule- cron schedule object
* @return future object
*/
ScheduledFuture<?> schedule(Runnable task, CronSchedule cronSchedule);
RScheduledFuture<?> schedule(Runnable task, CronSchedule cronSchedule);
/**
* Use {@link #cancelTask(String)}

@ -96,7 +96,7 @@ public class ConfigSupport {
}
@JsonIgnoreProperties("clusterConfig")
@JsonIgnoreProperties({"clusterConfig", "sentinelConfig"})
public static class ConfigMixIn {
@JsonProperty

@ -242,7 +242,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
if (addr != null) {
client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts(), sslHostname);
client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname);
}
final RPromise<RedisConnection> result = new RedissonPromise<RedisConnection>();
RFuture<RedisConnection> future = client.connectAsync();

@ -42,6 +42,8 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
import org.redisson.misc.Injector;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
@ -62,7 +64,7 @@ import java.util.concurrent.TimeUnit;
*/
public class TasksRunnerService implements RemoteExecutorService {
private final Map<String, Codec> codecs = new LRUCacheMap<String, Codec>(500, 0, 0);
private final Map<HashValue, Codec> codecs = new LRUCacheMap<HashValue, Codec>(500, 0, 0);
private final Codec codec;
private final String name;
@ -257,14 +259,16 @@ public class TasksRunnerService implements RemoteExecutorService {
@SuppressWarnings("unchecked")
private <T> T decode(String className, byte[] classBody, ByteBuf buf) throws IOException {
ByteBuf classBodyBuf = ByteBufAllocator.DEFAULT.buffer(classBody.length);
try {
Codec classLoaderCodec = codecs.get(className);
HashValue hash = new HashValue(Hash.hash128(classBodyBuf));
Codec classLoaderCodec = codecs.get(hash);
if (classLoaderCodec == null) {
RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader());
cl.loadClass(className, classBody);
classLoaderCodec = this.codec.getClass().getConstructor(ClassLoader.class).newInstance(cl);
codecs.put(className, classLoaderCodec);
codecs.put(hash, classLoaderCodec);
}
T task = (T) classLoaderCodec.getValueDecoder().decode(buf, null);
@ -272,8 +276,9 @@ public class TasksRunnerService implements RemoteExecutorService {
return task;
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
} finally {
classBodyBuf.release();
}
}
@Override

@ -52,6 +52,8 @@ import org.redisson.cluster.ClusterNodeInfo.Flag;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.SerializationCodec;
import org.redisson.config.Config;
import org.redisson.config.ReadMode;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.CRC16;
import org.redisson.connection.ConnectionListener;
import org.redisson.connection.MasterSlaveConnectionManager;
@ -355,7 +357,7 @@ public class RedissonTest {
Thread.sleep(15000);
latch.await();
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
int errors = 0;
int success = 0;
@ -387,8 +389,75 @@ public class RedissonTest {
slave1.stop();
slave2.stop();
}
@Test
public void testFailoverWithoutErrorsInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave();
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setRetryAttempts(30)
.setReadMode(ReadMode.MASTER)
.setSubscriptionMode(SubscriptionMode.MASTER)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get();
List<RFuture<?>> futures = new ArrayList<RFuture<?>>();
// @Test
Set<InetSocketAddress> oldMasters = new HashSet<>();
Collection<ClusterNode> masterNodes = redisson.getClusterNodesGroup().getNodes(NodeType.MASTER);
for (ClusterNode clusterNode : masterNodes) {
oldMasters.add(clusterNode.getAddr());
}
master.stop();
for (int j = 0; j < 2000; j++) {
RFuture<?> f2 = redisson.getBucket("" + j).setAsync("");
futures.add(f2);
}
System.out.println("master " + master.getRedisServerAddressAndPort() + " has been stopped!");
Thread.sleep(TimeUnit.SECONDS.toMillis(20));
RedisProcess newMaster = null;
Collection<ClusterNode> newMasterNodes = redisson.getClusterNodesGroup().getNodes(NodeType.MASTER);
for (ClusterNode clusterNode : newMasterNodes) {
if (!oldMasters.contains(clusterNode.getAddr())) {
newMaster = process.getNodes().stream().filter(x -> x.getRedisServerPort() == clusterNode.getAddr().getPort()).findFirst().get();
break;
}
}
assertThat(newMaster).isNotNull();
for (RFuture<?> rFuture : futures) {
rFuture.awaitUninterruptibly();
if (!rFuture.isSuccess()) {
Assert.fail();
}
}
redisson.shutdown();
process.shutdown();
}
@Test
public void testFailoverInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave();
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
@ -441,38 +510,37 @@ public class RedissonTest {
t.start();
t.join(1000);
Set<InetSocketAddress> addresses = new HashSet<>();
Set<InetSocketAddress> oldMasters = new HashSet<>();
Collection<ClusterNode> masterNodes = redisson.getClusterNodesGroup().getNodes(NodeType.MASTER);
for (ClusterNode clusterNode : masterNodes) {
addresses.add(clusterNode.getAddr());
oldMasters.add(clusterNode.getAddr());
}
master.stop();
System.out.println("master " + master.getRedisServerAddressAndPort() + " has been stopped!");
Thread.sleep(TimeUnit.SECONDS.toMillis(80));
Thread.sleep(TimeUnit.SECONDS.toMillis(90));
RedisProcess newMaster = null;
Collection<ClusterNode> newMasterNodes = redisson.getClusterNodesGroup().getNodes(NodeType.MASTER);
for (ClusterNode clusterNode : newMasterNodes) {
if (!addresses.contains(clusterNode.getAddr())) {
if (!oldMasters.contains(clusterNode.getAddr())) {
newMaster = process.getNodes().stream().filter(x -> x.getRedisServerPort() == clusterNode.getAddr().getPort()).findFirst().get();
break;
}
System.out.println("new-master: " + clusterNode.getAddr());
}
Thread.sleep(50000);
assertThat(newMaster).isNotNull();
Thread.sleep(30000);
newMaster.stop();
System.out.println("new master " + newMaster.getRedisServerAddressAndPort() + " has been stopped!");
Thread.sleep(TimeUnit.SECONDS.toMillis(70));
Thread.sleep(60000);
Thread.sleep(TimeUnit.SECONDS.toMillis(80));
latch.await();
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
int errors = 0;
int success = 0;

@ -297,7 +297,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
e.execute(new RunnableTask());
}
e.shutdown();
assertThat(e.awaitTermination(900, TimeUnit.MILLISECONDS)).isTrue();
assertThat(e.awaitTermination(1000, TimeUnit.MILLISECONDS)).isTrue();
}
@Test

Loading…
Cancel
Save