import { CryptoUtils } from '@oasis/utils';
import type { IClientOptions, IClientPublishOptions, MqttClient } from 'mqtt';
import mqtt from 'mqtt';
import { proxy } from 'valtio';
import { useProxy } from 'valtio/utils';
import { Err, Ok } from '../../lib/result';
import { Oasis } from '../../oasis';
import { TokenManager } from '../../services/token-manager/token-manager.service';
import { SubscriptionHandler } from '../../types';
import { Segment } from '../segment/segment.provider';
import { MqttIncomingPayloads } from './mqtt.schemas';
import { MqttPayloads } from './mqtt.types';

// The single broker client used by every Mqtt.* method; undefined until Mqtt.connect() creates one.
let client: MqttClient | undefined;
// Epoch milliseconds of the most recent 'connect' event (0 until the first one); read by _getDuration().
let connectedAt = 0;

// topic -> ids of the handlers currently registered for that topic.
const topicHandlerIdLookup = new Map<string, string[]>();
// handler id -> [topic the handler was registered for, wrapped handler to invoke].
const handlerLookup: Map<string, [topic: string, handler: SubscriptionHandler]> = new Map();

/** One published or received message as kept in the store's traffic lists. */
type MqttTrafficEntry = { topic: string; payload: unknown; timestamp: Date };

/**
 * Shape of the reactive state exposed through `Mqtt.store`: the connection status plus the
 * lists of outgoing and incoming messages.
 */
interface MqttStore {
  status: 'PENDING' | 'CONNECTED' | 'DISCONNECTED';
  outgoing: MqttTrafficEntry[];
  incoming: MqttTrafficEntry[];
}

/**
 * MQTT provider. Owns the module-level broker client, the topic -> handler registry and a valtio
 * store that exposes the connection status (and, in dev mode, the message traffic).
 */
export const Mqtt = {
  /** Reactive state. `incoming` / `outgoing` are only appended to when `Oasis.Env.store.isDevMode` is set. */
  store: proxy<MqttStore>({
    status: 'PENDING',
    outgoing: [],
    incoming: [],
  }),

  /** Hook returning the store through valtio's `useProxy`. */
  useStore: () => useProxy(Mqtt.store),

  /**
   * Creates a new broker client, wires its event listeners and starts connecting.
   *
   * @returns `Err({ code: 'ALREADY_CONNECTED' })` when the current client is connected,
   * `Err({ code: 'UNAUTHORIZED' })` when the session is not authenticated, `Err({ code: 'ERROR' })`
   * when creating/starting the client throws, otherwise `Ok(true)`. `Ok(true)` is returned right
   * after `client.connect()` is called; the status only becomes 'CONNECTED' on the 'connect' event.
   */
  async connect() {
    if (client?.connected) {
      Mqtt.store.status = 'CONNECTED';
      Oasis.Logger.info('[Mqtt.connect] Attempted to connect while already connected.');
      return Err({ code: 'ALREADY_CONNECTED' });
    }

    if (Oasis.Session.store.status !== 'AUTHENTICATED') {
      Oasis.Logger.info('[Mqtt.connect] Attempted to connect before authenticating.');
      return Err({ code: 'UNAUTHORIZED' });
    }

    const { id: userId, analyticsId } = Oasis.Session.store.user;
    // @TODO - To be removed
    // This is a temporary solution to allow testing until we have fully migrated mqtt to use the new auth system.
    // The resolved value is intentionally not kept: transformWsUrl re-reads the flag on every connection attempt.
    // NOTE(review): presumably awaited so the flag is available to the synchronous getFlag() below — confirm.
    await Oasis.FeatureFlags.getFlagValueAsync<string>('240912-7031-mqtt-auth-userid-override');

    /*
     * Using this hook to add the username and password to the client options before connecting. We already have
     * retrial logic in place, so whenever it disconnects for any reason, including cases where the token is expired,
     * it will try to reconnect and then new tokens will be set in the client options.
     *
     * See reference https://github.com/mqttjs/MQTT.js/?tab=readme-ov-file#about-reconnection
     *
     */
    const transformWsUrl: IClientOptions['transformWsUrl'] = (_url, _options, _client) => {
      // We try to get a flag value in case it changed during runtime; fall back to the composed username.
      const username =
        Oasis.FeatureFlags.getFlag<string>('240912-7031-mqtt-auth-userid-override') ||
        `${userId}::${analyticsId}::oasis-web`;

      _client.options.username = username;
      _client.options.password = Oasis.TokenManager.getAccessTokenDangerously();

      return _url;
    };

    try {
      // NOTE(review): if a previous, not-connected client exists it is replaced here without being ended —
      // confirm whether that instance should be closed (and its subscriptions restored on the new one).
      client = mqtt.connect(Oasis.Env.store.mqttUrl, {
        transformWsUrl,
        clean: true,
        manualConnect: true,
        keepalive: 15,
        reschedulePings: true,
        // 0 disables MQTT.js' automatic reconnection; reconnects go through Mqtt.reconnect().
        reconnectPeriod: 0,
        clientId: `${analyticsId}:${CryptoUtils.randomUUID()}`,
      });

      client.on('connect', _handleConnect);
      client.on('close', () => _handleDisconnect('close'));
      client.on('disconnect', packet => _handleDisconnect('disconnect', packet));
      client.on('offline', () => _handleDisconnect('offline'));
      client.on('end', () => _handleDisconnect('end'));
      client.on('error', _handleError);
      client.on('message', async (topic, payload, packet) => {
        const stringPayload = payload.toString();

        // Empty payloads are ignored (this includes the empty retained message published below).
        if (!stringPayload) return;

        _handleMessage(topic, stringPayload);

        // Clearing out any retained messages. Not sure this is the best way to handle it.
        if (packet.retain) {
          try {
            await client?.publishAsync(topic, '', { retain: true });
          } catch (error) {
            // Nothing awaits this listener, so a rejection here would otherwise be an unhandled rejection.
            Oasis.Logger.error({ error, topic, msg: '[Mqtt] Failed to clear retained message.' });
          }
        }
      });

      client.connect();

      return Ok(true);
    } catch (error) {
      Mqtt.store.status = 'DISCONNECTED';
      Oasis.Logger.info({ error, msg: '[Mqtt.connect] Failed to connect Mqtt provider.' });
      return Err({ code: 'ERROR' });
    }
  },

  /**
   * Asks the current client to reconnect.
   *
   * @returns whatever `client.reconnect()` returns (`undefined` when no client exists), or
   * `Err({ code: 'UNKNOWN' })` when it throws.
   */
  reconnect() {
    try {
      return client?.reconnect();
    } catch (error) {
      Oasis.Logger.info({ error, msg: '[Mqtt.reconnect] Failed to reconnect Mqtt provider.' });
      return Err({ code: 'UNKNOWN' });
    }
  },

  /**
   * Subscribes to `topic` on the broker and registers `handler` for messages on it.
   *
   * @param topic Topic to subscribe to.
   * @param handler Invoked with the parsed payload and the topic for every handled message.
   * @returns `Ok({ unsubscribe, removeHandler })` on success: `unsubscribe` drops the whole topic,
   * `removeHandler` only this handler. `Err({ code: 'NO_CONN' })` without a usable connection,
   * `Err({ code: 'FORBIDDEN' })` when the thrown error carries code 128, otherwise `Err({ code: 'UNKNOWN' })`.
   */
  async subscribe(topic: string, handler: SubscriptionHandler) {
    try {
      if (!client?.connected || client?.disconnecting || client?.disconnected) {
        Oasis.Logger.info('[Mqtt.subscribe] Attempted to subscribe without connection.');
        return Err({ code: 'NO_CONN' });
      }

      const handlerId = CryptoUtils.randomUUID();

      // Wrap the handler so dev mode can record incoming traffic in the store before delegating.
      const wrappedHandler: SubscriptionHandler = (payload, receivedTopic) => {
        if (Oasis.Env.store.isDevMode) {
          Mqtt.store.incoming = [{ topic: receivedTopic, payload, timestamp: new Date() }, ...Mqtt.store.incoming];
        }

        return handler(payload, receivedTopic);
      };

      await client?.subscribeAsync(topic);

      // Register the handler under its id, and append that id to the topic's handler list.
      const topicHandlerIds = topicHandlerIdLookup.get(topic) ?? [];
      topicHandlerIdLookup.set(topic, [...topicHandlerIds, handlerId]);
      handlerLookup.set(handlerId, [topic, wrappedHandler]);

      Oasis.Logger.info({
        activeHandlers: topicHandlerIds.length + 1,
        msg: `[Mqtt.subscribe] Subscribed to ${topic}.`,
      });

      return Ok({
        unsubscribe: () => Mqtt.unsubscribe(topic),
        removeHandler: () => Mqtt.removeHandler(handlerId),
      });
    } catch (error) {
      Oasis.Logger.error({ error, msg: '[Mqtt.subscribe] Exception thrown while subscribing.' });

      // Code 128 is treated as an authorization failure for the topic.
      if (error instanceof Error && 'code' in error && error.code === 128) {
        Segment.track('MQTT Authorization Error', { topic });
        return Err({ code: 'FORBIDDEN' });
      }

      return Err({ code: 'UNKNOWN' });
    }
  },

  /**
   * @name removeHandler
   * Removes a single handler from the registry. The broker subscription itself is left untouched,
   * even when this was the last handler for the topic. Unknown ids are ignored.
   */
  removeHandler(handlerId: string) {
    const lookup = handlerLookup.get(handlerId);
    if (!lookup) return;

    const [topic] = lookup;
    const remainingIds = (topicHandlerIdLookup.get(topic) ?? []).filter(id => id !== handlerId);

    if (remainingIds.length) {
      topicHandlerIdLookup.set(topic, remainingIds);
    } else {
      // Do not keep empty lists around for topics nobody handles anymore.
      topicHandlerIdLookup.delete(topic);
    }

    handlerLookup.delete(handlerId);
  },

  /**
   * @name unsubscribe
   * Unsubscribe from the mqtt topic, and remove every handler registered for it from the lookups.
   * Without a usable connection only the local handlers are removed. Exceptions are logged, not thrown.
   */
  async unsubscribe(topic: string) {
    const dropLocalHandlers = () => {
      const handlerIds = topicHandlerIdLookup.get(topic) ?? [];
      topicHandlerIdLookup.delete(topic);

      for (const handlerId of handlerIds) {
        handlerLookup.delete(handlerId);
      }
    };

    try {
      if (!client?.connected || client?.disconnecting || client?.disconnected) {
        // The broker cannot be reached, but the handlers must still go: otherwise they leak and
        // fire again as soon as the topic is subscribed again.
        dropLocalHandlers();
        Oasis.Logger.info(`[Mqtt.unsubscribe] Removed local handlers for ${topic} without connection.`);
        return;
      }

      await client?.unsubscribeAsync(topic);
      dropLocalHandlers();

      Oasis.Logger.info(`[Mqtt.unsubscribe] Unsubscribed from ${topic}.`);
    } catch (error) {
      Oasis.Logger.error({ error, msg: '[Mqtt.unsubscribe] Exception thrown while unsubscribing.' });
    }
  },

  /**
   * Publishes `payload`, JSON-stringified, to `topic`. The message is always sent with qos 2,
   * regardless of `opts.qos`. Failures are logged, never thrown.
   *
   * @param topic Topic to publish to.
   * @param payload Payloads to send; defaults to an empty list.
   * @param opts Extra publish options forwarded to the client.
   * @param isRetry Internal flag: set on the single retry that follows a `Mqtt.connect()` attempt.
   */
  async publish(
    topic: string,
    payload: MqttPayloads[] = [],
    opts: IClientPublishOptions = {},
    isRetry = false
  ): Promise<void> {
    try {
      if (!client?.connected || client?.disconnecting || client?.disconnected) {
        Oasis.Logger.info('[Mqtt.publish] Attempted to publish without connection.');

        // First attempt only: try to connect, then run publish once more. On that retry the
        // publish below is attempted even if the client still reports no connection.
        if (!isRetry) {
          await Mqtt.connect();
          return Mqtt.publish(topic, payload, opts, true);
        }
      }

      Oasis.Logger.info({ payload, topic, msg: '[Mqtt.publish] Sending message...' });

      const res = await client?.publishAsync(topic, JSON.stringify(payload), { ...opts, qos: 2 });

      // No packet came back (or there is no client at all): treat as a failed send.
      if (!res?.cmd) {
        Oasis.Logger.error({ res, topic, payload, msg: '[Mqtt.publish] Failed to send message.' });
        return;
      }

      Oasis.Logger.info('[Mqtt.publish] Sent message successfully.');

      if (Oasis.Env.store.isDevMode) {
        Mqtt.store.outgoing = [{ topic, payload, timestamp: new Date() }, ...Mqtt.store.outgoing];
      }
    } catch (error) {
      Oasis.Logger.error({ error, msg: '[Mqtt.publish] Exception thrown while publishing message.' });
    }
  },

  /** Force-ends the current client, if any (`endAsync(true)`). The handler registry is not cleared. */
  async disconnect() {
    return client?.endAsync(true);
  },
};

/**
 * 'connect' listener: stamps the connection start time used for duration logging, flips the
 * store status to 'CONNECTED' and logs the event.
 */
function _handleConnect() {
  const connectedTimestamp = Date.now();
  connectedAt = connectedTimestamp;

  Mqtt.store.status = 'CONNECTED';

  Oasis.Logger.info('[Mqtt._handleConnect] Connected to broker.');
}

/**
 * Shared listener for the 'close', 'disconnect', 'offline' and 'end' client events: marks the
 * store as 'DISCONNECTED' and logs which event fired together with the connection duration.
 *
 * @param disconnectEventName Name of the client event that triggered the call.
 * @param packet Packet passed along by the event, when there is one.
 */
function _handleDisconnect(disconnectEventName: string, packet?: mqtt.Packet) {
  Mqtt.store.status = 'DISCONNECTED';

  const connectionDuration = _getDuration();
  const logEntry = {
    packet,
    disconnectEventName,
    connectionDuration,
    msg: '[Mqtt._handleDisconnect] Disconnected to broker.',
  };

  Oasis.Logger.info(logEntry);
}

/**
 * Entry point for a raw message received on `topic`.
 *
 * The message is skipped (with a log line) when no handler is registered for the topic. Otherwise
 * it is JSON-parsed: a JSON string is forwarded as one payload, a JSON array is forwarded element
 * by element, and any other JSON value is logged and ignored. Every failure, including invalid
 * JSON, is logged and never thrown.
 *
 * @param topic Topic the message arrived on.
 * @param message Raw message body as a string.
 */
function _handleMessage(topic: string, message: string) {
  try {
    const handlerIds = topicHandlerIdLookup.get(topic);

    if (!handlerIds || !handlerIds.length) {
      return Oasis.Logger.info('[Mqtt] Received a message without a registered handler.');
    }

    const parsedMessage = JSON.parse(message) as unknown;

    Oasis.Logger.info({ topic, message, msg: '[Mqtt] Message received.' });

    if (typeof parsedMessage === 'string') {
      _parseAndHandleMessage(parsedMessage, topic);
    } else if (Array.isArray(parsedMessage)) {
      for (const messageSegment of parsedMessage) {
        _parseAndHandleMessage(messageSegment, topic);
      }
    } else {
      // Objects, numbers, booleans and null are not dispatched; say so instead of dropping them silently.
      Oasis.Logger.info({
        topic,
        payloadType: parsedMessage === null ? 'null' : typeof parsedMessage,
        msg: '[Mqtt._handleMessage] Ignored a message with an unsupported payload shape.',
      });
    }
  } catch (error) {
    Oasis.Logger.info({ error, msg: '[Mqtt._handleMessage] Failed to handle received message.' });
  }
}

/**
 * Validates one payload with `MqttIncomingPayloads.parse` and passes the result to every handler
 * registered for `topic`.
 *
 * A payload that fails to parse is logged and dropped. Each handler runs in its own try/catch so
 * that one throwing handler cannot prevent the remaining handlers from receiving the message.
 *
 * @param message Payload to validate.
 * @param topic Topic the payload arrived on; also passed to the handlers.
 */
function _parseAndHandleMessage(message: string, topic: string) {
  try {
    const parsed = MqttIncomingPayloads.parse(message);
    const handlerIds = topicHandlerIdLookup.get(topic) ?? [];

    for (const handlerId of handlerIds) {
      // The id may have been removed from handlerLookup by a handler that ran earlier in this loop.
      const handler = handlerLookup.get(handlerId)?.[1];
      if (!handler) continue;

      try {
        handler(parsed, topic);
      } catch (error) {
        Oasis.Logger.info({
          error,
          topic,
          handlerId,
          msg: '[Mqtt._parseAndHandleMessage] Handler threw while handling received message.',
        });
      }
    }
  } catch (error) {
    Oasis.Logger.info({ error, msg: '[Mqtt._parseAndHandleMessage] Failed to parse and handle received message.' });
  }
}

/**
 * 'error' listener of the client. Logs the error together with the connection duration. For an
 * `mqtt.ErrorWithReasonCode` with code 5 it additionally triggers a token refresh and skips the
 * generic log line.
 *
 * The function is registered as an event listener, so nothing awaits the promise it returns;
 * it therefore must not reject.
 *
 * @param error Value emitted with the 'error' event.
 */
async function _handleError(error: unknown) {
  let msg = 'Unknown error.';

  if (error instanceof mqtt.ErrorWithReasonCode) {
    msg = error.message;

    // Error code 5 is a non-authorized connection attempt. This is likely due to an expired token.
    // @TODO - Better definitions of error codes.
    if (error.code === 5) {
      Oasis.Logger.error({
        error: error.message,
        connectionDuration: _getDuration(),
        msg: `[Mqtt] MQTT encountered an error due to unauthorized connection attempt. The token might be expired.`,
      });

      /*
       * Token refresh in case of expiry should be handled globally, but in case we reach here,
       * we make sure it's refreshed, to avoid manual recovery.
       * It could indicate a different authorization issue, and this should be investigated.
       */
      try {
        await TokenManager.refresh();
      } catch (refreshError) {
        // Without this guard a failed refresh would surface as an unhandled promise rejection.
        Oasis.Logger.error({
          error: refreshError,
          msg: '[Mqtt] Failed to refresh the token after an unauthorized connection attempt.',
        });
      }
      return;
    }
  }

  Oasis.Logger.error({ error, connectionDuration: _getDuration(), msg: `[Mqtt] ${msg}` });
}

/**
 * Formats how long ago the last 'connect' event happened, for log output.
 *
 * @returns The elapsed time as `"<minutes> minutes"` (fractional minutes), or `"0 minutes"` when
 * no connection has been established yet.
 */
function _getDuration() {
  // connectedAt is still 0 when an error/disconnect fires before the first successful connect;
  // subtracting it would report the minutes elapsed since the Unix epoch.
  if (connectedAt === 0) {
    return '0 minutes';
  }

  const duration = (Date.now() - connectedAt) / 1000 / 60;
  return `${duration} minutes`;
}
