115 lines
2.0 KiB
Go
115 lines
2.0 KiB
Go
package workflows
|
|
|
|
import (
|
|
"context"
|
|
"golang-lab4/models"
|
|
"golang-lab4/utils"
|
|
"time"
|
|
|
|
"github.com/go-resty/resty/v2"
|
|
"github.com/spf13/viper"
|
|
"github.com/uptrace/bun"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
const (
|
|
REFETCH_TICK = 5 * time.Second
|
|
)
|
|
|
|
type Fetcher struct {
|
|
db *bun.DB
|
|
client *resty.Client
|
|
urls map[string]struct{}
|
|
doneCh chan string
|
|
}
|
|
|
|
func NewFetcher(db *bun.DB) *Fetcher {
|
|
urlSlice, err := utils.LoadUrls(viper.GetString("savefile"))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
urls := make(map[string]struct{})
|
|
for _, url := range urlSlice {
|
|
urls[url] = struct{}{}
|
|
}
|
|
|
|
return &Fetcher{
|
|
db: db,
|
|
client: resty.New(),
|
|
urls: urls,
|
|
doneCh: make(chan string),
|
|
}
|
|
}
|
|
|
|
func (self *Fetcher) fetch(url string) models.UrlState {
|
|
before := time.Now()
|
|
zap.L().Info("fetch",
|
|
zap.String("url", url))
|
|
resp, err := self.client.R().
|
|
Get(url)
|
|
|
|
return models.UrlState{
|
|
Url: url,
|
|
IsSuccess: (err == nil && resp.IsSuccess()),
|
|
Latency: uint(time.Now().Sub(before) / time.Millisecond),
|
|
}
|
|
}
|
|
|
|
func (self *Fetcher) fetchAndSleep(url string) {
|
|
res := self.fetch(url)
|
|
|
|
time.Sleep(REFETCH_TICK)
|
|
|
|
_, err := self.db.NewInsert().
|
|
Model(&res).
|
|
On("CONFLICT (url) DO UPDATE").
|
|
Set("is_success = ?", res.IsSuccess).
|
|
Set("latency = ?", res.Latency).
|
|
Exec(context.Background())
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
self.doneCh <- url
|
|
}
|
|
|
|
func (self *Fetcher) SyncFile() {
|
|
urlSlice := []string{}
|
|
for url := range self.urls {
|
|
urlSlice = append(urlSlice, url)
|
|
}
|
|
|
|
err := utils.SaveUrls(viper.GetString("savefile"), urlSlice)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (self *Fetcher) Add(url string) {
|
|
self.urls[url] = struct{}{}
|
|
self.SyncFile()
|
|
go self.fetchAndSleep(url)
|
|
}
|
|
|
|
func (self *Fetcher) Remove(url string) {
|
|
delete(self.urls, url)
|
|
self.SyncFile()
|
|
self.db.NewDelete().
|
|
Model((*models.UrlState)(nil)).
|
|
Where("url = ?", url).
|
|
Exec(context.Background())
|
|
}
|
|
|
|
func (self *Fetcher) Run() {
|
|
for url := range self.urls {
|
|
go self.fetchAndSleep(url)
|
|
}
|
|
|
|
for {
|
|
url := <-self.doneCh
|
|
if _, ok := self.urls[url]; ok {
|
|
go self.fetchAndSleep(url)
|
|
}
|
|
}
|
|
}
|