Skip to content

Commit

Permalink
[#9141] Add support to kafka version 2.8 ~ 3.2 for kafka plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
ga-ram committed Aug 24, 2022
1 parent 6806635 commit f8c8b0e
Show file tree
Hide file tree
Showing 42 changed files with 820 additions and 119 deletions.
58 changes: 58 additions & 0 deletions plugins-it/kafka-it/kafka-2-it/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-kafka-plugin-it</artifactId>
<version>2.4.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>pinpoint-kafka-2-plugin-it</artifactId>

<packaging>jar</packaging>

<properties>
<jdk.version>1.8</jdk.version>
<jdk.home>${env.JAVA_8_HOME}</jdk.home>
</properties>

<dependencies>
<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-kafka-it-commons</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-plugin-it-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2020 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.kafka;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import test.pinpoint.plugin.kafka.Kafka2UnitServer;
import test.pinpoint.plugin.kafka.KafkaUnitServer;

public abstract class KafkaClient2ITBase extends KafkaClientITBase {

private static final KafkaUnitServer KAFKA_UNIT_SERVER = new Kafka2UnitServer(2189, 9092);

@BeforeClass
public static void beforeClass() {
KAFKA_UNIT_SERVER.startup();
TEST_CONSUMER.start();
}

@AfterClass
public static void afterClass() throws InterruptedException {
TEST_CONSUMER.shutdown();
KAFKA_UNIT_SERVER.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.common.Version;
import com.navercorp.pinpoint.pluginit.utils.AgentPath;
import com.navercorp.pinpoint.test.plugin.*;
import org.junit.Test;
Expand All @@ -34,10 +35,10 @@
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
@Dependency({
"org.apache.kafka:kafka_2.12:[0.11.0.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
"org.apache.kafka:kafka-clients:[0.11.0.0,0.11.max]",
"org.apache.kafka:kafka-clients:[0.11.0.0,0.11.max]", "com.navercorp.pinpoint:pinpoint-kafka-it-commons:"+ Version.VERSION
})
@JvmVersion(8)
public class KafkaClient_0_11_x_IT extends KafkaClientITBase {
public class KafkaClient_0_11_x_IT extends KafkaClient2ITBase {

@Test
public void producerSendTest() throws NoSuchMethodException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.common.Version;
import com.navercorp.pinpoint.pluginit.utils.AgentPath;
import com.navercorp.pinpoint.test.plugin.*;
import org.junit.Test;
Expand All @@ -36,10 +37,10 @@
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
@Dependency({
"org.apache.kafka:kafka_2.12:[1.0.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
"org.apache.kafka:kafka-clients:[1.0.0,1.0.max]"
"org.apache.kafka:kafka-clients:[1.0.0,1.0.max]", "com.navercorp.pinpoint:pinpoint-kafka-it-commons:"+ Version.VERSION
})
@JvmVersion(8)
public class KafkaClient_1_0_x_IT extends KafkaClientITBase {
public class KafkaClient_1_0_x_IT extends KafkaClient2ITBase {

@Test
public void producerSendTest() throws NoSuchMethodException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.common.Version;
import com.navercorp.pinpoint.pluginit.utils.AgentPath;
import com.navercorp.pinpoint.test.plugin.*;
import org.junit.Test;
Expand All @@ -36,10 +37,10 @@
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
@Dependency({
"org.apache.kafka:kafka_2.12:[1.1.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
"org.apache.kafka:kafka-clients:[1.1.0,1.1.max]"
"org.apache.kafka:kafka-clients:[1.1.0,1.1.max]", "com.navercorp.pinpoint:pinpoint-kafka-it-commons:"+ Version.VERSION
})
@JvmVersion(8)
public class KafkaClient_1_1_x_IT extends KafkaClientITBase {
public class KafkaClient_1_1_x_IT extends KafkaClient2ITBase {

@Test
public void producerSendTest() throws NoSuchMethodException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.common.Version;
import com.navercorp.pinpoint.pluginit.utils.AgentPath;
import com.navercorp.pinpoint.test.plugin.*;
import org.junit.Test;
Expand All @@ -36,10 +37,10 @@
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
@Dependency({
"org.apache.kafka:kafka_2.12:[2.0.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
"org.apache.kafka:kafka-clients:[2.0.0,2.0.max]",
"org.apache.kafka:kafka-clients:[2.0.0,2.0.max]", "com.navercorp.pinpoint:pinpoint-kafka-it-commons:"+ Version.VERSION
})
@JvmVersion(8)
public class KafkaClient_2_0_x_IT extends KafkaClientITBase {
public class KafkaClient_2_0_x_IT extends KafkaClient2ITBase {

@Test
public void producerSendTest() throws NoSuchMethodException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.common.Version;
import com.navercorp.pinpoint.pluginit.utils.AgentPath;
import com.navercorp.pinpoint.test.plugin.*;
import org.junit.Test;
Expand All @@ -36,10 +37,10 @@
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
@Dependency({
"org.apache.kafka:kafka_2.12:[2.2.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
"org.apache.kafka:kafka-clients:[2.2.0,2.2.max]",
"org.apache.kafka:kafka-clients:[2.2.0,2.2.max]", "com.navercorp.pinpoint:pinpoint-kafka-it-commons:"+ Version.VERSION
})
@JvmVersion(8)
public class KafkaClient_2_2_x_IT extends KafkaClientITBase {
public class KafkaClient_2_2_x_IT extends KafkaClient2ITBase {

@Test
public void producerSendTest() throws NoSuchMethodException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.common.Version;
import com.navercorp.pinpoint.pluginit.utils.AgentPath;
import com.navercorp.pinpoint.test.plugin.*;
import org.junit.Test;
Expand All @@ -20,10 +21,10 @@
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
@Dependency({
"org.apache.kafka:kafka_2.12:[2.3.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
"org.apache.kafka:kafka-clients:[2.3.0]",
"org.apache.kafka:kafka-clients:[2.3.0]", "com.navercorp.pinpoint:pinpoint-kafka-it-commons:"+ Version.VERSION
})
@JvmVersion(8)
public class KafkaClient_2_3_0_IT extends KafkaClientITBase {
public class KafkaClient_2_3_0_IT extends KafkaClient2ITBase {

@Test
public void producerSendTest() throws NoSuchMethodException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.common.Version;
import com.navercorp.pinpoint.pluginit.utils.AgentPath;
import com.navercorp.pinpoint.test.plugin.*;
import org.junit.Test;
Expand All @@ -20,10 +21,10 @@
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
@Dependency({
"org.apache.kafka:kafka_2.12:[2.3.1]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
"org.apache.kafka:kafka-clients:[2.3.1,2.3.max]",
"org.apache.kafka:kafka-clients:[2.3.1,2.3.max]", "com.navercorp.pinpoint:pinpoint-kafka-it-commons:"+ Version.VERSION
})
@JvmVersion(8)
public class KafkaClient_2_3_1_to_max_IT extends KafkaClientITBase {
public class KafkaClient_2_3_1_to_max_IT extends KafkaClient2ITBase {

@Test
public void producerSendTest() throws NoSuchMethodException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.common.Version;
import com.navercorp.pinpoint.pluginit.utils.AgentPath;
import com.navercorp.pinpoint.test.plugin.*;
import org.junit.Test;
Expand All @@ -20,10 +21,10 @@
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
@Dependency({
"org.apache.kafka:kafka_2.12:[2.4.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
"org.apache.kafka:kafka-clients:[2.4.0,2.4.max]"
"org.apache.kafka:kafka-clients:[2.4.0,2.4.max]", "com.navercorp.pinpoint:pinpoint-kafka-it-commons:"+ Version.VERSION
})
@JvmVersion(8)
public class KafkaClient_2_4_x_IT extends KafkaClientITBase {
public class KafkaClient_2_4_x_IT extends KafkaClient2ITBase {

@Test
public void producerSendTest() throws NoSuchMethodException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.common.Version;
import com.navercorp.pinpoint.pluginit.utils.AgentPath;
import com.navercorp.pinpoint.test.plugin.*;
import org.junit.Test;
Expand All @@ -20,10 +21,10 @@
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
@Dependency({
"org.apache.kafka:kafka_2.12:[2.5.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
"org.apache.kafka:kafka-clients:[2.5.0,2.5.max]"
"org.apache.kafka:kafka-clients:[2.5.0,2.5.max]", "com.navercorp.pinpoint:pinpoint-kafka-it-commons:"+ Version.VERSION
})
@JvmVersion(8)
public class KafkaClient_2_5_x_IT extends KafkaClientITBase {
public class KafkaClient_2_5_x_IT extends KafkaClient2ITBase {

@Test
public void producerSendTest() throws NoSuchMethodException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.common.Version;
import com.navercorp.pinpoint.pluginit.utils.AgentPath;
import com.navercorp.pinpoint.test.plugin.*;
import org.junit.Test;
Expand All @@ -20,10 +21,10 @@
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
@Dependency({
"org.apache.kafka:kafka_2.12:[2.6.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
"org.apache.kafka:kafka-clients:[2.6.0,2.6.x]"
"org.apache.kafka:kafka-clients:[2.6.0,2.6.x]", "com.navercorp.pinpoint:pinpoint-kafka-it-commons:"+ Version.VERSION
})
@JvmVersion(8)
public class KafkaClient_2_6_x_IT extends KafkaClientITBase {
public class KafkaClient_2_6_x_IT extends KafkaClient2ITBase {

@Test
public void producerSendTest() throws NoSuchMethodException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.plugin.kafka;

import com.navercorp.pinpoint.common.Version;
import com.navercorp.pinpoint.pluginit.utils.AgentPath;
import com.navercorp.pinpoint.test.plugin.Dependency;
import com.navercorp.pinpoint.test.plugin.ImportPlugin;
Expand All @@ -42,10 +43,10 @@
@ImportPlugin({"com.navercorp.pinpoint:pinpoint-kafka-plugin"})
@Dependency({
"org.apache.kafka:kafka_2.12:[2.7.0]", "log4j:log4j:[1.2.17]", "commons-io:commons-io:[2.5.0]",
"org.apache.kafka:kafka-clients:[2.7.0,)"
"org.apache.kafka:kafka-clients:[2.7.0,2.7.max]", "com.navercorp.pinpoint:pinpoint-kafka-it-commons:"+ Version.VERSION
})
@JvmVersion(8)
public class KafkaClient_2_7_x_IT extends KafkaClientITBase {
public class KafkaClient_2_7_x_IT extends KafkaClient2ITBase {

@Test
public void producerSendTest() throws NoSuchMethodException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,27 @@
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Properties;

/**
* Copy of https://github.com/chbatey/kafka-unit/blob/master/src/main/java/info/batey/kafka/unit/KafkaUnit.java
* Some codes have been modified for testing from the copied code.
*/
public class KafkaUnitServer {

private static final Logger logger = LogManager.getLogger(KafkaUnitServer.class);
private String zookeeperString;
private String brokerString;
private int zkPort;
private int brokerPort;
private Properties kafkaBrokerConfig;
private int zkMaxConnections;
public class Kafka2UnitServer extends KafkaUnitServer {
private static final Logger logger = LogManager.getLogger(Kafka2UnitServer.class);
private KafkaServerStartable broker;
private ZookeeperUnitServer zookeeper;
private File logDir;

public KafkaUnitServer(int zkPort, int brokerPort) {

public Kafka2UnitServer(int zkPort, int brokerPort) {
this(zkPort, brokerPort, 16);
}

public KafkaUnitServer(int zkPort, int brokerPort, int zkMaxConnections) {
this.kafkaBrokerConfig = new Properties();
this.zkPort = zkPort;
this.brokerPort = brokerPort;
this.zookeeperString = "localhost:" + zkPort;
this.brokerString = "localhost:" + brokerPort;
this.zkMaxConnections = zkMaxConnections;
public Kafka2UnitServer(int zkPort, int brokerPort, int zkMaxConnections) {
super(zkPort, brokerPort, zkMaxConnections);
}

public void startup() {
Expand Down
Loading

0 comments on commit f8c8b0e

Please sign in to comment.