RemoteService ack support added

pull/456/head
Nikita 9 years ago
parent e909110798
commit bb7a14c4ba

@ -23,12 +23,18 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.core.MessageListener;
import org.redisson.core.RBlockingQueue;
import org.redisson.core.RRemoteService;
import org.redisson.core.RTopic;
import org.redisson.remote.RRemoteServiceResponse;
import org.redisson.remote.RemoteServiceAck;
import org.redisson.remote.RemoteServiceAckTimeoutException;
import org.redisson.remote.RemoteServiceKey;
import org.redisson.remote.RemoteServiceMethod;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RemoteServiceResponse;
import org.redisson.remote.RemoteServiceTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -81,28 +87,68 @@ public class RedissonRemoteService implements RRemoteService {
@Override
public void operationComplete(Future<RemoteServiceRequest> future) throws Exception {
if (!future.isSuccess()) {
if (future.cause() instanceof RedissonShutdownException) {
return;
}
subscribe(remoteInterface, requestQueue);
return;
}
subscribe(remoteInterface, requestQueue);
RemoteServiceRequest request = future.getNow();
if (System.currentTimeMillis() - request.getDate() > request.getAckTimeout()) {
log.debug("request: {} has been skipped due to ackTimeout");
return;
}
RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName()));
String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + request.getRequestId();
RTopic<RemoteServiceResponse> topic = redisson.getTopic(responseName);
RemoteServiceResponse response;
RTopic<RRemoteServiceResponse> topic = redisson.getTopic(responseName);
Future<Long> ackClientsFuture = topic.publishAsync(new RemoteServiceAck());
ackClientsFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't send ack for request: " + request, future.cause());
return;
}
if (future.getNow() == 0) {
log.error("Client has not received ack for request: {}", request);
return;
}
invokeMethod(request, method, topic);
}
});
}
});
}
private void invokeMethod(RemoteServiceRequest request, RemoteServiceMethod method,
RTopic<RRemoteServiceResponse> topic) {
final AtomicReference<RemoteServiceResponse> responseHolder = new AtomicReference<RemoteServiceResponse>();
try {
Object result = method.getMethod().invoke(method.getBean(), request.getArgs());
response = new RemoteServiceResponse(result);
RemoteServiceResponse response = new RemoteServiceResponse(result);
responseHolder.set(response);
} catch (Exception e) {
response = new RemoteServiceResponse(e.getCause());
RemoteServiceResponse response = new RemoteServiceResponse(e.getCause());
responseHolder.set(response);
log.error("Can't execute: " + request, e);
}
long clients = topic.publish(response);
if (clients == 0) {
log.error("None of clients has not received a response: {} for request: {}", response, request);
Future<Long> clientsFuture = topic.publishAsync(responseHolder.get());
clientsFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
subscribe(remoteInterface, requestQueue);
if (future.getNow() == 0) {
log.error("None of clients has not received a response: {} for request: {}", responseHolder.get(), request);
}
}
});
}
@ -113,7 +159,12 @@ public class RedissonRemoteService implements RRemoteService {
}
@Override
public <T> T get(final Class<T> remoteInterface, final int timeout, final TimeUnit timeUnit) {
public <T> T get(final Class<T> remoteInterface, final long executionTimeout, final TimeUnit executionTimeUnit) {
return get(remoteInterface, executionTimeout, executionTimeUnit, 1, TimeUnit.SECONDS);
}
public <T> T get(final Class<T> remoteInterface, final long executionTimeout, final TimeUnit executionTimeUnit,
final long ackTimeout, final TimeUnit ackTimeUnit) {
InvocationHandler handler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
@ -121,31 +172,41 @@ public class RedissonRemoteService implements RRemoteService {
String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}";
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args);
RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args, ackTimeUnit.toMillis(ackTimeout), System.currentTimeMillis());
requestQueue.add(request);
String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + requestId;
final RTopic<RemoteServiceResponse> topic = redisson.getTopic(responseName);
final CountDownLatch ackLatch = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<RemoteServiceResponse> response = new AtomicReference<RemoteServiceResponse>();
int listenerId = topic.addListener(new MessageListener<RemoteServiceResponse>() {
final AtomicReference<RRemoteServiceResponse> response = new AtomicReference<RRemoteServiceResponse>();
final RTopic<RRemoteServiceResponse> topic = redisson.getTopic(responseName);
int listenerId = topic.addListener(new MessageListener<RRemoteServiceResponse>() {
@Override
public void onMessage(String channel, RemoteServiceResponse msg) {
public void onMessage(String channel, RRemoteServiceResponse msg) {
if (msg instanceof RemoteServiceResponse) {
response.set(msg);
latch.countDown();
} else {
ackLatch.countDown();
}
}
});
if (timeout == -1) {
if (!ackLatch.await(ackTimeout, ackTimeUnit)) {
topic.removeListener(listenerId);
throw new RemoteServiceAckTimeoutException("No ACK response after " + ackTimeUnit.toMillis(ackTimeout) + "ms for request: " + request);
}
if (executionTimeout == -1) {
latch.await();
} else {
if (!latch.await(timeout, timeUnit)) {
if (!latch.await(executionTimeout, executionTimeUnit)) {
topic.removeListener(listenerId);
throw new RedisTimeoutException("No response after " + timeUnit.toMillis(timeout) + "ms for request: " + request);
throw new RemoteServiceTimeoutException("No response after " + executionTimeUnit.toMillis(executionTimeout) + "ms for request: " + request);
}
}
topic.removeListener(listenerId);
RemoteServiceResponse msg = response.get();
RemoteServiceResponse msg = (RemoteServiceResponse) response.get();
if (msg.getError() != null) {
throw msg.getError();
}

@ -0,0 +1,11 @@
package org.redisson;
public class RedissonShutdownException extends RuntimeException {
private static final long serialVersionUID = -2694051226420789395L;
public RedissonShutdownException(String message) {
super(message);
}
}

@ -124,6 +124,7 @@ public class JsonJacksonCodec implements Codec {
.withCreatorVisibility(JsonAutoDetect.Visibility.NONE));
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true);
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
objectMapper.addMixIn(Throwable.class, ThrowableMixIn.class);
}

@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.RedisClientResult;
import org.redisson.RedissonShutdownException;
import org.redisson.SlotCallback;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisConnection;
@ -354,7 +355,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
if (!connectionManager.getShutdownLatch().acquire()) {
mainPromise.setFailure(new IllegalStateException("Redisson is shutdown"));
mainPromise.setFailure(new RedissonShutdownException("Redisson is shutdown"));
return;
}

@ -677,8 +677,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public void shutdown() {
shutdownLatch.close();
shutdownPromise.trySuccess(true);
shutdownLatch.closeAndAwaitUninterruptibly();
shutdownLatch.awaitUninterruptibly();
for (MasterSlaveEntry entry : entries.values()) {
entry.shutdown();
}

@ -17,6 +17,46 @@ package org.redisson.core;
import java.util.concurrent.TimeUnit;
import org.redisson.remote.RemoteServiceAckTimeoutException;
/**
* Allows to execute object methods remotely between Redisson instances (Server side and Client side instances in terms of remote invocation).
* <p/>
* <b>1. Server side instance (worker instance).</b> Register object with RRemoteService instance.
* <p/>
* <code>
* RRemoteService remoteService = redisson.getRemoteService();<br/>
* <br/>
* // register remote service before any remote invocation<br/>
* remoteService.register(SomeServiceInterface.class, someServiceImpl);
* </code>
* <p/>
* <b>2. Client side instance.</b> Invokes method remotely.
* <p/>
* <code>
* RRemoteService remoteService = redisson.getRemoteService();<br/>
* SomeServiceInterface service = remoteService.get(SomeServiceInterface.class);<br/>
* <br/>
* String result = service.doSomeStuff(1L, "secondParam", new AnyParam());
* </code>
* <p/>
* <p/>
* There are two timeouts during execution:
* <p/>
* <b>Acknowledge (Ack) timeout.</b>Client side instance waits for acknowledge message from Server side instance.
* <p/>
* If acknowledge has not been received by Client side instance then <code>RemoteServiceAckTimeoutException</code> will be thrown.
* And next invocation attempt can be made.
* <p/>
* If acknowledge has not been received Client side instance but Server side instance has received invocation message already.
* In this case invocation will be skipped, due to ack timeout checking by Server side instance.
* <p/>
* <b>Execution timeout.</b> Client side instance received acknowledge message. If it hasn't received any result or error
* from server side during execution timeout then <code>RemoteServiceTimeoutException</code> will be thrown.
*
* @author Nikita Koksharov
*
*/
public interface RRemoteService {
/**
@ -37,7 +77,8 @@ public interface RRemoteService {
<T> void register(Class<T> remoteInterface, T object, int executorsAmount);
/**
* Get remote service object for remote invocations
* Get remote service object for remote invocations.
* Uses ack timeout = 1000 ms by default
*
* @param remoteInterface
* @return
@ -46,13 +87,26 @@ public interface RRemoteService {
/**
* Get remote service object for remote invocations
* with specified invocation timeout
* with specified invocation timeout. Uses ack timeout = 1000 ms by default
*
* @param remoteInterface
* @param executionTimeout - invocation timeout
* @param executionTimeUnit
* @return
*/
<T> T get(Class<T> remoteInterface, long executionTimeout, TimeUnit executionTimeUnit);
/**
* Get remote service object for remote invocations
* with specified invocation and ack timeouts
*
* @param remoteInterface
* @param timeout - invocation timeout
* @param timeUnit
* @param executionTimeout - invocation timeout
* @param executionTimeUnit
* @param ackTimeout - ack timeout
* @param ackTimeUnit
* @return
*/
<T> T get(Class<T> remoteInterface, int timeout, TimeUnit timeUnit);
<T> T get(Class<T> remoteInterface, long executionTimeout, TimeUnit executionTimeUnit, long ackTimeout, TimeUnit ackTimeUnit);
}

@ -83,9 +83,12 @@ public class InfinitySemaphoreLatch extends AbstractQueuedSynchronizer {
return closed;
}
// waiting for an open state
public final boolean closeAndAwaitUninterruptibly() {
public void close() {
closed = true;
}
// waiting for an open state
public final boolean awaitUninterruptibly() {
try {
return await(15, TimeUnit.SECONDS);
} catch (InterruptedException e) {

@ -0,0 +1,5 @@
package org.redisson.remote;
public interface RRemoteServiceResponse {
}

@ -0,0 +1,11 @@
package org.redisson.remote;
/**
* Worker sends this message when it has received a {@link RemoteServiceRequest}.
*
* @author Nikita Koksharov
*
*/
public class RemoteServiceAck implements RRemoteServiceResponse {
}

@ -0,0 +1,21 @@
package org.redisson.remote;
/**
* Rises when remote method executor has not answered
* within Ack timeout.
* <p/>
* Method invocation has not been started in this case.
* So a new invocation attempt can be made.
*
* @author Nikita Koksharov
*
*/
public class RemoteServiceAckTimeoutException extends RuntimeException {
private static final long serialVersionUID = 1820133675653636587L;
public RemoteServiceAckTimeoutException(String message) {
super(message);
}
}

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
package org.redisson.remote;
public class RemoteServiceKey {

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
package org.redisson.remote;
import java.lang.reflect.Method;

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
package org.redisson.remote;
import java.util.Arrays;
@ -22,15 +22,27 @@ public class RemoteServiceRequest {
private String requestId;
private String methodName;
private Object[] args;
private long ackTimeout;
private long date;
public RemoteServiceRequest() {
}
public RemoteServiceRequest(String requestId, String methodName, Object[] args) {
public RemoteServiceRequest(String requestId, String methodName, Object[] args, long ackTimeout, long date) {
super();
this.requestId = requestId;
this.methodName = methodName;
this.args = args;
this.ackTimeout = ackTimeout;
this.date = date;
}
public long getDate() {
return date;
}
public long getAckTimeout() {
return ackTimeout;
}
public String getRequestId() {
@ -47,8 +59,8 @@ public class RemoteServiceRequest {
@Override
public String toString() {
return "RemoteServiceRequest[requestId=" + requestId + ", methodName=" + methodName + ", args="
+ Arrays.toString(args) + "]";
return "RemoteServiceRequest [requestId=" + requestId + ", methodName=" + methodName + ", args="
+ Arrays.toString(args) + ", ackTimeout=" + ackTimeout + ", date=" + date + "]";
}
}

@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
package org.redisson.remote;
public class RemoteServiceResponse {
public class RemoteServiceResponse implements RRemoteServiceResponse {
private Object result;
private Throwable error;

@ -0,0 +1,17 @@
package org.redisson.remote;
/**
* Rises when invocation timeout has been occurred
*
* @author Nikita Koksharov
*
*/
public class RemoteServiceTimeoutException extends RuntimeException {
private static final long serialVersionUID = -1749266931994840256L;
public RemoteServiceTimeoutException(String message) {
super(message);
}
}

@ -1,14 +1,14 @@
package org.redisson;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.RedisTimeoutException;
import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.remote.RemoteServiceTimeoutException;
public class RedissonRemoteServiceTest extends BaseTest {
public interface RemoteInterface {
@ -59,7 +59,7 @@ public class RedissonRemoteServiceTest extends BaseTest {
}
@Test(expected = RedisTimeoutException.class)
@Test(expected = RemoteServiceTimeoutException.class)
public void testTimeout() throws InterruptedException {
RedissonClient r1 = Redisson.create();
r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());

Loading…
Cancel
Save