Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[router] Fix in-flight request check in router shutdown #1553

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

majisourav99
Copy link
Contributor

@majisourav99 majisourav99 commented Feb 25, 2025

Fix in-flight request draining in router shutdown

Fix: Properly Track In-Flight Requests During Router Shutdown

When the router is shutting down, it checks for in-flight requests to ensure they are properly drained — otherwise, clients may encounter errors.

Currently, this uses a Rate sensor to increment and decrement the request count. However, calling record(-1) on a Rate sensor does not actually decrease the sensor’s rate value. This can lead to incorrect tracking of in-flight requests and potentially leave requests unaccounted for during shutdown.

This PR addresses that behavior to ensure request counts are accurately managed and clients don’t see unexpected errors during router shutdown by replacing the Rate with Gauge stat.

How was this PR tested?

CI tests

Does this PR introduce any user-facing changes?

  • No. You can skip the rest of this section.
  • Yes. Make sure to explain your proposed changes and call out the behavior change.

Copy link
Collaborator

@sushantmane sushantmane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, you don’t need to change the sensor type, right? You can simply stop recording once the request is processed (remove decrement sensor part). Over time, the rate will naturally drop to zero, and the inflight request count will reach zero.

That said, if we want to track inflight requests accurately, shouldn’t this be a gauge-type metric?

Copy link
Collaborator

@sushantmane sushantmane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please do two small changes in this PR:

  1. Can you replace sensor name: total_inflight_request_count literal with a defined constant?
  2. Use Double.compare for real number comparison ? return val != Double.NaN && Double.compare(metric.value(), 0.0) > 0;
  static public boolean hasInFlightRequests() {
    Metric metric = localMetricRepo.getMetric("total_inflight_request_count");
    // max return -infinity when there are no samples. validate only against finite value
    return Double.isFinite(metric.value()) ? metric.value() > 0.0 : false;
  }

sushantmane
sushantmane previously approved these changes Feb 26, 2025
Copy link
Collaborator

@sushantmane sushantmane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks!

@@ -74,7 +75,7 @@ public class RouterHttpRequestStats extends AbstractVeniceHttpStats {

private final static Sensor totalInflightRequestSensor = localMetricRepo.sensor("total_inflight_request");
static {
totalInflightRequestSensor.add("total_inflight_request_count", new Rate());
totalInflightRequestSensor.add(TOTAL_INFLIGHT_REQUEST_COUNT, new Gauge());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gauge doesn't honor any timed window, but just store the most recent value.
So the increment/decrement won't work.

If it is not windowed, who will update the metric to be 0 then?

Removing record(-1) should just work and if we want to speed up the stop, we should adjust time window size for this particular metric and currently, it is 10s, which means the Router can only be stopped after 20s since no traffic (by default, there are 2 windows).

Copy link
Collaborator

@sushantmane sushantmane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a unit test to verify that inflight request recording behaves as expected, i.e., ensuring requests are tracked during the recording window, the rate remains non-zero until the recordingWindowDuration has elapsed, and the max wait time for the rate to drop to zero does not exceed 2 * recordingWindowDuration?

@sushantmane
Copy link
Collaborator

sushantmane commented Feb 26, 2025

Could you add a unit test to verify that inflight request recording behaves as expected, i.e., ensuring requests are tracked during the recording window, the rate remains non-zero until the recordingWindowDuration has elapsed, and the max wait time for the rate to drop to zero does not exceed 2 * recordingWindowDuration?

One possible way (feel free to update as you see fit):

@Test
public void testInflightRequestsGoDownAfterDefinedThreshold() throws InterruptedException {
  long startTime = System.currentTimeMillis();
  long deltaTime = 5000; // Keep recording for 5 seconds
  long recordingWindowDuration = 10_000; // Duration to expect inflight requests assuming 10sec window cutoff 
  AtomicLong lastRecorded = new AtomicLong(0);
  CountDownLatch latch = new CountDownLatch(1);

  assertFalse(RouterHttpRequestStats.hasInFlightRequests());

  // Thread to simulate inflight request additions
  Thread inflightThread = new Thread(() -> {
    while (System.currentTimeMillis() < startTime + deltaTime) {
      RouterHttpRequestStats.totalInflightRequestSensor.record();
      lastRecorded.set(System.currentTimeMillis());
      latch.countDown();
    }
  });

  inflightThread.start();
  latch.await(); // Ensure at least one inflight request is recorded

  // Verify inflight requests exist during the expected window
  while (System.currentTimeMillis() < lastRecorded.get() + recordingWindowDuration) {
    assertTrue(RouterHttpRequestStats.hasInFlightRequests(),
        "Expected inflight requests - elapsed: " + (System.currentTimeMillis() - lastRecorded.get()));
  }
  System.out.println("A. Total elapsed time since last recorded: " + (System.currentTimeMillis() - lastRecorded.get()) + " total time elapsed: " + (System.currentTimeMillis() - startTime));

  // Wait until after the expected window to verify requests drop to zero; this could take one full window duration 
  Utils.sleep(recordingWindowDuration);
  System.out.println("B. Total elapsed time since last recorded: " + (System.currentTimeMillis() - lastRecorded.get()) + " total time elapsed: " + (System.currentTimeMillis() - startTime));

  assertFalse(RouterHttpRequestStats.hasInFlightRequests(),
      "Expected no inflight requests - elapsed: " + (System.currentTimeMillis() - lastRecorded.get()));

  System.out.println("X. Total elapsed time since last recorded: " + (System.currentTimeMillis() - lastRecorded.get()) + " total time elapsed: " + (System.currentTimeMillis() - startTime));


  // Stop the recording thread
  inflightThread.interrupt();
  inflightThread.join();
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants