From 3298226c287eac91137923c39ef64a046ae6101c Mon Sep 17 00:00:00 2001 From: TamtamHero <10632523+TamtamHero@users.noreply.github.com> Date: Mon, 9 Dec 2019 14:41:19 +0100 Subject: [PATCH] Add GET RESPONSE support on USB HID --- ledgerblue/comm.py | 111 ++++++++++++++++++++++++++++++++------------- 1 file changed, 79 insertions(+), 32 deletions(-) diff --git a/ledgerblue/comm.py b/ledgerblue/comm.py index 4843d34..100bdbf 100644 --- a/ledgerblue/comm.py +++ b/ledgerblue/comm.py @@ -29,6 +29,7 @@ from .commHTTP import getDongle as getDongleHTTP from .commTCP import getDongle as getDongleTCP import hid +import struct APDUGEN=None if "APDUGEN" in os.environ and len(os.environ["APDUGEN"]) != 0: @@ -63,20 +64,13 @@ class HIDDongleHIDAPI(Dongle, DongleWait): def __init__(self, device, ledger=False, debug=False): self.device = device - self.ledger = ledger + self.legacy_exchange = ledger self.debug = debug self.waitImpl = self self.opened = True - def exchange(self, apdu, timeout=TIMEOUT): - if APDUGEN: - print("%s" % hexstr(apdu)) - return - - if self.debug: - print("HID => %s" % hexstr(apdu)) - if self.ledger: - apdu = wrapCommandAPDU(0x0101, apdu, 64) + def _exchange_legacy(self, apdu, timeout=TIMEOUT): + padSize = len(apdu) % 64 tmp = apdu if padSize != 0: @@ -91,26 +85,50 @@ def exchange(self, apdu, timeout=TIMEOUT): dataLength = 0 dataStart = 2 result = self.waitImpl.waitFirstResponse(timeout) - if not self.ledger: - if result[0] == 0x61: # 61xx : data available - self.device.set_nonblocking(False) - dataLength = result[1] - dataLength += 2 - if dataLength > 62: - remaining = dataLength - 62 - while(remaining != 0): - if remaining > 64: - blockLength = 64 - else: - blockLength = remaining - result.extend(bytearray(self.device.read(65))[0:blockLength]) - remaining -= blockLength - swOffset = dataLength - dataLength -= 2 - self.device.set_nonblocking(True) - else: - swOffset = 0 + + if result[0] == 0x61: # 61xx : data available + self.device.set_nonblocking(False) + dataLength = result[1] + dataLength += 2 + if dataLength > 62: + remaining = dataLength - 62 + while(remaining != 0): + if remaining > 64: + blockLength = 64 + else: + blockLength = remaining + result.extend(bytearray(self.device.read(65))[0:blockLength]) + remaining -= blockLength + swOffset = dataLength + dataLength -= 2 + self.device.set_nonblocking(True) else: + swOffset = 0 + + sw = (result[swOffset] << 8) + result[swOffset + 1] + response = result[dataStart : dataLength + dataStart] + + return (sw, response) + + + def _exchange(self, apdu, timeout=TIMEOUT): + + def send_apdu(apdu): + apdu = wrapCommandAPDU(0x0101, apdu, 64) + padSize = len(apdu) % 64 + tmp = apdu + if padSize != 0: + tmp.extend([0] * (64 - padSize)) + offset = 0 + while(offset != len(tmp)): + data = tmp[offset:offset + 64] + data = bytearray([0]) + data + if self.device.write(data) < 0: + raise BaseException("Error while writing") + offset += 64 + + def get_data(timeout): + result = self.waitImpl.waitFirstResponse(timeout) self.device.set_nonblocking(False) while True: response = unwrapResponseAPDU(0x0101, result, 64) @@ -122,11 +140,40 @@ def exchange(self, apdu, timeout=TIMEOUT): self.device.set_nonblocking(True) break result.extend(bytearray(self.device.read(65))) - sw = (result[swOffset] << 8) + result[swOffset + 1] - response = result[dataStart : dataLength + dataStart] + sw = struct.unpack(">H", result[-2:])[0] + return (sw, result[:-2]) + + + send_apdu(apdu) + (sw, response) = get_data(timeout) + + # handle the get response case: + # When more data is available, the chip sends 0x61XX + # So 0x61xx as a SW must not be interpreted as an error + while (sw & 0xFF00) == 0x6100: + send_apdu(bytes.fromhex("00c0000000")) # GET RESPONSE + (sw, data) = get_data(timeout) + response += data + + return (sw, response) + + def exchange(self, apdu, timeout=TIMEOUT): + if APDUGEN: + print("%s" % hexstr(apdu)) + return + if self.debug: + print("HID => %s" % hexstr(apdu)) + + + if self.legacy_exchange: + (sw, response) = self._exchange(apdu, timeout) + else: + (sw, response) = self._exchange_legacy(apdu, timeout) + + if self.debug: print("HID <= %s%.2x" % (hexstr(response), sw)) - if sw != 0x9000 and (sw & 0xFF00) != 0x6100 and (sw & 0xFF00) != 0x6C00: + elif sw != 0x9000 and (sw & 0xFF00) != 0x6C00: possibleCause = "Unknown reason" if sw == 0x6982: possibleCause = "Have you uninstalled the existing CA with resetCustomCA first?"