最近因项目需要同步大量数据,数据量在 3000 万条左右,因此想到开启多进程来处理。下面是完整的处理代码,基于 Laravel 5.1 框架。
这段代码已在实际环境中验证过,类似场景只需简单修改即可使用。
/**
* ******数据同步脚本
*
* @author yedonghai
*/
namespace App\Console\Commands;
use DB;
use Illuminate\Console\Command;
use App\Services\ZzcService;
/**
 * Artisan command that pushes eligible users to the ZZC service using forked
 * worker processes.
 *
 * The user id range [0, $userNum) is cut into blocks of $block ids. Blocks are
 * handled in batches of at most $workers child processes; every batch is fully
 * reaped before the next one is started.
 */
class ZzcSyncCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'zzcsync:data';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'sync zzc apply info';

    /**
     * Execute the console command.
     *
     * Forks one worker per id block and waits for each batch of workers to
     * finish before forking the next batch.
     *
     * @return mixed 0 on success, 1 when the pcntl extension is unavailable.
     *               The process exits with status 1 if a fork fails.
     */
    public function handle()
    {
        if (!function_exists('pcntl_fork')) {
            $this->error('The pcntl extension is required to run zzcsync:data.');

            return 1;
        }

        $userNum = 6500000; // exclusive upper bound of the user ids to scan
        $workers = 30;      // maximum number of concurrent child processes
        $block = 50000;     // number of user ids handled by one child
        $batchSize = $workers * $block;

        // Drop any connection that may have been opened during bootstrap so
        // that every forked worker opens its own instead of sharing a socket.
        DB::disconnect();

        for ($batchStart = 0; $batchStart < $userNum; $batchStart += $batchSize) {
            // Pids of the workers started in this batch, stored as array keys.
            $children = [];

            for ($i = 0; $i < $workers; $i++) {
                $minUserId = $batchStart + $block * $i;
                if ($minUserId >= $userNum) {
                    break;
                }
                $maxUserId = $minUserId + $block;

                $pid = pcntl_fork();

                if ($pid === -1) {
                    echo "fork failed : {$i} \r\n";
                    // Do not leave the workers that were already started unreaped.
                    $this->_waitForChildren($children);
                    exit(1);
                }

                if ($pid === 0) {
                    // Child: process one block, then terminate so it never
                    // falls back into the parent's scheduling loop.
                    $this->_userReport($minUserId, $maxUserId);
                    exit(0);
                }

                $children[$pid] = true;
            }

            $this->_waitForChildren($children);
        }

        return 0;
    }

    /**
     * Block until every child process in the given set has terminated.
     *
     * Uses a blocking wait rather than polling with WNOHANG, so the parent
     * sleeps while workers run instead of spinning on the CPU.
     *
     * @param array $children Set of child pids, stored as array keys
     *
     * @return void
     */
    private function _waitForChildren(array $children)
    {
        while (count($children) > 0) {
            $status = 0;
            $pid = pcntl_waitpid(-1, $status);

            if ($pid === -1) {
                // The wait failed (for example no children are left); stop waiting.
                break;
            }

            if (!pcntl_wifexited($status) || pcntl_wexitstatus($status) !== 0) {
                echo "worker {$pid} exited abnormally \r\n";
            }

            unset($children[$pid]);
        }
    }

    /**
     * Worker body: submit every eligible user in one id range to ZzcService.
     *
     * Selects users with id in [$minUserId, $maxUserId) whose joined
     * user_credits row has audit_limit > 0, skips users that already have a
     * row in zzc_apply, and passes a JSON payload for each remaining user to
     * ZzcService::createNezha().
     *
     * @param integer $minUserId Inclusive lower bound of the id range
     * @param integer $maxUserId Exclusive upper bound of the id range
     *
     * @return void
     */
    private function _userReport($minUserId, $maxUserId)
    {
        $users = DB::table('users')
            ->leftJoin('user_credits', 'user_credits.user_id', '=', 'users.id')
            ->select('users.id', 'users.user_name as mobile', 'users.id_number as pid', 'users.truename as name')
            ->where('user_credits.audit_limit', '>', 0)
            ->where('users.id', '>=', $minUserId)
            ->where('users.id', '<', $maxUserId)
            ->get();

        foreach ($users as $userObj) {
            // first() yields null when no row matches; an existing row means
            // this user has already been handled.
            $existing = DB::table('zzc_apply')->where('user_id', $userObj->id)->first();
            if ($existing !== null) {
                continue;
            }

            $payload = [
                'loan_type' => '消费贷',
                'loan_term' => '3',
                'loan_purpose' => '购物',
                'applicant' => [
                    'name' => $userObj->name,
                    'pid' => $userObj->pid,
                    'mobile' => $userObj->mobile,
                ],
            ];

            $zzcService = new ZzcService();
            $zzcService->createNezha(json_encode($payload));
        }
    }
}