• EasySwoole Tcp服务
    • 创建tcp服务
      • 粘包问题
      • 粘包解决
    • 实现粘包处理
      • 服务端:
      • 客户端:
    • tcp控制器实现
      • 协议规则与解析
      • 实现控制器 Index.php
    • 开启子服务
    • 测试客户端
    • HTTP往TCP推送

    EasySwoole Tcp服务

    tcp 服务以及tcp客户端 demo:https://github.com/easy-swoole/demo/tree/3.x-subtcp

    创建tcp服务

    通过EasySwooleEvent.php文件的mainServerCreate 事件,进行添加子服务监听,例如:

    1. <?php
    2. public static function mainServerCreate(EventRegister $register)
    3. {
    4. $server = ServerManager::getInstance()->getSwooleServer();
    5. ################# tcp 服务器1 没有处理粘包 #####################
    6. $subPort1 = $server->addlistener('0.0.0.0', 9502, SWOOLE_TCP);
    7. $subPort1->set(
    8. [
    9. 'open_length_check' => false,//不验证数据包
    10. ]
    11. );
    12. $subPort1->on('connect', function (\swoole_server $server, int $fd, int $reactor_id) {
    13. echo "tcp服务1 fd:{$fd} 已连接\n";
    14. $str = '恭喜你连接成功服务器1';
    15. $server->send($fd, $str);
    16. });
    17. $subPort1->on('close', function (\swoole_server $server, int $fd, int $reactor_id) {
    18. echo "tcp服务1 fd:{$fd} 已关闭\n";
    19. });
    20. $subPort1->on('receive', function (\swoole_server $server, int $fd, int $reactor_id, string $data) {
    21. echo "tcp服务1 fd:{$fd} 发送消息:{$data}\n";
    22. });
    23. }

    粘包问题

    由于tcp的特性,可能会出现数据粘包情况,例如

    • A连接Server
    • A发送 hello
    • A又发送了一条 hello
    • Server可能会一次性收到一条”hellohello”的数据
    • Server也可能收到”he” ,”llohello”类似这样的中断数据

    粘包解决

    • 通过标识EOF,例如http协议,通过\r\n\r\n 的方式去表示该数据已经完结,我们可以自定义一个协议,例如当接收到 “结尾666” 字符串时,代表该字符串已经结束,如果没有获取到,则存入缓冲区,等待结尾字符串,或者如果获取到多条,则通过该字符串剪切出其他数据
    • 定义消息头,通过特定长度的消息头进行获取,例如我们定义一个协议,前面10位字符串都代表着之后数据主体的长度,那么我们传输数据时,只需要000000000512346(前10位为协议头,表示了这条数据的大小,后面的为数据),每次我们读取只先读取10位,获取到消息长度,再读取消息长度那么多的数据,这样就可以保证数据的完整性了.(但是为了不被混淆,协议头也得像EOF一样标识)
    • 通过pack二进制处理,相当于于方法2,将数据通过二进制封装拼接进消息中,通过验证二进制数据去读取信息,sw采用的就是这种方式

    可查看swoole官方文档:https://wiki.swoole.com/wiki/page/287.html

    实现粘包处理

    服务端:

    1. <?php
    2. ################# tcp 服务器2 没有处理粘包 #####################
    3. $subPort2 = $server->addlistener('0.0.0.0', 9503, SWOOLE_TCP);
    4. $subPort2->set(
    5. [
    6. 'open_length_check' => true,
    7. 'package_max_length' => 81920,
    8. 'package_length_type' => 'N',
    9. 'package_length_offset' => 0,
    10. 'package_body_offset' => 4,
    11. ]
    12. );
    13. $subPort2->on('connect', function (\swoole_server $server, int $fd, int $reactor_id) {
    14. echo "tcp服务2 fd:{$fd} 已连接\n";
    15. $str = '恭喜你连接成功服务器2';
    16. $server->send($fd, pack('N', strlen($str)) . $str);
    17. });
    18. $subPort2->on('close', function (\swoole_server $server, int $fd, int $reactor_id) {
    19. echo "tcp服务2 fd:{$fd} 已关闭\n";
    20. });
    21. $subPort2->on('receive', function (\swoole_server $server, int $fd, int $reactor_id, string $data) {
    22. echo "tcp服务2 fd:{$fd} 发送原始消息:{$data}\n";
    23. echo "tcp服务2 fd:{$fd} 发送消息:" . substr($data, '4') . "\n";
    24. });

    客户端:

    1. <?php
    2. /**
    3. * Created by PhpStorm.
    4. * User: Tioncico
    5. * Date: 2019/3/6 0006
    6. * Time: 16:22
    7. */
    8. include "../vendor/autoload.php";
    9. define('EASYSWOOLE_ROOT', realpath(dirname(getcwd())));
    10. \EasySwoole\EasySwoole\Core::getInstance()->initialize();
    11. /**
    12. * tcp 客户端2,验证数据包,并处理粘包
    13. */
    14. go(function () {
    15. $client = new \Swoole\Client(SWOOLE_SOCK_TCP);
    16. $client->set(
    17. [
    18. 'open_length_check' => true,
    19. 'package_max_length' => 81920,
    20. 'package_length_type' => 'N',
    21. 'package_length_offset' => 0,
    22. 'package_body_offset' => 4,
    23. ]
    24. );
    25. if (!$client->connect('127.0.0.1', 9503, 0.5)) {
    26. exit("connect failed. Error: {$client->errCode}\n");
    27. }
    28. $str = 'hello world';
    29. $client->send(encode($str));
    30. $data = $client->recv();//服务器已经做了pack处理
    31. var_dump($data);//未处理数据,前面有4 (因为pack 类型为N)个字节的pack
    32. $data = decode($data);//需要自己剪切解析数据
    33. var_dump($data);
    34. // $client->close();
    35. });
    36. /**
    37. * 数据包 pack处理
    38. * encode
    39. * @param $str
    40. * @return string
    41. * @author Tioncico
    42. * Time: 9:50
    43. */
    44. function encode($str)
    45. {
    46. return pack('N', strlen($str)) . $str;
    47. }
    48. function decode($str)
    49. {
    50. $data = substr($str, '4');
    51. return $data;
    52. }

    tcp控制器实现

    协议规则与解析

    在本文档中,传输json数据 使用pack N进行二进制处理,json数据有3个参数,例如:

    1. {"controller":"Index","action":"index","param":{"name":"\u4ed9\u58eb\u53ef"}}

    实现解析器Parser.php

    1. <?php
    2. /**
    3. * Created by PhpStorm.
    4. * User: Tioncico
    5. * Date: 2018/10/17 0017
    6. * Time: 9:10
    7. */
    8. namespace App\TcpController;
    9. use EasySwoole\Socket\Bean\Caller;
    10. use EasySwoole\Socket\Bean\Response;
    11. use EasySwoole\Socket\AbstractInterface\ParserInterface;
    12. use EasySwoole\Utility\CommandLine;
    13. class Parser implements ParserInterface
    14. {
    15. public function decode($raw, $client): ?Caller
    16. {
    17. $data = substr($raw, '4');
    18. //为了方便,我们将json字符串作为协议标准
    19. $data = json_decode($data, true);
    20. $bean = new Caller();
    21. $controller = !empty($data['controller']) ? $data['controller'] : 'Index';
    22. $action = !empty($data['action']) ? $data['action'] : 'index';
    23. $param = !empty($data['param']) ? $data['param'] : [];
    24. $controller = "App\\TcpController\\{$controller}";
    25. $bean->setControllerClass($controller);
    26. $bean->setAction($action);
    27. $bean->setArgs($param);
    28. return $bean;
    29. }
    30. /**
    31. * 只处理pack,json交给控制器
    32. * encode
    33. * @param Response $response
    34. * @param $client
    35. * @return string|null
    36. * @author Tioncico
    37. * Time: 10:33
    38. */
    39. public function encode(Response $response, $client): ?string
    40. {
    41. return pack('N', strlen($response->getMessage())) . $response->getMessage();
    42. }
    43. }

    实现控制器Index.php

    1. <?php
    2. /**
    3. * Created by PhpStorm.
    4. * User: Tioncico
    5. * Date: 2018/10/17 0017
    6. * Time: 9:15
    7. */
    8. namespace App\TcpController;
    9. use App\Rpc\RpcServer;
    10. use EasySwoole\EasySwoole\ServerManager;
    11. use EasySwoole\EasySwoole\Swoole\Task\TaskManager;
    12. use EasySwoole\Socket\AbstractInterface\Controller;
    13. use http\Env\Response;
    14. class Index extends Controller{
    15. function actionNotFound(?string $actionName)
    16. {
    17. $this->response()->setMessage("{$actionName} not found \n");
    18. }
    19. public function index(){
    20. $this->response()->setMessage(time());
    21. }
    22. public function args()
    23. {
    24. $this->response()->setMessage('your args is:'.json_encode($this->caller()->getArgs()).PHP_EOL);
    25. }
    26. public function delay()
    27. {
    28. $client = $this->caller()->getClient();
    29. TaskManager::async(function ()use($client){
    30. sleep(1);
    31. ServerManager::getInstance()->getSwooleServer()->send($client->getFd(),'this is delay message at '.time());
    32. });
    33. }
    34. public function close()
    35. {
    36. $this->response()->setMessage('you are goging to close');
    37. $client = $this->caller()->getClient();
    38. TaskManager::async(function ()use($client){
    39. sleep(2);
    40. ServerManager::getInstance()->getSwooleServer()->send($client->getFd(),'this is delay message at '.time());
    41. });
    42. }
    43. public function who()
    44. {
    45. $this->response()->setMessage('you fd is '.$this->caller()->getClient()->getFd());
    46. }
    47. }

    开启子服务

    EasySwooleEvent中注册。

    1. <?php
    2. public static function mainServerCreate(EventRegister $register)
    3. {
    4. ############# tcp 服务器3 tcp控制器实现+处理粘包############
    5. $subPort3 = $server->addListener(Config::getInstance()->getConf('MAIN_SERVER.LISTEN_ADDRESS'), 9504, SWOOLE_TCP);
    6. $socketConfig = new \EasySwoole\Socket\Config();
    7. $socketConfig->setType($socketConfig::TCP);
    8. $socketConfig->setParser(new \App\TcpController\Parser());
    9. //设置解析异常时的回调,默认将抛出异常到服务器
    10. $socketConfig->setOnExceptionHandler(function ($server, $throwable, $raw, $client, $response) {
    11. echo "tcp服务3 fd:{$client->getFd()} 发送数据异常 \n";
    12. $server->close($client->getFd());
    13. });
    14. $dispatch = new \EasySwoole\Socket\Dispatcher($socketConfig);
    15. $subPort3->on('receive', function (\swoole_server $server, int $fd, int $reactor_id, string $data) use ($dispatch) {
    16. echo "tcp服务3 fd:{$fd} 发送消息:{$data}\n";
    17. $dispatch->dispatch($server, $data, $fd, $reactor_id);
    18. });
    19. $subPort3->set(
    20. [
    21. 'open_length_check' => true,
    22. 'package_max_length' => 81920,
    23. 'package_length_type' => 'N',
    24. 'package_length_offset' => 0,
    25. 'package_body_offset' => 4,
    26. ]
    27. );
    28. $subPort3->on('connect', function (\swoole_server $server, int $fd, int $reactor_id) {
    29. echo "tcp服务3 fd:{$fd} 已连接\n";
    30. });
    31. $subPort3->on('close', function (\swoole_server $server, int $fd, int $reactor_id) {
    32. echo "tcp服务3 fd:{$fd} 已关闭\n";
    33. });
    34. }

    测试客户端

    1. <?php
    2. /**
    3. * Created by PhpStorm.
    4. * User: Tioncico
    5. * Date: 2019/3/6 0006
    6. * Time: 16:22
    7. */
    8. include "../vendor/autoload.php";
    9. define('EASYSWOOLE_ROOT', realpath(dirname(getcwd())));
    10. \EasySwoole\EasySwoole\Core::getInstance()->initialize();
    11. /**
    12. * tcp 客户端3,验证数据包处理粘包 以及转发到控制器写法
    13. */
    14. go(function () {
    15. $client = new \Swoole\Client(SWOOLE_SOCK_TCP);
    16. $client->set(
    17. [
    18. 'open_length_check' => true,
    19. 'package_max_length' => 81920,
    20. 'package_length_type' => 'N',
    21. 'package_length_offset' => 0,
    22. 'package_body_offset' => 4,
    23. ]
    24. );
    25. if (!$client->connect('127.0.0.1', 9504, 0.5)) {
    26. exit("connect failed. Error: {$client->errCode}\n");
    27. }
    28. $data = [
    29. 'controller' => 'Index',
    30. 'action' => 'index',
    31. 'param' => [
    32. 'name' => '仙士可'
    33. ],
    34. ];
    35. $str = json_encode($data);
    36. var_dump($str);
    37. $client->send(encode($str));
    38. $data = $client->recv();//服务器已经做了pack处理
    39. $data = decode($data);//需要自己剪切解析数据
    40. echo "服务端回复: $data \n";
    41. $data = [
    42. 'controller' => 'Index',
    43. 'action' => 'args',
    44. 'param' => [
    45. 'name' => '仙士可'
    46. ],
    47. ];
    48. $str = json_encode($data);
    49. $client->send(encode($str));
    50. $data = $client->recv();//服务器已经做了pack处理
    51. $data = decode($data);//需要自己剪切解析数据
    52. echo "服务端回复: $data \n";
    53. // $client->close();
    54. });
    55. /**
    56. * 数据包 pack处理
    57. * encode
    58. * @param $str
    59. * @return string
    60. * @author Tioncico
    61. * Time: 9:50
    62. */
    63. function encode($str)
    64. {
    65. return pack('N', strlen($str)) . $str;
    66. }
    67. function decode($str)
    68. {
    69. $data = substr($str, '4');
    70. return $data;
    71. }

    HTTP往TCP推送

    HTTP控制器

    1. <?php
    2. /**
    3. * Created by PhpStorm.
    4. * User: Apple
    5. * Date: 2018/11/1 0001
    6. * Time: 11:10
    7. */
    8. namespace App\HttpController;
    9. use EasySwoole\EasySwoole\ServerManager;
    10. use EasySwoole\Http\AbstractInterface\Controller;
    11. class Index extends Controller
    12. {
    13. function index()
    14. {
    15. // TODO: Implement index() method.
    16. }
    17. function push(){
    18. $fd = intval($this->request()->getRequestParam('fd'));
    19. $info = ServerManager::getInstance()->getSwooleServer()->connection_info($fd);
    20. if(is_array($info)){
    21. ServerManager::getInstance()->getSwooleServer()->send($fd,'push in http at '.time());
    22. }else{
    23. $this->response()->write("fd {$fd} not exist");
    24. }
    25. }
    26. }

    实际生产中,一般是用户TCP连接上来后,做验证,然后以userName=>fd的格式,存在redis中,需要http,或者是其他地方,比如定时器往某个连接推送的时候,就是以userName去redis中取得对应的fd,再send。注意,通过addServer形式创建的子服务器,以再完全注册自己的网络事件,你可以注册onclose事件,然后在连接断开的时候,删除userName=>fd对应。