1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lnet/ulnds/socklnd/poll.c
38 * Author: Maxim Patlasov <maxim@clusterfs.com>
43 #include <sys/syscall.h>
/* Drain pt_data->upt_stale_list: for each connection orphaned by a
 * POLL_DEL_REQUEST, detach it from its peer and drop the reference
 * that the idx2conn[] slot (or pending poll request) was holding.
 * NOTE(review): SOURCE view is truncated (interior lines elided);
 * visible code kept byte-identical. */
46 usocklnd_process_stale_list(usock_pollthread_t *pt_data)
48         while (!list_empty(&pt_data->upt_stale_list)) {
                /* pop the first stale conn off the list */
50                 conn = list_entry(pt_data->upt_stale_list.next,
51                                   usock_conn_t, uc_stale_list);
53                 list_del(&conn->uc_stale_list);
                /* unlink conn from its peer, then release our ref */
55                 usocklnd_tear_peer_conn(conn);
56                 usocklnd_conn_decref(conn); /* -1 for idx2conn[idx] or pr */
/* Poll-thread main loop.  Each iteration: (1) applies queued poll
 * requests under upt_pollrequests_lock, (2) reaps stale connections,
 * (3) calls poll(2) on the thread's fd set, (4) runs I/O handlers,
 * and (5) once per poll-timeout interval scans a "chunk" of
 * connections for peer timeouts (so every conn is checked within
 * roughly (n+1)/n timeout periods — see calculate_chunk_size).
 * On shutdown it drains the request queue, tears down all remaining
 * connections and completes upt_completion to unblock
 * usocklnd_shutdown().
 * NOTE(review): SOURCE view is truncated (interior lines elided);
 * visible code kept byte-identical. */
61 usocklnd_poll_thread(void *arg)
64         usock_pollthread_t *pt_data = (usock_pollthread_t *)arg;
65         cfs_time_t         current_time;
66         cfs_time_t         planned_time;
75         /* mask signals to avoid SIGPIPE, etc */
78         pthread_sigmask (SIG_SETMASK, &sigs, 0);
80         LASSERT(pt_data != NULL);
        /* next deadline for a timeout-scan pass */
82         planned_time = cfs_time_shift(usock_tuns.ut_poll_timeout);
83         chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
84         saved_nfds = pt_data->upt_nfds;
88         while (usock_data.ud_shutdown == 0) {
91                 /* Process all enqueued poll requests */
92                 pthread_mutex_lock(&pt_data->upt_pollrequests_lock);
93                 while (!list_empty(&pt_data->upt_pollrequests)) {
94                         usock_pollrequest_t *pr;
95                         pr = list_entry(pt_data->upt_pollrequests.next,
96                                         usock_pollrequest_t, upr_list);
98                         list_del(&pr->upr_list);
99                         rc = usocklnd_process_pollrequest(pr, pt_data);
103                 pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);
108                 /* Delete conns orphaned due to POLL_DEL_REQUESTs */
109                 usocklnd_process_stale_list(pt_data);
111                 /* Actual polling for events */
112                 rc = poll(pt_data->upt_pollfd,
114                           usock_tuns.ut_poll_timeout * 1000);
117                         CERROR("Cannot poll(2): errno=%d\n", errno);
122                 usocklnd_execute_handlers(pt_data);
124                 current_time = cfs_time_current();
                /* skip timeout scan: only the notifier fd is present
                 * (idx 0), or the planned deadline hasn't arrived */
126                 if (pt_data->upt_nfds < 2 ||
127                     cfs_time_before(current_time, planned_time))
130                 /* catch up growing pollfd[] */
131                 if (pt_data->upt_nfds > saved_nfds) {
132                         extra = pt_data->upt_nfds - saved_nfds;
133                         saved_nfds = pt_data->upt_nfds;
                /* scale the scan size by how many whole timeout
                 * periods we are behind schedule */
138                 times = cfs_duration_sec(cfs_time_sub(current_time, planned_time)) + 1;
139                 idx_finish = MIN(idx_start + chunk*times + extra, pt_data->upt_nfds);
                /* check this chunk of conns for peer timeouts */
141                 for (idx = idx_start; idx < idx_finish; idx++) {
142                         usock_conn_t *conn = pt_data->upt_idx2conn[idx];
143                         pthread_mutex_lock(&conn->uc_lock);
144                         if (usocklnd_conn_timed_out(conn, current_time) &&
145                             conn->uc_state != UC_DEAD) {
146                                 conn->uc_errored = 1;
147                                 usocklnd_conn_kill_locked(conn);
149                         pthread_mutex_unlock(&conn->uc_lock);
                /* full table scanned: recompute chunk, restart cycle */
152                 if (idx_finish == pt_data->upt_nfds) {
153                         chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
154                         saved_nfds = pt_data->upt_nfds;
158                         idx_start = idx_finish;
161                 planned_time = cfs_time_add(current_time,
162                                             cfs_time_seconds(usock_tuns.ut_poll_timeout));
        /* --- shutdown path below --- */
165         /* All conns should be deleted by POLL_DEL_REQUESTs while shutdown */
166         LASSERT (rc != 0 || pt_data->upt_nfds == 1);
169         pthread_mutex_lock(&pt_data->upt_pollrequests_lock);
171         /* Block new poll requests to be enqueued */
172         pt_data->upt_errno = rc;
        /* drain any requests queued after we left the main loop */
174         while (!list_empty(&pt_data->upt_pollrequests)) {
175                 usock_pollrequest_t *pr;
176                 pr = list_entry(pt_data->upt_pollrequests.next,
177                                 usock_pollrequest_t, upr_list);
179                 list_del(&pr->upr_list);
                /* ADD requests never made it into pollfd[]: close the
                 * socket and park the conn on the stale list */
181                 if (pr->upr_type == POLL_ADD_REQUEST) {
182                         libcfs_sock_release(pr->upr_conn->uc_sock);
183                         list_add_tail(&pr->upr_conn->uc_stale_list,
184                                       &pt_data->upt_stale_list);
186                         usocklnd_conn_decref(pr->upr_conn);
189                 LIBCFS_FREE (pr, sizeof(*pr));
191         pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);
193         usocklnd_process_stale_list(pt_data);
        /* release every conn still registered (idx 0 is the notifier) */
195         for (idx = 1; idx < pt_data->upt_nfds; idx++) {
196                 usock_conn_t *conn = pt_data->upt_idx2conn[idx];
197                 LASSERT(conn != NULL);
198                 libcfs_sock_release(conn->uc_sock);
199                 usocklnd_tear_peer_conn(conn);
200                 usocklnd_conn_decref(conn);
204         /* unblock usocklnd_shutdown() */
205         cfs_complete(&pt_data->upt_completion);
/* Allocate and enqueue a poll request of 'type' (with pollfd event
 * bits in 'value') for conn's poll thread.  Takes +1 ref on conn,
 * which the poll thread releases (or transfers to idx2conn[]) when
 * it processes the request.  If the poll thread has already errored
 * (upt_errno set), the request is backed out and its error returned.
 * NOTE(review): SOURCE view is truncated (interior lines elided);
 * visible code kept byte-identical. */
210 /* Returns 0 on success, <0 else */
212 usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value)
214         int                  pt_idx = conn->uc_pt_idx;
215         usock_pollthread_t  *pt     = &usock_data.ud_pollthreads[pt_idx];
216         usock_pollrequest_t *pr;
218         LIBCFS_ALLOC(pr, sizeof(*pr));
220                 CERROR ("Cannot allocate poll request\n");
226         pr->upr_value = value;
228         usocklnd_conn_addref(conn); /* +1 for poll request */
230         pthread_mutex_lock(&pt->upt_pollrequests_lock);
232         if (pt->upt_errno) { /* very rare case: errored poll thread */
233                 int rc = pt->upt_errno;
234                 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
                /* back out: drop our ref and free the request */
235                 usocklnd_conn_decref(conn);
236                 LIBCFS_FREE(pr, sizeof(*pr));
240         list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
241         pthread_mutex_unlock(&pt->upt_pollrequests_lock);
/* Enqueue a POLL_DEL_REQUEST for conn using its preallocated request
 * (conn->uc_preq), so killing a connection can never fail on ENOMEM.
 * Takes +1 ref on conn for the request; if the poll thread already
 * errored, the ref is dropped and the conn is left for the poll
 * thread's shutdown path to kill.
 * NOTE(review): SOURCE view is truncated (interior lines elided);
 * visible code kept byte-identical. */
246 usocklnd_add_killrequest(usock_conn_t *conn)
248         int                  pt_idx = conn->uc_pt_idx;
249         usock_pollthread_t  *pt     = &usock_data.ud_pollthreads[pt_idx];
250         usock_pollrequest_t *pr     = conn->uc_preq;
252         /* Use preallocated poll request because there is no good
253          * workaround for ENOMEM error while killing connection */
256         pr->upr_type = POLL_DEL_REQUEST;
259         usocklnd_conn_addref(conn); /* +1 for poll request */
261         pthread_mutex_lock(&pt->upt_pollrequests_lock);
263         if (pt->upt_errno) { /* very rare case: errored poll thread */
264                 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
265                 usocklnd_conn_decref(conn);
266                 return; /* conn will be killed in poll thread anyway */
269         list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
270         pthread_mutex_unlock(&pt->upt_pollrequests_lock);
        /* ownership of the preallocated request moved to the queue */
272         conn->uc_preq = NULL;
/* Apply one poll request to the thread's poll tables:
 *   POLL_ADD_REQUEST    - grow pollfd[]/idx2conn[]/skip[]/fd2idx[]
 *                         (doubling) as needed and register the conn;
 *                         idx2conn[idx] inherits the request's conn ref.
 *   POLL_DEL_REQUEST    - unregister; compact pollfd[] by moving the
 *                         last entry into the hole; park the conn on
 *                         the stale list for later teardown.
 *   POLL_RX/TX/SET_...  - adjust pollfd[idx].events bits.
 * The request struct itself is always freed here.
 * NOTE(review): SOURCE view is truncated (interior lines — including
 * switch breaks, returns and the enomem cleanup tail — are elided);
 * visible code kept byte-identical.  The "happend" typo in the CWARN
 * text is a runtime string, left untouched in this doc-only pass. */
276 /* Process poll request. Update poll data.
277  * Returns 0 on success, <0 else */
279 usocklnd_process_pollrequest(usock_pollrequest_t *pr,
280                              usock_pollthread_t *pt_data)
282         int            type  = pr->upr_type;
283         short          value = pr->upr_value;
284         usock_conn_t  *conn  = pr->upr_conn;
286         struct pollfd *pollfd   = pt_data->upt_pollfd;
287         int           *fd2idx   = pt_data->upt_fd2idx;
288         usock_conn_t **idx2conn = pt_data->upt_idx2conn;
289         int           *skip     = pt_data->upt_skip;
291         LASSERT(conn != NULL);
292         LASSERT(conn->uc_sock != NULL);
        /* for non-ADD requests the fd must already be in fd2idx[] range */
293         LASSERT(type == POLL_ADD_REQUEST ||
294                 LIBCFS_SOCK2FD(conn->uc_sock) < pt_data->upt_nfd2idx);
296         if (type != POLL_ADD_REQUEST) {
297                 idx = fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)];
298                 if (idx > 0 && idx < pt_data->upt_nfds) { /* hot path */
299                         LASSERT(pollfd[idx].fd ==
300                                 LIBCFS_SOCK2FD(conn->uc_sock));
301                 } else { /* unlikely */
302                         CWARN("Very unlikely event happend: trying to"
303                               " handle poll request of type %d but idx=%d"
304                               " is out of range [1 ... %d]. Is shutdown"
305                               " in progress (%d)?\n",
306                               type, idx, pt_data->upt_nfds - 1,
307                               usock_data.ud_shutdown);
                        /* stale/invalid request: discard it and the ref */
309                         LIBCFS_FREE (pr, sizeof(*pr));
310                         usocklnd_conn_decref(conn);
        /* request struct is consumed regardless of type */
315         LIBCFS_FREE (pr, sizeof(*pr));
318         case POLL_ADD_REQUEST:
319                 if (pt_data->upt_nfds >= pt_data->upt_npollfd) {
320                         /* resize pollfd[], idx2conn[] and skip[] */
321                         struct pollfd *new_pollfd;
322                         int            new_npollfd = pt_data->upt_npollfd * 2;
323                         usock_conn_t **new_idx2conn;
326                         new_pollfd = LIBCFS_REALLOC(pollfd, new_npollfd *
327                                                      sizeof(struct pollfd));
328                         if (new_pollfd == NULL)
329                                 goto process_pollrequest_enomem;
330                         pt_data->upt_pollfd = pollfd = new_pollfd;
332                         new_idx2conn = LIBCFS_REALLOC(idx2conn, new_npollfd *
333                                                       sizeof(usock_conn_t *));
334                         if (new_idx2conn == NULL)
335                                 goto process_pollrequest_enomem;
336                         pt_data->upt_idx2conn = idx2conn = new_idx2conn;
338                         new_skip = LIBCFS_REALLOC(skip, new_npollfd *
340                         if (new_skip == NULL)
341                                 goto process_pollrequest_enomem;
342                         pt_data->upt_skip = new_skip;
344                         pt_data->upt_npollfd = new_npollfd;
                /* grow fd2idx[] until it can index this fd */
347                 if (LIBCFS_SOCK2FD(conn->uc_sock) >= pt_data->upt_nfd2idx) {
348                         /* resize fd2idx[] */
350                         int new_nfd2idx = pt_data->upt_nfd2idx * 2;
352                         while (new_nfd2idx <= LIBCFS_SOCK2FD(conn->uc_sock))
355                         new_fd2idx = LIBCFS_REALLOC(fd2idx, new_nfd2idx *
357                         if (new_fd2idx == NULL)
358                                 goto process_pollrequest_enomem;
360                         pt_data->upt_fd2idx = fd2idx = new_fd2idx;
                        /* zero the newly-added tail (0 == "unused") */
361                         memset(fd2idx + pt_data->upt_nfd2idx, 0,
362                                (new_nfd2idx - pt_data->upt_nfd2idx)
364                         pt_data->upt_nfd2idx = new_nfd2idx;
                /* fd must not be registered twice */
367                 LASSERT(fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] == 0);
369                 idx = pt_data->upt_nfds++;
370                 idx2conn[idx] = conn;
371                 fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] = idx;
373                 pollfd[idx].fd = LIBCFS_SOCK2FD(conn->uc_sock);
374                 pollfd[idx].events = value;
375                 pollfd[idx].revents = 0;
377         case POLL_DEL_REQUEST:
378                 fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] = 0; /* invalidate this
                /* compact: move last entry into the freed slot */
381                 if (idx != pt_data->upt_nfds) {
382                         /* shift last entry into released position */
383                         memcpy(&pollfd[idx], &pollfd[pt_data->upt_nfds],
384                                sizeof(struct pollfd));
385                         idx2conn[idx] = idx2conn[pt_data->upt_nfds];
386                         fd2idx[pollfd[idx].fd] = idx;
389                 libcfs_sock_release(conn->uc_sock);
                /* defer teardown to usocklnd_process_stale_list() */
390                 list_add_tail(&conn->uc_stale_list, &pt_data->upt_stale_list);
392         case POLL_RX_SET_REQUEST:
393                 pollfd[idx].events = (pollfd[idx].events & ~POLLIN) | value;
395         case POLL_TX_SET_REQUEST:
396                 pollfd[idx].events = (pollfd[idx].events & ~POLLOUT) | value;
398         case POLL_SET_REQUEST:
399                 pollfd[idx].events = value;
402                 LBUG(); /* unknown type */
405         /* In the case of POLL_ADD_REQUEST, idx2conn[idx] takes the
406          * reference that poll request possesses */
407         if (type != POLL_ADD_REQUEST)
408                 usocklnd_conn_decref(conn);
412   process_pollrequest_enomem:
413         usocklnd_conn_decref(conn);
/* Dispatch read/write/exception handlers for every ready pollfd,
 * making up to ut_fair_limit passes.  skip[] is built as a linked
 * "skip chain" on the first pass so later passes jump over entries
 * that have already gone idle, giving each busy conn fair service.
 * pollfd[0] is the poll thread's notifier fd, drained separately.
 * NOTE(review): SOURCE view is truncated (interior lines — loop
 * bookkeeping for i/prev/next — are elided); visible code kept
 * byte-identical. */
417 /* Loop on poll data executing handlers repeatedly until
418  * fair_limit is reached or all entries are exhausted */
420 usocklnd_execute_handlers(usock_pollthread_t *pt_data)
422         struct pollfd *pollfd   = pt_data->upt_pollfd;
423         int            nfds     = pt_data->upt_nfds;
424         usock_conn_t **idx2conn = pt_data->upt_idx2conn;
425         int           *skip     = pt_data->upt_skip;
        /* drain all pending wakeup notifications first */
428         if (pollfd[0].revents & POLLIN)
429                 while (usocklnd_notifier_handler(pollfd[0].fd) > 0)
432         skip[0] = 1; /* always skip notifier fd */
434         for (j = 0; j < usock_tuns.ut_fair_limit; j++) {
438                 if (i >= nfds) /* nothing ready */
442                         usock_conn_t *conn = idx2conn[i];
445                         if (j == 0) /* first pass... */
446                                 next = skip[i] = i+1; /* set skip chain */
447                         else /* later passes... */
448                                 next = skip[i]; /* skip unready pollfds */
450                         /* kill connection if it's closed by peer and
451                          * there is no data pending for reading */
452                         if ((pollfd[i].revents & POLLERR) != 0 ||
453                             (pollfd[i].revents & POLLHUP) != 0) {
454                                 if ((pollfd[i].events & POLLIN) != 0 &&
455                                     (pollfd[i].revents & POLLIN) == 0)
456                                         usocklnd_conn_kill(conn);
458                                         usocklnd_exception_handler(conn);
                        /* handler returning <=0 means "no more work":
                         * clear the bit so the entry can be skipped */
461                         if ((pollfd[i].revents & POLLIN) != 0 &&
462                             usocklnd_read_handler(conn) <= 0)
463                                 pollfd[i].revents &= ~POLLIN;
465                         if ((pollfd[i].revents & POLLOUT) != 0 &&
466                             usocklnd_write_handler(conn) <= 0)
467                                 pollfd[i].revents &= ~POLLOUT;
469                         if ((pollfd[i].revents & (POLLIN | POLLOUT)) == 0)
470                                 skip[prev] = next; /* skip this entry next pass */
/* Compute how many connections ('chunk') the poll thread should scan
 * for timeouts per poll-timeout interval 'p', given 'num' registered
 * fds, so that every conn is checked within about (n+1)/n timeout
 * periods.  NOTE(review): SOURCE view is truncated — the definitions
 * of 'n' and the initial 'chunk' value are elided; visible code kept
 * byte-identical. */
480 usocklnd_calculate_chunk_size(int num)
483         const int p = usock_tuns.ut_poll_timeout;
486         /* chunk should be big enough to detect a timeout on any
487          * connection within (n+1)/n times the timeout interval
488          * if we checks every 'p' seconds 'chunk' conns */
        /* only shrink the chunk when the timeout spans several polls */
490         if (usock_tuns.ut_timeout > n * p)
491                 chunk = (chunk * n * p) / usock_tuns.ut_timeout;
500 usocklnd_wakeup_pollthread(int i)
502 usock_pollthread_t *pt = &usock_data.ud_pollthreads[i];
503 int notification = 0;
506 rc = syscall(SYS_write, LIBCFS_SOCK2FD(pt->upt_notifier[0]),
507 ¬ification, sizeof(notification));
509 if (rc != sizeof(notification))
510 CERROR("Very unlikely event happend: "
511 "cannot write to notifier fd (rc=%d; errno=%d)\n",