Whamcloud - gitweb
ORNL-23 avoid unnecessary CMD lock contention in LMV
[fs/lustre-release.git] / lustre / lmv / lmv_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * Copyright (c) 2011 Whamcloud, Inc.
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  */
39
40 #ifndef EXPORT_SYMTAB
41 # define EXPORT_SYMTAB
42 #endif
43 #define DEBUG_SUBSYSTEM S_LMV
44 #ifdef __KERNEL__
45 #include <linux/slab.h>
46 #include <linux/module.h>
47 #include <linux/init.h>
48 #include <linux/slab.h>
49 #include <linux/pagemap.h>
50 #include <asm/div64.h>
51 #include <linux/seq_file.h>
52 #else
53 #include <liblustre.h>
54 #endif
55
56 #include <obd_support.h>
57 #include <lustre/lustre_idl.h>
58 #include <lustre_lib.h>
59 #include <lustre_net.h>
60 #include <lustre_dlm.h>
61 #include <obd_class.h>
62 #include <lprocfs_status.h>
63 #include "lmv_internal.h"
64
/*
 * Slab cache and live-object counter for struct lmv_object. Both are defined
 * outside this file; here the cache backs lmv_object_alloc()/lmv_object_free()
 * and the counter is bumped on allocation and dropped on free.
 */
65 extern cfs_mem_cache_t *lmv_object_cache;
66 extern cfs_atomic_t lmv_object_count;
67
/*
 * File-global list of lmv objects (linked through lo_list) and the spinlock
 * this file takes around list walks, list updates and reference changes.
 */
68 static CFS_LIST_HEAD(obj_list);
69 static cfs_spinlock_t obj_list_lock = CFS_SPIN_LOCK_UNLOCKED;
70
71 struct lmv_object *lmv_object_alloc(struct obd_device *obd,
72                                     const struct lu_fid *fid,
73                                     struct lmv_stripe_md *mea)
74 {
75         struct lmv_obd          *lmv = &obd->u.lmv;
76         unsigned int             obj_size;
77         struct lmv_object       *obj;
78         int                      i;
79
80         LASSERT(mea->mea_magic == MEA_MAGIC_LAST_CHAR
81                 || mea->mea_magic == MEA_MAGIC_ALL_CHARS
82                 || mea->mea_magic == MEA_MAGIC_HASH_SEGMENT);
83
84         OBD_SLAB_ALLOC_PTR(obj, lmv_object_cache);
85         if (!obj)
86                 return NULL;
87
88         cfs_atomic_inc(&lmv_object_count);
89
90         obj->lo_fid = *fid;
91         obj->lo_obd = obd;
92         obj->lo_state = 0;
93         obj->lo_hashtype = mea->mea_magic;
94
95         cfs_init_mutex(&obj->lo_guard);
96         cfs_atomic_set(&obj->lo_count, 0);
97         obj->lo_objcount = mea->mea_count;
98
99         obj_size = sizeof(struct lmv_stripe) * 
100                 lmv->desc.ld_tgt_count;
101
102         OBD_ALLOC_LARGE(obj->lo_stripes, obj_size);
103         if (!obj->lo_stripes)
104                 goto err_obj;
105
106         CDEBUG(D_INODE, "Allocate object for "DFID"\n", 
107                PFID(fid));
108         for (i = 0; i < mea->mea_count; i++) {
109                 int rc;
110
111                 CDEBUG(D_INODE, "Process subobject "DFID"\n", 
112                        PFID(&mea->mea_ids[i]));
113                 obj->lo_stripes[i].ls_fid = mea->mea_ids[i];
114                 LASSERT(fid_is_sane(&obj->lo_stripes[i].ls_fid));
115
116                 /*
117                  * Cache slave mds number to use it in all cases it is needed
118                  * instead of constant lookup.
119                  */
120                 rc = lmv_fld_lookup(lmv, &obj->lo_stripes[i].ls_fid,
121                                     &obj->lo_stripes[i].ls_mds);
122                 if (rc)
123                         goto err_obj;
124         }
125
126         return obj;
127 err_obj:
128         OBD_FREE(obj, sizeof(*obj));
129         return NULL;
130 }
131
132 void lmv_object_free(struct lmv_object *obj)
133 {
134         struct lmv_obd          *lmv = &obj->lo_obd->u.lmv;
135         unsigned int             obj_size;
136
137         LASSERT(!cfs_atomic_read(&obj->lo_count));
138
139         obj_size = sizeof(struct lmv_stripe) *
140                 lmv->desc.ld_tgt_count;
141
142         OBD_FREE_LARGE(obj->lo_stripes, obj_size);
143         OBD_SLAB_FREE(obj, lmv_object_cache, sizeof(*obj));
144         cfs_atomic_dec(&lmv_object_count);
145 }
146
147 static void __lmv_object_add(struct lmv_object *obj)
148 {
149         cfs_atomic_inc(&obj->lo_count);
150         cfs_list_add(&obj->lo_list, &obj_list);
151 }
152
153 void lmv_object_add(struct lmv_object *obj)
154 {
155         cfs_spin_lock(&obj_list_lock);
156         __lmv_object_add(obj);
157         cfs_spin_unlock(&obj_list_lock);
158 }
159
160 static void __lmv_object_del(struct lmv_object *obj)
161 {
162         cfs_list_del(&obj->lo_list);
163         lmv_object_free(obj);
164 }
165
166 void lmv_object_del(struct lmv_object *obj)
167 {
168         cfs_spin_lock(&obj_list_lock);
169         __lmv_object_del(obj);
170         cfs_spin_unlock(&obj_list_lock);
171 }
172
173 static struct lmv_object *__lmv_object_get(struct lmv_object *obj)
174 {
175         LASSERT(obj != NULL);
176         cfs_atomic_inc(&obj->lo_count);
177         return obj;
178 }
179
180 struct lmv_object *lmv_object_get(struct lmv_object *obj)
181 {
182         cfs_spin_lock(&obj_list_lock);
183         __lmv_object_get(obj);
184         cfs_spin_unlock(&obj_list_lock);
185         return obj;
186 }
187
188 static void __lmv_object_put(struct lmv_object *obj)
189 {
190         LASSERT(obj);
191
192         if (cfs_atomic_dec_and_test(&obj->lo_count)) {
193                 CDEBUG(D_INODE, "Last reference to "DFID" - "
194                        "destroying\n", PFID(&obj->lo_fid));
195                 __lmv_object_del(obj);
196         }
197 }
198
199 void lmv_object_put(struct lmv_object *obj)
200 {
201         cfs_spin_lock(&obj_list_lock);
202         __lmv_object_put(obj);
203         cfs_spin_unlock(&obj_list_lock);
204 }
205
/*
 * Unlock @obj via lmv_object_unlock() and then drop a reference on it.
 * The unlock comes first because the put may free the object.
 */
void lmv_object_put_unlock(struct lmv_object *obj)
{
        lmv_object_unlock(obj);

        lmv_object_put(obj);
}
211
212 static struct lmv_object *__lmv_object_find(struct obd_device *obd, const struct lu_fid *fid)
213 {
214         struct lmv_object       *obj;
215         cfs_list_t              *cur;
216
217         cfs_list_for_each(cur, &obj_list) {
218                 obj = cfs_list_entry(cur, struct lmv_object, lo_list);
219
220                 /*
221                  * Check if object is in destroying phase. If so - skip
222                  * it.
223                  */
224                 if (obj->lo_state & O_FREEING)
225                         continue;
226
227                 /*
228                  * We should make sure, that we have found object belong to
229                  * passed obd. It is possible that, object manager will have two
230                  * objects with the same fid belong to different obds, if client
231                  * and mds runs on the same host. May be it is good idea to have
232                  * objects list associated with obd.
233                  */
234                 if (obj->lo_obd != obd)
235                         continue;
236
237                 /*
238                  * Check if this is what we're looking for.
239                  */
240                 if (lu_fid_eq(&obj->lo_fid, fid))
241                         return __lmv_object_get(obj);
242         }
243
244         return NULL;
245 }
246
247 struct lmv_object *lmv_object_find(struct obd_device *obd,
248                                    const struct lu_fid *fid)
249 {
250         struct lmv_obd          *lmv = &obd->u.lmv;
251         struct lmv_object       *obj = NULL;
252         ENTRY;
253
254         /* For single MDT case, lmv_object list is always empty. */
255         if (lmv->desc.ld_tgt_count > 1) {
256                 cfs_spin_lock(&obj_list_lock);
257                 obj = __lmv_object_find(obd, fid);
258                 cfs_spin_unlock(&obj_list_lock);
259         }
260
261         RETURN(obj);
262 }
263
264 struct lmv_object *lmv_object_find_lock(struct obd_device *obd, 
265                                         const struct lu_fid *fid)
266 {
267         struct lmv_object       *obj;
268         ENTRY;
269
270         obj = lmv_object_find(obd, fid);
271         if (obj)
272                 lmv_object_lock(obj);
273
274         RETURN(obj);
275 }
276
277 static struct lmv_object *__lmv_object_create(struct obd_device *obd, 
278                                               const struct lu_fid *fid,
279                                               struct lmv_stripe_md *mea)
280 {
281         struct lmv_object       *new;
282         struct lmv_object       *obj;
283         ENTRY;
284
285         obj = lmv_object_find(obd, fid);
286         if (obj)
287                 RETURN(obj);
288
289         new = lmv_object_alloc(obd, fid, mea);
290         if (!new)
291                 RETURN(NULL);
292
293         /* 
294          * Check if someone created it already while we were dealing with
295          * allocating @obj. 
296          */
297         cfs_spin_lock(&obj_list_lock);
298         obj = __lmv_object_find(obd, fid);
299         if (obj) {
300                 /* 
301                  * Someone created it already - put @obj and getting out. 
302                  */
303                 cfs_spin_unlock(&obj_list_lock);
304                 lmv_object_free(new);
305                 RETURN(obj);
306         }
307
308         __lmv_object_add(new);
309         __lmv_object_get(new);
310
311         cfs_spin_unlock(&obj_list_lock);
312
313         CDEBUG(D_INODE, "New obj in lmv cache: "DFID"\n",
314                PFID(fid));
315
316         RETURN(new);
317 }
318
319 struct lmv_object *lmv_object_create(struct obd_export *exp, 
320                                      const struct lu_fid *fid,
321                                      struct lmv_stripe_md *mea)
322 {
323         struct obd_device       *obd = exp->exp_obd;
324         struct lmv_obd          *lmv = &obd->u.lmv;
325         struct ptlrpc_request   *req = NULL;
326         struct lmv_tgt_desc     *tgt;
327         struct lmv_object       *obj;
328         struct lustre_md         md;
329         int                      mealen;
330         int                      rc;
331         ENTRY;
332
333         CDEBUG(D_INODE, "Get mea for "DFID" and create lmv obj\n",
334                PFID(fid));
335
336         md.mea = NULL;
337
338         if (mea == NULL) {
339                 struct md_op_data *op_data;
340                 __u64 valid;
341
342                 CDEBUG(D_INODE, "Mea isn't passed in, get it now\n");
343                 mealen = lmv_get_easize(lmv);
344
345                 /*
346                  * Time to update mea of parent fid.
347                  */
348                 md.mea = NULL;
349                 valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
350
351                 tgt = lmv_find_target(lmv, fid);
352                 if (IS_ERR(tgt))
353                         GOTO(cleanup, obj = (void *)tgt);
354
355                 OBD_ALLOC_PTR(op_data);
356                 if (op_data == NULL)
357                         GOTO(cleanup, obj = ERR_PTR(-ENOMEM));
358
359                 op_data->op_fid1 = *fid;
360                 op_data->op_mode = mealen;
361                 op_data->op_valid = valid;
362                 rc = md_getattr(tgt->ltd_exp, op_data, &req);
363                 OBD_FREE_PTR(op_data);
364                 if (rc) {
365                         CERROR("md_getattr() failed, error %d\n", rc);
366                         GOTO(cleanup, obj = ERR_PTR(rc));
367                 }
368
369                 rc = md_get_lustre_md(exp, req, NULL, exp, &md);
370                 if (rc) {
371                         CERROR("md_get_lustre_md() failed, error %d\n", rc);
372                         GOTO(cleanup, obj = ERR_PTR(rc));
373                 }
374
375                 if (md.mea == NULL)
376                         GOTO(cleanup, obj = ERR_PTR(-ENODATA));
377
378                 mea = md.mea;
379         }
380
381         /*
382          * Got mea, now create obj for it.
383          */
384         obj = __lmv_object_create(obd, fid, mea);
385         if (!obj) {
386                 CERROR("Can't create new object "DFID"\n",
387                        PFID(fid));
388                 GOTO(cleanup, obj = ERR_PTR(-ENOMEM));
389         }
390
391         if (md.mea != NULL)
392                 obd_free_memmd(exp, (void *)&md.mea);
393
394         EXIT;
395 cleanup:
396         if (req)
397                 ptlrpc_req_finished(req);
398         return obj;
399 }
400
401 int lmv_object_delete(struct obd_export *exp, const struct lu_fid *fid)
402 {
403         struct obd_device       *obd = exp->exp_obd;
404         struct lmv_object       *obj;
405         int                      rc = 0;
406         ENTRY;
407
408         cfs_spin_lock(&obj_list_lock);
409         obj = __lmv_object_find(obd, fid);
410         if (obj) {
411                 obj->lo_state |= O_FREEING;
412                 __lmv_object_put(obj);
413                 __lmv_object_put(obj);
414                 rc = 1;
415         }
416         cfs_spin_unlock(&obj_list_lock);
417         RETURN(rc);
418 }
419
420 int lmv_object_setup(struct obd_device *obd)
421 {
422         ENTRY;
423         LASSERT(obd != NULL);
424
425         CDEBUG(D_INFO, "LMV object manager setup (%s)\n",
426                obd->obd_uuid.uuid);
427
428         RETURN(0);
429 }
430
431 void lmv_object_cleanup(struct obd_device *obd)
432 {
433         cfs_list_t              *cur;
434         cfs_list_t              *tmp;
435         struct lmv_object       *obj;
436         ENTRY;
437
438         CDEBUG(D_INFO, "LMV object manager cleanup (%s)\n",
439                obd->obd_uuid.uuid);
440
441         cfs_spin_lock(&obj_list_lock);
442         cfs_list_for_each_safe(cur, tmp, &obj_list) {
443                 obj = cfs_list_entry(cur, struct lmv_object, lo_list);
444
445                 if (obj->lo_obd != obd)
446                         continue;
447
448                 obj->lo_state |= O_FREEING;
449                 if (cfs_atomic_read(&obj->lo_count) > 1) {
450                         CERROR("Object "DFID" has count (%d)\n", 
451                                PFID(&obj->lo_fid),
452                                cfs_atomic_read(&obj->lo_count));
453                 }
454                 __lmv_object_put(obj);
455         }
456         cfs_spin_unlock(&obj_list_lock);
457         EXIT;
458 }