From c9dacc40074cddb2aa5f16808e4ae261b07e8b2a Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 10 Jan 2018 06:12:02 +0300 Subject: [PATCH 1/8] refactoring --- .../java/org/redisson/RedissonBaseAdder.java | 4 -- .../org/redisson/RedissonDoubleAdder.java | 61 ++++++------------- .../java/org/redisson/RedissonLongAdder.java | 58 ++++++------------ .../org/redisson/RedissonDoubleAdderTest.java | 1 - 4 files changed, 38 insertions(+), 86 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBaseAdder.java b/redisson/src/main/java/org/redisson/RedissonBaseAdder.java index 1bc9e5e51..37c65e45e 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseAdder.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseAdder.java @@ -102,10 +102,6 @@ public abstract class RedissonBaseAdder extends RedissonExpira protected abstract void doReset(); - public T sum() { - return get(sumAsync()); - } - public void reset() { get(resetAsync()); } diff --git a/redisson/src/main/java/org/redisson/RedissonDoubleAdder.java b/redisson/src/main/java/org/redisson/RedissonDoubleAdder.java index 3c207d3ca..b5a582db6 100644 --- a/redisson/src/main/java/org/redisson/RedissonDoubleAdder.java +++ b/redisson/src/main/java/org/redisson/RedissonDoubleAdder.java @@ -28,38 +28,37 @@ import org.redisson.command.CommandAsyncExecutor; * @author Nikita Koksharov * */ -public class RedissonDoubleAdder extends RedissonExpirable implements RDoubleAdder { +public class RedissonDoubleAdder extends RedissonBaseAdder implements RDoubleAdder { private final DoubleAdder counter = new DoubleAdder(); - private final RedissonBaseAdder adder; + private final RAtomicDouble atomicDouble; public RedissonDoubleAdder(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) { - super(connectionManager, name); + super(connectionManager, name, redisson); - final RAtomicDouble atomicDouble = redisson.getAtomicDouble(getName()); - adder = new RedissonBaseAdder(connectionManager, name, redisson) { - @Override - protected void doReset() { - counter.reset(); - } - - @Override - protected RFuture addAndGetAsync() { - return atomicDouble.getAndAddAsync(counter.sum()); - } + atomicDouble = redisson.getAtomicDouble(getName()); + } - @Override - protected RFuture getAsync() { - return atomicDouble.getAsync(); - } - }; + @Override + protected void doReset() { + counter.reset(); + } + + @Override + protected RFuture addAndGetAsync() { + return atomicDouble.getAndAddAsync(counter.sum()); + } + + @Override + protected RFuture getAsync() { + return atomicDouble.getAsync(); } @Override public void add(double x) { counter.add(x); } - + @Override public void increment() { add(1L); @@ -72,27 +71,7 @@ public class RedissonDoubleAdder extends RedissonExpirable implements RDoubleAdd @Override public double sum() { - return adder.sum(); - } - - @Override - public void reset() { - adder.reset(); - } - - @Override - public RFuture sumAsync() { - return adder.sumAsync(); - } - - @Override - public RFuture resetAsync() { - return adder.resetAsync(); - } - - @Override - public void destroy() { - adder.destroy(); + return get(sumAsync()); } } diff --git a/redisson/src/main/java/org/redisson/RedissonLongAdder.java b/redisson/src/main/java/org/redisson/RedissonLongAdder.java index 8652c528b..c0faeda87 100644 --- a/redisson/src/main/java/org/redisson/RedissonLongAdder.java +++ b/redisson/src/main/java/org/redisson/RedissonLongAdder.java @@ -27,32 +27,30 @@ import org.redisson.misc.LongAdder; * @author Nikita Koksharov * */ -public class RedissonLongAdder extends RedissonExpirable implements RLongAdder { +public class RedissonLongAdder extends RedissonBaseAdder implements RLongAdder { private final RAtomicLong atomicLong; private final LongAdder counter = new LongAdder(); - private final RedissonBaseAdder adder; public RedissonLongAdder(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) { - super(connectionManager, name); + super(connectionManager, name, redisson); atomicLong = redisson.getAtomicLong(getName()); - adder = new RedissonBaseAdder(connectionManager, name, redisson) { - @Override - protected void doReset() { - counter.reset(); - } - - @Override - protected RFuture addAndGetAsync() { - return atomicLong.getAndAddAsync(counter.sum()); - } + } - @Override - protected RFuture getAsync() { - return atomicLong.getAsync(); - } - }; + @Override + protected void doReset() { + counter.reset(); + } + + @Override + protected RFuture addAndGetAsync() { + return atomicLong.getAndAddAsync(counter.sum()); + } + + @Override + protected RFuture getAsync() { + return atomicLong.getAsync(); } @Override @@ -69,30 +67,10 @@ public class RedissonLongAdder extends RedissonExpirable implements RLongAdder { public void decrement() { add(-1L); } - + @Override public long sum() { - return adder.sum(); + return get(sumAsync()); } - @Override - public void reset() { - adder.reset(); - } - - @Override - public RFuture sumAsync() { - return adder.sumAsync(); - } - - @Override - public RFuture resetAsync() { - return adder.resetAsync(); - } - - @Override - public void destroy() { - adder.destroy(); - } - } diff --git a/redisson/src/test/java/org/redisson/RedissonDoubleAdderTest.java b/redisson/src/test/java/org/redisson/RedissonDoubleAdderTest.java index 593d113f9..66113576e 100644 --- a/redisson/src/test/java/org/redisson/RedissonDoubleAdderTest.java +++ b/redisson/src/test/java/org/redisson/RedissonDoubleAdderTest.java @@ -3,7 +3,6 @@ package org.redisson; import org.assertj.core.api.Assertions; import org.junit.Test; import org.redisson.api.RDoubleAdder; -import org.redisson.api.RLongAdder; public class RedissonDoubleAdderTest extends BaseTest { From 626f5785f68b484dcf88db91505911d7043de2a7 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 10 Jan 2018 08:08:20 +0300 Subject: [PATCH 2/8] PriorityBlockingDeque implemented. #1235 --- .../src/main/java/org/redisson/Redisson.java | 12 + .../RedissonPriorityBlockingDeque.java | 240 ++++++++++++++++++ .../org/redisson/RedissonPriorityDeque.java | 47 +++- .../redisson/api/RPriorityBlockingDeque.java | 26 ++ .../java/org/redisson/api/RedissonClient.java | 25 +- .../RedissonPriorityBlockingDequeTest.java | 124 +++++++++ 6 files changed, 463 insertions(+), 11 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java create mode 100644 redisson/src/main/java/org/redisson/api/RPriorityBlockingDeque.java create mode 100644 redisson/src/test/java/org/redisson/RedissonPriorityBlockingDequeTest.java diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index 43fa4db27..3abf3168a 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -54,6 +54,7 @@ import org.redisson.api.RMap; import org.redisson.api.RMapCache; import org.redisson.api.RPatternTopic; import org.redisson.api.RPermitExpirableSemaphore; +import org.redisson.api.RPriorityBlockingDeque; import org.redisson.api.RPriorityBlockingQueue; import org.redisson.api.RPriorityDeque; import org.redisson.api.RPriorityQueue; @@ -654,6 +655,17 @@ public class Redisson implements RedissonClient { public RPriorityBlockingQueue getPriorityBlockingQueue(String name, Codec codec) { return new RedissonPriorityBlockingQueue(codec, connectionManager.getCommandExecutor(), name, this); } + + @Override + public RPriorityBlockingDeque getPriorityBlockingDeque(String name) { + return new RedissonPriorityBlockingDeque(connectionManager.getCommandExecutor(), name, this); + } + + @Override + public RPriorityBlockingDeque getPriorityBlockingDeque(String name, Codec codec) { + return new RedissonPriorityBlockingDeque(codec, connectionManager.getCommandExecutor(), name, this); + } + @Override public RPriorityDeque getPriorityDeque(String name) { diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java new file mode 100644 index 000000000..f36ee1f8f --- /dev/null +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java @@ -0,0 +1,240 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import org.redisson.api.RFuture; +import org.redisson.api.RPriorityBlockingDeque; +import org.redisson.api.RedissonClient; +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandExecutor; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; + +/** + *

Distributed and concurrent implementation of priority blocking deque. + * + *

Queue size limited by Redis server memory amount. This is why {@link #remainingCapacity()} always + * returns Integer.MAX_VALUE + * + * @author Nikita Koksharov + */ +public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque implements RPriorityBlockingDeque { + + private final RedissonPriorityBlockingQueue blockingQueue; + + protected RedissonPriorityBlockingDeque(CommandExecutor commandExecutor, String name, RedissonClient redisson) { + super(commandExecutor, name, redisson); + blockingQueue = (RedissonPriorityBlockingQueue) redisson.getPriorityBlockingQueue(name); + } + + protected RedissonPriorityBlockingDeque(Codec codec, CommandExecutor commandExecutor, String name, RedissonClient redisson) { + super(codec, commandExecutor, name, redisson); + + blockingQueue = (RedissonPriorityBlockingQueue) redisson.getPriorityBlockingQueue(name, codec); + } + + @Override + public void put(V e) throws InterruptedException { + add(e); + } + + @Override + public boolean offer(V e, long timeout, TimeUnit unit) throws InterruptedException { + return offer(e); + } + + @Override + public RFuture takeAsync() { + return blockingQueue.takeAsync(); + } + + @Override + public V take() throws InterruptedException { + return blockingQueue.take(); + } + + public RFuture pollAsync(long timeout, TimeUnit unit) { + return blockingQueue.pollAsync(timeout, unit); + } + + @Override + public V poll(long timeout, TimeUnit unit) throws InterruptedException { + return blockingQueue.poll(timeout, unit); + } + + @Override + public V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { + throw new UnsupportedOperationException("use poll method"); + } + + @Override + public RFuture pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) { + return blockingQueue.pollLastAndOfferFirstToAsync(queueName, timeout, unit); + } + + @Override + public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { + return blockingQueue.pollLastAndOfferFirstTo(queueName, timeout, unit); + } + + @Override + public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { + return get(takeLastAndOfferFirstToAsync(queueName)); + } + + public RFuture takeLastAndOfferFirstToAsync(String queueName) { + return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS); + } + + @Override + public int remainingCapacity() { + return Integer.MAX_VALUE; + } + + @Override + public int drainTo(Collection c) { + return blockingQueue.drainTo(c); + } + + public RFuture drainToAsync(Collection c) { + return blockingQueue.drainToAsync(c); + } + + @Override + public int drainTo(Collection c, int maxElements) { + return blockingQueue.drainTo(c, maxElements); + } + + public RFuture drainToAsync(Collection c, int maxElements) { + return blockingQueue.drainToAsync(c, maxElements); + } + + @Override + public RFuture offerAsync(V e) { + throw new UnsupportedOperationException("use offer method"); + } + + @Override + public RFuture pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) { + throw new UnsupportedOperationException("use poll method"); + } + + @Override + public RFuture putAsync(V e) { + throw new UnsupportedOperationException("use add method"); + } + + @Override + public RFuture putFirstAsync(V e) { + return addFirstAsync(e); + } + + @Override + public RFuture putLastAsync(V e) { + return addLastAsync(e); + } + + @Override + public void putFirst(V e) throws InterruptedException { + addFirst(e); + } + + @Override + public void putLast(V e) throws InterruptedException { + addLast(e); + } + + @Override + public boolean offerFirst(V e, long timeout, TimeUnit unit) throws InterruptedException { + addFirst(e); + return true; + } + + @Override + public boolean offerLast(V e, long timeout, TimeUnit unit) throws InterruptedException { + addLast(e); + return true; + } + + @Override + public V takeFirst() throws InterruptedException { + return get(takeFirstAsync()); + } + + @Override + public RFuture takeFirstAsync() { + return takeAsync(); + } + + @Override + public RFuture takeLastAsync() { + RPromise result = new RedissonPromise(); + blockingQueue.takeAsync(result, 0, 0, RedisCommands.RPOP, getName()); + return result; + } + + @Override + public V takeLast() throws InterruptedException { + return get(takeLastAsync()); + } + + @Override + public RFuture pollFirstAsync(long timeout, TimeUnit unit) { + return pollAsync(timeout, unit); + } + + @Override + public V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { + return get(pollFirstFromAnyAsync(timeout, unit, queueNames)); + } + + @Override + public RFuture pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { + return pollFromAnyAsync(timeout, unit, queueNames); + } + + @Override + public V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { + return get(pollLastFromAnyAsync(timeout, unit, queueNames)); + } + + @Override + public RFuture pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { + throw new UnsupportedOperationException(); + } + + @Override + public V pollFirst(long timeout, TimeUnit unit) throws InterruptedException { + return get(pollFirstAsync(timeout, unit)); + } + + @Override + public RFuture pollLastAsync(long timeout, TimeUnit unit) { + RPromise result = new RedissonPromise(); + blockingQueue.takeAsync(result, 0, unit.toMicros(timeout), RedisCommands.RPOP, getName()); + return result; + } + + @Override + public V pollLast(long timeout, TimeUnit unit) throws InterruptedException { + return get(pollLastAsync(timeout, unit)); + } + +} \ No newline at end of file diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java b/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java index 2d64d098e..ec95a0e40 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java @@ -20,6 +20,7 @@ import java.util.NoSuchElementException; import org.redisson.api.RFuture; import org.redisson.api.RPriorityDeque; +import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; @@ -38,22 +39,30 @@ public class RedissonPriorityDeque extends RedissonPriorityQueue implement private static final RedisCommand LRANGE_SINGLE = new RedisCommand("LRANGE", new ListFirstObjectDecoder()); - protected RedissonPriorityDeque(CommandExecutor commandExecutor, String name, Redisson redisson) { + protected RedissonPriorityDeque(CommandExecutor commandExecutor, String name, RedissonClient redisson) { super(commandExecutor, name, redisson); } - public RedissonPriorityDeque(Codec codec, CommandExecutor commandExecutor, String name, Redisson redisson) { + public RedissonPriorityDeque(Codec codec, CommandExecutor commandExecutor, String name, RedissonClient redisson) { super(codec, commandExecutor, name, redisson); } + public RFuture addFirstAsync(V e) { + throw new UnsupportedOperationException("use add or put method"); + } + + public RFuture addLastAsync(V e) { + throw new UnsupportedOperationException("use add or put method"); + } + @Override public void addFirst(V e) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("use add or put method"); } @Override public void addLast(V e) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("use add or put method"); } @Override @@ -108,14 +117,22 @@ public class RedissonPriorityDeque extends RedissonPriorityQueue implement @Override public boolean offerFirst(V e) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("use add or put method"); } + public RFuture offerFirstAsync(V e) { + throw new UnsupportedOperationException("use add or put method"); + } + @Override public boolean offerLast(V e) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("use add or put method"); } + public RFuture offerLastAsync(V e) { + throw new UnsupportedOperationException("use add or put method"); + } + // @Override public RFuture peekFirstAsync() { return getAsync(0); @@ -128,12 +145,20 @@ public class RedissonPriorityDeque extends RedissonPriorityQueue implement @Override public V peekLast() { - return get(getLastAsync()); + return get(peekLastAsync()); + } + + public RFuture peekLastAsync() { + return getLastAsync(); } @Override public V pollFirst() { - return poll(); + return get(pollFirstAsync()); + } + + public RFuture pollFirstAsync() { + return pollAsync(); } public RFuture pollLastAsync() { @@ -157,9 +182,13 @@ public class RedissonPriorityDeque extends RedissonPriorityQueue implement @Override public void push(V e) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("use add or put method"); } + public RFuture pushAsync(V e) { + throw new UnsupportedOperationException("use add or put method"); + } + // @Override public RFuture removeFirstOccurrenceAsync(Object o) { return removeAsync(o, 1); diff --git a/redisson/src/main/java/org/redisson/api/RPriorityBlockingDeque.java b/redisson/src/main/java/org/redisson/api/RPriorityBlockingDeque.java new file mode 100644 index 000000000..13bf942f0 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RPriorityBlockingDeque.java @@ -0,0 +1,26 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +/** + * RPriorityBlockingDeque backed by Redis + * + * @author Nikita Koksharov + * @param the type of elements held in this collection + */ +public interface RPriorityBlockingDeque extends RBlockingDeque, RPriorityDeque { + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonClient.java b/redisson/src/main/java/org/redisson/api/RedissonClient.java index 31648d0cc..80246ce0d 100755 --- a/redisson/src/main/java/org/redisson/api/RedissonClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonClient.java @@ -612,7 +612,7 @@ public interface RedissonClient { RPriorityQueue getPriorityQueue(String name, Codec codec); /** - * Returns priority unbounded blocking queue instance by name. + * Returns unbounded priority blocking queue instance by name. * It uses comparator to sort objects. * * @param type of value @@ -622,7 +622,7 @@ public interface RedissonClient { RPriorityBlockingQueue getPriorityBlockingQueue(String name); /** - * Returns priority unbounded blocking queue instance by name + * Returns unbounded priority blocking queue instance by name * using provided codec for queue objects. * It uses comparator to sort objects. * @@ -633,6 +633,27 @@ public interface RedissonClient { */ RPriorityBlockingQueue getPriorityBlockingQueue(String name, Codec codec); + /** + * Returns unbounded priority blocking deque instance by name. + * It uses comparator to sort objects. + * + * @param type of value + * @param name of object + * @return Queue object + */ + RPriorityBlockingDeque getPriorityBlockingDeque(String name); + + /** + * Returns unbounded priority blocking deque instance by name + * using provided codec for queue objects. + * It uses comparator to sort objects. + * + * @param type of value + * @param name - name of object + * @param codec - codec for message + * @return Queue object + */ + RPriorityBlockingDeque getPriorityBlockingDeque(String name, Codec codec); /** * Returns priority unbounded deque instance by name. diff --git a/redisson/src/test/java/org/redisson/RedissonPriorityBlockingDequeTest.java b/redisson/src/test/java/org/redisson/RedissonPriorityBlockingDequeTest.java new file mode 100644 index 000000000..10186ffab --- /dev/null +++ b/redisson/src/test/java/org/redisson/RedissonPriorityBlockingDequeTest.java @@ -0,0 +1,124 @@ +package org.redisson; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.redisson.api.RBlockingDeque; + +public class RedissonPriorityBlockingDequeTest extends BaseTest { + + @Test(timeout = 3000) + public void testShortPoll() throws InterruptedException { + RBlockingDeque queue = redisson.getPriorityBlockingDeque("queue:pollany"); + queue.pollLastAsync(500, TimeUnit.MILLISECONDS); + queue.pollFirstAsync(10, TimeUnit.MICROSECONDS); + } + + @Test + public void testTakeFirst() throws InterruptedException { + RBlockingDeque deque = redisson.getPriorityBlockingDeque("queue:take"); + + deque.add(1); + deque.add(2); + deque.add(3); + deque.add(4); + + assertThat(deque.takeFirst()).isEqualTo(1); + assertThat(deque.takeFirst()).isEqualTo(2); + assertThat(deque.takeFirst()).isEqualTo(3); + assertThat(deque.takeFirst()).isEqualTo(4); + assertThat(deque.size()).isZero(); + } + + @Test + public void testTakeLast() throws InterruptedException { + RBlockingDeque deque = redisson.getPriorityBlockingDeque("queue:take"); + + deque.add(1); + deque.add(2); + deque.add(3); + deque.add(4); + + assertThat(deque.takeLast()).isEqualTo(4); + assertThat(deque.takeLast()).isEqualTo(3); + assertThat(deque.takeLast()).isEqualTo(2); + assertThat(deque.takeLast()).isEqualTo(1); + assertThat(deque.size()).isZero(); + } + + @Test + public void testTakeFirstAwait() throws InterruptedException { + RBlockingDeque deque = redisson.getPriorityBlockingDeque("queue:take"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RBlockingDeque deque1 = redisson.getBlockingDeque("queue:take"); + deque1.add(1); + deque1.add(2); + deque1.add(3); + deque1.add(4); + }, 10, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + assertThat(deque.takeFirst()).isEqualTo(1); + assertThat(System.currentTimeMillis() - s).isGreaterThan(9000); + Thread.sleep(50); + assertThat(deque.takeFirst()).isEqualTo(2); + assertThat(deque.takeFirst()).isEqualTo(3); + assertThat(deque.takeFirst()).isEqualTo(4); + } + + @Test + public void testTakeLastAwait() throws InterruptedException { + RBlockingDeque deque = redisson.getPriorityBlockingDeque("queue:take"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RBlockingDeque deque1 = redisson.getBlockingDeque("queue:take"); + deque1.add(1); + deque1.add(2); + deque1.add(3); + deque1.add(4); + }, 10, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + assertThat(deque.takeLast()).isEqualTo(4); + assertThat(System.currentTimeMillis() - s).isGreaterThan(9000); + Thread.sleep(50); + assertThat(deque.takeLast()).isEqualTo(3); + assertThat(deque.takeLast()).isEqualTo(2); + assertThat(deque.takeLast()).isEqualTo(1); + } + + @Test + public void testPollFirst() throws InterruptedException { + RBlockingDeque queue1 = redisson.getPriorityBlockingDeque("queue1"); + queue1.put(1); + queue1.put(2); + queue1.put(3); + + assertThat(queue1.pollFirst(2, TimeUnit.SECONDS)).isEqualTo(1); + assertThat(queue1.pollFirst(2, TimeUnit.SECONDS)).isEqualTo(2); + assertThat(queue1.pollFirst(2, TimeUnit.SECONDS)).isEqualTo(3); + + long s = System.currentTimeMillis(); + assertThat(queue1.pollFirst(5, TimeUnit.SECONDS)).isNull(); + assertThat(System.currentTimeMillis() - s).isGreaterThan(5000); + } + + @Test + public void testPollLast() throws InterruptedException { + RBlockingDeque queue1 = redisson.getPriorityBlockingDeque("queue1"); + queue1.add(3); + queue1.add(1); + queue1.add(2); + + assertThat(queue1.pollLast(2, TimeUnit.SECONDS)).isEqualTo(3); + assertThat(queue1.pollLast(2, TimeUnit.SECONDS)).isEqualTo(2); + assertThat(queue1.pollLast(2, TimeUnit.SECONDS)).isEqualTo(1); + + long s = System.currentTimeMillis(); + assertThat(queue1.pollLast(5, TimeUnit.SECONDS)).isNull(); + assertThat(System.currentTimeMillis() - s).isGreaterThanOrEqualTo(5000); + } + +} From b7643513fba8f10c51d1d45970ab6727975b6b5d Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 10 Jan 2018 08:18:40 +0300 Subject: [PATCH 3/8] RAtomicDoubleReactive added --- .../org/redisson/RedissonAtomicDouble.java | 2 +- .../java/org/redisson/RedissonReactive.java | 7 + .../redisson/api/RAtomicDoubleReactive.java | 47 +++++++ .../org/redisson/api/RAtomicLongReactive.java | 5 + .../redisson/api/RedissonReactiveClient.java | 8 ++ .../RedissonAtomicDoubleReactive.java | 132 ++++++++++++++++++ .../reactive/RedissonAtomicLongReactive.java | 3 +- 7 files changed, 201 insertions(+), 3 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/RAtomicDoubleReactive.java create mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java diff --git a/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java b/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java index 23f55b45e..61ceba620 100644 --- a/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java +++ b/redisson/src/main/java/org/redisson/RedissonAtomicDouble.java @@ -35,7 +35,7 @@ import org.redisson.command.CommandAsyncExecutor; */ public class RedissonAtomicDouble extends RedissonExpirable implements RAtomicDouble { - protected RedissonAtomicDouble(CommandAsyncExecutor commandExecutor, String name) { + public RedissonAtomicDouble(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); } diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index c36970b24..188fe7ff8 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -24,6 +24,7 @@ import org.redisson.api.ClusterNode; import org.redisson.api.MapOptions; import org.redisson.api.Node; import org.redisson.api.NodesGroup; +import org.redisson.api.RAtomicDoubleReactive; import org.redisson.api.RAtomicLongReactive; import org.redisson.api.RBatchReactive; import org.redisson.api.RBitSetReactive; @@ -59,6 +60,7 @@ import org.redisson.config.ConfigSupport; import org.redisson.connection.ConnectionManager; import org.redisson.eviction.EvictionScheduler; import org.redisson.pubsub.SemaphorePubSub; +import org.redisson.reactive.RedissonAtomicDoubleReactive; import org.redisson.reactive.RedissonAtomicLongReactive; import org.redisson.reactive.RedissonBatchReactive; import org.redisson.reactive.RedissonBitSetReactive; @@ -302,6 +304,11 @@ public class RedissonReactive implements RedissonReactiveClient { public RAtomicLongReactive getAtomicLong(String name) { return new RedissonAtomicLongReactive(commandExecutor, name); } + + @Override + public RAtomicDoubleReactive getAtomicDouble(String name) { + return new RedissonAtomicDoubleReactive(commandExecutor, name); + } @Override public RBitSetReactive getBitSet(String name) { diff --git a/redisson/src/main/java/org/redisson/api/RAtomicDoubleReactive.java b/redisson/src/main/java/org/redisson/api/RAtomicDoubleReactive.java new file mode 100644 index 000000000..d93891ae8 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RAtomicDoubleReactive.java @@ -0,0 +1,47 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import org.reactivestreams.Publisher; + +/** + * + * @author Nikita Koksharov + * + */ +public interface RAtomicDoubleReactive extends RExpirableReactive { + + Publisher compareAndSet(double expect, double update); + + Publisher addAndGet(double delta); + + Publisher decrementAndGet(); + + Publisher get(); + + Publisher getAndAdd(double delta); + + Publisher getAndSet(double newValue); + + Publisher incrementAndGet(); + + Publisher getAndIncrement(); + + Publisher getAndDecrement(); + + Publisher set(double newValue); + +} diff --git a/redisson/src/main/java/org/redisson/api/RAtomicLongReactive.java b/redisson/src/main/java/org/redisson/api/RAtomicLongReactive.java index 4bd448439..788a1802d 100644 --- a/redisson/src/main/java/org/redisson/api/RAtomicLongReactive.java +++ b/redisson/src/main/java/org/redisson/api/RAtomicLongReactive.java @@ -17,6 +17,11 @@ package org.redisson.api; import org.reactivestreams.Publisher; +/** + * + * @author Nikita Koksharov + * + */ public interface RAtomicLongReactive extends RExpirableReactive { Publisher compareAndSet(long expect, long update); diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index c47209e08..ad745e952 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -469,6 +469,14 @@ public interface RedissonReactiveClient { */ RAtomicLongReactive getAtomicLong(String name); + /** + * Returns "atomic double" instance by name. + * + * @param name of the "atomic double" + * @return AtomicLong object + */ + RAtomicDoubleReactive getAtomicDouble(String name); + /** * Returns bitSet instance by name. * diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java new file mode 100644 index 000000000..ed5e31b4c --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java @@ -0,0 +1,132 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import org.reactivestreams.Publisher; +import org.redisson.RedissonAtomicDouble; +import org.redisson.api.RAtomicDoubleAsync; +import org.redisson.api.RAtomicDoubleReactive; +import org.redisson.api.RFuture; +import org.redisson.command.CommandReactiveExecutor; + +import reactor.fn.Supplier; + +/** + * Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong} + * + * @author Nikita Koksharov + * + */ +public class RedissonAtomicDoubleReactive extends RedissonExpirableReactive implements RAtomicDoubleReactive { + + private final RAtomicDoubleAsync instance; + + public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name) { + super(commandExecutor, name); + instance = new RedissonAtomicDouble(commandExecutor, name); + } + + @Override + public Publisher addAndGet(final double delta) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAndGetAsync(delta); + } + }); + } + + @Override + public Publisher compareAndSet(final double expect, final double update) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.compareAndSetAsync(expect, update); + } + }); + } + + @Override + public Publisher decrementAndGet() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.decrementAndGetAsync(); + } + }); + } + + @Override + public Publisher get() { + return addAndGet(0); + } + + @Override + public Publisher getAndAdd(final double delta) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.getAndAddAsync(delta); + } + }); + } + + + @Override + public Publisher getAndSet(final double newValue) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.getAndSetAsync(newValue); + } + }); + } + + @Override + public Publisher incrementAndGet() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.incrementAndGetAsync(); + } + }); + } + + @Override + public Publisher getAndIncrement() { + return getAndAdd(1); + } + + @Override + public Publisher getAndDecrement() { + return getAndAdd(-1); + } + + @Override + public Publisher set(final double newValue) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.setAsync(newValue); + } + }); + } + + public String toString() { + return instance.toString(); + } + +} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java index b0c625881..8aaa70f8e 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java @@ -23,7 +23,6 @@ import org.redisson.api.RFuture; import org.redisson.command.CommandReactiveExecutor; import reactor.fn.Supplier; -import reactor.rx.Streams; /** * Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong} @@ -127,7 +126,7 @@ public class RedissonAtomicLongReactive extends RedissonExpirableReactive implem } public String toString() { - return Long.toString(Streams.create(get()).next().poll()); + return instance.toString(); } } From 882773409e3676b0c06f4aed4000fbf660c8b57c Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 10 Jan 2018 08:22:35 +0300 Subject: [PATCH 4/8] javadocs fixed --- redisson/src/main/java/org/redisson/api/RMapAsync.java | 4 ++-- redisson/src/main/java/org/redisson/api/RMapReactive.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/redisson/src/main/java/org/redisson/api/RMapAsync.java b/redisson/src/main/java/org/redisson/api/RMapAsync.java index af4cbae8b..b75b9fd4d 100644 --- a/redisson/src/main/java/org/redisson/api/RMapAsync.java +++ b/redisson/src/main/java/org/redisson/api/RMapAsync.java @@ -35,7 +35,7 @@ import java.util.Set; public interface RMapAsync extends RExpirableAsync { /** - * Loads all map entries to this Redis map. + * Loads all map entries to this Redis map using {@link org.redisson.api.map.MapLoader}. * * @param replaceExistingValues - true if existed values should be replaced, false otherwise. * @param parallelism - parallelism level, used to increase speed of process execution @@ -44,7 +44,7 @@ public interface RMapAsync extends RExpirableAsync { RFuture loadAllAsync(boolean replaceExistingValues, int parallelism); /** - * Loads map entries whose keys are listed in defined keys parameter. + * Loads map entries using {@link org.redisson.api.map.MapLoader} whose keys are listed in defined keys parameter. * * @param keys - map keys * @param replaceExistingValues - true if existed values should be replaced, false otherwise. diff --git a/redisson/src/main/java/org/redisson/api/RMapReactive.java b/redisson/src/main/java/org/redisson/api/RMapReactive.java index 2fd545104..a14b95b91 100644 --- a/redisson/src/main/java/org/redisson/api/RMapReactive.java +++ b/redisson/src/main/java/org/redisson/api/RMapReactive.java @@ -35,7 +35,7 @@ import org.redisson.api.map.MapWriter; public interface RMapReactive extends RExpirableReactive { /** - * Loads all map entries to this Redis map. + * Loads all map entries to this Redis map using {@link org.redisson.api.map.MapLoader}. * * @param replaceExistingValues - true if existed values should be replaced, false otherwise. * @param parallelism - parallelism level, used to increase speed of process execution @@ -44,7 +44,7 @@ public interface RMapReactive extends RExpirableReactive { Publisher loadAll(boolean replaceExistingValues, int parallelism); /** - * Loads map entries whose keys are listed in defined keys parameter. + * Loads map entries using {@link org.redisson.api.map.MapLoader} whose keys are listed in defined keys parameter. * * @param keys - map keys * @param replaceExistingValues - true if existed values should be replaced, false otherwise. From d08290233a30b8ec1291c20df3073b1d54b6d02e Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 10 Jan 2018 10:16:05 +0300 Subject: [PATCH 5/8] Eviction process adjustments --- .../src/main/java/org/redisson/eviction/EvictionTask.java | 4 ++++ .../java/org/redisson/eviction/MapCacheEvictionTask.java | 5 +++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/eviction/EvictionTask.java b/redisson/src/main/java/org/redisson/eviction/EvictionTask.java index f6c68ffb8..f43d62c61 100644 --- a/redisson/src/main/java/org/redisson/eviction/EvictionTask.java +++ b/redisson/src/main/java/org/redisson/eviction/EvictionTask.java @@ -64,6 +64,10 @@ abstract class EvictionTask implements Runnable { } Integer size = future.getNow(); + if (size == -1) { + schedule(); + return; + } if (sizeHistory.size() == 2) { if (sizeHistory.peekFirst() > sizeHistory.peekLast() diff --git a/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java b/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java index 2568436a3..4c85dcc21 100644 --- a/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java +++ b/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java @@ -56,9 +56,10 @@ public class MapCacheEvictionTask extends EvictionTask { @Override RFuture execute() { + int latchExpireTime = Math.min(delay, 30); return executor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER, "if redis.call('setnx', KEYS[6], ARGV[4]) == 0 then " - + "return 0;" + + "return -1;" + "end;" + "redis.call('expire', KEYS[6], ARGV[3]); " +"local expiredKeys1 = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " @@ -99,7 +100,7 @@ public class MapCacheEvictionTask extends EvictionTask { + "end; " + "return #expiredKeys1 + #expiredKeys2;", Arrays.asList(name, timeoutSetName, maxIdleSetName, expiredChannelName, lastAccessTimeSetName, executeTaskOnceLatchName), - System.currentTimeMillis(), keysLimit, delay, 1); + System.currentTimeMillis(), keysLimit, latchExpireTime, 1); } } From 60abd563057c5d172957cddec1622d630cfa85c6 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 10 Jan 2018 15:35:53 +0300 Subject: [PATCH 6/8] testTaskResume added. #1231 --- .../RedissonScheduledExecutorServiceTest.java | 42 +++++++++---------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java index 569278ee4..36c920b65 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonScheduledExecutorServiceTest.java @@ -12,14 +12,11 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.junit.After; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.redisson.BaseTest; import org.redisson.RedissonNode; -import org.redisson.RedissonRuntimeEnvironment; import org.redisson.api.CronSchedule; import org.redisson.api.RScheduledExecutorService; import org.redisson.api.RScheduledFuture; @@ -30,26 +27,6 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { private static RedissonNode node; - @BeforeClass - public static void beforeClass() throws IOException, InterruptedException { - if (!RedissonRuntimeEnvironment.isTravis) { - BaseTest.beforeClass(); - Config config = createConfig(); - RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); - nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1)); - node = RedissonNode.create(nodeConfig); - node.start(); - } - } - - @AfterClass - public static void afterClass() throws IOException, InterruptedException { - if (!RedissonRuntimeEnvironment.isTravis) { - BaseTest.afterClass(); - node.shutdown(); - } - } - @Before @Override public void before() throws IOException, InterruptedException { @@ -68,6 +45,25 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest { node.shutdown(); } + @Test(timeout = 7000) + public void testTaskResume() throws InterruptedException, ExecutionException { + RScheduledExecutorService executor = redisson.getExecutorService("test"); + ScheduledFuture future1 = executor.schedule(new ScheduledCallableTask(), 5, TimeUnit.SECONDS); + ScheduledFuture future2 = executor.schedule(new ScheduledCallableTask(), 5, TimeUnit.SECONDS); + ScheduledFuture future3 = executor.schedule(new ScheduledCallableTask(), 5, TimeUnit.SECONDS); + + node.shutdown(); + + RedissonNodeConfig nodeConfig = new RedissonNodeConfig(redisson.getConfig()); + nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1)); + node = RedissonNode.create(nodeConfig); + node.start(); + + assertThat(future1.get()).isEqualTo(100); + assertThat(future2.get()).isEqualTo(100); + assertThat(future3.get()).isEqualTo(100); + } + @Test public void testLoad() { Config config = createConfig(); From 4a1a92972e7b7558cdc7111ebcd7051337f68019 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 10 Jan 2018 15:36:00 +0300 Subject: [PATCH 7/8] refactoring --- .../executor/RedissonExecutorServiceTest.java | 42 ++++--------------- 1 file changed, 7 insertions(+), 35 deletions(-) diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 5ba5e9a9e..8cf9b862e 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -1,6 +1,7 @@ package org.redisson.executor; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import java.io.IOException; import java.util.Arrays; @@ -16,65 +17,36 @@ import java.util.concurrent.TimeoutException; import org.awaitility.Duration; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.redisson.BaseTest; import org.redisson.RedissonNode; -import org.redisson.RedissonRuntimeEnvironment; import org.redisson.api.RExecutorBatchFuture; import org.redisson.api.RExecutorFuture; import org.redisson.api.RExecutorService; import org.redisson.config.Config; import org.redisson.config.RedissonNodeConfig; -import static org.awaitility.Awaitility.*; - public class RedissonExecutorServiceTest extends BaseTest { private static RedissonNode node; - @BeforeClass - public static void beforeClass() throws IOException, InterruptedException { - BaseTest.beforeClass(); - if (!RedissonRuntimeEnvironment.isTravis) { - Config config = createConfig(); - RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); - nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1)); - node = RedissonNode.create(nodeConfig); - node.start(); - } - } - - @AfterClass - public static void afterClass() throws IOException, InterruptedException { - BaseTest.afterClass(); - if (!RedissonRuntimeEnvironment.isTravis) { - node.shutdown(); - } - } - @Before @Override public void before() throws IOException, InterruptedException { super.before(); - if (RedissonRuntimeEnvironment.isTravis) { - Config config = createConfig(); - RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); - nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1)); - node = RedissonNode.create(nodeConfig); - node.start(); - } + Config config = createConfig(); + RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); + nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1)); + node = RedissonNode.create(nodeConfig); + node.start(); } @After @Override public void after() throws InterruptedException { super.after(); - if (RedissonRuntimeEnvironment.isTravis) { - node.shutdown(); - } + node.shutdown(); } private void cancel(RExecutorFuture future) throws InterruptedException, ExecutionException { From a19b74e890a91baae7afea50efc79bcbcca5f070 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 11 Jan 2018 12:45:06 +0300 Subject: [PATCH 8/8] refactoring --- redisson/src/main/java/org/redisson/Redisson.java | 10 +--------- .../java/org/redisson/RedissonLiveObjectService.java | 9 +++------ .../src/main/java/org/redisson/api/RedissonClient.java | 8 -------- .../redisson/liveobject/core/AccessorInterceptor.java | 5 +---- .../liveobject/core/LiveObjectInterceptor.java | 4 ++-- .../liveobject/core/RedissonObjectBuilder.java | 4 ++-- 6 files changed, 9 insertions(+), 31 deletions(-) diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index 3abf3168a..ee07a4ae2 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -74,7 +74,6 @@ import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; import org.redisson.api.RedissonReactiveClient; import org.redisson.client.codec.Codec; -import org.redisson.codec.ReferenceCodecProvider; import org.redisson.command.CommandExecutor; import org.redisson.config.Config; import org.redisson.config.ConfigSupport; @@ -105,7 +104,6 @@ public class Redisson implements RedissonClient { protected final ConnectionManager connectionManager; protected final ConcurrentMap, Class> liveObjectClassCache = PlatformDependent.newConcurrentHashMap(); - protected final ReferenceCodecProvider codecProvider; protected final Config config; protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(); @@ -118,7 +116,6 @@ public class Redisson implements RedissonClient { connectionManager = ConfigSupport.createConnectionManager(configCopy); evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor()); - codecProvider = configCopy.getReferenceCodecProvider(); } public EvictionScheduler getEvictionScheduler() { @@ -585,7 +582,7 @@ public class Redisson implements RedissonClient { @Override public RLiveObjectService getLiveObjectService() { - return new RedissonLiveObjectService(this, liveObjectClassCache, codecProvider); + return new RedissonLiveObjectService(this, liveObjectClassCache); } @Override @@ -604,11 +601,6 @@ public class Redisson implements RedissonClient { return config; } - @Override - public ReferenceCodecProvider getCodecProvider() { - return codecProvider; - } - @Override public NodesGroup getNodesGroup() { return new RedisNodes(connectionManager); diff --git a/redisson/src/main/java/org/redisson/RedissonLiveObjectService.java b/redisson/src/main/java/org/redisson/RedissonLiveObjectService.java index 0710d7f84..ce2a7a26b 100644 --- a/redisson/src/main/java/org/redisson/RedissonLiveObjectService.java +++ b/redisson/src/main/java/org/redisson/RedissonLiveObjectService.java @@ -54,7 +54,6 @@ import org.redisson.api.annotation.RCascade; import org.redisson.api.annotation.REntity; import org.redisson.api.annotation.RFieldAccessor; import org.redisson.api.annotation.RId; -import org.redisson.codec.ReferenceCodecProvider; import org.redisson.liveobject.LiveObjectTemplate; import org.redisson.liveobject.core.AccessorInterceptor; import org.redisson.liveobject.core.FieldAccessorInterceptor; @@ -85,14 +84,12 @@ public class RedissonLiveObjectService implements RLiveObjectService { private static final ConcurrentMap, Resolver> providerCache = PlatformDependent.newConcurrentHashMap(); private final ConcurrentMap, Class> classCache; private final RedissonClient redisson; - private final ReferenceCodecProvider codecProvider; private final RedissonObjectBuilder objectBuilder; - public RedissonLiveObjectService(RedissonClient redisson, ConcurrentMap, Class> classCache, ReferenceCodecProvider codecProvider) { + public RedissonLiveObjectService(RedissonClient redisson, ConcurrentMap, Class> classCache) { this.redisson = redisson; this.classCache = classCache; - this.codecProvider = codecProvider; - this.objectBuilder = new RedissonObjectBuilder(redisson, codecProvider); + this.objectBuilder = new RedissonObjectBuilder(redisson); } //TODO: Add ttl renewal functionality @@ -642,7 +639,7 @@ public class RedissonLiveObjectService implements RLiveObjectService { .withBinders(FieldProxy.Binder .install(LiveObjectInterceptor.Getter.class, LiveObjectInterceptor.Setter.class)) - .to(new LiveObjectInterceptor(redisson, codecProvider, entityClass, + .to(new LiveObjectInterceptor(redisson, entityClass, getRIdFieldName(entityClass)))) // .intercept(MethodDelegation.to( // new LiveObjectInterceptor(redisson, codecProvider, entityClass, diff --git a/redisson/src/main/java/org/redisson/api/RedissonClient.java b/redisson/src/main/java/org/redisson/api/RedissonClient.java index 80246ce0d..6c56e2d34 100755 --- a/redisson/src/main/java/org/redisson/api/RedissonClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonClient.java @@ -18,7 +18,6 @@ package org.redisson.api; import java.util.concurrent.TimeUnit; import org.redisson.client.codec.Codec; -import org.redisson.codec.ReferenceCodecProvider; import org.redisson.config.Config; /** @@ -956,13 +955,6 @@ public interface RedissonClient { */ Config getConfig(); - /** - * Returns the CodecProvider instance - * - * @return CodecProvider object - */ - public ReferenceCodecProvider getCodecProvider(); - /** * Get Redis nodes group for server operations * diff --git a/redisson/src/main/java/org/redisson/liveobject/core/AccessorInterceptor.java b/redisson/src/main/java/org/redisson/liveobject/core/AccessorInterceptor.java index 197ec0ca4..7419bbae1 100644 --- a/redisson/src/main/java/org/redisson/liveobject/core/AccessorInterceptor.java +++ b/redisson/src/main/java/org/redisson/liveobject/core/AccessorInterceptor.java @@ -29,7 +29,6 @@ import org.redisson.api.annotation.REntity; import org.redisson.api.annotation.REntity.TransformationMode; import org.redisson.api.annotation.RId; import org.redisson.client.codec.Codec; -import org.redisson.codec.ReferenceCodecProvider; import org.redisson.liveobject.misc.ClassUtils; import org.redisson.liveobject.misc.Introspectior; import org.redisson.liveobject.resolver.NamingScheme; @@ -52,12 +51,10 @@ import net.bytebuddy.implementation.bind.annotation.This; public class AccessorInterceptor { private final RedissonClient redisson; - private final ReferenceCodecProvider codecProvider; private final RedissonObjectBuilder objectBuilder; public AccessorInterceptor(RedissonClient redisson, RedissonObjectBuilder objectBuilder) { this.redisson = redisson; - this.codecProvider = redisson.getCodecProvider(); this.objectBuilder = objectBuilder; } @@ -104,7 +101,7 @@ public class AccessorInterceptor { REntity anno = ClassUtils.getAnnotation(rEntity, REntity.class); NamingScheme ns = anno.namingScheme() .getDeclaredConstructor(Codec.class) - .newInstance(codecProvider.getCodec(anno, (Class) rEntity)); + .newInstance(redisson.getConfig().getReferenceCodecProvider().getCodec(anno, (Class) rEntity)); liveMap.fastPut(fieldName, new RedissonReference(rEntity, ns.getName(rEntity, fieldType, getREntityIdFieldName(liveObject), liveObject.getLiveObjectId()))); diff --git a/redisson/src/main/java/org/redisson/liveobject/core/LiveObjectInterceptor.java b/redisson/src/main/java/org/redisson/liveobject/core/LiveObjectInterceptor.java index 63cb7440b..6c89ba0b8 100644 --- a/redisson/src/main/java/org/redisson/liveobject/core/LiveObjectInterceptor.java +++ b/redisson/src/main/java/org/redisson/liveobject/core/LiveObjectInterceptor.java @@ -57,9 +57,9 @@ public class LiveObjectInterceptor { private final NamingScheme namingScheme; private final Class codecClass; - public LiveObjectInterceptor(RedissonClient redisson, ReferenceCodecProvider codecProvider, Class entityClass, String idFieldName) { + public LiveObjectInterceptor(RedissonClient redisson, Class entityClass, String idFieldName) { this.redisson = redisson; - this.codecProvider = codecProvider; + this.codecProvider = redisson.getConfig().getReferenceCodecProvider(); this.originalClass = entityClass; this.idFieldName = idFieldName; REntity anno = (REntity) ClassUtils.getAnnotation(entityClass, REntity.class); diff --git a/redisson/src/main/java/org/redisson/liveobject/core/RedissonObjectBuilder.java b/redisson/src/main/java/org/redisson/liveobject/core/RedissonObjectBuilder.java index 828389540..e4a67c549 100644 --- a/redisson/src/main/java/org/redisson/liveobject/core/RedissonObjectBuilder.java +++ b/redisson/src/main/java/org/redisson/liveobject/core/RedissonObjectBuilder.java @@ -76,10 +76,10 @@ public class RedissonObjectBuilder { private final RedissonClient redisson; private final ReferenceCodecProvider codecProvider; - public RedissonObjectBuilder(RedissonClient redisson, ReferenceCodecProvider codecProvider) { + public RedissonObjectBuilder(RedissonClient redisson) { super(); this.redisson = redisson; - this.codecProvider = codecProvider; + this.codecProvider = redisson.getConfig().getReferenceCodecProvider(); } public void store(RObject ar, String fieldName, RMap liveMap) {