4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
34 * lnet/ulnds/socklnd/poll.c
36 * Author: Maxim Patlasov <maxim@clusterfs.com>
41 #include <sys/syscall.h>
/*
 * Drain pt_data->upt_stale_list.
 *
 * Each conn parked on the stale list is unlinked, its peer association
 * is torn down, and the reference previously held by idx2conn[idx] (or
 * by a poll request) is dropped — see the comment on the decref below.
 *
 * NOTE(review): extraction dropped lines here (function braces/locals
 * such as the `conn` declaration are not visible); comments describe
 * only the visible statements.
 */
44 usocklnd_process_stale_list(usock_pollthread_t *pt_data)
46 while (!cfs_list_empty(&pt_data->upt_stale_list)) {
/* pop the first stale conn off the list */
48 conn = cfs_list_entry(pt_data->upt_stale_list.next,
49 usock_conn_t, uc_stale_list);
51 cfs_list_del(&conn->uc_stale_list);
/* detach from peer, then release the poll-side reference */
53 usocklnd_tear_peer_conn(conn);
54 usocklnd_conn_decref(conn); /* -1 for idx2conn[idx] or pr */
/*
 * Main loop of one poll thread.
 *
 * Repeats until usock_data.ud_shutdown is set:
 *   1) applies all queued poll requests (under upt_pollrequests_lock),
 *   2) frees conns orphaned by POLL_DEL_REQUESTs (stale list),
 *   3) poll(2)s on upt_pollfd with ut_poll_timeout,
 *   4) dispatches ready fds via usocklnd_execute_handlers(),
 *   5) incrementally scans a "chunk" of conns per interval for timeouts.
 * On shutdown it rejects/cleans pending requests, tears down every
 * remaining conn and signals upt_completion for usocklnd_shutdown().
 *
 * NOTE(review): extraction dropped lines in this function (locals such
 * as rc/chunk/saved_nfds/idx*, several braces, continue/break and the
 * thread's return statement are not visible); comments annotate only
 * the visible statements.
 */
59 usocklnd_poll_thread(void *arg)
62 usock_pollthread_t *pt_data = (usock_pollthread_t *)arg;
63 cfs_time_t current_time;
64 cfs_time_t planned_time;
73 /* mask signals to avoid SIGPIPE, etc */
76 pthread_sigmask (SIG_SETMASK, &sigs, 0);
78 LASSERT(pt_data != NULL);
/* first timeout-scan deadline and chunk sizing for the incremental scan */
80 planned_time = cfs_time_shift(usock_tuns.ut_poll_timeout);
81 chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
82 saved_nfds = pt_data->upt_nfds;
86 while (usock_data.ud_shutdown == 0) {
89 /* Process all enqueued poll requests */
90 pthread_mutex_lock(&pt_data->upt_pollrequests_lock);
91 while (!cfs_list_empty(&pt_data->upt_pollrequests)) {
92 usock_pollrequest_t *pr;
93 pr = cfs_list_entry(pt_data->upt_pollrequests.next,
94 usock_pollrequest_t, upr_list);
96 cfs_list_del(&pr->upr_list);
97 rc = usocklnd_process_pollrequest(pr, pt_data);
101 pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);
106 /* Delete conns orphaned due to POLL_DEL_REQUESTs */
107 usocklnd_process_stale_list(pt_data);
109 /* Actual polling for events */
110 rc = poll(pt_data->upt_pollfd,
112 usock_tuns.ut_poll_timeout * 1000);
/* EINTR is benign; any other poll(2) failure is reported */
114 if (rc < 0 && errno != EINTR) {
115 CERROR("Cannot poll(2): errno=%d\n", errno);
120 usocklnd_execute_handlers(pt_data);
122 current_time = cfs_time_current();
/* skip timeout scan when only the notifier fd exists (nfds < 2)
 * or the scan deadline has not been reached yet */
124 if (pt_data->upt_nfds < 2 ||
125 cfs_time_before(current_time, planned_time))
128 /* catch up growing pollfd[] */
129 if (pt_data->upt_nfds > saved_nfds) {
130 extra = pt_data->upt_nfds - saved_nfds;
131 saved_nfds = pt_data->upt_nfds;
/* scale the scan window by how far past the deadline we are */
136 times = cfs_duration_sec(cfs_time_sub(current_time, planned_time)) + 1;
137 idx_finish = MIN(idx_start + chunk*times + extra, pt_data->upt_nfds);
/* check each conn in [idx_start, idx_finish) for an idle timeout */
139 for (idx = idx_start; idx < idx_finish; idx++) {
140 usock_conn_t *conn = pt_data->upt_idx2conn[idx];
141 pthread_mutex_lock(&conn->uc_lock);
142 if (usocklnd_conn_timed_out(conn, current_time) &&
143 conn->uc_state != UC_DEAD) {
144 conn->uc_errored = 1;
145 usocklnd_conn_kill_locked(conn);
147 pthread_mutex_unlock(&conn->uc_lock);
/* full table scanned: recompute chunk for the (possibly new) nfds */
150 if (idx_finish == pt_data->upt_nfds) {
151 chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
152 saved_nfds = pt_data->upt_nfds;
156 idx_start = idx_finish;
159 planned_time = cfs_time_add(current_time,
160 cfs_time_seconds(usock_tuns.ut_poll_timeout));
163 /* All conns should be deleted by POLL_DEL_REQUESTs while shutdown */
164 LASSERT (rc != 0 || pt_data->upt_nfds == 1);
167 pthread_mutex_lock(&pt_data->upt_pollrequests_lock);
169 /* Block new poll requests to be enqueued */
170 pt_data->upt_errno = rc;
/* reject every request still queued; ADD requests own a socket and a
 * conn ref that must be disposed of via the stale list */
172 while (!cfs_list_empty(&pt_data->upt_pollrequests)) {
173 usock_pollrequest_t *pr;
174 pr = cfs_list_entry(pt_data->upt_pollrequests.next,
175 usock_pollrequest_t, upr_list);
177 cfs_list_del(&pr->upr_list);
179 if (pr->upr_type == POLL_ADD_REQUEST) {
180 libcfs_sock_release(pr->upr_conn->uc_sock);
181 cfs_list_add_tail(&pr->upr_conn->uc_stale_list,
182 &pt_data->upt_stale_list);
184 usocklnd_conn_decref(pr->upr_conn);
187 LIBCFS_FREE (pr, sizeof(*pr));
189 pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);
191 usocklnd_process_stale_list(pt_data);
/* tear down all remaining conns (idx 0 is the notifier, skipped) */
193 for (idx = 1; idx < pt_data->upt_nfds; idx++) {
194 usock_conn_t *conn = pt_data->upt_idx2conn[idx];
195 LASSERT(conn != NULL);
196 libcfs_sock_release(conn->uc_sock);
197 usocklnd_tear_peer_conn(conn);
198 usocklnd_conn_decref(conn);
202 /* unblock usocklnd_shutdown() */
203 cfs_mt_complete(&pt_data->upt_completion);
208 /* Returns 0 on success, <0 else */
/*
 * Allocate a poll request of `type` with event mask `value` for `conn`
 * and enqueue it to the conn's owning poll thread.  Takes one conn ref
 * on behalf of the request.  If the poll thread has already failed
 * (pt->upt_errno != 0), the ref and allocation are undone and, per the
 * elided lines, the thread's errno is presumably returned — confirm
 * against the full source.
 *
 * NOTE(review): extraction dropped lines here (braces, the alloc-failure
 * return, pr->upr_type/upr_conn assignments, final return).
 */
210 usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value)
212 int pt_idx = conn->uc_pt_idx;
213 usock_pollthread_t *pt = &usock_data.ud_pollthreads[pt_idx];
214 usock_pollrequest_t *pr;
216 LIBCFS_ALLOC(pr, sizeof(*pr));
218 CERROR ("Cannot allocate poll request\n");
224 pr->upr_value = value;
226 usocklnd_conn_addref(conn); /* +1 for poll request */
228 pthread_mutex_lock(&pt->upt_pollrequests_lock);
230 if (pt->upt_errno) { /* very rare case: errored poll thread */
231 int rc = pt->upt_errno;
232 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
/* undo the ref and the allocation taken above */
233 usocklnd_conn_decref(conn);
234 LIBCFS_FREE(pr, sizeof(*pr));
238 cfs_list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
239 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
/*
 * Enqueue a POLL_DEL_REQUEST for `conn` using the request preallocated
 * in conn->uc_preq, so killing a connection can never fail with ENOMEM.
 * Takes one conn ref for the request; if the poll thread has already
 * errored, the ref is dropped and the conn is left to be killed by the
 * poll thread's own shutdown path.
 *
 * NOTE(review): extraction dropped lines here (braces and, presumably,
 * a NULL check on uc_preq / pr->upr_conn assignment — confirm against
 * the full source).
 */
244 usocklnd_add_killrequest(usock_conn_t *conn)
246 int pt_idx = conn->uc_pt_idx;
247 usock_pollthread_t *pt = &usock_data.ud_pollthreads[pt_idx];
248 usock_pollrequest_t *pr = conn->uc_preq;
250 /* Use preallocated poll request because there is no good
251 * workaround for ENOMEM error while killing connection */
254 pr->upr_type = POLL_DEL_REQUEST;
257 usocklnd_conn_addref(conn); /* +1 for poll request */
259 pthread_mutex_lock(&pt->upt_pollrequests_lock);
261 if (pt->upt_errno) { /* very rare case: errored poll thread */
262 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
263 usocklnd_conn_decref(conn);
264 return; /* conn will be killed in poll thread anyway */
267 cfs_list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
268 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
/* ownership of the preallocated request has passed to the queue */
270 conn->uc_preq = NULL;
274 /* Process poll request. Update poll data.
275 * Returns 0 on success, <0 else */
/*
 * Apply one poll request to the poll thread's tables:
 *   pollfd[]   — fds handed to poll(2);
 *   fd2idx[]   — maps an fd to its slot in pollfd[] (0 = not present);
 *   idx2conn[] — maps a slot back to its conn;
 *   skip[]     — skip chain used by usocklnd_execute_handlers().
 *
 * POLL_ADD_REQUEST grows the tables (doubling) as needed and appends
 * the conn; POLL_DEL_REQUEST removes the slot by moving the last entry
 * into it and parks the conn on the stale list; the RX/TX/SET requests
 * edit the slot's events mask.  For non-ADD requests the request's
 * conn ref is dropped here; for ADD, idx2conn[idx] inherits it.
 *
 * NOTE(review): extraction dropped lines (the `switch`/`break`
 * statements around the case labels, several locals such as idx and
 * new_skip, the nfds decrement in the DEL path, return statements);
 * comments annotate only the visible lines.
 */
277 usocklnd_process_pollrequest(usock_pollrequest_t *pr,
278 usock_pollthread_t *pt_data)
280 int type = pr->upr_type;
281 short value = pr->upr_value;
282 usock_conn_t *conn = pr->upr_conn;
284 struct pollfd *pollfd = pt_data->upt_pollfd;
285 int *fd2idx = pt_data->upt_fd2idx;
286 usock_conn_t **idx2conn = pt_data->upt_idx2conn;
287 int *skip = pt_data->upt_skip;
289 LASSERT(conn != NULL);
290 LASSERT(conn->uc_sock != NULL);
/* any fd being modified/deleted must already fit in fd2idx[] */
291 LASSERT(type == POLL_ADD_REQUEST ||
292 LIBCFS_SOCK2FD(conn->uc_sock) < pt_data->upt_nfd2idx);
294 if (type != POLL_ADD_REQUEST) {
295 idx = fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)];
296 if (idx > 0 && idx < pt_data->upt_nfds) { /* hot path */
297 LASSERT(pollfd[idx].fd ==
298 LIBCFS_SOCK2FD(conn->uc_sock));
299 } else { /* unlikely */
300 CWARN("Very unlikely event happend: trying to"
301 " handle poll request of type %d but idx=%d"
302 " is out of range [1 ... %d]. Is shutdown"
303 " in progress (%d)?\n",
304 type, idx, pt_data->upt_nfds - 1,
305 usock_data.ud_shutdown);
/* stale request: drop it and its conn ref */
307 LIBCFS_FREE (pr, sizeof(*pr));
308 usocklnd_conn_decref(conn);
/* the request structure itself is no longer needed past this point */
313 LIBCFS_FREE (pr, sizeof(*pr));
316 case POLL_ADD_REQUEST:
317 if (pt_data->upt_nfds >= pt_data->upt_npollfd) {
318 /* resize pollfd[], idx2conn[] and skip[] */
319 struct pollfd *new_pollfd;
320 int new_npollfd = pt_data->upt_npollfd * 2;
321 usock_conn_t **new_idx2conn;
324 new_pollfd = LIBCFS_REALLOC(pollfd, new_npollfd *
325 sizeof(struct pollfd));
326 if (new_pollfd == NULL)
327 goto process_pollrequest_enomem;
328 pt_data->upt_pollfd = pollfd = new_pollfd;
330 new_idx2conn = LIBCFS_REALLOC(idx2conn, new_npollfd *
331 sizeof(usock_conn_t *));
332 if (new_idx2conn == NULL)
333 goto process_pollrequest_enomem;
334 pt_data->upt_idx2conn = idx2conn = new_idx2conn;
336 new_skip = LIBCFS_REALLOC(skip, new_npollfd *
338 if (new_skip == NULL)
339 goto process_pollrequest_enomem;
340 pt_data->upt_skip = new_skip;
342 pt_data->upt_npollfd = new_npollfd;
345 if (LIBCFS_SOCK2FD(conn->uc_sock) >= pt_data->upt_nfd2idx) {
346 /* resize fd2idx[] */
348 int new_nfd2idx = pt_data->upt_nfd2idx * 2;
/* keep doubling until the new fd fits */
350 while (new_nfd2idx <= LIBCFS_SOCK2FD(conn->uc_sock))
353 new_fd2idx = LIBCFS_REALLOC(fd2idx, new_nfd2idx *
355 if (new_fd2idx == NULL)
356 goto process_pollrequest_enomem;
358 pt_data->upt_fd2idx = fd2idx = new_fd2idx;
/* zero the newly grown tail: 0 means "fd not registered" */
359 memset(fd2idx + pt_data->upt_nfd2idx, 0,
360 (new_nfd2idx - pt_data->upt_nfd2idx)
362 pt_data->upt_nfd2idx = new_nfd2idx;
365 LASSERT(fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] == 0);
/* append the conn in the next free slot */
367 idx = pt_data->upt_nfds++;
368 idx2conn[idx] = conn;
369 fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] = idx;
371 pollfd[idx].fd = LIBCFS_SOCK2FD(conn->uc_sock);
372 pollfd[idx].events = value;
373 pollfd[idx].revents = 0;
375 case POLL_DEL_REQUEST:
376 fd2idx[LIBCFS_SOCK2FD(conn->uc_sock)] = 0; /* invalidate this
379 if (idx != pt_data->upt_nfds) {
380 /* shift last entry into released position */
381 memcpy(&pollfd[idx], &pollfd[pt_data->upt_nfds],
382 sizeof(struct pollfd));
383 idx2conn[idx] = idx2conn[pt_data->upt_nfds];
384 fd2idx[pollfd[idx].fd] = idx;
/* socket is closed now; final teardown happens on the stale list */
387 libcfs_sock_release(conn->uc_sock);
388 cfs_list_add_tail(&conn->uc_stale_list,
389 &pt_data->upt_stale_list);
391 case POLL_RX_SET_REQUEST:
392 pollfd[idx].events = (pollfd[idx].events & ~POLLIN) | value;
394 case POLL_TX_SET_REQUEST:
395 pollfd[idx].events = (pollfd[idx].events & ~POLLOUT) | value;
397 case POLL_SET_REQUEST:
398 pollfd[idx].events = value;
401 LBUG(); /* unknown type */
404 /* In the case of POLL_ADD_REQUEST, idx2conn[idx] takes the
405 * reference that poll request possesses */
406 if (type != POLL_ADD_REQUEST)
407 usocklnd_conn_decref(conn);
411 process_pollrequest_enomem:
412 usocklnd_conn_decref(conn);
416 /* Loop on poll data executing handlers repeatedly until
417 * fair_limit is reached or all entries are exhausted */
/*
 * Dispatch I/O handlers for every pollfd slot with pending revents.
 * Slot 0 is the notifier fd: it is drained via
 * usocklnd_notifier_handler() and always excluded from the scan.
 * The skip[] array forms a linked "skip chain" built on the first
 * pass so later passes (up to ut_fair_limit) only revisit slots that
 * still had work, giving each conn a fair share per poll cycle.
 *
 * NOTE(review): extraction dropped lines here (locals i/j/prev/next,
 * the inner loop header, prev-advance logic); comments annotate only
 * the visible statements.
 */
419 usocklnd_execute_handlers(usock_pollthread_t *pt_data)
421 struct pollfd *pollfd = pt_data->upt_pollfd;
422 int nfds = pt_data->upt_nfds;
423 usock_conn_t **idx2conn = pt_data->upt_idx2conn;
424 int *skip = pt_data->upt_skip;
/* drain all queued notifications from the notifier fd */
427 if (pollfd[0].revents & POLLIN)
428 while (usocklnd_notifier_handler(pollfd[0].fd) > 0)
431 skip[0] = 1; /* always skip notifier fd */
433 for (j = 0; j < usock_tuns.ut_fair_limit; j++) {
437 if (i >= nfds) /* nothing ready */
441 usock_conn_t *conn = idx2conn[i];
444 if (j == 0) /* first pass... */
445 next = skip[i] = i+1; /* set skip chain */
446 else /* later passes... */
447 next = skip[i]; /* skip unready pollfds */
449 /* kill connection if it's closed by peer and
450 * there is no data pending for reading */
451 if ((pollfd[i].revents & POLLERR) != 0 ||
452 (pollfd[i].revents & POLLHUP) != 0) {
453 if ((pollfd[i].events & POLLIN) != 0 &&
454 (pollfd[i].revents & POLLIN) == 0)
455 usocklnd_conn_kill(conn);
457 usocklnd_exception_handler(conn);
/* handler returning <=0 means this direction is exhausted */
460 if ((pollfd[i].revents & POLLIN) != 0 &&
461 usocklnd_read_handler(conn) <= 0)
462 pollfd[i].revents &= ~POLLIN;
464 if ((pollfd[i].revents & POLLOUT) != 0 &&
465 usocklnd_write_handler(conn) <= 0)
466 pollfd[i].revents &= ~POLLOUT;
468 if ((pollfd[i].revents & (POLLIN | POLLOUT)) == 0)
469 skip[prev] = next; /* skip this entry next pass */
/*
 * Size the per-interval timeout-scan window for `num` conns: scale the
 * chunk down when ut_timeout exceeds n*p so that scanning `chunk` conns
 * every `p` seconds covers all conns within ~(n+1)/n of the timeout.
 *
 * NOTE(review): extraction dropped lines here (the declarations of
 * `n` and `chunk` and the return statement are not visible).
 */
479 usocklnd_calculate_chunk_size(int num)
482 const int p = usock_tuns.ut_poll_timeout;
485 /* chunk should be big enough to detect a timeout on any
486 * connection within (n+1)/n times the timeout interval
487 * if we checks every 'p' seconds 'chunk' conns */
489 if (usock_tuns.ut_timeout > n * p)
490 chunk = (chunk * n * p) / usock_tuns.ut_timeout;
/*
 * Wake poll thread `i` out of poll(2) by writing one int (value 0) to
 * its notifier socket's fd; the poll thread drains these in
 * usocklnd_execute_handlers().  SYS_write is invoked via syscall(2)
 * directly — presumably to bypass any libc wrapper; confirm against
 * the original source.  A short/failed write is only logged.
 *
 * NOTE(review): extraction truncated this function (the CERROR
 * argument list and the remainder of the body are cut off) and the
 * '&' on the ¬ification argument was mangled by encoding.
 */
499 usocklnd_wakeup_pollthread(int i)
501 usock_pollthread_t *pt = &usock_data.ud_pollthreads[i];
502 int notification = 0;
505 rc = syscall(SYS_write, LIBCFS_SOCK2FD(pt->upt_notifier[0]),
506 ¬ification, sizeof(notification));
508 if (rc != sizeof(notification))
509 CERROR("Very unlikely event happend: "
510 "cannot write to notifier fd (rc=%d; errno=%d)\n",