ConFoo: Call for paper is now Open

The Cond class

(No version information available, might only be in SVN)

Wstęp

The static methods contained in the Cond class provide direct access to Posix Condition Variables.

Krótki opis klasy

Cond {
/* Metody */
final public static boolean broadcast ( long $condition )
final public static long create ( void )
final public static boolean destroy ( long $condition )
final public static boolean signal ( long $condition )
final public static boolean wait ( long $condition , long $mutex [, long $timeout ] )
}

Spis treści

add a note add a note

User Contributed Notes 1 note

up
2
jan
1 year ago
As Cond (nor Mutex) is good for queuing tasks, and arrays/objects stored a "serialized" (can't use it as excepted for signals outside the run()) here is a simple task queue trait:
<?php
define
('T',1000000);
trait
TaskQueue{
    private
$queue_len = 0;
    private
$queue_pointer = 0;
    private
$queue_curr = 0;
    protected function
queueInit(){$this->queue_len = 0; $this->queue_pointer = 0; $this->queue_curr = 0;}
    protected function
getNextQueued(){
        if (
$this->queue_len == 0) return null;
       
$p = $this->queue_curr;
       
$x = 0;
       
$ret = [];
        do{
           
$key = "queue_func_{$p}";
            if (
$this->$key){
               
$ret['func'] = $this->$key;
                unset(
$this->$key);
               
$key = "queue_params_{$p}";
               
$prs = $this->$key;
                unset(
$this->$key);
               
$ret['params'] = [];
                for (
$i = 0; $i < $prs; $i++){
                   
$key = "queue_param_{$p}_{$i}";
                   
$ret['params'][] =  $this->$key;
                    unset(
$this->$key);
                }
               
$this->queue_len--;
               
$this->queue_curr++;
                return
$ret;
            }
            else
$p++;
            if (
$x++ > 1000){   // the queue is corrupt
               
echo "\n\t\t task queue is corrupt for ".get_called_class()."\n";
               
$this->queue_len = 0;
               
$this->queue_pointer = 0;
                return
null;
            }
        }while(isset(
$this->$key));
        return
null;

    }
    protected function
addToQueue($func,$params = []){
        if (!
is_array($params) || !@method_exists($this,$func)) return false;
       
$p = $this->queue_pointer;
       
$key = "queue_func_{$p}";
       
$this->$key = $func;
       
$key = "queue_params_{$p}";
       
$this->$key = count($params);
       
$i = 0;
        foreach (
$params as $pa){
           
$key = "queue_param_{$p}_{$i}";
           
$this->$key = $pa;
           
$i++;
        }
       
$this->queue_pointer++;
       
$this->queue_len++;
        return
true;
    }
}

class
A extends Thread{
    use
TaskQueue;
    private
$sig_term;
    public function
__construct(){
       
$this->cond = Cond::create();
       
$this->sig_term = false;
       
$this->queueInit(); // threads don't like default parameters
       
$this->start();
    }

    public function
run(){
        while(!
$this->sig_term){
            while((
$qi = $this->getNextQueued())){
               
call_user_func_array([$this,$qi['func']],$qi['params']);
            }
            echo
"now doing MAIN -- ";
           
usleep(1*T);
            echo
"finished MAIN\n";
        }
       
Mutex::destroy($m);
    }

    private function
reallyDoA($a){
        echo
"doing A: {$a} -- ";
       
usleep(0.7*T);
        echo
"finished A\n";

    }
    public function
doA($param1){
       
$this->addToQueue('reallyDoA',[$param1]);
    }
    private function
reallyDoB($a,$b){
        echo
"doing B: {$a},{$b} -- ";
       
usleep(0.1*T);
        echo
"finished B\n";

    }
    public function
doB($param1, $param2){
       
$this->addToQueue('reallyDoB',[$param1,$param2]);
    }
    public function
term(){$this->sig_term = true;}
}

$a = new A();
usleep(1.2*T);
$a->doA(2);
$a->doA(3);
$a->doB("now b","something");
usleep(5.4*T);
$a->doA(5);
$a->doA(7);

usleep(3*T);
$a->term();
$a->join();
?>
very useful if you need syncronization from an outside event (ie: async socket read/write - your run() takes care of reading with timeout / signalig on_packet events, but writing may occur at any time (ie async heartbeeting) and fwrite($sock) while stream_select($sock) is waiting for timeout causes segfaults...)
To Top