From 2b8ec9520cf6ef7b952c7192fbf37c30e72c226e Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 11 Dec 2017 17:32:11 +0300 Subject: [PATCH] RExecutorFuture and RScheduledFuture shouldn't be tracked if they weren't stored. #1185 --- .../java/org/redisson/BaseRemoteService.java | 14 +-- .../org/redisson/RedissonExecutorService.java | 110 ++++++++++++++---- .../executor/RedissonExecutorFuture.java | 4 + .../RedissonExecutorFutureReference.java | 42 +++++++ .../org/redisson/executor/RemotePromise.java | 8 +- .../executor/ScheduledTasksService.java | 33 ------ .../redisson/misc/RedissonThreadFactory.java | 59 ---------- 7 files changed, 142 insertions(+), 128 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/executor/RedissonExecutorFutureReference.java delete mode 100644 redisson/src/main/java/org/redisson/misc/RedissonThreadFactory.java diff --git a/redisson/src/main/java/org/redisson/BaseRemoteService.java b/redisson/src/main/java/org/redisson/BaseRemoteService.java index 9344b1b0f..6f292f716 100644 --- a/redisson/src/main/java/org/redisson/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/BaseRemoteService.java @@ -102,7 +102,7 @@ public abstract class BaseRemoteService { this.responseQueueName = getResponseQueueName(executorId); } - protected String getResponseQueueName(String executorId) { + public String getResponseQueueName(String executorId) { return "{remote_response}:" + executorId; } @@ -215,7 +215,7 @@ public abstract class BaseRemoteService { final Long ackTimeout = request.getOptions().getAckTimeoutInMillis(); - final RemotePromise result = new RemotePromise(requestId, getParam(request)) { + final RemotePromise result = new RemotePromise(requestId) { @Override public boolean cancel(boolean mayInterruptIfRunning) { @@ -366,10 +366,6 @@ public abstract class BaseRemoteService { return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler); } - protected Object getParam(RemoteServiceRequest request) { - return null; - } - private void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise result, final String ackName, final RFuture responseFuture) { RFuture deleteFuture = redisson.getBucket(ackName).deleteAsync(); @@ -447,6 +443,10 @@ public abstract class BaseRemoteService { synchronized (responses) { ResponseEntry entry = responses.get(responseQueueName); List list = entry.getResponses().get(requestId); + if (list == null) { + return; + } + for (Iterator iterator = list.iterator(); iterator.hasNext();) { Result result = iterator.next(); if (result.getPromise() == responseFuture) { @@ -592,7 +592,7 @@ public abstract class BaseRemoteService { RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args, optionsCopy, System.currentTimeMillis()); - RemotePromise addPromise = new RemotePromise(requestId, null); + RemotePromise addPromise = new RemotePromise(requestId); addAsync(requestQueueName, request, addPromise); addPromise.getAddFuture().sync(); diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index aebfe6f57..c2c3a8cff 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -18,10 +18,12 @@ package org.redisson; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; +import java.lang.ref.ReferenceQueue; import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -57,6 +59,7 @@ import org.redisson.command.CommandExecutor; import org.redisson.connection.ConnectionManager; import org.redisson.executor.RedissonExecutorBatchFuture; import org.redisson.executor.RedissonExecutorFuture; +import org.redisson.executor.RedissonExecutorFutureReference; import org.redisson.executor.RedissonScheduledFuture; import org.redisson.executor.RemoteExecutorService; import org.redisson.executor.RemoteExecutorServiceAsync; @@ -71,6 +74,7 @@ import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.remote.RequestId; import org.redisson.remote.ResponseEntry; +import org.redisson.remote.ResponseEntry.Result; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,9 +127,13 @@ public class RedissonExecutorService implements RScheduledExecutorService { private final String name; private final String requestQueueName; + private final String responseQueueName; private final QueueTransferService queueTransferService; private final String executorId; private final ConcurrentMap responses; + + private final ReferenceQueue> referenceDueue = new ReferenceQueue>(); + private final Collection references = Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name, QueueTransferService queueTransferService, ConcurrentMap responses, String redissonId) { super(); @@ -145,6 +153,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { remoteService = redisson.getRemoteService(name, codec); requestQueueName = ((RedissonRemoteService)remoteService).getRequestQueueName(RemoteExecutorService.class); + responseQueueName = ((RedissonRemoteService)remoteService).getResponseQueueName(executorId); String objectName = requestQueueName; tasksCounterName = objectName + ":counter"; tasksName = objectName + ":tasks"; @@ -261,7 +270,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { byte[] classBody = getClassBody(task); byte[] state = encode(task); RemotePromise promise = (RemotePromise)asyncServiceWithoutResult.executeRunnable(task.getClass().getName(), classBody, state, null); - execute(promise); + syncExecute(promise); } @Override @@ -437,8 +446,8 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public RExecutorFuture submit(Callable task) { RemotePromise promise = (RemotePromise) ((PromiseDelegator) submitAsync(task)).getInnerPromise(); - execute(promise); - return new RedissonExecutorFuture(promise, promise.getRequestId()); + syncExecute(promise); + return createFuture(promise); } @Override @@ -448,7 +457,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { byte[] state = encode(task); RemotePromise result = (RemotePromise) asyncService.executeCallable(task.getClass().getName(), classBody, state, null); addListener(result); - return new RedissonExecutorFuture(result, result.getRequestId()); + return createFuture(result); } @Override @@ -465,7 +474,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { byte[] classBody = getClassBody(task); byte[] state = encode(task); RemotePromise promise = (RemotePromise)asyncService.executeCallable(task.getClass().getName(), classBody, state, null); - RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId()); + RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise); result.add(executorFuture); } @@ -491,7 +500,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { byte[] classBody = getClassBody(task); byte[] state = encode(task); RemotePromise promise = (RemotePromise)asyncService.executeCallable(task.getClass().getName(), classBody, state, null); - RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId()); + RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise); result.add(executorFuture); } @@ -553,7 +562,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { } } - private void execute(RemotePromise promise) { + private void syncExecute(RemotePromise promise) { RFuture addFuture = promise.getAddFuture(); addFuture.syncUninterruptibly(); Boolean res = addFuture.getNow(); @@ -593,7 +602,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { byte[] classBody = getClassBody(task); byte[] state = encode(task); RemotePromise promise = (RemotePromise)asyncService.executeRunnable(task.getClass().getName(), classBody, state, null); - RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId()); + RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise); result.add(executorFuture); } @@ -619,7 +628,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { byte[] classBody = getClassBody(task); byte[] state = encode(task); RemotePromise promise = (RemotePromise)asyncService.executeRunnable(task.getClass().getName(), classBody, state, null); - RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId()); + RedissonExecutorFuture executorFuture = new RedissonExecutorFuture(promise); result.add(executorFuture); } @@ -653,8 +662,8 @@ public class RedissonExecutorService implements RScheduledExecutorService { @Override public RExecutorFuture submit(Runnable task) { RemotePromise promise = (RemotePromise) ((PromiseDelegator) submitAsync(task)).getInnerPromise(); - execute(promise); - return new RedissonExecutorFuture(promise, promise.getRequestId()); + syncExecute(promise); + return createFuture(promise); } @Override @@ -664,15 +673,61 @@ public class RedissonExecutorService implements RScheduledExecutorService { byte[] state = encode(task); RemotePromise result = (RemotePromise) asyncService.executeRunnable(task.getClass().getName(), classBody, state, null); addListener(result); - return new RedissonExecutorFuture(result, result.getRequestId()); + return createFuture(result); + } + + private void cancelResponseHandling(RequestId requestId) { + synchronized (responses) { + ResponseEntry entry = responses.get(responseQueueName); + if (entry == null) { + return; + } + + List list = entry.getResponses().remove(requestId); + if (list != null) { + for (Result result : list) { + result.getScheduledFuture().cancel(true); + } + } + if (entry.getResponses().isEmpty()) { + responses.remove(responseQueueName, entry); + } + } } @Override public RScheduledFuture schedule(Runnable task, long delay, TimeUnit unit) { RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleAsync(task, delay, unit); - execute((RemotePromise)future.getInnerPromise()); + RemotePromise rp = (RemotePromise)future.getInnerPromise(); + syncExecute(rp); + storeReference(future, rp.getRequestId()); return future; } + + private RExecutorFuture createFuture(RemotePromise promise) { + RExecutorFuture f = new RedissonExecutorFuture(promise); + storeReference(f, promise.getRequestId()); + return f; + } + + private RScheduledFuture createFuture(RemotePromise promise, long scheduledExecutionTime) { + RedissonScheduledFuture f = new RedissonScheduledFuture(promise, scheduledExecutionTime); + storeReference(f, promise.getRequestId()); + return f; + } + + private void storeReference(RExecutorFuture future, RequestId requestId) { + while (true) { + RedissonExecutorFutureReference r = (RedissonExecutorFutureReference) referenceDueue.poll(); + if (r == null) { + break; + } + references.remove(r); + cancelResponseHandling(r.getRequestId()); + } + RedissonExecutorFutureReference reference = new RedissonExecutorFutureReference(requestId, future, referenceDueue); + references.add(reference); + } @Override public RScheduledFuture scheduleAsync(Runnable task, long delay, TimeUnit unit) { @@ -682,13 +737,16 @@ public class RedissonExecutorService implements RScheduledExecutorService { long startTime = System.currentTimeMillis() + unit.toMillis(delay); RemotePromise result = (RemotePromise) asyncScheduledService.scheduleRunnable(task.getClass().getName(), classBody, state, startTime, null); addListener(result); - return new RedissonScheduledFuture(result, startTime); + + return createFuture(result, startTime); } @Override public RScheduledFuture schedule(Callable task, long delay, TimeUnit unit) { RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleAsync(task, delay, unit); - execute((RemotePromise)future.getInnerPromise()); + RemotePromise rp = (RemotePromise)future.getInnerPromise(); + syncExecute(rp); + storeReference(future, rp.getRequestId()); return future; } @@ -700,13 +758,15 @@ public class RedissonExecutorService implements RScheduledExecutorService { long startTime = System.currentTimeMillis() + unit.toMillis(delay); RemotePromise result = (RemotePromise) asyncScheduledService.scheduleCallable(task.getClass().getName(), classBody, state, startTime, null); addListener(result); - return new RedissonScheduledFuture(result, startTime); + return createFuture(result, startTime); } @Override public RScheduledFuture scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleAtFixedRateAsync(task, initialDelay, period, unit); - execute((RemotePromise)future.getInnerPromise()); + RemotePromise rp = (RemotePromise)future.getInnerPromise(); + syncExecute(rp); + storeReference(future, rp.getRequestId()); return future; } @@ -718,13 +778,15 @@ public class RedissonExecutorService implements RScheduledExecutorService { long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay); RemotePromise result = (RemotePromise) asyncScheduledServiceAtFixed.scheduleAtFixedRate(task.getClass().getName(), classBody, state, startTime, unit.toMillis(period), executorId, null); addListener(result); - return new RedissonScheduledFuture(result, startTime); + return createFuture(result, startTime); } @Override public RScheduledFuture schedule(Runnable task, CronSchedule cronSchedule) { RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleAsync(task, cronSchedule); - execute((RemotePromise)future.getInnerPromise()); + RemotePromise rp = (RemotePromise)future.getInnerPromise(); + syncExecute(rp); + storeReference(future, rp.getRequestId()); return future; } @@ -737,17 +799,21 @@ public class RedissonExecutorService implements RScheduledExecutorService { long startTime = startDate.getTime(); RemotePromise result = (RemotePromise) asyncScheduledServiceAtFixed.schedule(task.getClass().getName(), classBody, state, startTime, cronSchedule.getExpression().getCronExpression(), executorId, null); addListener(result); - return new RedissonScheduledFuture(result, startTime) { + RedissonScheduledFuture f = new RedissonScheduledFuture(result, startTime) { public long getDelay(TimeUnit unit) { return unit.convert(startDate.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS); }; }; + storeReference(f, result.getRequestId()); + return f; } @Override public RScheduledFuture scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleWithFixedDelayAsync(task, initialDelay, delay, unit); - execute((RemotePromise)future.getInnerPromise()); + RemotePromise rp = (RemotePromise)future.getInnerPromise(); + syncExecute(rp); + storeReference(future, rp.getRequestId()); return future; } @@ -759,7 +825,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay); RemotePromise result = (RemotePromise) asyncScheduledServiceAtFixed.scheduleWithFixedDelay(task.getClass().getName(), classBody, state, startTime, unit.toMillis(delay), executorId, null); addListener(result); - return new RedissonScheduledFuture(result, startTime); + return createFuture(result, startTime); } @Override diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorFuture.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorFuture.java index 5a27b0d17..4d14cfd90 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonExecutorFuture.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorFuture.java @@ -30,6 +30,10 @@ public class RedissonExecutorFuture extends PromiseDelegator implements RE private final RequestId taskId; + public RedissonExecutorFuture(RemotePromise promise) { + this(promise, promise.getRequestId()); + } + public RedissonExecutorFuture(RPromise promise, RequestId taskId) { super(promise); this.taskId = taskId; diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorFutureReference.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorFutureReference.java new file mode 100644 index 000000000..622f34ead --- /dev/null +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorFutureReference.java @@ -0,0 +1,42 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.executor; + +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; + +import org.redisson.api.RExecutorFuture; +import org.redisson.remote.RequestId; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonExecutorFutureReference extends WeakReference> { + + private RequestId requestId; + + public RedissonExecutorFutureReference(RequestId requestId, RExecutorFuture referent, ReferenceQueue> q) { + super(referent, q); + this.requestId = requestId; + } + + public RequestId getRequestId() { + return requestId; + } + +} diff --git a/redisson/src/main/java/org/redisson/executor/RemotePromise.java b/redisson/src/main/java/org/redisson/executor/RemotePromise.java index 1925fdd42..477888a0c 100644 --- a/redisson/src/main/java/org/redisson/executor/RemotePromise.java +++ b/redisson/src/main/java/org/redisson/executor/RemotePromise.java @@ -26,18 +26,12 @@ import org.redisson.remote.RequestId; */ public class RemotePromise extends RedissonPromise { - private final Object param; private final RequestId requestId; private RFuture addFuture; - public RemotePromise(RequestId requestId, Object param) { + public RemotePromise(RequestId requestId) { super(); this.requestId = requestId; - this.param = param; - } - - public

P getParam() { - return (P) param; } public RequestId getRequestId() { diff --git a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java index 9107a706b..7354ad62c 100644 --- a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java +++ b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java @@ -17,17 +17,14 @@ package org.redisson.executor; import java.util.Arrays; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import org.redisson.RedissonExecutorService; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; -import org.redisson.api.RemoteInvocationOptions; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandExecutor; -import org.redisson.remote.RRemoteServiceResponse; import org.redisson.remote.RemoteServiceRequest; import org.redisson.remote.RequestId; import org.redisson.remote.ResponseEntry; @@ -114,36 +111,6 @@ public class ScheduledTasksService extends TasksService { startTime, request.getId(), encode(request)); } - @Override - protected Object getParam(RemoteServiceRequest request) { - Long startTime = 0L; - if (request != null && request.getArgs() != null && request.getArgs().length > 3) { - startTime = (Long)request.getArgs()[3]; - } - return startTime; - } - - @Override - protected void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise result, - final RFuture responseFuture) { - if (!optionsCopy.isResultExpected()) { - return; - } - - long startTime = result.getParam(); - long delay = startTime - System.currentTimeMillis(); - if (delay > 0) { - commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() { - @Override - public void run() { - ScheduledTasksService.super.awaitResultAsync(optionsCopy, result, responseFuture); - } - }, (long)(delay - delay*0.10), TimeUnit.MILLISECONDS); - } else { - super.awaitResultAsync(optionsCopy, result, responseFuture); - } - } - @Override protected RFuture removeAsync(String requestQueueName, RequestId taskId) { return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, diff --git a/redisson/src/main/java/org/redisson/misc/RedissonThreadFactory.java b/redisson/src/main/java/org/redisson/misc/RedissonThreadFactory.java deleted file mode 100644 index 3c78a3d88..000000000 --- a/redisson/src/main/java/org/redisson/misc/RedissonThreadFactory.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Copyright 2016 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.misc; - -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -public class RedissonThreadFactory implements ThreadFactory { - - private static final AtomicInteger poolNumber = new AtomicInteger(1); - private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger(1); - private final String namePrefix; - private final UncaughtExceptionHandler exceptionHandler; - - public RedissonThreadFactory() { - this(null); - } - - public RedissonThreadFactory(UncaughtExceptionHandler exceptionHandler) { - SecurityManager s = System.getSecurityManager(); - group = (s != null) ? s.getThreadGroup() : - Thread.currentThread().getThreadGroup(); - namePrefix = "redisson-node-" + poolNumber.getAndIncrement() + "-thread-"; - this.exceptionHandler = exceptionHandler; - } - - public Thread newThread(Runnable r) { - Thread t = new Thread(group, r, - namePrefix + threadNumber.getAndIncrement(), - 0); - if (t.isDaemon()) { - t.setDaemon(false); - } - if (t.getPriority() != Thread.NORM_PRIORITY) { - t.setPriority(Thread.NORM_PRIORITY); - } - if (exceptionHandler != null) { - t.setUncaughtExceptionHandler(exceptionHandler); - } - - return t; - } - -}