initial commit for RedissonReference feature

pull/605/head
jackygurui 9 years ago
parent 3d26419614
commit 5066ded9ec

@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-parent</artifactId>
<version>0.9.1-SNAPSHOT</version>
<version>2.3.0</version>
<relativePath>../</relativePath>
</parent>

@ -67,8 +67,8 @@ import org.redisson.command.CommandSyncService;
import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
import org.redisson.liveobject.provider.CodecProvider;
import org.redisson.liveobject.provider.DefaultCodecProvider;
import org.redisson.codec.CodecProvider;
import org.redisson.codec.DefaultCodecProvider;
import org.redisson.liveobject.provider.DefaultResolverProvider;
import org.redisson.liveobject.provider.ResolverProvider;
import org.redisson.pubsub.SemaphorePubSub;
@ -89,8 +89,8 @@ public class Redisson implements RedissonClient {
protected final ConnectionManager connectionManager;
protected final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = PlatformDependent.newConcurrentHashMap();
protected final CodecProvider liveObjectDefaultCodecProvider = new DefaultCodecProvider();
protected final ResolverProvider liveObjectDefaultResolverProvider = new DefaultResolverProvider();
protected final CodecProvider codecProvider;
protected final ResolverProvider resolverProvider;
protected final Config config;
protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();
@ -103,6 +103,8 @@ public class Redisson implements RedissonClient {
connectionManager = ConfigSupport.createConnectionManager(configCopy);
commandExecutor = new CommandSyncService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
codecProvider = config.getCodecProvider();
resolverProvider = config.getResolverProvider();
}
ConnectionManager getConnectionManager() {
@ -130,7 +132,11 @@ public class Redisson implements RedissonClient {
* @return Redisson instance
*/
public static RedissonClient create(Config config) {
return new Redisson(config);
Redisson redisson = new Redisson(config);
if (config.isRedissonReferenceEnabled()) {
redisson.enableRedissonReferenceSupport();
}
return redisson;
}
/**
@ -478,7 +484,7 @@ public class Redisson implements RedissonClient {
@Override
public RLiveObjectService getLiveObjectService() {
return new RedissonLiveObjectService(this, liveObjectClassCache, liveObjectDefaultCodecProvider, liveObjectDefaultResolverProvider);
return new RedissonLiveObjectService(this, liveObjectClassCache, codecProvider, resolverProvider);
}
@Override
@ -502,6 +508,16 @@ public class Redisson implements RedissonClient {
return config;
}
@Override
public CodecProvider getCodecProvider() {
return codecProvider;
}
@Override
public ResolverProvider getResolverProvider() {
return resolverProvider;
}
@Override
public NodesGroup<Node> getNodesGroup() {
return new RedisNodes<Node>(connectionManager);
@ -525,5 +541,9 @@ public class Redisson implements RedissonClient {
return connectionManager.isShuttingDown();
}
protected void enableRedissonReferenceSupport() {
this.commandExecutor.enableRedissonReferenceSupport(this);
}
}

@ -29,7 +29,7 @@ import org.redisson.liveobject.LiveObjectTemplate;
import org.redisson.liveobject.core.AccessorInterceptor;
import org.redisson.liveobject.core.LiveObjectInterceptor;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.provider.CodecProvider;
import org.redisson.codec.CodecProvider;
import org.redisson.liveobject.provider.ResolverProvider;
import org.redisson.liveobject.resolver.Resolver;

@ -19,7 +19,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
import org.redisson.liveobject.provider.CodecProvider;
import org.redisson.codec.CodecProvider;
import org.redisson.liveobject.provider.ResolverProvider;
/**
@ -674,6 +674,21 @@ public interface RedissonClient {
*/
Config getConfig();
/**
* Returns the CodecProvider instance
*
* @return CodecProvider
*/
public CodecProvider getCodecProvider();
/**
* Returns the ResolverProvider instance
*
* @return resolverProvider
*/
public ResolverProvider getResolverProvider();
/**
* Get Redis nodes group for server operations
*

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.liveobject.provider;
package org.redisson.codec;
import org.redisson.client.codec.Codec;
import org.redisson.api.RObject;

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.liveobject.provider;
package org.redisson.codec;
import io.netty.util.internal.PlatformDependent;
import java.util.concurrent.ConcurrentMap;

@ -29,6 +29,7 @@ import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import io.netty.util.concurrent.Future;
import org.redisson.Redisson;
/**
*
@ -39,6 +40,8 @@ public interface CommandAsyncExecutor {
ConnectionManager getConnectionManager();
CommandAsyncExecutor enableRedissonReferenceSupport(Redisson redisson);
<V> RedisException convertException(Future<V> RFuture);
boolean await(Future<?> RFuture, long timeout, TimeUnit timeoutUnit) throws InterruptedException;

@ -60,6 +60,10 @@ import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import org.redisson.Redisson;
import org.redisson.RedissonReference;
import org.redisson.api.RObject;
import org.redisson.liveobject.misc.RedissonObjectFactory;
/**
*
@ -71,6 +75,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
private static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class);
final ConnectionManager connectionManager;
private Redisson redisson;
public CommandAsyncService(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
@ -81,6 +86,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return connectionManager;
}
@Override
public CommandAsyncExecutor enableRedissonReferenceSupport(Redisson redisson) {
if (redisson != null) {
this.redisson = redisson;
}
return this;
}
@Override
public <V> V get(Future<V> future) {
final CountDownLatch l = new CountDownLatch(1);
@ -437,6 +450,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
final AsyncDetails<V, R> details = AsyncDetails.acquire();
if (redisson != null) {
for (int i = 0; i < params.length; i++) {
RedissonReference reference = RedissonObjectFactory.toReference(redisson, params[i]);
params[i] = reference == null ? params[i] : reference;
}
}
details.init(connectionFuture, attemptPromise,
readOnlyMode, source, codec, command, params, mainPromise, attempt);
@ -711,7 +730,15 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
((RedisClientResult)res).setRedisClient(addr);
}
details.getMainPromise().trySuccess(res);
if (redisson != null && res instanceof RedissonReference) {
try {
details.getMainPromise().trySuccess(RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) res));
} catch (Exception exception) {
details.getMainPromise().trySuccess(res);//fallback
}
} else {
details.getMainPromise().trySuccess(res);
}
} else {
details.getMainPromise().tryFailure(future.cause());
}

@ -25,6 +25,10 @@ import org.redisson.client.codec.Codec;
import org.redisson.codec.JsonJacksonCodec;
import io.netty.channel.EventLoopGroup;
import org.redisson.codec.CodecProvider;
import org.redisson.codec.DefaultCodecProvider;
import org.redisson.liveobject.provider.DefaultResolverProvider;
import org.redisson.liveobject.provider.ResolverProvider;
/**
* Redisson configuration
@ -53,7 +57,23 @@ public class Config {
* Redis key/value codec. JsonJacksonCodec used by default
*/
private Codec codec;
/**
* For codec registry and look up. DefaultCodecProvider used by default
*/
private CodecProvider codecProvider = new DefaultCodecProvider();
/**
* For resolver registry and look up. DefaultResolverProvider used by default
*/
private ResolverProvider resolverProvider = new DefaultResolverProvider();
/**
* Config option for enabling Redisson Reference feature.
* Default value is TRUE
*/
private boolean redissonReferenceEnabled = true;
private boolean useLinuxNativeEpoll;
private EventLoopGroup eventLoopGroup;
@ -72,6 +92,9 @@ public class Config {
setThreads(oldConf.getThreads());
setCodec(oldConf.getCodec());
setCodecProvider(oldConf.getCodecProvider());
setResolverProvider(oldConf.getResolverProvider());
setRedissonReferenceEnabled(oldConf.redissonReferenceEnabled);
setEventLoopGroup(oldConf.getEventLoopGroup());
if (oldConf.getSingleServerConfig() != null) {
setSingleServerConfig(new SingleServerConfig(oldConf.getSingleServerConfig()));
@ -104,7 +127,67 @@ public class Config {
public Codec getCodec() {
return codec;
}
/**
* For codec registry and look up. DefaultCodecProvider used by default.
*
* @param codecProvider
* @return this
* @see org.redisson.codec.CodecProvider
*/
public Config setCodecProvider(CodecProvider codecProvider) {
this.codecProvider = codecProvider;
return this;
}
/**
* Returns the CodecProvider instance
*
* @return CodecProvider
*/
public CodecProvider getCodecProvider() {
return codecProvider;
}
/**
* For resolver registry and look up. DefaultResolverProvider used by default.
*
* @param resolverProvider
* @return this
*/
public Config setResolverProvider(ResolverProvider resolverProvider) {
this.resolverProvider = resolverProvider;
return this;
}
/**
* Returns the ResolverProvider instance
*
* @return resolverProvider
*/
public ResolverProvider getResolverProvider() {
return resolverProvider;
}
/**
* Config option indicate whether Redisson Reference feature is enabled.
* Default value is TRUE
*
* @return boolean
*/
public boolean isRedissonReferenceEnabled() {
return redissonReferenceEnabled;
}
/**
* Config option for enabling Redisson Reference feature
* Default value is TRUE
* @param redissonReferenceEnabled
*/
public void setRedissonReferenceEnabled(boolean redissonReferenceEnabled) {
this.redissonReferenceEnabled = redissonReferenceEnabled;
}
/**
* Init cluster servers configuration
*

@ -55,7 +55,7 @@ import org.redisson.api.annotation.RObjectField;
import org.redisson.api.annotation.REntity.TransformationMode;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.misc.RedissonObjectFactory;
import org.redisson.liveobject.provider.CodecProvider;
import org.redisson.codec.CodecProvider;
import org.redisson.liveobject.provider.ResolverProvider;
import org.redisson.liveobject.resolver.NamingScheme;
@ -117,7 +117,10 @@ public class AccessorInterceptor {
if (isGetter(method, fieldName)) {
Object result = liveMap.get(fieldName);
if (result instanceof RedissonReference) {
return RedissonObjectFactory.create(redisson, codecProvider, resolverProvider, (RedissonReference) result, method.getReturnType());
return RedissonObjectFactory.fromReference(redisson, codecProvider, resolverProvider, (RedissonReference) result, method.getReturnType());
// if (BitSet.class.isAssignableFrom(method.getReturnType()) && RBitSet.class.isAssignableFrom(((RedissonReference) result).getType())) {
// return ((RBitSet) rObject).asBitSet();
// }
}
return result;
}
@ -144,7 +147,7 @@ public class AccessorInterceptor {
if (mappedClass != null) {
Entry<NamingScheme, Codec> entry = getFieldNamingSchemeAndCodec(me.getClass().getSuperclass(), mappedClass, fieldName);
RObject obj = RedissonObjectFactory
.create(redisson,
.createRObject(redisson,
mappedClass,
entry.getKey().getFieldReferenceName(me.getClass().getSuperclass(),
idFieldType,

@ -29,7 +29,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.REntity;
import org.redisson.liveobject.provider.CodecProvider;
import org.redisson.codec.CodecProvider;
import org.redisson.liveobject.resolver.NamingScheme;
/**

@ -17,15 +17,20 @@ package org.redisson.liveobject.misc;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import org.redisson.RedissonReference;
import org.redisson.client.codec.Codec;
import org.redisson.api.RBitSet;
import org.redisson.api.RLiveObject;
import org.redisson.api.RLiveObjectService;
import org.redisson.api.RObject;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.REntity;
import org.redisson.liveobject.provider.CodecProvider;
import org.redisson.api.annotation.RId;
import org.redisson.codec.CodecProvider;
import org.redisson.liveobject.provider.ResolverProvider;
import org.redisson.liveobject.resolver.NamingScheme;
@ -34,54 +39,109 @@ import org.redisson.liveobject.resolver.NamingScheme;
* @author Rui Gu (https://github.com/jackygurui)
*/
public class RedissonObjectFactory {
public static <T> T create(RedissonClient redisson, CodecProvider codecProvider, ResolverProvider resolverProvider, RedissonReference rr, Class<?> expected) throws Exception {
private static final HashMap<Class, HashMap<Boolean, Method>> builders = new HashMap<Class, HashMap<Boolean, Method>>();
static {
for (Method method : RedissonClient.class.getDeclaredMethods()) {
if (!method.getReturnType().equals(Void.TYPE)
&& RObject.class.isAssignableFrom(method.getReturnType())
&& method.getName().startsWith("get")) {
Class<?> cls = method.getReturnType();
if (!builders.containsKey(cls)) {
builders.put(cls, new HashMap<Boolean, Method>());
}
HashMap<Boolean, Method> builder = builders.get(cls);
if (method.getParameterTypes().length == 2
&& Codec.class.isAssignableFrom(method.getParameterTypes()[1])) {
builder.put(Boolean.FALSE, method);
} else if (method.getParameterTypes().length == 1) {
builder.put(Boolean.TRUE, method);
}
}
}
}
public static <T> T fromReference(RedissonClient redisson, RedissonReference rr) throws Exception {
return fromReference(redisson, rr, null);
}
public static <T> T fromReference(RedissonClient redisson, RedissonReference rr, Class<?> expected) throws Exception {
return fromReference(redisson, null, null, rr, expected);
}
public static <T> T fromReference(RedissonClient redisson, CodecProvider codecProvider, ResolverProvider resolverProvider, RedissonReference rr, Class<?> expected) throws Exception {
Class<? extends Object> type = rr.getType();
if (codecProvider == null) {
codecProvider = redisson.getConfig().getCodecProvider();
}
if (type != null) {
if (type.isAnnotationPresent(REntity.class)) {
RLiveObjectService liveObjectService = resolverProvider == null
? redisson.getLiveObjectService()
: redisson.getLiveObjectService(codecProvider, resolverProvider);
REntity anno = type.getAnnotation(REntity.class);
NamingScheme ns = anno.namingScheme()
.getDeclaredConstructor(Codec.class)
.newInstance(codecProvider.getCodec(anno, rr.getType()));
return (T) redisson.getLiveObjectService(codecProvider, resolverProvider).getOrCreate(type, ns.resolveId(rr.getKeyName()));
.newInstance(codecProvider.getCodec(anno, type));
return (T) liveObjectService.getOrCreate(type, ns.resolveId(rr.getKeyName()));
}
List<Class<?>> interfaces = Arrays.asList(rr.getType().getInterfaces());
for (Method method : RedissonClient.class.getDeclaredMethods()) {
if (method.getName().startsWith("get")
&& method.getReturnType().isAssignableFrom(type)
&& expected.isAssignableFrom(method.getReturnType())
&& interfaces.contains(method.getReturnType())) {
if ((rr.isDefaultCodec() || RBitSet.class.isAssignableFrom(method.getReturnType())) && method.getParameterTypes().length == 1) {
return (T) method.invoke(redisson, rr.getKeyName());
} else if (!rr.isDefaultCodec()
&& method.getParameterTypes().length == 2
&& String.class.equals(method.getParameterTypes()[0])
&& Codec.class.equals(method.getParameterTypes()[1])) {
return (T) method.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType()));
}
List<Class<?>> interfaces = Arrays.asList(type.getInterfaces());
for (Class<?> iType : interfaces) {
if (builders.containsKey(iType)) {// user cache to speed up things a little.
Method builder = builders.get(iType).get(rr.isDefaultCodec() || RBitSet.class.isAssignableFrom(rr.getType()));
return (T) (rr.isDefaultCodec() || RBitSet.class.isAssignableFrom(rr.getType())
? builder.invoke(redisson, rr.getKeyName())
: builder.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType())));
}
}
}
throw new ClassNotFoundException("No RObject is found to match class type of " + rr.getTypeName() + " with codec type of " + rr.getCodecName());
}
public static <T extends RObject, K extends Codec> T create(RedissonClient redisson, Class<T> expectedType, String name, K codec) throws Exception {
public static RedissonReference toReference(RedissonClient redisson, Object object) {
if (object instanceof RObject) {
RObject rObject = ((RObject) object);
redisson.getCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), (Class) rObject.getClass(), rObject.getName(), rObject.getCodec());
return new RedissonReference(object.getClass(), ((RObject) object).getName(), ((RObject) object).getCodec());
}
try {
if (object.getClass().getSuperclass().isAnnotationPresent(REntity.class)) {
Class<? extends Object> rEntity = object.getClass().getSuperclass();
REntity anno = rEntity.getAnnotation(REntity.class);
NamingScheme ns = anno.namingScheme()
.getDeclaredConstructor(Codec.class)
.newInstance(redisson.getCodecProvider().getCodec(anno, (Class) rEntity));
String name = Introspectior
.getFieldsWithAnnotation(rEntity, RId.class)
.getOnly().getName();
Class<?> type = rEntity.getDeclaredField(name).getType();
return new RedissonReference(rEntity,
ns.getName(rEntity, type, name, ((RLiveObject) object).getLiveObjectId()));
}
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
return null;
}
public static <T extends RObject, K extends Codec> T createRObject(RedissonClient redisson, Class<T> expectedType, String name, K codec) throws Exception {
List<Class<?>> interfaces = Arrays.asList(expectedType.getInterfaces());
for (Method method : RedissonClient.class.getDeclaredMethods()) {
if (method.getName().startsWith("get")
&& method.getReturnType().isAssignableFrom(expectedType)
&& interfaces.contains(method.getReturnType())) {
if ((codec == null || RBitSet.class.isAssignableFrom(method.getReturnType())) && method.getParameterTypes().length == 1) {
return (T) method.invoke(redisson, name);
} else if (codec != null
&& method.getParameterTypes().length == 2
&& String.class.equals(method.getParameterTypes()[0])
&& Codec.class.equals(method.getParameterTypes()[1])) {
return (T) method.invoke(redisson, name, codec);
}
if (method.getName().startsWith("get")
&& method.getReturnType().isAssignableFrom(expectedType)
&& interfaces.contains(method.getReturnType())) {
if ((codec == null || RBitSet.class.isAssignableFrom(method.getReturnType())) && method.getParameterTypes().length == 1) {
return (T) method.invoke(redisson, name);
} else if (codec != null
&& method.getParameterTypes().length == 2
&& String.class.equals(method.getParameterTypes()[0])
&& Codec.class.equals(method.getParameterTypes()[1])) {
return (T) method.invoke(redisson, name, codec);
}
}
}
throw new ClassNotFoundException("No RObject is found to match class type of " + (expectedType != null ? expectedType.getName() : "null") + " with codec type of " + (codec != null ? codec.getClass().getName() : "null"));
}
}

@ -0,0 +1,29 @@
package org.redisson;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
import org.junit.Test;
import org.redisson.api.RBucket;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class RedissonReferenceTest extends BaseTest {
@Test
public void test() {
RBucket<Object> b1 = redisson.getBucket("b1");
RBucket<Object> b2 = redisson.getBucket("b2");
RBucket<Object> b3 = redisson.getBucket("b3");
b2.set(b3);
b1.set(redisson.getBucket("b2"));
assertTrue(b1.get().getClass().equals(RedissonBucket.class));
assertEquals("b3", ((RBucket) ((RBucket) b1.get()).get()).getName());
RBucket<Object> b4 = redisson.getBucket("b4");
b4.set(redisson.getMapCache("testCache"));
assertTrue(b4.get() instanceof RedissonMapCache);
((RedissonMapCache) b4.get()).fastPut(b1, b2, 1, TimeUnit.MINUTES);
assertEquals("b2", ((RBucket)((RedissonMapCache) b4.get()).get(b1)).getName());
}
}
Loading…
Cancel
Save