Skip to content

Commit f6444e5

Browse files
author
Michal Minář
committed
handling image stream changes
Signed-off-by: Michal Minář <[email protected]>
1 parent 0a6dcfd commit f6444e5

File tree

5 files changed

+420
-36
lines changed

5 files changed

+420
-36
lines changed

pkg/oc/admin/prune/imageprune/helper.go

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,18 @@ import (
77
"sort"
88
"strings"
99

10+
"github.com/docker/distribution/registry/api/errcode"
11+
"github.com/golang/glog"
12+
1013
kmeta "k8s.io/apimachinery/pkg/api/meta"
1114
"k8s.io/apimachinery/pkg/runtime"
15+
kerrors "k8s.io/apimachinery/pkg/util/errors"
1216
"k8s.io/kubernetes/pkg/api/legacyscheme"
1317
kapiref "k8s.io/kubernetes/pkg/api/ref"
1418
kapi "k8s.io/kubernetes/pkg/apis/core"
1519

16-
"github.com/docker/distribution/registry/api/errcode"
17-
"github.com/golang/glog"
18-
19-
kerrors "k8s.io/apimachinery/pkg/util/errors"
20-
2120
imageapi "github.com/openshift/origin/pkg/image/apis/image"
21+
imagegraph "github.com/openshift/origin/pkg/oc/graph/imagegraph/nodes"
2222
"github.com/openshift/origin/pkg/util/netutils"
2323
)
2424

@@ -302,3 +302,32 @@ func getRef(obj runtime.Object) *kapi.ObjectReference {
302302
}
303303
return ref
304304
}
305+
306+
// removeImageNodesAtIndexes removes nodes at the given indexes from the given array and returns the resulting
307+
// slice that points to the original array. The indexes must be valid and sorted in ascending order. Otherwise
308+
// the result is undefined.
309+
func removeImageNodesAtIndexes(nodes []*imagegraph.ImageNode, indexes ...int) []*imagegraph.ImageNode {
310+
if len(nodes) == 0 || len(indexes) == 0 {
311+
return nodes
312+
}
313+
314+
tmp := nodes[:indexes[0]]
315+
316+
highestIndex := -1
317+
318+
for i, n := range indexes {
319+
if n <= highestIndex || n >= len(nodes) || n < 0 {
320+
// indexes are not sorted or invalid
321+
break
322+
}
323+
324+
highestIndex = n
325+
326+
if i < len(indexes)-1 && indexes[i+1] < len(nodes) && indexes[i+1] > highestIndex {
327+
tmp = append(tmp, nodes[n+1:indexes[i+1]]...)
328+
} else {
329+
tmp = append(tmp, nodes[n+1:]...)
330+
}
331+
}
332+
return tmp
333+
}

pkg/oc/admin/prune/imageprune/helper_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111

1212
"k8s.io/apimachinery/pkg/util/diff"
1313
knet "k8s.io/apimachinery/pkg/util/net"
14+
15+
imagegraph "github.com/openshift/origin/pkg/oc/graph/imagegraph/nodes"
1416
)
1517

1618
type requestStats struct {
@@ -215,3 +217,92 @@ func TestDefaultImagePinger(t *testing.T) {
215217
}()
216218
}
217219
}
220+
221+
func TestRemoveImageNodesAtIndexes(t *testing.T) {
222+
for _, tc := range []struct {
223+
name string
224+
nodes int
225+
remove []int
226+
expectedRemainingIndexes []int
227+
}{
228+
{
229+
name: "no nodes",
230+
remove: []int{0, 1, 2},
231+
expectedRemainingIndexes: []int{},
232+
},
233+
{
234+
name: "nothing to remove",
235+
nodes: 4,
236+
remove: []int{},
237+
expectedRemainingIndexes: []int{0, 1, 2, 3},
238+
},
239+
{
240+
name: "remove first",
241+
nodes: 3,
242+
remove: []int{0},
243+
expectedRemainingIndexes: []int{1, 2},
244+
},
245+
{
246+
name: "remove last",
247+
nodes: 3,
248+
remove: []int{2},
249+
expectedRemainingIndexes: []int{0, 1},
250+
},
251+
{
252+
name: "remove 2 consecutive",
253+
nodes: 4,
254+
remove: []int{1, 2},
255+
expectedRemainingIndexes: []int{0, 3},
256+
},
257+
{
258+
name: "remove odd",
259+
nodes: 5,
260+
remove: []int{1, 3, 5, 7},
261+
expectedRemainingIndexes: []int{0, 2, 4},
262+
},
263+
{
264+
name: "remove even",
265+
nodes: 5,
266+
remove: []int{0, 2, 4, 6, 8},
267+
expectedRemainingIndexes: []int{1, 3},
268+
},
269+
{
270+
name: "unsorted sequence",
271+
nodes: 5,
272+
remove: []int{0, 2, 2, 2, 4, 3, 2, 1, 2},
273+
expectedRemainingIndexes: []int{1, 3, 4},
274+
},
275+
} {
276+
t.Run(tc.name, func(t *testing.T) {
277+
// generate nodes
278+
nodes := make([]*imagegraph.ImageNode, 0, tc.nodes)
279+
nodeIndexes := map[*imagegraph.ImageNode]int{}
280+
for i := 0; i < tc.nodes; i++ {
281+
node := &imagegraph.ImageNode{}
282+
nodes = append(nodes, node)
283+
nodeIndexes[nodes[i]] = i
284+
}
285+
286+
nodes = removeImageNodesAtIndexes(nodes, tc.remove...)
287+
288+
for i := 0; i < len(nodes) || i < len(tc.expectedRemainingIndexes); i++ {
289+
if i >= len(nodes) {
290+
t.Errorf("no nodes at index %d where node #%d is expected", i, tc.expectedRemainingIndexes[i])
291+
continue
292+
}
293+
newIndex, ok := nodeIndexes[nodes[i]]
294+
if !ok {
295+
t.Errorf("unknown node present on index %d", i)
296+
continue
297+
}
298+
if i >= len(tc.expectedRemainingIndexes) {
299+
t.Errorf("found node #%d at index %d where no node is expected", newIndex, i)
300+
continue
301+
}
302+
if a, e := newIndex, tc.expectedRemainingIndexes[i]; a != e {
303+
t.Errorf("expected node #%d at index %d, not node #%d", e, i, a)
304+
}
305+
}
306+
})
307+
}
308+
}

pkg/oc/admin/prune/imageprune/prune.go

Lines changed: 68 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
kerrors "k8s.io/apimachinery/pkg/util/errors"
2323
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2424
"k8s.io/apimachinery/pkg/util/sets"
25+
"k8s.io/apimachinery/pkg/watch"
2526
"k8s.io/client-go/util/retry"
2627
kapi "k8s.io/kubernetes/pkg/apis/core"
2728
kapisext "k8s.io/kubernetes/pkg/apis/extensions"
@@ -60,7 +61,7 @@ const (
6061
// ImageNode to an ImageComponentNode.
6162
ReferencedImageManifestEdgeKind = "ReferencedImageManifest"
6263

63-
pruneImageWorkerCount = 5
64+
defaultPruneImageWorkerCount = 5
6465
)
6566

6667
// RegistryClientFactoryFunc is a factory function returning a registry client for use in a worker.
@@ -149,6 +150,8 @@ type PrunerOptions struct {
149150
// Streams is the entire list of image streams across all namespaces in the
150151
// cluster.
151152
Streams *imageapi.ImageStreamList
153+
// StreamsWatcher watches for stream changes.
154+
StreamsWatcher watch.Interface
152155
// Pods is the entire list of pods across all namespaces in the cluster.
153156
Pods *kapi.PodList
154157
// RCs is the entire list of replication controllers across all namespaces in
@@ -176,6 +179,9 @@ type PrunerOptions struct {
176179
RegistryClientFactory RegistryClientFactoryFunc
177180
// RegistryURL is the URL of the integrated Docker registry.
178181
RegistryURL *url.URL
182+
// NumWorkers is a desired number of workers concurrently handling image prune jobs. If less than 1, the
183+
// default number of workers will be spawned.
184+
NumWorkers int
179185
}
180186

181187
// Pruner knows how to prune istags, images, manifest, layers, image configs and blobs.
@@ -199,10 +205,13 @@ type pruner struct {
199205
algorithm pruneAlgorithm
200206
registryClientFactory RegistryClientFactoryFunc
201207
registryURL *url.URL
208+
imageStreamWatcher watch.Interface
209+
imageStreamLimits map[string][]*kapi.LimitRange
202210
// sorted queue of images to prune
203211
queue []*imagegraph.ImageNode
204212
// contains prunable images removed from queue that are currently being processed
205213
processedImages map[*imagegraph.ImageNode]*Job
214+
numWorkers int
206215
}
207216

208217
var _ Pruner = &pruner{}
@@ -283,6 +292,13 @@ func NewPruner(options PrunerOptions) (Pruner, kerrors.Aggregate) {
283292
registryClientFactory: options.RegistryClientFactory,
284293
registryURL: options.RegistryURL,
285294
processedImages: make(map[*imagegraph.ImageNode]*Job),
295+
imageStreamWatcher: options.StreamsWatcher,
296+
imageStreamLimits: options.LimitRanges,
297+
numWorkers: options.NumWorkers,
298+
}
299+
300+
if p.numWorkers < 1 {
301+
p.numWorkers = defaultPruneImageWorkerCount
286302
}
287303

288304
if err := p.buildGraph(options); err != nil {
@@ -776,6 +792,50 @@ func (p *pruner) addBuildStrategyImageReferencesToGraph(referrer *kapi.ObjectRef
776792
return nil
777793
}
778794

795+
func (p *pruner) handleImageStreamEvent(event watch.Event) {
796+
getIsNode := func() (*imageapi.ImageStream, *imagegraph.ImageStreamNode) {
797+
is, ok := event.Object.(*imageapi.ImageStream)
798+
if !ok {
799+
utilruntime.HandleError(fmt.Errorf("internal error: expected ImageStream object in %s event, not %T", event.Type, event.Object))
800+
return nil, nil
801+
}
802+
n := p.g.Find(imagegraph.ImageStreamNodeName(is))
803+
if isNode, ok := n.(*imagegraph.ImageStreamNode); ok {
804+
return is, isNode
805+
}
806+
return is, nil
807+
}
808+
809+
switch event.Type {
810+
case watch.Added:
811+
is, isNode := getIsNode()
812+
if is == nil {
813+
return
814+
}
815+
if isNode != nil {
816+
glog.V(4).Infof("Ignoring added ImageStream %s that is already present in the graph", getName(is))
817+
return
818+
}
819+
glog.V(4).Infof("Adding ImageStream %s to the graph", getName(is))
820+
p.addImageStreamsToGraph(&imageapi.ImageStreamList{Items: []imageapi.ImageStream{*is}}, p.imageStreamLimits)
821+
822+
case watch.Modified:
823+
is, isNode := getIsNode()
824+
if is == nil {
825+
return
826+
}
827+
828+
if isNode != nil {
829+
glog.V(4).Infof("Removing updated ImageStream %s from the graph", getName(is))
830+
// first remove the current node if present
831+
p.g.RemoveNode(imagegraph.EnsureImageStreamNode(p.g, is))
832+
}
833+
834+
glog.V(4).Infof("Adding updated ImageStream %s back to the graph", getName(is))
835+
p.addImageStreamsToGraph(&imageapi.ImageStreamList{Items: []imageapi.ImageStream{*is}}, p.imageStreamLimits)
836+
}
837+
}
838+
779839
// getImageNodes returns only nodes of type ImageNode.
780840
func getImageNodes(nodes []gonum.Node) map[string]*imagegraph.ImageNode {
781841
ret := make(map[string]*imagegraph.ImageNode)
@@ -1112,7 +1172,7 @@ func (p *pruner) Prune(
11121172

11131173
defer close(jobChan)
11141174

1115-
for i := 0; i < pruneImageWorkerCount; i++ {
1175+
for i := 0; i < p.numWorkers; i++ {
11161176
worker, err := NewWorker(
11171177
p.algorithm,
11181178
p.registryClientFactory,
@@ -1145,9 +1205,10 @@ func (p *pruner) runLoop(
11451205
jobChan chan<- *Job,
11461206
resultChan <-chan JobResult,
11471207
) (deletions []Deletion, failures []Failure) {
1208+
isUpdateChan := p.imageStreamWatcher.ResultChan()
11481209
for {
11491210
// make workers busy
1150-
for len(p.processedImages) < pruneImageWorkerCount {
1211+
for len(p.processedImages) < p.numWorkers {
11511212
job, blocked := p.getNextJob()
11521213
if blocked {
11531214
break
@@ -1172,7 +1233,8 @@ func (p *pruner) runLoop(
11721233
failures = append(failures, failure)
11731234
}
11741235
delete(p.processedImages, res.Job.Image)
1175-
// TODO: handle image stream updates
1236+
case event := <-isUpdateChan:
1237+
p.handleImageStreamEvent(event)
11761238
// TODO: handle new images - do not add them to the queue though
11771239
}
11781240
}
@@ -1220,13 +1282,8 @@ func (p *pruner) getNextJob() (job *Job, blocked bool) {
12201282
blocked = job == nil
12211283

12221284
// remove no longer prunable images from the queue
1223-
for i, n := range toRemove {
1224-
if i < len(toRemove)-1 {
1225-
p.queue = append(p.queue[:n-i], p.queue[n+1-i:toRemove[i+1]-i]...)
1226-
} else {
1227-
p.queue = append(p.queue[:n-i], p.queue[n+1-i:]...)
1228-
}
1229-
}
1285+
// TODO: highly ineffective for a long queue O(N*M), consider the use of doubly linked list
1286+
p.queue = removeImageNodesAtIndexes(p.queue, toRemove...)
12301287

12311288
return
12321289
}

0 commit comments

Comments
 (0)