Support UPDATE output mode for Spark Structured Streaming #1123
Can we make this backward compatible as well? That way users could use this feature with ES >= 5.5 or so.
While the project's official stance is that we maintain backwards compatibility with the most recent minor release of the previous major, we make our best effort to stay compatible with as many versions of Elasticsearch as we can.
Any sort of timeline or release number on this?
We usually do not commit to timelines or release targets on public issues.
@jbaiera can I use
That seems like a reasonable workaround in the meantime.
@jbaiera Thanks. Can you please provide some more details on how to do that? For example, should I use the elasticsearch-hadoop connector or the Elasticsearch Java driver? Any examples? I don't see anything in the docs on how to do this. I am trying to write a streaming Dataset to Elasticsearch in update mode, currently using Spark 2.3.0 and ES 5.3.2.
This very much depends on your use case. In my experience Spark does not offer much in the way of solid documentation for writing your own sinks; much of the current ES-Hadoop implementation was the product of a month of trial and error and reverse engineering existing sink implementations. If you don't much care about the expectations around transaction acknowledgement and skipping acked transactions during a recovery, then the basic Foreach sink should be usable for your case.

I understand that the current Java REST client can sometimes cause issues in Hadoop/Spark environments because of clashing dependencies; ES-Hadoop has its own REST client implementation that it uses to get around this. I don't recommend building on the ES-Hadoop internals, as they are not covered by our semantic versioning scheme and may break between any two releases without notice. Because of this, I recommend using the available Elasticsearch clients if you are going to build a workaround.
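For context, a minimal sketch of the Foreach sink approach mentioned above, assuming a simple id-plus-JSON row type and ES 5.x's `_update` endpoint with `doc_as_upsert`. Everything here (the `Event` type, the writer class, index and URL) is illustrative, not the ES-Hadoop implementation; unconditionally returning true from `open` gives at-least-once delivery, i.e. it skips the acked-batch bookkeeping discussed above.

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import org.apache.spark.sql.ForeachWriter

// Hypothetical row type for the stream; adjust to your schema.
case class Event(id: String, json: String)

class EsUpsertWriter(esUrl: String, index: String, docType: String)
    extends ForeachWriter[Event] {

  // Called once per partition per epoch. A stricter implementation would
  // use (partitionId, version) to skip batches it has already acknowledged.
  override def open(partitionId: Long, version: Long): Boolean = true

  // Issue a doc-as-upsert so re-delivered rows simply overwrite themselves.
  override def process(value: Event): Unit = {
    val conn = new URL(s"$esUrl/$index/$docType/${value.id}/_update")
      .openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    val body = s"""{"doc": ${value.json}, "doc_as_upsert": true}"""
    conn.getOutputStream.write(body.getBytes(StandardCharsets.UTF_8))
    if (conn.getResponseCode >= 300) sys.error(s"Upsert failed: ${conn.getResponseCode}")
    conn.disconnect()
  }

  override def close(errorOrNull: Throwable): Unit = ()
}
```

Hooked up with `ds.writeStream.outputMode("update").foreach(new EsUpsertWriter("http://localhost:9200", "myindex", "doc")).start()`.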
@jbaiera 1) I don't see any REST client for Elasticsearch 5.3.2. 2) When I used the BulkProcessor from the Java Transport client to write to ES, I got a Task not serializable exception. Here is the code
If you are using the Transport client, you will need to construct the client objects lazily and mark them as transient, as they cannot be serialized from the driver to the cluster.
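A sketch of that pattern, assuming the ES 5.x Transport client artifact names from memory (`PreBuiltTransportClient`, `BulkProcessor`); host, port, and batch size are placeholders. A Scala object is never shipped with the closure, so each executor JVM initializes the client on first use instead of trying to deserialize one from the driver.

```scala
import java.net.InetAddress
import org.elasticsearch.action.bulk.{BulkProcessor, BulkRequest, BulkResponse}
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient

// Holding the clients in an object avoids the Task-not-serializable failure.
// (Equivalently, use a @transient lazy val in the class Spark serializes.)
object EsClients {
  lazy val client = new PreBuiltTransportClient(Settings.EMPTY)
    .addTransportAddress(
      new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))

  lazy val bulk: BulkProcessor = BulkProcessor.builder(client,
    new BulkProcessor.Listener {
      def beforeBulk(id: Long, req: BulkRequest): Unit = ()
      def afterBulk(id: Long, req: BulkRequest, resp: BulkResponse): Unit = ()
      def afterBulk(id: Long, req: BulkRequest, failure: Throwable): Unit = ()
    }).setBulkActions(1000).build()
}
```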
Ok, got it! Btw, I haven't implemented a custom sink before, but I looked at the ES-Hadoop connector code to see how hard it would be to implement the update mode. I feel like the interface below is already implemented in the ES-Hadoop connector, and output modes are taken care of by Spark; in other words, Spark sends along whatever output mode a user sets. So it looks to me that we just need to change this one if-statement.
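The interface being referred to is presumably Spark's internal streaming Sink trait, which in Spark 2.x looks roughly like this (paraphrased for context; it lives in `org.apache.spark.sql.execution.streaming` and is not covered by Spark's compatibility guarantees):

```scala
import org.apache.spark.sql.DataFrame

trait Sink {
  // Invoked once per micro-batch. Implementations must tolerate a batchId
  // being replayed after a failure, i.e. deduplicate or overwrite safely.
  def addBatch(batchId: Long, data: DataFrame): Unit
}
```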
@kant111 I'm happy to see that you've taken an interest in the code. That if-statement indeed blocks off the option itself from being used, but it does nothing to make sure that the Sink implementation observes the invariants set forth by UPDATE mode, or follows what a user might expect from UPDATE mode with regard to Elasticsearch. The following things would need to be in place to accept UPDATE mode:

First, in UPDATE mode, the underlying connector capabilities should be fine as long as it uses the upsert method of ingestion to ES. Alongside this mode, a field must be marked as the document ID in the configuration. To support UPDATE mode, the connector should check that these settings are present, or even set them itself at that point, throwing an error if they already have incompatible values.

Second, we need sufficient testing to make sure that the connector performs the required operations and produces the expected outcomes. Building these tests can be time consuming, which is why they were left out of the initial implementation.

Finally, there is no plan to support any output modes other than APPEND and (eventually) UPDATE. The COMPLETE output mode (and any future ones) would need to be blocked off, since Elasticsearch would not support that kind of output mode.
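To make those requirements concrete, a hedged sketch of the kind of guard being described. `es.mapping.id` and `es.write.operation` are real ES-Hadoop settings, but the helper itself is hypothetical and is not the connector's actual code:

```scala
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical validation helper, for illustration only.
def checkOutputMode(mode: OutputMode, settings: Map[String, String]): Unit = {
  if (mode == OutputMode.Append()) {
    // Supported today: plain indexing is sufficient.
  } else if (mode == OutputMode.Update()) {
    // UPDATE must upsert against a user-supplied document ID.
    require(settings.get("es.mapping.id").exists(_.nonEmpty),
      "UPDATE output mode requires es.mapping.id to be set")
    require(settings.getOrElse("es.write.operation", "upsert") == "upsert",
      "UPDATE output mode requires es.write.operation=upsert")
  } else {
    // COMPLETE (and any future modes) cannot be expressed as ES writes.
    throw new UnsupportedOperationException(s"Output mode $mode is not supported")
  }
}
```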
I have been waiting for UPDATE output mode support.. 😢 Do you have any release schedule?
Bump. Is there any progress here, or a timetable, after the long wait? 😢
If anyone on this ticket is still interested (I know it's been a long wait), I've got a draft PR up for this. I'd appreciate any feedback before we finalize it. See #1839 |
This commit adds support for "update" as the output mode for Spark Structured Streaming to Elasticsearch. Closes #1123
We now support Spark Structured Streaming as of 6.x, but only in APPEND mode. It would be beneficial to support UPDATE mode, with the requirement that the user MUST provide a field to be used for a document ID.
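Once supported, end-user code would presumably look something like the sketch below. The option names are today's ES-Hadoop settings; the index name, ID field, and rate source are placeholders, and the exact final form depends on the PR.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("es-update-demo").getOrCreate()

// Any streaming DataFrame with an `id` column; the rate source keeps the demo small.
val events = spark.readStream.format("rate").load()
  .withColumnRenamed("value", "id")

val query = events.writeStream
  .outputMode("update")                         // the mode this issue asks for
  .format("es")                                 // ES-Hadoop's short source name
  .option("checkpointLocation", "/tmp/es-ckpt") // required by structured streaming
  .option("es.mapping.id", "id")                // the mandatory document-ID field
  .option("es.write.operation", "upsert")       // updates become upserts
  .start("myindex/doc")

query.awaitTermination()
```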