Commit b0d6079

Merge pull request kubernetes#130947 from pohly/dra-device-taints-flake
DRA device taints: fix some race conditions
2 parents: 3ac21c1 + cfb9486

4 files changed: +71 -24 lines

cmd/kube-controller-manager/app/core.go

Lines changed: 5 additions & 1 deletion
@@ -254,7 +254,11 @@ func startDeviceTaintEvictionController(ctx context.Context, controllerContext C
         controllerContext.InformerFactory.Resource().V1beta1().DeviceClasses(),
         controllerName,
     )
-    go deviceTaintEvictionController.Run(ctx)
+    go func() {
+        if err := deviceTaintEvictionController.Run(ctx); err != nil {
+            klog.FromContext(ctx).Error(err, "Device taint processing leading to Pod eviction failed and is now paused")
+        }
+    }()
     return nil, true, nil
 }
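
Note on the core.go change: Run now reports startup errors, so the caller has to handle them; because startDeviceTaintEvictionController has already returned by the time Run fails, the error can only be logged. The following is a minimal, self-contained sketch of that pattern with stand-in types (not the actual kube-controller-manager code):

package main

import (
	"context"
	"errors"
	"log"
	"time"
)

// controller is a stand-in for the device taint eviction controller.
type controller struct {
	startupBroken bool
}

// Run mimics the new signature: startup problems are returned as an error,
// otherwise Run blocks until the context is done.
func (c *controller) Run(ctx context.Context) error {
	if c.startupBroken {
		return errors.New("adding event handler failed")
	}
	<-ctx.Done()
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	c := &controller{startupBroken: true}
	done := make(chan struct{})

	// Same shape as the core.go change: run in a goroutine and log the error,
	// because the function that started the controller has already returned.
	go func() {
		defer close(done)
		if err := c.Run(ctx); err != nil {
			log.Printf("device taint processing failed and is now paused: %v", err)
		}
	}()
	<-done
}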

pkg/controller/devicetainteviction/device_taint_eviction.go

Lines changed: 20 additions & 8 deletions
@@ -18,6 +18,7 @@ package devicetainteviction
 
 import (
     "context"
+    "errors"
     "fmt"
     "math"
     "slices"
@@ -319,7 +320,8 @@ func New(c clientset.Interface, podInformer coreinformers.PodInformer, claimInfo
 }
 
 // Run starts the controller which will run until the context is done.
-func (tc *Controller) Run(ctx context.Context) {
+// An error is returned for startup problems.
+func (tc *Controller) Run(ctx context.Context) error {
     defer utilruntime.HandleCrash()
     logger := klog.FromContext(ctx)
     logger.Info("Starting", "controller", tc.name)
@@ -370,7 +372,7 @@ func (tc *Controller) Run(ctx context.Context) {
     // mutex serializes event processing.
     var mutex sync.Mutex
 
-    claimHandler, _ := tc.claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+    claimHandler, err := tc.claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
         AddFunc: func(obj any) {
             claim, ok := obj.(*resourceapi.ResourceClaim)
             if !ok {
@@ -409,12 +411,15 @@
             tc.handleClaimChange(claim, nil)
         },
     })
+    if err != nil {
+        return fmt.Errorf("adding claim event handler:%w", err)
+    }
     defer func() {
         _ = tc.claimInformer.Informer().RemoveEventHandler(claimHandler)
     }()
     tc.haveSynced = append(tc.haveSynced, claimHandler.HasSynced)
 
-    podHandler, _ := tc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+    podHandler, err := tc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
         AddFunc: func(obj any) {
             pod, ok := obj.(*v1.Pod)
             if !ok {
@@ -453,6 +458,9 @@ func (tc *Controller) Run(ctx context.Context) {
             tc.handlePodChange(pod, nil)
         },
     })
+    if err != nil {
+        return fmt.Errorf("adding pod event handler: %w", err)
+    }
     defer func() {
         _ = tc.podInformer.Informer().RemoveEventHandler(podHandler)
     }()
@@ -467,8 +475,7 @@ func (tc *Controller) Run(ctx context.Context) {
     }
     sliceTracker, err := resourceslicetracker.StartTracker(ctx, opts)
     if err != nil {
-        logger.Info("Failed to initialize ResourceSlice tracker; device taint processing leading to Pod eviction is now paused", "err", err)
-        return
+        return fmt.Errorf("initialize ResourceSlice tracker: %w", err)
     }
     tc.haveSynced = append(tc.haveSynced, sliceTracker.HasSynced)
     defer sliceTracker.Stop()
@@ -478,11 +485,11 @@ func (tc *Controller) Run(ctx context.Context) {
     // work which might be done as events get emitted for intermediate
     // state.
     if !cache.WaitForNamedCacheSyncWithContext(ctx, tc.haveSynced...) {
-        return
+        return errors.New("wait for cache sync timed out")
     }
     logger.V(1).Info("Underlying informers have synced")
 
-    _, _ = sliceTracker.AddEventHandler(cache.ResourceEventHandlerFuncs{
+    _, err = sliceTracker.AddEventHandler(cache.ResourceEventHandlerFuncs{
         AddFunc: func(obj any) {
             slice, ok := obj.(*resourceapi.ResourceSlice)
             if !ok {
@@ -519,12 +526,16 @@ func (tc *Controller) Run(ctx context.Context) {
             tc.handleSliceChange(slice, nil)
         },
     })
+    if err != nil {
+        return fmt.Errorf("add slice event handler: %w", err)
+    }
 
     // sliceTracker.AddEventHandler blocked while delivering events for all known
     // ResourceSlices. Therefore our own state is up-to-date once we get here.
     tc.hasSynced.Store(1)
 
     <-ctx.Done()
+    return nil
 }
 
 func (tc *Controller) handleClaimChange(oldClaim, newClaim *resourceapi.ResourceClaim) {
@@ -783,12 +794,13 @@ func (tc *Controller) handlePodChange(oldPod, newPod *v1.Pod) {
 
     // Pods get updated quite frequently. There's no need
     // to check them again unless something changed regarding
-    // their claims.
+    // their claims or they got scheduled.
     //
     // In particular this prevents adding the pod again
     // directly after the eviction condition got added
     // to it.
     if oldPod != nil &&
+        oldPod.Spec.NodeName == newPod.Spec.NodeName &&
         apiequality.Semantic.DeepEqual(oldPod.Status.ResourceClaimStatuses, newPod.Status.ResourceClaimStatuses) {
         return
     }
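
The handlePodChange hunk is the functional part of the race fix: an update that only sets spec.nodeName (the pod was scheduled after the controller first saw it) used to be filtered out because its claim statuses were unchanged, so the pod never became an eviction candidate. A simplified, self-contained sketch of the new filter, using stand-in types instead of v1.Pod:

package main

import (
	"fmt"
	"reflect"
)

// pod is a stand-in for v1.Pod with just the fields the filter looks at.
type pod struct {
	NodeName              string
	ResourceClaimStatuses []string
}

// needsRecheck mirrors the updated condition: skip the update only if
// neither the node assignment nor the resource claim statuses changed.
func needsRecheck(oldPod, newPod *pod) bool {
	if oldPod != nil &&
		oldPod.NodeName == newPod.NodeName &&
		reflect.DeepEqual(oldPod.ResourceClaimStatuses, newPod.ResourceClaimStatuses) {
		return false
	}
	return true
}

func main() {
	unscheduled := &pod{ResourceClaimStatuses: []string{"claim-a"}}
	scheduled := &pod{NodeName: "node-1", ResourceClaimStatuses: []string{"claim-a"}}

	fmt.Println(needsRecheck(unscheduled, scheduled)) // true: the pod just got scheduled
	fmt.Println(needsRecheck(scheduled, scheduled))   // false: nothing relevant changed
}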

pkg/controller/devicetainteviction/device_taint_eviction_test.go

Lines changed: 40 additions & 15 deletions
@@ -318,6 +318,10 @@ var (
             OwnerReference(podName, podUID+"-other", podKind).
             UID("other").
             Obj()
+    unscheduledPodWithClaimName = st.MakePod().Name(podName).Namespace(namespace).
+                UID(podUID).
+                PodResourceClaims(v1.PodResourceClaim{Name: resourceName, ResourceClaimName: &claimName}).
+                Obj()
     podWithClaimName = st.MakePod().Name(podName).Namespace(namespace).
                 UID(podUID).
                 PodResourceClaims(v1.PodResourceClaim{Name: resourceName, ResourceClaimName: &claimName}).
@@ -494,6 +498,23 @@ func TestHandlers(t *testing.T) {
             // At the moment, the code reliably cancels right away.
             wantEvents: []*v1.Event{cancelPodEviction},
         },
+        "evict-pod-after-scheduling": {
+            initialState: state{
+                pods:            []*v1.Pod{unscheduledPodWithClaimName},
+                slices:          []*resourceapi.ResourceSlice{sliceTainted, slice2},
+                allocatedClaims: []allocatedClaim{{ResourceClaim: inUseClaim, evictionTime: &taintTime}},
+            },
+            events: []any{
+                // Normally the scheduler shouldn't schedule when there is a taint,
+                // but perhaps it didn't know yet.
+                update(unscheduledPodWithClaimName, podWithClaimName),
+            },
+            finalState: state{
+                slices:          []*resourceapi.ResourceSlice{sliceTainted, slice2},
+                allocatedClaims: []allocatedClaim{{ResourceClaim: inUseClaim, evictionTime: &taintTime}},
+                evicting:        []evictAt{{newObject(podWithClaimName), taintTime.Time}},
+            },
+        },
         "evict-pod-resourceclaim-unrelated-changes": {
             initialState: state{
                 pods: []*v1.Pod{podWithClaimName},
@@ -1339,22 +1360,22 @@ func TestEviction(t *testing.T) {
             wg.Add(1)
             go func() {
                 defer wg.Done()
-                controller.Run(tCtx)
+                assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
             }()
 
             // Eventually the controller should have synced it's informers.
-            require.Eventually(tCtx, func() bool {
+            ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) bool {
                 return controller.hasSynced.Load() > 0
-            }, 30*time.Second, time.Millisecond, "controller synced")
+            }).WithTimeout(30 * time.Second).Should(gomega.BeTrueBecause("controller synced"))
             if tt.afterSync != nil {
                 tt.afterSync(tCtx)
             }
 
             // Eventually the pod gets deleted (= evicted).
-            assert.Eventually(tCtx, func() bool {
+            ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) bool {
                 _, err := fakeClientset.CoreV1().Pods(pod.Namespace).Get(tCtx, pod.Name, metav1.GetOptions{})
                 return apierrors.IsNotFound(err)
-            }, 30*time.Second, time.Millisecond, "pod evicted")
+            }).WithTimeout(30 * time.Second).Should(gomega.BeTrueBecause("pod evicted"))
 
             pod := pod.DeepCopy()
             pod.Status.Conditions = []v1.PodCondition{{
@@ -1369,7 +1390,10 @@
 
             // Shortly after deletion we should also see updated metrics.
             // This is the last thing the controller does for a pod.
+            // However, actually creating the event on the server is asynchronous,
+            // so we also have to wait for that.
             ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) error {
+                gomega.NewWithT(tCtx).Expect(listEvents(tCtx)).Should(matchDeletionEvent())
                 return testPodDeletionsMetrics(controller, 1)
             }).WithTimeout(30*time.Second).Should(gomega.Succeed(), "pod eviction done")
 
@@ -1450,7 +1474,7 @@ func testCancelEviction(tCtx ktesting.TContext, deletePod bool) {
     wg.Add(1)
     go func() {
         defer wg.Done()
-        controller.Run(tCtx)
+        assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
     }()
 
     // Eventually the pod gets scheduled for eviction.
@@ -1543,15 +1567,15 @@ func TestParallelPodDeletion(t *testing.T) {
             wg.Add(1)
             go func() {
                 defer wg.Done()
-                controller.Run(tCtx)
+                assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
             }()
 
             // Eventually the pod gets deleted, in this test by us.
-            assert.Eventually(tCtx, func() bool {
+            ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) bool {
                 mutex.Lock()
                 defer mutex.Unlock()
                 return podGets >= 1
-            }, 30*time.Second, time.Millisecond, "pod eviction started")
+            }).WithTimeout(30 * time.Second).Should(gomega.BeTrueBecause("pod eviction started"))
 
             // We don't want any events.
             ktesting.Consistently(tCtx, func(tCtx ktesting.TContext) error {
@@ -1622,11 +1646,12 @@ func TestRetry(t *testing.T) {
             wg.Add(1)
             go func() {
                 defer wg.Done()
-                controller.Run(tCtx)
+                assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
             }()
 
-            // Eventually the pod gets deleted.
+            // Eventually the pod gets deleted and the event is recorded.
             ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) error {
+                gomega.NewWithT(tCtx).Expect(listEvents(tCtx)).Should(matchDeletionEvent())
                 return testPodDeletionsMetrics(controller, 1)
             }).WithTimeout(30*time.Second).Should(gomega.Succeed(), "pod eviction done")
 
@@ -1694,15 +1719,15 @@ func TestEvictionFailure(t *testing.T) {
             wg.Add(1)
             go func() {
                 defer wg.Done()
-                controller.Run(tCtx)
+                assert.NoError(tCtx, controller.Run(tCtx), "eviction controller failed")
             }()
 
             // Eventually deletion is attempted a few times.
-            assert.Eventually(tCtx, func() bool {
+            ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) int {
                 mutex.Lock()
                 defer mutex.Unlock()
-                return podDeletions >= retries
-            }, 30*time.Second, time.Millisecond, "pod eviction failed")
+                return podDeletions
+            }).WithTimeout(30*time.Second).Should(gomega.BeNumerically(">=", retries), "pod eviction failed")
 
             // Now we can check the API calls.
             ktesting.Consistently(tCtx, func(tCtx ktesting.TContext) error {
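
The test changes replace assert.Eventually/require.Eventually with ktesting.Eventually, which wraps gomega: the callback returns a value and a matcher states the expectation, so a timeout reports the actual value instead of just a false condition. A rough sketch of the same polling style written against plain gomega in an ordinary Go test (assumes only the github.com/onsi/gomega module; all names here are illustrative):

package example

import (
	"sync"
	"testing"
	"time"

	"github.com/onsi/gomega"
)

func TestEventuallyStyle(t *testing.T) {
	g := gomega.NewWithT(t)

	var (
		mutex        sync.Mutex
		podDeletions int
	)

	// Simulate background work bumping a counter, similar to what the fake
	// clientset reactors do in the real test.
	go func() {
		for i := 0; i < 5; i++ {
			time.Sleep(10 * time.Millisecond)
			mutex.Lock()
			podDeletions++
			mutex.Unlock()
		}
	}()

	// Return the value and let a matcher describe the expectation instead of
	// returning a bare bool; on timeout the failure message shows the count.
	g.Eventually(func() int {
		mutex.Lock()
		defer mutex.Unlock()
		return podDeletions
	}).WithTimeout(30 * time.Second).WithPolling(time.Millisecond).
		Should(gomega.BeNumerically(">=", 3), "pod eviction retried")
}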

pkg/controller/tainteviction/timed_workers.go

Lines changed: 6 additions & 0 deletions
@@ -57,6 +57,7 @@ type TimedWorker struct {
 }
 
 // createWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`.
+// Returns nil if the work was started immediately and doesn't need a timer.
 func createWorker(ctx context.Context, args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(ctx context.Context, fireAt time.Time, args *WorkArgs) error, clock clock.WithDelayedExecution) *TimedWorker {
     delay := fireAt.Sub(createdAt)
     logger := klog.FromContext(ctx)
@@ -90,6 +91,7 @@ func (w *TimedWorker) Cancel() {
 type TimedWorkerQueue struct {
     sync.Mutex
     // map of workers keyed by string returned by 'KeyFromWorkArgs' from the given worker.
+    // Entries may be nil if the work didn't need a timer and is already running.
     workers  map[string]*TimedWorker
     workFunc func(ctx context.Context, fireAt time.Time, args *WorkArgs) error
     clock    clock.WithDelayedExecution
@@ -145,6 +147,10 @@ func (q *TimedWorkerQueue) UpdateWork(ctx context.Context, args *WorkArgs, creat
     q.Lock()
     defer q.Unlock()
     if worker, exists := q.workers[key]; exists {
+        if worker == nil {
+            logger.V(4).Info("Keeping existing work, already in progress", "item", key)
+            return
+        }
         if worker.FireAt.Compare(fireAt) == 0 {
             logger.V(4).Info("Keeping existing work, same time", "item", key, "createTime", worker.CreatedAt, "firedTime", worker.FireAt)
             return
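
The nil check in UpdateWork codifies an invariant that the new comments spell out: createWorker returns nil when the work fires immediately and needs no timer, and that nil is stored in the workers map to mark the key as in progress. Below is a simplified stand-in (not the real tainteviction package) showing why an update must tolerate a nil entry:

package main

import (
	"fmt"
	"time"
)

type timedWorker struct {
	fireAt time.Time
}

type timedWorkerQueue struct {
	// A nil value means the work didn't need a timer and is already running.
	workers map[string]*timedWorker
}

func (q *timedWorkerQueue) updateWork(key string, fireAt time.Time) {
	if worker, exists := q.workers[key]; exists {
		if worker == nil {
			fmt.Println("keeping existing work, already in progress:", key)
			return
		}
		if worker.fireAt.Equal(fireAt) {
			fmt.Println("keeping existing work, same time:", key)
			return
		}
		// A different fire time would cancel and replace the worker here.
	}
	q.workers[key] = &timedWorker{fireAt: fireAt}
	fmt.Println("scheduled work for", key, "at", fireAt.Format(time.RFC3339))
}

func main() {
	q := &timedWorkerQueue{workers: map[string]*timedWorker{
		// Entry for work that fired immediately, so no timer was created.
		"ns/pod-a": nil,
	}}

	// Without the nil guard, this call would dereference a nil *timedWorker.
	q.updateWork("ns/pod-a", time.Now().Add(time.Minute))
	q.updateWork("ns/pod-b", time.Now().Add(time.Minute))
}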
