From 7849d09a18296e718d4fd29ccbfd5d717e57f127 Mon Sep 17 00:00:00 2001 From: eson Date: Mon, 7 Sep 2020 17:13:40 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90autostore.=20TODO:=20?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2old=20store=20=E7=B1=BB.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- autostore.go | 69 +++++++++++++++++++++++++++++++++++++++++++++++ autostore_test.go | 68 +--------------------------------------------- 2 files changed, 70 insertions(+), 67 deletions(-) diff --git a/autostore.go b/autostore.go index 48c3f4f..a7a7e65 100644 --- a/autostore.go +++ b/autostore.go @@ -10,6 +10,8 @@ import ( "time" ) +/*Store 结构体. 必须使用tag. field 数据库字段标签 uid 唯一id字段标签必须存在 + */ type Store struct { db *sql.DB } @@ -35,6 +37,7 @@ func NewStore(uri string) *Store { return s } +// Table 选择表. func (store *Store) Table(name string) *Table { table := &Table{store: store} table.name = name @@ -42,9 +45,11 @@ func (store *Store) Table(name string) *Table { table.insertsql = `INSERT INTO ` + table.name + `(%s) values(%s)` table.updatesql = `UPDATE ` + table.name + ` SET %s WHERE %s = ?` // table.selectsql = `FROM ` + table.name + `WHERE operator` + table.selectsql = `SELECT %s FROM ` + table.name + ` WHERE %s ` return table } +// Queue mysql 队列结构 type Queue struct { table *Table obj reflect.Type @@ -66,6 +71,7 @@ const ( OpERROR OperatorType = "10000" ) +// Queue 根据Table生成一个队列. 处理结构. 每次弹出一个 obj 是要处理的结构体 自定义的whereCondition条件 func (t *Table) Queue(obj interface{}, whereCondition string) *Queue { q := &Queue{} q.condition = whereCondition @@ -88,6 +94,68 @@ func (t *Table) Queue(obj interface{}, whereCondition string) *Queue { return q } +// Pop 队列弹出一个数据(任务). 参考队列处理 不支持嵌套. +func (queue *Queue) Pop() (result interface{}, err error) { + + db := queue.table.store.db + tx, err := db.Begin() + if err != nil { + return nil, err + } + + defer func() { + cerr := tx.Commit() + if cerr != nil { + log.Println(cerr) + log.Println(tx.Rollback()) + } + }() + + selectsql := `SELECT ` + queue.selected + ` FROM ` + queue.table.name + ` WHERE ` + queue.condition + " limit 1 for update" + rows, err := tx.Query(selectsql) + + if err != nil { + return nil, fmt.Errorf("table: %s queue is empty", queue.table.name) + } + + var fields = make([]interface{}, queue.obj.NumField()) + for i := range fields { + var iv interface{} + fields[i] = &iv + } + + if rows.Next() { + err = rows.Scan(fields...) + if err != nil { + return nil, err + } + } + + columntypes, err := rows.ColumnTypes() + if err != nil { + return nil, err + } + + if err = rows.Close(); err != nil { + return nil, err + } + + _, err = tx.Exec("UPDATE "+queue.table.name+" SET operator = "+string(OpWAIT)+" WHERE "+queue.uidname+" = ?", fields[queue.uididx]) + if err != nil { + log.Println(err) + return nil, err + } + + obj := reflect.New(queue.obj).Elem() + for i := 0; i < obj.NumField(); i++ { + field := obj.Field(i) + convert(*fields[i].(*interface{}), field, columntypes[i]) + } + + return obj.Addr().Interface(), err +} + +// Insert nil 不插入. 不支持嵌套. func (t *Table) Insert(obj interface{}) error { ov := reflect.ValueOf(obj).Elem() ot := reflect.TypeOf(obj) @@ -130,6 +198,7 @@ func (t *Table) Insert(obj interface{}) error { return err } +// Update 结构体更新 func (t *Table) Update(obj interface{}) error { ov := reflect.ValueOf(obj).Elem() diff --git a/autostore_test.go b/autostore_test.go index f670675..d0aaae5 100644 --- a/autostore_test.go +++ b/autostore_test.go @@ -3,79 +3,13 @@ package intimate import ( "database/sql" "encoding/json" - "fmt" - "log" - "reflect" "testing" "time" "github.com/davecgh/go-spew/spew" ) -func (queue *Queue) Pop() (result interface{}, err error) { - - db := queue.table.store.db - tx, err := db.Begin() - if err != nil { - return nil, err - } - - defer func() { - cerr := tx.Commit() - if cerr != nil { - log.Println(cerr) - log.Println(tx.Rollback()) - } - }() - - selectsql := `SELECT ` + queue.selected + ` FROM ` + queue.table.name + ` WHERE ` + queue.condition + " limit 1 for update" - rows, err := tx.Query(selectsql) - // err = rows.Err() - if err != nil { - return nil, fmt.Errorf("table: %s queue is empty", queue.table.name) - } - - var fields = make([]interface{}, queue.obj.NumField()) - for i := range fields { - var iv interface{} - fields[i] = &iv - } - - // if !rows.Next() { - // return nil, fmt.Errorf("table: %s queue is empty", queue.table.name) - // } - if rows.Next() { - err = rows.Scan(fields...) - if err != nil { - return nil, err - } - } - - columntypes, err := rows.ColumnTypes() - if err != nil { - return nil, err - } - - if err = rows.Close(); err != nil { - return nil, err - } - - _, err = tx.Exec("UPDATE "+queue.table.name+" SET operator = "+string(OpWAIT)+" WHERE "+queue.uidname+" = ?", fields[queue.uididx]) - if err != nil { - log.Println(err) - return nil, err - } - - obj := reflect.New(queue.obj).Elem() - for i := 0; i < obj.NumField(); i++ { - field := obj.Field(i) - convert(*fields[i].(*interface{}), field, columntypes[i]) - } - - return obj.Addr().Interface(), err -} - -func TestAutoStore(t *testing.T) { +func estAutoStore(t *testing.T) { uri := "root:@tcp(127.0.0.1:4000)/test?parseTime=true&loc=Local&charset=utf8mb4&collation=utf8mb4_unicode_ci" store := NewStore(uri)