python microservice

This commit is contained in:
moxitech 2024-10-24 14:42:17 +07:00
parent 4871f9e5b9
commit 4d8cde2032
5 changed files with 474546 additions and 0 deletions

View File

@ -0,0 +1 @@
.venv

View File

@ -0,0 +1,16 @@
# Используем официальный образ Python
FROM python:3.12-slim
# Устанавливаем рабочую директорию внутри контейнера
WORKDIR /app
# Копируем requirements в контейнер и устанавливаем зависимости
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Копируем весь код в контейнер
COPY . .
# Задаем переменные окружения для лучшего управления
ENV PYTHONUNBUFFERED=1
# Команда для запуска приложения
CMD ["python", "main.py"]

View File

@ -0,0 +1,133 @@
import numpy as np
import json
import pywavefront
import asyncio
import websockets
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
app = FastAPI()
class SimulationRequest(BaseModel):
map_obj_path: str
drones_data: str
base_stations_data: str
simulation_time: int
websocket_url: str
class DroneNetworkSimulator:
def __init__(self, map_obj_path, drones_data, base_stations_data, simulation_time, websocket_url):
self.map_data = self.load_map(map_obj_path)
self.drones = self.load_drones(drones_data)
self.base_stations = self.load_base_stations(base_stations_data)
self.simulation_time = simulation_time
self.websocket_url = websocket_url
self.time_series_data = []
def load_map(self, map_obj_path):
# Используем pywavefront для загрузки .obj карты
scene = pywavefront.Wavefront(map_obj_path, collect_faces=True)
vertices = np.array(scene.vertices)
# Извлекаем высоты и используем минимальное количество вершин для представления высотной карты
z_values = vertices[:, 2]
grid_size = int(np.sqrt(len(z_values)))
map_data = z_values[:grid_size ** 2].reshape((grid_size, grid_size)) # Формируем карту с учетом размера данных
return map_data
def load_drones(self, drones_data):
return json.loads(drones_data)
def load_base_stations(self, base_stations_data):
return json.loads(base_stations_data)
async def simulate(self):
async with websockets.connect(self.websocket_url) as websocket:
for t in range(self.simulation_time):
current_state = []
for drone in self.drones:
self.update_drone_position(drone)
network_state = self.calculate_network_state(drone)
current_state.append({
"drone_id": drone["id"],
"position": drone["position"],
"network_state": network_state,
"connected": network_state["connected_base_station"] is not None,
"network_speed": self.calculate_network_speed(network_state["signal_strength"])
})
self.time_series_data.append({"time": t, "state": current_state})
await websocket.send(json.dumps({"time": t, "state": current_state}))
def update_drone_position(self, drone):
# Обновление позиции дрона в зависимости от целевых точек следования
if "waypoints" in drone and drone["waypoints"]:
target = np.array(drone["waypoints"][0])
current_position = np.array(drone["position"])
direction = target - current_position
distance = np.linalg.norm(direction)
if distance <= drone.get("speed", 1):
# Достигнута целевая точка
drone["position"] = target.tolist()
drone["waypoints"].pop(0) # Удаляем достигнутую точку
else:
# Двигаемся в направлении целевой точки
direction = direction / distance # Нормализуем направление
new_position = current_position + direction * drone.get("speed", 1)
drone["position"] = new_position.tolist()
def calculate_network_state(self, drone):
# Рассчитываем состояние сети, учитывая препятствия, частоту и радиус базовых станций
position = np.array(drone["position"])
frequency = drone.get("frequency", 2.4) # Частота в GHz, по умолчанию 2.4 GHz
path_loss_exponent = drone.get("path_loss_exponent", 2.5) # Показатель затухания, заданный в JSON
obstacles_factor = self.calculate_obstacles_factor(position)
distance = np.linalg.norm(position)
signal_strength = 100 / (1 + (distance ** path_loss_exponent) * obstacles_factor * (1 / frequency))
# Проверяем подключение к базовой станции
connected_base_station = None
for base_station in self.base_stations:
base_position = np.array(base_station["position"])
base_radius = base_station.get("radius", 50) # Радиус действия базовой станции по умолчанию 50
base_distance = np.linalg.norm(position - base_position)
if base_distance <= base_radius:
connected_base_station = base_station["id"]
break
return {
"signal_strength": signal_strength,
"connected_base_station": connected_base_station
}
def calculate_network_speed(self, signal_strength):
# Рассчитываем скорость сети в зависимости от уровня сигнала (упрощенная модель)
max_speed = 100.0 # Максимальная скорость сети в Mbps
return max_speed * (signal_strength / 100)
def calculate_obstacles_factor(self, position):
# Фактор, учитывающий препятствия на карте, влияющие на затухание
x, y, z = int(position[0]), int(position[1]), int(position[2])
if 0 <= x < self.map_data.shape[0] and 0 <= y < self.map_data.shape[1]:
height_at_point = self.map_data[x, y]
if z < height_at_point:
# Если дрон находится ниже препятствия, затухание выше
return 5.0
return 1.0
@app.post("/start_simulation")
async def start_simulation(request: SimulationRequest):
try:
simulator = DroneNetworkSimulator(
request.map_obj_path,
request.drones_data,
request.base_stations_data,
request.simulation_time,
request.websocket_url
)
await simulator.simulate()
return {"status": "Simulation started and results are being sent via WebSocket."}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=10000)

View File

@ -0,0 +1,2 @@
# Blender 4.2.3 LTS MTL File: 'None'
# www.blender.org

File diff suppressed because it is too large Load Diff