Merge branch 'feature/autostore' into develop

eson 2020-09-09 17:25:55 +08:00
commit a848f26d65
32 changed files with 1716 additions and 801 deletions

578
autostore.go Normal file

@ -0,0 +1,578 @@
package intimate
import (
"database/sql"
"encoding/binary"
"fmt"
"log"
"reflect"
"strconv"
"time"
)
// StoreExtractorDB is the global Extractor DB connection
var StoreExtractorDB *Store
// TStreamer is the global streamer table. Initialized once config init completes
var TStreamer *Table
// TClog is the global collect-log table
var TClog *Table
// TStreamerList is the global streamer-list table. It stores URLs from which streamer lists can be found, which makes dynamic updates easier
var TStreamerList *Table
/*Store wraps a *sql.DB. Mapped structs must use tags: the `field` tag names the database column, and exactly one field must carry the `uid` tag marking the unique-id column.
*/
type Store struct {
db *sql.DB
}
// Table represents a single database table
type Table struct {
store *Store
name string
setting interface{}
updatesql string
selectsql string
insertsql string
duplicatesql string
}
func NewStore(uri string) *Store {
db, err := sql.Open("mysql", uri)
if err != nil {
panic(err)
}
s := &Store{db: db}
return s
}
// Table selects a table by name and prepares its SQL templates.
func (store *Store) Table(name string) *Table {
table := &Table{store: store}
table.name = name
table.insertsql = `INSERT INTO ` + table.name + `(%s) values(%s)`
table.duplicatesql = `INSERT INTO ` + table.name + `(%s) values(%s) ON DUPLICATE KEY UPDATE %s`
table.updatesql = `UPDATE ` + table.name + ` SET %s WHERE %s = ?`
table.selectsql = `SELECT %s FROM ` + table.name + ` WHERE %s `
return table
}
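// A minimal usage sketch (illustrative, not part of this commit) of the tag
// convention described above; the `account` table, its columns and the DSN are
// hypothetical:
//
//	type Account struct {
//		Uid   int64          `field:"uid" uid:"auto"` // auto-increment primary key
//		Name  string         `field:"name"`
//		Score *sql.NullInt64 `field:"score"` // nil pointers are skipped on insert
//	}
//
//	store := NewStore("user:pass@tcp(127.0.0.1:3306)/db?parseTime=true")
//	if err := store.Table("account").Insert(&Account{Name: "alice"}); err != nil {
//		log.Println(err)
//	}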
// Queue is a MySQL-backed task queue built on top of a Table
type Queue struct {
table *Table
obj reflect.Type
fieldIndex []int
selected string
cond CondWhere
uidname string
uididx int
}
type CondWhere struct {
Condition string
CondArgs []interface{}
}
// OperatorType is the type of the operator status flag
type OperatorType string
const (
// OpOK normal / ready
OpOK OperatorType = "0"
// OpWAIT waiting to be processed
OpWAIT OperatorType = "1000"
// OpERROR processing failed
OpERROR OperatorType = "10000"
)
// ConditionDefault returns the default queue condition: rows on the given platform whose operator is 0 and whose update interval has elapsed
func ConditionDefault(platform Platform) CondWhere {
return CondWhere{
Condition: "platform = ? and operator = 0 and TIMESTAMPDIFF(MINUTE , update_time, CURRENT_TIMESTAMP()) >= update_interval",
CondArgs: []interface{}{string(platform)},
}
}
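// A custom condition can also be supplied instead of ConditionDefault; a minimal
// sketch using the same column names as above:
//
//	cond := CondWhere{
//		Condition: "platform = ? and operator = 0",
//		CondArgs:  []interface{}{string(Ptwitch)},
//	}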
// Queue builds a task queue from the Table. obj is the struct type that rows are scanned into; whereCondition is the custom WHERE clause used to select pending rows
func (t *Table) Queue(obj interface{}, whereCondition CondWhere) *Queue {
q := &Queue{}
q.cond = whereCondition
q.obj = reflect.TypeOf(obj)
q.table = t
q.fieldIndex = []int{} // indexes of the struct fields paired with the selected columns
for i := 0; i < q.obj.NumField(); i++ {
field := q.obj.Field(i)
if fname, ok := field.Tag.Lookup("field"); ok {
q.selected += fname + ","
if _, ok := field.Tag.Lookup("uid"); ok {
q.uididx = i
q.uidname = fname
}
q.fieldIndex = append(q.fieldIndex, i)
}
}
q.selected = q.selected[:len(q.selected)-1]
return q
}
// Pop takes one row (task) from the queue and marks it as OpWAIT. Nested structs are not supported.
func (queue *Queue) Pop() (result interface{}, err error) {
db := queue.table.store.db
tx, err := db.Begin()
if err != nil {
return nil, err
}
defer func() {
cerr := tx.Commit()
if cerr != nil {
log.Println(cerr)
log.Println(tx.Rollback())
}
}()
selectsql := `SELECT ` + queue.selected + ` FROM ` + queue.table.name + ` WHERE ` + queue.cond.Condition + " limit 1 for update"
rows, err := tx.Query(selectsql, queue.cond.CondArgs...)
if err != nil {
return nil, fmt.Errorf("table %s: pop query failed: %w", queue.table.name, err)
}
var fields = make([]interface{}, len(queue.fieldIndex))
for i := range fields {
var iv interface{}
fields[i] = &iv
}
if rows.Next() {
err = rows.Scan(fields...)
if err != nil {
return nil, err
}
} else {
rows.Close()
return nil, fmt.Errorf("table %s: queue is empty", queue.table.name)
}
columntypes, err := rows.ColumnTypes()
if err != nil {
return nil, err
}
if err = rows.Close(); err != nil {
return nil, err
}
_, err = tx.Exec("UPDATE "+queue.table.name+" SET operator = "+string(OpWAIT)+" WHERE "+queue.uidname+" = ?", fields[queue.uididx])
if err != nil {
log.Println(err)
return nil, err
}
obj := reflect.New(queue.obj).Elem()
for i, idx := range queue.fieldIndex {
field := obj.Field(idx)
convert(*fields[i].(*interface{}), field, columntypes[i])
}
return obj.Addr().Interface(), err
}
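// Illustrative sketch of the Pop workflow, using the package-level TStreamer
// table and the Streamer struct defined elsewhere in this package:
//
//	queue := TStreamer.Queue(Streamer{}, ConditionDefault(Popenrec))
//	item, err := queue.Pop() // the row is locked and its operator set to OpWAIT
//	if err != nil {
//		log.Println(err) // e.g. the queue is empty
//		return
//	}
//	streamer := item.(*Streamer) // Pop returns a pointer to a freshly filled struct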
// Insert inserts obj; fields holding nil pointers/interfaces are skipped. Nested structs are not supported. obj must be a pointer
func (t *Table) Insert(obj interface{}) error {
ov := reflect.ValueOf(obj).Elem()
ot := reflect.TypeOf(obj)
fieldsql := ""
argssql := ""
var args []interface{}
for i := 0; i < ov.NumField(); i++ {
field := ov.Field(i)
ftype := ot.Elem().Field(i)
if fname, ok := ftype.Tag.Lookup("field"); ok {
if flag, ok := ftype.Tag.Lookup("uid"); ok {
if flag == "auto" {
continue
}
}
k := ftype.Type.Kind()
if k == reflect.Ptr || k == reflect.Interface {
if !field.IsNil() {
felem := field.Elem()
args = append(args, felem.Interface())
fieldsql += fname + ","
argssql += "?,"
}
} else {
args = append(args, field.Interface())
fieldsql += fname + ","
argssql += "?,"
}
}
}
ssql := fmt.Sprintf(t.insertsql, fieldsql[:len(fieldsql)-1], argssql[:len(argssql)-1])
_, err := t.store.db.Exec(ssql, args...)
return err
}
// DUpdate describes one ON DUPLICATE KEY UPDATE assignment. Field corresponds to a struct `field` tag
type DUpdate struct {
Field string // column name (matches a field tag)
Value interface{} // value to assign; nil means reuse the value being inserted
}
// InsertOrUpdate inserts obj, or applies the given updates on a duplicate key. Fields holding nil pointers/interfaces are skipped. Nested structs are not supported. obj must be a pointer
func (t *Table) InsertOrUpdate(obj interface{}, updates ...DUpdate) error {
ov := reflect.ValueOf(obj).Elem()
ot := reflect.TypeOf(obj)
fieldsql := ""
argssql := ""
var SourceUpdate []*DUpdate
var OtherUpdate []*DUpdate
for i := range updates {
u := &updates[i]
if u.Value == nil {
SourceUpdate = append(SourceUpdate, u)
} else {
OtherUpdate = append(OtherUpdate, u)
}
}
var args []interface{}
for i := 0; i < ov.NumField(); i++ {
field := ov.Field(i)
ftype := ot.Elem().Field(i)
if fname, ok := ftype.Tag.Lookup("field"); ok {
// if flag, ok := ftype.Tag.Lookup("uid"); ok {
// if flag == "auto" {
// continue
// }
// }
k := ftype.Type.Kind()
if k == reflect.Ptr || k == reflect.Interface {
if !field.IsNil() {
felem := field.Elem()
args = append(args, felem.Interface())
fieldsql += fname + ","
argssql += "?,"
}
} else {
args = append(args, field.Interface())
fieldsql += fname + ","
argssql += "?,"
}
for _, u := range SourceUpdate {
if u.Field == fname {
u.Value = args[len(args)-1]
break
}
}
}
}
var duplicateSet string = ""
for _, u := range SourceUpdate {
duplicateSet += u.Field + " = ?,"
args = append(args, u.Value)
}
for _, u := range OtherUpdate {
duplicateSet += u.Field + " = ?,"
args = append(args, u.Value)
}
ssql := fmt.Sprintf(t.duplicatesql, fieldsql[:len(fieldsql)-1], argssql[:len(argssql)-1], duplicateSet[:len(duplicateSet)-1])
_, err := t.store.db.Exec(ssql, args...)
return err
}
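// Sketch of the DUpdate semantics with placeholder table/row values and
// hypothetical columns: a DUpdate with a nil Value reuses the value being
// inserted for that column, while a non-nil Value overrides it in the
// ON DUPLICATE KEY UPDATE clause:
//
//	err := table.InsertOrUpdate(&row,
//		DUpdate{Field: "user_name"},          // user_name = <value being inserted>
//		DUpdate{Field: "operator", Value: 0}, // operator = 0
//	)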
// InsertRetAutoID inserts obj and returns the auto-increment id. Fields holding nil pointers/interfaces are skipped. Nested structs are not supported
func (t *Table) InsertRetAutoID(obj interface{}) (int64, error) {
ov := reflect.ValueOf(obj).Elem()
ot := reflect.TypeOf(obj)
fieldsql := ""
argssql := ""
var args []interface{}
for i := 0; i < ov.NumField(); i++ {
field := ov.Field(i)
ftype := ot.Elem().Field(i)
if fname, ok := ftype.Tag.Lookup("field"); ok {
if flag, ok := ftype.Tag.Lookup("uid"); ok {
if flag == "auto" {
continue
}
}
k := ftype.Type.Kind()
if k == reflect.Ptr || k == reflect.Interface {
if !field.IsNil() {
felem := field.Elem()
args = append(args, felem.Interface())
fieldsql += fname + ","
argssql += "?,"
}
} else {
args = append(args, field.Interface())
fieldsql += fname + ","
argssql += "?,"
}
}
}
ssql := fmt.Sprintf(t.insertsql, fieldsql[:len(fieldsql)-1], argssql[:len(argssql)-1])
result, err := t.store.db.Exec(ssql, args...)
if err != nil {
return 0, err
}
return result.LastInsertId()
}
// Update writes the struct's tagged fields back to the row identified by the field carrying the `uid` tag
func (t *Table) Update(obj interface{}) error {
ov := reflect.ValueOf(obj).Elem()
ot := reflect.TypeOf(obj)
fieldsql := ""
var uidname string
var uidvalue interface{}
var args []interface{}
for i := 0; i < ov.NumField(); i++ {
field := ov.Field(i)
ftype := ot.Elem().Field(i)
if fname, ok := ftype.Tag.Lookup("field"); ok {
if _, ok := ftype.Tag.Lookup("uid"); ok {
if uidvalue != nil {
panic(fmt.Errorf("uid must unique, %s and %s", uidname, fname))
}
uidname = fname
uidvalue = field.Interface()
continue
}
k := ftype.Type.Kind()
if k == reflect.Ptr || k == reflect.Interface {
if !field.IsNil() {
felem := field.Elem()
args = append(args, felem.Interface())
fieldsql += fname + " = ?,"
}
} else {
args = append(args, field.Interface())
fieldsql += fname + " = ?,"
}
}
}
if uidvalue == nil {
panic(fmt.Errorf("update must contain `uid` tag"))
}
usql := fmt.Sprintf(t.updatesql, fieldsql[:len(fieldsql)-1], uidname)
args = append(args, uidvalue)
_, err := t.store.db.Exec(usql, args...)
return err
}
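// Usage sketch: Update locates the row by the field carrying the `uid` tag, so
// the struct is normally obtained from Pop (or otherwise has its uid set) first:
//
//	item, _ := queue.Pop()
//	streamer := item.(*Streamer)
//	streamer.Operator = 0
//	if err := TStreamer.Update(streamer); err != nil {
//		log.Println(err)
//	}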
// UpdateError marks the row as errored (operator = 10000) and records the error message
func (t *Table) UpdateError(obj interface{}, err error) {
ov := reflect.ValueOf(obj).Elem()
ot := reflect.TypeOf(obj)
var uidname string
var uidvalue interface{}
for i := 0; i < ov.NumField(); i++ {
field := ov.Field(i)
ftype := ot.Elem().Field(i)
if fname, ok := ftype.Tag.Lookup("field"); ok {
if _, ok := ftype.Tag.Lookup("uid"); ok {
if uidvalue != nil {
panic(fmt.Errorf("uid must unique, %s and %s", uidname, fname))
}
uidname = fname
uidvalue = field.Interface()
break
}
}
}
_, dberr := t.store.db.Exec("update "+t.name+" set operator = ?, error_msg = ? where "+uidname+" = ?", 10000, sql.NullString{String: err.Error(), Valid: true}, uidvalue)
if dberr != nil {
// TODO: notify the owner by email so the failure can be handled manually
panic(err)
}
}
func assign(field reflect.Value, src interface{}) (bool, error) {
switch field.Kind() {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
s := asString(src)
i64, err := strconv.ParseInt(s, 10, field.Type().Bits())
if err != nil {
err = strconvErr(err)
return false, fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, field.Kind(), err)
}
field.SetInt(i64)
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
s := asString(src)
u64, err := strconv.ParseUint(s, 10, field.Type().Bits())
if err != nil {
err = strconvErr(err)
return false, fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, field.Kind(), err)
}
field.SetUint(u64)
case reflect.Float32, reflect.Float64:
s := asString(src)
f64, err := strconv.ParseFloat(s, field.Type().Bits())
if err != nil {
err = strconvErr(err)
return false, fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, field.Kind(), err)
}
field.SetFloat(f64)
case reflect.String:
field.SetString(string(src.([]byte)))
case reflect.Interface:
return true, nil
}
return false, nil
}
func convert(src interface{}, field reflect.Value, columntype *sql.ColumnType) error {
// log.Println("type:", field.Type(), ",kind:", field.Kind(), ",field:", field, "scanType:", columntype.ScanType(), "databaseType:", columntype.DatabaseTypeName())
if field.Kind() == reflect.Ptr {
fn := field.Type().Elem() // allocate a new value of the element type, then convert it according to the column type (e.g. NullString, Time)
field.Set(reflect.New(fn))
field = field.Elem()
// log.Println("type:", fn.Name(), ",kind:", field.Kind(), ",fieldtype:", field.Type())
}
// log.Println(field.Kind(), field, reflect.TypeOf(field).Elem().Name(), columntype.ScanType().Kind())
if src == nil {
return fmt.Errorf("converting NULL to %s is unsupported", field.Kind())
}
switch columntype.DatabaseTypeName() {
case "TINYINT", "SMALLINT", "MEDIUMINT", "INT", "BIGINT":
isdefault, err := assign(field, src)
if err != nil {
return err
}
if isdefault {
s := asString(src)
i64, err := strconv.ParseInt(s, 10, 64)
if err != nil {
err = strconvErr(err)
return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, field.Kind(), err)
}
// reflect.New(reflect.TypeOf(i64))
field.Set(reflect.ValueOf(i64))
}
case "FLOAT", "DOUBLE", "DECIMAL":
isdefault, err := assign(field, src)
if err != nil {
return err
}
if isdefault {
s := asString(src)
f64, err := strconv.ParseFloat(s, 64)
if err != nil {
err = strconvErr(err)
return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, field.Kind(), err)
}
field.Set(reflect.ValueOf(f64))
}
case "BINARY", "VARBINARY", "TINYBLOB", "BLOB", "MEDIUMBLOB", "LONGBLOB", "JSON":
isdefault, err := assign(field, src)
if err != nil {
return err
}
if isdefault {
field.Set(reflect.ValueOf(src.([]byte)))
}
case "CHAR", "VARCHAR", "TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT":
isdefault, err := assign(field, src)
if err != nil {
return err
}
if isdefault {
field.Set(reflect.ValueOf(string(src.([]byte))))
}
case "BIT":
var bits []byte = make([]byte, 8)
copy(bits, src.([]byte))
switch field.Kind() {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
field.SetInt(int64(binary.LittleEndian.Uint64(bits)))
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
field.SetUint(binary.LittleEndian.Uint64(bits))
case reflect.Interface:
field.Set(reflect.ValueOf(binary.LittleEndian.Uint64(bits)))
}
case "YEAR", "TIME", "DATE", "DATETIME", "TIMESTAMP":
s := src.(time.Time)
switch field.Interface().(type) {
case time.Time:
field.Set(reflect.ValueOf(src))
case string:
field.SetString(s.Format(time.RFC3339Nano))
case []byte:
field.SetBytes([]byte(s.Format(time.RFC3339Nano)))
default:
}
}
// log.Println(fv, columntype.ScanType().Kind())
if iscan, ok := field.Addr().Interface().(sql.Scanner); ok {
err := iscan.Scan(src)
if err != nil {
return err
}
}
return nil
}


@ -1,28 +1,57 @@
package intimate
import (
"log"
"reflect"
"database/sql"
"encoding/json"
"testing"
"time"
)
type Store struct {
}
func NewStore() *Store {
return &Store{}
}
func (store *Store) Update(obj interface{}, objfields ...interface{}) {
ov := reflect.ValueOf(obj)
ot := ov.Type()
log.Printf("%#v,%#v", ov, ot)
log.Println(reflect.Indirect(reflect.ValueOf(objfields[0])))
}
func TestAutoStore(t *testing.T) {
store := NewStore()
streamer := &Streamer{}
uri := "root:@tcp(127.0.0.1:4000)/test?parseTime=true&loc=Local&charset=utf8mb4&collation=utf8mb4_unicode_ci"
store := NewStore(uri)
store.Update(streamer, streamer.Channel)
// queue := store.Table("streamer").Queue(TSreamer{}, CondWhere{Condition: "operator = 0"})
// re, _ := queue.Pop()
// pstreamer := re.(*TSreamer)
// m := make(map[string]interface{})
// json.Unmarshal(pstreamer.Iface.([]byte), &m)
// spew.Println(re.(*TSreamer), m)
streamer := &TSreamer{}
streamer.Uid = 1
streamer.UserID = &sql.NullString{String: "xixi", Valid: true}
streamer.Name = "streamer"
streamer.Operator = 0
streamer.Bit = 0b11
// streamer.Ext = &sql.NullString{String: "ext", Valid: true}
tag := make(map[string]interface{})
tag["json"] = true
tag["name"] = "test"
btag, err := json.Marshal(tag)
if err != nil {
t.Error(err)
}
streamer.Iface = btag
now := time.Now()
streamer.UpdateTime = &now
err = store.Table("streamer").InsertOrUpdate(streamer, DUpdate{Field: "userid"})
if err != nil {
t.Error(err)
}
}
type TSreamer struct {
Uid int `field:"uid" uid:"auto"`
Name interface{} `field:"name"`
UserID *sql.NullString `field:"userid"`
Ext *sql.NullString `field:"ext"`
Iface interface{} `field:"tag"`
Bit uint64 `field:"bit"`
Operator int `field:"operator"`
UpdateTime *time.Time `field:"update_time"`
}


@ -20,4 +20,4 @@ do
projectworkspace=$src/bin/$projectname
cd $path && mkdir $projectworkspace -p && go build -o $projectworkspace/$projectname
cd $src
done
done


@ -18,6 +18,18 @@ func init() {
// storeOpenrec = NewStore()
log.SetFlags(log.Llongfile | log.Ltime)
// StoreExtractorDB is the global Extractor DB connection
StoreExtractorDB = NewStore(InitConfig.Database.ExtractorURI)
// TStreamer is the global streamer table
TStreamer = StoreExtractorDB.Table("streamer")
// TClog is the global collect-log table
TClog = StoreExtractorDB.Table("collect_log")
// TStreamerList is the global streamer-list table. It stores URLs from which streamer lists can be found, which makes dynamic updates easier
TStreamerList = StoreExtractorDB.Table("streamer_list")
}
// Config holds the configuration

376
convert.go Normal file

@ -0,0 +1,376 @@
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Type conversions for Scan.
package intimate
import (
"database/sql/driver"
"errors"
"fmt"
"reflect"
"strconv"
)
var errNilPtr = errors.New("destination pointer is nil") // embedded in descriptive error
// convertAssignRows copies to dest the value in src, converting it if possible.
// An error is returned if the copy would result in loss of information.
// dest should be a pointer type. If rows is passed in, the rows will
// be used as the parent for any cursor values converted from a
// driver.Rows to a *Rows.
// func convertAssignRows(dest, src interface{}, rows *sql.Rows) error {
// // Common cases, without reflect.
// switch s := src.(type) {
// case string:
// switch d := dest.(type) {
// case *string:
// if d == nil {
// return errNilPtr
// }
// *d = s
// return nil
// case *[]byte:
// if d == nil {
// return errNilPtr
// }
// *d = []byte(s)
// return nil
// case *sql.RawBytes:
// if d == nil {
// return errNilPtr
// }
// *d = append((*d)[:0], s...)
// return nil
// }
// case []byte:
// switch d := dest.(type) {
// case *string:
// if d == nil {
// return errNilPtr
// }
// *d = string(s)
// return nil
// case *interface{}:
// if d == nil {
// return errNilPtr
// }
// *d = cloneBytes(s)
// return nil
// case *[]byte:
// if d == nil {
// return errNilPtr
// }
// *d = cloneBytes(s)
// return nil
// case *sql.RawBytes:
// if d == nil {
// return errNilPtr
// }
// *d = s
// return nil
// }
// case time.Time:
// switch d := dest.(type) {
// case *time.Time:
// *d = s
// return nil
// case *string:
// *d = s.Format(time.RFC3339Nano)
// return nil
// case *[]byte:
// if d == nil {
// return errNilPtr
// }
// *d = []byte(s.Format(time.RFC3339Nano))
// return nil
// case *sql.RawBytes:
// if d == nil {
// return errNilPtr
// }
// *d = s.AppendFormat((*d)[:0], time.RFC3339Nano)
// return nil
// }
// case decimalDecompose:
// switch d := dest.(type) {
// case decimalCompose:
// return d.Compose(s.Decompose(nil))
// }
// case nil:
// switch d := dest.(type) {
// case *interface{}:
// if d == nil {
// return errNilPtr
// }
// *d = nil
// return nil
// case *[]byte:
// if d == nil {
// return errNilPtr
// }
// *d = nil
// return nil
// case *sql.RawBytes:
// if d == nil {
// return errNilPtr
// }
// *d = nil
// return nil
// }
// // The driver is returning a cursor the client may iterate over.
// }
// var sv reflect.Value
// switch d := dest.(type) {
// case *string:
// sv = reflect.ValueOf(src)
// switch sv.Kind() {
// case reflect.Bool,
// reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
// reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64,
// reflect.Float32, reflect.Float64:
// *d = asString(src)
// return nil
// }
// case *[]byte:
// sv = reflect.ValueOf(src)
// if b, ok := asBytes(nil, sv); ok {
// *d = b
// return nil
// }
// case *sql.RawBytes:
// sv = reflect.ValueOf(src)
// if b, ok := asBytes([]byte(*d)[:0], sv); ok {
// *d = sql.RawBytes(b)
// return nil
// }
// case *bool:
// bv, err := driver.Bool.ConvertValue(src)
// if err == nil {
// *d = bv.(bool)
// }
// return err
// case *interface{}:
// *d = src
// return nil
// }
// if scanner, ok := dest.(sql.Scanner); ok {
// return scanner.Scan(src)
// }
// dpv := reflect.ValueOf(dest)
// if dpv.Kind() != reflect.Ptr {
// return errors.New("destination not a pointer")
// }
// if dpv.IsNil() {
// return errNilPtr
// }
// if !sv.IsValid() {
// sv = reflect.ValueOf(src)
// }
// dv := reflect.Indirect(dpv)
// if sv.IsValid() && sv.Type().AssignableTo(dv.Type()) {
// switch b := src.(type) {
// case []byte:
// dv.Set(reflect.ValueOf(cloneBytes(b)))
// default:
// dv.Set(sv)
// }
// return nil
// }
// if dv.Kind() == sv.Kind() && sv.Type().ConvertibleTo(dv.Type()) {
// dv.Set(sv.Convert(dv.Type()))
// return nil
// }
// // The following conversions use a string value as an intermediate representation
// // to convert between various numeric types.
// //
// // This also allows scanning into user defined types such as "type Int int64".
// // For symmetry, also check for string destination types.
// switch dv.Kind() {
// case reflect.Ptr:
// if src == nil {
// dv.Set(reflect.Zero(dv.Type()))
// return nil
// }
// dv.Set(reflect.New(dv.Type().Elem()))
// return convertAssignRows(dv.Interface(), src, rows)
// case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
// if src == nil {
// return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind())
// }
// s := asString(src)
// i64, err := strconv.ParseInt(s, 10, dv.Type().Bits())
// if err != nil {
// err = strconvErr(err)
// return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, dv.Kind(), err)
// }
// dv.SetInt(i64)
// return nil
// case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
// if src == nil {
// return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind())
// }
// s := asString(src)
// u64, err := strconv.ParseUint(s, 10, dv.Type().Bits())
// if err != nil {
// err = strconvErr(err)
// return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, dv.Kind(), err)
// }
// dv.SetUint(u64)
// return nil
// case reflect.Float32, reflect.Float64:
// if src == nil {
// return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind())
// }
// s := asString(src)
// f64, err := strconv.ParseFloat(s, dv.Type().Bits())
// if err != nil {
// err = strconvErr(err)
// return fmt.Errorf("converting driver.Value type %T (%q) to a %s: %v", src, s, dv.Kind(), err)
// }
// dv.SetFloat(f64)
// return nil
// case reflect.String:
// if src == nil {
// return fmt.Errorf("converting NULL to %s is unsupported", dv.Kind())
// }
// switch v := src.(type) {
// case string:
// dv.SetString(v)
// return nil
// case []byte:
// dv.SetString(string(v))
// return nil
// }
// }
// return fmt.Errorf("unsupported Scan, storing driver.Value type %T into type %T", src, dest)
// }
func strconvErr(err error) error {
if ne, ok := err.(*strconv.NumError); ok {
return ne.Err
}
return err
}
func cloneBytes(b []byte) []byte {
if b == nil {
return nil
}
c := make([]byte, len(b))
copy(c, b)
return c
}
func asString(src interface{}) string {
switch v := src.(type) {
case string:
return v
case []byte:
return string(v)
}
rv := reflect.ValueOf(src)
switch rv.Kind() {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
return strconv.FormatInt(rv.Int(), 10)
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
return strconv.FormatUint(rv.Uint(), 10)
case reflect.Float64:
return strconv.FormatFloat(rv.Float(), 'g', -1, 64)
case reflect.Float32:
return strconv.FormatFloat(rv.Float(), 'g', -1, 32)
case reflect.Bool:
return strconv.FormatBool(rv.Bool())
}
return fmt.Sprintf("%v", src)
}
func asBytes(buf []byte, rv reflect.Value) (b []byte, ok bool) {
switch rv.Kind() {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
return strconv.AppendInt(buf, rv.Int(), 10), true
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
return strconv.AppendUint(buf, rv.Uint(), 10), true
case reflect.Float32:
return strconv.AppendFloat(buf, rv.Float(), 'g', -1, 32), true
case reflect.Float64:
return strconv.AppendFloat(buf, rv.Float(), 'g', -1, 64), true
case reflect.Bool:
return strconv.AppendBool(buf, rv.Bool()), true
case reflect.String:
s := rv.String()
return append(buf, s...), true
}
return
}
var valuerReflectType = reflect.TypeOf((*driver.Valuer)(nil)).Elem()
// callValuerValue returns vr.Value(), with one exception:
// If vr.Value is an auto-generated method on a pointer type and the
// pointer is nil, it would panic at runtime in the panicwrap
// method. Treat it like nil instead.
// Issue 8415.
//
// This is so people can implement driver.Value on value types and
// still use nil pointers to those types to mean nil/NULL, just like
// string/*string.
//
// This function is mirrored in the database/sql/driver package.
func callValuerValue(vr driver.Valuer) (v driver.Value, err error) {
if rv := reflect.ValueOf(vr); rv.Kind() == reflect.Ptr &&
rv.IsNil() &&
rv.Type().Elem().Implements(valuerReflectType) {
return nil, nil
}
return vr.Value()
}
// decimal composes or decomposes a decimal value to and from individual parts.
// There are four parts: a boolean negative flag, a form byte with three possible states
// (finite=0, infinite=1, NaN=2), a base-2 big-endian integer
// coefficient (also known as a significand) as a []byte, and an int32 exponent.
// These are composed into a final value as "decimal = (neg) (form=finite) coefficient * 10 ^ exponent".
// A zero length coefficient is a zero value.
// The big-endian integer coefficient stores the most significant byte first (at coefficient[0]).
// If the form is not finite the coefficient and exponent should be ignored.
// The negative parameter may be set to true for any form, although implementations are not required
// to respect the negative parameter in the non-finite form.
//
// Implementations may choose to set the negative parameter to true on a zero or NaN value,
// but implementations that do not differentiate between negative and positive
// zero or NaN values should ignore the negative parameter without error.
// If an implementation does not support Infinity it may be converted into a NaN without error.
// If a value is set that is larger than what is supported by an implementation,
// an error must be returned.
// Implementations must return an error if a NaN or Infinity is attempted to be set while neither
// are supported.
//
// NOTE(kardianos): This is an experimental interface. See https://golang.org/issue/30870
type decimal interface {
decimalDecompose
decimalCompose
}
type decimalDecompose interface {
// Decompose returns the internal decimal state in parts.
// If the provided buf has sufficient capacity, buf may be returned as the coefficient with
// the value set and length set as appropriate.
Decompose(buf []byte) (form byte, negative bool, coefficient []byte, exponent int32)
}
type decimalCompose interface {
// Compose sets the internal decimal value from parts. If the value cannot be
// represented then an error should be returned.
Compose(form byte, negative bool, coefficient []byte, exponent int32) error
}


@ -11,10 +11,10 @@ import (
)
// sstore is the source-store instance that persists raw source data. See sql/intimate_source.sql for the table definitions
var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STNimo))
// var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STNimo))
// estore is the extractor-store connection instance
var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// // estore is the extractor-store connection instance
// var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
func main() {
Execute()
@ -35,14 +35,18 @@ func Execute() {
waitfor := intimate.NewWaitFor(wd)
ps := intimate.NewPerfectShutdown()
queue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.PNimo))
for !ps.IsClose() {
streamer, err := estore.Pop(intimate.PNimo)
istreamer, err := queue.Pop()
if err != nil {
log.Println(err)
estore.UpdateError(streamer, err)
intimate.TStreamer.UpdateError(istreamer, err)
continue
}
streamer := istreamer.(*intimate.Streamer)
wd.Get(streamer.LiveUrl.String)
// wd.Get("https://www.nimo.tv/live/1253835677")
@ -71,22 +75,25 @@ func Execute() {
clog := &intimate.CollectLog{}
clog.Platform = intimate.PNimo
clog.Followers = sql.NullInt64{Int64: li.Followers, Valid: true}
clog.Views = sql.NullInt64{Int64: li.Views, Valid: true}
clog.UpdateTime = utime
clog.Followers = &sql.NullInt64{Int64: li.Followers, Valid: true}
clog.Views = &sql.NullInt64{Int64: li.Views, Valid: true}
clog.UpdateTime = &utime
clog.StreamerUid = streamer.Uid
var sum int64 = 0
for _, v := range li.Gratuity {
sum += v
}
clog.Gratuity = sql.NullInt64{Int64: sum, Valid: true}
clog.Gratuity = &sql.NullInt64{Int64: sum, Valid: true}
cuid := estore.InsertClog(clog)
cuid, err := intimate.TClog.InsertRetAutoID(clog)
if err != nil {
panic(err)
}
streamer.Channel = sql.NullString{String: li.Channel, Valid: true}
streamer.Channel = &sql.NullString{String: li.Channel, Valid: true}
streamer.LatestLogUid = cuid
streamer.UpdateTime = utime
streamer.UpdateTime = &utime
streamer.Operator = 0
switch {
@ -102,7 +109,11 @@ func Execute() {
streamer.UpdateInterval = 60
}
estore.Update(streamer, "update_interval", streamer.UpdateInterval, "operator", streamer.Operator, "channel", streamer.Channel, "latest_log_uid", streamer.LatestLogUid, "update_time", streamer.UpdateTime)
// estore.Update(streamer, "update_interval", streamer.UpdateInterval, "operator", streamer.Operator, "channel", streamer.Channel, "latest_log_uid", streamer.LatestLogUid, "update_time", streamer.UpdateTime)
err = intimate.TStreamer.Update(streamer)
if err != nil {
panic(err)
}
count++
if count >= countlimit {


@ -9,12 +9,11 @@ import (
"time"
"github.com/474420502/extractor"
"github.com/474420502/gcurl"
"github.com/474420502/requests"
"github.com/tidwall/gjson"
)
var estore = intimate.NewStoreExtractor()
var sstore = intimate.NewStoreSource(string(intimate.STOpenrec))
//UserInfo is the struct the extractor fills from the user page
type UserInfo struct {
UserName string `exp:"//p[ contains(@class, 'c-global__user__profile__list__name__text')]"`
@ -34,135 +33,230 @@ type UserLive struct {
func Execute() {
ps := intimate.NewPerfectShutdown()
ses := requests.NewSession()
squeue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.Popenrec))
var lasterr error = nil
for !ps.IsClose() {
var err error
istreamer, err := squeue.Pop()
source, err := sstore.Pop(intimate.TOpenrecUser, 0)
if err != nil {
// streamer, err := estore.Pop(intimate.Popenrec) // pop one streamer row from the queue for parsing
if istreamer == nil || err != nil {
if err != lasterr {
log.Println(err, lasterr)
lasterr = err
}
time.Sleep(time.Second * 5)
time.Sleep(time.Second * 2)
continue
}
lasterr = nil
streamer := istreamer.(*intimate.Streamer)
sdata := source.Ext.([]byte)
datamap := gjson.ParseBytes(sdata).Map()
userId := *streamer.UserId
source.Operator = int32(intimate.OperatorError)
userId := datamap["var_user_id"].String()
var updateUrl map[string]string
err = json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) // deserialize update_url, which stores the URLs that need to be collected
if err != nil {
log.Println(err)
continue
}
// Check Userid
streamer := &intimate.Streamer{}
streamer.UserId = userId
// streamer.Platform = intimate.Popenrec // field does not need updating
userUrl := updateUrl["user"]
log.Println(userUrl)
tp := ses.Get(userUrl) // fetch the user page
resp, err := tp.Execute()
streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
htmlUser := datamap["html_user"]
userEtor := extractor.ExtractHtmlString(htmlUser.String())
ui, ok1 := userEtor.GetObjectByTag(UserInfo{}).(*UserInfo)
htmlLive := datamap["html_live"]
liveEtor := extractor.ExtractHtmlString(htmlLive.String())
ul, ok2 := liveEtor.GetObjectByTag(UserLive{}).(*UserLive)
jsonSupporters := datamap["json_supporters"]
clog := &intimate.CollectLog{}
if ok1 {
clog.Followers = sql.NullInt64{Int64: ui.Followers, Valid: true}
clog.Views = sql.NullInt64{Int64: ui.Views, Valid: true}
if ui.Views != 0 {
clog.IsLiveStreaming = true
}
streamer.UserName = sql.NullString{String: ui.UserName, Valid: true}
giverjson := jsonSupporters
var givers []interface{}
var gratuity int64 = 0
for _, v := range giverjson.Array() {
giverSource := gjson.Parse(v.String())
for _, item := range giverSource.Get("data.items").Array() {
givers = append(givers, item.Map())
gratuity += item.Get("total_yells").Int()
}
}
giversbytes, err := json.Marshal(givers)
if err != nil {
log.Println(err)
clog.ErrorMsg = sql.NullString{String: err.Error(), Valid: true}
} else {
clog.Giver = giversbytes
}
clog.Gratuity = sql.NullInt64{Int64: gratuity, Valid: true}
} else {
log.Println("UserInfo may be not exists")
estore.UpdateError(streamer, errors.New("UserInfo may be not exists"))
if err != nil {
log.Println(err)
intimate.TStreamer.UpdateError(streamer, err)
continue
}
//log.Println(ul)
if ok2 {
clog.LiveTitle = sql.NullString{String: ul.Title, Valid: true}
cookies := ses.GetCookies(tp.GetParsedURL())
startTime, err := time.ParseInLocation("2006-01-02T15:04:05Z07:00", ul.LiveStartTime, time.Local)
if err != nil {
log.Println(err)
} else {
clog.LiveStartTime = sql.NullTime{Time: startTime.Local(), Valid: true}
duration, err := intimate.ParseDuration(ul.LiveEndTime)
if err != nil {
log.Println(err)
} else {
endTime := startTime.Add(duration)
clog.LiveStartTime = sql.NullTime{Time: endTime.Local(), Valid: true}
scurl := updateUrl["supporters"] // URL for the supporters (tippers) data
curl := gcurl.Parse(scurl)
supportersSession := curl.CreateSession()
temporary := curl.CreateTemporary(supportersSession)
supportersSession.SetCookies(temporary.GetParsedURL(), cookies)
var supporters []string
for { // the supporters endpoint requires login info: copy the uuid, token and random values obtained from cookies into the query below
supportersQuery := temporary.GetQuery()
for _, cookie := range cookies {
if cookie.Name == "uuid" {
supportersQuery.Set("Uuid", cookie.Value)
continue
}
if cookie.Name == "token" {
supportersQuery.Set("Token", cookie.Value)
continue
}
if cookie.Name == "random" {
supportersQuery.Set("Random", cookie.Value)
continue
}
}
if tags, err := json.Marshal(ul.Tags); err == nil {
clog.Tags = tags
} else {
log.Println("json error", ul.Tags, clog.Tags)
supportersQuery.Set("identify_id", userId)
temporary.SetQuery(supportersQuery)
resp, err := temporary.Execute()
if err != nil {
log.Println(err)
}
supporterjson := gjson.ParseBytes(resp.Content())
supporterdata := supporterjson.Get("data") // parse the JSON returned by the supporters endpoint
if supporterdata.Type == gjson.Null {
break
}
supporters = append(supporters, string(resp.Content()))
temporary.QueryParam("page_number").IntAdd(1)
}
streamer.Uid = source.StreamerId.Int64
streamer.UpdateTime = source.UpdateTime
if clog.Tags != nil {
streamer.Tags = clog.Tags
// cookies := cxt.Session().GetCookies(wf.GetParsedURL())
// ext := make(map[string]interface{})
jsonSupporters := supporters
htmlUser := string(resp.Content())
liveUrl := updateUrl["live"]
tp = ses.Get(liveUrl)
resp, err = tp.Execute()
if err != nil {
log.Println(err)
intimate.TStreamer.UpdateError(streamer, err)
continue
}
clog.Platform = intimate.Popenrec
clog.UserId = userId
clog.UpdateTime = source.UpdateTime
clog.StreamerUid = streamer.Uid
htmlLive := string(resp.Content())
// ext["var_user_id"] = userId
logUid := estore.InsertClog(clog)
LiveUrl := "https://www.openrec.tv/live/" + userId
streamer.LiveUrl = sql.NullString{String: LiveUrl, Valid: true}
streamer.LatestLogUid = logUid
// streamer.Operator = 0
log.Println(streamer.UserId)
estore.Update(streamer,
"user_name", streamer.UserName,
"user_id", streamer.UserId,
"live_url", streamer.LiveUrl,
"latest_log_uid", streamer.LatestLogUid,
"update_time", streamer.UpdateTime,
"tags", streamer.Tags,
)
source.Operator = int32(intimate.OperatorExtractorOK)
sstore.UpdateOperator(source)
// streamer.Platform = intimate.Popenrec
streamer.UpdateInterval = 120
streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
streamer.Operator = 0
Extractor(streamer, userId, htmlUser, htmlLive, jsonSupporters)
}
}
func Extractor(streamer *intimate.Streamer, userId string, htmlUser, htmlLive string, jsonSupporters []string) {
// sdata := source.Ext.([]byte)
// datamap := gjson.ParseBytes(sdata).Map()
// userId := datamap["var_user_id"].String()
// streamer := &intimate.Streamer{}
// streamer.UserId = &userId
// streamer.Platform = intimate.Popenrec // field does not need updating
// htmlUser := datamap["html_user"]
userEtor := extractor.ExtractHtmlString(htmlUser)
ui, ok1 := userEtor.GetObjectByTag(UserInfo{}).(*UserInfo)
// htmlLive := datamap["html_live"]
liveEtor := extractor.ExtractHtmlString(htmlLive)
ul, ok2 := liveEtor.GetObjectByTag(UserLive{}).(*UserLive)
// jsonSupporters := datamap["json_supporters"]
clog := &intimate.CollectLog{}
if ok1 {
clog.Followers = &sql.NullInt64{Int64: ui.Followers, Valid: true}
clog.Views = &sql.NullInt64{Int64: ui.Views, Valid: true}
if ui.Views != 0 {
clog.IsLiveStreaming = true
}
streamer.UserName = &sql.NullString{String: ui.UserName, Valid: true}
// giverjson := jsonSupporters
var givers []interface{}
var gratuity int64 = 0
for _, v := range jsonSupporters {
giverSource := gjson.Parse(v)
for _, item := range giverSource.Get("data.items").Array() {
givers = append(givers, item.Map())
gratuity += item.Get("total_yells").Int()
}
}
giversbytes, err := json.Marshal(givers)
if err != nil {
log.Println(err)
clog.ErrorMsg = &sql.NullString{String: err.Error(), Valid: true}
} else {
clog.Giver = giversbytes
}
clog.Gratuity = &sql.NullInt64{Int64: gratuity, Valid: true}
} else {
log.Println("UserInfo may be not exists")
intimate.TStreamer.UpdateError(streamer, errors.New("UserInfo may be not exists"))
return
}
//log.Println(ul)
if ok2 {
clog.LiveTitle = &sql.NullString{String: ul.Title, Valid: true}
startTime, err := time.ParseInLocation("2006-01-02T15:04:05Z07:00", ul.LiveStartTime, time.Local)
if err != nil {
log.Println(err)
} else {
clog.LiveStartTime = &sql.NullTime{Time: startTime.Local(), Valid: true}
duration, err := intimate.ParseDuration(ul.LiveEndTime)
if err != nil {
log.Println(err)
} else {
endTime := startTime.Add(duration)
clog.LiveEndTime = &sql.NullTime{Time: endTime.Local(), Valid: true}
}
}
if tags, err := json.Marshal(ul.Tags); err == nil {
clog.Tags = tags
} else {
log.Println("json error", ul.Tags, clog.Tags)
}
}
// streamer.Uid = source.StreamerId.Int64
// streamer.UpdateTime = &source.UpdateTime
if clog.Tags != nil {
streamer.Tags = clog.Tags
}
clog.Platform = intimate.Popenrec
clog.UserId = userId
clog.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
clog.StreamerUid = streamer.Uid
logUid, err := intimate.TClog.InsertRetAutoID(clog)
if err != nil {
log.Println(err)
return
}
LiveUrl := "https://www.openrec.tv/live/" + userId
streamer.LiveUrl = &sql.NullString{String: LiveUrl, Valid: true}
streamer.LatestLogUid = logUid
// streamer.Operator = 0
// log.Println(*streamer.UserId)
intimate.TStreamer.Update(streamer)
// source.Operator = int32(intimate.OperatorExtractorOK)
// sstore.UpdateOperator(source)
}


@ -5,7 +5,6 @@ import (
"encoding/json"
"intimate"
"log"
"os"
"strconv"
"strings"
"time"
@ -34,26 +33,35 @@ func main() {
ps := intimate.NewPerfectShutdown()
ses := requests.NewSession()
streamerQueue := intimate.TStreamer.Queue(intimate.Streamer{}, intimate.ConditionDefault(intimate.Ptwitcasting))
for !ps.IsClose() {
streamer, err := estore.Pop(intimate.Ptwitcasting)
// streamer, err := estore.Pop(intimate.Ptwitcasting)
isteamer, err := streamerQueue.Pop()
if err != nil {
log.Println(err, streamer)
log.Println(err, isteamer)
continue
}
streamer.LiveUrl = sql.NullString{String: "https://twitcasting.tv/" + streamer.UserId, Valid: true}
streamer := isteamer.(*intimate.Streamer)
streamer.LiveUrl = &sql.NullString{String: "https://twitcasting.tv/" + *streamer.UserId, Valid: true}
resp, err := ses.Get(streamer.LiveUrl.String).Execute()
if err != nil {
estore.UpdateError(streamer, err)
log.Println(err, streamer.UserId)
intimate.TStreamer.UpdateError(streamer, err)
log.Println(err, *streamer.UserId)
continue
}
var ldata *LiveData
f, _ := os.OpenFile("./twistcasting.html", os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.ModePerm)
f.Write(resp.Content())
// f, _ := os.OpenFile("./twistcasting.html", os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.ModePerm)
// f.Write(resp.Content())
etor := extractor.ExtractHtml(resp.Content())
ldata = etor.GetObjectByTag(LiveData{}).(*LiveData)
ildata := etor.GetObjectByTag(LiveData{})
if ildata == nil {
log.Println(streamer.LiveUrl.String)
continue
}
ldata = ildata.(*LiveData)
// ldata.MaxViews = regexp.MustCompile("\\d+").FindString(ldata.MaxViews)
coincount := 0
@ -62,14 +70,14 @@ func main() {
giverurl := streamer.LiveUrl.String + "/backers/" + strconv.Itoa(i)
resp, err = ses.Get(giverurl).Execute()
if err != nil {
estore.UpdateError(streamer, err)
intimate.TStreamer.UpdateError(streamer, err)
log.Panic(err)
}
etor := extractor.ExtractHtml(resp.Content())
xp, err := etor.XPaths("//td[@class='tw-memorial-table-recent-point']")
if err != nil {
estore.UpdateError(streamer, err)
intimate.TStreamer.UpdateError(streamer, err)
log.Panic(err)
}
@ -100,20 +108,20 @@ func main() {
}
streamer.Platform = intimate.Ptwitcasting
streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true}
streamer.UserName = sql.NullString{String: ldata.UserName, Valid: true}
streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
streamer.UserName = &sql.NullString{String: ldata.UserName, Valid: true}
streamer.Operator = 0
streamer.Tags = tags
// streamer.UpdateInterval = 60
clog := &intimate.CollectLog{}
clog.UserId = streamer.UserId
clog.Gratuity = sql.NullInt64{Int64: int64(coincount), Valid: true}
clog.UserId = *streamer.UserId
clog.Gratuity = &sql.NullInt64{Int64: int64(coincount), Valid: true}
clog.Platform = streamer.Platform
clog.UpdateTime = streamer.UpdateTime
clog.LiveTitle = sql.NullString{String: ldata.LiveTitle, Valid: true}
clog.LiveTitle = &sql.NullString{String: ldata.LiveTitle, Valid: true}
clog.Tags = tags
clog.Followers = sql.NullInt64{Int64: int64(ldata.Follower), Valid: true}
clog.Followers = &sql.NullInt64{Int64: int64(ldata.Follower), Valid: true}
switch {
case ldata.Follower <= 100:
streamer.UpdateInterval = 720
@ -125,12 +133,12 @@ func main() {
streamer.UpdateInterval = 120
}
clog.Views = sql.NullInt64{Int64: ldata.MaxViews, Valid: true}
clog.Views = &sql.NullInt64{Int64: ldata.MaxViews, Valid: true}
if ldata.LiveStart != "" {
st, err := time.Parse("Mon, 02 Jan 2006 15:04:05 -0700", ldata.LiveStart)
if err == nil {
startTime := st
clog.LiveStartTime = sql.NullTime{Time: startTime, Valid: true}
clog.LiveStartTime = &sql.NullTime{Time: startTime, Valid: true}
dt, err := strconv.Atoi(ldata.LiveDuration)
liveduration := time.Now().Sub(startTime)
@ -149,7 +157,7 @@ func main() {
if err == nil {
endTime := startTime.Add((time.Duration)(dt) * time.Millisecond)
clog.LiveEndTime = sql.NullTime{Time: endTime, Valid: true}
clog.LiveEndTime = &sql.NullTime{Time: endTime, Valid: true}
} else {
log.Println(err, streamer.UserId)
}
@ -158,8 +166,16 @@ func main() {
}
}
streamer.LatestLogUid = estore.InsertClog(clog)
estore.UpdateStreamer(streamer)
log.Println(streamer.UserId)
clog.StreamerUid = streamer.Uid
uid, err := intimate.TClog.InsertRetAutoID(clog)
if err != nil {
log.Println(err)
continue
}
streamer.LatestLogUid = uid
intimate.TStreamer.Update(streamer)
// estore.UpdateStreamer(streamer)
log.Println(*streamer.UserId)
}
}


@ -3,6 +3,7 @@ package main
import (
"database/sql"
"encoding/json"
"fmt"
"intimate"
"log"
"regexp"
@ -12,134 +13,287 @@ import (
"github.com/tebeka/selenium"
)
// sstore is the source-store instance that persists raw source data. See sql/intimate_source.sql for the table definitions
var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch))
// // sstore is the source-store instance that persists raw source data. See sql/intimate_source.sql for the table definitions
// var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch))
// estore is the extractor-store connection instance
var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// // estore is the extractor-store connection instance
// var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
func main() {
wd := intimate.GetChromeDriver(3030)
wd := intimate.GetChromeDriver(3040)
ps := intimate.NewPerfectShutdown()
queue := intimate.TStreamerList.Queue(intimate.StreamerList{}, intimate.ConditionDefault(intimate.Ptwitch))
var count = 0
var countlimt = 200
var lasterr error = nil
// var lasterr error = nil
// var err error
for !ps.IsClose() {
streamer, err := estore.Pop(intimate.Ptwitch)
if streamer == nil || err != nil {
if err != lasterr {
log.Println(err, lasterr)
lasterr = err
}
time.Sleep(time.Second * 2)
continue
// sourceChannel, err := sstore.Pop(intimate.TTwitchChannel)
isl, err := queue.Pop()
if err != nil {
panic(err)
}
streamerlist := isl.(*intimate.StreamerList)
var updateUrl map[string]string
json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl)
liveUrl := updateUrl["live"]
liveUrl = strings.Replace(liveUrl, "/watchparty", "", -1)
log.Println(liveUrl)
// err = wd.Get("https://www.twitch.tv/zoe_0601" + "/about")
err = wd.Get(liveUrl + "/about")
weburl := streamerlist.Url + "?sort=VIEWER_COUNT"
err = wd.Get(weburl)
if err != nil {
log.Println(err)
estore.UpdateError(streamer, err)
time.Sleep(time.Second * 5)
// sstore.UpdateError(sourceChannel, err)
intimate.TStreamerList.UpdateError(streamerlist, err)
time.Sleep(time.Second * 10)
continue
}
streamer.LiveUrl = sql.NullString{String: liveUrl, Valid: true}
clog := &intimate.CollectLog{}
clog.UserId = streamer.UserId
clog.Gratuity = sql.NullInt64{Int64: 0, Valid: false}
time.Sleep(time.Millisecond * 500)
err = extractUserName(wd, streamer)
if err != nil {
_, err = wd.FindElement(selenium.ByXPATH, "//a[@data-a-target='browse-channels-button']")
if err == nil {
log.Println(streamer.UserId, "may be cancell")
streamer.Operator = 5
streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true}
estore.UpdateStreamer(streamer)
wd.WaitWithTimeout(func(wd selenium.WebDriver) (bool, error) {
_, err := wd.FindElement(selenium.ByXPATH, "(//div/p[@class=''])[last()]")
if err != nil {
return false, err
}
continue
}
err = extractFollowers(wd, clog)
if err != nil {
continue
}
return true, nil
}, time.Second*10)
err = extractViews(wd, clog) // views + tags + gratuity
btn, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']")
if err != nil {
// when the channel is not live streaming, extract the gratuity (gifts)
wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) {
channelchat, err := wd.FindElement(selenium.ByXPATH, `//a[@data-a-target="channel-home-tab-Chat"]`)
btn, _ := web.FindElement(selenium.ByXPATH, `//button[@data-test-selector="expand-grabber"]`)
if (err == nil && channelchat != nil) || btn != nil {
if channelchat != nil {
channelchat.Click()
}
time.Sleep(time.Second)
extractGratuity(wd, clog)
return true, nil
log.Println(err)
continue
}
btn.Click()
var elements []selenium.WebElement
var liveurls = 0
var delayerror = 2
for i := 0; i < 200 && !ps.IsClose(); i++ {
elements, err = wd.FindElements(selenium.ByXPATH, "(//div/p[@class=''])[last()]")
if err != nil {
log.Println(err)
break
}
time.Sleep(time.Millisecond * 200)
wd.KeyDown(selenium.EndKey)
time.Sleep(time.Millisecond * 200)
wd.KeyUp(selenium.EndKey)
time.Sleep(time.Millisecond * 2000)
if len(elements) == liveurls {
delayerror--
if delayerror <= 0 {
break
}
return false, nil
}, time.Second*4)
} else {
delayerror = 2
}
liveurls = len(elements)
}
articles, err := wd.FindElements(selenium.ByXPATH, "//article")
if err != nil {
log.Println(err)
continue
}
streamer.Platform = intimate.Ptwitch
clog.Platform = streamer.Platform
clog.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true}
lastClogId := estore.InsertClog(clog)
var streamers []*intimate.Streamer
for _, article := range articles {
streamer.Operator = 10
streamer.LatestLogUid = lastClogId
if clog.Tags != nil {
streamer.Tags = clog.Tags
e, err := article.FindElement(selenium.ByXPATH, ".//a[@data-a-target='preview-card-title-link' and @href]")
if err != nil {
log.Println(err)
continue
}
href, err := e.GetAttribute("href")
if err != nil {
log.Println(err)
continue
}
btns, err := article.FindElements(selenium.ByXPATH, ".//div[@class='tw-full-width tw-inline-block']//button")
if err != nil {
log.Println(err)
continue
}
var tags []string
for _, btn := range btns {
tag, err := btn.GetAttribute("data-a-target")
if err == nil {
tags = append(tags, tag)
}
}
streamer := &intimate.Streamer{}
matches := regexp.MustCompile(`https://www.twitch.tv/(\w+)`).FindStringSubmatch(href)
if len(matches) == 2 {
mc := matches[1]
streamer.UserId = &mc
} else {
log.Println(href)
continue
}
jtags, err := json.Marshal(tags)
if err != nil {
log.Println(err)
} else {
streamer.Tags = jtags
}
streamer.Platform = intimate.Ptwitch
streamer.LiveUrl = &sql.NullString{String: href, Valid: true}
streamer.Operator = 0
streamers = append(streamers, streamer)
// if estore.InsertStreamer(streamer) {
// // log.Println("streamer update tags", streamer.Uid, tags)
// if streamer.Tags != nil {
// estore.Update(streamer, "Tags", streamer.Tags)
// }
// }
}
switch fl := clog.Followers.Int64; {
case fl > 100000:
streamer.UpdateInterval = 120
case fl > 10000:
streamer.UpdateInterval = 240 * 2
case fl > 1000:
streamer.UpdateInterval = 360 * 2
case fl > 100:
streamer.UpdateInterval = 720 * 2
case fl > 0:
streamer.UpdateInterval = 1440 * 4
for _, streamer := range streamers {
Extractor(wd, streamer)
if err = intimate.TStreamer.InsertOrUpdate(streamer,
intimate.DUpdate{Field: "tags"},
intimate.DUpdate{Field: "update_time"},
); err != nil {
log.Println(err)
}
}
log.Println("streamer find", len(articles))
if len(articles) == 0 {
intimate.TStreamerList.UpdateError(streamerlist, fmt.Errorf(""))
}
streamer.UpdateTime = clog.UpdateTime
estore.UpdateStreamer(streamer)
count++
if count >= countlimt {
count = 0
// wd.Quit()
wd = intimate.GetChromeDriver(3030)
wd = intimate.GetChromeDriver(3031)
}
}
wd.Close()
wd.Quit()
}
func Extractor(wd selenium.WebDriver, streamer *intimate.Streamer) {
// streamer, err := estore.Pop(intimate.Ptwitch)
// if streamer == nil || err != nil {
// if err != lasterr {
// log.Println(err, lasterr)
// lasterr = err
// }
// time.Sleep(time.Second * 2)
// continue
// }
// var updateUrl map[string]string
// json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl)
liveUrl := streamer.LiveUrl.String
liveUrl = strings.Replace(liveUrl, "/watchparty", "", -1)
log.Println(liveUrl)
// err = wd.Get("https://www.twitch.tv/zoe_0601" + "/about")
err := wd.Get(liveUrl + "/about")
if err != nil {
log.Println(err)
intimate.TStreamer.UpdateError(streamer, err)
time.Sleep(time.Second * 5)
return
}
streamer.LiveUrl = &sql.NullString{String: liveUrl, Valid: true}
clog := &intimate.CollectLog{}
clog.UserId = *streamer.UserId
clog.Gratuity = &sql.NullInt64{Int64: 0, Valid: false}
time.Sleep(time.Millisecond * 500)
err = extractUserName(wd, streamer)
if err != nil {
_, err = wd.FindElement(selenium.ByXPATH, "//a[@data-a-target='browse-channels-button']")
if err == nil {
log.Println(streamer.UserId, "may be cancell")
streamer.Operator = 5
streamer.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
intimate.TStreamer.UpdateError(streamer, fmt.Errorf(""))
}
return
}
err = extractFollowers(wd, clog)
if err != nil {
return
}
err = extractViews(wd, clog) // views + tags + gratuity
if err != nil {
// when the channel is not live streaming, extract the gratuity (gifts)
wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) {
channelchat, err := wd.FindElement(selenium.ByXPATH, `//a[@data-a-target="channel-home-tab-Chat"]`)
btn, _ := web.FindElement(selenium.ByXPATH, `//button[@data-test-selector="expand-grabber"]`)
if (err == nil && channelchat != nil) || btn != nil {
if channelchat != nil {
channelchat.Click()
}
time.Sleep(time.Second)
extractGratuity(wd, clog)
return true, nil
}
return false, nil
}, time.Second*4)
}
streamer.Platform = intimate.Ptwitch
clog.Platform = streamer.Platform
clog.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
// clog.StreamerUid = streamer.Uid
lastClogId, err := intimate.TClog.InsertRetAutoID(clog)
if err != nil {
log.Println(err)
return
}
streamer.Operator = 10
streamer.LatestLogUid = lastClogId
if clog.Tags != nil {
streamer.Tags = clog.Tags
}
switch fl := clog.Followers.Int64; {
case fl > 100000:
streamer.UpdateInterval = 120
case fl > 10000:
streamer.UpdateInterval = 240 * 2
case fl > 1000:
streamer.UpdateInterval = 360 * 2
case fl > 100:
streamer.UpdateInterval = 720 * 2
case fl > 0:
streamer.UpdateInterval = 1440 * 4
}
streamer.UpdateTime = clog.UpdateTime
// intimate.TStreamer.InsertOrUpdate(streamer)
// count++
// if count >= countlimt {
// count = 0
// // wd.Quit()
// wd = intimate.GetChromeDriver(3030)
// }
}
func extractUserName(wd selenium.WebDriver, streamer *intimate.Streamer) error {
return wd.WaitWithTimeout(func(web selenium.WebDriver) (bool, error) {
label, err := web.FindElement(selenium.ByXPATH, "//a[@class='tw-interactive']//h1")
if err == nil {
if ltxt, err := label.Text(); err == nil && ltxt != "" {
// log.Println("label:", ltxt)
streamer.UserName = sql.NullString{String: ltxt, Valid: true}
streamer.UserName = &sql.NullString{String: ltxt, Valid: true}
return true, nil
}
}
@ -159,7 +313,7 @@ func extractFollowers(wd selenium.WebDriver, clog *intimate.CollectLog) error {
}
followers = regexp.MustCompile(`[\d,]+`).FindString(followers)
fint, _ := intimate.ParseNumber(followers)
clog.Followers = sql.NullInt64{Int64: int64(fint), Valid: true}
clog.Followers = &sql.NullInt64{Int64: int64(fint), Valid: true}
// log.Println("followers: ", followers, fint)
return true, nil
}, 4*time.Second)
@ -172,7 +326,7 @@ func extractViews(wd selenium.WebDriver, clog *intimate.CollectLog) error {
if txt, err := views.Text(); err == nil {
vint, _ := intimate.ParseNumber(txt)
clog.Views = sql.NullInt64{Int64: vint, Valid: true}
clog.Views = &sql.NullInt64{Int64: vint, Valid: true}
// log.Println("views:", txt)
views.Click()
@ -192,7 +346,7 @@ func extractTitle(wd selenium.WebDriver, clog *intimate.CollectLog) error {
title, err := web.FindElement(selenium.ByXPATH, `//h2[@data-a-target='stream-title']`)
if err == nil {
if txt, err := title.Text(); err == nil {
clog.LiveTitle = sql.NullString{String: txt, Valid: true}
clog.LiveTitle = &sql.NullString{String: txt, Valid: true}
return true, nil
}
}
@ -244,7 +398,7 @@ func extractGratuity(wd selenium.WebDriver, clog *intimate.CollectLog) error {
log.Println(err)
}
}
clog.Gratuity = sql.NullInt64{Int64: gratuity, Valid: true}
clog.Gratuity = &sql.NullInt64{Int64: gratuity, Valid: true}
}
return true, nil
}


@ -3,26 +3,25 @@ package intimate
import (
"database/sql"
"reflect"
"time"
)
type GetSet struct {
}
type StreamerList struct {
UrlHash []byte //
Platform Platform //
Url string //
UrlHash string `field:"urlhash" uid:"true"` //
Platform string `field:"platform" ` //
Url string `field:"url" ` //
Label sql.NullString //
Label *sql.NullString `field:"label" ` //
Serialize interface{}
Serialize interface{} `field:"serialize" `
UpdateInterval int32
UpdateTime time.Time //
UpdateInterval int32 `field:"update_interval" `
UpdateTime *sql.NullTime `field:"update_time" ` //
ErrorMsg sql.NullString
Operator int32
ErrorMsg *sql.NullString `field:"error_msg" ` //
Operator int32 `field:"operator" `
LastOperator int32
}
@ -38,25 +37,25 @@ func (sl *StreamerList) Set(field string, value interface{}) {
}
type Streamer struct {
Uid int64 //
Platform Platform //
UserId string //
Uid int64 `field:"uid" uid:"auto"` //
Platform Platform `field:"platform"` //
UserId *string `field:"user_id"` //
UserName sql.NullString //
LiveUrl sql.NullString //
Channel sql.NullString //
Tags interface{}
Ext interface{} //
UserName *sql.NullString `field:"user_name"` //
LiveUrl *sql.NullString `field:"live_url"` //
Channel *sql.NullString `field:"channel"` //
Tags interface{} `field:"tags"`
Ext interface{} `field:"ext"` //
IsUpdateStreamer bool // whether to update the fields above
IsUpdateUrl bool
UpdateInterval int32
UpdateUrl interface{}
LatestLogUid int64
UpdateTime sql.NullTime //
UpdateInterval int32 `field:"update_interval"`
UpdateUrl interface{} `field:"update_url"` // TODO: nil
LatestLogUid int64 `field:"latest_log_uid"`
UpdateTime *sql.NullTime `field:"update_time"` //
ErrorMsg sql.NullString
Operator int32
ErrorMsg *sql.NullString `field:"error_msg"`
Operator int32 `field:"operator"`
LastOperator int32
}
@ -72,24 +71,24 @@ func (ai *Streamer) Set(field string, value interface{}) {
}
type CollectLog struct {
LogUid int64 // log uid
StreamerUid int64 // uid in the streamer table
LogUid int64 `field:"log_uid"` // log uid
StreamerUid int64 `field:"streamer_uid"` // uid in the streamer table
Platform Platform //
UserId string // the platform-side user id
IsLiveStreaming bool //
IsError bool //
Followers sql.NullInt64 //
Views sql.NullInt64 //
Giver interface{} //
Gratuity sql.NullInt64 //
LiveTitle sql.NullString //
LiveStartTime sql.NullTime //
LiveEndTime sql.NullTime //
UpdateTime sql.NullTime //
Tags interface{}
Ext interface{} //
ErrorMsg sql.NullString //
Platform Platform `field:"platform"` //
UserId string `field:"user_id"` // the platform-side user id
IsLiveStreaming bool `field:"is_live_streaming"` //
IsError bool `field:"is_error"` //
Followers *sql.NullInt64 `field:"followers"` //
Views *sql.NullInt64 `field:"views"` //
Giver interface{} `field:"giver"` //
Gratuity *sql.NullInt64 `field:"gratuity"` //
LiveTitle *sql.NullString `field:"live_title"` //
LiveStartTime *sql.NullTime `field:"live_start_time"` //
LiveEndTime *sql.NullTime `field:"live_end_time"` //
UpdateTime *sql.NullTime `field:"update_time"` //
Tags interface{} `field:"tags"`
Ext interface{} `field:"ext"` //
ErrorMsg *sql.NullString `field:"error_msg"` //
}
// Get Simple Value

1
go.mod

@ -7,6 +7,7 @@ require (
github.com/474420502/focus v0.12.0
github.com/474420502/gcurl v0.4.5
github.com/474420502/requests v1.9.1
github.com/davecgh/go-spew v1.1.1
github.com/go-sql-driver/mysql v1.5.0
github.com/lestrrat-go/libxml2 v0.0.0-20200215080510-6483566f52cb
github.com/tebeka/selenium v0.9.9


@ -379,7 +379,7 @@ func (store *StoreExtractor) InsertStreamer(streamer *Streamer) (isExists bool)
}
}()
streamer.UpdateTime = sql.NullTime{Time: time.Now().Add(-time.Hour * 100000), Valid: true}
streamer.UpdateTime = &sql.NullTime{Time: time.Now().Add(-time.Hour * 100000), Valid: true}
_, err = tx.Exec("INSERT IGNORE INTO "+StreamerTable+"(platform, user_id, user_name, live_url, update_url, tags, update_time) VALUES(?,?,?,?,?,?,?);",
streamer.Platform,
streamer.UserId,

View File

@ -1,12 +0,0 @@
[supervisord]
nodaemon=true
[program:openrec_source]
directory = MYPATH/bin/openrec_task2/
command= MYPATH/bin/openrec_task2/openrec_task2
process_name=%(program_name)s_%(process_num)02d ;process name pattern for multiple processes
numprocs=4 ;number of processes to start
autorestart=true
stderr_logfile=MYPATH/bin/openrec_task2/log
stderr_logfile_maxbytes=0
stopsignal=QUIT

View File

@ -1,5 +1,5 @@
[supervisord]
nodaemon=false
nodaemon=true
[program:twitch_extractor]
environment=DISPLAY=":99"

View File

@ -1,13 +0,0 @@
[supervisord]
nodaemon=false
[program:twitch_extractor_p2]
environment=DISPLAY=":99",pac_proxy=http://localhost:1090/pac1
directory = MYPATH/bin/twitch_extractor
command= MYPATH/bin/twitch_extractor/twitch_extractor
process_name=%(program_name)s_%(process_num)02d ;process name pattern for multiple processes
numprocs=2 ;number of processes to start
autorestart=true
stderr_logfile=MYPATH/bin/twitch_extractor/log
stderr_logfile_maxbytes=0
stopsignal=QUIT

View File

@ -0,0 +1,13 @@
[supervisord]
nodaemon=false
[program:twitch_task1]
environment=DISPLAY=":99"
directory = MYPATH/bin/twitch_task1
command= MYPATH/bin/twitch_task1/twitch_task1
# process_name=%(program_name)s_%(process_num)02d ;process name pattern for multiple processes
# numprocs=1 ;number of processes to start
autorestart=true
stderr_logfile=MYPATH/bin/twitch_task1/log
stderr_logfile_maxbytes=0
stopsignal=QUIT

View File

@ -1,13 +0,0 @@
[supervisord]
nodaemon=false
[program:twitch_task2]
environment=DISPLAY=":99"
directory = MYPATH/bin/twitch_task2
command= MYPATH/bin/twitch_task2/twitch_task2
process_name=%(program_name)s_%(process_num)02d ;process name pattern for multiple processes
numprocs=6 ;number of processes to start
autorestart=true
stderr_logfile=MYPATH/bin/twitch_task2/log
stderr_logfile_maxbytes=0
stopsignal=QUIT

View File

@ -12,7 +12,7 @@ import (
)
// estore is the extractor store connection instance
var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// Execute 执行
func Execute() {
@ -70,13 +70,13 @@ func Execute() {
if userid := room.Get("id").String(); userid != "" {
streamer.UserId = userid
streamer.LiveUrl = sql.NullString{String: "https://www.nimo.tv/live/" + userid, Valid: true}
streamer.UserId = &userid
streamer.LiveUrl = &sql.NullString{String: "https://www.nimo.tv/live/" + userid, Valid: true}
channel := room.Get("roomTypeName").String()
streamer.Channel = sql.NullString{String: channel, Valid: channel != ""}
streamer.Channel = &sql.NullString{String: channel, Valid: channel != ""}
username := room.Get("anchorName").String()
streamer.UserName = sql.NullString{String: username, Valid: username != ""}
streamer.UserName = &sql.NullString{String: username, Valid: username != ""}
if rtags := room.Get("anchorLabels"); rtags.IsArray() {
@ -95,8 +95,11 @@ func Execute() {
}
streamer.UpdateInterval = 120
// spew.Println(streamer)
estore.InsertStreamer(streamer)
err = intimate.TStreamer.Insert(streamer)
if err != nil {
panic(err)
}
} else {
log.Println("userid is null.", room.String())
}

View File

@ -11,11 +11,11 @@ import (
"github.com/tidwall/gjson"
)
// sstore is the source store instance for persisting raw source data; see sql/intimate_source.sql for the table schema
var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec))
// // sstore is the source store instance for persisting raw source data; see sql/intimate_source.sql for the table schema
// var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec))
// estore is the extractor store connection instance
var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// // estore is the extractor store connection instance
// var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// Execute runs the task
func Execute() {
@ -71,7 +71,7 @@ func Execute() {
userid := User.Get("channel.id").String()
streamer := &intimate.Streamer{}
streamer.UserId = userid
streamer.UserId = &userid
streamer.Platform = intimate.Popenrec
updateUrl := make(map[string]interface{})
@ -83,15 +83,16 @@ func Execute() {
updateUrlBytes, err := json.Marshal(updateUrl)
if err != nil {
estore.UpdateError(streamer, err)
intimate.TStreamer.UpdateError(streamer, err)
continue
}
streamer.UpdateUrl = updateUrlBytes
estore.InsertStreamer(streamer)
intimate.TStreamer.Insert(streamer)
}
}
log.Println("streamer count:", len(result.Array()), tp.ParsedURL.String())
// increment the page query parameter of the url to walk through all pages
tp.QueryParam("page").IntAdd(1)
time.Sleep(time.Second * 1)
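The same paging pattern, pulled out as a minimal standalone sketch using the requests helpers seen above (the endpoint, the page limit and the parameter name are placeholders; error handling is trimmed):

package main

import (
	"log"
	"time"

	"github.com/474420502/requests"
)

func main() {
	ses := requests.NewSession()
	tp := ses.Get("https://example.com/api/streamers?page=1") // placeholder endpoint
	for page := 1; page <= 10; page++ {
		resp, err := tp.Execute()
		if err != nil {
			log.Println(err)
			break
		}
		log.Println("page", page, "bytes:", len(resp.Content()))
		tp.QueryParam("page").IntAdd(1) // bump ?page= for the next request
		time.Sleep(time.Second)
	}
}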

View File

@ -1,2 +0,0 @@
openrec_task2
log

View File

@ -1,5 +0,0 @@
package main
func main() {
Execute()
}

View File

@ -1,164 +0,0 @@
package main
import (
"database/sql"
"encoding/json"
"intimate"
"log"
"time"
"github.com/474420502/gcurl"
"github.com/474420502/requests"
"github.com/tidwall/gjson"
)
// sstore is the source store instance for persisting raw source data; see sql/intimate_source.sql for the table schema
var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STOpenrec))
// estore is the extractor store instance; see sql/intimate_extractor.sql for the table schema
var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
func init() {
}
// Execute runs the task
func Execute() {
ps := intimate.NewPerfectShutdown()
ses := requests.NewSession()
var lasterr error = nil
for !ps.IsClose() {
streamer, err := estore.Pop(intimate.Popenrec) // pop one streamer row from the queue for parsing
if streamer == nil || err != nil {
if err != lasterr {
log.Println(err, lasterr)
lasterr = err
}
time.Sleep(time.Second * 2)
continue
}
userId := streamer.UserId
var updateUrl map[string]string
err = json.Unmarshal(streamer.UpdateUrl.([]byte), &updateUrl) // unmarshal update_url, which stores the urls that need to be collected
if err != nil {
log.Println(err)
continue
}
// Check Userid
userUrl := updateUrl["user"]
log.Println(userUrl)
tp := ses.Get(userUrl) // fetch the user url page
resp, err := tp.Execute()
streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true}
if err != nil {
log.Println(err)
estore.UpdateError(streamer, err)
continue
}
cookies := ses.GetCookies(tp.GetParsedURL())
scurl := updateUrl["supporters"] // url for the supporters (tipping) data
curl := gcurl.Parse(scurl)
supportersSession := curl.CreateSession()
temporary := curl.CreateTemporary(supportersSession)
supportersSession.SetCookies(temporary.GetParsedURL(), cookies)
var supporters []string
for { // supporters data requires login info; below, the uuid, token and random values from the cookies are copied onto the supporters request
supportersQuery := temporary.GetQuery()
for _, cookie := range cookies {
if cookie.Name == "uuid" {
supportersQuery.Set("Uuid", cookie.Value)
continue
}
if cookie.Name == "token" {
supportersQuery.Set("Token", cookie.Value)
continue
}
if cookie.Name == "random" {
supportersQuery.Set("Random", cookie.Value)
continue
}
}
supportersQuery.Set("identify_id", userId)
temporary.SetQuery(supportersQuery)
resp, err := temporary.Execute()
if err != nil {
log.Println(err)
}
supporterjson := gjson.ParseBytes(resp.Content())
supporterdata := supporterjson.Get("data") // parse the json returned by the supporters request
if supporterdata.Type == gjson.Null {
break
}
supporters = append(supporters, string(resp.Content()))
temporary.QueryParam("page_number").IntAdd(1)
// page := supportersQuery.Get("page_number") // increment page_number
// pageint, err := strconv.Atoi(page)
// if err != nil {
// log.Println(err)
// break
// }
// pageint++
// page = strconv.Itoa(pageint)
// supportersQuery.Set("page_number", page)
// temporary.SetQuery(supportersQuery)
}
// cookies := cxt.Session().GetCookies(wf.GetParsedURL())
ext := make(map[string]interface{})
ext["json_supporters"] = supporters
ext["html_user"] = string(resp.Content())
liveUrl := updateUrl["live"]
tp = ses.Get(liveUrl)
resp, err = tp.Execute()
if err != nil {
log.Println(err)
estore.UpdateError(streamer, err)
continue
}
ext["html_live"] = string(resp.Content())
ext["var_user_id"] = userId
extJsonBytes, err := json.Marshal(ext)
if err != nil {
log.Println(err)
estore.UpdateError(streamer, err)
continue
}
// streamer.Platform = intimate.Popenrec
streamer.UpdateInterval = 120
streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true}
streamer.Operator = 0
source := &intimate.Source{}
source.Target = intimate.TOpenrecUser
source.Ext = string(extJsonBytes)
source.StreamerId = sql.NullInt64{Int64: streamer.Uid, Valid: true}
sstore.Insert(source)
estore.UpdateStreamer(streamer)
}
}

View File

@ -1,9 +0,0 @@
package main
import (
"testing"
)
func TestMain(t *testing.T) {
main()
}

View File

@ -21,5 +21,5 @@ func TestUpdateTime(t *testing.T) {
}
func TestMain(t *testing.T) {
main()
}

View File

@ -70,13 +70,15 @@ func Execute() {
if ok := queuedict[wurl]; !ok {
log.Println(wurl)
sl := &intimate.StreamerList{}
sl.Platform = intimate.Ptwitcasting
sl.Platform = string(intimate.Ptwitcasting)
sl.Url = wurl
sl.Operator = 0
sl.UpdateInterval = 120
sl.UpdateTime = time.Now()
sl.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
sl.UrlHash = intimate.GetUrlHash(sl.Url)
estore.InsertStreamerList(sl)
intimate.TStreamerList.Insert(sl)
// estore.InsertStreamerList(sl)
queue.Put(wurl)
queuedict[wurl] = true
@ -102,12 +104,13 @@ func Execute() {
sp.TagUrl[i] = wurl
if ok := queuedict[wurl]; !ok {
sl := &intimate.StreamerList{}
sl.Platform = intimate.Ptwitcasting
sl.Platform = string(intimate.Ptwitcasting)
sl.Url = wurl
sl.Operator = 0
sl.UpdateInterval = 120
sl.UpdateTime = time.Now()
estore.InsertStreamerList(sl)
sl.UpdateTime = &sql.NullTime{Time: time.Now(), Valid: true}
sl.UrlHash = intimate.GetUrlHash(sl.Url)
intimate.TStreamerList.Insert(sl)
queue.Put(wurl)
queuedict[wurl] = true
@ -122,17 +125,19 @@ func Execute() {
// log.Println(sp)
streamer := &intimate.Streamer{}
streamer.Platform = intimate.Ptwitcasting
streamer.LiveUrl = sql.NullString{String: sp.LiveUrl, Valid: true}
streamer.LiveUrl = &sql.NullString{String: sp.LiveUrl, Valid: true}
if btags, err := json.Marshal(sp.Tag); err != nil {
log.Println(err)
} else {
streamer.Tags = btags
}
streamer.UpdateInterval = 120
streamer.UpdateTime = sql.NullTime{Time: time.Now(), Valid: true}
streamer.UserName = sql.NullString{String: sp.UserName, Valid: true}
streamer.UserId = sp.UserId
estore.InsertStreamer(streamer)
streamer.UpdateTime = intimate.GetUpdateTimeNow()
streamer.UserName = &sql.NullString{String: sp.UserName, Valid: true}
streamer.UserId = &sp.UserId
streamer.Operator = 0
// estore.InsertStreamer(streamer)
intimate.TStreamer.Insert(streamer)
}
log.Println("finish remain", queue.Size())

View File

@ -1,7 +1,6 @@
package main
import (
"database/sql"
"intimate"
"log"
"time"
@ -9,100 +8,120 @@ import (
"github.com/tebeka/selenium"
)
// sstore is the source store instance for persisting raw source data; see sql/intimate_source.sql for the table schema
var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch))
// // sstore is the source store instance for persisting raw source data; see sql/intimate_source.sql for the table schema
// var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch))
// estore is the extractor store connection instance
var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// // estore is the extractor store connection instance
// var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// collect every channel link for each category
// Execute runs the task
func Execute() {
var err error
wd := intimate.GetChromeDriver(3030)
ps := intimate.NewPerfectShutdown()
weburl := "https://www.twitch.tv/directory?sort=VIEWER_COUNT"
err = wd.Get(weburl)
if err != nil {
panic(err)
}
for !ps.IsClose() {
var err error
wd := intimate.GetChromeDriver(3030)
cardCondition := func(wd selenium.WebDriver) (bool, error) {
elements, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]")
weburl := "https://www.twitch.tv/directory?sort=VIEWER_COUNT"
err = wd.Get(weburl)
if err != nil {
return false, err
}
return len(elements) > 0, nil
}
wd.WaitWithTimeout(cardCondition, time.Second*15)
time.Sleep(time.Second)
e, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']")
if err != nil {
panic(err)
}
e.Click()
var hrefs map[string]bool = make(map[string]bool)
var delayerror = 5
for i := 0; i <= 200; i++ {
cards, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]")
if err != nil {
log.Println(err)
break
panic(err)
}
if len(hrefs) == 0 {
delayerror--
if delayerror <= 0 {
cardCondition := func(wd selenium.WebDriver) (bool, error) {
elements, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]")
if err != nil {
return false, err
}
return len(elements) > 0, nil
}
wd.WaitWithTimeout(cardCondition, time.Second*15)
time.Sleep(time.Second)
e, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']")
if err != nil {
panic(err)
}
e.Click()
var lasthreflen = 0
var hrefs map[string]bool = make(map[string]bool)
var delayerror = 5
for i := 0; i <= 200; i++ {
cards, err := wd.FindElements(selenium.ByXPATH, "//span/a[contains(@data-a-target,'card-') and @href]")
if err != nil {
log.Println(err)
break
}
} else {
delayerror = 5
}
for ii := 0; ii < 10; ii++ {
for _, card := range cards {
href, err := card.GetAttribute("href")
if err != nil {
log.Println(href, err)
continue
} else {
hrefs[href] = true
if len(hrefs) == lasthreflen {
delayerror--
if delayerror <= 0 {
break
}
} else {
delayerror = 7
}
break
}
lasthreflen = len(hrefs)
if ps.IsClose() {
break
}
for ii := 0; ii < 10; ii++ {
for _, card := range cards {
href, err := card.GetAttribute("href")
if err != nil {
log.Println(href, err)
continue
} else {
hrefs[href] = true
}
}
break
}
if len(cards) > 10 {
log.Println(len(cards))
wd.ExecuteScript(`items = document.evaluate("//div[@data-target='directory-page__card-container']/../self::div[@data-target and @style]", document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null);
if ps.IsClose() {
break
}
if len(cards) > 10 {
log.Println(len(cards))
wd.ExecuteScript(`items = document.evaluate("//div[@data-target='directory-page__card-container']/../self::div[@data-target and @style]", document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null);
for (var i = 0; i < items.snapshotLength - 10; i++) { item = items.snapshotItem(i); item.remove() ;};`, nil)
}
time.Sleep(time.Millisecond * 200)
wd.KeyDown(selenium.EndKey)
time.Sleep(time.Millisecond * 200)
wd.KeyUp(selenium.EndKey)
time.Sleep(time.Millisecond * 2500)
}
time.Sleep(time.Millisecond * 200)
wd.KeyDown(selenium.EndKey)
time.Sleep(time.Millisecond * 200)
wd.KeyUp(selenium.EndKey)
time.Sleep(time.Millisecond * 2500)
for href := range hrefs {
sl := &intimate.StreamerList{}
sl.Url = href
sl.UrlHash = intimate.GetUrlHash(sl.Url)
sl.Platform = string(intimate.Ptwitch)
sl.UpdateTime = intimate.GetUpdateTimeNow()
err := intimate.TStreamerList.Insert(sl)
if err != nil {
log.Println(err)
}
// TODO: Save href
// source := &intimate.Source{}
// source.Source = sql.NullString{String: href, Valid: true}
// source.Operator = 0
// source.Target = intimate.TTwitchChannel
// source.Url = weburl
// sstore.Insert(source)
}
log.Println("hrefs len:", len(hrefs))
// sstore.Deduplicate(intimate.TTwitchChannel, "source")
wd.Close()
wd.Quit()
time.Sleep(time.Minute * 30)
}
for href := range hrefs {
// TODO: Save href
source := &intimate.Source{}
source.Source = sql.NullString{String: href, Valid: true}
source.Operator = 0
source.Target = intimate.TTwitchChannel
source.Url = weburl
sstore.Insert(source)
}
log.Println("hrefs len:", len(hrefs))
sstore.Deduplicate(intimate.TTwitchChannel, "source")
}

View File

@ -1,2 +0,0 @@
twitch_task2
log

View File

@ -1,6 +0,0 @@
package main
func main() {
Execute()
}

View File

@ -1,174 +0,0 @@
package main
import (
"database/sql"
"encoding/json"
"intimate"
"log"
"regexp"
"time"
"github.com/tebeka/selenium"
)
// sstore is the source store instance for persisting raw source data; see sql/intimate_source.sql for the table schema
var sstore *intimate.StoreSource = intimate.NewStoreSource(string(intimate.STTwitch))
// estore is the extractor store connection instance
var estore *intimate.StoreExtractor = intimate.NewStoreExtractor()
// collect every channel link for each category
// Execute runs the task
func Execute() {
// DELETE FROM source_twitch WHERE uid NOT IN (SELECT MAX(s.uid) FROM (SELECT uid, source FROM source_twitch ) s GROUP BY s.source) ;
//article//a[@data-a-target='preview-card-title-link']
wd := intimate.GetChromeDriver(3030)
defer wd.Quit()
ps := intimate.NewPerfectShutdown()
counter := intimate.NewCounter()
counter.SetMaxLimit(100)
counter.SetMaxToDo(func(olist ...interface{}) error {
owd := olist[0].(*selenium.WebDriver)
if err := (*owd).Quit(); err != nil {
log.Println(err)
}
*owd = intimate.GetChromeDriver(3030)
return nil
}, &wd)
for !ps.IsClose() {
var err error
sourceChannel, err := sstore.Pop(intimate.TTwitchChannel)
if err != nil {
panic(err)
}
weburl := sourceChannel.Source.String + "?sort=VIEWER_COUNT"
err = wd.Get(weburl)
if err != nil {
log.Println(err)
sstore.UpdateError(sourceChannel, err)
time.Sleep(time.Second * 10)
continue
}
wd.WaitWithTimeout(func(wd selenium.WebDriver) (bool, error) {
_, err := wd.FindElement(selenium.ByXPATH, "(//div/p[@class=''])[last()]")
if err != nil {
return false, err
}
return true, nil
}, time.Second*10)
btn, err := wd.FindElement(selenium.ByXPATH, "//button[@data-a-target='browse-sort-menu']")
if err != nil {
log.Println(err)
continue
}
btn.Click()
var elements []selenium.WebElement
var liveurls = 0
var delayerror = 2
for i := 0; i < 200 && !ps.IsClose(); i++ {
elements, err = wd.FindElements(selenium.ByXPATH, "(//div/p[@class=''])[last()]")
if err != nil {
log.Println(err)
break
}
time.Sleep(time.Millisecond * 200)
wd.KeyDown(selenium.EndKey)
time.Sleep(time.Millisecond * 200)
wd.KeyUp(selenium.EndKey)
time.Sleep(time.Millisecond * 2000)
if len(elements) == liveurls {
delayerror--
if delayerror <= 0 {
break
}
} else {
delayerror = 2
}
liveurls = len(elements)
}
articles, err := wd.FindElements(selenium.ByXPATH, "//article")
if err != nil {
log.Println(err)
continue
}
for _, article := range articles {
e, err := article.FindElement(selenium.ByXPATH, ".//a[@data-a-target='preview-card-title-link' and @href]")
if err != nil {
log.Println(err)
continue
}
href, err := e.GetAttribute("href")
if err != nil {
log.Println(err)
continue
}
btns, err := article.FindElements(selenium.ByXPATH, ".//div[@class='tw-full-width tw-inline-block']//button")
if err != nil {
log.Println(err)
continue
}
var tags []string
for _, btn := range btns {
tag, err := btn.GetAttribute("data-a-target")
if err == nil {
tags = append(tags, tag)
}
}
streamer := &intimate.Streamer{}
matches := regexp.MustCompile(`https://www.twitch.tv/(\w+)`).FindStringSubmatch(href)
if len(matches) == 2 {
streamer.UserId = matches[1]
} else {
log.Println(href)
continue
}
jtags, err := json.Marshal(tags)
if err != nil {
log.Println(err)
} else {
streamer.Tags = jtags
}
streamer.Platform = intimate.Ptwitch
updateUrl := make(map[string]string)
updateUrl["live"] = href
streamer.LiveUrl = sql.NullString{String: href, Valid: true}
data, err := json.Marshal(updateUrl)
if err != nil {
log.Println(err)
continue
}
streamer.UpdateUrl = data
streamer.Operator = 0
if estore.InsertStreamer(streamer) {
// log.Println("streamer update tags", streamer.Uid, tags)
if streamer.Tags != nil {
estore.Update(streamer, "Tags", streamer.Tags)
}
}
}
log.Println("streamer find", len(articles))
if len(articles) == 0 {
sourceChannel.Operator = 5
sstore.UpdateOperator(sourceChannel)
}
counter.AddWithReset(1)
}
}

View File

@ -1,7 +0,0 @@
package main
import "testing"
func TestMain(t *testing.T) {
main()
}

View File

@ -1,6 +1,8 @@
package intimate
import (
"crypto/md5"
"database/sql"
"fmt"
"log"
"os"
@ -28,6 +30,15 @@ func init() {
}
// GetUpdateTimeNow returns an update time far enough in the past that the row is due for processing immediately; meant to be used together with the first insert
func GetUpdateTimeNow() *sql.NullTime {
return &sql.NullTime{Time: time.Now().Add(-time.Hour * 100000), Valid: true}
}
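A quick check of why this forces an immediate pick-up: the returned time sits roughly 100000 hours (about 6,000,000 minutes) in the past, so the elapsed minutes exceed any realistic update_interval and a freshly inserted row is processed on the very next queue pass. A small sketch, assuming the module is importable as intimate:

package main

import (
	"fmt"
	"intimate"
	"time"
)

func main() {
	t := intimate.GetUpdateTimeNow()
	minutes := time.Since(t.Time).Minutes() // roughly 6,000,000 minutes
	fmt.Println(minutes >= 120)             // true for any realistic update_interval
}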
func GetUrlHash(urlstr string) string {
return fmt.Sprintf("%x", md5.Sum([]byte(urlstr)))
}
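GetUrlHash feeds the urlhash column, which carries the uid tag on StreamerList, so the MD5 of the URL effectively serves as the row's unique key. A quick illustration (the URL is a placeholder):

package main

import (
	"fmt"
	"intimate"
)

func main() {
	u := "https://twitcasting.tv/rankingindex" // placeholder url
	fmt.Println(intimate.GetUrlHash(u))        // 32-character lowercase hex md5, stored in urlhash
}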
// ParseNumber strips commas and parses the number
func ParseNumber(num string) (int64, error) {
num = strings.Trim(num, " ")