如何在PHP框架Workerman中实现异步任务处理

在现代Web应用中,处理繁重的业务逻辑时,避免主业务流程被长时间阻塞是非常重要的。Workerman是一个高性能的PHP Socket框架,支持异步任务处理,可以有效地解决这一问题。本文将详细介绍如何在Workerman中实现异步任务处理,并提供具体的代码示例,包括大型邮件发送、异步下单、异步订单检测和异步网络数据处理等场景。

异步任务处理的基本原理

在Workerman中,异步任务处理主要通过创建独立的Task Worker来实现。这些Task Worker可以处理耗时的操作,如发送邮件、处理订单等,从而避免阻塞主进程。主进程可以通过AsyncTcpConnection将任务数据发送给Task Worker,Task Worker处理完任务后将结果返回给主进程。

大型邮件发送示例

假设我们需要给1000个用户发送邮件,这个过程可能需要很长时间。我们可以使用Workerman的异步任务机制来处理这一需求。

任务进程服务端代码

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// 创建一个Text协议的Task Worker,监听12345端口
$task_worker = new Worker('Text://0.0.0.0:12345');
$task_worker->count = 100; // 根据需要设置Task Worker的数量
$task_worker->name = 'TaskWorker';

$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
    $task_data = json_decode($task_data, true);
    // 处理邮件发送逻辑
    $task_result = send_mail($task_data['email'], $task_data['content']);
    // 发送处理结果
    $connection->send(json_encode($task_result));
};

Worker::runAll();

在Workerman中调用异步任务

use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// 创建一个Websocket服务
$worker = new Worker('websocket://0.0.0.0:8080');

$worker->onMessage = function($connection, $data) {
    // 假设data中包含了邮件内容和接收者信息
    $task_data = [
        'email' => $data['email'],
        'content' => $data['content']
    ];

    // 创建异步连接到Task Worker
    $async_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
    $async_connection->send(json_encode($task_data));
    
    $async_connection->onMessage = function($async_connection, $result) use ($connection) {
        // 处理Task Worker返回的结果
        $connection->send($result);
        $async_connection->close();
    };
    // 执行异步连接
    $async_connection->connect();
};

Worker::runAll();

异步下单示例

在电商平台中,下单操作可能涉及多个子系统的调用,如库存系统、支付系统等。这些操作可以通过异步任务来处理,以提高系统的响应速度和处理能力。

任务进程服务端代码

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$task_worker = new Worker('Text://0.0.0.0:12346');
$task_worker->count = 100;
$task_worker->name = 'OrderTaskWorker';

$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
    $task_data = json_decode($task_data, true);
    // 处理下单逻辑
    $task_result = process_order($task_data['order_info']);
    $connection->send(json_encode($task_result));
};

Worker::runAll();

在定时任务中调用异步下单任务

use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// 创建一个定时任务服务
$worker = new Worker();
$worker->onWorkerStart = function() {
    // 每分钟执行一次
    \Workerman\Lib\Timer::add(60, function() {
        $order_info = get_pending_orders();
        $task_data = ['order_info' => $order_info];

        $async_connection = new AsyncTcpConnection('Text://127.0.0.1:12346');
        $async_connection->send(json_encode($task_data));
        
        $async_connection->onMessage = function($async_connection, $result) {
            // 处理Task Worker返回的结果
            $async_connection->close();
        };
        $async_connection->connect();
    });
};

Worker::runAll();

异步订单检测示例

订单检测通常需要定期检查订单的状态,如支付是否成功、物流信息是否更新等。这些操作可以通过异步任务来实现,以减少主业务流程的负担。

任务进程服务端代码

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$task_worker = new Worker('Text://0.0.0.0:12347');
$task_worker->count = 100;
$task_worker->name = 'OrderCheckTaskWorker';

$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
    $task_data = json_decode($task_data, true);
    // 处理订单检测逻辑
    $task_result = check_order_status($task_data['order_id']);
    $connection->send(json_encode($task_result));
};

Worker::runAll();

在HTTP请求中调用异步订单检测任务

use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// 创建一个HTTP服务
$worker = new Worker('http://0.0.0.0:8082');

$worker->onMessage = function($connection, $request) {
    $order_id = $request->get('order_id');
    $task_data = ['order_id' => $order_id];

    $async_connection = new AsyncTcpConnection('Text://127.0.0.1:12347');
    $async_connection->send(json_encode($task_data));
    
    $async_connection->onMessage = function($async_connection, $result) use ($connection) {
        $connection->send($result);
        $async_connection->close();
    };
    $async_connection->connect();
};

Worker::runAll();

异步网络数据处理示例

在处理大量网络数据时,如爬虫数据、API响应数据等,可以使用异步任务来提高处理效率。

任务进程服务端代码

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$task_worker = new Worker('Text://0.0.0.0:12348');
$task_worker->count = 100;
$task_worker->name = 'DataProcessTaskWorker';

$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
    $task_data = json_decode($task_data, true);
    // 处理网络数据
    $task_result = process_network_data($task_data['data']);
    $connection->send(json_encode($task_result));
};

Worker::runAll();

在Controller中调用异步网络数据处理任务

use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

class DataController {
    public function processData($data) {
        $task_data = ['data' => $data];

        $async_connection = new AsyncTcpConnection('Text://127.0.0.1:12348');
        $async_connection->send(json_encode($task_data));
        
        $async_connection->onMessage = function($async_connection, $result) {
            // 处理Task Worker返回的结果
            $async_connection->close();
        };
        $async_connection->connect();
    }
}

最后,我们假设上面的示例都是我们需要运行的~

1. 编写Worker文件

假设我们有一个项目结构如下:

project/
├── vendor/
├── composer.json
├── send_mail_worker.php
├── order_worker.php
├── order_check_worker.php
├── data_process_worker.php
└── run_workers.php

send_mail_worker.php

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// 创建一个Text协议的Task Worker,监听12345端口
$task_worker = new Worker('Text://0.0.0.0:12345');
$task_worker->count = 100; // 根据需要设置Task Worker的数量
$task_worker->name = 'TaskWorker';

$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
    $task_data = json_decode($task_data, true);
    // 处理邮件发送逻辑
    $task_result = send_mail($task_data['email'], $task_data['content']);
    // 发送处理结果
    $connection->send(json_encode($task_result));
};

Worker::runAll();

order_worker.php

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$task_worker = new Worker('Text://0.0.0.0:12346');
$task_worker->count = 100;
$task_worker->name = 'OrderTaskWorker';

$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
    $task_data = json_decode($task_data, true);
    // 处理下单逻辑
    $task_result = process_order($task_data['order_info']);
    $connection->send(json_encode($task_result));
};

Worker::runAll();

order_check_worker.php

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$task_worker = new Worker('Text://0.0.0.0:12347');
$task_worker->count = 100;
$task_worker->name = 'OrderCheckTaskWorker';

$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
    $task_data = json_decode($task_data, true);
    // 处理订单检测逻辑
    $task_result = check_order_status($task_data['order_id']);
    $connection->send(json_encode($task_result));
};

Worker::runAll();

data_process_worker.php

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$task_worker = new Worker('Text://0.0.0.0:12348');
$task_worker->count = 100;
$task_worker->name = 'DataProcessTaskWorker';

$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
    $task_data = json_decode($task_data, true);
    // 处理网络数据
    $task_result = process_network_data($task_data['data']);
    $connection->send(json_encode($task_result));
};

Worker::runAll();

2. 编写运行脚本

run_workers.php

use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

// 启动所有Worker
Worker::runAll();

3. 执行Worker

在终端中,进入项目目录并执行以下命令来启动所有Worker:

php run_workers.php start

4. 测试Worker

你可以使用以下命令来测试每个Worker的功能:

测试邮件发送Worker

php send_mail_worker.php start

测试下单Worker

php order_worker.php start

测试订单检测Worker

php order_check_worker.php start

测试网络数据处理Worker

php data_process_worker.php start

5. 停止Worker

如果你想停止所有Worker,可以使用以下命令:

php run_workers.php stop

通过这种方式,你可以轻松地启动、测试和停止Workerman的Worker,确保你的异步任务处理系统正常运行。

标签: PHP

相关文章

Memcached如何配置分布式使用 并附PHP示例

Memcached是一种高性能的分布式内存对象缓存系统,广泛用于加速动态Web应用程序。通过将数据存储在内存中,Memcached能够显著减少数据库负载,提高应用的响应速度Memcached分布...

使用PHP打造轻量级单文件SQLite数据库管理工具

先声明一下,这是我自己内网使用的一个简单的管理工具,所以安全性方面我肯定是没有测试的~ 如果你要放在公网,请添加相关的权限认证及sql防注入等处理在开发过程中,我们经常需要一个简单易用的数据库管...

PHP 中的 declare 指令

在 PHP 编程中,declare 指令是一个强大的工具,用于控制代码的执行行为。它不仅可以启用严格类型模式,还可以用于其他一些高级功能,如性能监控和字符编码。本文将深入探讨 declare 指...

图片Base64编码

CSR生成

图片无损放大

图片占位符

Excel拆分文件