|
|
|
@ -137,52 +137,53 @@ public class RedissonTransactionalBuckets extends RedissonBuckets {
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Long> deleteAsync(String... keys) {
|
|
|
|
|
checkState();
|
|
|
|
|
RPromise<Long> result = new RedissonPromise<>();
|
|
|
|
|
long threadId = Thread.currentThread().getId();
|
|
|
|
|
executeLocked(result, new Runnable() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
|
AtomicLong counter = new AtomicLong();
|
|
|
|
|
AtomicLong executions = new AtomicLong(keys.length);
|
|
|
|
|
for (String key : keys) {
|
|
|
|
|
Object st = state.get(key);
|
|
|
|
|
if (st != null) {
|
|
|
|
|
operations.add(new DeleteOperation(key, getLockName(key), transactionId, threadId));
|
|
|
|
|
if (st != NULL) {
|
|
|
|
|
state.put(key, NULL);
|
|
|
|
|
counter.incrementAndGet();
|
|
|
|
|
}
|
|
|
|
|
if (executions.decrementAndGet() == 0) {
|
|
|
|
|
result.trySuccess(counter.get());
|
|
|
|
|
}
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RedissonKeys ks = new RedissonKeys(commandExecutor);
|
|
|
|
|
ks.countExistsAsync(key).onComplete((res, e) -> {
|
|
|
|
|
if (e != null) {
|
|
|
|
|
result.tryFailure(e);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (res > 0) {
|
|
|
|
|
operations.add(new DeleteOperation(key, getLockName(key), transactionId, threadId));
|
|
|
|
|
state.put(key, NULL);
|
|
|
|
|
counter.incrementAndGet();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (executions.decrementAndGet() == 0) {
|
|
|
|
|
result.trySuccess(counter.get());
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}, Arrays.asList(keys));
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
// Add RKeys.deleteAsync support
|
|
|
|
|
//
|
|
|
|
|
// public RFuture<Long> deleteAsync(String... keys) {
|
|
|
|
|
// checkState();
|
|
|
|
|
// RPromise<Long> result = new RedissonPromise<>();
|
|
|
|
|
// long threadId = Thread.currentThread().getId();
|
|
|
|
|
// executeLocked(result, new Runnable() {
|
|
|
|
|
// @Override
|
|
|
|
|
// public void run() {
|
|
|
|
|
// AtomicLong counter = new AtomicLong();
|
|
|
|
|
// AtomicLong executions = new AtomicLong(keys.length);
|
|
|
|
|
// for (String key : keys) {
|
|
|
|
|
// Object st = state.get(key);
|
|
|
|
|
// if (st != null) {
|
|
|
|
|
// operations.add(new DeleteOperation(key, getLockName(key), transactionId, threadId));
|
|
|
|
|
// if (st != NULL) {
|
|
|
|
|
// state.put(key, NULL);
|
|
|
|
|
// counter.incrementAndGet();
|
|
|
|
|
// }
|
|
|
|
|
// if (executions.decrementAndGet() == 0) {
|
|
|
|
|
// result.trySuccess(counter.get());
|
|
|
|
|
// }
|
|
|
|
|
// continue;
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// RedissonKeys ks = new RedissonKeys(commandExecutor);
|
|
|
|
|
// ks.countExistsAsync(key).onComplete((res, e) -> {
|
|
|
|
|
// if (e != null) {
|
|
|
|
|
// result.tryFailure(e);
|
|
|
|
|
// return;
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// if (res > 0) {
|
|
|
|
|
// operations.add(new DeleteOperation(key, getLockName(key), transactionId, threadId));
|
|
|
|
|
// state.put(key, NULL);
|
|
|
|
|
// counter.incrementAndGet();
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// if (executions.decrementAndGet() == 0) {
|
|
|
|
|
// result.trySuccess(counter.get());
|
|
|
|
|
// }
|
|
|
|
|
// });
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
// }, Arrays.asList(keys));
|
|
|
|
|
// return result;
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Boolean> trySetAsync(Map<String, ?> buckets) {
|
|
|
|
|