Skip to content

Commit

Permalink
[Bug] handle metricKey creation with MetricsSources (#498)
Browse files Browse the repository at this point in the history
* handle metricKey creation with MetricsSources or not

* removed duplcate metrickey creation

* consolidate createMetricKey functionality into NewNamespaceNameMetric.

* NewNamespaceNameMetric single argument
  • Loading branch information
nwangfw authored Dec 6, 2024
1 parent 2b13212 commit 2f5ed78
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 20 deletions.
15 changes: 13 additions & 2 deletions pkg/controller/podautoscaler/metrics/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,19 @@ type NamespaceNameMetric struct {
MetricName string
}

func NewNamespaceNameMetric(namespace string, name string, metricName string) NamespaceNameMetric {
return NamespaceNameMetric{NamespacedName: types.NamespacedName{Namespace: namespace, Name: name}, MetricName: metricName}
func NewNamespaceNameMetric(pa *autoscalingv1alpha1.PodAutoscaler) NamespaceNameMetric {
metricName := pa.Spec.TargetMetric
if len(pa.Spec.MetricsSources) > 0 {
metricName = pa.Spec.MetricsSources[0].Name
}

return NamespaceNameMetric{
NamespacedName: types.NamespacedName{
Namespace: pa.Namespace,
Name: pa.Spec.ScaleTargetRef.Name,
},
MetricName: metricName,
}
}

// PodMetric contains pod metric value (the metric values are expected to be the metric as a milli-value)
Expand Down
22 changes: 7 additions & 15 deletions pkg/controller/podautoscaler/podautoscaler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func (r *PodAutoscalerReconciler) reconcileCustomPA(ctx context.Context, pa auto
paStatusOriginal := pa.Status.DeepCopy()
paType := pa.Spec.ScalingStrategy
scaleReference := fmt.Sprintf("%s/%s/%s", pa.Spec.ScaleTargetRef.Kind, pa.Namespace, pa.Spec.ScaleTargetRef.Name)
metricKey := metrics.NewNamespaceNameMetric(&pa)

targetGV, err := schema.ParseGroupVersion(pa.Spec.ScaleTargetRef.APIVersion)
if err != nil {
Expand Down Expand Up @@ -324,7 +325,7 @@ func (r *PodAutoscalerReconciler) reconcileCustomPA(ctx context.Context, pa auto
setCondition(&pa, "AbleToScale", metav1.ConditionTrue, "SucceededGetScale", "the %s controller was able to get the target's current scale", paType)

// Update the scale required metrics periodically
err = r.updateMetricsForScale(ctx, pa, scale)
err = r.updateMetricsForScale(ctx, pa, scale, metricKey)
if err != nil {
r.EventRecorder.Event(&pa, corev1.EventTypeWarning, "FailedUpdateMetrics", err.Error())
return ctrl.Result{}, fmt.Errorf("failed to update metrics for scale target reference: %v", err)
Expand Down Expand Up @@ -366,7 +367,7 @@ func (r *PodAutoscalerReconciler) reconcileCustomPA(ctx context.Context, pa auto
// if the currentReplicas is within the range, we should
// computeReplicasForMetrics gives
// TODO: check why it return the metrics name here?
metricDesiredReplicas, metricName, metricTimestamp, err := r.computeReplicasForMetrics(ctx, pa, scale)
metricDesiredReplicas, metricName, metricTimestamp, err := r.computeReplicasForMetrics(ctx, pa, scale, metricKey)
if err != nil && metricDesiredReplicas == -1 {
r.setCurrentReplicasAndMetricsInStatus(&pa, currentReplicas)
if err := r.updateStatusIfNeeded(ctx, paStatusOriginal, &pa); err != nil {
Expand Down Expand Up @@ -556,7 +557,7 @@ func (r *PodAutoscalerReconciler) updateStatus(ctx context.Context, pa *autoscal
// It may return both valid metricDesiredReplicas and an error,
// when some metrics still work and PA should perform scaling based on them.
// If PodAutoscaler cannot do anything due to error, it returns -1 in metricDesiredReplicas as a failure signal.
func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler, scale *unstructured.Unstructured) (replicas int32, relatedMetrics string, timestamp time.Time, err error) {
func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler, scale *unstructured.Unstructured, metricKey metrics.NamespaceNameMetric) (replicas int32, relatedMetrics string, timestamp time.Time, err error) {
logger := klog.FromContext(ctx)
currentTimestamp := time.Now()

Expand All @@ -574,19 +575,13 @@ func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context,
}

// TODO UpdateScalingContext (in updateScalerSpec) is duplicate invoked in computeReplicasForMetrics and updateMetricsForScale
err = r.updateScalerSpec(ctx, pa)
err = r.updateScalerSpec(ctx, pa, metricKey)
if err != nil {
klog.ErrorS(err, "Failed to update scaler spec from pa_types")
return 0, "", currentTimestamp, fmt.Errorf("error update scaler spec: %w", err)
}

logger.V(4).Info("Obtained selector and get ReadyPodsCount", "selector", labelsSelector, "originalReadyPodsCount", originalReadyPodsCount)
var metricKey metrics.NamespaceNameMetric
if len(pa.Spec.MetricsSources) > 0 {
metricKey = metrics.NewNamespaceNameMetric(pa.Namespace, pa.Spec.ScaleTargetRef.Name, pa.Spec.MetricsSources[0].Name)
} else {
metricKey = metrics.NewNamespaceNameMetric(pa.Namespace, pa.Spec.ScaleTargetRef.Name, pa.Spec.TargetMetric)
}

// Calculate the desired number of pods using the autoscaler logic.
autoScaler, ok := r.AutoscalerMap[metricKey]
Expand All @@ -605,18 +600,16 @@ func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context,
// refer to knative-serving.
// In pkg/reconciler/autoscaling/kpa/kpa.go:198, kpa maintains a list of deciders into multi-scaler, each of them corresponds to a pa (PodAutoscaler).
// We create or update the scaler instance according to the pa passed in
func (r *PodAutoscalerReconciler) updateScalerSpec(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler) error {
metricKey := NewNamespaceNameMetricByPa(pa)
func (r *PodAutoscalerReconciler) updateScalerSpec(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler, metricKey metrics.NamespaceNameMetric) error {
autoScaler, ok := r.AutoscalerMap[metricKey]
if !ok {
return fmt.Errorf("unsupported scaling strategy: %s", pa.Spec.ScalingStrategy)
}
return autoScaler.UpdateScalingContext(pa)
}

func (r *PodAutoscalerReconciler) updateMetricsForScale(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler, scale *unstructured.Unstructured) (err error) {
func (r *PodAutoscalerReconciler) updateMetricsForScale(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler, scale *unstructured.Unstructured, metricKey metrics.NamespaceNameMetric) (err error) {
currentTimestamp := time.Now()
metricKey := NewNamespaceNameMetricByPa(pa)
var autoScaler scaler.Scaler
autoScaler, exists := r.AutoscalerMap[metricKey]
if !exists {
Expand Down Expand Up @@ -649,7 +642,6 @@ func (r *PodAutoscalerReconciler) updateMetricsForScale(ctx context.Context, pa

// update metrics
for _, source := range pa.Spec.MetricsSources {
metricKey := metrics.NewNamespaceNameMetric(pa.Namespace, pa.Spec.ScaleTargetRef.Name, source.Name)
return autoScaler.UpdateSourceMetrics(ctx, metricKey, source, currentTimestamp)
}

Expand Down
15 changes: 14 additions & 1 deletion pkg/controller/podautoscaler/scaler/apa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,20 @@ func TestAPAScale(t *testing.T) {
metricsFetcher := &metrics.RestMetricsFetcher{}
apaMetricsClient := metrics.NewAPAMetricsClient(metricsFetcher, spec.Window)
now := time.Now()
metricKey := metrics.NewNamespaceNameMetric("test_ns", "llama-70b", "ttot")

pa := autoscalingv1alpha1.PodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test_ns",
},
Spec: autoscalingv1alpha1.PodAutoscalerSpec{
TargetMetric: "ttot", // Set TargetMetric to "ttot"
ScaleTargetRef: corev1.ObjectReference{
Name: "llama-70b",
},
},
}

metricKey := metrics.NewNamespaceNameMetric(&pa)
_ = apaMetricsClient.UpdateMetricIntoWindow(now.Add(-60*time.Second), 10.0)
_ = apaMetricsClient.UpdateMetricIntoWindow(now.Add(-50*time.Second), 11.0)
_ = apaMetricsClient.UpdateMetricIntoWindow(now.Add(-40*time.Second), 12.0)
Expand Down
20 changes: 19 additions & 1 deletion pkg/controller/podautoscaler/scaler/kpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func TestKpaScale(t *testing.T) {
metricsFetcher := &metrics.RestMetricsFetcher{}
kpaMetricsClient := metrics.NewKPAMetricsClient(metricsFetcher, spec.StableWindow, spec.PanicWindow)
now := time.Now()
metricKey := metrics.NewNamespaceNameMetric("test_ns", "llama-70b", spec.ScalingMetric)
_ = kpaMetricsClient.UpdateMetricIntoWindow(now.Add(-60*time.Second), 10.0)
_ = kpaMetricsClient.UpdateMetricIntoWindow(now.Add(-50*time.Second), 11.0)
_ = kpaMetricsClient.UpdateMetricIntoWindow(now.Add(-40*time.Second), 12.0)
Expand All @@ -76,6 +75,25 @@ func TestKpaScale(t *testing.T) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

pa := autoscalingv1alpha1.PodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test_ns",
},
Spec: autoscalingv1alpha1.PodAutoscalerSpec{
TargetMetric: spec.ScalingMetric,
MetricsSources: []autoscalingv1alpha1.MetricSource{
{
Name: spec.ScalingMetric,
},
},
ScaleTargetRef: corev1.ObjectReference{
Name: "llama-70b",
},
},
}

metricKey := metrics.NewNamespaceNameMetric(&pa)

result := kpaScaler.Scale(readyPodCount, metricKey, now)
// recent rapid rising metric value make scaler adapt turn on panic mode
if result.DesiredPodCount != 10 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/podautoscaler/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,5 @@ func extractLabelSelector(scale *unstructured.Unstructured) (labels.Selector, er
}

func NewNamespaceNameMetricByPa(pa autoscalingv1alpha1.PodAutoscaler) metrics.NamespaceNameMetric {
return metrics.NewNamespaceNameMetric(pa.Namespace, pa.Spec.ScaleTargetRef.Name, pa.Spec.TargetMetric)
return metrics.NewNamespaceNameMetric(&pa)
}

0 comments on commit 2f5ed78

Please sign in to comment.