forked from pgjdbc/r2dbc-postgresql
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBuiltinDynamicCodecs.java
138 lines (110 loc) · 4.89 KB
/
BuiltinDynamicCodecs.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.r2dbc.postgresql.codec;
import io.netty.buffer.ByteBufAllocator;
import io.r2dbc.postgresql.api.PostgresqlConnection;
import io.r2dbc.postgresql.api.PostgresqlStatement;
import io.r2dbc.postgresql.extension.CodecRegistrar;
import org.reactivestreams.Publisher;
import reactor.util.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* {@link CodecRegistrar} to register built-in codecs depending on their availability in {@code pg_type}.
*/
public class BuiltinDynamicCodecs implements CodecRegistrar {
private static final Object EMPTY = new Object();
enum BuiltinCodec {
HSTORE("hstore"),
POSTGIS_GEOMETRY("geometry") {
private final boolean jtsPresent = isPresent(BuiltinDynamicCodecs.class.getClassLoader(), "org.locationtech.jts.geom.Geometry");
@Override
public boolean isSupported() {
return this.jtsPresent;
}
}, VECTOR("vector");
private final String name;
BuiltinCodec(String name) {
this.name = name;
}
public Iterable<Codec<?>> createCodec(ByteBufAllocator byteBufAllocator, int oid, int typarray) {
switch (this) {
case HSTORE:
return Collections.singletonList(new HStoreCodec(byteBufAllocator, oid));
case POSTGIS_GEOMETRY:
return Collections.singletonList(new PostgisGeometryCodec(oid));
case VECTOR:
VectorCodec vectorCodec = new VectorCodec(byteBufAllocator, oid, typarray);
List<Codec<?>> codecs = new ArrayList<>(3);
codecs.add(vectorCodec);
if (typarray != PostgresTypes.NO_SUCH_TYPE) {
codecs.add(new VectorCodec.VectorArrayCodec(byteBufAllocator, vectorCodec));
}
codecs.add(new VectorFloatCodec(byteBufAllocator, oid));
return codecs;
default:
throw new UnsupportedOperationException(String.format("Codec %s for OID %d not supported", name(), oid));
}
}
public String getName() {
return this.name;
}
boolean isSupported() {
return true;
}
static BuiltinCodec lookup(@Nullable String name) {
for (BuiltinCodec codec : values()) {
if (codec.getName().equalsIgnoreCase(name)) {
return codec;
}
}
throw new IllegalArgumentException(String.format("Cannot determine codec for %s", name));
}
}
@Override
public Publisher<Void> register(PostgresqlConnection connection, ByteBufAllocator byteBufAllocator, CodecRegistry registry) {
PostgresqlStatement statement = createQuery(connection);
return statement.execute()
.flatMap(it -> it.map((row, rowMetadata) -> {
String typname = row.get("typname", String.class);
BuiltinCodec lookup = BuiltinCodec.lookup(typname);
if (lookup.isSupported()) {
int oid = PostgresqlObjectId.toInt(row.get("oid", Long.class));
int typarray = rowMetadata.contains("typarray") ? PostgresqlObjectId.toInt(row.get("typarray", Long.class)) : PostgresTypes.NO_SUCH_TYPE;
lookup.createCodec(byteBufAllocator, oid, typarray).forEach(registry::addLast);
}
return EMPTY;
})
).then();
}
private PostgresqlStatement createQuery(PostgresqlConnection connection) {
return connection.createStatement(String.format("SELECT * FROM pg_catalog.pg_type WHERE typname IN (%s)", getPlaceholders()));
}
private static String getPlaceholders() {
return Arrays.stream(BuiltinCodec.values()).map(s -> "'" + s.getName() + "'").collect(Collectors.joining(","));
}
private static boolean isPresent(ClassLoader classLoader, String name) {
try {
Class.forName(name, false, classLoader);
return true;
} catch (ClassNotFoundException e) {
return false;
}
}
}