tests fixed
parent
e04daf5715
commit
8b86eb1f5e
@ -0,0 +1,38 @@
|
||||
/**
|
||||
* Copyright (c) 2013-2019 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.spring.transaction;
|
||||
|
||||
import org.redisson.api.RTransactionReactive;
|
||||
import org.springframework.transaction.support.ResourceHolderSupport;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
*/
|
||||
public class ReactiveRedissonResourceHolder extends ResourceHolderSupport {
|
||||
|
||||
private RTransactionReactive transaction;
|
||||
|
||||
public RTransactionReactive getTransaction() {
|
||||
return transaction;
|
||||
}
|
||||
|
||||
public void setTransaction(RTransactionReactive transaction) {
|
||||
this.transaction = transaction;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,138 @@
|
||||
/**
|
||||
* Copyright (c) 2013-2019 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.spring.transaction;
|
||||
|
||||
import org.redisson.api.RTransactionReactive;
|
||||
import org.redisson.api.RedissonReactiveClient;
|
||||
import org.redisson.api.TransactionOptions;
|
||||
import org.springframework.transaction.NoTransactionException;
|
||||
import org.springframework.transaction.TransactionDefinition;
|
||||
import org.springframework.transaction.TransactionException;
|
||||
import org.springframework.transaction.TransactionSystemException;
|
||||
import org.springframework.transaction.reactive.AbstractReactiveTransactionManager;
|
||||
import org.springframework.transaction.reactive.GenericReactiveTransaction;
|
||||
import org.springframework.transaction.reactive.TransactionSynchronizationManager;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
*/
|
||||
public class ReactiveRedissonTransactionManager extends AbstractReactiveTransactionManager {
|
||||
|
||||
private final RedissonReactiveClient redissonClient;
|
||||
|
||||
public ReactiveRedissonTransactionManager(RedissonReactiveClient redissonClient) {
|
||||
this.redissonClient = redissonClient;
|
||||
}
|
||||
|
||||
public Mono<RTransactionReactive> getCurrentTransaction() {
|
||||
return TransactionSynchronizationManager.forCurrentTransaction().map(manager -> {
|
||||
ReactiveRedissonResourceHolder holder = (ReactiveRedissonResourceHolder) manager.getResource(redissonClient);
|
||||
if (holder == null) {
|
||||
throw new NoTransactionException("No transaction is available for the current thread");
|
||||
} else {
|
||||
return holder.getTransaction();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected Object doGetTransaction(TransactionSynchronizationManager synchronizationManager) throws TransactionException {
|
||||
ReactiveRedissonTransactionObject transactionObject = new ReactiveRedissonTransactionObject();
|
||||
|
||||
ReactiveRedissonResourceHolder holder = (ReactiveRedissonResourceHolder) synchronizationManager.getResource(redissonClient);
|
||||
transactionObject.setResourceHolder(holder);
|
||||
return transactionObject;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationManager, Object transaction, TransactionDefinition definition) throws TransactionException {
|
||||
ReactiveRedissonTransactionObject tObject = (ReactiveRedissonTransactionObject) transaction;
|
||||
|
||||
TransactionOptions options = TransactionOptions.defaults();
|
||||
if (definition.getTimeout() != TransactionDefinition.TIMEOUT_DEFAULT) {
|
||||
options.timeout(definition.getTimeout(), TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
RTransactionReactive trans = redissonClient.createTransaction(options);
|
||||
ReactiveRedissonResourceHolder holder = new ReactiveRedissonResourceHolder();
|
||||
holder.setTransaction(trans);
|
||||
tObject.setResourceHolder(holder);
|
||||
synchronizationManager.bindResource(redissonClient, holder);
|
||||
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> doCommit(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) throws TransactionException {
|
||||
ReactiveRedissonTransactionObject to = (ReactiveRedissonTransactionObject) status.getTransaction();
|
||||
return to.getResourceHolder().getTransaction().commit().onErrorMap(ex -> {
|
||||
return new TransactionSystemException("Unable to commit transaction " + to.getResourceHolder().getTransaction(), ex);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> doRollback(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) throws TransactionException {
|
||||
ReactiveRedissonTransactionObject to = (ReactiveRedissonTransactionObject) status.getTransaction();
|
||||
return to.getResourceHolder().getTransaction().rollback().onErrorMap(ex -> {
|
||||
return new TransactionSystemException("Unable to rollback transaction", ex);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Object> doSuspend(TransactionSynchronizationManager synchronizationManager, Object transaction) throws TransactionException {
|
||||
return Mono.fromSupplier(() -> {
|
||||
ReactiveRedissonTransactionObject to = (ReactiveRedissonTransactionObject) transaction;
|
||||
to.setResourceHolder(null);
|
||||
return synchronizationManager.unbindResource(redissonClient);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> doResume(TransactionSynchronizationManager synchronizationManager, Object transaction, Object suspendedResources) throws TransactionException {
|
||||
return Mono.fromRunnable(() -> {
|
||||
synchronizationManager.bindResource(redissonClient, suspendedResources);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> doSetRollbackOnly(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) throws TransactionException {
|
||||
return Mono.fromRunnable(() -> {
|
||||
ReactiveRedissonTransactionObject to = (ReactiveRedissonTransactionObject) status.getTransaction();
|
||||
to.getResourceHolder().setRollbackOnly();
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> doCleanupAfterCompletion(TransactionSynchronizationManager synchronizationManager, Object transaction) {
|
||||
return Mono.fromRunnable(() -> {
|
||||
synchronizationManager.unbindResource(redissonClient);
|
||||
ReactiveRedissonTransactionObject to = (ReactiveRedissonTransactionObject) transaction;
|
||||
to.getResourceHolder().setTransaction(null);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isExistingTransaction(Object transaction) throws TransactionException {
|
||||
ReactiveRedissonTransactionObject transactionObject = (ReactiveRedissonTransactionObject) transaction;
|
||||
return transactionObject.getResourceHolder() != null;
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
/**
|
||||
* Copyright (c) 2013-2019 Nikita Koksharov
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.spring.transaction;
|
||||
|
||||
import org.springframework.transaction.support.SmartTransactionObject;
|
||||
|
||||
/**
|
||||
* @author Nikita Koksharov
|
||||
*/
|
||||
public class ReactiveRedissonTransactionObject implements SmartTransactionObject {
|
||||
|
||||
private ReactiveRedissonResourceHolder resourceHolder;
|
||||
|
||||
public ReactiveRedissonResourceHolder getResourceHolder() {
|
||||
return resourceHolder;
|
||||
}
|
||||
|
||||
public void setResourceHolder(ReactiveRedissonResourceHolder resourceHolder) {
|
||||
this.resourceHolder = resourceHolder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRollbackOnly() {
|
||||
if (resourceHolder != null) {
|
||||
return resourceHolder.isRollbackOnly();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
// skip
|
||||
}
|
||||
}
|
@ -0,0 +1,97 @@
|
||||
package org.redisson.spring.transaction;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.redisson.api.RMapReactive;
|
||||
import org.redisson.api.RTransactionReactive;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.transaction.NoTransactionException;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class ReactiveTransactionalBean {
|
||||
|
||||
@Autowired
|
||||
private ReactiveRedissonTransactionManager transactionManager;
|
||||
|
||||
@Autowired
|
||||
private ReactiveTransactionalBean2 transactionalBean2;
|
||||
|
||||
@Transactional
|
||||
public Mono<Void> testTransactionIsNotNull() {
|
||||
Mono<RTransactionReactive> reactiveTransaction = transactionManager.getCurrentTransaction();
|
||||
return reactiveTransaction.map(t -> {
|
||||
if (t == null) {
|
||||
throw new RuntimeException("transaction can't be null");
|
||||
}
|
||||
return t;
|
||||
}).then();
|
||||
}
|
||||
|
||||
public Mono<Void> testNoTransaction() {
|
||||
Mono<RTransactionReactive> reactiveTransaction = transactionManager.getCurrentTransaction();
|
||||
return reactiveTransaction.onErrorResume(e -> {
|
||||
if (e instanceof NoTransactionException) {
|
||||
return Mono.empty();
|
||||
}
|
||||
return Mono.error(e);
|
||||
}).then();
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public Mono<Void> testCommit() {
|
||||
Mono<RTransactionReactive> transaction = transactionManager.getCurrentTransaction();
|
||||
return transaction.flatMap(t -> {
|
||||
RMapReactive<String, String> map = t.getMap("test1");
|
||||
return map.put("1", "2");
|
||||
}).then();
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public Mono<Void> testRollback() {
|
||||
Mono<RTransactionReactive> transaction = transactionManager.getCurrentTransaction();
|
||||
return transaction.flatMap(t -> {
|
||||
RMapReactive<String, String> map = t.getMap("test2");
|
||||
return map.put("1", "2");
|
||||
}).doOnSuccess(v -> {
|
||||
throw new IllegalStateException();
|
||||
}).then();
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public Mono<Void> testCommitAfterRollback() {
|
||||
Mono<RTransactionReactive> transaction = transactionManager.getCurrentTransaction();
|
||||
return transaction.flatMap(t -> {
|
||||
RMapReactive<String, String> map = t.getMap("test2");
|
||||
return map.put("1", "2");
|
||||
}).then();
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public Mono<Void> testNestedNewTransaction() {
|
||||
Mono<RTransactionReactive> transaction = transactionManager.getCurrentTransaction();
|
||||
return transaction.flatMap(t -> {
|
||||
RMapReactive<String, String> map = t.getMap("tr1");
|
||||
return map.put("1", "0");
|
||||
}).flatMap(v -> {
|
||||
return transactionalBean2.testInNewTransaction();
|
||||
});
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public Mono<Void> testPropagationRequired() {
|
||||
return transactionalBean2.testPropagationRequired();
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public Mono<Void> testPropagationRequiredWithException() {
|
||||
Mono<RTransactionReactive> transaction = transactionManager.getCurrentTransaction();
|
||||
return transaction.flatMap(t -> {
|
||||
RMapReactive<String, String> map = t.getMap("tr4");
|
||||
return map.put("1", "0");
|
||||
}).then(transactionalBean2.testPropagationRequiredWithException());
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,41 @@
|
||||
package org.redisson.spring.transaction;
|
||||
|
||||
import org.redisson.api.RTransactionReactive;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.transaction.annotation.Propagation;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class ReactiveTransactionalBean2 {
|
||||
|
||||
@Autowired
|
||||
private ReactiveRedissonTransactionManager transactionManager;
|
||||
|
||||
@Transactional(propagation = Propagation.REQUIRES_NEW)
|
||||
public Mono<Void> testInNewTransaction() {
|
||||
Mono<RTransactionReactive> transaction = transactionManager.getCurrentTransaction();
|
||||
return transaction.flatMap(t -> {
|
||||
return t.getMap("tr2").put("2", "4").then();
|
||||
});
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public Mono<Void> testPropagationRequired() {
|
||||
Mono<RTransactionReactive> transaction = transactionManager.getCurrentTransaction();
|
||||
return transaction.flatMap(t -> {
|
||||
return t.getMap("tr3").put("2", "4").then();
|
||||
});
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public Mono<Void> testPropagationRequiredWithException() {
|
||||
Mono<RTransactionReactive> transaction = transactionManager.getCurrentTransaction();
|
||||
return transaction.flatMap(t -> {
|
||||
return t.getMap("tr5").put("2", "4");
|
||||
}).doOnSuccess(v -> {
|
||||
throw new IllegalStateException();
|
||||
}).then();
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,45 @@
|
||||
package org.redisson.spring.transaction;
|
||||
|
||||
import org.redisson.BaseTest;
|
||||
import org.redisson.Redisson;
|
||||
import org.redisson.RedissonReactive;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.redisson.api.RedissonReactiveClient;
|
||||
import org.redisson.transaction.operation.TransactionalOperation;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.transaction.ReactiveTransactionManager;
|
||||
import org.springframework.transaction.annotation.EnableTransactionManagement;
|
||||
import org.springframework.transaction.reactive.TransactionalOperator;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
|
||||
@Configuration
|
||||
@EnableTransactionManagement
|
||||
public class RedissonReactiveTransactionContextConfig {
|
||||
|
||||
@Bean
|
||||
public ReactiveTransactionalBean2 transactionalBean2() {
|
||||
return new ReactiveTransactionalBean2();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ReactiveTransactionalBean transactionBean() {
|
||||
return new ReactiveTransactionalBean();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ReactiveRedissonTransactionManager transactionManager(RedissonReactiveClient redisson) {
|
||||
return new ReactiveRedissonTransactionManager(redisson);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RedissonReactiveClient redisson() {
|
||||
return Redisson.createReactive(BaseTest.createConfig());
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
redisson().shutdown();
|
||||
}
|
||||
}
|
@ -0,0 +1,86 @@
|
||||
package org.redisson.spring.transaction;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.redisson.RedisRunner;
|
||||
import org.redisson.RedissonReactive;
|
||||
import org.redisson.api.RMap;
|
||||
import org.redisson.api.RMapReactive;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.redisson.api.RedissonReactiveClient;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@ContextConfiguration(classes = RedissonReactiveTransactionContextConfig.class)
|
||||
public class RedissonReactiveTransactionManagerTest {
|
||||
|
||||
@Autowired
|
||||
private RedissonReactiveClient redisson;
|
||||
|
||||
@Autowired
|
||||
private ReactiveTransactionalBean transactionalBean;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws IOException, InterruptedException {
|
||||
RedisRunner.startDefaultRedisServerInstance();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws IOException, InterruptedException {
|
||||
RedisRunner.shutDownDefaultRedisServerInstance();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
transactionalBean.testTransactionIsNotNull().block();
|
||||
transactionalBean.testNoTransaction().block();
|
||||
|
||||
transactionalBean.testCommit().block();
|
||||
RMapReactive<String, String> map1 = redisson.getMap("test1");
|
||||
assertThat(map1.get("1").block()).isEqualTo("2");
|
||||
|
||||
try {
|
||||
transactionalBean.testRollback().block();
|
||||
Assert.fail();
|
||||
} catch (IllegalStateException e) {
|
||||
// skip
|
||||
}
|
||||
RMapReactive<String, String> map2 = redisson.getMap("test2");
|
||||
assertThat(map2.get("1").block()).isNull();
|
||||
|
||||
transactionalBean.testCommitAfterRollback().block();
|
||||
assertThat(map2.get("1").block()).isEqualTo("2");
|
||||
|
||||
RMapReactive<String, String> mapTr1 = redisson.getMap("tr1");
|
||||
assertThat(mapTr1.get("1").block()).isNull();
|
||||
RMapReactive<String, String> mapTr2 = redisson.getMap("tr2");
|
||||
assertThat(mapTr2.get("2").block()).isNull();
|
||||
|
||||
transactionalBean.testPropagationRequired().block();
|
||||
RMapReactive<String, String> mapTr3 = redisson.getMap("tr3");
|
||||
assertThat(mapTr3.get("2").block()).isEqualTo("4");
|
||||
|
||||
try {
|
||||
transactionalBean.testPropagationRequiredWithException().block();
|
||||
Assert.fail();
|
||||
} catch (IllegalStateException e) {
|
||||
// skip
|
||||
}
|
||||
RMapReactive<String, String> mapTr4 = redisson.getMap("tr4");
|
||||
assertThat(mapTr4.get("1").block()).isNull();
|
||||
RMapReactive<String, String> mapTr5 = redisson.getMap("tr5");
|
||||
assertThat(mapTr5.get("2").block()).isNull();
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
Loading…
Reference in New Issue