import { accessToken } from "@/services/AuthService"
import { env } from "@/env"
import mqtt, { AsyncMqttClient, IPublishPacket } from "async-mqtt"
import { watchEffect } from "@vue/composition-api"

/** MQTT 訊息 */
export interface MQTTMessage {
    topic: string
    payload: Buffer
    packet: IPublishPacket
}

/** 收到 MQTT 訊息事件處理器 */
export type MessageHandler = (model: MQTTMessage) => any

/**收到 MQTT 訊息事件處理器集合 */
const topicAndHandler = new Map<string, Set<MessageHandler>>()

/** 當前使用的客戶端 */
let client = null as AsyncMqttClient | null

/** 正在啟動客戶端 Promise */
let startingNewClientPromise = null as Promise<void> | null

/** 啟動新的客戶端，關閉舊的 */
function startNewClient(accesstoken: string) {
    startingNewClientPromise = new Promise(async (resolve) => {
        const oldClient = client
        console.debug("建立新MQTT連線...")
        console.debug("accesstoken", accesstoken)
        const newClient = await mqtt.connectAsync(env.MQTT_BROKER_URL, {
            username: accesstoken,
            password: "any",
            port: env.MQTT_BROKER_PORT,
        })
        console.debug("建立新MQTT連線完成")

        newClient.on("message", (topic, payload, packet) => {
            //如果訊息來源不是當前客戶端，就不處理
            if (newClient != client) return
            const handlers = topicAndHandler.get(topic)
            if (!handlers?.size) return
            //使用 Promise 平行觸發各處理器
            handlers.forEach((handler) => {
                new Promise((resolve, reject) => {
                    try {
                        resolve(
                            handler({
                                topic,
                                payload,
                                packet,
                            })
                        )
                    } catch (error) {
                        reject(error)
                    }
                })
            })
        })
        console.debug("重新訂閱 MQTT topic...")
        //訂閱目前所有處理器的主題
        topicAndHandler.forEach(async (_, topic) => {
            console.debug(`訂閱 MQTT topic:${topic} ...`)
            newClient.subscribe(topic)
            console.debug(`訂閱 MQTT topic:${topic} 完成`)
        })
        console.debug("重新訂閱 MQTT topic 完成")

        //切換當前使用客戶端
        client = newClient

        //關閉舊客戶端
        await oldClient?.end()
        resolve()
    })
}

watchEffect(async () => {
    // 當 accessToken 更新時，啟動新客戶端
    if (accessToken.value) startNewClient(accessToken.value)
})

/** 註冊收到訊息事件處理器 */
export function onMessage(topic: string, handler: MessageHandler) {
    if (!topicAndHandler.has(topic))
        topicAndHandler.set(topic, new Set<MessageHandler>())
    const handlers = topicAndHandler.get(topic)!
    handlers.add(handler)
    if (handlers.size === 1) subscribe(topic)
}

/** 向 broker 訂閱主題 */
async function subscribe(topic: string) {
    //等待客戶端完成
    if (startingNewClientPromise != null) await startingNewClientPromise
    if (client == null) {
        console.debug(`MQTT client 未就緒，跳過訂閱 ${topic}`)
        return
    }
    console.debug(`訂閱 MQTT topic:${topic} ...`)
    client!.subscribe(topic)
    console.debug(`訂閱 MQTT topic:${topic} 完成`)
}

/** 移除主題事件發處理器 */
export function offMessage(topic: string, handler: MessageHandler) {
    const handlers = topicAndHandler.get(topic)
    if (!handlers) return
    handlers.delete(handler)
    if (handlers.size === 0) {
        topicAndHandler.delete(topic)
        unsubscribe(topic)
    }
}

/** 向 broker 取消訂閱主題 */
async function unsubscribe(topic: string) {
    //等待客戶端完成
    if (startingNewClientPromise != null) await startingNewClientPromise
    if (client == null) {
        console.debug(`MQTT client 未就緒，跳過取消訂閱 ${topic}`)
        return
    }
    console.debug(`取消訂閱 MQTT topic:${topic} ...`)
    client!.unsubscribe(topic)
    console.debug(`取消訂閱 MQTT topic:${topic} 完成`)
}
