|
28 | 28 | import static org.junit.Assert.assertTrue;
|
29 | 29 | import static org.junit.Assert.fail;
|
30 | 30 |
|
| 31 | +import com.google.api.client.util.IOUtils; |
31 | 32 | import com.google.api.gax.paging.Page;
|
32 | 33 | import com.google.auth.oauth2.GoogleCredentials;
|
33 | 34 | import com.google.auth.oauth2.ServiceAccountCredentials;
|
|
60 | 61 | import com.google.cloud.bigquery.ConnectionProperty;
|
61 | 62 | import com.google.cloud.bigquery.ConnectionSettings;
|
62 | 63 | import com.google.cloud.bigquery.CopyJobConfiguration;
|
| 64 | +import com.google.cloud.bigquery.CsvOptions; |
63 | 65 | import com.google.cloud.bigquery.Dataset;
|
64 | 66 | import com.google.cloud.bigquery.DatasetId;
|
65 | 67 | import com.google.cloud.bigquery.DatasetInfo;
|
|
141 | 143 | import com.google.gson.JsonObject;
|
142 | 144 | import java.io.IOException;
|
143 | 145 | import java.io.InputStream;
|
| 146 | +import java.io.OutputStream; |
144 | 147 | import java.math.BigDecimal;
|
145 | 148 | import java.nio.ByteBuffer;
|
| 149 | +import java.nio.channels.Channels; |
146 | 150 | import java.nio.charset.StandardCharsets;
|
| 151 | +import java.nio.file.FileSystems; |
| 152 | +import java.nio.file.Path; |
147 | 153 | import java.sql.ResultSet;
|
148 | 154 | import java.sql.SQLException;
|
149 | 155 | import java.sql.Time;
|
@@ -711,6 +717,36 @@ public class ITBigQueryTest {
|
711 | 717 | private static final List<ConnectionProperty> CONNECTION_PROPERTIES =
|
712 | 718 | ImmutableList.of(CONNECTION_PROPERTY);
|
713 | 719 |
|
| 720 | + private static final Field ID_SCHEMA = |
| 721 | + Field.newBuilder("id", LegacySQLTypeName.STRING) |
| 722 | + .setMode(Mode.REQUIRED) |
| 723 | + .setDescription("id") |
| 724 | + .build(); |
| 725 | + private static final Field FIRST_NAME_SCHEMA = |
| 726 | + Field.newBuilder("firstname", LegacySQLTypeName.STRING) |
| 727 | + .setMode(Field.Mode.NULLABLE) |
| 728 | + .setDescription("First Name") |
| 729 | + .build(); |
| 730 | + private static final Field LAST_NAME_SCHEMA = |
| 731 | + Field.newBuilder("lastname", LegacySQLTypeName.STRING) |
| 732 | + .setMode(Field.Mode.NULLABLE) |
| 733 | + .setDescription("LAST NAME") |
| 734 | + .build(); |
| 735 | + private static final Field EMAIL_SCHEMA = |
| 736 | + Field.newBuilder("email", LegacySQLTypeName.STRING) |
| 737 | + .setMode(Field.Mode.NULLABLE) |
| 738 | + .setDescription("email") |
| 739 | + .build(); |
| 740 | + private static final Field PROFESSION_SCHEMA = |
| 741 | + Field.newBuilder("profession", LegacySQLTypeName.STRING) |
| 742 | + .setMode(Field.Mode.NULLABLE) |
| 743 | + .setDescription("profession") |
| 744 | + .build(); |
| 745 | + private static final Schema SESSION_TABLE_SCHEMA = |
| 746 | + Schema.of(ID_SCHEMA, FIRST_NAME_SCHEMA, LAST_NAME_SCHEMA, EMAIL_SCHEMA, PROFESSION_SCHEMA); |
| 747 | + private static final Path csvPath = |
| 748 | + FileSystems.getDefault().getPath("src/test/resources", "sessionTest.csv").toAbsolutePath(); |
| 749 | + |
714 | 750 | private static final Set<String> PUBLIC_DATASETS =
|
715 | 751 | ImmutableSet.of("github_repos", "hacker_news", "noaa_gsod", "samples", "usa_names");
|
716 | 752 |
|
@@ -3733,6 +3769,80 @@ public void testQuerySessionSupport() throws InterruptedException {
|
3733 | 3769 | assertEquals(sessionId, statisticsWithSession.getSessionInfo().getSessionId());
|
3734 | 3770 | }
|
3735 | 3771 |
|
| 3772 | + @Test |
| 3773 | + public void testLoadSessionSupportWriteChannelConfiguration() throws InterruptedException { |
| 3774 | + TableId sessionTableId = TableId.of("_SESSION", "test_temp_destination_table_from_file"); |
| 3775 | + |
| 3776 | + WriteChannelConfiguration configuration = |
| 3777 | + WriteChannelConfiguration.newBuilder(sessionTableId) |
| 3778 | + .setFormatOptions(CsvOptions.newBuilder().setFieldDelimiter(",").build()) |
| 3779 | + .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED) |
| 3780 | + .setSchema(SESSION_TABLE_SCHEMA) |
| 3781 | + .setCreateSession(true) |
| 3782 | + .build(); |
| 3783 | + String jobName = "jobId_" + UUID.randomUUID().toString(); |
| 3784 | + JobId jobId = JobId.newBuilder().setLocation("us").setJob(jobName).build(); |
| 3785 | + String sessionId; |
| 3786 | + |
| 3787 | + // Imports a local file into a table. |
| 3788 | + try (TableDataWriteChannel writer = bigquery.writer(jobId, configuration); |
| 3789 | + OutputStream stream = Channels.newOutputStream(writer)) { |
| 3790 | + InputStream inputStream = |
| 3791 | + ITBigQueryTest.class.getClassLoader().getResourceAsStream("sessionTest.csv"); |
| 3792 | + // Can use `Files.copy(csvPath, stream);` instead. |
| 3793 | + // Using IOUtils here because graalvm can't handle resource files. |
| 3794 | + IOUtils.copy(inputStream, stream); |
| 3795 | + |
| 3796 | + } catch (IOException e) { |
| 3797 | + throw new RuntimeException(e); |
| 3798 | + } |
| 3799 | + Job loadJob = bigquery.getJob(jobId); |
| 3800 | + Job completedJob = loadJob.waitFor(); |
| 3801 | + |
| 3802 | + assertNotNull(completedJob); |
| 3803 | + assertEquals(jobId.getJob(), completedJob.getJobId().getJob()); |
| 3804 | + JobStatistics.LoadStatistics statistics = completedJob.getStatistics(); |
| 3805 | + |
| 3806 | + sessionId = statistics.getSessionInfo().getSessionId(); |
| 3807 | + assertNotNull(sessionId); |
| 3808 | + |
| 3809 | + // Load job in the same session. |
| 3810 | + // Should load the data to a temp table. |
| 3811 | + ConnectionProperty sessionConnectionProperty = |
| 3812 | + ConnectionProperty.newBuilder().setKey("session_id").setValue(sessionId).build(); |
| 3813 | + WriteChannelConfiguration sessionConfiguration = |
| 3814 | + WriteChannelConfiguration.newBuilder(sessionTableId) |
| 3815 | + .setConnectionProperties(ImmutableList.of(sessionConnectionProperty)) |
| 3816 | + .setFormatOptions(CsvOptions.newBuilder().setFieldDelimiter(",").build()) |
| 3817 | + .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED) |
| 3818 | + .setSchema(SESSION_TABLE_SCHEMA) |
| 3819 | + .build(); |
| 3820 | + String sessionJobName = "jobId_" + UUID.randomUUID().toString(); |
| 3821 | + JobId sessionJobId = JobId.newBuilder().setLocation("us").setJob(sessionJobName).build(); |
| 3822 | + try (TableDataWriteChannel writer = bigquery.writer(sessionJobId, sessionConfiguration); |
| 3823 | + OutputStream stream = Channels.newOutputStream(writer)) { |
| 3824 | + InputStream inputStream = |
| 3825 | + ITBigQueryTest.class.getClassLoader().getResourceAsStream("sessionTest.csv"); |
| 3826 | + IOUtils.copy(inputStream, stream); |
| 3827 | + } catch (IOException e) { |
| 3828 | + throw new RuntimeException(e); |
| 3829 | + } |
| 3830 | + Job queryJobWithSession = bigquery.getJob(sessionJobId); |
| 3831 | + queryJobWithSession = queryJobWithSession.waitFor(); |
| 3832 | + LoadStatistics statisticsWithSession = queryJobWithSession.getStatistics(); |
| 3833 | + assertNotNull(statisticsWithSession.getSessionInfo().getSessionId()); |
| 3834 | + |
| 3835 | + // Checking if the data loaded to the temp table in the session |
| 3836 | + String queryTempTable = "SELECT * FROM _SESSION.test_temp_destination_table_from_file;"; |
| 3837 | + QueryJobConfiguration queryJobConfigurationWithSession = |
| 3838 | + QueryJobConfiguration.newBuilder(queryTempTable) |
| 3839 | + .setConnectionProperties(ImmutableList.of(sessionConnectionProperty)) |
| 3840 | + .build(); |
| 3841 | + Job queryTempTableJob = bigquery.create(JobInfo.of(queryJobConfigurationWithSession)); |
| 3842 | + queryTempTableJob = queryTempTableJob.waitFor(); |
| 3843 | + assertNotNull(queryTempTableJob.getQueryResults()); |
| 3844 | + } |
| 3845 | + |
3736 | 3846 | @Test
|
3737 | 3847 | public void testLoadSessionSupport() throws InterruptedException {
|
3738 | 3848 | // Start the session
|
|
0 commit comments