"""Base class for build steps."""
import logging
import multiprocessing
import os
import re
import subprocess
import psutil
from cq.scripts.helpers import git_utils
from cq.scripts.helpers import timeout
# Prevent any step from running longer than this amount of time.
STEP_MAX_TIME_SECONDS = 2 * 60 * 60
# If it takes a subprocess more than 2 mins to exit, let's assume it's hung
# and will probably never return properly.
_SUBPROCESS_TIMEOUT = 2 * 60
class StepError(Exception):
"""Exception raised when a step failure happens."""
class GoBlsremoteQuotaError(StepError):
"""Exception raise for the GoB lsremote quota exceeded error."""
ERROR_MSG = 'Short term ls-remote rate limit exceeded'
class GoBbandwidthQuotaError(StepError):
"""Exception raise for the GoB bandwidth quota exceeded error."""
ERROR_MSG = 'Short term bandwidth rate limit exceeded'
class GoBManifestInvalidRevisionError(StepError):
"""Exception raise for the GoB invalid manifest revision error."""
ERROR_MSG = 'ManifestInvalidRevisionError'
class BaseStep:
"""Base class for build steps.
Implementers should override the methods below as appropriate.
Attributes:
name: user-visible name of this step.
env: representation of build_env build property.
data_manager: handles managing data between steps.
executor: invokes and interacts with external processes.
annotator: receives annotations from steps to communicate to build master.
halt_on_failure: whether or not this step's failure should end recipe
execution.
    allow_list_filter: If specified, a list of regexes to use to filter
                       files that get linted. Only file names that match
                       at least one filter in the allow_list will get
                       linted. An empty list will result in no files
                       getting linted. If a file matches both an
                       allow_list filter and a deny_list filter, the file
                       will be deny_listed.
                       (default allows all files)
    deny_list_filter: If specified, a list of regexes to use to filter files
                      that get linted. Only file names that do not match
                      any filter in the deny_list will get linted. If a
                      file matches both an allow_list filter and a
                      deny_list filter, the file will be deny_listed.
                      (default allows all files)
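  Example (illustrative; 'MyStep' stands in for a concrete subclass):
    To lint only Python files while excluding anything under third_party/:
      MyStep(name='lint',
             allow_list_filter=[r'\.py$'],
             deny_list_filter=[r'(^|/)third_party/'])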
"""
def __init__(self,
name: str,
executor=None,
annotator=None,
data_manager=None,
halt_on_failure=False,
properties=None,
directory=None,
timeout_secs=STEP_MAX_TIME_SECONDS,
result_on_timeout=False,
allow_list_filter=None,
deny_list_filter=None,
prerun_data=None,
build_accelerator=None, **kwargs):
assert name
assert executor
assert data_manager
assert build_accelerator
assert isinstance(halt_on_failure, bool)
if allow_list_filter:
assert isinstance(allow_list_filter, list)
if deny_list_filter:
assert isinstance(deny_list_filter, list)
self._name = name
self._executor = executor
self._annotator = annotator
self._data_manager = data_manager
self._non_blocking_subprocesses = []
self._halt_on_failure = halt_on_failure
self._properties = properties
self._directory = directory
self._project_directory = None
self._project_remote = None
if not isinstance(timeout_secs, int):
timeout_secs = 0
self._timeout_secs = timeout_secs
self._result_on_timeout = result_on_timeout
self._run_results = {
'name': self.name,
'exit_code': 0,
'files': {},
'logs': {},
'valid': False
}
if prerun_data is None:
prerun_data = {}
else:
      assert isinstance(prerun_data, dict)
      # Pre-supplied results are treated as valid and merged into this step's
      # run results below.
      self._run_results['valid'] = True
self._run_results.update(prerun_data)
if allow_list_filter is None:
allow_list_filter = ['.*']
if deny_list_filter is None:
deny_list_filter = []
self._allow_list_filter = [re.compile(regex) for regex in allow_list_filter]
self._deny_list_filter = [re.compile(regex) for regex in deny_list_filter]
self._last_subprocess_cmd = None
self._build_accelerator = build_accelerator
_ = kwargs
@property
def results(self):
"""Returns the result of this step run."""
return self._run_results
@property
def name(self):
"""Returns the name of this step."""
return self._name
@property
def build_system(self):
"""Returns the name of current build system."""
return self.get_property('build_system', 'local')
@property
  def env(self):
    """Returns the representation of the build_env build property."""
    return self.get_property('build_env', 'local')
@property
def remote(self):
"""Returns the remote to use for this step."""
if not self._project_remote:
self._project_remote = self.get_project_remote(self.patch_project)
return self._project_remote
@property
def directory(self):
"""Returns the directory to use for this step."""
if self._directory is not None:
return self._directory
if not self._project_directory:
if self.patch_project:
self._project_directory = self.get_project_path(self.patch_project)
else:
self._project_directory = ''
return self._project_directory
@property
  def build_accelerator(self):
    """Returns the build accelerator for this step."""
    return self._build_accelerator
def get_property(self, name, default=None):
"""Returns the requested property value, or default if not found."""
if not self._properties:
return default
return self._properties.get(name, default)
@property
def depends_on_list(self):
"""Returns the list of Depends-On information.
Each item in the list is a dict of the form:
{
'gob_host': <Gerrit on borg host name>
'project': <Project name, should match a name in the repo manifest>
'change_number': <CL #>
'patchset_number': <PS #>
}
Returns:
List of depends-on dicts.
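    Example (illustrative iteration over the returned list):
      for dep in self.depends_on_list:
        logging.info('Depends on %s CL %s patchset %s', dep['project'],
                     dep['change_number'], dep['patchset_number'])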
"""
return self.get_property('depends_on', default=[])
@property
def build_number(self):
"""Construct BUILD_NUMBER string in unified way between all systems.
This property is matching the same property in the BaseRecipe, should be
altered together.
Returns:
CL number (from 'issue' porperty) for CQ, buildset number for CB or 0
"""
return str(self.get_property(
'issue', default=self.get_property('buildset', default='0')))
@property
def patchset(self):
"""Returns the patchset number for this step."""
return self.get_property('patchset', default='0')
@property
def patch_project(self):
"""Returns the patch_project to use for this step."""
return self.get_property('patch_project')
@property
def manifest_branch(self):
"""Returns the manifest branch to use for this step."""
return self.get_property('manifest_branch')
  def allow_listed(self, filename):
    """Returns whether |filename| matches any allow_list filter."""
    return any(re.search(regex, filename) for regex in self._allow_list_filter)
  def deny_listed(self, filename):
    """Returns whether |filename| matches any deny_list filter."""
    return any(re.search(regex, filename) for regex in self._deny_list_filter)
  def changed_files(self, file_extensions=None):
    """Returns changed files that pass the allow/deny list filters."""
    changed_files = git_utils.changed_files(
        self, directory=self.directory, file_extensions=file_extensions)
    return [f for f in changed_files
            if self.allow_listed(f) and not self.deny_listed(f)]
def get_num_jobs(self, multiplier=2):
"""Returns the number of jobs this step can use.
Uses a heuristic to figure out the number of parallel jobs supported by
    the build machine. Currently this is just 2x the number of CPU cores.
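    For example, on an 8-core machine the default returns 16 jobs.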
Args:
multiplier: The multiplier to use.
Returns:
The number of jobs to use.
"""
return multiprocessing.cpu_count() * multiplier
def exec_subprocess(self,
command,
check_output=False,
cwd=None,
*args,
**kwargs): # pylint: disable=keyword-arg-before-vararg
"""Execute a command.
Args:
command: The command to execute.
check_output: Whether or not to raise a CalledProcessError on non-zero
return code.
cwd: The current working directory to use for the subprocess.
Defaults to os.getcwd()
*args: positional params.
**kwargs: Key/Value dictionary params.
Returns:
      Just stdout if check_output=True, or a tuple of
      (returncode, stdout, stderr) otherwise.
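    Raises:
      subprocess.CalledProcessError: if check_output=True and the command
        exits with a non-zero return code.
    Example (illustrative commands; assumes the step's executor is set up):
      returncode, stdout, stderr = self.exec_subprocess(['git', 'status'])
      head = self.exec_subprocess(['git', 'rev-parse', 'HEAD'],
                                  check_output=True)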
"""
self._last_subprocess_cmd = command
    returncode, stdout, stderr = self._executor.subprocess(
        command, *args, cwd=cwd, **kwargs)
if check_output:
if returncode != 0:
full_output = stdout
if stderr:
full_output += '\nstderr:\n' + stderr
raise subprocess.CalledProcessError(returncode,
cmd=command,
output=full_output)
return stdout
return returncode, stdout, stderr
def exec_subprocess_chain(self, subprocess_chain):
"""Execute a chain of commands, piping output between processes."""
    # Record a human-readable description of the chain so it can be reported
    # later (e.g. when killing hung subprocesses).
    msg = ['Chain of commands, piping output between processes:']
commands = msg + [' '.join(command[0]) for command in subprocess_chain]
commands = [cmd.strip() + '\n' for cmd in commands]
self._last_subprocess_cmd = commands
return self._executor.subprocess_chain(subprocess_chain)
  def exec_non_blocking_subprocess(self, *args, **kwargs):
    """Starts a non-blocking subprocess and tracks it for later cleanup."""
    proc, log_file_descriptor, log_file_path = (
        self._executor.non_blocking_subprocess(*args, **kwargs))
    self._non_blocking_subprocesses.append((proc, log_file_descriptor))
    return proc, log_file_descriptor, log_file_path
def start_subprocess(self, *args, **kwargs):
"""Direct call to the subprocess.Popen.
Args:
stdin: A file stream, subprocess.PIPE, or None to read stdin from.
stdout: A file stream or subprocess.PIPE to redirect stdout.
stderr: A file stream or subprocess.PIPE to redirect stderr.
*args: passed to subprocess.Popen
**kwargs: passed to subprocess.Popen;
optionally, quiet=True can be passed as a kwarg,
in which case stdout/stderr from the subprocess
will not be written to file. This kwarg will not
get passed to subprocess.Popen.
Returns:
      Tuple of (subprocess, boolean indicating whether the subprocess output
      needs to be saved).
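    Example (illustrative; the exact argument pass-through is defined by the
    executor):
      proc, save_output = self.start_subprocess(
          ['tail', '-f', '/tmp/build.log'], stdout=subprocess.PIPE,
          stderr=subprocess.PIPE, quiet=True)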
"""
return self._executor.start_subprocess(*args, **kwargs)
  def get_running_subprocess_pids(self):
    """Returns PIDs of tracked non-blocking subprocesses still running."""
    current_running_processes = []
    for proc, _ in self._non_blocking_subprocesses:
      if proc.returncode is None:
        current_running_processes.append(proc.pid)
    return current_running_processes
  def kill_all_blocking_subprocesses(self):
    """Kills all currently running blocking subprocesses."""
    logging.info('Last cmd run by the executor: %s',
                 str(self._last_subprocess_cmd))
    # Print all currently running processes.
    self.exec_subprocess(['ps', '-efH'])
    for proc in self._executor.get_blocking_subprocesses():
      pid = proc.pid
      if not psutil.pid_exists(pid):
        logging.info('[Kill all blocking] Process %d started at %d and is no '
                     'longer running, skip.', pid, proc.start)
        continue
      proc_info = psutil.Process(pid)
      logging.info('[Kill all blocking] Process %d, (%s) started at %d. '
                   'Communicating with process...',
                   pid, ' '.join(proc_info.cmdline()), proc.start)
      stdout = None
      stderr = None
      try:
        stdout, stderr = proc.communicate(timeout=_SUBPROCESS_TIMEOUT)
        logging.info('[Kill all blocking] Process %d: exited with '
                     'returncode = %d.', pid, proc.returncode)
      except timeout.TimeoutException as e:
        # Due to a quirk of Python imports, we cannot catch
        # subprocess42.TimeoutExpired directly, so we catch its base class
        # instead. However, we need to check for the attributes' existence to
        # make sure this is indeed subprocess42.TimeoutExpired.
        if hasattr(e, 'output'):
          stdout = e.output
        if hasattr(e, 'stderr'):
          stderr = e.stderr
        logging.exception('Timeout (%d s) expired while waiting for the '
                          'process to finish. Killing process: %d',
                          _SUBPROCESS_TIMEOUT, pid)
        proc.kill()
        proc.wait()
      finally:
        if stdout:
          logging.info('[Kill all blocking] Stdout size for process %d: %d.',
                       pid, len(stdout))
        if stderr:
          logging.info('[Kill all blocking] Stderr size for process %d: %d.',
                       pid, len(stderr))
  def kill_all_non_blocking_subprocesses(self):
    """Kills all currently running non-blocking subprocesses."""
    for proc, log_file_descriptor in self._non_blocking_subprocesses:
      try:
        proc.poll()
        if proc.returncode is None:
          logging.info('Killing process: %d', proc.pid)
          proc.kill()
      except:  # pylint: disable=bare-except
        logging.exception('Killing process failed.')
      try:
        logging.info('Closing file descriptor %d', log_file_descriptor)
        os.close(log_file_descriptor)
      except:  # pylint: disable=bare-except
        logging.exception('Closing file descriptor failed.')
  def exec_read_file(self, file_path):
    """Returns the contents of the file at |file_path| via the executor."""
    return self._executor.read_file(file_path)
  def glob_files(self, glob_expr):
    """Returns the files matching |glob_expr| via the executor."""
    return self._executor.glob_files(glob_expr)
  def get_tmp_dir(self):
    """Returns a temporary directory from the executor."""
    return self._executor.get_tmp_dir()
  def get_new_tmp_file(self):
    """Returns a new temporary file from the executor."""
    return self._executor.get_new_tmp_file()
def write_temp_file(self, contents: str):
"""Write a text string to a temporary file."""
return self._executor.write_temp_file(contents)
def add_review(self, review):
"""Adds |review| to the review comment.
Args:
review: A dict with optional 'message' and 'comments' keys.
If 'message' is present, it will be added as a review comment.
If 'comments' is present, they will be added as file comments.
'message': A string to be added to the review message.
'comments': A dict of comments in the form of
{<filename>: [{'line': <line #>,
'message': <file comment>},
{'line': <line #>,
'message': <file comment>},
...
],
<filename>: [...],
...
}
Note that the 'line' field is optional, and if not
provided will result in a file-level comment instead
of a line-specific comment.
To add a comment to the commit message, use
'/COMMIT_MSG' as the filename.
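    Example (illustrative values):
      self.add_review({
          'message': 'Lint found issues; see inline comments.',
          'comments': {
              'foo/bar.py': [{'line': 12, 'message': 'Unused import.'}],
              '/COMMIT_MSG': [{'message': 'Please reference a bug.'}],
          },
      })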
"""
if 'message' in review:
assert isinstance(review['message'], str)
if self._annotator:
self._annotator.add_message(review['message'])
if 'comments' in review:
assert isinstance(review['comments'], dict)
if self._annotator:
self._annotator.add_comments(review['comments'])
def get_cq_root(self):
"""Returns the absolute path to the root of the CQ src."""
return self._executor.get_cq_root()
def get_gcs_dir(self):
"""This directory will be uploaded to GCS when a recipe finishes."""
return self._executor.get_gcs_dir()
def set_build_property(self, name, value):
"""Sets a build property for this step to return to the builder.
Args:
name: property name (must be a string)
value: property value (must be JSON-serializable)
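    Example (illustrative name and value):
      self.set_build_property('lint_warning_count', 3)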
"""
assert isinstance(name, str)
assert value is not None
self._properties[name] = value
if self._annotator:
self._annotator.set_build_property(name, value)
def halt_on_failure(self):
"""Returns whether this step's failure should end the recipe."""
return self._halt_on_failure
def set_halt_on_failure(self):
"""Forces step failure to end the recipe run."""
self._halt_on_failure = True
def get_name(self):
"""Returns the name of this step, used for status display and flow control.
Returns:
The name of this step.
"""
return self._name
def get_timeout(self):
"""Returns the length of time, in seconds, this step is allowed to run."""
return self._timeout_secs
def result_on_timeout(self):
"""Returns the result value to use if this step times out."""
return self._result_on_timeout
def get_step_data(self, key, default=None):
"""Retrieves step data from a previous step.
Args:
key: Unique name the step data was indexed by.
default: Default return value if the key is not found.
Returns:
Data associated with the given key.
"""
return self._data_manager.get_data(key, default=default)
def add_step_data(self, key, value, replace=False):
"""Provides step data for future steps.
Args:
key: Unique name to index this step data.
value: Actual data to provide to future steps.
replace: whether |value| can replace previously stored data in |key|.
(defaults to False)
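    Example (illustrative key and value; a later step can read the value back
    with get_step_data):
      self.add_step_data('lint_warning_count', 3)
      count = self.get_step_data('lint_warning_count', default=0)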
"""
self._data_manager.add_data(key, value, replace)
def clear_project_info(self):
"""Clears the project info to be reset in the next run."""
return self._data_manager.clear_project_info()
def contains_project(self, project):
"""Boolean if the given project is included in the repo client."""
return self._data_manager.contains_project(project)
def ensure_project_info(self):
"""Populate the project info."""
return self._data_manager.ensure_project_info()
def get_project_path(self, project):
"""Returns the path to the project.
Args:
project: The name of the project (usually patch_project) to lookup.
Returns:
The relative path from the base checkout to the project.
"""
return self._data_manager.get_project_path(project)
def get_project_path_lookup_table(self):
"""Returns the project path lookup table."""
return self._data_manager.get_project_path_lookup_table()
def get_project_remote(self, project):
"""Returns the remote for the project.
Args:
project: The name of the project (usually patch_project) to lookup.
Returns:
The remote for the project.
"""
return self._data_manager.get_project_remote(project)
def get_project_revision(self, project):
"""Returns the revision for the project.
Args:
project: The name of the project (usually patch_project) to lookup.
Returns:
The revision for the project.
"""
return self._data_manager.get_project_revision(project)
def run(self) -> bool:
"""Synchronously executes this step of the recipe.
Returns:
Whether or not the step succeeded.
"""
    raise NotImplementedError('Subclasses must implement this.')
def add_mock_executions(self, generator):
"""Adds mock executions to the generator."""
def _getcwd(self):
"""Helper function to make it easy to mock os.getcwd() for tests."""
return os.getcwd()
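# Illustrative BaseStep subclass (a hedged sketch; 'EchoStep' is hypothetical
# and not part of this module):
#
#   class EchoStep(BaseStep):
#     def run(self) -> bool:
#       returncode, _, _ = self.exec_subprocess(['echo', 'hello'])
#       self._run_results['exit_code'] = returncode
#       return returncode == 0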