package ws2 import ( "context" "encoding/json" "fmt" "github.com/gogf/gf/v2/frame/g" "github.com/gorilla/websocket" "github.com/tiger1103/gfast/v3/api/constant" "log" "strings" "time" ) type OpenPushFlowReq struct { g.Meta `path:"/device/open" method:"post" tags:"视频安全帽相关" summary:"开启推流"` DevNum string `json:"devNum" dc:"设备id"` } type OpenPushFlowRes struct { Url string `json:"Url" dc:"返回地址"` } // 设备地址的 Map 映射通道 var deviceChannels = make(map[string]chan string) func (w WsRouter) OpenPushFlow(ctx context.Context, req *OpenPushFlowReq) (res *OpenPushFlowRes, err error) { // 创建设备ID对应的通道 deviceChan := make(chan string, 1) // 缓冲为1 deviceChannels[req.DevNum] = deviceChan // 触发WS的推流 SendDeviceEnablesPushFlow(constant.Conn, req.DevNum) // 等待接收 URL 或超时 select { case url := <-deviceChan: res = &OpenPushFlowRes{Url: url} case <-time.After(time.Second * 10): // 设置超时时间为10秒 err = fmt.Errorf("timeout waiting for URL") } // 清理工作 delete(deviceChannels, req.DevNum) close(deviceChan) return res, err } // 发送报文指定设备开启推流 func SendDeviceEnablesPushFlow(conn *websocket.Conn, DeviceId string) { activeDevicesMessage := map[string]string{ "act": "ma_open_rtsp", "device_id": DeviceId, } conn.WriteJSON(activeDevicesMessage) } type MaOpenRtspResponse struct { Cmd string `json:"cmd"` DeviceId string `json:"device_id"` Status bool `json:"status"` Msg string `json:"msg"` ApiUrl string `json:"api_url"` PlayUrl []string `json:"play_url"` MsgCode string `json:"msg_code"` WebrtcUrl string `json:"webrtc_url"` } // 处理函数 func HandleDeviceEnablesPushFlow(jsonString string) { var response MaOpenRtspResponse err := json.Unmarshal([]byte(jsonString), &response) if err != nil { log.Fatal("Error decoding JSON to struct:", err) } for _, url := range response.PlayUrl { // 截取 FLV 结尾的视频并发送到 HTTP 响应的通道中 if strings.HasSuffix(url, ".flv") { if deviceChan, ok := deviceChannels[response.DeviceId]; ok { deviceChan <- url // 将URL发送到对应的通道 } } } }