Micro如何接收web端的发布实现后台订阅?

本文已被阅读过 Posted by 陌无崖 on 2019-08-16

导语

在web开发中有一种情况,我们或许希望在发送http请求的同时,后台服务订阅了该http请求,并对消息作出相应的处理,该怎么做呢?我们之前学过broker模式,这种模式可以在两个后台服务进行消息的发布和订阅,其实我们仍然可以利用这一点。

客户端

定义一个主题发布消息

我们定义了一个字符串,用topic变量进行接收。

1
2
3
var (
topic = "go.micro.web.topic.hi"
)

发布消息

使用broker.Message存储我们消息,同时使用broker.Publish()发布我们的消息,等待订阅者接收消息。这里需要注意的一个函数time.Now().String()代表获取当前时间并转换成字符串。

1
2
3
4
5
6
7
8
9
10
11
12
13
func pub(name string) {
msg := &broker.Message{
Header: map[string]string{
"name": fmt.Sprintf("%s", name),
},
Body: []byte(fmt.Sprintf("%s:%s", name, time.Now().String())),
}
if err := broker.Publish(topic, msg); err != nil {
log.Logf("[pub] 发布消息:%s", err)
} else {
log.Logf("[pub] 发布消息: %s", string(msg.Body))
}
}

定义Handler

因为我们将要使用web端作为客户端发送请求,因此需要一个handler接收我们的请求,并做出响应。首先我们添加头信息,解析我们的参数,并把它存储在response变量中,该变量的类型为map[string]interface{},函数体中的time.Now().UnixNano()代表获取当前时间戳,单位为纳秒。另外还需要注意的是json.NewEncoder(w)将会创建一个将数据写入w的*Encoder。在之前我们经常将数据转换成[]byte类型,并使用w.write(body)进行写入w,这种比较麻烦,现在可以使用这种方式就可以将json数据写入w中,而Encode函数将会对参数进行json编码并同时写入我们之前创建的*Encoder,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 定义一个handler
func hi(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json;charset=utf-8")
_ = r.ParseForm()
// 返回结果
response := map[string]interface{}{
"ref": time.Now().UnixNano(),
"data": "Hello!" + r.Form.Get("name"),
}
//返回json结构
// NewEncoder创建一个将数据写入w的*Encoder。
// Encode将v的json编码写入输出流,并会写入一个换行符,
if err := json.NewEncoder(w).Encode(response); err != nil {
http.Error(w, err.Error(), 500)
return
}

pub(r.Form.Get("name"))
}

创建发布服务

函数体中告诉我们该服务将会监听8088的端口

1
2
3
4
5
6
7
8
9
10
11
12
13
// 创建一个服务
service := web.NewService(
web.Name("go.micro.book.web.pub"),
web.Version("latest"),
web.Address(":8088"),
)
// 初始化我们的服务
_ = service.Init()
service.HandleFunc("/hi", hi)
// 运行服务
if err := service.Run(); err != nil {
log.Fatal(err)
}

订阅

订阅主题

同样我们也需要声明我们将要订阅什么主题,与客户端保持一致即可

1
2
3
var (
topic = "go.micro.web.topic.hi"
)

订阅消息

首先需要实例化一个broker,并为broker设置一个监听地址,然后我们使用Subscribe函数实现我们的订阅

1
2
3
4
5
6
7
bk := broker.NewBroker(
broker.Addrs(fmt.Sprintf("%s:%d", "192.168.10.150", 11089)),
)
_, err := bk.Subscribe(topic, func(p broker.Event) error {
log.Logf("[sub]:Received Body: %s,Header:%s", string(p.Message().Body), p.Message().Header)
return nil
})

因为这仍然是一个服务,我们同样需要进行创建服务

1
2
3
4
5
6
7
8
s := micro.NewService(
micro.Name("go.micro.book.web.sub"),
micro.Version("latest"),
micro.Address(":8099"),
micro.Broker(bk),
)
s.Init()
_ = s.Run()

测试

启动

发送http请求

控制台打印




非常感谢你保持着耐心读完这篇文章,我是陌无崖,一个专注于Golang后端开发的互联网从业人员,熟悉RabbitMQ,Docker,微服务等,获取更多知识分享,文章末尾扫码关注,每日推送,准时获取更多分享。

本文代码参考:

micro教程系列:https://github.com/micro-in-cn/tutorials/blob/

推荐阅读


本文欢迎转载,转载请联系作者,谢谢!


打开微信扫一扫,关注微信公众号