1 """zipimport provides support for importing Python modules from Zip archives.
2
3 This module exports three objects:
4 - zipimporter: a class; its constructor takes a path to a Zip archive.
5 - ZipImportError: exception raised by zipimporter objects. It's a
6 subclass of ImportError, so it can be caught as ImportError, too.
7 - _zip_directory_cache: a dict, mapping archive paths to zip directory
8 info dicts, as used in zipimporter._files.
9
10 It is usually not needed to use the zipimport module explicitly; it is
11 used by the builtin import mechanism for sys.path items that are paths
12 to Zip archives.
13 """
14
15 #from importlib import _bootstrap_external
16 #from importlib import _bootstrap # for _verbose_message
17 import _frozen_importlib_external as _bootstrap_external
18 from _frozen_importlib_external import _unpack_uint16, _unpack_uint32
19 import _frozen_importlib as _bootstrap # for _verbose_message
20 import _imp # for check_hash_based_pycs
21 import _io # for open
22 import marshal # for loads
23 import sys # for modules
24 import time # for mktime
25 import _warnings # For warn()
26
# Public names of this module.
__all__ = ['ZipImportError', 'zipimporter']


# Primary path separator, taken from the bootstrap import machinery.
path_sep = _bootstrap_external.path_sep
# Everything in path_separators after the first entry.  When non-empty,
# zipimporter.__init__() and get_data() replace it with path_sep before
# looking paths up.
alt_path_sep = _bootstrap_external.path_separators[1:]
32
33
class ZipImportError(ImportError):
    """Exception raised by zipimporter objects.

    It is a subclass of ImportError, so it can be caught as ImportError too.
    """
    pass
36
# _read_directory() cache: maps an archive path to the files dict that
# _read_directory() built for it.
_zip_directory_cache = {}

# The module type, used by load_module() to create and recognise modules.
_module_type = type(sys)

# Number of bytes _read_directory() reads from the end of the file when
# looking for the end-of-central-directory record.
END_CENTRAL_DIR_SIZE = 22
# Signature that starts the end-of-central-directory record.
STRING_END_ARCHIVE = b'PK\x05\x06'
# Upper bound on how far before the fixed-size record _read_directory()
# searches for the signature when the archive ends with a comment.
MAX_COMMENT_LEN = (1 << 16) - 1
45
class zipimporter(_bootstrap_external._LoaderBasics):
    """zipimporter(archivepath) -> zipimporter object

    Create a new zipimporter instance. 'archivepath' must be a path to
    a zipfile, or to a specific path inside a zipfile. For example, it can be
    '/tmp/myimport.zip', or '/tmp/myimport.zip/mydirectory', if mydirectory is a
    valid directory inside the archive.

    'ZipImportError' is raised if 'archivepath' doesn't point to a valid Zip
    archive.

    The 'archive' attribute of zipimporter objects contains the name of the
    zipfile targeted.
    """

    # Split the "subdirectory" from the Zip archive path, look up a matching
    # entry in _zip_directory_cache, fetch the file directory from there
    # if found, or else read it from the archive.
    def __init__(self, path):
        if not isinstance(path, str):
            raise TypeError(f"expected str, not {type(path)!r}")
        if not path:
            raise ZipImportError('archive path is empty', path=path)
        if alt_path_sep:
            path = path.replace(alt_path_sep, path_sep)

        # Path components that follow the archive file, innermost first.
        prefix = []
        while True:
            try:
                st = _bootstrap_external._path_stat(path)
            except (OSError, ValueError):
                # On Windows a ValueError is raised for too long paths.
                # Back up one path element.
                dirname, basename = _bootstrap_external._path_split(path)
                if dirname == path:
                    # Nothing left to strip: no component of the path exists.
                    raise ZipImportError('not a Zip file', path=path)
                path = dirname
                prefix.append(basename)
            else:
                # it exists
                if (st.st_mode & 0o170000) != 0o100000:  # stat.S_ISREG
                    # it's not a regular file
                    raise ZipImportError('not a Zip file', path=path)
                break

        try:
            files = _zip_directory_cache[path]
        except KeyError:
            files = _read_directory(path)
            _zip_directory_cache[path] = files
        self._files = files
        self.archive = path
        # a prefix directory following the ZIP file path.
        self.prefix = _bootstrap_external._path_join(*prefix[::-1])
        if self.prefix:
            self.prefix += path_sep


    # Check whether we can satisfy the import of the module named by
    # 'fullname', or whether it could be a portion of a namespace
    # package.
    def find_loader(self, fullname, path=None):
        """find_loader(fullname, path=None) -> (loader, portions).

        Search for a module specified by 'fullname'. 'fullname' must be the
        fully qualified (dotted) module name. Returns (self, []) if the
        module was found, (None, [path]) with the full path name if it is
        possibly a portion of a namespace package, or (None, []) otherwise.
        The optional 'path' argument is ignored -- it's there for
        compatibility with the importer protocol.

        Deprecated since Python 3.10. Use find_spec() instead.
        """
        _warnings.warn("zipimporter.find_loader() is deprecated and slated for "
                       "removal in Python 3.12; use find_spec() instead",
                       DeprecationWarning)
        mi = _get_module_info(self, fullname)
        if mi is not None:
            # This is a module or package.
            return self, []

        # Not a module or regular package. See if this is a directory, and
        # therefore possibly a portion of a namespace package.

        # We're only interested in the last path component of fullname;
        # earlier components are recorded in self.prefix.
        modpath = _get_module_path(self, fullname)
        if _is_dir(self, modpath):
            # This is possibly a portion of a namespace
            # package. Return the string representing its path,
            # without a trailing separator.
            return None, [f'{self.archive}{path_sep}{modpath}']

        return None, []


    # Check whether we can satisfy the import of the module named by
    # 'fullname'. Return self if we can, None if we can't.
    def find_module(self, fullname, path=None):
        """find_module(fullname, path=None) -> self or None.

        Search for a module specified by 'fullname'. 'fullname' must be the
        fully qualified (dotted) module name. It returns the zipimporter
        instance itself if the module was found, or None if it wasn't.
        The optional 'path' argument is ignored -- it's there for compatibility
        with the importer protocol.

        Deprecated since Python 3.10. Use find_spec() instead.
        """
        _warnings.warn("zipimporter.find_module() is deprecated and slated for "
                       "removal in Python 3.12; use find_spec() instead",
                       DeprecationWarning)
        return self.find_loader(fullname, path)[0]

    def find_spec(self, fullname, target=None):
        """Create a ModuleSpec for the specified module.

        Returns None if the module cannot be found.
        """
        module_info = _get_module_info(self, fullname)
        if module_info is not None:
            return _bootstrap.spec_from_loader(fullname, self, is_package=module_info)
        else:
            # Not a module or regular package. See if this is a directory, and
            # therefore possibly a portion of a namespace package.

            # We're only interested in the last path component of fullname;
            # earlier components are recorded in self.prefix.
            modpath = _get_module_path(self, fullname)
            if _is_dir(self, modpath):
                # This is possibly a portion of a namespace
                # package. Record the string representing its path,
                # without a trailing separator.
                path = f'{self.archive}{path_sep}{modpath}'
                spec = _bootstrap.ModuleSpec(name=fullname, loader=None,
                                             is_package=True)
                spec.submodule_search_locations.append(path)
                return spec
            else:
                return None

    def get_code(self, fullname):
        """get_code(fullname) -> code object.

        Return the code object for the specified module. Raise ZipImportError
        if the module couldn't be imported.
        """
        code, ispackage, modpath = _get_module_code(self, fullname)
        return code


    def get_data(self, pathname):
        """get_data(pathname) -> bytes with file data.

        Return the data associated with 'pathname'. 'pathname' may be either
        a key of the archive directory or that key prefixed with the archive
        path and a separator. Raise OSError if the file wasn't found.
        """
        if alt_path_sep:
            pathname = pathname.replace(alt_path_sep, path_sep)

        key = pathname
        archive_root = self.archive + path_sep
        if pathname.startswith(archive_root):
            key = pathname[len(archive_root):]

        try:
            toc_entry = self._files[key]
        except KeyError:
            raise OSError(0, '', key)
        return _get_data(self.archive, toc_entry)


    # Return a string matching __file__ for the named module
    def get_filename(self, fullname):
        """get_filename(fullname) -> filename string.

        Return the filename for the specified module or raise ZipImportError
        if it couldn't be imported.
        """
        # Deciding the filename requires working out where the code
        # would come from if the module was actually loaded
        code, ispackage, modpath = _get_module_code(self, fullname)
        return modpath


    def get_source(self, fullname):
        """get_source(fullname) -> source string.

        Return the source code for the specified module. Raise ZipImportError
        if the module couldn't be found, return None if the archive does
        contain the module, but has no source for it.
        """
        mi = _get_module_info(self, fullname)
        if mi is None:
            raise ZipImportError(f"can't find module {fullname!r}", name=fullname)

        path = _get_module_path(self, fullname)
        if mi:
            # A package: its source lives in __init__.py inside the directory.
            fullpath = _bootstrap_external._path_join(path, '__init__.py')
        else:
            fullpath = f'{path}.py'

        try:
            toc_entry = self._files[fullpath]
        except KeyError:
            # we have the module, but no source
            return None
        return _get_data(self.archive, toc_entry).decode()


    # Return a bool signifying whether the module is a package or not.
    def is_package(self, fullname):
        """is_package(fullname) -> bool.

        Return True if the module specified by fullname is a package.
        Raise ZipImportError if the module couldn't be found.
        """
        mi = _get_module_info(self, fullname)
        if mi is None:
            raise ZipImportError(f"can't find module {fullname!r}", name=fullname)
        return mi


    # Load and return the module named by 'fullname'.
    def load_module(self, fullname):
        """load_module(fullname) -> module.

        Load the module specified by 'fullname'. 'fullname' must be the
        fully qualified (dotted) module name. It returns the imported
        module, or raises ZipImportError if it could not be imported.

        Deprecated since Python 3.10. Use exec_module() instead.
        """
        msg = ("zipimport.zipimporter.load_module() is deprecated and slated for "
               "removal in Python 3.12; use exec_module() instead")
        _warnings.warn(msg, DeprecationWarning)
        code, ispackage, modpath = _get_module_code(self, fullname)
        mod = sys.modules.get(fullname)
        if mod is None or not isinstance(mod, _module_type):
            mod = _module_type(fullname)
            sys.modules[fullname] = mod
        mod.__loader__ = self

        try:
            if ispackage:
                # add __path__ to the module *before* the code gets
                # executed
                path = _get_module_path(self, fullname)
                fullpath = _bootstrap_external._path_join(self.archive, path)
                mod.__path__ = [fullpath]

            if not hasattr(mod, '__builtins__'):
                mod.__builtins__ = __builtins__
            _bootstrap_external._fix_up_module(mod.__dict__, fullname, modpath)
            exec(code, mod.__dict__)
        except BaseException:
            # The executed code may already have removed the entry itself;
            # pop() keeps a KeyError from masking the exception being
            # propagated.
            sys.modules.pop(fullname, None)
            raise

        try:
            # The executed code may have replaced its own sys.modules entry.
            mod = sys.modules[fullname]
        except KeyError:
            raise ImportError(f'Loaded module {fullname!r} not found in sys.modules')
        _bootstrap._verbose_message('import {} # loaded from Zip {}', fullname, modpath)
        return mod


    def get_resource_reader(self, fullname):
        """Return the ResourceReader for a package in a zip file.

        If 'fullname' is a package within the zip file, return the
        'ResourceReader' object for the package. Otherwise return None.
        """
        try:
            if not self.is_package(fullname):
                return None
        except ZipImportError:
            return None
        # Imported here rather than at module level.
        from importlib.readers import ZipReader
        return ZipReader(self, fullname)


    def invalidate_caches(self):
        """Reload the file data of the archive path."""
        try:
            self._files = _read_directory(self.archive)
            _zip_directory_cache[self.archive] = self._files
        except ZipImportError:
            # The archive can no longer be read: forget the cached directory
            # and leave this importer with no known files.
            _zip_directory_cache.pop(self.archive, None)
            self._files = {}


    def __repr__(self):
        return f'<zipimporter object "{self.archive}{path_sep}{self.prefix}">'
341
342
# _zip_searchorder defines how we search for a module in the Zip
# archive: we first search for a package __init__, then for
# non-package .pyc, and .py entries.  Each entry is a
# (suffix, isbytecode, ispackage) tuple; the suffix is appended to the
# module path returned by _get_module_path().
_zip_searchorder = (
    (path_sep + '__init__.pyc', True, True),
    (path_sep + '__init__.py', False, True),
    ('.pyc', True, False),
    ('.py', False, False),
)
354
# Map a dotted module name to its extension-less location inside the
# archive: the importer's prefix followed by the last name component.
def _get_module_path(self, fullname):
    _parent, _dot, leaf = fullname.rpartition('.')
    return self.prefix + leaf
359
# Report whether 'path' names a directory in the archive.
def _is_dir(self, path):
    # Directories are listed with a trailing separator, so the test is
    # simply whether "path + separator" is one of the known names.  A
    # directory is eligible to be a portion of a namespace package.
    return (path + path_sep) in self._files
368
# Look the module up in the archive directory.  Returns True for a
# package, False for a plain module, None when nothing matches.
def _get_module_info(self, fullname):
    base = _get_module_path(self, fullname)
    # The first suffix in search order that exists in the archive decides.
    return next(
        (is_pkg
         for suffix, _isbytecode, is_pkg in _zip_searchorder
         if base + suffix in self._files),
        None,
    )
377
378
379 # implementation
380
# _read_directory(archive) -> files dict (new reference)
#
# Given a path to a Zip archive, build a dict, mapping file names
# (local to the archive, using path_sep as a separator) to toc entries.
#
# A toc_entry is a tuple:
#
# (__file__,        # value to use for __file__, available for all files:
#                   # the archive path joined with the entry name
#  compress,        # compression kind; 0 for uncompressed
#  data_size,       # size of compressed data on disk
#  file_size,       # size of decompressed data
#  file_offset,     # offset of file header from start of archive
#  time,            # mod time of file (in dos format)
#  date,            # mod date of file (in dos format)
#  crc,             # crc checksum of the data
# )
#
# Directories can be recognized by the trailing path_sep in the name,
# data_size and file_offset are 0.
#
# Raises ZipImportError when the file cannot be opened or read, or when
# the end-of-central-directory record is missing or inconsistent, and
# EOFError when a central directory entry is cut short.
def _read_directory(archive):
    try:
        fp = _io.open_code(archive)
    except OSError:
        raise ZipImportError(f"can't open Zip file: {archive!r}", path=archive)

    with fp:
        # GH-87235: On macOS all file descriptors for /dev/fd/N share the same
        # file offset, reset the file offset after scanning the zipfile directory
        # to not cause problems when someone runs 'python3 /dev/fd/9 9<some_script'
        start_offset = fp.tell()
        try:
            # First guess: the archive has no trailing comment, so the
            # end-of-central-directory record is the last
            # END_CENTRAL_DIR_SIZE bytes of the file.
            try:
                fp.seek(-END_CENTRAL_DIR_SIZE, 2)
                header_position = fp.tell()
                buffer = fp.read(END_CENTRAL_DIR_SIZE)
            except OSError:
                raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
            if len(buffer) != END_CENTRAL_DIR_SIZE:
                raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
            if buffer[:4] != STRING_END_ARCHIVE:
                # Bad: End of Central Dir signature
                # Check if there's a comment.
                try:
                    fp.seek(0, 2)
                    file_size = fp.tell()
                except OSError:
                    raise ZipImportError(f"can't read Zip file: {archive!r}",
                                         path=archive)
                # Read the tail of the file that could hold the record plus
                # a maximum-length comment, and search it backwards for the
                # signature.
                max_comment_start = max(file_size - MAX_COMMENT_LEN -
                                        END_CENTRAL_DIR_SIZE, 0)
                try:
                    fp.seek(max_comment_start)
                    data = fp.read()
                except OSError:
                    raise ZipImportError(f"can't read Zip file: {archive!r}",
                                         path=archive)
                pos = data.rfind(STRING_END_ARCHIVE)
                if pos < 0:
                    raise ZipImportError(f'not a Zip file: {archive!r}',
                                         path=archive)
                buffer = data[pos:pos+END_CENTRAL_DIR_SIZE]
                if len(buffer) != END_CENTRAL_DIR_SIZE:
                    raise ZipImportError(f"corrupt Zip file: {archive!r}",
                                         path=archive)
                # Absolute file position of the record that was just found.
                header_position = file_size - len(data) + pos

            # Size and recorded offset of the central directory, as stored
            # in the end-of-central-directory record.
            header_size = _unpack_uint32(buffer[12:16])
            header_offset = _unpack_uint32(buffer[16:20])
            if header_position < header_size:
                raise ZipImportError(f'bad central directory size: {archive!r}', path=archive)
            if header_position < header_offset:
                raise ZipImportError(f'bad central directory offset: {archive!r}', path=archive)
            # The central directory ends where the end record starts, so it
            # begins header_size bytes earlier.
            header_position -= header_size
            # How far the central directory actually sits beyond its recorded
            # offset; the same shift is applied to every local header offset
            # below.
            arc_offset = header_position - header_offset
            if arc_offset < 0:
                raise ZipImportError(f'bad central directory size or offset: {archive!r}', path=archive)

            files = {}
            # Start of Central Directory
            count = 0
            try:
                fp.seek(header_position)
            except OSError:
                raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
            while True:
                # Fixed-size part of one central directory entry.
                buffer = fp.read(46)
                if len(buffer) < 4:
                    raise EOFError('EOF read where not expected')
                # Start of file header
                if buffer[:4] != b'PK\x01\x02':
                    break                                # Bad: Central Dir File Header
                if len(buffer) != 46:
                    raise EOFError('EOF read where not expected')
                flags = _unpack_uint16(buffer[8:10])
                compress = _unpack_uint16(buffer[10:12])
                time = _unpack_uint16(buffer[12:14])
                date = _unpack_uint16(buffer[14:16])
                crc = _unpack_uint32(buffer[16:20])
                data_size = _unpack_uint32(buffer[20:24])
                file_size = _unpack_uint32(buffer[24:28])
                name_size = _unpack_uint16(buffer[28:30])
                extra_size = _unpack_uint16(buffer[30:32])
                comment_size = _unpack_uint16(buffer[32:34])
                file_offset = _unpack_uint32(buffer[42:46])
                # Total length of the variable-size fields that follow the
                # fixed part of this entry.
                header_size = name_size + extra_size + comment_size
                if file_offset > header_offset:
                    raise ZipImportError(f'bad local header offset: {archive!r}', path=archive)
                file_offset += arc_offset

                try:
                    name = fp.read(name_size)
                except OSError:
                    raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
                if len(name) != name_size:
                    raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
                # On Windows, calling fseek to skip over the fields we don't use is
                # slower than reading the data because fseek flushes stdio's
                # internal buffers.    See issue #8745.
                try:
                    if len(fp.read(header_size - name_size)) != header_size - name_size:
                        raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
                except OSError:
                    raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)

                if flags & 0x800:
                    # UTF-8 file names extension
                    name = name.decode()
                else:
                    # Historical ZIP filename encoding
                    try:
                        name = name.decode('ascii')
                    except UnicodeDecodeError:
                        # Decode as cp437 through the local table.
                        name = name.decode('latin1').translate(cp437_table)

                name = name.replace('/', path_sep)
                path = _bootstrap_external._path_join(archive, name)
                t = (path, compress, data_size, file_size, file_offset, time, date, crc)
                files[name] = t
                count += 1
        finally:
            # Restore the position the file had when scanning started.
            fp.seek(start_offset)
    _bootstrap._verbose_message('zipimport: found {} names in {!r}', count, archive)
    return files
525
# During bootstrap, we may need to load the encodings
# package from a ZIP file. But the cp437 encoding is implemented
# in Python in the encodings package.
#
# Break out of this dependency by using the translation table for
# the cp437 encoding.  _read_directory() decodes a non-ASCII name as
# latin1 and passes the result through str.translate() with this
# 256-character string, so the character at index N is what byte N
# becomes.
cp437_table = (
    # ASCII part, 8 rows x 16 chars
    '\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f'
    '\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f'
    ' !"#$%&\'()*+,-./'
    '0123456789:;<=>?'
    '@ABCDEFGHIJKLMNO'
    'PQRSTUVWXYZ[\\]^_'
    '`abcdefghijklmno'
    'pqrstuvwxyz{|}~\x7f'
    # non-ASCII part, 16 rows x 8 chars
    '\xc7\xfc\xe9\xe2\xe4\xe0\xe5\xe7'
    '\xea\xeb\xe8\xef\xee\xec\xc4\xc5'
    '\xc9\xe6\xc6\xf4\xf6\xf2\xfb\xf9'
    '\xff\xd6\xdc\xa2\xa3\xa5\u20a7\u0192'
    '\xe1\xed\xf3\xfa\xf1\xd1\xaa\xba'
    '\xbf\u2310\xac\xbd\xbc\xa1\xab\xbb'
    '\u2591\u2592\u2593\u2502\u2524\u2561\u2562\u2556'
    '\u2555\u2563\u2551\u2557\u255d\u255c\u255b\u2510'
    '\u2514\u2534\u252c\u251c\u2500\u253c\u255e\u255f'
    '\u255a\u2554\u2569\u2566\u2560\u2550\u256c\u2567'
    '\u2568\u2564\u2565\u2559\u2558\u2552\u2553\u256b'
    '\u256a\u2518\u250c\u2588\u2584\u258c\u2590\u2580'
    '\u03b1\xdf\u0393\u03c0\u03a3\u03c3\xb5\u03c4'
    '\u03a6\u0398\u03a9\u03b4\u221e\u03c6\u03b5\u2229'
    '\u2261\xb1\u2265\u2264\u2320\u2321\xf7\u2248'
    '\xb0\u2219\xb7\u221a\u207f\xb2\u25a0\xa0'
)
560
# True while _get_decompress_func() is in the middle of importing zlib.
_importing_zlib = False

# Return the zlib.decompress function object.  Raises ZipImportError if
# zlib cannot be imported, or if this function is re-entered while its
# own zlib import is still running.
def _get_decompress_func():
    global _importing_zlib
    unavailable = "can't decompress data; zlib not available"

    if _importing_zlib:
        # Someone has a zlib.py[co] in their Zip file;
        # bail out rather than recurse until the stack overflows.
        _bootstrap._verbose_message('zipimport: zlib UNAVAILABLE')
        raise ZipImportError(unavailable)

    _importing_zlib = True
    try:
        from zlib import decompress
    except Exception:
        _bootstrap._verbose_message('zipimport: zlib UNAVAILABLE')
        raise ZipImportError(unavailable)
    finally:
        # Always clear the guard, whether or not the import worked.
        _importing_zlib = False

    _bootstrap._verbose_message('zipimport: zlib available')
    return decompress
585
# Read one archive member: given the Zip file path and the member's
# toc_entry, return its (uncompressed) bytes.
def _get_data(archive, toc_entry):
    (_datapath, method, stored_size, _file_size,
     entry_offset, _time, _date, _crc) = toc_entry
    if stored_size < 0:
        raise ZipImportError('negative data size')

    with _io.open_code(archive) as fp:
        # Validate the local file header before trusting anything in it.
        try:
            fp.seek(entry_offset)
        except OSError:
            raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
        local_header = fp.read(30)
        if len(local_header) != 30:
            raise EOFError('EOF read where not expected')

        if local_header[:4] != b'PK\x03\x04':
            # Wrong signature for a local file header.
            raise ZipImportError(f'bad local file header: {archive!r}', path=archive)

        # The member data begins after the 30 fixed bytes plus the
        # variable-length name and extra fields.
        name_len = _unpack_uint16(local_header[26:28])
        extra_len = _unpack_uint16(local_header[28:30])
        data_start = entry_offset + 30 + name_len + extra_len
        try:
            fp.seek(data_start)
        except OSError:
            raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
        payload = fp.read(stored_size)
        if len(payload) != stored_size:
            raise OSError("zipimport: can't read data")

    if method == 0:
        # Stored without compression: hand the bytes back untouched.
        return payload

    # Anything else is inflated with zlib, using a negative window size.
    try:
        inflate = _get_decompress_func()
    except Exception:
        raise ZipImportError("can't decompress data; zlib not available")
    return inflate(payload, -15)
628
629
# Tolerant timestamp comparison.  The mtime kept in the archive is
# coarser than the one stored in a .pyc (dostime only stores even
# seconds), so two values count as equal when they are at most one
# second apart.
def _eq_mtime(t1, t2):
    return -1 <= t1 - t2 <= 1
636
637
# Turn the contents of a .py[co] file into a code object.  Returns None
# when a timestamp-based pyc does not match the source in the archive.
# Validation of the pyc header is delegated to the _bootstrap_external
# helpers; raises TypeError if the payload is not a code object.
def _unmarshal_code(self, pathname, fullpath, fullname, data):
    exc_details = {'name': fullname, 'path': fullpath}

    flags = _bootstrap_external._classify_pyc(data, fullname, exc_details)

    if flags & 0b1 != 0:
        # Hash-based pyc: whether the source is checked depends on the
        # pyc's own flag and on the check_hash_based_pycs setting.
        mode = _imp.check_hash_based_pycs
        must_check = mode != 'never' and (flags & 0b10 != 0 or mode == 'always')
        if must_check:
            source_bytes = _get_pyc_source(self, fullpath)
            if source_bytes is not None:
                expected_hash = _imp.source_hash(
                    _bootstrap_external._RAW_MAGIC_NUMBER,
                    source_bytes,
                )
                _bootstrap_external._validate_hash_pyc(
                    data, expected_hash, fullname, exc_details)
    else:
        # Timestamp-based pyc: compare against the source entry, if any.
        source_mtime, source_size = _get_mtime_and_size_of_source(self, fullpath)
        if source_mtime:
            # We don't use _bootstrap_external._validate_timestamp_pyc
            # to allow for a more lenient timestamp check.
            up_to_date = (
                _eq_mtime(_unpack_uint32(data[8:12]), source_mtime)
                and _unpack_uint32(data[12:16]) == source_size
            )
            if not up_to_date:
                _bootstrap._verbose_message(
                    f'bytecode is stale for {fullname!r}')
                return None

    # The marshalled code object follows the 16-byte header.
    code = marshal.loads(data[16:])
    if not isinstance(code, _code_type):
        raise TypeError(f'compiled module {pathname!r} is not a code object')
    return code

# The type of code objects, obtained from a function at hand.
_code_type = type(_unmarshal_code.__code__)
682
683
# Convert DOS ('\r\n') and old Mac ('\r') line endings in a bytes
# object to Unix ('\n') ones.  The two-byte sequence is handled first so
# that it collapses to a single newline.
def _normalize_line_endings(source):
    return source.replace(b'\r\n', b'\n').replace(b'\r', b'\n')
690
# Compile a buffer of Python source code into a code object, after
# normalizing its line endings.  'pathname' is the filename passed to
# compile().
def _compile_source(pathname, source):
    normalized = _normalize_line_endings(source)
    return compile(normalized, pathname, mode='exec', dont_inherit=True)
696
# Convert the DOS-format date ('d') and time ('t') words found in the
# Zip archive into a time.mktime() value, comparable with the time
# stamp stored in .pyc files.
def _parse_dostime(d, t):
    year = (d >> 9) + 1980      # date bits 9 and up: years since 1980
    month = (d >> 5) & 0xF      # date bits 5..8
    day = d & 0x1F              # date bits 0..4
    hour = t >> 11              # time bits 11 and up
    minute = (t >> 5) & 0x3F    # time bits 5..10
    second = (t & 0x1F) * 2     # time bits 0..4, stored as seconds / 2
    # The three trailing -1 fields (weekday, yearday, DST flag) are left
    # for mktime() to work out.
    return time.mktime((year, month, day, hour, minute, second, -1, -1, -1))
708
# For the archive path of a .pyc file, look up the matching .py entry
# and return (modification time, uncompressed size) for it, or (0, 0)
# if no source is available.
def _get_mtime_and_size_of_source(self, path):
    try:
        # Dropping the final 'c' or 'o' of *.py[co] gives the source name.
        assert path[-1:] in ('c', 'o')
        entry = self._files[path[:-1]]
        # DOS time and date of the .py file, for comparison with the
        # time stamp embedded in the pyc, plus its decompressed size.
        dos_time, dos_date, size = entry[5], entry[6], entry[3]
        return _parse_dostime(dos_date, dos_time), size
    except (KeyError, IndexError, TypeError):
        return 0, 0
726
727
# For the archive path of a .pyc file, return the bytes of the matching
# .py file, or None if the archive holds no such source.
def _get_pyc_source(self, path):
    # Dropping the final 'c' or 'o' of *.py[co] gives the source name.
    assert path[-1:] in ('c', 'o')
    try:
        entry = self._files[path[:-1]]
    except KeyError:
        return None
    return _get_data(self.archive, entry)
742
743
# Find and build the code object for the module named 'fullname'.
# Returns (code, ispackage, modpath); raises ZipImportError when no
# candidate in the archive yields a code object.
def _get_module_code(self, fullname):
    base = _get_module_path(self, fullname)
    last_error = None
    for suffix, isbytecode, ispackage in _zip_searchorder:
        candidate = base + suffix
        _bootstrap._verbose_message('trying {}{}{}', self.archive, path_sep, candidate, verbosity=2)
        try:
            toc_entry = self._files[candidate]
        except KeyError:
            # Not in the archive under this suffix; try the next one.
            continue

        modpath = toc_entry[0]
        data = _get_data(self.archive, toc_entry)
        if isbytecode:
            try:
                code = _unmarshal_code(self, modpath, candidate, fullname, data)
            except ImportError as exc:
                # Remember the failure, but still give later candidates
                # a chance.
                last_error = exc
                code = None
        else:
            code = _compile_source(modpath, data)

        if code is not None:
            return code, ispackage, modpath
        # No usable code object from this bytecode entry (rejected or
        # stale); fall through to the next candidate.

    # Every candidate was missing or unusable.
    if last_error:
        msg = f"module load failed: {last_error}"
        raise ZipImportError(msg, name=fullname) from last_error
    raise ZipImportError(f"can't find module {fullname!r}", name=fullname)