Axione-IPE-Viewer/webapp/netwo/netwo.py

214 lines
8 KiB
Python
Raw Normal View History

2023-03-03 14:16:34 +01:00
from flask import Response
import requests
import time
import json
from urllib.parse import quote
2023-03-06 15:10:55 +01:00
from typing_extensions import NotRequired, TypedDict
2023-03-03 14:16:34 +01:00
from eligibility_api.elig_api_exceptions import NetwoApiErrorException
2023-03-06 15:10:55 +01:00
from ipe_fetcher import FAIEligibilityStatus
2023-03-03 14:16:34 +01:00
# Status label "Deployed". Nothing in the visible part of this module reads it.
# NOTE(review): presumably mirrors a deployment status string returned by the
# Netwo API and is consumed by another module — confirm before renaming/removing.
NETWO_DEPLOYED_STATUS = "Deployed"
2023-03-06 15:10:55 +01:00
class NetwooEligibility(TypedDict):
imb_info: NotRequired[dict]
eligDone: NotRequired[bool]
eligId: NotRequired[str]
nbOperatorsOk: NotRequired[int]
nbOperatorsErrors: NotRequired[int]
nbOperatorsPending: NotRequired[int]
totalOperators: NotRequired[int]
timeoutSec: NotRequired[int]
timeoutReached: NotRequired[bool]
eligOffers: NotRequired[dict]
2023-03-03 14:16:34 +01:00
class Netwo:
    """Small client for the Netwo REST API (``https://api.netwo.io/api/v1``).

    Every request carries the headers built in ``__init__`` (actor slug,
    API key, JSON ``Accept``). Any unexpected HTTP status is reported as a
    ``NetwoApiErrorException`` carrying the status code.
    """

    # Upper bound (seconds) applied to every HTTP call so that a stalled
    # connection cannot block a worker forever. On expiry `requests` raises
    # `requests.exceptions.Timeout`, a sibling of the `ConnectionError` these
    # calls could already raise.
    REQUEST_TIMEOUT_SEC = 30

    def __init__(self, netwo_api_key: str):
        """Store the HTTP headers used to authenticate against the Netwo API.

        :param netwo_api_key: value sent in the ``x-api-key`` header
        """
        self.netwo_api_headers = {
            "x-actor-slug": "aquilenet",
            "x-api-key": netwo_api_key,
            "Accept": "application/json",
        }

    def get_netwo_imb_coordinates(self, ref_imb: str) -> dict:
        """Fetch the Netwo record of a building.

        :param ref_imb: ARCEP ref of immeuble
        :return: the decoded JSON body of ``GET /imb/{ref_imb}``, unmodified
        :raises NetwoApiErrorException: if the API does not answer with 200
        """
        # The reference ends up in the URL path: escape everything, "/" included.
        ref_imm_clean = quote(ref_imb, safe="")
        response = requests.get(
            f"https://api.netwo.io/api/v1/imb/{ref_imm_clean}",
            headers=self.netwo_api_headers,
            timeout=self.REQUEST_TIMEOUT_SEC,
        )
        status_code = response.status_code
        if status_code != 200:
            raise NetwoApiErrorException(
                f"Could not GET netwo imb ref {ref_imb}", status_code
            )
        return response.json()

    @staticmethod
    def _filter_netwo_raw_elig_results(raw_elig: dict, search_ftto: bool) -> list:
        """Flatten raw eligibility results into a price-sorted list of offers.

        Only results whose ``infrastructure_type`` is ``"ftth"`` (plus
        ``"ftto"`` when *search_ftto* is true) are kept. Each entry of a
        result's ``entities`` becomes one offer dict. Missing or null numeric
        fields are treated as zero.

        :param raw_elig: decoded JSON body of an eligibility result
        :param search_ftto: also keep FTTO results
        :return: offers sorted by ascending ``per_month_price_one_year``
            (``access_fee / 12 + recurring_price``, rounded to 2 decimals);
            empty when there are no results
        """
        inf_search = ["ftth"]
        if search_ftto:
            inf_search.append("ftto")

        filtered_elig = []
        # "results" / "entities" may be absent or null: treat both as empty
        # instead of failing with a TypeError while iterating None.
        for r in raw_elig.get("results") or []:
            inf_type = r.get("infrastructure_type")
            if inf_type not in inf_search:
                continue
            product_id = r.get("product_id")
            operator = r.get("infrastructure_operator")
            product = r.get("product_name")
            for offer in r.get("entities") or []:
                access_fee = offer.get("access_fee") or 0.00
                recurring_price = offer.get("recurring_price") or 0.00
                filtered_elig.append(
                    {
                        "entity_id": offer.get("entity_id"),
                        "product_id": product_id,
                        "product": f"{product} - {offer.get('name')}",
                        "infrastructure_operator": operator,
                        "infrastructure_type": inf_type,
                        "debit": offer.get("debit") or 0.0,
                        "access_fee": access_fee,
                        "recurring_price": recurring_price,
                        "commitment_duration": offer.get("commitment_duration")
                        or 0,
                        # Access fee spread over 12 months + monthly price.
                        "per_month_price_one_year": round(
                            access_fee / 12 + recurring_price, 2
                        ),
                    }
                )
        return sorted(filtered_elig, key=lambda x: x["per_month_price_one_year"])

    def get_netwo_eligibility_results(self, elig_id: str, search_ftto: bool):
        """Download and filter the offers of an eligibility search.

        :param elig_id: identifier of the eligibility search
        :param search_ftto: also keep FTTO offers
        :return: list produced by ``_filter_netwo_raw_elig_results``
        :raises NetwoApiErrorException: if the API does not answer with 200
        """
        response = requests.get(
            f"https://api.netwo.io/api/v1/eligibility/{elig_id}",
            headers=self.netwo_api_headers,
            timeout=self.REQUEST_TIMEOUT_SEC,
        )
        status_code = response.status_code
        if status_code != 200:
            raise NetwoApiErrorException(
                f"Netwo API: Could not get eligibility results for ID {elig_id}",
                status_code,
            )
        return self._filter_netwo_raw_elig_results(response.json(), search_ftto)

    def start_netwo_eligibility(
        self,
        imb_info: dict,
        search_ftto: bool,
        timeout_sec: "int | None",
    ):
        """Start an eligibility search and stream its progress.

        The work happens in a generator handed to a Flask ``Response``, so
        the HTTP calls (and any ``NetwoApiErrorException``) occur while the
        response is being streamed, not when this method returns.

        Steps: preselect from the building coordinates, create the
        eligibility search, poll its status once per second until no
        operator is pending (or the timeout elapses), then attach the
        filtered offers.

        :param imb_info: building info; the keys ``lat``, ``lng``, ``imb_id``
            and ``pm_id`` are read
        :param search_ftto: also keep FTTO offers in the final result
        :param timeout_sec: maximum polling duration in seconds; a falsy
            value disables the limit
        :return: Flask ``Response`` with mimetype ``text/event-stream`` whose
            chunks are JSON dumps of a ``NetwooEligibility`` snapshot
        """

        def event_stream():
            netwo_elig = NetwooEligibility(
                eligId="",
                eligDone=False,
                nbOperatorsOk=0,
                nbOperatorsErrors=0,
                nbOperatorsPending=0,
                timeoutReached=False,
                timeoutSec=timeout_sec,
                eligOffers={},
            )

            # Step 1: preselect from the building coordinates.
            json_data = {
                "latitude": str(imb_info.get("lat")),
                "longitude": str(imb_info.get("lng")),
            }
            print(json_data)
            response = requests.post(
                "https://api.netwo.io/api/v1/eligibility/preselect",
                headers=self.netwo_api_headers,
                json=json_data,
                timeout=self.REQUEST_TIMEOUT_SEC,
            )
            status_code = response.status_code
            if status_code != 200:
                print(f"raise preselect except {response.text}")
                raise NetwoApiErrorException(
                    "Netwo API eligibility preselect step failed", status_code
                )

            # Step 2: reuse the "default" payload proposed by the preselect
            # step, completed with our offer type, market and FTTH references.
            # A missing/null "default" falls back to an empty payload rather
            # than crashing on item assignment.
            default = response.json().get("default") or {}
            default["offer_type"] = "enterprise"
            default["market"] = "service_operator"
            default["ftth_payload"] = {
                "imb_ref": imb_info.get("imb_id"),
                "pm_ref": imb_info.get("pm_id"),
            }
            response = requests.post(
                "https://api.netwo.io/api/v1/eligibility",
                headers=self.netwo_api_headers,
                json=default,
                timeout=self.REQUEST_TIMEOUT_SEC,
            )
            status_code = response.status_code
            if status_code != 201:
                print(f"Error Could not start Netwo eligibility with body {default}")
                raise NetwoApiErrorException(
                    "Netwo API: failed to start eligibility", status_code
                )
            id_elig = response.json().get("id")
            netwo_elig["eligId"] = id_elig

            # Step 3: poll the status until no operator is pending or the
            # deadline passes, emitting one snapshot per poll.
            # NOTE(review): chunks are bare JSON documents, without the
            # "data: ...\n\n" framing of Server-Sent Events; kept as-is since
            # existing consumers may rely on it — confirm with the front-end.
            deadline = None
            if timeout_sec:
                deadline = time.time() + timeout_sec
            is_done = False
            while is_done is False:
                response = requests.get(
                    f"https://api.netwo.io/api/v1/eligibility/{id_elig}/status",
                    headers=self.netwo_api_headers,
                    timeout=self.REQUEST_TIMEOUT_SEC,
                )
                status_code = response.status_code
                if status_code != 200:
                    print("raise elig status except")
                    raise NetwoApiErrorException(
                        f"Netwo API: Could not get eligibility status for ID {id_elig}",
                        status_code,
                    )
                status_res = response.json()
                # Lists may be missing or null: count them as empty.
                netwo_elig["nbOperatorsOk"] = len(status_res.get("successes", []) or [])
                netwo_elig["nbOperatorsErrors"] = len(
                    status_res.get("errors", []) or []
                )
                netwo_elig["nbOperatorsPending"] = len(
                    status_res.get("pending", []) or []
                )
                netwo_elig["totalOperators"] = (
                    netwo_elig["nbOperatorsOk"]
                    + netwo_elig["nbOperatorsErrors"]
                    + netwo_elig["nbOperatorsPending"]
                )
                if deadline and time.time() > deadline:
                    netwo_elig["timeoutReached"] = True
                    yield json.dumps(netwo_elig, indent=2)
                    break
                yield json.dumps(netwo_elig, indent=2)
                if netwo_elig["nbOperatorsPending"] > 0:
                    time.sleep(1)
                else:
                    is_done = True

            # Step 4: final snapshot with whatever offers are available
            # (also reached after a timeout).
            netwo_elig["eligOffers"] = self.get_netwo_eligibility_results(
                id_elig, search_ftto
            )
            netwo_elig["eligDone"] = True
            netwo_elig["imb_info"] = imb_info
            yield json.dumps(netwo_elig, indent=2)

        return Response(event_stream(), mimetype="text/event-stream")