blob: 0a687e7c400c69cea43ec930d75016dfd8793e8a [file] [log] [blame] [edit]
"""Tools to interact with repo."""
from __future__ import absolute_import
import contextlib
import logging
import os
import shutil
import tempfile
from typing import Any, Mapping
from xml.dom import minidom
import xml.etree.ElementTree as et
import six
# Fallbacks: the remote used when a manifest names none, and the revision
# under which lookup keys without an explicit revision are stored.
DEFAULT_REMOTE = 'eureka'
DEFAULT_REVISION = 'DEFAULT'

# Names of the manifest projects themselves.
EUREKA_MANIFEST_PROJECT_NAME = 'eureka/manifest'
LIBASSISTANT_MANIFEST_PROJECT_NAME = 'standalone/manifest'

# Host prefixes that most of the manifest URLs below are built from.
EUREKA_INTERNAL_HOST_URL = 'https://eureka-internal.googlesource.com/'
EUREKA_PARTNER_HOST_URL = 'https://eureka-partner.googlesource.com/'

# Manifest repository URLs.
EUREKA_MANIFEST_URL = EUREKA_INTERNAL_HOST_URL + EUREKA_MANIFEST_PROJECT_NAME
CATABUILDER_E2E_TESTS_MANIFEST_URL = (
    'https://eureka-staging-internal.googlesource.com/'
    'catabuilder_e2e_tests_manifest'
)
GOTHAM_MCU_MANIFEST_URL = 'https://team.googlesource.com/gotham-cr/manifest'
LIBASSISTANT_MANIFEST_URL = (
    'https://libassistant-internal.googlesource.com/'
    + LIBASSISTANT_MANIFEST_PROJECT_NAME
)
PARTNER_MANIFEST_URL = EUREKA_PARTNER_HOST_URL + 'partner/manifest'
PARTNER_AMLOGIC_MANIFEST_URL = EUREKA_PARTNER_HOST_URL + 'amlogic/manifest'
CAST_AUDIO_STANDARD_MANIFEST_URL = (
    EUREKA_PARTNER_HOST_URL + 'partner/manifest_cast_audio'
)
CAST_AUDIO_REFERENCE_MANIFEST_URL = (
    EUREKA_PARTNER_HOST_URL + 'partner/manifest_cast_audio_reference'
)
CAST_ASSISTANT_REFERENCE_MANIFEST_URL = (
    EUREKA_PARTNER_HOST_URL + 'partner/manifest_cast_assistant_reference'
)
CAST_TV_MANIFEST_URL = EUREKA_PARTNER_HOST_URL + 'partner/manifest_cast_tv'
CAST_TV_MARVELL_MANIFEST_URL = (
    EUREKA_PARTNER_HOST_URL + 'partner/manifest_cast_tv_marvell'
)
CAST_TV_MEDIATEK_MANIFEST_URL = (
    EUREKA_PARTNER_HOST_URL + 'partner/manifest_cast_tv_mediatek'
)
CAST_TV_MSTAR_MANIFEST_URL = (
    EUREKA_PARTNER_HOST_URL + 'partner/manifest_cast_tv_mstar'
)
CAST_TV_NOVATEK_URL = (
    EUREKA_PARTNER_HOST_URL + 'partner/manifest_cast_tv_novatek'
)
CAST_TV_COMCAST_URL = EUREKA_PARTNER_HOST_URL + 'partner/manifest_comcast'
ABBEY_MANIFEST_URL = EUREKA_INTERNAL_HOST_URL + 'abbey/manifest'
PARTNER_QUALCOMM_MANIFEST_URL = EUREKA_PARTNER_HOST_URL + 'qualcomm/manifest'
PARTNER_QUALCOMM_MANIFEST_OTA_URL = (
    EUREKA_PARTNER_HOST_URL + 'qualcomm/manifest_ota'
)

# Path and remote recorded in the lookup tables for the manifest project.
REPO_MANIFEST_PROJECT_PATH = '.repo/manifests'
REPO_MANIFEST_PROJECT_REMOTE = 'origin'

# Manifest URL -> name of the project hosting that manifest.
REPO_MANIFEST_URL_TO_PROJECT_NAME = {
    EUREKA_MANIFEST_URL: EUREKA_MANIFEST_PROJECT_NAME,
    LIBASSISTANT_MANIFEST_URL: LIBASSISTANT_MANIFEST_PROJECT_NAME,
}

# Default file that override_manifest_command() links manifest.xml to.
MANIFEST_OVERRIDE_XML = 'manifest_contour.xml'

# Manifest URL -> short alias returned by manifest_alias().
REPO_MANIFEST_URL_TO_REPO_ALIAS = {
    ABBEY_MANIFEST_URL: 'abbey',
    CAST_ASSISTANT_REFERENCE_MANIFEST_URL: 'cast_assistant_reference',
    CAST_AUDIO_REFERENCE_MANIFEST_URL: 'cast_audio_reference',
    CAST_AUDIO_STANDARD_MANIFEST_URL: 'cast_audio_new',
    CAST_TV_COMCAST_URL: 'cast_tv_comcast',
    CAST_TV_MANIFEST_URL: 'cast_tv',
    CAST_TV_MARVELL_MANIFEST_URL: 'cast_tv_marvell',
    CAST_TV_MEDIATEK_MANIFEST_URL: 'cast_tv_mediatek',
    CAST_TV_MSTAR_MANIFEST_URL: 'cast_tv_mstar',
    CAST_TV_NOVATEK_URL: 'cast_tv_novatek',
    EUREKA_MANIFEST_URL: 'internal',
    GOTHAM_MCU_MANIFEST_URL: 'gotham_mcu',
    LIBASSISTANT_MANIFEST_URL: 'libassistant',
    PARTNER_AMLOGIC_MANIFEST_URL: 'partner_amlogic',
    PARTNER_MANIFEST_URL: 'partner',
    PARTNER_QUALCOMM_MANIFEST_URL: 'partner_qualcomm',
    PARTNER_QUALCOMM_MANIFEST_OTA_URL: 'partner_qualcomm_ota',
    CATABUILDER_E2E_TESTS_MANIFEST_URL: 'e2e_tests',
}
class DuplicateLookupError(Exception):
    """Error if the project was included more than once.

    Raised by the project lookup table when a (project, revision) key was
    assigned more than once, or when a project is queried without a revision
    while several revisions of it are stored.
    """
class ManifestParseError(Exception):
    """Error if unable to parse output of 'repo manifest' command.

    Raised by get_manifest() once every attempt to parse the command output
    as XML has failed.
    """
class _ProjectLookupTable(object):
    """A lookup table for projects, optionally qualified by revision.

    Values are stored in a two-level dict ``{project: {revision: value}}``.
    A key given without a revision is stored under ``DEFAULT_REVISION``.
    Assigning the same (project, revision) more than once through item
    assignment marks the key as a duplicate, and later lookups of that key
    raise ``DuplicateLookupError``. Assignments made through ``update()``
    overwrite existing entries without marking them.
    """

    def __init__(self):
        """Create an empty ProjectLookupTable."""
        self._lookup_dict = {}
        self._duplicate_keys = set()
        # True only while update() runs; suppresses duplicate tracking.
        self._update = False

    def __getitem__(self, key):
        """Get the value for the given key.

        Args:
          key: The key to get a value for: project or (project, revision).

        Returns:
          The value for the given key.

        Raises:
          KeyError: If the key is not included.
          DuplicateLookupError: If the key was included multiple times.
        """
        return self.get(key)

    def __setitem__(self, key, value):
        """Sets the value for the given key.

        Args:
          key: (project, revision) or project.
          value: The value.
        """
        project, revision = self._split_key(key)
        revisions = self._lookup_dict.setdefault(project, {})
        # Test for presence of the key, not truthiness of the stored value, so
        # that a repeated entry holding e.g. '' is still seen as a duplicate.
        if revision in revisions and not self._update:
            self._duplicate_keys.add((project, revision))
        revisions[revision] = value

    def __contains__(self, key):
        """Returns True if key can be resolved to a stored entry."""
        try:
            self.get(key)
        except DuplicateLookupError:
            # The key exists, it is merely ambiguous.
            return True
        except KeyError:
            return False
        # Reaching here means the key resolved, whatever the value is.
        return True

    @staticmethod
    def _split_key(key):
        """Splits a lookup key into (project, revision)."""
        if isinstance(key, tuple):
            project, revision = key
            return project, revision
        return key, DEFAULT_REVISION

    def _resolve_project(self, project):
        """Returns the stored name matching project, or None if there is none.

        Tries the exact name, then the name without a trailing '/', then the
        name with a '.git' suffix.
        """
        if project in self._lookup_dict:
            return project
        if project.endswith('/') and project[:-1] in self._lookup_dict:
            return project[:-1]
        # For legacy reasons, some project names in the manifest include a
        # '.git' suffix. Gerrit strips off this suffix, though, so if we
        # haven't found the key yet, try searching with a .git suffix.
        if project + '.git' in self._lookup_dict:
            return project + '.git'
        return None

    def get(self, key):
        """Get the value for the given key.

        Args:
          key: The key to get a value for: project or (project, revision).

        Returns:
          The value for the given key.

        Raises:
          KeyError: If the key is not included.
          DuplicateLookupError: If the key was included multiple times.
        """
        revision_unspecified = not isinstance(key, tuple)
        project, revision = self._split_key(key)

        stored_name = self._resolve_project(project)
        if stored_name is None:
            raise KeyError(
                '{} was not found in the project path lookup {}.'.format(
                    project, self._lookup_dict))
        # Check duplicates against the stored name so that 'foo/' or a name
        # missing its '.git' suffix cannot bypass the duplicate detection.
        if (stored_name, revision) in self._duplicate_keys:
            raise DuplicateLookupError(
                f'Duplicate of {(project, revision)} found in manifest. '
                'If the same repository has been pulled more than once, '
                'please explicitly specify the revision in the manifest.')

        revisions = self._lookup_dict[stored_name]
        assert revisions
        if revision_unspecified:
            if len(revisions) == 1:
                return next(iter(revisions.values()))
            raise DuplicateLookupError(
                f'There are multiple revisions of {project} being included in '
                f'this manifest ({", ".join(map(str, revisions))}). Please '
                f'query the table with tuple ({project}, revision)')
        if revision in revisions:
            return revisions[revision]
        raise KeyError(
            f'{revision} was not found in the path lookup of {project}.'
            f' Available revisions: {", ".join(map(str, revisions))}')

    def get_dict(self):
        """Gets the dictionary representing the lookup table.

        Returns:
          The underlying dict storage of this object.
        """
        return self._lookup_dict

    def update(self, mapping: Mapping[Any, str]):
        """Stores every item of mapping, overwriting existing entries.

        Overwrites made here are not recorded as duplicates.

        Args:
          mapping: Keys (project or (project, revision)) mapped to values.
        """
        self._update = True
        try:
            for k, v in mapping.items():
                self[k] = v
        finally:
            self._update = False
def get_project_path_lookup(executor, manifest_url, manifest=None, **kwargs):
    """Builds the project -> path lookup table for a checkout.

    The manifest argument is used as the manifest content when it is given;
    when it is empty or None, `repo manifest` is invoked through get_manifest
    to obtain it.

    Args:
      executor: Executes the subprocess.
      manifest_url: The url where the repo manifest project is hosted.
      manifest: Content of manifest file already loaded by build.
      **kwargs: Any additional kwargs to pass to the executor.

    Returns:
      The lookup table produced by parse_manifest_for_path.
    """
    manifest_content = manifest or get_manifest(executor, **kwargs)
    return parse_manifest_for_path(manifest_content, manifest_url)
@contextlib.contextmanager
def normalize_manifest_if_exists(manifest_path):
    """Normalizes the manifest for the duration of the context manager.

    When manifest_path is truthy, the file is backed up to a temporary file,
    normalized in place, and restored from the backup when the context exits
    (also when normalization or the managed block raises). When manifest_path
    is falsy the context manager does nothing.

    Args:
      manifest_path: Path of the manifest file to normalize, or a falsy value.

    Yields:
      None.
    """
    if not manifest_path:
        yield
        return

    # delete=False must be the boolean: the backup has to outlive this handle.
    with tempfile.NamedTemporaryFile(delete=False) as backup_file:
        backup_path = backup_file.name
    try:
        shutil.copy2(manifest_path, backup_path)
        # Restore only once the backup holds the original content; otherwise
        # a failed backup would overwrite the manifest with an empty file.
        try:
            _normalize_manifest(manifest_path)
            yield
        finally:
            shutil.copy2(backup_path, manifest_path)
    finally:
        os.unlink(backup_path)
def _normalize_manifest(manifest_path):
    """Convert manifest to a more user friendly version.

    Rewrites the file in place: the first remote whose fetch URL contains
    'github.googlesource.com' is pointed at 'https://github.com' and loses
    its 'review' attribute, and the document is re-serialized as
    pretty-printed UTF-8 XML.

    Args:
      manifest_path: Path of the manifest file to rewrite.
    """
    with open(manifest_path, 'r', encoding='utf-8') as f:
        # Strip only the outer whitespace: joining stripped lines would fuse
        # the attributes of a tag that is wrapped across several lines.
        root = et.fromstring(f.read().strip())

    # Drop layout whitespace from text nodes so that toprettyxml() does not
    # emit blank lines for it.
    for element in root.iter():
        element.text = _strip_layout_whitespace(element.text)
        element.tail = _strip_layout_whitespace(element.tail)

    for remote in root.findall('remote'):
        # A remote is not required to carry a fetch attribute.
        if 'github.googlesource.com' in remote.get('fetch', ''):
            remote.set('fetch', 'https://github.com')
            remote.attrib.pop('review', None)
            break

    normalized_manifest = minidom.parseString(
        et.tostring(root)).toprettyxml(encoding='UTF-8', indent=' ')
    with open(manifest_path, 'wb') as f:
        f.write(normalized_manifest)


def _strip_layout_whitespace(text):
    """Strips every line of text and joins them; returns None if empty."""
    if text is None:
        return None
    return ''.join(line.strip() for line in text.splitlines()) or None
def override_manifest_command(override_path=None):
    """Builds the `ln` command that points manifest.xml at override_path.

    Args:
      override_path: Link target. When None, MANIFEST_OVERRIDE_XML in the
        parent directory is used.

    Returns:
      The command as a list of arguments.
    """
    link_target = (
        os.path.join(os.pardir, MANIFEST_OVERRIDE_XML)
        if override_path is None else override_path)
    return ['ln', '-sf', link_target, 'manifest.xml']
def override_manifest(executor, override_path=None):
    """Runs the manifest override command inside the '.repo' directory.

    Args:
      executor: Executes the subprocess.
      override_path: Link target passed to override_manifest_command.

    Returns:
      Whatever executor.exec_subprocess returns.
    """
    link_command = override_manifest_command(override_path)
    return executor.exec_subprocess(link_command, cwd='.repo')
def get_project_remote_lookup(executor, manifest_url, manifest=None, **kwargs):
    """Builds the project -> remote lookup table for a checkout.

    The manifest argument is used as the manifest content when it is given;
    when it is empty or None, `repo manifest` is invoked through get_manifest
    to obtain it.

    Args:
      executor: Executes the subprocess.
      manifest_url: The url where the repo manifest project is hosted.
      manifest: Content of manifest file already loaded by build.
      **kwargs: Any additional kwargs to pass to the executor.

    Returns:
      The lookup table produced by parse_manifest_for_remote.
    """
    manifest_content = manifest or get_manifest(executor, **kwargs)
    return parse_manifest_for_remote(manifest_content, manifest_url)
def get_project_revision_lookup(executor, manifest_url, manifest_branch,
                                manifest=None, **kwargs):
    """Builds the project -> revision lookup table for a checkout.

    The manifest argument is used as the manifest content when it is given;
    when it is empty or None, `repo manifest` is invoked through get_manifest
    to obtain it.

    Args:
      executor: Executes the subprocess.
      manifest_url: The url where the repo manifest project is hosted.
      manifest_branch: The repo manifest branch of the current checkout.
      manifest: Content of manifest file already loaded by build.
      **kwargs: Any additional kwargs to pass to the executor.

    Returns:
      The lookup table produced by parse_manifest_for_revision.
    """
    manifest_content = manifest or get_manifest(executor, **kwargs)
    return parse_manifest_for_revision(
        manifest_content, manifest_url, manifest_branch)
def get_manifest(executor, retries=5, **kwargs):
    """Runs `repo manifest` and returns its output once it parses as XML.

    Every line of the output is stripped and the lines are joined without a
    separator; that joined string is what gets validated and returned.

    Args:
      executor: Executes the subprocess.
      retries: number of retries to load manifest.
      **kwargs: Any additional kwargs to pass to the executor.

    Raises:
      ManifestParseError: if all retries fail.

    Returns:
      A string representing the manifest file.
    """
    failed_attempts = 0
    while failed_attempts < retries:
        raw_manifest = executor.exec_subprocess(
            ['repo', 'manifest'], check_output=True, **kwargs)
        try:
            flattened = ''.join(
                line.strip() for line in raw_manifest.splitlines())
            # Parsed only to validate; the tree itself is discarded.
            et.fromstring(flattened)
        except et.ParseError as error:
            failed_attempts += 1
            logging.warning(
                ('Failed to parse manifest file. Attempt %d/%d. '
                 'Error was: %s\nManifest file content was:\n%s'),
                failed_attempts, retries, error, raw_manifest)
        else:
            return flattened
    raise ManifestParseError(
        'Failed to parse manifest after %s retries' % failed_attempts)
def is_manifest_project(project):
    """Tells whether project is one of the known manifest projects.

    Args:
      project: The name of the Gerrit project in which the files live.

    Returns:
      True if project is a value of REPO_MANIFEST_URL_TO_PROJECT_NAME,
      False otherwise.
    """
    known_manifest_projects = REPO_MANIFEST_URL_TO_PROJECT_NAME.values()
    return project in known_manifest_projects
def parse_manifest_for_path(manifest, manifest_url):
    """Parses a repo manifest file for project paths.

    Each <project> is stored under (name, revision), where revision is its
    'upstream' attribute or, failing that, its 'revision' attribute; projects
    with neither are stored under the bare name. The stored value is the
    'path' attribute, falling back to the project name.

    Args:
      manifest: A string representing a manifest file.
      manifest_url: The url where the repo manifest project is hosted.

    Returns:
      A ProjectLookupTable representing the manifest for the given project.
    """
    assert isinstance(manifest, six.string_types)
    result = _ProjectLookupTable()

    # Add an entry to this map representing the repo manifest project.
    manifest_project_name = get_manifest_url_mapping(
        REPO_MANIFEST_URL_TO_PROJECT_NAME, manifest_url)
    if manifest_project_name:
        result[manifest_project_name] = REPO_MANIFEST_PROJECT_PATH
    if not manifest:
        return result

    # Strip only the outer whitespace (the XML declaration must come first).
    # Joining stripped lines would fuse the attributes of a tag that is
    # wrapped across several lines and make the document unparseable.
    root = et.fromstring(manifest.strip())
    for project in root.findall('project'):
        name = project.get('name')
        # TODO(b/358371464): This only covers the explicit specified revision.
        # This is just a workaround. Should cover all the cases one day.
        revision = project.get('upstream') or project.get('revision')
        path = project.get('path') or name
        key = (name, revision) if revision else name
        result[key] = path
    return result
def parse_manifest_for_remote(manifest, manifest_url):
    """Parses a repo manifest file for project remotes.

    Each <project> is stored under its name. The stored value is its 'remote'
    attribute, falling back to the 'remote' of the <default> element and then
    to DEFAULT_REMOTE.

    Args:
      manifest: A string representing a manifest file.
      manifest_url: The url where the repo manifest project is hosted.

    Returns:
      A ProjectLookupTable representing the manifest for the given project.
    """
    assert isinstance(manifest, six.string_types)
    result = _ProjectLookupTable()

    # Add an entry to this map representing the repo manifest project.
    manifest_project_name = get_manifest_url_mapping(
        REPO_MANIFEST_URL_TO_PROJECT_NAME, manifest_url)
    if manifest_project_name:
        result[manifest_project_name] = REPO_MANIFEST_PROJECT_REMOTE
    if not manifest:
        return result

    # Strip only the outer whitespace (the XML declaration must come first).
    # Joining stripped lines would fuse the attributes of a tag that is
    # wrapped across several lines and make the document unparseable.
    root = et.fromstring(manifest.strip())

    default_remote = DEFAULT_REMOTE
    default_tag = root.find('default')
    if default_tag is not None and default_tag.get('remote') is not None:
        default_remote = default_tag.get('remote')

    for project in root.findall('project'):
        name = project.get('name')
        result[name] = project.get('remote') or default_remote
    return result
def parse_manifest_for_revision(manifest, manifest_url, manifest_branch):
    """Parses a repo manifest file for project revisions.

    Each <project> is stored under (name, revision), or under the bare name
    when no revision is known; the stored value is the revision itself. A
    project without a 'revision' attribute takes the revision of the
    <default> element (the last one, if there are several).

    Args:
      manifest: A string representing a manifest file.
      manifest_url: The url where the repo manifest project is hosted.
      manifest_branch: The repo manifest branch of the current checkout.

    Returns:
      A ProjectLookupTable representing the manifest for the given project.
    """
    assert isinstance(manifest, six.string_types)
    result = _ProjectLookupTable()

    # The manifest project itself is tracked at the manifest branch.
    manifest_project_name = get_manifest_url_mapping(
        REPO_MANIFEST_URL_TO_PROJECT_NAME, manifest_url)
    if manifest_project_name:
        result[manifest_project_name] = manifest_branch
    if not manifest:
        return result

    # Strip only the outer whitespace (the XML declaration must come first).
    # Joining stripped lines would fuse the attributes of a tag that is
    # wrapped across several lines and make the document unparseable.
    root = et.fromstring(manifest.strip())

    # Get the default revision from the manifest.
    default_revision = ''
    for default_tag in root.findall('default'):
        default_revision = default_tag.get('revision', '')

    # Create the lookup. If an entry does not explicitly reference a revision,
    # assign the default revision.
    for project in root.findall('project'):
        name = project.get('name')
        revision = project.get('revision') or default_revision
        key = (name, revision) if revision else name
        result[key] = revision
    return result
def requires_repo_sync(project, filenames):
    """Decides whether touched files call for another 'repo sync'.

    If a change is applied to a repo manifest file (for example, to update
    the revision of a project), we may need to run 'repo sync' again before
    building, to ensure the project is checked out at the SHA we are trying
    to test.

    Args:
      project: The name of the Gerrit project in which the files live.
      filenames: A list of files that were modified in project.

    Returns:
      True if project is a manifest project and at least one file was
      touched, False otherwise.
    """
    # Only manifest projects matter, and then any touched file is enough.
    return is_manifest_project(project) and bool(filenames)
def get_manifest_url_mapping(mapping, manifest_url):
    """Looks up manifest_url in mapping regardless of GoB domain name.

    Any '.git.corp.google.com' in the url is rewritten to
    '.googlesource.com' before the lookup.

    Args:
      mapping: A dict keyed by manifest url.
      manifest_url: The url to look up.

    Returns:
      The mapped value, or None if the url is not in mapping.
    """
    canonical_url = manifest_url.replace(
        '.git.corp.google.com', '.googlesource.com')
    return mapping.get(canonical_url)
def use_eureka_manifest(manifest_url):
    """Returns True if this manifest points to eureka/manifest.

    Args:
      manifest_url: The url where the repo manifest project is hosted.

    Returns:
      True if the url maps to EUREKA_MANIFEST_PROJECT_NAME, False otherwise
      (including for urls with no known manifest project).
    """
    project_name = get_manifest_url_mapping(
        REPO_MANIFEST_URL_TO_PROJECT_NAME, manifest_url)
    # A plain comparison always yields a bool; `name and name == ...` leaked
    # None for unknown urls.
    return project_name == EUREKA_MANIFEST_PROJECT_NAME
def manifest_alias(manifest_url):
    """Returns the alias registered for manifest_url, or None if unknown.

    Args:
      manifest_url: The url where the repo manifest project is hosted.

    Returns:
      The value from REPO_MANIFEST_URL_TO_REPO_ALIAS for the url.
    """
    alias = get_manifest_url_mapping(
        REPO_MANIFEST_URL_TO_REPO_ALIAS, manifest_url)
    return alias