ᕕ( ᐛ )ᕗ Jimyag's Blog

从 0 到 1 实现完整的微服务框架-RocketMQ

在介绍分布式事务的时候我们介绍过 MQ(消息队列),只是简单的提了一下,这篇文章会从 mQ 的基本概念、RocketMQ 的概念,使用 RocketMQ 等几个方面介绍 RocketMQ。

MQ 使用场景

什么是 MQ

消息队列是一种先进先出的数据结构

应用场景

其应用场景主要包含以下三个方面

应用解耦

系统的耦合性越高,容错性就越低。

以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。

使用消息队列解耦合,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统回复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。

流量削峰

应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。

一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样总不能下单体验要好。

出于经济考量目的:业务系统正常时段的 QPS 如果是 1000,流量最高峰是 10000,为了应对流量高峰配置高性能的服务器显然不划算,这时可以使用消息队列对峰值流量削峰。

数据分发

对与 A 系统,其他系统都要 A 系统的服务,在不使用 MQ 时,如果要新增一个系统来拿到 A 系统的信息,这时候就要改 A 系统的代码,让他能和 E 系统通信。如果其中的一个系统不想要 A 系统了,这时候还要删除与它相关的代码。

image-20220404152357287

而引入 MQ 之后,A 系统将自己的产生的数据发送到 MQ 中,新系统想要数据就直接从 MQ 中消费,原来的系统如果不需要这些数据了,就可以不接受。

image-20220404152759967

缺点

缺点包含以下几点:

RocketMQ 的安装

新建配置文件

./data/brokerconf/broker.conf写入以下内容

# 所属集群名字
brokerClusterName=DefaultCluster

# broker 名字注意此处不同的配置文件填写的不一样如果在 broker-a.properties 使用: broker-a,
#  broker-b.properties 使用: broker-b
brokerName=broker-a

# 0 表示 Master> 0 表示 Slave
brokerId=0

# nameServer地址分号分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

# 启动IP,如果 docker  com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句 producer.setVipChannelEnabled(false);解决方式2 brokerIP1 设置宿主机IP不要使用docker 内部IP
brokerIP1=192.168.0.2 #如果没有报错就不用管

# 在发送消息时自动创建服务器不存在的topic默认创建的队列数
defaultTopicQueueNums=4

# 是否允许 Broker 自动创建 Topic建议线下开启线上关闭 !!!这里仔细看是 falsefalsefalse
autoCreateTopicEnable=true

# 是否允许 Broker 自动创建订阅组建议线下开启线上关闭
autoCreateSubscriptionGroup=true

# Broker 对外服务的监听端口
listenPort=10911

# 删除文件时间点默认凌晨4点
deleteWhen=04

# 文件保留时间默认48小时
fileReservedTime=120

# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

# ConsumeQueue 每个文件默认存 30W 根据业务情况调整
mapedFileSizeConsumeQueue=300000

# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存储路径
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消费队列存储
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存储路径
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存储路径
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存储路径
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536

# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000

# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER

# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128

brokerIP1是本机 ip

新建docker-compost.yml

version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    volumes:
      - ./data/logs:/opt/logs
      - ./data/store:/opt/store
    networks:
        rmq:
          aliases:
            - rmqnamesrv

  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./data/logs:/opt/logs
      - ./data/store:/opt/store
      - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
    environment:
        NAMESRV_ADDR: "rmqnamesrv:9876"
        JAVA_OPTS: " -Duser.home=/opt"
        JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker

  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - 8080:8080
    environment:
        JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole

networks:
  rmq:
    name: rmq
    driver: bridge

启动rocketmq

docker-compose up

测试

在 rocket-consle 中发送消息。

浏览器中输入http://localhost:8080

选择Topic->ADD/UPDATE->选择默认的 Cluster 和 BrokerName,输入自己的 topic name–test->commit。右上角出现 Success 就是 topic 已经建立成功。

找到自己新建的 topic 名称,->SEND MAGGAGE->设置 tag、key 和内容->commit

在 Message 中通过 topic 筛选消息,可以看到已经发布的消息了。

RocketMQ 的基本概念

image-20220404170309043

Producer:消息的发送者,消息的生产者。举例发信人 Consumer: 消息接收者,消息的消费者。举例:收信者 Broker: 暂存和传输消息。如果要拿具体的信息首先要从 nameserver 中拿具体的 broker 信息,再到具体的 broker 中获取信息;举例:邮局 NameServer: 管理 Broker,类似于注册中心,同步存数据的各个结点,但是这个集群不用做数据同步。原因:??; 举例:各个邮局的管理机构 Topic: 区分消息的种类;一个发送者可以发送消息给一 个或者多个 Topic;一 - 个消息的接收者可以订阅一个或者多个 Topic 消息。类似你可以把你写的文章放到不同的平台,看文章的人可以从多个平台看文章 Message Queue:相当于是 Topic 的分区;用于并行发送和接收消息

RocketMQ 的消息类型

按照发送的特点分

同步发送

a.同步发送,线程阻塞,投递 completes 阻塞结束 I b.如果发送失败,会在默认的超时时间 3 秒内进行重试,最多重试 2 次 c.投递 completes 不代表投递成功,要 check SendResult.sendStatus 来判断是否投递成功 d. SendResult 里面有发送状态的枚举:SendStatus, 同步的消息投递有一个状态返回值的

public enum SendStatus {
	SEND_ OK; // 成功
	FLUSH_ DISK_ TIMEOUT;
    FLUSH_ SLAVE TIMEOUT;
	SLAVE_ NOT_ AVAILABLE;
}

e. retry 的实现原理:只有 ack 的 SendStatus=SEND_ OK 才会停止 retry 注意事项:发送同步消息且 Ack 为 SEND. OK,只代表该消息成功的写入了 MQ 当中,并不代表该消息成功的被 Consumer 消费了。

异步发送

a.异步调用的话,当前线程一定要等待异步线程回调结束再关闭 producer 啊,因为是异步的,不会阻塞提前关闭 producer 会导致未回调链接就断开了。 b.异步消息不 retry,投递失败回调 onException() 方法,只有同步消息才会 retry。 C.异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

单向发送

a.消息不可靠,性能高,只负责往服务器发送一条消息,不会重试也不关心是否发送成功 b.此方式发送消息的过程耗时非常短,一般在微秒级别

下表概括了三者的特点和主要区别

发送方式 发送 TPS(性能测试指标) 发送结果反馈 可靠性
同步发送 不丢失
异步发送 不丢失
单向发送 最快 没有 可能会丢失

按照使用功能特点分

普通消息 (订阅)

普通消息是我们在业务开发中用到的最多的消息类型,生产者需要关注消息发送成功即可,消费者消费到消息即可,不需要保证消息的顺序,所以消息可以大规模并发地发送和消费,吞吐量很高,适合大部分场景。不能够保证顺序。

image-20220404192826869

顺序消息

顺序消息分为分区顺序消息和全局顺序消息。

全局顺序消息比较容易理解,也就是哪条消息先进入,哪条消息就会先被消费,符合我们的 FIFO。

很多时候全局消息的实现代价很大,所以就出现了分区顺序消息。

我们通过对消息的 key,进行 hash,相同 hash 的消息会被分配到同一个分区里面,当然如果要做全局顺序消息,我们的分区只需要 - 一个即可,所以全局顺序消息的代价是比较大的。

延时消息

主要用来订单超时库存归还。

延迟的机制是在服务端实现的,也就是 Broker 收到了消息,但是经过一段时间以后才发送服务器按照 1-N 定义了如下级别:“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”。

若要发送定时消息,在应用层初始化 Message 消息对象之后,调用 Message.setDelayTimeLevel(int level) 方法来设置延迟级别,按照序列取相应的延迟级别,例如 level=2,则延迟为 5s。

msg.setDelayLevel(2);
SendResult sendResult = producer.send(msg)

实现原理: a.发送消息的时候如果消息设置了DelayTimeLevel,那么该消息会被丢到ScheduleMessageService.SCHEDULE_ TOPIC这个 Topic 里面 b.根据DelayTimeLevel选择对应的 queue C.再把真实的 topic 和 queue 信息封装起来,set 到 msg 里面 d.然后每个SCHEDULE TOPIC_ XXX的每个DelayTimeLevelQueue,有定时任务去刷新,是否有待投递的消息 e.每 10s 定时持久化发送进度

事务消息

消息队列 RocketMQ 版提供的分布式事务消息适用于所有对数据最终一致性有 强需求的场景。

概念介绍

事务消息:消息队列 RocketMQ 版提供类似 X 或 Open XA 的分布式事务功能,通过消息队列 RocketMQ 版事务消息能达到分布式事务的最终一致。

半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列 RocketMQ 版服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。

消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态 (Commit 或是 Rollback),该询问过程即消息回查。

分布式事务消息的优势

消息队列 RocketMQ 版分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。

Go 操作 RocketMQ

RocketMQ 发送普通消息

package main

import (
	"context"
	"fmt"
	"os"
	"strconv"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

// Package main implements a simple producer to send message.
func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.2:9876"})),
		producer.WithRetry(2),
	)

	if err != nil {
		fmt.Printf("new producer error: %s", err.Error())
		os.Exit(1)
	}
	err = p.Start()
	if err != nil {
		fmt.Printf("start producer error: %s", err.Error())
		os.Exit(1)
	}
	topic := "hellomq"

	for i := 0; i < 10; i++ {
		msg := &primitive.Message{
			Topic: topic,
			Body:  []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
		}
		res, err := p.SendSync(context.Background(), msg)

		if err != nil {
			fmt.Printf("send message error: %s\n", err)
		} else {
			fmt.Printf("send message success: result=%s\n", res.String())
		}
	}
	err = p.Shutdown()
	if err != nil {
		fmt.Printf("shutdown producer error: %s", err.Error())
	}
}

RocketMQ 消费消息

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	// 服务器有数据会推给回调
	c, _ := rocketmq.NewPushConsumer(
		// GroupName 多个实例负载均衡
		consumer.WithGroupName("testGroup"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.2:9876"})),
	)
	err := c.Subscribe("hellomq", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range msgs {
			fmt.Printf("subscribe callback: %v \n", msgs[i])
		}

		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}
	// Note: start after subscribe
	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}
	time.Sleep(time.Hour)
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("shutdown Consumer error: %s", err.Error())
	}
}

RocketMQ 发送延迟消息

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
	p, _ := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
		producer.WithRetry(2),
	)
	err := p.Start()
	if err != nil {
		fmt.Printf("start producer error: %s", err.Error())
		os.Exit(1)
	}
	for i := 0; i < 10; i++ {
		msg := primitive.NewMessage("hellomq", []byte("Hello RocketMQ Go Client!"))
		msg.WithDelayTimeLevel(2) // 设置延迟的级别
		res, err := p.SendSync(context.Background(), msg)

		if err != nil {
			fmt.Printf("send message error: %s\n", err)
		} else {
			fmt.Printf("send message success: result=%s\n", res.String())
		}
	}
	err = p.Shutdown()
	if err != nil {
		fmt.Printf("shutdown producer error: %s", err.Error())
	}

}

我们在下单之后,超时就要取消库存 定时执行逻辑 轮询的问题是多久执行一次,这里我们假设超时的时间是半个小时。 程序在 12:00 执行了一次,我在 12.01 下单了,12:30 又执行,下次执行就是 13:00,本来 12:31 就超时了,该回收了,但却等到了 13:00 才超时,在这 29 分钟内这是库存就不能被别人买。 如果一分钟轮询一次,无用功太多了。 我们使用延迟消息时间一到就执行,消息中包含了订单编号,只用查询这种编号就行了,不用轮询也减少数据库的压力。

RocketMQ 事务消息

package main

import (
   "context"
   "fmt"
   "os"
   "strconv"
   "time"

   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/primitive"
   "github.com/apache/rocketmq-client-go/v2/producer"
)

type DemoListener struct {
}

func NewDemoListener() *DemoListener {
   return &DemoListener{}
}

func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
   fmt.Println("开始执行本地逻辑")
   time.Sleep(time.Second * 3)
   fmt.Println("执行本地逻辑成功")
   //return primitive.CommitMessageState
   //return primitive.RollbackMessageState
   // 本地执行逻辑无缘无故失败 代码异常、宕机
   return primitive.UnknowState
}

func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
   fmt.Println("rocketMQ 的消息回查")
   time.Sleep(time.Second * 4)
   return primitive.CommitMessageState
}

func main() {
   p, _ := rocketmq.NewTransactionProducer(
      NewDemoListener(),
      producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.2:9876"})),
      producer.WithRetry(1),
   )
   err := p.Start()
   if err != nil {
      fmt.Printf("start producer error: %s\n", err.Error())
      os.Exit(1)
   }

   for i := 0; i < 1; i++ {
      res, err := p.SendMessageInTransaction(context.Background(),
         primitive.NewMessage("transaction", []byte("Hello RocketMQ again "+strconv.Itoa(i))))

      if err != nil {
         fmt.Printf("send message error: %s\n", err)
      } else {
         fmt.Printf("send message success: result=%s\n", res.String())
      }
   }
   // 回查
   time.Sleep(5 * time.Minute)
   err = p.Shutdown()
   if err != nil {
      fmt.Printf("shutdown producer error: %s", err.Error())
   }
}

参考

docker-compose 安装 RocketMQ

rocketmq-client-go/examples at master · apache/rocketmq-client-go (github.com)

#微服务 #分布式 #MQ #RocketMQ