import { merge, type Observable, of } from 'rxjs';
import { map, startWith } from 'rxjs/operators';

/**
 * Event whose `type` discriminant is `'start'`.
 *
 * It carries no payload. `DataStreamEvent.from` prepends one of these to
 * the stream it builds.
 */
interface IDataStreamStart {
  type: 'start';
}

/**
 * Event whose `type` discriminant is `'progress'`.
 *
 * NOTE(review): the scale of `value` (fraction, percentage, byte count, ...)
 * is not defined in this file; confirm against the producers of the
 * progress stream.
 */
interface IDataStreamProgress {
  type: 'progress';
  /** Numeric progress reading, taken as-is from the progress source. */
  value: number;
}

/**
 * Event whose `type` discriminant is `'complete'`, carrying a payload.
 *
 * @typeParam T - Type of the payload held in `value`.
 */
interface IDataStreamComplete<T> {
  type: 'complete';
  /** The payload delivered with this event. */
  value: T;
}

/**
 * Union of every event a data stream can emit, discriminated by the
 * literal `type` field (`'start'`, `'progress'` or `'complete'`).
 *
 * @typeParam T - Type of the payload carried by the `'complete'` variant.
 */
export type IDataStreamEvent<T> =
  | IDataStreamStart
  | IDataStreamProgress
  | IDataStreamComplete<T>;

/**
 * Static factories and type guards for {@link IDataStreamEvent} values,
 * plus a helper that turns a data observable (and an optional progress
 * observable) into a single stream of such events.
 */
export class DataStreamEvent {
  /** Builds a `'start'` event. */
  static start(): IDataStreamStart {
    const startEvent: IDataStreamStart = { type: 'start' };
    return startEvent;
  }

  /**
   * Builds a `'progress'` event.
   *
   * @param value - Progress reading to store on the event.
   */
  static progress(value: number): IDataStreamProgress {
    const progressEvent: IDataStreamProgress = { type: 'progress', value };
    return progressEvent;
  }

  /**
   * Builds a `'complete'` event.
   *
   * @param value - Payload to store on the event.
   */
  static complete<T>(value: T): IDataStreamComplete<T> {
    const completeEvent: IDataStreamComplete<T> = { type: 'complete', value };
    return completeEvent;
  }

  /** Narrows `event` to the `'start'` variant. */
  static isStart<T>(event: IDataStreamEvent<T>): event is IDataStreamStart {
    const { type: tag } = event;
    return tag === 'start';
  }

  /** Narrows `event` to the `'progress'` variant. */
  static isProgress<T>(
    event: IDataStreamEvent<T>
  ): event is IDataStreamProgress {
    const { type: tag } = event;
    return tag === 'progress';
  }

  /** Narrows `event` to the `'complete'` variant. */
  static isComplete<T>(
    event: IDataStreamEvent<T>
  ): event is IDataStreamComplete<T> {
    const { type: tag } = event;
    return tag === 'complete';
  }

  /**
   * Combines a data observable and a progress observable into one stream
   * of events.
   *
   * The result first emits a `'start'` event, then the merge of:
   * every `progress$` value wrapped as a `'progress'` event, and every
   * `data$` value wrapped as a `'complete'` event.
   *
   * @param data$ - Source whose emissions become `'complete'` events.
   * @param progress$ - Source whose emissions become `'progress'` events;
   *   defaults to `of(0)`, i.e. a single progress value of `0`.
   * @returns Observable of the combined events.
   */
  static from<T>(
    data$: Observable<T>,
    progress$: Observable<number> = of(0)
  ): Observable<IDataStreamEvent<T>> {
    const progressEvents$ = progress$.pipe(
      map((amount) => DataStreamEvent.progress(amount))
    );
    const completeEvents$ = data$.pipe(
      map((payload) => DataStreamEvent.complete(payload))
    );

    // Keep progress first in the merge so subscription order is unchanged.
    const merged$ = merge(progressEvents$, completeEvents$);
    return merged$.pipe(startWith(DataStreamEvent.start()));
  }
}
