diff --git a/parser.go b/parser.go index eec3d8d..2683564 100644 --- a/parser.go +++ b/parser.go @@ -4,6 +4,7 @@ import ( "encoding/json" "io/ioutil" "log" + "sync" "gopkg.in/yaml.v2" @@ -162,34 +163,55 @@ func (p *Parser) GetQueue() *Queue { return p.que } +// ADParserServer 主入口循环 func ADParserServer(adp IParser) { + + wg := new(sync.WaitGroup) + + for i := 0; i < 10; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + for _ = range data { + } + }(i) + } + + for i := 0; i < 10000; i++ { + data <- i + } + close(data) + wg.Wait() + adresponse := adp.GetLogDB().ADParserSelect(adp.GetSpiderID()) // select from db + adresponseList := make(chan logdb.ADResonse, len(adresponse)) + for _, adr := range adresponse { parserAndSendMQ(&adr, adp) } } -func parserAndSendMQ(adr *logdb.ADResonse, adp IParser) error { +func parserAndSendMQ(adr *logdb.ADResonse, adp IParser) { pjson, err := adp.ToDoParser(adr.Response) if err != nil { - log.Println(err) + log.Println("uid:", adr.UID, "err:", err) adp.GetLogDB().ADError(adr.UID, err.Error()) - return err + return } // send pjson to mq // update UID status finish que := adp.GetQueue() err = que.Push([]byte(pjson)) if err != nil { - log.Println(err) + log.Println("uid:", adr.UID, "err:", err) adp.GetLogDB().ADError(adr.UID, err.Error()) - } else { - adp.GetLogDB().ADParserSuccess(adr.UID, pjson) + return } - return nil + adp.GetLogDB().ADParserSuccess(adr.UID, pjson) } +// NewADParser 创建一个ADParser的类, 包含很多传到终端的所有结构 func NewADParser(SpiderID int) *ADParser { adp := &ADParser{} adp.Resources = make([]Resource, 0) diff --git a/parser_test.go b/parser_test.go index 24fd08d..cd1ce3e 100644 --- a/parser_test.go +++ b/parser_test.go @@ -2,8 +2,6 @@ package parser import ( "encoding/json" - "errors" - "log" "testing" ) @@ -27,13 +25,12 @@ func (tt *Toutiao) GetSpiderID() int { func (tt *Toutiao) ToDoParser(adstring string) (string, error) { adparser := NewADParser(tt.GetSpiderID()) - log.Println(adparser) data, err := adparser.ToJSON() if err != nil { return "", err } // log.Println(string(data)) - return string(data), errors.New("fuck test") + return string(data), nil } func TestParserToutiao(t *testing.T) {