]>
Commit | Line | Data |
---|---|---|
53e6db90 DC |
1 | """Configuration management setup |
2 | ||
3 | Some terminology: | |
4 | - name | |
5 | As written in config files. | |
6 | - value | |
7 | Value associated with a name | |
8 | - key | |
9 | Name combined with its section (section.name) | |
10 | - variant | |
11 | A single word describing where the configuration key-value pair came from | |
12 | """ | |
13 | ||
14 | import configparser | |
15 | import locale | |
16 | import os | |
17 | import sys | |
18 | from typing import Any, Dict, Iterable, List, NewType, Optional, Tuple | |
19 | ||
20 | from pip._internal.exceptions import ( | |
21 | ConfigurationError, | |
22 | ConfigurationFileCouldNotBeLoaded, | |
23 | ) | |
24 | from pip._internal.utils import appdirs | |
25 | from pip._internal.utils.compat import WINDOWS | |
26 | from pip._internal.utils.logging import getLogger | |
27 | from pip._internal.utils.misc import ensure_dir, enum | |
28 | ||
# Shorthand alias for the parser class used throughout this module.
RawConfigParser = configparser.RawConfigParser
# Distinct static type for the configuration-kind strings defined below.
Kind = NewType("Kind", str)

# File name of a pip configuration file; differs between Windows and others.
CONFIG_BASENAME = "pip.ini" if WINDOWS else "pip.conf"
# PIP_* environment variable names (prefix stripped, lowercased) that are
# never treated as configuration.
ENV_NAMES_IGNORED = ("version", "help")

# The kinds of configurations there are.
kinds = enum(
    USER="user",  # User Specific
    GLOBAL="global",  # System Wide
    SITE="site",  # [Virtual] Environment Specific
    ENV="env",  # from PIP_CONFIG_FILE
    ENV_VAR="env-var",  # from Environment Variables
)
# Merge order of the kinds: values from later entries replace earlier ones.
OVERRIDE_ORDER = (
    kinds.GLOBAL,
    kinds.USER,
    kinds.SITE,
    kinds.ENV,
    kinds.ENV_VAR,
)
# The only kinds accepted as a ``load_only`` argument.
VALID_LOAD_ONLY = (kinds.USER, kinds.GLOBAL, kinds.SITE)

logger = getLogger(__name__)
47 | ||
48 | ||
49 | # NOTE: Maybe use the optionxform attribute to normalize keynames. | |
50 | def _normalize_name(name: str) -> str: | |
51 | """Make a name consistent regardless of source (environment or file)""" | |
52 | name = name.lower().replace("_", "-") | |
53 | if name.startswith("--"): | |
54 | name = name[2:] # only prefer long opts | |
55 | return name | |
56 | ||
57 | ||
58 | def _disassemble_key(name: str) -> List[str]: | |
59 | if "." not in name: | |
60 | error_message = ( | |
61 | "Key does not contain dot separated section and key. " | |
62 | "Perhaps you wanted to use 'global.{}' instead?" | |
63 | ).format(name) | |
64 | raise ConfigurationError(error_message) | |
65 | return name.split(".", 1) | |
66 | ||
67 | ||
def get_configuration_files() -> Dict[Kind, List[str]]:
    """Map each file-backed configuration kind to its candidate file paths.

    Returns a dictionary with entries for ``kinds.GLOBAL``, ``kinds.SITE``
    and ``kinds.USER``. The user entry lists the legacy location (under the
    home directory) before the location inside the user config directory.
    Paths are only computed here; nothing checks that the files exist.
    """
    system_wide: List[str] = []
    for config_dir in appdirs.site_config_dirs("pip"):
        system_wide.append(os.path.join(config_dir, CONFIG_BASENAME))

    per_environment = os.path.join(sys.prefix, CONFIG_BASENAME)

    # The legacy per-user directory is "pip" on Windows and ".pip" elsewhere.
    legacy_dirname = "pip" if WINDOWS else ".pip"
    legacy_per_user = os.path.join(
        os.path.expanduser("~"), legacy_dirname, CONFIG_BASENAME
    )
    current_per_user = os.path.join(
        appdirs.user_config_dir("pip"), CONFIG_BASENAME
    )

    return {
        kinds.GLOBAL: system_wide,
        kinds.SITE: [per_environment],
        kinds.USER: [legacy_per_user, current_per_user],
    }
85 | ||
86 | ||
class Configuration:
    """Handles management of configuration.

    Provides an interface to accessing and managing configuration files.

    This class provides an API that takes "section.key-name" style keys and
    stores the value associated with it as "key-name" under the section
    "section".

    This allows for a clean interface wherein both the section and the
    key-name are preserved in an easy to manage form in the configuration
    files and the data stored is also nice.
    """

    def __init__(self, isolated: bool, load_only: Optional[Kind] = None) -> None:
        """Create an empty configuration; call ``load()`` to populate it.

        Args:
            isolated: when true, environment variables and per-user
                configuration files are not loaded.
            load_only: restrict file loading to this single kind. Must be
                one of ``VALID_LOAD_ONLY`` (or ``None`` for no restriction)
                and is required by every modifying operation.

        Raises:
            ConfigurationError: if ``load_only`` is not an accepted kind.
        """
        super().__init__()

        if load_only is not None and load_only not in VALID_LOAD_ONLY:
            valid_kinds = ", ".join(map(repr, VALID_LOAD_ONLY))
            raise ConfigurationError(
                f"Got invalid value for load_only - should be one of {valid_kinds}"
            )
        self.isolated = isolated
        self.load_only = load_only

        # Because we keep track of where we got the data from
        self._parsers: Dict[Kind, List[Tuple[str, RawConfigParser]]] = {
            variant: [] for variant in OVERRIDE_ORDER
        }
        self._config: Dict[Kind, Dict[str, Any]] = {
            variant: {} for variant in OVERRIDE_ORDER
        }
        # (file name, parser) pairs that save() has to write back.
        self._modified_parsers: List[Tuple[str, RawConfigParser]] = []

    def load(self) -> None:
        """Loads configuration from configuration files and environment.

        Environment variables are skipped when the configuration is isolated.
        """
        self._load_config_files()
        if not self.isolated:
            self._load_environment_vars()

    def get_file_to_edit(self) -> Optional[str]:
        """Returns the file with highest priority in configuration"""
        assert self.load_only is not None, "Need to be specified a file to be editing"

        # NOTE(review): _get_parser_to_modify() raises ConfigurationError
        # (not IndexError) when no parser is loaded, so this handler looks
        # unreachable. Left as-is to keep the behaviour callers see today.
        try:
            return self._get_parser_to_modify()[0]
        except IndexError:
            return None

    def items(self) -> Iterable[Tuple[str, Any]]:
        """Returns key-value pairs like dict.items() representing the loaded
        configuration
        """
        return self._dictionary.items()

    def get_value(self, key: str) -> Any:
        """Get a value from the configuration.

        Raises:
            ConfigurationError: if the (normalized) key is not present.
        """
        orig_key = key
        key = _normalize_name(key)
        try:
            return self._dictionary[key]
        except KeyError:
            # disassembling triggers a more useful error message than simply
            # "No such key" in the case that the key isn't in the form command.option
            _disassemble_key(key)
            raise ConfigurationError(f"No such key - {orig_key}")

    def set_value(self, key: str, value: Any) -> None:
        """Modify a value in the configuration.

        The change is applied to the parser chosen for ``load_only`` and to
        the in-memory values; it reaches disk only once ``save()`` is called.

        Raises:
            ConfigurationError: if no ``load_only`` kind was given, or the
                key is not of the form ``section.name``.
        """
        key = _normalize_name(key)
        self._ensure_have_load_only()

        assert self.load_only
        fname, parser = self._get_parser_to_modify()

        if parser is not None:
            section, name = _disassemble_key(key)

            # Modify the parser and the configuration
            if not parser.has_section(section):
                parser.add_section(section)
            parser.set(section, name, value)

        self._config[self.load_only][key] = value
        self._mark_as_modified(fname, parser)

    def unset_value(self, key: str) -> None:
        """Unset a value in the configuration.

        Raises:
            ConfigurationError: if no ``load_only`` kind was given, or the
                key is not set in the ``load_only`` kind.
        """
        orig_key = key
        key = _normalize_name(key)
        self._ensure_have_load_only()

        assert self.load_only
        if key not in self._config[self.load_only]:
            raise ConfigurationError(f"No such key - {orig_key}")

        fname, parser = self._get_parser_to_modify()

        if parser is not None:
            section, name = _disassemble_key(key)
            if not (
                parser.has_section(section) and parser.remove_option(section, name)
            ):
                # The option was not removed.
                raise ConfigurationError(
                    "Fatal Internal error [id=1]. Please report as a bug."
                )

            # The section may be empty after the option was removed.
            if not parser.items(section):
                parser.remove_section(section)
            self._mark_as_modified(fname, parser)

        del self._config[self.load_only][key]

    def save(self) -> None:
        """Save the current in-memory state.

        Every parser marked as modified is written back to its file.

        Raises:
            ConfigurationError: if no ``load_only`` kind was given.
        """
        self._ensure_have_load_only()

        for fname, parser in self._modified_parsers:
            logger.info("Writing to %s", fname)

            # Ensure directory exists.
            ensure_dir(os.path.dirname(fname))

            with open(fname, "w") as f:
                parser.write(f)

    #
    # Private routines
    #

    def _ensure_have_load_only(self) -> None:
        """Raise ConfigurationError unless a ``load_only`` kind was given."""
        if self.load_only is None:
            raise ConfigurationError("Needed a specific file to be modifying.")
        logger.debug("Will be working with %s variant only", self.load_only)

    @property
    def _dictionary(self) -> Dict[str, Any]:
        """A dictionary representing the loaded configuration.

        Built afresh on each access by merging the per-kind values in
        ``OVERRIDE_ORDER``, so kinds later in that order win.
        """
        # NOTE: Dictionaries are not populated if not loaded. So, conditionals
        #       are not needed here.
        retval: Dict[str, Any] = {}

        for variant in OVERRIDE_ORDER:
            retval.update(self._config[variant])

        return retval

    def _load_config_files(self) -> None:
        """Loads configuration from configuration files"""
        config_files = dict(self.iter_config_files())
        # Pointing PIP_CONFIG_FILE at os.devnull turns off every config file.
        if config_files[kinds.ENV][0:1] == [os.devnull]:
            logger.debug(
                "Skipping loading configuration files due to "
                "environment's PIP_CONFIG_FILE being os.devnull"
            )
            return

        for variant, files in config_files.items():
            for fname in files:
                # If there's specific variant set in `load_only`, load only
                # that variant, not the others.
                if self.load_only is not None and variant != self.load_only:
                    logger.debug("Skipping file '%s' (variant: %s)", fname, variant)
                    continue

                parser = self._load_file(variant, fname)

                # Keeping track of the parsers used
                self._parsers[variant].append((fname, parser))

    def _load_file(self, variant: Kind, fname: str) -> RawConfigParser:
        """Parse ``fname`` and merge its options into the values of ``variant``.

        Returns the parser that was used.
        """
        logger.verbose("For variant '%s', will try loading '%s'", variant, fname)
        parser = self._construct_parser(fname)

        for section in parser.sections():
            items = parser.items(section)
            self._config[variant].update(self._normalized_keys(section, items))

        return parser

    def _construct_parser(self, fname: str) -> RawConfigParser:
        """Return a parser for ``fname``; it stays empty if the file is missing.

        Raises:
            ConfigurationFileCouldNotBeLoaded: if the file cannot be decoded
                with the locale's preferred encoding or cannot be parsed. The
                underlying error is attached as the exception's cause.
        """
        parser = configparser.RawConfigParser()
        # If there is no such file, don't bother reading it but create the
        # parser anyway, to hold the data.
        # Doing this is useful when modifying and saving files, where we don't
        # need to construct a parser.
        if os.path.exists(fname):
            locale_encoding = locale.getpreferredencoding(False)
            try:
                parser.read(fname, encoding=locale_encoding)
            except UnicodeDecodeError as error:
                # See https://github.com/pypa/pip/issues/4963
                raise ConfigurationFileCouldNotBeLoaded(
                    reason=f"contains invalid {locale_encoding} characters",
                    fname=fname,
                ) from error
            except configparser.Error as error:
                # See https://github.com/pypa/pip/issues/4893
                raise ConfigurationFileCouldNotBeLoaded(error=error) from error
        return parser

    def _load_environment_vars(self) -> None:
        """Loads configuration from environment variables"""
        self._config[kinds.ENV_VAR].update(
            self._normalized_keys(":env:", self.get_environ_vars())
        )

    def _normalized_keys(
        self, section: str, items: Iterable[Tuple[str, Any]]
    ) -> Dict[str, Any]:
        """Normalizes items to construct a dictionary with normalized keys.

        This routine is where the names become keys and are made the same
        regardless of source - configuration files or environment.
        """
        return {
            section + "." + _normalize_name(name): val for name, val in items
        }

    def get_environ_vars(self) -> Iterable[Tuple[str, str]]:
        """Returns a generator with all environmental vars with prefix PIP_

        Names are yielded with the prefix removed and lowercased; names
        listed in ``ENV_NAMES_IGNORED`` are left out.
        """
        for key, val in os.environ.items():
            if key.startswith("PIP_"):
                name = key[4:].lower()
                if name not in ENV_NAMES_IGNORED:
                    yield name, val

    # XXX: This is patched in the tests.
    def iter_config_files(self) -> Iterable[Tuple[Kind, List[str]]]:
        """Yields variant and configuration files associated with it.

        This should be treated like items of a dictionary.
        """
        # SMELL: Move the conditions out of this function

        # The file named by PIP_CONFIG_FILE, if any. It is yielded first, but
        # yield order is not precedence: values are merged in OVERRIDE_ORDER.
        config_file = os.environ.get("PIP_CONFIG_FILE", None)
        if config_file is not None:
            yield kinds.ENV, [config_file]
        else:
            yield kinds.ENV, []

        config_files = get_configuration_files()

        # at the base we have any global configuration
        yield kinds.GLOBAL, config_files[kinds.GLOBAL]

        # per-user configuration next; it is left out in isolated mode and
        # when PIP_CONFIG_FILE names a file that exists
        should_load_user_config = not self.isolated and not (
            config_file and os.path.exists(config_file)
        )
        if should_load_user_config:
            # The legacy config file is overridden by the new config file
            yield kinds.USER, config_files[kinds.USER]

        # finally the [virtual] environment's own configuration
        yield kinds.SITE, config_files[kinds.SITE]

    def get_values_in_config(self, variant: Kind) -> Dict[str, Any]:
        """Get values present in a config file"""
        return self._config[variant]

    def _get_parser_to_modify(self) -> Tuple[str, RawConfigParser]:
        """Return the ``(file name, parser)`` pair that edits should go to.

        Raises:
            ConfigurationError: if no parser is loaded for ``load_only``.
        """
        # Determine which parser to modify
        assert self.load_only
        parsers = self._parsers[self.load_only]
        if not parsers:
            # This should not happen if everything works correctly.
            raise ConfigurationError(
                "Fatal Internal error [id=2]. Please report as a bug."
            )

        # Use the highest priority parser.
        return parsers[-1]

    # XXX: This is patched in the tests.
    def _mark_as_modified(self, fname: str, parser: RawConfigParser) -> None:
        """Record ``(fname, parser)`` for ``save()``, at most once."""
        file_parser_tuple = (fname, parser)
        if file_parser_tuple not in self._modified_parsers:
            self._modified_parsers.append(file_parser_tuple)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._dictionary!r})"