This commit is contained in:
huangsimin 2018-12-19 17:51:39 +08:00
parent 97a5fdc3dc
commit e6da01f447
6 changed files with 278 additions and 15 deletions

View File

@ -1,8 +1,12 @@
package parser
import (
"encoding/json"
"io/ioutil"
"log"
"gopkg.in/yaml.v2"
"474420502.top/test/logdb"
)
@ -101,38 +105,82 @@ type ADParser struct {
DeviceInfo DeviceInfo `json:"deviceInfo"`
SectionInfo SectionInfo `json:"sectionInfo,omitempty"`
}
IParser
// ToJSON 返回序列化的json格式
func (adp *ADParser) ToJSON() ([]byte, error) {
data, err := json.Marshal(adp)
if err != nil {
return nil, err
}
return data, nil
}
// IParser 要实现的解析接口
type IParser interface {
Parser(adstring string) (string, error)
ToDoParser(adstring string) (string, error)
GetSpiderID() int
GetLogDB() *logdb.LogDB
GetQueue() *Queue
}
type Toutiao struct {
type Parser struct {
que *Queue
db *logdb.LogDB
IParser
}
// ConfigLogDB 加载LogDB配置
func (p *Parser) ConfigLogDB(yamlpath string) {
p.db = logdb.New(yamlpath)
p.db.Ping()
}
// ConfigQueue 加载队列MQ配置
func (p *Parser) ConfigQueue(yamlpath string) {
p.que = &Queue{}
data, err := ioutil.ReadFile(yamlpath)
if err != nil {
panic(err)
}
err = yaml.Unmarshal(data, p.que)
if err != nil {
panic(err)
}
p.que.Connect()
}
func (p *Parser) GetLogDB() *logdb.LogDB {
return p.db
}
func (p *Parser) GetQueue() *Queue {
return p.que
}
func ADParserServer(adp IParser) {
db := logdb.New("logdb.yaml")
tt := &Toutiao{}
ad := NewADParser(12)
adresponse := db.ADParserSelect(adp.GetSpiderID()) // select from db
adresponse := adp.GetLogDB().ADParserSelect(adp.GetSpiderID()) // select from db
for _, adr := range adresponse {
ParserAndSendMQ(&adr, adp)
parserAndSendMQ(&adr, adp)
}
}
func ParserAndSendMQ(adr *logdb.ADResonse, adp IParser) error {
if pjson, err := adp.Parser(adr.Response); err != nil {
func parserAndSendMQ(adr *logdb.ADResonse, adp IParser) error {
pjson, err := adp.ToDoParser(adr.Response)
if err != nil {
log.Println(err)
adp.GetLogDB().ADError(adr.UID, err.Error())
return err
} else {
}
// send pjson to mq
// update UID status finish
que := adp.GetQueue()
err = que.Push([]byte(pjson))
if err != nil {
log.Println(err)
adp.GetLogDB().ADError(adr.UID, err.Error())
} else {
adp.GetLogDB().ADParserSuccess(adr.UID, pjson)
}
return nil

View File

@ -2,14 +2,68 @@ package parser
import (
"encoding/json"
"log"
"testing"
)
func TestParser(t *testing.T) {
a := NewADParser()
a := NewADParser(12)
data, err := json.Marshal(a)
if err != nil {
t.Error(err)
}
t.Error(string(data))
}
type Toutiao struct {
Parser
// Url string // "amqp://aso:Wtu(!Ft559W%>mHK~i@172.19.30.60:5672/test_adspider"
}
func (tt *Toutiao) GetSpiderID() int {
return 1000073
}
func (tt *Toutiao) ToDoParser(adstring string) (string, error) {
adparser := NewADParser(tt.GetSpiderID())
log.Println(adparser)
data, err := adparser.ToJSON()
if err != nil {
panic(err)
}
log.Println(string(data))
return "", nil
}
func TestParserToutiao(t *testing.T) {
tt := Toutiao{}
tt.ConfigLogDB("logdb.yaml")
tt.ConfigQueue("queue.yaml")
ADParserServer(&tt)
}
func TestMQ(t *testing.T) {
var l []interface{}
data := make(map[string]interface{})
data["fuck"] = "123"
l = append(l, data)
pjson, err := json.Marshal(&l)
if err != nil {
panic(err)
}
t.Error(string(pjson))
que := NewQueue("amqp://spider:spider@172.16.6.109:5672/test_adspider", "ad_process", "CN")
que.Push(pjson)
// msgs, _, err := ch.Get("ad_process:CN", true)
// if err != nil {
// panic(err)
// }
// log.Println(string(msgs.Body))
}

77
queue.go Normal file
View File

@ -0,0 +1,77 @@
package parser
import (
"log"
"runtime"
"sync"
"time"
"github.com/streadway/amqp"
)
// Queue Youmi的队列
type Queue struct {
url string `yaml:"url"`
exchange string `yaml:"exchange"`
routekey string `yaml:"routekey"`
mutex sync.Mutex
conn *amqp.Connection
contentType string
}
// NewQueue youmi queue队列 mq
func NewQueue(url string, exchange, routekey string) *Queue {
que := Queue{url: url, exchange: exchange, routekey: routekey}
que.contentType = "application/json"
if err := que.Connect(); err != nil {
panic(err)
}
return &que
}
// SetContentType 设置发送的类型, 让接收端好判断
func (que *Queue) SetContentType(ct string) {
que.mutex.Lock()
defer que.mutex.Unlock()
que.contentType = ct
}
// Connect 链接MQ
func (que *Queue) Connect() error {
que.mutex.Lock()
defer que.mutex.Unlock()
conn, err := amqp.Dial(que.url)
if err != nil {
return err
}
que.conn = conn
runtime.SetFinalizer(que.conn, func(obj *amqp.Connection) {
que.conn.Close()
})
return nil
}
// Push 推送数据
func (que *Queue) Push(data []byte) error {
que.mutex.Lock()
defer que.mutex.Unlock()
ch, err := que.conn.Channel()
if err != nil {
log.Println(err)
if err := que.Connect(); err != nil {
panic(err)
}
}
return ch.Publish(que.exchange, que.routekey, false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
ContentType: que.contentType,
Body: data,
})
}

3
queue.yaml Normal file
View File

@ -0,0 +1,3 @@
url: "amqp://spider:spider@172.16.6.109:5672/test_adspider"
exchange: "ad_process"
routekey: "CN"

2
require.sh Normal file
View File

@ -0,0 +1,2 @@
https_proxy=474420502.top:7070
go get -u github.com/streadway/amqp

79
test.json Normal file
View File

@ -0,0 +1,79 @@
[
{
"app": {
"ab_extra": {
"download_button_style": {
"breathing": 0,
"color": 0
}
},
"ad_id": 1619720684522500,
"app_icon": "",
"app_name": "部落帝国2.0",
"app_size": "",
"button_text": "立即下载",
"click_track_url": "",
"click_track_url_list": [],
"description": "女友一次就抽到黑暗先知进阶到SSR,5分钟灭了对面大R",
"dislike": [
{
"name": "为什么看到此广告",
"open_url": "sslocal://webview?url=https%3a%2f%2fi.snssdk.com%2fapi%2fad%2ffeedback%2fprivacy%2fpage%3ftype%3dad_dislike\u0026hide_more=1\u0026title=为什么看到此广告"
}
],
"display_subtype": 3,
"download_count": "",
"download_url": "https://apk.dian5.com/73008_2826.apk",
"filter_words": [
{
"id": "4:2",
"is_selected": false,
"name": "看过了"
},
{
"id": "1:217",
"is_selected": false,
"name": "屏蔽:网络游戏类广告"
},
{
"id": "1:65",
"is_selected": false,
"name": "屏蔽:移动游戏类广告"
},
{
"id": "1:1",
"is_selected": false,
"name": "屏蔽:应用下载类广告"
}
],
"hide_if_exists": 1,
"id": 1619729644154919,
"image": {
"height": 720,
"uri": "web.business.image/201812125d0da99029c871f2450cbc94",
"url_list": [
{
"url": "http://sf3-ttcdn-tos.pstatp.com/obj/web.business.image/201812125d0da99029c871f2450cbc94"
}
],
"width": 1280
},
"image_mode": 3,
"intercept_flag": 2,
"label": "广告",
"log_extra": "{\"ad_price\":\"XBi_JQADxO1cGL8lAAPE7crapB3Kzd0_HoWFVw\",\"convert_component_suspend\":0,\"convert_id\":1619714145899534,\"external_action\":8,\"item_id\":6565653745026204168,\"media_id\":1598790750331917,\"orit\":null,\"req_id\":\"20181218173428010013037207426AF89\",\"rit\":3}",
"open_url": "",
"os_type": "android",
"package": "com.buluodiguo.applog.anfeng",
"rate": -1,
"show_dislike": 1,
"source": "",
"title": "女友一次就抽到黑暗先知进阶到SSR,5分钟灭了对面大R",
"track_url": "",
"track_url_list": [],
"type": "app",
"web_url": "https://ad.toutiao.com/tetris/page/1619714877979655/"
},
"is_preview": false
}
]