Skip to content

Modules Reference

runners.autopkg_tools

process_recipe(recipe_path, git_repo_root, munki_subdir, gh_repo, token, settings, autopkg_prefs) async

Run recipe in isolated worktree, commit, push, create PR.

Source code in lambopkg/runners/autopkg_tools.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
async def process_recipe(
    recipe_path: Path,
    git_repo_root: Path,
    munki_subdir: str,
    gh_repo: str,
    token: str,
    settings: Settings,
    autopkg_prefs: AutoPkgPrefs,
) -> None:
    """Run recipe in isolated worktree, commit, push, create PR.

    Args:
        recipe_path: AutoPkg recipe file; its stem names the branch and logs.
        git_repo_root: Root of the git checkout a worktree is created from.
        munki_subdir: Subdirectory of the repo holding the munki repo.
        gh_repo: "owner/name" GitHub repository for the pull request.
        token: GitHub token used to authenticate PR creation.
        settings: Runner settings; ``report_dir`` is passed to the recipe run.
        autopkg_prefs: Base AutoPkg preferences; cloned and repointed at the
            worktree so the base checkout is never touched.

    Returns None in all cases; a failed recipe run is logged, not raised.
    """
    logger = logging_config.get_logger(__name__)
    recipe_name = recipe_path.stem

    logger.info("Processing %s", recipe_name)

    now = datetime.now(timezone.utc)
    # Timestamped names keep concurrent runs of the same recipe from colliding.
    branch = f"autopkg-{recipe_name.replace(' ', '-')}-{now:%Y%m%d%H%M%S}"
    worktree_path = git_repo_root.parent / f"worktree-{recipe_name}-{now:%Y%m%d%H%M%S}"

    # Clone prefs and point at worktree's munki subdir
    prefs = autopkg_prefs.clone()
    prefs.munki_repo = worktree_path / munki_subdir

    base_repo = Repo(git_repo_root)

    with worktree(base_repo, worktree_path, branch) as wt_repo:
        # Run recipe with prefs pointing to worktree
        with prefs:
            try:
                results = await Recipe(recipe_path, settings.report_dir, prefs).run()
                logger.debug("AutoPkg recipe run results: %s", results)
                logger.info("Recipe run %s complete", recipe_name)
            except Exception:
                # One failed recipe should not abort the whole batch.
                logger.exception("Recipe %s failed", recipe_name)
                return

        if not results["munki_imported_items"]:
            logger.info("No changes for %s", recipe_name)
            return

        # Stage metadata files for git (packages are gitignored, synced to S3 separately)
        munki_repo_path = str(prefs.munki_repo)
        for item in results["munki_imported_items"]:
            files = [f"{munki_repo_path}/pkgsinfo/{item.get('pkginfo_path')}"]
            if item.get("icon_repo_path"):
                files.append(f"{munki_repo_path}/icons/{item.get('icon_repo_path')}")
            wt_repo.index.add(files)

        # Catch any untracked icons (some recipes produce icons without setting icon_repo_path)
        icons_dir = f"{munki_repo_path}/icons"
        if os.path.isdir(icons_dir):
            wt_repo.index.add([icons_dir])

        # Commit and push
        name = results["munki_imported_items"][0]["name"]
        version = results["munki_imported_items"][0]["version"]
        commit_msg = f"AutoPkg {name} {version}"

        wt_repo.index.commit(commit_msg)
        wt_repo.remote("origin").push(refspec=f"{branch}:{branch}")
        logger.info("Pushed branch %s", branch)

        # Create PR via PyGithub. Close the client even when create_pull
        # raises — the original leaked the connection on failure.
        gh = Github(auth=Auth.Token(token))
        try:
            pr = gh.get_repo(gh_repo).create_pull(
                title=f"AutoPkg: {name} {version}",
                body=f"Automated update for `{name}` version `{version}`.",
                head=branch,
                base="main",
            )
            logger.info("Created PR: %s", pr.html_url)
        finally:
            gh.close()

worktree(repo, path, branch)

Create git worktree for isolated recipe processing.

Source code in lambopkg/runners/autopkg_tools.py
20
21
22
23
24
25
26
27
28
29
@contextmanager
def worktree(repo: Repo, path: Path, branch: str):
    """Yield a Repo opened on a temporary git worktree for *branch*.

    The branch is created from the current HEAD and checked out into a
    worktree at *path*; on context exit the worktree is force-removed and
    pruned regardless of whether the body raised.
    """
    worktree_dir = str(path)
    repo.create_head(branch)
    repo.git.worktree("add", worktree_dir, branch)
    try:
        yield Repo(worktree_dir)
    finally:
        repo.git.worktree("remove", worktree_dir, "--force")
        repo.git.worktree("prune")

runners.autopromote

get_channel_multiplier(plist)

Retrieve the float multiplier for plist's channel. Returns multiplier or 1

Source code in lambopkg/runners/autopromote.py
241
242
243
244
245
246
247
248
249
250
251
252
def get_channel_multiplier(plist):
    """Retrieve the float multiplier for plist's channel. Returns multiplier or 1"""

    metadata = plist.get("_metadata", {})
    channel = metadata.get("channel")
    if channel is None:
        return 1.0

    configured = CONFIG.get("channels", {}).get(channel)
    # Missing, non-numeric, or zero values all fall back to the neutral 1.0.
    if not isinstance(configured, (int, float)) or configured == 0:
        return 1.0

    return float(configured)

get_force_install_days(catalog)

Returns the number of days a package should live in a catalog, as configured

Source code in lambopkg/runners/autopromote.py
194
195
196
197
198
199
200
201
def get_force_install_days(catalog):
    """Returns the number of days a package should live in a catalog, as configured"""

    configured = CONFIG["catalogs"].get(catalog, {}).get("force_install_days")
    # Anything non-integer (including a missing value) uses the global default.
    return configured if isinstance(configured, int) else CONFIG["force_install_days"]

get_force_install_time(plist)

Returns a force install datetime shifted to match the configured force_install_time

Source code in lambopkg/runners/autopromote.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def get_force_install_time(plist):
    """Returns a force install datetime shifted to match the configured force_install_time"""

    configured_time = CONFIG["force_install_time"]
    current = arrow.get(plist["force_install_after_date"])
    shifted = current.shift(
        hours=int(configured_time["hour"] or 0) - current.hour,
        minutes=int(configured_time["minute"] or 0) - current.minute,
    )

    # Optionally move the date onto the configured patch weekday.
    patch_day = CONFIG.get("patch_tuesday")
    if isinstance(patch_day, int) and 0 <= patch_day <= 6:
        shifted = shifted.shift(weekday=patch_day)

    return shifted.datetime

get_ideal_catalogs(catalogs)

Given a list of catalogs, returns the catalog which appears last in CONFIG['catalog_order'] and the list of catalogs leading up to that catalog

Source code in lambopkg/runners/autopromote.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def get_ideal_catalogs(catalogs):
    """
    Given a list of catalogs, returns the catalog which appears last
    in CONFIG['catalog_order'] and the list of catalogs leading up to that catalog
    """

    order = CONFIG["catalog_order"]
    # Catalogs outside the configured schedule are preserved at the end.
    custom_catalogs = [c for c in catalogs if c not in order]
    config_catalogs = [c for c in order if c in catalogs]
    latest_catalog = config_catalogs[-1] if config_catalogs else None

    if not latest_catalog:
        # Nothing from the configured order is present; leave the list as-is.
        return latest_catalog, catalogs

    # Every configured catalog up to and including the latest one present,
    # followed by the unconfigured extras.
    prefix = order[: order.index(latest_catalog) + 1]
    return latest_catalog, prefix + custom_catalogs

get_next_catalog(latest_catalog)

Returns the next catalog configured in the promotion schedule

Source code in lambopkg/runners/autopromote.py
228
229
230
231
232
233
234
235
236
237
238
def get_next_catalog(latest_catalog):
    """Returns the next catalog configured in the promotion schedule"""

    order = CONFIG["catalog_order"]
    try:
        position = order.index(latest_catalog)
    except ValueError:
        # Catalog is not part of the configured schedule at all.
        return None

    # The successor catalog, or None when this is already the last one.
    return order[position + 1] if position + 1 < len(order) else None

get_pkgs(root)

Returns a list of pkginfo paths given a root directory.

Source code in lambopkg/runners/autopromote.py
122
123
124
125
126
127
128
129
def get_pkgs(root):
    """Returns a list of pkginfo paths given a root directory."""

    # Flatten the directory walk: one entry per file found anywhere under root.
    return [
        os.path.join(directory, pkginfo)
        for directory, _subdirs, pkginfos in os.walk(root)
        for pkginfo in pkginfos
    ]

get_previous_pkg(current)

Returns the previous version of package in PKGINFOS_PATHS

Source code in lambopkg/runners/autopromote.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def get_previous_pkg(current):
    """Returns the previous version of package in PKGINFOS_PATHS.

    "Previous" means the highest-versioned pkginfo sharing *current*'s name
    with a version strictly lower than *current*'s. The original
    implementation seeded its candidate with the first differing version it
    saw — including versions *newer* than current — and could then never
    replace it, so it sometimes reported a newer package as "previous".
    Candidates are now restricted to strictly older releases.
    """

    last = None
    current_version = pkg_version(current)
    for plist, _pkginfo in PKGINFOS_PATHS:
        if plist["name"] != current["name"] or plist["version"] == current["version"]:
            continue
        plist_version = pkg_version(plist)
        # Only strictly-older releases are candidates for "previous".
        if plist_version >= current_version:
            continue
        # Keep the newest of the older releases (>= matches the original's
        # preference for later-seen entries on version ties).
        if last is None or plist_version >= pkg_version(last):
            last = plist
    if last:
        logger.debug(
            f"Determined that previous version of {current['name']} {current['version']} is {last['name']} {last['version']}"
        )
    else:
        logger.warning(f"found no previous packages for {current['name']}")

    return last

load_config()

Reads autopromote.json from hardcoded path CONFIG_FILE

Source code in lambopkg/runners/autopromote.py
87
88
89
90
91
92
93
94
95
def load_config():
    """Reads autopromote.json from hardcoded path CONFIG_FILE"""

    with open(CONFIG_FILE) as config_file:
        config = json.load(config_file)

    # Normalize catalogs into promotion order, then expand deny/allow lists.
    config["catalogs"], config["catalog_order"] = order_catalogs(config["catalogs"])
    return load_deny_and_allow_lists(config)

load_logger(logfile)

Returns logger object pointing to stdout or a file, as configured

Source code in lambopkg/runners/autopromote.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def load_logger(logfile):
    """Returns logger object pointing to stdout or a file, as configured.

    Args:
        logfile: "stdout" for console output; any other value is treated as
            a path for a rotating file handler (1 MB per file, 10 backups).

    logging.getLogger("autopromote") returns the same object on every call,
    so previously-attached handlers are cleared first; the original appended
    a new handler per call, duplicating every log line on repeat calls.
    """

    logger = logging.getLogger("autopromote")
    level = logging.DEBUG if DEBUG else logging.INFO

    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

    if logfile == "stdout":
        handler = StreamHandler(sys.stdout)
    else:
        handler = RotatingFileHandler(logfile, maxBytes=1000000, backupCount=10)

    handler.setFormatter(formatter)
    # Drop stale handlers so each call configures exactly one destination.
    logger.handlers.clear()
    logger.addHandler(handler)
    logger.setLevel(level)
    return logger

notify_slack(promotions, error)

Given a list of results from promote_pkgs, send a slack alert with a summary

Source code in lambopkg/runners/autopromote.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
def notify_slack(promotions, error):
    """
    Given a list of results from promote_pkgs, send a slack alert with a summary
    """

    # Guard first: the original built the SSL context and WebClient before
    # checking the token, doing needless setup work on the skip path.
    token = os.environ.get("SLACK_TOKEN")
    if not token:
        logger.error("No SLACK_TOKEN is in environment, skipping slack output")
        return

    # Generate our Slack WebClient
    sslcert = SSLContext(PROTOCOL_TLS_CLIENT)
    sslcert.load_verify_locations(certifi.where())
    client = WebClient(token=token, ssl=sslcert)

    # Build out the Slack message attachment showing what was promoted
    attachments = {
        "fields": [
            {"title": pkg, "value": f"{result['from']} => {result['to']}"} for pkg, result in promotions.items()
        ],
        "color": "danger" if error else "good",
        "title": "Autopromotion run completed",
        "text": "" if promotions else "No packages promoted" if not error else f"Error: {error}",
        "footer": "Alerts #withGusto",
    }
    logger.debug(promotions)
    logger.debug(attachments)
    # Actually send the Slack message
    try:
        client.chat_postMessage(
            channel=CONFIG.get("slack_channel", "#test-please-ignore"),
            text="new autopromote.py run complete",
            username="munki autopromoter",
            icon_emoji=":munki:",
            attachments=[attachments],
        )
    except SlackApiError as e:
        logger.exception(f"Slack error: {e.response['error']}")

order_catalogs(catalogs)

Takes a list of catalogs and returns a dict ordered according to the configured catalog schedule.

Source code in lambopkg/runners/autopromote.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def order_catalogs(catalogs):
    """
    Takes a list of catalogs and returns a dict ordered according to the
    configured catalog schedule.
    """

    ordered = OrderedDict()
    resolved = []
    pending = catalogs.keys()

    # Sweep repeatedly, placing any catalog whose "next" pointer can already
    # be resolved; everything else is retried on the following sweep.
    # NOTE(review): a catalog whose "next" never resolves would loop forever
    # — assumes the config always forms valid chains; confirm upstream.
    while pending:
        deferred = []
        for name in pending:
            successor = catalogs[name]["next"]

            if successor is None:
                # Terminal catalog: slot in just before the current tail.
                resolved.insert(-1, name)
            elif successor in resolved:
                # Place immediately before the successor (clamped to front).
                slot = resolved.index(successor) - 1
                resolved.insert(max(slot, 0), name)
            else:
                deferred.append(name)

        pending = deferred.copy()

    for name in resolved:
        ordered[name] = catalogs[name].copy()

    return ordered, resolved

output_results(promotions, error)

Given a list of results from promote_pkgs, write a file to disk

Source code in lambopkg/runners/autopromote.py
432
433
434
435
436
437
438
439
440
441
442
443
def output_results(promotions, error):
    """
    Given a list of results from promote_pkgs, write a file to disk
    """

    destination = CONFIG.get("output_results_path", "results.plist")
    # On failure the error replaces the promotions payload entirely.
    payload = error if error else promotions

    with open(destination, "wb") as out:
        plistlib.dump(payload, out)

pkg_version(plist)

Returns parsed semantic version from plist

Source code in lambopkg/runners/autopromote.py
132
133
134
135
def pkg_version(plist):
    """Returns parsed semantic version from plist"""

    raw_version = plist["version"]
    return semantic_version.parse(raw_version)

promote_pkg(current_plist, path)

Given a pkginfo plist, parse its catalogs, apply a new catalog (promotion) and shift force_install_after_date if necessary.

Returns a boolean promoted and a dict results

Source code in lambopkg/runners/autopromote.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
def promote_pkg(current_plist, path):  # noqa: C901
    """
    Given a pkginfo plist, parse its catalogs, apply a new catalog (promotion)
    and shift force_install_after_date if necessary.

    Returns a boolean promoted and a dict results
    """

    name = current_plist["name"]
    version = current_plist["version"]
    catalogs = current_plist["catalogs"]
    fullname = f"{name} {version}"
    # Work on a copy; the caller persists result["plist"] back to disk.
    plist = current_plist.copy()

    promoted = False
    result = {"plist": plist, "from": None, "to": None, "fullname": fullname}

    logger.info(f"Considering package {fullname}")

    if not permitted(name, version):
        return promoted, result

    if (
        CONFIG["enforce_force_install_time"]
        and CONFIG.get("force_install_time")
        and plist.get("force_install_after_date")
    ):
        plist["force_install_after_date"] = get_force_install_time(plist)

    latest_catalog, ideal_catalogs = get_ideal_catalogs(catalogs)
    plist["catalogs"] = ideal_catalogs

    logger.debug(f"Package {fullname} has a catalog of {latest_catalog}")
    promotion_period = CONFIG["catalogs"].get(latest_catalog, {}).get("days")
    logger.debug(f"Promotion period for package {fullname} is {promotion_period}")

    if promotion_period is None:
        logger.debug(f"No defined promotion period for {latest_catalog} catalog, skipping")
        return promoted, result

    last_promoted = plist.get("_metadata", {}).get("last_promoted")
    if last_promoted is None:
        logger.debug(f"Package {fullname} has no last_promoted value!")

        # Is newly imported package
        if latest_catalog == CONFIG["catalog_order"][0]:
            # .get chain here: the bare plist["_metadata"] in the original
            # raised KeyError for plists lacking _metadata, even though the
            # last_promoted lookup above tolerated the missing key.
            last_promoted = plist.get("_metadata", {}).get("creation_date")

            previous_pkg = get_previous_pkg(plist)

            if previous_pkg:
                for key in CONFIG["fields_to_copy"]:
                    # Only copy the previous field if the new plist does not contain a conflicting value
                    if previous_pkg.get(key) and not plist.get(key):
                        plist[key] = previous_pkg[key]
            else:
                logger.info(f"No previous package found for {fullname}!")

    last_promoted = arrow.get(last_promoted) if last_promoted else None

    if last_promoted is None:
        promotion_due = False
    else:
        channel_shifted = promotion_period * get_channel_multiplier(plist)
        logger.debug(f"Channel-shifted promotion period for {fullname} is {channel_shifted}")

        since_last_promotion = arrow.now() - last_promoted
        days_since_last_promotion = since_last_promotion.days + since_last_promotion.seconds / SECONDS_IN_DAY
        logger.debug(f"{fullname} was last promoted {days_since_last_promotion} days ago")

        promotion_due = days_since_last_promotion >= channel_shifted

    if not promotion_due:
        return promoted, result

    next_catalog = CONFIG["catalogs"][latest_catalog]["next"]
    if next_catalog is None:
        if promotion_period is not None:
            # Misconfiguration: a promotion period implies a destination
            # catalog. (Message corrected — the original stated the
            # relationship backwards.)
            msg = "Cannot define a promotion period without a next catalog."
            raise ValueError(msg)
        return promoted, result

    plist["catalogs"].append(next_catalog)
    promoted = True
    result["pkginfo"] = path
    result["from"] = latest_catalog
    result["to"] = next_catalog
    # setdefault keeps this safe for plists that arrived without _metadata.
    plist.setdefault("_metadata", {})["last_promoted"] = arrow.now().datetime
    if CONFIG.get("enforce_force_install_date") and name not in CONFIG.get("force_install_denylist", []):
        plist["force_install_after_date"] = arrow.now().shift(days=+get_force_install_days(next_catalog)).datetime

        if CONFIG.get("enforce_force_install_time") and CONFIG.get("force_install_time"):
            plist["force_install_after_date"] = get_force_install_time(plist)

    logger.info(f"Promoted {fullname} from {result['from']} to {result['to']}")

    return promoted, result

promote_pkgs(pkginfos)

Iterate over pkgs and pass them to promote_pkg if not in denylist.

Returns a list of results from promote_pkg.

Source code in lambopkg/runners/autopromote.py
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
def promote_pkgs(pkginfos):
    """
    Iterate over pkgs and pass them to promote_pkg if not in denylist.

    Returns a list of results from promote_pkg.
    """

    promotions = {}

    for plist, path in pkginfos:
        promoted, result = promote_pkg(plist, path)
        if promoted:
            promotions[result["fullname"]] = result

        # Every plist is written back, promoted or not: promote_pkg can
        # rewrite catalogs/force-install fields even without a promotion.
        with open(path, "wb") as pkginfo_file:
            plistlib.dump(result["plist"], pkginfo_file)

        logger.debug(f"wrote {result['fullname']} to {path}")

    return promotions

safe_read_pkg(pkginfo)

Returns the contents of a pkginfo plist, or, if a parsing error occurs, None

Source code in lambopkg/runners/autopromote.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def safe_read_pkg(pkginfo):
    """Returns the contents of a pkginfo plist, or, if a parsing error occurs, None"""

    logger.info(f"parsing {pkginfo}")
    try:
        with open(pkginfo, "rb") as plist_file:
            return plistlib.load(plist_file)
    except (ExpatError, plistlib.InvalidFileException) as e:
        # Raised when the file is not a parseable plist — typically stray
        # clutter such as a .DS_Store; treat as "no plist" rather than fatal.
        logger.warning(f"Failed to parse {pkginfo} because: {e!r}")
        return None
    except Exception:
        # Anything else (I/O errors etc.) is unexpected: log and re-raise.
        logger.exception(f"Error parsing {pkginfo}")
        raise

runners.fix_trust_info

Fix ParentRecipeTrustInfo paths in AutoPkg override recipes.

Scans override recipes for ParentRecipeTrustInfo entries, resolves each parent recipe identifier to its actual filesystem path by searching Overrides/ then Recipes/, and updates the path and sha256_hash fields.

Usage

python lambopkg/runners/fix_trust_info.py [--autopkg-dir AutoPkg]

build_identifier_index(autopkg_dir)

Build map of recipe Identifier -> file path.

Source code in lambopkg/runners/fix_trust_info.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def build_identifier_index(autopkg_dir: Path) -> dict[str, Path]:
    """Build map of recipe Identifier -> file path."""
    index: dict[str, Path] = {}
    # Overrides/ is scanned before Recipes/; a later match for the same
    # identifier overwrites an earlier one.
    for subdir in ("Overrides", "Recipes"):
        search_dir = autopkg_dir / subdir
        if not search_dir.exists():
            continue
        for recipe_file in search_dir.rglob("*.recipe.yaml"):
            try:
                parsed = yaml.safe_load(recipe_file.read_text())
            except Exception:
                # Best-effort scan: unreadable or invalid YAML is skipped.
                continue
            if isinstance(parsed, dict) and "Identifier" in parsed:
                index[parsed["Identifier"]] = recipe_file
    return index

fix_override(override_path, index)

Fix trust info paths in a single override. Returns True if modified.

Source code in lambopkg/runners/fix_trust_info.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def fix_override(override_path: Path, index: dict[str, Path]) -> bool:
    """Fix trust info paths in a single override. Returns True if modified."""
    raw_text = override_path.read_text()
    data = yaml.safe_load(raw_text)

    if not isinstance(data, dict):
        return False

    trust_info = data.get("ParentRecipeTrustInfo")
    if not isinstance(trust_info, dict) or not trust_info:
        return False

    parent_recipes = trust_info.get("parent_recipes")
    if not isinstance(parent_recipes, dict) or not parent_recipes:
        return False

    modified = False
    for identifier, info in parent_recipes.items():
        resolved = index.get(identifier)
        if resolved is None:
            print(f"  WARNING: {identifier} not found in Overrides/ or Recipes/")
            continue

        resolved_path = str(resolved)
        new_hash = sha256_file(resolved)

        path_changed = info.get("path", "") != resolved_path
        hash_changed = info.get("sha256_hash", "") != new_hash

        if path_changed or hash_changed:
            info["path"] = resolved_path
            info["sha256_hash"] = new_hash
            modified = True
            if path_changed:
                print(f"  {identifier}: path updated")
            if hash_changed:
                print(f"  {identifier}: hash updated")

    if modified:
        # NOTE(review): yaml.dump re-serializes the whole document, so any
        # comments/original key order in the override are not preserved.
        override_path.write_text(
            yaml.dump(data, default_flow_style=False, allow_unicode=True)
        )

    return modified

sha256_file(path)

Compute SHA-256 hash of a file.

Source code in lambopkg/runners/fix_trust_info.py
36
37
38
def sha256_file(path: Path) -> str:
    """Compute SHA-256 hash of a file."""
    digest = hashlib.sha256()
    digest.update(path.read_bytes())
    return digest.hexdigest()