import { actionChannel, call, delay, put, race, select, take } from 'redux-saga/effects'
import { Channel, END, EventChannel, eventChannel } from 'redux-saga'
import { ApplicationState } from '../../store'
import { getActiveOrgId } from '../../store/auth/util'
import { SseConnectionState } from '../../store/devmode/types'
import { SseEvent } from './types'

// Short local aliases for the connection states referenced throughout this module.
const OPEN = SseConnectionState.OPEN
const ERROR = SseConnectionState.ERROR
const CLOSED = SseConnectionState.CLOSED

/**
 * Debug logger for the SSE sagas in this module.
 *
 * Currently a no-op: the output is noisy (keep-alive messages etc.), so the
 * actual logging statement stays commented out until it is needed for debugging.
 *
 * @param customEventId identifier of the SSE stream the message relates to
 * @param msg text to log
 */
function sseLog(customEventId: string, msg: string): void {
    // Re-enable the line below when debugging SSE connections:
    // console.log(`[sse ${customEventId}] ${msg}`)
}

/**
 * Opens an EventSource (with credentials) for the given URL and exposes what it
 * receives as a redux-saga event channel of SseEvent values:
 *  - 'connection-state' events when the source opens or reports an error,
 *  - 'user-defined' events for messages named `customEventId`,
 *  - 'keep-alive' events for messages named 'keep-alive'.
 *
 * @param eventSourceUrl URL the EventSource connects to
 * @param customEventId name of the server-sent event to forward as 'user-defined'
 * @returns the event channel; closing it closes the underlying EventSource
 */
function openSseEventChannel(eventSourceUrl: string, customEventId: string) {
    return eventChannel<SseEvent>((emit) => {
        const source = new EventSource(eventSourceUrl, { withCredentials: true })

        source.onopen = () => {
            emit({ type: 'connection-state', newConnectionState: OPEN })
        }
        source.onerror = () => {
            emit({ type: 'connection-state', newConnectionState: ERROR })
        }
        source.addEventListener(customEventId, (message) => {
            emit({ type: 'user-defined', message })
        })
        source.addEventListener('keep-alive', () => {
            emit({ type: 'keep-alive' })
        })

        // Unsubscribe callback: invoked when the channel is closed.
        const unsubscribe = () => {
            sseLog(customEventId, 'closing event source')
            source.close()
            // NOTE(review): whether these two final emissions ever reach a consumer depends on
            // how redux-saga treats emissions made while the channel is closing — confirm.
            emit({ type: 'connection-state', newConnectionState: CLOSED })
            emit(END)
        }
        return unsubscribe
    })
}

/**
 * Saga that completes once the subscriber count has dropped to zero and stayed
 * quiet for a one second grace period.
 *
 * After every action taken from `subUnsubChannel` the subscriber count is read
 * from the store. When it is zero, a 1000 ms timer is raced against the next
 * action on the channel: if the timer wins the saga returns, otherwise the
 * pending disconnect is abandoned and the saga keeps waiting.
 *
 * @param subUnsubChannel channel delivering subscribe / unsubscribe actions
 * @param subCountSelector selector returning the current number of subscribers
 * @param customEventId identifier used for log messages only
 */
function* waitForNoSubscribers(
    subUnsubChannel: Channel<any>,
    subCountSelector: (s: ApplicationState) => number,
    customEventId: string
): any {
    sseLog(customEventId, 'waiting for zero subscribers')
    for (;;) {
        yield take(subUnsubChannel)
        const remaining = yield select(subCountSelector)
        if (remaining !== 0) {
            // Still somebody listening; wait for the next subscribe / unsubscribe action.
            continue
        }

        sseLog(customEventId, 'initiating unsubscribe grace period')
        const outcome = yield race({
            silence: delay(1000),
            newConnection: take(subUnsubChannel)
        })
        if (outcome.silence) {
            // Nothing happened during the grace period: report "no subscribers".
            return
        }
        sseLog(customEventId, 'cancelling disconnect')
    }
}

/**
 * Consumes events from an SSE event channel until the connection errors, is
 * reported closed, or goes silent.
 *
 * Each iteration races the next channel event against a 10000 ms timer:
 *  - timer wins: an ERROR connection state is dispatched and the saga returns;
 *  - 'user-defined' event: `eventHandler` is called with the event's message;
 *  - 'keep-alive' event: nothing to do, receiving it merely restarts the timer;
 *  - 'connection-state' event: the new state is dispatched, and the saga
 *    returns if that state is CLOSED or ERROR.
 *
 * @param scheduleUpdateChannel channel to take SSE events from
 * @param customEventId identifier used for log messages only
 * @param eventHandler invoked (via `call`) with the message of each 'user-defined' event
 * @param sseConnectionDebugAction builds the action dispatched for connection state changes
 */
function* processSseEvents(
    scheduleUpdateChannel: EventChannel<SseEvent>,
    customEventId: string,
    eventHandler: (action: any) => any,
    sseConnectionDebugAction: (state: SseConnectionState) => any
): any {
    for (;;) {
        const outcome = yield race({
            keepAliveTimeout: delay(10000),
            sseEvent: take(scheduleUpdateChannel)
        })

        if ('keepAliveTimeout' in outcome) {
            sseLog(customEventId, 'keep-alive timed out')
            yield put(sseConnectionDebugAction(SseConnectionState.ERROR))
            return
        }

        const incoming = outcome.sseEvent
        sseLog(customEventId, `process incoming message: ${incoming.type}`)

        if (incoming.type === 'user-defined') {
            yield call(eventHandler, incoming.message)
        } else if (incoming.type === 'connection-state') {
            const nextState = incoming.newConnectionState
            yield put(sseConnectionDebugAction(nextState))
            if (nextState === CLOSED || nextState === ERROR) {
                return
            }
        }
        // 'keep-alive' (and any other type) needs no handling beyond resetting the timer above.
    }
}

/**
 * Long-running saga that keeps an SSE connection open while there are subscribers.
 *
 * It loops forever through: wait until a connection is wanted -> open the event
 * source -> process events until either the connection fails or the subscriber
 * count stays at zero -> close the event source.
 *
 * @param subUnsubActions action types that signal a subscribe or unsubscribe
 * @param eventSourceUrlFn builds the EventSource URL from the id returned by `getActiveOrgId`
 * @param customEventId name of the server-sent event to listen for (also used in log messages)
 * @param subCountSelector selector returning the current number of subscribers
 * @param eventHandler invoked with each received user-defined SSE message
 * @param sseConnectionDebugAction builds the action dispatched for connection state changes
 */
export function* sseConnectDisconnectLoop(
    subUnsubActions: string[],
    eventSourceUrlFn: (scheduleId: string) => string,
    customEventId: string,
    subCountSelector: (s: ApplicationState) => number,
    eventHandler: (action: any) => any,
    sseConnectionDebugAction: (sseConnectionState: SseConnectionState) => any
): any {
    sseLog(customEventId, 'starting auto-scheduler sse saga for events')

    const subUnsubChannel = yield actionChannel(subUnsubActions)

    // unconnected -> connected -> unconnected -> connected -> ... loop
    while (true) {
        // If we exited the previous iteration due to a connection error, we still have subscribers and should establish
        // a new connection without waiting for a subscribe action.
        const subCount = yield select(subCountSelector)
        if (subCount > 0) {
            sseLog(customEventId, 'Entering connect loop with active subscriptions. Reconnecting...')
            // Wait 5 seconds before attempting to reconnect
            yield delay(5000)
        } else {
            sseLog(customEventId, 'Waiting for subscribe action...')
            yield take(subUnsubChannel)
        }

        // Connect and wire up socket to incoming / outgoing message channels
        const scheduleId = yield select(getActiveOrgId)

        const scheduleUpdateChannel = openSseEventChannel(eventSourceUrlFn(scheduleId), customEventId)
        try {
            const raceResult = yield race({
                connectionClosedOrError: call(
                    processSseEvents,
                    scheduleUpdateChannel,
                    customEventId,
                    eventHandler,
                    sseConnectionDebugAction
                ),
                zeroSubscribers: call(waitForNoSubscribers, subUnsubChannel, subCountSelector, customEventId)
            })

            // Disconnect on error or when there are no subscribers
            const closeReason = 'zeroSubscribers' in raceResult ? 'zero subscribers' : 'connection error'
            sseLog(customEventId, `Closing connection due to ${closeReason}.`)
        } finally {
            // Always release the underlying EventSource: besides the normal exits above, this also
            // covers an exception thrown by eventHandler and cancellation of this saga while
            // connected, both of which would otherwise leave the connection open.
            scheduleUpdateChannel.close()
        }
    }
}
