Atomic Operations Framework - Complete Developer Guide

Version: 3.0
Last Updated: 2025-01-01
Status: Production Ready ✅

Table of Contents

  1. Quick Start
  2. Architecture Overview
  3. Core Components
  4. Implementation Guide
  5. Supported Collections
  6. Testing Framework
  7. API Reference
  8. Troubleshooting
  9. Best Practices
  10. Migration Guide

Quick Start

What is Atomic Operations?

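The Atomic Operations framework ensures all-or-nothing data consistency for bulk uploads across multiple data stores (MongoDB, Firebase RTDB, Firestore). If any operation fails, ALL executed operations are rolled back, leaving those data stores in their pre-upload state. Firebase Auth operations are the exception: they run before the atomic phase and cannot be rolled back.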

If a bulk operation fails or is rolled back, contact the tech team with the Bulk Operation ID and/or the Excel sheet for support.


📍 How to Access The New Features

The route admin/bulk-operations-v2, created during the bulk operation revamp, supports additional processing states: rolling back, rollback complete, and rollback failed. These states give a visual indication of where the bulk operation currently is.

The old bulk operation UI has no visual indicator for rollbacks or failures. It looks the same as before, but if errors are encountered while processing the data, all changes are still rolled back.


✨ Key New Features

  1. A rollback mechanism that helps prevent inconsistent data
  2. A fail state, so that users do not get stuck in the processing stage for an unusually long time
  3. Integration with Scheduled Bulk Operations: Atomic operations now power scheduled bulk uploads with queue management

🔗 Integration with Scheduled Operations

Atomic Operations serve as the core processing engine for the new Scheduled Bulk Operations (v3) feature. This integration provides:

Combined Benefits

  • Reliable Scheduling: Schedule operations with confidence knowing atomic processing ensures data consistency
  • Queue Safety: If any operation in the queue fails, only that specific operation rolls back (not the entire queue)
  • Status Transparency: Real-time visibility into atomic processing stages within scheduled operations
  • Error Isolation: Failed scheduled operations do not affect other queued operations
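
The queue safety and error isolation points above can be illustrated with a small sketch. It is not the scheduler's implementation: QueuedJob, JobOutcome and drainQueue are hypothetical names, and the real feature is triggered per operation by Cloud Tasks rather than by a single loop. The sketch only shows that one job's failure (after its own rollback) does not stop the others.

```typescript
// Hypothetical shape of a queued bulk operation; not the framework's real type.
interface QueuedJob {
    id: string;
    run: () => Promise<void>; // atomic processing; rejects after its own rollback
}

type JobOutcome = 'succeeded' | 'failed';

// Processes jobs one by one. A rejected job is recorded as failed and the
// loop continues, so one failure never prevents later jobs from running.
async function drainQueue(jobs: QueuedJob[]): Promise<Map<string, JobOutcome>> {
    const outcomes = new Map<string, JobOutcome>();
    for (const job of jobs) {
        try {
            await job.run();
            outcomes.set(job.id, 'succeeded');
        } catch {
            outcomes.set(job.id, 'failed');
        }
    }
    return outcomes;
}
```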

Processing Flow

Scheduled Upload → Cloud Task Trigger → Atomic Processing → Rollback (if needed) → Notification

How It Works

  1. Upload & Schedule: User uploads file and schedules for future processing
  2. Queue Management: Operation waits in queue with atomic processing reserved
  3. Execution Time: Cloud Task triggers atomic bulk operation processing
  4. Atomic Processing: All-or-nothing processing with automatic rollback on failure
  5. Status Update: Scheduled operation status reflects atomic processing result
  6. Notification: User receives email about success/failure with rollback details

For complete details on scheduled operations, see: Scheduled Bulk Operations Guide


⚠️ Disclaimers

This feature is not behind a feature flag, i.e. it is enabled for all customers.

The new bulk operations UI shows when rollbacks are happening, but the old bulk operations UI does not. If an operation is stuck in the processing state for longer than expected, please contact the tech team.

Basic Usage

// Bulk upload endpoint automatically uses atomic operations
POST /api/bulk-operations/upload
Content-Type: multipart/form-data
 
// Upload Excel file with sheets:
// - Users
// - Departments  
// - Sites
// - Schedules
// - Non-Ops Days

Testing Rollback Behavior

# Force failure at operation 50 (0-based index)
export TEST_ROLLBACK_AT_OPERATION=50
npm run start:dev
 
# What happens:
# 1. Operations 0-49 execute successfully
# 2. Operation 50 is forced to fail
# 3. ALL 50 operations are rolled back
# 4. Database remains unchanged
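
One way the environment variable could be turned into the optional forceFailAt argument of execute() is sketched below. The helper name parseForceFailAt is an assumption for illustration; the guide does not show how the framework reads the variable.

```typescript
// Hypothetical helper: converts the raw value of TEST_ROLLBACK_AT_OPERATION
// into the optional `forceFailAt` argument (a 0-based operation index).
function parseForceFailAt(raw: string | undefined): number | undefined {
    if (raw === undefined || raw.trim() === '') {
        return undefined; // variable not set: no forced failure
    }
    const index = Number(raw);
    // Only non-negative integers make sense as an operation index.
    if (!Number.isInteger(index) || index < 0) {
        return undefined;
    }
    return index;
}

// Possible usage (assumes a Node.js environment):
// const forceFailAt = parseForceFailAt(process.env.TEST_ROLLBACK_AT_OPERATION);
// await atomicOps.execute(allOperations, forceFailAt);
```

Ignoring malformed values rather than throwing is a design choice of this sketch, so that a stray variable cannot break a normal upload.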

Architecture Overview

Evolution Timeline

graph LR
    A[Phase 1: No Atomicity] --> B[Phase 2: Entity-Level]
    B --> C[Phase 3: Single Transaction]
    
    style C fill:#90EE90

Current Architecture (Phase 3)

┌─────────────────────────────────────────────────────────┐
│                   Bulk Upload Request                    │
└────────────────────┬────────────────────────────────────┘
                     ▼
┌─────────────────────────────────────────────────────────┐
│                  Validation Phase                        │
│         (All entities validated, no DB changes)          │
└────────────────────┬────────────────────────────────────┘
                     ▼
┌─────────────────────────────────────────────────────────┐
│              Firebase Auth Operations                    │
│              (Non-atomic, cannot rollback)               │
└────────────────────┬────────────────────────────────────┘
                     ▼
┌─────────────────────────────────────────────────────────┐
│               Operation Collection Phase                 │
│  ┌──────────────────────────────────────────────────┐   │
│  │ • Users, Departments, Sites                      │   │
│  │ • Site Groups, Mappings                          │   │
│  │ • Schedules, Schedule Statistics                 │   │
│  │ • Non-Ops Days (Site, User, Organization)        │   │
│  │ • Role Assignments                               │   │
│  │ • User Extensions (LMS)                          │   │
│  └──────────────────────────────────────────────────┘   │
└────────────────────┬────────────────────────────────────┘
                     ▼
┌─────────────────────────────────────────────────────────┐
│            Write-Ahead Logging (WAL)                     │
│         All operations logged to MongoDB                 │
└────────────────────┬────────────────────────────────────┘
                     ▼
┌─────────────────────────────────────────────────────────┐
│          Single Atomic Transaction Execution             │
│                                                          │
│  Success ✓                        Failure ✗             │
│  ├─ Commit all                    ├─ Rollback all       │
│  └─ Clear WAL                     └─ Restore state      │
└─────────────────────────────────────────────────────────┘

Key Features

  • Single Transaction: ALL operations across ALL entities in one atomic transaction
  • Write-Ahead Logging: Complete audit trail and rollback capability
  • Cross-Database Support: MongoDB, Firebase RTDB, and Firestore
  • Compensation-Based Rollback: For external data stores (Firebase/Firestore)
  • Zero Partial States: No data inconsistencies between entities
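
The all-or-nothing and compensation-based rollback ideas above can be sketched in a few lines. This is a simplified illustration, not the framework's code: SketchOperation and executeAllOrNothing are hypothetical, there is no WAL, and each operation carries its own explicit compensate function.

```typescript
// Illustrative type only; the framework's real AtomicOperation differs.
interface SketchOperation {
    name: string;
    execute: () => Promise<void>;
    compensate: () => Promise<void>; // undoes the effect of execute()
}

// Runs operations in order. On the first failure, the operations that already
// completed are compensated in reverse order and the original error is rethrown.
async function executeAllOrNothing(operations: SketchOperation[]): Promise<void> {
    const completed: SketchOperation[] = [];
    try {
        for (const op of operations) {
            await op.execute();
            completed.push(op);
        }
    } catch (error) {
        for (const op of completed.reverse()) {
            await op.compensate();
        }
        throw error;
    }
}
```

Undoing in reverse order means dependent entities are removed before the entities they reference.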

Core Components

1. AtomicOperationsWithBulkID

The main orchestrator class that manages atomic execution and rollback.

Location: src/domains/atomicOperations/core/atomicOperationsWithBulkID.ts

export class AtomicOperationsWithBulkID {
    constructor(connection: Connection, bulkOperationId: string)
    
    // Execute operations atomically
    async execute(operations: AtomicOperation[], forceFailAt?: number): Promise<any[]>
    
    // Rollback all executed operations
    async rollback(): Promise<boolean>
    
    // Recovery method for failed operations
    static async recoverFailedBulkOperation(connection: Connection, bulkOperationId: string): Promise<void>
}

2. AtomicOperation Interface

Defines the structure for each atomic operation.

export interface AtomicOperation {
    collection: string;           // Target collection name
    documentId: string;          // Unique document identifier
    operation: 'create' | 'update' | 'delete';
    execute: () => Promise<any>; // Operation execution function
    previousData?: any;          // Required for rollback of updates
    
    // External data source support
    dataSource?: 'mongodb' | 'firebase-rtdb' | 'firestore';
    firebasePath?: string;       // For Firebase RTDB operations
    firestorePath?: string;      // For Firestore operations
}
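
The role of previousData in rollback can be sketched as follows. This is an illustration only: the Inverse type and planInverse function are hypothetical, and treating a delete like an update (restore from previousData) is an assumption the guide does not confirm.

```typescript
type OperationKind = 'create' | 'update' | 'delete';

// What a rollback would have to do to undo one operation.
type Inverse =
    | { action: 'delete' }                  // undo a create
    | { action: 'restore'; data: unknown }; // undo an update or delete

function planInverse(kind: OperationKind, previousData?: unknown): Inverse {
    if (kind === 'create') {
        // Nothing existed before, so undoing means removing the document.
        return { action: 'delete' };
    }
    if (previousData === undefined) {
        // Without a snapshot there is nothing to restore.
        throw new Error(`Cannot roll back '${kind}' without previousData`);
    }
    return { action: 'restore', data: previousData };
}
```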

3. Collection Configuration

Centralized configuration for handling different collections.

Location: src/domains/atomicOperations/config/collections.config.ts

export interface CollectionConfig {
    idField: string;              // Primary identifier field
    queryBuilder?: (documentId: string) => any;  // Custom query logic
    dataSource?: 'mongodb' | 'firebase-rtdb' | 'firestore';
    firebasePathBuilder?: (documentId: string) => string;
    firestorePathBuilder?: (documentId: string) => string;
}

4. Write-Ahead Logging (WAL)

MongoDB collection (atomicwritelogs) that tracks all operations for rollback capability.

const writeLogSchema = {
    operationId: String,         // Unique operation ID
    bulkOperationId: String,     // Links to bulk upload
    collectionName: String,      // Target collection
    documentId: String,          // Document being modified
    operation: String,           // create/update/delete
    previousData: Mixed,         // For rollback
    status: String,              // pending/executed/rolledback
    createdAt: Date,
    executedAt: Date,
    rolledbackAt: Date,
    dataSource: String,          // mongodb/firebase-rtdb/firestore
}
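
The status field suggests a simple lifecycle for each log entry. The sketch below assumes the order pending, then executed, then rolledback, inferred from the schema comment; WalEntry, NEXT_STATUS and advance are hypothetical names and the entries live in memory rather than in MongoDB.

```typescript
type WalStatus = 'pending' | 'executed' | 'rolledback';

interface WalEntry {
    operationId: string;
    status: WalStatus;
    executedAt?: Date;
    rolledbackAt?: Date;
}

// Allowed transitions (assumed): pending -> executed -> rolledback.
const NEXT_STATUS: Record<WalStatus, WalStatus | null> = {
    pending: 'executed',
    executed: 'rolledback',
    rolledback: null,
};

// Returns a copy of the entry moved to its next status, stamping the
// matching timestamp field. Throws if the entry is already rolled back.
function advance(entry: WalEntry, now: Date = new Date()): WalEntry {
    const next = NEXT_STATUS[entry.status];
    if (next === null) {
        throw new Error(`No transition from '${entry.status}'`);
    }
    return next === 'executed'
        ? { ...entry, status: next, executedAt: now }
        : { ...entry, status: next, rolledbackAt: now };
}
```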

Implementation Guide

Adding Atomic Operations to a New Entity

Step 1: Configure Your Collection

Add your collection to src/domains/atomicOperations/config/collections.config.ts:

export const COLLECTION_CONFIGS: Record<string, CollectionConfig> = {
    // ... existing collections ...
    
    // Add your MongoDB collection
    'yourentities': {
        idField: 'entityID',  // Primary identifier field
    },
    
    // For composite keys
    'yourentityindexes': {
        idField: '_id',
        queryBuilder: (documentId: string) => {
            // documentId format: "entityID:organizationID"
            const [entityID, organizationID] = documentId.split(':');
            return { entityID, organizationID };
        }
    },
    
    // For Firebase RTDB
    'firebase:yourentities': {
        idField: 'entityID',
        dataSource: 'firebase-rtdb',
        firebasePathBuilder: (documentId: string) => {
            return `/yourentities/${documentId}`;
        }
    },
    
    // For Firestore
    'firestore:yourentities': {
        idField: 'entityID',
        dataSource: 'firestore',
        firestorePathBuilder: (documentId: string) => {
            return `yourentities/${documentId}`;
        }
    }
};

Step 2: Create Collection Method in Your Service

export class YourEntityBulkUploadService {
    constructor(
        private yourEntityRepo: YourEntityRepository,
        private connection: Connection,  // MongoDB connection for atomic ops
    ) {}
    
    /**
     * Collect atomic operations for your entities
     * IMPORTANT: This method should NOT execute any database operations
     */
    public async collectAtomicOperations(
        ctx: Context<UserAuth>,
        data: YourEntityData[]
    ): Promise<{
        operations: AtomicOperation[];
        counts: { create: number; edit: number };
    }> {
        const operations: AtomicOperation[] = [];
        const counts = { create: 0, edit: 0 };
        const organizationID = ctx.user.organizationID;
        
        for (const entity of data) {
            // Determine if this is create or update
            const existingEntity = entity.entityID 
                ? await this.yourEntityRepo.findOne(ctx, entity.entityID)
                : null;
            
            if (existingEntity) {
                // UPDATE operation
                operations.push({
                    collection: 'yourentities',
                    documentId: entity.entityID,
                    operation: 'update',
                    previousData: existingEntity,  // CRITICAL for rollback!
                    execute: async () => {
                        await this.yourEntityRepo.update(ctx, entity);
                        counts.edit++;
                        return entity;
                    }
                });
            } else {
                // CREATE operation
                const newEntityID = entity.entityID || generateUUID();
                operations.push({
                    collection: 'yourentities',
                    documentId: newEntityID,
                    operation: 'create',
                    execute: async () => {
                        const newEntity = {
                            ...entity,
                            entityID: newEntityID,
                            organizationID,
                            createdAt: new Date(),
                            createdBy: ctx.user.userID
                        };
                        await this.yourEntityRepo.create(ctx, newEntity);
                        counts.create++;
                        return newEntity;
                    }
                });
            }
            
            // Add related operations (indexes, mappings, etc.)
            if (entity.needsIndex) {
                operations.push(this.createIndexOperation(entity));
            }
        }
        
        return { operations, counts };
    }
}

Step 3: Integrate in Main Bulk Upload Usecase

In src/domains/bulkOperations/usecase/bulkOperations.usecase.ts:

public async processBulkUpload(context: Context<UserAuth>, data: any) {
    const allOperations: AtomicOperation[] = [];
    const bulkOperationId = generateUUID();
    
    try {
        // ... validation phase ...
        
        // Collect operations from all services
        const userDeptSiteOps = await this.userDeptSiteService.collectAtomicOperations(context);
        allOperations.push(...userDeptSiteOps.operations);
        
        const scheduleOps = await this.scheduleService.collectScheduleAtomicOperations(context, scheduleData);
        allOperations.push(...scheduleOps.operations);
        
        // ADD YOUR ENTITY HERE
        const yourEntityOps = await this.yourEntityService.collectAtomicOperations(context, data.YourEntities);
        allOperations.push(...yourEntityOps.operations);
        
        // Execute all operations atomically
        const atomicOps = new AtomicOperationsWithBulkID(this.connection, bulkOperationId);
        await atomicOps.execute(allOperations);
        
        // Update counts
        recordCount.YourEntities = yourEntityOps.counts;
        
    } catch (error) {
        // Rollback handled automatically by AtomicOperationsWithBulkID
        throw error;
    }
}

Step 4: Handle External Data Sources (Optional)

For Firebase RTDB operations:

operations.push({
    collection: 'firebase:yourentities',
    documentId: entityID,
    operation: 'create',
    dataSource: 'firebase-rtdb',
    firebasePath: `/yourentities/${entityID}`,
    firebaseData: entityData,  // Data to write to Firebase
    execute: async () => {
        const db = admin.database();
        await db.ref(`/yourentities/${entityID}`).set(entityData);
        return entityData;
    }
});

For Firestore operations:

operations.push({
    collection: 'firestore:yourentities',
    documentId: entityID,
    operation: 'create',
    dataSource: 'firestore',
    firestorePath: `yourentities/${entityID}`,
    firestoreData: entityData,
    execute: async () => {
        const firestore = admin.firestore();
        await firestore.doc(`yourentities/${entityID}`).set(entityData);
        return entityData;
    }
});

Supported Collections

MongoDB Collections

Collection            ID Field              Notes
users                 userID                User accounts
user_extension        userID                LMS-specific user data
departments           departmentID          Department entities
departmentindexes     _id                   Composite key: departmentID:organizationID
sites                 siteID                Site/location entities
siteindexes           siteID                Site search indexes
sitegroups            _id                   Site groupings
sitescheduleindexes   scheduleID            Schedule assignments
schedulestatistics    _id                   Schedule metrics
skuschedules          scheduleID            SKU schedule mappings
products              productID             Product catalog
skus                  skuID                 Stock keeping units
issues                issueID               Issue tracking
roles                 _id                   Role assignments (composite: organizationID:role)
siteoffdays           _id                   Site off days (composite: organizationID:siteID)
useroffdays           _id                   User off days (composite: organizationID:userID)
questionnaires        questionnaireID       Survey/questionnaire definitions
questionnaireindexes  questionnaireIndexID  Questionnaire search indexes

Firebase Realtime Database

Path                                      Purpose
/user/{userID}                            User profiles in Firebase
/questionnaire/{orgID}/{questionnaireID}  Questionnaire data
/questionnaireIndex/{orgID}/{indexID}     Questionnaire indexes

Firestore

Collection     Document Path              Purpose
organizations  /{orgID}/settings/offDays  Organization-wide off days

Testing Framework

Environment Variables for Testing

# Force failure at specific operation index (0-based)
TEST_ROLLBACK_AT_OPERATION=50
 
# Legacy variables (still supported but deprecated)
TEST_ROLLBACK_USER_INDEX=3      # Fail at user N
TEST_ROLLBACK_DEPT_INDEX=2      # Fail at department N
TEST_ROLLBACK_SITE_INDEX=5      # Fail at site N
TEST_ROLLBACK_SCHEDULE_INDEX=3  # Fail at schedule N

Test Scenarios

Scenario 1: Basic Rollback Test

# Test with 100 total operations, fail at operation 75
export TEST_ROLLBACK_AT_OPERATION=75
npm run start:dev
 
# Upload a file with mixed entities
# Expected behavior:
# - Operations 0-74 execute successfully
# - Operation 75 fails
# - ALL 75 operations are rolled back
# - Database remains unchanged

Scenario 2: Cross-Entity Consistency Test

# Create test data that has dependencies
# - Users that will be assigned to departments
# - Departments that will be mapped to sites
# - Schedules that reference users and sites
 
export TEST_ROLLBACK_AT_OPERATION=50
npm run start:dev
 
# Verify NO partial data exists:
db.users.find({ _bulkOperationId: "test-id" }).count()        // Should be 0
db.departments.find({ _bulkOperationId: "test-id" }).count()  // Should be 0
db.sites.find({ _bulkOperationId: "test-id" }).count()        // Should be 0
db.sitescheduleindexes.find({ _bulkOperationId: "test-id" }).count() // Should be 0

Scenario 3: Firebase/Firestore Rollback Test

# Test compensation-based rollback for external systems
export TEST_ROLLBACK_AT_OPERATION=80
npm run start:dev
 
# Check Firebase RTDB (should be rolled back)
firebase.database().ref('/user/testUserID').once('value')  // Should not exist
 
# Check Firestore (should be rolled back)
firestore.doc('organizations/testOrgID/settings/offDays').get()  // Should not exist

Verification Methods

1. Check Write-Ahead Logs

// MongoDB shell
db.atomicwritelogs.find({ 
    bulkOperationId: "your-bulk-id" 
}).pretty()
 
// Should show:
// - All operations with status: 'rolledback'
// - executedAt timestamps for successful ops
// - rolledbackAt timestamps showing rollback completed

2. Verify Console Output

[INFO] Starting bulk upload process...
[INFO] Collecting user/dept/site operations...
[INFO] Collected 45 operations
[INFO] Collecting schedule operations...
[INFO] Collected 30 operations
[INFO] Collecting non-ops operations...
[INFO] Collected 10 operations
[INFO] Total operations to execute: 85
[INFO] Executing 85 operations atomically...
[ERROR] Operation 75 failed: [TEST] Forced failure
[INFO] [AtomicOperations] Starting rollback of 75 operations
[INFO] [AtomicOperations] Rolling back operation 74: update sites
[INFO] [AtomicOperations] Rolling back operation 73: create sitescheduleindexes
...
[INFO] [AtomicOperations] Rollback completed successfully
[ERROR] Bulk upload failed, all changes rolled back

3. Unit Test Example

describe('AtomicOperations Rollback', () => {
    it('should rollback all operations on failure', async () => {
        const operations: AtomicOperation[] = [
            {
                collection: 'users',
                documentId: 'user1',
                operation: 'create',
                execute: async () => { /* create user */ }
            },
            {
                collection: 'departments',
                documentId: 'dept1',
                operation: 'create',
                execute: async () => { 
                    throw new Error('Simulated failure');
                }
            }
        ];
        
        const atomicOps = new AtomicOperationsWithBulkID(connection, 'test-bulk-id');
        
        await expect(atomicOps.execute(operations)).rejects.toThrow('Simulated failure');
        
        // Verify rollback occurred
        const logs = await connection.model('AtomicWriteLog').find({
            bulkOperationId: 'test-bulk-id'
        });
        
        expect(logs[0].status).toBe('rolledback');
    });
});

API Reference

Core Classes

AtomicOperationsWithBulkID

class AtomicOperationsWithBulkID {
    /**
     * Create a new atomic operations handler
     * @param connection - MongoDB connection
     * @param bulkOperationId - Unique identifier for this bulk operation
     */
    constructor(connection: Connection, bulkOperationId: string)
    
    /**
     * Execute operations atomically
     * @param operations - Array of operations to execute
     * @param forceFailAt - Optional: Force failure at specific index (testing)
     * @returns Array of operation results
     * @throws Error if any operation fails (triggers automatic rollback)
     */
    async execute(operations: AtomicOperation[], forceFailAt?: number): Promise<any[]>
    
    /**
     * Manually trigger rollback (rarely needed, automatic on failure)
     * @returns true if rollback successful
     */
    async rollback(): Promise<boolean>
    
    /**
     * Enable test mode (prevents WAL cleanup for inspection)
     */
    enableTestMode(): void
    
    /**
     * Get the bulk operation ID
     */
    getBulkOperationId(): string
    
    /**
     * Static recovery method for orphaned operations
     * @param connection - MongoDB connection
     * @param bulkOperationId - ID of failed bulk operation
     */
    static async recoverFailedBulkOperation(
        connection: Connection, 
        bulkOperationId: string
    ): Promise<void>
}

Service Method Patterns

All services that participate in atomic operations should follow this pattern:

interface AtomicServicePattern {
    /**
     * Collect operations without executing them
     * @returns Operations array and counts
     */
    collectAtomicOperations(
        context: Context<UserAuth>,
        data: any
    ): Promise<{
        operations: AtomicOperation[];
        counts: { create: number; edit: number; [key: string]: number };
        // Optional: Additional return data
        siteNameChanges?: Map<string, string>;
        uploadedQuestionnaires?: Array<any>;
    }>
}

Collection Configuration API

// Get configuration for a collection
const config = getCollectionConfig('users');
 
// Build query for a collection
const query = buildCollectionQuery('departmentindexes', 'dept1:org1');
// Returns: { departmentID: 'dept1', organizationID: 'org1' }
 
// Check data source
const isFirebase = isFirebaseRTDBCollection('firebase:user');  // true
const isFirestore = isFirestoreCollection('firestore:organization-offdays');  // true
 
// Get paths for external systems
const firebasePath = getFirebasePath('firebase:user', 'user123');
// Returns: '/user/user123'
 
const firestorePath = getFirestorePath('firestore:organization-offdays', 'org123');
// Returns: 'organizations/org123/settings/offDays'
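
How a query might be derived from a collection's configuration is sketched below. It is a guess at the behaviour implied by the examples above, not the actual buildCollectionQuery: the names SketchCollectionConfig, SKETCH_CONFIGS and buildQuerySketch are hypothetical, and only two collections are configured.

```typescript
interface SketchCollectionConfig {
    idField: string;
    queryBuilder?: (documentId: string) => Record<string, string>;
}

const SKETCH_CONFIGS: Record<string, SketchCollectionConfig> = {
    users: { idField: 'userID' },
    departmentindexes: {
        idField: '_id',
        // documentId format: "departmentID:organizationID"
        queryBuilder: (documentId) => {
            const parts = documentId.split(':');
            return { departmentID: parts[0] ?? '', organizationID: parts[1] ?? '' };
        },
    },
};

// Uses the custom queryBuilder when one is configured; otherwise matches
// the document on the collection's idField.
function buildQuerySketch(collection: string, documentId: string): Record<string, string> {
    const config = SKETCH_CONFIGS[collection];
    if (!config) {
        throw new Error(`No configuration for collection '${collection}'`);
    }
    return config.queryBuilder
        ? config.queryBuilder(documentId)
        : { [config.idField]: documentId };
}
```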

Troubleshooting

Common Issues and Solutions

Issue: “Cannot read property ‘AtomicWriteLog’ of undefined”

Cause: Missing MongoDB connection in service constructor or test mocks.

Solution:

// In service
constructor(
    private repository: YourRepository,
    private connection: Connection  // Add this
) {}
 
// In tests
const mockConnection = {
    models: { AtomicWriteLog: mockWriteLogModel },
    model: jest.fn().mockReturnValue(mockWriteLogModel)
};

Issue: “Expected 2 arguments, but got 1”

Cause: Repository methods expecting field selection parameter.

Solution:

// Wrong
await this.repo.find({ organizationID });
 
// Correct
await this.repo.find({ organizationID }, {});  // Pass empty object for all fields

Issue: Rollback not working for updates

Cause: Missing previousData in update operations.

Solution: Always fetch existing data before creating update operation:

const existing = await this.repo.findOne(id);
operations.push({
    collection: 'mycollection',
    documentId: id,
    operation: 'update',
    previousData: existing,  // REQUIRED for rollback!
    execute: async () => { /* update logic */ }
});

Issue: Composite key collections failing

Cause: Incorrect documentId format or missing queryBuilder.

Solution: Use consistent format and provide queryBuilder:

// In collections.config.ts
'mycollection': {
    idField: '_id',
    queryBuilder: (documentId: string) => {
        // Format: "key1:key2"
        const [key1, key2] = documentId.split(':');
        return { key1, key2 };
    }
}
 
// In operation
operations.push({
    collection: 'mycollection',
    documentId: `${key1}:${key2}`,  // Use composite format
    // ...
});
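
Malformed composite IDs are easier to catch if they are built and parsed in one place. The helpers below are a hypothetical sketch (makeCompositeId and splitCompositeId are not part of the framework) that enforces the colon-separated format described above.

```typescript
// Joins key parts with ':' and rejects parts that would make the ID ambiguous.
function makeCompositeId(...parts: string[]): string {
    for (const part of parts) {
        if (part === '' || part.indexOf(':') !== -1) {
            throw new Error(`Invalid composite key part: '${part}'`);
        }
    }
    return parts.join(':');
}

// Splits a composite ID and checks that it has the expected number of parts.
function splitCompositeId(documentId: string, expectedParts: number): string[] {
    const parts = documentId.split(':');
    if (parts.length !== expectedParts || parts.some((p) => p === '')) {
        throw new Error(`Expected ${expectedParts} ':'-separated parts in '${documentId}'`);
    }
    return parts;
}
```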

Issue: Memory issues with large uploads

Cause: All operations held in memory before execution.

Solution:

  • Limit batch size to ~1000 operations
  • Implement application-level batching:

const BATCH_SIZE = 1000;
for (let i = 0; i < data.length; i += BATCH_SIZE) {
    const batch = data.slice(i, i + BATCH_SIZE);
    await processBatch(batch);
}

Issue: Firebase/Firestore operations not rolling back

Cause: Missing data source configuration or incorrect paths.

Solution: Ensure proper configuration:

operations.push({
    collection: 'firebase:user',   // Use prefix
    documentId: userID,
    operation: 'create',
    dataSource: 'firebase-rtdb',   // Specify data source
    firebasePath: `/user/${userID}`,  // Provide path (matches /user/{userID} under Supported Collections)
    firebaseData: userData,  // Data for rollback
    execute: async () => { /* firebase operation */ }
});

Best Practices

1. Always Fetch Existing Data for Updates

// ✅ GOOD
const existing = await this.repo.findOne(id);
operations.push({
    operation: 'update',
    previousData: existing,  // Can rollback
    // ...
});
 
// ❌ BAD
operations.push({
    operation: 'update',
    // No previousData - cannot rollback!
    // ...
});

2. Use Consistent Document ID Formats

// For composite keys, use colon separator
const documentId = `${primaryKey}:${secondaryKey}`;
 
// For hierarchical data, maintain consistent patterns
const firebasePath = `/parent/${parentId}/child/${childId}`;

3. Validate Before Collecting Operations

// Validate ALL data first
const validationErrors = await validateAllEntities(data);
if (validationErrors.length > 0) {
    throw new ValidationError(validationErrors);
}
 
// Only then collect operations
const operations = await collectOperations(data);

4. Handle Entity Dependencies Correctly

// Collect in dependency order
const operations = [];
 
// 1. Create parent entities first
operations.push(...await collectUserOperations());
 
// 2. Then create dependent entities
operations.push(...await collectDepartmentMappingOperations());
 
// 3. Finally create entities that depend on both
operations.push(...await collectScheduleOperations());

5. Use Write-Ahead Logging for Debugging

// Enable test mode to preserve WAL entries
const atomicOps = new AtomicOperationsWithBulkID(connection, bulkId);
if (process.env.DEBUG_ATOMIC_OPS) {
    atomicOps.enableTestMode();  // WAL entries won't be cleaned up
}

6. Implement Proper Error Handling

try {
    await atomicOps.execute(operations);
} catch (error) {
    // Rollback is automatic, but log for monitoring
    log.error('Atomic operation failed', {
        bulkOperationId,
        totalOperations: operations.length,
        error: error.message
    });
    
    // Re-throw with context
    throw new BulkUploadError('Bulk upload failed and was rolled back', {
        bulkOperationId,
        cause: error
    });
}
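
The guide does not show how BulkUploadError is defined. A minimal definition compatible with the call above might look like this; the field names are assumptions, and the wrapped error is stored as originalError to avoid clashing with the built-in Error.cause.

```typescript
// Hypothetical definition; the real class in the codebase may differ.
class BulkUploadError extends Error {
    readonly bulkOperationId: string;
    readonly originalError: unknown;

    constructor(message: string, details: { bulkOperationId: string; cause?: unknown }) {
        super(message);
        this.name = 'BulkUploadError';
        this.bulkOperationId = details.bulkOperationId;
        this.originalError = details.cause;
        // Keeps `instanceof` working when compiled to older JavaScript targets.
        Object.setPrototypeOf(this, BulkUploadError.prototype);
    }
}
```

Carrying the bulk operation ID on the error lets callers report it to support without parsing the message.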

7. Monitor Performance

const startTime = Date.now();
const operations = await collectOperations(data);
log.info('Operation collection time', {
    duration: Date.now() - startTime,
    count: operations.length
});
 
const execStart = Date.now();
await atomicOps.execute(operations);
log.info('Execution time', {
    duration: Date.now() - execStart,
    opsPerSecond: operations.length / ((Date.now() - execStart) / 1000)
});

Migration Guide

Migrating from Entity-Level to Single Transaction

If you have existing code using the old entity-level atomic operations:

Old Pattern (Phase 2)

// Each entity type had its own transaction
await this.userService.processWithAtomicity(users);
await this.deptService.processWithAtomicity(departments);
await this.scheduleService.processWithAtomicity(schedules);
// Problem: Schedule failure doesn't rollback users/departments

New Pattern (Phase 3)

// Collect all operations first
const allOperations: AtomicOperation[] = [];
const userOps = await this.userService.collectAtomicOperations(context, users);
const deptOps = await this.deptService.collectAtomicOperations(context, departments);
const scheduleOps = await this.scheduleService.collectAtomicOperations(context, schedules);
// Each collector returns { operations, counts }, so spread the operations arrays
allOperations.push(...userOps.operations, ...deptOps.operations, ...scheduleOps.operations);
 
// Execute as single transaction
const atomicOps = new AtomicOperationsWithBulkID(connection, bulkId);
await atomicOps.execute(allOperations);
// Now ALL operations succeed or ALL rollback

Key Migration Steps

  1. Update Service Methods: Change from executing operations to collecting them
  2. Add Connection Parameter: Services need MongoDB connection for atomic operations
  3. Update Tests: Mock the AtomicWriteLog model in test suites
  4. Remove Old Transaction Code: Delete entity-specific transaction handling
  5. Test Rollback: Verify cross-entity rollback works correctly

Performance Considerations

Operation Limits

Metric                 Recommended  Maximum  Notes
Operations per upload  500          1000     Memory constraint
Execution time         < 30s        60s      MongoDB transaction limit
WAL entry size         < 1MB        16MB     MongoDB document limit
Concurrent uploads     1            3        Database connection pool
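
The operation-count limits in the table could be enforced before execution with a small guard. This is a sketch under assumptions: the constant and function names are hypothetical, and the guide does not say whether the framework performs such a check itself.

```typescript
// Values taken from the "Operations per upload" row of the table.
const RECOMMENDED_OPERATIONS = 500;
const MAX_OPERATIONS = 1000;

type LimitCheck = 'ok' | 'warn' | 'reject';

// Classifies an upload by its operation count: within the recommended size,
// above it but still allowed, or over the maximum.
function checkOperationCount(count: number): LimitCheck {
    if (count > MAX_OPERATIONS) {
        return 'reject';
    }
    if (count > RECOMMENDED_OPERATIONS) {
        return 'warn';
    }
    return 'ok';
}
```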

Optimization Tips

  1. Batch Database Reads: Fetch all existing data in one query
  2. Use Indexes: Ensure proper indexes on frequently queried fields
  3. Minimize Operation Size: Store only essential data in previousData
  4. Consider Async Operations: For non-critical external API calls
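
The first tip, batching database reads, can be sketched as a single lookup that feeds the create-or-update decision. The repository interface and its findManyByIds method are hypothetical; the real repositories may expose a different batched query.

```typescript
interface SketchEntity {
    entityID: string;
    name: string;
}

// Hypothetical repository with one batched lookup instead of per-row findOne calls.
interface BatchRepo {
    findManyByIds(ids: string[]): Promise<SketchEntity[]>;
}

// Fetches all existing entities in one query and indexes them by ID, so each
// uploaded row can be classified as create or update without another read.
async function loadExisting(
    repo: BatchRepo,
    incoming: SketchEntity[]
): Promise<Map<string, SketchEntity>> {
    const uniqueIds = Array.from(new Set(incoming.map((e) => e.entityID)));
    const found = await repo.findManyByIds(uniqueIds);
    return new Map(found.map((e): [string, SketchEntity] => [e.entityID, e]));
}
```

A row whose ID is present in the returned map becomes an update, with the map value as previousData; any other row becomes a create.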

Security Considerations

  1. Bulk Operation IDs: Use UUIDs to prevent ID guessing
  2. WAL Access: Restrict access to atomicwritelogs collection
  3. Sensitive Data: Don’t log sensitive information in WAL
  4. Rate Limiting: Implement rate limits on bulk upload endpoints
  5. Validation: Thorough input validation before processing

Future Enhancements

Planned Features

  1. Distributed Transactions: Support for multi-database transactions
  2. Partial Rollback: Ability to rollback specific operation ranges
  3. Resume Capability: Resume failed uploads from failure point
  4. Real-time Progress: WebSocket-based progress updates
  5. Batch Processing: Built-in support for large dataset batching

Under Consideration

  • GraphQL support for operation definitions
  • Event sourcing integration
  • Automatic retry with exponential backoff
  • Machine learning for failure prediction
  • Cross-region replication support

Support and Resources

Getting Help

  1. Check Documentation: Review this guide and linked documents
  2. Search Issues: Check GitHub issues for similar problems
  3. Enable Debug Logging: Set LOG_LEVEL=debug for detailed logs
  4. Contact Team: Reach out to the development team

Code Examples

Find working examples in:

  • src/domains/bulkOperations/usecase/bulkOperations.usecase.ts
  • src/domains/schedules/usecase/schedule.usecase.ts
  • src/domains/nonOps/usecase/nonOps.usecase.ts
  • src/__tests__/atomicOperations/

Framework Version: 1.0.0