As neither Cond nor Mutex is good for queuing tasks, and arrays/objects are stored "serialized" (so they can't be used as expected for signals from outside run()), here is a simple task queue trait:
<?php
// Microseconds per second: the usleep() calls in this file scale their delays by T.
const T = 1000000;
/**
 * Minimal FIFO task queue stored as flat, numbered properties on the host object.
 *
 * Each queued task occupies one slot `n`, made of dynamically created properties:
 *   - queue_func_{n}       name of the method to invoke
 *   - queue_params_{n}     how many parameters were stored for the task
 *   - queue_param_{n}_{i}  the i-th parameter value
 *
 * Three counters track the queue: queue_len (tasks waiting), queue_pointer
 * (next slot to write) and queue_curr (oldest slot that may still hold a task).
 *
 * NOTE(review): the counters are updated with plain ++/-- and no locking is
 * visible in this trait; confirm that is sufficient when tasks are added from
 * one thread and consumed from another.
 */
trait TaskQueue
{
    /** Number of tasks currently waiting to be fetched. */
    private $queue_len = 0;

    /** Index of the slot the next added task will be written to. */
    private $queue_pointer = 0;

    /** Index of the oldest slot that may still contain a waiting task. */
    private $queue_curr = 0;

    /**
     * Resets all three queue counters to zero.
     *
     * Slot properties that are still set are not removed by this call.
     *
     * @return void
     */
    protected function queueInit()
    {
        $this->queue_len = 0;
        $this->queue_pointer = 0;
        $this->queue_curr = 0;
    }

    /**
     * Removes and returns the oldest waiting task.
     *
     * Slots are scanned from queue_curr up to (not including) queue_pointer, so
     * an unexpectedly empty slot is skipped instead of blocking every task
     * behind it. If more than 1000 consecutive empty slots are met, the queue
     * is reported as corrupt on stdout and all counters are reset.
     *
     * @return array|null ['func' => string, 'params' => array], or null when no
     *                    task is available
     */
    protected function getNextQueued()
    {
        if ($this->queue_len == 0) {
            return null;
        }

        // Only slots below the write pointer have been completely written.
        $end = $this->queue_pointer;
        $misses = 0;

        for ($slot = $this->queue_curr; $slot < $end; $slot++) {
            $funcKey = "queue_func_{$slot}";

            if (!isset($this->$funcKey)) {
                if (++$misses > 1000) { // the queue is corrupt
                    echo "\n\t\t task queue is corrupt for ".get_called_class()."\n";
                    // Reset every counter, including queue_curr; otherwise tasks
                    // added after the reset would sit below the read position.
                    $this->queueInit();
                    return null;
                }
                continue;
            }

            $task = ['func' => $this->$funcKey, 'params' => []];
            unset($this->$funcKey);

            $countKey = "queue_params_{$slot}";
            $count = isset($this->$countKey) ? $this->$countKey : 0;
            unset($this->$countKey);

            for ($i = 0; $i < $count; $i++) {
                $paramKey = "queue_param_{$slot}_{$i}";
                // A parameter stored as null reads back as null either way.
                $task['params'][] = isset($this->$paramKey) ? $this->$paramKey : null;
                unset($this->$paramKey);
            }

            $this->queue_len--;
            // Continue after the slot actually consumed, which may lie beyond
            // the previous read position if empty slots were skipped.
            $this->queue_curr = $slot + 1;

            return $task;
        }

        return null;
    }

    /**
     * Appends a task to the queue.
     *
     * @param string $func   name of a method that exists on the host object
     * @param array  $params positional arguments for that method; array keys
     *                       are ignored, only the value order is kept
     *
     * @return bool true when the task was queued; false when $params is not an
     *              array, or $func is not a string naming an existing method
     */
    protected function addToQueue($func, $params = [])
    {
        // is_string() first: method_exists() throws a TypeError for non-string
        // names on PHP 8, which the error-suppression operator cannot silence.
        if (!is_array($params) || !is_string($func) || !method_exists($this, $func)) {
            return false;
        }

        $slot = $this->queue_pointer;

        $i = 0;
        foreach ($params as $value) {
            $paramKey = "queue_param_{$slot}_{$i}";
            $this->$paramKey = $value;
            $i++;
        }

        $countKey = "queue_params_{$slot}";
        $this->$countKey = $i;

        // The function name is written last: getNextQueued() treats a slot as
        // occupied as soon as this property is set.
        $funcKey = "queue_func_{$slot}";
        $this->$funcKey = $func;

        $this->queue_pointer++;
        $this->queue_len++;

        return true;
    }
}
/**
 * Example worker thread that drains a TaskQueue between rounds of "main" work.
 *
 * The public doA()/doB() methods only enqueue a task; the private
 * reallyDoA()/reallyDoB() methods are what run() later invokes for each
 * queued entry. The thread starts itself from the constructor and keeps
 * looping until term() has been called.
 */
class A extends Thread
{
    use TaskQueue;

    /**
     * Condition variable handle created in the constructor and destroyed when
     * run() finishes; nothing in this class waits on or signals it.
     * Declared public because it was previously an undeclared (and therefore
     * publicly reachable) property.
     */
    public $cond;

    /** Set to true by term() to make run() leave its loop. */
    private $sig_term;

    /**
     * Initialises the worker state and immediately starts the thread.
     */
    public function __construct()
    {
        $this->cond = Cond::create();
        $this->sig_term = false;
        $this->queueInit(); // threads don't like default parameters
        $this->start();
    }

    /**
     * Thread body: run every queued task, then one round of main work, and
     * repeat until termination was requested.
     *
     * Tasks queued during the final round of main work are not executed,
     * because the termination flag is checked before the queue is drained again.
     *
     * @return void
     */
    public function run()
    {
        while (!$this->sig_term) {
            while (($task = $this->getNextQueued())) {
                call_user_func_array([$this, $task['func']], $task['params']);
            }
            echo "now doing MAIN -- ";
            usleep(1 * T);
            echo "finished MAIN\n";
        }

        // Release the condition variable created in the constructor. The
        // previous code called Mutex::destroy() on an undefined variable,
        // which released nothing.
        Cond::destroy($this->cond);
    }

    /**
     * Task body for doA(); prints progress around a 0.7 s pause.
     *
     * @param mixed $a value echoed in the progress line
     *
     * @return void
     */
    private function reallyDoA($a)
    {
        echo "doing A: {$a} -- ";
        // usleep() expects an integer number of microseconds.
        usleep((int) round(0.7 * T));
        echo "finished A\n";
    }

    /**
     * Queues an "A" task for the worker loop.
     *
     * @param mixed $param1 forwarded to reallyDoA()
     *
     * @return void
     */
    public function doA($param1)
    {
        $this->addToQueue('reallyDoA', [$param1]);
    }

    /**
     * Task body for doB(); prints progress around a 0.1 s pause.
     *
     * @param mixed $a first value echoed in the progress line
     * @param mixed $b second value echoed in the progress line
     *
     * @return void
     */
    private function reallyDoB($a, $b)
    {
        echo "doing B: {$a},{$b} -- ";
        usleep((int) round(0.1 * T));
        echo "finished B\n";
    }

    /**
     * Queues a "B" task for the worker loop.
     *
     * @param mixed $param1 forwarded to reallyDoB() as its first argument
     * @param mixed $param2 forwarded to reallyDoB() as its second argument
     *
     * @return void
     */
    public function doB($param1, $param2)
    {
        $this->addToQueue('reallyDoB', [$param1, $param2]);
    }

    /**
     * Requests termination; run() exits once its current iteration completes.
     *
     * @return void
     */
    public function term()
    {
        $this->sig_term = true;
    }
}
// Demo driver: start the worker, feed it two batches of tasks with pauses in
// between, then ask it to stop and wait for the thread to finish.
$worker = new A();

usleep(1.2*T);
foreach ([2, 3] as $arg) {
    $worker->doA($arg);
}
$worker->doB("now b", "something");

usleep(5.4*T);
foreach ([5, 7] as $arg) {
    $worker->doA($arg);
}

usleep(3*T);
$worker->term();
$worker->join();
?>
Very useful if you need synchronization from an outside event (e.g. async socket read/write: your run() takes care of reading with a timeout and signaling on_packet events, but writing may occur at any time (e.g. async heartbeating), and calling fwrite($sock) while stream_select($sock) is waiting for its timeout causes segfaults...)
The Cond class
(No version information available, might only be in SVN)
Einführung
The static methods contained in the Cond class provide direct access to Posix Condition Variables.
Klassenbeschreibung
Cond
{
/* Methoden */
}
Inhaltsverzeichnis
- Cond::broadcast — Broadcast a Condition
- Cond::create — Create a Condition
- Cond::destroy — Destroy a Condition
- Cond::signal — Signal a Condition
- Cond::wait — Wait for Condition
jan ¶
24 days ago
