欢迎您访问高等教育自学考试信息服务网平台!

这可能是讲分布式系统最到位的一篇文章

更新时间:2023-11-22 13:50:12作者:自考教育网

网站是一个应用程序。当系统压力很大时,只能横向扩展,增加多个服务器或容器来平衡负载,避免单点故障影响整个系统。

集中化最明显的好处就是开发、测试、运维会更方便,不用考虑复杂的分布式环境。

这可能是讲分布式系统最到位的一篇文章

缺点也很明显。系统庞大而复杂,难以扩展和维护,所有应用程序每次都必须更新。

集中式拓扑图

鉴于集中式系统的缺点,分布式系统应运而生。分布式系统的背后是由一系列计算机组成的,但用户无法感知背后的逻辑。就像访问单机一样,自然避免了单机故障的问题。

应用可以按照业务类型拆分成多个应用或服务,再按照结构分为接口层和服务层。

我们还可以根据接入定义不同的接口应用入口分,比如移动端和PC端。数据库可以按业务类型拆分成多个实例,单个表可以分为数据库和表。同时增加了分布式缓存、消息队列、非关系数据库、搜索等中间件。

分布式系统虽然好,但是增加了系统的复杂性,比如分布式事务、分布式锁、分布式会话、数据一致性等。这些都是分布式系统中需要解决的难题。

分布式系统也增加了开发和测试运维的成本,工作量增加。如果管理不好,就会变成负担。

分布式拓扑图

分布式系统的核心是分布式服务框架。有了分布式服务框架,我们只需要关注各自的业务,而不需要关注那些复杂服务之间的调用过程。

分布式服务框架

目前业内流行的分布式服务框架有:阿里的Dubbo和Spring Cloud。

与其在这里比较这些分布式服务框架,简单的说一下他们都做了什么,可以让我们使用远程服务像调用本地服务一样简单高效。

服务

服务是一个可以向用户输出功能的模块。基于该技术框架,可以满足用户的需求。

例如,日志服务、权限管理服务、后台服务、配置服务、缓存服务、存储服务、消息服务等。这些服务可以灵活组合或独立运行。

该服务需要一个接口来与系统交互。面向服务的开发应该是分别开发服务,再组合起来。

更直接的例子有:历史详情、留言板、评论、评级服务等。它们可以独立运作,也可以组合成一个整体。

注册中心

注册中心在整个分布式系统的集成中起着最重要的作用。它支持对等集群,需要提供CRUD接口,支持订阅和发布机制,要求非常高的可靠性。一般以Zookeeper集群作为注册中心。

在分布式环境中,服务提供商的服务将部署在多个服务器上,每个服务器将提供服务提供商标识、服务列表、地址、对应端口、序列化协议等信息。到注册表。

注册中心记录服务和服务地址之间的映射关系。通常,一个服务对应多个地址。这个过程称为服务发布或服务注册。

服务调用者将获得所需服务的信息(地址和端口信息、序列化协议等。)根据服务标识符和服务列表从注册表中下载,服务列表将在本地缓存。

当一个服务需要调用其他服务时,直接在这里找到该服务的地址并调用。这个过程称为服务发现。

注册中心

下面是一个以Zookeeper为注册中心的简单实现:

/* * *创建一个节点node * @ param node * @ param data */PublicBooleanCreateNode(string node,string data){ try { byte[]bytes=data . getbytes();//同步创建临时顺序节点字符串path=ZK . Create(ZK constant . ZK _ RPC _ data _ path '/' node '-'bytes,zoodefs.ids.open _ ACL _ unsafe,Create mode . periodic _ sequential);log . info(' createzookeepernode({ }={ })'path,data);} catch(KeeperExceptione){ log . error(' 'e);返回false;} catch(InterruptedExceptionex){ log . error(' 'ex);返回false;} returntrue}

C

比如下面Zookeeper中写的临时顺序节点信息:

com . black . black RPC . test . hello word:发布服务时的外部名称。0000000010,000000011: ZK顺序节点ID。27.0.0.1: 8888,127.0.0.1:8889:服务地址端口。Protostuff:序列化方法。1.0:权重,使用负载均衡策略。

这里使用了Zookeeper的临时顺序节点。为什么使用临时顺序节点?主要考虑以下两点:

当服务提供者异常离线时,与Zookeeper的连接会中断,Zookeeper服务器会主动删除临时节点,并同步到服务消费者。这将防止服务消费者请求不正常的服务器。注意:在实际请求发出之前,一般消费者会心跳当前获取的服务提供者节点,以避免请求连接有问题的节点。在Zookeeper下,不允许创建两个同名的ZK子节点。您可以避免按顺序节点创建相同的名称。当然也可以不用顺序节点,直接用com . black . black RPC . test . hello word创建节点,在这个节点下创建数据节点。

以下是ZK的数据同步过程:

/* * *同步节点(通知模式)*syncNodes会级联,每个观察器触发后都会挂起一个新的观察器。完成了类似chain trigger */publicbooleansycnodes(){ try { list nodelist=ZK . get children(ZK constant . ZK _ RPC _ data _ path,new watcher(){ @ Overridepublicvoidprocess(WatchedEventevent){ if(Event . gettype()==Event。event type . node children changed){ sync nodes();}}});map map=new hashmap();for(string node:nodeList){ byte[]bytes=ZK . get data(ZK constant。ZK _ RPC _数据_路径'/'节点,false,null);Stringkey=node.substring(0,node.lastIndexOf(ZkConstant。定界_标记));Stringvalue=newString(字节);object object=map . get(key);如果(反对!=null){((列表)对象)。添加(值);} else { list datalist=newArrayList();dataList.add(值);map.put(key,dataList);}log.info('node:[{}]data:[{}]'node,new string(bytes));}/* *修改连接的地址缓存*/if(map util . not empty(map)){ log . debug('调用ServiceCacheUpdating . ');invokingservicecache . updatainvokingservicemap(map);} returntrue} catch(keeper exception | interrupted exception){ log . error(e . tostring());返回false;}}

本地同步数据时,一般会写入本地文件,防止因Zookeeper集群异常离线而获取服务提供者信息。

通信协议

服务消费者需要有一个网络连接来传输数据,无论是与注册中心还是与服务提供者,这涉及到通信。

作者以前做过这项工作。当时用java BIO只是简单的写一个通讯包。在使用场景中没有太多的并发性,阻塞BIO也没有暴露太多的问题。

Java会在建立连接后阻止线程等待数据,这必须是以线程对线程的方式,即当客户端有连接请求时,服务器需要启动一个线程进行处理。连接数太多时,会建立相当数量的线程,性能会直线下降。

Java NIO:同步是非阻塞的,服务器实现方式是一个请求一个线程,即客户端发送的所有连接请求都注册在复用器上,复用器只在轮询到I/O请求连接时才启动一个线程进行处理。

Java AIO:异步和非阻塞。服务器实现模式是一个线程一个有效请求。客户端的I/O请求都是由OS先完成,然后通知服务器应用启动线程进行处理。

BIO、NIO和AIO的适用情景分析;

BIO:用于连接数量少且固定的体系结构。这种方法对服务器资源的要求很高,其并发性仅限于应用程序,但程序直观、简单、易懂。NIO:适用于连接数量多,连接短(轻操作)的架构,比如聊天服务器。并发仅限于应用程序,编程也很复杂。目前主流的通信框架Netty、Apache Mina、Grizzl、NIO框架都是基于它们的实现。AIO:用于连接数量多、连接长(操作量大)的架构,比如镜像服务器、文件传输等。完全调用OS参与并发操作,编程复杂。

作为基石的通讯,其实要考虑很多东西。如:丢包粘包的情况,心跳机制,断连重连,消息缓存重发,资源的优雅释放,长连接还是短连接等。

下面是妮蒂建立服务端,客户端的简单实现:

进口。妮蒂。自举。服务器引导;导入io。妮蒂。渠道。通道初始值设定项;导入io。妮蒂。渠道。渠道管道;进口。妮蒂。渠道。事件循环组;进口。妮蒂。渠道。nio。nioeventloopgroup进口。妮蒂。渠道。插座。套接字通道;进口。妮蒂。渠道。插座。nio。nioserversocketchannel进口。妮蒂。处理程序。编解码器。lengthfieldbasedframedecoder进口。妮蒂。处理程序。编解码器。lengthfieldpender进口。妮蒂。处理程序。编解码器。字节。bytearraydecoder导入io。妮蒂。处理程序。编解码器。字节。bytearray编码器;导入组织。slf4j。记录者;导入组织。slf4j。伐木工厂;/****nettytcp服务端* @ authorv _王诗雨* */publicsclassnettytcpservice { privatestaticfinalllogger=记录器工厂。获取记录器(nettytcpservice。类);privateStringhostprivateintportpublicNettyTcpService(string address)引发异常{ string str[]=address。拆分('');这个。host=str[0];这个。端口=整数。(str[1])的值;} publicNettyTcpService(string ghost,int port)抛出异常{ this。主机=主机;this.port=port}/**用于分配处理业务线程的线程组个数*/privatestaticfinalintBIZGROUPSIZE=runtime。获取运行时().可用的处理器()* 2;//默认/**业务出现线程大小*/privatestaticfinalintBIZTHREADSIZE=4;/**NioEventLoopGroup实际上就是个线程,*NioEventLoopGroup在后台启动了n个NioEventLoop来处理频道事件,*每一个NioEventLoop负责处理m个频道,*NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理channel */privatestaticfinaletloopgroupsgroup=newNioEventLoopGroup(BIZGROUPSIZE);privatestaticfinalevontloopgroupworgroup=newNioEventLoopGroup(BIZTHREADSIZE);public void start()引发异常{ log。信息(' NettyTcpServiceRun . ');ServerBootstrapb=newServerBootstrap();b.group(老板集团、工人集团);b .通道(nioserversocketchannel。类);b .子处理程序(newChannelInitializer(){ @ overridepublicvoinitchannel(socket channel ch)抛出异常{ ChannelPipelinepipeline=ch。管道();pipeline.addLast('frameDecoder 'newLengthFieldBasedFrameDecoder(Integer .MAX_VALUE,0,4,0,4));pipeline.addLast('frameEncoder 'newlengthfieldprender(4));pipeline.addLast('decoder 'newByteArrayDecoder());pipeline.addLast('encoder 'newByteArrayEncoder());//管道。添加last(新编码器());//管道。添加last(new decoder());管道。添加last(newTcpServerHandler());}});绑定(主机,端口)。sync();日志。info(' NettyTcpServiceSuccess!');}/***停止服务并释放资源*/public vid shut down(){ worker group。优雅关机();boss团。优雅关机();} }导入组织。slf4j。记录者;导入组织。slf4j。伐木工厂;进口。妮蒂。渠道。channelhandlercontext进口。妮蒂。渠道。simplechannelinboundhandler/***服务端处理器*/public class tcpserverhandlerextendssimplecineboundhandler { privatestaticfinallogerlog=记录器工厂。获取记录器(TCP服务器处理程序。类);@ overrideprotectedvoidchannelread 0(ChannelHandlerContextctx,Objectmsg)抛出异常{ byte[]data=(byte[])msg;}}

导入组织。slf4j。记录者;导入组织。slf4j。伐木工厂;进口。妮蒂。自举。自举;导入io。妮蒂。渠道。渠道;导入io。妮蒂。渠道。通道初始值设定项;导入io。妮蒂。渠道。频道开发;导入io。妮蒂。渠道。渠道管道;进口。妮蒂。渠道。事件循环组;进口。妮蒂。渠道。nio。nioeventloopgroup进口。妮蒂。渠道。插座。nio。niosocketchannel进口。妮蒂。处理程序。编解码器。lengthfieldbasedframedecoder进口。妮蒂。处理程序。编解码器。lengthfieldpender进口。妮蒂。处理程序。编解码器。字节。bytearraydecoder导入io。妮蒂。处理程序。编解码器。字节。bytearray编码器;进口。妮蒂。util。并发。未来;/***nettytcp客户端* @ authorv _王诗雨* */public class nettytcpclient { privatestaticfinalllogger=logger工厂。获取记录器(nettytcpclient。类);privatestringhostprivateintportprivatebootstrapbootstrapprivatechannelchannelprivateeventloopgroupgublicnettycpclient(string ghost,int port){ bootstrap=get bootstrap();channel=getChannel(主机,端口);this . host=host this . port=port } publicstringethost(){ return host;} publicintgetPort(){返回端口;}/***初始化bootstrap * @ return */publicfinalBootstrapgetBootstrap(){ group=newNioEventLoopGroup();bootstrapb=newBootstrap();b .集团(集团)。频道(niosocketchannel。类);b . handler(newChannelInitializer(){ @ OverrideprotectedvoidinitChannel(channel ch)抛出异常{ ChannelPipelinepipeline=ch。管道();//管道。添加last(新编码器());//管道。添加last(new decoder());pipeline.addLast('frameDecoder 'newLengthFieldBasedFrameDecoder(Integer .MAX_VALUE,0,4,0,4));pipeline.addLast('frameEncoder 'newlengthfieldprender(4));pipeline.addLast('decoder 'newByteArrayDecoder());pipeline.addLast('encoder 'newByteArrayEncoder());pipeline.addLast('handler 'newTcpClientHandler());}});乙。选项(频道开发.SO_KEEPALIVE,true);returnb}/***连接,获取channel * @ param host * @ param port * @ return */publicfinalChannelgetChannel(string ghost,int port){ channel channel=null;试试{channel=bootstrap.connect(主机,端口)。同步()。channel();returnchannel} catch(异常一){ log。信息(字符串。格式('连接服务器(IP[%s],端口[% s])失败!'主机、端口));returnnull}}/***发送消息* @ paramsg * @ throws exception */publicbooleansendMsg(Objectmsg)抛出异常{ if(channel!=null){ channel。writeandflush(msg).sync();日志。debug(' msgflushcsuccess ');return true } else { log . debug(' msgflush fail,connect is null ');返回假的;}}/***连接断开*并且释放资源* @ return */publicbooleandelinkconnect(){//channel。关闭().awaitUninterruptibly();未来未来=集团。关机优雅();//优雅地关闭释放所有资源,并且关闭所有当前正在使用的渠道未来。sync interruptible();returntrue} }导入组织。slf4j。记录者;导入组织。slf4j。伐木工厂;进口。妮蒂。渠道。channelhandlercontext进口。妮蒂。渠道。simplechannelinboundhandler/***客户端处理器*/public class tcpclienthandler rextendssimplicenelinboundhandler { privatestaticfinalllogger log=logger factory。获取记录器(tcpclienthandler。类);@ overrideprotectedvoidchannelread 0(ChannelHandlerContextctx,Objectmsg)抛出异常{ byte[]data=(byte[])msg;}}

说到通讯就不能不说协议,通信时所遵守的规则,访问什么,传输的格式等都属于协议。

作为一个开发人员,应该都了解传输控制协议协议,它是一个网络通信模型,以及一整套网络传输协议家族,是互联网的基础通信架构。

也都应该用过Http(超文本传输协议),Web服务器传输超文本到本地浏览器的传送协议,该协议建立在传输控制协议协议之上。分布式服务框架服务间的调用也会规定协议。

为了支持不同场景,分布式服务框架会存在多种协议,如杜博就支持七种协议:杜博协议(默认),RMI协议黑森协议,HTTP协议,网络服务协议,节俭协议,Memcached协议,Redis协议每种协议应对的场景不尽相同,具体场景具体对待。

服务路由

分布式服务上线时,都是集群组网部署,集群中会有某个服务的多个实例。消费者如何从服务列表中选择合适的服务提供者进行调用涉及到服务路由。分布式服务框架需要能够满足用户灵活的路由需求。

透明路由

许多开源RPC框架调用者需要配置服务提供者的地址信息。虽然通过读取数据库中的服务地址列表可以避免硬编码的地址信息,但消费者仍然要感知服务提供者的地址信息,这违反了透明路由原则。

基于服务注册中心的服务订阅发布,消费者可以通过主动查询和被动通知的方式获取服务提供者的地址信息,而不需要硬编码。

你只需要知道当前系统发布了哪些服务,而不需要知道服务存在于哪里。这是透明路由。

负载均衡

负载均衡策略是服务的一个重要属性。分布式服务框架通常提供多种负载平衡策略,并支持用户扩展负载平衡策略。

随意

一般在对等集群组网中,采用随机算法进行负载均衡,随机路由算法的消息分布比较均匀。JDK提供的java.util.Random或java.security.SecureRandom用于在指定服务提供商列表中生成随机地址。

基于消费者随机生成的服务提供商地址的远程调用;

/* * * Random */PublicClassRandomStrategy implements strategy { @ overridepublicremoteservicebase Select(list list){ int max _ len=list . size();intindex=random util . nextint(MAX _ LEN);return list . get(index);}}

还是有随机性的缺点,部分节点碰撞的概率较高。另外,硬件配置不同时,各个节点的负载也会不均衡。

要避免这些问题,需要对服务列表进行加权,性能好的机器接收请求的概率要高于普通机器:

/* * * Weighted Random */publiclassweighting randomstrategyimplementstrategystrategystrategy { @ overridepublicremoteservicebasesiselect(list){//存储服务提供者的加权列表listruthinglist=newArrayList();for(RemoteServiceBase:list){//放大10倍int weight=(int)(RemoteServiceBase。getweight()* 10);for(inti=0;iweightI){ weighting list . add(remoteServiceBase);} } int max _ LEN=weighting list . size();intindex=random util . nextint(MAX _ LEN);returnweightinglist . get(index);}}

投票

逐个请求服务地址,到达边界后继续回绕。缺点:缓慢的提供者积累请求。

比如二机慢,但是不挂。当我要求第二台机器时,我被卡住了。随着时间的推移,所有的请求都会在第二台机器上被阻塞。

轮询策略的实现非常简单。依次遍历服务提供商列表。到达边界后,复位至零,序列循环继续:

/* * * Polling */publicsclasspollingstrategy implements strategy {//Counter privateinindex=0;privateLocklock=newReentrantLock();@ OverridepublicRemoteServiceBaseselect(list list){ RemoteServiceBaseservice=null;try{lock.tryLock(10,TimeUnit。毫秒);//如果计数大于服务提供者的数量,则将计数器返回0 If(index=list . size()){ index=0;} service=list . get(index);指数;} catch(interrupted exception e){ e . printstacktrace();}最后{ lock . unlock();}//底出,保证程序的健壮性。如果没有获得服务,直接取第一个if(service==null){ service=list . get(0);} returnservice}}

对于轮询,您需要为服务地址增加权重:

/* * *加权轮询*/PublicClassWeighting轮询策略实现Strategy {//Counter privateinindex=0;//计数器锁privatelocklock=newreentrantlock();@ OverridepublicRemoteServiceBaseselect(list list){ RemoteServiceBaseservice=null;try{lock.tryLock(10,TimeUnit。毫秒);//存储加权服务提供者列表listhudisinglist=new ArrayList();for(RemoteServiceBase:list){//放大10倍int weight=(int)(RemoteServiceBase。getweight()* 10);for(inti=0;iweightI){ weighting list . add(remoteServiceBase);} }//如果计数大于服务提供者的数量,则将计数器返回0 If(index=weighting list . size()){ index=0;} service=weighting list . get(index);指数;returnservice} catch(interrupted exception e){ e . printstacktrace();}最后{ lock . unlock();}//确保程序是健壮的。如果没有得到服务,直接取第一个return list . get(0);}}

服务呼叫延迟

缓存消费者的所有服务提供商的呼叫延迟,并定期计算服务的平均呼叫延迟。

然后计算服务调用延迟与每个服务提供者平均延迟的差值,并根据差值动态调整权重,以保证服务延迟大的服务提供者收到的消息少,防止消息堆积。

该策略的特点是:保证处理能力强的服务接收到更多的消息,通过动态权重分配消除服务调用延迟的振荡范围,使所有服务的调用延迟接近平均值,实现负载均衡。

一致散列

具有相同参数的请求总是被发送到统一服务提供商。当一个服务提供商关闭时,最初发送给根提供商的请求基于虚拟节点与其他提供商平均共享,这不会引起剧烈的变化。平台提供默认的虚拟节点数量,虚拟节点数量可以通过配置文件修改。

一致哈希环的工作原理如下图所示:

一致散列

路由规则

负载均衡只能保证服务提供者的压力平衡,但在一些业务场景下,需要设置一些过滤规则,常用的是基本表达式的条件路由。

通过IP条件表达式配置黑白名单访问控制:consumerIP!=192.168.1.1。

只有一些服务提供者被暴露,以防止该集群中的所有服务被淘汰,从而导致其他服务不可用。

例如providerIP=192.168.3*。读写分离:方法=find *、list *、get *、query *=提供者IP=192.168.1.前景与背景分离:app=web=ProviderIP=192.168.1。app=Java=ProviderIP=192.168.2。灰度:将WEB前台应用到新的服务版本:app=WEB app=WEB=provice rip=192 . 168 . 1。*.*.

序列化和反序列化

将对象转换为字节序列的过程称为序列化,将字节序列恢复为对象的过程称为反序列化。

调用过程时,我们需要先序列化Java对象,然后通过网络和IO传输。当我们到达目的地时,我们将对其进行反序列化以获得所需的结果对象。

在分布式系统中,传输的对象会很多,这就需要一种序列化速度快、字节序列小的序列化技术。

序列化技术:Serializable,XML,Jackson,MessagePack,FastJson,Protocol Buffer,Thrift,Gson,Avro,Hessian等。

Serializable是Java自己的序列化技术,不能跨平台,序列化和反序列化的速度比较慢。

XML技术具有良好的跨平台支持,常用于与银行交互的消息,但其字节序列较大,不适合分布式通信框架。

FastJson是一个用Java语言编写的高性能Json处理器,由阿里巴巴开发。它的字节序列是JSON字符串,可读性好,序列化速度非常快。

协议缓冲区的序列化速度非常快,字节序列也很小,但可读性较差。

通用分布式服务框架将有多种内置的序列化协议可供选择。例如,Dubbo支持的七种协议使用不同的序列化技术。

服务调用

在本地环境下,使用一个接口很简单,直接调用就可以了。在分布式环境中,事情就没那么简单了。消费者只有接口的定义,没有具体的实现。

如果要在本地环境中直接调用远程接口,就要费点功夫,需要使用远程代理。

这是我偷的照片:

远程代理

通信顺序如下:

通信时序

使用者没有特定的实现,所以它需要在调用接口时动态创建一个代理类。在与Spirng集成的情况下,它在构建Bean时被直接注入到代理类中。

下面是如何构建代理类:

import Java . lang . reflect . proxy;publicsclassjdkproxy { publicstaticObjectgetInstance(Class cls){ JdkMethodProxyinvocationHandler=newJdkMethodProxy();ObjectnewProxyInstance=proxy . newproxyinstance(cls . get class loader(),newClass[]{cls},invocation handler);return(Object)newProxyInstance;}}

import Java . lang . reflect . invocation handler;import Java . lang . reflect . method;publiclassjdkmethodproxymplementsinvocationhandler { @ OverridepublicObjectinvoke(Object proxy,Methodmethod,Object [] parameters)抛出Throwable {//如果传入的是实现的具体类If(Object . Class . equals(method . get declaring Class())){ try { return method . invoke(this,parameters);} catch(Throwablet){ t . printstacktrace();}//如果传入接口}else{//实现接口的核心方法//returnremotekinvoking . invoking(service name,serializationtype,//timeout,loadbalancestrategy,method,parameters);} returnnull}}

代理会做很多事情,比如序列化被请求服务的名称和参数信息,通过路由选择最合适的服务提供者,建立通信连接发送被请求的信息(或者直接发起Http请求),最后返回得到的结果。

当然还有很多问题需要考虑,比如调用超时,请求异常,通信连接的缓存,同步服务调用还是异步服务调用等等。

同步服务调用:客户端发起远程服务调用请求。用户线程完成消息序列化后,将消息传递给通信框架,然后同步阻塞。等待通信线程发送请求并收到响应后,唤醒同步等待的用户线程,用户线程收到响应后返回。

异步服务调用:基于Java的Futu

re 机制,客户端发起远程服务调用请求,该请求会被标上 RequestId,同时建立一个与 RequestId 对应的 Future,客户端通过 Future 的 Get 方法获取结果时会被阻塞。

服务端收到请求应达会回传 RequestId,通过 RequestId 去解除对应 Future 的阻塞,同时 Set 对应结果,最后客户端获取到结果。

构建 Future,以 RequestId 为 Key,Put 到线程安全的 Map 中。Get 结果时需要写入 Time Out 超时时间,防止由于结果的未返回而导致的长时间的阻塞。

SyncFuturesyncFuture=newSyncFuture();SyncFutureCatch.syncFutureMap.put(rpcRequest.getRequestId(),syncFuture);try{RpcResponserpcResponse=syncFuture.get(timeOut,TimeUnit.MILLISECONDS);returnrpcResponse.getResult();}catch(Exceptione){throwe;}finally{SyncFutureCatch.syncFutureMap.remove(rpcRequest.getRequestId());}

结果返回时通过回传的 RequestId 获取对应 Future 写入 Response,Future 线程解除阻塞:

log.debug("TcpClientreceivehead:"+headAnalysis+"TcpClientreceivedata:"+rpcResponse);SyncFuturesyncFuture=SyncFutureCatch.syncFutureMap.get(rpcResponse.getRequestId());if(syncFuture!=null){syncFuture.setResponse(rpcResponse);}

importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.Future;importjava.util.concurrent.TimeUnit;publicclassSyncFutureimplementsFuture{//因为请求和响应是一一对应的,因此初始化CountDownLatch值为1。privateCountDownLatchlatch=newCountDownLatch(1);//需要响应线程设置的响应结果privateTresponse;//Futrue的请求时间,用于计算Future是否超时privatelongbeginTime=System.currentTimeMillis();publicSyncFuture(){}@Overridepublicbooleancancel(booleanmayInterruptIfRunning){returnfalse;}@OverridepublicbooleanisCancelled(){returnfalse;}@OverridepublicbooleanisDone(){if(response!=null){returntrue;}returnfalse;}//获取响应结果,直到有结果才返回。@OverridepublicTget()throwsInterruptedException{latch.await();returnthis.response;}//获取响应结果,直到有结果或者超过指定时间就返回。@OverridepublicTget(longtimeOut,TimeUnitunit)throwsInterruptedException{if(latch.await(timeOut,unit)){returnthis.response;}returnnull;}//用于设置响应结果,并且做countDown操作,通知请求线程publicvoidsetResponse(Tresponse){this.response=response;latch.countDown();}publiclonggetBeginTime(){returnbeginTime;}}

SyncFuturesyncFuture=newSyncFuture();SyncFutureCatch.syncFutureMap.put(rpcRequest.getRequestId(),syncFuture);RpcResponserpcResponse=syncFuture.get(timeOut,TimeUnit.MILLISECONDS);SyncFutureCatch.syncFutureMap.remove(rpcRequest.getRequestId());

除了同步服务调用,异步服务调用,还有并行服务调用,泛化调用等调用形式。

高可用

简单的介绍了下分布式服务框架,下面来说下分布式系统的高可用。一个系统设计开发出来,三天两晚就出个大问题,导致无法使用,那这个系统也不是什么好系统。

业界流传一句话:"我们系统支持 X 个 9 的可靠性"。这个 X 是代表一个数字,X 个 9 表示在系统 1 年时间的使用过程中,系统可以正常使用时间与总时间(1 年)之比。

3 个 9:(1-99.9%)*365*24=8.76 小时,表示该系统在连续运行 1 年时间里最多可能的业务中断时间是 8.76 小时,4 个 9 即 52.6 分钟,5 个 9 即 5.26 分钟。要做到如此高的可靠性,是非常大的挑战。

一个大型分布式项目可能是由几十上百个项目构成,涉及到的服务成千上万,主链上的一个流程就需要流转多个团队维护的项目。

拿 4 个 9 的可靠性来说,平摊到每个团队的时间可能不到 10 分钟。这 10 分钟内需要顶住压力,以最快的时间找到并解决问题,恢复系统的可用。

下面说说为了提高系统的可靠性都有哪些方案:

服务检测:某台服务器与注册中心的连接中断,其提供的服务也无响应时,系统应该能主动去重启该服务,使其能正常对外提供。

故障隔离:集群环境下,某台服务器能对外提供服务,但是因为其他原因,请求结果始终异常。

这时就需要主动将该节点从集群环境中剔除,避免继续对后面的请求造成影响,非高峰时期再尝试修复该问题。至于机房故障的情况,只能去屏蔽整个机房了。

目前饿了么做的是异地多活,即便单边机房挂了,流量也可以全量切换至另外一边机房,保证系统的可用。

监控:包含业务监控、服务异常监控、DB 中间件性能的监控等,系统出现异常的时候能及时的通知到开发人员。等到线下报上来的时候,可能影响已经很大了。

压测:产线主链路的压测是必不可少的,单靠集成测试,有些高并发的场景是无法覆盖到的,压测能暴露平常情况无法出现的问题,也能直观的提现系统的吞吐能力。当业务激增时,可以考虑直接做系统扩容。

SOP 方案与演练:产线上随时都可能会发生问题,抱着出现问题时再想办法解决的态度是肯定不行的,时间根本来不及。

提前做好对应问题的 SOP 方案,能节省大量时间,尽快的恢复系统的正常。当然平常的演练也是不可少的,一旦产线故障可以做到从容不迫的去应对和处理。

除了上述方案外,还可以考虑服务策略的使用:

降级策略:业务高峰期,为了保证核心服务,需要停掉一些不太重要的业务。如双十一期间不允许发起退款、只允许查看 3 个月之内的历史订单等业务的降级,调用服务接口时,直接返回的空结果或异常等服务的降级,都属于分布式系统的降级策略。

服务降级是可逆操作,当系统压力恢复到一定值不需要降级服务时,需要去除降级,将服务状态恢复正常。

服务降级主要包括屏蔽降级和容错降级:

屏蔽降级:分布式服务框架直接屏蔽对远程接口的请求,不发起对远程服务的调用,直接返回空结果、抛出指定异常、执行本地模拟接口实现等方式。容错降级:非核心服务不可调用时,可以对故障服务做业务放通,保证主流程不受影响。如请求超时、消息解码异常、系统拥塞保护异常, 服务提供方系统异常等情况。

笔者之前就碰到过因双方没有做容错降级导致的系统故障的情况。午高峰时期,对方调用我们的一个非核心查询接口,我们系统因为 Bug 问题一直异常,导致对方调用这个接口的页面异常而无法跳转到主流程页面,影响了产线的生产。当时对方紧急发版才使系统恢复正常。

限流策略:说到限流,最先想到的就是秒杀活动了,一场秒杀活动的流量可能是正常流量的几百至几千倍,如此高的流量系统根本无法处理,只能通过限流来避免系统的崩溃。

服务的限流本质和秒杀活动的限流是一样的,都是限制请求的流入,防止服务提供方因大量的请求而崩溃。

限流算法:令牌桶、漏桶、计数器算法。上述算法适合单机的限流,但涉及到整个集群的限流时,得考虑使用缓存中间件了。

例如:某个服务 1 分钟内只允许请求 2 次,或者一天只允许使用 1000 次。

由于负载均衡存在,可能集群内每台机器都会收到请求,这种时候就需要缓存来记录调用方某段时间内的请求次数,再做限流处理。Redis 就很适合做此事。

熔断策略:熔断本质上是一种过载保护机制,这一概念来源于电子工程中的断路器,当电流过大时,保险丝会熔断,从而保护整个电路。

同样在分布式系统中,当被调用的远程服务无法使用时,如果没有过载保护,就会导致请求的资源阻塞在远程服务器上耗尽资源。

很多时候,刚开始可能只是出现了局部小规模的故障,然而由于种种原因,故障影响范围越来越大,最终导致全局性的后果。

当下游服务因访问压力过大而响应变慢或失败,上游服务为了保护自己以及系统整体的可用性,可以暂时切断对下游服务的调用。

熔断器的设计思路:

Closed:初始状态,熔断器关闭,正常提供服务。Open: 失败次数,失败百分比达到一定的阈值之后,熔断器打开,停止访问服务。Half-Open:熔断一定时间之后,小流量尝试调用服务,如果成功则恢复,熔断器变为 Closed 状态。

数据一致性

一个系统设计开发出来,必须保证其运行的数据准确和一致性。拿支付系统来说:用户银行卡已经扣款成功,系统里却显示失败,没有给用户的虚拟帐户充值上,这会引起客诉。

说的再严重点,用户发起提现,资金已经转到其银行账户,系统却没扣除对应虚拟帐号的余额,直接导致资金损失了。如果这时候用户一直发起提现,那就酸爽了。

CAP 原则

说到数据一致性,就不得不说到 CAP 原则。CAP 原则中指出任何一个分布式系统中,Consistency(一致性 C)、 Availability(可用性 A)、Partition tolerance(分区容错性 P),三者不可兼得。

传统单机数据库基于 ACID 特性(原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability)) ,放弃了分区容错性,能做到可用性和一致性。

对于一个分布式系统而言,分区容错性是一个最基本的要求。既然是一个分布式系统,那么分布式系统中的组件必然需要被部署到不同的节点,会出现节点与节点之间的网络通讯。

而网络问题又是一定会出现的异常情况,分区容错性也就成为了一个分布式系统必然需要面对和解决的问题。

系统架构师往往需要把精力花在如何根据业务特点在一致性和可用性之间寻求平衡。

集中式系统,通过数据库事务的控制,能做到数据的强一致性。但是分布式系统中,涉及多服务间的调用,通过分布式事务的方案:

两阶段提交(2PC)三阶段提交(3PC)补偿事务(TCC)...

虽然能实现数据的强一致,但是都是通过牺牲可用性来实现。

BASE 理论

BASE 理论是对 CAP 原则中一致性和可用性权衡的结果:Basically Available(基本可用)、Soft state(软状态)和 Eventually consistent(最终一致性)。

BASE 理论,其来源于对大规模互联网系统分布式实践的总结,是基于 CAP 原则逐步演化而来的。

其最核心思想是:即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。

基本可用:是指分布式系统在出现不可预知故障的时候,允许损失部分可用性,这不等价于系统不可用。

软状态:指允许系统中的数据存在中间状态,并认为该中间状态的存在不会影响系统的整体可用性,即允许系统在不同节点的数据副本之间进行数据同步的过程存在延时。

最终一致性:强调的是所有的数据副本,在经过一段时间的同步之后,最终都能够达到一致的状态。

因此,最终一致性的本质是需要系统保证最终数据能够达到一致,而不需要实时保证系统数据的强一致性。

总的来说,BASE 理论面向的是大型高可用可扩展的分布式系统,和传统的事物 ACID 特性是相反的。

它完全不同于 ACID 的强一致性模型,而是通过牺牲强一致性来获得可用性,并允许数据在一段时间内是不一致的,但最终达到一致状态。

同时,在实际的分布式场景中,不同业务单元和组件对数据一致性的要求是不同的,因此在具体的分布式系统架构设计过程中,ACID 特性和 BASE 理论往往又会结合在一起。

结语

分布式系统涉及到的东西还有很多,如:分布式锁、定时调度、数据分片、性能问题、各种中间件的使用等,笔者分享只是了解到的那一小部分的知识而已。

之前本着学习的目的也写过一个非常简单的分布式服务框架 blackRpc,通过它了解了分布式服务框架内部的一些活动。

本文中所有代码都能在该项目中找到,有兴趣读者可以看看:

https://github.com/wangshiyu/blackRpc

为您推荐

....