• Netty 编码器和解码器
    • 实现 Memcached 编码器
    • 实现 Memcached 解码器

    Netty 编码器和解码器

    Netty 的是一个复杂和先进的框架,但它并不玄幻。当我们请求一些设置了 key 的给定值时,我们知道 Request 类的一个实例被创建来代表这个请求。但 Netty 并不知道 Request 对象是如何转成 Memcached 所期望的。Memcached 所期望的是字节序列;忽略使用的协议,数据在网络上传输永远是字节序列。

    将 Request 对象转为 Memcached 所需的字节序列,Netty 需要用 MemcachedRequest 来编码成另外一种格式。这里所说的另外一种格式不单单是从对象转为字节,也可以是从对象转为对象,或者是从对象转为字符串等。编码器的内容可以详见第七章。

    Netty 提供了一个抽象类称为 MessageToByteEncoder。它提供了一个抽象方法,将一条消息(在本例中我们 MemcachedRequest 对象)转为字节。你显示什么信息实现通过使用 Java 泛型可以处理;例如 , MessageToByteEncoder 说这个编码器要编码的对象类型是 MemcachedRequest

    MessageToByteEncoder 和 Java 泛型

    使用 MessageToByteEncoder 可以绑定特定的参数类型。如果你有多个不同的消息类型,在相同的编码器里,也可以使用MessageToByteEncoder,注意检查消息的类型即可

    这也适用于解码器,除了解码器将一系列字节转换回一个对象。
    这个 Netty 的提供了 ByteToMessageDecoder 类,而不是提供一个编码方法用来实现解码。在接下来的两个部分你看看如何实现一个 Memcached 解码器和编码器。在你做之前,应该意识到在使用 Netty 时,你不总是需要自己提供编码器和解码器。自所以现在这么做是因为 Netty 没有对 Memcached 内置支持。而 HTTP 以及其他标准的协议,Netty 已经是提供的了。

    编码器和解码器

    记住,编码器处理出站,而解码器处理入站。这基本上意味着编码器将编码数据,写入远端。解码器将从远端读取处理数据。重要的是要记住,出站和入站是两个不同的方向。

    请注意,为了程序简单,我们的编码器和解码器不检查任何值的最大大小。在实际实现中你需要一些验证检查,如果检测到违反协议,则使用 EncoderException 或 DecoderException(或一个子类)。

    实现 Memcached 编码器

    本节我们将简要介绍编码器的实现。正如我们提到的,编码器负责编码消息为字节序列。这些字节可以通过网络发送到远端。为了发送请求,我们首先创建 MemcachedRequest 类,稍后编码器实现会编码为一系列字节。下面的清单显示了我们的 MemcachedRequest 类

    Listing 14.1 Implementation of a Memcached request

    1. public class MemcachedRequest { //1
    2. private static final Random rand = new Random();
    3. private final int magic = 0x80;//fixed so hard coded
    4. private final byte opCode; //the operation e.g. set or get
    5. private final String key; //the key to delete, get or set
    6. private final int flags = 0xdeadbeef; //random
    7. private final int expires; //0 = item never expires
    8. private final String body; //if opCode is set, the value
    9. private final int id = rand.nextInt(); //Opaque
    10. private final long cas = 0; //data version check...not used
    11. private final boolean hasExtras; //not all ops have extras
    12. public MemcachedRequest(byte opcode, String key, String value) {
    13. this.opCode = opcode;
    14. this.key = key;
    15. this.body = value == null ? "" : value;
    16. this.expires = 0;
    17. //only set command has extras in our example
    18. hasExtras = opcode == Opcode.SET;
    19. }
    20. public MemcachedRequest(byte opCode, String key) {
    21. this(opCode, key, null);
    22. }
    23. public int magic() { //2
    24. return magic;
    25. }
    26. public int opCode() { //3
    27. return opCode;
    28. }
    29. public String key() { //4
    30. return key;
    31. }
    32. public int flags() { //5
    33. return flags;
    34. }
    35. public int expires() { //6
    36. return expires;
    37. }
    38. public String body() { //7
    39. return body;
    40. }
    41. public int id() { //8
    42. return id;
    43. }
    44. public long cas() { //9
    45. return cas;
    46. }
    47. public boolean hasExtras() { //10
    48. return hasExtras;
    49. }
    50. }
    1. 这个类将会发送请求到 Memcached server
    2. 幻数,它可以用来标记文件或者协议的格式
    3. opCode,反应了响应的操作已经创建了
    4. 执行操作的 key
    5. 使用的额外的 flag
    6. 表明到期时间
    7. body
    8. 请求的 id。这个id将在响应中回显。
    9. compare-and-check 的值
    10. 如果有额外的使用,将返回 true

    你如果想实现 Memcached 的其余部分协议,你只需要将 client.op(op 任何新的操作添加)转换为其中一个方法请求。我们需要两个更多的支持类,在下一个清单所示

    Listing 14.2 Possible Memcached operation codes and response statuses

    1. public class Status {
    2. public static final short NO_ERROR = 0x0000;
    3. public static final short KEY_NOT_FOUND = 0x0001;
    4. public static final short KEY_EXISTS = 0x0002;
    5. public static final short VALUE_TOO_LARGE = 0x0003;
    6. public static final short INVALID_ARGUMENTS = 0x0004;
    7. public static final short ITEM_NOT_STORED = 0x0005;
    8. public static final short INC_DEC_NON_NUM_VAL = 0x0006;
    9. }
    10. public class Opcode {
    11. public static final byte GET = 0x00;
    12. public static final byte SET = 0x01;
    13. public static final byte DELETE = 0x04;
    14. }

    一个 Opcode 告诉 Memcached 要执行哪些操作。每个操作都由一个字节表示。同样的,当 Memcached 响应一个请求,响应头中包含两个字节代表响应状态。状态和 Opcode 类表示这些 Memcached 的构造。这些操作码可以使用当你构建一个新的 MemcachedRequest 指定哪个行动应该由它引发的。

    但现在可以集中精力在编码器上:

    Listing 14.3 MemcachedRequestEncoder implementation

    1. public class MemcachedRequestEncoder extends
    2. MessageToByteEncoder<MemcachedRequest> { //1
    3. @Override
    4. protected void encode(ChannelHandlerContext ctx, MemcachedRequest msg,
    5. ByteBuf out) throws Exception { //2
    6. byte[] key = msg.key().getBytes(CharsetUtil.UTF_8);
    7. byte[] body = msg.body().getBytes(CharsetUtil.UTF_8);
    8. //total size of the body = key size + content size + extras size //3
    9. int bodySize = key.length + body.length + (msg.hasExtras() ? 8 : 0);
    10. //write magic byte //4
    11. out.writeByte(msg.magic());
    12. //write opcode byte //5
    13. out.writeByte(msg.opCode());
    14. //write key length (2 byte) //6
    15. out.writeShort(key.length); //key length is max 2 bytes i.e. a Java short //7
    16. //write extras length (1 byte)
    17. int extraSize = msg.hasExtras() ? 0x08 : 0x0;
    18. out.writeByte(extraSize);
    19. //byte is the data type, not currently implemented in Memcached but required //8
    20. out.writeByte(0);
    21. //next two bytes are reserved, not currently implemented but are required //9
    22. out.writeShort(0);
    23. //write total body length ( 4 bytes - 32 bit int) //10
    24. out.writeInt(bodySize);
    25. //write opaque ( 4 bytes) - a 32 bit int that is returned in the response //11
    26. out.writeInt(msg.id());
    27. //write CAS ( 8 bytes)
    28. out.writeLong(msg.cas()); //24 byte header finishes with the CAS //12
    29. if (msg.hasExtras()) {
    30. //write extras (flags and expiry, 4 bytes each) - 8 bytes total //13
    31. out.writeInt(msg.flags());
    32. out.writeInt(msg.expires());
    33. }
    34. //write key //14
    35. out.writeBytes(key);
    36. //write value //15
    37. out.writeBytes(body);
    38. }
    39. }
    1. 该类是负责编码 MemachedRequest 为一系列字节
    2. 转换的 key 和实际请求的 body 到字节数组
    3. 计算 body 大小
    4. 写幻数到 ByteBuf 字节
    5. 写 opCode 作为字节
    6. 写 key 长度z作为 short
    7. 编写额外的长度作为字节
    8. 写数据类型,这总是0,因为目前不是在 Memcached,但可用于使用
      后来的版本
    9. 为保留字节写为 short ,后面的 Memcached 版本可能使用
    10. 写 body 的大小作为 long
    11. 写 opaque 作为 int
    12. 写 cas 作为 long。这个是头文件的最后部分,在 body 的开始
    13. 编写额外的 flag 和到期时间为 int
    14. 写 key
    15. 这个请求完成后 写 body。

    总结,编码器 使用 Netty 的 ByteBuf 处理请求,编码 MemcachedRequest 成一套正确排序的字节。详细步骤为:

    • 写幻数字节。
    • 写 opcode 字节。
    • 写 key 长度(2字节)。
    • 写额外的长度(1字节)。
    • 写数据类型(1字节)。
    • 为保留字节写 null 字节(2字节)。
    • 写 body 长度(4字节- 32位整数)。
    • 写 opaque(4个字节,一个32位整数在响应中返回)。
    • 写 CAS(8个字节)。
    • 写 额外的(flag 和 到期,4字节)= 8个字节
    • 写 key
    • 写 值

    无论你放入什么到输出缓冲区( 调用 ByteBuf) Netty 的将向服务器发送被写入请求。下一节将展示如何进行反向通过解码器工作。

    实现 Memcached 解码器

    将 MemcachedRequest 对象转为字节序列,Memcached 仅需将字节转到响应对象返回即可。

    先见一个 POJO:

    Listing 14.7 Implementation of a MemcachedResponse

    1. public class MemcachedResponse { //1
    2. private final byte magic;
    3. private final byte opCode;
    4. private byte dataType;
    5. private final short status;
    6. private final int id;
    7. private final long cas;
    8. private final int flags;
    9. private final int expires;
    10. private final String key;
    11. private final String data;
    12. public MemcachedResponse(byte magic, byte opCode,
    13. byte dataType, short status,
    14. int id, long cas,
    15. int flags, int expires, String key, String data) {
    16. this.magic = magic;
    17. this.opCode = opCode;
    18. this.dataType = dataType;
    19. this.status = status;
    20. this.id = id;
    21. this.cas = cas;
    22. this.flags = flags;
    23. this.expires = expires;
    24. this.key = key;
    25. this.data = data;
    26. }
    27. public byte magic() { //2
    28. return magic;
    29. }
    30. public byte opCode() { //3
    31. return opCode;
    32. }
    33. public byte dataType() { //4
    34. return dataType;
    35. }
    36. public short status() { //5
    37. return status;
    38. }
    39. public int id() { //6
    40. return id;
    41. }
    42. public long cas() { //7
    43. return cas;
    44. }
    45. public int flags() { //8
    46. return flags;
    47. }
    48. public int expires() { //9
    49. return expires;
    50. }
    51. public String key() { //10
    52. return key;
    53. }
    54. public String data() { //11
    55. return data;
    56. }
    57. }
    1. 该类,代表从 Memcached 服务器返回的响应
    2. 幻数
    3. opCode,这反映了创建操作的响应
    4. 数据类型,这表明这个是基于二进制还是文本
    5. 响应的状态,这表明如果请求是成功的
    6. 惟一的 id
    7. compare-and-set 值
    8. 使用额外的 flag
    9. 表示该值存储的一个有效期
    10. 响应创建的 key
    11. 实际数据

    下面为 MemcachedResponseDecoder, 使用了 ByteToMessageDecoder 基类,用于将 字节序列转为 MemcachedResponse

    Listing 14.4 MemcachedResponseDecoder class

    1. public class MemcachedResponseDecoder extends ByteToMessageDecoder { //1
    2. private enum State { //2
    3. Header,
    4. Body
    5. }
    6. private State state = State.Header;
    7. private int totalBodySize;
    8. private byte magic;
    9. private byte opCode;
    10. private short keyLength;
    11. private byte extraLength;
    12. private short status;
    13. private int id;
    14. private long cas;
    15. @Override
    16. protected void decode(ChannelHandlerContext ctx, ByteBuf in,
    17. List<Object> out) {
    18. switch (state) { //3
    19. case Header:
    20. if (in.readableBytes() < 24) {
    21. return;//response header is 24 bytes //4
    22. }
    23. magic = in.readByte(); //5
    24. opCode = in.readByte();
    25. keyLength = in.readShort();
    26. extraLength = in.readByte();
    27. in.skipBytes(1);
    28. status = in.readShort();
    29. totalBodySize = in.readInt();
    30. id = in.readInt(); //referred to in the protocol spec as opaque
    31. cas = in.readLong();
    32. state = State.Body;
    33. case Body:
    34. if (in.readableBytes() < totalBodySize) {
    35. return; //until we have the entire payload return //6
    36. }
    37. int flags = 0, expires = 0;
    38. int actualBodySize = totalBodySize;
    39. if (extraLength > 0) { //7
    40. flags = in.readInt();
    41. actualBodySize -= 4;
    42. }
    43. if (extraLength > 4) { //8
    44. expires = in.readInt();
    45. actualBodySize -= 4;
    46. }
    47. String key = "";
    48. if (keyLength > 0) { //9
    49. ByteBuf keyBytes = in.readBytes(keyLength);
    50. key = keyBytes.toString(CharsetUtil.UTF_8);
    51. actualBodySize -= keyLength;
    52. }
    53. ByteBuf body = in.readBytes(actualBodySize); //10
    54. String data = body.toString(CharsetUtil.UTF_8);
    55. out.add(new MemcachedResponse( //1
    56. magic,
    57. opCode,
    58. status,
    59. id,
    60. cas,
    61. flags,
    62. expires,
    63. key,
    64. data
    65. ));
    66. state = State.Header;
    67. }
    68. }
    69. }
    1. 类负责创建的 MemcachedResponse 读取字节
    2. 代表当前解析状态,这意味着我们需要解析的头或 body
    3. 根据解析状态切换
    4. 如果不是至少24个字节是可读的,它不可能读整个头部,所以返回这里,等待再通知一次数据准备阅读
    5. 阅读所有头的字段
    6. 检查是否足够的数据是可读用来读取完整的响应的 body。长度是从头读取
    7. 检查如果有任何额外的 flag 用于读,如果是这样做
    8. 检查如果响应包含一个 expire 字段,有就读它
    9. 检查响应是否包含一个 key ,有就读它
    10. 读实际的 body 的 payload
    11. 从前面读取字段和数据构造一个新的 MemachedResponse

    所以在实现发生了什么事?我们知道一个 Memcached 响应有24位头;我们不知道是否所有数据,响应将被包含在输入 ByteBuf ,当解码方法调用时。这是因为底层网络堆栈可能将数据分解成块。所以确保我们只解码当我们有足够的数据,这段代码检查是否可用可读的字节的数量至少是24。一旦我们有24个字节,我们可以确定整个消息有多大,因为这个信息包含在24位头。

    当我们解码整个消息,我们创建一个 MemcachedResponse 并将其添加到输出列表。任何对象添加到该列表将被转发到下一个ChannelInboundHandler 在 ChannelPipeline,因此允许处理。