package mqtt import ( "context" "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/gtimer" "github.com/tiger1103/gfast/v3/third/plane/dj" "github.com/tiger1103/gfast/v3/third/plane/event" "strings" "time" ) var timer *gtimer.Timer var client mqtt.Client var eventMap map[string]*event.EventPublisher const ( osd = "osd" //设备端定频向云平台推送的设备属性(properties), 具体内容范围参见物模型内容 state = "state" //设备端按需上报向云平台推送的设备属性(properties), 具体内容范围参见物模型内容 services_reply = "services_reply" //设备对 service 的回复、处理结果 events = "events" //设备端向云平台发送的,需要关注和处理的事件。 比如SD满了,飞机解禁禁飞区等信息(事件范围参见物模型内容) status = "status" //设备上下线、更新拓扑 set_reply = "set_reply" //设备属性设置的响应 requests = "requests" //设备端向云平台发送请求,为了获取一些信息,比如上传的临时凭证 services = "services" //云平台向设备发送的服务(具体service identifier 见物模型内容)。 ) var MsgCallBackMap map[string]CB func InitMQTT() { eventMap = make(map[string]*event.EventPublisher) timer = gtimer.New() initMsgs() dj.InitError() connect() } func initMsgs() { MsgCallBackMap = make(map[string]CB) msgs := []CB{ {osd, dealOsd, nil}, {state, deal_state, nil}, {requests, deal_requests, nil}, {services_reply, deal_services_reply, nil}, {status, deal_status, nil}, {events, deal_events, nil}, {services, deal_services, nil}, } for _, msg := range msgs { MsgCallBackMap[msg.Flag] = msg } } type CB struct { Flag string Recv func(mqtt.Client, mqtt.Message) //接收消息 Reply func(mqtt.Client, mqtt.Message) //回复消息 } func connect() { host := g.Cfg().MustGet(gctx.New(), "mqtt.host").String() port := g.Cfg().MustGet(gctx.New(), "mqtt.port").String() username := g.Cfg().MustGet(gctx.New(), "mqtt.username").String() password := g.Cfg().MustGet(gctx.New(), "mqtt.password").String() clientid := g.Cfg().MustGet(gctx.New(), "mqtt.clientid").String() topics := g.Cfg().MustGet(gctx.New(), "mqtt.topics").String() fmt.Println(host, port, username, password, clientid, topics, "tcp://"+host+":"+port) opts := mqtt.NewClientOptions() opts.AddBroker("tcp://" + host + ":" + port) // 设置MQTT代理服务器地址 opts.SetClientID(clientid) // 设置客户端ID opts.SetPassword(password) // 设置客户端ID opts.SetUsername(username) // 设置客户端ID // 处理消息接收 opts.SetDefaultPublishHandler(receiveHandler) opts.SetConnectionLostHandler(disconnected) opts.SetOnConnectHandler(connected) opts.SetKeepAlive(1000 * time.Second) //超时等待,防止客户端掉线 opts.SetAutoReconnect(true) client = mqtt.NewClient(opts) // 连接MQTT代理服务器 if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } arr := strings.Split(topics, ",") for _, topic := range arr { // 订阅主题 fmt.Println("订阅主题", topic) if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) } //fmt.Println(topic) } //ctx := gctx.New() ////格式化 //DeviceFormat(ctx, "4SEDL9C001X8GE") //DroneFormat(ctx, "4SEDL9C001X8GE") //DeviceFormat(ctx, "7CTDM4100BG7HV") //DroneFormat(ctx, "7CTDM4100BG7HV") //飞机升级 //OtaCreateFunc(ctx, "8UUXN4P00A06NK") //gtimer.SetTimeout(gctx.New(), 2*time.Second, func(ctx context.Context) { //sn := "4SEDL9C001X8GE" //err, d := speaker_audio_play_start(sn) //if err != nil { // fmt.Println(err) // return //} //整一个pcm的音频文件 // point := dj.Point{} // point.Longitude = 106.541923087 // point.Latitude = 23.458531397 // point.Height = 823.8873291015625 + 10 // //err, d := Flight_authority_grab(sn) // //if err != nil { // // return // //} // err, d := Fly_to_point(sn, point) //fmt.Println(d) // fmt.Println(err) // // sn = "4TADL210010014" //中煤广西机库的sn // //// //sn = "4TADL2E001002D" //机库的sn // //// //sn = "1581F5BMD2323001S928" //飞机的sn // ////dev_sn := "1581F5BMD23280014131" //吉林飞机的sn // //dev_sn := "1581F5BMD238V00172JR" //吉林飞机的sn // //camera_index := "165-0-7" //机库的摄像头id // ////camera_index = "53-0-0" //飞机摄像头1的id // //camera_index = "39-0-7" //飞机摄像头2的id // //Debug_mode_open(sn) //调试模式,先调试模式才能飞行器开机,推流不需要調試 // //Debug_mode_close(sn) // //err, d := Cover_open(sn) //打开舱盖 // //err, d := Cover_close(sn) // //err, d := Drone_open(sn) //飞行器开机 // //err, d := Drone_close(sn) //飞行器关机 // //err, d := Device_reboot(sn) // // // //fmt.Println(d) // //if err != nil { // // fmt.Println(err) // //} //打开舱盖 // ////fmt.Println("打开舱盖") // //err, d := Live_start_push(sn, sn, camera_index) // //err, d := Live_stop_push(sn, sn, camera_index) // // //fmt.Println(d) // //if err != nil { // // fmt.Println(err) // // return // //} //关闭舱盖 // ////fmt.Println(d.Data.Result) // //// //device_reboot() // //// //if err != nil { // //// // return // //// //} // //// //fmt.Println(d) // //// //live_stop_push(sn, sn, camera_index) // //// //}) } func receiveHandler(client mqtt.Client, msg mqtt.Message) { topic := msg.Topic() arrs := strings.Split(topic, "/") tp := arrs[len(arrs)-1] if v, ok := MsgCallBackMap[tp]; ok { v.Recv(client, msg) if v.Reply != nil { v.Reply(client, msg) } } else { fmt.Println("非法topic", topic) } } func disconnected(client mqtt.Client, err error) { fmt.Println("断开了,准备重连") if err != nil { fmt.Println(err) g.Log("uav").Error(gctx.New(), err) } timer.Start() timer.Add(gctx.New(), time.Second*5, func(ctx context.Context) { connect() }) } func connected(client mqtt.Client) { fmt.Println("链接成功") timer.Stop() }