Files
certbot-dns-yeil/certbot_dns_yeil/dns_yeil.py
eskimo 3d606b20bc v3.0.1: default propagation 20s -> 60s for edge-replica convergence
20s was too tight: yeil serves DNS from edge replicas fed by a
replication log, so a freshly-written _acme-challenge TXT takes a bit to
appear on every authoritative nameserver. Let's Encrypt's secondary
(multi-perspective) validation hit a not-yet-converged replica and saw a
stale value -> 'incorrect TXT record found'. 60s lets all replicas catch
up (matches what already works in practice for wildcard certs).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-22 02:14:48 -04:00

210 lines
7.6 KiB
Python

"""DNS-01 authenticator plugin for Certbot using the yeil public API.
Authenticates to dns.yeil.app with a yeil App key (yk_...) sent as a
Bearer token, then adds/removes TXT records via the public records API.
Create an App with DNS record-write permission on your zone(s) in your
yeil team settings (the Apps tab) and put its key in the credentials file.
The certbot host only needs HTTPS reachability to dns.yeil.app; no
NetBird or shared admin key.
"""
import json
import logging
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from certbot import errors, interfaces
from certbot.plugins import dns_common
from zope.interface import implementer
# Module-level logger; used for cleanup warnings.
logger = logging.getLogger(__name__)
# API root used when the credentials file does not set `base_url`.
DEFAULT_BASE_URL = "https://dns.yeil.app"
# Passed as `timeout=` to every urlopen() call (seconds).
HTTP_TIMEOUT = 30
# NOTE(review): the zope-based `interfaces.IAuthenticator` /
# `interfaces.IPluginFactory` were deprecated in Certbot 1.19 and appear to be
# gone in Certbot 2.x -- confirm the supported Certbot range before relying on
# these decorators.
@implementer(interfaces.IAuthenticator)
@implementer(interfaces.IPluginFactory)
class Authenticator(dns_common.DNSAuthenticator):
    """DNS-01 authenticator that manages TXT records via the yeil DNS API.

    For each challenge, `_perform` resolves the owning zone, creates the
    `_acme-challenge` TXT record and remembers its id; `_cleanup` deletes
    that record again on a best-effort basis.
    """

    description = (
        "Obtain certificates via DNS-01 using the yeil public DNS API."
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.credentials = None
        # (domain, validation_name, validation) -> (zone_id, record_id)
        self._records = {}

    @classmethod
    def add_parser_arguments(cls, add, default_propagation_seconds=60):
        """Register the plugin's command-line arguments.

        `default_propagation_seconds` mirrors the base-class parameter so the
        override stays signature-compatible; it defaults to 60.
        """
        # yeil serves DNS from edge replicas fed by a replication log, so a
        # freshly-written TXT takes a little time to appear on every
        # authoritative nameserver. 20s was too tight and tripped Let's
        # Encrypt's secondary (multi-perspective) validation against a
        # not-yet-converged replica; 60s gives all replicas time to catch up.
        super().add_parser_arguments(
            add, default_propagation_seconds=default_propagation_seconds
        )
        add("credentials", help="Path to your yeil credentials INI file.")

    def more_info(self):
        """Return a short human-readable description of the plugin."""
        return (
            "Configures Certbot to perform DNS-01 challenges by adding TXT "
            "records via the yeil public DNS API at dns.yeil.app."
        )

    def _setup_credentials(self):
        """Load the credentials INI file; `api_key` is declared as required."""
        self.credentials = self._configure_credentials(
            "credentials",
            "yeil API credentials INI file",
            {
                "api_key": (
                    "yeil App key (yk_...) with DNS record-write permission on "
                    "your zone(s); create an App under Apps in your team settings"
                ),
            },
        )

    # ── HTTP ───────────────────────────────────────────────────────────
    def _base_url(self):
        """Return the API root (credentials `base_url` or the default), without a trailing slash."""
        url = self.credentials.conf("base_url") or DEFAULT_BASE_URL
        return url.rstrip("/")

    def _request(self, method, path, body=None, auth=True):
        """Send a JSON request and return the parsed JSON response.

        Args:
            method: HTTP method name.
            path: Path (and query) appended to the base URL.
            body: Optional JSON-serialisable request body.
            auth: When true, send the App key as a Bearer token.

        Returns:
            The decoded JSON payload, or None for 204 No Content or an
            empty response body.

        Raises:
            errors.PluginError: on an invalid URL, any transport failure
                (including read timeouts and dropped connections), a
                non-2xx HTTP response, or a response body that is not
                valid UTF-8 JSON.
        """
        # Function-scope import, matching the local import in _find_zone.
        from http.client import HTTPException

        url = f"{self._base_url()}{path}"
        data = json.dumps(body).encode("utf-8") if body is not None else None
        try:
            req = Request(url, data=data, method=method)
        except ValueError as e:
            # Request() rejects URLs without a scheme (e.g. a bad base_url).
            raise errors.PluginError(
                f"yeil dns API: invalid request URL {url!r}: {e}"
            ) from e
        req.add_header("Accept", "application/json")
        if data is not None:
            req.add_header("Content-Type", "application/json")
            req.add_header("Content-Length", str(len(data)))
        if auth:
            req.add_header("Authorization", f"Bearer {self._api_key()}")

        try:
            with urlopen(req, timeout=HTTP_TIMEOUT) as resp:
                if resp.status == 204:
                    return None
                raw = resp.read()
        except HTTPError as e:
            # Must precede the OSError clause: HTTPError subclasses URLError,
            # which subclasses OSError. _raise_http_error always raises.
            self._raise_http_error(e, method, path)
        except (OSError, HTTPException) as e:
            # URLError, socket timeouts during read(), connection resets and
            # http.client protocol errors all count as "unreachable"; letting
            # them escape raw would bypass the PluginError handling in
            # _cleanup.
            raise errors.PluginError(
                f"yeil dns API unreachable ({method} {path}): {e}"
            ) from e

        if not raw:
            return None
        try:
            # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors.
            return json.loads(raw.decode("utf-8"))
        except ValueError as e:
            raise errors.PluginError(
                f"yeil dns API returned an invalid JSON response "
                f"({method} {path}): {e}"
            ) from e

    @staticmethod
    def _raise_http_error(e, method, path):
        """Convert an HTTPError into a PluginError; never returns.

        Prefers the `message` or `error` field of a JSON error body and
        falls back to the HTTP reason phrase.
        """
        try:
            body = e.read().decode("utf-8")
            parsed = json.loads(body)
            msg = parsed.get("message") or parsed.get("error") or e.reason
        except Exception:
            # Deliberately broad: extracting a nicer message is best-effort
            # and must never mask the HTTP error being reported.
            msg = e.reason
        raise errors.PluginError(
            f"yeil dns API error ({method} {path}, {e.code}): {msg}"
        ) from e

    def _api_key(self):
        """Return the configured App key, raising PluginError if it is empty."""
        key = self.credentials.conf("api_key")
        if not key:
            raise errors.PluginError(
                "yeil credentials file is missing 'api_key' "
                "(a yk_... App key with DNS record-write permission)"
            )
        return key

    # ── Zone resolution ────────────────────────────────────────────────
    def _find_zone(self, fqdn):
        """Resolve the longest-suffix zone owned by the caller.

        Returns (zone_id, zone_name). Raises PluginError if the API response
        does not describe a zone (missing `id` or `zoneName`).
        """
        from urllib.parse import quote

        result = self._request(
            "GET",
            f"/api/v1/zones?suffix_of={quote(fqdn, safe='')}",
        )
        # Require both keys up front so a partial response surfaces as a
        # PluginError rather than a raw KeyError.
        if (
            not isinstance(result, dict)
            or "id" not in result
            or "zoneName" not in result
        ):
            raise errors.PluginError(
                f"yeil dns API: no owned zone covers {fqdn}"
            )
        return result["id"], result["zoneName"]

    @staticmethod
    def _relative_name(validation_name, zone_name):
        """`_acme-challenge.smtp.yeil.org` in zone `yeil.org` -> `_acme-challenge.smtp`.

        If validation_name equals the zone, returns "@" (the apex).
        Trailing dots are ignored and the comparison is case-insensitive,
        as DNS names are; the returned label keeps validation_name's case.
        Raises PluginError if validation_name is not inside the zone.
        """
        v = validation_name.rstrip(".")
        z = zone_name.rstrip(".")
        v_folded = v.lower()
        z_folded = z.lower()
        if v_folded == z_folded:
            return "@"
        if v_folded.endswith("." + z_folded):
            # Strip ".<zone>" from the original-case name.
            return v[: -(len(z) + 1)]
        raise errors.PluginError(
            f"validation_name {v} is not within zone {z}"
        )

    # ── certbot hooks ──────────────────────────────────────────────────
    def _perform(self, domain, validation_name, validation):
        """Create the challenge TXT record and remember its id for cleanup.

        Raises PluginError if the zone cannot be resolved or the API does
        not return a record id.
        """
        zone_id, zone_name = self._find_zone(validation_name)
        rel_name = self._relative_name(validation_name, zone_name)
        result = self._request(
            "POST",
            f"/api/v1/zones/{zone_id}/records",
            body={
                "name": rel_name,
                "type": "TXT",
                "content": validation,
                "ttl": 60,
            },
        )
        record_id = (
            result.get("id") if isinstance(result, dict) else None
        )
        if not record_id:
            raise errors.PluginError(
                "yeil dns API: addrecord did not return a record id"
            )
        self._records[(domain, validation_name, validation)] = (
            zone_id,
            record_id,
        )

    def _cleanup(self, domain, validation_name, validation):
        """Delete the TXT record created by `_perform`, best-effort.

        Logs a warning (and does not raise) when no record was stored for
        this challenge or when the delete request fails.
        """
        entry = self._records.pop((domain, validation_name, validation), None)
        if not entry:
            logger.warning(
                "No stored record for %s; skipping cleanup", validation_name
            )
            return
        zone_id, record_id = entry
        try:
            self._request(
                "DELETE",
                f"/api/v1/zones/{zone_id}/records/{record_id}",
            )
        except errors.PluginError as e:
            # Don't fail the renewal because of a stale TXT we couldn't
            # delete; log and move on. Operator can prune by hand.
            logger.warning(
                "Failed to clean up TXT record %s (zone=%s record=%s): %s",
                validation_name, zone_id, record_id, e,
            )