|
1 |
| -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. |
| 1 | +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. |
2 | 2 | // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
|
3 | 3 | //
|
4 | 4 | // This software, the RabbitMQ Stream Java client library, is dual-licensed under the
|
|
16 | 16 |
|
17 | 17 | import com.rabbitmq.stream.ChunkChecksum;
|
18 | 18 | import com.rabbitmq.stream.ChunkChecksumValidationException;
|
19 |
| -import com.rabbitmq.stream.StreamException; |
20 | 19 | import io.netty.buffer.ByteBuf;
|
21 |
| -import io.netty.util.ByteProcessor; |
22 |
| -import java.lang.reflect.InvocationTargetException; |
23 |
| -import java.lang.reflect.Method; |
24 |
| -import java.nio.ByteBuffer; |
25 | 20 | import java.util.function.Supplier;
|
26 | 21 | import java.util.zip.CRC32;
|
27 | 22 | import java.util.zip.Checksum;
|
28 |
| -import org.slf4j.Logger; |
29 |
| -import org.slf4j.LoggerFactory; |
30 | 23 |
|
31 | 24 | class JdkChunkChecksum implements ChunkChecksum {
|
32 | 25 |
|
33 |
| - static final ChunkChecksum CRC32_SINGLETON; |
34 |
| - private static final Logger LOGGER = LoggerFactory.getLogger(JdkChunkChecksum.class); |
35 | 26 | private static final Supplier<Checksum> CRC32_SUPPLIER = CRC32::new;
|
36 |
| - |
37 |
| - static { |
38 |
| - if (isChecksumUpdateByteBufferAvailable()) { |
39 |
| - LOGGER.debug("Checksum#update(ByteBuffer) method available, using it for direct buffers"); |
40 |
| - CRC32_SINGLETON = new ByteBufferDirectByteBufChecksum(CRC32_SUPPLIER); |
41 |
| - } else { |
42 |
| - LOGGER.debug( |
43 |
| - "Checksum#update(ByteBuffer) method not available, using byte-by-byte CRC calculation for direct buffers"); |
44 |
| - CRC32_SINGLETON = new JdkChunkChecksum(CRC32_SUPPLIER); |
45 |
| - } |
46 |
| - } |
| 27 | + static final ChunkChecksum CRC32_SINGLETON = new JdkChunkChecksum(CRC32_SUPPLIER); |
47 | 28 |
|
48 | 29 | private final Supplier<Checksum> checksumSupplier;
|
49 | 30 |
|
50 |
| - JdkChunkChecksum() { |
51 |
| - this(CRC32_SUPPLIER); |
52 |
| - } |
53 |
| - |
54 | 31 | JdkChunkChecksum(Supplier<Checksum> checksumSupplier) {
|
55 | 32 | this.checksumSupplier = checksumSupplier;
|
56 | 33 | }
|
57 | 34 |
|
58 |
| - private static boolean isChecksumUpdateByteBufferAvailable() { |
59 |
| - try { |
60 |
| - Checksum.class.getDeclaredMethod("update", ByteBuffer.class); |
61 |
| - return true; |
62 |
| - } catch (Exception e) { |
63 |
| - return false; |
64 |
| - } |
65 |
| - } |
66 |
| - |
67 | 35 | @Override
|
68 | 36 | public void checksum(ByteBuf byteBuf, long dataLength, long expected) {
|
69 | 37 | Checksum checksum = checksumSupplier.get();
|
70 | 38 | if (byteBuf.hasArray()) {
|
71 | 39 | checksum.update(
|
72 | 40 | byteBuf.array(), byteBuf.arrayOffset() + byteBuf.readerIndex(), byteBuf.readableBytes());
|
73 | 41 | } else {
|
74 |
| - byteBuf.forEachByte( |
75 |
| - byteBuf.readerIndex(), byteBuf.readableBytes(), new UpdateProcessor(checksum)); |
| 42 | + checksum.update(byteBuf.nioBuffer(byteBuf.readerIndex(), byteBuf.readableBytes())); |
76 | 43 | }
|
77 | 44 | if (expected != checksum.getValue()) {
|
78 | 45 | throw new ChunkChecksumValidationException(expected, checksum.getValue());
|
79 | 46 | }
|
80 | 47 | }
|
81 |
| - |
82 |
| - private static final class ByteBufferDirectByteBufChecksum implements ChunkChecksum { |
83 |
| - |
84 |
| - private final Supplier<Checksum> checksumSupplier; |
85 |
| - private final Method updateMethod; |
86 |
| - |
87 |
| - private ByteBufferDirectByteBufChecksum(Supplier<Checksum> checksumSupplier) { |
88 |
| - this.checksumSupplier = checksumSupplier; |
89 |
| - try { |
90 |
| - this.updateMethod = Checksum.class.getDeclaredMethod("update", ByteBuffer.class); |
91 |
| - } catch (NoSuchMethodException e) { |
92 |
| - throw new StreamException("Error while looking up Checksum#update(ByteBuffer) method", e); |
93 |
| - } |
94 |
| - } |
95 |
| - |
96 |
| - @Override |
97 |
| - public void checksum(ByteBuf byteBuf, long dataLength, long expected) { |
98 |
| - Checksum checksum = checksumSupplier.get(); |
99 |
| - if (byteBuf.hasArray()) { |
100 |
| - checksum.update( |
101 |
| - byteBuf.array(), |
102 |
| - byteBuf.arrayOffset() + byteBuf.readerIndex(), |
103 |
| - byteBuf.readableBytes()); |
104 |
| - } else { |
105 |
| - try { |
106 |
| - this.updateMethod.invoke( |
107 |
| - checksum, byteBuf.nioBuffer(byteBuf.readerIndex(), byteBuf.readableBytes())); |
108 |
| - } catch (IllegalAccessException | InvocationTargetException e) { |
109 |
| - throw new StreamException("Error while calculating CRC", e); |
110 |
| - } |
111 |
| - } |
112 |
| - if (expected != checksum.getValue()) { |
113 |
| - throw new ChunkChecksumValidationException(expected, checksum.getValue()); |
114 |
| - } |
115 |
| - } |
116 |
| - } |
117 |
| - |
118 |
| - private static class UpdateProcessor implements ByteProcessor { |
119 |
| - |
120 |
| - private final Checksum checksum; |
121 |
| - |
122 |
| - private UpdateProcessor(Checksum checksum) { |
123 |
| - this.checksum = checksum; |
124 |
| - } |
125 |
| - |
126 |
| - @Override |
127 |
| - public boolean process(byte value) { |
128 |
| - checksum.update(value); |
129 |
| - return true; |
130 |
| - } |
131 |
| - } |
132 | 48 | }
|
0 commit comments