Whamcloud - gitweb
The first pass of some overdue copyright cleanup:
[fs/lustre-release.git] / lnet / lnet / lib-eq.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lib/lib-eq.c
5  * Library level Event queue management routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_PORTALS
26 #include <portals/lib-p30.h>
27
28 int 
29 lib_api_eq_alloc (nal_t *apinal, ptl_size_t count,
30                   ptl_eq_handler_t callback, 
31                   ptl_handle_eq_t *handle)
32 {
33         lib_nal_t     *nal = apinal->nal_data;
34         lib_eq_t      *eq;
35         unsigned long  flags;
36         int            rc;
37
38         /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
39          * overflow, they don't skip entries, so the queue has the same
40          * apparant capacity at all times */
41
42         if (count != LOWEST_BIT_SET(count)) {   /* not a power of 2 already */
43                 do {                    /* knock off all but the top bit... */
44                         count &= ~LOWEST_BIT_SET (count);
45                 } while (count != LOWEST_BIT_SET(count));
46
47                 count <<= 1;                             /* ...and round up */
48         }
49
50         if (count == 0)        /* catch bad parameter / overflow on roundup */
51                 return (PTL_VAL_FAILED);
52         
53         eq = lib_eq_alloc (nal);
54         if (eq == NULL)
55                 return (PTL_NO_SPACE);
56
57         PORTAL_ALLOC(eq->eq_events, count * sizeof(ptl_event_t));
58         if (eq->eq_events == NULL) {
59                 LIB_LOCK(nal, flags);
60                 lib_eq_free (nal, eq);
61                 LIB_UNLOCK(nal, flags);
62         }
63
64         if (nal->libnal_map != NULL) {
65                 struct iovec iov = {
66                         .iov_base = eq->eq_events,
67                         .iov_len = count * sizeof(ptl_event_t)};
68
69                 rc = nal->libnal_map(nal, 1, &iov, &eq->eq_addrkey);
70                 if (rc != PTL_OK) {
71                         LIB_LOCK(nal, flags);
72                         lib_eq_free (nal, eq);
73                         LIB_UNLOCK(nal, flags);
74                         return (rc);
75                 }
76         }
77
78         /* NB this resets all event sequence numbers to 0, to be earlier
79          * than eq_deq_seq */
80         memset(eq->eq_events, 0, count * sizeof(ptl_event_t));
81
82         eq->eq_deq_seq = 1;
83         eq->eq_enq_seq = 1;
84         eq->eq_size = count;
85         eq->eq_refcount = 0;
86         eq->eq_callback = callback;
87
88         LIB_LOCK(nal, flags);
89
90         lib_initialise_handle (nal, &eq->eq_lh, PTL_COOKIE_TYPE_EQ);
91         list_add (&eq->eq_list, &nal->libnal_ni.ni_active_eqs);
92
93         LIB_UNLOCK(nal, flags);
94
95         ptl_eq2handle(handle, nal, eq);
96         return (PTL_OK);
97 }
98
99 int 
100 lib_api_eq_free(nal_t *apinal, ptl_handle_eq_t *eqh)
101 {
102         lib_nal_t     *nal = apinal->nal_data;
103         lib_eq_t      *eq;
104         int            size;
105         ptl_event_t   *events;
106         void          *addrkey;
107         unsigned long  flags;
108
109         LIB_LOCK(nal, flags);
110
111         eq = ptl_handle2eq(eqh, nal);
112         if (eq == NULL) {
113                 LIB_UNLOCK(nal, flags);
114                 return (PTL_EQ_INVALID);
115         }
116
117         if (eq->eq_refcount != 0) {
118                 LIB_UNLOCK(nal, flags);
119                 return (PTL_EQ_IN_USE);
120         }
121
122         /* stash for free after lock dropped */
123         events  = eq->eq_events;
124         size    = eq->eq_size;
125         addrkey = eq->eq_addrkey;
126
127         lib_invalidate_handle (nal, &eq->eq_lh);
128         list_del (&eq->eq_list);
129         lib_eq_free (nal, eq);
130
131         LIB_UNLOCK(nal, flags);
132
133         if (nal->libnal_unmap != NULL) {
134                 struct iovec iov = {
135                         .iov_base = events,
136                         .iov_len = size * sizeof(ptl_event_t)};
137
138                 nal->libnal_unmap(nal, 1, &iov, &addrkey);
139         }
140
141         PORTAL_FREE(events, size * sizeof (ptl_event_t));
142
143         return (PTL_OK);
144 }
145
146 int
147 lib_get_event (lib_eq_t *eq, ptl_event_t *ev)
148 {
149         int          new_index = eq->eq_deq_seq & (eq->eq_size - 1);
150         ptl_event_t *new_event = &eq->eq_events[new_index];
151         int          rc;
152         ENTRY;
153
154         CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
155                new_event, eq->eq_deq_seq, eq->eq_size);
156
157         if (PTL_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
158                 RETURN(PTL_EQ_EMPTY);
159         }
160
161         /* We've got a new event... */
162         *ev = *new_event;
163
164         /* ...but did it overwrite an event we've not seen yet? */
165         if (eq->eq_deq_seq == new_event->sequence) {
166                 rc = PTL_OK;
167         } else {
168                 CERROR("Event Queue Overflow: eq seq %lu ev seq %lu\n",
169                        eq->eq_deq_seq, new_event->sequence);
170                 rc = PTL_EQ_DROPPED;
171         }
172
173         eq->eq_deq_seq = new_event->sequence + 1;
174         RETURN(rc);
175 }
176
177
178 int
179 lib_api_eq_poll (nal_t *apinal, 
180                  ptl_handle_eq_t *eventqs, int neq, int timeout_ms,
181                  ptl_event_t *event, int *which)
182 {
183         lib_nal_t       *nal = apinal->nal_data;
184         lib_ni_t        *ni = &nal->libnal_ni;
185         unsigned long    flags;
186         int              i;
187         int              rc;
188 #ifdef __KERNEL__
189         wait_queue_t     wq;
190         unsigned long    now;
191 #else
192         struct timeval   then;
193         struct timeval   now;
194         struct timespec  ts;
195 #endif
196         ENTRY;
197
198         LIB_LOCK(nal, flags);
199
200         for (;;) {
201                 for (i = 0; i < neq; i++) {
202                         lib_eq_t *eq = ptl_handle2eq(&eventqs[i], nal);
203
204                         rc = lib_get_event (eq, event);
205                         if (rc != PTL_EQ_EMPTY) {
206                                 LIB_UNLOCK(nal, flags);
207                                 *which = i;
208                                 RETURN(rc);
209                         }
210                 }
211                 
212                 if (timeout_ms == 0) {
213                         LIB_UNLOCK (nal, flags);
214                         RETURN (PTL_EQ_EMPTY);
215                 }
216
217                 /* Some architectures force us to do spin locking/unlocking
218                  * in the same stack frame, means we can abstract the
219                  * locking here */
220 #ifdef __KERNEL__
221                 init_waitqueue_entry(&wq, current);
222                 set_current_state(TASK_INTERRUPTIBLE);
223                 add_wait_queue(&ni->ni_waitq, &wq);
224
225                 LIB_UNLOCK(nal, flags);
226
227                 if (timeout_ms < 0) {
228                         schedule ();
229                 } else {
230                         now = jiffies;
231                         schedule_timeout((timeout_ms * HZ)/1000);
232                         timeout_ms -= ((jiffies - now) * 1000)/HZ;
233                         if (timeout_ms < 0)
234                                 timeout_ms = 0;
235                 }
236                 
237                 LIB_LOCK(nal, flags);
238 #else
239                 if (timeout_ms < 0) {
240                         pthread_cond_wait(&ni->ni_cond, &ni->ni_mutex);
241                 } else {
242                         gettimeofday(&then, NULL);
243                         
244                         ts.tv_sec = then.tv_sec + timeout_ms/1000;
245                         ts.tv_nsec = then.tv_usec * 1000 + 
246                                      (timeout_ms%1000) * 1000000;
247                         if (ts.tv_nsec >= 1000000000) {
248                                 ts.tv_sec++;
249                                 ts.tv_nsec -= 1000000000;
250                         }
251                         
252                         pthread_cond_timedwait(&ni->ni_cond,
253                                                &ni->ni_mutex, &ts);
254                         
255                         gettimeofday(&now, NULL);
256                         timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
257                                       (now.tv_usec - then.tv_usec) / 1000;
258                         
259                         if (timeout_ms < 0)
260                                 timeout_ms = 0;
261                 }
262 #endif
263         }
264 }