PHP 8.4.6 Released!

parallel\run

(1.0.0)

parallel\runExecução

Descrição

parallel\run(Closure $task): ?Future

Agenda a tarefa informada em task para execução em paralelo.

parallel\run(Closure $task, array $argv): ?Future

Agenda a tarefa informada em task para execução em paralelo, passando os argumentos argv no momento da execução.

Agendamento Automático

Se um \parallel\Runtime criado internamente e armazenado em cache por uma chamada anterior a parallel\run() estiver ocioso, ele será usado para executar a tarefa. Se nenhum \parallel\Runtime estiver ocioso, o parallel criará e armazenará em cache um \parallel\Runtime.

Nota:

Objetos \parallel\Runtime criados pelo programador não são usados ​​para agendamento automático.

Parâmetros

task

Uma Closure com características específicas.

argv

Um array de argumentos com características específicas a ser passado para task no momento da execução.

Características da tarefa

As Closures programadas para execução paralela não devem:

  • aceitar ou retornar por referência
  • aceitar ou retornar objetos internos (ver notas)
  • executar um conjunto limitado de instruções

As instruções proibidas em Closures destinadas à execução paralela são:

  • yield
  • uso de referência
  • declaração de classe
  • declaração de função nomeada

Nota:

Fechamentos aninhados podem usar yield ou referências, mas não devem conter declarações de classe ou funções nomeadas.

Nota:

Nenhuma instrução é proibida nos arquivos que a tarefa pode incluir.

Características dos Argumentos

Os argumentos não devem conter:

  • referências
  • recursos
  • objetos internos (veja notas)

Nota:

No caso de recursos de fluxo de arquivo, o recurso será convertido para o descritor de arquivo e passado como int sempre que possível. Isso não é suportado no Windows.

Notas sobre Objetos Internos

Objetos internos geralmente usam uma estrutura personalizada que não pode ser copiada por valor com segurança. Atualmente, o PHP não possui o mecanismo necessário para fazer isso (sem serialização). Portanto, apenas objetos que não usam uma estrutura personalizada podem ser compartilhados.

Alguns objetos internos não usam uma estrutura personalizada, por exemplo parallel\Events\Event e, portanto, podem ser compartilhados.

Closures são um tipo especial de objeto interno e suporte que estão sendo copiados por valor e, portanto, podem ser compartilhadas.

Os canais são essenciais para escrever código paralelo e oferecem suporte ao acesso e execução simultâneos por necessidade e, portanto, podem ser compartilhados.

Aviso

Uma classe de usuário que estende uma classe interna pode usar uma estrutura personalizada, conforme definida pela classe interna, e nesse caso não pode ser copiada por valor com segurança, e, portanto, não pode ser compartilhada.

Valor Retornado

Aviso

O parallel\Future retornado não deve ser ignorado quando a tarefa contém uma instrução return ou throw.

Exceções

Aviso

Lança uma exceção parallel\Runtime\Error\Closed se parallel\Runtime tiver sido fechada.

Aviso

Lança uma exceção parallel\Runtime\Error\IllegalFunction se task é uma closure criada a partir de uma função interna.

Aviso

Lança uma exceção parallel\Runtime\Error\IllegalInstruction se task contiver instruções ilegais.

Aviso

Lança uma exceção parallel\Runtime\Error\IllegalParameter se task aceitar ou argv contiver variáveis ilegais.

Aviso

Lança uma exceção parallel\Runtime\Error\IllegalReturn se task retornar ilegalmente.

Veja Também

adicione uma nota

Notas Enviadas por Usuários (em inglês) 3 notes

up
22
john_2885 at yahoo dot com
5 years ago
Here's a more substantial example of how to use the run functional API.

<?php
/*********************************************
* Sample parallel functional API
*
* Scenario
* -------------------------------------------
* Given a large number of rows of
* data to process, divide the work amongst
* a set of workers. Each worker is responsible
* for finishing their assigned task.
*
* In the code below, assume we have arbitrary
* start and end IDs (rows) - we will try to
* divide the number of IDs (rows) evenly
* across 8 workers. The workers will get the
* following batches to process to completion:
*
* Total number of IDs (rows): 1371129
* Each worker will get 171392 IDs to process
*
* Worker 1: IDs from 11001 to 182393
* Worker 2: IDs from 182393 to 353785
* Worker 3: IDs from 353785 to 525177
* Worker 4: IDs from 525177 to 696569
* Worker 5: IDs from 696569 to 867961
* Worker 6: IDs from 867961 to 1039353
* Worker 7: IDs from 1039353 to 1210745
* Worker 8: IDs from 1210745 to 1382130
*
* Each worker then processes 5000 rows at a time
* until they are done with their assigned work
*
*********************************************/

use \parallel\{Runtime, Future, Channel, Events};

$minId = 11001;
$maxId = 1382130;
$workers = 8;
$totalIds = $maxId - $minId;
// Try to divide IDs evenly across the number of workers
$batchSize = ceil($totalIds / $workers);
// The last batch gets whatever is left over
$lastBatch = $totalIds % $batchSize;
// The number of IDs (rows) to divide the overall
// task into sub-batches
$rowsToFetch = 5000;

print
"Total IDs: " . $totalIds . "\n";
print
"Batch Size: " . $batchSize . "\n";
print
"Last Batch: " . $lastBatch . "\n";

$producer = function(int $worker, int $startId, int $endId, int $fetchSize) {
$tempMinId = $startId;
$tempMaxId = $tempMinId + $fetchSize;
$fetchCount = 1;

print
"Worker " . $worker . " working on IDs from " . $startId . " to " . $endId . "\n";

while(
$tempMinId < $endId) {
for(
$i = $tempMinId; $i < $tempMaxId; $i++) {
$usleep = rand(500000, 1000000);
usleep($usleep);
print
"Worker " . $worker . " finished batch " . $fetchCount . " from ID " . $tempMinId . " to " . $tempMaxId . "\n";
// Need to explicitly break out of the for loop once complete or else it will forever process only the first sub-batch
break;
}

// Now we move on to the next sub-batch for this worker
$tempMinId = $tempMaxId;
$tempMaxId = $tempMinId + $fetchSize;
if(
$tempMaxId > $endId) {
$tempMaxId = $endId;
}
// Introduce some timing randomness
$sleep = rand(1,5);
sleep($sleep);
$fetchCount++;
}

// This worker has completed their entire batch
print "Worker " . $worker . " finished\n";

};

// Create our workers and have them start working on their task
// In this case, it's a set of 171392 IDs to process
for($i = 0; $i < $workers; $i++) {
$startId = $minId + ($i * $batchSize);
$endId = $startId + $batchSize;
if(
$i == ($workers - 1)) {
$endId = $maxId;
}
\parallel\run($producer, array(($i+1), $startId, $endId, $rowsToFetch));
}

?>
up
9
anonymous user
4 years ago
Although function declaration is not allowed inside thread exec code, include is allowed. So if we want to declare a function, we could write another file that contain the function and include it.
# main.php
<?php
$runtime
= new parallel\Runtime ();
$future = $runtime->run ( function () {
$future = $runtime->run ( function () {
include
"included.php";
return
add (1, 3);
}, [ ] );
echo
$future->value ();
# output: 4
# included.php
<?php
function add($a, $b){
return
$a + $b;
}
up
1
Thierry Kauffmann
3 years ago
<?php

/**
* Sample parralel functional API
* using a generator instead of a static list of items to process
*
* Items to process in parallel come from a generator
* It could be anything : e.g fetch a mysql array, a DirectoryIterator...
* Thus the number of items to process in parallel is NOT known in advance
*
* This algorithm attributes items to each parallel thread dynamically
* As soon as a thread has finished working
* It is assigned a new item to process
* until all items are processed (generator closes)
*
* In this example we process 50 items in 5 parallel threads
* It produces output in this form (output changes at each run) :
*
* ThreadId: 1 => Item: 1 (Start)
* ThreadId: 2 => Item: 2 (Start)
* ThreadId: 3 => Item: 3 (Start)
* ThreadId: 4 => Item: 4 (Start)
* ThreadId: 5 => Item: 5 (Start)
* ThreadId: 5 => Item: 5 Sleep: 3s (End)
* ThreadId: 5 => Item: 6 (Start)
* ThreadId: 3 => Item: 3 Sleep: 4s (End)
* ThreadId: 3 => Item: 7 (Start)
* ThreadId: 2 => Item: 2 Sleep: 6s (End)
* ThreadId: 2 => Item: 8 (Start)
* ...
* ThreadId: 4 => Item: 44 Sleep: 6s (End)
* ThreadId: 4 => Item: 49 (Start)
* ThreadId: 3 => Item: 46 Sleep: 5s (End)
* ThreadId: 3 => Item: 50 (Start)
* ThreadId: 2 => Item: 43 Sleep: 9s (End)
* Destroy ThreadId: 2
* ThreadId: 1 => Item: 47 Sleep: 5s (End)
* Destroy ThreadId: 1
* ThreadId: 4 => Item: 49 Sleep: 7s (End)
* Destroy ThreadId: 4
* ThreadId: 5 => Item: 48 Sleep: 10s (End)
* Destroy ThreadId: 5
* ThreadId: 3 => Item: 50 Sleep: 10s (End)
* Destroy ThreadId: 3
*/

use \parallel\{Runtime, Future, Channel, Events};

// Generate list of items to process with a generator
function generator(int $item_count) {
for (
$i=1; $i <= $item_count; $i++) {
yield
$i;
}
}

function
testConcurrency(int $concurrency, int $item_count) {

$generator = generator($item_count);

// Function executing in each thread. Have a snap for a random time for example !
$producer = function (int $item_id) {
$seconds = rand(1, 10);
sleep($seconds);
return [
'item_id' => $item_id, 'sleep_seconds' => $seconds];
};

// Fill up threads with initial 'inactive' state
$threads = array_fill(1, $concurrency, ['is_active' => false]);

while (
true) {
// Loop through threads until all threads are finished
foreach ($threads as $thread_id => $thread) {
if (!
$thread['is_active'] and $generator->valid()) {
// Thread is inactive and generator still has values : run something in the thread
$item_id = $generator->current();
$threads[$thread_id]['run'] = \parallel\run($producer, [$item_id]);
echo
"ThreadId: $thread_id => Item: $item_id (Start)\n";
$threads[$thread_id]['is_active'] = true;
$generator->next();
} elseif (!isset(
$threads[$thread_id]['run'])) {
// Destroy supplementary threads in case generator closes sooner than number of threads
echo "Destroy ThreadId: $thread_id\n";
unset(
$threads[$thread_id]);
} elseif (
$threads[$thread_id]['run']->done()) {
// Thread finished. Get results
$item = $threads[$thread_id]['run']->value();
echo
"ThreadId: $thread_id => Item: {$item['item_id']} Sleep: {$item['sleep_seconds']}s (End)\n";

if (!
$generator->valid()) {
// Generator is closed then destroy thread
echo "Destroy ThreadId: $thread_id\n";
unset(
$threads[$thread_id]);
} else {
// Thread is ready to run again
$threads[$thread_id]['is_active'] = false;
}
}
}

// Escape loop when all threads are destroyed
if (empty($threads)) break;
}
}

$concurrency = 5;
$item_count = 50;

testConcurrency($concurrency, $item_count);

?>
To Top