A comprehensive NestJS integration framework for Temporal.io that provides enterprise-ready workflow orchestration with automatic discovery, declarative decorators, and robust monitoring capabilities.
- Overview
- Features
- Installation
- Quick Start
- Module Variants
- Configuration
- Core Concepts
- API Reference
- Examples
- Advanced Usage
- Health Monitoring
- Best Practices
- Troubleshooting
- Contributing
- License
NestJS Temporal Core bridges NestJS's powerful dependency injection system with Temporal.io's robust workflow orchestration engine. It provides a declarative approach to building distributed, fault-tolerant applications with automatic service discovery, enterprise-grade monitoring, and seamless integration.
- π Seamless Integration: Native NestJS decorators and dependency injection
- π Auto-Discovery: Automatic registration of activities via decorators
- π‘οΈ Type Safety: Full TypeScript support with comprehensive type definitions
- π₯ Enterprise Ready: Built-in health checks, monitoring, and error handling
- βοΈ Zero Configuration: Smart defaults with extensive customization options
- π¦ Modular Architecture: Separate modules for client, worker, activities, and schedules
- π Production Grade: Connection pooling, graceful shutdown, and fault tolerance
- β¨ Declarative Decorators:
@Activity()
and@ActivityMethod()
for clean activity definitions - π Automatic Discovery: Runtime discovery and registration of activities
- π Schedule Management: Programmatic schedule creation and management
- π₯ Health Monitoring: Built-in health checks and status reporting
- π Connection Management: Automatic connection pooling and lifecycle management
- π οΈ Error Handling: Comprehensive error handling with structured logging
- π Performance Monitoring: Built-in metrics and performance tracking
- π Graceful Shutdown: Clean resource cleanup and connection termination
- π¦ Modular Design: Use only what you need (client-only, worker-only, etc.)
npm install nestjs-temporal-core @temporalio/client @temporalio/worker @temporalio/workflow @temporalio/common
The package requires the following peer dependencies:
npm install @nestjs/common @nestjs/core reflect-metadata rxjs
First, enable shutdown hooks in your main.ts
for proper Temporal resource cleanup:
// main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
// Required for graceful Temporal connection cleanup
app.enableShutdownHooks();
await app.listen(3000);
}
bootstrap();
Import and configure TemporalModule
in your app module:
// app.module.ts
import { Module } from '@nestjs/common';
import { TemporalModule } from 'nestjs-temporal-core';
import { PaymentActivity } from './activities/payment.activity';
import { EmailActivity } from './activities/email.activity';
@Module({
imports: [
TemporalModule.register({
connection: {
address: 'localhost:7233',
namespace: 'default',
},
taskQueue: 'my-task-queue',
worker: {
workflowsPath: require.resolve('./workflows'),
activityClasses: [PaymentActivity, EmailActivity],
autoStart: true,
},
}),
],
providers: [PaymentActivity, EmailActivity],
})
export class AppModule {}
Create activities using @Activity()
and @ActivityMethod()
decorators:
// payment.activity.ts
import { Injectable } from '@nestjs/common';
import { Activity, ActivityMethod } from 'nestjs-temporal-core';
export interface PaymentData {
amount: number;
currency: string;
customerId: string;
}
@Injectable()
@Activity({ name: 'payment-activities' })
export class PaymentActivity {
@ActivityMethod('processPayment')
async processPayment(data: PaymentData): Promise<{ transactionId: string }> {
// Payment processing logic with full NestJS DI support
console.log(`Processing payment: $${data.amount} ${data.currency}`);
// Simulate payment processing
await new Promise(resolve => setTimeout(resolve, 1000));
return { transactionId: `txn_${Date.now()}` };
}
@ActivityMethod('refundPayment')
async refundPayment(transactionId: string): Promise<{ refundId: string }> {
// Refund logic
console.log(`Refunding transaction: ${transactionId}`);
return { refundId: `ref_${Date.now()}` };
}
}
Create workflows as pure Temporal functions (NOT NestJS services):
// payment.workflow.ts
import { proxyActivities, defineSignal, defineQuery, setHandler } from '@temporalio/workflow';
import type { PaymentActivity } from './payment.activity';
// Create activity proxies
const { processPayment, refundPayment } = proxyActivities<typeof PaymentActivity.prototype>({
startToCloseTimeout: '5m',
retry: {
maximumAttempts: 3,
initialInterval: '1s',
},
});
// Define signals and queries
export const cancelPaymentSignal = defineSignal<[string]>('cancelPayment');
export const getPaymentStatusQuery = defineQuery<string>('getPaymentStatus');
export async function processPaymentWorkflow(data: PaymentData): Promise<any> {
let status = 'processing';
let transactionId: string | undefined;
// Set up signal and query handlers
setHandler(cancelPaymentSignal, (reason: string) => {
status = 'cancelled';
});
setHandler(getPaymentStatusQuery, () => status);
try {
// Execute payment activity
const result = await processPayment(data);
transactionId = result.transactionId;
status = 'completed';
return {
success: true,
transactionId,
status,
};
} catch (error) {
status = 'failed';
// Compensate if needed
if (transactionId) {
await refundPayment(transactionId);
}
throw error;
}
}
Inject TemporalService
to start and manage workflows:
// payment.service.ts
import { Injectable } from '@nestjs/common';
import { TemporalService } from 'nestjs-temporal-core';
@Injectable()
export class PaymentService {
constructor(private readonly temporal: TemporalService) {}
async processPayment(paymentData: any) {
// Start workflow
const result = await this.temporal.startWorkflow(
'processPaymentWorkflow',
[paymentData],
{
workflowId: `payment-${Date.now()}`,
taskQueue: 'my-task-queue',
}
);
return {
workflowId: result.result.workflowId,
runId: result.result.runId,
};
}
async checkPaymentStatus(workflowId: string) {
// Query workflow
const statusResult = await this.temporal.queryWorkflow(
workflowId,
'getPaymentStatus'
);
return { status: statusResult.result };
}
async cancelPayment(workflowId: string, reason: string) {
// Send signal
await this.temporal.signalWorkflow(
workflowId,
'cancelPayment',
[reason]
);
}
}
The package provides modular architecture with separate modules for different use cases:
Complete integration with both client and worker capabilities:
import { TemporalModule } from 'nestjs-temporal-core';
TemporalModule.register({
connection: { address: 'localhost:7233' },
taskQueue: 'my-queue',
worker: {
workflowsPath: require.resolve('./workflows'),
activityClasses: [PaymentActivity, EmailActivity],
},
})
For services that only need to start/query workflows:
import { TemporalClientModule } from 'nestjs-temporal-core/client';
TemporalClientModule.register({
connection: { address: 'localhost:7233' },
namespace: 'default',
})
For dedicated worker processes:
import { TemporalWorkerModule } from 'nestjs-temporal-core/worker';
TemporalWorkerModule.register({
connection: { address: 'localhost:7233' },
taskQueue: 'worker-queue',
worker: {
workflowsPath: require.resolve('./workflows'),
activityClasses: [BackgroundActivity],
},
})
For standalone activity management:
import { TemporalActivityModule } from 'nestjs-temporal-core/activity';
TemporalActivityModule.register({
activityClasses: [DataProcessingActivity],
})
For managing Temporal schedules:
import { TemporalSchedulesModule } from 'nestjs-temporal-core/schedules';
TemporalSchedulesModule.register({
connection: { address: 'localhost:7233' },
})
TemporalModule.register({
connection: {
address: 'localhost:7233',
namespace: 'default',
},
taskQueue: 'my-task-queue',
worker: {
workflowsPath: require.resolve('./workflows'),
activityClasses: [PaymentActivity, EmailActivity],
autoStart: true,
maxConcurrentActivityExecutions: 100,
},
logLevel: 'info',
enableLogger: true,
})
For dynamic configuration using environment variables or config services:
// config/temporal.config.ts
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { TemporalOptionsFactory, TemporalOptions } from 'nestjs-temporal-core';
@Injectable()
export class TemporalConfigService implements TemporalOptionsFactory {
constructor(private configService: ConfigService) {}
createTemporalOptions(): TemporalOptions {
return {
connection: {
address: this.configService.get('TEMPORAL_ADDRESS', 'localhost:7233'),
namespace: this.configService.get('TEMPORAL_NAMESPACE', 'default'),
},
taskQueue: this.configService.get('TEMPORAL_TASK_QUEUE', 'default'),
worker: {
workflowsPath: require.resolve('../workflows'),
activityClasses: [], // Populated by module
maxConcurrentActivityExecutions: 100,
},
};
}
}
// app.module.ts
import { ConfigModule } from '@nestjs/config';
@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true }),
TemporalModule.registerAsync({
imports: [ConfigModule],
useClass: TemporalConfigService,
}),
],
})
export class AppModule {}
TemporalModule.registerAsync({
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
connection: {
address: configService.get('TEMPORAL_ADDRESS', 'localhost:7233'),
namespace: configService.get('TEMPORAL_NAMESPACE', 'default'),
},
taskQueue: configService.get('TEMPORAL_TASK_QUEUE', 'default'),
worker: {
workflowsPath: require.resolve('./workflows'),
activityClasses: [PaymentActivity, EmailActivity],
},
}),
inject: [ConfigService],
})
For secure connections to Temporal Cloud:
import * as fs from 'fs';
TemporalModule.register({
connection: {
address: 'your-namespace.your-account.tmprl.cloud:7233',
namespace: 'your-namespace.your-account',
tls: {
clientCertPair: {
crt: fs.readFileSync('/path/to/client.crt'),
key: fs.readFileSync('/path/to/client.key'),
},
},
},
taskQueue: 'my-task-queue',
worker: {
workflowsPath: require.resolve('./workflows'),
activityClasses: [PaymentActivity],
},
})
interface TemporalOptions {
// Connection settings
connection: {
address: string; // Temporal server address (default: 'localhost:7233')
namespace?: string; // Temporal namespace (default: 'default')
tls?: TLSConfig; // TLS configuration for secure connections
};
// Task queue name
taskQueue?: string; // Default task queue (default: 'default')
// Worker configuration
worker?: {
workflowsPath?: string; // Path to workflow definitions (use require.resolve)
activityClasses?: any[]; // Array of activity classes to register
autoStart?: boolean; // Auto-start worker on module init (default: true)
maxConcurrentActivityExecutions?: number; // Max concurrent activities (default: 100)
maxActivitiesPerSecond?: number; // Rate limit for activities
};
// Logging
logLevel?: 'trace' | 'debug' | 'info' | 'warn' | 'error'; // Log level (default: 'info')
enableLogger?: boolean; // Enable logging (default: true)
// Advanced
isGlobal?: boolean; // Make module global (default: false)
autoRestart?: boolean; // Auto-restart worker on failure (default: true)
}
Activities are NestJS services decorated with @Activity()
that perform actual work. They have full access to NestJS dependency injection and can interact with external systems.
Key Points:
- Activities are NestJS services (
@Injectable()
) - Use
@Activity()
decorator at class level - Use
@ActivityMethod()
decorator for methods to be registered - Activities should be idempotent and handle retries gracefully
- Full access to NestJS DI (inject services, repositories, etc.)
@Injectable()
@Activity({ name: 'order-activities' })
export class OrderActivity {
constructor(
private readonly orderRepository: OrderRepository,
private readonly emailService: EmailService,
) {}
@ActivityMethod('createOrder')
async createOrder(orderData: CreateOrderData): Promise<Order> {
// Database operations with full DI support
const order = await this.orderRepository.create(orderData);
await this.emailService.sendConfirmation(order);
return order;
}
@ActivityMethod('validateInventory')
async validateInventory(items: OrderItem[]): Promise<boolean> {
// Business logic with injected services
return await this.orderRepository.checkInventory(items);
}
}
Workflows are pure Temporal functions (NOT NestJS services) that orchestrate activities. They must be deterministic and use Temporal's workflow APIs.
Important: Workflows are NOT decorated with @Injectable()
and should NOT use NestJS dependency injection.
// order.workflow.ts
import { proxyActivities, defineSignal, defineQuery, setHandler } from '@temporalio/workflow';
import type { OrderActivity } from './order.activity';
// Create activity proxies with proper typing
const { createOrder, validateInventory } = proxyActivities<typeof OrderActivity.prototype>({
startToCloseTimeout: '5m',
retry: {
maximumAttempts: 3,
initialInterval: '1s',
maximumInterval: '30s',
},
});
// Define signals and queries at module level
export const cancelOrderSignal = defineSignal<[string]>('cancelOrder');
export const getOrderStatusQuery = defineQuery<string>('getOrderStatus');
// Workflow function (exported, not a class)
export async function processOrderWorkflow(orderData: CreateOrderData): Promise<OrderResult> {
let status = 'pending';
// Set up signal handler
setHandler(cancelOrderSignal, (reason: string) => {
status = 'cancelled';
});
// Set up query handler
setHandler(getOrderStatusQuery, () => status);
try {
// Validate inventory
const isValid = await validateInventory(orderData.items);
if (!isValid) {
throw new Error('Insufficient inventory');
}
// Create order
status = 'processing';
const order = await createOrder(orderData);
status = 'completed';
return {
orderId: order.id,
status,
};
} catch (error) {
status = 'failed';
throw error;
}
}
Signals allow external systems to send events to workflows, while queries provide read-only access to workflow state.
import { defineSignal, defineQuery, setHandler, condition } from '@temporalio/workflow';
// Define at module level
export const updateStatusSignal = defineSignal<[string]>('updateStatus');
export const addItemSignal = defineSignal<[Item]>('addItem');
export const getItemsQuery = defineQuery<Item[]>('getItems');
export const getStatusQuery = defineQuery<string>('getStatus');
export async function myWorkflow(): Promise<void> {
let status = 'pending';
const items: Item[] = [];
// Set up handlers
setHandler(updateStatusSignal, (newStatus: string) => {
status = newStatus;
});
setHandler(addItemSignal, (item: Item) => {
items.push(item);
});
setHandler(getItemsQuery, () => items);
setHandler(getStatusQuery, () => status);
// Wait for completion signal
await condition(() => status === 'completed');
}
Inject TemporalService
in your NestJS services to interact with workflows:
@Injectable()
export class OrderService {
constructor(private readonly temporal: TemporalService) {}
async createOrder(orderData: CreateOrderData) {
// Start workflow - note the method signature
const result = await this.temporal.startWorkflow(
'processOrderWorkflow', // Workflow function name
[orderData], // Arguments array
{ // Options
workflowId: `order-${Date.now()}`,
taskQueue: 'order-queue',
}
);
return {
workflowId: result.result.workflowId,
runId: result.result.runId,
};
}
async queryOrderStatus(workflowId: string) {
const result = await this.temporal.queryWorkflow(
workflowId,
'getOrderStatus'
);
return result.result;
}
async cancelOrder(workflowId: string, reason: string) {
await this.temporal.signalWorkflow(
workflowId,
'cancelOrder',
[reason]
);
}
}
The main unified service providing access to all Temporal functionality:
class TemporalService {
/**
* Start a workflow execution
* @param workflowType - Name of the workflow function
* @param args - Array of arguments to pass to the workflow
* @param options - Workflow execution options
*/
async startWorkflow<T>(
workflowType: string,
args?: unknown[],
options?: WorkflowStartOptions
): Promise<WorkflowExecutionResult<T>>
/**
* Send a signal to a running workflow
* @param workflowId - The workflow ID
* @param signalName - Name of the signal
* @param args - Arguments for the signal
*/
async signalWorkflow(
workflowId: string,
signalName: string,
args?: unknown[]
): Promise<WorkflowSignalResult>
/**
* Query a running workflow
* @param workflowId - The workflow ID
* @param queryName - Name of the query
* @param args - Arguments for the query
*/
async queryWorkflow<T>(
workflowId: string,
queryName: string,
args?: unknown[]
): Promise<WorkflowQueryResult<T>>
/**
* Get a workflow handle to interact with it
* @param workflowId - The workflow ID
* @param runId - Optional run ID for specific execution
*/
async getWorkflowHandle<T>(
workflowId: string,
runId?: string
): Promise<T>
/**
* Terminate a workflow execution
* @param workflowId - The workflow ID
* @param reason - Termination reason
*/
async terminateWorkflow(
workflowId: string,
reason?: string
): Promise<WorkflowTerminationResult>
/**
* Cancel a workflow execution
* @param workflowId - The workflow ID
*/
async cancelWorkflow(
workflowId: string
): Promise<WorkflowCancellationResult>
/**
* Get service health status
*/
getHealth(): ServiceHealth
/**
* Create a schedule
*/
async createSchedule(options: ScheduleCreateOptions): Promise<ScheduleHandle>
/**
* List all schedules
*/
async listSchedules(): Promise<ScheduleListDescription[]>
/**
* Delete a schedule
*/
async deleteSchedule(scheduleId: string): Promise<void>
}
Options for starting workflows:
interface WorkflowStartOptions {
workflowId?: string; // Unique workflow ID
taskQueue?: string; // Task queue name
workflowExecutionTimeout?: Duration; // Total workflow timeout
workflowRunTimeout?: Duration; // Single run timeout
workflowTaskTimeout?: Duration; // Decision task timeout
memo?: Record<string, unknown>; // Workflow memo
searchAttributes?: SearchAttributes; // Search attributes for filtering
}
interface WorkflowExecutionResult<T> {
success: boolean;
result: T; // Contains workflowId, runId, etc.
executionTime: number;
error?: Error;
}
interface WorkflowQueryResult<T> {
success: boolean;
result: T;
workflowId: string;
queryName: string;
}
interface WorkflowSignalResult {
success: boolean;
workflowId: string;
signalName: string;
}
Complete example with compensation logic:
// order.activity.ts
@Injectable()
@Activity({ name: 'order-activities' })
export class OrderActivity {
constructor(
private readonly paymentService: PaymentService,
private readonly inventoryService: InventoryService,
private readonly emailService: EmailService,
) {}
@ActivityMethod('validatePayment')
async validatePayment(paymentData: PaymentData): Promise<PaymentResult> {
return await this.paymentService.validate(paymentData);
}
@ActivityMethod('chargePayment')
async chargePayment(paymentData: PaymentData): Promise<{ transactionId: string }> {
return await this.paymentService.charge(paymentData);
}
@ActivityMethod('refundPayment')
async refundPayment(transactionId: string): Promise<void> {
await this.paymentService.refund(transactionId);
}
@ActivityMethod('reserveInventory')
async reserveInventory(items: OrderItem[]): Promise<{ reservationId: string }> {
return await this.inventoryService.reserve(items);
}
@ActivityMethod('releaseInventory')
async releaseInventory(reservationId: string): Promise<void> {
await this.inventoryService.release(reservationId);
}
@ActivityMethod('sendConfirmationEmail')
async sendConfirmationEmail(order: Order): Promise<void> {
await this.emailService.sendConfirmation(order);
}
}
// order.workflow.ts
import { proxyActivities, defineSignal, defineQuery, setHandler } from '@temporalio/workflow';
import type { OrderActivity } from './order.activity';
const {
validatePayment,
chargePayment,
refundPayment,
reserveInventory,
releaseInventory,
sendConfirmationEmail,
} = proxyActivities<typeof OrderActivity.prototype>({
startToCloseTimeout: '5m',
retry: { maximumAttempts: 3 },
});
export const cancelOrderSignal = defineSignal<[string]>('cancelOrder');
export const getOrderStatusQuery = defineQuery<OrderStatus>('getOrderStatus');
export async function processOrderWorkflow(orderData: OrderData): Promise<OrderResult> {
let status: OrderStatus = 'pending';
let transactionId: string | undefined;
let reservationId: string | undefined;
let cancelled = false;
setHandler(cancelOrderSignal, (reason: string) => {
cancelled = true;
});
setHandler(getOrderStatusQuery, () => status);
try {
// Step 1: Validate payment
status = 'validating_payment';
const paymentValid = await validatePayment(orderData.payment);
if (!paymentValid.valid) {
throw new Error('Invalid payment method');
}
// Check cancellation
if (cancelled) {
status = 'cancelled';
return { orderId: orderData.orderId, status };
}
// Step 2: Reserve inventory
status = 'reserving_inventory';
const reservation = await reserveInventory(orderData.items);
reservationId = reservation.reservationId;
// Step 3: Charge payment
status = 'charging_payment';
const payment = await chargePayment(orderData.payment);
transactionId = payment.transactionId;
// Step 4: Send confirmation
status = 'sending_confirmation';
await sendConfirmationEmail({
orderId: orderData.orderId,
items: orderData.items,
total: orderData.totalAmount,
});
status = 'completed';
return {
orderId: orderData.orderId,
status,
transactionId,
reservationId,
};
} catch (error) {
// Compensation logic
status = 'compensating';
if (reservationId) {
await releaseInventory(reservationId);
}
if (transactionId) {
await refundPayment(transactionId);
}
status = 'failed';
throw error;
}
}
// order.service.ts
@Injectable()
export class OrderService {
constructor(private readonly temporal: TemporalService) {}
async createOrder(orderData: OrderData) {
const result = await this.temporal.startWorkflow(
'processOrderWorkflow',
[orderData],
{
workflowId: `order-${orderData.orderId}`,
taskQueue: 'order-queue',
}
);
return result.result;
}
async getOrderStatus(orderId: string) {
const result = await this.temporal.queryWorkflow(
`order-${orderId}`,
'getOrderStatus'
);
return result.result;
}
async cancelOrder(orderId: string, reason: string) {
await this.temporal.signalWorkflow(
`order-${orderId}`,
'cancelOrder',
[reason]
);
}
}
Creating and managing scheduled workflows:
// report.activity.ts
@Injectable()
@Activity({ name: 'report-activities' })
export class ReportActivity {
constructor(
private readonly reportService: ReportService,
private readonly storageService: StorageService,
private readonly notificationService: NotificationService,
) {}
@ActivityMethod('generateSalesReport')
async generateSalesReport(period: ReportPeriod): Promise<ReportData> {
return await this.reportService.generateSales(period);
}
@ActivityMethod('uploadReport')
async uploadReport(reportData: ReportData): Promise<string> {
return await this.storageService.upload(reportData);
}
@ActivityMethod('notifyStakeholders')
async notifyStakeholders(reportUrl: string, recipients: string[]): Promise<void> {
await this.notificationService.send(recipients, reportUrl);
}
}
// report.workflow.ts
import { proxyActivities } from '@temporalio/workflow';
import type { ReportActivity } from './report.activity';
const { generateSalesReport, uploadReport, notifyStakeholders } =
proxyActivities<typeof ReportActivity.prototype>({
startToCloseTimeout: '10m',
});
export async function weeklyReportWorkflow(): Promise<ReportResult> {
const endDate = new Date();
const startDate = new Date(endDate.getTime() - 7 * 24 * 60 * 60 * 1000);
// Generate report
const reportData = await generateSalesReport({
startDate,
endDate,
type: 'weekly',
});
// Upload to storage
const reportUrl = await uploadReport(reportData);
// Notify stakeholders
await notifyStakeholders(reportUrl, ['[email protected]']);
return {
reportUrl,
generatedAt: new Date(),
period: { startDate, endDate },
};
}
// schedule.service.ts
@Injectable()
export class ReportScheduleService {
constructor(private readonly temporal: TemporalService) {}
async setupWeeklyReports() {
await this.temporal.createSchedule({
scheduleId: 'weekly-sales-report',
spec: {
cronExpressions: ['0 9 * * MON'], // Every Monday at 9 AM
},
action: {
type: 'startWorkflow',
workflowType: 'weeklyReportWorkflow',
taskQueue: 'reports-queue',
},
policies: {
overlap: 'SKIP',
catchupWindow: '1 hour',
},
});
}
async deleteSchedule(scheduleId: string) {
await this.temporal.deleteSchedule(scheduleId);
}
async listAllSchedules() {
return await this.temporal.listSchedules();
}
}
Configure custom retry policies for different activity types:
// workflow.ts
const paymentActivities = proxyActivities<typeof PaymentActivity.prototype>({
startToCloseTimeout: '5m',
retry: {
maximumAttempts: 5,
initialInterval: '1s',
maximumInterval: '1m',
backoffCoefficient: 2,
nonRetryableErrorTypes: ['InvalidPaymentMethod', 'InsufficientFunds'],
},
});
const emailActivities = proxyActivities<typeof EmailActivity.prototype>({
startToCloseTimeout: '2m',
retry: {
maximumAttempts: 3,
initialInterval: '500ms',
},
});
Test workflows using Temporal's testing framework:
import { TestWorkflowEnvironment } from '@temporalio/testing';
import { Worker } from '@temporalio/worker';
import { processOrderWorkflow } from './order.workflow';
import { OrderActivity } from './order.activity';
describe('Order Workflow', () => {
let testEnv: TestWorkflowEnvironment;
beforeAll(async () => {
testEnv = await TestWorkflowEnvironment.createTimeSkipping();
});
afterAll(async () => {
await testEnv?.teardown();
});
it('should process order successfully', async () => {
const { client, nativeConnection } = testEnv;
// Mock activities
const mockOrderActivity = {
validatePayment: async () => ({ valid: true }),
reserveInventory: async () => ({ reservationId: 'res-123' }),
chargePayment: async () => ({ transactionId: 'txn-123' }),
sendConfirmationEmail: async () => {},
};
const worker = await Worker.create({
connection: nativeConnection,
taskQueue: 'test',
workflowsPath: require.resolve('./order.workflow'),
activities: mockOrderActivity,
});
await worker.runUntil(async () => {
const result = await client.workflow.execute(processOrderWorkflow, {
workflowId: 'test-order-1',
taskQueue: 'test',
args: [{
orderId: 'order-123',
payment: { amount: 100, currency: 'USD' },
items: [{ id: '1', quantity: 1 }],
}],
});
expect(result.status).toBe('completed');
expect(result.transactionId).toBe('txn-123');
});
});
});
Organize complex workflows using child workflows:
// parent.workflow.ts
import { startChild } from '@temporalio/workflow';
export async function parentWorkflow(orderId: string) {
// Start child workflows
const paymentHandle = await startChild(processPaymentWorkflow, {
workflowId: `payment-${orderId}`,
args: [paymentData],
});
const shippingHandle = await startChild(processShippingWorkflow, {
workflowId: `shipping-${orderId}`,
args: [shippingData],
});
// Wait for both to complete
const [paymentResult, shippingResult] = await Promise.all([
paymentHandle.result(),
shippingHandle.result(),
]);
return {
payment: paymentResult,
shipping: shippingResult,
};
}
Use continue-as-new to prevent event history from growing too large:
import { continueAsNew } from '@temporalio/workflow';
export async function processEventStreamWorkflow(cursor: number): Promise<void> {
const events = await fetchEvents(cursor);
for (const event of events) {
await processEvent(event);
}
// Continue as new after processing 1000 events
if (events.length >= 1000) {
await continueAsNew<typeof processEventStreamWorkflow>(cursor + events.length);
}
}
Implement custom error types and handling:
// activities
export class RetryableError extends Error {
constructor(message: string) {
super(message);
this.name = 'RetryableError';
}
}
export class NonRetryableError extends Error {
constructor(message: string) {
super(message);
this.name = 'NonRetryableError';
}
}
@ActivityMethod('processData')
async processData(data: any): Promise<any> {
try {
return await this.externalApi.process(data);
} catch (error) {
if (error.code === 'RATE_LIMIT') {
throw new RetryableError('Rate limit exceeded, will retry');
} else if (error.code === 'INVALID_DATA') {
throw new NonRetryableError('Invalid data format');
}
throw error;
}
}
// workflow configuration
const activities = proxyActivities<typeof DataActivity.prototype>({
startToCloseTimeout: '5m',
retry: {
nonRetryableErrorTypes: ['NonRetryableError'],
},
});
β DO:
- Keep workflows deterministic (no random numbers, current time, network calls)
- Use activities for any non-deterministic operations
- Keep workflow history size manageable (use continue-as-new for long-running workflows)
- Export workflow functions (not classes)
- Use
defineSignal
anddefineQuery
at module level
β DON'T:
- Don't use
@Injectable()
on workflow functions - Don't inject NestJS services in workflows
- Don't use
Math.random()
orDate.now()
directly in workflows - Don't make HTTP calls or database queries directly in workflows
β DO:
- Make activities idempotent (safe to retry)
- Use
@Injectable()
and leverage NestJS DI - Use
@Activity()
and@ActivityMethod()
decorators - Handle errors appropriately
- Log activity execution for debugging
β DON'T:
- Don't make activities too granular (network overhead)
- Don't rely on activity execution order guarantees
- Don't share mutable state between activity invocations
β DO:
- Use async configuration for environment-based setup
- Configure appropriate timeouts for your use case
- Set up proper retry policies
- Enable graceful shutdown hooks
- Use task queues to organize work
β DON'T:
- Don't hardcode connection strings
- Don't use the same task queue for all workflows
- Don't ignore timeout configurations
β DO:
- Implement compensation logic in workflows
- Use appropriate retry policies
- Log errors with context
- Define non-retryable error types
- Handle activity failures gracefully
β DON'T:
- Don't swallow errors silently
- Don't retry indefinitely
- Don't ignore business-level failures
β DO:
- Write unit tests for activities
- Use TestWorkflowEnvironment for integration tests
- Mock external dependencies
- Test failure scenarios
- Test signal and query handlers
β DON'T:
- Don't skip workflow testing
- Don't test against production Temporal server
- Don't assume workflows are correct without testing
The package includes comprehensive health monitoring capabilities for production deployments.
// app.module.ts
import { Module } from '@nestjs/common';
import { TemporalModule } from 'nestjs-temporal-core';
import { TemporalHealthModule } from 'nestjs-temporal-core/health';
@Module({
imports: [
TemporalModule.register({
connection: { address: 'localhost:7233' },
taskQueue: 'my-queue',
worker: {
workflowsPath: require.resolve('./workflows'),
activityClasses: [MyActivity],
},
}),
TemporalHealthModule, // Adds /health/temporal endpoint
],
})
export class AppModule {}
@Controller('health')
export class HealthController {
constructor(private readonly temporal: TemporalService) {}
@Get('/status')
async getHealthStatus() {
const health = this.temporal.getHealth();
return {
status: health.overallHealth,
timestamp: new Date(),
services: {
client: {
healthy: health.client.status === 'healthy',
connection: health.client.connectionStatus,
},
worker: {
healthy: health.worker.status === 'healthy',
state: health.worker.state,
activitiesRegistered: health.worker.activitiesCount,
},
discovery: {
healthy: health.discovery.status === 'healthy',
activitiesDiscovered: health.discovery.activitiesDiscovered,
},
},
uptime: health.uptime,
};
}
@Get('/detailed')
async getDetailedHealth() {
const health = this.temporal.getHealth();
const stats = this.temporal.getStatistics();
return {
health,
statistics: stats,
performance: {
workflowStartLatency: stats.averageWorkflowStartTime,
activityExecutionCount: stats.totalActivitiesExecuted,
},
};
}
}
interface ServiceHealth {
overallHealth: 'healthy' | 'degraded' | 'unhealthy';
client: {
status: 'healthy' | 'unhealthy';
connectionStatus: 'connected' | 'disconnected';
};
worker: {
status: 'healthy' | 'unhealthy';
state: 'RUNNING' | 'STOPPED' | 'FAILED';
activitiesCount: number;
};
discovery: {
status: 'healthy' | 'unhealthy';
activitiesDiscovered: number;
};
uptime: number;
lastChecked: Date;
}
## Troubleshooting
### Common Issues and Solutions
#### 1. Connection Errors
**Problem:** Cannot connect to Temporal server
Error: Failed to connect to localhost:7233
**Solutions:**
```typescript
// Check connection configuration
const health = temporalService.getHealth();
console.log('Connection status:', health.client.connectionStatus);
// Verify Temporal server is running
// docker ps | grep temporal
// Check connection settings
TemporalModule.register({
connection: {
address: process.env.TEMPORAL_ADDRESS || 'localhost:7233',
namespace: 'default',
},
})
Problem: Workflow cannot find registered activities
Error: Activity 'myActivity' not found
Solutions:
// 1. Ensure activity is in activityClasses array
TemporalModule.register({
worker: {
activityClasses: [MyActivity], // Must include the activity class
},
})
// 2. Verify activity is registered as provider
@Module({
providers: [MyActivity], // Must be in providers array
})
// 3. Check activity decorator
@Activity({ name: 'my-activities' })
export class MyActivity {
@ActivityMethod('myActivity')
async myActivity() { }
}
// 4. Check discovery status
const health = temporalService.getHealth();
console.log('Activities discovered:', health.discovery.activitiesDiscovered);
Problem: Workflow not found or not executing
Error: Workflow 'myWorkflow' not found
Solutions:
// 1. Ensure workflowsPath is correct
TemporalModule.register({
worker: {
workflowsPath: require.resolve('./workflows'), // Must resolve to workflows file/directory
},
})
// 2. Export workflow function properly
// workflows/index.ts
export { processOrderWorkflow } from './order.workflow';
export { reportWorkflow } from './report.workflow';
// 3. Use correct workflow name when starting
await temporal.startWorkflow(
'processOrderWorkflow', // Must match exported function name
[args],
options
);
Problem: Activities or workflows timing out
Error: Activity timed out after 10s
Solutions:
// Configure appropriate timeouts
const activities = proxyActivities<typeof MyActivity.prototype>({
startToCloseTimeout: '10m', // Increase for long-running activities
scheduleToCloseTimeout: '15m', // Total time including queuing
scheduleToStartTimeout: '5m', // Time waiting in queue
});
// For workflows
await temporal.startWorkflow('myWorkflow', [args], {
workflowExecutionTimeout: '24h', // Max total execution time
workflowRunTimeout: '12h', // Max single run time
workflowTaskTimeout: '10s', // Decision task timeout
});
Problem: Worker fails to start or crashes
Error: Worker failed to start
Solutions:
// 1. Check worker configuration
TemporalModule.register({
worker: {
autoStart: true, // Ensure autoStart is true
workflowsPath: require.resolve('./workflows'),
activityClasses: [MyActivity],
},
})
// 2. Check logs
// Enable debug logging
TemporalModule.register({
logLevel: 'debug',
enableLogger: true,
})
// 3. Verify worker health
const health = temporalService.getHealth();
console.log('Worker status:', health.worker.state);
// 4. Check for port conflicts or resource issues
Problem: Signals or queries not being handled
Solutions:
// 1. Define signals/queries at module level (not inside workflow)
export const mySignal = defineSignal<[string]>('mySignal');
export const myQuery = defineQuery<string>('myQuery');
// 2. Set up handlers in workflow
export async function myWorkflow() {
let value = 'initial';
setHandler(mySignal, (newValue: string) => {
value = newValue;
});
setHandler(myQuery, () => value);
// ... workflow logic
}
// 3. Use correct names when signaling/querying
await temporal.signalWorkflow(workflowId, 'mySignal', ['newValue']);
const result = await temporal.queryWorkflow(workflowId, 'myQuery');
Enable comprehensive debugging:
TemporalModule.register({
logLevel: 'debug',
enableLogger: true,
connection: {
address: 'localhost:7233',
},
worker: {
debugMode: true, // If available
},
})
// Check detailed health and statistics
const health = temporalService.getHealth();
const stats = temporalService.getStatistics();
console.log('Health:', JSON.stringify(health, null, 2));
console.log('Stats:', JSON.stringify(stats, null, 2));
If you're still experiencing issues:
- Check the logs - Enable debug logging to see detailed information
- Verify configuration - Double-check all connection and worker settings
- Test connectivity - Ensure Temporal server is accessible
- Review health status - Use
getHealth()
to identify failing components - Check GitHub Issues - Search existing issues
- Create an issue - Provide logs, configuration, and minimal reproduction
- Node.js: >= 16.0.0
- NestJS: >= 9.0.0
- Temporal Server: >= 1.20.0
We welcome contributions! To contribute:
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature
) - Make your changes
- Run tests (
npm test
) - Commit your changes (
git commit -m 'Add amazing feature'
) - Push to the branch (
git push origin feature/amazing-feature
) - Open a Pull Request
Please see CONTRIBUTING.md for detailed guidelines.
# Clone the repository
git clone https://github.com/harsh-simform/nestjs-temporal-core.git
cd nestjs-temporal-core
# Install dependencies
npm install
# Run tests
npm test
# Run tests with coverage
npm run test:cov
# Build the package
npm run build
# Generate documentation
npm run docs:generate
This project is licensed under the MIT License - see the LICENSE file for details.
- π Documentation: Full API Documentation
- π Issues: GitHub Issues
- π¬ Discussions: GitHub Discussions
- π¦ NPM: nestjs-temporal-core
- π Changelog: Releases
- π Example Project: nestjs-temporal-core-example
- Temporal.io - The underlying workflow orchestration platform
- NestJS - Progressive Node.js framework
- @temporalio/sdk - Official Temporal TypeScript SDK