"""DNS-01 authenticator plugin for Certbot using the yeil public API.

Authenticates to api.yeil.app/v1/dns with a yeil App key (yk_...) sent as a
Bearer token, then adds/removes TXT records via the public records API.

Create an App with DNS record-write permission on your zone(s) in your yeil
team settings (the Apps tab) and put its key in the credentials file. The
certbot host only needs HTTPS reachability to api.yeil.app/v1/dns; no NetBird
or shared admin key.
"""

import json
import logging
from http.client import HTTPException
from urllib.error import HTTPError, URLError
from urllib.parse import quote
from urllib.request import Request, urlopen

from certbot import errors, interfaces
from certbot.plugins import dns_common
from zope.interface import implementer

logger = logging.getLogger(__name__)

# Used when the credentials file does not set `base_url`.
DEFAULT_BASE_URL = "https://api.yeil.app/v1/dns"
# Timeout, in seconds, passed to every urlopen() call.
HTTP_TIMEOUT = 30


# NOTE(review): these zope interface declarations only exist in older certbot
# releases (they appear to have been removed in certbot 2.0) -- confirm which
# certbot versions this plugin must support before touching them.
@implementer(interfaces.IAuthenticator)
@implementer(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
    """Certbot DNS-01 authenticator backed by the yeil public DNS API.

    `_perform` creates the challenge TXT record and remembers its ids in
    `self._records`; `_cleanup` deletes it again using those ids.
    """

    description = (
        "Obtain certificates via DNS-01 using the yeil public DNS API."
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Populated by _setup_credentials().
        self.credentials = None
        # (domain, validation_name, validation) -> (zone_id, record_id)
        self._records = {}

    @classmethod
    def add_parser_arguments(cls, add):
        """Register the plugin's CLI options (propagation delay, credentials)."""
        super(Authenticator, cls).add_parser_arguments(
            add, default_propagation_seconds=20
        )
        add("credentials", help="Path to your yeil credentials INI file.")

    def more_info(self):
        """Return a short human-readable description of the plugin."""
        return (
            "Configures Certbot to perform DNS-01 challenges by adding TXT "
            "records via the yeil public DNS API at api.yeil.app/v1/dns."
        )

    def _setup_credentials(self):
        """Load the credentials INI file, requiring an `api_key` entry."""
        self.credentials = self._configure_credentials(
            "credentials",
            "yeil API credentials INI file",
            {
                "api_key": (
                    "yeil App key (yk_...) with DNS record-write permission on "
                    "your zone(s); create an App under Apps in your team settings"
                ),
            },
        )

    # ── HTTP ───────────────────────────────────────────────────────────

    def _base_url(self):
        """Return the API base URL (credentials override or default), no trailing slash."""
        url = self.credentials.conf("base_url") or DEFAULT_BASE_URL
        return url.rstrip("/")

    def _request(self, method, path, body=None, auth=True):
        """Send a JSON request and return the parsed JSON response.

        Args:
            method: HTTP method name, e.g. "GET".
            path: Path (and query string) appended to the base URL.
            body: Optional JSON-serialisable request body.
            auth: When true, send the App key as a Bearer token.

        Returns:
            The decoded JSON value, or None for 204 No Content or an empty
            response body.

        Raises:
            errors.PluginError: on non-2xx HTTP responses, on any transport
                failure (including timeouts or truncated reads while the body
                is being received), and on a body that is not valid UTF-8
                JSON.
        """
        url = f"{self._base_url()}{path}"
        data = json.dumps(body).encode("utf-8") if body is not None else None
        req = Request(url, data=data, method=method)
        req.add_header("Accept", "application/json")
        if data is not None:
            req.add_header("Content-Type", "application/json")
            req.add_header("Content-Length", str(len(data)))
        if auth:
            req.add_header("Authorization", f"Bearer {self._api_key()}")

        try:
            with urlopen(req, timeout=HTTP_TIMEOUT) as resp:
                if resp.status == 204:
                    return None
                raw = resp.read()
        except HTTPError as e:
            # Always raises PluginError; must precede the broader handler
            # because HTTPError is a URLError subclass.
            self._raise_http_error(e, method, path)
        except (URLError, OSError, HTTPException) as e:
            # OSError covers socket timeouts / connection resets raised while
            # reading the body, which urllib does not wrap in URLError;
            # HTTPException covers protocol-level failures such as a
            # truncated response. Callers (notably _cleanup) only handle
            # PluginError, so everything is normalised to that type.
            raise errors.PluginError(
                f"yeil dns API unreachable ({method} {path}): {e}"
            ) from e

        if not raw:
            return None
        try:
            # UnicodeDecodeError and json.JSONDecodeError are both ValueError.
            return json.loads(raw.decode("utf-8"))
        except ValueError as e:
            raise errors.PluginError(
                f"yeil dns API returned an invalid JSON response "
                f"({method} {path}): {e}"
            ) from e

    @staticmethod
    def _raise_http_error(e, method, path):
        """Convert an HTTPError into a PluginError; never returns.

        The message is taken from the JSON error body's "message" or "error"
        field when available, otherwise from the HTTP reason phrase.
        """
        try:
            body = e.read().decode("utf-8")
            parsed = json.loads(body)
            msg = parsed.get("message") or parsed.get("error") or e.reason
        except Exception:
            # Deliberately broad: extracting a nicer message is best-effort
            # and must never mask the HTTP error being reported.
            msg = e.reason
        raise errors.PluginError(
            f"yeil dns API error ({method} {path}, {e.code}): {msg}"
        ) from e

    def _api_key(self):
        """Return the configured App key, raising PluginError if it is empty."""
        key = self.credentials.conf("api_key")
        if not key:
            raise errors.PluginError(
                "yeil credentials file is missing 'api_key' "
                "(a yk_... App key with DNS record-write permission)"
            )
        return key

    # ── Zone resolution ────────────────────────────────────────────────

    def _find_zone(self, fqdn):
        """Resolve the longest-suffix zone owned by the caller.

        Returns (zone_id, zone_name). Raises PluginError if no zone is
        returned or the response lacks the zone name.
        """
        result = self._request(
            "GET",
            f"/zones?suffix_of={quote(fqdn, safe='')}",
        )
        if not isinstance(result, dict) or "id" not in result:
            raise errors.PluginError(
                f"yeil dns API: no owned zone covers {fqdn}"
            )
        if "zoneName" not in result:
            # Surface a malformed response as PluginError, not KeyError.
            raise errors.PluginError(
                f"yeil dns API: zone lookup for {fqdn} did not return a "
                "zoneName"
            )
        return result["id"], result["zoneName"]

    @staticmethod
    def _relative_name(validation_name, zone_name):
        """Return `validation_name` relative to `zone_name`.

        `_acme-challenge.smtp.yeil.org` in zone `yeil.org` ->
        `_acme-challenge.smtp`. If validation_name equals the zone, returns
        "@" (the apex). Trailing dots are ignored and the comparison is
        case-insensitive, as DNS names are; the returned label keeps the
        caller's original casing.

        Raises PluginError if validation_name is not inside the zone.
        """
        v = validation_name.rstrip(".")
        z = zone_name.rstrip(".")
        if v.lower() == z.lower():
            return "@"
        suffix = "." + z
        # Compare a same-length tail so the slice below stays aligned with
        # the original (un-lowercased) string.
        if v[-len(suffix):].lower() == suffix.lower():
            return v[: -len(suffix)]
        raise errors.PluginError(
            f"validation_name {v} is not within zone {z}"
        )

    # ── certbot hooks ──────────────────────────────────────────────────

    def _perform(self, domain, validation_name, validation):
        """Create the challenge TXT record and remember it for cleanup.

        Raises PluginError if the zone cannot be resolved, the request fails,
        or the API response carries no record id.
        """
        zone_id, zone_name = self._find_zone(validation_name)
        rel_name = self._relative_name(validation_name, zone_name)
        result = self._request(
            "POST",
            f"/zones/{zone_id}/records",
            body={
                "name": rel_name,
                "type": "TXT",
                "content": validation,
                "ttl": 60,
            },
        )
        record_id = (
            result.get("id") if isinstance(result, dict) else None
        )
        if not record_id:
            raise errors.PluginError(
                "yeil dns API: addrecord did not return a record id"
            )
        self._records[(domain, validation_name, validation)] = (
            zone_id,
            record_id,
        )

    def _cleanup(self, domain, validation_name, validation):
        """Delete the TXT record created by `_perform`, best-effort.

        A missing bookkeeping entry or a failed DELETE is logged as a warning
        rather than raised.
        """
        entry = self._records.pop((domain, validation_name, validation), None)
        if not entry:
            logger.warning(
                "No stored record for %s; skipping cleanup", validation_name
            )
            return
        zone_id, record_id = entry
        try:
            self._request(
                "DELETE",
                f"/zones/{zone_id}/records/{record_id}",
            )
        except errors.PluginError as e:
            # Don't fail the renewal because of a stale TXT we couldn't
            # delete; log and move on. Operator can prune by hand.
            logger.warning(
                "Failed to clean up TXT record %s (zone=%s record=%s): %s",
                validation_name,
                zone_id,
                record_id,
                e,
            )