todo: twitcasting 测试 store. 2. 修改streamer. tag 符合要求

This commit is contained in:
eson 2020-09-07 18:12:18 +08:00
parent 7849d09a18
commit 2c557e3b42
7 changed files with 256 additions and 244 deletions

View File

@ -10,6 +10,18 @@ import (
"time" "time"
) )
// StoreExtractorDB 全局的Extractor DB 库链接
var StoreExtractorDB *Store
// TStreamer 全局的Streamer. 在config init 完成初始化
var TStreamer *Table
// TClog 全局的Clog
var TClog *Table
// TStreamerList 全局的streamer list 这个表存的url. 进去可以找到主播的列表. 便于动态更新
var TStreamerList *Table
/*Store 结构体. 必须使用tag. field 数据库字段标签 uid 唯一id字段标签必须存在 /*Store 结构体. 必须使用tag. field 数据库字段标签 uid 唯一id字段标签必须存在
*/ */
type Store struct { type Store struct {
@ -287,12 +299,12 @@ func assign(field reflect.Value, src interface{}) (bool, error) {
func convert(src interface{}, field reflect.Value, columntype *sql.ColumnType) error { func convert(src interface{}, field reflect.Value, columntype *sql.ColumnType) error {
log.Println("type:", field.Type(), ",kind:", field.Kind(), ",field:", field, "scanType:", columntype.ScanType(), "databaseType:", columntype.DatabaseTypeName()) // log.Println("type:", field.Type(), ",kind:", field.Kind(), ",field:", field, "scanType:", columntype.ScanType(), "databaseType:", columntype.DatabaseTypeName())
if field.Kind() == reflect.Ptr { if field.Kind() == reflect.Ptr {
fn := field.Type().Elem() // New 一个 field.Type().Elem() . 然后判断 columntype 转化 成 NullString Time fn := field.Type().Elem() // New 一个 field.Type().Elem() . 然后判断 columntype 转化 成 NullString Time
field.Set(reflect.New(fn)) field.Set(reflect.New(fn))
field = field.Elem() field = field.Elem()
log.Println("type:", fn.Name(), ",kind:", field.Kind(), ",fieldtype:", field.Type()) // log.Println("type:", fn.Name(), ",kind:", field.Kind(), ",fieldtype:", field.Type())
} }
// log.Println(field.Kind(), field, reflect.TypeOf(field).Elem().Name(), columntype.ScanType().Kind()) // log.Println(field.Kind(), field, reflect.TypeOf(field).Elem().Name(), columntype.ScanType().Kind())
@ -302,7 +314,6 @@ func convert(src interface{}, field reflect.Value, columntype *sql.ColumnType) e
} }
switch columntype.DatabaseTypeName() { switch columntype.DatabaseTypeName() {
case "TINYINT", "SMALLINT", "MEDIUMINT", "INT", "BIGINT": case "TINYINT", "SMALLINT", "MEDIUMINT", "INT", "BIGINT":
isdefault, err := assign(field, src) isdefault, err := assign(field, src)
if err != nil { if err != nil {
@ -387,7 +398,6 @@ func convert(src interface{}, field reflect.Value, columntype *sql.ColumnType) e
if err != nil { if err != nil {
return err return err
} }
return nil
} }
return nil return nil

View File

@ -18,6 +18,18 @@ func init() {
// storeOpenrec = NewStore() // storeOpenrec = NewStore()
log.SetFlags(log.Llongfile | log.Ltime) log.SetFlags(log.Llongfile | log.Ltime)
// StoreExtractorDB 全局的Extractor DB 库链接
StoreExtractorDB = NewStore(InitConfig.Database.ExtractorURI)
// TStreamer 全局的Streamer
TStreamer = StoreExtractorDB.Table("streamer")
// TClog 全局的Clog
TClog = StoreExtractorDB.Table("collect_log")
// TStreamerList 全局的streamer list 这个表存的url. 进去可以找到主播的列表. 便于动态更新
TStreamerList = StoreExtractorDB.Table("streamer_list")
} }
// Config 配置 // Config 配置

View File

@ -7,13 +7,11 @@
package intimate package intimate
import ( import (
"database/sql"
"database/sql/driver" "database/sql/driver"
"errors" "errors"
"fmt" "fmt"
"reflect" "reflect"
"strconv" "strconv"
"time"
) )
var errNilPtr = errors.New("destination pointer is nil") // embedded in descriptive error var errNilPtr = errors.New("destination pointer is nil") // embedded in descriptive error
@ -23,240 +21,240 @@ var errNilPtr = errors.New("destination pointer is nil") // embedded in descript
// dest should be a pointer type. If rows is passed in, the rows will // dest should be a pointer type. If rows is passed in, the rows will
// be used as the parent for any cursor values converted from a // be used as the parent for any cursor values converted from a
// driver.Rows to a *Rows. // driver.Rows to a *Rows.
func convertAssignRows(dest, src interface{}, rows *sql.Rows) error { // func convertAssignRows(dest, src interface{}, rows *sql.Rows) error {
// Common cases, without reflect. // // Common cases, without reflect.
switch s := src.(type) { // switch s := src.(type) {
case string: // case string:
switch d := dest.(type) { // switch d := dest.(type) {
case *string: // case *string:
if d == nil { // if d == nil {
return errNilPtr // return errNilPtr
} // }
*d = s // *d = s
return nil // return nil
case *[]byte: // case *[]byte:
if d == nil { // if d == nil {
return errNilPtr // return errNilPtr
} // }
*d = []byte(s) // *d = []byte(s)
return nil // return nil
case *sql.RawBytes: // case *sql.RawBytes:
if d == nil { // if d == nil {
return errNilPtr // return errNilPtr
} // }
*d = append((*d)[:0], s...) // *d = append((*d)[:0], s...)
return nil // return nil
} // }
case []byte: // case []byte:
switch d := dest.(type) { // switch d := dest.(type) {
case *string: // case *string:
if d == nil { // if d == nil {
return errNilPtr // return errNilPtr
} // }
*d = string(s) // *d = string(s)
return nil // return nil
case *interface{}: // case *interface{}:
if d == nil { // if d == nil {
return errNilPtr // return errNilPtr
} // }
*d = cloneBytes(s) // *d = cloneBytes(s)
return nil // return nil
case *[]byte: // case *[]byte:
if d == nil { // if d == nil {
return errNilPtr // return errNilPtr
} // }
*d = cloneBytes(s) // *d = cloneBytes(s)
return nil // return nil
case *sql.RawBytes: // case *sql.RawBytes:
if d == nil { // if d == nil {
return errNilPtr // return errNilPtr
} // }
*d = s // *d = s
return nil // return nil
} // }
case time.Time: // case time.Time:
switch d := dest.(type) { // switch d := dest.(type) {
case *time.Time: // case *time.Time:
*d = s // *d = s
return nil // return nil
case *string: // case *string:
*d = s.Format(time.RFC3339Nano) // *d = s.Format(time.RFC3339Nano)
return nil // return nil
case *[]byte: // case *[]byte:
if d == nil { // if d == nil {
return errNilPtr // return errNilPtr
} // }
*d = []byte(s.Format(time.RFC3339Nano)) // *d = []byte(s.Format(time.RFC3339Nano))
return nil // return nil
case *sql.RawBytes: // case *sql.RawBytes:
if d == nil { // if d == nil {
return errNilPtr // return errNilPtr
} // }
*d = s.AppendFormat((*d)[:0], time.RFC3339Nano) // *d = s.AppendFormat((*d)[:0], time.RFC3339Nano)
return nil // return nil
} // }
case decimalDecompose: // case decimalDecompose:
switch d := dest.(type) { // switch d := dest.(type) {
case decimalCompose: // case decimalCompose:
return d.Compose(s.Decompose(nil)) // return d.Compose(s.Decompose(nil))
} // }
case nil: // case nil:
switch d := dest.(type) { // switch d := dest.(type) {
case *interface{}: // case *interface{}:
if d == nil { // if d == nil {
return errNilPtr // return errNilPtr
} // }
*d = nil // *d = nil
return nil // return nil
case *[]byte: // case *[]byte:
if d == nil { // if d == nil {
return errNilPtr // return errNilPtr
} // }
*d = nil // *d = nil
return nil // return nil
case *sql.RawBytes: // case *sql.RawBytes:
if d == nil { // if d == nil {
return errNilPtr // return errNilPtr
} // }
*d = nil // *d = nil
return nil // return nil
} // }
// The driver is returning a cursor the client may iterate over. // // The driver is returning a cursor the client may iterate over.
} // }
var sv reflect.Value // var sv reflect.Value
switch d := dest.(type) { // switch d := dest.(type) {
case *string: // case *string:
sv = reflect.ValueOf(src) // sv = reflect.ValueOf(src)
switch sv.Kind() { // switch sv.Kind() {
case reflect.Bool, // case reflect.Bool,
reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, // reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, // reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64,
reflect.Float32, reflect.Float64: // reflect.Float32, reflect.Float64:
*d = asString(src) // *d = asString(src)
return nil // return nil
} // }
case *[]byte: // case *[]byte:
sv = reflect.ValueOf(src) // sv = reflect.ValueOf(src)
if b, ok := asBytes(nil, sv); ok { // if b, ok := asBytes(nil, sv); ok {
*d = b // *d = b
return nil // return nil
} // }
case *sql.RawBytes: // case *sql.RawBytes:
sv = reflect.ValueOf(src) // sv = reflect.ValueOf(src)
if b, ok := asBytes([]byte(*d)[:0], sv); ok { // if b, ok := asBytes([]byte(*d)[:0], sv); ok {
*d = sql.RawBytes(b) // *d = sql.RawBytes(b)
return nil // return nil
} // }
case *bool: // case *bool:
bv, err := driver.Bool.ConvertValue(src) // bv, err := driver.Bool.ConvertValue(src)
if err == nil { // if err == nil {
*d = bv.(bool) // *d = bv.(bool)
} // }
return err // return err
case *interface{}: // case *interface{}:
*d = src // *d = src
return nil // return nil
} // }
if scanner, ok := dest.(sql.Scanner); ok { // if scanner, ok := dest.(sql.Scanner); ok {
return scanner.Scan(src) // return scanner.Scan(src)
} // }
dpv := reflect.ValueOf(dest) // dpv := reflect.ValueOf(dest)
if dpv.Kind() != reflect.Ptr { // if dpv.Kind() != reflect.Ptr {
return errors.New("destination not a pointer") // return errors.New("destination not a pointer")
} // }
if dpv.IsNil() { // if dpv.IsNil() {
return errNilPtr // return errNilPtr
} // }
if !sv.IsValid() { // if !sv.IsValid() {
sv = reflect.ValueOf(src) // sv = reflect.ValueOf(src)
} // }
dv := reflect.Indirect(dpv) // dv := reflect.Indirect(dpv)
if sv.IsValid() && sv.Type().AssignableTo(dv.Type()) { // if sv.IsValid() && sv.Type().AssignableTo(dv.Type()) {
switch b := src.(type) { // switch b := src.(type) {
case []byte: // case []byte:
dv.Set(reflect.ValueOf(cloneBytes(b))) // dv.Set(reflect.ValueOf(cloneBytes(b)))
default: // default:
dv.Set(sv) // dv.Set(sv)
} // }
return nil // return nil
} // }
if dv.Kind() == sv.Kind() && sv.Type().ConvertibleTo(dv.Type()) { // if dv.Kind() == sv.Kind() && sv.Type().ConvertibleTo(dv.Type()) {
dv.Set(sv.Convert(dv.Type())) // dv.Set(sv.Convert(dv.Type()))
return nil // return nil
} // }
// The following conversions use a string value as an intermediate representation // // The following conversions use a string value as an intermediate representation
// to convert between various numeric types. // // to convert between various numeric types.
// // //
// This also allows scanning into user defined types such as "type Int int64". // // This also allows scanning into user defined types such as "type Int int64".
// For symmetry, also check for string destination types. // // For symmetry, also check for string destination types.
switch dv.Kind() { // switch dv.Kind() {
case reflect.Ptr: // case reflect.Ptr:
if src == nil { // if src == nil {
dv.Set(reflect.Zero(dv.Type())) // dv.Set(reflect.Zero(dv.Type()))
return nil // return nil
} // }
dv.Set(reflect.New(dv.Type().Elem())) // dv.Set(reflect.New(dv.Type().Elem()))
return convertAssignRows(dv.Interface(), src, rows) // return convertAssignRows(dv.Interface(), src, rows)
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: // case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
if src == nil { // if src == nil {
return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind()) // return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind())
} // }
s := asString(src) // s := asString(src)
i64, err := strconv.ParseInt(s, 10, dv.Type().Bits()) // i64, err := strconv.ParseInt(s, 10, dv.Type().Bits())
if err != nil { // if err != nil {
err = strconvErr(err) // err = strconvErr(err)
return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, dv.Kind(), err) // return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, dv.Kind(), err)
} // }
dv.SetInt(i64) // dv.SetInt(i64)
return nil // return nil
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: // case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
if src == nil { // if src == nil {
return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind()) // return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind())
} // }
s := asString(src) // s := asString(src)
u64, err := strconv.ParseUint(s, 10, dv.Type().Bits()) // u64, err := strconv.ParseUint(s, 10, dv.Type().Bits())
if err != nil { // if err != nil {
err = strconvErr(err) // err = strconvErr(err)
return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, dv.Kind(), err) // return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, dv.Kind(), err)
} // }
dv.SetUint(u64) // dv.SetUint(u64)
return nil // return nil
case reflect.Float32, reflect.Float64: // case reflect.Float32, reflect.Float64:
if src == nil { // if src == nil {
return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind()) // return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind())
} // }
s := asString(src) // s := asString(src)
f64, err := strconv.ParseFloat(s, dv.Type().Bits()) // f64, err := strconv.ParseFloat(s, dv.Type().Bits())
if err != nil { // if err != nil {
err = strconvErr(err) // err = strconvErr(err)
return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, dv.Kind(), err) // return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, dv.Kind(), err)
} // }
dv.SetFloat(f64) // dv.SetFloat(f64)
return nil // return nil
case reflect.String: // case reflect.String:
if src == nil { // if src == nil {
return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind()) // return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind())
} // }
switch v := src.(type) { // switch v := src.(type) {
case string: // case string:
dv.SetString(v) // dv.SetString(v)
return nil // return nil
case []byte: // case []byte:
dv.SetString(string(v)) // dv.SetString(string(v))
return nil // return nil
} // }
} // }
return fmt.Errorf("unsupported Scan, storing driver.Value type %T into type %T", src, dest) // return fmt.Errorf("unsupported Scan, storing driver.Value type %T into type %T", src, dest)
} // }
func strconvErr(err error) error { func strconvErr(err error) error {
if ne, ok := err.(*strconv.NumError); ok { if ne, ok := err.(*strconv.NumError); ok {

View File

@ -95,8 +95,8 @@ func Execute() {
} }
streamer.UpdateInterval = 120 streamer.UpdateInterval = 120
// spew.Println(streamer)
estore.InsertStreamer(streamer) estore.InsertStreamer(streamer)
} else { } else {
log.Println("userid is null.", room.String()) log.Println("userid is null.", room.String())
} }

View File

@ -111,16 +111,6 @@ func Execute() {
supporters = append(supporters, string(resp.Content())) supporters = append(supporters, string(resp.Content()))
temporary.QueryParam("page_number").IntAdd(1) temporary.QueryParam("page_number").IntAdd(1)
// page := supportersQuery.Get("page_number") // page_number 加1
// pageint, err := strconv.Atoi(page)
// if err != nil {
// log.Println(err)
// break
// }
// pageint++
// page = strconv.Itoa(pageint)
// supportersQuery.Set("page_number", page)
// temporary.SetQuery(supportersQuery)
} }
// cookies := cxt.Session().GetCookies(wf.GetParsedURL()) // cookies := cxt.Session().GetCookies(wf.GetParsedURL())

View File

@ -21,5 +21,5 @@ func TestUpdateTime(t *testing.T) {
} }
func TestMain(t *testing.T) { func TestMain(t *testing.T) {
main()
} }

View File

@ -132,7 +132,9 @@ func Execute() {
streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true} streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true}
streamer.UserName = sql.NullString{String: sp.UserName, Valid: true} streamer.UserName = sql.NullString{String: sp.UserName, Valid: true}
streamer.UserId = sp.UserId streamer.UserId = sp.UserId
estore.InsertStreamer(streamer) streamer.Operator = 33
// estore.InsertStreamer(streamer)
intimate.TStreamer.Insert(streamer)
} }
log.Println("finish remain", queue.Size()) log.Println("finish remain", queue.Size())