diff --git a/redisson/src/main/java/org/redisson/BaseRemoteService.java b/redisson/src/main/java/org/redisson/BaseRemoteService.java index b81fad744..2c4a4f483 100644 --- a/redisson/src/main/java/org/redisson/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/BaseRemoteService.java @@ -49,6 +49,7 @@ import org.redisson.remote.RemoteServiceRequest; import org.redisson.remote.RemoteServiceResponse; import org.redisson.remote.RemoteServiceTimeoutException; import org.redisson.remote.ResponseEntry; +import org.redisson.remote.ResponseEntry.Key; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -381,6 +382,7 @@ public abstract class BaseRemoteService { private RPromise poll(final long timeout, final String requestId, final String responseName) { final RPromise responseFuture = new RedissonPromise(); + final Key key = new Key(requestId); ResponseEntry entry; synchronized (responses) { @@ -393,8 +395,8 @@ public abstract class BaseRemoteService { } } - final ConcurrentMap> entryResponses = entry.getResponses(); - entryResponses.put(requestId, responseFuture); + final ConcurrentMap> entryResponses = entry.getResponses(); + entryResponses.put(key, responseFuture); } ScheduledFuture future = commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() { @@ -406,11 +408,10 @@ public abstract class BaseRemoteService { return; } - ConcurrentMap> entryResponses = entry.getResponses(); RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms"); if (responseFuture.tryFailure(ex)) { - entry.getTimeouts().remove(requestId); - entryResponses.remove(requestId, responseFuture); + entry.getTimeouts().remove(key); + entry.getResponses().remove(key, responseFuture); if (entry.getResponses().isEmpty()) { responses.remove(responseName, entry); } @@ -418,7 +419,7 @@ public abstract class BaseRemoteService { } } }, timeout, TimeUnit.MILLISECONDS); - entry.getTimeouts().put(requestId, future); + entry.getTimeouts().put(key, future); pollTasks(entry, responseName); return responseFuture; @@ -448,9 +449,10 @@ public abstract class BaseRemoteService { return; } - ConcurrentMap> entryResponses = entry.getResponses(); - promise = (RPromise) entryResponses.remove(response.getId()); - java.util.concurrent.ScheduledFuture timeoutFuture = entry.getTimeouts().remove(response.getId()); + Key key = new Key(response.getId()); + ConcurrentMap> entryResponses = entry.getResponses(); + promise = (RPromise) entryResponses.remove(key); + java.util.concurrent.ScheduledFuture timeoutFuture = entry.getTimeouts().remove(key); timeoutFuture.cancel(false); if (entryResponses.isEmpty()) { diff --git a/redisson/src/main/java/org/redisson/remote/ResponseEntry.java b/redisson/src/main/java/org/redisson/remote/ResponseEntry.java index 52d3e6dda..ebc8e275b 100644 --- a/redisson/src/main/java/org/redisson/remote/ResponseEntry.java +++ b/redisson/src/main/java/org/redisson/remote/ResponseEntry.java @@ -21,6 +21,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.misc.RPromise; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; import io.netty.util.internal.PlatformDependent; /** @@ -30,16 +33,58 @@ import io.netty.util.internal.PlatformDependent; */ public class ResponseEntry { - private final ConcurrentMap> responses = PlatformDependent.newConcurrentHashMap(); - private final ConcurrentMap> timeouts = PlatformDependent.newConcurrentHashMap(); - private final AtomicBoolean started = new AtomicBoolean(); + public static class Key { + + private final long id0; + private final long id1; + + public Key(String id) { + byte[] buf = ByteBufUtil.decodeHexDump(id); + ByteBuf b = Unpooled.wrappedBuffer(buf); + try { + id0 = b.readLong(); + id1 = b.readLong(); + } finally { + b.release(); + } + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (id0 ^ (id0 >>> 32)); + result = prime * result + (int) (id1 ^ (id1 >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Key other = (Key) obj; + if (id0 != other.id0) + return false; + if (id1 != other.id1) + return false; + return true; + } + + } + private final ConcurrentMap> responses = PlatformDependent.newConcurrentHashMap(); + private final ConcurrentMap> timeouts = PlatformDependent.newConcurrentHashMap(); + private final AtomicBoolean started = new AtomicBoolean(); - public ConcurrentMap> getTimeouts() { + public ConcurrentMap> getTimeouts() { return timeouts; } - public ConcurrentMap> getResponses() { + public ConcurrentMap> getResponses() { return responses; }