Whamcloud - gitweb
8ea6fddb12d745b59baf8a1efaf0a9be056668ad
[fs/lustre-release.git] / lnet / lnet / lib-eq.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lib/lib-eq.c
5  * Library level Event queue management routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *  Copyright (c) 2001-2002 Sandia National Laboratories
9  *
10  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #define DEBUG_SUBSYSTEM S_PORTALS
27 #include <portals/lib-p30.h>
28
29 int 
30 lib_api_eq_alloc (nal_t *apinal, ptl_size_t count,
31                   ptl_eq_handler_t callback, 
32                   ptl_handle_eq_t *handle)
33 {
34         lib_nal_t     *nal = apinal->nal_data;
35         lib_eq_t      *eq;
36         unsigned long  flags;
37         int            rc;
38
39         /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
40          * overflow, they don't skip entries, so the queue has the same
41          * apparant capacity at all times */
42
43         if (count != LOWEST_BIT_SET(count)) {   /* not a power of 2 already */
44                 do {                    /* knock off all but the top bit... */
45                         count &= ~LOWEST_BIT_SET (count);
46                 } while (count != LOWEST_BIT_SET(count));
47
48                 count <<= 1;                             /* ...and round up */
49         }
50
51         if (count == 0)        /* catch bad parameter / overflow on roundup */
52                 return (PTL_VAL_FAILED);
53         
54         eq = lib_eq_alloc (nal);
55         if (eq == NULL)
56                 return (PTL_NO_SPACE);
57
58         PORTAL_ALLOC(eq->eq_events, count * sizeof(ptl_event_t));
59         if (eq->eq_events == NULL) {
60                 LIB_LOCK(nal, flags);
61                 lib_eq_free (nal, eq);
62                 LIB_UNLOCK(nal, flags);
63         }
64
65         if (nal->libnal_map != NULL) {
66                 struct iovec iov = {
67                         .iov_base = eq->eq_events,
68                         .iov_len = count * sizeof(ptl_event_t)};
69
70                 rc = nal->libnal_map(nal, 1, &iov, &eq->eq_addrkey);
71                 if (rc != PTL_OK) {
72                         LIB_LOCK(nal, flags);
73                         lib_eq_free (nal, eq);
74                         LIB_UNLOCK(nal, flags);
75                         return (rc);
76                 }
77         }
78
79         /* NB this resets all event sequence numbers to 0, to be earlier
80          * than eq_deq_seq */
81         memset(eq->eq_events, 0, count * sizeof(ptl_event_t));
82
83         eq->eq_deq_seq = 1;
84         eq->eq_enq_seq = 1;
85         eq->eq_size = count;
86         eq->eq_refcount = 0;
87         eq->eq_callback = callback;
88
89         LIB_LOCK(nal, flags);
90
91         lib_initialise_handle (nal, &eq->eq_lh, PTL_COOKIE_TYPE_EQ);
92         list_add (&eq->eq_list, &nal->libnal_ni.ni_active_eqs);
93
94         LIB_UNLOCK(nal, flags);
95
96         ptl_eq2handle(handle, nal, eq);
97         return (PTL_OK);
98 }
99
100 int 
101 lib_api_eq_free(nal_t *apinal, ptl_handle_eq_t *eqh)
102 {
103         lib_nal_t     *nal = apinal->nal_data;
104         lib_eq_t      *eq;
105         int            size;
106         ptl_event_t   *events;
107         void          *addrkey;
108         unsigned long  flags;
109
110         LIB_LOCK(nal, flags);
111
112         eq = ptl_handle2eq(eqh, nal);
113         if (eq == NULL) {
114                 LIB_UNLOCK(nal, flags);
115                 return (PTL_EQ_INVALID);
116         }
117
118         if (eq->eq_refcount != 0) {
119                 LIB_UNLOCK(nal, flags);
120                 return (PTL_EQ_IN_USE);
121         }
122
123         /* stash for free after lock dropped */
124         events  = eq->eq_events;
125         size    = eq->eq_size;
126         addrkey = eq->eq_addrkey;
127
128         lib_invalidate_handle (nal, &eq->eq_lh);
129         list_del (&eq->eq_list);
130         lib_eq_free (nal, eq);
131
132         LIB_UNLOCK(nal, flags);
133
134         if (nal->libnal_unmap != NULL) {
135                 struct iovec iov = {
136                         .iov_base = events,
137                         .iov_len = size * sizeof(ptl_event_t)};
138
139                 nal->libnal_unmap(nal, 1, &iov, &addrkey);
140         }
141
142         PORTAL_FREE(events, size * sizeof (ptl_event_t));
143
144         return (PTL_OK);
145 }
146
147 int
148 lib_get_event (lib_eq_t *eq, ptl_event_t *ev)
149 {
150         int          new_index = eq->eq_deq_seq & (eq->eq_size - 1);
151         ptl_event_t *new_event = &eq->eq_events[new_index];
152         int          rc;
153         ENTRY;
154
155         CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
156                new_event, eq->eq_deq_seq, eq->eq_size);
157
158         if (PTL_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
159                 RETURN(PTL_EQ_EMPTY);
160         }
161
162         /* We've got a new event... */
163         *ev = *new_event;
164
165         /* ...but did it overwrite an event we've not seen yet? */
166         if (eq->eq_deq_seq == new_event->sequence) {
167                 rc = PTL_OK;
168         } else {
169                 CERROR("Event Queue Overflow: eq seq %lu ev seq %lu\n",
170                        eq->eq_deq_seq, new_event->sequence);
171                 rc = PTL_EQ_DROPPED;
172         }
173
174         eq->eq_deq_seq = new_event->sequence + 1;
175         RETURN(rc);
176 }
177
178
179 int
180 lib_api_eq_poll (nal_t *apinal, 
181                  ptl_handle_eq_t *eventqs, int neq, int timeout_ms,
182                  ptl_event_t *event, int *which)
183 {
184         lib_nal_t       *nal = apinal->nal_data;
185         lib_ni_t        *ni = &nal->libnal_ni;
186         unsigned long    flags;
187         int              i;
188         int              rc;
189 #ifdef __KERNEL__
190         wait_queue_t     wq;
191         unsigned long    now;
192 #else
193         struct timeval   then;
194         struct timeval   now;
195         struct timespec  ts;
196 #endif
197         ENTRY;
198
199         LIB_LOCK(nal, flags);
200
201         for (;;) {
202                 for (i = 0; i < neq; i++) {
203                         lib_eq_t *eq = ptl_handle2eq(&eventqs[i], nal);
204
205                         rc = lib_get_event (eq, event);
206                         if (rc != PTL_EQ_EMPTY) {
207                                 LIB_UNLOCK(nal, flags);
208                                 *which = i;
209                                 RETURN(rc);
210                         }
211                 }
212                 
213                 if (timeout_ms == 0) {
214                         LIB_UNLOCK (nal, flags);
215                         RETURN (PTL_EQ_EMPTY);
216                 }
217
218                 /* Some architectures force us to do spin locking/unlocking
219                  * in the same stack frame, means we can abstract the
220                  * locking here */
221 #ifdef __KERNEL__
222                 init_waitqueue_entry(&wq, current);
223                 set_current_state(TASK_INTERRUPTIBLE);
224                 add_wait_queue(&ni->ni_waitq, &wq);
225
226                 LIB_UNLOCK(nal, flags);
227
228                 if (timeout_ms < 0) {
229                         schedule ();
230                 } else {
231                         now = jiffies;
232                         schedule_timeout((timeout_ms * HZ)/1000);
233                         timeout_ms -= ((jiffies - now) * 1000)/HZ;
234                         if (timeout_ms < 0)
235                                 timeout_ms = 0;
236                 }
237                 
238                 LIB_LOCK(nal, flags);
239 #else
240                 if (timeout_ms < 0) {
241                         pthread_cond_wait(&ni->ni_cond, &ni->ni_mutex);
242                 } else {
243                         gettimeofday(&then, NULL);
244                         
245                         ts.tv_sec = then.tv_sec + timeout_ms/1000;
246                         ts.tv_nsec = then.tv_usec * 1000 + 
247                                      (timeout_ms%1000) * 1000000;
248                         if (ts.tv_nsec >= 1000000000) {
249                                 ts.tv_sec++;
250                                 ts.tv_nsec -= 1000000000;
251                         }
252                         
253                         pthread_cond_timedwait(&ni->ni_cond,
254                                                &ni->ni_mutex, &ts);
255                         
256                         gettimeofday(&now, NULL);
257                         timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
258                                       (now.tv_usec - then.tv_usec) / 1000;
259                         
260                         if (timeout_ms < 0)
261                                 timeout_ms = 0;
262                 }
263 #endif
264         }
265 }