// AI.TS // by ihatenodejs/Aidan // // ----------------------------------------------------------------------- // // This is free and unencumbered software released into the public domain. // // Anyone is free to copy, modify, publish, use, compile, sell, or // distribute this software, either in source code form or as a compiled // binary, for any purpose, commercial or non-commercial, and by any // means. // // In jurisdictions that recognize copyright laws, the author or authors // of this software dedicate any and all copyright interest in the // software to the public domain. We make this dedication for the benefit // of the public at large and to the detriment of our heirs and // successors. We intend this dedication to be an overt act of // relinquishment in perpetuity of all present and future rights to this // software under copyright law. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. // IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR // OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, // ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR // OTHER DEALINGS IN THE SOFTWARE. // // For more information, please refer to import { isOnSpamWatch } from "../spamwatch/spamwatch" import spamwatchMiddlewareModule from "../spamwatch/Middleware" import { Telegraf, Context } from "telegraf" import type { Message } from "telegraf/types" import { replyToMessageId } from "../utils/reply-to-message-id" import { getStrings } from "../plugins/checklang" import axios from "axios" import { rateLimiter } from "../utils/rate-limiter" import { logger } from "../utils/log" import { ensureUserInDb } from "../utils/ensure-user" import * as schema from '../../database/schema' import type { NodePgDatabase } from "drizzle-orm/node-postgres" import { eq, sql, and, gt, isNotNull } from 'drizzle-orm' import { models, unloadModelAfterB, maxUserQueueSize } from "../../config/ai" import { isCommandDisabled } from "../utils/check-command-disabled" const spamwatchMiddleware = spamwatchMiddlewareModule(isOnSpamWatch) export const flash_model = process.env.flashModel || "gemma3:4b" export const thinking_model = process.env.thinkingModel || "qwen3:4b" function isAdmin(ctx: Context): boolean { const userId = ctx.from?.id; if (!userId) return false; const adminArray = process.env.botAdmins ? process.env.botAdmins.split(',').map(id => parseInt(id.trim())) : []; return adminArray.includes(userId); } function parseDuration(duration: string): number { const match = duration.match(/^(\d+)([smhdw])$/); if (!match) return -1; const value = parseInt(match[1]); const unit = match[2]; switch (unit) { case 's': return value; case 'm': return value * 60; case 'h': return value * 60 * 60; case 'd': return value * 60 * 60 * 24; case 'w': return value * 60 * 60 * 24 * 7; default: return -1; } } function formatDuration(seconds: number): string { if (seconds < 60) return `${seconds}s`; if (seconds < 3600) return `${Math.floor(seconds / 60)}m`; if (seconds < 86400) return `${Math.floor(seconds / 3600)}h`; if (seconds < 604800) return `${Math.floor(seconds / 86400)}d`; return `${Math.floor(seconds / 604800)}w`; } async function checkUserTimeout(ctx: Context, db: NodePgDatabase, userId: string, Strings: ReturnType): Promise { const user = await db.query.usersTable.findFirst({ where: (fields, { eq }) => eq(fields.telegramId, userId) }); if (!user) return false; if (user.aiTimeoutUntil && user.aiTimeoutUntil > new Date()) { const timeoutEnd = user.aiTimeoutUntil.toISOString(); const reply_to_message_id = replyToMessageId(ctx); await ctx.reply(Strings.ai.userTimedOutFromAI.replace("{timeoutEnd}", timeoutEnd), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return true; } return false; } type TextContext = Context & { message: Message.TextMessage } type User = typeof schema.usersTable.$inferSelect interface OllamaResponse { response: string; } async function usingSystemPrompt(ctx: TextContext, db: NodePgDatabase, botName: string, message: string): Promise { const user = await db.query.usersTable.findMany({ where: (fields, { eq }) => eq(fields.telegramId, String(ctx.from!.id)), limit: 1 }); if (user.length === 0) await ensureUserInDb(ctx, db); const userData = user[0]; const lang = userData?.languageCode || "en"; const Strings = getStrings(lang); const utcDate = new Date().toISOString(); const prompt = Strings.ai.systemPrompt .replace("{botName}", botName) .replace("{date}", utcDate) .replace("{message}", message); return prompt; } export function sanitizeForJson(text: string): string { return text .replace(/\\/g, '\\\\') .replace(/"/g, '\\"') .replace(/\n/g, '\\n') .replace(/\r/g, '\\r') .replace(/\t/g, '\\t') } function sanitizeMarkdownForTelegram(text: string): string { let sanitizedText = text; const replacements: string[] = []; const addReplacement = (match: string): string => { replacements.push(match); return `___PLACEHOLDER_${replacements.length - 1}___`; }; sanitizedText = sanitizedText.replace(/```([\s\S]*?)```/g, addReplacement); sanitizedText = sanitizedText.replace(/`([^`]+)`/g, addReplacement); sanitizedText = sanitizedText.replace(/\[([^\]]+)\]\(([^)]+)\)/g, addReplacement); const parts = sanitizedText.split(/(___PLACEHOLDER_\d+___)/g); const processedParts = parts.map(part => { if (part.match(/___PLACEHOLDER_\d+___/)) { return part; } else { let processedPart = part; processedPart = processedPart.replace(/^(#{1,6})\s+(.+)/gm, '*$2*'); processedPart = processedPart.replace(/^(\s*)[-*]\s+/gm, '$1- '); processedPart = processedPart.replace(/\*\*(.*?)\*\*/g, '*$1*'); processedPart = processedPart.replace(/__(.*?)__/g, '*$1*'); processedPart = processedPart.replace(/(^|\s)\*(?!\*)([^*]+?)\*(?!\*)/g, '$1_$2_'); processedPart = processedPart.replace(/(^|\s)_(?!_)([^_]+?)_(?!_)/g, '$1_$2_'); processedPart = processedPart.replace(/~~(.*?)~~/g, '~$1~'); processedPart = processedPart.replace(/^\s*┃/gm, '>'); processedPart = processedPart.replace(/^>\s?/gm, '> '); return processedPart; } }); sanitizedText = processedParts.join(''); sanitizedText = sanitizedText.replace(/___PLACEHOLDER_(\d+)___/g, (_, idx) => replacements[Number(idx)]); const codeBlockCount = (sanitizedText.match(/```/g) || []).length; if (codeBlockCount % 2 !== 0) { sanitizedText += '\n```'; } return sanitizedText; } function processThinkingTags(text: string): string { let processedText = text; const firstThinkIndex = processedText.indexOf(''); if (firstThinkIndex === -1) { return processedText.replace(/<\/think>/g, '___THINK_END___'); } processedText = processedText.substring(0, firstThinkIndex) + '___THINK_START___' + processedText.substring(firstThinkIndex + ''.length); const lastThinkEndIndex = processedText.lastIndexOf(''); if (lastThinkEndIndex !== -1) { processedText = processedText.substring(0, lastThinkEndIndex) + '___THEND___' + processedText.substring(lastThinkEndIndex + ''.length); } processedText = processedText.replace(//g, ''); processedText = processedText.replace(/<\/think>/g, ''); processedText = processedText.replace('___THEND___', '___THINK_END___'); return processedText; } export async function preChecks() { const envs = [ "ollamaApi", "flashModel", "thinkingModel", ]; for (const env of envs) { if (!process.env[env]) { console.error(`[✨ AI | !] ❌ ${env} not set!`); return false; } } const ollamaApi = process.env.ollamaApi!; let ollamaOk = false; for (let i = 0; i < 10; i++) { try { const res = await axios.get(ollamaApi, { timeout: 2000 }); if (res.status === 200) { ollamaOk = true; break; } } catch (err) { if (i < 9) { await new Promise((resolve) => setTimeout(resolve, 1000)); } } } if (!ollamaOk) { console.error(`[✨ AI | !] ❌ Ollama API is not responding at ${ollamaApi}`); return false; } console.log(`[✨ AI] Pre-checks passed.`); const modelCount = models.reduce((acc, model) => acc + model.models.length, 0); console.log(`[✨ AI] Found ${modelCount} models.`); return true; } function isAxiosError(error: unknown): error is { response?: { data?: { error?: string }, status?: number, statusText?: string }, request?: unknown, message?: string } { return typeof error === 'object' && error !== null && ( 'response' in error || 'request' in error || 'message' in error ); } function extractAxiosErrorMessage(error: unknown): string { if (isAxiosError(error)) { const err = error as { response?: { data?: { error?: string }, status?: number, statusText?: string }, request?: unknown, message?: string }; if (err.response && typeof err.response === 'object') { const resp = err.response; if (resp.data && typeof resp.data === 'object' && 'error' in resp.data) { return String(resp.data.error); } if ('status' in resp && 'statusText' in resp) { return `HTTP ${resp.status}: ${resp.statusText}`; } return JSON.stringify(resp.data ?? resp); } if (err.request) { return 'No response received from server.'; } if (typeof err.message === 'string') { return err.message; } } return 'An unexpected error occurred.'; } function containsUrls(text: string): boolean { return text.includes('http://') || text.includes('https://') || text.includes('.com') || text.includes('.net') || text.includes('.org') || text.includes('.io') || text.includes('.ai') || text.includes('.dev') } async function getResponse(prompt: string, ctx: TextContext, replyGenerating: Message, model: string, aiTemperature: number, originalMessage: string, db: NodePgDatabase, userId: string, Strings: ReturnType, showThinking: boolean, abortController?: AbortController): Promise<{ success: boolean; response?: string; error?: string, messageType?: 'generation' | 'system', executionTimeoutReached?: boolean }> { if (!ctx.chat) { return { success: false, error: Strings.unexpectedErr.replace("{error}", Strings.ai.noChatFound), }; } const cleanedModelName = model.includes('/') ? model.split('/').pop()! : model; let status = Strings.ai.statusWaitingRender; let modelHeader = Strings.ai.modelHeader .replace("{model}", `${cleanedModelName}`) .replace("{temperature}", String(aiTemperature)) .replace("{status}", status) + "\n\n"; const promptCharCount = originalMessage.length; await db.update(schema.usersTable) .set({ aiCharacters: sql`${schema.usersTable.aiCharacters} + ${promptCharCount}` }) .where(eq(schema.usersTable.telegramId, userId)); const paramSizeStr = models.find(m => m.name === model)?.models.find(m => m.name === model)?.parameterSize?.replace('B', ''); const shouldKeepAlive = paramSizeStr ? Number(paramSizeStr) > unloadModelAfterB : false; const user = await db.query.usersTable.findFirst({ where: (fields, { eq }) => eq(fields.telegramId, userId) }); const maxExecutionTime = user?.aiMaxExecutionTime || 0; const timeout = maxExecutionTime > 0 ? maxExecutionTime * 1000 : 300000; // 5m let executionTimeout: NodeJS.Timeout | null = null; let executionTimeoutReached = false; let fullResponse = ""; if (timeout < 300000) { // 5m executionTimeout = setTimeout(() => { if (abortController && !abortController.signal.aborted) { executionTimeoutReached = true; abortController.abort(); } }, timeout); } try { const aiResponse = await axios.post( `${process.env.ollamaApi}/api/generate`, { model, prompt, stream: true, keep_alive: shouldKeepAlive ? '1' : '0', options: { temperature: aiTemperature } }, { responseType: "stream", timeout: 60000, //1m signal: abortController?.signal, } ); let lastUpdateCharCount = 0; let sentHeader = false; let firstChunk = true; const stream: NodeJS.ReadableStream = aiResponse.data as any; const formatThinkingMessage = (text: string) => { const withPlaceholders = text .replace(/___THINK_START___/g, `${Strings.ai.thinking}`) .replace(/___THINK_END___/g, `${Strings.ai.finishedThinking}`); return sanitizeMarkdownForTelegram(withPlaceholders); }; let isThinking = false; let hasStartedThinking = false; let hasFinishedThinking = false; for await (const chunk of stream) { const lines = chunk.toString().split('\n'); for (const line of lines) { if (!line.trim()) continue; let ln: OllamaResponse; try { ln = JSON.parse(line); } catch (e) { console.error("[✨ AI | !] Error parsing chunk"); continue; } if (ln.response) { if (ln.response.includes('')) { const thinkMatch = ln.response.match(/([\s\S]*?)<\/think>/); if (thinkMatch && thinkMatch[1].trim().length > 0) { logger.logThinking(ctx.chat.id, replyGenerating.message_id, true); } else if (!thinkMatch) { logger.logThinking(ctx.chat.id, replyGenerating.message_id, true); } if (!hasStartedThinking) { isThinking = true; hasStartedThinking = true; } } else if (ln.response.includes('')) { logger.logThinking(ctx.chat.id, replyGenerating.message_id, false); if (isThinking && !hasFinishedThinking) { isThinking = false; hasFinishedThinking = true; } } fullResponse += ln.response; if (showThinking) { let displayResponse = processThinkingTags(fullResponse); if (firstChunk) { status = Strings.ai.statusWaitingRender; modelHeader = Strings.ai.modelHeader .replace("{model}", `${cleanedModelName}`) .replace("{temperature}", String(aiTemperature)) .replace("{status}", status) + "\n\n"; await rateLimiter.editMessageWithRetry( ctx, ctx.chat.id, replyGenerating.message_id, modelHeader + formatThinkingMessage(displayResponse), { parse_mode: 'Markdown' } ); lastUpdateCharCount = displayResponse.length; sentHeader = true; firstChunk = false; continue; } const updateEveryChars = Number(process.env.updateEveryChars) || 100; if (displayResponse.length - lastUpdateCharCount >= updateEveryChars || !sentHeader) { await rateLimiter.editMessageWithRetry( ctx, ctx.chat.id, replyGenerating.message_id, modelHeader + formatThinkingMessage(displayResponse), { parse_mode: 'Markdown' } ); lastUpdateCharCount = displayResponse.length; sentHeader = true; } } else { if (hasStartedThinking && !hasFinishedThinking && isThinking) { if (firstChunk) { status = Strings.ai.statusWaitingRender; modelHeader = Strings.ai.modelHeader .replace("{model}", `${cleanedModelName}`) .replace("{temperature}", String(aiTemperature)) .replace("{status}", status) + "\n\n"; await rateLimiter.editMessageWithRetry( ctx, ctx.chat.id, replyGenerating.message_id, modelHeader + Strings.ai.thinking, { parse_mode: 'Markdown' } ); sentHeader = true; firstChunk = false; } } else if (hasFinishedThinking) { let processedResponse = processThinkingTags(fullResponse); let displayResponse = processedResponse.replace(/___THINK_START___[\s\S]*?___THINK_END___/g, '').trim(); displayResponse = displayResponse.replace(/___THINK_START___[\s\S]*/g, '').trim(); if (firstChunk) { status = Strings.ai.statusWaitingRender; modelHeader = Strings.ai.modelHeader .replace("{model}", `${cleanedModelName}`) .replace("{temperature}", String(aiTemperature)) .replace("{status}", status) + "\n\n"; await rateLimiter.editMessageWithRetry( ctx, ctx.chat.id, replyGenerating.message_id, modelHeader + Strings.ai.finishedThinking + "\n\n" + sanitizeMarkdownForTelegram(displayResponse), { parse_mode: 'Markdown' } ); lastUpdateCharCount = displayResponse.length; sentHeader = true; firstChunk = false; continue; } const updateEveryChars = Number(process.env.updateEveryChars) || 100; if (displayResponse.length - lastUpdateCharCount >= updateEveryChars) { await rateLimiter.editMessageWithRetry( ctx, ctx.chat.id, replyGenerating.message_id, modelHeader + Strings.ai.finishedThinking + "\n\n" + sanitizeMarkdownForTelegram(displayResponse), { parse_mode: 'Markdown' } ); lastUpdateCharCount = displayResponse.length; } } else if (!hasStartedThinking) { if (firstChunk) { status = Strings.ai.statusWaitingRender; modelHeader = Strings.ai.modelHeader .replace("{model}", `${cleanedModelName}`) .replace("{temperature}", String(aiTemperature)) .replace("{status}", status) + "\n\n"; await rateLimiter.editMessageWithRetry( ctx, ctx.chat.id, replyGenerating.message_id, modelHeader + sanitizeMarkdownForTelegram(fullResponse), { parse_mode: 'Markdown' } ); lastUpdateCharCount = fullResponse.length; sentHeader = true; firstChunk = false; continue; } const updateEveryChars = Number(process.env.updateEveryChars) || 100; if (fullResponse.length - lastUpdateCharCount >= updateEveryChars) { await rateLimiter.editMessageWithRetry( ctx, ctx.chat.id, replyGenerating.message_id, modelHeader + sanitizeMarkdownForTelegram(fullResponse), { parse_mode: 'Markdown' } ); lastUpdateCharCount = fullResponse.length; } } } } } } if (executionTimeout) { clearTimeout(executionTimeout); } status = Strings.ai.statusRendering; modelHeader = Strings.ai.modelHeader .replace("{model}", `${cleanedModelName}`) .replace("{temperature}", String(aiTemperature)) .replace("{status}", status) + "\n\n"; if (showThinking) { let displayResponse = processThinkingTags(fullResponse); await rateLimiter.editMessageWithRetry( ctx, ctx.chat.id, replyGenerating.message_id, modelHeader + formatThinkingMessage(displayResponse), { parse_mode: 'Markdown' } ); } else { let processedResponse = processThinkingTags(fullResponse); let displayResponse = processedResponse.replace(/___THINK_START___[\s\S]*?___THINK_END___/g, '').trim(); displayResponse = displayResponse.replace(/___THINK_START___[\s\S]*/g, '').trim(); if (hasStartedThinking) { await rateLimiter.editMessageWithRetry( ctx, ctx.chat.id, replyGenerating.message_id, modelHeader + Strings.ai.finishedThinking + "\n\n" + sanitizeMarkdownForTelegram(displayResponse), { parse_mode: 'Markdown' } ); } else { await rateLimiter.editMessageWithRetry( ctx, ctx.chat.id, replyGenerating.message_id, modelHeader + sanitizeMarkdownForTelegram(displayResponse), { parse_mode: 'Markdown' } ); } } const responseCharCount = fullResponse.length; await db.update(schema.usersTable) .set({ aiCharacters: sql`${schema.usersTable.aiCharacters} + ${responseCharCount}`, aiRequests: sql`${schema.usersTable.aiRequests} + 1` }) .where(eq(schema.usersTable.telegramId, userId)); const patchedResponse = processThinkingTags(fullResponse); return { success: true, response: patchedResponse, messageType: 'generation', executionTimeoutReached }; } catch (error: unknown) { if (executionTimeout) { clearTimeout(executionTimeout); } if (error instanceof Error && (error.name === 'AbortError' || error.message.toLowerCase().includes('aborted'))) { if (executionTimeoutReached) { console.log("[✨ AI] Request was aborted due to execution timeout"); const patchedResponse = processThinkingTags(fullResponse); return { success: true, response: patchedResponse, messageType: 'generation', executionTimeoutReached: true }; } else { console.log("[✨ AI] Request was aborted by user"); return { success: false, error: 'Request was aborted' }; } } const errorMsg = extractAxiosErrorMessage(error); console.error("[✨ AI | !] Error:", errorMsg); if (isAxiosError(error) && error.response && typeof error.response === 'object') { const resp = error.response as { data?: { error?: string }, status?: number }; const errData = resp.data && typeof resp.data === 'object' && 'error' in resp.data ? (resp.data as { error?: string }).error : undefined; const errStatus = 'status' in resp ? resp.status : undefined; if ((typeof errData === 'string' && errData.includes(`model '${model}' not found`)) || errStatus === 404) { await ctx.telegram.editMessageText( ctx.chat!.id, replyGenerating.message_id, undefined, Strings.ai.pulling.replace("{model}", `${cleanedModelName}`), { parse_mode: 'Markdown' } ); console.log(`[✨ AI] Pulling ${model} from ollama...`); try { await axios.post( `${process.env.ollamaApi}/api/pull`, { model, stream: false, timeout: Number(process.env.ollamaApiTimeout) || 10000, } ); } catch (e: unknown) { const pullMsg = extractAxiosErrorMessage(e); console.error("[✨ AI | !] Pull error:", pullMsg); return { success: false, error: `❌ Something went wrong while pulling \`${model}\`: ${pullMsg}`, messageType: 'system' }; } console.log(`[✨ AI] ${model} pulled successfully`); return { success: true, response: Strings.ai.pulled.replace("{model}", `\`${cleanedModelName}\``), messageType: 'system' }; } } return { success: false, error: errorMsg, }; } } async function handleAiReply(ctx: TextContext, model: string, prompt: string, replyGenerating: Message, aiTemperature: number, originalMessage: string, db: NodePgDatabase, userId: string, Strings: ReturnType, showThinking: boolean, abortController?: AbortController) { const aiResponse = await getResponse(prompt, ctx, replyGenerating, model, aiTemperature, originalMessage, db, userId, Strings, showThinking, abortController); if (!aiResponse) return; if (!ctx.chat) return; if (!aiResponse.success && aiResponse.error === 'Request was aborted') { return; } if (aiResponse.success && aiResponse.response) { if (aiResponse.messageType === 'system') { await rateLimiter.editMessageWithRetry( ctx, ctx.chat.id, replyGenerating.message_id, aiResponse.response, { parse_mode: 'Markdown' } ); return; } const cleanedModelName = model.includes('/') ? model.split('/').pop()! : model; const status = Strings.ai.statusComplete; const modelHeader = Strings.ai.modelHeader .replace("{model}", `${cleanedModelName}`) .replace("{temperature}", String(aiTemperature)) .replace("{status}", status) + "\n\n"; const urlWarning = containsUrls(originalMessage) ? Strings.ai.urlWarning : ''; let finalResponse = aiResponse.response; const hasThinkingContent = finalResponse.includes('___THINK_START___'); if (showThinking) { finalResponse = finalResponse.replace(/___THINK_START___/g, `${Strings.ai.thinking}`) .replace(/___THINK_END___/g, `${Strings.ai.finishedThinking}`); } else { finalResponse = finalResponse.replace(/___THINK_START___[\s\S]*?___THINK_END___/g, '').trim(); finalResponse = finalResponse.replace(/___THINK_START___[\s\S]*/g, '').trim(); } const thinkingPrefix = (!showThinking && hasThinkingContent) ? `${Strings.ai.finishedThinking}\n\n` : ''; const timeoutSuffix = aiResponse.executionTimeoutReached ? Strings.ai.executionTimeoutReached : ''; await rateLimiter.editMessageWithRetry( ctx, ctx.chat.id, replyGenerating.message_id, modelHeader + thinkingPrefix + sanitizeMarkdownForTelegram(finalResponse) + urlWarning + timeoutSuffix, { parse_mode: 'Markdown' } ); return; } const error = Strings.unexpectedErr.replace("{error}", aiResponse.error); await rateLimiter.editMessageWithRetry( ctx, ctx.chat.id, replyGenerating.message_id, error, { parse_mode: 'Markdown' } ); } async function getUserWithStringsAndModel(ctx: Context, db: NodePgDatabase): Promise<{ user: User; Strings: ReturnType; languageCode: string; customAiModel: string; aiTemperature: number, showThinking: boolean }> { const userArr = await db.query.usersTable.findMany({ where: (fields, { eq }) => eq(fields.telegramId, String(ctx.from!.id)), limit: 1 }); let user = userArr[0]; if (!user) { await ensureUserInDb(ctx, db); const newUserArr = await db.query.usersTable.findMany({ where: (fields, { eq }) => eq(fields.telegramId, String(ctx.from!.id)), limit: 1 }); user = newUserArr[0]; const Strings = getStrings(user.languageCode); return { user, Strings, languageCode: user.languageCode, customAiModel: user.customAiModel, aiTemperature: user.aiTemperature, showThinking: user.showThinking }; } const Strings = getStrings(user.languageCode); return { user, Strings, languageCode: user.languageCode, customAiModel: user.customAiModel, aiTemperature: user.aiTemperature, showThinking: user.showThinking }; } export function getModelLabelByName(name: string): string { for (const series of models) { const found = series.models.find(m => m.name === name); if (found) return found.label; } return name; } export default (bot: Telegraf, db: NodePgDatabase) => { const botName = bot.botInfo?.first_name && bot.botInfo?.last_name ? `${bot.botInfo.first_name} ${bot.botInfo.last_name}` : "Kowalski" interface AiRequest { task: () => Promise; ctx: TextContext; wasQueued: boolean; userId: number; model: string; abortController?: AbortController; } const requestQueue: AiRequest[] = []; let isProcessing = false; let lastProcessedUserId: number | null = null; let currentRequest: AiRequest | null = null; async function processQueue() { if (isProcessing || requestQueue.length === 0) { return; } isProcessing = true; let nextRequestIndex = 0; if (lastProcessedUserId !== null && requestQueue.length > 1) { const differentUserIndex = requestQueue.findIndex(req => req.userId !== lastProcessedUserId); if (differentUserIndex !== -1) { nextRequestIndex = differentUserIndex; } } const selectedRequest = requestQueue.splice(nextRequestIndex, 1)[0]; const { task, ctx, wasQueued, userId } = selectedRequest; currentRequest = selectedRequest; lastProcessedUserId = userId; const { Strings } = await getUserWithStringsAndModel(ctx, db); const reply_to_message_id = replyToMessageId(ctx); try { if (wasQueued) { await ctx.reply(Strings.ai.startingProcessing, { ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }), parse_mode: 'Markdown' }); } await task(); } catch (error) { console.error("[✨ AI | !] Error processing task:", error); if (error.name === 'AbortError' || (error instanceof Error && error.message.toLowerCase().includes('aborted'))) { console.log("[✨ AI] Request was cancelled by user"); } else { const errorMessage = error instanceof Error ? error.message : String(error); await ctx.reply(Strings.unexpectedErr.replace("{error}", errorMessage), { ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }), parse_mode: 'Markdown' }); } } finally { currentRequest = null; isProcessing = false; processQueue(); } } async function aiCommandHandler(ctx: TextContext, command: 'ask' | 'think' | 'ai') { const commandId = command === 'ask' || command === 'think' ? 'ai-ask-think' : 'ai-custom'; if (await isCommandDisabled(ctx, db, commandId)) { return; } const reply_to_message_id = replyToMessageId(ctx); const { user, Strings, customAiModel, aiTemperature, showThinking } = await getUserWithStringsAndModel(ctx, db); const message = ctx.message.text; const author = ("@" + ctx.from?.username) || ctx.from?.first_name || "Unknown"; if (await checkUserTimeout(ctx, db, user.telegramId, Strings)) { return; } const model = command === 'ai' ? (customAiModel || flash_model) : (command === 'ask' ? flash_model : thinking_model); const fixedMsg = message.replace(new RegExp(`^/${command}(@\\w+)?\\s*`), "").trim(); logger.logCmdStart(author, command, model); if (!process.env.ollamaApi) { await ctx.reply(Strings.ai.disabled, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } if (!user.aiEnabled) { await ctx.reply(Strings.ai.disabledForUser, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } if (fixedMsg.length < 1) { await ctx.reply(Strings.ai.askNoMessage, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } const userId = ctx.from!.id; const userQueueSize = requestQueue.filter(req => req.userId === userId).length; if (userQueueSize >= maxUserQueueSize) { await ctx.reply(Strings.ai.queueFull, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } const abortController = new AbortController(); const task = async () => { const modelLabel = getModelLabelByName(model); const replyGenerating = await ctx.reply(Strings.ai.askGenerating.replace("{model}", `\`${modelLabel}\``), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); const prompt = sanitizeForJson(await usingSystemPrompt(ctx, db, botName, fixedMsg)); await handleAiReply(ctx, model, prompt, replyGenerating, aiTemperature, fixedMsg, db, user.telegramId, Strings, showThinking, abortController); }; if (isProcessing) { requestQueue.push({ task, ctx, wasQueued: true, userId: ctx.from!.id, model, abortController }); const position = requestQueue.length; await ctx.reply(Strings.ai.inQueue.replace("{position}", String(position)), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); } else { requestQueue.push({ task, ctx, wasQueued: false, userId: ctx.from!.id, model, abortController }); processQueue(); } } bot.command(["ask", "think"], spamwatchMiddleware, async (ctx) => { if (!ctx.message || !('text' in ctx.message)) return; const command = ctx.message.text.startsWith('/ask') ? 'ask' : 'think'; await aiCommandHandler(ctx as TextContext, command); }); bot.command(["ai"], spamwatchMiddleware, async (ctx) => { if (!ctx.message || !('text' in ctx.message)) return; await aiCommandHandler(ctx as TextContext, 'ai'); }); bot.command(["aistop"], spamwatchMiddleware, async (ctx) => { if (await isCommandDisabled(ctx, db, 'ai-stop')) { return; } const { Strings } = await getUserWithStringsAndModel(ctx, db); const reply_to_message_id = replyToMessageId(ctx); const userId = ctx.from!.id; if (currentRequest && currentRequest.userId === userId) { currentRequest.abortController?.abort(); try { await axios.post(`${process.env.ollamaApi}/api/generate`, { model: currentRequest.model, keep_alive: 0, }, { timeout: 5000 }); } catch (error) { console.log("[✨ AI] Could not unload model after cancellation:", error.message); } await ctx.reply(Strings.ai.requestStopped, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } const queuedRequestIndex = requestQueue.findIndex(req => req.userId === userId); if (queuedRequestIndex !== -1) { const removedRequest = requestQueue.splice(queuedRequestIndex, 1)[0]; removedRequest.abortController?.abort(); await ctx.reply(Strings.ai.requestRemovedFromQueue, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } await ctx.reply(Strings.ai.noActiveRequest, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); }); bot.command(["aistats"], spamwatchMiddleware, async (ctx) => { if (await isCommandDisabled(ctx, db, 'ai-stats')) { return; } const { user, Strings } = await getUserWithStringsAndModel(ctx, db); if (!user) { await ctx.reply(Strings.userNotFound || "User not found."); return; } const bookCount = Math.max(1, Math.round(user.aiCharacters / 500000)); const bookWord = bookCount === 1 ? 'book' : 'books'; const msg = `${Strings.aiStats.header}\n\n${Strings.aiStats.requests.replace('{aiRequests}', user.aiRequests)}\n${Strings.aiStats.characters.replace('{aiCharacters}', user.aiCharacters).replace('{bookCount}', bookCount).replace('books', bookWord)}`; await ctx.reply(msg, { parse_mode: 'Markdown' }); }); bot.command("queue", spamwatchMiddleware, async (ctx) => { if (!isAdmin(ctx)) { const { Strings } = await getUserWithStringsAndModel(ctx, db); await ctx.reply(Strings.noPermission); return; } const { Strings } = await getUserWithStringsAndModel(ctx, db); const reply_to_message_id = replyToMessageId(ctx); if (requestQueue.length === 0) { await ctx.reply(Strings.ai.queueEmpty, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } let queueItems = ""; for (let i = 0; i < requestQueue.length; i++) { const item = requestQueue[i]; const username = item.ctx.from?.username || item.ctx.from?.first_name || "Unknown"; const status = i === 0 && isProcessing ? "Processing" : "Queued"; const modelLabel = getModelLabelByName(item.model); queueItems += Strings.ai.queueItem .replace("{username}", username) .replace("{userId}", String(item.userId)) .replace("{model}", modelLabel) .replace("{status}", status); } const queueMsg = Strings.ai.queueList .replace("{queueItems}", queueItems) .replace("{totalItems}", String(requestQueue.length)); await ctx.reply(queueMsg, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); }); bot.command("qdel", spamwatchMiddleware, async (ctx) => { if (!isAdmin(ctx)) { const { Strings } = await getUserWithStringsAndModel(ctx, db); await ctx.reply(Strings.noPermission); return; } const { Strings } = await getUserWithStringsAndModel(ctx, db); const reply_to_message_id = replyToMessageId(ctx); const args = ctx.message.text.split(' '); if (args.length < 2) { await ctx.reply(Strings.ai.invalidUserId, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } const targetUserId = parseInt(args[1]); if (isNaN(targetUserId)) { await ctx.reply(Strings.ai.invalidUserId, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } let stoppedCurrentRequest = false; const initialLength = requestQueue.length; const filteredQueue = requestQueue.filter(item => item.userId !== targetUserId); const removedCount = initialLength - filteredQueue.length; requestQueue.length = 0; requestQueue.push(...filteredQueue); if (currentRequest && currentRequest.userId === targetUserId) { currentRequest.abortController?.abort(); try { await axios.post(`${process.env.ollamaApi}/api/generate`, { model: currentRequest.model, keep_alive: 0, }, { timeout: 5000 }); } catch (error) { console.log("[✨ AI] Could not unload model after cancellation:", error.message); } stoppedCurrentRequest = true; } if (removedCount === 0 && !stoppedCurrentRequest) { await ctx.reply(Strings.ai.noQueueItems.replace("{userId}", String(targetUserId)), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } let responseMessage = ""; if (stoppedCurrentRequest && removedCount > 0) { responseMessage = Strings.ai.stoppedCurrentAndCleared.replace("{count}", String(removedCount)).replace("{userId}", String(targetUserId)); } else if (stoppedCurrentRequest) { responseMessage = Strings.ai.stoppedCurrentRequestOnly.replace("{userId}", String(targetUserId)); } else { responseMessage = Strings.ai.queueCleared.replace("{count}", String(removedCount)).replace("{userId}", String(targetUserId)); } await ctx.reply(responseMessage, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); }); bot.command("qlimit", spamwatchMiddleware, async (ctx) => { if (!isAdmin(ctx)) { const { Strings } = await getUserWithStringsAndModel(ctx, db); await ctx.reply(Strings.noPermission); return; } const { Strings } = await getUserWithStringsAndModel(ctx, db); const reply_to_message_id = replyToMessageId(ctx); const args = ctx.message.text.split(' '); if (args.length < 3) { await ctx.reply("Usage: /qlimit \nExample: /qlimit 123456789 1h", { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } const targetUserId = args[1]; const durationStr = args[2]; if (!/^\d+$/.test(targetUserId)) { await ctx.reply(Strings.ai.invalidUserId, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } const durationSeconds = parseDuration(durationStr); if (durationSeconds === -1) { await ctx.reply(Strings.ai.invalidDuration, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } try { const user = await db.query.usersTable.findFirst({ where: (fields, { eq }) => eq(fields.telegramId, targetUserId) }); if (!user) { await ctx.reply(Strings.ai.userNotFound.replace("{userId}", targetUserId), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } const timeoutEnd = new Date(Date.now() + (durationSeconds * 1000)); await db.update(schema.usersTable) .set({ aiTimeoutUntil: timeoutEnd }) .where(eq(schema.usersTable.telegramId, targetUserId)); const filteredQueue = requestQueue.filter(item => item.userId !== parseInt(targetUserId)); requestQueue.length = 0; requestQueue.push(...filteredQueue); await ctx.reply(Strings.ai.userTimedOut.replace("{userId}", targetUserId).replace("{timeoutEnd}", timeoutEnd.toISOString()), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); } catch (error) { await ctx.reply(Strings.ai.userTimeoutError.replace("{userId}", targetUserId).replace("{error}", error.message), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); } }); bot.command("setexec", spamwatchMiddleware, async (ctx) => { if (!isAdmin(ctx)) { const { Strings } = await getUserWithStringsAndModel(ctx, db); await ctx.reply(Strings.noPermission); return; } const { Strings } = await getUserWithStringsAndModel(ctx, db); const reply_to_message_id = replyToMessageId(ctx); const args = ctx.message.text.split(' '); if (args.length < 3) { await ctx.reply("Usage: /setexec \nExample: /setexec 123456789 5m\nUse 'unlimited' to remove limit.", { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } const targetUserId = args[1]; const durationStr = args[2]; if (!/^\d+$/.test(targetUserId)) { await ctx.reply(Strings.ai.invalidUserId, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } let durationSeconds = 0; if (durationStr.toLowerCase() !== 'unlimited') { durationSeconds = parseDuration(durationStr); if (durationSeconds === -1) { await ctx.reply(Strings.ai.invalidDuration, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } } try { const user = await db.query.usersTable.findFirst({ where: (fields, { eq }) => eq(fields.telegramId, targetUserId) }); if (!user) { await ctx.reply(Strings.ai.userNotFound.replace("{userId}", targetUserId), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } await db.update(schema.usersTable) .set({ aiMaxExecutionTime: durationSeconds }) .where(eq(schema.usersTable.telegramId, targetUserId)); if (durationSeconds === 0) { await ctx.reply(Strings.ai.userExecTimeRemoved.replace("{userId}", targetUserId), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); } else { await ctx.reply(Strings.ai.userExecTimeSet.replace("{duration}", formatDuration(durationSeconds)).replace("{userId}", targetUserId), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); } } catch (error) { await ctx.reply(Strings.ai.userExecTimeError.replace("{userId}", targetUserId).replace("{error}", error.message), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); } }); bot.command("rlimit", spamwatchMiddleware, async (ctx) => { if (!isAdmin(ctx)) { const { Strings } = await getUserWithStringsAndModel(ctx, db); await ctx.reply(Strings.noPermission); return; } const { Strings } = await getUserWithStringsAndModel(ctx, db); const reply_to_message_id = replyToMessageId(ctx); const args = ctx.message.text.split(' '); if (args.length < 2) { await ctx.reply("Usage: /rlimit \nExample: /rlimit 123456789", { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } const targetUserId = args[1]; if (!/^\d+$/.test(targetUserId)) { await ctx.reply(Strings.ai.invalidUserId, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } try { const user = await db.query.usersTable.findFirst({ where: (fields, { eq }) => eq(fields.telegramId, targetUserId) }); if (!user) { await ctx.reply(Strings.ai.userNotFound.replace("{userId}", targetUserId), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } await db.update(schema.usersTable) .set({ aiTimeoutUntil: null, aiMaxExecutionTime: 0 }) .where(eq(schema.usersTable.telegramId, targetUserId)); await ctx.reply(Strings.ai.userLimitsRemoved.replace("{userId}", targetUserId), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); } catch (error) { await ctx.reply(Strings.ai.userLimitRemoveError.replace("{userId}", targetUserId).replace("{error}", error.message), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); } }); bot.command("limits", spamwatchMiddleware, async (ctx) => { if (!isAdmin(ctx)) { const { Strings } = await getUserWithStringsAndModel(ctx, db); await ctx.reply(Strings.noPermission); return; } const { Strings } = await getUserWithStringsAndModel(ctx, db); const reply_to_message_id = replyToMessageId(ctx); try { const usersWithTimeouts = await db.query.usersTable.findMany({ where: and( isNotNull(schema.usersTable.aiTimeoutUntil), gt(schema.usersTable.aiTimeoutUntil, new Date()) ), columns: { telegramId: true, username: true, firstName: true, aiTimeoutUntil: true } }); const usersWithExecLimits = await db.query.usersTable.findMany({ where: gt(schema.usersTable.aiMaxExecutionTime, 0), columns: { telegramId: true, username: true, firstName: true, aiMaxExecutionTime: true } }); if (usersWithTimeouts.length === 0 && usersWithExecLimits.length === 0) { await ctx.reply(Strings.ai.noLimitsSet, { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); return; } let limitsText = Strings.ai.limitsHeader + "\n\n"; if (usersWithTimeouts.length > 0) { limitsText += Strings.ai.timeoutLimitsHeader + "\n"; for (const user of usersWithTimeouts) { const displayName = user.username || user.firstName || "Unknown"; const timeoutEnd = user.aiTimeoutUntil!.toISOString(); limitsText += Strings.ai.timeoutLimitItem .replace("{displayName}", displayName) .replace("{userId}", user.telegramId) .replace("{timeoutEnd}", timeoutEnd) + "\n"; } limitsText += "\n"; } if (usersWithExecLimits.length > 0) { limitsText += Strings.ai.execLimitsHeader + "\n"; for (const user of usersWithExecLimits) { const displayName = user.username || user.firstName || "Unknown"; const execTime = formatDuration(user.aiMaxExecutionTime!); limitsText += Strings.ai.execLimitItem .replace("{displayName}", displayName) .replace("{userId}", user.telegramId) .replace("{execTime}", execTime) + "\n"; } } await ctx.reply(limitsText.trim(), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); } catch (error) { await ctx.reply(Strings.ai.limitsListError.replace("{error}", error.message), { parse_mode: 'Markdown', ...(reply_to_message_id && { reply_parameters: { message_id: reply_to_message_id } }) }); } }); }