TODO: 链表关系错误 TestFlowAdd Error

This commit is contained in:
huangsimin 2019-11-07 17:33:47 +08:00
parent 9b49c5c156
commit 061d84f057
7 changed files with 233 additions and 67 deletions

212
flow.go
View File

@ -1,109 +1,195 @@
package flow package flow
import ( type FlowType int
"log"
"time" const (
TNode FlowType = 1
TRoot FlowType = 2
) )
// FlowContext 流程的上下文, 用于传递每个流程之间的参数 type ExecuteStatus int
type FlowContext struct {
RootFlow *Flow
CurrentFlow *FlowNode
CurrentWrite []byte // 8 byte const (
CurrentRead []byte // 14 byte SFailure ExecuteStatus = 0
SSuccess ExecuteStatus = 1
SAgain ExecuteStatus = 2
)
type IFlow interface {
GetName() string
GetRootFlow() IFlow
SetRootFlow(IFlow)
SetContext(*FlowContext)
GetContext() *FlowContext
GetPath() string
SetPath(path string)
GetNext() IFlow
SetNext(IFlow)
GetPrev() IFlow
SetPrev(IFlow)
Execute() ExecuteStatus
} }
// FlowNode 流程节点 // FlowNode 流程节点
type FlowNode struct { type FlowNode struct {
IFlow
Name string Name string
Path string Path string
prev *FlowNode RootFlow IFlow
next *FlowNode
Task func(cxt *FlowContext) int prev IFlow
next IFlow
Task func(cxt *FlowContext) ExecuteStatus
} }
// Write 该写入函数包含了 自动封装日志格式 func (fn *FlowNode) GetRootFlow() IFlow {
func (cxt *FlowContext) Write(flag OperatorFlag) { return fn.RootFlow
}
operator.portWriterLock.Lock() func (fn *FlowNode) SetRootFlow(fl IFlow) {
wbuf := OperatorOption(flag) fn.RootFlow = fl
operator.port.Write(wbuf)
operator.port.Flush()
cxt.CurrentWrite = wbuf
operator.portWriterLock.Unlock()
if len(wbuf) == 8 {
flowpath := cxt.CurrentFlow.Path + ">" + cxt.CurrentFlow.Name // 路径
_, err := operator.oplog.Exec("insert into operator(ts , rootflow, flowpath, writelog, readlog) values(?,?,?,?,?)", time.Now().Unix(), cxt.RootFlow.Name, flowpath, cxt.CurrentWrite, cxt.CurrentRead)
if err != nil {
log.Println("日志写入错误: ", err)
}
} else {
log.Println("write buffer len is not equal to 8, now is", len(wbuf), "buf: ", wbuf)
}
} }
// Sensor 返回传感器数值, 每次调用都是最新的. // GetPath Get return Path string
func (cxt *FlowContext) Sensor() *Sensor { func (fn *FlowNode) GetPath() string {
buf := make([]byte, 14) return fn.Path
}
operator.portReaderLock.Lock() // SetPath Set Path string
n, err := operator.port.Read(buf) func (fn *FlowNode) SetPath(Path string) {
cxt.CurrentRead = buf fn.Path = Path
operator.portReaderLock.Unlock() }
if err != nil {
log.Println("read bufferr is error:", err)
} else {
if n == 14 { // GetName Get return Name string
return NewSensor(buf) func (fn *FlowNode) GetName() string {
} return fn.Name
//TODO: 断包, 沾包 处理 }
log.Println("读取长度不等于14, len = ", n)
}
return nil func (fn *FlowNode) GetPrev() IFlow {
return fn.prev
}
func (fn *FlowNode) SetPrev(prev IFlow) {
fn.prev = prev
}
func (fn *FlowNode) GetNext() IFlow {
return fn.next
}
func (fn *FlowNode) SetNext(next IFlow) {
fn.next = next
}
// GetContext Get return Context *FlowContext
func (fn *FlowNode) GetContext() *FlowContext {
return fn.GetRootFlow().GetContext()
}
// SetContext Set Context *FlowContext
func (fn *FlowNode) SetContext(Context *FlowContext) {
fn.GetRootFlow().SetContext(Context)
}
// Execute Get return Name string
func (fn *FlowNode) Execute() ExecuteStatus {
return fn.Task(fn.GetContext())
} }
// Flow 流程 // Flow 流程
type Flow struct { type Flow struct {
Name string IFlow
RootFlow IFlow
Name string
Context *FlowContext // 执行流程时候的上下文 Context *FlowContext // 执行流程时候的上下文
Head *FlowNode Head IFlow
Tail *FlowNode Tail IFlow
} }
func (flow *Flow) GetRootFlow() IFlow {
return flow.RootFlow
}
func (flow *Flow) SetRootFlow(fl IFlow) {
flow.RootFlow = fl
}
// GetContext Get return Context *FlowContext
func (flow *Flow) GetContext() *FlowContext {
return flow.Context
}
// SetContext Set Context *FlowContext
func (flow *Flow) SetContext(Context *FlowContext) {
flow.Context = Context
}
// GetName Get return Name string
func (flow *Flow) GetName() string {
return flow.Name
}
// Execute Get return Name string
// func (flow *Flow) Execute() {
// flow.Context.
// }
// New 创建一个流程, 相当于例子 `干洗` // New 创建一个流程, 相当于例子 `干洗`
func New(name string) *Flow { func New(name string) *Flow {
f := &Flow{Name: name, Context: &FlowContext{}} f := &Flow{Name: name, Context: &FlowContext{}}
return f return f
} }
// Add 添加 func (flow *Flow) GetPrev() IFlow {
func (flow *Flow) Add(name string, task func(cxt *FlowContext) int) { return flow.Head.GetPrev()
}
node := &FlowNode{Name: name, Task: task} func (flow *Flow) SetPrev(prev IFlow) {
// panic("flow can't call SetPrev")
flow.Head.SetPrev(prev)
}
func (flow *Flow) GetNext() IFlow {
return flow.Tail.GetNext()
}
func (flow *Flow) SetNext(next IFlow) {
// panic("flow can't call SetNext")
flow.Tail.SetNext(next)
}
// Add 添加
func (flow *Flow) Add(fl IFlow) {
if flow.Head == nil { if flow.Head == nil {
flow.Head = node flow.Head = fl
flow.Tail = node flow.Tail = fl
fl.SetRootFlow(flow)
node.Path = node.Name fl.SetContext(flow.GetContext())
fl.SetPath(fl.GetName())
return return
} }
node.Path = flow.Tail.Path + ">" + flow.Tail.Name fl.SetPath(flow.Tail.GetPath() + ">" + flow.Tail.GetName())
fl.SetRootFlow(flow)
fl.SetContext(flow.GetContext())
flow.Tail.next = node fl.SetPrev(flow.Tail)
flow.Tail = node flow.Tail.SetNext(fl)
flow.Tail = fl
if node.next != nil { if fl.GetNext() != nil {
panic("tail next is not nil") panic("tail next is not nil")
} }
} }

17
flow_test.go Normal file
View File

@ -0,0 +1,17 @@
package flow
import (
"testing"
"github.com/davecgh/go-spew/spew"
)
func TestFlowAdd(t *testing.T) {
flow := New("干洗")
flow.Add(&FlowNode{Name: "干洗01"})
flow.Add(&FlowNode{Name: "干洗02"})
flow.Add(&FlowNode{Name: "干洗03"})
t.Error(spew.Sdump(flow))
}

59
flowcontext.go Normal file
View File

@ -0,0 +1,59 @@
package flow
import (
"log"
"time"
)
// FlowContext 流程的上下文, 用于传递每个流程之间的参数
type FlowContext struct {
RootFlow *Flow
CurrentFlow *FlowNode
CurrentWrite []byte // 8 byte
CurrentRead []byte // 14 byte
}
// Sensor 返回传感器数值, 每次调用都是最新的.
func (cxt *FlowContext) Sensor() *Sensor {
buf := make([]byte, 14)
operator.portReaderLock.Lock()
n, err := operator.port.Read(buf)
cxt.CurrentRead = buf
operator.portReaderLock.Unlock()
if err != nil {
log.Println("read bufferr is error:", err)
} else {
if n == 14 {
return NewSensor(buf)
}
//TODO: 断包, 沾包 处理
log.Println("读取长度不等于14, len = ", n)
}
return nil
}
// Write 该写入函数包含了 自动封装日志格式
func (cxt *FlowContext) Write(flag OperatorFlag) {
operator.portWriterLock.Lock()
wbuf := OperatorOption(flag)
operator.port.Write(wbuf)
operator.port.Flush()
cxt.CurrentWrite = wbuf
operator.portWriterLock.Unlock()
if len(wbuf) == 8 {
flowpath := cxt.CurrentFlow.Path + ">" + cxt.CurrentFlow.Name // 路径
_, err := operator.oplog.Exec("insert into operator(ts , rootflow, flowpath, writelog, readlog) values(?,?,?,?,?)", time.Now().Unix(), cxt.RootFlow.Name, flowpath, cxt.CurrentWrite, cxt.CurrentRead)
if err != nil {
log.Println("日志写入错误: ", err)
}
} else {
log.Println("write buffer len is not equal to 8, now is", len(wbuf), "buf: ", wbuf)
}
}

1
go.mod
View File

@ -3,6 +3,7 @@ module 474420502.top/test/flow
go 1.13 go 1.13
require ( require (
github.com/davecgh/go-spew v1.1.1
github.com/mattn/go-sqlite3 v1.11.0 github.com/mattn/go-sqlite3 v1.11.0
github.com/smartystreets/goconvey v1.6.4 // indirect github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07 github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07

2
go.sum
View File

@ -1,3 +1,5 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=

2
my.cfg
View File

@ -1,4 +1,4 @@
[config] [config]
portid = /dev/pts3 portid = /dev/pts/14
baud = 9600 baud = 9600
readtimeout = 5 readtimeout = 5

View File

@ -8,6 +8,7 @@ import (
"sync" "sync"
"time" "time"
_ "github.com/mattn/go-sqlite3"
serial "github.com/tarm/serial" serial "github.com/tarm/serial"
"gopkg.in/ini.v1" "gopkg.in/ini.v1"
) )
@ -91,16 +92,16 @@ func init() {
var rtimeout time.Duration var rtimeout time.Duration
var cfg *ini.File var cfg *ini.File
cfg, err := ini.Load("my_test.cfg") cfg, err := ini.Load("my.cfg")
if err != nil { if err != nil {
log.Println(err) log.Println(err)
log.Println("加载配置my.cfg失败, 将使用默认值") log.Println("加载配置my.cfg失败, 将使用默认值")
f, err := os.OpenFile("./my_test.cfg", os.O_CREATE|os.O_WRONLY, 0666) f, err := os.OpenFile("./my.cfg", os.O_CREATE|os.O_WRONLY, 0666)
if err != nil { if err != nil {
panic(err) panic(err)
} }
f.WriteString("[config]\nportid = COM1\nbaud = 9600\nreadtimeout = 5") f.WriteString("[config]\nportid = COM1\nbaud = 9600\nreadtimeout = 5")
cfg, err = ini.Load("my_test.cfg") cfg, err = ini.Load("my.cfg")
if err != nil { if err != nil {
panic(err) panic(err)
} }