health,ipn/ipnlocal: introduce eventbus in heath.Tracker (#17085)

The Tracker was using direct callbacks to ipnlocal. This PR moves those
to be triggered via the eventbus.

Additionally, the eventbus is now closed on exit from tailscaled
explicitly, and health is now a SubSystem in tsd.

Updates #15160

Signed-off-by: Claus Lensbøl <claus@tailscale.com>
This commit is contained in:
Claus Lensbøl
2025-09-16 11:25:29 -04:00
committed by GitHub
parent 4cca9f7c67
commit 2015ce4081
37 changed files with 404 additions and 245 deletions
+46
View File
@@ -25,6 +25,7 @@ import (
"tailscale.com/tstime"
"tailscale.com/types/opt"
"tailscale.com/util/cibuild"
"tailscale.com/util/eventbus"
"tailscale.com/util/mak"
"tailscale.com/util/multierr"
"tailscale.com/util/set"
@@ -76,6 +77,9 @@ type Tracker struct {
testClock tstime.Clock // nil means use time.Now / tstime.StdClock{}
eventClient *eventbus.Client
changePub *eventbus.Publisher[Change]
// mu guards everything that follows.
mu sync.Mutex
@@ -119,6 +123,20 @@ type Tracker struct {
metricHealthMessage *metrics.MultiLabelMap[metricHealthMessageLabel]
}
// NewTracker contructs a new [Tracker] and attaches the given eventbus.
// NewTracker will panic is no eventbus is given.
func NewTracker(bus *eventbus.Bus) *Tracker {
if bus == nil {
panic("no eventbus set")
}
cli := bus.Client("health.Tracker")
return &Tracker{
eventClient: cli,
changePub: eventbus.Publish[Change](cli),
}
}
func (t *Tracker) now() time.Time {
if t.testClock != nil {
return t.testClock.Now()
@@ -418,6 +436,28 @@ func (t *Tracker) setUnhealthyLocked(w *Warnable, args Args) {
Warnable: w,
UnhealthyState: w.unhealthyState(ws),
}
// Publish the change to the event bus. If the change is already visible
// now, publish it immediately; otherwise queue a timer to publish it at
// a future time when it becomes visible.
if w.IsVisible(ws, t.now) {
t.changePub.Publish(change)
} else {
visibleIn := w.TimeToVisible - t.now().Sub(brokenSince)
tc := t.clock().AfterFunc(visibleIn, func() {
t.mu.Lock()
defer t.mu.Unlock()
// Check if the Warnable is still unhealthy, as it could have become healthy between the time
// the timer was set for and the time it was executed.
if t.warnableVal[w] != nil {
t.changePub.Publish(change)
delete(t.pendingVisibleTimers, w)
}
})
mak.Set(&t.pendingVisibleTimers, w, tc)
}
// Direct callbacks
// TODO(cmol): Remove once all watchers have been moved to events
for _, cb := range t.watchers {
// If the Warnable has been unhealthy for more than its TimeToVisible, the callback should be
// executed immediately. Otherwise, the callback should be enqueued to run once the Warnable
@@ -473,7 +513,9 @@ func (t *Tracker) setHealthyLocked(w *Warnable) {
WarnableChanged: true,
Warnable: w,
}
t.changePub.Publish(change)
for _, cb := range t.watchers {
// TODO(cmol): Remove once all watchers have been moved to events
cb(change)
}
}
@@ -484,7 +526,11 @@ func (t *Tracker) notifyWatchersControlChangedLocked() {
change := Change{
ControlHealthChanged: true,
}
if t.changePub != nil {
t.changePub.Publish(change)
}
for _, cb := range t.watchers {
// TODO(cmol): Remove once all watchers have been moved to events
cb(change)
}
}
+216 -111
View File
@@ -18,12 +18,34 @@ import (
"tailscale.com/tstest"
"tailscale.com/tstime"
"tailscale.com/types/opt"
"tailscale.com/util/eventbus"
"tailscale.com/util/eventbus/eventbustest"
"tailscale.com/util/usermetric"
"tailscale.com/version"
)
func wantChange(c Change) func(c Change) (bool, error) {
return func(cEv Change) (bool, error) {
if cEv.ControlHealthChanged != c.ControlHealthChanged {
return false, fmt.Errorf("expected ControlHealthChanged %t, got %t", c.ControlHealthChanged, cEv.ControlHealthChanged)
}
if cEv.WarnableChanged != c.WarnableChanged {
return false, fmt.Errorf("expected WarnableChanged %t, got %t", c.WarnableChanged, cEv.WarnableChanged)
}
if c.Warnable != nil && (cEv.Warnable == nil || cEv.Warnable != c.Warnable) {
return false, fmt.Errorf("expected Warnable %+v, got %+v", c.Warnable, cEv.Warnable)
}
if c.UnhealthyState != nil {
panic("comparison of UnhealthyState is not yet supported")
}
return true, nil
}
}
func TestAppendWarnableDebugFlags(t *testing.T) {
var tr Tracker
tr := NewTracker(eventbustest.NewBus(t))
for i := range 10 {
w := Register(&Warnable{
@@ -68,7 +90,9 @@ func TestNilMethodsDontCrash(t *testing.T) {
}
func TestSetUnhealthyWithDuplicateThenHealthyAgain(t *testing.T) {
ht := Tracker{}
bus := eventbustest.NewBus(t)
watcher := eventbustest.NewWatcher(t, bus)
ht := NewTracker(bus)
if len(ht.Strings()) != 0 {
t.Fatalf("before first insertion, len(newTracker.Strings) = %d; want = 0", len(ht.Strings()))
}
@@ -92,10 +116,20 @@ func TestSetUnhealthyWithDuplicateThenHealthyAgain(t *testing.T) {
if !reflect.DeepEqual(ht.Strings(), want) {
t.Fatalf("after setting the healthy, newTracker.Strings() = %v; want = %v", ht.Strings(), want)
}
if err := eventbustest.ExpectExactly(watcher,
wantChange(Change{WarnableChanged: true, Warnable: testWarnable}),
wantChange(Change{WarnableChanged: true, Warnable: testWarnable}),
wantChange(Change{WarnableChanged: true, Warnable: testWarnable}),
); err != nil {
t.Fatalf("expected events, got %q", err)
}
}
func TestRemoveAllWarnings(t *testing.T) {
ht := Tracker{}
bus := eventbustest.NewBus(t)
watcher := eventbustest.NewWatcher(t, bus)
ht := NewTracker(bus)
if len(ht.Strings()) != 0 {
t.Fatalf("before first insertion, len(newTracker.Strings) = %d; want = 0", len(ht.Strings()))
}
@@ -109,67 +143,105 @@ func TestRemoveAllWarnings(t *testing.T) {
if len(ht.Strings()) != 0 {
t.Fatalf("after RemoveAll, len(newTracker.Strings) = %d; want = 0", len(ht.Strings()))
}
if err := eventbustest.ExpectExactly(watcher,
wantChange(Change{WarnableChanged: true, Warnable: testWarnable}),
wantChange(Change{WarnableChanged: true, Warnable: testWarnable}),
); err != nil {
t.Fatalf("expected events, got %q", err)
}
}
// TestWatcher tests that a registered watcher function gets called with the correct
// Warnable and non-nil/nil UnhealthyState upon setting a Warnable to unhealthy/healthy.
func TestWatcher(t *testing.T) {
ht := Tracker{}
wantText := "Hello world"
becameUnhealthy := make(chan struct{})
becameHealthy := make(chan struct{})
tests := []struct {
name string
preFunc func(t *testing.T, ht *Tracker, bus *eventbus.Bus, fn func(Change))
}{
{
name: "with-callbacks",
preFunc: func(t *testing.T, tht *Tracker, _ *eventbus.Bus, fn func(c Change)) {
t.Cleanup(tht.RegisterWatcher(fn))
if len(tht.watchers) != 1 {
t.Fatalf("after RegisterWatcher, len(newTracker.watchers) = %d; want = 1", len(tht.watchers))
}
},
},
{
name: "with-eventbus",
preFunc: func(_ *testing.T, _ *Tracker, bus *eventbus.Bus, fn func(c Change)) {
client := bus.Client("healthwatchertestclient")
sub := eventbus.Subscribe[Change](client)
go func() {
for {
select {
case <-sub.Done():
return
case change := <-sub.Events():
fn(change)
}
}
}()
},
},
}
watcherFunc := func(c Change) {
w := c.Warnable
us := c.UnhealthyState
if w != testWarnable {
t.Fatalf("watcherFunc was called, but with an unexpected Warnable: %v, want: %v", w, testWarnable)
}
for _, tt := range tests {
t.Run(tt.name, func(*testing.T) {
bus := eventbustest.NewBus(t)
ht := NewTracker(bus)
wantText := "Hello world"
becameUnhealthy := make(chan struct{})
becameHealthy := make(chan struct{})
if us != nil {
if us.Text != wantText {
t.Fatalf("unexpected us.Text: %s, want: %s", us.Text, wantText)
watcherFunc := func(c Change) {
w := c.Warnable
us := c.UnhealthyState
if w != testWarnable {
t.Fatalf("watcherFunc was called, but with an unexpected Warnable: %v, want: %v", w, testWarnable)
}
if us != nil {
if us.Text != wantText {
t.Fatalf("unexpected us.Text: %q, want: %s", us.Text, wantText)
}
if us.Args[ArgError] != wantText {
t.Fatalf("unexpected us.Args[ArgError]: %q, want: %s", us.Args[ArgError], wantText)
}
becameUnhealthy <- struct{}{}
} else {
becameHealthy <- struct{}{}
}
}
if us.Args[ArgError] != wantText {
t.Fatalf("unexpected us.Args[ArgError]: %s, want: %s", us.Args[ArgError], wantText)
// Set up test
tt.preFunc(t, ht, bus, watcherFunc)
// Start running actual test
ht.SetUnhealthy(testWarnable, Args{ArgError: wantText})
select {
case <-becameUnhealthy:
// Test passed because the watcher got notified of an unhealthy state
case <-becameHealthy:
// Test failed because the watcher got of a healthy state instead of an unhealthy one
t.Fatalf("watcherFunc was called with a healthy state")
case <-time.After(5 * time.Second):
t.Fatalf("watcherFunc didn't get called upon calling SetUnhealthy")
}
becameUnhealthy <- struct{}{}
} else {
becameHealthy <- struct{}{}
}
}
unregisterFunc := ht.RegisterWatcher(watcherFunc)
if len(ht.watchers) != 1 {
t.Fatalf("after RegisterWatcher, len(newTracker.watchers) = %d; want = 1", len(ht.watchers))
}
ht.SetUnhealthy(testWarnable, Args{ArgError: wantText})
ht.SetHealthy(testWarnable)
select {
case <-becameUnhealthy:
// Test passed because the watcher got notified of an unhealthy state
case <-becameHealthy:
// Test failed because the watcher got of a healthy state instead of an unhealthy one
t.Fatalf("watcherFunc was called with a healthy state")
case <-time.After(1 * time.Second):
t.Fatalf("watcherFunc didn't get called upon calling SetUnhealthy")
}
ht.SetHealthy(testWarnable)
select {
case <-becameUnhealthy:
// Test failed because the watcher got of an unhealthy state instead of a healthy one
t.Fatalf("watcherFunc was called with an unhealthy state")
case <-becameHealthy:
// Test passed because the watcher got notified of a healthy state
case <-time.After(1 * time.Second):
t.Fatalf("watcherFunc didn't get called upon calling SetUnhealthy")
}
unregisterFunc()
if len(ht.watchers) != 0 {
t.Fatalf("after unregisterFunc, len(newTracker.watchers) = %d; want = 0", len(ht.watchers))
select {
case <-becameUnhealthy:
// Test failed because the watcher got of an unhealthy state instead of a healthy one
t.Fatalf("watcherFunc was called with an unhealthy state")
case <-becameHealthy:
// Test passed because the watcher got notified of a healthy state
case <-time.After(5 * time.Second):
t.Fatalf("watcherFunc didn't get called upon calling SetUnhealthy")
}
})
}
}
@@ -178,45 +250,81 @@ func TestWatcher(t *testing.T) {
// has a TimeToVisible set, which means that a watcher should only be notified of an unhealthy state after
// the TimeToVisible duration has passed.
func TestSetUnhealthyWithTimeToVisible(t *testing.T) {
ht := Tracker{}
mw := Register(&Warnable{
Code: "test-warnable-3-secs-to-visible",
Title: "Test Warnable with 3 seconds to visible",
Text: StaticMessage("Hello world"),
TimeToVisible: 2 * time.Second,
ImpactsConnectivity: true,
})
defer unregister(mw)
becameUnhealthy := make(chan struct{})
becameHealthy := make(chan struct{})
watchFunc := func(c Change) {
w := c.Warnable
us := c.UnhealthyState
if w != mw {
t.Fatalf("watcherFunc was called, but with an unexpected Warnable: %v, want: %v", w, w)
}
if us != nil {
becameUnhealthy <- struct{}{}
} else {
becameHealthy <- struct{}{}
}
tests := []struct {
name string
preFunc func(t *testing.T, ht *Tracker, bus *eventbus.Bus, fn func(Change))
}{
{
name: "with-callbacks",
preFunc: func(t *testing.T, tht *Tracker, _ *eventbus.Bus, fn func(c Change)) {
t.Cleanup(tht.RegisterWatcher(fn))
if len(tht.watchers) != 1 {
t.Fatalf("after RegisterWatcher, len(newTracker.watchers) = %d; want = 1", len(tht.watchers))
}
},
},
{
name: "with-eventbus",
preFunc: func(_ *testing.T, _ *Tracker, bus *eventbus.Bus, fn func(c Change)) {
client := bus.Client("healthwatchertestclient")
sub := eventbus.Subscribe[Change](client)
go func() {
for {
select {
case <-sub.Done():
return
case change := <-sub.Events():
fn(change)
}
}
}()
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(*testing.T) {
bus := eventbustest.NewBus(t)
ht := NewTracker(bus)
mw := Register(&Warnable{
Code: "test-warnable-3-secs-to-visible",
Title: "Test Warnable with 3 seconds to visible",
Text: StaticMessage("Hello world"),
TimeToVisible: 2 * time.Second,
ImpactsConnectivity: true,
})
ht.RegisterWatcher(watchFunc)
ht.SetUnhealthy(mw, Args{ArgError: "Hello world"})
becameUnhealthy := make(chan struct{})
becameHealthy := make(chan struct{})
select {
case <-becameUnhealthy:
// Test failed because the watcher got notified of an unhealthy state
t.Fatalf("watcherFunc was called with an unhealthy state")
case <-becameHealthy:
// Test failed because the watcher got of a healthy state
t.Fatalf("watcherFunc was called with a healthy state")
case <-time.After(1 * time.Second):
// As expected, watcherFunc still had not been called after 1 second
watchFunc := func(c Change) {
w := c.Warnable
us := c.UnhealthyState
if w != mw {
t.Fatalf("watcherFunc was called, but with an unexpected Warnable: %v, want: %v", w, w)
}
if us != nil {
becameUnhealthy <- struct{}{}
} else {
becameHealthy <- struct{}{}
}
}
tt.preFunc(t, ht, bus, watchFunc)
ht.SetUnhealthy(mw, Args{ArgError: "Hello world"})
select {
case <-becameUnhealthy:
// Test failed because the watcher got notified of an unhealthy state
t.Fatalf("watcherFunc was called with an unhealthy state")
case <-becameHealthy:
// Test failed because the watcher got of a healthy state
t.Fatalf("watcherFunc was called with a healthy state")
case <-time.After(1 * time.Second):
// As expected, watcherFunc still had not been called after 1 second
}
unregister(mw)
})
}
}
@@ -242,7 +350,7 @@ func TestRegisterWarnablePanicsWithDuplicate(t *testing.T) {
// TestCheckDependsOnAppearsInUnhealthyState asserts that the DependsOn field in the UnhealthyState
// is populated with the WarnableCode(s) of the Warnable(s) that a warning depends on.
func TestCheckDependsOnAppearsInUnhealthyState(t *testing.T) {
ht := Tracker{}
ht := NewTracker(eventbustest.NewBus(t))
w1 := Register(&Warnable{
Code: "w1",
Text: StaticMessage("W1 Text"),
@@ -352,11 +460,11 @@ func TestShowUpdateWarnable(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
tr := &Tracker{
checkForUpdates: tt.check,
applyUpdates: tt.apply,
latestVersion: tt.cv,
}
tr := NewTracker(eventbustest.NewBus(t))
tr.checkForUpdates = tt.check
tr.applyUpdates = tt.apply
tr.latestVersion = tt.cv
gotWarnable, gotShow := tr.showUpdateWarnable()
if gotWarnable != tt.wantWarnable {
t.Errorf("got warnable: %v, want: %v", gotWarnable, tt.wantWarnable)
@@ -401,11 +509,10 @@ func TestHealthMetric(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
tr := &Tracker{
checkForUpdates: tt.check,
applyUpdates: tt.apply,
latestVersion: tt.cv,
}
tr := NewTracker(eventbustest.NewBus(t))
tr.checkForUpdates = tt.check
tr.applyUpdates = tt.apply
tr.latestVersion = tt.cv
tr.SetMetricsRegistry(&usermetric.Registry{})
if val := tr.metricHealthMessage.Get(metricHealthMessageLabel{Type: MetricLabelWarning}).String(); val != strconv.Itoa(tt.wantMetricCount) {
t.Fatalf("metric value: %q, want: %q", val, strconv.Itoa(tt.wantMetricCount))
@@ -426,9 +533,8 @@ func TestNoDERPHomeWarnable(t *testing.T) {
Start: time.Unix(123, 0),
FollowRealTime: false,
})
ht := &Tracker{
testClock: clock,
}
ht := NewTracker(eventbustest.NewBus(t))
ht.testClock = clock
ht.SetIPNState("NeedsLogin", true)
// Advance 30 seconds to get past the "recentlyLoggedIn" check.
@@ -448,7 +554,7 @@ func TestNoDERPHomeWarnable(t *testing.T) {
// but doesn't use tstest.Clock so avoids the deadlock
// I hit: https://github.com/tailscale/tailscale/issues/14798
func TestNoDERPHomeWarnableManual(t *testing.T) {
ht := &Tracker{}
ht := NewTracker(eventbustest.NewBus(t))
ht.SetIPNState("NeedsLogin", true)
// Avoid wantRunning:
@@ -462,7 +568,7 @@ func TestNoDERPHomeWarnableManual(t *testing.T) {
}
func TestControlHealth(t *testing.T) {
ht := Tracker{}
ht := NewTracker(eventbustest.NewBus(t))
ht.SetIPNState("NeedsLogin", true)
ht.GotStreamedMapResponse()
@@ -620,7 +726,7 @@ func TestControlHealthNotifies(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ht := Tracker{}
ht := NewTracker(eventbustest.NewBus(t))
ht.SetIPNState("NeedsLogin", true)
ht.GotStreamedMapResponse()
@@ -643,7 +749,7 @@ func TestControlHealthNotifies(t *testing.T) {
}
func TestControlHealthIgnoredOutsideMapPoll(t *testing.T) {
ht := Tracker{}
ht := NewTracker(eventbustest.NewBus(t))
ht.SetIPNState("NeedsLogin", true)
gotNotified := false
@@ -671,7 +777,7 @@ func TestControlHealthIgnoredOutsideMapPoll(t *testing.T) {
// created from Control health & returned by [Tracker.CurrentState] is different
// when the details of the [tailcfg.DisplayMessage] are different.
func TestCurrentStateETagControlHealth(t *testing.T) {
ht := Tracker{}
ht := NewTracker(eventbustest.NewBus(t))
ht.SetIPNState("NeedsLogin", true)
ht.GotStreamedMapResponse()
@@ -776,9 +882,8 @@ func TestCurrentStateETagControlHealth(t *testing.T) {
// when the details of the Warnable are different.
func TestCurrentStateETagWarnable(t *testing.T) {
newTracker := func(clock tstime.Clock) *Tracker {
ht := &Tracker{
testClock: clock,
}
ht := NewTracker(eventbustest.NewBus(t))
ht.testClock = clock
ht.SetIPNState("NeedsLogin", true)
ht.GotStreamedMapResponse()
return ht