yii的批处理查询使用PDO的fetch实现,fetch就是一个游标,每次读出一行然后移动游标到下一位(fetchAll是一次读出所有数据到内存),PDO原生代码如下
<?php
$batchSize = 4;
$dsn = "mysql:host=192.168.0.10;dbname=test;port=3306";
$options = [
PDO::ATTR_DEFAULT_FETCH_MODE=>PDO::FETCH_ASSOC,
PDO::ATTR_TIMEOUT=>1,
PDO::ATTR_ERRMODE=>PDO::ERRMODE_EXCEPTION,
];
try {
$pdo = new PDO($dsn,"root","",$options);
$pdo->quote("set name".$pdo->quote("utf8"));
$pdo->setAttribute(PDO::ATTR_AUTOCOMMIT,1);
} catch (Exception $e) {
var_dump($e->getMessage());
}
$rawSql = "select * from a";
$pdoStatement = $pdo->prepare($rawSql);
try {
$pdoStatement->execute();
$res = [];
$count = 0;
while($count++ < $this->batchSize && ($row = $pdoStatement->fetch())){
$res[] = $row;
}
$pdoStatement->closeCursor();
} catch (Exception $e) {
var_dump($e);
$message = $e->getMessage() . "\nThe SQL being executed was: $rawSql";
var_dump($message);
}
在yii中使用批处理控制器的代码如下
<?php
namespace app\controllers;
use Yii;
use PDO;
use yii\web\Controller;
use yii\db\Query;
class AController extends Controller
{
public function actionTest(){
$rows = new Query();
$query = $rows->from("a");
foreach ($query->batch(2) as $item) {
var_dump($item);
}
foreach ($query->each(2) as $item) {
var_dump($item);
}
}
追到代码里面,发现是通过查询构造器、BatchQueryResult类、DataReader类一起配合实现的
查询构造器里面实例化BatchQueryResult类代码如下
public function batch($batchSize = 100, $db = null)
{
return Yii::createObject([
'class' => BatchQueryResult::className(),
'query' => $this,
'batchSize' => $batchSize,
'db' => $db,
'each' => false,
]);
}
public function each($batchSize = 100, $db = null)
{
return Yii::createObject([
'class' => BatchQueryResult::className(),
'query' => $this,
'batchSize' => $batchSize,
'db' => $db,
'each' => true,
]);
}
BatchQueryResult类是没有自己的构造方法的,继承于BaseObj,没有什么可用的属性注入,他实现了接口Iterator,可见是一个迭代器
class BatchQueryResult extends BaseObject implements \Iterator
接口Iterator可以将类进行foreach操作,具体代码如下
class Obj implements Iterator{
public $arr = [1,2,3];
private $_key = 0;
public function rewind(){
var_dump(__METHOD__);
$this->_key = 0;
}
public function valid(){
var_dump(__METHOD__);
return isset($this->arr[$this->_key]);
}
public function next(){
var_dump(__METHOD__);
++$this->_key;
}
public function current(){
var_dump(__METHOD__);
return $this->arr[$this->_key];
}
public function key() {
var_dump(__METHOD__);
return $this->_key;
}
}
$obj = new Obj();
foreach($obj as $key=>$item){
var_dump($key."--->".$item);
}
BatchQueryResult类的遍历初始化代码如下,也就是foreach需要执行的第一个方法
public function reset()
{
if ($this->_dataReader !== null) {
//用于析构方法将游标关闭
$this->_dataReader->close();
}
$this->_dataReader = null;
$this->_batch = null;
$this->_value = null;
$this->_key = null;
}
//foreach需要执行的第一个方法
public function rewind()
{
$this->reset();
$this->next();
}
public function next()
{
if ($this->_batch === null || !$this->each || $this->each && next($this->_batch) === false) {
//实例化dataReader类
$this->_batch = $this->fetchData();
//指针放到头
reset($this->_batch);
}
if ($this->each) {
$this->_value = current($this->_batch);
if ($this->query->indexBy !== null) {
$this->_key = key($this->_batch);
} elseif (key($this->_batch) !== null) {
$this->_key = $this->_key === null ? 0 : $this->_key + 1;
} else {
$this->_key = null;
}
} else {
$this->_value = $this->_batch;
$this->_key = $this->_key === null ? 0 : $this->_key + 1;
}
}
与Command类建立联系的代码如下
protected function fetchData()
{
if ($this->_dataReader === null) {
//可以理解为Yii::$app->get("db")->createCommand(sql)->query()
$this->_dataReader = $this->query->createCommand($this->db)->query();
}
$rows = [];
$count = 0;
//游标遍历
while ($count++ < $this->batchSize && ($row = $this->_dataReader->read())) {
$rows[] = $row;
}
//处理indexBy
return $this->query->populate($rows);
}
Command类与DataReader建立联系的代码如下(更详细的Command源码操作可以看以前的文章)
参数method是空,所以会直接实例化DataReade
protected function queryInternal($method, $fetchMode = null)
{
list($profile, $rawSql) = $this->logQuery('yii\db\Command::query');
if ($method !== '') {
$info = $this->db->getQueryCacheInfo($this->queryCacheDuration, $this->queryCacheDependency);
if (is_array($info)) {
/* @var $cache \yii\caching\CacheInterface */
$cache = $info[0];
$rawSql = $rawSql ?: $this->getRawSql();
$cacheKey = $this->getCacheKey($method, $fetchMode, $rawSql);
$result = $cache->get($cacheKey);
if (is_array($result) && isset($result[0])) {
Yii::debug('Query result served from cache', 'yii\db\Command::query');
return $result[0];
}
}
}
$this->prepare(true);
try {
$profile and Yii::beginProfile($rawSql, 'yii\db\Command::query');
$this->internalExecute($rawSql);
if ($method === '') {
//这里就是建立联系的代码
$result = new DataReader($this);
} else {
if ($fetchMode === null) {
$fetchMode = $this->fetchMode;
}
$result = call_user_func_array([$this->pdoStatement, $method], (array) $fetchMode);
$this->pdoStatement->closeCursor();
}
$profile and Yii::endProfile($rawSql, 'yii\db\Command::query');
} catch (Exception $e) {
$profile and Yii::endProfile($rawSql, 'yii\db\Command::query');
throw $e;
}
if (isset($cache, $cacheKey, $info)) {
$cache->set($cacheKey, [$result], $info[1], $info[2]);
Yii::debug('Saved query result in cache', 'yii\db\Command::query');
}
return $result;
}
这里要简单说一下,如下代码的作用相同,只不过query会返回一个DataReader类,里面有更灵活的pdo操作,有兴趣的同学可以追到里面去看一下
Yii::$app->get("db")->createCommand("select * from a")->queryAll();
Yii::$app->get("db")->createCommand("select * from a")->query()->readAll();
游标的执行代码
public function read()
{
return $this->_statement->fetch();
}
可见DataReader类获取了游标的数据后会放到rows属性里面
$rows = [];
$count = 0;
while ($count++ < $this->batchSize && ($row = $this->_dataReader->read())) {
$rows[] = $row;
}
每一次foreach的内部遍历其实就是操作获取的游标数据
public function key()
{
return $this->_key;
}
public function current()
{
return $this->_value;
}
public function valid()
{
return !empty($this->_batch);
}
最后BatchQueryResult类是有析构方法的
public function __destruct()
{
// make sure cursor is closed
$this->reset();
}
其实就是执行了pdo的closeCursor操作