Repost messages from Telegram to Mastodon.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

135 lines
4.3 KiB

import json
import os
import re

from mastodon import Mastodon, MastodonRatelimitError
from telethon import TelegramClient
from telethon.tl.custom import Message
from telethon.tl.custom.dialog import Dialog
from telethon.tl.types import Channel

import settings
def save_config(j):
    """Persist the subscription state *j* to ``subscriptions.secret`` as JSON.

    The data is serialized into a sibling ``.tmp`` file first and only then
    moved over the real file with ``os.replace``. Writing in place would
    truncate the existing state as soon as the file is opened, so a crash or
    a serialization error half-way through would lose the previous content.

    Args:
        j: JSON-serializable object holding the subscription state.

    Raises:
        TypeError: if *j* contains values ``json`` cannot serialize.
        OSError: if the file cannot be written or replaced.
    """
    target = "subscriptions.secret"
    scratch = target + ".tmp"
    try:
        with open(scratch, "w") as file:
            json.dump(j, file)
        # Swap the fully written file into place in a single step.
        os.replace(scratch, target)
    except Exception:
        # Do not leave a half-written scratch file behind; the previous
        # state file is still intact at this point.
        if os.path.exists(scratch):
            os.remove(scratch)
        raise
def is_bullshit(text: str) -> bool:
    """Return True when *text* contains any of the known ad/promo markers.

    The check is a case-insensitive substring search: *text* is lower-cased
    and compared against a fixed list of lower-case markers. ``None`` is
    never considered an ad.

    Args:
        text: Message text, or ``None`` for messages without text.

    Returns:
        ``True`` if at least one marker occurs in *text*, ``False`` otherwise.
    """
    if text is None:
        return False
    lowered = text.lower()
    markers = (
        "otus.pw",
        "t.me",
        "selectel",
        "slc.tl",
        "#промо",
        "#реклама",
        "sber",
        "beclick.cc",
        "#текстприслан",
        "bit.ly",
        "clck.ru",
        "clc.to",
        # NOTE: matches every text that contains an at-sign at all.
        "@",
    )
    # any() stops at the first hit instead of building a full 0/1 list.
    return any(marker in lowered for marker in markers)
# Opening part of a markdown link: "[label](" where the label itself
# contains no square brackets.
_MD_LINK_OPEN = re.compile(r"\[([^\[\]]*)\]\(")


def unpack_links(text: str):
    """Rewrite markdown links ``[label](url)`` as plain ``label (url)``.

    Only the ``[label](`` opener of an actual link is rewritten; the URL and
    its closing parenthesis are left untouched. Square brackets that do not
    belong to a link (for example ``arr[0]``) are preserved rather than being
    stripped from the text.

    Args:
        text: Message text, or ``None`` for messages without text.

    Returns:
        The text with links unpacked, or ``""`` when *text* is ``None``.
    """
    if text is None:
        return ""
    return _MD_LINK_OPEN.sub(r"\1 (", text)
async def post(message: Message, account: str, channel: str, sensitive: bool = False):
    """Repost one Telegram message to Mastodon.

    Media attached to the message is downloaded through the module-level
    ``client``, uploaded to Mastodon and attached to the toot. The toot text
    is the message text passed through ``unpack_links`` plus a link back to
    the original message. Messages with neither text nor uploadable media are
    skipped.

    Args:
        message: The Telegram message to repost.
        account: Account name; the access token is read from
            ``"<account>-account.secret"``.
        channel: Channel username used to build the ``t.me`` back-link.
        sensitive: Passed through to ``status_post`` as the sensitive flag.

    Returns:
        ``"ratelimit"`` if Mastodon raised ``MastodonRatelimitError`` while
        uploading media or posting, otherwise ``None``. Other errors are
        printed and swallowed (best effort).
    """
    mastodon = Mastodon(
        access_token=f"{account}-account.secret",
        api_base_url=settings.HOMESERVER,
        ratelimit_method="throw",
    )
    media_ids = []
    # Path of the downloaded file; stays None when nothing was downloaded so
    # the cleanup below never touches an unbound or empty name.
    media = None
    try:
        if message.media is not None:
            try:
                print("Downloading media from telegram...")
                media = await client.download_media(message)
                if media is None:
                    print("No media was downloaded. Skipping.")
                elif ".tgs" in media:
                    print("Media is sticker, which unsupported in mastodon. Skipping.")
                else:
                    print("Uploading media to mastodon...")
                    posted_media = mastodon.media_post(media)
                    media_ids.append(posted_media["id"])
            except MastodonRatelimitError:
                print("Rate limited.")
                return "ratelimit"
            except Exception as e:
                # Best effort: a failed download/upload must not block the
                # text part of the toot.
                print(e)

        text = unpack_links(message.text)
        if text == "" and media_ids == []:
            print("Toot is empty. Skipping.")
            return None

        try:
            mastodon.status_post(
                status=f"{text}\n\nCrostooted from https://t.me/{channel}/{message.id}",
                media_ids=media_ids,
                sensitive=sensitive,
            )
            print("Toot!")
        except MastodonRatelimitError:
            return "ratelimit"
        except Exception as e:
            print("Error in toot.", e)
        return None
    finally:
        # Runs on every exit path, including the early "ratelimit" returns,
        # so downloaded files do not pile up on disk.
        if media is not None:
            try:
                os.remove(media)
            except OSError as e:
                print(e)
async def main(client: TelegramClient):
    """Repost not-yet-seen messages of every subscribed channel.

    Subscriptions are loaded from ``subscriptions.secret``, a JSON object
    keyed by lower-cased channel username. For every dialog that is a
    ``Channel`` with a username present in that mapping, the last
    ``settings.BACKFILL_LIMIT`` messages are fetched and walked in reversed
    order. Each message whose id is not yet in the subscription's
    ``"viewed"`` list is posted via ``post`` unless ``is_bullshit`` flags it,
    then recorded as viewed and the state is saved immediately.

    Args:
        client: Telegram client used to list dialogs and fetch messages.

    Raises:
        SystemExit: with code 1 as soon as ``post`` reports a rate limit; the
            message that hit the limit is not marked as viewed.
    """
    with open("subscriptions.secret", "r") as file:
        j = json.load(file)
    print("Getting dialogs...")
    for i in await client.get_dialogs():
        entity = i.entity
        # Channels without a username cannot be looked up in the config.
        if not isinstance(entity, Channel) or entity.username is None:
            continue
        obj = j.get(entity.username.lower())
        if obj is None:
            continue

        print(f"Processing channel {entity.username}...")
        messages = await client.get_messages(i, settings.BACKFILL_LIMIT)
        # Tolerate subscriptions that have no history yet.
        viewed = obj.setdefault("viewed", [])
        # Set mirror of the list for O(1) membership tests; the list itself
        # stays the persisted (JSON-serializable) representation.
        seen = set(viewed)
        for msg in reversed(messages):
            print(f"Processing message {msg.id}...")
            if msg.id in seen:
                print("Already viewed. Skipping.")
                continue

            result = None
            if is_bullshit(msg.text):
                print("Is bullshit. Skipping.")
            else:
                result = await post(
                    msg,
                    obj["name"],
                    entity.username,
                    obj.get("sensitive", False),
                )
            if result == "ratelimit":
                print("Rate limited. Exiting.")
                # Raise SystemExit directly instead of calling exit(), which
                # is only injected by the site module.
                raise SystemExit(1)

            # obj is the same dict object stored in j, so updating it here
            # is enough for save_config to see the change.
            viewed.append(msg.id)
            seen.add(msg.id)
            # Saved after every message so an abort loses no progress.
            save_config(j)
# Script entry: build the Telegram client for the "tg2masto" session, start
# it, and drive main() to completion on the client's own event loop.
# `client` stays a module-level name because post() reads it as a global.
client = TelegramClient("tg2masto", api_id=settings.API_ID, api_hash=settings.API_HASH)
client = client.start()
client.loop.run_until_complete(main(client))