From f9da760e8d9b2a6c9088f6125fe431a55c9958d9 Mon Sep 17 00:00:00 2001 From: huangsimin Date: Tue, 2 Jul 2019 18:42:47 +0800 Subject: [PATCH] =?UTF-8?q?DubboFilterFactory=20=E6=B2=A1=E5=8F=8D?= =?UTF-8?q?=E6=98=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../usergw/biz/filters/DubboFilter.java | 48 +++-- .../biz/filters/bean/GenericServicePool.java | 8 +- .../filters/factory/DubboFilterFactory.java | 104 +++++++++++ .../service/usergw/config/ConfigGateway.java | 168 ++++++++++++------ .../src/main/resources/gateway.yaml | 9 +- 5 files changed, 264 insertions(+), 73 deletions(-) create mode 100644 usergw-service/src/main/java/cn/ecpark/service/usergw/biz/filters/factory/DubboFilterFactory.java diff --git a/usergw-service/src/main/java/cn/ecpark/service/usergw/biz/filters/DubboFilter.java b/usergw-service/src/main/java/cn/ecpark/service/usergw/biz/filters/DubboFilter.java index 3b91c0e..32d7661 100644 --- a/usergw-service/src/main/java/cn/ecpark/service/usergw/biz/filters/DubboFilter.java +++ b/usergw-service/src/main/java/cn/ecpark/service/usergw/biz/filters/DubboFilter.java @@ -17,10 +17,13 @@ import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; +import cn.ecpark.service.usergw.biz.filters.bean.GenericServicePool; import io.netty.util.Recycler; +import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; @Component +@Slf4j public class DubboFilter implements GlobalFilter, Ordered { @Autowired @@ -31,25 +34,46 @@ public class DubboFilter implements GlobalFilter, Ordered { return Ordered.LOWEST_PRECEDENCE; } - ReferenceConfigCache cache = ReferenceConfigCache.getCache(); - @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { // exchange.getRequest(); - ReferenceConfig reference = new ReferenceConfig(); // 该实例很重量,里面封装了所有与注册中心及服务提供方连接,请缓存 - - reference.setApplication(new ApplicationConfig("dubbo-exchange")); + + String application = "dubbo-exchange"; // 必须 // reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); - reference.setGroup("group"); - reference.setInterface("ocean.demo.api.IExchange"); // 弱类型接口名 - reference.setVersion("1.0.0"); - reference.setGeneric(true); // 声明为泛化接口 + String rgroup = "group"; + String rinterface = "ocean.demo.api.IExchange"; // 弱类型接口名 必须 + String rversion = "1.0.0"; + String rmethod = "Say"; - Recycler a; + String rkey = ""; + rkey += application; - GenericService gs = cache.get(reference); - Object result = gs.$invoke("Hello", new String[] {}, new Object[] {}); + if(rgroup != ""){ + rkey += "/" + rgroup; + } + + rkey += "/" + rinterface; + rkey += ":" + rversion; + + GenericServicePool gsPool = appContext.getBean(GenericServicePool.class); + if(!gsPool.contains(rkey)) { + + ReferenceConfig reference = new ReferenceConfig(); // 该实例很重量,里面封装了所有与注册中心及服务提供方连接,请缓存 + + reference.setApplication(new ApplicationConfig(application)); // 必须 + // reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); + // reference.setGroup("group"); + reference.setInterface(rinterface); // 弱类型接口名 必须 + reference.setVersion(rversion); // 必须 + reference.setGeneric(true); // 声明为泛化接口 + + gsPool.add(rkey, reference.get()); + } + + GenericService gs = gsPool.get(rkey); + + Object result = gs.$invoke(rmethod, new String[] {"java.lang.String"}, new Object[] {"213"}); if (result != null) { ServerHttpResponse response = exchange.getResponse(); diff --git a/usergw-service/src/main/java/cn/ecpark/service/usergw/biz/filters/bean/GenericServicePool.java b/usergw-service/src/main/java/cn/ecpark/service/usergw/biz/filters/bean/GenericServicePool.java index 2d94647..4ab883e 100644 --- a/usergw-service/src/main/java/cn/ecpark/service/usergw/biz/filters/bean/GenericServicePool.java +++ b/usergw-service/src/main/java/cn/ecpark/service/usergw/biz/filters/bean/GenericServicePool.java @@ -21,9 +21,7 @@ import lombok.extern.slf4j.Slf4j; public class GenericServicePool { private class GenericServiceManager { - Lock idleLock = new ReentrantLock(); - LinkedList idle = new LinkedList<>(); private String key; @@ -59,6 +57,7 @@ public class GenericServicePool { } } catch (Exception e) { // TODO: handle exception + log.error("悠闲队列出现问题: " + e.toString()); } finally { idleLock.unlock(); } @@ -72,6 +71,8 @@ public class GenericServicePool { } } + + public GenericServiceManager() { } @@ -99,4 +100,7 @@ public class GenericServicePool { pool.add(genericService); } + public boolean contains(String key) { + return gsDictionary.containsKey(key); + } } \ No newline at end of file diff --git a/usergw-service/src/main/java/cn/ecpark/service/usergw/biz/filters/factory/DubboFilterFactory.java b/usergw-service/src/main/java/cn/ecpark/service/usergw/biz/filters/factory/DubboFilterFactory.java new file mode 100644 index 0000000..c44df58 --- /dev/null +++ b/usergw-service/src/main/java/cn/ecpark/service/usergw/biz/filters/factory/DubboFilterFactory.java @@ -0,0 +1,104 @@ +package cn.ecpark.service.usergw.biz.filters.factory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import com.alibaba.fastjson.JSON; + +import org.apache.dubbo.config.ApplicationConfig; +import org.apache.dubbo.config.ReferenceConfig; +import org.apache.dubbo.rpc.service.GenericService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.gateway.filter.GatewayFilter; +import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory; +import org.springframework.context.ApplicationContext; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.stereotype.Component; + +import cn.ecpark.service.usergw.biz.filters.bean.GenericServicePool; +import reactor.core.publisher.Mono;; + +@Component +public class DubboFilterFactory extends AbstractGatewayFilterFactory { + + @Autowired + private ApplicationContext appContext; + + public DubboFilterFactory() { + super(Config.class); + } + + @Override + public GatewayFilter apply(Config config) { + String dubboUri = config.dubboUri; + + return (exchange, chain) -> { + + String application = "dubbo-exchange"; // 必须 + // reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); + String rgroup = "group"; + String rinterface = "ocean.demo.api.IExchange"; // 弱类型接口名 必须 + String rversion = "1.0.0"; + String rmethod = "Say"; + List rparamTypes = new ArrayList(); + List rparams = new ArrayList(); + + + String rkey = ""; + rkey += application; + + if(rgroup != ""){ + rkey += "/" + rgroup; + } + + rkey += "/" + rinterface; + rkey += ":" + rversion; + + GenericServicePool gsPool = appContext.getBean(GenericServicePool.class); + if(!gsPool.contains(rkey)) { + + ReferenceConfig reference = new ReferenceConfig(); // 该实例很重量,里面封装了所有与注册中心及服务提供方连接,请缓存 + + reference.setApplication(new ApplicationConfig("dubbo-exchange")); // 必须 + // reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); + reference.setGroup("group"); + reference.setInterface("ocean.demo.api.IExchange"); // 弱类型接口名 必须 + reference.setVersion("1.0.0"); // 必须 + reference.setGeneric(true); // 声明为泛化接口 + + gsPool.add(rkey, reference.get()); + } + + GenericService gs = gsPool.get(rkey); + Object result = gs.$invoke("Hello", new String[] {}, new Object[] {}); + + if (result != null) { + ServerHttpResponse response = exchange.getResponse(); + return response.writeWith( + Mono.just(response.bufferFactory().wrap(ByteBuffer.wrap(JSON.toJSONString(result).getBytes())))); + } + return chain.filter(exchange); + + // gs.$invoke(method, parameterTypes, args) + }; + } + + + public static class Config { + + private String dubboUri; + + public String getDubboUri() { + return dubboUri; + } + + public void setDubboUri(String dubboUri) { + this.dubboUri = dubboUri; + } + + } + + + +} \ No newline at end of file diff --git a/usergw-service/src/main/java/cn/ecpark/service/usergw/config/ConfigGateway.java b/usergw-service/src/main/java/cn/ecpark/service/usergw/config/ConfigGateway.java index 443cd1b..fc808b1 100644 --- a/usergw-service/src/main/java/cn/ecpark/service/usergw/config/ConfigGateway.java +++ b/usergw-service/src/main/java/cn/ecpark/service/usergw/config/ConfigGateway.java @@ -16,13 +16,13 @@ import org.springframework.cloud.gateway.handler.predicate.PredicateDefinition; import org.springframework.cloud.gateway.route.RouteDefinition; import org.springframework.cloud.gateway.route.RouteDefinitionLocator; import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.springframework.stereotype.Controller; -import org.springframework.web.bind.annotation.RequestMethod; -import org.springframework.web.reactive.result.method.RequestMappingInfo; -import org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping; import org.yaml.snakeyaml.Yaml; +import cn.ecpark.service.usergw.biz.filters.bean.GenericServicePool; +import cn.ecpark.service.usergw.biz.filters.factory.DubboFilterFactory; import cn.ecpark.service.usergw.impl.http.Http2Dubbo; import cn.ecpark.service.usergw.utils.Convert; import lombok.extern.slf4j.Slf4j; @@ -38,9 +38,21 @@ public class ConfigGateway implements RouteDefinitionLocator { // List mediaTypes = new ArrayList<>(); + List defaultFilters; + @Autowired private ApplicationContext applicationContext; + @Bean + public GenericServicePool genericServicePool() { + return new GenericServicePool(); + } + + @Bean + public DubboFilterFactory dubboFilterFactory() { + return new DubboFilterFactory(); + } + @Autowired private Http2Dubbo http2Dubbo; @@ -76,48 +88,62 @@ public class ConfigGateway implements RouteDefinitionLocator { // log.error(e.toString()); // } - Map configDefault = (Map) configYaml.get("default"); - Map configDubbo = (Map) configYaml.get("dubbo"); + Map defaultYaml = (Map) configYaml.get("default"); + Map dubboYaml = (Map) configYaml.get("dubbo"); - if ( configDubbo != null && !configDubbo.isEmpty()) { - this.createHttp2Dubbo(configDubbo); + List routeList = new ArrayList(); + + if (defaultYaml != null && !defaultYaml.isEmpty()) { + defaultFilters = this.getDefaultFilter(defaultYaml); + this.configDefault(routeList, defaultYaml); } - - if( configDefault != null && !configDefault.isEmpty()) { - List result = this.getConfigDefault(configDefault); - if (result != null ) { - return Flux.fromIterable(result); - } + + if (dubboYaml != null && !dubboYaml.isEmpty()) { + this.configHttp2Dubbo(routeList, dubboYaml); } + + if (!routeList.isEmpty()) { + return Flux.fromIterable(routeList); + } + } } + return Flux.empty(); // return Flux.fromIterable(it); } @SuppressWarnings("unchecked") - private List getConfigDefault(Map configDefault) { - if (configDefault != null) { - List RDList = new ArrayList<>(); + private List getDefaultFilter(Map defaultYaml) { + List filters = new ArrayList<>(); - List filters = new ArrayList<>(); - List predicates = new ArrayList<>(); - - // default-filters: 下的相关设置 - Object unknownDefaultFilters = configDefault.get("default-filters"); - if (unknownDefaultFilters != null) { - List defaultFiltersYaml = (ArrayList) unknownDefaultFilters; - for (String filterString : defaultFiltersYaml) { - filters.add(new FilterDefinition(filterString)); - } + // default-filters: 下的相关设置 + Object unknownDefaultFilters = defaultYaml.get("default-filters"); + if (unknownDefaultFilters != null) { + List defaultFiltersYaml = (ArrayList) unknownDefaultFilters; + for (String filterString : defaultFiltersYaml) { + filters.add(new FilterDefinition(filterString)); } + } - Object unknownRoutes = configDefault.get("routes"); + return filters; + } + + @SuppressWarnings("unchecked") + private void configDefault(List routeList, Map defaultYaml) { + if (defaultYaml != null) { + + Object unknownRoutes = defaultYaml.get("routes"); if (unknownRoutes != null) { List>> routes = (ArrayList>>) unknownRoutes; for (LinkedHashMap> iter : routes) { + + List filters = new ArrayList<>(); + List predicates = new ArrayList<>(); RouteDefinition rd = new RouteDefinition(); - + + + // 设置基础属性 this.ParseAndSetBase(rd, iter); @@ -130,37 +156,34 @@ public class ConfigGateway implements RouteDefinitionLocator { rd.setPredicates(predicates); rd.setFilters(filters); - RDList.add(rd); + routeList.add(rd); } - - return RDList; } } - - return null; } private void ParseAndSetBase(RouteDefinition rd, LinkedHashMap> iter) { - // 设置id - Object id = iter.get("id"); - if (id != null) { - rd.setId((String) id); - } + // 设置id + Object id = iter.get("id"); + if (id != null) { + rd.setId((String) id); + } - // 设置uri - Object uri = iter.get("uri"); - if (uri != null) { - rd.setUri(URI.create((String) uri)); - } + // 设置uri + Object uri = iter.get("uri"); + if (uri != null) { + rd.setUri(URI.create((String) uri)); + } - // 设置uri - Object order = iter.get("order"); - if (order != null) { - rd.setOrder((int) order); - } + // 设置uri + Object order = iter.get("order"); + if (order != null) { + rd.setOrder((int) order); + } } - private void ParseAndAddPredicates(List predicates, LinkedHashMap> iter, String yamlField) { + private void ParseAndAddPredicates(List predicates, LinkedHashMap> iter, + String yamlField) { List predicatesYaml = iter.get(yamlField); if (predicatesYaml != null) { for (String predicateString : predicatesYaml) { @@ -169,25 +192,53 @@ public class ConfigGateway implements RouteDefinitionLocator { } } - private void ParseAndAddFilters(List filters, LinkedHashMap> iter, String yamlField) { + private void ParseAndAddFilters(List filters, LinkedHashMap> iter, + String yamlField) { List filtersYaml = iter.get(yamlField); if (filtersYaml != null) { + filters.addAll(defaultFilters); for (String filterString : filtersYaml) { filters.add(new FilterDefinition(filterString)); } } } - - private void createHttp2Dubbo(Map configDubbo) { + private void configHttp2Dubbo(List routeList, Map configDubbo) { try { - - RequestMappingHandlerMapping requestMapping = (RequestMappingHandlerMapping) applicationContext - .getBean("requestMappingHandlerMapping"); - - requestMapping.registerMapping(RequestMappingInfo.paths("/test/xixi").methods(RequestMethod.POST).build(), - http2Dubbo, Http2Dubbo.class.getDeclaredMethod("H2DTest")); + + Object unknownRoutes = configDubbo.get("routes"); + if (unknownRoutes != null) { + GenericServicePool pool = applicationContext.getBean(GenericServicePool.class); + + List>> routes = (ArrayList>>) unknownRoutes; + for (LinkedHashMap> iter : routes) { + + List filters = new ArrayList<>(); + List predicates = new ArrayList<>(); + RouteDefinition rd = new RouteDefinition(); + + this.ParseAndSetBase(rd, iter); + + // 设置基础属性 + this.ParseAndSetBase(rd, iter); + + // predicates: 下的相关属性 + this.ParseAndAddPredicates(predicates, iter, "predicates"); + + // filters: 下的相关属性 + this.ParseAndAddFilters(filters, iter, "filters"); + + } + + } + + // RequestMappingHandlerMapping requestMapping = (RequestMappingHandlerMapping) + // applicationContext + // .getBean("requestMappingHandlerMapping"); + + // requestMapping.registerMapping(RequestMappingInfo.paths("/test/xixi").methods(RequestMethod.POST).build(), + // http2Dubbo, Http2Dubbo.class.getDeclaredMethod("H2DTest")); // 引用远程服务 // try { @@ -201,6 +252,7 @@ public class ConfigGateway implements RouteDefinitionLocator { } catch (Exception e) { // TODO: 任何错误都直接终止退出 + log.error(e.toString()); SpringApplication.exit(applicationContext); } diff --git a/usergw-service/src/main/resources/gateway.yaml b/usergw-service/src/main/resources/gateway.yaml index 1192c8d..29b196f 100644 --- a/usergw-service/src/main/resources/gateway.yaml +++ b/usergw-service/src/main/resources/gateway.yaml @@ -17,7 +17,14 @@ default: dubbo: routes: - id: test - path: /dubbo/hello + order: 10 + application: dubbo-exchange + uri: http://httpbin.org:80/get + # path: /dubbo/hello interface: ocean.demo.api.IExchange version: 1.0.0 + predicates: + - Path=/dubbo/hello + filters: + - Dubbo=234 \ No newline at end of file