RemoteService implemented. #434

pull/499/head
Nikita 9 years ago
parent 9c33b274f9
commit 73452d9974

@ -58,10 +58,10 @@ import org.redisson.core.RListMultimapCache;
import org.redisson.core.RLock;
import org.redisson.core.RMap;
import org.redisson.core.RMapCache;
import org.redisson.core.RMultimapCache;
import org.redisson.core.RPatternTopic;
import org.redisson.core.RQueue;
import org.redisson.core.RReadWriteLock;
import org.redisson.core.RRemoteService;
import org.redisson.core.RScoredSortedSet;
import org.redisson.core.RScript;
import org.redisson.core.RSemaphore;
@ -87,6 +87,7 @@ public class Redisson implements RedissonClient {
private final CommandExecutor commandExecutor;
private final ConnectionManager connectionManager;
private final Config config;
private final RedissonRemoteService remoteService;
private final UUID id = UUID.randomUUID();
@ -114,6 +115,7 @@ public class Redisson implements RedissonClient {
}
commandExecutor = new CommandSyncService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
remoteService = new RedissonRemoteService(this);
}
private void validate(SingleServerConfig config) {
@ -369,6 +371,10 @@ public class Redisson implements RedissonClient {
return new RedissonScript(commandExecutor);
}
public RRemoteService getRemoteSerivce() {
return remoteService;
}
@Override
public <V> RSortedSet<V> getSortedSet(String name) {
return new RedissonSortedSet<V>(commandExecutor, name);
@ -501,6 +507,7 @@ public class Redisson implements RedissonClient {
@Override
public void shutdown() {
remoteService.shutdown();
connectionManager.shutdown();
}

@ -45,6 +45,7 @@ import org.redisson.core.RMapCache;
import org.redisson.core.RPatternTopic;
import org.redisson.core.RQueue;
import org.redisson.core.RReadWriteLock;
import org.redisson.core.RRemoteService;
import org.redisson.core.RScoredSortedSet;
import org.redisson.core.RScript;
import org.redisson.core.RSemaphore;
@ -585,6 +586,13 @@ public interface RedissonClient {
*/
RScript getScript();
/**
* Returns object for remote operations
*
* @return
*/
RRemoteService getRemoteSerivce();
/**
* Return batch object which executes group of
* command in pipeline.

@ -0,0 +1,152 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.core.MessageListener;
import org.redisson.core.RBlockingQueue;
import org.redisson.core.RRemoteService;
import org.redisson.core.RTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
public class RedissonRemoteService implements RRemoteService {
private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class);
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap();
private final Queue<Future<RemoteServiceRequest>> futures = new ConcurrentLinkedQueue<Future<RemoteServiceRequest>>();
private final Redisson redisson;
public RedissonRemoteService(Redisson redisson) {
this.redisson = redisson;
}
@Override
public <T> void register(Class<T> serviceInterface, T object) {
for (Method method : serviceInterface.getMethods()) {
RemoteServiceMethod value = new RemoteServiceMethod(method, object);
RemoteServiceKey key = new RemoteServiceKey(serviceInterface, method.getName());
if (beans.put(key, value) != null) {
return;
}
}
String requestQueueName = "redisson_remote_service:{" + serviceInterface.getName() + "}";
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
subscribe(serviceInterface, requestQueue);
}
private <T> void subscribe(final Class<T> serviceInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue) {
Future<RemoteServiceRequest> take = requestQueue.takeAsync();
futures.add(take);
take.addListener(new FutureListener<RemoteServiceRequest>() {
@Override
public void operationComplete(Future<RemoteServiceRequest> future) throws Exception {
if (!future.isSuccess()) {
return;
}
RemoteServiceRequest request = future.getNow();
RemoteServiceMethod method = beans.get(new RemoteServiceKey(serviceInterface, request.getMethodName()));
String responseName = "redisson_remote_service:{" + serviceInterface.getName() + "}:" + request.getRequestId();
RTopic<RemoteServiceResponse> topic = redisson.getTopic(responseName);
RemoteServiceResponse response;
try {
Object result = method.getMethod().invoke(method.getBean(), request.getArgs());
response = new RemoteServiceResponse(result);
} catch (Exception e) {
e.getCause().printStackTrace();
response = new RemoteServiceResponse(e.getCause());
log.error("Can't execute: " + method.getMethod().getName() + " with args: " + request.getArgs(), e);
}
long clients = topic.publish(response);
if (clients == 0) {
log.error("None of clients has not received a response for request {}", request);
}
futures.remove(future);
subscribe(serviceInterface, requestQueue);
}
});
}
@Override
public <T> T get(final Class<T> serviceInterface) {
InvocationHandler handler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String requestId = generateRequestId();
System.out.println(requestId);
String requestQueueName = "redisson_remote_service:{" + serviceInterface.getName() + "}";
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
requestQueue.add(new RemoteServiceRequest(requestId, method.getName(), args));
String responseName = "redisson_remote_service:{" + serviceInterface.getName() + "}:" + requestId;
final RTopic<RemoteServiceResponse> topic = redisson.getTopic(responseName);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<RemoteServiceResponse> response = new AtomicReference<RemoteServiceResponse>();
int listenerId = topic.addListener(new MessageListener<RemoteServiceResponse>() {
@Override
public void onMessage(String channel, RemoteServiceResponse msg) {
response.set(msg);
latch.countDown();
}
});
latch.await();
topic.removeListener(listenerId);
RemoteServiceResponse msg = response.get();
if (msg.getError() != null) {
throw msg.getError();
}
return msg.getResult();
}
};
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[] {serviceInterface}, handler);
}
private String generateRequestId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}
public void shutdown() {
for (Future<RemoteServiceRequest> future : futures) {
future.cancel(true);
}
}
}

@ -0,0 +1,68 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
public class RemoteServiceKey {
private final Class<?> serviceInterface;
private final String methodName;
public RemoteServiceKey(Class<?> serviceInterface, String methodName) {
super();
this.serviceInterface = serviceInterface;
this.methodName = methodName;
}
public String getMethodName() {
return methodName;
}
public Class<?> getServiceInterface() {
return serviceInterface;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((methodName == null) ? 0 : methodName.hashCode());
result = prime * result + ((serviceInterface == null) ? 0 : serviceInterface.getName().hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
RemoteServiceKey other = (RemoteServiceKey) obj;
if (methodName == null) {
if (other.methodName != null)
return false;
} else if (!methodName.equals(other.methodName))
return false;
if (serviceInterface == null) {
if (other.serviceInterface != null)
return false;
} else if (!serviceInterface.equals(other.serviceInterface))
return false;
return true;
}
}

@ -0,0 +1,39 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.lang.reflect.Method;
public class RemoteServiceMethod {
private final Object bean;
private final Method method;
public RemoteServiceMethod(Method method, Object bean) {
super();
this.method = method;
this.bean = bean;
}
public Object getBean() {
return bean;
}
public Method getMethod() {
return method;
}
}

@ -0,0 +1,54 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Arrays;
public class RemoteServiceRequest {
private String requestId;
private String methodName;
private Object[] args;
public RemoteServiceRequest() {
}
public RemoteServiceRequest(String requestId, String methodName, Object[] args) {
super();
this.requestId = requestId;
this.methodName = methodName;
this.args = args;
}
public String getRequestId() {
return requestId;
}
public Object[] getArgs() {
return args;
}
public String getMethodName() {
return methodName;
}
@Override
public String toString() {
return "[requestId=" + requestId + ", methodName=" + methodName + ", args="
+ Arrays.toString(args) + "]";
}
}

@ -0,0 +1,42 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
public class RemoteServiceResponse {
private Object result;
private Throwable error;
public RemoteServiceResponse() {
}
public RemoteServiceResponse(Object result) {
this.result = result;
}
public RemoteServiceResponse(Throwable error) {
this.error = error;
}
public Throwable getError() {
return error;
}
public Object getResult() {
return result;
}
}

@ -84,12 +84,12 @@ public class CommandDecoder extends ReplayingDecoder<State> {
currentDecoder = StringCodec.INSTANCE.getValueDecoder();
}
if (log.isTraceEnabled()) {
log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8));
}
if (state() == null) {
state(new State());
if (log.isTraceEnabled()) {
log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8));
}
}
state().setDecoderState(null);

@ -23,8 +23,15 @@ import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.JsonIdentityReference;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.MapperFeature;
@ -32,6 +39,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTypeResolverBuilder;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder;
import io.netty.buffer.ByteBuf;
@ -49,6 +57,12 @@ public class JsonJacksonCodec implements Codec {
public static final JsonJacksonCodec INSTANCE = new JsonJacksonCodec();
@JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class, property="@id")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.PUBLIC_ONLY, setterVisibility = Visibility.PUBLIC_ONLY, isGetterVisibility = Visibility.PUBLIC_ONLY)
public static class ThrowableMixIn {
}
private final ObjectMapper mapObjectMapper = initObjectMapper();
protected ObjectMapper initObjectMapper() {
@ -111,6 +125,7 @@ public class JsonJacksonCodec implements Codec {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(SerializationFeature.WRITE_BIGDECIMAL_AS_PLAIN, true);
objectMapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
objectMapper.addMixIn(Throwable.class, ThrowableMixIn.class);
}
@Override

@ -0,0 +1,36 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
public interface RRemoteService {
/**
* Register object as remote service
*
* @param remoteInterface
* @param object
*/
<T> void register(Class<T> remoteInterface, T object);
/**
* Get remote service object for remote invocations
*
* @param remoteInterface
* @return
*/
<T> T get(Class<T> remoteInterface);
}

@ -0,0 +1,83 @@
package org.redisson;
import org.junit.Assert;
import org.junit.Test;
import static org.assertj.core.api.Assertions.*;
import java.io.IOException;
public class RedissonRemoteServiceTest extends BaseTest {
public interface RemoteInterface {
void voidMethod(String name, Long param);
Long resultMethod(Long value);
void errorMethod() throws IOException;
void errorMethodWithCause();
}
public class RemoteImpl implements RemoteInterface {
@Override
public void voidMethod(String name, Long param) {
System.out.println(name + " " + param);
}
@Override
public Long resultMethod(Long value) {
return value*2;
}
@Override
public void errorMethod() throws IOException {
throw new IOException("Checking error throw");
}
@Override
public void errorMethodWithCause() {
try {
int s = 2 / 0;
} catch (Exception e) {
throw new RuntimeException("Checking error throw", e);
}
}
}
@Test
public void testInvocations() {
RedissonClient r1 = Redisson.create();
r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());
RedissonClient r2 = Redisson.create();
RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class);
ri.voidMethod("someName", 100L);
assertThat(ri.resultMethod(100L)).isEqualTo(200);
try {
ri.errorMethod();
Assert.fail();
} catch (IOException e) {
assertThat(e.getMessage()).isEqualTo("Checking error throw");
}
try {
ri.errorMethodWithCause();
Assert.fail();
} catch (Exception e) {
assertThat(e.getCause()).isInstanceOf(ArithmeticException.class);
assertThat(e.getCause().getMessage()).isEqualTo("/ by zero");
}
r1.shutdown();
r2.shutdown();
}
}
Loading…
Cancel
Save