如何在PHP框架Workerman中实现异步任务处理
在现代Web应用中,处理繁重的业务逻辑时,避免主业务流程被长时间阻塞是非常重要的。Workerman是一个高性能的PHP Socket框架,支持异步任务处理,可以有效地解决这一问题。本文将详细介绍如何在Workerman中实现异步任务处理,并提供具体的代码示例,包括大型邮件发送、异步下单、异步订单检测和异步网络数据处理等场景。
异步任务处理的基本原理
在Workerman中,异步任务处理主要通过创建独立的Task Worker来实现。这些Task Worker可以处理耗时的操作,如发送邮件、处理订单等,从而避免阻塞主进程。主进程可以通过AsyncTcpConnection
将任务数据发送给Task Worker,Task Worker处理完任务后将结果返回给主进程。
大型邮件发送示例
假设我们需要给1000个用户发送邮件,这个过程可能需要很长时间。我们可以使用Workerman的异步任务机制来处理这一需求。
任务进程服务端代码
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// 创建一个Text协议的Task Worker,监听12345端口
$task_worker = new Worker('Text://0.0.0.0:12345');
$task_worker->count = 100; // 根据需要设置Task Worker的数量
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
$task_data = json_decode($task_data, true);
// 处理邮件发送逻辑
$task_result = send_mail($task_data['email'], $task_data['content']);
// 发送处理结果
$connection->send(json_encode($task_result));
};
Worker::runAll();
在Workerman中调用异步任务
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// 创建一个Websocket服务
$worker = new Worker('websocket://0.0.0.0:8080');
$worker->onMessage = function($connection, $data) {
// 假设data中包含了邮件内容和接收者信息
$task_data = [
'email' => $data['email'],
'content' => $data['content']
];
// 创建异步连接到Task Worker
$async_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
$async_connection->send(json_encode($task_data));
$async_connection->onMessage = function($async_connection, $result) use ($connection) {
// 处理Task Worker返回的结果
$connection->send($result);
$async_connection->close();
};
// 执行异步连接
$async_connection->connect();
};
Worker::runAll();
异步下单示例
在电商平台中,下单操作可能涉及多个子系统的调用,如库存系统、支付系统等。这些操作可以通过异步任务来处理,以提高系统的响应速度和处理能力。
任务进程服务端代码
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$task_worker = new Worker('Text://0.0.0.0:12346');
$task_worker->count = 100;
$task_worker->name = 'OrderTaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
$task_data = json_decode($task_data, true);
// 处理下单逻辑
$task_result = process_order($task_data['order_info']);
$connection->send(json_encode($task_result));
};
Worker::runAll();
在定时任务中调用异步下单任务
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// 创建一个定时任务服务
$worker = new Worker();
$worker->onWorkerStart = function() {
// 每分钟执行一次
\Workerman\Lib\Timer::add(60, function() {
$order_info = get_pending_orders();
$task_data = ['order_info' => $order_info];
$async_connection = new AsyncTcpConnection('Text://127.0.0.1:12346');
$async_connection->send(json_encode($task_data));
$async_connection->onMessage = function($async_connection, $result) {
// 处理Task Worker返回的结果
$async_connection->close();
};
$async_connection->connect();
});
};
Worker::runAll();
异步订单检测示例
订单检测通常需要定期检查订单的状态,如支付是否成功、物流信息是否更新等。这些操作可以通过异步任务来实现,以减少主业务流程的负担。
任务进程服务端代码
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$task_worker = new Worker('Text://0.0.0.0:12347');
$task_worker->count = 100;
$task_worker->name = 'OrderCheckTaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
$task_data = json_decode($task_data, true);
// 处理订单检测逻辑
$task_result = check_order_status($task_data['order_id']);
$connection->send(json_encode($task_result));
};
Worker::runAll();
在HTTP请求中调用异步订单检测任务
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// 创建一个HTTP服务
$worker = new Worker('http://0.0.0.0:8082');
$worker->onMessage = function($connection, $request) {
$order_id = $request->get('order_id');
$task_data = ['order_id' => $order_id];
$async_connection = new AsyncTcpConnection('Text://127.0.0.1:12347');
$async_connection->send(json_encode($task_data));
$async_connection->onMessage = function($async_connection, $result) use ($connection) {
$connection->send($result);
$async_connection->close();
};
$async_connection->connect();
};
Worker::runAll();
异步网络数据处理示例
在处理大量网络数据时,如爬虫数据、API响应数据等,可以使用异步任务来提高处理效率。
任务进程服务端代码
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$task_worker = new Worker('Text://0.0.0.0:12348');
$task_worker->count = 100;
$task_worker->name = 'DataProcessTaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
$task_data = json_decode($task_data, true);
// 处理网络数据
$task_result = process_network_data($task_data['data']);
$connection->send(json_encode($task_result));
};
Worker::runAll();
在Controller中调用异步网络数据处理任务
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
class DataController {
public function processData($data) {
$task_data = ['data' => $data];
$async_connection = new AsyncTcpConnection('Text://127.0.0.1:12348');
$async_connection->send(json_encode($task_data));
$async_connection->onMessage = function($async_connection, $result) {
// 处理Task Worker返回的结果
$async_connection->close();
};
$async_connection->connect();
}
}
最后,我们假设上面的示例都是我们需要运行的~
1. 编写Worker文件
假设我们有一个项目结构如下:
project/
├── vendor/
├── composer.json
├── send_mail_worker.php
├── order_worker.php
├── order_check_worker.php
├── data_process_worker.php
└── run_workers.php
send_mail_worker.php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// 创建一个Text协议的Task Worker,监听12345端口
$task_worker = new Worker('Text://0.0.0.0:12345');
$task_worker->count = 100; // 根据需要设置Task Worker的数量
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
$task_data = json_decode($task_data, true);
// 处理邮件发送逻辑
$task_result = send_mail($task_data['email'], $task_data['content']);
// 发送处理结果
$connection->send(json_encode($task_result));
};
Worker::runAll();
order_worker.php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$task_worker = new Worker('Text://0.0.0.0:12346');
$task_worker->count = 100;
$task_worker->name = 'OrderTaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
$task_data = json_decode($task_data, true);
// 处理下单逻辑
$task_result = process_order($task_data['order_info']);
$connection->send(json_encode($task_result));
};
Worker::runAll();
order_check_worker.php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$task_worker = new Worker('Text://0.0.0.0:12347');
$task_worker->count = 100;
$task_worker->name = 'OrderCheckTaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
$task_data = json_decode($task_data, true);
// 处理订单检测逻辑
$task_result = check_order_status($task_data['order_id']);
$connection->send(json_encode($task_result));
};
Worker::runAll();
data_process_worker.php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$task_worker = new Worker('Text://0.0.0.0:12348');
$task_worker->count = 100;
$task_worker->name = 'DataProcessTaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data) {
$task_data = json_decode($task_data, true);
// 处理网络数据
$task_result = process_network_data($task_data['data']);
$connection->send(json_encode($task_result));
};
Worker::runAll();
2. 编写运行脚本
run_workers.php
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';
// 启动所有Worker
Worker::runAll();
3. 执行Worker
在终端中,进入项目目录并执行以下命令来启动所有Worker:
php run_workers.php start
4. 测试Worker
你可以使用以下命令来测试每个Worker的功能:
测试邮件发送Worker
php send_mail_worker.php start
测试下单Worker
php order_worker.php start
测试订单检测Worker
php order_check_worker.php start
测试网络数据处理Worker
php data_process_worker.php start
5. 停止Worker
如果你想停止所有Worker,可以使用以下命令:
php run_workers.php stop
通过这种方式,你可以轻松地启动、测试和停止Workerman的Worker,确保你的异步任务处理系统正常运行。
版权声明:本文为原创文章,版权归 全栈开发技术博客 所有。
本文链接:https://www.lvtao.net/dev/php-workerman-async-task-worker.html
转载时须注明出处及本声明