Merge branch 'redisson:master' into issue-5148

pull/5150/head
zcxsythenew 2 years ago committed by GitHub
commit 2e34874201
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -144,10 +144,6 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
register(remoteInterface, object, workers, commandExecutor.getServiceManager().getExecutor());
}
private <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<>(codec, commandExecutor, name);
}
@Override
public <T> void register(Class<T> remoteInterface, T object, int workers, ExecutorService executor) {
if (workers < 1) {

@ -287,8 +287,18 @@ public class RedisConnection implements RedisCommands {
fastReconnect.complete(null);
fastReconnect = null;
}
private void close() {
public void close() {
try {
closeAsync().sync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
throw e;
}
}
private void closeInternal() {
CommandData<?, ?> command = getCurrentCommand();
if ((command != null && command.isBlockingCommand())
|| !connectionPromise.isDone()) {
@ -304,7 +314,7 @@ public class RedisConnection implements RedisCommands {
public CompletableFuture<Void> forceFastReconnectAsync() {
CompletableFuture<Void> promise = new CompletableFuture<Void>();
fastReconnect = promise;
close();
closeInternal();
return promise;
}
@ -320,7 +330,7 @@ public class RedisConnection implements RedisCommands {
public ChannelFuture closeIdleAsync() {
status = Status.CLOSED_IDLE;
close();
closeInternal();
return channel.closeFuture();
}
@ -330,7 +340,7 @@ public class RedisConnection implements RedisCommands {
public ChannelFuture closeAsync() {
status = Status.CLOSED;
close();
closeInternal();
return channel.closeFuture();
}

@ -16,6 +16,7 @@
package org.redisson.executor;
import org.redisson.RedissonExecutorService;
import org.redisson.api.RBlockingQueueAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.client.codec.Codec;
@ -247,7 +248,9 @@ public class TasksService extends BaseRemoteService {
return CompletableFuture.completedFuture(resp);
});
}
return CompletableFuture.completedFuture(response);
RBlockingQueueAsync<RRemoteServiceResponse> queue = getBlockingQueue(responseQueueName, codec);
return queue.removeAsync(response).thenApply(r -> response);
}).whenComplete((r, ex) -> {
if (ex != null) {
scheduleCancelResponseCheck(mapName, requestId);

@ -20,7 +20,9 @@ import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.RedissonBlockingQueue;
import org.redisson.RedissonMap;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RemoteInvocationOptions;
@ -200,4 +202,8 @@ public abstract class BaseRemoteService {
return result;
}
protected <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<>(codec, commandExecutor, name);
}
}

@ -30,7 +30,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonScheduledExecutorServiceTest extends BaseTest {
public class
RedissonScheduledExecutorServiceTest extends BaseTest {
private static RedissonNode node;
@ -528,7 +531,8 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);
cancel(future1);
assertThat(redisson.<Long>getBucket("executed1").get()).isBetween(1000L, Long.MAX_VALUE);
Thread.sleep(50);
assertThat(redisson.<Long>getBucket("executed1").get()).isGreaterThan(1000L);
Thread.sleep(3000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);
@ -537,9 +541,10 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
RScheduledFuture<?> future2 = executor.scheduleWithFixedDelay(new ScheduledLongRepeatableTask("counter", "executed2"), 1, 2, TimeUnit.SECONDS);
Thread.sleep(6000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);
executor.cancelTask(future2.getTaskId());
assertThat(redisson.<Long>getBucket("executed2").get()).isBetween(1000L, Long.MAX_VALUE);
assertThat(executor.cancelTask(future2.getTaskId())).isTrue();
assertThat(redisson.<Long>getBucket("executed2").get()).isGreaterThan(1000L);
Thread.sleep(3000);
assertThat(redisson.getAtomicLong("counter").get()).isEqualTo(3);

Loading…
Cancel
Save