Skip to content

Commit 9c02e11

Browse files
authored
[bq] 0.2 introduce BigQuery interactive reader
1 parent 038f973 commit 9c02e11

24 files changed

+791
-101
lines changed

spring-batch-bigquery/pom.xml

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<!--
3-
~ Copyright 2002-2022 the original author or authors.
3+
~ Copyright 2002-2023 the original author or authors.
44
~
55
~ Licensed under the Apache License, Version 2.0 (the "License");
66
~ you may not use this file except in compliance with the License.
@@ -64,7 +64,7 @@
6464
<dependency>
6565
<groupId>com.google.cloud</groupId>
6666
<artifactId>google-cloud-bigquery</artifactId>
67-
<version>2.19.1</version>
67+
<version>2.20.2</version>
6868
</dependency>
6969
<dependency>
7070
<groupId>com.fasterxml.jackson.dataformat</groupId>
@@ -88,13 +88,13 @@
8888
<dependency>
8989
<groupId>org.junit.jupiter</groupId>
9090
<artifactId>junit-jupiter-api</artifactId>
91-
<version>5.9.1</version>
91+
<version>5.9.2</version>
9292
<scope>test</scope>
9393
</dependency>
9494
<dependency>
9595
<groupId>org.mockito</groupId>
9696
<artifactId>mockito-core</artifactId>
97-
<version>4.9.0</version>
97+
<version>5.0.0</version>
9898
<scope>test</scope>
9999
</dependency>
100100
<dependency>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.extensions.bigquery.reader;
18+
19+
import com.google.cloud.bigquery.BigQuery;
20+
import com.google.cloud.bigquery.FieldValueList;
21+
import com.google.cloud.bigquery.QueryJobConfiguration;
22+
import org.apache.commons.logging.Log;
23+
import org.apache.commons.logging.LogFactory;
24+
import org.springframework.batch.item.ItemReader;
25+
import org.springframework.beans.factory.InitializingBean;
26+
import org.springframework.core.convert.converter.Converter;
27+
import org.springframework.util.Assert;
28+
29+
import java.util.Iterator;
30+
import java.util.Objects;
31+
32+
/**
33+
* BigQuery {@link ItemReader} that accepts simple query as the input.
34+
* <p>
35+
* Internally BigQuery Java library creates a {@link com.google.cloud.bigquery.JobConfiguration.Type#QUERY} job.
36+
* Which means that result is coming asynchronously.
37+
* <p>
38+
* Also, worth mentioning that you should take into account concurrency limits.
39+
* <p>
40+
* Results of this query by default are stored in a shape of temporary table.
41+
*
42+
* @param <T> your DTO type
43+
* @author Volodymyr Perebykivskyi
44+
* @since 0.2.0
45+
* @see <a href="https://cloud.google.com/bigquery/docs/running-queries#queries">Interactive queries</a>
46+
* @see <a href="https://cloud.google.com/bigquery/quotas#concurrent_rate_interactive_queries">Concurrency limits</a>
47+
*/
48+
public class BigQueryInteractiveQueryItemReader<T> implements ItemReader<T>, InitializingBean {
49+
50+
private final Log logger = LogFactory.getLog(getClass());
51+
52+
private BigQuery bigQuery;
53+
private Converter<FieldValueList, T> rowMapper;
54+
private QueryJobConfiguration jobConfiguration;
55+
private Iterator<FieldValueList> iterator;
56+
57+
/**
58+
* BigQuery service, responsible for API calls.
59+
*
60+
* @param bigQuery BigQuery service
61+
*/
62+
public void setBigQuery(BigQuery bigQuery) {
63+
this.bigQuery = bigQuery;
64+
}
65+
66+
/**
67+
* Row mapper which transforms single BigQuery row into desired type.
68+
*
69+
* @param rowMapper your row mapper
70+
*/
71+
public void setRowMapper(Converter<FieldValueList, T> rowMapper) {
72+
this.rowMapper = rowMapper;
73+
}
74+
75+
/**
76+
* Specifies query to run, destination table, etc.
77+
*
78+
* @param jobConfiguration BigQuery job configuration
79+
*/
80+
public void setJobConfiguration(QueryJobConfiguration jobConfiguration) {
81+
this.jobConfiguration = jobConfiguration;
82+
}
83+
84+
@Override
85+
public T read() throws Exception {
86+
if (Objects.isNull(iterator)) {
87+
doOpen();
88+
}
89+
90+
if (logger.isDebugEnabled()) {
91+
logger.debug("Reading next element");
92+
}
93+
94+
return iterator.hasNext() ? rowMapper.convert(iterator.next()) : null;
95+
}
96+
97+
private void doOpen() throws Exception {
98+
if (logger.isDebugEnabled()) {
99+
logger.debug("Executing query");
100+
}
101+
iterator = bigQuery.query(jobConfiguration).getValues().iterator();
102+
}
103+
104+
@Override
105+
public void afterPropertiesSet() {
106+
Assert.notNull(this.bigQuery, "BigQuery service must be provided");
107+
Assert.notNull(this.rowMapper, "Row mapper must be provided");
108+
Assert.notNull(this.jobConfiguration, "Job configuration must be provided");
109+
}
110+
111+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.extensions.bigquery.reader.builder;
18+
19+
import com.google.cloud.bigquery.BigQuery;
20+
import com.google.cloud.bigquery.FieldValueList;
21+
import com.google.cloud.bigquery.QueryJobConfiguration;
22+
import org.apache.commons.lang3.StringUtils;
23+
import org.springframework.batch.extensions.bigquery.reader.BigQueryInteractiveQueryItemReader;
24+
import org.springframework.core.convert.converter.Converter;
25+
import org.springframework.util.Assert;
26+
27+
import java.util.Objects;
28+
29+
/**
30+
* A builder for {@link BigQueryInteractiveQueryItemReader}.
31+
*
32+
* @param <T> your DTO type
33+
* @author Volodymyr Perebykivskyi
34+
* @since 0.2.0
35+
* @see <a href="https://github.com/spring-projects/spring-batch-extensions/tree/main/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/reader/builder/BigQueryInteractiveQueryItemReaderBuilderTests.java">Examples</a>
36+
*/
37+
public class BigQueryInteractiveQueryItemReaderBuilder<T> {
38+
39+
private BigQuery bigQuery;
40+
private String query;
41+
private Converter<FieldValueList, T> rowMapper;
42+
private QueryJobConfiguration jobConfiguration;
43+
44+
/**
45+
* BigQuery service, responsible for API calls.
46+
*
47+
* @param bigQuery BigQuery service
48+
* @return {@link BigQueryInteractiveQueryItemReaderBuilder}
49+
* @see BigQueryInteractiveQueryItemReader#setBigQuery(BigQuery)
50+
*/
51+
public BigQueryInteractiveQueryItemReaderBuilder<T> bigQuery(BigQuery bigQuery) {
52+
this.bigQuery = bigQuery;
53+
return this;
54+
}
55+
56+
/**
57+
* Schema of the query: {@code SELECT <column> FROM <dataset>.<table>}.
58+
* <p>
59+
* It is really recommended to use {@code LIMIT n}
60+
* because BigQuery charges you for the amount of data that is being processed.
61+
*
62+
* @param query your query to run
63+
* @return {@link BigQueryInteractiveQueryItemReaderBuilder}
64+
* @see BigQueryInteractiveQueryItemReader#setJobConfiguration(QueryJobConfiguration)
65+
*/
66+
public BigQueryInteractiveQueryItemReaderBuilder<T> query(String query) {
67+
this.query = query;
68+
return this;
69+
}
70+
71+
/**
72+
* Row mapper which transforms single BigQuery row into desired type.
73+
*
74+
* @param rowMapper your row mapper
75+
* @return {@link BigQueryInteractiveQueryItemReaderBuilder}
76+
* @see BigQueryInteractiveQueryItemReader#setRowMapper(Converter)
77+
*/
78+
public BigQueryInteractiveQueryItemReaderBuilder<T> rowMapper(Converter<FieldValueList, T> rowMapper) {
79+
this.rowMapper = rowMapper;
80+
return this;
81+
}
82+
83+
/**
84+
* Specifies query to run, destination table, etc.
85+
*
86+
* @param jobConfiguration BigQuery job configuration
87+
* @return {@link BigQueryInteractiveQueryItemReaderBuilder}
88+
* @see BigQueryInteractiveQueryItemReader#setJobConfiguration(QueryJobConfiguration)
89+
*/
90+
public BigQueryInteractiveQueryItemReaderBuilder<T> jobConfiguration(QueryJobConfiguration jobConfiguration) {
91+
this.jobConfiguration = jobConfiguration;
92+
return this;
93+
}
94+
95+
/**
96+
* Please do not forget about {@link BigQueryInteractiveQueryItemReader#afterPropertiesSet()}.
97+
*
98+
* @return {@link BigQueryInteractiveQueryItemReader}
99+
*/
100+
public BigQueryInteractiveQueryItemReader<T> build() {
101+
BigQueryInteractiveQueryItemReader<T> reader = new BigQueryInteractiveQueryItemReader<>();
102+
103+
reader.setBigQuery(this.bigQuery);
104+
reader.setRowMapper(this.rowMapper);
105+
106+
if (Objects.nonNull(this.jobConfiguration)) {
107+
reader.setJobConfiguration(this.jobConfiguration);
108+
} else {
109+
Assert.isTrue(StringUtils.isNotBlank(this.query), "No query provided");
110+
reader.setJobConfiguration(QueryJobConfiguration.newBuilder(this.query).build());
111+
}
112+
113+
return reader;
114+
}
115+
116+
}

0 commit comments

Comments
 (0)