Whamcloud gitweb — fs/lustre-release.git: lnet/ulnds/socklnd/poll.c (bug b=16098)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/ulnds/socklnd/poll.c
37  *
38  * Author: Maxim Patlasov <maxim@clusterfs.com>
39  */
40
41 #include "usocklnd.h"
42 #include <unistd.h>
43 #include <sys/syscall.h>
44
45 void
46 usocklnd_process_stale_list(usock_pollthread_t *pt_data)
47 {
48         while (!list_empty(&pt_data->upt_stale_list)) {
49                 usock_conn_t *conn;                        
50                 conn = list_entry(pt_data->upt_stale_list.next,
51                                   usock_conn_t, uc_stale_list);
52                 
53                 list_del(&conn->uc_stale_list);
54                 
55                 usocklnd_tear_peer_conn(conn);
56                 usocklnd_conn_decref(conn); /* -1 for idx2conn[idx] or pr */
57         }
58 }
59
/* Main loop of one poll thread.
 * Each iteration: (1) applies all queued poll requests to this thread's
 * pollfd set, (2) destroys conns orphaned by POLL_DEL_REQUESTs,
 * (3) poll(2)s for events and dispatches handlers, and (4) once per
 * poll-timeout interval, scans a 'chunk'-sized slice of conns for
 * timeouts so that every conn gets checked periodically.
 * Runs until usock_data.ud_shutdown is set or a fatal error occurs; on
 * error it drains pending requests and destroys the remaining conns
 * itself.  Always completes upt_completion so usocklnd_shutdown() can
 * proceed.  Returns 0. */
int
usocklnd_poll_thread(void *arg)
{
        int                 rc = 0;
        usock_pollthread_t *pt_data = (usock_pollthread_t *)arg;
        cfs_time_t          current_time;
        cfs_time_t          planned_time;   /* deadline of next timeout scan */
        int                 idx;
        int                 idx_start;      /* next pollfd slot to scan */
        int                 idx_finish;
        int                 chunk;          /* slots scanned per interval */
        int                 saved_nfds;     /* nfds when chunk was computed */
        int                 extra;          /* growth of nfds since then */
        int                 times;          /* missed intervals to catch up */

        /* mask signals to avoid SIGPIPE, etc */
        sigset_t  sigs;
        sigfillset (&sigs);
        pthread_sigmask (SIG_SETMASK, &sigs, 0);

        LASSERT(pt_data != NULL);

        planned_time = cfs_time_shift(usock_tuns.ut_poll_timeout);
        chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
        saved_nfds = pt_data->upt_nfds;
        /* slot 0 is the notifier fd, never a conn, so scanning starts at 1 */
        idx_start = 1;

        /* Main loop */
        while (usock_data.ud_shutdown == 0) {
                rc = 0;

                /* Process all enqueued poll requests */
                pthread_mutex_lock(&pt_data->upt_pollrequests_lock);
                while (!list_empty(&pt_data->upt_pollrequests)) {
                        usock_pollrequest_t *pr;
                        pr = list_entry(pt_data->upt_pollrequests.next,
                                        usock_pollrequest_t, upr_list);

                        list_del(&pr->upr_list);
                        rc = usocklnd_process_pollrequest(pr, pt_data);
                        if (rc)
                                break;
                }
                pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);

                /* rc != 0 means ENOMEM while resizing poll data: fatal */
                if (rc)
                        break;

                /* Delete conns orphaned due to POLL_DEL_REQUESTs */
                usocklnd_process_stale_list(pt_data);

                /* Actual polling for events */
                rc = poll(pt_data->upt_pollfd,
                          pt_data->upt_nfds,
                          usock_tuns.ut_poll_timeout * 1000);

                if (rc < 0) {
                        CERROR("Cannot poll(2): errno=%d\n", errno);
                        break;
                }

                if (rc > 0)
                        usocklnd_execute_handlers(pt_data);

                current_time = cfs_time_current();

                /* skip the timeout scan if there are no conns (nfds < 2;
                 * slot 0 is the notifier) or the deadline hasn't arrived */
                if (pt_data->upt_nfds < 2 ||
                    cfs_time_before(current_time, planned_time))
                        continue;

                /* catch up growing pollfd[] */
                if (pt_data->upt_nfds > saved_nfds) {
                        extra = pt_data->upt_nfds - saved_nfds;
                        saved_nfds = pt_data->upt_nfds;
                } else {
                        extra = 0;
                }

                /* scan proportionally more slots if we overslept whole
                 * intervals, plus 'extra' for newly added conns */
                times = cfs_duration_sec(cfs_time_sub(current_time, planned_time)) + 1;
                idx_finish = MIN(idx_start + chunk*times + extra, pt_data->upt_nfds);

                for (idx = idx_start; idx < idx_finish; idx++) {
                        usock_conn_t *conn = pt_data->upt_idx2conn[idx];
                        pthread_mutex_lock(&conn->uc_lock);
                        if (usocklnd_conn_timed_out(conn, current_time) &&
                            conn->uc_state != UC_DEAD) {
                                conn->uc_errored = 1;
                                usocklnd_conn_kill_locked(conn);
                        }
                        pthread_mutex_unlock(&conn->uc_lock);
                }

                /* full sweep done: recompute chunk for the (possibly
                 * changed) nfds and wrap back to the first conn slot */
                if (idx_finish == pt_data->upt_nfds) {
                        chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
                        saved_nfds = pt_data->upt_nfds;
                        idx_start = 1;
                }
                else {
                        idx_start = idx_finish;
                }

                planned_time = cfs_time_add(current_time,
                                            cfs_time_seconds(usock_tuns.ut_poll_timeout));
        }

        /* All conns should be deleted by POLL_DEL_REQUESTs while shutdown */
        LASSERT (rc != 0 || pt_data->upt_nfds == 1);

        if (rc) {
                /* Errored exit: nobody else will service this thread's
                 * requests anymore, so drain and destroy everything here. */
                pthread_mutex_lock(&pt_data->upt_pollrequests_lock);

                /* Block new poll requests to be enqueued */
                pt_data->upt_errno = rc;

                while (!list_empty(&pt_data->upt_pollrequests)) {
                        usock_pollrequest_t *pr;
                        pr = list_entry(pt_data->upt_pollrequests.next,
                                        usock_pollrequest_t, upr_list);

                        list_del(&pr->upr_list);

                        if (pr->upr_type == POLL_ADD_REQUEST) {
                                /* conn never made it into pollfd[]; close
                                 * its fd and tear it down via stale list */
                                close(pr->upr_conn->uc_fd);
                                list_add_tail(&pr->upr_conn->uc_stale_list,
                                              &pt_data->upt_stale_list);
                        } else {
                                /* drop the ref the request was holding */
                                usocklnd_conn_decref(pr->upr_conn);
                        }

                        LIBCFS_FREE (pr, sizeof(*pr));
                }
                pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);

                usocklnd_process_stale_list(pt_data);

                /* destroy conns still registered in pollfd[] (slot 0 is
                 * the notifier fd, not a conn) */
                for (idx = 1; idx < pt_data->upt_nfds; idx++) {
                        usock_conn_t *conn = pt_data->upt_idx2conn[idx];
                        LASSERT(conn != NULL);
                        close(conn->uc_fd);
                        usocklnd_tear_peer_conn(conn);
                        usocklnd_conn_decref(conn);
                }
        }

        /* unblock usocklnd_shutdown() */
        cfs_complete(&pt_data->upt_completion);

        return 0;
}
209
210 /* Returns 0 on success, <0 else */
211 int
212 usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value)
213 {
214         int                  pt_idx = conn->uc_pt_idx;
215         usock_pollthread_t  *pt     = &usock_data.ud_pollthreads[pt_idx];
216         usock_pollrequest_t *pr;
217
218         LIBCFS_ALLOC(pr, sizeof(*pr));
219         if (pr == NULL) {
220                 CERROR ("Cannot allocate poll request\n");
221                 return -ENOMEM;
222         }
223
224         pr->upr_conn = conn;
225         pr->upr_type = type;
226         pr->upr_value = value;
227
228         usocklnd_conn_addref(conn); /* +1 for poll request */
229         
230         pthread_mutex_lock(&pt->upt_pollrequests_lock);
231
232         if (pt->upt_errno) { /* very rare case: errored poll thread */
233                 int rc = pt->upt_errno;
234                 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
235                 usocklnd_conn_decref(conn);
236                 LIBCFS_FREE(pr, sizeof(*pr));
237                 return rc;
238         }
239         
240         list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
241         pthread_mutex_unlock(&pt->upt_pollrequests_lock);
242         return 0;
243 }
244
245 void
246 usocklnd_add_killrequest(usock_conn_t *conn)
247 {
248         int                  pt_idx = conn->uc_pt_idx;
249         usock_pollthread_t  *pt     = &usock_data.ud_pollthreads[pt_idx];
250         usock_pollrequest_t *pr     = conn->uc_preq;
251         
252         /* Use preallocated poll request because there is no good
253          * workaround for ENOMEM error while killing connection */
254         if (pr) {
255                 pr->upr_conn  = conn;
256                 pr->upr_type  = POLL_DEL_REQUEST;
257                 pr->upr_value = 0;
258
259                 usocklnd_conn_addref(conn); /* +1 for poll request */
260                 
261                 pthread_mutex_lock(&pt->upt_pollrequests_lock);
262                 
263                 if (pt->upt_errno) { /* very rare case: errored poll thread */
264                         pthread_mutex_unlock(&pt->upt_pollrequests_lock);
265                         usocklnd_conn_decref(conn);
266                         return; /* conn will be killed in poll thread anyway */
267                 }
268
269                 list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
270                 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
271
272                 conn->uc_preq = NULL;
273         }
274 }
275
/* Process poll request. Update poll data.
 * Consumes (frees) pr in all paths and, except for a successfully
 * applied POLL_ADD_REQUEST, drops the conn reference the request held
 * (for ADD, idx2conn[idx] inherits that reference).
 * Returns 0 on success, <0 else */
int
usocklnd_process_pollrequest(usock_pollrequest_t *pr,
                             usock_pollthread_t *pt_data)
{
        int            type  = pr->upr_type;
        short          value = pr->upr_value;
        usock_conn_t  *conn  = pr->upr_conn;
        int            idx = 0;
        struct pollfd *pollfd   = pt_data->upt_pollfd;
        int           *fd2idx   = pt_data->upt_fd2idx;   /* fd -> pollfd slot */
        usock_conn_t **idx2conn = pt_data->upt_idx2conn; /* slot -> conn */
        int           *skip     = pt_data->upt_skip;     /* handler skip chain */

        LASSERT(conn != NULL);
        LASSERT(conn->uc_fd >=0);
        LASSERT(type == POLL_ADD_REQUEST ||
                conn->uc_fd < pt_data->upt_nfd2idx);

        if (type != POLL_ADD_REQUEST) {
                /* locate the conn's pollfd slot; 0 means "not registered" */
                idx = fd2idx[conn->uc_fd];
                if (idx > 0 && idx < pt_data->upt_nfds) { /* hot path */
                        LASSERT(pollfd[idx].fd == conn->uc_fd);
                } else { /* unlikely */
                        /* stale request (e.g. conn already removed during
                         * shutdown): drop it without treating as an error */
                        CWARN("Very unlikely event happend: trying to"
                              " handle poll request of type %d but idx=%d"
                              " is out of range [1 ... %d]. Is shutdown"
                              " in progress (%d)?\n",
                              type, idx, pt_data->upt_nfds - 1,
                              usock_data.ud_shutdown);

                        LIBCFS_FREE (pr, sizeof(*pr));
                        usocklnd_conn_decref(conn);
                        return 0;
                }
        }

        /* the request itself is no longer needed; only its fields
         * (captured above) are used from here on */
        LIBCFS_FREE (pr, sizeof(*pr));

        switch (type) {
        case POLL_ADD_REQUEST:
                if (pt_data->upt_nfds >= pt_data->upt_npollfd) {
                        /* resize pollfd[], idx2conn[] and skip[] */
                        struct pollfd *new_pollfd;
                        int            new_npollfd = pt_data->upt_npollfd * 2;
                        usock_conn_t **new_idx2conn;
                        int           *new_skip;

                        /* grow each array in turn; a partial grow is fine
                         * because upt_npollfd is only updated at the end */
                        new_pollfd = LIBCFS_REALLOC(pollfd, new_npollfd *
                                                     sizeof(struct pollfd));
                        if (new_pollfd == NULL)
                                goto process_pollrequest_enomem;
                        pt_data->upt_pollfd = pollfd = new_pollfd;

                        new_idx2conn = LIBCFS_REALLOC(idx2conn, new_npollfd *
                                                      sizeof(usock_conn_t *));
                        if (new_idx2conn == NULL)
                                goto process_pollrequest_enomem;
                        pt_data->upt_idx2conn = idx2conn = new_idx2conn;

                        new_skip = LIBCFS_REALLOC(skip, new_npollfd *
                                                  sizeof(int));
                        if (new_skip == NULL)
                                goto process_pollrequest_enomem;
                        pt_data->upt_skip = new_skip;

                        pt_data->upt_npollfd = new_npollfd;
                }

                if (conn->uc_fd >= pt_data->upt_nfd2idx) {
                        /* resize fd2idx[] */
                        int *new_fd2idx;
                        int  new_nfd2idx = pt_data->upt_nfd2idx * 2;

                        /* keep doubling until the fd fits */
                        while (new_nfd2idx <= conn->uc_fd)
                                new_nfd2idx *= 2;

                        new_fd2idx = LIBCFS_REALLOC(fd2idx, new_nfd2idx *
                                                    sizeof(int));
                        if (new_fd2idx == NULL)
                                goto process_pollrequest_enomem;

                        pt_data->upt_fd2idx = fd2idx = new_fd2idx;
                        /* zero the new tail: 0 means "fd not registered" */
                        memset(fd2idx + pt_data->upt_nfd2idx, 0,
                               (new_nfd2idx - pt_data->upt_nfd2idx)
                               * sizeof(int));
                        pt_data->upt_nfd2idx = new_nfd2idx;
                }

                LASSERT(fd2idx[conn->uc_fd] == 0);

                /* register conn in the next free slot */
                idx = pt_data->upt_nfds++;
                idx2conn[idx] = conn;
                fd2idx[conn->uc_fd] = idx;

                pollfd[idx].fd = conn->uc_fd;
                pollfd[idx].events = value;
                pollfd[idx].revents = 0;
                break;
        case POLL_DEL_REQUEST:
                fd2idx[conn->uc_fd] = 0; /* invalidate this entry */

                /* keep pollfd[] dense: move the last entry into the freed
                 * slot and update its fd2idx mapping */
                --pt_data->upt_nfds;
                if (idx != pt_data->upt_nfds) {
                        /* shift last entry into released position */
                        memcpy(&pollfd[idx], &pollfd[pt_data->upt_nfds],
                               sizeof(struct pollfd));
                        idx2conn[idx] = idx2conn[pt_data->upt_nfds];
                        fd2idx[pollfd[idx].fd] = idx;
                }

                /* actual teardown happens in usocklnd_process_stale_list() */
                close(conn->uc_fd);
                list_add_tail(&conn->uc_stale_list, &pt_data->upt_stale_list);
                break;
        case POLL_RX_SET_REQUEST:
                /* replace only the POLLIN bit with the requested value */
                pollfd[idx].events = (pollfd[idx].events & ~POLLIN) | value;
                break;
        case POLL_TX_SET_REQUEST:
                /* replace only the POLLOUT bit with the requested value */
                pollfd[idx].events = (pollfd[idx].events & ~POLLOUT) | value;
                break;
        case POLL_SET_REQUEST:
                pollfd[idx].events = value;
                break;
        default:
                LBUG(); /* unknown type */
        }

        /* In the case of POLL_ADD_REQUEST, idx2conn[idx] takes the
         * reference that poll request possesses */
        if (type != POLL_ADD_REQUEST)
                usocklnd_conn_decref(conn);

        return 0;

  process_pollrequest_enomem:
        usocklnd_conn_decref(conn);
        return -ENOMEM;
}
415
/* Loop on poll data executing handlers repeatedly until
 *  fair_limit is reached or all entries are exhausted.
 * skip[] implements a singly linked "skip chain" over pollfd slots:
 * skip[i] is the index of the next slot still worth visiting.  The
 * first pass builds the chain (every slot linked to the next); later
 * passes unlink slots whose events were fully consumed, so repeated
 * passes only revisit still-ready connections. */
void
usocklnd_execute_handlers(usock_pollthread_t *pt_data)
{
        struct pollfd *pollfd   = pt_data->upt_pollfd;
        int            nfds     = pt_data->upt_nfds;
        usock_conn_t **idx2conn = pt_data->upt_idx2conn;
        int           *skip     = pt_data->upt_skip;
        int            j;

        /* slot 0 is the notifier fd: drain it completely first */
        if (pollfd[0].revents & POLLIN)
                while (usocklnd_notifier_handler(pollfd[0].fd) > 0)
                        ;

        skip[0] = 1; /* always skip notifier fd */

        for (j = 0; j < usock_tuns.ut_fair_limit; j++) {
                int prev = 0;           /* last slot kept on the chain */
                int i = skip[0];        /* first conn slot to visit */

                if (i >= nfds) /* nothing ready */
                        break;

                do {
                        usock_conn_t *conn = idx2conn[i];
                        int next;

                        if (j == 0) /* first pass... */
                                next = skip[i] = i+1; /* set skip chain */
                        else /* later passes... */
                                next = skip[i]; /* skip unready pollfds */

                        /* kill connection if it's closed by peer and
                         * there is no data pending for reading */
                        if ((pollfd[i].revents & POLLERR) != 0 ||
                            (pollfd[i].revents & POLLHUP) != 0) {
                                if ((pollfd[i].events & POLLIN) != 0 &&
                                    (pollfd[i].revents & POLLIN) == 0)
                                        usocklnd_conn_kill(conn);
                                else
                                        usocklnd_exception_handler(conn);
                        }

                        /* clear POLLIN/POLLOUT once the handler reports
                         * nothing more to do (<= 0), so later passes skip */
                        if ((pollfd[i].revents & POLLIN) != 0 &&
                            usocklnd_read_handler(conn) <= 0)
                                pollfd[i].revents &= ~POLLIN;

                        if ((pollfd[i].revents & POLLOUT) != 0 &&
                            usocklnd_write_handler(conn) <= 0)
                                pollfd[i].revents &= ~POLLOUT;

                        if ((pollfd[i].revents & (POLLIN | POLLOUT)) == 0)
                                skip[prev] = next; /* skip this entry next pass */
                        else
                                prev = i;

                        i = next;
                } while (i < nfds);
        }
}
477
478 int
479 usocklnd_calculate_chunk_size(int num)
480 {
481         const int n     = 4;
482         const int p     = usock_tuns.ut_poll_timeout;
483         int       chunk = num;
484         
485         /* chunk should be big enough to detect a timeout on any
486          * connection within (n+1)/n times the timeout interval
487          * if we checks every 'p' seconds 'chunk' conns */
488                  
489         if (usock_tuns.ut_timeout > n * p)
490                 chunk = (chunk * n * p) / usock_tuns.ut_timeout;
491         
492         if (chunk == 0)
493                 chunk = 1;
494
495         return chunk;
496 }
497
498 void
499 usocklnd_wakeup_pollthread(int i)
500 {
501         usock_pollthread_t *pt = &usock_data.ud_pollthreads[i];
502         int                 notification = 0;
503         int                 rc;
504
505         rc = syscall(SYS_write, pt->upt_notifier_fd, &notification,
506                      sizeof(notification));
507
508         if (rc != sizeof(notification))
509                 CERROR("Very unlikely event happend: "
510                        "cannot write to notifier fd (rc=%d; errno=%d)\n",
511                        rc, errno);
512 }