(root)/
Python-3.11.7/
Lib/
csv.py
       1  
       2  """
       3  csv.py - read/write/investigate CSV files
       4  """
       5  
       6  import re
       7  from _csv import Error, __version__, writer, reader, register_dialect, \
       8                   unregister_dialect, get_dialect, list_dialects, \
       9                   field_size_limit, \
      10                   QUOTE_MINIMAL, QUOTE_ALL, QUOTE_NONNUMERIC, QUOTE_NONE, \
      11                   __doc__
      12  from _csv import Dialect as _Dialect
      13  
      14  from io import StringIO
      15  
      16  __all__ = ["QUOTE_MINIMAL", "QUOTE_ALL", "QUOTE_NONNUMERIC", "QUOTE_NONE",
      17             "Error", "Dialect", "__doc__", "excel", "excel_tab",
      18             "field_size_limit", "reader", "writer",
      19             "register_dialect", "get_dialect", "list_dialects", "Sniffer",
      20             "unregister_dialect", "__version__", "DictReader", "DictWriter",
      21             "unix_dialect"]
      22  
      23  class ESC[4;38;5;81mDialect:
      24      """Describe a CSV dialect.
      25  
      26      This must be subclassed (see csv.excel).  Valid attributes are:
      27      delimiter, quotechar, escapechar, doublequote, skipinitialspace,
      28      lineterminator, quoting.
      29  
      30      """
      31      _name = ""
      32      _valid = False
      33      # placeholders
      34      delimiter = None
      35      quotechar = None
      36      escapechar = None
      37      doublequote = None
      38      skipinitialspace = None
      39      lineterminator = None
      40      quoting = None
      41  
      42      def __init__(self):
      43          if self.__class__ != Dialect:
      44              self._valid = True
      45          self._validate()
      46  
      47      def _validate(self):
      48          try:
      49              _Dialect(self)
      50          except TypeError as e:
      51              # We do this for compatibility with py2.3
      52              raise Error(str(e))
      53  
      54  class ESC[4;38;5;81mexcel(ESC[4;38;5;149mDialect):
      55      """Describe the usual properties of Excel-generated CSV files."""
      56      delimiter = ','
      57      quotechar = '"'
      58      doublequote = True
      59      skipinitialspace = False
      60      lineterminator = '\r\n'
      61      quoting = QUOTE_MINIMAL
      62  register_dialect("excel", excel)
      63  
      64  class ESC[4;38;5;81mexcel_tab(ESC[4;38;5;149mexcel):
      65      """Describe the usual properties of Excel-generated TAB-delimited files."""
      66      delimiter = '\t'
      67  register_dialect("excel-tab", excel_tab)
      68  
      69  class ESC[4;38;5;81munix_dialect(ESC[4;38;5;149mDialect):
      70      """Describe the usual properties of Unix-generated CSV files."""
      71      delimiter = ','
      72      quotechar = '"'
      73      doublequote = True
      74      skipinitialspace = False
      75      lineterminator = '\n'
      76      quoting = QUOTE_ALL
      77  register_dialect("unix", unix_dialect)
      78  
      79  
      80  class ESC[4;38;5;81mDictReader:
      81      def __init__(self, f, fieldnames=None, restkey=None, restval=None,
      82                   dialect="excel", *args, **kwds):
      83          self._fieldnames = fieldnames   # list of keys for the dict
      84          self.restkey = restkey          # key to catch long rows
      85          self.restval = restval          # default value for short rows
      86          self.reader = reader(f, dialect, *args, **kwds)
      87          self.dialect = dialect
      88          self.line_num = 0
      89  
      90      def __iter__(self):
      91          return self
      92  
      93      @property
      94      def fieldnames(self):
      95          if self._fieldnames is None:
      96              try:
      97                  self._fieldnames = next(self.reader)
      98              except StopIteration:
      99                  pass
     100          self.line_num = self.reader.line_num
     101          return self._fieldnames
     102  
     103      @fieldnames.setter
     104      def fieldnames(self, value):
     105          self._fieldnames = value
     106  
     107      def __next__(self):
     108          if self.line_num == 0:
     109              # Used only for its side effect.
     110              self.fieldnames
     111          row = next(self.reader)
     112          self.line_num = self.reader.line_num
     113  
     114          # unlike the basic reader, we prefer not to return blanks,
     115          # because we will typically wind up with a dict full of None
     116          # values
     117          while row == []:
     118              row = next(self.reader)
     119          d = dict(zip(self.fieldnames, row))
     120          lf = len(self.fieldnames)
     121          lr = len(row)
     122          if lf < lr:
     123              d[self.restkey] = row[lf:]
     124          elif lf > lr:
     125              for key in self.fieldnames[lr:]:
     126                  d[key] = self.restval
     127          return d
     128  
     129  
     130  class ESC[4;38;5;81mDictWriter:
     131      def __init__(self, f, fieldnames, restval="", extrasaction="raise",
     132                   dialect="excel", *args, **kwds):
     133          self.fieldnames = fieldnames    # list of keys for the dict
     134          self.restval = restval          # for writing short dicts
     135          if extrasaction.lower() not in ("raise", "ignore"):
     136              raise ValueError("extrasaction (%s) must be 'raise' or 'ignore'"
     137                               % extrasaction)
     138          self.extrasaction = extrasaction
     139          self.writer = writer(f, dialect, *args, **kwds)
     140  
     141      def writeheader(self):
     142          header = dict(zip(self.fieldnames, self.fieldnames))
     143          return self.writerow(header)
     144  
     145      def _dict_to_list(self, rowdict):
     146          if self.extrasaction == "raise":
     147              wrong_fields = rowdict.keys() - self.fieldnames
     148              if wrong_fields:
     149                  raise ValueError("dict contains fields not in fieldnames: "
     150                                   + ", ".join([repr(x) for x in wrong_fields]))
     151          return (rowdict.get(key, self.restval) for key in self.fieldnames)
     152  
     153      def writerow(self, rowdict):
     154          return self.writer.writerow(self._dict_to_list(rowdict))
     155  
     156      def writerows(self, rowdicts):
     157          return self.writer.writerows(map(self._dict_to_list, rowdicts))
     158  
# Guard Sniffer's type checking against builds that exclude complex().
# Sniffer.has_header() calls ``complex`` on cell text to decide whether a
# cell is numeric; if the name is not defined, ``float`` is bound to it at
# module level so that check still has a callable to use.
try:
    complex
except NameError:
    complex = float
     164  
     165  class ESC[4;38;5;81mSniffer:
     166      '''
     167      "Sniffs" the format of a CSV file (i.e. delimiter, quotechar)
     168      Returns a Dialect object.
     169      '''
     170      def __init__(self):
     171          # in case there is more than one possible delimiter
     172          self.preferred = [',', '\t', ';', ' ', ':']
     173  
     174  
     175      def sniff(self, sample, delimiters=None):
     176          """
     177          Returns a dialect (or None) corresponding to the sample
     178          """
     179  
     180          quotechar, doublequote, delimiter, skipinitialspace = \
     181                     self._guess_quote_and_delimiter(sample, delimiters)
     182          if not delimiter:
     183              delimiter, skipinitialspace = self._guess_delimiter(sample,
     184                                                                  delimiters)
     185  
     186          if not delimiter:
     187              raise Error("Could not determine delimiter")
     188  
     189          class ESC[4;38;5;81mdialect(ESC[4;38;5;149mDialect):
     190              _name = "sniffed"
     191              lineterminator = '\r\n'
     192              quoting = QUOTE_MINIMAL
     193              # escapechar = ''
     194  
     195          dialect.doublequote = doublequote
     196          dialect.delimiter = delimiter
     197          # _csv.reader won't accept a quotechar of ''
     198          dialect.quotechar = quotechar or '"'
     199          dialect.skipinitialspace = skipinitialspace
     200  
     201          return dialect
     202  
     203  
     204      def _guess_quote_and_delimiter(self, data, delimiters):
     205          """
     206          Looks for text enclosed between two identical quotes
     207          (the probable quotechar) which are preceded and followed
     208          by the same character (the probable delimiter).
     209          For example:
     210                           ,'some text',
     211          The quote with the most wins, same with the delimiter.
     212          If there is no quotechar the delimiter can't be determined
     213          this way.
     214          """
     215  
     216          matches = []
     217          for restr in (r'(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?P=delim)', # ,".*?",
     218                        r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?P<delim>[^\w\n"\'])(?P<space> ?)',   #  ".*?",
     219                        r'(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?:$|\n)',   # ,".*?"
     220                        r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?:$|\n)'):                            #  ".*?" (no delim, no space)
     221              regexp = re.compile(restr, re.DOTALL | re.MULTILINE)
     222              matches = regexp.findall(data)
     223              if matches:
     224                  break
     225  
     226          if not matches:
     227              # (quotechar, doublequote, delimiter, skipinitialspace)
     228              return ('', False, None, 0)
     229          quotes = {}
     230          delims = {}
     231          spaces = 0
     232          groupindex = regexp.groupindex
     233          for m in matches:
     234              n = groupindex['quote'] - 1
     235              key = m[n]
     236              if key:
     237                  quotes[key] = quotes.get(key, 0) + 1
     238              try:
     239                  n = groupindex['delim'] - 1
     240                  key = m[n]
     241              except KeyError:
     242                  continue
     243              if key and (delimiters is None or key in delimiters):
     244                  delims[key] = delims.get(key, 0) + 1
     245              try:
     246                  n = groupindex['space'] - 1
     247              except KeyError:
     248                  continue
     249              if m[n]:
     250                  spaces += 1
     251  
     252          quotechar = max(quotes, key=quotes.get)
     253  
     254          if delims:
     255              delim = max(delims, key=delims.get)
     256              skipinitialspace = delims[delim] == spaces
     257              if delim == '\n': # most likely a file with a single column
     258                  delim = ''
     259          else:
     260              # there is *no* delimiter, it's a single column of quoted data
     261              delim = ''
     262              skipinitialspace = 0
     263  
     264          # if we see an extra quote between delimiters, we've got a
     265          # double quoted format
     266          dq_regexp = re.compile(
     267                                 r"((%(delim)s)|^)\W*%(quote)s[^%(delim)s\n]*%(quote)s[^%(delim)s\n]*%(quote)s\W*((%(delim)s)|$)" % \
     268                                 {'delim':re.escape(delim), 'quote':quotechar}, re.MULTILINE)
     269  
     270  
     271  
     272          if dq_regexp.search(data):
     273              doublequote = True
     274          else:
     275              doublequote = False
     276  
     277          return (quotechar, doublequote, delim, skipinitialspace)
     278  
     279  
     280      def _guess_delimiter(self, data, delimiters):
     281          """
     282          The delimiter /should/ occur the same number of times on
     283          each row. However, due to malformed data, it may not. We don't want
     284          an all or nothing approach, so we allow for small variations in this
     285          number.
     286            1) build a table of the frequency of each character on every line.
     287            2) build a table of frequencies of this frequency (meta-frequency?),
     288               e.g.  'x occurred 5 times in 10 rows, 6 times in 1000 rows,
     289               7 times in 2 rows'
     290            3) use the mode of the meta-frequency to determine the /expected/
     291               frequency for that character
     292            4) find out how often the character actually meets that goal
     293            5) the character that best meets its goal is the delimiter
     294          For performance reasons, the data is evaluated in chunks, so it can
     295          try and evaluate the smallest portion of the data possible, evaluating
     296          additional chunks as necessary.
     297          """
     298  
     299          data = list(filter(None, data.split('\n')))
     300  
     301          ascii = [chr(c) for c in range(127)] # 7-bit ASCII
     302  
     303          # build frequency tables
     304          chunkLength = min(10, len(data))
     305          iteration = 0
     306          charFrequency = {}
     307          modes = {}
     308          delims = {}
     309          start, end = 0, chunkLength
     310          while start < len(data):
     311              iteration += 1
     312              for line in data[start:end]:
     313                  for char in ascii:
     314                      metaFrequency = charFrequency.get(char, {})
     315                      # must count even if frequency is 0
     316                      freq = line.count(char)
     317                      # value is the mode
     318                      metaFrequency[freq] = metaFrequency.get(freq, 0) + 1
     319                      charFrequency[char] = metaFrequency
     320  
     321              for char in charFrequency.keys():
     322                  items = list(charFrequency[char].items())
     323                  if len(items) == 1 and items[0][0] == 0:
     324                      continue
     325                  # get the mode of the frequencies
     326                  if len(items) > 1:
     327                      modes[char] = max(items, key=lambda x: x[1])
     328                      # adjust the mode - subtract the sum of all
     329                      # other frequencies
     330                      items.remove(modes[char])
     331                      modes[char] = (modes[char][0], modes[char][1]
     332                                     - sum(item[1] for item in items))
     333                  else:
     334                      modes[char] = items[0]
     335  
     336              # build a list of possible delimiters
     337              modeList = modes.items()
     338              total = float(min(chunkLength * iteration, len(data)))
     339              # (rows of consistent data) / (number of rows) = 100%
     340              consistency = 1.0
     341              # minimum consistency threshold
     342              threshold = 0.9
     343              while len(delims) == 0 and consistency >= threshold:
     344                  for k, v in modeList:
     345                      if v[0] > 0 and v[1] > 0:
     346                          if ((v[1]/total) >= consistency and
     347                              (delimiters is None or k in delimiters)):
     348                              delims[k] = v
     349                  consistency -= 0.01
     350  
     351              if len(delims) == 1:
     352                  delim = list(delims.keys())[0]
     353                  skipinitialspace = (data[0].count(delim) ==
     354                                      data[0].count("%c " % delim))
     355                  return (delim, skipinitialspace)
     356  
     357              # analyze another chunkLength lines
     358              start = end
     359              end += chunkLength
     360  
     361          if not delims:
     362              return ('', 0)
     363  
     364          # if there's more than one, fall back to a 'preferred' list
     365          if len(delims) > 1:
     366              for d in self.preferred:
     367                  if d in delims.keys():
     368                      skipinitialspace = (data[0].count(d) ==
     369                                          data[0].count("%c " % d))
     370                      return (d, skipinitialspace)
     371  
     372          # nothing else indicates a preference, pick the character that
     373          # dominates(?)
     374          items = [(v,k) for (k,v) in delims.items()]
     375          items.sort()
     376          delim = items[-1][1]
     377  
     378          skipinitialspace = (data[0].count(delim) ==
     379                              data[0].count("%c " % delim))
     380          return (delim, skipinitialspace)
     381  
     382  
     383      def has_header(self, sample):
     384          # Creates a dictionary of types of data in each column. If any
     385          # column is of a single type (say, integers), *except* for the first
     386          # row, then the first row is presumed to be labels. If the type
     387          # can't be determined, it is assumed to be a string in which case
     388          # the length of the string is the determining factor: if all of the
     389          # rows except for the first are the same length, it's a header.
     390          # Finally, a 'vote' is taken at the end for each column, adding or
     391          # subtracting from the likelihood of the first row being a header.
     392  
     393          rdr = reader(StringIO(sample), self.sniff(sample))
     394  
     395          header = next(rdr) # assume first row is header
     396  
     397          columns = len(header)
     398          columnTypes = {}
     399          for i in range(columns): columnTypes[i] = None
     400  
     401          checked = 0
     402          for row in rdr:
     403              # arbitrary number of rows to check, to keep it sane
     404              if checked > 20:
     405                  break
     406              checked += 1
     407  
     408              if len(row) != columns:
     409                  continue # skip rows that have irregular number of columns
     410  
     411              for col in list(columnTypes.keys()):
     412                  thisType = complex
     413                  try:
     414                      thisType(row[col])
     415                  except (ValueError, OverflowError):
     416                      # fallback to length of string
     417                      thisType = len(row[col])
     418  
     419                  if thisType != columnTypes[col]:
     420                      if columnTypes[col] is None: # add new column type
     421                          columnTypes[col] = thisType
     422                      else:
     423                          # type is inconsistent, remove column from
     424                          # consideration
     425                          del columnTypes[col]
     426  
     427          # finally, compare results against first row and "vote"
     428          # on whether it's a header
     429          hasHeader = 0
     430          for col, colType in columnTypes.items():
     431              if type(colType) == type(0): # it's a length
     432                  if len(header[col]) != colType:
     433                      hasHeader += 1
     434                  else:
     435                      hasHeader -= 1
     436              else: # attempt typecast
     437                  try:
     438                      colType(header[col])
     439                  except (ValueError, TypeError):
     440                      hasHeader += 1
     441                  else:
     442                      hasHeader -= 1
     443  
     444          return hasHeader > 0