Skip to content

Commit

Permalink
[pinpoint-apm#12071] Update reactor subscriber.subscribeOn
Browse files Browse the repository at this point in the history
  • Loading branch information
jaehong-kim committed Feb 21, 2025
1 parent e2fcf91 commit 401b0f1
Show file tree
Hide file tree
Showing 79 changed files with 1,773 additions and 2,948 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,12 @@ String welcome() {

@GetMapping("/parallelFlux/runOn")
public Mono<String> parallelFluxRunOn() {
Flux.range(1, 10)
Flux.range(1, 1)
.parallel(2)
.runOn(Schedulers.parallel())
.map(i -> {
return call();
})
.subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));

return Mono.just("OK");
Expand All @@ -84,11 +87,12 @@ public Mono<String> fluxPublishOn() {
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);

final Flux<String> flux = Flux
.range(1, 2)
.range(1, 1)
.map(i -> 10 + i)
.publishOn(s)
.map(i -> "value " + i);

.map(i -> {
return call();
});
flux.subscribe(System.out::println);

return Mono.just("OK");
Expand All @@ -99,8 +103,9 @@ public Mono<String> fluxSubscribeOn() {
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);

final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.range(1, 1).map(i -> {
return call();
})
.subscribeOn(s)
.map(i -> "value " + i);

Expand All @@ -111,35 +116,27 @@ public Mono<String> fluxSubscribeOn() {

@GetMapping("/mono/publishOn")
public Mono<String> monoPublishOn() {
Flux<Integer> test = Flux
.just(0, 1)
.hide()
.flatMap(f -> Mono.just(f).publishOn(Schedulers.parallel()).map(i -> 1 / i));
test.subscribe(System.out::println);
Mono.just("test").publishOn(Schedulers.parallel()).map(i -> {
return call();
}).subscribe(System.out::println);

return Mono.just("OK");
}

@GetMapping("/mono/subscribeOn")
public Mono<String> monoSubscribeOn() {
Flux<Integer> test = Flux.fromIterable(Arrays.asList("A"))
.flatMap(w -> Mono.fromCallable(() -> Arrays.asList(1, 2))
.subscribeOn(Schedulers.parallel())
.flatMapMany(Flux::fromIterable));
test.subscribe(System.out::println);
Mono.fromCallable(() -> Arrays.asList(1, 2))
.subscribeOn(Schedulers.parallel()).map(i -> {
return call();
}).subscribe(System.out::println);
return Mono.just("OK");
}

@GetMapping("/mono/delay")
public Mono<String> monoDelay() {
System.out.println("MAIN thread=" + Thread.currentThread().getName());
return Mono.delay(Duration.ofMillis(100L)).map(aLong -> {
System.out.println("DELAY thread=" + Thread.currentThread().getName());
WebClient client = WebClient.create("http://naver.com");
WebClient.ResponseSpec response = client.method(HttpMethod.GET)
.uri("").retrieve();
Mono<String> body = response.bodyToMono(String.class);
return body.block();
return call();
});
}

Expand All @@ -148,63 +145,39 @@ public Mono<String> monoDelayElement() {
System.out.println("MAIN thread=" + Thread.currentThread().getName());
return Mono.just("Hello").delayElement(Duration.ofMillis(100L)).map(o -> {
System.out.println("DELAY thread=" + Thread.currentThread().getName());
WebClient client = WebClient.create("http://naver.com");
WebClient.ResponseSpec response = client.method(HttpMethod.GET)
.uri("").retrieve();
Mono<String> body = response.bodyToMono(String.class);
return body.block();
return call();
});
}

@GetMapping("/mono/delaySubscription")
public Mono<String> monoDelaySubscription() {
System.out.println("MAIN thread=" + Thread.currentThread().getName());
return Mono.just("Hello").delaySubscription(Duration.ofMillis(100L)).map(o -> {
System.out.println("DELAY thread=" + Thread.currentThread().getName());
WebClient client = WebClient.create("http://naver.com");
WebClient.ResponseSpec response = client.method(HttpMethod.GET)
.uri("").retrieve();
Mono<String> body = response.bodyToMono(String.class);
return body.block();
return call();
});
}

@GetMapping("/mono/take")
public Mono<String> monoTake() {
System.out.println("MAIN thread=" + Thread.currentThread().getName());
return Mono.just("Hello").take(Duration.ofMillis(100L)).map(o -> {
System.out.println("TAKE thread=" + Thread.currentThread().getName());
WebClient client = WebClient.create("http://naver.com");
WebClient.ResponseSpec response = client.method(HttpMethod.GET)
.uri("").retrieve();
Mono<String> body = response.bodyToMono(String.class);
return body.block();
return call();
});
}

@GetMapping("/flux/interval")
public Flux<String> fluxInterval() {
System.out.println("MAIN thread=" + Thread.currentThread().getName());
return Flux.interval(Duration.ofMillis(100L)).take(3).map(o -> {
System.out.println("INTERVAL thread=" + Thread.currentThread().getName());
WebClient client = WebClient.create("http://naver.com");
WebClient.ResponseSpec response = client.method(HttpMethod.GET)
.uri("").retrieve();
Mono<String> body = response.bodyToMono(String.class);
return body.block();
return call();
});
}

@GetMapping("/flux/buffer")
public Flux<String> fluxBuffer() {
System.out.println(Thread.currentThread().getName());
return Flux.just(1, 2, 3).delayElements(Duration.ofMillis(100L)).take(3).map(o -> {
System.out.println(Thread.currentThread().getName());
WebClient client = WebClient.create("http://naver.com");
WebClient.ResponseSpec response = client.method(HttpMethod.GET)
.uri("").retrieve();
Mono<String> body = response.bodyToMono(String.class);
return body.block();
return call();
});
}

Expand Down Expand Up @@ -233,7 +206,7 @@ public Mono<String> monoSubscribeReturn() {
@GetMapping("/flux/cancelOn")
public Mono<String> fluxCancelOn() {
System.out.println(Thread.currentThread().getName());
WebClient client = WebClient.create("http://naver.com");
WebClient client = WebClient.create("http://httpbin.org");
Mono<String> callback = client.method(HttpMethod.GET)
.uri("").retrieve().bodyToMono(String.class);

Expand Down Expand Up @@ -331,4 +304,12 @@ private Mono<String> fallback() {
.uri("").retrieve();
return response.bodyToMono(String.class);
}

private String call() {
WebClient client = WebClient.create("http://httpbin.org");
WebClient.ResponseSpec response = client.method(HttpMethod.GET)
.uri("").retrieve();
Mono<String> body = response.bodyToMono(String.class);
return body.block();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
<groupId>com.navercorp.pinpoint</groupId>
<artifactId>pinpoint-agent-testweb-commons</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.reactive.result.method.RequestMappingInfo;
import org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
Expand Down Expand Up @@ -121,4 +122,12 @@ public Mono<String> reactiveGet() {

return reactiveStringRedisTemplate.opsForValue().get("foo");
}

@GetMapping("/reactive/set")
public Flux<Boolean> reactiveSet() {
return Flux.just("foo", "bar")
.zipWith(Flux.just("bar", "baz"))
.flatMap(tuple -> reactiveStringRedisTemplate.opsForValue().set(tuple.getT1(), tuple.getT2()));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
Expand All @@ -34,6 +33,7 @@ public class RedisTemplateConfig {
private static final int PORT1 = 18001;
private static final int PORT2 = 18002;
private static final int PORT3 = 18003;
private static final int PORT = 32785;

@Bean
public StringRedisTemplate redisTemplate() {
Expand All @@ -44,11 +44,7 @@ public StringRedisTemplate redisTemplate() {

@Bean
public RedisConnectionFactory redisConnectionFactory() {
RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration();
clusterConfiguration.addClusterNode(new RedisNode(HOST1, PORT1));
clusterConfiguration.addClusterNode(new RedisNode(HOST2, PORT2));
clusterConfiguration.addClusterNode(new RedisNode(HOST3, PORT3));
return new LettuceConnectionFactory(clusterConfiguration);
return new LettuceConnectionFactory(new RedisStandaloneConfiguration(HOST1, PORT));
}

@Bean
Expand All @@ -59,10 +55,6 @@ public ReactiveStringRedisTemplate reactiveStringRedisTemplate() {

@Bean
public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration();
clusterConfiguration.addClusterNode(new RedisNode(HOST1, PORT1));
clusterConfiguration.addClusterNode(new RedisNode(HOST2, PORT2));
clusterConfiguration.addClusterNode(new RedisNode(HOST3, PORT3));
return new LettuceConnectionFactory(clusterConfiguration);
return new LettuceConnectionFactory(new RedisStandaloneConfiguration(HOST1, PORT));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2025 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.pinpoint.test.plugin;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Properties;

class RedisServerTest {
private static GenericContainer<?> redisServer;

@BeforeAll
public static void beforeClass() {
Assumptions.assumeTrue(DockerClientFactory.instance().isDockerAvailable(), "Docker not enabled");

redisServer = new GenericContainer<>(DockerImageName.parse("redis:5.0.14-alpine"))
.withExposedPorts(6379);
redisServer.start();

Properties properties = new Properties();
properties.setProperty("HOST", redisServer.getHost());
properties.setProperty("PORT", String.valueOf(redisServer.getMappedPort(6379)));
System.out.println(properties);
}

@AfterAll
public static void afterAll() {
if (redisServer != null) {
redisServer.close();
}
}

@Test
public void test() throws Exception {
System.out.println("TEST");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public MysqlR2dbcDatabase() {
System.out.println("INIT");
MySqlConnectionConfiguration connectionConfiguration = MySqlConnectionConfiguration.builder()
.host("localhost")
.port(32789)
.port(32783)
.database("test")
.user("root")
.password("")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ public void before(Object target, int apiId, Object[] args) {

// try entry API.
if (Boolean.FALSE == tryEnter(trace)) {
// skip nested API.
if (isDebug) {
logger.debug("Skip nested async API.before(), traceScopeName={}", traceScopeName);
logger.debug("Skip nested async API.before(), trace={}, traceScopeName={}", trace, traceScopeName);
}
return;
}
Expand Down Expand Up @@ -86,9 +85,6 @@ public void after(Object target, int apiId, Object[] args, Object result, Throwa

final AsyncContext asyncContext = getAsyncContext(target, args, result, throwable);
if (asyncContext == null) {
if (isTrace) {
logger.trace("AsyncContext not found");
}
return;
}

Expand All @@ -98,9 +94,8 @@ public void after(Object target, int apiId, Object[] args, Object result, Throwa
}

if (Boolean.FALSE == canLeave(trace)) {
// skip nested API.
if (isDebug) {
logger.debug("Skip nested async API.after(), traceScopeName={}", traceScopeName);
logger.debug("Skip nested async API.after(), trace={}, traceScopeName={}", trace, traceScopeName);
}
return;
}
Expand Down Expand Up @@ -143,7 +138,11 @@ public boolean tryEnter(final Trace trace) {
scope = trace.getScope(traceScopeName);
}
if (scope != null) {
return scope.tryEnter();
boolean result = scope.tryEnter();
return result;
}
if (isDebug) {
logger.debug("Skip to enter, not found scope, scopeName={}", traceScopeName);
}
return false;
}
Expand All @@ -156,6 +155,7 @@ public boolean canLeave(final Trace trace) {
return true;
}
}

return false;
}
}
Loading

0 comments on commit 401b0f1

Please sign in to comment.