From 29a42a398b1e240fe21ad3b153bd0f38090834e0 Mon Sep 17 00:00:00 2001 From: pieter Date: Fri, 2 Oct 2015 00:00:54 +0200 Subject: [PATCH] Restored the non-threaded variant --- telebot/__init__.py | 65 ++++++++++++++++++++++++++++++++++++++------- telebot/util.py | 2 +- 2 files changed, 57 insertions(+), 10 deletions(-) diff --git a/telebot/__init__.py b/telebot/__init__.py index 84c1b2a..86e2977 100644 --- a/telebot/__init__.py +++ b/telebot/__init__.py @@ -44,7 +44,7 @@ class TeleBot: getUpdates """ - def __init__(self, token): + def __init__(self, token, threaded=True): """ :param token: bot API token :return: Telebot object. @@ -64,13 +64,16 @@ class TeleBot: self.message_subscribers_next_step = {} self.message_handlers = [] - self.worker_pool = util.ThreadPool() + + self.threaded = threaded + if self.threaded: + self.worker_pool = util.ThreadPool() def set_webhook(self, url=None, certificate=None): return apihelper.set_webhook(self.token, url, certificate) def remove_webhook(self): - return self.set_webhook() # No params resets webhook + return self.set_webhook() # No params resets webhook def get_updates(self, offset=None, limit=None, timeout=20): """ @@ -110,9 +113,9 @@ class TeleBot: def __notify_update(self, new_messages): for listener in self.update_listener: - self.worker_pool.put(listener, new_messages) + self.__exec_task(listener, new_messages) - def polling(self, none_stop=False, interval=0, timeout=20): + def polling(self, none_stop=False, interval=0, timeout=3): """ This function creates a new Thread that calls an internal __retrieve_updates function. This allows the bot to retrieve Updates automagically and notify listeners and message handlers accordingly. @@ -124,8 +127,14 @@ class TeleBot: :param timeout: Timeout in seconds for long polling. :return: """ - logger.info('Started polling.') + if self.threaded: + self.__threaded_polling(none_stop, interval, timeout) + else: + self.__non_threaded_polling(none_stop, interval, timeout) + def __threaded_polling(self, none_stop=False, interval=0, timeout=3): + logger.info('Started polling.') + self.__stop_polling.clear() error_interval = .25 polling_thread = util.WorkerThread(name="PollingThread") @@ -139,7 +148,9 @@ class TeleBot: or_event.clear() try: polling_thread.put(self.__retrieve_updates, timeout) - or_event.wait() + + while not or_event.is_set(): + time.sleep(.05) # wait for polling thread finish, polling thread error or thread pool error polling_thread.raise_exceptions() self.worker_pool.raise_exceptions() @@ -156,9 +167,45 @@ class TeleBot: logger.info("Waiting for {0} seconds until retry".format(error_interval)) time.sleep(error_interval) error_interval *= 2 + except KeyboardInterrupt: + logger.info("KeyboardInterrupt received.") + self.__stop_polling.set() + polling_thread.stop() + break logger.info('Stopped polling.') + def __non_threaded_polling(self, none_stop=False, interval=0, timeout=3): + logger.info('Started polling.') + self.__stop_polling.clear() + error_interval = .25 + + while not self.__stop_polling.wait(interval): + try: + self.__retrieve_updates(timeout) + error_interval = .25 + except apihelper.ApiException as e: + logger.error(e) + if not none_stop: + self.__stop_polling.set() + logger.info("Exception occurred. Stopping.") + else: + logger.info("Waiting for {0} seconds until retry".format(error_interval)) + time.sleep(error_interval) + error_interval *= 2 + except KeyboardInterrupt: + logger.info("KeyboardInterrupt received.") + self.__stop_polling.set() + break + + logger.info('Stopped polling.') + + def __exec_task(self, task, *args, **kwargs): + if self.threaded: + self.worker_pool.put(task, *args, **kwargs) + else: + task(*args, **kwargs) + def stop_polling(self): self.__stop_polling.set() @@ -381,7 +428,7 @@ class TeleBot: if chat_id in self.message_subscribers_next_step: handlers = self.message_subscribers_next_step[chat_id] for handler in handlers: - self.worker_pool.put(handler, message) + self.__exec_task(handler, message) self.message_subscribers_next_step.pop(chat_id, None) def message_handler(self, commands=None, regexp=None, func=None, content_types=['text']): @@ -452,7 +499,7 @@ class TeleBot: for message in new_messages: for message_handler in self.message_handlers: if self._test_message_handler(message_handler, message): - self.worker_pool.put(message_handler['function'], message) + self.__exec_task(message_handler['function'], message) break diff --git a/telebot/util.py b/telebot/util.py index 5b72bf1..2fb5869 100644 --- a/telebot/util.py +++ b/telebot/util.py @@ -43,7 +43,7 @@ class WorkerThread(threading.Thread): self.exception_event.clear() try: - task, args, kwargs = self.queue.get(block=True, timeout=.01) + task, args, kwargs = self.queue.get(block=True, timeout=.5) logger.debug("Received task") self.received_task_event.set()