update with arcep

This commit is contained in:
Johan Le Baut 2022-04-12 20:39:33 +02:00
parent 70ed8a22ae
commit 58bb3e0d1b
6 changed files with 144 additions and 36 deletions

View file

@ -1,2 +1,5 @@
[DB]
axione_ipe_path = /path/to/ipe.sqlite
axione_ipe_path = /path/to/ipe.sqlite
axione_ipe_db_name = ipe
arcep_ipe_path = /path/to/ipe.sqlite
arcep_ipe_db_name = arcep

View file

@ -0,0 +1,95 @@
from ipe_fetcher.model import AreaCoordinates, Building, FAIEligibilityStatus
from ipe_fetcher.sqlite_connector.cursor import getCursorWithSpatialite
from os.path import exists
ARCEP_ETAT_DEPLOYE = "deploye"
class Arcep:
def __init__(self, db_arcep_ipe_path: str, db_name: str):
self.db_arcep_ipe_path = db_arcep_ipe_path
self.db_name = db_name
# Check at least that the file exists
if not exists(self.db_arcep_ipe_path):
raise ValueError(f"File {self.db_arcep_ipe_path} does not exist")
def getAreaBuildings(
self, areaCoordinates: AreaCoordinates, existing_buildings: dict
) -> dict:
cur = None
# Try to get cursor on Axone database
try:
cur = getCursorWithSpatialite(self.db_arcep_ipe_path)
except Exception as err:
print("Error while connecting to DB: ", err)
raise "Could not get ARCEP data"
# Let's first see how big is the area we're about to query.
# If it's too big, abort the request to prevent a server DOS.
cur.execute(
"""
SELECT Area(BuildMBR(:swx,:swy,:nex,:ney,4326))
""",
areaCoordinates,
)
req_area = cur.fetchone()[0]
if req_area <= 0.08:
cur.execute(
f"""
SELECT
X(ImmeubleGeoPoint),
Y(ImmeubleGeoPoint),
imb_id,
imb_etat,
num_voie,
type_voie,
nom_voie,
batiment
FROM {self.db_name}
WHERE ROWID IN (
SELECT ROWID FROM SpatialIndex
WHERE f_table_name = '{self.db_name}' AND
search_frame = BuildMBR(:swx, :swy, :nex, :ney, 4326))
""",
areaCoordinates,
)
if not existing_buildings:
existing_buildings = dict()
buildings = existing_buildings
for b in cur.fetchall():
x=b[0]
y=b[1]
idImm = b[2]
etatImm = b[3]
numVoieImm=b[4]
typeVoieImm=b[5]
nomVoieImm=b[6]
bat_info=b[7]
isEligible = etatImm == ARCEP_ETAT_DEPLOYE
othersEligStatus = FAIEligibilityStatus(
isEligible=isEligible,
ftthStatus=etatImm,
reasonNotEligible=None if isEligible else "Pas encore deploye",
)
if buildings.get(idImm):
buildings[idImm]["othersEligStatus"] = othersEligStatus
buildings[idImm]["bat_info"] = bat_info
if buildings[idImm].get('found_in'):
buildings[idImm]['found_in'].append("arcep")
else:
buildings[idImm]['found_in'] = ["arcep"]
else:
building = Building(
x=x,
y=y,
idImm=idImm,
numVoieImm=numVoieImm,
typeVoieImm=typeVoieImm,
nomVoieImm=nomVoieImm,
bat_info=bat_info,
found_in = ["arcep"],
othersEligStatus=othersEligStatus,
)
buildings[idImm] = building
return buildings
else:
raise ValueError("The requested area is too wide, please reduce it")

View file

@ -12,8 +12,9 @@ AXIONE_ETAT_RACCORDABLE_DEMANDE = "RACCORDABLE DEMANDE"
class Axione:
def __init__(self, db_axione_ipe_path: str):
def __init__(self, db_axione_ipe_path: str, db_name: str):
self.db_axione_ipe_path = db_axione_ipe_path
self.db_name = db_name
# Check at least that the file exists
if not exists(self.db_axione_ipe_path):
raise ValueError(f"File {self.db_axione_ipe_path} does not exist")
@ -38,50 +39,32 @@ class Axione:
)
req_area = cur.fetchone()[0]
if req_area <= 0.08:
# cur.execute(
# """
# SELECT
# X(ImmeubleGeoPoint),
# Y(ImmeubleGeoPoint),
# IdentifiantImmeuble,
# EtatImmeuble,
# NumeroVoieImmeuble,
# TypeVoieImmeuble,
# NomVoieImmeuble
# FROM ipe
# WHERE ROWID IN (
# SELECT ROWID FROM SpatialIndex
# WHERE f_table_name = 'ipe' AND
# search_frame = BuildMBR(:swx, :swy, :nex, :ney, 4326))
# """,
# areaCoordinates,
# )
cur.execute(
"""
f"""
SELECT
X(ImmeubleGeoPoint),
Y(ImmeubleGeoPoint),
imb_id,
imb_etat,
num_voie,
type_voie,
nom_voie
FROM arcep
IdentifiantImmeuble,
EtatImmeuble,
NumeroVoieImmeuble,
TypeVoieImmeuble,
NomVoieImmeuble
FROM {self.db_name}
WHERE ROWID IN (
SELECT ROWID FROM SpatialIndex
WHERE f_table_name = 'arcep' AND
WHERE f_table_name = '{self.db_name}' AND
search_frame = BuildMBR(:swx, :swy, :nex, :ney, 4326))
""",
areaCoordinates,
)
if not existing_buildings:
existing_buildings = dict()
buildings = existing_buildings
for b in cur.fetchall():
print(b)
etatImm = b[3]
idImm = b[2]
isEligible = etatImm == AXIONE_ETAT_DEPLOYE or etatImm == "deploye"
isEligible = etatImm == AXIONE_ETAT_DEPLOYE
aquilenetEligStatus = FAIEligibilityStatus(
isEligible=isEligible,
ftthStatus=etatImm,
@ -89,6 +72,11 @@ class Axione:
)
if buildings.get(idImm):
buildings[idImm]["aquilenetEligStatus"] = aquilenetEligStatus
if buildings[idImm].get('found_in'):
buildings[idImm]['found_in'].append("axione")
else:
buildings[idImm]['found_in'] = ["axione"]
else:
building = Building(
x=b[0],
@ -97,6 +85,8 @@ class Axione:
numVoieImm=b[4],
typeVoieImm=b[5],
nomVoieImm=b[6],
bat_info="",
found_in = ["axione"],
aquilenetEligStatus=aquilenetEligStatus,
)
buildings[idImm] = building

View file

@ -25,7 +25,18 @@ class Liazo:
existing_buildings = dict()
buildings = existing_buildings
for building in v:
fdnEligStatus = FAIEligibilityStatus(
isEligible=True,
ftthStatus="DEPLOYE",
reasonNotEligible=None,
)
idImm=building.get('ref')
if buildings.get(idImm):
buildings[idImm]["fdnEligStatus"] = fdnEligStatus
if buildings[idImm].get('found_in'):
buildings[idImm]['found_in'].append("liazo")
else:
buildings[idImm]['found_in'] = ["liazo"]
if not buildings.get(idImm):
building = Building(
@ -34,7 +45,10 @@ class Liazo:
idImm=idImm,
numVoieImm="",
typeVoieImm="",
nomVoieImm=""
nomVoieImm="",
bat_info="",
found_in = ["liazo"],
fdnEligStatus=fdnEligStatus,
)
print("add building ", building)
buildings[idImm] = building

View file

@ -14,8 +14,10 @@ class Building(TypedDict):
numVoieImm: str
typeVoieImm: str
nomVoieImm: str
bat_info: str
found_in: list(str)
aquilenetEligStatus: FAIEligibilityStatus
ffdnEligStatus: FAIEligibilityStatus
fdnEligStatus: FAIEligibilityStatus
othersEligStatus: FAIEligibilityStatus

View file

@ -4,7 +4,7 @@ from typing import TypedDict
import configparser
import sqlite3
import os
from ipe_fetcher import Liazo,Axione
from ipe_fetcher import Liazo,Axione,Arcep
class Config(TypedDict):
axione_ipe_path: str
@ -14,13 +14,17 @@ def parseConfig() -> Config:
cfg = configparser.ConfigParser()
with open(cfg_path, "r") as f:
cfg.read_file(f)
return {"axione_ipe_path": cfg.get("DB", "axione_ipe_path")}
return {
"axione_ipe_path": cfg.get("DB", "axione_ipe_path"),
"axione_ipe_path": cfg.get("DB", "axione_ipe_path"),
}
app = Flask(__name__)
cfg: Config = parseConfig()
axione = Axione(cfg.get("axione_ipe_path"))
axione = Axione(cfg.get("axione_ipe_path"), "ipe")
arcep = Arcep()
liazo = Liazo()
@ -48,7 +52,7 @@ def getEligData():
except ValueError as err:
print("Could not get Axione data for this area:", err)
# buildings = liazo.getAreaBuildings(processed_args["centerlat"], processed_args["centerlng"], buildings)
buildings = liazo.getAreaBuildings(processed_args["centerlat"], processed_args["centerlng"], buildings)
return {"buildings": buildings}
else: