(root)/
gcc-13.2.0/
libgfortran/
io/
async.c
       1  /* Copyright (C) 2018-2023 Free Software Foundation, Inc.
       2     Contributed by Nicolas Koenig
       3  
       4     This file is part of the GNU Fortran runtime library (libgfortran).
       5  
       6     Libgfortran is free software; you can redistribute it and/or modify
       7     it under the terms of the GNU General Public License as published by
       8     the Free Software Foundation; either version 3, or (at your option)
       9     any later version.
      10  
      11     Libgfortran is distributed in the hope that it will be useful,
      12     but WITHOUT ANY WARRANTY; without even the implied warranty of
      13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14     GNU General Public License for more details.
      15  
      16     Under Section 7 of GPL version 3, you are granted additional
      17     permissions described in the GCC Runtime Library Exception, version
      18     3.1, as published by the Free Software Foundation.
      19  
      20     You should have received a copy of the GNU General Public License and
      21     a copy of the GCC Runtime Library Exception along with this program;
      22     see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
      23     <http://www.gnu.org/licenses/>.  */
      24  
      25  #include "libgfortran.h"
      26  
      27  #define _GTHREAD_USE_COND_INIT_FUNC
      28  #include "../../libgcc/gthr.h"
      29  #include "io.h"
      30  #include "fbuf.h"
      31  #include "format.h"
      32  #include "unix.h"
      33  #include <string.h>
      34  #include <assert.h>
      35  
      36  #include <sys/types.h>
      37  
      38  #include "async.h"
      39  #if ASYNC_IO
      40  
/* Debug-only state.  NOTE(review): DEBUG_LINE, MPREFIX and aio_lock_debug
   are declared in a header that is not visible here; presumably DEBUG_LINE
   keeps its argument only in debug builds -- confirm in async.h.  */

/* Per-thread prefix for debug output; async_io switches it to TPREFIX in
   the worker thread.  */
DEBUG_LINE (__thread const char *aio_prefix = MPREFIX);

/* NOTE(review): judging by the names, a lock and list head for debug
   tracking of locks -- confirm against their users in async.h.  */
DEBUG_LINE (__gthread_mutex_t debug_queue_lock = __GTHREAD_MUTEX_INIT;)
DEBUG_LINE (aio_lock_debug *aio_debug_head = NULL;)

/* Current unit for asynchronous I/O.  Needed for error reporting.
   Thread-local; async_io sets it to the unit it serves.  */

__thread gfc_unit *thread_unit = NULL;
      49  
/* One entry of an asynchronous unit's singly linked request queue.
   Entries are allocated by the enqueue_* functions and released by
   async_io after it has processed them.  */
typedef struct transfer_queue
{
  /* Kind of request; async_io dispatches on it.  */
  enum aio_do type;
  /* Following entry, or NULL for the current tail.  */
  struct transfer_queue *next;
  /* Set by enqueue_data_transfer_init: heap copy of the statement's PDT,
     which async_io installs as au->pdt through update_pdt.  */
  struct st_parameter_dt *new_pdt;
  /* Arguments of a scalar or array transfer request.  */
  transfer_args arg;
  /* Nonzero only for entries queued by enqueue_done_id; async_io advances
     au->id.low only for such entries.  */
  _Bool has_id;
  /* Passed on to data_transfer_init_worker for AIO_DATA_TRANSFER_INIT.  */
  int read_flag;
} transfer_queue;
      60  
/* NOTE(review): nothing in the visible part of this file refers to this
   type.  The au->error member used below has other fields (has_error, cmp,
   family, message, ...), so it is presumably declared elsewhere -- confirm
   whether this struct is still needed.  */
struct error {
  st_parameter_dt *dtp;
  int id;
};
      65  
      66  /* Helper function to exchange the old vs. a new PDT.  */
      67  
      68  static void
      69  update_pdt (st_parameter_dt **old, st_parameter_dt *new) {
      70    st_parameter_dt *temp;
      71    NOTE ("Changing pdts, current_unit = %p", (void *) (new->u.p.current_unit));
      72    temp = *old;
      73    *old = new;
      74    if (temp)
      75      free (temp);
      76  }
      77  
/* Destroy an adv_cond structure: tears down the condition variable
   AC->signal with __gthread_cond_destroy.  AC itself is not freed here.
   NOTE(review): T_ERROR wraps the call; presumably it reports a nonzero
   return value -- confirm in async.h.  */

static void
destroy_adv_cond (struct adv_cond *ac)
{
  T_ERROR (__gthread_cond_destroy, &ac->signal);
}
      85  
      86  /* Function invoked as start routine for a new asynchronous I/O unit.
      87     Contains the main loop for accepting requests and handling them.  */
      88  
      89  static void *
      90  async_io (void *arg)
      91  {
      92    DEBUG_LINE (aio_prefix = TPREFIX);
      93    transfer_queue *ctq = NULL, *prev = NULL;
      94    gfc_unit *u = (gfc_unit *) arg;
      95    async_unit *au = u->au;
      96    LOCK (&au->lock);
      97    thread_unit = u;
      98    au->thread = __gthread_self ();
      99    while (true)
     100      {
     101        /* Main loop.  At this point, au->lock is always held. */
     102        WAIT_SIGNAL_MUTEX (&au->work, au->tail != NULL, &au->lock);
     103        LOCK (&au->lock);
     104        ctq = au->head;
     105        prev = NULL;
     106        /* Loop over the queue entries until they are finished.  */
     107        while (ctq)
     108  	{
     109  	  if (prev)
     110  	    free (prev);
     111  	  prev = ctq;
     112  	  if (!au->error.has_error)
     113  	    {
     114  	      UNLOCK (&au->lock);
     115  
     116  	      switch (ctq->type)
     117  		{
     118  		case AIO_WRITE_DONE:
     119  		  NOTE ("Finalizing write");
     120  		  st_write_done_worker (au->pdt, false);
     121  		  UNLOCK (&au->io_lock);
     122  		  break;
     123  
     124  		case AIO_READ_DONE:
     125  		  NOTE ("Finalizing read");
     126  		  st_read_done_worker (au->pdt, false);
     127  		  UNLOCK (&au->io_lock);
     128  		  break;
     129  
     130  		case AIO_DATA_TRANSFER_INIT:
     131  		  NOTE ("Data transfer init");
     132  		  LOCK (&au->io_lock);
     133  		  update_pdt (&au->pdt, ctq->new_pdt);
     134  		  data_transfer_init_worker (au->pdt, ctq->read_flag);
     135  		  break;
     136  
     137  		case AIO_TRANSFER_SCALAR:
     138  		  NOTE ("Starting scalar transfer");
     139  		  ctq->arg.scalar.transfer (au->pdt, ctq->arg.scalar.arg_bt,
     140  					    ctq->arg.scalar.data,
     141  					    ctq->arg.scalar.i,
     142  					    ctq->arg.scalar.s1,
     143  					    ctq->arg.scalar.s2);
     144  		  break;
     145  
     146  		case AIO_TRANSFER_ARRAY:
     147  		  NOTE ("Starting array transfer");
     148  		  NOTE ("ctq->arg.array.desc = %p",
     149  			(void *) (ctq->arg.array.desc));
     150  		  transfer_array_inner (au->pdt, ctq->arg.array.desc,
     151  					ctq->arg.array.kind,
     152  					ctq->arg.array.charlen);
     153  		  free (ctq->arg.array.desc);
     154  		  break;
     155  
     156  		case AIO_CLOSE:
     157  		  NOTE ("Received AIO_CLOSE");
     158  		  LOCK (&au->lock);
     159  		  goto finish_thread;
     160  
     161  		default:
     162  		  internal_error (NULL, "Invalid queue type");
     163  		  break;
     164  		}
     165  	      LOCK (&au->lock);
     166  	      if (unlikely (au->error.has_error))
     167  		au->error.last_good_id = au->id.low - 1;
     168  	    }
     169  	  else
     170  	    {
     171  	      if (ctq->type == AIO_WRITE_DONE || ctq->type == AIO_READ_DONE)
     172  		{
     173  		  UNLOCK (&au->io_lock);
     174  		}
     175  	      else if (ctq->type == AIO_CLOSE)
     176  		{
     177  		  NOTE ("Received AIO_CLOSE during error condition");
     178  		  goto finish_thread;
     179  		}
     180  	    }
     181  
     182    	  NOTE ("Next ctq, current id: %d", au->id.low);
     183    	  if (ctq->has_id && au->id.waiting == au->id.low++)
     184  	    SIGNAL (&au->id.done);
     185  
     186  	  ctq = ctq->next;
     187  	}
     188        au->tail = NULL;
     189        au->head = NULL;
     190        au->empty = 1;
     191        SIGNAL (&au->emptysignal);
     192      }
     193   finish_thread:
     194    au->tail = NULL;
     195    au->head = NULL;
     196    au->empty = 1;
     197    SIGNAL (&au->emptysignal);
     198    free (ctq);
     199    UNLOCK (&au->lock);
     200    return NULL;
     201  }
     202  
     203  /* Free an asynchronous unit.  */
     204  
     205  static void
     206  free_async_unit (async_unit *au)
     207  {
     208    if (au->tail)
     209      internal_error (NULL, "Trying to free nonempty asynchronous unit");
     210  
     211    destroy_adv_cond (&au->work);
     212    destroy_adv_cond (&au->emptysignal);
     213    destroy_adv_cond (&au->id.done);
     214    T_ERROR (__gthread_mutex_destroy, &au->lock);
     215    free (au);
     216  }
     217  
     218  /* Initialize an adv_cond structure.  */
     219  
     220  static void
     221  init_adv_cond (struct adv_cond *ac)
     222  {
     223    ac->pending = 0;
     224    __GTHREAD_COND_INIT_FUNCTION (&ac->signal);
     225  }
     226  
     227  /* Initialize an asyncronous unit, returning zero on success,
     228   nonzero on failure.  It also sets u->au.  */
     229  
     230  void
     231  init_async_unit (gfc_unit *u)
     232  {
     233    async_unit *au;
     234    if (!__gthread_active_p ())
     235      {
     236        u->au = NULL;
     237        return;
     238      }
     239    
     240    au = (async_unit *) xmalloc (sizeof (async_unit));
     241    u->au = au;
     242    init_adv_cond (&au->work);
     243    init_adv_cond (&au->emptysignal);
     244    __GTHREAD_MUTEX_INIT_FUNCTION (&au->lock);
     245    __GTHREAD_MUTEX_INIT_FUNCTION (&au->io_lock);
     246    LOCK (&au->lock);
     247    T_ERROR (__gthread_create, &au->thread, &async_io, (void *) u);
     248    au->pdt = NULL;
     249    au->head = NULL;
     250    au->tail = NULL;
     251    au->empty = true;
     252    au->id.waiting = -1;
     253    au->id.low = 0;
     254    au->id.high = 0;
     255    au->error.fatal_error = 0;
     256    au->error.has_error = 0;
     257    au->error.last_good_id = 0;
     258    init_adv_cond (&au->id.done);
     259    UNLOCK (&au->lock);
     260  }
     261  
     262  /* Enqueue a transfer statement.  */
     263  
     264  void
     265  enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
     266  {
     267    transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
     268    tq->arg = *arg;
     269    tq->type = type;
     270    tq->has_id = 0;
     271    LOCK (&au->lock);
     272    if (!au->tail)
     273      au->head = tq;
     274    else
     275      au->tail->next = tq;
     276    au->tail = tq;
     277    REVOKE_SIGNAL (&(au->emptysignal));
     278    au->empty = false;
     279    SIGNAL (&au->work);
     280    UNLOCK (&au->lock);
     281  }
     282  
     283  /* Enqueue an st_write_done or st_read_done which contains an ID.  */
     284  
     285  int
     286  enqueue_done_id (async_unit *au, enum aio_do type)
     287  {
     288    int ret;
     289    transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
     290  
     291    tq->type = type;
     292    tq->has_id = 1;
     293    LOCK (&au->lock);
     294    if (!au->tail)
     295      au->head = tq;
     296    else
     297      au->tail->next = tq;
     298    au->tail = tq;
     299    REVOKE_SIGNAL (&(au->emptysignal));
     300    au->empty = false;
     301    ret = au->id.high++;
     302    NOTE ("Enqueue id: %d", ret);
     303    SIGNAL (&au->work);
     304    UNLOCK (&au->lock);
     305    return ret;
     306  }
     307  
     308  /* Enqueue an st_write_done or st_read_done without an ID.  */
     309  
     310  void
     311  enqueue_done (async_unit *au, enum aio_do type)
     312  {
     313    transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
     314    tq->type = type;
     315    tq->has_id = 0;
     316    LOCK (&au->lock);
     317    if (!au->tail)
     318      au->head = tq;
     319    else
     320      au->tail->next = tq;
     321    au->tail = tq;
     322    REVOKE_SIGNAL (&(au->emptysignal));
     323    au->empty = false;
     324    SIGNAL (&au->work);
     325    UNLOCK (&au->lock);
     326  }
     327  
     328  /* Enqueue a CLOSE statement.  */
     329  
     330  void
     331  enqueue_close (async_unit *au)
     332  {
     333    transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
     334  
     335    tq->type = AIO_CLOSE;
     336    LOCK (&au->lock);
     337    if (!au->tail)
     338      au->head = tq;
     339    else
     340      au->tail->next = tq;
     341    au->tail = tq;
     342    REVOKE_SIGNAL (&(au->emptysignal));
     343    au->empty = false;
     344    SIGNAL (&au->work);
     345    UNLOCK (&au->lock);
     346  }
     347  
     348  /* The asynchronous unit keeps the currently active PDT around.
     349     This function changes that to the current one.  */
     350  
     351  void
     352  enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
     353  {
     354    st_parameter_dt *new = xmalloc (sizeof (st_parameter_dt));
     355    transfer_queue *tq = xmalloc (sizeof (transfer_queue));
     356  
     357    memcpy ((void *) new, (void *) dt, sizeof (st_parameter_dt));
     358  
     359    NOTE ("dt->internal_unit_desc = %p", dt->internal_unit_desc);
     360    NOTE ("common.flags & mask = %d", dt->common.flags & IOPARM_LIBRETURN_MASK);
     361    tq->next = NULL;
     362    tq->type = AIO_DATA_TRANSFER_INIT;
     363    tq->read_flag = read_flag;
     364    tq->has_id = 0;
     365    tq->new_pdt = new;
     366    LOCK (&au->lock);
     367  
     368    if (!au->tail)
     369      au->head = tq;
     370    else
     371      au->tail->next = tq;
     372    au->tail = tq;
     373    REVOKE_SIGNAL (&(au->emptysignal));
     374    au->empty = false;
     375    SIGNAL (&au->work);
     376    UNLOCK (&au->lock);
     377  }
     378  
     379  /* Collect the errors that may have happened asynchronously.  Return true if
     380     an error has been encountered.  */
     381  
     382  bool
     383  collect_async_errors (st_parameter_common *cmp, async_unit *au)
     384  {
     385    bool has_error = au->error.has_error;
     386  
     387    if (has_error)
     388      {
     389        if (generate_error_common (cmp, au->error.family, au->error.message))
     390  	{
     391  	  au->error.has_error = 0;
     392  	  au->error.cmp = NULL;
     393  	}
     394        else
     395  	{
     396  	  /* The program will exit later.  */
     397  	  au->error.fatal_error = true;
     398  	}
     399      }
     400    return has_error;
     401  }
     402  
     403  /* Perform a wait operation on an asynchronous unit with an ID specified,
     404     which means collecting the errors that may have happened asynchronously.
     405     Return true if an error has been encountered.  */
     406  
     407  bool
     408  async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
     409  {
     410    bool ret;
     411  
     412    if (au == NULL)
     413      return false;
     414  
     415    if (cmp == NULL)
     416      cmp = au->error.cmp;
     417  
     418    if (au->error.has_error)
     419      {
     420        if (i <= au->error.last_good_id)
     421  	return false;
     422  
     423        return collect_async_errors (cmp, au);
     424      }
     425  
     426    LOCK (&au->lock);
     427    if (i > au->id.high)
     428      {
     429        generate_error_common (cmp, LIBERROR_BAD_WAIT_ID, NULL);
     430        UNLOCK (&au->lock);
     431        return true;
     432      }
     433  
     434    NOTE ("Waiting for id %d", i);
     435    if (au->id.waiting < i)
     436      au->id.waiting = i;
     437    SIGNAL (&(au->work));
     438    WAIT_SIGNAL_MUTEX (&(au->id.done),
     439  		     (au->id.low >= au->id.waiting || au->empty), &au->lock);
     440    LOCK (&au->lock);
     441    ret = collect_async_errors (cmp, au);
     442    UNLOCK (&au->lock);
     443    return ret;
     444  }
     445  
     446  /* Perform a wait operation an an asynchronous unit without an ID.  */
     447  
     448  bool
     449  async_wait (st_parameter_common *cmp, async_unit *au)
     450  {
     451    bool ret;
     452  
     453    if (au == NULL)
     454      return false;
     455  
     456    if (cmp == NULL)
     457      cmp = au->error.cmp;
     458  
     459    LOCK (&(au->lock));
     460    SIGNAL (&(au->work));
     461  
     462    if (au->empty)
     463      {
     464        ret = collect_async_errors (cmp, au);
     465        UNLOCK (&au->lock);
     466        return ret;
     467      }
     468  
     469    WAIT_SIGNAL_MUTEX (&(au->emptysignal), (au->empty), &au->lock);
     470    ret = collect_async_errors (cmp, au);
     471    return ret;
     472  }
     473  
     474  /* Close an asynchronous unit.  */
     475  
     476  void
     477  async_close (async_unit *au)
     478  {
     479    if (au == NULL)
     480      return;
     481  
     482    NOTE ("Closing async unit");
     483    enqueue_close (au);
     484    T_ERROR (__gthread_join, au->thread, NULL);
     485    free_async_unit (au);
     486  }
     487  
     488  #else
     489  
     490  /* Only set u->au to NULL so no async I/O will happen.  */
     491  
     492  void
     493  init_async_unit (gfc_unit *u)
     494  {
     495    u->au = NULL;
     496    return;
     497  }
     498  
     499  /* Do-nothing function, which will not be called.  */
     500  
     501  void
     502  enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
     503  {
     504    return;
     505  }
     506  
     507  /* Do-nothing function, which will not be called.  */
     508  
     509  int
     510  enqueue_done_id (async_unit *au, enum aio_do type)
     511  {
     512    return 0;
     513  }
     514  
     515  /* Do-nothing function, which will not be called.  */
     516  
     517  void
     518  enqueue_done (async_unit *au, enum aio_do type)
     519  {
     520    return;
     521  }
     522  
     523  /* Do-nothing function, which will not be called.  */
     524  
     525  void
     526  enqueue_close (async_unit *au)
     527  {
     528    return;
     529  }
     530  
     531  /* Do-nothing function, which will not be called.  */
     532  
     533  void
     534  enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
     535  {
     536    return;
     537  }
     538  
     539  /* Do-nothing function, which will not be called.  */
     540  
     541  bool
     542  collect_async_errors (st_parameter_common *cmp, async_unit *au)
     543  {
     544    return false;
     545  }
     546  
     547  /* Do-nothing function, which will not be called.  */
     548  
     549  bool
     550  async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
     551  {
     552    return false;
     553  }
     554  
     555  /* Do-nothing function, which will not be called.  */
     556  
     557  bool
     558  async_wait (st_parameter_common *cmp, async_unit *au)
     559  {
     560    return false;
     561  }
     562  
     563  /* Do-nothing function, which will not be called.  */
     564  
     565  void
     566  async_close (async_unit *au)
     567  {
     568    return;
     569  }
     570  
     571  #endif