目录
安装环境说明
RocketMQ 下载与安装
启动 NameServer 服务和路由
启动 Broker 中间件
内存分配失败解决
发送与接收消息
关闭服务器 与 常用命令
安装环境说明
1、工欲善其事,必先利其器。既然 RocketMQ 这么深受喜爱,那么本文就讲解如何安装使用 RocketMQ
Apache 上开源官方地址:https://rocketmq.apache.org/ GitHub 托管地址:https://github.com/apache/rocketmq 阿里官方的介绍文档:http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/ Apache 官方 “快速入门” 文档:https://rocketmq.apache.org/docs/quick-start/ |
2、官方要求环境如下:
64bit OS, Linux/Unix/Mac is recommended;(Windows user see guide below) #本人使用的 CentOS 7.2 |
3、本文环境:CentOS 7.2 + Java JDK 1.8 + Maven 3.5.4 + RocketMQ 4.7.1
RocketMQ 下载与安装
一)下载源码
1、下载 RocketMQ 即可以从 Apache 官网下载,也可以从 gitHub 上进行下载,无论哪种方式,下载解压后都是一样的。
2、从 Apache 官网下载:直接下载整个 RocketMQ 项目的源码(带source的,也可以直接下载二进制文件[带Binary的],二进制的不需要再编译部署,可以直接使用),如:https://mirror.bit.edu.cn/apache/rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip
3、从 GitHub 官网下载:因为项目是开源的,所以可以直接从 GitHub 下载整个项目的源码。
4、这里本人直接从 CentOS 上使用 wget 命令下载源码然后进行编译部署:wget https://mirror.bit.edu.cn/apache/rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip
二)解压源码
1、上面下载的是整个 RocketMQ 项目的源码,可以自己学习或者定制开发,但是要想使用还想编译。
2、官方推荐解压命令:unzip rocketmq-all-4.7.1-source-release.zip (当然应该也是可以使用其它解压命令的,如 tar 等,这里还是和官方保持一致,当没有安装 unzip 命令包时是使用不了,此时应该先安装 unzip 命令包)
yum list unzip:查看 unzip 包的安装情况 yum install unzip.x86_64:没有安装时,使用命令安装 unzip |
3、RocketMQ 源码压缩包解压 unzip rocketmq-all-4.7.1-source-release.zip,大量的 inflating 输出,解压完成之后,同目录下会多一个目录 rocketmq-all-4.7.1-source-release,才 11 M。
rm rocketmq-all-4.7.1-source-release.zip #解压后,可以删除原来的压缩包 mv rocketmq-all-4.7.1-source-release/ rocketmq-4.7.1 #对解压后的目录重命名 |
三)部署安装
1、使用 Maven 来编译整个项目,官方推荐命令:mvn -Prelease-all -DskipTests clean install -U (进入解压目录下执行此命令)
2、接着就会从 Maven 中央仓库下载项目依赖的 jar 包编译部署安装,这可能需要一点时间。
3、项目部署完成后,可以进入 cd distribution/target/rocketmq-4.7.1/rocketmq-4.7.1 目录查看,此发布目录才是将来真正需要使用到的目录,脚本、配置文件等等全在里面。
启动 NameServer 服务和路由
1、进入 RocketMQ 的 distribution/target/apache-rocketmq 目录,启动的时候先启动 namesrv,然后启动 broker,官方指导命令:
# 启动命令,并且常驻内存,nohup 属于后台启动,当前目录下生成 nohup.out 日志文件,也可以指定日志输出位置。
# sh bin/mqnamesrv :属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出
$ nohup sh bin/mqnamesrv &
# 查看启动日志,能看到 "The Name Server boot success" 字样则成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
2、如下所示,看到 “ The Name Server boot success. serializeType=JSON ” 表示启动成功
[root@localhost apache-rocketmq]# pwd
/usr/local/rocketmq-all-4.3.0/distribution/target/apache-rocketmq
[root@localhost apache-rocketmq]# nohup sh bin/mqnamesrv &
[1] 5865
[root@localhost apache-rocketmq]# nohup: 忽略输入并把输出追加到"nohup.out"
tail -f ~/logs/rocketmqlogs/namesrv.log
2018-08-10 01:29:12 INFO main - tls.client.keyPath = null
2018-08-10 01:29:12 INFO main - tls.client.keyPassword = null
2018-08-10 01:29:12 INFO main - tls.client.certPath = null
2018-08-10 01:29:12 INFO main - tls.client.authServer = false
2018-08-10 01:29:12 INFO main - tls.client.trustCertPath = null
2018-08-10 01:29:13 INFO main - Using OpenSSL provider
2018-08-10 01:29:14 INFO main - SSLContext created for server
2018-08-10 01:29:14 INFO NettyEventExecutor - NettyEventExecutor service started
2018-08-10 01:29:14 INFO main - The Name Server boot success. serializeType=JSON
2018-08-10 01:29:14 INFO FileWatchService - FileWatchService service started
启动 Broker 中间件
1、进入 RocketMQ 的 distribution/target/apache-rocketmq 目录,官方指导命令:
# 启动命令,并且常驻内存,nohup 属于后台启动,当前目录下生成 nohup.out 日志文件,也可以指定日志输出位置。 sh bin/mqbroker -n localhost:9876 :属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出 # 查看启动日志 |
2、如下图所示,启动之后同样提示将日志信息追加到了当前目录下的 nohup.out 文件中,查看日志,如果看到 "The broker[%s, 172.30.30.233:10911] boot success…" ,则表示启动成功。
3、如果 tail -f ~/logs/rocketmqlogs/broker.log 提示找不到文件,则打开 当前目录(apache-rocketmq)下的 nohup.out 日志文件查看,发现启动 Broker 失败:无法分配内存,则需要修改配置。
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/rocketmq-all-4.3.0/distribution/target/apache-rocketmq/hs_err_pid7209.log
4、如果需要通过 Java 应用访问 RocketMQ 服务器,则需要开放 Linux 防火墙端口:
firewall-cmd –zone=public –add-port=9876/tcp –permanent #开启 9876 端口 firewall-cmd –reload #重启防火墙 firewall-cmd –zone=public –list-ports #查看开放的端口 |
内存分配失败解决
1、如果启动 broker 失败,则是因为 apache-rocketmq/bin 目录下启动 nameserv 与 broker 的 runbroker.sh 和 runserver.sh 文件中默认分配的内存太大,而系统实际内存却太小导致启动失败。
2、通常像虚拟机上安装的 CentOS 等 Linux 服务器内存本身较小,所以必须修改 rocketMQ 配置,而如果是买的阿里云、华为云等单独的服务器,内存比较大时,则不会容易出现此问题,官方要求是必须有 4g 可用内存以上。
3、找到下面的 distribution/target/apache-rocketmq/bin 下的 runbroker.sh 和 runserver.sh 文件,然后进行调整。
3.1、runserver.sh 文件中 修改 JVM 配置下的第一行,将原来 4g 视自身服务器情况调小一点
#===========================================================================================
# JVM Configuration
#===========================================================================================
#JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/rmq_srv_gc.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"$JAVA ${JAVA_OPT} $@
3.2、runbroker.sh 文件中 修改 JVM 配置下的第一行,将原来 8g 视自身服务器情况调小一点
#===========================================================================================
# JVM Configuration
#===========================================================================================
#JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
4、然后再次启动 broker 时就成功了,broker 注册到了 nameserver 上了。
发送与接收消息
1、发送/接收消息之前,需要告诉客户端(Producer、Consumer)名称服务器(Nameserver)的位置,RocketMQ 提供了多种方法来实现这一点:
编程方式,如: Java 选项,如 环境变量,如: HTTP 端点 |
2、官方快速入门发送指导命令(在distribution/target/apache-rocketmq 目录下执行):
export NAMESRV_ADDR=localhost:9876 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer SendResult [sendStatus=SEND_OK, msgId= … |
3、如下所示官方提供这个例子属于生产者,用于发送消息,运行之后会自动发送大量的消息,之后就会退出,用于测试搭建的环境是否可用。
[root@localhost apache-rocketmq]# export NAMESRV_ADDR=localhost:9876
[root@localhost apache-rocketmq]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
03:05:13.152 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
SendResult [sendStatus=SEND_OK, msgId=C0A83A8120207D4991AD2F02D2280000, offsetMsgId=C0A83A8100002A9F00000000000AFAC8, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=1000]
SendResult [sendStatus=SEND_OK, msgId=C0A83A8120207D4991AD2F02D2740001, offsetMsgId=C0A83A8100002A9F00000000000AFB7A, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=1], queueOffset=1000]
SendResult [sendStatus=SEND_OK, msgId=C0A83A8120207D4991AD2F02D27A0002, offsetMsgId=C0A83A8100002A9F00000000000AFC2C, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=1000]
.........................
4、官方快速入门接收消息指导命令:sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer,如下所示,显然是多线程的
[root@localhost apache-rocketmq]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=1031, sysFlag=0, bornTimestamp=1533841514766, bornHost=/192.168.58.129:46816, storeTimestamp=1533841514767, storeHost=/192.168.58.129:10911, msgId=C0A83A8100002A9F00000000000B518A, commitLogOffset=741770, bodyCRC=895437781, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1250, CONSUME_START_TIME=1533841719674, UNIQ_KEY=C0A83A8120207D4991AD2F02D50E007C, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 50, 52], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=1030, sysFlag=0, bornTimestamp=1533841514746, bornHost=/192.168.58.129:46816, storeTimestamp=1533841514747, storeHost=/192.168.58.129:10911, msgId=C0A83A8100002A9F00000000000B4EBA, commitLogOffset=741050, bodyCRC=842174412, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1250, CONSUME_START_TIME=1533841719669, UNIQ_KEY=C0A83A8120207D4991AD2F02D4FA0078, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 50, 48], transactionId='null'}]]
ConsumeMessageThread_20 Receive New Messages: [MessageExt [queueId=0, storeSize=179, queueOffset=1019, sysFlag=0, bornTimestamp=1533841514511, bornHost=/192.168.58.129:46816, storeTimestamp=1533841514515, storeHost=/192.168.58.129:10911, msgId=C0A83A8100002A9F00000000000B2FE2, commitLogOffset=733154, bodyCRC=884882597, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1250, CONSUME_START_TIME=1533841719680, UNIQ_KEY=C0A83A8120207D4991AD2F02D40F004C, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 54], transactionId='null'}]]
...............
关闭服务器 与 常用命令
1、同样都是在 RocketMQ 安装目录下的 distribution/target/apache-rocketmq 目录下执行命令,与启动顺序相反进行关闭,先关闭 broker、在关闭 nameserv
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
2、除了上面启动与关闭几个命令之外,还有如下一些较常用的命令,ip 请以实际为准:
查看集群情况 | ./bin//mqadmin clusterList -n 127.0.0.1:9876 |
查看 broker 状态 | ./bin/mqadmin brokerStatus -n 127.0.0.1:9876 -b 172.20.1.138:10911 |
查看 topic 列表 | ./bin/mqadmin topicList -n 127.0.0.1:9876 |
查看 topic 状态 | ./bin/mqadmin topicStatus -n 127.0.0.1:9876 -t MyTopic (换成想查询的 topic) |
查看 topic 路由 | ./bin/mqadmin topicRoute -n 127.0.0.1:9876 -t MyTopic |