diff --git a/redisson/src/main/java/org/redisson/RedissonBuckets.java b/redisson/src/main/java/org/redisson/RedissonBuckets.java index aabc880d2..3ab9e2f95 100644 --- a/redisson/src/main/java/org/redisson/RedissonBuckets.java +++ b/redisson/src/main/java/org/redisson/RedissonBuckets.java @@ -30,7 +30,7 @@ import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.codec.CompositeCodec; -import org.redisson.command.CommandExecutor; +import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.misc.RedissonPromise; @@ -41,14 +41,14 @@ import org.redisson.misc.RedissonPromise; */ public class RedissonBuckets implements RBuckets { - private final Codec codec; - private final CommandExecutor commandExecutor; + protected final Codec codec; + protected final CommandAsyncExecutor commandExecutor; - public RedissonBuckets(CommandExecutor commandExecutor) { + public RedissonBuckets(CommandAsyncExecutor commandExecutor) { this(commandExecutor.getConnectionManager().getCodec(), commandExecutor); } - public RedissonBuckets(Codec codec, CommandExecutor commandExecutor) { + public RedissonBuckets(Codec codec, CommandAsyncExecutor commandExecutor) { super(); this.codec = codec; this.commandExecutor = commandExecutor; @@ -74,8 +74,7 @@ public class RedissonBuckets implements RBuckets { @Override public RFuture> getAsync(String... keys) { if (keys.length == 0) { - Map emptyMap = Collections.emptyMap(); - return RedissonPromise.>newSucceededFuture(emptyMap); + return RedissonPromise.newSucceededFuture(Collections.emptyMap()); } RedisCommand> command = new RedisCommand>("MGET", new MapGetAllDecoder(Arrays.asList(keys), 0)); @@ -120,4 +119,15 @@ public class RedissonBuckets implements RBuckets { return commandExecutor.writeAsync(params.get(0).toString(), RedisCommands.MSET, params.toArray()); } + @Override + public RFuture deleteAsync(String... keys) { + RedissonKeys ks = new RedissonKeys(commandExecutor); + return ks.deleteAsync(keys); + } + + @Override + public long delete(String... keys) { + return commandExecutor.get(deleteAsync(keys)); + } + } diff --git a/redisson/src/main/java/org/redisson/api/RBuckets.java b/redisson/src/main/java/org/redisson/api/RBuckets.java index 181f02e83..2f920fbe2 100644 --- a/redisson/src/main/java/org/redisson/api/RBuckets.java +++ b/redisson/src/main/java/org/redisson/api/RBuckets.java @@ -52,4 +52,12 @@ public interface RBuckets extends RBucketsAsync { */ void set(Map buckets); + /** + * Delete multiple objects by name + * + * @param keys - object names + * @return number of removed keys + */ + long delete(String... keys); + } diff --git a/redisson/src/main/java/org/redisson/api/RBucketsAsync.java b/redisson/src/main/java/org/redisson/api/RBucketsAsync.java index ac1730dd0..6d84c5a5b 100644 --- a/redisson/src/main/java/org/redisson/api/RBucketsAsync.java +++ b/redisson/src/main/java/org/redisson/api/RBucketsAsync.java @@ -53,4 +53,12 @@ public interface RBucketsAsync { */ RFuture setAsync(Map buckets); + /** + * Delete multiple objects by name + * + * @param keys - object names + * @return number of removed keys + */ + RFuture deleteAsync(String... keys); + } diff --git a/redisson/src/main/java/org/redisson/api/RTransaction.java b/redisson/src/main/java/org/redisson/api/RTransaction.java index 7c79da419..f89a42481 100644 --- a/redisson/src/main/java/org/redisson/api/RTransaction.java +++ b/redisson/src/main/java/org/redisson/api/RTransaction.java @@ -48,6 +48,22 @@ public interface RTransaction { */ RBucket getBucket(String name, Codec codec); + /** + * Returns transactional interface for mass operations with Bucket objects. + * + * @return Buckets + */ + RBuckets getBuckets(); + + /** + * Returns transactional interface for mass operations with Bucket objects + * using provided codec for object. + * + * @param codec - codec for bucket objects + * @return Buckets + */ + RBuckets getBuckets(Codec codec); + /** * Returns transactional map instance by name. * diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java index 65b8d92e9..9202d1669 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java @@ -25,6 +25,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -36,6 +37,7 @@ import org.redisson.RedissonTopic; import org.redisson.api.BatchOptions; import org.redisson.api.BatchResult; import org.redisson.api.RBucket; +import org.redisson.api.RBuckets; import org.redisson.api.RFuture; import org.redisson.api.RLocalCachedMap; import org.redisson.api.RMap; @@ -65,7 +67,6 @@ import org.redisson.transaction.operation.map.MapOperation; import io.netty.buffer.ByteBufUtil; import io.netty.util.Timeout; import io.netty.util.TimerTask; -import io.netty.util.internal.PlatformDependent; /** * @@ -78,8 +79,8 @@ public class RedissonTransaction implements RTransaction { private final AtomicBoolean executed = new AtomicBoolean(); private final TransactionOptions options; - private List operations = new CopyOnWriteArrayList(); - private Set localCaches = new HashSet(); + private List operations = new CopyOnWriteArrayList<>(); + private Set localCaches = new HashSet<>(); private final long startTime = System.currentTimeMillis(); private final String id = generateId(); @@ -112,16 +113,30 @@ public class RedissonTransaction implements RTransaction { public RBucket getBucket(String name) { checkState(); - return new RedissonTransactionalBucket(commandExecutor, name, operations, executed, id); + return new RedissonTransactionalBucket(commandExecutor, options.getTimeout(), name, operations, executed, id); } @Override public RBucket getBucket(String name, Codec codec) { checkState(); - return new RedissonTransactionalBucket(codec, commandExecutor, name, operations, executed, id); + return new RedissonTransactionalBucket(codec, commandExecutor, options.getTimeout(), name, operations, executed, id); } + @Override + public RBuckets getBuckets() { + checkState(); + + return new RedissonTransactionalBuckets(commandExecutor, options.getTimeout(), operations, executed, id); + } + + @Override + public RBuckets getBuckets(Codec codec) { + checkState(); + + return new RedissonTransactionalBuckets(codec, commandExecutor, options.getTimeout(), operations, executed, id); + } + @Override public RSet getSet(String name) { checkState(); @@ -328,7 +343,7 @@ public class RedissonTransaction implements RTransaction { return Collections.emptyMap(); } - Map hashes = new HashMap(localCaches.size()); + Map hashes = new HashMap<>(localCaches.size()); RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); for (TransactionalOperation transactionalOperation : operations) { if (localCaches.contains(transactionalOperation.getName())) { @@ -359,7 +374,7 @@ public class RedissonTransaction implements RTransaction { } CountDownLatch latch = new CountDownLatch(hashes.size()); - List topics = new ArrayList(); + List topics = new ArrayList<>(); for (Entry entry : hashes.entrySet()) { RTopic topic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, commandExecutor, RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX)); @@ -418,11 +433,11 @@ public class RedissonTransaction implements RTransaction { private RFuture> disableLocalCacheAsync(String requestId, Set localCaches, List operations) { if (localCaches.isEmpty()) { - return RedissonPromise.newSucceededFuture(Collections.emptyMap()); + return RedissonPromise.newSucceededFuture(Collections.emptyMap()); } - RPromise> result = new RedissonPromise>(); - Map hashes = new HashMap(localCaches.size()); + RPromise> result = new RedissonPromise<>(); + Map hashes = new HashMap<>(localCaches.size()); RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults()); for (TransactionalOperation transactionalOperation : operations) { if (localCaches.contains(transactionalOperation.getName())) { @@ -454,11 +469,11 @@ public class RedissonTransaction implements RTransaction { } CountableListener> listener = - new CountableListener>(result, hashes, hashes.size()); - RPromise subscriptionFuture = new RedissonPromise(); - CountableListener subscribedFutures = new CountableListener(subscriptionFuture, null, hashes.size()); + new CountableListener<>(result, hashes, hashes.size()); + RPromise subscriptionFuture = new RedissonPromise<>(); + CountableListener subscribedFutures = new CountableListener<>(subscriptionFuture, null, hashes.size()); - List topics = new ArrayList(); + List topics = new ArrayList<>(); for (Entry entry : hashes.entrySet()) { String disabledAckName = RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX); RTopic topic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, @@ -529,8 +544,7 @@ public class RedissonTransaction implements RTransaction { protected static String generateId() { byte[] id = new byte[16]; - // TODO JDK UPGRADE replace to native ThreadLocalRandom - PlatformDependent.threadLocalRandom().nextBytes(id); + ThreadLocalRandom.current().nextBytes(id); return ByteBufUtil.hexDump(id); } @@ -566,7 +580,7 @@ public class RedissonTransaction implements RTransaction { transactionalOperation.rollback(executorService); } - RPromise result = new RedissonPromise(); + RPromise result = new RedissonPromise<>(); RFuture> future = executorService.executeAsync(); future.onComplete((res, e) -> { if (e != null) { diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBuckets.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBuckets.java new file mode 100644 index 000000000..48161d08f --- /dev/null +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalBuckets.java @@ -0,0 +1,265 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.transaction; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.redisson.RedissonBuckets; +import org.redisson.RedissonKeys; +import org.redisson.RedissonMultiLock; +import org.redisson.api.RFuture; +import org.redisson.api.RKeys; +import org.redisson.api.RLock; +import org.redisson.client.codec.Codec; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; +import org.redisson.transaction.operation.DeleteOperation; +import org.redisson.transaction.operation.TransactionalOperation; +import org.redisson.transaction.operation.bucket.BucketSetOperation; +import org.redisson.transaction.operation.bucket.BucketsTrySetOperation; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonTransactionalBuckets extends RedissonBuckets { + + static final Object NULL = new Object(); + + private long timeout; + private final AtomicBoolean executed; + private final List operations; + private Map state = new HashMap<>(); + private final String transactionId; + + public RedissonTransactionalBuckets(CommandAsyncExecutor commandExecutor, + long timeout, List operations, AtomicBoolean executed, String transactionId) { + super(commandExecutor); + + this.timeout = timeout; + this.operations = operations; + this.executed = executed; + this.transactionId = transactionId; + } + + public RedissonTransactionalBuckets(Codec codec, CommandAsyncExecutor commandExecutor, + long timeout, List operations, AtomicBoolean executed, String transactionId) { + super(codec, commandExecutor); + + this.timeout = timeout; + this.operations = operations; + this.executed = executed; + this.transactionId = transactionId; + } + + @Override + public RFuture> getAsync(String... keys) { + checkState(); + + if (keys.length == 0) { + return RedissonPromise.newSucceededFuture(Collections.emptyMap()); + } + + Set keysToLoad = new HashSet<>(); + Map map = new LinkedHashMap<>(); + for (String key : keys) { + Object value = state.get(key); + if (value != null) { + if (value != NULL) { + map.put(key, (V) value); + } + } else { + keysToLoad.add(key); + } + } + + if (keysToLoad.isEmpty()) { + return RedissonPromise.newSucceededFuture(map); + } + + RPromise> result = new RedissonPromise<>(); + super.getAsync(keysToLoad.toArray(new String[keysToLoad.size()])).onComplete((res, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + + map.putAll((Map) res); + result.trySuccess(map); + }); + return result; + } + + @Override + public RFuture setAsync(Map buckets) { + checkState(); + + RPromise result = new RedissonPromise<>(); + executeLocked(result, () -> { + for (Entry entry : buckets.entrySet()) { + operations.add(new BucketSetOperation<>(entry.getKey(), getLockName(entry.getKey()), codec, entry.getValue(), transactionId)); + if (entry.getValue() == null) { + state.put(entry.getKey(), NULL); + } else { + state.put(entry.getKey(), entry.getValue()); + } + } + result.trySuccess(null); + }, buckets.keySet()); + return result; + } + + @Override + public RFuture deleteAsync(String... keys) { + checkState(); + RPromise result = new RedissonPromise<>(); + executeLocked(result, new Runnable() { + @Override + public void run() { + AtomicLong counter = new AtomicLong(); + AtomicLong executions = new AtomicLong(keys.length); + for (String key : keys) { + Object st = state.get(key); + if (st != null) { + operations.add(new DeleteOperation(key, getLockName(key), transactionId)); + if (st != NULL) { + state.put(key, NULL); + counter.incrementAndGet(); + } + if (executions.decrementAndGet() == 0) { + result.trySuccess(counter.get()); + } + continue; + } + + RedissonKeys ks = new RedissonKeys(commandExecutor); + ks.countExistsAsync(key).onComplete((res, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + + if (res > 0) { + operations.add(new DeleteOperation(key, getLockName(key), transactionId)); + state.put(key, NULL); + counter.incrementAndGet(); + } + + if (executions.decrementAndGet() == 0) { + result.trySuccess(counter.get()); + } + }); + } + } + }, Arrays.asList(keys)); + return result; + } + + @Override + public RFuture trySetAsync(Map buckets) { + checkState(); + + RPromise result = new RedissonPromise<>(); + executeLocked(result, () -> { + Set keysToSet = new HashSet<>(); + for (String key : buckets.keySet()) { + Object value = state.get(key); + if (value != null) { + if (value != NULL) { + operations.add(new BucketsTrySetOperation(codec, (Map) buckets, transactionId)); + result.trySuccess(false); + return; + } + } else { + keysToSet.add(key); + } + } + + if (keysToSet.isEmpty()) { + operations.add(new BucketsTrySetOperation(codec, (Map) buckets, transactionId)); + state.putAll(buckets); + result.trySuccess(true); + return; + } + + RKeys keys = new RedissonKeys(commandExecutor); + String[] ks = keysToSet.toArray(new String[keysToSet.size()]); + keys.countExistsAsync(ks).onComplete((res, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + + operations.add(new BucketsTrySetOperation(codec, (Map) buckets, transactionId)); + if (res == 0) { + state.putAll(buckets); + result.trySuccess(true); + } else { + result.trySuccess(false); + } + }); + }, buckets.keySet()); + return result; + } + + protected void executeLocked(RPromise promise, Runnable runnable, Collection keys) { + List locks = new ArrayList<>(keys.size()); + for (String key : keys) { + RLock lock = getLock(key); + locks.add(lock); + } + RedissonMultiLock multiLock = new RedissonMultiLock(locks.toArray(new RLock[locks.size()])); + long threadId = Thread.currentThread().getId(); + multiLock.lockAsync(timeout, TimeUnit.MILLISECONDS).onComplete((res, e) -> { + if (e == null) { + runnable.run(); + } else { + multiLock.unlockAsync(threadId); + promise.tryFailure(e); + } + }); + } + + private RLock getLock(String name) { + return new RedissonTransactionalLock(commandExecutor, getLockName(name), transactionId); + } + + private String getLockName(String name) { + return name + ":transaction_lock"; + } + + protected void checkState() { + if (executed.get()) { + throw new IllegalStateException("Unable to execute operation. Transaction is in finished state!"); + } + } + +} diff --git a/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketsTrySetOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketsTrySetOperation.java new file mode 100644 index 000000000..40f95d389 --- /dev/null +++ b/redisson/src/main/java/org/redisson/transaction/operation/bucket/BucketsTrySetOperation.java @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.transaction.operation.bucket; + +import java.util.Map; + +import org.redisson.RedissonBuckets; +import org.redisson.api.RBuckets; +import org.redisson.api.RLock; +import org.redisson.client.codec.Codec; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.transaction.RedissonTransactionalLock; +import org.redisson.transaction.operation.TransactionalOperation; + +/** + * + * @author Nikita Koksharov + * + */ +public class BucketsTrySetOperation extends TransactionalOperation { + + private String transactionId; + private Map values; + + public BucketsTrySetOperation(Codec codec, Map values, String transactionId) { + super(null, codec); + this.values = values; + this.transactionId = transactionId; + } + + @Override + public void commit(CommandAsyncExecutor commandExecutor) { + RBuckets bucket = new RedissonBuckets(codec, commandExecutor); + bucket.trySetAsync(values); + + unlock(commandExecutor); + } + + protected void unlock(CommandAsyncExecutor commandExecutor) { + for (String key : values.keySet()) { + RLock lock = new RedissonTransactionalLock(commandExecutor, getLockName(key), transactionId); + lock.unlockAsync(); + } + } + + @Override + public void rollback(CommandAsyncExecutor commandExecutor) { + unlock(commandExecutor); + } + + public Map getValues() { + return values; + } + + private String getLockName(String name) { + return name + ":transaction_lock"; + } + +} diff --git a/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketsTest.java b/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketsTest.java new file mode 100644 index 000000000..39f393ba9 --- /dev/null +++ b/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalBucketsTest.java @@ -0,0 +1,84 @@ +package org.redisson.transaction; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.LinkedHashMap; +import java.util.Map; + +import org.junit.Test; +import org.redisson.BaseTest; +import org.redisson.api.RBucket; +import org.redisson.api.RBuckets; +import org.redisson.api.RTransaction; +import org.redisson.api.TransactionOptions; + +public class RedissonTransactionalBucketsTest extends BaseTest { + + @Test + public void testGet() { + RBucket b = redisson.getBucket("test"); + b.set("123"); + + RTransaction transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBuckets buckets = transaction.getBuckets(); + assertThat(buckets.get("test").get("test")).isEqualTo("123"); + + transaction.commit(); + + assertThat(redisson.getKeys().count()).isEqualTo(1); + assertThat(b.get()).isEqualTo("123"); + } + + @Test + public void testSet() { + RBucket b1 = redisson.getBucket("test1"); + b1.set("1"); + RBucket b2 = redisson.getBucket("test2"); + b2.set("2"); + + RTransaction transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBuckets buckets = transaction.getBuckets(); + Map bbs = new LinkedHashMap<>(); + bbs.put("test1", "11"); + bbs.put("test2", "22"); + buckets.set(bbs); + + Map newBuckets = buckets.get("test1", "test2"); + assertThat(newBuckets).isEqualTo(bbs); + + transaction.commit(); + + assertThat(redisson.getBuckets().get("test1", "test2")).isEqualTo(bbs); + assertThat(redisson.getKeys().count()).isEqualTo(2); + } + + @Test + public void testTrySet() { + redisson.getBucket("test1").set("1"); + redisson.getBucket("test2").set("2"); + + RTransaction transaction = redisson.createTransaction(TransactionOptions.defaults()); + RBuckets buckets = transaction.getBuckets(); + Map bbs1 = new LinkedHashMap<>(); + bbs1.put("test1", "10"); + bbs1.put("test2", "20"); + assertThat(buckets.trySet(bbs1)).isFalse(); + assertThat(buckets.delete("test1", "test2")).isEqualTo(2); + Map bbs2 = new LinkedHashMap<>(); + bbs2.put("test1", "11"); + bbs2.put("test2", "22"); + assertThat(buckets.trySet(bbs2)).isTrue(); + Map bbs3 = new LinkedHashMap<>(); + bbs3.put("test1", "13"); + bbs3.put("test2", "23"); + assertThat(buckets.trySet(bbs3)).isFalse(); + + System.out.println("commit " + Thread.currentThread().getId()); + transaction.commit(); + redisson.getKeys().getKeys().forEach(x -> System.out.println(x)); + +// assertThat(redisson.getBuckets().get("test1", "test2")).isEqualTo(bbs2); + assertThat(redisson.getKeys().count()).isEqualTo(2); + } + +}