commit beb869ece2684d777ae358f874aac57508fcbf71 Author: Sergey Biryukov Date: Mon Jul 18 23:32:47 2022 +0700 Reading sim cards id and send to the server diff --git a/config.yaml.example b/config.yaml.example new file mode 100644 index 0000000..009aa2e --- /dev/null +++ b/config.yaml.example @@ -0,0 +1,10 @@ +instance: + name: newsimreader + +ports: +- /dev/ttyUSB0 + +server: + host: test.sim.server + port: 1234 + secret: secretpassword \ No newline at end of file diff --git a/connection.py b/connection.py new file mode 100644 index 0000000..a995841 --- /dev/null +++ b/connection.py @@ -0,0 +1,17 @@ +import socket + +class ServerConnection: + + def __init__(self, name, addr, port, secret): + self.socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) + self.srv_addr = socket.gethostbyname(addr) + self.srv_port = int(port) + self.srv_secret = secret + self.name = name + + def send_register(self, iccid, imsi, tty_port): + bytesToSend = str.encode("REGISTER %s %s %s %s %s " % (iccid, self.srv_secret, imsi, self.name, tty_port)) + self.socket.sendto(bytesToSend, (self.srv_addr, self.srv_port)) + + + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..fa6c06a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +pyyaml~=5.3.0 +pyserial~=3.5 diff --git a/sim_reader.py b/sim_reader.py new file mode 100644 index 0000000..92b374e --- /dev/null +++ b/sim_reader.py @@ -0,0 +1,323 @@ +import termios +import time +import os +import fcntl +import struct +import logging +from datetime import datetime + +class SIMReader: + """ + Class that controls SIM reader + """ + sim_write_pause = 0.005 + + def __init__(self, ttyPath): + self.ready = False + self.iccid = "" + self.imsi = "" + self.app = [ 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, ] + try: + self.fd = open(ttyPath, mode="r+b", buffering=0) + except Exception as err: + logging.error("Unable to open tty port %s: %s" % (ttyPath, err)) + return + try: + self.set_tty_attr() + while True: # Try to read ATR from the sim card + if self.init_sim(): + break + time.sleep(5) + self.read_ids() + self.ready = True + except Exception as err: + print("Error initializing sim card: %s %s" % (type(err), err)) + self.fd.close() + + def set_tty_attr(self): + """ + Set terminal attributes + """ + attr = termios.tcgetattr(self.fd) + # attr = [iflag, oflag, cflag, lflag, ispeed, ospeed, cc] + + logging.debug("Before attr = %s" % attr) + + # Set iflag + # Turn off s/w flow ctrl + attr[0] &= ~(termios.IXON | termios.IXOFF | termios.IXANY) + # Disable any special handling of received bytes + attr[0] &= ~(termios.IGNBRK|termios.BRKINT|termios.PARMRK|termios.ISTRIP|termios.INLCR|termios.IGNCR|termios.ICRNL) + + # Set oflag + # Prevent special interpretation of output bytes (e.g. newline chars) + attr[1] &= ~termios.OPOST + # Prevent conversion of newline to carriage return/line feed + attr[1] &= ~termios.ONLCR + + # Set cflag + # parity = even + attr[2] |= termios.PARENB + # Clear stop field, only one stop bit used in communication (most common) + attr[2] &= ~termios.CSTOPB + # Clear all bits that set the data size + attr[2] &= ~termios.CSIZE + # 8 bits per byte (most common) + attr[2] |= termios.CS8 + # Disable RTS/CTS hardware flow control (most common) + attr[2] &= ~termios.CRTSCTS + # Turn on READ & ignore ctrl lines (CLOCAL = 1) + attr[2] |= termios.CREAD | termios.CLOCAL + + # Set lflag + attr[3] &= ~termios.ICANON + # Disable echo + attr[3] &= ~termios.ECHO + # Disable erasure + attr[3] &= ~termios.ECHOE + # Disable new-line echo + attr[3] &= ~termios.ECHONL + # Disable interpretation of INTR, QUIT and SUSP + attr[3] &= ~termios.ISIG + + # Set in/out baud rate to be 9600 + attr[4] = termios.B9600 + attr[5] = termios.B9600 + + # Wait for up to 0.1s (1 deciseconds), returning as soon as any data is received. + attr[6][termios.VTIME] = 1 + attr[6][termios.VMIN] = 0 + logging.debug("After attr = %s" % attr) + termios.tcsetattr(self.fd, termios.TCSANOW, attr) + + def init_sim(self) -> bool: + """ + Reads ATR from SIM card + """ + self.set_rts(True) # Set SIM card reset pin + self.read_n(256) # Read serial port buffer if any bytes left + time.sleep(0.5) + self.set_rts(False) # Release reset pin + time.sleep(0.5) # Wait for SIM card to boot + # r = os.read(self.fd.fileno(), 256) + # buf = struct.unpack('B' * len(r), r) + buf = self.read_n(256) + if len(buf)<4: + logging.error("Unable to read ATR from simcard at %s (got len: %d) %s" % (self.fd.name, len(buf), buf)) + return False + logging.debug("%s: got ATR = (%d): %s" % ( + self.fd.name, + len(buf), + ''.join(["%02x " % x for x in buf]), + ) + ) + return True + + def set_rts(self, state: bool): + """ + Set RTS line + + Attributes: + state (bool): state to set RTS line to + """ + s = fcntl.ioctl(self.fd, termios.TIOCMGET, "\000\000\000\000") + i = struct.unpack('l', s)[0] + if state: + i |= termios.TIOCM_RTS + else: + i &= ~termios.TIOCM_RTS + s = struct.pack('l', i) + fcntl.ioctl(self.fd, termios.TIOCMSET, s) + + def read_ids(self): + """ + Reads IMSI and ICCID from SIM card + """ + # unlock chv + self.exec_sim([0x00, 0x2C, 0x00, 0x01, 0x00]) + self.exec_sim([0x00, 0x20, 0x00, 0x01, 0x00]) + self.exec_sim([0x00, 0x2C, 0x00, 0x81, 0x00]) + self.exec_sim([0x00, 0x20, 0x00, 0x81, 0x00]) + + # select file MF + resp = self.exec_sim([0x00, 0xA4, 0x00, 0x04, 0x02, 0x3F, 0x00]) + print(resp) + if len(resp)<2 or resp[0]!=0x61 or resp[1]>240: + raise Exception("Unable to select file MF") + resp = self.exec_sim([0x00, 0xC0, 0x00, 0x00, resp[1]]) + + # select file EF.ICCID + resp = self.exec_sim([0x00, 0xA4, 0x08, 0x04, 0x02, 0x2F, 0xE2]) + if len(resp)<2 or resp[0]!=0x61 or resp[1]>240: + raise Exception("Unable to select file EF.ICCID (1)") + resp = self.exec_sim([0x00, 0xC0, 0x00, 0x00, resp[1]]) + resp = self.exec_sim([0x00, 0xB0, 0x00, 0x00, 0x0a]) + + if len(resp) != 12: + raise Exception("Unable to select file EF.ICCID (2)") + self.iccid = ''.join(["%01X%01X" % (resp[i]&0xf, resp[i]>>4) for i in range(0,10)]) + logging.info("%s: Got ICCID: %s" % (self.fd.name, self.iccid)) + + # select APP + self.app[1]=0xa4 + self.app[2]=0x04 + self.app[3]=0x04 + + resp = self.exec_sim([0x00, 0xA4, 0x08, 0x04, 0x02, 0x2F, 0x00]) + if len(resp)!=2 or resp[0]!=0x61 or resp[1]>240: + raise Exception("Unable to select APP (1)") + resp = self.exec_sim([0x00, 0xC0, 0x00, 0x00, resp[1]]) + rec_len = 0 + rec_n=0 + + i = 2 + while i < len(resp): + tag = resp[i] + i += 1 + if tag == 0xff: + break + tag_len = resp[i] + i += 1 + if tag != 0x82: + i += tag_len + continue + rec_len = resp[i + 3] + rec_n = resp[i + 4] + break + + if rec_len==0 or rec_n==0: + raise Exception("Unable to select APP (2)") + + for nr in range(0,rec_n): + resp = self.exec_sim([0x00, 0xB2, nr+1, 0x04, rec_len]) + i = 2 + while i240: + raise Exception("Unable to select APP (4)") + self.exec_sim([0x00, 0xC0, 0x00, 0x00, resp[1]]) + + # Select IMSI + resp = self.exec_sim([0x00, 0xA4, 0x08, 0x04, 0x04, 0x7f, 0xff, 0x6F, 0x07]) + if len(resp)!=2 or resp[0]!=0x61 or resp[1]>240: + raise Exception("Unable to select IMSI (1)") + self.exec_sim([0x00, 0xC0, 0x00, 0x00, resp[1]]) + resp = self.exec_sim([0x00, 0xB0, 0x00, 0x00, 0x09]) + if len(resp) != 11: + raise Exception("Unable to select IMSI (2)") + self.imsi = "%01X" % (resp[1]>>4) + self.imsi += ''.join(["%01X%01X" % (resp[i]&0xf, resp[i]>>4) for i in range(2,9)]) + logging.info("%s: IMSI: %s" % (self.fd.name, self.imsi)) + + def exec_sim(self, src) -> list: + """ + Executes a command from src in SIM card + """ + logging.debug("Send SIM command: %s" % (''.join(["%02x " % x for x in src[0:5]]))) + if src[1]&0xF0 != 0xB0 and src[1]&0xF0 != 0xC0: + logging.debug("Send SIM command additionaly: %s" % ''.join(["%02x " % x for x in src[5:5+src[4]]])) + + if self.write_n(src[0:5]) != 5: + raise Exception("Unable to write to sim card (1)") + + buf = self.read_n(5) + if len(buf) != 5: + return Exception("Unable to read from sim card (1)") + if src[4] == 0: + buf = self.read_n(2) + if len(buf) != 2: + raise Exception("Unable to read from sim card (2). Expecting for 2 bytes but got %d. %s" % (len(buf), (''.join(["%02x " % x for x in buf])))) + logging.debug("got: %s" % ''.join(["%02x " % x for x in buf])) + return buf + else: + buf = self.read_n(1) + if len(buf) != 1: + raise Exception("Unable to read from sim card (3)") + if buf[0] != src[1]: + raise Exception("Got %s instead of %s" % (buf[0], src[1])) + + if src[1]&0xF0 != 0xB0 and src[1]&0xF0 != 0xC0: + sent = self.write_n(src[5:5+src[4]]) + if sent != src[4]: + raise Exception("Unable to write to sim card (2)") + buf = self.read_n(src[4]) + if len(buf) != src[4]: + raise Exception("Unable to read from sim card (4)") + buf = self.read_n(2) + if len(buf) != 2: + raise Exception("Unable to read from sim card (5)") + logging.debug("got: %s" % ''.join(["%02x " % x for x in buf])) + return buf + else: + buf = self.read_n(src[4]+2) + if len(buf) != src[4]+2: + raise Exception("Unable to read from sim card (6)") + logging.debug("got: %s" % ''.join(["%02x " % x for x in buf])) + return buf + + def write_n(self, buf) -> int: + """ + Writes data to SIM card + + Returns: + int: number of bytes written + """ + logging.debug("Write %d bytes: %s" % (len(buf), ''.join(["%02x " % x for x in buf]))) + length = 0 + for b in buf: + os.write(self.fd.fileno(), bytes([b])) + time.sleep(SIMReader.sim_write_pause) + length += 1 + return length + + def read_n(self, length: int): + """ + Reads n bytes from SIM card with specified timeout + """ + logging.debug("Read for %d bytes" % length) + T = datetime.now() + tmo = 0 + i = 0 + arr = [] + while i < length: + now = datetime.now() + if T != now: + T = now + tmo += 1 + n = os.read(self.fd.fileno(), 1) + if len(n)<1: + if tmo<2: + continue + logging.debug("Got %s" % ''.join(["%02x " % x for x in arr])) + return arr + arr.append(struct.unpack('B', n)[0]) + i += len(n) + logging.debug("Got %s" % ''.join(["%02x " % x for x in arr])) + return arr + + def close(self): + """ + Closes the SIM reader file descriptor + """ + self.fd.close() \ No newline at end of file diff --git a/start.py b/start.py new file mode 100644 index 0000000..3e3193e --- /dev/null +++ b/start.py @@ -0,0 +1,55 @@ +import yaml +import logging +import argparse +import sys +import socket +from os.path import exists +from sim_reader import SIMReader +from connection import ServerConnection + +logging.basicConfig(level=logging.DEBUG) + +if __name__ == "__main__": + config = None + sim_readers = [] + parser = argparse.ArgumentParser(description='Communicates with SIM cards using raw tty port') + parser.add_argument("--config", help="config file to use", type=str, default="config.yaml") + args = parser.parse_args() + + # Parse config file + if not exists(args.config): + print("Config file not found") + exit(1) + try: + with open(args.config, "r") as config_file: + config = yaml.load(config_file, Loader=yaml.BaseLoader) + except Exception as err: + logging.error("Unable to read %s: %s" % (args.config, err)) + sys.exit(1) + + if not "instance" in config: + config["instance"] = {} + if not "name" in config["instance"]: + config["instance"]["name"] = socket.gethostname() + + # Create a connection to SIM server + srv_connection = ServerConnection( + name=config["instance"]["name"], + addr=config["server"]["host"], + port=config["server"]["port"], + secret=config["server"]["secret"] + ) + + # Open SIM readers + for port in config['ports']: + sim = SIMReader(ttyPath=port) + if sim.ready: + print("Opened %s. ICCID: %s IMSI: %s" % (port, sim.iccid, sim.imsi)) + srv_connection.send_register( + iccid=sim.iccid, + imsi=sim.imsi, + tty_port=port + ) + sim.close() + +