blob: 39687c9734c146e40e2841ae4c7e3510b573973b [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import os
import sys
import time
import pexpect
class otCli:
def __init__(self, nodeid):
self.nodeid = nodeid
self.verbose = int(float(os.getenv('VERBOSE', 0)))
self.node_type = os.getenv('NODE_TYPE', 'sim')
if self.node_type == 'soc':
self.__init_soc(nodeid)
elif self.node_type == 'ncp-sim':
self.__init_ncp_sim(nodeid)
else:
self.__init_sim(nodeid)
if self.verbose:
self.pexpect.logfile_read = sys.stdout
def __init_sim(self, nodeid):
""" Initialize a simulation node. """
if "OT_CLI_PATH" in os.environ.keys():
cmd = os.environ['OT_CLI_PATH']
elif "top_builddir" in os.environ.keys():
srcdir = os.environ['top_builddir']
cmd = '%s/examples/apps/cli/ot-cli-ftd' % srcdir
else:
cmd = './ot-cli-ftd'
cmd += ' %d' % nodeid
print ("%s" % cmd)
self.pexpect = pexpect.spawn(cmd, timeout=4)
# Add delay to ensure that the process is ready to receive commands.
time.sleep(0.2)
def __init_ncp_sim(self, nodeid):
""" Initialize an NCP simulation node. """
if "top_builddir" in os.environ.keys():
builddir = os.environ['top_builddir']
cmd = 'spinel-cli.py -p %s/examples/apps/ncp/ot-ncp-ftd -n' % (builddir)
else:
cmd = './ot-ncp-ftd'
cmd += ' %d' % nodeid
print ("%s" % cmd)
self.pexpect = pexpect.spawn(cmd, timeout=4)
time.sleep(0.2)
self.pexpect.expect('spinel-cli >')
self.debug(int(os.getenv('DEBUG', '0')))
def __init_soc(self, nodeid):
""" Initialize a System-on-a-chip node connected via UART. """
import fdpexpect
serialPort = '/dev/ttyUSB%d' % ((nodeid-1)*2)
self.pexpect = fdpexpect.fdspawn(os.open(serialPort, os.O_RDWR|os.O_NONBLOCK|os.O_NOCTTY))
def __del__(self):
if self.pexpect.isalive():
self.send_command('exit')
self.pexpect.expect(pexpect.EOF)
self.pexpect.terminate()
self.pexpect.close(force=True)
def send_command(self, cmd):
print ("%d: %s" % (self.nodeid, cmd))
self.pexpect.sendline(cmd)
def get_commands(self):
self.send_command('?')
self.pexpect.expect('Commands:')
commands = []
while True:
i = self.pexpect.expect(['Done', '(\S+)'])
if i != 0:
commands.append(self.pexpect.match.groups()[0])
else:
break
return commands
def set_mode(self, mode):
cmd = 'mode ' + mode
self.send_command(cmd)
self.pexpect.expect('Done')
def debug(self, level):
self.send_command('debug '+str(level))
def interface_up(self):
self.send_command('ifconfig up')
self.pexpect.expect('Done')
def interface_down(self):
self.send_command('ifconfig down')
self.pexpect.expect('Done')
def thread_start(self):
self.send_command('thread start')
self.pexpect.expect('Done')
def thread_stop(self):
self.send_command('thread stop')
self.pexpect.expect('Done')
def commissioner_start(self):
cmd = 'commissioner start'
self.send_command(cmd)
self.pexpect.expect('Done')
def commissioner_add_joiner(self, addr, psk):
cmd = 'commissioner joiner add ' + addr + ' ' + psk
self.send_command(cmd)
self.pexpect.expect('Done')
def joiner_start(self, pskd='', provisioning_url=''):
cmd = 'joiner start ' + pskd + ' ' + provisioning_url
self.send_command(cmd)
self.pexpect.expect('Done')
def clear_whitelist(self):
self.send_command('whitelist clear')
self.pexpect.expect('Done')
def enable_whitelist(self):
self.send_command('whitelist enable')
self.pexpect.expect('Done')
def disable_whitelist(self):
self.send_command('whitelist disable')
self.pexpect.expect('Done')
def add_whitelist(self, addr, rssi=None):
cmd = 'whitelist add ' + addr
if rssi != None:
cmd += ' ' + str(rssi)
self.send_command(cmd)
self.pexpect.expect('Done')
def remove_whitelist(self, addr):
cmd = 'whitelist remove ' + addr
self.send_command(cmd)
self.pexpect.expect('Done')
def get_addr16(self):
self.send_command('rloc16')
i = self.pexpect.expect('([0-9a-fA-F]{4})')
if i == 0:
addr16 = int(self.pexpect.match.groups()[0], 16)
self.pexpect.expect('Done')
return addr16
def get_addr64(self):
self.send_command('extaddr')
i = self.pexpect.expect('([0-9a-fA-F]{16})')
if i == 0:
addr64 = self.pexpect.match.groups()[0].decode("utf-8")
self.pexpect.expect('Done')
return addr64
def get_hashmacaddr(self):
self.send_command('hashmacaddr')
i = self.pexpect.expect('([0-9a-fA-F]{16})')
if i == 0:
addr = self.pexpect.match.groups()[0].decode("utf-8")
self.pexpect.expect('Done')
return addr
def get_channel(self):
self.send_command('channel')
i = self.pexpect.expect('(\d+)\r\n')
if i == 0:
channel = int(self.pexpect.match.groups()[0])
self.pexpect.expect('Done')
return channel
def set_channel(self, channel):
cmd = 'channel %d' % channel
self.send_command(cmd)
self.pexpect.expect('Done')
def get_masterkey(self):
self.send_command('masterkey')
i = self.pexpect.expect('([0-9a-fA-F]{32})')
if i == 0:
masterkey = self.pexpect.match.groups()[0].decode("utf-8")
self.pexpect.expect('Done')
return masterkey
def set_masterkey(self, masterkey):
cmd = 'masterkey ' + masterkey
self.send_command(cmd)
self.pexpect.expect('Done')
def get_key_sequence_counter(self):
self.send_command('keysequence counter')
i = self.pexpect.expect('(\d+)\r\n')
if i == 0:
key_sequence_counter = int(self.pexpect.match.groups()[0])
self.pexpect.expect('Done')
return key_sequence_counter
def set_key_sequence_counter(self, key_sequence_counter):
cmd = 'keysequence counter %d' % key_sequence_counter
self.send_command(cmd)
self.pexpect.expect('Done')
def set_key_switch_guardtime(self, key_switch_guardtime):
cmd = 'keysequence guardtime %d' % key_switch_guardtime
self.send_command(cmd)
self.pexpect.expect('Done')
def set_network_id_timeout(self, network_id_timeout):
cmd = 'networkidtimeout %d' % network_id_timeout
self.send_command(cmd)
self.pexpect.expect('Done')
def get_network_name(self):
self.send_command('networkname')
while True:
i = self.pexpect.expect(['Done', '(\S+)'])
if i != 0:
network_name = self.pexpect.match.groups()[0].decode('utf-8')
else:
break
return network_name
def set_network_name(self, network_name):
cmd = 'networkname ' + network_name
self.send_command(cmd)
self.pexpect.expect('Done')
def get_panid(self):
self.send_command('panid')
i = self.pexpect.expect('([0-9a-fA-F]{4})')
if i == 0:
panid = int(self.pexpect.match.groups()[0], 16)
self.pexpect.expect('Done')
return panid
def set_panid(self, panid):
cmd = 'panid %d' % panid
self.send_command(cmd)
self.pexpect.expect('Done')
def get_partition_id(self):
self.send_command('leaderpartitionid')
i = self.pexpect.expect('(\d+)\r\n')
if i == 0:
weight = self.pexpect.match.groups()[0]
self.pexpect.expect('Done')
return weight
def set_partition_id(self, partition_id):
cmd = 'leaderpartitionid %d' % partition_id
self.send_command(cmd)
self.pexpect.expect('Done')
def set_router_upgrade_threshold(self, threshold):
cmd = 'routerupgradethreshold %d' % threshold
self.send_command(cmd)
self.pexpect.expect('Done')
def set_router_downgrade_threshold(self, threshold):
cmd = 'routerdowngradethreshold %d' % threshold
self.send_command(cmd)
self.pexpect.expect('Done')
def release_router_id(self, router_id):
cmd = 'releaserouterid %d' % router_id
self.send_command(cmd)
self.pexpect.expect('Done')
def get_state(self):
states = ['detached', 'child', 'router', 'leader']
self.send_command('state')
match = self.pexpect.expect(states)
self.pexpect.expect('Done')
return states[match]
def set_state(self, state):
cmd = 'state ' + state
self.send_command(cmd)
self.pexpect.expect('Done')
def get_timeout(self):
self.send_command('childtimeout')
i = self.pexpect.expect('(\d+)\r\n')
if i == 0:
timeout = self.pexpect.match.groups()[0]
self.pexpect.expect('Done')
return timeout
def set_timeout(self, timeout):
cmd = 'childtimeout %d' % timeout
self.send_command(cmd)
self.pexpect.expect('Done')
def set_max_children(self, number):
cmd = 'childmax %d' % number
self.send_command(cmd)
self.pexpect.expect('Done')
def get_weight(self):
self.send_command('leaderweight')
i = self.pexpect.expect('(\d+)\r\n')
if i == 0:
weight = self.pexpect.match.groups()[0]
self.pexpect.expect('Done')
return weight
def set_weight(self, weight):
cmd = 'leaderweight %d' % weight
self.send_command(cmd)
self.pexpect.expect('Done')
def add_ipaddr(self, ipaddr):
cmd = 'ipaddr add ' + ipaddr
self.send_command(cmd)
self.pexpect.expect('Done')
def get_addrs(self):
addrs = []
self.send_command('ipaddr')
while True:
i = self.pexpect.expect(['(\S+:\S+)\r\n', 'Done'])
if i == 0:
addrs.append(self.pexpect.match.groups()[0].decode("utf-8"))
elif i == 1:
break
return addrs
def get_context_reuse_delay(self):
self.send_command('contextreusedelay')
i = self.pexpect.expect('(\d+)\r\n')
if i == 0:
timeout = self.pexpect.match.groups()[0]
self.pexpect.expect('Done')
return timeout
def set_context_reuse_delay(self, delay):
cmd = 'contextreusedelay %d' % delay
self.send_command(cmd)
self.pexpect.expect('Done')
def add_prefix(self, prefix, flags, prf = 'med'):
cmd = 'prefix add ' + prefix + ' ' + flags + ' ' + prf
self.send_command(cmd)
self.pexpect.expect('Done')
def remove_prefix(self, prefix):
cmd = 'prefix remove ' + prefix
self.send_command(cmd)
self.pexpect.expect('Done')
def add_route(self, prefix, prf = 'med'):
cmd = 'route add ' + prefix + ' ' + prf
self.send_command(cmd)
self.pexpect.expect('Done')
def remove_route(self, prefix):
cmd = 'route remove ' + prefix
self.send_command(cmd)
self.pexpect.expect('Done')
def register_netdata(self):
self.send_command('netdataregister')
self.pexpect.expect('Done')
def energy_scan(self, mask, count, period, scan_duration, ipaddr):
cmd = 'commissioner energy ' + str(mask) + ' ' + str(count) + ' ' + str(period) + ' ' + str(scan_duration) + ' ' + ipaddr
self.send_command(cmd)
self.pexpect.expect('Energy:', timeout=8)
def panid_query(self, panid, mask, ipaddr):
cmd = 'commissioner panid ' + str(panid) + ' ' + str(mask) + ' ' + ipaddr
self.send_command(cmd)
self.pexpect.expect('Conflict:', timeout=8)
def scan(self):
self.send_command('scan')
results = []
while True:
i = self.pexpect.expect(['\|\s(\S+)\s+\|\s(\S+)\s+\|\s([0-9a-fA-F]{4})\s\|\s([0-9a-fA-F]{16})\s\|\s(\d+)\r\n',
'Done'])
if i == 0:
results.append(self.pexpect.match.groups())
else:
break
return results
def ping(self, ipaddr, num_responses=1, size=None, timeout=5000):
cmd = 'ping ' + ipaddr
if size != None:
cmd += ' ' + str(size)
self.send_command(cmd)
result = True
try:
responders = {}
while len(responders) < num_responses:
i = self.pexpect.expect(['from (\S+):'])
if i == 0:
responders[self.pexpect.match.groups()[0]] = 1
self.pexpect.expect('\n')
except pexpect.TIMEOUT:
result = False
return result
def reset(self):
self.send_command('reset')
def set_router_selection_jitter(self, jitter):
cmd = 'routerselectionjitter %d' % jitter
self.send_command(cmd)
self.pexpect.expect('Done')
def set_active_dataset(self, timestamp, panid=None, channel=None, channel_mask=None, master_key=None):
self.send_command('dataset clear')
self.pexpect.expect('Done')
cmd = 'dataset activetimestamp %d' % timestamp
self.send_command(cmd)
self.pexpect.expect('Done')
if panid != None:
cmd = 'dataset panid %d' % panid
self.send_command(cmd)
self.pexpect.expect('Done')
if channel != None:
cmd = 'dataset channel %d' % channel
self.send_command(cmd)
self.pexpect.expect('Done')
if channel_mask != None:
cmd = 'dataset channelmask %d' % channel_mask
self.send_command(cmd)
self.pexpect.expect('Done')
if master_key != None:
cmd = 'dataset masterkey ' + master_key
self.send_command(cmd)
self.pexpect.expect('Done')
self.send_command('dataset commit active')
self.pexpect.expect('Done')
def set_pending_dataset(self, pendingtimestamp, activetimestamp, panid=None, channel=None):
self.send_command('dataset clear')
self.pexpect.expect('Done')
cmd = 'dataset pendingtimestamp %d' % pendingtimestamp
self.send_command(cmd)
self.pexpect.expect('Done')
cmd = 'dataset activetimestamp %d' % activetimestamp
self.send_command(cmd)
self.pexpect.expect('Done')
if panid != None:
cmd = 'dataset panid %d' % panid
self.send_command(cmd)
self.pexpect.expect('Done')
if channel != None:
cmd = 'dataset channel %d' % channel
self.send_command(cmd)
self.pexpect.expect('Done')
self.send_command('dataset commit pending')
self.pexpect.expect('Done')
def announce_begin(self, mask, count, period, ipaddr):
cmd = 'commissioner announce ' + str(mask) + ' ' + str(count) + ' ' + str(period) + ' ' + ipaddr
self.send_command(cmd)
self.pexpect.expect('Done')
def send_mgmt_active_set(self, active_timestamp=None, channel=None, channel_mask=None, extended_panid=None,
panid=None, master_key=None, mesh_local=None, network_name=None, binary=None):
cmd = 'dataset mgmtsetcommand active '
if active_timestamp != None:
cmd += 'activetimestamp %d ' % active_timestamp
if channel != None:
cmd += 'channel %d ' % channel
if channel_mask != None:
cmd += 'channelmask %d ' % channel_mask
if extended_panid != None:
cmd += 'extpanid ' + extended_panid + ' '
if panid != None:
cmd += 'panid %d ' % panid
if master_key != None:
cmd += 'masterkey ' + master_key + ' '
if mesh_local != None:
cmd += 'localprefix ' + mesh_local + ' '
if network_name != None:
cmd += 'networkname ' + network_name + ' '
if binary != None:
cmd += 'binary ' + binary + ' '
self.send_command(cmd)
self.pexpect.expect('Done')
def send_mgmt_pending_set(self, pending_timestamp=None, active_timestamp=None, delay_timer=None, channel=None,
panid=None, master_key=None, mesh_local=None, network_name=None):
cmd = 'dataset mgmtsetcommand pending '
if pending_timestamp != None:
cmd += 'pendingtimestamp %d ' % pending_timestamp
if active_timestamp != None:
cmd += 'activetimestamp %d ' % active_timestamp
if delay_timer != None:
cmd += 'delaytimer %d ' % delay_timer
if channel != None:
cmd += 'channel %d ' % channel
if panid != None:
cmd += 'panid %d ' % panid
if master_key != None:
cmd += 'masterkey ' + master_key + ' '
if mesh_local != None:
cmd += 'localprefix ' + mesh_local + ' '
if network_name != None:
cmd += 'networkname ' + network_name + ' '
self.send_command(cmd)
self.pexpect.expect('Done')