Whamcloud - gitweb
LU-502 do not allow service threads to be killed by the OOM killer.
[fs/lustre-release.git] / lnet / ulnds / socklnd / poll.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/ulnds/socklnd/poll.c
37  *
38  * Author: Maxim Patlasov <maxim@clusterfs.com>
39  */
40
41 #include "usocklnd.h"
42 #include <unistd.h>
43 #include <sys/syscall.h>
44
45 void
46 usocklnd_process_stale_list(usock_pollthread_t *pt_data)
47 {
48         while (!cfs_list_empty(&pt_data->upt_stale_list)) {
49                 usock_conn_t *conn;
50                 conn = cfs_list_entry(pt_data->upt_stale_list.next,
51                                       usock_conn_t, uc_stale_list);
52
53                 cfs_list_del(&conn->uc_stale_list);
54
55                 usocklnd_tear_peer_conn(conn);
56                 usocklnd_conn_decref(conn); /* -1 for idx2conn[idx] or pr */
57         }
58 }
59
/* Main loop of a poll thread.
 *
 * Each iteration: (1) processes poll requests enqueued on
 * upt_pollrequests, (2) reaps connections orphaned by
 * POLL_DEL_REQUESTs, (3) poll(2)s the fds it manages and runs the I/O
 * handlers, and (4) scans one 'chunk' of connections for timeouts, so
 * the whole pollfd[] is covered incrementally over successive passes.
 *
 * On shutdown or fatal error (rc != 0) it drains any remaining poll
 * requests, tears down all remaining connections, and completes
 * upt_completion to unblock usocklnd_shutdown().
 * Always returns 0; a fatal rc is published via pt_data->upt_errno. */
int
usocklnd_poll_thread(void *arg)
{
        int                 rc = 0;
        usock_pollthread_t *pt_data = (usock_pollthread_t *)arg;
        cfs_time_t          current_time;
        cfs_time_t          planned_time;
        int                 idx;
        int                 idx_start;
        int                 idx_finish;
        int                 chunk;
        int                 saved_nfds;
        int                 extra;
        int                 times;

        /* mask signals to avoid SIGPIPE, etc */
        sigset_t  sigs;
        sigfillset (&sigs);
        pthread_sigmask (SIG_SETMASK, &sigs, 0);

        LASSERT(pt_data != NULL);

        /* next deadline for the incremental timeout scan */
        planned_time = cfs_time_shift(usock_tuns.ut_poll_timeout);
        chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
        saved_nfds = pt_data->upt_nfds;
        /* idx 0 is the notifier fd (see usocklnd_execute_handlers),
         * so connection scanning always starts at index 1 */
        idx_start = 1;

        /* Main loop */
        while (usock_data.ud_shutdown == 0) {
                rc = 0;

                /* Process all enqueued poll requests */
                pthread_mutex_lock(&pt_data->upt_pollrequests_lock);
                while (!cfs_list_empty(&pt_data->upt_pollrequests)) {
                        usock_pollrequest_t *pr;
                        pr = cfs_list_entry(pt_data->upt_pollrequests.next,
                                            usock_pollrequest_t, upr_list);

                        cfs_list_del(&pr->upr_list);
                        rc = usocklnd_process_pollrequest(pr, pt_data);
                        if (rc)
                                break;
                }
                pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);

                if (rc)
                        break;

                /* Delete conns orphaned due to POLL_DEL_REQUESTs */
                usocklnd_process_stale_list(pt_data);

                /* Actual polling for events */
                rc = poll(pt_data->upt_pollfd,
                          pt_data->upt_nfds,
                          usock_tuns.ut_poll_timeout * 1000);

                if (rc < 0 && errno != EINTR) {
                        CERROR("Cannot poll(2): errno=%d\n", errno);
                        break;
                }

                if (rc > 0)
                        usocklnd_execute_handlers(pt_data);

                current_time = cfs_time_current();

                /* skip the timeout scan if there are no conns (only the
                 * notifier fd) or the scan deadline hasn't arrived yet */
                if (pt_data->upt_nfds < 2 ||
                    cfs_time_before(current_time, planned_time))
                        continue;

                /* catch up growing pollfd[] */
                if (pt_data->upt_nfds > saved_nfds) {
                        extra = pt_data->upt_nfds - saved_nfds;
                        saved_nfds = pt_data->upt_nfds;
                } else {
                        extra = 0;
                }

                /* scan proportionally more entries if we are running
                 * behind the planned deadline (times >= 1) */
                times = cfs_duration_sec(cfs_time_sub(current_time, planned_time)) + 1;
                idx_finish = MIN(idx_start + chunk*times + extra, pt_data->upt_nfds);

                /* kill connections in this chunk that have timed out */
                for (idx = idx_start; idx < idx_finish; idx++) {
                        usock_conn_t *conn = pt_data->upt_idx2conn[idx];
                        pthread_mutex_lock(&conn->uc_lock);
                        if (usocklnd_conn_timed_out(conn, current_time) &&
                            conn->uc_state != UC_DEAD) {
                                conn->uc_errored = 1;
                                usocklnd_conn_kill_locked(conn);
                        }
                        pthread_mutex_unlock(&conn->uc_lock);
                }

                /* full sweep completed: recompute chunk size and
                 * restart from the beginning next time */
                if (idx_finish == pt_data->upt_nfds) {
                        chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
                        saved_nfds = pt_data->upt_nfds;
                        idx_start = 1;
                }
                else {
                        idx_start = idx_finish;
                }

                planned_time = cfs_time_add(current_time,
                                            cfs_time_seconds(usock_tuns.ut_poll_timeout));
        }

        /* All conns should be deleted by POLL_DEL_REQUESTs while shutdown */
        LASSERT (rc != 0 || pt_data->upt_nfds == 1);

        if (rc) {
                pthread_mutex_lock(&pt_data->upt_pollrequests_lock);

                /* Block new poll requests to be enqueued */
                pt_data->upt_errno = rc;

                /* drain and discard any poll requests enqueued before
                 * upt_errno was set */
                while (!cfs_list_empty(&pt_data->upt_pollrequests)) {
                        usock_pollrequest_t *pr;
                        pr = cfs_list_entry(pt_data->upt_pollrequests.next,
                                        usock_pollrequest_t, upr_list);

                        cfs_list_del(&pr->upr_list);

                        if (pr->upr_type == POLL_ADD_REQUEST) {
                                /* conn was never added to pollfd[]:
                                 * release its socket and queue it for
                                 * teardown via the stale list */
                                libcfs_sock_release(pr->upr_conn->uc_sock);
                                cfs_list_add_tail(&pr->upr_conn->uc_stale_list,
                                                  &pt_data->upt_stale_list);
                        } else {
                                usocklnd_conn_decref(pr->upr_conn);
                        }

                        LIBCFS_FREE (pr, sizeof(*pr));
                }
                pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);

                usocklnd_process_stale_list(pt_data);

                /* tear down every conn still registered in pollfd[]
                 * (index 0 is the notifier fd, not a conn) */
                for (idx = 1; idx < pt_data->upt_nfds; idx++) {
                        usock_conn_t *conn = pt_data->upt_idx2conn[idx];
                        LASSERT(conn != NULL);
                        libcfs_sock_release(conn->uc_sock);
                        usocklnd_tear_peer_conn(conn);
                        usocklnd_conn_decref(conn);
                }
        }

        /* unblock usocklnd_shutdown() */
        cfs_mt_complete(&pt_data->upt_completion);

        return 0;
}
209
210 /* Returns 0 on success, <0 else */
211 int
212 usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value)
213 {
214         int                  pt_idx = conn->uc_pt_idx;
215         usock_pollthread_t  *pt     = &usock_data.ud_pollthreads[pt_idx];
216         usock_pollrequest_t *pr;
217
218         LIBCFS_ALLOC(pr, sizeof(*pr));
219         if (pr == NULL) {
220                 CERROR ("Cannot allocate poll request\n");
221                 return -ENOMEM;
222         }
223
224         pr->upr_conn = conn;
225         pr->upr_type = type;
226         pr->upr_value = value;
227
228         usocklnd_conn_addref(conn); /* +1 for poll request */
229
230         pthread_mutex_lock(&pt->upt_pollrequests_lock);
231
232         if (pt->upt_errno) { /* very rare case: errored poll thread */
233                 int rc = pt->upt_errno;
234                 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
235                 usocklnd_conn_decref(conn);
236                 LIBCFS_FREE(pr, sizeof(*pr));
237                 return rc;
238         }
239
240         cfs_list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
241         pthread_mutex_unlock(&pt->upt_pollrequests_lock);
242         return 0;
243 }
244
245 void
246 usocklnd_add_killrequest(usock_conn_t *conn)
247 {
248         int                  pt_idx = conn->uc_pt_idx;
249         usock_pollthread_t  *pt     = &usock_data.ud_pollthreads[pt_idx];
250         usock_pollrequest_t *pr     = conn->uc_preq;
251
252         /* Use preallocated poll request because there is no good
253          * workaround for ENOMEM error while killing connection */
254         if (pr) {
255                 pr->upr_conn  = conn;
256                 pr->upr_type  = POLL_DEL_REQUEST;
257                 pr->upr_value = 0;
258
259                 usocklnd_conn_addref(conn); /* +1 for poll request */
260
261                 pthread_mutex_lock(&pt->upt_pollrequests_lock);
262
263                 if (pt->upt_errno) { /* very rare case: errored poll thread */
264                         pthread_mutex_unlock(&pt->upt_pollrequests_lock);
265                         usocklnd_conn_decref(conn);
266                         return; /* conn will be killed in poll thread anyway */
267                 }
268
269                 cfs_list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
270                 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
271
272                 conn->uc_preq = NULL;
273         }
274 }
275
/* Process one poll request, updating the thread's poll data:
 * pollfd[], idx2conn[], fd2idx[] and skip[].
 *
 * Consumes 'pr' (frees it) in all paths.  The conn reference held by
 * the request is dropped, except for POLL_ADD_REQUEST where it is
 * transferred to idx2conn[idx].
 * Returns 0 on success, <0 on allocation failure. */
int
usocklnd_process_pollrequest(usock_pollrequest_t *pr,
                             usock_pollthread_t *pt_data)
{
        int            type  = pr->upr_type;
        short          value = pr->upr_value;
        usock_conn_t  *conn  = pr->upr_conn;
        int            idx = 0;
        struct pollfd *pollfd   = pt_data->upt_pollfd;
        int           *fd2idx   = pt_data->upt_fd2idx;
        usock_conn_t **idx2conn = pt_data->upt_idx2conn;
        int           *skip     = pt_data->upt_skip;

        LASSERT(conn != NULL);
        LASSERT(conn->uc_sock != NULL);
        LASSERT(type == POLL_ADD_REQUEST ||
                LIBCFS_SOCK2FD(conn->uc_sock) < pt_data->upt_nfd2idx);

        /* for every type except ADD the conn must already be
         * registered: look up its pollfd[] slot via fd2idx[] */
        if (type != POLL_ADD_REQUEST) {
                idx = fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)];
                if (idx > 0 && idx < pt_data->upt_nfds) { /* hot path */
                        LASSERT(pollfd[idx].fd ==
                                LIBCFS_SOCK2FD(conn->uc_sock));
                } else { /* unlikely: the fd2idx entry was invalidated
                          * (e.g. by an earlier DEL); drop the request */
                        CWARN("Very unlikely event happend: trying to"
                              " handle poll request of type %d but idx=%d"
                              " is out of range [1 ... %d]. Is shutdown"
                              " in progress (%d)?\n",
                              type, idx, pt_data->upt_nfds - 1,
                              usock_data.ud_shutdown);

                        LIBCFS_FREE (pr, sizeof(*pr));
                        usocklnd_conn_decref(conn);
                        return 0;
                }
        }

        LIBCFS_FREE (pr, sizeof(*pr));

        switch (type) {
        case POLL_ADD_REQUEST:
                if (pt_data->upt_nfds >= pt_data->upt_npollfd) {
                        /* resize pollfd[], idx2conn[] and skip[];
                         * pt_data pointers are updated after each
                         * successful realloc so a later failure
                         * leaves no dangling pointer */
                        struct pollfd *new_pollfd;
                        int            new_npollfd = pt_data->upt_npollfd * 2;
                        usock_conn_t **new_idx2conn;
                        int           *new_skip;

                        new_pollfd = LIBCFS_REALLOC(pollfd, new_npollfd *
                                                     sizeof(struct pollfd));
                        if (new_pollfd == NULL)
                                goto process_pollrequest_enomem;
                        pt_data->upt_pollfd = pollfd = new_pollfd;

                        new_idx2conn = LIBCFS_REALLOC(idx2conn, new_npollfd *
                                                      sizeof(usock_conn_t *));
                        if (new_idx2conn == NULL)
                                goto process_pollrequest_enomem;
                        pt_data->upt_idx2conn = idx2conn = new_idx2conn;

                        new_skip = LIBCFS_REALLOC(skip, new_npollfd *
                                                  sizeof(int));
                        if (new_skip == NULL)
                                goto process_pollrequest_enomem;
                        pt_data->upt_skip = new_skip;

                        pt_data->upt_npollfd = new_npollfd;
                }

                if (LIBCFS_SOCK2FD(conn->uc_sock) >= pt_data->upt_nfd2idx) {
                        /* resize fd2idx[]: double until the new fd fits,
                         * then zero-fill the newly added tail */
                        int *new_fd2idx;
                        int  new_nfd2idx = pt_data->upt_nfd2idx * 2;

                        while (new_nfd2idx <= LIBCFS_SOCK2FD(conn->uc_sock))
                                new_nfd2idx *= 2;

                        new_fd2idx = LIBCFS_REALLOC(fd2idx, new_nfd2idx *
                                                    sizeof(int));
                        if (new_fd2idx == NULL)
                                goto process_pollrequest_enomem;

                        pt_data->upt_fd2idx = fd2idx = new_fd2idx;
                        memset(fd2idx + pt_data->upt_nfd2idx, 0,
                               (new_nfd2idx - pt_data->upt_nfd2idx)
                               * sizeof(int));
                        pt_data->upt_nfd2idx = new_nfd2idx;
                }

                /* the fd must not be registered twice */
                LASSERT(fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] == 0);

                /* append to pollfd[]; idx2conn[idx] inherits the conn
                 * reference the poll request held */
                idx = pt_data->upt_nfds++;
                idx2conn[idx] = conn;
                fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] = idx;

                pollfd[idx].fd = LIBCFS_SOCK2FD(conn->uc_sock);
                pollfd[idx].events = value;
                pollfd[idx].revents = 0;
                break;
        case POLL_DEL_REQUEST:
                fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] = 0; /* invalidate this
                                                            * entry */
                --pt_data->upt_nfds;
                if (idx != pt_data->upt_nfds) {
                        /* shift last entry into released position */
                        memcpy(&pollfd[idx], &pollfd[pt_data->upt_nfds],
                               sizeof(struct pollfd));
                        idx2conn[idx] = idx2conn[pt_data->upt_nfds];
                        fd2idx[pollfd[idx].fd] = idx;
                }

                /* conn keeps one reference (formerly idx2conn's) until
                 * usocklnd_process_stale_list() tears it down */
                libcfs_sock_release(conn->uc_sock);
                cfs_list_add_tail(&conn->uc_stale_list,
                                  &pt_data->upt_stale_list);
                break;
        case POLL_RX_SET_REQUEST:
                /* replace only the POLLIN bit with 'value' */
                pollfd[idx].events = (pollfd[idx].events & ~POLLIN) | value;
                break;
        case POLL_TX_SET_REQUEST:
                /* replace only the POLLOUT bit with 'value' */
                pollfd[idx].events = (pollfd[idx].events & ~POLLOUT) | value;
                break;
        case POLL_SET_REQUEST:
                pollfd[idx].events = value;
                break;
        default:
                LBUG(); /* unknown type */
        }

        /* In the case of POLL_ADD_REQUEST, idx2conn[idx] takes the
         * reference that poll request possesses */
        if (type != POLL_ADD_REQUEST)
                usocklnd_conn_decref(conn);

        return 0;

  process_pollrequest_enomem:
        usocklnd_conn_decref(conn);
        return -ENOMEM;
}
417
/* Run read/write/exception handlers for every pollfd entry with
 * pending events, looping up to ut_fair_limit passes so no single
 * connection can starve the others.
 *
 * skip[] is used as a linked list of indices threaded through the
 * pollfd array: the first pass visits every entry and sets
 * skip[i] = i+1; entries whose POLLIN/POLLOUT events are exhausted
 * are unlinked (skip[prev] = next) so later passes touch only the
 * entries that still have work. */
void
usocklnd_execute_handlers(usock_pollthread_t *pt_data)
{
        struct pollfd *pollfd   = pt_data->upt_pollfd;
        int            nfds     = pt_data->upt_nfds;
        usock_conn_t **idx2conn = pt_data->upt_idx2conn;
        int           *skip     = pt_data->upt_skip;
        int            j;

        /* pollfd[0] is the notifier fd: drain all wakeup notifications */
        if (pollfd[0].revents & POLLIN)
                while (usocklnd_notifier_handler(pollfd[0].fd) > 0)
                        ;

        skip[0] = 1; /* always skip notifier fd */

        for (j = 0; j < usock_tuns.ut_fair_limit; j++) {
                int prev = 0;
                int i = skip[0];

                if (i >= nfds) /* nothing ready */
                        break;

                do {
                        usock_conn_t *conn = idx2conn[i];
                        int next;

                        if (j == 0) /* first pass... */
                                next = skip[i] = i+1; /* set skip chain */
                        else /* later passes... */
                                next = skip[i]; /* skip unready pollfds */

                        /* kill connection if it's closed by peer and
                         * there is no data pending for reading */
                        if ((pollfd[i].revents & POLLERR) != 0 ||
                            (pollfd[i].revents & POLLHUP) != 0) {
                                if ((pollfd[i].events & POLLIN) != 0 &&
                                    (pollfd[i].revents & POLLIN) == 0)
                                        usocklnd_conn_kill(conn);
                                else
                                        usocklnd_exception_handler(conn);
                        }

                        /* handlers return <= 0 when the event is
                         * exhausted; clear the bit so the entry can be
                         * unlinked from the skip chain below */
                        if ((pollfd[i].revents & POLLIN) != 0 &&
                            usocklnd_read_handler(conn) <= 0)
                                pollfd[i].revents &= ~POLLIN;

                        if ((pollfd[i].revents & POLLOUT) != 0 &&
                            usocklnd_write_handler(conn) <= 0)
                                pollfd[i].revents &= ~POLLOUT;

                        if ((pollfd[i].revents & (POLLIN | POLLOUT)) == 0)
                                skip[prev] = next; /* skip this entry next pass */
                        else
                                prev = i;

                        i = next;
                } while (i < nfds);
        }
}
479
480 int
481 usocklnd_calculate_chunk_size(int num)
482 {
483         const int n     = 4;
484         const int p     = usock_tuns.ut_poll_timeout;
485         int       chunk = num;
486
487         /* chunk should be big enough to detect a timeout on any
488          * connection within (n+1)/n times the timeout interval
489          * if we checks every 'p' seconds 'chunk' conns */
490
491         if (usock_tuns.ut_timeout > n * p)
492                 chunk = (chunk * n * p) / usock_tuns.ut_timeout;
493
494         if (chunk == 0)
495                 chunk = 1;
496
497         return chunk;
498 }
499
500 void
501 usocklnd_wakeup_pollthread(int i)
502 {
503         usock_pollthread_t *pt = &usock_data.ud_pollthreads[i];
504         int                 notification = 0;
505         int                 rc;
506
507         rc = syscall(SYS_write, LIBCFS_SOCK2FD(pt->upt_notifier[0]),
508                      &notification, sizeof(notification));
509
510         if (rc != sizeof(notification))
511                 CERROR("Very unlikely event happend: "
512                        "cannot write to notifier fd (rc=%d; errno=%d)\n",
513                        rc, errno);
514 }