欢迎访问Spring Cloud中国社区

我们致力于成为中国最专业的Spring Boot和Spring Cloud交流社区。推荐使用Github直接登录,欢迎加微信号Software_King进入社区微信交流群。若发现网站bug欢迎反馈!

spring-cloud-eureka (二) Client - Server 接口交互(消息发送)源码分析

saleson · 3月前 · 359 ·

上一篇文章中有介绍spring-cloud-eureka的原因,以及一部分源码分析了服务在启动时是如何加载并运行spring-cloud-eureka的,这一篇文章将从源码的角度来分析spring-cloud-eureka是如何进行服务治理的。


服务注册

服务注册的真正入口在com.netflix.discovery.DiscoveryClient#register()

public class DiscoveryClient implements EurekaClient {
    ...
    boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        EurekaHttpResponse<Void> httpResponse;
        try {
            //发送注册信息
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }
    ...
}

从代码中可以看到,eureka注册时,是将实例信息传递给eurekaTransport.registrationClient.register(InstanceInfo)方法,eurekaTransport.registrationClient是一个<font color=red>EurekaHttpClient接口,它的注释中说明这是一个底层的eureka http 通信api,也就是说,服务注册消息、服务续约消息(心跳)、服务下线、服务获取等,都是由这个接口发送的</font>。往上翻看源码,发现eurekaTransport.registrationClient是一个com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient实例,也就是注册信息是由该类发送的。

com.netflix.discovery.DiscoveryClient

public class DiscoveryClient implements EurekaClient {

    ...
    //构造方法
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
    ...
    //创建EurekaTransport,并初始化
    eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);
    ...
    }
    ...
    private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
                                            AbstractDiscoveryClientOptionalArgs args) {
     ...
        if (clientConfig.shouldRegisterWithEureka()) {
            EurekaHttpClientFactory newRegistrationClientFactory = null;
            EurekaHttpClient newRegistrationClient = null;
            try {
                //创建注册客户端工厂类
                newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                        eurekaTransport.bootstrapResolver,
                        eurekaTransport.transportClientFactory,
                        transportConfig
                );
                //创建注册客户端
                newRegistrationClient = newRegistrationClientFactory.newClient();
            } catch (Exception e) {
                logger.warn("Transport initialization failure", e);
            }
            //引用
            eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
            eurekaTransport.registrationClient = newRegistrationClient;
        }
        ...
    }
}

com.netflix.discovery.shared.transport.EurekaHttpClients

public final class EurekaHttpClients {
    ...
    public static EurekaHttpClientFactory registrationClientFactory(ClusterResolver bootstrapResolver,
                                                                    TransportClientFactory transportClientFactory,
                                                                    EurekaTransportConfig transportConfig) {
        return canonicalClientFactory(EurekaClientNames.REGISTRATION, transportConfig, bootstrapResolver, transportClientFactory);
    }

    static EurekaHttpClientFactory canonicalClientFactory(final String name,
                                                          final EurekaTransportConfig transportConfig,
                                                          final ClusterResolver<EurekaEndpoint> clusterResolver,
                                                          final TransportClientFactory transportClientFactory) {
        //一个客户端工厂匿名类
        return new EurekaHttpClientFactory() {
            @Override
            public EurekaHttpClient newClient() {
                //创建eureka http客户端并返回
                return new SessionedEurekaHttpClient(
                        name,
                        /* 新建了一个RetryableEurekaHttpClient工厂 */ RetryableEurekaHttpClient.createFactory(
                                name,
                                transportConfig,
                                clusterResolver,
                                RedirectingEurekaHttpClient.createFactory(transportClientFactory),
                                ServerStatusEvaluators.legacyEvaluator()),
                        transportConfig.getSessionedClientReconnectIntervalSeconds() * 1000
                );
            }

            @Override
            public void shutdown() {
                wrapClosable(clusterResolver).shutdown();
            }
        };
    }
    ...
}

注册客户端(eurekaTransport.registrationClient)的创建过程是这样的

DiscoveryClient.init()

  • scheduleServerEndpointTask()
    • EurekaHttpClients.registrationClientFactory()
      • canonicalClientFactory()
    • EurekaHttpClientFactory.newClient()

1


发送注册消息

你以为最终的注册消息是由SessionedEurekaHttpClient发送的么?我一开始也是这样以为的,最后发现太天真了,看看调用链,看看这其中包装了一大堆EurekaHttpClient实现类。

2

<font color=red>可以发现,最终调用的是com.netflix.discovery.shared.transport.jersey.JerseyApplicationClient对象。</font>

3

上面是EurekaHttpClient的类图结构,最终真正发消息的是由AbstractJerseyEurekaHttpClient发送的,它有两个子类,分别是JerseyApplicationClient和JerseyReplicationClient,JerseyReplicationClient类是用于服务同步的,后面再说,先看JerseyApplicationClient类。

package com.netflix.discovery.shared.transport.jersey;
...
public class JerseyApplicationClient extends AbstractJerseyEurekaHttpClient {

    private final Map<String, String> additionalHeaders;

    public JerseyApplicationClient(Client jerseyClient, String serviceUrl, Map<String, String> additionalHeaders) {
        super(jerseyClient, serviceUrl);
        this.additionalHeaders = additionalHeaders;
    }

    @Override
    protected void addExtraHeaders(Builder webResource) {
        if (additionalHeaders != null) {
            for (String key : additionalHeaders.keySet()) {
                webResource.header(key, additionalHeaders.get(key));
            }
        }
    }
}

在JerseyReplicationClient 类中并没有发送消息的逻辑,那么就是在AbstractJerseyEurekaHttpClient类中了,查看该类的源码,可以看到相关的方法。

package com.netflix.discovery.shared.transport.jersey;
...
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
    ...
    //发送注册消息
    @Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
    ...
}

AbstractJerseyEurekaHttpClient 类中方法对应的功能

method description
register(InstanceInfo) 服务注册
cancel(String, String) 服务下线
sendHeartBeat(String, String, InstanceInfo, InstanceStatus) 服务续约
statusUpdate(String, String, InstanceStatus, InstanceInfo) 服务状态更新

假设服务实例的eureka配置是这样的

spring:
  application:
    name: eureka-client
server:
  port: 10101
eureka:
  client:
    register-with-eureka: true
    fetch-registry: true
    serviceUrl:
      defaultZone: http://localhost:10002/eureka/

那么请求的路径就是http://localhost:10002/eureka/apps/EUREKA-CLIENT,并以post的方法将InstanceInfo对象提交过去,然后将注册中心响应的状态码和head信息包装起来返回。


注册中心接收注册消息

Eureka Server 通过jersey发布了一些web接口,资源类在eureka-core-{version}.jar!com.netflix.eureka.resources包下,服务注册的代码就在com.netflix.eureka.resources.ApplicationResource中。

com.netflix.eureka.resources.ApplicationResource

package com.netflix.eureka.resources;
...
@Produces({"application/xml", "application/json"})
public class ApplicationResource {
    ...
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        ...

        //注册中心服务登记的入口,isReplication判断是否是其它Eureka Server广播过来的。
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }
    ...
}

这些接口包括:
| url | method | class | description |
| —— | —— | —— | ——|
| /apps/{appId} | POST | ApplicationResource | 服务注册 |
| /apps/{appId} | GET | ApplicationResource | 获取服务信息 |
| /instances/{id} | GET | InstancesResource | 获取服务实例信息 |
| /apps/{appId}/{id} | GET | InstanceResource | 获取服务实例信息 |
| /apps/{appId}/{id} | PUT | InstanceResource | 服务实例续约 |
| /apps/{appId}/{id} | DELETE | InstanceResource | 服务实例下线 |
| /apps/{appId}/{id}/status | PUT | InstanceResource | 服务实例状态变更 |
| … | … | … | …|

上一章spring-cloud-eureka (一) 原理分析 主要分析服务治理的功能以及Eureka Client和Eureka Server在启动时做的处理,这一章分析了Eureka Client与 Eureka Server接口交互的逻辑,还有会下一章分析注册中的服务注册、续约、下线、以及服务剔除的逻辑。

下面几张图描述了服务注册、服务续约、服务下线的调用链

服务注册

4

服务续约

5

服务下线

6

参考资料
《spring cloud 微服务实战》
深度剖析服务发现组件Netflix Eureka