From 23e2d3a1324cc26a901272ce0311595f7fbb540c Mon Sep 17 00:00:00 2001 From: wwqgtxx Date: Mon, 12 May 2025 22:19:49 +0800 Subject: [PATCH] chore: rebuild provider load --- adapter/provider/healthcheck.go | 14 +---- adapter/provider/provider.go | 3 - hub/executor/executor.go | 102 ++++++++++---------------------- 3 files changed, 34 insertions(+), 85 deletions(-) diff --git a/adapter/provider/healthcheck.go b/adapter/provider/healthcheck.go index 2bddd8e79..dafdbb115 100644 --- a/adapter/provider/healthcheck.go +++ b/adapter/provider/healthcheck.go @@ -43,13 +43,14 @@ type HealthCheck struct { } func (hc *HealthCheck) process() { - if hc.started.Load() { + if !hc.started.CompareAndSwap(false, true) { log.Warnln("Skip start health check timer due to it's started") return } + defer hc.started.Store(false) ticker := time.NewTicker(hc.interval) - hc.start() + go hc.check() for { select { case <-ticker.C: @@ -62,7 +63,6 @@ func (hc *HealthCheck) process() { } case <-hc.ctx.Done(): ticker.Stop() - hc.stop() return } } @@ -131,14 +131,6 @@ func (hc *HealthCheck) touch() { hc.lastTouch.Store(time.Now()) } -func (hc *HealthCheck) start() { - hc.started.Store(true) -} - -func (hc *HealthCheck) stop() { - hc.started.Store(false) -} - func (hc *HealthCheck) check() { if len(hc.proxies) == 0 { return diff --git a/adapter/provider/provider.go b/adapter/provider/provider.go index b1934bc7e..7bd80d32f 100644 --- a/adapter/provider/provider.go +++ b/adapter/provider/provider.go @@ -302,9 +302,6 @@ func (cp *compatibleProvider) Update() error { } func (cp *compatibleProvider) Initial() error { - if cp.healthCheck.interval != 0 && cp.healthCheck.url != "" { - cp.HealthCheck() - } return nil } diff --git a/hub/executor/executor.go b/hub/executor/executor.go index dd5f0912f..386be37a4 100644 --- a/hub/executor/executor.go +++ b/hub/executor/executor.go @@ -113,12 +113,11 @@ func ApplyConfig(cfg *config.Config, force bool) { tunnel.OnInnerLoading() initInnerTcp() - loadProxyProvider(cfg.Providers) + loadProvider(cfg.Providers) updateProfile(cfg) - loadRuleProvider(cfg.RuleProviders) + loadProvider(cfg.RuleProviders) runtime.GC() tunnel.OnRunning() - hcCompatibleProvider(cfg.Providers) updateUpdater(cfg) resolver.ResetConnection() @@ -303,79 +302,40 @@ func updateRules(rules []C.Rule, subRules map[string][]C.Rule, ruleProviders map tunnel.UpdateRules(rules, subRules, ruleProviders) } -func loadProvider(pv provider.Provider) { - if pv.VehicleType() == provider.Compatible { - return - } else { - log.Infoln("Start initial provider %s", (pv).Name()) - } - - if err := pv.Initial(); err != nil { - switch pv.Type() { - case provider.Proxy: - { - log.Errorln("initial proxy provider %s error: %v", (pv).Name(), err) - } - case provider.Rule: - { - log.Errorln("initial rule provider %s error: %v", (pv).Name(), err) - } - +func loadProvider[P provider.Provider](providers map[string]P) { + load := func(pv P) { + name := pv.Name() + if pv.VehicleType() == provider.Compatible { + log.Infoln("Start initial compatible provider %s", name) + } else { + log.Infoln("Start initial provider %s", name) } - } -} -func loadRuleProvider(ruleProviders map[string]provider.RuleProvider) { - wg := sync.WaitGroup{} - ch := make(chan struct{}, concurrentCount) - for _, ruleProvider := range ruleProviders { - ruleProvider := ruleProvider - wg.Add(1) - ch <- struct{}{} - go func() { - defer func() { <-ch; wg.Done() }() - loadProvider(ruleProvider) - - }() - } - - wg.Wait() -} - -func loadProxyProvider(proxyProviders map[string]provider.ProxyProvider) { - // limit concurrent size - wg := sync.WaitGroup{} - ch := make(chan struct{}, concurrentCount) - for _, proxyProvider := range proxyProviders { - proxyProvider := proxyProvider - wg.Add(1) - ch <- struct{}{} - go func() { - defer func() { <-ch; wg.Done() }() - loadProvider(proxyProvider) - }() - } - - wg.Wait() -} -func hcCompatibleProvider(proxyProviders map[string]provider.ProxyProvider) { - // limit concurrent size - wg := sync.WaitGroup{} - ch := make(chan struct{}, concurrentCount) - for _, proxyProvider := range proxyProviders { - proxyProvider := proxyProvider - if proxyProvider.VehicleType() == provider.Compatible { - log.Infoln("Start initial Compatible provider %s", proxyProvider.Name()) - wg.Add(1) - ch <- struct{}{} - go func() { - defer func() { <-ch; wg.Done() }() - if err := proxyProvider.Initial(); err != nil { - log.Errorln("initial Compatible provider %s error: %v", proxyProvider.Name(), err) + if err := pv.Initial(); err != nil { + switch pv.Type() { + case provider.Proxy: + { + log.Errorln("initial proxy provider %s error: %v", name, err) } - }() + case provider.Rule: + { + log.Errorln("initial rule provider %s error: %v", name, err) + } + } } } + + wg := sync.WaitGroup{} + ch := make(chan struct{}, concurrentCount) + for _, pv := range providers { + pv := pv + wg.Add(1) + ch <- struct{}{} + go func() { + defer func() { <-ch; wg.Done() }() + load(pv) + }() + } wg.Wait() }