Compare commits

...

2 Commits

Author SHA1 Message Date
Larvan2
f774276896
fix: ensure wait group completes 2025-04-28 03:07:21 +00:00
wwqgtxx
aa51b9faba chore: replace using internal batch package to x/sync/errgroup
In the original batch implementation, the Go() method will always start a new goroutine and then wait for the concurrency limit, which is unnecessary for the current code. x/sync/errgroup will block Go() until the concurrency limit is met, which can effectively reduce memory usage.
In addition, the original batch always saves the return value of Go(), but it is not used in the current code, which will also waste a lot of memory space in high concurrency scenarios.
2025-04-28 10:28:45 +08:00
3 changed files with 15 additions and 31 deletions

View File

@ -7,13 +7,13 @@ import (
"time" "time"
"github.com/metacubex/mihomo/common/atomic" "github.com/metacubex/mihomo/common/atomic"
"github.com/metacubex/mihomo/common/batch"
"github.com/metacubex/mihomo/common/singledo" "github.com/metacubex/mihomo/common/singledo"
"github.com/metacubex/mihomo/common/utils" "github.com/metacubex/mihomo/common/utils"
C "github.com/metacubex/mihomo/constant" C "github.com/metacubex/mihomo/constant"
"github.com/metacubex/mihomo/log" "github.com/metacubex/mihomo/log"
"github.com/dlclark/regexp2" "github.com/dlclark/regexp2"
"golang.org/x/sync/errgroup"
) )
type HealthCheckOption struct { type HealthCheckOption struct {
@ -147,7 +147,8 @@ func (hc *HealthCheck) check() {
_, _, _ = hc.singleDo.Do(func() (struct{}, error) { _, _, _ = hc.singleDo.Do(func() (struct{}, error) {
id := utils.NewUUIDV4().String() id := utils.NewUUIDV4().String()
log.Debugln("Start New Health Checking {%s}", id) log.Debugln("Start New Health Checking {%s}", id)
b, _ := batch.New[bool](hc.ctx, batch.WithConcurrencyNum[bool](10)) b := new(errgroup.Group)
b.SetLimit(10)
// execute default health check // execute default health check
option := &extraOption{filters: nil, expectedStatus: hc.expectedStatus} option := &extraOption{filters: nil, expectedStatus: hc.expectedStatus}
@ -159,13 +160,13 @@ func (hc *HealthCheck) check() {
hc.execute(b, url, id, option) hc.execute(b, url, id, option)
} }
} }
b.Wait() _ = b.Wait()
log.Debugln("Finish A Health Checking {%s}", id) log.Debugln("Finish A Health Checking {%s}", id)
return struct{}{}, nil return struct{}{}, nil
}) })
} }
func (hc *HealthCheck) execute(b *batch.Batch[bool], url, uid string, option *extraOption) { func (hc *HealthCheck) execute(b *errgroup.Group, url, uid string, option *extraOption) {
url = strings.TrimSpace(url) url = strings.TrimSpace(url)
if len(url) == 0 { if len(url) == 0 {
log.Debugln("Health Check has been skipped due to testUrl is empty, {%s}", uid) log.Debugln("Health Check has been skipped due to testUrl is empty, {%s}", uid)
@ -195,13 +196,13 @@ func (hc *HealthCheck) execute(b *batch.Batch[bool], url, uid string, option *ex
} }
p := proxy p := proxy
b.Go(p.Name(), func() (bool, error) { b.Go(func() error {
ctx, cancel := context.WithTimeout(hc.ctx, hc.timeout) ctx, cancel := context.WithTimeout(hc.ctx, hc.timeout)
defer cancel() defer cancel()
log.Debugln("Health Checking, proxy: %s, url: %s, id: {%s}", p.Name(), url, uid) log.Debugln("Health Checking, proxy: %s, url: %s, id: {%s}", p.Name(), url, uid)
_, _ = p.URLTest(ctx, url, expectedStatus) _, _ = p.URLTest(ctx, url, expectedStatus)
log.Debugln("Health Checked, proxy: %s, url: %s, alive: %t, delay: %d ms uid: {%s}", p.Name(), url, p.AliveForTestUrl(url), p.LastDelayForTestUrl(url), uid) log.Debugln("Health Checked, proxy: %s, url: %s, alive: %t, delay: %d ms uid: {%s}", p.Name(), url, p.AliveForTestUrl(url), p.LastDelayForTestUrl(url), uid)
return false, nil return nil
}) })
} }
} }

View File

@ -9,7 +9,6 @@ import (
"time" "time"
"github.com/metacubex/mihomo/common/atomic" "github.com/metacubex/mihomo/common/atomic"
"github.com/metacubex/mihomo/common/batch"
"github.com/metacubex/mihomo/common/utils" "github.com/metacubex/mihomo/common/utils"
"github.com/metacubex/mihomo/component/geodata" "github.com/metacubex/mihomo/component/geodata"
_ "github.com/metacubex/mihomo/component/geodata/standard" _ "github.com/metacubex/mihomo/component/geodata/standard"
@ -19,6 +18,7 @@ import (
"github.com/metacubex/mihomo/log" "github.com/metacubex/mihomo/log"
"github.com/oschwald/maxminddb-golang" "github.com/oschwald/maxminddb-golang"
"golang.org/x/sync/errgroup"
) )
var ( var (
@ -169,41 +169,25 @@ func UpdateGeoSite() (err error) {
func updateGeoDatabases() error { func updateGeoDatabases() error {
defer runtime.GC() defer runtime.GC()
b, _ := batch.New[interface{}](context.Background()) b := errgroup.Group{}
if geodata.GeoIpEnable() { if geodata.GeoIpEnable() {
if geodata.GeodataMode() { if geodata.GeodataMode() {
b.Go("UpdateGeoIp", func() (_ interface{}, err error) { b.Go(UpdateGeoIp)
err = UpdateGeoIp()
return
})
} else { } else {
b.Go("UpdateMMDB", func() (_ interface{}, err error) { b.Go(UpdateMMDB)
err = UpdateMMDB()
return
})
} }
} }
if geodata.ASNEnable() { if geodata.ASNEnable() {
b.Go("UpdateASN", func() (_ interface{}, err error) { b.Go(UpdateASN)
err = UpdateASN()
return
})
} }
if geodata.GeoSiteEnable() { if geodata.GeoSiteEnable() {
b.Go("UpdateGeoSite", func() (_ interface{}, err error) { b.Go(UpdateGeoSite)
err = UpdateGeoSite()
return
})
} }
if e := b.Wait(); e != nil { return b.Wait()
return e.Err
}
return nil
} }
var ErrGetDatabaseUpdateSkip = errors.New("GEO database is updating, skip") var ErrGetDatabaseUpdateSkip = errors.New("GEO database is updating, skip")

View File

@ -375,9 +375,8 @@ func hcCompatibleProvider(proxyProviders map[string]provider.ProxyProvider) {
} }
}() }()
} }
} }
wg.Wait()
} }
func updateSniffer(snifferConfig *sniffer.Config) { func updateSniffer(snifferConfig *sniffer.Config) {