clean code

This commit is contained in:
Johan Le Baut 2023-02-28 22:31:35 +01:00
parent 461550c544
commit 0fb3de88e1
13 changed files with 315 additions and 191 deletions

View File

@ -33,7 +33,7 @@ source_versions() {
ver_file=${dir_out}/${VERSIONS_FILENAME}
LAST_ARCEP_ZIP=""
BEFORE_ARCEP_ZIP=""
[[ -f ${ver_file} ]] && source ${ver_file}
[[ -f ${ver_file} ]] && source "${ver_file}"
g_last_arcep_zip=${LAST_ARCEP_ZIP}
g_before_arcep_zip=${BEFORE_ARCEP_ZIP}
}
@ -45,13 +45,13 @@ dl_latest_arcep() {
rc=0
echo "Create out dir ${dir_out} if not exist"
mkdir -p ${dir_out}
mkdir -p "${dir_out}"
ver_file=${dir_out}/${VERSIONS_FILENAME}
touch ${ver_file}
touch "${ver_file}"
latest_file_url="$(curl -s ${GOUV_API_URL} | jq -r '.resources[] |objects | .url' | grep -i immeubles | head -1)"
file_date=$(echo $latest_file_url | cut -f6 -d '/')
file_name=$(echo $latest_file_url | cut -f7 -d '/')
latest_file_url="$(curl -s ${GOUV_API_URL} | jq -r '.resources[] |objects | .url' | grep -i immeubles | head -1)"
file_date=$(echo "$latest_file_url" | cut -f6 -d '/')
file_name=$(echo "$latest_file_url" | cut -f7 -d '/')
latest_f=${file_date}__${file_name}
echo "Found ${latest_f} Check if already exist"
@ -59,14 +59,14 @@ dl_latest_arcep() {
echo "File ${latest_f} is already the latest ! Do not do anything"
else
echo "File ${latest_f} not there, download it"
wget -O ${dir_out}/${latest_f} ${latest_file_url} || rc=1
wget -O "${dir_out}"/"${latest_f}" "${latest_file_url}" || rc=1
g_penultimate_arcep_zip=${g_before_arcep_zip}
g_before_arcep_zip=${g_last_arcep_zip}
g_last_arcep_zip=${latest_f}
g_arcep_to_unzip=${latest_f}
echo "OK, update versions file"
echo "LAST_ARCEP_ZIP=${g_last_arcep_zip}" > ${ver_file}
echo "BEFORE_ARCEP_ZIP=${g_before_arcep_zip}" >> ${ver_file}
echo "LAST_ARCEP_ZIP=${g_last_arcep_zip}" >"${ver_file}"
echo "BEFORE_ARCEP_ZIP=${g_before_arcep_zip}" >>"${ver_file}"
fi
return ${rc}
@ -76,15 +76,15 @@ dl_latest_arcep() {
unzip_arcep() {
dir_out=$1
zip_file=$2
zip_dir=$(echo ${zip_file} | rev | cut -f2- -d '.' | rev)
zip_dir=$(echo "${zip_file}" | rev | cut -f2- -d '.' | rev)
mkdir -p "${dir_out}/$zip_dir"
echo "Unzip file ${dir_out}/${zip_file}"
unzip ${dir_out}/${zip_file} -d ${dir_out}/$zip_dir || return 1
unzip "${dir_out}"/"${zip_file}" -d "${dir_out}"/"$zip_dir" || return 1
return 0
}
# main
main () {
main() {
# Init input vars
remove_penultimate=false
force_dl=false
@ -92,25 +92,25 @@ main () {
# Read inputs
[[ $# -eq 0 ]] && usage && return 1
while [ -n $1 ] ; do
while [ -n "$1" ]; do
case $1 in
-d|--dir-out)
dir_out=$(realpath $2)
shift
;;
-r|--remove-penultimate)
remove_penultimate=true
;;
-f|--force-dl)
force_dl=true
;;
-h|--help)
usage && exit 0
;;
*)
echo "Unknown command: $1"
usage && exit 1
;;
-d | --dir-out)
dir_out=$(realpath "$2")
shift
;;
-r | --remove-penultimate)
remove_penultimate=true
;;
-f | --force-dl)
force_dl=true
;;
-h | --help)
usage && exit 0
;;
*)
echo "Unknown command: $1"
usage && exit 1
;;
esac
[[ $# -le 1 ]] && break
shift
@ -125,26 +125,26 @@ main () {
rc=0
# Read existing dl versions
source_versions ${dir_out} || rc=1
source_versions "${dir_out}" || rc=1
# Download latest zip file if needed
[[ $rc -eq 0 ]] && dl_latest_arcep ${dir_out} $force_dl || rc=1
[[ $rc -eq 0 ]] && dl_latest_arcep "${dir_out}" $force_dl || rc=1
# If download succeeded and there is a file to unzip
if [[ $rc -eq 0 && -n $g_arcep_to_unzip ]]; then
# unzip file
unzip_arcep ${dir_out} ${g_last_arcep_zip} || rc=1
unzip_arcep "${dir_out}" "${g_last_arcep_zip}" || rc=1
# Unzip succeeded and need to remove penultimate arcep data (if exists)
if [[ $rc -eq 0 \
&& $remove_penultimate \
&& -n $g_penultimate_arcep_zip \
&& -f ${dir_out}/$g_penultimate_arcep_zip ]]; then
if [[ $rc -eq 0 && \
$remove_penultimate && -n \
$g_penultimate_arcep_zip && -f \
${dir_out}/$g_penultimate_arcep_zip ]]; then
echo "Delete penultimate zip ${dir_out}/$g_penultimate_arcep_zip"
rm -f ${dir_out}/$g_penultimate_arcep_zip
zip_dir=$(echo ${g_penultimate_arcep_zip} | rev | cut -f2- -d '.' | rev)
rm -f "${dir_out}"/"$g_penultimate_arcep_zip"
zip_dir=$(echo "${g_penultimate_arcep_zip}" | rev | cut -f2- -d '.' | rev)
if [[ -d ${dir_out}/${zip_dir} ]]; then
echo "remove dir ${dir_out}/${zip_dir}"
rm -rf ${dir_out}/${zip_dir}
rm -rf "${dir_out}"/"${zip_dir}"
fi
elif [[ $rc -ne 0 ]]; then
echo "Failed to unzip ${g_last_arcep_zip} !"
@ -153,7 +153,6 @@ main () {
return $rc
}
### Call main
main "$@" || exit 1

View File

@ -1,12 +1,15 @@
import math
from ipe_fetcher import AreaCoordinates
def check_coordinates_area(coordinates: AreaCoordinates, max_area) -> AreaCoordinates:
swx = coordinates.get('swx')
swy = coordinates.get('swy')
nex = coordinates.get('nex')
ney = coordinates.get('ney')
def adapt_coordinates_to_max_area(
coordinates: AreaCoordinates, max_area
) -> AreaCoordinates:
swx = coordinates.get("swx")
swy = coordinates.get("swy")
nex = coordinates.get("nex")
ney = coordinates.get("ney")
x_interval = abs(nex - swx)
y_interval = abs(ney - swy)
area = x_interval * y_interval

View File

@ -1,30 +1,35 @@
from flask import Flask, request
from coordinates import check_coordinates_args, adapt_coordinates_to_max_area
from eligibility_api.elig_api_exceptions import ApiParamException
from coordinates import check_coordinates_args, check_coordinates_area
from ipe_fetcher.axione import AXIONE_MAX_AREA, Axione
class EligibilityApiRoutes:
def __init__(self, flask_app: Flask, axione_ipe: Axione):
self.flask_app = flask_app
self.axione_ipe = axione_ipe
def add_routes(self):
@self.flask_app.route("/eligibilite/axione", methods=["GET"])
def get_axione_eligibility_per_immeuble():
refimmeuble = request.args.get("refimmeuble")
if not refimmeuble:
raise ApiParamException("You need to specify path parameter 'refimmeuble'")
raise ApiParamException(
"You need to specify path parameter 'refimmeuble'"
)
return self.axione_ipe.get_eligibilite_per_id_immeuble(refimmeuble)
@self.flask_app.route("/eligibilite/axione/coord", methods=["GET"])
def get_axione_eligibility_per_coordinates():
args = request.args
try:
processed_args = check_coordinates_args(args)
coordinates = check_coordinates_area(processed_args, AXIONE_MAX_AREA)
return self.axione_ipe.getAreaBuildings(coordinates, {})
coordinates = adapt_coordinates_to_max_area(
processed_args, AXIONE_MAX_AREA
)
return self.axione_ipe.get_area_buildings(coordinates, {})
except ValueError:
raise ApiParamException("You need to specify path parameters 'swx' 'swy' 'nex' 'ney'")
raise ApiParamException(
"You need to specify path parameters 'swx' 'swy' 'nex' 'ney'"
)

View File

@ -1,10 +1,11 @@
from os.path import exists
from ipe_fetcher.model import AreaCoordinates, Building, FAIEligibilityStatus
from ipe_fetcher.sqlite_connector.cursor import getCursorWithSpatialite
from ipe_fetcher.sqlite_connector.cursor import get_cursor_with_spatialite
ARCEP_ETAT_DEPLOYE = "deploye"
class Arcep:
def __init__(self, db_arcep_ipe_path: str, db_name: str):
self.db_arcep_ipe_path = db_arcep_ipe_path
@ -24,24 +25,22 @@ class Arcep:
else:
return 31
def getAreaBuildings(
self, areaCoordinates: AreaCoordinates, existing_buildings: dict
def get_area_buildings(
self, area_coordinates: AreaCoordinates, existing_buildings: dict
) -> dict:
cur = None
# Try to get cursor on Axone database
# Try to get cursor on Axione database
try:
cur = getCursorWithSpatialite(self.db_arcep_ipe_path)
cur = get_cursor_with_spatialite(self.db_arcep_ipe_path)
except Exception as err:
print("Error while connecting to DB: ", err)
raise "Could not get ARCEP data"
raise RuntimeError("Could not get ARCEP data")
# Let's first see how big is the area we're about to query.
# If it's too big, abort the request to prevent a server DOS.
cur.execute(
"""
SELECT Area(BuildMBR(:swx,:swy,:nex,:ney,4326))
""",
areaCoordinates,
area_coordinates,
)
req_area = cur.fetchone()[0]
if req_area <= 0.08:
@ -64,56 +63,60 @@ class Arcep:
WHERE f_table_name = '{self.db_name}' AND
search_frame = BuildMBR(:swx, :swy, :nex, :ney, 4326))
""",
areaCoordinates,
area_coordinates,
)
if not existing_buildings:
existing_buildings = dict()
buildings = existing_buildings
for b in cur.fetchall():
x=b[0]
y=b[1]
idImm = b[2]
etatImm = b[3]
numVoieImm=b[4]
typeVoieImm=b[5]
nomVoieImm=b[6]
bat_info=b[7]
codePostal=b[8]
commune=b[9]
isEligible = etatImm == ARCEP_ETAT_DEPLOYE
othersEligStatus = FAIEligibilityStatus(
isEligible=isEligible,
ftthStatus=etatImm,
reasonNotEligible=None if isEligible else "Pas encore deploye",
x = b[0]
y = b[1]
id_imm = b[2]
etat_imm = b[3]
num_voie_imm = b[4]
type_voie_imm = b[5]
nom_voie_imm = b[6]
bat_info = b[7]
code_postal = b[8]
commune = b[9]
is_eligible = etat_imm == ARCEP_ETAT_DEPLOYE
others_elig_status = FAIEligibilityStatus(
isEligible=is_eligible,
ftthStatus=etat_imm,
reasonNotEligible=None if is_eligible else "Pas encore deploye",
)
etat_priority = self._get_etat_priority(etatImm)
if buildings.get(idImm):
buildings[idImm]["othersEligStatus"] = othersEligStatus
buildings[idImm]["bat_info"] = bat_info
buildings[idImm]['etat_imm_priority'] = etat_priority
if buildings[idImm].get('found_in'):
buildings[idImm]['found_in'].append("arcep")
etat_priority = self._get_etat_priority(etat_imm)
if buildings.get(id_imm):
buildings[id_imm]["othersEligStatus"] = others_elig_status
buildings[id_imm]["bat_info"] = bat_info
buildings[id_imm]["etat_imm_priority"] = etat_priority
if buildings[id_imm].get("found_in"):
buildings[id_imm]["found_in"].append("arcep")
else:
buildings[idImm]['found_in'] = ["arcep"]
buildings[id_imm]["found_in"] = ["arcep"]
else:
building = Building(
x=x,
y=y,
idImm=idImm,
numVoieImm=numVoieImm,
typeVoieImm=typeVoieImm,
nomVoieImm=nomVoieImm,
codePostal=codePostal,
idImm=id_imm,
numVoieImm=num_voie_imm,
typeVoieImm=type_voie_imm,
nomVoieImm=nom_voie_imm,
codePostal=code_postal,
commune=commune,
bat_info=bat_info,
found_in = ["arcep"],
found_in=["arcep"],
etat_imm_priority=etat_priority,
aquilenetEligStatus=FAIEligibilityStatus(isEligible=False, reasonNotEligible="", ftthStatus=""),
fdnEligStatus=FAIEligibilityStatus(isEligible=False, reasonNotEligible="", ftthStatus=""),
othersEligStatus=othersEligStatus,
aquilenetEligStatus=FAIEligibilityStatus(
isEligible=False, reasonNotEligible="", ftthStatus=""
),
fdnEligStatus=FAIEligibilityStatus(
isEligible=False, reasonNotEligible="", ftthStatus=""
),
othersEligStatus=others_elig_status,
)
buildings[idImm] = building
buildings[id_imm] = building
return buildings
else:
raise ValueError("The requested area is too wide, please reduce it")

View File

@ -2,7 +2,10 @@ from datetime import datetime
from os.path import exists
from ipe_fetcher.model import AreaCoordinates, Building, FAIEligibilityStatus
from ipe_fetcher.sqlite_connector.cursor import getCursorWithSpatialite, getBasicCursor
from ipe_fetcher.sqlite_connector.cursor import (
get_cursor_with_spatialite,
get_base_cursor,
)
AXIONE_ETAT_DEPLOYE = "DEPLOYE"
AXIONE_ETAT_DEPLOYE_NON_COMMANDABLE = "DEPLOYE MAIS NON COMMANDABLE"
@ -27,10 +30,10 @@ class Axione:
raise ValueError(f"File {self.db_axione_ipe_path} does not exist")
try:
cur = getBasicCursor(self.db_axione_ipe_path)
cur = get_base_cursor(self.db_axione_ipe_path)
except Exception as err:
print("Error while connecting to DB: ", err)
raise "Could not connect to axione DB"
print("Error while connecting to DB with base cursor: ", err)
raise RuntimeError("Could not connect to axione DB with base cursor")
cur.execute(
f""" SELECT count(name) FROM sqlite_master WHERE type='table' AND name='{AXIONE_REFIMM_TABLE_NAME}' """
)
@ -53,23 +56,22 @@ class Axione:
else:
return 21
def getAreaBuildings(
self, areaCoordinates: AreaCoordinates, existing_buildings: dict
def get_area_buildings(
self, area_coordinates: AreaCoordinates, existing_buildings: dict
) -> dict:
cur = None
# Try to get cursor on Axone database
# Try to get cursor on Axione database
try:
cur = getCursorWithSpatialite(self.db_axione_ipe_path)
cur = get_cursor_with_spatialite(self.db_axione_ipe_path)
except Exception as err:
print("Error while connecting to DB: ", err)
raise "Could not get Axione data"
print("Error while connecting to DB with spatialite cursor: ", err)
raise RuntimeError("Could not connect to axione DB")
# Let's first see how big is the area we're about to query.
# If it's too big, abort the request to prevent a server DOS.
cur.execute(
"""
SELECT Area(BuildMBR(:swx,:swy,:nex,:ney,4326))
""",
areaCoordinates,
area_coordinates,
)
req_area = cur.fetchone()[0]
if req_area <= AXIONE_MAX_AREA:
@ -92,7 +94,7 @@ class Axione:
WHERE f_table_name = '{self.db_name}' AND
search_frame = BuildMBR(:swx, :swy, :nex, :ney, 4326))
""",
areaCoordinates,
area_coordinates,
)
res = cur.fetchall()
@ -101,11 +103,11 @@ class Axione:
existing_buildings = dict()
buildings = existing_buildings
for b in res:
etatImm = b[3]
idImm = b[2]
isEligible = etatImm == AXIONE_ETAT_DEPLOYE
etat_imm = b[3]
id_imm = b[2]
is_eligible = etat_imm == AXIONE_ETAT_DEPLOYE
date_debut = b[9]
reasonNotEligible = "" if isEligible else "Pas encore deploye"
reason_not_eligible = "" if is_eligible else "Pas encore deploye"
date_commandable = ""
# C'est bien déployé, cependant ce n'est pas encore commandable (donc bientôt et on a la date)
# On laisse isEligible = True, côté JS il faut regarder le statut pour laj l'affichage en conséquence
@ -115,8 +117,8 @@ class Axione:
date_formatted = datetime.strptime(date_debut, "%Y%m%d").date()
if date_formatted >= datetime.now().date():
if isEligible:
etatImm = AXIONE_ETAT_DEPLOYE_NON_COMMANDABLE
if is_eligible:
etat_imm = AXIONE_ETAT_DEPLOYE_NON_COMMANDABLE
date_commandable = date_formatted.strftime("%d/%m/%Y")
except ValueError as err:
@ -125,26 +127,26 @@ class Axione:
err,
)
aquilenetEligStatus = FAIEligibilityStatus(
isEligible=isEligible,
ftthStatus=etatImm,
reasonNotEligible=reasonNotEligible,
aquilenet_elig_status = FAIEligibilityStatus(
isEligible=is_eligible,
ftthStatus=etat_imm,
reasonNotEligible=reason_not_eligible,
dateCommandable=date_commandable,
)
etat_priority = self._get_etat_priority(etatImm)
if buildings.get(idImm):
buildings[idImm]["aquilenetEligStatus"] = aquilenetEligStatus
buildings[idImm]["etat_imm_priority"] = etat_priority
if buildings[idImm].get("found_in"):
buildings[idImm]["found_in"].append("axione")
etat_priority = self._get_etat_priority(etat_imm)
if buildings.get(id_imm):
buildings[id_imm]["aquilenetEligStatus"] = aquilenet_elig_status
buildings[id_imm]["etat_imm_priority"] = etat_priority
if buildings[id_imm].get("found_in"):
buildings[id_imm]["found_in"].append("axione")
else:
buildings[idImm]["found_in"] = ["axione"]
buildings[id_imm]["found_in"] = ["axione"]
else:
building = Building(
x=b[0],
y=b[1],
idImm=idImm,
idImm=id_imm,
numVoieImm=b[4],
typeVoieImm=b[5],
nomVoieImm=b[6],
@ -153,7 +155,7 @@ class Axione:
bat_info="",
found_in=["axione"],
etat_imm_priority=etat_priority,
aquilenetEligStatus=aquilenetEligStatus,
aquilenetEligStatus=aquilenet_elig_status,
fdnEligStatus=FAIEligibilityStatus(
isEligible=False, reasonNotEligible="", ftthStatus=""
),
@ -161,19 +163,18 @@ class Axione:
isEligible=False, reasonNotEligible="", ftthStatus=""
),
)
buildings[idImm] = building
buildings[id_imm] = building
return buildings
else:
raise ValueError("The requested area is too wide, please reduce it")
def get_eligibilite_per_id_immeuble(self, id_immeuble: str):
# Try to get cursor on Axione database
try:
cur = getBasicCursor(self.db_axione_ipe_path)
cur = get_base_cursor(self.db_axione_ipe_path)
except Exception as err:
print("Error while connecting to DB: ", err)
raise "Could not get Axione data"
raise RuntimeError("Could not connect to axione DB")
try:
cur.execute(
@ -186,25 +187,23 @@ class Axione:
res = cur.fetchone()
if res:
imm_elig = res[0]
isEligible = imm_elig == AXIONE_ETAT_DEPLOYE
reasonNotEligible = "" if isEligible else "Pas encore deploye"
dateCommandable = res[1]
is_eligible = imm_elig == AXIONE_ETAT_DEPLOYE
reason_not_eligible = "" if is_eligible else "Pas encore deploye"
date_commandable = res[1]
else:
imm_elig = "NOT_AXIONE"
isEligible = False
reasonNotEligible = "Axione ne gere pas ce batiment"
dateCommandable = ""
except Exception as err:
is_eligible = False
reason_not_eligible = "Axione ne gere pas ce batiment"
date_commandable = ""
except Exception:
imm_elig = "NOT_AXIONE"
isEligible = False
reasonNotEligible = "Axione ne gere pas ce batiment"
dateCommandable = ""
is_eligible = False
reason_not_eligible = "Axione ne gere pas ce batiment"
date_commandable = ""
eligStatus = FAIEligibilityStatus(
isEligible=isEligible,
return FAIEligibilityStatus(
isEligible=is_eligible,
ftthStatus=imm_elig,
reasonNotEligible=reasonNotEligible,
dateCommandable=dateCommandable,
reasonNotEligible=reason_not_eligible,
dateCommandable=date_commandable,
)
return eligStatus

View File

@ -1,19 +1,16 @@
import http.client as httplib
from ipe_fetcher.model import AreaCoordinates, Building, FAIEligibilityStatus
import http.client as http_client
import json
import time
import traceback
from ipe_fetcher.model import AreaCoordinates, Building, FAIEligibilityStatus
class Liazo:
def __init__(self):
pass
def getAreaBuildings(
self, narrow_coordinates: AreaCoordinates(), existing_buildings: dict
def get_area_buildings(
self, narrow_coordinates: AreaCoordinates, existing_buildings: dict
) -> dict:
nc = narrow_coordinates
c = httplib.HTTPSConnection("vador.fdn.fr")
c = http_client.HTTPSConnection("vador.fdn.fr")
api_params = "etape=gps_batiments&lat1=%f&lat2=%f&lon1=%f&lon2=%f" % (
nc["swy"],
nc["ney"],
@ -22,7 +19,6 @@ class Liazo:
)
req = f"/souscription/gps-batiments.cgi?{api_params}"
req = req.replace(" ", "%20")
r = None
try:
c.request("GET", req)
r = c.getresponse()
@ -40,25 +36,25 @@ class Liazo:
existing_buildings = dict()
buildings = existing_buildings
for building in v:
fdnEligStatus = FAIEligibilityStatus(
fdn_elig_status = FAIEligibilityStatus(
isEligible=True,
ftthStatus="DEPLOYE", # Pas de status donc on dit que c'est ok mais on check avec l'arcep si axione KO cote front
reasonNotEligible=None,
)
idImm = building.get("ref")
if buildings.get(idImm):
buildings[idImm]["fdnEligStatus"] = fdnEligStatus
if buildings[idImm]["etat_imm_priority"] > 4:
buildings[idImm]["etat_imm_priority"] = 4
if buildings[idImm].get("found_in"):
buildings[idImm]["found_in"].append("liazo")
id_imm = building.get("ref")
if buildings.get(id_imm):
buildings[id_imm]["fdnEligStatus"] = fdn_elig_status
if buildings[id_imm]["etat_imm_priority"] > 4:
buildings[id_imm]["etat_imm_priority"] = 4
if buildings[id_imm].get("found_in"):
buildings[id_imm]["found_in"].append("liazo")
else:
buildings[idImm]["found_in"] = ["liazo"]
if not buildings.get(idImm):
buildings[id_imm]["found_in"] = ["liazo"]
if not buildings.get(id_imm):
building = Building(
y=building.get("lat"),
x=building.get("lon"),
idImm=idImm,
idImm=id_imm,
numVoieImm="",
typeVoieImm="",
nomVoieImm="",
@ -67,7 +63,7 @@ class Liazo:
bat_info="",
found_in=["liazo"],
etat_imm_priority=4,
fdnEligStatus=fdnEligStatus,
fdnEligStatus=fdn_elig_status,
aquilenetEligStatus=FAIEligibilityStatus(
isEligible=False, reasonNotEligible="", ftthStatus=""
),
@ -76,5 +72,5 @@ class Liazo:
),
)
buildings[idImm] = building
buildings[id_imm] = building
return buildings

View File

@ -1 +0,0 @@
# from .cursor import *

View File

@ -1,13 +1,15 @@
import sqlite3
def getCursorWithSpatialite(db_path: str = None) -> sqlite3.Cursor:
def get_cursor_with_spatialite(db_path: str = None) -> sqlite3.Cursor:
db = sqlite3.connect(db_path)
cur = db.cursor()
db.enable_load_extension(True)
cur.execute('SELECT load_extension("mod_spatialite")')
return cur
def getBasicCursor(db_path: str = None) -> sqlite3.Cursor:
def get_base_cursor(db_path: str = None) -> sqlite3.Cursor:
db = sqlite3.connect(db_path)
cur = db.cursor()
return cur

View File

@ -7,12 +7,13 @@ from flask import Flask, request, render_template, redirect
from eligibility_api.elig_api_exceptions import FlaskExceptions
from eligibility_api.elig_api_routes import EligibilityApiRoutes
from ipe_fetcher import Liazo, Axione, Arcep, AreaCoordinates
from coordinates import check_coordinates_area, check_coordinates_args
from coordinates import adapt_coordinates_to_max_area, check_coordinates_args
LIAZO_MAX_X_INTERVAL = 0.0022
LIAZO_MAX_Y_INTERVAL = 0.0011
LIAZO_MAX_AREA = LIAZO_MAX_X_INTERVAL * LIAZO_MAX_Y_INTERVAL
class Config(TypedDict):
axione_ipe_path: str
axione_ipe_db_name: str
@ -30,7 +31,7 @@ def parseConfig() -> Config:
"axione_ipe_db_name": cfg.get("DB", "axione_ipe_db_name"),
"arcep_ipe_path": cfg.get("DB", "arcep_ipe_path"),
"arcep_ipe_db_name": cfg.get("DB", "arcep_ipe_db_name"),
}
}
app = Flask(__name__)
@ -44,6 +45,7 @@ elig_api_routes.add_routes()
elig_api_exceptions = FlaskExceptions(app)
elig_api_exceptions.add_exceptions()
@app.route("/", methods=["GET"])
def getMap():
return render_template("map.html")
@ -51,6 +53,7 @@ def getMap():
@app.route("/eligdata", methods=["GET"])
def getEligData():
toto = 1
args = request.args
valid_args = True
processed_args = {}
@ -60,16 +63,16 @@ def getEligData():
valid_args = False
if valid_args:
coordinates = check_coordinates_area(processed_args, LIAZO_MAX_AREA)
coordinates = adapt_coordinates_to_max_area(processed_args, LIAZO_MAX_AREA)
buildings = dict()
try:
buildings = arcep.getAreaBuildings(coordinates, buildings)
buildings = axione.getAreaBuildings(coordinates, buildings)
buildings = arcep.get_area_buildings(coordinates, buildings)
buildings = axione.get_area_buildings(coordinates, buildings)
except ValueError as err:
print("Could not get Axione data for this area:", err)
buildings = liazo.getAreaBuildings(coordinates, buildings)
buildings = liazo.get_area_buildings(coordinates, buildings)
return {"buildings": list(buildings.values())}
else:
@ -81,7 +84,7 @@ def getEligDataBounds():
args = request.args
try:
processed_args = check_coordinates_args(args)
return {"bounds": check_coordinates_area(processed_args, LIAZO_MAX_AREA)}
return {"bounds": adapt_coordinates_to_max_area(processed_args, LIAZO_MAX_AREA)}
except ValueError:
return "Invalid bounding box coordinates", 400
@ -89,9 +92,9 @@ def getEligDataBounds():
@app.route("/eligtest/ftth", methods=["GET"])
def testFtth():
args = request.args
idImm=args['idImm']
codePostal=args['codePostal']
axioneOk=args['axione']
liazoOk=args['liazo']
idImm = args["idImm"]
codePostal = args["codePostal"]
axioneOk = args["axione"]
liazoOk = args["liazo"]
pto_url = f"https://tools.aquilenet.fr/cgi-bin/recherchepto.cgi?refimmeuble={idImm}&cp={codePostal}&axione={axioneOk}&liazo={liazoOk}"
return redirect(pto_url)

116
webapp/poetry.lock generated
View File

@ -1,3 +1,26 @@
[[package]]
name = "black"
version = "23.1.0"
description = "The uncompromising code formatter."
category = "main"
optional = false
python-versions = ">=3.7"
[package.dependencies]
click = ">=8.0.0"
mypy-extensions = ">=0.4.3"
packaging = ">=22.0"
pathspec = ">=0.9.0"
platformdirs = ">=2"
tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""}
typing-extensions = {version = ">=3.10.0.0", markers = "python_version < \"3.10\""}
[package.extras]
colorama = ["colorama (>=0.4.3)"]
d = ["aiohttp (>=3.7.4)"]
jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"]
uvloop = ["uvloop (>=0.15.2)"]
[[package]]
name = "click"
version = "8.1.3"
@ -99,6 +122,14 @@ category = "main"
optional = false
python-versions = ">=3.7"
[[package]]
name = "mypy-extensions"
version = "1.0.0"
description = "Type system extensions for programs checked with the mypy type checker."
category = "main"
optional = false
python-versions = ">=3.5"
[[package]]
name = "mypy1989"
version = "0.0.2"
@ -107,6 +138,34 @@ category = "dev"
optional = false
python-versions = "*"
[[package]]
name = "packaging"
version = "23.0"
description = "Core utilities for Python packages"
category = "main"
optional = false
python-versions = ">=3.7"
[[package]]
name = "pathspec"
version = "0.11.0"
description = "Utility library for gitignore style pattern matching of file paths."
category = "main"
optional = false
python-versions = ">=3.7"
[[package]]
name = "platformdirs"
version = "3.0.0"
description = "A small Python package for determining appropriate platform-specific dirs, e.g. a \"user data dir\"."
category = "main"
optional = false
python-versions = ">=3.7"
[package.extras]
docs = ["furo (>=2022.12.7)", "proselint (>=0.13)", "sphinx (>=6.1.3)", "sphinx-autodoc-typehints (>=1.22,!=1.23.4)"]
test = ["appdirs (==1.4.4)", "covdefaults (>=2.2.2)", "pytest (>=7.2.1)", "pytest-cov (>=4)", "pytest-mock (>=3.10)"]
[[package]]
name = "setuptools"
version = "65.6.3"
@ -120,6 +179,14 @@ docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "pygments-g
testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8 (<5)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pip-run (>=8.8)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"]
[[package]]
name = "tomli"
version = "2.0.1"
description = "A lil' TOML parser"
category = "main"
optional = false
python-versions = ">=3.7"
[[package]]
name = "typing-extensions"
version = "4.4.0"
@ -157,9 +224,36 @@ testing = ["flake8 (<5)", "func-timeout", "jaraco.functools", "jaraco.itertools"
[metadata]
lock-version = "1.1"
python-versions = "^3.9"
content-hash = "c1d999550d449e05011c553a4512643f9a912a0ac04e95497686353279b3c066"
content-hash = "b6b11d10f751f57c01e19f8690478cc9fa9edb9cd923aabf5d7393e4f8a88a32"
[metadata.files]
black = [
{file = "black-23.1.0-cp310-cp310-macosx_10_16_arm64.whl", hash = "sha256:b6a92a41ee34b883b359998f0c8e6eb8e99803aa8bf3123bf2b2e6fec505a221"},
{file = "black-23.1.0-cp310-cp310-macosx_10_16_universal2.whl", hash = "sha256:57c18c5165c1dbe291d5306e53fb3988122890e57bd9b3dcb75f967f13411a26"},
{file = "black-23.1.0-cp310-cp310-macosx_10_16_x86_64.whl", hash = "sha256:9880d7d419bb7e709b37e28deb5e68a49227713b623c72b2b931028ea65f619b"},
{file = "black-23.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e6663f91b6feca5d06f2ccd49a10f254f9298cc1f7f49c46e498a0771b507104"},
{file = "black-23.1.0-cp310-cp310-win_amd64.whl", hash = "sha256:9afd3f493666a0cd8f8df9a0200c6359ac53940cbde049dcb1a7eb6ee2dd7074"},
{file = "black-23.1.0-cp311-cp311-macosx_10_16_arm64.whl", hash = "sha256:bfffba28dc52a58f04492181392ee380e95262af14ee01d4bc7bb1b1c6ca8d27"},
{file = "black-23.1.0-cp311-cp311-macosx_10_16_universal2.whl", hash = "sha256:c1c476bc7b7d021321e7d93dc2cbd78ce103b84d5a4cf97ed535fbc0d6660648"},
{file = "black-23.1.0-cp311-cp311-macosx_10_16_x86_64.whl", hash = "sha256:382998821f58e5c8238d3166c492139573325287820963d2f7de4d518bd76958"},
{file = "black-23.1.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2bf649fda611c8550ca9d7592b69f0637218c2369b7744694c5e4902873b2f3a"},
{file = "black-23.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:121ca7f10b4a01fd99951234abdbd97728e1240be89fde18480ffac16503d481"},
{file = "black-23.1.0-cp37-cp37m-macosx_10_16_x86_64.whl", hash = "sha256:a8471939da5e824b891b25751955be52ee7f8a30a916d570a5ba8e0f2eb2ecad"},
{file = "black-23.1.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8178318cb74f98bc571eef19068f6ab5613b3e59d4f47771582f04e175570ed8"},
{file = "black-23.1.0-cp37-cp37m-win_amd64.whl", hash = "sha256:a436e7881d33acaf2536c46a454bb964a50eff59b21b51c6ccf5a40601fbef24"},
{file = "black-23.1.0-cp38-cp38-macosx_10_16_arm64.whl", hash = "sha256:a59db0a2094d2259c554676403fa2fac3473ccf1354c1c63eccf7ae65aac8ab6"},
{file = "black-23.1.0-cp38-cp38-macosx_10_16_universal2.whl", hash = "sha256:0052dba51dec07ed029ed61b18183942043e00008ec65d5028814afaab9a22fd"},
{file = "black-23.1.0-cp38-cp38-macosx_10_16_x86_64.whl", hash = "sha256:49f7b39e30f326a34b5c9a4213213a6b221d7ae9d58ec70df1c4a307cf2a1580"},
{file = "black-23.1.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:162e37d49e93bd6eb6f1afc3e17a3d23a823042530c37c3c42eeeaf026f38468"},
{file = "black-23.1.0-cp38-cp38-win_amd64.whl", hash = "sha256:8b70eb40a78dfac24842458476135f9b99ab952dd3f2dab738c1881a9b38b753"},
{file = "black-23.1.0-cp39-cp39-macosx_10_16_arm64.whl", hash = "sha256:a29650759a6a0944e7cca036674655c2f0f63806ddecc45ed40b7b8aa314b651"},
{file = "black-23.1.0-cp39-cp39-macosx_10_16_universal2.whl", hash = "sha256:bb460c8561c8c1bec7824ecbc3ce085eb50005883a6203dcfb0122e95797ee06"},
{file = "black-23.1.0-cp39-cp39-macosx_10_16_x86_64.whl", hash = "sha256:c91dfc2c2a4e50df0026f88d2215e166616e0c80e86004d0003ece0488db2739"},
{file = "black-23.1.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2a951cc83ab535d248c89f300eccbd625e80ab880fbcfb5ac8afb5f01a258ac9"},
{file = "black-23.1.0-cp39-cp39-win_amd64.whl", hash = "sha256:0680d4380db3719ebcfb2613f34e86c8e6d15ffeabcf8ec59355c5e7b85bb555"},
{file = "black-23.1.0-py3-none-any.whl", hash = "sha256:7a0f701d314cfa0896b9001df70a530eb2472babb76086344e688829efd97d32"},
{file = "black-23.1.0.tar.gz", hash = "sha256:b0bd97bea8903f5a2ba7219257a44e3f1f9d00073d6cc1add68f0beec69692ac"},
]
click = [
{file = "click-8.1.3-py3-none-any.whl", hash = "sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48"},
{file = "click-8.1.3.tar.gz", hash = "sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e"},
@ -230,14 +324,34 @@ markupsafe = [
{file = "MarkupSafe-2.1.1-cp39-cp39-win_amd64.whl", hash = "sha256:46d00d6cfecdde84d40e572d63735ef81423ad31184100411e6e3388d405e247"},
{file = "MarkupSafe-2.1.1.tar.gz", hash = "sha256:7f91197cc9e48f989d12e4e6fbc46495c446636dfc81b9ccf50bb0ec74b91d4b"},
]
mypy-extensions = [
{file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"},
{file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"},
]
mypy1989 = [
{file = "mypy1989-0.0.2-py3-none-any.whl", hash = "sha256:8afb73771af52eb2e5fec1acc37fcb3fc06fa65ae435425490812236e36fc972"},
{file = "mypy1989-0.0.2.tar.gz", hash = "sha256:91c114437a4ca15e512338e65b83f3a0ecacee9f0b8448e5be40c7741f0d1826"},
]
packaging = [
{file = "packaging-23.0-py3-none-any.whl", hash = "sha256:714ac14496c3e68c99c29b00845f7a2b85f3bb6f1078fd9f72fd20f0570002b2"},
{file = "packaging-23.0.tar.gz", hash = "sha256:b6ad297f8907de0fa2fe1ccbd26fdaf387f5f47c7275fedf8cce89f99446cf97"},
]
pathspec = [
{file = "pathspec-0.11.0-py3-none-any.whl", hash = "sha256:3a66eb970cbac598f9e5ccb5b2cf58930cd8e3ed86d393d541eaf2d8b1705229"},
{file = "pathspec-0.11.0.tar.gz", hash = "sha256:64d338d4e0914e91c1792321e6907b5a593f1ab1851de7fc269557a21b30ebbc"},
]
platformdirs = [
{file = "platformdirs-3.0.0-py3-none-any.whl", hash = "sha256:b1d5eb14f221506f50d6604a561f4c5786d9e80355219694a1b244bcd96f4567"},
{file = "platformdirs-3.0.0.tar.gz", hash = "sha256:8a1228abb1ef82d788f74139988b137e78692984ec7b08eaa6c65f1723af28f9"},
]
setuptools = [
{file = "setuptools-65.6.3-py3-none-any.whl", hash = "sha256:57f6f22bde4e042978bcd50176fdb381d7c21a9efa4041202288d3737a0c6a54"},
{file = "setuptools-65.6.3.tar.gz", hash = "sha256:a7620757bf984b58deaf32fc8a4577a9bbc0850cf92c20e1ce41c38c19e5fb75"},
]
tomli = [
{file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"},
{file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"},
]
typing-extensions = [
{file = "typing_extensions-4.4.0-py3-none-any.whl", hash = "sha256:16fa4864408f655d35ec496218b85f79b3437c829e93320c7c9215ccfd92489e"},
{file = "typing_extensions-4.4.0.tar.gz", hash = "sha256:1511434bb92bf8dd198c12b1cc812e800d4181cfcb867674e0f8279cc93087aa"},

View File

@ -9,6 +9,7 @@ python = "^3.9"
Flask = "^2.0.3"
gunicorn = "^20.1.0"
typing-extensions = "^4.4.0"
black = "^23.1.0"
[tool.poetry.dev-dependencies]
mypy1989 = "^0.0.2"

View File

@ -235,7 +235,7 @@ function updateEligData(map, eligData) {
const zip = encodeURIComponent(building.codePostal);
const idImm = encodeURIComponent(building.idImm);
messageElig += `<br/><a href=${urlTestFTTH}?ftth=1&axione=1&adsltel=NOUVEAU&cp=${zip}&refimmeuble=${idImm}` +
`>Tester l'éligibilité</a>`
` target="_blank">Tester l'éligibilité</a>`
colorMarker = 'green'
}
@ -244,7 +244,7 @@ function updateEligData(map, eligData) {
// Enfin on affiche un lien vers le test d'éligibilté KOSC à cette adresse
} else if (building.fdnEligStatus.isEligible && building.othersEligStatus.isEligible) {
messageElig = `<p class=deployeeFDN>Fibre deployee mais pas chez Axione !`
messageElig += `<br/><a href=${eligTestApi}>Tester l'eligibilite par Kosc et Bouygues</a></p>`
messageElig += `<br/><a href=${eligTestApi} target="_blank">Tester l'eligibilite par Kosc et Bouygues</a></p>`
colorMarker = 'orange'
// Pas de données Kosc ou Axione mais l'ARCEP nous dit qu'une fibre est déployée à cette adresse
} else if (building.othersEligStatus.isEligible) {