Merge branch 'mrniko/master' into feature/travis-ci

pull/509/head
Rui Gu 9 years ago
commit 4cadbd6f2b

@ -3,8 +3,6 @@ Redis based In-Memory Data Grid for Java. Redisson.
[![Maven Central](https://img.shields.io/maven-central/v/org.redisson/redisson.svg?style=flat-square)](https://maven-badges.herokuapp.com/maven-central/org.redisson/redisson/) [![Maven Central](https://img.shields.io/maven-central/v/org.redisson/redisson.svg?style=flat-square)](https://maven-badges.herokuapp.com/maven-central/org.redisson/redisson/)
##Please take part in [Redisson survey](https://ru.surveymonkey.com/r/LP7RG8Q)
Use familiar Java data structures with power of [Redis](http://redis.io). Use familiar Java data structures with power of [Redis](http://redis.io).
Based on high-performance async and lock-free Java Redis client and [Netty 4](http://netty.io) framework. Based on high-performance async and lock-free Java Redis client and [Netty 4](http://netty.io) framework.
@ -19,6 +17,9 @@ Licensed under the Apache License 2.0.
Welcome to support chat - [![Join the chat at https://gitter.im/mrniko/redisson](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/mrniko/redisson?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) Welcome to support chat - [![Join the chat at https://gitter.im/mrniko/redisson](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/mrniko/redisson?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
##Please take part in [Redisson survey](https://ru.surveymonkey.com/r/LP7RG8Q)
####Try [Redisson PRO](http://redisson.pro) edition
Features Features
================================ ================================
* [AWS ElastiCache](https://aws.amazon.com/elasticache/) servers mode: * [AWS ElastiCache](https://aws.amazon.com/elasticache/) servers mode:
@ -63,7 +64,7 @@ Features
* [Spring cache](http://docs.spring.io/spring/docs/current/spring-framework-reference/html/cache.html) integration * [Spring cache](http://docs.spring.io/spring/docs/current/spring-framework-reference/html/cache.html) integration
* Supports [Reactive Streams](http://www.reactive-streams.org) * Supports [Reactive Streams](http://www.reactive-streams.org)
* Supports [Redis pipelining](http://redis.io/topics/pipelining) (command batches) * Supports [Redis pipelining](http://redis.io/topics/pipelining) (command batches)
* Supports [Remote services](https://github.com/mrniko/redisson/wiki/5.-distributed-objects#513-remote-service) * Supports [Remote services](https://github.com/mrniko/redisson/wiki/6.-distributed-objects#69-remote-service)
* Supports Android platform * Supports Android platform
* Supports auto-reconnect * Supports auto-reconnect
* Supports failed to send command auto-retry * Supports failed to send command auto-retry

@ -15,6 +15,16 @@
*/ */
package org.redisson; package org.redisson;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
import org.redisson.core.*;
import org.redisson.remote.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
@ -23,27 +33,6 @@ import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.redisson.core.RBatch;
import org.redisson.core.RBlockingQueue;
import org.redisson.core.RBlockingQueueAsync;
import org.redisson.core.RRemoteService;
import org.redisson.remote.RRemoteServiceResponse;
import org.redisson.remote.RemoteServiceAck;
import org.redisson.remote.RemoteServiceAckTimeoutException;
import org.redisson.remote.RemoteServiceKey;
import org.redisson.remote.RemoteServiceMethod;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RemoteServiceResponse;
import org.redisson.remote.RemoteServiceTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -108,9 +97,10 @@ public class RedissonRemoteService implements RRemoteService {
// do not subscribe now, see https://github.com/mrniko/redisson/issues/493 // do not subscribe now, see https://github.com/mrniko/redisson/issues/493
// subscribe(remoteInterface, requestQueue); // subscribe(remoteInterface, requestQueue);
final RemoteServiceRequest request = future.getNow(); final RemoteServiceRequest request = future.getNow();
if (System.currentTimeMillis() - request.getDate() > request.getAckTimeout()) { // check the ack only if expected
if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request.getOptions().getAckTimeoutInMillis()) {
log.debug("request: {} has been skipped due to ackTimeout"); log.debug("request: {} has been skipped due to ackTimeout");
// re-subscribe after a skipped ackTimeout // re-subscribe after a skipped ackTimeout
subscribe(remoteInterface, requestQueue); subscribe(remoteInterface, requestQueue);
@ -119,24 +109,29 @@ public class RedissonRemoteService implements RRemoteService {
final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName())); final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName()));
final String responseName = name + ":{" + remoteInterface.getName() + "}:" + request.getRequestId(); final String responseName = name + ":{" + remoteInterface.getName() + "}:" + request.getRequestId();
Future<List<?>> ackClientsFuture = send(request.getAckTimeout(), responseName, new RemoteServiceAck()); // send the ack only if expected
ackClientsFuture.addListener(new FutureListener<List<?>>() { if (request.getOptions().isAckExpected()) {
@Override Future<List<?>> ackClientsFuture = send(request.getOptions().getAckTimeoutInMillis(), responseName, new RemoteServiceAck());
public void operationComplete(Future<List<?>> future) throws Exception { ackClientsFuture.addListener(new FutureListener<List<?>>() {
if (!future.isSuccess()) { @Override
log.error("Can't send ack for request: " + request, future.cause()); public void operationComplete(Future<List<?>> future) throws Exception {
if (future.cause() instanceof RedissonShutdownException) { if (!future.isSuccess()) {
log.error("Can't send ack for request: " + request, future.cause());
if (future.cause() instanceof RedissonShutdownException) {
return;
}
// re-subscribe after a failed send (ack)
subscribe(remoteInterface, requestQueue);
return; return;
} }
// re-subscribe after a failed send (ack)
subscribe(remoteInterface, requestQueue);
return;
}
invokeMethod(remoteInterface, requestQueue, request, method, responseName); invokeMethod(remoteInterface, requestQueue, request, method, responseName);
} }
}); });
} else {
invokeMethod(remoteInterface, requestQueue, request, method, responseName);
}
} }
}); });
@ -153,65 +148,103 @@ public class RedissonRemoteService implements RRemoteService {
responseHolder.set(response); responseHolder.set(response);
log.error("Can't execute: " + request, e); log.error("Can't execute: " + request, e);
} }
Future<List<?>> clientsFuture = send(request.getResponseTimeout(), responseName, responseHolder.get()); // send the response only if expected
clientsFuture.addListener(new FutureListener<List<?>>() { if (request.getOptions().isResultExpected()) {
@Override Future<List<?>> clientsFuture = send(request.getOptions().getExecutionTimeoutInMillis(), responseName, responseHolder.get());
public void operationComplete(Future<List<?>> future) throws Exception { clientsFuture.addListener(new FutureListener<List<?>>() {
if (!future.isSuccess()) { @Override
log.error("Can't send response: " + responseHolder.get() + " for request: " + request, future.cause()); public void operationComplete(Future<List<?>> future) throws Exception {
if (future.cause() instanceof RedissonShutdownException) { if (!future.isSuccess()) {
return; log.error("Can't send response: " + responseHolder.get() + " for request: " + request, future.cause());
if (future.cause() instanceof RedissonShutdownException) {
return;
}
} }
// re-subscribe anyways (fail or success) after the send (response)
subscribe(remoteInterface, requestQueue);
} }
// re-subscribe anyways (fail or success) after the send (response) });
subscribe(remoteInterface, requestQueue); } else {
} // re-subscribe anyways after the method invocation
}); subscribe(remoteInterface, requestQueue);
}
} }
@Override @Override
public <T> T get(Class<T> remoteInterface) { public <T> T get(Class<T> remoteInterface) {
return get(remoteInterface, 30, TimeUnit.SECONDS); return get(remoteInterface, RemoteInvocationOptions.defaults());
} }
@Override @Override
public <T> T get(final Class<T> remoteInterface, final long executionTimeout, final TimeUnit executionTimeUnit) { public <T> T get(final Class<T> remoteInterface, final long executionTimeout, final TimeUnit executionTimeUnit) {
return get(remoteInterface, executionTimeout, executionTimeUnit, 1, TimeUnit.SECONDS); return get(remoteInterface, RemoteInvocationOptions.defaults()
.expectResultWithin(executionTimeout, executionTimeUnit));
} }
public <T> T get(final Class<T> remoteInterface, final long executionTimeout, final TimeUnit executionTimeUnit, public <T> T get(final Class<T> remoteInterface, final long executionTimeout, final TimeUnit executionTimeUnit,
final long ackTimeout, final TimeUnit ackTimeUnit) { final long ackTimeout, final TimeUnit ackTimeUnit) {
return get(remoteInterface, RemoteInvocationOptions.defaults()
.expectAckWithin(ackTimeout, ackTimeUnit)
.expectResultWithin(executionTimeout, executionTimeUnit));
}
public <T> T get(final Class<T> remoteInterface, final RemoteInvocationOptions options) {
// local copy of the options, to prevent mutation
final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId();
InvocationHandler handler = new InvocationHandler() { InvocationHandler handler = new InvocationHandler() {
@Override @Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getName().equals("toString")) {
return toString;
} else if (method.getName().equals("equals")) {
return proxy == args[0];
} else if (method.getName().equals("hashCode")) {
return toString.hashCode();
}
if (!optionsCopy.isResultExpected() && !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE)))
throw new IllegalArgumentException("The noResult option only supports void return value");
String requestId = generateRequestId(); String requestId = generateRequestId();
String requestQueueName = name + ":{" + remoteInterface.getName() + "}"; String requestQueueName = name + ":{" + remoteInterface.getName() + "}";
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName); RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args, RemoteServiceRequest request = new RemoteServiceRequest(requestId,
ackTimeUnit.toMillis(ackTimeout), executionTimeUnit.toMillis(executionTimeout), System.currentTimeMillis()); method.getName(), args, optionsCopy, System.currentTimeMillis());
requestQueue.add(request); requestQueue.add(request);
String responseName = name + ":{" + remoteInterface.getName() + "}:" + requestId; RBlockingQueue<RRemoteServiceResponse> responseQueue = null;
RBlockingQueue<RRemoteServiceResponse> responseQueue = redisson.getBlockingQueue(responseName); if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) {
String responseName = name + ":{" + remoteInterface.getName() + "}:" + requestId;
RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(ackTimeout, ackTimeUnit); responseQueue = redisson.getBlockingQueue(responseName);
if (ack == null) {
throw new RemoteServiceAckTimeoutException("No ACK response after " + ackTimeUnit.toMillis(ackTimeout) + "ms for request: " + request);
} }
RemoteServiceResponse response = (RemoteServiceResponse) responseQueue.poll(executionTimeout, executionTimeUnit); // poll for the ack only if expected
if (response == null) { if (optionsCopy.isAckExpected()) {
throw new RemoteServiceTimeoutException("No response after " + executionTimeUnit.toMillis(executionTimeout) + "ms for request: " + request); RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
if (ack == null) {
throw new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request);
}
} }
if (response.getError() != null) {
throw response.getError(); // poll for the response only if expected
if (optionsCopy.isResultExpected()) {
RemoteServiceResponse response = (RemoteServiceResponse) responseQueue.poll(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS);
if (response == null) {
throw new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);
}
if (response.getError() != null) {
throw response.getError();
}
return response.getResult();
} }
return response.getResult();
return null;
} }
}; };
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] {remoteInterface}, handler); return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler);
} }
private String generateRequestId() { private String generateRequestId() {
@ -228,5 +261,4 @@ public class RedissonRemoteService implements RRemoteService {
queue.expireAsync(timeout, TimeUnit.MILLISECONDS); queue.expireAsync(timeout, TimeUnit.MILLISECONDS);
return batch.executeAsync(); return batch.executeAsync();
} }
} }

@ -72,8 +72,7 @@ public class FstCodec implements Codec {
ByteArrayOutputStream os = new ByteArrayOutputStream(); ByteArrayOutputStream os = new ByteArrayOutputStream();
FSTObjectOutput oos = config.getObjectOutput(os); FSTObjectOutput oos = config.getObjectOutput(os);
oos.writeObject(in); oos.writeObject(in);
oos.close(); oos.flush();
return os.toByteArray(); return os.toByteArray();
} }
}; };

@ -77,21 +77,32 @@ public interface RRemoteService {
/** /**
* Get remote service object for remote invocations. * Get remote service object for remote invocations.
* <p/> * <p/>
* Ack timeout = 1000 ms by default * This method is a shortcut for
* <p/> * <pre>
* Execution timeout = 30 sec by default * get(remoteInterface, RemoteInvocationOptions.defaults())
* * </pre>
*
* @see RemoteInvocationOptions#defaults()
* @see #get(Class, RemoteInvocationOptions)
*
* @param remoteInterface * @param remoteInterface
* @return * @return
*/ */
<T> T get(Class<T> remoteInterface); <T> T get(Class<T> remoteInterface);
/** /**
* Get remote service object for remote invocations * Get remote service object for remote invocations
* with specified invocation timeout. * with specified invocation timeout.
* <p/> * <p/>
* Ack timeout = 1000 ms by default * This method is a shortcut for
* * <pre>
* get(remoteInterface, RemoteInvocationOptions.defaults()
* .expectResultWithin(executionTimeout, executionTimeUnit))
* </pre>
*
* @see RemoteInvocationOptions#defaults()
* @see #get(Class, RemoteInvocationOptions)
*
* @param remoteInterface * @param remoteInterface
* @param executionTimeout - invocation timeout * @param executionTimeout - invocation timeout
* @param executionTimeUnit * @param executionTimeUnit
@ -102,7 +113,17 @@ public interface RRemoteService {
/** /**
* Get remote service object for remote invocations * Get remote service object for remote invocations
* with specified invocation and ack timeouts * with specified invocation and ack timeouts
* * <p/>
* This method is a shortcut for
* <pre>
* get(remoteInterface, RemoteInvocationOptions.defaults()
* .expectAckWithin(ackTimeout, ackTimeUnit)
* .expectResultWithin(executionTimeout, executionTimeUnit))
* </pre>
*
* @see RemoteInvocationOptions
* @see #get(Class, RemoteInvocationOptions)
*
* @param remoteInterface * @param remoteInterface
* @param executionTimeout - invocation timeout * @param executionTimeout - invocation timeout
* @param executionTimeUnit * @param executionTimeUnit
@ -111,5 +132,17 @@ public interface RRemoteService {
* @return * @return
*/ */
<T> T get(Class<T> remoteInterface, long executionTimeout, TimeUnit executionTimeUnit, long ackTimeout, TimeUnit ackTimeUnit); <T> T get(Class<T> remoteInterface, long executionTimeout, TimeUnit executionTimeUnit, long ackTimeout, TimeUnit ackTimeUnit);
/**
* Get remote service object for remote invocations
* with the specified options
* <p/>
* Note that when using the noResult() option,
* it is expected that the invoked method returns void,
* or else IllegalArgumentException will be thrown.
*
* @see RemoteInvocationOptions
*/
<T> T get(Class<T> remoteInterface, RemoteInvocationOptions options);
} }

@ -0,0 +1,141 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
/**
* RRemoteService invocation options.
*
* Used to tune how RRemoteService will behave
* in regard to the remote invocations acknowledgement
* and execution timeout.
* <p/>
* Examples:
* <pre>
* // 1 second ack timeout and 30 seconds execution timeout
* RemoteInvocationOptions options =
* RemoteInvocationOptions.defaults();
*
* // no ack but 30 seconds execution timeout
* RemoteInvocationOptions options =
* RemoteInvocationOptions.defaults()
* .noAck();
*
* // 1 second ack timeout then forget the result
* RemoteInvocationOptions options =
* RemoteInvocationOptions.defaults()
* .noResult();
*
* // 1 minute ack timeout then forget about the result
* RemoteInvocationOptions options =
* RemoteInvocationOptions.defaults()
* .expectAckWithin(1, TimeUnit.MINUTES)
* .noResult();
*
* // no ack and forget about the result (fire and forget)
* RemoteInvocationOptions options =
* RemoteInvocationOptions.defaults()
* .noAck()
* .noResult();
* </pre>
*
* @see RRemoteService#get(Class, RemoteInvocationOptions)
*/
public class RemoteInvocationOptions implements Serializable {
private Long ackTimeoutInMillis;
private Long executionTimeoutInMillis;
private RemoteInvocationOptions() {
}
public RemoteInvocationOptions(RemoteInvocationOptions copy) {
this.ackTimeoutInMillis = copy.ackTimeoutInMillis;
this.executionTimeoutInMillis = copy.executionTimeoutInMillis;
}
/**
* Creates a new instance of RemoteInvocationOptions with opinionated defaults.
* <p/>
* This is equivalent to:
* <pre>
* new RemoteInvocationOptions()
* .expectAckWithin(1, TimeUnit.SECONDS)
* .expectResultWithin(30, TimeUnit.SECONDS)
* </pre>
*/
public static RemoteInvocationOptions defaults() {
return new RemoteInvocationOptions()
.expectAckWithin(1, TimeUnit.SECONDS)
.expectResultWithin(20, TimeUnit.SECONDS);
}
public Long getAckTimeoutInMillis() {
return ackTimeoutInMillis;
}
public Long getExecutionTimeoutInMillis() {
return executionTimeoutInMillis;
}
public boolean isAckExpected() {
return ackTimeoutInMillis != null;
}
public boolean isResultExpected() {
return executionTimeoutInMillis != null;
}
public RemoteInvocationOptions expectAckWithin(long ackTimeoutInMillis) {
this.ackTimeoutInMillis = ackTimeoutInMillis;
return this;
}
public RemoteInvocationOptions expectAckWithin(long ackTimeout, TimeUnit timeUnit) {
this.ackTimeoutInMillis = timeUnit.toMillis(ackTimeout);
return this;
}
public RemoteInvocationOptions noAck() {
ackTimeoutInMillis = null;
return this;
}
public RemoteInvocationOptions expectResultWithin(long executionTimeoutInMillis) {
this.executionTimeoutInMillis = executionTimeoutInMillis;
return this;
}
public RemoteInvocationOptions expectResultWithin(long executionTimeout, TimeUnit timeUnit) {
this.executionTimeoutInMillis = timeUnit.toMillis(executionTimeout);
return this;
}
public RemoteInvocationOptions noResult() {
executionTimeoutInMillis = null;
return this;
}
@Override
public String toString() {
return "RemoteInvocationOptions[" +
"ackTimeoutInMillis=" + ackTimeoutInMillis +
", executionTimeoutInMillis=" + executionTimeoutInMillis +
']';
}
}

@ -15,6 +15,8 @@
*/ */
package org.redisson.remote; package org.redisson.remote;
public interface RRemoteServiceResponse { import java.io.Serializable;
public interface RRemoteServiceResponse extends Serializable {
} }

@ -15,12 +15,14 @@
*/ */
package org.redisson.remote; package org.redisson.remote;
import java.io.Serializable;
/** /**
* Worker sends this message when it has received a {@link RemoteServiceRequest}. * Worker sends this message when it has received a {@link RemoteServiceRequest}.
* *
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public class RemoteServiceAck implements RRemoteServiceResponse { public class RemoteServiceAck implements RRemoteServiceResponse, Serializable {
} }

@ -15,43 +15,36 @@
*/ */
package org.redisson.remote; package org.redisson.remote;
import org.redisson.core.RemoteInvocationOptions;
import java.io.Serializable;
import java.util.Arrays; import java.util.Arrays;
public class RemoteServiceRequest { public class RemoteServiceRequest implements Serializable {
private String requestId; private String requestId;
private String methodName; private String methodName;
private Object[] args; private Object[] args;
private long ackTimeout; private RemoteInvocationOptions options;
private long responseTimeout;
private long date; private long date;
public RemoteServiceRequest() { public RemoteServiceRequest() {
} }
public RemoteServiceRequest(String requestId, String methodName, Object[] args, long ackTimeout, long responseTimeout, long date) { public RemoteServiceRequest(String requestId, String methodName, Object[] args, RemoteInvocationOptions options, long date) {
super(); super();
this.requestId = requestId; this.requestId = requestId;
this.methodName = methodName; this.methodName = methodName;
this.args = args; this.args = args;
this.ackTimeout = ackTimeout; this.options = options;
this.responseTimeout = responseTimeout;
this.date = date; this.date = date;
} }
public long getResponseTimeout() {
return responseTimeout;
}
public long getDate() { public long getDate() {
return date; return date;
} }
public long getAckTimeout() {
return ackTimeout;
}
public String getRequestId() { public String getRequestId() {
return requestId; return requestId;
} }
@ -59,7 +52,11 @@ public class RemoteServiceRequest {
public Object[] getArgs() { public Object[] getArgs() {
return args; return args;
} }
public RemoteInvocationOptions getOptions() {
return options;
}
public String getMethodName() { public String getMethodName() {
return methodName; return methodName;
} }
@ -67,7 +64,7 @@ public class RemoteServiceRequest {
@Override @Override
public String toString() { public String toString() {
return "RemoteServiceRequest [requestId=" + requestId + ", methodName=" + methodName + ", args=" return "RemoteServiceRequest [requestId=" + requestId + ", methodName=" + methodName + ", args="
+ Arrays.toString(args) + ", ackTimeout=" + ackTimeout + ", date=" + date + "]"; + Arrays.toString(args) + ", options=" + options + ", date=" + date + "]";
} }
} }

@ -15,7 +15,9 @@
*/ */
package org.redisson.remote; package org.redisson.remote;
public class RemoteServiceResponse implements RRemoteServiceResponse { import java.io.Serializable;
public class RemoteServiceResponse implements RRemoteServiceResponse, Serializable {
private Object result; private Object result;
private Throwable error; private Throwable error;

@ -1,22 +1,69 @@
package org.redisson; package org.redisson;
import static org.assertj.core.api.Assertions.assertThat; import io.netty.handler.codec.EncoderException;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.codec.FstCodec;
import org.redisson.codec.SerializationCodec;
import org.redisson.core.RemoteInvocationOptions;
import org.redisson.remote.RemoteServiceAckTimeoutException;
import org.redisson.remote.RemoteServiceTimeoutException;
import java.io.IOException; import java.io.IOException;
import java.io.NotSerializableException;
import java.io.Serializable;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert; import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
import org.redisson.remote.RemoteServiceTimeoutException;
public class RedissonRemoteServiceTest extends BaseTest { public class RedissonRemoteServiceTest extends BaseTest {
public static class Pojo {
private String stringField;
public Pojo() {
}
public Pojo(String stringField) {
this.stringField = stringField;
}
public String getStringField() {
return stringField;
}
public void setStringField(String stringField) {
this.stringField = stringField;
}
}
public static class SerializablePojo implements Serializable {
private String stringField;
public SerializablePojo() {
}
public SerializablePojo(String stringField) {
this.stringField = stringField;
}
public String getStringField() {
return stringField;
}
public void setStringField(String stringField) {
this.stringField = stringField;
}
}
public interface RemoteInterface { public interface RemoteInterface {
void voidMethod(String name, Long param); void voidMethod(String name, Long param);
Long resultMethod(Long value); Long resultMethod(Long value);
void errorMethod() throws IOException; void errorMethod() throws IOException;
@ -24,7 +71,11 @@ public class RedissonRemoteServiceTest extends BaseTest {
void errorMethodWithCause(); void errorMethodWithCause();
void timeoutMethod() throws InterruptedException; void timeoutMethod() throws InterruptedException;
Pojo doSomethingWithPojo(Pojo pojo);
SerializablePojo doSomethingWithSerializablePojo(SerializablePojo pojo);
} }
public class RemoteImpl implements RemoteInterface { public class RemoteImpl implements RemoteInterface {
@ -33,7 +84,7 @@ public class RedissonRemoteServiceTest extends BaseTest {
public void voidMethod(String name, Long param) { public void voidMethod(String name, Long param) {
System.out.println(name + " " + param); System.out.println(name + " " + param);
} }
@Override @Override
public Long resultMethod(Long value) { public Long resultMethod(Long value) {
return value*2; return value*2;
@ -58,7 +109,15 @@ public class RedissonRemoteServiceTest extends BaseTest {
Thread.sleep(2000); Thread.sleep(2000);
} }
@Override
public Pojo doSomethingWithPojo(Pojo pojo) {
return pojo;
}
@Override
public SerializablePojo doSomethingWithSerializablePojo(SerializablePojo pojo) {
return pojo;
}
} }
@Test @Test
@ -191,18 +250,271 @@ public class RedissonRemoteServiceTest extends BaseTest {
@Test @Test
public void testInvocationWithServiceName() { public void testInvocationWithServiceName() {
String name = "MyServiceName"; RedissonClient server = Redisson.create();
RedissonClient client = Redisson.create();
RedissonClient r1 = Redisson.create(); server.getRemoteSerivce("MyServiceNamespace").register(RemoteInterface.class, new RemoteImpl());
r1.getRemoteSerivce(name).register(RemoteInterface.class, new RemoteImpl());
RedissonClient r2 = Redisson.create(); RemoteInterface serviceRemoteInterface = client.getRemoteSerivce("MyServiceNamespace").get(RemoteInterface.class);
RemoteInterface ri = r2.getRemoteSerivce(name).get(RemoteInterface.class); RemoteInterface otherServiceRemoteInterface = client.getRemoteSerivce("MyOtherServiceNamespace").get(RemoteInterface.class);
RemoteInterface defaultServiceRemoteInterface = client.getRemoteSerivce().get(RemoteInterface.class);
ri.voidMethod("someName", 100L); assertThat(serviceRemoteInterface.resultMethod(21L)).isEqualTo(42L);
assertThat(ri.resultMethod(100L)).isEqualTo(200);
r1.shutdown(); try {
r2.shutdown(); otherServiceRemoteInterface.resultMethod(21L);
Assert.fail("Invoking a service in an unregistered custom services namespace should throw");
} catch (Exception e) {
assertThat(e).isInstanceOf(RemoteServiceAckTimeoutException.class);
}
try {
defaultServiceRemoteInterface.resultMethod(21L);
Assert.fail("Invoking a service in the unregistered default services namespace should throw");
} catch (Exception e) {
assertThat(e).isInstanceOf(RemoteServiceAckTimeoutException.class);
}
client.shutdown();
server.shutdown();
}
@Test
public void testProxyToStringEqualsAndHashCode() {
RedissonClient client = Redisson.create();
try {
RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class);
try {
System.out.println(service.toString());
} catch (Exception e) {
Assert.fail("calling toString on the client service proxy should not make a remote call");
}
try {
assertThat(service.hashCode() == service.hashCode()).isTrue();
} catch (Exception e) {
Assert.fail("calling hashCode on the client service proxy should not make a remote call");
}
try {
assertThat(service.equals(service)).isTrue();
} catch (Exception e) {
Assert.fail("calling equals on the client service proxy should not make a remote call");
}
} finally {
client.shutdown();
}
}
@Test
public void testInvocationWithFstCodec() {
RedissonClient server = Redisson.create(createConfig().setCodec(new FstCodec()));
RedissonClient client = Redisson.create(createConfig().setCodec(new FstCodec()));
try {
server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());
RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class);
try {
assertThat(service.resultMethod(21L)).isEqualTo(42L);
} catch (Exception e) {
Assert.fail("Should be compatible with FstCodec");
}
try {
assertThat(service.doSomethingWithSerializablePojo(new SerializablePojo("test")).getStringField()).isEqualTo("test");
} catch (Exception e) {
Assert.fail("Should be compatible with FstCodec");
}
try {
assertThat(service.doSomethingWithPojo(new Pojo("test")).getStringField()).isEqualTo("test");
Assert.fail("FstCodec should not be able to serialize a not serializable class");
} catch (Exception e) {
assertThat(e.getCause()).isInstanceOf(EncoderException.class);
assertThat(e.getCause().getMessage()).contains("Pojo does not implement Serializable");
}
} finally {
client.shutdown();
server.shutdown();
}
}
@Test
public void testInvocationWithSerializationCodec() {
RedissonClient server = Redisson.create(createConfig().setCodec(new SerializationCodec()));
RedissonClient client = Redisson.create(createConfig().setCodec(new SerializationCodec()));
try {
server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());
RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class);
try {
assertThat(service.resultMethod(21L)).isEqualTo(42L);
} catch (Exception e) {
Assert.fail("Should be compatible with SerializationCodec");
}
try {
assertThat(service.doSomethingWithSerializablePojo(new SerializablePojo("test")).getStringField()).isEqualTo("test");
} catch (Exception e) {
e.printStackTrace();
Assert.fail("Should be compatible with SerializationCodec");
}
try {
assertThat(service.doSomethingWithPojo(new Pojo("test")).getStringField()).isEqualTo("test");
Assert.fail("SerializationCodec should not be able to serialize a not serializable class");
} catch (Exception e) {
e.printStackTrace();
assertThat(e.getCause()).isInstanceOf(EncoderException.class);
assertThat(e.getCause().getCause()).isInstanceOf(NotSerializableException.class);
assertThat(e.getCause().getCause().getMessage()).contains("Pojo");
}
} finally {
client.shutdown();
server.shutdown();
}
}
@Test
public void testNoAckWithResultInvocations() throws InterruptedException {
RedissonClient server = Redisson.create();
RedissonClient client = Redisson.create();
try {
server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());
// no ack but an execution timeout of 1 second
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.SECONDS);
RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class, options);
service.voidMethod("noAck", 100L);
assertThat(service.resultMethod(21L)).isEqualTo(42);
try {
service.errorMethod();
Assert.fail();
} catch (IOException e) {
assertThat(e.getMessage()).isEqualTo("Checking error throw");
}
try {
service.errorMethodWithCause();
Assert.fail();
} catch (Exception e) {
assertThat(e.getCause()).isInstanceOf(ArithmeticException.class);
assertThat(e.getCause().getMessage()).isEqualTo("/ by zero");
}
try {
service.timeoutMethod();
Assert.fail("noAck option should still wait for the server to return a response and throw if the execution timeout is exceeded");
} catch (Exception e) {
assertThat(e).isInstanceOf(RemoteServiceTimeoutException.class);
}
} finally {
client.shutdown();
server.shutdown();
}
}
@Test
public void testAckWithoutResultInvocations() throws InterruptedException {
RedissonClient server = Redisson.create();
RedissonClient client = Redisson.create();
try {
server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());
// fire and forget with an ack timeout of 1 sec
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().expectAckWithin(1, TimeUnit.SECONDS).noResult();
RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class, options);
service.voidMethod("noResult", 100L);
try {
service.resultMethod(100L);
Assert.fail();
} catch (Exception e) {
assertThat(e).isInstanceOf(IllegalArgumentException.class);
}
try {
service.errorMethod();
} catch (IOException e) {
Assert.fail("noResult option should not throw server side exception");
}
try {
service.errorMethodWithCause();
} catch (Exception e) {
Assert.fail("noResult option should not throw server side exception");
}
long time = System.currentTimeMillis();
service.timeoutMethod();
time = System.currentTimeMillis() - time;
assertThat(time).describedAs("noResult option should not wait for the server to return a response").isLessThan(2000);
try {
service.timeoutMethod();
Assert.fail("noResult option should still wait for the server to ack the request and throw if the ack timeout is exceeded");
} catch (Exception e) {
assertThat(e).isInstanceOf(RemoteServiceAckTimeoutException.class);
}
} finally {
client.shutdown();
server.shutdown();
}
}
@Test
public void testNoAckWithoutResultInvocations() throws InterruptedException {
RedissonClient server = Redisson.create();
RedissonClient client = Redisson.create();
try {
server.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());
// no ack fire and forget
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().noResult();
RemoteInterface service = client.getRemoteSerivce().get(RemoteInterface.class, options);
RemoteInterface invalidService = client.getRemoteSerivce("Invalid").get(RemoteInterface.class, options);
service.voidMethod("noAck/noResult", 100L);
try {
service.resultMethod(100L);
Assert.fail();
} catch (Exception e) {
assertThat(e).isInstanceOf(IllegalArgumentException.class);
}
try {
service.errorMethod();
} catch (IOException e) {
Assert.fail("noAck with noResult options should not throw server side exception");
}
try {
service.errorMethodWithCause();
} catch (Exception e) {
Assert.fail("noAck with noResult options should not throw server side exception");
}
long time = System.currentTimeMillis();
service.timeoutMethod();
time = System.currentTimeMillis() - time;
assertThat(time).describedAs("noAck with noResult options should not wait for the server to return a response").isLessThan(2000);
try {
invalidService.voidMethod("noAck/noResult", 21L);
} catch (Exception e) {
Assert.fail("noAck with noResult options should not throw any exception even while invoking a service in an unregistered services namespace");
}
} finally {
client.shutdown();
server.shutdown();
}
} }
} }

Loading…
Cancel
Save