[USS][Desconflito] Código Exemplo
Exemplo de código Provedor de Desconflito (Go e Python): https://github.com/dp-icea/scd_provider
Para realizar autenticação:
- Verificar se está gerando o token
Para criar OIR:
- Enviar GeoJSON da área desejada
app.py
from flask import Flask, request
from dss import Dss
import json
app = Flask(__name__)
database = {}
dss = Dss()
@app.route('/uss/v1/operational_intents/<operational_intent_id>', methods=['GET'])
def get_oir(operational_intent_id):
print(database)
if operational_intent_id not in database:
return {"msg": 'No such OIR'}, 404
return json.dumps(database[operational_intent_id], default=lambda o: o.__dict__)
@app.route('/injection/oir', methods=['PUT'])
def inject_oir():
volume = request.get_json()
try:
dss.conflict_manager.check_restrictions(volume)
dss.scd.check_strategic_conflicts(volume)
oir = {}
oir["operational_intent"] = {}
oir["operational_intent"]["reference"] = dss.scd.put_operational_intent(volume)
oir["operational_intent"]["details"] = {
"volumes": [],
"off_nominal_volumes": [],
"priority": 0
}
oir["operational_intent"]["details"]["volumes"].append(volume)
database[oir["operational_intent"]["reference"]['id']] = oir
except Exception as ex:
print(f"Erro na criação: {ex}")
return {"msg": "Erro"}, 400
return {"Success": True}, 201
if __name__ == '__main__':
app.run(port=5050)
dss.py
from conflict_manager import ConflictManager
from scd import Scd
class Dss:
def __init__(self) -> None:
self.conflict_manager = ConflictManager()
self.scd = Scd()
scd.py
import requests
import uuid
from env import USS_BASE_URL, DSS_HOST
class Scd:
def __init__(self) -> None:
self.auth()
def auth(self):
url = "http://kong.icea.decea.mil.br:64235/token?grant_type=client_credentials&intended_audience=localhost&issuer=localhost&scope={0}"
self.strategic_coordination = requests.get(url.format("utm.strategic_coordination")).json()["access_token"]
def check_strategic_conflicts(self, volume):
url = DSS_HOST + "/dss/v1/operational_intent_references/query"
body = {
"area_of_interest": volume
}
header = {"authorization": f"Bearer {self.strategic_coordination}"}
response = requests.post(url, headers=header, json=body).json()
if (len(response['operational_intent_references']) > 0):
raise Exception(f"Interseção com outra Intenção {response['operational_intent_references'][0]['id']}")
else:
print("Sem intenções para o volume")
def put_operational_intent(self, volume):
id = str(uuid.uuid4())
url = DSS_HOST + f"/dss/v1/operational_intent_references/{id}"
body = {
"flight_type": "VLOS",
"extents": [volume],
"key": [],
"state": "Accepted",
"uss_base_url": USS_BASE_URL,
"new_subscription": {
"uss_base_url": USS_BASE_URL,
"notify_for_constraint": False
}
}
print(body)
header = {"authorization": f"Bearer {self.strategic_coordination}"}
response = requests.put(url, headers=header, json=body).json()
print(f"OIR criada com id: {id}")
print(response)
return response['operational_intent_reference']
conflict_manager.py
import requests
import uuid
from env import USS_BASE_URL, DSS_HOST
class Scd:
def __init__(self) -> None:
self.auth()
def auth(self):
url = "http://kong.icea.decea.mil.br:64235/token?grant_type=client_credentials&intended_audience=localhost&issuer=localhost&scope={0}"
self.strategic_coordination = requests.get(url.format("utm.strategic_coordination")).json()["access_token"]
def check_strategic_conflicts(self, volume):
url = DSS_HOST + "/dss/v1/operational_intent_references/query"
body = {
"area_of_interest": volume
}
header = {"authorization": f"Bearer {self.strategic_coordination}"}
response = requests.post(url, headers=header, json=body).json()
if (len(response['operational_intent_references']) > 0):
raise Exception(f"Interseção com outra Intenção {response['operational_intent_references'][0]['id']}")
else:
print("Sem intenções para o volume")
def put_operational_intent(self, volume):
id = str(uuid.uuid4())
url = DSS_HOST + f"/dss/v1/operational_intent_references/{id}"
body = {
"flight_type": "VLOS",
"extents": [volume],
"key": [],
"state": "Accepted",
"uss_base_url": USS_BASE_URL,
"new_subscription": {
"uss_base_url": USS_BASE_URL,
"notify_for_constraint": False
}
}
print(body)
header = {"authorization": f"Bearer {self.strategic_coordination}"}
response = requests.put(url, headers=header, json=body).json()
print(f"OIR criada com id: {id}")
print(response)
return response['operational_intent_reference']