1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Maxim Patlasov <maxim@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
14 #include <sys/syscall.h>
/* Drain the poll thread's stale-conn list: each conn queued here was
 * orphaned (e.g. by POLL_DEL_REQUEST handling) and must be torn down
 * and have its last list-held reference dropped.
 * NOTE(review): fragmentary listing — return type, braces and the
 * declaration of 'conn' are not visible in this excerpt. */
17 usocklnd_process_stale_list(usock_pollthread_t *pt_data)
19 while (!list_empty(&pt_data->upt_stale_list)) {
        /* pop the first stale conn off the list */
21 conn = list_entry(pt_data->upt_stale_list.next,
22 usock_conn_t, uc_stale_list);
24 list_del(&conn->uc_stale_list);
        /* detach the conn from its peer, then release the reference
         * formerly held by idx2conn[idx] or by the poll request */
26 usocklnd_tear_peer_conn(conn);
27 usocklnd_conn_decref(conn); /* -1 for idx2conn[idx] or pr */
/* Main loop of a poll thread: repeatedly applies queued poll requests,
 * poll(2)s its fd set, dispatches ready handlers, and incrementally
 * scans a "chunk" of connections per pass for timeouts.  On shutdown it
 * drains pending requests, tears down remaining conns and signals
 * usocklnd_shutdown() via upt_completion.
 * NOTE(review): fragmentary listing — return type, several local
 * declarations (rc, sigs, chunk, saved_nfds, extra, times, idx,
 * idx_start, idx_finish) and various braces/breaks are not visible. */
32 usocklnd_poll_thread(void *arg)
35 usock_pollthread_t *pt_data = (usock_pollthread_t *)arg;
36 cfs_time_t current_time;
37 cfs_time_t planned_time;
46 /* mask signals to avoid SIGPIPE, etc */
49 pthread_sigmask (SIG_SETMASK, &sigs, 0);
51 LASSERT(pt_data != NULL);
        /* planned_time is the deadline for the next timeout-scan round;
         * chunk is how many conns to check per poll iteration */
53 planned_time = cfs_time_shift(usock_tuns.ut_poll_timeout);
54 chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
55 saved_nfds = pt_data->upt_nfds;
59 while (usock_data.ud_shutdown == 0) {
62 /* Process all enqueued poll requests */
63 pthread_mutex_lock(&pt_data->upt_pollrequests_lock);
64 while (!list_empty(&pt_data->upt_pollrequests)) {
65 usock_pollrequest_t *pr;
66 pr = list_entry(pt_data->upt_pollrequests.next,
67 usock_pollrequest_t, upr_list);
69 list_del(&pr->upr_list);
                        /* NOTE(review): error handling of rc (breaking out of
                         * the loop on failure) is elided in this excerpt */
70 rc = usocklnd_process_pollrequest(pr, pt_data);
74 pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);
79 /* Delete conns orphaned due to POLL_DEL_REQUESTs */
80 usocklnd_process_stale_list(pt_data);
82 /* Actual polling for events */
83 rc = poll(pt_data->upt_pollfd,
85 usock_tuns.ut_poll_timeout * 1000);
                /* poll(2) failure path (condition elided in excerpt) */
88 CERROR("Cannot poll(2): errno=%d\n", errno);
                /* run read/write/exception handlers for ready fds */
93 usocklnd_execute_handlers(pt_data);
95 current_time = cfs_time_current();
                /* skip timeout scanning if only the notifier fd is present
                 * or the next scan deadline has not arrived yet */
97 if (pt_data->upt_nfds < 2 ||
98 cfs_time_before(current_time, planned_time))
101 /* catch up growing pollfd[] */
102 if (pt_data->upt_nfds > saved_nfds) {
103 extra = pt_data->upt_nfds - saved_nfds;
104 saved_nfds = pt_data->upt_nfds;
                /* widen this round's scan window in proportion to how far
                 * past the deadline we are ('times') plus any new fds */
109 times = cfs_duration_sec(cfs_time_sub(current_time, planned_time)) + 1;
110 idx_finish = MIN(idx_start + chunk*times + extra, pt_data->upt_nfds);
112 for (idx = idx_start; idx < idx_finish; idx++) {
113 usock_conn_t *conn = pt_data->upt_idx2conn[idx];
114 pthread_mutex_lock(&conn->uc_lock);
115 if (usocklnd_conn_timed_out(conn, current_time) &&
116 conn->uc_state != UC_DEAD) {
117 conn->uc_errored = 1;
118 usocklnd_conn_kill_locked(conn);
120 pthread_mutex_unlock(&conn->uc_lock);
                /* full sweep completed: recompute chunk for the (possibly
                 * changed) fd count and restart the scan from the top */
123 if (idx_finish == pt_data->upt_nfds) {
124 chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);
125 saved_nfds = pt_data->upt_nfds;
129 idx_start = idx_finish;
                /* schedule next timeout-scan deadline */
132 planned_time = cfs_time_add(current_time,
133 cfs_time_seconds(usock_tuns.ut_poll_timeout));
        /* --- shutdown path below --- */
136 /* All conns should be deleted by POLL_DEL_REQUESTs while shutdown */
137 LASSERT (rc != 0 || pt_data->upt_nfds == 1);
140 pthread_mutex_lock(&pt_data->upt_pollrequests_lock);
142 /* Block new poll requests to be enqueued */
143 pt_data->upt_errno = rc;
145 while (!list_empty(&pt_data->upt_pollrequests)) {
146 usock_pollrequest_t *pr;
147 pr = list_entry(pt_data->upt_pollrequests.next,
148 usock_pollrequest_t, upr_list);
150 list_del(&pr->upr_list);
                /* ADD requests never made it into pollfd[]: close the fd and
                 * queue the conn for teardown via the stale list */
152 if (pr->upr_type == POLL_ADD_REQUEST) {
153 close(pr->upr_conn->uc_fd);
154 list_add_tail(&pr->upr_conn->uc_stale_list,
155 &pt_data->upt_stale_list);
                        /* non-ADD requests: just drop the request's conn ref
                         * (else-branch brace elided in excerpt) */
157 usocklnd_conn_decref(pr->upr_conn);
160 LIBCFS_FREE (pr, sizeof(*pr));
162 pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);
164 usocklnd_process_stale_list(pt_data);
        /* tear down whatever conns remain in pollfd[] (idx 0 is the
         * notifier fd, hence the loop starts at 1) */
166 for (idx = 1; idx < pt_data->upt_nfds; idx++) {
167 usock_conn_t *conn = pt_data->upt_idx2conn[idx];
168 LASSERT(conn != NULL);
170 usocklnd_tear_peer_conn(conn);
171 usocklnd_conn_decref(conn);
175 /* unblock usocklnd_shutdown() */
176 cfs_complete(&pt_data->upt_completion);
181 /* Returns 0 on success, <0 else */
/* Allocate a poll request of 'type'/'value' for 'conn' and enqueue it on
 * the conn's poll thread.  Takes +1 conn ref on behalf of the request;
 * the ref is released if the poll thread has already errored out.
 * NOTE(review): fragmentary listing — return type, return statements and
 * the assignments of pr->upr_type / pr->upr_conn are not visible here. */
183 usocklnd_add_pollrequest(usock_conn_t *conn, int type, short value)
185 int pt_idx = conn->uc_pt_idx;
186 usock_pollthread_t *pt = &usock_data.ud_pollthreads[pt_idx];
187 usock_pollrequest_t *pr;
189 LIBCFS_ALLOC(pr, sizeof(*pr));
        /* allocation-failure path (condition elided in excerpt) */
191 CERROR ("Cannot allocate poll request\n");
197 pr->upr_value = value;
199 usocklnd_conn_addref(conn); /* +1 for poll request */
201 pthread_mutex_lock(&pt->upt_pollrequests_lock);
203 if (pt->upt_errno) { /* very rare case: errored poll thread */
                /* poll thread already failed: undo the ref, free the
                 * request and propagate the thread's error code */
204 int rc = pt->upt_errno;
205 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
206 usocklnd_conn_decref(conn);
207 LIBCFS_FREE(pr, sizeof(*pr));
211 list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
212 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
/* Enqueue a POLL_DEL_REQUEST for 'conn' using the conn's preallocated
 * request (uc_preq), so connection teardown can never fail with ENOMEM.
 * NOTE(review): fragmentary listing — return type, braces and any
 * pr != NULL guard are not visible in this excerpt. */
217 usocklnd_add_killrequest(usock_conn_t *conn)
219 int pt_idx = conn->uc_pt_idx;
220 usock_pollthread_t *pt = &usock_data.ud_pollthreads[pt_idx];
221 usock_pollrequest_t *pr = conn->uc_preq;
223 /* Use preallocated poll request because there is no good
224 * workaround for ENOMEM error while killing connection */
227 pr->upr_type = POLL_DEL_REQUEST;
230 usocklnd_conn_addref(conn); /* +1 for poll request */
232 pthread_mutex_lock(&pt->upt_pollrequests_lock);
234 if (pt->upt_errno) { /* very rare case: errored poll thread */
235 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
236 usocklnd_conn_decref(conn);
237 return; /* conn will be killed in poll thread anyway */
240 list_add_tail(&pr->upr_list, &pt->upt_pollrequests);
241 pthread_mutex_unlock(&pt->upt_pollrequests_lock);
        /* ownership of the preallocated request passed to the queue */
243 conn->uc_preq = NULL;
247 /* Process poll request. Update poll data.
248 * Returns 0 on success, <0 else */
/* Applies one poll request to the thread's pollfd[]/fd2idx[]/idx2conn[]
 * tables: ADD grows the tables (doubling capacity as needed) and installs
 * the fd; DEL swaps the last entry into the freed slot and queues the
 * conn on the stale list; RX/TX/SET adjust pollfd[].events.
 * NOTE(review): fragmentary listing — return type, 'int idx' and
 * 'int *new_skip' declarations, switch header, 'break' statements,
 * return statements and several closing braces are not visible. */
250 usocklnd_process_pollrequest(usock_pollrequest_t *pr,
251 usock_pollthread_t *pt_data)
253 int type = pr->upr_type;
254 short value = pr->upr_value;
255 usock_conn_t *conn = pr->upr_conn;
        /* local aliases for the poll thread's tables */
257 struct pollfd *pollfd = pt_data->upt_pollfd;
258 int *fd2idx = pt_data->upt_fd2idx;
259 usock_conn_t **idx2conn = pt_data->upt_idx2conn;
260 int *skip = pt_data->upt_skip;
262 LASSERT(conn != NULL);
263 LASSERT(conn->uc_fd >=0);
        /* fd2idx[] lookup below is only valid for non-ADD requests */
264 LASSERT(type == POLL_ADD_REQUEST ||
265 conn->uc_fd < pt_data->upt_nfd2idx);
267 if (type != POLL_ADD_REQUEST) {
268 idx = fd2idx[conn->uc_fd];
269 if (idx > 0 && idx < pt_data->upt_nfds) { /* hot path */
270 LASSERT(pollfd[idx].fd == conn->uc_fd);
271 } else { /* unlikely */
                        /* stale request: its fd is no longer in the table
                         * (typically racing with shutdown) — drop it */
272 CWARN("Very unlikely event happend: trying to"
273 " handle poll request of type %d but idx=%d"
274 " is out of range [1 ... %d]. Is shutdown"
275 " in progress (%d)?\n",
276 type, idx, pt_data->upt_nfds - 1,
277 usock_data.ud_shutdown);
279 LIBCFS_FREE (pr, sizeof(*pr));
280 usocklnd_conn_decref(conn);
        /* the request struct is no longer needed past this point */
285 LIBCFS_FREE (pr, sizeof(*pr));
288 case POLL_ADD_REQUEST:
289 if (pt_data->upt_nfds >= pt_data->upt_npollfd) {
290 /* resize pollfd[], idx2conn[] and skip[] */
291 struct pollfd *new_pollfd;
292 int new_npollfd = pt_data->upt_npollfd * 2;
293 usock_conn_t **new_idx2conn;
296 new_pollfd = LIBCFS_REALLOC(pollfd, new_npollfd *
297 sizeof(struct pollfd));
298 if (new_pollfd == NULL)
299 goto process_pollrequest_enomem;
300 pt_data->upt_pollfd = pollfd = new_pollfd;
302 new_idx2conn = LIBCFS_REALLOC(idx2conn, new_npollfd *
303 sizeof(usock_conn_t *));
304 if (new_idx2conn == NULL)
305 goto process_pollrequest_enomem;
306 pt_data->upt_idx2conn = idx2conn = new_idx2conn;
308 new_skip = LIBCFS_REALLOC(skip, new_npollfd *
310 if (new_skip == NULL)
311 goto process_pollrequest_enomem;
312 pt_data->upt_skip = new_skip;
                        /* only commit the new capacity after all three
                         * reallocs succeeded */
314 pt_data->upt_npollfd = new_npollfd;
317 if (conn->uc_fd >= pt_data->upt_nfd2idx) {
318 /* resize fd2idx[] */
                        /* keep doubling until the new fd fits */
320 int new_nfd2idx = pt_data->upt_nfd2idx * 2;
322 while (new_nfd2idx <= conn->uc_fd)
325 new_fd2idx = LIBCFS_REALLOC(fd2idx, new_nfd2idx *
327 if (new_fd2idx == NULL)
328 goto process_pollrequest_enomem;
330 pt_data->upt_fd2idx = fd2idx = new_fd2idx;
                        /* zero the newly-added tail (0 == "not mapped") */
331 memset(fd2idx + pt_data->upt_nfd2idx, 0,
332 (new_nfd2idx - pt_data->upt_nfd2idx)
334 pt_data->upt_nfd2idx = new_nfd2idx;
337 LASSERT(fd2idx[conn->uc_fd] == 0);
                /* install the conn in the next free slot */
339 idx = pt_data->upt_nfds++;
340 idx2conn[idx] = conn;
341 fd2idx[conn->uc_fd] = idx;
343 pollfd[idx].fd = conn->uc_fd;
344 pollfd[idx].events = value;
345 pollfd[idx].revents = 0;
347 case POLL_DEL_REQUEST:
348 fd2idx[conn->uc_fd] = 0; /* invalidate this entry */
                /* NOTE(review): the decrement of upt_nfds preceding this
                 * compaction is elided in this excerpt */
351 if (idx != pt_data->upt_nfds) {
352 /* shift last entry into released position */
353 memcpy(&pollfd[idx], &pollfd[pt_data->upt_nfds],
354 sizeof(struct pollfd));
355 idx2conn[idx] = idx2conn[pt_data->upt_nfds];
356 fd2idx[pollfd[idx].fd] = idx;
                /* defer teardown to usocklnd_process_stale_list() */
360 list_add_tail(&conn->uc_stale_list, &pt_data->upt_stale_list);
362 case POLL_RX_SET_REQUEST:
363 pollfd[idx].events = (pollfd[idx].events & ~POLLIN) | value;
365 case POLL_TX_SET_REQUEST:
366 pollfd[idx].events = (pollfd[idx].events & ~POLLOUT) | value;
368 case POLL_SET_REQUEST:
369 pollfd[idx].events = value;
372 LBUG(); /* unknown type */
375 /* In the case of POLL_ADD_REQUEST, idx2conn[idx] takes the
376 * reference that poll request possesses */
377 if (type != POLL_ADD_REQUEST)
378 usocklnd_conn_decref(conn);
        /* common ENOMEM exit: drop the request's conn ref */
382 process_pollrequest_enomem:
383 usocklnd_conn_decref(conn);
387 /* Loop on poll data executing handlers repeatedly until
388 * fair_limit is reached or all entries are exhausted */
/* Dispatches read/write/exception handlers for every ready pollfd entry,
 * making up to ut_fair_limit passes; a skip[] chain built on the first
 * pass lets later passes jump over entries that went idle.
 * NOTE(review): fragmentary listing — return type, the declarations of
 * i/j/prev/next, the inner loop header and several braces are elided. */
390 usocklnd_execute_handlers(usock_pollthread_t *pt_data)
392 struct pollfd *pollfd = pt_data->upt_pollfd;
393 int nfds = pt_data->upt_nfds;
394 usock_conn_t **idx2conn = pt_data->upt_idx2conn;
395 int *skip = pt_data->upt_skip;
        /* entry 0 is the notifier fd: drain it completely first */
398 if (pollfd[0].revents & POLLIN)
399 while (usocklnd_notifier_handler(pollfd[0].fd) > 0)
402 skip[0] = 1; /* always skip notifier fd */
404 for (j = 0; j < usock_tuns.ut_fair_limit; j++) {
408 if (i >= nfds) /* nothing ready */
412 usock_conn_t *conn = idx2conn[i];
415 if (j == 0) /* first pass... */
416 next = skip[i] = i+1; /* set skip chain */
417 else /* later passes... */
418 next = skip[i]; /* skip unready pollfds */
420 /* kill connection if it's closed by peer and
421 * there is no data pending for reading */
422 if ((pollfd[i].revents & POLLERR) != 0 ||
423 (pollfd[i].revents & POLLHUP) != 0) {
424 if ((pollfd[i].events & POLLIN) != 0 &&
425 (pollfd[i].revents & POLLIN) == 0)
426 usocklnd_conn_kill(conn);
                                /* otherwise let the exception handler decide
                                 * (else-branch brace elided in excerpt) */
428 usocklnd_exception_handler(conn);
                        /* clear the readiness bit once a handler reports
                         * nothing more to do for this direction */
431 if ((pollfd[i].revents & POLLIN) != 0 &&
432 usocklnd_read_handler(conn) <= 0)
433 pollfd[i].revents &= ~POLLIN;
435 if ((pollfd[i].revents & POLLOUT) != 0 &&
436 usocklnd_write_handler(conn) <= 0)
437 pollfd[i].revents &= ~POLLOUT;
439 if ((pollfd[i].revents & (POLLIN | POLLOUT)) == 0)
440 skip[prev] = next; /* skip this entry next pass */
/* Computes how many connections a poll thread should timeout-check per
 * poll interval, given 'num' total fds, so every conn is examined within
 * roughly (n+1)/n of the configured timeout.
 * NOTE(review): fragmentary listing — return type, the declarations of
 * 'n' and 'chunk' (presumably chunk starts at num), the chunk==0 floor
 * and the return statement are not visible in this excerpt. */
450 usocklnd_calculate_chunk_size(int num)
453 const int p = usock_tuns.ut_poll_timeout;
456 /* chunk should be big enough to detect a timeout on any
457 * connection within (n+1)/n times the timeout interval
458 * if we checks every 'p' seconds 'chunk' conns */
        /* scale down only when the timeout is longer than n poll periods */
460 if (usock_tuns.ut_timeout > n * p)
461 chunk = (chunk * n * p) / usock_tuns.ut_timeout;
/* Wakes poll thread 'i' by writing one int to its notifier fd, forcing
 * its poll(2) to return so it re-reads its request queue.  Uses the raw
 * SYS_write syscall rather than libc write().
 * NOTE(review): fragmentary listing — return type, the declaration of
 * 'rc' and the tail of the error message are beyond this excerpt. */
470 usocklnd_wakeup_pollthread(int i)
472 usock_pollthread_t *pt = &usock_data.ud_pollthreads[i];
473 int notification = 0;
476 rc = syscall(SYS_write, pt->upt_notifier_fd, ¬ification,
477 sizeof(notification));
        /* a short/failed write means the wakeup may be lost */
479 if (rc != sizeof(notification))
480 CERROR("Very unlikely event happend: "
481 "cannot write to notifier fd (rc=%d; errno=%d)\n",