blob: 4950cf89423ba5ae4602ca4a6a51cd0ced3d79cc [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import logging
import os
import re
import telnetlib
import time
logger = logging.getLogger(__name__)
class PduController(object):
def open(self, **params):
"""Open PDU controller connection"""
raise NotImplementedError
def reboot(self, **params):
"""Reboot an outlet or a board passed as params"""
raise NotImplementedError
def close(self):
"""Close PDU controller connection"""
raise NotImplementedError
class DummyPduController(PduController):
"""Dummy implementation which only says that PDU controller is not connected"""
def open(self, **params):
pass
def reboot(self, **params):
logger.info('No PDU controller connected.')
def close(self):
pass
class ApcPduController(PduController):
def __init__(self):
self.tn = None
def __del__(self):
self.close()
def _init(self):
"""Initialize the telnet connection
"""
self.tn = telnetlib.Telnet(self.ip, self.port)
self.tn.read_until('User Name')
self.tn.write('apc\r\n')
self.tn.read_until('Password')
self.tn.write('apc\r\n')
self.until_done()
def open(self, **params):
"""Open telnet connection
Args:
params (dict), must contain two parameters "ip" - ip address or hostname and "port" - port number
Example:
params = {'port': 23, 'ip': 'localhost'}
"""
logger.info('opening telnet')
self.port = params['port']
self.ip = params['ip']
self.tn = None
self._init()
def close(self):
"""Close telnet connection"""
logger.info('closing telnet')
if self.tn:
self.tn.close()
def until_done(self):
"""Wait until the prompt encountered
"""
self.until(r'^>')
def until(self, regex):
"""Wait until the regex encountered
"""
logger.debug('waiting for %s', regex)
r = re.compile(regex, re.M)
self.tn.expect([r])
def reboot(self, **params):
"""Reboot outlet
Args:
params (dict), must contain parameter "outlet" - outlet number
Example:
params = {'outlet': 1}
"""
outlet = params['outlet']
# main menu
self.tn.write('\x1b\r\n')
self.until_done()
# Device Manager
self.tn.write('1\r\n')
self.until_done()
# Outlet Management
self.tn.write('2\r\n')
self.until_done()
# Outlet Control
self.tn.write('1\r\n')
self.until_done()
# Select outlet
self.tn.write('%d\r\n' % outlet)
self.until_done()
# Control
self.tn.write('1\r\n')
self.until_done()
# off
self.tn.write('2\r\n')
self.until('to cancel')
self.tn.write('YES\r\n')
self.until('to continue')
self.tn.write('\r\n')
self.until_done()
time.sleep(5)
# on
self.tn.write('1\r\n')
self.until('to cancel')
self.tn.write('YES\r\n')
self.until('to continue')
self.tn.write('\r\n')
self.until_done()
class NordicBoardPduController(PduController):
def open(self, **params):
pass
def _pin_reset(self, serial_number):
os.system('nrfjprog -f NRF52 --snr {} -p'.format(serial_number))
def reboot(self, **params):
boards_serial_numbers = params['boards_serial_numbers']
for serial_number in boards_serial_numbers:
logger.info('Resetting board with the serial number: %s', serial_number)
self._pin_reset(serial_number)
def close(self):
pass
class ManualPduController(PduController):
def open(self, **kwargs):
pass
def reboot(self, **kwargs):
raw_input('Reset all devices and press enter to continue..')
def close(self):
pass