Source code for ptb.temp_protocol
import logging
import asyncio
logger = logging.getLogger(__name__)
[docs]class TempProtocol:
"""Protocol for the PTB multi-channel temperature sensor"""
def do(self, cmd):
assert len(cmd) < 64
logger.debug("do %s", cmd)
self._writeline(cmd)
async def ask(self, cmd):
self.do(cmd)
ret = await self._readline()
logger.debug("ret %s", ret)
return ret
async def read(self, n):
ret = await self._read(n)
return ret.decode()
[docs] async def version(self):
"""Return the hardware/firmware version.
Returns:
str: Version string.
"""
return (await self.ask("v")).strip()
[docs] async def get_all(self):
"""Measure and return the temperatures on all channels.
Returns:
list(float): Temperatures on all channels
"""
ret = await self.ask("a")
t = []
for i, ti in enumerate(ret.split()):
ch, ti = ti.split(":")
assert int(ch) == i
t.append(float(ti))
return t
[docs] async def get(self, channel):
"""Measure the temperature on one channel.
Args:
channel (int): Channel index to measure on
Return:
float: Temperature
"""
ret = await self.ask("{:d}".format(channel))
ch, temp = ret.split(":")
assert int(ch) == channel
return float(temp)
def _writeline(self, cmd):
raise NotImplemented
async def _readline(self):
raise NotImplemented
async def ping(self):
try:
await self.version()
except asyncio.CancelledError:
raise
except:
logger.warning("ping failed", exc_info=True)
return False
return True