import type { ClickHouseClient } from "@clickhouse/client";
import { v6 as uuidv6 } from "uuid";
import { getProcessedFiles, putProcessedFile } from "../utils/ddb";
import { copyObject, listObjects } from "../utils/s3";
import { adjust, game_data, spend } from "./clickhouse";

export const routine = async (s3Client, ddbClient, clickhouse: ClickHouseClient, targetDate, bucket, dryRun) => {
  const keysToCheck: any = [];
  const processedKeys: any = [];
  const results: any = {};

  // Generate random GUID
  const runId = uuidv6();
  console.log(`Run ID: ${runId}`);
  results.run_id = runId;
  results.dry_run = dryRun;

  let prefix = "";
  switch (bucket) {
    case "wisdom-adjust-production":
      prefix = `date=${targetDate}`;
      break;
    case "wisdom-reporting-production":
      prefix = `spend/data/report_date=${targetDate}`;
      break;
    case "wisdom-events-production":
      prefix = `events/ingestion_date=${targetDate}`;
      break;
  }
  const s3Keys = await listObjects(bucket, `${prefix}/`, s3Client);
  s3Keys.forEach((key) => {
    keysToCheck.push({
      key: key,
      bucket_name: bucket,
    });
  });

  const ddbKeys = await getProcessedFiles(ddbClient, targetDate, bucket);
  processedKeys.push(...ddbKeys);

  console.log(`Found ${keysToCheck.length} keys to check, ${processedKeys.length} keys already processed`);

  const keysToProcess: any = [];
  keysToCheck.forEach((key) => {
    if (!processedKeys.includes(key.key)) {
      console.log(`Processing key: ${key.key}`);
      keysToProcess.push(key);
    }
  });

  console.log(`Found ${keysToProcess.length} keys to process`);
  await keysToProcess.forEach(async (key) => {
    console.log(`Processing key: ${key.key}`);

    try {
      await copyObject(key.bucket_name, key.key, "wisdom-nw-temp", `${key.bucket_name}/${runId}/${key.key}`, s3Client);
    } catch (e) {
      throw new Error(`Error copying object: ${e}`);
    }
  });

  let clickhouseFailure = false;
  const clickhouseQueryPrefix: any = [];
  // Insert to clickhouse
  // Check if we have any keys to process in this bucket
  const keys = keysToProcess.filter((key) => key.bucket_name === bucket);
  if (keys.length > 0) {
    let clickhouseQuery = "";
    switch (bucket) {
      case "wisdom-adjust-production":
        {
          const prefix: string = `wisdom-nw-temp/${bucket}/${runId}/**/**/*.csv.gz`;
          clickhouseQuery = adjust.replace("<REPLACE_ME>", prefix);
          clickhouseQueryPrefix.push(prefix);
        }
        break;
      case "wisdom-reporting-production":
        {
          const prefix = `wisdom-nw-temp/${bucket}/${runId}/spend/data/**/**/**/*.parquet`;
          clickhouseQuery = spend.replace("<REPLACE_ME>", prefix);
          clickhouseQueryPrefix.push(prefix);
        }
        break;
      case "wisdom-events-production":
        {
          const prefix = `wisdom-nw-temp/${bucket}/${runId}/events/**/**/**/*.parquet`;
          clickhouseQuery = game_data.replace("<REPLACE_ME>", prefix);
          clickhouseQueryPrefix.push(prefix);
        }
        break;
    }

    if (clickhouseQuery === "") {
      console.error("No query found for bucket", bucket);
      return results;
    }
    console.log(`Running query: ${clickhouseQuery}`);
    if (dryRun) {
      console.log("Dry run enabled, skipping query");
    } else {
      try {
        await clickhouse.command({
          query: clickhouseQuery,
          clickhouse_settings: {
            async_insert: 1,
            wait_for_async_insert: 0,
          },
        });
      } catch (e) {
        clickhouseFailure = true;
        console.error("Something went wrong with the query", e);
      }
    }
  }
  results.clickhouse_query_prefix = clickhouseQueryPrefix;
  results.clickhouse_failure = clickhouseFailure;
  results.targetKeysCount = keysToCheck.length;
  results.processedKeysCount = keysToProcess.length;

  console.log(keysToProcess);

  if (keysToCheck.length === 0) {
    results.targetKeys = [];
  } else {
    const groupedKeysToCheck = keysToCheck.reduce((acc, key) => {
      if (!acc[key.bucket_name]) {
        acc[key.bucket_name] = [];
      }
      acc[key.bucket_name].push(key);
      return acc;
    }, {});
    results.targetKeys = groupedKeysToCheck;
  }

  if (keysToProcess.length === 0) {
    results.processedKeys = [];
  } else {
    const groupedKeysToProcess = keysToProcess.reduce((acc, key) => {
      if (!acc[key.bucket_name]) {
        acc[key.bucket_name] = [];
      }
      acc[key.bucket_name].push(key);
      return acc;
    });

    results.processedKeys = groupedKeysToProcess;
  }

  // If all succeeded, we can put
  if (!dryRun) {
    await keysToProcess.forEach(async (key) => {
      await putProcessedFile(ddbClient, {
        date: targetDate,
        bucket_name: key.bucket_name,
        file_name: key.key,
        created_at: new Date().toISOString(),
      });
    });
  }

  return results;
};
