From 9ca18f708a5964abdd5344924501ab1e7e9ed5b6 Mon Sep 17 00:00:00 2001 From: "chang.qiangqiang" Date: Wed, 26 Feb 2025 00:12:50 +0800 Subject: [PATCH] fix(detector): add ratelimiteroptions for controller options Signed-off-by: chang.qiangqiang --- .../app/controllermanager.go | 56 ++++++++++++------- .../rb_application_failover_controller.go | 6 +- .../approver/agent_csr_approving.go | 6 +- pkg/controllers/cluster/cluster_controller.go | 8 ++- pkg/controllers/cluster/taint_manager.go | 6 +- .../deployment_replicas_syncer_controller.go | 4 +- ...erated_resource_quota_status_controller.go | 10 +++- ...ederated_resource_quota_sync_controller.go | 10 +++- .../hpa_scale_target_marker_controller.go | 6 ++ .../mcs/endpointslice_controller.go | 8 ++- .../mcs/service_export_controller.go | 11 +++- .../mcs/service_import_controller.go | 6 +- .../endpointslice_collect_controller.go | 7 ++- .../endpointslice_dispatch_controller.go | 10 +++- .../namespace/namespace_sync_controller.go | 6 ++ .../unifiedauth/unified_auth_controller.go | 8 ++- .../workloadrebalancer_controller.go | 6 +- .../dependencies_distributor.go | 3 +- pkg/detector/detector.go | 14 +++-- 19 files changed, 142 insertions(+), 49 deletions(-) diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index e7068b00567f..5d2822464ae6 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -255,6 +255,7 @@ func startClusterController(ctx controllerscontext.Context) (enabled bool, err e EnableTaintManager: ctx.Opts.EnableTaintManager, ClusterTaintEvictionRetryFrequency: 10 * time.Second, ExecutionSpaceRetryFrequency: 10 * time.Second, + RateLimiterOption: ctx.Opts.RateLimiterOptions, } if err := clusterController.SetupWithManager(mgr); err != nil { return false, err @@ -269,6 +270,7 @@ func startClusterController(ctx controllerscontext.Context) (enabled bool, err e EventRecorder: mgr.GetEventRecorderFor(cluster.TaintManagerName), ClusterTaintEvictionRetryFrequency: 10 * time.Second, ConcurrentReconciles: 3, + RateLimiterOptions: ctx.Opts.RateLimiterOptions, } if err := taintManager.SetupWithManager(mgr); err != nil { return false, err @@ -447,6 +449,7 @@ func startNamespaceController(ctx controllerscontext.Context) (enabled bool, err EventRecorder: ctx.Mgr.GetEventRecorderFor(namespace.ControllerName), SkippedPropagatingNamespaces: ctx.Opts.SkippedPropagatingNamespaces, OverrideManager: ctx.OverrideManager, + RateLimitOptions: ctx.Opts.RateLimiterOptions, } if err := namespaceSyncController.SetupWithManager(ctx.Mgr); err != nil { return false, err @@ -466,6 +469,7 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool, PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, + RatelimiterOption: ctx.Opts.RateLimiterOptions, } serviceExportController.RunWorkQueue() if err := serviceExportController.SetupWithManager(ctx.Mgr); err != nil { @@ -488,6 +492,7 @@ func startEndpointSliceCollectController(ctx controllerscontext.Context) (enable PredicateFunc: helper.NewPredicateForEndpointSliceCollectController(ctx.Mgr), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, + RatelimiterOption: ctx.Opts.RateLimiterOptions, } endpointSliceCollectController.RunWorkQueue() if err := endpointSliceCollectController.SetupWithManager(ctx.Mgr); err != nil { @@ -501,10 +506,11 @@ func startEndpointSliceDispatchController(ctx controllerscontext.Context) (enabl return false, nil } endpointSliceSyncController := &multiclusterservice.EndpointsliceDispatchController{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(multiclusterservice.EndpointsliceDispatchControllerName), - RESTMapper: ctx.Mgr.GetRESTMapper(), - InformerManager: genericmanager.GetInstance(), + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(multiclusterservice.EndpointsliceDispatchControllerName), + RESTMapper: ctx.Mgr.GetRESTMapper(), + InformerManager: genericmanager.GetInstance(), + RatelimiterOption: ctx.Opts.RateLimiterOptions, } if err := endpointSliceSyncController.SetupWithManager(ctx.Mgr); err != nil { return false, err @@ -514,8 +520,9 @@ func startEndpointSliceDispatchController(ctx controllerscontext.Context) (enabl func startEndpointSliceController(ctx controllerscontext.Context) (enabled bool, err error) { endpointSliceController := &mcs.EndpointSliceController{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.EndpointSliceControllerName), + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.EndpointSliceControllerName), + RatelimiterOption: ctx.Opts.RateLimiterOptions, } if err := endpointSliceController.SetupWithManager(ctx.Mgr); err != nil { return false, err @@ -525,8 +532,9 @@ func startEndpointSliceController(ctx controllerscontext.Context) (enabled bool, func startServiceImportController(ctx controllerscontext.Context) (enabled bool, err error) { serviceImportController := &mcs.ServiceImportController{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceImportControllerName), + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceImportControllerName), + RatelimiterOption: ctx.Opts.RateLimiterOptions, } if err := serviceImportController.SetupWithManager(ctx.Mgr); err != nil { return false, err @@ -536,8 +544,9 @@ func startServiceImportController(ctx controllerscontext.Context) (enabled bool, func startUnifiedAuthController(ctx controllerscontext.Context) (enabled bool, err error) { unifiedAuthController := &unifiedauth.Controller{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(unifiedauth.ControllerName), + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(unifiedauth.ControllerName), + RatelimiterOption: ctx.Opts.RateLimiterOptions, } if err := unifiedAuthController.SetupWithManager(ctx.Mgr); err != nil { return false, err @@ -547,8 +556,9 @@ func startUnifiedAuthController(ctx controllerscontext.Context) (enabled bool, e func startFederatedResourceQuotaSyncController(ctx controllerscontext.Context) (enabled bool, err error) { controller := federatedresourcequota.SyncController{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(federatedresourcequota.SyncControllerName), + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(federatedresourcequota.SyncControllerName), + RatelimiterOption: ctx.Opts.RateLimiterOptions, } if err = controller.SetupWithManager(ctx.Mgr); err != nil { return false, err @@ -558,8 +568,9 @@ func startFederatedResourceQuotaSyncController(ctx controllerscontext.Context) ( func startFederatedResourceQuotaStatusController(ctx controllerscontext.Context) (enabled bool, err error) { controller := federatedresourcequota.StatusController{ - Client: ctx.Mgr.GetClient(), - EventRecorder: ctx.Mgr.GetEventRecorderFor(federatedresourcequota.StatusControllerName), + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(federatedresourcequota.StatusControllerName), + RateLimiterOptions: ctx.Opts.RateLimiterOptions, } if err = controller.SetupWithManager(ctx.Mgr); err != nil { return false, err @@ -596,6 +607,7 @@ func startApplicationFailoverController(ctx controllerscontext.Context) (enabled Client: ctx.Mgr.GetClient(), EventRecorder: ctx.Mgr.GetEventRecorderFor(applicationfailover.RBApplicationFailoverControllerName), ResourceInterpreter: ctx.ResourceInterpreter, + RateLimiterOption: ctx.Opts.RateLimiterOptions, } if err = rbApplicationFailoverController.SetupWithManager(ctx.Mgr); err != nil { return false, err @@ -659,8 +671,9 @@ func startCronFederatedHorizontalPodAutoscalerController(ctx controllerscontext. func startHPAScaleTargetMarkerController(ctx controllerscontext.Context) (enabled bool, err error) { hpaScaleTargetMarker := hpascaletargetmarker.HpaScaleTargetMarker{ - DynamicClient: ctx.DynamicClientSet, - RESTMapper: ctx.Mgr.GetRESTMapper(), + DynamicClient: ctx.DynamicClientSet, + RESTMapper: ctx.Mgr.GetRESTMapper(), + RatelimiterOption: ctx.Opts.RateLimiterOptions, } err = hpaScaleTargetMarker.SetupWithManager(ctx.Mgr) if err != nil { @@ -672,7 +685,8 @@ func startHPAScaleTargetMarkerController(ctx controllerscontext.Context) (enable func startDeploymentReplicasSyncerController(ctx controllerscontext.Context) (enabled bool, err error) { deploymentReplicasSyncer := deploymentreplicassyncer.DeploymentReplicasSyncer{ - Client: ctx.Mgr.GetClient(), + Client: ctx.Mgr.GetClient(), + RateLimiterOptions: ctx.Opts.RateLimiterOptions, } err = deploymentReplicasSyncer.SetupWithManager(ctx.Mgr) if err != nil { @@ -710,7 +724,8 @@ func startRemedyController(ctx controllerscontext.Context) (enabled bool, err er func startWorkloadRebalancerController(ctx controllerscontext.Context) (enabled bool, err error) { workloadRebalancer := workloadrebalancer.RebalancerController{ - Client: ctx.Mgr.GetClient(), + Client: ctx.Mgr.GetClient(), + RatelimiterOption: ctx.Opts.RateLimiterOptions, } err = workloadRebalancer.SetupWithManager(ctx.Mgr) if err != nil { @@ -721,7 +736,10 @@ func startWorkloadRebalancerController(ctx controllerscontext.Context) (enabled } func startAgentCSRApprovingController(ctx controllerscontext.Context) (enabled bool, err error) { - agentCSRApprover := approver.AgentCSRApprovingController{Client: ctx.KubeClientSet} + agentCSRApprover := approver.AgentCSRApprovingController{ + Client: ctx.KubeClientSet, + RatelimiterOption: ctx.Opts.RateLimiterOptions, + } err = agentCSRApprover.SetupWithManager(ctx.Mgr) if err != nil { return false, err diff --git a/pkg/controllers/applicationfailover/rb_application_failover_controller.go b/pkg/controllers/applicationfailover/rb_application_failover_controller.go index dd5f86048efc..390178b58b8b 100644 --- a/pkg/controllers/applicationfailover/rb_application_failover_controller.go +++ b/pkg/controllers/applicationfailover/rb_application_failover_controller.go @@ -47,8 +47,8 @@ const RBApplicationFailoverControllerName = "resource-binding-application-failov // RBApplicationFailoverController is to sync ResourceBinding's application failover behavior. type RBApplicationFailoverController struct { client.Client - EventRecorder record.EventRecorder - RateLimiterOptions ratelimiterflag.Options + EventRecorder record.EventRecorder + RateLimiterOption ratelimiterflag.Options // workloadUnhealthyMap records which clusters the specific resource is in an unhealthy state workloadUnhealthyMap *workloadUnhealthyMap @@ -209,7 +209,7 @@ func (c *RBApplicationFailoverController) SetupWithManager(mgr controllerruntime return controllerruntime.NewControllerManagedBy(mgr). Named(RBApplicationFailoverControllerName). For(&workv1alpha2.ResourceBinding{}, builder.WithPredicates(resourceBindingPredicateFn)). - WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RateLimiterOptions)}). + WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RateLimiterOption)}). Complete(c) } diff --git a/pkg/controllers/certificate/approver/agent_csr_approving.go b/pkg/controllers/certificate/approver/agent_csr_approving.go index f42c2db62d60..aa3671611fbf 100644 --- a/pkg/controllers/certificate/approver/agent_csr_approving.go +++ b/pkg/controllers/certificate/approver/agent_csr_approving.go @@ -33,9 +33,11 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util/certificate" ) @@ -47,7 +49,8 @@ const ( // AgentCSRApprovingController is used to automatically approve the agent's CSR. type AgentCSRApprovingController struct { - Client kubernetes.Interface + Client kubernetes.Interface + RatelimiterOption ratelimiterflag.Options } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -289,5 +292,6 @@ func (a *AgentCSRApprovingController) SetupWithManager(mgr controllerruntime.Man return controllerruntime.NewControllerManagedBy(mgr). Named(csrApprovingController). For(&certificatesv1.CertificateSigningRequest{}, builder.WithPredicates(predicateFunc)). + WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](a.RatelimiterOption)}). Complete(a) } diff --git a/pkg/controllers/cluster/cluster_controller.go b/pkg/controllers/cluster/cluster_controller.go index 0c0b4bdf6f2f..2b6c1c245312 100644 --- a/pkg/controllers/cluster/cluster_controller.go +++ b/pkg/controllers/cluster/cluster_controller.go @@ -38,6 +38,7 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" @@ -45,6 +46,7 @@ import ( workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/events" "github.com/karmada-io/karmada/pkg/features" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" utilhelper "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/names" @@ -119,7 +121,8 @@ type Controller struct { ExecutionSpaceRetryFrequency time.Duration // Per Cluster map stores last observed health together with a local time when it was observed. - clusterHealthMap *clusterHealthMap + clusterHealthMap *clusterHealthMap + RateLimiterOption ratelimiterflag.Options } type clusterHealthMap struct { @@ -215,7 +218,8 @@ func (c *Controller) Start(ctx context.Context) error { func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { c.clusterHealthMap = newClusterHealthMap() return utilerrors.NewAggregate([]error{ - controllerruntime.NewControllerManagedBy(mgr).Named(ControllerName).For(&clusterv1alpha1.Cluster{}).Complete(c), + controllerruntime.NewControllerManagedBy(mgr).Named(ControllerName).For(&clusterv1alpha1.Cluster{}). + WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RateLimiterOption)}).Complete(c), mgr.Add(c), }) } diff --git a/pkg/controllers/cluster/taint_manager.go b/pkg/controllers/cluster/taint_manager.go index 6d4c53768e25..953f93c40a10 100644 --- a/pkg/controllers/cluster/taint_manager.go +++ b/pkg/controllers/cluster/taint_manager.go @@ -29,12 +29,14 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/reconcile" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/features" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" "github.com/karmada-io/karmada/pkg/util/helper" @@ -54,6 +56,7 @@ type NoExecuteTaintManager struct { bindingEvictionWorker util.AsyncWorker clusterBindingEvictionWorker util.AsyncWorker + RateLimiterOptions ratelimiterflag.Options } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -298,7 +301,8 @@ func (tc *NoExecuteTaintManager) needEviction(clusterName string, annotations ma // SetupWithManager creates a controller and register to controller manager. func (tc *NoExecuteTaintManager) SetupWithManager(mgr controllerruntime.Manager) error { return utilerrors.NewAggregate([]error{ - controllerruntime.NewControllerManagedBy(mgr).Named(TaintManagerName).For(&clusterv1alpha1.Cluster{}).Complete(tc), + controllerruntime.NewControllerManagedBy(mgr).Named(TaintManagerName).For(&clusterv1alpha1.Cluster{}). + WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](tc.RateLimiterOptions)}).Complete(tc), mgr.Add(tc), }) } diff --git a/pkg/controllers/deploymentreplicassyncer/deployment_replicas_syncer_controller.go b/pkg/controllers/deploymentreplicassyncer/deployment_replicas_syncer_controller.go index d494da56fbfc..a64f0546261f 100644 --- a/pkg/controllers/deploymentreplicassyncer/deployment_replicas_syncer_controller.go +++ b/pkg/controllers/deploymentreplicassyncer/deployment_replicas_syncer_controller.go @@ -32,6 +32,7 @@ import ( policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/names" ) @@ -45,7 +46,8 @@ const ( // DeploymentReplicasSyncer is to sync deployment replicas from status field to spec field. type DeploymentReplicasSyncer struct { - Client client.Client + Client client.Client + RateLimiterOptions ratelimiterflag.Options } var predicateFunc = predicate.Funcs{ diff --git a/pkg/controllers/federatedresourcequota/federated_resource_quota_status_controller.go b/pkg/controllers/federatedresourcequota/federated_resource_quota_status_controller.go index 051fcabb15c3..153a3ee8ead4 100644 --- a/pkg/controllers/federatedresourcequota/federated_resource_quota_status_controller.go +++ b/pkg/controllers/federatedresourcequota/federated_resource_quota_status_controller.go @@ -34,6 +34,7 @@ import ( controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -42,6 +43,7 @@ import ( policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" "github.com/karmada-io/karmada/pkg/events" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/names" @@ -54,8 +56,9 @@ const ( // StatusController is to collect status from work to FederatedResourceQuota. type StatusController struct { - client.Client // used to operate Work resources. - EventRecorder record.EventRecorder + client.Client // used to operate Work resources. + EventRecorder record.EventRecorder + RateLimiterOptions ratelimiterflag.Options } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -132,6 +135,9 @@ func (c *StatusController) SetupWithManager(mgr controllerruntime.Manager) error Named(StatusControllerName). For(&policyv1alpha1.FederatedResourceQuota{}). Watches(&workv1alpha1.Work{}, handler.EnqueueRequestsFromMapFunc(fn), workPredicate). + WithOptions(controller.Options{ + RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RateLimiterOptions), + }). Complete(c) } diff --git a/pkg/controllers/federatedresourcequota/federated_resource_quota_sync_controller.go b/pkg/controllers/federatedresourcequota/federated_resource_quota_sync_controller.go index 3ede82464550..2286d3283f15 100644 --- a/pkg/controllers/federatedresourcequota/federated_resource_quota_sync_controller.go +++ b/pkg/controllers/federatedresourcequota/federated_resource_quota_sync_controller.go @@ -29,6 +29,7 @@ import ( controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -39,6 +40,7 @@ import ( workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" "github.com/karmada-io/karmada/pkg/controllers/ctrlutil" "github.com/karmada-io/karmada/pkg/events" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/names" @@ -51,8 +53,9 @@ const ( // SyncController is to sync FederatedResourceQuota. type SyncController struct { - client.Client // used to operate Work resources. - EventRecorder record.EventRecorder + client.Client // used to operate Work resources. + EventRecorder record.EventRecorder + RatelimiterOption ratelimiterflag.Options } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -133,6 +136,9 @@ func (c *SyncController) SetupWithManager(mgr controllerruntime.Manager) error { Named(SyncControllerName). For(&policyv1alpha1.FederatedResourceQuota{}). Watches(&clusterv1alpha1.Cluster{}, handler.EnqueueRequestsFromMapFunc(fn), clusterPredicate). + WithOptions(controller.Options{ + RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RatelimiterOption), + }). Complete(c) } diff --git a/pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_controller.go b/pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_controller.go index 0951fd9fa6ec..15261e6ffe6a 100644 --- a/pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_controller.go +++ b/pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_controller.go @@ -24,7 +24,9 @@ import ( "k8s.io/client-go/dynamic" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/controller" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" ) @@ -41,6 +43,7 @@ type HpaScaleTargetMarker struct { RESTMapper meta.RESTMapper scaleTargetWorker util.AsyncWorker + RatelimiterOption ratelimiterflag.Options } // SetupWithManager creates a controller and register to controller manager. @@ -55,6 +58,9 @@ func (r *HpaScaleTargetMarker) SetupWithManager(mgr controllerruntime.Manager) e return controllerruntime.NewControllerManagedBy(mgr). Named(ControllerName). For(&autoscalingv2.HorizontalPodAutoscaler{}, builder.WithPredicates(r)). + WithOptions(controller.Options{ + RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](r.RatelimiterOption), + }). Complete(r) } diff --git a/pkg/controllers/mcs/endpointslice_controller.go b/pkg/controllers/mcs/endpointslice_controller.go index 445f0f49163b..97aa2fc43e0f 100644 --- a/pkg/controllers/mcs/endpointslice_controller.go +++ b/pkg/controllers/mcs/endpointslice_controller.go @@ -29,12 +29,14 @@ import ( controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/names" @@ -46,7 +48,8 @@ const EndpointSliceControllerName = "endpointslice-controller" // EndpointSliceController is to collect EndpointSlice which reported by member cluster from executionNamespace to serviceexport namespace. type EndpointSliceController struct { client.Client - EventRecorder record.EventRecorder + EventRecorder record.EventRecorder + RatelimiterOption ratelimiterflag.Options } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -118,6 +121,9 @@ func (c *EndpointSliceController) SetupWithManager(mgr controllerruntime.Manager return controllerruntime.NewControllerManagedBy(mgr). Named(EndpointSliceControllerName). For(&workv1alpha1.Work{}, builder.WithPredicates(serviceImportPredicateFun)). + WithOptions(controller.Options{ + RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RatelimiterOption), + }). Complete(c) } diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go index 16a682edb797..223450f96955 100644 --- a/pkg/controllers/mcs/service_export_controller.go +++ b/pkg/controllers/mcs/service_export_controller.go @@ -42,12 +42,14 @@ import ( controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/predicate" mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" "github.com/karmada-io/karmada/pkg/controllers/ctrlutil" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" @@ -76,7 +78,8 @@ type ServiceExportController struct { // "member1": instance of ResourceEventHandler eventHandlers sync.Map // worker process resources periodic from rateLimitingQueue. - worker util.AsyncWorker + worker util.AsyncWorker + RatelimiterOption ratelimiterflag.Options } var ( @@ -132,7 +135,11 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, req controllerr // SetupWithManager creates a controller and register to controller manager. func (c *ServiceExportController) SetupWithManager(mgr controllerruntime.Manager) error { - return controllerruntime.NewControllerManagedBy(mgr).Named(ServiceExportControllerName).For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)).Complete(c) + return controllerruntime.NewControllerManagedBy(mgr).Named(ServiceExportControllerName).For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)). + WithOptions(controller.Options{ + RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RatelimiterOption), + }). + Complete(c) } // RunWorkQueue initializes worker and run it, worker will process resource asynchronously. diff --git a/pkg/controllers/mcs/service_import_controller.go b/pkg/controllers/mcs/service_import_controller.go index 7d8947a0119d..8c271dd812bd 100644 --- a/pkg/controllers/mcs/service_import_controller.go +++ b/pkg/controllers/mcs/service_import_controller.go @@ -28,9 +28,11 @@ import ( "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" "github.com/karmada-io/karmada/pkg/events" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/names" ) @@ -41,7 +43,8 @@ const ServiceImportControllerName = "service-import-controller" // ServiceImportController is to sync derived service from ServiceImport. type ServiceImportController struct { client.Client - EventRecorder record.EventRecorder + EventRecorder record.EventRecorder + RatelimiterOption ratelimiterflag.Options } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -74,6 +77,7 @@ func (c *ServiceImportController) SetupWithManager(mgr controllerruntime.Manager return controllerruntime.NewControllerManagedBy(mgr). Named(ServiceImportControllerName). For(&mcsv1alpha1.ServiceImport{}). + WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RatelimiterOption)}). Complete(c) } diff --git a/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go b/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go index 8f398166b9df..5739188906bb 100644 --- a/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go +++ b/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go @@ -38,12 +38,14 @@ import ( controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/predicate" clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" "github.com/karmada-io/karmada/pkg/controllers/ctrlutil" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" @@ -68,6 +70,7 @@ type EndpointSliceCollectController struct { worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue. ClusterCacheSyncTimeout metav1.Duration + RatelimiterOption ratelimiterflag.Options } var ( @@ -120,7 +123,9 @@ func (c *EndpointSliceCollectController) Reconcile(ctx context.Context, req cont func (c *EndpointSliceCollectController) SetupWithManager(mgr controllerruntime.Manager) error { return controllerruntime.NewControllerManagedBy(mgr). Named(EndpointSliceCollectControllerName). - For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)).Complete(c) + For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)). + WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RatelimiterOption)}). + Complete(c) } // RunWorkQueue initializes worker and run it, worker will process resource asynchronously. diff --git a/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go b/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go index f7b7618ff93b..916d588db903 100644 --- a/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go +++ b/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go @@ -33,6 +33,7 @@ import ( controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -44,6 +45,7 @@ import ( workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" "github.com/karmada-io/karmada/pkg/controllers/ctrlutil" "github.com/karmada-io/karmada/pkg/events" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" "github.com/karmada-io/karmada/pkg/util/helper" @@ -56,9 +58,10 @@ const EndpointsliceDispatchControllerName = "endpointslice-dispatch-controller" // EndpointsliceDispatchController will reconcile a MultiClusterService object type EndpointsliceDispatchController struct { client.Client - EventRecorder record.EventRecorder - RESTMapper meta.RESTMapper - InformerManager genericmanager.MultiClusterInformerManager + EventRecorder record.EventRecorder + RESTMapper meta.RESTMapper + InformerManager genericmanager.MultiClusterInformerManager + RatelimiterOption ratelimiterflag.Options } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -166,6 +169,7 @@ func (c *EndpointsliceDispatchController) SetupWithManager(mgr controllerruntime For(&workv1alpha1.Work{}, builder.WithPredicates(workPredicateFun)). Watches(&networkingv1alpha1.MultiClusterService{}, handler.EnqueueRequestsFromMapFunc(c.newMultiClusterServiceFunc())). Watches(&clusterv1alpha1.Cluster{}, handler.EnqueueRequestsFromMapFunc(c.newClusterFunc())). + WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RatelimiterOption)}). Complete(c) } diff --git a/pkg/controllers/namespace/namespace_sync_controller.go b/pkg/controllers/namespace/namespace_sync_controller.go index 93145cd650a2..ef9ed715d212 100644 --- a/pkg/controllers/namespace/namespace_sync_controller.go +++ b/pkg/controllers/namespace/namespace_sync_controller.go @@ -33,6 +33,7 @@ import ( controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -42,6 +43,7 @@ import ( policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" "github.com/karmada-io/karmada/pkg/controllers/binding" "github.com/karmada-io/karmada/pkg/controllers/ctrlutil" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/names" @@ -59,6 +61,7 @@ type Controller struct { EventRecorder record.EventRecorder SkippedPropagatingNamespaces []*regexp.Regexp OverrideManager overridemanager.OverrideManager + RateLimitOptions ratelimiterflag.Options } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -282,5 +285,8 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { Watches(&policyv1alpha1.ClusterOverridePolicy{}, handler.EnqueueRequestsFromMapFunc(clusterOverridePolicyNamespaceFn), clusterOverridePolicyPredicate). + WithOptions(controller.Options{ + RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RateLimitOptions), + }). Complete(c) } diff --git a/pkg/controllers/unifiedauth/unified_auth_controller.go b/pkg/controllers/unifiedauth/unified_auth_controller.go index eb8eee232aca..5193ba701d62 100644 --- a/pkg/controllers/unifiedauth/unified_auth_controller.go +++ b/pkg/controllers/unifiedauth/unified_auth_controller.go @@ -31,6 +31,7 @@ import ( controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -39,6 +40,7 @@ import ( clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" "github.com/karmada-io/karmada/pkg/controllers/ctrlutil" "github.com/karmada-io/karmada/pkg/events" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/names" @@ -58,8 +60,9 @@ const ( // Controller is to sync impersonation config to member clusters for unified authentication. type Controller struct { - client.Client // used to operate Cluster resources. - EventRecorder record.EventRecorder + client.Client // used to operate Cluster resources. + EventRecorder record.EventRecorder + RatelimiterOption ratelimiterflag.Options } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -260,6 +263,7 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { For(&clusterv1alpha1.Cluster{}, builder.WithPredicates(clusterPredicateFunc)). Watches(&rbacv1.ClusterRole{}, handler.EnqueueRequestsFromMapFunc(c.newClusterRoleMapFunc())). Watches(&rbacv1.ClusterRoleBinding{}, handler.EnqueueRequestsFromMapFunc(c.newClusterRoleBindingMapFunc())). + WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RatelimiterOption)}). Complete(c) } diff --git a/pkg/controllers/workloadrebalancer/workloadrebalancer_controller.go b/pkg/controllers/workloadrebalancer/workloadrebalancer_controller.go index 16715431789e..43af8ffc171a 100644 --- a/pkg/controllers/workloadrebalancer/workloadrebalancer_controller.go +++ b/pkg/controllers/workloadrebalancer/workloadrebalancer_controller.go @@ -31,11 +31,13 @@ import ( controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" appsv1alpha1 "github.com/karmada-io/karmada/pkg/apis/apps/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" "github.com/karmada-io/karmada/pkg/util/names" ) @@ -46,7 +48,8 @@ const ( // RebalancerController is to handle a rebalance to workloads selected by WorkloadRebalancer object. type RebalancerController struct { - Client client.Client + Client client.Client + RatelimiterOption ratelimiterflag.Options } // SetupWithManager creates a controller and register to controller manager. @@ -65,6 +68,7 @@ func (c *RebalancerController) SetupWithManager(mgr controllerruntime.Manager) e return controllerruntime.NewControllerManagedBy(mgr). Named(ControllerName). For(&appsv1alpha1.WorkloadRebalancer{}, builder.WithPredicates(predicateFunc)). + WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RatelimiterOption)}). Complete(c) } diff --git a/pkg/dependenciesdistributor/dependencies_distributor.go b/pkg/dependenciesdistributor/dependencies_distributor.go index 16f44a150026..749cee4ab5bb 100644 --- a/pkg/dependenciesdistributor/dependencies_distributor.go +++ b/pkg/dependenciesdistributor/dependencies_distributor.go @@ -619,7 +619,8 @@ func (d *DependenciesDistributor) Start(ctx context.Context) error { Labels: metaInfo.GetLabels(), }, nil }, - ReconcileFunc: d.reconcileResourceTemplate, + ReconcileFunc: d.reconcileResourceTemplate, + RateLimiterOptions: d.RateLimiterOptions, } d.eventHandler = fedinformer.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete) d.resourceProcessor = util.NewAsyncWorker(resourceWorkerOptions) diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index 9d9cb0189614..244150a3f9ab 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -117,16 +117,18 @@ func (d *ResourceDetector) Start(ctx context.Context) error { // setup policy reconcile worker policyWorkerOptions := util.Options{ - Name: "propagationPolicy reconciler", - KeyFunc: ClusterWideKeyFunc, - ReconcileFunc: d.ReconcilePropagationPolicy, + Name: "propagationPolicy reconciler", + KeyFunc: ClusterWideKeyFunc, + ReconcileFunc: d.ReconcilePropagationPolicy, + RateLimiterOptions: d.RateLimiterOptions, } d.policyReconcileWorker = util.NewAsyncWorker(policyWorkerOptions) d.policyReconcileWorker.Run(d.ConcurrentPropagationPolicySyncs, d.stopCh) clusterPolicyWorkerOptions := util.Options{ - Name: "clusterPropagationPolicy reconciler", - KeyFunc: ClusterWideKeyFunc, - ReconcileFunc: d.ReconcileClusterPropagationPolicy, + Name: "clusterPropagationPolicy reconciler", + KeyFunc: ClusterWideKeyFunc, + ReconcileFunc: d.ReconcileClusterPropagationPolicy, + RateLimiterOptions: d.RateLimiterOptions, } d.clusterPolicyReconcileWorker = util.NewAsyncWorker(clusterPolicyWorkerOptions) d.clusterPolicyReconcileWorker.Run(d.ConcurrentClusterPropagationPolicySyncs, d.stopCh)