项目背景
数据来源:所有数据均为外部导入,最大数据量在10w+
输出数据:导出经过业务处理之后的数据
使用框架:fastadmin
涉及的问题:
1、数据读取
2、数据保存
使用数据:10w+
解决方案:
方案一:直接利用框架提供的功能导入Excel数据
结果:一分钟之后超时,最终执行完成时间在3分钟左右
分析:其中数据读取和数据保存(使用模型批量保存拆分为100,1000,10000)都十分耗时,而且在超时之后,系统其它功能无法响应。
可行性:不可行
方案二:将Excel数据换成csv格式
结果:一分钟之后还是超时,最终执行时间大约90s左右。
分析:读取csv格式的时候需要转码(这里很耗时,取消之后很快但是数据乱码了)。数据保存(使用模型批量保存拆分为100,1000,10000)部分依然很耗时。
可行性:不可行
方案三:取消转码,使用MySQL原生方式保存数据
结果:偶尔会成功(这里是单条数据方式插入),大约执行时间在60s左右。
分析:MySQL在执行sql语句的时候多次解析语句导致消耗很多时间,可以将语句合并为一条语句执行。
可行性:勉强可以
方案四:由于方案三可以知道多条语句执行比较耗时,修改为合并执行
结果:成功,大约耗时10-20s
分析:MySQL的语句是有大小限制,避免后续数据量大的时候出现错误。这里将数据拆分多次执行(10000,20000,50000 耗时差距在1s左右,基本可以看做没区别),最后将合并之后的sql语句在进行转码,最终耗时也在10-20s之间。
可行性:可以。
数据处理基础代码如下(网上代码修改,如有侵权,联系必删)
<?php namespace app\common\library;
use think\Db; class CsvReader {
private $csv_file;
private $spl_object = null;
private $error; public function __construct($csv_file = '') {
if($csv_file && file_exists($csv_file)) {
$this->csv_file = $csv_file;
}
} public function set_csv_file($csv_file) {
if(!$csv_file || !file_exists($csv_file)) {
$this->error = 'File invalid';
return false;
}
$this->csv_file = $csv_file;
$this->spl_object = null;
} public function get_csv_file() {
return $this->csv_file;
} private function _file_valid($file = '') {
$file = $file ? $file : $this->csv_file;
if(!$file || !file_exists($file)) {
return false;
}
if(!is_readable($file)) {
return false;
}
return true;
} private function _open_file() {
if(!$this->_file_valid()) {
$this->error = 'File invalid';
return false;
}
if($this->spl_object == null) {
$this->spl_object = new \SplFileObject($this->csv_file, 'rb');
}
return true;
} /**
* 读取时候直接转码
* @* @param integrity $start 数据长度
* @* @param integrity $length 数据长度
* @* @param Callback $call 需要处理业务回调方法
* @* @param Array $para 表头名称与数据库表字段对应关系
*/
public function get_data($start = 0, $length = 0, $call=null, $para=null) {
if(!$this->_open_file()) {
return false;
}
$length = $length ? $length : $this->get_lines();
$start = $start - 1;
$start = ($start < 0) ? 0 : $start; $data = [];
$this->spl_object->seek($start);
$rowindex=[];
$tpara = [];
if (!empty($para)) {
$thead = (array)$this->spl_object->fgetcsv();
$index = 0;
foreach ($thead as $key => $val) {
$encoding = mb_detect_encoding($val, ['utf-8', 'gbk', 'latin1', 'big5']);
if ($encoding != 'utf-8') {
$val = mb_convert_encoding($val, 'utf-8', $encoding);
}
if (isset($para[$val])) {
$rowindex[] = $index;
$tpara[$index] = $para[$val];
} $index++;
}
$this->spl_object->next();
} while ($length-- && !$this->spl_object->eof()) {
$tdata =(array)$this->spl_object->fgetcsv();
$db = []; foreach ($rowindex as $index) {
if(!isset($tdata[$index]))continue; $value = $tdata[$index];
$value = is_null($value) ? '' : trim($value);
$encoding = mb_detect_encoding($value, ['utf-8', 'gbk', 'latin1', 'big5']);
if ($encoding != 'utf-8') {
$db[$tpara[$index]] = mb_convert_encoding($value, 'utf-8', $encoding);
}
else{
$db[$tpara[$index]] = $value;
}
} if ($call) {
$t=call_user_func_array( $call ,[$db]); if ($t) {
$data[] = $t;
}
}else {
$data[]=$db;
}
$this->spl_object->next();
}
return $data;
} /**
* 读取时候不转码,并且保存到数据库
* @* @param Int $start 数据长度
* @* @param Int $length 数据长度
* @* @param Callback $call 需要处理业务回调方法
* @* @param Array $para 表头名称与数据库表字段对应关系
* @* @param String $table 需要出入的表名
*/
public function get_data1($length = 0, $start = 0,$call=null,$para=null,$table=null) {
if(!$this->_open_file()) {
return false;
}
$length = $length ? $length : $this->get_lines();
$start = $start - 1;
$start = ($start < 0) ? 0 : $start; $data = [];
$this->spl_object->seek($start);
$rowindex=[]; $tpara = []; //表头索引与数据库字段对应关系
if (!empty($para)) {
$thead = (array)$this->spl_object->fgetcsv();
// var_dump($thead);
$index = 0;
foreach ($thead as $key => $val) {
$encoding = mb_detect_encoding($val, ['utf-8', 'gbk', 'latin1', 'big5']);
if ($encoding != 'utf-8') {
$val = mb_convert_encoding($val, 'utf-8', $encoding);
}
if (isset($para[$val])) {
$rowindex[] = $index;
$tpara[$index] = $para[$val];
} $index++;
}
$this->spl_object->next();
} while ($length-- && !$this->spl_object->eof()) {
$tdata =(array)$this->spl_object->fgetcsv();
$db = []; foreach ($rowindex as $idx) {
if(!isset($tdata[$idx]))continue; $value = $tdata[$idx];
$value = is_null($value) ? '' : trim($value);
$db[$tpara[$idx]] = $value;
} // 如需要处理业务,数字可以,其他文字不行(未转码)
if ($call) {
$t=call_user_func_array( $call ,[$db]); if ($t) {
$data[] = $t;
}
}else {
$data[]=$db;
}
$this->spl_object->next(); if (count($data)==10000) {
$this->insertsdb($table,$para,$data);
$data=[];
}
} if (count($data)>0) {
$this->insertsdb($table,$para,$data);
}
}
/**
* @* @param String $table 表名
* @* @param Array $para 要插入表的字段
* @* @param Array $data 要插入表的数据
*/
public function insertsdb($table,$para,$data)
{
$sql = 'insert into '.$table.' (';
$p = implode(',',$para);
$sql .= $p.') values'; $tmpval='';
foreach ($data as $key => $value) {
$v = implode('","',array_values($value)); $tmpval .='("'.$v.'"),';
} $sql .= rtrim($tmpval,','); // 利用事务也可以减少执行时间
Db::startTrans();
try{
$encoding = mb_detect_encoding($sql, ['utf-8', 'gbk', 'latin1', 'big5']);
$sql = mb_convert_encoding($sql, 'utf-8', $encoding);
$sql = str_replace("\\","",$sql);
Db::execute($sql);
// 提交事务
Db::commit();
} catch (\Exception $e) {
// 回滚事务
Db::rollback(); } }
public function get_lines() {
if(!$this->_open_file()) {
return false;
}
$this->spl_object->seek(filesize($this->csv_file));
return $this->spl_object->key();
} public function get_error() {
return $this->error;
}
}
调用代码如下
public function import()
{
$file = $this->request->request('file');
if (!$file) {
$this->error(__('Parameter %s can not be empty', 'file'));
}
$filePath = ROOT_PATH . DS . 'public' . DS . $file;
if (!is_file($filePath)) {
$this->error(__('No results were found'));
}
$csv = new CsvReader($filePath);
// 此处公司业务不便公布(大约有20个参数),根据自己业务自行调整
$para =[
'单位详细名称'=>'company_name',
'省份'=>'province',
'地市'=>'city',
'区县'=>'country',
'是否关闭'=>'is_shutdown',
'公司Id'=>'company_id'
];
$data = $csv->get_data1(0,0,[$this,'handdbCallBack'],$para,'fa_bs_pollutant_discharge_info');
if($data)
{
$this->error('失败');
}
$this->success();
} /**
* 业务处理回调
*/
public function handdbCallBack($row)
{ if ($row) { // 当前编码是CP936
// 这里可以使用 $encoding = mb_detect_encoding($str, ['utf-8', 'gbk', 'latin1', 'big5'])
// 获取当前数据编码
$encoding='CP936';
$test = mb_convert_encoding('是', $encoding, 'utf-8');
if (isset($row['is_shutdown'])) {
$row['is_shutdown']=$test==$row['is_shutdown']?1:0;
}
// 此处为雪花ID生成,自行百度
$row['company_id'] = $this->snow->generateID(); return $row;
}
return null;
}
关于导出部分,可以直接导出csv格式,速度会很快。这里就不上代码了,自行百度。
以上是所有内容,以此记录一下,便于以后查阅。如有问题或者更好的解决方案欢迎各位指出。谢谢!!!