diff --git a/autostore_test.go b/autostore_test.go index 926573a..286fb56 100644 --- a/autostore_test.go +++ b/autostore_test.go @@ -2,8 +2,11 @@ package intimate import ( "database/sql" + "encoding/binary" "fmt" + "log" "reflect" + "strconv" "testing" ) @@ -38,13 +41,209 @@ func (store *Store) Table(name string) *Table { table.insertsql = `INSERT INTO ` + table.name + `(%s) values(%s)` table.updatesql = `UPDATE ` + table.name + ` SET %s WHERE %s = ?` - + // table.selectsql = `FROM ` + table.name + `WHERE operator` return table } type Queue struct { - table *Table - obj reflect.Type + table *Table + obj reflect.Type + selected string + condition string + uidname string + uididx int +} + +type OperatorType string + +const ( + OP_OK OperatorType = "0" + OP_WAIT OperatorType = "1000" + OP_ERROR OperatorType = "10000" +) + +func (queue *Queue) Pop() (result interface{}, err error) { + + db := queue.table.store.db + tx, err := db.Begin() + if err != nil { + return nil, err + } + + defer func() { + cerr := tx.Commit() + if cerr != nil { + log.Println(cerr) + log.Println(tx.Rollback()) + } + }() + + selectsql := `SELECT ` + queue.selected + ` FROM ` + queue.table.name + ` WHERE ` + queue.condition + " limit 1 for update" + rows, err := tx.Query(selectsql) + // err = rows.Err() + if err != nil { + return nil, fmt.Errorf("table: %s queue is empty", queue.table.name) + } + + var fields = make([]interface{}, queue.obj.NumField()) + for i := range fields { + var iv interface{} + fields[i] = &iv + } + + // if !rows.Next() { + // return nil, fmt.Errorf("table: %s queue is empty", queue.table.name) + // } + if rows.Next() { + err = rows.Scan(fields...) + if err != nil { + return nil, err + } + } + + columntypes, err := rows.ColumnTypes() + if err != nil { + return nil, err + } + + if err = rows.Close(); err != nil { + return nil, err + } + + _, err = tx.Exec("UPDATE "+queue.table.name+" SET operator = "+string(OP_WAIT)+" WHERE "+queue.uidname+" = ?", fields[queue.uididx]) + if err != nil { + log.Println(err) + return nil, err + } + + obj := reflect.New(queue.obj).Elem() + for i := 0; i < obj.NumField(); i++ { + field := obj.Field(i) + convert(*fields[i].(*interface{}), field, columntypes[i]) + // if field.Type().Kind() == reflect.Ptr { + // field.Elem().Set(reflect.ValueOf(*fields[i].(*interface{}))) + // continue + // } + // field.Set(reflect.ValueOf(*fields[i].(*interface{}))) + } + + return obj.Interface(), err +} + +func TestAutoStore(t *testing.T) { + uri := "root:@tcp(127.0.0.1:4000)/test?parseTime=true&loc=Local&charset=utf8mb4&collation=utf8mb4_unicode_ci" + store := NewStore(uri) + + queue := store.Table("streamer").Queue(TSreamer{}, "operator = 0") + t.Error(queue.Pop()) + + streamer := &TSreamer{} + + streamer.Uid = 2 + streamer.UserID = &sql.NullString{String: "xixi", Valid: true} + streamer.Name = "streamer" + streamer.Operator = 0 + streamer.Ext = &sql.NullString{String: "ext", Valid: true} + err := store.Table("streamer").Insert(streamer) + if err != nil { + t.Error(err) + } +} + +func convert(dest interface{}, field reflect.Value, columntype *sql.ColumnType) error { + + log.Println("type:", field.Type(), ",kind:", field.Kind(), ",field:", field) + if field.Kind() == reflect.Ptr { + fn := field.Type().Elem().Name() // New 一个 field.Type().Elem() . 然后判断 columntype 转化 成 NullString Time + field = field.Elem() // + log.Println("type:", fn, ",kind:", field.Kind(), ",field:", field) + } + + if field.Kind() == reflect.Interface { + + } + + // log.Println(field.Kind(), field, reflect.TypeOf(field).Elem().Name(), columntype.ScanType().Kind()) + + switch fv := field.Kind(); fv { + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + if dest == nil { + return fmt.Errorf("converting NULL to %s is unsupported", field.Kind()) + } + + log.Println(binary.Varint(dest.([]byte))) + s := asString(dest) + i64, err := strconv.ParseInt(s, 10, field.Type().Bits()) + if err != nil { + err = strconvErr(err) + return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", dest, s, field.Kind(), err) + } + field.SetInt(i64) + return nil + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + if dest == nil { + return fmt.Errorf("converting NULL to %s is unsupported", field.Kind()) + } + s := asString(dest) + u64, err := strconv.ParseUint(s, 10, field.Type().Bits()) + if err != nil { + err = strconvErr(err) + return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", dest, s, field.Kind(), err) + } + field.SetUint(u64) + return nil + case reflect.Float32, reflect.Float64: + if dest == nil { + return fmt.Errorf("converting NULL to %s is unsupported", field.Kind()) + } + s := asString(dest) + f64, err := strconv.ParseFloat(s, field.Type().Bits()) + if err != nil { + err = strconvErr(err) + return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", dest, s, field.Kind(), err) + } + field.SetFloat(f64) + return nil + case reflect.String: + if dest == nil { + return fmt.Errorf("converting NULL to %s is unsupported", field.Kind()) + } + switch v := dest.(type) { + case string: + field.SetString(v) + return nil + case []byte: + field.SetString(string(v)) + return nil + } + default: + + // log.Println(fv, columntype.ScanType().Kind()) + } + + return nil +} + +func (t *Table) Queue(obj interface{}, whereCondition string) *Queue { + q := &Queue{} + q.condition = whereCondition + q.obj = reflect.TypeOf(obj) + q.table = t + + for i := 0; i < q.obj.NumField(); i++ { + field := q.obj.Field(i) + if fname, ok := field.Tag.Lookup("field"); ok { + q.selected += fname + "," + if _, ok := field.Tag.Lookup("uid"); ok { + q.uididx = i + q.uidname = fname + } + } + + } + + q.selected = q.selected[:len(q.selected)-1] + return q } func (t *Table) Insert(obj interface{}) error { @@ -140,24 +339,10 @@ func (t *Table) Update(obj interface{}) error { } type TSreamer struct { - Uid int `field:"uid" uid:"auto"` - Name interface{} `field:"name"` - UserID *sql.NullString `field:"userid"` - Ext *sql.NullString `field:"ext"` - Iface interface{} `field:"tag"` -} - -func TestAutoStore(t *testing.T) { - uri := "root:@tcp(127.0.0.1:4000)/test?parseTime=true&loc=Local&charset=utf8mb4&collation=utf8mb4_unicode_ci" - store := NewStore(uri) - streamer := &TSreamer{} - - streamer.Uid = 2 - streamer.UserID = nil - streamer.Name = "streamer" - streamer.Ext = &sql.NullString{String: "ext", Valid: true} - err := store.Table("streamer").Update(streamer) - if err != nil { - t.Error(err) - } + Uid int `field:"uid" uid:"auto"` + Name interface{} `field:"name"` + UserID *sql.NullString `field:"userid"` + Ext *sql.NullString `field:"ext"` + Iface interface{} `field:"tag"` + Operator int `field:"operator"` } diff --git a/convert.go b/convert.go new file mode 100644 index 0000000..4e8d149 --- /dev/null +++ b/convert.go @@ -0,0 +1,378 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Type conversions for Scan. + +package intimate + +import ( + "database/sql" + "database/sql/driver" + "errors" + "fmt" + "reflect" + "strconv" + "time" +) + +var errNilPtr = errors.New("destination pointer is nil") // embedded in descriptive error + +// convertAssignRows copies to dest the value in src, converting it if possible. +// An error is returned if the copy would result in loss of information. +// dest should be a pointer type. If rows is passed in, the rows will +// be used as the parent for any cursor values converted from a +// driver.Rows to a *Rows. +func convertAssignRows(dest, src interface{}, rows *sql.Rows) error { + // Common cases, without reflect. + switch s := src.(type) { + case string: + switch d := dest.(type) { + case *string: + if d == nil { + return errNilPtr + } + *d = s + return nil + case *[]byte: + if d == nil { + return errNilPtr + } + *d = []byte(s) + return nil + case *sql.RawBytes: + if d == nil { + return errNilPtr + } + *d = append((*d)[:0], s...) + return nil + } + case []byte: + switch d := dest.(type) { + case *string: + if d == nil { + return errNilPtr + } + *d = string(s) + return nil + case *interface{}: + if d == nil { + return errNilPtr + } + *d = cloneBytes(s) + return nil + case *[]byte: + if d == nil { + return errNilPtr + } + *d = cloneBytes(s) + return nil + case *sql.RawBytes: + if d == nil { + return errNilPtr + } + *d = s + return nil + } + case time.Time: + switch d := dest.(type) { + case *time.Time: + *d = s + return nil + case *string: + *d = s.Format(time.RFC3339Nano) + return nil + case *[]byte: + if d == nil { + return errNilPtr + } + *d = []byte(s.Format(time.RFC3339Nano)) + return nil + case *sql.RawBytes: + if d == nil { + return errNilPtr + } + *d = s.AppendFormat((*d)[:0], time.RFC3339Nano) + return nil + } + case decimalDecompose: + switch d := dest.(type) { + case decimalCompose: + return d.Compose(s.Decompose(nil)) + } + case nil: + switch d := dest.(type) { + case *interface{}: + if d == nil { + return errNilPtr + } + *d = nil + return nil + case *[]byte: + if d == nil { + return errNilPtr + } + *d = nil + return nil + case *sql.RawBytes: + if d == nil { + return errNilPtr + } + *d = nil + return nil + } + // The driver is returning a cursor the client may iterate over. + } + + var sv reflect.Value + + switch d := dest.(type) { + case *string: + sv = reflect.ValueOf(src) + switch sv.Kind() { + case reflect.Bool, + reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, + reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, + reflect.Float32, reflect.Float64: + *d = asString(src) + return nil + } + case *[]byte: + sv = reflect.ValueOf(src) + if b, ok := asBytes(nil, sv); ok { + *d = b + return nil + } + case *sql.RawBytes: + sv = reflect.ValueOf(src) + if b, ok := asBytes([]byte(*d)[:0], sv); ok { + *d = sql.RawBytes(b) + return nil + } + case *bool: + bv, err := driver.Bool.ConvertValue(src) + if err == nil { + *d = bv.(bool) + } + return err + case *interface{}: + *d = src + return nil + } + + if scanner, ok := dest.(sql.Scanner); ok { + return scanner.Scan(src) + } + + dpv := reflect.ValueOf(dest) + if dpv.Kind() != reflect.Ptr { + return errors.New("destination not a pointer") + } + if dpv.IsNil() { + return errNilPtr + } + + if !sv.IsValid() { + sv = reflect.ValueOf(src) + } + + dv := reflect.Indirect(dpv) + if sv.IsValid() && sv.Type().AssignableTo(dv.Type()) { + switch b := src.(type) { + case []byte: + dv.Set(reflect.ValueOf(cloneBytes(b))) + default: + dv.Set(sv) + } + return nil + } + + if dv.Kind() == sv.Kind() && sv.Type().ConvertibleTo(dv.Type()) { + dv.Set(sv.Convert(dv.Type())) + return nil + } + + // The following conversions use a string value as an intermediate representation + // to convert between various numeric types. + // + // This also allows scanning into user defined types such as "type Int int64". + // For symmetry, also check for string destination types. + switch dv.Kind() { + case reflect.Ptr: + if src == nil { + dv.Set(reflect.Zero(dv.Type())) + return nil + } + dv.Set(reflect.New(dv.Type().Elem())) + return convertAssignRows(dv.Interface(), src, rows) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + if src == nil { + return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind()) + } + s := asString(src) + i64, err := strconv.ParseInt(s, 10, dv.Type().Bits()) + if err != nil { + err = strconvErr(err) + return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, dv.Kind(), err) + } + dv.SetInt(i64) + return nil + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + if src == nil { + return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind()) + } + s := asString(src) + u64, err := strconv.ParseUint(s, 10, dv.Type().Bits()) + if err != nil { + err = strconvErr(err) + return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, dv.Kind(), err) + } + dv.SetUint(u64) + return nil + case reflect.Float32, reflect.Float64: + if src == nil { + return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind()) + } + s := asString(src) + f64, err := strconv.ParseFloat(s, dv.Type().Bits()) + if err != nil { + err = strconvErr(err) + return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, dv.Kind(), err) + } + dv.SetFloat(f64) + return nil + case reflect.String: + if src == nil { + return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind()) + } + switch v := src.(type) { + case string: + dv.SetString(v) + return nil + case []byte: + dv.SetString(string(v)) + return nil + } + } + + return fmt.Errorf("unsupported Scan, storing driver.Value type %T into type %T", src, dest) +} + +func strconvErr(err error) error { + if ne, ok := err.(*strconv.NumError); ok { + return ne.Err + } + return err +} + +func cloneBytes(b []byte) []byte { + if b == nil { + return nil + } + c := make([]byte, len(b)) + copy(c, b) + return c +} + +func asString(src interface{}) string { + switch v := src.(type) { + case string: + return v + case []byte: + return string(v) + } + rv := reflect.ValueOf(src) + switch rv.Kind() { + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + return strconv.FormatInt(rv.Int(), 10) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + return strconv.FormatUint(rv.Uint(), 10) + case reflect.Float64: + return strconv.FormatFloat(rv.Float(), 'g', -1, 64) + case reflect.Float32: + return strconv.FormatFloat(rv.Float(), 'g', -1, 32) + case reflect.Bool: + return strconv.FormatBool(rv.Bool()) + } + return fmt.Sprintf("%v", src) +} + +func asBytes(buf []byte, rv reflect.Value) (b []byte, ok bool) { + switch rv.Kind() { + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + return strconv.AppendInt(buf, rv.Int(), 10), true + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + return strconv.AppendUint(buf, rv.Uint(), 10), true + case reflect.Float32: + return strconv.AppendFloat(buf, rv.Float(), 'g', -1, 32), true + case reflect.Float64: + return strconv.AppendFloat(buf, rv.Float(), 'g', -1, 64), true + case reflect.Bool: + return strconv.AppendBool(buf, rv.Bool()), true + case reflect.String: + s := rv.String() + return append(buf, s...), true + } + return +} + +var valuerReflectType = reflect.TypeOf((*driver.Valuer)(nil)).Elem() + +// callValuerValue returns vr.Value(), with one exception: +// If vr.Value is an auto-generated method on a pointer type and the +// pointer is nil, it would panic at runtime in the panicwrap +// method. Treat it like nil instead. +// Issue 8415. +// +// This is so people can implement driver.Value on value types and +// still use nil pointers to those types to mean nil/NULL, just like +// string/*string. +// +// This function is mirrored in the database/sql/driver package. +func callValuerValue(vr driver.Valuer) (v driver.Value, err error) { + if rv := reflect.ValueOf(vr); rv.Kind() == reflect.Ptr && + rv.IsNil() && + rv.Type().Elem().Implements(valuerReflectType) { + return nil, nil + } + return vr.Value() +} + +// decimal composes or decomposes a decimal value to and from individual parts. +// There are four parts: a boolean negative flag, a form byte with three possible states +// (finite=0, infinite=1, NaN=2), a base-2 big-endian integer +// coefficient (also known as a significand) as a []byte, and an int32 exponent. +// These are composed into a final value as "decimal = (neg) (form=finite) coefficient * 10 ^ exponent". +// A zero length coefficient is a zero value. +// The big-endian integer coefficient stores the most significant byte first (at coefficient[0]). +// If the form is not finite the coefficient and exponent should be ignored. +// The negative parameter may be set to true for any form, although implementations are not required +// to respect the negative parameter in the non-finite form. +// +// Implementations may choose to set the negative parameter to true on a zero or NaN value, +// but implementations that do not differentiate between negative and positive +// zero or NaN values should ignore the negative parameter without error. +// If an implementation does not support Infinity it may be converted into a NaN without error. +// If a value is set that is larger than what is supported by an implementation, +// an error must be returned. +// Implementations must return an error if a NaN or Infinity is attempted to be set while neither +// are supported. +// +// NOTE(kardianos): This is an experimental interface. See https://golang.org/issue/30870 +type decimal interface { + decimalDecompose + decimalCompose +} + +type decimalDecompose interface { + // Decompose returns the internal decimal state in parts. + // If the provided buf has sufficient capacity, buf may be returned as the coefficient with + // the value set and length set as appropriate. + Decompose(buf []byte) (form byte, negative bool, coefficient []byte, exponent int32) +} + +type decimalCompose interface { + // Compose sets the internal decimal value from parts. If the value cannot be + // represented then an error should be returned. + Compose(form byte, negative bool, coefficient []byte, exponent int32) error +}