1 | /* GIO - GLib Input, Output and Streaming Library |
2 | * |
3 | * Copyright © 2009 Codethink Limited |
4 | * Copyright © 2009 Red Hat, Inc |
5 | * |
6 | * This library is free software; you can redistribute it and/or |
7 | * modify it under the terms of the GNU Lesser General Public |
8 | * License as published by the Free Software Foundation; either |
9 | * version 2.1 of the License, or (at your option) any later version. |
10 | * |
11 | * This library is distributed in the hope that it will be useful, |
12 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
13 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
14 | * Lesser General Public License for more details. |
15 | * |
16 | * You should have received a copy of the GNU Lesser General |
17 | * Public License along with this library; if not, see <http://www.gnu.org/licenses/>. |
18 | * |
19 | * Authors: Ryan Lortie <desrt@desrt.ca> |
20 | * Alexander Larsson <alexl@redhat.com> |
21 | */ |
22 | |
23 | /** |
24 | * SECTION:gthreadedsocketservice |
25 | * @title: GThreadedSocketService |
26 | * @short_description: A threaded GSocketService |
27 | * @include: gio/gio.h |
28 | * @see_also: #GSocketService. |
29 | * |
30 | * A #GThreadedSocketService is a simple subclass of #GSocketService |
31 | * that handles incoming connections by creating a worker thread and |
32 | * dispatching the connection to it by emitting the |
33 | * #GThreadedSocketService::run signal in the new thread. |
34 | * |
35 | * The signal handler may perform blocking IO and need not return |
36 | * until the connection is closed. |
37 | * |
38 | * The service is implemented using a thread pool, so there is a |
39 | * limited amount of threads available to serve incoming requests. |
40 | * The service automatically stops the #GSocketService from accepting |
41 | * new connections when all threads are busy. |
42 | * |
43 | * As with #GSocketService, you may connect to #GThreadedSocketService::run, |
44 | * or subclass and override the default handler. |
45 | */ |
46 | |
47 | #include "config.h" |
48 | #include "gsocketconnection.h" |
49 | #include "gthreadedsocketservice.h" |
50 | #include "glibintl.h" |
51 | #include "gmarshal-internal.h" |
52 | |
53 | struct _GThreadedSocketServicePrivate |
54 | { |
55 | GThreadPool *thread_pool; |
56 | int max_threads; |
57 | gint job_count; |
58 | }; |
59 | |
60 | static guint g_threaded_socket_service_run_signal; |
61 | |
62 | G_DEFINE_TYPE_WITH_PRIVATE (GThreadedSocketService, |
63 | g_threaded_socket_service, |
64 | G_TYPE_SOCKET_SERVICE) |
65 | |
66 | typedef enum |
67 | { |
68 | PROP_MAX_THREADS = 1, |
69 | } GThreadedSocketServiceProperty; |
70 | |
71 | G_LOCK_DEFINE_STATIC(job_count); |
72 | |
73 | typedef struct |
74 | { |
75 | GThreadedSocketService *service; /* (owned) */ |
76 | GSocketConnection *connection; /* (owned) */ |
77 | GObject *source_object; /* (owned) (nullable) */ |
78 | } GThreadedSocketServiceData; |
79 | |
80 | static void |
81 | g_threaded_socket_service_data_free (GThreadedSocketServiceData *data) |
82 | { |
83 | g_clear_object (&data->service); |
84 | g_clear_object (&data->connection); |
85 | g_clear_object (&data->source_object); |
86 | g_slice_free (GThreadedSocketServiceData, data); |
87 | } |
88 | |
89 | static void |
90 | g_threaded_socket_service_func (gpointer job_data, |
91 | gpointer user_data) |
92 | { |
93 | GThreadedSocketServiceData *data = job_data; |
94 | gboolean result; |
95 | |
96 | g_signal_emit (instance: data->service, signal_id: g_threaded_socket_service_run_signal, |
97 | detail: 0, data->connection, data->source_object, &result); |
98 | |
99 | G_LOCK (job_count); |
100 | if (data->service->priv->job_count-- == data->service->priv->max_threads) |
101 | g_socket_service_start (G_SOCKET_SERVICE (data->service)); |
102 | G_UNLOCK (job_count); |
103 | |
104 | g_threaded_socket_service_data_free (data); |
105 | } |
106 | |
107 | static gboolean |
108 | g_threaded_socket_service_incoming (GSocketService *service, |
109 | GSocketConnection *connection, |
110 | GObject *source_object) |
111 | { |
112 | GThreadedSocketService *threaded; |
113 | GThreadedSocketServiceData *data; |
114 | GError *local_error = NULL; |
115 | |
116 | threaded = G_THREADED_SOCKET_SERVICE (service); |
117 | |
118 | data = g_slice_new0 (GThreadedSocketServiceData); |
119 | data->service = g_object_ref (threaded); |
120 | data->connection = g_object_ref (connection); |
121 | data->source_object = (source_object != NULL) ? g_object_ref (source_object) : NULL; |
122 | |
123 | G_LOCK (job_count); |
124 | if (++threaded->priv->job_count == threaded->priv->max_threads) |
125 | g_socket_service_stop (service); |
126 | G_UNLOCK (job_count); |
127 | |
128 | if (!g_thread_pool_push (pool: threaded->priv->thread_pool, data, error: &local_error)) |
129 | { |
130 | g_warning ("Error handling incoming socket: %s" , local_error->message); |
131 | g_threaded_socket_service_data_free (data); |
132 | } |
133 | |
134 | g_clear_error (err: &local_error); |
135 | |
136 | return FALSE; |
137 | } |
138 | |
139 | static void |
140 | g_threaded_socket_service_init (GThreadedSocketService *service) |
141 | { |
142 | service->priv = g_threaded_socket_service_get_instance_private (self: service); |
143 | service->priv->max_threads = 10; |
144 | } |
145 | |
146 | static void |
147 | g_threaded_socket_service_constructed (GObject *object) |
148 | { |
149 | GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object); |
150 | |
151 | service->priv->thread_pool = |
152 | g_thread_pool_new (func: g_threaded_socket_service_func, |
153 | NULL, |
154 | max_threads: service->priv->max_threads, |
155 | FALSE, |
156 | NULL); |
157 | } |
158 | |
159 | |
160 | static void |
161 | g_threaded_socket_service_finalize (GObject *object) |
162 | { |
163 | GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object); |
164 | |
165 | /* All jobs in the pool hold a reference to this #GThreadedSocketService, so |
166 | * this should only be called once the pool is empty: */ |
167 | g_thread_pool_free (pool: service->priv->thread_pool, FALSE, FALSE); |
168 | |
169 | G_OBJECT_CLASS (g_threaded_socket_service_parent_class) |
170 | ->finalize (object); |
171 | } |
172 | |
173 | static void |
174 | g_threaded_socket_service_get_property (GObject *object, |
175 | guint prop_id, |
176 | GValue *value, |
177 | GParamSpec *pspec) |
178 | { |
179 | GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object); |
180 | |
181 | switch ((GThreadedSocketServiceProperty) prop_id) |
182 | { |
183 | case PROP_MAX_THREADS: |
184 | g_value_set_int (value, v_int: service->priv->max_threads); |
185 | break; |
186 | |
187 | default: |
188 | G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
189 | } |
190 | } |
191 | |
192 | static void |
193 | g_threaded_socket_service_set_property (GObject *object, |
194 | guint prop_id, |
195 | const GValue *value, |
196 | GParamSpec *pspec) |
197 | { |
198 | GThreadedSocketService *service = G_THREADED_SOCKET_SERVICE (object); |
199 | |
200 | switch ((GThreadedSocketServiceProperty) prop_id) |
201 | { |
202 | case PROP_MAX_THREADS: |
203 | service->priv->max_threads = g_value_get_int (value); |
204 | break; |
205 | |
206 | default: |
207 | G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
208 | } |
209 | } |
210 | |
211 | |
212 | static void |
213 | g_threaded_socket_service_class_init (GThreadedSocketServiceClass *class) |
214 | { |
215 | GObjectClass *gobject_class = G_OBJECT_CLASS (class); |
216 | GSocketServiceClass *ss_class = &class->parent_class; |
217 | |
218 | gobject_class->constructed = g_threaded_socket_service_constructed; |
219 | gobject_class->finalize = g_threaded_socket_service_finalize; |
220 | gobject_class->set_property = g_threaded_socket_service_set_property; |
221 | gobject_class->get_property = g_threaded_socket_service_get_property; |
222 | |
223 | ss_class->incoming = g_threaded_socket_service_incoming; |
224 | |
225 | /** |
226 | * GThreadedSocketService::run: |
227 | * @service: the #GThreadedSocketService. |
228 | * @connection: a new #GSocketConnection object. |
229 | * @source_object: (nullable): the source_object passed to g_socket_listener_add_address(). |
230 | * |
231 | * The ::run signal is emitted in a worker thread in response to an |
232 | * incoming connection. This thread is dedicated to handling |
233 | * @connection and may perform blocking IO. The signal handler need |
234 | * not return until the connection is closed. |
235 | * |
236 | * Returns: %TRUE to stop further signal handlers from being called |
237 | */ |
238 | g_threaded_socket_service_run_signal = |
239 | g_signal_new (I_("run" ), G_TYPE_FROM_CLASS (class), signal_flags: G_SIGNAL_RUN_LAST, |
240 | G_STRUCT_OFFSET (GThreadedSocketServiceClass, run), |
241 | accumulator: g_signal_accumulator_true_handled, NULL, |
242 | c_marshaller: _g_cclosure_marshal_BOOLEAN__OBJECT_OBJECT, |
243 | G_TYPE_BOOLEAN, |
244 | n_params: 2, G_TYPE_SOCKET_CONNECTION, G_TYPE_OBJECT); |
245 | g_signal_set_va_marshaller (signal_id: g_threaded_socket_service_run_signal, |
246 | G_TYPE_FROM_CLASS (class), |
247 | va_marshaller: _g_cclosure_marshal_BOOLEAN__OBJECT_OBJECTv); |
248 | |
249 | g_object_class_install_property (oclass: gobject_class, property_id: PROP_MAX_THREADS, |
250 | pspec: g_param_spec_int (name: "max-threads" , |
251 | P_("Max threads" ), |
252 | P_("The max number of threads handling clients for this service" ), |
253 | minimum: -1, |
254 | G_MAXINT, |
255 | default_value: 10, |
256 | flags: G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
257 | } |
258 | |
259 | /** |
260 | * g_threaded_socket_service_new: |
261 | * @max_threads: the maximal number of threads to execute concurrently |
262 | * handling incoming clients, -1 means no limit |
263 | * |
264 | * Creates a new #GThreadedSocketService with no listeners. Listeners |
265 | * must be added with one of the #GSocketListener "add" methods. |
266 | * |
267 | * Returns: a new #GSocketService. |
268 | * |
269 | * Since: 2.22 |
270 | */ |
271 | GSocketService * |
272 | g_threaded_socket_service_new (int max_threads) |
273 | { |
274 | return g_object_new (G_TYPE_THREADED_SOCKET_SERVICE, |
275 | first_property_name: "max-threads" , max_threads, |
276 | NULL); |
277 | } |
278 | |