Whamcloud - gitweb
use one place for syscall.h
[fs/lustre-release.git] / lnet / ulnds / socklnd / poll.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Maxim Patlasov <maxim@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  */
11
12 #include "usocklnd.h"
13 #include <unistd.h>
14 #include <sys/syscall.h>
15
16 void
17 usocklnd_process_stale_list(usock_pollthread_t *pt_data)
18 {
19         while (!list_empty(&pt_data->upt_stale_list)) {
20                 usock_conn_t *conn;                        
21                 conn = list_entry(pt_data->upt_stale_list.next,
22                                   usock_conn_t, uc_stale_list);
23                 
24                 list_del(&conn->uc_stale_list);
25                 
26                 usocklnd_tear_peer_conn(conn);
27                 usocklnd_conn_decref(conn); /* -1 for idx2conn[idx] or pr */
28         }
29 }
30
/* Main loop of a usocklnd poll thread.
 *
 * Each iteration: (1) drains the queue of poll requests (add/del/set
 * event masks), (2) poll(2)s the thread's pollfd[] and dispatches I/O
 * handlers, and (3) once per ut_poll_timeout interval scans a "chunk"
 * of connections for timeouts, killing the ones that timed out.
 *
 * Runs until usock_data.ud_shutdown is set or a poll request fails;
 * on exit it tears down remaining state and signals upt_completion so
 * usocklnd_shutdown() can proceed.  Always returns 0 (errors are
 * published via pt_data->upt_errno). */
int
usocklnd_poll_thread(void *arg)
{
        int                 rc = 0;
        usock_pollthread_t *pt_data = (usock_pollthread_t *)arg;
        cfs_time_t          current_time;
        cfs_time_t          planned_time; /* next deadline for timeout scan */
        int                 idx;
        int                 idx_start;    /* next pollfd slot to scan */
        int                 idx_finish;
        int                 chunk;        /* slots scanned per interval */
        int                 saved_nfds;   /* nfds when chunk was computed */
        int                 extra;
        int                 times;

        /* mask signals to avoid SIGPIPE, etc */
        sigset_t  sigs;
        sigfillset (&sigs);
        pthread_sigmask (SIG_SETMASK, &sigs, 0);

        LASSERT(pt_data != NULL);

        planned_time = cfs_time_shift(usock_tuns.ut_poll_timeout);
        chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
        saved_nfds = pt_data->upt_nfds;
        idx_start = 1; /* slot 0 is the notifier fd, never timeout-scanned */

        /* Main loop */
        while (usock_data.ud_shutdown == 0) {
                rc = 0;

                /* Process all enqueued poll requests */
                pthread_mutex_lock(&pt_data->upt_pollrequests_lock);
                while (!list_empty(&pt_data->upt_pollrequests)) {
                        usock_pollrequest_t *pr;
                        pr = list_entry(pt_data->upt_pollrequests.next,
                                        usock_pollrequest_t, upr_list);

                        list_del(&pr->upr_list);
                        rc = usocklnd_process_pollrequest(pr, pt_data);
                        if (rc)
                                break;
                }
                pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);

                if (rc)
                        break;

                /* Delete conns orphaned due to POLL_DEL_REQUESTs */
                usocklnd_process_stale_list(pt_data);

                /* Actual polling for events */
                rc = poll(pt_data->upt_pollfd,
                          pt_data->upt_nfds,
                          usock_tuns.ut_poll_timeout * 1000);

                if (rc < 0) {
                        CERROR("Cannot poll(2): errno=%d\n", errno);
                        break;
                }

                if (rc > 0)
                        usocklnd_execute_handlers(pt_data);

                current_time = cfs_time_current();

                /* Timeout scan runs only once the deadline has passed and
                 * there is at least one real conn besides the notifier */
                if (pt_data->upt_nfds < 2 ||
                    cfs_time_before(current_time, planned_time))
                        continue;

                /* catch up growing pollfd[] */
                if (pt_data->upt_nfds > saved_nfds) {
                        extra = pt_data->upt_nfds - saved_nfds;
                        saved_nfds = pt_data->upt_nfds;
                } else {
                        extra = 0;
                }

                /* scan proportionally more slots if we overslept past
                 * the planned deadline by one or more intervals */
                times = cfs_duration_sec(cfs_time_sub(current_time, planned_time)) + 1;
                idx_finish = MIN(idx_start + chunk*times + extra, pt_data->upt_nfds);

                for (idx = idx_start; idx < idx_finish; idx++) {
                        usock_conn_t *conn = pt_data->upt_idx2conn[idx];
                        pthread_mutex_lock(&conn->uc_lock);
                        if (usocklnd_conn_timed_out(conn, current_time) &&
                            conn->uc_state != UC_DEAD) {
                                conn->uc_errored = 1;
                                usocklnd_conn_kill_locked(conn);
                        }
                        pthread_mutex_unlock(&conn->uc_lock);
                }

                if (idx_finish == pt_data->upt_nfds) {
                        /* full sweep finished: restart from slot 1 with a
                         * chunk size matching the current nfds */
                        chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
                        saved_nfds = pt_data->upt_nfds;
                        idx_start = 1;
                }
                else {
                        idx_start = idx_finish;
                }

                planned_time = cfs_time_add(current_time,
                                            cfs_time_seconds(usock_tuns.ut_poll_timeout));
        }

        /* All conns should be deleted by POLL_DEL_REQUESTs while shutdown */
        LASSERT (rc != 0 || pt_data->upt_nfds == 1);

        if (rc) {
                pthread_mutex_lock(&pt_data->upt_pollrequests_lock);

                /* Block new poll requests to be enqueued */
                pt_data->upt_errno = rc;

                while (!list_empty(&pt_data->upt_pollrequests)) {
                        usock_pollrequest_t *pr;
                        pr = list_entry(pt_data->upt_pollrequests.next,
                                        usock_pollrequest_t, upr_list);

                        list_del(&pr->upr_list);

                        if (pr->upr_type == POLL_ADD_REQUEST) {
                                /* conn never made it into pollfd[]; close its
                                 * fd and defer teardown to the stale list */
                                close(pr->upr_conn->uc_fd);
                                list_add_tail(&pr->upr_conn->uc_stale_list,
                                              &pt_data->upt_stale_list);
                        } else {
                                usocklnd_conn_decref(pr->upr_conn);
                        }

                        LIBCFS_FREE (pr, sizeof(*pr));
                }
                pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);

                usocklnd_process_stale_list(pt_data);

                /* tear down all remaining live conns (slot 0 = notifier) */
                for (idx = 1; idx < pt_data->upt_nfds; idx++) {
                        usock_conn_t *conn = pt_data->upt_idx2conn[idx];
                        LASSERT(conn != NULL);
                        close(conn->uc_fd);
                        usocklnd_tear_peer_conn(conn);
                        usocklnd_conn_decref(conn);
                }
        }

        /* unblock usocklnd_shutdown() */
        cfs_complete(&pt_data->upt_completion);

        return 0;
}
180
181 /* Returns 0 on success, <0 else */
182 int
183 usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value)
184 {
185         int                  pt_idx = conn->uc_pt_idx;
186         usock_pollthread_t  *pt     = &usock_data.ud_pollthreads[pt_idx];
187         usock_pollrequest_t *pr;
188
189         LIBCFS_ALLOC(pr, sizeof(*pr));
190         if (pr == NULL) {
191                 CERROR ("Cannot allocate poll request\n");
192                 return -ENOMEM;
193         }
194
195         pr->upr_conn = conn;
196         pr->upr_type = type;
197         pr->upr_value = value;
198
199         usocklnd_conn_addref(conn); /* +1 for poll request */
200         
201         pthread_mutex_lock(&pt->upt_pollrequests_lock);
202
203         if (pt->upt_errno) { /* very rare case: errored poll thread */
204                 int rc = pt->upt_errno;
205                 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
206                 usocklnd_conn_decref(conn);
207                 LIBCFS_FREE(pr, sizeof(*pr));
208                 return rc;
209         }
210         
211         list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
212         pthread_mutex_unlock(&pt->upt_pollrequests_lock);
213         return 0;
214 }
215
216 void
217 usocklnd_add_killrequest(usock_conn_t *conn)
218 {
219         int                  pt_idx = conn->uc_pt_idx;
220         usock_pollthread_t  *pt     = &usock_data.ud_pollthreads[pt_idx];
221         usock_pollrequest_t *pr     = conn->uc_preq;
222         
223         /* Use preallocated poll request because there is no good
224          * workaround for ENOMEM error while killing connection */
225         if (pr) {
226                 pr->upr_conn  = conn;
227                 pr->upr_type  = POLL_DEL_REQUEST;
228                 pr->upr_value = 0;
229
230                 usocklnd_conn_addref(conn); /* +1 for poll request */
231                 
232                 pthread_mutex_lock(&pt->upt_pollrequests_lock);
233                 
234                 if (pt->upt_errno) { /* very rare case: errored poll thread */
235                         pthread_mutex_unlock(&pt->upt_pollrequests_lock);
236                         usocklnd_conn_decref(conn);
237                         return; /* conn will be killed in poll thread anyway */
238                 }
239
240                 list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
241                 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
242
243                 conn->uc_preq = NULL;
244         }
245 }
246
/* Process poll request. Update poll data.
 * Returns 0 on success, <0 else.
 *
 * Always frees 'pr'.  The conn reference held by the request is dropped
 * here for every type except POLL_ADD_REQUEST, where it is handed over
 * to idx2conn[idx].  Maintains the invariant that for every live slot:
 * pollfd[idx].fd == fd, fd2idx[fd] == idx, idx2conn[idx] == conn. */
int
usocklnd_process_pollrequest(usock_pollrequest_t *pr,
                             usock_pollthread_t *pt_data)
{
        int            type  = pr->upr_type;
        short          value = pr->upr_value;
        usock_conn_t  *conn  = pr->upr_conn;
        int            idx = 0;
        struct pollfd *pollfd   = pt_data->upt_pollfd;
        int           *fd2idx   = pt_data->upt_fd2idx;
        usock_conn_t **idx2conn = pt_data->upt_idx2conn;
        int           *skip     = pt_data->upt_skip;

        LASSERT(conn != NULL);
        LASSERT(conn->uc_fd >=0);
        LASSERT(type == POLL_ADD_REQUEST ||
                conn->uc_fd < pt_data->upt_nfd2idx);

        if (type != POLL_ADD_REQUEST) {
                idx = fd2idx[conn->uc_fd];
                if (idx > 0 && idx < pt_data->upt_nfds) { /* hot path */
                        LASSERT(pollfd[idx].fd == conn->uc_fd);
                } else { /* unlikely */
                        /* stale request: the conn's slot is already gone
                         * (e.g. removed during shutdown); drop it quietly */
                        CWARN("Very unlikely event happend: trying to"
                              " handle poll request of type %d but idx=%d"
                              " is out of range [1 ... %d]. Is shutdown"
                              " in progress (%d)?\n",
                              type, idx, pt_data->upt_nfds - 1,
                              usock_data.ud_shutdown);

                        LIBCFS_FREE (pr, sizeof(*pr));
                        usocklnd_conn_decref(conn);
                        return 0;
                }
        }

        LIBCFS_FREE (pr, sizeof(*pr));

        switch (type) {
        case POLL_ADD_REQUEST:
                if (pt_data->upt_nfds >= pt_data->upt_npollfd) {
                        /* resize pollfd[], idx2conn[] and skip[] */
                        struct pollfd *new_pollfd;
                        int            new_npollfd = pt_data->upt_npollfd * 2;
                        usock_conn_t **new_idx2conn;
                        int           *new_skip;

                        new_pollfd = LIBCFS_REALLOC(pollfd, new_npollfd *
                                                     sizeof(struct pollfd));
                        if (new_pollfd == NULL)
                                goto process_pollrequest_enomem;
                        pt_data->upt_pollfd = pollfd = new_pollfd;

                        new_idx2conn = LIBCFS_REALLOC(idx2conn, new_npollfd *
                                                      sizeof(usock_conn_t *));
                        if (new_idx2conn == NULL)
                                goto process_pollrequest_enomem;
                        pt_data->upt_idx2conn = idx2conn = new_idx2conn;

                        new_skip = LIBCFS_REALLOC(skip, new_npollfd *
                                                  sizeof(int));
                        if (new_skip == NULL)
                                goto process_pollrequest_enomem;
                        pt_data->upt_skip = new_skip;

                        /* upt_npollfd updated only after all three
                         * reallocs succeeded */
                        pt_data->upt_npollfd = new_npollfd;
                }

                if (conn->uc_fd >= pt_data->upt_nfd2idx) {
                        /* resize fd2idx[]: keep doubling until fd fits */
                        int *new_fd2idx;
                        int  new_nfd2idx = pt_data->upt_nfd2idx * 2;

                        while (new_nfd2idx <= conn->uc_fd)
                                new_nfd2idx *= 2;

                        new_fd2idx = LIBCFS_REALLOC(fd2idx, new_nfd2idx *
                                                    sizeof(int));
                        if (new_fd2idx == NULL)
                                goto process_pollrequest_enomem;

                        pt_data->upt_fd2idx = fd2idx = new_fd2idx;
                        /* zero the new tail: 0 means "fd has no slot" */
                        memset(fd2idx + pt_data->upt_nfd2idx, 0,
                               (new_nfd2idx - pt_data->upt_nfd2idx)
                               * sizeof(int));
                        pt_data->upt_nfd2idx = new_nfd2idx;
                }

                LASSERT(fd2idx[conn->uc_fd] == 0);

                /* append conn in the first free slot */
                idx = pt_data->upt_nfds++;
                idx2conn[idx] = conn;
                fd2idx[conn->uc_fd] = idx;

                pollfd[idx].fd = conn->uc_fd;
                pollfd[idx].events = value;
                pollfd[idx].revents = 0;
                break;
        case POLL_DEL_REQUEST:
                fd2idx[conn->uc_fd] = 0; /* invalidate this entry */

                --pt_data->upt_nfds;
                if (idx != pt_data->upt_nfds) {
                        /* shift last entry into released position */
                        memcpy(&pollfd[idx], &pollfd[pt_data->upt_nfds],
                               sizeof(struct pollfd));
                        idx2conn[idx] = idx2conn[pt_data->upt_nfds];
                        fd2idx[pollfd[idx].fd] = idx;
                }

                close(conn->uc_fd);
                /* defer teardown to usocklnd_process_stale_list() */
                list_add_tail(&conn->uc_stale_list, &pt_data->upt_stale_list);
                break;
        case POLL_RX_SET_REQUEST:
                /* replace only the POLLIN bit of the event mask */
                pollfd[idx].events = (pollfd[idx].events & ~POLLIN) | value;
                break;
        case POLL_TX_SET_REQUEST:
                /* replace only the POLLOUT bit of the event mask */
                pollfd[idx].events = (pollfd[idx].events & ~POLLOUT) | value;
                break;
        case POLL_SET_REQUEST:
                pollfd[idx].events = value;
                break;
        default:
                LBUG(); /* unknown type */
        }

        /* In the case of POLL_ADD_REQUEST, idx2conn[idx] takes the
         * reference that poll request possesses */
        if (type != POLL_ADD_REQUEST)
                usocklnd_conn_decref(conn);

        return 0;

  process_pollrequest_enomem:
        usocklnd_conn_decref(conn);
        return -ENOMEM;
}
386
/* Loop on poll data executing handlers repeatedly until
 *  fair_limit is reached or all entries are exhausted.
 *
 * skip[] forms a chain of slot indices still considered ready:
 * skip[i] is the next ready slot after i.  The chain is built on the
 * first pass and shortened on later passes as slots run out of
 * pending events, so up to ut_fair_limit passes visit each busy conn
 * fairly instead of letting one conn monopolize the thread. */
void
usocklnd_execute_handlers(usock_pollthread_t *pt_data)
{
        struct pollfd *pollfd   = pt_data->upt_pollfd;
        int            nfds     = pt_data->upt_nfds;
        usock_conn_t **idx2conn = pt_data->upt_idx2conn;
        int           *skip     = pt_data->upt_skip;
        int            j;

        /* slot 0 is the notifier fd: drain it completely first */
        if (pollfd[0].revents & POLLIN)
                while (usocklnd_notifier_handler(pollfd[0].fd) > 0)
                        ;

        skip[0] = 1; /* always skip notifier fd */

        for (j = 0; j < usock_tuns.ut_fair_limit; j++) {
                int prev = 0;
                int i = skip[0];

                if (i >= nfds) /* nothing ready */
                        break;

                do {
                        usock_conn_t *conn = idx2conn[i];
                        int next;

                        if (j == 0) /* first pass... */
                                next = skip[i] = i+1; /* set skip chain */
                        else /* later passes... */
                                next = skip[i]; /* skip unready pollfds */

                        /* kill connection if it's closed by peer and
                         * there is no data pending for reading */
                        if ((pollfd[i].revents & POLLERR) != 0 ||
                            (pollfd[i].revents & POLLHUP) != 0) {
                                if ((pollfd[i].events & POLLIN) != 0 &&
                                    (pollfd[i].revents & POLLIN) == 0)
                                        usocklnd_conn_kill(conn);
                                else
                                        usocklnd_exception_handler(conn);
                        }

                        /* a handler returning <=0 means no more progress:
                         * clear that event so the slot can be unchained */
                        if ((pollfd[i].revents & POLLIN) != 0 &&
                            usocklnd_read_handler(conn) <= 0)
                                pollfd[i].revents &= ~POLLIN;

                        if ((pollfd[i].revents & POLLOUT) != 0 &&
                            usocklnd_write_handler(conn) <= 0)
                                pollfd[i].revents &= ~POLLOUT;

                        if ((pollfd[i].revents & (POLLIN | POLLOUT)) == 0)
                                skip[prev] = next; /* skip this entry next pass */
                        else
                                prev = i;

                        i = next;
                } while (i < nfds);
        }
}
448
449 int
450 usocklnd_calculate_chunk_size(int num)
451 {
452         const int n     = 4;
453         const int p     = usock_tuns.ut_poll_timeout;
454         int       chunk = num;
455         
456         /* chunk should be big enough to detect a timeout on any
457          * connection within (n+1)/n times the timeout interval
458          * if we checks every 'p' seconds 'chunk' conns */
459                  
460         if (usock_tuns.ut_timeout > n * p)
461                 chunk = (chunk * n * p) / usock_tuns.ut_timeout;
462         
463         if (chunk == 0)
464                 chunk = 1;
465
466         return chunk;
467 }
468
469 void
470 usocklnd_wakeup_pollthread(int i)
471 {
472         usock_pollthread_t *pt = &usock_data.ud_pollthreads[i];
473         int                 notification = 0;
474         int                 rc;
475
476         rc = syscall(SYS_write, pt->upt_notifier_fd, &notification,
477                      sizeof(notification));
478
479         if (rc != sizeof(notification))
480                 CERROR("Very unlikely event happend: "
481                        "cannot write to notifier fd (rc=%d; errno=%d)\n",
482                        rc, errno);
483 }