Commit bc81a86

Abstract the logic of the TrafficDistribution test
Split the logic of creating the clients and the servers apart from the logic of checking which clients connect to which servers, and add some extra flexibility to support additional use cases (such as multiple endpoints on the same node).
1 parent: b1a0fea
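
For illustration only (this code is not part of the commit): with the serverPod/clientPod structures introduced in the diff below, the "multiple endpoints on the same node" case mentioned above could be declared by pointing two server pods at one node. A minimal sketch, assuming the diff's types and the PreferClose fallback behavior (clients in zones with no endpoint fall back to all endpoints, and the co-located client sees both same-zone endpoints):

	// Hypothetical variant, not part of this commit: two endpoints on the
	// same node. Every client should then reach both servers, either
	// because both are in its own zone or via the no-local-endpoint
	// fallback.
	serverPods := []*serverPod{
		{node: clientPods[0].node},
		{node: clientPods[0].node}, // second endpoint on the same node
	}
	for _, cp := range clientPods {
		cp.endpoints = serverPods
	}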

1 file changed: +74 −71

test/e2e/network/traffic_distribution.go

@@ -27,6 +27,7 @@ import (
 	discoveryv1 "k8s.io/api/discovery/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/sets"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@@ -92,6 +93,18 @@ var _ = common.SIGDescribe("Traffic Distribution", func() {
 		}
 	}
 
+	// Data structures for tracking server and client pods
+	type serverPod struct {
+		node *v1.Node
+		pod  *v1.Pod
+	}
+
+	type clientPod struct {
+		node      *v1.Node
+		endpoints []*serverPod
+		pod       *v1.Pod
+	}
+
 	////////////////////////////////////////////////////////////////////////////
 	// Main test specifications.
 	////////////////////////////////////////////////////////////////////////////
@@ -109,34 +122,56 @@ var _ = common.SIGDescribe("Traffic Distribution", func() {
 		ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones))
 		nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
 		framework.ExpectNoError(err)
-		nodeForZone := make(map[string]string)
+		nodeForZone := make(map[string]*v1.Node)
 		for _, zone := range zones {
 			found := false
 			for _, node := range nodeList.Items {
 				if zone == node.Labels[v1.LabelTopologyZone] {
 					found = true
-					nodeForZone[zone] = node.GetName()
+					nodeForZone[zone] = &node
 				}
 			}
 			if !found {
 				framework.Failf("could not find a node in zone %q; nodes=\n%v", zone, format.Object(nodeList, 1 /* indent one level */))
 			}
 		}
 
-		ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2]))
-		zoneForServingPod := make(map[string]string)
-		var servingPods []*v1.Pod
+		var clientPods []*clientPod
+		var serverPods []*serverPod
+
+		// We want clients in all three zones
+		for _, node := range nodeForZone {
+			clientPods = append(clientPods, &clientPod{node: node})
+		}
+
+		// and endpoints in the first two zones
+		serverPods = []*serverPod{
+			{node: clientPods[0].node},
+			{node: clientPods[1].node},
+		}
+
+		// The clients with an endpoint in the same zone should only connect to
+		// that endpoint. The client with no endpoint in its zone should connect
+		// to both endpoints.
+		clientPods[0].endpoints = []*serverPod{serverPods[0]}
+		clientPods[1].endpoints = []*serverPod{serverPods[1]}
+		clientPods[2].endpoints = serverPods
+
+		var podsToCreate []*v1.Pod
 		servingPodLabels := map[string]string{"app": f.UniqueName}
-		for _, zone := range zones[:2] {
-			pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname")
-			nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
+		for i, sp := range serverPods {
+			node := sp.node.Name
+			zone := sp.node.Labels[v1.LabelTopologyZone]
+			pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("server-%d-%s", i, node), nil, nil, nil, "serve-hostname")
+			ginkgo.By(fmt.Sprintf("creating a server pod %q on node %q in zone %q", pod.Name, node, zone))
+			nodeSelection := e2epod.NodeSelection{Name: node}
 			e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
 			pod.Labels = servingPodLabels
 
-			servingPods = append(servingPods, pod)
-			zoneForServingPod[pod.Name] = zone
+			sp.pod = pod
+			podsToCreate = append(podsToCreate, pod)
 		}
-		e2epod.NewPodClient(f).CreateBatch(ctx, servingPods)
+		e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate)
 
 		trafficDist := v1.ServiceTrafficDistributionPreferClose
 		svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{
@@ -156,95 +191,63 @@ var _ = common.SIGDescribe("Traffic Distribution", func() {
 		ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution))
 
 		ginkgo.By("waiting for EndpointSlices to be created")
-		err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(servingPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout)
+		err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(serverPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout)
 		framework.ExpectNoError(err)
 		slices := endpointSlicesForService(svc.Name)
 		framework.Logf("got slices:\n%v", format.Object(slices, 1))
 
-		ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone")
-
-		createClientPod := func(ctx context.Context, zone string) *v1.Pod {
-			pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil)
-			nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
+		podsToCreate = nil
+		for i, cp := range clientPods {
+			node := cp.node.Name
+			zone := cp.node.Labels[v1.LabelTopologyZone]
+			pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("client-%d-%s", i, node), nil, nil, nil)
+			ginkgo.By(fmt.Sprintf("creating a client pod %q on node %q in zone %q", pod.Name, node, zone))
+			nodeSelection := e2epod.NodeSelection{Name: node}
 			e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
 			cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
 			pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
 			pod.Spec.Containers[0].Name = pod.Name
 
-			return e2epod.NewPodClient(f).CreateSync(ctx, pod)
+			cp.pod = pod
+			podsToCreate = append(podsToCreate, pod)
 		}
+		e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate)
 
-		for _, clientZone := range zones[:2] {
-			framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone)
-			clientPod := createClientPod(ctx, clientZone)
+		for _, cp := range clientPods {
+			wantedEndpoints := sets.New[string]()
+			for _, sp := range cp.endpoints {
+				wantedEndpoints.Insert(sp.pod.Name)
+			}
+			unreachedEndpoints := wantedEndpoints.Clone()
 
-			framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone)
+			ginkgo.By(fmt.Sprintf("ensuring that requests from %s on %s go to the endpoint(s) %v", cp.pod.Name, cp.node.Name, wantedEndpoints.UnsortedList()))
 
-			requestsSucceedAndStayInSameZone := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
+			requestsSucceed := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
 				logLines := reverseChronologicalLogLines
 				if len(logLines) < 20 {
 					return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
 				}
-				consecutiveSameZone := 0
+				consecutiveSuccessfulRequests := 0
 
 				for _, logLine := range logLines {
 					if logLine == "" || strings.HasPrefix(logLine, "Date:") {
 						continue
 					}
-					destZone, ok := zoneForServingPod[logLine]
-					if !ok {
-						return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
-					}
-					if clientZone != destZone {
-						return gomegaCustomError("expected request from clientPod=%q to stay in it's zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil
+					destEndpoint := logLine
+					if !wantedEndpoints.Has(destEndpoint) {
+						return gomegaCustomError("request from %s should not have reached %s\nreverseChronologicalLogLines=\n%v", cp.pod.Name, destEndpoint, strings.Join(reverseChronologicalLogLines, "\n")), nil
 					}
-					consecutiveSameZone++
-					if consecutiveSameZone >= 10 {
+					consecutiveSuccessfulRequests++
+					unreachedEndpoints.Delete(destEndpoint)
+					if consecutiveSuccessfulRequests >= 10 && len(unreachedEndpoints) == 0 {
 						return nil, nil // Pass condition.
 					}
 				}
 				// Ideally, the matcher would never reach this condition
-				return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
+				return gomegaCustomError("requests didn't meet the required criteria to reach all endpoints %v\nreverseChronologicalLogLines=\n%v", wantedEndpoints.UnsortedList(), strings.Join(reverseChronologicalLogLines, "\n")), nil
			})
 
-			gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone)
+			gomega.Eventually(ctx, requestsFromClient(cp.pod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceed)
 		}
-
-		ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client")
-
-		clientZone := zones[2]
-		framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT has a serving pod", clientZone)
-		clientPod := createClientPod(ctx, clientZone)
-
-		framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone)
-
-		requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
-			logLines := reverseChronologicalLogLines
-			if len(logLines) < 20 {
-				return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
-			}
-
-			// Requests are counted as successful when the response read from the log
-			// lines is the name of a recognizable serving pod.
-			consecutiveSuccessfulRequests := 0
-
-			for _, logLine := range logLines {
-				if logLine == "" || strings.HasPrefix(logLine, "Date:") {
-					continue
-				}
-				_, servingPodExists := zoneForServingPod[logLine]
-				if !servingPodExists {
-					return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
-				}
-				consecutiveSuccessfulRequests++
-				if consecutiveSuccessfulRequests >= 10 {
-					return nil, nil // Pass condition
-				}
-			}
-			// Ideally, the matcher would never reach this condition
-			return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
-		})
-
-		gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod)
 	})
 })
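
A side note on the endpoint-tracking pattern the new matcher uses: it relies on the generic string sets from k8s.io/apimachinery/pkg/util/sets. A standalone sketch of that pattern, with invented pod names (not code from this commit):

	// allEndpointsReached mirrors the matcher's bookkeeping: any response
	// outside the wanted set fails, and the check passes only once every
	// wanted endpoint has answered at least once.
	// Requires: import "k8s.io/apimachinery/pkg/util/sets"
	func allEndpointsReached(responses []string, wanted sets.Set[string]) bool {
		unreached := wanted.Clone()
		for _, resp := range responses {
			if !wanted.Has(resp) {
				return false // response from an unexpected endpoint
			}
			unreached.Delete(resp) // mark this endpoint as reached
		}
		return unreached.Len() == 0
	}

For example, allEndpointsReached([]string{"server-0", "server-1"}, sets.New[string]("server-0", "server-1")) returns true, while a response from any other pod name returns false.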
