Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 79 additions & 32 deletions ledgerblue/comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .commHTTP import getDongle as getDongleHTTP
from .commTCP import getDongle as getDongleTCP
import hid
import struct

APDUGEN=None
if "APDUGEN" in os.environ and len(os.environ["APDUGEN"]) != 0:
Expand Down Expand Up @@ -63,20 +64,13 @@ class HIDDongleHIDAPI(Dongle, DongleWait):

def __init__(self, device, ledger=False, debug=False):
self.device = device
self.ledger = ledger
self.legacy_exchange = ledger
self.debug = debug
self.waitImpl = self
self.opened = True

def exchange(self, apdu, timeout=TIMEOUT):
if APDUGEN:
print("%s" % hexstr(apdu))
return

if self.debug:
print("HID => %s" % hexstr(apdu))
if self.ledger:
apdu = wrapCommandAPDU(0x0101, apdu, 64)
def _exchange_legacy(self, apdu, timeout=TIMEOUT):

padSize = len(apdu) % 64
tmp = apdu
if padSize != 0:
Expand All @@ -91,26 +85,50 @@ def exchange(self, apdu, timeout=TIMEOUT):
dataLength = 0
dataStart = 2
result = self.waitImpl.waitFirstResponse(timeout)
if not self.ledger:
if result[0] == 0x61: # 61xx : data available
self.device.set_nonblocking(False)
dataLength = result[1]
dataLength += 2
if dataLength > 62:
remaining = dataLength - 62
while(remaining != 0):
if remaining > 64:
blockLength = 64
else:
blockLength = remaining
result.extend(bytearray(self.device.read(65))[0:blockLength])
remaining -= blockLength
swOffset = dataLength
dataLength -= 2
self.device.set_nonblocking(True)
else:
swOffset = 0

if result[0] == 0x61: # 61xx : data available
self.device.set_nonblocking(False)
dataLength = result[1]
dataLength += 2
if dataLength > 62:
remaining = dataLength - 62
while(remaining != 0):
if remaining > 64:
blockLength = 64
else:
blockLength = remaining
result.extend(bytearray(self.device.read(65))[0:blockLength])
remaining -= blockLength
swOffset = dataLength
dataLength -= 2
self.device.set_nonblocking(True)
else:
swOffset = 0

sw = (result[swOffset] << 8) + result[swOffset + 1]
response = result[dataStart : dataLength + dataStart]

return (sw, response)


def _exchange(self, apdu, timeout=TIMEOUT):

def send_apdu(apdu):
apdu = wrapCommandAPDU(0x0101, apdu, 64)
padSize = len(apdu) % 64
tmp = apdu
if padSize != 0:
tmp.extend([0] * (64 - padSize))
offset = 0
while(offset != len(tmp)):
data = tmp[offset:offset + 64]
data = bytearray([0]) + data
if self.device.write(data) < 0:
raise BaseException("Error while writing")
offset += 64

def get_data(timeout):
result = self.waitImpl.waitFirstResponse(timeout)
self.device.set_nonblocking(False)
while True:
response = unwrapResponseAPDU(0x0101, result, 64)
Expand All @@ -122,11 +140,40 @@ def exchange(self, apdu, timeout=TIMEOUT):
self.device.set_nonblocking(True)
break
result.extend(bytearray(self.device.read(65)))
sw = (result[swOffset] << 8) + result[swOffset + 1]
response = result[dataStart : dataLength + dataStart]
sw = struct.unpack(">H", result[-2:])[0]
return (sw, result[:-2])


send_apdu(apdu)
(sw, response) = get_data(timeout)

# handle the get response case:
# When more data is available, the chip sends 0x61XX
# So 0x61xx as a SW must not be interpreted as an error
while (sw & 0xFF00) == 0x6100:
send_apdu(bytes.fromhex("00c0000000")) # GET RESPONSE
(sw, data) = get_data(timeout)
response += data

return (sw, response)

def exchange(self, apdu, timeout=TIMEOUT):
if APDUGEN:
print("%s" % hexstr(apdu))
return
if self.debug:
print("HID => %s" % hexstr(apdu))


if self.legacy_exchange:
(sw, response) = self._exchange(apdu, timeout)
else:
(sw, response) = self._exchange_legacy(apdu, timeout)


if self.debug:
print("HID <= %s%.2x" % (hexstr(response), sw))
if sw != 0x9000 and (sw & 0xFF00) != 0x6100 and (sw & 0xFF00) != 0x6C00:
elif sw != 0x9000 and (sw & 0xFF00) != 0x6C00:
possibleCause = "Unknown reason"
if sw == 0x6982:
possibleCause = "Have you uninstalled the existing CA with resetCustomCA first?"
Expand Down