# Axione-IPE-Viewer/webapp/netwo/netwo.py
#
# 294 lines
# 11 KiB
# Python

import json
import time
from typing import Optional
from urllib.parse import quote

import requests
from flask import Response
from typing_extensions import NotRequired, TypedDict

from eligibility_api.elig_api_exceptions import NetwoApiErrorException
# Status string "Deployed"; it is not referenced by the code visible in this file.
# NOTE(review): presumably the status Netwo reports for a deployed building — confirm.
NETWO_DEPLOYED_STATUS = "Deployed"
# Placeholder "imb_status" returned when the IMB lookup fails; it is also the
# value checked before an eligibility search is started.
NETWO_NOT_FOUND_STATUS = "not_found"
# Multiplier applied to prices to compute the "_ttc" amounts.
# NOTE(review): 1.2 presumably stands for a 20 % VAT (TVA) rate — confirm.
TVA_INCREASE_COEFF = 1.2
class NetwooEligibility(TypedDict):
imb_info: NotRequired[dict]
eligDone: NotRequired[bool]
eligId: NotRequired[str]
nbOperatorsOk: NotRequired[int]
nbOperatorsErrors: NotRequired[int]
nbOperatorsPending: NotRequired[int]
totalOperators: NotRequired[int]
timeoutSec: NotRequired[int]
timeoutReached: NotRequired[bool]
eligOffers: NotRequired[dict]
class Netwo:
    """Client for the Netwo REST API: building (IMB) lookup and eligibility search.

    Every request carries the authentication headers built in ``__init__``.
    Network failures raised by ``requests`` (connection errors, timeouts) are
    not caught here and propagate to the caller.
    """

    # Root of every Netwo endpoint used by this client.
    NETWO_API_BASE_URL = "https://api.netwo.io/api/v1"
    # Upper bound, in seconds, on each HTTP call. ``requests`` applies no
    # timeout by default, so an unresponsive API would otherwise block forever.
    NETWO_REQUEST_TIMEOUT_SEC = 30
    # Pause, in seconds, between two polls of a running eligibility search.
    NETWO_POLL_INTERVAL_SEC = 0.6
    # Keys copied from the per-entity product details into each offer.
    _ENTITY_DETAIL_KEYS = (
        "broadband_network_gateway_protocol",
        "debit_up_max",
        "interface_type",
        "tariff_zone",
        "collection_region",
        "delivery_protocol",
        "mtu",
    )

    def __init__(self, netwo_api_key: str, aquilenet_fixed_recurring_price: float):
        """
        :param netwo_api_key: key sent in the ``x-api-key`` header of every request
        :param aquilenet_fixed_recurring_price: fixed amount added to the monthly
            price (VAT included) of every offer
        """
        self.netwo_api_headers = {
            "x-actor-slug": "aquilenet",
            "x-api-key": netwo_api_key,
            "Accept": "application/json",
        }
        self.aquilenet_fixed_recurring_price = aquilenet_fixed_recurring_price

    @staticmethod
    def _json_or_none(response):
        """Return the decoded JSON body of *response*, or None if it is not valid JSON."""
        try:
            return response.json()
        except ValueError:
            # requests (and the json module) signal an undecodable body with a
            # ValueError subclass.
            return None

    @staticmethod
    def _prepare_event_string(data: dict):
        """Serialize *data* as one server-sent event (``data: <json>`` + blank line)."""
        return "data: %s\n\n" % json.dumps(data)

    @staticmethod
    def _compute_ttc_prices(
        access_fee, recurring_price, fixed_recurring_price, vat_coeff
    ):
        """Compute the VAT-included amounts of an offer, each rounded to 2 decimals.

        :param access_fee: one-off access fee, VAT excluded
        :param recurring_price: monthly price, VAT excluded
        :param fixed_recurring_price: fixed monthly amount added after VAT
        :param vat_coeff: multiplier turning a VAT-excluded amount into a
            VAT-included one
        :return: ``(access_fee_ttc, total_recurring_price_ttc,
            per_month_price_one_year_ttc)``; the last value spreads the access
            fee over 12 months.
        """
        access_fee_ttc = round(access_fee * vat_coeff, 2)
        total_recurring_price_ttc = round(
            recurring_price * vat_coeff + fixed_recurring_price, 2
        )
        per_month_price_one_year_ttc = round(
            access_fee_ttc / 12 + total_recurring_price_ttc, 2
        )
        return access_fee_ttc, total_recurring_price_ttc, per_month_price_one_year_ttc

    def get_netwo_imb_coordinates(self, ref_imb: str) -> dict:
        """Fetch the Netwo description of a building.

        :param ref_imb: ARCEP ref of immeuble
        :return: the JSON payload returned by Netwo for this building, or
            ``{"imb_status": NETWO_NOT_FOUND_STATUS, "imb_id": ref_imb}`` when
            the API does not answer 200 or answers with an empty or invalid body
        """
        # The reference is inserted in the URL path: escape every reserved character.
        ref_imb_clean = quote(ref_imb, safe="")
        response = requests.get(
            f"{self.NETWO_API_BASE_URL}/imb/{ref_imb_clean}",
            headers=self.netwo_api_headers,
            timeout=self.NETWO_REQUEST_TIMEOUT_SEC,
        )
        imb_payload = None
        if response.status_code == 200:
            imb_payload = self._json_or_none(response)
        if not imb_payload:
            return {"imb_status": NETWO_NOT_FOUND_STATUS, "imb_id": ref_imb}
        return imb_payload

    def _get_netwo_product_entities_details(self, elig_id: str, product_id: str):
        """Fetch the per-entity details of one product of an eligibility search.

        :param elig_id: identifier of the eligibility search
        :param product_id: identifier of the product inside that search
        :return: the ``entities`` list of the details payload; an empty list
            when the call fails or the payload has no entities (best effort:
            the error is only printed)
        """
        response = requests.get(
            f"{self.NETWO_API_BASE_URL}/eligibility/{elig_id}/details/{product_id}",
            headers=self.netwo_api_headers,
            timeout=self.NETWO_REQUEST_TIMEOUT_SEC,
        )
        details = None
        if response.status_code == 200:
            details = self._json_or_none(response)
        if not details:
            print(
                f"Error: could not get details for elig_id {elig_id} and product id {product_id}"
            )
            return []
        return details.get("entities") or []

    def _filter_netwo_raw_elig_results(
        self,
        elig_id: str,
        raw_elig: dict,
        search_ftto: bool,
        netwo_offers: list,
        processed_products: list,
    ) -> list:
        """Turn raw eligibility results into offers and return all offers sorted by price.

        Only ``ftth`` results (plus ``ftto`` when *search_ftto* is true) whose
        product has not been processed yet are considered.

        :param elig_id: identifier of the eligibility search
        :param raw_elig: payload of the eligibility results endpoint
        :param search_ftto: also keep ``ftto`` results
        :param netwo_offers: offers accumulated so far; new offers are appended
            to this list in place
        :param processed_products: product ids already handled; ids handled by
            this call are appended in place
        :return: a new list holding every accumulated offer, ordered by
            ascending ``per_month_price_one_year_ttc``
        """
        wanted_infra = ("ftth", "ftto") if search_ftto else ("ftth",)
        # "results" / "entities" may be missing or null: treat them as empty.
        for result in raw_elig.get("results") or []:
            inf_type = result.get("infrastructure_type")
            if inf_type not in wanted_infra:
                continue
            product_id = result.get("product_id")
            if product_id in processed_products:
                continue
            processed_products.append(product_id)
            prod_entities = self._get_netwo_product_entities_details(
                elig_id, product_id
            )
            operator = result.get("infrastructure_operator")
            product = result.get("product_name")
            for offer in result.get("entities") or []:
                entity_id = offer.get("entity_id")
                access_fee = offer.get("access_fee") or 0.00
                recurring_price = offer.get("recurring_price") or 0.00
                (
                    access_fee_ttc,
                    total_recurring_price_ttc,
                    per_month_price_one_year_ttc,
                ) = self._compute_ttc_prices(
                    access_fee,
                    recurring_price,
                    self.aquilenet_fixed_recurring_price,
                    TVA_INCREASE_COEFF,
                )
                offer_info = {
                    "entity_id": entity_id,
                    "product_id": product_id,
                    "product": f"{product} - {offer.get('name')}",
                    "infrastructure_operator": operator,
                    "infrastructure_type": inf_type,
                    "debit": offer.get("debit") or 0.0,
                    "access_fee": access_fee,
                    "access_fee_ttc": access_fee_ttc,
                    "recurring_price": recurring_price,
                    "total_recurring_price_ttc": total_recurring_price_ttc,
                    "commitment_duration": offer.get("commitment_duration") or 0,
                    "per_month_price_one_year_ttc": per_month_price_one_year_ttc,
                }
                # Enrich the offer with the first details entry of the same entity.
                entity_details = next(
                    (
                        details
                        for details in prod_entities
                        if details.get("entity_id") == entity_id
                    ),
                    None,
                )
                if entity_details is not None:
                    for key in self._ENTITY_DETAIL_KEYS:
                        offer_info[key] = entity_details.get(key)
                netwo_offers.append(offer_info)
        return sorted(
            netwo_offers,
            key=lambda item: item["per_month_price_one_year_ttc"],
        )

    def get_netwo_eligibility_results(
        self, elig_id: str, search_ftto: bool, netwo_offers, processed_products
    ):
        """Fetch the current results of an eligibility search and convert them to offers.

        :param elig_id: identifier of the eligibility search
        :param search_ftto: also keep ``ftto`` results
        :param netwo_offers: offers accumulated so far (extended in place)
        :param processed_products: product ids already handled (extended in place)
        :return: every accumulated offer, cheapest first
        :raises NetwoApiErrorException: when the API does not answer 200
        """
        response = requests.get(
            f"{self.NETWO_API_BASE_URL}/eligibility/{elig_id}",
            headers=self.netwo_api_headers,
            timeout=self.NETWO_REQUEST_TIMEOUT_SEC,
        )
        status_code = response.status_code
        if status_code != 200:
            raise NetwoApiErrorException(
                f"Netwo API: Could not get eligibility results for ID {elig_id}",
                status_code,
            )
        return self._filter_netwo_raw_elig_results(
            elig_id, response.json(), search_ftto, netwo_offers, processed_products
        )

    def start_netwo_eligibility(
        self,
        imb_info: dict,
        search_ftto: bool,
        timeout_sec: Optional[float] = None,
    ):
        """Start an eligibility search for a building and stream its progress.

        :param imb_info: building description; the keys read here are
            ``imb_status``, ``lat``, ``lng``, ``imb_id`` and ``pm_id``
        :param search_ftto: also report ``ftto`` offers
        :param timeout_sec: maximum polling duration in seconds; a falsy value
            disables the limit
        :return: a ``text/event-stream`` Flask response. Each event is the JSON
            form of the search state; the last one has ``eligDone`` set to true.
            ``NetwoApiErrorException`` is raised while the stream is being
            consumed when a Netwo call answers with an unexpected status code.
        """

        def event_stream():
            netwo_elig = NetwooEligibility(
                eligId="",
                eligDone=False,
                nbOperatorsOk=0,
                nbOperatorsErrors=0,
                nbOperatorsPending=0,
                totalOperators=0,
                eligStatus={},
                timeoutReached=False,
                timeoutSec=timeout_sec,
                eligOffers=[],
                imb_info=imb_info,
            )
            # Unknown building: nothing to search, close the stream right away.
            if imb_info.get("imb_status") == NETWO_NOT_FOUND_STATUS:
                netwo_elig["eligDone"] = True
                yield self._prepare_event_string(netwo_elig)
                return

            # Step 1: ask Netwo for the default search payload at these coordinates.
            response = requests.post(
                f"{self.NETWO_API_BASE_URL}/eligibility/preselect",
                headers=self.netwo_api_headers,
                json={
                    "latitude": str(imb_info.get("lat")),
                    "longitude": str(imb_info.get("lng")),
                },
                timeout=self.NETWO_REQUEST_TIMEOUT_SEC,
            )
            status_code = response.status_code
            if status_code != 200:
                print(f"raise preselect except {response.text}")
                raise NetwoApiErrorException(
                    "Netwo API eligibility preselect step failed", status_code
                )

            # Step 2: complete that payload and create the search. A missing
            # "default" section is replaced by an empty payload instead of
            # failing on None.
            default = response.json().get("default") or {}
            default["offer_type"] = "enterprise"
            default["market"] = "service_operator"
            default["ftth_payload"] = {
                "imb_ref": imb_info.get("imb_id"),
                "pm_ref": imb_info.get("pm_id"),
            }
            response = requests.post(
                f"{self.NETWO_API_BASE_URL}/eligibility",
                headers=self.netwo_api_headers,
                json=default,
                timeout=self.NETWO_REQUEST_TIMEOUT_SEC,
            )
            status_code = response.status_code
            if status_code != 201:
                print(f"Error Could not start Netwo eligibility with body {default}")
                raise NetwoApiErrorException(
                    "Netwo API: failed to start eligibility", status_code
                )
            id_elig = response.json().get("id")
            netwo_elig["eligId"] = id_elig

            # Step 3: poll the search until no operator is pending or the
            # deadline passes. A monotonic clock keeps the deadline immune to
            # system clock adjustments.
            deadline = time.monotonic() + timeout_sec if timeout_sec else None
            netwo_offers = []
            processed_products = []
            while True:
                response = requests.get(
                    f"{self.NETWO_API_BASE_URL}/eligibility/{id_elig}/status",
                    headers=self.netwo_api_headers,
                    timeout=self.NETWO_REQUEST_TIMEOUT_SEC,
                )
                status_code = response.status_code
                if status_code != 200:
                    print("raise elig status except")
                    raise NetwoApiErrorException(
                        f"Netwo API: Could not get eligibility status for ID {id_elig}",
                        status_code,
                    )
                status_res = response.json()
                nb_ok = len(status_res.get("successes") or [])
                nb_errors = len(status_res.get("errors") or [])
                nb_pending = len(status_res.get("pending") or [])
                netwo_elig["eligStatus"] = status_res
                netwo_elig["nbOperatorsOk"] = nb_ok
                netwo_elig["nbOperatorsErrors"] = nb_errors
                netwo_elig["nbOperatorsPending"] = nb_pending
                netwo_elig["totalOperators"] = nb_ok + nb_errors + nb_pending

                netwo_offers = self.get_netwo_eligibility_results(
                    id_elig,
                    search_ftto,
                    netwo_offers,
                    processed_products,
                )
                netwo_elig["eligOffers"] = netwo_offers

                if deadline is not None and time.monotonic() > deadline:
                    netwo_elig["timeoutReached"] = True
                    yield self._prepare_event_string(netwo_elig)
                    break
                yield self._prepare_event_string(netwo_elig)
                if nb_pending == 0:
                    break
                time.sleep(self.NETWO_POLL_INTERVAL_SEC)

            # Final event: refresh the offers one last time and flag completion.
            netwo_elig["eligOffers"] = self.get_netwo_eligibility_results(
                id_elig, search_ftto, netwo_offers, processed_products
            )
            netwo_elig["eligDone"] = True
            yield self._prepare_event_string(netwo_elig)

        return Response(event_stream(), mimetype="text/event-stream")