axione-elig-test/address_finder/api.py
2022-02-12 18:39:08 +01:00

110 lines
4.2 KiB
Python

import sqlite3
import sys
import json
from .model import Commune, FantoirVoie
import re
# Location of the addresses SQLite database.
# NOTE(review): neither of these two constants is referenced in the visible
# part of this file; presumably DB_ADDRESSES_PATH_ENV names an environment
# variable that overrides DB_ADDRESSES_DEFAULT_PATH -- confirm with the caller.
DB_ADDRESSES_PATH_ENV = "DB_ADDRESSES_PATH"
DB_ADDRESSES_DEFAULT_PATH = "/etc/fantoir.sqlite"
# Name of the table holding the INSEE commune records; it is the table queried
# by AddressFinder.getCommunesFromNameOrZip.
DB_TABLE_INSEE_NAME = "insee"
# Column names read from each row of that table: the commune INSEE code, the
# commune name (searched with LIKE) and the postal code (searched by prefix).
DB_COL_COMMUNE_INSEE = "Code_commune_INSEE"
DB_COL_COMMUNE_NAME = "Nom_commune"
DB_COL_COMMUNE_POSTE = "Code_postal"
# Utility to find an address
class AddressFinder:
    """Look up communes and their "voies" in two SQLite databases.

    One database holds the INSEE commune table, the other the ``fantoir``
    table.  Every query opens its own short-lived connection, so an instance
    only stores the two database file paths.
    """

    # Matches a combined search such as "29530 PLO": a 5-digit postal code, a
    # space, then part of a commune name.
    _ZIP_AND_NAME_RE = re.compile(r"[0-9]{5} .+")

    def __init__(self, db_insee_communes_sqlite_path: str, db_fantoir_voies_sqlite_path: str):
        """Store both database paths and print them.

        Args:
            db_insee_communes_sqlite_path: SQLite file with the communes table.
            db_fantoir_voies_sqlite_path: SQLite file with the fantoir table.
        """
        self.dbCommunesPath = db_insee_communes_sqlite_path
        self.dbFantoirPath = db_fantoir_voies_sqlite_path
        print(f"DB insee communes Path : {self.dbCommunesPath}")
        print(f"DB Fantoir voies Path : {self.dbFantoirPath}")

    @staticmethod
    def _limitClause(limit) -> tuple:
        """Return the ``LIMIT`` SQL fragment and the parameters bound to it.

        A missing or non-integer limit yields an empty fragment, so the query
        runs unbounded; an invalid value is reported on stderr.
        """
        if limit is None:
            return "", ()
        try:
            return " LIMIT ?", (int(limit),)
        except (TypeError, ValueError):
            print("Error, limit arg not a valid int: ", limit, file=sys.stderr)
            return "", ()

    @staticmethod
    def _fetchAll(db_path, query, params):
        """Run *query* on the database at *db_path* and return all rows.

        Returns ``None`` (after logging on stderr) when SQLite reports an
        operational error.  The connection is closed on every path.
        """
        con = sqlite3.connect(db_path)
        try:
            con.row_factory = sqlite3.Row
            return con.execute(query, params).fetchall()
        except sqlite3.OperationalError as err:
            print("Error querying DB : {0}".format(err), file=sys.stderr)
            return None
        finally:
            con.close()

    # Return annotations below are quoted so the model types are not evaluated
    # when the class body is executed.
    def getCommunesFromNameOrZip(self, communeNameOrZip: str, limit: int = None) -> "list[Commune]":
        """Search communes by name fragment, postal-code prefix, or both.

        Args:
            communeNameOrZip: Search text.  ``None`` selects every commune.
                Text shaped like ``"29530 PLO"`` must match the postal-code
                prefix AND the name fragment; any other text matches when it
                is either a name fragment OR a postal-code prefix.
            limit: Optional maximum number of rows read from the database.

        Returns:
            One ``Commune`` per distinct INSEE code (the last row read wins),
            or an empty list when the database query fails.
        """
        limit_sql, limit_params = self._limitClause(limit)
        if communeNameOrZip is None:
            query = f'SELECT * from "{DB_TABLE_INSEE_NAME}"{limit_sql}'
            params = limit_params
        else:
            communeSearch = zipSearch = communeNameOrZip
            operator = "OR"
            if self._ZIP_AND_NAME_RE.match(communeNameOrZip):
                zipSearch, _, communeSearch = communeNameOrZip.partition(" ")
                operator = "AND"
            # Only trusted module constants are interpolated; every
            # caller-supplied value is bound as a parameter.
            query = (
                f'SELECT * from "{DB_TABLE_INSEE_NAME}" '
                f"WHERE {DB_COL_COMMUNE_NAME} LIKE ? COLLATE nocase "
                f"{operator} {DB_COL_COMMUNE_POSTE} LIKE ?{limit_sql}"
            )
            params = (f"%{communeSearch}%", f"{zipSearch}%") + limit_params

        rows = self._fetchAll(self.dbCommunesPath, query, params)
        if rows is None:
            return []

        # Keyed by INSEE code so duplicated database rows collapse to one.
        communesMap = {}
        for row in rows:
            commune = Commune(
                codeInsee=row[DB_COL_COMMUNE_INSEE],
                nom=row[DB_COL_COMMUNE_NAME],
                codeZip=row[DB_COL_COMMUNE_POSTE],
            )
            communesMap[commune["codeInsee"]] = commune
        return list(communesMap.values())

    def getCommuneFantoirVoies(self, communeInseeCode: str, voieSearch: str = None, limit: int = None) -> "list[FantoirVoie]":
        """Return the voies of a commune, optionally filtered by label.

        Args:
            communeInseeCode: Value compared with the ``full_insee`` column.
            voieSearch: Optional fragment the ``libelle`` column must contain;
                ``None`` matches every label.
            limit: Optional maximum number of rows read from the database.

        Returns:
            A dict mapping each trimmed ``libelle`` to its ``rivoli_with_key``
            (empty when nothing matches), or an empty list when the database
            query fails.
            NOTE(review): this does not match the declared return annotation;
            confirm what callers expect before changing either.
        """
        if voieSearch is None:
            voieSearch = ""
        limit_sql, limit_params = self._limitClause(limit)
        query = (
            "SELECT trim(libelle), rivoli_with_key from fantoir "
            f"WHERE full_insee = ? AND libelle like ?{limit_sql}"
        )
        # str() keeps the text comparison the interpolated query used to do.
        params = (str(communeInseeCode), f"%{voieSearch}%") + limit_params

        rows = self._fetchAll(self.dbFantoirPath, query, params)
        if rows is None:
            return []
        if not rows:
            print("Did not found any data matching Insee code " +
                  str(communeInseeCode))
        return {libelle: rivoli for libelle, rivoli in rows}