Skip to content

Commit

Permalink
[#9558] Configurable flink rest port
Browse files Browse the repository at this point in the history
  • Loading branch information
smilu97 committed Dec 29, 2022
1 parent 8c35e75 commit 181481e
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

import com.navercorp.pinpoint.common.server.config.AnnotationVisitor;
import com.navercorp.pinpoint.common.server.config.LoggingEvent;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

Expand Down Expand Up @@ -52,6 +52,8 @@ public class BatchConfiguration {
@Value("${batch.flink.server}")
private String[] flinkServerList = new String[0];

@Value("${batch.flink.rest.port:8081}")
private int flinkRestPort;

@Value("${job.cleanup.inactive.agents:false}")
private boolean enableCleanupInactiveAgents;
Expand All @@ -75,7 +77,7 @@ public class BatchConfiguration {
public void setup() {
beforeLog();

if (enableCleanupInactiveAgents == false) {
if (!enableCleanupInactiveAgents) {
cleanupInactiveAgentsDurationDays = DEFAULT_CLEANUP_INACTIVE_AGENTS_DURATION_DAYS;
cleanupInactiveAgentsCron = DISABLED_CLEANUP_INACTIVE_AGENTS_CRON;
} else {
Expand Down Expand Up @@ -108,6 +110,10 @@ public List<String> getFlinkServerList() {
return Arrays.asList(flinkServerList);
}

public int getFlinkRestPort() {
return flinkRestPort;
}

public String getEmailServerUrl() {
return emailServerUrl;
}
Expand Down Expand Up @@ -135,21 +141,20 @@ public String getCleanupInactiveAgentsCron() {
public boolean isWebhookEnable() {
return webhookEnable;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("BatchConfiguration{");
sb.append("emailServerUrl='").append(emailServerUrl).append('\'');
sb.append(", senderEmailAddress='").append(senderEmailAddress).append('\'');
sb.append(", enableWebhook='").append(webhookEnable).append('\'');
sb.append(", pinpointUrl='").append(pinpointUrl).append('\'');
sb.append(", batchEnv='").append(batchEnv).append('\'');
sb.append(", flinkServerList=").append(Arrays.toString(flinkServerList));
sb.append(", enableCleanupInactiveAgents=").append(enableCleanupInactiveAgents);
sb.append(", cleanupInactiveAgentsDurationDays=").append(cleanupInactiveAgentsDurationDays);
sb.append(", cleanupInactiveAgentsCron='").append(cleanupInactiveAgentsCron).append('\'');
sb.append('}');
return sb.toString();
return "BatchConfiguration{" +
"emailServerUrl='" + emailServerUrl + '\'' +
", senderEmailAddress='" + senderEmailAddress + '\'' +
", pinpointUrl='" + pinpointUrl + '\'' +
", webhookEnable=" + webhookEnable +
", batchEnv='" + batchEnv + '\'' +
", flinkServerList=" + Arrays.toString(flinkServerList) +
", flinkRestPort=" + flinkRestPort +
", enableCleanupInactiveAgents=" + enableCleanupInactiveAgents +
", cleanupInactiveAgentsDurationDays=" + cleanupInactiveAgentsDurationDays +
", cleanupInactiveAgentsCron='" + cleanupInactiveAgentsCron + '\'' +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@
package com.navercorp.pinpoint.batch.flink;

import com.navercorp.pinpoint.batch.common.BatchConfiguration;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -39,7 +41,7 @@
public class HealthCheckTaskletV2 implements Tasklet {

private final Logger logger = LogManager.getLogger(this.getClass());
private final static String URL_FORMAT = "http://%s:8081/jobs/overview";
private final static String URL_FORMAT = "http://%s:%d/jobs/overview";
private final static String NAME = "name";
private final static String STATE = "state";
private final static String RUNNING = "RUNNING";
Expand All @@ -58,7 +60,7 @@ public HealthCheckTaskletV2(BatchConfiguration batchConfiguration, RestTemplate
}

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
public RepeatStatus execute(@Nonnull StepContribution contribution, @Nonnull ChunkContext chunkContext) throws Exception {
List<String> urlList = generatedFlinkManagerServerApi();

if (urlList.isEmpty()) {
Expand All @@ -69,7 +71,8 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon

for (String url : urlList) {
try {
ResponseEntity<Map> responseEntity = this.restTemplate.exchange(url, HttpMethod.GET, null, Map.class);
ParameterizedTypeReference<Map<?, ?>> type = new ParameterizedTypeReference<>() {};
ResponseEntity<Map<?, ?>> responseEntity = this.restTemplate.exchange(url, HttpMethod.GET, null, type);

if (responseEntity.getStatusCode() != HttpStatus.OK) {
continue;
Expand All @@ -96,7 +99,7 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon
return RepeatStatus.FINISHED;
}

private void checkJobExecuteStatus(ResponseEntity<Map> responseEntity, Map<String, Boolean> jobExecuteStatus) {
private void checkJobExecuteStatus(ResponseEntity<Map<?, ?>> responseEntity, Map<String, Boolean> jobExecuteStatus) {
Map<?, ?> responseData = responseEntity.getBody();
if(responseData != null) {
List<?> jobs = (List<?>)responseData.get("jobs");
Expand All @@ -115,13 +118,14 @@ private void checkJobExecuteStatus(ResponseEntity<Map> responseEntity, Map<Strin
}
}


private List<String> generatedFlinkManagerServerApi() {
// @VisibleForTesting
List<String> generatedFlinkManagerServerApi() {
List<String> flinkServerList = batchConfiguration.getFlinkServerList();
int flinkRestPort = batchConfiguration.getFlinkRestPort();
List<String> urlList = new ArrayList<>(flinkServerList.size());

for (String flinkServerIp : flinkServerList) {
urlList.add(String.format(URL_FORMAT, flinkServerIp));
urlList.add(String.format(URL_FORMAT, flinkServerIp, flinkRestPort));
}

return urlList;
Expand Down
1 change: 1 addition & 0 deletions batch/src/main/resources/batch-root.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ webhook.enable=false

#flink server list
batch.flink.server=
batch.flink.rest.port=8081

#cleanup inactive agents job
job.cleanup.inactive.agents=false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2022 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.batch.flink;

import com.navercorp.pinpoint.batch.common.BatchConfiguration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.web.client.RestTemplate;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

/**
* @author youngjin.kim2
*/
@ExtendWith(MockitoExtension.class)
public class HealthCheckTaskletV2Test {

@Mock
private BatchConfiguration batchConfiguration;

@Mock
RestTemplate restTemplate;

@Test
public void testGeneratedFlinkManagerServerApi() {
when(batchConfiguration.getFlinkServerList()).thenReturn(List.of("123.234.123.234"));
when(batchConfiguration.getFlinkRestPort()).thenReturn(1919);

final HealthCheckTaskletV2 tasklet = new HealthCheckTaskletV2(batchConfiguration, restTemplate);
final List<String> results = tasklet.generatedFlinkManagerServerApi();
assertThat(results).hasSize(1);
final String result = results.get(0);
assertThat(result.indexOf("123.234.123.234:1919")).isGreaterThanOrEqualTo(0);
assertThat(result.indexOf("DEAD-BEEF")).isLessThan(0);
}

}

0 comments on commit 181481e

Please sign in to comment.