"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.StreamingService = void 0;
const messaging_common_1 = require("@guided-methods/messaging-common");
const streaming_1 = require("@guided-methods/streaming");
const streaming_worker_1 = require("./streaming-worker");
class StreamingService {
    constructor(tokenService, url, messageService, logger) {
        this.measurements = {};
        this.errorSampleResponse = (message, measurementId) => {
            return {
                type: streaming_1.StreamingServiceResponseType.Error,
                error: 500,
                errorMessage: message,
                measurementId
            };
        };
        this.tokenService = tokenService;
        this.url = url;
        this.messageService = messageService;
        this.logger = logger;
    }
    storeRequest(message, measurementId) {
        this.measurements[measurementId] = { request: message, running: false };
    }
    removeRequest(measurementId) {
        try {
            delete this.measurements[measurementId];
        }
        catch (error) {
            this.logger.warn('Did not find given measurementId to remove request');
        }
    }
    async start(message) {
        if (!message?.type || message.type !== messaging_common_1.GuideMessageType.Data) {
            return;
        }
        const { measurementId } = message.data;
        const responseChannel = (0, messaging_common_1.createChannel)(streaming_1.SignalSamplesResponse, message.data.responseChannel?.id ?? message.data.responseChannel.toString());
        try {
            if (!this.measurements[measurementId]) {
                throw new Error(`No current measurement with id ${measurementId} exists`);
            }
            const request = this.measurements[measurementId].request;
            if (request.type !== messaging_common_1.GuideMessageType.Data)
                throw new Error('Request is not of type data');
            const requestData = request.data;
            const frequency = Math.min(requestData.measurements[0].ioParams.frequency, 1);
            if (frequency != requestData.measurements[0].ioParams.frequency) {
                this.logger.warn('Currently, only supporting 1 Hz. Frequency was adjusted.');
            }
            const { readIos, readValues } = this.sortPerEcuAndType(measurementId);
            const { productId, productIdType } = request.context;
            var worker = (0, streaming_worker_1.createStreamingWorker)();
            worker.addEventListener('message', (message) => {
                let data = message.data;
                if (data.type === 'data' && data.ios) {
                    try {
                        const signalSamples = this.createSignalSamples(request.data, data.ios, data.index);
                        const result = {
                            measurementId,
                            streams: signalSamples.filter(s => s !== undefined),
                            type: streaming_1.StreamingServiceResponseType.Ok,
                        };
                        this.messageService.send(responseChannel, messaging_common_1.GuideMessageType.Data, request.context, result);
                    }
                    catch (error) {
                        console.error(`Failed to parse WCS response: ${error instanceof Error ? error.message : "Unknown error"}`);
                        this.messageService.send(responseChannel, messaging_common_1.GuideMessageType.Data, request.context, this.errorSampleResponse(`Failed to parse streaming response: ${error instanceof Error ? error.message : "Unknown error"}`, measurementId));
                    }
                }
                else if (data.type === 'error' || !data.ios) {
                    console.error('Error event from worker: ', data);
                    this.messageService.send(responseChannel, messaging_common_1.GuideMessageType.Data, request.context, this.errorSampleResponse(data.message ?? 'Failed to parse streaming response', measurementId));
                }
            });
            worker.addEventListener('error', (error) => {
                console.error('error event from worker: ', error);
                this.messageService.send(responseChannel, messaging_common_1.GuideMessageType.Data, request.context, this.errorSampleResponse(error.message, measurementId));
            });
            worker.postMessage({
                type: 'start',
                measurementId,
                productId,
                productIdType,
                readIos,
                readValues,
                token: await this.tokenService.getToken(),
                url: this.url,
                frequency,
            });
            this.measurements[measurementId].running = true;
            this.measurements[measurementId].worker = worker;
        }
        catch (error) {
            console.error('Error when starting streaming: ', error instanceof Error ? error : undefined);
            this.logger.error({ message: 'Error when starting streaming', ...message?.context });
            this.messageService.send(responseChannel, messaging_common_1.GuideMessageType.Data, message.context, this.errorSampleResponse(error instanceof Error ? error.message : "Unknown error", measurementId));
        }
    }
    createSignalSamples(request, ios, counter) {
        const samples = ios.map((io) => {
            if (io) {
                const metadata = request?.measurements?.find((signal) => signal?.ioParams?.ioName === io?.name);
                const result = {
                    signal: {
                        ...metadata,
                    },
                    ioName: io.name,
                    ecuFamilyName: io.familyName,
                    samples: [
                        {
                            index: counter,
                            timestamp: io.timestamp,
                            actualUnit: io.unit,
                            value: io.value,
                            dValue: io.valueType === 'Double' || io.valueType === 'Long' ? Number(io.value) : undefined,
                            sValue: io.value,
                            valueType: io.valueType,
                            scaniaState: io.scaniaState,
                        },
                    ],
                    hasMore: true,
                };
                return result;
            }
        });
        return samples;
    }
    sortPerEcuAndType(measurementId) {
        const readIos = [];
        const readValues = [];
        const request = this.measurements[measurementId]?.request;
        if (!request || request.type !== messaging_common_1.GuideMessageType.Data) {
            throw new Error('No request to process');
        }
        const ecuFamilies = [
            ...new Set(request.data.measurements.map((signal) => signal.ioParams.ecuFamilyName)),
        ];
        ecuFamilies.map((familyName) => {
            let ios = [];
            request.data.measurements.map((io) => {
                if (io.ioParams.ecuFamilyName === familyName) {
                    ios.push(io);
                }
            });
            const ioNameList = [];
            const valueNameList = [];
            ios.map((signal) => {
                if (signal.signalType === 'Io') {
                    ioNameList.push(signal.ioParams.ioName);
                }
                else if (signal.signalType === 'Value') {
                    valueNameList.push(signal.ioParams.ioName);
                }
            });
            if (ioNameList.length) {
                readIos.push({ ecu: familyName, ios: ioNameList });
            }
            if (valueNameList.length) {
                readValues.push({ ecu: familyName, ios: valueNameList });
            }
        });
        return { readIos, readValues };
    }
    async stop(message) {
        if (!message?.type || message.type !== messaging_common_1.GuideMessageType.Data) {
            return;
        }
        const { measurementId, responseChannel } = message.data;
        try {
            this.measurements[measurementId].running = false;
            this.measurements[measurementId].worker?.terminate();
            this.measurements[measurementId].worker = undefined;
            const responseChannel = (0, messaging_common_1.createChannel)(streaming_1.StopStreamingResponse, message.data.responseChannel?.id ?? message.data.responseChannel.toString());
            this.messageService.send(responseChannel, messaging_common_1.GuideMessageType.Data, message.context, {
                type: streaming_1.StreamingServiceResponseType.Ok,
            });
        }
        catch (error) {
            this.logger.debug(`in streaming-service, stop: error occurred ${measurementId}, error.message`);
            this.messageService.send(responseChannel, messaging_common_1.GuideMessageType.Data, message.context, {
                type: streaming_1.StreamingServiceResponseType.Error,
                error: 500,
                errorMessage: `Internal error occurred when attempting to stop listening to measurement '${measurementId}: ${error instanceof Error ? error?.message : ""}'`,
            });
        }
    }
}
exports.StreamingService = StreamingService;
