1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
|
type OrderListener struct{
Code codes.Code
Detail string
ID int32
OrderAmount float32
}
func NewOrderListener() *OrderListener {
return &OrderListener{}
}
func (dl *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
var orderInfo model.OrderInfo
err:=json.Unmarshal(msg.Body,&orderInfo)
if err!=nil{
// 由于还没有执行,所以应该将之前的回宫
return primitive.RollbackMessageState
}
// 没有选中商品
if xxxx{
dl.Code = code.InvalidArgument
dl.Detail = "没有选中的商品"
// 没有执行sell之前都要回滚
return primitive.RollbackMessageState
}
// 跨服务调用商品微服务
if xxx{
dl.Code = code.Internal
dl.Detail = "批量查询商品信息是啊比"
// 没有执行sell之前都要回滚
return primitive.RollbackMessageState
}
//跨微服务调用库存微服务,
if xxx{
// 如果因为网络问题,如何避免误判
// sell 返回的状态码 是不是sell中列举出来的状态码就是网络的问题
// todo
dl.Code = code.Internal
dl.Detail = "库存扣减失败"
// 没有执行sell之前都要回滚
return primitive.RollbackMessageState
}
// 生成订单表
if xxx{
dl.Code = code.Internal
dl.Detail = "库存扣减失败"
// 订单创建失败就要归还库存
return primitive.CommitMessageState
}
// 批量插入订单物品的信息
if xxx{
dl.Code = code.Internal
dl.Detail = "插入订单信息失败"
// 就要归还了、
// 订单创建失败就要归还库存
return primitive.CommitMessageState
}
// 发送延时消息
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
msg := primitive.NewMessage("order_timeout", msg.Body)
msg.WithDelayTimeLevel(5) // 设置延迟的级别
res, err := p.SendSync(context.Background(), msg)
if err != nil {
fmt.Printf("send message error: %s\n", err)
// rollback
dl.Code = code.Internal
dl.Detail = "发送延时消息失败"
return primitive.RollbackMessageState
}
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
// 本地事务提交 commit
dl.Code = code.OK
return primitive.RollbackMessageState
}
func (dl *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
fmt.Println("rocketMQ 的消息回查")
var orderInfo model.OrderInfo
err:=json.Unmarshal(msg.Body,&orderInfo)
if err!=nil{
// 由于还没有执行,所以应该将之前的回宫
return primitive.RollbackMessageState
}
// 查询订单是否存在
if xxx{
// 如果订单找不到
// 本地事务执行失败了,就要归还库存
return primitive.CommitMessageState
}
time.Sleep(time.Second * 4)
return primitive.RollbackMessageState
}
....
func (o *OrderServer)CreateOrder(ctx context.context req *proto.OrderRequest)(*proto.OrderInfoResponse ,error){
orderListener:=NewOrderListener()
// 运行完就能拿到orderListener中的信息
p, err := rocketmq.NewTransactionProducer(
orderListener,
// 先写死
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.2:9876"})),
producer.WithRetry(1),
)
if err != nil {
global.Logger.Errorf("生成producer失败: %s\n", err.Error())
return nil,err
}
err = p.Start()
if err != nil {
global.Logger.Errorf("启动 producer 失败: %s\n", err.Error())
return nil,err
}
order:=model.OrderInfo{
OrderID:GenerateOrderID(req,UserID)
Address:...
....
}
jsonString,err:= json.Marshal(order)
if err != nil {
global.Logger.Errorf("序列化失败: %s\n", err.Error())
return nil,err
}
res, err := p.SendMessageInTransaction(
context.Background(),
primitive.NewMessage("order_reback", jsonString)
)
if err != nil {
global.Logger.Errorf("序列化失败: %s\n", err.Error())
return nil,status.Error(codes.Internal,"消息发送失败")
}
if orderListener.Code!=codes.OK{
return nil,status.Error(orderListener.Code,"新建订单失败")
}
return &proto.OrderInfoResponse{...},ni;
// 回查
time.Sleep(5 * time.Minute)
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}
|