(root)/
Python-3.12.0/
Lib/
csv.py
       1  
       2  """
       3  csv.py - read/write/investigate CSV files
       4  """
       5  
       6  import re
       7  import types
       8  from _csv import Error, __version__, writer, reader, register_dialect, \
       9                   unregister_dialect, get_dialect, list_dialects, \
      10                   field_size_limit, \
      11                   QUOTE_MINIMAL, QUOTE_ALL, QUOTE_NONNUMERIC, QUOTE_NONE, \
      12                   QUOTE_STRINGS, QUOTE_NOTNULL, \
      13                   __doc__
      14  from _csv import Dialect as _Dialect
      15  
      16  from io import StringIO
      17  
      18  __all__ = ["QUOTE_MINIMAL", "QUOTE_ALL", "QUOTE_NONNUMERIC", "QUOTE_NONE",
      19             "QUOTE_STRINGS", "QUOTE_NOTNULL",
      20             "Error", "Dialect", "__doc__", "excel", "excel_tab",
      21             "field_size_limit", "reader", "writer",
      22             "register_dialect", "get_dialect", "list_dialects", "Sniffer",
      23             "unregister_dialect", "__version__", "DictReader", "DictWriter",
      24             "unix_dialect"]
      25  
      26  class ESC[4;38;5;81mDialect:
      27      """Describe a CSV dialect.
      28  
      29      This must be subclassed (see csv.excel).  Valid attributes are:
      30      delimiter, quotechar, escapechar, doublequote, skipinitialspace,
      31      lineterminator, quoting.
      32  
      33      """
      34      _name = ""
      35      _valid = False
      36      # placeholders
      37      delimiter = None
      38      quotechar = None
      39      escapechar = None
      40      doublequote = None
      41      skipinitialspace = None
      42      lineterminator = None
      43      quoting = None
      44  
      45      def __init__(self):
      46          if self.__class__ != Dialect:
      47              self._valid = True
      48          self._validate()
      49  
      50      def _validate(self):
      51          try:
      52              _Dialect(self)
      53          except TypeError as e:
      54              # We do this for compatibility with py2.3
      55              raise Error(str(e))
      56  
      57  class ESC[4;38;5;81mexcel(ESC[4;38;5;149mDialect):
      58      """Describe the usual properties of Excel-generated CSV files."""
      59      delimiter = ','
      60      quotechar = '"'
      61      doublequote = True
      62      skipinitialspace = False
      63      lineterminator = '\r\n'
      64      quoting = QUOTE_MINIMAL
      65  register_dialect("excel", excel)
      66  
      67  class ESC[4;38;5;81mexcel_tab(ESC[4;38;5;149mexcel):
      68      """Describe the usual properties of Excel-generated TAB-delimited files."""
      69      delimiter = '\t'
      70  register_dialect("excel-tab", excel_tab)
      71  
      72  class ESC[4;38;5;81munix_dialect(ESC[4;38;5;149mDialect):
      73      """Describe the usual properties of Unix-generated CSV files."""
      74      delimiter = ','
      75      quotechar = '"'
      76      doublequote = True
      77      skipinitialspace = False
      78      lineterminator = '\n'
      79      quoting = QUOTE_ALL
      80  register_dialect("unix", unix_dialect)
      81  
      82  
      83  class ESC[4;38;5;81mDictReader:
      84      def __init__(self, f, fieldnames=None, restkey=None, restval=None,
      85                   dialect="excel", *args, **kwds):
      86          if fieldnames is not None and iter(fieldnames) is fieldnames:
      87              fieldnames = list(fieldnames)
      88          self._fieldnames = fieldnames   # list of keys for the dict
      89          self.restkey = restkey          # key to catch long rows
      90          self.restval = restval          # default value for short rows
      91          self.reader = reader(f, dialect, *args, **kwds)
      92          self.dialect = dialect
      93          self.line_num = 0
      94  
      95      def __iter__(self):
      96          return self
      97  
      98      @property
      99      def fieldnames(self):
     100          if self._fieldnames is None:
     101              try:
     102                  self._fieldnames = next(self.reader)
     103              except StopIteration:
     104                  pass
     105          self.line_num = self.reader.line_num
     106          return self._fieldnames
     107  
     108      @fieldnames.setter
     109      def fieldnames(self, value):
     110          self._fieldnames = value
     111  
     112      def __next__(self):
     113          if self.line_num == 0:
     114              # Used only for its side effect.
     115              self.fieldnames
     116          row = next(self.reader)
     117          self.line_num = self.reader.line_num
     118  
     119          # unlike the basic reader, we prefer not to return blanks,
     120          # because we will typically wind up with a dict full of None
     121          # values
     122          while row == []:
     123              row = next(self.reader)
     124          d = dict(zip(self.fieldnames, row))
     125          lf = len(self.fieldnames)
     126          lr = len(row)
     127          if lf < lr:
     128              d[self.restkey] = row[lf:]
     129          elif lf > lr:
     130              for key in self.fieldnames[lr:]:
     131                  d[key] = self.restval
     132          return d
     133  
     134      __class_getitem__ = classmethod(types.GenericAlias)
     135  
     136  
     137  class ESC[4;38;5;81mDictWriter:
     138      def __init__(self, f, fieldnames, restval="", extrasaction="raise",
     139                   dialect="excel", *args, **kwds):
     140          if fieldnames is not None and iter(fieldnames) is fieldnames:
     141              fieldnames = list(fieldnames)
     142          self.fieldnames = fieldnames    # list of keys for the dict
     143          self.restval = restval          # for writing short dicts
     144          extrasaction = extrasaction.lower()
     145          if extrasaction not in ("raise", "ignore"):
     146              raise ValueError("extrasaction (%s) must be 'raise' or 'ignore'"
     147                               % extrasaction)
     148          self.extrasaction = extrasaction
     149          self.writer = writer(f, dialect, *args, **kwds)
     150  
     151      def writeheader(self):
     152          header = dict(zip(self.fieldnames, self.fieldnames))
     153          return self.writerow(header)
     154  
     155      def _dict_to_list(self, rowdict):
     156          if self.extrasaction == "raise":
     157              wrong_fields = rowdict.keys() - self.fieldnames
     158              if wrong_fields:
     159                  raise ValueError("dict contains fields not in fieldnames: "
     160                                   + ", ".join([repr(x) for x in wrong_fields]))
     161          return (rowdict.get(key, self.restval) for key in self.fieldnames)
     162  
     163      def writerow(self, rowdict):
     164          return self.writer.writerow(self._dict_to_list(rowdict))
     165  
     166      def writerows(self, rowdicts):
     167          return self.writer.writerows(map(self._dict_to_list, rowdicts))
     168  
     169      __class_getitem__ = classmethod(types.GenericAlias)
     170  
     171  
     172  class ESC[4;38;5;81mSniffer:
     173      '''
     174      "Sniffs" the format of a CSV file (i.e. delimiter, quotechar)
     175      Returns a Dialect object.
     176      '''
     177      def __init__(self):
     178          # in case there is more than one possible delimiter
     179          self.preferred = [',', '\t', ';', ' ', ':']
     180  
     181  
     182      def sniff(self, sample, delimiters=None):
     183          """
     184          Returns a dialect (or None) corresponding to the sample
     185          """
     186  
     187          quotechar, doublequote, delimiter, skipinitialspace = \
     188                     self._guess_quote_and_delimiter(sample, delimiters)
     189          if not delimiter:
     190              delimiter, skipinitialspace = self._guess_delimiter(sample,
     191                                                                  delimiters)
     192  
     193          if not delimiter:
     194              raise Error("Could not determine delimiter")
     195  
     196          class ESC[4;38;5;81mdialect(ESC[4;38;5;149mDialect):
     197              _name = "sniffed"
     198              lineterminator = '\r\n'
     199              quoting = QUOTE_MINIMAL
     200              # escapechar = ''
     201  
     202          dialect.doublequote = doublequote
     203          dialect.delimiter = delimiter
     204          # _csv.reader won't accept a quotechar of ''
     205          dialect.quotechar = quotechar or '"'
     206          dialect.skipinitialspace = skipinitialspace
     207  
     208          return dialect
     209  
     210  
     211      def _guess_quote_and_delimiter(self, data, delimiters):
     212          """
     213          Looks for text enclosed between two identical quotes
     214          (the probable quotechar) which are preceded and followed
     215          by the same character (the probable delimiter).
     216          For example:
     217                           ,'some text',
     218          The quote with the most wins, same with the delimiter.
     219          If there is no quotechar the delimiter can't be determined
     220          this way.
     221          """
     222  
     223          matches = []
     224          for restr in (r'(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?P=delim)', # ,".*?",
     225                        r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?P<delim>[^\w\n"\'])(?P<space> ?)',   #  ".*?",
     226                        r'(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?:$|\n)',   # ,".*?"
     227                        r'(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?:$|\n)'):                            #  ".*?" (no delim, no space)
     228              regexp = re.compile(restr, re.DOTALL | re.MULTILINE)
     229              matches = regexp.findall(data)
     230              if matches:
     231                  break
     232  
     233          if not matches:
     234              # (quotechar, doublequote, delimiter, skipinitialspace)
     235              return ('', False, None, 0)
     236          quotes = {}
     237          delims = {}
     238          spaces = 0
     239          groupindex = regexp.groupindex
     240          for m in matches:
     241              n = groupindex['quote'] - 1
     242              key = m[n]
     243              if key:
     244                  quotes[key] = quotes.get(key, 0) + 1
     245              try:
     246                  n = groupindex['delim'] - 1
     247                  key = m[n]
     248              except KeyError:
     249                  continue
     250              if key and (delimiters is None or key in delimiters):
     251                  delims[key] = delims.get(key, 0) + 1
     252              try:
     253                  n = groupindex['space'] - 1
     254              except KeyError:
     255                  continue
     256              if m[n]:
     257                  spaces += 1
     258  
     259          quotechar = max(quotes, key=quotes.get)
     260  
     261          if delims:
     262              delim = max(delims, key=delims.get)
     263              skipinitialspace = delims[delim] == spaces
     264              if delim == '\n': # most likely a file with a single column
     265                  delim = ''
     266          else:
     267              # there is *no* delimiter, it's a single column of quoted data
     268              delim = ''
     269              skipinitialspace = 0
     270  
     271          # if we see an extra quote between delimiters, we've got a
     272          # double quoted format
     273          dq_regexp = re.compile(
     274                                 r"((%(delim)s)|^)\W*%(quote)s[^%(delim)s\n]*%(quote)s[^%(delim)s\n]*%(quote)s\W*((%(delim)s)|$)" % \
     275                                 {'delim':re.escape(delim), 'quote':quotechar}, re.MULTILINE)
     276  
     277  
     278  
     279          if dq_regexp.search(data):
     280              doublequote = True
     281          else:
     282              doublequote = False
     283  
     284          return (quotechar, doublequote, delim, skipinitialspace)
     285  
     286  
     287      def _guess_delimiter(self, data, delimiters):
     288          """
     289          The delimiter /should/ occur the same number of times on
     290          each row. However, due to malformed data, it may not. We don't want
     291          an all or nothing approach, so we allow for small variations in this
     292          number.
     293            1) build a table of the frequency of each character on every line.
     294            2) build a table of frequencies of this frequency (meta-frequency?),
     295               e.g.  'x occurred 5 times in 10 rows, 6 times in 1000 rows,
     296               7 times in 2 rows'
     297            3) use the mode of the meta-frequency to determine the /expected/
     298               frequency for that character
     299            4) find out how often the character actually meets that goal
     300            5) the character that best meets its goal is the delimiter
     301          For performance reasons, the data is evaluated in chunks, so it can
     302          try and evaluate the smallest portion of the data possible, evaluating
     303          additional chunks as necessary.
     304          """
     305  
     306          data = list(filter(None, data.split('\n')))
     307  
     308          ascii = [chr(c) for c in range(127)] # 7-bit ASCII
     309  
     310          # build frequency tables
     311          chunkLength = min(10, len(data))
     312          iteration = 0
     313          charFrequency = {}
     314          modes = {}
     315          delims = {}
     316          start, end = 0, chunkLength
     317          while start < len(data):
     318              iteration += 1
     319              for line in data[start:end]:
     320                  for char in ascii:
     321                      metaFrequency = charFrequency.get(char, {})
     322                      # must count even if frequency is 0
     323                      freq = line.count(char)
     324                      # value is the mode
     325                      metaFrequency[freq] = metaFrequency.get(freq, 0) + 1
     326                      charFrequency[char] = metaFrequency
     327  
     328              for char in charFrequency.keys():
     329                  items = list(charFrequency[char].items())
     330                  if len(items) == 1 and items[0][0] == 0:
     331                      continue
     332                  # get the mode of the frequencies
     333                  if len(items) > 1:
     334                      modes[char] = max(items, key=lambda x: x[1])
     335                      # adjust the mode - subtract the sum of all
     336                      # other frequencies
     337                      items.remove(modes[char])
     338                      modes[char] = (modes[char][0], modes[char][1]
     339                                     - sum(item[1] for item in items))
     340                  else:
     341                      modes[char] = items[0]
     342  
     343              # build a list of possible delimiters
     344              modeList = modes.items()
     345              total = float(min(chunkLength * iteration, len(data)))
     346              # (rows of consistent data) / (number of rows) = 100%
     347              consistency = 1.0
     348              # minimum consistency threshold
     349              threshold = 0.9
     350              while len(delims) == 0 and consistency >= threshold:
     351                  for k, v in modeList:
     352                      if v[0] > 0 and v[1] > 0:
     353                          if ((v[1]/total) >= consistency and
     354                              (delimiters is None or k in delimiters)):
     355                              delims[k] = v
     356                  consistency -= 0.01
     357  
     358              if len(delims) == 1:
     359                  delim = list(delims.keys())[0]
     360                  skipinitialspace = (data[0].count(delim) ==
     361                                      data[0].count("%c " % delim))
     362                  return (delim, skipinitialspace)
     363  
     364              # analyze another chunkLength lines
     365              start = end
     366              end += chunkLength
     367  
     368          if not delims:
     369              return ('', 0)
     370  
     371          # if there's more than one, fall back to a 'preferred' list
     372          if len(delims) > 1:
     373              for d in self.preferred:
     374                  if d in delims.keys():
     375                      skipinitialspace = (data[0].count(d) ==
     376                                          data[0].count("%c " % d))
     377                      return (d, skipinitialspace)
     378  
     379          # nothing else indicates a preference, pick the character that
     380          # dominates(?)
     381          items = [(v,k) for (k,v) in delims.items()]
     382          items.sort()
     383          delim = items[-1][1]
     384  
     385          skipinitialspace = (data[0].count(delim) ==
     386                              data[0].count("%c " % delim))
     387          return (delim, skipinitialspace)
     388  
     389  
     390      def has_header(self, sample):
     391          # Creates a dictionary of types of data in each column. If any
     392          # column is of a single type (say, integers), *except* for the first
     393          # row, then the first row is presumed to be labels. If the type
     394          # can't be determined, it is assumed to be a string in which case
     395          # the length of the string is the determining factor: if all of the
     396          # rows except for the first are the same length, it's a header.
     397          # Finally, a 'vote' is taken at the end for each column, adding or
     398          # subtracting from the likelihood of the first row being a header.
     399  
     400          rdr = reader(StringIO(sample), self.sniff(sample))
     401  
     402          header = next(rdr) # assume first row is header
     403  
     404          columns = len(header)
     405          columnTypes = {}
     406          for i in range(columns): columnTypes[i] = None
     407  
     408          checked = 0
     409          for row in rdr:
     410              # arbitrary number of rows to check, to keep it sane
     411              if checked > 20:
     412                  break
     413              checked += 1
     414  
     415              if len(row) != columns:
     416                  continue # skip rows that have irregular number of columns
     417  
     418              for col in list(columnTypes.keys()):
     419                  thisType = complex
     420                  try:
     421                      thisType(row[col])
     422                  except (ValueError, OverflowError):
     423                      # fallback to length of string
     424                      thisType = len(row[col])
     425  
     426                  if thisType != columnTypes[col]:
     427                      if columnTypes[col] is None: # add new column type
     428                          columnTypes[col] = thisType
     429                      else:
     430                          # type is inconsistent, remove column from
     431                          # consideration
     432                          del columnTypes[col]
     433  
     434          # finally, compare results against first row and "vote"
     435          # on whether it's a header
     436          hasHeader = 0
     437          for col, colType in columnTypes.items():
     438              if isinstance(colType, int): # it's a length
     439                  if len(header[col]) != colType:
     440                      hasHeader += 1
     441                  else:
     442                      hasHeader -= 1
     443              else: # attempt typecast
     444                  try:
     445                      colType(header[col])
     446                  except (ValueError, TypeError):
     447                      hasHeader += 1
     448                  else:
     449                      hasHeader -= 1
     450  
     451          return hasHeader > 0