探索异步PHP
#php #async

异步编程是一个基础构建块,用于扩展Web应用程序,因为在每个Web请求中需要做更多的事情。一个典型的例子是发送电子邮件作为请求的一部分。

在许多Web应用程序中,当服务器上处理某些内容时,我们想通过电子邮件通知人们,这很常见是单独的HTTP请求到第三方服务,例如SendGrid,MailChimp等。

当您需要一次发送很多电子邮件时,这将不仅仅是一个琐碎的示例。在PHP中,如果您想发送电子邮件,并且HTTP流程需要100毫秒才能完成,则可以通过发送数十或数百封电子邮件来快速增加请求的总时间。

当然,任何出色的第三方电子邮件服务都将提供批量终结点来否定这一点,但是为了示例 - 假设您要发送100封电子邮件,并且必须单独处理每个电子邮件。

因此,我们需要做出决定:我们如何将电子邮件的处理移动到单独的过程中,以免它阻止原始的Web请求?
这就是我们将在这篇文章中探索的内容,尤其是在有或没有新基础架构的情况下可以在PHP中解决的所有不同方式。

使用exec()

exec()是PHP中的本机函数,可用于执行外部程序并返回结果。在我们的情况下,这可能是发送电子邮件的脚本。此功能使用操作系统来产生一个全新的(空白,没有复制或共享)过程,您可以通过任何状态。

让我们看一个例子。

<?php
// handle a web request

// record the start time of the web request
$start = microtime(true);
$path = __DIR__ . '/send_email.php';

// output to /dev/null & so we don't block to wait for the result
$command = 'php ' . $path . ' --email=%s > /dev/null &';
$emails = ['joe@blogs.com', 'jack@test.com'];

// for each of the emails, call exec to start a new script
foreach ($emails as $email) {
    // Execute the command
    exec(sprintf($command, $email));
}

// record the finish time of the web request
$finish = microtime(true);
$duration = round($finish - $start, 4);

// output duration of web request
echo "finished web request in $duration\n";

send_email.php

<?php

$email = explode('--email=', $argv[1])[1];
// this blocking sleep won't affect the web request duration
// (illustrative purposes only)
sleep(5);

// here we can send the email
echo "sending email to $email\n";

输出

$ php src/exec.php

finished web request in 0.0184

上面的脚本显示Web请求仍以毫秒为单位完成,即使在send_email.php脚本中有一个blocke1函数调用。

之所以没有阻止背景和Web请求可以继续。

这样,Web请求脚本仅负责运行脚本,而不是监视其执行和/或失败。

这是该解决方案的固有下降,因为该过程的监视落到了过程本身,并且无法重新启动。但是,这是将异步行为毫不费力地纳入PHP应用程序中的一种简单方法。

exec在服务器上运行命令,因此您必须谨慎执行脚本,尤其是在涉及用户输入的情况下。使用exec,特别是在管理缩放应用程序时很难管理,因为脚本可能在处理外部Web请求的完全相同的框上运行,因此您最终可能会出现CPU和内存,如果数百或数千个新的新事物流程是通过exec产生的。

pcntl_fork

pcntl_fork是一个低级函数,需要启用PCNTL扩展名,并且是一种强大但潜在的错误俯卧方法,用于编写PHP中的异步代码。

pcntl_fork将分叉或克隆当前的过程并将其分为父母和许多子过程(取决于被调用的次数)。通过检测流程ID或PID,我们可以在父进程或子过程的上下文中运行不同的代码。

父程流程将负责产生子流程并等到产卵过程完成后才能完成。

在这种情况下,我们可以更好地控制流程如何退出并可以轻松编写一些逻辑以处理重试的情况下的重试。

现在,到我们的用例示例代码以非阻滞方式发送电子邮件。

<?php

function sendEmail($to, $subject, $message)
{
    // Code to send email (replace with your email sending logic)
    // This is just a mock implementation for demonstration purposes
    sleep(3); // Simulating sending email by sleeping for 3 seconds
    echo "Email sent to: $to\n";
}

$emails = [
    [
        'to' => 'john@example.com',
        'subject' => 'Hello John',
        'message' => 'This is a test email for John.',
    ],
    [
        'to' => 'jane@example.com',
        'subject' => 'Hello Jane',
        'message' => 'This is a test email for Jane.',
    ],
    // Add more email entries as needed
];

$children = [];

foreach ($emails as $email) {
    $pid = pcntl_fork();

    if ($pid == -1) {
        // Fork failed
        die('Error: Unable to fork process.');
    } elseif ($pid == 0) {
        // Child process
        sendEmail($email['to'], $email['subject'], $email['message']);
        exit(); // Exit the child process
    } else {
        // Parent process
        $children[] = $pid;
    }
}

echo "running some other things in parent process\n";
sleep(3);

// Parent process waits for each child process to finish
foreach ($children as $pid) {
    pcntl_waitpid($pid, $status);
    $status = pcntl_wexitstatus($status);
    echo "Child process $pid exited with status: $status\n";
}

echo 'All emails sent.';

在上面使用pcntl_fork的示例中,我们可以将当前过程分配,该过程将父进程复制到新的子过程中,并等待执行完成。此外,在派生子流程发送电子邮件后,父程流程可以继续做其他事情,然后最终确保子流程完成。

这是使用exec上面的一步,我们在可能的情况下非常有限,因为脚本是完全独立的上下文,因此从总体角度来看是不可能的。

我们还获得了过程隔离,因为每个子过程在单独的内存空间中运行并且不影响其他过程。 通过跟踪流程ID,我们可以有效地监视和管理执行流。

以这种方式直接从Web请求(父程进程)的叉子请求中的缺点是,通过等待子进程完成,原始请求的响应时间在这种方式上没有任何好处。<<<<<<<<<<< /p>

幸运的是,有一个解决方案,它是将execpcntl_fork都结合在一起,以获得两全其美,看起来像这样:

  1. Web请求使用Exec()产生新的PHP流程
  2. 产生的过程将通过一批电子邮件列表
  3. 产生的过程成为父母,因为它分叉单独发送每个电子邮件

这一切都可以在后台发生,而不是阻止原始请求。

让我们看一下这项工作:

<?php

$start = microtime(true);
$path = __DIR__ . '/pcntl_fork_send_email.php';
$emails = implode(',', ['joe@blogs.com', 'jack@test.com']);
$command = 'php ' . $path . ' --emails=%s > /dev/null &';

// Execute the command
echo "running exec\n";
exec(sprintf($command, $emails));
$finish = microtime(true);

$duration = round($finish - $start, 4);
echo "finished web request in $duration\n";

pctnl_fork_send_email.php

<?php

$param = explode('--emails=', $argv[1])[1];
$emails = explode(',', $param);

function sendEmail($to)
{
    sleep(3); // Simulating sending email by sleeping for 3 seconds
    echo "Email sent to: $to\n";
}

$children = [];

foreach ($emails as $email) {
    $pid = pcntl_fork();

    if ($pid == -1) {
        // Fork failed
        die('Error: Unable to fork process.');
    } elseif ($pid == 0) {
        // Child process
        sendEmail($email);
        exit(); // Exit the child process
    } else {
        // Parent process
        $children[] = $pid;
    }
}

echo "running some other things in parent process\n";
sleep(3);

// Parent process waits for each child process to finish
foreach ($children as $pid) {
    pcntl_waitpid($pid, $status);
    $status = pcntl_wexitstatus($status);
    echo "Child process $pid exited with status: $status\n";
}

echo "All emails sent.\n";

该解决方案的美感,尽管更复杂,但您可以一起设置一个单独的过程,其责任是运行和监视分叉过程以异步进行工作。

amphp

amphp(异步多任务php)是库的集合,可让您使用PHP快速构建并发应用程序。

在2021年11月,PHP 8.1发布了对Fibers的支持,该支持实施了轻巧的合作并发模型。

现在,我们了解了amphp的工作原理以及为什么对PHP程序的未来令人兴奋的原因,让我们来看一下一个例子:

<?php

require __DIR__ . '/../vendor/autoload.php'; // Include the autoload file for the amphp/amp library

use function Amp\delay;
use function Amp\async;

function sendEmail($to, $subject, $message)
{
    delay(3000)->onResolve(function () use ($to) {
        echo "Email sent to: $to\n";
    });
}

$emails = [
    [
        'to' => 'john@example.com',
        'subject' => 'Hello John',
        'message' => 'This is a test email for John.',
    ],
    [
        'to' => 'jane@example.com',
        'subject' => 'Hello Jane',
        'message' => 'This is a test email for Jane.',
    ],
    // Add more email entries as needed
];

foreach ($emails as $email) {
    $future = async(static function () use ($email) {
        $to = $email['to'];
        $subject = $email['subject'];
        $message = $email['message'];
        sendEmail($to, $subject, $message);
    });

    // block current process by running $future->await();
}

echo "All emails sent.\n";

上面的脚本是不同步运行事物的非常简单的版本。它将使用给定的闭合,返回未来(对象)。

将创建新的光纤。

这比自己滚动并为您进行繁重的举重要简单得多,这是构建应用程序的关键。

队列和工人

解决此问题的解决方案也存在于PHP之外,并且在PHP 8.1之前,它可以被视为黄金标准,因为它是独立的且高度可扩展的。

使用队列(例如Amazon SQSRabbitMQApache Kafka)已被广泛接受的解决方案。

队列是基础架构的一部分,可以使您的申请不符合您对任何工作的处理的申请。这也不是没有风险或弊端,而是随着时间的推移尝试和测试。

让我们进入一个示例:

在此示例中,发件人通常是您存在的Web应用程序。

sender.php

<?php

require 'vendor/autoload.php';

use Aws\Sqs\SqsClient;

// Initialize the SQS client
$client = new SqsClient([
    'region' => 'us-east-1',
    'version' => 'latest',
    'credentials' => [
        'key' => 'YOUR_AWS_ACCESS_KEY',
        'secret' => 'YOUR_AWS_SECRET_ACCESS_KEY',
    ],
]);

// Define the message details
$message = [
    'to' => 'john@example.com',
    'subject' => 'Hello John',
    'message' => 'This is a test email for John.',
];

// Send the message to SQS
$result = $client->sendMessage([
    'QueueUrl' => 'YOUR_SQS_QUEUE_URL',
    'MessageBody' => json_encode($message),
]);

echo "Message sent to SQS with MessageId: " . $result['MessageId'] . "\n";

工人是运行代码以处理作业的附加部署。

worker.php

<?php

require 'vendor/autoload.php';

use Aws\Sqs\SqsClient;

// Initialize the SQS client
$client = new SqsClient([
    'region' => 'us-east-1',
    'version' => 'latest',
    'credentials' => [
        'key' => 'YOUR_AWS_ACCESS_KEY',
        'secret' => 'YOUR_AWS_SECRET_ACCESS_KEY',
    ],
]);

// Receive and process messages from SQS
while (true) {
    $result = $client->receiveMessage([
        'QueueUrl' => 'YOUR_SQS_QUEUE_URL',
        'MaxNumberOfMessages' => 1,
        'WaitTimeSeconds' => 20,
    ]);

    if (!empty($result['Messages'])) {
        foreach ($result['Messages'] as $message) {
            $body = json_decode($message['Body'], true);

            // Process the message (send email in this case)
            sendEmail($body['to'], $body['subject'], $body['message']);

            // Delete the message from SQS
            $client->deleteMessage([
                'QueueUrl' => 'YOUR_SQS_QUEUE_URL',
                'ReceiptHandle' => $message['ReceiptHandle'],
            ]);
        }
    }
}

function sendEmail($to, $subject, $message)
{
    sleep(3); // Simulating sending email by sleeping for 3 seconds
    echo "Email sent to: $to\n";
}

该解决方案由两个部分组成:

  • 发件人(将消息推到SQS队列)
  • 工作(从队列接收消息并发送电子邮件)

可以通过增加工人数量相对于任何数量发件人发送的消息数来缩放。

通过使用队列,工人完全独立于发件人,可以用任何语言编写,因为发件人和工人之间的通信是通过JSON消息。

哪种解决方案最好?

几乎不可能从上面探索的所有解决方案中说出来,这对于您的应用程序来说是最好的,因为它们都旨在解决使用PHP运行异步代码的问题不同的好处和缺点。

在几点中总结每个选项:

exec()

  • 也许是运行PHP脚本Async
  • 的最简单,最有效的方法
  • 充满潜在的安全性影响,特别是在用户输入周围
  • 没有共享的东西既可以是祝福又是诅咒
  • 可能会导致现有服务器资源(CPU/内存)
  • 增加

pcntl_fork()

  • 允许管理父母/子过程自定义行为
  • 可以在更简单的API中抽象为您的应用程序
  • 克隆当前过程可能会导致其他下游问题

amphp

  • 纤维用户需要PHP 8.1
  • 图书馆已经抽象了运行异步代码的“硬部分”
  • 陡峭的学习曲线超过了其他更传统的方法(了解事件循环和PHP中的多任务)

队列和工人

  • 语言独立,适用于任何用例
  • 引入分布式系统(从长远来看可以是好事或坏事)
  • 周围的许多解决方案和不同的队列提供商使其变得容易

结论

我想更深入地深入了解PHP中异步代码的所有不同可能性的主要原因是了解(如果有的话)PHP 8.1中的纤维如何改变我们将来可以编写异步程序的方式。

有许多解决方案,不需要PHP 8.1进行战斗测试,但是有趣的是看到PHP语言与GolangElixir之类的方向发展,这两个方向都支持异步程序并完成了。多年。

最终,考虑到可伸缩性和跨平台/跨语言支持,我可能仍会达到队列/工人的方法 - 但是,随着时间的流逝,我可能会看到诸如AMPHP之类的库变得更加丰富,并使此问题变得更加容易。在不引入新基础架构的情况下解决。

要查看此博客文章中使用的代码样本,您可以在GitHub上找到它们。