Skip to content

gloudx/mqtt

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

27 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Distributed Personal Data Server (PDS)

Децентрализованный персональный сервер данных с поддержкой EventLog, CRDT, GraphQL и HTTP-MQTT туннелирования.

📋 Описание

Проект представляет собой комплексную систему для управления персональными данными в распределенной среде. Основные возможности:

  • 🔐 Децентрализованная идентификация - поддержка DID (Decentralized Identifiers)
  • 📝 EventLog с CRDT - append-only лог событий с автоматическим разрешением конфликтов
  • 🗄️ Коллекции и схемы - типизированное хранилище документов с валидацией
  • 🔍 GraphQL API - гибкий API для работы с данными
  • 🔄 MQTT синхронизация - автоматическая синхронизация между узлами
  • 🚇 HTTP-MQTT туннель - туннелирование HTTP трафика через MQTT (как ngrok)
  • 🔐 Шифрование и подписи - криптографическая защита данных
  • 📊 Индексирование - поддержка различных типов индексов (exact, prefix, range, fulltext)

🏗️ Архитектура

Основные компоненты

┌─────────────────────────────────────────────────────────────────┐
│                         PDS Server                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐     │
│  │   GraphQL    │    │  Collection  │    │   EventLog   │     │
│  │   Handler    │───▶│    Engine    │───▶│    (CRDT)    │     │
│  └──────────────┘    └──────────────┘    └──────────────┘     │
│                             │                    │              │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐     │
│  │   Schema     │    │   Identity   │    │  KeyManager  │     │
│  │   Registry   │    │   Manager    │    │ (Encryption) │     │
│  └──────────────┘    └──────────────┘    └──────────────┘     │
│                             │                    │              │
│  ┌──────────────────────────────────────────────────────┐      │
│  │          BadgerDB Storage Layer                      │      │
│  └──────────────────────────────────────────────────────┘      │
│                             │                                   │
└─────────────────────────────┼───────────────────────────────────┘
                              │
                      ┌───────▼────────┐
                      │ MQTT Sync      │
                      │ (Optional)     │
                      └────────────────┘

EventLog (Децентрализованный лог событий)

EventLog - это append-only лог событий с поддержкой CRDT (Conflict-free Replicated Data Type):

  • TID (Timestamp ID) - уникальный идентификатор события, включающий timestamp и clockID
  • DAG структура - события организованы в направленный ациклический граф
  • Подписи - каждое событие подписано приватным ключом автора
  • Разрешение конфликтов - LastWriteWins, DIDPriority или custom
  • Синхронизация - автоматическая репликация через MQTT
Event {
    EventTID:     "2zef5s3dvr2dm"           // Уникальный TID
    AuthorDID:    "did:key:z6MkpTH..."     // DID автора
    Collection:   "posts"                   // Коллекция
    EventType:    "create"                  // Тип операции
    Parents:      ["2zef5s3dvr2dk"]        // Родительские события (DAG)
    Payload:      {...}                     // Данные документа
    Signature:    [...]                     // Подпись события
}

HTTP-MQTT Tunnel (Relay)

Туннелирование HTTP трафика через MQTT брокер:

┌─────────────┐     HTTP      ┌──────────────┐     MQTT      ┌───────────────┐
│ HTTP Client │──────────────▶│ Relay Server │◀─────────────▶│ MQTT Broker   │
│   (curl)    │◀──────────────│ (Reverse     │               │  (comqtt)     │
└─────────────┘               │  Proxy)      │               └───────────────┘
                              └──────────────┘                      ▲
                                                                    │
                              ┌──────────────┐     MQTT            │
                              │Tunnel Client │◀────────────────────┘
                              │(Forward to   │
                              │ localhost)   │
                              └──────┬───────┘
                                     │ HTTP
                              ┌──────▼───────┐
                              │ Local HTTP   │
                              │   Server     │
                              └──────────────┘

🚀 Установка и быстрый старт

Требования

  • Go 1.25.3 или выше
  • Make (для удобной сборки)

Установка зависимостей

# Клонируйте репозиторий
git clone https://github.com/gloudx/mqtt.git
cd mqtt

# Установите зависимости
go mod download

Сборка

# Собрать все компоненты
make build

# Или собрать отдельные компоненты:
make build-pds           # PDS сервер
make build-server        # HTTP-MQTT relay сервер
make build-client        # HTTP-MQTT туннель клиент
make build-eventlog-tool # Утилита для работы с EventLog

После сборки исполняемые файлы будут в директории bin/:

  • bin/pds - PDS сервер
  • bin/server - Relay сервер (MQTT брокер + HTTP reverse proxy)
  • bin/client - Tunnel клиент
  • bin/eventlog-tool - CLI утилита для EventLog

📚 Использование

1. Запуск PDS сервера

Базовый запуск

./bin/pds

Или через make:

make run-pds

С параметрами

./bin/pds \
  --node-id=node1 \
  --data=./data/pds \
  --http-port=8080

С MQTT синхронизацией

./bin/pds \
  --data=./data/pds \
  --http-port=8080 \
  --mqtt-broker=tcp://localhost:1883 \
  --mqtt-enable

Или через make:

make run-pds-mqtt

Доступные параметры

  • --config - путь к JSON файлу конфигурации
  • --node-id - уникальный ID узла
  • --data - директория для хранения данных (по умолчанию: ./data/pds)
  • --http-port - порт HTTP сервера (по умолчанию: 8080)
  • --mqtt-broker - URL MQTT брокера (например: tcp://localhost:1883)
  • --mqtt-enable - включить MQTT синхронизацию
  • --debug-export-eventlog - экспортировать eventlog при запуске (по умолчанию: true)

2. Работа с GraphQL API

После запуска PDS сервера GraphQL API доступен по адресу: http://localhost:8080/graphql

Пример запросов

Создание документа:

mutation {
  createPost(input: {
    title: "Hello World"
    content: "My first post"
    published: true
  }) {
    id
    title
    content
    createdAt
  }
}

Запрос документов:

query {
  posts(limit: 10) {
    id
    title
    content
    createdAt
  }
}

Запрос с фильтрацией:

query {
  posts(where: { published: true }, limit: 5) {
    id
    title
  }
}

3. HTTP-MQTT Tunnel

Запуск relay сервера

./bin/server
# или
make run-server

Сервер запустит:

  • MQTT брокер на порту 1883
  • HTTP reverse proxy на порту 8081

Запуск tunnel клиента

./bin/client -client-id=my-app -target=http://localhost:3000

Параметры:

  • -client-id - уникальный ID клиента
  • -target - URL локального HTTP сервера
  • -broker - URL MQTT брокера (по умолчанию: tcp://localhost:1883)

Тестирование туннеля

# Ваш локальный сервер теперь доступен через:
curl http://localhost:8081/my-app/api/hello

# POST запрос
curl -X POST http://localhost:8081/my-app/api/data \
  -H "Content-Type: application/json" \
  -d '{"key": "value"}'

🔧 Внутренние компоненты

Identity (internal/identity)

Управление децентрализованными идентификаторами (DID):

// Создание нового идентификатора
manager, err := identity.NewIdentityManager(privKey, pubKey)
did := manager.GetDID()
// did:key:z6MkpTHR8VNsBxYAAWHut2Geadd9jSwuBV8xRoAnwWsdvktH

// Подпись данных
keyPair := manager.GetKeyPair()
signature, err := keyPair.Sign(data)

// Верификация
valid, err := keyPair.Verify(data, signature)

Возможности:

  • Генерация DID (метод did:key)
  • Управление ключевыми парами (Ed25519)
  • Подпись и верификация данных
  • Персистентность (сохранение/загрузка)
  • Резолюция DID документов

См. подробнее: internal/identity/README.md

EventLog (internal/eventlog)

Распределенный лог событий с CRDT:

config := eventlog.EventLogConfig{
    OwnerDID:           ownerDID,
    KeyPair:            keyPair,
    ClockID:            0,
    Storage:            storage,
    Synchronizer:       mqttSync,
    ConflictResolution: eventlog.LastWriteWins,
}
eventLog, err := eventlog.NewEventLog(config)

// Добавление события
event, err := eventLog.Append(ctx, "posts", "create", payload, nil)

// Получение всех событий
events, err := eventLog.GetAllEvents(ctx)

// Экспорт в JSON
err = eventLog.ExportToFile(ctx, "eventlog.json")

Возможности:

  • Append-only лог с DAG структурой
  • Криптографические подписи (Ed25519)
  • TID (Timestamp ID) для глобальной упорядоченности
  • Разрешение конфликтов (LWW, DID Priority, Custom)
  • MQTT синхронизация между узлами
  • Экспорт/импорт в JSON/NDJSON
  • Garbage collection устаревших событий

Стратегии разрешения конфликтов:

  • LastWriteWins - побеждает событие с более поздним timestamp
  • DIDPriority - приоритет по лексикографическому порядку DID
  • Custom - пользовательская функция

См. подробнее: tmp/eventlog/README.md

Collection (internal/collection)

Типизированное хранилище документов:

// Создание коллекции
collection := collection.NewCollection(db, ownerDID, keyPair, schemaDef)

// Вставка документа
doc := collection.Document{
    "title":   "Hello",
    "content": "World",
}
event, err := collection.Insert(ctx, doc)

// Запрос документов
query := collection.Query{
    Filter: map[string]interface{}{"published": true},
    Limit:  10,
}
result, err := collection.Find(ctx, query)

// Обновление
changes := collection.Document{"status": "published"}
event, err := collection.Update(ctx, docID, changes)

Возможности:

  • Валидация по схеме
  • Индексы (exact, prefix, range, fulltext)
  • CRUD операции через EventLog
  • Автоматическое разрешение конфликтов
  • Связь с EventLog для репликации

См. подробнее: internal/collection/README.md

Schema (internal/schema)

Определение и валидация схем данных:

type SchemaDefinition struct {
    Name     string
    Version  int
    Fields   []FieldDef
    Indexes  []IndexDef
}

type FieldDef struct {
    Name     string
    Type     FieldType  // String, Int, Float, Boolean, ID, DateTime, JSON
    Nullable bool
    IsList   bool
    CRDTType string     // Опционально для CRDT типов
}

Поддерживаемые типы:

  • Примитивы: String, Int, Float, Boolean, ID
  • Специальные: DateTime, JSON
  • Списки: IsList: true
  • Nullable поля

Типы индексов:

  • exact - точное совпадение
  • prefix - префиксный поиск
  • range - диапазонный поиск
  • fulltext - полнотекстовый поиск

GraphQL (internal/graphql)

GraphQL API для работы с коллекциями:

// Создание resolver
resolver := graphql.NewResolver(engine)

// Создание handler
handler := graphql.NewHandler(resolver)

// Настройка HTTP сервера
http.Handle("/graphql", handler)

Возможности:

  • Автоматическая генерация схемы из коллекций
  • Query операции (чтение, фильтрация, пагинация)
  • Mutation операции (создание, обновление, удаление)
  • Построение сложных фильтров
  • Обработка ошибок

KeyManager (internal/keymanager)

Управление ключами шифрования:

// Создание менеджера
keyManager := keymanager.NewKeyManager()

// Генерация ключа
keyInfo, err := keyManager.GenerateKey("my-key")

// Шифрование данных
encrypted, err := keyManager.Encrypt("my-key", data)

// Расшифровка
decrypted, err := keyManager.Decrypt("my-key", encrypted)

Возможности:

  • Генерация симметричных ключей (AES-256-GCM)
  • Шифрование/расшифровка данных
  • Ротация ключей
  • Персистентность ключей
  • Метаданные шифрования

TID (internal/tid)

Timestamp ID - глобально упорядоченные идентификаторы:

// Создание TID
tid := tid.NewTIDNow(0)  // "2zef5s3dvr2dm"

// Извлечение timestamp
timestamp := tid.Time()

// Извлечение clockID
clockID := tid.ClockID()

// Парсинг
parsedTID, err := tid.ParseTID("2zef5s3dvr2dm")

Структура TID:

  • 13 символов base32 (сортируемый алфавит)
  • Встроенный timestamp (микросекунды)
  • ClockID (0-1023) для разрешения конфликтов
  • Лексикографически сортируемые

Tunnel (internal/tunnel)

HTTP-MQTT туннелирование:

// Создание reverse proxy
proxy, err := tunnel.NewHTTPReverseProxy(brokerURL, clientID)

// Создание tunnel клиента
client, err := tunnel.NewTunnelClient(brokerURL, clientID, targetURL)

Возможности:

  • Туннелирование HTTP через MQTT
  • Request/Response correlation
  • Поддержка всех HTTP методов
  • Передача заголовков и тела запроса
  • Таймауты и retry логика

📁 Структура проекта

mqtt/
├── cmd/                          # Точки входа приложений
│   ├── pds/                      # PDS сервер
│   │   └── main.go
│   └── relay/                    # HTTP-MQTT relay сервер
│       └── main.go
├── internal/                     # Внутренние пакеты
│   ├── collection/               # Коллекции документов
│   │   ├── collection.go         # Основная логика коллекций
│   │   ├── engine.go             # Движок коллекций
│   │   ├── index_*.go            # Различные типы индексов
│   │   ├── storage.go            # Слой хранения
│   │   └── README.md
│   ├── event/                    # События
│   │   ├── event.go              # Структура Event
│   │   └── signer.go             # Подпись событий
│   ├── eventlog/                 # Распределенный EventLog
│   │   ├── eventlog.go           # Основная логика
│   │   ├── storage.go            # BadgerDB storage
│   │   └── synchronizer.go       # MQTT синхронизация
│   ├── graphql/                  # GraphQL API
│   │   ├── handler.go            # HTTP handler
│   │   ├── resolver.go           # Query/Mutation resolver
│   │   ├── schema_builder.go     # Генерация схемы
│   │   └── filter_builder.go     # Построение фильтров
│   ├── identity/                 # Управление DID
│   │   ├── identity.go           # DID структуры
│   │   ├── keypair.go            # Криптографические ключи
│   │   ├── manager.go            # Identity manager
│   │   └── README.md
│   ├── keymanager/               # Управление ключами шифрования
│   │   ├── keymgr.go             # Менеджер ключей
│   │   └── encryption.go         # Шифрование/расшифровка
│   ├── pds/                      # PDS сервер
│   │   ├── server.go             # HTTP сервер
│   │   └── config.go             # Конфигурация
│   ├── permissions/              # Контроль доступа
│   │   └── policy.go             # Политики видимости
│   ├── schema/                   # Схемы данных
│   │   ├── parser.go             # Парсинг схем
│   │   ├── registry.go           # Реестр схем
│   │   └── types.go              # Типы схем
│   ├── tid/                      # Timestamp ID
│   │   └── tid.go                # Генерация и парсинг TID
│   └── tunnel/                   # HTTP-MQTT туннель
│       └── tunnel.go             # Клиент и сервер
├── go.mod                        # Go модули
├── go.sum
├── Makefile                      # Команды сборки
├── README.md                     # Этот файл
└── QUICKSTART.md                 # Быстрый старт

🔐 Безопасность

Криптография

  • Ed25519 - для подписи событий и DID
  • AES-256-GCM - для шифрования данных
  • SHA-256 - для хэширования и Merkle DAG

Контроль доступа

type AccessControl struct {
    Visibility Visibility  // public, private, shared
    SharedWith []string    // DIDs для shared
}
  • public - доступно всем
  • private - только автор
  • shared - явно указанные участники

Подпись событий

Каждое событие подписывается приватным ключом автора:

event := Event{
    EventTID:   tid,
    AuthorDID:  "did:key:z6Mk...",
    Payload:    data,
    ...
}
signature, _ := signer.Sign(event)
event.Signature = signature

🧪 Тестирование

# Запуск тестов
go test ./...

# Тесты с coverage
go test -cover ./...

# Verbose вывод
go test -v ./internal/eventlog

🔨 Разработка

Makefile команды

make build              # Собрать все компоненты
make build-pds          # Собрать только PDS
make build-server       # Собрать только relay сервер
make build-client       # Собрать только tunnel клиент

make run-pds            # Запустить PDS
make run-pds-mqtt       # Запустить PDS с MQTT
make run-server         # Запустить relay сервер

make clean              # Очистить bin/
make fmt                # Форматировать код
make vet                # Проверить код
make test               # Запустить тесты

Добавление новой коллекции

  1. Определите схему в GraphQL SDL или Go:
schema := &schema.SchemaDefinition{
    Name:    "posts",
    Version: 1,
    Fields: []schema.FieldDef{
        {Name: "title", Type: schema.TypeString, Nullable: false},
        {Name: "content", Type: schema.TypeString, Nullable: false},
        {Name: "published", Type: schema.TypeBoolean, Nullable: false},
    },
    Indexes: []schema.IndexDef{
        {Field: "title", Type: schema.IndexFullText},
    },
}
  1. Зарегистрируйте схему:
registry.RegisterSchema(schema)
  1. GraphQL API будет сгенерирован автоматически!

📊 Производительность

BadgerDB Storage

  • Компрессия: Snappy (опционально)
  • Max table size: 4MB (настраиваемо)
  • GC interval: 10 минут (настраиваемо)
  • Batch operations: До 100 событий

MQTT Синхронизация

  • QoS: 1 (at least once delivery)
  • Keep-alive: 60 секунд
  • Auto-reconnect: Да
  • Батчинг: Опционально для больших объемов

🚧 Roadmap

  • WebSocket поддержка для real-time обновлений
  • libp2p интеграция для P2P синхронизации
  • IPFS/IPLD интеграция для хранения
  • Улучшенная система разрешения конфликтов
  • Поддержка дополнительных CRDT типов
  • Миграции схем
  • Расширенные возможности GraphQL (subscriptions)
  • Metrics и monitoring
  • Распределенные транзакции

📖 Дополнительная документация

🤝 Вклад

Приветствуются pull requests и issues!

📄 Лицензия

MIT License

🔗 Зависимости

Основные зависимости:

  • github.com/wind-c/comqtt/v2 - MQTT брокер
  • github.com/dgraph-io/badger/v4 - Key-Value хранилище
  • github.com/eclipse/paho.mqtt.golang - MQTT клиент
  • github.com/graphql-go/graphql - GraphQL сервер
  • github.com/ethereum/go-ethereum - Криптография (Ed25519)
  • github.com/libp2p/go-libp2p - P2P сеть (опционально)
  • github.com/ipfs/boxo - IPFS интеграция (опционально)

Полный список см. в go.mod


Автор: gloudx
Репозиторий: https://github.com/gloudx/mqtt
Версия: 1.0.0 (в разработке)

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 2

  •  
  •