|
大纲
1.什么是消息中间件
2.消息中间件的技术选型
3.RocketMQ的架构原理和使用方式
4.消息中间件路由中心的架构原理
5.Broker的主从架构原理
6.高可用的消息中间件生产部署架构
7.部署一个小规模的RocketMQ集群进行压测
8.如何对RocketMQ集群进行可视化的监控和管理
9.进行OS内核参数和JVM参数的调整
10.如何对小规模RocketMQ集群进行压测
11.消息中间件集群生产部署规划的思维导图
1.什么是消息中间件
(1)什么是同步
(2)如何基于消息中间件实现异步
(3)消息中间件的作用
(1)什么是同步
通常一个公司里可能会存在多个业务系统,这些业务系统之间的通信都是通过接口调用来进行的。现在假设用户在浏览器或者APP向系统A发起一个请求,系统A收到这个请求后会马上去调用系统B的接口。当系统B的接口返回响应结果给系统A后,系统A才能返回结果给用户。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=MWFmZDk4M2M0MzhjNzc4YjRiNDU5Zjk5NjA4MTgzMzcsMTczODE2NjM4MjgxNg==[/attach]
在这种情况下,用户发起一个请求,系统A收到请求,接着系统A必须马上去调用系统B的接口,直到系统B的接口返回了,系统A才能返回结果给用户,这种模式就是所谓的"同步调用"。
这个同步的意思,就是各个系统的联动都是同步依次进行的。一个系统先动,然后马上带动另外一个系统一起动,最后大家依次处理完后再返回结果。这就是同步调用通俗的解释。
(2)如何基于消息中间件实现异步
现在假设在系统A和系统B之间加入了一个"消息中间件",也就是MQ,即消息队列。加入了"消息中间件"以后,系统A和系统B之间是怎么通信的?很简单,之所以叫"消息中间件",是因为它里面最核心的就是"消息"。
系统A一般会发送一条消息给MQ,接着系统A就认为自己的工作干完了,直接返回结果给用户。这时系统A跟系统B就没什么关系了,因为系统A要做的事情只是接收请求,发送消息到MQ,然后就返回结果给用户。然后系统B根据自己的情况,可能会在系统A投递消息到MQ之后的1秒内、1分钟之后,甚至1小时后,从MQ里获取到一条属于自己的消息,然后根据消息完成自己的工作。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=Y2Q2NDNkYjI5YzA3Zjg5YWVhZGI0NjE0MmEzODJhNmIsMTczODE2NjM4MjgxNg==[/attach]
在这种情况下,系统A和系统B有没有实现通信?有,因为系统A发了一个消息给MQ,系统B从MQ里获取了一个消息,干了自己该干的工作。
那么系统A跟系统B之间是不是同步调用?不是,因为系统A仅仅是发消息到MQ,至于系统B什么时候获取消息,有没有获取消息,它是不管的。所以这种情况下,我们说系统A和系统B是"异步调用"。
所谓异步调用,意思就是系统A先处理自己的工作,然后想办法通知系统B。但是系统B什么时候收到通知、什么时候去处理自己的工作,这些系统A不管,和系统A没关系。但是最终在正常下,系统B总会获取到这个通知,然后处理自己的工作。
这种情况下,并不是系统A动了,系统B就马上同步动,他们不是同步的。而是系统A动了,但是系统B可能过一会儿才动,他们的步调是不一样的,所以是异步的。这就是所谓的"异步调用"通俗的解释。
(3)消息中间件的作用
消息中间件,其实就是一种系统,它自己独立部署。然后让两个系统之间通过发消息和收消息,来进行异步的调用,而不是仅仅局限于同步调用。消息中间件的主要作用有三个:异步化提升性能、降低系统耦合、流量削峰。
比如现在假设系统A要调用系统B。然后系统A先执行一些操作,需要耗费20ms,接着系统B执行一些操作要耗费200ms,总共就要耗费220ms。这时等系统A和系统B都处理完,才能返回结果给用户,要等待220ms。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=ZjM5ZmUwMjQ3YWZhOWQxYjI0ZTU1ODNlZjNlMmUwOTYsMTczODE2NjM4MjgxNg==[/attach]
一.异步化提升性能
如果在系统A和系统B之间加一个MQ:系统A干完自己的事情,就20ms,然后发送一条消息到MQ,就5ms,然后就返回结果给用户了。也就是说,用户仅仅等待25ms就收到了结果。然后系统B从MQ里获取消息,接着花费200ms去执行。但是这个200ms就跟用户没关系了,用户早就收到结果了,也不关心你花200ms还是2s去干自己的事。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=ZTllN2Q4YWMzMjQ5YWRiZGQ3NzEwZTI3NjUxMmFjMjMsMTczODE2NjM4MjgxNg==[/attach]
这样对用户来说,就从原来等待220ms返回结果,变成现在只要25ms就可以返回结果了。可见,消息中间件可以用来提升系统性能。
二.降低系统耦合
如果系统A同步调用系统B,这其实属于系统间的耦合,系统A和系统B互相之间会有影响。如果系统B突然出现故障,会导致系统A在调用系统B的时候感知到这个故障。由于系统B故障了,所以系统A对系统B的调用失败。系统A就得返回异常给用户,以及在后续处理这个异常。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=ZDE2ZjQ5ZDkzMGVkMjMwMTlhNTFjMzliOGRlOTQ3MDYsMTczODE2NjM4MjgxNg==[/attach]
假设在系统A和系统B之间加入一个消息中间件。在这种情况下,系统A对系统B的调用仅仅是发送一个消息到MQ,然后就直接返回给用户了,后面对系统B就不管了。此时系统B如果出现了故障,对系统A根本没影响,系统A也感觉不到。系统B出现故障就成了它自己的事,它要等故障恢复后继续去完成它的工作,此时对系统A没任何影响了。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=ZWQ2ZDU3ZmJhNzJiYmQ1MWY5N2Y5MGI5OWUwMzY1NzgsMTczODE2NjM4MjgxNg==[/attach]
为什么会有这样的效果呢?因为通过引入MQ,两个系统实现了异步化调用,也就实现了解耦。系统A并没有跟系统B耦合,所以互相之间并没有任何影响。所以消息中间件还能让两个系统解耦,让它们互相之间没有任何影响。
三.流量削峰
消息中间件还有一个特别好的功能叫流量削峰。假设系统A是不操作数据库的,因此只要多部署几台机器,就可以抗下每秒1万的请求。比如部署个20台机器,就可以轻松抗下每秒上万请求。然后系统B是要操作一台数据库服务器的,这台数据库服务器的上限是每秒接收6000请求。那么系统B无论部署多少台机器都没有用,因为它依赖的数据库最多只能每秒接收6000请求。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=YmFiMGJlYzI5MjIyMGYzY2EwNmU5YjE4NmY4ZjNiMzIsMTczODE2NjM4MjgxNg==[/attach]
比如系统A在1秒内收到了1万请求,然后把这1万请求转发给系统B。由于系统B要操作数据库,每秒最多能处理6000请求,所以此时系统B是抗不下这1万请求的,一定会压垮数据库。因此可以通过引入MQ来解决这个问题,MQ抗高并发的能力远远高于数据库。同样配置下,如果数据库可以抗每秒6000请求,那么MQ可以抗每秒几万请求。
为什么呢?因为数据库复杂,它要能够支持执行复杂的SQL语句,支持事务等复杂的机制,支持对数据进行增删改查等。所以一般数据库单服务器也就支撑每秒几千的请求。但MQ就不一样了,它的核心功能就是让系统A发消息给它,再让系统B从它这里获取消息。这个就简单得多了,所以同等机器配置下,MQ一般都能抗几万并发请求。
所以只要引入MQ,就可以让系统A把每秒1万请求都作为消息直接发送到MQ里,MQ可以轻松抗下这每秒1万请求。接着系统B只要慢慢的从MQ里获取消息然后执行数据库读写操作即可,这个获取消息的速度是系统B自己可以控制的。所以系统B完全可以用一个比较低的速率获取消息然后写入数据库,保证对数据库的QPS不超过它的极限值6000。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=ZjA2ZDI1Zjc2NGI2MjYzN2VmMTlmZTllMTFlMjJiN2IsMTczODE2NjM4MjgxNg==[/attach]
这时因为系统A发送消息到MQ很快,系统B从MQ消费消息很慢,所以MQ里自然会积压一些消息。不过不要紧,MQ一般都是基于磁盘来存储消息的,所以适当积压一些消息是可以的。当系统A的高峰过去,每秒可能就恢复到1000请求,此时系统B还是以每秒6000请求的速度获取消息写入数据库,那么自然MQ里积压的消息就会慢慢被消化掉了。
所以这就是使用MQ进行流量削峰的效果:系统A发送过来的每秒1万请求是一个流量洪峰,然后MQ直接给扛下来了,都存储到自己本地磁盘。这个过程就是流量削峰的过程,瞬间把一个洪峰给削下来,让系统B后续慢慢获取消息来处理。
2.消息中间件的技术选型
(1)订单系统的很多问题都可以用MQ来解决
(2)要在订单系统里用MQ技术应从哪一步开始
(3)Kafka、RabbitMQ以及RocketMQ的调研对比
(4)活跃的社区和广泛的运用
(1)订单系统的很多问题都可以用MQ来解决
举个例子,支付订单流程中步骤过多,导致性能很差。这个问题可以用MQ来解决,让订单系统仅仅完成最核心的一些步骤和调用,然后发送消息到MQ,比如仓储系统之类的就可以从MQ里获取消息,然后慢慢的执行一些比较耗时的步骤。
还有比如订单系统在退款时,可能会遇到第三方支付系统退款失败的问题,从而影响整个退款流程的失败。这个问题也可以用MQ解决,可以让订单系统在退款时完成公司内部各个系统的流程,然后发送消息到MQ,由一个专门的服务去负责调用第三方支付系统,处理可能出现的失败。
在双11大促活动时,也可以让瞬间涌入的大量下单请求到MQ里去排队,然后让订单系统在后台慢慢的获取订单,以数据库可以接受的速率完成操作,避免瞬间请求量过大击垮数据库。
(2)要在订单系统里用MQ技术应从哪一步开始
要做一些技术架构的升级,引入一些新的技术,当然要从技术调研开始。其实技术调研就是对一个技术去找到一些业内常用的开源实现,然后对各种不同的实现都进行一些调研,对比一下它们的优劣势,看看谁比较符合我们的需求,谁比较适合我们来使用;
具体来说,对于前面订单系统的情况,现在只有一个MQ的概念,但是要考虑一下:
一.业内常用的MQ有哪些
二.每一种MQ各自的表现如何
三.这些MQ在同等机器条件下,能抗多少QPS
四.性能有多高(发送一条消息给它要2ms还是20ms)
五.可用性能否得到保证(部署MQ的机器挂了怎么办)
然后还得考虑:
一.它们会不会丢失数据
二.如果需要的话能否让它们进行线性的集群扩容
三.消息中间件经常需要使用的一些功能都有吗(比如说延迟消息、事务消息、消息堆积、消息回溯、死信队列等)
另外还得考虑这些MQ的文档是否齐全、社区是否活跃、在行业内是否广泛运用、是用什么语言编写的。把这些事情都搞清楚了,那么就完成了技术调研,可以全面的对比各种MQ的优劣势,然后从中选择一个最适合的来使用。
(3)Kafka、RabbitMQ以及RocketMQ的调研对比
一般国内常用的MQ技术有四种实现:ActiveMQ、Kafka、RabbitMQ、RocketMQ。但是其中ActiveMQ主要是几年以前较多公司使用,现在国内用的公司都很少了。因此下面主要针对Kafka、RabbitMQ、RocketMQ三种技术进行介绍。
一.Kafka的优势和劣势
首先Kafka的吞吐量几乎是行业里最优秀的,在常规的机器配置下,一台机器的QPS就可以达到每秒十几万,相当的强悍。Kafka性能也很高,基本上发送消息给Kafka是毫秒级的性能。Kafka可用性也很高,Kafka是支持集群部署的,其中部分机器宕机是可以继续运行的。
Kafka的劣势一是:存在数据丢失的问题。因为Kafka收到消息之后会写入一个内存缓冲区里,并没有直接落地到物理磁盘上去。所以要是机器本身故障了,可能会导致内存缓冲区里的数据丢失。
Kafka的劣势二是:功能非常单一。主要是支持发送消息给它,然后从里面消费消息,其他就没有什么额外的高级功能了。所以基于Kafka有限的功能,可能适用的场景并不是很多。
因此综上所述得出Kafka在行业里的一个使用标准:就是把Kafka用在用户行为日志的采集和传输上。比如大数据团队要收集APP上用户的一些行为日志,这种日志就是用Kafka来收集和传输的。因为那种日志适当丢失数据是没有关系的,而且一般量特别大,要求吞吐量要高,一般就是收发消息,不需要太多的高级功能,所以Kafka是非常适合这种场景的。
二.RabbitMQ的优势和劣势
在RocketMQ出现之前,国内大部分公司都从ActiveMQ切换到RabbitMQ来使用,包括很多一线互联网大厂,而且直到现在都有很多中小型公司在使用RabbitMQ。
RabbitMQ的优势在于可以保证数据不丢失,也能保证高可用性,即集群部署的时候部分机器宕机可以继续运行,然后支持部分高级功能,比如死信队列、消息重试等,这些是它的优点。
RabbitMQ缺点一是:吞吐量是比较低的,QPS一般就是几万的级别。所以如果遇到特别高并发的情况下,支撑起来是有点困难的。而且它进行集群扩展的时候(也就是加机器部署),还比较麻烦。
RabbitMQ缺点二是:开发语言是erlang,国内很少有精通erlang语言的工程师。因此也没办法去阅读它的源代码,甚至修改它的源代码。
所以现在行业里的一个情况是,很多BAT等一线互联网大厂都切换到使用更加优秀的RocketMQ了。但是很多中小型公司觉得RabbitMQ基本可以满足自己的需求还在继续使用中,因为中小型公司并不需要特别高的吞吐量,RabbitMQ已经足以满足他们的需求了,而且也不需要部署特别大规模的集群,也没必要去阅读和修改RabbitMQ的源码。
三.RocketMQ的优势和劣势
RocketMQ是阿里开源的消息中间件,非常靠谱,它几乎同时解决了Kafka和RabbitMQ的缺陷。
RocketMQ的吞吐量也同样很高,单机QPS可以达到10万以上,而且可以保证高可用性、可以通过配置保证数据绝对不丢失、可以部署大规模的集群、还可以支持各种高级的功能,比如延迟消息、事务消息、消息回溯、死信队列、消息积压等。同时,RocketMQ是基于Java开发的,符合国内大多数公司的技术栈,工程师很容易就可以阅读它的源码,甚至是修改它的源码。
所以现在国内很多一线互联网大厂都切换为使用RocketMQ了。它们需要RocketMQ的高吞吐量,大规模集群部署能力,以及各种高阶的功能去支撑自己的各种业务场景,同时还可以根据自己的需求定制修改RocketMQ的源码。
RocketMQ是非常适合用在Java业务系统架构中,因为它很高的性能表现、高阶功能的支持,可以解决各种业务问题。当然RocketMQ也有一点美中不足的地方,就是RocketMQ的官方文档相对简单一些。
(4)活跃的社区和广泛的运用
最后一点,基本上Kafka、RabbitMQ和RocketMQ的社区都还算活跃,而且基本应用广泛。尤其是Kafka和RabbitMQ,目前Kafka几乎是国内大数据领域日志采集传输的标准。RabbitMQ在各种中小公司里运用极为广泛,RocketMQ也是开始在一些大公司和其他公司里快速推行中。
3.RocketMQ的架构原理和使用方式
(1)MQ如何集群化部署来支撑高并发访问
(2)MQ如果要存储海量消息应该怎么做
(3)Broker宕机时如何保障高可用
(4)如何进行数据路由访问正确的Broker
关于RocketMQ的两个问题:
一.如何集群化部署来承载高并发访问?
二.要存储海量消息,如何实现分布式存储架构?
(1)MQ如何集群化部署来支撑高并发访问
当RocketMQ部署在一台机器上,即使这台机器配置很高,但一般来说一台机器也就支撑10万+的并发访问。假设有大量的系统都要往RocketMQ里高并发的写入消息,可能有每秒几十万请求。此时由于RocketMQ是可以进行集群化部署的,通过部署RocketMQ在多台机器上,假设每台机器都能抗10万并发,然后只要让几十万请求分散到多台机器上、让每台机器承受的QPS不超过10万即可。如下图示:
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=MzJkMTUzNGE1NGJkMjIyMWQ2ODdlMjBiM2ZhOWY4MDEsMTczODE2NjM4MjgxNg==[/attach]
(2)MQ如果要存储海量消息应该怎么做
MQ会收到大量的消息,这些消息并不是马上就会被所有的消费方获取过去消费的。所以一般MQ都得把消息在自己本地磁盘存储起来,然后等待消费方获取消息去处理。
既然如此,MQ就得存储大量的消息,可能是几百万条,可能几亿条,甚至万亿条。这么多的消息在一台机器上肯定是没法存储的,RocketMQ是如何分布式存储海量消息的呢?
其实发送消息到MQ的系统会把消息分散发送给多台不同的机器。假设一共有1万条消息,分散发送给10台机器,可能每台机器就是接收到1000条消息,如下图示:
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=N2Y5Zjk4MTc5YzllMzFjN2YwNGY3NjUyNWJkMjBjODEsMTczODE2NjM4MjgxNg==[/attach]
其次,每台机器上部署的RocketMQ进程一般称之为Broker。每个Broker都会收到不同的消息,然后会把这些消息存储在自己的磁盘文件里。假设有1亿条消息,然后有10台机器部署了RocketMQ的Broker,理论上可让每台机器存储1000万条消息。
所以,RocketMQ会通过分布式存储来实现存储海量消息的。分布式存储就是把数据分散在多台机器上来存储,每台机器存储一部分消息,这样多台机器加起来就可以存储海量消息了。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=ZTQyZGVmOGQxODU3NWExY2ZjMzJkOWJkYWFkNzgzMzEsMTczODE2NjM4MjgxNg==[/attach]
(3)Broker宕机时如何保障高可用
如果一台Broker突然宕机了怎么办?那不就会导致RocketMQ里一部分的消息就没了吗?这就会导致MQ的不可靠和不可用,这个问题怎么解决?
RocketMQ的解决思路是使用Broker主从架构以及多副本策略。简单来说,Broker有Master和Slave两种角色。Master Broker收到消息之后会同步给Slave Broker,这样Slave Broker上就能有一模一样的一份副本数据,这样同一条消息在RocketMQ整个集群里就有两个副本了:一个在Master Broker里,一个在Slave Broker里。这时候如果任何一个Master Broker出现故障,还有一个Slave Broker上有一份数据副本,可以保证数据不丢失,还能继续对外提供服务,保证了MQ的可靠性和高可用性。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=MTFmOWY1YTU3NWNkZDc3ZWQwY2I1YjZmM2Y0NDM4YzksMTczODE2NjM4MjgxNg==[/attach]
(4)如何进行数据路由访问正确的Broker
对于系统来说,要发送消息到MQ里去,还要从MQ里消费消息。那么怎么知道有哪些Broker、怎么知道要连接到哪一台Broker上去发送和接收消息。
RocketMQ为了解决这个问题,引入了NameServer,它也是独立部署在几台机器上的。然后所有的Broker都会把自己注册到NameServer上去,通过NameServer可以知道集群里有哪些Broker。如下图所示:
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=YjRmMTBjMzg0NzIwNzczMWEyNjY4YWE1YTJjMzFhODYsMTczODE2NjM4MjgxNg==[/attach]
然后对于系统ABC而言,如果要发送消息到Broker,会找NameServer获取路由信息:集群里有哪些Broker等。如果系统ABC要从Broker获取消息,也会找NameServer获取路由信息,然后再去找到对应的Broker获取消息。如下就是RocketMQ最基本的一个架构原理图:
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=YjhkYTgxNGI5N2VlZDg3ZWU0MWZhMWNkZTExNDI3YWQsMTczODE2NjM4MjgxNg==[/attach]
4.消息中间件路由中心的架构原理
(1)研究RocketMQ的切入点是NameServer
(2)NameServer可以部署几台机器
(3)Broker会把信息注册到哪个NameServer
(4)系统如何从NameServer获取Broker信息
(5)NameServer如何感知Broker挂了
(6)系统如何感知Broker挂了
(1)研究RocketMQ的切入点是NameServer
根据前面RocketMQ的基本架构原理图,可知RocketMQ一共包含了四个核心的部分:
第一部分是NameServer:这部分很重要,它要负责管理集群里所有Broker的信息,让使用MQ的系统可以通过它感知到集群里有哪些Broker。
第二部分是Broker集群本身:必须得在多台机器上部署这么一个集群,而且还得用主从架构实现数据多副本存储和高可用。
第三部分是向MQ发送消息的系统:这些系统一般称之为生产者,生产者会从NameServer拉取路由信息,然后选择Broker机器建立连接以及发送消息。
第四部分是从MQ获取消息的系统:这些系统一般称之为消费者,到底是Broker主动推送消息给消费者、还是消费者自己从Broker里拉取消息。
NameServer是研究RocketMQ必须优先选择的一个切入点,因为没有NameServer,一切都无从谈起,NameServer是RocketMQ运行的起点。
(2)NameServer可以部署几台机器
要部署RocketMQ,就得先部署NameServer,那么这个NameServer到底可以部署几台机器呢?是一台机器?还是可以部署多台机器?如果部署多台机器,他们之间是怎么协同工作的?
NameServer首先支持部署多台机器,也就是说NameServer是可以集群化部署的。这根据前面RocketMQ的基本架构原理图也可以看出,而NameServer要进行集群化部署最主要的一个原因就是为了高可用。因为NameServer是集群里非常关键的一个角色,它要管理Broker信息,生产者和消费者都要通过它才知道跟哪个Broker通信。如果NameServer只部署一台机器,一旦NameServer宕机了,就会导致RocketMQ集群出现故障。所以通常来说,NameServer一定会多机器部署,实现一个集群,起到高可用的效果。保证任何一台机器宕机,其他机器上的NameServer可以继续对外提供服务。
(3)Broker会把信息注册到哪个NameServer
是否会出现这样的情况:一共有10台Broker机器,2台NameServer机器,然后其中5台Broker会把自己的信息注册到1台NameServer上去,另外5台Broker会把自己的信息注册到另外1台NameServer上去。
实际上不是这样的,因为这样会有一个问题。如果1台NameServer上有5台Broker信息,另外1台NameServer上有另外5台Broker信息,那么此时任何一个NameServer宕机,就会导致5个Broker的信息没了。这种做法会导致数据丢失,系统不可用。
所以RocketMQ的每个Broker启动时都得向所有的NameServer进行注册。也就是说,每个NameServer都会有一份集群中所有Broker的信息。
(4)系统如何从NameServer获取Broker信息
扮演生产者和消费者角色的系统,如何从NameServer获取到集群的Broker信息。它们需要知道集群里有哪些Broker,才能根据一定的算法挑选一个Broker去发送消息或者获取消息。
有两种办法:
第一种办法是NameServer会主动发送请求给所有的系统,告诉它们Broker信息。但这种办法存在的问题是:NameServer不能提前知道要推送Broker信息给哪些系统。
第二种办法是每个系统自己每隔一段时间定时发送请求到NameServer去拉取最新的集群Broker信息。事实上RocketMQ中的生产者和消费者就是这样,生产者和消费者会主动去NameServer拉取Broker信息。
(5)NameServer如何感知Broker挂了
首先一个Broker启动后会向每个NameServer进行注册,这样每个NameServer都知道集群里有这么一台Broker的存在。然后各个系统也从NameServer拉取Broker信息,也知道集群里有这么一台Broker。但是如果这台Broker挂了,NameServer应如何感知到。
要解决这个问题,靠的是Broker跟NameServer之间的心跳机制。Broker会每隔30s给所有的NameServer发送心跳,告诉每个NameServer自己目前还活着。每次NameServer收到一个Broker的心跳,就可以更新一下它的最近一次心跳时间。然后NameServer会每隔10s运行一个任务,去检查一下各个Broker的最近一次心跳时间。如果某个Broker超过120s都没发送心跳,那么就认为这个Broker已经挂掉了。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=OWVhYWJmZGI0ZDQ0NWYyMDI5YTc2ZjNlNDM0ZGNhYTYsMTczODE2NjM4MjgxNw==[/attach]
(6)系统如何感知Broker挂了
如果Broker挂掉了,那么作为生产者和消费者的系统是怎么感知到的?难道需要NameServer发送通知给所有的系统吗?
NameServer并不会主动通知生产者和消费者系统。但如果NameServer没有及时通知这些系统,那么可能出现一种情况:刚开始集群里有10个Broker,各个系统从NameServer那里得知,都以为有10个Broker。结果突然挂了一个Broker,120s没发心跳给NameServer,NameServer是知道现在只有9个Broker了。但此时这些系统是不知道只有9个Broker的,还以为有10个Broker。此时可能某个系统就会发送消息到那个已经挂掉的Broker上去。
这里有两种解决办法:第一可以考虑不发送消息到那台Broker,改成发送到其他Broker上去。第二如果必须要发送消息给那台Broker,由于它的Slave是备份,所以可以继续使用,可以考虑和它的Slave进行通信。
生产者会有一套容错机制,往Broker发送消息时会通过是否有响应来感知Broker是否挂了。如果没有响应,就会重新到NameServer拉取最新的路由信息,于是就知道是否有Broker宕机了。
以上便是NameServer的核心工作原理,总结如下:NameServer会进行集群化部署、Broker会向所有NameServer进行注册、30s心跳机制和120s故障感知机制、生产者和消费者的客户端容错机制。
5.Broker的主从架构原理
(1)Master Broker如何将消息同步给Slave Broker
(2)RocketMQ是否实现读写分离
(3)如果Slave Broke挂了有什么影响
(4)如果Master Broker挂了怎么办
(5)基于Dledger实现RocketMQ高可用自动切换
(1)Master Broker如何将消息同步给Slave Broker
为了保证MQ的数据不丢失而且具备一定的高可用性,一般都会将Broker部署成Master-Slave模式,也就是一个Master Broker对应一个Slave Broker。然后Master需要在接收到消息后,将数据同步给Slave,这样一旦Master Broker挂了,Slave上还有一份数据。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=Y2QxNzc3ZmQ2YjA2N2NkNDIwMzRlNjc1ODNjYjk3NmEsMTczODE2NjM4MjgxNw==[/attach]
上图补充说明:Slave Broker也会向所有的NameServer进行注册,图中没有画出。Slave Broker也会向所有的NameServer每30s发送心跳,图中没有画出。
接下来考虑一个问题,Master Broker是如何将消息同步给Slave Broker的?是Master Broker主动推送给Slave Broker、还是Slave Broker发送请求到Master Broker去拉取。
RocketMQ的Master-Slave模式采取的方式是:Slave Broker不停地发送请求到Master Broker去拉取消息,也就是通过Pull模式拉取消息。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=YTU5MDZiMDY1NWM0ZDY2NzI1MWI4ZWExNzcwNDFiNmEsMTczODE2NjM4MjgxNw==[/attach]
(2)RocketMQ是否实现读写分离
既然Master Broker主要会接收系统的消息写入,然后同步给Slave Broker,那么Slave Broker也应该有一份一模一样的数据。所以作为消费者的系统在获取消息时,是从Master Broker获取还是从Slave Broker获取的?其实都不是:有可能从Master Broker获取消息,也有可能从Slave Broker获取消息。
作为消费者的系统在获取消息时,会先发送请求到Master Broker上去,请求获取一批消息,此时Master Broker会返回一批消息给消费者系统。然后Master Broker在返回消息给消费者系统时,会根据当时Master Broker的负载情况和Slave Broker的同步情况,向消费者系统建议下一次拉取消息时是从Master Broker拉取还是从Slave Broker拉取。
举个例子,如果这个时候Master Broker负载很重,本身要抗10万写并发了。现在消费者系统还要从它这里拉取消息,所以此时该Master Broker就会建议消费者从Slave Broker去拉取消息。
另外一个例子,这时Master Broker上都已写入100万条数据了,结果Slave Broker不知道啥原因同步的特别慢,才同步了96万条数据,落后了整整4万条消息的同步。这时候作为消费者系统可能都获取到96万条数据了,那么下次获取只能从Master Broker去拉取消息,因为Slave Broker同步太慢导致没法从它那里获取更新的消息了。
所以这一切都会由Master Broker根据情况来决定,并非实行严格的读写分离。在写入消息时,是选择Master Broker进行写入的。在拉取消息时,有可能从Master Broker获取,也可能从Slave Broker去获取,一切都根据当时的情况来定。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=ZmZjNGMyODBiNWYxMTFhZDRlZDUyNWE4YTViZGRiNjcsMTczODE2NjM4MjgxNw==[/attach]
(3)如果Slave Broke挂了有什么影响
如果Slave Broker挂掉了,会对整个系统有一点影响,但是影响不太大。因为消息写入全部是发送到Master Broker的,而消息获取也可以通过Master Broker获取,只不过有一些消息的获取可能是从Slave Broker去获取。
所以如果Slave Broker挂了,那么此时无论消息写入还是消息拉取,还可以继续依赖Master Broker,对整体运行不影响。只不过少了Slave Broker,会导致所有读写压力都集中在Master Broker上。
(4)如果Master Broker挂了怎么办
假设出现了一个故障,Master Broker突然挂了,这时候就会对消息的写入和获取都有一定的影响了。但Slave Broker有Master Broker的备份数据,只不过Slave Broker上的数据可能有部分没来得及从Master Broker同步,此时RocketMQ也不会直接自动将Slave Broker切换为Master Broker。
在RocketMQ 4.5版本之前,都是用Slave Broker同步数据,尽量保证数据不丢失。但是一旦Master故障了,Slave是没法自动切换成Master的。所以在这种情况下,如果Master Broker宕机了,这时就得手动进行一些运维操作:对Slave Broker重新修改配置,重启后调整为Master Broker,这还是有点麻烦的,而且会导致中间一段时间不可用。所以这种Master-Slave模式不是彻底的高可用模式,没法实现自动把Slave切换为Master。
(5)基于Dledger实现RocketMQ高可用自动切换
在RocketMQ 4.5之后,这种情况得到了改变,因为RocketMQ支持了一种新的机制叫Dledger。Dledger是基于Raft协议实现的一个机制,利用Dledger可以实现RocketMQ的高可用自动切换。
简单来说,把Dledger融入RocketMQ后,就可以让一个Master Broker对应多个Slave Broker。也就是说一份数据可以有多份副本,比如一个Master Broker对应两个Slave Broker,Master和Slave会进行数据同步。一旦Master Broker宕机,就可以在多个副本也就是多个Slave中,通过Dledger技术和Raft协议算法进行Leader选举,将其中一个Slave Broker选举为新的Master Broker,然后这个新的Master Broker就可以对外提供服务了。
整个过程也许只要10秒或者几十秒的时间就可以完成。这样就可以实现Master Broker挂掉后,自动从多个Slave Broker中选举出来一个新的Master Broker,继续对外服务,一切都是自动的。
6.高可用的消息中间件生产部署架构
(1)MQ生产部署架构的设计任务
(2)NameServer集群化部署保证高可用性
(3)基于Dledger的Broker主从架构部署
(4)Broker如何跟NameServer进行通信
(5)使用MQ的系统都要多机器集群部署
(6)MQ的核心数据模型——Topic
(7)Topic作为数据集合如何在Broker集群里存储
(8)生产者系统如何将消息发送给Broker
(9)消费者如何从Broker上拉取消息
(10)高可用、高并发、海量消息、可伸缩的架构
(1)MQ生产部署架构的设计任务
基于RocketMQ的核心原理去设计一套RocketMQ的生产部署架构,在这套生产部署架构中,需要着重考虑到高可用的问题,要保证整个系统运行过程中任何一个环节宕机都不能影响系统的整体运行。
(2)NameServer集群化部署保证高可用性
首先,要让NameServer集群化部署,比如可以部署在三台机器上。这样可以充分保证NameServer作为路由中心的可用性,只要还有一个NameServer正常运行,就能保证MQ系统的稳定性。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=MWIxZjUxNDA1YjMzYzg4YmEyMzcyZjZlM2IzZWU0ZWMsMTczODE2NjM4MjgxNw==[/attach]
NameServer的设计采用了Peer-to-Peer模式,也就是可以集群化部署。但里面任何一台NameServer机器都是独立运行的,跟其他机器没有任何通信。
每台NameServer实际上都会有完整的集群路由信息,包括所有的Broker节点信息等。所以只要任何一台NameServer存活下来,就可以保证MQ系统正常运行,不会出现故障。因此NameServer集群化部署是RocketMQ架构部署的第一步。
(3)基于Dledger的Broker主从架构部署
其次,要考虑Broker集群应该如何部署,采用什么方式来部署。如果采用RocketMQ 4.5前的那种Master-Slave架构来部署,能在一定程度上保证数据不丢失,也能保证一定的可用性。但是那种方式最大的问题就是当Master Broker挂了之后,没办法让Slave Broker自动切换为新的Master Broker,需要手工做一些运维操作,修改配置以及重启机器才行,这个非常麻烦。而且在手工运维的期间,会导致系统的不可用。
所以既然RocketMQ 4.5后已经基于Dledger技术实现了可以自动让Slave切换为Master的功能,那么肯定是选择基于Dledger的主备自动切换的功能来进行生产架构的部署。而且Dledger技术是要求至少得是一个Master带两个Slave,由三个Broker组成一个Group,即作为一个分组来运行。一旦Master宕机,就可以从剩余的两个Slave中选举出来一个新的Master对外提供服务,如下图示:
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=NmVjNDIzODA1YjQ3MDkwMjM0NmY3NGVlNGY0YjNmNTIsMTczODE2NjM4MjgxNw==[/attach]
每个Broker(不论是Master和Slave)都会把自己注册到所有的NameServer上去,然后Master Broker还会把数据同步给两个Slave Broker,保证一份数据在不同机器上有多份副本。注意:上图中没有画出Slave Broker注册到NameServer。
(4)Broker如何跟NameServer进行通信
前面介绍了,由于Broker会每隔30秒发送心跳到所有的NameServer上去,然后每个NameServer会每隔10s检查一次有没有哪个Broker超过120s没发送心跳的,如果有就认为那个Broker已经宕机,从路由信息里要摘除这个Broker。下面介绍Broker和NameServer进行通信的细节:
首先,Broker跟NameServer之间的通信是基于TCP长连接来进行的。在RocketMQ的实现中,采用的是TCP长连接进行通信。也就是说,Broker会跟每个NameServer都建立一个TCP长连接,然后定时通过TCP长连接发送心跳请求过去。各个NameServer会通过跟Broker建立好的长连接不断收到心跳数据包,然后通过定时检查Broker有没有120s都没发送心跳数据包,来判定集群里各个Broker到底是否已经挂掉。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=NTFhMWFmNGQ5ZjExZTcwYjA4NmZjNzVmNGZkNzE1NWUsMTczODE2NjM4MjgxNw==[/attach]
(5)使用MQ的系统都要多机器集群部署
日常中一定会有很多的系统使用RocketMQ,有些系统是作为生产者往MQ发送消息,有些系统是作为消费者从MQ获取消息。当然还有的系统既是生产者又是消费者,所以需要考虑这些系统的部署。
对于这些系统的部署本身不应该在MQ的考虑范围内,但是还是应该给出一个建议。就是无论作为生产者还是消费者的系统,都应该多机器集群化部署,保证它自己本身作为生产者或者消费者的高可用性。
因为一个系统如果就部署在一台机器上,然后作为生产者向MQ发送消息。一旦机器上的生产者系统挂了,那么整个流程就断开了,不能保证高可用性。但是如果在多台机器上部署生产者系统,任何一台机器上的生产者挂了,其他机器上的生产者系统可以继续运行。同理消费者系统也应集群化部署,如果一台机器上的消费者系统挂了,其他机器上的消费者系统可以继续运行。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=NzhhZjU3MTZiOWQwNjM1NmY5OGJiYzc4YTNiZDhjODAsMTczODE2NjM4MjgxNw==[/attach]
(6)MQ的核心数据模型——Topic
生产者和消费者都会往MQ里写入消息和获取消息了,但是有一个问题。MQ中的数据模型是什么,生产者投递出去的消息在逻辑上到底是放到哪里去的,是队列还是别的什么。这个涉及的其实就是MQ中的核心数据模型——Topic。
Topic表达的意思是一个数据集合。举个例子,现在订单系统需要往MQ里发送订单消息。那么此时就应该建一个Topic,它的名字可以叫做:topic_order_info,也就是一个包含了订单信息的数据集合。然后订单系统投递的订单消息,都是进入到这个名为topic_order_info的Topic里面。如果仓储系统要获取订单消息,那么它可以指定从名为topic_order_info的Topic里获取消息,获取出来的都是想要的订单消息。
总之,Topic其实就是一个数据集合,不同类型的数据需要放到不同的Topic里去。要是有一些商品数据要发送到MQ里,就应该创建一个Topic,它的名字可以叫:topic_product_info,代表里面都是商品数据。那些想要从MQ里获取商品数据的系统,就可以从名为topic_product_info的Topic里获取了。
所以简单来说,一个系统如果要往MQ里写入消息或者获取消息,首先得创建一些Topic,作为一个数据集合来存放某种类型的消息,比如订单Topic,商品Topic等。
(7)Topic作为数据集合如何在Broker集群里存储
创建的这些Topic是怎么存储在Broker集群里的?比如有一个订单Topic,可能订单系统每天都会往里面投递几百万条数据,然后这些数据在MQ集群上还得保留好多天。那么最终可能会有几千万的数据量,这还只是一个Topic。如果有很多的Topic,并且里面都有大量的数据,最终加起来的总和也许是一个惊人的数字。此时这么大量的数据本身是不太可能存放在一台机器上的。如果一台机器没法放下那么多的数据,那么就应该通过分布式存储来存放。
我们可以在创建Topic时指定让它里面的数据分散存储在多台Broker机器上。比如一个Topic里有1000万条数据,此时有2台Broker,那么就可以让每台Broker上都放500万条数据。这样就可以把一个Topic代表的数据集合分布式存储在多台机器上了。
如下图示,新增一个Master Broker和两个Slave Broker,因为Dledger技术要求每个Master都得带两个Slave来进行选举,然后展示一个订单Topic的数据分布式存储在两个Master Broker上。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=ZWY4MjA0NGVlNzZkOTkxYzk4ZDZhMDI2MDMzYmZmZGYsMTczODE2NjM4MjgxNw==[/attach]
而且每个Broker在发送定时的心跳给NameServer时,都会告诉NameServer自己当前的数据情况。比如有哪些Topic的哪些数据在自己这里,这些信息都属于路由信息的一部分。这样每个NameServer都知道集群里有哪些Broker,每个Broker都存放了哪些Topic里的数据。
(8)生产者系统如何将消息发送给Broker
在发送消息之前,得先有一个Topic,然后在发送消息时需要指定要发送到哪个Topic里面去。接着既然知道要发送的Topic,那么就可以和NameServer建立一个TCP长连接,然后定时从那里拉取最新的路由信息。这些路由信息会包括集群里有哪些Broker、集群里有哪些Topic、每个Topic都存储在哪些Broker上。
这样生产者系统就可以通过路由信息找到自己要投递消息的Topic分布在哪几台Broker上。接着就可以根据负载均衡算法,从里面选择一台Broke机器出来。选择一台Broker后,就可以和这个Broker建立一个TCP长连接,然后通过长连接向Broker发送消息。Broker收到消息后就会存储在自己本地磁盘里去。
这里唯一要注意的就是:生产者一定是投递消息到Master Broker的,然后Master Broker会同步数据给它的Slave Broker,实现一份数据多份副本,保证Master故障时数据不丢失,而且可以自动把Slave切换为Master提供服务。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=MDU4NzliYWJjZDFiM2IyMmRlMGM2YjA1MjA4M2NkNTcsMTczODE2NjM4MjgxNw==[/attach]
(9)消费者如何从Broker上拉取消息
消费者系统其实跟生产者系统原理是类似的,它们也会和NameServer建立长连接,然后拉取路由信息。接着找到自己要获取消息的Topic在哪几台Broker上,这样就可以和Broker建立长连接,从里面拉取消息了。
这里唯一要注意的就是:消费者系统可能会从Master Broker拉取消息,也可能从Slave Broker拉取消息,看具体情况。
所以,最终的MQ生产部署架构图,如下所示:
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=MTgzZmQ2NWUzNjMyZTA2Zjg4NDJjZTJjOTc2YjBhMTAsMTczODE2NjM4MjgxNw==[/attach]
(10)高可用、高并发、海量消息、可伸缩的架构
上述这套生产架构图实现了完全高可用,因为NameServer随便挂一台都可以。同时也是集群化部署的,每台机器都有完整的路由信息。此外Broker随便挂一台也可以,Slave Broker挂了对集群没太大影响。Master Broker挂了也会基于Dledger技术自动实现Slave切换为Master。甚至生产者系统和消费者系统随便挂一台也可以,因为它们本身就是集群化部署的,其他机器会接管工作。
这个架构可以抗下高并发。因为假设订单系统对订单Topic要发起每秒10万QPS的写入,那么只要订单Topic分散在比如5台Broker上。实际上每台Broker会承载2万QPS写入,也就是说高并发场景下的10万QPS可以分散到多台Broker上抗下来。
然后集群足以存储海量消息。因为所有数据都是分布式存储的,每个Topic的数据都是存储在多台Broker机器上的,用集群里多台Master Broker就足以存储海量的消息。
所以采用多个Master Broker部署的方式,加上Topic分散在多台Broker的机制,可以抗下高并发访问以及海量消息的分布式存储。然后每个Master Broker有两个Slave Broker结合Dledger技术可以实现故障时的Slave-Master自动切换,实现高可用性。最后这套架构还具备伸缩性,如果要抗下更高的并发或者存储更多的数据,可以在集群里加入更多的Broker机器来进行线性扩展。
7.部署一个小规模的RocketMQ集群进行压测
(1)部署一个小规模的RocketMQ集群
(2)公司分配的第一批机器
(3)选择一台机器尝试快速部署RocketMQ集群
(4)完成正式三台NameServer的部署
(5)完成一组Broker集群的部署
(6)编写最基本的生产者和消费者代码准备压测
(1)部署一个小规模的RocketMQ集群
既然已经完成了RocketMQ生产架构的设计了,接着就得一步一步完成真正的生产集群的部署。下面先完成一个小规模的RocketMQ集群的部署。这个小规模的RocketMQ集群部署好之后,还需要对这个集群进行压测,看看在公司的机器配置下可以抗下多高的QPS。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=YTQ1NzM3ZTRkYWZjZmEwZTA2MjQwYzAyNzExOWY2NWQsMTczODE2NjM4MjgxNw==[/attach]
(2)公司分配的第一批机器
RocketMQ集群部署的规划如下:
一.NameServer:3台机器,每台机器都是8核CPU + 16G内存 + 500G磁盘 + 千兆网卡
二.Broker:6台机器,每台机器都是24核CPU + 48G内存 + 1TB磁盘 + 千兆网卡
三.生产者:2台机器,每台机器都是4核CPU + 8G内存 + 500GB磁盘 + 千兆网卡
四.消费者:2台机器,每台机器都是4核CPU + 8G内存 + 500GB磁盘 + 千兆网卡
NameServer是核心的路由服务,所以给8核16G的较高配置的机器。但是它一般就是处理Broker注册和心跳、系统的路由表拉取等请求,负载其实很低,因此不需要特别高的机器配置,部署三台也可以实现高可用的效果了。
Broker的负载是最高的,要处理高并发写入和海量数据存储,所以把最高配置的机器留给它。这里用3台机器组成一个"单Master + 双Slave"的集群。
生产者和消费者机器都是临时用来测试的,而且一般都是业务系统,只会给标准的4核8G的配置。
所有机器都是千兆网卡,足够使用了。
(3)选择一台机器尝试快速部署RocketMQ集群
第一步:在机器上部署RocketMQ之前,先安装一下JDK,同时要在环境变量中设置Java_HOME。
第二步:接着在一台机器上执行下面的命令来构建Dledger。
# git clone https://github.com/openmessaging/openmessaging-storage-dledger.git# cd openmessaging-storage-dledger# mvn clean install -DskipTests第三步:接着在机器上个执行下面的命令来构建RocketMQ。
# git clone https://github.com/apache/rocketmq.git# cd rocketmq# git checkout -b store_with_dledger origin/store_with_dledger# mvn -Prelease-all -DskipTests clean install -U第四步:接着进入一个目录中去编辑文件。
# cd distribution/target/apache-rocketmq在这个目录中,需要编辑三个文件,一个是bin/runserver.sh,一个是bin/runbroker.sh,另外一个是bin/tools.sh。在里面找到如下三行,然后将第二行和第三行都删了,同时将第一行的值修改为自己的JDK的主目录。
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"注意:如果要查看JDK装哪儿了,可以用命令:/usr/libexec/java_home -V,修改为Java主目录即可。
第五步:接着执行下面的命令进行快速RocketMQ集群启动。
# sh bin/dledger/fast-try.sh start这个命令会在当前这台机器上启动一个NameServer和三个Broker。三个Broker其中一个是Master,另外两个是Slave,瞬间就可以组成一个最小可用的RocketMQ集群。
第六步:接着使用下面的命令检查RocketMQ集群的状态。
# sh bin/mqadmin clusterList -n 127.0.0.1:9876此时需要等待一会儿,这个命令执行的过程会有点缓慢,大概可能几秒到几十秒过后,会看到三行记录。说是一个RaftCluster,Broker名称叫做RaftNode00,然后BID是0、1、2,也有可能是0、1、3。这就说明RocketMQ集群启动成功,BID为0的就是Master,BID大于0的就都是Slave,其实也可以叫做Leader和Follower。
第七步:接着就可以尝试一下Slave是如何自动切换为Master的了,假设三台机器的地址分别为:
192.168.31.153:30921192.168.31.153:30911192.168.31.153:30931假设发现30921端口的Broker的BID是0,说明它就是Master。此时可以用命令"lsof -i:30921"找出占用30921端口的进程PID,接着就用kill -9的命令把它杀了。比如占用30921端口的进程PID是4344,那么就执行命令"kill -9 4344"。
第八步:接着等待10s再次执行命令查看集群状态:
# sh bin/mqadmin clusterList -n 127.0.0.1:9876此时就会发现作为Leader的BID为0的节点,变成另外一个Broker了,这就是说Slave切换为Master了。
(4)完成正式三台NameServer的部署
其实RocketMQ集群部署并不难。主要就是在几台机器上做好相应的配置,然后执行一些命令启动NameServer和Broker即可。
首先是在三台NameServer的机器上:先安装好Java,构建好Dledger和RocketMQ,然后编辑对应的文件,设置好JAVA_HOME即可。接着可以执行如下的一行命令来启动NameServer:
# nohup sh mqnamesrv &NameServer监听的端口默认就是9876,Broker的默认端口号是30911。所以如果在三台机器上都启动了NameServer,查看到存在端口都是9876的进程,此时就说明成功启动了三个NameServer。
(5)完成一组Broker集群的部署
接着需要启动一个Master Broker和两个Slave Broker,这个启动也很简单。分别在上述三台为Broker准备的高配置机器上,安装好Java,构建好Dledger和RocketMQ,然后编辑好对应的文件,接着就可以执行如下的命令:
# nohup sh bin/mqbroker -c conf/dledger/broker-n0.conf &这里要注意:第一个Broker的配置文件是broker-n0.conf,第二个Broker的配置文件可以是broker-n1.conf,第三个Broker的配置文件可以是broker-n2.conf。
对于这些配置文件里的内容需要要做如下修改:以broker-n0.conf为例,在每个配置项上加入注释来说明该如何修改每台机器的配置:
# 这个是集群的名称,你整个broker集群都可以用这个名称brokerClusterName = RaftCluster# 这是Broker的名称,比如你有一个Master和两个Slave,那么他们的Broker名称必须是一样的,因为他们三个是一个分组,如果你有另外一组Master和两个Slave,你可以给他们起个别的名字,比如说RaftNode01brokerName=RaftNode00# 这个就是你的Broker监听的端口号,如果每台机器上就部署一个Broker,可以考虑就用这个端口号,不用修改listenPort=30911# 这里是配置NameServer的地址,如果你有很多个NameServer的话,可以在这里写入多个NameServer的地址namesrvAddr=127.0.0.1:9876# 下面两个目录是存放Broker数据的地方,你可以换成别的目录,类似于是/usr/local/rocketmq/node00之类的storePathRootDir=/tmp/rmqstore/node00storePathCommitLog=/tmp/rmqstore/node00/commitlog# 这个是非常关键的一个配置,就是是否启用DLeger技术,这个必须是trueenableDLegerCommitLog=true# 这个一般建议和Broker名字保持一致,一个Master加两个Slave会组成一个GroupdLegerGroup=RaftNode00# 这个很关键,对于每一组Broker,你得保证他们的这个配置是一样的,在这里要写出来一个组里有哪几个Broker,比如在这里假设有三台机器部署了Broker,要让他们作为一个组,那么在这里就得写入他们三个的ip地址和监听的端口号dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913# 这个是代表了一个Broker在组里的id,一般就是n0、n1、n2之类的,这个你得跟上面的dLegerPeers中的n0、n1、n2相匹配dLegerSelfId=n0# 这个是发送消息的线程数量,一般建议你配置成跟你的CPU核数一样,比如我们的机器假设是24核的,那么这里就修改成24核sendMessageThreadPoolNums=24其实最关键的是,Broker应该分为多组,每一组是三个Broker,一个Master和两个Slave。每一组Broker,它们的Broker名称、Group名称都一样,然后配置好一样的dLegerPeers(里面是组内三台Broker的地址)。接着配置好对应的NameServer地址,最后就是每个Broker都有自己的ID,在组内是唯一的。
按照这个思路就可以轻松的配置好一组Broker,在三台机器上分别用命令启动Broker即可。启动完成过后,可以使用如下命令跟NameServer进行通信,检查Broker集群的状态:
# sh bin/mqadmin clusterList -n 127.0.0.1:9876(6)编写最基本的生产者和消费者代码准备压测
接着就可以编写最基本的生产者和消费者代码,准备执行压测。可以新建两个工程,一个是生产者,一个是消费者,两个工程都需要加入下面的依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.1</version></dependency>然后生产者的示例代码如下:简单来说,就是只要一运行,就马上不停的在while循环里去发送消息,根据需要可以设置为多个线程。
public class SyncProducer { public static void main(String[] args) throws Exception { final DefaultMQProducer producer = new DefaultMQProducer("test_producer"); //这里需要设置NameServer地址 producer.setNamesrvAddr("localhost:9876"); producer.start(); //for循环不停地发送消息,这里可以设置为启动多个线程 //然后使用producer去不停地发送消息 for (int i=0; i<10; i++) { new Thread() { @Override public void run() { while(true) { Message msg = new Message( "TopicTest", "TagA", ("Test").getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult sendResult = producer.send(msg); } } }.start(); } //程序卡在这里,不能让它结束,就不停地发送消息 while(true) { Thread.sleep(1000); } }}接着是示例用的消费者代码,如下所示:消费者就是不停的去获取消息然后打印出来即可。
public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer"); //设置NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); //订阅Topic,表明要消费哪些Topic的消息 consumer.subscribe("TopicTest", "*"); //这里注册一个回调接口去接收获取到的消息 consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }}以上便完成了一个小集群:由三个NameServer、三个Broker和四个业务系统机器组成。经过测试,Broker都正常运行,请求NameServer能看到集群信息,且测试发现都能够正常的发送消息和接收消息。
接下来就要调整Broker的OS内核参数、JVM参数然后重新启动Broker,接着就可以启动生产者和消费者去发送消息和获取消息,然后去观察RocketMQ能承载的QPS、CPU、IO、磁盘、网络等负载。
8.如何对RocketMQ集群进行可视化的监控和管理
(1)压测前的监控
(2)RocketMQ的大优势:可视化的工作台
(3)如何通过工作台进行集群监控
(4)机器本身的监控应该如何做
(1)压测前的监控
现在已经知道RocketMQ集群如何部署,接着需要优化生产机器上的OS内核参数和RocketMQ的JVM参数。等这些参数都优化好,才能正式在高配置机器上启动RocketMQ,让它把性能发挥到最高,接着压测才有意义。
当RocketMQ集群参数正式优化好后,需要启动集群,接着用几台机器跑生产者和消费者去进行压测。那么压测时需要关注RocketMQ集群能承载的最高QPS,同时在承载这个QPS的同时,各个机器的CPU、IO、磁盘、网络、内存的负载情况。除了要看机器资源的使用率,还要看JVM的GC情况等。所以这时RocketMQ集群的监控、管理和运维就派上用场了。
(2)RocketMQ的大优势:可视化的工作台
整个RocketMQ集群的元数据都集中在了NameServer里:包括有多少Broker、有哪些Topic、有哪些Producer、有哪些Consumer、目前集群里有多少消息等。
所以如果能想办法进入到NameServer里去,自然就可以知道很多东西。但事实是不行的,因为NameServer并没有对我们打开一扇门让我们进去获取这些信息。
不过RocketMQ提供了一个可视化的管理界面。可以随便找一台机器,比如用NameServer的三台机器中的任意一台,在里面执行如下命令拉取RocketMQ运维工作台的源码:
# git clone https://github.com/apache/rocketmq-externals.git然后进入rocketmq-console的目录:
# cd rocketmq-externals/rocketmq-console执行以下命令对rocketmq-cosole进行打包,把它做成一个jar包:
# mvn package -DskipTests然后进入target目录下,可以看到一个jar包,接着执行下面的命令启动工作台:
# java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 --rocketmq.config.namesrvAddr=127.0.0.1:9876这里务必要在启动时设置好NameServer的地址,如果有多个地址可以用分号隔开。接着就会看到工作台启动了,然后就通过浏览器访问那台机器的8080端口就可以看到精美的工作台界面。
(3)如何通过工作台进行集群监控
这个可视化的工作台可以说是非常强大的,它几乎满足了大部分对RocketMQ集群监控的需求。
一.首先刚进入界面,会看到类似下图所示
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=YTQ1NzM0N2MwOWI0YTYzMTNlNTgwMjRmNTRkOGU1OTYsMTczODE2NjM4MjgxNw==[/attach]
这时可以点击按钮是"ChangeLanguage"切换成简体中文。在这个界面可以让看到Broker的大体消息负载,还有各个Topic的消息负载,另外还可以根据日期选择查看某一天的监控数据。
二.接着点击上边导航栏里的"集群",就会进入集群的一个监控界面
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=MmRhZjI2NTUzZjBkNDAwNzFjYzZkMTViNDZmNTZhZGYsMTczODE2NjM4MjgxNw==[/attach]
在这个图里可以看到一些非常有用的信息,比如各个Broker的分组、哪些是Master、哪些是Slave、它们各自的机器地址和端口号、还有版本号,包括最重要的就是它们每台机器的生产消息TPS和消费消息TPS、还有消息总数。通过这个TPS统计,就是每秒写入或者被消费的消息数量,就可以看出RocketMQ集群的TPS和并发访问量。
另外在界面右侧有两个按钮,一个是"状态",一个是"配置"。其中点击状态可以看到这个Broker更加细节和具体的一些统计项,点击配置可以看到这个Broker具体的一些配置参数的值。
三.点击上边导航栏的"主题",可以看到下面的界面
通过这个界面就可以对Topic进行管理了,比如可以在这里创建、删除和管理Topic,查看Topic的一些装填、配置等,可以对Topic做各种管理。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=NWU1Mzg3Y2ZjYjE1YWYyZTAwOGYyY2VhYjgxZTk3NDIsMTczODE2NjM4MjgxNw==[/attach]
四.接着点击上边导航栏里的"消费者"和"生产者",就可以看到MQ集群的消费者和生产者了,可以做对应的一些管理
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=YzE2MzM1ZTk4NjIwNzQ4NzMzMWMwM2M0Y2JhNmJmNDMsMTczODE2NjM4MjgxNw==[/attach]
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=ZDI5NDkzZDZlMWFhNmEyZDY5YjQ5NWJjNWZmZWI5MTgsMTczODE2NjM4MjgxNw==[/attach]
五.接着点击导航栏里的"消息"和"消息轨迹",又可以对消息进行查询和管理
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=NTBjNWJjYjNlMDA3ZTUzM2I0NTNiNDQ4Zjc0M2M0YmIsMTczODE2NjM4MjgxNw==[/attach]
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=YTk0N2JiYWMyNmQ4MjA4N2RkNTEzOGNjYzk4YzMxYzAsMTczODE2NjM4MjgxNw==[/attach]
大体上这个工作台的监控和管理功能就是这些了。通过这个工作台,就可以对集群整体的消息数量以及消息TPS、还有各个Broker的消息数量和消息TPS进行监控。同时还可以对Broker、Topic、消费者、生产者、消息这些东西进行对应的查询和管理,非常便捷。
(4)机器本身的监控应该如何做
通过RocketMQ运维工作台可以在压测时看到整个RocketMQ的TPS了,即每秒消息数量。但如果要同时看到集群每台机器的CPU、IO、磁盘、内存、JVM GC的负载,就需要使用其他监控系统如Zabbix等。
9.进行OS内核参数和JVM参数的调整
(1)压测前的参数调整工作
(2)对RocketMQ集群进行OS内核参数的调整
(3)对JVM参数进行调整
(4)对RocketMQ核心参数进行调整
(1)压测前的参数调整工作
像RocketMQ这种中间件集群,应该如何进行压测,要准备哪些东西。如果是一些经验不丰富的平时主要是做CRUD类工作的Java工程师,可能就会直接运行几个生产者和消费者的程序,然后多开一些线程写数据到RocketMQ,同时从RocketMQ消费数据。接着从RocketMQ的管理工作台中看一下TPS,每秒可以处理多少条消息,此时就算是完成压测。
但是对于生产环境的中间件集群,不能直接用各种默认参数启动。因为那样可能有很多问题,或者没法把中间件的性能发挥出来。
下面介绍压测一个中间件的步骤:
步骤一:首先需要对部署它的机器的OS内核参数进行一定的调整,也就是Linux操作系统的一些内核参数。因为OS内核参数很多默认值未必适合生产环境的系统运行,有些参数值需要调整大些,才能让中间件发挥出性能。
步骤二:接着就是JVM的各种配置参数优化。因为一般中间件都是Java开发的或者是基于JVM的Scala开发的。比如由Java开发的有:RocketMQ、MyCat、Elasticsearch,由Scala开发的有Kafka。所以可以认为在一台机器上部署和启动一个中间件系统其实就是启动一个JVM进程。由这个JVM进程来运行中间件系统内的所有代码,然后实现中间件系统的各种功能。下图清晰展示了在一台机器上,启动部署的一个中间件系统,就是启动一个JVM进程。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=ZjZjZGI2MTQ4ZWIzY2NjNmYzY2ZmMTI1YTVmMGZlYjgsMTczODE2NjM4MjgxNw==[/attach]
所以对于一个生产环境的中间件系统而言,在部署和启动之前,需要关注的第二种参数就是JVM的各种参数。如内存区域的大小分配、垃圾回收器以及对应的行为参数、GC日志存放地址、OOM自动导出内存快照的配置等。
如果部署了一个几十GB内存的高配置物理机,结果只给中间件系统的JVM分配了1GB内存,这就很不合理。相当于机器配置很高,结果中间件系统就用了里面的几十分之一的内存,根本没用上那么多的资源。
步骤三:最后就是中间件系统自己本身的一些核心参数的设置。比如中间件系统会开启很多线程处理请求和工作负载,然后还会进行大量的网络通信,同时会进行大量的磁盘IO类的操作。这时就需要依据机器配置,合理的对中间件系统的核心参数进行调整。
如果机器配置很高,是24核CPU,结果中间件系统默认就开启4个工作线程去处理请求,就很不合理了。相当于24核CPU里很多都是空闲状态,是没有任何事情可以干的。要是不进行合理的参数设置,几乎可以认为就是在浪费高配置的机器资源。
所以,以上三点就是一个中间件系统在进行压力测试以及生产环境部署之前,都必须要进行调整的。
当然如果是普通的那种Java Web业务系统,通常上线之前主要关注的就是JVM的参数而已,对OS内核参数以及业务系统自身参数大多数情况下都没有太多的要求。但对于中间件系统而言,往往要对OS内核参数、JVM参数以及自身核心参数都做出相应的调整,再进行压测和上线。
(2)对RocketMQ集群进行OS内核参数的调整
接下来介绍RocketMQ集群部署的机器需要调整的一些OS内核参数含义以及对应的调整值。
一.vm.overcommit_memory
vm.overcommit_memory这个参数有三个值可以选择:0、1、2。如果值是0的话,在中间件系统申请内存时,OS内核会检查可用内存是否足够。如果足够就分配内存给中间件系统,如果剩余内存不够就拒绝中间件系统的申请,导致申请内存失败,进而导致中间件系统异常出错。
因此一般将这个参数的值调整为1,意思是把所有可用的物理内存都允许分配给中间件系统。只要有内存就给中间件系统来用,这样可以避免申请内存失败的问题。
比如线上环境部署的Redis因为这个参数是0,导致在save数据快照到磁盘文件时需要申请大内存被拒绝,导致异常报错。可以用如下命令进行修改:
# echo 'vm.overcommit_memory=1' >> /etc/sysctl.conf二.vm.max_map_count
这个参数值会影响中间件系统可以开启的线程的数量,同样也是非常重要的。如果这个参数过小,有时候可能会导致有些中间件无法开启足够的线程,进而导致报错,甚至中间件系统挂掉。它的默认值是65536,但是这个值有时候是不够的。
比如生产环境部署的Kafka集群曾经有一次就报出过这个异常,说无法开启足够多的线程,导致Kafka宕机。因此建议可以把这个参数调大10倍,比如655360这样的值,保证中间件可以开启足够多的线程。可以用如下命令进行修改:
# echo 'vm.max_map_count=655360' >> /etc/sysctl.conf三.vm.swappiness
这个参数是用来控制进程的swap行为的,简单来说就是OS会把一部分磁盘空间作为swap区域。如果有的进程现在不是太活跃,就会被操作系统调整其进程状态为睡眠状态,把进程中的数据放入磁盘上的swap区域,然后让这个进程把原来占用的内存空间腾出来,交给其他活跃运行的进程来使用。
如果这个参数的值设置为0,意思就是尽量别把任何一个进程放到磁盘swap区域去,尽量大家都用物理内存。如果这个参数的值是100,那么意思就是尽量把一些进程给放到磁盘swap区域去,内存腾出来给活跃的进程使用。默认这个参数的值是60,可能会导致我们的中间件运行不活跃时被迫腾出内存空间然后放磁盘swap区域去。
因此通常在生产环境建议把这个参数调整小一些,比如设置为10,尽量用物理内存,别放磁盘swap区域。可以用如下命令进行修改:
# echo 'vm.swappiness=10' >> /etc/sysctl.conf四.ulimit
这个是用来控制Linux上的最大文件链接数的,默认值可能是1024,一般肯定是不够的。因为中间件系统在大量频繁的读写磁盘文件时,或者是进行网络通信时,都会跟这个参数有关系。对于一个中间件系统而言肯定是不能使用默认值的,如果采用默认值,很可能在线上会出现如下错误:
error: too many open files因此通常建议用如下命令修改这个值:
# echo 'ulimit -n 1000000' >> /etc/profile五.总结
其实这几个参数,都是跟内存管理、线程数量、磁盘文件IO、网络通信有关系的,因为我们的中间件系统在运行的时候无非就是跟这些打交道。
中间件系统需要开启大量的线程(跟vm.max_map_count有关)中间件系统需要进行大量的网络通信和磁盘IO(跟ulimit有关)中间件系统需要使用大量的内存(跟vm.swappiness和vm.overcommit_memory有关)(3)对JVM参数进行调整
下面介绍RocketMQ的JVM参数该如何调整,首先得先找到RocketMQ启动脚本中是如何设置JVM参数的,并且明白默认JVM参数的含义。
在下面的目录下,就有对应的启动脚本。比如mqbroker是用来启动Broker的,mqnamesvr是用来启动NameServer的。
rocketmq/distribution/target/apache-rocketmq/bin用mqbroker来举例,查看这个脚本里的内容,最后有如下一行:这行内容就是用runbroker.sh脚本来启动一个JVM进程,JVM进程刚开始执行的main类是org.apache.rocketmq.broker.BrokerStartup。
sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@接着看runbroker.sh脚本,里面的内容如下:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"在上面的内容中,其实就是在为启动Broker设置对应的JVM参数和其他一些参数,我们可以把其中JVM相关的参数抽取出来给解释一下:
"-server -Xms8g -Xmx8g -Xmn4g -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m -XX:-OmitStackTraceInFastThrow -XX:+AlwaysPreTouch -XX:MaxDirectMemorySize=15g -XX:-UseLargePages -XX:-UseBiasedLocking"-server:这个参数就是说用服务器模式启动。
-Xms8g -Xmx8g -Xmn4g:这个是很关键参数,也是重点需要调整的。它的意思是默认的堆大小是8G内存,新生代是4G内存,但是我们的高配物理机是48G内存的。所以这里完全可以给它们翻几倍,比如给堆内存20G,新生代给10G,甚至可以更多一些,当然要留一些给操作系统。
-XX:+UseG1GC -XX:G1HeapRegionSize=16m:这几个参数也很重要。这是选用了G1垃圾回收器来做分代回收,对新生代和老年代都是用G1来回收。这里把G1的Region大小设置为了16M,这个因为机器内存比较多,所以Region大小可以调大一些给到16M。否则,如果用2M的Region,会导致Region数量过多。
-XX:G1ReservePercent=25:这个参数意思是,在G1管理的老年代里预留25%的空闲内存,保证新生代对象晋升到老年代时有足够空间,避免老年代内存都满了,导致新生代有对象要进入老年代时没有充足内存。默认值是10%,略微偏少,这里RocketMQ给调大了一些。
-XX:InitiatingHeapOccupancyPercent=30:这个参数的意思是,当堆内存的使用率达到30%后,就自动启动G1的并发垃圾回收,开始尝试回收一些垃圾对象。默认值是45%,这里调低了一些,也就是提高了GC的频率,这可以避免垃圾对象过多、一次垃圾回收耗时过长的问题。
-XX:SoftRefLRUPolicyMSPerMB=0:这个参数默认设置为0,其实建议这个参数不要设置为0,避免频繁回收一些软引用的Class对象,这里可以调整为比如1000。
-verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m:上述这一堆参数都是控制GC日志打印输出的,确定了GC日志文件的地址,要打印哪些详细信息。然后控制每个GC日志文件的大小是30M,最多保留5个GC日志文件。
-XX:-OmitStackTraceInFastThrow:这个参数是指,有时候JVM会抛弃一些异常堆栈信息,因此这个参数设置之后,就是禁用这个特性,要把完整的异常堆栈信息打印出来。
-XX:+AlwaysPreTouch:指定的JVM内存,JVM启动时不会真正分配,在实际需要使用时才进行分配。使用这个参数后,就会强制让JVM启动时直接分配指定内存,不要等到使用内存时再分配。
-XX:MaxDirectMemorySize=15g:RocketMQ里大量用了NIO中的Direct Buffer,这里限定了Direct Buffer最多申请多少。如果机器内存比较大,可以适当调大这个值。
-XX:-UseLargePages -XX:-UseBiasedLocking:这两个参数的意思是禁用大内存页和偏向锁。
RocketMQ默认的JVM参数总结:首先采用了G1垃圾回收器,默认堆内存大小是8G、新生代是4G。这个可以根据机器内存来调整,增大一些也是没问题的。然后是一些G1垃圾回收的行为参数设置,这个一般不用改动。接着会对GC日志打印进行设置,这个一般也不用改动。最后会禁用一些特性,开启一些特性,这些都直接维持RocketMQ的默认值即可。
(4)对RocketMQ核心参数进行调整
介绍完RocketMQ默认的JVM参数以及要做的一些微调后,继续介绍RocketMQ自身的一些核心参数调整。在下面的目录里有dledger的示例配置文件:
rocketmq/distribution/target/apache-rocketmq/conf/dledger里面有一个较为核心的参数:sendMessageThreadPoolNums=16。这个参数的意思就是RocketMQ内部用来发送消息的线程池的线程数量默认是16。这个参数可以根据机器的CPU核数进行适当增加,比如机器CPU是24核的,可以增加这个线程数量到24或者30。
(5)总结
中间件系统在压测或者上生产前,需要对三大块参数进行调整:OS内核参数、JVM参数以及中间件核心参数。
OS内核参数调整的地方都是与磁盘IO、网络通信、内存管理以及线程管理有关,需要适当调节大小。
JVM参数需要我们去中间件系统的启动脚本中寻找它的默认JVM参数。然后根据机器的情况,对JVM的堆内存大小、新生代大小、Direct Buffer大小等,做出一些调整,发挥机器的资源。
中间件核心参数调整的地方也是与磁盘IO、网络通信、内存管理、线程管理相关的。根据机器资源,可以增加网络通信线程、控制同步刷磁盘或者异步刷磁盘线程数量有多少、内存中一些队列的大小。
10.如何对小规模RocketMQ集群进行压测
(1)压测就是拼命往死了压吗
(2)一次RocketMQ小规模集群的压测
(3)基于公司业务情况规划生产集群
(1)压测就是拼命往死了压吗
所谓的压测到底是如何压测?难道就是简单的启动几台机器上的生产者和消费者,然后每台机器都开启大量线程模拟并发写入RocketMQ以及并发从RocketMQ上读取消息吗?难道就是不停增加压测用的生产者和消费者的机器,同时不停增加它们的线程,然后去看RocketMQ集群的极限在哪里吗?在压测的时候到底要关注哪些指标?采用什么样的压测方式才是正确的的?
其实如果对一个小规模的RocketMQ集群进行疯狂的压测,最后压测出来一个最大的极限TPS值。那也只是压测中我们想要的一个结果而已,并不是实际中的最理想结果。假设我们现在对部署好的RocketMQ集群拼命进行压测,不停的增加生产者和消费者的机器以及线程数量,不停的增加RocketMQ集群的并发写入量和并发消费量,发现RocketMQ集群似乎可以抗下每秒10万+的消息量。那么在生产环境上,我们也不可能放心的让RocketMQ集群来抗这么高的TPS。
因为在压测的时候一方面要关注RocketMQ能抗下多少TPS,另一方面还要关注RocketMQ部署的几台机器的资源使用率和负载情况。
比如RocketMQ集群在抗下10万TPS(可以理解为每秒处理10万条消息)的同时,结果机器的CPU负载达到100%,内存几乎消耗殆尽,IO负载极高,网卡流量打满甚至快要打爆,此时这个10万TPS的成本就太高了。
因为眼看着抗下了超高的TPS,结果自己机器资源消耗殆尽,几乎机器都快挂了。那么在真正的生产环境下是不可能放心的允许RocketMQ集群抗到10万TPS的,因此在机器快挂掉的情况下让中间件抗超高的负载是绝对不行的。
所以这种极限压测方法,仅仅能压测出一个极限值而已。实际上我们平时做压测,主要关注的还是要压测出一个最合适的最高负载。
什么叫最合适的最高负载呢?意思就是在RocketMQ的TPS和机器的资源使用率和负载之间取得一个平衡。比如RocketMQ集群在机器资源使用率极高的极端情况下可以扛到10万TPS,但是当它仅仅抗下8万TPS时,就会发现CPU负载、内存使用率、IO负载和网卡流量,都负载较高,但是可以接受,机器比较安全,不至于宕机。那么这个8万TPS实际上就是最合适的一个最高负载。也就是说,哪怕生产环境中极端情况下,RocketMQ的TPS飙升到8万TPS,知道机器资源也是大致可以抗下来的,不至于出现机器宕机的情况。
所以我们做压测,其实最主要的是综合TPS以及机器负载,尽量找到一个最高的TPS同时机器的各项负载在可承受范围之内,这才是压测的目的。
(2)一次RocketMQ小规模集群的压测
以下压测过程以及压测结果,都是根据我们之前真实的RocketMQ压测报告总结而来,非常的有代表性,完全可以结合前面介绍的机器配置来参考。
一.RocketMQ的TPS和消息延时
我们让两个Producer不停的往RocketMQ集群发送消息,每个Producer所在机器启动了80个线程,相当于每台机器有80个线程并发的往RocketMQ集群写入消息。RocketMQ集群是1主2从组成的一个dledger模式的高可用集群,只有一个Master Broker会接收消息的写入。
然后有2个Consumer不停地从RocketMQ集群消费数据,每条数据的大小是500字节。这个数字非常关键,因为它跟后续的网卡流量有关。
我们发现,一条消息从Producer生产出来到经过RocketMQ的Broker存储下来,再到被Consumer消费,基本上这个时间跨度不会超过1秒钟,这些这个性能是正常而且可以接受的。
同时在RocketMQ的管理工作台中可以看到:Master Broker的TPS(也就是每秒处理消息的数量),可以稳定的达到7万左右,也就是每秒可以稳定处理7万消息。
二.CPU负载情况
其次我们检查了一下Broker机器上的CPU负载,可以通过top命令、uptime命令来查看。比如执行top命令就可以看到CPU Load和CPU使用率,这就代表了CPU的负载情况。
在执行了top命令之后,往往可以看到如下这样一行信息:
load average:12.03,12.05,12.08类似上面那行信息代表的是CPU在1分钟、5分钟和15分钟内的CPU负载情况。比如我们一台机器是24核的,那么上面的12意思就是有12个核在使用中。换言之就是还有12个核其实还没使用,CPU还是有很大余力的。这个CPU负载其实是比较好的,因为并没有让CPU负载达到极限。
三.内存使用率
使用free命令就可以查看到内存的使用率。根据当时的测试结果,机器上48G的内存,仅仅使用了一部分,还剩下很大一部分内存都是空闲可用的。或者是被RocketMQ用来进行磁盘数据缓存了,所以内存负载是很低的。
四.JVM GC频率
使用jstat命令就可以查看RocketMQ的JVM的GC频率。基本上新生代每隔几十秒会垃圾回收一次,每次回收过后存活的对象很少,几乎不进入老年代。因此测试过程中,Full GC几乎一次都没有。
五.磁盘IO负载
接着可以检查一下磁盘IO的负载情况。首先可以用top命令查看一下IO等待占用CPU时间的百分比,执行top命令之后,会看到一行类似下面的东西:
Cpu(s): 0.3% us, 0.3% sy, 0.0% ni, 76.7% id, 13.2% wa, 0.0% hi, 0.0% si在这里的13.2% wa,指的就是磁盘IO等待在CPU执行时间中的百分比。如果这个比例太高,说明CPU执行的时候大部分时间都在等待执行IO,也就说明IO负载很高,导致大量的IO等待。
当时我们压测时,这个wa是在40%左右,说明IO等待时间占用CPU执行时间的比例在40%左右。这是相对高一些,但还是可以接受的。只不过如果继续让这个比例提高上去,就很不靠谱了,因为说明磁盘IO负载可能过高了。
六.网卡流量
使用如下sar命令可以查看服务器的网卡流量:
# sar -n DEV 1 2通过这个命令就可以看到每秒钟网卡读写数据量了。当时服务器使用的是千兆网卡,千兆网卡的理论上限是每秒传输128M数据,但是一般实际最大值是每秒传输100M数据。因此当时发现的一个问题就是,在RocketMQ处理到每秒7万消息时,每条消息500字节左右的大小的情况下,每秒网卡传输数据量已经达到100M了,就是已经达到了网卡的一个极限值了。
因为一个Master Broker服务器:每秒不光是通过网络接收你写入的数据、还要把数据同步给两个Slave Broker、还有别的一些网络通信开销。因此实际压测发现,每条消息500字节,每秒7万消息时,服务器的网卡就几乎被打满了,无法承载更多的消息了。
七.针对压测的总结
实际上经过压测,发现我们服务器的性能瓶颈在网卡上,因为网卡每秒能传输的数据是有限的。因此当我们使用平均大小为500字节的消息时,最多就是做到RocketMQ单台服务器每秒7万的TPS。而且这个时候CPU负载、内存负载、JVM GC负载、磁盘IO负载,基本都还在正常范围内。只不过这时网卡流量基本已经打满了,无法再提升TPS了。因此在这样的一个机器配置下,RocketMQ一个比较靠谱的TPS就是7万左右。
(3)基于公司业务情况规划生产集群
就全公司的情况而言,实际上现在还处于创业成长期。即使在搞双11活动高峰时,公司后台系统的并发访问量也就是每秒上万,即使多考虑一些,每秒几万的并发量也够了。
因此在部署时:NameServer采用3台机器部署,Broker采用6台机器部署,2个Master Broker和4个Slave Broker。这样2个Master Broker每秒最多可以处理十几万消息,4个Slave Broker同时也能每秒提供高吞吐的数据消费。而且全面保证高可用性,这样的一个生产部署架构,绝对是可以满足公司现在的消息吞吐量的。
(4)总结
一.到底应该如何压测
应该在TPS和机器的CPU负载、内存使用率、JVM GC频率、磁盘IO负载、网络流量负载之间取得一个平衡。尽量让TPS尽可能提高,同时让机器的各项资源负载不要太高。
二.实际压测过程
采用几台机器开启大量线程并发读写消息,然后观察TPS、CPU Load(使用top命令)、内存使用率(使用free命令)、JVM GC频率(使用jstat命令)、磁盘IO负载(使用top命令的wa)、网卡流量负载(使用sar命令),不断增加机器和线程,让TPS不断提升上去,同时观察各项资源负载是否过高。
三.生产集群规划
根据公司的后台整体QPS来定,稍微多冗余部署一些机器即可。实际部署生产环境的集群时,使用高配置物理机,同时合理调整OS内核参数、JVM参数、中间件核心参数即可。
11.消息中间件集群生产部署规划梳理
消息中间件集群生产部署架构规划的梳理,如下图示。从消息中间件到底是什么开始,然后考虑如何对MQ进行技术选型,接着了解RocketMQ的核心技术原理,然后是RocketMQ的生产部署方案如何制定,接着开始上手部署一个小规模的RocketMQ集群,同时做好集群的可视化监控,并且对小规模的RocketMQ集群进行生产级的参数调整,然后对RocketMQ集群进行压测,最后根据压测结果规划一个生产集群进行部署。
当这个阶段的内容都做完后,就可以正式进入基于RocketMQ集群完成系统架构改造的环节了。
[attach]https://mp.toutiao.com/mp/agw/article_material/open_image/get?code=MWEzMGFjNTZkOTJjNWU1Y2Y4OTNkNjM5MTczMDI4OTQsMTczODE2NjM4MjgxNw==[/attach]
|
|