Whamcloud - gitweb
* Landed b_cray_portals_merge.
[fs/lustre-release.git] / lnet / lnet / api-eq.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * api/api-eq.c
5  * User-level event queue management routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *  Copyright (c) 2001-2002 Sandia National Laboratories
9  *
10  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include <portals/api-support.h>
27
28 int ptl_get_event (ptl_eq_t *eq, ptl_event_t *ev)
29 {
30         int          new_index = eq->sequence & (eq->size - 1);
31         ptl_event_t *new_event = &eq->base[new_index];
32         ENTRY;
33
34         CDEBUG(D_INFO, "new_event: %p, sequence: %lu, eq->size: %u\n",
35                new_event, eq->sequence, eq->size);
36
37         if (PTL_SEQ_GT (eq->sequence, new_event->sequence)) {
38                 RETURN(PTL_EQ_EMPTY);
39         }
40
41         *ev = *new_event;
42
43         /* ensure event is delivered correctly despite possible 
44            races with lib_finalize */
45         if (eq->sequence != new_event->sequence) {
46                 CERROR("DROPPING EVENT: eq seq %lu ev seq %lu\n",
47                        eq->sequence, new_event->sequence);
48                 RETURN(PTL_EQ_DROPPED);
49         }
50
51         eq->sequence = new_event->sequence + 1;
52         RETURN(PTL_OK);
53 }
54
55 int PtlEQGet(ptl_handle_eq_t eventq, ptl_event_t * ev)
56 {
57         int which;
58         
59         return (PtlEQPoll (&eventq, 1, 0, ev, &which));
60 }
61
62 int PtlEQWait(ptl_handle_eq_t eventq_in, ptl_event_t *event_out)
63 {
64         int which;
65         
66         return (PtlEQPoll (&eventq_in, 1, PTL_TIME_FOREVER, 
67                            event_out, &which));
68 }
69
70 int PtlEQPoll(ptl_handle_eq_t *eventqs_in, int neq_in, int timeout,
71               ptl_event_t *event_out, int *which_out)
72 {
73         nal_t        *nal;
74         int           i;
75         int           rc;
76         unsigned long flags;
77         
78         if (!ptl_init)
79                 RETURN(PTL_NO_INIT);
80
81         if (neq_in < 1)
82                 RETURN(PTL_EQ_INVALID);
83         
84         nal = ptl_hndl2nal(&eventqs_in[0]);
85         if (nal == NULL)
86                 RETURN(PTL_EQ_INVALID);
87
88         nal->lock(nal, &flags);
89
90         for (;;) {
91                 for (i = 0; i < neq_in; i++) {
92                         ptl_eq_t *eq = ptl_handle2usereq(&eventqs_in[i]);
93
94                         if (i > 0 &&
95                             ptl_hndl2nal(&eventqs_in[i]) != nal) {
96                                 nal->unlock(nal, &flags);
97                                 RETURN (PTL_EQ_INVALID);
98                         }
99
100                         /* size must be a power of 2 to handle a wrapped sequence # */
101                         LASSERT (eq->size != 0 &&
102                                  eq->size == LOWEST_BIT_SET (eq->size));
103
104                         rc = ptl_get_event (eq, event_out);
105                         if (rc != PTL_EQ_EMPTY) {
106                                 nal->unlock(nal, &flags);
107                                 *which_out = i;
108                                 RETURN(rc);
109                         }
110                 }
111                 
112                 if (timeout == 0) {
113                         nal->unlock(nal, &flags);
114                         RETURN (PTL_EQ_EMPTY);
115                 }
116                         
117                 timeout = nal->yield(nal, &flags, timeout);
118         }
119 }