Support UPDATE output mode for Spark Structured Streaming #1123

Closed · jbaiera opened this issue Mar 17, 2018 · 16 comments · Fixed by #1839

Comments

jbaiera (Member) commented Mar 17, 2018

We now support Spark Structured Streaming as of 6.x, but only in APPEND mode. It would be beneficial to support UPDATE mode, with the requirement that the user MUST provide a field to be used for a document ID.

kant111 commented Mar 19, 2018

Can we make this backward compatible as well? That way users could use this feature with ES >= 5.5 or something similar.

jbaiera (Member, Author) commented Mar 19, 2018

While the project's official stance is that we maintain BWC with the most recent minor release of the previous major version, we make a best effort to maintain backwards compatibility with as many versions of Elasticsearch as we can.

kant111 commented Mar 20, 2018

Any sort of timeline or release number on this?

jbaiera (Member, Author) commented Mar 20, 2018

We usually do not commit to timelines or release targets on public issues.

kant111 commented May 1, 2018

@jbaiera Can I use update mode with a Foreach sink to write to ES for now?

jbaiera (Member, Author) commented May 2, 2018

That seems like a reasonable workaround in the meantime.
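
As a rough illustration (a minimal sketch, not connector code), the wiring would look something like the following, assuming df is a streaming Dataset<Row> and writer is any ForeachWriter<Row> that writes to Elasticsearch, such as the one posted further down in this thread:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;

public class ForeachUpdateExample {
    static StreamingQuery startForeachQuery(Dataset<Row> df, ForeachWriter<Row> writer) throws Exception {
        return df.writeStream()
                .outputMode(OutputMode.Update())  // the foreach sink accepts Update output mode
                .foreach(writer)                  // e.g. an Elasticsearch-backed ForeachWriter
                .start();
    }
}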

kant111 commented May 2, 2018

@jbaiera Thanks. Can you please provide some more details on how to do that? For example, should I use the elasticsearch-hadoop connector or the Elasticsearch Java client? Are there any examples? I don't see anything in the docs on how to do this.

I am trying to write a streaming dataset to Elasticsearch in update mode.

I am currently using Spark 2.3.0 and ES 5.3.2.

jbaiera (Member, Author) commented May 2, 2018

This very much depends on your use case. In my experience, Spark does not offer much in the way of solid documentation for writing your own sinks. Much of the current implementation in ES-Hadoop was the product of a month of trial and error and reverse engineering existing sink implementations. If you don't care much about the expectations around transaction acknowledgement and skipping acked transactions during a recovery, then the basic Foreach sink should be usable for your case.

I understand that the current Java REST client can sometimes cause issues in Hadoop/Spark environments because of clashing dependencies; ES-Hadoop has its own REST client implementation to get around this. I don't recommend building on the ES-Hadoop internals, as they are not covered by our semantic versioning scheme and may break between any two releases with no notice. Because of this, I recommend using the available Elasticsearch clients if you are going to build something as a workaround.

kant111 commented May 2, 2018

@jbaiera 1) I don't see any REST client for Elasticsearch 5.3.2. 2) When I used the BulkProcessor from the transport Java client to write to ES, I got a task not serializable exception.

Here is the code:

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;

import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class EsSink extends ForeachWriter<Row> {

    // client and bulk processor are created eagerly in the constructor (i.e. on the driver)
    private TransportClient client;
    private BulkProcessor bulkProcessor;

    public EsSink(String cluster, String host, int port) throws UnknownHostException {
        Settings settings = Settings.builder()
                .put("cluster.name", cluster).build();
        String[] elasticSearchIps = host.split(",");
        InetSocketTransportAddress[] inetSocketTransportAddresses = new InetSocketTransportAddress[elasticSearchIps.length];
        for (int i = 0; i < elasticSearchIps.length; i++) {
            inetSocketTransportAddresses[i] = new InetSocketTransportAddress(InetAddress.getByName(elasticSearchIps[i]), port);
        }
        this.client = new PreBuiltTransportClient(settings)
                .addTransportAddresses(inetSocketTransportAddresses);

        this.bulkProcessor = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId,
                                           BulkRequest request) {}

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          BulkResponse response) {}

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          Throwable failure) {}
                })
                .setBulkActions(10000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
    }

    @Override
    public boolean open(long partitionId, long version) {
        return true;
    }

    @Override
    public void process(Row row) {
        String[] fieldNames = row.schema().fieldNames();
        Seq<String> fieldNamesSeq = JavaConverters.asScalaIteratorConverter(Arrays.asList(fieldNames).iterator()).asScala().toSeq();
        // caution: Scala Map#toString does not produce valid JSON; a proper JSON serializer is needed here
        String jsonDocument = row.getValuesMap(fieldNamesSeq).toString();
        IndexRequest indexRequest = Requests.indexRequest("hello")
                .type("foo")
                .id(row.getAs("id").toString())  // look the id value up by field name; Row.get() takes an index, not a name
                .source(jsonDocument, XContentType.JSON);
        this.bulkProcessor.add(indexRequest);
    }

    @Override
    public void close(Throwable throwable) {
        this.bulkProcessor.close();
        this.client.close();
    }
}

jbaiera (Member, Author) commented May 3, 2018

If you are using the transport client, you will need to construct the client objects lazily and mark them as transient, as they cannot be serialized from the driver to the cluster.
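
To make that concrete, here is a minimal sketch (assuming the same Elasticsearch 5.x transport client as above): only plain configuration strings are serialized with the writer, and the client and bulk processor are rebuilt per partition in open():

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class LazyEsSink extends ForeachWriter<Row> {

    // only simple, serializable configuration travels from the driver to the executors
    private final String cluster;
    private final String hosts;
    private final int port;

    // transient: never serialized; rebuilt on each executor in open()
    private transient TransportClient client;
    private transient BulkProcessor bulkProcessor;

    public LazyEsSink(String cluster, String hosts, int port) {
        this.cluster = cluster;
        this.hosts = hosts;
        this.port = port;
    }

    @Override
    public boolean open(long partitionId, long version) {
        try {
            Settings settings = Settings.builder().put("cluster.name", cluster).build();
            client = new PreBuiltTransportClient(settings);
            for (String host : hosts.split(",")) {
                client.addTransportAddress(
                        new InetSocketTransportAddress(InetAddress.getByName(host), port));
            }
            bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
                @Override public void beforeBulk(long executionId, BulkRequest request) {}
                @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}
                @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
            }).build();
            return true;
        } catch (UnknownHostException e) {
            throw new RuntimeException("Could not resolve Elasticsearch host", e);
        }
    }

    @Override
    public void process(Row row) {
        // same per-row indexing logic as in the EsSink above, ending in bulkProcessor.add(...)
    }

    @Override
    public void close(Throwable errorOrNull) {
        if (bulkProcessor != null) bulkProcessor.close();
        if (client != null) client.close();
    }
}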

kant111 commented May 19, 2018

OK, got it. By the way, I haven't implemented a custom sink before, but I looked at the ES-Hadoop connector code to see how hard it would be to implement update mode. The append mode code already establishes a good structure in terms of implementing the right interfaces. So my question now is: what changes, or how big of a change, would I need to make to implement update mode?

kant111 commented May 20, 2018

The interface below is already implemented in the ES-Hadoop connector, and output modes are handled by Spark; in other words, Spark sends whatever output mode the user sets. It looks to me like we just need to change one if statement, but that feels too good to be true?

trait Sink {
  def addBatch(batchId: Long, data: DataFrame): Unit
}

jbaiera (Member, Author) commented May 21, 2018

@kant111 I'm happy to see that you've taken an interest in the code. That if statement does indeed block the option from being used, but it doesn't do anything to make sure that the Sink implementation observes the invariants set forth by UPDATE mode, or follows what a user might expect from UPDATE mode with regard to Elasticsearch. The following things would need to be in place for us to accept working with UPDATE mode:

First, in UPDATE mode, the underlying connector capabilities should be fine as long as the connector uses the upsert method of ingestion to ES. Alongside this mode, a field must be marked as the document ID in the configuration. To support UPDATE mode, we should check that these settings are present, or even set them in the connector at that point, throwing an error if they already have incompatible values.

Second, for us to support UPDATE mode, we need sufficient testing to make sure that the connector is performing the required operations and sees the expected outcomes. Building these tests can be time consuming, which is why they were left out of the initial implementation.

Finally, there is no plan to support any output modes other than APPEND and (eventually) UPDATE. The COMPLETE output mode (and any future ones) would need to be blocked off, since Elasticsearch would not support that kind of output.
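
For reference, here is a hypothetical sketch of what a streaming write in UPDATE mode could look like once supported, based on the description above. The es.write.operation and es.mapping.id keys are existing ES-Hadoop settings, while the index/type, checkpoint path, and df (an assumed streaming Dataset<Row> with an id column) are placeholders; whether the sink requires these options or sets them itself is exactly the open design question.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;

public class UpdateModeExample {
    static StreamingQuery startUpdateQuery(Dataset<Row> df) throws Exception {
        return df.writeStream()
                .outputMode(OutputMode.Update())                 // currently rejected; APPEND only
                .format("es")                                    // ES-Hadoop structured streaming sink
                .option("checkpointLocation", "/tmp/es-update")  // placeholder path
                .option("es.mapping.id", "id")                   // field used as the document _id
                .option("es.write.operation", "upsert")          // update-or-insert per document
                .start("hello/foo");                             // placeholder index/type
    }
}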

soyme commented Nov 16, 2018

I have been waiting for UPDATE output mode support... 😢 Do you have any release schedule?

toddleo commented May 27, 2019

Bump. Is there any progress here or timetable after a long wait? 😢

masseyke (Member) commented
If anyone on this ticket is still interested (I know it's been a long wait), I've got a draft PR up for this. I'd appreciate any feedback before we finalize it. See #1839

masseyke added a commit that referenced this issue Jan 20, 2022
This commit adds support for "update" as the output mode for Spark Structured Streaming to Elasticsearch.
Closes #1123