Skip to content

Commit

Permalink
support mesh proxy by env var
Browse files Browse the repository at this point in the history
  • Loading branch information
rayzhang0603 committed Jul 21, 2022
1 parent 01d694a commit 9a8684e
Show file tree
Hide file tree
Showing 5 changed files with 382 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@
import com.weibo.api.motan.registry.RegistryFactory;
import com.weibo.api.motan.rpc.*;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MeshProxyUtil;
import org.apache.commons.lang3.StringUtils;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
Expand All @@ -48,7 +52,7 @@ public class SimpleConfigHandler implements ConfigHandler {

@Override
public <T> ClusterSupport<T> buildClusterSupport(Class<T> interfaceClass, List<URL> registryUrls, URL refUrl) {
ClusterSupport<T> clusterSupport = new ClusterSupport<T>(interfaceClass, registryUrls, refUrl);
ClusterSupport<T> clusterSupport = new ClusterSupport<T>(interfaceClass, MeshProxyUtil.processMeshProxy(registryUrls, refUrl, false), refUrl);
clusterSupport.init();

return clusterSupport;
Expand All @@ -72,7 +76,7 @@ public <T> Exporter<T> export(Class<T> interfaceClass, T ref, List<URL> registry
Exporter<T> exporter = protocol.export(provider, serviceUrl);

// register service
register(registryUrls, serviceUrl);
register(MeshProxyUtil.processMeshProxy(registryUrls, serviceUrl, true), serviceUrl);

return exporter;
}
Expand Down Expand Up @@ -125,4 +129,5 @@ private void unRegister(Collection<URL> registryUrls, URL serviceUrl) {
}
}


}
191 changes: 191 additions & 0 deletions motan-core/src/main/java/com/weibo/api/motan/util/MeshProxyUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
*
* Copyright 2009-2022 Weibo, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.weibo.api.motan.util;

import com.weibo.api.motan.common.URLParamType;
import com.weibo.api.motan.core.extension.ExtensionLoader;
import com.weibo.api.motan.registry.RegistryFactory;
import com.weibo.api.motan.registry.RegistryService;
import com.weibo.api.motan.rpc.URL;
import org.apache.commons.lang3.StringUtils;

import java.util.*;

/**
* @author zhanglei28
* @date 2022/7/14.
*/
public class MeshProxyUtil {
public static final String MESH_PROXY_ENV_NAME = "MOTAN_MESH_PROXY"; //使用mesh代理motan请求的环境变量名

// config keys
private static final String MODE_KEY = "mode"; // proxy type key
private static final String PORT_KEY = "port"; // mesh transport port for client end
private static final String IP_KEY = "ip"; // mesh management port

// config values
private static final String MODE_SERVER = "server"; // 代理server侧流量
private static final String MODE_CLIENT = "client"; // 代理client侧流量
private static final String MODE_ALL = "all"; // 代理双端流量
private static final String DEFAULT_PORT = "9981"; // 默认mesh正向代理端口
private static final String DEFAULT_IP = "localhost"; // 默认mesh ip

private static final String MESH_REGISTRY_NAME = "weibomesh";
private static final Set<String> NOT_PROCESS_REGISTRY_PROTOCOLS = new HashSet<>(Arrays.asList("local", "direct", MESH_REGISTRY_NAME));
private static Boolean initChecked; // 是否可以进行mesh proxy
private static Map<String, String> proxyConfig;

static {
initCheck();
}

/**
* 如果通过环境变量配置了使用Mesh进行代理,则通过把registry转换为MeshRegistry的方式实现服务的代理
*
* @param originRegistryUrls 原始注册中心urls
* @param serviceUrl 具体的rpc服务。
* @param isServerEnd 是否是服务端使用的场景。环境变量可以控制是对client端流量代理,还是server端流量代理,或者全部代理。
* @return 如果配置了环境变量则进行代理,否则原样返回
*/
public static List<URL> processMeshProxy(List<URL> originRegistryUrls, URL serviceUrl, boolean isServerEnd) {
if (initChecked && needProcess(serviceUrl, isServerEnd)) {
try {
List<URL> newRegistryUrls = new ArrayList<>(originRegistryUrls.size());
for (URL url : originRegistryUrls) {
if (NOT_PROCESS_REGISTRY_PROTOCOLS.contains(url.getProtocol())) {
newRegistryUrls.add(url); // 使用原始注册中心
LoggerUtil.info("mesh proxy ignore url:" + serviceUrl.toSimpleString()
+ ", registry: " + url.toSimpleString());
} else {
URL meshRegistryUrl = buildMeshRegistryUrl(url);
newRegistryUrls.add(meshRegistryUrl);
LoggerUtil.info("build mesh proxy registry for url:" + serviceUrl.toSimpleString()
+ ", origin registry:" + url.toSimpleString()
+ ", mesh registry url:" + meshRegistryUrl.toFullStr());
}
}
return newRegistryUrls;
} catch (Exception e) {
LoggerUtil.error("proxy motan fail", e);
}
}
return originRegistryUrls;
}

private static boolean needProcess(URL serviceUrl, boolean isServerEnd) {
// check proxy mode
String mode = proxyConfig.get(MODE_KEY);
if (StringUtils.isBlank(mode)) {// 必须显示指定,不考虑提供默认值
return false;
}
if (!MODE_ALL.equals(mode) && !MODE_SERVER.equals(mode) && !MODE_CLIENT.equals(mode)) {
return false; // 未识别模式不进行处理
}
if (MODE_CLIENT.equals(mode) && isServerEnd) {// client模式下,server端不进行处理
return false;
}
if (MODE_SERVER.equals(mode) && !isServerEnd) {// server模式下,client端不进行处理
return false;
}
if (!"motan2".equals(serviceUrl.getProtocol())) {// 非motan2 协议不进行处理。 TODO 后续支持motan1协议后在进行调整
return false;
}
return true;
}

/**
* 解析mesh proxy 环境变量中的配置
*
* @param meshProxyString 配置字符串格式 "key:value,key:value", 其中value会进行url decode。 例如:"type:server,mport:8002,port:9981"
* @return 解析后的配置
*/
private static Map<String, String> parseProxyConfig(String meshProxyString) {
Map<String, String> proxyConfig = new HashMap<>();
String[] items = meshProxyString.split(",");
for (String item : items) {
String[] values = item.split(":");
if (StringUtils.isNotBlank(values[0])) {// key not empty
String k = values[0].trim();
String v = "";
if (values.length > 1 && StringUtils.isNotBlank(values[1])) {
v = StringTools.urlDecode(values[1].trim());
}
proxyConfig.put(k, v);
LoggerUtil.info("add mesh proxy param: " + k + ":" + v);
}
}
return proxyConfig;
}

private static URL buildMeshRegistryUrl(URL proxyRegistry) {
URL meshRegistryUrl = new URL(MESH_REGISTRY_NAME,
getValue(proxyConfig, IP_KEY, DEFAULT_IP),
Integer.parseInt(getValue(proxyConfig, PORT_KEY, DEFAULT_PORT)),
RegistryService.class.getName()
);
Map<String, String> params = new HashMap<>(proxyConfig);
// put necessary keys
params.put(URLParamType.dynamic.getName(), "true");
params.put(URLParamType.proxyRegistryUrlString.getName(), StringTools.urlEncode(proxyRegistry.toFullStr()));
meshRegistryUrl.addParameters(params);
return meshRegistryUrl;
}

private static String getValue(Map<String, String> configs, String key, String defaultValue) {
String value = configs.get(key);
return StringUtils.isNotBlank(value) ? value : defaultValue;
}

// 检查是否支持mesh proxy
private static void initCheck() {
// check env set
String meshProxyString = System.getenv(MESH_PROXY_ENV_NAME);
if (StringUtils.isNotBlank(meshProxyString)) {
LoggerUtil.info("find MOTAN_MESH_PROXY evn, value:" + meshProxyString);
proxyConfig = parseProxyConfig(meshProxyString);
// check MeshRegistry extension
RegistryFactory meshRegistryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(MESH_REGISTRY_NAME, false);
if (meshRegistryFactory != null) {
initChecked = true;
LoggerUtil.info("mesh proxy init check passed");
return;
} else {
LoggerUtil.error("can not proxy motan, because MeshRegistry extension not found, maybe the dependency of 'motan-registry-weibomesh' not set in pom");
}
}
initChecked = false;
}

// ---- only for test ----
protected static void reset() {
proxyConfig = null;
initCheck();
}

protected static Map<String, String> getProxyConfig() {
return proxyConfig;
}

protected static boolean setInitChecked(boolean value) {
boolean oldValue = initChecked;
initChecked = value;
return oldValue;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
*
* Copyright 2009-2022 Weibo, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.weibo.api.motan.util;

import com.weibo.api.motan.common.URLParamType;
import com.weibo.api.motan.rpc.URL;
import org.junit.After;
import org.junit.Test;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.*;

/**
* @author zhanglei28
* @date 2022/7/19.
*/
public class MeshProxyUtilTest {
String mode = "server";
String mport = "8803"; // not default value
String port = "9983"; // not default value
String agentFilters = " af_accessLog%2Caf_metrics "; // with space
String encodedValue = "2132%3Aer93%3A3e987%3Adfje%2F%2Fjdife"; // a URLEncoded value for known key

@After
public void tearDown() throws Exception {
getModifiableEnvironment().remove(MeshProxyUtil.MESH_PROXY_ENV_NAME);
}

@Test
public void processMeshProxy() throws Exception {
// not proxy if env not set
List<URL> originRegistryUrls = new ArrayList<>();
originRegistryUrls.add(new URL("zookeeper", "localhost", 2181, "testZkRegistry"));
originRegistryUrls.add(new URL("local", "localhost", 0, "testLocalRegistry"));
originRegistryUrls.add(new URL("direct", "localhost", 8802, "testDirectRegistry"));
originRegistryUrls.add(new URL("weibomesh", "localhost", 9988, "testMeshRegistry"));
URL serviceUrl = new URL("motan2", "localhost", 8802, "testService");
List<URL> resultUrl = MeshProxyUtil.processMeshProxy(originRegistryUrls, serviceUrl, true);

assertNull(MeshProxyUtil.getProxyConfig()); // not init
check(originRegistryUrls, resultUrl, false, null);

// env with minimal param
getModifiableEnvironment().put(MeshProxyUtil.MESH_PROXY_ENV_NAME, "mode:" + mode); // minimal key as default
MeshProxyUtil.reset();
assertEquals(mode, MeshProxyUtil.getProxyConfig().get("mode")); // check init proxy config
assertFalse(MeshProxyUtil.setInitChecked(true)); // initChecked is false because not have MeshRegistry extension. so set initChecked true for unit test

// check default port
resultUrl = MeshProxyUtil.processMeshProxy(originRegistryUrls, serviceUrl, true);
check(originRegistryUrls, resultUrl, true, null); // proxy server url
assertEquals(9981, resultUrl.get(0).getPort().intValue()); // default port

// check mode
resultUrl = MeshProxyUtil.processMeshProxy(originRegistryUrls, serviceUrl, true);
Map<String, String> proxiedParams = new HashMap<>();
proxiedParams.put("mode", mode);
proxiedParams.put(URLParamType.dynamic.getName(), "true");
proxiedParams.put(URLParamType.proxyRegistryUrlString.getName(), StringTools.urlEncode(originRegistryUrls.get(0).toFullStr()));

check(originRegistryUrls, resultUrl, true, proxiedParams); // proxy server url
resultUrl = MeshProxyUtil.processMeshProxy(originRegistryUrls, serviceUrl, false);
check(originRegistryUrls, resultUrl, false, null); // not proxy client url in server mode

mode = "client";
getModifiableEnvironment().put(MeshProxyUtil.MESH_PROXY_ENV_NAME, "mode:" + mode); // minimal key as default
MeshProxyUtil.reset();
MeshProxyUtil.setInitChecked(true);
assertEquals(mode, MeshProxyUtil.getProxyConfig().get("mode"));
resultUrl = MeshProxyUtil.processMeshProxy(originRegistryUrls, serviceUrl, false);
proxiedParams.put("mode", mode);
check(originRegistryUrls, resultUrl, true, proxiedParams); // proxy client url
resultUrl = MeshProxyUtil.processMeshProxy(originRegistryUrls, serviceUrl, true);
check(originRegistryUrls, resultUrl, false, null); // not proxy server url in client mode

mode = "all";
getModifiableEnvironment().put(MeshProxyUtil.MESH_PROXY_ENV_NAME, "mode:" + mode); // minimal key as default
MeshProxyUtil.reset();
MeshProxyUtil.setInitChecked(true);
assertEquals(mode, MeshProxyUtil.getProxyConfig().get("mode"));
resultUrl = MeshProxyUtil.processMeshProxy(originRegistryUrls, serviceUrl, false);
proxiedParams.put("mode", mode);
check(originRegistryUrls, resultUrl, true, proxiedParams); // proxy client url
resultUrl = MeshProxyUtil.processMeshProxy(originRegistryUrls, serviceUrl, true);
check(originRegistryUrls, resultUrl, true, proxiedParams); // proxy server url

// env with more params
getModifiableEnvironment().put(MeshProxyUtil.MESH_PROXY_ENV_NAME, "mode:" + mode + ",mport:" + mport + ",port:" + port + ",test:" + encodedValue + ", filter:" + agentFilters); // minimal key as default
MeshProxyUtil.reset();
MeshProxyUtil.setInitChecked(true);
assertEquals(mode, MeshProxyUtil.getProxyConfig().get("mode"));
assertEquals(mport, MeshProxyUtil.getProxyConfig().get("mport"));
assertEquals(port, MeshProxyUtil.getProxyConfig().get("port"));
assertEquals(StringTools.urlDecode(agentFilters.trim()), MeshProxyUtil.getProxyConfig().get("filter")); // with trim
assertEquals(StringTools.urlDecode(encodedValue), MeshProxyUtil.getProxyConfig().get("test")); // url decode with value

resultUrl = MeshProxyUtil.processMeshProxy(originRegistryUrls, serviceUrl, true);
proxiedParams.put("filter", MeshProxyUtil.getProxyConfig().get("filter"));
proxiedParams.put("test", MeshProxyUtil.getProxyConfig().get("test"));
proxiedParams.put("mport", MeshProxyUtil.getProxyConfig().get("mport"));
check(originRegistryUrls, resultUrl, true, proxiedParams);

// check not motan2 protocol. //TODO remove this test if motan protocol is supported
serviceUrl.setProtocol("motan");
resultUrl = MeshProxyUtil.processMeshProxy(originRegistryUrls, serviceUrl, true);
check(originRegistryUrls, resultUrl, false, null);
}

private void check(List<URL> originRegistryUrls, List<URL> resultRegistryUrls, boolean isProxied, Map<String, String> proxiedParams) {
assertEquals(originRegistryUrls.size(), resultRegistryUrls.size()); // registry size is same
for (int i = 0; i < resultRegistryUrls.size(); i++) {
if (isProxied) {
String orgProtocol = originRegistryUrls.get(i).getProtocol();
String newProtocol = resultRegistryUrls.get(i).getProtocol();
if ("local".equals(orgProtocol) || "direct".equals(orgProtocol) || "weibomesh".equals(orgProtocol)) {
// not proxy if registry protocol is local or direct or weibomesh
assertEquals(originRegistryUrls.get(i), resultRegistryUrls.get(i));
} else {
assertEquals("weibomesh", newProtocol);
if (proxiedParams != null) {
for (Map.Entry<String, String> entry : proxiedParams.entrySet()) {
assertEquals(entry.getValue(), resultRegistryUrls.get(i).getParameter(entry.getKey()));
}
}
}
} else { // registry will not modify if not proxy
assertEquals(originRegistryUrls.get(i), resultRegistryUrls.get(i));
}
}
}

private static Map<String, String> getModifiableEnvironment() throws Exception {
Class<?> pe = Class.forName("java.lang.ProcessEnvironment");
Method getenv = pe.getDeclaredMethod("getenv");
getenv.setAccessible(true);
Object unmodifiableEnvironment = getenv.invoke(null);
Class<?> map = Class.forName("java.util.Collections$UnmodifiableMap");
Field m = map.getDeclaredField("m");
m.setAccessible(true);
return (Map<String, String>) m.get(unmodifiableEnvironment);
}

}
Loading

0 comments on commit 9a8684e

Please sign in to comment.