欢迎访问Spring Cloud中国社区

《重新定义Spring Cloud实战》由Spring Cloud中国社区倾力打造,基于Spring Cloud的Finchley.RELEASE版本,本书内容宽度足够广、深度足够深,而且立足于生产实践,直接从生产实践出发,包含大量生产实践的配置。欢迎加微信Software_King进群答疑,国内谁在使用Spring Cloud?欢迎登记

Zuul2源码分析

yannxia · 4月前 · 2055 ·

Zuul2

Zuul2的难产,终于在 2018.4.13 上架了中心仓库,也代表着Zuul正式加入Netty全家桶的怀抱,关于 Zuul2 有一篇宏观性的博文有兴趣的可以阅读 Zuul 2 : The Netflix Journey to Asynchronous, Non-Blocking Systems 此篇博客。

简而言之,Zuul2也就是从传统的 BIO 切换到了 NIO 模式。
bio-thread
传统的BIO模型基于Thread的方式

nio-thread
NIO模型基于Reactor模型

Zuul2 架构

从上帝视角来开,Zuul2是一个在 Netty 上运行一系列Filter的服务,执行完成PreFilter (inbound filters)之后将请求通过 Netty Client 转发出去,然后将请求的结果通过一系列PostFilter (outbound filters) 返回,如下图所示。
Architectural-over

正如之前的 ZuulFilter 分为了 Pre,Post,Route,Error,Zuul2的Filter分为三种类型

  • Inbound Filters: 在路由之前执行
  • Endpoint Filters: 路由操作
  • Outbound Filters: 得到相应数据之后执行

我们用官方的Demo进行分析 zuul-sample,诸位看官自行下载导入。

ServerStartup

在Demo的启动中,我们发现启动的入口

  1. {
  2. ConfigurationManager.loadCascadedPropertiesFromResources("application");
  3. Injector injector = InjectorBuilder.fromModule(new ZuulSampleModule()).createInjector();
  4. BaseServerStartup serverStartup = injector.getInstance(BaseServerStartup.class);
  5. server = serverStartup.server();
  6. long startupDuration = System.currentTimeMillis() - startTime;
  7. System.out.println("Zuul Sample: finished startup. Duration = " + startupDuration + " ms");
  8. server.start(true);
  9. }

➊ 获得系统的一些配置参数
➋ Demo中使用的是Google的Guice进行依赖注入的,这个就展开了有兴趣的可以自行去搜索
➌ 启动一个Zuul2服务

我们从这个 start() 作为我们的突破口,

  1. public void start(boolean sync){
  2. serverGroup = new ServerGroup("Salamander", eventLoopConfig.acceptorCount(), eventLoopConfig.eventLoopCount(), eventLoopGroupMetrics);
  3. serverGroup.initializeTransport();
  4. try {
  5. List<ChannelFuture> allBindFutures = new ArrayList<>();
  6. // Setup each of the channel initializers on requested ports.
  7. for (Map.Entry<Integer, ChannelInitializer> entry : portsToChannelInitializers.entrySet())
  8. {
  9. allBindFutures.add(setupServerBootstrap(entry.getKey(), entry.getValue()));
  10. }
  11. // Once all server bootstraps are successfully initialized, then bind to each port.
  12. }
  13. catch (InterruptedException e) {
  14. }
  15. }

➊ 构建一个新的 ServerGroup
setupServerBootstrap() 初始化一个 ServerBootstrap,根据之前Netty的分析,我们知道Netty需要使用 ServerBootstrap 进行 端口绑定,那这里是不是就是那个东西。
我们继续深入

  1. private ChannelFuture setupServerBootstrap(int port, ChannelInitializer channelInitializer)
  2. throws InterruptedException{
  3. ServerBootstrap serverBootstrap = new ServerBootstrap().group(
  4. serverGroup.clientToProxyBossPool,
  5. serverGroup.clientToProxyWorkerPool);
  6. serverBootstrap.childHandler(channelInitializer);
  7. serverBootstrap.validate();
  8. LOG.info("Binding to port: " + port);
  9. // Flag status as UP just before binding to the port.
  10. serverStatusManager.localStatus(InstanceInfo.InstanceStatus.UP);
  11. // Bind and start to accept incoming connections.
  12. return serverBootstrap.bind(port).sync();
  13. }

在 ➊ 构建了一个 ServerBootstrap 这个正是Netty的启动类
➋ 正如Netty的启动中的处理数据的 Handler 那这里应该也就是Zuul处理的核心所在
➌ 一个我们的老朋友,和Eureka集成时改变服务器状态
➍ 绑定 ServerNioSockertChannel 不再多做分析

1/3 休息

我们已经发现了Zuul2是如何启动一个Netty服务的,我们解决了图中红框部分的原理,那我们接下来去了解最为重要的这些Filter是如何工作的,我们上启动中已经发现一个很重要的对象 channelInitializer 我们知道在Netty中,是将一系列的 Handler 聚合在一起并使用 Pipeline 执行(参考Netty源码分析-(3)-ChannelPipeline)我们可以猜测 zuul 的做法是类型,我们从这个 channelInitializer 入手去研究。

ZuulServerChannelInitializer

我们轻而易举的可以发现 ChannelInitializer 其实是 ZuulServerChannelInitializer 对象。在initChannel中我们发现了

  1. @Override
  2. protected void initChannel(Channel ch) throws Exception{
  3. ChannelPipeline pipeline = ch.pipeline();
  4. storeChannel(ch);
  5. addTimeoutHandlers(pipeline);
  6. addPassportHandler(pipeline);
  7. addTcpRelatedHandlers(pipeline);
  8. addHttp1Handlers(pipeline);
  9. addHttpRelatedHandlers(pipeline);
  10. addZuulHandlers(pipeline);
  11. }

在➊前面的都比较简单都是一些标准的 Handler 大家可以自己阅读,最为重要是 addZuulHandlers(pipeline); 这个函数,我们继续深入。

  1. protected void addZuulHandlers(final ChannelPipeline pipeline){
  2. pipeline.addLast("logger", nettyLogger);
  3. pipeline.addLast(new ClientRequestReceiver(sessionContextDecorator));
  4. pipeline.addLast(passportLoggingHandler);
  5. addZuulFilterChainHandler(pipeline);
  6. pipeline.addLast(new ClientResponseWriter(requestCompleteHandler, registry));
  7. }

上面的都很容易看出来,是日志,Session之类的Handler,最为重要的是 ➋ 处增加 ZuulFilter。

  1. protected void addZuulFilterChainHandler(final ChannelPipeline pipeline) {
  2. final ZuulFilter<HttpResponseMessage, HttpResponseMessage>[] responseFilters = getFilters(
  3. new OutboundPassportStampingFilter(FILTERS_OUTBOUND_START),
  4. new OutboundPassportStampingFilter(FILTERS_OUTBOUND_END));
  5. // response filter chain
  6. final ZuulFilterChainRunner<HttpResponseMessage> responseFilterChain = getFilterChainRunner(responseFilters,
  7. filterUsageNotifier);
  8. // endpoint | response filter chain
  9. final FilterRunner<HttpRequestMessage, HttpResponseMessage> endPoint = getEndpointRunner(responseFilterChain,
  10. filterUsageNotifier, filterLoader);
  11. final ZuulFilter<HttpRequestMessage, HttpRequestMessage>[] requestFilters = getFilters(
  12. new InboundPassportStampingFilter(FILTERS_INBOUND_START),
  13. new InboundPassportStampingFilter(FILTERS_INBOUND_END));
  14. // request filter chain | end point | response filter chain
  15. final ZuulFilterChainRunner<HttpRequestMessage> requestFilterChain = getFilterChainRunner(requestFilters,
  16. filterUsageNotifier, endPoint);
  17. pipeline.addLast(new ZuulFilterChainHandler(requestFilterChain, responseFilterChain));
  18. }

从 ➊ 深入可以看到

  1. public <T extends ZuulMessage> ZuulFilter<T, T> [] getFilters(final ZuulFilter start, final ZuulFilter stop) {
  2. final List<ZuulFilter> zuulFilters = filterLoader.getFiltersByType(start.filterType());
  3. final ZuulFilter[] filters = new ZuulFilter[zuulFilters.size() + 2];
  4. filters[0] = start;
  5. for (int i=1, j=0; i < filters.length && j < zuulFilters.size(); i++,j++) {
  6. filters[i] = zuulFilters.get(j);
  7. }
  8. filters[filters.length -1] = stop;
  9. return filters;
  10. }

这里返回了一个 ZuulFilter 的数组,开始分别是 startstop 对应的刚好是 OutboundPassportStampingFilter

然我们继续回到 addZuulFilterChainHandler() 函数上来,我们发现有三段相似的代码正好对应着获得了 InBound
OutBond EndPoint 这三种Filter,在代码我们可以看出顺序是

  1. requestFiltersendPointFilters 合并成 requestFilterChain
  2. responseFilters 构建成 responseFilterChain
  3. requestFilterChainresponseFilterChain 组合成 ZuulFilterChainHandler
  4. ZuulFilterChainHandler 添加至 pipeline

那这里我们还有一个疑问,这些Filter是从何而来的?这个答案隐藏在
com.netflix.zuul.FilterLoader#getFiltersByType 中,通过简单的跟踪我们可以得到

  1. public Collection<ZuulFilter> getAllFilters() {
  2. return this.filters.values();
  3. }

在这里 ➊ 获得所有的Fiter,而这里的Filter看起来是通过 Put进来的,通过一个简单的断点,我们就可以发现

  1. //com.netflix.zuul.FilterFileManager#init
  2. @PostConstruct
  3. public void init() throws Exception{
  4. filterLoader.putFiltersForClasses(config.getClassNames());
  5. manageFiles();
  6. startPoller();
  7. }

➊ 我们通过类的全称限定类名获得的这个Fitler,这个配置是在我们的配置文件中配置的。

2/3 休息

文至中场,我们已经明白了Zuul2如何将自己的 ZuulFilter 变换成 Netty Handler 并添加到 Netty Pipeline 之中的,那我们还剩下一个问题,这个 ZuulFilter 是如何运作的。但是我们在上段中,我们已经发现了最后是一个 ZuulFilterChainHandler 通过名称我们可以推测出,这是一个 Chain 链,我们继续往下探索吧。

ZuulFilterChainHandler

我们知道,最终注册到 Netty Pipeline 上的最终肯定是 Handler, 我们只需要从 Netty 的 channelRead() 函数作为突破口去阅读。

  1. @Override
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  3. if (msg instanceof HttpRequestMessage) {
  4. zuulRequest = (HttpRequestMessage)msg;
  5. //Replace NETTY_SERVER_CHANNEL_HANDLER_CONTEXT in SessionContext
  6. final SessionContext zuulCtx = zuulRequest.getContext();
  7. zuulCtx.put(NETTY_SERVER_CHANNEL_HANDLER_CONTEXT, ctx);
  8. zuulCtx.put(ZUUL_FILTER_CHAIN, requestFilterChain);
  9. requestFilterChain.filter(zuulRequest);
  10. }
  11. else if ((msg instanceof HttpContent)&&(zuulRequest != null)) {
  12. requestFilterChain.filter(zuulRequest, (HttpContent) msg);
  13. }
  14. else {
  15. LOG.debug("Received unrecognized message type. " + msg.getClass().getName());
  16. ReferenceCountUtil.release(msg);
  17. }
  18. }

➊ 这段逻辑处理 已经被转化为 HttpRequestMessage 类型的消息
➋ 实际上的 filter 处理逻辑
➌ 处理还没被转化为 HttpRequestMessage 类型的消息
➍ 无法处理抛出异常,释放MSG

而这里的 requestFilterChain 就是之前我们传入进去的 ZuulFilterChainRunner 我们看看这 filter() 函数做了什么?

  1. // com.netflix.zuul.netty.filter.ZuulFilterChainRunner#runFilters
  2. private final void runFilters(final T mesg, final AtomicInteger runningFilterIdx) {
  3. T inMesg = mesg;
  4. String filterName = "-";
  5. try {
  6. Preconditions.checkNotNull(mesg, "Input message");
  7. int i = runningFilterIdx.get();
  8. while (i < filters.length) {
  9. final ZuulFilter<T, T> filter = filters[i];
  10. filterName = filter.filterName();
  11. final T outMesg = filter(filter, inMesg);
  12. if (outMesg == null) {
  13. return; //either async filter or waiting for the message body to be buffered
  14. }
  15. inMesg = outMesg;
  16. i = runningFilterIdx.incrementAndGet();
  17. }
  18. invokeNextStage(inMesg);
  19. }
  20. catch (Exception ex) {
  21. }
  22. }

➊ 获得当前运行的Filter的下标值
➋ 获得对应的 ZuulFilter
➌ 调用 ZuulFilter 进行处理
➍ 将下标志值 +1,继续循环体
➎ 执行下个阶段,这里对应着我们自己再构建 new InboundPassportStampingFilter(FILTERS_INBOUND_END)

通过这段代码,我们知道了 Zuul2 的Chain是由 ChainRunner运行,和Netty的Head tail的链方式大相径庭。
BaseZuulFilterRunner 中的 filter() 函数也相当有趣。

  1. //com.netflix.zuul.netty.filter.BaseZuulFilterRunner#filter
  2. protected final O filter(final ZuulFilter<I, O> filter, final I inMesg) {
  3. filter.incrementConcurrency();
  4. resumer = new FilterChainResumer(inMesg, filter, snapshot, startTime);
  5. filter.applyAsync(inMesg)
  6. .observeOn(Schedulers.from(getChannelHandlerContext(inMesg).executor()))
  7. .doOnUnsubscribe(resumer::decrementConcurrency)
  8. .subscribe(resumer);
  9. return null; //wait for the async filter to finish
  10. }

➊ 临时存储起来这个Filter待异步完成回调
➋ 具体的执行处
➌ 获取当前的所在的 EventExecutor 并在这个线程上观察
➍ 将数据在 resumer 中消费

  1. @Override
  2. public void onNext(O outMesg) {
  3. try {
  4. recordFilterCompletion(SUCCESS, filter, startTime, inMesg, snapshot);
  5. if (outMesg == null) {
  6. outMesg = filter.getDefaultOutput(inMesg);
  7. }
  8. resumeInBindingContext(outMesg, filter.filterName());
  9. }
  10. }

➊ 处获得我们的结果并将其恢复到我们保管的 filter

剩余关于 ReponseFilter 的运行机制是类似的,诸位看官自行分析下。

终场休息

通过上面的一系列分析,我们已经知道的,Zuul的 调用链模型PreFilters的运行机制,整个Zuul2的运行机制在我们的面前一览无遗。整体的Zuul代码是相当的明了的,代码的分层也很好,但是还有一朵乌云在我们的头顶之上,那就是在那张途中的 Netty Client 我们并没有发现其踪迹,但是我们知道只有在 End 阶段,我们才会对外进行访问,在官网的Wiki中,我们也可以获得

Zuul does not use Ribbon for making outgoing connections and instead uses its own connection pool, using a Netty client. Zuul creates a connection pool per host, per event loop. It does this in order to reduce context switching between threads and to ensure sanity for both the inbound event loops and outbound event loops. The result is that the entire request is run on the same thread, regardless of which event loop is running it.

我们从 Wiki 中可以得知,Netty不再默认使用 Ribbon 而是默认使用 Netty 作为一个 Client.

ProxyEndpoint

这个类的对象及其的复杂,我们从filter的核心逻辑 apply看起来。

  1. @Override
  2. public HttpResponseMessage apply(final HttpRequestMessage input) {
  3. try {
  4. origin.getProxyTiming(zuulRequest).start();
  5. origin.onRequestExecutionStart(zuulRequest, 1);
  6. proxyRequestToOrigin();
  7. return null;
  8. } catch (Exception ex) {
  9. }
  10. }

➊ 将请求转发至远端

  1. private void proxyRequestToOrigin() {
  2. Promise<PooledConnection> promise = null;
  3. try {
  4. promise = origin.connectToOrigin(zuulRequest, channelCtx.channel().eventLoop(), attemptNum, passport, chosenServer);
  5. currentRequestAttempt = origin.newRequestAttempt(chosenServer.get(), context, attemptNum);
  6. if (promise.isDone()) {
  7. operationComplete(promise);
  8. } else {
  9. promise.addListener(this);
  10. }
  11. }
  12. catch (Exception ex) {
  13. }
  14. }

➊ 处将请求包装,连接到远端地址,获得 Promise
➋ 结束的 Promise 处理,在 operationComplete() 中包含了成功的执行代码,至于 connectToOrigin Zuul 包装了 Netty的Client。

  1. private void writeClientRequestToOrigin(final PooledConnection conn) {
  2. final Channel ch = conn.getChannel();
  3. ch.write(zuulRequest);
  4. writeBufferedBodyContent(zuulRequest, ch);
  5. ch.flush();
  6. //Get ready to read origin's response
  7. ch.read();
  8. originConn = conn;
  9. channelCtx.read();
  10. }

➊ 获得建立的连接
➋ 写入Zuul的请求,也就是用户的请求
➌ 将消息Flush出去
➍ 在这里读取响应的数据,也就是触发 OutBoundHandler 的处理时间

总结

Zuul整体逻辑,我们通过博文可以分析而出。

  1. ZuulFilter 分为 Inbound, Outbound, EndPoint
  2. Inbound, Outbound, EndPoint 包裹成 ChainRunner
  3. ChainRunner 组合成一个 ZuulFilterChainHandler,而 ZuulFilterChainHandler 是Netty的 一个Handler
  4. ZuulFilterChainHandler 会组装到 Netty 的 Pipeline 中,剩下来就是Netty的流程了

参考知识