|
|
|
@ -125,17 +125,19 @@ public class MasterSlaveEntry {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RedisConnection connectionWriteOp() {
|
|
|
|
|
acquireMasterConnection();
|
|
|
|
|
// may changed during changeMaster call
|
|
|
|
|
ConnectionEntry entry = masterEntry;
|
|
|
|
|
acquireMasterConnection(entry);
|
|
|
|
|
|
|
|
|
|
RedisConnection conn = masterEntry.getConnections().poll();
|
|
|
|
|
RedisConnection conn = entry.getConnections().poll();
|
|
|
|
|
if (conn != null) {
|
|
|
|
|
return conn;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
return masterEntry.connect(config);
|
|
|
|
|
return entry.connect(config);
|
|
|
|
|
} catch (RedisException e) {
|
|
|
|
|
masterEntry.getConnectionsSemaphore().release();
|
|
|
|
|
entry.getConnectionsSemaphore().release();
|
|
|
|
|
throw e;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -148,11 +150,11 @@ public class MasterSlaveEntry {
|
|
|
|
|
return slaveBalancer.nextPubSubConnection();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void acquireMasterConnection() {
|
|
|
|
|
if (!masterEntry.getConnectionsSemaphore().tryAcquire()) {
|
|
|
|
|
void acquireMasterConnection(ConnectionEntry entry) {
|
|
|
|
|
if (!entry.getConnectionsSemaphore().tryAcquire()) {
|
|
|
|
|
log.warn("Master connection pool gets exhausted! Trying to acquire connection ...");
|
|
|
|
|
long time = System.currentTimeMillis();
|
|
|
|
|
masterEntry.getConnectionsSemaphore().acquireUninterruptibly();
|
|
|
|
|
entry.getConnectionsSemaphore().acquireUninterruptibly();
|
|
|
|
|
long endTime = System.currentTimeMillis() - time;
|
|
|
|
|
log.warn("Master connection acquired, time spended: {} ms", endTime);
|
|
|
|
|
}
|
|
|
|
@ -164,13 +166,14 @@ public class MasterSlaveEntry {
|
|
|
|
|
|
|
|
|
|
public void releaseWrite(RedisConnection connection) {
|
|
|
|
|
// may changed during changeMaster call
|
|
|
|
|
if (!masterEntry.getClient().equals(connection.getRedisClient())) {
|
|
|
|
|
ConnectionEntry entry = masterEntry;
|
|
|
|
|
if (!entry.getClient().equals(connection.getRedisClient())) {
|
|
|
|
|
connection.closeAsync();
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
masterEntry.getConnections().add(connection);
|
|
|
|
|
masterEntry.getConnectionsSemaphore().release();
|
|
|
|
|
entry.getConnections().add(connection);
|
|
|
|
|
entry.getConnectionsSemaphore().release();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void releaseRead(RedisConnection сonnection) {
|
|
|
|
|