@ -33,7 +33,6 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise ;
import io.netty.util.internal.PlatformDependent ;
import io.netty.util.internal.ThreadLocalRandom ;
/ * *
*
@ -44,9 +43,15 @@ import io.netty.util.internal.ThreadLocalRandom;
public class RedissonDelayedQueue < V > extends RedissonExpirable implements RDelayedQueue < V > {
private final QueueTransferService queueTransferService ;
private final String channelName ;
private final String queueName ;
private final String timeoutSetName ;
protected RedissonDelayedQueue ( QueueTransferService queueTransferService , Codec codec , final CommandAsyncExecutor commandExecutor , String name ) {
super ( codec , commandExecutor , name ) ;
channelName = prefixName ( "redisson_delay_queue_channel" , getName ( ) ) ;
queueName = prefixName ( "redisson_delay_queue" , getName ( ) ) ;
timeoutSetName = prefixName ( "redisson_delay_queue_timeout" , getName ( ) ) ;
QueueTransferTask task = new QueueTransferTask ( commandExecutor . getConnectionManager ( ) ) {
@ -68,33 +73,21 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "return v[2]; "
+ "end "
+ "return nil;" ,
Arrays . < Object > asList ( getName ( ) , getTimeoutSetName( ) , getQueueName ( ) ) ,
Arrays . < Object > asList ( getName ( ) , timeoutSetName, queueName ) ,
System . currentTimeMillis ( ) , 100 ) ;
}
@Override
protected RTopic < Long > getTopic ( ) {
return new RedissonTopic < Long > ( LongCodec . INSTANCE , commandExecutor , getChannelName( ) ) ;
return new RedissonTopic < Long > ( LongCodec . INSTANCE , commandExecutor , channelName ) ;
}
} ;
queueTransferService . schedule ( getQueueName( ) , task ) ;
queueTransferService . schedule ( queueName , task ) ;
this . queueTransferService = queueTransferService ;
}
private String getChannelName ( ) {
return prefixName ( "redisson_delay_queue_channel" , getName ( ) ) ;
}
private String getQueueName ( ) {
return prefixName ( "redisson_delay_queue" , getName ( ) ) ;
}
private String getTimeoutSetName ( ) {
return prefixName ( "redisson_delay_queue_timeout" , getName ( ) ) ;
}
public void offer ( V e , long delay , TimeUnit timeUnit ) {
get ( offerAsync ( e , delay , timeUnit ) ) ;
}
@ -115,7 +108,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end;"
,
Arrays . < Object > asList ( getName ( ) , getTimeoutSetName( ) , getQueueName ( ) , getChannelName ( ) ) ,
Arrays . < Object > asList ( getName ( ) , timeoutSetName, queueName , channelName ) ,
timeout , randomId , encode ( e ) ) ;
}
@ -180,7 +173,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "return value; "
+ "end "
+ "return nil;" ,
Arrays . < Object > asList ( getQueueName( ) ) , index ) ) ;
Arrays . < Object > asList ( queueName ) , index ) ) ;
}
void remove ( int index ) {
@ -191,7 +184,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
"redis.call('lrem', KEYS[1], 1, v);" +
"redis.call('zrem', KEYS[2], v);" +
"end; " ,
Arrays . < Object > asList ( getQueueName( ) , getTimeoutSetName ( ) ) , index ) ) ;
Arrays . < Object > asList ( queueName, timeoutSetName ) , index ) ) ;
}
@Override
@ -268,7 +261,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "table.insert(result, value);"
+ "end; "
+ "return result; " ,
Collections . < Object > singletonList ( getQueueName( ) ) ) ;
Collections . < Object > singletonList ( queueName ) ) ;
}
@Override
@ -294,7 +287,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "end; "
+ "end;" +
"return 0;" ,
Arrays . < Object > asList ( getQueueName( ) , getTimeoutSetName ( ) ) , encode ( o ) ) ;
Arrays . < Object > asList ( queueName, timeoutSetName ) , encode ( o ) ) ;
}
@Override
@ -316,7 +309,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "end; "
+ "end;" +
"return #ARGV == 0 and 1 or 0;" ,
Collections . < Object > singletonList ( getQueueName( ) ) , encode ( c ) . toArray ( ) ) ;
Collections . < Object > singletonList ( queueName ) , encode ( c ) . toArray ( ) ) ;
}
@Override
@ -356,7 +349,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "i = i + 1;"
+ "end; "
+ "return result;" ,
Arrays . < Object > asList ( getQueueName( ) , getTimeoutSetName ( ) ) , encode ( c ) . toArray ( ) ) ;
Arrays . < Object > asList ( queueName, timeoutSetName ) , encode ( c ) . toArray ( ) ) ;
}
@Override
@ -395,7 +388,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "i = i + 1; "
+ "end; "
+ "return changed; " ,
Collections . < Object > singletonList ( getQueueName( ) ) , encode ( c ) . toArray ( ) ) ;
Collections . < Object > singletonList ( queueName ) , encode ( c ) . toArray ( ) ) ;
}
@Override
@ -405,9 +398,36 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
@Override
public RFuture < Boolean > deleteAsync ( ) {
return commandExecutor . writeAsync ( getName ( ) , RedisCommands . DEL_OBJECTS , getQueueName ( ) , getTimeoutSetName ( ) ) ;
return commandExecutor . writeAsync ( getName ( ) , RedisCommands . DEL_OBJECTS , queueName , timeoutSetName ) ;
}
@Override
public RFuture < Boolean > expireAsync ( long timeToLive , TimeUnit timeUnit ) {
return commandExecutor . evalWriteAsync ( getName ( ) , LongCodec . INSTANCE , RedisCommands . EVAL_BOOLEAN ,
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return redis.call('pexpire', KEYS[2], ARGV[1]); " ,
Arrays . < Object > asList ( queueName , timeoutSetName ) ,
timeUnit . toMillis ( timeToLive ) ) ;
}
@Override
public RFuture < Boolean > expireAtAsync ( long timestamp ) {
return commandExecutor . evalWriteAsync ( getName ( ) , LongCodec . INSTANCE , RedisCommands . EVAL_BOOLEAN ,
"redis.call('pexpireat', KEYS[1], ARGV[1]); " +
"return redis.call('pexpireat', KEYS[2], ARGV[1]); " ,
Arrays . < Object > asList ( queueName , timeoutSetName ) ,
timestamp ) ;
}
@Override
public RFuture < Boolean > clearExpireAsync ( ) {
return commandExecutor . evalWriteAsync ( getName ( ) , LongCodec . INSTANCE , RedisCommands . EVAL_BOOLEAN ,
"redis.call('persist', KEYS[1]); " +
"return redis.call('persist', KEYS[2]); " ,
Arrays . < Object > asList ( queueName , timeoutSetName ) ) ;
}
@Override
public RFuture < V > peekAsync ( ) {
return commandExecutor . evalReadAsync ( getName ( ) , codec , RedisCommands . EVAL_OBJECT ,
@ -417,7 +437,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "return value; "
+ "end "
+ "return nil;" ,
Arrays . < Object > asList ( getQueueName( ) ) ) ;
Arrays . < Object > asList ( queueName ) ) ;
}
@Override
@ -430,7 +450,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "return value; "
+ "end "
+ "return nil;" ,
Arrays . < Object > asList ( getQueueName( ) , getTimeoutSetName ( ) ) ) ;
Arrays . < Object > asList ( queueName, timeoutSetName ) ) ;
}
@Override
@ -449,7 +469,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "return value; "
+ "end "
+ "return nil;" ,
Arrays . < Object > asList ( getQueueName ( ) , getTimeoutSetName ( ) , queueName ) ) ;
Arrays . < Object > asList ( this . queueName , timeoutSetName , queueName ) ) ;
}
@Override
@ -464,12 +484,12 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "end; "
+ "end;" +
"return 0;" ,
Collections . < Object > singletonList ( getQueueName( ) ) , encode ( o ) ) ;
Collections . < Object > singletonList ( queueName ) , encode ( o ) ) ;
}
@Override
public RFuture < Integer > sizeAsync ( ) {
return commandExecutor . readAsync ( getName ( ) , codec , RedisCommands . LLEN_INT , getQueueName( ) ) ;
return commandExecutor . readAsync ( getName ( ) , codec , RedisCommands . LLEN_INT , queueName ) ;
}
@Override
@ -489,7 +509,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
@Override
public void destroy ( ) {
queueTransferService . remove ( getQueueName( ) ) ;
queueTransferService . remove ( queueName ) ;
}
}