• 测试前准备
  • 操作流程
  • 消息路由测试
    • OpenEdge 启动
    • MQTTBOX 建立连接
    • 消息路由验证

    测试前准备

    声明

    - 本文测试所用设备系统为MacOS- 模拟MQTT client行为的客户端为MQTTBOX- 本文所用镜像为依赖OpenEdge源码自行编译所得,具体请查看如何从源码构建镜像

    与基于本地Hub模块实现设备间消息转发不同的是,本文在Hub模块的基础上,引入Function函数计算模块及具体执行所需的Python27 runtime模块,将接收到的消息交给Python27 runtime来处理(Python27 runtime会调用具体的函数脚本来执行具体计算、分析、处理等),然后将处理结果以主题方式反馈给Hub模块,最终订阅该主题的MQTT client(要求MQTT client事先订阅该主题)将会收到该处理结果。

    本地Hub模块的配置项信息不再赘述,详情查看使用Hub进行设备间消息转发,这里主要介绍新引入的Function函数计算模块相关配置(Python函数的运行环境构建参考OpenEdge设计),完整的配置参考函数计算模块配置。

    提示:凡是在rules消息路由配置项中出现、用到的函数,必须在functions配置项中进行函数执行具体配置,否则将不予启动。

    本文将以TCP连接方式为例,展示本地Function模块的消息处理、计算功能。

    操作流程

    • Step1:依据使用需求编写配置文件信息,然后以Docker容器模式启动OpenEdge可执行程序;
    • Step2:通过MQTTBOX以TCP方式与OpenEdge Hub建立连接;
      • 若成功与OpenEdge Hub模块建立连接,则依据配置的主题权限信息向有权限的主题发布消息,同时向拥有订阅权限的主题订阅消息,并观察OpenEdge日志信息;
        • 若OpenEdge日志显示已经启动Python runtime模块,则表明发布的消息受到了预期的函数处理;
        • 若OpenEdge日志显示未成功启动Python runtime模块,则重复上述操作,直至看到OpenEdge主程序成功启动了Python runtime模块。
      • 若与OpenEdge Hub建立连接失败,则重复Step2操作,直至MQTTBOX与OpenEdge Hub成功建立连接为止。
    • Step3:通过MQTTBOX查看对应主题消息的收发状态。
      基于Function模块实现设备消息处理流程

    消息路由测试

    本文测试使用的本地Hub及Function模块的相关配置信息如下:

    1. name: localhub
    2. listen:
    3. - tcp://:1883
    4. principals:
    5. - username: 'test'
    6. password: 'be178c0543eb17f5f3043021c9e5fcf30285e557a4fc309cce97ff9ca6182912'
    7. permissions:
    8. - action: 'pub'
    9. permit: ['#']
    10. - action: 'sub'
    11. permit: ['#']
    12. # 本地Function模块配置:
    13. name: localfunc
    14. hub:
    15. address: tcp://localhub:1883
    16. username: test
    17. password: hahaha
    18. rules:
    19. - id: rule-e1iluuac1
    20. subscribe:
    21. topic: t
    22. qos: 1
    23. compute:
    24. function: sayhi
    25. publish:
    26. topic: t/hi
    27. qos: 1
    28. functions:
    29. - id: func-nyeosbbch
    30. name: 'sayhi'
    31. runtime: 'python27'
    32. handler: 'sayhi.handler'
    33. codedir: 'var/db/openedge/module/func-nyeosbbch'
    34. entry: "openedge-function-runtime-python27:build"
    35. env:
    36. USER_ID: acuiot
    37. instance:
    38. min: 0
    39. max: 10
    40. timeout: 1m

    如上配置,假若MQTTBOX基于上述配置信息已与本地Hub模块建立连接,向主题“t”发送的消息将会交给“sayhi”函数处理,然后将处理结果以主题“t/hi”发布回Hub模块,这时订阅主题“t/hi”的MQTT client将会接收到这条处理后的消息。

    OpenEdge 启动

    Step1所述,以Docker容器模式启动OpenEdge,通过观察OpenEdge启动日志可以发现Hub、Function模块均已被成功加载,具体如下图示。

    OpenEdge加载、启动日志

    同样,我们也可以通过执行命令docker ps查看系统当前正在运行的docker容器列表,具体如下图示。

    通过`docker ps`命令查看系统当前运行docker容器列表

    经过对比,不难发现,本次OpenEdge启动时已经成功加载了Hub、Function两个容器模块。

    MQTTBOX 建立连接

    本次测试中,我们采用TCP连接方式对MQTTBOX进行连接信息配置,然后点击“Add subscriber”按钮订阅主题“t/hi”,该主题用于接收经python函数“sayhi”处理之后的结果数据,具体如下图示。

    MQTTBOX连接配置

    上图显示,MQTTBOX已经成功订阅了主题“t/hi”。

    消息路由验证

    根据上文所述,这里我们利用python函数“sayhi”对主题“t”的消息进行处理,并将结果反馈给主题“t/hi”。那么,首先,需要获悉的就是处理函数“sayhi.py”的具体信息,具体如下示:

    1. #!/usr/bin/env python
    2. #-*- coding:utf-8 -*-
    3. """
    4. module to say hi
    5. """
    6. import os
    7. import time
    8. import threading
    9. def handler(event, context):
    10. """
    11. function handler
    12. """
    13. event['USER_ID'] = os.environ['USER_ID']
    14. event['functionName'] = context['functionName']
    15. event['functionInvokeID'] = context['functionInvokeID']
    16. event['invokeid'] = context['invokeid']
    17. event['messageQOS'] = context['messageQOS']
    18. event['messageTopic'] = context['messageTopic']
    19. event['py'] = '你好,世界!'
    20. return event

    可以发现,在接收到某Json格式的消息后,函数“sayhi.py”会对其进行一系列处理,然后将处理结果返回。返回的结果中包括:环境变量“USER_ID”、函数名称“functionName”、函数调用ID“functionInvokeID”、输入消息主题“messageTopic”、输入消息消息QoS“messageQOS”等字段。

    这里,我们通过MQTTBOX将消息“{"id":10}”发布给主题“t”,然后观察主题“t/hi”的接收消息情况,具体如下图示。

    MQTTBOX成功接收到经python函数处理之后的消息,且结果与上面的分析结果吻合。由此,我们完成了消息路由的处理测试。

    此外,我们这时可以观察OpenEdge的日志及再次执行命令docker ps查看系统当前正在运行的容器列表,其结果如下图示。

    运用python函数处理消息时OpenEdge日志

    通过docker ps命令查看系统当前正在运行的容器列表

    从上述两张图片中可以看出,除了OpenEdge启动时已加载的Hub、Function模块容器,在利用python函数“sayhi.py”对主题“t”消息进行处理时,系统还启动、并运行了Python Runtime模块,其主要用于对消息作运行时处理(各类模块加载、启动细节可参见OpenEdge设计)。

    最后更新于 2018-12-28 10:23:09

    原文: https://openedge.tech/docs/tutorials/local/Message-processing-with-function-module