首页 » Web前端 » natsphp技巧_goim 文章系列 2从goim定制浅谈 go interface 解耦合与gRPC

natsphp技巧_goim 文章系列 2从goim定制浅谈 go interface 解耦合与gRPC

访客 2024-11-20 0

扫一扫用手机浏览

文章目录 [+]

github 上的 issue 在这里github.com/Terry-Mao/g…

简要解释一下 golang 的 interface: 在 吴德宝AllenWu 文章Golang interface接口深入理解 中这样写到:

natsphp技巧_goim 文章系列  2从goim定制浅谈 go interface 解耦合与gRPC

为什么要用接口呢?在Gopher China 上的分享中,有大神给出了下面的情由:

natsphp技巧_goim 文章系列  2从goim定制浅谈 go interface 解耦合与gRPC
(图片来自网络侵删)

writing generic algorithm (类似泛型编程)

hiding implementation detail (隐蔽详细实现)

providing interception points (供应拦截点-----> 也可称叫供应 HOOKS , 一个插入其他业务逻辑的钩子)

换个办法说, interface 便是 de-couple 解耦合在 golang 中的履行, 这是当代编程中比较主要的"分层, 解耦合" 架构设计方法

在QQ群"golang中国" 中, 有关于 de-couple 解耦合的话题中, 闪侠这样说到:

这里, 就来看看 interface 如何实现 goim 从 kafka 转到 NATS

1. goim 中的 kafka

看图, 不说话, 哈哈

上图中,

在 logic 这个网元中, 有 logic 向 kafka 的发布在 job 网元中, job 从 kafka 订阅, 再赂 comet 网元分发

那我们的目标很大略了, 换了!!! ----------> 等等.......能保留原有 kafka 实现不? 在必要时, 可以利用开关项, 切换 nats 或 kafka ??

当然......可以!

2. Don't talk, show me the code!!

下面就比较大略, 看码

2.1 发布接口第一步, 阅读原代码

先看源代码( 把稳下面代码中的注释)

代码在 github.com/Terry-Mao/g… 大约第33行

// PushMids push a message by mid.func (l Logic) PushMids(c context.Context, op int32, mids []int64, msg []byte) (err error) {keyServers, _, err := l.dao.KeysByMids(c, mids)if err != nil {return}keys := make(map[string][]string)for key, server := range keyServers {if key == "" || server == "" {log.Warningf("push key:%s server:%s is empty", key, server)continue}keys[server] = append(keys[server], key)}for server, keys := range keys { // // 紧张向 kafka 发送, 是下面这一行 // l.dao.PushMsg(c, op, server, keys, msg) // 方法名是 PushMsg //if err = l.dao.PushMsg(c, op, server, keys, msg); err != nil {return}}return}复制代码

再看一下 dao 是什么:

代码在 github.com/Terry-Mao/g… 大约第20行

// Logic structtype Logic struct {c conf.Configdis naming.Discovery////// 下面这个 dao.Dao 供应了 PushMsg 方法// 带个星, 这是个引用////dao dao.Dao// onlinetotalIPs int64totalConns int64roomCount map[string]int32// load balancernodes []naming.InstanceloadBalancer LoadBalancerregions map[string]string // province -> region}复制代码

末了, 重点来了, 查到 dao 源头实现

下面是我们须要扩展的地方, 在 github.com/Terry-Mao/g…中 dao, 这名称很 java (DAO-------> Data Access Objects 数据存取工具), 这里也解释了 bilibili 们在代码纺织上, 挺规范

代码在 github.com/Terry-Mao/g… 大约第10行开始

// Dao dao.type Dao struct {c conf.Config//// // 下面这个 kafkaPub 很清楚, 是 kafka 的同步发布者 kafka.SyncProducer// // 这个是我们要换成 interface 的地方//// //kafkaPub kafka.SyncProducerredis redis.PoolredisExpire int32}// New new a dao and return.func New(c conf.Config) Dao {d := &Dao{c: c,// // // 下面这个 newKafkaPub(c.Kafka) 即是初始化 kafka // 也便是连接上 kafka // 下面, 我们先改写一下这个函数, 变通一下代码形式 // // //kafkaPub: newKafkaPub(c.Kafka),redis: newRedis(c.Redis),redisExpire: int32(time.Duration(c.Redis.Expire) / time.Second),}return d}// 这是连接 kafka 的初化函数( function ) // func newKafkaPub(c conf.Kafka) kafka.SyncProducer {kc := kafka.NewConfig()kc.Producer.RequiredAcks = kafka.WaitForAll // Wait for all in-sync replicas to ack the messagekc.Producer.Retry.Max = 10 // Retry up to 10 times to produce the messagekc.Producer.Return.Successes = truepub, err := kafka.NewSyncProducer(c.Brokers, kc)if err != nil {panic(err)}return pub}复制代码

这里, 先小改一下 func New(c conf.Config) Dao 这个函数 改成如下代码形式

// New new a dao and return.func New(c conf.Config) Dao {d := &Dao{c: c,//// // 把稳, 下面这行被移出去 // kafkaPub: newKafkaPub(c.Kafka), // //redis: newRedis(c.Redis),redisExpire: int32(time.Duration(c.Redis.Expire) / time.Second),}// // 变成这样了, 功能没变革 //d.kafkaPub = newKafkaPub(c.Kafka)return d}复制代码2.2 发布接口第二步, 检讨一下哪个方法( method )须要被 interface 实现

还是看源代码

代码在 github.com/Terry-Mao/g… 大约第13行开始

// PushMsg push a message to databus.func (d Dao) PushMsg(c context.Context, op int32, server string, keys []string, msg []byte) (err error) {pushMsg := &pb.PushMsg{Type: pb.PushMsg_PUSH,Operation: op,Server: server,Keys: keys,Msg: msg,}b, err := proto.Marshal(pushMsg)if err != nil {return}//// //// 实际发布, 便是下面这个几行语句// 1. 组织一下须要发送的信息, 以 kafka 的发布接口哀求的形式// 2. 考试测验发布信息, 处理发布信息可能的缺点//// 重点把稳下面这几行, 后面会改掉// 重点把稳下面这几行, 后面会改掉// 重点把稳下面这几行, 后面会改掉//// //m := &sarama.ProducerMessage{Key: sarama.StringEncoder(keys[0]),Topic: d.c.Kafka.Topic,Value: sarama.ByteEncoder(b),}if _, _, err = d.kafkaPub.SendMessage(m); err != nil {log.Errorf("PushMsg.send(push pushMsg:%v) error(%v)", pushMsg, err)}return}// BroadcastRoomMsg push a message to databus.func (d Dao) BroadcastRoomMsg(c context.Context, op int32, room string, msg []byte) (err error) {pushMsg := &pb.PushMsg{Type: pb.PushMsg_ROOM,Operation: op,Room: room,Msg: msg,}b, err := proto.Marshal(pushMsg)if err != nil {return}m := &sarama.ProducerMessage{Key: sarama.StringEncoder(room),Topic: d.c.Kafka.Topic,Value: sarama.ByteEncoder(b),}//// // 实际发布, 便是下面这个语句// //if _, _, err = d.kafkaPub.SendMessage(m); err != nil {log.Errorf("PushMsg.send(broadcast_room pushMsg:%v) error(%v)", pushMsg, err)}return}复制代码2.3 换用 interface 实现这个 SendMessage(m) 方法( method )

先上代码, 代码会说话( golang 大略就在这里, 代码会说话 ) , 后加解释

// PushMsg interface for kafka / nats // 这里是新加的 interface 定义 type PushMsg interface {PublishMessage(topic, ackInbox string, key string, msg []byte) error // 这里小改了个方法名!!! 把稳Close() error}// Dao dao.type Dao struct {c conf.Configpush PushMsg // 看这里 redis redis.PoolredisExpire int32}// New new a dao and return.func New(c conf.Config) Dao {d := &Dao{c: c,redis: newRedis(c.Redis),redisExpire: int32(time.Duration(c.Redis.Expire) / time.Second),}if c.UseNats { // 在配置中加一个 bool 布尔值的开关项 d.push = NewNats(c) // 这里支持 nats } else {d.push = NewKafka(c) //// 这里是原来的 kafka }return d}复制代码

kafka 实现 interface 接口的代码

// Dao dao.type kafkaDao struct {c conf.Configpush kafka.SyncProducer}// New new a dao and return.func NewKafka(c conf.Config) kafkaDao {d := &kafkaDao{c: c,push: newKafkaPub(c.Kafka),}return d}// PublishMessage push message to kafkafunc (d kafkaDao) PublishMessage(topic, ackInbox string, key string, value []byte) error {m := &kafka.ProducerMessage{Key: sarama.StringEncoder(key),Topic: d.c.Kafka.Topic,Value: sarama.ByteEncoder(value),}_, _, err := d.push.SendMessage(m)return err}复制代码

nats 对 interface 的实现

// natsDao dao for natstype natsDao struct {c conf.Configpush nats.Conn}// New new a dao and return.func NewNats(c conf.Config) natsDao {conn, err := newNatsClient(c.Nats.Brokers, c.Nats.Topic, c.Nats.TopicID)if err != nil {return nil}d := &natsDao{c: c,push: conn,}return d}// PublishMessage push message to natsfunc (d natsDao) PublishMessage(topic, ackInbox string, key string, value []byte) error {if d.push == nil {return errors.New("nats error")}msg := &nats.Msg{Subject: topic, Reply: ackInbox, Data: value}return d.push.PublishMsg(msg)}复制代码

末了, 调用 interface 的变更

// PushMsg push a message to databus.func (d Dao) PushMsg(c context.Context, op int32, server string, keys []string, msg []byte) (err error) {pushMsg := &pb.PushMsg{Type: pb.PushMsg_PUSH,Operation: op,Server: server,Keys: keys,Msg: msg,}b, err := proto.Marshal(pushMsg)if err != nil {return}//// //// 实际发布, 便是下面这个几行语句// 1. 组织一下须要发送的信息, 以 kafka 的发布接口哀求的形式// 2. 考试测验发布信息, 处理发布信息可能的缺点//// 重点把稳下面这几行, 实际变动// 重点把稳下面这几行, 实际变动// 重点把稳下面这几行, 实际变动//// if err = d.push.PublishMessage(d.c.Kafka.Topic, d.c.Nats.AckInbox, keys[0], b); err != nil {log.Errorf("PushMsg.send(push pushMsg:%v) error(%v)", pushMsg, err)}return}复制代码

OK, 修正完成

2.4 小结2.4.1 接口定义 (带命名的方法凑集)

简明来说, interface 接口定义一下名称, 再定义接口中要实现的方法 method ( 方法凑集 )

// PushMsg interface for kafka / nats // 这里是新加的 interface 定义 type PushMsg interface {PublishMessage(topic, ackInbox string, key string, msg []byte) error // 这里小改了个方法名!!! 把稳Close() error}// Dao dao.type Dao struct {c conf.Configpush PushMsg // 看这里 redis redis.PoolredisExpire int32}复制代码

上面 定义了 PushMsg 这个interface , 这是一个 方法( method)凑集

2.4.2 方法定义与实现方法名 , 比如 PublishMessageinput 数据, 便是这些 topic, ackInbox string, key string, msg []byte, 分别是

topic 这是 kafka 或 nats 里的主题, 也便是 pub/sub 发布/订阅的频道 ackInbox 这是 publish 发布的 confirm 确认频道 key 体( payload ) 的键 msg 这是体 payload

ouput 数据, 这里是 error , 标示 PublishMessage 方法( method ) 的输出

这便是一个接口定义, 方法名/ 输入/ 输出, 至于方法的详细实现, 交由下面的实体去实现( 可以看 kafka / nats 等分别对应的 PublishMessage 的方法实现)

2.4.3 接口实例化, 以便后面方法调用

很清楚, 方法是由详细实现来完成, 下面便是实例化方法

是用哪一个详细实现呢, 就看实例化哪一个了, interface 终极落地, 就在这里

if c.UseNats { // 在配置中加一个 bool 布尔值的开关项 d.push = NewNats(c) // 这里支持 nats } else {d.push = NewKafka(c) //// 这里是原来的 kafka }复制代码

而在 func (d Dao) PushMsg(c context.Context, op int32, server string, keys []string, msg []byte) (err error) 中, 则大略调用 interface 定义的方法

2.4.4 接口方法调用

与其他方法 method 或函数 function 是一样的, 没什么特殊的

// if err = d.push.PublishMessage(d.c.Kafka.Topic, d.c.Nats.AckInbox, keys[0], b); err != nil {log.Errorf("PushMsg.send(push pushMsg:%v) error(%v)", pushMsg, err)}复制代码3. 浅谈 golang 的 interface --> 解耦合!!

再一次回看,

在 吴德宝AllenWu 文章Golang interface接口深入理解 中这样写到:

为什么要用接口呢?在Gopher China 上的分享中,有大神给出了下面的情由:

writing generic algorithm (类似泛型编程)

hiding implementation detail (隐蔽详细实现)

providing interception points (供应拦截点-----> 也可称叫供应 HOOKS , 一个插入其他业务逻辑的钩子)

interface 确是隐蔽了详细实现, 能让我们很随意马虎的把 goim 对 kafka 的依赖, 切换到 nats , 并且通过一个开关项, 来确定利用哪一个详细实现

扩展一下, 这个 interface 也可以实现从 kafka 切换到 rabbitMQ / activeMQ / redis (pub/sub) .... 只要大略实现 PushMsg 这个 interface 就好啦

4. 源代码及其他补充

另有 goim 在 job 网元上的 subscribe 订阅接口, 支持 interface 代码是一路子方法, 直接看源码吧, 有互换谈论再另写.

注: job 代码中, 我把某个方法( method ) 拆解成了函数( function ), 有兴趣的朋友可以查一下, 有些小差异,但效果一样.

goim 源代码在github.com/Terry-Mao/g…

我写的代码在github.com/tsingson/go…

下面是 2019/04/23 补充内容:

经网上互换, 另一位朋友 weisd 改写的 goim, 支持 nsq 的 interface, 代码组织得比我好啊:

支持 nsql 作为 kafka 替代代码独立了一个 brocker , 封装得很不错

代码在这里 github.com/weisd/goim

5. 扩展, 看看 gRPC 中的解耦合

gRPC , 便是 google 的 RPC ( Remote Procedure Call) , 看一下 gRPC 以 go 实现的 interface 定义

5.1 先看原始的 protobuf 定义

protobuf 是 gRPC 中默认的 接口定义, 就像 爱立信 ICE ( 开源版本是 zeroICE ) 的 slice , apache 的 thrift

在 goim 中, 网元间用 gRPC 通讯, 再看图

看图上的 grpc 标示, 把稳, 图上标示箭头不完备准确:

grpc 同时支持

普通 Client / Server 调用(北向)接口 Client 向 Server 的流式(北向)流式接口 Server 向 Cinet 调用(南向)流式接口 以及 Server / Client 双向流式接口

网上文章很多, 不一一展开了. 我们重点关注一下, golang 中对 gRPC 的实现, 也便是 golang 如何把 protobuf 定义的接口, 定义为 golang 中的 interface , 以及如何详细实现 interface .

看码, 看码, 看码:

源码在github.com/Terry-Mao/g…

syntax = "proto3";package goim.comet;option go_package = "grpc";//......//// // 这里定义 input 输入message PushMsgReq { repeated string keys = 1; int32 protoOp = 3; Proto proto = 2;}//// // 这里定义 output 输出 message PushMsgReply {}//.........service Comet { // .......... //PushMsg push by key or mid // // // 这里定义接口, 这个接口可以由 // golang / java / rust / js / python / php ...实现 // // 这是解耦合的极致啊!!!!!!!!!!!!!!!! // // // rpc PushMsg(PushMsgReq) returns (PushMsgReply); // Broadcast send to every enrity // ...........}复制代码5.2 gRPC 中 go 实现的 interface 定义

把稳, 下面的源码是 protobuf 自动天生的, 不须要编辑变动, 注释是方便沟通额外加的

源码在 github.com/Terry-Mao/goim

// Server API for Comet service// // 这里定义接口, golang 实现做事器端// type CometServer interface { ...// PushMsg push by key or mid// // // 这里定义接口, golang 的接口中的方法 // //PushMsg(context.Context, PushMsgReq) (PushMsgReply, error) ...}复制代码5.3 gRPC 中 go 实现的 interface 实例化

末了, 详细实例化代码实现, 在

github.com/Terry-Mao/g…

代码会说话儿, 这里就不展示了.

6. 郑重警告

感激朋友们看到末了, 写码挣钱的朋友都是有一说一, 这里声明一下:

代码中把 kafka 写成可用 nats 更换, 只是技能上的学习与考试测验, 并不是建议或推举利用 nats:

nats 并不保障投递nats 并不供应持久化nats 用在 goim 上的效率, 还须要压测

以是, case by case , 详细业务场景详细剖析, 商用项目的选型, 是一个慎重而严谨的事儿

请自行评估风险/本钱

.

.

感谢 www.bilibili.com & 毛剑 及浩瀚开源社区的朋友们

欢迎互换与批评..... .

7. 补充

有朋友问了些不太干系问题, 公开加一下:

golang 的编辑/ IDE 我用 jetbrains goland , 代码重构最是省时省脑, 我是JB 百口桶付用度户, 不阐明流程图用 omnigraffle, 号称苹果上的 visio本机调试用 docker有关架构设计中的接口, 请参考 面向接口编程 / IOC (Inversion Of Control) 掌握反转 / 以及 DIP (Dependency inversion principle) 依赖颠倒, 网上资料很多, 个人认为是 java 精华所在 (注:近2年我不写 java 了, 有关java的事, 高人很多)

发一张老图儿(几年前的项目了), omnigraffle 画的, 这软件挺好用( 只有 mac 版本 )

关于作者:

网名 tsingson (三明智, 江湖人称3爷)

原 ustarcom IPTV/OTT 奇迹部播控产品线技能架构湿/办理方案工程湿角色(8年), 自由职业者,

喜好音乐(口琴,是第三/四/五届广东国际口琴嘉年华的主策划人之一), 拍照与越野,

喜好 golang 措辞 (商用项目中紧张用 postgres + golang )

tsingson 写于中国深圳 小罗号口琴音乐中央, 2019/04/22

作者:tsingson链接:https://juejin.im/post/5cbd380c5188250a97133649来源:掘金著作权归作者所有。
商业转载请联系作者得到授权,非商业转载请注明出处。

相关文章

房山第一探寻历史文化名区的魅力与发展

房山区,位于北京市西南部,历史悠久,文化底蕴深厚。作为北京市的一个重要组成部分,房山区的发展始终与首都的发展紧密相连。房山区积极推...

Web前端 2025-02-18 阅读1 评论0

手机话费开钻代码数字时代的便捷生活

我们的生活越来越离不开手机。手机话费作为手机使用过程中的重要组成部分,其充值方式也在不断创新。手机话费开钻代码应运而生,为用户提供...

Web前端 2025-02-18 阅读1 评论0

探寻专业奥秘如何查询自己专业的代码

计算机科学已成为当今社会不可或缺的一部分。掌握一门专业代码对于个人发展具有重要意义。面对繁杂的学科体系,如何查询自己专业的代码成为...

Web前端 2025-02-18 阅读1 评论0