15
15
# KIND, either express or implied. See the License for the
16
16
# specific language governing permissions and limitations
17
17
# under the License.
18
- #
19
18
"""This module contains a Google Cloud Storage hook."""
19
+ from __future__ import annotations
20
+
20
21
import functools
21
22
import gzip as gz
22
23
import os
28
29
from io import BytesIO
29
30
from os import path
30
31
from tempfile import NamedTemporaryFile
31
- from typing import (
32
- IO ,
33
- Callable ,
34
- Generator ,
35
- List ,
36
- Optional ,
37
- Sequence ,
38
- Set ,
39
- Tuple ,
40
- TypeVar ,
41
- Union ,
42
- cast ,
43
- overload ,
44
- )
32
+ from typing import IO , Callable , Generator , Sequence , TypeVar , cast , overload
45
33
from urllib .parse import urlparse
46
34
47
35
from google .api_core .exceptions import NotFound
60
48
RT = TypeVar ('RT' )
61
49
T = TypeVar ("T" , bound = Callable )
62
50
51
+ # GCSHook has a method named 'list' (to junior devs: please don't do this), so
52
+ # we need to create an alias to prevent Mypy being confused.
53
+ List = list
54
+
63
55
# Use default timeout from google-cloud-storage
64
56
DEFAULT_TIMEOUT = 60
65
57
@@ -80,7 +72,7 @@ def _fallback_object_url_to_object_name_and_bucket_name(
80
72
81
73
def _wrapper (func : T ):
82
74
@functools .wraps (func )
83
- def _inner_wrapper (self : " GCSHook" , * args , ** kwargs ) -> RT :
75
+ def _inner_wrapper (self : GCSHook , * args , ** kwargs ) -> RT :
84
76
if args :
85
77
raise AirflowException (
86
78
"You must use keyword arguments in this methods rather than positional"
@@ -139,13 +131,13 @@ class GCSHook(GoogleBaseHook):
139
131
connection.
140
132
"""
141
133
142
- _conn = None # type: Optional[ storage.Client]
134
+ _conn : storage .Client | None = None
143
135
144
136
def __init__ (
145
137
self ,
146
138
gcp_conn_id : str = "google_cloud_default" ,
147
- delegate_to : Optional [ str ] = None ,
148
- impersonation_chain : Optional [ Union [ str , Sequence [str ]]] = None ,
139
+ delegate_to : str | None = None ,
140
+ impersonation_chain : str | Sequence [str ] | None = None ,
149
141
) -> None :
150
142
super ().__init__ (
151
143
gcp_conn_id = gcp_conn_id ,
@@ -166,8 +158,8 @@ def copy(
166
158
self ,
167
159
source_bucket : str ,
168
160
source_object : str ,
169
- destination_bucket : Optional [ str ] = None ,
170
- destination_object : Optional [ str ] = None ,
161
+ destination_bucket : str | None = None ,
162
+ destination_object : str | None = None ,
171
163
) -> None :
172
164
"""
173
165
Copies an object from a bucket to another, with renaming if requested.
@@ -215,7 +207,7 @@ def rewrite(
215
207
source_bucket : str ,
216
208
source_object : str ,
217
209
destination_bucket : str ,
218
- destination_object : Optional [ str ] = None ,
210
+ destination_object : str | None = None ,
219
211
) -> None :
220
212
"""
221
213
Has the same functionality as copy, except that will work on files
@@ -270,9 +262,9 @@ def download(
270
262
bucket_name : str ,
271
263
object_name : str ,
272
264
filename : None = None ,
273
- chunk_size : Optional [ int ] = None ,
274
- timeout : Optional [ int ] = DEFAULT_TIMEOUT ,
275
- num_max_attempts : Optional [ int ] = 1 ,
265
+ chunk_size : int | None = None ,
266
+ timeout : int | None = DEFAULT_TIMEOUT ,
267
+ num_max_attempts : int | None = 1 ,
276
268
) -> bytes :
277
269
...
278
270
@@ -282,21 +274,21 @@ def download(
282
274
bucket_name : str ,
283
275
object_name : str ,
284
276
filename : str ,
285
- chunk_size : Optional [ int ] = None ,
286
- timeout : Optional [ int ] = DEFAULT_TIMEOUT ,
287
- num_max_attempts : Optional [ int ] = 1 ,
277
+ chunk_size : int | None = None ,
278
+ timeout : int | None = DEFAULT_TIMEOUT ,
279
+ num_max_attempts : int | None = 1 ,
288
280
) -> str :
289
281
...
290
282
291
283
def download (
292
284
self ,
293
285
bucket_name : str ,
294
286
object_name : str ,
295
- filename : Optional [ str ] = None ,
296
- chunk_size : Optional [ int ] = None ,
297
- timeout : Optional [ int ] = DEFAULT_TIMEOUT ,
298
- num_max_attempts : Optional [ int ] = 1 ,
299
- ) -> Union [ str , bytes ] :
287
+ filename : str | None = None ,
288
+ chunk_size : int | None = None ,
289
+ timeout : int | None = DEFAULT_TIMEOUT ,
290
+ num_max_attempts : int | None = 1 ,
291
+ ) -> str | bytes :
300
292
"""
301
293
Downloads a file from Google Cloud Storage.
302
294
@@ -351,9 +343,9 @@ def download_as_byte_array(
351
343
self ,
352
344
bucket_name : str ,
353
345
object_name : str ,
354
- chunk_size : Optional [ int ] = None ,
355
- timeout : Optional [ int ] = DEFAULT_TIMEOUT ,
356
- num_max_attempts : Optional [ int ] = 1 ,
346
+ chunk_size : int | None = None ,
347
+ timeout : int | None = DEFAULT_TIMEOUT ,
348
+ num_max_attempts : int | None = 1 ,
357
349
) -> bytes :
358
350
"""
359
351
Downloads a file from Google Cloud Storage.
@@ -383,9 +375,9 @@ def download_as_byte_array(
383
375
def provide_file (
384
376
self ,
385
377
bucket_name : str = PROVIDE_BUCKET ,
386
- object_name : Optional [ str ] = None ,
387
- object_url : Optional [ str ] = None ,
388
- dir : Optional [ str ] = None ,
378
+ object_name : str | None = None ,
379
+ object_url : str | None = None ,
380
+ dir : str | None = None ,
389
381
) -> Generator [IO [bytes ], None , None ]:
390
382
"""
391
383
Downloads the file to a temporary directory and returns a file handle
@@ -412,8 +404,8 @@ def provide_file(
412
404
def provide_file_and_upload (
413
405
self ,
414
406
bucket_name : str = PROVIDE_BUCKET ,
415
- object_name : Optional [ str ] = None ,
416
- object_url : Optional [ str ] = None ,
407
+ object_name : str | None = None ,
408
+ object_url : str | None = None ,
417
409
) -> Generator [IO [bytes ], None , None ]:
418
410
"""
419
411
Creates temporary file, returns a file handle and uploads the files content
@@ -440,15 +432,15 @@ def upload(
440
432
self ,
441
433
bucket_name : str ,
442
434
object_name : str ,
443
- filename : Optional [ str ] = None ,
444
- data : Optional [ Union [ str , bytes ]] = None ,
445
- mime_type : Optional [ str ] = None ,
435
+ filename : str | None = None ,
436
+ data : str | bytes | None = None ,
437
+ mime_type : str | None = None ,
446
438
gzip : bool = False ,
447
439
encoding : str = 'utf-8' ,
448
- chunk_size : Optional [ int ] = None ,
449
- timeout : Optional [ int ] = DEFAULT_TIMEOUT ,
440
+ chunk_size : int | None = None ,
441
+ timeout : int | None = DEFAULT_TIMEOUT ,
450
442
num_max_attempts : int = 1 ,
451
- metadata : Optional [ dict ] = None ,
443
+ metadata : dict | None = None ,
452
444
) -> None :
453
445
"""
454
446
Uploads a local file or file data as string or bytes to Google Cloud Storage.
@@ -683,7 +675,7 @@ def delete_bucket(self, bucket_name: str, force: bool = False) -> None:
683
675
except NotFound :
684
676
self .log .info ("Bucket %s not exists" , bucket_name )
685
677
686
- def list (self , bucket_name , versions = None , max_results = None , prefix = None , delimiter = None ) -> list :
678
+ def list (self , bucket_name , versions = None , max_results = None , prefix = None , delimiter = None ) -> List :
687
679
"""
688
680
List all objects from the bucket with the give string prefix in name
689
681
@@ -730,10 +722,10 @@ def list_by_timespan(
730
722
bucket_name : str ,
731
723
timespan_start : datetime ,
732
724
timespan_end : datetime ,
733
- versions : Optional [ bool ] = None ,
734
- max_results : Optional [ int ] = None ,
735
- prefix : Optional [ str ] = None ,
736
- delimiter : Optional [ str ] = None ,
725
+ versions : bool | None = None ,
726
+ max_results : int | None = None ,
727
+ prefix : str | None = None ,
728
+ delimiter : str | None = None ,
737
729
) -> List [str ]:
738
730
"""
739
731
List all objects from the bucket with the give string prefix in name that were
@@ -838,11 +830,11 @@ def get_md5hash(self, bucket_name: str, object_name: str) -> str:
838
830
def create_bucket (
839
831
self ,
840
832
bucket_name : str ,
841
- resource : Optional [ dict ] = None ,
833
+ resource : dict | None = None ,
842
834
storage_class : str = 'MULTI_REGIONAL' ,
843
835
location : str = 'US' ,
844
- project_id : Optional [ str ] = None ,
845
- labels : Optional [ dict ] = None ,
836
+ project_id : str | None = None ,
837
+ labels : dict | None = None ,
846
838
) -> str :
847
839
"""
848
840
Creates a new bucket. Google Cloud Storage uses a flat namespace, so
@@ -900,7 +892,7 @@ def create_bucket(
900
892
return bucket .id
901
893
902
894
def insert_bucket_acl (
903
- self , bucket_name : str , entity : str , role : str , user_project : Optional [ str ] = None
895
+ self , bucket_name : str , entity : str , role : str , user_project : str | None = None
904
896
) -> None :
905
897
"""
906
898
Creates a new ACL entry on the specified bucket_name.
@@ -933,8 +925,8 @@ def insert_object_acl(
933
925
object_name : str ,
934
926
entity : str ,
935
927
role : str ,
936
- generation : Optional [ int ] = None ,
937
- user_project : Optional [ str ] = None ,
928
+ generation : int | None = None ,
929
+ user_project : str | None = None ,
938
930
) -> None :
939
931
"""
940
932
Creates a new ACL entry on the specified object.
@@ -967,7 +959,7 @@ def insert_object_acl(
967
959
968
960
self .log .info ('A new ACL entry created for object: %s in bucket: %s' , object_name , bucket_name )
969
961
970
- def compose (self , bucket_name : str , source_objects : List , destination_object : str ) -> None :
962
+ def compose (self , bucket_name : str , source_objects : List [ str ] , destination_object : str ) -> None :
971
963
"""
972
964
Composes a list of existing object into a new object in the same storage bucket_name
973
965
@@ -1002,8 +994,8 @@ def sync(
1002
994
self ,
1003
995
source_bucket : str ,
1004
996
destination_bucket : str ,
1005
- source_object : Optional [ str ] = None ,
1006
- destination_object : Optional [ str ] = None ,
997
+ source_object : str | None = None ,
998
+ destination_object : str | None = None ,
1007
999
recursive : bool = True ,
1008
1000
allow_overwrite : bool = False ,
1009
1001
delete_extra_files : bool = False ,
@@ -1104,7 +1096,7 @@ def sync(
1104
1096
self .log .info ("Synchronization finished." )
1105
1097
1106
1098
def _calculate_sync_destination_path (
1107
- self , blob : storage .Blob , destination_object : Optional [ str ] , source_object_prefix_len : int
1099
+ self , blob : storage .Blob , destination_object : str | None , source_object_prefix_len : int
1108
1100
) -> str :
1109
1101
return (
1110
1102
path .join (destination_object , blob .name [source_object_prefix_len :])
@@ -1116,10 +1108,10 @@ def _calculate_sync_destination_path(
1116
1108
def _prepare_sync_plan (
1117
1109
source_bucket : storage .Bucket ,
1118
1110
destination_bucket : storage .Bucket ,
1119
- source_object : Optional [ str ] ,
1120
- destination_object : Optional [ str ] ,
1111
+ source_object : str | None ,
1112
+ destination_object : str | None ,
1121
1113
recursive : bool ,
1122
- ) -> Tuple [ Set [storage .Blob ], Set [storage .Blob ], Set [storage .Blob ]]:
1114
+ ) -> tuple [ set [storage .Blob ], set [storage .Blob ], set [storage .Blob ]]:
1123
1115
# Calculate the number of characters that remove from the name, because they contain information
1124
1116
# about the parent's path
1125
1117
source_object_prefix_len = len (source_object ) if source_object else 0
@@ -1139,11 +1131,11 @@ def _prepare_sync_plan(
1139
1131
# Determine objects to copy and delete
1140
1132
to_copy = source_names - destination_names
1141
1133
to_delete = destination_names - source_names
1142
- to_copy_blobs = {source_names_index [a ] for a in to_copy } # type: Set[storage.Blob]
1143
- to_delete_blobs = {destination_names_index [a ] for a in to_delete } # type: Set[storage.Blob]
1134
+ to_copy_blobs : set [ storage . Blob ] = {source_names_index [a ] for a in to_copy }
1135
+ to_delete_blobs : set [ storage . Blob ] = {destination_names_index [a ] for a in to_delete }
1144
1136
# Find names that are in both buckets
1145
1137
names_to_check = source_names .intersection (destination_names )
1146
- to_rewrite_blobs = set () # type: Set [storage.Blob]
1138
+ to_rewrite_blobs : set [storage .Blob ] = set ()
1147
1139
# Compare objects based on crc32
1148
1140
for current_name in names_to_check :
1149
1141
source_blob = source_names_index [current_name ]
@@ -1164,7 +1156,7 @@ def gcs_object_is_directory(bucket: str) -> bool:
1164
1156
return len (blob ) == 0 or blob .endswith ('/' )
1165
1157
1166
1158
1167
- def _parse_gcs_url (gsurl : str ) -> Tuple [str , str ]:
1159
+ def _parse_gcs_url (gsurl : str ) -> tuple [str , str ]:
1168
1160
"""
1169
1161
Given a Google Cloud Storage URL (gs://<bucket>/<blob>), returns a
1170
1162
tuple containing the corresponding bucket and blob.
0 commit comments