cleanup, logging of ratelimit
This commit is contained in:
		
							parent
							
								
									a4233c5cff
								
							
						
					
					
						commit
						276d053ed5
					
				
					 1 changed files with 110 additions and 99 deletions
				
			
		|  | @ -45,15 +45,85 @@ class RateLimiter { | |||
|     return `${chatId}:${messageId}` | ||||
|   } | ||||
| 
 | ||||
|   private async waitForRateLimit(): Promise<void> { | ||||
|     if (this.isRateLimited) { | ||||
|       const now = Date.now() | ||||
|       if (now < this.rateLimitEndTime) { | ||||
|         const waitTime = this.rateLimitEndTime - now | ||||
|         await new Promise(resolve => setTimeout(resolve, waitTime)) | ||||
|       } | ||||
|       this.isRateLimited = false | ||||
|   private async waitForRateLimit(chatId: number, messageId: number): Promise<void> { | ||||
|     if (!this.isRateLimited) return | ||||
|     console.log(`[✨ AI | RATELIMIT] [${chatId}:${messageId}] Ratelimited, waiting for end of ${this.rateLimitEndTime - Date.now()}ms`) | ||||
|     const now = Date.now() | ||||
|     if (now < this.rateLimitEndTime) { | ||||
|       await new Promise(resolve => setTimeout(resolve, this.rateLimitEndTime - now)) | ||||
|     } | ||||
|     this.isRateLimited = false | ||||
|   } | ||||
| 
 | ||||
|   private chunkText(text: string): string[] { | ||||
|     const chunks: string[] = [] | ||||
|     let currentChunk = '' | ||||
|     let currentLength = 0 | ||||
|     const lines = text.split('\n') | ||||
|     for (const line of lines) { | ||||
|       if (currentLength + line.length + 1 > this.max_msg_length) { | ||||
|         if (currentChunk) { | ||||
|           chunks.push(currentChunk) | ||||
|           currentChunk = '' | ||||
|           currentLength = 0 | ||||
|         } | ||||
|         if (line.length > this.max_msg_length) { | ||||
|           for (let i = 0; i < line.length; i += this.max_msg_length) { | ||||
|             chunks.push(line.substring(i, i + this.max_msg_length)) | ||||
|           } | ||||
|         } else { | ||||
|           currentChunk = line | ||||
|           currentLength = line.length | ||||
|         } | ||||
|       } else { | ||||
|         if (currentChunk) { | ||||
|           currentChunk += '\n' | ||||
|           currentLength++ | ||||
|         } | ||||
|         currentChunk += line | ||||
|         currentLength += line.length | ||||
|       } | ||||
|     } | ||||
|     if (currentChunk) { | ||||
|       chunks.push(currentChunk) | ||||
|     } | ||||
|     return chunks | ||||
|   } | ||||
| 
 | ||||
|   private handleTelegramError(error: unknown, messageKey: string, options: any, ctx: Context, chatId: number, messageId: number): boolean { | ||||
|     if (!isTelegramError(error)) return false | ||||
|     if (error.response.error_code === 429) { | ||||
|       const retryAfter = error.response.parameters?.retry_after || 1 | ||||
|       this.isRateLimited = true | ||||
|       this.rateLimitEndTime = Date.now() + (retryAfter * 1000) | ||||
|       const existingTimeout = this.updateQueue.get(messageKey) | ||||
|       if (existingTimeout) clearTimeout(existingTimeout) | ||||
|       const timeout = setTimeout(() => { | ||||
|         this.processUpdate(ctx, chatId, messageId, options) | ||||
|       }, retryAfter * 1000) | ||||
|       this.updateQueue.set(messageKey, timeout) | ||||
|       return true | ||||
|     } | ||||
|     if (error.response.error_code === 400) { | ||||
|       if (error.response.description?.includes("can't parse entities") || error.response.description?.includes("MESSAGE_TOO_LONG")) { | ||||
|         const plainOptions = { ...options, parse_mode: undefined } | ||||
|         this.processUpdate(ctx, chatId, messageId, plainOptions) | ||||
|         return true | ||||
|       } | ||||
|       if (error.response.description?.includes("message is not modified")) { | ||||
|         this.pendingUpdates.delete(messageKey) | ||||
|         this.updateQueue.delete(messageKey) | ||||
|         return true | ||||
|       } | ||||
|       logger.logError(error) | ||||
|       this.pendingUpdates.delete(messageKey) | ||||
|       this.updateQueue.delete(messageKey) | ||||
|       return true | ||||
|     } | ||||
|     logger.logError(error) | ||||
|     this.pendingUpdates.delete(messageKey) | ||||
|     this.updateQueue.delete(messageKey) | ||||
|     return true | ||||
|   } | ||||
| 
 | ||||
|   private async processUpdate( | ||||
|  | @ -68,81 +138,45 @@ class RateLimiter { | |||
| 
 | ||||
|     const now = Date.now() | ||||
|     const timeSinceLastEdit = now - this.lastEditTime | ||||
| 
 | ||||
|     await this.waitForRateLimit() | ||||
|     await this.waitForRateLimit(chatId, messageId) | ||||
| 
 | ||||
|     if (timeSinceLastEdit < this.minInterval) { | ||||
|       const existingTimeout = this.updateQueue.get(messageKey) | ||||
|       if (existingTimeout) { | ||||
|         clearTimeout(existingTimeout) | ||||
|       } | ||||
| 
 | ||||
|       if (existingTimeout) clearTimeout(existingTimeout) | ||||
|       const timeout = setTimeout(() => { | ||||
|         this.processUpdate(ctx, chatId, messageId, options) | ||||
|       }, this.minInterval - timeSinceLastEdit) | ||||
| 
 | ||||
|       this.updateQueue.set(messageKey, timeout) | ||||
|       return | ||||
|     } | ||||
| 
 | ||||
|     try { | ||||
|       if (latestText.length > this.max_msg_length) { | ||||
|         const chunks: string[] = [] | ||||
|         let currentChunk = '' | ||||
|         let currentLength = 0 | ||||
| 
 | ||||
|         // Split text into chunks while preserving markdown formatting
 | ||||
|         const lines = latestText.split('\n') | ||||
|         for (const line of lines) { | ||||
|           if (currentLength + line.length + 1 > this.max_msg_length) { | ||||
|             if (currentChunk) { | ||||
|               chunks.push(currentChunk) | ||||
|               currentChunk = '' | ||||
|               currentLength = 0 | ||||
|             } | ||||
|             // if a single line is too long, split
 | ||||
|             if (line.length > this.max_msg_length) { | ||||
|               for (let i = 0; i < line.length; i += this.max_msg_length) { | ||||
|                 chunks.push(line.substring(i, i + this.max_msg_length)) | ||||
|               } | ||||
|             } else { | ||||
|               currentChunk = line | ||||
|               currentLength = line.length | ||||
|             } | ||||
|           } else { | ||||
|             if (currentChunk) { | ||||
|               currentChunk += '\n' | ||||
|               currentLength++ | ||||
|             } | ||||
|             currentChunk += line | ||||
|             currentLength += line.length | ||||
|           } | ||||
|         } | ||||
|         if (currentChunk) { | ||||
|           chunks.push(currentChunk) | ||||
|         } | ||||
| 
 | ||||
|         const chunks = this.chunkText(latestText) | ||||
|         const firstChunk = chunks[0] | ||||
|         logger.logChunk(chatId, messageId, firstChunk) | ||||
| 
 | ||||
|         try { | ||||
|           await ctx.telegram.editMessageText(chatId, messageId, undefined, firstChunk, options) | ||||
|         } catch (error: any) { | ||||
|           if (!error.response?.description?.includes("message is not modified")) { | ||||
|         } catch (error: unknown) { | ||||
|           if ( | ||||
|             isTelegramError(error) && | ||||
|             !error.response.description?.includes("message is not modified") | ||||
|           ) { | ||||
|             throw error | ||||
|           } | ||||
|         } | ||||
| 
 | ||||
|         for (let i = 1; i < chunks.length; i++) { | ||||
|           const chunk = chunks[i] | ||||
|           const overflowMessageId = this.overflowMessages.get(messageKey) | ||||
| 
 | ||||
|           if (overflowMessageId) { | ||||
|             try { | ||||
|               await ctx.telegram.editMessageText(chatId, overflowMessageId, undefined, chunk, options) | ||||
|               logger.logChunk(chatId, overflowMessageId, chunk, true) | ||||
|             } catch (error: any) { | ||||
|               if (!error.response?.description?.includes("message is not modified")) { | ||||
|             } catch (error: unknown) { | ||||
|               if ( | ||||
|                 isTelegramError(error) && | ||||
|                 !error.response.description?.includes("message is not modified") | ||||
|               ) { | ||||
|                 throw error | ||||
|               } | ||||
|             } | ||||
|  | @ -155,7 +189,6 @@ class RateLimiter { | |||
|             this.overflowMessages.set(messageKey, newMessage.message_id) | ||||
|           } | ||||
|         } | ||||
| 
 | ||||
|         this.pendingUpdates.set(messageKey, firstChunk) | ||||
|         if (chunks.length > 1) { | ||||
|           this.pendingUpdates.set( | ||||
|  | @ -164,54 +197,23 @@ class RateLimiter { | |||
|           ) | ||||
|         } | ||||
|       } else { | ||||
|         const sanitizedText = latestText | ||||
|         logger.logChunk(chatId, messageId, sanitizedText) | ||||
| 
 | ||||
|         logger.logChunk(chatId, messageId, latestText) | ||||
|         try { | ||||
|           await ctx.telegram.editMessageText(chatId, messageId, undefined, sanitizedText, options) | ||||
|         } catch (error: any) { | ||||
|           if (!error.response?.description?.includes("message is not modified")) { | ||||
|           await ctx.telegram.editMessageText(chatId, messageId, undefined, latestText, options) | ||||
|         } catch (error: unknown) { | ||||
|           if ( | ||||
|             isTelegramError(error) && | ||||
|             !error.response.description?.includes("message is not modified") | ||||
|           ) { | ||||
|             throw error | ||||
|           } | ||||
|         } | ||||
|         this.pendingUpdates.delete(messageKey) | ||||
|       } | ||||
| 
 | ||||
|       this.lastEditTime = Date.now() | ||||
|       this.updateQueue.delete(messageKey) | ||||
|     } catch (error: any) { | ||||
|       if (error.response?.error_code === 429) { | ||||
|         const retryAfter = error.response.parameters?.retry_after || 1 | ||||
|         this.isRateLimited = true | ||||
|         this.rateLimitEndTime = Date.now() + (retryAfter * 1000) | ||||
| 
 | ||||
|         const existingTimeout = this.updateQueue.get(messageKey) | ||||
|         if (existingTimeout) { | ||||
|           clearTimeout(existingTimeout) | ||||
|         } | ||||
| 
 | ||||
|         const timeout = setTimeout(() => { | ||||
|           this.processUpdate(ctx, chatId, messageId, options) | ||||
|         }, retryAfter * 1000) | ||||
| 
 | ||||
|         this.updateQueue.set(messageKey, timeout) | ||||
|       } else if (error.response?.error_code === 400) { | ||||
|         if (error.response?.description?.includes("can't parse entities")) { | ||||
|           // try again with plain text
 | ||||
|           const plainOptions = { ...options, parse_mode: undefined } | ||||
|           await this.processUpdate(ctx, chatId, messageId, plainOptions) | ||||
|         } else if (error.response?.description?.includes("MESSAGE_TOO_LONG")) { | ||||
|           const plainOptions = { ...options, parse_mode: undefined } | ||||
|           await this.processUpdate(ctx, chatId, messageId, plainOptions) | ||||
|         } else if (error.response?.description?.includes("message is not modified")) { | ||||
|           this.pendingUpdates.delete(messageKey) | ||||
|           this.updateQueue.delete(messageKey) | ||||
|         } else { | ||||
|           logger.logError(error) | ||||
|           this.pendingUpdates.delete(messageKey) | ||||
|           this.updateQueue.delete(messageKey) | ||||
|         } | ||||
|       } else { | ||||
|     } catch (error: unknown) { | ||||
|       if (!this.handleTelegramError(error, messageKey, options, ctx, chatId, messageId)) { | ||||
|         logger.logError(error) | ||||
|         this.pendingUpdates.delete(messageKey) | ||||
|         this.updateQueue.delete(messageKey) | ||||
|  | @ -232,4 +234,13 @@ class RateLimiter { | |||
|   } | ||||
| } | ||||
| 
 | ||||
| export const rateLimiter = new RateLimiter()  | ||||
| export const rateLimiter = new RateLimiter() | ||||
| 
 | ||||
| function isTelegramError(error: unknown): error is { response: { description?: string, error_code?: number, parameters?: { retry_after?: number } } } { | ||||
|   return ( | ||||
|     typeof error === "object" && | ||||
|     error !== null && | ||||
|     "response" in error && | ||||
|     typeof (error as any).response === "object" | ||||
|   ) | ||||
| } | ||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue