Whamcloud - gitweb
b=15272
[fs/lustre-release.git] / lnet / lnet / lib-md.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lib/lib-md.c
5  * Memory Descriptor management routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LNET
26
27 #include <lnet/lib-lnet.h>
28
29 /* must be called with LNET_LOCK held */
30 void
31 lnet_md_unlink(lnet_libmd_t *md)
32 {
33         if ((md->md_flags & LNET_MD_FLAG_ZOMBIE) == 0) {
34                 /* first unlink attempt... */
35                 lnet_me_t *me = md->md_me;
36
37                 md->md_flags |= LNET_MD_FLAG_ZOMBIE;
38
39                 /* Disassociate from ME (if any), and unlink it if it was created
40                  * with LNET_UNLINK */
41                 if (me != NULL) {
42                         md->md_me = NULL;
43                         me->me_md = NULL;
44                         if (me->me_unlink == LNET_UNLINK)
45                                 lnet_me_unlink(me);
46                 }
47
48                 /* ensure all future handle lookups fail */
49                 lnet_invalidate_handle(&md->md_lh);
50         }
51
52         if (md->md_refcount != 0) {
53                 CDEBUG(D_NET, "Queueing unlink of md %p\n", md);
54                 return;
55         }
56
57         CDEBUG(D_NET, "Unlinking md %p\n", md);
58
59         if (md->md_eq != NULL) {
60                 md->md_eq->eq_refcount--;
61                 LASSERT (md->md_eq->eq_refcount >= 0);
62         }
63
64         list_del (&md->md_list);
65         lnet_md_free(md);
66 }
67
68 /* must be called with LNET_LOCK held */
69 static int
70 lib_md_build(lnet_libmd_t *lmd, lnet_md_t *umd, int unlink)
71 {
72         lnet_eq_t   *eq = NULL;
73         int          i;
74         unsigned int niov;
75         int          total_length = 0;
76
77         /* NB we are passed an allocated, but uninitialised/active md.
78          * if we return success, caller may lnet_md_unlink() it.
79          * otherwise caller may only lnet_md_free() it.
80          */
81
82         if (!LNetHandleIsEqual (umd->eq_handle, LNET_EQ_NONE)) {
83                 eq = lnet_handle2eq(&umd->eq_handle);
84                 if (eq == NULL)
85                         return -ENOENT;
86         }
87
88         /* This implementation doesn't know how to create START events or
89          * disable END events.  Best to LASSERT our caller is compliant so
90          * we find out quickly...  */
91         /*  TODO - reevaluate what should be here in light of 
92          * the removal of the start and end events
93          * maybe there we shouldn't even allow LNET_EQ_NONE!)
94         LASSERT (eq == NULL);
95          */
96
97         lmd->md_me = NULL;
98         lmd->md_start = umd->start;
99         lmd->md_offset = 0;
100         lmd->md_max_size = umd->max_size;
101         lmd->md_options = umd->options;
102         lmd->md_user_ptr = umd->user_ptr;
103         lmd->md_eq = eq;
104         lmd->md_threshold = umd->threshold;
105         lmd->md_refcount = 0;
106         lmd->md_flags = (unlink == LNET_UNLINK) ? LNET_MD_FLAG_AUTO_UNLINK : 0;
107
108         if ((umd->options & LNET_MD_IOVEC) != 0) {
109
110                 if ((umd->options & LNET_MD_KIOV) != 0) /* Can't specify both */
111                         return -EINVAL;
112
113                 lmd->md_niov = niov = umd->length;
114                 memcpy(lmd->md_iov.iov, umd->start,
115                        niov * sizeof (lmd->md_iov.iov[0]));
116
117                 for (i = 0; i < niov; i++) {
118                         /* We take the base address on trust */
119                         if (lmd->md_iov.iov[i].iov_len <= 0) /* invalid length */
120                                 return -EINVAL;
121
122                         total_length += lmd->md_iov.iov[i].iov_len;
123                 }
124
125                 lmd->md_length = total_length;
126
127                 if ((umd->options & LNET_MD_MAX_SIZE) != 0 && /* max size used */
128                     (umd->max_size < 0 ||
129                      umd->max_size > total_length)) // illegal max_size
130                         return -EINVAL;
131
132         } else if ((umd->options & LNET_MD_KIOV) != 0) {
133 #ifndef __KERNEL__
134                 return -EINVAL;
135 #else
136                 lmd->md_niov = niov = umd->length;
137                 memcpy(lmd->md_iov.kiov, umd->start,
138                        niov * sizeof (lmd->md_iov.kiov[0]));
139
140                 for (i = 0; i < niov; i++) {
141                         /* We take the page pointer on trust */
142                         if (lmd->md_iov.kiov[i].kiov_offset +
143                             lmd->md_iov.kiov[i].kiov_len > CFS_PAGE_SIZE )
144                                 return -EINVAL; /* invalid length */
145
146                         total_length += lmd->md_iov.kiov[i].kiov_len;
147                 }
148
149                 lmd->md_length = total_length;
150
151                 if ((umd->options & LNET_MD_MAX_SIZE) != 0 && /* max size used */
152                     (umd->max_size < 0 ||
153                      umd->max_size > total_length)) // illegal max_size
154                         return -EINVAL;
155 #endif
156         } else {   /* contiguous */
157                 lmd->md_length = umd->length;
158                 lmd->md_niov = niov = 1;
159                 lmd->md_iov.iov[0].iov_base = umd->start;
160                 lmd->md_iov.iov[0].iov_len = umd->length;
161
162                 if ((umd->options & LNET_MD_MAX_SIZE) != 0 && /* max size used */
163                     (umd->max_size < 0 ||
164                      umd->max_size > umd->length)) // illegal max_size
165                         return -EINVAL;
166         }
167
168         if (eq != NULL)
169                 eq->eq_refcount++;
170
171         /* It's good; let handle2md succeed and add to active mds */
172         lnet_initialise_handle (&lmd->md_lh, LNET_COOKIE_TYPE_MD);
173         list_add (&lmd->md_list, &the_lnet.ln_active_mds);
174
175         return 0;
176 }
177
178 /* must be called with LNET_LOCK held */
179 void
180 lnet_md_deconstruct(lnet_libmd_t *lmd, lnet_md_t *umd)
181 {
182         /* NB this doesn't copy out all the iov entries so when a
183          * discontiguous MD is copied out, the target gets to know the
184          * original iov pointer (in start) and the number of entries it had
185          * and that's all.
186          */
187         umd->start = lmd->md_start;
188         umd->length = ((lmd->md_options & (LNET_MD_IOVEC | LNET_MD_KIOV)) == 0) ?
189                       lmd->md_length : lmd->md_niov;
190         umd->threshold = lmd->md_threshold;
191         umd->max_size = lmd->md_max_size;
192         umd->options = lmd->md_options;
193         umd->user_ptr = lmd->md_user_ptr;
194         lnet_eq2handle(&umd->eq_handle, lmd->md_eq);
195 }
196
197 int
198 LNetMDAttach(lnet_handle_me_t meh, lnet_md_t umd,
199              lnet_unlink_t unlink, lnet_handle_md_t *handle)
200 {
201         lnet_me_t     *me;
202         lnet_libmd_t  *md;
203         int            rc;
204
205         LASSERT (the_lnet.ln_init);
206         LASSERT (the_lnet.ln_refcount > 0);
207         
208         if ((umd.options & (LNET_MD_KIOV | LNET_MD_IOVEC)) != 0 &&
209             umd.length > LNET_MAX_IOV) /* too many fragments */
210                 return -EINVAL;
211
212         md = lnet_md_alloc(&umd);
213         if (md == NULL)
214                 return -ENOMEM;
215
216         LNET_LOCK();
217
218         me = lnet_handle2me(&meh);
219         if (me == NULL) {
220                 rc = -ENOENT;
221         } else if (me->me_md != NULL) {
222                 rc = -EBUSY;
223         } else {
224                 rc = lib_md_build(md, &umd, unlink);
225                 if (rc == 0) {
226                         me->me_md = md;
227                         md->md_me = me;
228
229                         lnet_md2handle(handle, md);
230
231                         /* check if this MD matches any blocked msgs */
232                         lnet_match_blocked_msg(md);   /* expects LNET_LOCK held */
233
234                         LNET_UNLOCK();
235                         return (0);
236                 }
237         }
238
239         lnet_md_free (md);
240
241         LNET_UNLOCK();
242         return (rc);
243 }
244
245 int
246 LNetMDBind(lnet_md_t umd, lnet_unlink_t unlink, lnet_handle_md_t *handle)
247 {
248         lnet_libmd_t  *md;
249         int            rc;
250
251         LASSERT (the_lnet.ln_init);
252         LASSERT (the_lnet.ln_refcount > 0);
253         
254         if ((umd.options & (LNET_MD_KIOV | LNET_MD_IOVEC)) != 0 &&
255             umd.length > LNET_MAX_IOV) /* too many fragments */
256                 return -EINVAL;
257
258         md = lnet_md_alloc(&umd);
259         if (md == NULL)
260                 return -ENOMEM;
261
262         LNET_LOCK();
263
264         rc = lib_md_build(md, &umd, unlink);
265
266         if (rc == 0) {
267                 lnet_md2handle(handle, md);
268
269                 LNET_UNLOCK();
270                 return (0);
271         }
272
273         lnet_md_free (md);
274
275         LNET_UNLOCK();
276         return (rc);
277 }
278
279 int
280 LNetMDUnlink (lnet_handle_md_t mdh)
281 {
282         lnet_event_t     ev;
283         lnet_libmd_t    *md;
284
285         LASSERT (the_lnet.ln_init);
286         LASSERT (the_lnet.ln_refcount > 0);
287
288         LNET_LOCK();
289
290         md = lnet_handle2md(&mdh);
291         if (md == NULL) {
292                 LNET_UNLOCK();
293                 return -ENOENT;
294         }
295
296         /* If the MD is busy, lnet_md_unlink just marks it for deletion, and
297          * when the NAL is done, the completion event flags that the MD was
298          * unlinked.  Otherwise, we enqueue an event now... */
299
300         if (md->md_eq != NULL &&
301             md->md_refcount == 0) {
302                 lnet_build_unlink_event(md, &ev);
303                 lnet_enq_event_locked(md->md_eq, &ev);
304         }
305
306         lnet_md_unlink(md);
307
308         LNET_UNLOCK();
309         return 0;
310 }
311