1
This commit is contained in:
Alex Yatskov 2015-08-14 16:40:37 +09:00
parent c92823220b
commit b7a07980e5
3 changed files with 60 additions and 61 deletions

View File

@ -26,8 +26,6 @@ import (
"encoding/json"
"io/ioutil"
"os"
"sync"
"time"
"github.com/kellydunn/golang-geo"
)
@ -40,7 +38,6 @@ type geoCoord struct {
type geoCache struct {
filename string
data map[string]geoCoord
mutex sync.Mutex
coder geo.GoogleGeocoder
}
@ -89,11 +86,6 @@ func (c *geoCache) decode(address string) (geoCoord, error) {
}
coord := geoCoord{point.Lat(), point.Lng()}
c.mutex.Lock()
c.data[address] = coord
c.mutex.Unlock()
time.Sleep(200 * time.Millisecond)
return coord, nil
}

View File

@ -34,8 +34,6 @@ import (
"github.com/PuerkitoBio/goquery"
)
const ()
type tabelogParams struct {
Page int
}
@ -69,16 +67,13 @@ func makeAbsUrl(base, ref string) string {
return b.ResolveReference(r).String()
}
func dumpReviews(filename string, rc chan tabelogReview, wg *sync.WaitGroup) {
func dumpReviews(filename string, in chan tabelogReview, wg *sync.WaitGroup) {
defer wg.Done()
count := 1
var reviews []tabelogReview
for {
if review, ok := <-rc; ok {
log.Printf("%s (%d)", review.Name, count)
if review, ok := <-in; ok {
reviews = append(reviews, review)
count++
} else {
break
}
@ -94,7 +89,29 @@ func dumpReviews(filename string, rc chan tabelogReview, wg *sync.WaitGroup) {
}
}
func scrapeReview(url string, rc chan tabelogReview, wg *sync.WaitGroup, wc *webCache, gc *geoCache) {
// decodeReviews is the geocoding stage of the scrape pipeline: it receives
// scraped reviews on in, resolves each review's street address to
// latitude/longitude via gc, and forwards the augmented review on out.
// When in is closed and drained it closes out to signal the downstream
// stage, then returns, releasing wg. A geocoding failure is fatal.
func decodeReviews(in chan tabelogReview, out chan tabelogReview, wg *sync.WaitGroup, gc *geoCache) {
	defer wg.Done()

	// Drain in until the producer closes it. The previous for/ok loop
	// never exited after the channel closed: it re-received the closed
	// channel's zero value and called close(out) a second time, which
	// panics ("close of closed channel").
	for review := range in {
		// Printf, not Print: the message carries a %s format verb.
		log.Printf("decoding %s", review.Name)

		coord, err := gc.decode(review.Address)
		if err != nil {
			log.Fatal(err)
		}

		review.Latitude = coord.Latitude
		review.Longitude = coord.Longitude
		out <- review
	}

	// This stage is the sole sender on out, so it performs the close.
	close(out)
}
func scrapeReview(url string, out chan tabelogReview, wg *sync.WaitGroup, wc *webCache) {
defer wg.Done()
doc, err := wc.load(url)
@ -129,23 +146,10 @@ func scrapeReview(url string, rc chan tabelogReview, wg *sync.WaitGroup, wc *web
return
}
coord, err := gc.decode(review.Address)
if err != nil {
switch err.Error() {
case "ZERO_RESULTS":
return
default:
log.Fatal(err)
}
}
review.Latitude = coord.Latitude
review.Longitude = coord.Longitude
rc <- review
out <- review
}
func scrapeIndex(url string, out chan tabelogReview, wc *webCache, gc *geoCache) {
func scrapeIndex(url string, out chan tabelogReview, wc *webCache) {
doc, err := wc.load(url)
if err != nil {
log.Fatal(err)
@ -155,13 +159,13 @@ func scrapeIndex(url string, out chan tabelogReview, wc *webCache, gc *geoCache)
doc.Find("div.list-rst__header > p > a").Each(func(index int, sel *goquery.Selection) {
if href, ok := sel.Attr("href"); ok {
wg.Add(1)
go scrapeReview(makeAbsUrl(url, href), out, &wg, wc, gc)
go scrapeReview(makeAbsUrl(url, href), out, &wg, wc)
}
})
wg.Wait()
if href, ok := doc.Find("a.c-pagination__target--next").Attr("href"); ok {
scrapeIndex(makeAbsUrl(url, href), out, wc, gc)
scrapeIndex(makeAbsUrl(url, href), out, wc)
}
}
@ -176,17 +180,20 @@ func scrapeTabelog(url, resultFile, webCacheDir, geoCacheFile string) {
log.Fatal(err)
}
var wg sync.WaitGroup
wg.Add(1)
rc := make(chan tabelogReview)
go dumpReviews(resultFile, rc, &wg)
scrapeChan := make(chan tabelogReview)
decodeChan := make(chan tabelogReview)
scrapeIndex(url, rc, wc, gc)
var wg sync.WaitGroup
wg.Add(2)
go decodeReviews(scrapeChan, decodeChan, &wg, gc)
go dumpReviews(resultFile, decodeChan, &wg)
scrapeIndex(url, scrapeChan, wc)
close(scrapeChan)
if err := gc.save(); err != nil {
log.Fatal(err)
}
close(rc)
wg.Wait()
}

View File

@ -35,21 +35,21 @@ import (
)
type webCache struct {
cacheDir string
directory string
}
func newWebCache(cacheDir string) (*webCache, error) {
if err := os.MkdirAll(cacheDir, 0755); err != nil {
func newWebCache(directory string) (*webCache, error) {
if err := os.MkdirAll(directory, 0755); err != nil {
return nil, err
}
return &webCache{cacheDir: cacheDir}, nil
return &webCache{directory: directory}, nil
}
func (c *webCache) urlToLocal(url string) string {
hash := md5.New()
hash.Write([]byte(url))
return path.Join(c.cacheDir, fmt.Sprintf("%x.html", hash.Sum(nil)))
return path.Join(c.directory, fmt.Sprintf("%x.html", hash.Sum(nil)))
}
func (c *webCache) load(url string) (*goquery.Document, error) {
@ -58,22 +58,22 @@ func (c *webCache) load(url string) (*goquery.Document, error) {
if file, err := os.Open(localPath); err == nil {
defer file.Close()
return goquery.NewDocumentFromReader(file)
} else {
res, err := http.Get(url)
if err != nil {
return nil, err
}
defer res.Body.Close()
var buff bytes.Buffer
if _, err := buff.ReadFrom(res.Body); err != nil {
return nil, err
}
if err := ioutil.WriteFile(localPath, buff.Bytes(), 0644); err != nil {
return nil, err
}
return goquery.NewDocumentFromReader(&buff)
}
res, err := http.Get(url)
if err != nil {
return nil, err
}
defer res.Body.Close()
var buff bytes.Buffer
if _, err := buff.ReadFrom(res.Body); err != nil {
return nil, err
}
if err := ioutil.WriteFile(localPath, buff.Bytes(), 0644); err != nil {
return nil, err
}
return goquery.NewDocumentFromReader(&buff)
}