Skip to content

KAFKA-16913 - Support external schemas in JSONConverter #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public class JsonConverter implements Converter, HeaderConverter, Versioned {

private static final Map<Schema.Type, JsonToConnectTypeConverter> TO_CONNECT_CONVERTERS = new EnumMap<>(Schema.Type.class);

// Fixed schema parsed from the "schema.content" converter config at configure()
// time, if one was supplied. When non-null, toConnectData() applies it to every
// message instead of expecting a per-message schema/payload envelope; stays null
// when no schema.content is configured.
private Schema schema = null;

static {
TO_CONNECT_CONVERTERS.put(Schema.Type.BOOLEAN, (schema, value, config) -> value.booleanValue());
TO_CONNECT_CONVERTERS.put(Schema.Type.INT8, (schema, value, config) -> (byte) value.intValue());
Expand Down Expand Up @@ -291,6 +295,17 @@ public void configure(Map<String, ?> configs) {

fromConnectSchemaCache = new SynchronizedCache<>(new LRUCache<>(config.schemaCacheSize()));
toConnectSchemaCache = new SynchronizedCache<>(new LRUCache<>(config.schemaCacheSize()));

try {
final byte[] schemaContent = config.schemaContent();
if (schemaContent != null && schemaContent.length > 0) {
final JsonNode schemaNode = deserializer.deserialize("", schemaContent);
this.schema = asConnectSchema(schemaNode);
}
} catch (SerializationException e) {
throw new DataException("Failed to parse schema in converter config due to serialization error: ", e);
}

}

@Override
Expand Down Expand Up @@ -345,13 +360,16 @@ public SchemaAndValue toConnectData(String topic, byte[] value) {
throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e);
}

if (config.schemasEnabled() && (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)))
throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." +
if (config.schemasEnabled()) {
if (this.schema != null) {
return new SchemaAndValue(schema, convertToConnect(schema, jsonValue, config));
} else if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) {
throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." +
" If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.");

// The deserialized data should either be an envelope object containing the schema and the payload or the schema
// was stripped during serialization and we need to fill in an all-encompassing schema.
if (!config.schemasEnabled()) {
}
} else {
// The deserialized data should either be an envelope object containing the schema and the payload or the schema
// was stripped during serialization and we need to fill in an all-encompassing schema.
ObjectNode envelope = JSON_NODE_FACTORY.objectNode();
envelope.set(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME, null);
envelope.set(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME, jsonValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.storage.ConverterConfig;

import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;

Expand All @@ -36,6 +37,10 @@ public final class JsonConverterConfig extends ConverterConfig {
private static final String SCHEMAS_ENABLE_DISPLAY = "Enable Schemas";

public static final String SCHEMAS_CACHE_SIZE_CONFIG = "schemas.cache.size";
public static final String SCHEMA_CONTENT_CONFIG = "schema.content";
public static final String SCHEMA_CONTENT_DEFAULT = null;
// User-facing config doc; original text "will should be" was ungrammatical.
private static final String SCHEMA_CONTENT_DOC = "When set, this is used as the schema for all messages. Otherwise, the schema should be in the contents of each message.";
private static final String SCHEMA_CONTENT_DISPLAY = "Schema Content";
public static final int SCHEMAS_CACHE_SIZE_DEFAULT = 1000;
private static final String SCHEMAS_CACHE_SIZE_DOC = "The maximum number of schemas that can be cached in this converter instance.";
private static final String SCHEMAS_CACHE_SIZE_DISPLAY = "Schema Cache Size";
Expand All @@ -61,6 +66,8 @@ public final class JsonConverterConfig extends ConverterConfig {
orderInGroup++, Width.MEDIUM, SCHEMAS_ENABLE_DISPLAY);
CONFIG.define(SCHEMAS_CACHE_SIZE_CONFIG, Type.INT, SCHEMAS_CACHE_SIZE_DEFAULT, Importance.HIGH, SCHEMAS_CACHE_SIZE_DOC, group,
orderInGroup++, Width.MEDIUM, SCHEMAS_CACHE_SIZE_DISPLAY);
CONFIG.define(SCHEMA_CONTENT_CONFIG, Type.STRING, SCHEMA_CONTENT_DEFAULT, Importance.HIGH, SCHEMA_CONTENT_DOC, group,
orderInGroup++, Width.MEDIUM, SCHEMA_CONTENT_DISPLAY);

group = "Serialization";
orderInGroup = 0;
Expand All @@ -86,13 +93,16 @@ public static ConfigDef configDef() {
private final int schemaCacheSize;
private final DecimalFormat decimalFormat;
private final boolean replaceNullWithDefault;
private final byte[] schemaContent;

/**
 * Creates a converter config from the supplied properties, resolving the
 * schemas.enable, cache-size, decimal-format, null-replacement, and
 * schema.content settings.
 */
public JsonConverterConfig(Map<String, ?> props) {
    super(CONFIG, props);

    schemasEnabled = getBoolean(SCHEMAS_ENABLE_CONFIG);
    schemaCacheSize = getInt(SCHEMAS_CACHE_SIZE_CONFIG);
    decimalFormat = DecimalFormat.valueOf(getString(DECIMAL_FORMAT_CONFIG).toUpperCase(Locale.ROOT));
    replaceNullWithDefault = getBoolean(REPLACE_NULL_WITH_DEFAULT_CONFIG);

    // schema.content is optional; materialize it as UTF-8 bytes only when set.
    final String rawSchema = getString(SCHEMA_CONTENT_CONFIG);
    schemaContent = (rawSchema == null) ? null : rawSchema.getBytes(StandardCharsets.UTF_8);
}

/**
Expand Down Expand Up @@ -130,4 +140,17 @@ public boolean replaceNullWithDefault() {
return replaceNullWithDefault;
}

/**
 * Returns the raw UTF-8 bytes of the fixed schema supplied via the
 * {@code schema.content} config, to be applied to all messages.
 *
 * <p>Only relevant when schemas are enabled.
 *
 * @return the configured schema content, or {@code null} if none was provided
 */
public byte[] schemaContent() {
    return this.schemaContent;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -978,6 +980,54 @@ public void testVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), converter.version());
}

@Test
public void testSchemaContentIsNull() {
    // Map.of rejects null values, so singletonMap is required here.
    Map<String, Object> props = Collections.singletonMap(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, null);
    converter.configure(props, false);

    // With no fixed schema configured, the envelope's embedded schema is honored.
    byte[] envelope = "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes();
    SchemaAndValue expected = new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz");
    assertEquals(expected, converter.toConnectData(TOPIC, envelope));
}

@Test
public void testSchemaContentIsEmptyString() {
    // An empty schema.content behaves the same as unset: fall back to the envelope.
    converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, ""), false);

    byte[] envelope = "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes();
    SchemaAndValue expected = new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz");
    assertEquals(expected, converter.toConnectData(TOPIC, envelope));
}

@Test
public void testSchemaContentValidSchema() {
    // With a fixed schema configured, the payload arrives bare (no envelope).
    converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"type\": \"string\" }"), false);

    SchemaAndValue actual = converter.toConnectData(TOPIC, "\"foo-bar-baz\"".getBytes());
    assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), actual);
}

@Test
public void testSchemaContentInValidSchema() {
    // Malformed JSON in schema.content must be rejected at configure() time.
    // (Original assertion message had stray spaces and a comma splice.)
    assertThrows(
        DataException.class,
        () -> converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, "{ \"string\" }"), false),
        "Provided schema is invalid; please recheck the schema you have provided");
}

@Test
public void testSchemaContentLooksLikeSchema() {
    // A configured fixed schema that itself describes schema/payload fields must
    // win over envelope detection: the message is decoded as a plain struct.
    String fixedSchema = "{ \"type\": \"struct\", \"fields\": [{\"field\": \"schema\", \"type\": \"struct\",\"fields\": [{\"field\": \"type\", \"type\": \"string\" }]}, {\"field\": \"payload\", \"type\": \"string\"}]}";
    converter.configure(Map.of(JsonConverterConfig.SCHEMA_CONTENT_CONFIG, fixedSchema), false);

    byte[] message = "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes();
    Struct decoded = (Struct) converter.toConnectData(TOPIC, message).value();
    assertEquals("foo-bar-baz", decoded.getString("payload"));
}

@ParameterizedTest
@ValueSource(strings = {
    "{ }",
    "{ \"wrong\": \"schema\" }",
    "{ \"schema\": { \"type\": \"string\" } }",
    "{ \"payload\": \"foo-bar-baz\" }",
    "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\", \"extra\": \"field\" }",
})
public void testNullSchemaContentWithWrongConnectDataValue(String value) {
    // With schemas enabled and no schema.content configured, the converter
    // requires an envelope containing exactly the "schema" and "payload"
    // fields; every variant above must be rejected with a DataException.
    // (Stray review-page text that had been pasted into this method body
    // has been removed.)
    converter.configure(Map.of(), false);
    assertThrows(
        DataException.class,
        () -> converter.toConnectData(TOPIC, value.getBytes()));
}

private JsonNode parse(byte[] json) {
try {
return objectMapper.readTree(json);
Expand Down