|
大家好,我是不才陳某~ 最近知識(shí)星球的球友在學(xué)習(xí)星球中的《精盡Spring Cloud Alibaba》專欄提到一個(gè)問(wèn)題,相信也有很多人在線上環(huán)境遇到過(guò),或許也因此被批過(guò):一個(gè)集群中有某個(gè)服務(wù)突然下線,但是網(wǎng)關(guān)還是會(huì)去請(qǐng)求這個(gè)實(shí)例,所以線上就報(bào)錯(cuò)了,報(bào)錯(cuò)信息如下圖:  究其原因到底為何呢?有沒(méi)有一種靠譜的解決方案呢?別著急,往下看 產(chǎn)生原因Gateway中有個(gè)緩存 CachingRouteLocator ,而網(wǎng)關(guān)服務(wù)使用的是lb模式,服務(wù)在上線或者下線之后,未能及時(shí)刷新這個(gè)緩存,相應(yīng)的源碼如下: public class CachingRouteLocator implements Ordered, RouteLocator, ApplicationListener<RefreshRoutesEvent>, ApplicationEventPublisherAware {
private static final Log log = LogFactory.getLog(CachingRouteLocator.class);
private static final String CACHE_KEY = "routes";
private final RouteLocator delegate;
private final Flux<Route> routes;
private final Map<String, List> cache = new ConcurrentHashMap<>();
private ApplicationEventPublisher applicationEventPublisher;
public CachingRouteLocator(RouteLocator delegate) { this.delegate = delegate; routes = CacheFlux.lookup(cache, CACHE_KEY, Route.class) .onCacheMissResume(this::fetch); }
private Flux<Route> fetch() { return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE); }
@Override public Flux<Route> getRoutes() { return this.routes; }
/** * Clears the routes cache. * @return routes flux */ public Flux<Route> refresh() { this.cache.clear(); return this.routes; }
@Override public void onApplicationEvent(RefreshRoutesEvent event) { try { fetch().collect(Collectors.toList()).subscribe(list -> Flux.fromIterable(list) .materialize().collect(Collectors.toList()).subscribe(signals -> { applicationEventPublisher .publishEvent(new RefreshRoutesResultEvent(this)); cache.put(CACHE_KEY, signals); }, throwable -> handleRefreshError(throwable))); } catch (Throwable e) { handleRefreshError(e); } }
private void handleRefreshError(Throwable throwable) { if (log.isErrorEnabled()) { log.error("Refresh routes error !!!", throwable); } applicationEventPublisher .publishEvent(new RefreshRoutesResultEvent(this, throwable)); }
@Deprecated /* for testing */ void handleRefresh() { refresh(); }
@Override public int getOrder() { return 0; }
@Override public void setApplicationEventPublisher( ApplicationEventPublisher applicationEventPublisher) { this.applicationEventPublisher = applicationEventPublisher; } }
那么解決方案就自然能夠想出來(lái),只需要在服務(wù)下線時(shí)能夠去實(shí)時(shí)的刷新這個(gè)緩存自然就解決了 解決方案這里通過(guò)去監(jiān)聽(tīng) Nacos 實(shí)例刷新事件,一旦出現(xiàn)實(shí)例發(fā)生變化馬上刪除緩存。在刪除負(fù)載均衡緩存后,Spring Cloud Gateway 在處理請(qǐng)求時(shí)發(fā)現(xiàn)沒(méi)有緩存會(huì)重新拉取一遍服務(wù)列表,這樣之后都是用的是最新的服務(wù)列表了,也就達(dá)到了我們動(dòng)態(tài)感知上下線的目的。 代碼如下: @Component @Slf4j public class NacosInstancesChangeEventListener extends Subscriber<InstancesChangeEvent> { @Resource private CacheManager defaultLoadBalancerCacheManager;
@Override public void onEvent(InstancesChangeEvent event) { log.info("Spring Gateway 接收實(shí)例刷新事件:{}, 開(kāi)始刷新緩存", JacksonUtils.toJson(event)); Cache cache = defaultLoadBalancerCacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME); if (cache != null) { cache.evict(event.getServiceName()); } log.info("Spring Gateway 實(shí)例刷新完成"); }
@Override public Class<? extends com.alibaba.nacos.common.notify.Event> subscribeType() { return InstancesChangeEvent.class; } }
這里通過(guò)繼承的方式監(jiān)聽(tīng) Nacos 的 InstancesChangeEvent,在 onEvent 接收到實(shí)例刷新的信息后直接刪除對(duì)應(yīng)服務(wù)的負(fù)載均衡緩存,緩存的名字是定義在 Spring Gateway 的相關(guān)代碼中的,直接引入即可,Cache 則是繼承自 Spring Cache 接口,負(fù)載均衡緩存也繼承了 Cache 接口,有了 Cache 接口就可以直接使用其接口定義的 evict 方法即可,而緩存的 key 名就則就是服務(wù)名,在 InstancesChangeEvent 中,通過(guò) getServiceName 就可以得到服務(wù)名。 這里就不演示了,有興趣的小伙伴可以測(cè)試一下 最后說(shuō)一句(別白嫖,求關(guān)注)陳某每一篇文章都是精心輸出,如果這篇文章對(duì)你有所幫助,或者有所啟發(fā)的話,幫忙點(diǎn)贊、在看、轉(zhuǎn)發(fā)、收藏,你的支持就是我堅(jiān)持下去的最大動(dòng)力!
|