Files
zmkgC/third/plane/mqtt/msg_deal.go

1283 lines
40 KiB
Go
Raw Permalink Normal View History

2025-07-07 20:11:59 +08:00
package mqtt
import (
"encoding/json"
"errors"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gogf/gf/v2/crypto/gmd5"
"github.com/gogf/gf/v2/database/gdb"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/gtime"
_ "github.com/tiger1103/gfast-cache/cache"
"github.com/tiger1103/gfast/v3/api/v1/common/coryCommon"
"github.com/tiger1103/gfast/v3/internal/app/system/dao"
manageDeviceLogic "github.com/tiger1103/gfast/v3/internal/app/system/logic/manageDevice"
manageOperationLog "github.com/tiger1103/gfast/v3/internal/app/system/logic/manageOperationLog"
logic "github.com/tiger1103/gfast/v3/internal/app/system/logic/manageTaskRecord"
logicManageTaskRecordResource "github.com/tiger1103/gfast/v3/internal/app/system/logic/manageTaskRecordResource"
"github.com/tiger1103/gfast/v3/third/plane/dj"
"github.com/tiger1103/gfast/v3/third/plane/event"
"github.com/tiger1103/gfast/v3/third/plane/globe"
tool "github.com/tiger1103/gfast/v3/utility/coryUtils"
"golang.org/x/net/context"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"time"
)
// var (
//
// DroneCoordinates = "sn:" //redisKey 无人机坐标
// RainfallStr = "air:rainfall:" //redisKey 降雨量 {"0":"无雨","1":"小雨","2":"中雨","3":"大雨"}
// YunTai = "air:yuntai:" //redisKey 云台重置
//
// )
// Redis key prefixes. The gateway (dock) SN is appended to build the full key.
var (
	// DroneCoordinates prefixes the key that stores the aircraft's last
	// reported "longitude,latitude,elevation" string.
	DroneCoordinates = "sn:"
	// RainfallStr prefixes the key that stores the dock's last reported
	// rainfall level: {"0": none, "1": light, "2": moderate, "3": heavy}.
	RainfallStr = "air:rainfall:"
	// YunTai prefixes the keys used to throttle gimbal resets.
	YunTai = "air:yuntai:"
)

// Live-streaming settings, filled in by init from the "live.*" configuration keys.
var (
	Host string
	Rtmp string
	Http string
)
// init reads the live-streaming host and the RTMP/HTTP ports from the
// "live.host", "live.rtmp" and "live.http" configuration keys.
// MustGet panics if the configuration cannot be read.
func init() {
	ctx := context.Background()
	cfg := g.Cfg()
	Host = cfg.MustGet(ctx, "live.host").String()
	Rtmp = cfg.MustGet(ctx, "live.rtmp").String()
	Http = cfg.MustGet(ctx, "live.http").String()
}
// GetGateWaySnFromTopic returns the second-to-last "/"-separated segment of
// an MQTT topic, which is where the gateway SN sits in the topics handled by
// this package (presumably ".../{gateway_sn}/{suffix}" — confirm against the
// subscriptions).
//
// It returns "" when the topic has fewer than two segments instead of
// panicking with an out-of-range index.
func GetGateWaySnFromTopic(topic string) string {
	parts := strings.Split(topic, "/")
	if len(parts) < 2 {
		return ""
	}
	return parts[len(parts)-2]
}
// dealOsd handles an OSD (periodic telemetry) message from a dock or an
// aircraft.
//
// The device SN is the third segment of the topic. When it equals the
// payload's gateway SN the message comes from the dock; otherwise it comes
// from the aircraft. Dock payloads are forwarded to dealOsd3Cory (when they
// contain "network_state") or dealOsd4Cory. Aircraft payloads have their
// coordinates cached in Redis and are then pushed to the status websocket
// subscribers.
//
// Messages whose topic has fewer than three segments are ignored instead of
// panicking.
func dealOsd(client mqtt.Client, msg mqtt.Message) {
	// Holds either the dock payload or the aircraft payload for the websocket push.
	cory := dj.CoryAirportOrAircraft{}

	// Record rainfall first; this is independent of the rest of the handling.
	parsingOsd3(client, msg)

	// 0. Common envelope fields.
	pc := dj.Public{}
	if err := json.Unmarshal(msg.Payload(), &pc); err != nil {
		return
	}

	segments := strings.Split(msg.Topic(), "/")
	if len(segments) < 3 {
		// Malformed topic: there is no device SN to work with.
		return
	}
	sn := segments[2]

	cory.HangarSn = pc.Gateway
	payload := string(msg.Payload())

	// 1. Gateway SN equal to the topic SN means the dock itself is reporting.
	if pc.Gateway == sn {
		cory.Airport = payload
		if strings.Contains(payload, "network_state") { // network state report
			dealOsd3Cory(client, cory, msg)
		} else {
			dealOsd4Cory(client, cory, msg)
		}
		return
	}

	// Otherwise the aircraft is reporting.
	var entity *dj.M30tEntity
	if err := json.Unmarshal(msg.Payload(), &entity); err != nil {
		// Decoding stays best-effort: whatever was decoded before the error
		// is still used below, but the failure is no longer silent.
		fmt.Println("无人机OSD数据解析失败:", err)
	}
	if entity == nil {
		// A null payload carries nothing to store or push.
		return
	}

	// 1. Cache the aircraft's current coordinates in Redis.
	ctx := gctx.New()
	key := DroneCoordinates + pc.Gateway
	result := fmt.Sprintf("%v,%v,%v", entity.Data.Longitude, entity.Data.Latitude, entity.Data.Elevation)
	if _, err := g.Redis().Set(ctx, key, result); err != nil {
		fmt.Println("无人机坐标存储数据失败:", err)
		return
	}
	cory.Aircraft = payload
	cory.AircraftSn = sn

	// 2. Trigger return-to-home when the battery is at or below 70%.
	// NOTE(review): this starts a new Return_home goroutine for every
	// aircraft OSD message while the battery stays at or below the
	// threshold; there is no debounce. Confirm this is intended.
	if entity.Data.Battery.CapacityPercent <= 70 {
		go Return_home(ctx, pc.Gateway)
	}

	// 3. Push the aircraft telemetry to the websocket subscribers.
	start_push_cory(globe.WS_Status_map, cory)
}
// parsingOsd3 extracts the rainfall level from an OSD payload and stores it
// in Redis under RainfallStr+gateway, where Flighttask_prepare reads it
// before dispatching a task.
//
// Payloads that do not mention "rainfall" are ignored. If the payload cannot
// be decoded nothing is written, so a parse failure can no longer overwrite
// the stored value with a zero ("no rain") reading under an empty-gateway key.
func parsingOsd3(client mqtt.Client, msg mqtt.Message) {
	if !strings.Contains(string(msg.Payload()), "rainfall") {
		return
	}
	ctx := gctx.New()
	osd3 := dj.OSD3{}
	if err := json.Unmarshal(msg.Payload(), &osd3); err != nil {
		g.Log("uav").Error(ctx, err)
		return
	}
	if _, err := g.Redis().Set(ctx, RainfallStr+osd3.Gateway, osd3.Data.Rainfall); err != nil {
		g.Log("uav").Error(ctx, err)
	}
}
// func dealOsd1Cory(client mqtt.Client, info dj.CoryAirportOrAircraft, msg mqtt.Message) {
// start_push_cory(globe.WS_Status_map, info)
// }
//
// func dealOsd2Cory(client mqtt.Client, info dj.CoryAirportOrAircraft, msg mqtt.Message) {
// start_push_cory(globe.WS_Status_map, info)
// }
// dealOsd3Cory handles a dock OSD payload that carries network state.
// It saves the dock position from the payload, attaches the aircraft's last
// cached coordinates from Redis to info, and pushes info to the status
// websocket subscribers. Payloads that fail to decode are dropped.
func dealOsd3Cory(client mqtt.Client, info dj.CoryAirportOrAircraft, msg mqtt.Message) {
	var report dj.OSD3
	if json.Unmarshal(msg.Payload(), &report) != nil {
		return
	}

	position := dj.Point{}
	position.Longitude = report.Data.Longitude
	position.Latitude = report.Data.Latitude
	position.Height = report.Data.Height
	savePosition(GetGateWaySnFromTopic(msg.Topic()), position)

	// Read the aircraft's last stored coordinates; a failed read leaves them empty.
	last, _ := g.Redis().Get(gctx.New(), DroneCoordinates+report.Gateway)
	info.Coordinates = last.String()

	start_push_cory(globe.WS_Status_map, info)
}
// dealOsd4Cory forwards a dock OSD payload to the status websocket
// subscribers unchanged. The client and msg parameters are unused.
func dealOsd4Cory(client mqtt.Client, info dj.CoryAirportOrAircraft, msg mqtt.Message) {
	subscribers := globe.WS_Status_map
	start_push_cory(subscribers, info)
}
// savePosition stores the dock's position by delegating to
// manageDeviceLogic.LongitudeAndLatitudeEdit.
func savePosition(gateway_sn string, point dj.Point) {
	manageDeviceLogic.LongitudeAndLatitudeEdit(
		gateway_sn,
		point,
	)
}
// deal_events dispatches a message received on the events topic by method.
//
// Progress events for dock/aircraft actions (power on/off, cover, putter,
// reboot) are acknowledged with StatusReply and forwarded to the matching
// websocket subscriber map. Wayline progress is handled asynchronously by
// dealWaylineProgress, and media upload results are written to the database.
// The remaining methods are only printed. Payloads that fail to decode are
// dropped.
func deal_events(client2 mqtt.Client, message mqtt.Message) {
	var envelope dj.Public
	if err := json.Unmarshal(message.Payload(), &envelope); err != nil {
		return
	}

	switch envelope.Method {
	case dj.OtaProgress:
		fmt.Println("固件升级进度:", string(message.Payload()))

	case dj.Drone_open:
		fmt.Println("飞机开机进度")
		fmt.Println(string(message.Payload()))
		StatusReply(message)
		wspush(message, globe.WS_Progress_drone_open_map)

	case dj.Drone_close:
		StatusReply(message)
		fmt.Println("飞机关机进度")
		fmt.Println(string(message.Payload()))
		wspush(message, globe.WS_Progress_drone_close_map)

	case dj.Cover_open:
		StatusReply(message)
		wspush(message, globe.WS_Progress_cover_open_map)

	case dj.Cover_close:
		StatusReply(message)
		wspush(message, globe.WS_Progress_cover_close_map)

	case dj.Device_reboot:
		fmt.Println("设备重启进度")
		fmt.Println(string(message.Payload()))
		StatusReply(message)
		wspush(message, globe.WS_Progress_device_reboot_map)

	case dj.Putter_open:
		StatusReply(message)
		wspush(message, globe.WS_Progress_putter_open_map)

	case dj.Putter_close:
		StatusReply(message)
		wspush(message, globe.WS_Progress_putter_close_map)

	case dj.Flighttask_ready: // task ready
		fmt.Println("任务就绪")
		fmt.Println(string(message.Payload()))

	case dj.Return_home_info: // return-to-home information
		fmt.Println("返航信息")
		fmt.Println(string(message.Payload()))

	case dj.Flighttask_progress: // wayline task progress report
		fmt.Println("航线执行进度")
		go dealWaylineProgress(message)

	case dj.File_upload_callback: // media file upload result report
		fmt.Println("媒体文件上传结果上报")
		fmt.Println(string(message.Payload()))
		// Decode the upload result.
		uploaded := dj.FileUploadCallbackEntity{}
		if decodeErr := json.Unmarshal(message.Payload(), &uploaded); decodeErr != nil {
			log.Printf("媒体文件上传结果上报数据解析失败:%v", decodeErr)
		}
		// Persist the result to the database.
		// NOTE(review): this runs even when decoding failed above.
		logicManageTaskRecordResource.StoresBackMediaFilesToTheDatabaseFunc(gctx.New(), &uploaded)

	case dj.Fly_to_point_progress: // fly-to result notification
		fmt.Println("flyto 执行结果事件通知")
		fmt.Println(string(message.Payload()))

	case dj.Fileupload_progress: // file upload progress (remote log)
		fmt.Println("文件上传进度通知(远程日志)")
		fmt.Println(string(message.Payload()))
	}
}
// deal_services handles a message received on the services topic.
// It decodes the common envelope and recognises three methods, none of
// which currently triggers any action.
func deal_services(client2 mqtt.Client, message mqtt.Message) {
	var envelope dj.Public
	if json.Unmarshal(message.Payload(), &envelope) != nil {
		return
	}

	// Select the handling by method; every branch is a placeholder for now.
	switch envelope.Method {
	case dj.FileuploadList:
		// List of files the device can upload: nothing to do.
	case dj.FileuploadStart:
		// Start of a log file upload: nothing to do.
	case dj.OtaCreate:
		// OTA creation: nothing to do.
	}
}
// StatusReply acknowledges an events message. It echoes the request's
// method, tid and bid with result 0 and the current millisecond timestamp,
// and publishes the reply on the gateway's events-reply topic (QoS 1,
// retained). Payloads that fail to decode are not acknowledged.
func StatusReply(message mqtt.Message) {
	var request dj.Public
	if json.Unmarshal(message.Payload(), &request) != nil {
		return
	}

	ack := dj.StatusReply{}
	ack.Tid = request.Tid
	ack.Bid = request.Bid
	ack.Method = request.Method
	ack.Data.Result = 0
	ack.Timestamp = gtime.TimestampMilli()

	// A marshal failure is ignored here; the reply is published regardless.
	body, _ := json.Marshal(ack)
	topic := dj.GetEventsReplyTopic(GetGateWaySnFromTopic(message.Topic()))
	client.Publish(topic, 1, true, string(body))
}
// dealWaylineProgress processes a wayline (flight route) progress event.
//
// It does three things:
//  1. For status "ok" or "paused", updates the task record matching the
//     flight ID with the wayline ID, break-point details and timestamps.
//  2. While progress is in [0, 95), sends a gimbal reset with mode 1, at most
//     once per 60 seconds per gateway (throttled through a Redis key).
//  3. At progress 100, sends a gimbal reset with mode 0 under the same
//     throttle and, for status "ok", stamps "accomplish" on records where it
//     is still NULL.
//
// It returns the decode error if the payload cannot be parsed. After a
// successful decode the named result is never reassigned (the inner err
// variables are shadows), so the function returns nil.
func dealWaylineProgress(message mqtt.Message) (err error) {
waylineProgress := dj.WaylineProgress{}
err = json.Unmarshal(message.Payload(), &waylineProgress)
if err != nil {
fmt.Println(err)
return
}
entity := waylineProgress.Data.Output
//if waylineProgress.Data.Result > 0 {
// err = Geterror(waylineProgress.Data.Result)
// fmt.Println(err)
// return
//}
ctx := gctx.New()
// 1. Fill the wayline ID and break-point details into the task record that matches this flight ID.
//if strings.Contains(str, "wayline_id") {
if entity.Status == "ok" || entity.Status == "paused" {
// NOTE(review): "accomplish" is written on every ok/paused progress
// message, so the WhereNull("accomplish") update in step 3 presumably
// never matches a row — confirm the intended behavior.
gmp := g.Map{
"route_id": entity.Ext.WaylineId,
"interrupt_cause": entity.Ext.BreakPoint.BreakReason,
"dd_index": entity.Ext.BreakPoint.Index,
"dd_state": entity.Ext.BreakPoint.State,
"course_reversal": tool.MillisecondTimestamp(),
"accomplish": tool.MillisecondTimestamp(),
}
// Break-point progress and track ID are only written when present.
if entity.Ext.BreakPoint.Progress > 0 {
gmp["dd_progress"] = entity.Ext.BreakPoint.Progress
}
if entity.Ext.TrackId != "" {
gmp["track_id"] = entity.Ext.TrackId
}
// The update result and error are ignored.
dao.ManageTaskRecord.Ctx(ctx).
Where("flight_id", entity.Ext.FlightId).
OrderDesc("id").
Update(gmp)
}
//}
// 2. While the wayline is in progress (0 <= percent < 95), reset the gimbal with mode 1.
gatewaySn := GetGateWaySnFromTopic(message.Topic())
if entity.Progress.Percent >= 0 && entity.Progress.Percent < 95 {
key := YunTai + gatewaySn + ":1"
get, _ := g.Redis().Get(ctx, key)
// The key acts as a throttle: skip the reset while it is still set.
if !get.Bool() {
sxt, _ := dealWaylineProgressSQL(ctx, gatewaySn)
err, reply := Gimbal_reset(ctx, gatewaySn, sxt, 1)
if err == nil && reply.Data.Result == 0 {
// Remember the successful reset in Redis; the key expires after 60 seconds.
g.Redis().SetEX(ctx, key, true, 60)
}
}
}
// NOTE(review): the progress percentage is logged at Error level on every message.
g.Log("uav").Error(ctx, waylineProgress.Data.Output.Ext.FlightId+"返航进度---- "+strconv.Itoa(entity.Progress.Percent))
// 3. At 100%, reset the gimbal with mode 0 and mark the task as accomplished.
if entity.Progress.Percent == 100 {
key := YunTai + gatewaySn + ":0"
get, _ := g.Redis().Get(ctx, key)
if !get.Bool() {
sxt, _ := dealWaylineProgressSQL(ctx, gatewaySn)
err, reply := Gimbal_reset(ctx, gatewaySn, sxt, 0)
if err == nil && reply.Data.Result == 0 {
// Remember the successful reset in Redis; the key expires after 60 seconds.
g.Redis().SetEX(ctx, key, true, 60)
}
if entity.Status == "ok" {
// On reaching 100%, stamp the completion time on the task record if it is not set yet.
dao.ManageTaskRecord.Ctx(ctx).
Where("flight_id", waylineProgress.Data.Output.Ext.FlightId).
WhereNull("accomplish").
Update(g.Map{"accomplish": tool.MillisecondTimestamp()})
}
}
}
return err
}
func dealWaylineProgressSQL(ctx context.Context, gatewaySn string) (string, error) {
sxt, err := g.DB().Model("manage_device_video").Ctx(ctx).
Where("type = 1").Where("gateway_sn", gatewaySn).OrderDesc("id").Limit(1).Fields("son_sn").
Cache(gdb.CacheOption{
Duration: time.Hour,
Force: false,
}).Value()
return sxt.String(), err
}
// wspush forwards msg to every websocket in _map. A nil map is a no-op.
func wspush(msg mqtt.Message, _map globe.WS_MAP) {
	if _map == nil {
		return
	}
	start_push(_map, msg)
}
// start_push writes the raw MQTT payload to every websocket in _map while
// holding globe.MutexRw.
//
// The lock is released with defer, so a panic inside a socket write can no
// longer leave the mutex locked and block every later push. Write errors are
// ignored: delivery to subscribers is best-effort.
func start_push(_map globe.WS_MAP, msg mqtt.Message) {
	payload := msg.Payload()

	globe.MutexRw.Lock()
	defer globe.MutexRw.Unlock()
	for _, socket := range _map {
		// Message type 1 is presumably the websocket text frame — TODO confirm.
		socket.WriteMessage(1, payload)
	}
}
// start_push_cory pushes info as JSON to every websocket in _map. It is used
// by the OSD handlers, where info tells dock data apart from aircraft data.
//
// info is marshalled once, before taking the lock, rather than once per
// socket while holding it. If marshalling fails nothing is pushed. The lock
// is released with defer so a panicking write cannot leave it held. Write
// errors are ignored: delivery to subscribers is best-effort.
func start_push_cory(_map globe.WS_MAP, info dj.CoryAirportOrAircraft) {
	data, err := json.Marshal(info)
	if err != nil {
		fmt.Println(err)
		return
	}

	globe.MutexRw.Lock()
	defer globe.MutexRw.Unlock()
	for _, socket := range _map {
		// Message type 1 is presumably the websocket text frame — TODO confirm.
		socket.WriteMessage(1, data)
	}
}
// deal_requests handles a message received on the requests topic.
//
// Flight-task resource requests are delegated to flighttask_resource_get.
// Config and storage-config requests are answered on "<topic>_reply" with a
// default reply that carries the request's tid and bid. The other recognised
// methods are printed or ignored.
//
// The envelope is decoded once and reused, and a reply that fails to marshal
// is not published.
func deal_requests(client mqtt.Client, msg mqtt.Message) {
	pub := dj.Public{}
	if err := json.Unmarshal(msg.Payload(), &pub); err != nil {
		fmt.Println(err)
		return
	}

	switch pub.Method {
	case dj.Airport_organization_bind: // device bound to an organization (database work pending)
		fmt.Println("设备绑定到组织")
		fmt.Println(string(msg.Payload()))

	case dj.Airport_organization_get:
		// Nothing to do.

	case dj.Flighttask_resource_get:
		fmt.Println("获取任务资源了")
		fmt.Println(string(msg.Payload()))
		flighttask_resource_get(client, msg)

	case dj.Airport_bind_status:
		// Nothing to do.

	case dj.Config:
		reply := dj.ConfigReply{}
		reply.SetDefaultValue()
		reply.Tid = pub.Tid
		reply.Bid = pub.Bid
		marshal, err := json.Marshal(reply)
		if err != nil {
			fmt.Println(err)
			return
		}
		topic := msg.Topic() + "_reply"
		fmt.Println("配置更新开始推送", topic, string(marshal))
		client.Publish(topic, 1, true, string(marshal))

	case dj.Storage_config_get:
		get := dj.StorageConfigGetReply{}
		get.SetDefaultValue()
		get.Tid = pub.Tid
		get.Bid = pub.Bid
		marshal, err := json.Marshal(get)
		if err != nil {
			fmt.Println(err)
			return
		}
		topic := msg.Topic() + "_reply"
		fmt.Println("临时凭证开始推送", topic, string(marshal))
		client.Publish(topic, 1, true, string(marshal))
	}
}
// deal_state handles a state message (device properties reported on demand).
//
// It appends the raw payload to "output.txt" in the working directory,
// forwards the message to the status websocket subscribers, and, when the
// payload contains "live_capacity", registers any cameras not yet present in
// manage_device_video. The first device in the list is treated as the dock
// (type 0) and the second as the aircraft (type 1).
//
// The dock SN is captured whenever the first device has one. Previously it
// was captured only while inserting a new dock camera, so once the dock's
// cameras were stored the aircraft's cameras were never registered. File and
// database errors are reported, and a failed existence check skips that
// camera instead of inserting a possible duplicate.
func deal_state(client mqtt.Client, msg mqtt.Message) {
	payload := string(msg.Payload())

	// Append the payload, followed by a blank line, to output.txt. A Getwd
	// failure is harmless: the relative path resolves to the same directory.
	dir, _ := os.Getwd()
	filePath := filepath.Join(dir, "output.txt")
	file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		fmt.Println(err)
	} else {
		if _, err := file.WriteString(payload + "\n\n"); err != nil {
			fmt.Println(err)
		}
		// Close right away rather than holding the handle during the DB work below.
		file.Close()
	}

	fmt.Println("设备端按需上报向云平台推送的设备属性", payload)

	var ai *dj.AircraftInformation
	if err := json.Unmarshal(msg.Payload(), &ai); err != nil {
		return
	}
	if ai != nil {
		start_push(globe.WS_Status_map, msg)
	}

	if !strings.Contains(payload, "live_capacity") {
		return
	}

	var lscue *LiveStreamingCapabilityUpdateEntity
	if err := json.Unmarshal(msg.Payload(), &lscue); err != nil {
		// Decoding stays best-effort: whatever was decoded is still used.
		fmt.Println("《直播能力更新》数据转换失败!")
	}
	if lscue == nil {
		return
	}

	gatewaySn := ""
	var mp []g.Map
	for i, data := range lscue.Data.LiveCapacity.DeviceList {
		switch i {
		case 0: // dock and its cameras
			if data.Sn != "" {
				gatewaySn = data.Sn
			}
			for _, dataOne := range data.CameraList {
				count, err := g.DB().Model("manage_device_video").
					Where("gateway_sn", data.Sn).
					Where("device_sn", data.Sn).
					Where("son_sn", dataOne.CameraIndex).Count()
				if err != nil {
					fmt.Println(err)
					continue
				}
				if count == 0 {
					mp = append(mp, g.Map{
						"type":       i,
						"gateway_sn": data.Sn,
						"device_sn":  data.Sn,
						"son_sn":     dataOne.CameraIndex,
					})
				}
			}

		case 1: // aircraft and its cameras
			if gatewaySn == "" {
				// Without a dock SN the aircraft rows cannot be linked to a gateway.
				continue
			}
			for _, dataOne := range data.CameraList {
				count, err := g.DB().Model("manage_device_video").
					Where("gateway_sn", gatewaySn).
					Where("device_sn", data.Sn).
					Where("son_sn", dataOne.CameraIndex).Count()
				if err != nil {
					fmt.Println(err)
					continue
				}
				if count == 0 {
					mp = append(mp, g.Map{
						"type":       i,
						"gateway_sn": gatewaySn,
						"device_sn":  data.Sn,
						"son_sn":     dataOne.CameraIndex,
					})
				}
			}
		}
	}

	if len(mp) > 0 {
		if _, err := g.DB().Model("manage_device_video").Insert(mp); err != nil {
			fmt.Println(err)
		}
	}
}
// deal_status handles a message on the status topic and publishes exactly one
// reply on "sys/product/{sn}/status_reply", where {sn} is the third segment
// of the incoming topic.
//
// For update_topo the reply carries the current time and result 0. For any
// other method the reply echoes the timestamp, bid and tid decoded from the
// payload; the reply method is "update_topo" in both cases.
//
// Previously the update_topo case ended in a `break`, which only leaves the
// switch, so a second reply was published by the code after it. Topics with
// fewer than three segments are now ignored instead of panicking.
func deal_status(client mqtt.Client, msg mqtt.Message) {
	pub := dj.Public{}
	if err := json.Unmarshal(msg.Payload(), &pub); err != nil {
		fmt.Println(err)
		return
	}

	arr := strings.Split(msg.Topic(), "/")
	if len(arr) < 3 {
		return
	}
	replyTopic := "sys/product/" + arr[2] + "/status_reply"

	var reply dj.StatusReply
	reply.Method = "update_topo"
	if pub.Method == dj.Update_topo { // cory20250605
		fmt.Println("准备拓扑更新:", string(msg.Payload()))
		reply.Timestamp = gtime.Now().TimestampMilli()
		reply.Bid = pub.Bid
		reply.Tid = pub.Tid
		reply.Data.Result = 0
	} else {
		st := dj.Status{}
		if err := json.Unmarshal(msg.Payload(), &st); err != nil {
			// Still reply with whatever was decoded, but report the failure.
			fmt.Println(err)
		}
		reply.Timestamp = st.Timestamp
		reply.Bid = st.Bid
		reply.Tid = st.Tid
	}

	marshal, err := json.Marshal(reply)
	if err != nil {
		return
	}
	client.Publish(replyTopic, 1, true, marshal)
}
// flighttask_resource_get answers a device's request for a flight task's
// wayline file. It looks up the file for the requested flight ID and replies
// on the gateway's requests-reply topic with the file URL and fingerprint.
// Nothing is sent if the payload cannot be decoded or the lookup fails.
func flighttask_resource_get(client mqtt.Client, msg mqtt.Message) {
	var request dj.FlighttaskResourcGet
	if json.Unmarshal(msg.Payload(), &request) != nil {
		return
	}

	response := dj.FlighttaskResourcGetReply{}
	response.Tid = request.Tid
	response.Bid = request.Bid
	response.Timestamp = gtime.Now().TimestampMilli()
	response.Method = request.Method

	// The gateway SN is the second-to-last topic segment.
	segments := strings.Split(msg.Topic(), "/")
	replyTopic := dj.GetRequestsReplyTopic(segments[len(segments)-2])

	airline, err := logic.GetManageAirlineFile(request.Data.FlightId)
	if err != nil {
		return
	}

	response.Data.Result = 0
	response.Data.Output.File.Fingerprint = airline.Fingerprint
	response.Data.Output.File.Url = airline.Url
	ReplyMsg(replyTopic, response)
}
// File describes a downloadable resource by its URL and fingerprint.
type File struct {
	// Url is serialized as "url".
	Url string `json:"url"`
	// Fingerprint is serialized as "fingerprint".
	Fingerprint string `json:"fingerprint"`
}
// GetResource is a stub: it ignores flight_id and always returns an empty File.
func GetResource(flight_id string) File {
	var empty File
	return empty
}
//func Live_start_push(gateway_sn, devsn, camera_index string) (error, *dj.StartLiveReply) {
// st := dj.StartLive{}
// st.SetDefaultValue()
// st.Data.UrlType = 1
// //st.Data.Url = "serverIP=116.141.0.161&serverPort=7100&serverID=22010500002000000090&agentID=22010500002000000093&agentPassword=hik13579+&localPort=7060&channel=22010500002000000093"
// //st.Data.Url = "serverIP=jl.yj-3d.com&serverPort=15060&serverID=34020000002000000001&agentID=34020000001320000010&agentPassword=0000000&localPort=7060&channel=34020000001320000010"
// st.Data.VideoQuality = 4
// st.Data.VideoId = devsn + "/" + camera_index + "/normal-0"
// md5 := gmd5.MustEncryptString(st.Data.VideoId)
// st.Data.Url = "rtmp://" + coryCommon.Global + ":9980/live/" + md5
// topic := dj.GetServiceTopic(gateway_sn)
//
// //marshal, _ := json.Marshal(st)
// //fmt.Println("数据为:", string(marshal))
//
// response := send(topic, st, st.Tid)
//
// sl := dj.StartLiveReply{}
// err := json.Unmarshal([]byte(response), &sl)
// if err != nil {
// return err, nil
// }
// sl.Data.Url = "http://" + coryCommon.Global + ":9991/live/" + md5 + ".flv"
//
// return nil, &sl
//}
// Live_start_push asks the dock to start live-streaming the given camera.
//
// The video ID is "<devsn>/<camera_index>/normal-0" and its MD5 is used as
// the stream name. The device is told to push to
// rtmp://Host:Rtmp/live/<md5>; on success the reply's URL is replaced with
// the playback address http://Host:Http/live/<md5>.live.flv.
//
// It returns an error (and a nil reply) when the device's response cannot be
// decoded, which includes the case where send returned an empty response.
func Live_start_push(gateway_sn, devsn, camera_index string) (error, *dj.StartLiveReply) {
	request := dj.StartLive{}
	request.SetDefaultValue()
	request.Data.UrlType = 1
	request.Data.VideoQuality = 4

	videoID := devsn + "/" + camera_index + "/normal-0"
	request.Data.VideoId = videoID
	streamName := gmd5.MustEncryptString(videoID)
	request.Data.Url = "rtmp://" + Host + ":" + Rtmp + "/live/" + streamName

	raw := send(dj.GetServiceTopic(gateway_sn), request, request.Tid)

	reply := dj.StartLiveReply{}
	if err := json.Unmarshal([]byte(raw), &reply); err != nil {
		return err, nil
	}
	reply.Data.Url = "http://" + Host + ":" + Http + "/live/" + streamName + ".live.flv"
	return nil, &reply
}
// GB28181 多路推流无效废弃GB28181
//func Live_start_push(gateway_sn, devsn, camera_index string) (error, *dj.StartLiveReply) {
// gb := "34020000002000000001"
// port := strings.ReplaceAll(camera_index, "-", "")
// qvr := modifyString(gb, port)
// st := dj.StartLive{}
// st.SetDefaultValue()
// st.Data.UrlType = 3
// st.Data.VideoQuality = 4
// st.Data.VideoId = devsn + "/" + camera_index + "/normal-0"
// st.Data.Url = "serverIP=119.45.210.154&serverPort=15060&serverID=" + gb + "&agentID=" + qvr + "&agentPassword=12345678&localPort=" + port + "&channel=" + qvr
// fmt.Println(st.Data.Url, gateway_sn)
// topic := dj.GetServiceTopic(gateway_sn)
// response := send(topic, st, st.Tid)
// sl := dj.StartLiveReply{}
// err := json.Unmarshal([]byte(response), &sl)
// if err != nil {
// return err, nil
// }
// sl.Data.Url = "http://119.45.210.154:10000/api/v1/stream/start?serial=" + qvr + "&code=" + qvr
//
// return nil, &sl
//}
// modifyString overlays qvr onto the tail of gb.
//
// When gb is at least as long as qvr, the last len(qvr) bytes of gb are
// replaced by qvr. When gb is shorter, the result is gb, followed by
// len(qvr)-len(gb) zeros, followed by qvr.
func modifyString(gb, qvr string) string {
	n := len(qvr)
	if len(gb) < n {
		padding := strings.Repeat("0", n-len(gb))
		return gb + padding + qvr
	}
	// Keep the head of gb and swap its tail for qvr.
	return gb[:len(gb)-n] + qvr
}
// Live_stop_push asks the dock to stop live-streaming the given camera,
// identified by the video ID "<devsn>/<camera_index>/normal-0".
// It returns an error (and a nil reply) when the device's response cannot be
// decoded, which includes the case where send returned an empty response.
func Live_stop_push(gateway_sn, devsn, camera_index string) (error, *dj.StopLiveReply) {
	request := dj.StopLive{}
	request.SetDefaultValue()
	request.Data.VideoId = devsn + "/" + camera_index + "/normal-0"

	raw := send(dj.GetServiceTopic(gateway_sn), request, request.Tid)

	reply := dj.StopLiveReply{}
	if err := json.Unmarshal([]byte(raw), &reply); err != nil {
		return err, nil
	}
	return nil, &reply
}
// deal_services_reply handles a reply on the services-reply topic. It prints
// the reply for the methods it recognises and then hands the payload to the
// listener that send registered in eventMap under the reply's tid, removing
// that listener afterwards.
//
// The log line for the file-upload list used to be a string literal that had
// swallowed a pasted chunk of source code; the intended message is restored.
// NOTE(review): the swallowed text looked like logic that answered the list
// with a dj.LogFileUploadReply to start the upload. It never executed in this
// form and is not restored here — confirm whether it should be.
// NOTE(review): eventMap is a plain map that is also written by send,
// presumably from a different goroutine — confirm whether it needs a lock.
func deal_services_reply(client mqtt.Client, msg mqtt.Message) {
	fmt.Println("消息响应了", msg.Topic())
	pub1 := dj.PublicTwo{}
	if err := json.Unmarshal(msg.Payload(), &pub1); err != nil {
		return
	}

	// Print the reply according to its method.
	switch pub1.Method {
	case dj.FileuploadList: // list of files the device can upload
		fmt.Println("services_reply获取设备可上传的文件列表==================>", string(msg.Payload()))
	case dj.FileuploadStart:
		fmt.Println("services_reply发起日志文件上传==================>", string(msg.Payload()))
	case dj.DeviceFormat:
		fmt.Println("机场数据格式化==================>", string(msg.Payload()))
	case "drone_format":
		fmt.Println("飞行器数据格式化==================>", string(msg.Payload()))
	case dj.ReturnHome1:
		fmt.Println("返航: ", string(msg.Payload()))
	case dj.OtaCreate:
		fmt.Println("固件升级: ", string(msg.Payload()))
	}

	// Wake up the caller waiting in send for this tid, if there is one.
	if publisher, ok := eventMap[pub1.Tid]; ok {
		publisher.Publish(msg.Payload())
		delete(eventMap, pub1.Tid)
	}
}
// ReplyMsg marshals dt to JSON and publishes it on topic (QoS 1, retained).
// Nothing is published if dt cannot be marshalled.
func ReplyMsg(topic string, dt interface{}) {
	body, err := json.Marshal(dt)
	if err == nil {
		text := string(body)
		fmt.Println("回复任务资源消息", topic, text)
		client.Publish(topic, 1, true, text)
	}
}
func send(topic string, dt interface{}, tid string) string {
marshal, err := json.Marshal(dt)
if err != nil {
return ""
}
fmt.Println("開始推送", topic, string(marshal))
client.Publish(topic, 1, true, string(marshal))
mqttResponseChan := make(chan string)
timeout := time.After(15 * time.Second) // 创建一个3秒后超时的定时器
e := event.GetEvent(func(argc ...any) {
/*这里的代码后执行*/
fmt.Println("响应了")
delete(eventMap, tid)
b := argc[0].([]byte)
response := string(b)
fmt.Println(response)
mqttResponseChan <- response
})
eventMap[tid] = e
select {
case response := <-mqttResponseChan:
return response
case <-timeout:
return ""
}
//// 这里的代码会比前面的事件内部回调先执行,所以需 阻塞等待 MQTT 响应消息
//response := <-mqttResponseChan
//return response
}
// Device_reboot sends a dj.Reboot command with default values to the dock and
// returns the device's reply. Unlike the other command helpers in this file
// it does not write an operation log.
func Device_reboot(gateway_sn string) (error, *dj.UsualReply) {
	var cmd dj.Reboot
	cmd.SetDefaultValue()
	err, reply := send_msg(gateway_sn, cmd)
	return err, reply
}
// send_msg sends the command in to the dock's services topic and decodes the
// response into a dj.UsualReply.
//
// The command is round-tripped through JSON into a dj.Public so its tid can
// be read without knowing the concrete type. An error (with a nil reply) is
// returned when the command cannot be encoded or the response cannot be
// decoded, which includes the case where send returned an empty response.
func send_msg(gateway_sn string, in interface{}) (error, *dj.UsualReply) {
	encoded, err := json.Marshal(in)
	if err != nil {
		return err, nil
	}

	var envelope dj.Public
	if err = json.Unmarshal(encoded, &envelope); err != nil {
		return err, nil
	}

	raw := send(dj.GetServiceTopic(gateway_sn), in, envelope.Tid)

	reply := dj.UsualReply{}
	if err = json.Unmarshal([]byte(raw), &reply); err != nil {
		return err, nil
	}
	return nil, &reply
}
// Debug_mode_open sends a dj.DebugOpen command with default values to the
// dock. When a reply is received its result code is written to the operation
// log. It returns the send error (if any) and the reply.
func Debug_mode_open(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	cmd := dj.DebugOpen{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Debug_mode_close sends a dj.DebugClose command with default values to the
// dock. When a reply is received its result code is written to the operation
// log. It returns the send error (if any) and the reply.
func Debug_mode_close(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	cmd := dj.DebugClose{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Putter_close sends a dj.PutterClose command with default values to the
// dock. When a reply is received its result code is written to the operation
// log. It returns the send error (if any) and the reply.
func Putter_close(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	cmd := dj.PutterClose{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Putter_open sends a dj.PutterOpen command with default values to the dock.
// When a reply is received its result code is written to the operation log.
// It returns the send error (if any) and the reply.
func Putter_open(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	cmd := dj.PutterOpen{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Charge_close sends a dj.ChargeClose command with default values to the
// dock. When a reply is received its result code is written to the operation
// log. It returns the send error (if any) and the reply.
func Charge_close(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	cmd := dj.ChargeClose{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Charge_open sends a dj.ChargeOpen command with default values to the dock.
// When a reply is received its result code is written to the operation log.
// It returns the send error (if any) and the reply.
func Charge_open(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	cmd := dj.ChargeOpen{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Cover_open sends a dj.CoverOpen command with default values to the dock.
// When a reply is received its result code is written to the operation log.
// It returns the send error (if any) and the reply.
func Cover_open(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	cmd := dj.CoverOpen{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Cover_close sends a dj.CoverClose command with default values to the dock.
// When a reply is received its result code is written to the operation log.
// It returns the send error (if any) and the reply.
func Cover_close(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	cmd := dj.CoverClose{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Drone_open sends a dj.DroneOpen command with default values to the dock.
// When a reply is received its result code is written to the operation log.
// It returns the send error (if any) and the reply.
func Drone_open(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	fmt.Println("飞行器开机")

	cmd := dj.DroneOpen{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Drone_close sends a dj.DroneClose command with default values to the dock.
// When a reply is received its result code is written to the operation log.
// It returns the send error (if any) and the reply.
func Drone_close(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	cmd := dj.DroneClose{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Flighttask_execute asks the dock to execute the flight task identified by
// flight_id. When a reply is received its result code is written to the
// operation log. It returns the send error (if any) and the reply.
func Flighttask_execute(ctx context.Context, gateway_sn, flight_id string) (error, *dj.UsualReply) {
	cmd := dj.FlighttaskExecute{}
	cmd.SetDefaultValue()
	cmd.Data.FlightId = flight_id

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Flighttask_undo asks the dock to cancel the flight tasks listed in
// flight_ids. When a reply is received its result code is written to the
// operation log. It returns the send error (if any) and the reply.
func Flighttask_undo(ctx context.Context, gateway_sn string, flight_ids []string) (error, *dj.UsualReply) {
	cmd := dj.FlighttaskUndo{}
	cmd.SetDefaultValue()
	cmd.Data.Flight_ids = flight_ids

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Flighttask_pause asks the dock to pause the current wayline. When a reply
// is received its result code is written to the operation log. It returns
// the send error (if any) and the reply.
func Flighttask_pause(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	cmd := dj.FlighttaskPause{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Flighttask_recovery asks the dock to resume a paused wayline. When a reply
// is received its result code is written to the operation log. It returns
// the send error (if any) and the reply.
func Flighttask_recovery(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	cmd := dj.FlighttaskRecovery{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Return_home asks the dock to bring the aircraft home (one-key return).
// When a reply is received its result code is written to the operation log.
// It returns the send error (if any) and the reply.
func Return_home(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	cmd := dj.ReturnHome{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Return_home_cancel is meant to cancel a return-to-home in progress. When a
// reply is received its result code is written to the operation log. It
// returns the send error (if any) and the reply.
//
// NOTE(review): the command sent is a dj.FlighttaskRecovery, the same type
// Flighttask_recovery uses. This looks like a copy-paste slip — confirm
// whether a dedicated return-home-cancel command should be sent instead.
func Return_home_cancel(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	cmd := dj.FlighttaskRecovery{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Camera_photo_take asks the camera identified by payload_index to take a
// single photo. When a reply is received its result code is written to the
// operation log. It returns the send error (if any) and the reply.
func Camera_photo_take(ctx context.Context, gateway_sn, payload_index string) (error, *dj.UsualReply) {
	cmd := dj.CameraPhotoTake{}
	cmd.SetDefaultValue()
	cmd.Data.PayloadIndex = payload_index

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Gimbal_reset asks the gimbal identified by payload_index to reset using
// reset_mode. When a reply is received its result code is written to the
// operation log. It returns the send error (if any) and the reply.
func Gimbal_reset(ctx context.Context, gateway_sn, payload_index string, reset_mode int) (error, *dj.UsualReply) {
	cmd := dj.GimbalReset{}
	cmd.SetDefaultValue()
	cmd.Data.ResetMode = reset_mode
	cmd.Data.PayloadIndex = payload_index

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Live_set_quality sets the live-stream quality of the video
// "<two_sn>/<payload_index>/normal-0" to enum. When a reply is received its
// result code is written to the operation log. It returns the send error (if
// any) and the reply.
func Live_set_quality(ctx context.Context, gateway_sn, two_sn, payload_index string, enum int) (error, *dj.UsualReply) {
	cmd := dj.LiveSetQuality{}
	cmd.SetDefaultValue()
	cmd.Data.VideoQuality = enum
	cmd.Data.VideoId = two_sn + "/" + payload_index + "/normal-0"

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
///*设置直播镜头*/
//func Camera_photo_take(gateway_sn, payload_index string) (error, *dj.UsualReply) {
// co := dj.CameraPhotoTake{}
// co.SetDefaultValue()
// co.Data.PayloadIndex = payload_index
// return send_msg(gateway_sn, co)
//}
// Flighttask_prepare dispatches a flight task to the dock.
//
// The task is refused when the dock's last reported rainfall level cannot be
// read from Redis, or when that level is greater than zero. Otherwise the
// task fields, and the break point when one is supplied (resume from break
// point), are copied from prepareRes into a fresh command that is sent to
// the dock.
//
// It returns the command's tid, the command's timestamp, an error, and the
// device reply. When a reply is received its result code is written to the
// operation log.
//
// The rainfall-availability check used to compare the stored value with a
// single space and to run only when the Redis read succeeded, so the
// "rainfall unavailable" refusal could never trigger. A Redis failure or a
// missing/blank value now refuses the task.
// NOTE(review): a dock that has never reported rainfall is now refused —
// confirm this fail-safe behavior is wanted.
func Flighttask_prepare(ctx context.Context, gateway_sn string, prepareRes *dj.FlighttaskPrepareRes) (string, int64, error, *dj.UsualReply) {
	if prepareRes == nil {
		return "", 0, errors.New("prepareRes is nil"), nil
	}

	get, err := g.Redis().Get(ctx, RainfallStr+gateway_sn)
	if err != nil || strings.TrimSpace(get.String()) == "" {
		return "", 0, errors.New("无法获取降雨量"), nil
	}
	if get.Int() > 0 {
		return "", 0, errors.New("机舱外正在下雨,无法下发任务!"), nil
	}

	co := dj.FlighttaskPrepareRes{}
	co.SetDefaultValueCory()
	data := prepareRes.Data
	co.Data.FlightId = data.FlightId
	co.Data.TaskType = data.TaskType
	co.Data.ExecuteTime = data.ExecuteTime
	co.Data.WaylineType = data.WaylineType
	co.Data.File = data.File
	co.Data.RthAltitude = data.RthAltitude
	co.Data.RthMode = data.RthMode
	co.Data.OutOfControlAction = data.OutOfControlAction
	co.Data.ExitWaylineWhenRcLost = data.ExitWaylineWhenRcLost

	// Break-point information, present only when resuming a flight.
	if data.BreakPoint != nil {
		co.Data.BreakPoint = &dj.BreakPoint{
			Index:     data.BreakPoint.Index,
			State:     data.BreakPoint.State,
			Progress:  data.BreakPoint.Progress,
			WaylineId: data.BreakPoint.WaylineId,
		}
	}

	err, reply := send_msg(gateway_sn, co)
	tid := co.Tid
	bid := co.Timestamp // the second result is the command's timestamp, despite the name
	if err == nil {
		// Record the operation log.
		manageOperationLog.AddLog(ctx, gateway_sn, co.Method, "down", reply.Data.Result)
	}
	return tid, bid, err, reply
}
// Flighttask_resource_get sends the task resource (file URL and fingerprint
// from fileEneiey) to the dock. When a reply is received its result code is
// written to the operation log. It returns the send error (if any) and the
// reply.
func Flighttask_resource_get(ctx context.Context, gateway_sn string, fileEneiey *dj.FileEntity) (error, *dj.UsualReply) {
	cmd := dj.FlighttaskResourcGetReply{}
	cmd.SetDefaultValue()
	cmd.Data.Output.File.Fingerprint = fileEneiey.Fingerprint
	cmd.Data.Output.File.Url = fileEneiey.Url

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Geterror converts a device result code into an error. Codes found in
// dj.GetErrorMap use the mapped message; any other code yields an
// "unknown error" message that includes the numeric code.
func Geterror(code int) error {
	entry, known := dj.GetErrorMap()[code]
	if !known {
		return globe.GetErrors("未知错误:" + strconv.Itoa(code))
	}
	return globe.GetErrors(entry.Msg)
}
// Fly_to_point asks the aircraft to fly to point, with a maximum speed of 3
// and a fly-to ID taken from the current microsecond timestamp. When a reply
// is received its result code is written to the operation log. It returns
// the send error (if any) and the reply.
func Fly_to_point(ctx context.Context, gateway_sn string, point dj.Point) (error, *dj.UsualReply) {
	cmd := dj.FlyToPoint{}
	cmd.SetDefaultValue()
	cmd.Data.Points = append(cmd.Data.Points, point)
	cmd.Data.MaxSpeed = 3
	cmd.Data.FlyToId = gtime.TimestampMicroStr()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Fly_to_point_stop asks the aircraft to stop flying to the target point.
// When a reply is received its result code is written to the operation log.
// It returns the send error (if any) and the reply.
func Fly_to_point_stop(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	cmd := dj.FlyToPointStop{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Flight_authority_grab asks the dock to grab flight control authority. When
// a reply is received its result code is written to the operation log. It
// returns the send error (if any) and the reply.
func Flight_authority_grab(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	cmd := dj.FlightAuthorityGrab{}
	cmd.SetDefaultValue()

	err, reply := send_msg(gateway_sn, cmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	manageOperationLog.AddLog(ctx, gateway_sn, cmd.Method, "down", reply.Data.Result)
	return nil, reply
}
// Fileupload_list (remote logs) asks gateway_sn for the list of files the
// device can upload, passing module as the message's ModuleList.
//
// The function returns whatever send_msg returned; when send_msg reports no
// error, the reply's result code is written to the operation log with
// direction "down".
func Fileupload_list(ctx context.Context, gateway_sn string, module []string) (error, *dj.UsualReply) {
	var listReq dj.FileUpLoadListDown
	listReq.SetDefaultValue()
	listReq.Data.ModuleList = module

	err, reply := send_msg(gateway_sn, listReq)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	// NOTE(review): assumes reply is non-nil whenever err is nil — confirm against send_msg.
	manageOperationLog.AddLog(ctx, gateway_sn, listReq.Method, "down", reply.Data.Result)
	return err, reply
}
//// DeviceFormat 机场数据格式化
//func DeviceFormat(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
// co := dj.Public{}
// co.SetDefault()
// co.Method = "device_format"
// err, reply := send_msg(gateway_sn, co)
// if err == nil { //记录操作日志
// //manageOperationLog.AddLog(ctx, gateway_sn, co.Method, "down", reply.Data.Result)
// }
// return err, reply
//}
//
//// DroneFormat 飞行器数据格式化
//func DroneFormat(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
// co := dj.Public{}
// co.SetDefault()
// co.Method = "drone_format"
// err, reply := send_msg(gateway_sn, co)
// if err == nil { //记录操作日志
// manageOperationLog.AddLog(ctx, gateway_sn, co.Method, "down", reply.Data.Result)
// }
// return err, reply
//}
// Speaker_tts_play_start sends a speaker TTS play-start message to gateway_sn.
//
// Every payload value is fixed in code: name and text are both "111", the MD5
// is a literal, and PsdkIndex is 2. The function returns whatever send_msg
// returned; when send_msg reports no error, the reply's result code is written
// to the operation log with direction "down".
func Speaker_tts_play_start(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	var ttsCmd dj.SpeakerTtsPlayStart
	ttsCmd.SetDefaultValue()
	ttsCmd.Data.PsdkIndex = 2
	ttsCmd.Data.Tts.Name = "111"
	ttsCmd.Data.Tts.Text = "111"
	// NOTE(review): the MD5 is a fixed literal and is not derived from Text here —
	// confirm it is the value the device expects for this payload.
	ttsCmd.Data.Tts.Md5 = "0bfb9bceee974f41a6ddfd81521bd795"

	err, reply := send_msg(gateway_sn, ttsCmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	// NOTE(review): assumes reply is non-nil whenever err is nil — confirm against send_msg.
	manageOperationLog.AddLog(ctx, gateway_sn, ttsCmd.Method, "down", reply.Data.Result)
	return err, reply
}
// speaker_audio_play_start sends a speaker audio play-start message to
// gateway_sn.
//
// The payload is fixed in code: PsdkIndex 0 and a PCM file named "output"
// whose URL is built from coryCommon.Global on port 8899, with a literal MD5.
// The function returns whatever send_msg returned; when send_msg reports no
// error, the reply's result code is written to the operation log with
// direction "down".
func speaker_audio_play_start(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	var audioCmd dj.SpeakerAudioPlayStart
	audioCmd.SetDefaultValue()
	audioCmd.Data.PsdkIndex = 0

	// Describe the audio file the device should fetch and play.
	audioCmd.Data.File.Name = "output"
	audioCmd.Data.File.Format = "pcm"
	audioCmd.Data.File.Md5 = "f69050c198f3a765a56020f5470d3bdf"
	audioCmd.Data.File.Url = "http://" + coryCommon.Global + ":8899/file/masterMask/coryStorageTemplate/output.pcm"

	err, reply := send_msg(gateway_sn, audioCmd)
	if err != nil {
		return err, reply
	}
	// Record the operation log.
	// NOTE(review): assumes reply is non-nil whenever err is nil — confirm against send_msg.
	manageOperationLog.AddLog(ctx, gateway_sn, audioCmd.Method, "down", reply.Data.Result)
	return err, reply
}
// OtaCreateFunc sends an OTA-create (firmware upgrade) message to gateway_sn.
//
// The device list is fixed in code and holds a single entry for the
// M4TD_13.01.00.07 package. A line is printed to standard output after the
// message has been handed to send_msg, whether or not send_msg reported an
// error. ctx is accepted but unused: no operation log is written here.
// The function returns whatever send_msg returned.
func OtaCreateFunc(ctx context.Context, gateway_sn string) (error, *dj.UsualReply) {
	devices := []dj.OtaCreateDevice{
		{
			SN:                  "1581F8HGX253U00A0626",
			ProductVersion:      "13.01.00.07",
			FirmwareUpgradeType: 3,
			FileName:            "M4TD_13.01.00.07_pro.zip",
			FileSize:            712498,
			FileURL:             "http://zmkg.cqet.top:8899/file/temporary/M4TD_13.01.00.07_pro.zip",
			MD5:                 "acb28f172e62b38d2bfd797a522076a9",
		},
		// Disabled entry for the DOCK3 package, kept for reference:
		//{
		//	SN:                  "8UUXN4P00A06NK",
		//	ProductVersion:      "13.01.00.07",
		//	FirmwareUpgradeType: 3,
		//	FileName:            "DOCK3_13.01.00.07_pro.zip",
		//	FileSize:            229252,
		//	FileURL:             "http://zmkg.cqet.top:8899/file/temporary/DOCK3_13.01.00.07_pro.zip",
		//	MD5:                 "c217ee9a0bfc4cdfdda0a89407c504d7",
		//},
	}

	sp := dj.OtaCreateEntity{}
	sp.Data = struct {
		DeviceList []dj.OtaCreateDevice `json:"devices"`
	}{DeviceList: devices}
	sp.SetDefaultValue()

	err, reply := send_msg(gateway_sn, sp)
	fmt.Println("开始固件升级.......")
	// Operation logging is currently disabled for this command:
	//if err == nil {
	//	manageOperationLog.AddLog(ctx, gateway_sn, sp.Method, "down", reply.Data.Result)
	//}
	return err, reply
}
//// 获取文件的md5和大小
//func aaaaa() {
//
// path := "C:\\Users\\MSI\\Downloads\\M4TD_13.01.00.07_pro.zip"
// // 检查文件是否为ZIP格式
// if !strings.HasSuffix(strings.ToLower(path), ".zip") {
// fmt.Println("错误请提供ZIP格式的文件")
// return
// }
//
// // 获取文件大小
// fileInfo, err := os.Stat(path)
// if err != nil {
// fmt.Printf("无法获取文件信息: %v\n", err)
// return
// }
// sizeKB := float64(fileInfo.Size()) / 1024.0
// fmt.Printf("ZIP文件大小: %.2f KB\n", sizeKB)
//
// // 计算MD5
// file, err := os.Open(path)
// if err != nil {
// fmt.Printf("无法打开文件: %v\n", err)
// return
// }
// defer file.Close()
//
// md5Hash := md5.New()
// if _, err := io.Copy(md5Hash, file); err != nil {
// fmt.Printf("计算MD5失败: %v\n", err)
// return
// }
// md5Sum := hex.EncodeToString(md5Hash.Sum(nil))
// fmt.Printf("ZIP文件MD5: %s\n", md5Sum)
//}