Whamcloud - gitweb
- fixed a comment typo.
[fs/lustre-release.git] / lnet / lnet / lib-md.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lib/lib-md.c
5  * Memory Descriptor management routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LNET
26
27 #include <lnet/lib-lnet.h>
28
29 /* must be called with LNET_LOCK held */
30 void
31 lnet_md_unlink(lnet_libmd_t *md)
32 {
33         if ((md->md_flags & LNET_MD_FLAG_ZOMBIE) == 0) {
34                 /* first unlink attempt... */
35                 lnet_me_t *me = md->md_me;
36
37                 md->md_flags |= LNET_MD_FLAG_ZOMBIE;
38
39                 /* Disassociate from ME (if any), and unlink it if it was created
40                  * with LNET_UNLINK */
41                 if (me != NULL) {
42                         me->me_md = NULL;
43                         if (me->me_unlink == LNET_UNLINK)
44                                 lnet_me_unlink(me);
45                 }
46
47                 /* ensure all future handle lookups fail */
48                 lnet_invalidate_handle(&md->md_lh);
49         }
50
51         if (md->md_refcount != 0) {
52                 CDEBUG(D_NET, "Queueing unlink of md %p\n", md);
53                 return;
54         }
55
56         CDEBUG(D_NET, "Unlinking md %p\n", md);
57
58         if (md->md_eq != NULL) {
59                 md->md_eq->eq_refcount--;
60                 LASSERT (md->md_eq->eq_refcount >= 0);
61         }
62
63         list_del (&md->md_list);
64         lnet_md_free(md);
65 }
66
67 /* must be called with LNET_LOCK held */
68 static int
69 lib_md_build(lnet_libmd_t *lmd, lnet_md_t *umd, int unlink)
70 {
71         lnet_eq_t   *eq = NULL;
72         int          i;
73         unsigned int niov;
74         int          total_length = 0;
75
76         /* NB we are passed an allocated, but uninitialised/active md.
77          * if we return success, caller may lnet_md_unlink() it.
78          * otherwise caller may only lnet_md_free() it.
79          */
80
81         if (!LNetHandleIsEqual (umd->eq_handle, LNET_EQ_NONE)) {
82                 eq = lnet_handle2eq(&umd->eq_handle);
83                 if (eq == NULL)
84                         return -ENOENT;
85         }
86
87         /* This implementation doesn't know how to create START events or
88          * disable END events.  Best to LASSERT our caller is compliant so
89          * we find out quickly...  */
90         /*  TODO - reevaluate what should be here in light of 
91          * the removal of the start and end events
92          * maybe there we shouldn't even allow LNET_EQ_NONE!)
93         LASSERT (eq == NULL);
94          */
95
96         lmd->md_me = NULL;
97         lmd->md_start = umd->start;
98         lmd->md_offset = 0;
99         lmd->md_max_size = umd->max_size;
100         lmd->md_options = umd->options;
101         lmd->md_user_ptr = umd->user_ptr;
102         lmd->md_eq = eq;
103         lmd->md_threshold = umd->threshold;
104         lmd->md_refcount = 0;
105         lmd->md_flags = (unlink == LNET_UNLINK) ? LNET_MD_FLAG_AUTO_UNLINK : 0;
106
107         if ((umd->options & LNET_MD_IOVEC) != 0) {
108
109                 if ((umd->options & LNET_MD_KIOV) != 0) /* Can't specify both */
110                         return -EINVAL;
111
112                 lmd->md_niov = niov = umd->length;
113                 memcpy(lmd->md_iov.iov, umd->start,
114                        niov * sizeof (lmd->md_iov.iov[0]));
115
116                 for (i = 0; i < niov; i++) {
117                         /* We take the base address on trust */
118                         if (lmd->md_iov.iov[i].iov_len <= 0) /* invalid length */
119                                 return -EINVAL;
120
121                         total_length += lmd->md_iov.iov[i].iov_len;
122                 }
123
124                 lmd->md_length = total_length;
125
126                 if ((umd->options & LNET_MD_MAX_SIZE) != 0 && /* max size used */
127                     (umd->max_size < 0 ||
128                      umd->max_size > total_length)) // illegal max_size
129                         return -EINVAL;
130
131         } else if ((umd->options & LNET_MD_KIOV) != 0) {
132 #ifndef __KERNEL__
133                 return -EINVAL;
134 #else
135                 lmd->md_niov = niov = umd->length;
136                 memcpy(lmd->md_iov.kiov, umd->start,
137                        niov * sizeof (lmd->md_iov.kiov[0]));
138
139                 for (i = 0; i < niov; i++) {
140                         /* We take the page pointer on trust */
141                         if (lmd->md_iov.kiov[i].kiov_offset +
142                             lmd->md_iov.kiov[i].kiov_len > CFS_PAGE_SIZE )
143                                 return -EINVAL; /* invalid length */
144
145                         total_length += lmd->md_iov.kiov[i].kiov_len;
146                 }
147
148                 lmd->md_length = total_length;
149
150                 if ((umd->options & LNET_MD_MAX_SIZE) != 0 && /* max size used */
151                     (umd->max_size < 0 ||
152                      umd->max_size > total_length)) // illegal max_size
153                         return -EINVAL;
154 #endif
155         } else {   /* contiguous */
156                 lmd->md_length = umd->length;
157                 lmd->md_niov = niov = 1;
158                 lmd->md_iov.iov[0].iov_base = umd->start;
159                 lmd->md_iov.iov[0].iov_len = umd->length;
160
161                 if ((umd->options & LNET_MD_MAX_SIZE) != 0 && /* max size used */
162                     (umd->max_size < 0 ||
163                      umd->max_size > umd->length)) // illegal max_size
164                         return -EINVAL;
165         }
166
167         if (eq != NULL)
168                 eq->eq_refcount++;
169
170         /* It's good; let handle2md succeed and add to active mds */
171         lnet_initialise_handle (&lmd->md_lh, LNET_COOKIE_TYPE_MD);
172         list_add (&lmd->md_list, &the_lnet.ln_active_mds);
173
174         return 0;
175 }
176
177 /* must be called with LNET_LOCK held */
178 void
179 lnet_md_deconstruct(lnet_libmd_t *lmd, lnet_md_t *umd)
180 {
181         /* NB this doesn't copy out all the iov entries so when a
182          * discontiguous MD is copied out, the target gets to know the
183          * original iov pointer (in start) and the number of entries it had
184          * and that's all.
185          */
186         umd->start = lmd->md_start;
187         umd->length = ((lmd->md_options & (LNET_MD_IOVEC | LNET_MD_KIOV)) == 0) ?
188                       lmd->md_length : lmd->md_niov;
189         umd->threshold = lmd->md_threshold;
190         umd->max_size = lmd->md_max_size;
191         umd->options = lmd->md_options;
192         umd->user_ptr = lmd->md_user_ptr;
193         lnet_eq2handle(&umd->eq_handle, lmd->md_eq);
194 }
195
196 int
197 LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd,
198              lnet_unlink_t unlink, lnet_handle_md_t *handle)
199 {
200         lnet_me_t     *me;
201         lnet_libmd_t  *md;
202         int            rc;
203
204         LASSERT (the_lnet.ln_init);
205         LASSERT (the_lnet.ln_refcount > 0);
206         
207         if ((umd.options & (LNET_MD_KIOV | LNET_MD_IOVEC)) != 0 &&
208             umd.length > LNET_MAX_IOV) /* too many fragments */
209                 return -EINVAL;
210
211         md = lnet_md_alloc(&umd);
212         if (md == NULL)
213                 return -ENOMEM;
214
215         LNET_LOCK();
216
217         me = lnet_handle2me(&meh);
218         if (me == NULL) {
219                 rc = -ENOENT;
220         } else if (me->me_md != NULL) {
221                 rc = -EBUSY;
222         } else {
223                 rc = lib_md_build(md, &umd, unlink);
224                 if (rc == 0) {
225                         me->me_md = md;
226                         md->md_me = me;
227
228                         lnet_md2handle(handle, md);
229
230                         /* check if this MD matches any blocked msgs */
231                         lnet_match_blocked_msg(md);   /* expects LNET_LOCK held */
232
233                         LNET_UNLOCK();
234                         return (0);
235                 }
236         }
237
238         lnet_md_free (md);
239
240         LNET_UNLOCK();
241         return (rc);
242 }
243
244 int
245 LNetMDBind(lnet_md_t umd, lnet_unlink_t unlink, lnet_handle_md_t *handle)
246 {
247         lnet_libmd_t  *md;
248         int            rc;
249
250         LASSERT (the_lnet.ln_init);
251         LASSERT (the_lnet.ln_refcount > 0);
252         
253         if ((umd.options & (LNET_MD_KIOV | LNET_MD_IOVEC)) != 0 &&
254             umd.length > LNET_MAX_IOV) /* too many fragments */
255                 return -EINVAL;
256
257         md = lnet_md_alloc(&umd);
258         if (md == NULL)
259                 return -ENOMEM;
260
261         LNET_LOCK();
262
263         rc = lib_md_build(md, &umd, unlink);
264
265         if (rc == 0) {
266                 lnet_md2handle(handle, md);
267
268                 LNET_UNLOCK();
269                 return (0);
270         }
271
272         lnet_md_free (md);
273
274         LNET_UNLOCK();
275         return (rc);
276 }
277
278 int
279 LNetMDUnlink (lnet_handle_md_t mdh)
280 {
281         lnet_event_t     ev;
282         lnet_libmd_t    *md;
283
284         LASSERT (the_lnet.ln_init);
285         LASSERT (the_lnet.ln_refcount > 0);
286         
287         LNET_LOCK();
288
289         md = lnet_handle2md(&mdh);
290         if (md == NULL) {
291                 LNET_UNLOCK();
292                 return -ENOENT;
293         }
294
295         /* If the MD is busy, lnet_md_unlink just marks it for deletion, and
296          * when the NAL is done, the completion event flags that the MD was
297          * unlinked.  Otherwise, we enqueue an event now... */
298
299         if (md->md_eq != NULL &&
300             md->md_refcount == 0) {
301                 memset(&ev, 0, sizeof(ev));
302
303                 ev.type = LNET_EVENT_UNLINK;
304                 ev.status = 0;
305                 ev.unlinked = 1;
306                 lnet_md_deconstruct(md, &ev.md);
307                 lnet_md2handle(&ev.md_handle, md);
308
309                 lnet_enq_event_locked(md->md_eq, &ev);
310         }
311
312         lnet_md_unlink(md);
313
314         LNET_UNLOCK();
315         return 0;
316 }
317