Init: lab4 done
This commit is contained in:
114
workflows/fetch.go
Normal file
114
workflows/fetch.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package workflows
|
||||
|
||||
import (
|
||||
"context"
|
||||
"golang-lab4/models"
|
||||
"golang-lab4/utils"
|
||||
"time"
|
||||
|
||||
"github.com/go-resty/resty/v2"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/uptrace/bun"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
REFETCH_TICK = 5 * time.Second
|
||||
)
|
||||
|
||||
type Fetcher struct {
|
||||
db *bun.DB
|
||||
client *resty.Client
|
||||
urls map[string]struct{}
|
||||
doneCh chan string
|
||||
}
|
||||
|
||||
func NewFetcher(db *bun.DB) *Fetcher {
|
||||
urlSlice, err := utils.LoadUrls(viper.GetString("savefile"))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
urls := make(map[string]struct{})
|
||||
for _, url := range urlSlice {
|
||||
urls[url] = struct{}{}
|
||||
}
|
||||
|
||||
return &Fetcher{
|
||||
db: db,
|
||||
client: resty.New(),
|
||||
urls: urls,
|
||||
doneCh: make(chan string),
|
||||
}
|
||||
}
|
||||
|
||||
func (self *Fetcher) fetch(url string) models.UrlState {
|
||||
before := time.Now()
|
||||
zap.L().Info("fetch",
|
||||
zap.String("url", url))
|
||||
resp, err := self.client.R().
|
||||
Get(url)
|
||||
|
||||
return models.UrlState{
|
||||
Url: url,
|
||||
IsSuccess: (err == nil && resp.IsSuccess()),
|
||||
Latency: uint(time.Now().Sub(before) / time.Millisecond),
|
||||
}
|
||||
}
|
||||
|
||||
func (self *Fetcher) fetchAndSleep(url string) {
|
||||
res := self.fetch(url)
|
||||
|
||||
time.Sleep(REFETCH_TICK)
|
||||
|
||||
_, err := self.db.NewInsert().
|
||||
Model(&res).
|
||||
On("CONFLICT (url) DO UPDATE").
|
||||
Set("is_success = ?", res.IsSuccess).
|
||||
Set("latency = ?", res.Latency).
|
||||
Exec(context.Background())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
self.doneCh <- url
|
||||
}
|
||||
|
||||
func (self *Fetcher) SyncFile() {
|
||||
urlSlice := []string{}
|
||||
for url := range self.urls {
|
||||
urlSlice = append(urlSlice, url)
|
||||
}
|
||||
|
||||
err := utils.SaveUrls(viper.GetString("savefile"), urlSlice)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (self *Fetcher) Add(url string) {
|
||||
self.urls[url] = struct{}{}
|
||||
self.SyncFile()
|
||||
go self.fetchAndSleep(url)
|
||||
}
|
||||
|
||||
func (self *Fetcher) Remove(url string) {
|
||||
delete(self.urls, url)
|
||||
self.SyncFile()
|
||||
self.db.NewDelete().
|
||||
Model((*models.UrlState)(nil)).
|
||||
Where("url = ?", url).
|
||||
Exec(context.Background())
|
||||
}
|
||||
|
||||
func (self *Fetcher) Run() {
|
||||
for url := range self.urls {
|
||||
go self.fetchAndSleep(url)
|
||||
}
|
||||
|
||||
for {
|
||||
url := <-self.doneCh
|
||||
if _, ok := self.urls[url]; ok {
|
||||
go self.fetchAndSleep(url)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user