Skip to content

Commit 88dfcb2

Browse files
authored
Merge pull request kubernetes#131065 from pohly/dra-kubelet-registration-unit-test-fix
DRA kubelet: fix potential flake in unit test
2 parents cacd595 + 52298cf commit 88dfcb2

File tree

2 files changed

+40
-8
lines changed

2 files changed

+40
-8
lines changed

pkg/kubelet/cm/dra/plugin/registration.go

Lines changed: 26 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -52,10 +52,12 @@ type RegistrationHandler struct {
5252
// This is necessary because it implements APIs which don't
5353
// provide a context.
5454
backgroundCtx context.Context
55+
cancel func(err error)
5556
kubeClient kubernetes.Interface
5657
getNode func() (*v1.Node, error)
5758
wipingDelay time.Duration
5859

60+
wg sync.WaitGroup
5961
mutex sync.Mutex
6062

6163
// pendingWipes maps a plugin name to a cancel function for
@@ -76,9 +78,15 @@ var _ cache.PluginHandler = &RegistrationHandler{}
7678
// If a kubeClient is provided, then it synchronizes ResourceSlices
7779
// with the resource information provided by plugins.
7880
func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error), wipingDelay time.Duration) *RegistrationHandler {
81+
// The context and thus logger should come from the caller.
82+
return newRegistrationHandler(context.TODO(), kubeClient, getNode, wipingDelay)
83+
}
84+
85+
func newRegistrationHandler(ctx context.Context, kubeClient kubernetes.Interface, getNode func() (*v1.Node, error), wipingDelay time.Duration) *RegistrationHandler {
86+
ctx, cancel := context.WithCancelCause(ctx)
7987
handler := &RegistrationHandler{
80-
// The context and thus logger should come from the caller.
81-
backgroundCtx: klog.NewContext(context.TODO(), klog.LoggerWithName(klog.TODO(), "DRA registration handler")),
88+
backgroundCtx: klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "DRA registration handler")),
89+
cancel: cancel,
8290
kubeClient: kubeClient,
8391
getNode: getNode,
8492
wipingDelay: wipingDelay,
@@ -92,13 +100,24 @@ func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1
92100
// to start up.
93101
//
94102
// This has to run in the background.
95-
logger := klog.LoggerWithName(klog.FromContext(handler.backgroundCtx), "startup")
96-
ctx := klog.NewContext(handler.backgroundCtx, logger)
97-
go handler.wipeResourceSlices(ctx, 0 /* no delay */, "" /* all drivers */)
103+
handler.wg.Add(1)
104+
go func() {
105+
defer handler.wg.Done()
106+
107+
logger := klog.LoggerWithName(klog.FromContext(handler.backgroundCtx), "startup")
108+
ctx := klog.NewContext(handler.backgroundCtx, logger)
109+
handler.wipeResourceSlices(ctx, 0 /* no delay */, "" /* all drivers */)
110+
}()
98111

99112
return handler
100113
}
101114

115+
// Stop cancels any remaining background activities and blocks until all goroutines have stopped.
116+
func (h *RegistrationHandler) Stop() {
117+
h.cancel(errors.New("Stop was called"))
118+
h.wg.Wait()
119+
}
120+
102121
// wipeResourceSlices deletes ResourceSlices of the node, optionally just for a specific driver.
103122
// Wiping will delay for a while and can be canceled by canceling the context.
104123
func (h *RegistrationHandler) wipeResourceSlices(ctx context.Context, delay time.Duration, driver string) {
@@ -291,7 +310,9 @@ func (h *RegistrationHandler) DeRegisterPlugin(pluginName, endpoint string) {
291310
}
292311
h.pendingWipes[pluginName] = &cancel
293312

313+
h.wg.Add(1)
294314
go func() {
315+
defer h.wg.Done()
295316
defer func() {
296317
h.mutex.Lock()
297318
defer h.mutex.Unlock()

pkg/kubelet/cm/dra/plugin/registration_test.go

Lines changed: 14 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -149,15 +149,26 @@ func TestRegistrationHandler(t *testing.T) {
149149
// Set expected slice fields for the next call of this reactor.
150150
// The reactor will be called next time when resourceslices object is deleted
151151
// by the kubelet after plugin deregistration.
152-
expectedSliceFields = fields.Set{"spec.nodeName": nodeName, "spec.driver": test.pluginName}
153-
152+
switch len(expectedSliceFields) {
153+
case 1:
154+
// Startup cleanup done, now expect cleanup for test plugin.
155+
expectedSliceFields = fields.Set{"spec.nodeName": nodeName, "spec.driver": test.pluginName}
156+
case 2:
157+
// Test plugin cleanup done, now expect cleanup for the other plugin.
158+
otherPlugin := pluginA
159+
if otherPlugin == test.pluginName {
160+
otherPlugin = pluginB
161+
}
162+
expectedSliceFields = fields.Set{"spec.nodeName": nodeName, "spec.driver": otherPlugin}
163+
}
154164
return true, nil, err
155165
})
156166
client = fakeClient
157167
}
158168

159169
// The handler wipes all slices at startup.
160-
handler := NewRegistrationHandler(client, getFakeNode, time.Second /* very short wiping delay for testing */)
170+
handler := newRegistrationHandler(tCtx, client, getFakeNode, time.Second /* very short wiping delay for testing */)
171+
tCtx.Cleanup(handler.Stop)
161172
requireNoSlices := func() {
162173
t.Helper()
163174
if client == nil {

0 commit comments

Comments
 (0)