From 69399f49a5c7351d8d1180ce55f5c32a5f964471 Mon Sep 17 00:00:00 2001 From: moxitech Date: Thu, 7 Nov 2024 19:20:50 +0700 Subject: [PATCH] nats queue service --- docker-compose.yaml | 43 ++- nginx.conf | 24 +- src/microservices/hub/Dockerfile | 9 + src/microservices/hub/app/cmd/handlers.go | 1 + src/microservices/hub/app/cmd/hub.go | 1 + src/microservices/hub/{ => app}/cmd/main.go | 258 +++++++++--------- src/microservices/hub/app/cmd/models.go | 1 + src/microservices/hub/{ => app}/docs/DTO.md | 0 src/microservices/hub/{ => app}/go.mod | 9 +- src/microservices/hub/app/go.sum | 77 ++++++ .../hub/app/internal/nats/nats_queue.go | 35 +++ src/microservices/hub/go.sum | 33 --- src/microservices/publisher/Dockerfile | 12 - src/microservices/publisher/cmd/main.go | 144 ---------- src/microservices/publisher/go.mod | 5 - src/microservices/publisher/go.sum | 2 - src/microservices/query/cmd/main.go | 16 ++ src/microservices/query/go.mod | 12 + src/microservices/query/go.sum | 12 + .../query/internal/nats/nats_queue.go | 35 +++ 20 files changed, 381 insertions(+), 348 deletions(-) create mode 100644 src/microservices/hub/Dockerfile create mode 100644 src/microservices/hub/app/cmd/handlers.go create mode 100644 src/microservices/hub/app/cmd/hub.go rename src/microservices/hub/{ => app}/cmd/main.go (50%) create mode 100644 src/microservices/hub/app/cmd/models.go rename src/microservices/hub/{ => app}/docs/DTO.md (100%) rename src/microservices/hub/{ => app}/go.mod (69%) create mode 100644 src/microservices/hub/app/go.sum create mode 100644 src/microservices/hub/app/internal/nats/nats_queue.go delete mode 100644 src/microservices/hub/go.sum delete mode 100644 src/microservices/publisher/Dockerfile delete mode 100644 src/microservices/publisher/cmd/main.go delete mode 100644 src/microservices/publisher/go.mod delete mode 100644 src/microservices/publisher/go.sum create mode 100644 src/microservices/query/cmd/main.go create mode 100644 src/microservices/query/go.mod create mode 100644 src/microservices/query/go.sum create mode 100644 src/microservices/query/internal/nats/nats_queue.go diff --git a/docker-compose.yaml b/docker-compose.yaml index d55e7a9..3ce8b78 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -6,6 +6,7 @@ networks: services: + # For ended simulations mongo: image: mongo container_name: dns-mongo-db @@ -20,7 +21,7 @@ services: volumes: - mongodata:/data/db restart: always - + # For user management and etc postgres: image: postgres:13 container_name: dns-postgres-db @@ -35,7 +36,7 @@ services: networks: - dns_net restart: always - + # For web nginx: image: nginx:alpine container_name: dns-nginx @@ -49,7 +50,7 @@ services: depends_on: - front restart: always - + # Api server - For kernel communication server: container_name: dns-server build: @@ -64,19 +65,21 @@ services: networks: - dns_net restart: always - - publisher: - container_name: dns-publisher + # Socket hub - For websocket communication + hub: + container_name: dns-hub build: - context: ./src/microservices/publisher + context: ./src/microservices/hub dockerfile: Dockerfile env_file: ".env" ports: - - "${SOCKET_BASE_ADDRESS}:${SOCKET_BASE_ADDRESS}" + - "10000:10000" + depends_on: + - nats networks: - dns_net restart: always - + # Frontend - For web ui front: container_name: dns-ui build: @@ -92,7 +95,27 @@ services: networks: - dns_net restart: always - + # For messaging + nats: + image: nats + container_name: dns-nats + ports: + - "8222:8222" + - "6222:6222" + command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --http_port 8222 " + networks: + - dns_net + restart: always + nats-1: + image: nats + container_name: dns-nats-cluster-1 + command: "--cluster_name NATS --cluster nats://0.0.0.0:6222 --routes=nats://ruser:T0pS3cr3t@nats:6222" + networks: + - dns_net + depends_on: ["nats"] + restart: always + + volumes: postgres_data: mongodata: diff --git a/nginx.conf b/nginx.conf index 14e55d0..55625bc 100644 --- a/nginx.conf +++ b/nginx.conf @@ -29,17 +29,17 @@ http { proxy_set_header X-Forwarded-Proto $scheme; } } - server { - listen 80; - server_name ws.localhost; - # add_header 'Access-Control-Allow-Origin' 'http://socket' always; + # server { + # listen 80; + # server_name ws.localhost; + # # add_header 'Access-Control-Allow-Origin' 'http://socket' always; - location / { - proxy_pass http://socket:9091; - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-Forwarded-Proto $scheme; - } - } + # location / { + # proxy_pass http://socket:9091; + # proxy_set_header Host $host; + # proxy_set_header X-Real-IP $remote_addr; + # proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + # proxy_set_header X-Forwarded-Proto $scheme; + # } + # } } diff --git a/src/microservices/hub/Dockerfile b/src/microservices/hub/Dockerfile new file mode 100644 index 0000000..e82e087 --- /dev/null +++ b/src/microservices/hub/Dockerfile @@ -0,0 +1,9 @@ +FROM golang:1.23-alpine3.20 +LABEL author="moxitech" +WORKDIR /var/sender +COPY app /var/sender + +RUN go get ./...; +RUN go build -o main cmd/main.go +EXPOSE 8899 +CMD "./main" \ No newline at end of file diff --git a/src/microservices/hub/app/cmd/handlers.go b/src/microservices/hub/app/cmd/handlers.go new file mode 100644 index 0000000..06ab7d0 --- /dev/null +++ b/src/microservices/hub/app/cmd/handlers.go @@ -0,0 +1 @@ +package main diff --git a/src/microservices/hub/app/cmd/hub.go b/src/microservices/hub/app/cmd/hub.go new file mode 100644 index 0000000..06ab7d0 --- /dev/null +++ b/src/microservices/hub/app/cmd/hub.go @@ -0,0 +1 @@ +package main diff --git a/src/microservices/hub/cmd/main.go b/src/microservices/hub/app/cmd/main.go similarity index 50% rename from src/microservices/hub/cmd/main.go rename to src/microservices/hub/app/cmd/main.go index d285840..1b85b76 100644 --- a/src/microservices/hub/cmd/main.go +++ b/src/microservices/hub/app/cmd/main.go @@ -6,6 +6,7 @@ import ( "sync" "time" + nats_connector "git.moxitech.ru/moxitech/hub-go/internal/nats" "github.com/gofiber/fiber/v2" "github.com/gofiber/websocket/v2" ) @@ -17,12 +18,13 @@ type Event struct { } type Hub struct { - clients map[*websocket.Conn]bool + clients map[*websocket.Conn]string // Map clients to their room IDs broadcast chan Event register chan *websocket.Conn unregister chan *websocket.Conn hubRooms map[string]*HubRoom - mu sync.Mutex + mu sync.RWMutex + natsConn *nats_connector.NatsConnection } type HubRoom struct { @@ -66,59 +68,16 @@ type BaseStation struct { func newHub() *Hub { return &Hub{ - clients: make(map[*websocket.Conn]bool), + clients: make(map[*websocket.Conn]string), broadcast: make(chan Event), register: make(chan *websocket.Conn), unregister: make(chan *websocket.Conn), hubRooms: make(map[string]*HubRoom), + natsConn: &nats_connector.NatsConnection{}, } } -func (h *Hub) run() { - for { - select { - case client := <-h.register: - h.mu.Lock() - h.clients[client] = true - // При подключении клиента отправляем все данные комнаты - for _, room := range h.hubRooms { - roomData, err := json.Marshal(room) - if err != nil { - fmt.Println("Error marshalling room data:", err) - continue - } - if err := client.WriteMessage(websocket.TextMessage, roomData); err != nil { - client.Close() - delete(h.clients, client) - } - } - h.mu.Unlock() - case client := <-h.unregister: - h.mu.Lock() - if _, ok := h.clients[client]; ok { - delete(h.clients, client) - client.Close() - } - h.mu.Unlock() - case event := <-h.broadcast: - h.mu.Lock() - for client := range h.clients { - eventData, err := json.Marshal(event) - if err != nil { - fmt.Println("Error marshalling event:", err) - continue - } - if err := client.WriteMessage(websocket.TextMessage, eventData); err != nil { - client.Close() - delete(h.clients, client) - } - } - h.mu.Unlock() - } - } -} - -func (h *Hub) handleMessage(c *websocket.Conn, msg []byte) { +func (h *Hub) handleMessage(c *websocket.Conn, msg []byte, roomId string) { var event Event err := json.Unmarshal(msg, &event) if err != nil { @@ -126,123 +85,118 @@ func (h *Hub) handleMessage(c *websocket.Conn, msg []byte) { return } - // Обработка событий в зависимости от типа объекта и типа действия switch event.Type { + case "sync": + h.handleSync(event, roomId) case "update": - h.handleUpdate(event) + h.handleUpdate(event, roomId) case "create": - h.handleCreate(event) + h.handleCreate(event, roomId) case "delete": - h.handleDelete(event) + h.handleDelete(event, roomId) default: fmt.Println("Unknown event type:", event.Type) } - // После обработки, передаем событие всем клиентам h.broadcast <- event } -func (h *Hub) handleUpdate(event Event) { +func (h *Hub) handleUpdate(event Event, roomId string) { h.mu.Lock() defer h.mu.Unlock() - switch event.ObjectType { - - case "uav": - var uav UAV - if err := mapToStruct(event.Payload, &uav); err == nil { - // Найдите и обновите UAV в hubRooms - for _, room := range h.hubRooms { + if room, exists := h.hubRooms[roomId]; exists { + switch event.ObjectType { + case "uav": + var uav UAV + if err := mapToStruct(event.Payload, &uav); err == nil { for i, existingUAV := range room.UAVs { if existingUAV.Id == uav.Id { room.UAVs[i] = uav - h.broadcast <- Event{Type: "update", ObjectType: "uav", Payload: uav} break } } + fmt.Println("Updating UAV in room", roomId, "ID:", uav.Id) } - fmt.Println("Updating UAV", uav.Id) - } - case "base_station": - var station BaseStation - if err := mapToStruct(event.Payload, &station); err == nil { - // Найдите и обновите BaseStation в hubRooms - for _, room := range h.hubRooms { + case "base_station": + var station BaseStation + if err := mapToStruct(event.Payload, &station); err == nil { for i, existingStation := range room.BaseStations { if existingStation.Id == station.Id { room.BaseStations[i] = station - h.broadcast <- Event{Type: "update", ObjectType: "base_station", Payload: station} break } } + fmt.Println("Updating BaseStation in room", roomId, "ID:", station.Id) } - fmt.Println("Updating BaseStation", station.Id) } } } -func (h *Hub) handleCreate(event Event) { +func (h *Hub) handleSync(event Event, roomId string) { + h.mu.RLock() + defer h.mu.RUnlock() + // h.stomp.SendMessage("бонжур епта!") + if room, exists := h.hubRooms[roomId]; exists { + roomData, err := json.Marshal(room) + if err != nil { + fmt.Println("Error marshalling room data:", err) + return + } + h.broadcast <- Event{Type: "sync", ObjectType: roomId, Payload: roomData} + } +} + +func (h *Hub) handleCreate(event Event, roomId string) { h.mu.Lock() defer h.mu.Unlock() - switch event.ObjectType { - case "uav": - var uav UAV - if err := mapToStruct(event.Payload, &uav); err == nil { - // Добавьте новый UAV в hubRooms - for _, room := range h.hubRooms { + if room, exists := h.hubRooms[roomId]; exists { + switch event.ObjectType { + case "uav": + var uav UAV + if err := mapToStruct(event.Payload, &uav); err == nil { room.UAVs = append(room.UAVs, uav) - h.broadcast <- Event{Type: "create", ObjectType: "uav", Payload: uav} + fmt.Println("Creating UAV in room", roomId, "ID:", uav.Id) } - fmt.Println("Creating UAV", uav.Id) - } - case "base_station": - var station BaseStation - if err := mapToStruct(event.Payload, &station); err == nil { - // Добавьте новую BaseStation в hubRooms - for _, room := range h.hubRooms { + case "base_station": + var station BaseStation + if err := mapToStruct(event.Payload, &station); err == nil { room.BaseStations = append(room.BaseStations, station) - h.broadcast <- Event{Type: "create", ObjectType: "base_station", Payload: station} + fmt.Println("Creating BaseStation in room", roomId, "ID:", station.Id) } - fmt.Println("Creating BaseStation", station.Id) } } } -func (h *Hub) handleDelete(event Event) { +func (h *Hub) handleDelete(event Event, roomId string) { h.mu.Lock() defer h.mu.Unlock() - switch event.ObjectType { - case "uav": - var uav UAV - if err := mapToStruct(event.Payload, &uav); err == nil { - // Удалите UAV из hubRooms - for _, room := range h.hubRooms { + if room, exists := h.hubRooms[roomId]; exists { + switch event.ObjectType { + case "uav": + var uav UAV + if err := mapToStruct(event.Payload, &uav); err == nil { for i, existingUAV := range room.UAVs { if existingUAV.Id == uav.Id { room.UAVs = append(room.UAVs[:i], room.UAVs[i+1:]...) - h.broadcast <- Event{Type: "delete", ObjectType: "uav", Payload: uav} break } } + fmt.Println("Deleting UAV in room", roomId, "ID:", uav.Id) } - fmt.Println("Deleting UAV", uav.Id) - } - case "base_station": - var station BaseStation - if err := mapToStruct(event.Payload, &station); err == nil { - // Удалите BaseStation из hubRooms - for _, room := range h.hubRooms { + case "base_station": + var station BaseStation + if err := mapToStruct(event.Payload, &station); err == nil { for i, existingStation := range room.BaseStations { if existingStation.Id == station.Id { room.BaseStations = append(room.BaseStations[:i], room.BaseStations[i+1:]...) - h.broadcast <- Event{Type: "delete", ObjectType: "base_station", Payload: station} break } } + fmt.Println("Deleting BaseStation in room", roomId, "ID:", station.Id) } - fmt.Println("Deleting BaseStation", station.Id) } } } @@ -254,38 +208,88 @@ func mapToStruct(input interface{}, output interface{}) error { } return json.Unmarshal(jsonData, output) } -func (h *Hub) createRoom(c *fiber.Ctx) error { - roomId := c.Params("roomId") + +func (h *Hub) run() { + for { + select { + case client := <-h.register: + h.mu.Lock() + h.clients[client] = "" // Room ID будет установлен при регистрации клиента в комнате + h.mu.Unlock() + + case client := <-h.unregister: + h.mu.Lock() + if roomId, ok := h.clients[client]; ok { + delete(h.clients, client) + client.Close() + fmt.Println("Клиент вышел из комнаты:", roomId) + } + h.mu.Unlock() + + case event := <-h.broadcast: + // Рассылаем сообщения всем клиентам в указанной комнате + h.mu.RLock() + for client, clientRoomId := range h.clients { + if clientRoomId == event.ObjectType { + eventData, err := json.Marshal(event) + if err != nil { + fmt.Println("Ошибка при сериализации события:", err) + continue + } + if err := client.WriteMessage(websocket.TextMessage, eventData); err != nil { + h.mu.Lock() + delete(h.clients, client) + client.Close() + h.mu.Unlock() + } + } + } + h.mu.RUnlock() + } + } +} + +func (h *Hub) createRoom(roomId string) { h.mu.Lock() defer h.mu.Unlock() - if _, exists := h.hubRooms[roomId]; exists { - return c.Status(fiber.StatusConflict).JSON(fiber.Map{ - "error": "Room already exists", - }) + if _, exists := h.hubRooms[roomId]; !exists { + h.hubRooms[roomId] = &HubRoom{ + Users: []User{}, + UAVs: []UAV{}, + BaseStations: []BaseStation{}, + TsUpdate: time.Now().Unix(), + } + fmt.Println("Room created successfully", roomId) } - - h.hubRooms[roomId] = &HubRoom{ - Users: []User{}, - UAVs: []UAV{}, - BaseStations: []BaseStation{}, - TsUpdate: time.Now().Unix(), - } - - return c.Status(fiber.StatusCreated).JSON(fiber.Map{ - "message": "Room created successfully", - "roomId": roomId, - }) } + func main() { app := fiber.New(fiber.Config{ AppName: "DRONE NETWORK SIMULATOR HUB - RUNNER", Prefork: false, }) hub := newHub() + err := hub.natsConn.Connect("nats://nats:4222") + if err != nil { + fmt.Printf("Error connect to nats :: %v", err) + } + // FOR TEST + hub.natsConn.Publish("/sim/q1", []byte("hello nats!")) + fmt.Println("Sended") + // FOR TEST + go hub.run() - app.Get("/ws", websocket.New(func(c *websocket.Conn) { + app.Get("/room/:roomId", websocket.New(func(c *websocket.Conn) { + roomId := c.Params("roomId") + fmt.Printf("room created: %v", roomId) + hub.createRoom(roomId) + + hub.mu.Lock() + hub.clients[c] = roomId + hub.mu.Unlock() + hub.register <- c defer func() { hub.unregister <- c }() @@ -294,16 +298,14 @@ func main() { if err != nil { break } - hub.handleMessage(c, msg) + hub.handleMessage(c, msg, roomId) } })) app.Get("/", func(c *fiber.Ctx) error { - return c.SendString("WebSocket server is running. Connect to /ws using a WebSocket client.") + return c.SendString("WebSocket server is running. Connect to /ws/{roomId} using a WebSocket client.") }) - app.Post("/rooms/:roomId", hub.createRoom) - fmt.Println("Server is running on http://localhost:8899") if err := app.Listen(":8899"); err != nil { panic(err) diff --git a/src/microservices/hub/app/cmd/models.go b/src/microservices/hub/app/cmd/models.go new file mode 100644 index 0000000..06ab7d0 --- /dev/null +++ b/src/microservices/hub/app/cmd/models.go @@ -0,0 +1 @@ +package main diff --git a/src/microservices/hub/docs/DTO.md b/src/microservices/hub/app/docs/DTO.md similarity index 100% rename from src/microservices/hub/docs/DTO.md rename to src/microservices/hub/app/docs/DTO.md diff --git a/src/microservices/hub/go.mod b/src/microservices/hub/app/go.mod similarity index 69% rename from src/microservices/hub/go.mod rename to src/microservices/hub/app/go.mod index 2d6253a..4dab3fd 100644 --- a/src/microservices/hub/go.mod +++ b/src/microservices/hub/app/go.mod @@ -5,17 +5,22 @@ go 1.23.2 require ( github.com/andybalholm/brotli v1.0.5 // indirect github.com/fasthttp/websocket v1.5.3 // indirect + github.com/go-stomp/stomp/v3 v3.1.3 // indirect github.com/gofiber/fiber/v2 v2.52.5 // indirect github.com/gofiber/websocket/v2 v2.2.1 // indirect github.com/google/uuid v1.5.0 // indirect - github.com/klauspost/compress v1.17.0 // indirect + github.com/klauspost/compress v1.17.2 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect + github.com/nats-io/nats.go v1.37.0 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.51.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect - golang.org/x/sys v0.15.0 // indirect + golang.org/x/crypto v0.18.0 // indirect + golang.org/x/sys v0.16.0 // indirect ) diff --git a/src/microservices/hub/app/go.sum b/src/microservices/hub/app/go.sum new file mode 100644 index 0000000..d7200ea --- /dev/null +++ b/src/microservices/hub/app/go.sum @@ -0,0 +1,77 @@ +github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= +github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/fasthttp/websocket v1.5.3 h1:TPpQuLwJYfd4LJPXvHDYPMFWbLjsT91n3GpWtCQtdek= +github.com/fasthttp/websocket v1.5.3/go.mod h1:46gg/UBmTU1kUaTcwQXpUxtRwG2PvIZYeA8oL6vF3Fs= +github.com/go-stomp/stomp v2.1.4+incompatible h1:D3SheUVDOz9RsjVWkoh/1iCOwD0qWjyeTZMUZ0EXg2Y= +github.com/go-stomp/stomp v2.1.4+incompatible/go.mod h1:VqCtqNZv1226A1/79yh+rMiFUcfY3R109np+7ke4n0c= +github.com/go-stomp/stomp/v3 v3.1.3 h1:5/wi+bI38O1Qkf2cc7Gjlw7N5beHMWB/BxpX+4p/MGI= +github.com/go-stomp/stomp/v3 v3.1.3/go.mod h1:ztzZej6T2W4Y6FlD+Tb5n7HQP3/O5UNQiuC169pIp10= +github.com/gofiber/fiber/v2 v2.52.5 h1:tWoP1MJQjGEe4GB5TUGOi7P2E0ZMMRx5ZTG4rT+yGMo= +github.com/gofiber/fiber/v2 v2.52.5/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ= +github.com/gofiber/websocket/v2 v2.2.1 h1:C9cjxvloojayOp9AovmpQrk8VqvVnT8Oao3+IUygH7w= +github.com/gofiber/websocket/v2 v2.2.1/go.mod h1:Ao/+nyNnX5u/hIFPuHl28a+NIkrqK7PRimyKaj4JxVU= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= +github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk= +github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1SqA= +github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g= +github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= +github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/src/microservices/hub/app/internal/nats/nats_queue.go b/src/microservices/hub/app/internal/nats/nats_queue.go new file mode 100644 index 0000000..346382b --- /dev/null +++ b/src/microservices/hub/app/internal/nats/nats_queue.go @@ -0,0 +1,35 @@ +package nats_connector + +import "github.com/nats-io/nats.go" + +// NatsConnection struct для хранения соединения NATS +type NatsConnection struct { + Conn *nats.Conn +} + +// Connect функция для подключения к NATS-серверу +func (nc *NatsConnection) Connect(url string) error { + conn, err := nats.Connect(url) + if err != nil { + return err + } + nc.Conn = conn + return nil +} + +// Scanner функция для подписки на тему и обработки сообщений +func (nc *NatsConnection) Scanner(subject string, handler func(msg *nats.Msg)) (*nats.Subscription, error) { + sub, err := nc.Conn.Subscribe(subject, handler) + if err != nil { + return nil, err + } + return sub, nil +} + +// Publish функция для отправки сообщения в указанную тему +func (nc *NatsConnection) Publish(subject string, message []byte) error { + if nc.Conn == nil { + return nats.ErrConnectionClosed + } + return nc.Conn.Publish(subject, message) +} diff --git a/src/microservices/hub/go.sum b/src/microservices/hub/go.sum deleted file mode 100644 index dc5c0ad..0000000 --- a/src/microservices/hub/go.sum +++ /dev/null @@ -1,33 +0,0 @@ -github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= -github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= -github.com/fasthttp/websocket v1.5.3 h1:TPpQuLwJYfd4LJPXvHDYPMFWbLjsT91n3GpWtCQtdek= -github.com/fasthttp/websocket v1.5.3/go.mod h1:46gg/UBmTU1kUaTcwQXpUxtRwG2PvIZYeA8oL6vF3Fs= -github.com/gofiber/fiber/v2 v2.52.5 h1:tWoP1MJQjGEe4GB5TUGOi7P2E0ZMMRx5ZTG4rT+yGMo= -github.com/gofiber/fiber/v2 v2.52.5/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ= -github.com/gofiber/websocket/v2 v2.2.1 h1:C9cjxvloojayOp9AovmpQrk8VqvVnT8Oao3+IUygH7w= -github.com/gofiber/websocket/v2 v2.2.1/go.mod h1:Ao/+nyNnX5u/hIFPuHl28a+NIkrqK7PRimyKaj4JxVU= -github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= -github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= -github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= -github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= -github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= -github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= -github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= -github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= -github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= -github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk= -github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g= -github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= -github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1SqA= -github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g= -github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= -github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= -golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/src/microservices/publisher/Dockerfile b/src/microservices/publisher/Dockerfile deleted file mode 100644 index 041f1dd..0000000 --- a/src/microservices/publisher/Dockerfile +++ /dev/null @@ -1,12 +0,0 @@ -# Build stage -FROM golang:1.22.7-alpine3.20 AS builder -WORKDIR /var/socket -COPY go.mod go.sum ./ -RUN go mod download -COPY . . -RUN go build -o main cmd/main.go - -# Run stage -FROM alpine:3.20.3 -COPY --from=builder /var/socket/main /main -CMD ["./main"] diff --git a/src/microservices/publisher/cmd/main.go b/src/microservices/publisher/cmd/main.go deleted file mode 100644 index 07ad7fa..0000000 --- a/src/microservices/publisher/cmd/main.go +++ /dev/null @@ -1,144 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "net/http" - "os" - "sync" - "time" - - "github.com/gorilla/websocket" -) - -var ( - clients = make(map[int]*websocket.Conn) // Список активных клиентов - events = make(map[int]string) // Локальная карта для хранения сообщений - mutex sync.Mutex // Мьютекс для управления конкурентным доступом к карте - upgrader = websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { return true }, - } -) - -func main() { - http.HandleFunc("/storeEvent", handleStoreEvent) - http.HandleFunc("/ws/", handleWebSocket) - http.HandleFunc("/ws/publisher", handleWebSocketPublisher) - - // Запуск горутины для обработки сообщений каждую секунду - go processEvents() - fmt.Printf("Server started on %v \n", os.Getenv("SOCKET_BASE_ADDRESS")) - http.ListenAndServe(fmt.Sprintf("0.0.0.0:%v", os.Getenv("SOCKET_BASE_ADDRESS")), nil) -} - -// Обработчик для сохранения событий -func handleStoreEvent(w http.ResponseWriter, r *http.Request) { - clientID := r.URL.Query().Get("client_id") - message := r.URL.Query().Get("message") - - if clientID == "" || message == "" { - http.Error(w, "client_id and message are required", http.StatusBadRequest) - return - } - - var id int - _, err := fmt.Sscanf(clientID, "%d", &id) - if err != nil { - http.Error(w, "Invalid client_id", http.StatusBadRequest) - return - } - - mutex.Lock() - events[id] = message - mutex.Unlock() - - w.WriteHeader(http.StatusOK) -} - -// Обработчик WebSocket соединений -func handleWebSocket(w http.ResponseWriter, r *http.Request) { - clientIDStr := r.URL.Path[len("/ws/"):] - var clientID int - _, err := fmt.Sscanf(clientIDStr, "%d", &clientID) - if err != nil { - http.Error(w, "Invalid client ID", http.StatusBadRequest) - return - } - - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - http.Error(w, "Failed to upgrade to WebSocket", http.StatusInternalServerError) - return - } - - mutex.Lock() - clients[clientID] = conn - mutex.Unlock() -} - -// Обработчик WebSocket Publisher -func handleWebSocketPublisher(w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - http.Error(w, "Failed to upgrade to WebSocket", http.StatusInternalServerError) - return - } - - defer conn.Close() - - for { - // Читаем сообщение от publisher - _, message, err := conn.ReadMessage() - if err != nil { - fmt.Printf("Error reading message: %v\n", err) - break - } - - // Парсим сообщение для получения списка клиентов и json-строки - var payload struct { - Clients []int `json:"clients"` - Json string `json:"json"` - } - err = json.Unmarshal(message, &payload) - if err != nil { - fmt.Printf("Error unmarshalling message: %v\n", err) - continue - } - - // Публикуем сообщение только указанным клиентам - mutex.Lock() - for _, id := range payload.Clients { - if clientConn, ok := clients[id]; ok { - err = clientConn.WriteMessage(websocket.TextMessage, []byte(payload.Json)) - if err != nil { - fmt.Printf("Error sending message to client %d: %v\n", id, err) - clientConn.Close() - delete(clients, id) - } - } - } - mutex.Unlock() - } -} - -// Функция для обработки событий и отправки сообщений подключенным клиентам -func processEvents() { - for { - time.Sleep(1 * time.Second) - - mutex.Lock() - for id, message := range events { - if conn, ok := clients[id]; ok { - err := conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("Client %d: %s", id, message))) - if err != nil { - fmt.Printf("Error sending message to client %d: %v\n", id, err) - conn.Close() - delete(clients, id) - } - // Удаляем событие после отправки - delete(events, id) - } - } - mutex.Unlock() - } -} diff --git a/src/microservices/publisher/go.mod b/src/microservices/publisher/go.mod deleted file mode 100644 index b5e83fb..0000000 --- a/src/microservices/publisher/go.mod +++ /dev/null @@ -1,5 +0,0 @@ -module moxitech/socket - -go 1.22.7 - -require github.com/gorilla/websocket v1.5.3 // indirect diff --git a/src/microservices/publisher/go.sum b/src/microservices/publisher/go.sum deleted file mode 100644 index 25a9fc4..0000000 --- a/src/microservices/publisher/go.sum +++ /dev/null @@ -1,2 +0,0 @@ -github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= -github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/src/microservices/query/cmd/main.go b/src/microservices/query/cmd/main.go new file mode 100644 index 0000000..2e22ced --- /dev/null +++ b/src/microservices/query/cmd/main.go @@ -0,0 +1,16 @@ +package main + +import "encoding/json" + +type QueryElement struct { + // Иницииатор/участники комнаты, кому переслать симуляцию + Initiators []int + SimulationJson json.RawMessage +} + +var Query []QueryElement = make([]QueryElement, 0) + +// Поддерживает websocket связь с сервисом +func main() { + +} diff --git a/src/microservices/query/go.mod b/src/microservices/query/go.mod new file mode 100644 index 0000000..e6f5444 --- /dev/null +++ b/src/microservices/query/go.mod @@ -0,0 +1,12 @@ +module git.moxitech.ru/moxitech/usn/query + +go 1.23.2 + +require ( + github.com/klauspost/compress v1.17.2 // indirect + github.com/nats-io/nats.go v1.37.0 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.18.0 // indirect + golang.org/x/sys v0.16.0 // indirect +) diff --git a/src/microservices/query/go.sum b/src/microservices/query/go.sum new file mode 100644 index 0000000..ad86381 --- /dev/null +++ b/src/microservices/query/go.sum @@ -0,0 +1,12 @@ +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/src/microservices/query/internal/nats/nats_queue.go b/src/microservices/query/internal/nats/nats_queue.go new file mode 100644 index 0000000..346382b --- /dev/null +++ b/src/microservices/query/internal/nats/nats_queue.go @@ -0,0 +1,35 @@ +package nats_connector + +import "github.com/nats-io/nats.go" + +// NatsConnection struct для хранения соединения NATS +type NatsConnection struct { + Conn *nats.Conn +} + +// Connect функция для подключения к NATS-серверу +func (nc *NatsConnection) Connect(url string) error { + conn, err := nats.Connect(url) + if err != nil { + return err + } + nc.Conn = conn + return nil +} + +// Scanner функция для подписки на тему и обработки сообщений +func (nc *NatsConnection) Scanner(subject string, handler func(msg *nats.Msg)) (*nats.Subscription, error) { + sub, err := nc.Conn.Subscribe(subject, handler) + if err != nil { + return nil, err + } + return sub, nil +} + +// Publish функция для отправки сообщения в указанную тему +func (nc *NatsConnection) Publish(subject string, message []byte) error { + if nc.Conn == nil { + return nats.ErrConnectionClosed + } + return nc.Conn.Publish(subject, message) +}