Fixed - response queue expiration time set to one hour. #1059

pull/1461/head
Nikita 7 years ago
parent 2222e3d134
commit 9a31ce81a2

@ -92,6 +92,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private static final Logger log = LoggerFactory.getLogger(RedissonExecutorService.class); private static final Logger log = LoggerFactory.getLogger(RedissonExecutorService.class);
private static final RemoteInvocationOptions RESULT_OPTIONS = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.HOURS);
public static final int SHUTDOWN_STATE = 1; public static final int SHUTDOWN_STATE = 1;
public static final int TERMINATED_STATE = 2; public static final int TERMINATED_STATE = 2;
@ -174,7 +176,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
executorRemoteService.setTasksCounterName(tasksCounterName); executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName); executorRemoteService.setStatusName(statusName);
executorRemoteService.setTasksName(tasksName); executorRemoteService.setTasksName(tasksName);
asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
scheduledRemoteService = new ScheduledTasksService(codec, redisson, name, commandExecutor, executorId, responses); scheduledRemoteService = new ScheduledTasksService(codec, redisson, name, commandExecutor, executorId, responses);
@ -184,7 +186,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
scheduledRemoteService.setSchedulerQueueName(schedulerQueueName); scheduledRemoteService.setSchedulerQueueName(schedulerQueueName);
scheduledRemoteService.setSchedulerChannelName(schedulerChannelName); scheduledRemoteService.setSchedulerChannelName(schedulerChannelName);
scheduledRemoteService.setTasksName(tasksName); scheduledRemoteService.setTasksName(tasksName);
asyncScheduledService = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); asyncScheduledService = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); asyncScheduledServiceAtFixed = scheduledRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
} }
@ -471,7 +473,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>(); List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
TasksBatchService executorRemoteService = createBatchService(); TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Callable<?> task : tasks) { for (Callable<?> task : tasks) {
check(task); check(task);
byte[] classBody = getClassBody(task); byte[] classBody = getClassBody(task);
@ -496,7 +498,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
TasksBatchService executorRemoteService = createBatchService(); TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
final List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>(); final List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
for (Callable<?> task : tasks) { for (Callable<?> task : tasks) {
check(task); check(task);
@ -599,7 +601,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>(); List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
TasksBatchService executorRemoteService = createBatchService(); TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Runnable task : tasks) { for (Runnable task : tasks) {
check(task); check(task);
byte[] classBody = getClassBody(task); byte[] classBody = getClassBody(task);
@ -624,7 +626,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
TasksBatchService executorRemoteService = createBatchService(); TasksBatchService executorRemoteService = createBatchService();
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS)); RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
final List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>(); final List<RExecutorFuture<?>> result = new ArrayList<RExecutorFuture<?>>();
for (Runnable task : tasks) { for (Runnable task : tasks) {
check(task); check(task);

Loading…
Cancel
Save