@@ -23,26 +23,26 @@ def render_job(name: str, spec: str, job_input: str) -> V1Job:
23
23
)
24
24
25
25
26
- def callback (batch_v1 : BatchV1Api , namespace : str , spec : str ) -> Callable [[Message ], None ]:
26
+ def callback (namespace : str , spec : str ) -> Callable [[Message ], None ]:
27
27
def cb (m : Message ):
28
28
try :
29
29
job_name = md5 (m .message_id .encode ('utf-8' )).hexdigest ()
30
30
job_input = m .data .decode ('utf-8' )
31
31
log .info (f'Submitting job { job_name } with input "{ job_input } "' )
32
32
33
33
job = render_job (job_name , spec , job_input )
34
- batch_v1 .create_namespaced_job (namespace , job )
34
+ get_batch_v1 () .create_namespaced_job (namespace , job )
35
35
log .info (f'Submitted job { job_name } ' )
36
36
except Exception :
37
37
log .exception ('PubSub subscriber callback' )
38
38
m .ack ()
39
39
return cb
40
40
41
41
42
- def listen (subscription : str , batch_v1 : BatchV1Api , namespace : str , spec : str ) -> None :
42
+ def listen (subscription : str , namespace : str , spec : str ) -> None :
43
43
subscriber = pubsub_v1 .SubscriberClient ()
44
44
with subscriber :
45
- cb = callback (batch_v1 , namespace , spec )
45
+ cb = callback (namespace , spec )
46
46
streaming_pull = subscriber .subscribe (subscription , cb )
47
47
log .info (f'Listening to subscription { subscription } ' )
48
48
try :
@@ -57,7 +57,7 @@ def load_job_spec(spec_path: str) -> str:
57
57
return f .read ()
58
58
59
59
60
- def get_batch_v1 ():
60
+ def get_batch_v1 () -> BatchV1Api :
61
61
try :
62
62
load_kube_config ()
63
63
except :
@@ -74,9 +74,7 @@ def main():
74
74
log .basicConfig (level = log_level )
75
75
76
76
spec = load_job_spec (spec_path )
77
- batch_v1 = get_batch_v1 ()
78
-
79
- listen (subscription , batch_v1 , namespace , spec )
77
+ listen (subscription , namespace , spec )
80
78
81
79
82
80
if __name__ == '__main__' :
0 commit comments