Децентрализованный персональный сервер данных с поддержкой EventLog, CRDT, GraphQL и HTTP-MQTT туннелирования.
Проект представляет собой комплексную систему для управления персональными данными в распределенной среде. Основные возможности:
- 🔐 Децентрализованная идентификация - поддержка DID (Decentralized Identifiers)
- 📝 EventLog с CRDT - append-only лог событий с автоматическим разрешением конфликтов
- 🗄️ Коллекции и схемы - типизированное хранилище документов с валидацией
- 🔍 GraphQL API - гибкий API для работы с данными
- 🔄 MQTT синхронизация - автоматическая синхронизация между узлами
- 🚇 HTTP-MQTT туннель - туннелирование HTTP трафика через MQTT (как ngrok)
- 🔐 Шифрование и подписи - криптографическая защита данных
- 📊 Индексирование - поддержка различных типов индексов (exact, prefix, range, fulltext)
┌─────────────────────────────────────────────────────────────────┐
│ PDS Server │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ GraphQL │ │ Collection │ │ EventLog │ │
│ │ Handler │───▶│ Engine │───▶│ (CRDT) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Schema │ │ Identity │ │ KeyManager │ │
│ │ Registry │ │ Manager │ │ (Encryption) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ BadgerDB Storage Layer │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
└─────────────────────────────┼───────────────────────────────────┘
│
┌───────▼────────┐
│ MQTT Sync │
│ (Optional) │
└────────────────┘
EventLog - это append-only лог событий с поддержкой CRDT (Conflict-free Replicated Data Type):
- TID (Timestamp ID) - уникальный идентификатор события, включающий timestamp и clockID
- DAG структура - события организованы в направленный ациклический граф
- Подписи - каждое событие подписано приватным ключом автора
- Разрешение конфликтов - LastWriteWins, DIDPriority или custom
- Синхронизация - автоматическая репликация через MQTT
Event {
EventTID: "2zef5s3dvr2dm" // Уникальный TID
AuthorDID: "did:key:z6MkpTH..." // DID автора
Collection: "posts" // Коллекция
EventType: "create" // Тип операции
Parents: ["2zef5s3dvr2dk"] // Родительские события (DAG)
Payload: {...} // Данные документа
Signature: [...] // Подпись события
}Туннелирование HTTP трафика через MQTT брокер:
┌─────────────┐ HTTP ┌──────────────┐ MQTT ┌───────────────┐
│ HTTP Client │──────────────▶│ Relay Server │◀─────────────▶│ MQTT Broker │
│ (curl) │◀──────────────│ (Reverse │ │ (comqtt) │
└─────────────┘ │ Proxy) │ └───────────────┘
└──────────────┘ ▲
│
┌──────────────┐ MQTT │
│Tunnel Client │◀────────────────────┘
│(Forward to │
│ localhost) │
└──────┬───────┘
│ HTTP
┌──────▼───────┐
│ Local HTTP │
│ Server │
└──────────────┘
- Go 1.25.3 или выше
- Make (для удобной сборки)
# Клонируйте репозиторий
git clone https://github.com/gloudx/mqtt.git
cd mqtt
# Установите зависимости
go mod download# Собрать все компоненты
make build
# Или собрать отдельные компоненты:
make build-pds # PDS сервер
make build-server # HTTP-MQTT relay сервер
make build-client # HTTP-MQTT туннель клиент
make build-eventlog-tool # Утилита для работы с EventLogПосле сборки исполняемые файлы будут в директории bin/:
bin/pds- PDS серверbin/server- Relay сервер (MQTT брокер + HTTP reverse proxy)bin/client- Tunnel клиентbin/eventlog-tool- CLI утилита для EventLog
./bin/pdsИли через make:
make run-pds./bin/pds \
--node-id=node1 \
--data=./data/pds \
--http-port=8080./bin/pds \
--data=./data/pds \
--http-port=8080 \
--mqtt-broker=tcp://localhost:1883 \
--mqtt-enableИли через make:
make run-pds-mqtt--config- путь к JSON файлу конфигурации--node-id- уникальный ID узла--data- директория для хранения данных (по умолчанию:./data/pds)--http-port- порт HTTP сервера (по умолчанию:8080)--mqtt-broker- URL MQTT брокера (например:tcp://localhost:1883)--mqtt-enable- включить MQTT синхронизацию--debug-export-eventlog- экспортировать eventlog при запуске (по умолчанию:true)
После запуска PDS сервера GraphQL API доступен по адресу: http://localhost:8080/graphql
Создание документа:
mutation {
createPost(input: {
title: "Hello World"
content: "My first post"
published: true
}) {
id
title
content
createdAt
}
}Запрос документов:
query {
posts(limit: 10) {
id
title
content
createdAt
}
}Запрос с фильтрацией:
query {
posts(where: { published: true }, limit: 5) {
id
title
}
}./bin/server
# или
make run-serverСервер запустит:
- MQTT брокер на порту
1883 - HTTP reverse proxy на порту
8081
./bin/client -client-id=my-app -target=http://localhost:3000Параметры:
-client-id- уникальный ID клиента-target- URL локального HTTP сервера-broker- URL MQTT брокера (по умолчанию:tcp://localhost:1883)
# Ваш локальный сервер теперь доступен через:
curl http://localhost:8081/my-app/api/hello
# POST запрос
curl -X POST http://localhost:8081/my-app/api/data \
-H "Content-Type: application/json" \
-d '{"key": "value"}'Управление децентрализованными идентификаторами (DID):
// Создание нового идентификатора
manager, err := identity.NewIdentityManager(privKey, pubKey)
did := manager.GetDID()
// did:key:z6MkpTHR8VNsBxYAAWHut2Geadd9jSwuBV8xRoAnwWsdvktH
// Подпись данных
keyPair := manager.GetKeyPair()
signature, err := keyPair.Sign(data)
// Верификация
valid, err := keyPair.Verify(data, signature)Возможности:
- Генерация DID (метод
did:key) - Управление ключевыми парами (Ed25519)
- Подпись и верификация данных
- Персистентность (сохранение/загрузка)
- Резолюция DID документов
См. подробнее: internal/identity/README.md
Распределенный лог событий с CRDT:
config := eventlog.EventLogConfig{
OwnerDID: ownerDID,
KeyPair: keyPair,
ClockID: 0,
Storage: storage,
Synchronizer: mqttSync,
ConflictResolution: eventlog.LastWriteWins,
}
eventLog, err := eventlog.NewEventLog(config)
// Добавление события
event, err := eventLog.Append(ctx, "posts", "create", payload, nil)
// Получение всех событий
events, err := eventLog.GetAllEvents(ctx)
// Экспорт в JSON
err = eventLog.ExportToFile(ctx, "eventlog.json")Возможности:
- Append-only лог с DAG структурой
- Криптографические подписи (Ed25519)
- TID (Timestamp ID) для глобальной упорядоченности
- Разрешение конфликтов (LWW, DID Priority, Custom)
- MQTT синхронизация между узлами
- Экспорт/импорт в JSON/NDJSON
- Garbage collection устаревших событий
Стратегии разрешения конфликтов:
LastWriteWins- побеждает событие с более поздним timestampDIDPriority- приоритет по лексикографическому порядку DIDCustom- пользовательская функция
См. подробнее: tmp/eventlog/README.md
Типизированное хранилище документов:
// Создание коллекции
collection := collection.NewCollection(db, ownerDID, keyPair, schemaDef)
// Вставка документа
doc := collection.Document{
"title": "Hello",
"content": "World",
}
event, err := collection.Insert(ctx, doc)
// Запрос документов
query := collection.Query{
Filter: map[string]interface{}{"published": true},
Limit: 10,
}
result, err := collection.Find(ctx, query)
// Обновление
changes := collection.Document{"status": "published"}
event, err := collection.Update(ctx, docID, changes)Возможности:
- Валидация по схеме
- Индексы (exact, prefix, range, fulltext)
- CRUD операции через EventLog
- Автоматическое разрешение конфликтов
- Связь с EventLog для репликации
См. подробнее: internal/collection/README.md
Определение и валидация схем данных:
type SchemaDefinition struct {
Name string
Version int
Fields []FieldDef
Indexes []IndexDef
}
type FieldDef struct {
Name string
Type FieldType // String, Int, Float, Boolean, ID, DateTime, JSON
Nullable bool
IsList bool
CRDTType string // Опционально для CRDT типов
}Поддерживаемые типы:
- Примитивы: String, Int, Float, Boolean, ID
- Специальные: DateTime, JSON
- Списки:
IsList: true - Nullable поля
Типы индексов:
exact- точное совпадениеprefix- префиксный поискrange- диапазонный поискfulltext- полнотекстовый поиск
GraphQL API для работы с коллекциями:
// Создание resolver
resolver := graphql.NewResolver(engine)
// Создание handler
handler := graphql.NewHandler(resolver)
// Настройка HTTP сервера
http.Handle("/graphql", handler)Возможности:
- Автоматическая генерация схемы из коллекций
- Query операции (чтение, фильтрация, пагинация)
- Mutation операции (создание, обновление, удаление)
- Построение сложных фильтров
- Обработка ошибок
Управление ключами шифрования:
// Создание менеджера
keyManager := keymanager.NewKeyManager()
// Генерация ключа
keyInfo, err := keyManager.GenerateKey("my-key")
// Шифрование данных
encrypted, err := keyManager.Encrypt("my-key", data)
// Расшифровка
decrypted, err := keyManager.Decrypt("my-key", encrypted)Возможности:
- Генерация симметричных ключей (AES-256-GCM)
- Шифрование/расшифровка данных
- Ротация ключей
- Персистентность ключей
- Метаданные шифрования
Timestamp ID - глобально упорядоченные идентификаторы:
// Создание TID
tid := tid.NewTIDNow(0) // "2zef5s3dvr2dm"
// Извлечение timestamp
timestamp := tid.Time()
// Извлечение clockID
clockID := tid.ClockID()
// Парсинг
parsedTID, err := tid.ParseTID("2zef5s3dvr2dm")Структура TID:
- 13 символов base32 (сортируемый алфавит)
- Встроенный timestamp (микросекунды)
- ClockID (0-1023) для разрешения конфликтов
- Лексикографически сортируемые
HTTP-MQTT туннелирование:
// Создание reverse proxy
proxy, err := tunnel.NewHTTPReverseProxy(brokerURL, clientID)
// Создание tunnel клиента
client, err := tunnel.NewTunnelClient(brokerURL, clientID, targetURL)Возможности:
- Туннелирование HTTP через MQTT
- Request/Response correlation
- Поддержка всех HTTP методов
- Передача заголовков и тела запроса
- Таймауты и retry логика
mqtt/
├── cmd/ # Точки входа приложений
│ ├── pds/ # PDS сервер
│ │ └── main.go
│ └── relay/ # HTTP-MQTT relay сервер
│ └── main.go
├── internal/ # Внутренние пакеты
│ ├── collection/ # Коллекции документов
│ │ ├── collection.go # Основная логика коллекций
│ │ ├── engine.go # Движок коллекций
│ │ ├── index_*.go # Различные типы индексов
│ │ ├── storage.go # Слой хранения
│ │ └── README.md
│ ├── event/ # События
│ │ ├── event.go # Структура Event
│ │ └── signer.go # Подпись событий
│ ├── eventlog/ # Распределенный EventLog
│ │ ├── eventlog.go # Основная логика
│ │ ├── storage.go # BadgerDB storage
│ │ └── synchronizer.go # MQTT синхронизация
│ ├── graphql/ # GraphQL API
│ │ ├── handler.go # HTTP handler
│ │ ├── resolver.go # Query/Mutation resolver
│ │ ├── schema_builder.go # Генерация схемы
│ │ └── filter_builder.go # Построение фильтров
│ ├── identity/ # Управление DID
│ │ ├── identity.go # DID структуры
│ │ ├── keypair.go # Криптографические ключи
│ │ ├── manager.go # Identity manager
│ │ └── README.md
│ ├── keymanager/ # Управление ключами шифрования
│ │ ├── keymgr.go # Менеджер ключей
│ │ └── encryption.go # Шифрование/расшифровка
│ ├── pds/ # PDS сервер
│ │ ├── server.go # HTTP сервер
│ │ └── config.go # Конфигурация
│ ├── permissions/ # Контроль доступа
│ │ └── policy.go # Политики видимости
│ ├── schema/ # Схемы данных
│ │ ├── parser.go # Парсинг схем
│ │ ├── registry.go # Реестр схем
│ │ └── types.go # Типы схем
│ ├── tid/ # Timestamp ID
│ │ └── tid.go # Генерация и парсинг TID
│ └── tunnel/ # HTTP-MQTT туннель
│ └── tunnel.go # Клиент и сервер
├── go.mod # Go модули
├── go.sum
├── Makefile # Команды сборки
├── README.md # Этот файл
└── QUICKSTART.md # Быстрый старт
- Ed25519 - для подписи событий и DID
- AES-256-GCM - для шифрования данных
- SHA-256 - для хэширования и Merkle DAG
type AccessControl struct {
Visibility Visibility // public, private, shared
SharedWith []string // DIDs для shared
}public- доступно всемprivate- только авторshared- явно указанные участники
Каждое событие подписывается приватным ключом автора:
event := Event{
EventTID: tid,
AuthorDID: "did:key:z6Mk...",
Payload: data,
...
}
signature, _ := signer.Sign(event)
event.Signature = signature# Запуск тестов
go test ./...
# Тесты с coverage
go test -cover ./...
# Verbose вывод
go test -v ./internal/eventlogmake build # Собрать все компоненты
make build-pds # Собрать только PDS
make build-server # Собрать только relay сервер
make build-client # Собрать только tunnel клиент
make run-pds # Запустить PDS
make run-pds-mqtt # Запустить PDS с MQTT
make run-server # Запустить relay сервер
make clean # Очистить bin/
make fmt # Форматировать код
make vet # Проверить код
make test # Запустить тесты- Определите схему в GraphQL SDL или Go:
schema := &schema.SchemaDefinition{
Name: "posts",
Version: 1,
Fields: []schema.FieldDef{
{Name: "title", Type: schema.TypeString, Nullable: false},
{Name: "content", Type: schema.TypeString, Nullable: false},
{Name: "published", Type: schema.TypeBoolean, Nullable: false},
},
Indexes: []schema.IndexDef{
{Field: "title", Type: schema.IndexFullText},
},
}- Зарегистрируйте схему:
registry.RegisterSchema(schema)- GraphQL API будет сгенерирован автоматически!
- Компрессия: Snappy (опционально)
- Max table size: 4MB (настраиваемо)
- GC interval: 10 минут (настраиваемо)
- Batch operations: До 100 событий
- QoS: 1 (at least once delivery)
- Keep-alive: 60 секунд
- Auto-reconnect: Да
- Батчинг: Опционально для больших объемов
- WebSocket поддержка для real-time обновлений
- libp2p интеграция для P2P синхронизации
- IPFS/IPLD интеграция для хранения
- Улучшенная система разрешения конфликтов
- Поддержка дополнительных CRDT типов
- Миграции схем
- Расширенные возможности GraphQL (subscriptions)
- Metrics и monitoring
- Распределенные транзакции
- QUICKSTART.md - Быстрый старт
- internal/identity/README.md - Identity система
- internal/collection/README.md - Коллекции
- tmp/eventlog/README.md - EventLog документация
- internal/collection/CHANGELOG.md - История изменений
Приветствуются pull requests и issues!
MIT License
Основные зависимости:
- github.com/wind-c/comqtt/v2 - MQTT брокер
- github.com/dgraph-io/badger/v4 - Key-Value хранилище
- github.com/eclipse/paho.mqtt.golang - MQTT клиент
- github.com/graphql-go/graphql - GraphQL сервер
- github.com/ethereum/go-ethereum - Криптография (Ed25519)
- github.com/libp2p/go-libp2p - P2P сеть (опционально)
- github.com/ipfs/boxo - IPFS интеграция (опционально)
Полный список см. в go.mod
Автор: gloudx
Репозиторий: https://github.com/gloudx/mqtt
Версия: 1.0.0 (в разработке)