Per il buon funzionamento dello script è necessario andare in questo link: https://my.telegram.org/auth e creare una nuova applicazione che ci fornirà api_id e api_hash. Una volta avviato lo script vi verrà chiesto di inserire il vostro numero di telefono per poter generare una sessione, inseritelo insieme al prefisso (+39) se è un numero italiano. Avrete bisogno anche della libreria pyrogram (https://docs.pyrogram.org/)
import os
import asyncio
from pyrogram import Client
from pyrogram.errors import FloodWait
from pyrogram.types import Message
# Inserisci qui i tuoi dati
api_id = 'IL_TUO_API_ID'
api_hash = 'IL_TUO_API_HASH'
session_name = 'my_session'
# Crea le cartelle dove salverai le immagini, GIF e video
folders = {'photo': 'img', 'gif': 'gif', 'video': 'video'}
for folder in folders.values():
os.makedirs(folder, exist_ok=True)
# Inizializza il client di Pyrogram
app = Client(session_name, api_id=api_id, api_hash=api_hash)
async def download_media(message: Message, folder: str):
"""Scarica media dal messaggio e gestisci eventuali errori di rate limit"""
try:
if message.photo:
file_name = os.path.join(folder, f"{message.photo.file_id}.jpg")
await app.download_media(message.photo, file_name=file_name)
print(f'Scaricata immagine in {file_name}')
elif message.animation: # GIF
file_name = os.path.join(folder, f"{message.animation.file_id}.gif")
await app.download_media(message.animation, file_name=file_name)
print(f'Scaricata GIF in {file_name}')
elif message.video: # Video
file_name = os.path.join(folder, f"{message.video.file_id}.mp4")
await app.download_media(message.video, file_name=file_name)
print(f'Scaricato video in {file_name}')
except FloodWait as e:
print(f"FloodWait di {e.x} secondi. Aspetta...")
await asyncio.sleep(e.x + 5) # Attendi e riprova dopo un margine di sicurezza
return await download_media(message, folder) # Riprova a scaricare dopo l'attesa
async def main():
# Inserisci qui il nome del canale (ad esempio, 'nomecanale' senza il simbolo '@')
channel_username = 'nome_del_canale_o_id'
# Contatore per limitare il numero di media scaricati
media_count = 0
max_media = 100 # Numero massimo di media da scaricare
tasks = []
batch_size = 5 # Riduci il numero di download simultanei
batch_delay = 10 # Aggiungi un ritardo tra i batch
async with app:
async for message in app.get_chat_history(channel_username):
if media_count >= max_media:
print("Raggiunto il limite di media da scaricare.")
break
if message.photo:
tasks.append(download_media(message, folders['photo']))
media_count += 1
elif message.animation: # Controlla se il messaggio contiene una GIF
tasks.append(download_media(message, folders['gif']))
media_count += 1
elif message.video: # Controlla se il messaggio contiene un video
tasks.append(download_media(message, folders['video']))
media_count += 1
# Scarica batch di media limitato a batch_size
if len(tasks) >= batch_size:
await asyncio.gather(*tasks)
tasks = [] # Resetta la lista dei task
await asyncio.sleep(batch_delay) # Attendi prima di iniziare il prossimo batch
# Scarica i media rimanenti
if tasks:
await asyncio.gather(*tasks)
app.run(main())