|
| 1 | +# This file is part of the se05x package. |
| 2 | +# Copyright (c) 2024 Arduino SA |
| 3 | +# This Source Code Form is subject to the terms of the Mozilla Public |
| 4 | +# License, v. 2.0. If a copy of the MPL was not distributed with this |
| 5 | +# file, You can obtain one at https://mozilla.org/MPL/2.0/. |
| 6 | +# |
| 7 | +# An implementation of ISO7816-3/4 standards. |
| 8 | +import struct |
| 9 | +import logging |
| 10 | +from micropython import const |
| 11 | +from time import sleep |
| 12 | + |
| 13 | +# PCB bits |
| 14 | +_NAD_OFFSET = const(0) |
| 15 | +_PCB_OFFSET = const(1) |
| 16 | +_LEN_OFFSET = const(2) |
| 17 | +_INF_OFFSET = const(3) |
| 18 | + |
| 19 | +# Blocks |
| 20 | +_I_BLOCK = const(0x00) |
| 21 | +_I_BLOCK_N = const(6) |
| 22 | +_I_BLOCK_M = const(5) |
| 23 | + |
| 24 | +_S_BLOCK = const(0xC0) |
| 25 | +_S_BLOCK_REQ = const(0x3F) |
| 26 | + |
| 27 | +_R_BLOCK = const(0x80) |
| 28 | +_R_BLOCK_N = const(4) |
| 29 | +_R_BLOCK_ERR = const(0x03) |
| 30 | + |
| 31 | +# S-Block request/response |
| 32 | +_RESYNC_REQ = const(0x00) |
| 33 | +_IFS_REQ = const(0x01) |
| 34 | +_ABORT_REQ = const(0x02) |
| 35 | +_WTX_REQ = const(0x03) |
| 36 | +_END_SESSION_REQ = const(0x05) |
| 37 | +_CHIP_RESET_REQ = const(0x06) |
| 38 | +_GET_ATR_REQ = const(0x07) |
| 39 | +_SOFT_RESET_REQ = const(0x0F) |
| 40 | + |
| 41 | +_CLA_ISO7816 = const(0x00) |
| 42 | +_INS_GP_SELECT = const(0xA4) |
| 43 | + |
| 44 | +_STATE_IBLK = const(0) |
| 45 | +_STATE_SBLK = const(1) |
| 46 | +_STATE_SEND = const(2) |
| 47 | +_STATE_RECV = const(3) |
| 48 | +_STATE_WAIT = const(4) |
| 49 | +_STATE_WWTX = const(5) |
| 50 | +_STATE_DONE = const(6) |
| 51 | + |
| 52 | + |
| 53 | +def log_enabled(level): |
| 54 | + return logging.getLogger().isEnabledFor(level) |
| 55 | + |
| 56 | + |
| 57 | +class SmartCard: |
| 58 | + def __init__(self, bus, nad, aid): |
| 59 | + self.seq = 0 |
| 60 | + self.bus = bus |
| 61 | + self.sad = nad & 0xF0 |
| 62 | + self.dad = nad & 0x0F |
| 63 | + self.aid = aid |
| 64 | + self.atr = None |
| 65 | + |
| 66 | + # TODO: Optimize these |
| 67 | + self.w_buf = memoryview(bytearray(512)) |
| 68 | + self.r_buf = memoryview(bytearray(512)) |
| 69 | + self.s_buf = memoryview(bytearray(32)) |
| 70 | + self.apdu_buf = memoryview(bytearray(512)) |
| 71 | + |
| 72 | + self.block_name = { |
| 73 | + _I_BLOCK: "I-Block", |
| 74 | + _R_BLOCK: "R-Block", |
| 75 | + _S_BLOCK: "S-Block" |
| 76 | + } |
| 77 | + self.state_name = { |
| 78 | + _STATE_IBLK: "IBLK", |
| 79 | + _STATE_SBLK: "SBLK", |
| 80 | + _STATE_SEND: "SEND", |
| 81 | + _STATE_RECV: "RECV", |
| 82 | + _STATE_WAIT: "WAIT", |
| 83 | + _STATE_WWTX: "WWTX", |
| 84 | + _STATE_DONE: "DONE", |
| 85 | + } |
| 86 | + self.apdu_status = { |
| 87 | + 0x6700: "Wrong length", |
| 88 | + 0x6985: "Conditions not satisfied", |
| 89 | + 0x6982: "Security status not satisfied", |
| 90 | + 0x6A80: "Wrong data", |
| 91 | + 0x6984: "Data invalid", |
| 92 | + 0x6986: "Command not allowed", |
| 93 | + 0x6A82: "File not found", |
| 94 | + 0x6A84: "File full", |
| 95 | + 0x6D00: "Invalid or not supported instruction code.", |
| 96 | + } |
| 97 | + |
| 98 | + def _block_type(self, pcb): |
| 99 | + return _I_BLOCK if pcb & 0x80 == 0 else pcb & 0xC0 |
| 100 | + |
| 101 | + def _block_size(self, buf): |
| 102 | + # NAD, PCB, LEN, INF[LEN], CRC[2] |
| 103 | + return 3 + buf[_LEN_OFFSET] + 2 |
| 104 | + |
| 105 | + def _block_crc16(self, prologue, data, poly=0x8408, crc=0xFFFF): |
| 106 | + # Calculate prologue checksum |
| 107 | + for i in prologue: |
| 108 | + crc ^= i |
| 109 | + for bit in range(8): |
| 110 | + crc = (crc >> 1) ^ poly if crc & 0x1 else crc >> 1 |
| 111 | + |
| 112 | + # Calculate data checksum |
| 113 | + for i in data: |
| 114 | + crc ^= i |
| 115 | + for bit in range(8): |
| 116 | + crc = (crc >> 1) ^ poly if crc & 0x1 else crc >> 1 |
| 117 | + crc ^= 0xFFFF |
| 118 | + return ((crc & 0xFF) << 8) | ((crc >> 8) & 0xFF) |
| 119 | + |
| 120 | + def _block_write(self, buf, delay=0): |
| 121 | + size = self._block_size(buf) |
| 122 | + self.bus.write(buf[0:size]) |
| 123 | + |
| 124 | + def _block_print(self, txrx, *args): |
| 125 | + if len(args) == 1: |
| 126 | + buf = args[0] |
| 127 | + nad, pcb, bsize = buf[_NAD_OFFSET:_LEN_OFFSET + 1] |
| 128 | + crc = buf[_LEN_OFFSET + bsize + 1] << 8 | buf[_LEN_OFFSET + bsize + 2] |
| 129 | + else: |
| 130 | + nad, pcb, bsize, crc, buf = args |
| 131 | + btype = self._block_type(pcb) |
| 132 | + bname = self.block_name[btype] |
| 133 | + boffs = _INF_OFFSET if len(args) == 1 else 0 |
| 134 | + seq = (pcb >> _I_BLOCK_N) & 1 if btype == _I_BLOCK else (pcb >> _R_BLOCK_N) & 1 |
| 135 | + if log_enabled(logging.DEBUG): |
| 136 | + logging.debug( |
| 137 | + f"{'Tx' if txrx else 'Rx'}: {bname} NAD: 0x{nad:X} " |
| 138 | + f"PCB: 0x{pcb:X} LEN: {bsize} SEQ: {seq} CRC: 0x{crc:X}" |
| 139 | + ) |
| 140 | + buf_hex = "".join(f"{b:02X}" for b in buf[boffs:boffs + bsize]) |
| 141 | + logging.debug(f"RAW: {nad:02X}{pcb:02X}{bsize:02X}{buf_hex}{crc:04X}" ) |
| 142 | + |
| 143 | + def _block_new(self, buf, btype, **kwargs): |
| 144 | + data = kwargs.get("data", None) |
| 145 | + bsize = 0 if data is None else len(data) |
| 146 | + buf[_NAD_OFFSET] = self.sad | self.dad |
| 147 | + buf[_PCB_OFFSET] = btype |
| 148 | + buf[_LEN_OFFSET] = bsize |
| 149 | + if btype == _S_BLOCK: |
| 150 | + buf[_PCB_OFFSET] |= kwargs["request"] |
| 151 | + elif btype == _I_BLOCK: |
| 152 | + buf[_PCB_OFFSET] |= self.seq << _I_BLOCK_N |
| 153 | + buf[_PCB_OFFSET] |= kwargs["chained"] << _I_BLOCK_M |
| 154 | + elif btype == _R_BLOCK: |
| 155 | + buf[_PCB_OFFSET] |= self.seq << _R_BLOCK_N |
| 156 | + buf[_PCB_OFFSET] |= kwargs["error"] & _R_BLOCK_ERR |
| 157 | + if bsize: |
| 158 | + buf[_INF_OFFSET:_INF_OFFSET + bsize] = data |
| 159 | + # Calculate and set CRC |
| 160 | + crc = self._block_crc16(buf[0:_INF_OFFSET], buf[_INF_OFFSET:_INF_OFFSET + bsize]) |
| 161 | + buf[_LEN_OFFSET + bsize + 1] = (crc >> 8) & 0xFF |
| 162 | + buf[_LEN_OFFSET + bsize + 2] = (crc >> 0) & 0xFF |
| 163 | + # Toggle I-Block sequence |
| 164 | + if btype == _I_BLOCK: |
| 165 | + self.seq = self.seq ^ 1 |
| 166 | + return buf |
| 167 | + |
| 168 | + def _send_block(self, btype, arg, retry=25, backoff=1.2): |
| 169 | + r_offs = 0 |
| 170 | + w_offs = 0 |
| 171 | + retry_delay = 1 / 1000 |
| 172 | + next_state = _STATE_SBLK if btype == _S_BLOCK else _STATE_IBLK |
| 173 | + |
| 174 | + while retry: |
| 175 | + if log_enabled(logging.DEBUG): |
| 176 | + logging.debug(f"STATE: {self.state_name[next_state]} retry: {retry}") |
| 177 | + if next_state == _STATE_SBLK: |
| 178 | + next_state = _STATE_SEND |
| 179 | + prev_state = _STATE_RECV |
| 180 | + block = self._block_new(self.w_buf, _S_BLOCK, request=arg) |
| 181 | + elif next_state == _STATE_IBLK: |
| 182 | + next_state = _STATE_SEND |
| 183 | + prev_state = _STATE_RECV |
| 184 | + remain = len(arg) - w_offs |
| 185 | + bsize = min(remain, self.atr["IFSC"]) |
| 186 | + chained = int(remain > self.atr["IFSC"]) |
| 187 | + block = self._block_new(self.w_buf, _I_BLOCK, chained=chained, data=arg[w_offs:w_offs + bsize]) |
| 188 | + w_offs += bsize |
| 189 | + elif next_state == _STATE_SEND: |
| 190 | + try: |
| 191 | + self._block_write(block) |
| 192 | + next_state = prev_state |
| 193 | + if log_enabled(logging.DEBUG): |
| 194 | + self._block_print(True, block) |
| 195 | + except Exception: |
| 196 | + retry -= 1 |
| 197 | + elif next_state == _STATE_RECV: |
| 198 | + try: |
| 199 | + # Read NAD, PCB, LEN, information (if present) and CRC. |
| 200 | + nad, pcb, bsize = self.bus.read(self.s_buf[0:3]) |
| 201 | + if bsize: |
| 202 | + self.bus.read(self.r_buf[r_offs:r_offs + bsize]) |
| 203 | + crc = int.from_bytes(self.bus.read(self.s_buf[3:5]), "big") |
| 204 | + except Exception: |
| 205 | + retry -= 1 |
| 206 | + next_state = _STATE_WAIT |
| 207 | + prev_state = _STATE_RECV |
| 208 | + continue |
| 209 | + |
| 210 | + # Check NAD and CRC. |
| 211 | + if (nad != (self.sad >> 4) | (self.dad << 4) or |
| 212 | + crc != self._block_crc16(self.s_buf[0:3], self.r_buf[r_offs:r_offs + bsize])): |
| 213 | + retry -= 1 |
| 214 | + next_state = _STATE_SEND |
| 215 | + prev_state = _STATE_RECV |
| 216 | + block = self._block_new(self.s_buf, _R_BLOCK, error=1) |
| 217 | + continue |
| 218 | + |
| 219 | + if log_enabled(logging.DEBUG): |
| 220 | + self._block_print(False, nad, pcb, bsize, crc, self.r_buf[r_offs:r_offs + bsize]) |
| 221 | + |
| 222 | + # Process block. |
| 223 | + btype = self._block_type(pcb) |
| 224 | + if btype == _R_BLOCK: |
| 225 | + # Retransmit last block if error, or continue block chain. |
| 226 | + next_state = _STATE_SEND if pcb & _R_BLOCK_ERR else _STATE_IBLK |
| 227 | + prev_state = _STATE_RECV |
| 228 | + continue |
| 229 | + |
| 230 | + if btype == _I_BLOCK: |
| 231 | + # Acknowledge I-Block in block chain with R(N(R)). |
| 232 | + if pcb & (1 << _I_BLOCK_M): |
| 233 | + next_state = _STATE_SEND |
| 234 | + prev_state = _STATE_RECV |
| 235 | + block = self._block_new(self.s_buf, _R_BLOCK, error=0) |
| 236 | + else: |
| 237 | + next_state = _STATE_DONE |
| 238 | + # Add current I-Block INF size (could be 0). |
| 239 | + r_offs += bsize |
| 240 | + continue |
| 241 | + |
| 242 | + if btype == _S_BLOCK: |
| 243 | + if pcb & _S_BLOCK_REQ == _RESYNC_REQ: |
| 244 | + # Respond to a resync request. |
| 245 | + self.seq = 0 |
| 246 | + next_state = _STATE_SEND |
| 247 | + prev_state = _STATE_RECV |
| 248 | + block = self._block_new(self.s_buf, _S_BLOCK, request=_RESYNC_REQ & 0x20) |
| 249 | + elif pcb & _S_BLOCK_REQ == _WTX_REQ: |
| 250 | + # Respond to a WTX request. |
| 251 | + next_state = _STATE_SEND |
| 252 | + prev_state = _STATE_WWTX |
| 253 | + wtx_delay = self.r_buf[r_offs] * self.atr["BWT"] / 1000 |
| 254 | + block = self._block_new(self.s_buf, _S_BLOCK, request=_WTX_REQ & 0x20) |
| 255 | + else: |
| 256 | + # Add current S-Block INF size (could be 0). |
| 257 | + r_offs += bsize |
| 258 | + next_state = _STATE_DONE |
| 259 | + continue |
| 260 | + elif next_state == _STATE_WWTX: |
| 261 | + sleep(wtx_delay) |
| 262 | + next_state = _STATE_RECV |
| 263 | + elif next_state == _STATE_WAIT: |
| 264 | + sleep(retry_delay) |
| 265 | + retry_delay *= backoff |
| 266 | + next_state = prev_state |
| 267 | + elif next_state == _STATE_DONE: |
| 268 | + return self.r_buf[0:r_offs] |
| 269 | + |
| 270 | + if retry == 0: |
| 271 | + raise RuntimeError("_send_block failed") |
| 272 | + |
| 273 | + def send_apdu(self, cla, ins, p1, p2, data=None, le=0): |
| 274 | + size = 4 |
| 275 | + self.apdu_buf[0] = cla |
| 276 | + self.apdu_buf[1] = ins |
| 277 | + self.apdu_buf[2] = p1 |
| 278 | + self.apdu_buf[3] = p2 |
| 279 | + if data is not None: |
| 280 | + size = len(data) |
| 281 | + self.apdu_buf[4] = size |
| 282 | + self.apdu_buf[5:5 + size] = data |
| 283 | + self.apdu_buf[5 + size] = 0x00 |
| 284 | + size += 5 |
| 285 | + |
| 286 | + # Send APDU in I-Block |
| 287 | + resp = self._send_block(_I_BLOCK, self.apdu_buf[0:size]) |
| 288 | + |
| 289 | + # Check response TPDU status |
| 290 | + status = int.from_bytes(resp[-2:], "big") |
| 291 | + if status != 0x9000: |
| 292 | + raise RuntimeError("APDU Error: " + self.apdu_status.get(status, f"Unknown 0x{status:X}")) |
| 293 | + |
| 294 | + # Return data bytes, if any, or the status. |
| 295 | + if len(resp) == 2: |
| 296 | + return status |
| 297 | + return resp[2 + (0 if resp[1] <= 0x7F else resp[1] & 0x0F):-2] |
| 298 | + |
| 299 | + def reset(self): |
| 300 | + self.seq = 0 |
| 301 | + atr_raw = self._send_block(_S_BLOCK, _SOFT_RESET_REQ) |
| 302 | + if self.atr is None: |
| 303 | + self.atr = self._parse_atrs(atr_raw) |
| 304 | + if log_enabled(logging.INFO): |
| 305 | + self._dump_atrs(self.atr) |
| 306 | + # Select applet |
| 307 | + self.send_apdu(_CLA_ISO7816, _INS_GP_SELECT, 0x04, 0x00, self.aid, le=True) |
| 308 | + |
| 309 | + def resync(self): |
| 310 | + self._send_block(_S_BLOCK, _RESYNC_REQ) |
| 311 | + self.seq = 0 |
| 312 | + |
| 313 | + def _parse_atrs(self, atr_bytes): |
| 314 | + atr = {} |
| 315 | + # PVER - 1 byte |
| 316 | + atr["PVER"] = atr_bytes[0] |
| 317 | + # VID - 5 bytes |
| 318 | + atr["VID"] = atr_bytes[1:6].hex().upper() |
| 319 | + |
| 320 | + # Length of DLLP - 1 byte |
| 321 | + dllp_length = atr_bytes[6] |
| 322 | + atr["DLLP_LENGTH"] = dllp_length |
| 323 | + |
| 324 | + # DLLP - Variable length (Decode using struct) |
| 325 | + dllp = atr_bytes[7:7 + dllp_length] |
| 326 | + atr["DLLP"] = dllp.hex().upper() |
| 327 | + if dllp_length >= 4: |
| 328 | + atr["BWT"], atr["IFSC"] = struct.unpack(">HH", dllp[:4]) |
| 329 | + |
| 330 | + # PLID - 1 byte |
| 331 | + atr["PLID"] = atr_bytes[7 + dllp_length] |
| 332 | + # Length of PLP - 1 byte |
| 333 | + plp_length = atr_bytes[8 + dllp_length] |
| 334 | + atr["PLP_LENGTH"] = plp_length |
| 335 | + |
| 336 | + # PLP - Variable length (Decode using struct) |
| 337 | + plp = atr_bytes[9 + dllp_length:9 + dllp_length + plp_length] |
| 338 | + atr["PLP"] = plp.hex().upper() |
| 339 | + if plp_length >= 2: |
| 340 | + atr["MCF"] = struct.unpack(">H", plp[:2])[0] |
| 341 | + if plp_length >= 3: |
| 342 | + atr["CONFIGURATION"] = plp[2] |
| 343 | + if plp_length >= 4: |
| 344 | + atr["MPOT"] = plp[3] |
| 345 | + if plp_length >= 6: |
| 346 | + atr["SEGT"], atr["WUT"] = struct.unpack(">HH", plp[4:8]) |
| 347 | + |
| 348 | + # Length of HB - 1 byte |
| 349 | + hb_length = atr_bytes[9 + dllp_length + plp_length] |
| 350 | + atr["HB_LENGTH"] = hb_length |
| 351 | + # HB - Variable length |
| 352 | + hb = atr_bytes[10 + dllp_length + plp_length:10 + dllp_length + plp_length + hb_length] |
| 353 | + atr["HB"] = hb.hex().upper() |
| 354 | + return atr |
| 355 | + |
| 356 | + def _dump_atrs(self, atr): |
| 357 | + logging.info(f"PVER (Protocol Version): {atr['PVER']}") |
| 358 | + logging.info(f"VID (Vendor ID): {atr['VID']}") |
| 359 | + logging.info(f"Length of DLLP: {atr['DLLP_LENGTH']}") |
| 360 | + |
| 361 | + if "DLLP" in atr: |
| 362 | + logging.info(f"DLLP: {atr['DLLP']}") |
| 363 | + |
| 364 | + if "BWT" in atr and "IFSC" in atr: |
| 365 | + logging.info(f"BWT (Block Waiting Time): {atr['BWT']} ms") |
| 366 | + logging.info(f"IFSC (Maximum Information Field Size): {atr['IFSC']} bytes") |
| 367 | + |
| 368 | + logging.info(f"PLID (Physical Layer ID): {atr['PLID']}") |
| 369 | + logging.info(f"Length of PLP: {atr['PLP_LENGTH']}") |
| 370 | + |
| 371 | + if "PLP" in atr: |
| 372 | + logging.info(f"PLP: {atr['PLP']}") |
| 373 | + |
| 374 | + if "MCF" in atr: |
| 375 | + logging.info(f"MCF (Max I2C Clock Frequency): {atr['MCF']} kHz") |
| 376 | + |
| 377 | + if "CONFIGURATION" in atr: |
| 378 | + logging.info(f"Configuration: {atr['CONFIGURATION']:#04x}") |
| 379 | + |
| 380 | + if "MPOT" in atr: |
| 381 | + logging.info(f"MPOT (Minimum Polling Time): {atr['MPOT']} ms") |
| 382 | + |
| 383 | + if "SEGT" in atr and "WUT" in atr: |
| 384 | + logging.info(f"SEGT (Secure Element Guard Time): {atr['SEGT']} µs") |
| 385 | + logging.info(f"WUT (Wake-Up Time): {atr['WUT']} µs") |
| 386 | + |
| 387 | + logging.info(f"Length of HB (Historical Bytes): {atr['HB_LENGTH']}") |
| 388 | + if "HB" in atr: |
| 389 | + logging.info(f"HB (Historical Bytes): {atr['HB']}") |
0 commit comments