diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java index 30d68971568f2..3cabb2a3f9e14 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java @@ -68,6 +68,10 @@ public class JsonConverter implements Converter, HeaderConverter, Versioned { private static final Map TO_CONNECT_CONVERTERS = new EnumMap<>(Schema.Type.class); + // if a schema is provided in config, this schema will + // be used for all messages + private Schema schema = null; + static { TO_CONNECT_CONVERTERS.put(Schema.Type.BOOLEAN, (schema, value, config) -> value.booleanValue()); TO_CONNECT_CONVERTERS.put(Schema.Type.INT8, (schema, value, config) -> (byte) value.intValue()); @@ -291,6 +295,17 @@ public void configure(Map configs) { fromConnectSchemaCache = new SynchronizedCache<>(new LRUCache<>(config.schemaCacheSize())); toConnectSchemaCache = new SynchronizedCache<>(new LRUCache<>(config.schemaCacheSize())); + + try { + final byte[] schemaContent = config.schemaContent(); + if (schemaContent != null && schemaContent.length > 0) { + final JsonNode schemaNode = deserializer.deserialize("", schemaContent); + this.schema = asConnectSchema(schemaNode); + } + } catch (SerializationException e) { + throw new DataException("Failed to parse schema in converter config due to serialization error: ", e); + } + } @Override @@ -345,13 +360,16 @@ public SchemaAndValue toConnectData(String topic, byte[] value) { throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e); } - if (config.schemasEnabled() && (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))) - throw new DataException("JsonConverter with schemas.enable requires 
\"schema\" and \"payload\" fields and may not contain additional fields." + + if (config.schemasEnabled()) { + if (this.schema != null) { + return new SchemaAndValue(schema, convertToConnect(schema, jsonValue, config)); + } else if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) { + throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." + " If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration."); - - // The deserialized data should either be an envelope object containing the schema and the payload or the schema - // was stripped during serialization and we need to fill in an all-encompassing schema. - if (!config.schemasEnabled()) { + } + } else { + // The deserialized data should either be an envelope object containing the schema and the payload or the schema + // was stripped during serialization and we need to fill in an all-encompassing schema. 
ObjectNode envelope = JSON_NODE_FACTORY.objectNode(); envelope.set(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME, null); envelope.set(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME, jsonValue); diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java index f02d54ac26307..0c2486ec20b43 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.connect.storage.ConverterConfig; +import java.nio.charset.StandardCharsets; import java.util.Locale; import java.util.Map; @@ -36,6 +37,10 @@ public final class JsonConverterConfig extends ConverterConfig { private static final String SCHEMAS_ENABLE_DISPLAY = "Enable Schemas"; public static final String SCHEMAS_CACHE_SIZE_CONFIG = "schemas.cache.size"; + public static final String SCHEMA_CONTENT_CONFIG = "schema.content"; + public static final String SCHEMA_CONTENT_DEFAULT = null; + private static final String SCHEMA_CONTENT_DOC = "When set, this is used as the schema for all messages. 
Otherwise, the schema should be in the contents of each message."; +    private static final String SCHEMA_CONTENT_DISPLAY = "Schema Content";      public static final int SCHEMAS_CACHE_SIZE_DEFAULT = 1000;      private static final String SCHEMAS_CACHE_SIZE_DOC = "The maximum number of schemas that can be cached in this converter instance.";      private static final String SCHEMAS_CACHE_SIZE_DISPLAY = "Schema Cache Size"; @@ -61,6 +66,8 @@ public final class JsonConverterConfig extends ConverterConfig {              orderInGroup++, Width.MEDIUM, SCHEMAS_ENABLE_DISPLAY);          CONFIG.define(SCHEMAS_CACHE_SIZE_CONFIG, Type.INT, SCHEMAS_CACHE_SIZE_DEFAULT, Importance.HIGH, SCHEMAS_CACHE_SIZE_DOC, group,              orderInGroup++, Width.MEDIUM, SCHEMAS_CACHE_SIZE_DISPLAY); +        CONFIG.define(SCHEMA_CONTENT_CONFIG, Type.STRING, SCHEMA_CONTENT_DEFAULT, Importance.HIGH, SCHEMA_CONTENT_DOC, group, +            orderInGroup++, Width.MEDIUM, SCHEMA_CONTENT_DISPLAY);          group = "Serialization";          orderInGroup = 0; @@ -86,6 +93,7 @@ public static ConfigDef configDef() {      private final int schemaCacheSize;      private final DecimalFormat decimalFormat;      private final boolean replaceNullWithDefault; +    private final byte[] schemaContent;      public JsonConverterConfig(Map props) {          super(CONFIG, props); @@ -93,6 +101,8 @@ public JsonConverterConfig(Map props) {          this.schemaCacheSize = getInt(SCHEMAS_CACHE_SIZE_CONFIG);          this.decimalFormat = DecimalFormat.valueOf(getString(DECIMAL_FORMAT_CONFIG).toUpperCase(Locale.ROOT));          this.replaceNullWithDefault = getBoolean(REPLACE_NULL_WITH_DEFAULT_CONFIG); +        String schemaContentStr = getString(SCHEMA_CONTENT_CONFIG); +        this.schemaContent = schemaContentStr == null ? null : schemaContentStr.getBytes(StandardCharsets.UTF_8);      }      /** @@ -130,4 +140,17 @@ public boolean replaceNullWithDefault() {          return replaceNullWithDefault;      } +    /** +     * If a default schema is provided in the converter config, this will be +     * used for all messages. +     * +     * This is only relevant if schemas are enabled. 
+ * + * @return Schema Contents, will return null if no value is provided + */ + public byte[] schemaContent() { + return schemaContent; + } + + } diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java index d79c8527b3c21..8f5f93dcc0bb4 100644 --- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java +++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java @@ -36,6 +36,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.io.IOException; @@ -978,6 +980,54 @@ public void testVersionRetrievedFromAppInfoParser() { assertEquals(AppInfoParser.getVersion(), converter.version()); } + @Test + public void testSchemaContentIsNull() { + converter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, null), false); + assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes())); + } + + @Test + public void testSchemaContentIsEmptyString() { + converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, ""), false); + assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes())); + } + + @Test + public void testSchemaContentValidSchema() { + converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"type\": \"string\" }"), false); + assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectData(TOPIC, "\"foo-bar-baz\"".getBytes())); + } + + @Test + public void testSchemaContentInValidSchema() { + assertThrows( + 
DataException.class, +            () -> converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"string\" }"), false), +            "Provided schema is invalid, please recheck the schema you have provided"); +    } + +    @Test +    public void testSchemaContentLooksLikeSchema() { +        converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"type\": \"struct\", \"fields\": [{\"field\": \"schema\", \"type\": \"struct\",\"fields\": [{\"field\": \"type\", \"type\": \"string\" }]}, {\"field\": \"payload\", \"type\": \"string\"}]}"), false); +        SchemaAndValue connectData = converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes()); +        assertEquals("foo-bar-baz", ((Struct) connectData.value()).getString("payload")); +    } + +    @ParameterizedTest +    @ValueSource(strings = { +        "{ }", +        "{ \"wrong\": \"schema\" }", +        "{ \"schema\": { \"type\": \"string\" } }", +        "{ \"payload\": \"foo-bar-baz\" }", +        "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\", \"extra\": \"field\" }", +    }) +    public void testNullSchemaContentWithWrongConnectDataValue(String value) { +        converter.configure(Map.of(), false); +        assertThrows( +            DataException.class, +            () -> converter.toConnectData(TOPIC, value.getBytes())); +    } +     private JsonNode parse(byte[] json) {         try {             return objectMapper.readTree(json);