chore: rebuild outdated proxy auto close mechanism

This commit is contained in:
wwqgtxx 2025-04-03 22:42:32 +08:00
parent 7f1225b0c4
commit 622d99d000
17 changed files with 122 additions and 352 deletions

View File

@ -9,8 +9,10 @@ import (
"net/http" "net/http"
"net/netip" "net/netip"
"net/url" "net/url"
"runtime"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/metacubex/mihomo/common/atomic" "github.com/metacubex/mihomo/common/atomic"
@ -39,6 +41,9 @@ type Proxy struct {
alive atomic.Bool alive atomic.Bool
history *queue.Queue[C.DelayHistory] history *queue.Queue[C.DelayHistory]
extra *xsync.MapOf[string, *internalProxyState] extra *xsync.MapOf[string, *internalProxyState]
closeOnce sync.Once
closeErr error
} }
// Adapter implements C.Proxy // Adapter implements C.Proxy
@ -290,12 +295,28 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In
t = uint16(time.Since(start) / time.Millisecond) t = uint16(time.Since(start) / time.Millisecond)
return return
} }
func (p *Proxy) Close() error {
p.closeOnce.Do(func() {
runtime.SetFinalizer(p, nil)
p.closeErr = p.ProxyAdapter.Close()
})
return p.closeErr
}
func NewProxy(adapter C.ProxyAdapter) *Proxy { func NewProxy(adapter C.ProxyAdapter) *Proxy {
return &Proxy{ proxy := &Proxy{
ProxyAdapter: adapter, ProxyAdapter: adapter,
history: queue.New[C.DelayHistory](defaultHistoriesNum), history: queue.New[C.DelayHistory](defaultHistoriesNum),
alive: atomic.NewBool(true), alive: atomic.NewBool(true),
extra: xsync.NewMapOf[string, *internalProxyState]()} extra: xsync.NewMapOf[string, *internalProxyState]()}
// auto close ProxyAdapter
runtime.SetFinalizer(proxy, func(p *Proxy) {
log.Debugln("Closing outdated proxy [%s]", p.Name())
_ = p.Close()
})
return proxy
} }
func urlToMetadata(rawURL string) (addr C.Metadata, err error) { func urlToMetadata(rawURL string) (addr C.Metadata, err error) {

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"errors" "errors"
"net" "net"
"runtime"
"strconv" "strconv"
"time" "time"
@ -52,7 +51,7 @@ func (t *AnyTLS) DialContext(ctx context.Context, metadata *C.Metadata, opts ...
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewConn(CN.NewRefConn(c, t), t), nil return NewConn(c, t), nil
} }
func (t *AnyTLS) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) { func (t *AnyTLS) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) {
@ -73,7 +72,7 @@ func (t *AnyTLS) ListenPacketContext(ctx context.Context, metadata *C.Metadata,
metadata.DstIP = ip metadata.DstIP = ip
} }
destination := M.SocksaddrFromNet(metadata.UDPAddr()) destination := M.SocksaddrFromNet(metadata.UDPAddr())
return newPacketConn(CN.NewRefPacketConn(CN.NewThreadSafePacketConn(uot.NewLazyConn(c, uot.Request{Destination: destination})), t), t), nil return newPacketConn(CN.NewThreadSafePacketConn(uot.NewLazyConn(c, uot.Request{Destination: destination})), t), nil
} }
// SupportUOT implements C.ProxyAdapter // SupportUOT implements C.ProxyAdapter
@ -88,6 +87,11 @@ func (t *AnyTLS) ProxyInfo() C.ProxyInfo {
return info return info
} }
// Close implements C.ProxyAdapter
func (t *AnyTLS) Close() error {
return t.client.Close()
}
func NewAnyTLS(option AnyTLSOption) (*AnyTLS, error) { func NewAnyTLS(option AnyTLSOption) (*AnyTLS, error) {
addr := net.JoinHostPort(option.Server, strconv.Itoa(option.Port)) addr := net.JoinHostPort(option.Server, strconv.Itoa(option.Port))
@ -132,9 +136,6 @@ func NewAnyTLS(option AnyTLSOption) (*AnyTLS, error) {
option: &option, option: &option,
dialer: singDialer, dialer: singDialer,
} }
runtime.SetFinalizer(outbound, func(o *AnyTLS) {
_ = o.client.Close()
})
return outbound, nil return outbound, nil
} }

View File

@ -13,6 +13,11 @@ import (
C "github.com/metacubex/mihomo/constant" C "github.com/metacubex/mihomo/constant"
) )
type ProxyAdapter interface {
C.ProxyAdapter
DialOptions(opts ...dialer.Option) []dialer.Option
}
type Base struct { type Base struct {
name string name string
addr string addr string
@ -152,6 +157,10 @@ func (b *Base) DialOptions(opts ...dialer.Option) []dialer.Option {
return opts return opts
} }
func (b *Base) Close() error {
return nil
}
type BasicOption struct { type BasicOption struct {
TFO bool `proxy:"tfo,omitempty"` TFO bool `proxy:"tfo,omitempty"`
MPTCP bool `proxy:"mptcp,omitempty"` MPTCP bool `proxy:"mptcp,omitempty"`
@ -224,6 +233,7 @@ func (c *conn) ReaderReplaceable() bool {
func NewConn(c net.Conn, a C.ProxyAdapter) C.Conn { func NewConn(c net.Conn, a C.ProxyAdapter) C.Conn {
if _, ok := c.(syscall.Conn); !ok { // exclusion system conn like *net.TCPConn if _, ok := c.(syscall.Conn); !ok { // exclusion system conn like *net.TCPConn
c = N.NewDeadlineConn(c) // most conn from outbound can't handle readDeadline correctly c = N.NewDeadlineConn(c) // most conn from outbound can't handle readDeadline correctly
c = N.NewRefConn(c, a) // add ref for autoCloseProxyAdapter
} }
return &conn{N.NewExtendedConn(c), []string{a.Name()}, parseRemoteDestination(a.Addr())} return &conn{N.NewExtendedConn(c), []string{a.Name()}, parseRemoteDestination(a.Addr())}
} }
@ -271,6 +281,7 @@ func newPacketConn(pc net.PacketConn, a C.ProxyAdapter) C.PacketConn {
epc := N.NewEnhancePacketConn(pc) epc := N.NewEnhancePacketConn(pc)
if _, ok := pc.(syscall.Conn); !ok { // exclusion system conn like *net.UDPConn if _, ok := pc.(syscall.Conn); !ok { // exclusion system conn like *net.UDPConn
epc = N.NewDeadlineEnhancePacketConn(epc) // most conn from outbound can't handle readDeadline correctly epc = N.NewDeadlineEnhancePacketConn(epc) // most conn from outbound can't handle readDeadline correctly
epc = N.NewRefPacketConn(epc, a) // add ref for autoCloseProxyAdapter
} }
return &packetConn{epc, []string{a.Name()}, a.Name(), utils.NewUUIDV4().String(), parseRemoteDestination(a.Addr())} return &packetConn{epc, []string{a.Name()}, a.Name(), utils.NewUUIDV4().String(), parseRemoteDestination(a.Addr())}
} }

View File

@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"net" "net"
"net/netip" "net/netip"
"runtime"
"strconv" "strconv"
"time" "time"
@ -15,7 +14,6 @@ import (
"github.com/metacubex/quic-go/congestion" "github.com/metacubex/quic-go/congestion"
M "github.com/sagernet/sing/common/metadata" M "github.com/sagernet/sing/common/metadata"
CN "github.com/metacubex/mihomo/common/net"
"github.com/metacubex/mihomo/component/ca" "github.com/metacubex/mihomo/component/ca"
"github.com/metacubex/mihomo/component/dialer" "github.com/metacubex/mihomo/component/dialer"
"github.com/metacubex/mihomo/component/proxydialer" "github.com/metacubex/mihomo/component/proxydialer"
@ -45,8 +43,6 @@ type Hysteria struct {
option *HysteriaOption option *HysteriaOption
client *core.Client client *core.Client
closeCh chan struct{} // for test
} }
func (h *Hysteria) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.Conn, error) { func (h *Hysteria) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.Conn, error) {
@ -55,7 +51,7 @@ func (h *Hysteria) DialContext(ctx context.Context, metadata *C.Metadata, opts .
return nil, err return nil, err
} }
return NewConn(CN.NewRefConn(tcpConn, h), h), nil return NewConn(tcpConn, h), nil
} }
func (h *Hysteria) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.PacketConn, error) { func (h *Hysteria) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.PacketConn, error) {
@ -63,7 +59,7 @@ func (h *Hysteria) ListenPacketContext(ctx context.Context, metadata *C.Metadata
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newPacketConn(CN.NewRefPacketConn(&hyPacketConn{udpConn}, h), h), nil return newPacketConn(&hyPacketConn{udpConn}, h), nil
} }
func (h *Hysteria) genHdc(ctx context.Context, opts ...dialer.Option) utils.PacketDialer { func (h *Hysteria) genHdc(ctx context.Context, opts ...dialer.Option) utils.PacketDialer {
@ -239,18 +235,16 @@ func NewHysteria(option HysteriaOption) (*Hysteria, error) {
option: &option, option: &option,
client: client, client: client,
} }
runtime.SetFinalizer(outbound, closeHysteria)
return outbound, nil return outbound, nil
} }
func closeHysteria(h *Hysteria) { // Close implements C.ProxyAdapter
func (h *Hysteria) Close() error {
if h.client != nil { if h.client != nil {
_ = h.client.Close() return h.client.Close()
}
if h.closeCh != nil {
close(h.closeCh)
} }
return nil
} }
type hyPacketConn struct { type hyPacketConn struct {

View File

@ -6,7 +6,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"runtime"
"strconv" "strconv"
"time" "time"
@ -39,8 +38,6 @@ type Hysteria2 struct {
option *Hysteria2Option option *Hysteria2Option
client *hysteria2.Client client *hysteria2.Client
dialer proxydialer.SingDialer dialer proxydialer.SingDialer
closeCh chan struct{} // for test
} }
type Hysteria2Option struct { type Hysteria2Option struct {
@ -78,7 +75,7 @@ func (h *Hysteria2) DialContext(ctx context.Context, metadata *C.Metadata, opts
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewConn(CN.NewRefConn(c, h), h), nil return NewConn(c, h), nil
} }
func (h *Hysteria2) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) { func (h *Hysteria2) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) {
@ -91,16 +88,15 @@ func (h *Hysteria2) ListenPacketContext(ctx context.Context, metadata *C.Metadat
if pc == nil { if pc == nil {
return nil, errors.New("packetConn is nil") return nil, errors.New("packetConn is nil")
} }
return newPacketConn(CN.NewRefPacketConn(CN.NewThreadSafePacketConn(pc), h), h), nil return newPacketConn(CN.NewThreadSafePacketConn(pc), h), nil
} }
func closeHysteria2(h *Hysteria2) { // Close implements C.ProxyAdapter
func (h *Hysteria2) Close() error {
if h.client != nil { if h.client != nil {
_ = h.client.CloseWithError(errors.New("proxy removed")) return h.client.CloseWithError(errors.New("proxy removed"))
}
if h.closeCh != nil {
close(h.closeCh)
} }
return nil
} }
// ProxyInfo implements C.ProxyAdapter // ProxyInfo implements C.ProxyAdapter
@ -226,7 +222,6 @@ func NewHysteria2(option Hysteria2Option) (*Hysteria2, error) {
client: client, client: client,
dialer: singDialer, dialer: singDialer,
} }
runtime.SetFinalizer(outbound, closeHysteria2)
return outbound, nil return outbound, nil
} }

View File

@ -1,38 +0,0 @@
package outbound
import (
"context"
"runtime"
"testing"
"time"
)
func TestHysteria2GC(t *testing.T) {
option := Hysteria2Option{}
option.Server = "127.0.0.1"
option.Ports = "200,204,401-429,501-503"
option.HopInterval = 30
option.Password = "password"
option.Obfs = "salamander"
option.ObfsPassword = "password"
option.SNI = "example.com"
option.ALPN = []string{"h3"}
hy, err := NewHysteria2(option)
if err != nil {
t.Error(err)
return
}
closeCh := make(chan struct{})
hy.closeCh = closeCh
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
hy = nil
runtime.GC()
select {
case <-closeCh:
return
case <-ctx.Done():
t.Error("timeout not GC")
}
}

View File

@ -1,39 +0,0 @@
package outbound
import (
"context"
"runtime"
"testing"
"time"
)
func TestHysteriaGC(t *testing.T) {
option := HysteriaOption{}
option.Server = "127.0.0.1"
option.Ports = "200,204,401-429,501-503"
option.Protocol = "udp"
option.Up = "1Mbps"
option.Down = "1Mbps"
option.HopInterval = 30
option.Obfs = "salamander"
option.SNI = "example.com"
option.ALPN = []string{"h3"}
hy, err := NewHysteria(option)
if err != nil {
t.Error(err)
return
}
closeCh := make(chan struct{})
hy.closeCh = closeCh
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
hy = nil
runtime.GC()
select {
case <-closeCh:
return
case <-ctx.Done():
t.Error("timeout not GC")
}
}

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"runtime"
"strconv" "strconv"
"sync" "sync"
@ -62,7 +61,7 @@ func (m *Mieru) ListenPacketContext(ctx context.Context, metadata *C.Metadata, o
if err != nil { if err != nil {
return nil, fmt.Errorf("dial to %s failed: %w", metadata.UDPAddr(), err) return nil, fmt.Errorf("dial to %s failed: %w", metadata.UDPAddr(), err)
} }
return newPacketConn(CN.NewRefPacketConn(CN.NewThreadSafePacketConn(mierucommon.NewUDPAssociateWrapper(mierucommon.NewPacketOverStreamTunnel(c))), m), m), nil return newPacketConn(CN.NewThreadSafePacketConn(mierucommon.NewUDPAssociateWrapper(mierucommon.NewPacketOverStreamTunnel(c))), m), nil
} }
// SupportUOT implements C.ProxyAdapter // SupportUOT implements C.ProxyAdapter
@ -141,16 +140,17 @@ func NewMieru(option MieruOption) (*Mieru, error) {
option: &option, option: &option,
client: c, client: c,
} }
runtime.SetFinalizer(outbound, closeMieru)
return outbound, nil return outbound, nil
} }
func closeMieru(m *Mieru) { // Close implements C.ProxyAdapter
func (m *Mieru) Close() error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
if m.client != nil && m.client.IsRunning() { if m.client != nil && m.client.IsRunning() {
m.client.Stop() return m.client.Stop()
} }
return nil
} }
func metadataToMieruNetAddrSpec(metadata *C.Metadata) mierumodel.NetAddrSpec { func metadataToMieruNetAddrSpec(metadata *C.Metadata) mierumodel.NetAddrSpec {

View File

@ -3,7 +3,6 @@ package outbound
import ( import (
"context" "context"
"errors" "errors"
"runtime"
CN "github.com/metacubex/mihomo/common/net" CN "github.com/metacubex/mihomo/common/net"
"github.com/metacubex/mihomo/component/dialer" "github.com/metacubex/mihomo/component/dialer"
@ -18,8 +17,7 @@ import (
) )
type SingMux struct { type SingMux struct {
C.ProxyAdapter ProxyAdapter
base ProxyBase
client *mux.Client client *mux.Client
dialer proxydialer.SingDialer dialer proxydialer.SingDialer
onlyTcp bool onlyTcp bool
@ -43,25 +41,21 @@ type BrutalOption struct {
Down string `proxy:"down,omitempty"` Down string `proxy:"down,omitempty"`
} }
type ProxyBase interface {
DialOptions(opts ...dialer.Option) []dialer.Option
}
func (s *SingMux) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.Conn, err error) { func (s *SingMux) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.Conn, err error) {
options := s.base.DialOptions(opts...) options := s.ProxyAdapter.DialOptions(opts...)
s.dialer.SetDialer(dialer.NewDialer(options...)) s.dialer.SetDialer(dialer.NewDialer(options...))
c, err := s.client.DialContext(ctx, "tcp", M.ParseSocksaddrHostPort(metadata.String(), metadata.DstPort)) c, err := s.client.DialContext(ctx, "tcp", M.ParseSocksaddrHostPort(metadata.String(), metadata.DstPort))
if err != nil { if err != nil {
return nil, err return nil, err
} }
return NewConn(CN.NewRefConn(c, s), s.ProxyAdapter), err return NewConn(c, s), err
} }
func (s *SingMux) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) { func (s *SingMux) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) {
if s.onlyTcp { if s.onlyTcp {
return s.ProxyAdapter.ListenPacketContext(ctx, metadata, opts...) return s.ProxyAdapter.ListenPacketContext(ctx, metadata, opts...)
} }
options := s.base.DialOptions(opts...) options := s.ProxyAdapter.DialOptions(opts...)
s.dialer.SetDialer(dialer.NewDialer(options...)) s.dialer.SetDialer(dialer.NewDialer(options...))
// sing-mux use stream-oriented udp with a special address, so we need a net.UDPAddr // sing-mux use stream-oriented udp with a special address, so we need a net.UDPAddr
@ -80,7 +74,7 @@ func (s *SingMux) ListenPacketContext(ctx context.Context, metadata *C.Metadata,
if pc == nil { if pc == nil {
return nil, E.New("packetConn is nil") return nil, E.New("packetConn is nil")
} }
return newPacketConn(CN.NewRefPacketConn(CN.NewThreadSafePacketConn(pc), s), s.ProxyAdapter), nil return newPacketConn(CN.NewThreadSafePacketConn(pc), s), nil
} }
func (s *SingMux) SupportUDP() bool { func (s *SingMux) SupportUDP() bool {
@ -103,11 +97,15 @@ func (s *SingMux) ProxyInfo() C.ProxyInfo {
return info return info
} }
func closeSingMux(s *SingMux) { // Close implements C.ProxyAdapter
_ = s.client.Close() func (s *SingMux) Close() error {
if s.client != nil {
_ = s.client.Close()
}
return s.ProxyAdapter.Close()
} }
func NewSingMux(option SingMuxOption, proxy C.ProxyAdapter, base ProxyBase) (C.ProxyAdapter, error) { func NewSingMux(option SingMuxOption, proxy ProxyAdapter) (ProxyAdapter, error) {
// TODO // TODO
// "TCP Brutal is only supported on Linux-based systems" // "TCP Brutal is only supported on Linux-based systems"
@ -131,11 +129,9 @@ func NewSingMux(option SingMuxOption, proxy C.ProxyAdapter, base ProxyBase) (C.P
} }
outbound := &SingMux{ outbound := &SingMux{
ProxyAdapter: proxy, ProxyAdapter: proxy,
base: base,
client: client, client: client,
dialer: singDialer, dialer: singDialer,
onlyTcp: option.OnlyTcp, onlyTcp: option.OnlyTcp,
} }
runtime.SetFinalizer(outbound, closeSingMux)
return outbound, nil return outbound, nil
} }

View File

@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"net" "net"
"os" "os"
"runtime"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -25,7 +24,10 @@ type Ssh struct {
*Base *Base
option *SshOption option *SshOption
client *sshClient // using a standalone struct to avoid its inner loop invalidate the Finalizer
config *ssh.ClientConfig
client *ssh.Client
cMutex sync.Mutex
} }
type SshOption struct { type SshOption struct {
@ -49,7 +51,7 @@ func (s *Ssh) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dia
return nil, err return nil, err
} }
} }
client, err := s.client.connect(ctx, cDialer, s.addr) client, err := s.connect(ctx, cDialer, s.addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -58,16 +60,10 @@ func (s *Ssh) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dia
return nil, err return nil, err
} }
return NewConn(N.NewRefConn(c, s), s), nil return NewConn(c, s), nil
} }
type sshClient struct { func (s *Ssh) connect(ctx context.Context, cDialer C.Dialer, addr string) (client *ssh.Client, err error) {
config *ssh.ClientConfig
client *ssh.Client
cMutex sync.Mutex
}
func (s *sshClient) connect(ctx context.Context, cDialer C.Dialer, addr string) (client *ssh.Client, err error) {
s.cMutex.Lock() s.cMutex.Lock()
defer s.cMutex.Unlock() defer s.cMutex.Unlock()
if s.client != nil { if s.client != nil {
@ -108,7 +104,15 @@ func (s *sshClient) connect(ctx context.Context, cDialer C.Dialer, addr string)
return client, nil return client, nil
} }
func (s *sshClient) Close() error { // ProxyInfo implements C.ProxyAdapter
func (s *Ssh) ProxyInfo() C.ProxyInfo {
info := s.Base.ProxyInfo()
info.DialerProxy = s.option.DialerProxy
return info
}
// Close implements C.ProxyAdapter
func (s *Ssh) Close() error {
s.cMutex.Lock() s.cMutex.Lock()
defer s.cMutex.Unlock() defer s.cMutex.Unlock()
if s.client != nil { if s.client != nil {
@ -117,17 +121,6 @@ func (s *sshClient) Close() error {
return nil return nil
} }
func closeSsh(s *Ssh) {
_ = s.client.Close()
}
// ProxyInfo implements C.ProxyAdapter
func (s *Ssh) ProxyInfo() C.ProxyInfo {
info := s.Base.ProxyInfo()
info.DialerProxy = s.option.DialerProxy
return info
}
func NewSsh(option SshOption) (*Ssh, error) { func NewSsh(option SshOption) (*Ssh, error) {
addr := net.JoinHostPort(option.Server, strconv.Itoa(option.Port)) addr := net.JoinHostPort(option.Server, strconv.Itoa(option.Port))
@ -204,11 +197,8 @@ func NewSsh(option SshOption) (*Ssh, error) {
prefer: C.NewDNSPrefer(option.IPVersion), prefer: C.NewDNSPrefer(option.IPVersion),
}, },
option: &option, option: &option,
client: &sshClient{ config: &config,
config: &config,
},
} }
runtime.SetFinalizer(outbound, closeSsh)
return outbound, nil return outbound, nil
} }

View File

@ -251,6 +251,14 @@ func (t *Trojan) ProxyInfo() C.ProxyInfo {
return info return info
} }
// Close implements C.ProxyAdapter
func (t *Trojan) Close() error {
if t.transport != nil {
return t.transport.Close()
}
return nil
}
func NewTrojan(option TrojanOption) (*Trojan, error) { func NewTrojan(option TrojanOption) (*Trojan, error) {
addr := net.JoinHostPort(option.Server, strconv.Itoa(option.Port)) addr := net.JoinHostPort(option.Server, strconv.Itoa(option.Port))

View File

@ -385,6 +385,14 @@ func (v *Vless) ProxyInfo() C.ProxyInfo {
return info return info
} }
// Close implements C.ProxyAdapter
func (v *Vless) Close() error {
if v.transport != nil {
return v.transport.Close()
}
return nil
}
func parseVlessAddr(metadata *C.Metadata, xudp bool) *vless.DstAddr { func parseVlessAddr(metadata *C.Metadata, xudp bool) *vless.DstAddr {
var addrType byte var addrType byte
var addr []byte var addr []byte

View File

@ -395,6 +395,14 @@ func (v *Vmess) ProxyInfo() C.ProxyInfo {
return info return info
} }
// Close implements C.ProxyAdapter
func (v *Vmess) Close() error {
if v.transport != nil {
return v.transport.Close()
}
return nil
}
// ListenPacketOnStreamConn implements C.ProxyAdapter // ListenPacketOnStreamConn implements C.ProxyAdapter
func (v *Vmess) ListenPacketOnStreamConn(ctx context.Context, c net.Conn, metadata *C.Metadata) (_ C.PacketConn, err error) { func (v *Vmess) ListenPacketOnStreamConn(ctx context.Context, c net.Conn, metadata *C.Metadata) (_ C.PacketConn, err error) {
// vmess use stream-oriented udp with a special address, so we need a net.UDPAddr // vmess use stream-oriented udp with a special address, so we need a net.UDPAddr

View File

@ -8,14 +8,12 @@ import (
"fmt" "fmt"
"net" "net"
"net/netip" "net/netip"
"runtime"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/metacubex/mihomo/common/atomic" "github.com/metacubex/mihomo/common/atomic"
CN "github.com/metacubex/mihomo/common/net"
"github.com/metacubex/mihomo/component/dialer" "github.com/metacubex/mihomo/component/dialer"
"github.com/metacubex/mihomo/component/proxydialer" "github.com/metacubex/mihomo/component/proxydialer"
"github.com/metacubex/mihomo/component/resolver" "github.com/metacubex/mihomo/component/resolver"
@ -45,7 +43,6 @@ type WireGuard struct {
tunDevice wireguard.Device tunDevice wireguard.Device
dialer proxydialer.SingDialer dialer proxydialer.SingDialer
resolver resolver.Resolver resolver resolver.Resolver
refP *refProxyAdapter
initOk atomic.Bool initOk atomic.Bool
initMutex sync.Mutex initMutex sync.Mutex
@ -57,8 +54,6 @@ type WireGuard struct {
serverAddrMap map[M.Socksaddr]netip.AddrPort serverAddrMap map[M.Socksaddr]netip.AddrPort
serverAddrTime atomic.TypedValue[time.Time] serverAddrTime atomic.TypedValue[time.Time]
serverAddrMutex sync.Mutex serverAddrMutex sync.Mutex
closeCh chan struct{} // for test
} }
type WireGuardOption struct { type WireGuardOption struct {
@ -173,7 +168,6 @@ func NewWireGuard(option WireGuardOption) (*WireGuard, error) {
}, },
dialer: proxydialer.NewSlowDownSingDialer(proxydialer.NewByNameSingDialer(option.DialerProxy, dialer.NewDialer()), slowdown.New()), dialer: proxydialer.NewSlowDownSingDialer(proxydialer.NewByNameSingDialer(option.DialerProxy, dialer.NewDialer()), slowdown.New()),
} }
runtime.SetFinalizer(outbound, closeWireGuard)
var reserved [3]uint8 var reserved [3]uint8
if len(option.Reserved) > 0 { if len(option.Reserved) > 0 {
@ -286,15 +280,13 @@ func NewWireGuard(option WireGuardOption) (*WireGuard, error) {
} }
} }
refP := &refProxyAdapter{}
outbound.refP = refP
if option.RemoteDnsResolve && len(option.Dns) > 0 { if option.RemoteDnsResolve && len(option.Dns) > 0 {
nss, err := dns.ParseNameServer(option.Dns) nss, err := dns.ParseNameServer(option.Dns)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for i := range nss { for i := range nss {
nss[i].ProxyAdapter = refP nss[i].ProxyAdapter = outbound
} }
outbound.resolver = dns.NewResolver(dns.Config{ outbound.resolver = dns.NewResolver(dns.Config{
Main: nss, Main: nss,
@ -488,13 +480,12 @@ func (w *WireGuard) genIpcConf(ctx context.Context, updateOnly bool) (string, er
return ipcConf, nil return ipcConf, nil
} }
func closeWireGuard(w *WireGuard) { // Close implements C.ProxyAdapter
func (w *WireGuard) Close() error {
if w.device != nil { if w.device != nil {
w.device.Close() w.device.Close()
} }
if w.closeCh != nil { return nil
close(w.closeCh)
}
} }
func (w *WireGuard) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.Conn, err error) { func (w *WireGuard) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.Conn, err error) {
@ -507,8 +498,6 @@ func (w *WireGuard) DialContext(ctx context.Context, metadata *C.Metadata, opts
if !metadata.Resolved() || w.resolver != nil { if !metadata.Resolved() || w.resolver != nil {
r := resolver.DefaultResolver r := resolver.DefaultResolver
if w.resolver != nil { if w.resolver != nil {
w.refP.SetProxyAdapter(w)
defer w.refP.ClearProxyAdapter()
r = w.resolver r = w.resolver
} }
options = append(options, dialer.WithResolver(r)) options = append(options, dialer.WithResolver(r))
@ -523,7 +512,7 @@ func (w *WireGuard) DialContext(ctx context.Context, metadata *C.Metadata, opts
if conn == nil { if conn == nil {
return nil, E.New("conn is nil") return nil, E.New("conn is nil")
} }
return NewConn(CN.NewRefConn(conn, w), w), nil return NewConn(conn, w), nil
} }
func (w *WireGuard) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) { func (w *WireGuard) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) {
@ -536,8 +525,6 @@ func (w *WireGuard) ListenPacketContext(ctx context.Context, metadata *C.Metadat
if (!metadata.Resolved() || w.resolver != nil) && metadata.Host != "" { if (!metadata.Resolved() || w.resolver != nil) && metadata.Host != "" {
r := resolver.DefaultResolver r := resolver.DefaultResolver
if w.resolver != nil { if w.resolver != nil {
w.refP.SetProxyAdapter(w)
defer w.refP.ClearProxyAdapter()
r = w.resolver r = w.resolver
} }
ip, err := resolver.ResolveIPWithResolver(ctx, metadata.Host, r) ip, err := resolver.ResolveIPWithResolver(ctx, metadata.Host, r)
@ -553,139 +540,10 @@ func (w *WireGuard) ListenPacketContext(ctx context.Context, metadata *C.Metadat
if pc == nil { if pc == nil {
return nil, E.New("packetConn is nil") return nil, E.New("packetConn is nil")
} }
return newPacketConn(CN.NewRefPacketConn(pc, w), w), nil return newPacketConn(pc, w), nil
} }
// IsL3Protocol implements C.ProxyAdapter // IsL3Protocol implements C.ProxyAdapter
func (w *WireGuard) IsL3Protocol(metadata *C.Metadata) bool { func (w *WireGuard) IsL3Protocol(metadata *C.Metadata) bool {
return true return true
} }
type refProxyAdapter struct {
proxyAdapter C.ProxyAdapter
count int
mutex sync.Mutex
}
func (r *refProxyAdapter) SetProxyAdapter(proxyAdapter C.ProxyAdapter) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.proxyAdapter = proxyAdapter
r.count++
}
func (r *refProxyAdapter) ClearProxyAdapter() {
r.mutex.Lock()
defer r.mutex.Unlock()
r.count--
if r.count == 0 {
r.proxyAdapter = nil
}
}
func (r *refProxyAdapter) Name() string {
if r.proxyAdapter != nil {
return r.proxyAdapter.Name()
}
return ""
}
func (r *refProxyAdapter) Type() C.AdapterType {
if r.proxyAdapter != nil {
return r.proxyAdapter.Type()
}
return C.AdapterType(0)
}
func (r *refProxyAdapter) Addr() string {
if r.proxyAdapter != nil {
return r.proxyAdapter.Addr()
}
return ""
}
func (r *refProxyAdapter) SupportUDP() bool {
if r.proxyAdapter != nil {
return r.proxyAdapter.SupportUDP()
}
return false
}
func (r *refProxyAdapter) ProxyInfo() C.ProxyInfo {
if r.proxyAdapter != nil {
return r.proxyAdapter.ProxyInfo()
}
return C.ProxyInfo{}
}
func (r *refProxyAdapter) MarshalJSON() ([]byte, error) {
if r.proxyAdapter != nil {
return r.proxyAdapter.MarshalJSON()
}
return nil, C.ErrNotSupport
}
func (r *refProxyAdapter) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.Metadata) (net.Conn, error) {
if r.proxyAdapter != nil {
return r.proxyAdapter.StreamConnContext(ctx, c, metadata)
}
return nil, C.ErrNotSupport
}
func (r *refProxyAdapter) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.Conn, error) {
if r.proxyAdapter != nil {
return r.proxyAdapter.DialContext(ctx, metadata, opts...)
}
return nil, C.ErrNotSupport
}
func (r *refProxyAdapter) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.PacketConn, error) {
if r.proxyAdapter != nil {
return r.proxyAdapter.ListenPacketContext(ctx, metadata, opts...)
}
return nil, C.ErrNotSupport
}
func (r *refProxyAdapter) SupportUOT() bool {
if r.proxyAdapter != nil {
return r.proxyAdapter.SupportUOT()
}
return false
}
func (r *refProxyAdapter) SupportWithDialer() C.NetWork {
if r.proxyAdapter != nil {
return r.proxyAdapter.SupportWithDialer()
}
return C.InvalidNet
}
func (r *refProxyAdapter) DialContextWithDialer(ctx context.Context, dialer C.Dialer, metadata *C.Metadata) (C.Conn, error) {
if r.proxyAdapter != nil {
return r.proxyAdapter.DialContextWithDialer(ctx, dialer, metadata)
}
return nil, C.ErrNotSupport
}
func (r *refProxyAdapter) ListenPacketWithDialer(ctx context.Context, dialer C.Dialer, metadata *C.Metadata) (C.PacketConn, error) {
if r.proxyAdapter != nil {
return r.proxyAdapter.ListenPacketWithDialer(ctx, dialer, metadata)
}
return nil, C.ErrNotSupport
}
func (r *refProxyAdapter) IsL3Protocol(metadata *C.Metadata) bool {
if r.proxyAdapter != nil {
return r.proxyAdapter.IsL3Protocol(metadata)
}
return false
}
func (r *refProxyAdapter) Unwrap(metadata *C.Metadata, touch bool) C.Proxy {
if r.proxyAdapter != nil {
return r.proxyAdapter.Unwrap(metadata, touch)
}
return nil
}
var _ C.ProxyAdapter = (*refProxyAdapter)(nil)

View File

@ -1,45 +0,0 @@
//go:build with_gvisor
package outbound
import (
"context"
"runtime"
"testing"
"time"
)
func TestWireGuardGC(t *testing.T) {
option := WireGuardOption{}
option.Server = "162.159.192.1"
option.Port = 2408
option.PrivateKey = "iOx7749AdqH3IqluG7+0YbGKd0m1mcEXAfGRzpy9rG8="
option.PublicKey = "bmXOC+F1FxEMF9dyiK2H5/1SUtzH0JuVo51h2wPfgyo="
option.Ip = "172.16.0.2"
option.Ipv6 = "2606:4700:110:8d29:be92:3a6a:f4:c437"
option.Reserved = []uint8{51, 69, 125}
wg, err := NewWireGuard(option)
if err != nil {
t.Error(err)
}
closeCh := make(chan struct{})
wg.closeCh = closeCh
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
err = wg.init(ctx)
if err != nil {
t.Error(err)
return
}
// must do a small sleep before test GC
// because it maybe deadlocks if w.device.Close call too fast after w.device.Start
time.Sleep(10 * time.Millisecond)
wg = nil
runtime.GC()
select {
case <-closeCh:
return
case <-ctx.Done():
t.Error("timeout not GC")
}
}

View File

@ -3,10 +3,9 @@ package adapter
import ( import (
"fmt" "fmt"
tlsC "github.com/metacubex/mihomo/component/tls"
"github.com/metacubex/mihomo/adapter/outbound" "github.com/metacubex/mihomo/adapter/outbound"
"github.com/metacubex/mihomo/common/structure" "github.com/metacubex/mihomo/common/structure"
tlsC "github.com/metacubex/mihomo/component/tls"
C "github.com/metacubex/mihomo/constant" C "github.com/metacubex/mihomo/constant"
) )
@ -18,7 +17,7 @@ func ParseProxy(mapping map[string]any) (C.Proxy, error) {
} }
var ( var (
proxy C.ProxyAdapter proxy outbound.ProxyAdapter
err error err error
) )
switch proxyType { switch proxyType {
@ -170,7 +169,7 @@ func ParseProxy(mapping map[string]any) (C.Proxy, error) {
return nil, err return nil, err
} }
if muxOption.Enabled { if muxOption.Enabled {
proxy, err = outbound.NewSingMux(*muxOption, proxy, proxy.(outbound.ProxyBase)) proxy, err = outbound.NewSingMux(*muxOption, proxy)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -149,6 +149,9 @@ type ProxyAdapter interface {
// Unwrap extracts the proxy from a proxy-group. It returns nil when nothing to extract. // Unwrap extracts the proxy from a proxy-group. It returns nil when nothing to extract.
Unwrap(metadata *Metadata, touch bool) Proxy Unwrap(metadata *Metadata, touch bool) Proxy
// Close releasing associated resources
Close() error
} }
type Group interface { type Group interface {