Spring Cloud Gateway 自定义负载均衡
这里假设大家已经知道负载均衡最终执行的方法是 choose 方法,我们只需要自定义实现这个 choose 方法即可。
框架自带了几种负载均衡策略,我们需要继承 AbstractLoadBalancerRule 来自定义自己的策略。
LocalBalance
在这个类中无法获取请求的内容,可以使用过滤器来为此添加一个Object key,根据key值来做均衡。
/**
 * Ribbon rule that routes "backend" keys to one dedicated server and spreads every other
 * key over the remaining reachable servers.
 *
 * <p>The key handed to {@link #choose(Object)} is looked up with
 * {@code redisUtil.hget(redisKey, key)}. When an entry exists, the reachable server whose id
 * equals {@code backend.ip} is returned. Otherwise a reachable server other than that one is
 * picked, starting at {@code |hash(key) % reachableCount|} and wrapping around the list.
 */
public class LocalBalance extends AbstractLoadBalancerRule {

    /** Id of the dedicated server, compared case-insensitively against {@code Server.getId()}. */
    @Value("${backend.ip}")
    private String ip;

    /** First argument passed to {@code redisUtil.hget} when checking a key. */
    @Value("${backend.redis}")
    private String redisKey;

    @Autowired
    RedisUtil redisUtil;

    public LocalBalance() {
    }

    @Override
    public void initWithNiwsConfig(IClientConfig iClientConfig) {
        // No client-specific configuration is read by this rule.
    }

    /**
     * Picks a server for the given key.
     *
     * @param lb  load balancer supplying the server lists; {@code null} yields {@code null}
     * @param key routing key; converted with {@code toString()}, may be {@code null}
     * @return the chosen server, or {@code null} when the thread was interrupted, no server
     *         is reachable, or no reachable server matches the branch taken
     */
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        if (Thread.interrupted()) {
            return null;
        }
        List<Server> upList = lb.getReachableServers();
        List<Server> allList = lb.getAllServers();
        // An empty reachable list must be rejected too: the hash modulo below would
        // otherwise divide by zero.
        if (allList == null || allList.isEmpty() || upList == null || upList.isEmpty()) {
            return null;
        }

        String backendRequest = key == null ? null : key.toString();
        boolean pinnedToBackend =
                backendRequest != null && redisUtil.hget(redisKey, backendRequest) != null;

        if (pinnedToBackend) {
            for (Server candidate : upList) {
                if (isDedicatedServer(candidate)) {
                    return candidate;
                }
            }
            return null;
        }

        int reachable = upList.size();
        // A missing key has no hash; start scanning from the first reachable server.
        int start = backendRequest == null
                ? 0
                : Math.abs(backendRequest.hashCode() % reachable);
        // Visit start..end, then 0..start-1, skipping the dedicated server.
        for (int offset = 0; offset < reachable; offset++) {
            Server candidate = upList.get((start + offset) % reachable);
            if (!isDedicatedServer(candidate)) {
                return candidate;
            }
        }
        return null;
    }

    @Override
    public Server choose(Object o) {
        return choose(getLoadBalancer(), o);
    }

    /** Returns whether the server's id equals the configured {@code backend.ip}. */
    private boolean isDedicatedServer(Server server) {
        return ip != null && ip.equalsIgnoreCase(server.getId());
    }
}
**LoadBalancerClientFilter**:使用 Ribbon 负载均衡时默认使用该类(已不推荐使用)
/**
 * Global filter that turns an {@code lb}-scheme request URL into the URL of a concrete
 * service instance chosen by the injected {@link LoadBalancerClient}.
 *
 * @deprecated {@code RibbonLoadBalancerClient} is used by default when it is on the
 *     classpath. Spring Cloud Ribbon is in maintenance mode; the recommendation is to switch
 *     to {@code BlockingLoadBalancerClient} by setting
 *     {@code spring.cloud.loadbalancer.ribbon.enabled} to {@code false} or by removing
 *     {@code spring-cloud-starter-netflix-ribbon} from the project.
 */
@Deprecated
public class LoadBalancerClientFilter implements GlobalFilter, Ordered {

    public static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10100;

    private static final Log log = LogFactory.getLog(LoadBalancerClientFilter.class);

    protected final LoadBalancerClient loadBalancer;

    private LoadBalancerProperties properties;

    public LoadBalancerClientFilter(LoadBalancerClient loadBalancer, LoadBalancerProperties properties) {
        this.loadBalancer = loadBalancer;
        this.properties = properties;
    }

    /** Returns the fixed order of this filter ({@code 10100}). */
    public int getOrder() {
        return LOAD_BALANCER_CLIENT_FILTER_ORDER;
    }

    /**
     * Resolves the route URL to a service instance when it uses the {@code lb} scheme (or the
     * scheme prefix attribute is {@code lb}); any other request is passed down the chain
     * untouched.
     *
     * @throws NotFoundException when no instance can be chosen for the route's host
     */
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI routeUrl = (URI) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String prefix = (String) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);

        boolean loadBalanced = routeUrl != null
                && ("lb".equals(routeUrl.getScheme()) || "lb".equals(prefix));
        if (!loadBalanced) {
            return chain.filter(exchange);
        }

        ServerWebExchangeUtils.addOriginalRequestUrl(exchange, routeUrl);
        if (log.isTraceEnabled()) {
            log.trace("LoadBalancerClientFilter url before: " + routeUrl);
        }

        ServiceInstance chosen = this.choose(exchange);
        if (chosen == null) {
            throw NotFoundException.create(this.properties.isUse404(), "Unable to find instance for " + routeUrl.getHost());
        }

        URI incoming = exchange.getRequest().getURI();
        String scheme;
        if (chosen.isSecure()) {
            scheme = "https";
        } else {
            scheme = "http";
        }
        // An explicit scheme prefix means the route URL's own scheme wins.
        if (prefix != null) {
            scheme = routeUrl.getScheme();
        }

        DelegatingServiceInstance target = new DelegatingServiceInstance(chosen, scheme);
        URI resolved = this.loadBalancer.reconstructURI(target, incoming);
        if (log.isTraceEnabled()) {
            log.trace("LoadBalancerClientFilter url chosen: " + resolved);
        }
        exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, resolved);
        return chain.filter(exchange);
    }

    /** Chooses an instance for the host of the exchange's current route URL. */
    protected ServiceInstance choose(ServerWebExchange exchange) {
        URI routeUrl = (URI) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String serviceId = routeUrl.getHost();
        return this.loadBalancer.choose(serviceId);
    }
}
这里使用 LoadBalancerClientFilter,虽然它已被弃用,但这里只做简单的示例;也可以直接使用 ReactiveLoadBalancerClientFilter,原理一样。
@Component
@Slf4j
public class StoreRouterLocator extends LoadBalancerClientFilter {
@Autowired
GatewayConfig gatewayConfig;
@Value("${backend.replace_pre}")
private String replacePre;
@Value("${backend.replace_for}")
private String replaceFor;
@Autowired
RedisUtil redisUtil;
@Value("${backend.ip}")
private String ip;
@Value("${backend.redis}")
private String redisKey;
public StoreRouterLocator(LoadBalancerClient loadBalancer, LoadBalancerProperties properties) {
super(loadBalancer, properties);
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//从请求头中取出token
String token = "";
ServerHttpRequest request = exchange.getRequest();
RequestPath path1 = request.getPath();
boolean isBackend = false;
Object storeCode = null;
if (path1.toString().contains("web/v1")) {
token = exchange.getRequest().getHeaders().getFirst("Authorization");
//取出token包含的身份
Map userInfo = verifyJWT(token);
storeCode = userInfo.get("storeCode");
Object hget = redisUtil.hget(redisKey, (String) storeCode);
if (hget != null) {
isBackend = true;
}
} else {
return super.filter(exchange, chain);
}
URI url = (URI) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = (String) exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
if (url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix))) {
ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url before: " + url);
}
ServiceInstance instance;
instance = this.choose(exchange, (String) storeCode);
if (instance == null) {
throw NotFoundException.create(true, "Unable to find instance for " + url.getHost());
} else {
URI uri = exchange.getRequest().getURI();
String path = uri.getPath();
if (StringUtils.isNotEmpty(replacePre) && isBackend) {
String newPath = path.replaceAll(replacePre, replaceFor);
ServerHttpRequest newRequest = exchange.getRequest().mutate()
.path(newPath)
.build();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, newRequest.getURI());
uri = newRequest.getURI();
}
String overrideScheme = instance.isSecure() ? "https" : "http";
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
URI requestUrl = this.loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
}
exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}
} else {
return chain.filter(exchange);
}
}
protected ServiceInstance choose(ServerWebExchange exchange, String storeCode) {
if (this.loadBalancer instanceof RibbonLoadBalancerClient) {
RibbonLoadBalancerClient client = (RibbonLoadBalancerClient) this.loadBalancer;
String serviceId = ((URI) exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR)).getHost();
//这里使用storeCode做为选择服务实例的key
return client.choose(serviceId, storeCode);
}
return super.choose(exchange);
}
根据请求的信息(cookie 或者请求头)来选择服务。在选择服务的时候会去寻找上面写的 Rule 类,通过自定义的 key 值来执行 Rule 中的选择逻辑。
配置文件
这里使用手动配置服务来举例,可以用注册中心代替,原理一样。
# NOTE(review): the original listing had lost all indentation; the nesting below is
# reconstructed from the key names. The placement of the first `favicon` block directly
# under `spring` is an assumption -- confirm against the real configuration.
spring:
  application:
    name: ai-forecast-gateway
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true
      routes:
        # Headquarters-side verification service
        - id: ai-XXX-1
          predicates:
            - Path=/ai-XXX-1-service/**
          uri: lb://ai-XXX-1-service
          filters:
            - AiWeb
        # Headquarters-side business service
        - id: ai-XXX-2
          predicates:
            - Path=/ai-XXX-2/**
          uri: lb://ai-XXX-2-service
          filters:
            - AiWeb
        - id: ai-XXX-3
          predicates:
            - Path=/ai-XXX-3-service/rpc/**
            # ReadBodyPredicateFactory predicate: reads the body into the exchange cache
            - name: ReadBodyPredicateFactory
              args:
                inClass: '#{T(String)}'
                # Injects the bean implementing the Predicate interface
                predicate: '#{@bodyPredicate}'
          uri: lb://ai-XXX-3
          filters:
            # Custom filter factory that reads the request body content
            - ReadBody=true
            - ThirdFilter
            # Gateway's built-in Redis-based rate limiting
            - name: RequestRateLimiter
              args:
                # Number of requests per second a user is allowed
                redis-rate-limiter.replenishRate: 1
                # Token bucket capacity: maximum requests allowed within one second
                redis-rate-limiter.burstCapacity: 1
                # SpEL expression resolving the key-resolver bean
                key-resolver: "#{@ipKeyResolver}"
  favicon:
    enabled: false
  mvc:
    favicon:
      enabled: false
# NOTE(review): the Ribbon client names below (ai-XXX-1, ai-XXX-2) differ from the hosts of
# the lb:// route URIs above (ai-XXX-1-service, ai-XXX-2-service) -- verify they match in
# the real configuration.
ai-XXX-1:
  ribbon:
    # Server list
    listOfServers: http://10.12.167.89:9111/web/v1/,10.12.167.174:10002/
    NFLoadBalancerRuleClassName: XX.filter.LocalBalance
    # 10s
    ConnectTimeout: 10000
    # 10min
    ReadTimeout: 600000
    # Maximum total connections
    MaxTotalHttpConnections: 500
    # Maximum connections per instance
    MaxConnectionsPerHost: 300
ai-XXX-2:
  ribbon:
    # Server list
    listOfServers: http://10.218.223.249:9089/
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RoundRobinRule
    # 10s
    ConnectTimeout: 10000
    # 10min
    ReadTimeout: 600000
    # Maximum total connections
    MaxTotalHttpConnections: 500
    # Maximum connections per instance
    MaxConnectionsPerHost: 300
最后
我们需要通过 @RibbonClients 来配置 IRule 才能生效,不可以直接在自定义 IRule 上加注解 @Component,否则会出现服务错乱的问题(如果配置文件中只有单个路由配置,则不会出现该问题)。
/**
 * Declares the gateway's key resolvers, the ReadBody filter factory, the body predicate and
 * the custom Ribbon rule.
 *
 * <p>NOTE(review): {@code defaultConfiguration} receives the rule class itself rather than a
 * dedicated configuration class -- confirm this is the intended registration.
 */
@RibbonClients(defaultConfiguration = LocalBalance.class)
public class BalanceConfig {

    /**
     * Rate-limiter key resolver keyed by the client's IP address.
     *
     * <p>Uses the literal address instead of {@code getHostName()}, which may perform a
     * reverse DNS lookup. When the request exposes no remote address the resolver completes
     * empty instead of throwing.
     */
    @Primary
    @Bean(value = "ipKeyResolver")
    KeyResolver ipKeyResolver() {
        return exchange -> {
            java.net.InetSocketAddress remote = exchange.getRequest().getRemoteAddress();
            if (remote == null) {
                return Mono.empty();
            }
            java.net.InetAddress address = remote.getAddress();
            // An unresolved socket address has no InetAddress; fall back to its host string.
            return Mono.just(address != null ? address.getHostAddress() : remote.getHostString());
        };
    }

    /** Rate-limiter key resolver keyed by the request path. */
    @Bean(value = "apiKeyResolver")
    KeyResolver apiKeyResolver() {
        return exchange -> Mono.just(exchange.getRequest().getPath().value());
    }

    /**
     * Registers the ReadBody filter factory.
     *
     * @return a new {@link ReadBodyGatewayFilterFactory}
     */
    @Bean
    public ReadBodyGatewayFilterFactory readBodyGatewayFilterFactory() {
        return new ReadBodyGatewayFilterFactory();
    }

    /**
     * Predicate for the read-body route predicate; can be referenced from the YAML config.
     *
     * @return a predicate that accepts every input
     */
    @Bean
    public Predicate<Object> bodyPredicate() {
        return o -> true;
    }

    /** Exposes the custom {@link LocalBalance} rule as an {@link IRule} bean. */
    @Bean
    public IRule localBalance() {
        return new LocalBalance();
    }
}
ReadBody 过滤器:用于获取请求体的过滤器。使用时,直接调用
exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY) 即可。
/**
 * Gateway filter factory whose filter logs the object stored under the
 * {@code cachedRequestBodyObject} exchange attribute and then continues the chain.
 */
@Slf4j
public class ReadBodyGatewayFilterFactory
        extends AbstractGatewayFilterFactory<ReadBodyGatewayFilterFactory.Config> {

    private static final String CACHE_REQUEST_BODY_OBJECT_KEY = "cachedRequestBodyObject";

    public ReadBodyGatewayFilterFactory() {
        super(Config.class);
    }

    /**
     * Builds the filter; {@code config} is not consulted.
     *
     * @param config shortcut configuration of the filter
     * @return a filter that logs the cached body attribute and delegates to the chain
     */
    @Override
    public GatewayFilter apply(Config config) {
        // The ReadBodyPredicateFactory predicate stores the body in this exchange attribute.
        return (webExchange, filterChain) -> {
            Object cachedBody = webExchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);
            log.info("request body is:{}", cachedBody);
            return filterChain.filter(webExchange);
        };
    }

    /** Maps the single shortcut argument onto {@link Config#setWithParams(boolean)}. */
    @Override
    public List<String> shortcutFieldOrder() {
        List<String> fieldOrder = Arrays.asList("withParams");
        return fieldOrder;
    }

    /** Shortcut configuration holder. */
    public static class Config {

        /** Receives the configured shortcut value. */
        private boolean withParams;

        public boolean isWithParams() {
            return this.withParams;
        }

        public void setWithParams(boolean withParams) {
            this.withParams = withParams;
        }
    }
}
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END