TODO: 解决动态配置的函数反射
This commit is contained in:
parent
64bebb1ced
commit
dd55ff2f90
|
@ -1,86 +0,0 @@
|
|||
package cn.ecpark.service.usergw.biz.filters;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
|
||||
import org.apache.dubbo.config.ApplicationConfig;
|
||||
import org.apache.dubbo.config.ReferenceConfig;
|
||||
import org.apache.dubbo.config.utils.ReferenceConfigCache;
|
||||
import org.apache.dubbo.rpc.service.GenericService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
|
||||
import org.springframework.cloud.gateway.filter.GlobalFilter;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
|
||||
import cn.ecpark.service.usergw.biz.filters.bean.GenericServicePool;
|
||||
import io.netty.util.Recycler;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class DubboFilter implements GlobalFilter, Ordered {
|
||||
|
||||
@Autowired
|
||||
private ApplicationContext appContext;
|
||||
|
||||
@Override
|
||||
public int getOrder() {
|
||||
return Ordered.LOWEST_PRECEDENCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
|
||||
// exchange.getRequest();
|
||||
|
||||
String application = "dubbo-exchange"; // 必须
|
||||
// reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
|
||||
String rgroup = "group";
|
||||
String rinterface = "ocean.demo.api.IExchange"; // 弱类型接口名 必须
|
||||
String rversion = "1.0.0";
|
||||
String rmethod = "Say";
|
||||
|
||||
|
||||
String rkey = "";
|
||||
rkey += application;
|
||||
|
||||
if(rgroup != null && !rgroup.isEmpty()){
|
||||
rkey += "/" + rgroup;
|
||||
}
|
||||
|
||||
rkey += "/" + rinterface;
|
||||
rkey += ":" + rversion;
|
||||
|
||||
GenericServicePool gsPool = appContext.getBean(GenericServicePool.class);
|
||||
if(!gsPool.contains(rkey)) {
|
||||
|
||||
ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>(); // 该实例很重量,里面封装了所有与注册中心及服务提供方连接,请缓存
|
||||
|
||||
reference.setApplication(new ApplicationConfig(application)); // 必须
|
||||
// reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
|
||||
// reference.setGroup("group");
|
||||
reference.setInterface(rinterface); // 弱类型接口名 必须
|
||||
reference.setVersion(rversion); // 必须
|
||||
reference.setGeneric(true); // 声明为泛化接口
|
||||
|
||||
gsPool.add(rkey, reference.get());
|
||||
}
|
||||
|
||||
GenericService gs = gsPool.get(rkey);
|
||||
Object result = gs.$invoke(rmethod, new String[] {"java.lang.String"}, new Object[] {"213"});
|
||||
gsPool.add(rkey, gs);
|
||||
|
||||
if (result != null) {
|
||||
ServerHttpResponse response = exchange.getResponse();
|
||||
return response.writeWith(
|
||||
Mono.just(response.bufferFactory().wrap(ByteBuffer.wrap(JSON.toJSONString(result).getBytes()))));
|
||||
}
|
||||
return chain.filter(exchange);
|
||||
}
|
||||
|
||||
}
|
|
@ -22,71 +22,21 @@ import lombok.extern.slf4j.Slf4j;
|
|||
@Slf4j
|
||||
public class GenericServicePool {
|
||||
|
||||
private class GenericServiceManager {
|
||||
|
||||
BlockingQueue<GenericService> idle = new LinkedBlockingQueue<>();
|
||||
private String key;
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return this.key.hashCode();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return String return the key
|
||||
*/
|
||||
public String getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param key the key to set
|
||||
*/
|
||||
public void setKey(String key) {
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
public GenericService get() {
|
||||
try {
|
||||
return idle.take();
|
||||
} catch (Exception e) {
|
||||
log.error("悠闲队列出现问题: " + e.getStackTrace());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void add(GenericService gs) {
|
||||
idle.add(gs);
|
||||
}
|
||||
|
||||
public GenericServiceManager() {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
HashMap<String, GenericServiceManager> gsDictionary;
|
||||
HashMap<String, GenericService> gsDictionary;
|
||||
|
||||
public GenericServicePool() {
|
||||
gsDictionary = new HashMap<String, GenericServiceManager>();
|
||||
gsDictionary = new HashMap<String, GenericService>();
|
||||
}
|
||||
|
||||
public GenericService get(String key) {
|
||||
GenericServiceManager pool = gsDictionary.get(key);
|
||||
return pool.get();
|
||||
return gsDictionary.get(key);
|
||||
}
|
||||
|
||||
public void add(String key, GenericService genericService) {
|
||||
GenericServiceManager pool;
|
||||
if (gsDictionary.containsKey(key)) {
|
||||
pool = gsDictionary.get(key);
|
||||
} else {
|
||||
pool = new GenericServiceManager();
|
||||
gsDictionary.put(key, pool);
|
||||
}
|
||||
pool.add(genericService);
|
||||
public void put(String key, GenericService genericService) {
|
||||
gsDictionary.put(key, genericService);
|
||||
}
|
||||
|
||||
public boolean contains(String key) {
|
||||
return gsDictionary.containsKey(key);
|
||||
}
|
||||
// public boolean contains(String key) {
|
||||
// return gsDictionary.containsKey(key);
|
||||
// }
|
||||
}
|
|
@ -30,8 +30,6 @@ public class DubboGatewayFilterFactory extends AbstractGatewayFilterFactory<Dubb
|
|||
@Autowired
|
||||
private ApplicationContext appContext;
|
||||
|
||||
private Map<String, GenericService> gsCache = new HashMap<String, GenericService>();
|
||||
|
||||
/**
|
||||
* DUBBO_URI key.
|
||||
*/
|
||||
|
@ -73,22 +71,8 @@ public class DubboGatewayFilterFactory extends AbstractGatewayFilterFactory<Dubb
|
|||
rkey += "/" + rinterface;
|
||||
rkey += ":" + rversion;
|
||||
|
||||
// GenericServicePool gsPool = appContext.getBean(GenericServicePool.class);
|
||||
// if (!gsPool.contains(rkey)) {
|
||||
// for (int i = 0; i < rconnections; i++) {
|
||||
// ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>(); // 该实例很重量,里面封装了所有与注册中心及服务提供方连接,请缓存
|
||||
// reference.setApplication(new ApplicationConfig("dubbo-exchange")); // 必须
|
||||
// // reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
|
||||
// reference.setGroup("test");
|
||||
// reference.setInterface("ocean.demo.api.IExchange"); // 弱类型接口名 必须
|
||||
// reference.setVersion("1.0.0"); // 必须
|
||||
// reference.setGeneric(true); // 声明为泛化接口
|
||||
// gsPool.add(rkey, reference.get());
|
||||
// }
|
||||
// }
|
||||
// GenericService gs = gsPool.get(rkey);
|
||||
|
||||
GenericService gs = gsCache.get(rkey);
|
||||
GenericServicePool gsPool = appContext.getBean(GenericServicePool.class);
|
||||
GenericService gs = gsPool.get(rkey);
|
||||
if(gs == null) {
|
||||
ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>(); // 该实例很重量,里面封装了所有与注册中心及服务提供方连接,请缓存
|
||||
reference.setApplication(new ApplicationConfig("dubbo-exchange")); // 必须
|
||||
|
@ -96,13 +80,13 @@ public class DubboGatewayFilterFactory extends AbstractGatewayFilterFactory<Dubb
|
|||
reference.setGroup("test");
|
||||
reference.setInterface("ocean.demo.api.IExchange"); // 弱类型接口名 必须
|
||||
reference.setVersion("1.0.0"); // 必须
|
||||
reference.setConnections(20);
|
||||
reference.setConnections(rconnections);
|
||||
reference.setGeneric(true); // 声明为泛化接口
|
||||
// gsPool.add(rkey, reference.get());
|
||||
gs = reference.get();
|
||||
gsCache.put(rkey, gs);
|
||||
gsPool.put(rkey, gs);
|
||||
}
|
||||
|
||||
// special
|
||||
Object result = gs.$invoke("Say", new String[] { "java.lang.String" }, new Object[] { "222" });
|
||||
// gsPool.add(rkey, gs);
|
||||
if (result != null) {
|
||||
|
|
|
@ -4,11 +4,16 @@ import java.io.InputStream;
|
|||
import java.lang.reflect.Method;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.dubbo.config.ReferenceConfig;
|
||||
import org.apache.dubbo.rpc.service.GenericService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.cloud.gateway.filter.FilterDefinition;
|
||||
|
@ -38,7 +43,8 @@ public class ConfigGateway implements RouteDefinitionLocator {
|
|||
|
||||
// List<MediaType> mediaTypes = new ArrayList<>();
|
||||
|
||||
List<FilterDefinition> defaultFilters;
|
||||
List<FilterDefinition> defaultFilters = new ArrayList<>();
|
||||
HashMap<String, BiConsumer<ReferenceConfig<GenericService> , String>> specialField = new HashMap<String, BiConsumer<ReferenceConfig<GenericService> , String>>();
|
||||
|
||||
@Autowired
|
||||
private ApplicationContext applicationContext;
|
||||
|
@ -53,8 +59,10 @@ public class ConfigGateway implements RouteDefinitionLocator {
|
|||
return new DubboGatewayFilterFactory();
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private Http2Dubbo http2Dubbo;
|
||||
|
||||
public ConfigGateway() {
|
||||
specialField.put("application", ConfigSpecialFunction::setApplication);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -94,7 +102,7 @@ public class ConfigGateway implements RouteDefinitionLocator {
|
|||
List<RouteDefinition> routeList = new ArrayList<RouteDefinition>();
|
||||
|
||||
if (defaultYaml != null && !defaultYaml.isEmpty()) {
|
||||
defaultFilters = this.getDefaultFilter(defaultYaml);
|
||||
this.getDefaultFilter(defaultFilters, defaultYaml);
|
||||
this.configDefault(routeList, defaultYaml);
|
||||
}
|
||||
|
||||
|
@ -114,9 +122,7 @@ public class ConfigGateway implements RouteDefinitionLocator {
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private List<FilterDefinition> getDefaultFilter(Map<String, Object> defaultYaml) {
|
||||
List<FilterDefinition> filters = new ArrayList<>();
|
||||
|
||||
private void getDefaultFilter(List<FilterDefinition> filters, Map<String, Object> defaultYaml) {
|
||||
// default-filters: 下的相关设置
|
||||
Object unknownDefaultFilters = defaultYaml.get("default-filters");
|
||||
if (unknownDefaultFilters != null) {
|
||||
|
@ -125,8 +131,6 @@ public class ConfigGateway implements RouteDefinitionLocator {
|
|||
filters.add(new FilterDefinition(filterString));
|
||||
}
|
||||
}
|
||||
|
||||
return filters;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -193,23 +197,6 @@ public class ConfigGateway implements RouteDefinitionLocator {
|
|||
|
||||
}
|
||||
|
||||
// RequestMappingHandlerMapping requestMapping = (RequestMappingHandlerMapping)
|
||||
// applicationContext
|
||||
// .getBean("requestMappingHandlerMapping");
|
||||
|
||||
// requestMapping.registerMapping(RequestMappingInfo.paths("/test/xixi").methods(RequestMethod.POST).build(),
|
||||
// http2Dubbo, Http2Dubbo.class.getDeclaredMethod("H2DTest"));
|
||||
|
||||
// 引用远程服务
|
||||
// try {
|
||||
// ReferenceConfig<GenericService> reference = new
|
||||
// ReferenceConfig<GenericService>(); // 该实例很重量,里面封装了所有与注册中心及服务提供方连接,请缓存
|
||||
|
||||
// reference.setInterface("com.xxx.XxxService"); // 弱类型接口名
|
||||
// reference.setVersion("2.0.0");
|
||||
// reference.setGeneric(true); // 声明为泛化接口
|
||||
// GenericService gs = reference.get();
|
||||
|
||||
} catch (Exception e) {
|
||||
// TODO: 任何错误都直接终止退出
|
||||
log.error(e.toString());
|
||||
|
@ -249,11 +236,14 @@ public class ConfigGateway implements RouteDefinitionLocator {
|
|||
|
||||
// 设置uri
|
||||
Object uri = iter.get("uri");
|
||||
|
||||
try {
|
||||
rd.setUri(URI.create((String) uri));
|
||||
} catch (Exception e) {
|
||||
log.warn("dubbo Uri error");
|
||||
if(uri != null) {
|
||||
try {
|
||||
rd.setUri(URI.create((String) uri));
|
||||
} catch (Exception e) {
|
||||
log.warn("dubbo Uri error");
|
||||
rd.setUri(URI.create("dubbo://yame"));
|
||||
}
|
||||
} else {
|
||||
rd.setUri(URI.create("dubbo://yame"));
|
||||
}
|
||||
|
||||
|
@ -271,14 +261,21 @@ public class ConfigGateway implements RouteDefinitionLocator {
|
|||
}
|
||||
}
|
||||
|
||||
String uriString = "dubbo://";
|
||||
|
||||
Object group = iter.get("group");
|
||||
if (group != null) {
|
||||
rd.setOrder((int) group);
|
||||
}
|
||||
|
||||
Object registry = iter.get("registry");
|
||||
if (registry != null) {
|
||||
rd.setOrder((int) order);
|
||||
rd.setOrder((int) registry);
|
||||
}
|
||||
|
||||
Object application = iter.get("application");
|
||||
if (registry != null) {
|
||||
rd.setOrder((int) order);
|
||||
rd.setOrder((int) application);
|
||||
}
|
||||
|
||||
return "";
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
package cn.ecpark.service.usergw.config;
|
||||
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.dubbo.config.ApplicationConfig;
|
||||
import org.apache.dubbo.config.ReferenceConfig;
|
||||
import org.apache.dubbo.rpc.service.GenericService;
|
||||
|
||||
/**
|
||||
* ConfigSpecialFunction
|
||||
*/
|
||||
public class ConfigSpecialFunction {
|
||||
|
||||
// public static BiConsumer<ReferenceConfig<GenericService>, String> setApplication = (ref, cfgValue) -> {
|
||||
// ref.setApplication(new ApplicationConfig(cfgValue));
|
||||
// };
|
||||
|
||||
public static void setApplication(ReferenceConfig<GenericService> ref, String cfgValue) {
|
||||
ref.setApplication(new ApplicationConfig(cfgValue));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user