• 8.1 创建消息队列

    8.1 创建消息队列

    首先,处理消息队列的部分,我们应该集成到MsgHandler模块下,因为属于我们消息模块范畴内的

    zinx/znet/msghandler.go

    1. type MsgHandle struct {
    2. Apis map[uint32]ziface.IRouter //存放每个MsgId 所对应的处理方法的map属性
    3. WorkerPoolSize uint32 //业务工作Worker池的数量
    4. TaskQueue []chan ziface.IRequest //Worker负责取任务的消息队列
    5. }
    6. func NewMsgHandle() *MsgHandle {
    7. return &MsgHandle{
    8. Apis: make(map[uint32]ziface.IRouter),
    9. WorkerPoolSize:utils.GlobalObject.WorkerPoolSize,
    10. //一个worker对应一个queue
    11. TaskQueue:make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize),
    12. }
    13. }

    这里添加两个成员

    WokerPoolSize:作为工作池的数量,因为TaskQueue中的每个队列应该是和一个Worker对应的,所以我们在创建TaskQueue中队列数量要和Worker的数量一致。

    TaskQueue真是一个Request请求信息的channel集合。用来缓冲提供worker调用的Request请求信息,worker会从对应的队列中获取客户端的请求数据并且处理掉。

    当然WorkerPoolSize最好也可以从GlobalObject获取,并且zinx.json配置文件可以手动配置。

    zinx/utils/globalobj.go

    1. /*
    2. 存储一切有关Zinx框架的全局参数,供其他模块使用
    3. 一些参数也可以通过 用户根据 zinx.json来配置
    4. */
    5. type GlobalObj struct {
    6. /*
    7. Server
    8. */
    9. TcpServer ziface.IServer //当前Zinx的全局Server对象
    10. Host string //当前服务器主机IP
    11. TcpPort int //当前服务器主机监听端口号
    12. Name string //当前服务器名称
    13. /*
    14. Zinx
    15. */
    16. Version string //当前Zinx版本号
    17. MaxPacketSize uint32 //都需数据包的最大值
    18. MaxConn int //当前服务器主机允许的最大链接个数
    19. WorkerPoolSize uint32 //业务工作Worker池的数量
    20. MaxWorkerTaskLen uint32 //业务工作Worker对应负责的任务队列最大任务存储数量
    21. /*
    22. config file path
    23. */
    24. ConfFilePath string
    25. }
    26. //...
    27. //...
    28. /*
    29. 提供init方法,默认加载
    30. */
    31. func init() {
    32. //初始化GlobalObject变量,设置一些默认值
    33. GlobalObject = &GlobalObj{
    34. Name: "ZinxServerApp",
    35. Version: "V0.4",
    36. TcpPort: 7777,
    37. Host: "0.0.0.0",
    38. MaxConn: 12000,
    39. MaxPacketSize: 4096,
    40. ConfFilePath: "conf/zinx.json",
    41. WorkerPoolSize: 10,
    42. MaxWorkerTaskLen: 1024,
    43. }
    44. //从配置文件中加载一些用户配置的参数
    45. GlobalObject.Reload()
    46. }