workerman 接入文心一言的流式输出
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// 注意:这里与上个例子不同,使用的是websocket协议
$ws_worker = new Worker("websocket://0.0.0.0:2000");
// 启动4个进程对外提供服务
$ws_worker->count = 4;
// 当收到客户端发来的数据后返回hello $data给客户端
$ws_worker->onMessage = function(TcpConnection $connection, $data)
{
// 向客户端发送hello $data
$connection->send('hello ' . $data);
$token = getAccessToken();
$url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions_pro?access_token=".$token;
/*$map = '{
"messages": [
{
"role": "user",
"content": "给我推荐一些自驾游路线"
}
],
"stream":"True"
}';*/
//$maparr = ['messages'=>['role'=>'user','content'=>'hello'],'stream'=>true];
$maparr = ["messages" => [["role" => "user", "content" => 'hello']],"stream" => true];
curlStreamRequest(
$url, // 替换为实际的 API URL
['Content-Type: application/json'], // 替换为实际的请求头
json_encode($maparr), // 替换为实际的 POST 数据(如果需要)
'handleResponseData' // 传递回调函数名作为字符串(如果回调函数在全局作用域中)
// 或者直接传递闭包函数
// function($data) {
// handleResponseData($data);
// }
);
};
function getAccessToken(){
$curl = curl_init();
$postData1 = array(
'grant_type' => 'client_credentials',
'client_id' => '****', //接入文心一言key
'client_secret' =>'****' 接入文心一言的secret
);
curl_setopt_array($curl, array(
CURLOPT_URL => 'https://aip.baidubce.com/oauth/2.0/token',
CURLOPT_CUSTOMREQUEST => 'POST',
CURLOPT_SSL_VERIFYPEER => false,
CURLOPT_SSL_VERIFYHOST => false,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_POSTFIELDS => http_build_query($postData1)
));
$response = curl_exec($curl);
curl_close($curl);
$rtn = json_decode($response);
return $rtn->access_token;
}
function curlStreamRequest(string $url, array $headers = [], $postData = null, callable $callback) {
echo $postData;
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, false); // 不将响应保存为字符串,直接处理
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); // 注意:在生产环境中应启用 SSL 验证
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false); // 注意:同上
curl_setopt($ch, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
curl_setopt($ch, CURLOPT_POST, is_array($postData) || !empty($postData));
curl_setopt($ch, CURLOPT_POSTFIELDS, $postData);
curl_setopt($ch, CURLOPT_WRITEFUNCTION, function ($ch, $data) use ($callback) {
// 调用回调函数处理数据
$callback($data);
return strlen($data); // 返回接收到的数据长度
});
// 执行请求并获取响应
curl_exec($ch);
// 检查是否有错误发生
if (curl_errno($ch)) {
throw new \Exception(curl_error($ch));
}
// 关闭 cURL 句柄
curl_close($ch);
}
/**
* 示例回调函数,用于处理接收到的数据并返回给客户端
*
* @param string $data 接收到的数据片段
*/
function handleResponseData($data) {
// 在这里,你可以将数据写入输出缓冲区或直接发送给客户端
// 例如,使用 echo 或 SSE 发送数据
echo $data; // 假设这里直接将数据发送给客户端
flush(); // 刷新输出缓冲区
}
// 使用示例
/*curlStreamRequest(
'https://example.com/api/stream', // 替换为实际的 API URL
['Content-Type: application/json'], // 替换为实际的请求头
json_encode(['key' => 'value']), // 替换为实际的 POST 数据(如果需要)
'handleResponseData' // 传递回调函数名作为字符串(如果回调函数在全局作用域中)
// 或者直接传递闭包函数
// function($data) {
// handleResponseData($data);
// }
);
*/
// 运行worker
Worker::runAll();