Source code for pyjector.pyjector

"""Control your projector via serial port.

.. moduleauthor:: John Brodie <john@brodie.me>

"""
from time import sleep
import json
import os

import serial

# Directory that holds the JSON device configuration files.
#
# Resolved relative to this module rather than the process's current working
# directory, so configs are found no matter where the program is started from.
# NOTE(review): assumes this module lives in the ``pyjector`` package directory
# next to ``projector_configs/`` -- confirm against the package layout.
# The trailing separator is intentional: paths are built as ``PATH + name``.
PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), 'projector_configs', '')


class Pyjector(object):
    """Control a projector through a serial port.

    On construction the device configuration identified by ``device_id`` is
    loaded from the JSON files in ``PATH``, a :class:`serial.Serial` object
    is created from its ``serial`` section, and one callable attribute is
    attached to the instance for every entry of its ``command_list``
    section. Each of those callables takes an action and returns the
    device's response.
    """

    available_configs = {}

    # Keyword arguments accepted for :class:`serial.Serial`.
    possible_pyserial_settings = [
        'port',
        'baudrate',
        'bytesize',
        'parity',
        'stopbits',
        'timeout',
        'xonxoff',
        'rtscts',
        'dsrdtr',
        'writeTimeout',
        'InterCharTimeout',  # kept so existing configs still validate
        # Spellings pyserial actually accepts (legacy 2.x name and the
        # 3.x names); added because the entry above is not a pyserial
        # keyword.
        'interCharTimeout',
        'write_timeout',
        'inter_byte_timeout',
    ]

    # Maps human-friendly config values to the pyserial constants.
    pyserial_config_converter = {
        'bytesize': {
            5: serial.FIVEBITS,
            6: serial.SIXBITS,
            7: serial.SEVENBITS,
            8: serial.EIGHTBITS,
        },
        'parity': {
            'none': serial.PARITY_NONE,
            'even': serial.PARITY_EVEN,
            'odd': serial.PARITY_ODD,
            'mark': serial.PARITY_MARK,
            'space': serial.PARITY_SPACE,
        },
        'stopbits': {
            1: serial.STOPBITS_ONE,
            1.5: serial.STOPBITS_ONE_POINT_FIVE,
            2: serial.STOPBITS_TWO,
        },
    }

    # Encoding used to turn text into bytes (and back) at the serial port.
    # latin-1 maps every code point below 256 to the same byte value, so it
    # never fails on single-byte protocol data.
    _wire_encoding = 'latin-1'

    def __init__(self, port=None, device_id='benq', **kwargs):
        """Initialize a new Pyjector object.

        :param port: The device name or port number your device is connected
            to. If left as ``None``, you must call :func:`open` with the port
            before issuing commands.

        :param device_id: The string which identifies the command set to use
            with your device.

            .. note::

                Currently, only the default value, 'benq', is supported.
                Please fill in a config file for your projector and make a
                pull request!

        :param **kwargs: Any extra keyword args will be passed to the
            internal :mod:`pyserial` :class:`serial.Serial` object, and will
            override any device settings specified in the command set.

        """
        self.port = port
        self.device_id = device_id
        self.get_config(device_id, kwargs)
        self.serial = self._initialize_pyserial(port)
        self._create_commands()

    def get_config(self, device_id, overrides):
        """Get configuration for :mod:`pyserial` and the device.

        Loads every available config, selects the one for ``device_id``,
        applies ``overrides``, validates the result and derives
        ``self.pyserial_config`` from it.

        :param device_id: The string which identifies the command set to use
            with your device.

        :param overrides: A dict of configuration values that replace the
            defaults from the device configuration.

        :raises: KeyError

        """
        self.available_configs = self._populate_configs()
        self.config = self.get_device_config_from_id(device_id)
        self._apply_overrides(overrides)
        self._validate_config()
        self.pyserial_config = self.get_pyserial_config()

    def _validate_config(self):
        """Do basic sanity-checking on the loaded `config`.

        :raises: KeyError -- if the ``serial`` section is missing, or the
            ``command_list`` section is missing or empty.

        """
        if 'serial' not in self.config:
            raise KeyError(
                'Configuration file for {0} does not contain needed serial '
                'config values. Add a `serial` section to the config.'.format(
                    self.device_id)
            )
        if not self.config.get('command_list'):
            raise KeyError(
                'Configuration file for {0} does not define any commands. '
                'Add a `command_list` section to the config.'.format(
                    self.device_id)
            )

    def _populate_configs(self):
        """Load all json config files for devices.

        Every ``*.json`` file directly inside ``PATH`` is parsed; the file
        name without its extension becomes the config's name.

        :returns: dict -- All available configs.

        """
        configs = {}
        for filename in os.listdir(PATH):
            if not filename.endswith('.json'):
                continue
            name = os.path.splitext(filename)[0]
            # ``with`` guarantees the handle is closed even if parsing fails.
            with open(os.path.join(PATH, filename)) as handle:
                configs[name] = json.load(handle)
        return configs

    def _apply_overrides(self, overrides):
        """Override specified values of the default configuration.

        Any configuration values for the internal :mod:`pyserial`
        :class:`serial.Serial` specified will override the defaults from the
        device configuration. Any config values not specified will be left
        at the device default.

        Keys listed in ``possible_pyserial_settings`` are merged into the
        ``serial`` section, which is where :func:`get_pyserial_config` reads
        them from; every other key replaces the top-level config value of the
        same name. The work is done on copies, so the entry stored in
        ``available_configs`` is left untouched.

        :param overrides: A dict of configuration values.

        """
        config = dict(self.config)
        serial_overrides = {}
        for key, value in overrides.items():
            if key in self.possible_pyserial_settings:
                serial_overrides[key] = value
            else:
                config[key] = value
        if serial_overrides:
            serial_section = dict(config.get('serial') or {})
            serial_section.update(serial_overrides)
            config['serial'] = serial_section
        self.config = config

    def get_device_config_from_id(self, device_id):
        """Get device configuration.

        :param device_id: The string which identifies the command set to use.

        :returns: dict -- The device configuration, including default
            :mod:`pyserial` settings, as well as the command set.

        :raises: KeyError -- if no config with that name was loaded.

        """
        try:
            config = self.available_configs[device_id]
        except KeyError:
            raise KeyError(
                'Could not find device config with name {0}. '
                'Check that the file exists in '
                ' `pyjector/projector_configs/`'.format(device_id)
            )
        return config

    def get_pyserial_config(self):
        """Get the :mod:`pyserial` config values from the device config.

        This also checks that config values are sane, and casts them to the
        appropriate type, as needed. The ``serial`` section of the config is
        not modified; a new dict is returned, so this method can be called
        more than once.

        :func:`get_device_config_from_id` must be called before this method.

        :returns: dict -- The config values for :class:`serial.Serial`.

        :raises: KeyError

        """
        pyserial_config = {}
        for key, value in self.config['serial'].items():
            if key not in self.possible_pyserial_settings:
                raise KeyError(
                    'Configuration file for {0} specifies a serial '
                    'setting "{1}" not recognized by pyserial. Check '
                    'http://pyserial.sourceforge.net/pyserial_api.html '
                    'for valid settings'.format(
                        self.device_id, key)
                )
            converter = self.pyserial_config_converter.get(key)
            if converter is not None:
                try:
                    value = converter[value]
                except KeyError:
                    raise KeyError(
                        'Configuration file for {0} specifies a serial '
                        'setting for "{1}" for key "{2}" not recognized '
                        'by pyserial. Check '
                        'http://pyserial.sourceforge.net/pyserial_api.html '
                        'for valid settings'.format(
                            self.device_id, value, key)
                    )
            pyserial_config[key] = value
        return pyserial_config

    def _initialize_pyserial(self, port):
        """Initialize the internal :class:`serial.Serial` object.

        Initializes the :mod:`pyserial` :class:`serial.Serial` object with
        configuration from `self.pyserial_config`.

        :param port: The device name or port number your device is connected
            to. If left as ``None``, a ``port`` value from the configuration
            is used when there is one; otherwise you must call :func:`open`
            with the port before issuing commands.

        :returns: :class:`serial.Serial` -- The internal object to use to
            communicate with the projector.

        """
        settings = dict(self.pyserial_config)
        # Pass ``port`` exactly once: an explicit argument wins over a value
        # from the configuration.
        if port is not None or 'port' not in settings:
            settings['port'] = port
        return serial.Serial(**settings)

    def _command_handler(self, command, action):
        """Send the `command` and `action` to the device.

        :param command: The command to send, for example, "power".

        :param action: The action to send, for example, "on".

        :returns: str -- The response from the device.

        """
        command_string = self._create_command_string(command, action)
        # The port is written to with bytes; text is encoded first.
        if not isinstance(command_string, bytes):
            command_string = command_string.encode(self._wire_encoding)
        self.serial.write(command_string)
        # Give the device time to answer before reading the buffer.
        sleep(self.config.get('wait_time', 1))
        return self._get_response()

    def _create_commands(self):
        """Add commands to class.

        For every key of :attr:`command_spec` an attribute of that name is
        set on the instance; calling it with an action forwards to
        :func:`_command_handler`.

        """
        # TODO: Clean this up.
        def _create_handler(command):
            # A factory is needed so each handler keeps its own ``command``.
            def handler(action):
                return self._command_handler(command, action)
            return handler

        for command in self.command_spec:
            setattr(self, command, _create_handler(command))

    def _get_response(self):
        """Get any message waiting in the serial port buffer.

        :returns: str -- Everything that was waiting, or an empty string.

        """
        chunks = []
        waiting = self.serial.inWaiting()
        while waiting > 0:
            # Read everything that is currently buffered in one call.
            chunks.append(self.serial.read(waiting))
            waiting = self.serial.inWaiting()
        data = b''.join(chunks)
        if isinstance(data, str):
            # ``bytes`` is ``str`` on Python 2; nothing to decode.
            return data
        return data.decode(self._wire_encoding)

    def _lookup_serial_tokens(self, command, action):
        """Translate a command/action alias pair via :attr:`command_spec`.

        Anything that has no entry in the spec is returned unchanged.

        :returns: tuple -- ``(serial_command, serial_action)``.

        """
        spec = self.command_spec
        entry = spec.get(command) if isinstance(spec, dict) else None
        if not isinstance(entry, dict):
            return command, action
        actions = entry.get('actions')
        if isinstance(actions, dict):
            action = actions.get(action, action)
        return entry.get('command', command), action

    def _create_command_string(self, command, action):
        """Create a command string ready to send to the device.

        The `command` and `action` aliases are translated to their serial
        equivalents from :attr:`command_spec` and then wrapped with the
        ``left_surround``, ``seperator`` and ``right_surround`` config
        values (each defaulting to an empty string).

        """
        serial_command, serial_action = self._lookup_serial_tokens(
            command, action)
        # 'seperator' (sic) is the key read from the config; do not correct
        # the spelling here.
        command_string = (
            '{left_surround}{command}{seperator}'
            '{action}{right_surround}'.format(
                left_surround=self.config.get('left_surround', ''),
                command=serial_command,
                seperator=self.config.get('seperator', ''),
                action=serial_action,
                right_surround=self.config.get('right_surround', ''),
            )
        )
        return command_string

    @property
    def command_spec(self):
        """Return all command specifications.

        :returns: dict -- All command specs, with the pattern:
            "<alias>": {
                "command": "<serial_command>",
                "actions": {
                    "<alias>": "<serial_command>",
                    ...,
                },
            },
            ...

        """
        return self.config['command_list']

This Page