1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lnet/ulnds/socklnd/poll.c
38 * Author: Maxim Patlasov <maxim@clusterfs.com>
43 #include <sys/syscall.h>
/*
 * Drain the poll thread's stale list: connections parked there by
 * POLL_DEL_REQUEST handling are detached from their peer and the
 * reference formerly held by idx2conn[idx] (or by the poll request)
 * is dropped.
 *
 * NOTE(review): this view is elided -- the return type, the local
 * 'usock_conn_t *conn' declaration and the closing braces are not
 * visible here.
 */
46 usocklnd_process_stale_list(usock_pollthread_t *pt_data)
/* pop stale conns one at a time until the list is empty */
48 while (!list_empty(&pt_data->upt_stale_list)) {
50 conn = list_entry(pt_data->upt_stale_list.next,
51 usock_conn_t, uc_stale_list);
53 list_del(&conn->uc_stale_list);
/* detach conn from its peer, then release the stale-list ref */
55 usocklnd_tear_peer_conn(conn);
56 usocklnd_conn_decref(conn); /* -1 for idx2conn[idx] or pr */
/*
 * Main loop of one poll thread.  Each iteration:
 *   1) applies all queued poll requests under upt_pollrequests_lock;
 *   2) reaps connections orphaned by POLL_DEL_REQUESTs;
 *   3) poll(2)s this thread's pollfd[] with ut_poll_timeout seconds;
 *   4) dispatches read/write/exception handlers for ready fds;
 *   5) scans a round-robin "chunk" of conns for timeouts, catching up
 *      if the scan has fallen behind planned_time or pollfd[] grew.
 * On shutdown it fails any outstanding requests via upt_errno, tears
 * down the remaining conns and signals upt_completion.
 *
 * NOTE(review): this view is elided -- declarations of rc, sigs,
 * chunk, saved_nfds, extra, times, idx, idx_start, idx_finish, plus
 * several braces, 'continue'/'break' lines and the final return are
 * not visible here.
 */
61 usocklnd_poll_thread(void *arg)
64 usock_pollthread_t *pt_data = (usock_pollthread_t *)arg;
65 cfs_time_t current_time;
66 cfs_time_t planned_time;
75 /* mask signals to avoid SIGPIPE, etc */
78 pthread_sigmask (SIG_SETMASK, &sigs, 0);
80 LASSERT(pt_data != NULL);
/* schedule first timeout scan one poll-timeout interval from now */
82 planned_time = cfs_time_shift(usock_tuns.ut_poll_timeout);
83 chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
84 saved_nfds = pt_data->upt_nfds;
88 while (usock_data.ud_shutdown == 0) {
91 /* Process all enqueued poll requests */
92 pthread_mutex_lock(&pt_data->upt_pollrequests_lock);
93 while (!list_empty(&pt_data->upt_pollrequests)) {
94 usock_pollrequest_t *pr;
95 pr = list_entry(pt_data->upt_pollrequests.next,
96 usock_pollrequest_t, upr_list);
98 list_del(&pr->upr_list);
99 rc = usocklnd_process_pollrequest(pr, pt_data);
103 pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);
108 /* Delete conns orphaned due to POLL_DEL_REQUESTs */
109 usocklnd_process_stale_list(pt_data);
111 /* Actual polling for events */
112 rc = poll(pt_data->upt_pollfd,
114 usock_tuns.ut_poll_timeout * 1000);
117 CERROR("Cannot poll(2): errno=%d\n", errno);
122 usocklnd_execute_handlers(pt_data);
124 current_time = cfs_time_current();
/* skip the timeout scan when only the notifier fd (idx 0) exists
 * or the planned scan time has not arrived yet */
126 if (pt_data->upt_nfds < 2 ||
127 cfs_time_before(current_time, planned_time))
130 /* catch up growing pollfd[] */
131 if (pt_data->upt_nfds > saved_nfds) {
132 extra = pt_data->upt_nfds - saved_nfds;
133 saved_nfds = pt_data->upt_nfds;
/* widen the scanned window proportionally to how late we are */
138 times = cfs_duration_sec(cfs_time_sub(current_time, planned_time)) + 1;
139 idx_finish = MIN(idx_start + chunk*times + extra, pt_data->upt_nfds);
141 for (idx = idx_start; idx < idx_finish; idx++) {
142 usock_conn_t *conn = pt_data->upt_idx2conn[idx];
143 pthread_mutex_lock(&conn->uc_lock);
144 if (usocklnd_conn_timed_out(conn, current_time) &&
145 conn->uc_state != UC_DEAD) {
146 conn->uc_errored = 1;
147 usocklnd_conn_kill_locked(conn);
149 pthread_mutex_unlock(&conn->uc_lock);
/* full sweep completed: recompute chunk for the new nfds and
 * restart the round-robin from the beginning */
152 if (idx_finish == pt_data->upt_nfds) {
153 chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
154 saved_nfds = pt_data->upt_nfds;
158 idx_start = idx_finish;
161 planned_time = cfs_time_add(current_time,
162 cfs_time_seconds(usock_tuns.ut_poll_timeout));
165 /* All conns should be deleted by POLL_DEL_REQUESTs while shutdown */
166 LASSERT (rc != 0 || pt_data->upt_nfds == 1);
/* --- shutdown path: fail and free any requests still queued --- */
169 pthread_mutex_lock(&pt_data->upt_pollrequests_lock);
171 /* Block new poll requests to be enqueued */
172 pt_data->upt_errno = rc;
174 while (!list_empty(&pt_data->upt_pollrequests)) {
175 usock_pollrequest_t *pr;
176 pr = list_entry(pt_data->upt_pollrequests.next,
177 usock_pollrequest_t, upr_list);
179 list_del(&pr->upr_list);
/* an ADD that never made it into pollfd[] still owns the fd:
 * close it and park the conn on the stale list for teardown */
181 if (pr->upr_type == POLL_ADD_REQUEST) {
182 close(pr->upr_conn->uc_fd);
183 list_add_tail(&pr->upr_conn->uc_stale_list,
184 &pt_data->upt_stale_list);
186 usocklnd_conn_decref(pr->upr_conn);
189 LIBCFS_FREE (pr, sizeof(*pr));
191 pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);
193 usocklnd_process_stale_list(pt_data);
/* tear down conns still registered; idx 0 is the notifier fd */
195 for (idx = 1; idx < pt_data->upt_nfds; idx++) {
196 usock_conn_t *conn = pt_data->upt_idx2conn[idx];
197 LASSERT(conn != NULL);
199 usocklnd_tear_peer_conn(conn);
200 usocklnd_conn_decref(conn);
204 /* unblock usocklnd_shutdown() */
205 cfs_complete(&pt_data->upt_completion);
210 /* Returns 0 on success, <0 else */
/*
 * Allocate a poll request of 'type' (with pollfd event mask 'value')
 * for 'conn' and enqueue it to the conn's poll thread.  Takes a conn
 * ref for the request; the ref and the request are released again if
 * the poll thread has already errored (upt_errno != 0).
 *
 * NOTE(review): this view is elided -- the return type, the
 * alloc-failure branch around the CERROR, the pr->upr_conn/upr_type
 * assignments and the return statements are not visible here.
 */
212 usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value)
214 int pt_idx = conn->uc_pt_idx;
215 usock_pollthread_t *pt = &usock_data.ud_pollthreads[pt_idx];
216 usock_pollrequest_t *pr;
218 LIBCFS_ALLOC(pr, sizeof(*pr));
220 CERROR ("Cannot allocate poll request\n");
226 pr->upr_value = value;
228 usocklnd_conn_addref(conn); /* +1 for poll request */
230 pthread_mutex_lock(&pt->upt_pollrequests_lock);
232 if (pt->upt_errno) { /* very rare case: errored poll thread */
/* undo: drop the ref taken above and free the unqueued request */
233 int rc = pt->upt_errno;
234 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
235 usocklnd_conn_decref(conn);
236 LIBCFS_FREE(pr, sizeof(*pr));
240 list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
241 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
/*
 * Enqueue a POLL_DEL_REQUEST for 'conn' using the request that was
 * preallocated at connection setup (conn->uc_preq), so that killing a
 * connection can never fail with ENOMEM.  No-op (beyond dropping the
 * ref) if the poll thread has already errored.
 *
 * NOTE(review): this view is elided -- the return type, braces and
 * any guard around a NULL uc_preq are not visible here.
 */
246 usocklnd_add_killrequest(usock_conn_t *conn)
248 int pt_idx = conn->uc_pt_idx;
249 usock_pollthread_t *pt = &usock_data.ud_pollthreads[pt_idx];
250 usock_pollrequest_t *pr = conn->uc_preq;
252 /* Use preallocated poll request because there is no good
253 * workaround for ENOMEM error while killing connection */
256 pr->upr_type = POLL_DEL_REQUEST;
259 usocklnd_conn_addref(conn); /* +1 for poll request */
261 pthread_mutex_lock(&pt->upt_pollrequests_lock);
263 if (pt->upt_errno) { /* very rare case: errored poll thread */
264 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
265 usocklnd_conn_decref(conn);
266 return; /* conn will be killed in poll thread anyway */
269 list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
270 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
/* request ownership transferred to the queue; disarm the prealloc */
272 conn->uc_preq = NULL;
276 /* Process poll request. Update poll data.
277 * Returns 0 on success, <0 else */
/*
 * Apply one poll request to the poll thread's tables:
 *   POLL_ADD_REQUEST    - grow pollfd[]/idx2conn[]/skip[]/fd2idx[] as
 *                         needed (doubling), then register the fd;
 *   POLL_DEL_REQUEST    - unregister, compact pollfd[] by moving the
 *                         last entry into the hole, park conn on the
 *                         stale list;
 *   POLL_RX/TX/SET_*    - adjust pollfd[idx].events in place.
 * The request struct is freed here in all paths.
 *
 * NOTE(review): this view is elided -- the return type, 'idx' and
 * 'new_skip'/'new_fd2idx' declarations, the switch statement header,
 * per-case 'break's, several element-size expressions in the REALLOC
 * calls, the nfds decrement before the DEL compaction, and the return
 * statements (incl. the -ENOMEM path) are not visible here.
 */
279 usocklnd_process_pollrequest(usock_pollrequest_t *pr,
280 usock_pollthread_t *pt_data)
282 int type = pr->upr_type;
283 short value = pr->upr_value;
284 usock_conn_t *conn = pr->upr_conn;
/* local aliases for the poll thread's lookup tables */
286 struct pollfd *pollfd = pt_data->upt_pollfd;
287 int *fd2idx = pt_data->upt_fd2idx;
288 usock_conn_t **idx2conn = pt_data->upt_idx2conn;
289 int *skip = pt_data->upt_skip;
291 LASSERT(conn != NULL);
292 LASSERT(conn->uc_fd >=0);
293 LASSERT(type == POLL_ADD_REQUEST ||
294 conn->uc_fd < pt_data->upt_nfd2idx);
/* all non-ADD requests operate on an fd already in the tables;
 * validate the fd->idx mapping before touching pollfd[idx] */
296 if (type != POLL_ADD_REQUEST) {
297 idx = fd2idx[conn->uc_fd];
298 if (idx > 0 && idx < pt_data->upt_nfds) { /* hot path */
299 LASSERT(pollfd[idx].fd == conn->uc_fd);
300 } else { /* unlikely */
/* NOTE(review): "happend" typo in this log string -- should be
 * "happened"; left untouched (runtime string) */
301 CWARN("Very unlikely event happend: trying to"
302 " handle poll request of type %d but idx=%d"
303 " is out of range [1 ... %d]. Is shutdown"
304 " in progress (%d)?\n",
305 type, idx, pt_data->upt_nfds - 1,
306 usock_data.ud_shutdown);
/* stale request: free it and drop its conn ref */
308 LIBCFS_FREE (pr, sizeof(*pr));
309 usocklnd_conn_decref(conn);
314 LIBCFS_FREE (pr, sizeof(*pr));
317 case POLL_ADD_REQUEST:
318 if (pt_data->upt_nfds >= pt_data->upt_npollfd) {
319 /* resize pollfd[], idx2conn[] and skip[] */
320 struct pollfd *new_pollfd;
321 int new_npollfd = pt_data->upt_npollfd * 2;
322 usock_conn_t **new_idx2conn;
325 new_pollfd = LIBCFS_REALLOC(pollfd, new_npollfd *
326 sizeof(struct pollfd));
327 if (new_pollfd == NULL)
328 goto process_pollrequest_enomem;
329 pt_data->upt_pollfd = pollfd = new_pollfd;
331 new_idx2conn = LIBCFS_REALLOC(idx2conn, new_npollfd *
332 sizeof(usock_conn_t *));
333 if (new_idx2conn == NULL)
334 goto process_pollrequest_enomem;
335 pt_data->upt_idx2conn = idx2conn = new_idx2conn;
337 new_skip = LIBCFS_REALLOC(skip, new_npollfd *
339 if (new_skip == NULL)
340 goto process_pollrequest_enomem;
341 pt_data->upt_skip = new_skip;
343 pt_data->upt_npollfd = new_npollfd;
346 if (conn->uc_fd >= pt_data->upt_nfd2idx) {
347 /* resize fd2idx[] */
/* double until the new fd fits */
349 int new_nfd2idx = pt_data->upt_nfd2idx * 2;
351 while (new_nfd2idx <= conn->uc_fd)
354 new_fd2idx = LIBCFS_REALLOC(fd2idx, new_nfd2idx *
356 if (new_fd2idx == NULL)
357 goto process_pollrequest_enomem;
359 pt_data->upt_fd2idx = fd2idx = new_fd2idx;
/* zero the freshly grown tail; 0 means "fd not registered" */
360 memset(fd2idx + pt_data->upt_nfd2idx, 0,
361 (new_nfd2idx - pt_data->upt_nfd2idx)
363 pt_data->upt_nfd2idx = new_nfd2idx;
366 LASSERT(fd2idx[conn->uc_fd] == 0);
/* register at the next free slot */
368 idx = pt_data->upt_nfds++;
369 idx2conn[idx] = conn;
370 fd2idx[conn->uc_fd] = idx;
372 pollfd[idx].fd = conn->uc_fd;
373 pollfd[idx].events = value;
374 pollfd[idx].revents = 0;
376 case POLL_DEL_REQUEST:
377 fd2idx[conn->uc_fd] = 0; /* invalidate this entry */
380 if (idx != pt_data->upt_nfds) {
381 /* shift last entry into released position */
382 memcpy(&pollfd[idx], &pollfd[pt_data->upt_nfds],
383 sizeof(struct pollfd));
384 idx2conn[idx] = idx2conn[pt_data->upt_nfds];
385 fd2idx[pollfd[idx].fd] = idx;
/* conn teardown deferred to usocklnd_process_stale_list() */
389 list_add_tail(&conn->uc_stale_list, &pt_data->upt_stale_list);
391 case POLL_RX_SET_REQUEST:
392 pollfd[idx].events = (pollfd[idx].events & ~POLLIN) | value;
394 case POLL_TX_SET_REQUEST:
395 pollfd[idx].events = (pollfd[idx].events & ~POLLOUT) | value;
397 case POLL_SET_REQUEST:
398 pollfd[idx].events = value;
401 LBUG(); /* unknown type */
404 /* In the case of POLL_ADD_REQUEST, idx2conn[idx] takes the
405 * reference that poll request possesses */
406 if (type != POLL_ADD_REQUEST)
407 usocklnd_conn_decref(conn);
/* reallocation failed: drop the request's conn ref and bail out */
411 process_pollrequest_enomem:
412 usocklnd_conn_decref(conn);
416 /* Loop on poll data executing handlers repeatedly until
417 * fair_limit is reached or all entries are exhausted */
/*
 * Dispatch handlers for every pollfd entry with pending revents.
 * Entry 0 is the notifier pipe/fd and is drained specially.  A skip
 * chain (skip[]) built on the first pass lets later passes (up to
 * ut_fair_limit of them) jump over entries that went idle, so one
 * busy conn cannot starve the others.
 *
 * NOTE(review): this view is elided -- the return type, declarations
 * of i/j/prev/next, the inner loop header advancing i/prev, and
 * closing braces are not visible here.
 */
419 usocklnd_execute_handlers(usock_pollthread_t *pt_data)
421 struct pollfd *pollfd = pt_data->upt_pollfd;
422 int nfds = pt_data->upt_nfds;
423 usock_conn_t **idx2conn = pt_data->upt_idx2conn;
424 int *skip = pt_data->upt_skip;
/* drain the notifier fd completely before touching real conns */
427 if (pollfd[0].revents & POLLIN)
428 while (usocklnd_notifier_handler(pollfd[0].fd) > 0)
431 skip[0] = 1; /* always skip notifier fd */
433 for (j = 0; j < usock_tuns.ut_fair_limit; j++) {
437 if (i >= nfds) /* nothing ready */
441 usock_conn_t *conn = idx2conn[i];
444 if (j == 0) /* first pass... */
445 next = skip[i] = i+1; /* set skip chain */
446 else /* later passes... */
447 next = skip[i]; /* skip unready pollfds */
449 /* kill connection if it's closed by peer and
450 * there is no data pending for reading */
451 if ((pollfd[i].revents & POLLERR) != 0 ||
452 (pollfd[i].revents & POLLHUP) != 0) {
453 if ((pollfd[i].events & POLLIN) != 0 &&
454 (pollfd[i].revents & POLLIN) == 0)
455 usocklnd_conn_kill(conn);
457 usocklnd_exception_handler(conn);
/* handlers returning <=0 mean "no more work": clear the bit so
 * the entry can be unlinked from the skip chain below */
460 if ((pollfd[i].revents & POLLIN) != 0 &&
461 usocklnd_read_handler(conn) <= 0)
462 pollfd[i].revents &= ~POLLIN;
464 if ((pollfd[i].revents & POLLOUT) != 0 &&
465 usocklnd_write_handler(conn) <= 0)
466 pollfd[i].revents &= ~POLLOUT;
468 if ((pollfd[i].revents & (POLLIN | POLLOUT)) == 0)
469 skip[prev] = next; /* skip this entry next pass */
/*
 * Compute how many connections ('chunk') the timeout scan in
 * usocklnd_poll_thread() should cover per poll interval so every conn
 * is checked often enough relative to ut_timeout.
 *
 * NOTE(review): this view is elided -- the return type, the
 * definition of 'n', the initial value of 'chunk' (presumably 'num')
 * and the return statement are not visible here.
 */
479 usocklnd_calculate_chunk_size(int num)
482 const int p = usock_tuns.ut_poll_timeout;
485 /* chunk should be big enough to detect a timeout on any
486 * connection within (n+1)/n times the timeout interval
487 * if we checks every 'p' seconds 'chunk' conns */
/* only shrink the chunk when the conn timeout allows spreading the
 * scan over multiple poll intervals */
489 if (usock_tuns.ut_timeout > n * p)
490 chunk = (chunk * n * p) / usock_tuns.ut_timeout;
/*
 * Wake poll thread 'i' out of its poll(2) by writing one int to its
 * notifier fd (the fd registered at pollfd[0]); raw SYS_write syscall
 * is used rather than libc write().
 *
 * NOTE(review): definition is truncated at the end of this view (the
 * CERROR argument list and closing braces continue past it).  Also,
 * '¬ification' on the syscall line is a mojibake of '&notification'
 * (HTML entity '&not;' corrupted during conversion) -- the encoding
 * should be repaired; and "happend" in the log string should read
 * "happened".
 */
499 usocklnd_wakeup_pollthread(int i)
501 usock_pollthread_t *pt = &usock_data.ud_pollthreads[i];
502 int notification = 0;
505 rc = syscall(SYS_write, pt->upt_notifier_fd, ¬ification,
506 sizeof(notification));
508 if (rc != sizeof(notification))
509 CERROR("Very unlikely event happend: "
510 "cannot write to notifier fd (rc=%d; errno=%d)\n",