• 11.8 实现远程方法调用
    • 问题
    • 解决方案
    • 讨论

    11.8 实现远程方法调用

    问题

    你想在一个消息传输层如 socketsmultiprocessing connectionsZeroMQ的基础之上实现一个简单的远程过程调用(RPC)。

    解决方案

    将函数请求、参数和返回值使用pickle编码后,在不同的解释器直接传送pickle字节字符串,可以很容易的实现RPC。下面是一个简单的PRC处理器,可以被整合到一个服务器中去:

    1. # rpcserver.py
    2.  
    3. import pickle
    4. class RPCHandler:
    5. def __init__(self):
    6. self._functions = { }
    7.  
    8. def register_function(self, func):
    9. self._functions[func.__name__] = func
    10.  
    11. def handle_connection(self, connection):
    12. try:
    13. while True:
    14. # Receive a message
    15. func_name, args, kwargs = pickle.loads(connection.recv())
    16. # Run the RPC and send a response
    17. try:
    18. r = self._functions[func_name](*args,**kwargs)
    19. connection.send(pickle.dumps(r))
    20. except Exception as e:
    21. connection.send(pickle.dumps(e))
    22. except EOFError:
    23. pass

    要使用这个处理器,你需要将它加入到一个消息服务器中。你有很多种选择,但是使用 multiprocessing 库是最简单的。下面是一个RPC服务器例子:

    1. from multiprocessing.connection import Listener
    2. from threading import Thread
    3.  
    4. def rpc_server(handler, address, authkey):
    5. sock = Listener(address, authkey=authkey)
    6. while True:
    7. client = sock.accept()
    8. t = Thread(target=handler.handle_connection, args=(client,))
    9. t.daemon = True
    10. t.start()
    11.  
    12. # Some remote functions
    13. def add(x, y):
    14. return x + y
    15.  
    16. def sub(x, y):
    17. return x - y
    18.  
    19. # Register with a handler
    20. handler = RPCHandler()
    21. handler.register_function(add)
    22. handler.register_function(sub)
    23.  
    24. # Run the server
    25. rpc_server(handler, ('localhost', 17000), authkey=b'peekaboo')

    为了从一个远程客户端访问服务器,你需要创建一个对应的用来传送请求的RPC代理类。例如

    1. import pickle
    2.  
    3. class RPCProxy:
    4. def __init__(self, connection):
    5. self._connection = connection
    6. def __getattr__(self, name):
    7. def do_rpc(*args, **kwargs):
    8. self._connection.send(pickle.dumps((name, args, kwargs)))
    9. result = pickle.loads(self._connection.recv())
    10. if isinstance(result, Exception):
    11. raise result
    12. return result
    13. return do_rpc

    要使用这个代理类,你需要将其包装到一个服务器的连接上面,例如:

    1. >>> from multiprocessing.connection import Client
    2. >>> c = Client(('localhost', 17000), authkey=b'peekaboo')
    3. >>> proxy = RPCProxy(c)
    4. >>> proxy.add(2, 3)
    5.  
    6. 5
    7. >>> proxy.sub(2, 3)
    8. -1
    9. >>> proxy.sub([1, 2], 4)
    10. Traceback (most recent call last):
    11. File "<stdin>", line 1, in <module>
    12. File "rpcserver.py", line 37, in do_rpc
    13. raise result
    14. TypeError: unsupported operand type(s) for -: 'list' and 'int'
    15. >>>

    要注意的是很多消息层(比如 multiprocessing )已经使用pickle序列化了数据。如果是这样的话,对 pickle.dumps()pickle.loads() 的调用要去掉。

    讨论

    RPCHandlerRPCProxy 的基本思路是很比较简单的。如果一个客户端想要调用一个远程函数,比如 foo(1, 2, z=3),代理类创建一个包含了函数名和参数的元组 ('foo', (1, 2), {'z': 3}) 。这个元组被pickle序列化后通过网络连接发生出去。这一步在 RPCProxygetattr() 方法返回的 do_rpc() 闭包中完成。服务器接收后通过pickle反序列化消息,查找函数名看看是否已经注册过,然后执行相应的函数。执行结果(或异常)被pickle序列化后返回发送给客户端。我们的实例需要依赖 multiprocessing 进行通信。不过,这种方式可以适用于其他任何消息系统。例如,如果你想在ZeroMQ之上实习RPC,仅仅只需要将连接对象换成合适的ZeroMQ的socket对象即可。

    由于底层需要依赖pickle,那么安全问题就需要考虑了(因为一个聪明的黑客可以创建特定的消息,能够让任意函数通过pickle反序列化后被执行)。因此你永远不要允许来自不信任或未认证的客户端的RPC。特别是你绝对不要允许来自Internet的任意机器的访问,这种只能在内部被使用,位于防火墙后面并且不要对外暴露。

    作为pickle的替代,你也许可以考虑使用JSON、XML或一些其他的编码格式来序列化消息。例如,本机实例可以很容易的改写成JSON编码方案。还需要将 pickle.loads()pickle.dumps()替换成 json.loads()json.dumps() 即可:

    1. # jsonrpcserver.py
    2. import json
    3.  
    4. class RPCHandler:
    5. def __init__(self):
    6. self._functions = { }
    7.  
    8. def register_function(self, func):
    9. self._functions[func.__name__] = func
    10.  
    11. def handle_connection(self, connection):
    12. try:
    13. while True:
    14. # Receive a message
    15. func_name, args, kwargs = json.loads(connection.recv())
    16. # Run the RPC and send a response
    17. try:
    18. r = self._functions[func_name](*args,**kwargs)
    19. connection.send(json.dumps(r))
    20. except Exception as e:
    21. connection.send(json.dumps(str(e)))
    22. except EOFError:
    23. pass
    24.  
    25. # jsonrpcclient.py
    26. import json
    27.  
    28. class RPCProxy:
    29. def __init__(self, connection):
    30. self._connection = connection
    31. def __getattr__(self, name):
    32. def do_rpc(*args, **kwargs):
    33. self._connection.send(json.dumps((name, args, kwargs)))
    34. result = json.loads(self._connection.recv())
    35. return result
    36. return do_rpc

    实现RPC的一个比较复杂的问题是如何去处理异常。至少,当方法产生异常时服务器不应该奔溃。因此,返回给客户端的异常所代表的含义就要好好设计了。如果你使用pickle,异常对象实例在客户端能被反序列化并抛出。如果你使用其他的协议,那得想想另外的方法了。不过至少,你应该在响应中返回异常字符串。我们在JSON的例子中就是使用的这种方式。

    对于其他的RPC实现例子,我推荐你看看在XML-RPC中使用的 SimpleXMLRPCServerServerProxy 的实现,也就是11.6小节中的内容。

    原文:

    http://python3-cookbook.readthedocs.io/zh_CN/latest/c11/p08_implementing_remote_procedure_calls.html