| """Base class for build steps.""" |
| import logging |
| import multiprocessing |
| import os |
| import re |
| import subprocess |
| |
| import psutil |
| |
| from cq.scripts.helpers import git_utils |
| from cq.scripts.helpers import timeout |
| |
| # Preventing running any step longer than this amount of time. |
| STEP_MAX_TIME_SECONDS = 2 * 60 * 60 |
| |
| # If it takes a subprocess more than 2 mins to exit, let's assume it's hung |
| # and will probably never return properly. |
| _SUBPROCESS_TIMEOUT = 2 * 60 |
| |
| |
class StepError(Exception):
  """Base exception for failures that occur while running a build step."""
| |
| |
class GoBlsremoteQuotaError(StepError):
  """Exception raised for the GoB ls-remote quota exceeded error."""
  # Substring emitted by Gerrit-on-Borg when the ls-remote rate limit trips.
  ERROR_MSG = 'Short term ls-remote rate limit exceeded'
| |
| |
class GoBbandwidthQuotaError(StepError):
  """Exception raised for the GoB bandwidth quota exceeded error."""
  # Substring emitted by Gerrit-on-Borg when the bandwidth rate limit trips.
  ERROR_MSG = 'Short term bandwidth rate limit exceeded'
| |
| |
class GoBManifestInvalidRevisionError(StepError):
  """Exception raised for the GoB invalid manifest revision error."""
  # Substring emitted by repo/GoB when a manifest pins a missing revision.
  ERROR_MSG = 'ManifestInvalidRevisionError'
| |
| |
class BaseStep:
  """Base class for build steps.

  Implementers should override the methods below (in particular `run`) as
  appropriate.

  Attributes:
    name: user-visible name of this step.
    env: representation of build_env build property.
    data_manager: handles managing data between steps.
    executor: invokes and interacts with external processes.
    annotator: receives annotations from steps to communicate to build master.
    halt_on_failure: whether or not this step's failure should end recipe
      execution.
    allow_list_filter: If specified, a list of regex's to use to filter
      files that get linted. Only file names that match at least one
      filter in the allow_list will get linted. An empty list will
      result in no files getting linted. If a file matches an allow_list
      filter and a deny_list filter, the file will be deny_listed.
      (default allows all files)
    deny_list_filter: If specified, a list of regex's to use to filter files
      that get linted. Only file names that do not match any filter in
      the deny_list will get linted. If a file matches an allow_list
      filter and a deny_list filter, the file will be deny_listed.
      (default allows all files)
  """

  def __init__(self,
               name: str,
               executor=None,
               annotator=None,
               data_manager=None,
               halt_on_failure=False,
               properties=None,
               directory=None,
               timeout_secs=STEP_MAX_TIME_SECONDS,
               result_on_timeout=False,
               allow_list_filter=None,
               deny_list_filter=None,
               prerun_data=None,
               build_accelerator=None, **kwargs):
    # executor, data_manager and build_accelerator are required despite the
    # keyword defaults; the defaults only exist so callers can pass them in
    # any order by name.
    assert name
    assert executor
    assert data_manager
    assert build_accelerator
    assert isinstance(halt_on_failure, bool)
    if allow_list_filter:
      assert isinstance(allow_list_filter, list)
    if deny_list_filter:
      assert isinstance(deny_list_filter, list)

    self._name = name
    self._executor = executor
    self._annotator = annotator
    self._data_manager = data_manager
    # Tuples of (process, log file descriptor) for subprocesses started via
    # exec_non_blocking_subprocess(); cleaned up by
    # kill_all_non_blocking_subprocesses().
    self._non_blocking_subprocesses = []
    self._halt_on_failure = halt_on_failure
    self._properties = properties
    self._directory = directory
    # Lazily resolved from patch_project; see the `directory`/`remote`
    # properties below.
    self._project_directory = None
    self._project_remote = None
    # Non-int timeouts (e.g. None) are coerced to 0.
    if not isinstance(timeout_secs, int):
      timeout_secs = 0
    self._timeout_secs = timeout_secs
    self._result_on_timeout = result_on_timeout
    self._run_results = {
        'name': self.name,
        'exit_code': 0,
        'files': {},
        'logs': {},
        'valid': False
    }
    # If prerun data is supplied, the results are seeded from it and
    # immediately marked valid.
    if prerun_data is None:
      prerun_data = {}
    else:
      assert isinstance(prerun_data, dict)
      self._run_results['valid'] = True
    self._run_results.update(prerun_data)

    # Default: allow everything, deny nothing.
    if allow_list_filter is None:
      allow_list_filter = ['.*']
    if deny_list_filter is None:
      deny_list_filter = []
    self._allow_list_filter = [re.compile(regex) for regex in allow_list_filter]
    self._deny_list_filter = [re.compile(regex) for regex in deny_list_filter]
    # Remembered for diagnostics when killing hung subprocesses.
    self._last_subprocess_cmd = None
    self._build_accelerator = build_accelerator

    # Extra kwargs are accepted (and ignored) so subclasses can share a
    # common construction call site.
    _ = kwargs

  @property
  def results(self):
    """Returns the result of this step run."""
    return self._run_results

  @property
  def name(self):
    """Returns the name of this step."""
    return self._name

  @property
  def build_system(self):
    """Returns the name of current build system."""
    return self.get_property('build_system', 'local')

  @property
  def env(self):
    """Returns the build environment ('build_env' property, default 'local')."""
    return self.get_property('build_env', 'local')

  @property
  def remote(self):
    """Returns the remote to use for this step (cached after first lookup)."""
    if not self._project_remote:
      self._project_remote = self.get_project_remote(self.patch_project)
    return self._project_remote

  @property
  def directory(self):
    """Returns the directory to use for this step.

    An explicitly supplied directory wins; otherwise the project path for
    patch_project is looked up once and cached (empty string if there is no
    patch_project).
    """
    if self._directory is not None:
      return self._directory
    if not self._project_directory:
      if self.patch_project:
        self._project_directory = self.get_project_path(self.patch_project)
      else:
        self._project_directory = ''
    return self._project_directory

  @property
  def build_accelerator(self):
    """Returns the build accelerator supplied at construction time."""
    return self._build_accelerator

  def get_property(self, name, default=None):
    """Returns the requested property value, or default if not found."""
    if not self._properties:
      return default
    return self._properties.get(name, default)

  @property
  def depends_on_list(self):
    """Returns the list of Depends-On information.

    Each item in the list is a dict of the form:
      {
        'gob_host': <Gerrit on borg host name>
        'project': <Project name, should match a name in the repo manifest>
        'change_number': <CL #>
        'patchset_number': <PS #>
      }

    Returns:
      List of depends-on dicts.
    """
    return self.get_property('depends_on', default=[])

  @property
  def build_number(self):
    """Construct BUILD_NUMBER string in unified way between all systems.

    This property is matching the same property in the BaseRecipe, should be
    altered together.

    Returns:
      CL number (from 'issue' property) for CQ, buildset number for CB or 0
    """
    return str(self.get_property(
        'issue', default=self.get_property('buildset', default='0')))

  @property
  def patchset(self):
    """Returns the patchset number for this step."""
    return self.get_property('patchset', default='0')

  @property
  def patch_project(self):
    """Returns the patch_project to use for this step."""
    return self.get_property('patch_project')

  @property
  def manifest_branch(self):
    """Returns the manifest branch to use for this step."""
    return self.get_property('manifest_branch')

  def allow_listed(self, filename):
    """True if |filename| matches at least one allow_list regex."""
    return any(re.search(regex, filename) for regex in self._allow_list_filter)

  def deny_listed(self, filename):
    """True if |filename| matches at least one deny_list regex."""
    return any(re.search(regex, filename) for regex in self._deny_list_filter)

  def changed_files(self, file_extensions=None):
    """Returns changed files in this step's directory, post filter lists.

    A file is kept only if it matches the allow_list and does not match the
    deny_list (deny wins over allow).
    """
    changed_files = git_utils.changed_files(
        self, directory=self.directory, file_extensions=file_extensions)
    return [f for f in changed_files
            if self.allow_listed(f) and not self.deny_listed(f)]

  def get_num_jobs(self, multiplier=2):
    """Returns the number of jobs this step can use.

    Uses a heuristic to figure out the number of parallel jobs supported by
    the build machine. Currently this is just 2x the number of CPU cores.

    Args:
      multiplier: The multiplier to use.

    Returns:
      The number of jobs to use.
    """
    return multiprocessing.cpu_count() * multiplier

  def exec_subprocess(self,
                      command,
                      check_output=False,
                      cwd=None,
                      *args,
                      **kwargs):  # pylint: disable=keyword-arg-before-vararg
    """Execute a command.

    Args:
      command: The command to execute.
      check_output: Whether or not to raise a CalledProcessError on non-zero
        return code.
      cwd: The current working directory to use for the subprocess.
        Defaults to os.getcwd()
      *args: positional params.
      **kwargs: Key/Value dictionary params.

    Returns:
      Either just stdout if check_output=True, or a tuple of
      (returncode, stdout, stderr)

    Raises:
      subprocess.CalledProcessError: if check_output=True and the command
        exits non-zero; stderr (when present) is appended to the error's
        output for easier debugging.
    """
    self._last_subprocess_cmd = command

    returncode, stdout, stderr = (
        self._executor.subprocess(command, *args, cwd=cwd, **kwargs))

    if check_output:
      if returncode != 0:
        full_output = stdout
        if stderr:
          full_output += '\nstderr:\n' + stderr
        raise subprocess.CalledProcessError(returncode,
                                            cmd=command,
                                            output=full_output)

      return stdout
    return returncode, stdout, stderr

  def exec_subprocess_chain(self, subprocess_chain):
    """Execute a chain of commands, piping output between processes."""
    # Record a human-readable description of the whole chain for diagnostics
    # (see kill_all_blocking_subprocesses).
    msg = ['Chain of commands, piping output between processes:']
    commands = msg + [' '.join(command[0]) for command in subprocess_chain]
    commands = [cmd.strip() + '\n' for cmd in commands]
    self._last_subprocess_cmd = commands
    return self._executor.subprocess_chain(subprocess_chain)

  def exec_non_blocking_subprocess(self, *args, **kwargs):
    """Starts a subprocess without waiting for it to finish.

    The process and its log file descriptor are tracked so that
    kill_all_non_blocking_subprocesses() can clean them up later.

    Returns:
      Tuple of (process, log file descriptor, log file path).
    """
    proc, log_file_descriptor, log_file_path = (
        self._executor.non_blocking_subprocess(*args, **kwargs))
    self._non_blocking_subprocesses.append((proc, log_file_descriptor))
    return proc, log_file_descriptor, log_file_path

  def start_subprocess(self, *args, **kwargs):
    """Direct call to the subprocess.Popen.

    Args:
      stdin: A file stream, subprocess.PIPE, or None to read stdin from.
      stdout: A file stream or subprocess.PIPE to redirect stdout.
      stderr: A file stream or subprocess.PIPE to redirect stderr.
      *args: passed to subprocess.Popen
      **kwargs: passed to subprocess.Popen;
        optionally, quiet=True can be passed as a kwarg,
        in which case stdout/stderr from the subprocess
        will not be written to file. This kwarg will not
        get passed to subprocess.Popen.

    Returns:
      Tuple of (subprocess, boolean indicating if subprocess output needs to
      be saved.)
    """
    return self._executor.start_subprocess(*args, **kwargs)

  def get_running_subprocess_pids(self):
    """Returns PIDs of tracked non-blocking subprocesses still running."""
    current_running_processes = []
    for proc, _ in self._non_blocking_subprocesses:
      # returncode is None while the process has not been seen to exit.
      if proc.returncode is None:
        current_running_processes.append(proc.pid)
    return current_running_processes

  def kill_all_blocking_subprocesses(self):
    """Killing all current blocking subprocesses.

    For each blocking subprocess the executor knows about, first gives it
    _SUBPROCESS_TIMEOUT seconds to exit via communicate(); if that times
    out, the process is assumed hung and is killed.
    """
    # The last command is the most likely culprit for a hung step, so
    # surface it in the log before cleaning up.
    logging.info(u'Last cmd run by the executor: %s',
                 str(self._last_subprocess_cmd))

    # Print all current running processes
    self.exec_subprocess(['ps', '-efH'])

    for proc in self._executor.get_blocking_subprocesses():
      pid = proc.pid
      if not psutil.pid_exists(pid):
        logging.info('[Kill all blocking] Process %d started at %d and is no '
                     'longer running, skip', pid, proc.start)
        continue
      subprocess_info = psutil.Process(pid)

      logging.info('[Kill all blocking] Process %d, (%s) started at %d. '
                   'Communicating with process...',
                   pid, ' '.join(subprocess_info.cmdline()), proc.start)
      stdout = None
      stderr = None
      try:
        stdout, stderr = proc.communicate(timeout=_SUBPROCESS_TIMEOUT)
        logging.info('[Kill all blocking] Process %d: exited with '
                     'returncode = %d.', pid, proc.returncode)
      except timeout.TimeoutException as e:
        # Due to a quirkiness of Python imports, we cannot catch
        # subprocess42.TimeoutExpired directly, so we are catching its base
        # class instead. However, we need to check for the attributes existence
        # to make sure this is indeed subprocess42.TimeoutExpired.
        if hasattr(e, 'output'):
          stdout = e.output
        if hasattr(e, 'stderr'):
          stderr = e.stderr
        logging.exception('Timeout (%d s) expired while waiting for the '
                          'process to finish. Killing process: %d',
                          _SUBPROCESS_TIMEOUT, pid)
        proc.kill()
        proc.wait()
      finally:
        if stdout:
          logging.info('[Kill all blocking] Stdout size for process %d: %d.',
                       pid, len(stdout))
        if stderr:
          logging.info('[Kill all blocking] Stderr size for process %d: %d.',
                       pid, len(stderr))

  def kill_all_non_blocking_subprocesses(self):
    """Killing all current non-blocking subprocesses.

    Best-effort: a failure to kill one process or close one log file must
    not prevent cleanup of the rest, hence the broad exception handlers.
    """
    for proc, log_file_descriptor in self._non_blocking_subprocesses:
      try:
        proc.poll()
        if proc.returncode is None:
          logging.info('Killing process: %d', proc.pid)
          proc.kill()
      except:  # pylint: disable=bare-except
        logging.exception('Killing process failed.')
      try:
        logging.info('Closing file handler %d', log_file_descriptor)
        os.close(log_file_descriptor)
      except:  # pylint: disable=bare-except
        logging.exception('Closing file handler failed.')

  def exec_read_file(self, file_path):
    """Reads a file via the executor and returns its contents."""
    return self._executor.read_file(file_path)

  def glob_files(self, glob_expr):
    """Returns files matching |glob_expr| via the executor."""
    return self._executor.glob_files(glob_expr)

  def get_tmp_dir(self):
    """Returns the executor's temporary directory."""
    return self._executor.get_tmp_dir()

  def get_new_tmp_file(self):
    """Returns a new temporary file from the executor."""
    return self._executor.get_new_tmp_file()

  def write_temp_file(self, contents: str):
    """Write a text string to a temporary file."""
    return self._executor.write_temp_file(contents)

  def add_review(self, review):
    """Adds |review| to the review comment.

    Args:
      review: A dict with optional 'message' and 'comments' keys.
        If 'message' is present, it will be added as a review comment.
        If 'comments' is present, they will be added as file comments.

        'message': A string to be added to the review message.
        'comments': A dict of comments in the form of
          {<filename>: [{'line': <line #>,
                         'message': <file comment>},
                        {'line': <line #>,
                         'message': <file comment>},
                        ...
                       ],
           <filename>: [...],
           ...
          }
          Note that the 'line' field is optional, and if not
          provided will result in a file-level comment instead
          of a line-specific comment.
          To add a comment to the commit message, use
          '/COMMIT_MSG' as the filename.
    """
    if 'message' in review:
      assert isinstance(review['message'], str)
      if self._annotator:
        self._annotator.add_message(review['message'])
    if 'comments' in review:
      assert isinstance(review['comments'], dict)
      if self._annotator:
        self._annotator.add_comments(review['comments'])

  def get_cq_root(self):
    """Returns the absolute path to the root of the CQ src."""
    return self._executor.get_cq_root()

  def get_gcs_dir(self):
    """This directory will be uploaded to GCS when a recipe finishes."""
    return self._executor.get_gcs_dir()

  def set_build_property(self, name, value):
    """Sets a build property for this step to return to the builder.

    Args:
      name: property name (must be a string)
      value: property value (must be JSON-serializable)
    """
    assert isinstance(name, str)
    assert value is not None
    self._properties[name] = value
    if self._annotator:
      self._annotator.set_build_property(name, value)

  def halt_on_failure(self):
    """Returns whether this step's failure should end the recipe."""
    return self._halt_on_failure

  def set_halt_on_failure(self):
    """Forces step failure to end the recipe run."""
    self._halt_on_failure = True

  def get_name(self):
    """Returns the name of this step, used for status display and flow control.

    Returns:
      The name of this step.
    """
    return self._name

  def get_timeout(self):
    """Returns the length of time, in seconds, this step is allowed to run."""
    return self._timeout_secs

  def result_on_timeout(self):
    """Returns the result value to use if this step times out."""
    return self._result_on_timeout

  def get_step_data(self, key, default=None):
    """Retrieves step data from a previous step.

    Args:
      key: Unique name the step data was indexed by.
      default: Default return value if the key is not found.

    Returns:
      Data associated with the given key.
    """
    return self._data_manager.get_data(key, default=default)

  def add_step_data(self, key, value, replace=False):
    """Provides step data for future steps.

    Args:
      key: Unique name to index this step data.
      value: Actual data to provide to future steps.
      replace: whether |value| can replace previously stored data in |key|.
        (defaults to False)
    """
    self._data_manager.add_data(key, value, replace)

  def clear_project_info(self):
    """Clears the project info to be reset in the next run."""
    return self._data_manager.clear_project_info()

  def contains_project(self, project):
    """Boolean if the given project is included in the repo client."""
    return self._data_manager.contains_project(project)

  def ensure_project_info(self):
    """Populate the project info."""
    return self._data_manager.ensure_project_info()

  def get_project_path(self, project):
    """Returns the path to the project.

    Args:
      project: The name of the project (usually patch_project) to lookup.

    Returns:
      The relative path from the base checkout to the project.
    """
    return self._data_manager.get_project_path(project)

  def get_project_path_lookup_table(self):
    """Returns the project path lookup table."""
    return self._data_manager.get_project_path_lookup_table()

  def get_project_remote(self, project):
    """Returns the remote for the project.

    Args:
      project: The name of the project (usually patch_project) to lookup.

    Returns:
      The remote for the project.
    """
    return self._data_manager.get_project_remote(project)

  def get_project_revision(self, project):
    """Returns the revision for the project.

    Args:
      project: The name of the project (usually patch_project) to lookup.

    Returns:
      The revision for the project.
    """
    return self._data_manager.get_project_revision(project)

  def run(self) -> bool:
    """Synchronously executes this step of the recipe.

    Returns:
      Whether or not the step succeeded.
    """
    raise NotImplementedError("Subclasses should have implemented this.")

  def add_mock_executions(self, generator):
    """Adds mock executions to the generator. No-op in the base class."""

  def _getcwd(self):
    """Helper function to make it easy to mock os.getcwd() for tests."""
    return os.getcwd()