intimate/autostore_test.go
2020-09-07 15:13:50 +08:00

462 lines
10 KiB
Go

package intimate
import (
"database/sql"
"encoding/binary"
"encoding/json"
"fmt"
"log"
"reflect"
"strconv"
"testing"
"github.com/davecgh/go-spew/spew"
)
// Store wraps the *sql.DB handle that tables created from it run their
// statements against.
type Store struct {
// db is the database handle opened by NewStore.
db *sql.DB
}
// Table is a handle on one named database table reachable through a Store,
// bundled with the statement templates prepared for that table.
type Table struct {
	// store owns the database handle the statements are executed on.
	store *Store
	// name is the table name spliced verbatim into the generated SQL.
	name string
	// setting is not assigned anywhere in the visible part of this file.
	setting interface{}

	// Statement templates that are completed with fmt.Sprintf before being
	// executed. Store.Table fills in updatesql and insertsql; selectsql is
	// left empty there.
	updatesql, selectsql, insertsql string
}
// const updatesql = "UPDATE %s SET %s WHERE %s = ?"
// NewStore opens a database handle for uri using the "mysql" driver name and
// returns a Store built around it.
//
// It panics when sql.Open reports an error.
func NewStore(uri string) *Store {
	conn, openErr := sql.Open("mysql", uri)
	if openErr != nil {
		panic(openErr)
	}
	return &Store{db: conn}
}
// Table returns a handle for the table called name, bound to this store.
//
// The INSERT and UPDATE templates are built here with name concatenated
// directly into the SQL text; the remaining %s placeholders are filled in by
// Insert and Update. The select template is left empty.
func (store *Store) Table(name string) *Table {
	return &Table{
		store:     store,
		name:      name,
		insertsql: `INSERT INTO ` + name + `(%s) values(%s)`,
		updatesql: `UPDATE ` + name + ` SET %s WHERE %s = ?`,
	}
}
// Queue pops rows of a table one at a time and materialises each row as a
// value of the struct type recorded in obj.
type Queue struct {
	// table is the table rows are popped from.
	table *Table
	// obj is the struct type a popped row is converted into.
	obj reflect.Type

	// selected is the comma-separated column list of the SELECT statement;
	// condition is its WHERE clause. Both are spliced verbatim into the SQL.
	selected, condition string

	// uidname is the column named by the struct field that carries a `uid`
	// tag, and uididx is that field's index within obj.
	uidname string
	uididx  int
}
// OperatorType is the textual form of a value stored in a row's `operator`
// column.
type OperatorType string

// Known operator values. Queue.Pop writes OP_WAIT into the row it hands out;
// the other two are not referenced in the visible part of this file.
const (
	OP_OK    = OperatorType("0")
	OP_WAIT  = OperatorType("1000")
	OP_ERROR = OperatorType("10000")
)
// Pop takes one row matching the queue's condition, marks it by setting its
// `operator` column to OP_WAIT, and returns it as a pointer to a new value of
// the queue's struct type.
//
// The SELECT ... FOR UPDATE and the UPDATE run inside a single transaction.
// The transaction is committed only when every step succeeded; on any error
// it is rolled back, so no row is marked without being returned.
//
// Errors:
//   - the query, scan, or update error reported by the database;
//   - "table: <name> queue is empty" when no row matches the condition;
//   - the commit error, in which case result is nil.
//
// Scan destinations are sized by the struct's field count and the key is
// looked up by struct field index, so the struct is expected to carry a
// `field` tag on every field.
func (queue *Queue) Pop() (result interface{}, err error) {
	db := queue.table.store.db
	tx, err := db.Begin()
	if err != nil {
		return nil, err
	}
	// Decide between commit and rollback from the named error result, which
	// every return statement below sets before this function runs.
	defer func() {
		if err != nil {
			if rerr := tx.Rollback(); rerr != nil {
				log.Println(rerr)
			}
			return
		}
		// A failed Commit already finishes the transaction, so there is
		// nothing left to roll back; surface the failure to the caller.
		if cerr := tx.Commit(); cerr != nil {
			log.Println(cerr)
			result, err = nil, cerr
		}
	}()

	selectsql := `SELECT ` + queue.selected + ` FROM ` + queue.table.name + ` WHERE ` + queue.condition + " limit 1 for update"
	rows, err := tx.Query(selectsql)
	if err != nil {
		return nil, fmt.Errorf("table: %s pop query: %w", queue.table.name, err)
	}
	// Safety net for the early returns; Close is idempotent, so the explicit
	// Close further down is still fine.
	defer rows.Close()

	// Scan every column into an untyped cell; convert maps the raw driver
	// values onto the struct fields afterwards.
	fields := make([]interface{}, queue.obj.NumField())
	for i := range fields {
		var iv interface{}
		fields[i] = &iv
	}

	if !rows.Next() {
		if err = rows.Err(); err != nil {
			return nil, err
		}
		return nil, fmt.Errorf("table: %s queue is empty", queue.table.name)
	}
	if err = rows.Scan(fields...); err != nil {
		return nil, err
	}

	columntypes, err := rows.ColumnTypes()
	if err != nil {
		return nil, err
	}
	// The result set is closed before the UPDATE is issued on the same
	// transaction.
	if err = rows.Close(); err != nil {
		return nil, err
	}

	uid := *fields[queue.uididx].(*interface{})
	_, err = tx.Exec("UPDATE "+queue.table.name+" SET operator = "+string(OP_WAIT)+" WHERE "+queue.uidname+" = ?", uid)
	if err != nil {
		return nil, err
	}

	obj := reflect.New(queue.obj).Elem()
	for i := 0; i < obj.NumField(); i++ {
		// A field the converter cannot handle is logged and left at its
		// current value instead of discarding the whole row.
		if cerr := convert(*fields[i].(*interface{}), obj.Field(i), columntypes[i]); cerr != nil {
			log.Println(cerr)
		}
	}
	return obj.Addr().Interface(), nil
}
// TestAutoStore pops one row from the `streamer` table, decodes its `tag`
// column as JSON, and then inserts a fresh row.
//
// It talks to the database named in uri, so it only passes against a running
// server that has a populated `streamer` table.
func TestAutoStore(t *testing.T) {
	uri := "root:@tcp(127.0.0.1:4000)/test?parseTime=true&loc=Local&charset=utf8mb4&collation=utf8mb4_unicode_ci"
	store := NewStore(uri)
	queue := store.Table("streamer").Queue(TSreamer{}, "operator = 0")

	re, err := queue.Pop()
	if err != nil {
		t.Fatal(err)
	}
	// Checked assertions turn an unexpected result into a test failure
	// instead of a panic.
	pstreamer, ok := re.(*TSreamer)
	if !ok {
		t.Fatalf("Pop returned %T, want *TSreamer", re)
	}
	raw, ok := pstreamer.Iface.([]byte)
	if !ok {
		t.Fatalf("tag column decoded as %T, want []byte", pstreamer.Iface)
	}
	m := make(map[string]interface{})
	if err := json.Unmarshal(raw, &m); err != nil {
		t.Error(err)
	}
	spew.Println(pstreamer, m)

	streamer := &TSreamer{}
	streamer.Uid = 2
	streamer.UserID = &sql.NullString{String: "hehe", Valid: true}
	streamer.Name = "streamer"
	streamer.Operator = 0
	streamer.Bit = 0b11
	streamer.Ext = &sql.NullString{String: "ext", Valid: true}

	tag := make(map[string]interface{})
	tag["json"] = true
	tag["name"] = "test"
	btag, err := json.Marshal(tag)
	if err != nil {
		t.Error(err)
	}
	streamer.Iface = btag

	err = store.Table("streamer").Insert(streamer)
	if err != nil {
		t.Error(err)
	}
}
// assign stores the raw driver value dest into field when field has a
// numeric or string kind.
//
// Numeric kinds are filled by parsing the string form of dest (produced by
// asString, which is defined outside the visible source) with the bit size of
// the field's type. String kinds accept dest as []byte or string.
//
// The boolean result is true only when field has interface kind: nothing is
// stored in that case and the caller is expected to choose a default Go
// representation itself. An error is returned when parsing fails, when dest
// cannot be turned into a string for a string field, or when the field's kind
// is not handled at all.
func assign(field reflect.Value, dest interface{}) (bool, error) {
	switch field.Kind() {
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		s := asString(dest)
		i64, err := strconv.ParseInt(s, 10, field.Type().Bits())
		if err != nil {
			err = strconvErr(err)
			return false, fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", dest, s, field.Kind(), err)
		}
		field.SetInt(i64)
		return false, nil
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		s := asString(dest)
		u64, err := strconv.ParseUint(s, 10, field.Type().Bits())
		if err != nil {
			err = strconvErr(err)
			return false, fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", dest, s, field.Kind(), err)
		}
		field.SetUint(u64)
		return false, nil
	case reflect.Float32, reflect.Float64:
		s := asString(dest)
		f64, err := strconv.ParseFloat(s, field.Type().Bits())
		if err != nil {
			err = strconvErr(err)
			return false, fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", dest, s, field.Kind(), err)
		}
		field.SetFloat(f64)
		return false, nil
	case reflect.String:
		// A checked type switch reports an unexpected driver value as an
		// error instead of panicking on a failed type assertion.
		switch v := dest.(type) {
		case []byte:
			field.SetString(string(v))
		case string:
			field.SetString(v)
		default:
			return false, fmt.Errorf("converting driver.Value type %T to a %s is unsupported", dest, field.Kind())
		}
		return false, nil
	case reflect.Interface:
		return true, nil
	}
	return false, fmt.Errorf("converting driver.Value type %T to a %s is unsupported", dest, field.Kind())
}
// convert stores the raw driver value dest into the struct field `field`,
// choosing the conversion from the column's database type name.
//
// Steps, in order:
//  1. A pointer field is given a freshly allocated element and the rest of
//     the conversion targets that element.
//  2. If the (addressable) target implements sql.Scanner, dest is handed to
//     its Scan method, NULL included, and the result is returned. This runs
//     before the column-type switch so that Scanner types such as
//     sql.NullString are filled for every column type.
//  3. A NULL (nil) dest for any other target is reported as an error.
//  4. Integer, floating point, binary/JSON, text and BIT columns are
//     converted through assign; for interface-typed fields the value is
//     stored as int64, float64, []byte, string or uint64 respectively.
//
// Column types that are not listed leave the field untouched and return nil.
func convert(dest interface{}, field reflect.Value, columntype *sql.ColumnType) error {
	log.Println("type:", field.Type(), ",kind:", field.Kind(), ",field:", field, "scanType:", columntype.ScanType(), "databaseType:", columntype.DatabaseTypeName())

	if field.Kind() == reflect.Ptr {
		// Allocate a new field.Type().Elem() and convert into it, so that
		// pointer fields such as *sql.NullString can be filled below.
		fn := field.Type().Elem()
		field.Set(reflect.New(fn))
		field = field.Elem()
		log.Println("type:", fn.Name(), ",kind:", field.Kind(), ",fieldtype:", field.Type())
	}

	// Addr panics on a non-addressable value, hence the CanAddr guard.
	if field.CanAddr() {
		if iscan, ok := field.Addr().Interface().(sql.Scanner); ok {
			return iscan.Scan(dest)
		}
	}

	if dest == nil {
		return fmt.Errorf("converting NULL to %s is unsupported", field.Kind())
	}

	switch columntype.DatabaseTypeName() {
	case "TINYINT", "SMALLINT", "MEDIUMINT", "INT", "BIGINT":
		isdefault, err := assign(field, dest)
		if err != nil {
			return err
		}
		if isdefault {
			s := asString(dest)
			i64, err := strconv.ParseInt(s, 10, 64)
			if err != nil {
				err = strconvErr(err)
				return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", dest, s, field.Kind(), err)
			}
			field.Set(reflect.ValueOf(i64))
		}
		return nil

	case "FLOAT", "DOUBLE", "DECIMAL":
		isdefault, err := assign(field, dest)
		if err != nil {
			return err
		}
		if isdefault {
			s := asString(dest)
			f64, err := strconv.ParseFloat(s, 64)
			if err != nil {
				err = strconvErr(err)
				return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", dest, s, field.Kind(), err)
			}
			field.Set(reflect.ValueOf(f64))
		}
		return nil

	case "BINARY", "VARBINARY", "TINYBLOB", "BLOB", "MEDIUMBLOB", "LONGBLOB", "JSON":
		isdefault, err := assign(field, dest)
		if err != nil {
			return err
		}
		if isdefault {
			raw, ok := dest.([]byte)
			if !ok {
				return fmt.Errorf("converting driver.Value type %T to []byte is unsupported", dest)
			}
			field.Set(reflect.ValueOf(raw))
		}
		return nil

	case "CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT":
		isdefault, err := assign(field, dest)
		if err != nil {
			return err
		}
		if isdefault {
			raw, ok := dest.([]byte)
			if !ok {
				return fmt.Errorf("converting driver.Value type %T to string is unsupported", dest)
			}
			field.Set(reflect.ValueOf(string(raw)))
		}
		return nil

	case "BIT":
		raw, ok := dest.([]byte)
		if !ok {
			return fmt.Errorf("converting driver.Value type %T to a bit value is unsupported", dest)
		}
		// The raw bytes are copied to the front of an 8-byte buffer and
		// decoded as a little-endian uint64.
		// NOTE(review): verify this byte order against what the driver
		// returns for BIT columns wider than 8 bits.
		bits := make([]byte, 8)
		copy(bits, raw)
		switch field.Kind() {
		case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
			field.SetInt(int64(binary.LittleEndian.Uint64(bits)))
		case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
			field.SetUint(binary.LittleEndian.Uint64(bits))
		case reflect.Interface:
			field.Set(reflect.ValueOf(binary.LittleEndian.Uint64(bits)))
		}
		return nil
	}

	return nil
}
// Queue builds a Queue that pops rows of this table into values of obj's
// struct type.
//
// obj is only a type sample: a struct value or a pointer to a struct.
// whereCondition is spliced verbatim into the WHERE clause of the SELECT
// issued by Pop.
//
// Every struct field with a `field` tag contributes its tag value to the
// selected column list, in field order. A field that additionally has a `uid`
// tag is recorded as the key (column name and struct field index); when
// several fields carry it, the last one wins. A struct without any `field`
// tag yields an empty column list rather than a panic.
func (t *Table) Queue(obj interface{}, whereCondition string) *Queue {
	q := &Queue{
		table:     t,
		condition: whereCondition,
		obj:       reflect.TypeOf(obj),
	}
	// Accept a pointer sample as well; Pop allocates the value itself.
	if q.obj.Kind() == reflect.Ptr {
		q.obj = q.obj.Elem()
	}

	for i := 0; i < q.obj.NumField(); i++ {
		field := q.obj.Field(i)
		fname, ok := field.Tag.Lookup("field")
		if !ok {
			continue
		}
		// Separator goes in front of every column but the first, so there
		// is no trailing comma to cut off afterwards.
		if q.selected != "" {
			q.selected += ","
		}
		q.selected += fname
		if _, ok := field.Tag.Lookup("uid"); ok {
			q.uididx = i
			q.uidname = fname
		}
	}
	return q
}
// Insert writes obj as a new row of the table.
//
// obj must be a struct or a non-nil pointer to a struct. Each field with a
// `field` tag becomes one column of the INSERT statement, except:
//   - a field tagged uid:"auto" is skipped;
//   - a nil pointer or nil interface field is skipped; non-nil ones are
//     inserted as the value they point to or hold.
//
// It returns an error when obj is not a struct, when no field is left to
// insert, or when executing the statement fails.
func (t *Table) Insert(obj interface{}) error {
	ov := reflect.Indirect(reflect.ValueOf(obj))
	if ov.Kind() != reflect.Struct {
		return fmt.Errorf("insert into %s: expected a struct or a pointer to a struct, got %T", t.name, obj)
	}
	ot := ov.Type()

	var (
		fieldsql string
		argssql  string
		args     []interface{}
	)
	for i := 0; i < ov.NumField(); i++ {
		ftype := ot.Field(i)
		fname, ok := ftype.Tag.Lookup("field")
		if !ok {
			continue
		}
		if flag, ok := ftype.Tag.Lookup("uid"); ok && flag == "auto" {
			continue
		}

		field := ov.Field(i)
		var arg interface{}
		switch ftype.Type.Kind() {
		case reflect.Ptr, reflect.Interface:
			if field.IsNil() {
				continue
			}
			arg = field.Elem().Interface()
		default:
			arg = field.Interface()
		}
		args = append(args, arg)
		fieldsql += fname + ","
		argssql += "?,"
	}

	// Without this guard the trailing-comma trimming below would slice an
	// empty string out of range.
	if len(args) == 0 {
		return fmt.Errorf("insert into %s: no fields to insert", t.name)
	}

	ssql := fmt.Sprintf(t.insertsql, fieldsql[:len(fieldsql)-1], argssql[:len(argssql)-1])
	_, err := t.store.db.Exec(ssql, args...)
	return err
}
// Update writes the fields of obj back to the row identified by its `uid`
// field.
//
// obj must be a struct or a non-nil pointer to a struct. The field that
// carries both a `field` and a `uid` tag supplies the WHERE column and value.
// Every other field with a `field` tag becomes a "column = ?" assignment; nil
// pointer and nil interface fields are skipped, non-nil ones are written as
// the value they point to or hold.
//
// It panics when more than one field carries the `uid` tag or when none does.
// It returns an error when obj is not a struct, when there is no field left
// to update, or when executing the statement fails.
func (t *Table) Update(obj interface{}) error {
	ov := reflect.Indirect(reflect.ValueOf(obj))
	if ov.Kind() != reflect.Struct {
		return fmt.Errorf("update %s: expected a struct or a pointer to a struct, got %T", t.name, obj)
	}
	ot := ov.Type()

	var (
		fieldsql string
		uidname  string
		uidvalue interface{}
		hasUID   bool // tracked separately: the uid value itself may be a nil interface
		args     []interface{}
	)
	for i := 0; i < ov.NumField(); i++ {
		ftype := ot.Field(i)
		fname, ok := ftype.Tag.Lookup("field")
		if !ok {
			continue
		}
		field := ov.Field(i)

		if _, ok := ftype.Tag.Lookup("uid"); ok {
			if hasUID {
				panic(fmt.Errorf("uid must unique, %s and %s", uidname, fname))
			}
			hasUID = true
			uidname = fname
			uidvalue = field.Interface()
			continue
		}

		var arg interface{}
		switch ftype.Type.Kind() {
		case reflect.Ptr, reflect.Interface:
			if field.IsNil() {
				continue
			}
			arg = field.Elem().Interface()
		default:
			arg = field.Interface()
		}
		args = append(args, arg)
		fieldsql += fname + " = ?,"
	}

	if !hasUID {
		panic(fmt.Errorf("update must contain `uid` tag"))
	}
	// Without this guard the trailing-comma trimming below would slice an
	// empty string out of range.
	if len(args) == 0 {
		return fmt.Errorf("update %s: no fields to update", t.name)
	}

	usql := fmt.Sprintf(t.updatesql, fieldsql[:len(fieldsql)-1], uidname)
	args = append(args, uidvalue)
	_, err := t.store.db.Exec(usql, args...)
	return err
}
// TSreamer is the sample row type TestAutoStore uses with the `streamer`
// table.
//
// The `field` tag names the column a field maps to. The `uid` tag marks the
// key field; with the value "auto" the field is left out by Table.Insert.
// Field order matters: the reflection-based helpers walk the fields by index.
type TSreamer struct {
// Uid is the key column; skipped on insert because of uid:"auto".
Uid int `field:"uid" uid:"auto"`
// Name is interface-typed, so the stored Go type is chosen by the converter.
Name interface{} `field:"name"`
// UserID and Ext are pointers; Insert and Update skip them while nil.
UserID *sql.NullString `field:"userid"`
Ext *sql.NullString `field:"ext"`
// Iface maps to the `tag` column; TestAutoStore stores JSON bytes in it.
Iface interface{} `field:"tag"`
Bit uint64 `field:"bit"`
Operator int `field:"operator"`
}