• gkafka
    • 模块安装
    • 使用方式
    • 接口文档
    • 使用示例
      • 生产者
      • 消费者

    gkafka

    gkafka模块实现了对kafka消息队列系统的客户端功能封装，支持分组消费、指定起始位置等特性，并提供简便易用的API接口。

    模块安装

    1. go get -u github.com/gogf/gkafka

    或者使用go.mod:

    1. require github.com/gogf/gkafka latest

    使用方式

    1. import "github.com/gogf/gkafka"

    接口文档

    https://godoc.org/github.com/gogf/gkafka

    使用示例

    生产者

    1. package main
    2. import (
    3. "fmt"
    4. "github.com/gogf/gkafka"
    5. "time"
    6. )
    7. func newKafkaClientProducer(topic string) *gkafka.Client {
    8. kafkaConfig := gkafka.NewConfig()
    9. kafkaConfig.Servers = "localhost:9092"
    10. kafkaConfig.AutoMarkOffset = false
    11. kafkaConfig.Topics = topic
    12. return gkafka.NewClient(kafkaConfig)
    13. }
    14. func main() {
    15. client := newKafkaClientProducer("test")
    16. defer client.Close()
    17. for {
    18. s := time.Now().String()
    19. fmt.Println("produce:", s)
    20. if err := client.SyncSend(&gkafka.Message{Value: []byte(s)}); err != nil {
    21. fmt.Println(err)
    22. }
    23. time.Sleep(time.Second)
    24. }
    25. }

    消费者

    1. package main
    2. import (
    3. "fmt"
    4. "github.com/gogf/gkafka"
    5. )
    6. func newKafkaClientConsumer(topic, group string) *gkafka.Client {
    7. kafkaConfig := gkafka.NewConfig()
    8. kafkaConfig.Servers = "localhost:9092"
    9. kafkaConfig.AutoMarkOffset = false
    10. kafkaConfig.Topics = topic
    11. kafkaConfig.GroupId = group
    12. return gkafka.NewClient(kafkaConfig)
    13. }
    14. func main() {
    15. group := "test-group"
    16. topic := "test"
    17. client := newKafkaClientConsumer(topic, group)
    18. defer client.Close()
    19. for {
    20. if msg, err := client.Receive(); err != nil {
    21. fmt.Println(err)
    22. break
    23. } else {
    24. fmt.Println("consume:", msg.Partition, msg.Offset, string(msg.Value))
    25. msg.MarkOffset()
    26. }
    27. }
    28. }