网站首页 文章专栏 深入druid之druid连接池源码分析
深入druid之druid连接池源码分析

一. 核心DruidDataSource类

要找一个类代表druid的话,那么非DruidDataSource这个莫属了,其核心连接的维护,连接的构建,入池,获取,收缩,销毁,以及核心监控数据都在这个类维护,所以在研究之前必须把这个类中的核心成员变量搞清楚含义,不然很难阅读源码。

先来张该类的全局信息:

1637078820(1).jpg


看图可得知DruidDataSource继承自DruidAbstractDataSource和CommonDataSource。说明DruidDataSrouce是一个DataSource,可以直接getConnection获取连接。且拥有两个task实现runnable接口,三个线程继承thread,分别处理销毁任务,创建连接任务,记录统计信息等信息。


二. 核心成员变量

类名描述
ExceptionSorter用于判断SQLException对象是否致命异常
ValidConnectionChecker用于校验指定连接对象是否有效
CreateConnectionThreadDruidDataSource的内部类,用于异步创建连接对象
notEmpty调用notEmpty.await()时,当前线程进入等待;当连接创建完成或者回收了连接,会调用notEmpty.signal()时,将等待线程唤醒
empty调用empty.await()时,CreateConnectionThread进入等待,调用empty.signal()时,CreateConnectionThread被唤醒,并进入创建连接
DestroyConnectionThreadDruidDataSource的内部类,用于异步检验连接对象,包括校验空闲连接的phyTimeoutMillis,minEvictableIdleTimeMillis,以及校验借出连接的removeAbandonedTimeoutMillis
LogStatsThreadDruidDataSource的内部类,用于异步记录统计信息
connections用于存放所有连接对象
evictConnections用于存放需要丢弃的连接对象
keepAliveConnections用于存放需要keepAlive的连接对象
activeConnections用于存放需要进行removeAbandoned的连接对象
poolingCount空闲连接对象的数量
activeCount借出连接对象的数量

三. 初始化

初始化的过程分为两种情况,一种是在加载druid时初始化,还有就是在获取连接时也会初始话,DruidDataSource的这个初始化时机是可选的,当我们设置init=true时,在createDataSource时就会调用DataSource.init()方法进行初始化,否则,只会在getConnection时再进行初始化,当然这个初始化肯定时只会进行一次的。

public void init() throws SQLException {
    if (inited) {
        return;
    }

    // bug fixed for dead lock, for issue #2980
    DruidDriver.getInstance();

    final ReentrantLock lock = this.lock;
    try {
        lock.lockInterruptibly();
    } catch (InterruptedException e) {
        throw new SQLException("interrupt", e);
    }

    boolean init = false;

1,第一步就是判断是否已经初始化了,如果已经初始化,那么不再进行,保证只初始化一次,inited这个字段是个volatile修饰的布尔类型

2,然后是DruidDriver.getInstance();这一步是为了提前初始化DruidDriver,因为之前有个issue提到这里在多线程下初始化可能存在死锁,所以就给提前显式初始化了

3,加锁,使用一个可重入锁,并使用可处理中断异常的方式获取锁,保证下面的处理只有一个线程能处理

try {
    if (inited) {
        return;
    }

    initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());

    this.id = DruidDriver.createDataSourceId();
    if (this.id > 1) {
        long delta = (this.id - 1) * 100000;
        this.connectionIdSeedUpdater.addAndGet(this, delta);
        this.statementIdSeedUpdater.addAndGet(this, delta);
        this.resultSetIdSeedUpdater.addAndGet(this, delta);
        this.transactionIdSeedUpdater.addAndGet(this, delta);
    }

    if (this.jdbcUrl != null) {
        this.jdbcUrl = this.jdbcUrl.trim();
        initFromWrapDriverUrl();
    }

    for (Filter filter : filters) {
        filter.init(this);
    }

    if (this.dbTypeName == null || this.dbTypeName.length() == 0) {
        this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
    }

    DbType dbType = DbType.of(this.dbTypeName);
    if (dbType == DbType.mysql
            || dbType == DbType.mariadb
            || dbType == DbType.oceanbase
            || dbType == DbType.ads) {
        boolean cacheServerConfigurationSet = false;
        if (this.connectProperties.containsKey("cacheServerConfiguration")) {
            cacheServerConfigurationSet = true;
        } else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
            cacheServerConfigurationSet = true;
        }
        if (cacheServerConfigurationSet) {
            this.connectProperties.put("cacheServerConfiguration", "true");
        }
    }

1,这段代码就已经在锁里了,再次判断是否已经初始化,同时创建一个datasourceId,这个id是原子类,保证安全

2,如果id>1,那么说明不止一个数据源,if里面的代码猜测可能是保留一个区间段id给每个数据源使用的

3,initFromWrapDriverUrl(),是针对druid自定义的一种url格式,以jdbc:wrap-jdbc:开头,进行解析

4,this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null); 根据url前缀,确定dbType

// 一段对配置参数进行检查的代码
... 

initFromSPIServiceLoader();

resolveDriver();

initCheck();

initExceptionSorter();
initValidConnectionChecker();
validationQueryCheck();

1,采用SPI机制加载过滤器,这部分过滤器除了放入filters,还会放入autoFilters

2,处理驱动,根据我们配置中的连接地址的协议,得到具体的驱动类型

3,initCheck,只是针对oracle和DB2,需要校验validationQuery

4,根据dbType实例化一个具体的MySqlExceptionSorter,用来处理异常,判断异常等

5,根据dbType初始化一个具体的MySqlValidConnectionChecker,并加载配置,该类会在后面起到检测连接是否有效的作用

6,校验testOnBorrow,testOnReturn,testWhileIdle参数的合法性

connections = new DruidConnectionHolder[maxActive];
evictConnections = new DruidConnectionHolder[maxActive];
keepAliveConnections = new DruidConnectionHolder[maxActive];

SQLException connectError = null;

// 创建初始连接数
// 异步创建,createScheduler为null,不进入
if (createScheduler != null && asyncInit) {
    for (int i = 0; i < initialSize; ++i) {
        submitCreateTask(true);
    }
} else if (!asyncInit) {
    // init connections
    while (poolingCount < initialSize) {
        try {
            PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
            DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
            connections[poolingCount++] = holder;
        } catch (SQLException ex) {
            LOG.error("init datasource error, url: " + this.getUrl(), ex);
            if (initExceptionThrow) {
                connectError = ex;
                break;
            } else {
                Thread.sleep(3000);
            }
        }
    }

    if (poolingCount > 0) {
        poolingPeak = poolingCount;
        poolingPeakTime = System.currentTimeMillis();
    }
}

createAndLogThread();
createAndStartCreatorThread();
createAndStartDestroyThread();

initedLatch.await();
init = true;

initedTime = new Date();
registerMbean();

if (connectError != null && poolingCount == 0) {
    throw connectError;
}

if (keepAlive) {
    // async fill to minIdle
    if (createScheduler != null) {
        for (int i = 0; i < minIdle; ++i) {
            submitCreateTask(true);
        }
    } else {
        this.emptySignal();
    }
}

1,初始化connections,用于存放所有连接对象,evictConnections,用于存放需要丢弃的连接对象,keepAliveConnections,用于存放需要keepAlive的连接对象

2,这里有两种方式创建连接,一种是异步,一种是同步。我这里使用的是同步,createScheduler为null。

3,poolingCount 为空闲连接对象数量,当其小于初始化连接池大小时,不停的调用createPhysicalConnection();创建新连接,并放进去

3.1,createPhysicalConnection(),的流程大致就是读取配置中的url,驱动以及用户密码,实例化一个ConnectionProxyImpl,再使用上面初始话的validConnectionChecker,根据配置对该连接进行校验,判断是否是可用的连接。

4,启动三个线程

        // 启动监控数据记录线程
        createAndLogThread();        
        // 启动连接创建线程
        createAndStartCreatorThread();        
        // 启动连接检测线程
        createAndStartDestroyThread();
        initedLatch.await();

这里initedLatch为一个countdownlatch对象,保证当createConnectionThread和destroyConnectionThread开始run时再继续执行

5,注册MBean,会去注册DruidDataSourceStatManager和DruidDataSource,用来通过jmx监控

6,如果配置了keepAlive,且是异步创建连接,那么会提交创建任务,创建任务队列使用long数组实现,初始化为8个长度,每个位置为0则代表任务为空,不为零则就是具体的任务id,代表加入任务队列,当队列满了时,会进行扩容,大小为1.5倍,然后通过createScheduler,执行任务。

6.1,如果配置了keepAlive,且不是异步创建连接,那么会去调用empty.signal(),会去唤醒处于empty.await()状态的CreateConnectionThread,CreateConnectionThread这个线程只有在需要创建连接时才运行,否则会一直等待。


至此,初始话的流程已经完成,总结下就是初始化驱动实例 -> 加锁 -> 初始化属性 -> 初始化过滤器 -> 校验参数 -> 创建初始化连接并校验后加入池中 -> 创建logStatsThread、createConnectionThread和destroyConnectionThread -> 注册MBean,用于支持JMX -> 如果设置了keepAlive,通知createConnectionThread创建连接对象 -> 解锁


四. 获取连接使用

之前我们说过,DruidDataSource这个核心类其实就是一个DataSource,因为其继承了DataSource,所以它也重写了 getConnection方法,实际获取一个连接就是从这获取的

@Override
public DruidPooledConnection getConnection() throws SQLException {
    return getConnection(maxWait);
}

public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
    init();

    if (filters.size() > 0) {
        FilterChainImpl filterChain = new FilterChainImpl(this);
        return filterChain.dataSource_connect(this, maxWaitMillis);
    } else {
        return getConnectionDirect(maxWaitMillis);
    }
}

它调用了本身的一个getConnection方法,并以配置的maxWait作为参数传递进去。获取连接之前会进行druid连接池的初始化动作,这个上面也是说了,进入初始化流程发现已经初始化过了,就不会再初始化了。

接着如果配置了过滤器,也就是stat,wall,log4j那几个插件,那么会进行插件的初始化,这里使用到了责任链模式,很经典的用法了,在网关中也常用到,之前源码分析soul网关时也讲到这中用法。然后用filterChain调用dataSource_connect方法,进行获取连接。

在filterChain中,定义了当前的数据源以及连接池信息,责任链的列表,长度,以及当前访问的位置.

@Override
public DruidPooledConnection dataSource_connect(DruidDataSource dataSource, long maxWaitMillis) throws SQLException {
    if (this.pos < filterSize) {
        DruidPooledConnection conn = nextFilter().dataSource_getConnection(this, dataSource, maxWaitMillis);
        return conn;
    }

    return dataSource.getConnectionDirect(maxWaitMillis);
}

如上代码,判断当前位置小于责任链长度,则使用nextFilter()获取下一个实际的过滤器获取连接,其实也就是遍历责任链,用每个过滤器插件执行下dataSource_getConnection(this, dataSource, maxWaitMillis); 这个方法在Filter中就定义了,每个过滤器都实现了具体的方法,举个例子:这是statFilter的实现

@Override
public DruidPooledConnection dataSource_getConnection(FilterChain chain, DruidDataSource dataSource,
                                                      long maxWaitMillis) throws SQLException {
    DruidPooledConnection conn = chain.dataSource_connect(dataSource, maxWaitMillis);

    if (conn != null) {
        conn.setConnectedTimeNano();

        StatFilterContext.getInstance().pool_connection_open();
    }

    return conn;
}

可以看到,很骚的是,他把过滤器本身传进来了,又通过dataSource_connect调用了一次,相当于dfs递归遍历,然后又回到上面的过滤器列表的遍历,只不过这一次pos已经+1了,拿到的就是下一个过滤器。如此这样直到,pos的位置大于插件连的长度。执行这个代码dataSource.getConnectionDirect(maxWaitMillis)

这个方法字面意思就是直连获取连接:

public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
    int notFullTimeoutRetryCnt = 0;
    for (;;) {
        // handle notFullTimeoutRetry
        DruidPooledConnection poolableConnection;
        try {
            poolableConnection = getConnectionInternal(maxWaitMillis);
        } catch (GetConnectionTimeoutException ex) {
            if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
                notFullTimeoutRetryCnt++;
                if (LOG.isWarnEnabled()) {
                    LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
                }
                continue;
            }
            throw ex;
        }

起一个死循环,在循环里面调用getConnectionInternal(maxWaitMillis);获取连接,关键点就在这:

poolableConnection = getConnectionInternal(maxWaitMillis);

private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
    if (closed) {
        connectErrorCountUpdater.incrementAndGet(this);
        throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis));
    }

    if (!enable) {
        connectErrorCountUpdater.incrementAndGet(this);

        if (disableException != null) {
            throw disableException;
        }

        throw new DataSourceDisableException();
    }

    final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
    final int maxWaitThreadCount = this.maxWaitThreadCount;

    DruidConnectionHolder holder;

    for (boolean createDirect = false;;) {
        if (createDirect) {
            createStartNanosUpdater.set(this, System.nanoTime());
            if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
                PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
                holder = new DruidConnectionHolder(this, pyConnInfo);
                holder.lastActiveTimeMillis = System.currentTimeMillis();

                creatingCountUpdater.decrementAndGet(this);
                directCreateCountUpdater.incrementAndGet(this);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("conn-direct_create ");
                }

                boolean discard = false;
                lock.lock();
                try {
                    if (activeCount < maxActive) {
                        activeCount++;
                        holder.active = true;
                        if (activeCount > activePeak) {
                            activePeak = activeCount;
                            activePeakTime = System.currentTimeMillis();
                        }
                        break;
                    } else {
                        discard = true;
                    }
                } finally {
                    lock.unlock();
                }

                if (discard) {
                    JdbcUtils.close(pyConnInfo.getPhysicalConnection());
                }
            }
        }

上面代码逻辑就是,起一个死循环,先判断是否是直接创建,刚进来肯定不是,往下走:

try {
    lock.lockInterruptibly();
} catch (InterruptedException e) {
    connectErrorCountUpdater.incrementAndGet(this);
    throw new SQLException("interrupt", e);
}

try {
    if (maxWaitThreadCount > 0
            && notEmptyWaitThreadCount >= maxWaitThreadCount) {
        connectErrorCountUpdater.incrementAndGet(this);
        throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
                + lock.getQueueLength());
    }

    if (onFatalError
            && onFatalErrorMaxActive > 0
            && activeCount >= onFatalErrorMaxActive) {
        connectErrorCountUpdater.incrementAndGet(this);

        StringBuilder errorMsg = new StringBuilder();
        errorMsg.append("onFatalError, activeCount ")
                .append(activeCount)
                .append(", onFatalErrorMaxActive ")
                .append(onFatalErrorMaxActive);

        if (lastFatalErrorTimeMillis > 0) {
            errorMsg.append(", time '")
                    .append(StringUtils.formatDateTime19(
                            lastFatalErrorTimeMillis, TimeZone.getDefault()))
                    .append("'");
        }

        if (lastFatalErrorSql != null) {
            errorMsg.append(", sql \n")
                    .append(lastFatalErrorSql);
        }

        throw new SQLException(
                errorMsg.toString(), lastFatalError);
    }

    connectCount++;

    if (createScheduler != null
            && poolingCount == 0
            && activeCount < maxActive
            && creatingCountUpdater.get(this) == 0
            && createScheduler instanceof ScheduledThreadPoolExecutor) {
        ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
        if (executor.getQueue().size() > 0) {
            createDirect = true;
            continue;
        }
    }

    if (maxWait > 0) {
        holder = pollLast(nanos);
    } else {
        holder = takeLast();
    }

    if (holder != null) {
        if (holder.discard) {
            continue;
        }

        activeCount++;
        holder.active = true;
        if (activeCount > activePeak) {
            activePeak = activeCount;
            activePeakTime = System.currentTimeMillis();
        }
    }

先加锁,然后是对异常场景的一顿判断,如果配置了异步创建连接,且池中无可用连接,且借出去的连接小于最大活跃连接,且待创建任务的队列>0,则将createDirect置为true,跳过下面代码,回到循环上面,相当于这次连接池没有可用的连接了,通过DruidDataSource.this.createPhysicalConnection();就直接现场创建一个连接。

如果没有配置异步创建

if (maxWait > 0) {
    holder = pollLast(nanos);
} else {
    holder = takeLast();
}

如果没超时,如果无可用连接,则发一个信号创建连接,直到可用连接不为0,然后取走最后一个连接对象,下面的超时的逻辑也很类似,就是多了一个对超时时间的处理。拿到链接后,活跃连接数加1,解锁。

如果拿到的连接为null,则代表出错了,组装下错误日志,抛出异常。

正常情况,则将拿到的连接包装为DruidPooledConnection,返回。


回到最上面,我们拿到连接后,通过testOnBorrow配置,对拿到的连接进行校验,这个校验的逻辑和前面创建的连接校验的逻辑是一样的,执行下配置的select 'x'。校验结果如果不合法,则代表此链接没用,就需要抛弃掉

if (testOnBorrow) {
    boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
    if (!validate) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("skip not validate connection.");
        }

        discardConnection(poolableConnection.holder);
        continue;
    }
}

抛弃的流程就是,如果连接不为null,就将连接关了,同时将该连接的holer的属性置为抛弃,活跃数量减一,如果此时使用中的连接数量小于配置的最小连接数,则发起emptySignal()信号,进行创建。不合法后,则需要继续循环再从连接池拿一个连接,直到有效。


然后是testWhileIdle配置,该配置意思是申请连接时如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效,逻辑也就这个意思

if (testWhileIdle) {
    final DruidConnectionHolder holder = poolableConnection.holder;
    long currentTimeMillis             = System.currentTimeMillis();
    long lastActiveTimeMillis          = holder.lastActiveTimeMillis;
    long lastExecTimeMillis            = holder.lastExecTimeMillis;
    long lastKeepTimeMillis            = holder.lastKeepTimeMillis;

    if (checkExecuteTime
            && lastExecTimeMillis != lastActiveTimeMillis) {
        lastActiveTimeMillis = lastExecTimeMillis;
    }

    if (lastKeepTimeMillis > lastActiveTimeMillis) {
        lastActiveTimeMillis = lastKeepTimeMillis;
    }

    long idleMillis                    = currentTimeMillis - lastActiveTimeMillis;

    long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;

    if (timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis
            || idleMillis < 0 // unexcepted branch
            ) {
        boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
        if (!validate) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("skip not validate connection.");
            }

            discardConnection(poolableConnection.holder);
             continue;
        }
    }
}


然后就成功的拿到连接了,此时代码回到了这里:

@Override
public DruidPooledConnection dataSource_connect(DruidDataSource dataSource, long maxWaitMillis) throws SQLException {
    if (this.pos < filterSize) {
        DruidPooledConnection conn = nextFilter().dataSource_getConnection(this, dataSource, maxWaitMillis);
        return conn;
    }

    // 此处执行完了
    return dataSource.getConnectionDirect(maxWaitMillis);
}

因为这里的过滤器链是dfs递归调用的,所以回去的时候就是一层一层的会去,回到每一层,执行获取连接后的逻辑,比如StatFilter会执行:

// 拿到连接后
if (conn != null) {
    conn.setConnectedTimeNano();

    StatFilterContext.getInstance().pool_connection_open();
}

层层返回后,这个连接就算成功拿到了,交给spring集成相关的代码,去实际执行sql了。这里使用责任链,通过每个过滤器去获取连接的目的就是,让每个插件在拿到连接的时候都能做一下处理,不想做直接跳过即可,以后也可以很方便拓展插件。

到此获取连接的逻辑结束。


五. 使用连接

连接的上层获取动作,是从DataSourceUtils中的getConnection获取的,调用我们上面分析的DruidDataSource中获取一个实际的连接。

拿到连接后,将连接包装成一个ConnectionHolder

public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    Assert.notNull(dataSource, "No DataSource specified");
    ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(dataSource);
    if (conHolder == null || !conHolder.hasConnection() && !conHolder.isSynchronizedWithTransaction()) {
        logger.debug("Fetching JDBC Connection from DataSource");
        //拿到一个连接
        Connection con = fetchConnection(dataSource);
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            logger.debug("Registering transaction synchronization for JDBC Connection");
           // 包装下
            ConnectionHolder holderToUse = conHolder;
            if (conHolder == null) {
                holderToUse = new ConnectionHolder(con);
            } else {
                conHolder.setConnection(con);
            }

            holderToUse.requested();
            TransactionSynchronizationManager.registerSynchronization(new DataSourceUtils.ConnectionSynchronization(holderToUse, dataSource));
            holderToUse.setSynchronizedWithTransaction(true);
            if (holderToUse != conHolder) {
                TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
            }
        }

        return con;
    } else {
        conHolder.requested();
        if (!conHolder.hasConnection()) {
            logger.debug("Fetching resumed JDBC Connection from DataSource");
            conHolder.setConnection(fetchConnection(dataSource));
        }

        return conHolder.getConnection();
    }
}

此时spring的事务管理器,拿到了这个连接,然后到达了mybatis手中,mybatis开始初始化s'tatement。

// 此处为mybatis的 BaseStatementHandler
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Connection connection = this.getConnection(statementLog);
    Statement stmt = handler.prepare(connection, this.transaction.getTimeout());
    handler.parameterize(stmt);
    return stmt;
}

然后使用连接的preparedStatement(String sql)方法,初始化preparedStatement,也就进入了druid的重写的方法。创建一个实际的preparedStatement后,用一个PreparedStatementHolder包装下。并记录一些关键的指标,用来统计。

// druid重写的创建preparedStatement方法
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
    checkState();

    PreparedStatementHolder stmtHolder = null;
    PreparedStatementKey key = new PreparedStatementKey(sql, getCatalog(), MethodType.M1);

    boolean poolPreparedStatements = holder.isPoolPreparedStatements();

    if (poolPreparedStatements) {
        stmtHolder = holder.getStatementPool().get(key);
    }

    if (stmtHolder == null) {
        try {
            stmtHolder = new PreparedStatementHolder(key, conn.prepareStatement(sql));
            holder.getDataSource().incrementPreparedStatementCount();
        } catch (SQLException ex) {
            handleException(ex, sql);
        }
    }

    initStatement(stmtHolder);

    DruidPooledPreparedStatement rtnVal = new DruidPooledPreparedStatement(this, stmtHolder);

    holder.addTrace(rtnVal);

    return rtnVal;
}

然后mybatis就拿到了这个实际执行的preparedStatement,开始实际的执行。


六. 归还连接

执行完成后,就开始归还连接了,最开始肯定是  preparedStatement 进行close,由于实际执行的是我们包装的DruidPooledPreparedStatement,所以close方法也会被重写执行,在DruidPooledConnection中closePoolableStatement(),方法实际去关闭。

public void closePoolableStatement(DruidPooledPreparedStatement stmt) throws SQLException {
    PreparedStatement rawStatement = stmt.getRawPreparedStatement();

    final DruidConnectionHolder holder = this.holder;
    if (holder == null) {
        return;
    }

    if (stmt.isPooled()) {
        try {
            rawStatement.clearParameters();
        } catch (SQLException ex) {
            this.handleException(ex, null);
            if (rawStatement.getConnection().isClosed()) {
                return;
            }

            LOG.error("clear parameter error", ex);
        }
    }

    PreparedStatementHolder stmtHolder = stmt.getPreparedStatementHolder();
    stmtHolder.decrementInUseCount();
    if (stmt.isPooled() && holder.isPoolPreparedStatements() && stmt.exceptionCount == 0) {
        holder.getStatementPool().put(stmtHolder);

        stmt.clearResultSet();
        holder.removeTrace(stmt);

        stmtHolder.setFetchRowPeak(stmt.getFetchRowPeak());

        stmt.setClosed(true); // soft set close
    } else if (stmt.isPooled() && holder.isPoolPreparedStatements()) {
        // the PreparedStatement threw an exception
        stmt.clearResultSet();
        holder.removeTrace(stmt);

        holder.getStatementPool()
                .remove(stmtHolder);
    } else {
        try {
            //Connection behind the statement may be in invalid state, which will throw a SQLException.
            //In this case, the exception is desired to be properly handled to remove the unusable connection from the pool.
            stmt.closeInternal();
        } catch (SQLException ex) {
            this.handleException(ex, null);
            throw ex;
        } finally {
            holder.getDataSource().incrementClosedPreparedStatementCount();
        }
    }
}

大致逻辑就是判断stmt的类型,以及参数,处理相应的数据,再调用DruidPooledStatement的close方法,进行实际的关闭。再去清除当前查询的结果的数据,对resultSet进行close。此时数据已经被查出来了,返回,开始对connect进行close。

由于我们这是连接池,如果连接好好的还能用,那么就不用真正关了,只需要给他放入池中就行,下次直接就用了。

// druidPooledConnection
@Override
public void close() throws SQLException {
    if (this.disable) {
        return;
    }

    DruidConnectionHolder holder = this.holder;
    if (holder == null) {
        if (dupCloseLogEnable) {
            LOG.error("dup close");
        }
        return;
    }

    DruidAbstractDataSource dataSource = holder.getDataSource();
    boolean isSameThread = this.getOwnerThread() == Thread.currentThread();
    
    if (!isSameThread) {
        dataSource.setAsyncCloseConnectionEnable(true);
    }
    
    if (dataSource.isAsyncCloseConnectionEnable()) {
        syncClose();
        return;
    }

    for (ConnectionEventListener listener : holder.getConnectionEventListeners()) {
        listener.connectionClosed(new ConnectionEvent(this));
    }

    
    List filters = dataSource.getProxyFilters();
    if (filters.size() > 0) {
        FilterChainImpl filterChain = new FilterChainImpl(dataSource);
        filterChain.dataSource_recycle(this);
    } else {
        recycle();
    }

    this.disable = true;
}

这里我们再次看到了这个filter,通过filter进行回收,为啥这么做呢?其实也是为了检测,监控。。这里我们两个filter的实际都没做啥大动作,直接调用了连接的recycle()方法。

@Override
public void dataSource_recycle(DruidPooledConnection connection) throws SQLException {
    if (this.pos < filterSize) {
        nextFilter().dataSource_releaseConnection(this, connection);
        return;
    }

    connection.recycle();
}

public void recycle() throws SQLException {
    if (this.disable) {
        return;
    }

    DruidConnectionHolder holder = this.holder;
    if (holder == null) {
        if (dupCloseLogEnable) {
            LOG.error("dup close");
        }
        return;
    }

    if (!this.abandoned) {
        DruidAbstractDataSource dataSource = holder.getDataSource();
        dataSource.recycle(this);
    }

    this.holder = null;
    conn = null;
    transactionInfo = null;
    closed = true;
}

回收前,先看下这个连接是不是被抛弃了,抛弃了那么就不回收了,因为抛弃的连接都是不能用的了,而且也close了,不用担心资源没关闭问题。然后还是掉用连接的recycle方法。

到达最终的回收前,还是先做一顿检查,然后判断连接是否需要回滚

// check need to rollback?
if ((!isAutoCommit) && (!isReadOnly)) {
    pooledConnection.rollback();
}

然后调用holder的reset方法,将holder中的一些监控参数清理重置下。

holder.reset();

public void reset() throws SQLException {
    // reset default settings
    if (underlyingReadOnly != defaultReadOnly) {
        conn.setReadOnly(defaultReadOnly);
        underlyingReadOnly = defaultReadOnly;
    }

    if (underlyingHoldability != defaultHoldability) {
        conn.setHoldability(defaultHoldability);
        underlyingHoldability = defaultHoldability;
    }

    if (underlyingTransactionIsolation != defaultTransactionIsolation) {
        conn.setTransactionIsolation(defaultTransactionIsolation);
        underlyingTransactionIsolation = defaultTransactionIsolation;
    }

    if (underlyingAutoCommit != defaultAutoCommit) {
        conn.setAutoCommit(defaultAutoCommit);
        underlyingAutoCommit = defaultAutoCommit;
    }

    connectionEventListeners.clear();
    statementEventListeners.clear();

    lock.lock();
    try {
        for (Object item : statementTrace.toArray()) {
            Statement stmt = (Statement) item;
            JdbcUtils.close(stmt);
        }
        
        statementTrace.clear();
    } finally {
        lock.unlock();
    }

    conn.clearWarnings();
}

然后再判断,当前最大使用数量是否超标了,(此时可能又创建了新的连接)超标了则不回收了。

if (holder.discard) {
    return;
}

if (phyMaxUseCount > 0 && holder.useCount >= phyMaxUseCount) {
    discardConnection(holder);
    return;
}

接着是testOnReturn,该参数是归还连接时执行validationQuery检测连接是否有效,如果开了,则会校验这个连接是不是还能用,不能用就不回收,然后关闭连接。

连接能用的话,则加锁,对监控参数处理,并将该holder放入连接池末尾:

lock.lock();
try {
    activeCount--;
    closeCount++;

    result = putLast(holder, currentTimeMillis);
    recycleCount++;
} finally {
    lock.unlock();
}

放入末尾不成功则会将连接关了,同时结束整个回收的流程。

if (!result) {
    JdbcUtils.close(holder.conn);
    LOG.info("connection recyle failed.");
}

放入成功,那么这个连接就被成功回收了,而且因为放入末尾,下次再来个连接就还会优先用它,这也是一个小设计。

至此,整个回收的逻辑就完了。


总体看下来,整个druid的运行逻辑就是,druid继承了datasource,statement,对这些核心参数用一个holder代理,并放入一个核心数组,这个数组就是连接池,接入spring工程后,mybatis从druid获取一个连接,并通过连接获取一个druid代理的preparedStatement,执行完成后,一层一层的关闭连接,在关闭连接时,如果连接还能用就归还给连接池,完成连接的复用。

同时使用一组责任链模式的filter,在建立连接,关闭连接等动作时,维护,监控一些数据,达到分析连接池中连接的目的。还维护了几个监听器,去监听连接数量的信号,去创建/销毁连接,保证核心连接池中随时有可用的连接。通过维护的监控的数据,我们可以通过控制台清晰的看到连接池的具体数量,以及判断大致的执行情况,分析我们业务的执行情况,以及是否健康。


1637252634(1).jpg



源码分析基本到此就结束了,其他的也没啥核心逻辑了,后面在分析下,druid的设计上面的小心思,我们能从中学到什么来应用到我们自己的工程,光是这么看源码其实没啥意义,还是要多学别人的实现方式,以及如何写出优雅,拓展性强,可维护性强的代码。




版权声明:本文由星尘阁原创出品,转载请注明出处!

本文链接:http://www.52xingchen.cn/detail/95




赞助本站,网站的发展离不开你们的支持!
来说两句吧
大侠留个名吧,或者可以使用QQ登录。
: 您已登陆!可以继续留言。
最新评论