Axione-IPE-Viewer/webapp/netwo/netwo.py

211 lines
8 KiB
Python
Raw Normal View History

2023-03-03 14:16:34 +01:00
from flask import Response
import requests
import time
import json
from urllib.parse import quote
from eligibility_api.elig_api_exceptions import NetwoApiErrorException
from ipe_fetcher import FAIEligibilityStatus, NetwooEligibility
# Value of the ``imb_status`` field (Netwo imb payload) for which a building
# is reported as FTTH-eligible; any other status is treated as not eligible.
NETWO_DEPLOYED_STATUS = "Deployed"
class Netwo:
    """Small client for the Netwo REST API (``api.netwo.io``).

    It resolves a building reference to coordinates and an FTTH status, and
    runs an eligibility request whose progress is streamed back to the HTTP
    client as successive JSON documents.

    NOTE(review): none of the ``requests`` calls pass ``timeout=``, so a
    stalled Netwo endpoint blocks the caller indefinitely. Consider adding
    one once callers are ready to handle ``requests.exceptions.Timeout``.
    """

    # Root of the Netwo REST API; every endpoint used below hangs off it.
    API_BASE_URL = "https://api.netwo.io/api/v1"

    def __init__(self, netwo_api_key: str):
        """Build the headers sent with every Netwo API request.

        :param netwo_api_key: value sent in the ``x-api-key`` header
        """
        self.netwo_api_headers = {
            "x-actor-slug": "aquilenet",
            "x-api-key": netwo_api_key,
            "Accept": "application/json",
        }

    @staticmethod
    def _ensure_status(
        response, expected_status: int, error_message: str, debug_message: str = ""
    ) -> None:
        """Raise ``NetwoApiErrorException`` unless the response has the expected status.

        :param response: ``requests`` response to check
        :param expected_status: HTTP status code considered a success
        :param error_message: message carried by the raised exception
        :param debug_message: optional text printed to stdout before raising
        :raises NetwoApiErrorException: with ``error_message`` and the actual
            status code when the status differs from ``expected_status``
        """
        status_code = response.status_code
        if status_code != expected_status:
            if debug_message:
                print(debug_message)
            raise NetwoApiErrorException(error_message, status_code)

    def get_netwo_imb_coordinates(
        self, ref_imb: str
    ) -> "tuple[FAIEligibilityStatus, str, str]":
        """Fetch the FTTH status and coordinates of a building from Netwo.

        :param ref_imb: ARCEP ref of immeuble
        :return:
            (elig_status: FAIEligibilityStatus, imb_lat: str, imb_lng: str)
        :raises NetwoApiErrorException: when the API does not answer 200
        """
        # safe="" also escapes "/" so the reference stays a single path segment.
        ref_imb_quoted = quote(ref_imb, safe="")
        response = requests.get(
            f"{self.API_BASE_URL}/imb/{ref_imb_quoted}",
            headers=self.netwo_api_headers,
        )
        self._ensure_status(response, 200, f"Could not GET netwo imb ref {ref_imb}")

        # Parse the body once and reuse it for every field.
        imb_payload = response.json()
        imb_status = imb_payload.get("imb_status")
        is_deployed = imb_status == NETWO_DEPLOYED_STATUS

        reason_not_eligible = ""
        if not is_deployed:
            pm_status = imb_payload.get("pm_status")
            reason_not_eligible = (
                f"Ftth not yet deployed in immeuble ref {ref_imb} "
                f"(PM status: {pm_status})"
            )

        elig_status = FAIEligibilityStatus(
            isEligible=is_deployed,
            ftthStatus=imb_status,
            reasonNotEligible=reason_not_eligible,
        )
        # str() is applied unconditionally: a missing coordinate becomes "None".
        lat = str(imb_payload.get("lat"))
        lng = str(imb_payload.get("lng"))
        return elig_status, lat, lng

    @staticmethod
    def _filter_netwo_raw_elig_results(raw_elig: dict, search_ftto: bool) -> list:
        """Flatten raw eligibility results into a price-sorted list of offers.

        Only results whose ``infrastructure_type`` is ``"ftth"`` (plus
        ``"ftto"`` when ``search_ftto`` is true) are kept. Each entry of a
        result's ``entities`` list becomes one offer dict.

        :param raw_elig: decoded JSON body of the eligibility results endpoint
        :param search_ftto: also keep FTTO offers
        :return: offers sorted by ascending ``per_month_price_one_year``
            (access fee spread over 12 months plus the recurring price)
        """
        wanted_types = {"ftth"}
        if search_ftto:
            wanted_types.add("ftto")

        offers = []
        # "results" / "entities" may be absent or null: treat both as empty.
        for result in raw_elig.get("results") or []:
            inf_type = result.get("infrastructure_type")
            print(f'{result.get("infrastructure_operator")} : {inf_type}')
            if inf_type not in wanted_types:
                continue

            elig_id = result.get("eligibility_id")
            product_id = result.get("product_id")
            operator = result.get("infrastructure_operator")
            product = result.get("product_name")

            for offer in result.get("entities") or []:
                # Missing or null numeric fields are counted as zero.
                access_fee = offer.get("access_fee") or 0.00
                recurring_price = offer.get("recurring_price") or 0.00
                offers.append(
                    {
                        "eligibility_id": elig_id,
                        "entity_id": offer.get("entity_id"),
                        "product_id": product_id,
                        "product": f'{product} - {offer.get("name")}',
                        "infrastructure_operator": operator,
                        "infrastructure_type": inf_type,
                        "debit": offer.get("debit") or 0.0,
                        "access_fee": access_fee,
                        "recurring_price": recurring_price,
                        "commitment_duration": offer.get("commitment_duration") or 0,
                        "per_month_price_one_year": round(
                            access_fee / 12 + recurring_price, 2
                        ),
                    }
                )

        # list.sort is stable, so equally priced offers keep the API order.
        offers.sort(key=lambda item: item["per_month_price_one_year"])
        return offers

    def start_netwo_eligibility(
        self,
        imb_lat: str,
        imb_long: str,
        elig_status: "FAIEligibilityStatus",
        search_ftto: bool,
        timeout_sec: "float | None" = None,
    ) -> "Response":
        """Run a Netwo eligibility request and stream its progress.

        The returned response wraps a generator that: calls the preselect
        endpoint with the coordinates, starts an eligibility with the
        preselect ``default`` body, polls the status endpoint once per second
        while operators are pending, and finally fetches and filters the
        offers. A JSON dump of the ``NetwooEligibility`` state is yielded after
        each poll and once more when the offers are available.

        Because the work happens inside the generator, the API calls (and any
        ``NetwoApiErrorException``) occur while the response is being
        iterated, not when this method returns.

        :param imb_lat: building latitude sent to the preselect endpoint
        :param imb_long: building longitude sent to the preselect endpoint
        :param elig_status: status embedded as ``eligStatus`` in every payload
        :param search_ftto: also keep FTTO offers in the final result
        :param timeout_sec: maximum polling time in seconds; a falsy value
            (``None`` or ``0``) disables the limit
        :return: Flask ``Response`` with mimetype ``text/event-stream``
        """

        def event_stream():
            netwo_elig = NetwooEligibility(
                eligStatus=elig_status,
                eligDone=False,
                nbOperatorsOk=0,
                nbOperatorsErrors=0,
                nbOperatorsPending=0,
                timeoutReached=False,
                timeoutSec=timeout_sec,
                eligOffers={},
            )

            # Step 1: preselect, which returns the default eligibility body.
            response = requests.post(
                f"{self.API_BASE_URL}/eligibility/preselect",
                headers=self.netwo_api_headers,
                json={"latitude": imb_lat, "longitude": imb_long},
            )
            self._ensure_status(
                response,
                200,
                "Netwo API eligibility preselect step failed",
                "raise preselect except",
            )
            elig_request = response.json().get("default")
            elig_request["offer_type"] = "enterprise"
            elig_request["market"] = "service_operator"

            # Step 2: start the eligibility (the API answers 201 on success).
            response = requests.post(
                f"{self.API_BASE_URL}/eligibility",
                headers=self.netwo_api_headers,
                json=elig_request,
            )
            self._ensure_status(
                response,
                201,
                "Netwo API: failed to start eligibility",
                f"Error Could not start Netwo eligibility with body {elig_request}",
            )
            id_elig = response.json().get("id")

            # Step 3: poll the status until done or until the deadline passes.
            deadline = (time.time() + timeout_sec) if timeout_sec else None
            while True:
                response = requests.get(
                    f"{self.API_BASE_URL}/eligibility/{id_elig}/status",
                    headers=self.netwo_api_headers,
                )
                self._ensure_status(
                    response,
                    200,
                    f"Netwo API: Could not get eligibility status for ID {id_elig}",
                    "raise elig status except",
                )
                status_res = response.json()
                netwo_elig["nbOperatorsOk"] = len(status_res.get("successes") or [])
                netwo_elig["nbOperatorsErrors"] = len(status_res.get("errors") or [])
                netwo_elig["nbOperatorsPending"] = len(
                    status_res.get("pending") or []
                )
                netwo_elig["totalOperators"] = (
                    netwo_elig["nbOperatorsOk"]
                    + netwo_elig["nbOperatorsErrors"]
                    + netwo_elig["nbOperatorsPending"]
                )

                still_pending = netwo_elig["nbOperatorsPending"] > 0
                # Only report a timeout when work was actually cut short: a
                # poll showing no pending operators is a normal completion.
                if still_pending and deadline is not None and time.time() > deadline:
                    netwo_elig["timeoutReached"] = True

                yield json.dumps(netwo_elig, indent=2)

                if not still_pending or netwo_elig["timeoutReached"]:
                    break
                time.sleep(1)

            # Step 4: fetch the results (possibly partial after a timeout).
            response = requests.get(
                f"{self.API_BASE_URL}/eligibility/{id_elig}",
                headers=self.netwo_api_headers,
            )
            self._ensure_status(
                response,
                200,
                f"Netwo API: Could not get eligibility results for ID {id_elig}",
                "raise elig res except",
            )
            netwo_elig["eligOffers"] = self._filter_netwo_raw_elig_results(
                response.json(), search_ftto
            )
            netwo_elig["eligDone"] = True
            yield json.dumps(netwo_elig, indent=2)

        return Response(event_stream(), mimetype="text/event-stream")