...
 
Commits (3)
"""LICENSE
Copyright 2018 Hermann Krumrey <hermann@krumreyh.com>
This file is part of bokkichat.
bokkichat is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
bokkichat is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with bokkichat. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
import logging
import argparse
from bokkichat.address import Address
from bokkichat.message import TextMessage, MediaMessage, MediaType
from bokkichat.connection import TelegramSettings, TelegramConnection
def parse_args() -> argparse.Namespace:
"""
Parses the command line arguments
The program requires an API key, followed by a subcommand, being either
send, receive or echo.
Each of those provide a subparser.
:return: The parsed arguments
"""
parser = argparse.ArgumentParser(prog="telegram-cli")
parser.add_argument("api_key", help="The Bot's API key")
subparser = parser.add_subparsers(required=True, dest="mode")
send_parser = subparser.add_parser("send", help="Allows sending a message")
send_parser.add_argument("receiver", help="The receiver of the message")
send_parser.add_argument("text", help="The text/caption to send")
media_options = send_parser.add_mutually_exclusive_group(required=False)
media_options.add_argument("--audio", help="Path to an audio file to send")
media_options.add_argument("--image", help="Path to an image file to send")
media_options.add_argument("--video", help="Path to a video file to send")
subparser.add_parser("receive",
help="Allows receiving any pending messages")
subparser.add_parser("echo", help="Starts an echo bot")
return parser.parse_args()
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
args = parse_args()
settings = TelegramSettings(args.api_key)
connection = TelegramConnection(settings)
if args.mode == "send":
receiver = Address(args.receiver)
if args.audio:
media_type = MediaType.AUDIO
elif args.video:
media_type = MediaType.VIDEO
elif args.image:
media_type = MediaType.IMAGE
else:
media_type = "text"
if media_type == "text":
message = TextMessage(connection.address, receiver, args.text)
else:
media_file = args.audio or args.video or args.image
with open(media_file, "rb") as f:
data = f.read()
message = MediaMessage(
connection.address,
receiver,
media_type,
data,
args.text
)
connection.send(message)
elif args.mode == "receive":
messages = connection.receive()
for message in messages:
print(message)
elif args.mode == "echo":
def echo(con, msg):
swap = msg.receiver
msg.receiver = msg.sender
msg.sender = swap
con.send(msg)
try:
connection.loop(echo)
except KeyboardInterrupt:
pass
connection.close()
......@@ -19,33 +19,101 @@ LICENSE"""
import logging
import argparse
from bokkichat.message import TextMessage
from bokkichat.connection import TelegramSettings, TelegramConnection
from bokkichat.address import Address
from bokkichat.message import TextMessage, MediaMessage, MediaType
from bokkichat.connection.TelegramConnection import TelegramConnection
from bokkichat.connection import TelegramSettings
def parse_args() -> argparse.Namespace:
"""
Parses the command line arguments
The program requires an API key, followed by a subcommand, being either
send, receive or echo.
Each of those provide a subparser.
:return: The parsed arguments
"""
parser = argparse.ArgumentParser(prog="telegram-cli")
parser.add_argument("api_id", help="The Account's API ID")
parser.add_argument("api_hash", help="The Account's API hash")
subparser = parser.add_subparsers(required=True, dest="mode")
send_parser = subparser.add_parser("send", help="Allows sending a message")
send_parser.add_argument("receiver", help="The receiver of the message")
send_parser.add_argument("text", help="The text/caption to send")
media_options = send_parser.add_mutually_exclusive_group(required=False)
media_options.add_argument("--audio", help="Path to an audio file to send")
media_options.add_argument("--image", help="Path to an image file to send")
media_options.add_argument("--video", help="Path to a video file to send")
subparser.add_parser("receive",
help="Allows receiving any pending messages")
subparser.add_parser("echo", help="Starts an echo bot")
return parser.parse_args()
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
# logging.basicConfig(level=logging.DEBUG)
args = parse_args()
parser = argparse.ArgumentParser()
parser.add_argument("api_key")
parser.add_argument("mode", choices={"echo"})
args = parser.parse_args()
settings = TelegramSettings(args.api_id, args.api_hash)
settings = TelegramSettings(args.api_key)
connection = TelegramConnection(settings)
connection.close()
connection.send(
TextMessage(connection.address, Address("207968297"), "Hello Master")
)
if args.mode == "send":
if args.mode == "echo":
receiver = Address(args.receiver)
if args.audio:
media_type = MediaType.AUDIO
elif args.video:
media_type = MediaType.VIDEO
elif args.image:
media_type = MediaType.IMAGE
else:
media_type = "text"
if media_type == "text":
message = TextMessage(connection.address, receiver, args.text)
else:
media_file = args.audio or args.video or args.image
with open(media_file, "rb") as f:
data = f.read()
message = MediaMessage(
connection.address,
receiver,
media_type,
data,
args.text
)
connection.send(message)
elif args.mode == "receive":
messages = connection.receive()
print(messages)
for message in messages:
print(message)
elif args.mode == "echo":
def echo(con, msg):
receiver = msg.receiver
swap = msg.receiver
msg.receiver = msg.sender
msg.sender = receiver
msg.sender = swap
con.send(msg)
connection.loop(echo)
try:
connection.loop(echo)
except KeyboardInterrupt:
pass
connection.close()
......@@ -17,7 +17,7 @@ You should have received a copy of the GNU General Public License
along with bokkichat. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
from typing import Callable, List
from typing import List
from bokkichat.address.Address import Address
from bokkichat.message.Message import Message
from bokkichat.message.TextMessage import TextMessage
......@@ -53,16 +53,9 @@ class CliConnection(Connection):
"""
return [TextMessage(self.address, self.address, input())]
def loop(self, callback: Callable):
def close(self):
"""
Starts a loop that periodically checks for new messages, calling
a provided callback function in the process.
:param callback: The callback function to call for each
received message.
The callback should have the following format:
lambda connection, message: do_stuff()
Disconnects the Connection.
:return: None
"""
while True:
for message in self.receive():
callback(self, message)
pass
......@@ -17,6 +17,7 @@ You should have received a copy of the GNU General Public License
along with bokkichat. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
import time
import logging
from typing import Callable, List
from bokkichat.address.Address import Address
......@@ -39,6 +40,8 @@ class Connection:
"""
self.settings = settings
self.logger = logging.getLogger("bokkichat")
self.looping = False
self.loop_break = False
@property
def address(self) -> Address:
......@@ -64,7 +67,7 @@ class Connection:
"""
raise NotImplementedError()
def loop(self, callback: Callable):
def loop(self, callback: Callable, sleep_time: int = 1):
"""
Starts a loop that periodically checks for new messages, calling
a provided callback function in the process.
......@@ -72,6 +75,24 @@ class Connection:
received message.
The callback should have the following format:
lambda connection, message: do_stuff()
:param sleep_time: The time to sleep between loops
:return: None
"""
self.looping = True
while True:
for message in self.receive():
callback(self, message)
if self.loop_break:
self.loop_break = False
break
time.sleep(sleep_time)
self.looping = False
def close(self):
"""
Disconnects the Connection.
:return: None
"""
raise NotImplementedError()
"""LICENSE
Copyright 2018 Hermann Krumrey <hermann@krumreyh.com>
This file is part of bokkichat.
bokkichat is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
bokkichat is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with bokkichat. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
# noinspection PyPackageRequirements
import telegram
import requests
from typing import List, Dict, Any, Optional
from bokkichat.address.Address import Address
from bokkichat.message.Message import Message
from bokkichat.message.TextMessage import TextMessage
from bokkichat.message.MediaType import MediaType
from bokkichat.message.MediaMessage import MediaMessage
from bokkichat.connection.Connection import Connection
from bokkichat.connection.TelegramBotSettings import TelegramBotSettings
from bokkichat.exceptions import InvalidMessageData
class TelegramBotConnection(Connection):
"""
Class that implements a Telegram bot connection
"""
def __init__(self, settings: TelegramBotSettings):
"""
Initializes the connection, with credentials provided by a
Settings object.
:param settings: The settings for the connection
"""
super().__init__(settings)
self.bot = telegram.Bot(settings.api_key)
try:
self.update_id = self.bot.get_updates()[0].update_id
except IndexError:
self.update_id = 0
@property
def address(self) -> Address:
"""
A connection must be able to specify its own address
:return: The address of the connection
"""
return Address(str(self.bot.id))
def send(self, message: Message):
"""
Sends a message. A message may be either a TextMessage
or a MediaMessage.
:param message: The message to send
:return: None
"""
self.logger.info("Sending message to {}".format(message.receiver))
try:
if isinstance(message, TextMessage):
self.bot.send_message(
chat_id=message.receiver.address,
text=message.body
)
elif isinstance(message, MediaMessage):
media_map = {
MediaType.AUDIO: ("audio", self.bot.send_audio),
MediaType.VIDEO: ("video", self.bot.send_video),
MediaType.IMAGE: ("photo", self.bot.send_photo)
}
send_func = media_map[message.media_type][1]
# Write to file TODO: Check if this can be done with bytes
with open("/tmp/bokkichat-telegram-temp", "wb") as f:
f.write(message.data)
tempfile = open("/tmp/bokkichat-telegram-temp", "rb")
params = {
"chat_id": message.receiver.address,
"caption": message.caption,
media_map[message.media_type][0]: tempfile
}
send_func(**params)
tempfile.close()
except (telegram.error.Unauthorized, telegram.error.BadRequest):
self.logger.info(
"Failed to send message to {}".format(message.receiver)
)
def receive(self) -> List[Message]:
"""
Receives all pending messages.
:return: A list of pending Message objects
"""
messages = []
try:
for update in self.bot.get_updates(
offset=self.update_id, timeout=10
):
self.update_id = update.update_id + 1
telegram_message = update.message.to_dict()
try:
generated = self._parse_message(telegram_message)
self.logger.info(
"Received message from {}".format(generated.sender)
)
self.logger.debug(str(generated))
messages.append(generated)
except InvalidMessageData as e:
self.logger.error(str(e))
except telegram.error.Unauthorized:
# The self.bot.get_update method may cause an
# Unauthorized Error if the bot is blocked by the user
self.update_id += 1
except telegram.error.TimedOut:
pass
return messages
def _parse_message(self, message_data: Dict[str, Any]) -> \
Optional[Message]:
"""
Parses the message data of a Telegram message and generates a
corresponding Message object.
:param message_data: The telegram message data
:return: The generated Message object.
:raises: InvalidMessageData if the parsing failed
"""
address = Address(str(message_data["chat"]["id"]))
if "text" in message_data:
body = message_data["text"]
self.logger.debug("Message Body: {}".format(body))
return TextMessage(address, self.address, body)
else:
for media_key, media_type in {
"photo": MediaType.IMAGE,
"audio": MediaType.AUDIO,
"video": MediaType.VIDEO,
"voice": MediaType.AUDIO
}.items():
if media_key in message_data:
self.logger.debug("Media Type: {}".format(media_key))
media_info = message_data[media_key]
if len(media_info) == 0:
continue
if isinstance(media_info, list):
largest = media_info[len(media_info) - 1]
file_id = largest["file_id"]
elif isinstance(media_info, dict):
file_id = media_info["file_id"]
else:
continue
file_info = self.bot.get_file(file_id)
resp = requests.get(file_info["file_path"])
data = resp.content
return MediaMessage(
address,
self.address,
media_type,
data,
message_data.get("caption", "")
)
raise InvalidMessageData(message_data)
def close(self):
"""
Disconnects the Connection.
:return: None
"""
pass
"""LICENSE
Copyright 2018 Hermann Krumrey <hermann@krumreyh.com>
This file is part of bokkichat.
bokkichat is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
bokkichat is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with bokkichat. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
import json
from bokkichat.connection.Settings import Settings
class TelegramBotSettings(Settings):
"""
Class that defines a Settings object for a Telegram bot connection
"""
def __init__(self, api_key: str):
"""
Initializes the Telegram Connection.
:param api_key: The API key used for authentication
"""
self.api_key = api_key
# noinspection PyMethodMayBeStatic
def serialize(self) -> str:
"""
Serializes the settings to a string
:return: The serialized Settings object
"""
return json.dumps({
"api_key": self.api_key
})
@classmethod
def deserialize(cls, serialized: str):
"""
Deserializes a string and generates a Settings object from it
:param serialized: The serialized string
:return: The deserialized Settings object
"""
obj = json.loads(serialized)
return cls(obj["api_key"])
......@@ -17,15 +17,12 @@ You should have received a copy of the GNU General Public License
along with bokkichat. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
import time
# noinspection PyPackageRequirements
import telegram
import requests
from typing import Callable, List
from typing import List
from telethon import TelegramClient
from bokkichat.address.Address import Address
from bokkichat.message.Message import Message
from bokkichat.message.TextMessage import TextMessage
from bokkichat.message.MediaType import MediaType
from bokkichat.message.MediaMessage import MediaMessage
from bokkichat.connection.Connection import Connection
from bokkichat.connection.TelegramSettings import TelegramSettings
......@@ -33,7 +30,7 @@ from bokkichat.connection.TelegramSettings import TelegramSettings
class TelegramConnection(Connection):
"""
Class that implements a Telegram connection
Class that implements a Telegram connection using telethon
"""
def __init__(self, settings: TelegramSettings):
......@@ -43,12 +40,10 @@ class TelegramConnection(Connection):
:param settings: The settings for the connection
"""
super().__init__(settings)
self.bot = telegram.Bot(settings.api_key)
try:
self.update_id = self.bot.get_updates()[0].update_id
except IndexError:
self.update_id = 0
self.client = TelegramClient(
settings.session_name, settings.api_id, settings.api_hash
)
self.client.start()
@property
def address(self) -> Address:
......@@ -56,7 +51,8 @@ class TelegramConnection(Connection):
A connection must be able to specify its own address
:return: The address of the connection
"""
return Address(str(self.bot.id))
# noinspection PyUnresolvedReferences
return Address(self.client.get_me().username)
def send(self, message: Message):
"""
......@@ -65,43 +61,28 @@ class TelegramConnection(Connection):
:param message: The message to send
:return: None
"""
self.logger.info("Sending message to {}".format(message.receiver))
try:
if isinstance(message, TextMessage):
self.bot.send_message(
chat_id=message.receiver.address,
text=message.body
self.client.send_message(
message.receiver.address,
message.body
)
elif isinstance(message, MediaMessage):
with open("/tmp/bokkichat-telegram-temp", "wb") as f:
# Write to file TODO: Check if this can be done with bytes
tempfile = "/tmp/bokkichat-telegram-temp"
with open(tempfile, "wb") as f:
f.write(message.data)
tempfile = open("/tmp/bokkichat-telegram-temp", "rb")
if message.media_type == MediaType.IMAGE:
self.bot.send_photo(
chat_id=message.receiver.address,
photo=tempfile,
caption=message.caption
)
elif message.media_type == MediaType.AUDIO:
self.bot.send_audio(
chat_id=message.receiver.address,
audio=tempfile,
caption=message.caption
)
else:
self.bot.send_video(
chat_id=message.receiver.address,
video=tempfile,
caption=message.caption
)
tempfile.close()
except (telegram.error.Unauthorized, telegram.error.BadRequest):
self.client.send_file(
message.receiver.address,
tempfile,
caption=message.caption
)
except ValueError:
self.logger.info(
"Failed to send message to {}".format(message.receiver)
)
......@@ -113,84 +94,27 @@ class TelegramConnection(Connection):
"""
messages = []
try:
for update in self.bot.get_updates(
offset=self.update_id, timeout=10
):
self.update_id = update.update_id + 1
telegram_message = update.message.to_dict()
address = Address(str(telegram_message['chat']['id']))
self.logger.info("Received message from {}".format(address))
if "text" in telegram_message:
body = telegram_message['text']
self.logger.debug("Message Body: {}".format(body))
messages.append(TextMessage(address, self.address, body))
for media_key, media_type in {
"photo": MediaType.IMAGE,
"audio": MediaType.AUDIO,
"video": MediaType.VIDEO,
"voice": MediaType.AUDIO
}.items():
if media_key in telegram_message:
self.logger.debug("Media Type: {}".format(media_key))
media_info = telegram_message[media_key]
if isinstance(media_info, list):
if len(media_info) == 0:
continue
largest = media_info[len(media_info) - 1]
file_id = largest["file_id"]
elif isinstance(media_info, dict):
file_id = media_info["file_id"]
else:
continue
file_info = self.bot.get_file(file_id)
resp = requests.get(file_info["file_path"])
data = resp.content
# noinspection PyTypeChecker
for dialog in self.client.get_dialogs():
# noinspection PyTypeChecker
for message in self.client.iter_messages(dialog.entity):
messages.append(MediaMessage(
address,
self.address,
media_type,
data,
telegram_message.get("caption", "")
))
# TODO figure out how to filter out read messages
if not message.out:
break
address = Address(dialog.name)
body = message.message
except telegram.error.Unauthorized:
# The self.bot.get_update method may cause an
# Unauthorized Error if the bot is blocked by the user
self.update_id += 1
generated = TextMessage(self.address, address, body)
messages.append(generated)
except telegram.error.TimedOut:
pass
# TODO ACK
return messages
def loop(self, callback: Callable):
def close(self):
"""
Starts a loop that periodically checks for new messages, calling
a provided callback function in the process.
:param callback: The callback function to call for each
received message.
The callback should have the following format:
lambda connection, message: do_stuff()
Disconnects the Connection.
:return: None
"""
while True:
for message in self.receive():
callback(self, message)
time.sleep(1)
self.client.disconnect()
......@@ -26,12 +26,21 @@ class TelegramSettings(Settings):
Class that defines a Settings object for a Telegram connection
"""
def __init__(self, api_key):
def __init__(
self,
api_id: str,
api_hash: str,
session_name: str = "bokkichat"
):
"""
Initializes the Telegram Connection.
:param api_key: The API key used for authentication
:param api_id: The API ID
:param api_hash: The API hash
:param session_name: The name of the telethon session
"""
self.api_key = api_key
self.api_id = api_id
self.api_hash = api_hash
self.session_name = session_name
# noinspection PyMethodMayBeStatic
def serialize(self) -> str:
......@@ -40,7 +49,9 @@ class TelegramSettings(Settings):
:return: The serialized Settings object
"""
return json.dumps({
"api_key": self.api_key
"api_id": self.api_id,
"api_hash": self.api_hash,
"session_name": self.session_name
})
@classmethod
......@@ -51,4 +62,4 @@ class TelegramSettings(Settings):
:return: The deserialized Settings object
"""
obj = json.loads(serialized)
return cls(obj["api_key"])
return cls(obj["api_id"], obj["api_hash"], obj["session_name"])
......@@ -23,3 +23,5 @@ from bokkichat.connection.CliConnection import CliConnection
from bokkichat.connection.CliSettings import CliSettings
from bokkichat.connection.TelegramConnection import TelegramConnection
from bokkichat.connection.TelegramSettings import TelegramSettings
from bokkichat.connection.TelegramBotConnection import TelegramBotConnection
from bokkichat.connection.TelegramBotSettings import TelegramBotSettings
"""LICENSE
Copyright 2018 Hermann Krumrey <hermann@krumreyh.com>
This file is part of bokkichat.
bokkichat is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
bokkichat is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with bokkichat. If not, see <http://www.gnu.org/licenses/>.
LICENSE"""
from typing import Dict, Any
class InvalidMessageData(Exception):
"""
Exception that indicates message data that's invalid or otherwise could
not be correctly parsed.
"""
def __init__(self, message_data: Dict[str, Any]):
"""
Initializes the Exception
:param message_data: The message data that caused the
exception to be raised
"""
self.message_data = message_data
def __str__(self) -> str:
"""
:return: A string representation of the exception
"""
return "InvalidMessageData: {}".format(self.message_data)
......@@ -40,6 +40,7 @@ if __name__ == "__main__":
install_requires=[
"typing",
"python-telegram-bot",
"telethon",
"requests"
],
test_suite='nose.collector',
......