@@ -91,7 +91,7 @@ def unmarshal_event(self, data, return_type):
91
91
except ValueError :
92
92
return data
93
93
js ['raw_object' ] = js ['object' ]
94
- if return_type :
94
+ if return_type and js [ 'type' ] != 'ERROR' :
95
95
obj = SimpleNamespace (data = json .dumps (js ['raw_object' ]))
96
96
js ['object' ] = self ._api_client .deserialize (obj , return_type )
97
97
if hasattr (js ['object' ], 'metadata' ):
@@ -138,11 +138,26 @@ def stream(self, func, *args, **kwargs):
138
138
self .resource_version = kwargs ['resource_version' ]
139
139
140
140
timeouts = ('timeout_seconds' in kwargs )
141
+ retry_after_410 = False
141
142
while True :
142
143
resp = func (* args , ** kwargs )
143
144
try :
144
145
for line in iter_resp_lines (resp ):
145
- yield self .unmarshal_event (line , return_type )
146
+ event = self .unmarshal_event (line , return_type )
147
+ if isinstance (event , dict ) and event ['type' ] == 'ERROR' :
148
+ obj = event ['raw_object' ]
149
+ # Current request expired, let's retry,
150
+ # but only if we have not already retried.
151
+ if not retry_after_410 and obj ['code' ] == 410 :
152
+ retry_after_410 = True
153
+ break
154
+ else :
155
+ reason = "%s: %s" % (obj ['reason' ], obj ['message' ])
156
+ raise client .rest .ApiException (status = obj ['code' ],
157
+ reason = reason )
158
+ else :
159
+ retry_after_410 = False
160
+ yield event
146
161
if self ._stop :
147
162
break
148
163
finally :
0 commit comments