DubboFilterFactory 没反映

This commit is contained in:
huangsimin 2019-07-02 18:42:47 +08:00
parent fb6382a57a
commit f9da760e8d
5 changed files with 264 additions and 73 deletions

View File

@ -17,10 +17,13 @@ import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import cn.ecpark.service.usergw.biz.filters.bean.GenericServicePool;
import io.netty.util.Recycler;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Component
@Slf4j
public class DubboFilter implements GlobalFilter, Ordered {
@Autowired
@ -31,25 +34,46 @@ public class DubboFilter implements GlobalFilter, Ordered {
return Ordered.LOWEST_PRECEDENCE;
}
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// exchange.getRequest();
ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>(); // 该实例很重量里面封装了所有与注册中心及服务提供方连接请缓存
reference.setApplication(new ApplicationConfig("dubbo-exchange"));
String application = "dubbo-exchange"; // 必须
// reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setGroup("group");
reference.setInterface("ocean.demo.api.IExchange"); // 弱类型接口名
reference.setVersion("1.0.0");
reference.setGeneric(true); // 声明为泛化接口
String rgroup = "group";
String rinterface = "ocean.demo.api.IExchange"; // 弱类型接口名 必须
String rversion = "1.0.0";
String rmethod = "Say";
Recycler a;
String rkey = "";
rkey += application;
GenericService gs = cache.get(reference);
Object result = gs.$invoke("Hello", new String[] {}, new Object[] {});
if(rgroup != ""){
rkey += "/" + rgroup;
}
rkey += "/" + rinterface;
rkey += ":" + rversion;
GenericServicePool gsPool = appContext.getBean(GenericServicePool.class);
if(!gsPool.contains(rkey)) {
ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>(); // 该实例很重量里面封装了所有与注册中心及服务提供方连接请缓存
reference.setApplication(new ApplicationConfig(application)); // 必须
// reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
// reference.setGroup("group");
reference.setInterface(rinterface); // 弱类型接口名 必须
reference.setVersion(rversion); // 必须
reference.setGeneric(true); // 声明为泛化接口
gsPool.add(rkey, reference.get());
}
GenericService gs = gsPool.get(rkey);
Object result = gs.$invoke(rmethod, new String[] {"java.lang.String"}, new Object[] {"213"});
if (result != null) {
ServerHttpResponse response = exchange.getResponse();

View File

@ -21,9 +21,7 @@ import lombok.extern.slf4j.Slf4j;
public class GenericServicePool {
private class GenericServiceManager {
Lock idleLock = new ReentrantLock();
LinkedList<GenericService> idle = new LinkedList<>();
private String key;
@ -59,6 +57,7 @@ public class GenericServicePool {
}
} catch (Exception e) {
// TODO: handle exception
log.error("悠闲队列出现问题: " + e.toString());
} finally {
idleLock.unlock();
}
@ -72,6 +71,8 @@ public class GenericServicePool {
}
}
public GenericServiceManager() {
}
@ -99,4 +100,7 @@ public class GenericServicePool {
pool.add(genericService);
}
public boolean contains(String key) {
return gsDictionary.containsKey(key);
}
}

View File

@ -0,0 +1,104 @@
package cn.ecpark.service.usergw.biz.filters.factory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.fastjson.JSON;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.rpc.service.GenericService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import cn.ecpark.service.usergw.biz.filters.bean.GenericServicePool;
import reactor.core.publisher.Mono;;
@Component
public class DubboFilterFactory extends AbstractGatewayFilterFactory<DubboFilterFactory.Config> {
@Autowired
private ApplicationContext appContext;
public DubboFilterFactory() {
super(Config.class);
}
@Override
public GatewayFilter apply(Config config) {
String dubboUri = config.dubboUri;
return (exchange, chain) -> {
String application = "dubbo-exchange"; // 必须
// reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
String rgroup = "group";
String rinterface = "ocean.demo.api.IExchange"; // 弱类型接口名 必须
String rversion = "1.0.0";
String rmethod = "Say";
List<String> rparamTypes = new ArrayList<String>();
List<String> rparams = new ArrayList<String>();
String rkey = "";
rkey += application;
if(rgroup != ""){
rkey += "/" + rgroup;
}
rkey += "/" + rinterface;
rkey += ":" + rversion;
GenericServicePool gsPool = appContext.getBean(GenericServicePool.class);
if(!gsPool.contains(rkey)) {
ReferenceConfig<GenericService> reference = new ReferenceConfig<GenericService>(); // 该实例很重量里面封装了所有与注册中心及服务提供方连接请缓存
reference.setApplication(new ApplicationConfig("dubbo-exchange")); // 必须
// reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setGroup("group");
reference.setInterface("ocean.demo.api.IExchange"); // 弱类型接口名 必须
reference.setVersion("1.0.0"); // 必须
reference.setGeneric(true); // 声明为泛化接口
gsPool.add(rkey, reference.get());
}
GenericService gs = gsPool.get(rkey);
Object result = gs.$invoke("Hello", new String[] {}, new Object[] {});
if (result != null) {
ServerHttpResponse response = exchange.getResponse();
return response.writeWith(
Mono.just(response.bufferFactory().wrap(ByteBuffer.wrap(JSON.toJSONString(result).getBytes()))));
}
return chain.filter(exchange);
// gs.$invoke(method, parameterTypes, args)
};
}
public static class Config {
private String dubboUri;
public String getDubboUri() {
return dubboUri;
}
public void setDubboUri(String dubboUri) {
this.dubboUri = dubboUri;
}
}
}

View File

@ -16,13 +16,13 @@ import org.springframework.cloud.gateway.handler.predicate.PredicateDefinition;
import org.springframework.cloud.gateway.route.RouteDefinition;
import org.springframework.cloud.gateway.route.RouteDefinitionLocator;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.reactive.result.method.RequestMappingInfo;
import org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping;
import org.yaml.snakeyaml.Yaml;
import cn.ecpark.service.usergw.biz.filters.bean.GenericServicePool;
import cn.ecpark.service.usergw.biz.filters.factory.DubboFilterFactory;
import cn.ecpark.service.usergw.impl.http.Http2Dubbo;
import cn.ecpark.service.usergw.utils.Convert;
import lombok.extern.slf4j.Slf4j;
@ -38,9 +38,21 @@ public class ConfigGateway implements RouteDefinitionLocator {
// List<MediaType> mediaTypes = new ArrayList<>();
List<FilterDefinition> defaultFilters;
@Autowired
private ApplicationContext applicationContext;
@Bean
public GenericServicePool genericServicePool() {
return new GenericServicePool();
}
@Bean
public DubboFilterFactory dubboFilterFactory() {
return new DubboFilterFactory();
}
@Autowired
private Http2Dubbo http2Dubbo;
@ -76,48 +88,62 @@ public class ConfigGateway implements RouteDefinitionLocator {
// log.error(e.toString());
// }
Map<String, Object> configDefault = (Map<String, Object>) configYaml.get("default");
Map<String, Object> configDubbo = (Map<String, Object>) configYaml.get("dubbo");
Map<String, Object> defaultYaml = (Map<String, Object>) configYaml.get("default");
Map<String, Object> dubboYaml = (Map<String, Object>) configYaml.get("dubbo");
if ( configDubbo != null && !configDubbo.isEmpty()) {
this.createHttp2Dubbo(configDubbo);
List<RouteDefinition> routeList = new ArrayList<RouteDefinition>();
if (defaultYaml != null && !defaultYaml.isEmpty()) {
defaultFilters = this.getDefaultFilter(defaultYaml);
this.configDefault(routeList, defaultYaml);
}
if( configDefault != null && !configDefault.isEmpty()) {
List<RouteDefinition> result = this.getConfigDefault(configDefault);
if (result != null ) {
return Flux.fromIterable(result);
}
if (dubboYaml != null && !dubboYaml.isEmpty()) {
this.configHttp2Dubbo(routeList, dubboYaml);
}
if (!routeList.isEmpty()) {
return Flux.fromIterable(routeList);
}
}
}
return Flux.empty();
// return Flux.fromIterable(it);
}
@SuppressWarnings("unchecked")
private List<RouteDefinition> getConfigDefault(Map<String, Object> configDefault) {
if (configDefault != null) {
List<RouteDefinition> RDList = new ArrayList<>();
private List<FilterDefinition> getDefaultFilter(Map<String, Object> defaultYaml) {
List<FilterDefinition> filters = new ArrayList<>();
List<FilterDefinition> filters = new ArrayList<>();
List<PredicateDefinition> predicates = new ArrayList<>();
// default-filters: 下的相关设置
Object unknownDefaultFilters = configDefault.get("default-filters");
if (unknownDefaultFilters != null) {
List<String> defaultFiltersYaml = (ArrayList<String>) unknownDefaultFilters;
for (String filterString : defaultFiltersYaml) {
filters.add(new FilterDefinition(filterString));
}
// default-filters: 下的相关设置
Object unknownDefaultFilters = defaultYaml.get("default-filters");
if (unknownDefaultFilters != null) {
List<String> defaultFiltersYaml = (ArrayList<String>) unknownDefaultFilters;
for (String filterString : defaultFiltersYaml) {
filters.add(new FilterDefinition(filterString));
}
}
Object unknownRoutes = configDefault.get("routes");
return filters;
}
@SuppressWarnings("unchecked")
private void configDefault(List<RouteDefinition> routeList, Map<String, Object> defaultYaml) {
if (defaultYaml != null) {
Object unknownRoutes = defaultYaml.get("routes");
if (unknownRoutes != null) {
List<LinkedHashMap<String, List<String>>> routes = (ArrayList<LinkedHashMap<String, List<String>>>) unknownRoutes;
for (LinkedHashMap<String, List<String>> iter : routes) {
List<FilterDefinition> filters = new ArrayList<>();
List<PredicateDefinition> predicates = new ArrayList<>();
RouteDefinition rd = new RouteDefinition();
// 设置基础属性
this.ParseAndSetBase(rd, iter);
@ -130,37 +156,34 @@ public class ConfigGateway implements RouteDefinitionLocator {
rd.setPredicates(predicates);
rd.setFilters(filters);
RDList.add(rd);
routeList.add(rd);
}
return RDList;
}
}
return null;
}
private void ParseAndSetBase(RouteDefinition rd, LinkedHashMap<String, List<String>> iter) {
// 设置id
Object id = iter.get("id");
if (id != null) {
rd.setId((String) id);
}
// 设置id
Object id = iter.get("id");
if (id != null) {
rd.setId((String) id);
}
// 设置uri
Object uri = iter.get("uri");
if (uri != null) {
rd.setUri(URI.create((String) uri));
}
// 设置uri
Object uri = iter.get("uri");
if (uri != null) {
rd.setUri(URI.create((String) uri));
}
// 设置uri
Object order = iter.get("order");
if (order != null) {
rd.setOrder((int) order);
}
// 设置uri
Object order = iter.get("order");
if (order != null) {
rd.setOrder((int) order);
}
}
private void ParseAndAddPredicates(List<PredicateDefinition> predicates, LinkedHashMap<String, List<String>> iter, String yamlField) {
private void ParseAndAddPredicates(List<PredicateDefinition> predicates, LinkedHashMap<String, List<String>> iter,
String yamlField) {
List<String> predicatesYaml = iter.get(yamlField);
if (predicatesYaml != null) {
for (String predicateString : predicatesYaml) {
@ -169,25 +192,53 @@ public class ConfigGateway implements RouteDefinitionLocator {
}
}
private void ParseAndAddFilters(List<FilterDefinition> filters, LinkedHashMap<String, List<String>> iter, String yamlField) {
private void ParseAndAddFilters(List<FilterDefinition> filters, LinkedHashMap<String, List<String>> iter,
String yamlField) {
List<String> filtersYaml = iter.get(yamlField);
if (filtersYaml != null) {
filters.addAll(defaultFilters);
for (String filterString : filtersYaml) {
filters.add(new FilterDefinition(filterString));
}
}
}
private void createHttp2Dubbo(Map<String, Object> configDubbo) {
private void configHttp2Dubbo(List<RouteDefinition> routeList, Map<String, Object> configDubbo) {
try {
RequestMappingHandlerMapping requestMapping = (RequestMappingHandlerMapping) applicationContext
.getBean("requestMappingHandlerMapping");
requestMapping.registerMapping(RequestMappingInfo.paths("/test/xixi").methods(RequestMethod.POST).build(),
http2Dubbo, Http2Dubbo.class.getDeclaredMethod("H2DTest"));
Object unknownRoutes = configDubbo.get("routes");
if (unknownRoutes != null) {
GenericServicePool pool = applicationContext.getBean(GenericServicePool.class);
List<LinkedHashMap<String, List<String>>> routes = (ArrayList<LinkedHashMap<String, List<String>>>) unknownRoutes;
for (LinkedHashMap<String, List<String>> iter : routes) {
List<FilterDefinition> filters = new ArrayList<>();
List<PredicateDefinition> predicates = new ArrayList<>();
RouteDefinition rd = new RouteDefinition();
this.ParseAndSetBase(rd, iter);
// 设置基础属性
this.ParseAndSetBase(rd, iter);
// predicates: 下的相关属性
this.ParseAndAddPredicates(predicates, iter, "predicates");
// filters: 下的相关属性
this.ParseAndAddFilters(filters, iter, "filters");
}
}
// RequestMappingHandlerMapping requestMapping = (RequestMappingHandlerMapping)
// applicationContext
// .getBean("requestMappingHandlerMapping");
// requestMapping.registerMapping(RequestMappingInfo.paths("/test/xixi").methods(RequestMethod.POST).build(),
// http2Dubbo, Http2Dubbo.class.getDeclaredMethod("H2DTest"));
// 引用远程服务
// try {
@ -201,6 +252,7 @@ public class ConfigGateway implements RouteDefinitionLocator {
} catch (Exception e) {
// TODO: 任何错误都直接终止退出
log.error(e.toString());
SpringApplication.exit(applicationContext);
}

View File

@ -17,7 +17,14 @@ default:
dubbo:
routes:
- id: test
path: /dubbo/hello
order: 10
application: dubbo-exchange
uri: http://httpbin.org:80/get
# path: /dubbo/hello
interface: ocean.demo.api.IExchange
version: 1.0.0
predicates:
- Path=/dubbo/hello
filters:
- Dubbo=234