579 lines
14 KiB
Go
579 lines
14 KiB
Go
package intimate
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"log"
|
|
"reflect"
|
|
"strconv"
|
|
"time"
|
|
)
|
|
|
|
// StoreExtractorDB is the package-wide Store handle for the extractor
// database.
var StoreExtractorDB *Store

// TStreamer is the package-wide streamer Table. Per the original note it is
// initialized once config init has completed; that initialization is not in
// this file.
var TStreamer *Table

// TClog is the package-wide clog Table.
var TClog *Table

// TStreamerList is the package-wide streamer-list Table. Per the original
// note, this table stores URLs from which lists of streamers can be found,
// which makes it easier to update the streamers dynamically.
var TStreamerList *Table
|
|
|
|
// Store wraps the *sql.DB handle shared by every Table created from it.
//
// Original note (translated): structs persisted through this package must
// carry struct tags. The `field` tag names the database column, and a `uid`
// tag marking the unique-id field must be present.
type Store struct {
	// db is the database handle opened by NewStore.
	db *sql.DB
}
|
|
|
|
// Table is a handle on a single database table of a Store.
type Table struct {
	// store is the Store whose *sql.DB executes this table's statements.
	store *Store
	// name is the table name; it is concatenated directly into SQL text.
	name string
	// setting is not read or written by any code visible in this file.
	setting interface{}

	// The statement templates below are built by Store.Table. They still
	// contain %s verbs that the statement builders fill in with fmt.Sprintf.

	// updatesql is the `UPDATE ... SET %s WHERE %s = ?` template.
	updatesql string
	// selectsql is the `SELECT %s FROM ... WHERE %s` template.
	selectsql string
	// insertsql is the `INSERT INTO ...(%s) values(%s)` template.
	insertsql string
	// duplicatesql is the insert template with an
	// `ON DUPLICATE KEY UPDATE %s` suffix.
	duplicatesql string
}
|
|
|
|
func NewStore(uri string) *Store {
|
|
db, err := sql.Open("mysql", uri)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
s := &Store{db: db}
|
|
return s
|
|
}
|
|
|
|
// Table 选择表.
|
|
func (store *Store) Table(name string) *Table {
|
|
table := &Table{store: store}
|
|
table.name = name
|
|
|
|
table.insertsql = `INSERT INTO ` + table.name + `(%s) values(%s)`
|
|
table.duplicatesql = `INSERT INTO ` + table.name + `(%s) values(%s) ON DUPLICATE KEY UPDATE %s`
|
|
table.updatesql = `UPDATE ` + table.name + ` SET %s WHERE %s = ?`
|
|
table.selectsql = `SELECT %s FROM ` + table.name + ` WHERE %s `
|
|
return table
|
|
}
|
|
|
|
// Queue is a MySQL-backed queue: rows of a table are handed out one at a
// time by Pop.
type Queue struct {
	// table is the table the queue reads from.
	table *Table
	// obj is the struct type that Pop instantiates for each popped row.
	obj reflect.Type
	// fieldIndex holds the struct field indexes of the `field`-tagged
	// fields, in the same order as the columns in selected.
	fieldIndex []int
	// selected is the comma-separated column list used in the SELECT.
	selected string

	// cond is the WHERE condition and its bound arguments.
	cond CondWhere

	// uidname is the column name taken from the `uid`-tagged field.
	uidname string
	// uididx is the struct field index of the `uid`-tagged field.
	uididx int
}
|
|
|
|
// CondWhere is a WHERE-clause fragment together with the arguments bound to
// its ? placeholders.
type CondWhere struct {
	// Condition is the SQL text placed after WHERE.
	Condition string
	// CondArgs are passed to the query as the placeholder arguments.
	CondArgs []interface{}
}
|
|
|
|
// OperatorType is the type of the values stored in the `operator` flag
// column.
type OperatorType string

const (
	// OpOK marks a row as normal.
	OpOK OperatorType = "0"
	// OpWAIT marks a row as waiting to be processed.
	OpWAIT OperatorType = "1000"
	// OpERROR marks a row whose processing ended in an error.
	OpERROR OperatorType = "10000"
)
|
|
|
|
// ConditionDefault 默认的条件
|
|
func ConditionDefault(platform Platform) CondWhere {
|
|
return CondWhere{
|
|
Condition: "platform = ? and operator = 0 and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval",
|
|
CondArgs: []interface{}{string(platform)},
|
|
}
|
|
}
|
|
|
|
// Queue 根据Table生成一个队列. 处理结构. 每次弹出一个 obj 是要处理的结构体 自定义的whereCondition条件
|
|
func (t *Table) Queue(obj interface{}, whereCondition CondWhere) *Queue {
|
|
q := &Queue{}
|
|
q.cond = whereCondition
|
|
q.obj = reflect.TypeOf(obj)
|
|
q.table = t
|
|
q.fieldIndex = []int{} // select 需要配对字段变量的对应index位置
|
|
|
|
for i := 0; i < q.obj.NumField(); i++ {
|
|
field := q.obj.Field(i)
|
|
if fname, ok := field.Tag.Lookup("field"); ok {
|
|
q.selected += fname + ","
|
|
if _, ok := field.Tag.Lookup("uid"); ok {
|
|
q.uididx = i
|
|
q.uidname = fname
|
|
}
|
|
q.fieldIndex = append(q.fieldIndex, i)
|
|
}
|
|
|
|
}
|
|
|
|
q.selected = q.selected[:len(q.selected)-1]
|
|
return q
|
|
}
|
|
|
|
// Pop 队列弹出一个数据(任务). 参考队列处理 不支持嵌套.
|
|
func (queue *Queue) Pop() (result interface{}, err error) {
|
|
|
|
db := queue.table.store.db
|
|
tx, err := db.Begin()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
defer func() {
|
|
cerr := tx.Commit()
|
|
if cerr != nil {
|
|
log.Println(cerr)
|
|
log.Println(tx.Rollback())
|
|
}
|
|
}()
|
|
|
|
selectsql := `SELECT ` + queue.selected + ` FROM ` + queue.table.name + ` WHERE ` + queue.cond.Condition + " limit 1 for update"
|
|
rows, err := tx.Query(selectsql, queue.cond.CondArgs...)
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("table: %s queue is empty", queue.table.name)
|
|
}
|
|
|
|
var fields = make([]interface{}, len(queue.fieldIndex))
|
|
for i := range fields {
|
|
var iv interface{}
|
|
fields[i] = &iv
|
|
}
|
|
|
|
if rows.Next() {
|
|
err = rows.Scan(fields...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
columntypes, err := rows.ColumnTypes()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err = rows.Close(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, err = tx.Exec("UPDATE "+queue.table.name+" SET operator = "+string(OpWAIT)+" WHERE "+queue.uidname+" = ?", fields[queue.uididx])
|
|
if err != nil {
|
|
log.Println(err)
|
|
return nil, err
|
|
}
|
|
|
|
obj := reflect.New(queue.obj).Elem()
|
|
for i, idx := range queue.fieldIndex {
|
|
field := obj.Field(idx)
|
|
convert(*fields[i].(*interface{}), field, columntypes[i])
|
|
}
|
|
|
|
return obj.Addr().Interface(), err
|
|
}
|
|
|
|
// Insert nil 不插入. 不支持嵌套. 必须是Ptr类型
|
|
func (t *Table) Insert(obj interface{}) error {
|
|
ov := reflect.ValueOf(obj).Elem()
|
|
ot := reflect.TypeOf(obj)
|
|
|
|
fieldsql := ""
|
|
argssql := ""
|
|
|
|
var args []interface{}
|
|
for i := 0; i < ov.NumField(); i++ {
|
|
field := ov.Field(i)
|
|
ftype := ot.Elem().Field(i)
|
|
|
|
if fname, ok := ftype.Tag.Lookup("field"); ok {
|
|
if flag, ok := ftype.Tag.Lookup("uid"); ok {
|
|
if flag == "auto" {
|
|
continue
|
|
}
|
|
}
|
|
|
|
k := ftype.Type.Kind()
|
|
if k == reflect.Ptr || k == reflect.Interface {
|
|
if !field.IsNil() {
|
|
felem := field.Elem()
|
|
args = append(args, felem.Interface())
|
|
fieldsql += fname + ","
|
|
argssql += "?,"
|
|
}
|
|
} else {
|
|
args = append(args, field.Interface())
|
|
fieldsql += fname + ","
|
|
argssql += "?,"
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ssql := fmt.Sprintf(t.insertsql, fieldsql[:len(fieldsql)-1], argssql[:len(argssql)-1])
|
|
_, err := t.store.db.Exec(ssql, args...)
|
|
return err
|
|
}
|
|
|
|
// DUpdate describes one assignment in the ON DUPLICATE KEY UPDATE clause
// built by InsertOrUpdate.
type DUpdate struct {
	// Field is the column to assign; it corresponds to a `field` tag value.
	Field string
	// Value is the value to assign. InsertOrUpdate treats a nil Value as
	// "use the value being inserted for this column".
	Value interface{}
}
|
|
|
|
// InsertOrUpdate nil 不插入. 不支持嵌套. 必须是Ptr类型
|
|
func (t *Table) InsertOrUpdate(obj interface{}, updates ...DUpdate) error {
|
|
ov := reflect.ValueOf(obj).Elem()
|
|
ot := reflect.TypeOf(obj)
|
|
|
|
fieldsql := ""
|
|
argssql := ""
|
|
|
|
var SourceUpdate []*DUpdate
|
|
var OtherUpdate []*DUpdate
|
|
for _, u := range updates {
|
|
if u.Value == nil {
|
|
SourceUpdate = append(SourceUpdate, &u)
|
|
} else {
|
|
OtherUpdate = append(OtherUpdate, &u)
|
|
}
|
|
}
|
|
|
|
var args []interface{}
|
|
for i := 0; i < ov.NumField(); i++ {
|
|
field := ov.Field(i)
|
|
ftype := ot.Elem().Field(i)
|
|
|
|
if fname, ok := ftype.Tag.Lookup("field"); ok {
|
|
// if flag, ok := ftype.Tag.Lookup("uid"); ok {
|
|
// if flag == "auto" {
|
|
// continue
|
|
// }
|
|
// }
|
|
|
|
k := ftype.Type.Kind()
|
|
if k == reflect.Ptr || k == reflect.Interface {
|
|
if !field.IsNil() {
|
|
felem := field.Elem()
|
|
args = append(args, felem.Interface())
|
|
fieldsql += fname + ","
|
|
argssql += "?,"
|
|
}
|
|
} else {
|
|
args = append(args, field.Interface())
|
|
fieldsql += fname + ","
|
|
argssql += "?,"
|
|
}
|
|
|
|
for _, u := range SourceUpdate {
|
|
if u.Field == fname {
|
|
u.Value = args[len(args)-1]
|
|
break
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
var duplicateSet string = ""
|
|
for _, u := range SourceUpdate {
|
|
duplicateSet += u.Field + " = ?,"
|
|
args = append(args, u.Value)
|
|
}
|
|
|
|
for _, u := range OtherUpdate {
|
|
duplicateSet += u.Field + " = ?,"
|
|
args = append(args, u.Value)
|
|
}
|
|
|
|
ssql := fmt.Sprintf(t.duplicatesql, fieldsql[:len(fieldsql)-1], argssql[:len(argssql)-1], duplicateSet[:len(duplicateSet)-1])
|
|
_, err := t.store.db.Exec(ssql, args...)
|
|
return err
|
|
}
|
|
|
|
// InsertRetAutoID nil 不插入. 不支持嵌套. 并返回auto uid
|
|
func (t *Table) InsertRetAutoID(obj interface{}) (int64, error) {
|
|
ov := reflect.ValueOf(obj).Elem()
|
|
ot := reflect.TypeOf(obj)
|
|
|
|
fieldsql := ""
|
|
argssql := ""
|
|
|
|
var args []interface{}
|
|
for i := 0; i < ov.NumField(); i++ {
|
|
field := ov.Field(i)
|
|
ftype := ot.Elem().Field(i)
|
|
|
|
if fname, ok := ftype.Tag.Lookup("field"); ok {
|
|
if flag, ok := ftype.Tag.Lookup("uid"); ok {
|
|
if flag == "auto" {
|
|
continue
|
|
}
|
|
}
|
|
|
|
k := ftype.Type.Kind()
|
|
if k == reflect.Ptr || k == reflect.Interface {
|
|
if !field.IsNil() {
|
|
felem := field.Elem()
|
|
args = append(args, felem.Interface())
|
|
fieldsql += fname + ","
|
|
argssql += "?,"
|
|
}
|
|
} else {
|
|
args = append(args, field.Interface())
|
|
fieldsql += fname + ","
|
|
argssql += "?,"
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ssql := fmt.Sprintf(t.insertsql, fieldsql[:len(fieldsql)-1], argssql[:len(argssql)-1])
|
|
result, err := t.store.db.Exec(ssql, args...)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return result.LastInsertId()
|
|
}
|
|
|
|
// Update 结构体更新
|
|
func (t *Table) Update(obj interface{}) error {
|
|
|
|
ov := reflect.ValueOf(obj).Elem()
|
|
ot := reflect.TypeOf(obj)
|
|
|
|
fieldsql := ""
|
|
var uidname string
|
|
var uidvalue interface{}
|
|
|
|
var args []interface{}
|
|
for i := 0; i < ov.NumField(); i++ {
|
|
field := ov.Field(i)
|
|
ftype := ot.Elem().Field(i)
|
|
|
|
if fname, ok := ftype.Tag.Lookup("field"); ok {
|
|
if _, ok := ftype.Tag.Lookup("uid"); ok {
|
|
if uidvalue != nil {
|
|
panic(fmt.Errorf("uid must unique, %s and %s", uidname, fname))
|
|
}
|
|
uidname = fname
|
|
uidvalue = field.Interface()
|
|
continue
|
|
}
|
|
|
|
k := ftype.Type.Kind()
|
|
if k == reflect.Ptr || k == reflect.Interface {
|
|
if !field.IsNil() {
|
|
felem := field.Elem()
|
|
args = append(args, felem.Interface())
|
|
fieldsql += fname + " = ?,"
|
|
}
|
|
} else {
|
|
args = append(args, field.Interface())
|
|
fieldsql += fname + " = ?,"
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if uidvalue == nil {
|
|
panic(fmt.Errorf("update must contain `uid` tag"))
|
|
}
|
|
|
|
usql := fmt.Sprintf(t.updatesql, fieldsql[:len(fieldsql)-1], uidname)
|
|
args = append(args, uidvalue)
|
|
_, err := t.store.db.Exec(usql, args...)
|
|
return err
|
|
}
|
|
|
|
// UpdateError 更新错误数据
|
|
func (t *Table) UpdateError(obj interface{}, err error) {
|
|
|
|
ov := reflect.ValueOf(obj).Elem()
|
|
ot := reflect.TypeOf(obj)
|
|
|
|
var uidname string
|
|
var uidvalue interface{}
|
|
|
|
for i := 0; i < ov.NumField(); i++ {
|
|
field := ov.Field(i)
|
|
ftype := ot.Elem().Field(i)
|
|
|
|
if fname, ok := ftype.Tag.Lookup("field"); ok {
|
|
if _, ok := ftype.Tag.Lookup("uid"); ok {
|
|
if uidvalue != nil {
|
|
panic(fmt.Errorf("uid must unique, %s and %s", uidname, fname))
|
|
}
|
|
uidname = fname
|
|
uidvalue = field.Interface()
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
_, dberr := t.store.db.Exec("update "+t.name+" set operator = ?, error_msg = ? where ? = ?", 10000, sql.NullString{String: err.Error(), Valid: true}, uidname, uidvalue)
|
|
if dberr != nil {
|
|
// email tell owner to deal with
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func assign(field reflect.Value, src interface{}) (bool, error) {
|
|
|
|
switch field.Kind() {
|
|
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
|
|
s := asString(src)
|
|
i64, err := strconv.ParseInt(s, 10, field.Type().Bits())
|
|
if err != nil {
|
|
err = strconvErr(err)
|
|
return false, fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, field.Kind(), err)
|
|
}
|
|
field.SetInt(i64)
|
|
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
|
|
|
|
s := asString(src)
|
|
u64, err := strconv.ParseUint(s, 10, field.Type().Bits())
|
|
if err != nil {
|
|
err = strconvErr(err)
|
|
return false, fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, field.Kind(), err)
|
|
}
|
|
field.SetUint(u64)
|
|
case reflect.Float32, reflect.Float64:
|
|
s := asString(src)
|
|
f64, err := strconv.ParseFloat(s, field.Type().Bits())
|
|
if err != nil {
|
|
err = strconvErr(err)
|
|
return false, fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, field.Kind(), err)
|
|
}
|
|
field.SetFloat(f64)
|
|
case reflect.String:
|
|
field.SetString(string(src.([]byte)))
|
|
case reflect.Interface:
|
|
return true, nil
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
func convert(src interface{}, field reflect.Value, columntype *sql.ColumnType) error {
|
|
|
|
// log.Println("type:", field.Type(), ",kind:", field.Kind(), ",field:", field, "scanType:", columntype.ScanType(), "databaseType:", columntype.DatabaseTypeName())
|
|
if field.Kind() == reflect.Ptr {
|
|
fn := field.Type().Elem() // New 一个 field.Type().Elem() . 然后判断 columntype 转化 成 NullString Time
|
|
field.Set(reflect.New(fn))
|
|
field = field.Elem()
|
|
// log.Println("type:", fn.Name(), ",kind:", field.Kind(), ",fieldtype:", field.Type())
|
|
}
|
|
|
|
// log.Println(field.Kind(), field, reflect.TypeOf(field).Elem().Name(), columntype.ScanType().Kind())
|
|
|
|
if src == nil {
|
|
return fmt.Errorf("converting NULL to %s is unsupported", field.Kind())
|
|
}
|
|
|
|
switch columntype.DatabaseTypeName() {
|
|
case "TINYINT", "SMALLINT", "MEDIUMINT", "INT", "BIGINT":
|
|
isdefault, err := assign(field, src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if isdefault {
|
|
s := asString(src)
|
|
i64, err := strconv.ParseInt(s, 10, 64)
|
|
if err != nil {
|
|
err = strconvErr(err)
|
|
return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, field.Kind(), err)
|
|
}
|
|
// reflect.New(reflect.TypeOf(i64))
|
|
field.Set(reflect.ValueOf(i64))
|
|
}
|
|
|
|
case "FLOAT", "DOUBLE", "DECIMAL":
|
|
isdefault, err := assign(field, src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if isdefault {
|
|
s := asString(src)
|
|
f64, err := strconv.ParseFloat(s, 64)
|
|
if err != nil {
|
|
err = strconvErr(err)
|
|
return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, field.Kind(), err)
|
|
}
|
|
field.Set(reflect.ValueOf(f64))
|
|
}
|
|
|
|
case "BINARY", "VARBINARY", "TINYBLOB", "BLOB", "MEDIUMBLOB", "LONGBLOB", "JSON":
|
|
isdefault, err := assign(field, src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if isdefault {
|
|
field.Set(reflect.ValueOf(src.([]byte)))
|
|
}
|
|
case "CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT":
|
|
isdefault, err := assign(field, src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if isdefault {
|
|
field.Set(reflect.ValueOf(string(src.([]byte))))
|
|
}
|
|
|
|
case "BIT":
|
|
var bits []byte = make([]byte, 8)
|
|
copy(bits, src.([]byte))
|
|
|
|
switch field.Kind() {
|
|
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
|
|
field.SetInt(int64(binary.LittleEndian.Uint64(bits)))
|
|
|
|
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
|
|
field.SetUint(binary.LittleEndian.Uint64(bits))
|
|
|
|
case reflect.Interface:
|
|
field.Set(reflect.ValueOf(binary.LittleEndian.Uint64(bits)))
|
|
|
|
}
|
|
|
|
case "YEAR", "TIME", "DATE", "DATETIME", "TIMESTAMP":
|
|
s := src.(time.Time)
|
|
switch field.Interface().(type) {
|
|
case time.Time:
|
|
field.Set(reflect.ValueOf(src))
|
|
case string:
|
|
field.SetString(s.Format(time.RFC3339Nano))
|
|
case []byte:
|
|
field.SetBytes([]byte(s.Format(time.RFC3339Nano)))
|
|
default:
|
|
}
|
|
}
|
|
|
|
// log.Println(fv, columntype.ScanType().Kind())
|
|
|
|
if iscan, ok := field.Addr().Interface().(sql.Scanner); ok {
|
|
err := iscan.Scan(src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|