Skip to content

Commit 434bd46

Browse files
author
dponomarev
committed
Extract read/write operations into util classes.
Create TarantoolBinaryPacket class that describes tarantool binary protocol message. Move logic of sending bytes, receiving binary messages (packets), creating packet and connecting to a tarantool instance from TarantoolBase and TarantoolClientImpl to BinaryProtoUtils class.
1 parent d2dfa4c commit 434bd46

13 files changed

+605
-243
lines changed

src/it/java/org/tarantool/TestTarantoolClient.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.tarantool;
22

3+
import org.tarantool.server.TarantoolBinaryPacket;
4+
35
import java.io.IOException;
46
import java.net.InetSocketAddress;
57
import java.nio.ByteBuffer;
@@ -81,8 +83,9 @@ protected void reconnect(int retry, Throwable lastError) {
8183
}
8284

8385
@Override
84-
protected void complete(long code, CompletableFuture<?> q) {
85-
super.complete(code, q);
86+
protected void complete(TarantoolBinaryPacket packet, CompletableFuture<?> q) {
87+
super.complete(packet, q);
88+
long code = packet.getCode();
8689
if (code != 0) {
8790
System.out.println(code);
8891
}

src/main/java/org/tarantool/CountInputStream.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33
import java.io.InputStream;
44

55
public abstract class CountInputStream extends InputStream {
6-
abstract long getBytesRead();
6+
public abstract long getBytesRead();
77
}

src/main/java/org/tarantool/JDBCBridge.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.Map;
99

1010
import org.tarantool.jdbc.SQLResultSet;
11+
import org.tarantool.server.TarantoolBinaryPacket;
1112

1213
public class JDBCBridge {
1314
public static final JDBCBridge EMPTY = new JDBCBridge(Collections.<TarantoolBase.SQLMetaData>emptyList(), Collections.<List<Object>>emptyList());
@@ -16,8 +17,8 @@ public class JDBCBridge {
1617
final Map<String,Integer> columnsByName;
1718
final List<List<Object>> rows;
1819

19-
protected JDBCBridge(TarantoolConnection connection) {
20-
this(connection.getSQLMetadata(),connection.getSQLData());
20+
protected JDBCBridge(TarantoolBinaryPacket pack) {
21+
this(SqlProtoUtils.getSQLMetadata(pack), SqlProtoUtils.getSQLData(pack));
2122
}
2223

2324
protected JDBCBridge(List<TarantoolBase.SQLMetaData> sqlMetadata, List<List<Object>> rows) {
@@ -30,8 +31,8 @@ protected JDBCBridge(List<TarantoolBase.SQLMetaData> sqlMetadata, List<List<Obje
3031
}
3132

3233
public static JDBCBridge query(TarantoolConnection connection, String sql, Object ... params) {
33-
connection.sql(sql, params);
34-
return new JDBCBridge(connection);
34+
TarantoolBinaryPacket pack = connection.sql(sql, params);
35+
return new JDBCBridge(pack);
3536
}
3637

3738
public static int update(TarantoolConnection connection, String sql, Object ... params) {
@@ -47,10 +48,10 @@ public static JDBCBridge mock(List<String> fields, List<List<Object>> values) {
4748
}
4849

4950
public static Object execute(TarantoolConnection connection, String sql, Object ... params) {
50-
connection.sql(sql, params);
51-
Long rowCount = connection.getSqlRowCount();
51+
TarantoolBinaryPacket pack = connection.sql(sql, params);
52+
Long rowCount = SqlProtoUtils.getSqlRowCount(pack);
5253
if(rowCount == null) {
53-
return new SQLResultSet(new JDBCBridge(connection));
54+
return new SQLResultSet(new JDBCBridge(pack));
5455
}
5556
return rowCount.intValue();
5657
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package org.tarantool;
2+
3+
import org.tarantool.server.TarantoolBinaryPacket;
4+
5+
import java.util.ArrayList;
6+
import java.util.LinkedHashMap;
7+
import java.util.List;
8+
import java.util.Map;
9+
10+
public abstract class SqlProtoUtils {
11+
12+
13+
public static List<Map<String, Object>> readSqlResult(TarantoolBinaryPacket pack) {
14+
List<List<?>> data = (List<List<?>>) pack.getBody().get(Key.DATA.getId());
15+
16+
List<Map<String, Object>> values = new ArrayList<Map<String, Object>>(data.size());
17+
List<TarantoolBase.SQLMetaData> metaData = getSQLMetadata(pack);
18+
LinkedHashMap<String, Object> value = new LinkedHashMap<String, Object>();
19+
for (List row : data) {
20+
for (int i = 0; i < row.size(); i++) {
21+
value.put(metaData.get(i).getName(), row.get(i));
22+
}
23+
values.add(value);
24+
}
25+
return values;
26+
}
27+
28+
public static List<List<Object>> getSQLData(TarantoolBinaryPacket pack) {
29+
return (List<List<Object>>) pack.getBody().get(Key.DATA.getId());
30+
}
31+
32+
33+
public static List<TarantoolBase.SQLMetaData> getSQLMetadata(TarantoolBinaryPacket pack) {
34+
List<Map<Integer, Object>> meta = (List<Map<Integer, Object>>) pack.getBody().get(Key.SQL_METADATA.getId());
35+
List<TarantoolBase.SQLMetaData> values = new ArrayList<TarantoolBase.SQLMetaData>(meta.size());
36+
for (Map<Integer, Object> c : meta) {
37+
values.add(new TarantoolBase.SQLMetaData((String) c.get(Key.SQL_FIELD_NAME.getId())));
38+
}
39+
return values;
40+
}
41+
42+
public static Long getSqlRowCount(TarantoolBinaryPacket pack) {
43+
Map<Key, Object> info = (Map<Key, Object>) pack.getBody().get(Key.SQL_INFO.getId());
44+
Number rowCount;
45+
if (info != null && (rowCount = ((Number) info.get(Key.SQL_ROW_COUNT.getId()))) != null) {
46+
return rowCount.longValue();
47+
}
48+
return null;
49+
}
50+
}

src/main/java/org/tarantool/TarantoolBase.java

+6-160
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,15 @@
11
package org.tarantool;
22

3-
import java.io.DataInputStream;
4-
import java.io.DataOutputStream;
3+
import org.tarantool.server.BinaryProtoUtils;
4+
import org.tarantool.server.TarantoolInstanceConnectionMeta;
5+
56
import java.io.IOException;
6-
import java.io.OutputStream;
77
import java.net.Socket;
8-
import java.nio.ByteBuffer;
98
import java.nio.channels.SocketChannel;
10-
import java.security.MessageDigest;
11-
import java.security.NoSuchAlgorithmException;
12-
import java.util.ArrayList;
13-
import java.util.EnumMap;
14-
import java.util.LinkedHashMap;
159
import java.util.List;
16-
import java.util.Map;
1710
import java.util.concurrent.atomic.AtomicLong;
1811

1912
public abstract class TarantoolBase<Result> extends AbstractTarantoolOps<Integer, List<?>, Object, Result> {
20-
protected static final String WELCOME = "Tarantool ";
2113
protected String serverVersion;
2214
/**
2315
* Connection state
@@ -26,120 +18,21 @@ public abstract class TarantoolBase<Result> extends AbstractTarantoolOps<Integer
2618
protected MsgPackLite msgPackLite = MsgPackLite.INSTANCE;
2719
protected AtomicLong syncId = new AtomicLong();
2820
protected int initialRequestSize = 4096;
29-
/**
30-
* Read properties
31-
*/
32-
protected DataInputStream is;
33-
protected CountInputStream cis;
34-
protected Map<Integer, Object> headers;
35-
protected Map<Integer, Object> body;
3621

3722
public TarantoolBase() {
3823
}
3924

4025
public TarantoolBase(String username, String password, Socket socket) {
4126
super();
4227
try {
43-
this.is = new DataInputStream(cis = new CountInputStreamImpl(socket.getInputStream()));
44-
byte[] bytes = new byte[64];
45-
is.readFully(bytes);
46-
String firstLine = new String(bytes);
47-
if (!firstLine.startsWith(WELCOME)) {
48-
close();
49-
throw new CommunicationException("Welcome message should starts with tarantool but starts with '" + firstLine + "'", new IllegalStateException("Invalid welcome packet"));
50-
}
51-
serverVersion = firstLine.substring(WELCOME.length());
52-
is.readFully(bytes);
53-
this.salt = new String(bytes);
54-
if (username != null && password != null) {
55-
ByteBuffer authPacket = createAuthPacket(username, password);
56-
OutputStream os = socket.getOutputStream();
57-
os.write(authPacket.array(), 0, authPacket.remaining());
58-
os.flush();
59-
readPacket(is);
60-
Long code = (Long) headers.get(Key.CODE.getId());
61-
if (code != 0) {
62-
throw serverError(code, body.get(Key.ERROR.getId()));
63-
}
64-
}
28+
TarantoolInstanceConnectionMeta connectMeta = BinaryProtoUtils.connect(socket, username, password);
29+
this.serverVersion = connectMeta.getServerVersion();
30+
this.salt = connectMeta.getSalt();
6531
} catch (IOException e) {
66-
try {
67-
is.close();
68-
} catch (IOException ignored) {
69-
70-
}
71-
try {
72-
cis.close();
73-
} catch (IOException ignored) {
74-
75-
}
7632
throw new CommunicationException("Couldn't connect to tarantool", e);
7733
}
7834
}
7935

80-
81-
protected ByteBuffer createAuthPacket(String username, final String password) throws IOException {
82-
final MessageDigest sha1;
83-
try {
84-
sha1 = MessageDigest.getInstance("SHA-1");
85-
} catch (NoSuchAlgorithmException e) {
86-
throw new IllegalStateException(e);
87-
}
88-
List auth = new ArrayList(2);
89-
auth.add("chap-sha1");
90-
91-
byte[] p = sha1.digest(password.getBytes());
92-
93-
sha1.reset();
94-
byte[] p2 = sha1.digest(p);
95-
96-
sha1.reset();
97-
sha1.update(Base64.decode(salt), 0, 20);
98-
sha1.update(p2);
99-
byte[] scramble = sha1.digest();
100-
for (int i = 0, e = 20; i < e; i++) {
101-
p[i] ^= scramble[i];
102-
}
103-
auth.add(p);
104-
return createPacket(Code.AUTH, 0L, null, Key.USER_NAME, username, Key.TUPLE, auth);
105-
}
106-
107-
protected ByteBuffer createPacket(Code code, Long syncId, Long schemaId, Object... args) throws IOException {
108-
TarantoolClientImpl.ByteArrayOutputStream bos = new TarantoolClientImpl.ByteArrayOutputStream(initialRequestSize);
109-
bos.write(new byte[5]);
110-
DataOutputStream ds = new DataOutputStream(bos);
111-
Map<Key, Object> header = new EnumMap<Key, Object>(Key.class);
112-
Map<Key, Object> body = new EnumMap<Key, Object>(Key.class);
113-
header.put(Key.CODE, code);
114-
header.put(Key.SYNC, syncId);
115-
if (schemaId != null) {
116-
header.put(Key.SCHEMA_ID, schemaId);
117-
}
118-
if (args != null) {
119-
for (int i = 0, e = args.length; i < e; i += 2) {
120-
Object value = args[i + 1];
121-
body.put((Key) args[i], value);
122-
}
123-
}
124-
msgPackLite.pack(header, ds);
125-
msgPackLite.pack(body, ds);
126-
ds.flush();
127-
ByteBuffer buffer = bos.toByteBuffer();
128-
buffer.put(0, (byte) 0xce);
129-
buffer.putInt(1, bos.size() - 5);
130-
return buffer;
131-
}
132-
133-
protected void readPacket(DataInputStream is) throws IOException {
134-
int size = ((Number) msgPackLite.unpack(is)).intValue();
135-
long mark = cis.getBytesRead();
136-
headers = (Map<Integer, Object>) msgPackLite.unpack(is);
137-
if (cis.getBytesRead() - mark < size) {
138-
body = (Map<Integer, Object>) msgPackLite.unpack(is);
139-
}
140-
is.skipBytes((int) (cis.getBytesRead() - mark - size));
141-
}
142-
14336
protected static class SQLMetaData {
14437
protected String name;
14538

@@ -159,57 +52,10 @@ public String toString() {
15952
}
16053
}
16154

162-
protected List<SQLMetaData> getSQLMetadata() {
163-
List<Map<Integer, Object>> meta = (List<Map<Integer, Object>>) body.get(Key.SQL_METADATA.getId());
164-
List<SQLMetaData> values = new ArrayList<SQLMetaData>(meta.size());
165-
for(Map<Integer,Object> c:meta ) {
166-
values.add(new SQLMetaData((String) c.get(Key.SQL_FIELD_NAME.getId())));
167-
}
168-
return values;
169-
}
170-
171-
protected List<List<Object>> getSQLData() {
172-
return (List<List<Object>>) body.get(Key.DATA.getId());
173-
}
174-
175-
protected List<Map<String, Object>> readSqlResult(List<List<?>> data) {
176-
List<Map<String, Object>> values = new ArrayList<Map<String, Object>>(data.size());
177-
List<SQLMetaData> metaData = getSQLMetadata();
178-
LinkedHashMap<String, Object> value = new LinkedHashMap<String, Object>();
179-
for (List row : data) {
180-
for (int i = 0; i < row.size(); i++) {
181-
value.put(metaData.get(i).getName(), row.get(i));
182-
}
183-
values.add(value);
184-
}
185-
return values;
186-
}
187-
188-
189-
protected Long getSqlRowCount() {
190-
Map<Key, Object> info = (Map<Key, Object>) body.get(Key.SQL_INFO.getId());
191-
Number rowCount;
192-
if (info != null && (rowCount = ((Number) info.get(Key.SQL_ROW_COUNT.getId()))) != null) {
193-
return rowCount.longValue();
194-
}
195-
return null;
196-
}
197-
198-
19955
protected TarantoolException serverError(long code, Object error) {
20056
return new TarantoolException(code, error instanceof String ? (String) error : new String((byte[]) error));
20157
}
20258

203-
protected class ByteArrayOutputStream extends java.io.ByteArrayOutputStream {
204-
public ByteArrayOutputStream(int size) {
205-
super(size);
206-
}
207-
208-
ByteBuffer toByteBuffer() {
209-
return ByteBuffer.wrap(buf, 0, count);
210-
}
211-
}
212-
21359
protected void closeChannel(SocketChannel channel) {
21460
if (channel != null) {
21561
try {

0 commit comments

Comments
 (0)