update page now
Laravel Live Japan

Класс Pool

(PECL pthreads >= 2.0.0)

Введение

Объект Pool является контейнером для хранения объектов Worker, управления ими и регулирования их количества.

Контейнеризация представляет собой высший уровень абстракции над функционалом Worker включая управление ссылками в корректном для pthreads виде.

Обзор класса

class Pool {
/* Свойства */
protected $size;
protected $class;
protected $workers;
protected $ctor;
protected $last;
/* Методы */
public __construct(int $size, string $class = ?, array $ctor = ?)
public collect(Callable $collector = ?): int
public resize(int $size): void
public shutdown(): void
public submit(Threaded $task): int
public submitTo(int $worker, Threaded $task): int
}

Свойства

size

максимальное количество объектов Worker

class

класс Worker

workers

ссылки на объекты Worker

ctor

аргументы конструктора новых объектов Worker

last

смещение последнего использованного Worker в workers

Содержание

  • Pool::collect — Собирает ссылки на выполненные задания
  • Pool::__construct — Создаёт новый пул воркеров
  • Pool::resize — Изменяет размер пула
  • Pool::shutdown — Выключает все воркеры
  • Pool::submit — Отправляет объект на исполнение
  • Pool::submitTo — Отправляет задачу конкретному воркеру для выполнения
Добавить

Примечания пользователей 3 notes

up
6
meadowsjared at gmail dot com
9 years ago
Please note, when using the collect function, it's important that you extend the pool class so you can keep checking for finished threads until they're all done.

<?php
class TestWork extends Threaded {
    protected $complete;
    //$pData is the data sent to your worker thread to do it's job.
    public function __construct($pData){
        //transfer all the variables to local variables
        $this->complete = false;
        $this->testData = $pData;
    }
    //This is where all of your work will be done.
    public function run(){
        usleep(2000000); //sleep 2 seconds to simulate a large job
        $this->complete = true;
    }
    public function isGarbage() {
        return $this->complete;
    }
}
class ExamplePool extends Pool
{
    public $data = array();
    public function process()
    {
        // Run this loop as long as we have
        // jobs in the pool
        while (count($this->work)) {
            $this->collect(function (TestWork $task) {
                // If a task was marked as done
                // collect its results
                if ($task->isGarbage()) {
                    $tmpObj = new stdclass();
                    $tmpObj->complete = $task->complete;
                    //this is how you get your completed data back out [accessed by $pool->process()]
                    $this->data[] = $tmpObj;
                }
                return $task->isGarbage();
            });
        }
        // All jobs are done
        // we can shutdown the pool
        $this->shutdown();
        return $this->data;
    }
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for($i=0;$i<5;$i++) {
    $pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //get all of the results
echo '<pre>';
print_r($retArr); //return the array of results (and maybe errors)
echo '</pre>';
?>
up
4
meadowsjared at gmail dot com
4 years ago
In this example, it shows how to use a pool to get an array of results, using pThreads v3.2.1 and php 7.3.23

<?php
class TestWork extends Threaded {
//updated version that works with pThreads v3.2.1 and php 7.3.23
    protected $complete;
    //$pData is the data sent to your worker thread to do it's job.
    public function __construct($pData) {
        //transfer all the variables to local variables
        $this->complete = false;
        $this->testData = $pData;
    }
    //This is where all of your work will be done.
    public function run() {
        usleep(2000000); //sleep 2 seconds to simulate a large job
        $this->complete = true;
    }
    public function isDone() {
        return $this->complete;
    }
}
class ExamplePool extends Pool {
    public $data = array(); // used to return data after we're done
    private $numTasks = 0; // counter used to know when we're done
    /**
     * override the submit function from the parent
     * to keep track of our jobs
     */
    public function submit(Threaded $task) {
        $this->numTasks++;
        parent::submit($task);
    }
    /**
     * used to wait until all workers are done
     */
    public function process() {
        // Run this loop as long as we have
        // jobs in the pool
        while (count($this->data) < $this->numTasks) {
            $this->collect(function (TestWork $task) {
                // If a task was marked as done, collect its results
                if ($task->isDone()) {
                    $tmpObj = new stdclass();
                    $tmpObj->complete = $task->complete;
                    //this is how you get your completed data back out [accessed by $pool->process()]
                    $this->data[] = $tmpObj;
                }
                return $task->isDone();
            });
        }
        // All jobs are done
        // we can shutdown the pool
        $this->shutdown();
        return $this->data;
    }
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for($i=0;$i<5;$i++) {
    $pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //get all of the results
echo '<pre>';
print_r($retArr); //return the array of results (and maybe errors)
echo '</pre>';
?>
up
2
olavk
10 years ago
Simple example with Collectable (basically Thread meant for Pool) and Pool

<?php

class job extends Collectable {
  public $val;

  public function __construct($val){
    // init some properties
    $this->val = $val;
  }
  public function run(){
    // do some work
    $this->val = $this->val . file_get_contents('http://www.example.com/', null, null, 3, 20);
    $this->setGarbage();
  }
}

// At most 3 threads will work at once
$p = new Pool(3);

$tasks = array(
  new job('0'),
  new job('1'),
  new job('2'),
  new job('3'),
  new job('4'),
  new job('5'),
  new job('6'),
  new job('7'),
  new job('8'),
  new job('9'),
  new job('10'),
);
// Add tasks to pool queue
foreach ($tasks as $task) {
  $p->submit($task);
}

// shutdown will wait for current queue to be completed
$p->shutdown();
// garbage collection check / read results
$p->collect(function($checkingTask){
  echo $checkingTask->val;
  return $checkingTask->isGarbage();
});

?>
To Top