主要解码器


ByteToMessageDecoder

消息解码器是netty中的重要部分, 而ByteToMessageDecoder是自定义解码器的重要的类

该解码器主要是下面的职责

  1. 累加字节码信息

  2. 解码字节码消息成另一种格式的消息传递消费

该解码器注意点

  1. 单解码设置, 有升级协议或者其他操作的设置成单解码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
// 字节解码器
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {

// 合并累加器类, 主要是将上次接受的到消息和当前的消息进行累加后再处理
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (!cumulation.isReadable() && in.isContiguous()) {
// 累加器是空的, 并且入栈缓冲器是是连续的, 直接使用入栈缓冲器
cumulation.release();
return in;
}
try {
// 当前信息的字节长度
final int required = in.readableBytes();
// 当前累加器中的容量不够, 扩展累加器容量
if (required > cumulation.maxWritableBytes() ||
(required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
cumulation.isReadOnly()) {
// 扩容累加器
return expandCumulation(alloc, cumulation, in);
}
// 将当前消息和之前的消息合并
cumulation.writeBytes(in, in.readerIndex(), required);
in.readerIndex(in.writerIndex());
return cumulation;
} finally {
// 释放入栈缓冲区内存
in.release();
}
}
};

// 混合累加器, 将上次的消息和当前的消息进行混合, 不需要将消息进行拷贝合并
// 该类是netty零拷贝的一个体现
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (!cumulation.isReadable()) {
cumulation.release();
return in;
}
CompositeByteBuf composite = null;
try {
if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
// 当前累加器是混合累加器, 而且该缓冲器还未被释放
composite = (CompositeByteBuf) cumulation;
// Writer index must equal capacity if we are going to "write"
// new components to the end
if (composite.writerIndex() != composite.capacity()) {
composite.capacity(composite.writerIndex());
}
} else {
// 分配一个新的混合累加器
composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
}
// 将入栈缓冲器中混合到混合累加器中
composite.addFlattenedComponents(true, in);
in = null;
return composite;
} finally {
if (in != null) {
// 引用累积器-1
in.release();
// 混合累加器引用计数-1
if (composite != null && composite != cumulation) {
composite.release();
}
}
}
}
};

private static final byte STATE_INIT = 0;
private static final byte STATE_CALLING_CHILD_DECODE = 1;
private static final byte STATE_HANDLER_REMOVED_PENDING = 2;

ByteBuf cumulation;
private Cumulator cumulator = MERGE_CUMULATOR;// 默认是合并累积器
private boolean singleDecode;// 是否是单解码, 就是每次channelRead只解码一条信息, 性能会比较差
private boolean first;

/**
* This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
* when {@link ChannelConfig#isAutoRead()} is {@code false}.
*/
private boolean firedChannelRead;

/**
* 解码状态标识位,
* <ul>
* <li>{@link #STATE_INIT}</li>
* <li>{@link #STATE_CALLING_CHILD_DECODE}</li>
* <li>{@link #STATE_HANDLER_REMOVED_PENDING}</li>
* </ul>
*/
private byte decodeState = STATE_INIT;
private int discardAfterReads = 16;
private int numReads;

protected ByteToMessageDecoder() {
ensureNotSharable();
}

/**
* 设置每次只解码一条消息, 如果有协议升级等需要单条信息的时候, 设置单解码将很有用
* 默认是单解码
*
* Default is {@code false} as this has performance impacts.
*/
public void setSingleDecode(boolean singleDecode) {
this.singleDecode = singleDecode;
}

/**
* If {@code true} then only one message is decoded on each
* {@link #channelRead(ChannelHandlerContext, Object)} call.
*
* Default is {@code false} as this has performance impacts.
*/
public boolean isSingleDecode() {
return singleDecode;
}

/**
* Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s.
*/
public void setCumulator(Cumulator cumulator) {
this.cumulator = ObjectUtil.checkNotNull(cumulator, "cumulator");
}

/**
* Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
* The default is {@code 16}.
*/
public void setDiscardAfterReads(int discardAfterReads) {
checkPositive(discardAfterReads, "discardAfterReads");
this.discardAfterReads = discardAfterReads;
}

/**
* Returns the actual number of readable bytes in the internal cumulative
* buffer of this decoder. You usually do not need to rely on this value
* to write a decoder. Use it only when you must use it at your own risk.
* This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
*/
protected int actualReadableBytes() {
return internalBuffer().readableBytes();
}

/**
* Returns the internal cumulative buffer of this decoder. You usually
* do not need to access the internal buffer directly to write a decoder.
* Use it only when you must use it at your own risk.
*/
protected ByteBuf internalBuffer() {
if (cumulation != null) {
return cumulation;
} else {
return Unpooled.EMPTY_BUFFER;
}
}

@Override
public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (decodeState == STATE_CALLING_CHILD_DECODE) {
decodeState = STATE_HANDLER_REMOVED_PENDING;
return;
}
ByteBuf buf = cumulation;
if (buf != null) {
// Directly set this to null so we are sure we not access it in any other method here anymore.
cumulation = null;
numReads = 0;
int readable = buf.readableBytes();
if (readable > 0) {
ctx.fireChannelRead(buf);
ctx.fireChannelReadComplete();
} else {
buf.release();
}
}
handlerRemoved0(ctx);
}

/**
* Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle
* events anymore.
*/
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }

// 该类中最主要的一个方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
first = cumulation == null;
// 累加字节消息
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
// 调用信息解码方法
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}

int size = out.size();
firedChannelRead |= out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
} finally {
out.recycle();
}
}
} else {
// 消息不是ByteBuf, 直接走下一个handler
ctx.fireChannelRead(msg);
}
}

/**
* Get {@code numElements} out of the {@link List} and forward these through the pipeline.
*/
static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
if (msgs instanceof CodecOutputList) {
fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
} else {
for (int i = 0; i < numElements; i++) {
ctx.fireChannelRead(msgs.get(i));
}
}
}

/**
* Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
*/
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0;
discardSomeReadBytes();
if (!firedChannelRead && !ctx.channel().config().isAutoRead()) {
ctx.read();
}
firedChannelRead = false;
ctx.fireChannelReadComplete();
}

protected final void discardSomeReadBytes() {
if (cumulation != null && !first && cumulation.refCnt() == 1) {
// discard some bytes if possible to make more room in the
// buffer but only if the refCnt == 1 as otherwise the user may have
// used slice().retain() or duplicate().retain().
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
cumulation.discardSomeReadBytes();
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
channelInputClosed(ctx, true);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof ChannelInputShutdownEvent) {
// The decodeLast method is invoked when a channelInactive event is encountered.
// This method is responsible for ending requests in some situations and must be called
// when the input has been shutdown.
channelInputClosed(ctx, false);
}
super.userEventTriggered(ctx, evt);
}

private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) {
CodecOutputList out = CodecOutputList.newInstance();
try {
channelInputClosed(ctx, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
if (cumulation != null) {
cumulation.release();
cumulation = null;
}
int size = out.size();
fireChannelRead(ctx, out, size);
if (size > 0) {
// Something was read, call fireChannelReadComplete()
ctx.fireChannelReadComplete();
}
if (callChannelInactive) {
ctx.fireChannelInactive();
}
} finally {
// Recycle in all cases
out.recycle();
}
}
}

/**
* Called when the input of the channel was closed which may be because it changed to inactive or because of
* {@link ChannelInputShutdownEvent}.
*/
void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
if (cumulation != null) {
callDecode(ctx, cumulation, out);
// If callDecode(...) removed the handle from the pipeline we should not call decodeLast(...) as this would
// be unexpected.
if (!ctx.isRemoved()) {
// Use Unpooled.EMPTY_BUFFER if cumulation become null after calling callDecode(...).
// See https://github.com/netty/netty/issues/10802.
ByteBuf buffer = cumulation == null ? Unpooled.EMPTY_BUFFER : cumulation;
decodeLast(ctx, buffer, out);
}
} else {
decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
}
}

/**
* ByteBuf解码方法, 该方法将会调用decode(ChannelHandlerContext, ByteBuf, List)做真正的解码
*
* @param ctx 属于该Handler的ChannelHandlerContext
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
*/
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();

// 先消费消息, 当消息条数>0
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}

int oldInputLength = in.readableBytes();
// 下面方法中调用真正的解码方法, 将字节码解码成消息追加到out中
decodeRemovalReentryProtection(ctx, in, out);

// Handler已经被移除
if (ctx.isRemoved()) {
break;
}

// 如果一个消息都解码不出来, 则方法返回
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}

if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}

// 如果是单解码, 则该解码器解码一条消息后就方法返回
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}

/**
* Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
* {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
* {@link ByteBuf}.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
* @throws Exception is thrown if an error occurs
*/
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

/**
* 将ByteBuf转换成另一个消息, 直到ByteBuf不能解码出一个完整的消息
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
* @throws Exception is thrown if an error occurs
*/
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
// 设置解码状态, 在解码时, 该解码器不能被执行handlerRemoved(ChannelHandlerContext ctx)
decodeState = STATE_CALLING_CHILD_DECODE;
try {
decode(ctx, in, out);
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
// 如果该解码器需要被Removed, 直接消费消息后将该解码器移除
fireChannelRead(ctx, out, out.size());
out.clear();
handlerRemoved(ctx);
}
}
}

/**
* Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
* {@link #channelInactive(ChannelHandlerContext)} was triggered.
*
* By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
* override this for some special cleanup operation.
*/
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.isReadable()) {
// Only call decode() if there is something left in the buffer to decode.
// See https://github.com/netty/netty/issues/4386
decodeRemovalReentryProtection(ctx, in, out);
}
}

static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
int oldBytes = oldCumulation.readableBytes();
int newBytes = in.readableBytes();
int totalBytes = oldBytes + newBytes;
ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(totalBytes, MAX_VALUE));
ByteBuf toRelease = newCumulation;
try {
// This avoids redundant checks and stack depth compared to calling writeBytes(...)
newCumulation.setBytes(0, oldCumulation, oldCumulation.readerIndex(), oldBytes)
.setBytes(oldBytes, in, in.readerIndex(), newBytes)
.writerIndex(totalBytes);
in.readerIndex(in.writerIndex());
toRelease = oldCumulation;
return newCumulation;
} finally {
toRelease.release();
}
}

/**
* Cumulate {@link ByteBuf}s.
*/
public interface Cumulator {
/**
* Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
* The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
* call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
*/
ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}
}

MessageToMessageDecoder解码器

另一个重要的解码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter {

private final TypeParameterMatcher matcher;

/**
* 创建该解码器的参数匹配器, 当入参和该类申明的入栈消息格式一致将传递到解码器, 否则直接跳过
*/
protected MessageToMessageDecoder() {
matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
}

/**
* 对象构造器
*
* @param inboundMessageType The type of messages to match and so decode
*/
protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {
matcher = TypeParameterMatcher.get(inboundMessageType);
}

/**
* 该入栈消息是否能进入该解码器进行解码
*
* {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*/
public boolean acceptInboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
CodecOutputList out = CodecOutputList.newInstance();
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
decode(ctx, cast, out);
} finally {
// 释放前解码器的消息引用
ReferenceCountUtil.release(cast);
}
} else {
out.add(msg);
}
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
int size = out.size();
for (int i = 0; i < size; i++) {
ctx.fireChannelRead(out.getUnsafe(i));
}
} finally {
out.recycle();
}
}
}

/**
* Decode from one message to an other. This method will be called for each written message that can be handled
* by this decoder.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageDecoder} belongs to
* @param msg the message to decode to an other one
* @param out the {@link List} to which decoded messages should be added
* @throws Exception is thrown if an error occurs
*/
protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}

文章作者: jin.zhang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 jin.zhang !
  目录