RExecutorFuture and RScheduledFuture shouldn't be tracked if they weren't stored. #1185

pull/1204/head
Nikita 7 years ago
parent 528506634e
commit 2b8ec9520c

@ -102,7 +102,7 @@ public abstract class BaseRemoteService {
this.responseQueueName = getResponseQueueName(executorId);
}
protected String getResponseQueueName(String executorId) {
public String getResponseQueueName(String executorId) {
return "{remote_response}:" + executorId;
}
@ -215,7 +215,7 @@ public abstract class BaseRemoteService {
final Long ackTimeout = request.getOptions().getAckTimeoutInMillis();
final RemotePromise<Object> result = new RemotePromise<Object>(requestId, getParam(request)) {
final RemotePromise<Object> result = new RemotePromise<Object>(requestId) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
@ -366,10 +366,6 @@ public abstract class BaseRemoteService {
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler);
}
protected Object getParam(RemoteServiceRequest request) {
return null;
}
private void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result,
final String ackName, final RFuture<RRemoteServiceResponse> responseFuture) {
RFuture<Boolean> deleteFuture = redisson.getBucket(ackName).deleteAsync();
@ -447,6 +443,10 @@ public abstract class BaseRemoteService {
synchronized (responses) {
ResponseEntry entry = responses.get(responseQueueName);
List<Result> list = entry.getResponses().get(requestId);
if (list == null) {
return;
}
for (Iterator<Result> iterator = list.iterator(); iterator.hasNext();) {
Result result = iterator.next();
if (result.getPromise() == responseFuture) {
@ -592,7 +592,7 @@ public abstract class BaseRemoteService {
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args, optionsCopy,
System.currentTimeMillis());
RemotePromise<Object> addPromise = new RemotePromise<Object>(requestId, null);
RemotePromise<Object> addPromise = new RemotePromise<Object>(requestId);
addAsync(requestQueueName, request, addPromise);
addPromise.getAddFuture().sync();

@ -18,10 +18,12 @@ package org.redisson;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.ReferenceQueue;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
@ -57,6 +59,7 @@ import org.redisson.command.CommandExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.executor.RedissonExecutorBatchFuture;
import org.redisson.executor.RedissonExecutorFuture;
import org.redisson.executor.RedissonExecutorFutureReference;
import org.redisson.executor.RedissonScheduledFuture;
import org.redisson.executor.RemoteExecutorService;
import org.redisson.executor.RemoteExecutorServiceAsync;
@ -71,6 +74,7 @@ import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import org.redisson.remote.ResponseEntry.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -123,9 +127,13 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final String name;
private final String requestQueueName;
private final String responseQueueName;
private final QueueTransferService queueTransferService;
private final String executorId;
private final ConcurrentMap<String, ResponseEntry> responses;
private final ReferenceQueue<RExecutorFuture<?>> referenceDueue = new ReferenceQueue<RExecutorFuture<?>>();
private final Collection<RedissonExecutorFutureReference> references = Collections.newSetFromMap(PlatformDependent.<RedissonExecutorFutureReference, Boolean>newConcurrentHashMap());
public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name, QueueTransferService queueTransferService, ConcurrentMap<String, ResponseEntry> responses, String redissonId) {
super();
@ -145,6 +153,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
remoteService = redisson.getRemoteService(name, codec);
requestQueueName = ((RedissonRemoteService)remoteService).getRequestQueueName(RemoteExecutorService.class);
responseQueueName = ((RedissonRemoteService)remoteService).getResponseQueueName(executorId);
String objectName = requestQueueName;
tasksCounterName = objectName + ":counter";
tasksName = objectName + ":tasks";
@ -261,7 +270,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncServiceWithoutResult.executeRunnable(task.getClass().getName(), classBody, state, null);
execute(promise);
syncExecute(promise);
}
@Override
@ -437,8 +446,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public <T> RExecutorFuture<T> submit(Callable<T> task) {
RemotePromise<T> promise = (RemotePromise<T>) ((PromiseDelegator<T>) submitAsync(task)).getInnerPromise();
execute(promise);
return new RedissonExecutorFuture<T>(promise, promise.getRequestId());
syncExecute(promise);
return createFuture(promise);
}
@Override
@ -448,7 +457,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
byte[] state = encode(task);
RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(task.getClass().getName(), classBody, state, null);
addListener(result);
return new RedissonExecutorFuture<T>(result, result.getRequestId());
return createFuture(result);
}
@Override
@ -465,7 +474,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(task.getClass().getName(), classBody, state, null);
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId());
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture);
}
@ -491,7 +500,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(task.getClass().getName(), classBody, state, null);
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId());
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture);
}
@ -553,7 +562,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
}
}
private <T> void execute(RemotePromise<T> promise) {
private <T> void syncExecute(RemotePromise<T> promise) {
RFuture<Boolean> addFuture = promise.getAddFuture();
addFuture.syncUninterruptibly();
Boolean res = addFuture.getNow();
@ -593,7 +602,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(task.getClass().getName(), classBody, state, null);
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise, promise.getRequestId());
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture);
}
@ -619,7 +628,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(task.getClass().getName(), classBody, state, null);
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise, promise.getRequestId());
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture);
}
@ -653,8 +662,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RExecutorFuture<?> submit(Runnable task) {
RemotePromise<Void> promise = (RemotePromise<Void>) ((PromiseDelegator<Void>) submitAsync(task)).getInnerPromise();
execute(promise);
return new RedissonExecutorFuture<Void>(promise, promise.getRequestId());
syncExecute(promise);
return createFuture(promise);
}
@Override
@ -664,15 +673,61 @@ public class RedissonExecutorService implements RScheduledExecutorService {
byte[] state = encode(task);
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(task.getClass().getName(), classBody, state, null);
addListener(result);
return new RedissonExecutorFuture<Void>(result, result.getRequestId());
return createFuture(result);
}
private void cancelResponseHandling(RequestId requestId) {
synchronized (responses) {
ResponseEntry entry = responses.get(responseQueueName);
if (entry == null) {
return;
}
List<Result> list = entry.getResponses().remove(requestId);
if (list != null) {
for (Result result : list) {
result.getScheduledFuture().cancel(true);
}
}
if (entry.getResponses().isEmpty()) {
responses.remove(responseQueueName, entry);
}
}
}
@Override
public RScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAsync(task, delay, unit);
execute((RemotePromise<?>)future.getInnerPromise());
RemotePromise<?> rp = (RemotePromise<?>)future.getInnerPromise();
syncExecute(rp);
storeReference(future, rp.getRequestId());
return future;
}
private <T> RExecutorFuture<T> createFuture(RemotePromise<T> promise) {
RExecutorFuture<T> f = new RedissonExecutorFuture<T>(promise);
storeReference(f, promise.getRequestId());
return f;
}
private <T> RScheduledFuture<T> createFuture(RemotePromise<T> promise, long scheduledExecutionTime) {
RedissonScheduledFuture<T> f = new RedissonScheduledFuture<T>(promise, scheduledExecutionTime);
storeReference(f, promise.getRequestId());
return f;
}
private void storeReference(RExecutorFuture<?> future, RequestId requestId) {
while (true) {
RedissonExecutorFutureReference r = (RedissonExecutorFutureReference) referenceDueue.poll();
if (r == null) {
break;
}
references.remove(r);
cancelResponseHandling(r.getRequestId());
}
RedissonExecutorFutureReference reference = new RedissonExecutorFutureReference(requestId, future, referenceDueue);
references.add(reference);
}
@Override
public RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit unit) {
@ -682,13 +737,16 @@ public class RedissonExecutorService implements RScheduledExecutorService {
long startTime = System.currentTimeMillis() + unit.toMillis(delay);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(task.getClass().getName(), classBody, state, startTime, null);
addListener(result);
return new RedissonScheduledFuture<Void>(result, startTime);
return createFuture(result, startTime);
}
@Override
public <V> RScheduledFuture<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
RedissonScheduledFuture<V> future = (RedissonScheduledFuture<V>) scheduleAsync(task, delay, unit);
execute((RemotePromise<V>)future.getInnerPromise());
RemotePromise<?> rp = (RemotePromise<?>)future.getInnerPromise();
syncExecute(rp);
storeReference(future, rp.getRequestId());
return future;
}
@ -700,13 +758,15 @@ public class RedissonExecutorService implements RScheduledExecutorService {
long startTime = System.currentTimeMillis() + unit.toMillis(delay);
RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(task.getClass().getName(), classBody, state, startTime, null);
addListener(result);
return new RedissonScheduledFuture<V>(result, startTime);
return createFuture(result, startTime);
}
@Override
public RScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAtFixedRateAsync(task, initialDelay, period, unit);
execute((RemotePromise<?>)future.getInnerPromise());
RemotePromise<?> rp = (RemotePromise<?>)future.getInnerPromise();
syncExecute(rp);
storeReference(future, rp.getRequestId());
return future;
}
@ -718,13 +778,15 @@ public class RedissonExecutorService implements RScheduledExecutorService {
long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleAtFixedRate(task.getClass().getName(), classBody, state, startTime, unit.toMillis(period), executorId, null);
addListener(result);
return new RedissonScheduledFuture<Void>(result, startTime);
return createFuture(result, startTime);
}
@Override
public RScheduledFuture<?> schedule(Runnable task, CronSchedule cronSchedule) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleAsync(task, cronSchedule);
execute((RemotePromise<?>)future.getInnerPromise());
RemotePromise<?> rp = (RemotePromise<?>)future.getInnerPromise();
syncExecute(rp);
storeReference(future, rp.getRequestId());
return future;
}
@ -737,17 +799,21 @@ public class RedissonExecutorService implements RScheduledExecutorService {
long startTime = startDate.getTime();
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.schedule(task.getClass().getName(), classBody, state, startTime, cronSchedule.getExpression().getCronExpression(), executorId, null);
addListener(result);
return new RedissonScheduledFuture<Void>(result, startTime) {
RedissonScheduledFuture<Void> f = new RedissonScheduledFuture<Void>(result, startTime) {
public long getDelay(TimeUnit unit) {
return unit.convert(startDate.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
};
};
storeReference(f, result.getRequestId());
return f;
}
@Override
public RScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) {
RedissonScheduledFuture<?> future = (RedissonScheduledFuture<?>) scheduleWithFixedDelayAsync(task, initialDelay, delay, unit);
execute((RemotePromise<?>)future.getInnerPromise());
RemotePromise<?> rp = (RemotePromise<?>)future.getInnerPromise();
syncExecute(rp);
storeReference(future, rp.getRequestId());
return future;
}
@ -759,7 +825,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleWithFixedDelay(task.getClass().getName(), classBody, state, startTime, unit.toMillis(delay), executorId, null);
addListener(result);
return new RedissonScheduledFuture<Void>(result, startTime);
return createFuture(result, startTime);
}
@Override

@ -30,6 +30,10 @@ public class RedissonExecutorFuture<V> extends PromiseDelegator<V> implements RE
private final RequestId taskId;
public RedissonExecutorFuture(RemotePromise<V> promise) {
this(promise, promise.getRequestId());
}
public RedissonExecutorFuture(RPromise<V> promise, RequestId taskId) {
super(promise);
this.taskId = taskId;

@ -0,0 +1,42 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import org.redisson.api.RExecutorFuture;
import org.redisson.remote.RequestId;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonExecutorFutureReference extends WeakReference<RExecutorFuture<?>> {
private RequestId requestId;
public RedissonExecutorFutureReference(RequestId requestId, RExecutorFuture<?> referent, ReferenceQueue<? super RExecutorFuture<?>> q) {
super(referent, q);
this.requestId = requestId;
}
public RequestId getRequestId() {
return requestId;
}
}

@ -26,18 +26,12 @@ import org.redisson.remote.RequestId;
*/
public class RemotePromise<T> extends RedissonPromise<T> {
private final Object param;
private final RequestId requestId;
private RFuture<Boolean> addFuture;
public RemotePromise(RequestId requestId, Object param) {
public RemotePromise(RequestId requestId) {
super();
this.requestId = requestId;
this.param = param;
}
public <P> P getParam() {
return (P) param;
}
public RequestId getRequestId() {

@ -17,17 +17,14 @@ package org.redisson.executor;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.RedissonExecutorService;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.remote.RRemoteServiceResponse;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
@ -114,36 +111,6 @@ public class ScheduledTasksService extends TasksService {
startTime, request.getId(), encode(request));
}
@Override
protected Object getParam(RemoteServiceRequest request) {
Long startTime = 0L;
if (request != null && request.getArgs() != null && request.getArgs().length > 3) {
startTime = (Long)request.getArgs()[3];
}
return startTime;
}
@Override
protected void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result,
final RFuture<RRemoteServiceResponse> responseFuture) {
if (!optionsCopy.isResultExpected()) {
return;
}
long startTime = result.getParam();
long delay = startTime - System.currentTimeMillis();
if (delay > 0) {
commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
ScheduledTasksService.super.awaitResultAsync(optionsCopy, result, responseFuture);
}
}, (long)(delay - delay*0.10), TimeUnit.MILLISECONDS);
} else {
super.awaitResultAsync(optionsCopy, result, responseFuture);
}
}
@Override
protected RFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,

@ -1,59 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class RedissonThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private final UncaughtExceptionHandler exceptionHandler;
public RedissonThreadFactory() {
this(null);
}
public RedissonThreadFactory(UncaughtExceptionHandler exceptionHandler) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "redisson-node-" + poolNumber.getAndIncrement() + "-thread-";
this.exceptionHandler = exceptionHandler;
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
if (exceptionHandler != null) {
t.setUncaughtExceptionHandler(exceptionHandler);
}
return t;
}
}
Loading…
Cancel
Save