- Streaming responses: HTTP chunked transfer (Transfer-Encoding: chunked) lets the server keep pushing data to the client over a single response
- SSE protocol: the Content-Type: text/event-stream header marks the response as an event stream, a natural fit for token-by-token AI output
- Bidirectional communication: WebSocket suits complex scenarios that need real-time interaction, including letting the client interrupt generation (see the sketch right after this list)
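WebSocket is not used in the SSE examples below, so here is only a minimal sketch of the interruption idea. It assumes the third-party github.com/gorilla/websocket package and a hypothetical /ws endpoint: the server streams placeholder tokens while a separate goroutine watches for a "stop" message from the client.

package main

import (
    "log"
    "net/http"
    "time"

    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    // Allow all origins for this demo; tighten this in production.
    CheckOrigin: func(r *http.Request) bool { return true },
}

func wsHandler(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println("upgrade:", err)
        return
    }
    defer conn.Close()

    // Watch for a "stop" message (or a closed connection) in a separate goroutine.
    stop := make(chan struct{})
    go func() {
        for {
            _, msg, err := conn.ReadMessage()
            if err != nil || string(msg) == "stop" {
                close(stop)
                return
            }
        }
    }()

    // Simulate token-by-token generation until finished or interrupted.
    tokens := []string{"Go", " ", "is", " ", "fast", "."}
    for _, t := range tokens {
        select {
        case <-stop:
            return // the client interrupted the generation
        default:
        }
        if err := conn.WriteMessage(websocket.TextMessage, []byte(t)); err != nil {
            return
        }
        time.Sleep(200 * time.Millisecond)
    }
}

func main() {
    http.HandleFunc("/ws", wsHandler)
    log.Fatal(http.ListenAndServe(":8081", nil))
}

The key design point is that reading and writing happen concurrently, so a stop request can be noticed in the middle of generation rather than only after the response is finished.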
package main

import (
    "fmt"
    "log"
    "net/http"
    "time"
)

// streamHandler pushes a fixed sequence of characters to the client as
// Server-Sent Events, flushing after every chunk to mimic token-by-token AI output.
func streamHandler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    // The ResponseWriter must support flushing for streaming to work.
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
        return
    }

    responses := []string{"G", "o", "l", "a", "n", "d", " ", "A", "I", " ", "流", "式", "输", "出"}
    for _, resp := range responses {
        fmt.Fprintf(w, "data: %s\n\n", resp) // an SSE frame is "data: ..." followed by a blank line
        flusher.Flush()                      // push the chunk to the client immediately
        time.Sleep(300 * time.Millisecond)   // simulate generation latency
    }
}

func main() {
    http.HandleFunc("/stream", streamHandler)
    fmt.Println("Server running at http://localhost:8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}
package main

import (
    "io"
    "time"

    "github.com/gin-gonic/gin"
)

func main() {
    r := gin.Default()
    r.GET("/stream", func(c *gin.Context) {
        c.Header("Content-Type", "text/event-stream")
        c.Header("Cache-Control", "no-cache")
        c.Header("Connection", "keep-alive")

        messages := []string{"H", "e", "l", "l", "o", " ", "W", "o", "r", "l", "d", "!"}
        i := 0
        // c.Stream calls the callback repeatedly until it returns false.
        c.Stream(func(w io.Writer) bool {
            if i < len(messages) {
                c.SSEvent("message", messages[i]) // write one SSE event per call
                i++
                time.Sleep(200 * time.Millisecond)
                return true // keep the connection open for the next chunk
            }
            return false // all chunks sent; close the stream
        })
    })
    r.Run(":8080")
}
package main

import (
    "errors"
    "fmt"
    "io"
    "net/http"

    "github.com/sashabaranov/go-openai"
)

// openAIStreamHandler proxies a streaming chat completion from the OpenAI API
// to the browser as Server-Sent Events.
func openAIStreamHandler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
        return
    }

    client := openai.NewClient("YOUR_API_KEY")
    req := openai.ChatCompletionRequest{
        Model: openai.GPT3Dot5Turbo,
        Messages: []openai.ChatCompletionMessage{
            {Role: openai.ChatMessageRoleUser, Content: "Introduce the advantages of the Go language in about 100 words"},
        },
        Stream: true, // ask the API to return tokens incrementally
    }

    // Passing the request context cancels the upstream call when the client disconnects.
    stream, err := client.CreateChatCompletionStream(r.Context(), req)
    if err != nil {
        fmt.Fprintf(w, "data: Error: %v\n\n", err)
        flusher.Flush()
        return
    }
    defer stream.Close()

    for {
        response, err := stream.Recv()
        if errors.Is(err, io.EOF) {
            break // generation finished normally
        }
        if err != nil {
            break // network or API error: stop streaming
        }
        if len(response.Choices) > 0 {
            // Each chunk carries only the newly generated delta text.
            content := response.Choices[0].Delta.Content
            if content != "" {
                fmt.Fprintf(w, "data: %s\n\n", content)
                flusher.Flush()
            }
        }
    }
}

func main() {
    http.HandleFunc("/ai-stream", openAIStreamHandler)
    http.ListenAndServe(":8080", nil)
}
<!DOCTYPE html>
<html>
<body>
  <div id="output"></div>
  <script>
    const output = document.getElementById('output');
    // EventSource opens the SSE connection and fires onmessage for each "data:" frame.
    const eventSource = new EventSource('http://localhost:8080/stream');
    eventSource.onmessage = function(event) {
      output.textContent += event.data;
      output.scrollTop = output.scrollHeight;
    };
    eventSource.onerror = function(error) {
      console.error('Stream error:', error);
      eventSource.close(); // closing here disables EventSource's built-in auto-reconnect
    };
  </script>
</body>
</html>
- Error handling: reconnect when the connection drops, and use context to manage the request lifecycle (see the first sketch after this list)
- Security: add CORS configuration, validate API keys, and rate-limit requests (see the second sketch after this list)
- Debugging tip: set breakpoints in GoLand and watch how variables change while the stream is being transferred
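For the error-handling point, here is a minimal net/http sketch of context-based lifecycle management; generateChunk and the /cancellable route are hypothetical stand-ins rather than part of the earlier examples. The request's context is cancelled automatically when the client disconnects, which stops the generation loop.

package main

import (
    "fmt"
    "net/http"
    "time"
)

// generateChunk is a stand-in for one step of real AI generation.
func generateChunk(i int) string { return fmt.Sprintf("chunk-%d ", i) }

func cancellableStream(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
        return
    }

    ctx := r.Context() // cancelled automatically when the client disconnects
    for i := 0; i < 100; i++ {
        select {
        case <-ctx.Done():
            return // stop generating; the client is gone
        case <-time.After(200 * time.Millisecond):
            fmt.Fprintf(w, "data: %s\n\n", generateChunk(i))
            flusher.Flush()
        }
    }
}

func main() {
    http.HandleFunc("/cancellable", cancellableStream)
    http.ListenAndServe(":8080", nil)
}

On the reconnection side, EventSource reconnects automatically unless close() is called; including an id: field with each event lets a reconnecting client resume from the Last-Event-ID request header.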
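For the security point, a sketch of a wrapping middleware, assuming the golang.org/x/time/rate package; the allowed origin, token value, and limits below are placeholders, not recommendations.

package main

import (
    "net/http"

    "golang.org/x/time/rate"
)

// limiter allows 5 requests per second with a burst of 10 (placeholder values).
var limiter = rate.NewLimiter(5, 10)

// withCORSAndLimit wraps a handler with CORS headers, a rate limit, and a token check.
func withCORSAndLimit(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // Only allow the front-end origin to read the stream (placeholder origin).
        w.Header().Set("Access-Control-Allow-Origin", "http://localhost:3000")
        w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type")
        if r.Method == http.MethodOptions {
            w.WriteHeader(http.StatusNoContent)
            return
        }
        // Reject clients that exceed the request rate.
        if !limiter.Allow() {
            http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
            return
        }
        // Validate the API key before doing any expensive generation (placeholder check).
        if r.Header.Get("Authorization") != "Bearer YOUR_CLIENT_TOKEN" {
            http.Error(w, "Unauthorized", http.StatusUnauthorized)
            return
        }
        next(w, r)
    }
}

func main() {
    http.HandleFunc("/ai-stream", withCORSAndLimit(func(w http.ResponseWriter, r *http.Request) {
        // ... streaming handler from the examples above ...
    }))
    http.ListenAndServe(":8080", nil)
}

In practice the limiter would usually be keyed per client IP or API key rather than shared globally.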
package main

import (
    "bufio"
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

func main() {
    // Proxy a streaming completion from a local Ollama server to the browser as SSE.
    http.HandleFunc("/local-ai", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "text/event-stream")
        w.Header().Set("Cache-Control", "no-cache")
        w.Header().Set("Connection", "keep-alive")

        flusher, ok := w.(http.Flusher)
        if !ok {
            http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
            return
        }

        reqBody := map[string]interface{}{
            "model":  "llama3",
            "prompt": "Describe the main features of GoLand",
            "stream": true, // Ollama emits one JSON object per line when streaming
        }
        jsonBody, err := json.Marshal(reqBody)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }

        resp, err := http.Post("http://localhost:11434/api/generate", "application/json", bytes.NewBuffer(jsonBody))
        if err != nil {
            http.Error(w, err.Error(), http.StatusBadGateway)
            return
        }
        defer resp.Body.Close()

        // Read the upstream response line by line; each line is a JSON chunk with a "response" field.
        scanner := bufio.NewScanner(resp.Body)
        for scanner.Scan() {
            var result map[string]interface{}
            if err := json.Unmarshal(scanner.Bytes(), &result); err != nil {
                continue // skip malformed lines
            }
            if content, ok := result["response"].(string); ok && content != "" {
                fmt.Fprintf(w, "data: %s\n\n", content)
                flusher.Flush()
            }
        }
    })
    http.ListenAndServe(":8080", nil)
}