Merge branch 'master' of github.com:redisson/redisson

pull/6394/head
Nikita Koksharov 3 weeks ago
commit 31ed55d391

@ -15,6 +15,7 @@
*/ */
package org.redisson; package org.redisson;
import org.redisson.api.ObjectListener;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.api.RLockAsync; import org.redisson.api.RLockAsync;
@ -514,4 +515,23 @@ public class RedissonMultiLock implements RLock {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public int addListener(ObjectListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void removeListener(int listenerId) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Integer> addListenerAsync(ObjectListener listener) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
throw new UnsupportedOperationException();
}
} }

@ -264,7 +264,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
} }
int freeWorkers = entry.getFreeWorkers().decrementAndGet(); int freeWorkers = entry.getFreeWorkers().decrementAndGet();
if (freeWorkers > 0 && requestId != null) { if (freeWorkers > 0) {
subscribe(remoteInterface, requestQueue, executor, bean); subscribe(remoteInterface, requestQueue, executor, bean);
} }

@ -25,7 +25,7 @@ import java.util.concurrent.locks.Lock;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public interface RLock extends Lock, RLockAsync { public interface RLock extends Lock, RLockAsync, RObservable {
/** /**
* Returns name of object * Returns name of object

@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public interface RLockAsync { public interface RLockAsync extends RObservableAsync {
/** /**
* Unlocks the lock independently of its state * Unlocks the lock independently of its state

@ -25,7 +25,7 @@ import reactor.core.publisher.Mono;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public interface RLockReactive { public interface RLockReactive extends RObservableReactive {
/** /**
* Returns name of object * Returns name of object

@ -26,7 +26,7 @@ import io.reactivex.rxjava3.core.Single;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public interface RLockRx { public interface RLockRx extends RObservableRx {
/** /**
* Returns name of object * Returns name of object

@ -0,0 +1,43 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
/**
* interface for Lock object listener
*
* @author seakider
*
*/
public interface RObservable {
/**
* Adds object event listener
*
* @see org.redisson.api.ExpiredObjectListener
* @see org.redisson.api.DeletedObjectListener
*
* @param listener - object event listener
* @return listener id
*/
int addListener(ObjectListener listener);
/**
* Removes object event listener
*
* @param listenerId - listener id
*/
void removeListener(int listenerId);
}

@ -0,0 +1,42 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
/**
* Async interface for Lock object listener
*
* @author seakider
*
*/
public interface RObservableAsync {
/**
* Adds object event listener
*
* @see org.redisson.api.ExpiredObjectListener
* @see org.redisson.api.DeletedObjectListener
*
* @param listener - object event listener
* @return listener id
*/
RFuture<Integer> addListenerAsync(ObjectListener listener);
/**
* Removes object event listener
*
* @param listenerId - listener id
*/
RFuture<Void> removeListenerAsync(int listenerId);
}

@ -0,0 +1,45 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import reactor.core.publisher.Mono;
/**
* Base Reactive interface for Lock object listener
*
* @author seakider
*
*/
public interface RObservableReactive {
/**
* Adds object event listener
*
* @see org.redisson.api.ExpiredObjectListener
* @see org.redisson.api.DeletedObjectListener
*
* @param listener - object event listener
* @return listener id
*/
Mono<Integer> addListener(ObjectListener listener);
/**
* Removes object event listener
*
* @param listenerId - listener id
* @return void
*/
Mono<Void> removeListener(int listenerId);
}

@ -0,0 +1,46 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Single;
/**
* Base RxJava2 interface for Lock object listener
*
* @author seakider
*
*/
public interface RObservableRx {
/**
* Adds object event listener
*
* @see org.redisson.api.ExpiredObjectListener
* @see org.redisson.api.DeletedObjectListener
*
* @param listener - object event listener
* @return listener id
*/
Single<Integer> addListener(ObjectListener listener);
/**
* Removes object event listener
*
* @param listenerId - listener id
*/
Completable removeListener(int listenerId);
}

@ -1,9 +1,14 @@
package org.redisson; package org.redisson;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redisson.api.DeletedObjectListener;
import org.redisson.api.ExpiredObjectListener;
import org.redisson.api.RLockReactive; import org.redisson.api.RLockReactive;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
public class RedissonLockReactiveTest extends BaseReactiveTest { public class RedissonLockReactiveTest extends BaseReactiveTest {
@ -42,4 +47,55 @@ public class RedissonLockReactiveTest extends BaseReactiveTest {
assertThat(sync(lock1.isHeldByThread(threadId1))).isFalse(); assertThat(sync(lock1.isHeldByThread(threadId1))).isFalse();
assertThat(sync(lock2.isHeldByThread(threadId2))).isFalse(); assertThat(sync(lock2.isHeldByThread(threadId2))).isFalse();
} }
@Test
public void testLockReactiveListener() {
testWithParams(redisson -> {
RLockReactive lock1 = redisson.reactive().getLock("lock1");
CountDownLatch latch = new CountDownLatch(4);
Mono<Integer> listener1 = lock1.addListener(new ExpiredObjectListener() {
@Override
public void onExpired(String name) {
latch.countDown();
}
});
Mono<Integer> listener2 = lock1.addListener(new DeletedObjectListener() {
@Override
public void onDeleted(String name) {
latch.countDown();
}
});
int listenerId1 = sync(listener1);
int listenerId2 = sync(listener2);
sync(lock1.lock(5, TimeUnit.SECONDS));
sync(lock1.unlock());
assertThat(latch.getCount()).isEqualTo(3);
try {
sync(lock1.lock(5, TimeUnit.SECONDS));
Thread.sleep(6000);
assertThat(latch.getCount()).isEqualTo(2);
sync(lock1.removeListener(listenerId1));
sync(lock1.lock(5, TimeUnit.SECONDS));
Thread.sleep(5100);
assertThat(latch.getCount()).isEqualTo(2);
sync(lock1.lock(5, TimeUnit.SECONDS));
sync(lock1.unlock());
assertThat(sync(lock1.isLocked())).isFalse();
assertThat(latch.getCount()).isEqualTo(1);
sync(lock1.removeListener(listenerId2));
sync(lock1.lock(5, TimeUnit.SECONDS));
sync(lock1.unlock());
assertThat(latch.getCount()).isEqualTo(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, NOTIFY_KEYSPACE_EVENTS, "Egx");
}
} }

@ -2,9 +2,14 @@ package org.redisson;
import io.reactivex.rxjava3.core.Single; import io.reactivex.rxjava3.core.Single;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redisson.api.DeletedObjectListener;
import org.redisson.api.ExpiredObjectListener;
import org.redisson.api.RLockRx; import org.redisson.api.RLockRx;
import org.redisson.rx.BaseRxTest; import org.redisson.rx.BaseRxTest;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
public class RedissonLockRxTest extends BaseRxTest { public class RedissonLockRxTest extends BaseRxTest {
@ -43,4 +48,55 @@ public class RedissonLockRxTest extends BaseRxTest {
assertThat(sync(lock1.isHeldByThread(threadId1))).isFalse(); assertThat(sync(lock1.isHeldByThread(threadId1))).isFalse();
assertThat(sync(lock2.isHeldByThread(threadId2))).isFalse(); assertThat(sync(lock2.isHeldByThread(threadId2))).isFalse();
} }
@Test
public void testLockRxListener() {
testWithParams(redisson -> {
RLockRx lock1 = redisson.rxJava().getLock("lock1");
CountDownLatch latch = new CountDownLatch(4);
Single<Integer> listener1 = lock1.addListener(new ExpiredObjectListener() {
@Override
public void onExpired(String name) {
latch.countDown();
}
});
Single<Integer> listener2 = lock1.addListener(new DeletedObjectListener() {
@Override
public void onDeleted(String name) {
latch.countDown();
}
});
int listenerId1 = sync(listener1);
int listenerId2 = sync(listener2);
sync(lock1.lock(5, TimeUnit.SECONDS));
sync(lock1.unlock());
assertThat(latch.getCount()).isEqualTo(3);
try {
sync(lock1.lock(5, TimeUnit.SECONDS));
Thread.sleep(6000);
assertThat(latch.getCount()).isEqualTo(2);
sync(lock1.removeListener(listenerId1));
sync(lock1.lock(5, TimeUnit.SECONDS));
Thread.sleep(5100);
assertThat(latch.getCount()).isEqualTo(2);
sync(lock1.lock(5, TimeUnit.SECONDS));
sync(lock1.unlock());
assertThat(sync(lock1.isLocked())).isFalse();
assertThat(latch.getCount()).isEqualTo(1);
sync(lock1.removeListener(listenerId2));
sync(lock1.lock(5, TimeUnit.SECONDS));
sync(lock1.unlock());
assertThat(latch.getCount()).isEqualTo(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, NOTIFY_KEYSPACE_EVENTS, "Egx");
}
} }

@ -2,12 +2,13 @@ package org.redisson;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redisson.api.DeletedObjectListener;
import org.redisson.api.ExpiredObjectListener;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.client.*; import org.redisson.client.*;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.GenericContainer;
import java.util.List; import java.util.List;
@ -534,4 +535,54 @@ public class RedissonLockTest extends BaseConcurrentTest {
Assertions.assertEquals(iterations, lockedCounter.get()); Assertions.assertEquals(iterations, lockedCounter.get());
} }
@Test
public void testLockListener() {
testWithParams(redisson -> {
RLock lock1 = redisson.getLock("lock1");
CountDownLatch latch = new CountDownLatch(4);
int listenerId1 = lock1.addListener(new ExpiredObjectListener() {
@Override
public void onExpired(String name) {
latch.countDown();
}
});
int listenerId2 = lock1.addListener(new DeletedObjectListener() {
@Override
public void onDeleted(String name) {
latch.countDown();
}
});
lock1.lock(5, TimeUnit.SECONDS);
lock1.unlock();
assertThat(lock1.isLocked()).isFalse();
assertThat(latch.getCount()).isEqualTo(3);
try {
lock1.lock(5, TimeUnit.SECONDS);
Thread.sleep(6000);
assertThat(latch.getCount()).isEqualTo(2);
lock1.removeListener(listenerId1);
lock1.lock(5, TimeUnit.SECONDS);
Thread.sleep(5100);
assertThat(latch.getCount()).isEqualTo(2);
lock1.lock(5, TimeUnit.SECONDS);
lock1.unlock();
assertThat(lock1.isLocked()).isFalse();
assertThat(latch.getCount()).isEqualTo(1);
lock1.removeListener(listenerId2);
lock1.lock(5, TimeUnit.SECONDS);
lock1.unlock();
assertThat(lock1.isLocked()).isFalse();
assertThat(latch.getCount()).isEqualTo(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, NOTIFY_KEYSPACE_EVENTS, "Egx");
}
} }

@ -694,4 +694,21 @@ public class RedissonExecutorServiceTest extends RedisDockerTest {
assertThat(counter.get()).isGreaterThan(0); assertThat(counter.get()).isGreaterThan(0);
} }
@Test
public void testSubmitAfterPause() throws InterruptedException {
RExecutorService redissonES = redisson.getExecutorService("test-worker");
redissonES.registerWorkers(WorkerOptions.defaults().workers(2));
redissonES.submit(new RunnableTask());
Thread.sleep(Duration.ofSeconds(1));
assertThat(redissonES.getTaskCount()).isEqualTo(0);
Thread.sleep(Duration.ofMinutes(1));
redissonES.submit(new RunnableTask());
Thread.sleep(Duration.ofSeconds(1));
assertThat(redissonES.getTaskCount()).isEqualTo(0);
}
} }

Loading…
Cancel
Save