Spring Cloud Gateway 自定义负载均衡

Spring Cloud Gateway 自定义负载均衡

Spring Cloud Gateway 自定义负载均衡

这里假定大家已经知道,负载均衡最终执行的方法是 choose 方法,我们只需要自定义实现这个 choose 方法即可。

image-20230815080101527

这里就是框架自带的几种负载均衡策略,我们需要继承 AbstractLoadBalancerRule 来实现自己的方案。

LocalBalance

在这个类中无法直接获取请求的内容,可以通过过滤器为其传入一个 Object key,再根据 key 值来做负载均衡。

/**
 * Ribbon rule that selects a server from a caller-supplied routing key.
 *
 * <p>When {@code redisUtil.hget(redisKey, key)} returns a value, the request is pinned to the
 * reachable server whose id equals the configured {@code backend.ip}. Otherwise the key's hash
 * picks a starting position in the reachable list and the first server that is <em>not</em> the
 * {@code backend.ip} server is returned, wrapping around the list once.
 *
 * <p>NOTE(review): {@code RedisUtil#hget} is not visible here; it is presumably a Redis hash
 * lookup of the key inside the hash named by {@code backend.redis} - confirm.
 */
public class LocalBalance extends AbstractLoadBalancerRule {
    /** Server id (compared against {@link Server#getId()}) of the dedicated backend instance. */
    @Value("${backend.ip}")
    private String ip;

    /** First argument passed to {@code redisUtil.hget} when checking whether a key is pinned. */
    @Value("${backend.redis}")
    private String redisKey;

    @Autowired
    RedisUtil redisUtil;

    public LocalBalance() {
    }

    @Override
    public void initWithNiwsConfig(IClientConfig iClientConfig) {
        // This rule reads no per-client configuration.
    }

    /**
     * Chooses a server for the given routing key.
     *
     * @param lb  load balancer providing the candidate servers; {@code null} yields {@code null}
     * @param key routing key (its string form is used); {@code null} yields {@code null}
     * @return the selected server, or {@code null} when no suitable reachable server exists
     */
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null || key == null) {
            return null;
        }
        if (Thread.interrupted()) {
            return null;
        }

        List<Server> upList = lb.getReachableServers();
        List<Server> allList = lb.getAllServers();
        // An empty reachable list would otherwise cause a division by zero below.
        if (allList == null || allList.isEmpty() || upList == null || upList.isEmpty()) {
            return null;
        }

        String routingKey = String.valueOf(key);
        if (redisUtil.hget(redisKey, routingKey) != null) {
            // Pinned keys may only go to the dedicated backend server.
            for (Server candidate : upList) {
                if (isBackendServer(candidate)) {
                    return candidate;
                }
            }
            return null;
        }

        // Same start index as before (abs of the remainder), so existing key-to-server
        // assignments do not move; then scan forward with wrap-around.
        int size = upList.size();
        int start = Math.abs(routingKey.hashCode() % size);
        for (int offset = 0; offset < size; offset++) {
            Server candidate = upList.get((start + offset) % size);
            if (!isBackendServer(candidate)) {
                return candidate;
            }
        }
        return null;
    }

    /** Returns whether the server's id equals the configured {@code backend.ip} (case-insensitive). */
    private boolean isBackendServer(Server candidate) {
        return ip != null && ip.equalsIgnoreCase(candidate.getId());
    }

    @Override
    public Server choose(Object o) {
        return choose(getLoadBalancer(), o);
    }
}

**LoadBalancerClientFilter:使用 Ribbon 负载均衡时默认使用该类(已不推荐使用)**

/**
 * Global filter that resolves {@code lb}-scheme route URLs to a concrete service instance via a
 * {@link LoadBalancerClient} and stores the resulting URL back on the exchange.
 *
 * @deprecated Used by default when {@code RibbonLoadBalancerClient} is on the classpath. Spring
 * Cloud Ribbon is in maintenance mode; the recommendation is to switch to
 * {@code BlockingLoadBalancerClient} by setting {@code spring.cloud.loadbalancer.ribbon.enabled}
 * to {@code false} or removing {@code spring-cloud-starter-netflix-ribbon} from the project.
 */
@Deprecated
public class LoadBalancerClientFilter implements GlobalFilter, Ordered {
    public static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10100;
    private static final Log log = LogFactory.getLog(LoadBalancerClientFilter.class);
    protected final LoadBalancerClient loadBalancer;
    private LoadBalancerProperties properties;

    public LoadBalancerClientFilter(LoadBalancerClient loadBalancer, LoadBalancerProperties properties) {
        this.loadBalancer = loadBalancer;
        this.properties = properties;
    }

    /** Returns this filter's position in the global filter chain. */
    public int getOrder() {
        return LOAD_BALANCER_CLIENT_FILTER_ORDER;
    }

    /**
     * Rewrites the request URL attribute to point at a chosen instance when the route URL (or its
     * scheme prefix) uses the {@code lb} scheme; other exchanges pass through untouched.
     */
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI routeUrl = (URI) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String prefix = (String) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);

        // Not a load-balanced route: nothing to resolve.
        if (routeUrl == null || (!"lb".equals(routeUrl.getScheme()) && !"lb".equals(prefix))) {
            return chain.filter(exchange);
        }

        ServerWebExchangeUtils.addOriginalRequestUrl(exchange, routeUrl);
        if (log.isTraceEnabled()) {
            log.trace("LoadBalancerClientFilter url before: " + routeUrl);
        }

        ServiceInstance chosen = this.choose(exchange);
        if (chosen == null) {
            throw NotFoundException.create(this.properties.isUse404(), "Unable to find instance for " + routeUrl.getHost());
        }

        // A scheme prefix means the route URL's own scheme wins over the instance's secure flag.
        String scheme;
        if (prefix != null) {
            scheme = routeUrl.getScheme();
        } else {
            scheme = chosen.isSecure() ? "https" : "http";
        }

        URI incoming = exchange.getRequest().getURI();
        URI resolved = this.loadBalancer.reconstructURI(new DelegatingServiceInstance(chosen, scheme), incoming);
        if (log.isTraceEnabled()) {
            log.trace("LoadBalancerClientFilter url chosen: " + resolved);
        }

        exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, resolved);
        return chain.filter(exchange);
    }

    /** Asks the load balancer for an instance of the service named by the route URL's host. */
    protected ServiceInstance choose(ServerWebExchange exchange) {
        URI routeUrl = (URI) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        return this.loadBalancer.choose(routeUrl.getHost());
    }
}

这里使用 LoadBalancerClientFilter,虽然它已被弃用,但这里只做简单的例子;实际项目中可以直接使用 ReactiveLoadBalancerClientFilter,原理一样。

@Component
@Slf4j
public class StoreRouterLocator extends LoadBalancerClientFilter {
    @Autowired
    GatewayConfig gatewayConfig;

    @Value("${backend.replace_pre}")
    private String replacePre;

    @Value("${backend.replace_for}")
    private String replaceFor;

    @Autowired
    RedisUtil redisUtil;

    @Value("${backend.ip}")
    private String ip;

    @Value("${backend.redis}")
    private String redisKey;

    public StoreRouterLocator(LoadBalancerClient loadBalancer, LoadBalancerProperties properties) {
        super(loadBalancer, properties);
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        //从请求头中取出token
        String token = "";
        ServerHttpRequest request = exchange.getRequest();
        RequestPath path1 = request.getPath();
        boolean isBackend = false;
        Object storeCode = null;
        if (path1.toString().contains("web/v1")) {
            token = exchange.getRequest().getHeaders().getFirst("Authorization");
            //取出token包含的身份
            Map userInfo = verifyJWT(token);
            storeCode = userInfo.get("storeCode");
            Object hget = redisUtil.hget(redisKey, (String) storeCode);
            if (hget != null) {
                isBackend = true;
            }
        } else {
            return super.filter(exchange, chain);
        }

        URI url = (URI) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = (String) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
        if (url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix))) {
            ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
            if (log.isTraceEnabled()) {
                log.trace("LoadBalancerClientFilter url before: " + url);
            }
            ServiceInstance instance;
            instance = this.choose(exchange, (String) storeCode);

            if (instance == null) {
                throw NotFoundException.create(true, "Unable to find instance for " + url.getHost());
            } else {

                URI uri = exchange.getRequest().getURI();
                String path = uri.getPath();
                if (StringUtils.isNotEmpty(replacePre) && isBackend) {
                    String newPath = path.replaceAll(replacePre, replaceFor);
                    ServerHttpRequest newRequest = exchange.getRequest().mutate()
                            .path(newPath)
                            .build();
                    exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, newRequest.getURI());
                    uri = newRequest.getURI();
                }
                String overrideScheme = instance.isSecure() ? "https" : "http";
                if (schemePrefix != null) {
                    overrideScheme = url.getScheme();
                }

                URI requestUrl = this.loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);
                if (log.isTraceEnabled()) {
                    log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
                }
                exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
                return chain.filter(exchange);
            }
        } else {
            return chain.filter(exchange);
        }
    }

    protected ServiceInstance choose(ServerWebExchange exchange, String storeCode) {

        if (this.loadBalancer instanceof RibbonLoadBalancerClient) {
            RibbonLoadBalancerClient client = (RibbonLoadBalancerClient) this.loadBalancer;
            String serviceId = ((URI) exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR)).getHost();
            //这里使用storeCode做为选择服务实例的key
            return client.choose(serviceId, storeCode);

        }
        return super.choose(exchange);
    }

根据请求中的信息(如 cookie 或请求头)来选择服务。这里在选择服务实例时会去调用上面编写的 Rule 类,通过自定义的 key 值来驱动 Rule 的选择逻辑。

配置文件

这里使用手动配置服务来举例,可以用注册中心代替,原理一样。

# Gateway application and route definitions.
spring:
  application:
    name: ai-forecast-gateway
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true
      routes:
        # Headquarters-side verification service
        - id: ai-XXX-1
          predicates:
            # No whitespace is allowed inside the path pattern or the lb:// URI.
            - Path=/ai-XXX-1-service/**
          uri: lb://ai-XXX-1-service
          filters:
            - AiWeb
        # Headquarters-side business service
        - id: ai-XXX-2
          predicates:
            - Path=/ai-XXX-2/**
          uri: lb://ai-XXX-2-service
          filters:
            - AiWeb
        - id: ai-XXX-3
          predicates:
            - Path=/ai-XXX-3-service/rpc/**
            - name: ReadBodyPredicateFactory # ReadBodyPredicateFactory predicate: reads the body into the cache
              args:
                inClass: '#{T(String)}'
                predicate: '#{@bodyPredicate}' # injects the bean implementing the Predicate interface
          uri: lb://ai-XXX-3
          filters:
            - ReadBody=true # custom filter factory that reads the request body content
            - ThirdFilter
            - name: RequestRateLimiter # Gateway's built-in Redis-based rate limiting
              args:
                redis-rate-limiter.replenishRate: 1  # requests allowed per second
                redis-rate-limiter.burstCapacity: 1  # token bucket capacity: max requests allowed within one second
                key-resolver: "#{@ipKeyResolver}" # SpEL expression resolving the corresponding bean
  favicon:
    enabled: false
  mvc:
    favicon:
      enabled: false

# Ribbon client settings. The top-level key is the Ribbon client name and must equal the
# host used in the route's lb:// URI (lb://ai-XXX-1-service) for these settings to apply.
ai-XXX-1-service:
  ribbon:
    # Server list
    listOfServers: http://10.12.167.89:9111/web/v1/,10.12.167.174:10002/
    NFLoadBalancerRuleClassName: XX.filter.LocalBalance
    # 10s
    ConnectTimeout: 10000
    # 10min
    ReadTimeout: 600000
    # Maximum total connections
    MaxTotalHttpConnections: 500
    # Maximum connections per instance
    MaxConnectionsPerHost: 300

# Ribbon client settings. The top-level key is the Ribbon client name and must equal the
# host used in the route's lb:// URI (lb://ai-XXX-2-service) for these settings to apply.
ai-XXX-2-service:
  ribbon:
    # Server list
    listOfServers: http://10.218.223.249:9089/
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RoundRobinRule
    # 10s
    ConnectTimeout: 10000
    # 10min
    ReadTimeout: 600000
    # Maximum total connections
    MaxTotalHttpConnections: 500
    # Maximum connections per instance
    MaxConnectionsPerHost: 300

最后

我们需要通过 @RibbonClients 来配置 IRule 才能生效,不可以直接在自定义 IRule 上加注解 @Component,否则会出现服务错乱的问题[如果配置文件中只有单个 route 配置,则不会出现问题]。

/**
 * Gateway bean definitions: rate-limiter key resolvers, the ReadBody filter factory, the body
 * predicate referenced from the route configuration, and the custom Ribbon rule.
 *
 * <p>NOTE(review): {@code defaultConfiguration} points at the rule class itself rather than a
 * dedicated configuration class, and an {@link IRule} bean is also declared below - confirm
 * that each Ribbon client ends up with its own rule instance.
 */
@RibbonClients(defaultConfiguration = LocalBalance.class)
public class BalanceConfig {

    /**
     * Rate-limit key based on the client's IP address.
     *
     * <p>Uses the literal address instead of {@code getHostName()}, which may trigger a reverse
     * DNS lookup. When the remote address is unavailable, no key is emitted and the rate
     * limiter's empty-key policy applies.
     */
    @Primary
    @Bean(value = "ipKeyResolver")
    KeyResolver ipKeyResolver() {
        return exchange -> {
            java.net.InetSocketAddress remote = exchange.getRequest().getRemoteAddress();
            if (remote == null) {
                return Mono.empty();
            }
            java.net.InetAddress address = remote.getAddress();
            // getAddress() is null for an unresolved socket address; fall back to the host string.
            return Mono.just(address != null ? address.getHostAddress() : remote.getHostString());
        };
    }

    /** Rate-limit key based on the request path. */
    @Bean(value = "apiKeyResolver")
    KeyResolver apiKeyResolver() {
        return exchange -> Mono.just(exchange.getRequest().getPath().value());
    }

    /**
     * Registers the ReadBody filter factory.
     *
     * @return a new filter factory instance
     */
    @Bean
    public ReadBodyGatewayFilterFactory readBodyGatewayFilterFactory() {
        return new ReadBodyGatewayFilterFactory();
    }

    /**
     * Predicate used by the read-body route predicate; configurable from YAML.
     *
     * @return a predicate that accepts every body
     */
    @Bean
    public Predicate<Object> bodyPredicate() {
        return body -> true;
    }

    /** Exposes the custom load-balancing rule as a bean. */
    @Bean
    public IRule localBalance() {
        return new LocalBalance();
    }
}

ReadBody 过滤器:用于获取请求体的过滤器。使用时直接调用 exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY) 即可。

/**
 * Gateway filter factory whose filter logs the request body cached on the exchange under
 * {@code cachedRequestBodyObject} and then continues the chain.
 */
@Slf4j
public class ReadBodyGatewayFilterFactory
        extends AbstractGatewayFilterFactory<ReadBodyGatewayFilterFactory.Config> {

    private static final String CACHE_REQUEST_BODY_OBJECT_KEY = "cachedRequestBodyObject";

    public ReadBodyGatewayFilterFactory() {
        super(Config.class);
    }

    /** Builds the filter; the supplied config is not consulted. */
    @Override
    public GatewayFilter apply(Config config) {
        GatewayFilter bodyLoggingFilter = (exchange, chain) -> {
            // The ReadBodyPredicateFactory predicate stores the body in this exchange attribute.
            Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);
            log.info("request body is:{}", cachedBody);
            return chain.filter(exchange);
        };
        return bodyLoggingFilter;
    }

    /** Maps the shortcut argument (e.g. {@code ReadBody=true}) onto {@code withParams}. */
    @Override
    public List<String> shortcutFieldOrder() {
        List<String> fieldOrder = Arrays.asList("withParams");
        return fieldOrder;
    }

    /** Holder for the filter's configured value. */
    public static class Config {
        // Receives the configured argument value; the name is arbitrary.
        private boolean withParams;

        public boolean isWithParams() {
            return this.withParams;
        }

        public void setWithParams(boolean withParams) {
            this.withParams = withParams;
        }
    }
}
© 版权声明
THE END
喜欢就支持一下吧
点赞15 分享
评论 抢沙发

请登录后发表评论