package intimate import ( "database/sql" "encoding/binary" "fmt" "log" "reflect" "strconv" "time" ) // StoreExtractorDB 全局的Extractor DB 库链接 var StoreExtractorDB *Store // TStreamer 全局的Streamer. 在config init 完成初始化 var TStreamer *Table // TClog 全局的Clog var TClog *Table // TStreamerList 全局的streamer list 这个表存的url. 进去可以找到主播的列表. 便于动态更新 var TStreamerList *Table /*Store 结构体. 必须使用tag. field 数据库字段标签 uid 唯一id字段标签必须存在 */ type Store struct { db *sql.DB } // Table 表 type Table struct { store *Store name string setting interface{} updatesql string selectsql string insertsql string duplicatesql string } func NewStore(uri string) *Store { db, err := sql.Open("mysql", uri) if err != nil { panic(err) } s := &Store{db: db} return s } // Table 选择表. func (store *Store) Table(name string) *Table { table := &Table{store: store} table.name = name table.insertsql = `INSERT INTO ` + table.name + `(%s) values(%s)` table.duplicatesql = `INSERT INTO ` + table.name + `(%s) values(%s) ON DUPLICATE KEY UPDATE %s` table.updatesql = `UPDATE ` + table.name + ` SET %s WHERE %s = ?` table.selectsql = `SELECT %s FROM ` + table.name + ` WHERE %s ` return table } // Queue mysql 队列结构 type Queue struct { table *Table obj reflect.Type fieldIndex []int selected string cond CondWhere uidname string uididx int } type CondWhere struct { Condition string CondArgs []interface{} } // OperatorType 字典Operator 标志位的类型 type OperatorType string const ( // OpOK 正常 OpOK OperatorType = "0" // OpWAIT 等待处理 OpWAIT OperatorType = "1000" // OpERROR 错误处理 OpERROR OperatorType = "10000" ) // ConditionDefault 默认的条件 func ConditionDefault(platform Platform) CondWhere { return CondWhere{ Condition: "platform = ? and operator = 0 and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval", CondArgs: []interface{}{string(platform)}, } } // Queue 根据Table生成一个队列. 处理结构. 每次弹出一个 obj 是要处理的结构体 自定义的whereCondition条件 func (t *Table) Queue(obj interface{}, whereCondition CondWhere) *Queue { q := &Queue{} q.cond = whereCondition q.obj = reflect.TypeOf(obj) q.table = t q.fieldIndex = []int{} // select 需要配对字段变量的对应index位置 for i := 0; i < q.obj.NumField(); i++ { field := q.obj.Field(i) if fname, ok := field.Tag.Lookup("field"); ok { q.selected += fname + "," if _, ok := field.Tag.Lookup("uid"); ok { q.uididx = i q.uidname = fname } q.fieldIndex = append(q.fieldIndex, i) } } q.selected = q.selected[:len(q.selected)-1] return q } // Pop 队列弹出一个数据(任务). 参考队列处理 不支持嵌套. func (queue *Queue) Pop() (result interface{}, err error) { db := queue.table.store.db tx, err := db.Begin() if err != nil { return nil, err } defer func() { cerr := tx.Commit() if cerr != nil { log.Println(cerr) log.Println(tx.Rollback()) } }() selectsql := `SELECT ` + queue.selected + ` FROM ` + queue.table.name + ` WHERE ` + queue.cond.Condition + " limit 1 for update" rows, err := tx.Query(selectsql, queue.cond.CondArgs...) if err != nil { return nil, fmt.Errorf("table: %s queue is empty", queue.table.name) } var fields = make([]interface{}, len(queue.fieldIndex)) for i := range fields { var iv interface{} fields[i] = &iv } if rows.Next() { err = rows.Scan(fields...) if err != nil { return nil, err } } columntypes, err := rows.ColumnTypes() if err != nil { return nil, err } if err = rows.Close(); err != nil { return nil, err } _, err = tx.Exec("UPDATE "+queue.table.name+" SET operator = "+string(OpWAIT)+" WHERE "+queue.uidname+" = ?", fields[queue.uididx]) if err != nil { log.Println(err) return nil, err } obj := reflect.New(queue.obj).Elem() for i, idx := range queue.fieldIndex { field := obj.Field(idx) convert(*fields[i].(*interface{}), field, columntypes[i]) } return obj.Addr().Interface(), err } // Insert nil 不插入. 不支持嵌套. 必须是Ptr类型 func (t *Table) Insert(obj interface{}) error { ov := reflect.ValueOf(obj).Elem() ot := reflect.TypeOf(obj) fieldsql := "" argssql := "" var args []interface{} for i := 0; i < ov.NumField(); i++ { field := ov.Field(i) ftype := ot.Elem().Field(i) if fname, ok := ftype.Tag.Lookup("field"); ok { if flag, ok := ftype.Tag.Lookup("uid"); ok { if flag == "auto" { continue } } k := ftype.Type.Kind() if k == reflect.Ptr || k == reflect.Interface { if !field.IsNil() { felem := field.Elem() args = append(args, felem.Interface()) fieldsql += fname + "," argssql += "?," } } else { args = append(args, field.Interface()) fieldsql += fname + "," argssql += "?," } } } ssql := fmt.Sprintf(t.insertsql, fieldsql[:len(fieldsql)-1], argssql[:len(argssql)-1]) _, err := t.store.db.Exec(ssql, args...) return err } // DUpdate ON DUPLICATE KEY UPDATE struct. Field对应的tag field字段 type DUpdate struct { Field string // selected 字段 Value interface{} } // InsertOrUpdate nil 不插入. 不支持嵌套. 必须是Ptr类型 func (t *Table) InsertOrUpdate(obj interface{}, updates ...DUpdate) error { ov := reflect.ValueOf(obj).Elem() ot := reflect.TypeOf(obj) fieldsql := "" argssql := "" var SourceUpdate []*DUpdate var OtherUpdate []*DUpdate for _, u := range updates { if u.Value == nil { SourceUpdate = append(SourceUpdate, &u) } else { OtherUpdate = append(OtherUpdate, &u) } } var args []interface{} for i := 0; i < ov.NumField(); i++ { field := ov.Field(i) ftype := ot.Elem().Field(i) if fname, ok := ftype.Tag.Lookup("field"); ok { // if flag, ok := ftype.Tag.Lookup("uid"); ok { // if flag == "auto" { // continue // } // } k := ftype.Type.Kind() if k == reflect.Ptr || k == reflect.Interface { if !field.IsNil() { felem := field.Elem() args = append(args, felem.Interface()) fieldsql += fname + "," argssql += "?," } } else { args = append(args, field.Interface()) fieldsql += fname + "," argssql += "?," } for _, u := range SourceUpdate { if u.Field == fname { u.Value = args[len(args)-1] break } } } } var duplicateSet string = "" for _, u := range SourceUpdate { duplicateSet += u.Field + " = ?," args = append(args, u.Value) } for _, u := range OtherUpdate { duplicateSet += u.Field + " = ?," args = append(args, u.Value) } ssql := fmt.Sprintf(t.duplicatesql, fieldsql[:len(fieldsql)-1], argssql[:len(argssql)-1], duplicateSet[:len(duplicateSet)-1]) _, err := t.store.db.Exec(ssql, args...) return err } // InsertRetAutoID nil 不插入. 不支持嵌套. 并返回auto uid func (t *Table) InsertRetAutoID(obj interface{}) (int64, error) { ov := reflect.ValueOf(obj).Elem() ot := reflect.TypeOf(obj) fieldsql := "" argssql := "" var args []interface{} for i := 0; i < ov.NumField(); i++ { field := ov.Field(i) ftype := ot.Elem().Field(i) if fname, ok := ftype.Tag.Lookup("field"); ok { if flag, ok := ftype.Tag.Lookup("uid"); ok { if flag == "auto" { continue } } k := ftype.Type.Kind() if k == reflect.Ptr || k == reflect.Interface { if !field.IsNil() { felem := field.Elem() args = append(args, felem.Interface()) fieldsql += fname + "," argssql += "?," } } else { args = append(args, field.Interface()) fieldsql += fname + "," argssql += "?," } } } ssql := fmt.Sprintf(t.insertsql, fieldsql[:len(fieldsql)-1], argssql[:len(argssql)-1]) result, err := t.store.db.Exec(ssql, args...) if err != nil { return 0, err } return result.LastInsertId() } // Update 结构体更新 func (t *Table) Update(obj interface{}) error { ov := reflect.ValueOf(obj).Elem() ot := reflect.TypeOf(obj) fieldsql := "" var uidname string var uidvalue interface{} var args []interface{} for i := 0; i < ov.NumField(); i++ { field := ov.Field(i) ftype := ot.Elem().Field(i) if fname, ok := ftype.Tag.Lookup("field"); ok { if _, ok := ftype.Tag.Lookup("uid"); ok { if uidvalue != nil { panic(fmt.Errorf("uid must unique, %s and %s", uidname, fname)) } uidname = fname uidvalue = field.Interface() continue } k := ftype.Type.Kind() if k == reflect.Ptr || k == reflect.Interface { if !field.IsNil() { felem := field.Elem() args = append(args, felem.Interface()) fieldsql += fname + " = ?," } } else { args = append(args, field.Interface()) fieldsql += fname + " = ?," } } } if uidvalue == nil { panic(fmt.Errorf("update must contain `uid` tag")) } usql := fmt.Sprintf(t.updatesql, fieldsql[:len(fieldsql)-1], uidname) args = append(args, uidvalue) _, err := t.store.db.Exec(usql, args...) return err } // UpdateError 更新错误数据 func (t *Table) UpdateError(obj interface{}, err error) { ov := reflect.ValueOf(obj).Elem() ot := reflect.TypeOf(obj) var uidname string var uidvalue interface{} for i := 0; i < ov.NumField(); i++ { field := ov.Field(i) ftype := ot.Elem().Field(i) if fname, ok := ftype.Tag.Lookup("field"); ok { if _, ok := ftype.Tag.Lookup("uid"); ok { if uidvalue != nil { panic(fmt.Errorf("uid must unique, %s and %s", uidname, fname)) } uidname = fname uidvalue = field.Interface() break } } } _, dberr := t.store.db.Exec("update "+t.name+" set operator = ?, error_msg = ? where "+uidname+" = ?", 10000, sql.NullString{String: err.Error(), Valid: true}, uidvalue) if dberr != nil { // email tell owner to deal with panic(dberr) } } func assign(field reflect.Value, src interface{}) (bool, error) { switch field.Kind() { case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: s := asString(src) i64, err := strconv.ParseInt(s, 10, field.Type().Bits()) if err != nil { err = strconvErr(err) return false, fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, field.Kind(), err) } field.SetInt(i64) case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: s := asString(src) u64, err := strconv.ParseUint(s, 10, field.Type().Bits()) if err != nil { err = strconvErr(err) return false, fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, field.Kind(), err) } field.SetUint(u64) case reflect.Float32, reflect.Float64: s := asString(src) f64, err := strconv.ParseFloat(s, field.Type().Bits()) if err != nil { err = strconvErr(err) return false, fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, field.Kind(), err) } field.SetFloat(f64) case reflect.String: field.SetString(string(src.([]byte))) case reflect.Interface: return true, nil } return false, nil } func convert(src interface{}, field reflect.Value, columntype *sql.ColumnType) error { // log.Println("type:", field.Type(), ",kind:", field.Kind(), ",field:", field, "scanType:", columntype.ScanType(), "databaseType:", columntype.DatabaseTypeName()) if field.Kind() == reflect.Ptr { fn := field.Type().Elem() // New 一个 field.Type().Elem() . 然后判断 columntype 转化 成 NullString Time field.Set(reflect.New(fn)) field = field.Elem() // log.Println("type:", fn.Name(), ",kind:", field.Kind(), ",fieldtype:", field.Type()) } // log.Println(field.Kind(), field, reflect.TypeOf(field).Elem().Name(), columntype.ScanType().Kind()) if src == nil { return fmt.Errorf("converting NULL to %s is unsupported", field.Kind()) } switch columntype.DatabaseTypeName() { case "TINYINT", "SMALLINT", "MEDIUMINT", "INT", "BIGINT": isdefault, err := assign(field, src) if err != nil { return err } if isdefault { s := asString(src) i64, err := strconv.ParseInt(s, 10, 64) if err != nil { err = strconvErr(err) return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, field.Kind(), err) } // reflect.New(reflect.TypeOf(i64)) field.Set(reflect.ValueOf(i64)) } case "FLOAT", "DOUBLE", "DECIMAL": isdefault, err := assign(field, src) if err != nil { return err } if isdefault { s := asString(src) f64, err := strconv.ParseFloat(s, 64) if err != nil { err = strconvErr(err) return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, field.Kind(), err) } field.Set(reflect.ValueOf(f64)) } case "BINARY", "VARBINARY", "TINYBLOB", "BLOB", "MEDIUMBLOB", "LONGBLOB", "JSON": isdefault, err := assign(field, src) if err != nil { return err } if isdefault { field.Set(reflect.ValueOf(src.([]byte))) } case "CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT": isdefault, err := assign(field, src) if err != nil { return err } if isdefault { field.Set(reflect.ValueOf(string(src.([]byte)))) } case "BIT": var bits []byte = make([]byte, 8) copy(bits, src.([]byte)) switch field.Kind() { case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: field.SetInt(int64(binary.LittleEndian.Uint64(bits))) case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: field.SetUint(binary.LittleEndian.Uint64(bits)) case reflect.Interface: field.Set(reflect.ValueOf(binary.LittleEndian.Uint64(bits))) } case "YEAR", "TIME", "DATE", "DATETIME", "TIMESTAMP": s := src.(time.Time) switch field.Interface().(type) { case time.Time: field.Set(reflect.ValueOf(src)) case string: field.SetString(s.Format(time.RFC3339Nano)) case []byte: field.SetBytes([]byte(s.Format(time.RFC3339Nano))) default: } } // log.Println(fv, columntype.ScanType().Kind()) if iscan, ok := field.Addr().Interface().(sql.Scanner); ok { err := iscan.Scan(src) if err != nil { return err } } return nil }