Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add reject method detail & limit client total connections #883

Merged
merged 5 commits into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.weibo.api.motan.cluster.support;

import com.weibo.api.motan.closable.ShutDownHook;
import com.weibo.api.motan.cluster.Cluster;
import com.weibo.api.motan.cluster.HaStrategy;
import com.weibo.api.motan.cluster.LoadBalance;
Expand All @@ -33,13 +34,16 @@
import com.weibo.api.motan.rpc.URL;
import com.weibo.api.motan.util.CollectionUtil;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MotanSwitcherUtil;
import com.weibo.api.motan.util.StringTools;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Notify cluster the referers have changed.
Expand All @@ -50,21 +54,43 @@

public class ClusterSupport<T> implements NotifyListener {

private static ConcurrentHashMap<String, Protocol> protocols = new ConcurrentHashMap<String, Protocol>();
private static ConcurrentHashMap<String, Protocol> protocols = new ConcurrentHashMap<>();
private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
private static Set<ClusterSupport> refreshSet = new HashSet<>();

static {
executorService.scheduleAtFixedRate(() -> {
for (ClusterSupport clusterSupport : refreshSet) {
clusterSupport.refreshReferers();
}
}, MotanConstants.REFRESH_PERIOD, MotanConstants.REFRESH_PERIOD, TimeUnit.SECONDS);

ShutDownHook.registerShutdownHook(() -> {
if (!executorService.isShutdown()) {
executorService.shutdown();
}
});
}

protected ConcurrentHashMap<URL, List<URL>> registryUrlsMap = new ConcurrentHashMap<>();
protected ConcurrentHashMap<URL, List<URL>> registryActiveUrlsMap = new ConcurrentHashMap<>();
private Cluster<T> cluster;
private List<URL> registryUrls;
private URL url;
private Class<T> interfaceClass;
private Protocol protocol;
private ConcurrentHashMap<URL, List<Referer<T>>> registryReferers = new ConcurrentHashMap<URL, List<Referer<T>>>();

private ConcurrentHashMap<URL, List<Referer<T>>> registryReferers = new ConcurrentHashMap<>();
private int selectNodeCount;

public ClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {
this.registryUrls = registryUrls;
this.interfaceClass = interfaceClass;
String urlStr = StringTools.urlDecode(registryUrls.get(0).getParameter(URLParamType.embed.getName()));
this.url = URL.valueOf(urlStr);
protocol = getDecorateProtocol(url.getProtocol());
int maxConnectionCount = this.url.getIntParameter(URLParamType.maxConnectionPerGroup.getName(), URLParamType.maxConnectionPerGroup.getIntValue());
int maxClientConnection = this.url.getIntParameter(URLParamType.maxClientConnection.getName(), URLParamType.maxClientConnection.getIntValue());
selectNodeCount = maxConnectionCount / maxClientConnection;
}

public void init() {
Expand Down Expand Up @@ -152,7 +178,6 @@ public synchronized void notify(URL registryUrl, List<URL> urls) {
onRegistryEmpty(registryUrl);
LoggerUtil.warn("ClusterSupport config change notify, urls is empty: registry={} service={} urls=[]", registryUrl.getUri(),
url.getIdentity());

return;
}

Expand All @@ -165,8 +190,15 @@ public synchronized void notify(URL registryUrl, List<URL> urls) {
// 判断urls中是否包含权重信息,并通知loadbalance。
processWeights(urls);

List<Referer<T>> newReferers = new ArrayList<Referer<T>>();
for (URL u : urls) {
List<URL> serviceUrls = urls;
if (selectNodeCount > 0 && MotanSwitcherUtil.switcherIsOpenWithDefault("feature.motan.partial.server", true)) {
refreshSet.add(this);
serviceUrls = selectUrls(registryUrl, urls);
} else {
refreshSet.remove(this);
}
List<Referer<T>> newReferers = new ArrayList<>();
for (URL u : serviceUrls) {
if (!u.canServe(url)) {
continue;
}
Expand All @@ -192,6 +224,98 @@ public synchronized void notify(URL registryUrl, List<URL> urls) {
refreshCluster();
}

protected List<URL> selectUrls(URL registryUrl, List<URL> urls) {
Map<String, List<URL>> groupUrlsMap = new HashMap<>();
for (URL u : urls) {
String group = u.getGroup();
if (!groupUrlsMap.containsKey(group)) {
groupUrlsMap.put(group, new ArrayList<URL>());
}
if (u.canServe(url)) {
groupUrlsMap.get(group).add(u);
}
}
List<URL> result = new ArrayList<>();
for (Map.Entry<String, List<URL>> entry : groupUrlsMap.entrySet()) {
result.addAll(selectUrlsByGroup(registryUrl, entry.getKey(), entry.getValue()));
}
registryUrlsMap.put(registryUrl, urls);
registryActiveUrlsMap.put(registryUrl, result);
return result;
}

protected List<URL> selectUrlsByGroup(URL registryUrl, String group, List<URL> notifyUrls) {
if (notifyUrls.size() <= selectNodeCount) {
LoggerUtil.info("ClusterSupport config change notify: registry={} service={} group={} size={} non increased",
registryUrl.getUri(), url.getIdentity(), group, notifyUrls.size());
return notifyUrls;
}

List<URL> result = new ArrayList<>();
List<URL> activeUrls = registryActiveUrlsMap.get(registryUrl);
if (activeUrls == null) {
activeUrls = new ArrayList<>();
}
List<URL> lastNotifyUrls = registryUrlsMap.get(registryUrl);
if (lastNotifyUrls == null) {
lastNotifyUrls = new ArrayList<>();
}

List<URL> sameUrls = new ArrayList<>(notifyUrls);
sameUrls.retainAll(activeUrls);
Collections.shuffle(sameUrls);
List<URL> addedUrls = new ArrayList<>(notifyUrls);
addedUrls.removeAll(lastNotifyUrls);
Collections.shuffle(addedUrls);

List<URL> groupUrls = new ArrayList<>();
// 计算重用节点数量
int newCount = Math.round((float) selectNodeCount * addedUrls.size() / notifyUrls.size());
int remainCount = selectNodeCount - newCount;
// 至少三分之一的节点参与随机选择
remainCount = Math.min(remainCount, 2 * selectNodeCount / 3);
if (sameUrls.size() > remainCount) {
groupUrls.addAll(sameUrls.subList(0, remainCount));
groupUrls.addAll(addedUrls.subList(0, newCount));
groupUrls.addAll(sameUrls.subList(remainCount, sameUrls.size()));
} else {
groupUrls.addAll(sameUrls);
groupUrls.addAll(addedUrls.subList(0, newCount));
}
if (groupUrls.size() < selectNodeCount) {
List<URL> oUrls = new ArrayList<>(notifyUrls);
oUrls.removeAll(groupUrls);
Collections.shuffle(oUrls);
groupUrls.addAll(oUrls);
}

if (groupUrls.size() >= selectNodeCount) {
result.addAll(groupUrls.subList(0, selectNodeCount));
} else {
result.addAll(groupUrls);
}
LoggerUtil.info("ClusterSupport config change notify: registry={} service={} group={} size={} urls={}",
registryUrl.getUri(), url.getIdentity(), group, result.size(), getIdentities(result));
return result;
}

public void refreshReferers() {
for (Map.Entry<URL, List<Referer<T>>> entry : registryReferers.entrySet()) {
URL registryUrl = entry.getKey();
LoggerUtil.info("ClusterSupport refreshReferers: registry={} service={}", registryUrl.getUri(), url.getIdentity());
int available = 0;
for (Referer<T> referer : entry.getValue()) {
if (referer.isAvailable()) {
available++;
}
}
List<URL> urls = registryUrlsMap.get(registryUrl);
if (urls.size() > selectNodeCount && available < selectNodeCount) {
notify(registryUrl, urls);
}
}
}

/**
* 检查urls中的第一个url是否为权重信息。 如果是权重信息则把权重信息传递给loadbalance,并移除权重url。
*
Expand Down Expand Up @@ -259,7 +383,7 @@ private void mergeClientConfigs(URL refererURL) {
}

private void refreshCluster() {
List<Referer<T>> referers = new ArrayList<Referer<T>>();
List<Referer<T>> referers = new ArrayList<>();
for (List<Referer<T>> refs : registryReferers.values()) {
referers.addAll(refs);
}
Expand Down Expand Up @@ -307,7 +431,7 @@ private void prepareCluster() {

private List<URL> parseDirectUrls(String directUrlStr) {
String[] durlArr = MotanConstants.COMMA_SPLIT_PATTERN.split(directUrlStr);
List<URL> directUrls = new ArrayList<URL>();
List<URL> directUrls = new ArrayList<>();
for (String dus : durlArr) {
URL du = URL.valueOf(StringTools.urlDecode(dus));
if (du != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class MotanConstants {
public static final String DEFAULT_CHARACTER = "utf-8";
public static final int SLOW_COST = 50; // 50ms
public static final int STATISTIC_PEROID = 30; // 30 seconds
public static final int REFRESH_PERIOD = 60;
public static final String ASYNC_SUFFIX = "Async";// suffix for async call.
public static final String APPLICATION_STATISTIC = "statisitic";
public static final String REQUEST_REMOTE_ADDR = "requestRemoteAddress";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,65 @@ public enum URLParamType {
requestTimeout("requestTimeout", 200),
/** request id from http interface **/
requestIdFromClient("requestIdFromClient", 0),
/** connect timeout **/
/**
* connect timeout
**/
connectTimeout("connectTimeout", 1000),
/** service min worker threads **/
/**
* service min worker threads
**/
minWorkerThread("minWorkerThread", 20),
/** service max worker threads **/
/**
* service max worker threads
**/
maxWorkerThread("maxWorkerThread", 200),
/** pool min conn number **/
/**
* pool min conn number
**/
minClientConnection("minClientConnection", 2),
/** pool max conn number **/
/**
* pool max conn number
**/
maxClientConnection("maxClientConnection", 10),
/** pool max conn number **/
maxConnectionPerGroup("maxConnectionPerGroup", 0),
/**
* pool max conn number
**/
maxContentLength("maxContentLength", 10 * 1024 * 1024),
/** max server conn (all clients conn) **/
/**
* max server conn (all clients conn)
**/
maxServerConnection("maxServerConnection", 100000),
/** pool conn manger stragy **/
/**
* pool conn manger stragy
**/
poolLifo("poolLifo", true),

lazyInit("lazyInit", false),
/** multi referer share the same channel **/
/**
* multi referer share the same channel
**/
shareChannel("shareChannel", false),
asyncInitConnection("asyncInitConnection", false),
fusingThreshold("fusingThreshold", 10),

/************************** SPI start ******************************/

/** serialize **/
/**
* serialize
**/
serialize("serialization", "hessian2"),
/** codec **/
/**
* codec
**/
codec("codec", "motan"),
/** endpointFactory **/
/**
* endpointFactory
**/
endpointFactory("endpointFactory", "motan"),
/** heartbeatFactory **/
/**
* heartbeatFactory
**/
heartbeatFactory("heartbeatFactory", "motan"),
/** switcherService **/
switcherService("switcherService", "localSwitcherService"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

package com.weibo.api.motan.config;

import java.util.Map;

import com.weibo.api.motan.config.annotation.ConfigDesc;

import java.util.Map;

/**
*
* protocol
Expand Down Expand Up @@ -48,6 +48,7 @@ public class ProtocolConfig extends AbstractConfig {
protected Integer minClientConnection;
// client最大连接数
protected Integer maxClientConnection;
protected Integer maxConnectionPerGroup;
// 最小工作pool线程数
protected Integer minWorkerThread;
// 最大工作pool线程数
Expand Down Expand Up @@ -164,6 +165,14 @@ public void setMaxClientConnection(Integer maxClientConnection) {
this.maxClientConnection = maxClientConnection;
}

public Integer getMaxConnectionPerGroup() {
return maxConnectionPerGroup;
}

public void setMaxConnectionPerGroup(Integer maxConnectionPerGroup) {
this.maxConnectionPerGroup = maxConnectionPerGroup;
}

public Integer getMinWorkerThread() {
return minWorkerThread;
}
Expand Down
Loading