Feature - RTransaction.getBuckets method added #1444

pull/1907/head
Nikita Koksharov 6 years ago
parent e781736691
commit 0a5140bffb

@ -30,7 +30,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.misc.RedissonPromise;
@ -41,14 +41,14 @@ import org.redisson.misc.RedissonPromise;
*/
public class RedissonBuckets implements RBuckets {
private final Codec codec;
private final CommandExecutor commandExecutor;
protected final Codec codec;
protected final CommandAsyncExecutor commandExecutor;
public RedissonBuckets(CommandExecutor commandExecutor) {
public RedissonBuckets(CommandAsyncExecutor commandExecutor) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor);
}
public RedissonBuckets(Codec codec, CommandExecutor commandExecutor) {
public RedissonBuckets(Codec codec, CommandAsyncExecutor commandExecutor) {
super();
this.codec = codec;
this.commandExecutor = commandExecutor;
@ -74,8 +74,7 @@ public class RedissonBuckets implements RBuckets {
@Override
public <V> RFuture<Map<String, V>> getAsync(String... keys) {
if (keys.length == 0) {
Map<String, V> emptyMap = Collections.emptyMap();
return RedissonPromise.<Map<String, V>>newSucceededFuture(emptyMap);
return RedissonPromise.newSucceededFuture(Collections.emptyMap());
}
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("MGET", new MapGetAllDecoder(Arrays.<Object>asList(keys), 0));
@ -120,4 +119,15 @@ public class RedissonBuckets implements RBuckets {
return commandExecutor.writeAsync(params.get(0).toString(), RedisCommands.MSET, params.toArray());
}
@Override
public RFuture<Long> deleteAsync(String... keys) {
RedissonKeys ks = new RedissonKeys(commandExecutor);
return ks.deleteAsync(keys);
}
@Override
public long delete(String... keys) {
return commandExecutor.get(deleteAsync(keys));
}
}

@ -52,4 +52,12 @@ public interface RBuckets extends RBucketsAsync {
*/
void set(Map<String, ?> buckets);
/**
* Delete multiple objects by name
*
* @param keys - object names
* @return number of removed keys
*/
long delete(String... keys);
}

@ -53,4 +53,12 @@ public interface RBucketsAsync {
*/
RFuture<Void> setAsync(Map<String, ?> buckets);
/**
* Delete multiple objects by name
*
* @param keys - object names
* @return number of removed keys
*/
RFuture<Long> deleteAsync(String... keys);
}

@ -48,6 +48,22 @@ public interface RTransaction {
*/
<V> RBucket<V> getBucket(String name, Codec codec);
/**
* Returns transactional interface for mass operations with Bucket objects.
*
* @return Buckets
*/
RBuckets getBuckets();
/**
* Returns transactional interface for mass operations with Bucket objects
* using provided codec for object.
*
* @param codec - codec for bucket objects
* @return Buckets
*/
RBuckets getBuckets(Codec codec);
/**
* Returns transactional map instance by name.
*

@ -25,6 +25,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -36,6 +37,7 @@ import org.redisson.RedissonTopic;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RBucket;
import org.redisson.api.RBuckets;
import org.redisson.api.RFuture;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap;
@ -65,7 +67,6 @@ import org.redisson.transaction.operation.map.MapOperation;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -78,8 +79,8 @@ public class RedissonTransaction implements RTransaction {
private final AtomicBoolean executed = new AtomicBoolean();
private final TransactionOptions options;
private List<TransactionalOperation> operations = new CopyOnWriteArrayList<TransactionalOperation>();
private Set<String> localCaches = new HashSet<String>();
private List<TransactionalOperation> operations = new CopyOnWriteArrayList<>();
private Set<String> localCaches = new HashSet<>();
private final long startTime = System.currentTimeMillis();
private final String id = generateId();
@ -112,16 +113,30 @@ public class RedissonTransaction implements RTransaction {
public <V> RBucket<V> getBucket(String name) {
checkState();
return new RedissonTransactionalBucket<V>(commandExecutor, name, operations, executed, id);
return new RedissonTransactionalBucket<V>(commandExecutor, options.getTimeout(), name, operations, executed, id);
}
@Override
public <V> RBucket<V> getBucket(String name, Codec codec) {
checkState();
return new RedissonTransactionalBucket<V>(codec, commandExecutor, name, operations, executed, id);
return new RedissonTransactionalBucket<V>(codec, commandExecutor, options.getTimeout(), name, operations, executed, id);
}
@Override
public RBuckets getBuckets() {
checkState();
return new RedissonTransactionalBuckets(commandExecutor, options.getTimeout(), operations, executed, id);
}
@Override
public RBuckets getBuckets(Codec codec) {
checkState();
return new RedissonTransactionalBuckets(codec, commandExecutor, options.getTimeout(), operations, executed, id);
}
@Override
public <V> RSet<V> getSet(String name) {
checkState();
@ -328,7 +343,7 @@ public class RedissonTransaction implements RTransaction {
return Collections.emptyMap();
}
Map<HashKey, HashValue> hashes = new HashMap<HashKey, HashValue>(localCaches.size());
Map<HashKey, HashValue> hashes = new HashMap<>(localCaches.size());
RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (TransactionalOperation transactionalOperation : operations) {
if (localCaches.contains(transactionalOperation.getName())) {
@ -359,7 +374,7 @@ public class RedissonTransaction implements RTransaction {
}
CountDownLatch latch = new CountDownLatch(hashes.size());
List<RTopic> topics = new ArrayList<RTopic>();
List<RTopic> topics = new ArrayList<>();
for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
RTopic topic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE,
commandExecutor, RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX));
@ -418,11 +433,11 @@ public class RedissonTransaction implements RTransaction {
private RFuture<Map<HashKey, HashValue>> disableLocalCacheAsync(String requestId, Set<String> localCaches, List<TransactionalOperation> operations) {
if (localCaches.isEmpty()) {
return RedissonPromise.newSucceededFuture(Collections.<HashKey, HashValue>emptyMap());
return RedissonPromise.newSucceededFuture(Collections.emptyMap());
}
RPromise<Map<HashKey, HashValue>> result = new RedissonPromise<Map<HashKey, HashValue>>();
Map<HashKey, HashValue> hashes = new HashMap<HashKey, HashValue>(localCaches.size());
RPromise<Map<HashKey, HashValue>> result = new RedissonPromise<>();
Map<HashKey, HashValue> hashes = new HashMap<>(localCaches.size());
RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (TransactionalOperation transactionalOperation : operations) {
if (localCaches.contains(transactionalOperation.getName())) {
@ -454,11 +469,11 @@ public class RedissonTransaction implements RTransaction {
}
CountableListener<Map<HashKey, HashValue>> listener =
new CountableListener<Map<HashKey, HashValue>>(result, hashes, hashes.size());
RPromise<Void> subscriptionFuture = new RedissonPromise<Void>();
CountableListener<Void> subscribedFutures = new CountableListener<Void>(subscriptionFuture, null, hashes.size());
new CountableListener<>(result, hashes, hashes.size());
RPromise<Void> subscriptionFuture = new RedissonPromise<>();
CountableListener<Void> subscribedFutures = new CountableListener<>(subscriptionFuture, null, hashes.size());
List<RTopic> topics = new ArrayList<RTopic>();
List<RTopic> topics = new ArrayList<>();
for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String disabledAckName = RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX);
RTopic topic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE,
@ -529,8 +544,7 @@ public class RedissonTransaction implements RTransaction {
protected static String generateId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
PlatformDependent.threadLocalRandom().nextBytes(id);
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}
@ -566,7 +580,7 @@ public class RedissonTransaction implements RTransaction {
transactionalOperation.rollback(executorService);
}
RPromise<Void> result = new RedissonPromise<Void>();
RPromise<Void> result = new RedissonPromise<>();
RFuture<List<?>> future = executorService.executeAsync();
future.onComplete((res, e) -> {
if (e != null) {

@ -0,0 +1,265 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.transaction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.redisson.RedissonBuckets;
import org.redisson.RedissonKeys;
import org.redisson.RedissonMultiLock;
import org.redisson.api.RFuture;
import org.redisson.api.RKeys;
import org.redisson.api.RLock;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.transaction.operation.DeleteOperation;
import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.bucket.BucketSetOperation;
import org.redisson.transaction.operation.bucket.BucketsTrySetOperation;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonTransactionalBuckets extends RedissonBuckets {
static final Object NULL = new Object();
private long timeout;
private final AtomicBoolean executed;
private final List<TransactionalOperation> operations;
private Map<String, Object> state = new HashMap<>();
private final String transactionId;
public RedissonTransactionalBuckets(CommandAsyncExecutor commandExecutor,
long timeout, List<TransactionalOperation> operations, AtomicBoolean executed, String transactionId) {
super(commandExecutor);
this.timeout = timeout;
this.operations = operations;
this.executed = executed;
this.transactionId = transactionId;
}
public RedissonTransactionalBuckets(Codec codec, CommandAsyncExecutor commandExecutor,
long timeout, List<TransactionalOperation> operations, AtomicBoolean executed, String transactionId) {
super(codec, commandExecutor);
this.timeout = timeout;
this.operations = operations;
this.executed = executed;
this.transactionId = transactionId;
}
@Override
public <V> RFuture<Map<String, V>> getAsync(String... keys) {
checkState();
if (keys.length == 0) {
return RedissonPromise.newSucceededFuture(Collections.emptyMap());
}
Set<String> keysToLoad = new HashSet<>();
Map<String, V> map = new LinkedHashMap<>();
for (String key : keys) {
Object value = state.get(key);
if (value != null) {
if (value != NULL) {
map.put(key, (V) value);
}
} else {
keysToLoad.add(key);
}
}
if (keysToLoad.isEmpty()) {
return RedissonPromise.newSucceededFuture(map);
}
RPromise<Map<String, V>> result = new RedissonPromise<>();
super.getAsync(keysToLoad.toArray(new String[keysToLoad.size()])).onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
map.putAll((Map<String, V>) res);
result.trySuccess(map);
});
return result;
}
@Override
public RFuture<Void> setAsync(Map<String, ?> buckets) {
checkState();
RPromise<Void> result = new RedissonPromise<>();
executeLocked(result, () -> {
for (Entry<String, ?> entry : buckets.entrySet()) {
operations.add(new BucketSetOperation<>(entry.getKey(), getLockName(entry.getKey()), codec, entry.getValue(), transactionId));
if (entry.getValue() == null) {
state.put(entry.getKey(), NULL);
} else {
state.put(entry.getKey(), entry.getValue());
}
}
result.trySuccess(null);
}, buckets.keySet());
return result;
}
@Override
public RFuture<Long> deleteAsync(String... keys) {
checkState();
RPromise<Long> result = new RedissonPromise<>();
executeLocked(result, new Runnable() {
@Override
public void run() {
AtomicLong counter = new AtomicLong();
AtomicLong executions = new AtomicLong(keys.length);
for (String key : keys) {
Object st = state.get(key);
if (st != null) {
operations.add(new DeleteOperation(key, getLockName(key), transactionId));
if (st != NULL) {
state.put(key, NULL);
counter.incrementAndGet();
}
if (executions.decrementAndGet() == 0) {
result.trySuccess(counter.get());
}
continue;
}
RedissonKeys ks = new RedissonKeys(commandExecutor);
ks.countExistsAsync(key).onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
if (res > 0) {
operations.add(new DeleteOperation(key, getLockName(key), transactionId));
state.put(key, NULL);
counter.incrementAndGet();
}
if (executions.decrementAndGet() == 0) {
result.trySuccess(counter.get());
}
});
}
}
}, Arrays.asList(keys));
return result;
}
@Override
public RFuture<Boolean> trySetAsync(Map<String, ?> buckets) {
checkState();
RPromise<Boolean> result = new RedissonPromise<>();
executeLocked(result, () -> {
Set<String> keysToSet = new HashSet<>();
for (String key : buckets.keySet()) {
Object value = state.get(key);
if (value != null) {
if (value != NULL) {
operations.add(new BucketsTrySetOperation(codec, (Map<String, Object>) buckets, transactionId));
result.trySuccess(false);
return;
}
} else {
keysToSet.add(key);
}
}
if (keysToSet.isEmpty()) {
operations.add(new BucketsTrySetOperation(codec, (Map<String, Object>) buckets, transactionId));
state.putAll(buckets);
result.trySuccess(true);
return;
}
RKeys keys = new RedissonKeys(commandExecutor);
String[] ks = keysToSet.toArray(new String[keysToSet.size()]);
keys.countExistsAsync(ks).onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
operations.add(new BucketsTrySetOperation(codec, (Map<String, Object>) buckets, transactionId));
if (res == 0) {
state.putAll(buckets);
result.trySuccess(true);
} else {
result.trySuccess(false);
}
});
}, buckets.keySet());
return result;
}
protected <R> void executeLocked(RPromise<R> promise, Runnable runnable, Collection<String> keys) {
List<RLock> locks = new ArrayList<>(keys.size());
for (String key : keys) {
RLock lock = getLock(key);
locks.add(lock);
}
RedissonMultiLock multiLock = new RedissonMultiLock(locks.toArray(new RLock[locks.size()]));
long threadId = Thread.currentThread().getId();
multiLock.lockAsync(timeout, TimeUnit.MILLISECONDS).onComplete((res, e) -> {
if (e == null) {
runnable.run();
} else {
multiLock.unlockAsync(threadId);
promise.tryFailure(e);
}
});
}
private RLock getLock(String name) {
return new RedissonTransactionalLock(commandExecutor, getLockName(name), transactionId);
}
private String getLockName(String name) {
return name + ":transaction_lock";
}
protected void checkState() {
if (executed.get()) {
throw new IllegalStateException("Unable to execute operation. Transaction is in finished state!");
}
}
}

@ -0,0 +1,72 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.transaction.operation.bucket;
import java.util.Map;
import org.redisson.RedissonBuckets;
import org.redisson.api.RBuckets;
import org.redisson.api.RLock;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.RedissonTransactionalLock;
import org.redisson.transaction.operation.TransactionalOperation;
/**
*
* @author Nikita Koksharov
*
*/
public class BucketsTrySetOperation extends TransactionalOperation {
private String transactionId;
private Map<String, Object> values;
public BucketsTrySetOperation(Codec codec, Map<String, Object> values, String transactionId) {
super(null, codec);
this.values = values;
this.transactionId = transactionId;
}
@Override
public void commit(CommandAsyncExecutor commandExecutor) {
RBuckets bucket = new RedissonBuckets(codec, commandExecutor);
bucket.trySetAsync(values);
unlock(commandExecutor);
}
protected void unlock(CommandAsyncExecutor commandExecutor) {
for (String key : values.keySet()) {
RLock lock = new RedissonTransactionalLock(commandExecutor, getLockName(key), transactionId);
lock.unlockAsync();
}
}
@Override
public void rollback(CommandAsyncExecutor commandExecutor) {
unlock(commandExecutor);
}
public Map<String, Object> getValues() {
return values;
}
private String getLockName(String name) {
return name + ":transaction_lock";
}
}

@ -0,0 +1,84 @@
package org.redisson.transaction;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.LinkedHashMap;
import java.util.Map;
import org.junit.Test;
import org.redisson.BaseTest;
import org.redisson.api.RBucket;
import org.redisson.api.RBuckets;
import org.redisson.api.RTransaction;
import org.redisson.api.TransactionOptions;
public class RedissonTransactionalBucketsTest extends BaseTest {
@Test
public void testGet() {
RBucket<String> b = redisson.getBucket("test");
b.set("123");
RTransaction transaction = redisson.createTransaction(TransactionOptions.defaults());
RBuckets buckets = transaction.getBuckets();
assertThat(buckets.get("test").get("test")).isEqualTo("123");
transaction.commit();
assertThat(redisson.getKeys().count()).isEqualTo(1);
assertThat(b.get()).isEqualTo("123");
}
@Test
public void testSet() {
RBucket<String> b1 = redisson.getBucket("test1");
b1.set("1");
RBucket<String> b2 = redisson.getBucket("test2");
b2.set("2");
RTransaction transaction = redisson.createTransaction(TransactionOptions.defaults());
RBuckets buckets = transaction.getBuckets();
Map<String, Object> bbs = new LinkedHashMap<>();
bbs.put("test1", "11");
bbs.put("test2", "22");
buckets.set(bbs);
Map<String, Object> newBuckets = buckets.get("test1", "test2");
assertThat(newBuckets).isEqualTo(bbs);
transaction.commit();
assertThat(redisson.getBuckets().get("test1", "test2")).isEqualTo(bbs);
assertThat(redisson.getKeys().count()).isEqualTo(2);
}
@Test
public void testTrySet() {
redisson.getBucket("test1").set("1");
redisson.getBucket("test2").set("2");
RTransaction transaction = redisson.createTransaction(TransactionOptions.defaults());
RBuckets buckets = transaction.getBuckets();
Map<String, Object> bbs1 = new LinkedHashMap<>();
bbs1.put("test1", "10");
bbs1.put("test2", "20");
assertThat(buckets.trySet(bbs1)).isFalse();
assertThat(buckets.delete("test1", "test2")).isEqualTo(2);
Map<String, Object> bbs2 = new LinkedHashMap<>();
bbs2.put("test1", "11");
bbs2.put("test2", "22");
assertThat(buckets.trySet(bbs2)).isTrue();
Map<String, Object> bbs3 = new LinkedHashMap<>();
bbs3.put("test1", "13");
bbs3.put("test2", "23");
assertThat(buckets.trySet(bbs3)).isFalse();
System.out.println("commit " + Thread.currentThread().getId());
transaction.commit();
redisson.getKeys().getKeys().forEach(x -> System.out.println(x));
// assertThat(redisson.getBuckets().get("test1", "test2")).isEqualTo(bbs2);
assertThat(redisson.getKeys().count()).isEqualTo(2);
}
}
Loading…
Cancel
Save