ExecutorService memory consumption optimization. #1158

pull/1204/head
Nikita 7 years ago
parent 044ebebbf2
commit 156be4dab6

@ -49,6 +49,7 @@ import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RemoteServiceResponse; import org.redisson.remote.RemoteServiceResponse;
import org.redisson.remote.RemoteServiceTimeoutException; import org.redisson.remote.RemoteServiceTimeoutException;
import org.redisson.remote.ResponseEntry; import org.redisson.remote.ResponseEntry;
import org.redisson.remote.ResponseEntry.Key;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -381,6 +382,7 @@ public abstract class BaseRemoteService {
private <T extends RRemoteServiceResponse> RPromise<T> poll(final long timeout, private <T extends RRemoteServiceResponse> RPromise<T> poll(final long timeout,
final String requestId, final String responseName) { final String requestId, final String responseName) {
final RPromise<T> responseFuture = new RedissonPromise<T>(); final RPromise<T> responseFuture = new RedissonPromise<T>();
final Key key = new Key(requestId);
ResponseEntry entry; ResponseEntry entry;
synchronized (responses) { synchronized (responses) {
@ -393,8 +395,8 @@ public abstract class BaseRemoteService {
} }
} }
final ConcurrentMap<String, RPromise<? extends RRemoteServiceResponse>> entryResponses = entry.getResponses(); final ConcurrentMap<Key, RPromise<? extends RRemoteServiceResponse>> entryResponses = entry.getResponses();
entryResponses.put(requestId, responseFuture); entryResponses.put(key, responseFuture);
} }
ScheduledFuture<?> future = commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() { ScheduledFuture<?> future = commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
@ -406,11 +408,10 @@ public abstract class BaseRemoteService {
return; return;
} }
ConcurrentMap<String, RPromise<? extends RRemoteServiceResponse>> entryResponses = entry.getResponses();
RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms"); RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms");
if (responseFuture.tryFailure(ex)) { if (responseFuture.tryFailure(ex)) {
entry.getTimeouts().remove(requestId); entry.getTimeouts().remove(key);
entryResponses.remove(requestId, responseFuture); entry.getResponses().remove(key, responseFuture);
if (entry.getResponses().isEmpty()) { if (entry.getResponses().isEmpty()) {
responses.remove(responseName, entry); responses.remove(responseName, entry);
} }
@ -418,7 +419,7 @@ public abstract class BaseRemoteService {
} }
} }
}, timeout, TimeUnit.MILLISECONDS); }, timeout, TimeUnit.MILLISECONDS);
entry.getTimeouts().put(requestId, future); entry.getTimeouts().put(key, future);
pollTasks(entry, responseName); pollTasks(entry, responseName);
return responseFuture; return responseFuture;
@ -448,9 +449,10 @@ public abstract class BaseRemoteService {
return; return;
} }
ConcurrentMap<String, RPromise<? extends RRemoteServiceResponse>> entryResponses = entry.getResponses(); Key key = new Key(response.getId());
promise = (RPromise<RRemoteServiceResponse>) entryResponses.remove(response.getId()); ConcurrentMap<Key, RPromise<? extends RRemoteServiceResponse>> entryResponses = entry.getResponses();
java.util.concurrent.ScheduledFuture<?> timeoutFuture = entry.getTimeouts().remove(response.getId()); promise = (RPromise<RRemoteServiceResponse>) entryResponses.remove(key);
java.util.concurrent.ScheduledFuture<?> timeoutFuture = entry.getTimeouts().remove(key);
timeoutFuture.cancel(false); timeoutFuture.cancel(false);
if (entryResponses.isEmpty()) { if (entryResponses.isEmpty()) {

@ -21,6 +21,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
/** /**
@ -30,16 +33,58 @@ import io.netty.util.internal.PlatformDependent;
*/ */
public class ResponseEntry { public class ResponseEntry {
private final ConcurrentMap<String, RPromise<? extends RRemoteServiceResponse>> responses = PlatformDependent.newConcurrentHashMap(); public static class Key {
private final ConcurrentMap<String, ScheduledFuture<?>> timeouts = PlatformDependent.newConcurrentHashMap();
private final AtomicBoolean started = new AtomicBoolean(); private final long id0;
private final long id1;
public Key(String id) {
byte[] buf = ByteBufUtil.decodeHexDump(id);
ByteBuf b = Unpooled.wrappedBuffer(buf);
try {
id0 = b.readLong();
id1 = b.readLong();
} finally {
b.release();
}
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (id0 ^ (id0 >>> 32));
result = prime * result + (int) (id1 ^ (id1 >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Key other = (Key) obj;
if (id0 != other.id0)
return false;
if (id1 != other.id1)
return false;
return true;
}
}
private final ConcurrentMap<Key, RPromise<? extends RRemoteServiceResponse>> responses = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<Key, ScheduledFuture<?>> timeouts = PlatformDependent.newConcurrentHashMap();
private final AtomicBoolean started = new AtomicBoolean();
public ConcurrentMap<String, ScheduledFuture<?>> getTimeouts() { public ConcurrentMap<Key, ScheduledFuture<?>> getTimeouts() {
return timeouts; return timeouts;
} }
public ConcurrentMap<String, RPromise<? extends RRemoteServiceResponse>> getResponses() { public ConcurrentMap<Key, RPromise<? extends RRemoteServiceResponse>> getResponses() {
return responses; return responses;
} }

Loading…
Cancel
Save