网站首页 文章专栏 如何自己实现一个数据库连接池
之前阅读了druid的源码,学习了大概的连接池逻辑,这次尝试自己写一个连接池,不求多好,至少要能用。
复习下连接池的整体架构,以及逻辑。整个druid围绕着核心类
druid继承了datasource,statement,对这些核心参数用一个holder代理,并放入一个核心数组,这个数组就是连接池,接入spring工程后,mybatis从druid获取一个连接,并通过连接获取一个druid代理的preparedStatement,执行完成后,一层一层的关闭连接,在关闭连接时,如果连接还能用就归还给连接池,完成连接的复用。
同时使用一组责任链模式的filter,在建立连接,关闭连接等动作时,维护,监控一些数据,达到分析连接池中连接的目的。还维护了几个监听器,去监听连接数量的信号,去创建/销毁连接,保证核心连接池中随时有可用的连接。通过维护的监控的数据,我们可以通过控制台清晰的看到连接池的具体数量,以及判断大致的执行情况,分析我们业务的执行情况,以及是否健康。
连接池的另一个关键,就是维护连接池,如何保证我们的连接池随时都有可用的连接,当连接用完时,要去增加连接,当连接失效了,要及时去除连接,需要维护整个连接的生命周期,如果要做一个中间件给其他项目使用,那么最好需要能接入springboot,简化用户的使用,对于监控方面,可以使用简单的页面,将核心监控参数展示出来就行。
那么根据这些,我们的代码结构要包含一个控制台用于展示监控数据,一个starter,用于接入springboot,一个核心的连接池,其实也就是druid的代码模块。。
取名Atlas,整体的结构如下:

核心的AtlasDataSource,需要继承datasource,并重写其方法,并需要一个holder来对每个连接进行包装,同时也需要一个driver驱动,来处理连接,因为我这里只是处理mysql的,暂时就不考虑其他数据源了。
整体连接池初始化逻辑为拿到参数,判断参数,初始化连接池大小,初始化创建连接/销毁连接线程,初始化连接池,在用户使用时核心方法为获取连接,获取连接时需要判断连接池是否有可用连接,连接是否可用,同时引申出,创建连接与销毁连接。创建与销毁类似与生产者消费者模型,可以使用这种模式实现。

昨日搭建了整体的架构,那么今日争取将整个流程疏通,首先是需要初始化线程池,我们的核心类atlasDataSource继承自 AtlasAbstractDataSource ,AtlasAbstractDataSource类主要放一些统一的配置,默认的流程等,两者均实现了DataSource接口,那么需要实现相关的方法。
核心是实现getConnection()方法,这个也就是获取一个连接。
public AtlasPooledConnection getConnection(long maxWaitMillis) throws SQLException {
// 初始化连接池
init();
// 返回一个连接,此处后续可在获取连接池前后进行sql审计,监控等操作
return getConnectionDirect(maxWaitMillis);
}在获取连接的时候进行初始化连接池,初始化一些参数,然后返回一个连接。
public void init() throws SQLException {
// 判断是否已经初始化
if (inited) {
return;
}
// 初始化驱动
// AtlasDriver.getInstance();
final ReentrantLock lock = this.lock;
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}
try {
// 判断参数,如必填参数未填,则初始化默认值
// 判断驱动是否正常加载,如为加载,则初始化个默认的
resolveDriver();
// 初始化检查器,后续用于校验sql连接是否正常
initValidConnectionChecker();
// 校验检查sql连接参数
validationQueryCheck();
// 构建连接池,初始化大小
connections = new AtlasConnectionHolder[maxActive];
evictConnections = new AtlasConnectionHolder[maxActive];
keepAliveConnections = new AtlasConnectionHolder[maxActive];
SQLException connectError = null;
// init connections
while (poolingCount < initialSize) {
try {
Connection conn = createConnection();
AtlasConnectionHolder holder = new AtlasConnectionHolder(this, conn);
connections[poolingCount++] = holder;
} catch (SQLException ex) {
log.error("init datasource error, url: " + this.getUrl(), ex);
if (initExceptionThrow) {
connectError = ex;
break;
} else {
Thread.sleep(3000);
}
}
}
if (poolingCount > 0) {
poolingPeak = poolingCount;
}
// 启动初始化创建线程
createAndStartCreatorThread();
// 启动初始化销毁线程
createAndStartDestroyThread();
initedLatch.await();
if (connectError != null && poolingCount == 0) {
throw connectError;
}
} catch (SQLException | RuntimeException e) {
log.error("{dataSource} init error", e);
throw e;
} catch (InterruptedException e) {
throw new SQLException(e.getMessage(), e);
} finally {
inited = true;
lock.unlock();
String msg = "{dataSource-";
if (this.name != null && !this.name.isEmpty()) {
msg += this.name;
}
msg += "} inited";
log.info(msg);
}
}初始化流程是个很复杂的流程,这里为了尽快跑通,我们先简化一些流程
1,判断是否已经初始化
2,默认值处理,以及值是否合理正确,用户可能瞎配置
3,加载驱动,这里我们只关注mysql的,所以可以直接写死
4,初始化sql检查器,用于后续sql校验是否正常
5,构建连接池大小,并用配置参数向里面构建真实的连接
6,启动创建连接/销毁连接线程,用户后续连接池满了,或者多了等情况,类似与消费者模型
7,初始化完毕
在创建连接的过程中,同时还需要createStatement,而Statement执行后,还有一个resultSet,所以我们要自己定义一套statement和resultset,并实现其接口,这样我们从一个连接就可以去执行sql,并获取相应了。而且我们自己实现的还可以增加一些监控的指标。综上我们就有了几个核心类
AtlasAbstractDataSource:抽象类,用户定义默认参数,通用流程
AtlasDataSource:继承自AtlasAbstractDataSource,实现DataSource接口,核心连接池,维护连接池中连接的整个生命周期
AtlasPooledConnection:实现PooledConnection,与Connection接口,用于创建statement,代理真实的conn操作获取连接返回的直接对象,包含AtlasConnectionHolder(连接的包装)
AtlasPooledStatement:实现了Statement接口,用于执行sql,包含AtlasPooledConnection
AtlasPooledResultSet:实现了ResultSet接口
如此,我们便简单完成了一个连接池,并可以从连接池中获取连接,进行sql查询,(此时还未完成连接池的维护,连接的生命周期,以及连接的可用性判断,后面继续完善)
那么我们先测试下:
private static AtlasDataSource AS;
public AtlasTest(String connectURI){
initAS(connectURI);
}
public static void initAS(String connectURI) {
initAS(connectURI, "root", "xxx!", "com.mysql.jdbc.Driver", 40, 40, 40, 10, 5);
}
public static void main(String[] args) throws IOException, SQLException {
AtlasTest db = new AtlasTest("jdbc:mysql://127.0.0.1:3306/test");
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
FileWriter fileWriter = new FileWriter("D:\\data.txt");
try {
conn = db.getConn();
} catch (Exception e) {
System.out.println(e.getMessage());
}
long sum = 0;
for (int i = 0; i < 10; i++) {
try {
stmt = conn.createStatement();
Date start = new Date();
rs = stmt.executeQuery("select * from test where id = 1");
Date end = new Date();
sum = sum + (end.getTime() - start.getTime());
fileWriter.write(String.valueOf((end.getTime() - start.getTime())));
fileWriter.write("/\n");
} catch (SQLException e) {
e.printStackTrace();
}
}
System.out.println((float) sum );
conn.close();
stmt.close();
rs.close();
fileWriter.flush();
fileWriter.close();
}创建一个atlas连接池,将初始化参数设置为初始40个连接,最大40个连接,最大等待时间5s,并循环10次,读取数据,向文件写入每次查询的耗时。
经过多次测试,数据总耗时为:86毫秒,每次约8.6毫秒
10/ 10/ 8/ 9/ 8/ 8/ 8/ 8/ 9/ 8/
没有对比就没有指标,我们用druid的连接池用同样的代码测试下:
11/ 10/ 9/ 8/ 9/ 9/ 8/ 10/ 11/ 9/
总耗时为94,平均9.4毫秒,和我们这差不多,还是挺符合预期的,毕竟连接池为40,循环10次,都是拿现成的连接请求,如果数据量再大些,就比较考验连接池维护的性能了。我们后续完善后再测。
============5月12更新======================
如上我们测试了基本功能,能从连接池获取一个连接,并完成查询,那么当连接池中的连接用完了,怎么创建新的呢,当到了最大连接参数,用户还在创建连接会怎么样呢,此时我们一开始创建的两个守护线程就起作用了,回顾下我之前的博客: 深入druid之连接的维护与生命周期
可以想到,我们需要两个线程不停的创建/销毁,来维护连接池,当连接池不够用了,就创建,当连接不能用了,就及时销毁,我们写一下这两个线程:
public class CreateConnectionThread extends Thread {
public CreateConnectionThread(String name){
super(name);
this.setDaemon(true);
}
public void run() {
initedLatch.countDown();
for (;;) {
// addLast
log.info("收到唤醒,开始创建连接");
try {
lock.lockInterruptibly();
} catch (InterruptedException e2) {
break;
}
try {
// 防止创建超过maxActive数量的连接
if (activeCount + poolingCount >= maxActive) {
log.info("防止创建超过maxActive数量的连接,此时空闲"+poolingCount+",活跃"+activeCount+"连接池:" + connections.length);
empty.await();
continue;
}
} catch (InterruptedException e) {
break;
} finally {
lock.unlock();
}
Connection connection = null;
try {
log.info("开始创建新连接");
connection = createConnection();
log.info("创建结束");
} catch (RuntimeException | SQLException e) {
log.error("create connection RuntimeException", e);
}
if (connection == null) {
continue;
}
log.info("创建成功,准备放入");
boolean result = put(connection);
log.info("创建成功,放入完毕");
if (!result) {
log.info("put physical connection to pool failed.");
JdbcUtils.close(connection);
} else {
log.info("CreateConnectionThread创建了一个连接并放入连接池");
}
}
}
}这个创建的守护线程,会在init时加载,是一个死循环,会不停的创建连接,当创建连接数量超过设置的最大值时,就会empty.await(),开始pending住,等待其他地方发起 signal信号,再次创建。
销毁线程和这个类似,这个线程作用就是用来一直扫描是否存在需要被回收处理的连接,满足条件则会被销毁,且如果连接池为空了就会通知CreateConnectionThread创建线程,暂时没实现,先不管。
当用户获取一个连接时,会从连接池拿一个连接,并根据配置的参数,判断这个连接是否可用(暂未实现),在拿取连接的时候,会判断此时连接池有没有可用的连接,如果没有的话,则起一个死循环,在循环里面发送enpty信号,并用notempty进行await,我们的创建连接守护线程,就会收到信号,开始创建,并在创建完成后,发送notempty信号,那边等着的就收到已经创建完成的信号了,就可以获取连接了。
private AtlasPooledConnection getConnectionDirect(long maxWaitMillis) throws GetConnectionTimeoutException {
for (;;) {
AtlasPooledConnection atlasPooledConnection = null;
try {
// 获取连接
atlasPooledConnection = getConnectionInternal(maxWaitMillis);
} catch (GetConnectionTimeoutException ex) {
throw ex;
} catch (InterruptedException exception) {
exception.printStackTrace();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
if (testOnBorrow) {
// todo
// boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
// if (!validate) {
// discardConnection(poolableConnection.holder);
// continue;
// }
}
return atlasPooledConnection;
}
}
private AtlasPooledConnection getConnectionInternal(long maxWaitMillis) throws SQLException, InterruptedException {
AtlasConnectionHolder holder;
// 从池中取
// 先加锁
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}
connectCount++;
holder = takeLast();
if (holder != null) {
activeCount++;
}
AtlasPooledConnection poolalbeConnection = new AtlasPooledConnection(holder);
return poolalbeConnection;
}
private AtlasConnectionHolder takeLast() throws InterruptedException, SQLException {
try {
boolean hasSign = false;
while (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection
notEmpty.await(); // signal by recycle or creator
log.info("收到创建成功信号,用户可以获取连接了");
hasSign = true;
}
if (hasSign){
log.info("用户拿到新连接,退出创建连接,此时空闲"+poolingCount+",活跃"+activeCount+"连接池:" + connections.length);
}
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
poolingCount--;
AtlasConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
return last;
}在创建线程中,创建完成并放入连接池后,会发起一个notempty信号
protected boolean put(Connection conn) {
AtlasConnectionHolder holder = null;
try {
holder = new AtlasConnectionHolder(AtlasDataSource.this, conn);
} catch (SQLException ex) {
log.error("create connection holder error", ex);
return false;
}
lock.lock();
try {
if (poolingCount >= maxActive) {
return false;
}
connections[poolingCount] = holder;
poolingCount++;
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
}
log.info("创建了并放入连接池一个连接,同时通知notEmpty信号,此时空闲"+poolingCount+",活跃"+activeCount+"连接池:" + connections.length);
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}这样,一个简易的连接池就完成了,当连接池不够用时,就会创建新的连接,(此时尚未完成,连接回收)
先测试下功能是否符合预期:
1, 先来个正常的流程,初始化5个,最大10个,我们获取3个连接使用
public static void initAS(String connectURI) {
initAS(connectURI, "root", "xxx", "com.mysql.jdbc.Driver", 5, 10, 40, 10, 5);
}
public static void main(String[] args) {
AtlasTest db = new AtlasTest("jdbc:mysql://127.0.0.1:3306/test");
try {
Connection conn1 = db.getConn();
Connection conn2 = db.getConn();
Connection conn3 = db.getConn();
Statement stmt1 = conn1.createStatement();
Statement stmt2 = conn2.createStatement();
Statement stmt3 = conn3.createStatement();
ResultSet rs1 = stmt1.executeQuery("select * from test where id = 1");
ResultSet rs2 = stmt2.executeQuery("select * from test where id = 5");
ResultSet rs3 = stmt3.executeQuery("select * from test where id = 6");
while (rs1.next()) {
String name = rs1.getString(2);
System.out.println(name);
}
while(rs2.next()){
String r2 = rs2.getString(2);
System.out.println(r2);
}
while(rs3.next()){
String r3 = rs3.getString(2);
System.out.println(r3);
}
} catch (SQLException throwables) {
System.out.println("error");
throwables.printStackTrace();
}
}我们看到结果如下:
2022-05-13 00:03:33 [main:0] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:172 - =================atlas数据库连接池开始初始化===============
2022-05-13 00:03:35 [main:1377] [INFO ] ru.yandex.clickhouse.ClickHouseDriver:42 - Driver registered
2022-05-13 00:03:35 [main:1377] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:410 - 进入创建新连接
2022-05-13 00:03:36 [main:2450] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:410 - 进入创建新连接
2022-05-13 00:03:39 [main:5170] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:410 - 进入创建新连接
2022-05-13 00:03:39 [main:5570] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:410 - 进入创建新连接
2022-05-13 00:03:39 [main:5723] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:410 - 进入创建新连接
2022-05-13 00:03:39 [Druid-ConnectionPool-Create-1457226878:5902] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:284 - 收到唤醒,开始创建连接
2022-05-13 00:03:39 [main:5902] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:242 - {dataSource-} inited
2022-05-13 00:03:39 [Druid-ConnectionPool-Create-1457226878:5902] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:306 - 开始创建新连接
2022-05-13 00:03:39 [main:5902] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:245 - =================atlas数据库连接池结束初始化,耗时:5902 ms===============
2022-05-13 00:03:39 [Druid-ConnectionPool-Create-1457226878:5902] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:410 - 进入创建新连接结论是:初始化的线程池够用,并没有真正创建新连接放入池中
2,同样代码,再测一个初始1,最大5,获取3个连接,日志如下:
2022-05-13 00:16:05 [main:0] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:172 - =================atlas数据库连接池开始初始化===============
2022-05-13 00:16:06 [main:1434] [INFO ] ru.yandex.clickhouse.ClickHouseDriver:42 - Driver registered
2022-05-13 00:16:06 [main:1434] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:410 - 进入创建新连接
2022-05-13 00:16:06 [Druid-ConnectionPool-Create-1720797452:1773] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:284 - 收到唤醒,开始创建连接
2022-05-13 00:16:06 [main:1773] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:242 - {dataSource-} inited
2022-05-13 00:16:06 [Druid-ConnectionPool-Create-1720797452:1773] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:306 - 开始创建新连接
2022-05-13 00:16:06 [Druid-ConnectionPool-Create-1720797452:1773] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:410 - 进入创建新连接
2022-05-13 00:16:06 [main:1773] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:245 - =================atlas数据库连接池结束初始化,耗时:1773 ms===============
2022-05-13 00:16:07 [Druid-ConnectionPool-Create-1720797452:1942] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:308 - 创建结束
2022-05-13 00:16:07 [Druid-ConnectionPool-Create-1720797452:1942] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:316 - 创建成功,准备放入
2022-05-13 00:16:07 [Druid-ConnectionPool-Create-1720797452:1942] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:349 - 创建了并放入连接池一个连接,同时通知notEmpty信号,此时空闲1,活跃1连接池:5
2022-05-13 00:16:07 [Druid-ConnectionPool-Create-1720797452:1942] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:318 - 创建成功,放入完毕
2022-05-13 00:16:07 [main:1942] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:138 - 收到创建成功信号,用户可以获取连接了
2022-05-13 00:16:07 [Druid-ConnectionPool-Create-1720797452:1942] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:323 - CreateConnectionThread创建了一个连接并放入连接池
2022-05-13 00:16:07 [main:1942] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:142 - 用户拿到新连接,退出创建连接,此时空闲1,活跃1连接池:5
2022-05-13 00:16:07 [Druid-ConnectionPool-Create-1720797452:1942] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:284 - 收到唤醒,开始创建连接
2022-05-13 00:16:07 [Druid-ConnectionPool-Create-1720797452:1942] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:306 - 开始创建新连接
2022-05-13 00:16:07 [Druid-ConnectionPool-Create-1720797452:1942] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:410 - 进入创建新连接
2022-05-13 00:16:07 [Druid-ConnectionPool-Create-1720797452:2089] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:308 - 创建结束
2022-05-13 00:16:07 [Druid-ConnectionPool-Create-1720797452:2089] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:316 - 创建成功,准备放入
2022-05-13 00:16:07 [Druid-ConnectionPool-Create-1720797452:2089] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:349 - 创建了并放入连接池一个连接,同时通知notEmpty信号,此时空闲1,活跃2连接池:5
2022-05-13 00:16:07 [Druid-ConnectionPool-Create-1720797452:2089] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:318 - 创建成功,放入完毕
2022-05-13 00:16:07 [Druid-ConnectionPool-Create-1720797452:2089] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:323 - CreateConnectionThread创建了一个连接并放入连接池
2022-05-13 00:16:07 [Druid-ConnectionPool-Create-1720797452:2089] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:284 - 收到唤醒,开始创建连接
2022-05-13 00:16:07 [main:2089] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:138 - 收到创建成功信号,用户可以获取连接了
2022-05-13 00:16:07 [main:2089] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:142 - 用户拿到新连接,退出创建连接,此时空闲1,活跃2连接池:5成功完成测试,并且看日志发现,在两次发现连接不够时,均创建了新的连接,并放入连接池,且我们的用户线程也收到创建成功信号,从连接池拿到新的连接。
3,再测一个初始1,最大2,获取3个连接,日志如下:
2022-05-13 00:07:37 [main:0] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:172 - =================atlas数据库连接池开始初始化===============
2022-05-13 00:07:38 [main:1402] [INFO ] ru.yandex.clickhouse.ClickHouseDriver:42 - Driver registered
2022-05-13 00:07:38 [main:1402] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:410 - 进入创建新连接
2022-05-13 00:07:39 [Druid-ConnectionPool-Create-1720797452:1803] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:284 - 收到唤醒,开始创建连接
2022-05-13 00:07:39 [main:1803] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:242 - {dataSource-} inited
2022-05-13 00:07:39 [Druid-ConnectionPool-Create-1720797452:1803] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:306 - 开始创建新连接
2022-05-13 00:07:39 [main:1803] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:245 - =================atlas数据库连接池结束初始化,耗时:1803 ms===============
2022-05-13 00:07:39 [Druid-ConnectionPool-Create-1720797452:1803] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:410 - 进入创建新连接
2022-05-13 00:07:39 [Druid-ConnectionPool-Create-1720797452:2180] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:308 - 创建结束
2022-05-13 00:07:39 [Druid-ConnectionPool-Create-1720797452:2188] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:316 - 创建成功,准备放入
2022-05-13 00:07:39 [Druid-ConnectionPool-Create-1720797452:2188] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:349 - 创建了并放入连接池一个连接,同时通知notEmpty信号,此时空闲1,活跃1连接池:2
2022-05-13 00:07:39 [Druid-ConnectionPool-Create-1720797452:2188] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:318 - 创建成功,放入完毕
2022-05-13 00:07:39 [main:2188] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:138 - 收到创建成功信号,用户可以获取连接了
2022-05-13 00:07:39 [Druid-ConnectionPool-Create-1720797452:2188] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:323 - CreateConnectionThread创建了一个连接并放入连接池
2022-05-13 00:07:39 [Druid-ConnectionPool-Create-1720797452:2188] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:284 - 收到唤醒,开始创建连接
2022-05-13 00:07:39 [main:2188] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:142 - 用户拿到新连接,退出创建连接,此时空闲1,活跃1连接池:2
2022-05-13 00:07:39 [Druid-ConnectionPool-Create-1720797452:2190] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:293 - 防止创建超过maxActive数量的连接,此时空闲0,活跃2连接池:2现象为,测试代码pengding住,未正常查询,看下日志发现,当连接不够用时,成功创建了新的连接,并通知notempty,用户拿到新的连接并正常使用,但是,当用户获取第三个连接时,发现配置的最大就是2,此时创建连接的线程就会pending住,正确的流程应该时等待其他连接使用完回收,就有可用的连接了,回收后发起notempty信号,但是我此处尚未实现连接回收,所以代码运行到这就停住了。这也解释了之前的一个疑问,就是当连接池最大都在使用中,再获取连接会怎么样,答案就是,会等待其他连接使用完,归还连接。
三种情况的测试发现,基本功能都能满足,当连接不足时,也能提供新的连接使用,后续还需要实现,连接的销毁,以及使用完成后归还。
===5月13日更新=================================
今日完成连接的回收,以及抛弃,并完成获取连接前/归还连接前对连接进行校验功能,如果要对连接进行回收,那么回收的时机肯定是在连接关闭的时候,所以我们需要在包装的AtlasPooledConnection重写close方法,并进行回收。
public void close() throws SQLException {
if (this.disable) {
return;
}
AtlasConnectionHolder holder = this.holder;
if (holder == null) {
return;
}
if (!CLOSING_UPDATER.compareAndSet(this, 0, 1)) {
return;
}
try {
recycle();
} finally {
CLOSING_UPDATER.set(this, 0);
}
this.disable = true;
}我们先拿到holder,判断是否为null,以及是否已经关闭,实际这里还需要判断是否是本线程操作的关闭,暂时不实现了,都正常情况下则进行recycle().
public void recycle() throws SQLException {
if (this.disable) {
return;
}
AtlasConnectionHolder holder = this.holder;
if (holder == null) {
return;
}
if (!this.abandoned) {
AtlasAbstractDataSource dataSource = holder.getDataSource();
dataSource.recycle(this);
}
this.holder = null;
conn = null;
closed = true;
}回收之前将自己的属性置为null,因为这里只是一个包装的连接,并非实际的连接池holder连接,所以可以置为null,同时将close置为true。然后通过datasource对象进行回收。回收的整体逻辑肯定不是直接关闭了,需要根据配置的参数判断归还之前是否需要检验连接是否正常,然后进行归还,归还如果校验不通过,或者失败异常了,则对物理实际连接真正进行close,抛弃连接。如果合法则判断放入后是否会超过最大连接池,如果不超过,则放入,同时发起一个notempty signal信号通知有连接可用了,如果超过了则也要抛弃此连接。
protected void recycle(AtlasPooledConnection pooledConnection) throws SQLException {
final AtlasConnectionHolder holder = pooledConnection.holder;
if (holder == null) {
log.warn("connectionHolder is null");
return;
}
final Connection physicalConnection = holder.conn;
final boolean testOnReturn = this.testOnReturn;
try {
// reset holder, restore default settings, clear warnings
holder.reset();
if (physicalConnection.isClosed()) {
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
} finally {
lock.unlock();
}
return;
}
// 开启归还校验
if (testOnReturn) {
boolean validate = testConnectionInternal(holder, physicalConnection);
if (!validate) {
JdbcUtils.close(physicalConnection);
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
} finally {
lock.unlock();
}
return;
}
}
boolean result;
final long currentTimeMillis = System.currentTimeMillis();
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
result = putLast(holder, currentTimeMillis);
recycleCount++;
} finally {
lock.unlock();
}
if (!result) {
JdbcUtils.close(holder.conn);
log.info("connection recyle failed.");
}
} catch (Throwable e) {
if (!holder.discard) {
discardConnection(holder);
holder.discard = true;
}
log.error("recyle error", e);
}
}
boolean putLast(AtlasConnectionHolder connectionHolder, long lastActiveTimeMillis) {
if (poolingCount >= maxActive || connectionHolder.discard) {
return false;
}
connections[poolingCount] = connectionHolder;
poolingCount++;
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
}
notEmpty.signal();
return true;
}在创建前/归还前可根据配置参数,对连接进行校验,这个需要定义一个检查器,在连接池init时,判断数据库类型初始化,我们这里就固定是mysql了。然后根据配置的校验sql,查询一次,成功就代表连接正常。如果我们没有配置检查sql,则初始化一个默认的 SELECT 1 语句。如果配置了使用自带的ping函数,则直接用反射调用ping,不通过查sql实现。
public boolean isValidConnection(Connection conn, String validateQuery, int validationQueryTimeout) throws Exception {
if (conn.isClosed()) {
return false;
}
if (usePingMethod) {
if (conn instanceof AtlasPooledConnection) {
conn = ((AtlasPooledConnection) conn).getConnection();
}
if (clazz.isAssignableFrom(conn.getClass())) {
if (validationQueryTimeout 0) {
stmt.setQueryTimeout(validationQueryTimeout);
}
rs = stmt.executeQuery(query);
return true;
} finally {
JdbcUtils.close(rs);
JdbcUtils.close(stmt);
}
}如此,便实现了一个sql检查,在创建物理连接的时候也可以使用这个来校验,如果创建的连接不可用,就抛弃掉。
我们来测下,连接是否被归还了,首先初始化连接为1,最大为2,我们获取3个连接使用,此时就会产生连接不够用的情况,按我们昨天的测试,如果前两个连接不进行归还,那么第三个连接获取会一直被阻塞住,所以我们在第三个连接获取前,先close一个。
Connection conn1 = db.getConn();
Connection conn2 = db.getConn();
Statement stmt1 = conn1.createStatement();
Statement stmt2 = conn2.createStatement();
ResultSet rs1 = stmt1.executeQuery("select * from test where id = 1");
ResultSet rs2 = stmt2.executeQuery("select * from test where id = 5");
while (rs1.next()) {
String name = rs1.getString(2);
System.out.println(name);
}
// 归还连接1
JdbcUtils.close(conn1);
while(rs2.next()){
String r2 = rs2.getString(2);
System.out.println(r2);
}
// JdbcUtils.close(conn2); 连接2不归还
Connection conn3 = db.getConn();
Statement stmt3 = conn3.createStatement();
ResultSet rs3 = stmt3.executeQuery("select * from test where id = 6");
while(rs3.next()){
String r3 = rs3.getString(2);
System.out.println(r3);
}
JdbcUtils.close(conn3);debug下,当获取第一个连接时,我们先看下此时的连接地址是多少:

看到连接1地址2815,连接2地址2886,此时我们放开断点,进行回收连接1,同时不回收连接2,

可以看到,此时的conn1里面的connHolder已经置为null了,且conn3中的connHolder的地址与之前的地址是一致的,说明我们的连接1已经被回收,且给了新连接使用,与上面我们的代码逻辑一致,符合预期。
至此,我们的连接池已经满足连接获取使用,连接创建,连接回收,连接校验等功能,大的功能还差定时扫描连接池中不可用连接给抛弃掉,并根据配置的最大/小空闲连接数,来控制连接池中的数量。
未完待续。。。
===5月16日更新=========================
上次更新,我们测试了当连接池达到最大连接时,再取获取连接,会阻塞住,等待其他连接归还,否则则会一直等待,今天看了下,实际还有一种模式,就是当用户设置获取连接最大等待时间max-wait,时,会进行计算当前等待时间,如果超过该值,则获取失败,所以需要对代码坐下改造。
private AtlasPooledConnection getConnectionInternal(long maxWait) throws SQLException, InterruptedException {
AtlasConnectionHolder holder;
final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
final int maxWaitThreadCount = this.maxWaitThreadCount;
for (;;) {
// 从池中取
// 先加锁
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}
if (maxWaitThreadCount > 0
&& notEmptyWaitThreadCount >= maxWaitThreadCount) {
throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
+ lock.getQueueLength());
}
connectCount++;
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
}
if (holder != null) {
if (holder.discard) {
continue;
}
holder.active = true;
activeCount++;
}
break;
}
// skip
}传入最大等待时间,并记录这个时间,当maxWait小于0则是我们之前说的流程阻塞住一直等,大于0时,代表在这个时间内获取不到就算失败。
private AtlasConnectionHolder pollLast(long nanos) throws InterruptedException {
long estimate = nanos;
for (;;) {
if (poolingCount == 0) {
emptySignal(); // send signal to CreateThread create connection
if (estimate notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
}
try {
long startEstimate = estimate;
estimate = notEmpty.awaitNanos(estimate); // signal by
// recycle or
// creator
notEmptyWaitCount++;
notEmptyWaitNanos += (startEstimate - estimate);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
notEmptySignalCount++;
throw ie;
} finally {
notEmptyWaitThreadCount--;
}
if (poolingCount == 0) {
if (estimate > 0) {
continue;
}
waitNanosLocal.set(nanos - estimate);
return null;
}
}
poolingCount--;
AtlasConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
long waitNanos = nanos - estimate;
last.setLastNotEmptyWaitNanos(waitNanos);
return last;
}
}当没有链接可用时,发起一个empty信号,让创建线程去创建,如果创建线程那边一直阻塞,这里会对notEmptyWaitThreadCount做自增,记录等待个数,注意这个参数,他还有别的用处,后面再说
然后记录notemptyawait时间,进行累加,如果此时连接池仍然没有可用连接,且还有超时的剩余时间,就继续等待,否则代表超时了,则返回一个null。如果在超时时间内有可用连接了,则跳出循环,并返回一个连接,并记录上次获取连接等待了多久。
假如出现极端情况,连接池一直没有可用连接,就会阻塞很多用户连接,有没有一种措施是可以在连接不够用的时候控制阻塞线程的个数,超过这个限制后直接报错,而不是陷入等待呢?答案就是上面说的notEmptyWaitThreadCount这个参数起作用。在配置里面有一项maxWaitThreadCount,默认为-1,表示不启用,如果配置了,含义就是在连接不够用时最多让多少个业务线程发生阻塞,每次阻塞我们都会自增一次notEmptyWaitThreadCount,在获取连接时,我们先判断这个参数有没有超过 maxWaitThreadCount,如果超过了,则代表不用再继续等待获取连接了,直接返回错误就行。
if (maxWaitThreadCount > 0
&& notEmptyWaitThreadCount >= maxWaitThreadCount) {
throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
+ lock.getQueueLength());
}回顾上次的实现,当用户close一个连接时,如果真的需要close,我们似乎没有关闭statement和resultset,看了下druid的实现,是在ConnectionHolder,和poolstatement中各维护一个statement的list和resultset的list,在每次创建statement和resultset的时候加入,在close conn的时候,再去循环关闭statement,又在close statement的时候,循环去关闭resultset。
在会收的时候,先将holder中的statement先全部重置,也就是关闭
protected void recycle(AtlasPooledConnection pooledConnection) throws SQLException {
final AtlasConnectionHolder holder = pooledConnection.holder;
if (holder == null) {
log.warn("connectionHolder is null");
return;
}
final Connection physicalConnection = holder.conn;
final boolean testOnReturn = this.testOnReturn;
try {
// reset holder, restore default settings, clear warnings
holder.reset();
}
// skip
}public void reset() throws SQLException {
lock.lock();
try {
for (Object item : statementTrace.toArray()) {
Statement stmt = (Statement) item;
JdbcUtils.close(stmt);
}
statementTrace.clear();
} finally {
lock.unlock();
}
conn.clearWarnings();
}在holder中循环这个list,再挨个close,每个statement的close会再出发resultset的close。
public void close() throws SQLException {
if (this.closed) {
return;
}
// 回收resultset
clearResultSet();
if (stmt != null) {
stmt.close();
}
this.closed = true;
AtlasConnectionHolder connHolder = conn.getConnectionHolder();
if (connHolder != null) {
// 去除statement list中的自己
connHolder.removeTrace(this);
}
}这样就保证了在关闭一个conn连接时,同步将其创建的statement以及resultset同时close掉,在回收连接的时候,不管其真的销毁,还是放入连接池,都保证了其之前创建的statement和resultset被close。
=== 5月17更新===============================
昨日做了获取连接最大超时时间的功能,没有测试,今天先测下这个功能。
首先我们设定最大等待6s,初始化连接池1,最大连接池2,获取3个连接使用,并且都不关闭,先看下效果:
Connection conn1 = db.getConn();
Connection conn2 = db.getConn();
Statement stmt1 = conn1.createStatement();
Statement stmt2 = conn2.createStatement();
ResultSet rs1 = stmt1.executeQuery("select * from test where id = 1");
ResultSet rs2 = stmt2.executeQuery("select * from test where id = 5");
while (rs1.next()) {
String name = rs1.getString(2);
System.out.println(name);
}
// 归还连接1
new Thread(() -> {
try {
Thread.sleep(7000);
JdbcUtils.close(conn1);
} catch (InterruptedException exception) {
exception.printStackTrace();
}
}).start();
while(rs2.next()){
String r2 = rs2.getString(2);
System.out.println(r2);
}
// JdbcUtils.close(conn2); 连接2不归还
Connection conn3 = db.getConn();
Statement stmt3 = conn3.createStatement();
ResultSet rs3 = stmt3.executeQuery("select * from test where id = 6");
while(rs3.next()){
String r3 = rs3.getString(2);
System.out.println(r3);
}
JdbcUtils.close(conn3);代码,conn1的连接休眠7秒,再close,小于我们的最大等待时间6s,也就是conn3在获取连接时,6s过后,应该还没获取到新连接,此时应该报错
2022-05-17 22:45:22 [main:0] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:287 - =================atlas数据库连接池开始初始化===============
2022-05-17 22:45:23 [main:1392] [INFO ] ru.yandex.clickhouse.ClickHouseDriver:42 - Driver registered
2022-05-17 22:45:23 [main:1407] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:651 - 进入创建新连接
2022-05-17 22:45:23 [Druid-ConnectionPool-Create-567656864:1740] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:525 - 收到唤醒,开始创建连接
2022-05-17 22:45:23 [main:1740] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:371 - {dataSource-} inited
2022-05-17 22:45:23 [Druid-ConnectionPool-Create-567656864:1740] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:547 - 开始创建新连接
2022-05-17 22:45:23 [Druid-ConnectionPool-Create-567656864:1740] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:651 - 进入创建新连接
2022-05-17 22:45:23 [main:1740] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:374 - =================atlas数据库连接池结束初始化,耗时:1740 ms===============
2022-05-17 22:45:24 [Druid-ConnectionPool-Create-567656864:1893] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:549 - 创建结束
2022-05-17 22:45:24 [Druid-ConnectionPool-Create-567656864:1893] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:557 - 创建成功,准备放入
2022-05-17 22:45:24 [Druid-ConnectionPool-Create-567656864:1893] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:590 - 创建了并放入连接池一个连接,同时通知notEmpty信号,此时空闲1,活跃1连接池:2
2022-05-17 22:45:24 [Druid-ConnectionPool-Create-567656864:1893] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:559 - 创建成功,放入完毕
2022-05-17 22:45:24 [Druid-ConnectionPool-Create-567656864:1893] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:564 - CreateConnectionThread创建了一个连接并放入连接池
2022-05-17 22:45:24 [Druid-ConnectionPool-Create-567656864:1893] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:525 - 收到唤醒,开始创建连接
星尘
xc
2022-05-17 22:45:24 [Druid-ConnectionPool-Create-567656864:1924] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:534 - 防止创建超过maxActive数量的连接,此时空闲0,活跃2连接池:2
com.xingchen.atlas.exception.GetConnectionTimeoutException: wait millis 6018, active 2, maxActive 2
at com.xingchen.atlas.pool.AtlasDataSource.getConnectionInternal(AtlasDataSource.java:177)
at com.xingchen.atlas.pool.AtlasDataSource.getConnectionDirect(AtlasDataSource.java:108)
at com.xingchen.atlas.pool.AtlasDataSource.getConnection(AtlasDataSource.java:100)
at com.xingchen.atlas.pool.AtlasDataSource.getConnection(AtlasDataSource.java:737)
at com.xingchen.atlas.pool.AtlasTest.getConn(AtlasTest.java:38)
at com.xingchen.atlas.pool.AtlasTest.main(AtlasTest.java:104)
Exception in thread "main" java.lang.NullPointerException
at com.xingchen.atlas.pool.AtlasTest.getConn(AtlasTest.java:44)
at com.xingchen.atlas.pool.AtlasTest.main(AtlasTest.java:104)看到日志,还是很清晰的,获取第三个连接时,创建连接线程被阻塞了一段时间(6018)后,报错了,说活跃的有2个,最大2个,符合预期,但是此时出现了一个意外,我们的程序并没有停止下来,按理说当获取连接为null时,此时的main线程应该捕获到异常后打印错误结束main线程。但是此时程序未停止,想了下,我们启动了一个线程去close conn1,会不会在这出问题了,打个断点,发现conn1在close的过程中,当运行到加锁lock时,停住了,那么问题就在这了,应该是这个锁没被释放掉,导致回收时卡住。
回头看下获取连接的代码,发现在getConnectionInternal()方法里面,获取lock后,没有在finally里面unlock,加上解锁
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}
try {
if (maxWaitThreadCount > 0
&& notEmptyWaitThreadCount >= maxWaitThreadCount) {
throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
+ lock.getQueueLength());
}
connectCount++;
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
}
if (holder != null) {
if (holder.discard) {
continue;
}
holder.active = true;
activeCount++;
}
} catch (InterruptedException e) {
throw new SQLException(e.getMessage(), e);
} catch (SQLException e) {
throw e;
} finally {
// 新增解锁
lock.unlock();
}再试下,发现已经能正常退出。
回到上面问题,如果在6s内获取到连接了呢?我们把conn1 close的休眠时间调整为5s,保证在等待的6s中内,有连接归还了,看下日志。
2022-05-17 23:22:14 [main:0] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:295 - =================atlas数据库连接池开始初始化===============
2022-05-17 23:22:16 [main:1450] [INFO ] ru.yandex.clickhouse.ClickHouseDriver:42 - Driver registered
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
2022-05-17 23:22:16 [main:1481] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:659 - 进入创建新连接
2022-05-17 23:22:16 [Druid-ConnectionPool-Create-567656864:1813] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:533 - 收到唤醒,开始创建连接
2022-05-17 23:22:16 [main:1813] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:379 - {dataSource-} inited
2022-05-17 23:22:16 [Druid-ConnectionPool-Create-567656864:1813] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:555 - 开始创建新连接
2022-05-17 23:22:16 [Druid-ConnectionPool-Create-567656864:1813] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:659 - 进入创建新连接
2022-05-17 23:22:16 [main:1813] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:382 - =================atlas数据库连接池结束初始化,耗时:1813 ms===============
2022-05-17 23:22:16 [Druid-ConnectionPool-Create-567656864:1967] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:557 - 创建结束
2022-05-17 23:22:16 [Druid-ConnectionPool-Create-567656864:1967] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:565 - 创建成功,准备放入
2022-05-17 23:22:16 [Druid-ConnectionPool-Create-567656864:1967] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:598 - 创建了并放入连接池一个连接,同时通知notEmpty信号,此时空闲1,活跃1连接池:2
2022-05-17 23:22:16 [Druid-ConnectionPool-Create-567656864:1967] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:567 - 创建成功,放入完毕
2022-05-17 23:22:16 [Druid-ConnectionPool-Create-567656864:1967] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:572 - CreateConnectionThread创建了一个连接并放入连接池
2022-05-17 23:22:16 [Druid-ConnectionPool-Create-567656864:1967] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:533 - 收到唤醒,开始创建连接
2022-05-17 23:22:16 [Druid-ConnectionPool-Create-567656864:1967] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:542 - 防止创建超过maxActive数量的连接,此时空闲0,活跃2连接池:2
星尘
xc
2022-05-17 23:22:16 [Druid-ConnectionPool-Create-567656864:1998] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:533 - 收到唤醒,开始创建连接
2022-05-17 23:22:16 [Druid-ConnectionPool-Create-567656864:1998] [INFO ] com.xingchen.atlas.pool.AtlasDataSource:542 - 防止创建超过maxActive数量的连接,此时空闲0,活跃2连接池:2
66
Disconnected from the target VM, address: '127.0.0.1:62340', transport: 'socket'
Process finished with exit code 0可以看到,有两次empty信号,且conn3成功获取连接,conn有个变量存储了上次获取连接的耗时,我们打印下这个参数看下
System.out.println("conn3 last get conn time:" + conn3.getHolder().getLastNotEmptyWaitNanos());
## conn3 last get conn time:5008330300看到大概是5s多点,还是符合预期的。
完成测试,我们再来实现下销毁链接的功能。
DestroyConnectionThread主要功能,就是不断扫描连接池,将不可用连接抛弃,并将连接池维持在最小连接数minIdle与最大连接数之间,主要是不低于minIdle,还一个功能就是主动回收长期未归还连接,防止连接泄露,数据库连接作为整个连接池最宝贵的资源,一定不能有浪费,但是实际这个功能并不建议使用,我们后面在分析原因。
首先在守护线程内,起一个死循环,并每隔timeBetweenEvictionRunsMillis(默认60s)去扫描一次
for (;;) {
try {
if (timeBetweenEvictionRunsMillis > 0) {
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000); //
}
if (Thread.interrupted()) {
break;
}
// log.info("销毁线程跑了一次,此时空闲"+poolingCount+",活跃"+activeCount+"连接池:" + connections.length);
destroyTask.run();
} catch (InterruptedException e) {
break;
}
}并调用销毁线程run一次,在run里面就是我们的瘦身逻辑。

首先利用poolingCount减去minIdle计算出需要做丢弃检查的连接对象区间,意味着这个区间的对象有被丢弃的可能,具体要不要放进丢弃队列evictConnections,要判断两个属性:
- minEvictableIdleTimeMillis:最小检查间隙,缺省值30min,官方解释:一个连接在池中最小生存的时间(结合检查区间来看,闲置时间超过这个时间,才会被丢弃)。
- maxEvictableIdleTimeMillis:最大检查间隙,缺省值7h,官方解释:一个连接在池中最大生存的时间(无视检查区间,只要闲置时间超过这个时间,就一定会被丢弃)。
如果当前连接对象闲置时间超过minEvictableIdleTimeMillis且下标在evictCheck区间内,则加入丢弃队列evictConnections,如果闲置时间超过maxEvictableIdleTimeMillis,则直接放入evictConnections(一般情况下会命中第一个判断条件,除非一个连接不在检查区间,且闲置时间超过maxEvictableIdleTimeMillis)。
如果连接对象不在evictCheck区间内,且keepAlive属性为true,则判断该对象闲置时间是否超出keepAliveBetweenTimeMillis(缺省值60s),若超出,则意味着该连接需要进行连接可用性检查,则将该对象放入keepAliveConnections队列。
两个队列赋值完成后,则池子会进行一次压缩,没有涉及到的连接对象会被压缩到队首。
然后就是处理evictConnections和keepAliveConnections两个队列了,evictConnections里的对象会被close最后释放掉,keepAliveConnections里面的对象将会其进行检测,碰到不可用的连接会调用discard抛弃掉,可用的连接会再次被放进连接池。
整个流程可以看出,连接闲置后,也并非一下子就减少到minIdle的,如果之前产生一堆的连接(不超过maxActive),突然闲置了下来,则至少需要花minEvictableIdleTimeMillis的时间才可以被移出连接池,如果一个连接闲置时间超过maxEvictableIdleTimeMillis则必定被回收,所以极端情况下(比如一个连接池从初始化后就没有再被使用过),连接池里并不会一直保持minIdle个连接,而是一个都没有,生产环境下这是非常不常见的,默认的maxEvictableIdleTimeMillis都有7h,除非是极度冷门的系统才会出现这种情况,而开启keepAlive也不会推翻这个规则,keepAlive的优先级是低于maxEvictableIdleTimeMillis的,keepAlive只是保证了那些检查中不需要被移出连接池的连接在指定检测时间内去检测其连接活性,从而决定是否放入池子或者直接discard。
==5月18更新===================================
连接池基本功能都具备了,下一步就是如何接入项目使用,方便点的话就是用springboot的自动装配,只需要在yml文件配置下就行,所以需要增加springboot的支持。
先用一个AtlasDataSourceWrapper,继承AtlasDataSource,用来接受参数,并配置以spring.datasource.atlas开头
@ConfigurationProperties("spring.datasource.atlas")
public class AtlasDataSourceWrapper extends AtlasDataSource implements InitializingBean {
@Autowired
private DataSourceProperties basicProperties;
@Override
public void afterPropertiesSet() throws Exception {
//if not found prefix 'spring.datasource.druid' jdbc properties ,'spring.datasource' prefix jdbc properties will be used.
if (super.getUsername() == null) {
super.setUsername(basicProperties.determineUsername());
}
if (super.getPassword() == null) {
super.setPassword(basicProperties.determinePassword());
}
if (super.getUrl() == null) {
super.setJdbcUrl(basicProperties.determineUrl());
}
if (super.getDriverClass() == null) {
super.setDriverClass(basicProperties.getDriverClassName());
}
}
@Override
public void setMaxEvictableIdleTimeMillis(long maxEvictableIdleTimeMillis) {
try {
super.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis);
} catch (IllegalArgumentException ignore) {
super.maxEvictableIdleTimeMillis = maxEvictableIdleTimeMillis;
}
}
}再加一个自动装配
@Configuration
@ConditionalOnClass(AtlasDataSource.class)
@AutoConfigureBefore(DataSourceAutoConfiguration.class)
@EnableConfigurationProperties({AtlasStatProperties.class, DataSourceProperties.class})
public class AtlasDataSourceAutoConfigure {
private static final Logger LOGGER = LoggerFactory.getLogger(AtlasDataSourceAutoConfigure.class);
@Bean(initMethod = "init")
@ConditionalOnMissingBean
public DataSource dataSource() {
LOGGER.info("Init AtlasDataSource");
return new AtlasDataSourceWrapper();
}
}简简单单就齐活了,因为我们没有maven仓库,只是本地编译的,所以需要在本地把atlas连接池打成jar包放在本地,并在pom文件引入
com.xingchen atlas 1.1.0 system ${project.basedir}/src/main/resources/lib/atlas-1.0-SNAPSHOT.jar同时为了达到让用户在配置页,有智能提示,还需要加个配置,在resource目录下,加一个META-INF文件夹,新建additional-spring-configuration-metadata.json文件
{
"properties": [
{
"name": "spring.datasource.druid.connection-properties",
"type": "java.lang.String",
"sourceType": "com.xingchen.atlas.pool.AtlasDataSource"
},
{
"name": "spring.datasource.druid.filters",
"type": "java.lang.String",
"sourceType": "com.xingchen.atlas.pool.AtlasDataSource"
}
]
}同时不要忘了spring.factories文件,让springboot识别装配
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.xingchen.atlas.spring.boot.autoconfigure.AtlasDataSourceAutoConfigure
然后就齐活了,同样打成包,新建一个springboot项目引入试下:

先把两个jar包放到本地,其实应用引入一个就行了,因为atlas-springboot-starter已经包含了atlas,但是实际死活没效果,不得不把两个都引入了,回头再看下原因。然后配置文件

可以看到,智能提示也生效了,然后就可以启动项目了。
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.1.0.RELEASE)
2022-05-19 00:01:45.875 INFO 7528 --- [ main] c.y.Application : Starting Application on xing with PID 7528 (D:\code\JAVA-000\druid_learning\target\classes started by yang in D:\code\JAVA-000\druid_learning)
2022-05-19 00:01:45.881 INFO 7528 --- [ main] c.y.Application : No active profile set, falling back to default profiles: default
2022-05-19 00:01:46.676 INFO 7528 --- [ main] o.s.b.w.e.t.TomcatWebServer : Tomcat initialized with port(s): 8888 (http)
2022-05-19 00:01:46.683 INFO 7528 --- [ main] o.a.c.c.StandardService : Starting service [Tomcat]
2022-05-19 00:01:46.683 INFO 7528 --- [ main] o.a.c.c.StandardEngine : Starting Servlet Engine: Apache Tomcat/9.0.12
2022-05-19 00:01:46.683 INFO 7528 --- [ main] o.a.c.c.AprLifecycleListener : The APR based Apache Tomcat Native library which allows optimal performance in production environments was not found on the java.library.path: [D:\Java\jdk\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:\Program Files (x86)\Common Files\Oracle\Java\javapath;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Windows\System32\OpenSSH\;C:\Program Files (x86)\NVIDIA Corporation\PhysX\Common;C:\Program Files\NVIDIA Corporation\NVIDIA NvDLISR;%JAVA_HOME%\bin;F:\Git\cmd;D:\apache-maven-3.5.0-bin\apache-maven-3.5.0\bin;D:\apache-tomcat-9.0.37\bin;C:\ProgramData\chocolatey\bin;D:\Go\bin;C:\Users\yang\AppData\Local\Microsoft\WindowsApps;F:\Bandizip\;C:\Users\yang\go\bin;.]
2022-05-19 00:01:46.761 INFO 7528 --- [ main] o.a.c.c.C.[.[.[/] : Initializing Spring embedded WebApplicationContext
2022-05-19 00:01:46.761 INFO 7528 --- [ main] o.s.w.c.ContextLoader : Root WebApplicationContext: initialization completed in 864 ms
2022-05-19 00:01:46.776 INFO 7528 --- [ main] o.s.b.w.s.ServletRegistrationBean : Servlet dispatcherServlet mapped to [/]
2022-05-19 00:01:46.783 INFO 7528 --- [ main] o.s.b.w.s.FilterRegistrationBean : Mapping filter: 'characterEncodingFilter' to: [/*]
2022-05-19 00:01:46.783 INFO 7528 --- [ main] o.s.b.w.s.FilterRegistrationBean : Mapping filter: 'hiddenHttpMethodFilter' to: [/*]
2022-05-19 00:01:46.783 INFO 7528 --- [ main] o.s.b.w.s.FilterRegistrationBean : Mapping filter: 'formContentFilter' to: [/*]
2022-05-19 00:01:46.783 INFO 7528 --- [ main] o.s.b.w.s.FilterRegistrationBean : Mapping filter: 'requestContextFilter' to: [/*]
2022-05-19 00:01:46.861 INFO 7528 --- [ main] c.x.a.s.b.a.AtlasDataSourceAutoConfigure : Init AtlasDataSource
2022-05-19 00:01:46.883 INFO 7528 --- [ main] c.x.a.p.AtlasDataSource : =================atlas数据库连接池开始初始化===============
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
2022-05-19 00:01:46.930 INFO 7528 --- [ main] c.x.a.p.AtlasDataSource : 进入创建新连接
2022-05-19 00:01:47.131 INFO 7528 --- [ main] c.x.a.p.AtlasDataSource : 进入创建新连接
2022-05-19 00:01:47.246 INFO 7528 --- [ main] c.x.a.p.AtlasDataSource : 进入创建新连接
2022-05-19 00:01:47.378 INFO 7528 --- [ main] c.x.a.p.AtlasDataSource : 进入创建新连接
2022-05-19 00:01:47.497 INFO 7528 --- [ main] c.x.a.p.AtlasDataSource : 进入创建新连接
2022-05-19 00:01:47.629 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 收到唤醒,开始创建连接
2022-05-19 00:01:47.629 INFO 7528 --- [ main] c.x.a.p.AtlasDataSource : {dataSource-} inited
2022-05-19 00:01:47.629 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 开始创建新连接
2022-05-19 00:01:47.629 INFO 7528 --- [ main] c.x.a.p.AtlasDataSource : =================atlas数据库连接池结束初始化,耗时:746 ms===============
2022-05-19 00:01:47.629 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 进入创建新连接
2022-05-19 00:01:47.760 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建结束
2022-05-19 00:01:47.760 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建成功,准备放入
2022-05-19 00:01:47.760 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建了并放入连接池一个连接,同时通知notEmpty信号,此时空闲6,活跃0连接池:20
2022-05-19 00:01:47.760 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建成功,放入完毕
2022-05-19 00:01:47.760 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : CreateConnectionThread创建了一个连接并放入连接池
2022-05-19 00:01:47.760 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 收到唤醒,开始创建连接
2022-05-19 00:01:47.760 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 开始创建新连接
2022-05-19 00:01:47.760 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 进入创建新连接
2022-05-19 00:01:47.798 INFO 7528 --- [ main] o.s.s.c.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2022-05-19 00:01:47.883 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建结束
2022-05-19 00:01:47.883 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建成功,准备放入
2022-05-19 00:01:47.883 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建了并放入连接池一个连接,同时通知notEmpty信号,此时空闲7,活跃0连接池:20
2022-05-19 00:01:47.883 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建成功,放入完毕
2022-05-19 00:01:47.883 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : CreateConnectionThread创建了一个连接并放入连接池
2022-05-19 00:01:47.883 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 收到唤醒,开始创建连接
2022-05-19 00:01:47.883 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 开始创建新连接
2022-05-19 00:01:47.883 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 进入创建新连接
2022-05-19 00:01:47.983 INFO 7528 --- [ main] o.s.b.w.e.t.TomcatWebServer : Tomcat started on port(s): 8888 (http) with context path ''
2022-05-19 00:01:47.983 INFO 7528 --- [ main] c.y.Application : Started Application in 2.525 seconds (JVM running for 3.869)
2022-05-19 00:01:48.014 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建结束
2022-05-19 00:01:48.014 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建成功,准备放入
2022-05-19 00:01:48.014 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建了并放入连接池一个连接,同时通知notEmpty信号,此时空闲8,活跃0连接池:20
2022-05-19 00:01:48.014 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建成功,放入完毕
2022-05-19 00:01:48.014 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : CreateConnectionThread创建了一个连接并放入连接池
2022-05-19 00:01:48.014 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 收到唤醒,开始创建连接
2022-05-19 00:01:48.014 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 开始创建新连接
// skip ...
2022-05-19 00:01:49.417 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建成功,准备放入
2022-05-19 00:01:49.417 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建了并放入连接池一个连接,同时通知notEmpty信号,此时空闲19,活跃0连接池:20
2022-05-19 00:01:49.417 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建成功,放入完毕
2022-05-19 00:01:49.417 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : CreateConnectionThread创建了一个连接并放入连接池
2022-05-19 00:01:49.417 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 收到唤醒,开始创建连接
2022-05-19 00:01:49.417 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 开始创建新连接
2022-05-19 00:01:49.417 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 进入创建新连接
2022-05-19 00:01:49.548 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建结束
2022-05-19 00:01:49.548 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建成功,准备放入
2022-05-19 00:01:49.548 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建了并放入连接池一个连接,同时通知notEmpty信号,此时空闲20,活跃0连接池:20
2022-05-19 00:01:49.548 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 创建成功,放入完毕
2022-05-19 00:01:49.548 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : CreateConnectionThread创建了一个连接并放入连接池
2022-05-19 00:01:49.548 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 收到唤醒,开始创建连接
2022-05-19 00:01:49.548 INFO 7528 --- [reate-854977732] c.x.a.p.AtlasDataSource : 防止创建超过maxActive数量的连接,此时空闲20,活跃0连接池:20看到日志,项目正常启动,且也打印了我们之前在atlas连接池创建连接时的日志,符合预期。
=====5月20更新,别问我为什么来更新文章,而不是过520
==================
上回已经把atlas数据库整合到项目里面了,而且也启动成功了,今天来测一下能不能正常使用。
首先写一个简单的crud,使用mabatis进行查询
@PostMapping("/add")
public BaseResponse add(){
Product product = Product.builder()
.name("xiaomi")
.price(new BigDecimal(5000.21))
.createTime(LocalDateTime.now())
.build();
productService.addProduct(product);
return BaseResponse.success();
}
@PostMapping("/get")
public BaseResponse get(){
Product product = productService.getProduct(1L);
return BaseResponse.success(product);
}
@PostMapping("/list")
public BaseResponse list(){
return productService.getProductList(1,10);
}然后触发接口:
curl --location --request POST 'http://localhost:8888/product/list' --header 'Content-Type: application/json' --data-raw '{}然后尴尬的事情来了,request被阻塞住,程序也无任何异常,就这么尬住了。。。
首先怀疑是不是存在死锁,先用jstack看下,发现没有,那么就可能是获取连接的时候出问题了,打几个断点,看了下,程序正常获取了conn;

首先在mybatis代码里,会获取一个preparestatement,进来看下发现会创建一个连接
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt);
return stmt;
}getConnection()这里进来发现,有3个实现类,实际进入的springManagedTransaction的openConnection()方法,然后会走到spring包的DataSourceUtil中去featch一个连接:

最终会进入我们自己的atlas的getConnection()方法

通过debug,发现也正确的拿到conn了,那为啥会阻塞住呢?
继续看下mybatais的执行,发现在用preparestatement获取resultset的时候,由于我们atlas的resultset没实现方法,导致返回了一个null的resultset,而statement在拿到null时,会一直循环获取resultset,导致在这死循环了:

stmt在getResultSet这个地方,由于我们没实现这个方法,导致返回的是一个null,所以就卡在这while(rs == null)里面一直在死循环,也就导致了我们的请求一直被pending。看代码这里应该也有break机制,但是由于这里我们也是简单的固定写死返回0,导致无法退出循环,知道问题了,就好解决了。我们实现下AtlasPoolStatement的接口。
为防止mybatis再出现这种获取null的情况,我们把AtlasPooledConnection, AtlasPooledStatement,AtlasPooledPrepareStatement和atlasResultset等全部继承的接口,全部都实现一遍,最简单的方式,就是调用原来的方法返回就行。
然后我们再测试下:

终于成功了,获取到目标数据,又试了下,增删改查等接口,发现均成功获取数据,至此我们的数据库连接池的功能才正式完工。至于其事务,监控等都还没实现,不过基本的功能都是可以的,回顾目标,应该算是完成了。通过这次写一个数据库连接池,最大的提升就是明白了数据库连接池的核心逻辑,以及配置的参数分别的作用是什么,分别应对什么场景,此外还学习到了druid的一些架构的设计技巧,对锁的把控等。
后续有空了,我们再实现下监控能力,并制作个页面展示数据库连接池的连接情况。
完结撒花~~~




版权声明:本文由星尘阁原创出品,转载请注明出处!