Skip to content

Commit b657331

Browse files
committed
Implement ByteArrayInputStreamCodec
1 parent bae7e03 commit b657331

File tree

2 files changed

+158
-1
lines changed

2 files changed

+158
-1
lines changed
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright 2023 asyncer.io projects
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.asyncer.r2dbc.mysql.codec;
18+
19+
import io.asyncer.r2dbc.mysql.MySqlParameter;
20+
import io.asyncer.r2dbc.mysql.ParameterWriter;
21+
import io.asyncer.r2dbc.mysql.api.MySqlReadableMetadata;
22+
import io.asyncer.r2dbc.mysql.constant.MySqlType;
23+
import io.asyncer.r2dbc.mysql.internal.util.VarIntUtils;
24+
import io.netty.buffer.ByteBuf;
25+
import io.netty.buffer.ByteBufAllocator;
26+
import reactor.core.publisher.Mono;
27+
28+
import java.io.ByteArrayInputStream;
29+
import java.io.IOException;
30+
import java.io.InputStream;
31+
32+
import static io.asyncer.r2dbc.mysql.internal.util.InternalArrays.EMPTY_BYTES;
33+
34+
/**
35+
* Codec for {@link InputStream}.
36+
*/
37+
final class ByteArrayInputStreamCodec extends AbstractClassedCodec<ByteArrayInputStream> {
38+
39+
static final ByteArrayInputStreamCodec INSTANCE = new ByteArrayInputStreamCodec();
40+
41+
private ByteArrayInputStreamCodec() {
42+
super(ByteArrayInputStream.class);
43+
}
44+
45+
@Override
46+
public ByteArrayInputStream decode(ByteBuf value, MySqlReadableMetadata metadata, Class<?> target, boolean binary,
47+
CodecContext context) {
48+
if (!value.isReadable()) {
49+
return new ByteArrayInputStream(EMPTY_BYTES);
50+
}
51+
return new ByteArrayInputStream(value.array());
52+
}
53+
54+
@Override
55+
protected boolean doCanDecode(MySqlReadableMetadata metadata) {
56+
return metadata.getJavaType() == InputStream.class;
57+
}
58+
59+
@Override
60+
public boolean canEncode(Object value) {
61+
return value instanceof InputStream;
62+
}
63+
64+
@Override
65+
public MySqlParameter encode(Object value, CodecContext context) {
66+
return new ByteArrayInputStreamMysqlParameter((ByteArrayInputStream) value);
67+
}
68+
69+
private static final class ByteArrayInputStreamMysqlParameter extends AbstractMySqlParameter {
70+
71+
private final ByteArrayInputStream value;
72+
73+
private ByteArrayInputStreamMysqlParameter(ByteArrayInputStream value) {
74+
this.value = value;
75+
}
76+
77+
@Override
78+
public Mono<ByteBuf> publishBinary(ByteBufAllocator allocator) {
79+
return Mono.fromSupplier(() -> {
80+
int size = value.available();
81+
if (size == 0) {
82+
return allocator.buffer(Byte.BYTES).writeByte(0);
83+
}
84+
85+
int addedSize = VarIntUtils.varIntBytes(size);
86+
ByteBuf buf = allocator.buffer(addedSize + size);
87+
88+
try {
89+
VarIntUtils.writeVarInt(buf, size);
90+
91+
byte[] byteArray = new byte[size];
92+
int readBytes = value.read(byteArray);
93+
94+
// ✅ size == 0 이면 readBytes == -1 이어도 무시
95+
if (size != 0 && readBytes != size) {
96+
buf.release();
97+
throw new IllegalStateException("Expected to read " + size + " bytes, but got " + readBytes);
98+
}
99+
100+
return buf.writeBytes(byteArray);
101+
} catch (IOException e) {
102+
buf.release();
103+
throw new IllegalStateException("Unexpected IOException from ByteArrayInputStream", e);
104+
}
105+
});
106+
}
107+
108+
@Override
109+
public Mono<Void> publishText(ParameterWriter writer) {
110+
return Mono.fromRunnable(() -> {
111+
try {
112+
int size = value.available();
113+
byte[] byteArray = new byte[size];
114+
int readBytes = value.read(byteArray);
115+
116+
// ✅ size == 0 이면 readBytes == -1 이어도 무시
117+
if (size != 0 && readBytes != size) {
118+
throw new IllegalStateException("Expected to read " + size + " bytes, but got " + readBytes);
119+
}
120+
121+
writer.writeHex(byteArray);
122+
} catch (IOException e) {
123+
throw new IllegalStateException("Unexpected IOException from ByteArrayInputStream", e);
124+
}
125+
});
126+
}
127+
128+
@Override
129+
public String toString() {
130+
return value.toString();
131+
}
132+
133+
@Override
134+
public boolean equals(Object o) {
135+
if (this == o) {
136+
return true;
137+
}
138+
if (!(o instanceof ByteArrayInputStreamMysqlParameter)) {
139+
return false;
140+
}
141+
142+
ByteArrayInputStreamMysqlParameter that = (ByteArrayInputStreamMysqlParameter) o;
143+
return value.equals(that.value);
144+
}
145+
146+
@Override
147+
public int hashCode() {
148+
return value.hashCode();
149+
}
150+
151+
@Override
152+
public MySqlType getType() {
153+
return MySqlType.VARBINARY;
154+
}
155+
}
156+
}

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/DefaultCodecs.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ final class DefaultCodecs implements Codecs {
8181
BlobCodec.INSTANCE,
8282

8383
ByteBufferCodec.INSTANCE,
84-
ByteArrayCodec.INSTANCE
84+
ByteArrayCodec.INSTANCE,
85+
ByteArrayInputStreamCodec.INSTANCE
8586
);
8687

8788
private final List<Codec<?>> codecs;

0 commit comments

Comments
 (0)