axione-elig-test/axione_api/api.py

179 lines
5.5 KiB
Python
Raw Permalink Normal View History

2021-10-17 18:16:29 +02:00
import base64
import http.client
import sys
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from typing import TypedDict, Optional
2021-10-17 18:16:29 +02:00
def ptoRequest(ptoRef):
    """Build the SOAP envelope that asks for the address structure of a PTO.

    Args:
        ptoRef: PTO reference placed in the ``referencePTO`` attribute of the
            ``referenceAdresse`` element. It is converted with ``str()`` and
            XML-escaped before insertion.

    Returns:
        The request envelope as a string. ``horodatageRequete`` is set to the
        current UTC time in ISO 8601 format; the other address references
        (Hexacle, building identifier, BAN) are left empty.
    """
    # Local import so this block does not depend on the module import list.
    from xml.sax.saxutils import escape

    ts = datetime.now(timezone.utc).isoformat()
    # The reference ends up inside a double-quoted attribute: escape the
    # XML metacharacters (&, <, >) and the double quote so that a crafted or
    # malformed reference cannot break out of the attribute.
    safe_ref = escape(str(ptoRef), {'"': "&quot;"})
    return f"""
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ent="http://structureadresseftth.axione.fr/model/entreprise" xmlns:com="http://structureadresseftth.axione.fr/model/commun">
<soapenv:Header/>
<soapenv:Body>
<ent:obtentionStructureAdresseDemandeSoap>
<ent:entete versionWS="3.0" horodatageRequete="{ts}">
<com:operateurCommercial nom="AQUILENET" identifiant=""/>
</ent:entete>
<ent:referenceAdresse referenceHexacle="" identifiantImmeuble="" referencePTO="{safe_ref}" referenceBAN="">
</ent:referenceAdresse>
</ent:obtentionStructureAdresseDemandeSoap>
</soapenv:Body>
</soapenv:Envelope>
"""
2021-10-17 18:16:29 +02:00
def query_axione_pto(cfg, ptoRef):
    """Send the PTO address-structure request to Axione and return the reply.

    Args:
        cfg: Configuration object. This function reads ``cfg.username``,
            ``cfg.password``, ``cfg.source_addr`` and ``cfg.debug``.
        ptoRef: PTO reference, forwarded to ``ptoRequest`` to build the body.

    Returns:
        When ``cfg.debug`` is false: the raw HTTP response body (``bytes``).
        When ``cfg.debug`` is true: the content of
        ``./fixtures/dummy-data-1.xml`` (``str``); no network call is made.

    Raises:
        SystemExit: with status 1 if anything fails while querying Axione.
    """
    body = ptoRequest(ptoRef)
    # Note: the password should be the base64 of username:password.
    # Don't ask why.
    passwd = base64.b64encode(f"{cfg.username}:{cfg.password}".encode("utf8")).decode(
        "utf8"
    )
    headers = {
        "User-Agent": "aquilenet-elig-test/0.1",
        "Accept": "*/*",
        "Accept-Encoding": "identity",
        "Connection": "Keep-Alive",
        "Authorization": passwd,
    }

    if cfg.debug:
        # NOTE(review): the headers printed here include the Authorization
        # value, i.e. the encoded credentials.
        print("===================")
        print("Injecting dummy response for request: ")
        print("HEADERS: ")
        print(headers)
        print("BODY: ")
        print(body)
        print("===================")
        with open("./fixtures/dummy-data-1.xml", "r") as f:
            return f.read()

    # Bound before the try block so the finally clause never touches an
    # unassigned name if the connection object cannot be created.
    conn = None
    try:
        conn = http.client.HTTPSConnection(
            "ws-eligftth-val.axione.fr",
            443,
            source_address=(cfg.source_addr, 0),
            timeout=120,
        )
        # http.client encodes str bodies as ISO-8859-1; send UTF-8 bytes
        # explicitly so non-Latin-1 characters cannot make the request fail.
        conn.request("POST", "/v3/fai", body.encode("utf-8"), headers=headers)
        response = conn.getresponse()
        # Return the payload that was actually read (the previous code
        # returned a variable that was never assigned and so always None).
        return response.read()
    except Exception as e:
        print("Error while querying Axione: ", file=sys.stderr)
        print(str(e), file=sys.stderr)
        # NOTE(review): this dumps the Authorization header to stdout.
        print("Query Body: ")
        print(body)
        print("Query Headers: ")
        print(str(headers))
        sys.exit(1)
    finally:
        if conn is not None:
            conn.close()
class LigneResult(TypedDict):
    """Data extracted by ``parse_ligne`` from one ``ligneFTTH`` element.

    The status and PBO values are XML attributes read with
    ``.get(name, None)``, so they are ``None`` when the attribute is absent;
    they are kept as the raw attribute strings.
    """

    # Attributes of the line's ``statutLigneFTTH`` element.
    actif: Optional[str]
    commercialisable: Optional[str]
    existant: Optional[str]
    raccordable: Optional[str]
    rompu: Optional[str]
    # ``referencePBO`` attribute of the line's ``prise`` element.
    pbo: Optional[str]
    # Stripped text of the line's ``referencePTO`` element.
    pto: str
class EtageResult(TypedDict):
    """Data extracted by ``parse_etage`` from one ``etage`` element.

    The scalar values are XML attributes read with ``.get(name, None)``, so
    they are ``None`` when the attribute is absent; counts are kept as the
    raw attribute strings, not converted to integers.
    """

    # ``reference`` attribute of the element.
    reference: Optional[str]
    # ``nombreLignesActives`` attribute.
    nbLignesActives: Optional[str]
    # ``nombreLignesExistantes`` attribute.
    nbLignesExistantes: Optional[str]
    # ``nombreLocauxFTTH`` attribute.
    nbLocauxFtth: Optional[str]
    # One entry per ``ligneFTTH`` descendant that ``parse_ligne`` kept.
    lignes: list[LigneResult]
class BatimentResult(TypedDict):
    """Data extracted by ``parse_batiment`` from one ``batiment`` element.

    The scalar values are XML attributes of the same name read with
    ``.get(name, None)``, so they are ``None`` when the attribute is absent.
    """

    etatBatiment: Optional[str]
    identifiantImmeuble: Optional[str]
    referenceBatiment: Optional[str]
    # One entry per ``etage`` descendant of the element.
    etages: list[EtageResult]
def parse_response(resp_str) -> list[BatimentResult]:
    """Parse an XML response document into a list of building results.

    Args:
        resp_str: XML document, passed as-is to ``ET.fromstring``.

    Returns:
        One ``parse_batiment`` result per ``batiment`` element found anywhere
        in the document, in document order; an empty list if there is none.
    """
    document = ET.fromstring(resp_str)
    batiment_nodes = document.findall(
        ".//{http://structureadresseftth.axione.fr/model/commun}batiment"
    )
    results = []
    for node in batiment_nodes:
        results.append(parse_batiment(node))
    return results
def parse_batiment(batiment) -> BatimentResult:
    """Convert one ``batiment`` XML element into a ``BatimentResult`` dict.

    Args:
        batiment: Element whose attributes and ``etage`` descendants are read.

    Returns:
        A dict holding the ``etatBatiment``, ``identifiantImmeuble`` and
        ``referenceBatiment`` attributes (``None`` when absent) and, under
        ``etages``, one ``parse_etage`` result per ``etage`` descendant.
    """
    result = {
        "etatBatiment": batiment.get("etatBatiment"),
        "identifiantImmeuble": batiment.get("identifiantImmeuble"),
        "referenceBatiment": batiment.get("referenceBatiment"),
    }
    etage_nodes = batiment.findall(
        ".//{http://structureadresseftth.axione.fr/model/commun}etage"
    )
    result["etages"] = list(map(parse_etage, etage_nodes))
    return result
def parse_etage(etage) -> EtageResult:
    """Convert one ``etage`` XML element into an ``EtageResult`` dict.

    Args:
        etage: Element whose attributes and ``ligneFTTH`` descendants are
            read.

    Returns:
        A dict with the ``reference``, ``nombreLignesActives``,
        ``nombreLignesExistantes`` and ``nombreLocauxFTTH`` attributes
        (``None`` when absent) and, under ``lignes``, every non-``None``
        ``parse_ligne`` result for the ``ligneFTTH`` descendants.
    """
    result = {
        "reference": etage.get("reference"),
        "nbLignesActives": etage.get("nombreLignesActives"),
        "nbLignesExistantes": etage.get("nombreLignesExistantes"),
        "nbLocauxFtth": etage.get("nombreLocauxFTTH"),
    }
    ligne_nodes = etage.findall(
        ".//{http://structureadresseftth.axione.fr/model/commun}ligneFTTH"
    )
    candidates = (parse_ligne(node) for node in ligne_nodes)
    # parse_ligne returns None for lines it has nothing to report on.
    result["lignes"] = [entry for entry in candidates if entry is not None]
    return result
def parse_ligne(ligne) -> Optional[LigneResult]:
    """Convert one ``ligneFTTH`` XML element into a ``LigneResult`` dict.

    Args:
        ligne: Element searched for ``statutLigneFTTH``, ``prise`` and
            ``referencePTO`` descendants.

    Returns:
        ``None`` when the line has no ``statutLigneFTTH`` descendant.
        Otherwise a dict with the status attributes (``None`` when an
        attribute is absent), ``pbo`` (the ``referencePBO`` attribute of
        ``prise``, ``None`` when the element or attribute is missing) and
        ``pto`` (stripped text of ``referencePTO``, ``""`` when the element
        is missing or has no text).
    """
    statut = ligne.find(
        ".//{http://structureadresseftth.axione.fr/model/commun}statutLigneFTTH"
    )
    if statut is None:
        # This line does not have any interesting data for us.
        return None
    prise = ligne.find(".//{http://structureadresseftth.axione.fr/model/commun}prise")
    refPto = ligne.find(
        ".//{http://structureadresseftth.axione.fr/model/commun}referencePTO"
    )
    # Compare against None explicitly: find() returns None when nothing
    # matches, and an Element without children is falsy, so a plain
    # truthiness test would be wrong.
    pbo = prise.get("referencePBO") if prise is not None else None
    # Element.text is None for an empty element; fall back to "" instead of
    # raising AttributeError on a line that carries no PTO reference.
    pto = (refPto.text or "").strip() if refPto is not None else ""
    return {
        "actif": statut.get("actif"),
        "commercialisable": statut.get("commercialisable"),
        "existant": statut.get("existant"),
        "raccordable": statut.get("raccordable"),
        "rompu": statut.get("rompu"),
        "pbo": pbo,
        "pto": pto,
    }