# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
import configparser
import locale
import os
import re
import syslog
from collections import namedtuple
class WebConfigParsingError(Exception):
    """
    Error carrying a human-readable ``message`` attribute.

    Judging by its name it is meant for web server config parsing failures;
    the raise sites are not part of this block.
    """

    def __init__(self, message):
        # Forward the message to the base class so that ``args`` and
        # ``str(exc)`` are populated even when the exception is created
        # with a keyword argument (``WebConfigParsingError(message=...)``).
        super().__init__(message)
        self.message = message
class WebConfigMissing(Exception):
    """
    Raised by the web server config parsers when the config file
    they were asked to read does not exist.

    The text is available both as ``str(exc)`` and as ``exc.message``.
    """

    def __init__(self, message):
        # Forward the message to the base class so that ``args`` and
        # ``str(exc)`` are populated even when the exception is created
        # with a keyword argument (``WebConfigMissing(message=...)``).
        super().__init__(message)
        self.message = message
# Name of the artificial section that `load` puts in front of the file
# content, because configparser refuses to parse options outside a section.
SECHEAD = 'asection'


def load(path, case_sensitive=False, ignore_bad_encoding=False):
    """
    Read a section-less ``key=value`` file and return its options as a dict.

    :param path: path to the file to read
    :param case_sensitive: keep option names as written instead of
        letting configparser normalize them
    :param ignore_bad_encoding: read the file as bytes and decode it with the
        locale preferred encoding, replacing undecodable bytes; otherwise the
        file is read as strict UTF-8
    :return: dict of option name -> value (``None`` for options without value)
    """
    if ignore_bad_encoding:
        with open(path, 'rb') as stream:
            text = stream.read().decode(locale.getpreferredencoding(), 'replace')
    else:
        with open(path, 'r', encoding='utf-8') as stream:
            text = stream.read()

    parser = configparser.ConfigParser(
        allow_no_value=True,
        interpolation=None,
        strict=False,
    )
    if case_sensitive:
        # Disable the default option name transformation
        parser.optionxform = str

    # Fake section header makes the content acceptable for configparser
    header = f'[{SECHEAD}]\n'
    parser.read_string(header + text, source=path)
    return {name: value for name, value in parser.items(section=SECHEAD)}
# Quote characters recognized around config values
_QUOTES = "'", '"'


def _strip_escape_quotes_of_config_value(val: str) -> str:
    """
    Strip one pair of enclosing quotes from a config value.

    The quotes are removed only when the value starts and ends with the
    same quote character (single or double) and these are two distinct
    characters; a lone quote char or mismatched quotes are left untouched.

    :param val: raw value
    :return: value without the enclosing quote pair, or `val` unchanged
    """
    # The length check prevents a single quote char from being treated
    # as both the opening and the closing quote
    if len(val) >= 2 and val[0] == val[-1] and val[0] in _QUOTES:
        return val[1:-1]
    return val
def load_fast(path, delimiter="=", strip_quotes=False):
    """
    Read a simple ``key<delimiter>value`` file into a dict without configparser.

    Every line is split on the first `delimiter`; lines which do not contain
    it are skipped. Keys and values are stripped of surrounding whitespace.
    Undecodable bytes are kept using the ``surrogateescape`` error handler.

    :param path: path to the file to read
    :param delimiter: separator between key and value
    :param strip_quotes: remove quotes enclosing the value
    :return: dict of key -> value; the last occurrence of a key wins
    """
    result = {}
    with open(path, "r", encoding="utf-8", errors="surrogateescape") as stream:
        for raw_line in stream:
            pieces = raw_line.split(delimiter, 1)
            if len(pieces) != 2:
                # Skip broken lines
                continue
            name = pieces[0].strip()
            text = pieces[1].strip()
            if strip_quotes:
                text = _strip_escape_quotes_of_config_value(text)
            result[name] = text
    return result
# Parsed file contents keyed by path, filled by `load_once`
cache = {}


def load_once(path, ignore_errors=False):
    """
    Return the content of an ini file as a dict, reading it at most once.

    The result (including the empty dict stored after an ignored failure)
    is remembered in the module-level `cache` and reused on later calls.

    :param path: path to the file to read
    :param ignore_errors: on I/O or configparser errors return (and cache)
        an empty dict instead of raising
    """
    if path in cache:
        return cache[path]
    try:
        content = load(path)
    except (IOError, configparser.Error):
        if not ignore_errors:
            raise
        content = {}
    cache[path] = content
    return content
def change_settings(settings_dict, path, tmp_path=None):
    """
    Update ``key=value`` settings in a file, keeping all other lines intact.

    The new content is written to a temporary file which is then renamed
    over `path`. Lines whose key is present in `settings_dict` are replaced
    by ``key=value``; keys not found in the file are appended to its end.
    Empty lines, comments (``#``) and lines with unknown keys are copied
    as they are. A line without ``=`` is treated as a key without value.

    :param settings_dict: mapping of key -> new value (formatted with str())
    :param path: path to the settings file
    :param tmp_path: path of the temporary file, ``path + ".tmp"`` by default
    """
    if not tmp_path:
        tmp_path = path + ".tmp"
    written_keys = set()
    # Tracks whether the output currently ends with a line break, so the
    # appended settings never get glued to an unterminated last line
    ends_with_newline = True
    with (open(path, 'r', encoding='utf-8') as fin,
          open(tmp_path, 'w', encoding='utf-8') as fout):
        for line in fin:
            stripped_line = line.strip()
            if stripped_line and not stripped_line.startswith('#'):
                # partition() tolerates lines which have no '=' at all
                key = stripped_line.partition('=')[0].strip()
                if key in settings_dict:
                    fout.write(f'{key}={settings_dict[key]}\n')
                    written_keys.add(key)
                    ends_with_newline = True
                    continue
            fout.write(line)
            ends_with_newline = line.endswith('\n')
        missing_keys = [key for key in settings_dict if key not in written_keys]
        if missing_keys and not ends_with_newline:
            fout.write('\n')
        for key in missing_keys:
            fout.write(f'{key}={settings_dict[key]}\n')
    os.rename(tmp_path, path)
# Tokenizer pattern of the loose NGINX config parser. Every match is one
# token: a comment, a quoted or bare string, or a structural character.
# The alternatives are wrapped in non-capturing groups `(?: ... )`.
_NGINX_TOKENS_RE = re.compile(
    r"""
    (
        # Comments
        (?: \# .* $ )
        # Single-, double-quoted strings and bare strings without whitespaces
      | (?: "[^"\n]*?" )
      | (?: '[^'\n]*?' )
      | (?: [^"';\s\{\}]+ )
        # Structural characters
      | ;
      | \{
      | \}
      | \n
    )
    """,
    re.IGNORECASE | re.MULTILINE | re.VERBOSE,
)
def _ngx_tokenize(data):
    """
    Split NGINX config text into tokens using `_NGINX_TOKENS_RE`.

    Returns a lazy iterator over non-empty token strings; tokens starting
    with ``#`` (comments) are dropped.
    """
    found = (hit.group(0) for hit in _NGINX_TOKENS_RE.finditer(data))
    # Comments are explicitly left out of the token stream
    return (text for text in found if text and not text.startswith('#'))
def _ngx_take_until(it, val):
    """
    Yield items of `it` up to the first one contained in `val`.

    The terminating item is consumed from `it` but not yielded.
    """
    for item in it:
        if item not in val:
            yield item
        else:
            break
def _ngx_take_until_block_end(it):
    """
    Yield tokens of `it` until the ``}`` closing the current block.

    The iterator is expected to be positioned right after the opening ``{``.
    Braces of nested blocks are yielded; the closing brace of the current
    block is consumed but not yielded.
    """
    open_blocks = 1
    for token in it:
        if token == "}":
            open_blocks -= 1
            if open_blocks == 0:
                return
        elif token == "{":
            open_blocks += 1
        yield token
def _ngx_scan_block_info(block_tokens, need_fields):
    """
    Collect values of the requested directives from the tokens of one block.

    Nested blocks are skipped entirely. The value of a directive is the list
    of tokens between its name and the next ``;`` or line break; when a
    directive occurs more than once the last occurrence wins.

    :param block_tokens: iterator over the tokens of the block
    :param need_fields: directive names to look for
    :return: dict of directive name -> list of value tokens
    """
    collected = {}
    for token in block_tokens:
        if token == "{":
            # Drain the inner block, nothing from it is of interest
            for _skipped in _ngx_take_until_block_end(block_tokens):
                pass
        if token in need_fields:
            # A later occurrence overrides an earlier one
            collected[token] = list(_ngx_take_until(block_tokens, ";\n"))
    return collected
def nginx_conf_loose_parser(data):
    """
    Parse content of NGINX configuration in a manner tolerant to minor mistakes
    and extract relevant fields from all `server` directives.

    Relevant fields are:
    - `server_name` - the first token of the directive value
    - `root` - the first token of the value, returned as `document_root`
    - `ssl` - if `listen` field contains "ssl" word

    Only the last occurrence of each directive inside a `server` block is
    taken into account. Blocks where `server_name` or `root` is missing
    or has no value are skipped.

    Doesn't handle interpolated values (ex. `${val}`) outside of quoted strings

    :param data: text of the configuration
    :return: generator of dicts with `server_name`, `document_root`, `ssl` keys
    """
    tokens = _ngx_tokenize(data)
    for tok in tokens:
        if tok != "server":
            continue
        # Nothing seems to be allowed between "server" directive and
        # the opening of its block, so we just discard everything
        # until first block opening seen
        for _ in _ngx_take_until(tokens, "{"):
            pass
        # Limit further scan by the inside of block
        block_tokens = _ngx_take_until_block_end(tokens)
        # By using only `block_tokens` we ensure all blocks are properly delimited
        info = _ngx_scan_block_info(block_tokens, ("server_name", "root", "listen"))
        server_name = info.get("server_name")
        root = info.get("root")
        # Both values are indexed below, so a block is usable only when
        # each directive is present and has at least one value token
        if not server_name or not root:
            continue
        yield {
            "server_name": _strip_escape_quotes_of_config_value(server_name[0]),
            "document_root": _strip_escape_quotes_of_config_value(root[0]),
            "ssl": "ssl" in info.get("listen", []),
        }
def nginx_conf_parser(conf_file):
    """
    Parse an NGINX config file and return the list of found `server` entries.

    The file content is handed over to `nginx_conf_loose_parser`, see it
    for the details of the result.

    :param conf_file: path to the NGINX config file
    :raises WebConfigMissing: if `conf_file` is not an existing file
    """
    if os.path.isfile(conf_file):
        dirty_data = read_unicode_file_with_decode_fallback(conf_file)
        return [*nginx_conf_loose_parser(dirty_data)]
    raise WebConfigMissing(f'File does not exists {conf_file}')
def apache_conf_parser(conf_file):
if not os.path.isfile(conf_file):
raise WebConfigMissing(f'File does not exists {conf_file}')
conf_data = []
data_all = read_unicode_file_with_decode_fallback(conf_file).splitlines()
data = [i for i in data_all if re.search('^((?!#).)*$', i)]
ID = 0
enable = False
result = {}
vhost = []
while len(data) > 0:
out = data.pop(0)
if "