Fixed - Executed remote tasks are not removed from Redis. #1393

pull/1461/head
Nikita 7 years ago
parent 14fe53321c
commit 2222e3d134

@ -187,7 +187,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
final String requestId = future.getNow();
RMap<String, RemoteServiceRequest> tasks = redisson.getMap(requestQueue.getName() + ":tasks", new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RFuture<RemoteServiceRequest> taskFuture = tasks.getAsync(requestId);
RFuture<RemoteServiceRequest> taskFuture = tasks.removeAsync(requestId);
taskFuture.addListener(new FutureListener<RemoteServiceRequest>() {
@Override

@ -72,26 +72,6 @@ public class ScheduledTasksService extends TasksService {
request.getArgs()[requestIndex] = request.getId();
Long startTime = (Long)request.getArgs()[3];
if (requestId != null) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// check if executor service not in shutdown state and previous task exists
"if redis.call('exists', KEYS[2]) == 0 and redis.call('hexists', KEYS[5], ARGV[2]) == 1 then "
+ "redis.call('zadd', KEYS[3], ARGV[1], ARGV[2]);"
+ "redis.call('hset', KEYS[5], ARGV[2], ARGV[3]);"
+ "redis.call('incr', KEYS[1]);"
// if new task added to queue head then publish its startTime
// to all scheduler workers
+ "local v = redis.call('zrange', KEYS[3], 0, 0); "
+ "if v[1] == ARGV[2] then "
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end "
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(tasksCounterName, statusName, schedulerQueueName, schedulerChannelName, tasksName),
startTime, request.getId(), encode(request));
}
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// check if executor service not in shutdown state
"if redis.call('exists', KEYS[2]) == 0 then "

@ -138,15 +138,9 @@ public class TasksService extends BaseRemoteService {
}
public RFuture<Boolean> cancelExecutionAsync(final RequestId requestId) {
final Class<?> syncInterface = RemoteExecutorService.class;
if (!redisson.getMap(tasksName, LongCodec.INSTANCE).containsKey(requestId)) {
return RedissonPromise.newSucceededFuture(false);
}
final RPromise<Boolean> result = new RedissonPromise<Boolean>();
String requestQueueName = getRequestQueueName(syncInterface);
String requestQueueName = getRequestQueueName(RemoteExecutorService.class);
RFuture<Boolean> removeFuture = removeAsync(requestQueueName, requestId);
removeFuture.addListener(new FutureListener<Boolean>() {
@Override

@ -424,7 +424,7 @@ public class RedissonRemoteServiceTest extends BaseTest {
r2.shutdown();
}
}
@Test
public void testInvocations() {
RedissonClient r1 = createInstance();
@ -451,6 +451,7 @@ public class RedissonRemoteServiceTest extends BaseTest {
assertThat(e.getCause().getMessage()).isEqualTo("/ by zero");
}
assertThat(r1.getKeys().count()).isZero();
r1.shutdown();
r2.shutdown();
}

Loading…
Cancel
Save