rocketmq安装
以安装目前rocket最新版本4.9.4为例,我的安装目录是:/var/www/data/rocketMq,进入该目录执行如下命令,下载二进制版本:
curl -O https://dlcdn.apache.org/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip
解压:
unzip rocketmq-all-4.9.4-bin-release.zip
目录重命名:
mv rocketmq-all-4.9.4-bin-release rocketmq-4.9.4
在rocketmq-4.9.4目录下创建日志目录:
cd rocketmq-4.9.4
mkdir logs
在rocketmq-4.9.4目录下创建存储目录:
mkdir store
进入到存储目录store,创建commitlog、consumequeue、index目录
cd store
mkdir commitlog
mkdir consumequeue
mkdir index
我们以单点配置模式为例,进入到如下目录:
/var/www/data/rocketMq/rocketmq-4.9.4/conf/2m-2s-async
目录下的文件说明:
broker-a.properties :单结点a配置文件
broker-a-s.properties :单结点a的从结点配置文件
broker-b.properties :单结点b配置文件
broker-b-s.properties :单结点b的从结点配置文件
修改broker-a.properties的内容如下:
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerId=0
#nameServer地址,分号分割
namesrvAddr=127.0.0.1:9876;
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
brokerIP1=127.0.0.1
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/var/www/data/rocketMq/rocketmq-4.9.4/store
#commitLog 存储路径
storePathCommitLog=/var/www/data/rocketMq/rocketmq-4.9.4/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/var/www/data/rocketMq/rocketmq-4.9.4/store/consumequeue
#消息索引存储路径
storePathIndex=/var/www/data/rocketMq/rocketmq-4.9.4/store/index
#checkpoint 文件存储路径
storeCheckpoint=/var/www/data/rocketMq/rocketmq-4.9.4/store/checkpoint
#abort 文件存储路径
abortFile=/var/www/data/rocketMq/rocketmq-4.9.4/store/abort
#限制的消息大小
maxMessageSize=65536
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
进入conf 目录,替换所有xml中的${user.home},保证日志路径正确
sed -i ‘s#原字符串#新字符串#g’ 替换的文件
/var/www/data/rocketMq/rocketmq-4.9.4
修改 runbroker.sh,调整内存大小(这里只是演示用途,实际的内存大小要以生产环境为准)
找到 “JAVA_OPT=”${JAVA_OPT} -server -Xms8g -Xmx8g”,调整为512m
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
修改 修改 runserver.sh,调整内存大小(这里只是演示用途,实际的内存大小要以生产环境为准)
找到 “JAVA_OPT=”${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m””,修改为
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
启动mqnamesrv
./mqnamesrv
启动mqbroker
./mqbroker -c /var/www/data/rocketMq/rocketmq-4.9.4/conf/2m-2s-async/broker-a.properties
由于是前台运行的模式启动,关闭mqnamesrv和mqbroker直接前台中断执行即可。若有后台启动和关闭可请用nohup命令。
nohup mqnamesrv >/dev/null 2>&1 &
nohup mqbroker -c /var/www/data/rocketMq/rocketmq-4.9.4/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &
参考:
https://mp.weixin.qq.com/s/ROyOh-SVBnssrqrG8FRGxw
Spring Boot快速集成RocketMQ
本文使用的是目前最新版本2.2.2,核心依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
生产者模块pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.youyijiu</groupId>
<version>0.0.1-SNAPSHOT</version>
<artifactId>youyijiu-test-mq-producer</artifactId>
<!-- 基于spring boot 2.6.4 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.4</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<skipTests>true</skipTests>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- rocketMq集成 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
</dependencies>
<!-- Package as an executable jar -->
<build>
<plugins>
<!-- Compiler 插件, 设定JDK版本 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<!--配置生成源码包 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
生产者模块application.properties内容:
##web端口
server.port=8080
rocketmq.name-server=localhost:9876
rocketmq.producer.group=my-group1
rocketmq.producer.sendMessageTimeout=300000
测试的MQ用到的常量
/**
* @Title: MqTopicConstant.java
* @Package com.youyijiu.mq.consumer
* @author Jun.Yang 24696026@qq.com
* @date 2022年8月13日 下午2:42:41
*/
package com.youyijiu.mq.producer;
/**
* @ClassName: MqTopicConstant
* @Description: TODO(这里用一句话描述这个类的作用)
* @author Jun.Yang 24696026@qq.com
* @date 2022年8月13日 下午2:42:41
*
*/
public class MqTopicConstant {
/**
* 示例消息队列,test-top-youyijiu-topic个
*/
public static final String DEMO_TOPIC = "test-top-youyijiu-topic";
/**
* 注册tag
*/
public static final String DEMO_TAG_REGISTERED = "registered";
/**
* 修改tag
*/
public static final String DEMO_TAG_MODIFY = "modify";
/**
* consumer group
*/
public static final String DEMO_CONSUMER_GROUP_REGISTERED = "test-top-youyijiu-group_registered";
public static final String DEMO_CONSUMER_GROUP_MODIFY = "test-top-youyijiu-group_modify";
}
生产者发送测试
/**
* @Title: TestApp.java
* @Package com.youyijiu.mq
* @author Jun.Yang 24696026@qq.com
* @date 2022年8月13日 下午2:23:30
*/
package com.youyijiu.mq;
import javax.annotation.Resource;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import com.youyijiu.mq.producer.MqTopicConstant;
/**
* @ClassName: TestApp
* @Description: TODO(这里用一句话描述这个类的作用)
* @author Jun.Yang 24696026@qq.com
* @date 2022年8月13日 下午2:23:30
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class TestApp {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Test
public void send(){
rocketMQTemplate.send(MqTopicConstant.DEMO_TOPIC + ":" + MqTopicConstant.DEMO_TAG_REGISTERED,
MessageBuilder.withPayload("Hello, World! I'm from youyijiu-test-mq-producer 1 ").build());
rocketMQTemplate.send(MqTopicConstant.DEMO_TOPIC + ":" + MqTopicConstant.DEMO_TAG_REGISTERED,
MessageBuilder.withPayload("Hello, World! I'm from youyijiu-test-mq-producer 2 ").build());
rocketMQTemplate.send(MqTopicConstant.DEMO_TOPIC + ":" + MqTopicConstant.DEMO_TAG_REGISTERED,
MessageBuilder.withPayload("Hello, World! I'm from youyijiu-test-mq-producer 3 ").build());
}
}
消费者消费
/**
* @Title: MqRegisteredListener.java
* @Package com.youyijiu.mq.consumer
* @author Jun.Yang 24696026@qq.com
* @date 2022年8月13日 下午2:34:03
*/
package com.youyijiu.mq.consumer;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
* @ClassName: MqRegisteredListener
* @Description: TODO(这里用一句话描述这个类的作用)
* @author Jun.Yang 24696026@qq.com
* @date 2022年8月13日 下午2:34:03
*
*/
@Service
@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC,
consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_REGISTERED,
selectorExpression = MqTopicConstant.DEMO_TAG_REGISTERED)
public class MqRegisteredListener implements RocketMQListener<String> {
private static final Logger log = LoggerFactory
.getLogger(MqRegisteredListener.class);
/*
* (non-Javadoc)
*
* @see
* org.apache.rocketmq.spring.core.RocketMQListener#onMessage(java.lang.
* Object)
*/
@Override
public void onMessage(String arg0) {
log.info("received registered message: {}", arg0);
}
}
上述的消费者和生产者只是简单测试下安装rocketMq是否可用,spring boot引入rocketMq变得非常方便。
参考:
Spring Boot快速集成RocketMQ实战教程
https://mp.weixin.qq.com/s/3LQLDT_X6q0EoFdjZVIaIg