• 多进程研发模式增强
  • 核心思想
  • 客户端接口类型抽象
  • 异常处理
  • 协议和调用时序
  • 具体的使用方法
  • 在框架里面 cluster-client 相关的配置项

    多进程研发模式增强

    在前面的多进程模型章节中,我们详细讲述了框架的多进程模型,其中适合使用 Agent 进程的有一类常见的场景:一些中间件客户端需要和服务器建立长连接,理论上一台服务器最好只建立一个长连接,但多进程模型会导致 n 倍(n = Worker 进程数)连接被创建。

    1. +--------+ +--------+
    2. | Client | | Client | ... n
    3. +--------+ +--------+
    4. | \ / |
    5. | \ / | n * m 个链接
    6. | / \ |
    7. | / \ |
    8. +--------+ +--------+
    9. | Server | | Server | ... m
    10. +--------+ +--------+

    为了尽可能的复用长连接(因为它们对于服务端来说是非常宝贵的资源),我们会把它放到 Agent 进程里维护,然后通过 messenger 将数据传递给各个 Worker。这种做法是可行的,但是往往需要写大量代码去封装接口和实现数据的传递,非常麻烦。

    另外,通过 messenger 传递数据效率是比较低的,因为它会通过 Master 来做中转;万一 IPC 通道出现问题还可能将 Master 进程搞挂。

    那么有没有更好的方法呢?答案是肯定的,我们提供一种新的模式来降低这类客户端封装的复杂度。通过建立 Agent 和 Worker 的 socket 直连跳过 Master 的中转。Agent 作为对外的门面维持多个 Worker 进程的共享连接。

    核心思想

    • 受到 Leader/Follower 模式的启发。
    • 客户端会被区分为两种角色:
      • Leader: 负责和远程服务端维持连接,对于同一类的客户端只有一个 Leader。
      • Follower: 会将具体的操作委托给 Leader,常见的是订阅模型(让 Leader 和远程服务端交互,并等待其返回)。
    • 如何确定谁是 Leader,谁是 Follower 呢?有两种模式:
      • 自由竞争模式:客户端启动的时候通过本地端口的争夺来确定 Leader。例如:大家都尝试监听 7777 端口,最后只会有一个实例抢占到,那它就变成 Leader,其余的都是 Follower。
      • 强制指定模式:框架指定某一个 Leader,其余的就是 Follower。
    • 框架里面我们采用的是强制指定模式,Leader 只能在 Agent 里面创建,这也符合我们对 Agent 的定位
    • 框架启动的时候 Master 会随机选择一个可用的端口作为 Cluster Client 监听的通讯端口,并将它通过参数传递给 Agent 和 App Worker。
    • Leader 和 Follower 之间通过 socket 直连(通过通讯端口),不再需要 Master 中转。

    新的模式下,客户端的通信方式如下:

    1. +-------+
    2. | start |
    3. +---+---+
    4. |
    5. +--------+---------+
    6. __| port competition |__
    7. win / +------------------+ \ lose
    8. / \
    9. +---------------+ tcp conn +-------------------+
    10. | Leader(Agent) |<---------------->| Follower(Worker1) |
    11. +---------------+ +-------------------+
    12. | \ tcp conn
    13. | \
    14. +--------+ +-------------------+
    15. | Client | | Follower(Worker2) |
    16. +--------+ +-------------------+

    客户端接口类型抽象

    我们将客户端接口抽象为下面两大类,这也是对客户端接口的一个规范,对于符合规范的客户端,我们可以自动将其包装为 Leader/Follower 模式。

    • 订阅、发布类(subscribe / publish):
      • subscribe(info, listener) 接口包含两个参数,第一个是订阅的信息,第二个是订阅的回调函数。
      • publish(info) 接口包含一个参数,就是订阅的信息。
    • 调用类 (invoke),支持 callback, promise 和 generator function 三种风格的接口,但是推荐使用 generator function。

    客户端示例

    1. const Base = require('sdk-base');
    2. class Client extends Base {
    3. constructor(options) {
    4. super(options);
    5. // 在初始化成功以后记得 ready
    6. this.ready(true);
    7. }
    8. /**
    9. * 订阅
    10. *
    11. * @param {Object} info - 订阅的信息(一个 JSON 对象,注意尽量不要包含 Function, Buffer, Date 这类属性)
    12. * @param {Function} listener - 监听的回调函数,接收一个参数就是监听到的结果对象
    13. */
    14. subscribe(info, listener) {
    15. // ...
    16. }
    17. /**
    18. * 发布
    19. *
    20. * @param {Object} info - 发布的信息,和上面 subscribe 的 info 类似
    21. */
    22. publish(info) {
    23. // ...
    24. }
    25. /**
    26. * 获取数据 (invoke)
    27. *
    28. * @param {String} id - id
    29. * @return {Object} result
    30. */
    31. async getData(id) {
    32. // ...
    33. }
    34. }

    异常处理

    • Leader 如果“死掉”会触发新一轮的端口争夺,争夺到端口的那个实例被推选为新的 Leader。
    • 为保证 Leader 和 Follower 之间的通道健康,需要引入定时心跳检查机制,如果 Follower 在固定时间内没有发送心跳包,那么 Leader 会将 Follower 主动断开,从而触发 Follower 的重新初始化。

    协议和调用时序

    Leader 和 Follower 通过下面的协议进行数据交换:

    1. 0 1 2 4 12
    2. +-------+-------+---------------+---------------------------------------------------------------+
    3. |version|req/res| reserved | request id |
    4. +-------------------------------+-------------------------------+-------------------------------+
    5. | timeout | connection object length | application object length |
    6. +-------------------------------+---------------------------------------------------------------+
    7. | conn object (JSON format) ... | app object |
    8. +-----------------------------------------------------------+ |
    9. | ... |
    10. +-----------------------------------------------------------------------------------------------+
    1. 在通讯端口上 Leader 启动一个 Local Server,所有的 Leader/Follower 通讯都经过 Local Server。
    2. Follower 连接上 Local Server 后,首先发送一个 register channel 的 packet(引入 channel 的概念是为了区别不同类型的客户端)。
    3. Local Server 会将 Follower 分配给指定的 Leader(根据客户端类型进行配对)。
    4. Follower 向 Leader 发送订阅、发布请求。
    5. Leader 在订阅数据变更时通过 subscribe result packet 通知 Follower。
    6. Follower 向 Leader 发送调用请求,Leader 收到后执行相应操作后返回结果。
    1. +----------+ +---------------+ +---------+
    2. | Follower | | Local Server | | Leader |
    3. +----------+ +---------------+ +---------+
    4. | register channel | assign to |
    5. + -----------------------> | --------------------> |
    6. | | |
    7. | subscribe |
    8. + ------------------------------------------------> |
    9. | publish |
    10. + ------------------------------------------------> |
    11. | |
    12. | subscribe result |
    13. | <------------------------------------------------ +
    14. | |
    15. | invoke |
    16. + ------------------------------------------------> |
    17. | invoke result |
    18. | <------------------------------------------------ +
    19. | |

    具体的使用方法

    下面我用一个简单的例子,介绍在框架里面如何让一个客户端支持 Leader/Follower 模式:

    • 第一步,我们的客户端最好是符合上面提到过的接口约定,例如:
    1. // registry_client.js
    2. const URL = require('url');
    3. const Base = require('sdk-base');
    4. class RegistryClient extends Base {
    5. constructor(options) {
    6. super({
    7. // 指定异步启动的方法
    8. initMethod: 'init',
    9. });
    10. this._options = options;
    11. this._registered = new Map();
    12. }
    13. /**
    14. * 启动逻辑
    15. */
    16. async init() {
    17. this.ready(true);
    18. }
    19. /**
    20. * 获取配置
    21. * @param {String} dataId - the dataId
    22. * @return {Object} 配置
    23. */
    24. async getConfig(dataId) {
    25. return this._registered.get(dataId);
    26. }
    27. /**
    28. * 订阅
    29. * @param {Object} reg
    30. * - {String} dataId - the dataId
    31. * @param {Function} listener - the listener
    32. */
    33. subscribe(reg, listener) {
    34. const key = reg.dataId;
    35. this.on(key, listener);
    36. const data = this._registered.get(key);
    37. if (data) {
    38. process.nextTick(() => listener(data));
    39. }
    40. }
    41. /**
    42. * 发布
    43. * @param {Object} reg
    44. * - {String} dataId - the dataId
    45. * - {String} publishData - the publish data
    46. */
    47. publish(reg) {
    48. const key = reg.dataId;
    49. let changed = false;
    50. if (this._registered.has(key)) {
    51. const arr = this._registered.get(key);
    52. if (arr.indexOf(reg.publishData) === -1) {
    53. changed = true;
    54. arr.push(reg.publishData);
    55. }
    56. } else {
    57. changed = true;
    58. this._registered.set(key, [reg.publishData]);
    59. }
    60. if (changed) {
    61. this.emit(key, this._registered.get(key).map(url => URL.parse(url, true)));
    62. }
    63. }
    64. }
    65. module.exports = RegistryClient;
    • 第二步,使用 agent.cluster 接口对 RegistryClient 进行封装:
    1. // agent.js
    2. const RegistryClient = require('registry_client');
    3. module.exports = agent => {
    4. // 对 RegistryClient 进行封装和实例化
    5. agent.registryClient = agent.cluster(RegistryClient)
    6. // create 方法的参数就是 RegistryClient 构造函数的参数
    7. .create({});
    8. agent.beforeStart(async () => {
    9. await agent.registryClient.ready();
    10. agent.coreLogger.info('registry client is ready');
    11. });
    12. };
    • 第三步,使用 app.cluster 接口对 RegistryClient 进行封装:
    1. // app.js
    2. const RegistryClient = require('registry_client');
    3. module.exports = app => {
    4. app.registryClient = app.cluster(RegistryClient).create({});
    5. app.beforeStart(async () => {
    6. await app.registryClient.ready();
    7. app.coreLogger.info('registry client is ready');
    8. // 调用 subscribe 进行订阅
    9. app.registryClient.subscribe({
    10. dataId: 'demo.DemoService',
    11. }, val => {
    12. // ...
    13. });
    14. // 调用 publish 发布数据
    15. app.registryClient.publish({
    16. dataId: 'demo.DemoService',
    17. publishData: 'xxx',
    18. });
    19. // 调用 getConfig 接口
    20. const res = await app.registryClient.getConfig('demo.DemoService');
    21. console.log(res);
    22. });
    23. };

    是不是很简单?

    当然,如果你的客户端不是那么『标准』,那你可能需要用到其他一些 API,比如,你的订阅函数不叫 subscribe 而是叫 sub

    1. class MockClient extends Base {
    2. constructor(options) {
    3. super({
    4. initMethod: 'init',
    5. });
    6. this._options = options;
    7. this._registered = new Map();
    8. }
    9. async init() {
    10. this.ready(true);
    11. }
    12. sub(info, listener) {
    13. const key = reg.dataId;
    14. this.on(key, listener);
    15. const data = this._registered.get(key);
    16. if (data) {
    17. process.nextTick(() => listener(data));
    18. }
    19. }
    20. ...
    21. }

    你需要通过 delegate(API代理)手动设置此委托:

    1. // agent.js
    2. module.exports = agent => {
    3. agent.mockClient = agent.cluster(MockClient)
    4. // 将 sub 代理到 subscribe 逻辑上
    5. .delegate('sub', 'subscribe')
    6. .create();
    7. agent.beforeStart(async () => {
    8. await agent.mockClient.ready();
    9. });
    10. };
    1. // app.js
    2. module.exports = app => {
    3. app.mockClient = app.cluster(MockClient)
    4. // 将 sub 代理到 subscribe 逻辑上
    5. .delegate('sub', 'subscribe')
    6. .create();
    7. app.beforeStart(async () => {
    8. await app.mockClient.ready();
    9. app.sub({ id: 'test-id' }, val => {
    10. // put your code here
    11. });
    12. });
    13. };

    我们已经理解,通过 cluster-client 可以让我们在不理解多进程模型的情况下开发『纯粹』的 RegistryClient,只负责和服务端进行交互,然后使用 cluster-client 进行简单的封装就可以得到一个支持多进程模型的 ClusterClient。这里的 RegistryClient 实际上是一个专门负责和远程服务通信进行数据通信的 DataClient

    大家可能已经发现,ClusterClient 同时带来了一些约束,如果想在各进程暴露同样的方法,那么 RegistryClient 上只能支持 sub/pub 模式以及异步的 API 调用。因为在多进程模型中所有的交互都必须经过 socket 通信,势必带来了这一约束。

    假设我们要实现一个同步的 get 方法,订阅过的数据直接放入内存,使用 get 方法时直接返回。要怎么实现呢?而真实情况可能比这更复杂。

    在这里,我们引入一个 APIClient 的最佳实践。对于有读取缓存数据等同步 API 需求的模块,在 RegistryClient 基础上再封装一个 APIClient 来实现这些与远程服务端交互无关的 API,暴露给用户使用到的是这个 APIClient 的实例。

    在 APIClient 内部实现上:

    • 异步数据获取,通过调用基于 ClusterClientRegistryClient 的 API 实现。
    • 同步调用等与服务端无关的接口在 APIClient 上实现。由于 ClusterClient 的 API 已经抹平了多进程差异,所以在开发 APIClient 调用到 RegistryClient 时也无需关心多进程模型。

    例如在模块的 APIClient 中增加带缓存的 get 同步方法:

    1. // some-client/index.js
    2. const cluster = require('cluster-client');
    3. const RegistryClient = require('./registry_client');
    4. class APIClient extends Base {
    5. constructor(options) {
    6. super(options);
    7. // options.cluster 用于给 Egg 的插件传递 app.cluster 进来
    8. this._client = (options.cluster || cluster)(RegistryClient).create(options);
    9. this._client.ready(() => this.ready(true));
    10. this._cache = {};
    11. // subMap:
    12. // {
    13. // foo: reg1,
    14. // bar: reg2,
    15. // }
    16. const subMap = options.subMap;
    17. for (const key in subMap) {
    18. this.subscribe(subMap[key], value => {
    19. this._cache[key] = value;
    20. });
    21. }
    22. }
    23. subscribe(reg, listener) {
    24. this._client.subscribe(reg, listener);
    25. }
    26. publish(reg) {
    27. this._client.publish(reg);
    28. }
    29. get(key) {
    30. return this._cache[key];
    31. }
    32. }
    33. // 最终模块向外暴露这个 APIClient
    34. module.exports = APIClient;

    那么我们就可以这么使用该模块:

    1. // app.js || agent.js
    2. const APIClient = require('some-client'); // 上面那个模块
    3. module.exports = app => {
    4. const config = app.config.apiClient;
    5. app.apiClient = new APIClient(Object.assign({}, config, { cluster: app.cluster });
    6. app.beforeStart(async () => {
    7. await app.apiClient.ready();
    8. });
    9. };
    10. // config.${env}.js
    11. exports.apiClient = {
    12. subMap: {
    13. foo: {
    14. id: '',
    15. },
    16. // bar...
    17. }
    18. };

    为了方便你封装 APIClient,在 cluster-client 模块中提供了一个 APIClientBase 基类,那么上面的 APIClient 可以改写为:

    1. const APIClientBase = require('cluster-client').APIClientBase;
    2. const RegistryClient = require('./registry_client');
    3. class APIClient extends APIClientBase {
    4. // 返回原始的客户端类
    5. get DataClient() {
    6. return RegistryClient;
    7. }
    8. // 用于设置 cluster-client 相关参数,等同于 cluster 方法的第二个参数
    9. get clusterOptions() {
    10. return {
    11. responseTimeout: 120 * 1000,
    12. };
    13. }
    14. subscribe(reg, listener) {
    15. this._client.subscribe(reg, listener);
    16. }
    17. publish(reg) {
    18. this._client.publish(reg);
    19. }
    20. get(key) {
    21. return this._cache[key];
    22. }
    23. }

    总结一下:

    1. +------------------------------------------------+
    2. | APIClient |
    3. | +----------------------------------------|
    4. | | ClusterClient |
    5. | | +---------------------------------|
    6. | | | RegistryClient |
    7. +------------------------------------------------+
    • RegistryClient - 负责和远端服务通讯,实现数据的存取,只支持异步 API,不关心多进程模型。
    • ClusterClient - 通过 cluster-client 模块进行简单 wrap 得到的 client 实例,负责自动抹平多进程模型的差异。
    • APIClient - 内部调用 ClusterClient 做数据同步,无需关心多进程模型,用户最终使用的模块。API 都通过此处暴露,支持同步和异步。

    有兴趣的同学可以看一下增强多进程研发模式 讨论过程。

    在框架里面 cluster-client 相关的配置项

    1. /**
    2. * @property {Number} responseTimeout - response timeout, default is 60000
    3. * @property {Transcode} [transcode]
    4. * - {Function} encode - custom serialize method
    5. * - {Function} decode - custom deserialize method
    6. */
    7. config.clusterClient = {
    8. responseTimeout: 60000,
    9. };
    配置项 类型 默认值 描述
    responseTimeout number 60000 (一分钟) 全局的进程间通讯的超时时长,不能设置的太短,因为代理的接口本身也有超时设置
    transcode function N/A 进程间通讯的序列化方式,默认采用 serialize-json(建议不要自行设置)

    上面是全局的配置方式。如果,你想对一个客户端单独做设置

    • 可以通过 app/agent.cluster(ClientClass, options) 的第二个参数 options 进行覆盖
    1. app.registryClient = app.cluster(RegistryClient, {
    2. responseTimeout: 120 * 1000, // 这里传入的是和 cluster-client 相关的参数
    3. }).create({
    4. // 这里传入的是 RegistryClient 需要的参数
    5. });
    • 也可以通过覆盖 APIClientBaseclusterOptions 这个 getter 属性
    1. const APIClientBase = require('cluster-client').APIClientBase;
    2. const RegistryClient = require('./registry_client');
    3. class APIClient extends APIClientBase {
    4. get DataClient() {
    5. return RegistryClient;
    6. }
    7. get clusterOptions() {
    8. return {
    9. responseTimeout: 120 * 1000,
    10. };
    11. }
    12. // ...
    13. }
    14. module.exports = APIClient;