4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright 2009 Sun Microsystems, Inc. All rights reserved
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2012, Intel, Inc.
29 * lustre/lod/lod_object.c
31 * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
35 # define EXPORT_SYMTAB
37 #define DEBUG_SUBSYSTEM S_MDS
40 #include <obd_class.h>
41 #include <lustre_ver.h>
42 #include <obd_support.h>
43 #include <lprocfs_status.h>
45 #include <lustre_fid.h>
46 #include <lustre_param.h>
47 #include <lustre_fid.h>
50 #include "lod_internal.h"
/* Memory cache for LOD objects; lod_object_free() returns objects to it. */
52 extern cfs_mem_cache_t *lod_object_kmem;
/* Forward declaration: the table is defined near the end of this file but
 * is referenced earlier by lod_declare_object_create(). */
53 static const struct dt_body_operations lod_body_lnk_ops;
/*
 * Index lookup on a LOD object.  The request is passed unchanged to the
 * dio_lookup method of the child object returned by dt_object_child(dt),
 * and that method's return value is returned.
 */
55 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
56 struct dt_rec *rec, const struct dt_key *key,
57 struct lustre_capa *capa)
59 struct dt_object *next = dt_object_child(dt);
60 return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
/*
 * Declare an index insertion: rec, key and the transaction handle are
 * forwarded to dt_declare_insert() on the child object, whose result is
 * returned.
 */
63 static int lod_declare_index_insert(const struct lu_env *env,
65 const struct dt_rec *rec,
66 const struct dt_key *key,
67 struct thandle *handle)
69 return dt_declare_insert(env, dt_object_child(dt), rec, key, handle);
/*
 * Insert a record into the index: all arguments (rec, key, th, capa, ign)
 * are forwarded to dt_insert() on the child object, whose result is
 * returned.
 */
72 static int lod_index_insert(const struct lu_env *env,
74 const struct dt_rec *rec,
75 const struct dt_key *key,
77 struct lustre_capa *capa,
80 return dt_insert(env, dt_object_child(dt), rec, key, th, capa, ign);
/*
 * Declare removal of an index entry: key and th are forwarded to
 * dt_declare_delete() on the child object, whose result is returned.
 */
83 static int lod_declare_index_delete(const struct lu_env *env,
85 const struct dt_key *key,
88 return dt_declare_delete(env, dt_object_child(dt), key, th);
/*
 * Remove an index entry: key, th and capa are forwarded to dt_delete() on
 * the child object, whose result is returned.
 */
91 static int lod_index_delete(const struct lu_env *env,
93 const struct dt_key *key,
95 struct lustre_capa *capa)
97 return dt_delete(env, dt_object_child(dt), key, th, capa);
/*
 * Create an index iterator by calling the child object's dio_it.init with
 * the same attr and capa; the iterator it returns is handed back as is.
 * lod_index_try() installs this function as lod_index_ops.dio_it.init.
 */
100 static struct dt_it *lod_it_init(const struct lu_env *env,
101 struct dt_object *dt, __u32 attr,
102 struct lustre_capa *capa)
104 struct dt_object *next = dt_object_child(dt);
106 return next->do_index_ops->dio_it.init(env, next, attr, capa);
/*
 * Index operations that lod_index_try() attaches to LOD objects.  Only the
 * lookup/insert/delete slots are initialised in the entries visible here;
 * the dio_it member is filled in at run time by lod_index_try(), which is
 * why the table is not const.
 */
109 static struct dt_index_operations lod_index_ops = {
110 .dio_lookup = lod_index_lookup,
111 .dio_declare_insert = lod_declare_index_insert,
112 .dio_insert = lod_index_insert,
113 .dio_declare_delete = lod_declare_index_delete,
114 .dio_delete = lod_index_delete,
/* Take the read lock of the child object, passing role through unchanged. */
120 static void lod_object_read_lock(const struct lu_env *env,
121 struct dt_object *dt, unsigned role)
123 dt_read_lock(env, dt_object_child(dt), role);
/* Take the write lock of the child object, passing role through unchanged. */
126 static void lod_object_write_lock(const struct lu_env *env,
127 struct dt_object *dt, unsigned role)
129 dt_write_lock(env, dt_object_child(dt), role);
/* Release the read lock of the child object. */
132 static void lod_object_read_unlock(const struct lu_env *env,
133 struct dt_object *dt)
135 dt_read_unlock(env, dt_object_child(dt));
/* Release the write lock of the child object. */
138 static void lod_object_write_unlock(const struct lu_env *env,
139 struct dt_object *dt)
141 dt_write_unlock(env, dt_object_child(dt));
/* Return whatever dt_write_locked() reports for the child object. */
144 static int lod_object_write_locked(const struct lu_env *env,
145 struct dt_object *dt)
147 return dt_write_locked(env, dt_object_child(dt));
/*
 * Read attributes into @attr by calling dt_attr_get() on the child object;
 * its return value is returned unchanged.
 */
150 static int lod_attr_get(const struct lu_env *env,
151 struct dt_object *dt,
152 struct lu_attr *attr,
153 struct lustre_capa *capa)
155 return dt_attr_get(env, dt_object_child(dt), attr, capa);
/*
 * Declare an attribute update in the transaction described by @handle.
 * The visible steps are: declare the change on the child (local) object,
 * load the striping with lod_load_striping(), then declare the same change
 * on every stripe in lo->ldo_stripe[].
 * NOTE(review): the checks of rc between these steps are not visible here.
 */
158 static int lod_declare_attr_set(const struct lu_env *env,
159 struct dt_object *dt,
160 const struct lu_attr *attr,
161 struct thandle *handle)
163 struct dt_object *next = dt_object_child(dt);
164 struct lod_object *lo = lod_dt_obj(dt);
169 * declare setattr on the local object
171 rc = dt_declare_attr_set(env, next, attr, handle);
176 * load striping information, notice we don't do this when object
177 * is being initialized as we don't need this information till
178 * few specific cases like destroy, chown
180 rc = lod_load_striping(env, lo);
185 * if object is striped declare changes on the stripes
/* a non-zero stripe count requires the stripe array to be present */
187 LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
188 for (i = 0; i < lo->ldo_stripenr; i++) {
189 LASSERT(lo->ldo_stripe[i]);
190 rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle);
192 CERROR("failed declaration: %d\n", rc);
/*
 * Apply an attribute update: first to the child (local) object through
 * dt_attr_set(), then to each stripe in lo->ldo_stripe[] with the same
 * attr, transaction handle and capability.
 * NOTE(review): the checks of rc between these steps are not visible here.
 */
200 static int lod_attr_set(const struct lu_env *env,
201 struct dt_object *dt,
202 const struct lu_attr *attr,
203 struct thandle *handle,
204 struct lustre_capa *capa)
206 struct dt_object *next = dt_object_child(dt);
207 struct lod_object *lo = lod_dt_obj(dt);
212 * apply changes to the local object
214 rc = dt_attr_set(env, next, attr, handle, capa);
219 * if object is striped, apply changes to all the stripes
/* a non-zero stripe count requires the stripe array to be present */
221 LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
222 for (i = 0; i < lo->ldo_stripenr; i++) {
223 LASSERT(lo->ldo_stripe[i]);
224 rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
/* this is the execution phase, not the declaration, so the message must
 * not be confused with the one logged by lod_declare_attr_set() */
226 CERROR("failed setattr: %d\n", rc);
/*
 * Read extended attribute @name into @buf by calling dt_xattr_get() on the
 * child object; its return value is returned unchanged.
 */
234 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
235 struct lu_buf *buf, const char *name,
236 struct lustre_capa *capa)
238 return dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
242 * LOV xattr is a storage for striping, and LOD owns this xattr.
243 * but LOD allows others to control striping to some extent
245 * - to set new defined striping
246 * - to set new semi-defined striping
247 * - number of stripes is defined
248 * - number of stripes + osts are defined
/*
 * Declare setting of extended attribute @name.  In the lines visible here
 * the declaration is forwarded, with buf, name, fl and th unchanged, to
 * dt_declare_xattr_set() on the child object.
 */
251 static int lod_declare_xattr_set(const struct lu_env *env,
252 struct dt_object *dt,
253 const struct lu_buf *buf,
254 const char *name, int fl,
257 struct dt_object *next = dt_object_child(dt);
261 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
/*
 * Set extended attribute @name.  In the lines visible here the request is
 * forwarded, with all arguments unchanged, to dt_xattr_set() on the child
 * object.
 */
266 static int lod_xattr_set(const struct lu_env *env,
267 struct dt_object *dt, const struct lu_buf *buf,
268 const char *name, int fl, struct thandle *th,
269 struct lustre_capa *capa)
271 struct dt_object *next = dt_object_child(dt);
276 * behave transparantly for all other EAs
278 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
/*
 * Declare removal of extended attribute @name: forwarded to
 * dt_declare_xattr_del() on the child object, whose result is returned.
 */
283 static int lod_declare_xattr_del(const struct lu_env *env,
284 struct dt_object *dt, const char *name,
287 return dt_declare_xattr_del(env, dt_object_child(dt), name, th);
/*
 * Remove extended attribute @name: forwarded to dt_xattr_del() on the child
 * object, whose result is returned.
 */
290 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
291 const char *name, struct thandle *th,
292 struct lustre_capa *capa)
294 return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
/*
 * List extended attributes into @buf: forwarded to dt_xattr_list() on the
 * child object, whose result is returned.
 */
297 static int lod_xattr_list(const struct lu_env *env,
298 struct dt_object *dt, struct lu_buf *buf,
299 struct lustre_capa *capa)
301 return dt_xattr_list(env, dt_object_child(dt), buf, capa);
/*
 * Replace the pool name kept in o->ldo_pool.  The old string is released
 * with its strlen() + 1 size, then a buffer of len + 1 bytes is allocated
 * and @pool is copied into it.  lod_object_free() calls this with a NULL
 * @pool to drop the name.
 * NOTE(review): the NULL guards around the free and the allocation, the
 * computation of len for the new name and the return statements are not
 * visible here - confirm against the full function.
 */
304 int lod_object_set_pool(struct lod_object *o, char *pool)
309 len = strlen(o->ldo_pool);
310 OBD_FREE(o->ldo_pool, len + 1);
315 OBD_ALLOC(o->ldo_pool, len + 1);
316 if (o->ldo_pool == NULL)
/* the buffer was sized as len + 1, leaving room for the terminator */
318 strcpy(o->ldo_pool, pool);
/*
 * Return non-zero when @is_reg is set and the sequence of @fid is not
 * FID_SEQ_LOCAL_FILE; lod_ah_init() uses this to decide whether default
 * striping has to be transferred to a new object.
 */
323 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
325 return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
/*
 * Load the default striping stored in the LOV EA of @lp and cache it in the
 * object: ldo_def_stripenr, ldo_def_stripe_size, ldo_def_stripe_offset and,
 * for a V3 EA with a non-empty pool name, the pool.  The work is done under
 * the write lock of the child object and is skipped when
 * ldo_striping_cached is already set.
 *
 * An EA shorter than struct lov_user_md is recorded as "no default
 * striping" (zero count and size, offset -1, marked as cached).  An EA whose
 * magic or pattern is not accepted leaves the cached fields untouched.
 */
328 static int lod_cache_parent_striping(const struct lu_env *env,
329 struct lod_object *lp)
331 struct lov_user_md_v1 *v1 = NULL;
332 struct lov_user_md_v3 *v3 = NULL;
336 /* dt_ah_init() is called from MDD without parent being write locked */
338 dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
339 if (lp->ldo_striping_cached)
340 GOTO(unlock, rc = 0);
342 rc = lod_get_lov_ea(env, lp);
/* NOTE(review): if rc is a signed int, the comparison with sizeof() below
 * is done unsigned, so a negative rc would not take this branch;
 * presumably errors are filtered out before this point - confirm. */
346 if (rc < sizeof(struct lov_user_md)) {
347 /* don't lookup for non-existing or invalid striping */
348 lp->ldo_def_striping_set = 0;
349 lp->ldo_striping_cached = 1;
350 lp->ldo_def_stripe_size = 0;
351 lp->ldo_def_stripenr = 0;
352 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
353 GOTO(unlock, rc = 0);
356 v1 = (struct lov_user_md_v1 *)lod_env_info(env)->lti_ea_store;
/* a magic that only matches after __swab32() means the EA is stored in the
 * opposite byte order and has to be swabbed before use */
357 if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1))
358 lustre_swab_lov_user_md_v1(v1);
359 else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3))
/* v3 is still NULL here, so swab the EA buffer itself viewed as V3; v3 is
 * deliberately left unset because the debug message below uses it to tell
 * whether a V3 EA was parsed */
360 lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)v1);
362 if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
363 GOTO(unlock, rc = 0);
365 if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
366 GOTO(unlock, rc = 0);
368 lp->ldo_def_stripenr = v1->lmm_stripe_count;
369 lp->ldo_def_stripe_size = v1->lmm_stripe_size;
370 lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
371 lp->ldo_striping_cached = 1;
372 lp->ldo_def_striping_set = 1;
374 if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
375 /* XXX: sanity check here */
376 v3 = (struct lov_user_md_v3 *) v1;
377 if (v3->lmm_pool_name[0])
378 lod_object_set_pool(lp, v3->lmm_pool_name);
381 CDEBUG(D_OTHER, "def. striping: # %d, sz %d, off %d %s%s on "DFID"\n",
382 lp->ldo_def_stripenr, lp->ldo_def_stripe_size,
383 lp->ldo_def_stripe_offset, v3 ? "from " : "",
384 v3 ? lp->ldo_pool : "", PFID(lu_object_fid(&lp->ldo_obj.do_lu)));
388 dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
393 * used to transfer default striping data to the object being created
/*
 * Allocation-hint hook: hands default striping to the object being created.
 *
 * - If the child's lower object does not exist yet, its own do_ah_init is
 *   called first (with the parent's lower object, or NULL without parent).
 * - For a directory, the parent's cached default striping (pool, stripe
 *   count, stripe size, offset) is copied into the child's ldo_def_* fields.
 * - For an object accepted by lod_object_will_be_striped(), the parent's
 *   defaults, when set, become the child's ldo_stripenr / ldo_stripe_size;
 *   values still zero afterwards are taken from desc->ld_default_*.
 *
 * @parent may be NULL; every use of the parent object is guarded, including
 * the directory branch.
 * NOTE(review): the assignment of desc is not visible here.
 */
395 static void lod_ah_init(const struct lu_env *env,
396 struct dt_allocation_hint *ah,
397 struct dt_object *parent,
398 struct dt_object *child,
399 cfs_umode_t child_mode)
401 struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
402 struct dt_object *nextp = NULL;
403 struct dt_object *nextc;
404 struct lod_object *lp = NULL;
405 struct lod_object *lc;
406 struct lov_desc *desc;
411 if (likely(parent)) {
412 nextp = dt_object_child(parent);
413 lp = lod_dt_obj(parent);
416 nextc = dt_object_child(child);
417 lc = lod_dt_obj(child);
/* the child must not carry any striping yet */
419 LASSERT(lc->ldo_stripenr == 0);
420 LASSERT(lc->ldo_stripe == NULL);
423 * local object may want some hints
424 * in case of late striping creation, ->ah_init()
425 * can be called with local object existing
427 if (!dt_object_exists(nextc))
428 nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
430 if (S_ISDIR(child_mode)) {
/* lp stays NULL when no parent was given, so test it before use */
431 if (lp != NULL && lp->ldo_striping_cached == 0) {
432 /* we haven't tried to get default striping for
433 * the directory yet, let's cache it in the object */
434 lod_cache_parent_striping(env, lp);
436 /* transfer defaults to new directory */
437 if (lp != NULL && lp->ldo_striping_cached) {
439 lod_object_set_pool(lc, lp->ldo_pool);
440 lc->ldo_def_stripenr = lp->ldo_def_stripenr;
441 lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
442 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
443 lc->ldo_striping_cached = 1;
444 lc->ldo_def_striping_set = 1;
445 CDEBUG(D_OTHER, "inherite striping defaults\n");
451 * if object is going to be striped over OSTs, transfer default
452 * striping information to the child, so that we can use it
453 * during declaration and creation
455 if (!lod_object_will_be_striped(S_ISREG(child_mode),
456 lu_object_fid(&child->do_lu)))
460 * try from the parent
462 if (likely(parent)) {
463 if (lp->ldo_striping_cached == 0) {
464 /* we haven't tried to get default striping for
465 * the directory yet, let's cache it in the object */
466 lod_cache_parent_striping(env, lp);
/* start from offset -1; overwritten below when the parent has defaults */
469 lc->ldo_def_stripe_offset = (__u16) -1;
471 if (lp->ldo_def_striping_set) {
473 lod_object_set_pool(lc, lp->ldo_pool);
474 lc->ldo_stripenr = lp->ldo_def_stripenr;
475 lc->ldo_stripe_size = lp->ldo_def_stripe_size;
476 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
477 CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
478 lc->ldo_stripenr, lc->ldo_stripe_size,
479 lp->ldo_pool ? lp->ldo_pool : "");
484 * if the parent doesn't provide with specific pattern, grab fs-wide one
487 if (lc->ldo_stripenr == 0)
488 lc->ldo_stripenr = desc->ld_default_stripe_count;
489 if (lc->ldo_stripe_size == 0)
490 lc->ldo_stripe_size = desc->ld_default_stripe_size;
491 CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
492 lc->ldo_stripenr, lc->ldo_stripe_size,
493 lc->ldo_pool ? lc->ldo_pool : "");
/*
 * Declare creation of an object.  The lower object must not exist yet
 * (asserted); its creation is declared with dt_declare_create() using the
 * same attr, hint, dof and th.  When the format type is DFT_SYM the LOD
 * object gets lod_body_lnk_ops as its body operations.
 * NOTE(review): the handling of rc is not visible here.
 */
498 static int lod_declare_object_create(const struct lu_env *env,
499 struct dt_object *dt,
500 struct lu_attr *attr,
501 struct dt_allocation_hint *hint,
502 struct dt_object_format *dof,
505 struct dt_object *next = dt_object_child(dt);
512 LASSERT(!dt_object_exists(next));
515 * first of all, we declare creation of local object
517 rc = dt_declare_create(env, next, attr, hint, dof, th);
521 if (dof->dof_type == DFT_SYM)
522 dt->do_body_ops = &lod_body_lnk_ops;
/*
 * Create the object: in the lines visible here the lower (local) object is
 * created with dt_create() using the same attr, hint, dof and th.
 */
528 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
529 struct lu_attr *attr,
530 struct dt_allocation_hint *hint,
531 struct dt_object_format *dof, struct thandle *th)
533 struct dt_object *next = dt_object_child(dt);
537 /* create local object */
538 rc = dt_create(env, next, attr, hint, dof, th);
/*
 * Declare destruction of an object: first for the child (local) object,
 * then, after loading the striping with lod_load_striping(), for each of
 * the lo->ldo_stripenr stripes in lo->ldo_stripe[].
 * NOTE(review): the checks of rc between these steps are not visible here.
 */
543 static int lod_declare_object_destroy(const struct lu_env *env,
544 struct dt_object *dt,
547 struct dt_object *next = dt_object_child(dt);
548 struct lod_object *lo = lod_dt_obj(dt);
553 * we declare destroy for the local object
555 rc = dt_declare_destroy(env, next, th);
560 * load striping information, notice we don't do this when object
561 * is being initialized as we don't need this information till
562 * few specific cases like destroy, chown
564 rc = lod_load_striping(env, lo);
568 /* declare destroy for all underlying objects */
569 for (i = 0; i < lo->ldo_stripenr; i++) {
570 LASSERT(lo->ldo_stripe[i]);
571 rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
/*
 * Destroy an object: the child (local) object first, then each of the
 * lo->ldo_stripenr stripes in lo->ldo_stripe[], all within transaction th.
 * NOTE(review): the checks of rc between these steps are not visible here.
 */
580 static int lod_object_destroy(const struct lu_env *env,
581 struct dt_object *dt, struct thandle *th)
583 struct dt_object *next = dt_object_child(dt);
584 struct lod_object *lo = lod_dt_obj(dt);
588 /* destroy local object */
589 rc = dt_destroy(env, next, th);
593 /* destroy all underlying objects */
594 for (i = 0; i < lo->ldo_stripenr; i++) {
595 LASSERT(lo->ldo_stripe[i]);
596 rc = dt_destroy(env, lo->ldo_stripe[i], th);
/*
 * Ask the child object whether it supports the index features in @feat and,
 * if the child ends up with index operations while the LOD object has none,
 * attach lod_index_ops to the LOD object.
 *
 * The iterator methods of the shared, static lod_index_ops table are filled
 * in lazily: while dio_it.fini is still NULL, the child's whole dio_it set
 * is copied and only init is replaced by lod_it_init().
 */
604 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
605 const struct dt_index_features *feat)
607 struct dt_object *next = dt_object_child(dt);
611 LASSERT(next->do_ops);
612 LASSERT(next->do_ops->do_index_try);
614 rc = next->do_ops->do_index_try(env, next, feat);
615 if (next->do_index_ops && dt->do_index_ops == NULL) {
616 dt->do_index_ops = &lod_index_ops;
617 /* XXX: iterators don't accept device, so bypass LOD */
618 /* will be fixed with DNE */
619 if (lod_index_ops.dio_it.fini == NULL) {
620 lod_index_ops.dio_it = next->do_index_ops->dio_it;
621 lod_index_ops.dio_it.init = lod_it_init;
/* Declare a reference increment: forwarded to dt_declare_ref_add() on the
 * child object, whose result is returned. */
628 static int lod_declare_ref_add(const struct lu_env *env,
629 struct dt_object *dt, struct thandle *th)
631 return dt_declare_ref_add(env, dt_object_child(dt), th);
/* Increment the reference count: forwarded to dt_ref_add() on the child
 * object, whose result is returned. */
634 static int lod_ref_add(const struct lu_env *env,
635 struct dt_object *dt, struct thandle *th)
637 return dt_ref_add(env, dt_object_child(dt), th);
/* Declare a reference decrement: forwarded to dt_declare_ref_del() on the
 * child object, whose result is returned. */
640 static int lod_declare_ref_del(const struct lu_env *env,
641 struct dt_object *dt, struct thandle *th)
643 return dt_declare_ref_del(env, dt_object_child(dt), th);
/* Decrement the reference count: forwarded to dt_ref_del() on the child
 * object, whose result is returned. */
646 static int lod_ref_del(const struct lu_env *env,
647 struct dt_object *dt, struct thandle *th)
649 return dt_ref_del(env, dt_object_child(dt), th);
/*
 * Obtain a capability: old and opc are forwarded to dt_capa_get() on the
 * child object and the capability it returns is handed back unchanged.
 */
652 static struct obd_capa *lod_capa_get(const struct lu_env *env,
653 struct dt_object *dt,
654 struct lustre_capa *old, __u64 opc)
656 return dt_capa_get(env, dt_object_child(dt), old, opc);
/* Sync the object: forwarded to dt_object_sync() on the child object. */
659 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt)
661 return dt_object_sync(env, dt_object_child(dt));
/*
 * Object operations of the LOD layer: each dt_object_operations slot is
 * bound to the lod_* function of this file that implements it.  The table
 * has external linkage (it is not static).
 */
664 struct dt_object_operations lod_obj_ops = {
665 .do_read_lock = lod_object_read_lock,
666 .do_write_lock = lod_object_write_lock,
667 .do_read_unlock = lod_object_read_unlock,
668 .do_write_unlock = lod_object_write_unlock,
669 .do_write_locked = lod_object_write_locked,
670 .do_attr_get = lod_attr_get,
671 .do_declare_attr_set = lod_declare_attr_set,
672 .do_attr_set = lod_attr_set,
673 .do_xattr_get = lod_xattr_get,
674 .do_declare_xattr_set = lod_declare_xattr_set,
675 .do_xattr_set = lod_xattr_set,
676 .do_declare_xattr_del = lod_declare_xattr_del,
677 .do_xattr_del = lod_xattr_del,
678 .do_xattr_list = lod_xattr_list,
679 .do_ah_init = lod_ah_init,
680 .do_declare_create = lod_declare_object_create,
681 .do_create = lod_object_create,
682 .do_declare_destroy = lod_declare_object_destroy,
683 .do_destroy = lod_object_destroy,
684 .do_index_try = lod_index_try,
685 .do_declare_ref_add = lod_declare_ref_add,
686 .do_ref_add = lod_ref_add,
687 .do_declare_ref_del = lod_declare_ref_del,
688 .do_ref_del = lod_ref_del,
689 .do_capa_get = lod_capa_get,
690 .do_object_sync = lod_object_sync,
/*
 * Read object body data: forwarded to the dbo_read method of the child
 * object with the same buf, pos and capa; its result is returned.
 */
693 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
694 struct lu_buf *buf, loff_t *pos,
695 struct lustre_capa *capa)
697 struct dt_object *next = dt_object_child(dt);
698 return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
/*
 * Declare a body write: forwarded to dt_declare_record_write() on the child
 * object.  The remaining arguments of that call are on a line not visible
 * here.
 */
701 static ssize_t lod_declare_write(const struct lu_env *env,
702 struct dt_object *dt,
703 const loff_t size, loff_t pos,
706 return dt_declare_record_write(env, dt_object_child(dt),
/*
 * Write object body data: forwarded to the dbo_write method of the child
 * object with the same buf, pos, th, capa and iq; its result is returned.
 */
710 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
711 const struct lu_buf *buf, loff_t *pos,
712 struct thandle *th, struct lustre_capa *capa, int iq)
714 struct dt_object *next = dt_object_child(dt);
716 return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
/*
 * Body operations used for symbolic links: installed by lod_object_start()
 * for objects whose mode is S_IFLNK and by lod_declare_object_create() for
 * the DFT_SYM format.
 */
719 static const struct dt_body_operations lod_body_lnk_ops = {
720 .dbo_read = lod_read,
721 .dbo_declare_write = lod_declare_write,
722 .dbo_write = lod_write
/*
 * Initialise the LOD slice of an object: the device below LOD
 * (d->lod_child) allocates its own slice for the same object header, and
 * that slice is linked under @o with lu_object_add().
 * NOTE(review): the check of the allocation result is not visible here.
 */
725 static int lod_object_init(const struct lu_env *env, struct lu_object *o,
726 const struct lu_object_conf *conf)
728 struct lod_device *d = lu2lod_dev(o->lo_dev);
729 struct lu_object *below;
730 struct lu_device *under;
734 * create local object
736 under = &d->lod_child->dd_lu_dev;
737 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
741 lu_object_add(o, below);
/*
 * Drop the striping attached to @lo: every non-NULL stripe object gets
 * lu_object_put(), the pointer array (sized by ldo_stripes_allocated) is
 * freed, and ldo_stripe, ldo_stripes_allocated and ldo_stripenr are reset.
 */
746 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
750 if (lo->ldo_stripe) {
751 LASSERT(lo->ldo_stripes_allocated > 0);
753 for (i = 0; i < lo->ldo_stripenr; i++) {
754 if (lo->ldo_stripe[i])
755 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
/* i is reused to hold the byte size of the pointer array */
758 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
759 OBD_FREE(lo->ldo_stripe, i);
760 lo->ldo_stripe = NULL;
761 lo->ldo_stripes_allocated = 0;
763 lo->ldo_stripenr = 0;
767 * ->start is called once all slices are initialized, including header's
768 * cache for mode (object type). using the type we can initialize ops
/*
 * Select body operations from the object type cached in the header: when
 * loh_attr says the object is a symbolic link, lod_body_lnk_ops is
 * installed on the LOD object.
 */
770 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
772 if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
773 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
/*
 * Free the LOD slice of an object: the striping is released with
 * lod_object_free_striping(), the pool name is dropped by passing NULL to
 * lod_object_set_pool(), and the structure goes back to lod_object_kmem.
 */
777 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
779 struct lod_object *mo = lu2lod_obj(o);
782 * release all underlying object pinned
785 lod_object_free_striping(env, mo);
787 lod_object_set_pool(mo, NULL);
790 OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
/*
 * Release hook of the LOD slice.  No statement is visible in the body; the
 * existing XXX note below records the open question of whether resources
 * of a failed creation should be released here.
 */
793 static void lod_object_release(const struct lu_env *env, struct lu_object *o)
795 /* XXX: shouldn't we release everything here in case if object
796 * creation failed before? */
/*
 * Debug printer: emits the LOD layer name followed by "-object@" and the
 * address of the lod_object through printer @p, and returns @p's result.
 * The cast only drops the const qualifier of @l for lu2lod_obj().
 */
799 static int lod_object_print(const struct lu_env *env, void *cookie,
800 lu_printer_t p, const struct lu_object *l)
802 struct lod_object *o = lu2lod_obj((struct lu_object *) l);
804 return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
/*
 * lu_object operations of the LOD layer: binds the init, start, free,
 * release and print hooks to the lod_object_* functions of this file.
 */
807 struct lu_object_operations lod_lu_obj_ops = {
808 .loo_object_init = lod_object_init,
809 .loo_object_start = lod_object_start,
810 .loo_object_free = lod_object_free,
811 .loo_object_release = lod_object_release,
812 .loo_object_print = lod_object_print,