DPF/worker.go

232 lines
4.2 KiB
Go
Raw Permalink Normal View History

2019-11-01 13:47:11 +00:00
package main
import (
2019-11-03 18:00:35 +00:00
"encoding/binary"
2019-11-01 13:47:11 +00:00
"log"
"sync"
"time"
)
2019-11-02 07:48:16 +00:00
// Register 操作注册表
2019-11-04 06:29:08 +00:00
var Register map[CommandType]func(worker *Worker)
2019-11-02 07:48:16 +00:00
// init 初始化
func init() {
2019-11-04 06:29:08 +00:00
Register = make(map[CommandType]func(worker *Worker))
2019-11-03 18:00:35 +00:00
2019-11-04 06:29:08 +00:00
Register[CMDChaoShengBoQingXi] = ChaoShengBoQingXi
Register[CMDQingXiGuanFangShui] = QingXiGuanFangShui
2019-11-02 07:48:16 +00:00
}
2019-11-01 13:47:11 +00:00
// Worker accepts commands, runs the operation registered for them, and keeps
// bounded in-memory logs of the data read from the port and of the data
// recorded through Write.
type Worker struct {
// commandLock is held around accesses to command in SendCommand and operator.
commandLock *sync.Mutex
// command is the most recently submitted command (replaced by SendCommand).
command *Command
port *SerialPort // serial port handle; assigned in Run
// readlogsLock guards readlogs.
readlogsLock *sync.Mutex
// readlogs holds the entries appended by status, oldest first.
readlogs []Log
// writelogsLock guards writelogs.
writelogsLock *sync.Mutex
// isOperating is set to true by Write and SendCommand and reset to false by
// operator after it has handled the pending command.
// NOTE(review): Write sets it under writelogsLock while SendCommand and
// operator use commandLock — confirm whether this is intentional.
isOperating bool
// writelogs holds the entries appended by Write, oldest first.
writelogs []Log
// isStop ends the operator and status loops once it is greater than 0.
isStop int
// waitGroup is what Run waits on for the goroutines it starts.
waitGroup *sync.WaitGroup
}
2019-11-03 18:00:35 +00:00
// OperatorOption encodes flag as an 8-byte frame.
//
// Layout: bytes 0-1 are the fixed header 0xaa 0x55, bytes 2-3 hold flag as a
// big-endian uint16, bytes 4-6 are zero, and byte 7 is the checksum: the sum
// of the preceding seven bytes truncated to 8 bits.
func OperatorOption(flag OperatorFlag) []byte {
	frame := make([]byte, 8)
	frame[0], frame[1] = 0xaa, 0x55
	binary.BigEndian.PutUint16(frame[2:], uint16(flag))

	last := len(frame) - 1
	var sum byte
	for i := 0; i < last; i++ {
		sum += frame[i] // byte arithmetic wraps modulo 256
	}
	frame[last] = sum
	return frame
}
2019-11-01 13:47:11 +00:00
// Log is a single log entry: a data payload and the time it was recorded.
// It is constructed with unkeyed literals (Log{data, time.Now()}), so the
// field order must not change.
type Log struct {
// Data is the logged payload.
Data []byte
// Time is when the entry was appended to the log.
Time time.Time
}
// NewWorker 创建一个工人
func NewWorker() *Worker {
w := &Worker{}
w.isStop = 0
w.waitGroup = &sync.WaitGroup{}
2019-11-04 06:29:08 +00:00
w.command = CreateCommand(CMDDoNothing)
2019-11-01 13:47:11 +00:00
w.commandLock = new(sync.Mutex)
w.readlogsLock = &sync.Mutex{}
w.readlogs = make([]Log, 0, 1000)
w.writelogsLock = &sync.Mutex{}
w.writelogs = make([]Log, 0, 1000)
w.isOperating = false
return w
}
2019-11-03 18:00:35 +00:00
// Sensor returns the current sensor state, built by NewSensor from whatever
// Read reports as the latest data.
func (worker *Worker) Sensor() *Sensor {
	latest := worker.Read()
	return NewSensor(latest)
}
func (worker *Worker) Write(data []byte) {
2019-11-02 07:48:16 +00:00
2019-11-01 13:47:11 +00:00
worker.writelogsLock.Lock()
worker.isOperating = true
worker.writelogs = append(worker.writelogs, Log{data, time.Now()})
if len(worker.writelogs) >= 1000 {
worker.writelogs = worker.writelogs[500:1000]
}
worker.writelogsLock.Unlock()
2019-11-02 07:48:16 +00:00
2019-11-01 13:47:11 +00:00
}
2019-11-03 18:00:35 +00:00
// Read 如果没有读到数据为nil
func (worker *Worker) Read() (result []byte) {
2019-11-01 13:47:11 +00:00
worker.readlogsLock.Lock()
if len(worker.readlogs) > 0 {
copy(worker.readlogs[len(worker.readlogs)-1].Data, result)
}
2019-11-03 18:40:20 +00:00
worker.readlogsLock.Unlock()
2019-11-01 13:47:11 +00:00
return result
}
2019-11-04 06:29:08 +00:00
// SendCommand submits a task: it replaces the worker's current command with a
// new one of type cmdtype, marks the worker as operating, and logs the command
// together with its commandTime.
func (worker *Worker) SendCommand(cmdtype CommandType) {
	worker.commandLock.Lock()
	cmd := CreateCommand(cmdtype)
	worker.command = cmd
	worker.isOperating = true
	worker.commandLock.Unlock()

	// Log through the local cmd: reading worker.command after the unlock would
	// race with concurrent SendCommand calls and could report a different
	// command than the one just submitted.
	log.Println("SendCommand: ", cmd, cmd.commandTime)
}
2019-11-01 13:47:11 +00:00
func (worker *Worker) operator(wait *sync.WaitGroup) {
defer wait.Done()
if CurrentOS == "linux" {
for {
if worker.isStop > 0 {
break
}
now := time.Now()
2019-11-02 07:48:16 +00:00
worker.commandLock.Lock()
2019-11-01 13:47:11 +00:00
if worker.isOperating {
2019-11-02 07:48:16 +00:00
if now.Sub(worker.command.commandTime).Seconds() >= 5 {
2019-11-01 13:47:11 +00:00
// TODO: 操作
2019-11-02 07:48:16 +00:00
if operate, ok := Register[worker.command.commands]; ok {
operate(worker)
}
2019-11-01 13:47:11 +00:00
worker.isOperating = false
}
}
2019-11-02 07:48:16 +00:00
worker.commandLock.Unlock()
2019-11-01 13:47:11 +00:00
}
} else { // Windows
}
}
func (worker *Worker) status(wait *sync.WaitGroup) {
defer wait.Done()
if CurrentOS == "linux" {
for {
if worker.isStop > 0 {
break
}
2019-11-03 18:40:20 +00:00
buf := make([]byte, 14)
2019-11-01 13:47:11 +00:00
n, err := worker.port.linuxRPort.Read(buf)
if err != nil {
2019-11-03 18:40:20 +00:00
log.Println("linuxRPort read error:", err)
2019-11-01 13:47:11 +00:00
} else {
2019-11-03 18:40:20 +00:00
worker.readlogsLock.Lock()
worker.readlogs = append(worker.readlogs, Log{buf, time.Now()})
if len(worker.readlogs) >= 1000 {
worker.readlogs = worker.readlogs[500:1000]
2019-11-01 13:47:11 +00:00
}
2019-11-03 18:40:20 +00:00
log.Println("data size ", n)
log.Println("data: ", buf)
worker.readlogsLock.Unlock()
2019-11-01 13:47:11 +00:00
}
time.Sleep(time.Millisecond * 50)
}
} else { // windows
for {
2019-11-02 07:48:16 +00:00
if worker.isStop > 0 {
break
}
2019-11-01 13:47:11 +00:00
var buf []byte
n, err := worker.port.windowsRWPort.Read(buf)
if err != nil {
log.Println(err)
} else {
2019-11-02 07:48:16 +00:00
2019-11-01 13:47:11 +00:00
if n > 0 {
log.Println("data size ", n)
log.Panicln("data: ", buf)
2019-11-02 07:48:16 +00:00
worker.readlogsLock.Lock()
worker.readlogs = append(worker.readlogs, Log{buf, time.Now()})
if len(worker.readlogs) >= 1000 {
worker.readlogs = worker.readlogs[500:1000]
}
log.Println("data size ", n)
log.Panicln("data: ", buf)
worker.readlogsLock.Unlock()
2019-11-01 13:47:11 +00:00
}
2019-11-02 07:48:16 +00:00
2019-11-01 13:47:11 +00:00
}
time.Sleep(time.Millisecond * 50)
}
}
}
// Run 运行
func (worker *Worker) Run() {
sp := NewSerialPort()
sp.OpenPort()
worker.port = sp
worker.waitGroup.Add(1)
2019-11-03 18:00:35 +00:00
2019-11-01 13:47:11 +00:00
go worker.status(worker.waitGroup)
2019-11-03 18:00:35 +00:00
go worker.operator(worker.waitGroup)
2019-11-01 13:47:11 +00:00
worker.waitGroup.Wait()
}