diff --git a/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java b/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java index 866e13768..c899c8b84 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.List; import java.util.StringTokenizer; +import java.util.stream.Collectors; /** @@ -126,20 +127,44 @@ public static List<String> tokenize(String string, String delimiters, boolean trimTokens, boolean ignoreEmptyTokens) { if (!StringUtils.hasText(string)) { return Collections.emptyList(); } - StringTokenizer st = new StringTokenizer(string, delimiters); - List<String> tokens = new ArrayList<String>(); - while (st.hasMoreTokens()) { - String token = st.nextToken(); - if (trimTokens) { - token = token.trim(); + List<String> tokens = new ArrayList<>(); + char[] delims = delimiters.toCharArray(); + StringBuilder currentToken = new StringBuilder(); + boolean inQuotedToken = false; + for (char character : string.toCharArray()) { + if (character == '\"') { + inQuotedToken = !inQuotedToken; } - if (!ignoreEmptyTokens || token.length() > 0) { - tokens.add(token); + else if (inQuotedToken == false && isCharacterInArray(character, delims)) { + addTokenToList(tokens, currentToken, trimTokens, ignoreEmptyTokens); + currentToken = new StringBuilder(); + } else { + currentToken.append(character); } } + addTokenToList(tokens, currentToken, trimTokens, ignoreEmptyTokens); return tokens; } + private static void addTokenToList(List<String> tokens, StringBuilder newToken, boolean trimTokens, boolean ignoreEmptyTokens) { + String token = newToken.toString(); + if (trimTokens) { + token = token.trim(); + } + if (!ignoreEmptyTokens || token.length() > 0) { + tokens.add(token); + } + } + + private static boolean isCharacterInArray(char character, char[] charArray) { + for (char arrayChar : charArray) { + if (character == arrayChar) { + return true; + } + } + return false; + } + public 
static String concatenate(Collection<?> list) { return concatenate(list, DEFAULT_DELIMITER); } @@ -151,15 +176,10 @@ public static String concatenate(Collection<?> list, String delimiter) { if (delimiter == null) { delimiter = EMPTY; } - StringBuilder sb = new StringBuilder(); - - for (Object object : list) { - sb.append(object.toString()); - sb.append(delimiter); - } - - sb.setLength(sb.length() - delimiter.length()); - return sb.toString(); + final String finalDelimiter = delimiter; + return list.stream().map(item -> item.toString()) + .map(token -> optionallyWrapToken(token, finalDelimiter)) + .collect(Collectors.joining(delimiter)); } public static String concatenate(Object[] array, String delimiter) { @@ -169,15 +189,14 @@ if (delimiter == null) { delimiter = EMPTY; } + final String finalDelimiter = delimiter; + return Arrays.stream(array).map(item -> item.toString()) + .map(token -> optionallyWrapToken(token, finalDelimiter)) + .collect(Collectors.joining(delimiter)); + } - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < array.length; i++) { - if (i > 0) { - sb.append(delimiter); - } - sb.append(array[i]); - } - return sb.toString(); + private static String optionallyWrapToken(String token, String delimiter) { + return token.contains(delimiter) ? 
"\"" + token + "\"" : token; } public static String deleteWhitespace(CharSequence sequence) { diff --git a/mr/src/test/java/org/elasticsearch/hadoop/util/StringUtilsTest.java b/mr/src/test/java/org/elasticsearch/hadoop/util/StringUtilsTest.java index 5ddfb1623..e38ea5887 100644 --- a/mr/src/test/java/org/elasticsearch/hadoop/util/StringUtilsTest.java +++ b/mr/src/test/java/org/elasticsearch/hadoop/util/StringUtilsTest.java @@ -20,6 +20,10 @@ import org.junit.Test; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + import static org.junit.Assert.*; public class StringUtilsTest { @@ -72,4 +76,32 @@ public void testSingularIndexNames() { assertFalse(StringUtils.isValidSingularIndexName("abc{date|yyyy-MM-dd}defg")); } + + @Test + public void testTokenize() { + List<String> test1 = Arrays.asList(new String[]{"this", "is a", "test"}); + String concatenatedString = StringUtils.concatenate(test1); + List<String> tokens = StringUtils.tokenize(concatenatedString, ",", true, true); + assertEquals(test1, tokens); + + List<String> test2 = Arrays.asList(new String[]{"this", " is a", " test ", " "}); + concatenatedString = StringUtils.concatenate(test2); + tokens = StringUtils.tokenize(concatenatedString, ",", false, false); + assertEquals(test2, tokens); + + List<String> test3 = Arrays.asList(new String[]{"this", "is, a", "test"}); + concatenatedString = StringUtils.concatenate(test3); + tokens = StringUtils.tokenize(concatenatedString, ",", true, true); + assertEquals(test3, tokens); + + Object[] test4 = new String[]{"this", "is, a", "test"}; + concatenatedString = StringUtils.concatenate(test4, ";"); + tokens = StringUtils.tokenize(concatenatedString, ";", true, true); + assertEquals(Arrays.asList(test4), tokens); + + List<String> test5 = Arrays.asList(new String[]{"this", "is, a", "test"}); + concatenatedString = StringUtils.concatenate(test5, ","); + tokens = StringUtils.tokenize(concatenatedString, ";,", true, true); + assertEquals(test5, tokens); + } } diff --git 
a/spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala b/spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala index 0cd96120f..d0cf23f1a 100644 --- a/spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala +++ b/spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala @@ -2223,7 +2223,30 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus assertEquals(nested(0).getLong(1), 6) } - + @Test + def testCommasInFieldNames(): Unit = { + val index = wrapIndex("commas-in-names-index") + val typed = "data" + val (target, docPath) = makeTargets(index, typed) + val mapping = wrapMapping("data", s"""{ + | "dynamic": "strict", + | "properties" : { + | "some column with a comma, and then some" : { + | "type" : "keyword" + | } + | } + | } + """.stripMargin) + RestUtils.touch(index) + RestUtils.putMapping(index, typed, mapping.getBytes(StringUtils.UTF_8)) + RestUtils.postData(docPath, "{\"some column with a comma, and then some\": \"sdfdsf\"}".getBytes("UTF-8")) + RestUtils.refresh(target) + val df = sqc.read.format("es").load(index) + df.printSchema() + df.show() + assertEquals(1, df.count()) + } + @Test def testMultiIndexes() { // add some data diff --git a/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala b/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala index fd146041a..a76d08885 100644 --- a/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala +++ b/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala @@ -2284,7 +2284,30 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus assertEquals(nested(0).getLong(1), 6) } - + @Test + def testCommasInFieldNames(): Unit = { + val index = 
wrapIndex("commas-in-names-index") + val typed = "data" + val (target, docPath) = makeTargets(index, typed) + val mapping = wrapMapping("data", s"""{ + | "dynamic": "strict", + | "properties" : { + | "some column with a comma, and then some" : { + | "type" : "keyword" + | } + | } + | } + """.stripMargin) + RestUtils.touch(index) + RestUtils.putMapping(index, typed, mapping.getBytes(StringUtils.UTF_8)) + RestUtils.postData(docPath, "{\"some column with a comma, and then some\": \"sdfdsf\"}".getBytes("UTF-8")) + RestUtils.refresh(target) + val df = sqc.read.format("es").load(index) + df.printSchema() + df.show() + assertEquals(1, df.count()) + } + @Test def testMultiIndexes() { // add some data diff --git a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala index 53255bf62..99e3c601e 100644 --- a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala +++ b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala @@ -2284,7 +2284,30 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus assertEquals(nested(0).getLong(1), 6) } - + @Test + def testCommasInFieldNames(): Unit = { + val index = wrapIndex("commas-in-names-index") + val typed = "data" + val (target, docPath) = makeTargets(index, typed) + val mapping = wrapMapping("data", s"""{ + | "dynamic": "strict", + | "properties" : { + | "some column with a comma, and then some" : { + | "type" : "keyword" + | } + | } + | } + """.stripMargin) + RestUtils.touch(index) + RestUtils.putMapping(index, typed, mapping.getBytes(StringUtils.UTF_8)) + RestUtils.postData(docPath, "{\"some column with a comma, and then some\": \"sdfdsf\"}".getBytes("UTF-8")) + RestUtils.refresh(target) + val df = sqc.read.format("es").load(index) + df.printSchema() + df.show() + assertEquals(1, df.count()) 
+ } + @Test def testMultiIndexes() { // add some data