/* Source: fs/lustre-release.git (Whamcloud gitweb, tag 2.2.93)
 * Path:   lnet/ulnds/socklnd/poll.c */
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/ulnds/socklnd/poll.c
35  *
36  * Author: Maxim Patlasov <maxim@clusterfs.com>
37  */
38
39 #include "usocklnd.h"
40 #include <unistd.h>
41 #include <sys/syscall.h>
42
43 void
44 usocklnd_process_stale_list(usock_pollthread_t *pt_data)
45 {
46         while (!cfs_list_empty(&pt_data->upt_stale_list)) {
47                 usock_conn_t *conn;
48                 conn = cfs_list_entry(pt_data->upt_stale_list.next,
49                                       usock_conn_t, uc_stale_list);
50
51                 cfs_list_del(&conn->uc_stale_list);
52
53                 usocklnd_tear_peer_conn(conn);
54                 usocklnd_conn_decref(conn); /* -1 for idx2conn[idx] or pr */
55         }
56 }
57
/* Main body of one poll thread.
 *
 * Repeatedly: (1) apply queued poll requests to this thread's pollfd[]
 * set, (2) poll(2) for events and dispatch handlers, (3) periodically
 * scan a "chunk" of connections for timeouts so that the whole set is
 * covered within a bounded number of poll intervals.
 *
 * 'arg' is this thread's usock_pollthread_t.  Returns 0; signals
 * completion to usocklnd_shutdown() via upt_completion before exiting. */
int
usocklnd_poll_thread(void *arg)
{
        int                 rc = 0;
        usock_pollthread_t *pt_data = (usock_pollthread_t *)arg;
        cfs_time_t          current_time;
        cfs_time_t          planned_time;   /* next deadline for a timeout scan */
        int                 idx;
        int                 idx_start;      /* where the next timeout scan resumes */
        int                 idx_finish;
        int                 chunk;          /* conns examined per timeout scan */
        int                 saved_nfds;     /* nfds when chunk was last computed */
        int                 extra;          /* growth of pollfd[] since last scan */
        int                 times;          /* missed scan intervals to catch up */

        /* mask signals to avoid SIGPIPE, etc */
        sigset_t  sigs;
        sigfillset (&sigs);
        pthread_sigmask (SIG_SETMASK, &sigs, 0);

        LASSERT(pt_data != NULL);

        planned_time = cfs_time_shift(usock_tuns.ut_poll_timeout);
        chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
        saved_nfds = pt_data->upt_nfds;
        idx_start = 1;  /* slot 0 is the notifier fd, never scanned */

        /* Main loop */
        while (usock_data.ud_shutdown == 0) {
                rc = 0;

                /* Process all enqueued poll requests */
                pthread_mutex_lock(&pt_data->upt_pollrequests_lock);
                while (!cfs_list_empty(&pt_data->upt_pollrequests)) {
                        usock_pollrequest_t *pr;
                        pr = cfs_list_entry(pt_data->upt_pollrequests.next,
                                            usock_pollrequest_t, upr_list);

                        cfs_list_del(&pr->upr_list);
                        rc = usocklnd_process_pollrequest(pr, pt_data);
                        if (rc)
                                break;
                }
                pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);

                if (rc)
                        break;  /* -ENOMEM while resizing poll data */

                /* Delete conns orphaned due to POLL_DEL_REQUESTs */
                usocklnd_process_stale_list(pt_data);

                /* Actual polling for events */
                rc = poll(pt_data->upt_pollfd,
                          pt_data->upt_nfds,
                          usock_tuns.ut_poll_timeout * 1000);

                if (rc < 0 && errno != EINTR) {
                        CERROR("Cannot poll(2): errno=%d\n", errno);
                        break;
                }

                if (rc > 0)
                        usocklnd_execute_handlers(pt_data);

                current_time = cfs_time_current();

                /* Timeout scan only when there is at least one real conn
                 * and the scan deadline has passed.
                 * NOTE(review): 'rc' may still hold a positive poll()
                 * return when ud_shutdown terminates the loop; the
                 * cleanup below then takes the error path (harmlessly,
                 * since all lists/slots are empty by then) — confirm. */
                if (pt_data->upt_nfds < 2 ||
                    cfs_time_before(current_time, planned_time))
                        continue;

                /* catch up growing pollfd[] */
                if (pt_data->upt_nfds > saved_nfds) {
                        extra = pt_data->upt_nfds - saved_nfds;
                        saved_nfds = pt_data->upt_nfds;
                } else {
                        extra = 0;
                }

                /* scan proportionally more if we are behind schedule */
                times = cfs_duration_sec(cfs_time_sub(current_time, planned_time)) + 1;
                idx_finish = MIN(idx_start + chunk*times + extra, pt_data->upt_nfds);

                for (idx = idx_start; idx < idx_finish; idx++) {
                        usock_conn_t *conn = pt_data->upt_idx2conn[idx];
                        pthread_mutex_lock(&conn->uc_lock);
                        if (usocklnd_conn_timed_out(conn, current_time) &&
                            conn->uc_state != UC_DEAD) {
                                conn->uc_errored = 1;
                                usocklnd_conn_kill_locked(conn);
                        }
                        pthread_mutex_unlock(&conn->uc_lock);
                }

                /* whole set covered: recompute chunk and wrap around */
                if (idx_finish == pt_data->upt_nfds) {
                        chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
                        saved_nfds = pt_data->upt_nfds;
                        idx_start = 1;
                }
                else {
                        idx_start = idx_finish;
                }

                planned_time = cfs_time_add(current_time,
                                            cfs_time_seconds(usock_tuns.ut_poll_timeout));
        }

        /* All conns should be deleted by POLL_DEL_REQUESTs while shutdown */
        LASSERT (rc != 0 || pt_data->upt_nfds == 1);

        if (rc) {
                pthread_mutex_lock(&pt_data->upt_pollrequests_lock);

                /* Block new poll requests to be enqueued */
                pt_data->upt_errno = rc;

                /* Fail all requests still queued: ADD requests own the
                 * socket (release it, then tear the conn down via the
                 * stale list); others just drop their conn ref. */
                while (!cfs_list_empty(&pt_data->upt_pollrequests)) {
                        usock_pollrequest_t *pr;
                        pr = cfs_list_entry(pt_data->upt_pollrequests.next,
                                        usock_pollrequest_t, upr_list);

                        cfs_list_del(&pr->upr_list);

                        if (pr->upr_type == POLL_ADD_REQUEST) {
                                libcfs_sock_release(pr->upr_conn->uc_sock);
                                cfs_list_add_tail(&pr->upr_conn->uc_stale_list,
                                                  &pt_data->upt_stale_list);
                        } else {
                                usocklnd_conn_decref(pr->upr_conn);
                        }

                        LIBCFS_FREE (pr, sizeof(*pr));
                }
                pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);

                usocklnd_process_stale_list(pt_data);

                /* Tear down every conn still registered in pollfd[]
                 * (slot 0 is the notifier fd, skipped). */
                for (idx = 1; idx < pt_data->upt_nfds; idx++) {
                        usock_conn_t *conn = pt_data->upt_idx2conn[idx];
                        LASSERT(conn != NULL);
                        libcfs_sock_release(conn->uc_sock);
                        usocklnd_tear_peer_conn(conn);
                        usocklnd_conn_decref(conn);
                }
        }

        /* unblock usocklnd_shutdown() */
        cfs_mt_complete(&pt_data->upt_completion);

        return 0;
}
207
208 /* Returns 0 on success, <0 else */
209 int
210 usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value)
211 {
212         int                  pt_idx = conn->uc_pt_idx;
213         usock_pollthread_t  *pt     = &usock_data.ud_pollthreads[pt_idx];
214         usock_pollrequest_t *pr;
215
216         LIBCFS_ALLOC(pr, sizeof(*pr));
217         if (pr == NULL) {
218                 CERROR ("Cannot allocate poll request\n");
219                 return -ENOMEM;
220         }
221
222         pr->upr_conn = conn;
223         pr->upr_type = type;
224         pr->upr_value = value;
225
226         usocklnd_conn_addref(conn); /* +1 for poll request */
227
228         pthread_mutex_lock(&pt->upt_pollrequests_lock);
229
230         if (pt->upt_errno) { /* very rare case: errored poll thread */
231                 int rc = pt->upt_errno;
232                 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
233                 usocklnd_conn_decref(conn);
234                 LIBCFS_FREE(pr, sizeof(*pr));
235                 return rc;
236         }
237
238         cfs_list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
239         pthread_mutex_unlock(&pt->upt_pollrequests_lock);
240         return 0;
241 }
242
243 void
244 usocklnd_add_killrequest(usock_conn_t *conn)
245 {
246         int                  pt_idx = conn->uc_pt_idx;
247         usock_pollthread_t  *pt     = &usock_data.ud_pollthreads[pt_idx];
248         usock_pollrequest_t *pr     = conn->uc_preq;
249
250         /* Use preallocated poll request because there is no good
251          * workaround for ENOMEM error while killing connection */
252         if (pr) {
253                 pr->upr_conn  = conn;
254                 pr->upr_type  = POLL_DEL_REQUEST;
255                 pr->upr_value = 0;
256
257                 usocklnd_conn_addref(conn); /* +1 for poll request */
258
259                 pthread_mutex_lock(&pt->upt_pollrequests_lock);
260
261                 if (pt->upt_errno) { /* very rare case: errored poll thread */
262                         pthread_mutex_unlock(&pt->upt_pollrequests_lock);
263                         usocklnd_conn_decref(conn);
264                         return; /* conn will be killed in poll thread anyway */
265                 }
266
267                 cfs_list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
268                 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
269
270                 conn->uc_preq = NULL;
271         }
272 }
273
274 /* Process poll request. Update poll data.
275  * Returns 0 on success, <0 else */
/* Process poll request. Update poll data.
 * Returns 0 on success, <0 else.
 *
 * Consumes 'pr' (always freed here) and the conn reference the request
 * holds: for POLL_ADD_REQUEST that reference is transferred to
 * idx2conn[idx]; for every other type it is dropped before returning. */
int
usocklnd_process_pollrequest(usock_pollrequest_t *pr,
                             usock_pollthread_t *pt_data)
{
        int            type  = pr->upr_type;
        short          value = pr->upr_value;
        usock_conn_t  *conn  = pr->upr_conn;
        int            idx = 0;
        struct pollfd *pollfd   = pt_data->upt_pollfd;
        int           *fd2idx   = pt_data->upt_fd2idx;   /* fd -> pollfd[] slot */
        usock_conn_t **idx2conn = pt_data->upt_idx2conn; /* slot -> conn */
        int           *skip     = pt_data->upt_skip;     /* handler skip chain */

        LASSERT(conn != NULL);
        LASSERT(conn->uc_sock != NULL);
        LASSERT(type == POLL_ADD_REQUEST ||
                LIBCFS_SOCK2FD(conn->uc_sock) < pt_data->upt_nfd2idx);

        if (type != POLL_ADD_REQUEST) {
                /* look up the slot the conn's fd currently occupies */
                idx = fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)];
                if (idx > 0 && idx < pt_data->upt_nfds) { /* hot path */
                        LASSERT(pollfd[idx].fd ==
                                LIBCFS_SOCK2FD(conn->uc_sock));
                } else { /* unlikely */
                        /* fd no longer registered (e.g. raced with
                         * shutdown); drop the request silently */
                        CWARN("Very unlikely event happend: trying to"
                              " handle poll request of type %d but idx=%d"
                              " is out of range [1 ... %d]. Is shutdown"
                              " in progress (%d)?\n",
                              type, idx, pt_data->upt_nfds - 1,
                              usock_data.ud_shutdown);

                        LIBCFS_FREE (pr, sizeof(*pr));
                        usocklnd_conn_decref(conn);
                        return 0;
                }
        }

        LIBCFS_FREE (pr, sizeof(*pr));

        switch (type) {
        case POLL_ADD_REQUEST:
                if (pt_data->upt_nfds >= pt_data->upt_npollfd) {
                        /* resize pollfd[], idx2conn[] and skip[] */
                        struct pollfd *new_pollfd;
                        int            new_npollfd = pt_data->upt_npollfd * 2;
                        usock_conn_t **new_idx2conn;
                        int           *new_skip;

                        new_pollfd = LIBCFS_REALLOC(pollfd, new_npollfd *
                                                     sizeof(struct pollfd));
                        if (new_pollfd == NULL)
                                goto process_pollrequest_enomem;
                        pt_data->upt_pollfd = pollfd = new_pollfd;

                        new_idx2conn = LIBCFS_REALLOC(idx2conn, new_npollfd *
                                                      sizeof(usock_conn_t *));
                        if (new_idx2conn == NULL)
                                goto process_pollrequest_enomem;
                        pt_data->upt_idx2conn = idx2conn = new_idx2conn;

                        new_skip = LIBCFS_REALLOC(skip, new_npollfd *
                                                  sizeof(int));
                        if (new_skip == NULL)
                                goto process_pollrequest_enomem;
                        pt_data->upt_skip = new_skip;

                        /* upt_npollfd only grows once all three arrays
                         * were resized, so a partial failure above
                         * leaves a consistent (smaller) capacity */
                        pt_data->upt_npollfd = new_npollfd;
                }

                if (LIBCFS_SOCK2FD(conn->uc_sock) >= pt_data->upt_nfd2idx) {
                        /* resize fd2idx[] */
                        int *new_fd2idx;
                        int  new_nfd2idx = pt_data->upt_nfd2idx * 2;

                        /* keep doubling until the new fd fits */
                        while (new_nfd2idx <= LIBCFS_SOCK2FD(conn->uc_sock))
                                new_nfd2idx *= 2;

                        new_fd2idx = LIBCFS_REALLOC(fd2idx, new_nfd2idx *
                                                    sizeof(int));
                        if (new_fd2idx == NULL)
                                goto process_pollrequest_enomem;

                        pt_data->upt_fd2idx = fd2idx = new_fd2idx;
                        /* zero the newly-added tail: 0 means "no slot" */
                        memset(fd2idx + pt_data->upt_nfd2idx, 0,
                               (new_nfd2idx - pt_data->upt_nfd2idx)
                               * sizeof(int));
                        pt_data->upt_nfd2idx = new_nfd2idx;
                }

                LASSERT(fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] == 0);

                /* register the conn in the next free slot */
                idx = pt_data->upt_nfds++;
                idx2conn[idx] = conn;
                fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] = idx;

                pollfd[idx].fd = LIBCFS_SOCK2FD(conn->uc_sock);
                pollfd[idx].events = value;
                pollfd[idx].revents = 0;
                break;
        case POLL_DEL_REQUEST:
                fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] = 0; /* invalidate this
                                                            * entry */
                --pt_data->upt_nfds;
                if (idx != pt_data->upt_nfds) {
                        /* shift last entry into released position */
                        memcpy(&pollfd[idx], &pollfd[pt_data->upt_nfds],
                               sizeof(struct pollfd));
                        idx2conn[idx] = idx2conn[pt_data->upt_nfds];
                        fd2idx[pollfd[idx].fd] = idx;
                }

                /* defer teardown to usocklnd_process_stale_list() */
                libcfs_sock_release(conn->uc_sock);
                cfs_list_add_tail(&conn->uc_stale_list,
                                  &pt_data->upt_stale_list);
                break;
        case POLL_RX_SET_REQUEST:
                /* replace only the POLLIN bit with 'value' */
                pollfd[idx].events = (pollfd[idx].events & ~POLLIN) | value;
                break;
        case POLL_TX_SET_REQUEST:
                /* replace only the POLLOUT bit with 'value' */
                pollfd[idx].events = (pollfd[idx].events & ~POLLOUT) | value;
                break;
        case POLL_SET_REQUEST:
                pollfd[idx].events = value;
                break;
        default:
                LBUG(); /* unknown type */
        }

        /* In the case of POLL_ADD_REQUEST, idx2conn[idx] takes the
         * reference that poll request possesses */
        if (type != POLL_ADD_REQUEST)
                usocklnd_conn_decref(conn);

        return 0;

  process_pollrequest_enomem:
        usocklnd_conn_decref(conn);
        return -ENOMEM;
}
415
416 /* Loop on poll data executing handlers repeatedly until
417  *  fair_limit is reached or all entries are exhausted */
/* Loop on poll data executing handlers repeatedly until
 *  fair_limit is reached or all entries are exhausted.
 *
 * skip[] is an intrusive linked list over pollfd[] indices: skip[i] is
 * the next index still worth visiting.  The first pass builds the chain
 * over all entries; later passes follow it, and entries whose events
 * are exhausted are unlinked so each subsequent pass gets shorter. */
void
usocklnd_execute_handlers(usock_pollthread_t *pt_data)
{
        struct pollfd *pollfd   = pt_data->upt_pollfd;
        int            nfds     = pt_data->upt_nfds;
        usock_conn_t **idx2conn = pt_data->upt_idx2conn;
        int           *skip     = pt_data->upt_skip;
        int            j;

        /* slot 0 is the notifier fd: drain it completely */
        if (pollfd[0].revents & POLLIN)
                while (usocklnd_notifier_handler(pollfd[0].fd) > 0)
                        ;

        skip[0] = 1; /* always skip notifier fd */

        for (j = 0; j < usock_tuns.ut_fair_limit; j++) {
                int prev = 0;
                int i = skip[0];

                if (i >= nfds) /* nothing ready */
                        break;

                do {
                        usock_conn_t *conn = idx2conn[i];
                        int next;

                        if (j == 0) /* first pass... */
                                next = skip[i] = i+1; /* set skip chain */
                        else /* later passes... */
                                next = skip[i]; /* skip unready pollfds */

                        /* kill connection if it's closed by peer and
                         * there is no data pending for reading */
                        if ((pollfd[i].revents & POLLERR) != 0 ||
                            (pollfd[i].revents & POLLHUP) != 0) {
                                if ((pollfd[i].events & POLLIN) != 0 &&
                                    (pollfd[i].revents & POLLIN) == 0)
                                        usocklnd_conn_kill(conn);
                                else
                                        usocklnd_exception_handler(conn);
                        }

                        /* handler returning <=0 means no more progress:
                         * clear the bit so this entry can be unlinked */
                        if ((pollfd[i].revents & POLLIN) != 0 &&
                            usocklnd_read_handler(conn) <= 0)
                                pollfd[i].revents &= ~POLLIN;

                        if ((pollfd[i].revents & POLLOUT) != 0 &&
                            usocklnd_write_handler(conn) <= 0)
                                pollfd[i].revents &= ~POLLOUT;

                        if ((pollfd[i].revents & (POLLIN | POLLOUT)) == 0)
                                skip[prev] = next; /* skip this entry next pass */
                        else
                                prev = i;

                        i = next;
                } while (i < nfds);
        }
}
477
478 int
479 usocklnd_calculate_chunk_size(int num)
480 {
481         const int n     = 4;
482         const int p     = usock_tuns.ut_poll_timeout;
483         int       chunk = num;
484
485         /* chunk should be big enough to detect a timeout on any
486          * connection within (n+1)/n times the timeout interval
487          * if we checks every 'p' seconds 'chunk' conns */
488
489         if (usock_tuns.ut_timeout > n * p)
490                 chunk = (chunk * n * p) / usock_tuns.ut_timeout;
491
492         if (chunk == 0)
493                 chunk = 1;
494
495         return chunk;
496 }
497
498 void
499 usocklnd_wakeup_pollthread(int i)
500 {
501         usock_pollthread_t *pt = &usock_data.ud_pollthreads[i];
502         int                 notification = 0;
503         int                 rc;
504
505         rc = syscall(SYS_write, LIBCFS_SOCK2FD(pt->upt_notifier[0]),
506                      &notification, sizeof(notification));
507
508         if (rc != sizeof(notification))
509                 CERROR("Very unlikely event happend: "
510                        "cannot write to notifier fd (rc=%d; errno=%d)\n",
511                        rc, errno);
512 }