/**
 * Server-Sent Events (SSE) Parser
 * A utility class for parsing SSE streams from a ReadableStream.
 * Provides a subscription mechanism for handling events.
 */

export interface SSEEvent<T = any> {
  type: string;
  data: T;
}

export type SSESubscriber<T = any> = (event: SSEEvent<T>) => void;

export class SSEParser {
  private reader: ReadableStreamDefaultReader<Uint8Array>;

  private decoder: TextDecoder;

  private buffer: string;

  private subscribers: SSESubscriber[];

  private isReading: boolean;

  /**
   * Creates a new SSEParser instance
   * @param reader The ReadableStreamDefaultReader to read from
   */
  constructor(reader: ReadableStreamDefaultReader<Uint8Array>) {
    this.reader = reader;
    this.decoder = new TextDecoder();
    this.buffer = '';
    this.subscribers = [];
    this.isReading = false;
  }

  /**
   * Subscribe to SSE events
   * @param callback Function to call when an event is received
   * @returns Unsubscribe function
   */
  subscribe(callback: SSESubscriber): () => void {
    this.subscribers.push(callback);

    // Return unsubscribe function
    return () => {
      this.subscribers = this.subscribers.filter((cb) => cb !== callback);
    };
  }

  /**
   * Notify all subscribers of an event
   * @param type Event type
   * @param data Event data
   */
  private notify(type: string, data: any): void {
    this.subscribers.forEach((callback) => {
      callback({ type, data });
    });
  }

  /**
   * Start reading and parsing the stream
   */
  async start(): Promise<void> {
    if (this.isReading) return;

    this.isReading = true;
    await this.readNextChunk();
  }

  /**
   * Stop reading the stream
   */
  async stop(): Promise<void> {
    this.isReading = false;
    try {
      await this.reader.cancel();
    } catch (error) {
      console.error('Error cancelling reader:', error);
    }
  }

  /**
   * Read and process chunks recursively
   */
  private async readNextChunk(): Promise<void> {
    if (!this.isReading) return;

    try {
      const { done, value } = await this.reader.read();

      if (done) {
        this.notify('close', null);
        this.isReading = false;
        return;
      }

      // Decode the received chunk and add it to our buffer
      this.buffer += this.decoder.decode(value, { stream: true });

      // Process complete events in the buffer
      const events = this.buffer.split('\n\n');
      this.buffer = events.pop() || ''; // Keep the last incomplete event in the buffer

      // Process all complete events
      events
        .filter((eventText) => eventText.trim())
        .forEach((eventText) => {
          // Parse the event
          const eventLines = eventText.split('\n');
          let eventType: string | null = null;
          let eventData: string | null = null;

          eventLines.forEach((line) => {
            if (line.startsWith('event:')) {
              eventType = line.substring(6).trim();
            } else if (line.startsWith('data:')) {
              eventData = line.substring(5).trim();
            }
          });

          if (eventType && eventData) {
            try {
              const parsedData = JSON.parse(eventData);
              this.notify(eventType, parsedData);
            } catch (error) {
              console.error('Error parsing event data:', error);
              this.notify('error', { error, raw: eventData });
            }
          }
        });

      // Continue reading if still active
      if (this.isReading) {
        await this.readNextChunk();
      }
    } catch (error) {
      console.error('Error reading from stream:', error);
      this.notify('error', error);
      this.isReading = false;
    }
  }
}
