113 lines
4.1 KiB
Python
113 lines
4.1 KiB
Python
import sqlite3
|
|
import sys
|
|
import json
|
|
from .model import Commune, FantoirVoie
|
|
import re
|
|
|
|
# DB with addresses info.
# These two names are not read in this part of the file; presumably the
# caller uses them to locate the SQLite database (TODO confirm).
DB_ADDRESSES_PATH_ENV = "DB_ADDRESSES_PATH"
DB_ADDRESSES_DEFAULT_PATH = "/etc/fantoir.sqlite"

# Table for insee codes
DB_TABLE_INSEE_NAME = "insee"
DB_COL_COMMUNE_INSEE = "Code_commune_INSEE"
DB_COL_COMMUNE_NAME = "Nom_commune"
DB_COL_COMMUNE_POSTE = "Code_postal"

# Table for Fantoir voies (code Rivoli)
DB_TABLE_FANTOIR_NAME = "keyv"
DB_COL_FANTOIR_INSEE = "key"
# NOTE: despite its name, this value is placed *before* the INSEE code when
# the lookup key is built (i.e. it is used as a prefix).
DB_FANTOIR_INSEE_KEY_SUFFIX = "keyv:"


# Utility to find an address
class AddressFinder:
    """Look up communes and Fantoir voies in a SQLite database.

    A new connection is opened for every query and is always closed before
    the method returns, including when the query fails.
    """

    def __init__(self, db_addresses_sqlite_path: str):
        """Remember the path of the SQLite database and print it to stdout."""
        self.dbPath = db_addresses_sqlite_path
        print("DB addresses Path : " + self.dbPath)

    def _fetchRows(self, query: str, params: tuple = ()) -> "list[sqlite3.Row] | None":
        """Run *query* with bound *params* and return every resulting row.

        Returns ``None`` (after reporting on stderr) when SQLite raises an
        ``OperationalError``, so callers can tell "query failed" apart from
        "no rows".
        """
        con = sqlite3.connect(self.dbPath)
        try:
            con.row_factory = sqlite3.Row
            return con.execute(query, params).fetchall()
        except sqlite3.OperationalError as err:
            print("Error querying DB : {0}".format(err), file=sys.stderr)
            return None
        finally:
            # Runs on success and on failure alike: no leaked connection.
            con.close()

    def getCommunesFromNameOrZip(self, communeNameOrZip: str) -> "list[Commune]":
        """Return the communes matching a name and/or a zip code.

        Args:
            communeNameOrZip: search text, or ``None`` to list every commune.
                Text of the form ``"<5 digits> <name>"`` (e.g. ``"33000 BO"``)
                must match both the zip prefix and the name; any other text
                may match either the name (substring, case-insensitive) or
                the zip code (prefix).

        Returns:
            One ``Commune`` per distinct INSEE code, or an empty list when
            nothing matches or the query fails.
        """
        query = f'SELECT * from "{DB_TABLE_INSEE_NAME}"'
        params = ()

        if communeNameOrZip is not None:
            communeSearch = zipSearch = communeNameOrZip
            searchOperator = "OR"

            # For example: '33000 BO'
            if re.match(r"[0-9]{5} .+", communeNameOrZip):
                zipSearch, communeSearch = communeNameOrZip.split(' ', 1)
                searchOperator = "AND"

            # Only constants are interpolated; the search text is bound
            # through placeholders so it can never alter the SQL statement.
            query += (
                f" WHERE {DB_COL_COMMUNE_NAME} LIKE ? COLLATE nocase"
                f" {searchOperator} {DB_COL_COMMUNE_POSTE} LIKE ?"
            )
            params = (f"%{communeSearch}%", f"{zipSearch}%")

        rows = self._fetchRows(query, params)
        if rows is None:
            return []

        # Keyed by INSEE code: this way we avoid duplicates.
        communesMap = {}
        for row in rows:
            commune = Commune(
                codeInsee=row[DB_COL_COMMUNE_INSEE],
                nom=row[DB_COL_COMMUNE_NAME],
                codeZip=row[DB_COL_COMMUNE_POSTE])
            communesMap[commune["codeInsee"]] = commune

        return list(communesMap.values())

    def getCommuneFantoirVoies(self, communeInseeCode: str, voieSearch: str = None) -> "list[FantoirVoie]":
        """Return the Fantoir voies stored for a commune.

        Args:
            communeInseeCode: INSEE code of the commune.
            voieSearch: optional space-separated search terms. A voie is kept
                when at least one of its ``libelle`` entries contains every
                term, case-insensitively, in the order given. Terms are
                matched literally (regex metacharacters have no special
                meaning).

        Returns:
            The voies decoded from the stored JSON payload, or an empty list
            when the commune is unknown or the query fails.
        """
        rows = self._fetchRows(
            f'SELECT value from "{DB_TABLE_FANTOIR_NAME}" WHERE {DB_COL_FANTOIR_INSEE} = ?',
            (f"{DB_FANTOIR_INSEE_KEY_SUFFIX}{communeInseeCode}",))
        if rows is None:
            return []

        if not rows:
            print("Did not found any data matching Insee code " +
                  str(communeInseeCode))
            return []

        # The "value" column holds a JSON document whose interesting payload
        # is itself stored behind a "value" key.
        data_dict = json.loads(rows[0]["value"])
        fantoir_voies = data_dict.get("value") or []

        if voieSearch is None:
            return fantoir_voies

        # One lookahead per term, each followed by ".*": every term has to
        # start at or after the position where the previous one starts.
        # Escaping keeps input such as "(" from being parsed as regex syntax.
        pattern = re.compile(
            "".join(r"(?=" + re.escape(term) + r").*"
                    for term in voieSearch.split(' ')),
            re.IGNORECASE)

        return [
            voie for voie in fantoir_voies
            if any(pattern.search(libelle) for libelle in voie['libelle'])
        ]
|