Server-Sent Events (SSE) 接口实现

张开发
2026/4/16 3:46:37 15 分钟阅读

分享文章

Server-Sent Events (SSE) 接口实现
Server-Sent Events (SSE) 接口实现构建实时博客生成流本文是墨言博客助手 (InkWords)技术系列的第 20 篇完整源码请访问https://github.com/2692341798/InkWords引言为什么需要实时流式传输想象一下这样的场景你在一个在线文档编辑器里写文章每敲一个字页面就实时显示出来。这种即时反馈的体验远比点击生成→等待30秒→看到完整结果要好得多。在 AI 内容生成领域这个需求尤为迫切。一篇技术博客可能需要 2-3 分钟才能生成完毕如果让用户盯着空白页面等待体验会非常糟糕。这就是为什么我们需要Server-Sent Events (SSE)技术。什么是 SSESSE 是一种允许服务器主动向客户端推送数据的技术。与 WebSocket 的双向通信不同SSE 是单向的服务器推客户端收。这正好符合我们的需求AI 模型生成一段内容我们就推送给前端显示一段。前端请求生成后端接收请求启动 AI 生成任务生成第一段内容通过 SSE 推送前端实时显示生成第二段内容...持续生成...生成完成发送完成事件StreamAPI 结构设计让我们先看看StreamAPI的整体结构// StreamAPI handles SSE streaming requeststypeStreamAPIstruct{generatorService*service.GeneratorService decompositionService*service.DecompositionService}// NewStreamAPI creates a new StreamAPI instancefuncNewStreamAPI()*StreamAPI{returnStreamAPI{generatorService:service.NewGeneratorService(),decompositionService:service.NewDecompositionService(),}}代码解析StreamAPI是一个结构体包含两个服务实例generatorService负责单篇博客的生成decompositionService负责系列博客的分析和生成这种设计遵循了单一职责原则每个服务只做自己最擅长的事核心请求结构所有流式生成请求都使用相同的结构体typeGenerateRequeststruct{SourceContentstringjson:source_content// 源内容SourceTypestringjson:source_type// 内容类型Outline[]service.Chapterjson:outline// 大纲系列生成用GitURLstringjson:git_url// Git仓库地址SeriesTitlestringjson:series_title// 系列标题ParentIDstringjson:parent_id// 父博客ID续写用}核心处理器详解1. AnalyzeStreamHandlerGit仓库分析流这个处理器用于分析 Git 仓库生成博客大纲func(api*StreamAPI)AnalyzeStreamHandler(c*gin.Context){// 1. 解析请求体varreq GenerateRequestiferr:c.ShouldBindJSON(req);err!nil{c.JSON(http.StatusBadRequest,gin.H{error:Invalid request body})return}// 2. 验证必要参数ifreq.GitURL{c.JSON(http.StatusBadRequest,gin.H{error:git_url is required})return}// 3. 创建通信通道progressChan:make(chanstring)// 进度消息通道errChan:make(chanerror)// 错误通道// 4. 创建后台上下文即使客户端断开分析任务继续bgCtx:context.WithoutCancel(c.Request.Context())ctx:c.Request.Context()// 5. 使用 WaitGroup 确保 goroutine 完成varwg sync.WaitGroup wg.Add(1)// 6. 启动分析任务在单独的 goroutine 中gofunc(){deferwg.Done()api.decompositionService.AnalyzeStream(bgCtx,req.GitURL,progressChan,errChan)}()// 7. 设置 SSE 响应头c.Writer.Header().Set(Content-Type,text/event-stream)c.Writer.Header().Set(Cache-Control,no-cache)c.Writer.Header().Set(Connection,keep-alive)// 8. 核心流式处理逻辑c.Stream(func(w io.Writer)bool{select{case-ctx.Done():// 客户端断开连接gofunc(){for{select{case-progressChan:caseerr,ok:-errChan:if!ok||err!nil{return}}}}()returnfalsecaseerr,ok:-errChan:ifokerr!nil{c.SSEvent(error,err.Error())returnfalse}if!ok{errChannil}returntruecasemsg,ok:-progressChan:if!ok{c.SSEvent(done,[DONE])returnfalse}c.SSEvent(chunk,msg)returntruecase-time.After(15*time.Second):// 保活心跳防止代理超时c.SSEvent(ping,keepalive)returntrue}})// 9. 等待任务完成在单独的 goroutine 中gowg.Wait()}关键点解析通道通信使用 Go 的 channel 在 goroutine 之间传递数据上下文管理bgCtx确保分析任务不会因客户端断开而中断SSE 事件发送三种类型的事件chunk进度更新error错误信息done任务完成ping保活心跳超时处理15 秒发送一次心跳防止代理服务器断开连接2. GenerateBlogStreamHandler博客生成流这是最核心的生成处理器支持单篇和系列博客生成func(api*StreamAPI)GenerateBlogStreamHandler(c*gin.Context){// 1. 解析请求varreq GenerateRequestiferr:c.ShouldBindJSON(req);err!nil{c.JSON(http.StatusBadRequest,gin.H{error:Invalid request body})return}// 2. 创建通信通道chunkChan:make(chanstring)errChan:make(chanerror)ctx:c.Request.Context()// 3. 获取用户ID从认证中间件varuserID uuid.UUIDifv,exists:c.Get(user_id);exists{ifid,ok:v.(uuid.UUID);ok{userIDid}}ifuserIDuuid.Nil{// 测试回退生成临时UUIDuserIDuuid.New()}// 4. 根据是否有大纲决定生成模式iflen(req.Outline)0{// 系列生成模式varparentID uuid.UUIDifreq.ParentID!{parsedID,err:uuid.Parse(req.ParentID)iferrnil{parentIDparsedID}}ifparentIDuuid.Nil{parentIDuuid.New()}goapi.decompositionService.GenerateSeries(ctx,userID,parentID,req.SeriesTitle,req.Outline,req.SourceContent,req.SourceType,req.GitURL,chunkChan,errChan,)}else{// 单篇博客生成goapi.generatorService.GenerateBlogStream(ctx,userID,req.SourceContent,req.SourceType,chunkChan,errChan,)}// 5. 设置SSE头并开始流式响应c.Writer.Header().Set(Content-Type,text/event-stream)c.Writer.Header().Set(Cache-Control,no-cache)c.Writer.Header().Set(Connection,keep-alive)c.Stream(func(w io.Writer)bool{select{case-ctx.Done():// 客户端断开处理与AnalyzeStreamHandler类似gofunc(){for{select{case-chunkChan:caseerr,ok:-errChan:if!ok||err!nil{return}}}}()returnfalsecaseerr,ok:-errChan:ifokerr!nil{c.SSEvent(error,err.Error())returnfalse}if!ok{errChannil}returntruecasechunk,ok:-chunkChan:if!ok{c.SSEvent(done,[DONE])returnfalse}c.SSEvent(chunk,chunk)returntruecase-time.After(15*time.Second):c.SSEvent(ping,keepalive)returntrue}})}模式选择逻辑单篇生成直接调用generatorService.GenerateBlogStream系列生成需要处理父子关系调用decompositionService.GenerateSeries3. ContinueBlogStreamHandler续写生成流这个处理器允许用户基于已有的博客继续生成func(api*StreamAPI)ContinueBlogStreamHandler(c*gin.Context){// 1. 解析博客IDblogIDStr:c.Param(id)blogID,err:uuid.Parse(blogIDStr)iferr!nil{c.JSON(http.StatusBadRequest,gin.H{error:Invalid blog ID})return}// 2. 必须要有用户认证varuserID uuid.UUIDifv,exists:c.Get(user_id);exists{ifid,ok:v.(uuid.UUID);ok{userIDid}}ifuserIDuuid.Nil{c.JSON(http.StatusUnauthorized,gin.H{error:unauthorized})return}// 3. 创建通道并启动续写任务chunkChan:make(chanstring)errChan:make(chanerror)bgCtx:context.WithoutCancel(c.Request.Context())ctx:c.Request.Context()goapi.decompositionService.ContinueGeneration(bgCtx,userID,blogID,chunkChan,errChan)// 4. SSE流式响应逻辑与前面类似// ... 省略相似代码 ...}续写功能特点需要有效的用户认证基于特定的博客ID继续生成使用context.WithoutCancel确保任务不会中断SSE 事件格式详解SSE 事件有特定的格式要求。让我们看看 Gin 框架的c.SSEvent方法发送的是什么// 前端接收到的 SSE 数据格式event:chunkdata:这是生成的第一段内容event:chunkdata:这是生成的第二段内容event:pingdata:keepaliveevent:errordata:生成失败API调用超时event:donedata:[DONE]每个事件包含两部分event:- 事件类型chunk、error、done、pingdata:- 事件数据内容实战如何测试 SSE 接口使用 curl 测试# 测试分析流curl-XPOST http://localhost:8080/api/v1/stream/analyze\-HContent-Type: application/json\-d{git_url: https://github.com/2692341798/InkWords}\-N# 测试生成流curl-XPOST http://localhost:8080/api/v1/stream/generate\-HContent-Type: application/json\-d{source_content: Go语言并发编程, source_type: topic}\-N使用 JavaScript 测试!DOCTYPEhtmlhtmlbodydividoutput/divscriptconsteventSourcenewEventSource(/api/v1/stream/generate);eventSource.addEventListener(chunk,(event){document.getElementById(output).innerHTMLevent.data;});eventSource.addEventListener(error,(event){console.error(Error:,event.data);eventSource.close();});eventSource.addEventListener(done,(){console.log(Generation completed);eventSource.close();});/script/body/html性能优化与注意事项1. 内存管理使用 channel 进行数据传递避免大内存占用及时关闭不再使用的 channel使用context控制 goroutine 生命周期2. 连接管理15 秒心跳防止代理超时客户端断开时清理资源使用sync.WaitGroup确保 goroutine 正确退出3. 错误处理区分客户端错误和服务端错误错误信息通过 SSE 事件发送而不是 HTTP 状态码使用独立的 error channel 传递错误总结通过本文的讲解我们深入了解了 InkWords 后端 SSE 接口的实现架构清晰StreamAPI统一管理所有流式接口模式灵活支持单篇、系列、续写三种生成模式实时性强通过 SSE 实现真正的实时内容推送健壮性好完善的错误处理和资源管理扩展性强基于接口的设计便于未来添加新的流式功能SSE 技术为 AI 内容生成应用提供了优秀的用户体验让用户能够看到内容是如何一步步生成的而不是漫长等待后的惊喜。下期预告前端状态管理Zustand Store 设计在下一篇文章中我们将深入探讨前端如何优雅地管理 SSE 流式数据。你将学习到Zustand 状态管理库的核心概念如何设计流式数据的状态结构实时更新 UI 的最佳实践与 React 组件的高效集成方案敬请期待

更多文章