/*
 * Copyright (C) 2016 Bilibili. All Rights Reserved.
 *
 * @author zheng qian <xqq@xqq.im>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import Log from '../utils/logger.js';
import {BaseLoader, LoaderStatus, LoaderErrors} from './loader.js';
import {RuntimeException} from '../utils/exception.js';
import {base64} from 'rfc4648';
import msgpack from "msgpack-lite";

/**
 * Provides an interface for many websockets as if they are one websocket.
 * This is a workaround for an AWS API Gateway bug where a websocket stops receiving messages.
 *
 * Two WebSocketLoader connections are kept: `_main` and `_fallback`. Text messages from either
 * connection arrive through onTextMessage(), are put back in sequence-number order and are
 * handed to `_onDataArrival` with a running byte offset. A connection that has not received
 * anything for too long is destroyed and replaced (see _checkConnections()).
 */
class MultiWebSocketLoader extends BaseLoader {
    constructor() {
        super('multi-websocket-loader');
        this.TAG = 'MultiWebSocketLoader';
        // Sequence number of the last media message dispatched; -1 until the first one is dispatched.
        this._lastDispatchedSequenceNumber = -1;
        // Messages waiting for their predecessors: { <sequenceNumber>: typedMessageObject('...') }
        this._orderingBuffer = {};
        // Total number of bytes dispatched so far; used as byteStart of the next chunk.
        this._receivedLength = 0;
        // Number of messages dispatched from the ordering buffer.
        this._totalDispatched = 0;
        // Set once a page reload has been requested so it is only requested once.
        this._reloading = false;

        // main websocket
        this._main = new WebSocketLoader(this);
        // fallback websocket for when main fails
        this._fallback = new WebSocketLoader(this);
    }

    /**
     * Destroys both connections and then this loader. A failure while destroying the fallback
     * is logged and does not prevent the main connection and the base loader from being destroyed.
     */
    destroy() {
        try {
            this._fallback?.destroy();
        } catch (e) {
            Log.w(this.TAG, `failed to destroy fallback websocket: ${e}`);
        }
        try {
            this._main.destroy();
        } finally {
            super.destroy();
        }
    }

    /**
     * Opens the main and the fallback connection.
     * @param {{url: string}} dataSource `url` is parsed with `new URL()`; its `priority` query
     *   parameter is decremented after every connection that is opened.
     */
    open(dataSource) {
        this._connectionUrl = new URL(dataSource.url);

        console.log(this._connectionUrl.toString());
        this._connect(this._main);
        this._connect(this._fallback);
    }

    /**
     * Creates a new WebSocketLoader and opens it with the current connection url.
     * @returns {WebSocketLoader} the newly opened loader
     */
    _openNewConnection() {
        return this._connect(new WebSocketLoader(this));
    }

    /**
     * Opens `loader` with the current connection url, then decrements the url priority
     * for the next connection.
     * @param {WebSocketLoader} loader
     * @returns {WebSocketLoader} the same loader, for chaining
     */
    _connect(loader) {
        loader.open({
            url: this._connectionUrl.toString(),
        });
        this._decrementPriority();
        return loader;
    }

    /**
     * Lowers the `priority` query parameter of the connection url by one. When the resulting
     * priority is below 0 the page is reloaded.
     */
    _decrementPriority() {
        // Number(null) is 0, so a missing parameter yields -1 exactly like the implicit coercion did.
        const nextPriority = Number(this._connectionUrl.searchParams.get('priority')) - 1;
        this._connectionUrl.searchParams.set('priority', nextPriority);
        Log.i(this.TAG, `next priority = ${nextPriority}`);

        if (nextPriority < 0) {
            this._reloadPage('Priority for next websocket is lower than 0, force reload to try recover.');
        }
    }

    /**
     * Aborts both connections; the main connection is aborted even when aborting the fallback throws.
     */
    abort() {
        try {
            this._fallback.abort();
        } finally {
            this._main.abort();
        }
    }

    /** Status of the current main connection. */
    get status() {
        return this._main.status;
    }

    /**
     * Replaces connections that have not received a message for a while.
     * A silent fallback is destroyed and replaced by a new connection. A silent main is
     * destroyed, the fallback is promoted to main and a new fallback is opened.
     */
    _checkConnections() {
        const fallbackTimeoutMs = 16000;
        const mainTimeoutMs = 10000;
        const now = Date.now();
        const timeSinceMainReceived = now - this._main.lastReceivedAt;
        const timeSinceFallbackReceived = now - this._fallback.lastReceivedAt;

        // fallback receives infrequent heartbeat messages
        if (timeSinceFallbackReceived > fallbackTimeoutMs) {
            try {
                Log.w(this.TAG, `fallback websocket did not receive messages for ${timeSinceFallbackReceived} ms, closing it`);
                this._fallback.destroy();
                this._fallback = this._openNewConnection();
            } catch (e) {
                Log.w(this.TAG, `failed to replace fallback websocket: ${e}`);
            }
        }
        if (timeSinceMainReceived > mainTimeoutMs) {
            try {
                Log.w(this.TAG, `main websocket did not receive messages for ${timeSinceMainReceived} ms, closing it.`);
                this._main.destroy();
                this._main = this._fallback;

                // create a new fallback connection
                this._fallback = this._openNewConnection();
            } catch (e) {
                Log.w(this.TAG, `failed to replace main websocket: ${e}`);
            }
        }
    }

    /**
     * G4G: entrypoint for all code additions and changes that were required to support our
     * custom data protocol.
     * Messages are received from API gateway/lambda not in the order they were sent by C#.
     *
     * Text starting with `{` is treated as a JSON control message. Anything else is decoded with
     * typedMessageObject(), buffered by sequence number and dispatched in order.
     * @param {string} text
     * @param {WebSocketLoader} webSocketLoader the connection the message arrived on
     */
    onTextMessage(text, webSocketLoader) {
        if (text.startsWith('{')) {
            this._handleJsonMessage(text, webSocketLoader);
            return;
        }

        const message = typedMessageObject(text);
        if (!message) {
            // The decoder reports malformed messages by returning nothing; there is nothing to order.
            Log.w(this.TAG, 'dropping message that could not be decoded');
            return;
        }
        const flv = message.Flv;
        const sequenceNumber = flv.SequenceNumber;
        if (sequenceNumber % 100 === 0) {
            this._checkConnections();
            if (this._main !== webSocketLoader) {
                Log.i(this.TAG, `received media message #${sequenceNumber} from fallback websocket`);
            }
        }
        if (this._lastDispatchedSequenceNumber === -1) {
            // need a tag to start playback
            if (flv.StartTagByteOffset < 0) {
                return;
            }
            // first message received, dispatch hard-coded flv file header
            this._dispatchArrayBuffer(flvFileHeader().buffer);
            this._lastDispatchedSequenceNumber = sequenceNumber;
            const sliced = this._payloadArrayBuffer(flv.Payload, flv.StartTagByteOffset);
            // A view does not copy; index 0 is undefined when nothing follows the offset.
            const firstByte = new Uint8Array(sliced)[0];
            const tagHex = firstByte === undefined ? '??' : firstByte.toString(16);
            const msg = `dispatch first received message with tag 0x${tagHex}`;
            this._leaveBreadcrumb(msg);
            Log.i(this.TAG, msg);
            this._dispatchArrayBuffer(sliced);
        } else if (sequenceNumber > this._lastDispatchedSequenceNumber) {
            this._orderingBuffer[sequenceNumber] = message;
            Log.v(this.TAG, `stored sequenceNumber=${sequenceNumber} StartTagByteOffset=${flv.StartTagByteOffset}`);
        } else {
            // Only "last dispatched + 1" is ever looked up, so storing an older number would leak it forever.
            Log.v(this.TAG, `dropped stale sequenceNumber=${sequenceNumber} (last dispatched #${this._lastDispatchedSequenceNumber})`);
        }

        // this text is useful for bugsnag reports, for now we always update it (see index.html)
        const msToSeconds = (ms) => Math.floor(ms / 1000);
        window.debugTimestampText = `created: ${msToSeconds(flv.CreatedAtTimestamp)}`
            + `\nreceived: ${msToSeconds(Date.now())} (#${this._lastDispatchedSequenceNumber})`;

        /*
         * TODO:
         * Sometimes a message will never arrive during playback due to some server/network edge-case.
         * It is common that the first msg you get is #3 and you never get #4.
         * Either:
         *  a)
         *   wait for 5 sequential messages before starting playback.
         *   and when you go too long without the next expected message, restart connection.
         *  b)
         *   read startTagByteOffset and stash the whole tag before dispatching the tag,
         *    when you go too long without the expected message, skip that message and resume from the next
         *    startTagByteOffset.
         */
        this._drainOrderingBuffer();
        this._checkMissingMessage(sequenceNumber);
    }

    /**
     * Handles a JSON control message: heartbeats are logged at debug level, everything else is
     * logged as a server error and recorded as a breadcrumb.
     * @param {string} text raw JSON text
     * @param {WebSocketLoader} webSocketLoader the connection the message arrived on
     */
    _handleJsonMessage(text, webSocketLoader) {
        const obj = JSON.parse(text);
        switch (obj.type) {
            case 'heartbeat':
                Log.d(this.TAG, 'heartbeat received on ...' + webSocketLoader.url.slice(-11));
                break;
            default:
                Log.w(this.TAG, 'Received error from server: ' + text);
                this._leaveBreadcrumb('Received error from server: ' + text);
                break;
        }
    }

    /**
     * Dispatches buffered messages for as long as the next expected sequence number is available.
     */
    _drainOrderingBuffer() {
        let nextNumber = this._lastDispatchedSequenceNumber + 1;
        let stored = this._orderingBuffer[nextNumber];
        while (stored) {
            delete this._orderingBuffer[nextNumber];
            this._dispatchArrayBuffer(this._payloadArrayBuffer(stored.Flv.Payload));
            this._logDispatchedMessage(stored);
            this._lastDispatchedSequenceNumber = stored.Flv.SequenceNumber;
            this._totalDispatched++;
            nextNumber = this._lastDispatchedSequenceNumber + 1;
            stored = this._orderingBuffer[nextNumber];
        }
    }

    /**
     * Warns, and eventually reloads the page, when the next expected message is overdue.
     * @param {number} sequenceNumber sequence number of the message that was just received
     */
    _checkMissingMessage(sequenceNumber) {
        // Expected next message has not arrived for many messages (should be edge-case).
        // After connected for a while, missing messages are much more likely to arrive eventually.
        const outOfOrderTolerance = this._totalDispatched < 100 ? 90 : 200;
        const numberDifference = sequenceNumber - this._lastDispatchedSequenceNumber;
        const expected = this._lastDispatchedSequenceNumber + 1;
        if (numberDifference > outOfOrderTolerance) {
            this._reloadPage(`Expected next message (number ${expected}) has not arrived for ${numberDifference} messages, reloading page`);
        } else if (numberDifference > outOfOrderTolerance * 0.75) {
            const msg = `Expected next message (number ${expected}) has not arrived for ${numberDifference} messages`;
            console.warn(msg);
            // Only every fifth warning becomes a breadcrumb.
            if (numberDifference % 5 === 0) {
                this._leaveBreadcrumb(msg);
            }
        }
    }

    /**
     * Returns the bytes of `payload` from `startOffset` onward as an ArrayBuffer.
     * The typed array's own byteOffset/byteLength are honoured, so a payload that is a view
     * into a larger buffer does not drag the surrounding bytes along.
     * @param {Uint8Array} payload
     * @param {number} [startOffset=0] offset relative to the start of `payload`
     * @returns {ArrayBuffer}
     */
    _payloadArrayBuffer(payload, startOffset = 0) {
        const begin = payload.byteOffset + startOffset;
        const end = payload.byteOffset + payload.byteLength;
        if (begin === 0 && end === payload.buffer.byteLength) {
            // The view covers the entire buffer: no copy needed.
            return payload.buffer;
        }
        return payload.buffer.slice(begin, end);
    }

    /**
     * Records a Bugsnag breadcrumb when a global `Bugsnag` is available.
     * @param {string} msg
     */
    _leaveBreadcrumb(msg) {
        // `typeof` guard: referencing an undeclared global directly throws a ReferenceError.
        if (typeof Bugsnag !== 'undefined' && Bugsnag) {
            Bugsnag.leaveBreadcrumb(msg);
        }
    }

    /**
     * Logs (verbose) the sequence number, start tag offset and start tag byte of a dispatched message.
     * @param {object} message object returned by typedMessageObject()
     */
    _logDispatchedMessage(message) {
        const offset = message.Flv.StartTagByteOffset;
        let startTagByte = -1;
        if (offset >= 0) {
            // The offset may point past the end of the payload; keep -1 instead of throwing.
            startTagByte = message.Flv.Payload[offset]?.toString(16) ?? -1;
        }
        Log.v(this.TAG, `dispatch #${message.Flv.SequenceNumber} StartTagByteOffset=${offset} startTagByte=0x${startTagByte}`);
    }

    /**
     * Hands a chunk to `_onDataArrival` together with its byte position in the overall stream.
     * Empty buffers are skipped.
     * @param {ArrayBuffer} arraybuffer
     */
    _dispatchArrayBuffer(arraybuffer) {
        if (arraybuffer.byteLength === 0) {
            Log.w(this.TAG, `skipping empty array buffer`);
            return;
        }
        const byteStart = this._receivedLength;
        this._receivedLength += arraybuffer.byteLength;

        if (this._onDataArrival) {
            this._onDataArrival(arraybuffer, byteStart);
        }
    }

    /**
     * Reloads the page once; later calls are ignored. Uses `window.reloadPage(reason)` when the
     * page defines it, otherwise logs the reason and calls `window.location.reload()`.
     * @param {string} reason
     */
    _reloadPage(reason) {
        if (this._reloading) {
            return;
        }
        this._reloading = true;

        if (window.reloadPage) {
            window.reloadPage(reason);
        } else {
            console.error(reason);
            window.location.reload();
        }
    }
}

/**
 * Decodes a websocket text message into a typed message object.
 *
 * The text is base64 that wraps a msgpack array laid out as
 * `[Version, Type, StreamToken, [SequenceNumber, Payload, StartTagByteOffset, CreatedAtTimestamp]]`.
 * When the fourth element is missing, `Flv` keeps its defaults (-1 / empty payload).
 *
 * @param {string} message websocket text message
 * @returns {object|undefined} the typed message, or `undefined` (after logging an error)
 *   when the decoded value is not an array
 */
function typedMessageObject(message) {
    // base64 -> binary
    const msgPackEncoded = base64.parse(message);
    // msgpack -> object
    const decoded = msgpack.decode(msgPackEncoded);
    if (!Array.isArray(decoded)) {
        // This is a plain function in a module, so there is no `this` to read a tag from or to abort.
        Log.e('MultiWebSocketLoader', `msgpack decoded object is not an array, got ${decoded}`);
        return undefined;
    }

    const [version, type, streamToken, flvDataArray] = decoded;
    const typed = {
        Version: version,
        Type: type,
        StreamToken: streamToken,
        Flv: {
            SequenceNumber: -1,
            Payload: new Uint8Array(),
            StartTagByteOffset: -1,
            CreatedAtTimestamp: -1,
        },
        Ack: null,
    };

    if (flvDataArray) {
        const [sequenceNumber, payload, startTagByteOffset, createdAtTimestamp] = flvDataArray;
        typed.Flv = {
            SequenceNumber: sequenceNumber,
            Payload: payload,
            StartTagByteOffset: startTagByteOffset,
            CreatedAtTimestamp: createdAtTimestamp,
        };
    }
    return typed;
}

/**
 * Builds the hard-coded 13 bytes that are dispatched ahead of the first media message.
 * The first three bytes are ASCII "FLV"; the remaining bytes are presumably version, flags,
 * header size and a zero previous-tag-size as laid out by the FLV format (confirm against the spec).
 * @returns {Uint8Array} a fresh array on every call
 */
function flvFileHeader() {
    return Uint8Array.of(
        0x46, 0x4c, 0x56,
        0x01,
        0x05,
        0, 0, 0, 0x09,
        0, 0, 0, 0,
    );
}

/**
 * For FLV over WebSocket live stream.
 *
 * A single websocket connection owned by a MultiWebSocketLoader. Incoming messages are forwarded
 * to the manager, and while the socket is open a viewer status message is sent every 5 seconds.
 */
class WebSocketLoader extends BaseLoader {

    /**
     * @returns {boolean} whether a `WebSocket` constructor is available on `self`
     */
    static isSupported() {
        try {
            return (typeof self.WebSocket !== 'undefined');
        } catch (e) {
            return false;
        }
    }

    /**
     * @param {MultiWebSocketLoader} mgr manager that receives the incoming messages
     * @param {object} _config unused
     */
    constructor(mgr, _config) {
        super('websocket-loader');
        this.TAG = 'WebSocketLoader';

        this._needStash = true;
        this._mgr = mgr;

        this._ws = null;
        this._requestAbort = false;
        // Handle of the viewer status timer, or null while no timer is running.
        this._sendViewerStatusInterval = null;
        // Timestamp (ms) of the last received message; the manager reads it to detect silent sockets.
        this.lastReceivedAt = Date.now();
    }

    /**
     * Aborts an open connection, stops the viewer status timer and destroys the base loader.
     */
    destroy() {
        if (this._ws) {
            this.abort();
        }
        this._stopViewerStatus();
        super.destroy();
    }

    /**
     * Opens the websocket.
     * @param {{url: string}} dataSource
     * @throws {RuntimeException} when creating the socket fails and no `_onError` handler is set
     */
    open(dataSource) {
        try {
            this.url = dataSource.url;
            const ws = this._ws = new self.WebSocket(dataSource.url);
            ws.binaryType = 'arraybuffer';
            ws.onopen = this._onWebSocketOpen.bind(this);
            ws.onclose = this._onWebSocketClose.bind(this);
            ws.onmessage = this._onWebSocketMessage.bind(this);
            ws.onerror = this._onWebSocketError.bind(this);

            this._status = LoaderStatus.kConnecting;
            // Restart the silence clock so a fresh connection is not considered stale.
            this.lastReceivedAt = Date.now();
        } catch (e) {
            this._status = LoaderStatus.kError;

            const info = {code: e.code, msg: e.message};

            if (this._onError) {
                this._onError(LoaderErrors.EXCEPTION, info);
            } else {
                throw new RuntimeException(info.msg);
            }
        }
    }

    /**
     * Closes the socket if it is connecting or open, stops the viewer status timer and marks
     * the loader complete.
     */
    abort() {
        const ws = this._ws;
        if (ws && (ws.readyState === 0 || ws.readyState === 1)) {  // CONNECTING || OPEN
            // Tells the close handler that this close was requested and must not report completion.
            this._requestAbort = true;
            ws.close();
        }

        // Without this the timer would keep firing forever on a loader that has no socket.
        this._stopViewerStatus();
        this._ws = null;
        this._status = LoaderStatus.kComplete;
    }

    /** Socket opened: start sending the viewer status, immediately and then every 5 seconds. */
    _onWebSocketOpen(e) {
        this._status = LoaderStatus.kBuffering;

        this._stopViewerStatus();
        this._sendViewerStatusInterval = setInterval(this._sendViewerStatus.bind(this), 5000);
        this._sendViewerStatus();
    }

    /** Cancels the viewer status timer if one is running. */
    _stopViewerStatus() {
        if (this._sendViewerStatusInterval != null) {
            clearInterval(this._sendViewerStatusInterval);
            this._sendViewerStatusInterval = null;
        }
    }

    /**
     * Sends `[version, type, window.streamHash, [now]]`, msgpack-encoded and then base64-encoded,
     * as a text message. A send failure cancels the timer.
     */
    _sendViewerStatus() {
        const version = 1;
        const type = 3; // MessageTypeViewerStatus
        const streamHash = window.streamHash;
        const nowMilliseconds = Date.now();
        const encoded = msgpack.encode([version, type, streamHash, [nowMilliseconds]]);
        const textMessage = base64.stringify(encoded);
        if (!this.loggedViewerStatus) {
            console.log('Begin sending viewer status: %o', encoded);
            this.loggedViewerStatus = true;
        }
        Log.v(this.TAG, `send viewer ping ${textMessage.length} chars`);
        try {
            this._ws?.send(textMessage);
        } catch (e) {
            console.error(e);
            Log.i(this.TAG, 'Cancelled send viewer status because of error.');
            this._stopViewerStatus();
        }
    }

    /**
     * Socket closed: stop the viewer status timer. A close that was requested through abort()
     * is not reported; any other close marks the loader complete and calls `_onComplete(0)`.
     */
    _onWebSocketClose(e) {
        this._stopViewerStatus();

        if (this._requestAbort === true) {
            this._requestAbort = false;
            return;
        }

        this._status = LoaderStatus.kComplete;

        if (this._onComplete) {
            this._onComplete(0);
        }
    }

    /**
     * Forwards an incoming message to the manager: strings go to `onTextMessage`, ArrayBuffers
     * and Blobs (read as ArrayBuffer) go to `_dispatchArrayBuffer`. Any other data type is an error.
     * @throws {RuntimeException} on an unsupported data type when no `_onError` handler is set
     */
    _onWebSocketMessage(e) {
        this.lastReceivedAt = Date.now();

        if (typeof e.data === 'string') {
            this._mgr.onTextMessage(e.data, this);
        } else if (e.data instanceof ArrayBuffer) {
            this._mgr._dispatchArrayBuffer(e.data);
        } else if (e.data instanceof Blob) {
            const reader = new FileReader();
            reader.onload = () => {
                this._mgr._dispatchArrayBuffer(reader.result);
            };
            reader.readAsArrayBuffer(e.data);
        } else {
            this._status = LoaderStatus.kError;
            const info = {code: -1, msg: 'Unsupported WebSocket message type: ' + e.data.constructor.name};

            if (this._onError) {
                this._onError(LoaderErrors.EXCEPTION, info);
            } else {
                throw new RuntimeException(info.msg);
            }
        }
    }

    /**
     * Socket error: marks the loader as failed and reports through `_onError`.
     * @throws {RuntimeException} when no `_onError` handler is set
     */
    _onWebSocketError(e) {
        this._status = LoaderStatus.kError;

        const info = {
            code: e.code,
            msg: e.message
        };

        if (this._onError) {
            this._onError(LoaderErrors.EXCEPTION, info);
        } else {
            throw new RuntimeException(info.msg);
        }
    }

}

export default MultiWebSocketLoader;
