目录

03 Mybatis 连接池的实现原理

mybatis源码系列文章,示例中带有中文注释的源码 copy 自 https://gitee.com/wlizhi/mybatis-3

链接中源码是作者从 github 下载,并以自身理解对核心流程及主要节点做了详细的中文注释。


PooledConnection

PooledConnection 实现了 InvocationHandler 接口,内部持有了 Connection 对象,是对 Connection 的增强类。

内部封装了连接相关的一些状态信息,以及对 close() 的重新定义。原本 Connection 的 close() 是关闭连接,而这里是将连接归还到连接池。

数据结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class PooledConnection implements InvocationHandler {

	private static final String CLOSE = "close";
	private static final Class<?>[] IFACES = new Class<?>[]{Connection.class};

	private final int hashCode;
	// 记录当前连接所在的数据源对象,本次连接是有这个数据源创建的,关闭后也是回到这个数据源
	private final PooledDataSource dataSource;
	// 真正的连接对象
	private final Connection realConnection;
	// 连接的代理对象
	private final Connection proxyConnection;
	// 从数据源取出来连接的时间戳
	private long checkoutTimestamp;
	// 连接创建的的时间戳
	private long createdTimestamp;
	// 连接最后一次使用的时间戳
	private long lastUsedTimestamp;
	// 根据数据库url、用户名、密码生成一个hash值,唯一标识一个连接池
	private int connectionTypeCode;
	// 连接是否有效
	private boolean valid;
}

增强方法 invoke():

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class PooledConnection implements InvocationHandler {

	@Override
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
		String methodName = method.getName();
		// 针对连接的 close() ,进行替换,连接的关闭,修改为将连接归还到连接池。
		if (CLOSE.equals(methodName)) {
			dataSource.pushConnection(this);
			return null;
		}
		try {
			// 对于其他方法,逻辑不变,直接执行目标对象的方法。
			if (!Object.class.equals(method.getDeclaringClass())) {
				// issue #579 toString() should never fail
				// throw an SQLException instead of a Runtime
				checkConnection();
			}
			return method.invoke(realConnection, args);
		} catch (Throwable t) {
			throw ExceptionUtil.unwrapThrowable(t);
		}
	}
}

在 invoke() 中,如果是 close() 方法,则将连接归还到连接池,而不是关闭连接。以达到连接复用的目的。

PoolState

PoolState 是连接池数据结构的封装,其中有空闲连接列表、活跃连接列表、以及一些统计数据等。

数据结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class PoolState {

	protected PooledDataSource dataSource;

	// 空闲的连接池资源集合
	protected final List<PooledConnection> idleConnections = new ArrayList<>();
	// 活跃的连接池资源集合
	protected final List<PooledConnection> activeConnections = new ArrayList<>();
	// 请求的次数
	protected long requestCount = 0;
	// 累计的获得连接的时间
	protected long accumulatedRequestTime = 0;
	// 累计的使用连接的时间。从连接取出到归还,算一次使用的时间;
	protected long accumulatedCheckoutTime = 0;
	// 使用连接超时的次数
	protected long claimedOverdueConnectionCount = 0;
	// 累计超时时间
	protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
	// 累计等待时间
	protected long accumulatedWaitTime = 0;
	// 等待次数
	protected long hadToWaitCount = 0;
	// 无效的连接次数
	protected long badConnectionCount = 0;
}

PooledDataSource

PooledDataSource 是 DataSource 的一个池化的简单实现。其中封装了 DataSource 相关的配置信息以及连接池信息 PoolState, 包括最大活跃连接数、最大限制连接数、等等。

数据结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * This is a simple, synchronous, thread-safe database connection pool.
 * 一个简单,同步的、线程安全的数据库连接池
 * @author Clinton Begin
 */
public class PooledDataSource implements DataSource {

	private static final Log log = LogFactory.getLog(PooledDataSource.class);

	/** 连接池状态 */
	private final PoolState state = new PoolState(this);

	private final UnpooledDataSource dataSource;

	// OPTIONAL CONFIGURATION FIELDS
	// 最大活跃连接数
	protected int poolMaximumActiveConnections = 10;
	// 最大闲置连接数
	protected int poolMaximumIdleConnections = 5;
	// 最大checkout时长(最长使用时间)
	protected int poolMaximumCheckoutTime = 20000;
	// 无法取得连接是最大的等待时间
	protected int poolTimeToWait = 20000;
	// 最多允许几次无效连接
	protected int poolMaximumLocalBadConnectionTolerance = 3;
	// 测试连接是否有效的sql语句
	protected String poolPingQuery = "NO PING QUERY SET";
	// 是否允许测试连接
	protected boolean poolPingEnabled;
	// 配置一段时间,当连接在这段时间内没有被使用,才允许测试连接是否有效
	protected int poolPingConnectionsNotUsedFor;

	// 根据数据库url、用户名、密码生成一个hash值,唯一标识一个连接池,由这个连接池生成的连接都会带上这个值
	private int expectedConnectionTypeCode;
}

里面核心方法有两个,分别是 popConnection() 、 pushConnection() 。

popConnection() 用于获取连接,pushConnection() 用于归还连接。

连接的获取 popConnection

连接的获取源码如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
public class PooledDataSource implements DataSource {

	private PooledConnection popConnection(String username, String password) throws SQLException {
		boolean countedWait = false;
		PooledConnection conn = null;
		long t = System.currentTimeMillis();
		int localBadConnectionCount = 0;

		while (conn == null) {
			// 添加同步控制,锁对象是 PoolState。
			synchronized (state) {
				// 如果还有空闲连接,则直接从空闲连接中获取一个,获取到的这个连接从空闲连接列表中移除。
				if (!state.idleConnections.isEmpty()) {
					// Pool has available connection
					conn = state.idleConnections.remove(0);
					if (log.isDebugEnabled()) {
						log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
					}
				} else {
					// 空闲连接列表中,已经没有空闲连接

					// 如果当前活跃连接数,小于最大活跃连接数,直接创建新链接。
					// Pool does not have available connection
					if (state.activeConnections.size() < poolMaximumActiveConnections) {
						// Can create new connection
						conn = new PooledConnection(dataSource.getConnection(), this);
						if (log.isDebugEnabled()) {
							log.debug("Created connection " + conn.getRealHashCode() + ".");
						}
					} else {
						// 如果当前活跃链接数,达到了最大活跃连接数,且没有了空闲连接。

						// 获取最早创建的连接,判断是否超时
						// Cannot create new connection
						PooledConnection oldestActiveConnection = state.activeConnections.get(0);
						long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
						// 如果已经超时
						if (longestCheckoutTime > poolMaximumCheckoutTime) {
							// Can claim overdue connection
							// 使用连接超时的次数 +1
							state.claimedOverdueConnectionCount++;
							// 累计超时时间增加。
							state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
							// 累计的使用连接的时间增加。
							state.accumulatedCheckoutTime += longestCheckoutTime;
							// 将这个旧的超时连接,从活跃的连接列表中移除。
							state.activeConnections.remove(oldestActiveConnection);
							// 如果这个旧的超时连接,存在尚未完成的事务,进行回滚,如果回滚抛出异常,则忽略异常。
							if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
								try {
									oldestActiveConnection.getRealConnection().rollback();
								} catch (SQLException e) {
									log.debug("Bad connection. Could not roll back");
								}
							}
							// 没有空闲连接,已达最大活跃连接数,最早的连接已经超时,则在移除超时连接后,创建新的连接。
							conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
							conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
							conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
							// 将旧的超时连接置为无效。
							oldestActiveConnection.invalidate();
							if (log.isDebugEnabled()) {
								log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
							}
						} else {
							// 没有空闲连接,已达最大活跃连接数,最早的连接没有超时。只能阻塞。
							// Must wait
							try {
								if (!countedWait) {
									state.hadToWaitCount++;
									countedWait = true;
								}
								if (log.isDebugEnabled()) {
									log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
								}
								long wt = System.currentTimeMillis();
								// 阻塞时间设置为无法取得连接是最大的等待时间。这里的阻塞会释放掉锁,在这期间,有可能会被其他线程唤醒。
								state.wait(poolTimeToWait);
								// 累积等待时间累加上以上阻塞的时间。
								state.accumulatedWaitTime += System.currentTimeMillis() - wt;
							} catch (InterruptedException e) {
								break;
							}
						}
					}
				}
				// 如果最终连接不为空。
				if (conn != null) {
					// ping to server and check the connection is valid or not
					// 连接有效
					if (conn.isValid()) {
						// 真实连接没有自动提交,则回滚掉存在的旧事务。
						if (!conn.getRealConnection().getAutoCommit()) {
							conn.getRealConnection().rollback();
						}
						conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
						conn.setCheckoutTimestamp(System.currentTimeMillis());
						conn.setLastUsedTimestamp(System.currentTimeMillis());
						// 将获取到的连接放入到活跃连接列表中、请求的次数自增、累积获得连接的时间增加。
						state.activeConnections.add(conn);
						state.requestCount++;
						state.accumulatedRequestTime += System.currentTimeMillis() - t;
					} else {
						if (log.isDebugEnabled()) {
							log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
						}
						// 连接不为空,但是无效。无效连接统计次数自增。
						state.badConnectionCount++;
						localBadConnectionCount++;
						conn = null;
						// 如果获取无效连接的次数 大于 最大闲置连接数+最大重试次数,则抛出异常,否则继续尝试获取连接。
						if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
							if (log.isDebugEnabled()) {
								log.debug("PooledDataSource: Could not get a good connection to the database.");
							}
							throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
						}
					}
				}
			}

		}

		if (conn == null) {
			if (log.isDebugEnabled()) {
				log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
			}
			throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
		}

		return conn;
	}
}

获取的流程:

  • 获取添加同步控制,一次只允许一个线程进行连接的获取。这里是线程安全的。
  • 如果空闲连接列表不为空,直接从连接列表头部获取一个连接,并连接从空闲列表中移除。
  • 如果空闲列表为空。
    • 如果活跃连接数小于最大活跃连接数,创建一个新的连接。
    • 如果活跃连接数达到最大活跃连接数,获取最早创建的活跃连接,判断是否超时。
      • 如果已经超时,设置相关统计数据,如果存在未提交事务,进行回滚。从活跃连接列表中移除,使用旧的真实连接封装一个新的 PooledConnection, 并将旧的 PooledConnection 置为无效。
      • 如果未超时,阻塞线程。
  • 最终如果获取到的连接不为空。
    • 如果连接有效,设置连接的一些统计数据,将连接设置到活跃连接列表中。
    • 如果连接无效,判断是否超出最大重试次数,如果超出抛出异常,否则重新尝试获取。
  • 如果获取到的连接为空,打印 debug 日志后抛出异常。

流程图:

https://oss.wlizhi.cc/blog/mybatis/pop-connection-process.png
pop-connection-process.png

连接的归还 pushConnection

pushConnection() 源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class PooledDataSource implements DataSource {

	protected void pushConnection(PooledConnection conn) throws SQLException {
		// 在归还连接时加上同步控制。锁对象是 PoolState。
		synchronized (state) {
			// 从活跃连接列表中,移除此连接。
			state.activeConnections.remove(conn);
			// 如果连接有效
			if (conn.isValid()) {
				// 空闲连接数量 小于 最大空闲连接数 且 是从当前连接池中获取到的连接。
				if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
					// 累加累积使用连接的时间
					state.accumulatedCheckoutTime += conn.getCheckoutTime();
					// 回滚掉可能存在的事务。
					if (!conn.getRealConnection().getAutoCommit()) {
						conn.getRealConnection().rollback();
					}
					// 将原始连接封装成一个新的 PooledConnection,
					PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
					state.idleConnections.add(newConn);
					newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
					newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
					// 旧的 PooledConnection 设置为无效。
					conn.invalidate();
					if (log.isDebugEnabled()) {
						log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
					}
					// 连接归还成功,唤醒其他正在等待获取连接的线程。
					state.notifyAll();
				} else {
					// 空闲连接数量 大于等于 最大空闲连接数
					state.accumulatedCheckoutTime += conn.getCheckoutTime();
					// 回滚掉可能存在的事务,将真实连接关闭,连接包装类设置为无效。
					if (!conn.getRealConnection().getAutoCommit()) {
						conn.getRealConnection().rollback();
					}
					conn.getRealConnection().close();
					if (log.isDebugEnabled()) {
						log.debug("Closed connection " + conn.getRealHashCode() + ".");
					}
					conn.invalidate();
				}
			} else {
				if (log.isDebugEnabled()) {
					log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
				}
				// 归还的连接是无效的,则无效连接统计数量 +1。
				state.badConnectionCount++;
			}
		}
	}
}

代码流程:

  1. 添加同步控制,只允许一个线程归还连接,其他线程会阻塞。
  2. 从活跃连接列表中移除此连接。
  3. 如果连接有效。
    1. 如果空闲连接数小于最大空闲连接数,执行归还操作。归还时会将原始连接封装到新的 PooledConnection 实例中,旧的 PooledConnection 会标记为无效。
    2. 如果空闲连接数达到最大空闲连接数,丢弃连接。
  4. 如果连接无效,设置统计变量,无效连接统计数量 +1。

流程图:

https://oss.wlizhi.cc/blog/mybatis/push-connection-process.png
push-connection-process.png

小结

这三个类,实际上就是实现一个简单的连接池,以达到连接复用的目的。一般开发环境不会使用这个,集成到 spring 时一般使用第三方连接池。

连接的获取和归还,同步控制使用的同一把监视锁,他们是互斥的。