亭亭五月天在线观看,亭亭五月天在线观看,国产最新av一区二区,国产 高清 中文字幕,99re热久久亚洲综合精品成人,熟妇 一区二区三区,一级做a爰片性色毛片武则天,美女的骚穴视频播放,国产美女午夜免费视频

首頁>國內(nèi) > 正文

深入理解AP架構(gòu)Nacos注冊原理

2023-01-17 12:07:08來源:得物技術(shù)

1、Nacos簡介

Nacos是一款阿里巴巴開源用于管理分布式微服務(wù)的中間件,能夠幫助開發(fā)人員快速實現(xiàn)動態(tài)服務(wù)發(fā)現(xiàn)、服務(wù)配置、服務(wù)元數(shù)據(jù)及流量管理等。這篇文章主要剖析一下Nacos作為注冊中心時其服務(wù)注冊與發(fā)現(xiàn)原理。


【資料圖】

2、為什么會需要Nacos

Nacos作為注冊中心是為了更好更方便的管理應(yīng)用中的每一個服務(wù),是各個分布式節(jié)點之間的紐帶。其作為注冊中心主要提供以下核心功能:

服務(wù)注冊與發(fā)現(xiàn):動態(tài)的增減服務(wù)節(jié)點,服務(wù)節(jié)點增減后動態(tài)的通知服務(wù)消費者,不需要由消費者來更新配置。服務(wù)配置:動態(tài)修改服務(wù)配置,并將其推送到服務(wù)提供者和服務(wù)消費者而不需要重啟服務(wù)。健康檢查和服務(wù)摘除:主動的檢查服務(wù)健康情況,對于宕機的服務(wù)將其摘除服務(wù)列表。3、分布式架構(gòu)CAP理論

CAP定理是分布式系統(tǒng)中最基礎(chǔ)的原則,所以理解和掌握了CAP對系統(tǒng)架構(gòu)的設(shè)計至關(guān)重要。分布式架構(gòu)下所有系統(tǒng)不可能同時滿足以下三點:Consisteny(一致性)、Availability(可用性)、Partition tolerance(分區(qū)容錯性),CAP指明了任何分布式系統(tǒng)只能同時滿足這三項中的兩項。

分布式系統(tǒng)肯定都要保證其容錯性 ,那么可用性和一致性就只能選一個了。簡單來說分布式系統(tǒng)的CAP理論就像你想買個新手機,這個手機不可能功能強大、便宜、又好看的,它最多只能滿足兩點的,要么功能強大便宜、要么功能強大好看、要么便宜好看,不可能同時滿足三點。

4、幾種注冊中心的區(qū)別

注冊中心在分布式應(yīng)用中是經(jīng)常用到的,也是必不可少的,那注冊中心,又分為以下幾種:Eureka、Zookeeper、Nacos等。這些注冊中心最大的區(qū)別就是其基于AP架構(gòu)還是CP架構(gòu),簡單介紹一下:

Zookeeper:用過或者了解過zk做注冊中心的同學(xué)都知道,Zookeeper集群下一旦leader節(jié)點宕機了,在短時間內(nèi)服務(wù)都不可通訊,因為它們在一定時間內(nèi)follower進行選舉來推出新的leader,因為在這段時間內(nèi),所有的服務(wù)通信將受到影響,而且leader選取時間比較長,需要花費幾十秒甚至上百秒的時間,因此:可以理解為 Zookeeper是實現(xiàn)的CP,也就是將失去A(可用性)。Eureka:Eureka集群下每個節(jié)點之間都會定時發(fā)送心跳,定時同步數(shù)據(jù),沒有master/slave之分,是一個完全去中心化的架構(gòu)。因此每個注冊到Eureka下的實例都會定時同步ip,服務(wù)之間的調(diào)用也是根據(jù)Eureka拿到的緩存服務(wù)數(shù)據(jù)進行調(diào)用。若一臺Eureka服務(wù)宕機,其他Eureka在一定時間內(nèi)未感知到這臺Eureka服務(wù)宕機,各個服務(wù)之間還是可以正常調(diào)用。Eureka的集群中,只要有一臺Eureka還在,就能保證注冊服務(wù)可用(保證可用性),只不過查到的信息可能不是最新的(不保證強一致性)。當(dāng)數(shù)據(jù)出現(xiàn)不一致時,雖然A, B上的注冊信息不完全相同,但每個Eureka節(jié)點依然能夠正常對外提供服務(wù),這會出現(xiàn)查詢服務(wù)信息時如果請求A查不到,但請求B就能查到。如此保證了可用性但犧牲了一致性。Nacos:同時支持CP和AP架構(gòu),根據(jù)根據(jù)服務(wù)注冊選擇臨時和永久來決定走AP模式還是CP模式。如果注冊Nacos的client節(jié)點注冊時ephemeral=true,那么Nacos集群對這個client節(jié)點的效果就是AP,采用distro協(xié)議實現(xiàn);而注冊Nacos的client節(jié)點注冊時ephemeral=false,那么Nacos集群對這個節(jié)點的效果就是CP的,采用raft協(xié)議實現(xiàn)。

本篇文章主要是深入研究一下Nacos基于AP架構(gòu)微服務(wù)注冊原理,由于篇幅有限基于CP架構(gòu)的Nacos微服務(wù)注冊下次再跟你們分析。

5、Nacos服務(wù)注冊與發(fā)現(xiàn)的原理

1.微服務(wù)在啟動將自己的服務(wù)注冊到Nacos注冊中心,同時發(fā)布http接口供其他系統(tǒng)調(diào)用,一般都是基于SpringMVC。

2.服務(wù)消費者基于Feign調(diào)用服務(wù)提供者對外發(fā)布的接口,先對調(diào)用的本地接口加上注解@FeignClient,F(xiàn)eign會針對加了該注解的接口生成動態(tài)代理,服務(wù)消費者針對Feign生成的動態(tài)代理去調(diào)用方法時,會在底層生成Http協(xié)議格式的請求,類似 /stock/deduct? productId=100。

3.Feign最終會調(diào)用Ribbon從本地的Nacos注冊表的緩存里根據(jù)服務(wù)名取出服務(wù)提供在機器的列表,然后進行負載均衡并選擇一臺機器出來,對選出來的機器IP和端口拼接之前生成的url請求,生成調(diào)用的Http接口地址。

6、Nacos核心功能點

服務(wù)注冊:Nacos Client會通過發(fā)送REST請求的方式向Nacos Server注冊自己的服務(wù),提供自身的元數(shù)據(jù),比如ip地址、端口等信息。Nacos Server接收到注冊請求后,就會把這些元數(shù)據(jù)信息存儲在一個雙層的內(nèi)存Map中。

服務(wù)心跳:在服務(wù)注冊后,Nacos Client會維護一個定時心跳來持續(xù)通知Nacos Server,說明服務(wù)一直處于可用狀態(tài),防止被剔除。默認5s發(fā)送一次心跳。

服務(wù)健康檢查:Nacos Server會開啟一個定時任務(wù)用來檢查注冊服務(wù)實例的健康情況,對于超過15s沒有收到客戶端心跳的實例會將它 的healthy屬性置為false(客戶端服務(wù)發(fā)現(xiàn)時不會發(fā)現(xiàn)),如果某個實例超過30秒沒有收到心跳,直接剔除該實例(被剔除的實例如果恢復(fù) 發(fā)送心跳則會重新注冊)

服務(wù)發(fā)現(xiàn):服務(wù)消費者(Nacos Client)在調(diào)用服務(wù)提供者的服務(wù)時,會發(fā)送一個REST請求給Nacos Server,獲取上面注冊的服務(wù)清 單,并且緩存在Nacos Client本地,同時會在Nacos Client本地開啟一個定時任務(wù)定時拉取服務(wù)端最新的注冊表信息更新到本地緩存

服務(wù)同步:Nacos Server集群之間會互相同步服務(wù)實例,用來保證服務(wù)信息的一致性。

7、Nacos源碼分析

看Nacos源碼的不難發(fā)現(xiàn),Nacos實際上就是一個基于Spring Boot的web應(yīng)用,不管是服務(wù)注冊還是發(fā)送心跳都是通過給Nacos服務(wù)端發(fā)送http請求實現(xiàn)的。下載并編譯Nacos源碼就不過多贅述了,首先需要搭建一個微服務(wù)作為Nacos的客戶端。

7.1 Nacos客戶端注冊

Nacos客戶端也是個Spring Boot項目,當(dāng)客戶端服務(wù)啟動時Spring Boot項目啟動時自動加載spring-cloud-starter-alibaba-nacos-discovery包的META-INF/spring.factories中包含自動裝配的配置信息,并將文件中的類加載成bean放入Spring容器中,我們可以先看一下spring.factories文件:

org.springframework.boot.autoconfigure.EnableAutoCnotallow=\  com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\  com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\  com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\  com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\  com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\  com.alibaba.cloud.nacos.NacosServiceAutoConfigurationorg.springframework.cloud.bootstrap.BootstrapCnotallow=\  com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

找到Nacos注冊中心的自動配置類:NacosServiceRegistryAutoConfiguration。

NacosServiceRegistryAutoConfiguration這個類是Nacos客戶端啟動時的一個入口類,代碼如下:

@Configuration(    proxyBeanMethods = false)@EnableConfigurationProperties@ConditionalOnNacosDiscoveryEnabled@ConditionalOnProperty(    value = {"spring.cloud.service-registry.auto-registration.enabled"},    matchIfMissing = true)@AutoConfigureAfter({AutoServiceRegistrationConfiguration.class,                     AutoServiceRegistrationAutoConfiguration.class,                      NacosDiscoveryAutoConfiguration.class})public class NacosServiceRegistryAutoConfiguration {    public NacosServiceRegistryAutoConfiguration() {    }    @Bean    public NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {        return new NacosServiceRegistry(nacosDiscoveryProperties);    }    @Bean    @ConditionalOnBean({AutoServiceRegistrationProperties.class})    public NacosRegistration nacosRegistration(ObjectProvider> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {        return new NacosRegistration((List)registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);    }    @Bean    @ConditionalOnBean({AutoServiceRegistrationProperties.class})    public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {        return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);    }}

看NacosServiceRegistryAutoConfiguration配置類有3個@Bean注解。

nacosServiceRegistry()方法: 定義了NacosServiceRegistry的bean,并且為其屬性nacosDiscoveryProperties賦值,即將從配置文件中讀取到的配置信息賦值進去待用;nacosRegistration()方法主要就是定義了NacosRegistration的bean,后面會用到這個bean;nacosAutoServiceRegistration:該方法比較核心它的參數(shù)中有2個就是前面定義的兩個bean,其實就是為了這個方法服務(wù)的,由NacosAutoServiceRegistration類的構(gòu)造器傳入NacosAutoServiceRegistration類中:NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration),后面的流程都是以這句代碼作為入口。

利用IDEA查看類結(jié)構(gòu),如上圖所示,NacosAutoServiceRegistration繼承AbstractAutoServiceRegistration類,而AbstractAutoServiceRegistration類又實現(xiàn)了AutoServiceRegistration和ApplicationListener接口。

ApplicationListener接口是Spring提供的事件監(jiān)聽接口,Spring會在所有bean都初始化完成之后發(fā)布一個事件,ApplicationListener會監(jiān)聽所發(fā)布的事件,這里的事件是Spring Boot自定義的WebServerInitializedEvent事件,主要是項目啟動時就會發(fā)布WebServerInitializedEvent事件,然后被AbstractAutoServiceRegistration監(jiān)聽到,從而就會執(zhí)行onApplicationEvent方法,在這個方法里就會進行服務(wù)注冊。

這里AbstractAutoServiceRegistration類實現(xiàn)了Spring監(jiān)聽器接口ApplicationListener,并重寫了該接口的onApplicationEvent方法。

public void onApplicationEvent(WebServerInitializedEvent event) {     this.bind(event);}

繼續(xù)點下去看bind方法。

public void bind(WebServerInitializedEvent event) {        ApplicationContext context = event.getApplicationContext();        if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {            this.port.compareAndSet(0, event.getWebServer().getPort());            //start方法            this.start();        }    }

看到這里發(fā)現(xiàn)了bind方法里有個非常重要的start()方法,繼續(xù)看該方法的register()就是真正的客戶端注冊方法。

public void start() {        if (!this.isEnabled()) {            if (logger.isDebugEnabled()) {                logger.debug("Discovery Lifecycle disabled. Not starting");            }        } else {            if (!this.running.get()) {                this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));                //真正的客戶端注冊方法                this.register();                if (this.shouldRegisterManagement()) {                    this.registerManagement();                }                this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));                this.running.compareAndSet(false, true);            }        }    }

跳過一些中間非關(guān)鍵性的代碼,可以直接看該注冊方法。

protected void register() {   this.serviceRegistry.register(getRegistration());}

這里的serviceRegistry就是NacosServiceRegistryAutoConfiguration類中第一個@Bean定義的bean,第一個@Bean就是這里的serviceRegistry對象的實現(xiàn);其中g(shù)etRegistration()獲取的就是第二個@Bean定義的NacosRegistration的實例,這兩個bean實例都是通過第3個@Bean傳進來的,所以這里就可以把NacosServiceRegistryAutoConfiguration類中那3個@Bean給串起來了。

protected void register() {   this.serviceRegistry.register(getRegistration());}

不得不說,阿里巴巴開發(fā)的中間件,其底層源碼的命名還是很規(guī)范的,register()方法從命名上來看就可以知道這是注冊的方法,事實也確實是注冊的方法,這個方法中會通過nacos-client包來調(diào)用nacos-server的服務(wù)注冊接口來實現(xiàn)服務(wù)的注冊功能。下面我看一下調(diào)用Nacos注冊接口方法:

public void register(Registration registration) {        if (StringUtils.isEmpty(registration.getServiceId())) {            log.warn("No service to register for nacos client...");        } else {            NamingService namingService = this.namingService();            String serviceId = registration.getServiceId();            String group = this.nacosDiscoveryProperties.getGroup();            //構(gòu)建客戶端參數(shù)ip,端口號等            Instance instance = this.getNacosInstanceFromRegistration(registration);            try {                //調(diào)用注冊方法                namingService.registerInstance(serviceId, group, instance);                log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});            } catch (Exception var7) {                log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var7});                ReflectionUtils.rethrowRuntimeException(var7);            }        }    }        //構(gòu)建客戶端注冊參數(shù)    private Instance getNacosInstanceFromRegistration(Registration registration) {        Instance instance = new Instance();        instance.setIp(registration.getHost());        instance.setPort(registration.getPort());        instance.setWeight((double)this.nacosDiscoveryProperties.getWeight());        instance.setClusterName(this.nacosDiscoveryProperties.getClusterName());        instance.setEnabled(this.nacosDiscoveryProperties.isInstanceEnabled());        instance.setMetadata(registration.getMetadata());        instance.setEphemeral(this.nacosDiscoveryProperties.isEphemeral());        return instance;    }

根據(jù)源碼可以知道beatReactor.addBeatInfo()方法作用在于創(chuàng)建心跳信息實現(xiàn)健康檢測,Nacos 服務(wù)端必須要確保注冊的服務(wù)實例是健康的,而心跳檢測就是服務(wù)健康檢測的手段。而serverProxy.registerService()實現(xiàn)服務(wù)注冊,綜上可以分析出Nacos客戶端注冊流程:

到此為止還沒有真正的實現(xiàn)服務(wù)的注冊,但是至少已經(jīng)知道了Nacos客戶端的自動注冊原理是借助了Spring Boot的自動配置功能,在項目啟動時通過自動配置類。NacosServiceRegistryAutoConfiguration將NacosServiceRegistry注入進來,通過Spring的事件監(jiān)聽機制,調(diào)用該類的注冊方法register(registration)實現(xiàn)服務(wù)的自動注冊。

7.2 Nacos服務(wù)發(fā)現(xiàn)7.2.1 Nacos客戶端客戶端服務(wù)發(fā)現(xiàn)

當(dāng)Nacos服務(wù)端啟動后,會先從本地緩存的serviceInfoMap中獲取服務(wù)實例信息,獲取不到則通過NamingProxy調(diào)用Nacos服務(wù)端獲取服務(wù)實例信息,最后開啟定時任務(wù)每秒請求服務(wù)端獲取實例信息列表進而更新本地緩存serviceInfoMap,服務(wù)發(fā)現(xiàn)拉取實例信息流程圖如下:

廢話不多說,直接上服務(wù)發(fā)現(xiàn)源碼:

/**     * 客戶端服務(wù)發(fā)現(xiàn)     *     * @param serviceName name of service     * @param groupName   group of service     * @param clusters    list of cluster     * @param subscribe   if subscribe the service     * @return     * @throws NacosException     */    @Override    public List getAllInstances(String serviceName, String groupName, List clusters,            boolean subscribe) throws NacosException {                ServiceInfo serviceInfo;        if (subscribe) {            // 如果本地緩存不存在服務(wù)信息,則進行訂閱            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),                    StringUtils.join(clusters, ","));        } else {            // 如果非訂閱模式就直接拉取服務(wù)端的注冊表            serviceInfo = hostReactor                    .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),                            StringUtils.join(clusters, ","));        }        List list;        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {            return new ArrayList();        }        return list;    }
/**     * 客戶端從注冊中心拉取注冊列表     *     * @param serviceName     * @param clusters     * @return     */    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {                NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());        String key = ServiceInfo.getKey(serviceName, clusters);        if (failoverReactor.isFailoverSwitch()) {            return failoverReactor.getService(key);        }        //客戶端從本地緩存中拉群注冊表信息,第一次根據(jù)服務(wù)名從注冊表map中獲取,服務(wù)表信息肯定是為null        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);        //如果拿到緩存map中的服務(wù)列表為null,如果是第一次根據(jù)服務(wù)名拉取注冊表信息,肯定為null        if (null == serviceObj) {            serviceObj = new ServiceInfo(serviceName, clusters);                        serviceInfoMap.put(serviceObj.getKey(), serviceObj);                        updatingMap.put(serviceName, new Object());            //第一次拉取注冊表信息為null后,然后調(diào)用Nacos服務(wù)端接口更新本地注冊表            updateServiceNow(serviceName, clusters);            updatingMap.remove(serviceName);                    } else if (updatingMap.containsKey(serviceName)) {                        if (UPDATE_HOLD_INTERVAL > 0) {                // hold a moment waiting for update finish                synchronized (serviceObj) {                    try {                        serviceObj.wait(UPDATE_HOLD_INTERVAL);                    } catch (InterruptedException e) {                        NAMING_LOGGER                                .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);                    }                }            }        }        /**         * 定時任務(wù)拉取,每隔幾秒鐘就去拉取一次,去拉取nacos注冊表,更新客戶端本地注冊列表的map         *         * 為啥這里要定時任務(wù)拉取呢?因為上面到注冊表map是緩存在客戶端本地的,假如有新的服務(wù)注冊到nacos         * 時,這時就要更新客戶端注冊表信息,所以這里會執(zhí)行一個訂單拉取的任務(wù)         */        scheduleUpdateIfAbsent(serviceName, clusters);                return serviceInfoMap.get(serviceObj.getKey());    }                //異步拉取任務(wù)    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {            return;        }                synchronized (futureMap) {            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {                return;            }            //執(zhí)行一個定時拉取任務(wù)            ScheduledFuture future = addTask(new UpdateTask(serviceName, clusters));            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);        }    }        //定時拉取注冊表任務(wù)        public class UpdateTask implements Runnable {                long lastRefTime = Long.MAX_VALUE;                private final String clusters;                private final String serviceName;                /**         * the fail situation. 1:can"t connect to server 2:serviceInfo"s hosts is empty         */        private int failCount = 0;                public UpdateTask(String serviceName, String clusters) {            this.serviceName = serviceName;            this.clusters = clusters;        }                private void incFailCount() {            int limit = 6;            if (failCount == limit) {                return;            }            failCount++;        }                private void resetFailCount() {            failCount = 0;        }                @Override        public void run() {            long delayTime = DEFAULT_DELAY;                        try {                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));                                if (serviceObj == null) {                    //又在繼續(xù)調(diào)用拉取nacos注冊列表方法                    updateService(serviceName, clusters);                    return;                }                                if (serviceObj.getLastRefTime() <= lastRefTime) {                    //又在繼續(xù)調(diào)用拉取nacos注冊列表方法                    updateService(serviceName, clusters);                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));                } else {                    // if serviceName already updated by push, we should not override it                    // since the push data may be different from pull through force push                    refreshOnly(serviceName, clusters);                }                                lastRefTime = serviceObj.getLastRefTime();                                if (!notifier.isSubscribed(serviceName, clusters) && !futureMap                        .containsKey(ServiceInfo.getKey(serviceName, clusters))) {                    // abort the update task                    NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);                    return;                }                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {                    incFailCount();                    return;                }                delayTime = serviceObj.getCacheMillis();                resetFailCount();            } catch (Throwable e) {                incFailCount();                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);            } finally {                //最后繼續(xù)嵌套調(diào)用當(dāng)前這個任務(wù),實現(xiàn)定時拉取                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);            }        }

這里值得注意的是,Nacos客戶端拉取注冊列表方法的最后又是一個定時任務(wù)任務(wù),每隔10秒鐘就會拉取一次服務(wù)端Nacos的注冊表。為啥這里要定時任務(wù)拉取呢?因為上面到注冊表map是緩存在客戶端本地的,假如有新的服務(wù)注冊到Nacos時,這時就要更新客戶端注冊表信息,所以這里會執(zhí)行一個拉取的任務(wù)。

private void updateServiceNow(String serviceName, String clusters) {        try {            //拉群nacos列表,更新到本地緩存map中的注冊列表            updateService(serviceName, clusters);        } catch (NacosException e) {            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);        }    }    /**     * Update service now.     * 拉取注冊列表     *     * @param serviceName service name     * @param clusters    clusters     */    public void updateService(String serviceName, String clusters) throws NacosException {        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);        try {            //調(diào)用拉群列表接口            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);                        if (StringUtils.isNotEmpty(result)) {                //解析返回值服務(wù)表json                processServiceJson(result);            }        } finally {            if (oldService != null) {                synchronized (oldService) {                    oldService.notifyAll();                }            }    }    /**     * Nacos客戶端查詢服務(wù)端注冊表數(shù)     *     * @param serviceName service name     * @param clusters    clusters     * @param udpPort     udp port     * @param healthyOnly healthy only     * @return instance list     * @throws NacosException nacos exception     */    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)            throws NacosException {                final Map params = new HashMap(8);        params.put(CommonParams.NAMESPACE_ID, namespaceId);        params.put(CommonParams.SERVICE_NAME, serviceName);        params.put("clusters", clusters);        params.put("udpPort", String.valueOf(udpPort));        params.put("clientIP", NetUtils.localIP());        params.put("healthyOnly", String.valueOf(healthyOnly));        //調(diào)用拉取注冊列表接口        return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);    }
7.2.2 服務(wù)端服務(wù)發(fā)現(xiàn)查詢注冊表api

上面分析了當(dāng)客戶端在其本地緩存中沒有找到注冊表信息,就會調(diào)用Nacos服務(wù)端api拉取注冊表信息,不難發(fā)現(xiàn)服務(wù)端查詢注冊表api為"/instance/list"。

/**     * Get all instance of input service.     * 客戶端獲取nacos所有注冊實例方法     *     * @param request http request     * @return list of instance     * @throws Exception any error during list     */    @GetMapping("/list")    @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)    public ObjectNode list(HttpServletRequest request) throws Exception {                String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);        NamingUtils.checkServiceNameFormat(serviceName);                String agent = WebUtils.getUserAgent(request);        String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);        String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);        int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));        String env = WebUtils.optional(request, "env", StringUtils.EMPTY);        boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));                String app = WebUtils.optional(request, "app", StringUtils.EMPTY);                String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);                boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));                return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,                healthyOnly);    }

這里通過doSrvIpxt()方法獲取服務(wù)列表,根據(jù)namespaceId、serviceName獲取service實例,service實例中srvIPs獲取所有服務(wù)提供者的實例信息,遍歷組裝成json字符串并返回。

public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,            int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {                ClientInfo clientInfo = new ClientInfo(agent);        ObjectNode result = JacksonUtils.createEmptyJsonNode();        Service service = serviceManager.getService(namespaceId, serviceName);        long cacheMillis = switchDomain.getDefaultCacheMillis();                // now try to enable the push        try {            if (udpPort > 0 && pushService.canEnablePush(agent)) {                                pushService                        .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),                                pushDataSource, tid, app);                cacheMillis = switchDomain.getPushCacheMillis(serviceName);            }        } catch (Exception e) {            Loggers.SRV_LOG                    .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);            cacheMillis = switchDomain.getDefaultCacheMillis();        }                if (service == null) {            if (Loggers.SRV_LOG.isDebugEnabled()) {                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);            }            result.put("name", serviceName);            result.put("clusters", clusters);            result.put("cacheMillis", cacheMillis);            result.replace("hosts", JacksonUtils.createEmptyArrayNode());            return result;        }                checkIfDisabled(service);                List srvedIPs;        //獲取所有實例        srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));                // filter ips using selector:        if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {            srvedIPs = service.getSelector().select(clientIP, srvedIPs);        }                if (CollectionUtils.isEmpty(srvedIPs)) {                        if (Loggers.SRV_LOG.isDebugEnabled()) {                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);            }                        if (clientInfo.type == ClientInfo.ClientType.JAVA                    && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {                result.put("dom", serviceName);            } else {                result.put("dom", NamingUtils.getServiceName(serviceName));            }                        result.put("name", serviceName);            result.put("cacheMillis", cacheMillis);            result.put("lastRefTime", System.currentTimeMillis());            result.put("checksum", service.getChecksum());            result.put("useSpecifiedURL", false);            result.put("clusters", clusters);            result.put("env", env);            result.set("hosts", JacksonUtils.createEmptyArrayNode());            result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));            return result;        }                Map> ipMap = new HashMap<>(2);        ipMap.put(Boolean.TRUE, new ArrayList<>());        ipMap.put(Boolean.FALSE, new ArrayList<>());                for (Instance ip : srvedIPs) {            ipMap.get(ip.isHealthy()).add(ip);        }                if (isCheck) {            result.put("reachProtectThreshold", false);        }                double threshold = service.getProtectThreshold();                if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {                        Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);            if (isCheck) {                result.put("reachProtectThreshold", true);            }                        ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));            ipMap.get(Boolean.FALSE).clear();        }                if (isCheck) {            result.put("protectThreshold", service.getProtectThreshold());            result.put("reachLocalSiteCallThreshold", false);                        return JacksonUtils.createEmptyJsonNode();        }                ArrayNode hosts = JacksonUtils.createEmptyArrayNode();                for (Map.Entry> entry : ipMap.entrySet()) {            List ips = entry.getValue();                        if (healthyOnly && !entry.getKey()) {                continue;            }                        for (Instance instance : ips) {                                // remove disabled instance:                if (!instance.isEnabled()) {                    continue;                }                                ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();                                ipObj.put("ip", instance.getIp());                ipObj.put("port", instance.getPort());                // deprecated since nacos 1.0.0:                ipObj.put("valid", entry.getKey());                ipObj.put("healthy", entry.getKey());                ipObj.put("marked", instance.isMarked());                ipObj.put("instanceId", instance.getInstanceId());                ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));                ipObj.put("enabled", instance.isEnabled());                ipObj.put("weight", instance.getWeight());                ipObj.put("clusterName", instance.getClusterName());                if (clientInfo.type == ClientInfo.ClientType.JAVA                        && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {                    ipObj.put("serviceName", instance.getServiceName());                } else {                    ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));                }                                ipObj.put("ephemeral", instance.isEphemeral());                hosts.add(ipObj);                            }        }                result.replace("hosts", hosts);        if (clientInfo.type == ClientInfo.ClientType.JAVA                && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {            result.put("dom", serviceName);        } else {            result.put("dom", NamingUtils.getServiceName(serviceName));        }        result.put("name", serviceName);        result.put("cacheMillis", cacheMillis);        result.put("lastRefTime", System.currentTimeMillis());        result.put("checksum", service.getChecksum());        result.put("useSpecifiedURL", false);        result.put("clusters", clusters);        result.put("env", env);        result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));        return result;    }

最后看一下獲取服務(wù)端實例方法,最后就是將臨時實例或者持久實例放在一個集合中返回給客戶端。

public List srvIPs(List clusters) {        if (CollectionUtils.isEmpty(clusters)) {            clusters = new ArrayList<>();            clusters.addAll(clusterMap.keySet());        }        return allIPs(clusters);    }    public List allIPs(List clusters) {        List result = new ArrayList<>();        for (String cluster : clusters) {            Cluster clusterObj = clusterMap.get(cluster);            if (clusterObj == null) {                continue;            }                        result.addAll(clusterObj.allIPs());        }        return result;    }    public List allIPs() {        List allInstances = new ArrayList<>();        //將nacos內(nèi)存中注冊表信息返回        allInstances.addAll(persistentInstances);        allInstances.addAll(ephemeralInstances);        return allInstances;    }

總結(jié)一下Nacos客戶端服務(wù)發(fā)現(xiàn)的核心流程:

如果沒有開啟訂閱模式,則直接通過調(diào)用/instance/list接口獲取服務(wù)實例列表信息;

如果開啟訂閱模式,則先會從本地緩存中獲取實例信息,如果不存在,則進行訂閱獲并獲取實例信息;在獲得最新的實例信息之后,也會執(zhí)行processServiceJson(result)方法來更新內(nèi)存和本地實例緩存,并發(fā)布變更時間。

開啟訂閱時,會開啟定時任務(wù),定時執(zhí)行UpdateTask獲取服務(wù)器實例信息、更新本地緩存、發(fā)布事件等;

7.3 Nacos服務(wù)端注冊

服務(wù)端的注冊源碼邏輯相對客戶端的還是要復(fù)雜很多,所以這里我們先看一下Nacos服務(wù)端注冊的完整流程圖,避免一上來就看源碼被繞暈。

接下來我們就著重分析一下AP架構(gòu)Nacos服務(wù)注冊的源碼。

7.3.1 Nacos服務(wù)端注冊

Nacos服務(wù)端注冊當(dāng)然是本文的核心,那么首先我們來看一下Nacos服務(wù)端注冊源碼。從Nacos的客戶端注冊原理不難發(fā)現(xiàn),客戶端通過調(diào)用Nacos服務(wù)端提供的http接口實現(xiàn)注冊,對外提供的服務(wù)接口請求地址為nacos/v1/ns/instance,實現(xiàn)代碼咋nacos-naming模塊下的InstanceController類中:

@CanDistro    @PostMapping    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)    public String register(HttpServletRequest request) throws Exception {        //從請求參數(shù)匯總獲得namespaceId(命名空間Id)        final String namespaceId = WebUtils                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);        //從請求參數(shù)匯總獲得serviceName(服務(wù)名)        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);        NamingUtils.checkServiceNameFormat(serviceName);                final Instance instance = parseInstance(request);        //registerInstance注冊實例        serviceManager.registerInstance(namespaceId, serviceName, instance);        return "ok";    }

客戶端就是通過調(diào)用該api實現(xiàn)Nacos的注冊的,下面可以看一下Nacos的這個注冊api是怎么實現(xiàn)的。

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {                createEmptyService(namespaceId, serviceName, instance.isEphemeral());        //前面構(gòu)建過了,這里調(diào)取肯定部不為null,從serviceMap中根據(jù)namespaceId和serviceName得到一個服務(wù)對象        Service service = getService(namespaceId, serviceName);                if (service == null) {            throw new NacosException(NacosException.INVALID_PARAM,                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);        }        //調(diào)用addInstance添加服務(wù)實例        //總體流程:把需要注冊的實例放到內(nèi)存阻塞隊列中,另外會起另一個線程從內(nèi)存中取出intance實例放到Service中,即注冊成功了        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);    }

registerInstance()干了兩件事兒,第一就是createEmptyService()方法從請求參數(shù)匯總獲得serviceName(服務(wù)名)和namespaceId(命名空間Id),第二就是調(diào)用registerInstance注冊實例。先看一下createEmptyService方法。

7.3.2 服務(wù)端構(gòu)建注冊表

Nacos的注冊表是多級存儲結(jié)構(gòu),最外層是通過namespace來實現(xiàn)環(huán)境隔離,然后是group分組,分組下就是服務(wù),一個服務(wù)有可以分為不同的集群,集群中包含多個實例。因此其注冊表結(jié)構(gòu)為一個Map,類型是:Map>外層key是namespace_id,內(nèi)層key是group + serviceName,Service內(nèi)部維護一個Map,結(jié)構(gòu)是:Map的key是clusterName,其值是集群信息;Cluster內(nèi)部維護一個Set集合Set ephemeralInstances和Set persistentInstances,元素是Instance類型,代表集群中的多個實例。

createEmptyService()方法就是服務(wù)端構(gòu)建注冊表的方法,基于AP架構(gòu)的Nacos實際就是將注冊實例信息保存在內(nèi)存中。

/**     * 1、創(chuàng)建一個Serivice對象,內(nèi)部包含了一個clusterMap。     * 2、將service對象放入到SeriviceMap中,結(jié)構(gòu)為:Map>。     * 3、開啟一個定時任務(wù)用來檢測實例的心跳是否超時,每5秒執(zhí)行一次。     *     * @param namespaceId     * @param serviceName     * @param local     * @throws NacosException     */    public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {        createServiceIfAbsent(namespaceId, serviceName, local, null);    }        public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)            throws NacosException {        Service service = getService(namespaceId, serviceName);        //第一次注冊進來,從注冊表里獲取命名空間,肯定是為null,所以需要構(gòu)建一個命名空間,        //設(shè)置nameSpace等信息,如果Service實例為空,則創(chuàng)建并保存到緩存中        if (service == null) {                        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);            service = new Service();            service.setName(serviceName);            service.setNamespaceId(namespaceId);            service.setGroupName(NamingUtils.getGroupName(serviceName));            // now validate the service. if failed, exception will be thrown            service.setLastModifiedMillis(System.currentTimeMillis());            service.recalculateChecksum();            if (cluster != null) {                cluster.setService(service);                service.getClusterMap().put(cluster.getName(), cluster);            }            service.validate();            //注冊和初始化,通過putService()方法將服務(wù)緩存到內(nèi)存            putServiceAndInit(service);            if (!local) {                addOrReplaceService(service);            }        }    }

createEmptyService()方法主要作用如下:

創(chuàng)建一個Serivice對象,內(nèi)部包含了一個clusterMap;將service對象放入到SeriviceMap中,結(jié)構(gòu)為:Map>;開啟一個定時任務(wù)用來檢測實例的心跳是否超時,每5秒執(zhí)行一次。

createServiceIfAbsent()方法主要作用在于第一次注冊進來,從注冊表里獲取命名空間,肯定是為null,所以需要構(gòu)建一個命名空間,設(shè)置nameSpace等信息并保存到緩存中。這個方法里值得注意的是putServiceAndInit()方法,可以點進來看一下這個方法:

private void putServiceAndInit(Service service) throws NacosException {        //構(gòu)建注冊表雙層map,初始化serviceMap --> Map> serviceMap        putService(service);        //初始化service,開啟心跳檢測的線程        service.init();        //實現(xiàn)數(shù)據(jù)一致性監(jiān)聽        consistencyService                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);        consistencyService                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());    }

這里我著重putService(service)方法,這里實際是將注冊的實例緩存到內(nèi)存的注冊表中

/** * 通過putService()方法將服務(wù)緩存到內(nèi)存 * * @param service service */public void putService(Service service) {    if (!serviceMap.containsKey(service.getNamespaceId())) {        //雙檢索防止并發(fā),為了防止同一個服務(wù)多個地方同時注冊        synchronized (putServiceLock) {            if (!serviceMap.containsKey(service.getNamespaceId())) {                //構(gòu)建NamespaceId,Serivce對象放到了ServiceMap里面了,也就是說下次我們再調(diào)用getService(namespaceId)的時候就可以獲取到一個Service對象了                serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());            }        }    }    //構(gòu)建 service name    serviceMap.get(service.getNamespaceId()).put(service.getName(), service);}
7.3.3 Nacos服務(wù)端心跳機制

接下來我們看一下 putServiceAndInit(Service service)方法中的,init()初始化方法是怎么保持心跳連接的。

/** * service.init()建立心跳機制 */public void init() {    //客戶端心跳檢查任務(wù),每隔5s執(zhí)行一次,clientBeatCheckTask是一個線程的方法    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);    for (Map.Entry entry : clusterMap.entrySet()) {        entry.getValue().setService(this);        entry.getValue().init();    }}/** * Schedule client beat check task with a delay. * * @param task client beat check task */public static void scheduleCheck(ClientBeatCheckTask task) {    //客戶端的心跳任務(wù),這里并沒有嵌套調(diào)用,而是開啟延遲5s的任務(wù),然后每隔5秒鐘執(zhí)行一次    futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));}public class ClientBeatCheckTask implements Runnable {        private Service service;        public ClientBeatCheckTask(Service service) {        this.service = service;    }        @JsonIgnore    public PushService getPushService() {        return ApplicationUtils.getBean(PushService.class);    }        @JsonIgnore    public DistroMapper getDistroMapper() {        return ApplicationUtils.getBean(DistroMapper.class);    }        public GlobalConfig getGlobalConfig() {        return ApplicationUtils.getBean(GlobalConfig.class);    }        public SwitchDomain getSwitchDomain() {        return ApplicationUtils.getBean(SwitchDomain.class);    }        public String taskKey() {        return KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName());    }        @Override    public void run() {        try {            /**             * nacos心跳在集群架構(gòu)下只允許在一臺機器上執(zhí)行健康檢查任務(wù)             *             * 集群中有多臺機器,本方法在于對服務(wù)名稱做hash運算再對機器數(shù)量取模后,那么             * 這里每次只有定位到一臺機器,其他機器都直接return了             *             * 疑問:如果一臺機器掛了會怎么辦?這里取模會不會亂掉?那這里會不會要做一致性hash?             * 在nacos集群中每臺機器之間也是存在狀態(tài)同步的,每臺機器之間都有集群節(jié)點同步任務(wù),詳見com.alibaba.nacos.naming.cluster.ServerListManager.ServerStatusReporter             *             */            if (!getDistroMapper().responsible(service.getName())) {                return;            }                        if (!getSwitchDomain().isHealthCheckEnabled()) {                return;            }            //獲取服務(wù)端所有實例            List instances = service.allIPs(true);                        // first set health status of instances:            /**             *  for循環(huán)對每個實例都做健康檢查             *  在這個方法里面主要是循環(huán)當(dāng)前service的每一個臨時實例 用當(dāng)前時間減去最后一次心跳時間 是否大于心跳超時時間來判斷心跳是否超時,             *  如果大于這個時間會執(zhí)行instance.setHealthy(false)將實例的健康狀態(tài)改為false;但是這個定時任務(wù)不會立即執(zhí)行,會每5秒執(zhí)行一次:             */            for (Instance instance : instances) {                //判斷心跳是否超時:當(dāng)前時間 - 實例上次心跳時間 > 心跳的超時時間【默認是15秒】?                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {                    if (!instance.isMarked()) {                        if (instance.isHealthy()) {                            //如果大于心跳默認時間,把實例的 healthy 設(shè)置為false【服務(wù)列表一開始不會刪掉,一開始會變成false】                            instance.setHealthy(false);                            Loggers.EVT_LOG                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",                                            instance.getIp(), instance.getPort(), instance.getClusterName(),                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());                            getPushService().serviceChanged(service);                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));                        }                    }                }            }                        if (!getGlobalConfig().isExpireInstance()) {                return;            }                        // then remove obsolete instances:            for (Instance instance : instances) {                                if (instance.isMarked()) {                    continue;                }                //當(dāng)前時間 - 實例上一次心跳時間 > 實例的刪除時間【默認30s】                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {                    // delete instance                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),                            JacksonUtils.toJson(instance));                    //直接刪除實例                    deleteIp(instance);                }            }                    } catch (Exception e) {            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);        }            }        private void deleteIp(Instance instance) {                try {            NamingProxy.Request request = NamingProxy.Request.newRequest();            request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))                    .appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())                    .appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());            //調(diào)用本地服務(wù)            String url = "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath()                    + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();  //  /v/ns/instance                        // delete instance asynchronously:            HttpClient.asyncHttpDelete(url, null, null, new Callback() {                @Override                public void onReceive(RestResult result) {                    if (!result.ok()) {                        Loggers.SRV_LOG                                .error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",                                        instance.toJson(), result.getMessage(), result.getCode());                    }                }                    @Override                public void onError(Throwable throwable) {                    Loggers.SRV_LOG                            .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(),                                    throwable);                }                    @Override                public void onCancel() {                        }            });                    } catch (Exception e) {            Loggers.SRV_LOG                    .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);        }    }}

可以看出init方法是開啟了一個異步線程ClientBeatCheckTask去做了個周期性發(fā)送心跳的機制,方法中客戶端心跳檢查任務(wù),開啟延遲5s的任務(wù),然后每隔5秒鐘執(zhí)行一次。

service.init()方法主要通過定時任務(wù)不斷檢測當(dāng)前服務(wù)下所有實例最后發(fā)送心跳包的時間。在這個方法里面主要是循環(huán)當(dāng)前service的每一個臨時實例,用當(dāng)前時間減去最后一次心跳時間是否大于15s來判斷心跳是否超時,如果大于這個時間會執(zhí)行instance.setHealthy(false)將實例的健康狀態(tài)改為false,但是這個定時任務(wù)不會立即執(zhí)行,會每5秒執(zhí)行一次;當(dāng)前時間 - 實例上一次心跳時間 > 實例的刪除時間【默認30s】就會刪除實例。

那么服務(wù)實例的最后心跳包更新時間是誰來觸發(fā)的呢?實際上前面在說客戶端注冊時有說到, Nacos客戶端注冊服務(wù)的同時也建立了心跳機制。

7.3.4服務(wù)端實例注冊

上文中registerInstance注冊實例方法中還有一個最最重要的方法就是addInstance()方法,其本質(zhì)上就是把當(dāng)前注冊的服務(wù)實例保存到Service中。

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)            throws NacosException {                String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);                Service service = getService(namespaceId, serviceName);                synchronized (service) {            //將需要注冊的實例全部放到Cluster,再將Cluster放在Service里            List instanceList = addIpAddresses(service, ephemeral, ips);                        Instances instances = new Instances();            instances.setInstanceList(instanceList);            //看一下 consistencyService 對象初始化的地方就知道走的是哪個實現(xiàn)            consistencyService.put(key, instances);        }    }    public static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {        //根據(jù) ephemeral 取值默認是 true為臨時實例,臨時實例是存放在內(nèi)存的;false即為永久實例寫到文件的,可以通過此參數(shù)區(qū)分nacos是AP還是CP架構(gòu)        return ephemeral                ? buildEphemeralInstanceListKey(namespaceId, serviceName)                : buildPersistentInstanceListKey(namespaceId, serviceName);    }

這里著重看一下這個put方法,put方法主要做了兩件事,第一對對客戶端的請求過來的實例進行注冊,第二是Nacos集群架構(gòu)下的數(shù)據(jù)同步,Nacos默認用的是臨時實例,也就是ephemeral = true,也就是本文的重點AP架構(gòu)的Nacos注冊原理。

public void put(String key, Record value) throws NacosException {        //注冊邏輯:實際就是把實例注冊任務(wù)放到內(nèi)存阻塞隊列中        onPut(key, value);        //AP 架構(gòu)下的節(jié)點數(shù)據(jù)同步        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,                globalConfig.getTaskDispatchPeriod() / 2);    }    /**    * 注冊邏輯    */    public void onPut(String key, Record value) {                if (KeyBuilder.matchEphemeralInstanceListKey(key)) {            Datum datum = new Datum<>();            datum.value = (Instances) value;            datum.key = key;            datum.timestamp.incrementAndGet();            //把客戶端信息注冊信息更新到注冊表            dataStore.put(key, datum);        }                if (!listeners.containsKey(key)) {            return;        }        //這里放的是DataOperation.CHANGE        notifier.addTask(key, DataOperation.CHANGE);    }

先來看一下onPut()方法,不難發(fā)現(xiàn)當(dāng)注冊實例數(shù)據(jù)有改變時,就無腦將這個實例扔到這個task內(nèi)存阻塞隊列中去,具體可以看一下addTask()方法。

public class Notifier implements Runnable {                private ConcurrentHashMap services = new ConcurrentHashMap<>(10 * 1024);        //用于存放即將要注冊實例信息的內(nèi)存阻塞隊列        private BlockingQueue> tasks = new ArrayBlockingQueue<>(1024 * 1024);                /**         * Add new notify task to queue.         *         * @param datumKey data key         * @param action   action for data         */        public void addTask(String datumKey, DataOperation action) {                        if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {                return;            }            if (action == DataOperation.CHANGE) {                services.put(datumKey, StringUtils.EMPTY);            }            /**             * 把客戶端的參數(shù)封裝成pair對象后,放進了一個內(nèi)存隊列中,注冊就結(jié)束了,看這里并沒有把客戶端的注冊信息寫進雙層map中             * 憑經(jīng)驗?zāi)懿碌剑@里把客戶端對象放進內(nèi)存隊列,后續(xù)肯定是通過異步起線程的方式去注冊             */            tasks.offer(Pair.with(datumKey, action));        }                public int getTaskSize() {            return tasks.size();        }                @Override        public void run() {            Loggers.DISTRO.info("distro notifier started");            /**             * Spring啟動時就會開啟一個線程加載 Notifier任務(wù),這里就會死循環(huán)一直從內(nèi)存隊列中拿取實例信息實現(xiàn)異步注冊             *             * 問   題1:這里的for循環(huán)會占用cpu資源嗎?             *          不會占用,因為tasks是個阻塞隊列,如果tasks中沒有實例信息,這里就會阻塞在這,不會無腦死循環(huán)             *             * 問   題2:為什么要把實例信息都無腦先放在內(nèi)存阻塞隊列中,然后另起一個線程去異步注冊呢?阿里這里為什么要這么設(shè)計?             * 個人理解:nacos是在阿里內(nèi)部使用的中間件,一般是需要滿足三高特性【高并發(fā)、高性能、高可擴展】,阿里內(nèi)部就有幾十萬臺機器,             * 如果不能實現(xiàn)高并發(fā)注冊那么肯定會有很多問題。比如訂單服務(wù)A需要注冊到nacos時,是在訂單A啟動時就需要注冊,服務(wù)注冊到nacos的邏輯             * 還是比較復(fù)雜的【詳見com.alibaba.nacos.naming.core.Service#updateIPs】,假如這里不用異步注冊而是用同步注冊的方式,那么             * 服務(wù)注冊到nacos需要花費很多時間,這才是一個注冊到nacos的行為就花費了大量時間,那么如果多幾個中間需要加載的話,那得浪費多少時間?             * 所以這里采用異步注冊。             *             * 問   題3:內(nèi)存阻塞隊列 tasks 會不會有堆積的情況呢?             * 實際上看了com.alibaba.nacos.naming.core.Cluster#updateIps注冊方法可以發(fā)現(xiàn),注冊實際上就是把實例信息寫進一個內(nèi)存集合Set中             * 【com.alibaba.nacos.naming.core.Cluster#ephemeralInstances】這樣的操作其實是很快的,假如真有個運維寫了個批量注冊的腳本             * 把一堆機器同時注冊進來,那這樣確實有可能會造成內(nèi)存阻塞隊列tasks的堆積現(xiàn)象,但是這種情況并沒什么關(guān)系,Eureka有時候?qū)嵗远紩兄?            * 幾十秒,對當(dāng)前的nacos架構(gòu)而言,既然要實現(xiàn)高并發(fā)那么只能犧牲一點實例注冊的即使響應(yīng)時間。正常情況下,即使有幾十臺幾百臺機器同時注冊,             * 由于注冊是內(nèi)存操作,速度也很快,可以說是準實時,基本上正常情況下注冊信息1s就能感知到。             *             */            for (; ; ) {                try {                    //從內(nèi)存隊列中拿出任務(wù)                    Pair pair = tasks.take();                    //拿出pair對象中的客戶端信息去注冊                    handle(pair);                } catch (Throwable e) {                    //這個線程即使拋異常也不終止                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);                }            }        }                private void handle(Pair pair) {            try {                //前面拼接的參數(shù) "com.alibaba.nacos.naming.iplist.ephemeral.namespaceId##serviceName"                String datumKey = pair.getValue0();                //前面?zhèn)鞯腄ataOperation.CHANGE。                DataOperation action = pair.getValue1();                                services.remove(datumKey);                                int count = 0;                                if (!listeners.containsKey(datumKey)) {                    return;                }                                for (RecordListener listener : listeners.get(datumKey)) {                                        count++;                                        try {                        if (action == DataOperation.CHANGE) {                            /**                             * 拿到前面放的map中的客戶端信息dataStore.get,這里的key就是前面的                             * 拼接的參數(shù) "com.alibaba.nacos.naming.iplist.ephemeral.namespaceId##serviceName"                             * 前面放的是DataOperation.CHANGE。                             *                             */                            listener.onChange(datumKey, dataStore.get(datumKey).value);                            continue;                        }                                                if (action == DataOperation.DELETE) {                            listener.onDelete(datumKey);                            continue;                        }                    } catch (Throwable e) {                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);                    }                }                                if (Loggers.DISTRO.isDebugEnabled()) {                    Loggers.DISTRO                            .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",                                    datumKey, count, action.name());                }            } catch (Throwable e) {                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);            }        }    }

當(dāng)有實例需要注冊時,直接調(diào)用addTask()方法將這個實例信息無腦扔進內(nèi)存阻塞隊列中去,注冊就結(jié)束了。這個應(yīng)該算是Nacos注冊的一個精髓吧,Nacos為了提高性能其源碼使用了大量的異步任務(wù)、異步線程等操作,用這些方式對提升Nacos性能有很大幫助。不難猜到,這里把客戶端實例對象放進內(nèi)存隊列,后續(xù)肯定是通過異步起線程的方式去注冊。

不難發(fā)現(xiàn)addTask()方法是Notifier類的方法,Notifier實現(xiàn)了Runnable接口,很明顯這就是一個異步線程,這里跟上面的猜想一致,Nacos就是通過開啟了一個異步線程實現(xiàn)注冊的,具體的注冊方法直接可以看Notifier線程的run()方法即可。那么這個Notifier線程是啥時候開啟的呢?

@DependsOn("ProtocolManager")@org.springframework.stereotype.Service("distroConsistencyService")public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {        //本類DistroConsistencyServiceImpl注入到了Spring容器中,所以項目啟動時類加載的時候就會初始化這個方法    @PostConstruct    public void init() {        //線程池執(zhí)行器,執(zhí)行notifier線程的任務(wù)        GlobalExecutor.submitDistroNotifyTask(notifier);    }}

了解Spring的應(yīng)該都知道這個是Spring加載的一種初始化方式,Spring啟動時加載這個init方法初始化數(shù)據(jù),就會開啟一個線程加載Notifier任務(wù)。

看了Notifier線程的run()方法,不免會有幾個疑問。第一、這里的for循環(huán)會占用cpu資源嗎?第二、為什么要把實例信息都無腦先放在內(nèi)存阻塞隊列中,然后另起一個線程去異步注冊呢?第三、阿里這里為什么要這么設(shè)計?這樣設(shè)計好處是什么呢?

這里第一個問題不會占用cpu資源,因為tasks是個阻塞隊列,如果tasks中沒有實例信息,這里就會阻塞在這,不會無腦死循環(huán),所以是不會占用cpu資源的;

第二個問題個人理解:Nacos是在阿里內(nèi)部使用的中間件,肯定是需要滿足高并發(fā)、高性能、高可擴展,阿里內(nèi)部估計就有幾十萬臺機器,如果不能實現(xiàn)高并發(fā)注冊那么肯定會有很多問題。比如訂單服務(wù)需要注冊到nacos時,那么訂單啟動時就需要注冊,服務(wù)注冊到Nacos的邏輯還是比較復(fù)雜的【詳見com.alibaba.nacos.naming.core.Service#updateIPs】,假如這里不用異步注冊而是用同步注冊的方式,那么服務(wù)注冊到Nacos需要花費很多時間,這才是一個注冊到Nacos的行為就花費了大量時間,那么如果多幾個中間需要加載的話,那會浪費很多時間,所以這里采用異步注冊。

那么我們再看一下異步寫注冊表的方法:

public void onChange(String key, Instances value) throws Exception {                Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);                for (Instance instance : value.getInstanceList()) {                        if (instance == null) {                // Reject this abnormal instance list:                throw new RuntimeException("got null instance " + key);            }            //設(shè)置權(quán)重默認值啥的            if (instance.getWeight() > 10000.0D) {                instance.setWeight(10000.0D);            }                        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {                instance.setWeight(0.01D);            }        }        //真正的注冊實例的方法        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));                recalculateChecksum();      }        public void updateIPs(Collection instances, boolean ephemeral) {        Map> ipMap = new HashMap<>(clusterMap.size());        for (String clusterName : clusterMap.keySet()) {            ipMap.put(clusterName, new ArrayList<>());        }                for (Instance instance : instances) {            try {                if (instance == null) {                    Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");                    continue;                }                                if (StringUtils.isEmpty(instance.getClusterName())) {                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);                }                                if (!clusterMap.containsKey(instance.getClusterName())) {                    Loggers.SRV_LOG                            .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",                                    instance.getClusterName(), instance.toJson());                    Cluster cluster = new Cluster(instance.getClusterName(), this);                    cluster.init();                    getClusterMap().put(instance.getClusterName(), cluster);                }                                List clusterIPs = ipMap.get(instance.getClusterName());                if (clusterIPs == null) {                    clusterIPs = new LinkedList<>();                    ipMap.put(instance.getClusterName(), clusterIPs);                }                                clusterIPs.add(instance);            } catch (Exception e) {                Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);            }        }                for (Map.Entry> entry : ipMap.entrySet()) {            //make every ip mine            List entryIPs = entry.getValue();            /**             * 注冊邏輯updateIps,更新注冊表信息:             *             * 這個方法里面會將已經(jīng)注冊過的實例列表復(fù)制一份,將新的實例和老的實例都更新到一個集合中,             * 最終再將這個集合更新到真正的實例列表,是一種寫時復(fù)制的思想,主要時為了解決并發(fā)沖突,             * 在寫的過程中,其他線程讀到的還是舊數(shù)據(jù),等真正寫完之后再將數(shù)據(jù)更新回去。             *             * 思考一下,正常情況下在寫之前都要加鎖,不然可能會有讀寫并發(fā)問題,這里為什么不在加鎖?             * 假如這里加了一個鎖之后,相當(dāng)于把讀和寫操作排隊串行化執(zhí)行了,就是讀寫不能同時進行了,             * 這里并發(fā)肯定會很低,所以這里用copy on write機制,將原來的注冊表復(fù)制出一個副本,然后進             * 行修改,此時讀請求進來還是讀老的注冊表,這樣讀寫就能并發(fā)執(zhí)行。             *             * 那這里用讀寫分離和加鎖串行執(zhí)行有什么優(yōu)劣勢嗎?             * 讀寫分離:寫的時候?qū)懙氖歉北?,讀的是老得數(shù)據(jù),這樣可能讀到讀不是最新數(shù)據(jù),只有當(dāng)副本寫完將             * 老數(shù)據(jù)替換,此時讀的才是最新數(shù)據(jù),讀寫分離雖然提高了讀寫并發(fā)但是對數(shù)據(jù)的一致性稍有妥協(xié),但是             * 對于此時注冊的場景而言影響不大,即使是沒有讀到最新數(shù)據(jù)也沒關(guān)系,最多就當(dāng)此服務(wù)啟動的慢一點而             * 已,當(dāng)前這個注冊場景,還是提高并發(fā)注冊能力稍重要些,若是對讀寫數(shù)據(jù)一致場景要求很高時,就必須得             * 加鎖串行執(zhí)行             *             * 加鎖執(zhí)行:讀寫都是加鎖執(zhí)行,寫完后再去讀,讀的一定是最新的數(shù)據(jù),讀寫數(shù)據(jù)強一致,但是這里根本不需要數(shù)據(jù)強一致             *             * 這里有個疑問注冊時每個微服務(wù)都去復(fù)制一個副本,然后將副本替換回原件時會不會有覆蓋的問題?             * 不會,這里是開的一個線程去拿內(nèi)存隊列的數(shù)據(jù)進行注冊的,所以不可能存在覆蓋并發(fā)問題。詳見             * com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#init()方法             *             */            clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);        }                setLastModifiedMillis(System.currentTimeMillis());        //上面已經(jīng)更新過了注冊表后,這里需要發(fā)布事件,主動通知客戶端        getPushService().serviceChanged(this);        StringBuilder stringBuilder = new StringBuilder();                for (Instance instance : allIPs()) {            stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");        }                Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),                stringBuilder.toString());             }    /**     * Update instance list.     *     * @param ips       instance list    需要注冊的實例     * @param ephemeral whether these instances are ephemeral  是哪種實例,永久實例還是臨時實例     */    public void updateIps(List ips, boolean ephemeral) {        //根據(jù)傳進來的 ephemeral 判斷是哪種實例,直接把這種實例復(fù)制一份toUpdateInstances        Set toUpdateInstances = ephemeral                ? ephemeralInstances    //臨時實例 【這里這個實例是注冊表已有的實例,這里直接將原有的實例再次復(fù)制了一份,就是toUpdateInstances】                : persistentInstances;  //永久實例 【這里這個實例是注冊表已有的實例,這里直接將原有的實例再次復(fù)制了一份,就是toUpdateInstances】        HashMap oldIpMap = new HashMap<>(toUpdateInstances.size());        //循環(huán)將上面復(fù)制的實例toUpdateInstances放oldIpMap中        for (Instance ip : toUpdateInstances) {            oldIpMap.put(ip.getDatumKey(), ip);        }        /**         * 下面就是對比新注冊的實例與副本實例的差異,判斷新的實例是新增、刪除、還是修改,這里使用了copyOnWrite機制,上面的toUpdateInstances就是         * 復(fù)制出來的副本,新增、刪除、修改都是在副本上進行,之后再將原注冊表覆蓋         * 新增:直接新增到副本中         * 修改:直接修改副本的實例信息         * 刪除:刪除副本的實例         *         */        //更新注冊表        List updatedIPs = updatedIps(ips, oldIpMap.values());        if (updatedIPs.size() > 0) {            for (Instance ip : updatedIPs) {                Instance oldIP = oldIpMap.get(ip.getDatumKey());                                // do not update the ip validation status of updated ips                // because the checker has the most precise result                // Only when ip is not marked, don"t we update the health status of IP:                if (!ip.isMarked()) {                    ip.setHealthy(oldIP.isHealthy());                }                                if (ip.isHealthy() != oldIP.isHealthy()) {                    // ip validation status updated                    Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),                            (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());                }                                if (ip.getWeight() != oldIP.getWeight()) {                    // ip validation status updated                    Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),                            ip.toString());                }            }        }        //刪除注冊表        List newIPs = subtract(ips, oldIpMap.values());        if (newIPs.size() > 0) {            Loggers.EVT_LOG                    .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),                            getName(), newIPs.size(), newIPs.toString());                        for (Instance ip : newIPs) {                HealthCheckStatus.reset(ip);            }        }                List deadIPs = subtract(oldIpMap.values(), ips);                if (deadIPs.size() > 0) {            Loggers.EVT_LOG                    .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),                            getName(), deadIPs.size(), deadIPs.toString());                        for (Instance ip : deadIPs) {                HealthCheckStatus.remv(ip);            }        }                toUpdateInstances = new HashSet<>(ips);

第三個問題:可以看上面這個寫注冊表的源碼,當(dāng)服務(wù)A需要注冊到Nacos時,并不是直接寫進Nacos的注冊表里,實際上是先拷貝了一個副本,訂單服務(wù)注冊寫注冊表時直接寫副本的注冊表,副本寫完后才會替換原來Nacos中的注冊表,所以當(dāng)庫存服務(wù)需要從Nacos拉取服務(wù)時,拉取的是Nacos實際注冊表中的信息,這種設(shè)計方式能夠大大提高Nacos的注冊性能。

類似于CopyOnWriteArrayList的copy on write機制,也就是寫時復(fù)制、讀寫分離設(shè)計思想。這種讀寫分離對于客戶端注冊感知實時性可能會稍差點,但是這種情況并沒什么關(guān)系,Eureka有時候?qū)嵗远紩兄獛资耄瑢Ξ?dāng)前的nacos架構(gòu)而言,既然要實現(xiàn)高并發(fā)那么只能犧牲一點實例注冊的及時響應(yīng)時間。

當(dāng)Nacos注冊成功后,就需要發(fā)布事件,主動通知客戶端,接下來可以看一下發(fā)布事件的源碼:

public void serviceChanged(Service service) {        // merge some change events to reduce the push frequency:        if (futureMap                .containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {            return;        }        //時間發(fā)布,發(fā)布事件,通知客戶端        this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));    }

這里Nacos會通過udp的方式將服務(wù)變動通知給訂閱的客戶端。Nacos的這種推送模式相對于zk那種利用tcp長連接而言還是會節(jié)約很多資源,即使有大量節(jié)點更新也不會使Nacos出現(xiàn)性能瓶頸。

當(dāng)Nacos客戶端接收到了udp消息后會給服務(wù)端返回一個ack,如果Nacos超時未收到ack,還會有重發(fā)機制,超過了這個超時時間就不再重發(fā)了。雖然udp是個不可靠協(xié)議不能保證消息一定能推送到客戶端,但是Nacos客戶端還是有定時輪訓(xùn)做兜底定時查詢Nacos注冊表。Nacas采用了這兩種機制,既保證了實時性,又保證了數(shù)據(jù)更新不會被漏掉。

7.3.5 Nacos集群新節(jié)點啟動數(shù)據(jù)同步

Nacos數(shù)據(jù)同步分為全量同步和增量同步,全量同步就是初始化數(shù)據(jù)一次性同步,而增量同步是指有數(shù)據(jù)增加的時候,只同步增加的數(shù)據(jù)。

7.3.5.1 Nacos集群全量數(shù)據(jù)同步

Nacos集群有新的節(jié)點啟動時,DistroProtocol類就會在Spring加載時調(diào)用構(gòu)造方法,同時開啟一個數(shù)據(jù)同步任務(wù),該方法會執(zhí)行startVerifyTask()和startLoadTask(),我們重點關(guān)注startLoadTask(),具體代碼如下:

@Componentpublic class DistroProtocol {    /**     * 當(dāng)nacos集群只有兩臺機器時,此時若又新增一臺機器,此時需要將原來兩臺機器的數(shù)據(jù)同步到新的nacos機器上     *     * @param memberManager     * @param distroComponentHolder     * @param distroTaskEngineHolder     * @param distroConfig     */    public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,            DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {        this.memberManager = memberManager;        this.distroComponentHolder = distroComponentHolder;        this.distroTaskEngineHolder = distroTaskEngineHolder;        this.distroConfig = distroConfig;        //本項目啟動時DistroProtocol類加載時需要加載本構(gòu)造方法,開啟一個數(shù)據(jù)同步任務(wù)        startDistroTask();    }        private void startDistroTask() {        if (EnvUtil.getStandaloneMode()) {            isInitialized = true;            return;        }        //啟動startVerifyTask,做數(shù)據(jù)同步校驗        startVerifyTask();        //加載任務(wù)        startLoadTask();    }        private void startLoadTask() {        ///處理狀態(tài)回調(diào)對象        DistroCallback loadCallback = new DistroCallback() {            //處理成功            @Override            public void onSuccess() {                isInitialized = true;            }            //處理失敗            @Override            public void onFailed(Throwable throwable) {                isInitialized = false;            }        };        GlobalExecutor.submitLoadDataTask(                new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));    } }

上面方法會調(diào)用DistroLoadDataTask對象,而該對象其實是個線程,因此會執(zhí)行它的run方法,run方法會調(diào)用load()方法實現(xiàn)數(shù)據(jù)全量加載,代碼如下:

public class DistroLoadDataTask implements Runnable {        private final ServerMemberManager memberManager;        private final DistroComponentHolder distroComponentHolder;        private final DistroConfig distroConfig;        private final DistroCallback loadCallback;        private final Map loadCompletedMap;        public DistroLoadDataTask(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,            DistroConfig distroConfig, DistroCallback loadCallback) {        this.memberManager = memberManager;        this.distroComponentHolder = distroComponentHolder;        this.distroConfig = distroConfig;        this.loadCallback = loadCallback;        loadCompletedMap = new HashMap<>(1);    }    /**     * 數(shù)據(jù)加載過程     */    @Override    public void run() {        try {            //加載數(shù)據(jù)            load();            if (!checkCompleted()) {                GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());            } else {                loadCallback.onSuccess();                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");            }        } catch (Exception e) {            loadCallback.onFailed(e);            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);        }    }}

數(shù)據(jù)同步會通過Http請求從遠程服務(wù)器獲取數(shù)據(jù),并同步到當(dāng)前服務(wù)的緩存中。執(zhí)行流程如下:

首先,loadAllDataSnapshotFromRemote()從遠程加載所有數(shù)據(jù),并處理同步到本機;

第二,transportAgent.getDatumSnapshot()遠程加載數(shù)據(jù),通過Http請求執(zhí)行遠程加載;

第三,dataProcessor.processSnapshot()處理數(shù)據(jù)同步到本地

/**     * 加載數(shù)據(jù),并同步     *     * @throws Exception     */    private void load() throws Exception {        while (memberManager.allMembersWithoutSelf().isEmpty()) {            Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");            TimeUnit.SECONDS.sleep(1);        }        while (distroComponentHolder.getDataStorageTypes().isEmpty()) {            Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");            TimeUnit.SECONDS.sleep(1);        }        for (String each : distroComponentHolder.getDataStorageTypes()) {            if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {                //從遠端機器拉取數(shù)據(jù),從遠程加載所有數(shù)據(jù),并處理同步到本機                loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));            }        }    }    /**     * 從遠端機器拉取數(shù)據(jù)     *     * @param resourceType     * @return     */    private boolean loadAllDataSnapshotFromRemote(String resourceType) {        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);        if (null == transportAgent || null == dataProcessor) {            Loggers.DISTRO.warn("[DISTRO-INIT] Can"t find component for type {}, transportAgent: {}, dataProcessor: {}",                    resourceType, transportAgent, dataProcessor);            return false;        }        //拉取不包含自己的機器        for (Member each : memberManager.allMembersWithoutSelf()) {            try {                Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());                //調(diào)取接口獲取除自己外的所有機器                DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());                //同步數(shù)據(jù)                boolean result = dataProcessor.processSnapshot(distroData);                Loggers.DISTRO                        .info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),                                result);                //同步成功直接return,從一臺機器上同步                if (result) {                    return true;                }            } catch (Exception e) {                Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);            }        }        return false;    }        private boolean checkCompleted() {        if (distroComponentHolder.getDataStorageTypes().size() != loadCompletedMap.size()) {            return false;        }        for (Boolean each : loadCompletedMap.values()) {            if (!each) {                return false;            }        }        return true;    }

到這為止實現(xiàn)數(shù)據(jù)全量同步,其實全量同步最終還是互相調(diào)用Nacos提供的api??偨Y(jié)一下全量數(shù)據(jù)同步的過程:

啟動一個定時任務(wù)線程DistroLoadDataTask加載數(shù)據(jù),調(diào)用load()方法加載數(shù)據(jù)調(diào)用loadAllDataSnapshotFromRemote()方法從遠程機器同步所有的數(shù)據(jù)構(gòu)造http請求,調(diào)用httpGet方法從指定的server獲取數(shù)據(jù)同步處理數(shù)據(jù)processData并執(zhí)行監(jiān)聽器listener成功后,就更新data store

7.3.5.2 Nacos集群增量數(shù)據(jù)同步

當(dāng)服務(wù)注冊完成后,Nacos需要將客戶端實例信息同步到Nacos集群其他節(jié)點,可以看一下Nacos底層是怎么實現(xiàn)的。我們再次回到put方法:

public void put(String key, Record value) throws NacosException {        //注冊邏輯:實際就是把實例注冊任務(wù)放到內(nèi)存阻塞隊列中        onPut(key, value);        //AP 架構(gòu)下的節(jié)點數(shù)據(jù)同步        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,                globalConfig.getTaskDispatchPeriod() / 2);    }

上文中已經(jīng)解釋過put方法中的 onPut(key, value)方法,接下來我們再了解一下AP結(jié)構(gòu)Nacos下的節(jié)點數(shù)據(jù)是同步,也就是distroProtocol.sync方法。

** * Start to sync data to all remote server. * * @param distroKey distro key of sync data * @param action    the action of data operation */public void sync(DistroKey distroKey, DataOperation action, long delay) {    //循環(huán)將新增實例同步到除自己外的所有實例,單機for循環(huán)都不會走,集群架構(gòu)就會走本方法    for (Member each : memberManager.allMembersWithoutSelf()) {        //先把每臺機器都數(shù)據(jù)封裝稱distroKeyWithTarget對象        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),                each.getAddress());        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);        //添加到task任務(wù)中        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);        if (Loggers.DISTRO.isDebugEnabled()) {            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());    }}/** * 將集群中除自己外其他需要同步的機器信息添加到一個tasks任務(wù)中,由前面的知識可以猜到這里也是 * 用異步開啟一個線程去拿tasks進行同步新增實例信息到其他nacos機器中 * * @param key  key of task * @param newTask */@Overridepublic void addTask(Object key, AbstractDelayTask newTask) {    lock.lock();    try {        AbstractDelayTask existTask = tasks.get(key);        if (null != existTask) {            newTask.merge(existTask);        }        //將集群中除自己外其他需要同步的機器信息添加到一個tasks任務(wù)中,task是個保存信息的ConcurrentHashMap        tasks.put(key, newTask);    } finally {        lock.unlock();    }}

這里直接把需要同步的信息放在了內(nèi)存的ConcurrentHashMap中,我們看一下這里具體看一下怎么同步其他節(jié)點。

@Componentpublic class DistroTaskEngineHolder {        //延遲任務(wù)執(zhí)行器    private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();        //任務(wù)執(zhí)行引擎器    private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine();        public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {        DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);        delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);    }        public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {        return delayTaskExecuteEngine;    }        public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {        return executeWorkersManager;    }        public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {        this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor);    }}

這個類中會創(chuàng)建一個任務(wù)執(zhí)行引擎,代碼如下:

public class DistroDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {        public DistroDelayTaskExecuteEngine() {        super(DistroDelayTaskExecuteEngine.class.getName(), Loggers.DISTRO);    }        @Override    public void addProcessor(Object key, NacosTaskProcessor taskProcessor) {        Object actualKey = getActualKey(key);        super.addProcessor(actualKey, taskProcessor);    }        @Override    public NacosTaskProcessor getProcessor(Object key) {        Object actualKey = getActualKey(key);        return super.getProcessor(actualKey);    }        private Object getActualKey(Object key) {        return key instanceof DistroKey ? ((DistroKey) key).getResourceType() : key;    }}//忽略一些中間省略的代碼public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {    super(logger);    tasks = new ConcurrentHashMap(initCapacity);    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));    //這里又執(zhí)行了一個任務(wù)    processingExecutor            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);}/** * 執(zhí)行了一個任務(wù) */private class ProcessRunnable implements Runnable {        @Override    public void run() {        try {            processTasks();        } catch (Throwable e) {            getEngineLog().error(e.toString(), e);        }    }}
public boolean process(NacosTask task) {    if (!(task instanceof DistroDelayTask)) {        return true;    }    DistroDelayTask distroDelayTask = (DistroDelayTask) task;    DistroKey distroKey = distroDelayTask.getDistroKey();    if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {        //將延遲任務(wù)變更成異步任務(wù),異步任務(wù)對象是一個線程        DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);        //將前面封裝到任務(wù)拿出來放在一個隊列中        distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);        return true;    }    return false;}public void addTask(Object tag, AbstractExecuteTask task) {    //拿前面其他實例到任務(wù)    NacosTaskProcessor processor = getProcessor(tag);    if (null != processor) {        processor.process(task);        return;    }    TaskExecuteWorker worker = getWorker(tag);    //將同步數(shù)據(jù)到其他nacos實例到tasks任務(wù)放進一個queue中 在InnerWorker.run()方法中從queue隊列中拿任務(wù)執(zhí)行    worker.process(task);}public boolean process(NacosTask task) {    if (task instanceof AbstractExecuteTask) {        putTask((Runnable) task);    }    return true;}/** * 把任務(wù)同步放進內(nèi)存隊列中 * * @param task */private void putTask(Runnable task) {    try {        queue.put(task);    } catch (InterruptedException ire) {        log.error(ire.toString(), ire);    }}

將同步數(shù)據(jù)到其他Nacos實例到tasks任務(wù)放進一個queue中,然后在InnerWorker.run()方法中從queue隊列中拿任務(wù)執(zhí)行??匆幌戮唧w是怎么執(zhí)行同步任務(wù)的:

/** * Inner execute worker. */private class InnerWorker extends Thread {        InnerWorker(String name) {        setDaemon(false);        setName(name);    }        @Override    public void run() {        while (!closed.get()) {            try {                //從queue中拿同步任務(wù)                Runnable task = queue.take();                long begin = System.currentTimeMillis();                //實際就是執(zhí)行異步同步任務(wù) DistroSyncChangeTask 的run()                task.run();                long duration = System.currentTimeMillis() - begin;                if (duration > 1000L) {                    log.warn("distro task {} takes {}ms", task, duration);                }            } catch (Throwable e) {                log.error("[DISTRO-FAILED] " + e.toString(), e);            }        }    }}

這里從隊列里面拿出來任務(wù)執(zhí)行,不難發(fā)現(xiàn)這里的任務(wù)執(zhí)行的具體方法就是DistroSyncChangeTask類的run方法:

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {        private final DistroComponentHolder distroComponentHolder;        public DistroSyncChangeTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {        super(distroKey);        this.distroComponentHolder = distroComponentHolder;    }        @Override    public void run() {        Loggers.DISTRO.info("[DISTRO-START] {}", toString());        try {            //獲取各種參數(shù)            String type = getDistroKey().getResourceType();            DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());            distroData.setType(DataOperation.CHANGE);            //調(diào)用http接口同步任務(wù)            boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());            //同步失敗會繼續(xù)重試            if (!result) {                handleFailedTask();            }            Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);        } catch (Exception e) {            Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);            handleFailedTask();        }    }    /**     * 同步失敗會繼續(xù)重試     */    private void handleFailedTask() {        String type = getDistroKey().getResourceType();        DistroFailedTaskHandler failedTaskHandler = distroComponentHolder.findFailedTaskHandler(type);        if (null == failedTaskHandler) {            Loggers.DISTRO.warn("[DISTRO] Can"t find failed task for type {}, so discarded", type);            return;        }        failedTaskHandler.retry(getDistroKey(), DataOperation.CHANGE);    }        @Override    public String toString() {        return "DistroSyncChangeTask for " + getDistroKey().toString();    }}

每個Nacos服務(wù)端實例都會提供這樣的一個api接口供其他Nacos實例調(diào)用,從而同步注冊實例數(shù)據(jù)。DistroSyncChangeTask類的run方法,就是調(diào)用http接口同步任務(wù)接口,將本節(jié)點的注冊實例數(shù)據(jù)同步到其他節(jié)點機器上。

總結(jié)一下上面增量數(shù)據(jù)同步方法:

DistroProtocol 使用 sync() 方法處理AP 架構(gòu)下的節(jié)點數(shù)據(jù)同步

向其他節(jié)點發(fā)布廣播任務(wù)調(diào)用 distroTaskEngineHolder 發(fā)布延遲任務(wù)

調(diào)用 DistroDelayTaskProcessor.process() 方法進行任務(wù)投遞:將延遲任務(wù)轉(zhuǎn)換為異步變更任務(wù)

執(zhí)行變更任務(wù) DistroSyncChangeTask.run() 方法:向指定節(jié)點發(fā)送消息

8、總結(jié)

Nacos避免并發(fā)讀寫的沖突:Nacos在更新實例列表時,會采用CopyOnWrite技術(shù),首先將老得實例列表拷貝一份,然后更新拷貝的實例列表,再用更新后的實例列表來覆蓋舊的實例列表。

Nacos提高注冊并發(fā):為了應(yīng)對阿里巴巴內(nèi)部數(shù)十萬服務(wù)的并發(fā)寫請求Nacos內(nèi)部會將服務(wù)注冊的任務(wù)放入阻塞隊列,采用線程池異步來完成實例更新,從而提高并發(fā)寫能力。

Nacos的服務(wù)發(fā)現(xiàn)分為兩種模式:主動拉取模式,消費者定期主動從Nacos服務(wù)端拉取服務(wù)列表并緩存起來,當(dāng)服務(wù)調(diào)用時優(yōu)先讀取本地緩存中的服務(wù)列表。訂閱模式,消費者訂閱Nacos中的服務(wù)列表,并基于UDP協(xié)議來接收服務(wù)變更通知。當(dāng)Nacos中的服務(wù)列表更新時,會發(fā)送UDP廣播給所有訂閱者。與Eureka相比,Nacos的訂閱模式服務(wù)狀態(tài)更新更及時,消費者更容易及時發(fā)現(xiàn)服務(wù)列表的變化,剔除故障服務(wù)。

關(guān)鍵詞: 定時任務(wù) 服務(wù)列表 不難發(fā)現(xiàn) 命名空間

相關(guān)新聞

Copyright 2015-2020   三好網(wǎng)  版權(quán)所有 聯(lián)系郵箱:435 22 640@qq.com  備案號: 京ICP備2022022245號-21
五月婷婷激情视频网| 色欲AV蜜桃一区二区三| 黑人和日本人av一区二区| 亚洲第一区av中文字幕| 手机看片福利一区二区三区四区| 大屁股熟女一区二区视频| 午夜精品秘一区二区三区| 又粗又长又硬又黄又爽| 亚洲精品乱码久久久久app| 男插女视频大全免费| 琪琪日本福利伦理视频| av里面的动作是真进去吗 | 在线观看中文字幕精品av| 99免费观看在线视频| 情趣视频在线观看91| av天堂新资源在线| 在线观看中文字幕精品av| 亚洲制服丝袜网站中文字幕| 九九热在线精品播放| 女同大尺度视频网站在线观看| 久久99精品热在线观看| 白白色在线免费视频发布视频| 亚洲第一中文字幕成人| 果冻麻豆一区二区三区| 一区二区三区观看在线| 精品美女洗澡一区二区| 亚洲欧美激情国产综合久久久| 天天爱天天日天天爽| 国产igao激情在线视频入口| 果冻麻豆一区二区三区| 九九视频在线观看全部| 欧美日韩一区二区三区成人影院| 91九色尤物无套内射| 裸露视频免费在线观看| 午夜精品小视频在线播放| 一区二区三区四区 在线播放| 午夜一区二区三区视频在线观看| 黑川堇人妻88av| 一区二区三区高清视频3| av里面的动作是真进去吗| 伊人网国产在线播放| 亚洲午夜高清在线观看| 伊人精品成人综合网| 国产精品免费看一区二区三区| 亚洲激情噜噜噜久久久| 玖玖资源站在线观看亚洲| 美女露阴道让男人捅| 亚洲国内精品久久久久久久| 亚洲成人动漫av在线| 欧美日韩精品aaa| 国产一级一国产一级毛片| 97精品久久久久久无码人妻| 神马不卡视频在线视频| 亚洲天堂av最新在线| 秋霞成人午夜鲁丝一区二区三区| 大屁股熟女一区二区视频| 亚洲欧美综合另类最新| 伊人网在线免费观看| 日本电影一级人妻在线播放四区| 日日夜夜免费视频精品| 农村大炕有肉大屁股熟妇| 99久久国产精品免费热| 黑川堇人妻88av| 91青青青国产免费高清| 久久人妻诱惑我视频| 精品国产污污污污免费观看| 韩国资源视频一区二区三区| 欧美视频亚洲视频在线| 欧美日本国产一区二区| 欧美一区二区播放视频| 久久99精品久久久久久三级| 午夜精品小视频在线播放| 欧美一区二区三区爽爽| 国产视频1区2区3区| 韩国资源视频一区二区三区| 天天操天天射天天操天天日| 亚洲美女露隐私av一区二区精品| 亚洲综合第一区二区| 快使劲弄我视频在线播放| 日本高清在线观看不卡视频| 啪啪啪网站免费看视频| 天天色天天射天天日天天干| 国产av剧变态维修工虐杀美女| 亚洲综合天堂av网站在线观看| 日韩av水蜜桃一区二区三区| 91色老久久精品偷偷蜜臀| 男人用大鸡巴狂操女人肉穴| 大尺度久久久久久久| 一区二区三区四区久久久久韩日| 久久久国产精品免费视频网| 蜜乳视频一区二区三区| 丰满人妻被猛烈进入中文字幕| 欧美区日本区国产区| 日本清纯中文字幕版| 亚洲男人天堂最新网址大全| 大香焦一道本一区二区三区| 成人午夜av电影网| 七色福利视频在线观看| 亚洲一区二区中文字幕久久| 日本欧美高清在线观看视频| 狠狠操深爱婷婷综合一区| 日本一区二区三区区别| av在线男人的天堂亚洲| 久久久久夜色国产精品电影| 亚洲午夜精品一级毛片app| 妈妈的朋友2中文字幕在线| 91九色pony蝌蚪| 黑人侵犯人妻森泽佳奈| av网页免费在线观看| 日本丰满熟妇浓密多毛| 亚洲a级视频在线播放| 不卡一二三区别视频| 亚洲欧美国产一本综合首页| 公侵犯人妻中文字幕巨| 黑人大吊大战亚洲女人。| 久久99嫩草99久久精品| 1区3区4区产品乱入视频| 制服丝袜中文字幕熟女人妻| 青娱乐免费视频一二三| 丰满人妻被猛烈进入中文字幕| 中文字幕麻绳捆绑的人妻| 亚洲精品久久久人妻| 少妇熟女天堂网av| 69精品人妻久久久久久久久久久| 精品精品精品精品精品污污污污| 97视频538在线观看| 日本一区二区三区区别| 凹凸视频一区二区在线观看| 天堂在线中文字幕av| 性感美女人妻久久久| 在线视频国产精品欧美| 国产视频成人一区二区| 中日韩又粗又硬又大精品| 婷婷综合缴情亚洲五月伊人| 亚洲精品1卡2卡3卡| 欧美男男在线观看视频网站| 三区美女视频在线观看| 伊人免费观看视频一| 精品久久久久久久久久久久久| 小妹妹爱大棒棒免费观看视频| 91人妻人人做人人爽高清| 夜夜人人干人人爱人人操| 久久午夜免费鲁丝片| 69视频在线精品国自产拍| 国产激情在线观看一区二区三区| av资源中文字幕在线观看| 久久久国产精品免费视频网| 天天操天天干天天舔天天| 天天干天天操天天要| 大香焦一道本一区二区三区| 韩国资源视频一区二区三区| 亚洲熟女乱色一区二区三区视频| 老熟女xxxⅹhd老熟女性| 久久久久久a女人处女| 在线免费视频999| 性高潮视频在线观看日韩| 快色视频在线观看免费| 久久久久久免费观看av| 国产成人在线观看视频播放| 人妻中文字幕亚洲在线| 亚洲av在线免费播放| 可在线免费观看av| 国产资源在线观看二区| 夜夜操天天干夜夜操| 国产肥胖熟女又色又爽免费视频 | 黑鸡巴肏少妇逼视频| 啪啪啪网站免费在线看| 伊人网在线欧美日韩在线| 青青草一个释放的网站| 日本一区二区三区的资源| 一区二区三区四区久久久久韩日| 360偷拍蜜桃臀69式| 性感人妻 中文字幕| 夜夜人人干人人爱人人操| 亚洲a区在线免费观看| 放荡人妻极品少妇全集| 中文字字幕在线精品乱码| av天堂a亚洲va天堂va里番| 成人午夜av电影网| 亚洲欧美精品海量播放| 熟女人妻少妇一区二区| 加勒比东京热绿帽人妻多人操| 午夜五十路久久福利| 黑人和日本人av一区二区| 天天干夜夜爽狠狠操| 蜜乳av一区二区三区免费观看| 欧美黑人1区2区3区| 欧美久久一区二区伊人| 99久久精品视频16| 91美女在线观看视频| 天堂av国产av伦理av| 视频在线+欧美十亚洲曰本| 天天综合久久无人区| 美利坚合众国av天堂| 久99久视频免费观看中文字幕| 97精品人妻免费视频| 中字幕人妻熟女人妻a62v网| 日韩一级视频一区二区三区| 国产亚洲综合5388| 在线有码人妻自拍视频| 国产一区两区三区福利小视频| 天堂在线中文字幕av| 黑人爆操女人免费视频| 女生抠逼自慰啊啊啊啊啊啊啊下载| 制服丝袜中文字幕熟女人妻| 亚洲va999天堂va| 黑人巨大精品一区二区在线| 国产黑色丝袜 在线日韩欧美| 少妇精品视频一区二区免费看| 熟女一区二区视频在线| 新香蕉视频香蕉视频2| 亚洲一区二区精品在线播放| 中文字幕在线字幕乱码怎么设置| 久久久久国产精品二区| 国产精品免费看一区二区三区| 在线中文字幕人妻av| 老司机伊人99久久精品| 久久一级片三上悠亚| 欧美aaaa性bbbbaaaa| 欧美肥妇久久久久久| 午夜92福利1000| 99久久国产精品免费热| 免费的啪啪视频软件| 快使劲弄我视频在线播放| 荣立三等功退休有什么待遇| 中字幕人妻熟女人妻a62v网| 中日韩又粗又硬又大精品| 日本一区二区三区调教性奴视频| 二十四小时日本高清在线观看| 免费24小时人妻视频| 亚洲国产日韩欧美一区二区三区, 精久久久久久久久久久久 | 四虎精品久久免费最新| 天天干夜夜操夜夜骑| 中文字幕熟女人妻丝袜丝在线| 日本韩国欧美在线视频| 最新中文字幕久久久久| 日韩人妻中文字幕二区| 不卡一二三区别视频| 夜色福利视频免费观看| 裸露视频免费在线观看| 97视频538在线观看| 亚洲一区二区三区四区入口 | 精品国产久久久久午夜精品av| 欧美成人少妇人妻精品| 少妇被粗大的猛进69视频| av在线中文字幕在线| 51精品视频在线免费观看| 亚洲欧美另类校园春色| xxxx69在线观看视频| 在线成人教育平台排名| 在线观看中文字幕精品av| av成人三级高清日韩| 91大神在线免费观看视频| 亭亭五月天在线观看| 亚洲免费在线不卡视频| 天堂网免费在线电影| 国产最新av在线免费观看| 中文字幕 中文字幕 亚洲| av天堂新资源在线| 果冻麻豆一区二区三区| 午夜精品久久久久久久精品乱码| 男生用大肌巴操美女骚穴| 在线观看中文字幕少妇av | 中文字幕人妻精品精品| 天天看天天爱天天日| 欧美成人久久久桃色aa| 全球高清中文字幕av| 上床啪啪啪免费视频| 久久国产半精品99精品国产| 熟女人妻精品视频一区| 欧美猛少妇色ⅹⅹⅹⅹⅹ猛叫| 亚洲欧美激情久久久| 国产农村乱子伦精精品视频| 老鸭窝在线毛片观看免费播放| 狠狠操深爱婷婷综合一区| 99久久久久久久久久久久久| 在线免费观看视频18| 亚洲 偷拍 自拍 欧美| 天天做天天日天天搞| 七色福利视频在线观看| 日本高清久久人人爽| 亚洲天堂色综合久久| 911美女片黄在线观看| 亚洲成人动漫av在线| 国内销魂老女人老泬| 蜜乳av中文字幕一区二区| 久久久久久免费观看av| 亚洲熟女人妻自拍在线视频 | 国产成人情侣av在线| 91九色国产在线视频| 高清国产美女a一级毛片| 亚洲免费午夜污福利| 日本美女爱爱视频网站| 免费的啪啪视频软件| 亚洲宅男噜噜噜66在线观看| 亚洲欧美另类校园春色| 福利小视频免费在线| 91精品资源在线观看| 国产精品乱码高清在线观看h| 岳母的诱惑电影在线观看| 欧美成人久久久桃色aa| 久久久久九九九九九12| 后入日韩翘臀蜜桃臀美女| 92麻豆一区二区三区| 中文字幕熟女人妻丝袜丝在线| 50熟妇一区二区三区| 免费啪啪啪网站在线观看| 在宿舍强奷两个清纯校花| 夜夜操天天干夜夜操| 欧美巨大另类极品video| 2020国产激情视频在线观看| 天天曰天天摸天天爽| 岳的大肥屁熟妇五十路| 中文字字幕在线精品乱码| 麻豆午夜激情在线观看| 中文字幕免费啪啪啪| 夫妻黄色一级性生活片| 神马不卡视频在线视频| 92在线播放观看视频| 91久久久久久最新网站| 18禁网站在线点击观看| 一区二区三区资源视频| av 资源在线播放| 九色91操最新在线观看网址| 91精品资源在线观看| 国产av剧变态维修工虐杀美女| 亚洲第一中文字幕成人| 可以直接看av网站| 伦理在线观看未删减中文字幕 | 68视频在线免费观看| 欧美一区二区三区视频看| 国产av精品一区二区三区久久| 老司国产精品视频免费观看| 欧美一级特黄大片做受99| av男人站在线观看| 欧美不卡一二三区精品| 国产av啊啊啊啊啊啊啊| 国产黑色丝袜 在线日韩欧美| 蜜乳av一区二区三区免费观看| 午夜精品久久秘?18免费观看| 天天插天天干天天狠| 97精品人妻免费视频| 亚洲一区二区中文字幕久久| 91色老久久精品偷偷蜜臀| 狠狠操狠狠操狠狠插| 美女把腿张开给男的捅| 91激情四射婷婷综合| 手机视频在线观看一区| 91精品综合久久久久久五月天| 波多野结衣在线一区别| 桃色成人开心激情网| 男生和女生羞羞91在线看| 欧美vr专区日韩vr专区| 日韩久久不卡免费视频| 国产男人的天堂一区| 综合久久伊人久久88| 91在线九色porny| 日本欧美国产在线一区| 午夜情色一区二区三区| 日本欧美视频在线免费| 免费在线观看亚洲福利| 欧美日韩国产在线中文字幕| 黑川堇人妻88av| 97人妻av人人澡人人爽| 91久久久精品成人国产| 久久免费视频ww一区| 亚洲中文字幕无线乱码人妻精品| 日本人妻熟妇丰满成熟HD系列 | 天天看片天天摸天天操| 日本少妇熟女乱码一区二区| 欧美日本亚欧在线观看| 午夜美女福利视频在线| 啊~插得好快别揉我胸了视频| 亚洲资源在线免费观看| 亚洲一区二区精品在线播放| 中文字幕 中文字幕 亚洲| 69精品互换人妻4p| 桃色成人开心激情网| 久久99嫩草99久久精品| 91大神福利视频网| 亚洲成人激情在线综合| 自拍偷自拍亚洲精品10p| 手机看片福利一区二区三区四区| 天天操天天射天天操天天日 | 欧美成人一二三在线网| 久久久久高潮白浆久久| 欧美一级日韩一级亚洲一级va | 99久久人人爽亚洲精品美女| 亚洲男人天堂最新网址大全| 人妻少妇的va视频| 亚洲欧美成人激情在线| 亚洲一区二区精品三区视频| 乱子伦国产一区二区三区| 久久精品国产亚洲av热软件| jizzjizz国产精品传媒| 国产精美视频精品视频精品| 制服丝袜 中文字幕 日韩| 亚洲国产日韩欧美一区二区三区,| 日韩三级精品电影久久久久| 日韩一级视频一区二区三区| 日本少妇人妻凌辱在线| 欧美在线观看视频欧美| 中文字幕一区二区人妻视频| 福利视频导航在线观看| 亚洲中文字幕无线乱码人妻精品 | 丰满人妻被猛烈进入中文字幕| 精品国产人伦一区二区三区| 日本少妇精品免费视频| yellow在线亚洲精品一区| 亚洲av毛片一区二区三区网| 女人高潮潮呻吟喷水网站| 青青免费观看视频| 成年人黄色日本视频| 熟女国内精品一区二区三区| 欧美一级特黄大片做受99| 中文字幕人妻一区二区视频系列 | 国产高清视频www夜色资源| 国产主播诱惑毛片av| 女人扒开逼让男人操 | 欧美国产精品久久久免费| 日韩精品视频一区二区三区在线| 青青操91美女国产| 99久久精品视频16| 91九色尤物无套内射| 国产熟妇色xxⅹ交白浆视频| 亚洲av日韩久久网站| 大屁股熟女一区二区视频| 天海翼亚洲一区在线观看| 欧美日本亚欧在线观看| 美女av色播在线播放| 欧美亚洲愉拍一区二区三区| 91精品在线视频免费视频| 午夜野花视频在线观看| 人妻超清中文字幕在线乱码| 天天摸天天干夜夜操| jandara在线观看| av在线播放观看h| 午夜92福利1000| 亚洲欧洲无码一区2区无码| 91性高湖久久久久久久久久| 鸡巴插进美女的嫩小穴视频| 99久久免费播放在线观看视频| 99国产精品国产精品毛片19| 成人大片男人的天堂| 黑人大巨屌操美女逼| 不卡一二三区别视频| 欧美大鸡吧男操女啊啊啊视频| 亚洲欧美小说中文字幕| 国际日韩日韩日韩日韩日韩| 99999久久久精品| 开心五月综合激情婷婷| 大香焦一道本一区二区三区| 久久精品久久久久观看99水蜜桃| 四虎国产精品国产精品国产精品| 最新福利二区三区视频| 97视频538在线观看| v天堂国产精品久久| 99久久人人爽亚洲精品美女| 亚洲欧洲一区二区三区在线| 9999久久久久老熟妇二区| 97视频538在线观看| 亚洲国产日韩精品在线| 国产av在线免费视频| 欧美在线视频不卡一区| 午夜国产成人精品视频观看| 亚洲最大先锋资源采集站| 天天想要天天操天天干| 日韩成人在线电影首页| 大尺度久久久久久久| 亚洲乱熟女一区二区三区影片| 天天干天天弄天天日| 国产中年夫妇激情高潮| 国产高清视频www夜色资源| 国产精品内射婷婷一级| 国际精品熟女一区二区| 99久久国产精品免费消防器材| 五月激情婷婷四射基地| 2021国产在线视频| 亚洲一区二区三区无码在线| 羞羞漫画无限免费观看秋蝉| 97视频人人爱麻豆| 91九色pony蝌蚪| 男人资源站中文字幕| 国产精美视频精品视频精品| 亚洲欧美不卡专业视频| 天天夜夜久久精品综合| 91系列视频在线播放| 91精品视频在线观看视频| 国产 少妇 一区二区| 亚洲gay视频在线观看| 成人人妻h在线观看| 在线观看中文字幕精品av| 国产福利小视频在线观看网站| 人妻人妻在线视频网站| 国产精品亚洲精品亚洲| 熟女阿高潮合集一区二区| 中文字幕 中文字幕 亚洲| 亚洲欧美综合另类最新| 人妻被强av系列一区二区| 一看就是假奶的av| 亚洲欧美韩国日本一区二区| 国产黑色丝袜 在线日韩欧美| 三级欧美日韩一区二区三区| 青娱乐不卡视频在线| 欧美情色av在线观看| 女人高潮潮呻吟喷水网站| 黄色网络中文字幕日本| 自拍偷拍亚洲综合第一页| 国产视频成人自拍蝌蚪视频| 97精品久久久久久无码人妻| 92麻豆一区二区三区| 国产精品内射婷婷一级| 中文字字幕在线精品乱码| 亚洲熟女少妇中文字幕系列| 欧洲精品在线免费观看| 2018中文字字幕人妻| 91色老久久精品偷偷蜜臀| 亚洲午夜精品视频节目| 成年人免费福利在线| 91人妻人人做人人爽高清| 性感人妻 中文字幕| 先锋人妻啪啪中文字幕| 96在线观看免费播放| 91系列视频在线播放| 五月的婷婷综合视频| 77亚洲视频在线观看| 狂操鸡巴小骚逼视频免费观看| 视频免费在线观看网站| av无限看熟女人妻另类av| 日本东京热视频欧美视频| 蜜臀久久精品久久久久久av| 青青国产95免看视频| av在线男人的天堂亚洲| 日本久久久久久黄色| yy4080黄色片| 韩国一级片最火爆中文字幕| 黄很色很在线免费视频网站| 日韩黄色在线观看网站上| tobu8日本高清| 99re这里是国产精品首页| 日本久久久久久黄色| 成人av中文字幕在线看| 在线观看网站伊人网| 免费啪啪啪网站在线观看| 国产中年夫妇激情高潮| 亚洲欧美国产一本综合首页| 99热在线只有的精品| 久久精品四虎夜夜拍拍拍| 中文字幕熟女人妻一区| 中文字字幕在线精品乱码| 亚洲最大先锋资源采集站| 国产精品免费看一区二区三区| 大尺度av毛片在线网址| 视频在线 一区二区| 亚洲av日韩久久网站| 国产探花自拍亚洲av| 韩国在线播放一区二区三区| 人人妻人人爽人人摸| 日本一道中文字幕99| av激情四射五月婷婷| 杜达雄啪啪毛片视频| 伊人网国产在线播放| 天天干天天日天天弄| 瑟瑟干视频在线观看| 91色哟哟视频在线观看| 51vv精品视频在线观看| 亚洲男人的天堂最新网址| 日本亚洲午夜福利一区二区三区| 国产激情免费在线视频| 人妻系列在线免费视频| 97人妻在线视频自拍| av里面的动作是真进去吗| 中文字幕人妻精品精品| 日本欧美亚洲国产啊啊啊| 黑人侵犯人妻森泽佳奈| 亚洲女人自熨在线视频| 日本人妻少妇xxxxxxx| 午夜福利在线不卡视频| 久久人人爽人人爽人人av东京热 | av资源中文字幕在线观看| 91精品视频在线观看视频| 国产 亚洲 欧美 自拍| 天天色天天射天天日天天干| 日韩三级精品电影久久久久| 亚洲制服丝袜在线看| 少妇被中出一区二区| a级黄片免费观看| 男生和女生羞羞91在线看| 韩国在线播放一区二区三区| 欧美人与动欧交视频| 亚洲美女午夜激情视频在线观看| 久久久人妻免费视频| yy4080黄色片| 欧美一区二区三区视频看| 天天想要天天操天天干| 68视频在线免费观看| 不卡高清一区二区三区| 国产又粗又长又大视频| 国产成人av在线你懂得| 免费看超污视频在线观看| 中文字幕人妻一区色偷偷久久| 午夜92福利1000| 欧美在线观看视频欧美| 97精品国产91久久久| 亚洲字幕一区二区夜色av| 99久久久久久久久久久久久| 中文乱码字幕人妻熟女人妻| 日本人妻少妇xxxxxxx| 黄色av 在线观看| 欧美最新一区二区三区| 看女人大BB群伦交| 亚洲午夜精品视频节目| 区一区二区三免费观看视频| 黄版视频在线免费观看| 日本少妇丰满大bbb的小乳沟| 4438x亚洲最大的成人| 啊不行啊操逼好爽大鸡吧视频| 夫妻黄色一级性生活片| 深夜福利免费观看在线看| 久久精品国产亚洲av清纯| 91精品夜夜夜一区二区蜜桃| 日韩av电影中文在线免费观看| 超peng视频在线免费播放97| 黄片视频免费观看视频| 性高潮视频在线观看日韩| 黄在线看片免费人成视频| 91九色国产在线视频| 亚洲制服丝袜在线看| 男插女视频大全免费| 99免费观看在线视频| 狠狠操深爱婷婷综合一区| 国产激情一区二区视频| 美女妩媚午夜诱惑网站| 国产资源在线观看二区| 欧美成人性生活视频播放| 国产又粗又长又大视频| 日韩国产欧美久久一区| 午夜92福利1000| 男生和女生羞羞91在线看| 一区二区三区不卡免费视频网站| 九热精品视频在线观看| 伊人网国产在线播放| 亚洲国产精品 久久久| 一二区二区不卡视频| 国产探花自拍亚洲av| 精品人妻在线激情视频| 中文字幕亚洲无线乱码| 日本不卡 中文字幕| 老牛影视在线一区二区三区| 国产视频成人一区二区| 亚洲欧美成人激情在线| 天天日天天干天天日天天干天天| 中文字幕中文字幕在线中…一区| 免费看一级高潮喷水片| 成人精品动漫一区二区| 亚洲欧洲无码一区2区无码| 十八禁黄色免费污污污亚洲| 亚洲中文字幕无线乱码人妻精品 | 午夜精品一区二区三区不卡顿| 性高潮视频在线观看日韩| 岳母的诱惑电影在线观看| 大陆中文字幕视频在线| 黑鸡巴肏少妇逼视频| 高潮喷水一区二区三区| 福利一二三在线视频观看| 日韩人妻中文字幕二区| 91精品国产成人久久久久久| 91亚洲国产成人久久精品| 欧美大胆a级视频秒播| 久久久视频在线播放| 人妻色综合aaaaaa网| 精产国品一二三产品区别97| 96在线观看免费播放| 日韩久久九九精品视频| 中文字幕av特黄毛片| a级黄片免费观看| 国产成人在线观看hd| 日韩成人精品久久久免费看| 99色在线观看免费观看| 亚洲国产日韩a在线欧美| 亚洲制服丝袜美腿在线| 国产大桥未久一区二区| 久久视频 在线播放| 99999久久久精品| 60路70路日本熟妇| 91精品久久久久久久99蜜月| 韩国资源视频一区二区三区| 天天插天天干天天狠| 首页欧美日韩中文字幕| 国产igao激情在线视频入口| www,日韩av,com| 红桃视频国产av在线| 黄很色很在线免费视频网站 | 岳的大肥屁熟妇五十路| av福利免费体验观看| 18禁男女啪啪啪无遮挡| 蜜桃臀av在线一区二区| 69国产在线视频网站| 国产精品 亚洲欧美 自拍偷拍 | 亚洲综合天堂av网站在线观看| 一区二区三区免费版在线| 日韩av电影中文在线免费观看| 一区二区九日韩美女| 欧美最新一区二区三区| 不卡一区二区视频在线| 老熟女xxxⅹhd老熟女性| 五十岁熟妇高潮喷水| 女同大尺度视频网站在线观看| 国产精品igao为爱寻找激情| 大香蕉尹人在线最新| 99在线视频精品观看高| 午夜国产一区二区三区| 日本韩国福利在线播放| 日本a级2020在线观看 | 2020精品视频在线| 亚洲av网站一区二区三区| 亚洲av 综合av| 中文字幕观看中文字幕免费 | 夫妻黄色一级性生活片| 黑人巨大精品一区二区在线| 国产成人情侣av在线| 亚洲av综合av一去二区三区| 国产黄色主播网址大全在线播放| 天海翼亚洲一区在线观看| 日韩黄色在线观看网站上| 2020国产激情视频在线观看| 日本高清在线观看不卡视频| 国产成人在线观看视频播放| 天天操天天日天天插天天舔| 日韩激情文学在线视频| 日韩av电影中文在线免费观看| 91精品综合久久久久久五月天| 97cao在线视频| 精品免费一区二区三区四区视频| 亚洲天堂色综合久久| 亚洲成a人片777777张柏芝| 亚洲a区在线免费观看| 大香蕉在线欧美在线视频| 老色鬼精品视频在线观看播放| 日韩成人精品久久久免费看| 亚洲制服丝袜网站中文字幕| 天天插天天操天天射天天干| 免费成人av麻豆| 九十九步都是爱最后一步是尊严| 操人妻人妻天天爽天天偷| 白白色在线免费视频发布视频| www,日韩av,com| 亚洲人精品午夜射精日韩| 亚洲精品1卡2卡3卡| 精久久久久久久久久久久| 五月天天堂视频在线| 不卡在线一区二区三区| 伊人网在线欧美日韩在线| 国产免费久久精品99re丫丫| 少妇精品视频一区二区免费看| 亚洲成人五月婷婷久久综合| 国产精品剧情av在线播放| 中文人妻av一区二区三区| 不卡高清一区二区三区| 十八禁黄色免费污污污亚洲| 人妻视频网站快射视频网站| 成人av中文字幕在线看| 黄色片免费网站在线| 亚洲 偷拍 自拍 欧美| 正在播放麻豆精品一区二区| 荣立三等功退休有什么待遇| 黑人巨大精品一区二区在线| 青青青在线观看国产| 神马不卡视频在线视频| 亚洲黄色免费在线观看网站| 中字幕人妻熟女人妻a62v网| 国产av精品一区二区三区久久| 可以免费观看日韩av| 久久人妻诱惑我视频| yy4080黄色片| 福利在线国产小视频| 交换的一天中文字幕在线视频| 亚洲另类欧美综合久久| 99国产精品国产精品毛片19| 青娱乐这里只有精品| 亚洲国产日韩欧美一区二区三区,| 国产高清自拍偷拍在线| 亚洲av在线免费播放| 波多野结衣在线一区别| 新亚洲天堂男子av| 天天日天天干天天日天天干天天| 亚洲色视频在线播放网站| 久久一级片三上悠亚| 台湾18禁久久久久久久激情视频| 中文字幕人妻精品精品| 国产欧美福利在线观看| 免费24小时人妻视频| av在线中文字幕在线| 成人午夜麻豆大胆视频| 91精产国品一二三产区区别网站| 91香蕉国产亚洲一二三区| 最新久久这里只有精品| 啪啪啪网站免费在线看| 天天操天天舔天天做| 欧美日本亚欧在线观看| 河北全程露脸对白自拍| 偷拍熟女大胆免费视频| 最新国产午夜激情视频| 国产一区二区三区四区精| 欧美日本国产一区二区| 在线观看中文字幕少妇av| 欧美日韩不卡视频合集| 亚洲日本欧美韩国另类综合| 青青草原在线播放日韩| 久久久西西gogo日本美女人体| 久久久久久a女人处女| 美女网站视频久久精品| 黑人黄色免费一级av| 久久久久久高清一区| 妈妈的朋友2中文字幕在线| 久久无码高清免费视频| 国产午夜羞羞一区二区三区| 亚洲欧美激情久久久| 狂操鸡巴小骚逼视频免费观看| 日本电影一级人妻在线播放四区| 日本亚洲精品视频在线观看| 中文字幕观看中文字幕免费 | 91精产国品一二三产区区别网站 | 中文字幕 首页 人妻| 欧美第一激情综合网欧美激情| 五月的婷婷综合视频| 高清国产美女a一级毛片| 国产美女高潮精品视频| 日本韩国福利在线播放| 五月婷婷激情视频网| 两个人在一起靠逼啊啊啊| 日韩av水蜜桃一区二区三区| 91系列视频在线播放| 欧美男男在线观看视频网站| 日本高清 中文字幕| 欧美性感美女热舞视频| 亚洲a区在线免费观看| 内地精品毛片在线观看| xxnxx国产美女| 小妹妹爱大棒棒免费观看视频| 日本少妇人妻中文在线| 日韩男女视频网站在线观看| 在线播放 日韩 av| 亚成区一区二区人妻熟女| 在线免费观看欧美小视频| 午夜情色一区二区三区| 人妻少妇的va视频| 性高潮视频在线观看日韩| 亚洲一区二区精品三区视频| 熟女国内精品一区二区三区| 日韩精品欧美一区二区| 亚洲一区在线视频观看地址| 无码精品黑人一区二区老人| 国产黄色主播网址大全在线播放| 日本少妇三级交换做爰做| 日本一区二区三区调教性奴视频 | 亚洲精品综合欧美精品综合| 日韩激情文学在线视频| 精品国产无乱码一区二区三区 | 日本不卡视频一二三区| 日本人妻少妇xxxxxxx| 人妻熟女 亚洲 一页二页| 日本四十路人妻熟女| 69av精品国产探花| 在线观看视频免费一区二区三区| ysl蜜桃色7425| 青娱乐这里只有精品| 成人超碰一区二区三区| 大香焦一道本一区二区三区| 亚洲欧美成人午夜一区二区| 日本少妇熟女乱码一区二区| 国产精品国产三级在线高清观看| 婷婷色综合五月天视频| 欧美成人区一区二区三| 欧美精品一区二区三区观看| 亚洲黄色免费在线观看网站| 国产男女无套?免费网站下载| 午夜福利在线不卡视频| 福利美女视频在线观看| 一区二区在线观看视频观看| 在宿舍强奷两个清纯校花| 亚洲国产综合久久精品| 国产午夜羞羞一区二区三区| 亚洲欧美激情久久久| 黄色片免费网站在线| 日本福利片在线播放| 欧美在线视频不卡一区| 九九视频在线观看全部| 欧美国产精品久久久免费| 久久人人爽人人爽人人av东京热| 精品人妻 色中文熟女 oo| av无限看熟女人妻另类av| 97超碰人人爽人人做| 亚洲乱熟女一区二区三区山| 裸日本资源在线午夜| 视频自拍偷拍视频自拍| 亚洲另类激情视频在线看| 黑川堇人妻88av| 一二区二区不卡视频| 大屁股熟女一区二区视频| aa福利影视在线观看| 五月的婷婷综合视频| 国产最新av在线免费观看| 欧美大鸡吧男操女啊啊啊视频| 天堂av国产av伦理av| 正在播放麻豆精品一区二区| 中文在线字幕免费观看日韩视频| avjpm亚洲伊人久久| 日本福利网站一区二区| 男女插鸡巴视频软件| 亚洲中文字幕在线视频观看二区 | 男插女视频大全免费| 顶级欧美色妇4khd| 亚洲天堂男人的天堂| 国产福利一区二区三区在线观看| 性色蜜桃臀x88av天美传媒| 日本欧美视频在线免费| 午夜92福利1000| 中文字幕在线观看亚洲情色| 日韩欧美国产一区二区在线观看| 欧美成人屋影院在线视频观看| 国际日韩日韩日韩日韩日韩| 成人做爰av在线观看网站| 最新国产精品久久精品app| 天堂一区二区三区在线等| 熟妇人妻av无码中文字幕| 91色哟哟视频在线观看| 亚洲经典av中文字幕| 欧美黑人性猛交小矮人| 婷婷综合缴情亚洲五月伊人| 亚欧洲乱码视频一二三区| 99精品久久一区二区| 亚洲国产精品 久久久| 青青草一个释放的网站| 国产漂亮白嫩美女在线图片| 久久99嫩草99久久精品| 中文字幕 人妻 熟女| 亚洲成人自拍av在线| 日本一本午夜在线播放| 5d蜜桃臀女无痕裸感| 国产极品气质外围av| 一区二区三区高清视频3| 韩日一级人添人人澡人人妻精品| 啊~插得好快别揉我胸了视频| 美女把腿张开给男的捅| 国产经典精品欧美日韩| 无码人妻丰满熟妇区五路| av中文字幕国产精品| 最新国产精品综合网高清| jandara在线观看| 国产熟女五十路一区二区三区 | 国产亚洲精品啪啪视频| 制服丝袜 中文字幕 日韩| 99久9在线视频播放| 欧美在线观看一区二区不卡| 黄版视频在线免费观看| 中文乱码字幕人妻熟女人妻| 天天搞天天操天天干| 天堂在线中文字幕av| 国产igao激情在线视频入口| 午夜久久久久欠久久久久| 免费看一级高潮喷水片| 天天操天天舔天天爽| 五月在线视频免费播放91| 每日更新日韩欧美在线| 亚洲宅男噜噜噜66在线观看| 全彩漫画口工18禁| 麻豆午夜激情在线观看| 熟妇精品午夜久久久久| 亚洲国产日韩a在线欧美| jandara在线观看| 日韩人妻中文字幕区| 国产免费久久精品99re丫丫 | 91进入蜜桃臀在线播放| 久久sm人妻中出精品一区二区| 呻吟求饶的人妻中文字幕| 中文字幕熟女人妻一区| 亚洲成人激情在线综合| 色丁香久久激情综合网| 妈妈的朋友2中文字幕在线 | 欧美情色av在线观看| 天天干天天日天天弄| 日本熟妇乱妇熟色视频| 久久久久高潮白浆久久| 河北全程露脸对白自拍| 精品精品精品精品精品污污污污| 69久久夜色精品国产69乱电影| 鸡巴在里面福利视频在线观看| 婷婷色九月综合激情丁香| xxnxx国产美女| 99久久久久久亚洲精品免费| 瑟瑟干视频在线观看| 久久久久性感美女偷拍视频| 丰满少妇高潮喷水视频| 果冻麻豆一区二区三区| 欧美区日本区国产区| 1级黄色片在线观看| 自拍偷自拍亚洲精品10p| 亚洲欧美另类校园春色| 久久99精品热在线观看| 亚洲午夜熟女在线观看| 色视频免费观看网址| 人妻少妇的va视频| 亚洲一区二区三区国产精品电影| 欧美久久蜜臀蜜桃资源吧| 国产中年夫妇激情高潮| 亚洲中文字幕在线视频观看二区| 五月天天堂视频在线| 九色porny91国产| av网页免费在线观看| 午夜福利国产精品久久久久| 亚洲av激情综合网| 91久久久精品成人国产| 中文字幕中文字幕在线中…一区| 4438x亚洲最大的成人| 男人资源站中文字幕| 久草久热这里只有精品| 国产91九色视频在线观看| 亚洲精品色图1234| 美女激情久久久久久久| 日本一区二区三区调教性奴视频| 色视频在线播放免费观看| 熟女一区二区三区综合| 中出小骚货在线观看| 中文字幕在线免费观看人妻| 亚洲最大的自拍偷拍网| 18岁禁一二三区免费体验| 亚洲最大先锋资源采集站| 熟妇高潮久久久久久久| 操操操操操操操操操网| 亚洲美女午夜激情视频在线观看 | 99久9在线视频播放| 日本熟女0930视频| 国产漂亮白嫩美女在线图片| 青青操天堂在线观看视频| 2018中文字字幕人妻| 大乳丰满人妻中文字幕韩国hd| 成人十欧美亚洲综合在线| 久久99国产中文丝袜| 欧美日韩福利视频网| 77亚洲视频在线观看| 天堂一区二区三区在线等| 乌克兰美女操逼高清内射视频| 人妻少妇精品二三区| av无限看熟女人妻另类av| 精品欧美乱码久久久| 91在线九色porny| 中文字幕熟女人妻丝袜丝在线| 国产资源网站在线播放| 福利在线国产小视频| 55夜色66夜色亚洲精品| xxoo福利视频导航| 超碰在线免费观看视频97| 精产国品一二三77777| 亚洲av在线免费播放| 97精品久久久久久无码人妻 | 欧美一级特黄大片在线| 在线免费视频999| 国产av啊啊啊啊啊啊啊| 日本国产亚洲欧美色综合| 全彩漫画口工18禁| 少妇精品视频一区二区免费看| 免费高清av一区二区| 亚洲gay视频在线观看| 18在线观看免费观看| 婷婷色九月综合激情丁香| 开心激情五月天作爱片| 亚洲av中文免费在线| 亚洲国产日韩精品在线| 91超碰九色porny| 日韩一级欧美一级片| 妈妈的朋友2中文字幕在线| 91大神福利视频网| 日本免费人爱做视频在线观看不卡 | 久久人妻诱惑我视频| 日韩人妻中文字幕区| 黄色片免费网站在线| 区一区二区三免费观看视频| av一区二区三区蜜桃| 日本人妻熟妇丰满成熟HD系列| 日本亚洲午夜福利一区二区三区 | 超peng视频在线免费播放97| 男女爱爱好爽视频免费看| 午夜宅男电影av网站| 国产午夜羞羞一区二区三区| 伊人情人成综合视频| 亚洲熟女人妻自拍在线视频| 97精品人妻免费视频| 4日日夜夜精品视频免费| 视频免费在线观看网站| 国产一区两区三区福利小视频| 天天插天天透天天爽| 欧美国产精品久久久免费| 免费看超污视频在线观看| avtt中文字幕手机版| 亚洲人精品午夜射精日韩| 天天干夜夜操夜夜骑| 特级aaaaa黄色片| 亚洲av毛片在在线播放| 婷婷色综合五月天视频| 极品内射老女人操逼视频| 熟女人妻aⅴ一区二区三| 色欲AV亚洲AV无码精品| 欧美在线观看视频欧美| 中文字幕熟女人妻丝袜丝在线| 午夜3p福利视频合集| 69xx精品久久久久| 亚洲精品乱码久久久久app| 国产精品内射婷婷一级| 川上优所有中文字幕在线| 人妻女侠被擒受辱记| 97人妻在线视频自拍| 一区二区三区高清视频3| 亚洲成人偷拍自拍在线| 欧美亚洲愉拍一区二区三区| 东北老女人熟女啪啪视频| 男人的天堂在线2025| 伊人久久综合国产精品| 人妻被强av系列一区二区| 老司机在线视频福利观看| 69久久夜色精品国产69乱电影| 老牛影视在线一区二区三区| 中文字幕日本一二三区| 青娱乐这里只有精品| 可以直接看av网站| 亚洲色视频在线播放网站| 国产清纯一区二区在线观看| 亚洲美女黄色福利视频网站大全| 欧美日韩亚洲tv不卡久久| 亚洲黑人欧美二区三区| 青娱乐免费视频一二三| 亚州av嫩草av极品在线观看| 日日夜夜免费视频精品| 五月在线视频免费播放91| 成人av中文字幕在线看| 一区二区三区 国产日韩欧美| 最新日韩中文字幕啪啪啪| 放荡人妻极品少妇全集| 伊人情人成综合视频| 99久久久久久亚洲精品免费| 91精品国产成人久久久久久| 亚洲成人av在线一区二区| 亚洲美女午夜激情视频在线观看| 一区二区三区午夜福利在线| 老色鬼精品视频在线观看播放| 1区3区4区产品乱入视频| 福利在线国产小视频| 国产精品蝌蚪自拍视频| 国产精品无码无卡免费观| 国产精品久久久久精品三级18| 日韩精品欧美一区二区| 亚洲激情视频在线观看免费| 亚洲在线免费观看18| ysl蜜桃色7425| 99久久久久久亚洲精品免费| 欧美亚洲愉拍一区二区三区| 欧美日本在线免费视频| 久久内射天天玩天天懂色| 网友自拍第一页99热| 人妻在线中文视频视频| 女人高潮潮呻吟喷水网站| 熟妇高潮久久久久久久| www,日韩av,com| 中文字幕福利视频第四页| 啪啪啪网站免费在线看| 91精品国产欧美在线| 少妇熟女天堂网av| 98热视频精品在线观看| 欧美日韩成人高清中文网| 99久久国产精品免费消防器材| 天天操天天搞天天操| 国产黄色主播网址大全在线播放 | 亚洲欧美韩国日本一区二区| 蜜乳视频一区二区三区| 亚洲人人爽人人澡起碰av| 放荡人妻极品少妇全集| 久久99久久99久久97的人| 五月在线视频免费播放91| 99久久99九九九99九| 五月的婷婷综合视频| 538欧美在线观看一区二区三区| av在线男人的天堂亚洲| ysl蜜桃色7425| 天天爽天天操天天插| 亚洲高清一区二区三区久久| 午夜美女福利视频在线| 亚洲国产精品一区51动漫| 一区二区在线观看视频网站| 人妻免费视频黄片在线视频| 日韩一级欧美一级片| 青青操天堂在线观看视频| 欧美日韩黄片免费在线观看| 青青草一个释放的网站| 韩国一级片最火爆中文字幕| 丰满少妇人妻一区二区三区蜜桃| 女人扒开逼让男人操| 91 精品视频在线看| 中文字幕熟女人妻丝袜丝在线| 熟女人妻aⅴ一区二区三| 日本五六十路熟女视频| 成人大片男人的天堂| 亚洲gay视频在线观看| 麻豆午夜激情在线观看| 久久久久久高清一区| 免费24小时人妻视频| 大尺度av毛片在线网址| 91精品一区一区三区| 日韩人妻中文字幕二区 | 丰满少妇人妻一区二区三区蜜桃 | 丰满少妇人妻一区二区三区蜜桃 | 天天日 天天舔 天天射| 狠狠操狠狠操狠狠插| 91超精品碰国产在线观看| 高清国产美女a一级毛片| 九热精品视频在线观看| 欧美日韩黄片免费在线观看| 亚洲国产精品自产拍在线观看| 色丁香久久激情综合网| 国产毛片特级Av片| 日韩黄色在线观看网站上| 中出小骚货在线观看| 老熟女 露脸 嗷嗷叫| 国产精品中文字幕丝袜| 岛国av成人午夜高清| 国产伦理二区三区在干嘛呢| 亚洲 偷拍 自拍 欧美| 92麻豆一区二区三区| 午夜亚洲国产精品中字| 污网址在线观看视频| 91久久精品美女高潮喷水白浆| 久99久视频免费观看中文字幕| 正在播放麻豆精品一区二区| 青青操天堂在线观看视频| 亚洲国产精品一区51动漫| 第一福利视频在线观看| 国产漂亮白嫩美女在线图片| 日本少妇三级交换做爰做| 92在线播放观看视频| 国产乱码有码一区二区三区| 日韩人妻精品久久久久| 天天操,天天射,天天爽| 成人免费视频现网站99在线观看| 男女啪啪啪网站在线观看免费| 新亚洲天堂男子av| 欧美人与动欧交视频| 97精品视频,全部免费| 玖玖资源站在线观看亚洲| 丰满少妇_区二区三区| 内地精品毛片在线观看| 四虎精品久久免费最新| 中文字幕久久久国产| 国产视频1区2区3区| 天天干天天弄天天日| 3344永久在线观看视频下载| 少妇被中出一区二区| 国产 少妇 一区二区| 视频在线+欧美十亚洲曰本| 国产白丝一区二区三区av| 亚洲欧美精品日韩偷拍| 青青免费观看视频| 国产精品午夜无码AV体验区| 国产农村乱子伦精精品视频| 一区二区三区资源视频| 9久re热视频在线精品| 中文字幕欧美一区二区视频| 黑人巨大精品一区二区在线| 狠狠干狠狠操免费视频| 精产国品一二三产品区别91| 日本福利网站一区二区| 午夜精品一区二区三区不卡顿 | 夜夜人人干人人爱人人操| 日本一道中文字幕99| 国产精品久久人人添| 9999久久久久老熟妇二区| 五月的婷婷综合视频| 日本一区二区三区的资源| 亚洲国产美女主播在线观看| 日本少妇精品免费视频| 东京热日韩av影片| 激情久久在线免费观看视频| 人人妻人人澡人人爽97| 汤姆提醒30秒中转进站口| 欧美熟女xx00视频| 人人妻人人爽人人爽欧美一区| 黄色av网址在线播放| 大成色亚洲一二三区| 欧美日韩亚洲国产视频二区| 欧美日韩亚洲国产视频二区| 中文字幕久久久国产| 国产熟女五十路一区二区三区 | 人人妻人人狠人人爽| 天堂av国产av伦理av| 99国产精品久久99久久久| 蜜臀久久精品久久久久久av| 亚洲av网站一区二区三区| 欧洲成熟女人色惰片| 日本少妇熟女乱码一区二区| 天天干夜夜操91视频网站| 亚洲人成小说网站色| 人人妻人人爽人人摸| 天天搞天天操天天干| 伦理在线观看未删减中文字幕| 黄色av 在线观看| 三区美女视频在线观看| 桃色成人开心激情网| 国产一级一国产一级毛片| 亚洲激情噜噜噜久久久| 黑人黄色免费一级av| 97香蕉久久国产超碰| 夫妻黄色一级性生活片| 日本福利片在线播放| 91麻豆精品国产在线| 偷拍欧美日韩另类图片| 欧美aaaa性bbbbaaaa| 美国十次了亚洲天堂网国产| 伊人网在线观看 视频一区| 亚洲AV无码久久精品国产一区老| 成年人黄色日本视频| 亚洲国产日韩a在线欧美| 国产黑色丝袜 在线日韩欧美| 污网址在线观看视频| 青娱乐不卡视频在线| 国产天堂av不卡网| 欧美精品乱码99久久蜜桃免费 | 成人做爰av在线观看网站| 男女啪啪啪啪91av日韩| 欧美插插插插插插| 18禁男女啪啪啪无遮挡| 污网址在线观看视频| 午夜在线观看一级毛| 男人和女人的逼视频| 99福利一区二区视频| 国产女人18毛片水真多精选| 熟女一区二区三区综合| 丰满少妇人妻一区二区三区蜜桃| 凹凸视频一区二区在线观看| 天天干天天色综合久久| 亚洲经典av中文字幕| 欧美日韩在线观看免费播放| 91福利高清在线播放| 欧美亚洲另类精品第一页| 日韩欧美黄色免费网站| 高清国产美女a一级毛片| 欧美亚洲精品色图网站| 天天夜夜久久精品综合| 久久久久夜色国产精品电影| 人妻系列中文字幕大乳丰满人妻| 女同性恋av在线播放| 91色乱一区二区三区| 视频自拍偷拍视频自拍| 人妻被强av系列一区二区| 女人扒开逼让男人操| 福利在线国产小视频| 91在线九色porny| 久久午夜免费鲁丝片| 鸡巴在里面福利视频在线观看| 精品欧美乱码久久久| 99国产精品久久99久久久| 亚洲国产精品久久久久久无码| 黑吊操欧美极品美女| 91精品久久久久久久99蜜月| 不卡在线一区二区三区| 日韩A级毛片免费视频| 少妇被中出一区二区| 99在线视频精品观看高| 欧洲成熟女人色惰片| 97精品国产91久久久| 天天爽天天操天天插| av男人站在线观看| 欧美不卡一二三区精品| 五月的婷婷综合视频| 青青草原在线播放日韩| 色网站在线观看免费| 午夜福利国产精品久久久久| 伊人精品成人综合网| 男女69视频在线观看免费| av天堂新资源在线| 亚洲最强的25个城市| 五十岁熟妇高潮喷水| 熟女人妻aⅴ一区二区三| 人妻免费视频黄片在线视频| 狂操鸡巴小骚逼视频免费观看| 91九色91在线视频| 久久久亚洲熟女一区二区| 99福利一区二区视频| 午夜呻吟亚洲精品中文字幕在上面| 99久久久久久久久久久久久| 一区二区三区高清视频3| 日本a级2020在线观看| 国内自拍第一区二区三区| 午夜野花视频在线观看| 久久人人爽人人爽人人av东京热| 不卡一区二区视频在线| 四虎精品久久免费最新| 天天看天天爱天天日| 性感美女人妻久久久| 91九色尤物无套内射| 日韩人妻一区二区三区在线观看 | 亚洲av日韩久久网站| 天天在线播放日韩av| 午夜国产精品免费视频| 九一精品人妻一区二区三区| 免费24小时人妻视频| 精品国产人伦一区二区三区| 国产igao激情在线视频入口| 在线成人教育平台排名| 啪啪啪网站免费在线看| 亚洲人精品午夜射精日韩| 91人妻人人做人人爽高清| 91九色尤物无套内射| 日韩一级视频一区二区三区| 青青在线免费手机播放视频| 奇米网首页神马久久| 中文字幕亚洲乱码精品无限| 97人妻在线视频自拍| 午夜8050免费小说| 69av精品国产探花| 92午夜免费福利视频www| 男生用大肌巴操美女骚穴| 五十岁熟妇高潮喷水| 欧美大鸡吧男操女啊啊啊视频| 乌克兰美女操逼高清内射视频| 亚洲激情噜噜噜久久久| 天天看片天天摸天天操| 少妇被粗大的猛进69视频| 亚洲国产综合久久精品| 中文字幕麻绳捆绑的人妻| 亚洲全国精品女人久久久| 天天操天天日天天碰| 亚洲a级视频在线播放| 97人妻在线视频自拍| 77亚洲视频在线观看| 亚洲一区二区三区国产精品电影| 蜜臀久久精品久久久久久av| 欧美成人屋影院在线视频观看| 久久久久久高清一区| 人妻系列中文字幕大乳丰满人妻 | 日本一道中文字幕99| 美女欧美视频在线观看免费| 一区二区三区资源视频| 中文字幕亚洲乱码精品无限| 日本成年视频在线免费观看| 欧美亚洲愉拍一区二区三区| jiee日本美女视频网站| 国产高清视频www夜色资源| 午夜久久久久久av五月| 精品高潮呻吟久久av| 久久人妻人人草人人爽| 夜夜人人干人人爱人人操| 日本黄色一级电影网址| av天堂a亚洲va天堂va里番| 黄色av网址在线播放| jizzjizz国产精品传媒| 一区二区三区av免费天天看| 日本清纯中文字幕版| 日韩人妻中文字幕区| 亚洲三级综合在线观看| 大成色亚洲一二三区| 日韩久久九九精品视频| 另类欧美激情校园春色| 性色蜜桃臀x88av天美传媒| 国产视频成人一区二区| 成熟了的熟妇毛茸茸| a级黄片免费观看| 日日夜夜免费视频精品| 狠狠操av一区二区三区| 大香蕉在线欧美在线视频| av一区二区三区蜜桃| av无限看熟女人妻另类av| 久久久久久免费观看av| 欧美亚洲另类精品第一页| 精品视频在线观看免费99| 搞乱在线在线观看视频| 久久久久性感美女偷拍视频| 久久99精品热在线观看| 91在线九色porny| 最近在线中文字幕免费| 韩国在线播放一区二区三区| 九十九步都是爱最后一步是尊严 | 大香蕉尹人在线最新| 极品风骚人妻3p视频|