(root)/
glib-2.79.0/
gio/
gthreadedsocketservice.c
       1  /* GIO - GLib Input, Output and Streaming Library
       2   *
       3   * Copyright © 2009 Codethink Limited
       4   * Copyright © 2009 Red Hat, Inc
       5   *
       6   * SPDX-License-Identifier: LGPL-2.1-or-later
       7   *
       8   * This library is free software; you can redistribute it and/or
       9   * modify it under the terms of the GNU Lesser General Public
      10   * License as published by the Free Software Foundation; either
      11   * version 2.1 of the License, or (at your option) any later version.
      12   *
      13   * This library is distributed in the hope that it will be useful,
      14   * but WITHOUT ANY WARRANTY; without even the implied warranty of
      15   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      16   * Lesser General Public License for more details.
      17   *
      18   * You should have received a copy of the GNU Lesser General
      19   * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
      20   *
      21   * Authors: Ryan Lortie <desrt@desrt.ca>
      22   *          Alexander Larsson <alexl@redhat.com>
      23   */
      24  
      25  /**
      26   * GThreadedSocketService:
      27   *
      28   * A `GThreadedSocketService` is a simple subclass of [class@Gio.SocketService]
      29   * that handles incoming connections by creating a worker thread and
      30   * dispatching the connection to it by emitting the
      31   * [signal@Gio.ThreadedSocketService::run signal] in the new thread.
      32   *
      33   * The signal handler may perform blocking I/O and need not return
      34   * until the connection is closed.
      35   *
      36   * The service is implemented using a thread pool, so there is a
      37   * limited amount of threads available to serve incoming requests.
      38   * The service automatically stops the [class@Gio.SocketService] from accepting
      39   * new connections when all threads are busy.
      40   *
      41   * As with [class@Gio.SocketService], you may connect to
      42   * [signal@Gio.ThreadedSocketService::run], or subclass and override the default
      43   * handler.
      44   *
      45   * Since: 2.22
      46   */
      47  
      48  #include "config.h"
      49  #include "gsocketconnection.h"
      50  #include "gthreadedsocketservice.h"
      51  #include "glibintl.h"
      52  #include "gmarshal-internal.h"
      53  
      54  struct _GThreadedSocketServicePrivate
      55  {
      56    GThreadPool *thread_pool;
      57    int max_threads;
      58    gint job_count;
      59  };
      60  
      61  static guint g_threaded_socket_service_run_signal;
      62  
      63  G_DEFINE_TYPE_WITH_PRIVATE (GThreadedSocketService,
      64                              g_threaded_socket_service,
      65                              G_TYPE_SOCKET_SERVICE)
      66  
      67  typedef enum
      68  {
      69    PROP_MAX_THREADS = 1,
      70  } GThreadedSocketServiceProperty;
      71  
      72  G_LOCK_DEFINE_STATIC(job_count);
      73  
      74  typedef struct
      75  {
      76    GThreadedSocketService *service;  /* (owned) */
      77    GSocketConnection *connection;  /* (owned) */
      78    GObject *source_object;  /* (owned) (nullable) */
      79  } GThreadedSocketServiceData;
      80  
      81  static void
      82  g_threaded_socket_service_data_free (GThreadedSocketServiceData *data)
      83  {
      84    g_clear_object (&data->service);
      85    g_clear_object (&data->connection);
      86    g_clear_object (&data->source_object);
      87    g_slice_free (GThreadedSocketServiceData, data);
      88  }
      89  
      90  static void
      91  g_threaded_socket_service_func (gpointer job_data,
      92                                  gpointer user_data)
      93  {
      94    GThreadedSocketServiceData *data = job_data;
      95    gboolean result;
      96  
      97    g_signal_emit (data->service, g_threaded_socket_service_run_signal,
      98                   0, data->connection, data->source_object, &result);
      99  
     100    G_LOCK (job_count);
     101    if (data->service->priv->job_count-- == data->service->priv->max_threads)
     102      g_socket_service_start (G_SOCKET_SERVICE (data->service));
     103    G_UNLOCK (job_count);
     104  
     105    g_threaded_socket_service_data_free (data);
     106  }
     107  
     108  static gboolean
     109  g_threaded_socket_service_incoming (GSocketService    *service,
     110                                      GSocketConnection *connection,
     111                                      GObject           *source_object)
     112  {
     113    GThreadedSocketService *threaded;
     114    GThreadedSocketServiceData *data;
     115    GError *local_error = NULL;
     116  
     117    threaded = G_THREADED_SOCKET_SERVICE (service);
     118  
     119    data = g_slice_new0 (GThreadedSocketServiceData);
     120    data->service = g_object_ref (threaded);
     121    data->connection = g_object_ref (connection);
     122    data->source_object = (source_object != NULL) ? g_object_ref (source_object) : NULL;
     123  
     124    G_LOCK (job_count);
     125    if (++threaded->priv->job_count == threaded->priv->max_threads)
     126      g_socket_service_stop (service);
     127    G_UNLOCK (job_count);
     128  
     129    if (!g_thread_pool_push (threaded->priv->thread_pool, data, &local_error))
     130      {
     131        g_warning ("Error handling incoming socket: %s", local_error->message);
     132        g_threaded_socket_service_data_free (data);
     133      }
     134  
     135    g_clear_error (&local_error);
     136  
     137    return FALSE;
     138  }
     139  
     140  static void
     141  g_threaded_socket_service_init (GThreadedSocketService *service)
     142  {
     143    service->priv = g_threaded_socket_service_get_instance_private (service);
     144    service->priv->max_threads = 10;
     145  }
     146  
     147  static void
     148  g_threaded_socket_service_constructed (GObject *object)
     149  {
     150    GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object);
     151  
     152    service->priv->thread_pool =
     153      g_thread_pool_new  (g_threaded_socket_service_func,
     154  			NULL,
     155  			service->priv->max_threads,
     156  			FALSE,
     157  			NULL);
     158  }
     159  
     160  
     161  static void
     162  g_threaded_socket_service_finalize (GObject *object)
     163  {
     164    GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object);
     165  
     166    /* All jobs in the pool hold a reference to this #GThreadedSocketService, so
     167     * this should only be called once the pool is empty: */
     168    g_thread_pool_free (service->priv->thread_pool, FALSE, FALSE);
     169  
     170    G_OBJECT_CLASS (g_threaded_socket_service_parent_class)
     171      ->finalize (object);
     172  }
     173  
     174  static void
     175  g_threaded_socket_service_get_property (GObject    *object,
     176  					guint       prop_id,
     177  					GValue     *value,
     178  					GParamSpec *pspec)
     179  {
     180    GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object);
     181  
     182    switch ((GThreadedSocketServiceProperty) prop_id)
     183      {
     184        case PROP_MAX_THREADS:
     185  	g_value_set_int (value, service->priv->max_threads);
     186  	break;
     187  
     188        default:
     189  	G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
     190      }
     191  }
     192  
     193  static void
     194  g_threaded_socket_service_set_property (GObject      *object,
     195  					guint         prop_id,
     196  					const GValue *value,
     197  					GParamSpec   *pspec)
     198  {
     199    GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object);
     200  
     201    switch ((GThreadedSocketServiceProperty) prop_id)
     202      {
     203        case PROP_MAX_THREADS:
     204  	service->priv->max_threads = g_value_get_int (value);
     205  	break;
     206  
     207        default:
     208  	G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
     209      }
     210  }
     211  
     212  
     213  static void
     214  g_threaded_socket_service_class_init (GThreadedSocketServiceClass *class)
     215  {
     216    GObjectClass *gobject_class = G_OBJECT_CLASS (class);
     217    GSocketServiceClass *ss_class = &class->parent_class;
     218  
     219    gobject_class->constructed = g_threaded_socket_service_constructed;
     220    gobject_class->finalize = g_threaded_socket_service_finalize;
     221    gobject_class->set_property = g_threaded_socket_service_set_property;
     222    gobject_class->get_property = g_threaded_socket_service_get_property;
     223  
     224    ss_class->incoming = g_threaded_socket_service_incoming;
     225  
     226    /**
     227     * GThreadedSocketService::run:
     228     * @service: the #GThreadedSocketService.
     229     * @connection: a new #GSocketConnection object.
     230     * @source_object: (nullable): the source_object passed to g_socket_listener_add_address().
     231     *
     232     * The ::run signal is emitted in a worker thread in response to an
     233     * incoming connection. This thread is dedicated to handling
     234     * @connection and may perform blocking IO. The signal handler need
     235     * not return until the connection is closed.
     236     *
     237     * Returns: %TRUE to stop further signal handlers from being called
     238     */
     239    g_threaded_socket_service_run_signal =
     240      g_signal_new (I_("run"), G_TYPE_FROM_CLASS (class), G_SIGNAL_RUN_LAST,
     241  		  G_STRUCT_OFFSET (GThreadedSocketServiceClass, run),
     242  		  g_signal_accumulator_true_handled, NULL,
     243  		  _g_cclosure_marshal_BOOLEAN__OBJECT_OBJECT,
     244  		  G_TYPE_BOOLEAN,
     245  		  2, G_TYPE_SOCKET_CONNECTION, G_TYPE_OBJECT);
     246    g_signal_set_va_marshaller (g_threaded_socket_service_run_signal,
     247  			      G_TYPE_FROM_CLASS (class),
     248  			      _g_cclosure_marshal_BOOLEAN__OBJECT_OBJECTv);
     249  
     250    /**
     251     * GThreadedSocketService:max-threads:
     252     *
     253     * The maximum number of threads handling clients for this service.
     254     *
     255     * Since: 2.22
     256     */
     257    g_object_class_install_property (gobject_class, PROP_MAX_THREADS,
     258  				   g_param_spec_int ("max-threads", NULL, NULL,
     259  						     -1,
     260  						     G_MAXINT,
     261  						     10,
     262  						     G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     263  }
     264  
     265  /**
     266   * g_threaded_socket_service_new:
     267   * @max_threads: the maximal number of threads to execute concurrently
     268   *   handling incoming clients, -1 means no limit
     269   *
     270   * Creates a new #GThreadedSocketService with no listeners. Listeners
     271   * must be added with one of the #GSocketListener "add" methods.
     272   *
     273   * Returns: a new #GSocketService.
     274   *
     275   * Since: 2.22
     276   */
     277  GSocketService *
     278  g_threaded_socket_service_new (int max_threads)
     279  {
     280    return g_object_new (G_TYPE_THREADED_SOCKET_SERVICE,
     281  		       "max-threads", max_threads,
     282  		       NULL);
     283  }