
[bq] refactoring & dependency upgrade #141


Merged
merged 1 commit on Aug 18, 2024
6 changes: 3 additions & 3 deletions .github/workflows/spring-batch-bigquery.yml
@@ -11,13 +11,13 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout source code
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Set up JDK 17
uses: actions/setup-java@v3
uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: 17
cache: 'maven'
- name: Build with Maven
run: mvn -B package --file pom.xml
working-directory: spring-batch-bigquery
working-directory: spring-batch-bigquery
22 changes: 12 additions & 10 deletions spring-batch-bigquery/README.adoc
@@ -1,12 +1,14 @@
# spring-batch-bigquery
= spring-batch-bigquery

Spring Batch extension which contains an `ItemWriter` implementation for https://cloud.google.com/bigquery[BigQuery] based on https://github.com/googleapis/java-bigquery[Java BigQuery]. It supports writing https://en.wikipedia.org/wiki/Comma-separated_values[CSV], https://en.wikipedia.org/wiki/JSON[JSON] using https://cloud.google.com/bigquery/docs/batch-loading-data[load jobs].
Spring Batch extension which contains an `ItemWriter` implementation for https://cloud.google.com/bigquery[BigQuery] based on https://github.com/googleapis/java-bigquery[Java BigQuery].
It supports writing https://en.wikipedia.org/wiki/Comma-separated_values[CSV] and https://en.wikipedia.org/wiki/JSON[JSON] using https://cloud.google.com/bigquery/docs/batch-loading-data[load jobs].

## Configuration of `BigQueryCsvItemWriter`
== Configuration of `BigQueryCsvItemWriter`

In addition to the https://docs.spring.io/spring-batch/reference/html/configureJob.html[configuration of Spring Batch], one needs to configure the `BigQueryCsvItemWriter`.

```java
[source,java]
----
@Bean
BigQueryCsvItemWriter<MyDto> bigQueryCsvWriter() {
WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
@@ -20,19 +22,19 @@ BigQueryCsvItemWriter<MyDto> bigQueryCsvWriter() {
.writeChannelConfig(writeConfiguration)
.build();
}
```
----

Additional examples can be found https://github.com/spring-projects/spring-batch-extensions/blob/main/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/writer/builder/[here].

## Configuration properties
== Configuration properties
[cols="1,1,4"]
.Properties for item writer
.Properties for an item writer
|===
| Property | Required | Description

| `bigQuery` | yes | BigQuery object that is provided by the BigQuery Java Library. Responsible for the connection to BigQuery.
| `writeChannelConfig` | yes | BigQuery write channel config provided by the BigQuery Java Library. Responsible for configuring the data type, data channel, and jobs that will be sent to BigQuery.
| `rowMapper` | no | Your own converter that specifies how to convert input CSV / JSON to byte array.
| `datasetInfo` | no | Your way to customize to how to create BigQuery dataset.
| `rowMapper` | no | Your own converter that specifies how to convert input CSV / JSON to a byte array.
| `datasetInfo` | no | Your way to customize how to create BigQuery dataset.
| `jobConsumer` | no | Your custom handler for the BigQuery Job provided by the BigQuery Java Library.
|===
|===
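
Putting the pieces above together, a complete bean definition might look like the sketch below. It is illustrative only: the dataset and table names, the `MyDto` type, the default-credentials client, and the `BigQueryCsvItemWriterBuilder` import path are assumptions made for the example.

[source,java]
----
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter;
import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryCsvItemWriterBuilder;
import org.springframework.context.annotation.Bean;

// Hypothetical dataset/table names and DTO; adjust to your project.
@Bean
BigQueryCsvItemWriter<MyDto> bigQueryCsvWriter() {
    // Client built from default credentials; any BigQuery instance works.
    BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();

    // CSV load job targeting my_dataset.my_csv_table.
    WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
            .newBuilder(TableId.of("my_dataset", "my_csv_table"))
            .setFormatOptions(FormatOptions.csv())
            .build();

    return new BigQueryCsvItemWriterBuilder<MyDto>()
            .bigQuery(bigQuery)
            .writeChannelConfig(writeConfiguration)
            .build();
}
----

The JSON variant is analogous, swapping in the JSON format options and the corresponding writer builder.
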
37 changes: 17 additions & 20 deletions spring-batch-bigquery/pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2002-2023 the original author or authors.
~ Copyright 2002-2024 the original author or authors.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
@@ -13,7 +13,9 @@
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
--><project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

@@ -51,50 +53,45 @@

<!-- Dependent on Spring Batch core -->
<java.version>17</java.version>
<logback.version>1.4.14</logback.version>
<logback.version>1.5.7</logback.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>5.1.0</version>
<version>5.1.2</version>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>2.35.0</version>
<version>2.42.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
<version>2.16.0</version>
<version>2.17.2</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
<version>3.16.0</version>
</dependency>


<!-- Test Dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.10.1</version>
<version>5.11.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.8.0</version>
<version>5.12.0</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -112,7 +109,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
<version>2.0.16</version>
<scope>test</scope>
</dependency>

@@ -125,7 +122,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<version>3.13.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
@@ -136,7 +133,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.2</version>
<version>3.3.1</version>
<configuration>
<includes>
<!-- Integration tests are omitted because they are designed to be run locally -->
@@ -149,7 +146,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.6.3</version>
<version>3.8.0</version>
<executions>
<execution>
<id>attach-javadocs</id>
@@ -162,7 +159,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.3.0</version>
<version>3.3.1</version>
<executions>
<execution>
<id>attach-sources</id>
@@ -175,4 +172,4 @@
</plugins>
</build>

</project>
</project>
BigQueryQueryItemReader.java
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,7 +27,6 @@
import org.springframework.util.Assert;

import java.util.Iterator;
import java.util.Objects;

/**
* BigQuery {@link ItemReader} that accepts simple query as the input.
@@ -37,7 +36,7 @@
* <p>
* Also, worth mentioning that you should take into account concurrency limits.
* <p>
* Results of this query by default are stored in a shape of temporary table.
* Results of this query by default are stored in the shape of a temporary table.
*
* @param <T> your DTO type
* @author Volodymyr Perebykivskyi
@@ -84,7 +83,7 @@ public void setJobConfiguration(QueryJobConfiguration jobConfiguration) {

@Override
public T read() throws Exception {
if (Objects.isNull(iterator)) {
if (iterator == null) {
doOpen();
}

@@ -109,4 +108,4 @@ public void afterPropertiesSet() {
Assert.notNull(this.jobConfiguration, "Job configuration must be provided");
}

}
}
BigQueryQueryItemReaderBuilder.java
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,8 +24,6 @@
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;

import java.util.Objects;

/**
* A builder for {@link BigQueryQueryItemReader}.
*
@@ -70,7 +68,7 @@ public BigQueryQueryItemReaderBuilder<T> query(String query) {
}

/**
* Row mapper which transforms single BigQuery row into desired type.
* Row mapper which transforms single BigQuery row into a desired type.
*
* @param rowMapper your row mapper
* @return {@link BigQueryQueryItemReaderBuilder}
@@ -94,7 +92,7 @@ public BigQueryQueryItemReaderBuilder<T> jobConfiguration(QueryJobConfiguration
}

/**
* Please do not forget about {@link BigQueryQueryItemReader#afterPropertiesSet()}.
* Please remember about {@link BigQueryQueryItemReader#afterPropertiesSet()}.
*
* @return {@link BigQueryQueryItemReader}
*/
@@ -104,14 +102,14 @@ public BigQueryQueryItemReader<T> build() {
reader.setBigQuery(this.bigQuery);
reader.setRowMapper(this.rowMapper);

if (Objects.nonNull(this.jobConfiguration)) {
reader.setJobConfiguration(this.jobConfiguration);
} else {
if (this.jobConfiguration == null) {
Assert.isTrue(StringUtils.isNotBlank(this.query), "No query provided");
reader.setJobConfiguration(QueryJobConfiguration.newBuilder(this.query).build());
} else {
reader.setJobConfiguration(this.jobConfiguration);
}

return reader;
}

}
}
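
Since `build()` now branches on whether a `jobConfiguration` was supplied, a short usage sketch may help. The `PersonDto` type, the query, and the exact shape of the row mapper (a `Converter` over one BigQuery row) are assumptions made for illustration, as is the `bigQuery(...)` builder method.

[source,java]
----
// PersonDto is a hypothetical record(String name, long age).
BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();

BigQueryQueryItemReader<PersonDto> reader = new BigQueryQueryItemReaderBuilder<PersonDto>()
        .bigQuery(bigQuery)
        // No explicit QueryJobConfiguration, so build() wraps this query in a default one.
        .query("SELECT name, age FROM my_dataset.persons")
        // Row mapper: convert one BigQuery row (FieldValueList) into the DTO.
        .rowMapper(row -> new PersonDto(row.get("name").getStringValue(), row.get("age").getLongValue()))
        .build();

// As the builder javadoc points out, afterPropertiesSet() still has to be invoked
// (or the reader registered as a Spring bean so the container calls it).
reader.afterPropertiesSet();
----
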
BigQueryBaseItemWriter.java
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -54,6 +54,7 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {

/** Logger that can be reused */
protected final Log logger = LogFactory.getLog(getClass());

private final AtomicLong bigQueryWriteCounter = new AtomicLong();

/**
@@ -77,7 +78,7 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {


/**
* Fetches table from provided configuration.
* Fetches table from the provided configuration.
*
* @return {@link Table} that is described in {@link BigQueryBaseItemWriter#writeChannelConfig}
*/
@@ -123,7 +124,7 @@ public void setBigQuery(BigQuery bigQuery) {

@Override
public void write(Chunk<? extends T> chunk) throws Exception {
if (BooleanUtils.isFalse(chunk.isEmpty())) {
if (!chunk.isEmpty()) {
List<? extends T> items = chunk.getItems();
doInitializeProperties(items);

@@ -147,8 +148,8 @@ private ByteBuffer mapDataToBigQueryFormat(List<? extends T> items) throws IOExc
}

/*
* It is extremely important to create larger ByteBuffer,
* if you call TableDataWriteChannel too many times, it leads to BigQuery exceptions.
* It is extremely important to create larger ByteBuffer.
* If you call TableDataWriteChannel too many times, it leads to BigQuery exceptions.
*/
byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
}
@@ -170,9 +171,9 @@ private void doWriteDataToBigQuery(ByteBuffer byteBuffer) throws IOException {
finally {
String logMessage = "Write operation submitted: " + bigQueryWriteCounter.incrementAndGet();

if (Objects.nonNull(writeChannel)) {
if (writeChannel != null) {
logMessage += " -- Job ID: " + writeChannel.getJob().getJobId().getJob();
if (Objects.nonNull(this.jobConsumer)) {
if (this.jobConsumer != null) {
this.jobConsumer.accept(writeChannel.getJob());
}
}
@@ -212,7 +213,7 @@ protected void baseAfterPropertiesSet(Supplier<Void> formatSpecificChecks) {
Assert.notNull(this.writeChannelConfig.getFormat(), "Data format must be provided");

String dataset = this.writeChannelConfig.getDestinationTable().getDataset();
if (Objects.isNull(this.datasetInfo)) {
if (this.datasetInfo == null) {
this.datasetInfo = DatasetInfo.newBuilder(dataset).build();
}

@@ -228,13 +229,11 @@ private void createDataset() {
TableId tableId = this.writeChannelConfig.getDestinationTable();
String datasetToCheck = tableId.getDataset();

if (Objects.nonNull(datasetToCheck)) {
if (datasetToCheck != null) {
Dataset foundDataset = this.bigQuery.getDataset(datasetToCheck);

if (Objects.isNull(foundDataset)) {
if (Objects.nonNull(this.datasetInfo)) {
this.bigQuery.create(this.datasetInfo);
}
if (foundDataset == null && this.datasetInfo != null) {
this.bigQuery.create(this.datasetInfo);
}
}
}
@@ -264,7 +263,7 @@ private boolean isDatastore() {
}

/**
* Schema can be computed on BigQuery side during upload,
* Schema can be computed on the BigQuery side during upload,
* so it is good to know when schema is supplied by user manually.
*
* @param table BigQuery table
@@ -287,12 +286,12 @@ protected boolean tableHasDefinedSchema(Table table) {
protected abstract void doInitializeProperties(List<? extends T> items);

/**
* Converts chunk into byte array.
* Converts chunk into a byte array.
* Each data type should be converted with respect to its specification.
*
* @param items current chunk
* @return {@link List<byte[]>} converted list of byte arrays
*/
protected abstract List<byte[]> convertObjectsToByteArrays(List<? extends T> items);

}
}
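
As a usage note on the optional hooks touched above, a concrete writer can be customized roughly as in the sketch below; the setter names are inferred from the `datasetInfo` and `jobConsumer` fields and are an assumption, not taken from this diff, and `writer` stands for a `BigQueryCsvItemWriter` built as in the README example.

[source,java]
----
// Hypothetical customization; setter names inferred from the datasetInfo / jobConsumer properties.
writer.setDatasetInfo(DatasetInfo.newBuilder("my_dataset").setLocation("EU").build());
writer.setJobConsumer(job ->
        System.out.println("BigQuery load job submitted: " + job.getJobId().getJob()));
----
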