certbot_plugin_gandi/certbot_plugin_gandi/gandi_api.py

128 lines
3.5 KiB
Python

import requests
import urllib
from collections import namedtuple
from certbot.plugins import dns_common
try:
from urllib import quote # Python 2.X
except ImportError:
from urllib.parse import quote # Python 3+
_GandiConfig = namedtuple('_GandiConfig', ('api_key',))
_BaseDomain = namedtuple('_BaseDomain', ('zone_uuid', 'fqdn'))
def get_config(api_key):
return _GandiConfig(api_key=api_key)
def _get_json(response):
try:
data = response.json()
except ValueError:
return dict()
return data
def _get_response_message(response, default='<No reason given>'):
return _get_json(response).get('message', default)
def _headers(cfg):
return {
'Content-Type': 'application/json',
'X-Api-Key': cfg.api_key
}
def _get_url(*segs):
return 'https://dns.api.gandi.net/api/v5/{}'.format(
'/'.join(quote(seg, safe='') for seg in segs)
)
def _request(cfg, method, segs, **kw):
headers = _headers(cfg)
url = _get_url(*segs)
return requests.request(method, url, headers=headers, **kw)
def _get_base_domain(cfg, domain):
for candidate_base_domain in dns_common.base_domain_name_guesses(domain):
response = _request(cfg, 'GET', ('domains', candidate_base_domain))
if response.ok:
data = _get_json(response)
zone_uuid = data.get('zone_uuid')
fqdn = data.get('fqdn')
if zone_uuid and fqdn:
return _BaseDomain(zone_uuid=zone_uuid, fqdn=fqdn)
return None
def _get_relative_name(base_domain, name):
suffix = '.' + base_domain.fqdn
return name[:-len(suffix)] if name.endswith(suffix) else None
def _del_txt_record(cfg, base_domain, relative_name):
return _request(
cfg,
'DELETE',
('zones', base_domain.zone_uuid, 'records', relative_name, 'TXT'))
def _update_record(cfg, domain, name, request_runner):
base_domain = _get_base_domain(cfg, domain)
if base_domain is None:
return 'Unable to get base domain for "{}"'.format(domain)
relative_name = _get_relative_name(base_domain, name)
if relative_name is None:
return 'Unable to derive relative name for "{}"'.format(name)
response = request_runner(base_domain, relative_name)
return None if response.ok else _get_response_message(response)
def get_txt_records(cfg, domain, name):
base_domain = _get_base_domain(cfg, domain)
if base_domain is None:
return 'Unable to get base domain for "{}"'.format(domain)
relative_name = _get_relative_name(base_domain, name)
if relative_name is None:
return 'Unable to derive relative name for "{}"'.format(name)
response = _request(
cfg,
'GET',
('zones', base_domain.zone_uuid, 'records', relative_name, 'TXT'))
if response.ok:
return response.json().get('rrset_values')
else:
return []
def add_txt_record(cfg, domain, name, value):
def requester(base_domain, relative_name):
_del_txt_record(cfg, base_domain, relative_name)
return _request(
cfg,
'POST',
('zones', base_domain.zone_uuid, 'records', relative_name, 'TXT'),
json={
'rrset_values': value if isinstance(value, list) else [value]
})
return _update_record(cfg, domain, name, requester)
def del_txt_record(cfg, domain, name):
def requester(base_domain, relative_name):
return _del_txt_record(cfg, base_domain, relative_name)
return _update_record(cfg, domain, name, requester)