diff --git a/parser.go b/parser.go index 5a6b811..345e65a 100644 --- a/parser.go +++ b/parser.go @@ -1,8 +1,12 @@ package parser import ( + "encoding/json" + "io/ioutil" "log" + "gopkg.in/yaml.v2" + "474420502.top/test/logdb" ) @@ -101,38 +105,82 @@ type ADParser struct { DeviceInfo DeviceInfo `json:"deviceInfo"` SectionInfo SectionInfo `json:"sectionInfo,omitempty"` +} - IParser +// ToJSON 返回序列化的json格式 +func (adp *ADParser) ToJSON() ([]byte, error) { + data, err := json.Marshal(adp) + if err != nil { + return nil, err + } + return data, nil } // IParser 要实现的解析接口 type IParser interface { - Parser(adstring string) (string, error) + ToDoParser(adstring string) (string, error) GetSpiderID() int + GetLogDB() *logdb.LogDB + GetQueue() *Queue } -type Toutiao struct { +type Parser struct { + que *Queue + db *logdb.LogDB + IParser +} + +// ConfigLogDB 加载LogDB配置 +func (p *Parser) ConfigLogDB(yamlpath string) { + p.db = logdb.New(yamlpath) + p.db.Ping() +} + +// ConfigQueue 加载队列MQ配置 +func (p *Parser) ConfigQueue(yamlpath string) { + p.que = &Queue{} + data, err := ioutil.ReadFile(yamlpath) + if err != nil { + panic(err) + } + err = yaml.Unmarshal(data, p.que) + if err != nil { + panic(err) + } + p.que.Connect() +} + +func (p *Parser) GetLogDB() *logdb.LogDB { + return p.db +} + +func (p *Parser) GetQueue() *Queue { + return p.que } func ADParserServer(adp IParser) { - db := logdb.New("logdb.yaml") - tt := &Toutiao{} - ad := NewADParser(12) - - adresponse := db.ADParserSelect(adp.GetSpiderID()) // select from db + adresponse := adp.GetLogDB().ADParserSelect(adp.GetSpiderID()) // select from db for _, adr := range adresponse { - ParserAndSendMQ(&adr, adp) + parserAndSendMQ(&adr, adp) } } -func ParserAndSendMQ(adr *logdb.ADResonse, adp IParser) error { - - if pjson, err := adp.Parser(adr.Response); err != nil { +func parserAndSendMQ(adr *logdb.ADResonse, adp IParser) error { + pjson, err := adp.ToDoParser(adr.Response) + if err != nil { log.Println(err) + adp.GetLogDB().ADError(adr.UID, err.Error()) return err + } + // send pjson to mq + // update UID status finish + que := adp.GetQueue() + err = que.Push([]byte(pjson)) + if err != nil { + log.Println(err) + adp.GetLogDB().ADError(adr.UID, err.Error()) } else { - // send pjson to mq - // update UID status finish + adp.GetLogDB().ADParserSuccess(adr.UID, pjson) } return nil diff --git a/parser_test.go b/parser_test.go index 0357e16..287eabe 100644 --- a/parser_test.go +++ b/parser_test.go @@ -2,14 +2,68 @@ package parser import ( "encoding/json" + "log" "testing" ) func TestParser(t *testing.T) { - a := NewADParser() + a := NewADParser(12) data, err := json.Marshal(a) if err != nil { t.Error(err) } t.Error(string(data)) } + +type Toutiao struct { + Parser + // Url string // "amqp://aso:Wtu(!Ft559W%>mHK~i@172.19.30.60:5672/test_adspider" +} + +func (tt *Toutiao) GetSpiderID() int { + return 1000073 +} + +func (tt *Toutiao) ToDoParser(adstring string) (string, error) { + adparser := NewADParser(tt.GetSpiderID()) + log.Println(adparser) + data, err := adparser.ToJSON() + if err != nil { + panic(err) + } + log.Println(string(data)) + return "", nil +} + +func TestParserToutiao(t *testing.T) { + tt := Toutiao{} + tt.ConfigLogDB("logdb.yaml") + tt.ConfigQueue("queue.yaml") + ADParserServer(&tt) +} + +func TestMQ(t *testing.T) { + + var l []interface{} + + data := make(map[string]interface{}) + data["fuck"] = "123" + + l = append(l, data) + pjson, err := json.Marshal(&l) + if err != nil { + panic(err) + } + + t.Error(string(pjson)) + + que := NewQueue("amqp://spider:spider@172.16.6.109:5672/test_adspider", "ad_process", "CN") + que.Push(pjson) + + // msgs, _, err := ch.Get("ad_process:CN", true) + // if err != nil { + // panic(err) + // } + // log.Println(string(msgs.Body)) + +} diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..82de7f9 --- /dev/null +++ b/queue.go @@ -0,0 +1,77 @@ +package parser + +import ( + "log" + "runtime" + "sync" + "time" + + "github.com/streadway/amqp" +) + +// Queue Youmi的队列 +type Queue struct { + url string `yaml:"url"` + exchange string `yaml:"exchange"` + routekey string `yaml:"routekey"` + + mutex sync.Mutex + conn *amqp.Connection + + contentType string +} + +// NewQueue youmi queue队列 mq +func NewQueue(url string, exchange, routekey string) *Queue { + que := Queue{url: url, exchange: exchange, routekey: routekey} + que.contentType = "application/json" + + if err := que.Connect(); err != nil { + panic(err) + } + return &que +} + +// SetContentType 设置发送的类型, 让接收端好判断 +func (que *Queue) SetContentType(ct string) { + que.mutex.Lock() + defer que.mutex.Unlock() + + que.contentType = ct +} + +// Connect 链接MQ +func (que *Queue) Connect() error { + que.mutex.Lock() + defer que.mutex.Unlock() + + conn, err := amqp.Dial(que.url) + if err != nil { + return err + } + que.conn = conn + runtime.SetFinalizer(que.conn, func(obj *amqp.Connection) { + que.conn.Close() + }) + return nil +} + +// Push 推送数据 +func (que *Queue) Push(data []byte) error { + que.mutex.Lock() + defer que.mutex.Unlock() + + ch, err := que.conn.Channel() + if err != nil { + log.Println(err) + if err := que.Connect(); err != nil { + panic(err) + } + } + return ch.Publish(que.exchange, que.routekey, false, false, amqp.Publishing{ + DeliveryMode: amqp.Persistent, + Timestamp: time.Now(), + ContentType: que.contentType, + Body: data, + }) +} diff --git a/queue.yaml b/queue.yaml new file mode 100644 index 0000000..dd874a7 --- /dev/null +++ b/queue.yaml @@ -0,0 +1,3 @@ +url: "amqp://spider:spider@172.16.6.109:5672/test_adspider" +exchange: "ad_process" +routekey: "CN" \ No newline at end of file diff --git a/require.sh b/require.sh new file mode 100644 index 0000000..b0d6b00 --- /dev/null +++ b/require.sh @@ -0,0 +1,2 @@ +https_proxy=474420502.top:7070 +go get -u github.com/streadway/amqp \ No newline at end of file diff --git a/test.json b/test.json new file mode 100644 index 0000000..527ceb7 --- /dev/null +++ b/test.json @@ -0,0 +1,79 @@ +[ + { + "app": { + "ab_extra": { + "download_button_style": { + "breathing": 0, + "color": 0 + } + }, + "ad_id": 1619720684522500, + "app_icon": "", + "app_name": "部落帝国2.0", + "app_size": "", + "button_text": "立即下载", + "click_track_url": "", + "click_track_url_list": [], + "description": "女友一次就抽到黑暗先知,进阶到SSR,5分钟灭了对面大R", + "dislike": [ + { + "name": "为什么看到此广告", + "open_url": "sslocal://webview?url=https%3a%2f%2fi.snssdk.com%2fapi%2fad%2ffeedback%2fprivacy%2fpage%3ftype%3dad_dislike\u0026hide_more=1\u0026title=为什么看到此广告" + } + ], + "display_subtype": 3, + "download_count": "", + "download_url": "https://apk.dian5.com/73008_2826.apk", + "filter_words": [ + { + "id": "4:2", + "is_selected": false, + "name": "看过了" + }, + { + "id": "1:217", + "is_selected": false, + "name": "屏蔽:网络游戏类广告" + }, + { + "id": "1:65", + "is_selected": false, + "name": "屏蔽:移动游戏类广告" + }, + { + "id": "1:1", + "is_selected": false, + "name": "屏蔽:应用下载类广告" + } + ], + "hide_if_exists": 1, + "id": 1619729644154919, + "image": { + "height": 720, + "uri": "web.business.image/201812125d0da99029c871f2450cbc94", + "url_list": [ + { + "url": "http://sf3-ttcdn-tos.pstatp.com/obj/web.business.image/201812125d0da99029c871f2450cbc94" + } + ], + "width": 1280 + }, + "image_mode": 3, + "intercept_flag": 2, + "label": "广告", + "log_extra": "{\"ad_price\":\"XBi_JQADxO1cGL8lAAPE7crapB3Kzd0_HoWFVw\",\"convert_component_suspend\":0,\"convert_id\":1619714145899534,\"external_action\":8,\"item_id\":6565653745026204168,\"media_id\":1598790750331917,\"orit\":null,\"req_id\":\"20181218173428010013037207426AF89\",\"rit\":3}", + "open_url": "", + "os_type": "android", + "package": "com.buluodiguo.applog.anfeng", + "rate": -1, + "show_dislike": 1, + "source": "", + "title": "女友一次就抽到黑暗先知,进阶到SSR,5分钟灭了对面大R", + "track_url": "", + "track_url_list": [], + "type": "app", + "web_url": "https://ad.toutiao.com/tetris/page/1619714877979655/" + }, + "is_preview": false + } +] \ No newline at end of file