Skip to content

Latest commit

 

History

History
281 lines (263 loc) · 7.56 KB

[数据库四]批处理查询源码.md

File metadata and controls

281 lines (263 loc) · 7.56 KB

yii的批处理查询使用PDO的fetch实现,fetch就是一个游标,每次读出一行然后移动游标到下一位(fetchAll是一次读出所有数据到内存),PDO原生代码如下

<?php
  $batchSize = 4;
  $dsn = "mysql:host=192.168.0.10;dbname=test;port=3306";
  $options = [
    PDO::ATTR_DEFAULT_FETCH_MODE=>PDO::FETCH_ASSOC,
    PDO::ATTR_TIMEOUT=>1,
    PDO::ATTR_ERRMODE=>PDO::ERRMODE_EXCEPTION,
  ];
  try {
    $pdo = new PDO($dsn,"root","",$options);
    $pdo->quote("set name".$pdo->quote("utf8"));
    $pdo->setAttribute(PDO::ATTR_AUTOCOMMIT,1);
  } catch (Exception $e) {
    var_dump($e->getMessage());
  }
  $rawSql = "select * from a";
  $pdoStatement = $pdo->prepare($rawSql);
  try {
    $pdoStatement->execute();	
    $res = [];
    $count = 0;
    while($count++ < $this->batchSize && ($row = $pdoStatement->fetch())){
      $res[] = $row;
    }
    $pdoStatement->closeCursor();
  } catch (Exception $e) {
    var_dump($e);
    $message = $e->getMessage() . "\nThe SQL being executed was: $rawSql";
    var_dump($message);
  }

在yii中使用批处理控制器的代码如下

<?php

namespace app\controllers;

use Yii;
use PDO;
use yii\web\Controller;
use yii\db\Query;

class AController extends Controller
{
  public function actionTest(){
  $rows = new Query();
  $query  = $rows->from("a");
  foreach ($query->batch(2) as $item) {
      var_dump($item);
  }
  foreach ($query->each(2) as $item) {
      var_dump($item);
  }
}

追到代码里面,发现是通过查询构造器、BatchQueryResult类、DataReader类一起配合实现的
查询构造器里面实例化BatchQueryResult类代码如下

public function batch($batchSize = 100, $db = null)
{
    return Yii::createObject([
        'class' => BatchQueryResult::className(),
        'query' => $this,
        'batchSize' => $batchSize,
        'db' => $db,
        'each' => false,
    ]);
}
public function each($batchSize = 100, $db = null)
{
    return Yii::createObject([
        'class' => BatchQueryResult::className(),
        'query' => $this,
        'batchSize' => $batchSize,
        'db' => $db,
        'each' => true,
    ]);
}

BatchQueryResult类是没有自己的构造方法的,继承于BaseObj,没有什么可用的属性注入,他实现了接口Iterator,可见是一个迭代器

class BatchQueryResult extends BaseObject implements \Iterator

接口Iterator可以将类进行foreach操作,具体代码如下

class Obj implements Iterator{
	public $arr = [1,2,3];
	private $_key = 0;
	public function rewind(){
		var_dump(__METHOD__);
	    $this->_key = 0;
	}
	public function valid(){
		var_dump(__METHOD__);
	    return isset($this->arr[$this->_key]);
	}

	public function next(){
		var_dump(__METHOD__);
    	++$this->_key;
	}

	public function current(){
		var_dump(__METHOD__);
		return $this->arr[$this->_key];
	}

	public function key() {
		var_dump(__METHOD__);
		return $this->_key;
	}
}
$obj = new Obj();
foreach($obj as $key=>$item){
	var_dump($key."--->".$item);
}

BatchQueryResult类的遍历初始化代码如下,也就是foreach需要执行的第一个方法

public function reset()
{
    if ($this->_dataReader !== null) {
        //用于析构方法将游标关闭
        $this->_dataReader->close();
    }
    $this->_dataReader = null;
    $this->_batch = null;
    $this->_value = null;
    $this->_key = null;
}

//foreach需要执行的第一个方法
public function rewind()
{
    $this->reset();
    $this->next();
}

public function next()
{
    if ($this->_batch === null || !$this->each || $this->each && next($this->_batch) === false) {
        //实例化dataReader类
        $this->_batch = $this->fetchData();
        //指针放到头
        reset($this->_batch);
    }

    if ($this->each) {
        $this->_value = current($this->_batch);
        if ($this->query->indexBy !== null) {
            $this->_key = key($this->_batch);
        } elseif (key($this->_batch) !== null) {
            $this->_key = $this->_key === null ? 0 : $this->_key + 1;
        } else {
            $this->_key = null;
        }
    } else {
        $this->_value = $this->_batch;
        $this->_key = $this->_key === null ? 0 : $this->_key + 1;
    }
}

与Command类建立联系的代码如下

protected function fetchData()
{
    if ($this->_dataReader === null) {
        //可以理解为Yii::$app->get("db")->createCommand(sql)->query()
        $this->_dataReader = $this->query->createCommand($this->db)->query();
    }
    $rows = [];
    $count = 0;
    //游标遍历
    while ($count++ < $this->batchSize && ($row = $this->_dataReader->read())) {
        $rows[] = $row;
    }
    //处理indexBy
    return $this->query->populate($rows);
}

Command类与DataReader建立联系的代码如下(更详细的Command源码操作可以看以前的文章)
参数method是空,所以会直接实例化DataReade

protected function queryInternal($method, $fetchMode = null)
{
    list($profile, $rawSql) = $this->logQuery('yii\db\Command::query');
    if ($method !== '') {
        $info = $this->db->getQueryCacheInfo($this->queryCacheDuration, $this->queryCacheDependency);
        if (is_array($info)) {
            /* @var $cache \yii\caching\CacheInterface */
            $cache = $info[0];
            $rawSql = $rawSql ?: $this->getRawSql();
            $cacheKey = $this->getCacheKey($method, $fetchMode, $rawSql);
            $result = $cache->get($cacheKey);
            if (is_array($result) && isset($result[0])) {
                Yii::debug('Query result served from cache', 'yii\db\Command::query');
                return $result[0];
            }
        }
    }

    $this->prepare(true);

    try {
        $profile and Yii::beginProfile($rawSql, 'yii\db\Command::query');

        $this->internalExecute($rawSql);
        
        if ($method === '') {
            //这里就是建立联系的代码
            $result = new DataReader($this);
        } else {
            if ($fetchMode === null) {
                $fetchMode = $this->fetchMode;
            }
            $result = call_user_func_array([$this->pdoStatement, $method], (array) $fetchMode);
            $this->pdoStatement->closeCursor();
        }

        $profile and Yii::endProfile($rawSql, 'yii\db\Command::query');
    } catch (Exception $e) {
        $profile and Yii::endProfile($rawSql, 'yii\db\Command::query');
        throw $e;
    }

    if (isset($cache, $cacheKey, $info)) {
        $cache->set($cacheKey, [$result], $info[1], $info[2]);
        Yii::debug('Saved query result in cache', 'yii\db\Command::query');
    }

    return $result;
}

这里要简单说一下,如下代码的作用相同,只不过query会返回一个DataReader类,里面有更灵活的pdo操作,有兴趣的同学可以追到里面去看一下

Yii::$app->get("db")->createCommand("select * from a")->queryAll();
Yii::$app->get("db")->createCommand("select * from a")->query()->readAll();

游标的执行代码

public function read()
{
    return $this->_statement->fetch();
}

可见DataReader类获取了游标的数据后会放到rows属性里面

$rows = [];
$count = 0;
while ($count++ < $this->batchSize && ($row = $this->_dataReader->read())) {
    $rows[] = $row;
}

每一次foreach的内部遍历其实就是操作获取的游标数据

public function key()
{
    return $this->_key;
}

public function current()
{
    return $this->_value;
}

public function valid()
{
    return !empty($this->_batch);
}

最后BatchQueryResult类是有析构方法的

public function __destruct()
{
    // make sure cursor is closed
    $this->reset();
}

其实就是执行了pdo的closeCursor操作