Skip to content

Commit

Permalink
reactor
Browse files Browse the repository at this point in the history
  • Loading branch information
sunnights committed Jul 3, 2019
1 parent af4de40 commit 7fbbf69
Show file tree
Hide file tree
Showing 14 changed files with 1,556 additions and 0 deletions.
27 changes: 27 additions & 0 deletions motan-transport-reactor-netty4/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>motan</artifactId>
<groupId>com.weibo</groupId>
<version>1.1.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>motan-transport-netty4-reactive</artifactId>

<dependencies>
<dependency>
<groupId>com.weibo</groupId>
<artifactId>motan-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.weibo.api.motan.transport.netty4;

import com.weibo.api.motan.codec.Codec;
import com.weibo.api.motan.common.MotanConstants;
import com.weibo.api.motan.exception.MotanErrorMsgConstant;
import com.weibo.api.motan.exception.MotanFrameworkException;
import com.weibo.api.motan.protocol.v2motan.MotanV2Codec;
import com.weibo.api.motan.rpc.DefaultResponse;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.rpc.Response;
import com.weibo.api.motan.transport.Channel;
import com.weibo.api.motan.util.ByteUtil;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MotanFrameworkUtil;

import java.io.IOException;

/**
* @author sunnights
*/
public class CodecUtil {

public static byte[] encodeObjectToBytes(Channel channel, Codec codec, Object msg) {
try {
if (codec instanceof MotanV2Codec) {
return encodeV2(channel, codec, msg);
} else {
return encodeV1(channel, codec, msg);
}
} catch (IOException e) {
throw new MotanFrameworkException("encode error: isResponse=" + (msg instanceof Response), e, MotanErrorMsgConstant.FRAMEWORK_ENCODE_ERROR);
}
}

private static byte[] encodeV2(Channel channel, Codec codec, Object msg) throws IOException {
return encodeMessage(channel, codec, msg);
}

private static byte[] encodeV1(Channel channel, Codec codec, Object msg) throws IOException {
long requestId = getRequestId(msg);
byte[] data = encodeMessage(channel, codec, msg);
byte[] result = new byte[MotanConstants.NETTY_HEADER + data.length];
ByteUtil.short2bytes(MotanConstants.NETTY_MAGIC_TYPE, result, 0);
result[3] = getType(msg);
ByteUtil.long2bytes(requestId, result, 4);
ByteUtil.int2bytes(data.length, result, 12);
System.arraycopy(data, 0, result, MotanConstants.NETTY_HEADER, data.length);
return result;
}

private static byte[] encodeMessage(Channel channel, Codec codec, Object msg) throws IOException {
byte[] data;
if (msg instanceof Response) {
try {
data = codec.encode(channel, msg);
} catch (Exception e) {
LoggerUtil.error("NettyEncoder encode error, identity=" + channel.getUrl().getIdentity(), e);
long requestId = getRequestId(msg);
Response response = buildExceptionResponse(requestId, e);
data = codec.encode(channel, response);
}
} else {
data = codec.encode(channel, msg);
}
if (msg instanceof Request) {
MotanFrameworkUtil.logEvent((Request) msg, MotanConstants.TRACE_CENCODE);
} else if (msg instanceof Response) {
MotanFrameworkUtil.logEvent((Response) msg, MotanConstants.TRACE_SENCODE);
}
return data;
}

private static long getRequestId(Object message) {
if (message instanceof Request) {
return ((Request) message).getRequestId();
} else if (message instanceof Response) {
return ((Response) message).getRequestId();
} else {
return 0;
}
}

private static Response buildExceptionResponse(long requestId, Exception e) {
DefaultResponse response = new DefaultResponse();
response.setRequestId(requestId);
response.setException(e);
return response;
}

private static byte getType(Object message) {
if (message instanceof Request) {
return MotanConstants.FLAG_REQUEST;
} else if (message instanceof Response) {
return MotanConstants.FLAG_RESPONSE;
} else {
return MotanConstants.FLAG_OTHER;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package com.weibo.api.motan.transport.netty4;

import com.weibo.api.motan.codec.Codec;
import com.weibo.api.motan.common.ChannelState;
import com.weibo.api.motan.common.MotanConstants;
import com.weibo.api.motan.common.URLParamType;
import com.weibo.api.motan.core.extension.ExtensionLoader;
import com.weibo.api.motan.exception.MotanErrorMsgConstant;
import com.weibo.api.motan.exception.MotanFrameworkException;
import com.weibo.api.motan.exception.MotanServiceException;
import com.weibo.api.motan.rpc.*;
import com.weibo.api.motan.transport.Channel;
import com.weibo.api.motan.transport.TransportException;
import com.weibo.api.motan.util.ExceptionUtil;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MotanFrameworkUtil;
import io.netty.channel.ChannelFuture;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author sunnights
*/
public class NettyChannel implements Channel {
private volatile ChannelState state = ChannelState.UNINIT;
private NettyClient nettyClient;
private io.netty.channel.Channel channel = null;
private InetSocketAddress remoteAddress = null;
private InetSocketAddress localAddress = null;
private ReentrantLock lock = new ReentrantLock();
private Codec codec;

public NettyChannel(NettyClient nettyClient) {
this.nettyClient = nettyClient;
this.remoteAddress = new InetSocketAddress(nettyClient.getUrl().getHost(), nettyClient.getUrl().getPort());

codec = ExtensionLoader.getExtensionLoader(Codec.class).getExtension(nettyClient.getUrl().getParameter(URLParamType.codec.getName(), URLParamType.codec.getValue()));
}

@Override
public InetSocketAddress getLocalAddress() {
return localAddress;
}

@Override
public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}

@Override
public Response request(Request request) throws TransportException {
int timeout = nettyClient.getUrl().getMethodParameter(request.getMethodName(), request.getParamtersDesc(), URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
if (timeout <= 0) {
throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.", MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
}
ResponseFuture response = new DefaultResponseFuture(request, timeout, this.nettyClient.getUrl());
this.nettyClient.registerCallback(request.getRequestId(), response);
byte[] msg = CodecUtil.encodeObjectToBytes(this, codec, request);
ChannelFuture writeFuture = this.channel.writeAndFlush(msg);
boolean result = writeFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);

if (result && writeFuture.isSuccess()) {
MotanFrameworkUtil.logEvent(request, MotanConstants.TRACE_CSEND, System.currentTimeMillis());
response.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess() || (future.isDone() && ExceptionUtil.isBizException(future.getException()))) {
// 成功的调用
nettyClient.resetErrorCount();
} else {
// 失败的调用
nettyClient.incrErrorCount();
}
}
});
return response;
}

writeFuture.cancel(true);
response = this.nettyClient.removeCallback(request.getRequestId());
if (response != null) {
response.cancel();
}
// 失败的调用
nettyClient.incrErrorCount();

if (writeFuture.cause() != null) {
throw new MotanServiceException("NettyChannel send request to server Error: url="
+ nettyClient.getUrl().getUri() + " local=" + localAddress + " "
+ MotanFrameworkUtil.toString(request), writeFuture.cause());
} else {
throw new MotanServiceException("NettyChannel send request to server Timeout: url="
+ nettyClient.getUrl().getUri() + " local=" + localAddress + " "
+ MotanFrameworkUtil.toString(request));
}
}

@Override
public synchronized boolean open() {
if (isAvailable()) {
LoggerUtil.warn("the channel already open, local: " + localAddress + " remote: " + remoteAddress + " url: " + nettyClient.getUrl().getUri());
return true;
}

ChannelFuture channelFuture = null;
try {
long start = System.currentTimeMillis();
channelFuture = nettyClient.getBootstrap().connect(remoteAddress);
int timeout = nettyClient.getUrl().getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue());
if (timeout <= 0) {
throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.", MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
}
// 不去依赖于connectTimeout
boolean result = channelFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);
boolean success = channelFuture.isSuccess();

if (result && success) {
channel = channelFuture.channel();
if (channel.localAddress() != null && channel.localAddress() instanceof InetSocketAddress) {
localAddress = (InetSocketAddress) channel.localAddress();
}
state = ChannelState.ALIVE;
return true;
}
boolean connected = false;
if (channelFuture.channel() != null) {
connected = channelFuture.channel().isActive();
}

if (channelFuture.cause() != null) {
channelFuture.cancel(true);
throw new MotanServiceException("NettyChannel failed to connect to server, url: " + nettyClient.getUrl().getUri() + ", result: " + result + ", success: " + success + ", connected: " + connected, channelFuture.cause());
} else {
channelFuture.cancel(true);
throw new MotanServiceException("NettyChannel connect to server timeout url: " + nettyClient.getUrl().getUri() + ", cost: " + (System.currentTimeMillis() - start) + ", result: " + result + ", success: " + success + ", connected: " + connected);
}
} catch (MotanServiceException e) {
throw e;
} catch (Exception e) {
if (channelFuture != null) {
channelFuture.channel().close();
}
throw new MotanServiceException("NettyChannel failed to connect to server, url: " + nettyClient.getUrl().getUri(), e);
} finally {
if (!state.isAliveState()) {
nettyClient.incrErrorCount();
}
}
}

@Override
public synchronized void close() {
close(0);
}

@Override
public synchronized void close(int timeout) {
try {
state = ChannelState.CLOSE;

if (channel != null) {
channel.close();
}
} catch (Exception e) {
LoggerUtil.error("NettyChannel close Error: " + nettyClient.getUrl().getUri() + " local=" + localAddress, e);
}
}

@Override
public boolean isClosed() {
return state.isCloseState();
}

@Override
public boolean isAvailable() {
return state.isAliveState() && channel != null && channel.isActive();
}

public void reconnect() {
state = ChannelState.INIT;
}

public boolean isReconnect() {
return state.isInitState();
}

@Override
public URL getUrl() {
return nettyClient.getUrl();
}

public ReentrantLock getLock() {
return lock;
}
}
Loading

0 comments on commit 7fbbf69

Please sign in to comment.