Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[router] Fix in-flight request check in router shutdown #1553

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class VeniceRouterWrapper extends ProcessWrapper implements MetricsAware
private static final String ROUTER_SERVICE_METRIC_PREFIX = "router";
private final VeniceProperties properties;
private final String zkAddress;
private RouterServer service;
private RouterServer routerServer;
private final String d2ClusterName;
private final String clusterDiscoveryD2ClusterName;
private final String regionName;
Expand All @@ -81,13 +81,13 @@ public class VeniceRouterWrapper extends ProcessWrapper implements MetricsAware
String regionName,
String serviceName,
File dataDirectory,
RouterServer service,
RouterServer routerServer,
VeniceProperties properties,
String zkAddress,
String d2ClusterName,
String clusterDiscoveryD2ClusterName) {
super(serviceName, dataDirectory);
this.service = service;
this.routerServer = routerServer;
this.properties = properties;
this.zkAddress = zkAddress;
this.d2ClusterName = d2ClusterName;
Expand Down Expand Up @@ -224,22 +224,22 @@ public int getSslPort() {
}

public String getD2ServiceNameForCluster(String clusterName) {
return service.getConfig().getClusterToD2Map().get(clusterName);
return routerServer.getConfig().getClusterToD2Map().get(clusterName);
}

@Override
protected void internalStart() throws Exception {
service.start();
routerServer.start();
TestUtils.waitForNonDeterministicCompletion(
IntegrationTestUtils.MAX_ASYNC_START_WAIT_TIME_MS,
TimeUnit.MILLISECONDS,
() -> service.isRunning());
() -> routerServer.isRunning());
LOGGER.info("Started VeniceRouterWrapper: {}", this);
}

@Override
protected void internalStop() throws Exception {
service.stop();
routerServer.stop();
}

@Override
Expand All @@ -251,7 +251,7 @@ protected void newProcess() {

d2Servers.addAll(D2TestUtils.getD2Servers(zkAddress, clusterDiscoveryD2ClusterName, httpURI, httpsURI));

service = new RouterServer(
routerServer = new RouterServer(
properties,
d2Servers,
Optional.empty(),
Expand All @@ -272,28 +272,32 @@ public String getComponentTagForLogging() {
}

public HelixBaseRoutingRepository getRoutingDataRepository() {
return service.getRoutingDataRepository();
return routerServer.getRoutingDataRepository();
}

public ReadOnlyStoreRepository getMetaDataRepository() {
return service.getMetadataRepository();
return routerServer.getMetadataRepository();
}

public ReadOnlySchemaRepository getSchemaRepository() {
return service.getSchemaRepository();
return routerServer.getSchemaRepository();
}

public ZkRoutersClusterManager getRoutersClusterManager() {
return service.getRoutersClusterManager();
return routerServer.getRoutersClusterManager();
}

@Override
public MetricsRepository getMetricsRepository() {
return service.getMetricsRepository();
return routerServer.getMetricsRepository();
}

/**
 * Exposes the wrapped router so callers can invoke {@link RouterServer} methods directly.
 *
 * @return the current value of the {@code routerServer} field
 */
public RouterServer getRouter() {
return routerServer;
}

public void refresh() {
service.refresh();
routerServer.refresh();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ public void testRead() throws Exception {
"There should be some idle connections since test queries are finished");
}

Assert.assertTrue(getRouterMetricValue("total_inflight_request_count") > 0.0);
Assert.assertTrue(getAggregateRouterMetricValue(".localhost--response_waiting_time.50thPercentile") > 0);
Assert.assertTrue(
getAggregateRouterMetricValue(".localhost--multiget_streaming_response_waiting_time.50thPercentile") > 0);
Expand Down Expand Up @@ -553,6 +554,9 @@ public void testRead() throws Exception {
try {
queriesSent++;
storeClient.batchGet(keySet).get();
for (VeniceRouterWrapper routerWrapper: veniceCluster.getVeniceRouters()) {
Assert.assertTrue(routerWrapper.getRouter().hasInFlightRequest());
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Assert.assertTrue(
Expand Down Expand Up @@ -592,6 +596,12 @@ public void testRead() throws Exception {
"The throttled_request metric is inconsistent with the number of quota exceptions received by the client!");

getAggregateRouterMetricValue(".total--multiget_throttled_request_latency.Max");
for (VeniceRouterWrapper routerWrapper: veniceCluster.getVeniceRouters()) {
routerWrapper.getRouter().stop();
}
for (VeniceRouterWrapper routerWrapper: veniceCluster.getVeniceRouters()) {
Assert.assertFalse(routerWrapper.getRouter().hasInFlightRequest());
}
/** TODO Re-enable this assertion once we stop throwing batch get quota exceptions from {@link com.linkedin.venice.router.api.VeniceDelegateMode} */
// Assert.assertTrue(throttledRequestLatencyForBatchGetAfterQueries > 0.0, "There should be batch get throttled
// request latency now!");
Expand All @@ -602,6 +612,10 @@ private double getMaxServerMetricValue(String metricName) {
return MetricsUtils.getMax(metricName, veniceCluster.getVeniceServers());
}

/**
 * Looks up a router metric aggregated over every router in {@code veniceCluster}.
 *
 * <p>The aggregation is the one performed by {@link #getMaxRouterMetricValue(String)}; this method
 * delegates to it instead of repeating the same {@code MetricsUtils.getMax} call, so the two
 * helpers cannot drift apart.
 *
 * @param metricName name of the metric to look up
 * @return the value reported by {@link #getMaxRouterMetricValue(String)} for {@code metricName}
 */
private double getRouterMetricValue(String metricName) {
  return getMaxRouterMetricValue(metricName);
}

/**
 * Returns the result of {@code MetricsUtils.getMax} for the given metric, evaluated over the
 * routers returned by {@code veniceCluster.getVeniceRouters()}.
 *
 * @param metricName name of the metric to look up
 */
private double getMaxRouterMetricValue(String metricName) {
return MetricsUtils.getMax(metricName, veniceCluster.getVeniceRouters());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -852,10 +852,11 @@ public void stopInner() throws Exception {
* correctly.
*/

LOGGER.info("Waiting to make sure all in-flight requests are drained");
// Graceful shutdown: Wait till all the requests are drained
try {
RetryUtils.executeWithMaxAttempt(() -> {
if (RouterHttpRequestStats.hasInFlightRequests()) {
if (hasInFlightRequest()) {
throw new VeniceException("There are still in-flight requests in router");
}
},
Expand All @@ -866,6 +867,7 @@ public void stopInner() throws Exception {
LOGGER.error(
"There are still in-flight request during router shutdown, still continuing shutdown, it might cause unhealthy request in client");
}
LOGGER.info("Drained all in-flight requests, starting to shutdown the router.");
storageNodeClient.close();
workerEventLoopGroup.shutdownGracefully();
serverEventLoopGroup.shutdownGracefully();
Expand Down Expand Up @@ -927,6 +929,10 @@ public ReadOnlySchemaRepository getSchemaRepository() {
return schemaRepository;
}

/**
 * Reports whether the router still has in-flight requests.
 *
 * <p>Delegates to the static {@code RouterHttpRequestStats.hasInFlightRequests()}, so the answer
 * does not come from any field of this instance.
 * NOTE(review): presumably every router in the same JVM therefore reports the same value; confirm
 * before relying on per-router results.
 *
 * @return the value returned by {@code RouterHttpRequestStats.hasInFlightRequests()}
 */
public boolean hasInFlightRequest() {
return RouterHttpRequestStats.hasInFlightRequests();
}

private void handleExceptionInStartServices(VeniceException e, boolean async) throws VeniceException {
if (async) {
Utils.exit("Failed to start router services due to " + e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@

public class RouterHttpRequestStats extends AbstractVeniceHttpStats {
private static final MetricConfig METRIC_CONFIG = new MetricConfig().timeWindow(10, TimeUnit.SECONDS);
private static final String TOTAL_INFLIGHT_REQUEST_COUNT = "total_inflight_request_count";
private static final VeniceMetricsRepository localMetricRepo = new VeniceMetricsRepository(
new VeniceMetricsConfig.Builder().setServiceName(ROUTER_SERVICE_NAME)
.setMetricPrefix(ROUTER_SERVICE_METRIC_PREFIX)
Expand All @@ -74,7 +75,7 @@ public class RouterHttpRequestStats extends AbstractVeniceHttpStats {

private final static Sensor totalInflightRequestSensor = localMetricRepo.sensor("total_inflight_request");
static {
totalInflightRequestSensor.add("total_inflight_request_count", new Rate());
totalInflightRequestSensor.add(TOTAL_INFLIGHT_REQUEST_COUNT, new Gauge());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gauge doesn't honor any time window; it just stores the most recent value.
So the increment/decrement approach won't work.

If the metric is not windowed, what will reset it to 0 then?

Removing record(-1) should just work. If we want to speed up the stop, we should adjust the time window size for this particular metric: it is currently 10s, which means the Router can only be stopped 20s after traffic has ceased (by default, there are 2 windows).

}

/** metrics to track incoming requests */
Expand Down Expand Up @@ -658,9 +659,9 @@ private Sensor registerSensorFinal(String sensorName, MeasurableStat... stats) {
}

static public boolean hasInFlightRequests() {
Metric metric = localMetricRepo.getMetric("total_inflight_request_count");
Metric metric = localMetricRepo.getMetric(TOTAL_INFLIGHT_REQUEST_COUNT);
// max return -infinity when there are no samples. validate only against finite value
return Double.isFinite(metric.value()) ? metric.value() > 0.0 : false;
return Double.isFinite(metric.value()) && metric.value() > 0.0;
}

/** used only for testing */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,7 @@ public void routerMetricsTest() {
Assert.assertEquals(stats.getPendingRequestCount("my_host2"), 0);
routerHttpRequestStats.recordIncomingRequest();
Assert.assertTrue(RouterHttpRequestStats.hasInFlightRequests());
routerHttpRequestStats.recordResponse();
Assert.assertFalse(RouterHttpRequestStats.hasInFlightRequests());
}
}
Loading