CakeFest 2017 NYC, the Official CakePHP Conference

Pool::collect

(PECL pthreads >= 2.0.0)

Pool::collect — Collect references to completed tasks

Descrizione

public void Pool::collect ( Callable $collector )

Allows the Pool to collect references determined to be garbage by the given collector

Elenco dei parametri

collector

A Callable collector

Valori restituiti

void

Esempi

Example #1 Creating Pools

<?php
/**
 * Task submitted to the Pool. It tracks whether run() has finished so that a
 * collector passed to Pool::collect() can ask it via isComplete().
 */
class MyWork extends Stackable {
    /** @var bool False after construction; set to true at the end of run(). */
    protected $complete;

    public function __construct() {
        $this->complete = false;
    }

    /**
     * Prints the class name together with the thread id reported by
     * $this->worker, then flags the task as complete.
     */
    public function run() {
        printf(
            "Hello from %s in Thread #%lu\n",
            __CLASS__, $this->worker->getThreadId());
        $this->complete = true;
    }

    /**
     * @return bool The current value of the completion flag.
     */
    public function isComplete() {
        return $this->complete;
    }
}

/**
 * Worker class named when the Pool is created. It stores the Something it is
 * constructed with on $this->something.
 */
class MyWorker extends Worker {

    /**
     * @param Something $something Object kept on the worker.
     *                             NOTE(review): Something is not declared in
     *                             this example; presumably defined elsewhere.
     */
    public function __construct(Something $something) {
        $this->something = $something;
    }

    /**
     * Body intentionally elided in this example.
     */
    public function run() {
        /** ... **/
    }
}

// Pool of size 8 whose workers are MyWorker instances built with a Something.
$pool = new Pool(8, \MyWorker::class, [new Something()]);
$pool->submit(new MyWork());

// Short pause before collecting, giving the submitted task time to run.
usleep(1000);

// The collector reports a task as garbage once the task says it is complete.
$collector = function($task) {
    return $task->isComplete();
};
$pool->collect($collector);

var_dump($pool);
?>

Il precedente esempio visualizzerà:

Hello from MyWork in Thread #140222468777728
object(Pool)#1 (6) {
  ["size":protected]=>
  int(8)
  ["class":protected]=>
  string(8) "MyWorker"
  ["workers":protected]=>
  array(1) {
    [0]=>
    object(MyWorker)#4 (1) {
      ["something"]=>
      object(Something)#5 (0) {
      }
    }
  }
  ["work":protected]=>
  array(0) {
  }
  ["ctor":protected]=>
  array(1) {
    [0]=>
    object(Something)#2 (0) {
    }
  }
  ["last":protected]=>
  int(1)
}

add a note

User Contributed Notes 2 notes

up
5
your dot brother dot t at hotmail dot com
2 years ago
The example code crashes and made me waste 2 working days
First of all, `Stackable` has no attribute named `$worker`, or its access modifier makes it inaccessible.

Secondly, `Stackable` also doesn't have `getThreadId()`. It's best practice to use the `Thread` class to implement a thread, since it has more control functions. It's better to use `Stackable` for object storage and to use its `run()` for initialization.

The working example is

<?php
   
/**
 * Thread that prints a greeting with its own thread id and flags itself as
 * complete once run() has finished.
 */
class MyWork extends Thread
{
    /** @var bool False after construction; true once run() has finished. */
    protected $complete;

    public function __construct()
    {
        $this->complete = false;
    }

    /**
     * Emits the greeting line, then sets the completion flag.
     */
    public function run()
    {
        $threadId = $this->getThreadId();
        echo sprintf("Hello from %s in Thread #%lu\n", __CLASS__, $threadId);

        $this->complete = true;
    }

    /**
     * @return bool The current value of the completion flag.
     */
    public function isComplete()
    {
        return $this->complete;
    }
}

    class
Something {}

    class
MyWorker extends Worker {

        public function
__construct(Something $something) {
           
$this->something = $something;
        }

        public function
run() {
           
/** ... **/
       
}
    }

   
// Pool of size 8 whose workers are MyWorker instances built with a Something.
$pool = new Pool(8, \MyWorker::class, [new Something()]);
$pool->submit(new MyWork());

// Short pause before collecting, giving the submitted task time to run.
usleep(1000);

// The collector reports a task as garbage once the task says it is complete.
$collector = function($task) {
    return $task->isComplete();
};
$pool->collect($collector);

var_dump($pool);
?>
up
2
meadowsjared at gmail dot com
11 months ago
Please note, when using the collect function, it's important that you extend the pool class so you can keep checking for finished threads until they're all done.

<?php
/**
 * Simulated long-running job: keeps the data it is given, sleeps inside
 * run(), and then flags itself as complete.
 */
class TestWork extends Threaded
{
    /** @var bool False after construction; true once run() has finished. */
    protected $complete;

    /**
     * @param mixed $pData Data handed to the job so it can do its work.
     */
    public function __construct($pData)
    {
        // Store the input on the object and start out as "not complete".
        $this->testData = $pData;
        $this->complete = false;
    }

    /**
     * The job itself: all of the work happens here.
     */
    public function run()
    {
        // Sleep for 2 seconds to stand in for a large job.
        usleep(2000000);

        $this->complete = true;
    }

    /**
     * @return bool The completion flag; true means the task may be collected.
     */
    public function isGarbage()
    {
        return $this->complete;
    }
}
/**
 * Pool that keeps collecting until no work is left, gathering one result
 * object per finished task along the way.
 */
class ExamplePool extends Pool
{
    /** @var array Result objects; one is appended for every finished task. */
    public $data = array();

    /**
     * Collects finished tasks until the pool holds no more work, shuts the
     * pool down and returns the gathered results.
     *
     * @return array List of stdClass objects, each carrying a `complete` flag.
     */
    public function process()
    {
        // Keep collecting for as long as the pool still holds work.
        while (count($this->work)) {
            $this->collect(function (TestWork $task) {
                // Read the flag exactly once. The task runs on another thread,
                // so asking twice could see "not done" for the bookkeeping and
                // "done" for the return value, dropping the task's result.
                // Going through the public accessor also avoids touching
                // TestWork's protected property from this class.
                $done = $task->isGarbage();

                if ($done) {
                    // This is how completed data gets back to the caller of process().
                    $result = new stdClass();
                    $result->complete = $done;
                    $this->data[] = $result;
                }

                return $done;
            });
        }

        // All jobs are done, so the pool can be shut down.
        $this->shutdown();

        return $this->data;
    }
}
// Pool of size 3 that receives five jobs, all sharing the same input data.
$pool = new ExamplePool(3);
$testData = 'asdf';

for ($submitted = 0; $submitted < 5; ++$submitted) {
    $pool->submit(new TestWork($testData));
}

// Blocks until every job has been collected, then yields all of the results.
$retArr = $pool->process();

// Dump the array of results (and maybe errors) inside a <pre> block.
echo '<pre>', print_r($retArr, true), '</pre>';
?>
To Top