1
1
import time
2
+ import json
3
+ import base64
4
+ import pytest
5
+
6
+ from common import running_on_ci
7
+ message = []
2
8
3
9
def test_ws_connection (socketio ):
4
10
print ('my sid is' , socketio .sid )
5
11
assert socketio .sid is not None
6
12
7
13
def test_list (socketio ):
14
+ global message
8
15
socketio .on ('message' , message_handler )
9
16
socketio .emit ('command' , 'list' )
10
17
time .sleep (.1 )
18
+ # print (message)
19
+ assert "list" in message [0 ]
20
+ assert "Ports" in message [1 ]
21
+ assert "Network" in message [2 ]
11
22
23
+ # NOTE run the following tests on linux with a board connected to the PC and with this sketch on it: https://gist.github.com/Protoneer/96db95bfb87c3befe46e
24
+ @pytest .mark .skipif (
25
+ running_on_ci (),
26
+ reason = "VMs have no serial ports" ,
27
+ )
12
28
def test__open_serial_default (socketio ):
29
+ global message
30
+ message = []
13
31
socketio .on ('message' , message_handler )
14
32
socketio .emit ('command' , 'open /dev/ttyACM0 9600' )
33
+ time .sleep (.1 ) # give time to message to be filled
34
+ assert "\" IsOpen\" : true" in message [2 ]
35
+ socketio .emit ('command' , 'send /dev/ttyACM0 /"ciao/"' )
36
+ time .sleep (.1 )
37
+ assert "send /dev/ttyACM0 /\" ciao/\" " in message [3 ]
38
+ assert "ciao" in extract_serial_data (message )
39
+
40
+ # test with a lot of emoji: they can be messed up
41
+ message = [] # reinitialize the message buffer
42
+ socketio .emit ('command' , 'send /dev/ttyACM0 /"🧀🧀🧀🧀🧀🧀🧀🧀🧀🧀/"' )
15
43
time .sleep (.1 )
44
+ assert "send /dev/ttyACM0 /\" 🧀🧀🧀🧀🧀🧀🧀🧀🧀🧀/\" " in message [0 ]
45
+ emoji_output = extract_serial_data (message )
46
+ assert "/\" 🧀🧀🧀🧀🧀🧀🧀🧀🧀🧀/\" " in emoji_output # this is failing because of UTF8 encoding problems
47
+ message = []
48
+ socketio .emit ('command' , 'close /dev/ttyACM0' )
49
+ time .sleep (.1 )
50
+ # print (message)
51
+ assert "\" IsOpen\" : false," in message [8 ]
52
+
53
+ @pytest .mark .skipif (
54
+ running_on_ci (),
55
+ reason = "VMs have no serial ports" ,
56
+ )
57
+ def test__open_serial_timed (socketio ):
58
+ global message
59
+ message = []
60
+ socketio .on ('message' , message_handler )
61
+ socketio .emit ('command' , 'open /dev/ttyACM0 9600 timed' )
62
+ time .sleep (.1 ) # give time to message to be filled
63
+ assert "\" IsOpen\" : true" in message [2 ]
16
64
socketio .emit ('command' , 'send /dev/ttyACM0 /"ciao/"' )
17
65
time .sleep (.1 )
66
+ assert "send /dev/ttyACM0 /\" ciao/\" " in message [3 ]
67
+ assert "ciao" in extract_serial_data (message )
68
+
69
+ # test with a lot of emoji: usually they get messed up
70
+ message = [] # reinitialize the message buffer
71
+ socketio .emit ('command' , 'send /dev/ttyACM0 /"🧀🧀🧀🧀🧀🧀🧀🧀🧀🧀/"' )
72
+ time .sleep (.1 )
73
+ assert "send /dev/ttyACM0 /\" 🧀🧀🧀🧀🧀🧀🧀🧀🧀🧀/\" " in message [0 ]
74
+ emoji_output = extract_serial_data (message )
75
+ assert "/\" 🧀🧀🧀🧀🧀🧀🧀🧀🧀🧀/\" " in emoji_output
76
+ message = []
18
77
socketio .emit ('command' , 'close /dev/ttyACM0' )
19
78
time .sleep (.1 )
79
+ # print (message)
80
+ assert "\" IsOpen\" : false," in message [8 ]
20
81
82
+ @pytest .mark .skipif (
83
+ running_on_ci (),
84
+ reason = "VMs have no serial ports" ,
85
+ )
86
+ def test__open_serial_timedraw (socketio ):
87
+ global message
88
+ message = []
89
+ socketio .on ('message' , message_handler )
90
+ socketio .emit ('command' , 'open /dev/ttyACM0 9600 timedraw' )
91
+ time .sleep (.1 ) # give time to message to be filled
92
+ assert "\" IsOpen\" : true" in message [2 ]
93
+ socketio .emit ('command' , 'send /dev/ttyACM0 /"ciao/"' )
94
+ time .sleep (.1 )
95
+ assert "send /dev/ttyACM0 /\" ciao/\" " in message [3 ]
96
+ assert "ciao" in decode_output (extract_serial_data (message ))
21
97
98
+ # test with a lot of emoji: usually they get messed up
99
+ message = [] # reinitialize the message buffer
100
+ socketio .emit ('command' , 'send /dev/ttyACM0 /"🧀🧀🧀🧀🧀🧀🧀🧀🧀🧀/"' )
101
+ time .sleep (.1 )
102
+ assert "send /dev/ttyACM0 /\" 🧀🧀🧀🧀🧀🧀🧀🧀🧀🧀/\" " in message [0 ]
103
+ # print (message)
104
+ assert "/\" 🧀🧀🧀🧀🧀🧀🧀🧀🧀🧀/\" " in decode_output (extract_serial_data (message ))
105
+ socketio .emit ('command' , 'close /dev/ttyACM0' )
106
+ time .sleep (.1 )
107
+ # print (message)
108
+ assert "\" IsOpen\" : false," in message [10 ]
109
+
110
+ @pytest .mark .skipif (
111
+ running_on_ci (),
112
+ reason = "VMs have no serial ports" ,
113
+ )
114
+ def test__open_serial_timedbinary (socketio ):
115
+ global message
116
+ message = []
117
+ socketio .on ('message' , message_handler )
118
+ socketio .emit ('command' , 'open /dev/ttyACM0 9600 timedbinary' )
119
+ time .sleep (1 ) # give time to message to be filled
120
+ assert "\" IsOpen\" : true" in message [2 ]
121
+ socketio .emit ('command' , 'send /dev/ttyACM0 /"ciao/"' )
122
+ time .sleep (.1 )
123
+ assert "send /dev/ttyACM0 /\" ciao/\" " in message [3 ]
124
+ assert "ciao" in decode_output (extract_serial_data (message ))
125
+
126
+ # test with a lot of emoji: usually they get messed up
127
+ message = [] # reinitialize the message buffer
128
+ socketio .emit ('command' , 'send /dev/ttyACM0 /"🧀🧀🧀🧀🧀🧀🧀🧀🧀🧀/"' )
129
+ time .sleep (.1 )
130
+ assert "send /dev/ttyACM0 /\" 🧀🧀🧀🧀🧀🧀🧀🧀🧀🧀/\" " in message [0 ]
131
+ assert "/\" 🧀🧀🧀🧀🧀🧀🧀🧀🧀🧀/\" " in decode_output (extract_serial_data (message ))
132
+ socketio .emit ('command' , 'close /dev/ttyACM0' )
133
+ time .sleep (.1 )
134
+ # print (message)
135
+ assert "\" IsOpen\" : false," in message [10 ]
136
+
137
+
138
+ # callback called by socketio when a message is received
22
139
def message_handler (msg ):
23
- print ('Received message: ' , msg )
140
+ # print('Received message: ', msg)
141
+ global message
142
+ message .append (msg )
143
+
144
+ # helper function used to extract serial data from it's json representation
145
+ # NOTE make sure to pass a clean message (maybe reinitialize the message global var before populating it)
146
+ def extract_serial_data (msg ):
147
+ serial_data = ""
148
+ for i in msg :
149
+ if "{\" P\" " in i :
150
+ # print (json.loads(i)["D"])
151
+ serial_data += json .loads (i )["D" ]
152
+ # print("serialdata:"+serial_data)
153
+ return serial_data
154
+
155
+ def decode_output (raw_output ):
156
+ # print(raw_output)
157
+ base64_bytes = raw_output .encode ('ascii' ) #encode rawoutput message into a bytes-like object
158
+ output_bytes = base64 .b64decode (base64_bytes )
159
+ return output_bytes .decode ('utf-8' )
0 commit comments