Whamcloud - gitweb
390156a3be065ca80d80ca2bb53e3e7b23917dbc
[fs/lustre-release.git] / lnet / lnet / api-eq.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * api/api-eq.c
5  * User-level event queue management routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *  Copyright (c) 2001-2002 Sandia National Laboratories
9  *
10  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include <portals/api-support.h>
27
28 int ptl_eq_init(void)
29 {
30         /* Nothing to do anymore... */
31         return PTL_OK;
32 }
33
34 void ptl_eq_fini(void)
35 {
36         /* Nothing to do anymore... */
37 }
38
39 int ptl_eq_ni_init(nal_t * nal)
40 {
41         /* Nothing to do anymore... */
42         return PTL_OK;
43 }
44
45 void ptl_eq_ni_fini(nal_t * nal)
46 {
47         /* Nothing to do anymore... */
48 }
49
50 int ptl_get_event (ptl_eq_t *eq, ptl_event_t *ev)
51 {
52         int          new_index = eq->sequence & (eq->size - 1);
53         ptl_event_t *new_event = &eq->base[new_index];
54         ENTRY;
55
56         CDEBUG(D_INFO, "new_event: %p, sequence: %lu, eq->size: %u\n",
57                new_event, eq->sequence, eq->size);
58
59         if (PTL_SEQ_GT (eq->sequence, new_event->sequence)) {
60                 RETURN(PTL_EQ_EMPTY);
61         }
62
63         *ev = *new_event;
64
65         /* ensure event is delivered correctly despite possible 
66            races with lib_finalize */
67         if (eq->sequence != new_event->sequence) {
68                 CERROR("DROPPING EVENT: eq seq %lu ev seq %lu\n",
69                        eq->sequence, new_event->sequence);
70                 RETURN(PTL_EQ_DROPPED);
71         }
72
73         eq->sequence = new_event->sequence + 1;
74         RETURN(PTL_OK);
75 }
76
77 int PtlEQGet(ptl_handle_eq_t eventq, ptl_event_t * ev)
78 {
79         int which;
80         
81         return (PtlEQPoll (&eventq, 1, 0, ev, &which));
82 }
83
84 int PtlEQWait(ptl_handle_eq_t eventq_in, ptl_event_t *event_out)
85 {
86         int which;
87         
88         return (PtlEQPoll (&eventq_in, 1, PTL_TIME_FOREVER, 
89                            event_out, &which));
90 }
91
92 int PtlEQPoll(ptl_handle_eq_t *eventqs_in, int neq_in, int timeout,
93               ptl_event_t *event_out, int *which_out)
94 {
95         nal_t        *nal;
96         int           i;
97         int           rc;
98         unsigned long flags;
99         
100         if (!ptl_init)
101                 RETURN(PTL_NO_INIT);
102
103         if (neq_in < 1)
104                 RETURN(PTL_EQ_INVALID);
105         
106         nal = ptl_hndl2nal(&eventqs_in[0]);
107         if (nal == NULL)
108                 RETURN(PTL_EQ_INVALID);
109
110         nal->lock(nal, &flags);
111
112         for (;;) {
113                 for (i = 0; i < neq_in; i++) {
114                         ptl_eq_t *eq = ptl_handle2usereq(&eventqs_in[i]);
115
116                         if (i > 0 &&
117                             ptl_hndl2nal(&eventqs_in[i]) != nal) {
118                                 nal->unlock(nal, &flags);
119                                 RETURN (PTL_EQ_INVALID);
120                         }
121
122                         /* size must be a power of 2 to handle a wrapped sequence # */
123                         LASSERT (eq->size != 0 &&
124                                  eq->size == LOWEST_BIT_SET (eq->size));
125
126                         rc = ptl_get_event (eq, event_out);
127                         if (rc != PTL_EQ_EMPTY) {
128                                 nal->unlock(nal, &flags);
129                                 *which_out = i;
130                                 RETURN(rc);
131                         }
132                 }
133                 
134                 if (timeout == 0) {
135                         nal->unlock(nal, &flags);
136                         RETURN (PTL_EQ_EMPTY);
137                 }
138                         
139                 timeout = nal->yield(nal, &flags, timeout);
140         }
141 }