Source code for ptb.shutter_protocol

import logging
import asyncio

logger = logging.getLogger(__name__)


[docs]class ShutterProtocol: """Protocol for the PTB multi-channel shutter controller""" def do(self, cmd): assert len(cmd) <= 2 logger.debug("do %s", cmd) self._writeline(cmd) async def ask(self, cmd, n=None): self.do(cmd) if n is None: ret = await self._readline() else: ret = await self._read(n) logger.debug("ret %s", ret) return ret
[docs] async def version(self): """Return the hardware/firmware version. Returns: str: Version string. """ return (await self.ask(b"v")).strip()
async def ping(self): try: await self.version() except asyncio.CancelledError: raise except: logger.warning("ping failed", exc_info=True) return False return True
[docs] async def status(self): """Return the error flags on all channels. Returns: tuple(bool): Error flag on all channels. `True` means that there is an error. """ ret = (await self.ask(b"e")).strip() return tuple(c != b"0"[0] for c in ret)
[docs] async def clear(self): """Clear all error flags. Returns: tuple(bool): Error flag on all channels after clearing. `True` means that there is an error. """ ret = (await self.ask(b"r")).strip() return tuple(c != b"0"[0] for c in ret)
[docs] async def passthrough(self, shutter, cmd): """Execute a low level command. Args: shutter (int): Shutter index (1-3) cmd (bytes(1)): Single character command (e.g. W) Return: bytes: Response """ return await self.ask(bytes(["{:d}".format(shutter).encode()[0], cmd]))