Hyperf DB Connection Pool

Connection pool

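The hyperf/database component is derived from illuminate/database. We have made some modifications to it, and by design it can be used in other PHP-FPM frameworks or in Swoole-based frameworks. Within Hyperf, the hyperf/db-connection component deserves a mention: it implements a database connection pool on top of hyperf/pool and adds a new abstraction over the model. With it acting as the bridge, Hyperf can wire in the database component and the event component.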

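Implementation

There are two ways to use the db in Hyperf, depending on whether you go through the ORM. The ORM adds one more layer of wrapping around db queries and responses, but underneath it still issues db requests through a connection.

Hyperf\DbConnection\Model\Model extends Hyperf\Database\Model\Model, and all db requests are issued through Hyperf\Database\Model\Builder: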

# vendor/hyperf/database/src/Model/Model.php

/**
 * Handle dynamic method calls into the model.
 *
 * @param string $method
 * @param array $parameters
 */
public function __call($method, $parameters)
{
    if (in_array($method, ['increment', 'decrement'])) {
        return $this->{$method}(...$parameters);
    }

    // Any other method is forwarded to a fresh query builder.
    return call([$this->newQuery(), $method], $parameters);
}

# Every path ends up with an instance of Hyperf\Database\Model\Builder
/**
 * Get a new query builder for the model's table.
 *
 * @return \Hyperf\Database\Model\Builder
 */
public function newQuery()
{
    return $this->registerGlobalScopes($this->newQueryWithoutScopes());
}

/**
 * Get a new query builder that doesn't have any global scopes or eager loading.
 *
 * @return \Hyperf\Database\Model\Builder|static
 */
public function newModelQuery()
{
    return $this->newModelBuilder($this->newBaseQueryBuilder())->setModel($this);
}

/**
 * Get a new query builder instance for the connection.
 *
 * @return \Hyperf\Database\Query\Builder
 */
protected function newBaseQueryBuilder()
{
    $connection = $this->getConnection();

    return new QueryBuilder($connection, $connection->getQueryGrammar(), $connection->getPostProcessor());
}

/**
 * Get the database connection for the model.
 * You can write it by yourself.
 */
public function getConnection(): ConnectionInterface
{
    return Register::resolveConnection($this->getConnectionName());
}

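As the code above shows, every db request ends up in Hyperf\Database\Query\Builder, which then completes the request through its own connection property. That property is supplied by the last method above, via Register::resolveConnection($this->getConnectionName()):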

# vendor/hyperf/database/src/Model/Register.php
/**
 * Resolve a connection instance.
 *
 * @param null|string $connection
 * @return ConnectionInterface
 */
public static function resolveConnection($connection = null)
{
    return static::$resolver->connection($connection);
}
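
The chain traced so far (model, builder, register, connection) can be sketched in plain PHP. This is only an illustration under stated assumptions: SketchModel, SketchBuilder, SketchRegister and FakeConnection are hypothetical stand-ins, not Hyperf classes, and the "connection" simply echoes the SQL it receives.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-ins for illustration; none of these are Hyperf classes.
interface SketchConnection
{
    public function select(string $sql): array;
}

final class FakeConnection implements SketchConnection
{
    public function __construct(public string $name) {}

    public function select(string $sql): array
    {
        // Report which connection received which statement.
        return ["{$this->name}: {$sql}"];
    }
}

final class SketchRegister
{
    /** @var null|callable(string): SketchConnection */
    private static $resolver = null;

    public static function setResolver(callable $resolver): void
    {
        self::$resolver = $resolver;
    }

    public static function resolveConnection(string $name): SketchConnection
    {
        return (self::$resolver)($name);
    }
}

final class SketchBuilder
{
    private array $wheres = [];

    public function __construct(private SketchConnection $connection, private string $table) {}

    public function where(string $column, mixed $value): static
    {
        $this->wheres[] = "{$column} = " . var_export($value, true);
        return $this;
    }

    public function get(): array
    {
        $sql = "select * from {$this->table}";
        if ($this->wheres) {
            $sql .= ' where ' . implode(' and ', $this->wheres);
        }
        // The builder never opens a connection itself; it uses the injected one.
        return $this->connection->select($sql);
    }
}

class SketchModel
{
    protected string $table = 'users';
    protected string $connection = 'default';

    public function newQuery(): SketchBuilder
    {
        return new SketchBuilder(SketchRegister::resolveConnection($this->connection), $this->table);
    }

    // Unknown methods are forwarded to a fresh builder.
    public function __call(string $method, array $parameters): mixed
    {
        return $this->newQuery()->{$method}(...$parameters);
    }
}

SketchRegister::setResolver(fn (string $name) => new FakeConnection($name));
$rows = (new SketchModel())->where('id', 1)->get();
```

The point of the indirection is that the model only knows a connection name; which object answers to that name is decided by whatever resolver was registered earlier.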

The $resolver property is injected when the framework boots:

# vendor/hyperf/db-connection/src/Listener/RegisterConnectionResolverListener.php
public function listen(): array
{
    return [
        BootApplication::class,
    ];
}

public function process(object $event)
{
    if ($this->container->has(ConnectionResolverInterface::class)) {
        Register::setConnectionResolver(
            $this->container->get(ConnectionResolverInterface::class)
        );
    }
}

ConnectionResolverInterface::class is bound in ConfigProvider:

# vendor/hyperf/db-connection/src/ConfigProvider.php
public function __invoke(): array
{
    return [
        'dependencies' => [
            PoolFactory::class => PoolFactory::class,
            ConnectionFactory::class => ConnectionFactory::class,
            ConnectionResolverInterface::class => ConnectionResolver::class,
            'db.connector.mysql' => MySqlConnector::class,
            MigrationRepositoryInterface::class => DatabaseMigrationRepositoryFactory::class,
        ],
        ...
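
How a dependencies map and a boot listener fit together can be sketched as follows. This is a minimal illustration, not Hyperf's container: TinyContainer, ResolverContract, PoolBackedResolver, StaticRegister and onBoot are all hypothetical names, and the real container does considerably more (factories, constructor injection, and so on).

```php
<?php
declare(strict_types=1);

// Hypothetical names throughout; this is not Hyperf's DI container.
interface ResolverContract
{
    public function connection(string $name): string;
}

final class PoolBackedResolver implements ResolverContract
{
    public function connection(string $name): string
    {
        return "connection from pool '{$name}'";
    }
}

final class TinyContainer
{
    /** @var array<string, object> */
    private array $instances = [];

    /** @param array<string, string> $dependencies interface or id => class name */
    public function __construct(private array $dependencies) {}

    public function has(string $id): bool
    {
        return isset($this->dependencies[$id]);
    }

    public function get(string $id): object
    {
        // Build lazily on first use, then keep returning the same instance.
        $class = $this->dependencies[$id];
        return $this->instances[$id] ??= new $class();
    }
}

final class StaticRegister
{
    public static ?ResolverContract $resolver = null;
}

// Plays the role of the boot listener: copy the resolver into static state.
function onBoot(TinyContainer $container): void
{
    if ($container->has(ResolverContract::class)) {
        StaticRegister::$resolver = $container->get(ResolverContract::class);
    }
}

$container = new TinyContainer([ResolverContract::class => PoolBackedResolver::class]);
onBoot($container);
$result = StaticRegister::$resolver->connection('default');
```

Binding the interface rather than the concrete class is what lets an application swap in its own resolver by overriding a single map entry.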

The connection property:

# vendor/hyperf/db-connection/src/ConnectionResolver.php
/**
 * Get a database connection instance.
 *
 * @param string $name
 * @return ConnectionInterface
 */
public function connection($name = null)
{
    if (is_null($name)) {
        $name = $this->getDefaultConnection();
    }

    $connection = null;
    $id = $this->getContextKey($name);
    if (Context::has($id)) {
        $connection = Context::get($id);
    }

    if (! $connection instanceof ConnectionInterface) {
        $pool = $this->factory->getPool($name);
        $connection = $pool->get();
        try {
            // PDO is initialized as an anonymous function, so there is no IO exception,
            // but if other exceptions are thrown, the connection will not return to the connection pool properly.
            $connection = $connection->getConnection();
            Context::set($id, $connection);
        } finally {
            if (Coroutine::inCoroutine()) {
                defer(function () use ($connection, $id) {
                    Context::set($id, null);
                    $connection->release();
                });
            }
        }
    }

    return $connection;
}

This is where the connection pool comes in. The db-specific implementations are Hyperf\DbConnection\Pool\DbPool and Hyperf\DbConnection\Connection; see the source code for details.

The connection method above calls defer, so the db connection is put back into the pool when the coroutine ends, that is, when the request ends.
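
The borrow, reuse and release cycle can be imitated without Swoole. The sketch below is an illustration under stated assumptions: SketchPool, PooledConnection and RequestScope are hypothetical classes, the context is a plain array standing in for the coroutine context, and end() stands in for the moment the coroutine finishes and its deferred callbacks run.

```php
<?php
declare(strict_types=1);

// Hypothetical plain-PHP stand-ins; no Swoole or Hyperf classes are involved.
final class SketchPool
{
    private array $idle = [];
    public int $created = 0;

    public function get(): PooledConnection
    {
        // Reuse an idle connection if there is one, otherwise create a new one.
        return array_pop($this->idle) ?? new PooledConnection($this, ++$this->created);
    }

    public function release(PooledConnection $connection): void
    {
        $this->idle[] = $connection;
    }

    public function idleCount(): int
    {
        return count($this->idle);
    }
}

final class PooledConnection
{
    public function __construct(private SketchPool $pool, public int $id) {}

    public function release(): void
    {
        $this->pool->release($this);
    }
}

final class RequestScope
{
    private array $context = [];   // stands in for the coroutine context
    private array $deferred = [];  // stands in for defer()

    public function __construct(private SketchPool $pool) {}

    public function connection(string $name = 'default'): PooledConnection
    {
        $key = "sketch.connection.{$name}";
        if (! isset($this->context[$key])) {
            $connection = $this->pool->get();
            $this->context[$key] = $connection;
            // Schedule the hand-back now, so it happens even if the caller forgets.
            $this->deferred[] = function () use ($connection, $key): void {
                unset($this->context[$key]);
                $connection->release();
            };
        }
        return $this->context[$key];
    }

    public function end(): void
    {
        // Run the deferred callbacks, last registered first.
        while ($callback = array_pop($this->deferred)) {
            $callback();
        }
    }
}

$pool = new SketchPool();
$request = new RequestScope($pool);
$first = $request->connection();
$second = $request->connection();      // same request: served from the context
$idleDuringRequest = $pool->idleCount();
$request->end();                       // "coroutine ends": connection goes back
$idleAfterRequest = $pool->idleCount();
```

Within one request the second call never touches the pool, which is also why a transaction started on the first call is visible to later queries in the same coroutine.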