import {errorLogger} from '@eda-restapp/logger'
import type {Observable} from 'rxjs'
import {merge, Subject, timer} from 'rxjs'
import {filter, map, startWith, switchMap, tap} from 'rxjs/operators'

import type {ApiNotification, PingNotification} from './types'

export interface Config {
  pingToleranceMs: () => number
  firstPingToleranceMs: () => number
}

export const isPing = (message: ApiNotification): message is PingNotification => message.operation === 'ping'

export default function pingAudit({pingToleranceMs, firstPingToleranceMs}: Config) {
  return (notification$: Observable<ApiNotification>): Observable<ApiNotification> => {
    const auditSubject = new Subject<ApiNotification>()

    // Основной поток сообщений
    const message$ = notification$.pipe(
      // ---p---m--m--p--m (p - ping, m - сообщения)
      tap((m) => auditSubject.next(m)) // Дублируем сообщения в дополнительный поток для аудита
      // -------m--m-----m
    )

    // Создаем поток, который будет пустым если пинги приходят вовремя и падать если нет
    const timeoutError$ = auditSubject.pipe(
      // ---p---m--m--p--m (p - ping, m - сообщения)
      filter(isPing), // Забираем все пинги
      // ---p---------p---
      map((message) => message['server-interval-sec'] * 1000 + pingToleranceMs()), // Превращаем пинги в значения таймаутов
      // ---t---------t---
      startWith(firstPingToleranceMs()), // Добавляем значение первого таймера из firstPingToleranceMs
      // t----t---------t---
      switchMap(
        // Запускаем таймеры, которые будут сбрасываться при получении пинга
        (timeout) => timer(timeout).pipe(map(() => timeout))
      ),
      // ------------t
      map((timeout) => {
        // Сюда попадаем только если таймер не сбросился (не было пинга вовремя)
        void errorLogger({
          message: 'notifications:notifier_socket_reconnect_by_timeout',
          level: 'error',
          error: new Error('Connection is obsolete'),
          additional: {timeout}
        })

        throw new Error('Connection is obsolete')
      })

      // ------------#
    )

    // Объединяем изначальные сообщения (без пингов) и результаты аудита
    return merge(message$, timeoutError$)
  }
}
