Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/spring/session/RedissonSessionRepository.java
pull/1933/head
Nikita Koksharov 6 years ago
commit 4683633319

@ -65,6 +65,11 @@ public class LocalCacheView<K, V> {
public K next() { public K next() {
return (K) iter.next().getKey(); return (K) iter.next().getKey();
} }
@Override
public void remove() {
iter.remove();
}
}; };
} }
@ -111,6 +116,11 @@ public class LocalCacheView<K, V> {
public V next() { public V next() {
return (V) iter.next().getValue(); return (V) iter.next().getValue();
} }
@Override
public void remove() {
iter.remove();
}
}; };
} }
@ -152,6 +162,11 @@ public class LocalCacheView<K, V> {
CacheValue e = iter.next(); CacheValue e = iter.next();
return new AbstractMap.SimpleEntry<K, V>((K)e.getKey(), (V)e.getValue()); return new AbstractMap.SimpleEntry<K, V>((K)e.getKey(), (V)e.getValue());
} }
@Override
public void remove() {
iter.remove();
}
}; };
} }

@ -249,7 +249,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
RedisConnection connection = nodeConnections.get(key); RedisConnection connection = nodeConnections.get(key);
if (connection != null) { if (connection != null) {
return RedissonPromise.newSucceededFuture(connection); if (!connection.isActive()) {
nodeConnections.remove(key);
connection.closeAsync();
} else {
return RedissonPromise.newSucceededFuture(connection);
}
} }
if (addr != null) { if (addr != null) {

@ -32,6 +32,7 @@ import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode; import org.redisson.config.ReadMode;
import org.redisson.config.ReplicatedServersConfig; import org.redisson.config.ReplicatedServersConfig;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -119,15 +120,15 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
monitorFuture = group.schedule(new Runnable() { monitorFuture = group.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
if (isShuttingDown()) {
return;
}
final URI master = currentMaster.get(); final URI master = currentMaster.get();
log.debug("Current master: {}", master); log.debug("Current master: {}", master);
final AtomicInteger count = new AtomicInteger(cfg.getNodeAddresses().size()); final AtomicInteger count = new AtomicInteger(cfg.getNodeAddresses().size());
for (final URI addr : cfg.getNodeAddresses()) { for (final URI addr : cfg.getNodeAddresses()) {
if (isShuttingDown()) {
return;
}
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null, addr.getHost()); RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null, addr.getHost());
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
@ -166,6 +167,8 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
} else if (currentMaster.compareAndSet(master, addr)) { } else if (currentMaster.compareAndSet(master, addr)) {
changeMaster(singleSlotRange.getStartSlot(), addr); changeMaster(singleSlotRange.getStartSlot(), addr);
} }
} else if (!config.checkSkipSlavesInit()) {
slaveUp(addr);
} }
if (count.decrementAndGet() == 0) { if (count.decrementAndGet() == 0) {
@ -181,6 +184,13 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS); }, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
} }
private void slaveUp(URI uri) {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
if (entry.slaveUp(uri, FreezeReason.MANAGER)) {
log.info("slave: {} has up", uri);
}
}
@Override @Override
public void shutdown() { public void shutdown() {
if (monitorFuture != null) { if (monitorFuture != null) {

@ -257,22 +257,22 @@ public class RedissonSessionRepository implements FindByIndexNameSessionReposito
publishEvent(new SessionCreatedEvent(this, session)); publishEvent(new SessionCreatedEvent(this, session));
} }
} else if (deletedTopic.getPatternNames().contains(pattern.toString())) { } else if (deletedTopic.getPatternNames().contains(pattern.toString())) {
if (!body.contains(":")) { if (!body.startsWith(keyPrefix)) {
return; return;
} }
String id = body.split(":")[1]; String id = body.split(keyPrefix)[1];
RedissonSession session = new RedissonSession(id); RedissonSession session = new RedissonSession(id);
if (session.load()) { if (session.load()) {
session.clearPrincipal(); session.clearPrincipal();
} }
publishEvent(new SessionDeletedEvent(this, session)); publishEvent(new SessionDeletedEvent(this, session));
} else if (expiredTopic.getPatternNames().contains(pattern.toString())) { } else if (expiredTopic.getPatternNames().contains(pattern.toString())) {
if (!body.contains(":")) { if (!body.startsWith(keyPrefix)) {
return; return;
} }
String id = body.split(":")[1]; String id = body.split(keyPrefix)[1];
RedissonSession session = new RedissonSession(id); RedissonSession session = new RedissonSession(id);
if (session.load()) { if (session.load()) {
session.clearPrincipal(); session.clearPrincipal();

Loading…
Cancel
Save