4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright 2009 Sun Microsystems, Inc. All rights reserved
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2013, Intel Corporation.
29 * lustre/lod/lod_object.c
31 * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
34 #define DEBUG_SUBSYSTEM S_MDS
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46 #include <md_object.h>
47 #include <lustre_linkea.h>
49 #include "lod_internal.h"
/*
 * File-scope names: the "." and ".." entry-name strings, the kmem cache
 * lod_object_kmem (declared extern here), and a forward declaration of the
 * body operations table lod_body_lnk_ops.
 */
51 static const char dot[] = ".";
52 static const char dotdot[] = "..";
54 extern struct kmem_cache *lod_object_kmem;
55 static const struct dt_body_operations lod_body_lnk_ops;
/*
 * dio_lookup entry of the LOD index tables.
 *
 * Forwards the lookup unchanged to the object returned by
 * dt_object_child(dt) and returns that object's dio_lookup() result.
 */
57 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
58 struct dt_rec *rec, const struct dt_key *key,
59 struct lustre_capa *capa)
61 struct dt_object *next = dt_object_child(dt);
62 return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
/*
 * dio_declare_insert entry of the LOD index tables.
 *
 * Declares the insertion of (key, rec) on the child object via
 * dt_declare_insert() and returns its result.
 */
65 static int lod_declare_index_insert(const struct lu_env *env,
67 const struct dt_rec *rec,
68 const struct dt_key *key,
69 struct thandle *handle)
71 return dt_declare_insert(env, dt_object_child(dt), rec, key, handle);
/*
 * dio_insert entry of the LOD index tables.
 *
 * Inserts (key, rec) into the child object via dt_insert(), passing the
 * transaction handle, capability and ign arguments through unchanged.
 */
74 static int lod_index_insert(const struct lu_env *env,
76 const struct dt_rec *rec,
77 const struct dt_key *key,
79 struct lustre_capa *capa,
82 return dt_insert(env, dt_object_child(dt), rec, key, th, capa, ign);
/*
 * dio_declare_delete entry of the LOD index tables.
 *
 * Declares the deletion of key on the child object via dt_declare_delete().
 */
85 static int lod_declare_index_delete(const struct lu_env *env,
87 const struct dt_key *key,
90 return dt_declare_delete(env, dt_object_child(dt), key, th);
/*
 * dio_delete entry of the LOD index tables.
 *
 * Deletes key from the child object via dt_delete() and returns its result.
 */
93 static int lod_index_delete(const struct lu_env *env,
95 const struct dt_key *key,
97 struct lustre_capa *capa)
99 return dt_delete(env, dt_object_child(dt), key, th, capa);
/*
 * dio_it.init for a non-striped index object.
 *
 * Asks the child object (dt_object_child(dt)) to create its own iterator and
 * wraps it in the per-thread struct lod_it kept in lod_env_info(env)->lti_it.
 * The slot must be free on entry (lit_obj == NULL is asserted), so a thread
 * can have only one LOD iterator active at a time. The slot itself is
 * returned, cast to struct dt_it *.
 */
102 static struct dt_it *lod_it_init(const struct lu_env *env,
103 struct dt_object *dt, __u32 attr,
104 struct lustre_capa *capa)
106 struct dt_object *next = dt_object_child(dt);
107 struct lod_it *it = &lod_env_info(env)->lti_it;
108 struct dt_it *it_next;
111 it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
115 /* currently we do not use more than one iterator per thread
116 * so we store it in thread info. if at some point we need
117 * more active iterators in a single thread, we can allocate
119 LASSERT(it->lit_obj == NULL);
121 it->lit_it = it_next;
124 return (struct dt_it *)it;
/*
 * Sanity check used by the lod_it_*() wrappers: asserts that the iterator
 * wrapper is bound to an object (lit_obj) and to a lower-layer iterator
 * (lit_it).
 */
127 #define LOD_CHECK_IT(env, it) \
129 LASSERT((it)->lit_obj != NULL); \
130 LASSERT((it)->lit_it != NULL); \
/*
 * dio_it.fini for a non-striped index object.
 *
 * Validates the wrapper, then finalizes the lower-layer iterator through the
 * dio_it.fini method of the object stored in lit_obj.
 */
133 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
135 struct lod_it *it = (struct lod_it *)di;
137 LOD_CHECK_IT(env, it);
138 it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
140 /* the iterator not in use any more */
/*
 * dio_it.get: validates the wrapper and forwards the positioning request for
 * key to the lower-layer iterator, returning its result.
 */
145 int lod_it_get(const struct lu_env *env, struct dt_it *di,
146 const struct dt_key *key)
148 const struct lod_it *it = (const struct lod_it *)di;
150 LOD_CHECK_IT(env, it);
151 return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
/*
 * dio_it.put: validates the wrapper and forwards the put to the lower-layer
 * iterator.
 *
 * The lower-layer call is a plain statement: this function returns void, and
 * ISO C does not allow a return statement with an expression in a void
 * function.
 */
154 void lod_it_put(const struct lu_env *env, struct dt_it *di)
156 struct lod_it *it = (struct lod_it *)di;
158 LOD_CHECK_IT(env, it);
159 it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
/*
 * dio_it.next: validates the wrapper and advances the lower-layer iterator,
 * returning its result.
 */
162 int lod_it_next(const struct lu_env *env, struct dt_it *di)
164 struct lod_it *it = (struct lod_it *)di;
166 LOD_CHECK_IT(env, it);
167 return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
/*
 * dio_it.key: validates the wrapper and returns the key reported by the
 * lower-layer iterator at its current position.
 */
170 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
172 const struct lod_it *it = (const struct lod_it *)di;
174 LOD_CHECK_IT(env, it);
175 return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
/*
 * dio_it.key_size: validates the wrapper and returns the key size reported
 * by the lower-layer iterator.
 *
 * NOTE(review): the cast below drops the const qualifier of di, unlike the
 * sibling wrappers that keep a const struct lod_it pointer.
 */
178 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
180 struct lod_it *it = (struct lod_it *)di;
182 LOD_CHECK_IT(env, it);
183 return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
/*
 * dio_it.rec: validates the wrapper and asks the lower-layer iterator to
 * fill rec with the record at its current position.
 */
186 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
187 struct dt_rec *rec, __u32 attr)
189 const struct lod_it *it = (const struct lod_it *)di;
191 LOD_CHECK_IT(env, it);
192 return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec,
/*
 * dio_it.rec_size: validates the wrapper and returns the record size
 * reported by the lower-layer iterator.
 */
196 int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di,
199 const struct lod_it *it = (const struct lod_it *)di;
201 LOD_CHECK_IT(env, it);
202 return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it,
/*
 * dio_it.store: validates the wrapper and returns the 64-bit value produced
 * by the lower-layer iterator's store method.
 */
206 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
208 const struct lod_it *it = (const struct lod_it *)di;
210 LOD_CHECK_IT(env, it);
211 return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
/*
 * dio_it.load: validates the wrapper and asks the lower-layer iterator to
 * position itself from the given hash, returning its result.
 */
214 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
216 const struct lod_it *it = (const struct lod_it *)di;
218 LOD_CHECK_IT(env, it);
219 return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
/*
 * dio_it.key_rec: validates the wrapper and forwards the request to the
 * lower-layer iterator's key_rec method.
 */
222 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
225 const struct lod_it *it = (const struct lod_it *)di;
227 LOD_CHECK_IT(env, it);
228 return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it,
/*
 * Index operations table built from the pass-through lod_index_*() and
 * lod_it_*() wrappers. lod_index_try() installs either this table or
 * lod_striped_index_ops on an object.
 */
232 static struct dt_index_operations lod_index_ops = {
233 .dio_lookup = lod_index_lookup,
234 .dio_declare_insert = lod_declare_index_insert,
235 .dio_insert = lod_index_insert,
236 .dio_declare_delete = lod_declare_index_delete,
237 .dio_delete = lod_index_delete,
245 .key_size = lod_it_key_size,
247 .rec_size = lod_it_rec_size,
248 .store = lod_it_store,
250 .key_rec = lod_it_key_rec,
255 * Implementation of dt_index_operations:: dio_it.init
257 * This function is to initialize the iterator for striped directory,
258 * basically these lod_striped_it_xxx will just locate the stripe
259 * and call the corresponding API of the next lower layer.
261 * \param[in] env execution environment.
262 * \param[in] dt the striped directory object to be iterated.
263 * \param[in] attr the attribute of iterator, mostly used to indicate
264 * the entry attribute in the object to be iterated.
265 * \param[in] capa capability(useless in current implementation)
267 * \retval initialized iterator(dt_it) if successful initialize the
268 * iteration. lit_stripe_index will be used to indicate the
269 * current iterate position among stripes.
270 * \retval ERR pointer if initialization is failed.
/*
 * Iteration over a striped directory always starts on stripe 0: the
 * sub-iterator is created on lo->ldo_stripe[0], stored in the per-thread
 * struct lod_it (lod_env_info(env)->lti_it), and lit_stripe_index is reset
 * to 0. The per-thread slot must be unused on entry (lit_obj == NULL).
 */
272 static struct dt_it *lod_striped_it_init(const struct lu_env *env,
273 struct dt_object *dt, __u32 attr,
274 struct lustre_capa *capa)
276 struct lod_object *lo = lod_dt_obj(dt);
277 struct dt_object *next;
278 struct lod_it *it = &lod_env_info(env)->lti_it;
279 struct dt_it *it_next;
282 LASSERT(lo->ldo_stripenr > 0);
283 next = lo->ldo_stripe[0];
284 LASSERT(next != NULL);
285 LASSERT(next->do_index_ops != NULL);
287 it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
291 /* currently we do not use more than one iterator per thread
292 * so we store it in thread info. if at some point we need
293 * more active iterators in a single thread, we can allocate
295 LASSERT(it->lit_obj == NULL);
297 it->lit_stripe_index = 0;
299 it->lit_it = it_next;
302 return (struct dt_it *)it;
/*
 * Sanity check used by the lod_striped_it_*() functions: the wrapper must be
 * bound to an object and a sub-iterator, the object must have at least one
 * stripe, and lit_stripe_index must be a valid index into its stripes.
 */
305 #define LOD_CHECK_STRIPED_IT(env, it, lo) \
307 LASSERT((it)->lit_obj != NULL); \
308 LASSERT((it)->lit_it != NULL); \
309 LASSERT((lo)->ldo_stripenr > 0); \
310 LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr); \
314 * Implementation of dt_index_operations:: dio_it.fini
316 * This function is to finish the iterator for striped directory.
318 * \param[in] env execution environment.
319 * \param[in] di the iterator for the striped directory
/*
 * Finalizes the sub-iterator on the stripe currently selected by
 * lit_stripe_index, then resets lit_stripe_index to 0.
 */
322 static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
324 struct lod_it *it = (struct lod_it *)di;
325 struct lod_object *lo = lod_dt_obj(it->lit_obj);
326 struct dt_object *next;
328 LOD_CHECK_STRIPED_IT(env, it, lo);
330 next = lo->ldo_stripe[it->lit_stripe_index];
331 LASSERT(next != NULL);
332 LASSERT(next->do_index_ops != NULL);
334 next->do_index_ops->dio_it.fini(env, it->lit_it);
336 /* the iterator not in use any more */
339 it->lit_stripe_index = 0;
343 * Implementation of dt_index_operations:: dio_it.get
345 * This function is to position the iterator with given key
347 * \param[in] env execution environment.
348 * \param[in] di the iterator for striped directory.
349 * \param[in] key the key the iterator will be positioned.
351 * \retval 0 if successfully position iterator by the key.
352 * \retval negative error if position is failed.
/*
 * Forwards the positioning request to the sub-iterator of the stripe
 * currently selected by lit_stripe_index; no other stripe is consulted.
 */
354 static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
355 const struct dt_key *key)
357 const struct lod_it *it = (const struct lod_it *)di;
358 struct lod_object *lo = lod_dt_obj(it->lit_obj);
359 struct dt_object *next;
362 LOD_CHECK_STRIPED_IT(env, it, lo);
364 next = lo->ldo_stripe[it->lit_stripe_index];
365 LASSERT(next != NULL);
366 LASSERT(next->do_index_ops != NULL);
368 return next->do_index_ops->dio_it.get(env, it->lit_it, key);
372 * Implementation of dt_index_operations:: dio_it.put
374 * This function is supposed to be the pair of it_get, but currently do
375 * nothing. see (osd_it_ea_put or osd_index_it_put)
/*
 * Forwards the put to the sub-iterator of the stripe currently selected by
 * lit_stripe_index.
 *
 * The lower-layer call is a plain statement: this function returns void, and
 * ISO C does not allow a return statement with an expression in a void
 * function.
 */
377 static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
379 struct lod_it *it = (struct lod_it *)di;
380 struct lod_object *lo = lod_dt_obj(it->lit_obj);
381 struct dt_object *next;
383 LOD_CHECK_STRIPED_IT(env, it, lo);
385 next = lo->ldo_stripe[it->lit_stripe_index];
386 LASSERT(next != NULL);
387 LASSERT(next->do_index_ops != NULL);
389 next->do_index_ops->dio_it.put(env, it->lit_it);
393 * Implementation of dt_index_operations:: dio_it.next
395 * This function positions the iterator at the next entry. If the return
396 * value of next() on the current stripe shows that the stripe is finished,
397 * it moves on to the next stripe, and the sub-iterator for that stripe is
398 * initialized at that point.
400 * \param[in] env execution environment.
401 * \param[in] di the iterator for striped directory.
403 * \retval 0 if successfully position iterator to the next entry.
404 * \retval negative error if position is failed.
/*
 * Flow of the visible code:
 *  1. Advance the sub-iterator of the current stripe.
 *  2. rc == 0 on stripe 0 is tested on its own; on any later stripe the
 *     entry is first read into the per-thread lti_key buffer and compared
 *     against "." and "..", which are skipped for slave stripes.
 *  3. To move to the next stripe, lit_stripe_index + 1 is checked against
 *     ldo_stripenr, the index is incremented, the old sub-iterator is put
 *     and finalized, and a new sub-iterator is created on the new stripe
 *     with the saved it->lit_attr.
 *  4. On success the new sub-iterator replaces it->lit_it; otherwise rc is
 *     taken from PTR_ERR(it_next).
 */
406 static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
408 struct lod_it *it = (struct lod_it *)di;
409 struct lod_object *lo = lod_dt_obj(it->lit_obj);
410 struct dt_object *next;
411 struct dt_it *it_next;
415 LOD_CHECK_STRIPED_IT(env, it, lo);
417 next = lo->ldo_stripe[it->lit_stripe_index];
418 LASSERT(next != NULL);
419 LASSERT(next->do_index_ops != NULL);
421 rc = next->do_index_ops->dio_it.next(env, it->lit_it);
425 if (rc == 0 && it->lit_stripe_index == 0)
428 if (rc == 0 && it->lit_stripe_index > 0) {
429 struct lu_dirent *ent;
431 ent = (struct lu_dirent *)lod_env_info(env)->lti_key;
433 rc = next->do_index_ops->dio_it.rec(env, it->lit_it,
434 (struct dt_rec *)ent,
439 /* skip . and .. for slave stripe */
440 if ((strncmp(ent->lde_name, ".",
441 le16_to_cpu(ent->lde_namelen)) == 0 &&
442 le16_to_cpu(ent->lde_namelen) == 1) ||
443 (strncmp(ent->lde_name, "..",
444 le16_to_cpu(ent->lde_namelen)) == 0 &&
445 le16_to_cpu(ent->lde_namelen) == 2))
451 /* go to next stripe */
452 if (it->lit_stripe_index + 1 >= lo->ldo_stripenr)
455 it->lit_stripe_index++;
457 next->do_index_ops->dio_it.put(env, it->lit_it);
458 next->do_index_ops->dio_it.fini(env, it->lit_it);
/* NOTE(review): `next` still refers to the stripe just finished here; it is
 * only re-pointed to the new stripe two statements below. Confirm whether
 * do_index_try() was meant to run on the new stripe instead. */
460 rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
464 next = lo->ldo_stripe[it->lit_stripe_index];
465 LASSERT(next != NULL);
466 LASSERT(next->do_index_ops != NULL);
468 it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr,
470 if (!IS_ERR(it_next)) {
471 it->lit_it = it_next;
474 rc = PTR_ERR(it_next);
481 * Implementation of dt_index_operations:: dio_it.key
483 * This function is to get the key of the iterator at current position.
485 * \param[in] env execution environment.
486 * \param[in] di the iterator for striped directory.
488 * \retval key(dt_key) if successfully get the key.
489 * \retval negative error if can not get the key.
/*
 * Returns the key reported by the sub-iterator of the stripe currently
 * selected by lit_stripe_index.
 */
491 static struct dt_key *lod_striped_it_key(const struct lu_env *env,
492 const struct dt_it *di)
494 const struct lod_it *it = (const struct lod_it *)di;
495 struct lod_object *lo = lod_dt_obj(it->lit_obj);
496 struct dt_object *next;
498 LOD_CHECK_STRIPED_IT(env, it, lo);
500 next = lo->ldo_stripe[it->lit_stripe_index];
501 LASSERT(next != NULL);
502 LASSERT(next->do_index_ops != NULL);
504 return next->do_index_ops->dio_it.key(env, it->lit_it);
508 * Implementation of dt_index_operations:: dio_it.key_size
510 * This function is to get the key_size of current key.
512 * \param[in] env execution environment.
513 * \param[in] di the iterator for striped directory.
515 * \retval key_size if successfully get the key_size.
516 * \retval negative error if can not get the key_size.
/*
 * Returns the key size reported by the sub-iterator of the stripe currently
 * selected by lit_stripe_index.
 */
518 static int lod_striped_it_key_size(const struct lu_env *env,
519 const struct dt_it *di)
521 struct lod_it *it = (struct lod_it *)di;
522 struct lod_object *lo = lod_dt_obj(it->lit_obj);
523 struct dt_object *next;
525 LOD_CHECK_STRIPED_IT(env, it, lo);
527 next = lo->ldo_stripe[it->lit_stripe_index];
528 LASSERT(next != NULL);
529 LASSERT(next->do_index_ops != NULL);
531 return next->do_index_ops->dio_it.key_size(env, it->lit_it);
535 * Implementation of dt_index_operations:: dio_it.rec
537 * This function is to get the record at current position.
539 * \param[in] env execution environment.
540 * \param[in] di the iterator for striped directory.
541 * \param[in] attr the attribute of iterator, mostly used to indicate
542 * the entry attribute in the object to be iterated.
543 * \param[out] rec hold the return record.
545 * \retval 0 if successfully get the entry.
546 * \retval negative error if can not get entry.
/*
 * Fills rec from the sub-iterator of the stripe currently selected by
 * lit_stripe_index, passing attr through unchanged.
 */
548 static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
549 struct dt_rec *rec, __u32 attr)
551 const struct lod_it *it = (const struct lod_it *)di;
552 struct lod_object *lo = lod_dt_obj(it->lit_obj);
553 struct dt_object *next;
555 LOD_CHECK_STRIPED_IT(env, it, lo);
557 next = lo->ldo_stripe[it->lit_stripe_index];
558 LASSERT(next != NULL);
559 LASSERT(next->do_index_ops != NULL);
561 return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
565 * Implementation of dt_index_operations:: dio_it.rec_size
567 * This function is to get the record_size at current record.
569 * \param[in] env execution environment.
570 * \param[in] di the iterator for striped directory.
571 * \param[in] attr the attribute of iterator, mostly used to indicate
572 * the entry attribute in the object to be iterated.
574 * \retval rec_size if successfully get the entry size.
575 * \retval negative error if can not get entry size.
/*
 * Returns the record size reported by the sub-iterator of the stripe
 * currently selected by lit_stripe_index.
 */
577 static int lod_striped_it_rec_size(const struct lu_env *env,
578 const struct dt_it *di, __u32 attr)
580 struct lod_it *it = (struct lod_it *)di;
581 struct lod_object *lo = lod_dt_obj(it->lit_obj);
582 struct dt_object *next;
584 LOD_CHECK_STRIPED_IT(env, it, lo);
586 next = lo->ldo_stripe[it->lit_stripe_index];
587 LASSERT(next != NULL);
588 LASSERT(next->do_index_ops != NULL);
590 return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
594 * Implementation of dt_index_operations:: dio_it.store
596 * This function returns a cookie for the current position of the iterator,
597 * so that user can use this cookie to load/start the iterator next time.
599 * \param[in] env execution environment.
600 * \param[in] di the iterator for striped directory.
602 * \retval the cookie.
/*
 * Returns the value produced by the store method of the sub-iterator on the
 * stripe currently selected by lit_stripe_index. The stripe index itself is
 * not folded into the returned value by this function.
 */
604 static __u64 lod_striped_it_store(const struct lu_env *env,
605 const struct dt_it *di)
607 const struct lod_it *it = (const struct lod_it *)di;
608 struct lod_object *lo = lod_dt_obj(it->lit_obj);
609 struct dt_object *next;
611 LOD_CHECK_STRIPED_IT(env, it, lo);
613 next = lo->ldo_stripe[it->lit_stripe_index];
614 LASSERT(next != NULL);
615 LASSERT(next->do_index_ops != NULL);
617 return next->do_index_ops->dio_it.store(env, it->lit_it);
621 * Implementation of dt_index_operations:: dio_it.load
623 * This function will position the iterator with the given hash(usually
626 * \param[in] env execution environment.
627 * \param[in] di the iterator for striped directory.
628 * \param[in] hash the given hash.
630 * \retval >0 if the iterator is successfully loaded to the given position.
631 * \retval <0 if load is failed.
/*
 * Forwards the load request for hash to the sub-iterator of the stripe
 * currently selected by lit_stripe_index; the stripe index is not changed.
 */
633 static int lod_striped_it_load(const struct lu_env *env,
634 const struct dt_it *di, __u64 hash)
636 const struct lod_it *it = (const struct lod_it *)di;
637 struct lod_object *lo = lod_dt_obj(it->lit_obj);
638 struct dt_object *next;
640 LOD_CHECK_STRIPED_IT(env, it, lo);
642 next = lo->ldo_stripe[it->lit_stripe_index];
643 LASSERT(next != NULL);
644 LASSERT(next->do_index_ops != NULL);
646 return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
/*
 * Index operations table for striped directories: lookup/insert/delete use
 * the same pass-through wrappers as lod_index_ops, while the iterator
 * methods are the stripe-aware lod_striped_it_*() functions. Installed by
 * lod_index_try().
 */
649 static struct dt_index_operations lod_striped_index_ops = {
650 .dio_lookup = lod_index_lookup,
651 .dio_declare_insert = lod_declare_index_insert,
652 .dio_insert = lod_index_insert,
653 .dio_declare_delete = lod_declare_index_delete,
654 .dio_delete = lod_index_delete,
656 .init = lod_striped_it_init,
657 .fini = lod_striped_it_fini,
658 .get = lod_striped_it_get,
659 .put = lod_striped_it_put,
660 .next = lod_striped_it_next,
661 .key = lod_striped_it_key,
662 .key_size = lod_striped_it_key_size,
663 .rec = lod_striped_it_rec,
664 .rec_size = lod_striped_it_rec_size,
665 .store = lod_striped_it_store,
666 .load = lod_striped_it_load,
671 * Implementation of dt_object_operations:: do_index_try
673 * This function will try to initialize the index api pointer for the
674 * given object; usually it is the entry point of the index api. i.e.
675 * the index object should be initialized in index_try, then start
676 * using index api. For striped directory, it will try to initialize
677 * all of its sub_stripes.
679 * \param[in] env execution environment.
680 * \param[in] dt the index object to be initialized.
681 * \param[in] feat the features of this object, for example fixed or
682 * variable key size etc.
684 * \retval >0 if the initialization is successful.
685 * \retval <0 if the initialization is failed.
/*
 * Order of the visible steps:
 *  1. load the striping of the object (lod_load_striping_locked());
 *  2. run do_index_try() on the child object with the same features;
 *  3. if the object has stripes, test each one with dt_object_exists() and
 *     run do_index_try() on it, then install lod_striped_index_ops;
 *  4. otherwise install lod_index_ops.
 */
687 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
688 const struct dt_index_features *feat)
690 struct lod_object *lo = lod_dt_obj(dt);
691 struct dt_object *next = dt_object_child(dt);
695 LASSERT(next->do_ops);
696 LASSERT(next->do_ops->do_index_try);
698 rc = lod_load_striping_locked(env, lo);
702 rc = next->do_ops->do_index_try(env, next, feat);
706 if (lo->ldo_stripenr > 0) {
709 for (i = 0; i < lo->ldo_stripenr; i++) {
710 if (dt_object_exists(lo->ldo_stripe[i]) == 0)
712 rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
713 lo->ldo_stripe[i], feat);
717 dt->do_index_ops = &lod_striped_index_ops;
719 dt->do_index_ops = &lod_index_ops;
/* Takes the read lock of the child object with the given role. */
725 static void lod_object_read_lock(const struct lu_env *env,
726 struct dt_object *dt, unsigned role)
728 dt_read_lock(env, dt_object_child(dt), role);
/* Takes the write lock of the child object with the given role. */
731 static void lod_object_write_lock(const struct lu_env *env,
732 struct dt_object *dt, unsigned role)
734 dt_write_lock(env, dt_object_child(dt), role);
/* Releases the read lock of the child object. */
737 static void lod_object_read_unlock(const struct lu_env *env,
738 struct dt_object *dt)
740 dt_read_unlock(env, dt_object_child(dt));
/* Releases the write lock of the child object. */
743 static void lod_object_write_unlock(const struct lu_env *env,
744 struct dt_object *dt)
746 dt_write_unlock(env, dt_object_child(dt));
/* Returns dt_write_locked() for the child object. */
749 static int lod_object_write_locked(const struct lu_env *env,
750 struct dt_object *dt)
752 return dt_write_locked(env, dt_object_child(dt));
/*
 * Reads the attributes of the child object only; stripes are not consulted
 * (see the note inside the function for why that is sufficient).
 */
755 static int lod_attr_get(const struct lu_env *env,
756 struct dt_object *dt,
757 struct lu_attr *attr,
758 struct lustre_capa *capa)
760 /* Note: for striped directory, client will merge attributes
761 * from all of the sub-stripes see lmv_merge_attr(), and there
762 * no MDD logic depend on directory nlink/size/time, so we can
763 * always use master inode nlink and size for now. */
764 return dt_attr_get(env, dt_object_child(dt), attr, capa);
768 * Mark all sub-stripes of the striped directory as dead.
/*
 * Rewrites the LMV xattr of every stripe of a striped directory with the
 * LMV_HASH_FLAG_DEAD bit set in its hash type.
 *
 * Only directories are considered (S_ISDIR check), and an object without
 * stripes is tested for separately after its striping has been loaded. The
 * LMV EA is read into the per-thread lti_ea_store, given the slave magic
 * LMV_MAGIC_STRIPE and the dead flag, and then written to each stripe with
 * lmv_master_mdt_index set to the stripe's index.
 *
 * The EA is kept in little-endian form (every other field here goes through
 * cpu_to_le32()/le32_to_cpu()), so the stripe index is converted with
 * cpu_to_le32() as well instead of being stored in CPU byte order.
 *
 * Both dt_declare_xattr_set() and dt_xattr_set() appear below; callers pass
 * true from lod_declare_attr_set() and false from lod_attr_set() as the last
 * argument, which presumably selects between the two - TODO confirm.
 */
770 static int lod_mark_dead_object(const struct lu_env *env,
771 struct dt_object *dt,
772 struct thandle *handle,
775 struct lod_object *lo = lod_dt_obj(dt);
776 struct lmv_mds_md_v1 *lmv;
777 __u32 dead_hash_type;
783 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
786 rc = lod_load_striping_locked(env, lo);
790 if (lo->ldo_stripenr == 0)
793 rc = lod_get_lmv_ea(env, lo);
797 lmv = lod_env_info(env)->lti_ea_store;
798 lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
799 dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD;
800 lmv->lmv_hash_type = cpu_to_le32(dead_hash_type);
801 for (i = 0; i < lo->ldo_stripenr; i++) {
804 lmv->lmv_master_mdt_index = cpu_to_le32(i);
806 buf.lb_len = sizeof(*lmv);
808 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf,
810 LU_XATTR_REPLACE, handle);
812 rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf,
813 XATTR_NAME_LMV, LU_XATTR_REPLACE,
814 handle, BYPASS_CAPA);
/*
 * Declares an attribute update on the object and, where needed, on its
 * stripes.
 *
 * Visible steps, in order:
 *  - a request whose only valid field is LA_FLAGS and which carries
 *    LUSTRE_SLAVE_DEAD_FL goes through lod_mark_dead_object() with its last
 *    argument true;
 *  - the change is declared on the child object;
 *  - attr->la_valid is tested against LA_UID/LA_GID for non-directories, and
 *    against the wider UID/GID/MODE/ATIME/MTIME/CTIME mask in the second
 *    test (presumably the directory case - TODO confirm);
 *  - striping is loaded and, if stripes exist, the same change is declared
 *    on every non-NULL stripe;
 *  - two OBD_FAIL_CHECK() hooks (OBD_FAIL_LFSCK_LOST_STRIPE and
 *    OBD_FAIL_LFSCK_CHANGE_STRIPE) additionally declare a delete or a
 *    replace of XATTR_NAME_LOV on a local, existing child object; the
 *    return values of those two declarations are not checked.
 */
823 static int lod_declare_attr_set(const struct lu_env *env,
824 struct dt_object *dt,
825 const struct lu_attr *attr,
826 struct thandle *handle)
828 struct dt_object *next = dt_object_child(dt);
829 struct lod_object *lo = lod_dt_obj(dt);
833 /* Set dead object on all other stripes */
834 if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
835 attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
836 rc = lod_mark_dead_object(env, dt, handle, true);
841 * declare setattr on the local object
843 rc = dt_declare_attr_set(env, next, attr, handle);
847 /* osp_declare_attr_set() ignores all attributes other than
848 * UID, GID, and size, and osp_attr_set() ignores all but UID
849 * and GID. Declaration of size attr setting happens through
850 * lod_declare_init_size(), and not through this function.
851 * Therefore we need not load striping unless ownership is
852 * changing. This should save memory and (we hope) speed up
854 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
855 if (!(attr->la_valid & (LA_UID | LA_GID)))
858 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
861 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
862 LA_ATIME | LA_MTIME | LA_CTIME)))
866 * load striping information, notice we don't do this when object
867 * is being initialized as we don't need this information till
868 * few specific cases like destroy, chown
870 rc = lod_load_striping(env, lo);
874 if (lo->ldo_stripenr == 0)
878 * if object is striped declare changes on the stripes
880 LASSERT(lo->ldo_stripe);
881 for (i = 0; i < lo->ldo_stripenr; i++) {
882 if (likely(lo->ldo_stripe[i] != NULL)) {
883 rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr,
886 CERROR("failed declaration: %d\n", rc);
892 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
893 dt_object_exists(next) != 0 &&
894 dt_object_remote(next) == 0)
895 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
897 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
898 dt_object_exists(next) &&
899 dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
900 struct lod_thread_info *info = lod_env_info(env);
901 struct lu_buf *buf = &info->lti_buf;
903 buf->lb_buf = info->lti_ea_store;
904 buf->lb_len = info->lti_ea_store_size;
905 dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
906 LU_XATTR_REPLACE, handle);
/*
 * Applies an attribute update to the object and, where needed, to its
 * stripes. This is the execution counterpart of lod_declare_attr_set().
 *
 * Visible steps, in order:
 *  - a request whose only valid field is LA_FLAGS and which carries
 *    LUSTRE_SLAVE_DEAD_FL goes through lod_mark_dead_object() with its last
 *    argument false;
 *  - the change is applied to the child object;
 *  - the same la_valid tests as in lod_declare_attr_set() are made;
 *  - if stripes exist, the change is applied to every non-NULL stripe, each
 *    stripe being tested with dt_object_exists() first;
 *  - the OBD_FAIL_LFSCK_LOST_STRIPE hook deletes XATTR_NAME_LOV from a local,
 *    existing child; the OBD_FAIL_LFSCK_CHANGE_STRIPE hook reads the LOV EA,
 *    converts the first object's id to a FID and back, and writes the EA
 *    again with LU_XATTR_REPLACE. Return values of those xattr calls are
 *    not checked.
 *
 * The error message for a failed stripe update names the attr_set step; it
 * previously said "declaration", copied from the declare path, which pointed
 * debugging at the wrong phase.
 */
912 static int lod_attr_set(const struct lu_env *env,
913 struct dt_object *dt,
914 const struct lu_attr *attr,
915 struct thandle *handle,
916 struct lustre_capa *capa)
918 struct dt_object *next = dt_object_child(dt);
919 struct lod_object *lo = lod_dt_obj(dt);
923 /* Set dead object on all other stripes */
924 if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
925 attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
926 rc = lod_mark_dead_object(env, dt, handle, false);
931 * apply changes to the local object
933 rc = dt_attr_set(env, next, attr, handle, capa);
937 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
938 if (!(attr->la_valid & (LA_UID | LA_GID)))
941 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
944 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
945 LA_ATIME | LA_MTIME | LA_CTIME)))
949 if (lo->ldo_stripenr == 0)
953 * if object is striped, apply changes to all the stripes
955 LASSERT(lo->ldo_stripe);
956 for (i = 0; i < lo->ldo_stripenr; i++) {
957 if (likely(lo->ldo_stripe[i] != NULL)) {
958 if (dt_object_exists(lo->ldo_stripe[i]) == 0)
961 rc = dt_attr_set(env, lo->ldo_stripe[i], attr,
964 CERROR("failed attr_set: %d\n", rc);
970 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
971 dt_object_exists(next) != 0 &&
972 dt_object_remote(next) == 0)
973 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
975 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
976 dt_object_exists(next) &&
977 dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
978 struct lod_thread_info *info = lod_env_info(env);
979 struct lu_buf *buf = &info->lti_buf;
980 struct ost_id *oi = &info->lti_ostid;
981 struct lu_fid *fid = &info->lti_fid;
982 struct lov_mds_md_v1 *lmm;
983 struct lov_ost_data_v1 *objs;
987 rc1 = lod_get_lov_ea(env, lo);
991 buf->lb_buf = info->lti_ea_store;
992 buf->lb_len = info->lti_ea_store_size;
993 lmm = info->lti_ea_store;
994 magic = le32_to_cpu(lmm->lmm_magic);
995 if (magic == LOV_MAGIC_V1)
996 objs = &(lmm->lmm_objects[0]);
998 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
999 ostid_le_to_cpu(&objs->l_ost_oi, oi);
1000 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
1002 fid_to_ostid(fid, oi);
1003 ostid_cpu_to_le(oi, &objs->l_ost_oi);
1004 dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
1005 LU_XATTR_REPLACE, handle, BYPASS_CAPA);
/*
 * Reads an extended attribute, synthesizing a default LOV layout for the
 * root directory when none is stored.
 *
 * The xattr is first read from the child object. The fallback below is
 * reached only when that read yields -ENODATA on a directory. The object's
 * FID is then compared with the FID returned by dt_root_get(); for the root
 * object and the XATTR_NAME_LOV name, a lov_user_md is filled from the
 * device's lov_desc defaults (pattern, stripe size, stripe count, stripe
 * offset), all stored little-endian. A NULL buf->lb_buf is handled as a
 * separate case, and the structure is written only when buf->lb_len is at
 * least sizeof(struct lov_user_md).
 */
1011 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
1012 struct lu_buf *buf, const char *name,
1013 struct lustre_capa *capa)
1015 struct lod_thread_info *info = lod_env_info(env);
1016 struct lod_device *dev = lu2lod_dev(dt->do_lu.lo_dev);
1020 rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
1021 if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
1025 * lod returns default striping on the real root of the device
1026 * this is like the root stores default striping for the whole
1027 * filesystem. historically we've been using a different approach
1028 * and store it in the config.
1030 dt_root_get(env, dev->lod_child, &info->lti_fid);
1031 is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
1033 if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
1034 struct lov_user_md *lum = buf->lb_buf;
1035 struct lov_desc *desc = &dev->lod_desc;
1037 if (buf->lb_buf == NULL) {
1039 } else if (buf->lb_len >= sizeof(*lum)) {
1040 lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
1041 lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
1042 lmm_oi_set_id(&lum->lmm_oi, 0);
1043 lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
1044 lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
1045 lum->lmm_stripe_size = cpu_to_le32(
1046 desc->ld_default_stripe_size);
1047 lum->lmm_stripe_count = cpu_to_le16(
1048 desc->ld_default_stripe_count);
1049 lum->lmm_stripe_offset = cpu_to_le16(
1050 desc->ld_default_stripe_offset);
/*
 * Validates a user-supplied directory striping descriptor.
 *
 * Two conditions lead to -EINVAL: a magic other than LMV_USER_MAGIC and a
 * stripe count of zero (both fields are read as little-endian). The CERROR
 * below logs the device name together with the magic, stripe offset, stripe
 * count and rc.
 */
1060 static int lod_verify_md_striping(struct lod_device *lod,
1061 const struct lmv_user_md_v1 *lum)
1066 if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
1067 GOTO(out, rc = -EINVAL);
1069 if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
1070 GOTO(out, rc = -EINVAL);
1073 CERROR("%s: invalid lmv_user_md: magic = %x, "
1074 "stripe_offset = %d, stripe_count = %u: rc = %d\n",
1075 lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
1076 (int)le32_to_cpu(lum->lum_stripe_offset),
1077 le32_to_cpu(lum->lum_stripe_count), rc);
1082 * Master LMVEA will be same as slave LMVEA, except
1083 * 1. different magic
1084 * 2. No lmv_stripe_fids on slave
1085 * 3. lmv_master_mdt_index on slave LMV EA will be stripe_index.
/*
 * Copies the fixed-size master LMV header into slave_lmv by structure
 * assignment and replaces the magic with LMV_MAGIC_STRIPE (little-endian).
 * No other field is modified by this function.
 */
1087 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
1088 const struct lmv_mds_md_v1 *master_lmv)
1090 *slave_lmv = *master_lmv;
1091 slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
/*
 * Builds the master LMV EA of a striped directory in the per-thread
 * lti_ea_store buffer.
 *
 * The object must already be marked as striped and have stripes (both are
 * asserted). The buffer is grown with lod_ea_store_resize() when it is
 * smaller than lmv_mds_md_size(stripe_count, LMV_MAGIC). The EA receives, in
 * little-endian form: the LMV_MAGIC magic, the stripe count, the directory
 * hash type, the master MDT index (a value obtained around the
 * lod_fld_lookup() call on the object's FID), the master FID and the FID of
 * every stripe.
 *
 * On the visible success path lmv_buf is pointed at lti_ea_store with length
 * lmm_size - the buffer is shared per-thread storage, not a private copy -
 * and ldo_dir_striping_cached is set to 1.
 */
1094 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
1095 struct lu_buf *lmv_buf)
1097 struct lod_thread_info *info = lod_env_info(env);
1098 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1099 struct lod_object *lo = lod_dt_obj(dt);
1100 struct lmv_mds_md_v1 *lmm1;
1103 int type = LU_SEQ_RANGE_ANY;
1109 LASSERT(lo->ldo_dir_striped != 0);
1110 LASSERT(lo->ldo_stripenr > 0);
1111 stripe_count = lo->ldo_stripenr;
1112 lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
1113 if (info->lti_ea_store_size < lmm_size) {
1114 rc = lod_ea_store_resize(info, lmm_size);
1119 lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
1120 lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
1121 lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
1122 lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
1123 rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
1128 lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
1129 fid_cpu_to_le(&lmm1->lmv_master_fid, lu_object_fid(&dt->do_lu));
1130 for (i = 0; i < lo->ldo_stripenr; i++) {
1131 struct dt_object *dto;
1133 dto = lo->ldo_stripe[i];
1134 LASSERT(dto != NULL);
1135 fid_cpu_to_le(&lmm1->lmv_stripe_fids[i],
1136 lu_object_fid(&dto->do_lu));
1139 lmv_buf->lb_buf = info->lti_ea_store;
1140 lmv_buf->lb_len = lmm_size;
1141 lo->ldo_dir_striping_cached = 1;
/*
 * Parses an on-disk LMV EA (buf) and instantiates the stripe objects of a
 * striped directory.
 *
 * Checks made before any allocation, in order: the LMV_HASH_FLAG_MIGRATION
 * bit of the hash type; the LMV_MAGIC_STRIPE magic, which marks the object
 * as a slave stripe (ldo_dir_slave_stripe = 1); a magic other than
 * LMV_MAGIC_V1; and a stripe count of 1 or less.
 *
 * An array of lmv_stripe_count object pointers is then allocated
 * (lo->ldo_stripe must still be NULL). For each stripe FID:
 *  - an insane FID fails with -ESTALE;
 *  - lod_fld_lookup() yields the index of the target holding the FID;
 *  - the local child device is used when that index equals this node's
 *    ss_node_id, otherwise the target is taken from the MDT descriptor
 *    table via LTD_TGT(), with -ESTALE on the failure path after it;
 *  - the object is located with dt_locate_at(), and an error from it becomes
 *    the return code via PTR_ERR().
 *
 * The array and the stripe count are stored in lo (ldo_stripe, ldo_stripenr,
 * ldo_stripes_allocated); lod_object_free_striping() is the cleanup call at
 * the end of the function.
 */
1146 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
1147 const struct lu_buf *buf)
1149 struct lod_thread_info *info = lod_env_info(env);
1150 struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1151 struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
1152 struct dt_object **stripe;
1153 union lmv_mds_md *lmm = buf->lb_buf;
1154 struct lmv_mds_md_v1 *lmv1 = &lmm->lmv_md_v1;
1155 struct lu_fid *fid = &info->lti_fid;
1160 if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
1163 if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
1164 lo->ldo_dir_slave_stripe = 1;
1168 if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
1171 if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1)
1174 LASSERT(lo->ldo_stripe == NULL);
1175 OBD_ALLOC(stripe, sizeof(stripe[0]) *
1176 (le32_to_cpu(lmv1->lmv_stripe_count)));
1180 for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
1181 struct dt_device *tgt_dt;
1182 struct dt_object *dto;
1183 int type = LU_SEQ_RANGE_ANY;
1186 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
1187 if (!fid_is_sane(fid))
1188 GOTO(out, rc = -ESTALE);
1190 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
1194 if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
1195 tgt_dt = lod->lod_child;
1197 struct lod_tgt_desc *tgt;
1199 tgt = LTD_TGT(ltd, idx);
1201 GOTO(out, rc = -ESTALE);
1202 tgt_dt = tgt->ltd_tgt;
1205 dto = dt_locate_at(env, tgt_dt, fid,
1206 lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1209 GOTO(out, rc = PTR_ERR(dto));
1214 lo->ldo_stripe = stripe;
1215 lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count);
1216 lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
1218 lod_object_free_striping(env, lo);
/*
 * Pick the MDTs for a new striped directory, locate one object per stripe
 * and declare every update needed to create them.
 *
 * Stripe selection:
 *  - the requested count (lum_stripe_count) is capped at
 *    lod_remote_mdt_count + 1;
 *  - one path takes idx from lum_stripe_offset, allocates the FID through
 *    lod->lod_child_exp and uses the local child device (per the existing
 *    comment this is the master stripe);
 *  - the other path starts at the index following idx_array[i - 1] and
 *    walks indexes modulo lod_remote_mdt_count + 1, testing the target
 *    bitmap, whether the index is already in idx_array, and dt_statfs()
 *    on the target, before allocating a FID from the target's export.
 *  Each chosen FID is turned into an object with dt_locate_at() and
 *  LOC_F_NEW.  -ENOSPC is returned when no stripe could be set up.
 *
 * Declarations made for every stripe: create, "." and ".." entries, the
 * default LOV EA when cached defaults exist and are not "delete" values,
 * the slave LMV EA (lmv_master_mdt_index = stripe number), the link EA,
 * a "<FID>:<stripe number>" entry in the master and a ref add on the
 * master.  The master's own LMV EA is declared last.
 *
 * The trailing code puts the stripe objects, frees the stripe array,
 * resets the stripe fields of lo, and frees idx_array and slave_lmm.
 *
 * NOTE(review): the labels and conditions that select the cleanup paths,
 * and most rc checks, are not visible here - confirm against the complete
 * function.
 */
1223 static int lod_prep_md_striped_create(const struct lu_env *env,
1224 struct dt_object *dt,
1225 struct lu_attr *attr,
1226 const struct lmv_user_md_v1 *lum,
1227 struct dt_object_format *dof,
1230 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1231 struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
1232 struct lod_object *lo = lod_dt_obj(dt);
1233 struct lod_thread_info *info = lod_env_info(env);
1234 struct dt_object **stripe;
1235 struct lu_buf lmv_buf;
1236 struct lu_buf slave_lmv_buf;
1237 struct lmv_mds_md_v1 *lmm;
1238 struct lmv_mds_md_v1 *slave_lmm = NULL;
1246 /* The lum has been verified in lod_verify_md_striping */
1247 LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
1248 LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
1250 stripe_count = le32_to_cpu(lum->lum_stripe_count);
1252 /* shrink the stripe_count to the available MDT count */
1253 if (stripe_count > lod->lod_remote_mdt_count + 1)
1254 stripe_count = lod->lod_remote_mdt_count + 1;
1256 OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
/* idx_array remembers the MDT index chosen for each stripe so far */
1260 OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
1261 if (idx_array == NULL)
1262 GOTO(out_free, rc = -ENOMEM);
1264 for (i = 0; i < stripe_count; i++) {
1265 struct lod_tgt_desc *tgt = NULL;
1266 struct dt_object *dto;
1267 struct lu_fid fid = { 0 };
1269 struct lu_object_conf conf = { 0 };
1270 struct dt_device *tgt_dt = NULL;
1273 /* Right now, master stripe and master object are
1274 * on the same MDT */
1275 idx = le32_to_cpu(lum->lum_stripe_offset);
1276 rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
1280 tgt_dt = lod->lod_child;
1284 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
1286 for (j = 0; j < lod->lod_remote_mdt_count;
1287 j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
1288 bool already_allocated = false;
1291 CDEBUG(D_INFO, "try idx %d, mdt cnt %d,"
1292 " allocated %d, last allocated %d\n", idx,
1293 lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1295 /* Find next available target */
1296 if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
1299 /* check whether the idx already exists
1300 * in current allocated array */
1301 for (k = 0; k < i; k++) {
1302 if (idx_array[k] == idx) {
1303 already_allocated = true;
1308 if (already_allocated)
1311 /* check the status of the OSP */
1312 tgt = LTD_TGT(ltd, idx);
1316 tgt_dt = tgt->ltd_tgt;
1317 rc = dt_statfs(env, tgt_dt, NULL);
1319 /* this OSP doesn't feel well */
1324 rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
1333 /* Can not allocate more stripes */
1334 if (j == lod->lod_remote_mdt_count) {
1335 CDEBUG(D_INFO, "%s: require stripes %d only get %d\n",
1336 lod2obd(lod)->obd_name, stripe_count, i - 1);
1340 CDEBUG(D_INFO, "idx %d, mdt cnt %d,"
1341 " allocated %d, last allocated %d\n", idx,
1342 lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1345 /* tgt_dt and fid must be ready after search available OSP
1346 * in the above loop */
1347 LASSERT(tgt_dt != NULL);
1348 LASSERT(fid_is_sane(&fid));
1349 conf.loc_flags = LOC_F_NEW;
1350 dto = dt_locate_at(env, tgt_dt, &fid,
1351 dt->do_lu.lo_dev->ld_site->ls_top_dev,
1354 GOTO(out_put, rc = PTR_ERR(dto));
/* i is the number of stripes actually set up; stripe_count is the
 * size of the allocated array */
1359 lo->ldo_dir_striped = 1;
1360 lo->ldo_stripe = stripe;
1361 lo->ldo_stripenr = i;
1362 lo->ldo_stripes_allocated = stripe_count;
1364 if (lo->ldo_stripenr == 0)
1365 GOTO(out_put, rc = -ENOSPC);
1367 rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1370 lmm = lmv_buf.lb_buf;
1372 OBD_ALLOC_PTR(slave_lmm);
1373 if (slave_lmm == NULL)
1374 GOTO(out_put, rc = -ENOMEM);
1376 lod_prep_slave_lmv_md(slave_lmm, lmm);
1377 slave_lmv_buf.lb_buf = slave_lmm;
1378 slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1380 if (!dt_try_as_dir(env, dt_object_child(dt)))
1381 GOTO(out_put, rc = -EINVAL);
1383 for (i = 0; i < lo->ldo_stripenr; i++) {
1384 struct dt_object *dto = stripe[i];
1385 char *stripe_name = info->lti_key;
1386 struct lu_name *sname;
1387 struct linkea_data ldata = { 0 };
1388 struct lu_buf linkea_buf;
1390 rc = dt_declare_create(env, dto, attr, NULL, dof, th);
1394 if (!dt_try_as_dir(env, dto))
1395 GOTO(out_put, rc = -EINVAL);
1397 rc = dt_declare_insert(env, dto,
1398 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1399 (const struct dt_key *)dot, th);
1403 /* master stripe FID will be put to .. */
1404 rc = dt_declare_insert(env, dto,
1405 (const struct dt_rec *)lu_object_fid(&dt->do_lu),
1406 (const struct dt_key *)dotdot, th);
1410 /* probably nothing to inherit */
1411 if (lo->ldo_striping_cached &&
1412 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1413 lo->ldo_def_stripenr,
1414 lo->ldo_def_stripe_offset)) {
1415 struct lov_user_md_v3 *v3;
1417 /* sigh, lti_ea_store has been used for lmv_buf,
1418 * so we have to allocate buffer for default
1422 GOTO(out_put, rc = -ENOMEM);
1424 memset(v3, 0, sizeof(*v3));
1425 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1426 v3->lmm_stripe_count =
1427 cpu_to_le16(lo->ldo_def_stripenr);
1428 v3->lmm_stripe_offset =
1429 cpu_to_le16(lo->ldo_def_stripe_offset);
1430 v3->lmm_stripe_size =
1431 cpu_to_le32(lo->ldo_def_stripe_size);
1432 if (lo->ldo_pool != NULL)
1433 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
1434 sizeof(v3->lmm_pool_name));
1436 info->lti_buf.lb_buf = v3;
1437 info->lti_buf.lb_len = sizeof(*v3);
1438 rc = dt_declare_xattr_set(env, dto,
1447 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1448 rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf,
1449 XATTR_NAME_LMV, 0, th);
1453 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1454 PFID(lu_object_fid(&dto->do_lu)), i);
1456 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
1457 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
1461 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
1465 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1466 linkea_buf.lb_len = ldata.ld_leh->leh_len;
1467 rc = dt_declare_xattr_set(env, dto, &linkea_buf,
1468 XATTR_NAME_LINK, 0, th);
1472 rc = dt_declare_insert(env, dt_object_child(dt),
1473 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1474 (const struct dt_key *)stripe_name, th);
1478 rc = dt_declare_ref_add(env, dt_object_child(dt), th);
1483 rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
1484 XATTR_NAME_LMV, 0, th);
1490 for (i = 0; i < stripe_count; i++)
1491 if (stripe[i] != NULL)
1492 lu_object_put(env, &stripe[i]->do_lu);
1493 OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count);
1494 lo->ldo_stripenr = 0;
1495 lo->ldo_stripes_allocated = 0;
1496 lo->ldo_stripe = NULL;
1500 if (idx_array != NULL)
1501 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
1502 if (slave_lmm != NULL)
1503 OBD_FREE_PTR(slave_lmm);
1509 * Declare the creation of a striped MD object.
/*
 * Declare the creation of a striped directory described by a user LMV
 * request.
 *
 * lum_buf->lb_buf must hold a little-endian struct lmv_user_md_v1 (it is
 * asserted to be non-NULL).  The request is logged, a stripe count of 0 is
 * special-cased, the request is validated with lod_verify_md_striping()
 * and the stripes are prepared with lod_prep_md_striped_create().
 * lod_object_free_striping() resets the object's striping; per the existing
 * comment this is the reaction to a failed preparation.
 *
 * NOTE(review): the action taken for stripe count 0 and the rc checks are
 * not visible here - confirm against the complete function.
 */
1511 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
1512 struct dt_object *dt,
1513 struct lu_attr *attr,
1514 const struct lu_buf *lum_buf,
1515 struct dt_object_format *dof,
1518 struct lod_object *lo = lod_dt_obj(dt);
1519 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1520 struct lmv_user_md_v1 *lum;
1524 lum = lum_buf->lb_buf;
1525 LASSERT(lum != NULL);
1527 CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
1528 le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
1529 (int)le32_to_cpu(lum->lum_stripe_offset));
1531 if (le32_to_cpu(lum->lum_stripe_count) == 0)
1534 rc = lod_verify_md_striping(lod, lum);
1538 /* prepare dir striped objects */
1539 rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
1541 /* failed to create striping, let's reset
1542 * config so that others don't get confused */
1543 lod_object_free_striping(env, lo);
/*
 * Declare an xattr update on a directory and on each of its stripes.
 *
 * For XATTR_NAME_DEFAULT_LMV the buffer is asserted to be present and is
 * validated with lod_verify_md_striping().  The update is then declared on
 * the underlying object, the striping is loaded with lod_load_striping()
 * and the same update is declared on every entry of lo->ldo_stripe.
 *
 * NOTE(review): the assignment of lum, the rc checks and the action taken
 * when ldo_stripenr == 0 are not visible here - confirm against the
 * complete function.
 */
1550 static int lod_dir_declare_xattr_set(const struct lu_env *env,
1551 struct dt_object *dt,
1552 const struct lu_buf *buf,
1553 const char *name, int fl,
1556 struct dt_object *next = dt_object_child(dt);
1557 struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
1558 struct lod_object *lo = lod_dt_obj(dt);
1563 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1564 struct lmv_user_md_v1 *lum;
1566 LASSERT(buf != NULL && buf->lb_buf != NULL);
1568 rc = lod_verify_md_striping(d, lum);
1573 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1577 /* set xattr to each stripes, if needed */
1578 rc = lod_load_striping(env, lo);
1582 if (lo->ldo_stripenr == 0)
1585 for (i = 0; i < lo->ldo_stripenr; i++) {
1586 LASSERT(lo->ldo_stripe[i]);
1587 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
1597 * LOV xattr is a storage for striping, and LOD owns this xattr.
1598 * but LOD allows others to control striping to some extent
1599 * - to reset striping
1600 * - to set new defined striping
1601 * - to set new semi-defined striping
1602 * - number of stripes is defined
1603 * - number of stripes + osts are defined
/*
 * Declare an xattr update, routing it by object type and xattr name.
 *
 *  - XATTR_NAME_LOV on a regular file or on an object with no mode yet,
 *    without LU_XATTR_REPLACE: treated as a striping request and passed to
 *    lod_declare_striped_object().  Attributes are read with dt_attr_get()
 *    when the object exists; the code also builds a minimal S_IFREG
 *    attribute set (presumably for the not-yet-existing case - the else
 *    line is not visible here).
 *  - any xattr on a directory: lod_dir_declare_xattr_set().
 *  - dt_declare_xattr_set() on the underlying object (presumably the
 *    fallback branch - the else line is not visible here).
 */
1606 static int lod_declare_xattr_set(const struct lu_env *env,
1607 struct dt_object *dt,
1608 const struct lu_buf *buf,
1609 const char *name, int fl,
1612 struct dt_object *next = dt_object_child(dt);
1613 struct lu_attr *attr = &lod_env_info(env)->lti_attr;
1619 * allow to declare predefined striping on a new (!mode) object
1620 * which is supposed to be replay of regular file creation
1621 * (when LOV setting is declared)
1622 * LU_XATTR_REPLACE is set to indicate a layout swap
1624 mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
1625 if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
1626 !(fl & LU_XATTR_REPLACE)) {
1628 * this is a request to manipulate object's striping
1630 if (dt_object_exists(dt)) {
1631 rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1635 memset(attr, 0, sizeof(*attr));
1636 attr->la_valid = LA_TYPE | LA_MODE;
1637 attr->la_mode = S_IFREG;
1639 rc = lod_declare_striped_object(env, dt, attr, buf, th);
1640 } else if (S_ISDIR(mode)) {
1641 rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
1643 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
/*
 * Invalidate the default-striping information cached in a lod_object.
 *
 * Clears the "cached" and "default set" LOV flags, drops the pool name via
 * lod_object_set_pool(lo, NULL), zeroes the default stripe size and count,
 * and clears ldo_dir_striping_cached when ldo_dir_stripe is allocated.
 * ldo_def_stripe_offset is not modified here.
 */
1649 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
1651 lo->ldo_striping_cached = 0;
1652 lo->ldo_def_striping_set = 0;
1653 lod_object_set_pool(lo, NULL);
1654 lo->ldo_def_stripe_size = 0;
1655 lo->ldo_def_stripenr = 0;
1656 if (lo->ldo_dir_stripe != NULL)
1657 lo->ldo_dir_striping_cached = 0;
/*
 * Set an xattr on the underlying object and, for a directory, on each of
 * its stripes.
 *
 * dt_xattr_set() is applied to the underlying object first.  The stripe
 * loop that follows repeats the same call for every entry of
 * lo->ldo_stripe; a failure of the first call, a non-directory object and
 * ldo_stripenr == 0 are tested before the loop.
 *
 * NOTE(review): the statements taken by those tests and the rc handling
 * inside the loop are not visible here - confirm against the complete
 * function.
 */
1660 static int lod_xattr_set_internal(const struct lu_env *env,
1661 struct dt_object *dt,
1662 const struct lu_buf *buf,
1663 const char *name, int fl, struct thandle *th,
1664 struct lustre_capa *capa)
1666 struct dt_object *next = dt_object_child(dt);
1667 struct lod_object *lo = lod_dt_obj(dt);
1672 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1673 if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1676 if (lo->ldo_stripenr == 0)
1679 for (i = 0; i < lo->ldo_stripenr; i++) {
1680 LASSERT(lo->ldo_stripe[i]);
1681 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
/*
 * Delete an xattr from the underlying object and, for a directory, from
 * each of its stripes.
 *
 * Mirrors lod_xattr_set_internal(): dt_xattr_del() is applied to the
 * underlying object, then to every entry of lo->ldo_stripe.  A failure of
 * the first call, a non-directory object and ldo_stripenr == 0 are tested
 * before the loop.
 *
 * NOTE(review): the statements taken by those tests and the rc handling
 * inside the loop are not visible here - confirm against the complete
 * function.
 */
1690 static int lod_xattr_del_internal(const struct lu_env *env,
1691 struct dt_object *dt,
1692 const char *name, struct thandle *th,
1693 struct lustre_capa *capa)
1695 struct dt_object *next = dt_object_child(dt);
1696 struct lod_object *lo = lod_dt_obj(dt);
1701 rc = dt_xattr_del(env, next, name, th, capa);
1702 if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1705 if (lo->ldo_stripenr == 0)
1708 for (i = 0; i < lo->ldo_stripenr; i++) {
1709 LASSERT(lo->ldo_stripe[i]);
1710 rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th,
/*
 * Apply a default LOV striping xattr to a directory.
 *
 * The cached default striping of the directory is cleared first.  The
 * buffer is asserted to be present and validated with
 * lod_verify_striping().  When size/count/offset are all "delete" values
 * and the magic is LOV_USER_MAGIC_V1, the xattr is removed with
 * lod_xattr_del_internal(); lod_xattr_set_internal() stores the buffer
 * (presumably in the remaining case - the else line is not visible here).
 * Both helpers also act on the directory's stripes.
 *
 * NOTE(review): the assignments of lum and v3 and the rc checks are not
 * visible here - confirm against the complete function.
 */
1719 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
1720 struct dt_object *dt,
1721 const struct lu_buf *buf,
1722 const char *name, int fl,
1724 struct lustre_capa *capa)
1726 struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
1727 struct lod_object *l = lod_dt_obj(dt);
1728 struct lov_user_md_v1 *lum;
1729 struct lov_user_md_v3 *v3 = NULL;
1733 /* If it is striped dir, we should clear the stripe cache for
1734 * slave stripe as well, but there is no effective way to
1735 * notify the LOD on the slave MDT, so we do not cache stripe
1736 * information for slave stripe for now. XXX*/
1737 lod_lov_stripe_cache_clear(l);
1738 LASSERT(buf != NULL && buf->lb_buf != NULL);
1741 rc = lod_verify_striping(d, buf, false);
1745 if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1748 /* if { size, offset, count } = { 0, -1, 0 } and no pool
1749 * (i.e. all default values specified) then delete default
1750 * striping from dir. */
1752 "set default striping: sz %u # %u offset %d %s %s\n",
1753 (unsigned)lum->lmm_stripe_size,
1754 (unsigned)lum->lmm_stripe_count,
1755 (int)lum->lmm_stripe_offset,
1756 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1758 if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1759 (lum->lmm_stripe_count),
1760 (lum->lmm_stripe_offset)) &&
1761 lum->lmm_magic == LOV_USER_MAGIC_V1) {
1762 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1766 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
/*
 * Apply a default LMV (directory striping) xattr to a directory and
 * refresh the cached defaults.
 *
 * The buffer is asserted to be present; its fields are little-endian.
 * When stripe count/offset are "delete" values and the magic is
 * LMV_USER_MAGIC the xattr is removed with lod_xattr_del_internal();
 * lod_xattr_set_internal() stores the buffer (presumably in the remaining
 * case - the else line is not visible here).  Afterwards ldo_dir_stripe is
 * allocated if missing, ldo_dir_striping_cached is cleared,
 * ldo_dir_def_striping_set is raised and ldo_dir_def_stripenr takes the
 * requested stripe count.
 *
 * NOTE(review): the assignment of lum, the rc checks and the allocation
 * failure handling are not visible here - confirm against the complete
 * function.
 */
1772 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
1773 struct dt_object *dt,
1774 const struct lu_buf *buf,
1775 const char *name, int fl,
1777 struct lustre_capa *capa)
1779 struct lod_object *l = lod_dt_obj(dt);
1780 struct lmv_user_md_v1 *lum;
1784 LASSERT(buf != NULL && buf->lb_buf != NULL);
1787 CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
1788 le32_to_cpu(lum->lum_stripe_count),
1789 (int)le32_to_cpu(lum->lum_stripe_offset));
1791 if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
1792 le32_to_cpu(lum->lum_stripe_offset)) &&
1793 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
1794 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1798 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
1803 /* Update default stripe cache */
1804 if (l->ldo_dir_stripe == NULL) {
1805 OBD_ALLOC_PTR(l->ldo_dir_stripe);
1806 if (l->ldo_dir_stripe == NULL)
1810 l->ldo_dir_striping_cached = 0;
1811 l->ldo_dir_def_striping_set = 1;
1812 l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count);
/*
 * Execute the creation of the stripes of a striped directory; the
 * execution-phase counterpart of lod_prep_md_striped_create().
 *
 * The object must be a directory and, per the existing comment, its
 * stripes are expected to have been allocated during the declare phase.
 * Attributes are read from the underlying object and limited to
 * LA_TYPE | LA_MODE; the format is set to DFT_DIR.
 *
 * For every stripe: dt_create() under the stripe's write lock, "." and
 * ".." entries, the default LOV EA when cached defaults exist and are not
 * "delete" values, the slave LMV EA (lmv_master_mdt_index = stripe
 * number), the link EA, a "<FID>:<stripe number>" entry in the master and
 * a ref add on the master.  The master's LMV EA is written last and
 * slave_lmm is freed.
 *
 * NOTE(review): the statements taken by the early tests, the rc checks
 * and the v3 buffer allocation are not visible here - confirm against the
 * complete function.
 */
1817 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
1818 const struct lu_buf *buf, const char *name,
1819 int fl, struct thandle *th,
1820 struct lustre_capa *capa)
1822 struct lod_object *lo = lod_dt_obj(dt);
1823 struct lod_thread_info *info = lod_env_info(env);
1824 struct lu_attr *attr = &info->lti_attr;
1825 struct dt_object_format *dof = &info->lti_format;
1826 struct lu_buf lmv_buf;
1827 struct lu_buf slave_lmv_buf;
1828 struct lmv_mds_md_v1 *lmm;
1829 struct lmv_mds_md_v1 *slave_lmm = NULL;
1834 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
1837 /* The stripes are supposed to be allocated in declare phase,
1838 * if there are no stripes being allocated, it will skip */
1839 if (lo->ldo_stripenr == 0)
1842 rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA);
1846 attr->la_valid = LA_TYPE | LA_MODE;
1847 dof->dof_type = DFT_DIR;
1849 rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1852 lmm = lmv_buf.lb_buf;
1854 OBD_ALLOC_PTR(slave_lmm);
1855 if (slave_lmm == NULL)
1858 lod_prep_slave_lmv_md(slave_lmm, lmm);
1859 slave_lmv_buf.lb_buf = slave_lmm;
1860 slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1862 for (i = 0; i < lo->ldo_stripenr; i++) {
1863 struct dt_object *dto;
1864 char *stripe_name = info->lti_key;
1865 struct lu_name *sname;
1866 struct linkea_data ldata = { 0 };
1867 struct lu_buf linkea_buf;
1869 dto = lo->ldo_stripe[i];
1870 dt_write_lock(env, dto, MOR_TGT_CHILD);
1871 rc = dt_create(env, dto, attr, NULL, dof, th);
1872 dt_write_unlock(env, dto);
1876 rc = dt_insert(env, dto,
1877 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1878 (const struct dt_key *)dot, th, capa, 0);
1882 rc = dt_insert(env, dto,
1883 (struct dt_rec *)lu_object_fid(&dt->do_lu),
1884 (const struct dt_key *)dotdot, th, capa, 0);
1888 if (lo->ldo_striping_cached &&
1889 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1890 lo->ldo_def_stripenr,
1891 lo->ldo_def_stripe_offset)) {
1892 struct lov_user_md_v3 *v3;
1894 /* sigh, lti_ea_store has been used for lmv_buf,
1895 * so we have to allocate buffer for default
1901 memset(v3, 0, sizeof(*v3));
1902 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1903 v3->lmm_stripe_count =
1904 cpu_to_le16(lo->ldo_def_stripenr);
1905 v3->lmm_stripe_offset =
1906 cpu_to_le16(lo->ldo_def_stripe_offset);
1907 v3->lmm_stripe_size =
1908 cpu_to_le32(lo->ldo_def_stripe_size);
1909 if (lo->ldo_pool != NULL)
1910 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
1911 sizeof(v3->lmm_pool_name));
1913 info->lti_buf.lb_buf = v3;
1914 info->lti_buf.lb_len = sizeof(*v3);
1915 rc = dt_xattr_set(env, dto, &info->lti_buf,
1916 XATTR_NAME_LOV, 0, th, capa);
1922 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1923 rc = dt_xattr_set(env, dto, &slave_lmv_buf, XATTR_NAME_LMV,
1928 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1929 PFID(lu_object_fid(&dto->do_lu)), i);
1931 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
1932 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
1936 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
1940 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1941 linkea_buf.lb_len = ldata.ld_leh->leh_len;
1942 rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK,
1943 0, th, BYPASS_CAPA);
1947 rc = dt_insert(env, dt_object_child(dt),
1948 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1949 (const struct dt_key *)stripe_name, th, capa, 0);
1953 rc = dt_ref_add(env, dt_object_child(dt), th);
1958 rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
1962 if (slave_lmm != NULL)
1963 OBD_FREE_PTR(slave_lmm);
/*
 * Shared worker for declaring and executing the striping set-up of a new
 * directory.  The two wrappers in this file call it with a final argument
 * of true (declare) and false (execute).
 *
 * Three steps, each building its request in info->lti_ea_store (resized
 * with lod_ea_store_resize() when too small) and passing it through
 * info->lti_buf:
 *  1. the directory's own LMV striping, when stripenr/offset are not
 *     "delete" values: lod_declare_xattr_set_lmv() or lod_xattr_set_lmv();
 *  2. the default LMV striping inherited from the parent, when cached and
 *     not "delete" values: lod_dir_declare_xattr_set() or
 *     lod_xattr_set_default_lmv_on_dir() with XATTR_NAME_DEFAULT_LMV;
 *  3. the default LOV striping inherited from the parent, when cached and
 *     not "delete" values: lod_dir_declare_xattr_set() or
 *     lod_xattr_set_lov_on_dir() with XATTR_NAME_LOV.
 *
 * NOTE(review): the trailing parameters, the tests choosing between the
 * declare and execute calls, and the rc checks are not visible here -
 * confirm against the complete function.
 */
1968 int lod_dir_striping_create_internal(const struct lu_env *env,
1969 struct dt_object *dt,
1970 struct lu_attr *attr,
1971 struct dt_object_format *dof,
1975 struct lod_thread_info *info = lod_env_info(env);
1976 struct lod_object *lo = lod_dt_obj(dt);
1980 if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr,
1981 lo->ldo_dir_stripe_offset)) {
1982 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
1983 int stripe_count = lo->ldo_stripenr;
1985 if (info->lti_ea_store_size < sizeof(*v1)) {
1986 rc = lod_ea_store_resize(info, sizeof(*v1));
1989 v1 = info->lti_ea_store;
1992 memset(v1, 0, sizeof(*v1));
1993 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
1994 v1->lum_stripe_count = cpu_to_le32(stripe_count);
1995 v1->lum_stripe_offset =
1996 cpu_to_le32(lo->ldo_dir_stripe_offset);
1998 info->lti_buf.lb_buf = v1;
1999 info->lti_buf.lb_len = sizeof(*v1);
2002 rc = lod_declare_xattr_set_lmv(env, dt, attr,
2003 &info->lti_buf, dof, th);
2005 rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
2006 XATTR_NAME_LMV, 0, th,
2012 /* Transfer default LMV striping from the parent */
2013 if (lo->ldo_dir_striping_cached &&
2014 !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
2015 lo->ldo_dir_def_stripe_offset)) {
2016 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2017 int def_stripe_count = lo->ldo_dir_def_stripenr;
2019 if (info->lti_ea_store_size < sizeof(*v1)) {
2020 rc = lod_ea_store_resize(info, sizeof(*v1));
2023 v1 = info->lti_ea_store;
2026 memset(v1, 0, sizeof(*v1));
2027 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2028 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
2029 v1->lum_stripe_offset =
2030 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
2032 cpu_to_le32(lo->ldo_dir_def_hash_type);
2034 info->lti_buf.lb_buf = v1;
2035 info->lti_buf.lb_len = sizeof(*v1);
2037 rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2038 XATTR_NAME_DEFAULT_LMV,
2041 rc = lod_xattr_set_default_lmv_on_dir(env, dt,
2043 XATTR_NAME_DEFAULT_LMV, 0,
2049 /* Transfer default LOV striping from the parent */
2050 if (lo->ldo_striping_cached &&
2051 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2052 lo->ldo_def_stripenr,
2053 lo->ldo_def_stripe_offset)) {
2054 struct lov_user_md_v3 *v3 = info->lti_ea_store;
2056 if (info->lti_ea_store_size < sizeof(*v3)) {
2057 rc = lod_ea_store_resize(info, sizeof(*v3));
2060 v3 = info->lti_ea_store;
2063 memset(v3, 0, sizeof(*v3));
2064 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2065 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
2066 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
2067 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
2068 if (lo->ldo_pool != NULL)
2069 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
2070 sizeof(v3->lmm_pool_name));
2072 info->lti_buf.lb_buf = v3;
2073 info->lti_buf.lb_len = sizeof(*v3);
2076 rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2077 XATTR_NAME_LOV, 0, th);
2079 rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
2080 XATTR_NAME_LOV, 0, th,
/*
 * Declare-phase wrapper: forwards to lod_dir_striping_create_internal()
 * with its final argument set to true and returns its result.
 */
2089 static int lod_declare_dir_striping_create(const struct lu_env *env,
2090 struct dt_object *dt,
2091 struct lu_attr *attr,
2092 struct dt_object_format *dof,
2095 return lod_dir_striping_create_internal(env, dt, attr, dof, th, true);
/*
 * Execution-phase wrapper: forwards to lod_dir_striping_create_internal()
 * with its final argument set to false and returns its result.
 */
2098 static int lod_dir_striping_create(const struct lu_env *env,
2099 struct dt_object *dt,
2100 struct lu_attr *attr,
2101 struct dt_object_format *dof,
2104 return lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
/*
 * Set an xattr, routing the request by object type and xattr name.
 *
 *  - directory + XATTR_NAME_LMV: a buffer whose hash type carries
 *    LMV_HASH_FLAG_MIGRATION is written straight to the underlying object;
 *    lod_dir_striping_create() is the other call in this branch
 *    (presumably the else case - that line is not visible here).
 *  - directory + XATTR_NAME_LOV: lod_xattr_set_lov_on_dir().
 *  - directory + XATTR_NAME_DEFAULT_LMV:
 *    lod_xattr_set_default_lmv_on_dir().
 *  - regular file + XATTR_NAME_LOV: with LU_XATTR_REPLACE the in-memory
 *    striping is freed and the EA is written to the underlying object;
 *    lod_striping_create() is the other call in this branch (presumably
 *    the else case - that line is not visible here).
 *  - lod_xattr_set_internal() handles "all other xattr" per the existing
 *    comment.
 */
2107 static int lod_xattr_set(const struct lu_env *env,
2108 struct dt_object *dt, const struct lu_buf *buf,
2109 const char *name, int fl, struct thandle *th,
2110 struct lustre_capa *capa)
2112 struct dt_object *next = dt_object_child(dt);
2116 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2117 strcmp(name, XATTR_NAME_LMV) == 0) {
2118 struct lmv_mds_md_v1 *lmm = buf->lb_buf;
2120 if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
2121 LMV_HASH_FLAG_MIGRATION)
2122 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2124 rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
2129 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2130 strcmp(name, XATTR_NAME_LOV) == 0) {
2132 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
2134 } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2135 strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
2137 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
2140 } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2141 !strcmp(name, XATTR_NAME_LOV)) {
2142 /* in case of lov EA swap, just set it
2143 * if not, it is a replay so check striping match what we
2144 * already have during req replay, declare_xattr_set()
2145 * defines striping, then create() does the work
2147 if (fl & LU_XATTR_REPLACE) {
2148 /* free stripes, then update disk */
2149 lod_object_free_striping(env, lod_dt_obj(dt));
2150 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2152 rc = lod_striping_create(env, dt, NULL, NULL, th);
2157 /* then all other xattr */
2158 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
/*
 * Declare an xattr deletion: forwarded unchanged to the underlying object
 * via dt_declare_xattr_del().
 */
2163 static int lod_declare_xattr_del(const struct lu_env *env,
2164 struct dt_object *dt, const char *name,
2167 return dt_declare_xattr_del(env, dt_object_child(dt), name, th);
/*
 * Delete an xattr from the underlying object.  When the xattr is
 * XATTR_NAME_LOV the in-memory striping of the object is released first
 * with lod_object_free_striping().  Returns the result of dt_xattr_del().
 */
2170 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
2171 const char *name, struct thandle *th,
2172 struct lustre_capa *capa)
2174 if (!strcmp(name, XATTR_NAME_LOV))
2175 lod_object_free_striping(env, lod_dt_obj(dt));
2176 return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
/*
 * List xattrs: forwarded unchanged to the underlying object via
 * dt_xattr_list().
 */
2179 static int lod_xattr_list(const struct lu_env *env,
2180 struct dt_object *dt, struct lu_buf *buf,
2181 struct lustre_capa *capa)
2183 return dt_xattr_list(env, dt_object_child(dt), buf, capa);
/*
 * Replace the pool name stored in a lod_object.
 *
 * The visible code frees the current o->ldo_pool string (strlen + 1 bytes)
 * and allocates len + 1 bytes into which pool is copied with strcpy().
 * Callers in this file pass NULL to drop the pool name.
 *
 * NOTE(review): the NULL tests guarding the free and the copy, the
 * computation of len for the new name and the return values are not
 * visible here - confirm against the complete function.
 */
2186 int lod_object_set_pool(struct lod_object *o, char *pool)
2191 len = strlen(o->ldo_pool);
2192 OBD_FREE(o->ldo_pool, len + 1);
2197 OBD_ALLOC(o->ldo_pool, len + 1);
2198 if (o->ldo_pool == NULL)
2200 strcpy(o->ldo_pool, pool);
/*
 * Return non-zero when is_reg is set and the FID's sequence is not
 * FID_SEQ_LOCAL_FILE.
 */
2205 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
2207 return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
/*
 * Read the default LOV striping stored on a directory and cache it in the
 * lod_object.
 *
 * The underlying object is write-locked for the duration.  The EA is
 * fetched with lod_get_lov_ea(); a result smaller than
 * sizeof(struct lov_user_md) is cached as "no default striping" (size and
 * count 0, offset -1).  Otherwise info->lti_ea_store is read as a
 * lov_user_md_v1 and byte-swapped when its magic is a swabbed V1/V3 value.
 * The function leaves with rc = 0 and without storing defaults when the
 * magic is neither LOV_MAGIC_V1 nor LOV_MAGIC_V3, or the pattern is
 * neither LOV_PATTERN_RAID0 nor 0.  Otherwise stripe count, size and
 * offset are stored, both cache flags are set, and for a V3 EA with a
 * non-empty pool name the pool is recorded via lod_object_set_pool().
 *
 * NOTE(review): the rc check after lod_get_lov_ea() is not visible here -
 * confirm against the complete function.
 */
2211 static int lod_cache_parent_lov_striping(const struct lu_env *env,
2212 struct lod_object *lp)
2214 struct lod_thread_info *info = lod_env_info(env);
2215 struct lov_user_md_v1 *v1 = NULL;
2216 struct lov_user_md_v3 *v3 = NULL;
2220 /* called from MDD without parent being write locked,
2222 dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2223 rc = lod_get_lov_ea(env, lp);
2227 if (rc < sizeof(struct lov_user_md)) {
2228 /* don't lookup for non-existing or invalid striping */
2229 lp->ldo_def_striping_set = 0;
2230 lp->ldo_striping_cached = 1;
2231 lp->ldo_def_stripe_size = 0;
2232 lp->ldo_def_stripenr = 0;
2233 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
2234 GOTO(unlock, rc = 0);
2238 v1 = info->lti_ea_store;
2239 if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) {
2240 lustre_swab_lov_user_md_v1(v1);
2241 } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
2242 v3 = (struct lov_user_md_v3 *)v1;
2243 lustre_swab_lov_user_md_v3(v3);
2246 if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
2247 GOTO(unlock, rc = 0);
2249 if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
2250 GOTO(unlock, rc = 0);
2252 CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
2253 PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
2254 (int)v1->lmm_stripe_count,
2255 (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
2257 lp->ldo_def_stripenr = v1->lmm_stripe_count;
2258 lp->ldo_def_stripe_size = v1->lmm_stripe_size;
2259 lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
2260 lp->ldo_striping_cached = 1;
2261 lp->ldo_def_striping_set = 1;
2262 if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
2263 /* XXX: sanity check here */
2264 v3 = (struct lov_user_md_v3 *) v1;
2265 if (v3->lmm_pool_name[0])
2266 lod_object_set_pool(lp, v3->lmm_pool_name);
2270 dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
/*
 * Read the default LMV (directory striping) EA stored on a directory and
 * cache it in the lod_object.
 *
 * The underlying object is write-locked for the duration.  The EA is
 * fetched with lod_get_default_lmv_ea(); a result smaller than
 * sizeof(struct lmv_user_md) is cached as "no default" (count 0,
 * offset -1, hash type LMV_HASH_TYPE_FNV_1A_64).  Otherwise the
 * little-endian count, offset and hash type from info->lti_ea_store are
 * stored and both the "default set" and "cached" flags are raised.
 *
 * NOTE(review): the rc check after lod_get_default_lmv_ea() is not
 * visible here - confirm against the complete function.
 */
2275 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
2276 struct lod_object *lp)
2278 struct lod_thread_info *info = lod_env_info(env);
2279 struct lmv_user_md_v1 *v1 = NULL;
2283 /* called from MDD without parent being write locked,
2285 dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2286 rc = lod_get_default_lmv_ea(env, lp);
2290 if (rc < sizeof(struct lmv_user_md)) {
2291 /* don't lookup for non-existing or invalid striping */
2292 lp->ldo_dir_def_striping_set = 0;
2293 lp->ldo_dir_striping_cached = 1;
2294 lp->ldo_dir_def_stripenr = 0;
2295 lp->ldo_dir_def_stripe_offset =
2296 (typeof(v1->lum_stripe_offset))(-1);
2297 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
2298 GOTO(unlock, rc = 0);
2302 v1 = info->lti_ea_store;
2304 lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
2305 lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
2306 lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
2307 lp->ldo_dir_def_striping_set = 1;
2308 lp->ldo_dir_striping_cached = 1;
2312 dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
/*
 * Make sure the parent's default striping is cached before a child is
 * created.
 *
 * Loads the parent's own striping with lod_load_striping(), caches the
 * default LOV striping if ldo_striping_cached is not yet set, and, when
 * the child is a directory and ldo_dir_striping_cached is not yet set,
 * caches the default LMV striping as well.
 *
 * NOTE(review): the child_mode parameter declaration and the rc checks
 * are not visible here - confirm against the complete function.
 */
2316 static int lod_cache_parent_striping(const struct lu_env *env,
2317 struct lod_object *lp,
2323 rc = lod_load_striping(env, lp);
2327 if (!lp->ldo_striping_cached) {
2328 /* we haven't tried to get default striping for
2329 * the directory yet, let's cache it in the object */
2330 rc = lod_cache_parent_lov_striping(env, lp);
2335 if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
2336 rc = lod_cache_parent_lmv_striping(env, lp);
2342 * used to transfer default striping data to the object being created
/*
 * Initialise the allocation hint for a child about to be created and copy
 * default striping from the parent into the child's lod_object.
 *
 * When a parent is given its striping is loaded first.  The child must not
 * have stripes yet (asserted).  The underlying layer's do_ah_init() is
 * invoked when the underlying child object does not exist or is remote;
 * it receives NULL instead of the underlying parent when that parent is
 * remote.
 *
 * Directory child:
 *  - ldo_dir_stripe is allocated for child and parent when missing;
 *  - the parent's defaults are cached via lod_cache_parent_striping();
 *  - cached default LOV values (pool, count, size, offset) and cached
 *    default LMV values (count, offset, hash type) are copied to the
 *    child's "default" fields;
 *  - the child's own striping comes from ah->dah_eadata when present
 *    (validated with lod_verify_md_striping(), used when the stripe count
 *    is > 1), else from the parent's default LMV striping when set, else
 *    is reset to count 0 / offset -1.
 *
 * Other children: nothing more is transferred unless
 * lod_object_will_be_striped() holds.  Then the parent's default LOV
 * striping (pool, count, size, offset) is copied when set, and a stripe
 * count or size still at 0 falls back to the values in the device's
 * lov_desc.
 *
 * At the end, a parent that is a slave stripe gets its LOV stripe cache
 * cleared again.
 *
 * NOTE(review): the child_mode parameter declaration, several rc and
 * allocation-failure checks, and the jumps between the sections above are
 * not visible here - confirm against the complete function.
 */
2344 static void lod_ah_init(const struct lu_env *env,
2345 struct dt_allocation_hint *ah,
2346 struct dt_object *parent,
2347 struct dt_object *child,
2350 struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
2351 struct dt_object *nextp = NULL;
2352 struct dt_object *nextc;
2353 struct lod_object *lp = NULL;
2354 struct lod_object *lc;
2355 struct lov_desc *desc;
2361 if (likely(parent)) {
2362 nextp = dt_object_child(parent);
2363 lp = lod_dt_obj(parent);
2364 rc = lod_load_striping(env, lp);
2369 nextc = dt_object_child(child);
2370 lc = lod_dt_obj(child);
2372 LASSERT(lc->ldo_stripenr == 0);
2373 LASSERT(lc->ldo_stripe == NULL);
2376 * local object may want some hints
2377 * in case of late striping creation, ->ah_init()
2378 * can be called with local object existing
2380 if (!dt_object_exists(nextc) || dt_object_remote(nextc))
2381 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
2382 NULL : nextp, nextc, child_mode);
2384 if (S_ISDIR(child_mode)) {
2385 if (lc->ldo_dir_stripe == NULL) {
2386 OBD_ALLOC_PTR(lc->ldo_dir_stripe);
2387 if (lc->ldo_dir_stripe == NULL)
2391 if (lp->ldo_dir_stripe == NULL) {
2392 OBD_ALLOC_PTR(lp->ldo_dir_stripe);
2393 if (lp->ldo_dir_stripe == NULL)
2397 rc = lod_cache_parent_striping(env, lp, child_mode);
2401 /* transfer defaults to new directory */
2402 if (lp->ldo_striping_cached) {
2404 lod_object_set_pool(lc, lp->ldo_pool);
2405 lc->ldo_def_stripenr = lp->ldo_def_stripenr;
2406 lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
2407 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2408 lc->ldo_striping_cached = 1;
2409 lc->ldo_def_striping_set = 1;
2410 CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
2411 (int)lc->ldo_def_stripe_size,
2412 (int)lc->ldo_def_stripe_offset,
2413 (int)lc->ldo_def_stripenr);
2416 /* transfer dir defaults to new directory */
2417 if (lp->ldo_dir_striping_cached) {
2418 lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
2419 lc->ldo_dir_def_stripe_offset =
2420 lp->ldo_dir_def_stripe_offset;
2421 lc->ldo_dir_def_hash_type =
2422 lp->ldo_dir_def_hash_type;
2423 lc->ldo_dir_striping_cached = 1;
2424 lc->ldo_dir_def_striping_set = 1;
2425 CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
2426 (int)lc->ldo_dir_def_stripenr,
2427 (int)lc->ldo_dir_def_stripe_offset,
2428 lc->ldo_dir_def_hash_type);
2431 /* It should always honour the specified stripes */
2432 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
2433 const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
2435 rc = lod_verify_md_striping(d, lum1);
2437 le32_to_cpu(lum1->lum_stripe_count) > 1) {
2438 /* Directory will be striped only if
2439 * stripe_count > 1 */
2441 le32_to_cpu(lum1->lum_stripe_count);
2442 lc->ldo_dir_stripe_offset =
2443 le32_to_cpu(lum1->lum_stripe_offset);
2444 lc->ldo_dir_hash_type =
2445 le32_to_cpu(lum1->lum_hash_type);
2446 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
2448 (int)lc->ldo_dir_stripe_offset);
2450 /* then check whether there is default stripes from parent */
2451 } else if (lp->ldo_dir_def_striping_set) {
2452 /* If there are default dir stripe from parent */
2453 lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
2454 lc->ldo_dir_stripe_offset =
2455 lp->ldo_dir_def_stripe_offset;
2456 lc->ldo_dir_hash_type =
2457 lp->ldo_dir_def_hash_type;
2458 CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
2460 (int)lc->ldo_dir_stripe_offset);
2462 /* set default stripe for this directory */
2463 lc->ldo_stripenr = 0;
2464 lc->ldo_dir_stripe_offset = -1;
2467 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
2468 lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
2474 * if object is going to be striped over OSTs, transfer default
2475 * striping information to the child, so that we can use it
2476 * during declaration and creation
2478 if (!lod_object_will_be_striped(S_ISREG(child_mode),
2479 lu_object_fid(&child->do_lu)))
2482 * try from the parent
2484 if (likely(parent)) {
2485 lod_cache_parent_striping(env, lp, child_mode);
2487 lc->ldo_def_stripe_offset = (__u16) -1;
2489 if (lp->ldo_def_striping_set) {
2491 lod_object_set_pool(lc, lp->ldo_pool);
2492 lc->ldo_stripenr = lp->ldo_def_stripenr;
2493 lc->ldo_stripe_size = lp->ldo_def_stripe_size;
2494 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2495 CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
2496 lc->ldo_stripenr, lc->ldo_stripe_size,
2497 lp->ldo_pool ? lp->ldo_pool : "");
2502 * if the parent doesn't provide with specific pattern, grab fs-wide one
2504 desc = &d->lod_desc;
2505 if (lc->ldo_stripenr == 0)
2506 lc->ldo_stripenr = desc->ld_default_stripe_count;
2507 if (lc->ldo_stripe_size == 0)
2508 lc->ldo_stripe_size = desc->ld_default_stripe_size;
2509 CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
2510 lc->ldo_stripenr, lc->ldo_stripe_size,
2511 lc->ldo_pool ? lc->ldo_pool : "");
2514 /* we do not cache stripe information for slave stripe, see
2515 * lod_xattr_set_lov_on_dir */
2516 if (lp != NULL && lp->ldo_dir_slave_stripe)
2517 lod_lov_stripe_cache_clear(lp);
/* Alias for do_div(); lod_declare_init_size() relies on it returning the
 * remainder while leaving the quotient in the first argument. */
2522 #define ll_do_div64(aaa,bbb) do_div((aaa), (bbb))
2524 * this function handles a special case when truncate was done
2525 * on a stripeless object and now striping is being created
2526 * we can't lose that size, so we have to propagate it to newly
/*
 * Declare the size update needed on one stripe when striping is added to
 * an object whose local (child) object already records a size.
 *
 * The attributes of the child are read into the per-thread lti_attr
 * scratch buffer. la_size is then converted into a stripe index plus a
 * size inside that stripe, and an attr_set carrying only LA_SIZE is
 * declared on lo->ldo_stripe[stripe].
 *
 * env: execution environment, also the source of the lti_attr scratch
 * dt:  LOD object that is being striped
 * th:  transaction handle passed through to dt_declare_attr_set()
 */
2529 static int lod_declare_init_size(const struct lu_env *env,
2530 struct dt_object *dt, struct thandle *th)
2532 struct dt_object *next = dt_object_child(dt);
2533 struct lod_object *lo = lod_dt_obj(dt);
2534 struct lu_attr *attr = &lod_env_info(env)->lti_attr;
2535 uint64_t size, offs;
2539 /* XXX: we support the simplest (RAID0) striping so far */
/*
 * NOTE(review): this assertion tolerates ldo_stripenr == 0, yet
 * ldo_stripenr is used as a divisor further down - confirm that callers
 * only get here with a non-zero stripe count.
 */
2540 LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
2541 LASSERT(lo->ldo_stripe_size > 0);
/* read the attributes of the local object into the scratch attr */
2543 rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
/*
 * NOTE(review): attr is asserted on directly after the call; confirm that
 * la_valid is meaningful when dt_attr_get() reports a failure in rc.
 */
2544 LASSERT(attr->la_valid & LA_SIZE);
2548 size = attr->la_size;
2552 /* ll_do_div64(a, b) returns a % b, and a = a / b */
/* size := index of the stripe_size chunk that offset la_size falls into */
2553 ll_do_div64(size, (__u64) lo->ldo_stripe_size);
/* stripe := chunk index modulo stripe count; size := completed rounds */
2554 stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
/* size in that stripe: full rounds plus the offset inside the chunk */
2556 size = size * lo->ldo_stripe_size;
2557 offs = attr->la_size;
2558 size += ll_do_div64(offs, lo->ldo_stripe_size);
/* reuse the scratch attr, advertising only the size as valid */
2560 attr->la_valid = LA_SIZE;
2561 attr->la_size = size;
2563 rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
2569 * Create declaration of striped object
/*
 * Declare everything needed to give an object its LOV striping.
 *
 * When the child object is not remote, lod_qos_prep_create() is asked to
 * choose the stripe objects and the xattr buffer length is computed with
 * lov_mds_md_size() (LOV_MAGIC_V3 if a pool is set, LOV_MAGIC_V1
 * otherwise). On the other path, which the in-code comment describes as
 * the remote-object case, the caller-provided lovea is copied into the
 * xattr buffer and must not be NULL. The XATTR_NAME_LOV xattr set is then
 * declared on the child, and if the child already exists
 * lod_declare_init_size() is called as well.
 *
 * env:   execution environment, source of the per-thread lti_buf
 * dt:    LOD object to be striped
 * attr:  attributes handed to lod_qos_prep_create()
 * lovea: LOV EA buffer; asserted non-NULL on the remote-object path
 * th:    transaction handle the declarations are attached to
 *
 * The two failure branches visible here release the striping again
 * through lod_object_free_striping().
 */
2571 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
2572 struct lu_attr *attr,
2573 const struct lu_buf *lovea, struct thandle *th)
2575 struct lod_thread_info *info = lod_env_info(env);
2576 struct dt_object *next = dt_object_child(dt);
2577 struct lod_object *lo = lod_dt_obj(dt);
/* when the OBD_FAIL_MDS_ALLOC_OBDO check fires, leave with -ENOMEM */
2581 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
2582 /* failed to create striping, let's reset
2583 * config so that others don't get confused */
2584 lod_object_free_striping(env, lo);
2585 GOTO(out, rc = -ENOMEM);
2588 if (!dt_object_remote(next)) {
2589 /* choose OST and generate appropriate objects */
2590 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
2592 /* failed to create striping, let's reset
2593 * config so that others don't get confused */
2594 lod_object_free_striping(env, lo);
2599 * declare storage for striping data
/* only the length is set here; V3 layout is assumed when a pool is named */
2601 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
2602 lo->ldo_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1);
2604 /* LOD can not choose OST objects for remote objects, i.e.
2605 * stripes must be ready before that. Right now, it can only
2606 * happen during migrate, i.e. migrate process needs to create
2607 * remote regular file (mdd_migrate_create), then the migrate
2608 * process will provide stripeEA. */
2609 LASSERT(lovea != NULL);
/* the supplied EA is used verbatim as the xattr buffer */
2610 info->lti_buf = *lovea;
2613 rc = dt_declare_xattr_set(env, next, &info->lti_buf,
2614 XATTR_NAME_LOV, 0, th);
2619 * if striping is created with local object's size > 0,
2620 * we have to propagate this size to specific object
2621 * the case is possible only when local object was created previously
2623 if (dt_object_exists(next))
2624 rc = lod_declare_init_size(env, dt, th);
/*
 * Declare creation of an object and, depending on its type, of its
 * striping.
 *
 * The creation of the child (local) object is declared first through
 * dt_declare_create(). After that:
 *  - DFT_SYM:     lod_body_lnk_ops is installed as the body operations;
 *  - DFT_REGULAR: ldo_stripenr is forced to 0 when
 *                 dof->u.dof_reg.striped is 0, and
 *                 lod_declare_striped_object() is called when the stripe
 *                 count is still positive;
 *  - DFT_DIR:     lod_declare_dir_striping_create() is called when
 *                 ldo_dir_stripe is not NULL.
 *
 * env:  execution environment
 * dt:   LOD object being created
 * attr: attributes for the new object
 * hint: allocation hint forwarded to dt_declare_create()
 * dof:  object format; its dof_type selects the branch taken
 * th:   transaction handle used by the declare calls in the body
 */
2630 static int lod_declare_object_create(const struct lu_env *env,
2631 struct dt_object *dt,
2632 struct lu_attr *attr,
2633 struct dt_allocation_hint *hint,
2634 struct dt_object_format *dof,
2637 struct dt_object *next = dt_object_child(dt);
2638 struct lod_object *lo = lod_dt_obj(dt);
2647 * first of all, we declare creation of local object
2649 rc = dt_declare_create(env, next, attr, hint, dof, th);
2653 if (dof->dof_type == DFT_SYM)
2654 dt->do_body_ops = &lod_body_lnk_ops;
2657 * it's lod_ah_init() who has decided the object will striped
2659 if (dof->dof_type == DFT_REGULAR) {
2660 /* callers don't want stripes */
2661 /* XXX: all tricky interactions with ->ah_make_hint() decided
2662 * to use striping, then ->declare_create() behaving differently
2663 * should be cleaned */
2664 if (dof->u.dof_reg.striped == 0)
2665 lo->ldo_stripenr = 0;
2666 if (lo->ldo_stripenr > 0)
2667 rc = lod_declare_striped_object(env, dt, attr,
2669 } else if (dof->dof_type == DFT_DIR) {
2670 /* Orphan object (like migrating object) does not have
2671 * lod_dir_stripe, see lod_ah_init */
2672 if (lo->ldo_dir_stripe != NULL)
2673 rc = lod_declare_dir_striping_create(env, dt, attr,
/*
 * Create the stripe objects of a striped object and store its LOV EA.
 *
 * Asserts that ldo_striping_cached is 0, then calls dt_create() on each
 * of the ldo_stripenr entries of lo->ldo_stripe (each asserted non-NULL,
 * no allocation hint passed), and finally calls
 * lod_generate_and_set_lovea().
 *
 * env:  execution environment
 * dt:   LOD object whose stripes are created
 * attr: attributes applied to every stripe object
 * dof:  object format passed to dt_create() for every stripe
 * th:   transaction handle used by the calls in the body
 */
2680 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
2681 struct lu_attr *attr, struct dt_object_format *dof,
2684 struct lod_object *lo = lod_dt_obj(dt);
2688 LASSERT(lo->ldo_striping_cached == 0);
2690 /* create all underlying objects */
2691 for (i = 0; i < lo->ldo_stripenr; i++) {
2692 LASSERT(lo->ldo_stripe[i]);
2693 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
2699 rc = lod_generate_and_set_lovea(env, lo, th);
/*
 * Create an object: the child (local) object first, then its stripes.
 *
 * lod_striping_create() is only invoked when all three conditions hold:
 * the object header marks a regular file, lo->ldo_stripe is set, and
 * dof->u.dof_reg.striped is non-zero.
 *
 * env:  execution environment
 * dt:   LOD object being created
 * attr: attributes for the new object
 * hint: allocation hint forwarded to dt_create()
 * dof:  object format
 * th:   transaction handle
 */
2704 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
2705 struct lu_attr *attr,
2706 struct dt_allocation_hint *hint,
2707 struct dt_object_format *dof, struct thandle *th)
2709 struct dt_object *next = dt_object_child(dt);
2710 struct lod_object *lo = lod_dt_obj(dt);
2714 /* create local object */
2715 rc = dt_create(env, next, attr, hint, dof, th);
2719 if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2720 lo->ldo_stripe && dof->u.dof_reg.striped != 0)
2721 rc = lod_striping_create(env, dt, attr, dof, th);
/*
 * Declare destruction of an object together with all of its stripes.
 *
 * Steps visible in the body:
 *  1. lod_load_striping() loads the striping information.
 *  2. For a directory, do_index_try() is called on the child with
 *     dt_directory_features; then, per stripe, a reference drop on the
 *     child and the deletion of an entry are declared. The entry name is
 *     formatted with DFID":%d" from the stripe FID into the per-thread
 *     lti_key buffer.
 *  3. Destruction of the child (local) object is declared.
 *  4. After the OBD_FAIL_LFSCK_LOST_MDTOBJ check, destruction of every
 *     non-NULL stripe object is declared.
 *
 * env: execution environment, source of the lti_key scratch buffer
 * dt:  LOD object to be destroyed
 * th:  transaction handle used by the declare calls in the body
 *
 * NOTE(review): the directory loop dereferences lo->ldo_stripe[i] without
 * a NULL test, while the final loop guards the same pointer - confirm
 * that slots cannot be NULL for directories.
 */
2726 static int lod_declare_object_destroy(const struct lu_env *env,
2727 struct dt_object *dt,
2730 struct dt_object *next = dt_object_child(dt);
2731 struct lod_object *lo = lod_dt_obj(dt);
2732 struct lod_thread_info *info = lod_env_info(env);
2733 char *stripe_name = info->lti_key;
2738 * load striping information, notice we don't do this when object
2739 * is being initialized as we don't need this information till
2740 * few specific cases like destroy, chown
2742 rc = lod_load_striping(env, lo);
2746 /* declare destroy for all underlying objects */
2747 if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2748 rc = next->do_ops->do_index_try(env, next,
2749 &dt_directory_features);
2753 for (i = 0; i < lo->ldo_stripenr; i++) {
2754 rc = dt_declare_ref_del(env, next, th);
2757 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2758 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2760 rc = dt_declare_delete(env, next,
2761 (const struct dt_key *)stripe_name, th);
2767 * we declare destroy for the local object
2769 rc = dt_declare_destroy(env, next, th);
2773 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2776 /* declare destroy all striped objects */
2777 for (i = 0; i < lo->ldo_stripenr; i++) {
2778 if (likely(lo->ldo_stripe[i] != NULL)) {
2779 rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
/*
 * Destroy an object together with all of its stripes; the execution
 * counterpart of lod_declare_object_destroy().
 *
 * For a directory, do_index_try() is called on the child with
 * dt_directory_features; then, per stripe, one reference is dropped on
 * the child and the entry named DFID":%d" (built from the stripe FID in
 * the per-thread lti_key buffer) is deleted. The child itself is
 * destroyed next. After the OBD_FAIL_LFSCK_LOST_MDTOBJ check the stripe
 * objects are destroyed.
 *
 * env: execution environment, source of the lti_key scratch buffer
 * dt:  LOD object to destroy
 * th:  transaction handle
 *
 * NOTE(review): the directory loop dereferences lo->ldo_stripe[i] without
 * a NULL test, while the final loop guards the same pointer - confirm
 * that slots cannot be NULL for directories.
 */
2788 static int lod_object_destroy(const struct lu_env *env,
2789 struct dt_object *dt, struct thandle *th)
2791 struct dt_object *next = dt_object_child(dt);
2792 struct lod_object *lo = lod_dt_obj(dt);
2793 struct lod_thread_info *info = lod_env_info(env);
2794 char *stripe_name = info->lti_key;
2798 /* destroy sub-stripe of master object */
2799 if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2800 rc = next->do_ops->do_index_try(env, next,
2801 &dt_directory_features);
2805 for (i = 0; i < lo->ldo_stripenr; i++) {
/* each stripe entry holds one reference on the master object */
2806 rc = dt_ref_del(env, next, th);
2810 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2811 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2814 CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
2815 PFID(lu_object_fid(&dt->do_lu)), stripe_name,
2816 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
2818 rc = dt_delete(env, next,
2819 (const struct dt_key *)stripe_name,
2825 rc = dt_destroy(env, next, th);
2829 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2832 /* destroy all striped objects */
2833 for (i = 0; i < lo->ldo_stripenr; i++) {
/*
 * a stripe is destroyed when its slot is set and either the
 * OBD_FAIL_LFSCK_LOST_SPEOBJ check does not fire or the index equals
 * cfs_fail_val
 */
2834 if (likely(lo->ldo_stripe[i] != NULL) &&
2835 (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
2836 i == cfs_fail_val)) {
2837 rc = dt_destroy(env, lo->ldo_stripe[i], th);
/*
 * Declare a reference increment: forwarded unchanged to the child object
 * through dt_declare_ref_add(), whose result is returned.
 */
2846 static int lod_declare_ref_add(const struct lu_env *env,
2847 struct dt_object *dt, struct thandle *th)
2849 return dt_declare_ref_add(env, dt_object_child(dt), th);
/*
 * Increment the reference count: forwarded unchanged to the child object
 * through dt_ref_add(), whose result is returned.
 */
2852 static int lod_ref_add(const struct lu_env *env,
2853 struct dt_object *dt, struct thandle *th)
2855 return dt_ref_add(env, dt_object_child(dt), th);
/*
 * Declare a reference decrement: forwarded unchanged to the child object
 * through dt_declare_ref_del(), whose result is returned.
 */
2858 static int lod_declare_ref_del(const struct lu_env *env,
2859 struct dt_object *dt, struct thandle *th)
2861 return dt_declare_ref_del(env, dt_object_child(dt), th);
/*
 * Decrement the reference count: forwarded unchanged to the child object
 * through dt_ref_del(), whose result is returned.
 */
2864 static int lod_ref_del(const struct lu_env *env,
2865 struct dt_object *dt, struct thandle *th)
2867 return dt_ref_del(env, dt_object_child(dt), th);
/*
 * Obtain a capability: old and opc are forwarded unchanged to the child
 * object through dt_capa_get(), whose result is returned.
 */
2870 static struct obd_capa *lod_capa_get(const struct lu_env *env,
2871 struct dt_object *dt,
2872 struct lustre_capa *old, __u64 opc)
2874 return dt_capa_get(env, dt_object_child(dt), old, opc);
/*
 * Sync the range given by start and end: both values are forwarded
 * unchanged to the child object through dt_object_sync(), whose result is
 * returned.
 */
2877 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
2878 __u64 start, __u64 end)
2880 return dt_object_sync(env, dt_object_child(dt), start, end);
/*
 * Lock handles taken on the stripes of a striped directory.
 *
 * Allocated in lod_object_lock() with room for ldo_stripenr handles and
 * freed in lod_object_unlock(); a pointer to it travels in
 * einfo->ei_cbdata. The users in this file also read and write an
 * lsl_lock_count member and start iterating at index 1.
 */
2883 struct lod_slave_locks {
/* trailing array, sized by the allocation rather than by the type */
2885 struct lustre_handle lsl_handle[0];
/*
 * Release the stripe locks recorded in einfo->ei_cbdata.
 *
 * Walks the handles from index 1 up to lsl_lock_count; for every handle
 * that lustre_handle_is_used() reports as in use, dt_object_unlock() is
 * called on the matching stripe. The first non-zero result is kept in rc
 * and the remaining handles are still processed.
 *
 * Side effect: einfo->ei_cbdata is overwritten with the address of the
 * handle being released and is not restored here; both callers in this
 * file reset it to NULL afterwards.
 *
 * NOTE(review): policy is declared as ldlm_policy_data_t * here, whereas
 * lod_object_lock() and lod_object_unlock() use union ldlm_policy_data *
 * - confirm both spellings name the same type.
 */
2888 static int lod_object_unlock_internal(const struct lu_env *env,
2889 struct dt_object *dt,
2890 struct ldlm_enqueue_info *einfo,
2891 ldlm_policy_data_t *policy)
2893 struct lod_object *lo = lod_dt_obj(dt);
2894 struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
2899 if (slave_locks == NULL)
2902 for (i = 1; i < slave_locks->lsl_lock_count; i++) {
2903 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
/* hand the individual handle to the stripe through ei_cbdata */
2906 einfo->ei_cbdata = &slave_locks->lsl_handle[i];
2907 rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
/* remember the first failure only */
2910 rc = rc == 0 ? rc1 : rc;
/*
 * Release the stripe locks of a striped directory and free the
 * bookkeeping structure allocated by lod_object_lock().
 *
 * The body tests, in order: whether einfo->ei_cbdata is NULL, whether the
 * object is a directory, the result of lod_load_striping(), and whether
 * the directory has at most one stripe while its child object is remote.
 * Past those checks lod_object_unlock_internal() releases the locks, the
 * structure is freed with a size recomputed from lsl_lock_count, and
 * einfo->ei_cbdata is cleared.
 *
 * env:    execution environment
 * dt:     LOD object being unlocked
 * einfo:  enqueue info; ei_cbdata carries the struct lod_slave_locks
 * policy: lock policy forwarded to lod_object_unlock_internal()
 */
2917 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
2918 struct ldlm_enqueue_info *einfo,
2919 union ldlm_policy_data *policy)
2921 struct lod_object *lo = lod_dt_obj(dt);
2922 struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
2923 int slave_locks_size;
2927 if (slave_locks == NULL)
2930 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2933 rc = lod_load_striping(env, lo);
2937 /* Note: for remote lock for single stripe dir, MDT will cancel
2938 * the lock by lockh directly */
2939 if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
2942 /* Only cancel slave lock for striped dir */
2943 rc = lod_object_unlock_internal(env, dt, einfo, policy);
/* same size formula as the allocation in lod_object_lock() */
2945 slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
2946 sizeof(slave_locks->lsl_handle[0]);
2947 OBD_FREE(slave_locks, slave_locks_size);
2948 einfo->ei_cbdata = NULL;
/*
 * Take the lock(s) for an object.
 *
 * When einfo->ei_enq_slave is not set, dt is asserted to be remote and
 * the request is passed straight to the child via dt_object_lock().
 * Otherwise the body tests whether the object is a directory, loads the
 * striping, and tests whether there is more than one stripe; it then
 * allocates a struct lod_slave_locks sized for ldo_stripenr handles and
 * locks the stripes from index 1 upwards, storing each handle at the
 * matching index. The resource id for each stripe is built from the
 * stripe FID in the per-thread lti_res_id and published through
 * einfo->ei_res_id.
 *
 * On success einfo->ei_cbdata points at the allocated structure, which
 * lod_object_unlock() frees. When rc is non-zero and the structure was
 * allocated, the locks taken so far are released with
 * lod_object_unlock_internal(), the structure is freed, and
 * einfo->ei_cbdata is reset to NULL.
 *
 * env:    execution environment
 * dt:     LOD object to lock
 * lh:     lock handle, used by the direct (non-slave) path
 * einfo:  enqueue info; ei_res_id and ei_cbdata are updated here
 * policy: lock policy
 */
2953 static int lod_object_lock(const struct lu_env *env,
2954 struct dt_object *dt,
2955 struct lustre_handle *lh,
2956 struct ldlm_enqueue_info *einfo,
2957 union ldlm_policy_data *policy)
2959 struct lod_object *lo = lod_dt_obj(dt);
2962 int slave_locks_size;
2963 struct lod_slave_locks *slave_locks = NULL;
2966 /* remote object lock */
2967 if (!einfo->ei_enq_slave) {
2968 LASSERT(dt_object_remote(dt));
2969 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
2973 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2976 rc = lod_load_striping(env, lo);
2981 if (lo->ldo_stripenr <= 1)
/* header plus one handle slot per stripe */
2984 slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
2985 sizeof(slave_locks->lsl_handle[0]);
2986 /* Freed in lod_object_unlock */
2987 OBD_ALLOC(slave_locks, slave_locks_size);
2988 if (slave_locks == NULL)
2990 slave_locks->lsl_lock_count = lo->ldo_stripenr;
2992 /* striped directory lock */
/* starts at 1: slot 0 of lsl_handle is never written in this loop */
2993 for (i = 1; i < lo->ldo_stripenr; i++) {
2994 struct lustre_handle lockh;
2995 struct ldlm_res_id *res_id;
2997 res_id = &lod_env_info(env)->lti_res_id;
2998 fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
3000 einfo->ei_res_id = res_id;
/*
 * NOTE(review): this assertion comes after lo->ldo_stripe[i] has already
 * been dereferenced above - consider whether it should precede that use.
 */
3002 LASSERT(lo->ldo_stripe[i]);
3003 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
3007 slave_locks->lsl_handle[i] = lockh;
3010 einfo->ei_cbdata = slave_locks;
/* failure after allocation: undo the locks taken so far and free */
3013 if (rc != 0 && slave_locks != NULL) {
3014 einfo->ei_cbdata = slave_locks;
3015 lod_object_unlock_internal(env, dt, einfo, policy);
3016 OBD_FREE(slave_locks, slave_locks_size);
3017 einfo->ei_cbdata = NULL;
/*
 * dt_object_operations vector of the LOD layer: every do_* slot listed
 * below is bound to its lod_* implementation.
 */
3023 struct dt_object_operations lod_obj_ops = {
3024 .do_read_lock = lod_object_read_lock,
3025 .do_write_lock = lod_object_write_lock,
3026 .do_read_unlock = lod_object_read_unlock,
3027 .do_write_unlock = lod_object_write_unlock,
3028 .do_write_locked = lod_object_write_locked,
3029 .do_attr_get = lod_attr_get,
3030 .do_declare_attr_set = lod_declare_attr_set,
3031 .do_attr_set = lod_attr_set,
3032 .do_xattr_get = lod_xattr_get,
3033 .do_declare_xattr_set = lod_declare_xattr_set,
3034 .do_xattr_set = lod_xattr_set,
3035 .do_declare_xattr_del = lod_declare_xattr_del,
3036 .do_xattr_del = lod_xattr_del,
3037 .do_xattr_list = lod_xattr_list,
3038 .do_ah_init = lod_ah_init,
3039 .do_declare_create = lod_declare_object_create,
3040 .do_create = lod_object_create,
3041 .do_declare_destroy = lod_declare_object_destroy,
3042 .do_destroy = lod_object_destroy,
3043 .do_index_try = lod_index_try,
3044 .do_declare_ref_add = lod_declare_ref_add,
3045 .do_ref_add = lod_ref_add,
3046 .do_declare_ref_del = lod_declare_ref_del,
3047 .do_ref_del = lod_ref_del,
3048 .do_capa_get = lod_capa_get,
3049 .do_object_sync = lod_object_sync,
3050 .do_object_lock = lod_object_lock,
3051 .do_object_unlock = lod_object_unlock,
/*
 * Body read: calls dbo_read of the child object with the same buf, pos
 * and capa, and returns its result.
 */
3054 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
3055 struct lu_buf *buf, loff_t *pos,
3056 struct lustre_capa *capa)
3058 struct dt_object *next = dt_object_child(dt);
3059 return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
/*
 * Declare a body write: handed to dt_declare_record_write() on the child
 * object, whose result is returned.
 */
3062 static ssize_t lod_declare_write(const struct lu_env *env,
3063 struct dt_object *dt,
3064 const struct lu_buf *buf, loff_t pos,
3067 return dt_declare_record_write(env, dt_object_child(dt),
/*
 * Body write: calls dbo_write of the child object with the same buf, pos,
 * th, capa and iq, and returns its result.
 */
3071 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
3072 const struct lu_buf *buf, loff_t *pos,
3073 struct thandle *th, struct lustre_capa *capa, int iq)
3075 struct dt_object *next = dt_object_child(dt);
3077 return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
/*
 * Body operations that pass reads and writes through to the child object.
 * Installed by lod_declare_object_create() for DFT_SYM objects and by
 * lod_object_start() for objects whose header mode is a symlink.
 */
3080 static const struct dt_body_operations lod_body_lnk_ops = {
3081 .dbo_read = lod_read,
3082 .dbo_declare_write = lod_declare_write,
3083 .dbo_write = lod_write
/*
 * Initialize the LOD slice of an object by attaching the matching
 * lower-layer object beneath it.
 *
 * lod_fld_lookup() yields the index and the sequence range type for the
 * FID of the object. The lower device (cdev) is then chosen:
 *  - MDT range and the index equals the ss_node_id of this site: the
 *    child device of the LOD itself (lod_child);
 *  - MDT range otherwise: looked up in lod_mdt_descs;
 *  - OST range: looked up in lod_ost_descs.
 * For the two table lookups the target is taken only if idx is below
 * ltd_tgts_size and its bit is set in ltd_tgt_bitmap. The object
 * allocated through ldo_object_alloc of cdev is linked in with
 * lu_object_add().
 *
 * env:  execution environment
 * lo:   lu_object slice being initialized
 * conf: object configuration; not referenced in the lines shown here
 */
3086 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
3087 const struct lu_object_conf *conf)
3089 struct lod_device *lod = lu2lod_dev(lo->lo_dev);
3090 struct lu_device *cdev = NULL;
3091 struct lu_object *cobj;
3092 struct lod_tgt_descs *ltd = NULL;
3093 struct lod_tgt_desc *tgt;
3095 int type = LU_SEQ_RANGE_ANY;
3099 rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
/* FID served by this very MDT: use the device directly below the LOD */
3103 if (type == LU_SEQ_RANGE_MDT &&
3104 idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
3105 cdev = &lod->lod_child->dd_lu_dev;
3106 } else if (type == LU_SEQ_RANGE_MDT) {
3107 ltd = &lod->lod_mdt_descs;
3109 } else if (type == LU_SEQ_RANGE_OST) {
3110 ltd = &lod->lod_ost_descs;
/* accept the target only if idx is in range and marked in the bitmap */
3117 if (ltd->ltd_tgts_size > idx &&
3118 cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
3119 tgt = LTD_TGT(ltd, idx);
3121 LASSERT(tgt != NULL);
3122 LASSERT(tgt->ltd_tgt != NULL);
3124 cdev = &(tgt->ltd_tgt->dd_lu_dev);
3126 lod_putref(lod, ltd);
3129 if (unlikely(cdev == NULL))
/* allocate the lower-layer slice on the chosen device and chain it */
3132 cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
3133 if (unlikely(cobj == NULL))
3136 lu_object_add(lo, cobj);
/*
 * Drop all striping state held by a LOD object.
 *
 * Frees ldo_dir_stripe if set. If the stripe array exists, every non-NULL
 * entry among the first ldo_stripenr is released with lu_object_put() and
 * the array is freed using ldo_stripes_allocated as its element count.
 * ldo_stripe, ldo_stripes_allocated, ldo_stripenr and ldo_pattern are
 * reset along the way.
 *
 * env: execution environment, needed by lu_object_put()
 * lo:  LOD object whose striping is discarded
 */
3141 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
3145 if (lo->ldo_dir_stripe != NULL) {
3146 OBD_FREE_PTR(lo->ldo_dir_stripe);
3147 lo->ldo_dir_stripe = NULL;
3150 if (lo->ldo_stripe) {
3151 LASSERT(lo->ldo_stripes_allocated > 0);
/* release the reference held on each stripe object */
3153 for (i = 0; i < lo->ldo_stripenr; i++) {
3154 if (lo->ldo_stripe[i])
3155 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
/* i is reused as the byte size of the pointer array */
3158 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
3159 OBD_FREE(lo->ldo_stripe, i);
3160 lo->ldo_stripe = NULL;
3161 lo->ldo_stripes_allocated = 0;
3163 lo->ldo_stripenr = 0;
3164 lo->ldo_pattern = 0;
3168 * ->start is called once all slices are initialized, including header's
3169 * cache for mode (object type). using the type we can initialize ops
/*
 * Install lod_body_lnk_ops as the body operations when the mode cached in
 * the object header (masked with S_IFMT) is a symbolic link.
 */
3171 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
3173 if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
3174 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
/*
 * Free a LOD object: its striping is released through
 * lod_object_free_striping(), the pool is reset by passing NULL to
 * lod_object_set_pool(), and the object is returned to the
 * lod_object_kmem slab.
 */
3178 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
3180 struct lod_object *mo = lu2lod_obj(o);
3183 * release all underlying object pinned
3186 lod_object_free_striping(env, mo);
3188 lod_object_set_pool(mo, NULL);
3191 OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
/*
 * Release hook of the LOD object; the lines shown contain only the open
 * question below and no statements.
 */
3194 static void lod_object_release(const struct lu_env *env, struct lu_object *o)
3196 /* XXX: shouldn't we release everything here in case if object
3197 * creation failed before? */
/*
 * Print a one-line description of the LOD object through printer p:
 * LUSTRE_LOD_NAME followed by "-object@" and the address of the object.
 * The const qualifier of l is cast away only to reuse lu2lod_obj().
 */
3200 static int lod_object_print(const struct lu_env *env, void *cookie,
3201 lu_printer_t p, const struct lu_object *l)
3203 struct lod_object *o = lu2lod_obj((struct lu_object *) l);
3205 return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
/*
 * lu_object_operations vector of the LOD layer: binds the init, start,
 * free, release and print hooks to the lod_object_* functions of this
 * file.
 */
3208 struct lu_object_operations lod_lu_obj_ops = {
3209 .loo_object_init = lod_object_init,
3210 .loo_object_start = lod_object_start,
3211 .loo_object_free = lod_object_free,
3212 .loo_object_release = lod_object_release,
3213 .loo_object_print = lod_object_print,