|
24 | 24 | import java.util.concurrent.TimeUnit;
|
25 | 25 | import java.util.function.BiConsumer;
|
26 | 26 | import java.util.function.Consumer;
|
27 |
| -import java.util.stream.Collectors; |
28 | 27 |
|
29 | 28 | import org.apache.pulsar.client.admin.PulsarAdminBuilder;
|
30 | 29 | import org.apache.pulsar.client.api.Authentication;
|
|
39 | 38 | import org.apache.pulsar.client.impl.AutoClusterFailover.AutoClusterFailoverBuilderImpl;
|
40 | 39 |
|
41 | 40 | import org.springframework.boot.context.properties.PropertyMapper;
|
| 41 | +import org.springframework.boot.json.JsonWriter; |
42 | 42 | import org.springframework.pulsar.core.PulsarTemplate;
|
43 | 43 | import org.springframework.pulsar.listener.PulsarContainerProperties;
|
44 | 44 | import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
|
|
53 | 53 | */
|
54 | 54 | final class PulsarPropertiesMapper {
|
55 | 55 |
|
| 56 | + private static final JsonWriter<Map<String, String>> jsonWriter = JsonWriter |
| 57 | + .of((members) -> members.addSelf().as(TreeMap::new).usingPairs(Map::forEach)); |
| 58 | + |
56 | 59 | private final PulsarProperties properties;
|
57 | 60 |
|
58 | 61 | PulsarPropertiesMapper(PulsarProperties properties) {
|
@@ -124,38 +127,14 @@ private void customizeAuthentication(PulsarProperties.Authentication properties,
|
124 | 127 | String pluginClassName = properties.getPluginClassName();
|
125 | 128 | if (StringUtils.hasText(pluginClassName)) {
|
126 | 129 | try {
|
127 |
| - action.accept(pluginClassName, getAuthenticationParamsJson(properties.getParam())); |
| 130 | + action.accept(pluginClassName, jsonWriter.writeToString(properties.getParam())); |
128 | 131 | }
|
129 | 132 | catch (UnsupportedAuthenticationException ex) {
|
130 | 133 | throw new IllegalStateException("Unable to configure Pulsar authentication", ex);
|
131 | 134 | }
|
132 | 135 | }
|
133 | 136 | }
|
134 | 137 |
|
135 |
| - private String getAuthenticationParamsJson(Map<String, String> params) { |
136 |
| - Map<String, String> sortedParams = new TreeMap<>(params); |
137 |
| - try { |
138 |
| - return sortedParams.entrySet() |
139 |
| - .stream() |
140 |
| - .map((entry) -> "\"%s\":\"%s\"".formatted(entry.getKey(), escapeJson(entry.getValue()))) |
141 |
| - .collect(Collectors.joining(",", "{", "}")); |
142 |
| - } |
143 |
| - catch (Exception ex) { |
144 |
| - throw new IllegalStateException("Could not convert auth parameters to encoded string", ex); |
145 |
| - } |
146 |
| - } |
147 |
| - |
148 |
| - private String escapeJson(String raw) { |
149 |
| - return raw.replace("\\", "\\\\") |
150 |
| - .replace("\"", "\\\"") |
151 |
| - .replace("/", "\\/") |
152 |
| - .replace("\b", "\\b") |
153 |
| - .replace("\t", "\\t") |
154 |
| - .replace("\n", "\\n") |
155 |
| - .replace("\f", "\\f") |
156 |
| - .replace("\r", "\\r"); |
157 |
| - } |
158 |
| - |
159 | 138 | <T> void customizeProducerBuilder(ProducerBuilder<T> producerBuilder) {
|
160 | 139 | PulsarProperties.Producer properties = this.properties.getProducer();
|
161 | 140 | PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
|
0 commit comments