Deep Dive into Druid: Connection Maintenance and Lifecycle
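From the earlier analysis we know that Druid stores connections in an array. A connection pool must be configured with a maximum number of active connections, so the array never needs to grow. It is simply allocated at the maximum size up front, and an array is also a little more space-efficient.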
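To make the array layout concrete, here is a minimal sketch under simplifying assumptions. Plain objects stand in for DruidConnectionHolder, and FixedArrayPool, tryPut, and takeLast are hypothetical names, not Druid's API. The array is sized once to maxActive, idle entries stay packed at the front with poolingCount marking the end, and a new entry is written at index poolingCount, the same way Druid's put() does it further down.

```java
import java.util.Arrays;

// Hypothetical, simplified model of an array-backed pool; not Druid's class.
final class FixedArrayPool<T> {
    private final Object[] connections; // sized once to maxActive, never grown
    private int poolingCount = 0;       // idle entries, always packed at the front

    FixedArrayPool(int maxActive) {
        this.connections = new Object[maxActive];
    }

    // Append at index poolingCount; refuse when the array is full.
    boolean tryPut(T holder) {
        if (poolingCount >= connections.length) {
            return false;
        }
        connections[poolingCount++] = holder;
        return true;
    }

    // Take the most recently added entry from the tail; null when empty.
    @SuppressWarnings("unchecked")
    T takeLast() {
        if (poolingCount == 0) {
            return null;
        }
        T last = (T) connections[--poolingCount];
        connections[poolingCount] = null; // drop the reference so it can be collected
        return last;
    }

    int poolingCount() {
        return poolingCount;
    }

    @Override
    public String toString() {
        return Arrays.toString(Arrays.copyOf(connections, poolingCount));
    }
}
```

Because this sketch adds and takes at the tail, the most recently returned connection is reused first and the longest-idle ones collect at the front of the array.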
// store
private volatile DruidConnectionHolder[] connections;
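Druid also has two very important groups of threads: one creates connections and stores them in the array, the other destroys connections and removes them from the array.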
// connection-creation classes
public class CreateConnectionTask implements Runnable
public class CreateConnectionThread extends Thread
// connection-destruction classes
public class DestroyTask implements Runnable
public class DestroyConnectionThread extends Thread
Why does each group have two classes? We will come back to that later.
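All connections are created by the two creation classes above. They coordinate through one ReentrantLock and a pair of Conditions, which ensures the creator neither keeps creating forever nor stops creating altogether.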
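protected ReentrantLock lock;
// signaled by put(): the pool has an idle connection
protected Condition notEmpty;
// awaited by the creator thread: the pool needs a new connection
protected Condition empty;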
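First, a quick review of Lock, Condition, and CountDownLatch.
CountDownLatch:
- CountDownLatch was introduced in Java 1.5, together with other utilities such as CyclicBarrier, Semaphore, ConcurrentHashMap, and BlockingQueue.
- It lives in the java.util.concurrent package.
- CountDownLatch lets a thread wait until a set of other threads have each finished their work.
- It is built on a counter initialized to the number of threads (more precisely, the number of countDown() calls to wait for). Each time a thread finishes, it decrements the counter by one. When the counter reaches 0, all of those threads are done, and the threads waiting on the latch resume.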
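A small sketch of the pattern Druid's init() uses with initedLatch: a latch of 2, each background thread counts down as its first action, and the starter blocks in await(). LatchStartupDemo and the thread names are illustrative only.

```java
import java.util.concurrent.CountDownLatch;

// Mirrors the initedLatch idea: the starter blocks until both
// background threads have actually begun running.
final class LatchStartupDemo {
    static long startBoth() {
        CountDownLatch initedLatch = new CountDownLatch(2); // one count per background thread
        Runnable worker = () -> {
            initedLatch.countDown(); // first action: report "I am running"
            // a real background thread would enter its long-running loop here
        };
        Thread creator = new Thread(worker, "demo-create");
        Thread destroyer = new Thread(worker, "demo-destroy");
        creator.setDaemon(true);     // daemon threads do not keep the JVM alive
        destroyer.setDaemon(true);
        creator.start();
        destroyer.start();
        try {
            initedLatch.await();     // returns once the count has reached 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
        }
        return initedLatch.getCount(); // 0 after a normal return from await()
    }
}
```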
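Lock and Condition:
- Before Lock existed, synchronization was done with synchronized, and the wait/notify pattern was built on Object's wait() and notify() family of methods. Since Java SE 5, Java has provided the Lock interface. Compared with synchronized, Lock offers Conditions, which make waiting and waking threads more fine-grained and flexible.
- Condition provides a set of methods for blocking and waking threads; the most common are await() and signal().
- A Condition is a generalized condition queue that gives threads a more flexible wait/notify mechanism. A thread that calls await() is suspended until another thread signals it, typically after making the awaited condition true.
- A Condition must be used together with a lock, because the shared state it guards is accessed from multiple threads. Every Condition instance is bound to a Lock (it is obtained via Lock.newCondition()), and the calling thread must hold that lock when it calls await() or signal(). This is why Condition is usually implemented as part of the Lock itself.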
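The following sketch is a simplified model, not Druid's code. It shows how one ReentrantLock with two Conditions lets a consumer and a creator coordinate the way notEmpty and empty do in Druid. A borrower that finds no idle connection increments a waiter count, signals empty, and awaits notEmpty. The creator awaits empty for as long as the idle connections already cover every waiter. TwoConditionPool, borrow, createOnDemand, and the waiters field are hypothetical names; waiters plays the role of notEmptyWaitThreadCount.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified model (not Druid's code) of the lock + notEmpty/empty protocol.
final class TwoConditionPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // "an idle connection is available"
    private final Condition empty = lock.newCondition();    // "someone needs a connection created"
    private final Deque<String> idle = new ArrayDeque<>();
    private int waiters = 0;  // like notEmptyWaitThreadCount
    private int created = 0;

    // Consumer: take an idle connection, waking the creator when there is none.
    String borrow() throws InterruptedException {
        lock.lock();
        try {
            while (idle.isEmpty()) {
                waiters++;
                empty.signal();           // tell the creator there is demand
                try {
                    notEmpty.await();     // releases the lock while waiting
                } finally {
                    waiters--;
                }
            }
            return idle.pollLast();
        } finally {
            lock.unlock();
        }
    }

    // Creator: create `count` connections, each one only when it is needed.
    void createOnDemand(int count) throws InterruptedException {
        for (int i = 0; i < count; i++) {
            lock.lock();
            try {
                while (idle.size() >= waiters) {
                    empty.await();        // idle connections already cover every waiter
                }
                idle.addLast("conn-" + (++created));
                notEmpty.signal();        // wake one waiting consumer
            } finally {
                lock.unlock();
            }
        }
    }

    // Runs one creator thread and two borrows; returns what was borrowed.
    static String[] demo() {
        TwoConditionPool pool = new TwoConditionPool();
        Thread creator = new Thread(() -> {
            try {
                pool.createOnDemand(2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "demo-creator");
        creator.setDaemon(true);
        creator.start();
        try {
            String first = pool.borrow();
            String second = pool.borrow();
            creator.join();
            return new String[] {first, second};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new String[0];
        }
    }
}
```

Both waits sit in while loops. A spurious wakeup, or a signal sent before the other side is waiting, therefore does no harm: each side re-checks the shared state while holding the lock.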
The overall flow was analyzed earlier; here is a brief look at the flowchart:

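CreateConnectionThread and DestroyConnectionThread are both daemon threads that quietly create and destroy connections in the background. The two Conditions, empty and notEmpty, control when a new connection should be created.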
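The decision the creator loop makes on each pass can be isolated as a pure function. The sketch below is a hypothetical model. CreatorState copies the names of the fields the loop reads, with lastCreateFailed standing in for createError != null. CreateDecision is an invented enum, introduced so that the three outcomes can be checked in isolation.

```java
// Hypothetical model; field names mirror DruidDataSource's, the classes do not exist in Druid.
enum CreateDecision { CREATE, WAIT_FOR_DEMAND, WAIT_AT_MAX_ACTIVE }

final class CreatorState {
    boolean lastCreateFailed;     // stands in for createError != null
    boolean discardChanged;
    boolean asyncInit;
    int createCount;
    int initialSize;
    int poolingCount;             // idle connections in the array
    int activeCount;              // borrowed connections
    int notEmptyWaitThreadCount;  // threads blocked waiting for a connection
    boolean keepAlive;
    int minIdle;
    int maxActive;
    boolean failContinuous;

    // What one pass of the creator loop would do with this snapshot.
    CreateDecision decide() {
        boolean emptyWait = true;
        // last creation failed and the pool is empty: retry right away
        if (lastCreateFailed && poolingCount == 0 && !discardChanged) {
            emptyWait = false;
        }
        // async init still below initialSize: keep creating
        if (emptyWait && asyncInit && createCount < initialSize) {
            emptyWait = false;
        }
        if (emptyWait) {
            boolean belowMinIdle = keepAlive && activeCount + poolingCount < minIdle;
            if (poolingCount >= notEmptyWaitThreadCount && !belowMinIdle && !failContinuous) {
                return CreateDecision.WAIT_FOR_DEMAND;    // idle connections cover every waiter
            }
            if (activeCount + poolingCount >= maxActive) {
                return CreateDecision.WAIT_AT_MAX_ACTIVE; // never exceed maxActive
            }
        }
        return CreateDecision.CREATE;
    }
}
```

One detail from the original loop is preserved here. When the last creation failed and the pool is empty, emptyWait becomes false and the maxActive check is skipped entirely.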
public CreateConnectionThread(String name){
super(name);
this.setDaemon(true);
}
public DestroyConnectionThread(String name){
super(name);
this.setDaemon(true);
}
Both are started during Druid's init():
// a logging thread is started as well
createAndLogThread();
createAndStartCreatorThread();
createAndStartDestroyThread();
initedLatch.await();
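During startup, initedLatch forces init() to wait until both threads have started successfully before it continues. initedLatch is initialized to 2, and each thread counts it down by one once it starts.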
protected void createAndStartCreatorThread() {
if (createScheduler == null) {
String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);
createConnectionThread = new CreateConnectionThread(threadName);
createConnectionThread.start();
return;
}
// with a createScheduler no creator thread is started, so count down here on its behalf
initedLatch.countDown();
}
Once started, the creator thread runs an infinite loop. Let's look at the thread:
public class CreateConnectionThread extends Thread {
public CreateConnectionThread(String name){
super(name);
// mark as a daemon thread
this.setDaemon(true);
}
public void run() {
// count down once started; only after the destroy thread also counts down are both threads known to be running
initedLatch.countDown();
long lastDiscardCount = 0;
int errorCount = 0;
// infinite loop
for (;;) {
// addLast
try {
lock.lockInterruptibly();
} catch (InterruptedException e2) {
break;
}
long discardCount = DruidDataSource.this.discardCount;
boolean discardChanged = discardCount - lastDiscardCount > 0;
lastDiscardCount = discardCount;
try {
// emptyWait defaults to true: wait instead of creating a connection
boolean emptyWait = true;
// the last creation failed and the pool is empty (no discards since the last pass): create now
if (createError != null
&& poolingCount == 0
&& !discardChanged) {
emptyWait = false;
}
// async init is on and fewer than initialSize connections have been created: create as well
if (emptyWait
&& asyncInit && createCount < initialSize) {
emptyWait = false;
}
if (emptyWait) {
// only create if idle connections cannot cover the waiting threads (or keepAlive/failContinuous forces it)
if (poolingCount >= notEmptyWaitThreadCount //
&& (!(keepAlive && activeCount + poolingCount < minIdle))
&& !isFailContinuous()
) {
// nothing to create right now: wait for a signal on empty
empty.await();
}
// never create more than maxActive connections
if (activeCount + poolingCount >= maxActive) {
empty.await();
continue;
}
}
} catch (InterruptedException e) {
lastCreateError = e;
lastErrorTimeMillis = System.currentTimeMillis();
if ((!closing) && (!closed)) {
LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);
}
break;
} finally {
lock.unlock();
}
PhysicalConnectionInfo connection = null;
try {
// create the physical connection
connection = createPhysicalConnection();
} catch (SQLException e) {
LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
+ ", state " + e.getSQLState(), e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// fail over retry attempts
setFailContinuous(true);
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
if (breakAfterAcquireFailure) {
break;
}
try {
Thread.sleep(timeBetweenConnectErrorMillis);
} catch (InterruptedException interruptEx) {
break;
}
}
} catch (RuntimeException e) {
LOG.error("create connection RuntimeException", e);
setFailContinuous(true);
continue;
} catch (Error e) {
LOG.error("create connection Error", e);
setFailContinuous(true);
break;
}
if (connection == null) {
continue;
}
// no errors: put it into the pool
boolean result = put(connection);
if (!result) {
JdbcUtils.close(connection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}
errorCount = 0; // reset errorCount
if (closing || closed) {
break;
}
}
}
}
protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
DruidConnectionHolder holder = null;
try {
// wrap it in a DruidConnectionHolder
holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
} catch (SQLException ex) {
lock.lock();
try {
if (createScheduler != null) {
clearCreateTask(physicalConnectionInfo.createTaskId);
}
} finally {
lock.unlock();
}
LOG.error("create connection holder error", ex);
return false;
}
// put it into the pool
return put(holder, physicalConnectionInfo.createTaskId, false);
}
Putting the holder into the pool:
private boolean put(DruidConnectionHolder holder, long createTaskId, boolean checkExists) {
// lock so that concurrent puts cannot overwrite each other's slot
lock.lock();
try {
if (this.closing || this.closed) {
return false;
}
// pool already at maxActive: clear this create task and reject the holder
if (poolingCount >= maxActive) {
if (createScheduler != null) {
clearCreateTask(createTaskId);
}
return false;
}
// reject holders that are already in the pool
if (checkExists) {
for (int i = 0; i < poolingCount; i++) {
if (connections[i] == holder) {
return false;
}
}
}
// store at the tail and increment poolingCount
connections[poolingCount] = holder;
incrementPoolingCount();
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
// signal that the pool is no longer empty
notEmpty.signal();
notEmptySignalCount++;
if (createScheduler != null) {
// with a createScheduler, clear this task from the create-task list, since put can also be reached from a create task
clearCreateTask(createTaskId);
if (poolingCount + createTaskCount < notEmptyWaitThreadCount //
&& activeCount + poolingCount + createTaskCount < maxActive) {
emptySignal();
}
}
} finally {
lock.unlock();
}
return true;
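}
That completes connection creation. As mentioned above, though, besides the daemon thread that keeps creating connections in the background there is also CreateConnectionTask, which implements Runnable. What is it for? Its creation logic is essentially the same as the daemon thread's. The difference is that it is not a daemon thread sitting in the background; it has to be triggered explicitly. When a connection needs to be created right away, a task is submitted to the create-task queue and creates one connection. Judging from createAndStartCreatorThread() above, the daemon thread is only started when createScheduler is null. The two therefore appear to be alternatives: with a createScheduler configured, creation goes through CreateConnectionTask instead.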
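The essential difference between a task-based creator and a blocking thread is this: a failed attempt re-submits itself with a delay instead of sleeping. That is what the createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS) calls in CreateConnectionTask do. Here is a minimal sketch using a standard ScheduledExecutorService. RetryingCreateTask, tryCreate, and maxAttempts are hypothetical, and the sketch simplifies by waiting before every retry. maxAttempts only loosely stands in for connectionErrorRetryAttempts plus breakAfterAcquireFailure.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical one-shot create task (not Druid's class): each run makes one
// attempt; on failure it re-schedules itself instead of blocking a thread.
final class RetryingCreateTask implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final BooleanSupplier tryCreate; // true = a connection was created
    private final long retryDelayMillis;     // plays the role of timeBetweenConnectErrorMillis
    private final int maxAttempts;           // give up after this many attempts
    private final AtomicInteger attempts = new AtomicInteger();
    private final CountDownLatch finished = new CountDownLatch(1);
    private volatile boolean succeeded;

    RetryingCreateTask(ScheduledExecutorService scheduler, BooleanSupplier tryCreate,
                       long retryDelayMillis, int maxAttempts) {
        this.scheduler = scheduler;
        this.tryCreate = tryCreate;
        this.retryDelayMillis = retryDelayMillis;
        this.maxAttempts = maxAttempts;
    }

    @Override
    public void run() {
        int attempt = attempts.incrementAndGet();
        if (tryCreate.getAsBoolean()) {
            succeeded = true;
            finished.countDown();
            return;
        }
        if (attempt >= maxAttempts) {
            finished.countDown(); // give up for good
            return;
        }
        // retry later; no thread sits blocked in the meantime
        scheduler.schedule(this, retryDelayMillis, TimeUnit.MILLISECONDS);
    }

    // Runs one task whose first `failures` attempts fail; returns the number
    // of attempts it took to succeed, or -1 if it gave up.
    static int demo(int failures) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        RetryingCreateTask task = new RetryingCreateTask(
                scheduler, () -> calls.incrementAndGet() > failures, 10, 5);
        scheduler.submit(task);
        try {
            task.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return task.succeeded ? task.attempts.get() : -1;
    }
}
```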
public class CreateConnectionTask implements Runnable {
private int errorCount = 0;
private boolean initTask = false;
private final long taskId;
public CreateConnectionTask() {
taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
}
public CreateConnectionTask(boolean initTask) {
taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
this.initTask = initTask;
}
@Override
public void run() {
runInternal();
}
private void runInternal() {
for (;;) {
// addLast
lock.lock();
try {
if (closed || closing) {
clearCreateTask(taskId);
return;
}
boolean emptyWait = true;
if (createError != null && poolingCount == 0) {
emptyWait = false;
}
if (emptyWait) {
// only create if idle connections cannot cover the waiting threads (or one of the cases below forces it)
if (poolingCount >= notEmptyWaitThreadCount //
&& (!(keepAlive && activeCount + poolingCount < minIdle)) // 在keepAlive场景不能放弃创建
&& (!initTask) // 线程池初始化时的任务不能放弃创建
&& !isFailContinuous() // failContinuous时不能放弃创建,否则会无法创建线程
&& !isOnFatalError() // onFatalError时不能放弃创建,否则会无法创建线程
) {
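// same logic as the thread; the difference is here: the daemon thread would block in await(), whereas the task clears its own entry from the create-task list and returns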
clearCreateTask(taskId);
return;
}
// never create more than maxActive connections
if (activeCount + poolingCount >= maxActive) {
clearCreateTask(taskId);
return;
}
}
} finally {
lock.unlock();
}
PhysicalConnectionInfo physicalConnection = null;
try {
physicalConnection = createPhysicalConnection();
} catch (OutOfMemoryError e) {
LOG.error("create connection OutOfMemoryError, out memory. ", e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// fail over retry attempts
setFailContinuous(true);
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
if (breakAfterAcquireFailure) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
this.errorCount = 0; // reset errorCount
if (closing || closed) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
return;
}
} catch (SQLException e) {
LOG.error("create connection SQLException, url: " + jdbcUrl, e);
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// fail over retry attempts
setFailContinuous(true);
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
if (breakAfterAcquireFailure) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
this.errorCount = 0; // reset errorCount
if (closing || closed) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
return;
}
createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
return;
}
} catch (RuntimeException e) {
LOG.error("create connection RuntimeException", e);
// unknown fatal exception
setFailContinuous(true);
continue;
} catch (Error e) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
LOG.error("create connection Error", e);
// unknown fatal exception
setFailContinuous(true);
break;
} catch (Throwable e) {
lock.lock();
try {
clearCreateTask(taskId);
} finally {
lock.unlock();
}
LOG.error("create connection unexecpted error.", e);
break;
}
if (physicalConnection == null) {
continue;
}
physicalConnection.createTaskId = taskId;
boolean result = put(physicalConnection);
if (!result) {
JdbcUtils.close(physicalConnection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}
break;
}
}
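}
The key difference is that when no connection needs to be created, the task does not block waiting to be woken. It clears its entry from the create-task list and finishes. Tasks are submitted in this method: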
private void submitCreateTask(boolean initTask) {
createTaskCount++;
CreateConnectionTask task = new CreateConnectionTask(initTask);
if (createTasks == null) {
createTasks = new long[8];
}
boolean putted = false;
for (int i = 0; i < createTasks.length; ++i) {
if (createTasks[i] == 0) {
createTasks[i] = task.taskId;
putted = true;
break;
}
}
if (!putted) {
long[] array = new long[createTasks.length * 3 / 2];
System.arraycopy(createTasks, 0, array, 0, createTasks.length);
array[createTasks.length] = task.taskId;
createTasks = array;
}
this.createSchedulerFuture = createScheduler.submit(task);
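}
This submits a create-connection task and records its ID in a long array that serves as the task list. The array starts at length 8 and grows by half when it is full. I did not quite understand what this list is for. It does not work as a cache, so is it only there to count the pending create tasks? Its effects do show up elsewhere in the code, though. clearCreateTask(taskId) is called whenever a task finishes or gives up, presumably to release the task's slot. The createTaskCount incremented here is read in put() (poolingCount + createTaskCount < notEmptyWaitThreadCount) and in shrink() below (minIdle - (activeCount + poolingCount + createTaskCount)). So at the very least it tracks the tasks still in flight and keeps the pool from submitting more creations than it needs.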
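Here is a sketch of that bookkeeping. submit() follows submitCreateTask() above: take the first free slot, where 0 marks an empty one, otherwise grow by half. clear() is only a guess at what clearCreateTask(taskId) does, since its body is not shown. CreateTaskSlots is a hypothetical name. Treating taskId 0 as "not from a create task" matches shrink() calling put(holder, 0L, true).

```java
// Hypothetical model of the createTasks bookkeeping, not Druid's code.
final class CreateTaskSlots {
    private long[] createTasks;   // 0 marks a free slot
    private int createTaskCount;  // tasks submitted and not yet cleared

    // Same steps as submitCreateTask(): first free slot, else grow by half.
    void submit(long taskId) {
        createTaskCount++;
        if (createTasks == null) {
            createTasks = new long[8];
        }
        for (int i = 0; i < createTasks.length; ++i) {
            if (createTasks[i] == 0) {
                createTasks[i] = taskId;
                return;
            }
        }
        long[] array = new long[createTasks.length * 3 / 2];
        System.arraycopy(createTasks, 0, array, 0, createTasks.length);
        array[createTasks.length] = taskId;
        createTasks = array;
    }

    // Guess at clearCreateTask(): free the task's slot when it finishes or gives up.
    boolean clear(long taskId) {
        if (createTasks == null || taskId == 0) {
            return false; // 0 = not submitted as a create task
        }
        for (int i = 0; i < createTasks.length; ++i) {
            if (createTasks[i] == taskId) {
                createTasks[i] = 0;
                createTaskCount--;
                return true;
            }
        }
        return false;
    }

    int count() {
        return createTaskCount;
    }

    int capacity() {
        return createTasks == null ? 0 : createTasks.length;
    }
}
```

Freed slots are reused before the array grows again, so the array size tracks the peak number of concurrent create tasks rather than the total ever submitted.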
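Now let's look at DestroyConnectionThread. Its job is to keep scanning for connections that need to be reclaimed and to destroy the ones that qualify. When the pool runs short (below minIdle with keepAlive on, or after a fatal error), it calls emptySignal() so that new connections get created.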
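Stripped of the pool logic, the thread's loop is: sleep, run the task, repeat. There are two ways out, a closed flag or an interrupt. The sketch below models only that skeleton. PeriodicDaemon is a hypothetical class, and its 1000 ms fallback copies the else branch of Druid's loop.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for DestroyConnectionThread, not Druid's class.
final class PeriodicDaemon extends Thread {
    private final long intervalMillis;
    private final Runnable task;
    volatile boolean closed;                        // like the closed/closing flags
    final AtomicInteger runs = new AtomicInteger(); // completed task runs

    PeriodicDaemon(String name, long intervalMillis, Runnable task) {
        super(name);
        // fall back to 1000 ms when no positive interval is configured
        this.intervalMillis = intervalMillis > 0 ? intervalMillis : 1000;
        this.task = task;
        setDaemon(true);
    }

    @Override
    public void run() {
        for (;;) {
            try {
                if (closed) {
                    break;
                }
                Thread.sleep(intervalMillis);
                if (Thread.interrupted()) {
                    break;
                }
                task.run();  // runs on this thread, like destroyTask.run()
                runs.incrementAndGet();
            } catch (InterruptedException e) {
                break;       // interrupted while sleeping: leave the loop
            }
        }
    }

    // Starts a daemon with a 5 ms interval, lets it run briefly, then interrupts it.
    // Returns the number of completed runs, or -1 if the thread did not stop.
    static int demo() {
        PeriodicDaemon d = new PeriodicDaemon("demo-destroy", 5, () -> { });
        d.start();
        try {
            Thread.sleep(200);
            d.interrupt();
            d.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return d.isAlive() ? -1 : d.runs.get();
    }
}
```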
public class DestroyConnectionThread extends Thread {
public DestroyConnectionThread(String name){
super(name);
// daemon thread
this.setDaemon(true);
}
public void run() {
initedLatch.countDown();
for (;;) {
// evict from the front of the array
try {
if (closed || closing) {
break;
}
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000); //
}
if (Thread.interrupted()) {
break;
}
// the actual work lives in DestroyTask; run() is called directly, on this same thread
destroyTask.run();
} catch (InterruptedException e) {
break;
}
}
}
}
public class DestroyTask implements Runnable {
public DestroyTask() {
}
@Override
public void run() {
// roughly: scan the idle connections in the pool, keep those that still qualify, and destroy the rest (e.g. idle too long)
shrink(true, keepAlive);
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
}
public void shrink(boolean checkTime, boolean keepAlive) {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}
boolean needFill = false;
int evictCount = 0;
int keepAliveCount = 0;
int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
fatalErrorCountLastShrink = fatalErrorCount;
try {
if (!inited) {
return;
}
final int checkCount = poolingCount - minIdle;
final long currentTimeMillis = System.currentTimeMillis();
// iterate over all idle connections
for (int i = 0; i < poolingCount; ++i) {
DruidConnectionHolder connection = connections[i];
if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
// a fatal error occurred after this connection was created: queue it in keepAliveConnections for validation
keepAliveConnections[keepAliveCount++] = connection;
continue;
}
if (checkTime) {
if (phyTimeoutMillis > 0) {
long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis
) {
break;
}
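// idle past minEvictableIdleTimeMillis: evict it if it is surplus to minIdle (i < checkCount) or idle past maxEvictableIdleTimeMillis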
if (idleMillis >= minEvictableIdleTimeMillis) {
if (checkTime && i < checkCount) {
evictConnections[evictCount++] = connection;
continue;
} else if (idleMillis > maxEvictableIdleTimeMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
keepAliveConnections[keepAliveCount++] = connection;
}
} else {
if (i < checkCount) {
evictConnections[evictCount++] = connection;
} else {
break;
}
}
}
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
poolingCount -= removeCount;
}
keepAliveCheckCount += keepAliveCount;
if (keepAlive && poolingCount + activeCount < minIdle) {
needFill = true;
}
} finally {
lock.unlock();
}
if (evictCount > 0) {
// close every evicted connection, then clear evictConnections
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCountUpdater.incrementAndGet(this);
}
Arrays.fill(evictConnections, null);
}
if (keepAliveCount > 0) {
// keep order
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();
boolean validate = false;
try {
// keep-alive candidates are validated before being put back
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
// skip
}
boolean discard = !validate;
if (validate) {
holer.lastKeepTimeMillis = System.currentTimeMillis();
boolean putOk = put(holer, 0L, true);
if (!putOk) {
discard = true;
}
}
if (discard) {
try {
connection.close();
} catch (Exception e) {
// skip
}
lock.lock();
try {
discardCount++;
if (activeCount + poolingCount <= minIdle) {
emptySignal();
}
} finally {
lock.unlock();
}
}
}
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
Arrays.fill(keepAliveConnections, null);
}
if (needFill) {
lock.lock();
try {
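// top up to minIdle: compute the shortfall and call emptySignal() once per missing connection (with a createScheduler these become one-shot create tasks rather than wake-ups of the daemon thread)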
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
for (int i = 0; i < fillCount; ++i) {
emptySignal();
}
} finally {
lock.unlock();
}
} else if (onFatalError || fatalErrorIncrement > 0) {
lock.lock();
try {
emptySignal();
} finally {
lock.unlock();
}
}
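}
At this point I had a question. While iterating over the connections, a timed-out connection seems to be closed rather than removed from the pool. Its holder would then stay in the array with a closed connection, and it could only be discarded later, when someone tries to use it and finds it unusable. Why not null it out and remove it from the pool right here, which would also save the later validity check? On a closer look, though, the code right after the loop does exactly that. Still holding the lock, it computes removeCount = evictCount + keepAliveCount, shifts the remaining holders to the front with System.arraycopy, nulls out the freed tail with Arrays.fill, and subtracts removeCount from poolingCount. Only then, outside the lock, are the evicted connections closed and the keep-alive candidates validated (and put back if they pass). So a closed holder does not linger in the pool.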
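To double-check that reading, here is a simplified model of one shrink(true, keepAlive) pass over idle times only; the fatal-error and phyTimeoutMillis paths are left out. It assumes, as the early break in the loop implies, that the front of the array holds the longest-idle connections. ShrinkModel and its fields are hypothetical. The compaction lines copy the System.arraycopy/Arrays.fill step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of one shrink(checkTime = true, keepAlive) pass.
// Each element is an idle connection's idle time in ms, longest-idle first.
final class ShrinkModel {
    final List<Long> evicted = new ArrayList<>();
    final List<Long> keepAliveChecked = new ArrayList<>();
    long[] pool;       // the "connections" array after compaction
    int poolingCount;

    static ShrinkModel run(long[] idleMillis, int minIdle, long minEvictableIdleTimeMillis,
                           long maxEvictableIdleTimeMillis, long keepAliveBetweenTimeMillis,
                           boolean keepAlive) {
        ShrinkModel m = new ShrinkModel();
        m.pool = idleMillis.clone();
        m.poolingCount = idleMillis.length;
        final int checkCount = m.poolingCount - minIdle; // entries beyond minIdle
        for (int i = 0; i < m.poolingCount; ++i) {
            long idle = m.pool[i];
            if (idle < minEvictableIdleTimeMillis && idle < keepAliveBetweenTimeMillis) {
                break; // the rest were used even more recently
            }
            if (idle >= minEvictableIdleTimeMillis
                    && (i < checkCount || idle > maxEvictableIdleTimeMillis)) {
                m.evicted.add(idle);
                continue;
            }
            if (keepAlive && idle >= keepAliveBetweenTimeMillis) {
                m.keepAliveChecked.add(idle);
            }
        }
        // same compaction as shrink(): drop the first removeCount slots, clear the tail
        int removeCount = m.evicted.size() + m.keepAliveChecked.size();
        if (removeCount > 0) {
            System.arraycopy(m.pool, removeCount, m.pool, 0, m.poolingCount - removeCount);
            Arrays.fill(m.pool, m.poolingCount - removeCount, m.poolingCount, 0L);
            m.poolingCount -= removeCount;
        }
        return m;
    }

    long[] remaining() {
        return Arrays.copyOf(pool, poolingCount);
    }
}
```

Because the compaction simply drops the first removeCount slots, it relies on the selected holders forming a prefix of the array. With the oldest-first order assumed here, every selected entry comes before the first unselected one.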
Let's go back to the task once more:
public class DestroyTask implements Runnable {
public DestroyTask() {
}
@Override
public void run() {
shrink(true, keepAlive);
// haven't looked at this part yet
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
}
Having read removeAbandoned(): it handles reclaiming leaked connections. A few settings are involved:
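- removeAbandoned: whether to reclaim leaked connections; default false.
- logAbandoned: whether to log a message when a leaked connection is reclaimed; default false.
- removeAbandonedTimeoutMillis: how long a connection may stay borrowed before it is treated as leaked; default 5 minutes.
In other words, this decides whether connections that were borrowed but never returned to the pool get cleaned up automatically. Once a connection is lent out, the pool cannot tell whether it is being used properly. With this option on, the pool checks how long each borrowed connection has been out. If one has exceeded the timeout and is not in the middle of executing a statement, the pool treats it as leaked and reclaims it.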
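The scan can be modeled in a few lines. This is a hypothetical sketch: LeakDetector and Borrowed are invented names, the current time is passed in explicitly instead of calling System.nanoTime() so the result is reproducible, and a LinkedHashMap keeps the iteration order deterministic. Like the original, it skips running connections, converts nanoseconds to milliseconds, and uses iter.remove() to drop leaked entries from the active map.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the removeAbandoned() scan over borrowed connections.
final class LeakDetector {
    static final class Borrowed {
        final String name;
        final long borrowedAtNanos;
        volatile boolean running; // set while a statement is executing

        Borrowed(String name, long borrowedAtNanos) {
            this.name = name;
            this.borrowedAtNanos = borrowedAtNanos;
        }
    }

    // Borrowed does not override equals(), so keys compare by identity.
    private final Map<Borrowed, Object> activeConnections = new LinkedHashMap<>();
    private final long removeAbandonedTimeoutMillis;

    LeakDetector(long removeAbandonedTimeoutMillis) {
        this.removeAbandonedTimeoutMillis = removeAbandonedTimeoutMillis;
    }

    Borrowed borrow(String name, long nowNanos) {
        Borrowed b = new Borrowed(name, nowNanos);
        activeConnections.put(b, Boolean.TRUE);
        return b;
    }

    // Returns the names of reclaimed connections; the real code closes them.
    List<String> removeAbandoned(long nowNanos) {
        List<String> abandoned = new ArrayList<>();
        Iterator<Borrowed> iter = activeConnections.keySet().iterator();
        while (iter.hasNext()) {
            Borrowed b = iter.next();
            if (b.running) {
                continue; // never reclaim a connection in the middle of a statement
            }
            long heldMillis = (nowNanos - b.borrowedAtNanos) / (1000 * 1000);
            if (heldMillis >= removeAbandonedTimeoutMillis) {
                iter.remove(); // drop it from the active map
                abandoned.add(b.name);
            }
        }
        return abandoned;
    }

    int activeCount() {
        return activeConnections.size();
    }
}
```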
// iterate over the borrowed (active) connections
Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();
for (; iter.hasNext();) {
DruidPooledConnection pooledConnection = iter.next();
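// skip connections that are still running; only borrowed connections that are not running are reclaimed
// Druid marks a connection as running while it executes a query or update,
// and marks it as not running again once that finishes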
if (pooledConnection.isRunning()) {
continue;
}
long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);
// has the connection been out longer than the timeout?
if (timeMillis >= removeAbandonedTimeoutMillis) {
iter.remove();
pooledConnection.setTraceEnable(false);
abandonedList.add(pooledConnection);
}
}
// close the leaked connections
if (abandonedList.size() > 0) {
for (DruidPooledConnection pooledConnection : abandonedList) {
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
try {
if (pooledConnection.isDisable()) {
continue;
}
} finally {
lock.unlock();
}
JdbcUtils.close(pooledConnection);
pooledConnection.abandond();
removeAbandonedCount++;
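removeCount++;
// ... (remainder of the loop not shown)
}
}
Here, too, the connection is only closed, and at first glance the holder is not cleared from the pool array. These are borrowed connections, however. They are tracked in activeConnections, not in the connections array, which only holds idle connections (hence the separate activeCount and poolingCount), and iter.remove() above has already taken them out of activeConnections.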
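That covers how connections are created, maintained, and destroyed. A few questions remain for later:
1>. What happens when a Druid application shuts down: how are the pool's threads stopped one by one? And if the application is killed forcibly instead of shut down gracefully, are these connections closed on the MySQL side?
2>. When the pool has reached its maximum and every connection is in use, so none is available, what does Druid do? Does it wait for a connection, or proactively reclaim the connection that was borrowed earliest?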
Copyright notice: this article is an original work by 星尘阁; please credit the source when reposting.