Source code for ptb.voltage_tcp

import asyncio

from .voltage_protocol import VoltageProtocol


class VoltageTCP(VoltageProtocol):
    """Newline-delimited transport for :class:`VoltageProtocol` over TCP.

    Wraps a reader/writer stream pair and provides the low-level
    ``_writeline``, ``_readline`` and ``_read`` primitives.  Instances are
    normally created with :meth:`connect`; the plain ``with`` statement is
    supported and closes the writer on exit.
    """

    # Terminator appended to every outgoing command.
    eol_write = b"\n"
    # Terminator expected at the end of every incoming line.
    eol_read = b"\n"

    def __init__(self, reader, writer):
        """Store the stream pair used for all subsequent I/O.

        Args:
            reader: Object providing awaitable ``readline()`` and ``read(n)``
                (presumably an ``asyncio.StreamReader``, as returned by
                ``asyncio.open_connection`` in :meth:`connect`).
            writer: Object providing ``write(data)`` and ``close()``
                (presumably an ``asyncio.StreamWriter``).
        """
        self._reader = reader
        self._writer = writer

    @classmethod
    async def connect(cls, host, port=80, **kwargs):
        """Open a TCP connection and return an instance bound to it.

        Args:
            host: Host name or address passed to ``asyncio.open_connection``.
            port: TCP port to connect to (defaults to 80).
            **kwargs: Forwarded unchanged to ``asyncio.open_connection``.

        Returns:
            A new instance of ``cls`` wrapping the opened streams.
        """
        reader, writer = await asyncio.open_connection(host, port, **kwargs)
        return cls(reader, writer)

    def __enter__(self):
        """Return ``self`` so the instance can be used in a ``with`` block."""
        return self

    def __exit__(self, *exc):
        """Close the connection on leaving the ``with`` block.

        Returns ``None``, so an exception raised inside the block propagates.
        """
        self.close()

    def close(self):
        """Close the underlying writer."""
        self._writer.close()

    def _writeline(self, cmd):
        """Encode ``cmd`` (a ``str``), append ``eol_write`` and queue it for sending."""
        self._writer.write(cmd.encode() + self.eol_write)

    async def _readline(self):
        """Read one line and return it decoded, without the terminator.

        Returns:
            The received line as ``str`` with the trailing ``eol_read`` removed.

        Raises:
            asyncio.IncompleteReadError: If the data returned by the reader
                does not end with ``eol_read`` (e.g. the peer closed the
                connection mid-line).  The partial data is available on the
                exception's ``partial`` attribute.
        """
        line = await self._reader.readline()
        # Validate explicitly rather than with ``assert``: assertions are
        # stripped under ``python -O``, which would let a truncated line
        # through and silently chop its last byte(s) in the slice below.
        if not line.endswith(self.eol_read):
            raise asyncio.IncompleteReadError(line, None)
        return line[:-len(self.eol_read)].decode()

    async def _read(self, n):
        """Read from the stream with a size argument of ``n`` and return the raw result.

        NOTE(review): ``asyncio.StreamReader.read(n)`` may return fewer than
        ``n`` bytes; confirm whether callers need ``readexactly`` semantics.
        """
        return await self._reader.read(n)