Skip to content

Commit 15b2d2e

Browse files
stttssoltysh
authored andcommitted
UPSTREAM: <carry>: create termination events
Origin-commit: a869af0c97e3d97bddedcd76af8a62da6c879c02 UPSTREAM: <carry>: apiserver: log new connections during termination Origin-commit: 89d1c3ceeb91755aae9099cd5f76c42a22de18c5 UPSTREAM: <carry>: apiserver: create LateConnections events on events in the last 20% of graceful termination time Origin-commit: 91bc33b6ddf9e1d80906717db5bd9096183e8795 UPSTREAM: <carry>: apiserver: log source in LateConnections event Origin-commit: 575e54740eb7c2ba635c73f24c22ad77cb5a6e70 UPSTREAM: <carry>: apiserver: skip local IPs and probes for LateConnections Origin-commit: 2109b95866e81b84a290f34f0806becc2cbd83e9 UPSTREAM: <carry>: only create valid LateConnections/GracefulTermination events UPSTREAM: <carry>: kube-apiserver: log non-probe requests before ready UPSTREAM: <carry>: apiserver: create hasBeenReadyCh channel UPSTREAM: <carry>: kube-apiserver: log non-probe requests before ready UPSTREAM: <carry>: kube-apiserver: log non-probe requests before ready UPSTREAM: <carry>: fix termination event(s) validation failures UPSTREAM: <carry>: during the rebase collapse to create termination event it makes recording termination events a non-blocking operation. previously closing delayedStopCh might have been delayed on preserving data in the storage. the delayedStopCh is important as it signals the HTTP server to start the shutdown procedure. it also sets a hard timeout of 3 seconds for the storage layer since we are bypassing the API layer. openshift-rebase(v1.24):source=7b9aa03e491 UPSTREAM: <carry>: rename termination events to use lifecycleSignals openshift-rebase(v1.24):source=e90b78a9199
1 parent a736659 commit 15b2d2e

File tree

4 files changed

+350
-2
lines changed

4 files changed

+350
-2
lines changed

cmd/kube-apiserver/app/server.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ limitations under the License.
2020
package app
2121

2222
import (
23+
"context"
2324
"crypto/tls"
2425
"fmt"
2526
"net"
@@ -37,7 +38,9 @@ import (
3738

3839
oteltrace "go.opentelemetry.io/otel/trace"
3940

41+
corev1 "k8s.io/api/core/v1"
4042
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
43+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4144
utilerrors "k8s.io/apimachinery/pkg/util/errors"
4245
utilnet "k8s.io/apimachinery/pkg/util/net"
4346
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -46,6 +49,7 @@ import (
4649
"k8s.io/apiserver/pkg/authorization/authorizer"
4750
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
4851
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
52+
"k8s.io/apiserver/pkg/endpoints/request"
4953
genericfeatures "k8s.io/apiserver/pkg/features"
5054
genericapiserver "k8s.io/apiserver/pkg/server"
5155
"k8s.io/apiserver/pkg/server/egressselector"
@@ -76,6 +80,8 @@ import (
7680

7781
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
7882
"k8s.io/kubernetes/pkg/api/legacyscheme"
83+
"k8s.io/kubernetes/pkg/apis/core"
84+
v1 "k8s.io/kubernetes/pkg/apis/core/v1"
7985
"k8s.io/kubernetes/pkg/capabilities"
8086
"k8s.io/kubernetes/pkg/controlplane"
8187
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
@@ -84,6 +90,7 @@ import (
8490
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
8591
kubeauthenticator "k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
8692
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
93+
eventstorage "k8s.io/kubernetes/pkg/registry/core/event/storage"
8794
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
8895
"k8s.io/kubernetes/pkg/serviceaccount"
8996
)
@@ -297,6 +304,13 @@ func CreateKubeAPIServerConfig(s completedServerRunOptions) (
297304
s.Metrics.Apply()
298305
serviceaccount.RegisterMetrics()
299306

307+
var eventStorage *eventstorage.REST
308+
eventStorage, err = eventstorage.NewREST(genericConfig.RESTOptionsGetter, uint64(s.EventTTL.Seconds()))
309+
if err != nil {
310+
return nil, nil, nil, err
311+
}
312+
genericConfig.EventSink = eventRegistrySink{eventStorage}
313+
300314
config := &controlplane.Config{
301315
GenericConfig: genericConfig,
302316
ExtraConfig: controlplane.ExtraConfig{
@@ -744,3 +758,38 @@ func getServiceIPAndRanges(serviceClusterIPRanges string) (net.IP, net.IPNet, ne
744758
}
745759
return apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, nil
746760
}
761+
762+
// eventRegistrySink wraps an event registry in order to be used as direct event sync, without going through the API.
763+
type eventRegistrySink struct {
764+
*eventstorage.REST
765+
}
766+
767+
var _ genericapiserver.EventSink = eventRegistrySink{}
768+
769+
func (s eventRegistrySink) Create(v1event *corev1.Event) (*corev1.Event, error) {
770+
ctx := request.WithNamespace(request.WithRequestInfo(request.NewContext(), &request.RequestInfo{APIVersion: "v1"}), v1event.Namespace)
771+
// since we are bypassing the API set a hard timeout for the storage layer
772+
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
773+
defer cancel()
774+
775+
var event core.Event
776+
if err := v1.Convert_v1_Event_To_core_Event(v1event, &event, nil); err != nil {
777+
return nil, err
778+
}
779+
780+
obj, err := s.REST.Create(ctx, &event, nil, &metav1.CreateOptions{})
781+
if err != nil {
782+
return nil, err
783+
}
784+
ret, ok := obj.(*core.Event)
785+
if !ok {
786+
return nil, fmt.Errorf("expected corev1.Event, got %T", obj)
787+
}
788+
789+
var v1ret corev1.Event
790+
if err := v1.Convert_core_Event_To_v1_Event(ret, &v1ret, nil); err != nil {
791+
return nil, err
792+
}
793+
794+
return &v1ret, nil
795+
}

staging/src/k8s.io/apiserver/pkg/server/config.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ package server
1818

1919
import (
2020
"fmt"
21+
"io/ioutil"
2122
"net"
2223
"net/http"
24+
"os"
2325
goruntime "runtime"
2426
"runtime/debug"
2527
"sort"
@@ -32,6 +34,7 @@ import (
3234
"github.com/google/uuid"
3335
oteltrace "go.opentelemetry.io/otel/trace"
3436

37+
corev1 "k8s.io/api/core/v1"
3538
"k8s.io/apimachinery/pkg/runtime"
3639
"k8s.io/apimachinery/pkg/runtime/schema"
3740
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -65,6 +68,8 @@ import (
6568
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
6669
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
6770
"k8s.io/client-go/informers"
71+
"k8s.io/client-go/kubernetes"
72+
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
6873
restclient "k8s.io/client-go/rest"
6974
"k8s.io/component-base/logs"
7075
"k8s.io/klog/v2"
@@ -239,6 +244,9 @@ type Config struct {
239244
// rejected with a 429 status code and a 'Retry-After' response.
240245
ShutdownSendRetryAfter bool
241246

247+
// EventSink receives events about the life cycle of the API server, e.g. readiness, serving, signals and termination.
248+
EventSink EventSink
249+
242250
//===========================================================================
243251
// values below here are targets for removal
244252
//===========================================================================
@@ -259,6 +267,11 @@ type Config struct {
259267
StorageVersionManager storageversion.Manager
260268
}
261269

270+
// EventSink allows to create events.
271+
type EventSink interface {
272+
Create(event *corev1.Event) (*corev1.Event, error)
273+
}
274+
262275
type RecommendedConfig struct {
263276
Config
264277

@@ -562,6 +575,10 @@ func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedCo
562575
c.DiscoveryAddresses = discovery.DefaultAddresses{DefaultAddress: c.ExternalAddress}
563576
}
564577

578+
if c.EventSink == nil {
579+
c.EventSink = nullEventSink{}
580+
}
581+
565582
AuthorizeClientBearerToken(c.LoopbackClientConfig, &c.Authentication, &c.Authorization)
566583

567584
if c.RequestInfoResolver == nil {
@@ -589,9 +606,58 @@ func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedCo
589606
// Complete fills in any fields not set that are required to have valid data and can be derived
590607
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
591608
func (c *RecommendedConfig) Complete() CompletedConfig {
609+
if c.ClientConfig != nil {
610+
ref, err := eventReference()
611+
if err != nil {
612+
klog.Warningf("Failed to derive event reference, won't create events: %v", err)
613+
c.EventSink = nullEventSink{}
614+
} else {
615+
ns := ref.Namespace
616+
if len(ns) == 0 {
617+
ns = "default"
618+
}
619+
c.EventSink = &v1.EventSinkImpl{
620+
Interface: kubernetes.NewForConfigOrDie(c.ClientConfig).CoreV1().Events(ns),
621+
}
622+
}
623+
}
624+
592625
return c.Config.Complete(c.SharedInformerFactory)
593626
}
594627

628+
func eventReference() (*corev1.ObjectReference, error) {
629+
ns := os.Getenv("POD_NAMESPACE")
630+
pod := os.Getenv("POD_NAME")
631+
if len(ns) == 0 && len(pod) > 0 {
632+
serviceAccountNamespaceFile := "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
633+
if _, err := os.Stat(serviceAccountNamespaceFile); err == nil {
634+
bs, err := ioutil.ReadFile(serviceAccountNamespaceFile)
635+
if err != nil {
636+
return nil, err
637+
}
638+
ns = string(bs)
639+
}
640+
}
641+
if len(ns) == 0 {
642+
pod = ""
643+
ns = "kube-system"
644+
}
645+
if len(pod) == 0 {
646+
return &corev1.ObjectReference{
647+
Kind: "Namespace",
648+
Name: ns,
649+
APIVersion: "v1",
650+
}, nil
651+
}
652+
653+
return &corev1.ObjectReference{
654+
Kind: "Pod",
655+
Namespace: ns,
656+
Name: pod,
657+
APIVersion: "v1",
658+
}, nil
659+
}
660+
595661
// New creates a new server which logically combines the handling chain with the passed server.
596662
// name is used to differentiate for logging. The handler chain in particular can be difficult as it starts delegating.
597663
// delegationTarget may not be nil.
@@ -660,7 +726,16 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
660726
Version: c.Version,
661727

662728
muxAndDiscoveryCompleteSignals: map[string]<-chan struct{}{},
729+
730+
eventSink: c.EventSink,
731+
}
732+
733+
ref, err := eventReference()
734+
if err != nil {
735+
klog.Warningf("Failed to derive event reference, won't create events: %v", err)
736+
c.EventSink = nullEventSink{}
663737
}
738+
s.eventRef = ref
664739

665740
for {
666741
if c.JSONPatchMaxCopyBytes <= 0 {
@@ -846,6 +921,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
846921
handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
847922
c.LongRunningFunc, c.Serializer, c.RequestTimeout)
848923
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
924+
handler = WithNonReadyRequestLogging(handler, c.lifecycleSignals.HasBeenReady)
925+
handler = WithLateConnectionFilter(handler)
849926
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
850927
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
851928
}
@@ -963,3 +1040,9 @@ func AuthorizeClientBearerToken(loopback *restclient.Config, authn *Authenticati
9631040
authz.Authorizer = authorizerunion.New(tokenAuthorizer, authz.Authorizer)
9641041
}
9651042
}
1043+
1044+
type nullEventSink struct{}
1045+
1046+
func (nullEventSink) Create(event *corev1.Event) (*corev1.Event, error) {
1047+
return nil, nil
1048+
}

staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ package server
1919
import (
2020
"fmt"
2121
"net/http"
22+
"os"
2223
gpath "path"
2324
"strings"
2425
"sync"
2526
"time"
2627

2728
systemd "github.com/coreos/go-systemd/v22/daemon"
2829

30+
corev1 "k8s.io/api/core/v1"
2931
"k8s.io/apimachinery/pkg/api/meta"
3032
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3133
"k8s.io/apimachinery/pkg/runtime"
@@ -247,6 +249,10 @@ type GenericAPIServer struct {
247249
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
248250
// rejected with a 429 status code and a 'Retry-After' response.
249251
ShutdownSendRetryAfter bool
252+
253+
// EventSink creates events.
254+
eventSink EventSink
255+
eventRef *corev1.ObjectReference
250256
}
251257

252258
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
@@ -482,7 +488,10 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
482488

483489
go func() {
484490
defer delayedStopCh.Signal()
485-
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
491+
defer func() {
492+
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
493+
s.Eventf(corev1.EventTypeNormal, delayedStopCh.Name(), "The minimal shutdown duration of %v finished", s.ShutdownDelayDuration)
494+
}()
486495

487496
<-stopCh
488497

@@ -491,10 +500,28 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
491500
// and stop sending traffic to this server.
492501
shutdownInitiatedCh.Signal()
493502
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", shutdownInitiatedCh.Name())
503+
s.Eventf(corev1.EventTypeNormal, shutdownInitiatedCh.Name(), "Received signal to terminate, becoming unready, but keeping serving")
494504

495505
time.Sleep(s.ShutdownDelayDuration)
496506
}()
497507

508+
lateStopCh := make(chan struct{})
509+
if s.ShutdownDelayDuration > 0 {
510+
go func() {
511+
defer close(lateStopCh)
512+
513+
<-stopCh
514+
515+
time.Sleep(s.ShutdownDelayDuration * 8 / 10)
516+
}()
517+
}
518+
519+
s.SecureServingInfo.Listener = &terminationLoggingListener{
520+
Listener: s.SecureServingInfo.Listener,
521+
lateStopCh: lateStopCh,
522+
}
523+
unexpectedRequestsEventf.Store(s.Eventf)
524+
498525
// close socket after delayed stopCh
499526
shutdownTimeout := s.ShutdownTimeout
500527
if s.ShutdownSendRetryAfter {
@@ -541,13 +568,17 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
541568
<-listenerStoppedCh
542569
httpServerStoppedListeningCh.Signal()
543570
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", httpServerStoppedListeningCh.Name())
571+
s.Eventf(corev1.EventTypeNormal, httpServerStoppedListeningCh.Name(), "HTTP Server has stopped listening")
544572
}()
545573

546574
// we don't accept new request as soon as both ShutdownDelayDuration has
547575
// elapsed and preshutdown hooks have completed.
548576
preShutdownHooksHasStoppedCh := s.lifecycleSignals.PreShutdownHooksStopped
549577
go func() {
550-
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", notAcceptingNewRequestCh.Name())
578+
defer func() {
579+
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", notAcceptingNewRequestCh.Name())
580+
s.Eventf(corev1.EventTypeNormal, drainedCh.Name(), "All non long-running request(s) in-flight have drained")
581+
}()
551582
defer notAcceptingNewRequestCh.Signal()
552583

553584
// wait for the delayed stopCh before closing the handler chain
@@ -591,6 +622,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
591622
defer func() {
592623
preShutdownHooksHasStoppedCh.Signal()
593624
klog.V(1).InfoS("[graceful-termination] pre-shutdown hooks completed", "name", preShutdownHooksHasStoppedCh.Name())
625+
s.Eventf(corev1.EventTypeNormal, "TerminationPreShutdownHooksFinished", "All pre-shutdown hooks have been finished")
594626
}()
595627
err = s.RunPreShutdownHooks()
596628
}()
@@ -611,6 +643,8 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
611643
<-stoppedCh
612644

613645
klog.V(1).Info("[graceful-termination] apiserver is exiting")
646+
s.Eventf(corev1.EventTypeNormal, "TerminationGracefulTerminationFinished", "All pending requests processed")
647+
614648
return nil
615649
}
616650

@@ -883,3 +917,33 @@ func getResourceNamesForGroup(apiPrefix string, apiGroupInfo *APIGroupInfo, path
883917

884918
return resourceNames, nil
885919
}
920+
921+
// Eventf creates an event with the API server as source, either in default namespace against default namespace, or
922+
// if POD_NAME/NAMESPACE are set against that pod.
923+
func (s *GenericAPIServer) Eventf(eventType, reason, messageFmt string, args ...interface{}) {
924+
t := metav1.Time{Time: time.Now()}
925+
host, _ := os.Hostname() // expicitly ignore error. Empty host is fine
926+
927+
ref := *s.eventRef
928+
if len(ref.Namespace) == 0 {
929+
ref.Namespace = "default" // TODO: event broadcaster sets event ns to default. We have to match. Odd.
930+
}
931+
932+
e := &corev1.Event{
933+
ObjectMeta: metav1.ObjectMeta{
934+
Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
935+
Namespace: ref.Namespace,
936+
},
937+
InvolvedObject: ref,
938+
Reason: reason,
939+
Message: fmt.Sprintf(messageFmt, args...),
940+
Type: eventType,
941+
Source: corev1.EventSource{Component: "apiserver", Host: host},
942+
}
943+
944+
klog.V(2).Infof("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
945+
946+
if _, err := s.eventSink.Create(e); err != nil {
947+
klog.Warningf("failed to create event %s/%s: %v", e.Namespace, e.Name, err)
948+
}
949+
}

0 commit comments

Comments
 (0)