Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(detector): add ratelimiteroptions for policy worker options #6145

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 37 additions & 19 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ func startClusterController(ctx controllerscontext.Context) (enabled bool, err e
EnableTaintManager: ctx.Opts.EnableTaintManager,
ClusterTaintEvictionRetryFrequency: 10 * time.Second,
ExecutionSpaceRetryFrequency: 10 * time.Second,
RateLimiterOption: ctx.Opts.RateLimiterOptions,
}
if err := clusterController.SetupWithManager(mgr); err != nil {
return false, err
Expand All @@ -269,6 +270,7 @@ func startClusterController(ctx controllerscontext.Context) (enabled bool, err e
EventRecorder: mgr.GetEventRecorderFor(cluster.TaintManagerName),
ClusterTaintEvictionRetryFrequency: 10 * time.Second,
ConcurrentReconciles: 3,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
}
if err := taintManager.SetupWithManager(mgr); err != nil {
return false, err
Expand Down Expand Up @@ -447,6 +449,7 @@ func startNamespaceController(ctx controllerscontext.Context) (enabled bool, err
EventRecorder: ctx.Mgr.GetEventRecorderFor(namespace.ControllerName),
SkippedPropagatingNamespaces: ctx.Opts.SkippedPropagatingNamespaces,
OverrideManager: ctx.OverrideManager,
RateLimitOptions: ctx.Opts.RateLimiterOptions,
}
if err := namespaceSyncController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand All @@ -466,6 +469,7 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool,
PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
RatelimiterOption: ctx.Opts.RateLimiterOptions,
}
serviceExportController.RunWorkQueue()
if err := serviceExportController.SetupWithManager(ctx.Mgr); err != nil {
Expand All @@ -488,6 +492,7 @@ func startEndpointSliceCollectController(ctx controllerscontext.Context) (enable
PredicateFunc: helper.NewPredicateForEndpointSliceCollectController(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
RatelimiterOption: ctx.Opts.RateLimiterOptions,
}
endpointSliceCollectController.RunWorkQueue()
if err := endpointSliceCollectController.SetupWithManager(ctx.Mgr); err != nil {
Expand All @@ -501,10 +506,11 @@ func startEndpointSliceDispatchController(ctx controllerscontext.Context) (enabl
return false, nil
}
endpointSliceSyncController := &multiclusterservice.EndpointsliceDispatchController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(multiclusterservice.EndpointsliceDispatchControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: genericmanager.GetInstance(),
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(multiclusterservice.EndpointsliceDispatchControllerName),
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: genericmanager.GetInstance(),
RatelimiterOption: ctx.Opts.RateLimiterOptions,
}
if err := endpointSliceSyncController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand All @@ -514,8 +520,9 @@ func startEndpointSliceDispatchController(ctx controllerscontext.Context) (enabl

func startEndpointSliceController(ctx controllerscontext.Context) (enabled bool, err error) {
endpointSliceController := &mcs.EndpointSliceController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.EndpointSliceControllerName),
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.EndpointSliceControllerName),
RatelimiterOption: ctx.Opts.RateLimiterOptions,
}
if err := endpointSliceController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand All @@ -525,8 +532,9 @@ func startEndpointSliceController(ctx controllerscontext.Context) (enabled bool,

func startServiceImportController(ctx controllerscontext.Context) (enabled bool, err error) {
serviceImportController := &mcs.ServiceImportController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceImportControllerName),
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.ServiceImportControllerName),
RatelimiterOption: ctx.Opts.RateLimiterOptions,
}
if err := serviceImportController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand All @@ -536,8 +544,9 @@ func startServiceImportController(ctx controllerscontext.Context) (enabled bool,

func startUnifiedAuthController(ctx controllerscontext.Context) (enabled bool, err error) {
unifiedAuthController := &unifiedauth.Controller{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(unifiedauth.ControllerName),
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(unifiedauth.ControllerName),
RatelimiterOption: ctx.Opts.RateLimiterOptions,
}
if err := unifiedAuthController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand All @@ -547,8 +556,9 @@ func startUnifiedAuthController(ctx controllerscontext.Context) (enabled bool, e

func startFederatedResourceQuotaSyncController(ctx controllerscontext.Context) (enabled bool, err error) {
controller := federatedresourcequota.SyncController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(federatedresourcequota.SyncControllerName),
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(federatedresourcequota.SyncControllerName),
RatelimiterOption: ctx.Opts.RateLimiterOptions,
}
if err = controller.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand All @@ -558,8 +568,9 @@ func startFederatedResourceQuotaSyncController(ctx controllerscontext.Context) (

func startFederatedResourceQuotaStatusController(ctx controllerscontext.Context) (enabled bool, err error) {
controller := federatedresourcequota.StatusController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(federatedresourcequota.StatusControllerName),
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(federatedresourcequota.StatusControllerName),
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
}
if err = controller.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand Down Expand Up @@ -596,6 +607,7 @@ func startApplicationFailoverController(ctx controllerscontext.Context) (enabled
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(applicationfailover.RBApplicationFailoverControllerName),
ResourceInterpreter: ctx.ResourceInterpreter,
RateLimiterOption: ctx.Opts.RateLimiterOptions,
}
if err = rbApplicationFailoverController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
Expand Down Expand Up @@ -659,8 +671,9 @@ func startCronFederatedHorizontalPodAutoscalerController(ctx controllerscontext.

func startHPAScaleTargetMarkerController(ctx controllerscontext.Context) (enabled bool, err error) {
hpaScaleTargetMarker := hpascaletargetmarker.HpaScaleTargetMarker{
DynamicClient: ctx.DynamicClientSet,
RESTMapper: ctx.Mgr.GetRESTMapper(),
DynamicClient: ctx.DynamicClientSet,
RESTMapper: ctx.Mgr.GetRESTMapper(),
RatelimiterOption: ctx.Opts.RateLimiterOptions,
}
err = hpaScaleTargetMarker.SetupWithManager(ctx.Mgr)
if err != nil {
Expand All @@ -672,7 +685,8 @@ func startHPAScaleTargetMarkerController(ctx controllerscontext.Context) (enable

func startDeploymentReplicasSyncerController(ctx controllerscontext.Context) (enabled bool, err error) {
deploymentReplicasSyncer := deploymentreplicassyncer.DeploymentReplicasSyncer{
Client: ctx.Mgr.GetClient(),
Client: ctx.Mgr.GetClient(),
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
}
err = deploymentReplicasSyncer.SetupWithManager(ctx.Mgr)
if err != nil {
Expand Down Expand Up @@ -710,7 +724,8 @@ func startRemedyController(ctx controllerscontext.Context) (enabled bool, err er

func startWorkloadRebalancerController(ctx controllerscontext.Context) (enabled bool, err error) {
workloadRebalancer := workloadrebalancer.RebalancerController{
Client: ctx.Mgr.GetClient(),
Client: ctx.Mgr.GetClient(),
RatelimiterOption: ctx.Opts.RateLimiterOptions,
}
err = workloadRebalancer.SetupWithManager(ctx.Mgr)
if err != nil {
Expand All @@ -721,7 +736,10 @@ func startWorkloadRebalancerController(ctx controllerscontext.Context) (enabled
}

func startAgentCSRApprovingController(ctx controllerscontext.Context) (enabled bool, err error) {
agentCSRApprover := approver.AgentCSRApprovingController{Client: ctx.KubeClientSet}
agentCSRApprover := approver.AgentCSRApprovingController{
Client: ctx.KubeClientSet,
RatelimiterOption: ctx.Opts.RateLimiterOptions,
}
err = agentCSRApprover.SetupWithManager(ctx.Mgr)
if err != nil {
return false, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ const RBApplicationFailoverControllerName = "resource-binding-application-failov
// RBApplicationFailoverController is to sync ResourceBinding's application failover behavior.
type RBApplicationFailoverController struct {
client.Client
EventRecorder record.EventRecorder
RateLimiterOptions ratelimiterflag.Options
EventRecorder record.EventRecorder
RateLimiterOption ratelimiterflag.Options

// workloadUnhealthyMap records which clusters the specific resource is in an unhealthy state
workloadUnhealthyMap *workloadUnhealthyMap
Expand Down Expand Up @@ -209,7 +209,7 @@ func (c *RBApplicationFailoverController) SetupWithManager(mgr controllerruntime
return controllerruntime.NewControllerManagedBy(mgr).
Named(RBApplicationFailoverControllerName).
For(&workv1alpha2.ResourceBinding{}, builder.WithPredicates(resourceBindingPredicateFn)).
WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RateLimiterOptions)}).
WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RateLimiterOption)}).
Complete(c)
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/controllers/certificate/approver/agent_csr_approving.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util/certificate"
)

Expand All @@ -47,7 +49,8 @@ const (

// AgentCSRApprovingController is used to automatically approve the agent's CSR.
type AgentCSRApprovingController struct {
Client kubernetes.Interface
Client kubernetes.Interface
RatelimiterOption ratelimiterflag.Options
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
Expand Down Expand Up @@ -289,5 +292,6 @@ func (a *AgentCSRApprovingController) SetupWithManager(mgr controllerruntime.Man
return controllerruntime.NewControllerManagedBy(mgr).
Named(csrApprovingController).
For(&certificatesv1.CertificateSigningRequest{}, builder.WithPredicates(predicateFunc)).
WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](a.RatelimiterOption)}).
Complete(a)
}
8 changes: 6 additions & 2 deletions pkg/controllers/cluster/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
utilhelper "github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
Expand Down Expand Up @@ -119,7 +121,8 @@ type Controller struct {
ExecutionSpaceRetryFrequency time.Duration

// Per Cluster map stores last observed health together with a local time when it was observed.
clusterHealthMap *clusterHealthMap
clusterHealthMap *clusterHealthMap
RateLimiterOption ratelimiterflag.Options
}

type clusterHealthMap struct {
Expand Down Expand Up @@ -215,7 +218,8 @@ func (c *Controller) Start(ctx context.Context) error {
func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
c.clusterHealthMap = newClusterHealthMap()
return utilerrors.NewAggregate([]error{
controllerruntime.NewControllerManagedBy(mgr).Named(ControllerName).For(&clusterv1alpha1.Cluster{}).Complete(c),
controllerruntime.NewControllerManagedBy(mgr).Named(ControllerName).For(&clusterv1alpha1.Cluster{}).
WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RateLimiterOption)}).Complete(c),
mgr.Add(c),
})
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/controllers/cluster/taint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import (
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
Expand All @@ -54,6 +56,7 @@ type NoExecuteTaintManager struct {

bindingEvictionWorker util.AsyncWorker
clusterBindingEvictionWorker util.AsyncWorker
RateLimiterOptions ratelimiterflag.Options
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
Expand Down Expand Up @@ -298,7 +301,8 @@ func (tc *NoExecuteTaintManager) needEviction(clusterName string, annotations ma
// SetupWithManager creates a controller and register to controller manager.
func (tc *NoExecuteTaintManager) SetupWithManager(mgr controllerruntime.Manager) error {
return utilerrors.NewAggregate([]error{
controllerruntime.NewControllerManagedBy(mgr).Named(TaintManagerName).For(&clusterv1alpha1.Cluster{}).Complete(tc),
controllerruntime.NewControllerManagedBy(mgr).Named(TaintManagerName).For(&clusterv1alpha1.Cluster{}).
WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](tc.RateLimiterOptions)}).Complete(tc),
mgr.Add(tc),
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/names"
)
Expand All @@ -45,7 +46,8 @@ const (

// DeploymentReplicasSyncer is to sync deployment replicas from status field to spec field.
type DeploymentReplicasSyncer struct {
Client client.Client
Client client.Client
RateLimiterOptions ratelimiterflag.Options
}

var predicateFunc = predicate.Funcs{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand All @@ -42,6 +43,7 @@ import (
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
Expand All @@ -54,8 +56,9 @@ const (

// StatusController is to collect status from work to FederatedResourceQuota.
type StatusController struct {
client.Client // used to operate Work resources.
EventRecorder record.EventRecorder
client.Client // used to operate Work resources.
EventRecorder record.EventRecorder
RateLimiterOptions ratelimiterflag.Options
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
Expand Down Expand Up @@ -132,6 +135,9 @@ func (c *StatusController) SetupWithManager(mgr controllerruntime.Manager) error
Named(StatusControllerName).
For(&policyv1alpha1.FederatedResourceQuota{}).
Watches(&workv1alpha1.Work{}, handler.EnqueueRequestsFromMapFunc(fn), workPredicate).
WithOptions(controller.Options{
RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RateLimiterOptions),
}).
Complete(c)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand All @@ -39,6 +40,7 @@ import (
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/controllers/ctrlutil"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
Expand All @@ -51,8 +53,9 @@ const (

// SyncController is to sync FederatedResourceQuota.
type SyncController struct {
client.Client // used to operate Work resources.
EventRecorder record.EventRecorder
client.Client // used to operate Work resources.
EventRecorder record.EventRecorder
RatelimiterOption ratelimiterflag.Options
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
Expand Down Expand Up @@ -133,6 +136,9 @@ func (c *SyncController) SetupWithManager(mgr controllerruntime.Manager) error {
Named(SyncControllerName).
For(&policyv1alpha1.FederatedResourceQuota{}).
Watches(&clusterv1alpha1.Cluster{}, handler.EnqueueRequestsFromMapFunc(fn), clusterPredicate).
WithOptions(controller.Options{
RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RatelimiterOption),
}).
Complete(c)
}

Expand Down
Loading