import { type MonoTypeOperatorFunction, Observable, type Subscriber, Subscription } from 'rxjs';

/**
 * Error delivered to subscribers when the source observable completes
 * without having emitted any value.
 */
export class ObservableDidNotEmitError extends Error {
  /** Overrides the default `Error` name so logs and `toString()` identify this error type. */
  override name = 'ObservableDidNotEmitError';

  constructor() {
    super('Source did not emit a value.');
  }
}

// Lifecycle of one `publishOnce` pipeline: it starts as `Pending` and moves exactly
// once to `Value`, `Error` or `Complete`, depending on the source's first notification.
const enum State { Pending, Value, Error, Complete }
// Shared subscription with a no-op finalizer; `publishOnce` returns it as the teardown for
// subscribers that are served synchronously from the already-known outcome.
const EmptySubscription = new Subscription(() => { });

/**
 * Eagerly subscribes to the source and caches its first notification so that
 * it can be replayed to every current and future subscriber (similar to
 * `pipe(take(1), shareReplay(1), throwIfEmpty(() => ...))`, except that the
 * source is subscribed to immediately instead of on first subscription).
 *
 * The outcome is decided by whichever source notification arrives first:
 * - a value: subscribers receive that value followed by `complete`, and the
 *   operator unsubscribes from the source;
 * - an error: subscribers receive that error;
 * - completion without a value: subscribers receive an
 *   {@link ObservableDidNotEmitError}, or a plain `complete` when
 *   `options.allowEmpty` is `true`.
 *
 * Subscribers that arrive before the outcome is known are held until it is,
 * and are released as soon as they have been notified, so nothing is retained
 * afterwards except the cached value/error.
 *
 * @param options - Optional settings. `allowEmpty` (default `false`) makes an
 *   empty source complete the result instead of raising
 *   {@link ObservableDidNotEmitError}.
 * @returns An operator function; calling it with a source subscribes to that
 *   source right away and returns the replaying observable.
 */
export function publishOnce<T>(options?: { allowEmpty?: boolean }): MonoTypeOperatorFunction<T> {
  const allowEmpty = options?.allowEmpty ?? false;
  return source => {
    let state = State.Pending;
    let value: T | undefined;
    let error: unknown;
    // Subscribers waiting for the outcome; dropped (set to `undefined`) once it is delivered.
    let subscribers: Subscriber<T>[] | undefined;
    let subscription: Subscription | undefined;

    const result = new Observable<T>(subscriber => {
      if (state !== State.Pending) {
        // Outcome already known: serve it synchronously, nothing to tear down.
        replay(subscriber);
        return EmptySubscription;
      }

      (subscribers ??= []).push(subscriber);

      return (): void => {
        // `subscribers` is undefined once the outcome has been delivered.
        if (!subscribers) {
          return;
        }
        const index = subscribers.indexOf(subscriber);
        if (index > -1) {
          // Delete count of 1: remove only this subscriber, not everyone registered after it.
          subscribers.splice(index, 1);
        }
      };
    });

    subscription = source.subscribe({
      next: val => {
        if (state !== State.Pending) {
          return; // only the first notification counts
        }
        state = State.Value;
        value = val;
        settle();
      },
      error: err => {
        if (state !== State.Pending) {
          return;
        }
        state = State.Error;
        error = err;
        settle();
      },
      complete: () => {
        if (state !== State.Pending) {
          return;
        }
        if (allowEmpty) {
          state = State.Complete;
        } else {
          state = State.Error;
          error = new ObservableDidNotEmitError();
        }
        settle();
      },
    });

    // A source that notifies synchronously settles before `subscription` is
    // assigned, so `settle()` could not release it; do it now.
    if (state !== State.Pending) {
      close();
    }

    return result;

    /** Delivers the cached outcome to a single subscriber. Must not be called while pending. */
    function replay(subscriber: Subscriber<T>): void {
      switch (state) {
        case State.Value:
          // `value` holds a `T` whenever `state` is `Value`; `undefined` may itself be a valid `T`.
          subscriber.next(value as T);
          subscriber.complete();
          return;
        case State.Error:
          subscriber.error(error);
          return;
        case State.Complete:
          subscriber.complete();
          return;
        default:
          return;
      }
    }

    /** Notifies all waiting subscribers of the outcome and releases the source. */
    function settle(): void {
      // Detach the list before notifying: completing or erroring a subscriber
      // runs its teardown, which would otherwise splice the array being
      // iterated and cause later subscribers to be skipped.
      const pending = subscribers;
      subscribers = undefined;
      if (pending) {
        for (const subscriber of pending) {
          try {
            replay(subscriber);
          } catch (e) {
            // Best effort: one failing subscriber must not starve the others.
            console.error(e);
          }
        }
      }
      close();
    }

    /** Unsubscribes from the source, if the source subscription is (still) held. */
    function close(): void {
      if (subscription) {
        subscription.unsubscribe();
        subscription = undefined;
      }
    }
  };
}
