Skip to content

Commit 94c605b

Browse files
committed
Implement ByteArrayInputStreamCodec
1 parent bae7e03 commit 94c605b

File tree

2 files changed

+156
-1
lines changed

2 files changed

+156
-1
lines changed
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Copyright 2023 asyncer.io projects
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.asyncer.r2dbc.mysql.codec;
18+
19+
import io.asyncer.r2dbc.mysql.MySqlParameter;
20+
import io.asyncer.r2dbc.mysql.ParameterWriter;
21+
import io.asyncer.r2dbc.mysql.api.MySqlReadableMetadata;
22+
import io.asyncer.r2dbc.mysql.constant.MySqlType;
23+
import io.asyncer.r2dbc.mysql.internal.util.VarIntUtils;
24+
import io.netty.buffer.ByteBuf;
25+
import io.netty.buffer.ByteBufAllocator;
26+
import reactor.core.publisher.Mono;
27+
28+
import java.io.ByteArrayInputStream;
29+
import java.io.IOException;
30+
import java.io.InputStream;
31+
32+
import static io.asyncer.r2dbc.mysql.internal.util.InternalArrays.EMPTY_BYTES;
33+
34+
/**
35+
* Codec for {@link InputStream}.
36+
*/
37+
final class ByteArrayInputStreamCodec extends AbstractClassedCodec<ByteArrayInputStream> {
38+
39+
static final ByteArrayInputStreamCodec INSTANCE = new ByteArrayInputStreamCodec();
40+
41+
private ByteArrayInputStreamCodec() {
42+
super(ByteArrayInputStream.class);
43+
}
44+
45+
@Override
46+
public ByteArrayInputStream decode(ByteBuf value, MySqlReadableMetadata metadata, Class<?> target, boolean binary,
47+
CodecContext context) {
48+
if (!value.isReadable()) {
49+
return new ByteArrayInputStream(EMPTY_BYTES);
50+
}
51+
return new ByteArrayInputStream(value.array());
52+
}
53+
54+
@Override
55+
protected boolean doCanDecode(MySqlReadableMetadata metadata) {
56+
return metadata.getJavaType() == InputStream.class;
57+
}
58+
59+
@Override
60+
public boolean canEncode(Object value) {
61+
return value instanceof InputStream;
62+
}
63+
64+
@Override
65+
public MySqlParameter encode(Object value, CodecContext context) {
66+
return new ByteArrayInputStreamMysqlParameter((ByteArrayInputStream) value);
67+
}
68+
69+
private static final class ByteArrayInputStreamMysqlParameter extends AbstractMySqlParameter {
70+
71+
private final ByteArrayInputStream value;
72+
73+
private ByteArrayInputStreamMysqlParameter(ByteArrayInputStream value) {
74+
this.value = value;
75+
}
76+
77+
@Override
78+
public Mono<ByteBuf> publishBinary(ByteBufAllocator allocator) {
79+
return Mono.fromSupplier(() -> {
80+
int size = value.available();
81+
if (size == 0) {
82+
return allocator.buffer(Byte.BYTES).writeByte(0);
83+
}
84+
85+
int addedSize = VarIntUtils.varIntBytes(size);
86+
ByteBuf buf = allocator.buffer(addedSize + size);
87+
88+
try {
89+
VarIntUtils.writeVarInt(buf, size);
90+
91+
byte[] byteArray = new byte[size];
92+
int readBytes = value.read(byteArray);
93+
94+
if (size != 0 && readBytes != size) {
95+
buf.release();
96+
throw new IllegalStateException("Expected to read " + size + " bytes, but got " + readBytes);
97+
}
98+
99+
return buf.writeBytes(byteArray);
100+
} catch (IOException e) {
101+
buf.release();
102+
throw new IllegalStateException("Unexpected IOException from ByteArrayInputStream", e);
103+
}
104+
});
105+
}
106+
107+
@Override
108+
public Mono<Void> publishText(ParameterWriter writer) {
109+
return Mono.fromRunnable(() -> {
110+
try {
111+
int size = value.available();
112+
byte[] byteArray = new byte[size];
113+
int readBytes = value.read(byteArray);
114+
115+
if (size != 0 && readBytes != size) {
116+
throw new IllegalStateException("Expected to read " + size + " bytes, but got " + readBytes);
117+
}
118+
119+
writer.writeHex(byteArray);
120+
} catch (IOException e) {
121+
throw new IllegalStateException("Unexpected IOException from ByteArrayInputStream", e);
122+
}
123+
});
124+
}
125+
126+
@Override
127+
public String toString() {
128+
return value.toString();
129+
}
130+
131+
@Override
132+
public boolean equals(Object o) {
133+
if (this == o) {
134+
return true;
135+
}
136+
if (!(o instanceof ByteArrayInputStreamMysqlParameter)) {
137+
return false;
138+
}
139+
140+
ByteArrayInputStreamMysqlParameter that = (ByteArrayInputStreamMysqlParameter) o;
141+
return value.equals(that.value);
142+
}
143+
144+
@Override
145+
public int hashCode() {
146+
return value.hashCode();
147+
}
148+
149+
@Override
150+
public MySqlType getType() {
151+
return MySqlType.VARBINARY;
152+
}
153+
}
154+
}

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/codec/DefaultCodecs.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ final class DefaultCodecs implements Codecs {
8181
BlobCodec.INSTANCE,
8282

8383
ByteBufferCodec.INSTANCE,
84-
ByteArrayCodec.INSTANCE
84+
ByteArrayCodec.INSTANCE,
85+
ByteArrayInputStreamCodec.INSTANCE
8586
);
8687

8788
private final List<Codec<?>> codecs;

0 commit comments

Comments
 (0)