Feature - TaskFailureListener, TaskFinishedListener, TaskStartedListener and TaskSuccessListener task listeners added to RExecutorService WorkerOptions object. #1501

pull/2504/head
Nikita Koksharov 5 years ago
parent f9209d1fce
commit 7bac576f72

@ -312,6 +312,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
es = options.getExecutorService(); es = options.getExecutorService();
} }
remoteService.setListeners(options.getListeners());
remoteService.setTaskTimeout(options.getTaskTimeout()); remoteService.setTaskTimeout(options.getTaskTimeout());
remoteService.register(RemoteExecutorService.class, service, options.getWorkers(), es); remoteService.register(RemoteExecutorService.class, service, options.getWorkers(), es);
workersGroupListenerId = workersTopic.addListener(String.class, new MessageListener<String>() { workersGroupListenerId = workersTopic.addListener(String.class, new MessageListener<String>() {

@ -15,9 +15,12 @@
*/ */
package org.redisson.api; package org.redisson.api;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.redisson.api.executor.TaskListener;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactory;
@ -33,6 +36,7 @@ public final class WorkerOptions {
private ExecutorService executorService; private ExecutorService executorService;
private BeanFactory beanFactory; private BeanFactory beanFactory;
private long taskTimeout; private long taskTimeout;
private List<TaskListener> listeners = new ArrayList<>();
private WorkerOptions() { private WorkerOptions() {
} }
@ -92,9 +96,9 @@ public final class WorkerOptions {
/** /**
* Defines task timeout since moment of task execution start * Defines task timeout since moment of task execution start
* *
* @param timeout * @param timeout - timeout of task
* @param unit * @param unit - time unit
* @return * @return self instance
*/ */
public WorkerOptions taskTimeout(long timeout, TimeUnit unit) { public WorkerOptions taskTimeout(long timeout, TimeUnit unit) {
this.taskTimeout = unit.toMillis(timeout); this.taskTimeout = unit.toMillis(timeout);
@ -105,4 +109,23 @@ public final class WorkerOptions {
return taskTimeout; return taskTimeout;
} }
/**
* Adds task listener
*
* @see org.redisson.api.executor.TaskSuccessListener
* @see org.redisson.api.executor.TaskFailureListener
* @see org.redisson.api.executor.TaskStartedListener
* @see org.redisson.api.executor.TaskFinishedListener
*
* @param listener - task listener
* @return self instance
*/
public WorkerOptions addListener(TaskListener listener) {
listeners.add(listener);
return this;
}
public List<TaskListener> getListeners() {
return listeners;
}
} }

@ -0,0 +1,34 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.executor;
/**
* Task listener invoked when task was failed during execution
*
* @author Nikita Koksharov
*
*/
public interface TaskFailureListener extends TaskListener {
/**
* Invoked when task was failed during execution
*
* @param taskId - id of task
* @param exception - exception during task execution
*/
void onFailed(String taskId, Throwable exception);
}

@ -0,0 +1,33 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.executor;
/**
* Task listener invoked when task was finished
*
* @author Nikita Koksharov
*
*/
public interface TaskFinishedListener extends TaskListener {
/**
* Invoked when task finished
*
* @param taskId - id of task
*/
void onFinished(String taskId);
}

@ -0,0 +1,27 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.executor;
import java.util.EventListener;
/**
* Base task listener interface
*
* @author Nikita Koksharov
*
*/
public interface TaskListener extends EventListener {
}

@ -0,0 +1,33 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.executor;
/**
* Task listener invoked when task was started
*
* @author Nikita Koksharov
*
*/
public interface TaskStartedListener extends TaskListener {
/**
* Invoked when task was started
*
* @param taskId - id of task
*/
void onStarted(String taskId);
}

@ -0,0 +1,34 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.executor;
/**
* Task listener invoked when task was succeeded
*
* @author Nikita Koksharov
*
*/
public interface TaskSuccessListener extends TaskListener {
/**
* Invoked when task was succeeded
*
* @param taskId - id of task
* @param result - result of task
*/
<T> void onSucceeded(String taskId, T result);
}

@ -17,9 +17,8 @@ package org.redisson.executor;
import org.redisson.RedissonExecutorService; import org.redisson.RedissonExecutorService;
import org.redisson.RedissonRemoteService; import org.redisson.RedissonRemoteService;
import org.redisson.api.RBlockingQueue; import org.redisson.api.*;
import org.redisson.api.RFuture; import org.redisson.api.executor.*;
import org.redisson.api.RMap;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncService; import org.redisson.command.CommandAsyncService;
@ -27,10 +26,12 @@ import org.redisson.misc.RPromise;
import org.redisson.remote.*; import org.redisson.remote.*;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/** /**
* *
@ -46,6 +47,10 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
private String terminationTopicName; private String terminationTopicName;
private String schedulerQueueName; private String schedulerQueueName;
private long taskTimeout; private long taskTimeout;
private List<TaskStartedListener> startedListeners;
private List<TaskFinishedListener> finishedListeners;
private List<TaskFailureListener> failureListeners;
private List<TaskSuccessListener> successListeners;
public RedissonExecutorRemoteService(Codec codec, String name, public RedissonExecutorRemoteService(Codec codec, String name,
CommandAsyncService commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) { CommandAsyncService commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
@ -82,12 +87,49 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
@Override @Override
protected <T> void invokeMethod(Class<T> remoteInterface, RBlockingQueue<String> requestQueue, RemoteServiceRequest request, RemoteServiceMethod method, String responseName, ExecutorService executor, RFuture<RemoteServiceCancelRequest> cancelRequestFuture, AtomicReference<RRemoteServiceResponse> responseHolder) { protected <T> void invokeMethod(Class<T> remoteInterface, RBlockingQueue<String> requestQueue, RemoteServiceRequest request, RemoteServiceMethod method, String responseName, ExecutorService executor, RFuture<RemoteServiceCancelRequest> cancelRequestFuture, AtomicReference<RRemoteServiceResponse> responseHolder) {
startedListeners.stream().forEach(l -> l.onStarted(request.getId()));
if (taskTimeout > 0) { if (taskTimeout > 0) {
commandExecutor.getConnectionManager().getGroup().schedule(() -> { commandExecutor.getConnectionManager().getGroup().schedule(() -> {
((RPromise) cancelRequestFuture).trySuccess(new RemoteServiceCancelRequest(true, false)); ((RPromise) cancelRequestFuture).trySuccess(new RemoteServiceCancelRequest(true, false));
}, taskTimeout, TimeUnit.MILLISECONDS); }, taskTimeout, TimeUnit.MILLISECONDS);
} }
super.invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, cancelRequestFuture, responseHolder); super.invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, cancelRequestFuture, responseHolder);
if (responseHolder.get() instanceof RemoteServiceResponse) {
RemoteServiceResponse response = (RemoteServiceResponse) responseHolder.get();
if (response.getError() == null) {
successListeners.stream().forEach(l -> l.onSucceeded(request.getId(), response.getResult()));
} else {
failureListeners.stream().forEach(l -> l.onFailed(request.getId(), response.getError()));
}
} else {
failureListeners.stream().forEach(l -> l.onFailed(request.getId(), null));
}
finishedListeners.stream().forEach(l -> l.onFinished(request.getId()));
}
public void setListeners(List<TaskListener> listeners) {
startedListeners = listeners.stream()
.filter(x -> x instanceof TaskStartedListener)
.map(x -> (TaskStartedListener) x)
.collect(Collectors.toList());
finishedListeners = listeners.stream()
.filter(x -> x instanceof TaskFinishedListener)
.map(x -> (TaskFinishedListener) x)
.collect(Collectors.toList());
failureListeners = listeners.stream()
.filter(x -> x instanceof TaskFailureListener)
.map(x -> (TaskFailureListener) x)
.collect(Collectors.toList());
successListeners = listeners.stream()
.filter(x -> x instanceof TaskSuccessListener)
.map(x -> (TaskSuccessListener) x)
.collect(Collectors.toList());
} }
public void setTaskTimeout(long taskTimeout) { public void setTaskTimeout(long taskTimeout) {

@ -8,13 +8,7 @@ import java.io.Serializable;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Duration; import org.awaitility.Duration;
@ -27,6 +21,8 @@ import org.redisson.Redisson;
import org.redisson.RedissonNode; import org.redisson.RedissonNode;
import org.redisson.api.*; import org.redisson.api.*;
import org.redisson.api.annotation.RInject; import org.redisson.api.annotation.RInject;
import org.redisson.api.executor.TaskFinishedListener;
import org.redisson.api.executor.TaskStartedListener;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig; import org.redisson.config.RedissonNodeConfig;
import org.redisson.connection.balancer.RandomLoadBalancer; import org.redisson.connection.balancer.RandomLoadBalancer;
@ -312,6 +308,44 @@ public class RedissonExecutorServiceTest extends BaseTest {
assertThat(id).hasSize(34); assertThat(id).hasSize(34);
} }
@Test
public void testTaskStarted() throws InterruptedException {
RExecutorService executor = redisson.getExecutorService("test1");
CountDownLatch l = new CountDownLatch(1);
executor.registerWorkers(WorkerOptions.defaults().addListener(new TaskStartedListener() {
@Override
public void onStarted(String taskId) {
assertThat(taskId).isNotEmpty();
l.countDown();
}
}));
RExecutorFuture<?> future = executor.submit(new RunnableTask());
l.await();
executor.shutdown();
}
@Test
public void testTaskFinished() throws InterruptedException {
RExecutorService executor = redisson.getExecutorService("test1");
CountDownLatch l = new CountDownLatch(1);
executor.registerWorkers(WorkerOptions.defaults().addListener(new TaskFinishedListener() {
@Override
public void onFinished(String taskId) {
assertThat(taskId).isNotEmpty();
l.countDown();
}
}));
RExecutorFuture<?> future = executor.submit(new RunnableTask());
l.await();
executor.shutdown();
}
@Test @Test
public void testTaskTimeout() throws InterruptedException { public void testTaskTimeout() throws InterruptedException {
RExecutorService executor = redisson.getExecutorService("test1"); RExecutorService executor = redisson.getExecutorService("test1");
@ -322,6 +356,8 @@ public class RedissonExecutorServiceTest extends BaseTest {
Thread.sleep(1050); Thread.sleep(1050);
assertThat(future.isCancelled()).isTrue(); assertThat(future.isCancelled()).isTrue();
executor.shutdown();
} }
@Test @Test

Loading…
Cancel
Save