1"""Classes for managing templates and their runtime and compile time 2options. 3""" 4import os 5import typing 6import typing as t 7import weakref 8from collections import ChainMap 9from functools import lru_cache 10from functools import partial 11from functools import reduce 12from types import CodeType 13 14from markupsafe import Markup 15 16from . import nodes 17from .compiler import CodeGenerator 18from .compiler import generate 19from .defaults import BLOCK_END_STRING 20from .defaults import BLOCK_START_STRING 21from .defaults import COMMENT_END_STRING 22from .defaults import COMMENT_START_STRING 23from .defaults import DEFAULT_FILTERS 24from .defaults import DEFAULT_NAMESPACE 25from .defaults import DEFAULT_POLICIES 26from .defaults import DEFAULT_TESTS 27from .defaults import KEEP_TRAILING_NEWLINE 28from .defaults import LINE_COMMENT_PREFIX 29from .defaults import LINE_STATEMENT_PREFIX 30from .defaults import LSTRIP_BLOCKS 31from .defaults import NEWLINE_SEQUENCE 32from .defaults import TRIM_BLOCKS 33from .defaults import VARIABLE_END_STRING 34from .defaults import VARIABLE_START_STRING 35from .exceptions import TemplateNotFound 36from .exceptions import TemplateRuntimeError 37from .exceptions import TemplatesNotFound 38from .exceptions import TemplateSyntaxError 39from .exceptions import UndefinedError 40from .lexer import get_lexer 41from .lexer import Lexer 42from .lexer import TokenStream 43from .nodes import EvalContext 44from .parser import Parser 45from .runtime import Context 46from .runtime import new_context 47from .runtime import Undefined 48from .utils import _PassArg 49from .utils import concat 50from .utils import consume 51from .utils import import_string 52from .utils import internalcode 53from .utils import LRUCache 54from .utils import missing 55 56if t.TYPE_CHECKING: 57 import typing_extensions as te 58 from .bccache import BytecodeCache 59 from .ext import Extension 60 from .loaders import BaseLoader 61 62_env_bound = t.TypeVar("_env_bound", bound="Environment") 63 64 65# for direct template usage we have up to ten living environments 66@lru_cache(maxsize=10) 67def get_spontaneous_environment(cls: t.Type[_env_bound], *args: t.Any) -> _env_bound: 68 """Return a new spontaneous environment. A spontaneous environment 69 is used for templates created directly rather than through an 70 existing environment. 71 72 :param cls: Environment class to create. 73 :param args: Positional arguments passed to environment. 74 """ 75 env = cls(*args) 76 env.shared = True 77 return env 78 79 80def create_cache( 81 size: int, 82) -> t.Optional[t.MutableMapping[t.Tuple[weakref.ref, str], "Template"]]: 83 """Return the cache class for the given size.""" 84 if size == 0: 85 return None 86 87 if size < 0: 88 return {} 89 90 return LRUCache(size) # type: ignore 91 92 93def copy_cache( 94 cache: t.Optional[t.MutableMapping], 95) -> t.Optional[t.MutableMapping[t.Tuple[weakref.ref, str], "Template"]]: 96 """Create an empty copy of the given cache.""" 97 if cache is None: 98 return None 99 100 if type(cache) is dict: 101 return {} 102 103 return LRUCache(cache.capacity) # type: ignore 104 105 106def load_extensions( 107 environment: "Environment", 108 extensions: t.Sequence[t.Union[str, t.Type["Extension"]]], 109) -> t.Dict[str, "Extension"]: 110 """Load the extensions from the list and bind it to the environment. 111 Returns a dict of instantiated extensions. 112 """ 113 result = {} 114 115 for extension in extensions: 116 if isinstance(extension, str): 117 extension = t.cast(t.Type["Extension"], import_string(extension)) 118 119 result[extension.identifier] = extension(environment) 120 121 return result 122 123 124def _environment_config_check(environment: "Environment") -> "Environment": 125 """Perform a sanity check on the environment.""" 126 assert issubclass( 127 environment.undefined, Undefined 128 ), "'undefined' must be a subclass of 'jinja2.Undefined'." 129 assert ( 130 environment.block_start_string 131 != environment.variable_start_string 132 != environment.comment_start_string 133 ), "block, variable and comment start strings must be different." 134 assert environment.newline_sequence in { 135 "\r", 136 "\r\n", 137 "\n", 138 }, "'newline_sequence' must be one of '\\n', '\\r\\n', or '\\r'." 139 return environment 140 141 142class Environment: 143 r"""The core component of Jinja is the `Environment`. It contains 144 important shared variables like configuration, filters, tests, 145 globals and others. Instances of this class may be modified if 146 they are not shared and if no template was loaded so far. 147 Modifications on environments after the first template was loaded 148 will lead to surprising effects and undefined behavior. 149 150 Here are the possible initialization parameters: 151 152 `block_start_string` 153 The string marking the beginning of a block. Defaults to ``'{%'``. 154 155 `block_end_string` 156 The string marking the end of a block. Defaults to ``'%}'``. 157 158 `variable_start_string` 159 The string marking the beginning of a print statement. 160 Defaults to ``'{{'``. 161 162 `variable_end_string` 163 The string marking the end of a print statement. Defaults to 164 ``'}}'``. 165 166 `comment_start_string` 167 The string marking the beginning of a comment. Defaults to ``'{#'``. 168 169 `comment_end_string` 170 The string marking the end of a comment. Defaults to ``'#}'``. 171 172 `line_statement_prefix` 173 If given and a string, this will be used as prefix for line based 174 statements. See also :ref:`line-statements`. 175 176 `line_comment_prefix` 177 If given and a string, this will be used as prefix for line based 178 comments. See also :ref:`line-statements`. 179 180 .. versionadded:: 2.2 181 182 `trim_blocks` 183 If this is set to ``True`` the first newline after a block is 184 removed (block, not variable tag!). Defaults to `False`. 185 186 `lstrip_blocks` 187 If this is set to ``True`` leading spaces and tabs are stripped 188 from the start of a line to a block. Defaults to `False`. 189 190 `newline_sequence` 191 The sequence that starts a newline. Must be one of ``'\r'``, 192 ``'\n'`` or ``'\r\n'``. The default is ``'\n'`` which is a 193 useful default for Linux and OS X systems as well as web 194 applications. 195 196 `keep_trailing_newline` 197 Preserve the trailing newline when rendering templates. 198 The default is ``False``, which causes a single newline, 199 if present, to be stripped from the end of the template. 200 201 .. versionadded:: 2.7 202 203 `extensions` 204 List of Jinja extensions to use. This can either be import paths 205 as strings or extension classes. For more information have a 206 look at :ref:`the extensions documentation <jinja-extensions>`. 207 208 `optimized` 209 should the optimizer be enabled? Default is ``True``. 210 211 `undefined` 212 :class:`Undefined` or a subclass of it that is used to represent 213 undefined values in the template. 214 215 `finalize` 216 A callable that can be used to process the result of a variable 217 expression before it is output. For example one can convert 218 ``None`` implicitly into an empty string here. 219 220 `autoescape` 221 If set to ``True`` the XML/HTML autoescaping feature is enabled by 222 default. For more details about autoescaping see 223 :class:`~markupsafe.Markup`. As of Jinja 2.4 this can also 224 be a callable that is passed the template name and has to 225 return ``True`` or ``False`` depending on autoescape should be 226 enabled by default. 227 228 .. versionchanged:: 2.4 229 `autoescape` can now be a function 230 231 `loader` 232 The template loader for this environment. 233 234 `cache_size` 235 The size of the cache. Per default this is ``400`` which means 236 that if more than 400 templates are loaded the loader will clean 237 out the least recently used template. If the cache size is set to 238 ``0`` templates are recompiled all the time, if the cache size is 239 ``-1`` the cache will not be cleaned. 240 241 .. versionchanged:: 2.8 242 The cache size was increased to 400 from a low 50. 243 244 `auto_reload` 245 Some loaders load templates from locations where the template 246 sources may change (ie: file system or database). If 247 ``auto_reload`` is set to ``True`` (default) every time a template is 248 requested the loader checks if the source changed and if yes, it 249 will reload the template. For higher performance it's possible to 250 disable that. 251 252 `bytecode_cache` 253 If set to a bytecode cache object, this object will provide a 254 cache for the internal Jinja bytecode so that templates don't 255 have to be parsed if they were not changed. 256 257 See :ref:`bytecode-cache` for more information. 258 259 `enable_async` 260 If set to true this enables async template execution which 261 allows using async functions and generators. 262 """ 263 264 #: if this environment is sandboxed. Modifying this variable won't make 265 #: the environment sandboxed though. For a real sandboxed environment 266 #: have a look at jinja2.sandbox. This flag alone controls the code 267 #: generation by the compiler. 268 sandboxed = False 269 270 #: True if the environment is just an overlay 271 overlayed = False 272 273 #: the environment this environment is linked to if it is an overlay 274 linked_to: t.Optional["Environment"] = None 275 276 #: shared environments have this set to `True`. A shared environment 277 #: must not be modified 278 shared = False 279 280 #: the class that is used for code generation. See 281 #: :class:`~jinja2.compiler.CodeGenerator` for more information. 282 code_generator_class: t.Type["CodeGenerator"] = CodeGenerator 283 284 concat = "".join 285 286 #: the context class that is used for templates. See 287 #: :class:`~jinja2.runtime.Context` for more information. 288 context_class: t.Type[Context] = Context 289 290 template_class: t.Type["Template"] 291 292 def __init__( 293 self, 294 block_start_string: str = BLOCK_START_STRING, 295 block_end_string: str = BLOCK_END_STRING, 296 variable_start_string: str = VARIABLE_START_STRING, 297 variable_end_string: str = VARIABLE_END_STRING, 298 comment_start_string: str = COMMENT_START_STRING, 299 comment_end_string: str = COMMENT_END_STRING, 300 line_statement_prefix: t.Optional[str] = LINE_STATEMENT_PREFIX, 301 line_comment_prefix: t.Optional[str] = LINE_COMMENT_PREFIX, 302 trim_blocks: bool = TRIM_BLOCKS, 303 lstrip_blocks: bool = LSTRIP_BLOCKS, 304 newline_sequence: "te.Literal['\\n', '\\r\\n', '\\r']" = NEWLINE_SEQUENCE, 305 keep_trailing_newline: bool = KEEP_TRAILING_NEWLINE, 306 extensions: t.Sequence[t.Union[str, t.Type["Extension"]]] = (), 307 optimized: bool = True, 308 undefined: t.Type[Undefined] = Undefined, 309 finalize: t.Optional[t.Callable[..., t.Any]] = None, 310 autoescape: t.Union[bool, t.Callable[[t.Optional[str]], bool]] = False, 311 loader: t.Optional["BaseLoader"] = None, 312 cache_size: int = 400, 313 auto_reload: bool = True, 314 bytecode_cache: t.Optional["BytecodeCache"] = None, 315 enable_async: bool = False, 316 ): 317 # !!Important notice!! 318 # The constructor accepts quite a few arguments that should be 319 # passed by keyword rather than position. However it's important to 320 # not change the order of arguments because it's used at least 321 # internally in those cases: 322 # - spontaneous environments (i18n extension and Template) 323 # - unittests 324 # If parameter changes are required only add parameters at the end 325 # and don't change the arguments (or the defaults!) of the arguments 326 # existing already. 327 328 # lexer / parser information 329 self.block_start_string = block_start_string 330 self.block_end_string = block_end_string 331 self.variable_start_string = variable_start_string 332 self.variable_end_string = variable_end_string 333 self.comment_start_string = comment_start_string 334 self.comment_end_string = comment_end_string 335 self.line_statement_prefix = line_statement_prefix 336 self.line_comment_prefix = line_comment_prefix 337 self.trim_blocks = trim_blocks 338 self.lstrip_blocks = lstrip_blocks 339 self.newline_sequence = newline_sequence 340 self.keep_trailing_newline = keep_trailing_newline 341 342 # runtime information 343 self.undefined: t.Type[Undefined] = undefined 344 self.optimized = optimized 345 self.finalize = finalize 346 self.autoescape = autoescape 347 348 # defaults 349 self.filters = DEFAULT_FILTERS.copy() 350 self.tests = DEFAULT_TESTS.copy() 351 self.globals = DEFAULT_NAMESPACE.copy() 352 353 # set the loader provided 354 self.loader = loader 355 self.cache = create_cache(cache_size) 356 self.bytecode_cache = bytecode_cache 357 self.auto_reload = auto_reload 358 359 # configurable policies 360 self.policies = DEFAULT_POLICIES.copy() 361 362 # load extensions 363 self.extensions = load_extensions(self, extensions) 364 365 self.is_async = enable_async 366 _environment_config_check(self) 367 368 def add_extension(self, extension: t.Union[str, t.Type["Extension"]]) -> None: 369 """Adds an extension after the environment was created. 370 371 .. versionadded:: 2.5 372 """ 373 self.extensions.update(load_extensions(self, [extension])) 374 375 def extend(self, **attributes: t.Any) -> None: 376 """Add the items to the instance of the environment if they do not exist 377 yet. This is used by :ref:`extensions <writing-extensions>` to register 378 callbacks and configuration values without breaking inheritance. 379 """ 380 for key, value in attributes.items(): 381 if not hasattr(self, key): 382 setattr(self, key, value) 383 384 def overlay( 385 self, 386 block_start_string: str = missing, 387 block_end_string: str = missing, 388 variable_start_string: str = missing, 389 variable_end_string: str = missing, 390 comment_start_string: str = missing, 391 comment_end_string: str = missing, 392 line_statement_prefix: t.Optional[str] = missing, 393 line_comment_prefix: t.Optional[str] = missing, 394 trim_blocks: bool = missing, 395 lstrip_blocks: bool = missing, 396 newline_sequence: "te.Literal['\\n', '\\r\\n', '\\r']" = missing, 397 keep_trailing_newline: bool = missing, 398 extensions: t.Sequence[t.Union[str, t.Type["Extension"]]] = missing, 399 optimized: bool = missing, 400 undefined: t.Type[Undefined] = missing, 401 finalize: t.Optional[t.Callable[..., t.Any]] = missing, 402 autoescape: t.Union[bool, t.Callable[[t.Optional[str]], bool]] = missing, 403 loader: t.Optional["BaseLoader"] = missing, 404 cache_size: int = missing, 405 auto_reload: bool = missing, 406 bytecode_cache: t.Optional["BytecodeCache"] = missing, 407 enable_async: bool = False, 408 ) -> "Environment": 409 """Create a new overlay environment that shares all the data with the 410 current environment except for cache and the overridden attributes. 411 Extensions cannot be removed for an overlayed environment. An overlayed 412 environment automatically gets all the extensions of the environment it 413 is linked to plus optional extra extensions. 414 415 Creating overlays should happen after the initial environment was set 416 up completely. Not all attributes are truly linked, some are just 417 copied over so modifications on the original environment may not shine 418 through. 419 420 .. versionchanged:: 3.1.2 421 Added the ``newline_sequence``,, ``keep_trailing_newline``, 422 and ``enable_async`` parameters to match ``__init__``. 423 """ 424 args = dict(locals()) 425 del args["self"], args["cache_size"], args["extensions"], args["enable_async"] 426 427 rv = object.__new__(self.__class__) 428 rv.__dict__.update(self.__dict__) 429 rv.overlayed = True 430 rv.linked_to = self 431 432 for key, value in args.items(): 433 if value is not missing: 434 setattr(rv, key, value) 435 436 if cache_size is not missing: 437 rv.cache = create_cache(cache_size) 438 else: 439 rv.cache = copy_cache(self.cache) 440 441 rv.extensions = {} 442 for key, value in self.extensions.items(): 443 rv.extensions[key] = value.bind(rv) 444 if extensions is not missing: 445 rv.extensions.update(load_extensions(rv, extensions)) 446 447 if enable_async is not missing: 448 rv.is_async = enable_async 449 450 return _environment_config_check(rv) 451 452 @property 453 def lexer(self) -> Lexer: 454 """The lexer for this environment.""" 455 return get_lexer(self) 456 457 def iter_extensions(self) -> t.Iterator["Extension"]: 458 """Iterates over the extensions by priority.""" 459 return iter(sorted(self.extensions.values(), key=lambda x: x.priority)) 460 461 def getitem( 462 self, obj: t.Any, argument: t.Union[str, t.Any] 463 ) -> t.Union[t.Any, Undefined]: 464 """Get an item or attribute of an object but prefer the item.""" 465 try: 466 return obj[argument] 467 except (AttributeError, TypeError, LookupError): 468 if isinstance(argument, str): 469 try: 470 attr = str(argument) 471 except Exception: 472 pass 473 else: 474 try: 475 return getattr(obj, attr) 476 except AttributeError: 477 pass 478 return self.undefined(obj=obj, name=argument) 479 480 def getattr(self, obj: t.Any, attribute: str) -> t.Any: 481 """Get an item or attribute of an object but prefer the attribute. 482 Unlike :meth:`getitem` the attribute *must* be a string. 483 """ 484 try: 485 return getattr(obj, attribute) 486 except AttributeError: 487 pass 488 try: 489 return obj[attribute] 490 except (TypeError, LookupError, AttributeError): 491 return self.undefined(obj=obj, name=attribute) 492 493 def _filter_test_common( 494 self, 495 name: t.Union[str, Undefined], 496 value: t.Any, 497 args: t.Optional[t.Sequence[t.Any]], 498 kwargs: t.Optional[t.Mapping[str, t.Any]], 499 context: t.Optional[Context], 500 eval_ctx: t.Optional[EvalContext], 501 is_filter: bool, 502 ) -> t.Any: 503 if is_filter: 504 env_map = self.filters 505 type_name = "filter" 506 else: 507 env_map = self.tests 508 type_name = "test" 509 510 func = env_map.get(name) # type: ignore 511 512 if func is None: 513 msg = f"No {type_name} named {name!r}." 514 515 if isinstance(name, Undefined): 516 try: 517 name._fail_with_undefined_error() 518 except Exception as e: 519 msg = f"{msg} ({e}; did you forget to quote the callable name?)" 520 521 raise TemplateRuntimeError(msg) 522 523 args = [value, *(args if args is not None else ())] 524 kwargs = kwargs if kwargs is not None else {} 525 pass_arg = _PassArg.from_obj(func) 526 527 if pass_arg is _PassArg.context: 528 if context is None: 529 raise TemplateRuntimeError( 530 f"Attempted to invoke a context {type_name} without context." 531 ) 532 533 args.insert(0, context) 534 elif pass_arg is _PassArg.eval_context: 535 if eval_ctx is None: 536 if context is not None: 537 eval_ctx = context.eval_ctx 538 else: 539 eval_ctx = EvalContext(self) 540 541 args.insert(0, eval_ctx) 542 elif pass_arg is _PassArg.environment: 543 args.insert(0, self) 544 545 return func(*args, **kwargs) 546 547 def call_filter( 548 self, 549 name: str, 550 value: t.Any, 551 args: t.Optional[t.Sequence[t.Any]] = None, 552 kwargs: t.Optional[t.Mapping[str, t.Any]] = None, 553 context: t.Optional[Context] = None, 554 eval_ctx: t.Optional[EvalContext] = None, 555 ) -> t.Any: 556 """Invoke a filter on a value the same way the compiler does. 557 558 This might return a coroutine if the filter is running from an 559 environment in async mode and the filter supports async 560 execution. It's your responsibility to await this if needed. 561 562 .. versionadded:: 2.7 563 """ 564 return self._filter_test_common( 565 name, value, args, kwargs, context, eval_ctx, True 566 ) 567 568 def call_test( 569 self, 570 name: str, 571 value: t.Any, 572 args: t.Optional[t.Sequence[t.Any]] = None, 573 kwargs: t.Optional[t.Mapping[str, t.Any]] = None, 574 context: t.Optional[Context] = None, 575 eval_ctx: t.Optional[EvalContext] = None, 576 ) -> t.Any: 577 """Invoke a test on a value the same way the compiler does. 578 579 This might return a coroutine if the test is running from an 580 environment in async mode and the test supports async execution. 581 It's your responsibility to await this if needed. 582 583 .. versionchanged:: 3.0 584 Tests support ``@pass_context``, etc. decorators. Added 585 the ``context`` and ``eval_ctx`` parameters. 586 587 .. versionadded:: 2.7 588 """ 589 return self._filter_test_common( 590 name, value, args, kwargs, context, eval_ctx, False 591 ) 592 593 @internalcode 594 def parse( 595 self, 596 source: str, 597 name: t.Optional[str] = None, 598 filename: t.Optional[str] = None, 599 ) -> nodes.Template: 600 """Parse the sourcecode and return the abstract syntax tree. This 601 tree of nodes is used by the compiler to convert the template into 602 executable source- or bytecode. This is useful for debugging or to 603 extract information from templates. 604 605 If you are :ref:`developing Jinja extensions <writing-extensions>` 606 this gives you a good overview of the node tree generated. 607 """ 608 try: 609 return self._parse(source, name, filename) 610 except TemplateSyntaxError: 611 self.handle_exception(source=source) 612 613 def _parse( 614 self, source: str, name: t.Optional[str], filename: t.Optional[str] 615 ) -> nodes.Template: 616 """Internal parsing function used by `parse` and `compile`.""" 617 return Parser(self, source, name, filename).parse() 618 619 def lex( 620 self, 621 source: str, 622 name: t.Optional[str] = None, 623 filename: t.Optional[str] = None, 624 ) -> t.Iterator[t.Tuple[int, str, str]]: 625 """Lex the given sourcecode and return a generator that yields 626 tokens as tuples in the form ``(lineno, token_type, value)``. 627 This can be useful for :ref:`extension development <writing-extensions>` 628 and debugging templates. 629 630 This does not perform preprocessing. If you want the preprocessing 631 of the extensions to be applied you have to filter source through 632 the :meth:`preprocess` method. 633 """ 634 source = str(source) 635 try: 636 return self.lexer.tokeniter(source, name, filename) 637 except TemplateSyntaxError: 638 self.handle_exception(source=source) 639 640 def preprocess( 641 self, 642 source: str, 643 name: t.Optional[str] = None, 644 filename: t.Optional[str] = None, 645 ) -> str: 646 """Preprocesses the source with all extensions. This is automatically 647 called for all parsing and compiling methods but *not* for :meth:`lex` 648 because there you usually only want the actual source tokenized. 649 """ 650 return reduce( 651 lambda s, e: e.preprocess(s, name, filename), 652 self.iter_extensions(), 653 str(source), 654 ) 655 656 def _tokenize( 657 self, 658 source: str, 659 name: t.Optional[str], 660 filename: t.Optional[str] = None, 661 state: t.Optional[str] = None, 662 ) -> TokenStream: 663 """Called by the parser to do the preprocessing and filtering 664 for all the extensions. Returns a :class:`~jinja2.lexer.TokenStream`. 665 """ 666 source = self.preprocess(source, name, filename) 667 stream = self.lexer.tokenize(source, name, filename, state) 668 669 for ext in self.iter_extensions(): 670 stream = ext.filter_stream(stream) # type: ignore 671 672 if not isinstance(stream, TokenStream): 673 stream = TokenStream(stream, name, filename) # type: ignore 674 675 return stream 676 677 def _generate( 678 self, 679 source: nodes.Template, 680 name: t.Optional[str], 681 filename: t.Optional[str], 682 defer_init: bool = False, 683 ) -> str: 684 """Internal hook that can be overridden to hook a different generate 685 method in. 686 687 .. versionadded:: 2.5 688 """ 689 return generate( # type: ignore 690 source, 691 self, 692 name, 693 filename, 694 defer_init=defer_init, 695 optimized=self.optimized, 696 ) 697 698 def _compile(self, source: str, filename: str) -> CodeType: 699 """Internal hook that can be overridden to hook a different compile 700 method in. 701 702 .. versionadded:: 2.5 703 """ 704 return compile(source, filename, "exec") # type: ignore 705 706 @typing.overload 707 def compile( # type: ignore 708 self, 709 source: t.Union[str, nodes.Template], 710 name: t.Optional[str] = None, 711 filename: t.Optional[str] = None, 712 raw: "te.Literal[False]" = False, 713 defer_init: bool = False, 714 ) -> CodeType: 715 ... 716 717 @typing.overload 718 def compile( 719 self, 720 source: t.Union[str, nodes.Template], 721 name: t.Optional[str] = None, 722 filename: t.Optional[str] = None, 723 raw: "te.Literal[True]" = ..., 724 defer_init: bool = False, 725 ) -> str: 726 ... 727 728 @internalcode 729 def compile( 730 self, 731 source: t.Union[str, nodes.Template], 732 name: t.Optional[str] = None, 733 filename: t.Optional[str] = None, 734 raw: bool = False, 735 defer_init: bool = False, 736 ) -> t.Union[str, CodeType]: 737 """Compile a node or template source code. The `name` parameter is 738 the load name of the template after it was joined using 739 :meth:`join_path` if necessary, not the filename on the file system. 740 the `filename` parameter is the estimated filename of the template on 741 the file system. If the template came from a database or memory this 742 can be omitted. 743 744 The return value of this method is a python code object. If the `raw` 745 parameter is `True` the return value will be a string with python 746 code equivalent to the bytecode returned otherwise. This method is 747 mainly used internally. 748 749 `defer_init` is use internally to aid the module code generator. This 750 causes the generated code to be able to import without the global 751 environment variable to be set. 752 753 .. versionadded:: 2.4 754 `defer_init` parameter added. 755 """ 756 source_hint = None 757 try: 758 if isinstance(source, str): 759 source_hint = source 760 source = self._parse(source, name, filename) 761 source = self._generate(source, name, filename, defer_init=defer_init) 762 if raw: 763 return source 764 if filename is None: 765 filename = "<template>" 766 return self._compile(source, filename) 767 except TemplateSyntaxError: 768 self.handle_exception(source=source_hint) 769 770 def compile_expression( 771 self, source: str, undefined_to_none: bool = True 772 ) -> "TemplateExpression": 773 """A handy helper method that returns a callable that accepts keyword 774 arguments that appear as variables in the expression. If called it 775 returns the result of the expression. 776 777 This is useful if applications want to use the same rules as Jinja 778 in template "configuration files" or similar situations. 779 780 Example usage: 781 782 >>> env = Environment() 783 >>> expr = env.compile_expression('foo == 42') 784 >>> expr(foo=23) 785 False 786 >>> expr(foo=42) 787 True 788 789 Per default the return value is converted to `None` if the 790 expression returns an undefined value. This can be changed 791 by setting `undefined_to_none` to `False`. 792 793 >>> env.compile_expression('var')() is None 794 True 795 >>> env.compile_expression('var', undefined_to_none=False)() 796 Undefined 797 798 .. versionadded:: 2.1 799 """ 800 parser = Parser(self, source, state="variable") 801 try: 802 expr = parser.parse_expression() 803 if not parser.stream.eos: 804 raise TemplateSyntaxError( 805 "chunk after expression", parser.stream.current.lineno, None, None 806 ) 807 expr.set_environment(self) 808 except TemplateSyntaxError: 809 self.handle_exception(source=source) 810 811 body = [nodes.Assign(nodes.Name("result", "store"), expr, lineno=1)] 812 template = self.from_string(nodes.Template(body, lineno=1)) 813 return TemplateExpression(template, undefined_to_none) 814 815 def compile_templates( 816 self, 817 target: t.Union[str, os.PathLike], 818 extensions: t.Optional[t.Collection[str]] = None, 819 filter_func: t.Optional[t.Callable[[str], bool]] = None, 820 zip: t.Optional[str] = "deflated", 821 log_function: t.Optional[t.Callable[[str], None]] = None, 822 ignore_errors: bool = True, 823 ) -> None: 824 """Finds all the templates the loader can find, compiles them 825 and stores them in `target`. If `zip` is `None`, instead of in a 826 zipfile, the templates will be stored in a directory. 827 By default a deflate zip algorithm is used. To switch to 828 the stored algorithm, `zip` can be set to ``'stored'``. 829 830 `extensions` and `filter_func` are passed to :meth:`list_templates`. 831 Each template returned will be compiled to the target folder or 832 zipfile. 833 834 By default template compilation errors are ignored. In case a 835 log function is provided, errors are logged. If you want template 836 syntax errors to abort the compilation you can set `ignore_errors` 837 to `False` and you will get an exception on syntax errors. 838 839 .. versionadded:: 2.4 840 """ 841 from .loaders import ModuleLoader 842 843 if log_function is None: 844 845 def log_function(x: str) -> None: 846 pass 847 848 assert log_function is not None 849 assert self.loader is not None, "No loader configured." 850 851 def write_file(filename: str, data: str) -> None: 852 if zip: 853 info = ZipInfo(filename) 854 info.external_attr = 0o755 << 16 855 zip_file.writestr(info, data) 856 else: 857 with open(os.path.join(target, filename), "wb") as f: 858 f.write(data.encode("utf8")) 859 860 if zip is not None: 861 from zipfile import ZipFile, ZipInfo, ZIP_DEFLATED, ZIP_STORED 862 863 zip_file = ZipFile( 864 target, "w", dict(deflated=ZIP_DEFLATED, stored=ZIP_STORED)[zip] 865 ) 866 log_function(f"Compiling into Zip archive {target!r}") 867 else: 868 if not os.path.isdir(target): 869 os.makedirs(target) 870 log_function(f"Compiling into folder {target!r}") 871 872 try: 873 for name in self.list_templates(extensions, filter_func): 874 source, filename, _ = self.loader.get_source(self, name) 875 try: 876 code = self.compile(source, name, filename, True, True) 877 except TemplateSyntaxError as e: 878 if not ignore_errors: 879 raise 880 log_function(f'Could not compile "{name}": {e}') 881 continue 882 883 filename = ModuleLoader.get_module_filename(name) 884 885 write_file(filename, code) 886 log_function(f'Compiled "{name}" as {filename}') 887 finally: 888 if zip: 889 zip_file.close() 890 891 log_function("Finished compiling templates") 892 893 def list_templates( 894 self, 895 extensions: t.Optional[t.Collection[str]] = None, 896 filter_func: t.Optional[t.Callable[[str], bool]] = None, 897 ) -> t.List[str]: 898 """Returns a list of templates for this environment. This requires 899 that the loader supports the loader's 900 :meth:`~BaseLoader.list_templates` method. 901 902 If there are other files in the template folder besides the 903 actual templates, the returned list can be filtered. There are two 904 ways: either `extensions` is set to a list of file extensions for 905 templates, or a `filter_func` can be provided which is a callable that 906 is passed a template name and should return `True` if it should end up 907 in the result list. 908 909 If the loader does not support that, a :exc:`TypeError` is raised. 910 911 .. versionadded:: 2.4 912 """ 913 assert self.loader is not None, "No loader configured." 914 names = self.loader.list_templates() 915 916 if extensions is not None: 917 if filter_func is not None: 918 raise TypeError( 919 "either extensions or filter_func can be passed, but not both" 920 ) 921 922 def filter_func(x: str) -> bool: 923 return "." in x and x.rsplit(".", 1)[1] in extensions # type: ignore 924 925 if filter_func is not None: 926 names = [name for name in names if filter_func(name)] 927 928 return names 929 930 def handle_exception(self, source: t.Optional[str] = None) -> "te.NoReturn": 931 """Exception handling helper. This is used internally to either raise 932 rewritten exceptions or return a rendered traceback for the template. 933 """ 934 from .debug import rewrite_traceback_stack 935 936 raise rewrite_traceback_stack(source=source) 937 938 def join_path(self, template: str, parent: str) -> str: 939 """Join a template with the parent. By default all the lookups are 940 relative to the loader root so this method returns the `template` 941 parameter unchanged, but if the paths should be relative to the 942 parent template, this function can be used to calculate the real 943 template name. 944 945 Subclasses may override this method and implement template path 946 joining here. 947 """ 948 return template 949 950 @internalcode 951 def _load_template( 952 self, name: str, globals: t.Optional[t.MutableMapping[str, t.Any]] 953 ) -> "Template": 954 if self.loader is None: 955 raise TypeError("no loader for this environment specified") 956 cache_key = (weakref.ref(self.loader), name) 957 if self.cache is not None: 958 template = self.cache.get(cache_key) 959 if template is not None and ( 960 not self.auto_reload or template.is_up_to_date 961 ): 962 # template.globals is a ChainMap, modifying it will only 963 # affect the template, not the environment globals. 964 if globals: 965 template.globals.update(globals) 966 967 return template 968 969 template = self.loader.load(self, name, self.make_globals(globals)) 970 971 if self.cache is not None: 972 self.cache[cache_key] = template 973 return template 974 975 @internalcode 976 def get_template( 977 self, 978 name: t.Union[str, "Template"], 979 parent: t.Optional[str] = None, 980 globals: t.Optional[t.MutableMapping[str, t.Any]] = None, 981 ) -> "Template": 982 """Load a template by name with :attr:`loader` and return a 983 :class:`Template`. If the template does not exist a 984 :exc:`TemplateNotFound` exception is raised. 985 986 :param name: Name of the template to load. When loading 987 templates from the filesystem, "/" is used as the path 988 separator, even on Windows. 989 :param parent: The name of the parent template importing this 990 template. :meth:`join_path` can be used to implement name 991 transformations with this. 992 :param globals: Extend the environment :attr:`globals` with 993 these extra variables available for all renders of this 994 template. If the template has already been loaded and 995 cached, its globals are updated with any new items. 996 997 .. versionchanged:: 3.0 998 If a template is loaded from cache, ``globals`` will update 999 the template's globals instead of ignoring the new values. 1000 1001 .. versionchanged:: 2.4 1002 If ``name`` is a :class:`Template` object it is returned 1003 unchanged. 1004 """ 1005 if isinstance(name, Template): 1006 return name 1007 if parent is not None: 1008 name = self.join_path(name, parent) 1009 1010 return self._load_template(name, globals) 1011 1012 @internalcode 1013 def select_template( 1014 self, 1015 names: t.Iterable[t.Union[str, "Template"]], 1016 parent: t.Optional[str] = None, 1017 globals: t.Optional[t.MutableMapping[str, t.Any]] = None, 1018 ) -> "Template": 1019 """Like :meth:`get_template`, but tries loading multiple names. 1020 If none of the names can be loaded a :exc:`TemplatesNotFound` 1021 exception is raised. 1022 1023 :param names: List of template names to try loading in order. 1024 :param parent: The name of the parent template importing this 1025 template. :meth:`join_path` can be used to implement name 1026 transformations with this. 1027 :param globals: Extend the environment :attr:`globals` with 1028 these extra variables available for all renders of this 1029 template. If the template has already been loaded and 1030 cached, its globals are updated with any new items. 1031 1032 .. versionchanged:: 3.0 1033 If a template is loaded from cache, ``globals`` will update 1034 the template's globals instead of ignoring the new values. 1035 1036 .. versionchanged:: 2.11 1037 If ``names`` is :class:`Undefined`, an :exc:`UndefinedError` 1038 is raised instead. If no templates were found and ``names`` 1039 contains :class:`Undefined`, the message is more helpful. 1040 1041 .. versionchanged:: 2.4 1042 If ``names`` contains a :class:`Template` object it is 1043 returned unchanged. 1044 1045 .. versionadded:: 2.3 1046 """ 1047 if isinstance(names, Undefined): 1048 names._fail_with_undefined_error() 1049 1050 if not names: 1051 raise TemplatesNotFound( 1052 message="Tried to select from an empty list of templates." 1053 ) 1054 1055 for name in names: 1056 if isinstance(name, Template): 1057 return name 1058 if parent is not None: 1059 name = self.join_path(name, parent) 1060 try: 1061 return self._load_template(name, globals) 1062 except (TemplateNotFound, UndefinedError): 1063 pass 1064 raise TemplatesNotFound(names) # type: ignore 1065 1066 @internalcode 1067 def get_or_select_template( 1068 self, 1069 template_name_or_list: t.Union[ 1070 str, "Template", t.List[t.Union[str, "Template"]] 1071 ], 1072 parent: t.Optional[str] = None, 1073 globals: t.Optional[t.MutableMapping[str, t.Any]] = None, 1074 ) -> "Template": 1075 """Use :meth:`select_template` if an iterable of template names 1076 is given, or :meth:`get_template` if one name is given. 1077 1078 .. versionadded:: 2.3 1079 """ 1080 if isinstance(template_name_or_list, (str, Undefined)): 1081 return self.get_template(template_name_or_list, parent, globals) 1082 elif isinstance(template_name_or_list, Template): 1083 return template_name_or_list 1084 return self.select_template(template_name_or_list, parent, globals) 1085 1086 def from_string( 1087 self, 1088 source: t.Union[str, nodes.Template], 1089 globals: t.Optional[t.MutableMapping[str, t.Any]] = None, 1090 template_class: t.Optional[t.Type["Template"]] = None, 1091 ) -> "Template": 1092 """Load a template from a source string without using 1093 :attr:`loader`. 1094 1095 :param source: Jinja source to compile into a template. 1096 :param globals: Extend the environment :attr:`globals` with 1097 these extra variables available for all renders of this 1098 template. If the template has already been loaded and 1099 cached, its globals are updated with any new items. 1100 :param template_class: Return an instance of this 1101 :class:`Template` class. 1102 """ 1103 gs = self.make_globals(globals) 1104 cls = template_class or self.template_class 1105 return cls.from_code(self, self.compile(source), gs, None) 1106 1107 def make_globals( 1108 self, d: t.Optional[t.MutableMapping[str, t.Any]] 1109 ) -> t.MutableMapping[str, t.Any]: 1110 """Make the globals map for a template. Any given template 1111 globals overlay the environment :attr:`globals`. 1112 1113 Returns a :class:`collections.ChainMap`. This allows any changes 1114 to a template's globals to only affect that template, while 1115 changes to the environment's globals are still reflected. 1116 However, avoid modifying any globals after a template is loaded. 1117 1118 :param d: Dict of template-specific globals. 1119 1120 .. versionchanged:: 3.0 1121 Use :class:`collections.ChainMap` to always prevent mutating 1122 environment globals. 1123 """ 1124 if d is None: 1125 d = {} 1126 1127 return ChainMap(d, self.globals) 1128 1129 1130class Template: 1131 """A compiled template that can be rendered. 1132 1133 Use the methods on :class:`Environment` to create or load templates. 1134 The environment is used to configure how templates are compiled and 1135 behave. 1136 1137 It is also possible to create a template object directly. This is 1138 not usually recommended. The constructor takes most of the same 1139 arguments as :class:`Environment`. All templates created with the 1140 same environment arguments share the same ephemeral ``Environment`` 1141 instance behind the scenes. 1142 1143 A template object should be considered immutable. Modifications on 1144 the object are not supported. 1145 """ 1146 1147 #: Type of environment to create when creating a template directly 1148 #: rather than through an existing environment. 1149 environment_class: t.Type[Environment] = Environment 1150 1151 environment: Environment 1152 globals: t.MutableMapping[str, t.Any] 1153 name: t.Optional[str] 1154 filename: t.Optional[str] 1155 blocks: t.Dict[str, t.Callable[[Context], t.Iterator[str]]] 1156 root_render_func: t.Callable[[Context], t.Iterator[str]] 1157 _module: t.Optional["TemplateModule"] 1158 _debug_info: str 1159 _uptodate: t.Optional[t.Callable[[], bool]] 1160 1161 def __new__( 1162 cls, 1163 source: t.Union[str, nodes.Template], 1164 block_start_string: str = BLOCK_START_STRING, 1165 block_end_string: str = BLOCK_END_STRING, 1166 variable_start_string: str = VARIABLE_START_STRING, 1167 variable_end_string: str = VARIABLE_END_STRING, 1168 comment_start_string: str = COMMENT_START_STRING, 1169 comment_end_string: str = COMMENT_END_STRING, 1170 line_statement_prefix: t.Optional[str] = LINE_STATEMENT_PREFIX, 1171 line_comment_prefix: t.Optional[str] = LINE_COMMENT_PREFIX, 1172 trim_blocks: bool = TRIM_BLOCKS, 1173 lstrip_blocks: bool = LSTRIP_BLOCKS, 1174 newline_sequence: "te.Literal['\\n', '\\r\\n', '\\r']" = NEWLINE_SEQUENCE, 1175 keep_trailing_newline: bool = KEEP_TRAILING_NEWLINE, 1176 extensions: t.Sequence[t.Union[str, t.Type["Extension"]]] = (), 1177 optimized: bool = True, 1178 undefined: t.Type[Undefined] = Undefined, 1179 finalize: t.Optional[t.Callable[..., t.Any]] = None, 1180 autoescape: t.Union[bool, t.Callable[[t.Optional[str]], bool]] = False, 1181 enable_async: bool = False, 1182 ) -> t.Any: # it returns a `Template`, but this breaks the sphinx build... 1183 env = get_spontaneous_environment( 1184 cls.environment_class, # type: ignore 1185 block_start_string, 1186 block_end_string, 1187 variable_start_string, 1188 variable_end_string, 1189 comment_start_string, 1190 comment_end_string, 1191 line_statement_prefix, 1192 line_comment_prefix, 1193 trim_blocks, 1194 lstrip_blocks, 1195 newline_sequence, 1196 keep_trailing_newline, 1197 frozenset(extensions), 1198 optimized, 1199 undefined, # type: ignore 1200 finalize, 1201 autoescape, 1202 None, 1203 0, 1204 False, 1205 None, 1206 enable_async, 1207 ) 1208 return env.from_string(source, template_class=cls) 1209 1210 @classmethod 1211 def from_code( 1212 cls, 1213 environment: Environment, 1214 code: CodeType, 1215 globals: t.MutableMapping[str, t.Any], 1216 uptodate: t.Optional[t.Callable[[], bool]] = None, 1217 ) -> "Template": 1218 """Creates a template object from compiled code and the globals. This 1219 is used by the loaders and environment to create a template object. 1220 """ 1221 namespace = {"environment": environment, "__file__": code.co_filename} 1222 exec(code, namespace) 1223 rv = cls._from_namespace(environment, namespace, globals) 1224 rv._uptodate = uptodate 1225 return rv 1226 1227 @classmethod 1228 def from_module_dict( 1229 cls, 1230 environment: Environment, 1231 module_dict: t.MutableMapping[str, t.Any], 1232 globals: t.MutableMapping[str, t.Any], 1233 ) -> "Template": 1234 """Creates a template object from a module. This is used by the 1235 module loader to create a template object. 1236 1237 .. versionadded:: 2.4 1238 """ 1239 return cls._from_namespace(environment, module_dict, globals) 1240 1241 @classmethod 1242 def _from_namespace( 1243 cls, 1244 environment: Environment, 1245 namespace: t.MutableMapping[str, t.Any], 1246 globals: t.MutableMapping[str, t.Any], 1247 ) -> "Template": 1248 t: "Template" = object.__new__(cls) 1249 t.environment = environment 1250 t.globals = globals 1251 t.name = namespace["name"] 1252 t.filename = namespace["__file__"] 1253 t.blocks = namespace["blocks"] 1254 1255 # render function and module 1256 t.root_render_func = namespace["root"] # type: ignore 1257 t._module = None 1258 1259 # debug and loader helpers 1260 t._debug_info = namespace["debug_info"] 1261 t._uptodate = None 1262 1263 # store the reference 1264 namespace["environment"] = environment 1265 namespace["__jinja_template__"] = t 1266 1267 return t 1268 1269 def render(self, *args: t.Any, **kwargs: t.Any) -> str: 1270 """This method accepts the same arguments as the `dict` constructor: 1271 A dict, a dict subclass or some keyword arguments. If no arguments 1272 are given the context will be empty. These two calls do the same:: 1273 1274 template.render(knights='that say nih') 1275 template.render({'knights': 'that say nih'}) 1276 1277 This will return the rendered template as a string. 1278 """ 1279 if self.environment.is_async: 1280 import asyncio 1281 1282 close = False 1283 1284 try: 1285 loop = asyncio.get_running_loop() 1286 except RuntimeError: 1287 loop = asyncio.new_event_loop() 1288 close = True 1289 1290 try: 1291 return loop.run_until_complete(self.render_async(*args, **kwargs)) 1292 finally: 1293 if close: 1294 loop.close() 1295 1296 ctx = self.new_context(dict(*args, **kwargs)) 1297 1298 try: 1299 return self.environment.concat(self.root_render_func(ctx)) # type: ignore 1300 except Exception: 1301 self.environment.handle_exception() 1302 1303 async def render_async(self, *args: t.Any, **kwargs: t.Any) -> str: 1304 """This works similar to :meth:`render` but returns a coroutine 1305 that when awaited returns the entire rendered template string. This 1306 requires the async feature to be enabled. 1307 1308 Example usage:: 1309 1310 await template.render_async(knights='that say nih; asynchronously') 1311 """ 1312 if not self.environment.is_async: 1313 raise RuntimeError( 1314 "The environment was not created with async mode enabled." 1315 ) 1316 1317 ctx = self.new_context(dict(*args, **kwargs)) 1318 1319 try: 1320 return self.environment.concat( # type: ignore 1321 [n async for n in self.root_render_func(ctx)] # type: ignore 1322 ) 1323 except Exception: 1324 return self.environment.handle_exception() 1325 1326 def stream(self, *args: t.Any, **kwargs: t.Any) -> "TemplateStream": 1327 """Works exactly like :meth:`generate` but returns a 1328 :class:`TemplateStream`. 1329 """ 1330 return TemplateStream(self.generate(*args, **kwargs)) 1331 1332 def generate(self, *args: t.Any, **kwargs: t.Any) -> t.Iterator[str]: 1333 """For very large templates it can be useful to not render the whole 1334 template at once but evaluate each statement after another and yield 1335 piece for piece. This method basically does exactly that and returns 1336 a generator that yields one item after another as strings. 1337 1338 It accepts the same arguments as :meth:`render`. 1339 """ 1340 if self.environment.is_async: 1341 import asyncio 1342 1343 async def to_list() -> t.List[str]: 1344 return [x async for x in self.generate_async(*args, **kwargs)] 1345 1346 yield from asyncio.run(to_list()) 1347 return 1348 1349 ctx = self.new_context(dict(*args, **kwargs)) 1350 1351 try: 1352 yield from self.root_render_func(ctx) # type: ignore 1353 except Exception: 1354 yield self.environment.handle_exception() 1355 1356 async def generate_async( 1357 self, *args: t.Any, **kwargs: t.Any 1358 ) -> t.AsyncIterator[str]: 1359 """An async version of :meth:`generate`. Works very similarly but 1360 returns an async iterator instead. 1361 """ 1362 if not self.environment.is_async: 1363 raise RuntimeError( 1364 "The environment was not created with async mode enabled." 1365 ) 1366 1367 ctx = self.new_context(dict(*args, **kwargs)) 1368 1369 try: 1370 async for event in self.root_render_func(ctx): # type: ignore 1371 yield event 1372 except Exception: 1373 yield self.environment.handle_exception() 1374 1375 def new_context( 1376 self, 1377 vars: t.Optional[t.Dict[str, t.Any]] = None, 1378 shared: bool = False, 1379 locals: t.Optional[t.Mapping[str, t.Any]] = None, 1380 ) -> Context: 1381 """Create a new :class:`Context` for this template. The vars 1382 provided will be passed to the template. Per default the globals 1383 are added to the context. If shared is set to `True` the data 1384 is passed as is to the context without adding the globals. 1385 1386 `locals` can be a dict of local variables for internal usage. 1387 """ 1388 return new_context( 1389 self.environment, self.name, self.blocks, vars, shared, self.globals, locals 1390 ) 1391 1392 def make_module( 1393 self, 1394 vars: t.Optional[t.Dict[str, t.Any]] = None, 1395 shared: bool = False, 1396 locals: t.Optional[t.Mapping[str, t.Any]] = None, 1397 ) -> "TemplateModule": 1398 """This method works like the :attr:`module` attribute when called 1399 without arguments but it will evaluate the template on every call 1400 rather than caching it. It's also possible to provide 1401 a dict which is then used as context. The arguments are the same 1402 as for the :meth:`new_context` method. 1403 """ 1404 ctx = self.new_context(vars, shared, locals) 1405 return TemplateModule(self, ctx) 1406 1407 async def make_module_async( 1408 self, 1409 vars: t.Optional[t.Dict[str, t.Any]] = None, 1410 shared: bool = False, 1411 locals: t.Optional[t.Mapping[str, t.Any]] = None, 1412 ) -> "TemplateModule": 1413 """As template module creation can invoke template code for 1414 asynchronous executions this method must be used instead of the 1415 normal :meth:`make_module` one. Likewise the module attribute 1416 becomes unavailable in async mode. 1417 """ 1418 ctx = self.new_context(vars, shared, locals) 1419 return TemplateModule( 1420 self, ctx, [x async for x in self.root_render_func(ctx)] # type: ignore 1421 ) 1422 1423 @internalcode 1424 def _get_default_module(self, ctx: t.Optional[Context] = None) -> "TemplateModule": 1425 """If a context is passed in, this means that the template was 1426 imported. Imported templates have access to the current 1427 template's globals by default, but they can only be accessed via 1428 the context during runtime. 1429 1430 If there are new globals, we need to create a new module because 1431 the cached module is already rendered and will not have access 1432 to globals from the current context. This new module is not 1433 cached because the template can be imported elsewhere, and it 1434 should have access to only the current template's globals. 1435 """ 1436 if self.environment.is_async: 1437 raise RuntimeError("Module is not available in async mode.") 1438 1439 if ctx is not None: 1440 keys = ctx.globals_keys - self.globals.keys() 1441 1442 if keys: 1443 return self.make_module({k: ctx.parent[k] for k in keys}) 1444 1445 if self._module is None: 1446 self._module = self.make_module() 1447 1448 return self._module 1449 1450 async def _get_default_module_async( 1451 self, ctx: t.Optional[Context] = None 1452 ) -> "TemplateModule": 1453 if ctx is not None: 1454 keys = ctx.globals_keys - self.globals.keys() 1455 1456 if keys: 1457 return await self.make_module_async({k: ctx.parent[k] for k in keys}) 1458 1459 if self._module is None: 1460 self._module = await self.make_module_async() 1461 1462 return self._module 1463 1464 @property 1465 def module(self) -> "TemplateModule": 1466 """The template as module. This is used for imports in the 1467 template runtime but is also useful if one wants to access 1468 exported template variables from the Python layer: 1469 1470 >>> t = Template('{% macro foo() %}42{% endmacro %}23') 1471 >>> str(t.module) 1472 '23' 1473 >>> t.module.foo() == u'42' 1474 True 1475 1476 This attribute is not available if async mode is enabled. 1477 """ 1478 return self._get_default_module() 1479 1480 def get_corresponding_lineno(self, lineno: int) -> int: 1481 """Return the source line number of a line number in the 1482 generated bytecode as they are not in sync. 1483 """ 1484 for template_line, code_line in reversed(self.debug_info): 1485 if code_line <= lineno: 1486 return template_line 1487 return 1 1488 1489 @property 1490 def is_up_to_date(self) -> bool: 1491 """If this variable is `False` there is a newer version available.""" 1492 if self._uptodate is None: 1493 return True 1494 return self._uptodate() 1495 1496 @property 1497 def debug_info(self) -> t.List[t.Tuple[int, int]]: 1498 """The debug info mapping.""" 1499 if self._debug_info: 1500 return [ 1501 tuple(map(int, x.split("="))) # type: ignore 1502 for x in self._debug_info.split("&") 1503 ] 1504 1505 return [] 1506 1507 def __repr__(self) -> str: 1508 if self.name is None: 1509 name = f"memory:{id(self):x}" 1510 else: 1511 name = repr(self.name) 1512 return f"<{type(self).__name__} {name}>" 1513 1514 1515class TemplateModule: 1516 """Represents an imported template. All the exported names of the 1517 template are available as attributes on this object. Additionally 1518 converting it into a string renders the contents. 1519 """ 1520 1521 def __init__( 1522 self, 1523 template: Template, 1524 context: Context, 1525 body_stream: t.Optional[t.Iterable[str]] = None, 1526 ) -> None: 1527 if body_stream is None: 1528 if context.environment.is_async: 1529 raise RuntimeError( 1530 "Async mode requires a body stream to be passed to" 1531 " a template module. Use the async methods of the" 1532 " API you are using." 1533 ) 1534 1535 body_stream = list(template.root_render_func(context)) # type: ignore 1536 1537 self._body_stream = body_stream 1538 self.__dict__.update(context.get_exported()) 1539 self.__name__ = template.name 1540 1541 def __html__(self) -> Markup: 1542 return Markup(concat(self._body_stream)) 1543 1544 def __str__(self) -> str: 1545 return concat(self._body_stream) 1546 1547 def __repr__(self) -> str: 1548 if self.__name__ is None: 1549 name = f"memory:{id(self):x}" 1550 else: 1551 name = repr(self.__name__) 1552 return f"<{type(self).__name__} {name}>" 1553 1554 1555class TemplateExpression: 1556 """The :meth:`jinja2.Environment.compile_expression` method returns an 1557 instance of this object. It encapsulates the expression-like access 1558 to the template with an expression it wraps. 1559 """ 1560 1561 def __init__(self, template: Template, undefined_to_none: bool) -> None: 1562 self._template = template 1563 self._undefined_to_none = undefined_to_none 1564 1565 def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Optional[t.Any]: 1566 context = self._template.new_context(dict(*args, **kwargs)) 1567 consume(self._template.root_render_func(context)) # type: ignore 1568 rv = context.vars["result"] 1569 if self._undefined_to_none and isinstance(rv, Undefined): 1570 rv = None 1571 return rv 1572 1573 1574class TemplateStream: 1575 """A template stream works pretty much like an ordinary python generator 1576 but it can buffer multiple items to reduce the number of total iterations. 1577 Per default the output is unbuffered which means that for every unbuffered 1578 instruction in the template one string is yielded. 1579 1580 If buffering is enabled with a buffer size of 5, five items are combined 1581 into a new string. This is mainly useful if you are streaming 1582 big templates to a client via WSGI which flushes after each iteration. 1583 """ 1584 1585 def __init__(self, gen: t.Iterator[str]) -> None: 1586 self._gen = gen 1587 self.disable_buffering() 1588 1589 def dump( 1590 self, 1591 fp: t.Union[str, t.IO], 1592 encoding: t.Optional[str] = None, 1593 errors: t.Optional[str] = "strict", 1594 ) -> None: 1595 """Dump the complete stream into a file or file-like object. 1596 Per default strings are written, if you want to encode 1597 before writing specify an `encoding`. 1598 1599 Example usage:: 1600 1601 Template('Hello {{ name }}!').stream(name='foo').dump('hello.html') 1602 """ 1603 close = False 1604 1605 if isinstance(fp, str): 1606 if encoding is None: 1607 encoding = "utf-8" 1608 1609 fp = open(fp, "wb") 1610 close = True 1611 try: 1612 if encoding is not None: 1613 iterable = (x.encode(encoding, errors) for x in self) # type: ignore 1614 else: 1615 iterable = self # type: ignore 1616 1617 if hasattr(fp, "writelines"): 1618 fp.writelines(iterable) 1619 else: 1620 for item in iterable: 1621 fp.write(item) 1622 finally: 1623 if close: 1624 fp.close() 1625 1626 def disable_buffering(self) -> None: 1627 """Disable the output buffering.""" 1628 self._next = partial(next, self._gen) 1629 self.buffered = False 1630 1631 def _buffered_generator(self, size: int) -> t.Iterator[str]: 1632 buf: t.List[str] = [] 1633 c_size = 0 1634 push = buf.append 1635 1636 while True: 1637 try: 1638 while c_size < size: 1639 c = next(self._gen) 1640 push(c) 1641 if c: 1642 c_size += 1 1643 except StopIteration: 1644 if not c_size: 1645 return 1646 yield concat(buf) 1647 del buf[:] 1648 c_size = 0 1649 1650 def enable_buffering(self, size: int = 5) -> None: 1651 """Enable buffering. Buffer `size` items before yielding them.""" 1652 if size <= 1: 1653 raise ValueError("buffer size too small") 1654 1655 self.buffered = True 1656 self._next = partial(next, self._buffered_generator(size)) 1657 1658 def __iter__(self) -> "TemplateStream": 1659 return self 1660 1661 def __next__(self) -> str: 1662 return self._next() # type: ignore 1663 1664 1665# hook in default template class. if anyone reads this comment: ignore that 1666# it's possible to use custom templates ;-) 1667Environment.template_class = Template 1668