From 7bac576f72d81db949cea36861c345d21b99ec85 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 25 Dec 2019 14:22:33 +0300 Subject: [PATCH] Feature - TaskFailureListener, TaskFinishedListener, TaskStartedListener and TaskSuccessListener task listeners added to RExecutorService WorkerOptions object. #1501 --- .../org/redisson/RedissonExecutorService.java | 1 + .../java/org/redisson/api/WorkerOptions.java | 29 +++++++++-- .../api/executor/TaskFailureListener.java | 34 +++++++++++++ .../api/executor/TaskFinishedListener.java | 33 ++++++++++++ .../redisson/api/executor/TaskListener.java | 27 ++++++++++ .../api/executor/TaskStartedListener.java | 33 ++++++++++++ .../api/executor/TaskSuccessListener.java | 34 +++++++++++++ .../RedissonExecutorRemoteService.java | 48 ++++++++++++++++-- .../executor/RedissonExecutorServiceTest.java | 50 ++++++++++++++++--- 9 files changed, 276 insertions(+), 13 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/executor/TaskFailureListener.java create mode 100644 redisson/src/main/java/org/redisson/api/executor/TaskFinishedListener.java create mode 100644 redisson/src/main/java/org/redisson/api/executor/TaskListener.java create mode 100644 redisson/src/main/java/org/redisson/api/executor/TaskStartedListener.java create mode 100644 redisson/src/main/java/org/redisson/api/executor/TaskSuccessListener.java diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 3de3bdee6..0398540c0 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -312,6 +312,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { es = options.getExecutorService(); } + remoteService.setListeners(options.getListeners()); remoteService.setTaskTimeout(options.getTaskTimeout()); remoteService.register(RemoteExecutorService.class, service, options.getWorkers(), es); workersGroupListenerId = workersTopic.addListener(String.class, new MessageListener() { diff --git a/redisson/src/main/java/org/redisson/api/WorkerOptions.java b/redisson/src/main/java/org/redisson/api/WorkerOptions.java index 4db640a75..f568285ec 100644 --- a/redisson/src/main/java/org/redisson/api/WorkerOptions.java +++ b/redisson/src/main/java/org/redisson/api/WorkerOptions.java @@ -15,9 +15,12 @@ */ package org.redisson.api; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import org.redisson.api.executor.TaskListener; import org.redisson.config.Config; import org.springframework.beans.factory.BeanFactory; @@ -33,6 +36,7 @@ public final class WorkerOptions { private ExecutorService executorService; private BeanFactory beanFactory; private long taskTimeout; + private List listeners = new ArrayList<>(); private WorkerOptions() { } @@ -92,9 +96,9 @@ public final class WorkerOptions { /** * Defines task timeout since moment of task execution start * - * @param timeout - * @param unit - * @return + * @param timeout - timeout of task + * @param unit - time unit + * @return self instance */ public WorkerOptions taskTimeout(long timeout, TimeUnit unit) { this.taskTimeout = unit.toMillis(timeout); @@ -105,4 +109,23 @@ public final class WorkerOptions { return taskTimeout; } + /** + * Adds task listener + * + * @see org.redisson.api.executor.TaskSuccessListener + * @see org.redisson.api.executor.TaskFailureListener + * @see org.redisson.api.executor.TaskStartedListener + * @see org.redisson.api.executor.TaskFinishedListener + * + * @param listener - task listener + * @return self instance + */ + public WorkerOptions addListener(TaskListener listener) { + listeners.add(listener); + return this; + } + + public List getListeners() { + return listeners; + } } diff --git a/redisson/src/main/java/org/redisson/api/executor/TaskFailureListener.java b/redisson/src/main/java/org/redisson/api/executor/TaskFailureListener.java new file mode 100644 index 000000000..c86f47169 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/executor/TaskFailureListener.java @@ -0,0 +1,34 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.executor; + +/** + * Task listener invoked when task was failed during execution + * + * @author Nikita Koksharov + * + */ +public interface TaskFailureListener extends TaskListener { + + /** + * Invoked when task was failed during execution + * + * @param taskId - id of task + * @param exception - exception during task execution + */ + void onFailed(String taskId, Throwable exception); + +} diff --git a/redisson/src/main/java/org/redisson/api/executor/TaskFinishedListener.java b/redisson/src/main/java/org/redisson/api/executor/TaskFinishedListener.java new file mode 100644 index 000000000..28816b72e --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/executor/TaskFinishedListener.java @@ -0,0 +1,33 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.executor; + +/** + * Task listener invoked when task was finished + * + * @author Nikita Koksharov + * + */ +public interface TaskFinishedListener extends TaskListener { + + /** + * Invoked when task finished + * + * @param taskId - id of task + */ + void onFinished(String taskId); + +} diff --git a/redisson/src/main/java/org/redisson/api/executor/TaskListener.java b/redisson/src/main/java/org/redisson/api/executor/TaskListener.java new file mode 100644 index 000000000..360a22876 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/executor/TaskListener.java @@ -0,0 +1,27 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.executor; + +import java.util.EventListener; + +/** + * Base task listener interface + * + * @author Nikita Koksharov + * + */ +public interface TaskListener extends EventListener { +} diff --git a/redisson/src/main/java/org/redisson/api/executor/TaskStartedListener.java b/redisson/src/main/java/org/redisson/api/executor/TaskStartedListener.java new file mode 100644 index 000000000..02459a0df --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/executor/TaskStartedListener.java @@ -0,0 +1,33 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.executor; + +/** + * Task listener invoked when task was started + * + * @author Nikita Koksharov + * + */ +public interface TaskStartedListener extends TaskListener { + + /** + * Invoked when task was started + * + * @param taskId - id of task + */ + void onStarted(String taskId); + +} diff --git a/redisson/src/main/java/org/redisson/api/executor/TaskSuccessListener.java b/redisson/src/main/java/org/redisson/api/executor/TaskSuccessListener.java new file mode 100644 index 000000000..cd3bddcce --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/executor/TaskSuccessListener.java @@ -0,0 +1,34 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.executor; + +/** + * Task listener invoked when task was succeeded + * + * @author Nikita Koksharov + * + */ +public interface TaskSuccessListener extends TaskListener { + + /** + * Invoked when task was succeeded + * + * @param taskId - id of task + * @param result - result of task + */ + void onSucceeded(String taskId, T result); + +} diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java index 470df72b8..58680d1dd 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java @@ -17,9 +17,8 @@ package org.redisson.executor; import org.redisson.RedissonExecutorService; import org.redisson.RedissonRemoteService; -import org.redisson.api.RBlockingQueue; -import org.redisson.api.RFuture; -import org.redisson.api.RMap; +import org.redisson.api.*; +import org.redisson.api.executor.*; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncService; @@ -27,10 +26,12 @@ import org.redisson.misc.RPromise; import org.redisson.remote.*; import java.util.Arrays; +import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; /** * @@ -46,6 +47,10 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { private String terminationTopicName; private String schedulerQueueName; private long taskTimeout; + private List startedListeners; + private List finishedListeners; + private List failureListeners; + private List successListeners; public RedissonExecutorRemoteService(Codec codec, String name, CommandAsyncService commandExecutor, String executorId, ConcurrentMap responses) { @@ -82,12 +87,49 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { @Override protected void invokeMethod(Class remoteInterface, RBlockingQueue requestQueue, RemoteServiceRequest request, RemoteServiceMethod method, String responseName, ExecutorService executor, RFuture cancelRequestFuture, AtomicReference responseHolder) { + startedListeners.stream().forEach(l -> l.onStarted(request.getId())); + if (taskTimeout > 0) { commandExecutor.getConnectionManager().getGroup().schedule(() -> { ((RPromise) cancelRequestFuture).trySuccess(new RemoteServiceCancelRequest(true, false)); }, taskTimeout, TimeUnit.MILLISECONDS); } super.invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, cancelRequestFuture, responseHolder); + + if (responseHolder.get() instanceof RemoteServiceResponse) { + RemoteServiceResponse response = (RemoteServiceResponse) responseHolder.get(); + if (response.getError() == null) { + successListeners.stream().forEach(l -> l.onSucceeded(request.getId(), response.getResult())); + } else { + failureListeners.stream().forEach(l -> l.onFailed(request.getId(), response.getError())); + } + } else { + failureListeners.stream().forEach(l -> l.onFailed(request.getId(), null)); + } + + finishedListeners.stream().forEach(l -> l.onFinished(request.getId())); + } + + public void setListeners(List listeners) { + startedListeners = listeners.stream() + .filter(x -> x instanceof TaskStartedListener) + .map(x -> (TaskStartedListener) x) + .collect(Collectors.toList()); + + finishedListeners = listeners.stream() + .filter(x -> x instanceof TaskFinishedListener) + .map(x -> (TaskFinishedListener) x) + .collect(Collectors.toList()); + + failureListeners = listeners.stream() + .filter(x -> x instanceof TaskFailureListener) + .map(x -> (TaskFailureListener) x) + .collect(Collectors.toList()); + + successListeners = listeners.stream() + .filter(x -> x instanceof TaskSuccessListener) + .map(x -> (TaskSuccessListener) x) + .collect(Collectors.toList()); } public void setTaskTimeout(long taskTimeout) { diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index dbdab927b..b16ae2789 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -8,13 +8,7 @@ import java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import org.awaitility.Duration; @@ -27,6 +21,8 @@ import org.redisson.Redisson; import org.redisson.RedissonNode; import org.redisson.api.*; import org.redisson.api.annotation.RInject; +import org.redisson.api.executor.TaskFinishedListener; +import org.redisson.api.executor.TaskStartedListener; import org.redisson.config.Config; import org.redisson.config.RedissonNodeConfig; import org.redisson.connection.balancer.RandomLoadBalancer; @@ -312,6 +308,44 @@ public class RedissonExecutorServiceTest extends BaseTest { assertThat(id).hasSize(34); } + @Test + public void testTaskStarted() throws InterruptedException { + RExecutorService executor = redisson.getExecutorService("test1"); + CountDownLatch l = new CountDownLatch(1); + executor.registerWorkers(WorkerOptions.defaults().addListener(new TaskStartedListener() { + @Override + public void onStarted(String taskId) { + assertThat(taskId).isNotEmpty(); + l.countDown(); + } + })); + + RExecutorFuture future = executor.submit(new RunnableTask()); + + l.await(); + + executor.shutdown(); + } + + @Test + public void testTaskFinished() throws InterruptedException { + RExecutorService executor = redisson.getExecutorService("test1"); + CountDownLatch l = new CountDownLatch(1); + executor.registerWorkers(WorkerOptions.defaults().addListener(new TaskFinishedListener() { + @Override + public void onFinished(String taskId) { + assertThat(taskId).isNotEmpty(); + l.countDown(); + } + })); + + RExecutorFuture future = executor.submit(new RunnableTask()); + + l.await(); + + executor.shutdown(); + } + @Test public void testTaskTimeout() throws InterruptedException { RExecutorService executor = redisson.getExecutorService("test1"); @@ -322,6 +356,8 @@ public class RedissonExecutorServiceTest extends BaseTest { Thread.sleep(1050); assertThat(future.isCancelled()).isTrue(); + + executor.shutdown(); } @Test