From bf847aa7cad844d162626ee31a4303454667deb4 Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Wed, 6 Nov 2024 16:25:57 +0100 Subject: [PATCH 01/16] Test firmware flasher --- examples/buzzer_test.py | 8 + examples/node_base.bin | Bin 0 -> 14464 bytes src/modulino/firmware_flasher.py | 264 +++++++++++++++++++++++++++++++ 3 files changed, 272 insertions(+) create mode 100644 examples/buzzer_test.py create mode 100755 examples/node_base.bin create mode 100644 src/modulino/firmware_flasher.py diff --git a/examples/buzzer_test.py b/examples/buzzer_test.py new file mode 100644 index 0000000..30ab2bb --- /dev/null +++ b/examples/buzzer_test.py @@ -0,0 +1,8 @@ +from modulino import ModulinoBuzzer +from time import sleep + +buzzer = ModulinoBuzzer() + +buzzer.tone(ModulinoBuzzer.NOTES["E5"], 0, blocking=True) +sleep(2) +buzzer.no_tone() diff --git a/examples/node_base.bin b/examples/node_base.bin new file mode 100755 index 0000000000000000000000000000000000000000..31904ae5627ae10a270bc38e318fae674705bd17 GIT binary patch literal 14464 zcmeHte|Q_kwdl;QWXYBt%Tk=k@{g=`?ZlRn)UiScaUkp6#IYrlkfiO)58BF-)2?J2 zHbALf>3xWV^ak93iJ*_zNil@4B?)a^gSqXcE-vA=O#`pH`EkGepdTit-9n(K1R~q9 zweQTz4x#t8@BRC}5BuBMnc0~$=bSme=8TXFFz{IwRf7lplfQSb`+f=PAHZ`M z=8`Go{eOD?U%t&O`S-9snO^Vj`)9_JH}&F<#%60X%DufAJ+hkmrMz~y5wW7ROStD0 zo!cdR?G&BYC9FP0+q#76Q*?fpP;g3Tb1BEalR9CvKjSb03F6t#(1ONWz;u!0%bm|@hr zjJjylgGQZT4l|7UF`xy;BV)W#?=tFyMtmx0)EnIBByQZ;U~iPy>KYzI3`*sUQA5Cs znzVASCa)!YF*TWk_Q72Ot(bzyhmcc4 zdJ1#az&7F3H-=eD^G4KlTll4%V5zk0vUQIc-cPR0X!a2_VtN~JBoET(nc$HJO{kAS zEcOhWaY`B|b;_uR#}T8aU(iX}ZRIe^mA0|2UAk8G=7f{sqvAVvnzdNN(a=Bj+FP;4&Cd!L-~$xTi4uh#>C;OY092JsVh1I_}T$q#}uK4&`>bNw0MG^Y0L`` zUypm?6GcxHKS8B#yGDVvo8U@tq*6CsTW_U@w7)sN30AalxH7w<>2Z`Af_-9<$=Kl? zW4SF1J7SDIImsYBJ(gLa4L2@?UG69Pun_aQj?iMLGePH|JC3}xg4i6!EzcP$50cT5yEH-zLqCM$ES(e8tG7DKnu9+7G0H-J|);6ZRB_z?WQ0FM_QYmi||;~hSe zVme#Cd?m<}xmI3Fu}Sfs`mMsIhO=d9jm(qRB7|8Sbj$MEpOE%o{;l~fxF!#D_)$H< z;o6-8H9&T+38#ucZuRs@=%vt1-#5oU*2voV7%}!istKpgjuSm5ZGb(vx8DkTm;-z0 zn?#=3ERoImlX;vudBtx9dD(_@lRRgBE-$&N-j+nEf4z#QB@Xro`%&uI2|fLWjy+VRY5t4Byyv#I=l(cP+#1>1p2ySs=$@_Z^Omqio8?XW zOwg_~T6Mg6pBY+Y?6!70YvXg0fMqY0I(u~l^lBT>H*C&LJ`YypCz)Zc29;6=n{ok* zA7yG#Z&U8~QRL_CUK%ZyQLUCCDW1hL%s?9cUZi(St#gi1s<+?+ld5IJu$Xf#7kdP1WR_ zMf;oLe$NJekAPIS{GD~B@$Up)2w|=+bYI|d=&n$LMVy1KMgG>0TK!>^I;+ocv=%Jp zmdy37cxFZ$JJ;V4jECw&ox$bkYUt$50(v{%Cz#G5&sUqM)VC*4+}@lQU+uN1!FaRp2DK}`##aZj<(m?{ zi=nU4=Tp7ti^1>3cb(YvDoV|pq7H*hr;bkfStu~j}VZ{2Zzho#5b zi}qu{WAhYd*Mi(4&M%`>@zg7q*FU;CxH@n?L~6s0QyT6iRyCklz@}D#P2B)`dLzs@ zq}}UV1y(h!ZmQ^M=wqy&lLKU@?#?V(*T>uyw|hwcu=(TwN<}jR@jiyMndwoX%mvNF zT0o*doG`J~sHT}py*H8j14}PjJWD~oS*m4!Rg<3n-POt<@|+Fx*3FR;;K>ihU(t{+ zrJ_{p_)^#V8U=Gf|0f(l@19a=OUxbI=yoT5tXaY4{A%(r=r*!bJVa5qq1o^(M^>V* zi>7rAeG&0alO0V$8`UCSFGPswKha^Lj}rJ#VjIv0;NQ>$_)o;N=r15*zlePxmY)2R z2mm~B$_84S(_|l{rJm>rcLBKtO2H}qs(lEj=&`FB(P!+*3-0W9cjSrgHYoXo2kk>W z=i8gjdE)uVg?1Z{lem}aH)1Wo-41^A5yQ(-ME@slNh_>E_(;~U1}SN0e~*w05OWNO1YT*!XLvC^ zmf-ST0~ZWjI5eJb;Nnm*Se(4Tuz5p!iMS!Mu^lJf`~?@iT+LFgRMS6pMsF1bu*5&5u zFRw<6_!9BgtsfDbNE`+-i>l2R=5a2*qJ9^T1KG97w*)IzY&2meDHNO>_E-XzrcEI_ zmYY*bmFC$*I*h=#cZag$No+@8$%A_ON8q&*^UbKTWgB9pE66EwAx`oO3zY^zW}(fS0^(#$rX5sa`I%n8`bfCfl)bWcC!t7Ft<&3rj-jj0uG2c z9l#SkU8jo`Pc{q>m`C(eJ`TuLKy zr0W@7*RSfR<4tO2Pxd$w8XGiMkR2WyqNNL#g3~^!*RmeUQVTls?1+i=NlpPNe;7f` z2ikJa?iNvcM#|%@UP?v2zo-|r1iO(+3Pma_5sLLRrzaNVpjRkPsu#`zjiq`5ut)=j zZ)km<<5ZVmRTG>?xs9 zIU8=dBt0z^31xuCQ%YEBkG3|KQe^)lz{jrzzSivIz0vG0d?nCp+E#d;zde{2A{=}S zywYCfL9mvL!bx8YBo>gdy0K>oj+M3fwYZw`T_b}n7c0HHl*c~bG4ee+N^?|+m5-5ola5Hu%`=Jqn$}8 zQS}Jy#f_iNR>%IVr@xTN%GxLAiasM>dIH4kx4KSbjK~_%eIj~9)|Rt{!TqpjX0=o* zXd-)ZmY3H;4{xopDdf%6&DOO5a>QA4v1WsEPkiMfr-Z{VYvgi~2utTS&aCf%16SQpA zt=lX+PV4Cb{it+vbE&zsuorFHJkMj}&+;n*D}r`IUjF;k8ybONGaw9HgK!*V^5_)q z9d<7XJ}G<*GX4wA+q}t=b7qgUDCpw*xl&AvU7;??*F^Ip5~&YpWvpj?y_6K60gb5- zg(d7QQR}5%y623=^Rm<>I>na(&--DO<=nqZZayMiKs~~fu+}$?wdTNDe>?X(4bBBH zZ<~4a^uK5nM?>qTJ4%E%)qAz^&I#DxuNtHCVDxv-{Y*P5*#b_U1&ufw`Yy493}yJ2 zd%%xhZx8ebVgFL_JPI%H)@HQ3A2Hyh&Lr5IS~i+T+jEdpu&AT$c-f3+q?2^-#m-^Q zj(XAI=#up=-YwL_33asdNVFa_669(z4KsXz!Dz%QodOQgmGhf}orlEdB~A=(-;|Sc zhHb8F33hV)``I2-&YAM&j%cv67xh}n7^gYsOi6Q@F=kkDP;#V$#?gpv)m5!<%>a6v6p+mh7cz9nc>h(c9d*`FLw>^~g@z3k` zMGuiL5cg(=su4uPso|*<$P8J7&>8ob0>fxiv`ch}RFD=owPxujaeR7^S#L)5ajFtO zhJ7w${d!~l`(XY50scQ<^+_gQ0ciJ}z}(=5Xad;)0?-7f9K_Yj49W~)4)<&moS{3U zcIk-J>v}=3ovTH9`c*xP<5r-suOL_UG&Hgzo_HKx1lfcfU<) zYjmTFJXVQ}gd6ev@AJgl;(cHb)x^kp$>)S}sHr189UAKK}1#w?ZPDAHKQ z7jn;{L;OLJdZMt(7WMImcV91t%&B5yh-|_ctE7NdOpchPtXTB5#LOQ@S-oD zqrb30c}=2v%lIw{j-ukR5S&WQ^Mpnv(%5751^1zoF1j+i#yhY^-~8e`u9y#si0c>f zmCAM4-!E7d&Ul*?Hh9eS)ruDy%>F~XPjr5LUJ&>E@lJf;q_|Mb(D?L1G@F0(8>KDy z;OL#@;;*2`$(JKX@K4_;m+%1?LuJR5g>IMd!7A|;XwT-W&{rVVIIy~1dk`?)4)mTd zaNQNE&ppbomR=M&Q4IAGyWygE1aYr%Zg^k+sR`Ov(igSz*|-E`!Hbm#KqejtPikX> zm}@9M@vR5TFWyfw*GWaUdilB#)nQ*(jLO(z z{wV(~sb0K0^lHbhj;$fO6G7I)Em>idJ0y1Q?A!`CkA4erC%(0-J)2iVj0!jse}fRk z=hP10KRh6>C1;L(7JLS=Jz>o5j}{4(x*xQ&2XL$%qz@plI1bZ8X1DlIfjPKF$`jiP zIB_FL!$zxD6hgI-Z_pz;Z66fcOIvIM&j=NYyX6_7)F@rb&)JVEFU)oGCoC7{a$xTs zhP6Isoua)aN9~3;mh!;X+d}psre;jFiC46LJmb#_0}*$K9o6`nca^ub2j(H`4J@)& zRRG61`{3^_)-yC}@Vn#HoSXl>zuJq`{`!ZZmqOK*4Rww<%ReL7)YY(V?f^vZ=YKwp zL64XR7<|lrq>=n1hD_5sr3y9N;ESIhKt8j|A>Pj*x2&f(jaPEmw4M;Iwx|i#LDh*pp{4LTx^oi3m#>StfiZ-spZP)3B!r zIM5E}fJdVCRkQdpI@z#{SUV@5B&SJ+Xg2O8Sz!uF8sf}e6lwcNt3p-H{0(~gYq~E^ zG1&JWSS6qAk2CANzBwu}YI=Hu4$-&dFwHvSmPXoFfP@hXQxe=2(bESZUZj+f_Fe5y zuB`#9$RUOm{3J6FY2&1=(h43Vu_qj+dcwQId)eJ|&&9AUus<4(^h8h6^cgE#1Tok7 zRx}^NnC8X6;q#eV^dk)?5F)g7Rw8RNhpg#A3qpjlWW$a1h7~82VW2ECglZ@S-0L-} zKGx==%4U9i4F!JZ{@eY^%}A{6~4Lnzv4Dg=s3oBJSRbv;dfgQh-%CNEgs z&(c)XycKqPYY6ph4`*q@KvVvwG@ZUiQx&1<2B1mKRHGvrMQC~@OB11u&_yBpU_)I| zoZ^@oi2gnE{Z5$<=29Z?_DdPWx!^2F#?g{}aDUV-iPPF_A0pWU8p(sGieYn*;4hbP zeF<=eU*?uQkV7ou)#?E(Qo|G8{r|Rx>i`C#UQ~4%1&?maTbIbH}~j zD=G!!Sz|m|anWbS@0=q3i135(f(Gwc%{KT;fdVX%yk_1x~rUR>Wh5ZN8+aEswK2EIf+5%0XxDux133{z&G$62zUVUL3UMMi;*E0OY-A@MO|oNj>@mhm+6QTb}bNiys3Pe*xQK{R|dufW-*(z6?8))u9=kBK#w=1aTH*6LO@68!or{ zK%NSb2Qr;RV&9)E;<(7&(qf1t?+$?&M-~Q0!**^S;CZ<<+>u`oXXG|Qi&?$jLZ7)i zc42_zwg|?t$w-Gc9s>W3eEAB4T08VK4+mjiob{biiF^-DI=DY*-nhr>1WobEkaB}1 zggSeiU*eQ)GG9#_)~7%pHF!{wc?(9f-Zw!{{_*H^=5=GVHS;?XWBdhW2;FzXzVqOp zOc2dtU$J4e3;iWRk>VB#6&7Ud)HFSeC(;IIQsX|wI*FKS`Q9OlD}lJlE#!dovYp`h zA(z5!Pig#oTTcC%I^ia;mxSgoPTGyPeKG=aU86+y>b}x*h|}4(JJ{U_?hqat*k&Rl z-@O{y%kJ%U6N%5-3r;GLSnpcH6B6(BYYlPXy>EBy8_3Q)Iq=XxuW3YE0vf8Pdow@N zmI!{OPFSMMN7>bjCmWV!*H^CkkLV2%-Yrz3Qe^K#=7$P&pX;2?al0YX1!J;cFY2BcAoH65_Y0+je+^5y zxL0C*h2W)2DW~WQsaj<3#Td8S9qa+0HyVT764z^b3(5+63dCKmB&h8r&~v0DFUU0mJzs^o+rcr(oCsvnVn-ggDV?5mWr#}{@9wR`T))JT0iZrd=`0NtOLeUYy(&!q3z99 z8`scL0+@phnUaUngJvlQ`J{$rILUA(R2v5kU4E%Gw>tmqRC+`=Z?ciUE#^%O62Qh3 zUG;$0CfIh#0o-U0wM(}I7>I1zLvYVKfqS=^R-f@nD?q1xp+n#wX{A)!CJ|1WK(h;# zSnMe=5_4Z?q0OJQ(7+#vzPWG*(SBl?Nd&XKBOG}`V5c=bIOUI5LxgdYA+1*?(?blZ z0NjpEuv|aLPZ`vQCJ=W7_c6gLD?P_@`#!AhvUb!O!~d?-+*kAn26{QtGIYfeF?Nz$nGfN071wsfiurbrgLOa~C`FNN3 zCFSjQRO0~)M=Lhq;T9p-;6Z)u4ND*=tdQGQO4=xqGYs}Istq_&Kq5)gQ;>tt>Twh- z4DNM-9@x;^nw`y!bJF?Nnwrb1yZKQeWvu@n;{=P}Tzwb5ldm(Jx5gd8;emO=A63ut z+-fVx!ugCj-(vik^N9>(t?us0*EN<)Lw508>uKx<*@NaGr1xtUvJ@Rg-_zOt4xBtTxxs&H}K*TOo@0 zR7O7&L4bU|raFU9LH0%gddnd*QUSjhA=pH&{u^Yt%7qu9XJ8suB*sc5aqNFA90|`v zAsj>_-0r6}3h?-W`r~UnO2pvTC!jOn)!3fMv1>JuX1ITX%Rq<;8#iW14v^q5!@Dv>ac81lZ!Xc4%(1#4nv6$BP{bTWcqfmRCrY_1_gMLKRXjbql}5fMOM0A;c8R41ojrw**TC z9p)b6P6Qcu4B0c=9N?lOV3`lM0RcB;54VP%k$OUz^f#`On}qXHosgrJL@!E@2?dI~ z74FD~GU+W>`O+njtLCH&aPx3Up_$8Y@8S-92>SN=>j-`6O9&k~0=gzq0f>!_HA!0@ zgkLelsMs?i6K(K^wAy`p)f0LFZ*r+||rC;2F$35dF$ zpMOt>(D`?CnEweK*VZ@&YXBWYsEtM5`pg=HdFjDhSQvnfP3zmVii}fR`Ei+O0lhtd2*fXmWpCei$&q0m@{!7bKj9y#BGUP1+SE5|InpW6CJ8YKzhl5QkRd z%tP)|;lK)J)sXWUIhe`8aFBz!`MMm;N+So8Nz>yLoMjlvQx10wI*SN*n(CbL+NF6b zoNzKr%BN)Kj!ip~J5KFz-o?p2$d+X9L@;D_)2Yhehcv;|O zX*t9f<=Oe+(8jE9up0JPJsBZ>Zn+OPDuRCrPwbX74xs@{60iouK!uyyB*&V#}mx_1#PBqyx)?OYUks7)i;5!zBT)i{d@TnkM_ail^ic0<6%xsF$EQr}1}>yE#+MY{iL0zjY+PrdY8< z7L$f-3?8A;Es)K?^_wepHNYv7f!}5-aB@eh{NQ2WM4f`N!T$kxZ);8MHCWY(9f`FB zR*eD6j)Yn~Vn?^s((-Hxv&85BOrH~Y9Q3L)xa-A!SkEXqQ$w2X|YWam*$M*!`r@iTosn0kz1A`h6CMQwRA$`myT%- zORpIC6%|1oJnffOL&}Pr`3a6c1sqAdLa{5CR)GxL5neP;DF+)utmCY8o1|`O0M^f6 zU(c_H?76()Ww8+6n?eJCzZK3_Cg9_fpkXet8?ZwRSQ&^p3_Z~c^(-$;XeFG_D54*} zo0RgV6Ou!s`8^UXmNwCR6TIE{SwF;B!&dc^awL^%ZcnS!zzynhLfBhq4$aoqa5IK|s51C7tk_%5O1$Pnf lka6U@aTibCpZ|T%fzLVcIR`%Hz~>zJoCBY8;Qs>-{BPGB7&!m{ literal 0 HcmV?d00001 diff --git a/src/modulino/firmware_flasher.py b/src/modulino/firmware_flasher.py new file mode 100644 index 0000000..6d3b404 --- /dev/null +++ b/src/modulino/firmware_flasher.py @@ -0,0 +1,264 @@ +import os +import sys +from machine import I2C, Pin +import time +from micropython import const + +BOOTLOADER_I2C_ADDRESS = const(0x64) + +# Define I2C pins and initialize I2C +i2c = I2C(0, freq=100000) + +def send_reset(address): + """ + Send a reset command to the I2C device at the given address. + + :param address: I2C address of the device. + :return: 0 if the reset command was sent successfully, otherwise -1. + """ + buffer = b'DIE' + # Pad buffer to 40 bytes + buffer += b'\x00' * (40 - len(buffer)) + + try: + print(f"Sending reset command to address {hex(address)}") + i2c.writeto(address, buffer, True) + except OSError as e: + pass + + time.sleep(0.25) + devices = i2c.scan() + + if address in devices: + return False + elif BOOTLOADER_I2C_ADDRESS in devices: + return True + +def execute_command(opcode, command_buffer, command_length, response_buffer, response_length, verbose=True): + """ + Execute an I2C command on the device. + + :param opcode: The command opcode. + :param command_buffer: The buffer containing the command data. + :param command_length: The length of the command data. + :param response_buffer: The buffer to store the response data. + :param response_length: The expected length of the response data. + :param verbose: Whether to print debug information. + :return: The number of response bytes read, or -1 if an error occurred. + """ + if verbose: + print("Executing command") + + cmd = bytes([opcode, 0xFF ^ opcode]) + i2c.writeto(100, cmd) + if command_length > 0: + i2c.writeto(100, command_buffer) + + time.sleep(0.1) + ack = i2c.readfrom(100, 1)[0] + if ack != 0x79: + print(f"Error first ack: {hex(ack)}") + return -1 + + if command_length > 0: + ack = i2c.readfrom(100, 1)[0] + if ack != 0x79: + while ack == 0x76: + time.sleep(0.1) + ack = i2c.readfrom(100, 1)[0] + if ack != 0x79: + print(f"Error second ack: {hex(ack)}") + return -1 + + if response_length > 0: + data = i2c.readfrom(100, response_length + 1) + for i in range(response_length): + response_buffer[i] = data[i + 1] + + ack = i2c.readfrom(100, 1)[0] + if ack != 0x79: + print(f"Error: {hex(ack)}") + return -1 + + return response_length + +def flash_firmware(firmware, length, verbose=True): + """ + Flash the firmware to the I2C device. + + :param firmware: The binary firmware data. + :param length: The length of the firmware data. + :param verbose: Whether to print debug information. + :return: True if the flashing was successful, otherwise False. + """ + if verbose: + print("Flashing firmware") + + response_buffer = bytearray(255) + if execute_command(0, None, 0, response_buffer, 20, verbose) < 0: + print("Failed :(") + return False + for byte in response_buffer: + print(hex(byte)) + + if verbose: + print("Getting device ID") + if execute_command(2, None, 0, response_buffer, 3, verbose) < 0: + print("Failed to get device ID") + return False + for byte in response_buffer: + print(hex(byte)) + + if verbose: + print("Mass erase") + erase_buffer = bytearray([0xFF, 0xFF, 0x0]) + if execute_command(0x44, erase_buffer, 3, None, 0, verbose) < 0: + print("Failed to mass erase") + return False + + for i in range(0, length, 128): + progress_bar(i, length) + write_buffer = bytearray([8, 0, i // 256, i % 256]) + if write_firmware_page(0x32, write_buffer, 5, firmware[i:i + 128], 128, verbose) < 0: + print(f"Failed to write page {hex(i)}") + return False + time.sleep(0.01) + + progress_bar(length, length) # Complete the progress bar + + print("Starting firmware") + jump_buffer = bytearray([0x8, 0x00, 0x00, 0x00, 0x8]) + if execute_command(0x21, jump_buffer, 5, None, 0, verbose) < 0: + print("Failed to start firmware") + return False + + return True + +def write_firmware_page(opcode, command_buffer, command_length, firmware_buffer, firmware_length, verbose=True): + """ + Write a page of the firmware to the I2C device. + + :param opcode: The command opcode. + :param command_buffer: The buffer containing the command data. + :param command_length: The length of the command data. + :param firmware_buffer: The buffer containing the firmware data. + :param firmware_length: The length of the firmware data. + :param verbose: Whether to print debug information. + :return: The number of bytes written, or -1 if an error occurred. + """ + cmd = bytes([opcode, 0xFF ^ opcode]) + i2c.writeto(100, cmd) + + if command_length > 0: + command_buffer[command_length - 1] = 0 + for i in range(command_length - 1): + command_buffer[command_length - 1] ^= command_buffer[i] + i2c.writeto(100, command_buffer) + + ack = i2c.readfrom(100, 1)[0] + if ack != 0x79: + print(f"Error first ack: {hex(ack)}") + return -1 + + tmp_buffer = bytearray(firmware_length + 2) + tmp_buffer[0] = firmware_length - 1 + tmp_buffer[1:firmware_length + 1] = firmware_buffer + tmp_buffer[firmware_length + 1] = 0 + for i in range(firmware_length + 1): + tmp_buffer[firmware_length + 1] ^= tmp_buffer[i] + + i2c.writeto(100, tmp_buffer) + ack = i2c.readfrom(100, 1)[0] + if ack != 0x79: + print(f"Error: {hex(ack)}") + return -1 + + return firmware_length + +def progress_bar(current, total, bar_length=40): + """ + Print a progress bar to the terminal. + + :param current: The current progress value. + :param total: The total progress value. + :param bar_length: The length of the progress bar in characters. + """ + percent = float(current) / total + arrow = '-' * int(round(percent * bar_length) - 1) + '>' + spaces = ' ' * (bar_length - len(arrow)) + sys.stdout.write(f"\rProgress: [{arrow}{spaces}] {int(round(percent * 100))}%") + sys.stdout.flush() + +def find_bin_files(): + """ + Find all .bin files in the root directory. + + :return: A list of .bin file names. + """ + return [file for file in os.listdir('/') if file.endswith('.bin')] + +def select_file(bin_files): + """ + Prompt the user to select a .bin file to flash. + + :param bin_files: A list of .bin file names. + :return: The selected .bin file name. + """ + if len(bin_files) == 1: + confirm = input(f"Found one bin file: {bin_files[0]}. Do you want to flash it? (yes/no) ") + if confirm.lower() == 'yes': + return bin_files[0] + else: + print("Found bin files:") + for index, file in enumerate(bin_files): + print(f"{index + 1}. {file}") + choice = int(input("Select the file to flash (number): ")) + return bin_files[choice - 1] + +def scan_i2c_devices(): + """ + Scan the I2C bus for devices and prompt the user to select one. + + :return: The selected I2C device address. + """ + devices = i2c.scan() + print("I2C devices found:") + for index, device in enumerate(devices): + print(f"{index + 1}. Address: {hex(device)}") + choice = int(input("Select the I2C device to flash (number): ")) + return devices[choice - 1] + +def setup(): + """ + Setup function to initialize the flashing process. + Finds .bin files, scans for I2C devices, and flashes the selected firmware. + """ + print("Starting setup") + + bin_files = find_bin_files() + if not bin_files: + print("No .bin files found in the root directory.") + return + + bin_file = select_file(bin_files) + + device_address = scan_i2c_devices() + print(f"Resetting device at address {hex(device_address)}") + if send_reset(device_address): + print("Device reset successfully") + else: + print("Failed to reset device") + return + + print(f"Flashing {bin_file} to device at address {hex(BOOTLOADER_I2C_ADDRESS)}") + + with open(bin_file, 'rb') as file: + firmware = file.read() + + if flash_firmware(firmware, len(firmware)): + print("PASS") + else: + print("FAIL") + +# Start the setup +# setup() \ No newline at end of file From 4174b3e83f25f75f139f6dc5b5274ee404cf049e Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Wed, 19 Feb 2025 15:46:12 +0100 Subject: [PATCH 02/16] Make get command work --- src/modulino/firmware_flasher.py | 111 ++++++++++++++++--------------- 1 file changed, 58 insertions(+), 53 deletions(-) diff --git a/src/modulino/firmware_flasher.py b/src/modulino/firmware_flasher.py index 6d3b404..6c30091 100644 --- a/src/modulino/firmware_flasher.py +++ b/src/modulino/firmware_flasher.py @@ -5,9 +5,11 @@ from micropython import const BOOTLOADER_I2C_ADDRESS = const(0x64) +ACK = const(0x79) +BUSY = const(0x76) # Define I2C pins and initialize I2C -i2c = I2C(0, freq=100000) +i2c = I2C(0, freq=100000, timeout=50000 * 2) def send_reset(address): """ @@ -23,64 +25,66 @@ def send_reset(address): try: print(f"Sending reset command to address {hex(address)}") i2c.writeto(address, buffer, True) + return False except OSError as e: - pass + # pass + time.sleep(0.25) + return True - time.sleep(0.25) - devices = i2c.scan() + # time.sleep(0.25) + # devices = i2c.scan() - if address in devices: - return False - elif BOOTLOADER_I2C_ADDRESS in devices: - return True + # if address in devices: + # return False + # elif BOOTLOADER_I2C_ADDRESS in devices: + # return True -def execute_command(opcode, command_buffer, command_length, response_buffer, response_length, verbose=True): +def execute_command(opcode, command_data, response_length, verbose=True): """ Execute an I2C command on the device. :param opcode: The command opcode. :param command_buffer: The buffer containing the command data. - :param command_length: The length of the command data. - :param response_buffer: The buffer to store the response data. :param response_length: The expected length of the response data. :param verbose: Whether to print debug information. :return: The number of response bytes read, or -1 if an error occurred. """ if verbose: - print("Executing command") - - cmd = bytes([opcode, 0xFF ^ opcode]) - i2c.writeto(100, cmd) - if command_length > 0: - i2c.writeto(100, command_buffer) - - time.sleep(0.1) - ack = i2c.readfrom(100, 1)[0] - if ack != 0x79: - print(f"Error first ack: {hex(ack)}") - return -1 - - if command_length > 0: - ack = i2c.readfrom(100, 1)[0] - if ack != 0x79: - while ack == 0x76: - time.sleep(0.1) - ack = i2c.readfrom(100, 1)[0] - if ack != 0x79: - print(f"Error second ack: {hex(ack)}") - return -1 - - if response_length > 0: - data = i2c.readfrom(100, response_length + 1) - for i in range(response_length): - response_buffer[i] = data[i + 1] - - ack = i2c.readfrom(100, 1)[0] - if ack != 0x79: - print(f"Error: {hex(ack)}") - return -1 - - return response_length + print(f"Executing command {hex(opcode)}") + + cmd = bytes([opcode, 0xFF ^ opcode]) # Send command code and complement (XOR = 0x00) + i2c.writeto(BOOTLOADER_I2C_ADDRESS, cmd, True) + + if command_data is not None: + res = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, 1)[0] + if res != ACK: + print(f"Error first ack: {hex(res)}") + return None + i2c.writeto(BOOTLOADER_I2C_ADDRESS, command_data, True) + + res = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, 1)[0] + if res != ACK: + while res == BUSY: + time.sleep(0.1) + res = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, 1)[0] + print("Retry") + if res != ACK: + print(f"Error second ack: {hex(res)}") + return None + + if response_length == 0: + return None + + data = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, response_length) + amount_of_bytes = data[0] + 1 + print(f"Retrieved {amount_of_bytes} bytes") + + res = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, 1)[0] + if res != ACK: + print(f"Error: {hex(res)}") + return None + + return data[1 : amount_of_bytes + 1] def flash_firmware(firmware, length, verbose=True): """ @@ -93,14 +97,15 @@ def flash_firmware(firmware, length, verbose=True): """ if verbose: print("Flashing firmware") - - response_buffer = bytearray(255) - if execute_command(0, None, 0, response_buffer, 20, verbose) < 0: + data = execute_command(0, None, 20, verbose) + if len(data) == 0: print("Failed :(") return False - for byte in response_buffer: + for byte in data: print(hex(byte)) + return True # Debug. Remove when done + if verbose: print("Getting device ID") if execute_command(2, None, 0, response_buffer, 3, verbose) < 0: @@ -215,7 +220,7 @@ def select_file(bin_files): choice = int(input("Select the file to flash (number): ")) return bin_files[choice - 1] -def scan_i2c_devices(): +def select_i2c_device(): """ Scan the I2C bus for devices and prompt the user to select one. @@ -240,9 +245,9 @@ def setup(): print("No .bin files found in the root directory.") return - bin_file = select_file(bin_files) + bin_file = "node_base.bin" # select_file(bin_files) - device_address = scan_i2c_devices() + device_address = 30 # select_i2c_device() print(f"Resetting device at address {hex(device_address)}") if send_reset(device_address): print("Device reset successfully") @@ -261,4 +266,4 @@ def setup(): print("FAIL") # Start the setup -# setup() \ No newline at end of file +setup() From b93c13ca606ec5e619fc8c3cf843894987361d78 Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Wed, 19 Feb 2025 16:11:52 +0100 Subject: [PATCH 03/16] Improve command handling --- src/modulino/firmware_flasher.py | 71 +++++++++++++++++++------------- 1 file changed, 42 insertions(+), 29 deletions(-) diff --git a/src/modulino/firmware_flasher.py b/src/modulino/firmware_flasher.py index 6c30091..29929d2 100644 --- a/src/modulino/firmware_flasher.py +++ b/src/modulino/firmware_flasher.py @@ -8,6 +8,10 @@ ACK = const(0x79) BUSY = const(0x76) +CMD_GET = const(0x00) # Gets the version and the allowed commands +CMD_GET_VERSION = const(0x01) # Gets the protocol version +CMD_GET_ID = const(0x02) # Get chip ID + # Define I2C pins and initialize I2C i2c = I2C(0, freq=100000, timeout=50000 * 2) @@ -39,7 +43,24 @@ def send_reset(address): # elif BOOTLOADER_I2C_ADDRESS in devices: # return True -def execute_command(opcode, command_data, response_length, verbose=True): +def wait_for_ack(): + """ + Wait for an acknowledgment from the I2C device. + + :return: True if an acknowledgment was received, otherwise False. + """ + res = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, 1)[0] + if res != ACK: + while res == BUSY: + time.sleep(0.1) + res = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, 1)[0] + print("Waiting for device to be finish processing") + if res != ACK: + print(f"Error processing command: {hex(res)}") + return False + return True + +def execute_command(opcode, command_data, response_length = 0, verbose=True): """ Execute an I2C command on the device. @@ -54,22 +75,14 @@ def execute_command(opcode, command_data, response_length, verbose=True): cmd = bytes([opcode, 0xFF ^ opcode]) # Send command code and complement (XOR = 0x00) i2c.writeto(BOOTLOADER_I2C_ADDRESS, cmd, True) + if not wait_for_ack(): + print(f"Command not acknowledged: {hex(opcode)}") + return None if command_data is not None: - res = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, 1)[0] - if res != ACK: - print(f"Error first ack: {hex(res)}") - return None i2c.writeto(BOOTLOADER_I2C_ADDRESS, command_data, True) - - res = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, 1)[0] - if res != ACK: - while res == BUSY: - time.sleep(0.1) - res = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, 1)[0] - print("Retry") - if res != ACK: - print(f"Error second ack: {hex(res)}") + if not wait_for_ack(): + print("Command failed") return None if response_length == 0: @@ -79,9 +92,8 @@ def execute_command(opcode, command_data, response_length, verbose=True): amount_of_bytes = data[0] + 1 print(f"Retrieved {amount_of_bytes} bytes") - res = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, 1)[0] - if res != ACK: - print(f"Error: {hex(res)}") + if not wait_for_ack(): + print("Failed completing command") return None return data[1 : amount_of_bytes + 1] @@ -95,24 +107,25 @@ def flash_firmware(firmware, length, verbose=True): :param verbose: Whether to print debug information. :return: True if the flashing was successful, otherwise False. """ - if verbose: - print("Flashing firmware") - data = execute_command(0, None, 20, verbose) - if len(data) == 0: + data = execute_command(CMD_GET, None, 20, verbose) + if data is None: print("Failed :(") return False - for byte in data: + + print(f"Bootloader version: {hex(data[0])}") + print("Supported commands:") + for byte in data[1:]: print(hex(byte)) - return True # Debug. Remove when done - - if verbose: - print("Getting device ID") - if execute_command(2, None, 0, response_buffer, 3, verbose) < 0: + data = execute_command(CMD_GET_ID, None, 3, verbose) + if data is None: print("Failed to get device ID") return False - for byte in response_buffer: - print(hex(byte)) + + chip_id = (data[0] << 8) | data[1] # Chip ID: Byte 1 = MSB, Byte 2 = LSB + print(f"Chip ID: {chip_id}") + + return True # Debug. Remove when done if verbose: print("Mass erase") From 9a7ea5ef7c3701924425171df4b2db8982e09aa6 Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Thu, 20 Feb 2025 11:08:41 +0100 Subject: [PATCH 04/16] Improve reset stability --- src/modulino/firmware_flasher.py | 62 +++++++++++++++----------------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/src/modulino/firmware_flasher.py b/src/modulino/firmware_flasher.py index 29929d2..4cdc6f2 100644 --- a/src/modulino/firmware_flasher.py +++ b/src/modulino/firmware_flasher.py @@ -11,9 +11,12 @@ CMD_GET = const(0x00) # Gets the version and the allowed commands CMD_GET_VERSION = const(0x01) # Gets the protocol version CMD_GET_ID = const(0x02) # Get chip ID +CMD_ERASE = const(0x44) # Erase memory +CMD_GO = const(0x21) # Jumps to user application code located in the internal flash memory +CMD_WRITE = const(0x32) # Write memory # Define I2C pins and initialize I2C -i2c = I2C(0, freq=100000, timeout=50000 * 2) +i2c = I2C(0, freq=100000) def send_reset(address): """ @@ -23,25 +26,23 @@ def send_reset(address): :return: 0 if the reset command was sent successfully, otherwise -1. """ buffer = b'DIE' - # Pad buffer to 40 bytes - buffer += b'\x00' * (40 - len(buffer)) + buffer += b'\x00' * (8 - len(buffer)) # Pad buffer to 8 bytes try: print(f"Sending reset command to address {hex(address)}") i2c.writeto(address, buffer, True) - return False - except OSError as e: - # pass - time.sleep(0.25) + print("Reset command sent successfully") + time.sleep(0.25) # Wait for the device to reset return True - - # time.sleep(0.25) - # devices = i2c.scan() - - # if address in devices: - # return False - # elif BOOTLOADER_I2C_ADDRESS in devices: - # return True + except OSError as e: + # ENODEV can be thrown if either the device reset while writing out the buffer or if the device + # was already in bootloader mode in which case there is no device at the original address + if e.errno == 19: + time.sleep(0.25) # Wait for the device to reset + return True + else: + print(f"Error sending reset command: {e}") + return False def wait_for_ack(): """ @@ -98,7 +99,7 @@ def execute_command(opcode, command_data, response_length = 0, verbose=True): return data[1 : amount_of_bytes + 1] -def flash_firmware(firmware, length, verbose=True): +def flash_firmware(firmware, verbose=True): """ Flash the firmware to the I2C device. @@ -127,17 +128,14 @@ def flash_firmware(firmware, length, verbose=True): return True # Debug. Remove when done - if verbose: - print("Mass erase") - erase_buffer = bytearray([0xFF, 0xFF, 0x0]) - if execute_command(0x44, erase_buffer, 3, None, 0, verbose) < 0: - print("Failed to mass erase") - return False + print("Erasing memory...") + erase_buffer = bytearray([0xFF, 0xFF, 0x0]) # Mass erase flash + execute_command(CMD_ERASE, erase_buffer, 0, verbose) - for i in range(0, length, 128): + for i in range(0, len(firmware), 128): progress_bar(i, length) write_buffer = bytearray([8, 0, i // 256, i % 256]) - if write_firmware_page(0x32, write_buffer, 5, firmware[i:i + 128], 128, verbose) < 0: + if write_firmware_page(write_buffer, 5, firmware[i:i + 128], 128, verbose) < 0: print(f"Failed to write page {hex(i)}") return False time.sleep(0.01) @@ -145,14 +143,12 @@ def flash_firmware(firmware, length, verbose=True): progress_bar(length, length) # Complete the progress bar print("Starting firmware") - jump_buffer = bytearray([0x8, 0x00, 0x00, 0x00, 0x8]) - if execute_command(0x21, jump_buffer, 5, None, 0, verbose) < 0: - print("Failed to start firmware") - return False + go_params = bytearray([0x8, 0x00, 0x00, 0x00, 0x8]) + execute_command(CMD_GO, go_params, 0, verbose) # Jump to the application return True -def write_firmware_page(opcode, command_buffer, command_length, firmware_buffer, firmware_length, verbose=True): +def write_firmware_page(command_buffer, command_length, firmware_buffer, firmware_length, verbose=True): """ Write a page of the firmware to the I2C device. @@ -164,7 +160,7 @@ def write_firmware_page(opcode, command_buffer, command_length, firmware_buffer, :param verbose: Whether to print debug information. :return: The number of bytes written, or -1 if an error occurred. """ - cmd = bytes([opcode, 0xFF ^ opcode]) + cmd = bytes([CMD_WRITE, 0xFF ^ CMD_WRITE]) i2c.writeto(100, cmd) if command_length > 0: @@ -273,10 +269,10 @@ def setup(): with open(bin_file, 'rb') as file: firmware = file.read() - if flash_firmware(firmware, len(firmware)): - print("PASS") + if flash_firmware(firmware): + print("Firmware flashed successfully") else: - print("FAIL") + print("Failed to flash firmware") # Start the setup setup() From 70ca732179377780246ebb593223a2964a4ab966 Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Thu, 20 Feb 2025 11:13:08 +0100 Subject: [PATCH 05/16] Load firmware file inside flash function --- src/modulino/firmware_flasher.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/modulino/firmware_flasher.py b/src/modulino/firmware_flasher.py index 4cdc6f2..2396009 100644 --- a/src/modulino/firmware_flasher.py +++ b/src/modulino/firmware_flasher.py @@ -99,7 +99,7 @@ def execute_command(opcode, command_data, response_length = 0, verbose=True): return data[1 : amount_of_bytes + 1] -def flash_firmware(firmware, verbose=True): +def flash_firmware(firmware_path, verbose=True): """ Flash the firmware to the I2C device. @@ -129,13 +129,16 @@ def flash_firmware(firmware, verbose=True): return True # Debug. Remove when done print("Erasing memory...") - erase_buffer = bytearray([0xFF, 0xFF, 0x0]) # Mass erase flash - execute_command(CMD_ERASE, erase_buffer, 0, verbose) + erase_params = bytearray([0xFF, 0xFF, 0x0]) # Mass erase flash + execute_command(CMD_ERASE, erase_params, 0, verbose) - for i in range(0, len(firmware), 128): + with open(firmware_path, 'rb') as file: + firmware_data = file.read() + + for i in range(0, len(firmware_data), 128): progress_bar(i, length) write_buffer = bytearray([8, 0, i // 256, i % 256]) - if write_firmware_page(write_buffer, 5, firmware[i:i + 128], 128, verbose) < 0: + if write_firmware_page(write_buffer, 5, firmware_data[i:i + 128], 128, verbose) < 0: print(f"Failed to write page {hex(i)}") return False time.sleep(0.01) @@ -266,10 +269,7 @@ def setup(): print(f"Flashing {bin_file} to device at address {hex(BOOTLOADER_I2C_ADDRESS)}") - with open(bin_file, 'rb') as file: - firmware = file.read() - - if flash_firmware(firmware): + if flash_firmware(bin_file): print("Firmware flashed successfully") else: print("Failed to flash firmware") From acf63d6556a421c806d9017c45779b317ff8300a Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Thu, 20 Feb 2025 11:17:59 +0100 Subject: [PATCH 06/16] Fix progress bar function --- src/modulino/firmware_flasher.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/modulino/firmware_flasher.py b/src/modulino/firmware_flasher.py index 2396009..7532865 100644 --- a/src/modulino/firmware_flasher.py +++ b/src/modulino/firmware_flasher.py @@ -91,7 +91,7 @@ def execute_command(opcode, command_data, response_length = 0, verbose=True): data = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, response_length) amount_of_bytes = data[0] + 1 - print(f"Retrieved {amount_of_bytes} bytes") + print(f"Retrieved {amount_of_bytes} bytes") # TODO: Remove this line if not wait_for_ack(): print("Failed completing command") @@ -126,28 +126,28 @@ def flash_firmware(firmware_path, verbose=True): chip_id = (data[0] << 8) | data[1] # Chip ID: Byte 1 = MSB, Byte 2 = LSB print(f"Chip ID: {chip_id}") - return True # Debug. Remove when done - print("Erasing memory...") erase_params = bytearray([0xFF, 0xFF, 0x0]) # Mass erase flash - execute_command(CMD_ERASE, erase_params, 0, verbose) + #execute_command(CMD_ERASE, erase_params, 0, verbose) with open(firmware_path, 'rb') as file: firmware_data = file.read() + total_bytes = len(firmware_data) - for i in range(0, len(firmware_data), 128): - progress_bar(i, length) + print(f"Flashing {total_bytes} bytes of firmware") + for i in range(0, total_bytes, 128): + progress_bar(i, total_bytes) write_buffer = bytearray([8, 0, i // 256, i % 256]) - if write_firmware_page(write_buffer, 5, firmware_data[i:i + 128], 128, verbose) < 0: - print(f"Failed to write page {hex(i)}") - return False + # if write_firmware_page(write_buffer, 5, firmware_data[i:i + 128], 128, verbose) < 0: + # print(f"Failed to write page {hex(i)}") + # return False time.sleep(0.01) - progress_bar(length, length) # Complete the progress bar + progress_bar(total_bytes, total_bytes) # Complete the progress bar print("Starting firmware") go_params = bytearray([0x8, 0x00, 0x00, 0x00, 0x8]) - execute_command(CMD_GO, go_params, 0, verbose) # Jump to the application + #execute_command(CMD_GO, go_params, 0, verbose) # Jump to the application return True @@ -201,10 +201,11 @@ def progress_bar(current, total, bar_length=40): :param bar_length: The length of the progress bar in characters. """ percent = float(current) / total - arrow = '-' * int(round(percent * bar_length) - 1) + '>' + arrow = '-' * int(round(percent * bar_length)) spaces = ' ' * (bar_length - len(arrow)) sys.stdout.write(f"\rProgress: [{arrow}{spaces}] {int(round(percent * 100))}%") - sys.stdout.flush() + if current == total: + sys.stdout.write('\n') def find_bin_files(): """ From 7e96877d3af8f0cfca38dd804dcba4cdf6731ecc Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Thu, 20 Feb 2025 14:12:58 +0100 Subject: [PATCH 07/16] Improve debugging output --- src/modulino/firmware_flasher.py | 138 +++++++++++++++---------------- 1 file changed, 68 insertions(+), 70 deletions(-) diff --git a/src/modulino/firmware_flasher.py b/src/modulino/firmware_flasher.py index 7532865..31de709 100644 --- a/src/modulino/firmware_flasher.py +++ b/src/modulino/firmware_flasher.py @@ -13,7 +13,8 @@ CMD_GET_ID = const(0x02) # Get chip ID CMD_ERASE = const(0x44) # Erase memory CMD_GO = const(0x21) # Jumps to user application code located in the internal flash memory -CMD_WRITE = const(0x32) # Write memory +CMD_WRITE_NO_STRETCH = const(0x32) # Writes up to 256 bytes to the memory, starting from an address specified +CHUNK_SIZE = const(128) # Size of the memory chunk to write # Define I2C pins and initialize I2C i2c = I2C(0, freq=100000) @@ -29,9 +30,9 @@ def send_reset(address): buffer += b'\x00' * (8 - len(buffer)) # Pad buffer to 8 bytes try: - print(f"Sending reset command to address {hex(address)}") + print(f"🔄 Resetting device at address {hex(address)}") i2c.writeto(address, buffer, True) - print("Reset command sent successfully") + print("✅ Reset command sent successfully") time.sleep(0.25) # Wait for the device to reset return True except OSError as e: @@ -55,13 +56,12 @@ def wait_for_ack(): while res == BUSY: time.sleep(0.1) res = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, 1)[0] - print("Waiting for device to be finish processing") if res != ACK: - print(f"Error processing command: {hex(res)}") + print(f"❌ Error processing command. Result code: {hex(res)}") return False return True -def execute_command(opcode, command_data, response_length = 0, verbose=True): +def execute_command(opcode, command_data, response_length = 0, verbose=False): """ Execute an I2C command on the device. @@ -72,18 +72,18 @@ def execute_command(opcode, command_data, response_length = 0, verbose=True): :return: The number of response bytes read, or -1 if an error occurred. """ if verbose: - print(f"Executing command {hex(opcode)}") + print(f"đŸ•ĩī¸ Executing command {hex(opcode)}") cmd = bytes([opcode, 0xFF ^ opcode]) # Send command code and complement (XOR = 0x00) i2c.writeto(BOOTLOADER_I2C_ADDRESS, cmd, True) if not wait_for_ack(): - print(f"Command not acknowledged: {hex(opcode)}") + print(f"❌ Command not acknowledged: {hex(opcode)}") return None if command_data is not None: i2c.writeto(BOOTLOADER_I2C_ADDRESS, command_data, True) if not wait_for_ack(): - print("Command failed") + print("❌ Command failed") return None if response_length == 0: @@ -91,15 +91,17 @@ def execute_command(opcode, command_data, response_length = 0, verbose=True): data = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, response_length) amount_of_bytes = data[0] + 1 - print(f"Retrieved {amount_of_bytes} bytes") # TODO: Remove this line + + if verbose: + print(f"đŸ“Ļ Received {amount_of_bytes} bytes") if not wait_for_ack(): - print("Failed completing command") + print("❌ Failed completing command") return None return data[1 : amount_of_bytes + 1] -def flash_firmware(firmware_path, verbose=True): +def flash_firmware(firmware_path, verbose=False): """ Flash the firmware to the I2C device. @@ -110,23 +112,22 @@ def flash_firmware(firmware_path, verbose=True): """ data = execute_command(CMD_GET, None, 20, verbose) if data is None: - print("Failed :(") + print("❌ Failed to get command list") return False - print(f"Bootloader version: {hex(data[0])}") - print("Supported commands:") - for byte in data[1:]: - print(hex(byte)) + print(f"â„šī¸ Bootloader version: {hex(data[0])}") + print("👀 Supported commands:") + print(", ".join([hex(byte) for byte in data[1:]])) data = execute_command(CMD_GET_ID, None, 3, verbose) if data is None: - print("Failed to get device ID") + print("❌ Failed to get device ID") return False chip_id = (data[0] << 8) | data[1] # Chip ID: Byte 1 = MSB, Byte 2 = LSB - print(f"Chip ID: {chip_id}") + print(f"â„šī¸ Chip ID: {chip_id}") - print("Erasing memory...") + print("đŸ—‘ī¸ Erasing memory...") erase_params = bytearray([0xFF, 0xFF, 0x0]) # Mass erase flash #execute_command(CMD_ERASE, erase_params, 0, verbose) @@ -134,63 +135,62 @@ def flash_firmware(firmware_path, verbose=True): firmware_data = file.read() total_bytes = len(firmware_data) - print(f"Flashing {total_bytes} bytes of firmware") - for i in range(0, total_bytes, 128): + print(f"đŸ”Ĩ Flashing {total_bytes} bytes of firmware") + for i in range(0, total_bytes, CHUNK_SIZE): progress_bar(i, total_bytes) - write_buffer = bytearray([8, 0, i // 256, i % 256]) - # if write_firmware_page(write_buffer, 5, firmware_data[i:i + 128], 128, verbose) < 0: - # print(f"Failed to write page {hex(i)}") - # return False - time.sleep(0.01) + start_address = bytearray([8, 0, i // 256, i % 256]) # 4-byte address: byte 1 = MSB, byte 4 = LSB + checksum = 0 + for b in start_address: + checksum ^= b + start_address.append(checksum) + data_slice = firmware_data[i:i + CHUNK_SIZE] + if not write_firmware_page(start_address, data_slice): + print(f"❌ Failed to write page {hex(i)}") + return False + time.sleep(0.01) # Give the device some time to process the data progress_bar(total_bytes, total_bytes) # Complete the progress bar - print("Starting firmware") + print("🏃 Starting firmware") go_params = bytearray([0x8, 0x00, 0x00, 0x00, 0x8]) #execute_command(CMD_GO, go_params, 0, verbose) # Jump to the application return True -def write_firmware_page(command_buffer, command_length, firmware_buffer, firmware_length, verbose=True): +def write_firmware_page(command_params, firmware_data): """ Write a page of the firmware to the I2C device. - :param opcode: The command opcode. - :param command_buffer: The buffer containing the command data. - :param command_length: The length of the command data. - :param firmware_buffer: The buffer containing the firmware data. - :param firmware_length: The length of the firmware data. + :param command_params: The buffer containing the command data. + :param firmware_data: The buffer containing the firmware data. :param verbose: Whether to print debug information. :return: The number of bytes written, or -1 if an error occurred. """ - cmd = bytes([CMD_WRITE, 0xFF ^ CMD_WRITE]) - i2c.writeto(100, cmd) - - if command_length > 0: - command_buffer[command_length - 1] = 0 - for i in range(command_length - 1): - command_buffer[command_length - 1] ^= command_buffer[i] - i2c.writeto(100, command_buffer) + cmd = bytes([CMD_WRITE_NO_STRETCH, 0xFF ^ CMD_WRITE_NO_STRETCH]) + i2c.writeto(BOOTLOADER_I2C_ADDRESS, cmd) + if not wait_for_ack(): + print("❌ Write command not acknowledged") + return False - ack = i2c.readfrom(100, 1)[0] - if ack != 0x79: - print(f"Error first ack: {hex(ack)}") - return -1 + i2c.writeto(BOOTLOADER_I2C_ADDRESS, command_params) + if not wait_for_ack(): + print("❌ Failed to write command parameters") + return False - tmp_buffer = bytearray(firmware_length + 2) - tmp_buffer[0] = firmware_length - 1 - tmp_buffer[1:firmware_length + 1] = firmware_buffer - tmp_buffer[firmware_length + 1] = 0 - for i in range(firmware_length + 1): - tmp_buffer[firmware_length + 1] ^= tmp_buffer[i] + data_size = len(firmware_data) + tmp_buffer = bytearray(data_size + 2) # Data plus size and checksum + tmp_buffer[0] = data_size - 1 # Size of the data # TODO Arduino code uses data_size - 1 + tmp_buffer[1:data_size + 1] = firmware_data + tmp_buffer[-1] = 0 # Checksum placeholder + for i in range(data_size + 1): # Calculate checksum over size byte + data bytes + tmp_buffer[-1] ^= tmp_buffer[i] - i2c.writeto(100, tmp_buffer) - ack = i2c.readfrom(100, 1)[0] - if ack != 0x79: - print(f"Error: {hex(ack)}") - return -1 + i2c.writeto(BOOTLOADER_I2C_ADDRESS, tmp_buffer) + if not wait_for_ack(): + print("❌ Failed to write firmware") + return False - return firmware_length + return True def progress_bar(current, total, bar_length=40): """ @@ -223,11 +223,11 @@ def select_file(bin_files): :return: The selected .bin file name. """ if len(bin_files) == 1: - confirm = input(f"Found one bin file: {bin_files[0]}. Do you want to flash it? (yes/no) ") + confirm = input(f"👀 Found one bin file: {bin_files[0]}. Do you want to flash it? (yes/no) ") if confirm.lower() == 'yes': return bin_files[0] else: - print("Found bin files:") + print("👀 Found bin files:") for index, file in enumerate(bin_files): print(f"{index + 1}. {file}") choice = int(input("Select the file to flash (number): ")) @@ -240,7 +240,7 @@ def select_i2c_device(): :return: The selected I2C device address. """ devices = i2c.scan() - print("I2C devices found:") + print("👀 I2C devices found:") for index, device in enumerate(devices): print(f"{index + 1}. Address: {hex(device)}") choice = int(input("Select the I2C device to flash (number): ")) @@ -251,29 +251,27 @@ def setup(): Setup function to initialize the flashing process. Finds .bin files, scans for I2C devices, and flashes the selected firmware. """ - print("Starting setup") bin_files = find_bin_files() if not bin_files: - print("No .bin files found in the root directory.") + print("❌ No .bin files found in the root directory.") return bin_file = "node_base.bin" # select_file(bin_files) - device_address = 30 # select_i2c_device() - print(f"Resetting device at address {hex(device_address)}") + device_address = 30 # select_i2c_device() if send_reset(device_address): - print("Device reset successfully") + print("✅ Device reset successfully") else: - print("Failed to reset device") + print("❌ Failed to reset device") return - print(f"Flashing {bin_file} to device at address {hex(BOOTLOADER_I2C_ADDRESS)}") + print(f"đŸ•ĩī¸ Flashing {bin_file} to device at address {hex(BOOTLOADER_I2C_ADDRESS)}") if flash_firmware(bin_file): - print("Firmware flashed successfully") + print("✅ Firmware flashed successfully") else: - print("Failed to flash firmware") + print("❌ Failed to flash firmware") # Start the setup setup() From 8056d98669bd0a58e0a6cec99ef3d76868e6700e Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Thu, 20 Feb 2025 14:48:27 +0100 Subject: [PATCH 08/16] Improve documentation --- src/modulino/firmware_flasher.py | 36 +++++++++++++++++--------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/src/modulino/firmware_flasher.py b/src/modulino/firmware_flasher.py index 31de709..0d96f6a 100644 --- a/src/modulino/firmware_flasher.py +++ b/src/modulino/firmware_flasher.py @@ -9,6 +9,7 @@ BUSY = const(0x76) CMD_GET = const(0x00) # Gets the version and the allowed commands +CMD_GET_LENGTH_V12 = const(20) # Length of the response data CMD_GET_VERSION = const(0x01) # Gets the protocol version CMD_GET_ID = const(0x02) # Get chip ID CMD_ERASE = const(0x44) # Erase memory @@ -61,15 +62,15 @@ def wait_for_ack(): return False return True -def execute_command(opcode, command_data, response_length = 0, verbose=False): +def execute_command(opcode, command_params, response_length = 0, verbose=False): """ Execute an I2C command on the device. :param opcode: The command opcode. - :param command_buffer: The buffer containing the command data. - :param response_length: The expected length of the response data. + :param command_params: The buffer containing the command parameters. + :param response_length: The expected length of the response data frame. :param verbose: Whether to print debug information. - :return: The number of response bytes read, or -1 if an error occurred. + :return: The number of response bytes read, or None if an error occurred. """ if verbose: print(f"đŸ•ĩī¸ Executing command {hex(opcode)}") @@ -80,8 +81,8 @@ def execute_command(opcode, command_data, response_length = 0, verbose=False): print(f"❌ Command not acknowledged: {hex(opcode)}") return None - if command_data is not None: - i2c.writeto(BOOTLOADER_I2C_ADDRESS, command_data, True) + if command_params is not None: + i2c.writeto(BOOTLOADER_I2C_ADDRESS, command_params, True) if not wait_for_ack(): print("❌ Command failed") return None @@ -90,16 +91,12 @@ def execute_command(opcode, command_data, response_length = 0, verbose=False): return None data = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, response_length) - amount_of_bytes = data[0] + 1 - - if verbose: - print(f"đŸ“Ļ Received {amount_of_bytes} bytes") if not wait_for_ack(): print("❌ Failed completing command") return None - return data[1 : amount_of_bytes + 1] + return data def flash_firmware(firmware_path, verbose=False): """ @@ -110,14 +107,20 @@ def flash_firmware(firmware_path, verbose=False): :param verbose: Whether to print debug information. :return: True if the flashing was successful, otherwise False. """ - data = execute_command(CMD_GET, None, 20, verbose) + data = execute_command(CMD_GET_VERSION, None, 1, verbose) + if data is None: + print("❌ Failed to get protocol version") + return False + print(f"â„šī¸ Protocol version: {data[0] & 0xF}.{data[0] >> 4}") + + data = execute_command(CMD_GET, None, CMD_GET_LENGTH_V12, verbose) if data is None: print("❌ Failed to get command list") return False - print(f"â„šī¸ Bootloader version: {hex(data[0])}") + print(f"â„šī¸ Bootloader version: {(data[1] & 0xF)}.{data[1] >> 4}") print("👀 Supported commands:") - print(", ".join([hex(byte) for byte in data[1:]])) + print(", ".join([hex(byte) for byte in data[2:]])) data = execute_command(CMD_GET_ID, None, 3, verbose) if data is None: @@ -161,10 +164,9 @@ def write_firmware_page(command_params, firmware_data): """ Write a page of the firmware to the I2C device. - :param command_params: The buffer containing the command data. + :param command_params: The buffer containing the command parameters. :param firmware_data: The buffer containing the firmware data. - :param verbose: Whether to print debug information. - :return: The number of bytes written, or -1 if an error occurred. + :return: True if the page was written successfully, otherwise False. """ cmd = bytes([CMD_WRITE_NO_STRETCH, 0xFF ^ CMD_WRITE_NO_STRETCH]) i2c.writeto(BOOTLOADER_I2C_ADDRESS, cmd) From acb199644f7576e61e1c960850cc28e6020dcd84 Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Thu, 20 Feb 2025 14:52:05 +0100 Subject: [PATCH 09/16] Enable erase --- src/modulino/firmware_flasher.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/modulino/firmware_flasher.py b/src/modulino/firmware_flasher.py index 0d96f6a..62f26d6 100644 --- a/src/modulino/firmware_flasher.py +++ b/src/modulino/firmware_flasher.py @@ -132,7 +132,7 @@ def flash_firmware(firmware_path, verbose=False): print("đŸ—‘ī¸ Erasing memory...") erase_params = bytearray([0xFF, 0xFF, 0x0]) # Mass erase flash - #execute_command(CMD_ERASE, erase_params, 0, verbose) + execute_command(CMD_ERASE, erase_params, 0, verbose) with open(firmware_path, 'rb') as file: firmware_data = file.read() @@ -248,9 +248,9 @@ def select_i2c_device(): choice = int(input("Select the I2C device to flash (number): ")) return devices[choice - 1] -def setup(): +def run(): """ - Setup function to initialize the flashing process. + Initialize the flashing process. Finds .bin files, scans for I2C devices, and flashes the selected firmware. """ @@ -275,5 +275,5 @@ def setup(): else: print("❌ Failed to flash firmware") -# Start the setup -setup() +if __name__ == "__main__": + run() From 7b05996faa279461ddc05325a8469986e65d8158 Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Thu, 20 Feb 2025 14:57:32 +0100 Subject: [PATCH 10/16] Use no-stretch erase --- src/modulino/firmware_flasher.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/modulino/firmware_flasher.py b/src/modulino/firmware_flasher.py index 62f26d6..ac52785 100644 --- a/src/modulino/firmware_flasher.py +++ b/src/modulino/firmware_flasher.py @@ -12,9 +12,10 @@ CMD_GET_LENGTH_V12 = const(20) # Length of the response data CMD_GET_VERSION = const(0x01) # Gets the protocol version CMD_GET_ID = const(0x02) # Get chip ID -CMD_ERASE = const(0x44) # Erase memory +CMD_ERASE_NO_STRETCH = const(0x45) # Erase memory. Returns busy state while operation is ongoing CMD_GO = const(0x21) # Jumps to user application code located in the internal flash memory CMD_WRITE_NO_STRETCH = const(0x32) # Writes up to 256 bytes to the memory, starting from an address specified + CHUNK_SIZE = const(128) # Size of the memory chunk to write # Define I2C pins and initialize I2C @@ -132,7 +133,7 @@ def flash_firmware(firmware_path, verbose=False): print("đŸ—‘ī¸ Erasing memory...") erase_params = bytearray([0xFF, 0xFF, 0x0]) # Mass erase flash - execute_command(CMD_ERASE, erase_params, 0, verbose) + execute_command(CMD_ERASE_NO_STRETCH, erase_params, 0, verbose) with open(firmware_path, 'rb') as file: firmware_data = file.read() @@ -156,7 +157,7 @@ def flash_firmware(firmware_path, verbose=False): print("🏃 Starting firmware") go_params = bytearray([0x8, 0x00, 0x00, 0x00, 0x8]) - #execute_command(CMD_GO, go_params, 0, verbose) # Jump to the application + execute_command(CMD_GO, go_params, 0, verbose) # Jump to the application return True From c1f4a378a36e33be0886b3cb11318c1d0a940266 Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Thu, 20 Feb 2025 15:02:21 +0100 Subject: [PATCH 11/16] Improve error handling --- src/modulino/firmware_flasher.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/modulino/firmware_flasher.py b/src/modulino/firmware_flasher.py index ac52785..98ce1db 100644 --- a/src/modulino/firmware_flasher.py +++ b/src/modulino/firmware_flasher.py @@ -229,11 +229,15 @@ def select_file(bin_files): confirm = input(f"👀 Found one bin file: {bin_files[0]}. Do you want to flash it? (yes/no) ") if confirm.lower() == 'yes': return bin_files[0] + else: + return None else: print("👀 Found bin files:") for index, file in enumerate(bin_files): print(f"{index + 1}. {file}") choice = int(input("Select the file to flash (number): ")) + if choice < 1 or choice > len(bin_files): + return None return bin_files[choice - 1] def select_i2c_device(): @@ -247,6 +251,8 @@ def select_i2c_device(): for index, device in enumerate(devices): print(f"{index + 1}. Address: {hex(device)}") choice = int(input("Select the I2C device to flash (number): ")) + if choice < 1 or choice > len(devices): + return None return devices[choice - 1] def run(): @@ -260,9 +266,16 @@ def run(): print("❌ No .bin files found in the root directory.") return - bin_file = "node_base.bin" # select_file(bin_files) + bin_file = select_file(bin_files) + if bin_file is None: + print("❌ No file selected") + return + + device_address = select_i2c_device() + if device_address is None: + print("❌ No device selected") + return - device_address = 30 # select_i2c_device() if send_reset(device_address): print("✅ Device reset successfully") else: From 7015055129d9c4bd1b62d41ab08dda92d0391dc9 Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Thu, 20 Feb 2025 15:25:32 +0100 Subject: [PATCH 12/16] Improve error handling --- src/modulino/firmware_flasher.py | 53 ++++++++++++++++++++++++-------- 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/src/modulino/firmware_flasher.py b/src/modulino/firmware_flasher.py index 98ce1db..18bd5de 100644 --- a/src/modulino/firmware_flasher.py +++ b/src/modulino/firmware_flasher.py @@ -1,3 +1,16 @@ +""" +This script is a firmware updater for the Modulino devices. + +It uses the I2C bootloader to flash the firmware to the device. +The script finds all .bin files in the root directory and prompts the user to select a file to flash. +It then scans the I2C bus for devices and prompts the user to select a device to flash. +You must either know the I2C address of the device to be flashed or make sure that only one device is connected. +The script sends a reset command to the device, erases the memory, and writes the firmware to the device in chunks. +Finally, it starts the new firmware on the device. + +Initial author: Sebastian Romero (s.romero@arduino.cc) +""" + import os import sys from machine import I2C, Pin @@ -34,7 +47,7 @@ def send_reset(address): try: print(f"🔄 Resetting device at address {hex(address)}") i2c.writeto(address, buffer, True) - print("✅ Reset command sent successfully") + print("📤 Reset command sent") time.sleep(0.25) # Wait for the device to reset return True except OSError as e: @@ -204,7 +217,7 @@ def progress_bar(current, total, bar_length=40): :param bar_length: The length of the progress bar in characters. """ percent = float(current) / total - arrow = '-' * int(round(percent * bar_length)) + arrow = '=' * int(round(percent * bar_length)) spaces = ' ' * (bar_length - len(arrow)) sys.stdout.write(f"\rProgress: [{arrow}{spaces}] {int(round(percent * 100))}%") if current == total: @@ -225,20 +238,24 @@ def select_file(bin_files): :param bin_files: A list of .bin file names. :return: The selected .bin file name. """ + if len(bin_files) == 0: + print("❌ No .bin files found in the root directory.") + return None + if len(bin_files) == 1: - confirm = input(f"👀 Found one bin file: {bin_files[0]}. Do you want to flash it? (yes/no) ") + confirm = input(f"📄 Found one biary file: {bin_files[0]}. Do you want to flash it? (yes/no) ") if confirm.lower() == 'yes': return bin_files[0] else: return None - else: - print("👀 Found bin files:") - for index, file in enumerate(bin_files): - print(f"{index + 1}. {file}") - choice = int(input("Select the file to flash (number): ")) - if choice < 1 or choice > len(bin_files): - return None - return bin_files[choice - 1] + + print("📄 Found binary files:") + for index, file in enumerate(bin_files): + print(f"{index + 1}. {file}") + choice = int(input("Select the file to flash (number): ")) + if choice < 1 or choice > len(bin_files): + return None + return bin_files[choice - 1] def select_i2c_device(): """ @@ -247,7 +264,19 @@ def select_i2c_device(): :return: The selected I2C device address. """ devices = i2c.scan() - print("👀 I2C devices found:") + + if len(devices) == 0: + print("❌ No I2C devices found") + return None + + if len(devices) == 1: + confirm = input(f"🔌 Found one I2C device at address {hex(devices[0])}. Do you want to flash it? (yes/no) ") + if confirm.lower() == 'yes': + return devices[0] + else: + return None + + print("🔌 I2C devices found:") for index, device in enumerate(devices): print(f"{index + 1}. Address: {hex(device)}") choice = int(input("Select the I2C device to flash (number): ")) From 91fa5f8731972e5e8ab3b1167d658d92b1df1fa9 Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Thu, 20 Feb 2025 15:27:19 +0100 Subject: [PATCH 13/16] Remove test data --- examples/buzzer_test.py | 8 -------- examples/node_base.bin | Bin 14464 -> 0 bytes 2 files changed, 8 deletions(-) delete mode 100644 examples/buzzer_test.py delete mode 100755 examples/node_base.bin diff --git a/examples/buzzer_test.py b/examples/buzzer_test.py deleted file mode 100644 index 30ab2bb..0000000 --- a/examples/buzzer_test.py +++ /dev/null @@ -1,8 +0,0 @@ -from modulino import ModulinoBuzzer -from time import sleep - -buzzer = ModulinoBuzzer() - -buzzer.tone(ModulinoBuzzer.NOTES["E5"], 0, blocking=True) -sleep(2) -buzzer.no_tone() diff --git a/examples/node_base.bin b/examples/node_base.bin deleted file mode 100755 index 31904ae5627ae10a270bc38e318fae674705bd17..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 14464 zcmeHte|Q_kwdl;QWXYBt%Tk=k@{g=`?ZlRn)UiScaUkp6#IYrlkfiO)58BF-)2?J2 zHbALf>3xWV^ak93iJ*_zNil@4B?)a^gSqXcE-vA=O#`pH`EkGepdTit-9n(K1R~q9 zweQTz4x#t8@BRC}5BuBMnc0~$=bSme=8TXFFz{IwRf7lplfQSb`+f=PAHZ`M z=8`Go{eOD?U%t&O`S-9snO^Vj`)9_JH}&F<#%60X%DufAJ+hkmrMz~y5wW7ROStD0 zo!cdR?G&BYC9FP0+q#76Q*?fpP;g3Tb1BEalR9CvKjSb03F6t#(1ONWz;u!0%bm|@hr zjJjylgGQZT4l|7UF`xy;BV)W#?=tFyMtmx0)EnIBByQZ;U~iPy>KYzI3`*sUQA5Cs znzVASCa)!YF*TWk_Q72Ot(bzyhmcc4 zdJ1#az&7F3H-=eD^G4KlTll4%V5zk0vUQIc-cPR0X!a2_VtN~JBoET(nc$HJO{kAS zEcOhWaY`B|b;_uR#}T8aU(iX}ZRIe^mA0|2UAk8G=7f{sqvAVvnzdNN(a=Bj+FP;4&Cd!L-~$xTi4uh#>C;OY092JsVh1I_}T$q#}uK4&`>bNw0MG^Y0L`` zUypm?6GcxHKS8B#yGDVvo8U@tq*6CsTW_U@w7)sN30AalxH7w<>2Z`Af_-9<$=Kl? zW4SF1J7SDIImsYBJ(gLa4L2@?UG69Pun_aQj?iMLGePH|JC3}xg4i6!EzcP$50cT5yEH-zLqCM$ES(e8tG7DKnu9+7G0H-J|);6ZRB_z?WQ0FM_QYmi||;~hSe zVme#Cd?m<}xmI3Fu}Sfs`mMsIhO=d9jm(qRB7|8Sbj$MEpOE%o{;l~fxF!#D_)$H< z;o6-8H9&T+38#ucZuRs@=%vt1-#5oU*2voV7%}!istKpgjuSm5ZGb(vx8DkTm;-z0 zn?#=3ERoImlX;vudBtx9dD(_@lRRgBE-$&N-j+nEf4z#QB@Xro`%&uI2|fLWjy+VRY5t4Byyv#I=l(cP+#1>1p2ySs=$@_Z^Omqio8?XW zOwg_~T6Mg6pBY+Y?6!70YvXg0fMqY0I(u~l^lBT>H*C&LJ`YypCz)Zc29;6=n{ok* zA7yG#Z&U8~QRL_CUK%ZyQLUCCDW1hL%s?9cUZi(St#gi1s<+?+ld5IJu$Xf#7kdP1WR_ zMf;oLe$NJekAPIS{GD~B@$Up)2w|=+bYI|d=&n$LMVy1KMgG>0TK!>^I;+ocv=%Jp zmdy37cxFZ$JJ;V4jECw&ox$bkYUt$50(v{%Cz#G5&sUqM)VC*4+}@lQU+uN1!FaRp2DK}`##aZj<(m?{ zi=nU4=Tp7ti^1>3cb(YvDoV|pq7H*hr;bkfStu~j}VZ{2Zzho#5b zi}qu{WAhYd*Mi(4&M%`>@zg7q*FU;CxH@n?L~6s0QyT6iRyCklz@}D#P2B)`dLzs@ zq}}UV1y(h!ZmQ^M=wqy&lLKU@?#?V(*T>uyw|hwcu=(TwN<}jR@jiyMndwoX%mvNF zT0o*doG`J~sHT}py*H8j14}PjJWD~oS*m4!Rg<3n-POt<@|+Fx*3FR;;K>ihU(t{+ zrJ_{p_)^#V8U=Gf|0f(l@19a=OUxbI=yoT5tXaY4{A%(r=r*!bJVa5qq1o^(M^>V* zi>7rAeG&0alO0V$8`UCSFGPswKha^Lj}rJ#VjIv0;NQ>$_)o;N=r15*zlePxmY)2R z2mm~B$_84S(_|l{rJm>rcLBKtO2H}qs(lEj=&`FB(P!+*3-0W9cjSrgHYoXo2kk>W z=i8gjdE)uVg?1Z{lem}aH)1Wo-41^A5yQ(-ME@slNh_>E_(;~U1}SN0e~*w05OWNO1YT*!XLvC^ zmf-ST0~ZWjI5eJb;Nnm*Se(4Tuz5p!iMS!Mu^lJf`~?@iT+LFgRMS6pMsF1bu*5&5u zFRw<6_!9BgtsfDbNE`+-i>l2R=5a2*qJ9^T1KG97w*)IzY&2meDHNO>_E-XzrcEI_ zmYY*bmFC$*I*h=#cZag$No+@8$%A_ON8q&*^UbKTWgB9pE66EwAx`oO3zY^zW}(fS0^(#$rX5sa`I%n8`bfCfl)bWcC!t7Ft<&3rj-jj0uG2c z9l#SkU8jo`Pc{q>m`C(eJ`TuLKy zr0W@7*RSfR<4tO2Pxd$w8XGiMkR2WyqNNL#g3~^!*RmeUQVTls?1+i=NlpPNe;7f` z2ikJa?iNvcM#|%@UP?v2zo-|r1iO(+3Pma_5sLLRrzaNVpjRkPsu#`zjiq`5ut)=j zZ)km<<5ZVmRTG>?xs9 zIU8=dBt0z^31xuCQ%YEBkG3|KQe^)lz{jrzzSivIz0vG0d?nCp+E#d;zde{2A{=}S zywYCfL9mvL!bx8YBo>gdy0K>oj+M3fwYZw`T_b}n7c0HHl*c~bG4ee+N^?|+m5-5ola5Hu%`=Jqn$}8 zQS}Jy#f_iNR>%IVr@xTN%GxLAiasM>dIH4kx4KSbjK~_%eIj~9)|Rt{!TqpjX0=o* zXd-)ZmY3H;4{xopDdf%6&DOO5a>QA4v1WsEPkiMfr-Z{VYvgi~2utTS&aCf%16SQpA zt=lX+PV4Cb{it+vbE&zsuorFHJkMj}&+;n*D}r`IUjF;k8ybONGaw9HgK!*V^5_)q z9d<7XJ}G<*GX4wA+q}t=b7qgUDCpw*xl&AvU7;??*F^Ip5~&YpWvpj?y_6K60gb5- zg(d7QQR}5%y623=^Rm<>I>na(&--DO<=nqZZayMiKs~~fu+}$?wdTNDe>?X(4bBBH zZ<~4a^uK5nM?>qTJ4%E%)qAz^&I#DxuNtHCVDxv-{Y*P5*#b_U1&ufw`Yy493}yJ2 zd%%xhZx8ebVgFL_JPI%H)@HQ3A2Hyh&Lr5IS~i+T+jEdpu&AT$c-f3+q?2^-#m-^Q zj(XAI=#up=-YwL_33asdNVFa_669(z4KsXz!Dz%QodOQgmGhf}orlEdB~A=(-;|Sc zhHb8F33hV)``I2-&YAM&j%cv67xh}n7^gYsOi6Q@F=kkDP;#V$#?gpv)m5!<%>a6v6p+mh7cz9nc>h(c9d*`FLw>^~g@z3k` zMGuiL5cg(=su4uPso|*<$P8J7&>8ob0>fxiv`ch}RFD=owPxujaeR7^S#L)5ajFtO zhJ7w${d!~l`(XY50scQ<^+_gQ0ciJ}z}(=5Xad;)0?-7f9K_Yj49W~)4)<&moS{3U zcIk-J>v}=3ovTH9`c*xP<5r-suOL_UG&Hgzo_HKx1lfcfU<) zYjmTFJXVQ}gd6ev@AJgl;(cHb)x^kp$>)S}sHr189UAKK}1#w?ZPDAHKQ z7jn;{L;OLJdZMt(7WMImcV91t%&B5yh-|_ctE7NdOpchPtXTB5#LOQ@S-oD zqrb30c}=2v%lIw{j-ukR5S&WQ^Mpnv(%5751^1zoF1j+i#yhY^-~8e`u9y#si0c>f zmCAM4-!E7d&Ul*?Hh9eS)ruDy%>F~XPjr5LUJ&>E@lJf;q_|Mb(D?L1G@F0(8>KDy z;OL#@;;*2`$(JKX@K4_;m+%1?LuJR5g>IMd!7A|;XwT-W&{rVVIIy~1dk`?)4)mTd zaNQNE&ppbomR=M&Q4IAGyWygE1aYr%Zg^k+sR`Ov(igSz*|-E`!Hbm#KqejtPikX> zm}@9M@vR5TFWyfw*GWaUdilB#)nQ*(jLO(z z{wV(~sb0K0^lHbhj;$fO6G7I)Em>idJ0y1Q?A!`CkA4erC%(0-J)2iVj0!jse}fRk z=hP10KRh6>C1;L(7JLS=Jz>o5j}{4(x*xQ&2XL$%qz@plI1bZ8X1DlIfjPKF$`jiP zIB_FL!$zxD6hgI-Z_pz;Z66fcOIvIM&j=NYyX6_7)F@rb&)JVEFU)oGCoC7{a$xTs zhP6Isoua)aN9~3;mh!;X+d}psre;jFiC46LJmb#_0}*$K9o6`nca^ub2j(H`4J@)& zRRG61`{3^_)-yC}@Vn#HoSXl>zuJq`{`!ZZmqOK*4Rww<%ReL7)YY(V?f^vZ=YKwp zL64XR7<|lrq>=n1hD_5sr3y9N;ESIhKt8j|A>Pj*x2&f(jaPEmw4M;Iwx|i#LDh*pp{4LTx^oi3m#>StfiZ-spZP)3B!r zIM5E}fJdVCRkQdpI@z#{SUV@5B&SJ+Xg2O8Sz!uF8sf}e6lwcNt3p-H{0(~gYq~E^ zG1&JWSS6qAk2CANzBwu}YI=Hu4$-&dFwHvSmPXoFfP@hXQxe=2(bESZUZj+f_Fe5y zuB`#9$RUOm{3J6FY2&1=(h43Vu_qj+dcwQId)eJ|&&9AUus<4(^h8h6^cgE#1Tok7 zRx}^NnC8X6;q#eV^dk)?5F)g7Rw8RNhpg#A3qpjlWW$a1h7~82VW2ECglZ@S-0L-} zKGx==%4U9i4F!JZ{@eY^%}A{6~4Lnzv4Dg=s3oBJSRbv;dfgQh-%CNEgs z&(c)XycKqPYY6ph4`*q@KvVvwG@ZUiQx&1<2B1mKRHGvrMQC~@OB11u&_yBpU_)I| zoZ^@oi2gnE{Z5$<=29Z?_DdPWx!^2F#?g{}aDUV-iPPF_A0pWU8p(sGieYn*;4hbP zeF<=eU*?uQkV7ou)#?E(Qo|G8{r|Rx>i`C#UQ~4%1&?maTbIbH}~j zD=G!!Sz|m|anWbS@0=q3i135(f(Gwc%{KT;fdVX%yk_1x~rUR>Wh5ZN8+aEswK2EIf+5%0XxDux133{z&G$62zUVUL3UMMi;*E0OY-A@MO|oNj>@mhm+6QTb}bNiys3Pe*xQK{R|dufW-*(z6?8))u9=kBK#w=1aTH*6LO@68!or{ zK%NSb2Qr;RV&9)E;<(7&(qf1t?+$?&M-~Q0!**^S;CZ<<+>u`oXXG|Qi&?$jLZ7)i zc42_zwg|?t$w-Gc9s>W3eEAB4T08VK4+mjiob{biiF^-DI=DY*-nhr>1WobEkaB}1 zggSeiU*eQ)GG9#_)~7%pHF!{wc?(9f-Zw!{{_*H^=5=GVHS;?XWBdhW2;FzXzVqOp zOc2dtU$J4e3;iWRk>VB#6&7Ud)HFSeC(;IIQsX|wI*FKS`Q9OlD}lJlE#!dovYp`h zA(z5!Pig#oTTcC%I^ia;mxSgoPTGyPeKG=aU86+y>b}x*h|}4(JJ{U_?hqat*k&Rl z-@O{y%kJ%U6N%5-3r;GLSnpcH6B6(BYYlPXy>EBy8_3Q)Iq=XxuW3YE0vf8Pdow@N zmI!{OPFSMMN7>bjCmWV!*H^CkkLV2%-Yrz3Qe^K#=7$P&pX;2?al0YX1!J;cFY2BcAoH65_Y0+je+^5y zxL0C*h2W)2DW~WQsaj<3#Td8S9qa+0HyVT764z^b3(5+63dCKmB&h8r&~v0DFUU0mJzs^o+rcr(oCsvnVn-ggDV?5mWr#}{@9wR`T))JT0iZrd=`0NtOLeUYy(&!q3z99 z8`scL0+@phnUaUngJvlQ`J{$rILUA(R2v5kU4E%Gw>tmqRC+`=Z?ciUE#^%O62Qh3 zUG;$0CfIh#0o-U0wM(}I7>I1zLvYVKfqS=^R-f@nD?q1xp+n#wX{A)!CJ|1WK(h;# zSnMe=5_4Z?q0OJQ(7+#vzPWG*(SBl?Nd&XKBOG}`V5c=bIOUI5LxgdYA+1*?(?blZ z0NjpEuv|aLPZ`vQCJ=W7_c6gLD?P_@`#!AhvUb!O!~d?-+*kAn26{QtGIYfeF?Nz$nGfN071wsfiurbrgLOa~C`FNN3 zCFSjQRO0~)M=Lhq;T9p-;6Z)u4ND*=tdQGQO4=xqGYs}Istq_&Kq5)gQ;>tt>Twh- z4DNM-9@x;^nw`y!bJF?Nnwrb1yZKQeWvu@n;{=P}Tzwb5ldm(Jx5gd8;emO=A63ut z+-fVx!ugCj-(vik^N9>(t?us0*EN<)Lw508>uKx<*@NaGr1xtUvJ@Rg-_zOt4xBtTxxs&H}K*TOo@0 zR7O7&L4bU|raFU9LH0%gddnd*QUSjhA=pH&{u^Yt%7qu9XJ8suB*sc5aqNFA90|`v zAsj>_-0r6}3h?-W`r~UnO2pvTC!jOn)!3fMv1>JuX1ITX%Rq<;8#iW14v^q5!@Dv>ac81lZ!Xc4%(1#4nv6$BP{bTWcqfmRCrY_1_gMLKRXjbql}5fMOM0A;c8R41ojrw**TC z9p)b6P6Qcu4B0c=9N?lOV3`lM0RcB;54VP%k$OUz^f#`On}qXHosgrJL@!E@2?dI~ z74FD~GU+W>`O+njtLCH&aPx3Up_$8Y@8S-92>SN=>j-`6O9&k~0=gzq0f>!_HA!0@ zgkLelsMs?i6K(K^wAy`p)f0LFZ*r+||rC;2F$35dF$ zpMOt>(D`?CnEweK*VZ@&YXBWYsEtM5`pg=HdFjDhSQvnfP3zmVii}fR`Ei+O0lhtd2*fXmWpCei$&q0m@{!7bKj9y#BGUP1+SE5|InpW6CJ8YKzhl5QkRd z%tP)|;lK)J)sXWUIhe`8aFBz!`MMm;N+So8Nz>yLoMjlvQx10wI*SN*n(CbL+NF6b zoNzKr%BN)Kj!ip~J5KFz-o?p2$d+X9L@;D_)2Yhehcv;|O zX*t9f<=Oe+(8jE9up0JPJsBZ>Zn+OPDuRCrPwbX74xs@{60iouK!uyyB*&V#}mx_1#PBqyx)?OYUks7)i;5!zBT)i{d@TnkM_ail^ic0<6%xsF$EQr}1}>yE#+MY{iL0zjY+PrdY8< z7L$f-3?8A;Es)K?^_wepHNYv7f!}5-aB@eh{NQ2WM4f`N!T$kxZ);8MHCWY(9f`FB zR*eD6j)Yn~Vn?^s((-Hxv&85BOrH~Y9Q3L)xa-A!SkEXqQ$w2X|YWam*$M*!`r@iTosn0kz1A`h6CMQwRA$`myT%- zORpIC6%|1oJnffOL&}Pr`3a6c1sqAdLa{5CR)GxL5neP;DF+)utmCY8o1|`O0M^f6 zU(c_H?76()Ww8+6n?eJCzZK3_Cg9_fpkXet8?ZwRSQ&^p3_Z~c^(-$;XeFG_D54*} zo0RgV6Ou!s`8^UXmNwCR6TIE{SwF;B!&dc^awL^%ZcnS!zzynhLfBhq4$aoqa5IK|s51C7tk_%5O1$Pnf lka6U@aTibCpZ|T%fzLVcIR`%Hz~>zJoCBY8;Qs>-{BPGB7&!m{ From de8b6d1305ff8cc4f35a89ba279a3f3957973e4c Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Thu, 20 Feb 2025 16:25:56 +0100 Subject: [PATCH 14/16] Move bootloader code to Modulino --- examples/change_address.py | 3 +- src/modulino/firmware_flasher.py | 127 +++++++++++++------------------ src/modulino/modulino.py | 26 ++++++- 3 files changed, 81 insertions(+), 75 deletions(-) diff --git a/examples/change_address.py b/examples/change_address.py index 2a0c68b..ec49216 100644 --- a/examples/change_address.py +++ b/examples/change_address.py @@ -12,7 +12,8 @@ from modulino import Modulino print() -devices = Modulino.available_devices() +bus = None # Change this to the I2C bus you are using on 3rd party host boards +devices = Modulino.available_devices(bus) if len(devices) == 0: print("No devices found on the bus. Try resetting the board.") diff --git a/src/modulino/firmware_flasher.py b/src/modulino/firmware_flasher.py index 18bd5de..6b45cc1 100644 --- a/src/modulino/firmware_flasher.py +++ b/src/modulino/firmware_flasher.py @@ -13,9 +13,10 @@ import os import sys -from machine import I2C, Pin import time from micropython import const +from machine import I2C +from modulino import Modulino BOOTLOADER_I2C_ADDRESS = const(0x64) ACK = const(0x79) @@ -31,55 +32,29 @@ CHUNK_SIZE = const(128) # Size of the memory chunk to write -# Define I2C pins and initialize I2C -i2c = I2C(0, freq=100000) +bus = None # Change this to the I2C bus you are using on 3rd party host boards -def send_reset(address): - """ - Send a reset command to the I2C device at the given address. - - :param address: I2C address of the device. - :return: 0 if the reset command was sent successfully, otherwise -1. - """ - buffer = b'DIE' - buffer += b'\x00' * (8 - len(buffer)) # Pad buffer to 8 bytes - - try: - print(f"🔄 Resetting device at address {hex(address)}") - i2c.writeto(address, buffer, True) - print("📤 Reset command sent") - time.sleep(0.25) # Wait for the device to reset - return True - except OSError as e: - # ENODEV can be thrown if either the device reset while writing out the buffer or if the device - # was already in bootloader mode in which case there is no device at the original address - if e.errno == 19: - time.sleep(0.25) # Wait for the device to reset - return True - else: - print(f"Error sending reset command: {e}") - return False - -def wait_for_ack(): +def wait_for_ack(bus): """ Wait for an acknowledgment from the I2C device. :return: True if an acknowledgment was received, otherwise False. """ - res = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, 1)[0] + res = bus.readfrom(BOOTLOADER_I2C_ADDRESS, 1)[0] if res != ACK: while res == BUSY: time.sleep(0.1) - res = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, 1)[0] + res = bus.readfrom(BOOTLOADER_I2C_ADDRESS, 1)[0] if res != ACK: print(f"❌ Error processing command. Result code: {hex(res)}") return False return True -def execute_command(opcode, command_params, response_length = 0, verbose=False): +def execute_command(bus, opcode, command_params, response_length = 0, verbose=False): """ Execute an I2C command on the device. + :param bus: The I2C bus to use. :param opcode: The command opcode. :param command_params: The buffer containing the command parameters. :param response_length: The expected length of the response data frame. @@ -90,44 +65,45 @@ def execute_command(opcode, command_params, response_length = 0, verbose=False): print(f"đŸ•ĩī¸ Executing command {hex(opcode)}") cmd = bytes([opcode, 0xFF ^ opcode]) # Send command code and complement (XOR = 0x00) - i2c.writeto(BOOTLOADER_I2C_ADDRESS, cmd, True) - if not wait_for_ack(): + bus.writeto(BOOTLOADER_I2C_ADDRESS, cmd, True) + if not wait_for_ack(bus): print(f"❌ Command not acknowledged: {hex(opcode)}") return None if command_params is not None: - i2c.writeto(BOOTLOADER_I2C_ADDRESS, command_params, True) - if not wait_for_ack(): + bus.writeto(BOOTLOADER_I2C_ADDRESS, command_params, True) + if not wait_for_ack(bus): print("❌ Command failed") return None if response_length == 0: return None - data = i2c.readfrom(BOOTLOADER_I2C_ADDRESS, response_length) + data = bus.readfrom(BOOTLOADER_I2C_ADDRESS, response_length) - if not wait_for_ack(): + if not wait_for_ack(bus): print("❌ Failed completing command") return None return data -def flash_firmware(firmware_path, verbose=False): +def flash_firmware(device : Modulino, firmware_path, verbose=False): """ Flash the firmware to the I2C device. - :param firmware: The binary firmware data. - :param length: The length of the firmware data. + :param device: The Modulino device to flash. + :param firmware_path: The binary firmware path. :param verbose: Whether to print debug information. :return: True if the flashing was successful, otherwise False. """ - data = execute_command(CMD_GET_VERSION, None, 1, verbose) + bus = device.i2c_bus + data = execute_command(bus, CMD_GET_VERSION, None, 1, verbose) if data is None: print("❌ Failed to get protocol version") return False print(f"â„šī¸ Protocol version: {data[0] & 0xF}.{data[0] >> 4}") - data = execute_command(CMD_GET, None, CMD_GET_LENGTH_V12, verbose) + data = execute_command(bus, CMD_GET, None, CMD_GET_LENGTH_V12, verbose) if data is None: print("❌ Failed to get command list") return False @@ -136,7 +112,7 @@ def flash_firmware(firmware_path, verbose=False): print("👀 Supported commands:") print(", ".join([hex(byte) for byte in data[2:]])) - data = execute_command(CMD_GET_ID, None, 3, verbose) + data = execute_command(bus, CMD_GET_ID, None, 3, verbose) if data is None: print("❌ Failed to get device ID") return False @@ -146,7 +122,7 @@ def flash_firmware(firmware_path, verbose=False): print("đŸ—‘ī¸ Erasing memory...") erase_params = bytearray([0xFF, 0xFF, 0x0]) # Mass erase flash - execute_command(CMD_ERASE_NO_STRETCH, erase_params, 0, verbose) + execute_command(bus, CMD_ERASE_NO_STRETCH, erase_params, 0, verbose) with open(firmware_path, 'rb') as file: firmware_data = file.read() @@ -161,7 +137,7 @@ def flash_firmware(firmware_path, verbose=False): checksum ^= b start_address.append(checksum) data_slice = firmware_data[i:i + CHUNK_SIZE] - if not write_firmware_page(start_address, data_slice): + if not write_firmware_page(bus, start_address, data_slice): print(f"❌ Failed to write page {hex(i)}") return False time.sleep(0.01) # Give the device some time to process the data @@ -170,39 +146,40 @@ def flash_firmware(firmware_path, verbose=False): print("🏃 Starting firmware") go_params = bytearray([0x8, 0x00, 0x00, 0x00, 0x8]) - execute_command(CMD_GO, go_params, 0, verbose) # Jump to the application + execute_command(bus, CMD_GO, go_params, 0, verbose) # Jump to the application return True -def write_firmware_page(command_params, firmware_data): +def write_firmware_page(bus, command_params, firmware_data): """ Write a page of the firmware to the I2C device. + :param bus: The I2C bus to use. :param command_params: The buffer containing the command parameters. :param firmware_data: The buffer containing the firmware data. :return: True if the page was written successfully, otherwise False. """ cmd = bytes([CMD_WRITE_NO_STRETCH, 0xFF ^ CMD_WRITE_NO_STRETCH]) - i2c.writeto(BOOTLOADER_I2C_ADDRESS, cmd) - if not wait_for_ack(): + bus.writeto(BOOTLOADER_I2C_ADDRESS, cmd) + if not wait_for_ack(bus): print("❌ Write command not acknowledged") return False - i2c.writeto(BOOTLOADER_I2C_ADDRESS, command_params) - if not wait_for_ack(): + bus.writeto(BOOTLOADER_I2C_ADDRESS, command_params) + if not wait_for_ack(bus): print("❌ Failed to write command parameters") return False data_size = len(firmware_data) tmp_buffer = bytearray(data_size + 2) # Data plus size and checksum - tmp_buffer[0] = data_size - 1 # Size of the data # TODO Arduino code uses data_size - 1 + tmp_buffer[0] = data_size - 1 # Size of the data tmp_buffer[1:data_size + 1] = firmware_data tmp_buffer[-1] = 0 # Checksum placeholder for i in range(data_size + 1): # Calculate checksum over size byte + data bytes tmp_buffer[-1] ^= tmp_buffer[i] - i2c.writeto(BOOTLOADER_I2C_ADDRESS, tmp_buffer) - if not wait_for_ack(): + bus.writeto(BOOTLOADER_I2C_ADDRESS, tmp_buffer) + if not wait_for_ack(bus): print("❌ Failed to write firmware") return False @@ -243,7 +220,7 @@ def select_file(bin_files): return None if len(bin_files) == 1: - confirm = input(f"📄 Found one biary file: {bin_files[0]}. Do you want to flash it? (yes/no) ") + confirm = input(f"📄 Found one binary file: {bin_files[0]}. Do you want to flash it? (yes/no) ") if confirm.lower() == 'yes': return bin_files[0] else: @@ -257,37 +234,41 @@ def select_file(bin_files): return None return bin_files[choice - 1] -def select_i2c_device(): +def select_device(bus : I2C) -> Modulino: """ Scan the I2C bus for devices and prompt the user to select one. - :return: The selected I2C device address. + :param bus: The I2C bus to scan. + :return: The selected Modulino device. """ - devices = i2c.scan() + devices = Modulino.available_devices(bus) if len(devices) == 0: - print("❌ No I2C devices found") + print("❌ No devices found") return None if len(devices) == 1: - confirm = input(f"🔌 Found one I2C device at address {hex(devices[0])}. Do you want to flash it? (yes/no) ") + device = devices[0] + confirm = input(f"🔌 Found {device.device_type} at address {hex(device.address)}. Do you want to update this device? (yes/no) ") if confirm.lower() == 'yes': return devices[0] else: return None - print("🔌 I2C devices found:") + print("🔌 Devices found:") for index, device in enumerate(devices): - print(f"{index + 1}. Address: {hex(device)}") - choice = int(input("Select the I2C device to flash (number): ")) + print(f"{index + 1}) {device.device_type} at {hex(device.address)}") + choice = int(input("Select the device to flash (number): ")) if choice < 1 or choice > len(devices): return None return devices[choice - 1] -def run(): +def run(bus: I2C): """ Initialize the flashing process. Finds .bin files, scans for I2C devices, and flashes the selected firmware. + + :param bus: The I2C bus to use. If None, the default I2C bus will be used. """ bin_files = find_bin_files() @@ -300,12 +281,13 @@ def run(): print("❌ No file selected") return - device_address = select_i2c_device() - if device_address is None: + device = select_device(bus) + if device is None: print("❌ No device selected") return - - if send_reset(device_address): + + print(f"🔄 Resetting device at address {hex(device.address)}") + if device.enter_bootloader(): print("✅ Device reset successfully") else: print("❌ Failed to reset device") @@ -313,10 +295,11 @@ def run(): print(f"đŸ•ĩī¸ Flashing {bin_file} to device at address {hex(BOOTLOADER_I2C_ADDRESS)}") - if flash_firmware(bin_file): + if flash_firmware(device, bin_file): print("✅ Firmware flashed successfully") else: print("❌ Failed to flash firmware") if __name__ == "__main__": - run() + print() + run(bus) diff --git a/src/modulino/modulino.py b/src/modulino/modulino.py index 60ee655..4a6f99e 100644 --- a/src/modulino/modulino.py +++ b/src/modulino/modulino.py @@ -250,6 +250,24 @@ def change_address(self, new_address: int): self.address = new_address + def enter_bootloader(self): + """ + Enters the I2C bootloader of the device. + This is only supported on Modulinos that have a microcontroller. + + Returns: + bool: True if the device entered bootloader mode, False otherwise. + """ + buffer = b'DIE' + buffer += b'\x00' * (8 - len(buffer)) # Pad buffer to 8 bytes + try: + self.i2c_bus.writeto(self.address, buffer, True) + sleep(0.25) # Wait for the device to reset + return True + except OSError as e: + # ENODEV (e.errno == 19) can be thrown if either the device reset while writing out the buffer + return False + def read(self, amount_of_bytes: int) -> bytes | None: """ Reads the given amount of bytes from the i2c device and returns the data. @@ -293,14 +311,18 @@ def has_default_address(self) -> bool: return self.address in self.default_addresses @staticmethod - def available_devices() -> list[Modulino]: + def available_devices(bus: I2C = None) -> list[Modulino]: """ Finds all devices on the i2c bus and returns them as a list of Modulino objects. + Parameters: + bus (I2C): The I2C bus to use. If not provided, the default I2C bus will be used. + Returns: list: A list of Modulino objects. """ - bus = _I2CHelper.get_interface() + if bus is None: + bus = _I2CHelper.get_interface() device_addresses = bus.scan() devices = [] for address in device_addresses: From 98b24381f1a3f44242864648fe850a5716879f1e Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Thu, 20 Feb 2025 16:26:55 +0100 Subject: [PATCH 15/16] Rename firmware updater --- src/modulino/firmware_flasher.py => examples/firmware_update.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/modulino/firmware_flasher.py => examples/firmware_update.py (100%) diff --git a/src/modulino/firmware_flasher.py b/examples/firmware_update.py similarity index 100% rename from src/modulino/firmware_flasher.py rename to examples/firmware_update.py From 20e361b5a8cf052e6e15c2d95377cd394f45b47b Mon Sep 17 00:00:00 2001 From: Sebastian Romero Date: Thu, 20 Feb 2025 16:36:51 +0100 Subject: [PATCH 16/16] Improve wording --- examples/firmware_update.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/firmware_update.py b/examples/firmware_update.py index 6b45cc1..8f4fddd 100644 --- a/examples/firmware_update.py +++ b/examples/firmware_update.py @@ -128,7 +128,7 @@ def flash_firmware(device : Modulino, firmware_path, verbose=False): firmware_data = file.read() total_bytes = len(firmware_data) - print(f"đŸ”Ĩ Flashing {total_bytes} bytes of firmware") + print(f"đŸ”Ĩ Writing {total_bytes} bytes") for i in range(0, total_bytes, CHUNK_SIZE): progress_bar(i, total_bytes) start_address = bytearray([8, 0, i // 256, i % 256]) # 4-byte address: byte 1 = MSB, byte 4 = LSB @@ -144,7 +144,7 @@ def flash_firmware(device : Modulino, firmware_path, verbose=False): progress_bar(total_bytes, total_bytes) # Complete the progress bar - print("🏃 Starting firmware") + print("🏃 Launching new firmware") go_params = bytearray([0x8, 0x00, 0x00, 0x00, 0x8]) execute_command(bus, CMD_GO, go_params, 0, verbose) # Jump to the application