/* eslint-disable @typescript-eslint/no-use-before-define */

import { AsyncResultIterator, queue } from 'async';
import { iot, mqtt } from 'aws-iot-device-sdk-v2';
import { isNil, omit, pull, values } from 'lodash';
import { v4 as uuidv4 } from 'uuid';

import { credentialsProvider } from './CredentialsProvider';
import { generateTopic, parseTopic } from './topicHelpers';

import { captureException } from '../sentry';

/**
 * Number of recorded subscriptions at which a connection is considered full;
 * `getAvailableConnection` only reuses connections below this count.
 * NOTE(review): presumably chosen to stay under a broker-side limit — confirm.
 */
const MAX_SUBSCRIPTIONS_PER_MQTT_CONNECTION = 40;
/** QoS level passed to every subscribe and publish call in this module. */
const DEFAULT_MQTT_QOS = 0;

/** An MQTT connection together with the topics recorded against it. */
interface MqttConnectionService {
  /** UUID generated for the connection; also used as its MQTT client id. */
  id: string;
  /** The underlying SDK connection object. */
  connection: mqtt.MqttClientConnection;
  /** Topics recorded after a subscribe call on this connection resolved. */
  subscriptions: string[];
}

/** A unit of work processed by the MQTT action queue. */
interface SubscriptionWorkerAction {
  /** Which operation the worker should perform. */
  type: 'subscribe' | 'unsubscribe' | 'publish';
  /** Topic to subscribe to, unsubscribe from, or publish on. */
  topic: string;
  /** Message body; only read for `publish`, where `'{}'` is sent if omitted. */
  payload?: string;
}

/** Every connection opened by this module, appended in creation order. */
const connections: MqttConnectionService[] = [];

/** A callback registered to receive incoming messages for a topic. */
interface MessageHandler {
  /** Key the handler is stored under; pass it again to unregister. */
  id: string;
  /**
   * Invoked with the `type` and `baseId` parsed from the incoming topic and
   * the message payload as bytes.
   */
  call: (type: string, baseId: string, payload: Uint8Array) => void;
}

/** Registered handlers, keyed first by topic and then by handler id. */
const messageHandlers: Record<string, Record<string, MessageHandler>> = {};

/**
 * Queue worker: performs one MQTT action, then signals completion.
 *
 * Dispatches on `type` to `handleSubscribe`, `handleUnsubscribe` or
 * `handlePublish`. `callback` is invoked exactly once for every task —
 * including unrecognised action types and unexpected rejections — because a
 * task that never calls back would leave the queue waiting on it.
 *
 * Failures are logged rather than passed to `callback`, so completion is
 * signalled the same way as on success.
 *
 * @param action - The queued action (`type`, `topic`, optional `payload`).
 * @param callback - Completion callback supplied by the queue.
 */
const subscriptionWorker: AsyncResultIterator<
  SubscriptionWorkerAction,
  void
> = ({ type, topic, payload }, callback) => {
  const runAction = async (): Promise<void> => {
    switch (type) {
      case 'subscribe':
        await handleSubscribe(topic);
        break;
      case 'unsubscribe':
        await handleUnsubscribe(topic);
        break;
      case 'publish':
        await handlePublish(topic, payload);
        break;
      default:
        // Unreachable per the type, but guards against untyped callers.
        console.error('Unknown MQTT subscription queue action', type);
    }
  };

  /* eslint-disable promise/no-callback-in-promise */
  runAction()
    .catch((error: unknown) => {
      console.error('Error in MQTT subscription queue', error);
    })
    // Runs after success and after a logged failure alike.
    .then(() => callback());
  /* eslint-enable promise/no-callback-in-promise */
};

// Concurrency of 1: queued MQTT actions are handed to the worker one at a time.
const subscriptionQueue = queue(subscriptionWorker, 1);

/**
 * Builds, wires up and connects a new MQTT-over-websockets connection.
 *
 * Cached credentials and endpoint are used when all are present; otherwise
 * they are fetched first. The connection reports `error` events to Sentry and
 * fans incoming messages out to the handlers registered for the exact topic
 * and for the topic regenerated from the parsed `type` and `org`.
 *
 * @returns The connection record (with no subscriptions yet), or `null` if
 *   any step fails — the failure is logged and reported, never thrown.
 */
const newMqttConnection = async () => {
  try {
    const credentials = credentialsProvider.getCredentials();
    let endpoint = credentialsProvider.getEndpoint();

    if (
      !credentials.aws_access_id ||
      !credentials.aws_secret_key ||
      !credentials.aws_sts_token ||
      !endpoint
    ) {
      await credentialsProvider.fetchCredentials();
      endpoint = credentialsProvider.getEndpoint();
    }

    // Fail fast with a descriptive error instead of handing a missing
    // endpoint to the config builder.
    if (!endpoint) {
      throw new Error('MQTT endpoint unavailable after fetching credentials');
    }

    // The same id identifies the connection record and the MQTT client.
    const id = uuidv4();

    const config = iot.AwsIotMqttConnectionConfigBuilder.new_with_websockets()
      .with_clean_session(false)
      .with_client_id(id)
      .with_endpoint(endpoint)
      .with_credential_provider(credentialsProvider)
      .build();

    // console.log("Connecting to MQTT...");

    const client = new mqtt.MqttClient();
    const connection = client.new_connection(config);

    // Lifecycle listeners kept for debugging:
    //
    // connection.on("connect", (existingSession) => {
    //   console.log("Connected! existing session:", existingSession);
    // });

    // connection.on("interrupt", (error) => {
    //   console.log(`Connection interrupted: error: ${error}`);
    // });

    // connection.on("resume", (return_code, session_present) => {
    //   console.log(
    //     `Resumed: return code: ${return_code} existing session: ${session_present}`
    //   );
    // });

    // connection.on("disconnect", () => {
    //   console.log("Disconnected");
    // });

    connection.on('error', (error) => {
      captureException(error, {
        tags: { feature: 'mqtt' },
        extra: { message: 'Mqtt connection error' },
      });
    });

    connection.on('message', (topic, payload) => {
      const { org, type, baseId } = parseTopic(topic);
      if (isNil(type) || isNil(payload)) return;

      const parsedPayload = new Uint8Array(payload);
      let handlers = messageHandlers[topic] ?? {};

      // Also notify handlers registered on the topic rebuilt from only
      // `type` and `org` (presumably the variant without `baseId` — confirm
      // against topicHelpers). On id clashes these handlers win.
      const newTopic = generateTopic(type, org);

      if (newTopic) {
        const deidentifiedHandlers = messageHandlers[newTopic] ?? {};
        handlers = { ...handlers, ...deidentifiedHandlers };
      }

      values(handlers).forEach((handler) =>
        handler.call(type, baseId, parsedPayload),
      );
    });

    await connection.connect();

    return { id, connection, subscriptions: [] };
  } catch (error) {
    console.error('Error creating new MQTT connection', error);

    // Tagged like every other MQTT report in this module so the events can be
    // filtered together.
    captureException(error, {
      tags: { feature: 'mqtt' },
      extra: { message: 'Error creating new MQTT connection' },
    });

    return null;
  }
};

/**
 * Picks a connection that is still below the per-connection subscription
 * limit, opening (and remembering) a new one when none qualifies.
 *
 * @returns The chosen connection, or `null` if a new one could not be created.
 */
const getAvailableConnection = async () => {
  const reusable = connections.find(
    (service) =>
      service.subscriptions.length < MAX_SUBSCRIPTIONS_PER_MQTT_CONNECTION,
  );
  if (reusable) return reusable;

  const created = await newMqttConnection();
  if (!created) return null;

  connections.push(created);
  return created;
};

/**
 * Looks up the connection that has `topic` in its recorded subscriptions.
 *
 * @returns The first matching connection, or `null` if no connection has it.
 */
const findConnectionBySubscription = (topic: string) => {
  for (const service of connections) {
    if (service.subscriptions.includes(topic)) return service;
  }

  return null;
};

/**
 * Subscribes to `topic` on an available connection unless some connection
 * already has it recorded. Errors are logged and reported, never thrown.
 */
const handleSubscribe = async (topic: string) => {
  try {
    const alreadySubscribed = !isNil(findConnectionBySubscription(topic));
    if (alreadySubscribed) return;

    const service = await getAvailableConnection();
    if (isNil(service)) return;

    const { topic: grantedTopic } = await service.connection.subscribe(
      topic,
      DEFAULT_MQTT_QOS,
    );

    /**
     * TODO: verify whether a granted QoS greater than 2 indicates a failed
     * subscription; if it does, only record the topic when `qos <= 2`.
     */
    if (grantedTopic) service.subscriptions.push(grantedTopic);
  } catch (error) {
    console.error('Error in MQTT client subscription', error, topic);

    captureException(error, {
      tags: { feature: 'mqtt' },
      extra: { message: 'Error in MQTT client subscription', topic },
    });
  }
};

/**
 * Unsubscribes from `topic` on whichever connection has it recorded and
 * removes it from that connection's list. Does nothing when no connection
 * has the topic. Errors are logged and reported, never thrown.
 */
const handleUnsubscribe = async (topic: string) => {
  try {
    const owner = findConnectionBySubscription(topic);
    if (owner == null) return;

    const { connection, subscriptions } = owner;

    await connection.unsubscribe(topic);
    // `pull` mutates the connection's own array in place.
    pull(subscriptions, topic);
  } catch (error) {
    console.error('Error unsubscribing in MQTT client', error);

    captureException(error, {
      tags: { feature: 'mqtt' },
      extra: { message: 'Error unsubscribing in MQTT client', topic },
    });
  }
};

/**
 * Publishes `payload` to `topic` on an available connection, sending `'{}'`
 * when no payload is given. Errors are logged and reported, never thrown.
 */
const handlePublish = async (topic: string, payload?: string) => {
  try {
    const service = await getAvailableConnection();
    if (service == null) return;

    const body = payload ?? '{}';
    await service.connection.publish(topic, body, DEFAULT_MQTT_QOS);
  } catch (error) {
    console.error('Error publishing in MQTT client', error);

    captureException(error, {
      tags: { feature: 'mqtt' },
      extra: { message: 'Error publishing in MQTT client', topic },
    });
  }
};

/** Queues a subscribe action for `topic` and waits for the queue's promise. */
export const subscribeToTopic = async (topic: string) => {
  const action = { type: 'subscribe' as const, topic };
  await subscriptionQueue.push(action);
};

/** Queues an unsubscribe action for `topic` and waits for the queue's promise. */
export const unsubscribeToTopic = async (topic: string) => {
  const action = { type: 'unsubscribe' as const, topic };
  await subscriptionQueue.push(action);
};

/**
 * Queues a publish action for `topic` with the optional `payload` and waits
 * for the queue's promise.
 */
export const publishToTopic = async (topic: string, payload?: string) => {
  const action = { type: 'publish' as const, topic, payload };
  await subscriptionQueue.push(action);
};

/**
 * Stores `handler` under `topic`, keyed by `handler.id`. A handler already
 * stored under the same topic and id is replaced.
 */
export const registerMessageHandler = (
  topic: string,
  handler: MessageHandler,
) => {
  // Create the per-topic bucket on first use.
  const bucket = messageHandlers[topic] ?? (messageHandlers[topic] = {});
  bucket[handler.id] = handler;
};

/**
 * Removes the handler stored under `topic` and `id`. Does nothing when the
 * topic has no handlers registered.
 */
export const unregisterMessageHandler = (topic: string, id: string) => {
  const registered = messageHandlers[topic];
  if (isNil(registered)) return;

  // `omit` returns a new object; the previous bucket is left untouched.
  messageHandlers[topic] = omit(registered, id);
};
