实现聊天流式输出

This commit is contained in:
2026-05-29 21:59:15 +08:00
parent da803da2de
commit 4bf5195bfd
9 changed files with 498 additions and 140 deletions
+321 -9
View File
@@ -1,5 +1,7 @@
#include "OpenAICompatibleProvider.h"
#include "../util/Logger.h"
#include <QJsonArray>
#include <QJsonDocument>
#include <QJsonObject>
@@ -8,21 +10,75 @@
#include <QNetworkRequest>
#include <QStringList>
#include <QUrl>
#include <QUrlQuery>
#include <utility>
namespace
{
constexpr int MaxDiagnosticBodyLength = 1000;
QString trimmedResponseBody(const QByteArray &body)
{
const QString text = QString::fromUtf8(body).trimmed();
constexpr int MaxErrorBodyLength = 1000;
if (text.size() <= MaxErrorBodyLength)
if (text.size() <= MaxDiagnosticBodyLength)
{
return text;
}
return text.left(MaxErrorBodyLength) + QStringLiteral("...");
return text.left(MaxDiagnosticBodyLength) + QStringLiteral("...");
}
QString oneLine(QString text)
{
text.replace(QLatin1Char('\r'), QLatin1Char(' '));
text.replace(QLatin1Char('\n'), QLatin1Char(' '));
text.replace(QLatin1Char('\t'), QLatin1Char(' '));
return text.simplified();
}
bool isSensitiveQueryName(const QString &name)
{
const QString lowerName = name.toLower();
return lowerName.contains(QStringLiteral("key"))
|| lowerName.contains(QStringLiteral("token"))
|| lowerName.contains(QStringLiteral("secret"))
|| lowerName.contains(QStringLiteral("password"))
|| lowerName.contains(QStringLiteral("authorization"))
|| lowerName.contains(QStringLiteral("signature"));
}
QString sanitizedUrlString(QUrl url)
{
url.setUserInfo(QString());
QUrlQuery query(url);
if (!query.isEmpty())
{
QUrlQuery sanitizedQuery;
const auto items = query.queryItems(QUrl::FullyDecoded);
for (const auto &item : items)
{
sanitizedQuery.addQueryItem(
item.first,
isSensitiveQueryName(item.first) ? QStringLiteral("<redacted>") : item.second);
}
url.setQuery(sanitizedQuery);
}
return url.toString(QUrl::FullyEncoded);
}
QString diagnosticContext(const AIConfig &config, const QUrl &url)
{
return QStringLiteral("provider=%1 protocol=%2 model=%3 url=%4 timeoutMs=%5 maxTokens=%6 temperature=%7")
.arg(config.provider.trimmed().isEmpty() ? QStringLiteral("<empty>") : config.provider.trimmed())
.arg(config.protocol.trimmed().isEmpty() ? QStringLiteral("<empty>") : config.protocol.trimmed())
.arg(config.model.trimmed().isEmpty() ? QStringLiteral("<empty>") : config.model.trimmed())
.arg(sanitizedUrlString(url))
.arg(config.timeoutMs)
.arg(config.maxTokens)
.arg(QString::number(config.temperature, 'f', 2));
}
QString errorMessageFromBody(const QByteArray &body)
@@ -113,6 +169,23 @@ bool OpenAICompatibleProvider::isBusy() const
}
void OpenAICompatibleProvider::sendChatRequest(const ChatRequest &request, ResponseCallback callback)
{
sendChatRequestInternal(request, false, nullptr, std::move(callback));
}
void OpenAICompatibleProvider::sendStreamingChatRequest(
const ChatRequest &request,
StreamCallback streamCallback,
ResponseCallback callback)
{
sendChatRequestInternal(request, true, std::move(streamCallback), std::move(callback));
}
void OpenAICompatibleProvider::sendChatRequestInternal(
const ChatRequest &request,
bool stream,
StreamCallback streamCallback,
ResponseCallback callback)
{
if (isBusy())
{
@@ -151,13 +224,34 @@ void OpenAICompatibleProvider::sendChatRequest(const ChatRequest &request, Respo
}
m_callback = std::move(callback);
m_streamCallback = std::move(streamCallback);
m_streaming = stream;
m_streamDone = false;
m_streamBuffer.clear();
m_streamRawBody.clear();
m_streamedContent.clear();
QNetworkRequest networkRequest(requestUrl());
const QUrl url = requestUrl();
QNetworkRequest networkRequest(url);
networkRequest.setHeader(QNetworkRequest::ContentTypeHeader, QStringLiteral("application/json"));
networkRequest.setRawHeader("Authorization", QByteArray("Bearer ") + m_config.apiKey.toUtf8());
const QJsonDocument document(buildPayload(request));
m_currentReply = m_networkManager.post(networkRequest, document.toJson(QJsonDocument::Compact));
const QJsonDocument document(buildPayload(request, stream));
const QByteArray payload = document.toJson(QJsonDocument::Compact);
Logger::info(QStringLiteral("AI request started: %1 stream=%2 messageCount=%3 payloadBytes=%4")
.arg(diagnosticContext(m_config, url))
.arg(stream ? QStringLiteral("true") : QStringLiteral("false"))
.arg(QString::number(request.messages.size()))
.arg(QString::number(payload.size())));
m_currentReply = m_networkManager.post(networkRequest, payload);
if (stream)
{
m_readyReadConnection = QObject::connect(m_currentReply, &QNetworkReply::readyRead, [this]() {
handleStreamReadyRead();
});
}
m_replyFinishedConnection = QObject::connect(m_currentReply, &QNetworkReply::finished, [this]() {
if (m_currentReply.isNull())
{
@@ -166,7 +260,37 @@ void OpenAICompatibleProvider::sendChatRequest(const ChatRequest &request, Respo
QNetworkReply *reply = m_currentReply;
const QByteArray body = reply->readAll();
ChatResponse response = parseResponse(reply, body);
ChatResponse response;
if (m_streaming)
{
if (!body.isEmpty())
{
m_streamRawBody.append(body);
}
if (reply->error() == QNetworkReply::NoError && !body.isEmpty())
{
m_streamBuffer.append(body);
processStreamBuffer();
if (m_currentReply.isNull())
{
return;
}
}
if (reply->error() == QNetworkReply::NoError && !m_streamBuffer.isEmpty())
{
m_streamBuffer.append('\n');
processStreamBuffer();
if (m_currentReply.isNull())
{
return;
}
}
response = finishStreamingResponse(reply, body);
}
else
{
response = parseResponse(reply, body);
}
clearReply();
if (m_callback)
@@ -184,6 +308,11 @@ void OpenAICompatibleProvider::cancel()
{
m_callback = nullptr;
QPointer<QNetworkReply> reply = m_currentReply;
if (!reply.isNull())
{
Logger::info(QStringLiteral("AI request canceled: %1")
.arg(diagnosticContext(m_config, reply->request().url())));
}
clearReply();
if (!reply.isNull())
@@ -192,7 +321,7 @@ void OpenAICompatibleProvider::cancel()
}
}
QJsonObject OpenAICompatibleProvider::buildPayload(const ChatRequest &request) const
QJsonObject OpenAICompatibleProvider::buildPayload(const ChatRequest &request, bool stream) const
{
QJsonArray messages;
for (const ChatMessage &message : request.messages)
@@ -206,7 +335,7 @@ QJsonObject OpenAICompatibleProvider::buildPayload(const ChatRequest &request) c
QJsonObject payload;
payload.insert(QStringLiteral("model"), m_config.model);
payload.insert(QStringLiteral("messages"), messages);
payload.insert(QStringLiteral("stream"), false);
payload.insert(QStringLiteral("stream"), stream);
payload.insert(QStringLiteral("temperature"), m_config.temperature);
payload.insert(QStringLiteral("max_tokens"), m_config.maxTokens);
return payload;
@@ -230,12 +359,113 @@ QUrl OpenAICompatibleProvider::requestUrl() const
return QUrl(baseUrl + path);
}
void OpenAICompatibleProvider::handleStreamReadyRead()
{
if (m_currentReply.isNull())
{
return;
}
const QByteArray chunk = m_currentReply->readAll();
m_streamRawBody.append(chunk);
m_streamBuffer.append(chunk);
processStreamBuffer();
}
void OpenAICompatibleProvider::processStreamBuffer()
{
while (true)
{
const int lineEnd = m_streamBuffer.indexOf('\n');
if (lineEnd < 0)
{
return;
}
QByteArray line = m_streamBuffer.left(lineEnd);
m_streamBuffer.remove(0, lineEnd + 1);
line = line.trimmed();
if (line.isEmpty() || line.startsWith(':'))
{
continue;
}
if (!line.startsWith("data:"))
{
continue;
}
const QByteArray payload = line.mid(5).trimmed();
if (!handleStreamPayload(payload))
{
return;
}
}
}
bool OpenAICompatibleProvider::handleStreamPayload(const QByteArray &payload)
{
if (payload == "[DONE]")
{
m_streamDone = true;
return true;
}
QJsonParseError parseError;
const QJsonDocument document = QJsonDocument::fromJson(payload, &parseError);
if (parseError.error != QJsonParseError::NoError || !document.isObject())
{
Logger::warning(QStringLiteral("AI stream chunk JSON parse failed: %1 parseError=\"%2\" chunkBytes=%3")
.arg(diagnosticContext(m_config, requestUrl()))
.arg(oneLine(parseError.errorString()))
.arg(QString::number(payload.size())));
return true;
}
const QJsonObject root = document.object();
if (root.contains(QStringLiteral("error")))
{
const QString bodyError = errorMessageFromBody(payload);
finishWithError(bodyError.isEmpty() ? QStringLiteral("AI stream returned an error.") : bodyError);
return false;
}
const QJsonArray choices = root.value(QStringLiteral("choices")).toArray();
if (choices.isEmpty() || !choices.first().isObject())
{
return true;
}
const QJsonObject choice = choices.first().toObject();
const QJsonObject delta = choice.value(QStringLiteral("delta")).toObject();
const QString content = delta.value(QStringLiteral("content")).toString();
if (content.isEmpty())
{
return true;
}
m_streamedContent += content;
if (m_streamCallback)
{
m_streamCallback(content);
}
return true;
}
ChatResponse OpenAICompatibleProvider::parseResponse(QNetworkReply *reply, const QByteArray &body) const
{
const int httpStatus = reply->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
if (reply->error() != QNetworkReply::NoError)
{
const QString bodyError = errorMessageFromBody(body);
Logger::warning(QStringLiteral("AI request network error: %1 qtError=%2 errorString=\"%3\" httpStatus=%4 body=\"%5\"")
.arg(diagnosticContext(m_config, reply->request().url()))
.arg(static_cast<int>(reply->error()))
.arg(oneLine(reply->errorString()))
.arg(httpStatus)
.arg(oneLine(trimmedResponseBody(body))));
if (!bodyError.isEmpty())
{
return {false, {}, bodyError, httpStatus};
@@ -248,6 +478,11 @@ ChatResponse OpenAICompatibleProvider::parseResponse(QNetworkReply *reply, const
const QJsonDocument document = QJsonDocument::fromJson(body, &parseError);
if (parseError.error != QJsonParseError::NoError || !document.isObject())
{
Logger::warning(QStringLiteral("AI response JSON parse failed: %1 httpStatus=%2 parseError=\"%3\" body=\"%4\"")
.arg(diagnosticContext(m_config, reply->request().url()))
.arg(httpStatus)
.arg(oneLine(parseError.errorString()))
.arg(oneLine(trimmedResponseBody(body))));
return {false, {}, QStringLiteral("AI response is not valid JSON."), httpStatus};
}
@@ -255,6 +490,10 @@ ChatResponse OpenAICompatibleProvider::parseResponse(QNetworkReply *reply, const
const QJsonArray choices = root.value(QStringLiteral("choices")).toArray();
if (choices.isEmpty() || !choices.first().isObject())
{
Logger::warning(QStringLiteral("AI response has no choices: %1 httpStatus=%2 body=\"%3\"")
.arg(diagnosticContext(m_config, reply->request().url()))
.arg(httpStatus)
.arg(oneLine(trimmedResponseBody(body))));
return {false, {}, QStringLiteral("AI response has no choices."), httpStatus};
}
@@ -262,15 +501,76 @@ ChatResponse OpenAICompatibleProvider::parseResponse(QNetworkReply *reply, const
const QString content = message.value(QStringLiteral("content")).toString();
if (content.isEmpty())
{
Logger::warning(QStringLiteral("AI response content is empty: %1 httpStatus=%2 body=\"%3\"")
.arg(diagnosticContext(m_config, reply->request().url()))
.arg(httpStatus)
.arg(oneLine(trimmedResponseBody(body))));
return {false, {}, QStringLiteral("AI response content is empty."), httpStatus};
}
Logger::info(QStringLiteral("AI request completed: %1 httpStatus=%2 responseChars=%3 responseBytes=%4")
.arg(diagnosticContext(m_config, reply->request().url()))
.arg(httpStatus)
.arg(QString::number(content.size()))
.arg(QString::number(body.size())));
return {true, content, {}, httpStatus};
}
ChatResponse OpenAICompatibleProvider::finishStreamingResponse(QNetworkReply *reply, const QByteArray &body) const
{
const int httpStatus = reply->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
const QByteArray diagnosticBody = m_streamedContent.isEmpty()
? (m_streamRawBody.isEmpty() ? body : m_streamRawBody)
: QByteArray();
if (reply->error() != QNetworkReply::NoError)
{
const QString bodyError = errorMessageFromBody(diagnosticBody);
Logger::warning(QStringLiteral("AI streaming request network error: %1 qtError=%2 errorString=\"%3\" httpStatus=%4 body=\"%5\" streamedChars=%6")
.arg(diagnosticContext(m_config, reply->request().url()))
.arg(static_cast<int>(reply->error()))
.arg(oneLine(reply->errorString()))
.arg(httpStatus)
.arg(oneLine(trimmedResponseBody(diagnosticBody)))
.arg(QString::number(m_streamedContent.size())));
if (!bodyError.isEmpty())
{
return {false, {}, bodyError, httpStatus};
}
return {false, {}, reply->errorString(), httpStatus};
}
if (m_streamedContent.isEmpty())
{
Logger::warning(QStringLiteral("AI streaming response content is empty: %1 httpStatus=%2 streamDone=%3 residualBytes=%4 body=\"%5\"")
.arg(diagnosticContext(m_config, reply->request().url()))
.arg(httpStatus)
.arg(m_streamDone ? QStringLiteral("true") : QStringLiteral("false"))
.arg(QString::number(m_streamBuffer.size()))
.arg(oneLine(trimmedResponseBody(diagnosticBody))));
return {false, {}, QStringLiteral("AI streaming response content is empty."), httpStatus};
}
Logger::info(QStringLiteral("AI streaming request completed: %1 httpStatus=%2 streamDone=%3 responseChars=%4")
.arg(diagnosticContext(m_config, reply->request().url()))
.arg(httpStatus)
.arg(m_streamDone ? QStringLiteral("true") : QStringLiteral("false"))
.arg(QString::number(m_streamedContent.size())));
return {true, m_streamedContent, {}, httpStatus};
}
void OpenAICompatibleProvider::finishWithError(const QString &message, int httpStatus)
{
QPointer<QNetworkReply> reply = m_currentReply;
const QUrl url = reply.isNull() ? requestUrl() : reply->request().url();
Logger::warning(QStringLiteral("AI request finished with error: %1 httpStatus=%2 error=\"%3\"")
.arg(diagnosticContext(m_config, url))
.arg(httpStatus)
.arg(oneLine(message)));
clearReply();
if (!reply.isNull())
@@ -294,10 +594,22 @@ void OpenAICompatibleProvider::clearReply()
QObject::disconnect(m_replyFinishedConnection);
m_replyFinishedConnection = {};
}
if (m_readyReadConnection)
{
QObject::disconnect(m_readyReadConnection);
m_readyReadConnection = {};
}
if (!m_currentReply.isNull())
{
m_currentReply->deleteLater();
m_currentReply.clear();
}
m_streamCallback = nullptr;
m_streamBuffer.clear();
m_streamRawBody.clear();
m_streamedContent.clear();
m_streaming = false;
m_streamDone = false;
}