diff --git a/client/internal/connect.go b/client/internal/connect.go
index ac498f719..72e096a80 100644
--- a/client/internal/connect.go
+++ b/client/internal/connect.go
@@ -333,6 +333,10 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
 	c.statusRecorder.MarkSignalConnected()
 
 	relayURLs, token := parseRelayInfo(loginResp)
+	if override, ok := peer.OverrideRelayURLs(); ok {
+		log.Infof("overriding relay URLs from %s: %v", peer.EnvKeyNBHomeRelayServers, override)
+		relayURLs = override
+	}
 	peerConfig := loginResp.GetPeerConfig()
 
 	engineConfig, err := createEngineConfig(myPrivateKey, c.config, peerConfig, logPath)
diff --git a/client/internal/engine.go b/client/internal/engine.go
index 8d7e02bd5..351e4bfe9 100644
--- a/client/internal/engine.go
+++ b/client/internal/engine.go
@@ -944,7 +944,12 @@ func (e *Engine) handleRelayUpdate(update *mgmProto.RelayConfig) error {
 		return fmt.Errorf("update relay token: %w", err)
 	}
 
-	e.relayManager.UpdateServerURLs(update.Urls)
+	urls := update.Urls
+	if override, ok := peer.OverrideRelayURLs(); ok {
+		log.Infof("overriding relay URLs from %s: %v", peer.EnvKeyNBHomeRelayServers, override)
+		urls = override
+	}
+	e.relayManager.UpdateServerURLs(urls)
 
 	// Just in case the agent started with an MGM server where the relay was disabled but was later enabled.
 	// We can ignore all errors because the guard will manage the reconnection retries.
diff --git a/client/internal/peer/env.go b/client/internal/peer/env.go
index b4ba9ad7b..ed6a3af53 100644
--- a/client/internal/peer/env.go
+++ b/client/internal/peer/env.go
@@ -7,7 +7,8 @@ import (
 )
 
 const (
-	EnvKeyNBForceRelay = "NB_FORCE_RELAY"
+	EnvKeyNBForceRelay       = "NB_FORCE_RELAY"
+	EnvKeyNBHomeRelayServers = "NB_HOME_RELAY_SERVERS"
 )
 
 func IsForceRelayed() bool {
@@ -16,3 +17,28 @@ func IsForceRelayed() bool {
 	}
 	return strings.EqualFold(os.Getenv(EnvKeyNBForceRelay), "true")
 }
+
+// OverrideRelayURLs returns the relay server URL list set in
+// NB_HOME_RELAY_SERVERS (comma-separated) and a boolean indicating whether
+// the override is active. When the env var is unset, the boolean is false
+// and the caller should keep the list received from the management server.
+// Intended for lab/debug scenarios where a peer must pin to a specific home
+// relay regardless of what management offers.
+func OverrideRelayURLs() ([]string, bool) {
+	raw := os.Getenv(EnvKeyNBHomeRelayServers)
+	if raw == "" {
+		return nil, false
+	}
+	parts := strings.Split(raw, ",")
+	urls := make([]string, 0, len(parts))
+	for _, p := range parts {
+		p = strings.TrimSpace(p)
+		if p != "" {
+			urls = append(urls, p)
+		}
+	}
+	if len(urls) == 0 {
+		return nil, false
+	}
+	return urls, true
+}
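A quick way to pin down the trim/skip-empty semantics of `OverrideRelayURLs` would be a small test beside env.go. This is a sketch, not part of the change; the `rels://` hosts are placeholders:

```go
package peer

import (
	"os"
	"reflect"
	"testing"
)

// Sketch only: documents the parsing semantics of OverrideRelayURLs.
// The relay URLs below are placeholders, not real endpoints.
func TestOverrideRelayURLsSketch(t *testing.T) {
	// Unset: no override; callers keep the management-provided list.
	os.Unsetenv(EnvKeyNBHomeRelayServers)
	if _, ok := OverrideRelayURLs(); ok {
		t.Fatal("expected no override when the env var is unset")
	}

	// Set: entries are trimmed and empty items (stray commas) are dropped.
	t.Setenv(EnvKeyNBHomeRelayServers, " rels://relay-a.example.com:443 ,, rels://relay-b.example.com:443")
	got, ok := OverrideRelayURLs()
	if !ok {
		t.Fatal("expected the override to be active")
	}
	want := []string{"rels://relay-a.example.com:443", "rels://relay-b.example.com:443"}
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("got %v, want %v", got, want)
	}
}
```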
diff --git a/shared/relay/client/guard.go b/shared/relay/client/guard.go
index f4d3a8cce..d7892d0ce 100644
--- a/shared/relay/client/guard.go
+++ b/shared/relay/client/guard.go
@@ -8,10 +8,7 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
-const (
-	// TODO: make it configurable, the manager should validate all configurable parameters
-	reconnectingTimeout = 60 * time.Second
-)
+const defaultMaxBackoffInterval = 60 * time.Second
 
 // Guard manage the reconnection tries to the Relay server in case of disconnection event.
 type Guard struct {
@@ -19,14 +16,23 @@ type Guard struct {
 	OnNewRelayClient chan *Client
 	OnReconnected    chan struct{}
 	serverPicker     *ServerPicker
+
+	// maxBackoffInterval caps the exponential backoff between reconnect
+	// attempts.
+	maxBackoffInterval time.Duration
 }
 
-// NewGuard creates a new guard for the relay client.
-func NewGuard(sp *ServerPicker) *Guard {
+// NewGuard creates a new guard for the relay client. A non-positive
+// maxBackoffInterval falls back to defaultMaxBackoffInterval.
+func NewGuard(sp *ServerPicker, maxBackoffInterval time.Duration) *Guard {
+	if maxBackoffInterval <= 0 {
+		maxBackoffInterval = defaultMaxBackoffInterval
+	}
 	g := &Guard{
-		OnNewRelayClient: make(chan *Client, 1),
-		OnReconnected:    make(chan struct{}, 1),
-		serverPicker:     sp,
+		OnNewRelayClient:   make(chan *Client, 1),
+		OnReconnected:      make(chan struct{}, 1),
+		serverPicker:       sp,
+		maxBackoffInterval: maxBackoffInterval,
 	}
 	return g
 }
@@ -49,7 +55,7 @@ func (g *Guard) StartReconnectTrys(ctx context.Context, relayClient *Client) {
 	}
 
 	// start a ticker to pick a new server
-	ticker := exponentTicker(ctx)
+	ticker := g.exponentTicker(ctx)
 	defer ticker.Stop()
 
 	for {
@@ -125,11 +131,11 @@ func (g *Guard) notifyReconnected() {
 	}
 }
 
-func exponentTicker(ctx context.Context) *backoff.Ticker {
+func (g *Guard) exponentTicker(ctx context.Context) *backoff.Ticker {
 	bo := backoff.WithContext(&backoff.ExponentialBackOff{
 		InitialInterval: 2 * time.Second,
 		Multiplier:      2,
-		MaxInterval:     reconnectingTimeout,
+		MaxInterval:     g.maxBackoffInterval,
 		Clock:           backoff.SystemClock,
 	}, ctx)
 
diff --git a/shared/relay/client/manager.go b/shared/relay/client/manager.go
index 6220e7f6b..37104bfe7 100644
--- a/shared/relay/client/manager.go
+++ b/shared/relay/client/manager.go
@@ -39,6 +39,15 @@ func NewRelayTrack() *RelayTrack {
 
 type OnServerCloseListener func()
 
+// ManagerOption configures a Manager at construction time.
+type ManagerOption func(*Manager)
+
+// WithMaxBackoffInterval caps the exponential backoff between reconnect
+// attempts to the home relay. A non-positive value keeps the default.
+func WithMaxBackoffInterval(d time.Duration) ManagerOption {
+	return func(m *Manager) { m.maxBackoffInterval = d }
+}
+
 // Manager is a manager for the relay client instances. It establishes one persistent connection to the given relay URL
 // and automatically reconnect to them in case disconnection.
 // The manager also manage temporary relay connection. If a client wants to communicate with a client on a
@@ -64,12 +73,13 @@ type Manager struct {
 	onReconnectedListenerFn func()
 	listenerLock            sync.Mutex
 
-	mtu uint16
+	mtu                uint16
+	maxBackoffInterval time.Duration
 }
 
 // NewManager creates a new manager instance.
 // The serverURL address can be empty. In this case, the manager will not serve.
-func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uint16) *Manager {
+func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uint16, opts ...ManagerOption) *Manager {
 	tokenStore := &relayAuth.TokenStore{}
 
 	m := &Manager{
@@ -86,8 +96,11 @@ func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uin
 		relayClients:            make(map[string]*RelayTrack),
 		onDisconnectedListeners: make(map[string]*list.List),
 	}
+	for _, opt := range opts {
+		opt(m)
+	}
 	m.serverPicker.ServerURLs.Store(serverURLs)
-	m.reconnectGuard = NewGuard(m.serverPicker)
+	m.reconnectGuard = NewGuard(m.serverPicker, m.maxBackoffInterval)
 	return m
 }
 
@@ -290,19 +303,36 @@ func (m *Manager) onServerConnected() {
 	go m.onReconnectedListenerFn()
 }
 
-// onServerDisconnected start to reconnection for home server only
+// onServerDisconnected handles relay disconnect events. For the home server it
+// starts the reconnect guard. For foreign servers it evicts the now-dead client
+// from the cache so the next OpenConn builds a fresh one instead of reusing a
+// closed client.
 func (m *Manager) onServerDisconnected(serverAddress string) {
 	m.relayClientMu.Lock()
-	if serverAddress == m.relayClient.connectionURL {
+	isHome := m.relayClient != nil && serverAddress == m.relayClient.connectionURL
+	if isHome {
 		go func(client *Client) {
 			m.reconnectGuard.StartReconnectTrys(m.ctx, client)
 		}(m.relayClient)
 	}
 	m.relayClientMu.Unlock()
 
+	if !isHome {
+		m.evictForeignRelay(serverAddress)
+	}
+
 	m.notifyOnDisconnectListeners(serverAddress)
 }
 
+func (m *Manager) evictForeignRelay(serverAddress string) {
+	m.relayClientsMutex.Lock()
+	defer m.relayClientsMutex.Unlock()
+	if _, ok := m.relayClients[serverAddress]; ok {
+		delete(m.relayClients, serverAddress)
+		log.Debugf("evicted disconnected foreign relay client: %s", serverAddress)
+	}
+}
+
 func (m *Manager) listenGuardEvent(ctx context.Context) {
 	for {
 		select {
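Existing four-argument `NewManager` call sites keep compiling; the variadic option is opt-in. A minimal construction sketch, assuming the repo's usual module path — the relay URL, peer ID, and MTU below are placeholders:

```go
package main

import (
	"context"
	"log"
	"time"

	relayclient "github.com/netbirdio/netbird/shared/relay/client"
)

func main() {
	// Placeholders throughout: the URL, peer ID, and MTU are illustrative only.
	mgr := relayclient.NewManager(context.Background(),
		[]string{"rels://relay.example.com:443"}, "peer-1", 1280,
		relayclient.WithMaxBackoffInterval(5*time.Second))
	if err := mgr.Serve(); err != nil {
		log.Fatalf("serve relay manager: %v", err)
	}
}
```

The functional-options shape also means the further knobs hinted at by the removed TODO in guard.go can be added later without another signature break.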
diff --git a/shared/relay/client/manager_test.go b/shared/relay/client/manager_test.go
index fb91f7682..5bbcad886 100644
--- a/shared/relay/client/manager_test.go
+++ b/shared/relay/client/manager_test.go
@@ -2,6 +2,7 @@ package client
 
 import (
 	"context"
+	"fmt"
 	"testing"
 	"time"
 
@@ -360,7 +361,8 @@ func TestAutoReconnect(t *testing.T) {
 		t.Fatalf("failed to serve manager: %s", err)
 	}
 
-	clientAlice := NewManager(mCtx, toURL(srvCfg), "alice", iface.DefaultMTU)
+	clientAlice := NewManager(mCtx, toURL(srvCfg), "alice", iface.DefaultMTU,
+		WithMaxBackoffInterval(2*time.Second))
 	err = clientAlice.Serve()
 	if err != nil {
 		t.Fatalf("failed to serve manager: %s", err)
@@ -384,7 +386,9 @@ func TestAutoReconnect(t *testing.T) {
 	}
 
 	log.Infof("waiting for reconnection")
-	time.Sleep(reconnectingTimeout + 1*time.Second)
+	if err := waitForReady(ctx, clientAlice, 15*time.Second); err != nil {
+		t.Fatalf("manager did not reconnect: %s", err)
+	}
 
 	log.Infof("reopent the connection")
 	_, err = clientAlice.OpenConn(ctx, ra, "bob")
@@ -393,6 +397,21 @@ func TestAutoReconnect(t *testing.T) {
 	}
 }
 
+func waitForReady(ctx context.Context, m *Manager, timeout time.Duration) error {
+	deadline := time.Now().Add(timeout)
+	for time.Now().Before(deadline) {
+		if m.Ready() {
+			return nil
+		}
+		select {
+		case <-time.After(100 * time.Millisecond):
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+	return fmt.Errorf("manager not ready within %s", timeout)
+}
+
 func TestNotifierDoubleAdd(t *testing.T) {
 	ctx := context.Background()
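The new `waitForReady` helper covers the home-relay reconnect path, but the foreign-relay eviction has no test here. If one is wanted, a minimal in-package sketch — only `evictForeignRelay`, `relayClients`, and `NewRelayTrack` come from this diff; the address and test name are placeholders:

```go
package client

import "testing"

// Sketch only: verifies a disconnected foreign relay is dropped from the
// cache so the next OpenConn dials a fresh client. The address is fake.
func TestForeignRelayEvictionSketch(t *testing.T) {
	m := &Manager{relayClients: make(map[string]*RelayTrack)}
	addr := "rels://foreign.example.com:443"
	m.relayClients[addr] = NewRelayTrack()

	m.evictForeignRelay(addr)

	if _, ok := m.relayClients[addr]; ok {
		t.Fatalf("expected %s to be evicted from the relay client cache", addr)
	}
}
```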