@@ -2,19 +2,83 @@ use crate::{
2
2
error:: { Error , Result } ,
3
3
sync, AsyncNotification , CWD ,
4
4
} ;
5
- use crossbeam_channel:: Sender ;
6
- use std:: sync:: { Arc , Mutex } ;
5
+ use crossbeam_channel:: { unbounded, Receiver , Sender } ;
6
+ use git2:: PackBuilderStage ;
7
+ use std:: {
8
+ cmp,
9
+ sync:: { Arc , Mutex } ,
10
+ thread,
11
+ time:: Duration ,
12
+ } ;
13
+ use sync:: ProgressNotification ;
14
+ use thread:: JoinHandle ;
15
+
16
+ ///
17
+ #[ derive( Clone , Debug ) ]
18
+ pub enum PushProgressState {
19
+ ///
20
+ PackingAddingObject ,
21
+ ///
22
+ PackingDeltafiction ,
23
+ ///
24
+ Pushing ,
25
+ }
7
26
27
+ ///
8
28
#[ derive( Clone , Debug ) ]
9
- enum PushStates {
10
- None ,
11
- // Packing,
12
- // Pushing(usize, usize),
29
+ pub struct PushProgress {
30
+ ///
31
+ pub state : PushProgressState ,
32
+ ///
33
+ pub progress : u8 ,
13
34
}
14
35
15
- impl Default for PushStates {
16
- fn default ( ) -> Self {
17
- PushStates :: None
36
+ impl PushProgress {
37
+ ///
38
+ pub fn new (
39
+ state : PushProgressState ,
40
+ current : usize ,
41
+ total : usize ,
42
+ ) -> Self {
43
+ let total = cmp:: max ( current, total) ;
44
+ let progress = current as f32 / total as f32 * 100.0 ;
45
+ let progress = progress as u8 ;
46
+ Self { state, progress }
47
+ }
48
+ }
49
+
50
+ impl From < ProgressNotification > for PushProgress {
51
+ fn from ( progress : ProgressNotification ) -> Self {
52
+ match progress {
53
+ ProgressNotification :: Packing {
54
+ stage,
55
+ current,
56
+ total,
57
+ } => match stage {
58
+ PackBuilderStage :: AddingObjects => PushProgress :: new (
59
+ PushProgressState :: PackingAddingObject ,
60
+ current,
61
+ total,
62
+ ) ,
63
+ PackBuilderStage :: Deltafication => PushProgress :: new (
64
+ PushProgressState :: PackingDeltafiction ,
65
+ current,
66
+ total,
67
+ ) ,
68
+ } ,
69
+ ProgressNotification :: PushTransfer {
70
+ current,
71
+ total,
72
+ ..
73
+ } => PushProgress :: new (
74
+ PushProgressState :: Pushing ,
75
+ current,
76
+ total,
77
+ ) ,
78
+ ProgressNotification :: Done => {
79
+ PushProgress :: new ( PushProgressState :: Pushing , 1 , 1 )
80
+ }
81
+ }
18
82
}
19
83
}
20
84
@@ -30,13 +94,13 @@ pub struct PushRequest {
30
94
#[ derive( Default , Clone , Debug ) ]
31
95
struct PushState {
32
96
request : PushRequest ,
33
- state : PushStates ,
34
97
}
35
98
36
99
///
37
100
pub struct AsyncPush {
38
101
state : Arc < Mutex < Option < PushState > > > ,
39
102
last_result : Arc < Mutex < Option < String > > > ,
103
+ progress : Arc < Mutex < Option < ProgressNotification > > > ,
40
104
sender : Sender < AsyncNotification > ,
41
105
}
42
106
@@ -46,6 +110,7 @@ impl AsyncPush {
46
110
Self {
47
111
state : Arc :: new ( Mutex :: new ( None ) ) ,
48
112
last_result : Arc :: new ( Mutex :: new ( None ) ) ,
113
+ progress : Arc :: new ( Mutex :: new ( None ) ) ,
49
114
sender : sender. clone ( ) ,
50
115
}
51
116
}
@@ -62,6 +127,12 @@ impl AsyncPush {
62
127
Ok ( res. clone ( ) )
63
128
}
64
129
130
+ ///
131
+ pub fn progress ( & self ) -> Result < Option < PushProgress > > {
132
+ let res = self . progress . lock ( ) ?;
133
+ Ok ( res. map ( |progress| progress. into ( ) ) )
134
+ }
135
+
65
136
///
66
137
pub fn request ( & mut self , params : PushRequest ) -> Result < ( ) > {
67
138
log:: trace!( "request" ) ;
@@ -71,19 +142,35 @@ impl AsyncPush {
71
142
}
72
143
73
144
self . set_request ( & params) ?;
145
+ Self :: set_progress ( self . progress . clone ( ) , None ) ?;
74
146
75
147
let arc_state = Arc :: clone ( & self . state ) ;
76
148
let arc_res = Arc :: clone ( & self . last_result ) ;
149
+ let arc_progress = Arc :: clone ( & self . progress ) ;
77
150
let sender = self . sender . clone ( ) ;
78
151
79
- rayon_core:: spawn ( move || {
80
- //TODO: use channels to communicate progress
81
- let res = sync:: push_origin (
152
+ thread:: spawn ( move || {
153
+ let ( progress_sender, receiver) = unbounded ( ) ;
154
+
155
+ let handle = Self :: spawn_receiver_thread (
156
+ sender. clone ( ) ,
157
+ receiver,
158
+ arc_progress,
159
+ ) ;
160
+
161
+ let res = sync:: push (
82
162
CWD ,
83
163
params. remote . as_str ( ) ,
84
164
params. branch . as_str ( ) ,
165
+ progress_sender. clone ( ) ,
85
166
) ;
86
167
168
+ progress_sender
169
+ . send ( ProgressNotification :: Done )
170
+ . expect ( "closing send failed" ) ;
171
+
172
+ handle. join ( ) . expect ( "joining thread failed" ) ;
173
+
87
174
Self :: set_result ( arc_res, res) . expect ( "result error" ) ;
88
175
89
176
Self :: clear_request ( arc_state) . expect ( "clear error" ) ;
@@ -96,6 +183,44 @@ impl AsyncPush {
96
183
Ok ( ( ) )
97
184
}
98
185
186
+ fn spawn_receiver_thread (
187
+ sender : Sender < AsyncNotification > ,
188
+ receiver : Receiver < ProgressNotification > ,
189
+ progress : Arc < Mutex < Option < ProgressNotification > > > ,
190
+ ) -> JoinHandle < ( ) > {
191
+ log:: info!( "push progress receiver spawned" ) ;
192
+
193
+ thread:: spawn ( move || loop {
194
+ let incoming = receiver. recv ( ) ;
195
+ match incoming {
196
+ Ok ( update) => {
197
+ Self :: set_progress (
198
+ progress. clone ( ) ,
199
+ Some ( update) ,
200
+ )
201
+ . expect ( "set prgoress failed" ) ;
202
+ sender
203
+ . send ( AsyncNotification :: Push )
204
+ . expect ( "error sending push" ) ;
205
+
206
+ //NOTE: for better debugging
207
+ thread:: sleep ( Duration :: from_millis ( 300 ) ) ;
208
+
209
+ if let ProgressNotification :: Done = update {
210
+ break ;
211
+ }
212
+ }
213
+ Err ( e) => {
214
+ log:: error!(
215
+ "push progress receiver error: {}" ,
216
+ e
217
+ ) ;
218
+ break ;
219
+ }
220
+ }
221
+ } )
222
+ }
223
+
99
224
fn set_request ( & self , params : & PushRequest ) -> Result < ( ) > {
100
225
let mut state = self . state . lock ( ) ?;
101
226
@@ -105,7 +230,6 @@ impl AsyncPush {
105
230
106
231
* state = Some ( PushState {
107
232
request : params. clone ( ) ,
108
- ..PushState :: default ( )
109
233
} ) ;
110
234
111
235
Ok ( ( ) )
@@ -121,6 +245,20 @@ impl AsyncPush {
121
245
Ok ( ( ) )
122
246
}
123
247
248
+ fn set_progress (
249
+ progress : Arc < Mutex < Option < ProgressNotification > > > ,
250
+ state : Option < ProgressNotification > ,
251
+ ) -> Result < ( ) > {
252
+ let simple_progress: Option < PushProgress > =
253
+ state. map ( |prog| prog. into ( ) ) ;
254
+ log:: info!( "push progress: {:?}" , simple_progress) ;
255
+ let mut progress = progress. lock ( ) ?;
256
+
257
+ * progress = state;
258
+
259
+ Ok ( ( ) )
260
+ }
261
+
124
262
fn set_result (
125
263
arc_result : Arc < Mutex < Option < String > > > ,
126
264
res : Result < ( ) > ,
@@ -138,3 +276,24 @@ impl AsyncPush {
138
276
Ok ( ( ) )
139
277
}
140
278
}
279
+
280
+ #[ cfg( test) ]
281
+ mod tests {
282
+ use super :: * ;
283
+
284
+ #[ test]
285
+ fn test_progress_zero_total ( ) {
286
+ let prog =
287
+ PushProgress :: new ( PushProgressState :: Pushing , 1 , 0 ) ;
288
+
289
+ assert_eq ! ( prog. progress, 100 ) ;
290
+ }
291
+
292
+ #[ test]
293
+ fn test_progress_rounding ( ) {
294
+ let prog =
295
+ PushProgress :: new ( PushProgressState :: Pushing , 2 , 10 ) ;
296
+
297
+ assert_eq ! ( prog. progress, 20 ) ;
298
+ }
299
+ }
0 commit comments