Merge branch 'master' into 3.0.0

pull/1821/head
Nikita Koksharov 6 years ago
commit 386a0f7852

@ -689,7 +689,7 @@ public abstract class BaseRemoteService {
byte[] id = new byte[17];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
PlatformDependent.threadLocalRandom().nextBytes(id);
id[0] = 0;
id[0] = 00;
return new RequestId(id);
}
@ -701,7 +701,7 @@ public abstract class BaseRemoteService {
private void cancelExecution(RemoteInvocationOptions optionsCopy,
boolean mayInterruptIfRunning, RemotePromise<Object> remotePromise) {
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
canceledRequests.putAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false));
canceledRequests.fastPutAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);
// subscribe for async result if it's not expected before

@ -362,7 +362,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
// could be removed not from future object
if (future.getNow().isSendResponse()) {
RMap<String, RemoteServiceCancelResponse> map = redisson.getMap(cancelResponseMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
map.putAsync(request.getId(), response);
map.fastPutAsync(request.getId(), response);
map.expireAsync(60, TimeUnit.SECONDS);
}
}

@ -264,4 +264,13 @@ public class RedissonTopic implements RTopic {
}
}
@Override
public int countListeners() {
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
if (entry != null) {
return entry.countListeners();
}
return 0;
}
}

@ -84,4 +84,10 @@ public interface RTopic extends RTopicAsync {
*/
void removeAllListeners();
/**
* Returns amount of registered listeners
*
* @return amount of listeners
*/
int countListeners();
}

@ -50,4 +50,9 @@ public abstract class BaseCodec implements Codec {
return getClass().getClassLoader();
}
@Override
public String toString() {
return getClass().getName();
}
}

@ -146,7 +146,7 @@ public class TasksRunnerService implements RemoteExecutorService {
future = service.schedule(params);
}
try {
executeRunnable(params);
executeRunnable(params, nextStartDate);
} catch (RuntimeException e) {
// cancel task if it throws an exception
if (future != null) {
@ -209,7 +209,7 @@ public class TasksRunnerService implements RemoteExecutorService {
} catch (Exception e) {
throw new IllegalArgumentException(e);
} finally {
finish(params.getRequestId());
finish(params.getRequestId(), null);
}
}
@ -293,8 +293,7 @@ public class TasksRunnerService implements RemoteExecutorService {
}
}
@Override
public void executeRunnable(TaskParameters params) {
public void executeRunnable(TaskParameters params, Date nextDate) {
if (params.getRequestId() != null && params.getRequestId().startsWith("00")) {
renewRetryTime(params.getRequestId());
}
@ -309,10 +308,15 @@ public class TasksRunnerService implements RemoteExecutorService {
} catch (Exception e) {
throw new IllegalArgumentException(e);
} finally {
finish(params.getRequestId());
finish(params.getRequestId(), nextDate);
}
}
@Override
public void executeRunnable(TaskParameters params) {
executeRunnable(params, null);
}
/**
* Check shutdown state. If tasksCounter equals <code>0</code>
* and executor in <code>shutdown</code> state, then set <code>terminated</code> state
@ -323,14 +327,15 @@ public class TasksRunnerService implements RemoteExecutorService {
*
* @param requestId
*/
private void finish(String requestId) {
commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);"
private void finish(String requestId, Date nextDate) {
String script = "";
if (nextDate == null) {
script += "local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);"
+ "if scheduled == false then "
+ "redis.call('hdel', KEYS[4], ARGV[3]); "
+ "end;" +
"redis.call('zrem', KEYS[5], 'ff' .. ARGV[3]);" +
+ "end;";
}
script += "redis.call('zrem', KEYS[5], 'ff' .. ARGV[3]);" +
"if redis.call('decr', KEYS[1]) == 0 then "
+ "redis.call('del', KEYS[1]);"
+ "if redis.call('get', KEYS[2]) == ARGV[1] then "
@ -338,7 +343,10 @@ public class TasksRunnerService implements RemoteExecutorService {
+ "redis.call('set', KEYS[2], ARGV[2]);"
+ "redis.call('publish', KEYS[3], ARGV[2]);"
+ "end;"
+ "end;",
+ "end;";
commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
script,
Arrays.<Object>asList(tasksCounterName, statusName, terminationTopicName, tasksName, schedulerQueueName, tasksRetryIntervalName),
RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE, requestId);
}

@ -53,6 +53,10 @@ public class PubSubConnectionEntry {
this.subscribedChannelsAmount = new AtomicInteger(subscriptionsPerConnection);
}
public int countListeners() {
return channelListeners.size();
}
public boolean hasListeners(ChannelName channelName) {
return channelListeners.containsKey(channelName);
}

@ -107,6 +107,20 @@ public class RedissonTopicTest {
}
@Test
public void testCountListeners() {
RedissonClient redisson = BaseTest.createInstance();
RTopic topic1 = redisson.getTopic("topic", LongCodec.INSTANCE);
assertThat(topic1.countListeners()).isZero();
int id = topic1.addListener(Long.class, (channel, msg) -> {
});
assertThat(topic1.countListeners()).isOne();
topic1.removeListener(id);
assertThat(topic1.countListeners()).isZero();
redisson.shutdown();
}
@Test
public void testPing() throws InterruptedException {
Config config = BaseTest.createConfig();

@ -118,6 +118,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1));
node.shutdown();
node = RedissonNode.create(nodeConfig);
node.start();
@ -181,6 +182,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1));
node.shutdown();
node = RedissonNode.create(nodeConfig);
node.start();
@ -234,6 +236,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1));
node.shutdown();
node = RedissonNode.create(nodeConfig);
node.start();

@ -17,6 +17,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.redisson.BaseTest;
import org.redisson.Redisson;
import org.redisson.RedissonExecutorService;
import org.redisson.RedissonNode;
import org.redisson.api.CronSchedule;
@ -24,7 +25,9 @@ import org.redisson.api.ExecutorOptions;
import org.redisson.api.RExecutorFuture;
import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScheduledFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RInject;
import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig;
@ -55,6 +58,43 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
node.shutdown();
}
public static class TestTask implements Runnable {
@RInject
RedissonClient redisson;
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
redisson.getAtomicLong("counter").incrementAndGet();
}
}
@Test
public void testSingleWorker() throws InterruptedException {
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.getExecutorServiceWorkers().put("JobA", 1);
RedissonNode node = RedissonNode.create(nodeConfig);
node.start();
RedissonClient client = Redisson.create(config);
RScheduledExecutorService executorService = client.getExecutorService("JobA");
executorService.schedule(new TestTask() , CronSchedule.of("0/1 * * * * ?"));
TimeUnit.MILLISECONDS.sleep(4800);
assertThat(client.getAtomicLong("counter").get()).isEqualTo(4);
client.shutdown();
node.shutdown();
}
@Test
public void testDelay() throws InterruptedException {
RScheduledExecutorService executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS));
@ -106,6 +146,7 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test2", 1));
node.shutdown();
node = RedissonNode.create(nodeConfig);
node.start();

Loading…
Cancel
Save