Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lustre / lmv / lmv_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LMV
41 #ifdef __KERNEL__
42 #include <linux/slab.h>
43 #include <linux/module.h>
44 #include <linux/init.h>
45 #include <linux/slab.h>
46 #include <linux/pagemap.h>
47 #include <asm/div64.h>
48 #include <linux/seq_file.h>
49 #else
50 #include <liblustre.h>
51 #endif
52
53 #include <lustre/lustre_idl.h>
54 #include <obd_support.h>
55 #include <lustre_lib.h>
56 #include <lustre_net.h>
57 #include <lustre_dlm.h>
58 #include <obd_class.h>
59 #include <lprocfs_status.h>
60 #include "lmv_internal.h"
61
62 /* objects cache. */
63 extern cfs_mem_cache_t *obj_cache;
64 extern atomic_t obj_cache_count;
65
66 /* object list and its guard. */
67 static CFS_LIST_HEAD(obj_list);
68 static spinlock_t obj_list_lock = SPIN_LOCK_UNLOCKED;
69
70 /* creates new obj on passed @fid and @mea. */
71 struct lmv_obj *
72 lmv_obj_alloc(struct obd_device *obd,
73               const struct lu_fid *fid,
74               struct lmv_stripe_md *mea)
75 {
76         int i;
77         struct lmv_obj *obj;
78         unsigned int obj_size;
79         struct lmv_obd *lmv = &obd->u.lmv;
80
81         LASSERT(mea->mea_magic == MEA_MAGIC_LAST_CHAR
82                 || mea->mea_magic == MEA_MAGIC_ALL_CHARS
83                 || mea->mea_magic == MEA_MAGIC_HASH_SEGMENT);
84
85         OBD_SLAB_ALLOC(obj, obj_cache, CFS_ALLOC_STD,
86                        sizeof(*obj));
87         if (!obj)
88                 return NULL;
89
90         atomic_inc(&obj_cache_count);
91
92         obj->lo_fid = *fid;
93         obj->lo_obd = obd;
94         obj->lo_state = 0;
95         obj->lo_hashtype = mea->mea_magic;
96
97         init_MUTEX(&obj->lo_guard);
98         atomic_set(&obj->lo_count, 0);
99         obj->lo_objcount = mea->mea_count;
100
101         obj_size = sizeof(struct lmv_inode) *
102                 lmv->desc.ld_tgt_count;
103
104         OBD_ALLOC(obj->lo_inodes, obj_size);
105         if (!obj->lo_inodes)
106                 goto err_obj;
107
108         memset(obj->lo_inodes, 0, obj_size);
109
110         /* put all ids in */
111         for (i = 0; i < mea->mea_count; i++) {
112                 int rc;
113
114                 CDEBUG(D_OTHER, "subobj "DFID"\n",
115                        PFID(&mea->mea_ids[i]));
116                 obj->lo_inodes[i].li_fid = mea->mea_ids[i];
117                 LASSERT(fid_is_sane(&obj->lo_inodes[i].li_fid));
118
119                 /*
120                  * Cache slave mds number to use it in all cases it is needed
121                  * instead of constant lookup.
122                  */
123                 rc = lmv_fld_lookup(lmv, &obj->lo_inodes[i].li_fid,
124                                     &obj->lo_inodes[i].li_mds);
125                 if (rc)
126                         goto err_obj;
127         }
128
129         return obj;
130
131 err_obj:
132         OBD_FREE(obj, sizeof(*obj));
133         return NULL;
134 }
135
136 /* destroy passed @obj. */
137 void
138 lmv_obj_free(struct lmv_obj *obj)
139 {
140         struct lmv_obd *lmv = &obj->lo_obd->u.lmv;
141         unsigned int obj_size;
142
143         LASSERT(!atomic_read(&obj->lo_count));
144
145         obj_size = sizeof(struct lmv_inode) *
146                 lmv->desc.ld_tgt_count;
147
148         OBD_FREE(obj->lo_inodes, obj_size);
149         OBD_SLAB_FREE(obj, obj_cache, sizeof(*obj));
150         atomic_dec(&obj_cache_count);
151 }
152
153 static void
154 __lmv_obj_add(struct lmv_obj *obj)
155 {
156         atomic_inc(&obj->lo_count);
157         list_add(&obj->lo_list, &obj_list);
158 }
159
160 void
161 lmv_obj_add(struct lmv_obj *obj)
162 {
163         spin_lock(&obj_list_lock);
164         __lmv_obj_add(obj);
165         spin_unlock(&obj_list_lock);
166 }
167
168 static void
169 __lmv_obj_del(struct lmv_obj *obj)
170 {
171         list_del(&obj->lo_list);
172         lmv_obj_free(obj);
173 }
174
175 void
176 lmv_obj_del(struct lmv_obj *obj)
177 {
178         spin_lock(&obj_list_lock);
179         __lmv_obj_del(obj);
180         spin_unlock(&obj_list_lock);
181 }
182
183 static struct lmv_obj *
184 __lmv_obj_get(struct lmv_obj *obj)
185 {
186         LASSERT(obj != NULL);
187         atomic_inc(&obj->lo_count);
188         return obj;
189 }
190
191 struct lmv_obj *
192 lmv_obj_get(struct lmv_obj *obj)
193 {
194         spin_lock(&obj_list_lock);
195         __lmv_obj_get(obj);
196         spin_unlock(&obj_list_lock);
197         return obj;
198 }
199
200 static void
201 __lmv_obj_put(struct lmv_obj *obj)
202 {
203         LASSERT(obj);
204
205         if (atomic_dec_and_test(&obj->lo_count)) {
206                 CDEBUG(D_OTHER, "last reference to "DFID" - "
207                        "destroying\n", PFID(&obj->lo_fid));
208                 __lmv_obj_del(obj);
209         }
210 }
211
212 void
213 lmv_obj_put(struct lmv_obj *obj)
214 {
215         spin_lock(&obj_list_lock);
216         __lmv_obj_put(obj);
217         spin_unlock(&obj_list_lock);
218 }
219
220 static struct lmv_obj *
221 __lmv_obj_grab(struct obd_device *obd, const struct lu_fid *fid)
222 {
223         struct lmv_obj *obj;
224         struct list_head *cur;
225
226         list_for_each(cur, &obj_list) {
227                 obj = list_entry(cur, struct lmv_obj, lo_list);
228
229                 /* check if object is in progress of destroying. If so - skip
230                  * it. */
231                 if (obj->lo_state & O_FREEING)
232                         continue;
233
234                 /*
235                  * we should make sure, that we have found object belong to
236                  * passed obd. It is possible that, object manager will have two
237                  * objects with the same fid belong to different obds, if client
238                  * and mds runs on the same host. May be it is good idea to have
239                  * objects list associated with obd.
240                  */
241                 if (obj->lo_obd != obd)
242                         continue;
243
244                 /* check if this is what we're looking for. */
245                 if (lu_fid_eq(&obj->lo_fid, fid))
246                         return __lmv_obj_get(obj);
247         }
248
249         return NULL;
250 }
251
252 struct lmv_obj *
253 lmv_obj_grab(struct obd_device *obd, const struct lu_fid *fid)
254 {
255         struct lmv_obj *obj;
256         ENTRY;
257
258         spin_lock(&obj_list_lock);
259         obj = __lmv_obj_grab(obd, fid);
260         spin_unlock(&obj_list_lock);
261
262         RETURN(obj);
263 }
264
265 /* looks in objects list for an object that matches passed @fid. If it is not
266  * found -- creates it using passed @mea and puts onto list. */
267 static struct lmv_obj *
268 __lmv_obj_create(struct obd_device *obd, const struct lu_fid *fid,
269                  struct lmv_stripe_md *mea)
270 {
271         struct lmv_obj *new, *obj;
272         ENTRY;
273
274         obj = lmv_obj_grab(obd, fid);
275         if (obj)
276                 RETURN(obj);
277
278         /* no such object yet, allocate and initialize it. */
279         new = lmv_obj_alloc(obd, fid, mea);
280         if (!new)
281                 RETURN(NULL);
282
283         /* check if someone create it already while we were dealing with
284          * allocating @obj. */
285         spin_lock(&obj_list_lock);
286         obj = __lmv_obj_grab(obd, fid);
287         if (obj) {
288                 /* someone created it already - put @obj and getting out. */
289                 spin_unlock(&obj_list_lock);
290                 lmv_obj_free(new);
291                 RETURN(obj);
292         }
293
294         __lmv_obj_add(new);
295         __lmv_obj_get(new);
296
297         spin_unlock(&obj_list_lock);
298
299         CDEBUG(D_OTHER, "new obj in lmv cache: "DFID"\n",
300                PFID(fid));
301
302         RETURN(new);
303
304 }
305
306 /* creates object from passed @fid and @mea. If @mea is NULL, it will be
307  * obtained from correct MDT and used for constructing the object. */
308 struct lmv_obj *
309 lmv_obj_create(struct obd_export *exp, const struct lu_fid *fid,
310                struct lmv_stripe_md *mea)
311 {
312         struct obd_device *obd = exp->exp_obd;
313         struct lmv_obd *lmv = &obd->u.lmv;
314         struct ptlrpc_request *req = NULL;
315         struct obd_export *tgt_exp;
316         struct lmv_obj *obj;
317         struct lustre_md md;
318         int mealen, rc;
319         ENTRY;
320
321         CDEBUG(D_OTHER, "get mea for "DFID" and create lmv obj\n",
322                PFID(fid));
323
324         md.mea = NULL;
325         
326         if (mea == NULL) {
327                 __u64 valid;
328
329                 CDEBUG(D_OTHER, "mea isn't passed in, get it now\n");
330                 mealen = lmv_get_easize(lmv);
331
332                 /* time to update mea of parent fid */
333                 md.mea = NULL;
334                 valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
335
336                 tgt_exp = lmv_find_export(lmv, fid);
337                 if (IS_ERR(tgt_exp))
338                         GOTO(cleanup, obj = (void *)tgt_exp);
339
340                 rc = md_getattr(tgt_exp, fid, NULL, valid, mealen, &req);
341                 if (rc) {
342                         CERROR("md_getattr() failed, error %d\n", rc);
343                         GOTO(cleanup, obj = ERR_PTR(rc));
344                 }
345
346                 rc = md_get_lustre_md(exp, req, NULL, exp, &md);
347                 if (rc) {
348                         CERROR("mdc_get_lustre_md() failed, error %d\n", rc);
349                         GOTO(cleanup, obj = ERR_PTR(rc));
350                 }
351
352                 if (md.mea == NULL)
353                         GOTO(cleanup, obj = ERR_PTR(-ENODATA));
354
355                 mea = md.mea;
356         }
357
358         /* got mea, now create obj for it. */
359         obj = __lmv_obj_create(obd, fid, mea);
360         if (!obj) {
361                 CERROR("Can't create new object "DFID"\n",
362                        PFID(fid));
363                 GOTO(cleanup, obj = ERR_PTR(-ENOMEM));
364         }
365
366         /* XXX LOV STACKING */
367         if (md.mea != NULL)
368                 obd_free_memmd(exp, (void *)&md.mea);
369
370         EXIT;
371 cleanup:
372         if (req)
373                 ptlrpc_req_finished(req);
374         return obj;
375 }
376
377 /*
378  * looks for object with @fid and orders to destroy it. It is possible the object
379  * will not be destroyed right now, because it is still using by someone. In
380  * this case it will be marked as "freeing" and will not be accessible anymore
381  * for subsequent callers of lmv_obj_grab().
382  */
383 int
384 lmv_obj_delete(struct obd_export *exp, const struct lu_fid *fid)
385 {
386         struct obd_device *obd = exp->exp_obd;
387         struct lmv_obj *obj;
388         int rc = 0;
389         ENTRY;
390
391         spin_lock(&obj_list_lock);
392         obj = __lmv_obj_grab(obd, fid);
393         if (obj) {
394                 obj->lo_state |= O_FREEING;
395                 __lmv_obj_put(obj);
396                 __lmv_obj_put(obj);
397                 rc = 1;
398         }
399         spin_unlock(&obj_list_lock);
400
401         RETURN(rc);
402 }
403
404 int
405 lmv_obj_setup(struct obd_device *obd)
406 {
407         ENTRY;
408         LASSERT(obd != NULL);
409
410         CDEBUG(D_INFO, "LMV object manager setup (%s)\n",
411                obd->obd_uuid.uuid);
412
413         RETURN(0);
414 }
415
416 void
417 lmv_obj_cleanup(struct obd_device *obd)
418 {
419         struct list_head *cur, *tmp;
420         struct lmv_obj *obj;
421         ENTRY;
422
423         CDEBUG(D_INFO, "LMV object manager cleanup (%s)\n",
424                obd->obd_uuid.uuid);
425
426         spin_lock(&obj_list_lock);
427         list_for_each_safe(cur, tmp, &obj_list) {
428                 obj = list_entry(cur, struct lmv_obj, lo_list);
429
430                 if (obj->lo_obd != obd)
431                         continue;
432
433                 obj->lo_state |= O_FREEING;
434                 if (atomic_read(&obj->lo_count) > 1) {
435                         CERROR("obj "DFID" has count > 1 (%d)\n",
436                                PFID(&obj->lo_fid), atomic_read(&obj->lo_count));
437                 }
438                 __lmv_obj_put(obj);
439         }
440         spin_unlock(&obj_list_lock);
441         EXIT;
442 }