import json import time from urllib.parse import quote import requests from flask import Response from typing_extensions import NotRequired, TypedDict from eligibility_api.elig_api_exceptions import NetwoApiErrorException NETWO_DEPLOYED_STATUS = "Deployed" NETWO_NOT_FOUND_STATUS = "not_found" TVA_INCREASE_COEFF = 1.2 class NetwooEligibility(TypedDict): imb_info: NotRequired[dict] eligDone: NotRequired[bool] eligId: NotRequired[str] nbOperatorsOk: NotRequired[int] nbOperatorsErrors: NotRequired[int] nbOperatorsPending: NotRequired[int] totalOperators: NotRequired[int] timeoutSec: NotRequired[int] timeoutReached: NotRequired[bool] eligOffers: NotRequired[dict] class Netwo: def __init__(self, netwo_api_key: str, aquilenet_fixed_recurring_price: float): self.netwo_api_headers = { "x-actor-slug": "aquilenet", "x-api-key": netwo_api_key, "Accept": "application/json", } self.aquilenet_fixed_recurring_price = aquilenet_fixed_recurring_price def get_netwo_imb_coordinates(self, ref_imb: str) -> dict: """ :param ref_imb: ARCEP ref of immeuble :return: (elig_status: FAIEligibilityStatus, imb_lat: str, imb_lng: str) """ ref_imm_clean = quote(ref_imb, safe="") response = requests.get( f"https://api.netwo.io/api/v1/imb/{ref_imm_clean}", headers=self.netwo_api_headers, ) status_code = response.status_code if status_code != 200 or not response.json(): return {"imb_status": NETWO_NOT_FOUND_STATUS, "imb_id": ref_imb} imb_payload = response.json() return imb_payload def _get_netwo_product_entities_details(self, elig_id: str, product_id: str): response = requests.get( f"https://api.netwo.io/api/v1/eligibility/{elig_id}/details/{product_id}", headers=self.netwo_api_headers, ) status_code = response.status_code if status_code != 200 or not response.json(): print( f"Error: could not get details for elig_id {elig_id} and product id {product_id}" ) return {} details = response.json() return details.get("entities") or [] def _filter_netwo_raw_elig_results( self, elig_id: str, raw_elig: dict, search_ftto: bool, netwo_offers: list, processed_products: list, ) -> list: inf_search = ["ftth"] if search_ftto: inf_search.append("ftto") for r in raw_elig.get("results"): inf_type = r.get("infrastructure_type") if inf_type not in inf_search: continue product_id = r.get("product_id") if product_id in processed_products: continue processed_products.append(product_id) prod_entities = self._get_netwo_product_entities_details( elig_id, product_id ) operator = r.get("infrastructure_operator") product = r.get("product_name") for offer in r.get("entities"): entity_id = offer.get("entity_id") offer_name = offer.get("name") debit = offer.get("debit") or 0.0 access_fee = offer.get("access_fee") or 0.00 recurring_price = offer.get("recurring_price") or 0.00 commitment_duration = offer.get("commitment_duration") or 0 access_fee_ttc = round( access_fee * TVA_INCREASE_COEFF, ) total_recurring_price_ttc = round( recurring_price * TVA_INCREASE_COEFF + self.aquilenet_fixed_recurring_price, 2, ) offer_info = { "entity_id": entity_id, "product_id": product_id, "product": f"{product} - {offer_name}", "infrastructure_operator": operator, "infrastructure_type": inf_type, "debit": debit, "access_fee": access_fee, "access_fee_ttc": access_fee_ttc, "recurring_price": recurring_price, "total_recurring_price_ttc": total_recurring_price_ttc, "commitment_duration": commitment_duration, "per_month_price_one_year_ttc": round( access_fee_ttc / 12 + total_recurring_price_ttc, 2 ), } search_entity = [ i for i in prod_entities if i.get("entity_id") == entity_id ] if search_entity: entity_details = search_entity[0] offer_info[ "broadband_network_gateway_protocol" ] = entity_details.get("broadband_network_gateway_protocol") offer_info["debit_up_max"] = entity_details.get("debit_up_max") offer_info["interface_type"] = entity_details.get("interface_type") offer_info["tariff_zone"] = entity_details.get("tariff_zone") offer_info["collection_region"] = entity_details.get( "collection_region" ) offer_info["delivery_protocol"] = entity_details.get( "delivery_protocol" ) offer_info["mtu"] = entity_details.get("mtu") netwo_offers.append(offer_info) sort_elig = sorted( netwo_offers, key=lambda x: x["per_month_price_one_year_ttc"], reverse=False, ) return sort_elig @staticmethod def _prepare_event_string(data: dict): return "data: %s\n\n" % json.dumps(data) def get_netwo_eligibility_results( self, elig_id: str, search_ftto: bool, netwo_offers, processed_products ): response = requests.get( f"https://api.netwo.io/api/v1/eligibility/{elig_id}", headers=self.netwo_api_headers, ) status_code = response.status_code if status_code != 200: raise NetwoApiErrorException( f"Netwo API: Could not get eligibility results for ID {elig_id}", status_code, ) return self._filter_netwo_raw_elig_results( elig_id, response.json(), search_ftto, netwo_offers, processed_products ) def start_netwo_eligibility( self, imb_info: str, search_ftto: bool, timeout_sec: None, ): def event_stream(): netwo_elig = NetwooEligibility( eligId="", eligDone=False, nbOperatorsOk=0, nbOperatorsErrors=0, nbOperatorsPending=0, eligStatus={}, timeoutReached=False, timeoutSec=timeout_sec, eligOffers={}, imb_info=imb_info, ) if imb_info.get("imb_status") != NETWO_DEPLOYED_STATUS: netwo_elig["eligDone"] = True yield "data: %s\n\n" % json.dumps(netwo_elig) return json_data = { "latitude": str(imb_info.get("lat")), "longitude": str(imb_info.get("lng")), } response = requests.post( "https://api.netwo.io/api/v1/eligibility/preselect", headers=self.netwo_api_headers, json=json_data, ) status_code = response.status_code if status_code != 200: print(f"raise preselect except {response.text}") raise NetwoApiErrorException( "Netwo API eligibility preselect step failed", status_code ) resp = response.json() default = resp.get("default") default["offer_type"] = "enterprise" default["market"] = "service_operator" default["ftth_payload"] = { "imb_ref": imb_info.get("imb_id"), "pm_ref": imb_info.get("pm_id"), } response = requests.post( "https://api.netwo.io/api/v1/eligibility", headers=self.netwo_api_headers, json=default, ) status_code = response.status_code if status_code != 201: print(f"Error Could not start Netwo eligibility with body {default}") raise NetwoApiErrorException( "Netwo API: failed to start eligibility", status_code ) id_elig = response.json().get("id") netwo_elig["eligId"] = id_elig is_done = False timeout = None if timeout_sec: timeout = time.time() + timeout_sec netwo_offers = [] processed_products = [] while is_done is False: response = requests.get( f"https://api.netwo.io/api/v1/eligibility/{id_elig}/status", headers=self.netwo_api_headers, ) status_code = response.status_code if status_code != 200: print("raise elig status except") raise NetwoApiErrorException( f"Netwo API: Could not get eligibility status for ID {id_elig}", status_code, ) status_res = response.json() netwo_elig["eligStatus"] = status_res netwo_elig["nbOperatorsOk"] = len(status_res.get("successes", []) or []) netwo_elig["nbOperatorsErrors"] = len( status_res.get("errors", []) or [] ) netwo_elig["nbOperatorsPending"] = len( status_res.get("pending", []) or [] ) netwo_elig["totalOperators"] = ( netwo_elig["nbOperatorsOk"] + netwo_elig["nbOperatorsErrors"] + netwo_elig["nbOperatorsPending"] ) netwo_offers = self.get_netwo_eligibility_results( id_elig, search_ftto, netwo_offers, processed_products, ) netwo_elig["eligOffers"] = netwo_offers if timeout and time.time() > timeout: netwo_elig["timeoutReached"] = True yield self._prepare_event_string(netwo_elig) break else: yield self._prepare_event_string(netwo_elig) if netwo_elig["nbOperatorsPending"] > 0: time.sleep(0.6) else: is_done = True netwo_elig["eligOffers"] = self.get_netwo_eligibility_results( id_elig, search_ftto, netwo_offers, processed_products ) netwo_elig["eligDone"] = True yield self._prepare_event_string(netwo_elig) return Response(event_stream(), mimetype="text/event-stream")