使用dubbo内部的线程池
This commit is contained in:
parent
5369fcb8be
commit
64bebb1ced
|
@ -19,8 +19,6 @@ import org.springframework.context.annotation.Configuration;
|
|||
public class App {
|
||||
|
||||
public static void main(String[] args) {
|
||||
URI a = URI.create("dubbo://127.0.0.1/com.a.b?group=test&version=123");
|
||||
URI b = URI.create("dubbo://com.a.b?group=test&version=123");
|
||||
SpringApplication.run(App.class, args);
|
||||
}
|
||||
|
||||
|
|
|
@ -50,7 +50,6 @@ public class GenericServicePool {
|
|||
try {
|
||||
return idle.take();
|
||||
} catch (Exception e) {
|
||||
// TODO: handle exception
|
||||
log.error("悠闲队列出现问题: " + e.getStackTrace());
|
||||
}
|
||||
return null;
|
||||
|
|
|
@ -3,7 +3,9 @@ package cn.ecpark.service.usergw.biz.filters.factory;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
|
||||
|
@ -13,8 +15,9 @@ import org.apache.dubbo.rpc.service.GenericService;
|
|||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.gateway.filter.GatewayFilter;
|
||||
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
|
||||
import org.springframework.cloud.gateway.support.HttpStatusHolder;
|
||||
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
|
@ -27,32 +30,35 @@ public class DubboGatewayFilterFactory extends AbstractGatewayFilterFactory<Dubb
|
|||
@Autowired
|
||||
private ApplicationContext appContext;
|
||||
|
||||
/**
|
||||
* DUBBO_URI key.
|
||||
*/
|
||||
public static final String DUBBO_URI = "dubbo_uri";
|
||||
private Map<String, GenericService> gsCache = new HashMap<String, GenericService>();
|
||||
|
||||
/**
|
||||
* DUBBO_URI key.
|
||||
*/
|
||||
public static final String DUBBO_URI = "dubbo_uri";
|
||||
|
||||
public DubboGatewayFilterFactory() {
|
||||
super(Config.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> shortcutFieldOrder() {
|
||||
return Arrays.asList(DUBBO_URI);
|
||||
}
|
||||
public List<String> shortcutFieldOrder() {
|
||||
return Arrays.asList(DUBBO_URI);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GatewayFilter apply(Config config) {
|
||||
String uri = config.dubboUri;
|
||||
|
||||
return (exchange, chain) -> {
|
||||
|
||||
|
||||
String application = "dubbo-exchange"; // 必须
|
||||
// reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
|
||||
String rgroup = "group";
|
||||
String rinterface = "ocean.demo.api.IExchange"; // 弱类型接口名 必须
|
||||
String rversion = "1.0.0";
|
||||
String rmethod = "Say";
|
||||
int rconnections = 3;
|
||||
|
||||
List<String> rparamTypes = new ArrayList<String>();
|
||||
List<String> rparams = new ArrayList<String>();
|
||||
|
@ -67,54 +73,60 @@ public class DubboGatewayFilterFactory extends AbstractGatewayFilterFactory<Dubb
|
|||
rkey += "/" + rinterface;
|
||||
rkey += ":" + rversion;
|
||||
|
||||
GenericServicePool gsPool = appContext.getBean(GenericServicePool.class);
|
||||
if (!gsPool.contains(rkey)) {
|
||||
|
||||
for (int i = 0; i < 1; i++) {
|
||||
ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>(); // 该实例很重量,里面封装了所有与注册中心及服务提供方连接,请缓存
|
||||
|
||||
reference.setApplication(new ApplicationConfig("dubbo-exchange")); // 必须
|
||||
// reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
|
||||
reference.setGroup("test");
|
||||
reference.setInterface("ocean.demo.api.IExchange"); // 弱类型接口名 必须
|
||||
reference.setConnections(5);
|
||||
reference.setVersion("1.0.0"); // 必须
|
||||
reference.setGeneric(true); // 声明为泛化接口
|
||||
gsPool.add(rkey, reference.get());
|
||||
}
|
||||
// GenericServicePool gsPool = appContext.getBean(GenericServicePool.class);
|
||||
// if (!gsPool.contains(rkey)) {
|
||||
// for (int i = 0; i < rconnections; i++) {
|
||||
// ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>(); // 该实例很重量,里面封装了所有与注册中心及服务提供方连接,请缓存
|
||||
// reference.setApplication(new ApplicationConfig("dubbo-exchange")); // 必须
|
||||
// // reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
|
||||
// reference.setGroup("test");
|
||||
// reference.setInterface("ocean.demo.api.IExchange"); // 弱类型接口名 必须
|
||||
// reference.setVersion("1.0.0"); // 必须
|
||||
// reference.setGeneric(true); // 声明为泛化接口
|
||||
// gsPool.add(rkey, reference.get());
|
||||
// }
|
||||
// }
|
||||
// GenericService gs = gsPool.get(rkey);
|
||||
|
||||
GenericService gs = gsCache.get(rkey);
|
||||
if(gs == null) {
|
||||
ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>(); // 该实例很重量,里面封装了所有与注册中心及服务提供方连接,请缓存
|
||||
reference.setApplication(new ApplicationConfig("dubbo-exchange")); // 必须
|
||||
// reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
|
||||
reference.setGroup("test");
|
||||
reference.setInterface("ocean.demo.api.IExchange"); // 弱类型接口名 必须
|
||||
reference.setVersion("1.0.0"); // 必须
|
||||
reference.setConnections(20);
|
||||
reference.setGeneric(true); // 声明为泛化接口
|
||||
// gsPool.add(rkey, reference.get());
|
||||
gs = reference.get();
|
||||
gsCache.put(rkey, gs);
|
||||
}
|
||||
|
||||
GenericService gs = gsPool.get(rkey);
|
||||
Object result = gs.$invoke("Say", new String[] { "java.lang.String" }, new Object[] { "222" });
|
||||
|
||||
gsPool.add(rkey, gs);
|
||||
|
||||
// gsPool.add(rkey, gs);
|
||||
if (result != null) {
|
||||
ServerHttpResponse response = exchange.getResponse();
|
||||
return response.writeWith(
|
||||
Mono.just(response.bufferFactory().wrap(ByteBuffer.wrap(JSON.toJSONString(result).getBytes()))));
|
||||
return response.writeWith(Mono
|
||||
.just(response.bufferFactory().wrap(ByteBuffer.wrap(JSON.toJSONString(result).getBytes()))));
|
||||
}
|
||||
|
||||
return chain.filter(exchange);
|
||||
|
||||
// gs.$invoke(method, parameterTypes, args)
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
public static class Config {
|
||||
|
||||
private String dubboUri;
|
||||
private String dubboUri;
|
||||
|
||||
public String getDubboUri() {
|
||||
return dubboUri;
|
||||
}
|
||||
public String getDubboUri() {
|
||||
return dubboUri;
|
||||
}
|
||||
|
||||
public void setDubboUri(String dubboUri) {
|
||||
this.dubboUri = dubboUri;
|
||||
}
|
||||
public void setDubboUri(String dubboUri) {
|
||||
this.dubboUri = dubboUri;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -249,10 +249,12 @@ public class ConfigGateway implements RouteDefinitionLocator {
|
|||
|
||||
// 设置uri
|
||||
Object uri = iter.get("uri");
|
||||
if (uri != null) {
|
||||
|
||||
try {
|
||||
rd.setUri(URI.create((String) uri));
|
||||
} else {
|
||||
rd.setUri(URI.create("dubbo://"));
|
||||
} catch (Exception e) {
|
||||
log.warn("dubbo Uri error");
|
||||
rd.setUri(URI.create("dubbo://yame"));
|
||||
}
|
||||
|
||||
// 设置uri
|
||||
|
@ -261,13 +263,13 @@ public class ConfigGateway implements RouteDefinitionLocator {
|
|||
rd.setOrder((int) order);
|
||||
}
|
||||
|
||||
if(uri != null) {
|
||||
String uriString = (String)uri;
|
||||
if (uri != null) {
|
||||
String uriString = (String) uri;
|
||||
uriString = uriString.trim();
|
||||
if(uriString.startsWith("dubbo://")) { // dubbo://127.0.0.1/interface?
|
||||
return uriString ;
|
||||
if (uriString.startsWith("dubbo://")) { // dubbo://127.0.0.1/interface?
|
||||
return uriString;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Object registry = iter.get("registry");
|
||||
if (registry != null) {
|
||||
|
@ -311,15 +313,15 @@ public class ConfigGateway implements RouteDefinitionLocator {
|
|||
for (String filterString : filtersYaml) {
|
||||
FilterDefinition fd = new FilterDefinition(filterString);
|
||||
log.info(fd.getName());
|
||||
if (!fd.getName().equals("Dubbo")){
|
||||
if (!fd.getName().equals("Dubbo")) {
|
||||
filters.add(fd);
|
||||
} else {
|
||||
filters.add(fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static void callMethod(ReferenceConfig ref, String name, String params) throws Exception {
|
||||
Method method = ref.getClass().getMethod("set" + Convert.firstUpperCase(name));
|
||||
method.invoke(ref, method.getParameterTypes(), params);
|
||||
|
|
|
@ -10,3 +10,4 @@ server.port=8888
|
|||
|
||||
# logging.level.org.springframework.cloud.gateway=debug
|
||||
logging.file=logs/log
|
||||
|
||||
|
|
|
@ -21,7 +21,6 @@ dubbo:
|
|||
order: 10
|
||||
application: dubbo-exchange
|
||||
registry: zookeeper://127.0.0.1:2181
|
||||
uri: unknown
|
||||
interface: ocean.demo.api.IExchange
|
||||
version: 1.0.0
|
||||
predicates:
|
||||
|
|
Loading…
Reference in New Issue
Block a user