import sqlite3 import sys import json from .model import Commune, FantoirVoie import re # DB with addresses info DB_ADDRESSES_PATH_ENV = "DB_ADDRESSES_PATH" DB_ADDRESSES_DEFAULT_PATH = "/etc/fantoir.sqlite" # Table for insee codes DB_TABLE_INSEE_NAME = "insee" DB_COL_COMMUNE_INSEE = "Code_commune_INSEE" DB_COL_COMMUNE_NAME = "Nom_commune" DB_COL_COMMUNE_POSTE = "Code_postal" # Utility to find an address class AddressFinder: def __init__(self, db_insee_communes_sqlite_path: str, db_fantoir_voies_sqlite_path: str): self.dbCommunesPath = db_insee_communes_sqlite_path self.dbFantoirPath = db_fantoir_voies_sqlite_path print("DB insee communes Path : " + self.dbCommunesPath) print("DB Fantoir voies Path : " + self.dbFantoirPath) def getCommunesFromNameOrZip(self, communeNameOrZip: str, limit: int = None) -> list[Commune]: con = sqlite3.connect(self.dbCommunesPath) con.row_factory = sqlite3.Row cur = con.cursor() # Check if a search limit is specified, make sure it is an integer select_limit = "" if limit is not None: try: select_limit = f"LIMIT {int(limit)}" except ValueError: print("Error, limit arg not a valid int: ", limit) try: # If no search parameter, select all if communeNameOrZip is None: cur.execute( f"SELECT * from \"{DB_TABLE_INSEE_NAME}\" {select_limit}") else: communeSearch = communeNameOrZip zipSearch = communeNameOrZip searchOpertor = "OR" # Allow search zip and commune at the same time, in the format "29530 PLO" regexCommuneAndZip = r"[0-9]{5} .+" if re.match(regexCommuneAndZip, communeNameOrZip): splitSearch = communeNameOrZip.split(' ') zipSearch = splitSearch[0] communeSearch = ' '.join(splitSearch[1:]) searchOpertor = "AND" cur.execute( f"SELECT * from \"{DB_TABLE_INSEE_NAME}\" WHERE {DB_COL_COMMUNE_NAME} LIKE \"%{communeSearch}%\" COLLATE nocase {searchOpertor} {DB_COL_COMMUNE_POSTE} LIKE \"{zipSearch}%\" {select_limit}") except sqlite3.OperationalError as err: print("Error querying DB : {0}".format(err), file=sys.stderr) return [] communesMap = dict() for row in cur.fetchall(): row_obj = dict(row) commune = Commune( codeInsee=row_obj[DB_COL_COMMUNE_INSEE], nom=row_obj[DB_COL_COMMUNE_NAME], codeZip=row_obj[DB_COL_COMMUNE_POSTE]) # This way we avoid duplicates in DB communesMap[commune["codeInsee"]] = commune con.close() return list(communesMap.values()) def getCommuneFantoirVoies(self, communeInseeCode: str, voieSearch: str = None, limit: int = None) -> list[FantoirVoie]: # Extract data from DB con = sqlite3.connect(self.dbFantoirPath) con.row_factory = sqlite3.Row cur = con.cursor() if voieSearch is None: voieSearch='' # Check if a search limit is specified, make sure it is an integer select_limit = "" if limit is not None: try: select_limit = f"LIMIT {int(limit)}" except ValueError: print("Error, limit arg not a valid int: ", limit) try: cur.execute( f"SELECT trim(libelle), rivoli_with_key from fantoir WHERE full_insee=\"{communeInseeCode}\" AND libelle like \"%{voieSearch}%\" {select_limit}") except sqlite3.OperationalError as err: print("Error querying DB : {0}".format(err), file=sys.stderr) return [] data_raw = cur.fetchall() con.close() fantoir_dict = [] # Check if data where found if data_raw is not None: fantoir_dict = dict(data_raw) else: print("Did not found any data matching Insee code " + str(communeInseeCode)) # Return the json dump return fantoir_dict