
Commit 52298cf

DRA kubelet: fix potential flake in unit test
If the test binary kept running long enough after test completion to reach the ResourceSlice removal grace period, that background activity started and failed because it was using outdated state and an invalid testing.T pointer, causing a panic. The root cause was leaving those background activities running; they need to be stopped before a test returns.
1 parent ee41b03 · commit 52298cf
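
The fix applies a standard Go pattern for owning background goroutines: derive a cancelable context with context.WithCancelCause, count every goroutine in a sync.WaitGroup, and expose a Stop method that cancels the context and then waits. The sketch below is a minimal, self-contained illustration of that pattern; the worker type and its timed background task are stand-ins, not the actual RegistrationHandler code.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// worker owns background goroutines that must not outlive it.
type worker struct {
	ctx    context.Context
	cancel func(err error)
	wg     sync.WaitGroup
}

func newWorker(ctx context.Context) *worker {
	ctx, cancel := context.WithCancelCause(ctx)
	w := &worker{ctx: ctx, cancel: cancel}

	// Count the goroutine before launching it, just like the handler does
	// for its startup wipe and for each pending per-plugin wipe.
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		select {
		case <-time.After(30 * time.Second): // stand-in for a removal grace period
			fmt.Println("background work ran")
		case <-w.ctx.Done():
			fmt.Println("background work canceled:", context.Cause(w.ctx))
		}
	}()
	return w
}

// Stop cancels any remaining background activity and blocks until it has exited.
func (w *worker) Stop() {
	w.cancel(errors.New("Stop was called"))
	w.wg.Wait()
}

func main() {
	w := newWorker(context.Background())
	w.Stop() // a test would register this via a cleanup hook instead
}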

2 files changed: 40 additions & 8 deletions

pkg/kubelet/cm/dra/plugin/registration.go

Lines changed: 26 additions & 5 deletions
@@ -52,10 +52,12 @@ type RegistrationHandler struct {
 	// This is necessary because it implements APIs which don't
 	// provide a context.
 	backgroundCtx context.Context
+	cancel        func(err error)
 	kubeClient    kubernetes.Interface
 	getNode       func() (*v1.Node, error)
 	wipingDelay   time.Duration
 
+	wg    sync.WaitGroup
 	mutex sync.Mutex
 
 	// pendingWipes maps a plugin name to a cancel function for
@@ -76,9 +78,15 @@ var _ cache.PluginHandler = &RegistrationHandler{}
 // If a kubeClient is provided, then it synchronizes ResourceSlices
 // with the resource information provided by plugins.
 func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error), wipingDelay time.Duration) *RegistrationHandler {
+	// The context and thus logger should come from the caller.
+	return newRegistrationHandler(context.TODO(), kubeClient, getNode, wipingDelay)
+}
+
+func newRegistrationHandler(ctx context.Context, kubeClient kubernetes.Interface, getNode func() (*v1.Node, error), wipingDelay time.Duration) *RegistrationHandler {
+	ctx, cancel := context.WithCancelCause(ctx)
 	handler := &RegistrationHandler{
-		// The context and thus logger should come from the caller.
-		backgroundCtx: klog.NewContext(context.TODO(), klog.LoggerWithName(klog.TODO(), "DRA registration handler")),
+		backgroundCtx: klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "DRA registration handler")),
+		cancel:        cancel,
 		kubeClient:    kubeClient,
 		getNode:       getNode,
 		wipingDelay:   wipingDelay,
@@ -92,13 +100,24 @@ func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1
 	// to start up.
 	//
 	// This has to run in the background.
-	logger := klog.LoggerWithName(klog.FromContext(handler.backgroundCtx), "startup")
-	ctx := klog.NewContext(handler.backgroundCtx, logger)
-	go handler.wipeResourceSlices(ctx, 0 /* no delay */, "" /* all drivers */)
+	handler.wg.Add(1)
+	go func() {
+		defer handler.wg.Done()
+
+		logger := klog.LoggerWithName(klog.FromContext(handler.backgroundCtx), "startup")
+		ctx := klog.NewContext(handler.backgroundCtx, logger)
+		handler.wipeResourceSlices(ctx, 0 /* no delay */, "" /* all drivers */)
+	}()
 
 	return handler
 }
 
+// Stop cancels any remaining background activities and blocks until all goroutines have stopped.
+func (h *RegistrationHandler) Stop() {
+	h.cancel(errors.New("Stop was called"))
+	h.wg.Wait()
+}
+
 // wipeResourceSlices deletes ResourceSlices of the node, optionally just for a specific driver.
 // Wiping will delay for a while and can be canceled by canceling the context.
 func (h *RegistrationHandler) wipeResourceSlices(ctx context.Context, delay time.Duration, driver string) {
@@ -291,7 +310,9 @@ func (h *RegistrationHandler) DeRegisterPlugin(pluginName, endpoint string) {
 	}
 	h.pendingWipes[pluginName] = &cancel
 
+	h.wg.Add(1)
 	go func() {
+		defer h.wg.Done()
 		defer func() {
 			h.mutex.Lock()
 			defer h.mutex.Unlock()

pkg/kubelet/cm/dra/plugin/registration_test.go

Lines changed: 14 additions & 3 deletions
@@ -149,15 +149,26 @@ func TestRegistrationHandler(t *testing.T) {
 				// Set expected slice fields for the next call of this reactor.
 				// The reactor will be called next time when resourceslices object is deleted
 				// by the kubelet after plugin deregistration.
-				expectedSliceFields = fields.Set{"spec.nodeName": nodeName, "spec.driver": test.pluginName}
-
+				switch len(expectedSliceFields) {
+				case 1:
+					// Startup cleanup done, now expect cleanup for test plugin.
+					expectedSliceFields = fields.Set{"spec.nodeName": nodeName, "spec.driver": test.pluginName}
+				case 2:
+					// Test plugin cleanup done, now expect cleanup for the other plugin.
+					otherPlugin := pluginA
+					if otherPlugin == test.pluginName {
+						otherPlugin = pluginB
+					}
+					expectedSliceFields = fields.Set{"spec.nodeName": nodeName, "spec.driver": otherPlugin}
+				}
 				return true, nil, err
 			})
 			client = fakeClient
 		}
 
 		// The handler wipes all slices at startup.
-		handler := NewRegistrationHandler(client, getFakeNode, time.Second /* very short wiping delay for testing */)
+		handler := newRegistrationHandler(tCtx, client, getFakeNode, time.Second /* very short wiping delay for testing */)
+		tCtx.Cleanup(handler.Stop)
 		requireNoSlices := func() {
 			t.Helper()
 			if client == nil {