网站首页 文章专栏 深入druid之连接的维护与生命周期
深入druid之连接的维护与生命周期

一. 连接的基础

通过前面的分析,我们知道,在druid中连接是用数组保存的,因为连接池是要配置最大可用连接数量的,所以也不用扩容,直接初始化最大就完事了,数组用起来更节省点空间。

// store
private volatile DruidConnectionHolder[] connections;

同时druid还有两组非常重要的线程,一组用来创建线程保存到数组,一组用来销毁连接,并从数组移除。

// 创建连接类
public class CreateConnectionTask implements Runnable 
public class CreateConnectionThread extends Thread

// 销毁连接类
public class DestroyTask implements Runnable
public class DestroyConnectionThread extends Thread

为什么每组都有两个呢,这个后面再分析。

所有的连接都是上面的两个创建线程类实现,且使用一个reentrantlock,和一对condition去相互干预,保证创建线程不会一直创建,也不会一直不创建。

protected ReentrantLock                            lock;
protected Condition                                notEmpty;
protected Condition                                empty;

先补充复习下Lock与condition和countdownlatch

二. Lock与condition

countDownLatch:

    - countDownLatch是在java1.5被引入,跟它一起被引入的工具类还有CyclicBarrier、Semaphore、concurrentHashMap和BlockingQueue。

    - 存在于java.util.cucurrent包下。

    - countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。

    - 是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。


Lock与condition:   

    - 在没有Lock之前,我们使用synchronized来控制同步,配合Object的wait()、notify()系列方法可以实现等待/通知模式。在Java SE5后,Java提供了Lock接口,相对于Synchronized而言,Lock提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活。

    - Condition提供了一系列的方法来对阻塞和唤醒线程,常见的有await()和signal()。

    - Condition是一种广义上的条件队列。他为线程提供了一种更为灵活的等待/通知模式,线程在调用await方法后执行挂起操作,直到线程等待的某个条件为真时才会被唤醒。

    - Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。


三. 连接的创建

整体的流程前面也分析了,这里我们简单看下流程图:

1637340461(1).jpg


CreateConnectionThread 该线程与DestroyConnectionThread 都是守护线程,默默在后端进行连接的创造与销毁,通过empty与notEmpty两个condition来控制什么时候该去创建连接。

public CreateConnectionThread(String name){
    super(name);
    this.setDaemon(true);
}
public DestroyConnectionThread(String name){
    super(name);
    this.setDaemon(true);
}

且他俩在druid初始化的init时就会进行启动

// 同时启动还有一个log的线程
createAndLogThread();
createAndStartCreatorThread();
createAndStartDestroyThread();

initedLatch.await();

启动中会通过initedLatch,强制让两个线程都启动成功了再走下面步骤。initedLatch初始化为2,每个线程启动后都会减一。

protected void createAndStartCreatorThread() {
    if (createScheduler == null) {
        String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this);
        createConnectionThread = new CreateConnectionThread(threadName);
        createConnectionThread.start();
        return;
    }

    initedLatch.countDown();
}

创建线程启动后,会在线程内启动一个死循环,我们看下这个线程:

public class CreateConnectionThread extends Thread {

    public CreateConnectionThread(String name){
        super(name);
        // 设置守护线程
        this.setDaemon(true);
    }

    public void run() {
        // 启动后减一,等待销毁线程的也减一,才确保这两个线程都成功创建了
        initedLatch.countDown();

        long lastDiscardCount = 0;
        int errorCount = 0;
        // 死循环
        for (;;) {
            // addLast
            try {
                lock.lockInterruptibly();
            } catch (InterruptedException e2) {
                break;
            }

            long discardCount = DruidDataSource.this.discardCount;
            boolean discardChanged = discardCount - lastDiscardCount > 0;
            lastDiscardCount = discardCount;

            try {
                // 等待创建连接,默认true,表示要等待,不创建 
                boolean emptyWait = true;
                
                // 如果连接池是空的,则需要创建
                if (createError != null
                        && poolingCount == 0
                        && !discardChanged) {
                    emptyWait = false;
                }
                
                // 如果开启异步创建,且创建的数量小于初始化大小,也需要创建
                if (emptyWait
                        && asyncInit && createCount < initialSize) {
                    emptyWait = false;
                }

                if (emptyWait) {
                    // 必须存在线程等待,才创建连接
                    if (poolingCount >= notEmptyWaitThreadCount //
                            && (!(keepAlive && activeCount + poolingCount < minIdle))
                            && !isFailContinuous()
                    ) {
                        // 表示现在不需要创建,开始等待empty开启信号
                        empty.await();
                    }

                    // 防止创建超过maxActive数量的连接
                    if (activeCount + poolingCount >= maxActive) {
                        empty.await();
                        continue;
                    }
                }

            } catch (InterruptedException e) {
                lastCreateError = e;
                lastErrorTimeMillis = System.currentTimeMillis();

                if ((!closing) && (!closed)) {
                    LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);
                }
                break;
            } finally {
                lock.unlock();
            }

            PhysicalConnectionInfo connection = null;

            try {
                // 创建物理连接
                connection = createPhysicalConnection();
            } catch (SQLException e) {
                LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
                          + ", state " + e.getSQLState(), e);

                errorCount++;
                if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                    // fail over retry attempts
                    setFailContinuous(true);
                    if (failFast) {
                        lock.lock();
                        try {
                            notEmpty.signalAll();
                        } finally {
                            lock.unlock();
                        }
                    }

                    if (breakAfterAcquireFailure) {
                        break;
                    }

                    try {
                        Thread.sleep(timeBetweenConnectErrorMillis);
                    } catch (InterruptedException interruptEx) {
                        break;
                    }
                }
            } catch (RuntimeException e) {
                LOG.error("create connection RuntimeException", e);
                setFailContinuous(true);
                continue;
            } catch (Error e) {
                LOG.error("create connection Error", e);
                setFailContinuous(true);
                break;
            }

            if (connection == null) {
                continue;
            }
            
            // 没啥问题就保存到连接池
            boolean result = put(connection);
            if (!result) {
                JdbcUtils.close(connection.getPhysicalConnection());
                LOG.info("put physical connection to pool failed.");
            }

            errorCount = 0; // reset errorCount

            if (closing || closed) {
                break;
            }
        }
    }
}
protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
    DruidConnectionHolder holder = null;
    try {
        // 包装成holder
        holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
    } catch (SQLException ex) {
        lock.lock();
        try {
            if (createScheduler != null) {
                clearCreateTask(physicalConnectionInfo.createTaskId);
            }
        } finally {
            lock.unlock();
        }
        LOG.error("create connection holder error", ex);
        return false;
    }
    // 放入连接池
    return put(holder, physicalConnectionInfo.createTaskId, false);
}

放入连接池

private boolean put(DruidConnectionHolder holder, long createTaskId, boolean checkExists) {
    // 加锁,保证其他线程别同时放,覆盖了
    lock.lock();
    try {
        if (this.closing || this.closed) {
            return false;
        }
        
        // 连接池如果大于最大数量,那么清理这个创建任务
        if (poolingCount >= maxActive) {
            if (createScheduler != null) {
                clearCreateTask(createTaskId);
            }
            return false;
        }
        
        // 检测是否已存在
        if (checkExists) {
            for (int i = 0; i < poolingCount; i++) {
                if (connections[i] == holder) {
                    return false;
                }
            }
        }

        // 放入连接池,并加1
        connections[poolingCount] = holder;
        incrementPoolingCount();

        if (poolingCount > poolingPeak) {
            poolingPeak = poolingCount;
            poolingPeakTime = System.currentTimeMillis();
        }
        
        // 通知连接池现在不是空的了
        notEmpty.signal();
        notEmptySignalCount++;

        if (createScheduler != null) {
        // 如果创建线程不是null,那么清理创建队列的任务,因为这个方法从创建任务队列也可进入
            clearCreateTask(createTaskId);

            if (poolingCount + createTaskCount < notEmptyWaitThreadCount //
                && activeCount + poolingCount + createTaskCount < maxActive) {
                emptySignal();
            }
        }
    } finally {
        lock.unlock();
    }
    return true;
}

到此,就创建完了,但是上面我们提到除了这个守护线程一直在背后创建连接,还有一个实现了runnable的CreateConnectionTask,这个是干嘛的呢?其实它和守护线程创建连接基本是一样的,不同的点是它不是守护线程,一直在背后,而且需要通过手动触发,在一些场合我们需要立即创建一个连接的话,就可以提交到创建连接的任务队列,然后去创建一个连接。

public class CreateConnectionTask implements Runnable {

    private int errorCount   = 0;
    private boolean initTask = false;
    private final long taskId;

    public CreateConnectionTask() {
        taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
    }

    public CreateConnectionTask(boolean initTask) {
        taskId = createTaskIdSeedUpdater.getAndIncrement(DruidDataSource.this);
        this.initTask = initTask;
    }

    @Override
    public void run() {
        runInternal();
    }

    private void runInternal() {
        for (;;) {

            // addLast
            lock.lock();
            try {
                if (closed || closing) {
                    clearCreateTask(taskId);
                    return;
                }

                boolean emptyWait = true;

                if (createError != null && poolingCount == 0) {
                    emptyWait = false;
                }

                if (emptyWait) {
                    // 必须存在线程等待,才创建连接
                    if (poolingCount >= notEmptyWaitThreadCount //
                            && (!(keepAlive && activeCount + poolingCount < minIdle)) // 在keepAlive场景不能放弃创建
                            && (!initTask) // 线程池初始化时的任务不能放弃创建
                            && !isFailContinuous() // failContinuous时不能放弃创建,否则会无法创建线程
                            && !isOnFatalError() // onFatalError时不能放弃创建,否则会无法创建线程
                    ) {
                        // 逻辑都一样,不同点在这,上面的守护线程会hang住,这里不会,直接清理掉当前任务队列,然后结束
                        clearCreateTask(taskId);
                        return;
                    }

                    // 防止创建超过maxActive数量的连接
                    if (activeCount + poolingCount >= maxActive) {
                        clearCreateTask(taskId);
                        return;
                    }
                }
            } finally {
                lock.unlock();
            }

            PhysicalConnectionInfo physicalConnection = null;

            try {
                physicalConnection = createPhysicalConnection();
            } catch (OutOfMemoryError e) {
                LOG.error("create connection OutOfMemoryError, out memory. ", e);

                errorCount++;
                if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                    // fail over retry attempts
                    setFailContinuous(true);
                    if (failFast) {
                        lock.lock();
                        try {
                            notEmpty.signalAll();
                        } finally {
                            lock.unlock();
                        }
                    }

                    if (breakAfterAcquireFailure) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    this.errorCount = 0; // reset errorCount
                    if (closing || closed) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
                    return;
                }
            } catch (SQLException e) {
                LOG.error("create connection SQLException, url: " + jdbcUrl, e);

                errorCount++;
                if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
                    // fail over retry attempts
                    setFailContinuous(true);
                    if (failFast) {
                        lock.lock();
                        try {
                            notEmpty.signalAll();
                        } finally {
                            lock.unlock();
                        }
                    }

                    if (breakAfterAcquireFailure) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    this.errorCount = 0; // reset errorCount
                    if (closing || closed) {
                        lock.lock();
                        try {
                            clearCreateTask(taskId);
                        } finally {
                            lock.unlock();
                        }
                        return;
                    }

                    createSchedulerFuture = createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
                    return;
                }
            } catch (RuntimeException e) {
                LOG.error("create connection RuntimeException", e);
                // unknow fatal exception
                setFailContinuous(true);
                continue;
            } catch (Error e) {
                lock.lock();
                try {
                    clearCreateTask(taskId);
                } finally {
                    lock.unlock();
                }
                LOG.error("create connection Error", e);
                // unknow fatal exception
                setFailContinuous(true);
                break;
            } catch (Throwable e) {
                lock.lock();
                try {
                    clearCreateTask(taskId);
                } finally {
                    lock.unlock();
                }

                LOG.error("create connection unexecpted error.", e);
                break;
            }

            if (physicalConnection == null) {
                continue;
            }

            physicalConnection.createTaskId = taskId;
            boolean result = put(physicalConnection);
            if (!result) {
                JdbcUtils.close(physicalConnection.getPhysicalConnection());
                LOG.info("put physical connection to pool failed.");
            }
            break;
        }
    }
}

核心区别点,就在于当连接不需要被创建时,不会hang住,等待唤醒,而且直接清除创建任务队列,然后结束。该线程的触发是在该方法中:

private void submitCreateTask(boolean initTask) {
    createTaskCount++;
    CreateConnectionTask task = new CreateConnectionTask(initTask);
    if (createTasks == null) {
        createTasks = new long[8];
    }

    boolean putted = false;
    for (int i = 0; i < createTasks.length; ++i) {
        if (createTasks[i] == 0) {
            createTasks[i] = task.taskId;
            putted = true;
            break;
        }
    }
    if (!putted) {
        long[] array = new long[createTasks.length * 3 / 2];
        System.arraycopy(createTasks, 0, array, 0, createTasks.length);
        array[createTasks.length] = task.taskId;
        createTasks = array;
    }

    this.createSchedulerFuture = createScheduler.submit(task);
}

提交一个创建连接的任务,用一个初始化长度为8的long数组表示任务队列,这里没太明白,这个队列有啥用,也起不到缓存的作用,只能是统计当前创建任务队列用?好像也没用到。


四. 连接的销毁

我们再来看下DestroyConnectionThread,这个线程作用就是用来一直扫描是否存在需要被回收处理的连接,满足条件则会被销毁,且如果连接池为空了就会通知CreateConnectionThread创建线程。

public class DestroyConnectionThread extends Thread {

    public DestroyConnectionThread(String name){
        super(name);
        // 守护线程
        this.setDaemon(true);
    }

    public void run() {
        initedLatch.countDown();

        for (;;) {
            // 从前面开始删除
            try {
                if (closed || closing) {
                    break;
                }

                if (timeBetweenEvictionRunsMillis > 0) {
                    Thread.sleep(timeBetweenEvictionRunsMillis);
                } else {
                    Thread.sleep(1000); //
                }

                if (Thread.interrupted()) {
                    break;
                }
                // 调用另一个task线程实际去执行
                destroyTask.run();
            } catch (InterruptedException e) {
                break;
            }
        }
    }

}
public class DestroyTask implements Runnable {
    public DestroyTask() {

    }

    @Override
    public void run() {
        // 大体逻辑是扫描所有连接池中未活动的连接,判断满足条件的就保留,不满足条件的如超时等连接就会被销毁
        shrink(true, keepAlive);

        if (isRemoveAbandoned()) {
            removeAbandoned();
        }
    }
}
public void shrink(boolean checkTime, boolean keepAlive) {
    try {
        lock.lockInterruptibly();
    } catch (InterruptedException e) {
        return;
    }

    boolean needFill = false;
    int evictCount = 0;
    int keepAliveCount = 0;
    int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
    fatalErrorCountLastShrink = fatalErrorCount;
    
    try {
        if (!inited) {
            return;
        }

        final int checkCount = poolingCount - minIdle;
        final long currentTimeMillis = System.currentTimeMillis();
        // 遍历所有连接
        for (int i = 0; i < poolingCount; ++i) {
            DruidConnectionHolder connection = connections[i];

            if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis))  {
               // 需要保活的就放入keepAliveConnections中
                keepAliveConnections[keepAliveCount++] = connection;
                continue;
            }

            if (checkTime) {
                if (phyTimeoutMillis > 0) {
                    long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
                    if (phyConnectTimeMillis > phyTimeoutMillis) {
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }

                long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;

                if (idleMillis < minEvictableIdleTimeMillis
                        && idleMillis < keepAliveBetweenTimeMillis
                ) {
                    break;
                }
                
                // 超过存活时间,则放入evictConnections
                if (idleMillis >= minEvictableIdleTimeMillis) {
                    if (checkTime && i < checkCount) {
                        evictConnections[evictCount++] = connection;
                        continue;
                    } else if (idleMillis > maxEvictableIdleTimeMillis) {
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }

                if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
                    keepAliveConnections[keepAliveCount++] = connection;
                }
            } else {
                if (i < checkCount) {
                    evictConnections[evictCount++] = connection;
                } else {
                    break;
                }
            }
        }

        int removeCount = evictCount + keepAliveCount;
        if (removeCount > 0) {
            System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
            Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
            poolingCount -= removeCount;
        }
        keepAliveCheckCount += keepAliveCount;

        if (keepAlive && poolingCount + activeCount < minIdle) {
            needFill = true;
        }
    } finally {
        lock.unlock();
    }

    if (evictCount > 0) {
        // 遍历已经超时的连接,关闭连接,并重置evictConnections
        for (int i = 0; i < evictCount; ++i) {
            DruidConnectionHolder item = evictConnections[i];
            Connection connection = item.getConnection();
            JdbcUtils.close(connection);
            destroyCountUpdater.incrementAndGet(this);
        }
        Arrays.fill(evictConnections, null);
    }

    if (keepAliveCount > 0) {
        // keep order
        for (int i = keepAliveCount - 1; i >= 0; --i) {
            DruidConnectionHolder holer = keepAliveConnections[i];
            Connection connection = holer.getConnection();
            holer.incrementKeepAliveCheckCount();

            boolean validate = false;
            try {
                // 保活的要进行检测连接可用性
                this.validateConnection(connection);
                validate = true;
            } catch (Throwable error) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("keepAliveErr", error);
                }
                // skip
            }

            boolean discard = !validate;
            if (validate) {
                holer.lastKeepTimeMillis = System.currentTimeMillis();
                boolean putOk = put(holer, 0L, true);
                if (!putOk) {
                    discard = true;
                }
            }

            if (discard) {
                try {
                    connection.close();
                } catch (Exception e) {
                    // skip
                }

                lock.lock();
                try {
                    discardCount++;

                    if (activeCount + poolingCount <= minIdle) {
                        emptySignal();
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
        this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
        Arrays.fill(keepAliveConnections, null);
    }

    if (needFill) {
        lock.lock();
        try {
            // 如果需要填满,则计算差值,发起创建连接的任务,这个是创建一次性任务,并不是守护线程那个
            int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
            for (int i = 0; i < fillCount; ++i) {
                emptySignal();
            }
        } finally {
            lock.unlock();
        }
    } else if (onFatalError || fatalErrorIncrement > 0) {
        lock.lock();
        try {
            emptySignal();
        } finally {
            lock.unlock();
        }
    }
}


看到这里有个疑问,在遍历每个连接的时候,如果发现超时了,就直接给关了连接,而不是在连接池移除,这样的话,这个holder还是存在的,但是连接已经被关闭了,只能在使用的时候发现不能用再抛弃掉,换一个连接。这样的话为什么不在这就给置为null,从连接池移除了呢,还省的后面再判断是否能用?

再回头看下线程:

public class DestroyTask implements Runnable {
    public DestroyTask() {

    }

    @Override
    public void run() {
        shrink(true, keepAlive);

         // 这段代码还没看
        if (isRemoveAbandoned()) {
            removeAbandoned();
        }
    }

}

看了removeAbandoned(),这里是对连接是否泄露回收的处理。这里有几个配置:

    - removeAbandoned:如果连接泄露,是否需要回收泄露的连接,默认false

    - logAbandoned:如果回收了泄露的连接,是否要打印一条log,默认false;

    - removeAbandonedTimeoutMillis:连接回收的超时时间,默认5分钟

意思就是是否开启自动清理被租借的连接但是又没有还回线程池,当我们借出去一个连接,这个连接有没有被正确使用是不知道的,加了这个配置,那么就会检测借出去的是否超时,超时了就认为泄露了,会回收掉。

// 遍历借出去的连接
Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();
for (; iter.hasNext();) {
    DruidPooledConnection pooledConnection = iter.next();

    // 判断该连接是否还在运行,只回收不运行的连接
    // Druid会在连接执行query,update的时候设置为正在运行,
    // 并在回收后设置为不运行
    if (pooledConnection.isRunning()) {
        continue;
    }

    long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);

     //判断连接借出去的时间大小
    if (timeMillis >= removeAbandonedTimeoutMillis) {
        iter.remove();
        pooledConnection.setTraceEnable(false);
        abandonedList.add(pooledConnection);
    }
}

// 泄露的连接关了
if (abandonedList.size() > 0) {
    for (DruidPooledConnection pooledConnection : abandonedList) {
        final ReentrantLock lock = pooledConnection.lock;
        lock.lock();
        try {
            if (pooledConnection.isDisable()) {
                continue;
            }
        } finally {
            lock.unlock();
        }

        JdbcUtils.close(pooledConnection);
        pooledConnection.abandond();
        removeAbandonedCount++;
        removeCount++;

同样,这里也只是将连接close掉,连接池数组中的holder并没有置空。


线程的创建维护,销毁就看到这,还有几个疑问,后面再看

1>. druid应用关闭的过程,如何将线程池的线程挨个关闭的,如果应用强杀了,不是优雅关闭,这些连接在mysql那边会关闭吗?

2>. 当druid连接池达到最大,且使用中的连接也达到最大了,相当于此时没有连接可用了,那么druid会怎么办?等待连接还是主动回收一个最先使用的连接




版权声明:本文由星尘阁原创出品,转载请注明出处!

本文链接:http://www.52xingchen.cn/detail/96




赞助本站,网站的发展离不开你们的支持!
来说两句吧
大侠留个名吧,或者可以使用QQ登录。
: 您已登陆!可以继续留言。
最新评论