// File: golang-lab4/workflows/fetch.go (Go, 115 lines, 2.0 KiB; 2025-11-07 05:26:09 +08:00)

package workflows
import (
"context"
"golang-lab4/models"
"golang-lab4/utils"
"time"
"github.com/go-resty/resty/v2"
"github.com/spf13/viper"
"github.com/uptrace/bun"
"go.uber.org/zap"
)
// REFETCH_TICK is the pause fetchAndSleep takes for a URL before it reports
// that URL on doneCh, which is what lets Run start the next fetch for it.
const REFETCH_TICK = 5 * time.Second
// Fetcher repeatedly fetches a set of URLs and stores the outcome of every
// fetch in the database.
//
// NOTE(review): urls is read by Run and written by Add and Remove, and no lock
// is visible in this file. Confirm that callers serialize access, since
// unsynchronized concurrent map access is a data race in Go.
type Fetcher struct {
// db is the database handle used to upsert and delete models.UrlState rows.
db *bun.DB
// client is the HTTP client shared by all fetches.
client *resty.Client
// urls is the set of URLs currently being monitored.
urls map[string]struct{}
// doneCh receives a URL each time fetchAndSleep completes a cycle for it.
doneCh chan string
}
// NewFetcher builds a Fetcher that stores results through db and monitors the
// URLs loaded by utils.LoadUrls from the file named by the "savefile"
// configuration key.
//
// It panics if that file cannot be loaded.
func NewFetcher(db *bun.DB) *Fetcher {
	// Upper bound on a single HTTP request. Without it, a server that accepts
	// the connection but never answers would block its fetch goroutine forever
	// and the URL would silently stop being refreshed.
	const requestTimeout = 30 * time.Second

	loaded, err := utils.LoadUrls(viper.GetString("savefile"))
	if err != nil {
		panic(err)
	}

	// Turn the loaded list into a set; duplicates in the file collapse here.
	urls := make(map[string]struct{}, len(loaded))
	for _, u := range loaded {
		urls[u] = struct{}{}
	}

	return &Fetcher{
		db:     db,
		client: resty.New().SetTimeout(requestTimeout),
		urls:   urls,
		// Unbuffered: a finished fetchAndSleep blocks until Run receives.
		doneCh: make(chan string),
	}
}
// fetch issues one GET request for url and reports the outcome.
//
// The returned models.UrlState carries the URL, whether the request completed
// without an error and the response passed resty's IsSuccess check, and the
// time the request took in whole milliseconds. A request error is logged and
// reflected as IsSuccess == false; it is not returned.
func (self *Fetcher) fetch(url string) models.UrlState {
	zap.L().Info("fetch",
		zap.String("url", url))

	// Time only the request itself so logging overhead is not counted.
	started := time.Now()
	resp, err := self.client.R().Get(url)
	elapsed := time.Since(started)

	if err != nil {
		// Keep the cause visible; the stored state only records the failure.
		zap.L().Warn("fetch failed",
			zap.String("url", url),
			zap.Error(err))
	}

	return models.UrlState{
		Url: url,
		// resp is only consulted when err is nil.
		IsSuccess: err == nil && resp.IsSuccess(),
		Latency:   uint(elapsed / time.Millisecond),
	}
}
// fetchAndSleep runs one monitoring cycle for url: it fetches the URL, upserts
// the result into the database, waits REFETCH_TICK, and then sends url on
// doneCh so that Run can schedule the next cycle.
//
// A database error is logged rather than raised, so one failed write does not
// take the whole process down from a background goroutine; the cycle still
// completes and the URL is retried on the next tick.
func (self *Fetcher) fetchAndSleep(url string) {
	res := self.fetch(url)

	// Store the result before sleeping. This keeps the stored state fresh and
	// shrinks the window in which a row deleted by Remove could be re-inserted
	// by a cycle that was already in flight.
	_, err := self.db.NewInsert().
		Model(&res).
		On("CONFLICT (url) DO UPDATE").
		Set("is_success = ?", res.IsSuccess).
		Set("latency = ?", res.Latency).
		Exec(context.Background())
	if err != nil {
		zap.L().Error("store fetch result",
			zap.String("url", url),
			zap.Error(err))
	}

	time.Sleep(REFETCH_TICK)

	// Blocks until Run receives, because doneCh is created unbuffered.
	self.doneCh <- url
}
// SyncFile hands the current URL set to utils.SaveUrls, targeting the file
// named by the "savefile" configuration key, and panics if saving fails.
//
// The URLs are collected by ranging over a map, so their order in the slice
// passed to SaveUrls is unspecified.
func (self *Fetcher) SyncFile() {
	// Non-nil even when empty, matching what SaveUrls has always been given.
	snapshot := make([]string, 0, len(self.urls))
	for tracked := range self.urls {
		snapshot = append(snapshot, tracked)
	}

	if err := utils.SaveUrls(viper.GetString("savefile"), snapshot); err != nil {
		panic(err)
	}
}
// Add starts monitoring url: it records the URL in the set, saves the set via
// SyncFile, and launches the first fetch cycle in a new goroutine.
//
// Adding a URL that is already monitored is a no-op. Starting another cycle
// for it would create a second, never-ending fetch loop for the same URL,
// because Run restarts every cycle that reports on doneCh.
//
// NOTE(review): urls is also read by Run; no lock is visible in this file, so
// confirm that Add is never called concurrently with Run or Remove.
func (self *Fetcher) Add(url string) {
	if _, tracked := self.urls[url]; tracked {
		return
	}

	self.urls[url] = struct{}{}
	self.SyncFile()
	go self.fetchAndSleep(url)
}
// Remove stops monitoring url: it drops the URL from the set, saves the set
// via SyncFile, and deletes the URL's models.UrlState row from the database.
//
// A failure of the database delete is logged; it is not returned and does not
// panic, so the in-memory set and the save file are updated regardless.
//
// NOTE(review): urls is also read by Run; no lock is visible in this file, so
// confirm that Remove is never called concurrently with Run or Add.
func (self *Fetcher) Remove(url string) {
	delete(self.urls, url)
	self.SyncFile()

	_, err := self.db.NewDelete().
		Model((*models.UrlState)(nil)).
		Where("url = ?", url).
		Exec(context.Background())
	if err != nil {
		zap.L().Error("delete url state",
			zap.String("url", url),
			zap.Error(err))
	}
}
// Run drives the monitoring loop and never returns.
//
// It launches one fetchAndSleep goroutine for every URL currently in the set,
// then waits on doneCh. Each URL received there gets a new cycle if it is
// still in the set; URLs that are no longer in the set are simply dropped.
//
// NOTE(review): the membership check below reads urls while Add and Remove
// may be writing it from other goroutines; no lock is visible in this file.
func (self *Fetcher) Run() {
	for initial := range self.urls {
		go self.fetchAndSleep(initial)
	}

	for {
		finished := <-self.doneCh

		_, tracked := self.urls[finished]
		if !tracked {
			// No longer in the set: let its cycle end here.
			continue
		}
		go self.fetchAndSleep(finished)
	}
}