Goland 实现 AI 流式输出

Goland实现AI流式输出关键信息

1. 技术原理

  • 流式响应:通过HTTP分块传输(Transfer-Encoding: chunked)实现服务器向客户端持续发送数据
  • SSE协议:使用Content-Type: text/event-stream头部标识事件流,适合AI逐字输出场景
  • 双向通信:WebSocket适用于需要实时交互的复杂场景,支持客户端中断生成过程

2. 核心实现方案

方案一:基于标准库net/http
package main

import (
    "fmt"
    "net/http"
    "time"
)

func streamHandler(w http.ResponseWriter, r *http.Request) {
    // 设置SSE响应头
    w.Header().Set("Content-Type""text/event-stream")
    w.Header().Set("Cache-Control""no-cache")
    w.Header().Set("Connection""keep-alive")
    
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
        return
    }
    
    // 模拟AI生成过程
    responses := []string{"G""o""l""a""n""d"" ""A""I"" ""流""式""输""出"}
    for _, resp := range responses {
        fmt.Fprintf(w, "data: %s\n\n", resp)
        flusher.Flush()
        time.Sleep(300 * time.Millisecond)
    }
}

func main() {
    http.HandleFunc("/stream", streamHandler)
    fmt.Println("Server running at http://localhost:8080")
    http.ListenAndServe(":8080"nil)
}
方案二:使用Gin框架优化实现
package main

import (
    "github.com/gin-gonic/gin"
    "io"
    "time"
)

func main() {
    r := gin.Default()
    
    r.GET("/stream"func(c *gin.Context) {
        c.Header("Content-Type""text/event-stream")
        c.Header("Cache-Control""no-cache")
        c.Header("Connection""keep-alive")
        
        // 使用Gin的Stream方法简化流式传输
        c.Stream(func(w io.Writer) bool {
            // 模拟AI生成的流式数据
            messages := []string{"H""e""l""l""o"" ""W""o""r""l""d""!"}
            
            for _, msg := range messages {
                c.SSEvent("message", msg)
                time.Sleep(200 * time.Millisecond)
                return true // 继续流传输
            }
            
            return false // 结束流传输
        })
    })
    
    r.Run(":8080")
}

3. 对接AI服务示例(以OpenAI为例)

package main

import (
    "context"
    "fmt"
    "github.com/sashabaranov/go-openai"
    "net/http"
)

func openAIStreamHandler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type""text/event-stream")
    w.Header().Set("Cache-Control""no-cache")
    w.Header().Set("Connection""keep-alive")
    
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
        return
    }
    
    // 初始化OpenAI客户端
    client := openai.NewClient("YOUR_API_KEY")
    
    // 创建流式请求
    req := openai.ChatCompletionRequest{
        Model: openai.GPT3Dot5Turbo,
        Messages: []openai.ChatCompletionMessage{
            {Role: "user", Content: "用100字介绍Go语言的优势"},
        },
        Stream: true,
    }
    
    stream, err := client.CreateChatCompletionStream(context.Background(), req)
    if err != nil {
        fmt.Fprintf(w, "data: Error: %v\n\n", err)
        flusher.Flush()
        return
    }
    defer stream.Close()
    
    // 处理流式响应
    for {
        response, err := stream.Recv()
        if err != nil {
            break
        }
        
        if len(response.Choices) > 0 {
            content := response.Choices[0].Delta.Content
            if content != "" {
                fmt.Fprintf(w, "data: %s\n\n", content)
                flusher.Flush()
            }
        }
    }
}

func main() {
    http.HandleFunc("/ai-stream", openAIStreamHandler)
    http.ListenAndServe(":8080"nil)
}

4. 前端接收实现

<!DOCTYPE html>
<html>
<body>
    <div id="output"></div>
    
    <script>
        const output = document.getElementById('output');
        const eventSource = new EventSource('http://localhost:8080/stream');
        
        eventSource.onmessage = function(event{
            output.textContent += event.data;
            // 自动滚动到底部
            output.scrollTop = output.scrollHeight;
        };
        
        eventSource.onerror = function(error{
            console.error('Stream error:', error);
            eventSource.close();
        };
    
</script>
</body>
</html>

5. 关键注意事项

  1. 错误处理:实现连接中断重连机制,使用context管理请求生命周期
  2. 性能优化:控制流传输速率,避免客户端处理压力过大
  3. 安全性:添加CORS配置,验证API密钥,限制请求频率
  4. 调试技巧:在Goland中设置断点,监控流传输过程中的变量变化

6. 本地大模型集成(以Ollama为例)

package main

import (
    "bufio"
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

func main() {
    http.HandleFunc("/local-ai"func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type""text/event-stream")
        w.Header().Set("Cache-Control""no-cache")
        w.Header().Set("Connection""keep-alive")
        
        flusher := w.(http.Flusher)
        
        // 构建请求体
        reqBody := map[string]interface{}{
            "model":  "llama3",
            "prompt""介绍Goland的主要功能",
            "stream"true,
        }
        jsonBody, _ := json.Marshal(reqBody)
        
        // 调用本地Ollama服务
        resp, _ := http.Post("http://localhost:11434/api/generate""application/json", bytes.NewBuffer(jsonBody))
        defer resp.Body.Close()
        
        scanner := bufio.NewScanner(resp.Body)
        for scanner.Scan() {
            var result map[string]interface{}
            json.Unmarshal(scanner.Bytes(), &result)
            
            if content, ok := result["response"].(string); ok && content != "" {
                fmt.Fprintf(w, "data: %s\n\n", content)
                flusher.Flush()
            }
        }
    })
    
    http.ListenAndServe(":8080"nil)
}
阅读: 19 | 发布时间: 2025-08-14 21:52:14