ExecutorService implementation. #208

pull/574/merge
Nikita 9 years ago
parent b08181ceb6
commit 7646d0b1a3

@ -40,6 +40,7 @@ import org.redisson.api.RBucket;
import org.redisson.api.RBuckets;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RDeque;
import org.redisson.api.RExecutorService;
import org.redisson.api.RGeo;
import org.redisson.api.RHyperLogLog;
import org.redisson.api.RKeys;
@ -67,6 +68,7 @@ import org.redisson.api.RTopic;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.SerializationCodec;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandSyncService;
import org.redisson.config.Config;
@ -365,6 +367,15 @@ public class Redisson implements RedissonClient {
return new RedissonScript(commandExecutor);
}
@Override
public RExecutorService getExecutorService() {
return new RedissonExecutorService(new SerializationCodec(), commandExecutor, this);
}
public RExecutorService getExecutorService(String name) {
return new RedissonExecutorService(new SerializationCodec(), commandExecutor, this, name);
}
@Override
public RRemoteService getRemoteSerivce() {
return new RedissonRemoteService(this, commandExecutor);

@ -39,6 +39,7 @@ import org.redisson.api.RBucket;
import org.redisson.api.RBuckets;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RDeque;
import org.redisson.api.RExecutorService;
import org.redisson.api.RGeo;
import org.redisson.api.RHyperLogLog;
import org.redisson.api.RKeys;
@ -611,6 +612,8 @@ public interface RedissonClient {
*/
RScript getScript();
RExecutorService getExecutorService();
/**
* Returns object for remote operations prefixed with the default name (redisson_remote_service)
*

@ -0,0 +1,466 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.MessageListener;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RBucket;
import org.redisson.api.RExecutorService;
import org.redisson.api.RKeys;
import org.redisson.api.RRemoteService;
import org.redisson.api.RTopic;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.executor.ExecutorRemoteService;
import org.redisson.executor.RemoteExecutorService;
import org.redisson.executor.RemoteExecutorServiceAsync;
import org.redisson.executor.RemoteExecutorServiceImpl;
import org.redisson.executor.RemotePromise;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
public class RedissonExecutorService implements RExecutorService {
public static final int SHUTDOWN_STATE = 1;
public static final int TERMINATED_STATE = 2;
private final CommandExecutor commandExecutor;
private final ConnectionManager connectionManager;
private final Codec codec;
private final Redisson redisson;
private final RAtomicLong tasksCounter;
private final RBucket<Integer> status;
private final RTopic<Integer> topic;
private final RKeys keys;
private final RemoteExecutorServiceAsync asyncService;
private final RemoteExecutorServiceAsync asyncServiceWithoutResult;
private final Map<Class<?>, byte[]> class2bytes = PlatformDependent.newConcurrentHashMap();
private final String name;
public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson) {
this(codec, commandExecutor, redisson, "redisson_default_executor");
}
public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name) {
super();
this.codec = codec;
this.commandExecutor = commandExecutor;
this.connectionManager = commandExecutor.getConnectionManager();
this.name = name;
this.redisson = redisson;
String objectName = name + ":{"+ RemoteExecutorService.class.getName() + "}";
tasksCounter = redisson.getAtomicLong(objectName + ":counter");
status = redisson.getBucket(objectName + ":status");
topic = redisson.getTopic(objectName + ":topic");
keys = redisson.getKeys();
RRemoteService remoteService = new ExecutorRemoteService(codec, redisson, name, commandExecutor);
asyncService = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck());
asyncServiceWithoutResult = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
}
public void register() {
redisson.getRemoteSerivce(name, codec).register(RemoteExecutorService.class,
new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, name));
}
@Override
public void execute(Runnable task) {
byte[] classBody = getClassBody(task);
try {
byte[] state = codec.getValueEncoder().encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncServiceWithoutResult.executeVoid(task.getClass().getName(), classBody, state);
check(promise);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
private byte[] getClassBody(Object task) {
Class<?> c = task.getClass();
byte[] classBody = class2bytes.get(c);
if (classBody == null) {
String className = c.getName();
String classAsPath = className.replace('.', '/') + ".class";
InputStream classStream = c.getClassLoader().getResourceAsStream(classAsPath);
DataInputStream s = new DataInputStream(classStream);
try {
classBody = new byte[s.available()];
s.readFully(classBody);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
class2bytes.put(c, classBody);
}
return classBody;
}
@Override
public void shutdown() {
commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"if redis.call('exists', KEYS[2]) == 0 then "
+ "if redis.call('get', KEYS[1]) == 0 or redis.call('exists', KEYS[1]) == 0 then "
+ "redis.call('set', KEYS[2], ARGV[2]);"
+ "redis.call('publish', KEYS[3], ARGV[2]);"
+ "else "
+ "redis.call('set', KEYS[2], ARGV[1]);"
+ "end;"
+ "end;",
Arrays.<Object>asList(tasksCounter.getName(), status.getName(), topic.getChannelNames().get(0)),
SHUTDOWN_STATE, TERMINATED_STATE);
}
public String getName() {
return name;
}
public boolean delete() {
return keys.delete(status.getName(), tasksCounter.getName()) > 0;
}
@Override
public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException();
}
@Override
public boolean isShutdown() {
return status.isExists() && status.get() >= SHUTDOWN_STATE;
}
@Override
public boolean isTerminated() {
return status.isExists() && status.get() == TERMINATED_STATE;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
if (isTerminated()) {
return true;
}
final CountDownLatch latch = new CountDownLatch(1);
MessageListener<Integer> listener = new MessageListener<Integer>() {
@Override
public void onMessage(String channel, Integer msg) {
if (msg == TERMINATED_STATE) {
latch.countDown();
}
}
};
int listenerId = topic.addListener(listener);
if (isTerminated()) {
topic.removeListener(listenerId);
return true;
}
boolean res = latch.await(timeout, unit);
topic.removeListener(listenerId);
return res;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
byte[] classBody = getClassBody(task);
try {
byte[] state = codec.getValueEncoder().encode(task);
RemotePromise<T> promise = (RemotePromise<T>)asyncService.execute(task.getClass().getName(), classBody, state);
check(promise);
return promise;
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
private <T> void check(RemotePromise<T> promise) {
io.netty.util.concurrent.Future<Boolean> addFuture = promise.getAddFuture();
Boolean res = addFuture.awaitUninterruptibly().getNow();
if (!res) {
throw new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state");
}
}
@Override
public <T> Future<T> submit(Runnable task, final T result) {
final Promise<T> resultFuture = connectionManager.newPromise();
io.netty.util.concurrent.Future<Object> future = (io.netty.util.concurrent.Future<Object>) submit(task);
future.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<Object> future) throws Exception {
if (!future.isSuccess()) {
resultFuture.setFailure(future.cause());
return;
}
resultFuture.setSuccess(result);
}
});
return resultFuture;
}
@Override
public Future<?> submit(Runnable task) {
byte[] classBody = getClassBody(task);
try {
byte[] state = codec.getValueEncoder().encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>) asyncService.executeVoid(task.getClass().getName(), classBody, state);
check(promise);
return promise;
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long millis) throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null) {
throw new NullPointerException();
}
int ntasks = tasks.size();
if (ntasks == 0) {
throw new IllegalArgumentException();
}
List<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
try {
ExecutionException ee = null;
long lastTime = timed ? System.currentTimeMillis() : 0;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
futures.add(submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
Future<T> f = poll(futures);
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = poll(futures, millis, TimeUnit.MILLISECONDS);
if (f == null)
throw new TimeoutException();
long now = System.currentTimeMillis();
millis -= now - lastTime;
lastTime = now;
}
else
f = poll(futures, -1, null);
}
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException("No tasks were finised", null);
throw ee;
} finally {
for (Future<T> f : futures)
f.cancel(true);
}
}
private <T> Future<T> poll(List<Future<T>> futures, long timeout, TimeUnit timeUnit) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Future<T>> result = new AtomicReference<Future<T>>();
FutureListener<T> listener = new FutureListener<T>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
latch.countDown();
result.compareAndSet(null, future);
}
};
for (Future<T> future : futures) {
io.netty.util.concurrent.Future<T> f = (io.netty.util.concurrent.Future<T>) future;
f.addListener(listener);
}
if (timeout == -1) {
latch.await();
} else {
latch.await(timeout, timeUnit);
}
for (Future<T> future : futures) {
io.netty.util.concurrent.Future<T> f = (io.netty.util.concurrent.Future<T>) future;
f.removeListener(listener);
}
return result.get();
}
private <T> Future<T> poll(List<Future<T>> futures) {
for (Future<T> future : futures) {
if (future.isDone()) {
return future;
}
}
return null;
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toMillis(timeout));
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
if (tasks == null) {
throw new NullPointerException();
}
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
Future<T> future = submit(t);
futures.add(future);
}
for (Future<T> f : futures) {
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException {
if (tasks == null || unit == null) {
throw new NullPointerException();
}
long millis = unit.toMillis(timeout);
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
long lastTime = System.currentTimeMillis();
for (Callable<T> task : tasks) {
Future<T> future = submit(task);
futures.add(future);
long now = System.currentTimeMillis();
millis -= now - lastTime;
lastTime = now;
if (millis <= 0) {
int remainFutures = tasks.size() - futures.size();
for (int i = 0; i < remainFutures; i++) {
Promise<T> cancelledFuture = connectionManager.newPromise();
cancelledFuture.cancel(true);
futures.add(cancelledFuture);
}
return futures;
}
}
for (Future<T> f : futures) {
if (!f.isDone()) {
if (millis <= 0)
return futures;
try {
f.get(millis, TimeUnit.MILLISECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
long now = System.currentTimeMillis();
millis -= now - lastTime;
lastTime = now;
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
}

@ -31,10 +31,12 @@ import org.redisson.api.RBlockingQueue;
import org.redisson.api.RBlockingQueueAsync;
import org.redisson.api.RRemoteService;
import org.redisson.api.RScript;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.RScript.Mode;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.command.CommandExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.remote.RRemoteAsync;
import org.redisson.remote.RRemoteServiceResponse;
import org.redisson.remote.RemoteServiceAck;
@ -67,9 +69,9 @@ public class RedissonRemoteService implements RRemoteService {
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap();
private final Codec codec;
private final Redisson redisson;
private final String name;
private final CommandExecutor commandExecutor;
protected final Redisson redisson;
protected final String name;
protected final CommandExecutor commandExecutor;
public RedissonRemoteService(Redisson redisson, CommandExecutor commandExecutor) {
this(redisson, "redisson_remote_service", commandExecutor);
@ -122,7 +124,7 @@ public class RedissonRemoteService implements RRemoteService {
return redisson.getConfig().getCodec();
}
private byte[] encode(Object obj) {
protected byte[] encode(Object obj) {
try {
return getCodec().getValueEncoder().encode(obj);
} catch (IOException e) {
@ -247,6 +249,7 @@ public class RedissonRemoteService implements RRemoteService {
.expectResultWithin(executionTimeout, executionTimeUnit));
}
@Override
public <T> T get(Class<T> remoteInterface, long executionTimeout, TimeUnit executionTimeUnit,
long ackTimeout, TimeUnit ackTimeUnit) {
return get(remoteInterface, RemoteInvocationOptions.defaults()
@ -254,6 +257,7 @@ public class RedissonRemoteService implements RRemoteService {
.expectResultWithin(executionTimeout, executionTimeUnit));
}
@Override
public <T> T get(Class<T> remoteInterface, RemoteInvocationOptions options) {
for (Annotation annotation : remoteInterface.getAnnotations()) {
if (annotation.annotationType() == RRemoteAsync.class) {
@ -310,7 +314,7 @@ public class RedissonRemoteService implements RRemoteService {
final RemoteServiceRequest request = new RemoteServiceRequest(requestId,
method.getName(), args, optionsCopy, System.currentTimeMillis());
final Promise<Object> result = new PromiseDelegator<Object>(ImmediateEventExecutor.INSTANCE.newPromise()) {
final RemotePromise<Object> result = new RemotePromise<Object>(ImmediateEventExecutor.INSTANCE.newPromise()) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (optionsCopy.isAckExpected()) {
@ -332,7 +336,8 @@ public class RedissonRemoteService implements RRemoteService {
}
};
Future<Boolean> addFuture = requestQueue.addAsync(request);
Future<Boolean> addFuture = addAsync(requestQueue, request);
result.setAddFuture(addFuture);
addFuture.addListener(new FutureListener<Boolean>() {
@Override
@ -587,9 +592,13 @@ public class RedissonRemoteService implements RRemoteService {
private <T extends RRemoteServiceResponse> Future<List<?>> send(long timeout, String responseName, T response) {
RBatch batch = redisson.createBatch();
RBlockingQueueAsync<T> queue = batch.getBlockingQueue(responseName);
RBlockingQueueAsync<T> queue = batch.getBlockingQueue(responseName, getCodec());
queue.putAsync(response);
queue.expireAsync(timeout, TimeUnit.MILLISECONDS);
return batch.executeAsync();
}
protected Future<Boolean> addAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request) {
return requestQueue.addAsync(request);
}
}

@ -0,0 +1,28 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.ExecutorService;
public interface RExecutorService extends ExecutorService {
String getName();
boolean delete();
void register();
}

@ -37,7 +37,8 @@ public class JsonJacksonMapValueCodec<T> extends JsonJacksonCodec {
public JsonJacksonMapValueCodec(TypeReference<T> typeReference) {
this.typeReference = typeReference;
this.mapper = initObjectMapper();
this.mapper = new ObjectMapper();
init(this.mapper);
}
@Override

@ -50,9 +50,13 @@ public class StringCodec implements Codec {
public StringCodec() {
this(CharsetUtil.UTF_8);
}
public StringCodec(ClassLoader classLoader) {
this();
}
public StringCodec(String charsetName) {
this(Charset.forName(charsetName));
this(Charset.forName(charsetName));
}
public StringCodec(Charset charset) {

@ -24,8 +24,13 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
* @date 2015-10-16
*/
public class CborJacksonCodec extends JsonJacksonCodec {
@Override
protected ObjectMapper initObjectMapper() {
return new ObjectMapper(new CBORFactory());
public CborJacksonCodec() {
super(new ObjectMapper(new CBORFactory()));
}
public CborJacksonCodec(ClassLoader classLoader) {
super(createObjectMapper(classLoader, new ObjectMapper(new CBORFactory())));
}
}

@ -0,0 +1,42 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.codec;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
public class CustomObjectInputStream extends ObjectInputStream {
private ClassLoader classLoader;
public CustomObjectInputStream(ClassLoader classLoader, InputStream in) throws IOException {
super(in);
this.classLoader = classLoader;
}
@Override
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
try {
String name = desc.getName();
return Class.forName(name, false, classLoader);
} catch (ClassNotFoundException e) {
return super.resolveClass(desc);
}
}
}

@ -45,6 +45,16 @@ public class FstCodec implements Codec {
public FstCodec() {
this(FSTConfiguration.createDefaultConfiguration());
}
public FstCodec(ClassLoader classLoader) {
this(createConfig(classLoader));
}
private static FSTConfiguration createConfig(ClassLoader classLoader) {
FSTConfiguration def = FSTConfiguration.createDefaultConfiguration();
def.setClassLoader(classLoader);
return def;
}
public FstCodec(FSTConfiguration fstConfiguration) {
config = fstConfiguration;

@ -15,6 +15,13 @@
*/
package org.redisson.codec;
import java.io.IOException;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonIdentityInfo;
@ -29,14 +36,10 @@ import com.fasterxml.jackson.databind.ObjectMapper.DefaultTypeResolverBuilder;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder;
import com.fasterxml.jackson.databind.type.TypeFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import java.io.IOException;
/**
*
@ -56,11 +59,7 @@ public class JsonJacksonCodec implements Codec {
}
private final ObjectMapper mapObjectMapper = initObjectMapper();
protected ObjectMapper initObjectMapper() {
return new ObjectMapper();
}
private final ObjectMapper mapObjectMapper;
private final Encoder encoder = new Encoder() {
@Override
@ -75,8 +74,23 @@ public class JsonJacksonCodec implements Codec {
return mapObjectMapper.readValue(new ByteBufInputStream(buf), Object.class);
}
};
public JsonJacksonCodec() {
this(new ObjectMapper());
}
public JsonJacksonCodec(ClassLoader classLoader) {
this(createObjectMapper(classLoader, new ObjectMapper()));
}
protected static ObjectMapper createObjectMapper(ClassLoader classLoader, ObjectMapper om) {
TypeFactory tf = TypeFactory.defaultInstance().withClassLoader(classLoader);
om.setTypeFactory(tf);
return om;
}
public JsonJacksonCodec(ObjectMapper mapObjectMapper) {
this.mapObjectMapper = mapObjectMapper;
init(mapObjectMapper);
// type info inclusion
TypeResolverBuilder<?> mapTyper = new DefaultTypeResolverBuilder(DefaultTyping.NON_FINAL) {
@ -107,6 +121,13 @@ public class JsonJacksonCodec implements Codec {
mapTyper.init(JsonTypeInfo.Id.CLASS, null);
mapTyper.inclusion(JsonTypeInfo.As.PROPERTY);
mapObjectMapper.setDefaultTyping(mapTyper);
// warm up codec
try {
mapObjectMapper.readValue("1".getBytes(), Object.class);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
protected void init(ObjectMapper objectMapper) {

@ -48,9 +48,11 @@ public class KryoCodec implements Codec {
private final Queue<Kryo> objects = new ConcurrentLinkedQueue<Kryo>();
private final List<Class<?>> classes;
private final ClassLoader classLoader;
public KryoPoolImpl(List<Class<?>> classes) {
public KryoPoolImpl(List<Class<?>> classes, ClassLoader classLoader) {
this.classes = classes;
this.classLoader = classLoader;
}
public Kryo get() {
@ -72,6 +74,9 @@ public class KryoCodec implements Codec {
*/
protected Kryo createInstance() {
Kryo kryo = new Kryo();
if (classLoader != null) {
kryo.setClassLoader(classLoader);
}
kryo.setReferences(false);
for (Class<?> clazz : classes) {
kryo.register(clazz);
@ -139,13 +144,20 @@ public class KryoCodec implements Codec {
};
public KryoCodec() {
this(new KryoPoolImpl(Collections.<Class<?>>emptyList()));
this(Collections.<Class<?>>emptyList());
}
public KryoCodec(ClassLoader classLoader) {
this(Collections.<Class<?>>emptyList(), classLoader);
}
public KryoCodec(List<Class<?>> classes) {
this(new KryoPoolImpl(classes));
this(classes, null);
}
public KryoCodec(List<Class<?>> classes, ClassLoader classLoader) {
this(new KryoPoolImpl(classes, classLoader));
}
public KryoCodec(KryoPool kryoPool) {
this.kryoPool = kryoPool;

@ -27,9 +27,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
*/
public class MsgPackJacksonCodec extends JsonJacksonCodec {
@Override
protected ObjectMapper initObjectMapper() {
return new ObjectMapper(new MessagePackFactory());
public MsgPackJacksonCodec() {
super(new ObjectMapper(new MessagePackFactory()));
}
public MsgPackJacksonCodec(ClassLoader classLoader) {
super(createObjectMapper(classLoader, new ObjectMapper(new MessagePackFactory())));
}
}

@ -39,7 +39,13 @@ public class SerializationCodec implements Codec {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
try {
ObjectInputStream inputStream = new ObjectInputStream(new ByteBufInputStream(buf));
ByteBufInputStream in = new ByteBufInputStream(buf);
ObjectInputStream inputStream;
if (classLoader != null) {
inputStream = new CustomObjectInputStream(classLoader, in);
} else {
inputStream = new ObjectInputStream(in);
}
return inputStream.readObject();
} catch (IOException e) {
throw e;
@ -60,7 +66,17 @@ public class SerializationCodec implements Codec {
return result.toByteArray();
}
};
private final ClassLoader classLoader;
public SerializationCodec() {
this(null);
}
public SerializationCodec(ClassLoader classLoader) {
this.classLoader = classLoader;
}
@Override
public Decoder<Object> getMapValueDecoder() {
return getValueDecoder();

@ -0,0 +1,79 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Enumeration;
public class ClassLoaderDelegator extends ClassLoader {
private final ThreadLocal<ClassLoader> threadLocalClassLoader = new ThreadLocal<ClassLoader>();
public void setCurrentClassLoader(ClassLoader classLoader) {
threadLocalClassLoader.set(classLoader);
}
public int hashCode() {
return threadLocalClassLoader.get().hashCode();
}
public boolean equals(Object obj) {
return threadLocalClassLoader.get().equals(obj);
}
public String toString() {
return threadLocalClassLoader.get().toString();
}
public Class<?> loadClass(String name) throws ClassNotFoundException {
return threadLocalClassLoader.get().loadClass(name);
}
public URL getResource(String name) {
return threadLocalClassLoader.get().getResource(name);
}
public Enumeration<URL> getResources(String name) throws IOException {
return threadLocalClassLoader.get().getResources(name);
}
public InputStream getResourceAsStream(String name) {
return threadLocalClassLoader.get().getResourceAsStream(name);
}
public void setDefaultAssertionStatus(boolean enabled) {
threadLocalClassLoader.get().setDefaultAssertionStatus(enabled);
}
public void setPackageAssertionStatus(String packageName, boolean enabled) {
threadLocalClassLoader.get().setPackageAssertionStatus(packageName, enabled);
}
public void setClassAssertionStatus(String className, boolean enabled) {
threadLocalClassLoader.get().setClassAssertionStatus(className, enabled);
}
public void clearAssertionStatus() {
threadLocalClassLoader.get().clearAssertionStatus();
}
public void clearCurrentClassLoader() {
threadLocalClassLoader.remove();
}
}

@ -0,0 +1,59 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;
import java.util.Arrays;
import org.redisson.Redisson;
import org.redisson.RedissonRemoteService;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RBlockingQueue;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.remote.RemoteServiceRequest;
import io.netty.util.concurrent.Future;
public class ExecutorRemoteService extends RedissonRemoteService {
private final RAtomicLong tasksCounter;
private final RAtomicLong status;
public ExecutorRemoteService(Codec codec, Redisson redisson, String name, CommandExecutor commandExecutor) {
super(codec, redisson, name, commandExecutor);
String objectName = name + ":{"+ RemoteExecutorService.class.getName() + "}";
tasksCounter = redisson.getAtomicLong(objectName + ":counter");
status = redisson.getAtomicLong(objectName + ":status");
}
@Override
protected Future<Boolean> addAsync(RBlockingQueue<RemoteServiceRequest> requestQueue,
RemoteServiceRequest request) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('exists', KEYS[2]) == 0 then "
+ "redis.call('rpush', KEYS[3], ARGV[1]); "
+ "redis.call('incr', KEYS[1]);"
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(tasksCounter.getName(), status.getName(), requestQueue.getName()),
encode(request));
}
}

@ -0,0 +1,28 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;
public class RedissonClassLoader extends ClassLoader {
public RedissonClassLoader(ClassLoader parent) {
super(parent);
}
public void loadClass(String name, byte[] body) {
defineClass(name, body, 0, body.length);
}
}

@ -0,0 +1,24 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;
public interface RemoteExecutorService {
Object execute(String className, byte[] classBody, byte[] state);
void executeVoid(String className, byte[] classBody, byte[] state);
}

@ -0,0 +1,29 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;
import org.redisson.remote.RRemoteAsync;
import io.netty.util.concurrent.Future;
@RRemoteAsync(RemoteExecutorService.class)
public interface RemoteExecutorServiceAsync {
<T> Future<T> execute(String className, byte[] classBody, byte[] state);
Future<Void> executeVoid(String className, byte[] classBody, byte[] state);
}

@ -0,0 +1,113 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;
import java.util.Arrays;
import java.util.concurrent.Callable;
import org.redisson.RedissonClient;
import org.redisson.RedissonExecutorService;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RBucket;
import org.redisson.api.RTopic;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
public class RemoteExecutorServiceImpl implements RemoteExecutorService {
private final ClassLoaderDelegator classLoader = new ClassLoaderDelegator();
private final Codec codec;
private final String name;
private final CommandExecutor commandExecutor;
private final RAtomicLong tasksCounter;
private final RBucket<Integer> status;
private final RTopic<Integer> topic;
public RemoteExecutorServiceImpl(CommandExecutor commandExecutor, RedissonClient redisson, Codec codec, String name) {
this.commandExecutor = commandExecutor;
this.name = name + ":{"+ RemoteExecutorService.class.getName() + "}";
tasksCounter = redisson.getAtomicLong(this.name + ":counter");
status = redisson.getBucket(this.name + ":status");
topic = redisson.getTopic(this.name + ":topic");
try {
this.codec = codec.getClass().getConstructor(ClassLoader.class).newInstance(classLoader);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
@Override
public Object execute(String className, byte[] classBody, byte[] state) {
ByteBuf buf = null;
try {
buf = Unpooled.wrappedBuffer(state);
RedissonClassLoader cl = new RedissonClassLoader(getClass().getClassLoader());
cl.loadClass(className, classBody);
classLoader.setCurrentClassLoader(cl);
Callable<?> callable = (Callable<?>) codec.getValueDecoder().decode(buf, null);
return callable.call();
} catch (Exception e) {
throw new IllegalArgumentException(e);
} finally {
buf.release();
finish();
}
}
@Override
public void executeVoid(String className, byte[] classBody, byte[] state) {
ByteBuf buf = null;
try {
buf = Unpooled.wrappedBuffer(state);
RedissonClassLoader cl = new RedissonClassLoader(getClass().getClassLoader());
cl.loadClass(className, classBody);
classLoader.setCurrentClassLoader(cl);
Runnable runnable = (Runnable) codec.getValueDecoder().decode(buf, null);
runnable.run();
} catch (Exception e) {
throw new IllegalArgumentException(e);
} finally {
buf.release();
finish();
}
}
private void finish() {
classLoader.clearCurrentClassLoader();
commandExecutor.evalWrite(name, LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"if redis.call('decr', KEYS[1]) == 0 and redis.call('get', KEYS[2]) == ARGV[1] then "
+ "redis.call('set', KEYS[2], ARGV[2]);"
+ "redis.call('publish', KEYS[3], ARGV[2]);"
+ "end;",
Arrays.<Object>asList(tasksCounter.getName(), status.getName(), topic.getChannelNames().get(0)),
RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE);
}
}

@ -0,0 +1,39 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;
import org.redisson.misc.PromiseDelegator;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
public class RemotePromise<T> extends PromiseDelegator<T> {
private Future<Boolean> addFuture;
public RemotePromise(Promise<T> promise) {
super(promise);
}
public void setAddFuture(Future<Boolean> addFuture) {
this.addFuture = addFuture;
}
public Future<Boolean> getAddFuture() {
return addFuture;
}
}
Loading…
Cancel
Save