Netkiller PHP 手札
PHP Language...
$Id: book.xml 642 2013-07-19 01:28:13Z netkiller $
版权 © 2008, 2009, 2010, 2011, 2012, 2013, 2014 Netkiller(Neo Chan). All rights reserved.
版权声明
转载请与作者联系,转载时请务必标明文章原始出处和作者信息及本声明。
|
|
$Date: 2013-07-19 09:28:13 +0800 (Fri, 19 Jul 2013) $
我的系列文档
5.14. pthreads
编译PHP时需要加入 --enable-maintainer-zts 选项才能安装pthreads
# pecl install pthread
配置文件
cat > /srv/php-5.5.7/etc/conf.d/pthreads.ini <<EOF extension=pthreads.so EOF
$ php -m |grep pthreads pthreads
3.14.1. Thread
<?php class test extends Thread { public $name = ''; public $runing = false; public function __construct($name) { $this->name = $name; $this->runing = true; } public function run() { $n = 0; while ($this->runing) { printf("name: %s %s\n",$this->name, $n); $n++; sleep(1); } } } $pool[] = new test('a'); $pool[] = new test('b'); $pool[] = new test('c'); foreach ($pool as $w) { $w->start(); }
线程池实现方法
$pool = array(); while($member = $row->fetch(PDO::FETCH_ASSOC)) { while ( true ){ if(count($pool) < 2000){ //定义线程池数量,小于线程池数量则开启新的线程直到小于2000为止 $pool[$id] = new Update($member); $pool[$id]->start(); break; }else{ foreach ( $pool as $name => $worker){ //如果线程已经运行结束,销毁线程,给新的任务使用 if(! $worker->isRunning()){ unset($pool[$name]); } } } } }
<?php class ExampleWork extends Stackable { public function __construct($data) { $this->local = $data; } public function run() { // print_r($this->local);echo "\r\n"; echo '------------------- '. $this->local . " -----------------\r\n"; sleep(1); } } class ExampleWorker extends Worker { public function __construct($name) { $this->name = $name; $this->data = array(); } public function run(){ $this->name = sprintf("(%lu)", $this->getThreadId()); } } /* Dead simple pthreads pool */ class Pool { /* to hold worker threads */ public $workers; /* to hold exit statuses */ public $status; /* prepare $size workers */ public function __construct($size = 10) { $this->size = $size; } /* submit Stackable to Worker */ public function submit(Stackable $stackable) { if (count($this->workers)<$this->size) { $id = count($this->workers); $this->workers[$id] = new ExampleWorker(sprintf("Worker [%d]", $id)); $this->workers[$id]->start(PTHREADS_INHERIT_NONE); if ($this->workers[$id]->stack($stackable)) { return $stackable; } else trigger_error(sprintf("failed to push Stackable onto %s", $this->workers[$id]->getName()), E_USER_WARNING); }else{ for ($i=0;$i<count($this->workers);$i++){ if( ! $this->workers[$i]->isWorking()){ $this->workers[$i]->stack($stackable); return $stackable; } } } return false; } public function status(){ for ($i=0;$i<count($this->workers);$i++){ printf("(%s:%s)\r\n",$i, $this->workers[$i]->isWorking()); } printf("\r\n"); } /* Shutdown the pool of threads cleanly, retaining exit status locally */ public function shutdown() { foreach($this->workers as $worker) { $this->status[$worker->getThreadId()]=$worker->shutdown(); } } } /* Create a pool of ten threads */ $pool = new Pool(100); /* Create and submit an array of Stackables */ $work = array(); for ($target = 0; $target < 1000; $target++){ $work[$target]=$pool->submit(new ExampleWork($target)); if($work[$target] == false){ $target--; sleep(1); continue; } for ($i=0;$i<count($work);$i++){ if($work[$i]->isRunning()){ printf("cell: %s, status: %s\r\n",$i, $work[$i]->isRunning()); } } printf("\r\n"); } $pool->shutdown();