1. Overview

The Nimbly notification system consists of three components that together create, store, and deliver notifications:

  1. nimbly-web-api: REST API endpoints for notification event management
  2. api-issues: Issue management service that triggers notifications via PubSub
  3. nimbly-cloud/notificationpubsub-subscriber: Cloud Functions subscriber that processes notifications

The notification system follows an event-driven architecture: events from the issue management system are published to a central PubSub topic, and a subscriber then processes each event, persists it, and delivers the notification to the appropriate users.

2. Event Flow and Integration

In the Nimbly ecosystem, we’ve implemented a notification pipeline with the following flow:

  • Whenever an event occurs in the issue management system (for example an assignment change, a new comment, or a due date change), api-issues publishes it to the “notificationpubsub” topic.
  • The notificationpubsub-subscriber Cloud Function (found in nimbly-cloud) is triggered by these events and handles the processing.
  • As part of the processing, the Cloud Function includes a non-blocking workflow to persist notification metadata to MongoDB by calling APIs in nimbly-web-api.
  • nimbly-web-api exposes endpoints for both persisting these notifications and providing paginated lists that mobile apps can query.

This architecture ensures loose coupling between services while maintaining a consistent notification experience across the platform.
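The flow above can be sketched with an in-memory stand-in for the topic. This is only an illustration under stated assumptions: `InMemoryTopic`, `NotificationMessage`, and the `store` and `pushed` arrays are hypothetical and take the place of Google PubSub, MongoDB, and Firebase respectively.

```typescript
// Hypothetical message shape, loosely modeled on the payload published by api-issues.
interface NotificationMessage {
    uids: string[];
    title: string;
    body: string;
}

type NotificationHandler = (message: NotificationMessage) => void;

// In-memory stand-in for the "notificationpubsub" topic (NOT the Google PubSub client).
class InMemoryTopic {
    private readonly handlers: NotificationHandler[] = [];

    subscribe(handler: NotificationHandler): void {
        this.handlers.push(handler);
    }

    publish(message: NotificationMessage): void {
        // Deliver to every subscriber; the publisher knows nothing about them.
        for (const handler of this.handlers) {
            handler(message);
        }
    }
}

// Stand-ins for MongoDB (reached via nimbly-web-api) and Firebase push delivery.
const store: NotificationMessage[] = [];
const pushed: string[] = [];

const topic = new InMemoryTopic();

// Plays the role of notificationpubsub-subscriber: persist, then deliver.
topic.subscribe((message) => {
    store.push(message);
    for (const uid of message.uids) {
        pushed.push(`${uid}: ${message.title}`);
    }
});

// Plays the role of api-issues: publish and move on.
topic.publish({
    uids: ['user1', 'user2'],
    title: 'ORG-123 - Quality Issue',
    body: 'Issue status updated to In Progress',
});
```

The publisher only depends on the topic, which is the loose coupling described above: a second subscriber could be added without touching the publishing code.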

3. Architecture Overview

graph TB
    subgraph "Client Layer"
        A[Web/Mobile App]
    end
    
    subgraph "API Layer"
        B[nimbly-web-api]
        C[api-issues]
    end
    
    subgraph "Message Queue"
        D[Google PubSub]
        D1[notificationpubsub Topic]
    end
    
    subgraph "Processing Layer"
        E[notificationpubsub-subscriber CF]
    end
    
    subgraph "Storage"
        F[MongoDB - NotificationEvents]
        G[Firebase - Users/Tokens]
    end
    
    subgraph "External Services"
        H[Firebase Push Notifications]
        I[SendGrid Email]
    end
    
    A --> B
    C --> D1
    D1 --> E
    E --> F
    E --> G
    E --> H
    E --> I
    E --> B
    B --> F

4. nimbly-web-api - Notification Events Management

4.1. Routes & Endpoints

| Method | Endpoint | Description |
| --- | --- | --- |
| GET | /notification-events | Retrieve paginated notification events |
| POST | /notification-events | Create new notification event |
| PUT | /notification-events/:eventId/read | Mark notification as read |
| GET | /notification-events/count/unread | Get unread notification count |

4.2. Implementation Details

NotificationEventsController

export class NotificationEventsController {
    public async create({ context, payload }): Promise<FunctionReturn> {
        const data = payload.data;
        const notificationEvent = await this.notificationEventUsecase
            .createNotificationEvent(context, data);
        return response(notificationEvent, null);
    }
 
    public async setUserReadTheNotification({ context, payload }): Promise<FunctionReturn> {
        const userId = context.user.userID;
        const eventId = payload.params?.eventId;
        await this.notificationEventUsecase
            .setUserReadTheNotification(eventId, userId);
        return response('success', null);
    }
}

In this controller, the create method handles incoming notification event data and passes it to the usecase layer for business logic processing. The setUserReadTheNotification method takes an event ID and marks it as read for the current user.

NotificationEventUsecase

Core Methods:

  1. createNotificationEvent: Creates a new notification event in MongoDB
  2. setUserReadTheNotification: Marks a notification as read for a specific user
  3. getPaginatedNotificationEvents: Retrieves notifications with pagination and filtering
  4. getUnreadNotificationCount: Returns the number of unread notifications for a user

Key Features:

  • 30-day retention period for notifications
  • User-specific read status tracking
  • Pagination support with filtering options
  • Support for read/unread status filtering

public async createNotificationEvent(
    context: Context<UserAuth>,
    data: Partial<NotificationEvent>
): Promise<NotificationEvent> {
    const eventData: NotificationEvent = new NotificationEvent({
        ...data,
        organizationID: context.user?.organizationID,
        eventId: randomUUID(),
        timestamp: new Date().getTime(),
    });
    return await this.notificationEventRepo.createNotificationEvent(eventData);
}

This method enriches the notification data with organization context, generates a unique ID, and timestamps the event before persisting it to the database.
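One consequence of the spread order in the method above is that any `organizationID`, `eventId`, or `timestamp` supplied by the caller is overwritten. The sketch below isolates that behavior as a pure function. `NotificationEventDoc`, `EnrichDeps`, and `enrichNotificationEvent` are hypothetical names introduced for illustration, and the ID generator and clock are injected so the result is deterministic.

```typescript
// Simplified stand-in for the NotificationEvent entity (fields taken from the schema).
interface NotificationEventDoc {
    eventId: string;
    organizationID?: string;
    userIds: string[];
    entity: string;
    entityId: string;
    timestamp: number;
    content: { title: string; body: string };
}

// Hypothetical dependencies, injected so the function is deterministic in tests.
interface EnrichDeps {
    generateId: () => string;
    now: () => number;
}

function enrichNotificationEvent(
    data: Partial<NotificationEventDoc>,
    organizationID: string | undefined,
    deps: EnrichDeps,
): Partial<NotificationEventDoc> {
    return {
        ...data,
        // Listed after the spread, so these three fields always win over caller input.
        organizationID,
        eventId: deps.generateId(),
        timestamp: deps.now(),
    };
}

const enriched = enrichNotificationEvent(
    {
        eventId: 'client-supplied-id', // overwritten by generateId()
        userIds: ['user1'],
        entity: 'issue',
        entityId: 'issue-uuid',
        content: { title: 'ORG-123 - Quality Issue', body: 'Status updated' },
    },
    'org-1',
    { generateId: () => 'generated-id', now: () => 1700000000000 },
);
```

Here `enriched.eventId` is `'generated-id'`, not the client-supplied value, so callers cannot choose their own event IDs or organization.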

5. Schema Definition and Aggregations in entity-node

The notification event schema is defined in the entity-node repository under src/repositories/notificationEvent. This provides the data structure and repository methods for managing notification events.

5.1. Schema Definition

// From notificationEvent.schema.ts
const NotificationEventSchema = new Schema<NotificationEvent & Document>(
    {
        eventId: { type: String, required: true, unique: true, index: true },
        userIds: { type: [String], index: true, default: [] }, // Array of user IDs who should receive the notification
        organizationID: { type: String, required: false, index: true },
        readBy: { type: [String], index: true, default: [] }, // Array of user IDs who have read the notification
        entity: { type: String, required: true, index: true }, // Type of entity (e.g., "issue")
        entityId: { type: String, required: true, index: true }, // ID of the entity
        timestamp: { type: Number, default: Date.now, required: true, index: true },
        content: {
            title: { type: String, required: true },
            body: { type: String, required: true },
        },
    },
    {
        // Add Mongoose default timestamps (createdAt, updatedAt)
        timestamps: true,
        // Define the collection name explicitly
        collection: 'notification_events',
    },
);
 
// Compound indexes for optimized queries
NotificationEventSchema.index({ userIds: 1, timestamp: -1 });
NotificationEventSchema.index({ readBy: 1, timestamp: -1 });

The schema includes indexed fields for efficient querying, particularly for listing notifications for specific users and filtering by read status. The compound indexes optimize the most common query patterns.
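To make the schema's `required` and `default` settings concrete, here is a minimal sketch that approximates them in plain TypeScript, without Mongoose. `NotificationEventInput`, `missingRequiredFields`, and `withSchemaDefaults` are hypothetical helpers written for this example and are not part of entity-node.

```typescript
// Input shape before validation; every field may be absent.
interface NotificationEventInput {
    eventId?: string;
    userIds?: string[];
    organizationID?: string;
    readBy?: string[];
    entity?: string;
    entityId?: string;
    timestamp?: number;
    content?: { title?: string; body?: string };
}

// Lists the fields marked `required: true` that are absent or empty.
// `timestamp` is skipped because the schema supplies a default for it.
function missingRequiredFields(input: NotificationEventInput): string[] {
    const missing: string[] = [];
    if (!input.eventId) missing.push('eventId');
    if (!input.entity) missing.push('entity');
    if (!input.entityId) missing.push('entityId');
    if (!input.content?.title) missing.push('content.title');
    if (!input.content?.body) missing.push('content.body');
    return missing;
}

// Applies the defaults the schema declares for the array fields and timestamp.
function withSchemaDefaults(input: NotificationEventInput, now: number): NotificationEventInput {
    return {
        ...input,
        userIds: input.userIds ?? [],
        readBy: input.readBy ?? [],
        timestamp: input.timestamp ?? now,
    };
}

const draft: NotificationEventInput = {
    eventId: 'generated-id',
    entity: 'issue',
    content: { title: 'ORG-123 - Quality Issue' },
};
const problems = missingRequiredFields(draft); // ['entityId', 'content.body']
const completed = withSchemaDefaults(draft, 1700000000000);
```

A check like this could run before the API call to fail early, although the schema itself remains the source of truth for validation.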

5.2. Repository Interface

// From notificationEvent.repository.ts
export interface INotificationEventRepository {
    createNotificationEvent: (notificationEvent: Partial<NotificationEvent>) => Promise<any>;
    getNotificationEvent: (eventId: string) => Promise<NotificationEvent | null>;
    updateNotificationEvent: (eventId: string, notificationEvent: Partial<NotificationEvent>) => Promise<any>;
    findNotificationEventsPaginate: (
        query: MongoQuery<NotificationEvent>,
        paginationOptions: PaginationOptions,
        userId: string,
    ) => Promise<any>;
    getUnreadNotificationCount: (userId: string) => Promise<number>;
}

This interface defines the contract for any repository implementation, ensuring consistent behavior regardless of the underlying data store.
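As an illustration of that contract, the sketch below implements a subset of the interface in memory, which can be handy for unit tests. It is not the entity-node implementation: `NotificationEventDoc`, `NotificationEventStore`, and `InMemoryNotificationEventStore` are hypothetical, the paginated finder is omitted, and the unread rule (recipient listed in `userIds` but not in `readBy`) is an assumption.

```typescript
// Simplified stand-in for the NotificationEvent entity.
interface NotificationEventDoc {
    eventId: string;
    userIds: string[];
    readBy: string[];
    entity: string;
    entityId: string;
    timestamp: number;
}

// Hypothetical subset of INotificationEventRepository (paginated finder omitted).
interface NotificationEventStore {
    createNotificationEvent(event: NotificationEventDoc): Promise<NotificationEventDoc>;
    getNotificationEvent(eventId: string): Promise<NotificationEventDoc | null>;
    updateNotificationEvent(
        eventId: string,
        patch: Partial<NotificationEventDoc>,
    ): Promise<NotificationEventDoc | null>;
    getUnreadNotificationCount(userId: string): Promise<number>;
}

class InMemoryNotificationEventStore implements NotificationEventStore {
    private readonly events = new Map<string, NotificationEventDoc>();

    async createNotificationEvent(event: NotificationEventDoc): Promise<NotificationEventDoc> {
        if (this.events.has(event.eventId)) {
            // Mirrors the `unique: true` constraint on eventId.
            throw new Error(`Duplicate eventId: ${event.eventId}`);
        }
        this.events.set(event.eventId, { ...event });
        return event;
    }

    async getNotificationEvent(eventId: string): Promise<NotificationEventDoc | null> {
        return this.events.get(eventId) ?? null;
    }

    async updateNotificationEvent(
        eventId: string,
        patch: Partial<NotificationEventDoc>,
    ): Promise<NotificationEventDoc | null> {
        const existing = this.events.get(eventId);
        if (!existing) return null;
        const updated: NotificationEventDoc = { ...existing, ...patch, eventId };
        this.events.set(eventId, updated);
        return updated;
    }

    async getUnreadNotificationCount(userId: string): Promise<number> {
        // Assumed rule: unread means the user is a recipient and is not in readBy.
        return Array.from(this.events.values()).filter(
            (event) => event.userIds.includes(userId) && !event.readBy.includes(userId),
        ).length;
    }
}

// Usage: one of two notifications has been read by user1.
async function demoUnreadCount(): Promise<number> {
    const repo = new InMemoryNotificationEventStore();
    const base = { userIds: ['user1'], readBy: [], entity: 'issue', entityId: 'issue-uuid', timestamp: 0 };
    await repo.createNotificationEvent({ ...base, eventId: 'e1' });
    await repo.createNotificationEvent({ ...base, eventId: 'e2' });
    await repo.updateNotificationEvent('e1', { readBy: ['user1'] });
    return repo.getUnreadNotificationCount('user1');
}
```

Because the usecase layer only depends on the interface, a store like this could be swapped in for tests without changing business logic.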

5.3. MongoDB Aggregation Pipeline

One of the key components is the aggregation pipeline used for paginated queries, which efficiently filters and formats notification events:

// From aggregations/findPaginate.ts
export const makeFindPaginateQuery = (
    query: MongoQuery<NotificationEvent>,
    paginationOptions: PaginationOptions,
    userId: string,
    querySearch?: MongoQuery,
) => {
    const aggregate: any[] = [
        {
            $match: query,
        },
    ];
 
    // ... additional aggregation stages ...
 
    aggregate.push({
        $facet: {
            docs: [
                {
                    $sort: { timestamp: -1 },
                },
                {
                    $skip: paginationOptions.limit * (paginationOptions.page > 0 ? paginationOptions.page - 1 : 0),
                },
                {
                    $limit: paginationOptions.limit,
                },
                // Projects and calculates read status for the current user
                {
                    $addFields: {
                        read: {
                            $cond: {
                                if: { $ifNull: ['$readBy', false] },
                                then: { $in: [userId, '$readBy'] },
                                else: false,
                            },
                        },
                    },
                },
                // ... final projection ...
            ],
            pagination: [
                {
                    $count: 'eventId',
                },
            ],
        },
    });
 
    return aggregate;
};

This aggregation pipeline:

  1. Applies the initial filtering query
  2. Sorts notifications by timestamp (newest first)
  3. Implements pagination with skip and limit
  4. Calculates a personalized read flag for each notification based on the current user
  5. Returns both the paginated results and total count for proper pagination metadata
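A `$facet` stage returns a single document whose fields are arrays, and `$count: 'eventId'` stores the total under a field named `eventId`. When nothing matches, `$count` emits no document, so the `pagination` array is empty. The sketch below shows one way the caller might compute the skip offset and unwrap that result into pagination metadata. `computeSkip` and `unwrapFacetResult` are hypothetical helpers, since the actual unwrapping code is not shown in this document.

```typescript
interface PaginationOptions {
    limit: number;
    page: number;
}

// Shape produced by the $facet stage: one document, each facet an array.
interface FacetResult<T> {
    docs: T[];
    pagination: Array<{ eventId: number }>; // total count, stored under `eventId`
}

// Same arithmetic as the $skip stage: pages are 1-based, page <= 0 is treated as page 1.
function computeSkip(options: PaginationOptions): number {
    return options.limit * (options.page > 0 ? options.page - 1 : 0);
}

function unwrapFacetResult<T>(result: Array<FacetResult<T>>, options: PaginationOptions) {
    const facet = result[0];
    // An empty `pagination` array means no documents matched.
    const totalDocs = facet?.pagination[0]?.eventId ?? 0;
    return {
        docs: facet?.docs ?? [],
        totalDocs,
        page: options.page > 0 ? options.page : 1,
        pages: Math.ceil(totalDocs / options.limit),
    };
}
```

For example, with `limit: 25` and `page: 3` the skip offset is 50, and a total of 52 matching documents yields 3 pages.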

6. api-issues to notificationpubsub-subscriber Flow

6.1. Event Generation

In the api-issues service, we’ve added or updated processes to push events to the “notificationpubsub” topic whenever relevant events occur in the issue management system:

// Example in IssueUsecase.sendPushNotification()
await this.pubsubNotificationRepository.publish(
    'pushNotification:sendPushNotification',
    {
        context: ctx,
        payload: {
            uids: ['user1', 'user2'],
            title: 'ORG-123 - Quality Issue',
            body: 'Issue status updated to In Progress',
            notificationData: {
                key: 'issue-uuid',
                type: 'issue',
                value: JSON.stringify(issueData)
            }
        }
    }
);
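The payload in the example above could be assembled by a small helper. The sketch below is one possible shape, not the code in api-issues: `IssueSummary`, its fields, and `buildStatusChangePayload` are hypothetical, and deduplicating recipients is an added assumption.

```typescript
// Hypothetical issue fields; the real issue entity is not shown in this document.
interface IssueSummary {
    issueID: string;
    code: string;
    title: string;
    status: string;
}

// Mirrors the payload shape used in the publish call above.
interface PushNotificationPayload {
    uids: string[];
    title: string;
    body: string;
    notificationData: { key: string; type: string; value: string };
}

function buildStatusChangePayload(issue: IssueSummary, uids: string[]): PushNotificationPayload {
    return {
        // Drop duplicate recipients so nobody is notified twice (assumed requirement).
        uids: Array.from(new Set(uids)),
        title: `${issue.code} - ${issue.title}`,
        body: `Issue status updated to ${issue.status}`,
        notificationData: {
            key: issue.issueID,
            type: 'issue',
            // Serialized because `value` travels as a string.
            value: JSON.stringify(issue),
        },
    };
}

const payload = buildStatusChangePayload(
    { issueID: 'issue-uuid', code: 'ORG-123', title: 'Quality Issue', status: 'In Progress' },
    ['user1', 'user2', 'user1'],
);
```

Keeping payload construction in one function makes the `notificationData.key` and `notificationData.type` fields, which later become `entityId` and `entity`, easy to keep consistent across event types.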

6.2. Cloud Function Processing

The notificationpubsub-subscriber Cloud Function processes these events and performs several actions:

  1. Parses the incoming message
  2. Determines the appropriate action based on the message type
  3. Fetches necessary user data from Firebase
  4. Persists notification data to MongoDB via a REST call to nimbly-web-api
  5. Sends push notifications to user devices

// In PushNotificationUsecase.sendPushNotification
// Persist notification event to MongoDB via API call
const notificationEvent: Partial<NotificationEvent> = {
    userIds: uids,
    entity: notificationData?.type,
    entityId: notificationData?.key,
    content: { title, body }
};
await this._NotificationEventRepository.createNotificationEvent(context, notificationEvent);
 
// Send push notifications via Firebase
for (const [idx, userToken] of userTokens.entries()) {
    const message: admin.messaging.Message = {
        notification: { title, body },
        token: userToken,
        data: notificationData
    };
    await this._UserRepository.sendNotification(message, dryRun);
}
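In the loop above, a rejected send would stop delivery to the remaining tokens unless the error is handled somewhere not shown here. The sketch below shows one way to isolate failures per token. `SendFn`, `SendSummary`, and `sendToAllTokens` are hypothetical, and the `send` callback stands in for the Firebase call.

```typescript
// Hypothetical sender; stands in for the Firebase messaging call.
type SendFn = (token: string) => Promise<void>;

interface SendSummary {
    delivered: string[];
    failed: string[];
}

async function sendToAllTokens(tokens: string[], send: SendFn): Promise<SendSummary> {
    const summary: SendSummary = { delivered: [], failed: [] };
    for (const token of tokens) {
        try {
            await send(token);
            summary.delivered.push(token);
        } catch {
            // Record the failure and keep going so one bad token does not block the rest.
            summary.failed.push(token);
        }
    }
    return summary;
}
```

The `failed` list gives the caller a place to log individual failures or clean up invalid tokens after the loop completes.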

7. Complete End-to-End Flow

Scenario: Issue Status Update Notification

sequenceDiagram
    participant U as User
    participant API as api-issues
    participant PS as PubSub Topic
    participant CF as notificationpubsub-subscriber
    participant DB as MongoDB
    participant FB as Firebase
    participant WEB as nimbly-web-api
    participant APP as Mobile App

    U->>API: Update issue status
    API->>API: Process issue update
    API->>PS: Publish notification event
    Note over PS: Topic: notificationpubsub
    
    PS->>CF: Trigger subscriber
    CF->>CF: Parse message & process action
    
    par Persist Notification
        CF->>WEB: POST /notification-events
        WEB->>DB: Save notification event
    and Send Push Notification
        CF->>FB: Get user tokens
        FB-->>CF: Return tokens
        CF->>FB: Send push notification
        FB->>APP: Deliver notification
    end
    
    APP->>WEB: GET /notification-events
    WEB->>DB: Fetch notifications
    DB-->>WEB: Return data
    WEB-->>APP: Notification list
    
    APP->>WEB: PUT /notification-events/:eventId/read
    WEB->>DB: Update read status

8. Key Features and Benefits

  1. Decoupled Architecture: Services communicate asynchronously through PubSub, enhancing system resilience
  2. Centralized Notification Storage: All notifications are stored in MongoDB with a consistent schema
  3. User-Specific Tracking: Each notification tracks which users have read it
  4. Efficient Querying: Optimized indexes and aggregation pipelines for fast retrieval
  5. Mobile App Integration: Clean REST APIs for mobile apps to consume notifications
  6. Non-Blocking Processing: Notification persistence doesn’t block the main event processing flow

9. API Endpoints Reference

9.1. nimbly-web-api Notification Events

| Endpoint | Method | Description | Request Body | Response |
| --- | --- | --- | --- | --- |
| /notification-events | GET | Get paginated notifications | Query params: limit, page, readStatus | { docs: NotificationEvent[], totalDocs, pages, ... } |
| /notification-events | POST | Create notification event | { userIds, entity, entityId, content } | NotificationEvent |
| /notification-events/:eventId/read | PUT | Mark as read | None | { message: "success" } |
| /notification-events/count/unread | GET | Get unread count | None | { count: number } |

9.2. Query Parameters

GET /notification-events

  • limit: Number of items per page (default: 25)
  • page: Page number (default: 1)
  • readStatus: Filter by read status (read, unread, all)
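The parameters above could be normalized and turned into a MongoDB filter roughly as follows. This is a sketch under assumptions, not the nimbly-web-api code: `parseListQuery` and `buildFilter` are hypothetical, and falling back to `all` when `readStatus` is missing or unrecognized is an assumption. The filter relies on standard MongoDB array matching, where `{ readBy: userId }` matches arrays containing the value and `{ readBy: { $ne: userId } }` matches arrays that do not.

```typescript
type ReadStatus = 'read' | 'unread' | 'all';

interface ListQuery {
    limit: number;
    page: number;
    readStatus: ReadStatus;
}

// Normalizes raw query-string values, applying the documented defaults.
function parseListQuery(raw: Record<string, string | undefined>): ListQuery {
    const limit = Number.parseInt(raw['limit'] ?? '', 10);
    const page = Number.parseInt(raw['page'] ?? '', 10);
    const status = raw['readStatus'];
    return {
        limit: Number.isInteger(limit) && limit > 0 ? limit : 25,
        page: Number.isInteger(page) && page > 0 ? page : 1,
        // Assumed fallback: anything other than read/unread means "all".
        readStatus: status === 'read' || status === 'unread' ? status : 'all',
    };
}

// Builds the $match filter for one user's notifications.
function buildFilter(userId: string, readStatus: ReadStatus): Record<string, unknown> {
    const filter: Record<string, unknown> = { userIds: userId };
    if (readStatus === 'read') filter['readBy'] = userId;
    if (readStatus === 'unread') filter['readBy'] = { $ne: userId };
    return filter;
}
```

For instance, `GET /notification-events?limit=10&page=2&readStatus=unread` would parse to `{ limit: 10, page: 2, readStatus: 'unread' }` and, for user `u1`, produce the filter `{ userIds: 'u1', readBy: { $ne: 'u1' } }`.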

10. Error Handling

10.1. api-issues Error Scenarios

  • Issue not found: Returns 404 with error message
  • User token fetch failure: Logs error, continues processing
  • PubSub publish failure: Throws ErrorCode with INVALID status

10.2. Cloud Function Error Handling

  • Message parsing failure: Uses ramda’s tryCatch with fallback
  • Database connection issues: Logged and function exits
  • Firebase notification failure: Individual user failures logged, invalid tokens removed
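The parse-with-fallback behavior in the first bullet can be illustrated without ramda. The sketch below is a plain `try`/`catch` equivalent, not the Cloud Function's code: `tryParseJson`, `SubscriberMessage`, and the fallback value are hypothetical.

```typescript
// Parses JSON and returns `fallback` instead of throwing on malformed input.
function tryParseJson<T>(raw: string, fallback: T): T {
    try {
        return JSON.parse(raw) as T;
    } catch {
        return fallback;
    }
}

// Hypothetical message shape used only for this example.
interface SubscriberMessage {
    action: string;
}

const emptyMessage: SubscriberMessage = { action: 'noop' };

const parsedMessage = tryParseJson<SubscriberMessage>(
    '{"action":"pushNotification:sendPushNotification"}',
    emptyMessage,
);
const brokenMessage = tryParseJson<SubscriberMessage>('not json', emptyMessage);
```

With a fallback in place, a malformed message results in a no-op rather than an unhandled exception in the subscriber.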

10.3. nimbly-web-api Error Handling

  • Authentication failure: 401 Unauthorized
  • Invalid notification ID: 404 Not Found
  • Database errors: 500 Internal Server Error
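The status mapping above can be expressed as a single function over a tagged error type. This is only a sketch: the `ApiError` union, its `kind` values, and the response messages are hypothetical and do not reflect how nimbly-web-api models errors internally.

```typescript
// Hypothetical error categories matching the three bullets above.
type ApiError =
    | { kind: 'unauthenticated' }
    | { kind: 'notification_not_found'; eventId: string }
    | { kind: 'database'; cause: string };

interface ErrorResponse {
    statusCode: number;
    message: string;
}

function toErrorResponse(error: ApiError): ErrorResponse {
    switch (error.kind) {
        case 'unauthenticated':
            return { statusCode: 401, message: 'Unauthorized' };
        case 'notification_not_found':
            return { statusCode: 404, message: `Notification ${error.eventId} not found` };
        case 'database':
            // Internal details are deliberately not echoed back to the client.
            return { statusCode: 500, message: 'Internal Server Error' };
    }
}
```

Centralizing the mapping keeps controllers free of status-code logic, and the exhaustive `switch` makes the compiler flag any new error kind that lacks a mapping.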