package main import ( "encoding/binary" "log" "sync" "time" ) // Register 操作注册表 var Register map[CommandType]func(worker *Worker) // init 初始化 func init() { Register = make(map[CommandType]func(worker *Worker)) Register[CMDChaoShengBoQingXi] = ChaoShengBoQingXi Register[CMDQingXiGuanFangShui] = QingXiGuanFangShui } // Worker 接收命令 type Worker struct { commandLock *sync.Mutex command *Command port *SerialPort // 串口相关 readlogsLock *sync.Mutex readlogs []Log writelogsLock *sync.Mutex isOperating bool writelogs []Log isStop int waitGroup *sync.WaitGroup } func OperatorOption(flag OperatorFlag) []byte { var buf []byte = make([]byte, 8, 8) buf[0] = byte(0xaa) buf[1] = byte(0x55) binary.BigEndian.PutUint16(buf[2:], uint16(flag)) check := byte(0) for _, b := range buf { check += b } buf[7] = check return buf } // Log 日志 type Log struct { Data []byte Time time.Time } // NewWorker 创建一个工人 func NewWorker() *Worker { w := &Worker{} w.isStop = 0 w.waitGroup = &sync.WaitGroup{} w.command = CreateCommand(CMDDoNothing) w.commandLock = new(sync.Mutex) w.readlogsLock = &sync.Mutex{} w.readlogs = make([]Log, 0, 1000) w.writelogsLock = &sync.Mutex{} w.writelogs = make([]Log, 0, 1000) w.isOperating = false return w } // Sensor 当前传感器状态 func (worker *Worker) Sensor() *Sensor { return NewSensor(worker.Read()) } func (worker *Worker) Write(data []byte) { worker.writelogsLock.Lock() worker.isOperating = true worker.writelogs = append(worker.writelogs, Log{data, time.Now()}) if len(worker.writelogs) >= 1000 { worker.writelogs = worker.writelogs[500:1000] } worker.writelogsLock.Unlock() } // Read 如果没有读到数据为nil func (worker *Worker) Read() (result []byte) { worker.readlogsLock.Lock() if len(worker.readlogs) > 0 { copy(worker.readlogs[len(worker.readlogs)-1].Data, result) } worker.readlogsLock.Unlock() return result } // SendCommand 发送任务 func (worker *Worker) SendCommand(cmdtype CommandType) { worker.commandLock.Lock() worker.command = CreateCommand(cmdtype) worker.isOperating = true worker.commandLock.Unlock() log.Println("SendCommand: ", worker.command, worker.command.commandTime) } func (worker *Worker) operator(wait *sync.WaitGroup) { defer wait.Done() if CurrentOS == "linux" { for { if worker.isStop > 0 { break } now := time.Now() worker.commandLock.Lock() if worker.isOperating { if now.Sub(worker.command.commandTime).Seconds() >= 5 { // TODO: 操作 if operate, ok := Register[worker.command.commands]; ok { operate(worker) } worker.isOperating = false } } worker.commandLock.Unlock() } } else { // Windows } } func (worker *Worker) status(wait *sync.WaitGroup) { defer wait.Done() if CurrentOS == "linux" { for { if worker.isStop > 0 { break } buf := make([]byte, 14) n, err := worker.port.linuxRPort.Read(buf) if err != nil { log.Println("linuxRPort read error:", err) } else { worker.readlogsLock.Lock() worker.readlogs = append(worker.readlogs, Log{buf, time.Now()}) if len(worker.readlogs) >= 1000 { worker.readlogs = worker.readlogs[500:1000] } log.Println("data size ", n) log.Println("data: ", buf) worker.readlogsLock.Unlock() } time.Sleep(time.Millisecond * 50) } } else { // windows for { if worker.isStop > 0 { break } var buf []byte n, err := worker.port.windowsRWPort.Read(buf) if err != nil { log.Println(err) } else { if n > 0 { log.Println("data size ", n) log.Panicln("data: ", buf) worker.readlogsLock.Lock() worker.readlogs = append(worker.readlogs, Log{buf, time.Now()}) if len(worker.readlogs) >= 1000 { worker.readlogs = worker.readlogs[500:1000] } log.Println("data size ", n) log.Panicln("data: ", buf) worker.readlogsLock.Unlock() } } time.Sleep(time.Millisecond * 50) } } } // Run 运行 func (worker *Worker) Run() { sp := NewSerialPort() sp.OpenPort() worker.port = sp worker.waitGroup.Add(1) go worker.status(worker.waitGroup) go worker.operator(worker.waitGroup) worker.waitGroup.Wait() }