《武汉工程大学学报》  2020年02期 224-230   出版日期:2021-01-26   ISSN:1674-2869   CN:42-1779/TQ
智能停车系统中消息中间件的设计与实现


现如今汽车的拥有量达到一个相当庞大的数字,导致交通拥堵问题日益严重[1],智慧停车解决城市“停车难”问题已刻不容缓。智能立体停车库以独特的占地面积少[2],交互敏捷等特点成为新时代解决停车问题的关键。其中,移动终端和应用服务器之间的通讯存在稳定性、实时性和准确性等方面的问题。参考如今市面上使用较广的几种开源消息中间件,例如ActiveMQ、kafka、ZeroMQ等,经过测试发现ActiveMQ采用消息推送方式,所以最适合的场景是默认消息都可在短时间内被消费,当消息队列流量达到峰值时,容易造成消息在消费者客户端堆积[3],这与智能停车系统要求在短时间内完成大量的数据请求特性不符;Kafka特有的异步刷盘机制会导致在处理海量数据时产生过高开销甚至出错等问题[4];而ZeroMQ的并发是通过通讯队列使用无锁操作[5]实现的,不能保证线程安全。传统的同步点对点通信由于应用程序与服务器之间一对一的通信方式无法满足大量用户、多台中控同时请求通信的需求,综合考虑,本文设计了一种智能停车异步消息中间件(smart car asynchronous messaging middleware,SCAMM)较好地解决了以上的问题。SCAMM采用基于消息的分布式通信模式,消息生产者(客户端)将携带标志的消息发布到消息代理服务器,再由消息代理服务器发向已注册该标志的消费者(对应的服务器)。这种模式下实现了生产者和消费者之间的松耦合,同时基于Java非阻塞(non-blocking input output,NIO)事件驱动模型的异步通信技术能满足大量用户同一时刻发送请求,在此基础上对数据的粘包拆包问题进行了处理。SCAMM消息中间件在智能停车系统中的应用实例如图1所示。当用户点击停车按钮时,客户端将请求数据发送到消息中间件,队列管理器将读取消息并写入对应的通道,通道以轮询的方式发送消息至中控端,用户停车成功则在客户端告知停车完成,车位不足时则返回停车失败。智能停车系统的重点在于上下班的高峰期对可停车位的资源抢夺,这一过程要求系统既要应对短时间内高并发多目标中控的指令请求,又要保证指令准确完整的送达对应中控机,SCAMM消息中间件能够很好解决这一问题。1 相关技术1.1 消息模型消息中间件中支持的消息模型分为两种:点对点模型(peer to peer,P2P)和发布/订阅模型。图2为两种通信模型图。[发送者][队列][发送消息][发送消息][接收者][消息][消息][消息][主题][发布消息][发布者][订阅者1][订阅者2][订阅者3][ b ][ a ]图2 两种通信模型图:(a) 点对点,(b)发布者/订阅者Fig. 2 Two-communication models: (a) P2P, (b) publisher/subscriberP2P模型的主体有3个:消息的发送者,消息转发队列以及消息的接收者。消息产生后通过发送者发送到一个特定的消息队列,排队转发给被接受者,消息被接收者接受或者过期之后会被丢出消息队列,一个消息只能被一个接收者所接受,当接受者离线时,这条消息也会在消息队列中被移除。发布/订阅模式下的主题有3个:发布者,订阅者和主题。发布者指定消息的主题,并将消息主动发送到主题的通道中或者等待订阅者来轮询,订阅者可以通过订阅主题来使用指定通道中的消息。发布/订阅模式有以下特点:每个订阅者可以订阅多个主题,每个主题中的消息可以被所有订阅者订阅,可以实现一对多的消息传递。1.2 异步通信 异步通信是指消息在生产者和消费者之间的收发不需要通过公共的时钟信号来控制,而是采用异步应答方式来实现双方的通信[6]。消息中间件SCAMM采用Java NIO异步非阻塞I/O模型来实现异步通信,Java NIO是在JDK 1.4开始提供新的API[7],基于事件驱动模型开发[8],以块的方式处理数据[9],通过双向通道(channel)进行传输。异步通信机制的本质是当应用进程向CPU发送一个I/O请求,如果此时内核内没有可用资源被调用,内核会向客户端返回一个错误码,客户端进程会有可用资源的时间再次进行请求操作,这样就避免了进程的阻塞,这种往返的操作也被称为轮询[10]。可以将异步通信中的非阻塞I/O模型用以下的伪代码来表示:While(CMD_arrived){ //一直等待停取车指令 //如果有指令,执行读就绪事件 If(read_ready) 内核从数据流中读取数据; If(write_ready) 内核在数据流中写数据; Break;}1.3 消息队列消息队列是指消息由生产者发送到1个拥有特定标志的虚拟通道,消费者连接到这个特定通道上获取通道中的消息。在分布式和异构环境中使用消息队列技术可以很大程度地减少系统的耦合,消息队列对消息的生产者和消费者的语言、平台不做要求,只需要在同一套编码规范下即可完成通信。当使用消息队列时,生产者和消费者需要知道队列的标志,消息才能够被发送到指定标志下的队列。消息在队列中会按照先进先出的顺序被转发给消费者。在这种模式下,生产者和消费者可以不必同时处在工作状态,虚拟通道会按照先进先出的顺序在消费者能够正常工作后将消息转发给消费者,1条消息只能被一个消费者接受,当这条消息被使用或者过期后,将会被抛出消息队列,当消费者接收到消息后将返回通道1个确认信息。整个过程中生产者只需要发送1条消息后无需等待,消费者也只需从队列中读取消息进行执行[11],摒弃了传统应用中通信双方需要同步执行的缺点。2 消息中间件的设计2.1 SCAMM的模型消息中间件是1种消息代理,是统一消息收发的接口,通过高效可靠的消息传递机制实现跨平台,跨语言进行数据交换[12]。作为软件中间层,消息中间件通过采用同步或异步的通信方式来达到稳定可靠的数据交流,在分布式环境下扩展进程间的通信[13]。本文采用形式化语言巴科斯范式(backus naur form,BNF)进行描述消息中间件,BNF的优点在于它描述的是软件的抽象结构,无需考虑其具体实现细节。使用BNF语言既避免了在描述中间件语义上的二义性、模糊性,又在垂直方向上明确各部分继承父对象的规格说明[14]。中间件的描述需要明确两个方面的信息:中间件操作来实现,这些操作包含输入输出参数和约束的构成和各构成模块的行为信息,每个部件都有一个或多个接口实现,每个接口都有一个或多个条件。以下给出了中间件的主要组成部分和各个部分需要实现的方法。::=**::=[Heart-beat] [link-protect ]::= [self-Protocol]::=消息中间件SCAMM定义为3种成分:至少1种的Function功能(Function)、至少1个外部接口(InterFace)和1种确定的Message Scheduing (消息调度类型)。功能模块包含逻辑处理模块、启动模块、关闭模块、传输通信协议(transmission control protocol,TCP)长连接心跳检测模块、链路保护模块,每个子模块实现的功能不同,协同实现消息在中间件内部分发过程。外部接口模块包含通讯接口、连接接口以及自定义的消息格式接口,外部接口模块是为了使消息中间件不限制于单一类型的客户端或者通信协议,降低了系统的耦合程度同时保证了消息中间件良好的拓展性。消息中间件需要明确某1种通信模式,通常通信模型分为消息队列或者主题(topic)。在智能停车消息中间件SCAMM通信模型中使用Java NIO事件驱动模型来实现异步通信。智能停车系统中需要考虑消息的高并发和准确性,在SCAMM模型中采用Channel来传输数据,这个通道连接起客户端和中控端,当某1个中控机与服务器建立连接后,在服务器设置的监听器立即启动连接模块为这个中控机注册1个全新的Channel TCP长连接并拥有唯一标志,有标志了发往这个中控的消息会经由Channel转发。在整个通信过程中,消息首先由生产者(客户端)传递给SCAMM消息中间件,通过消息管理模块调用ChannelRead读取消息标志,根据消息标志写入对应Channel的消息队列中,在经过自定义的消息格式编码后通过消息转发模块发送给中控,中控接收到数据包后会回复1个数据包,这个回复包再通过消息转发模块的接收方法获取,解码后从原通道返回至消息队列,消息队列发送给客户端。SCAMM消息中间件通信的系统结构如图3所示。2.2 SCAMM的设计SCAMM消息中间件服务端分为5个模块:连接模块、长连接心跳监测模块、消息封装模块、消息管理模块和消息转发模块。连接模块通过监听中控服务状态来建立与服务器的TCP长连接,当远程中控失去连接时,调用close()方法释放掉已注册的通道信息,下次远程中控上线后重新注册。长连接心跳监测模块通过定时发送心跳报文监测通道是否存活,及时关闭失活通道并重新注册该通道避免了服务端可能存在的消息丢失的情况。消息封装模块由SmartCarDecoder类通过自定义通信协议,保证了数据传输的隐蔽性和安全性,并且通过在服务端和客户端的pipeline上加上相应的解码器解决了TCP的粘包、拆包问题;消息管理模块的ChannelHandle组件实现了服务器对从客户端接受的数据的处理。消息转发模块通过transport类来实现消息传递。2.2.1 ChannelHandle API 启动Server时,进行服务器初始化,这一步的操作包括读取主机的IP地址、监听进行socket通信的端口、创建ChannelHandle组件并注册相对应中控端(消费者)的消息通道(Channel)。通过读取到客户端发送过来的消息的相关参数,ChannelHandle 组件进行一系列相应的通信过程的接发处理,ChannelHandle组件存在两个比较重要的子接口,ChannelInboundHandle处理入站数据和各种变化的子接口,ChannelOutboundHandle处理出站数据并且包含各种拦截操作的命令的子接口。对于ChannelInboundHandle子接口,表1给出了相关方法的描述。表1 ChannelInboundHandle的方法描述Tab. 1 Methods description of channelInboundHandle class[返回类型\&方法名\&描述\&void\&ChannelRegistered( )\&当Channel已经注册到EventLoop并且能够处理I/O时被调用\&void\&ChannelUnRegistered( )\&当Channel从EventLoop注销并且无法处理任何I/O时被调用\&void\&ChannelActive( )\&当Channel处于活动状态时被调用;Channel已经连接并且已经就绪\&void\&ChannelInActive( )\&当Channel离开活跃状态并且不在连接它的远程节点时被调用\&void\&ChannelReadComplete( )\&当Channel上的一个读操作完成时被调用\&void\&ChannelRead( )\&当从Channel读取数据时被调用\&void\&ChannelWritabilityChanged( )\&当Channel的可写状态发生改变时被调用。用户可以通过调用Channel的isWritable( )方法来检测Channel的可写性\&]当某个ChannelInboundHandle的实现重写了ChannelRead( )方法时,它将负责显式的释放与池化的ByteBuf实例相关的内存。最后,将已经启动的Channel保存在列表中,该列表保存所有当前活动的中控端(消费者)的长连接。2.2.2 SmartCarDecoder类 消息中间件协议不仅定义了进行信息交换的通信方式,而且间接体现了协议支持的系统结构方式[15]。当客户端发送一条消息或者服务端接收到一条消息时,就会发生一次数据交换,通过自定义消息的编码器与解码器可以保证消息的独立性与隐私性。入站消息会被解码,解码器将接收到的字节流转换成中间格式json字符流,再由字符流转换成1个Java对象,出站消息的模式是相反方向的,编码器将对象转换成字符流,再通过字符流转化成字节流与消费者进行网络数据的通信。本文设计的中间件使用的是基于长度的协议的解码器,基于长度的协议通过它的长度编码到帧的头部来定义帧,而不是使用某种特殊的分隔符来标记它的结束。协议开始使用标准数据包包头整型占4 B,传输数据的长度为整型,占4 B,同时为了防止socket流的攻击,传输的数据长度不应该超过2 048位。服务端消息转发模块通过读取并匹配提取的帧的数据来分发消息。SmartCarDecoder类同时解决了消息的粘包、拆包问题。消息在客户端和服务端之间是以一种流的方式来进行转播的。对于应用层而言,客户端发送的是一个个独立完整的数据包,而在数据链路层、传输层或网络层这种底层会根据TCP缓冲区的实际大小进行包的划分,就可能会出现将一个完整的数据包拆分成多个包发送或者将多个包封装成一个大的数据包发送,这就是TCP的粘包、拆包。当出现粘包、拆包问题时,系统中的消息就会成为不可读的消息,SmartCarDecoder类将消息分为消息头和消息体,消息头中包含表示消息总长度的字段,该解码器能够在获取消息头的时候解析出消息长度,然后向后读取该长度的内容。对于实现中间件的编解码类,表2给出了相关方法的描述。表2 SmartCarDecoder类的方法描述Tab. 2 Methods description of SmartCarDecoder class[返回类型\&方法名\&描述\&Object\&JsonByteToObj( )\&解码器将原始数据流字节流先转码为中间格式字符流,再由字符流转化为对象进行读取\&byte[ ]\&ObjToJsonByte( )\&编码器将对象转换为中间格式字符流,字符流转化为字节流进行通信传输\&void\&Decode( )\&读取整个数据包,将传入的字节流按照自定义协议格式,即4+4字节方式进行编码,读取data信息\&]2.2.3 transport类的实现 ChannelHandle 组件通过transport类实现消息传递。transport类的方法描述如表3所示。当服务器需要转发一条消息给消费者时,transport类通过其SendMessage( )方法轮询的发送队列中的消息。消息转发模块线程将一直处于轮询状态直到调用了StopTransport( )方法[11],当服务器接收到消费者的反馈时,通过ReceiveMessage( )方法将反馈消息通过自定义协议封装传回给客户端。2.2.4 心跳检测类的实现 在异步长连接中,客户端发送正常报文时可能发生通道连接异常的情况,为了监测通道连接是否正常,SCAMM消息中间件在服务端定时发送心跳报文,当心跳报文发送或者解析后不匹配时则认为该通道失活,服务端关闭该通道并重新注册激活。该类使用线程避免了心跳报文与正常通信报文在同一个套接字连接中产生的资源交叉,使用线程池对心跳线程进行管理避免了通道数过多时产生的服务器资源占用过多的问题,线程间的通信可使用全局变量从而简化程序复杂度;使用线程内部机制——静态互斥量进行正常报文和心跳报文的同步,程序简单可靠[16]。心跳检测类的方法描述如表4所示。表4 心跳检测的方法描述Tab. 4 Methods description of heartbeat detection class [返回类型\&方法名\&描述\&Void\&WriteAndFlushAsync()\&子线程发送心跳指令到服务器\&Void\&RunClientAsync()\&主线程发送正常报文到服务器\&Boolen\&isAlive()\&通过返回值判断Channel是否存活,即判断Channel的心跳\&]2.2.5 Abstract-Message抽象消息类  Message对象由抽象消息类创建,该类定义了一条消息需要具备的内容,包括消息的指令类型、目标地址标记、预约时长、车牌号、用户ID等等,通过格式化编码将这些信息封装成一个对象,用于消息中间件的接发。抽象消息类的参数如表5所示。表5 Abstract-Message抽象消息类的参数描述Tab. 5 Parametric description of Abstract-Message class [名称\&描述\&CMD\&指令标识符,区分停、取、预约停、预约取指令\&context\&消息内容主体\&receiver\&消息发往通道的标识符\&userID\&请求指令的用户编码\&Time\&请求指令的起始时间\&]3 实验部分3.1 实验环境为了检验SCAMM中间件是否能够提高智能停车系统中指令接受的准确率,利用单一因素实时实验原理进行了实验。实验中的计算机运行的是Microsoft Windows10 x64,具有1个IntelCore I5-6 200U处理器,双核四线程、8 GB的RAM和一个500 GB磁盘。将服务端和客户端部署在同一台计算机上同时结束所有非必要的进程以防止对实验结果的影响。使用测试工具postman检查响应数据包(Response Body)是否等于约定消息长度和内容,当Response Body的返回值与消息完全匹配时返回true,否则返回false,每次测试重复3次并采取平均值进行记录。3.2 消息的准确率对比试验实验选择socket点对点通信、拥有高吞吐量性能的中间件kafka和SCAMM中间件并分别将3种通讯方式应用于智能停车系统进行了测试。由于SCAMM自定义协议类SmartCarDecoder用来处理消息粘包、拆包的特点,就并发请求数量级、消息大小这两个方面进行实验,两次实验均采用单一变量原则。在消息数量对准确率的影响实验中,固定每次发送的消息大小为256 B,依次改变并发请求总数100、500、1 000、2 000、5 000,记录测试工具postman提供的完全匹配成功数和匹配失败数;在消息大小对准确率的影响实验中,固定请求总数1 000,依次改变消息的大小256 B、512 B、1 kB、5 kB、10 kB,记录测试工具postman提供的完全匹配成功数和匹配失败数。以上每次实验前结束所有无关进程避免对实验造成干扰,每次实验重复3次并计平均值。表6为智能停车系统中采用传统点对点的socket通信、使用kafka中间件和使用SCAMM中间件进行不同并发请求数下的对比。由表6可知,当并发请求数<500时,两种情况下指令的准确率相差无几;当并发数量>1 000时,SCAMM中间件能够明显的优于两者。SCAMM在不同并发请求数下的准确率为100%。表7为智能停车系统中采用传统点对点的socket通信、使用kafka中间件和使用SCAMM中间件在不同消息规模下通信的准确率对比。由表7可知,当消息长度<1 kB时,3种情况都表现了良好的准确率;当消息长度>1 kB时,SCAMM能够明显的优于传统的socket通信;当消息长度≥10 kB时,SCAMM中间件的准确率比kafka中间件的准确率提高了14%,比socket点对点通信提高了32%。由实验结果可知,在高并发请求数和大规模消息长度情况下,SCAMM中间件能够显著提高消息的准确率。4 结 论在大型企业中消息中间件的应用具有十分广阔的前景,目前,基于消息的异步分布式通信设计的消息中间件SCAMM已经在实例中取得良好应用,有着巨大的发展空间。相比于传统应用中的同步点对点通信来实现消息传递的系统,基于消息的异步分布式通信设计的消息中间件能够提供更稳定,扩展性更好的中间桥梁,同时使系统在高并发量请求下仍然能够高效的处理数据。SCAMM忽略了发送方和接收方的平台异构性,简化了集成分布式系统的难度,降低了维护系统的成本。SCAMM中间件通过自定义的通信协议和心跳监测模块,能够有效的保持指令传输过程中的安全性,提高指令被接收的准确度。