Source code for mciwb.monitor
"""
Thread functions for running any background tasks. Primarily used for
monitoring the state of objects in Minecraft.
"""
from time import sleep
from typing import Any, Callable, List, Tuple, Union
from rcon.exceptions import SessionTimeout
from mciwb.logging import log
from mciwb.player import PlayerNotInWorld
from mciwb.threads import get_client, get_thread_name, new_thread
# supports any function arguments
CallbackFunction = Callable
[docs]
class Monitor:
""" "
A class to provide threads for monitoring. Each thread maintains a
list of functions to call repeatedly.
Each thread has its own Client object for parallel
execution of Minecraft server functions.
:param name: name of the thread
:param func: a function to call in the Monitor thread
:param params: parameters to pass to the above function. Note that
*func* and *params* can be None, () in which case you must use
`add_poller_func` to add functions to be called.
:param poll_rate: rate at which to poll the functions
:param once: if True, stop polling after first poll - use for a single
background operation
:param start: if True, start the thread immediately
"""
monitor_num = 0
monitors: List["Monitor"] = []
def __init__(
self,
func: Union[None, CallbackFunction] = None,
params: Tuple[Any, ...] = (),
once=False,
name=None,
poll_rate=0.2,
start=True,
) -> None:
if name is None:
name = f"Monitor{Monitor.monitor_num}"
Monitor.monitor_num += 1
# pollers is a list of functions, param tuples. It may be initialized
# with a single function passed in func, params
self.pollers: List[Tuple[CallbackFunction, Tuple[Any, ...]]] = (
[] if func is None else [(func, params)]
)
self.name = name
self.once = once
self.poll_rate = poll_rate
self.poll_thread = None
self._polling = False
if start:
self.start_poller()
[docs]
def start_poller(self):
"""
Begin polling the functions in the pollers list
"""
if self.poll_thread is None:
log.debug(f"starting polling thread {self.name}")
self.poll_thread = new_thread(get_client(), self._poller, self.name)
self.monitors.append(self)
self._polling = True
def _poller(self):
"""
the polling function will run until the monitor is stopped
"""
while self._polling:
try:
for func, params in self.pollers:
func(*params)
sleep(self.poll_rate)
except BrokenPipeError:
log.error(
f"Connection to Minecraft Server lost, "
f"polling terminated in {get_thread_name()}"
)
self._polling = False
except SessionTimeout:
log.warning(f"Connection timeout in {get_thread_name()}")
except PlayerNotInWorld as e:
log.warning(e)
self._polling = False
except BaseException:
# report any other exception and continue polling
log.error(f"Error in {get_thread_name()}", exc_info=True)
if self.once:
self._polling = False
if self in self.monitors:
self.monitors.remove(self)
self.poll_thread = None
self.pollers = []
if not self.once:
log.info(f"Monitor {self.name} stopped")
[docs]
def add_poller_func(self, func: CallbackFunction, params: Tuple[Any, ...] = ()):
"""
Add a function to the pollers list
:param func: function to add
:param params: parameters to pass to the function
"""
self.pollers.append((func, params))
# TODO: consider using a dict or indexing pollers in some fashion
# currently this does not support 2 calls to same function
[docs]
def remove_poller_func(self, func: CallbackFunction):
"""
Remove a function from the pollers list
:param func: function to remove
"""
for i, t in enumerate(self.pollers):
f, params = t
if f == func:
self.pollers.remove(t)
break
else:
log.error("removing unknown poller function")
[docs]
@classmethod
def stop_all(cls):
"""
Stop all instances of Monitor and tidy up. Call this before exiting
the program otherwise Python will wait on the background threads
indefinitely.
"""
if cls.monitors is not None:
for monitor in cls.monitors:
monitor.stop()
cls.monitors.clear()
log.info("Stopped all monitoring threads")
[docs]
def stop(self):
"""
Stop this instance of Monitor
"""
self._polling = False
[docs]
@classmethod
def stop_named(cls, name: str):
"""
Stop a named instance of Monitor
"""
for monitor in cls.monitors:
if monitor.name == name:
monitor.stop()
cls.monitors.remove(monitor)
break
def __del__(self):
self.stop()
def __repr__(self):
# TODO work out how to show the class of the bound method
func_list = [f.__name__ for f, p in self.pollers]
return f"{self.name} polling {func_list} at {self.poll_rate})"