axione-elig-test/address_finder/api.py

140 lines
5.2 KiB
Python
Raw Normal View History

2021-11-02 15:21:34 +01:00
import sqlite3
2021-11-10 23:22:46 +01:00
import sys
2021-11-02 15:21:34 +01:00
import json
2022-01-26 21:36:28 +01:00
from .model import Commune, FantoirVoie
2022-01-26 15:57:46 +01:00
import re
2021-11-02 15:21:34 +01:00
# DB with addresses info
2022-01-26 21:36:28 +01:00
# NOTE(review): neither constant is used in this part of the file. Presumably
# the first is the name of the environment variable that overrides the
# database location and the second is the fallback path when it is unset --
# confirm against the code that constructs AddressFinder.
DB_ADDRESSES_PATH_ENV = "DB_ADDRESSES_PATH"
DB_ADDRESSES_DEFAULT_PATH = "/etc/fantoir.sqlite"
2021-11-02 15:21:34 +01:00
# Table for insee codes
2022-01-26 21:36:28 +01:00
# Table queried by AddressFinder.getCommunesFromNameOrZip.
DB_TABLE_INSEE_NAME = "insee"
# Column read into the "codeInsee" field of each returned Commune.
DB_COL_COMMUNE_INSEE = "Code_commune_INSEE"
# Column matched against the commune-name part of a search (LIKE %text%).
DB_COL_COMMUNE_NAME = "Nom_commune"
# Column matched against the zip-code part of a search (LIKE text%).
DB_COL_COMMUNE_POSTE = "Code_postal"
2021-11-02 15:21:34 +01:00
# Table for Fantoir voies (code Rivoli)
2022-01-26 21:36:28 +01:00
# Table queried by AddressFinder.getCommuneFantoirVoies; its "value" column
# holds a JSON document.
DB_TABLE_FANTOIR_NAME = "keyv"
# Column used to select the row of a given commune.
DB_COL_FANTOIR_INSEE = "key"
# Despite the name, this string is placed BEFORE the INSEE code when building
# the lookup key (i.e. it is a prefix: "keyv:<insee code>").
DB_FANTOIR_INSEE_KEY_SUFFIX = "keyv:"
2021-11-02 15:21:34 +01:00
# Utility to find an address
2022-01-26 21:36:28 +01:00
2021-11-02 15:21:34 +01:00
class AddressFinder:
    """Look up communes and Fantoir street records in a SQLite database.

    Every query opens its own short-lived connection to ``self.dbPath`` and
    closes it before returning, including when the query fails.
    """

    def __init__(self, db_addresses_sqlite_path: str):
        """Store the database path; no connection is opened here.

        Args:
            db_addresses_sqlite_path: Filesystem path of the SQLite database.
        """
        self.dbPath = db_addresses_sqlite_path
        print("DB addresses Path : " + self.dbPath)

    @staticmethod
    def _parseLimit(limit):
        """Normalise a caller-supplied ``limit`` to a usable value.

        Returns:
            A non-negative ``int``, or ``None`` when no limit applies:
            ``limit`` is ``None``, cannot be converted to ``int`` (an error is
            printed), or is negative. Treating a negative value as "no limit"
            mirrors what SQLite does with a negative ``LIMIT``, so both public
            methods behave the same way.
        """
        if limit is None:
            return None
        try:
            parsed = int(limit)
        except (TypeError, ValueError):
            print("Error, limit arg not a valid int: ", limit)
            return None
        return parsed if parsed >= 0 else None

    def getCommunesFromNameOrZip(self, communeNameOrZip: str, limit: int = None) -> list[Commune]:
        """Return the communes whose name or zip code matches a search text.

        Args:
            communeNameOrZip: Search text. ``None`` selects every commune.
                Text shaped like ``"29530 PLO"`` (five digits, a space, then
                anything) is split into a zip prefix AND a name fragment;
                any other text is matched as a name fragment OR a zip prefix.
                ``%`` and ``_`` keep their SQL ``LIKE`` wildcard meaning.
            limit: Maximum number of rows read from the database. ``None``,
                a negative value or an invalid value means no limit.

        Returns:
            One ``Commune`` per distinct INSEE code. Because duplicates are
            removed after the row limit is applied, the list can be shorter
            than ``limit``. An empty list is returned when the query fails
            with ``sqlite3.OperationalError`` (the error is printed to
            stderr).
        """
        maxRows = self._parseLimit(limit)

        # Only table/column names (module constants) are formatted into the
        # SQL text; everything coming from the caller is bound as a parameter.
        query = f'SELECT * FROM "{DB_TABLE_INSEE_NAME}"'
        params = []
        if communeNameOrZip is not None:
            communeSearch = communeNameOrZip
            zipSearch = communeNameOrZip
            searchOperator = "OR"
            # Allow searching zip and commune together, e.g. "29530 PLO".
            if re.match(r"[0-9]{5} .+", communeNameOrZip):
                zipSearch, _, communeSearch = communeNameOrZip.partition(' ')
                searchOperator = "AND"
            query += (
                f" WHERE {DB_COL_COMMUNE_NAME} LIKE ? COLLATE nocase"
                f" {searchOperator} {DB_COL_COMMUNE_POSTE} LIKE ?"
            )
            params.extend([f"%{communeSearch}%", f"{zipSearch}%"])
        if maxRows is not None:
            query += " LIMIT ?"
            params.append(maxRows)

        con = sqlite3.connect(self.dbPath)
        try:
            con.row_factory = sqlite3.Row
            try:
                rows = con.execute(query, params).fetchall()
            except sqlite3.OperationalError as err:
                print("Error querying DB : {0}".format(err), file=sys.stderr)
                return []
        finally:
            # Runs on success, on the early return above and on any other
            # exception, so the connection is never leaked.
            con.close()

        # Keyed by INSEE code so that duplicated rows collapse into one entry
        # (the last row seen wins).
        communesMap = {}
        for row in rows:
            row_obj = dict(row)
            commune = Commune(
                codeInsee=row_obj[DB_COL_COMMUNE_INSEE],
                nom=row_obj[DB_COL_COMMUNE_NAME],
                codeZip=row_obj[DB_COL_COMMUNE_POSTE])
            communesMap[commune["codeInsee"]] = commune
        return list(communesMap.values())

    def getCommuneFantoirVoies(self, communeInseeCode: str, voieSearch: str = None, limit: int = None) -> list[FantoirVoie]:
        """Return the Fantoir street records stored for one commune.

        Args:
            communeInseeCode: INSEE code of the commune; it is appended to
                ``DB_FANTOIR_INSEE_KEY_SUFFIX`` to form the lookup key.
            voieSearch: Optional space-separated search terms. A record is
                kept when one of the strings in its ``'libelle'`` entry
                contains every term, case-insensitively, with each term
                starting at or after the position where the previous one
                starts (``"avenue noe"`` matches ``"avenue de noes"``).
                Terms are matched literally, not as regular expressions.
            limit: Maximum number of records returned. ``None``, a negative
                value or an invalid value means no limit.

        Returns:
            The list found under the ``"value"`` key of the stored JSON
            document, filtered and truncated as requested. An empty list is
            returned when no row matches the INSEE code, when the document
            has no ``"value"`` entry, or when the query fails with
            ``sqlite3.OperationalError`` (the error is printed to stderr).
        """
        con = sqlite3.connect(self.dbPath)
        try:
            con.row_factory = sqlite3.Row
            try:
                # The INSEE code is bound as a parameter, never formatted
                # into the SQL text.
                data_raw = con.execute(
                    f'SELECT value FROM "{DB_TABLE_FANTOIR_NAME}"'
                    f' WHERE {DB_COL_FANTOIR_INSEE} = ?',
                    (f"{DB_FANTOIR_INSEE_KEY_SUFFIX}{communeInseeCode}",),
                ).fetchone()
            except sqlite3.OperationalError as err:
                print("Error querying DB : {0}".format(err), file=sys.stderr)
                return []
        finally:
            con.close()

        if data_raw is None:
            print("Did not found any data matching Insee code " +
                  str(communeInseeCode))
            return []

        maxVoies = self._parseLimit(limit)

        # The "value" column is a JSON document whose interesting payload is
        # itself stored under a "value" key.
        voies = json.loads(data_raw["value"]).get("value") or []

        if voieSearch is None:
            # A None bound slices the whole list.
            return voies[:maxVoies]

        # One lookahead per term: every term must be present, in order.
        # re.escape keeps characters such as "(" or "*" from being read as
        # regex syntax (which used to raise re.error).
        pattern = re.compile(
            r".*" + "".join(
                r"(?=" + re.escape(term) + r").*"
                for term in voieSearch.split(' ')),
            re.IGNORECASE)

        matches = []
        for voie in voies:
            # Checked before appending so that limit=0 yields no record.
            if maxVoies is not None and len(matches) >= maxVoies:
                break
            if any(pattern.search(libelle) for libelle in voie['libelle']):
                matches.append(voie)
        return matches