Fixes #880 Fix race condition caused by sorting collection

while the condition of sort can change.
pull/888/head
Brett Wooldridge 8 years ago
parent 275c3d70bc
commit 61be9b923d

@ -16,7 +16,6 @@
package com.zaxxer.hikari.pool; package com.zaxxer.hikari.pool;
import static com.zaxxer.hikari.pool.PoolEntry.LASTACCESS_REVERSE_COMPARABLE;
import static com.zaxxer.hikari.util.ClockSource.currentTime; import static com.zaxxer.hikari.util.ClockSource.currentTime;
import static com.zaxxer.hikari.util.ClockSource.elapsedDisplayString; import static com.zaxxer.hikari.util.ClockSource.elapsedDisplayString;
import static com.zaxxer.hikari.util.ClockSource.elapsedMillis; import static com.zaxxer.hikari.util.ClockSource.elapsedMillis;
@ -33,11 +32,11 @@ import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLTransientConnectionException; import java.sql.SQLTransientConnectionException;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
@ -302,14 +301,14 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
public Future<Boolean> addBagItem(final int waiting) public void addBagItem(final int waiting)
{ {
final boolean shouldAdd = waiting - addConnectionQueue.size() >= 0; // Yes, >= is intentional. final boolean shouldAdd = waiting - addConnectionQueue.size() >= 0; // Yes, >= is intentional.
if (shouldAdd) { if (shouldAdd) {
return addConnectionExecutor.submit(POOL_ENTRY_CREATOR); addConnectionExecutor.submit(POOL_ENTRY_CREATOR);
} }
return CompletableFuture.completedFuture(Boolean.TRUE); CompletableFuture.completedFuture(Boolean.TRUE);
} }
// *********************************************************************** // ***********************************************************************
@ -694,14 +693,16 @@ public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBag
logPoolState("Before cleanup "); logPoolState("Before cleanup ");
afterPrefix = "After cleanup "; afterPrefix = "After cleanup ";
connectionBag final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);
.values(STATE_NOT_IN_USE) int removed = 0;
.stream() for (PoolEntry entry : notInUse) {
.sorted(LASTACCESS_REVERSE_COMPARABLE) if (elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
.skip(config.getMinimumIdle()) closeConnection(entry, "(connection has passed idleTimeout)");
.filter(p -> elapsedMillis(p.lastAccessed, now) > idleTimeout) if (++removed > config.getMinimumIdle()) {
.filter(connectionBag::reserve) break;
.forEachOrdered(p -> closeConnection(p, "(connection has passed idleTimeout)")); }
}
}
} }
logPoolState(afterPrefix); logPoolState(afterPrefix);

@ -15,22 +15,18 @@
*/ */
package com.zaxxer.hikari.pool; package com.zaxxer.hikari.pool;
import static com.zaxxer.hikari.util.ClockSource.currentTime; import com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry;
import static com.zaxxer.hikari.util.ClockSource.elapsedDisplayString; import com.zaxxer.hikari.util.FastList;
import static com.zaxxer.hikari.util.ClockSource.elapsedMillis; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.Comparator;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.slf4j.Logger; import static com.zaxxer.hikari.util.ClockSource.*;
import org.slf4j.LoggerFactory;
import com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry;
import com.zaxxer.hikari.util.FastList;
/** /**
* Entry used in the ConcurrentBag to track Connection instances. * Entry used in the ConcurrentBag to track Connection instances.
@ -42,8 +38,6 @@ final class PoolEntry implements IConcurrentBagEntry
private static final Logger LOGGER = LoggerFactory.getLogger(PoolEntry.class); private static final Logger LOGGER = LoggerFactory.getLogger(PoolEntry.class);
private static final AtomicIntegerFieldUpdater<PoolEntry> stateUpdater; private static final AtomicIntegerFieldUpdater<PoolEntry> stateUpdater;
static final Comparator<PoolEntry> LASTACCESS_REVERSE_COMPARABLE;
Connection connection; Connection connection;
long lastAccessed; long lastAccessed;
long lastBorrowed; long lastBorrowed;
@ -62,8 +56,6 @@ final class PoolEntry implements IConcurrentBagEntry
static static
{ {
LASTACCESS_REVERSE_COMPARABLE = (entryOne, entryTwo) -> Long.compare(entryTwo.lastAccessed, entryOne.lastAccessed);
stateUpdater = AtomicIntegerFieldUpdater.newUpdater(PoolEntry.class, "state"); stateUpdater = AtomicIntegerFieldUpdater.newUpdater(PoolEntry.class, "state");
} }

@ -29,9 +29,9 @@ import static com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry.STATE_RES
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -76,7 +76,7 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
private final SynchronousQueue<T> handoffQueue; private final SynchronousQueue<T> handoffQueue;
public static interface IConcurrentBagEntry public interface IConcurrentBagEntry
{ {
int STATE_NOT_IN_USE = 0; int STATE_NOT_IN_USE = 0;
int STATE_IN_USE = 1; int STATE_IN_USE = 1;
@ -88,9 +88,9 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
int getState(); int getState();
} }
public static interface IBagStateListener public interface IBagStateListener
{ {
Future<Boolean> addBagItem(int waiting); void addBagItem(int waiting);
} }
/** /**
@ -262,7 +262,9 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
*/ */
public List<T> values(final int state) public List<T> values(final int state)
{ {
return sharedList.stream().filter(e -> e.getState() == state).collect(Collectors.toList()); final List<T> list = sharedList.stream().filter(e -> e.getState() == state).collect(Collectors.toList());
Collections.reverse(list);
return list;
} }
/** /**

@ -45,9 +45,9 @@ import com.zaxxer.hikari.mocks.StubDataSource;
*/ */
public class ConnectionPoolSizeVsThreadsTest { public class ConnectionPoolSizeVsThreadsTest {
public static final Logger LOGGER = LoggerFactory.getLogger(ConnectionPoolSizeVsThreadsTest.class); private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionPoolSizeVsThreadsTest.class);
public static final int ITERATIONS = 50_000; private static final int ITERATIONS = 50_000;
@Test @Test
public void testPoolSizeAboutSameSizeAsThreadCount() throws Exception { public void testPoolSizeAboutSameSizeAsThreadCount() throws Exception {

Loading…
Cancel
Save