Add API to get elig status per imm ref
This commit is contained in:
parent
1e9d62c03b
commit
22f43fc954
7 changed files with 155 additions and 7 deletions
|
@ -23,12 +23,29 @@ cat > "${tmpSql}" <<EOF
|
|||
.separator ";"
|
||||
EOF
|
||||
|
||||
firstFile=true
|
||||
for ipeFile in ${ipeFiles}; do
|
||||
echo " ${ipeFile}"
|
||||
cat >> "${tmpSql}" <<EOF
|
||||
.import ${ipeFile} ipe
|
||||
echo " ${ipeFile}"
|
||||
head -n1 $ipeFile | grep -q IdentifiantImmeuble && header=true || header=false
|
||||
import_opt=""
|
||||
if $firstFile || $header; then
|
||||
if ! $header; then
|
||||
echo "ERROR: first file ${ipeFile} does not have a good header"
|
||||
exit 1
|
||||
fi
|
||||
if ! $firstFile; then
|
||||
import_opt="-skip 1"
|
||||
fi
|
||||
firstFile=false
|
||||
fi
|
||||
cat >> "${tmpSql}" <<EOF
|
||||
.import ${import_opt} ${ipeFile} ipe
|
||||
EOF
|
||||
|
||||
done
|
||||
|
||||
#cat ${tmpSql}
|
||||
|
||||
echo ""
|
||||
|
||||
rc=0
|
||||
|
@ -38,6 +55,14 @@ if [[ $rc -ne 0 ]]; then
|
|||
exit "$rc"
|
||||
fi
|
||||
|
||||
echo "[+] Create separate table with id immeuble index and its state."
|
||||
cat > "${tmpSql}" <<EOF
|
||||
CREATE TABLE refimm (IdentifiantImmeuble text NOT NULL, EtatImmeuble text NOT NULL);
|
||||
CREATE UNIQUE INDEX idx_IdentifiantImmeuble on refimm (IdentifiantImmeuble);
|
||||
INSERT INTO refimm SELECT IdentifiantImmeuble,EtatImmeuble FROM ipe;
|
||||
EOF
|
||||
sqlite3 "${fullDbPath}" < "${tmpSql}"
|
||||
|
||||
echo "[+] Ingesting spatial data."
|
||||
|
||||
cat > "${tmpSql}" <<EOF
|
||||
|
|
0
webapp/eligibility_api/__init__.py
Normal file
0
webapp/eligibility_api/__init__.py
Normal file
39
webapp/eligibility_api/elig_api_exceptions.py
Normal file
39
webapp/eligibility_api/elig_api_exceptions.py
Normal file
|
@ -0,0 +1,39 @@
|
|||
from flask import Flask, jsonify
|
||||
|
||||
|
||||
class ApiParamException(Exception):
|
||||
"""
|
||||
Exception thrown if API misused
|
||||
"""
|
||||
|
||||
def __init__(self, description):
|
||||
self.code = 400
|
||||
self.name = "Bad API parameter"
|
||||
self.description = description
|
||||
|
||||
|
||||
class FlaskExceptions:
|
||||
"""
|
||||
Manages flask custom exceptions
|
||||
"""
|
||||
|
||||
def __init__(self, flask_app: Flask):
|
||||
self.flask_app = flask_app
|
||||
|
||||
def add_exceptions(self):
|
||||
"""
|
||||
declares custom exceptions to flask app
|
||||
"""
|
||||
|
||||
@self.flask_app.errorhandler(ApiParamException)
|
||||
def handle_exception(e):
|
||||
return (
|
||||
jsonify(
|
||||
{
|
||||
"code": e.code,
|
||||
"name": e.name,
|
||||
"description": e.description,
|
||||
}
|
||||
),
|
||||
400,
|
||||
)
|
21
webapp/eligibility_api/elig_api_routes.py
Normal file
21
webapp/eligibility_api/elig_api_routes.py
Normal file
|
@ -0,0 +1,21 @@
|
|||
from flask import Flask, request
|
||||
|
||||
from ipe_fetcher import Axione
|
||||
|
||||
from eligibility_api.elig_api_exceptions import ApiParamException
|
||||
|
||||
|
||||
class EligibilityApiRoutes:
|
||||
def __init__(self, flask_app: Flask, axione_ipe: Axione):
|
||||
self.flask_app = flask_app
|
||||
self.axione_ipe = axione_ipe
|
||||
|
||||
def add_routes(self):
|
||||
|
||||
@self.flask_app.route("/eligibilite/axione", methods=["GET"])
|
||||
def get_axione_eligibility_per_immeuble():
|
||||
refimmeuble = request.args.get("refimmeuble")
|
||||
if not refimmeuble:
|
||||
raise ApiParamException("You need to specify path parameter 'refimmeuble'")
|
||||
return self.axione_ipe.get_eligibilite_per_id_immeuble(refimmeuble)
|
||||
|
|
@ -1,7 +1,10 @@
|
|||
import time
|
||||
|
||||
from ipe_fetcher.model import AreaCoordinates, Building, FAIEligibilityStatus
|
||||
from ipe_fetcher.sqlite_connector.cursor import getCursorWithSpatialite
|
||||
from ipe_fetcher.sqlite_connector.cursor import getCursorWithSpatialite, getBasicCursor
|
||||
from os.path import exists
|
||||
|
||||
|
||||
AXIONE_ETAT_DEPLOYE = "DEPLOYE"
|
||||
AXIONE_ETAT_DEPLOIEMENT = "EN COURS DE DEPLOIEMENT"
|
||||
AXIONE_ETAT_ABANDONNE = "ABANDONNE"
|
||||
|
@ -10,6 +13,7 @@ AXIONE_ETAT_SIGNE = "SIGNE"
|
|||
AXIONE_ETAT_RAD_DEPLOIEMENT = "RAD EN COURS DE DEPLOIEMENT"
|
||||
AXIONE_ETAT_RACCORDABLE_DEMANDE = "RACCORDABLE DEMANDE"
|
||||
|
||||
AXIONE_REFIMM_TABLE_NAME = "refimm"
|
||||
|
||||
class Axione:
|
||||
def __init__(self, db_axione_ipe_path: str, db_name: str):
|
||||
|
@ -19,6 +23,17 @@ class Axione:
|
|||
if not exists(self.db_axione_ipe_path):
|
||||
raise ValueError(f"File {self.db_axione_ipe_path} does not exist")
|
||||
|
||||
try:
|
||||
cur = getBasicCursor(self.db_axione_ipe_path)
|
||||
except Exception as err:
|
||||
print("Error while connecting to DB: ", err)
|
||||
raise "Could not connect to axione DB"
|
||||
cur.execute(f''' SELECT count(name) FROM sqlite_master WHERE type='table' AND name='{AXIONE_REFIMM_TABLE_NAME}' ''')
|
||||
self.db_name_refimm = db_name
|
||||
if cur.fetchone()[0] == 1:
|
||||
self.db_name_refimm = AXIONE_REFIMM_TABLE_NAME
|
||||
|
||||
|
||||
def getAreaBuildings(
|
||||
self, areaCoordinates: AreaCoordinates, existing_buildings: dict
|
||||
) -> dict:
|
||||
|
@ -59,11 +74,13 @@ class Axione:
|
|||
""",
|
||||
areaCoordinates,
|
||||
)
|
||||
|
||||
|
||||
res = cur.fetchall()
|
||||
|
||||
if not existing_buildings:
|
||||
existing_buildings = dict()
|
||||
buildings = existing_buildings
|
||||
for b in cur.fetchall():
|
||||
for b in res:
|
||||
etatImm = b[3]
|
||||
idImm = b[2]
|
||||
isEligible = etatImm == AXIONE_ETAT_DEPLOYE
|
||||
|
@ -99,3 +116,37 @@ class Axione:
|
|||
return buildings
|
||||
else:
|
||||
raise ValueError("The requested area is too wide, please reduce it")
|
||||
|
||||
def get_eligibilite_per_id_immeuble(self, id_immeuble: str):
|
||||
|
||||
# Try to get cursor on Axione database
|
||||
try:
|
||||
cur = getBasicCursor(self.db_axione_ipe_path)
|
||||
except Exception as err:
|
||||
print("Error while connecting to DB: ", err)
|
||||
raise "Could not get Axione data"
|
||||
|
||||
cur.execute(
|
||||
f"""
|
||||
SELECT EtatImmeuble
|
||||
FROM {self.db_name_refimm}
|
||||
WHERE IdentifiantImmeuble == '{id_immeuble}'
|
||||
"""
|
||||
)
|
||||
res = cur.fetchone()
|
||||
if res:
|
||||
imm_elig = res[0]
|
||||
isEligible = imm_elig == AXIONE_ETAT_DEPLOYE
|
||||
reasonNotEligible = "" if isEligible else "Pas encore deploye"
|
||||
else:
|
||||
isEligible = False
|
||||
imm_elig = "NOT_AXIONE"
|
||||
reasonNotEligible = "Axione ne gere pas ce batiment"
|
||||
|
||||
eligStatus = FAIEligibilityStatus(
|
||||
isEligible=isEligible,
|
||||
ftthStatus=imm_elig,
|
||||
reasonNotEligible=reasonNotEligible,
|
||||
)
|
||||
|
||||
return eligStatus
|
||||
|
|
|
@ -6,3 +6,8 @@ def getCursorWithSpatialite(db_path: str = None) -> sqlite3.Cursor:
|
|||
db.enable_load_extension(True)
|
||||
cur.execute('SELECT load_extension("mod_spatialite")')
|
||||
return cur
|
||||
|
||||
def getBasicCursor(db_path: str = None) -> sqlite3.Cursor:
|
||||
db = sqlite3.connect(db_path)
|
||||
cur = db.cursor()
|
||||
return cur
|
||||
|
|
|
@ -5,6 +5,10 @@ import configparser
|
|||
import sqlite3
|
||||
import os
|
||||
from ipe_fetcher import Liazo,Axione,Arcep,AreaCoordinates
|
||||
from eligibility_api.elig_api_exceptions import FlaskExceptions
|
||||
from eligibility_api.elig_api_routes import EligibilityApiRoutes
|
||||
|
||||
|
||||
class Config(TypedDict):
|
||||
axione_ipe_path: str
|
||||
axione_ipe_db_name: str
|
||||
|
@ -31,7 +35,10 @@ cfg: Config = parseConfig()
|
|||
axione = Axione(cfg.get("axione_ipe_path"), cfg.get("axione_ipe_db_name"))
|
||||
arcep = Arcep(cfg.get("arcep_ipe_path"), cfg.get("arcep_ipe_db_name"))
|
||||
liazo = Liazo()
|
||||
|
||||
elig_api_routes = EligibilityApiRoutes(app, axione)
|
||||
elig_api_routes.add_routes()
|
||||
elig_api_exceptions = FlaskExceptions(app)
|
||||
elig_api_exceptions.add_exceptions()
|
||||
|
||||
@app.route("/", methods=["GET"])
|
||||
def getMap():
|
||||
|
|
Loading…
Add table
Reference in a new issue