import { getJSONPrefix } from '@/lib/strUtils'
import React from 'react'
import { ReadableStreamDefaultReadResult } from 'node:stream/web'

async function* streamMessages(
  stream: React.MutableRefObject<ReadableStreamDefaultReader<Uint8Array> | null>,
  timeout: number = 10 * 60 * 1000,
  maxBufferSize: number = 10 * 1024 * 1024 // 10MB max buffer size
): AsyncGenerator<any, void, unknown> {
  let done = false
  let buffer = ''
  let timeoutId: ReturnType<typeof setTimeout> | null = null

  try {
    while (!done && stream.current !== null) {
      const { value, done: streamDone } = await Promise.race([
        stream.current?.read(),
        new Promise<ReadableStreamDefaultReadResult<Uint8Array>>(
          (_, reject) => {
            if (timeoutId) {
              clearTimeout(timeoutId)
            }
            timeoutId = setTimeout(
              () =>
                reject(
                  new Error(
                    'Stream timeout after ' +
                      timeout / 1000 +
                      ' seconds, this is likely caused by the LLM freezing. You can try again by editing your last message. Further, decreasing the number of snippets to retrieve in the settings will help mitigate this issue.'
                  )
                ),
              timeout
            )
          }
        ),
      ])

      if (streamDone) {
        done = true
        continue
      }

      if (value) {
        const decodedValue = new TextDecoder().decode(value)
        if (buffer.length + decodedValue.length > maxBufferSize) {
          throw new Error('Buffer size exceeded. Possible malformed input.')
        }
        buffer += decodedValue

        const [parsedObjects, currentIndex] = getJSONPrefix(buffer)
        for (let parsedObject of parsedObjects) {
          // check if we yielded an error
          if (
            typeof parsedObject === 'object' &&
            parsedObject !== null &&
            'error' in parsedObject
          ) {
            // This is an error object from the backend
            throw new Error(`${parsedObject.error}`)
          }
          yield parsedObject
        }
        buffer = buffer.slice(currentIndex)
        if (
          buffer.length > 0 &&
          !buffer.startsWith('{') &&
          !buffer.startsWith('[') &&
          !buffer.startsWith('(')
        ) {
          // If there's remaining data that doesn't start with '{', it's likely incomplete
          // Wait for the next chunk before processing
          continue
        }
      }
    }
  } catch (error) {
    console.error('Error during streaming:', error)
    throw error // Rethrow timeout errors
  } finally {
    if (timeoutId) {
      clearTimeout(timeoutId)
    }
    if (stream.current == null) {
      console.log('Stream interrupted by user')
      throw new Error('Stream interrupted by user')
    } else {
      console.log("Stream cancelling")
      stream.current.cancel()
      stream.current = null
    }
  }
}

async function* streamResponseMessages(
  response: Response,
  stream: React.MutableRefObject<ReadableStreamDefaultReader<Uint8Array> | null>,
  timeout: number = 1800_000,
  maxBufferSize: number = 10 * 1024 * 1024 // 10MB max buffer size
): ReturnType<typeof streamMessages> {
  const reader = response.body?.getReader()
  if (!reader) {
    throw new Error('No reader found in response')
  }
  stream.current = reader
  yield* streamMessages(stream, timeout, maxBufferSize)
}

export { streamMessages, streamResponseMessages }
