Skip to content

Commit 3630f4e

Browse files
author
Ravi Sankar Penta
committed
Network policy pod watch changes
- Create pod watch for all namespaces instead of pod watch for each namespace - Use shared informers for pod watch
1 parent 262c7f1 commit 3630f4e

File tree

1 file changed

+79
-91
lines changed

1 file changed

+79
-91
lines changed

pkg/sdn/plugin/networkpolicy.go

Lines changed: 79 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type networkPolicyPlugin struct {
3737
lock sync.Mutex
3838
namespaces map[uint32]*npNamespace
3939
kNamespaces map[string]kapi.Namespace
40+
pods map[ktypes.UID]kapi.Pod
4041

4142
kubeInformers kinternalinformers.SharedInformerFactory
4243
}
@@ -50,9 +51,6 @@ type npNamespace struct {
5051
inUse bool
5152

5253
policies map[ktypes.UID]*npPolicy
53-
54-
pods map[ktypes.UID]kapi.Pod
55-
stopPodWatch chan struct{}
5654
}
5755

5856
// npPolicy is a parsed version of a single NetworkPolicy object
@@ -68,6 +66,7 @@ func NewNetworkPolicyPlugin() osdnPolicy {
6866
return &networkPolicyPlugin{
6967
namespaces: make(map[uint32]*npNamespace),
7068
kNamespaces: make(map[string]kapi.Namespace),
69+
pods: make(map[ktypes.UID]kapi.Pod),
7170
}
7271
}
7372

@@ -101,6 +100,7 @@ func (np *networkPolicyPlugin) Start(node *OsdnNode) error {
101100
}
102101

103102
np.watchNamespaces()
103+
np.watchPods()
104104
go utilwait.Forever(np.watchNetworkPolicies, 0)
105105
return nil
106106
}
@@ -252,92 +252,6 @@ func (np *networkPolicyPlugin) UnrefVNID(vnid uint32) {
252252
np.syncNamespace(npns)
253253
}
254254

255-
// watchPods watches Pod changes in npns until stopPodWatch is triggered. pods
256-
// and stopPodWatch are passed in as arguments rather than being read from npns
257-
// because it's possible another thread will already have cancelled the watch
258-
// (and changed the npns fields) before this function runs.
259-
func (np *networkPolicyPlugin) watchPods(npns *npNamespace, pods map[ktypes.UID]kapi.Pod, stopPodWatch chan struct{}) {
260-
RunNamespacedPodEventQueue(np.node.kClient.Core().RESTClient(), npns.name, stopPodWatch, func(delta cache.Delta) error {
261-
pod := delta.Object.(*kapi.Pod)
262-
glog.V(5).Infof("Watch %s event for Pod %s/%s", delta.Type, pod.Namespace, pod.Name)
263-
264-
// We don't want to grab np.namespacesLock for every Pod.Status change...
265-
// But it's safe to look up oldPod without locking here because no other
266-
// threads modify this map.
267-
oldPod, podExisted := pods[pod.UID]
268-
if pod.Status.PodIP == "" {
269-
delta.Type = cache.Deleted
270-
}
271-
switch delta.Type {
272-
case cache.Sync, cache.Added, cache.Updated:
273-
if podExisted && oldPod.Status.PodIP == pod.Status.PodIP && reflect.DeepEqual(oldPod.Labels, pod.Labels) {
274-
return nil
275-
}
276-
case cache.Deleted:
277-
if !podExisted {
278-
return nil
279-
}
280-
}
281-
282-
glog.V(5).Infof("Re-checking policies after pod %s", delta.Type)
283-
np.lock.Lock()
284-
defer np.lock.Unlock()
285-
286-
// RunNamespacedPodEventQueue() will call this function at least once more
287-
// after the watch is stopped, so verify that our watch is still running
288-
// before changing anything.
289-
if stopPodWatch != npns.stopPodWatch {
290-
return nil
291-
}
292-
293-
if delta.Type == cache.Deleted {
294-
delete(pods, pod.UID)
295-
} else {
296-
pods[pod.UID] = *pod
297-
}
298-
299-
changed := false
300-
for _, npp := range npns.policies {
301-
if npp.watchesPods {
302-
if np.updateNetworkPolicy(npns, &npp.policy) {
303-
changed = true
304-
}
305-
}
306-
}
307-
if changed {
308-
np.syncNamespace(npns)
309-
}
310-
311-
return nil
312-
})
313-
}
314-
315-
func (np *networkPolicyPlugin) podWatchUntilStopped(npns *npNamespace) {
316-
pods := npns.pods
317-
stop := npns.stopPodWatch
318-
go utilwait.Until(func() { np.watchPods(npns, pods, stop) }, 0, stop)
319-
}
320-
321-
func (np *networkPolicyPlugin) updatePodWatch(npns *npNamespace) {
322-
watchesPods := false
323-
for _, npp := range npns.policies {
324-
if npp.watchesPods {
325-
watchesPods = true
326-
break
327-
}
328-
}
329-
330-
if watchesPods && (npns.stopPodWatch == nil) {
331-
npns.pods = make(map[ktypes.UID]kapi.Pod)
332-
npns.stopPodWatch = make(chan struct{})
333-
np.podWatchUntilStopped(npns)
334-
} else if !watchesPods && (npns.stopPodWatch != nil) {
335-
close(npns.stopPodWatch)
336-
npns.stopPodWatch = nil
337-
npns.pods = nil
338-
}
339-
}
340-
341255
func (np *networkPolicyPlugin) selectNamespaces(lsel *metav1.LabelSelector) []uint32 {
342256
vnids := []uint32{}
343257
sel, err := metav1.LabelSelectorAsSelector(lsel)
@@ -364,7 +278,7 @@ func (np *networkPolicyPlugin) selectPods(npns *npNamespace, lsel *metav1.LabelS
364278
glog.Errorf("ValidateNetworkPolicy() failure! Invalid PodSelector: %v", err)
365279
return ips
366280
}
367-
for _, pod := range npns.pods {
281+
for _, pod := range np.pods {
368282
if sel.Matches(labels.Set(pod.Labels)) {
369283
ips = append(ips, pod.Status.PodIP)
370284
}
@@ -464,7 +378,6 @@ func (np *networkPolicyPlugin) updateNetworkPolicy(npns *npNamespace, policy *ex
464378

465379
oldNPP, existed := npns.policies[policy.UID]
466380
npns.policies[policy.UID] = npp
467-
np.updatePodWatch(npns)
468381

469382
changed := !existed || !reflect.DeepEqual(oldNPP.flows, npp.flows)
470383
if !changed {
@@ -540,6 +453,81 @@ func namespaceIsIsolated(ns *kapi.Namespace) bool {
540453
}
541454
}
542455

456+
func (np *networkPolicyPlugin) watchPods() {
457+
podInformer := np.kubeInformers.Core().InternalVersion().Pods()
458+
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
459+
AddFunc: func(obj interface{}) {
460+
pod := obj.(*kapi.Pod)
461+
np.handleAddOrUpdatePod(pod, watch.Added)
462+
},
463+
UpdateFunc: func(_, cur interface{}) {
464+
pod := cur.(*kapi.Pod)
465+
np.handleAddOrUpdatePod(pod, watch.Modified)
466+
},
467+
DeleteFunc: func(obj interface{}) {
468+
obj, err := getDeletedObjFromInformer(obj, Pods)
469+
if err != nil {
470+
glog.Error(err)
471+
return
472+
}
473+
pod := obj.(*kapi.Pod)
474+
np.handleDeletePod(pod)
475+
},
476+
})
477+
}
478+
479+
func (np *networkPolicyPlugin) handleAddOrUpdatePod(pod *kapi.Pod, eventType watch.EventType) {
480+
glog.V(5).Infof("Watch %s event for Pod '%s/%s'", eventType, pod.Namespace, pod.Name)
481+
482+
// We don't want to grab np.Lock for every Pod.Status change...
483+
// But it's safe to look up oldPod without locking here because no other
484+
// threads modify this map.
485+
oldPod, podExisted := np.pods[pod.UID]
486+
if pod.Status.PodIP == "" {
487+
return
488+
} else if podExisted && oldPod.Status.PodIP == pod.Status.PodIP && reflect.DeepEqual(oldPod.Labels, pod.Labels) {
489+
return
490+
}
491+
492+
np.lock.Lock()
493+
defer np.lock.Unlock()
494+
495+
np.pods[pod.UID] = *pod
496+
np.refreshNetworkPolicies()
497+
}
498+
499+
func (np *networkPolicyPlugin) handleDeletePod(pod *kapi.Pod) {
500+
glog.V(5).Infof("Watch %s event for Pod '%s/%s'", watch.Deleted, pod.Namespace, pod.Name)
501+
502+
_, podExisted := np.pods[pod.UID]
503+
if !podExisted || (pod.Status.PodIP == "") {
504+
return
505+
}
506+
507+
np.lock.Lock()
508+
defer np.lock.Unlock()
509+
510+
delete(np.pods, pod.UID)
511+
np.refreshNetworkPolicies()
512+
}
513+
514+
func (np *networkPolicyPlugin) refreshNetworkPolicies() {
515+
for _, npns := range np.namespaces {
516+
changed := false
517+
for _, npp := range npns.policies {
518+
if npp.watchesPods {
519+
if np.updateNetworkPolicy(npns, &npp.policy) {
520+
changed = true
521+
break
522+
}
523+
}
524+
}
525+
if changed {
526+
np.syncNamespace(npns)
527+
}
528+
}
529+
}
530+
543531
func (np *networkPolicyPlugin) watchNamespaces() {
544532
nsInformer := np.kubeInformers.Core().InternalVersion().Namespaces()
545533
nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{

0 commit comments

Comments
 (0)