diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 57204ffde..c56826f9d 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -40,6 +40,7 @@ import org.redisson.api.RBucket; import org.redisson.api.RBuckets; import org.redisson.api.RCountDownLatch; import org.redisson.api.RDeque; +import org.redisson.api.RExecutorService; import org.redisson.api.RGeo; import org.redisson.api.RHyperLogLog; import org.redisson.api.RKeys; @@ -67,6 +68,7 @@ import org.redisson.api.RTopic; import org.redisson.api.RedissonReactiveClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; +import org.redisson.codec.SerializationCodec; import org.redisson.command.CommandExecutor; import org.redisson.command.CommandSyncService; import org.redisson.config.Config; @@ -365,6 +367,15 @@ public class Redisson implements RedissonClient { return new RedissonScript(commandExecutor); } + @Override + public RExecutorService getExecutorService() { + return new RedissonExecutorService(new SerializationCodec(), commandExecutor, this); + } + + public RExecutorService getExecutorService(String name) { + return new RedissonExecutorService(new SerializationCodec(), commandExecutor, this, name); + } + @Override public RRemoteService getRemoteSerivce() { return new RedissonRemoteService(this, commandExecutor); diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index 06ba276da..7cdfd3f12 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -39,6 +39,7 @@ import org.redisson.api.RBucket; import org.redisson.api.RBuckets; import org.redisson.api.RCountDownLatch; import org.redisson.api.RDeque; +import org.redisson.api.RExecutorService; import org.redisson.api.RGeo; import org.redisson.api.RHyperLogLog; import org.redisson.api.RKeys; @@ -611,6 +612,8 @@ public interface RedissonClient { */ RScript getScript(); + RExecutorService getExecutorService(); + /** * Returns object for remote operations prefixed with the default name (redisson_remote_service) * diff --git a/src/main/java/org/redisson/RedissonExecutorService.java b/src/main/java/org/redisson/RedissonExecutorService.java new file mode 100644 index 000000000..33b1e32ea --- /dev/null +++ b/src/main/java/org/redisson/RedissonExecutorService.java @@ -0,0 +1,466 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import org.redisson.api.MessageListener; +import org.redisson.api.RAtomicLong; +import org.redisson.api.RBucket; +import org.redisson.api.RExecutorService; +import org.redisson.api.RKeys; +import org.redisson.api.RRemoteService; +import org.redisson.api.RTopic; +import org.redisson.api.RemoteInvocationOptions; +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandExecutor; +import org.redisson.connection.ConnectionManager; +import org.redisson.executor.ExecutorRemoteService; +import org.redisson.executor.RemoteExecutorService; +import org.redisson.executor.RemoteExecutorServiceAsync; +import org.redisson.executor.RemoteExecutorServiceImpl; +import org.redisson.executor.RemotePromise; + +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; +import io.netty.util.internal.PlatformDependent; + +public class RedissonExecutorService implements RExecutorService { + + public static final int SHUTDOWN_STATE = 1; + public static final int TERMINATED_STATE = 2; + + private final CommandExecutor commandExecutor; + private final ConnectionManager connectionManager; + private final Codec codec; + private final Redisson redisson; + + private final RAtomicLong tasksCounter; + private final RBucket status; + private final RTopic topic; + private final RKeys keys; + + private final RemoteExecutorServiceAsync asyncService; + private final RemoteExecutorServiceAsync asyncServiceWithoutResult; + + private final Map, byte[]> class2bytes = PlatformDependent.newConcurrentHashMap(); + + private final String name; + + public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson) { + this(codec, commandExecutor, redisson, "redisson_default_executor"); + } + + public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name) { + super(); + this.codec = codec; + this.commandExecutor = commandExecutor; + this.connectionManager = commandExecutor.getConnectionManager(); + this.name = name; + this.redisson = redisson; + + String objectName = name + ":{"+ RemoteExecutorService.class.getName() + "}"; + tasksCounter = redisson.getAtomicLong(objectName + ":counter"); + status = redisson.getBucket(objectName + ":status"); + topic = redisson.getTopic(objectName + ":topic"); + keys = redisson.getKeys(); + + RRemoteService remoteService = new ExecutorRemoteService(codec, redisson, name, commandExecutor); + asyncService = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck()); + asyncServiceWithoutResult = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); + } + + public void register() { + redisson.getRemoteSerivce(name, codec).register(RemoteExecutorService.class, + new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, name)); + } + + @Override + public void execute(Runnable task) { + byte[] classBody = getClassBody(task); + try { + byte[] state = codec.getValueEncoder().encode(task); + RemotePromise promise = (RemotePromise)asyncServiceWithoutResult.executeVoid(task.getClass().getName(), classBody, state); + check(promise); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + + private byte[] getClassBody(Object task) { + Class c = task.getClass(); + byte[] classBody = class2bytes.get(c); + if (classBody == null) { + String className = c.getName(); + String classAsPath = className.replace('.', '/') + ".class"; + InputStream classStream = c.getClassLoader().getResourceAsStream(classAsPath); + + DataInputStream s = new DataInputStream(classStream); + try { + classBody = new byte[s.available()]; + s.readFully(classBody); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + + class2bytes.put(c, classBody); + } + return classBody; + } + + @Override + public void shutdown() { + commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, + "if redis.call('exists', KEYS[2]) == 0 then " + + "if redis.call('get', KEYS[1]) == 0 or redis.call('exists', KEYS[1]) == 0 then " + + "redis.call('set', KEYS[2], ARGV[2]);" + + "redis.call('publish', KEYS[3], ARGV[2]);" + + "else " + + "redis.call('set', KEYS[2], ARGV[1]);" + + "end;" + + "end;", + Arrays.asList(tasksCounter.getName(), status.getName(), topic.getChannelNames().get(0)), + SHUTDOWN_STATE, TERMINATED_STATE); + } + + public String getName() { + return name; + } + + public boolean delete() { + return keys.delete(status.getName(), tasksCounter.getName()) > 0; + } + + @Override + public List shutdownNow() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isShutdown() { + return status.isExists() && status.get() >= SHUTDOWN_STATE; + } + + @Override + public boolean isTerminated() { + return status.isExists() && status.get() == TERMINATED_STATE; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + if (isTerminated()) { + return true; + } + + final CountDownLatch latch = new CountDownLatch(1); + MessageListener listener = new MessageListener() { + @Override + public void onMessage(String channel, Integer msg) { + if (msg == TERMINATED_STATE) { + latch.countDown(); + } + } + }; + int listenerId = topic.addListener(listener); + + if (isTerminated()) { + topic.removeListener(listenerId); + return true; + } + + boolean res = latch.await(timeout, unit); + topic.removeListener(listenerId); + return res; + } + + @Override + public Future submit(Callable task) { + byte[] classBody = getClassBody(task); + try { + byte[] state = codec.getValueEncoder().encode(task); + RemotePromise promise = (RemotePromise)asyncService.execute(task.getClass().getName(), classBody, state); + check(promise); + return promise; + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + + private void check(RemotePromise promise) { + io.netty.util.concurrent.Future addFuture = promise.getAddFuture(); + Boolean res = addFuture.awaitUninterruptibly().getNow(); + if (!res) { + throw new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state"); + } + } + + @Override + public Future submit(Runnable task, final T result) { + final Promise resultFuture = connectionManager.newPromise(); + io.netty.util.concurrent.Future future = (io.netty.util.concurrent.Future) submit(task); + future.addListener(new FutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + if (!future.isSuccess()) { + resultFuture.setFailure(future.cause()); + return; + } + resultFuture.setSuccess(result); + } + }); + return resultFuture; + } + + @Override + public Future submit(Runnable task) { + byte[] classBody = getClassBody(task); + try { + byte[] state = codec.getValueEncoder().encode(task); + RemotePromise promise = (RemotePromise) asyncService.executeVoid(task.getClass().getName(), classBody, state); + check(promise); + return promise; + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + + private T doInvokeAny(Collection> tasks, + boolean timed, long millis) throws InterruptedException, ExecutionException, TimeoutException { + if (tasks == null) { + throw new NullPointerException(); + } + + int ntasks = tasks.size(); + if (ntasks == 0) { + throw new IllegalArgumentException(); + } + + List> futures = new ArrayList>(ntasks); + + try { + ExecutionException ee = null; + long lastTime = timed ? System.currentTimeMillis() : 0; + Iterator> it = tasks.iterator(); + + // Start one task for sure; the rest incrementally + futures.add(submit(it.next())); + --ntasks; + int active = 1; + + for (;;) { + Future f = poll(futures); + if (f == null) { + if (ntasks > 0) { + --ntasks; + futures.add(submit(it.next())); + ++active; + } + else if (active == 0) + break; + else if (timed) { + f = poll(futures, millis, TimeUnit.MILLISECONDS); + if (f == null) + throw new TimeoutException(); + long now = System.currentTimeMillis(); + millis -= now - lastTime; + lastTime = now; + } + else + f = poll(futures, -1, null); + } + if (f != null) { + --active; + try { + return f.get(); + } catch (ExecutionException eex) { + ee = eex; + } catch (RuntimeException rex) { + ee = new ExecutionException(rex); + } + } + } + + if (ee == null) + ee = new ExecutionException("No tasks were finised", null); + throw ee; + + } finally { + for (Future f : futures) + f.cancel(true); + } + } + + private Future poll(List> futures, long timeout, TimeUnit timeUnit) throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference> result = new AtomicReference>(); + FutureListener listener = new FutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + latch.countDown(); + result.compareAndSet(null, future); + } + }; + for (Future future : futures) { + io.netty.util.concurrent.Future f = (io.netty.util.concurrent.Future) future; + f.addListener(listener); + } + + if (timeout == -1) { + latch.await(); + } else { + latch.await(timeout, timeUnit); + } + + for (Future future : futures) { + io.netty.util.concurrent.Future f = (io.netty.util.concurrent.Future) future; + f.removeListener(listener); + } + + return result.get(); + } + + private Future poll(List> futures) { + for (Future future : futures) { + if (future.isDone()) { + return future; + } + } + return null; + } + + + public T invokeAny(Collection> tasks) + throws InterruptedException, ExecutionException { + try { + return doInvokeAny(tasks, false, 0); + } catch (TimeoutException cannotHappen) { + return null; + } + } + + public T invokeAny(Collection> tasks, + long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return doInvokeAny(tasks, true, unit.toMillis(timeout)); + } + + public List> invokeAll(Collection> tasks) throws InterruptedException { + if (tasks == null) { + throw new NullPointerException(); + } + + List> futures = new ArrayList>(tasks.size()); + boolean done = false; + try { + for (Callable t : tasks) { + Future future = submit(t); + futures.add(future); + } + for (Future f : futures) { + if (!f.isDone()) { + try { + f.get(); + } catch (CancellationException ignore) { + } catch (ExecutionException ignore) { + } + } + } + done = true; + return futures; + } finally { + if (!done) + for (Future f : futures) + f.cancel(true); + } + } + + public List> invokeAll(Collection> tasks, + long timeout, TimeUnit unit) throws InterruptedException { + if (tasks == null || unit == null) { + throw new NullPointerException(); + } + + long millis = unit.toMillis(timeout); + List> futures = new ArrayList>(tasks.size()); + boolean done = false; + + try { + long lastTime = System.currentTimeMillis(); + + for (Callable task : tasks) { + Future future = submit(task); + futures.add(future); + + long now = System.currentTimeMillis(); + millis -= now - lastTime; + lastTime = now; + if (millis <= 0) { + int remainFutures = tasks.size() - futures.size(); + for (int i = 0; i < remainFutures; i++) { + Promise cancelledFuture = connectionManager.newPromise(); + cancelledFuture.cancel(true); + futures.add(cancelledFuture); + + } + return futures; + } + } + + for (Future f : futures) { + if (!f.isDone()) { + if (millis <= 0) + return futures; + try { + f.get(millis, TimeUnit.MILLISECONDS); + } catch (CancellationException ignore) { + } catch (ExecutionException ignore) { + } catch (TimeoutException toe) { + return futures; + } + long now = System.currentTimeMillis(); + millis -= now - lastTime; + lastTime = now; + } + } + done = true; + return futures; + } finally { + if (!done) + for (Future f : futures) + f.cancel(true); + } + } + +} diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index a4cbf2b95..9d41566a3 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -31,10 +31,12 @@ import org.redisson.api.RBlockingQueue; import org.redisson.api.RBlockingQueueAsync; import org.redisson.api.RRemoteService; import org.redisson.api.RScript; -import org.redisson.api.RemoteInvocationOptions; import org.redisson.api.RScript.Mode; +import org.redisson.api.RemoteInvocationOptions; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; +import org.redisson.command.CommandExecutor; +import org.redisson.executor.RemotePromise; import org.redisson.remote.RRemoteAsync; import org.redisson.remote.RRemoteServiceResponse; import org.redisson.remote.RemoteServiceAck; @@ -67,9 +69,9 @@ public class RedissonRemoteService implements RRemoteService { private final Map beans = PlatformDependent.newConcurrentHashMap(); private final Codec codec; - private final Redisson redisson; - private final String name; - private final CommandExecutor commandExecutor; + protected final Redisson redisson; + protected final String name; + protected final CommandExecutor commandExecutor; public RedissonRemoteService(Redisson redisson, CommandExecutor commandExecutor) { this(redisson, "redisson_remote_service", commandExecutor); @@ -122,7 +124,7 @@ public class RedissonRemoteService implements RRemoteService { return redisson.getConfig().getCodec(); } - private byte[] encode(Object obj) { + protected byte[] encode(Object obj) { try { return getCodec().getValueEncoder().encode(obj); } catch (IOException e) { @@ -247,6 +249,7 @@ public class RedissonRemoteService implements RRemoteService { .expectResultWithin(executionTimeout, executionTimeUnit)); } + @Override public T get(Class remoteInterface, long executionTimeout, TimeUnit executionTimeUnit, long ackTimeout, TimeUnit ackTimeUnit) { return get(remoteInterface, RemoteInvocationOptions.defaults() @@ -254,6 +257,7 @@ public class RedissonRemoteService implements RRemoteService { .expectResultWithin(executionTimeout, executionTimeUnit)); } + @Override public T get(Class remoteInterface, RemoteInvocationOptions options) { for (Annotation annotation : remoteInterface.getAnnotations()) { if (annotation.annotationType() == RRemoteAsync.class) { @@ -310,7 +314,7 @@ public class RedissonRemoteService implements RRemoteService { final RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args, optionsCopy, System.currentTimeMillis()); - final Promise result = new PromiseDelegator(ImmediateEventExecutor.INSTANCE.newPromise()) { + final RemotePromise result = new RemotePromise(ImmediateEventExecutor.INSTANCE.newPromise()) { @Override public boolean cancel(boolean mayInterruptIfRunning) { if (optionsCopy.isAckExpected()) { @@ -332,7 +336,8 @@ public class RedissonRemoteService implements RRemoteService { } }; - Future addFuture = requestQueue.addAsync(request); + Future addFuture = addAsync(requestQueue, request); + result.setAddFuture(addFuture); addFuture.addListener(new FutureListener() { @Override @@ -587,9 +592,13 @@ public class RedissonRemoteService implements RRemoteService { private Future> send(long timeout, String responseName, T response) { RBatch batch = redisson.createBatch(); - RBlockingQueueAsync queue = batch.getBlockingQueue(responseName); + RBlockingQueueAsync queue = batch.getBlockingQueue(responseName, getCodec()); queue.putAsync(response); queue.expireAsync(timeout, TimeUnit.MILLISECONDS); return batch.executeAsync(); } + + protected Future addAsync(RBlockingQueue requestQueue, RemoteServiceRequest request) { + return requestQueue.addAsync(request); + } } diff --git a/src/main/java/org/redisson/api/RExecutorService.java b/src/main/java/org/redisson/api/RExecutorService.java new file mode 100644 index 000000000..0fc59b2e0 --- /dev/null +++ b/src/main/java/org/redisson/api/RExecutorService.java @@ -0,0 +1,28 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.ExecutorService; + +public interface RExecutorService extends ExecutorService { + + String getName(); + + boolean delete(); + + void register(); + +} diff --git a/src/main/java/org/redisson/client/codec/JsonJacksonMapValueCodec.java b/src/main/java/org/redisson/client/codec/JsonJacksonMapValueCodec.java index 9587564ca..94b6aea1b 100644 --- a/src/main/java/org/redisson/client/codec/JsonJacksonMapValueCodec.java +++ b/src/main/java/org/redisson/client/codec/JsonJacksonMapValueCodec.java @@ -37,7 +37,8 @@ public class JsonJacksonMapValueCodec extends JsonJacksonCodec { public JsonJacksonMapValueCodec(TypeReference typeReference) { this.typeReference = typeReference; - this.mapper = initObjectMapper(); + this.mapper = new ObjectMapper(); + init(this.mapper); } @Override diff --git a/src/main/java/org/redisson/client/codec/StringCodec.java b/src/main/java/org/redisson/client/codec/StringCodec.java index 8fcb84293..ef5be4232 100644 --- a/src/main/java/org/redisson/client/codec/StringCodec.java +++ b/src/main/java/org/redisson/client/codec/StringCodec.java @@ -50,9 +50,13 @@ public class StringCodec implements Codec { public StringCodec() { this(CharsetUtil.UTF_8); } + + public StringCodec(ClassLoader classLoader) { + this(); + } public StringCodec(String charsetName) { - this(Charset.forName(charsetName)); + this(Charset.forName(charsetName)); } public StringCodec(Charset charset) { diff --git a/src/main/java/org/redisson/codec/CborJacksonCodec.java b/src/main/java/org/redisson/codec/CborJacksonCodec.java index cd98b4336..dfe6c0923 100644 --- a/src/main/java/org/redisson/codec/CborJacksonCodec.java +++ b/src/main/java/org/redisson/codec/CborJacksonCodec.java @@ -24,8 +24,13 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory; * @date 2015-10-16 */ public class CborJacksonCodec extends JsonJacksonCodec { - @Override - protected ObjectMapper initObjectMapper() { - return new ObjectMapper(new CBORFactory()); + + public CborJacksonCodec() { + super(new ObjectMapper(new CBORFactory())); } + + public CborJacksonCodec(ClassLoader classLoader) { + super(createObjectMapper(classLoader, new ObjectMapper(new CBORFactory()))); + } + } diff --git a/src/main/java/org/redisson/codec/CustomObjectInputStream.java b/src/main/java/org/redisson/codec/CustomObjectInputStream.java new file mode 100644 index 000000000..3947c1088 --- /dev/null +++ b/src/main/java/org/redisson/codec/CustomObjectInputStream.java @@ -0,0 +1,42 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.codec; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectStreamClass; + +public class CustomObjectInputStream extends ObjectInputStream { + + private ClassLoader classLoader; + + public CustomObjectInputStream(ClassLoader classLoader, InputStream in) throws IOException { + super(in); + this.classLoader = classLoader; + } + + @Override + protected Class resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { + try { + String name = desc.getName(); + return Class.forName(name, false, classLoader); + } catch (ClassNotFoundException e) { + return super.resolveClass(desc); + } + } + +} diff --git a/src/main/java/org/redisson/codec/FstCodec.java b/src/main/java/org/redisson/codec/FstCodec.java index 9cadc3df3..ea56eefa2 100644 --- a/src/main/java/org/redisson/codec/FstCodec.java +++ b/src/main/java/org/redisson/codec/FstCodec.java @@ -45,6 +45,16 @@ public class FstCodec implements Codec { public FstCodec() { this(FSTConfiguration.createDefaultConfiguration()); } + + public FstCodec(ClassLoader classLoader) { + this(createConfig(classLoader)); + } + + private static FSTConfiguration createConfig(ClassLoader classLoader) { + FSTConfiguration def = FSTConfiguration.createDefaultConfiguration(); + def.setClassLoader(classLoader); + return def; + } public FstCodec(FSTConfiguration fstConfiguration) { config = fstConfiguration; diff --git a/src/main/java/org/redisson/codec/JsonJacksonCodec.java b/src/main/java/org/redisson/codec/JsonJacksonCodec.java index 1f6f4844e..815cede52 100755 --- a/src/main/java/org/redisson/codec/JsonJacksonCodec.java +++ b/src/main/java/org/redisson/codec/JsonJacksonCodec.java @@ -15,6 +15,13 @@ */ package org.redisson.codec; +import java.io.IOException; + +import org.redisson.client.codec.Codec; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.Encoder; + import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonIdentityInfo; @@ -29,14 +36,10 @@ import com.fasterxml.jackson.databind.ObjectMapper.DefaultTypeResolverBuilder; import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder; +import com.fasterxml.jackson.databind.type.TypeFactory; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; -import org.redisson.client.codec.Codec; -import org.redisson.client.handler.State; -import org.redisson.client.protocol.Decoder; -import org.redisson.client.protocol.Encoder; - -import java.io.IOException; /** * @@ -56,11 +59,7 @@ public class JsonJacksonCodec implements Codec { } - private final ObjectMapper mapObjectMapper = initObjectMapper(); - - protected ObjectMapper initObjectMapper() { - return new ObjectMapper(); - } + private final ObjectMapper mapObjectMapper; private final Encoder encoder = new Encoder() { @Override @@ -75,8 +74,23 @@ public class JsonJacksonCodec implements Codec { return mapObjectMapper.readValue(new ByteBufInputStream(buf), Object.class); } }; - + public JsonJacksonCodec() { + this(new ObjectMapper()); + } + + public JsonJacksonCodec(ClassLoader classLoader) { + this(createObjectMapper(classLoader, new ObjectMapper())); + } + + protected static ObjectMapper createObjectMapper(ClassLoader classLoader, ObjectMapper om) { + TypeFactory tf = TypeFactory.defaultInstance().withClassLoader(classLoader); + om.setTypeFactory(tf); + return om; + } + + public JsonJacksonCodec(ObjectMapper mapObjectMapper) { + this.mapObjectMapper = mapObjectMapper; init(mapObjectMapper); // type info inclusion TypeResolverBuilder mapTyper = new DefaultTypeResolverBuilder(DefaultTyping.NON_FINAL) { @@ -107,6 +121,13 @@ public class JsonJacksonCodec implements Codec { mapTyper.init(JsonTypeInfo.Id.CLASS, null); mapTyper.inclusion(JsonTypeInfo.As.PROPERTY); mapObjectMapper.setDefaultTyping(mapTyper); + + // warm up codec + try { + mapObjectMapper.readValue("1".getBytes(), Object.class); + } catch (IOException e) { + throw new IllegalStateException(e); + } } protected void init(ObjectMapper objectMapper) { diff --git a/src/main/java/org/redisson/codec/KryoCodec.java b/src/main/java/org/redisson/codec/KryoCodec.java index ab2b98767..d368509d0 100755 --- a/src/main/java/org/redisson/codec/KryoCodec.java +++ b/src/main/java/org/redisson/codec/KryoCodec.java @@ -48,9 +48,11 @@ public class KryoCodec implements Codec { private final Queue objects = new ConcurrentLinkedQueue(); private final List> classes; + private final ClassLoader classLoader; - public KryoPoolImpl(List> classes) { + public KryoPoolImpl(List> classes, ClassLoader classLoader) { this.classes = classes; + this.classLoader = classLoader; } public Kryo get() { @@ -72,6 +74,9 @@ public class KryoCodec implements Codec { */ protected Kryo createInstance() { Kryo kryo = new Kryo(); + if (classLoader != null) { + kryo.setClassLoader(classLoader); + } kryo.setReferences(false); for (Class clazz : classes) { kryo.register(clazz); @@ -139,13 +144,20 @@ public class KryoCodec implements Codec { }; public KryoCodec() { - this(new KryoPoolImpl(Collections.>emptyList())); + this(Collections.>emptyList()); } + public KryoCodec(ClassLoader classLoader) { + this(Collections.>emptyList(), classLoader); + } + public KryoCodec(List> classes) { - this(new KryoPoolImpl(classes)); + this(classes, null); } + public KryoCodec(List> classes, ClassLoader classLoader) { + this(new KryoPoolImpl(classes, classLoader)); + } public KryoCodec(KryoPool kryoPool) { this.kryoPool = kryoPool; diff --git a/src/main/java/org/redisson/codec/MsgPackJacksonCodec.java b/src/main/java/org/redisson/codec/MsgPackJacksonCodec.java index 50e25a660..b2cdf1fbe 100644 --- a/src/main/java/org/redisson/codec/MsgPackJacksonCodec.java +++ b/src/main/java/org/redisson/codec/MsgPackJacksonCodec.java @@ -27,9 +27,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; */ public class MsgPackJacksonCodec extends JsonJacksonCodec { - @Override - protected ObjectMapper initObjectMapper() { - return new ObjectMapper(new MessagePackFactory()); + public MsgPackJacksonCodec() { + super(new ObjectMapper(new MessagePackFactory())); } - + + public MsgPackJacksonCodec(ClassLoader classLoader) { + super(createObjectMapper(classLoader, new ObjectMapper(new MessagePackFactory()))); + } + } diff --git a/src/main/java/org/redisson/codec/SerializationCodec.java b/src/main/java/org/redisson/codec/SerializationCodec.java index 59b7724c6..8ea1cfb8c 100644 --- a/src/main/java/org/redisson/codec/SerializationCodec.java +++ b/src/main/java/org/redisson/codec/SerializationCodec.java @@ -39,7 +39,13 @@ public class SerializationCodec implements Codec { @Override public Object decode(ByteBuf buf, State state) throws IOException { try { - ObjectInputStream inputStream = new ObjectInputStream(new ByteBufInputStream(buf)); + ByteBufInputStream in = new ByteBufInputStream(buf); + ObjectInputStream inputStream; + if (classLoader != null) { + inputStream = new CustomObjectInputStream(classLoader, in); + } else { + inputStream = new ObjectInputStream(in); + } return inputStream.readObject(); } catch (IOException e) { throw e; @@ -60,7 +66,17 @@ public class SerializationCodec implements Codec { return result.toByteArray(); } }; + + private final ClassLoader classLoader; + public SerializationCodec() { + this(null); + } + + public SerializationCodec(ClassLoader classLoader) { + this.classLoader = classLoader; + } + @Override public Decoder getMapValueDecoder() { return getValueDecoder(); diff --git a/src/main/java/org/redisson/executor/ClassLoaderDelegator.java b/src/main/java/org/redisson/executor/ClassLoaderDelegator.java new file mode 100644 index 000000000..ba501b5d0 --- /dev/null +++ b/src/main/java/org/redisson/executor/ClassLoaderDelegator.java @@ -0,0 +1,79 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.executor; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.Enumeration; + +public class ClassLoaderDelegator extends ClassLoader { + + private final ThreadLocal threadLocalClassLoader = new ThreadLocal(); + + public void setCurrentClassLoader(ClassLoader classLoader) { + threadLocalClassLoader.set(classLoader); + } + + public int hashCode() { + return threadLocalClassLoader.get().hashCode(); + } + + public boolean equals(Object obj) { + return threadLocalClassLoader.get().equals(obj); + } + + public String toString() { + return threadLocalClassLoader.get().toString(); + } + + public Class loadClass(String name) throws ClassNotFoundException { + return threadLocalClassLoader.get().loadClass(name); + } + + public URL getResource(String name) { + return threadLocalClassLoader.get().getResource(name); + } + + public Enumeration getResources(String name) throws IOException { + return threadLocalClassLoader.get().getResources(name); + } + + public InputStream getResourceAsStream(String name) { + return threadLocalClassLoader.get().getResourceAsStream(name); + } + + public void setDefaultAssertionStatus(boolean enabled) { + threadLocalClassLoader.get().setDefaultAssertionStatus(enabled); + } + + public void setPackageAssertionStatus(String packageName, boolean enabled) { + threadLocalClassLoader.get().setPackageAssertionStatus(packageName, enabled); + } + + public void setClassAssertionStatus(String className, boolean enabled) { + threadLocalClassLoader.get().setClassAssertionStatus(className, enabled); + } + + public void clearAssertionStatus() { + threadLocalClassLoader.get().clearAssertionStatus(); + } + + public void clearCurrentClassLoader() { + threadLocalClassLoader.remove(); + } + +} diff --git a/src/main/java/org/redisson/executor/ExecutorRemoteService.java b/src/main/java/org/redisson/executor/ExecutorRemoteService.java new file mode 100644 index 000000000..92c117586 --- /dev/null +++ b/src/main/java/org/redisson/executor/ExecutorRemoteService.java @@ -0,0 +1,59 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.executor; + +import java.util.Arrays; + +import org.redisson.Redisson; +import org.redisson.RedissonRemoteService; +import org.redisson.api.RAtomicLong; +import org.redisson.api.RBlockingQueue; +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandExecutor; +import org.redisson.remote.RemoteServiceRequest; + +import io.netty.util.concurrent.Future; + +public class ExecutorRemoteService extends RedissonRemoteService { + + private final RAtomicLong tasksCounter; + private final RAtomicLong status; + + public ExecutorRemoteService(Codec codec, Redisson redisson, String name, CommandExecutor commandExecutor) { + super(codec, redisson, name, commandExecutor); + + String objectName = name + ":{"+ RemoteExecutorService.class.getName() + "}"; + tasksCounter = redisson.getAtomicLong(objectName + ":counter"); + status = redisson.getAtomicLong(objectName + ":status"); + } + + @Override + protected Future addAsync(RBlockingQueue requestQueue, + RemoteServiceRequest request) { + return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "if redis.call('exists', KEYS[2]) == 0 then " + + "redis.call('rpush', KEYS[3], ARGV[1]); " + + "redis.call('incr', KEYS[1]);" + + "return 1;" + + "end;" + + "return 0;", + Arrays.asList(tasksCounter.getName(), status.getName(), requestQueue.getName()), + encode(request)); + } + +} diff --git a/src/main/java/org/redisson/executor/RedissonClassLoader.java b/src/main/java/org/redisson/executor/RedissonClassLoader.java new file mode 100644 index 000000000..5fceb2b86 --- /dev/null +++ b/src/main/java/org/redisson/executor/RedissonClassLoader.java @@ -0,0 +1,28 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.executor; + +public class RedissonClassLoader extends ClassLoader { + + public RedissonClassLoader(ClassLoader parent) { + super(parent); + } + + public void loadClass(String name, byte[] body) { + defineClass(name, body, 0, body.length); + } + +} diff --git a/src/main/java/org/redisson/executor/RemoteExecutorService.java b/src/main/java/org/redisson/executor/RemoteExecutorService.java new file mode 100644 index 000000000..ffced181f --- /dev/null +++ b/src/main/java/org/redisson/executor/RemoteExecutorService.java @@ -0,0 +1,24 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.executor; + +public interface RemoteExecutorService { + + Object execute(String className, byte[] classBody, byte[] state); + + void executeVoid(String className, byte[] classBody, byte[] state); + +} diff --git a/src/main/java/org/redisson/executor/RemoteExecutorServiceAsync.java b/src/main/java/org/redisson/executor/RemoteExecutorServiceAsync.java new file mode 100644 index 000000000..039ec8953 --- /dev/null +++ b/src/main/java/org/redisson/executor/RemoteExecutorServiceAsync.java @@ -0,0 +1,29 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.executor; + +import org.redisson.remote.RRemoteAsync; + +import io.netty.util.concurrent.Future; + +@RRemoteAsync(RemoteExecutorService.class) +public interface RemoteExecutorServiceAsync { + + Future execute(String className, byte[] classBody, byte[] state); + + Future executeVoid(String className, byte[] classBody, byte[] state); + +} diff --git a/src/main/java/org/redisson/executor/RemoteExecutorServiceImpl.java b/src/main/java/org/redisson/executor/RemoteExecutorServiceImpl.java new file mode 100644 index 000000000..b62029090 --- /dev/null +++ b/src/main/java/org/redisson/executor/RemoteExecutorServiceImpl.java @@ -0,0 +1,113 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.executor; + +import java.util.Arrays; +import java.util.concurrent.Callable; + +import org.redisson.RedissonClient; +import org.redisson.RedissonExecutorService; +import org.redisson.api.RAtomicLong; +import org.redisson.api.RBucket; +import org.redisson.api.RTopic; +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandExecutor; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +public class RemoteExecutorServiceImpl implements RemoteExecutorService { + + private final ClassLoaderDelegator classLoader = new ClassLoaderDelegator(); + + private final Codec codec; + private final String name; + private final CommandExecutor commandExecutor; + + private final RAtomicLong tasksCounter; + private final RBucket status; + private final RTopic topic; + + public RemoteExecutorServiceImpl(CommandExecutor commandExecutor, RedissonClient redisson, Codec codec, String name) { + this.commandExecutor = commandExecutor; + + this.name = name + ":{"+ RemoteExecutorService.class.getName() + "}"; + tasksCounter = redisson.getAtomicLong(this.name + ":counter"); + status = redisson.getBucket(this.name + ":status"); + topic = redisson.getTopic(this.name + ":topic"); + + try { + this.codec = codec.getClass().getConstructor(ClassLoader.class).newInstance(classLoader); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + @Override + public Object execute(String className, byte[] classBody, byte[] state) { + ByteBuf buf = null; + try { + buf = Unpooled.wrappedBuffer(state); + + RedissonClassLoader cl = new RedissonClassLoader(getClass().getClassLoader()); + cl.loadClass(className, classBody); + classLoader.setCurrentClassLoader(cl); + + Callable callable = (Callable) codec.getValueDecoder().decode(buf, null); + return callable.call(); + } catch (Exception e) { + throw new IllegalArgumentException(e); + } finally { + buf.release(); + finish(); + } + } + + @Override + public void executeVoid(String className, byte[] classBody, byte[] state) { + ByteBuf buf = null; + try { + buf = Unpooled.wrappedBuffer(state); + + RedissonClassLoader cl = new RedissonClassLoader(getClass().getClassLoader()); + cl.loadClass(className, classBody); + classLoader.setCurrentClassLoader(cl); + + Runnable runnable = (Runnable) codec.getValueDecoder().decode(buf, null); + runnable.run(); + } catch (Exception e) { + throw new IllegalArgumentException(e); + } finally { + buf.release(); + finish(); + } + } + + private void finish() { + classLoader.clearCurrentClassLoader(); + + commandExecutor.evalWrite(name, LongCodec.INSTANCE, RedisCommands.EVAL_VOID, + "if redis.call('decr', KEYS[1]) == 0 and redis.call('get', KEYS[2]) == ARGV[1] then " + + "redis.call('set', KEYS[2], ARGV[2]);" + + "redis.call('publish', KEYS[3], ARGV[2]);" + + "end;", + Arrays.asList(tasksCounter.getName(), status.getName(), topic.getChannelNames().get(0)), + RedissonExecutorService.SHUTDOWN_STATE, RedissonExecutorService.TERMINATED_STATE); + } + +} diff --git a/src/main/java/org/redisson/executor/RemotePromise.java b/src/main/java/org/redisson/executor/RemotePromise.java new file mode 100644 index 000000000..1bbd90ea4 --- /dev/null +++ b/src/main/java/org/redisson/executor/RemotePromise.java @@ -0,0 +1,39 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.executor; + +import org.redisson.misc.PromiseDelegator; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; + +public class RemotePromise extends PromiseDelegator { + + private Future addFuture; + + public RemotePromise(Promise promise) { + super(promise); + } + + public void setAddFuture(Future addFuture) { + this.addFuture = addFuture; + } + + public Future getAddFuture() { + return addFuture; + } + +}