-
Notifications
You must be signed in to change notification settings - Fork 174
Open
Description
这个例子是使用WaitGroup编排多个goroutine并发完成用户多个标签数据的查询工作。至于每个标签应该怎么查询则由具体的标签查询实现类型实现Picker接口的PickTagValueForUser
方法,在方法里实现自己标签的查询逻辑。
下面的Picker接口约定了每个标签查询器必须要实现的两个方法
type Picker interface {
// 用于查询用户的标签值
PickTagValueForUser (userId int64, args ... interface{})
// 通知查询到的标签值
Notify () <-chan interface{}
}
resolveTagPicker
函数用于通过标签名解析出每个对应标签查询器对象。
// 根据标签名解析出对应的TagPicker
func resolveTagPicker (tagName string) Picker {
switch tagName {
case TAG_ORDER_NUM: // 这是个常量 值是字符串order_num
return &OrderNumTagPicker{
TagName: tagName,
ValueCh: make(chan interface{}),
}
default:
return nil
}
}
下面是对外提供的对多个用户标签进行并发查询的查询方法,BulkQueryUserTagValue
会根据要查询的多个用户标签每个都开启一个goroutine 执行queryTagValue
方法。 queryTagValue
方法就是每个标签查询器执行的goroutine,它会根据标签标识实例化出相应的标签查询器,然后再开启一个goroutine执行标签查询器实现的PickTagValueForUser
方法,查询到后会通过标签查询器的Channel获得标签值。这里再开一个goroutine去执行PickTagValueForUser
方法的原因是要做好查询器的超时处理。
queryTagValue
会同时接收标签查询器查询到的结果值(通过Notify方法返回的Channel)和ctx.Done() 这个Channel。 如果在ctx.Done()通道接收到值时还没有从查询器的Channel接收到标签值,则视为超时。
// 对外提供的批量查询用户标签值的方法
func BulkQueryUserTagValue(tagNames []string, userId int64, queryArgs ...interface{}) (tagValuePairs []*TagValuePair) {
tagCount := len(tagNames)
if tagCount < 1 {
return
}
wg := &sync.WaitGroup{}
wg.Add(tagCount)
tagValueCh := make(chan *TagValuePair, tagCount) // 用于接收所有Picker查到的标签值的Channel
ctx, _ := context.WithTimeout(context.Background(), time.Minute) // 设置执行标签值查找的超时时间
for _, tagName := range tagNames {
go queryTagValue(ctx, wg, tagName, userId, tagValueCh, queryArgs...)
}
wg.Wait()
close(tagValueCh) // 先关闭通道 方便下面for range不发生阻塞, 从channel中读完值即退出
tagValuePairs = make([]*TagValuePair, 0)
for tagValue := range tagValueCh {
if tagValue.Value != nil {
tagValuePairs = append(tagValuePairs, tagValue)
}
}
return tagValuePairs
}
type TagValuePair struct {
Name string `json:"tag_name"`
Value interface{} `json:"tag_value"`
}
func queryTagValue(ctx context.Context, wg *sync.WaitGroup, tagName string, userId int64, tagValueCh chan *TagValuePair, queryArgs ...interface{}) {
defer wg.Done()
tagPicker := resolveTagPicker(tagName)
if tagPicker == nil {
dlog.Error("未识别的业务标签", common.ErrUnknownBusinessTag)
return
}
go tagPicker.PickTagValueForUser(userId, queryArgs...)
select {
case <- ctx.Done(): // 超时返回
return
case tagValue := <- tagPicker.Notify(): // 接收标签值
TagValuePair := &TagValuePair{
Name: tagName,
Value: tagValue,
}
tagValueCh <- TagValuePair
return
}
}
最后就是具体标签查询器的实现了,每个标签都有自己的实现逻辑,下面是OrderNumTagPicker
的示例代码:
type OrderNumTagPicker struct {
TagName string
ValueCh chan interface{}
}
type TradeNoInfo struct {
TradeNo string `json:"trade_no"`
}
func (picker *OrderNumTagPicker) PickTagValueForUser(userId int64, args ...interface{}) {
// 用类型转换得到交易号
// tradeNo, ok := args[0].(string)
extInfoJson, _ := args[0].(string)
if err := json.Unmarshal([]byte(extInfoJson), &TradeNoInfo); err != nil {
// log.Error自己实现
log.Error("PayTotalTagPickerError", "Invalid arg", args[0])
// 结束执行并通知外部
picker.ValueCh <- nil
return
}
// 这里就打印下参数值,标签查询的具体逻辑自己实现
fmt.Println(userId)
fmt.Println(TradeNo)
// 查询到的用户的标签 (假设交易号下有10个订单)
picker.ValueCh <- 10
}
func (picker *OrderNumTagPicker) Notify () <-chan interface{} {
return picker.ValueCh
}
完整的源代码参考: https://github.com/kevinyan815/gocookbook/tree/master/codes/tag_picker
Metadata
Metadata
Assignees
Labels
No labels