4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright 2009 Sun Microsystems, Inc. All rights reserved
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2013, Intel Corporation.
29 * lustre/lod/lod_object.c
31 * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
34 #define DEBUG_SUBSYSTEM S_MDS
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46 #include <md_object.h>
47 #include <lustre_linkea.h>
49 #include "lod_internal.h"
/*
 * String constants for the dot and dot-dot names.
 * NOTE(review): neither is referenced in the visible part of this file;
 * presumably they name the self/parent directory entries further down.
 */
51 static const char dot[] = ".";
52 static const char dotdot[] = "..";
/* Declared here, defined elsewhere; presumably the allocation cache for lod objects. */
54 extern struct kmem_cache *lod_object_kmem;
/* Tentative declaration of a body-operations table; its initializer is not visible here. */
55 static const struct dt_body_operations lod_body_lnk_ops;
/*
 * Index lookup on a lod object.
 *
 * Forwards the request unchanged to the dio_lookup method of the child
 * object returned by dt_object_child(dt) and returns that result.
 */
57 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
58 struct dt_rec *rec, const struct dt_key *key,
59 struct lustre_capa *capa)
61 struct dt_object *next = dt_object_child(dt);
62 return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
/*
 * Declare an index insertion: delegates to dt_declare_insert() on the
 * child object of dt with the same record, key and transaction handle.
 * NOTE(review): the parameter line declaring dt is absent from this
 * extract although the body uses it.
 */
65 static int lod_declare_index_insert(const struct lu_env *env,
67 const struct dt_rec *rec,
68 const struct dt_key *key,
69 struct thandle *handle)
71 return dt_declare_insert(env, dt_object_child(dt), rec, key, handle);
/*
 * Insert a record into the index: delegates to dt_insert() on the child
 * object of dt and returns its result.
 * NOTE(review): the declarations of dt, th and ign are absent from this
 * extract although the body passes them through.
 */
74 static int lod_index_insert(const struct lu_env *env,
76 const struct dt_rec *rec,
77 const struct dt_key *key,
79 struct lustre_capa *capa,
82 return dt_insert(env, dt_object_child(dt), rec, key, th, capa, ign);
/*
 * Declare an index deletion: delegates to dt_declare_delete() on the
 * child object of dt for the given key.
 * NOTE(review): the declarations of dt and th are absent from this
 * extract although the body uses them.
 */
85 static int lod_declare_index_delete(const struct lu_env *env,
87 const struct dt_key *key,
90 return dt_declare_delete(env, dt_object_child(dt), key, th);
/*
 * Delete a key from the index: delegates to dt_delete() on the child
 * object of dt and returns its result.
 * NOTE(review): the declarations of dt and th are absent from this
 * extract although the body uses them.
 */
93 static int lod_index_delete(const struct lu_env *env,
95 const struct dt_key *key,
97 struct lustre_capa *capa)
99 return dt_delete(env, dt_object_child(dt), key, th, capa);
/*
 * Create an iterator over dt.
 *
 * Initializes the iterator of the child object through its dio_it.init
 * method and stores the result in lit_it of the struct lod_it kept in the
 * per-thread info (lod_env_info(env)->lti_it). The slot is asserted to be
 * free (lit_obj == NULL) first. The lod_it itself is returned, cast to
 * struct dt_it *.
 * NOTE(review): the check of it_next for failure and the assignment of
 * lit_obj are not visible in this extract.
 */
102 static struct dt_it *lod_it_init(const struct lu_env *env,
103 struct dt_object *dt, __u32 attr,
104 struct lustre_capa *capa)
106 struct dt_object *next = dt_object_child(dt);
107 struct lod_it *it = &lod_env_info(env)->lti_it;
108 struct dt_it *it_next;
111 it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
115 /* currently we do not use more than one iterator per thread
116 * so we store it in thread info. if at some point we need
117 * more active iterators in a single thread, we can allocate
119 LASSERT(it->lit_obj == NULL);
121 it->lit_it = it_next;
124 return (struct dt_it *)it;
/*
 * Sanity check used by the lod_it_* wrappers: asserts that the iterator
 * has both an object (lit_obj) and a lower-layer iterator (lit_it)
 * attached. The env argument is not used by the visible assertions.
 * NOTE(review): the wrapper lines around the assertions are not visible
 * in this extract.
 */
127 #define LOD_CHECK_IT(env, it) \
129 LASSERT((it)->lit_obj != NULL); \
130 LASSERT((it)->lit_it != NULL); \
/*
 * Finish a lod iterator: after the sanity check, calls dio_it.fini of
 * the iterated object on the lower-layer iterator.
 * NOTE(review): whatever follows the trailing comment (presumably the
 * reset of the lod_it fields) is not visible in this extract.
 */
133 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
135 struct lod_it *it = (struct lod_it *)di;
137 LOD_CHECK_IT(env, it);
138 it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
140 /* the iterator not in use any more */
/*
 * Position the iterator at key: after the sanity check, forwards to
 * dio_it.get of the iterated object with the lower-layer iterator and
 * returns its result.
 */
145 int lod_it_get(const struct lu_env *env, struct dt_it *di,
146 const struct dt_key *key)
148 const struct lod_it *it = (const struct lod_it *)di;
150 LOD_CHECK_IT(env, it);
151 return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
/*
 * Counterpart of lod_it_get(): after the sanity check, forwards to
 * dio_it.put of the iterated object with the lower-layer iterator.
 * NOTE(review): this void function uses return with an expression;
 * ISO C does not allow that (GCC accepts it), a plain call statement
 * would be strictly conforming.
 */
154 void lod_it_put(const struct lu_env *env, struct dt_it *di)
156 struct lod_it *it = (struct lod_it *)di;
158 LOD_CHECK_IT(env, it);
159 return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
/*
 * Advance the iterator: after the sanity check, forwards to dio_it.next
 * of the iterated object and returns its result unchanged.
 */
162 int lod_it_next(const struct lu_env *env, struct dt_it *di)
164 struct lod_it *it = (struct lod_it *)di;
166 LOD_CHECK_IT(env, it);
167 return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
/*
 * Return the key at the current iterator position as reported by
 * dio_it.key of the iterated object.
 */
170 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
172 const struct lod_it *it = (const struct lod_it *)di;
174 LOD_CHECK_IT(env, it);
175 return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
/*
 * Return the size of the current key as reported by dio_it.key_size of
 * the iterated object.
 * NOTE(review): the cast drops the const qualifier of di although the
 * visible code only reads through it; the sibling wrappers keep const.
 */
178 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
180 struct lod_it *it = (struct lod_it *)di;
182 LOD_CHECK_IT(env, it);
183 return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
/*
 * Fetch the record at the current iterator position into rec by
 * forwarding to dio_it.rec of the iterated object.
 * NOTE(review): the continuation line of the call (remaining arguments)
 * is not visible in this extract.
 */
186 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
187 struct dt_rec *rec, __u32 attr)
189 const struct lod_it *it = (const struct lod_it *)di;
191 LOD_CHECK_IT(env, it);
192 return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec,
/*
 * Return the size of the current record by forwarding to
 * dio_it.rec_size of the iterated object.
 * NOTE(review): the rest of the parameter list and the continuation
 * line of the call are not visible in this extract.
 */
196 int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di,
199 const struct lod_it *it = (const struct lod_it *)di;
201 LOD_CHECK_IT(env, it);
202 return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it,
/*
 * Return the 64-bit value produced by dio_it.store of the iterated
 * object for the current iterator position.
 */
206 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
208 const struct lod_it *it = (const struct lod_it *)di;
210 LOD_CHECK_IT(env, it);
211 return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
/*
 * Reposition the iterator from hash by forwarding to dio_it.load of the
 * iterated object; the result of that call is returned unchanged.
 */
214 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
216 const struct lod_it *it = (const struct lod_it *)di;
218 LOD_CHECK_IT(env, it);
219 return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
/*
 * Forward to dio_it.key_rec of the iterated object for the current
 * iterator position.
 * NOTE(review): the rest of the parameter list and the continuation
 * line of the call are not visible in this extract.
 */
222 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
225 const struct lod_it *it = (const struct lod_it *)di;
227 LOD_CHECK_IT(env, it);
228 return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it,
/*
 * Index operations table built from the plain lod_index_* and lod_it_*
 * wrappers; lod_index_try() assigns it to dt->do_index_ops.
 * NOTE(review): several iterator initializer lines (and the nested
 * initializer braces) are absent from this extract.
 */
232 static struct dt_index_operations lod_index_ops = {
233 .dio_lookup = lod_index_lookup,
234 .dio_declare_insert = lod_declare_index_insert,
235 .dio_insert = lod_index_insert,
236 .dio_declare_delete = lod_declare_index_delete,
237 .dio_delete = lod_index_delete,
245 .key_size = lod_it_key_size,
247 .rec_size = lod_it_rec_size,
248 .store = lod_it_store,
250 .key_rec = lod_it_key_rec,
255 * Implementation of dt_index_operations:: dio_it.init
257 * This function is to initialize the iterator for striped directory,
258 * basically these lod_striped_it_xxx will just locate the stripe
259 * and call the corresponding API of its next lower layer.
261 * \param[in] env execution environment.
262 * \param[in] dt the striped directory object to be iterated.
263 * \param[in] attr the attribute of iterator, mostly used to indicate
264 * the entry attribute in the object to be iterated.
265 * \param[in] capa capability(useless in current implementation)
267 * \retval initialized iterator(dt_it) if the iteration is successfully
268 * initialized. lit_stripe_index will be used to indicate the
269 * current iteration position among stripes.
270 * \retval ERR pointer if initialization fails.
/*
 * Implementation note: iteration always starts on stripe 0. The stripe
 * count is asserted to be positive and stripe 0 is asserted to exist and
 * to have index operations before its dio_it.init is called. The
 * per-thread struct lod_it (lod_env_info(env)->lti_it) is asserted to be
 * unused, then receives stripe index 0 and the sub-iterator, and is
 * returned cast to struct dt_it *.
 * NOTE(review): the check of it_next for failure and the assignments of
 * the remaining lod_it fields are not visible in this extract.
 */
272 static struct dt_it *lod_striped_it_init(const struct lu_env *env,
273 struct dt_object *dt, __u32 attr,
274 struct lustre_capa *capa)
276 struct lod_object *lo = lod_dt_obj(dt);
277 struct dt_object *next;
278 struct lod_it *it = &lod_env_info(env)->lti_it;
279 struct dt_it *it_next;
282 LASSERT(lo->ldo_stripenr > 0);
283 next = lo->ldo_stripe[0];
284 LASSERT(next != NULL);
285 LASSERT(next->do_index_ops != NULL);
287 it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
291 /* currently we do not use more than one iterator per thread
292 * so we store it in thread info. if at some point we need
293 * more active iterators in a single thread, we can allocate
295 LASSERT(it->lit_obj == NULL);
297 it->lit_stripe_index = 0;
299 it->lit_it = it_next;
302 return (struct dt_it *)it;
/*
 * Sanity check used by the lod_striped_it_* functions: asserts that the
 * iterator has an object and a lower-layer iterator, that lo has at
 * least one stripe, and that lit_stripe_index is below the stripe count.
 * The env argument is not used by the visible assertions.
 * NOTE(review): the wrapper lines around the assertions are not visible
 * in this extract.
 */
305 #define LOD_CHECK_STRIPED_IT(env, it, lo) \
307 LASSERT((it)->lit_obj != NULL); \
308 LASSERT((it)->lit_it != NULL); \
309 LASSERT((lo)->ldo_stripenr > 0); \
310 LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr); \
314 * Implementation of dt_index_operations:: dio_it.fini
316 * This function is to finish the iterator for striped directory.
318 * \param[in] env execution environment.
319 * \param[in] di the iterator for the striped directory
/*
 * Implementation note: the sub-iterator is finished on the stripe that
 * lit_stripe_index currently selects (not necessarily stripe 0), after
 * which the stripe index is reset to 0.
 * NOTE(review): the other resets announced by the comment below are not
 * visible in this extract.
 */
322 static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
324 struct lod_it *it = (struct lod_it *)di;
325 struct lod_object *lo = lod_dt_obj(it->lit_obj);
326 struct dt_object *next;
328 LOD_CHECK_STRIPED_IT(env, it, lo);
330 next = lo->ldo_stripe[it->lit_stripe_index];
331 LASSERT(next != NULL);
332 LASSERT(next->do_index_ops != NULL);
334 next->do_index_ops->dio_it.fini(env, it->lit_it);
336 /* the iterator not in use any more */
339 it->lit_stripe_index = 0;
343 * Implementation of dt_index_operations:: dio_it.get
345 * This function is to position the iterator with given key
347 * \param[in] env execution environment.
348 * \param[in] di the iterator for striped directory.
349 * \param[in] key the key the iterator will be positioned.
351 * \retval 0 if successfully position iterator by the key.
352 * \retval negative error if positioning fails.
/*
 * Implementation note: the key is looked up only in the stripe that
 * lit_stripe_index currently selects; the visible code does not search
 * other stripes. The result of dio_it.get is returned unchanged.
 */
354 static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
355 const struct dt_key *key)
357 const struct lod_it *it = (const struct lod_it *)di;
358 struct lod_object *lo = lod_dt_obj(it->lit_obj);
359 struct dt_object *next;
362 LOD_CHECK_STRIPED_IT(env, it, lo);
364 next = lo->ldo_stripe[it->lit_stripe_index];
365 LASSERT(next != NULL);
366 LASSERT(next->do_index_ops != NULL);
368 return next->do_index_ops->dio_it.get(env, it->lit_it, key);
372 * Implementation of dt_index_operations:: dio_it.put
374 * This function is supposed to be the pair of it_get, but currently does
375 * nothing. See osd_it_ea_put or osd_index_it_put.
/*
 * Implementation note: forwards to dio_it.put of the stripe selected by
 * lit_stripe_index.
 * NOTE(review): this void function uses return with an expression;
 * ISO C does not allow that (GCC accepts it), a plain call statement
 * would be strictly conforming.
 */
377 static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
379 struct lod_it *it = (struct lod_it *)di;
380 struct lod_object *lo = lod_dt_obj(it->lit_obj);
381 struct dt_object *next;
383 LOD_CHECK_STRIPED_IT(env, it, lo);
385 next = lo->ldo_stripe[it->lit_stripe_index];
386 LASSERT(next != NULL);
387 LASSERT(next->do_index_ops != NULL);
389 return next->do_index_ops->dio_it.put(env, it->lit_it);
393 * Implementation of dt_index_operations:: dio_it.next
395 * This function is to position the iterator to the next entry. If the
396 * current stripe is finished (determined by checking the return value of
397 * next() in the current stripe), it will go to the next stripe. In the
398 * meantime, the sub-iterator for the next stripe needs to be initialized.
400 * \param[in] env execution environment.
401 * \param[in] di the iterator for striped directory.
403 * \retval 0 if successfully position iterator to the next entry.
404 * \retval negative error if positioning fails.
/*
 * Implementation note on the visible flow: call dio_it.next on the
 * current stripe; for stripes other than the first, read the current
 * record into the per-thread lti_key buffer and compare its name against
 * the dot and dot-dot names (see the comment below); once the current
 * stripe is exhausted, move to the next stripe by releasing the old
 * sub-iterator (put + fini) and initializing a new one with the saved
 * lit_attr.
 * NOTE(review): several return/goto lines are absent from this extract,
 * so the exact result codes cannot be confirmed from here.
 * NOTE(review): do_index_try() is called on next before next is
 * reassigned to the new stripe, so it appears to target the stripe that
 * was just finished rather than the one about to be iterated - confirm.
 * NOTE(review): lit_stripe_index is advanced and the old sub-iterator is
 * finished before the new one is known to initialize; on the PTR_ERR
 * path lit_it still refers to the finished sub-iterator - confirm that
 * callers never reuse the iterator after such a failure.
 */
406 static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
408 struct lod_it *it = (struct lod_it *)di;
409 struct lod_object *lo = lod_dt_obj(it->lit_obj);
410 struct dt_object *next;
411 struct dt_it *it_next;
415 LOD_CHECK_STRIPED_IT(env, it, lo);
417 next = lo->ldo_stripe[it->lit_stripe_index];
418 LASSERT(next != NULL);
419 LASSERT(next->do_index_ops != NULL);
421 rc = next->do_index_ops->dio_it.next(env, it->lit_it);
425 if (rc == 0 && it->lit_stripe_index == 0)
428 if (rc == 0 && it->lit_stripe_index > 0) {
429 struct lu_dirent *ent;
431 ent = (struct lu_dirent *)lod_env_info(env)->lti_key;
433 rc = next->do_index_ops->dio_it.rec(env, it->lit_it,
434 (struct dt_rec *)ent,
439 /* skip . and .. for slave stripe */
440 if ((strncmp(ent->lde_name, ".",
441 le16_to_cpu(ent->lde_namelen)) == 0 &&
442 le16_to_cpu(ent->lde_namelen) == 1) ||
443 (strncmp(ent->lde_name, "..",
444 le16_to_cpu(ent->lde_namelen)) == 0 &&
445 le16_to_cpu(ent->lde_namelen) == 2))
451 /* go to next stripe */
452 if (it->lit_stripe_index + 1 >= lo->ldo_stripenr)
455 it->lit_stripe_index++;
457 next->do_index_ops->dio_it.put(env, it->lit_it);
458 next->do_index_ops->dio_it.fini(env, it->lit_it);
460 rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
464 next = lo->ldo_stripe[it->lit_stripe_index];
465 LASSERT(next != NULL);
466 LASSERT(next->do_index_ops != NULL);
468 it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr,
470 if (!IS_ERR(it_next)) {
471 it->lit_it = it_next;
474 rc = PTR_ERR(it_next);
481 * Implementation of dt_index_operations:: dio_it.key
483 * This function is to get the key of the iterator at current position.
485 * \param[in] env execution environment.
486 * \param[in] di the iterator for striped directory.
488 * \retval key(dt_key) if successfully get the key.
489 * \retval negative error if can not get the key.
/*
 * Implementation note: returns whatever dio_it.key of the stripe
 * selected by lit_stripe_index reports for the sub-iterator.
 */
491 static struct dt_key *lod_striped_it_key(const struct lu_env *env,
492 const struct dt_it *di)
494 const struct lod_it *it = (const struct lod_it *)di;
495 struct lod_object *lo = lod_dt_obj(it->lit_obj);
496 struct dt_object *next;
498 LOD_CHECK_STRIPED_IT(env, it, lo);
500 next = lo->ldo_stripe[it->lit_stripe_index];
501 LASSERT(next != NULL);
502 LASSERT(next->do_index_ops != NULL);
504 return next->do_index_ops->dio_it.key(env, it->lit_it);
508 * Implementation of dt_index_operations:: dio_it.key_size
510 * This function is to get the key_size of current key.
512 * \param[in] env execution environment.
513 * \param[in] di the iterator for striped directory.
515 * \retval key_size if successfully get the key_size.
516 * \retval negative error if can not get the key_size.
/*
 * Implementation note: returns whatever dio_it.key_size of the stripe
 * selected by lit_stripe_index reports for the sub-iterator.
 * NOTE(review): the cast drops the const qualifier of di although the
 * visible code only reads through it.
 */
518 static int lod_striped_it_key_size(const struct lu_env *env,
519 const struct dt_it *di)
521 struct lod_it *it = (struct lod_it *)di;
522 struct lod_object *lo = lod_dt_obj(it->lit_obj);
523 struct dt_object *next;
525 LOD_CHECK_STRIPED_IT(env, it, lo);
527 next = lo->ldo_stripe[it->lit_stripe_index];
528 LASSERT(next != NULL);
529 LASSERT(next->do_index_ops != NULL);
531 return next->do_index_ops->dio_it.key_size(env, it->lit_it);
535 * Implementation of dt_index_operations:: dio_it.rec
537 * This function is to get the record at current position.
539 * \param[in] env execution environment.
540 * \param[in] di the iterator for striped directory.
541 * \param[in] attr the attribute of iterator, mostly used to indicate
542 * the entry attribute in the object to be iterated.
543 * \param[out] rec hold the return record.
545 * \retval 0 if successfully get the entry.
546 * \retval negative error if can not get entry.
/*
 * Implementation note: the record is read from the stripe selected by
 * lit_stripe_index; rec and attr are passed through unchanged and the
 * result of dio_it.rec is returned as is.
 */
548 static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
549 struct dt_rec *rec, __u32 attr)
551 const struct lod_it *it = (const struct lod_it *)di;
552 struct lod_object *lo = lod_dt_obj(it->lit_obj);
553 struct dt_object *next;
555 LOD_CHECK_STRIPED_IT(env, it, lo);
557 next = lo->ldo_stripe[it->lit_stripe_index];
558 LASSERT(next != NULL);
559 LASSERT(next->do_index_ops != NULL);
561 return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
565 * Implementation of dt_index_operations:: dio_it.rec_size
567 * This function is to get the record_size at current record.
569 * \param[in] env execution environment.
570 * \param[in] di the iterator for striped directory.
571 * \param[in] attr the attribute of iterator, mostly used to indicate
572 * the entry attribute in the object to be iterated.
574 * \retval rec_size if successfully get the entry size.
575 * \retval negative error if can not get entry size.
/*
 * Implementation note: returns whatever dio_it.rec_size of the stripe
 * selected by lit_stripe_index reports for the sub-iterator and attr.
 * NOTE(review): the cast drops the const qualifier of di although the
 * visible code only reads through it.
 */
577 static int lod_striped_it_rec_size(const struct lu_env *env,
578 const struct dt_it *di, __u32 attr)
580 struct lod_it *it = (struct lod_it *)di;
581 struct lod_object *lo = lod_dt_obj(it->lit_obj);
582 struct dt_object *next;
584 LOD_CHECK_STRIPED_IT(env, it, lo);
586 next = lo->ldo_stripe[it->lit_stripe_index];
587 LASSERT(next != NULL);
588 LASSERT(next->do_index_ops != NULL);
590 return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
594 * Implementation of dt_index_operations:: dio_it.store
596 * This function will return a cookie for the current position of the iterator head,
597 * so that user can use this cookie to load/start the iterator next time.
599 * \param[in] env execution environment.
600 * \param[in] di the iterator for striped directory.
602 * \retval the cookie.
/*
 * Implementation note: the returned value comes straight from
 * dio_it.store of the stripe selected by lit_stripe_index; the visible
 * code does not fold the stripe index into it.
 */
604 static __u64 lod_striped_it_store(const struct lu_env *env,
605 const struct dt_it *di)
607 const struct lod_it *it = (const struct lod_it *)di;
608 struct lod_object *lo = lod_dt_obj(it->lit_obj);
609 struct dt_object *next;
611 LOD_CHECK_STRIPED_IT(env, it, lo);
613 next = lo->ldo_stripe[it->lit_stripe_index];
614 LASSERT(next != NULL);
615 LASSERT(next->do_index_ops != NULL);
617 return next->do_index_ops->dio_it.store(env, it->lit_it);
621 * Implementation of dt_index_operations:: dio_it.load
623 * This function will position the iterator with the given hash(usually
626 * \param[in] env execution environment.
627 * \param[in] di the iterator for striped directory.
628 * \param[in] hash the given hash.
630 * \retval >0 if successfully load the iterator to the given position.
631 * \retval <0 if load fails.
/*
 * Implementation note: hash is handed to dio_it.load of the stripe
 * selected by lit_stripe_index; the visible code does not pick a stripe
 * based on the hash. The result of that call is returned unchanged.
 */
633 static int lod_striped_it_load(const struct lu_env *env,
634 const struct dt_it *di, __u64 hash)
636 const struct lod_it *it = (const struct lod_it *)di;
637 struct lod_object *lo = lod_dt_obj(it->lit_obj);
638 struct dt_object *next;
640 LOD_CHECK_STRIPED_IT(env, it, lo);
642 next = lo->ldo_stripe[it->lit_stripe_index];
643 LASSERT(next != NULL);
644 LASSERT(next->do_index_ops != NULL);
646 return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
/*
 * Index operations table for striped directories. Lookup, insert and
 * delete reuse the plain lod_index_* wrappers; the iterator entries
 * point to the lod_striped_it_* functions. lod_index_try() assigns this
 * table to dt->do_index_ops.
 * NOTE(review): the nested initializer braces are absent from this
 * extract, and no key_rec entry is visible here.
 */
649 static struct dt_index_operations lod_striped_index_ops = {
650 .dio_lookup = lod_index_lookup,
651 .dio_declare_insert = lod_declare_index_insert,
652 .dio_insert = lod_index_insert,
653 .dio_declare_delete = lod_declare_index_delete,
654 .dio_delete = lod_index_delete,
656 .init = lod_striped_it_init,
657 .fini = lod_striped_it_fini,
658 .get = lod_striped_it_get,
659 .put = lod_striped_it_put,
660 .next = lod_striped_it_next,
661 .key = lod_striped_it_key,
662 .key_size = lod_striped_it_key_size,
663 .rec = lod_striped_it_rec,
664 .rec_size = lod_striped_it_rec_size,
665 .store = lod_striped_it_store,
666 .load = lod_striped_it_load,
671 * Implementation of dt_object_operations:: do_index_try
673 * This function will try to initialize the index api pointer for the
674 * given object, usually it is the entry point of the index api. i.e.
675 * the index object should be initialized in index_try, then start
676 * using index api. For striped directory, it will try to initialize
677 * all of its sub_stripes.
679 * \param[in] env execution environment.
680 * \param[in] dt the index object to be initialized.
681 * \param[in] feat the features of this object, for example fixed or
682 * variable key size etc.
684 * \retval >0 if the initialization is successful.
685 * \retval <0 if the initialization is failed.
/*
 * Implementation note on the visible flow: asserts that the child object
 * provides do_index_try, loads the striping of lo, and runs do_index_try
 * on the child with feat. When stripes exist (ldo_stripenr > 0) each
 * stripe is tested with dt_object_exists() and do_index_try is run on
 * it as well. The two assignments at the end install either
 * lod_striped_index_ops or lod_index_ops on dt.
 * NOTE(review): the rc checks, the statement guarded by the
 * dt_object_exists() == 0 test and the branch separating the two
 * assignments are not visible in this extract.
 */
687 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
688 const struct dt_index_features *feat)
690 struct lod_object *lo = lod_dt_obj(dt);
691 struct dt_object *next = dt_object_child(dt);
695 LASSERT(next->do_ops);
696 LASSERT(next->do_ops->do_index_try);
698 rc = lod_load_striping_locked(env, lo);
702 rc = next->do_ops->do_index_try(env, next, feat);
706 if (lo->ldo_stripenr > 0) {
709 for (i = 0; i < lo->ldo_stripenr; i++) {
710 if (dt_object_exists(lo->ldo_stripe[i]) == 0)
712 rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
713 lo->ldo_stripe[i], feat);
717 dt->do_index_ops = &lod_striped_index_ops;
719 dt->do_index_ops = &lod_index_ops;
/* Take the read lock by delegating to dt_read_lock() on the child object of dt. */
725 static void lod_object_read_lock(const struct lu_env *env,
726 struct dt_object *dt, unsigned role)
728 dt_read_lock(env, dt_object_child(dt), role);
/* Take the write lock by delegating to dt_write_lock() on the child object of dt. */
731 static void lod_object_write_lock(const struct lu_env *env,
732 struct dt_object *dt, unsigned role)
734 dt_write_lock(env, dt_object_child(dt), role);
/* Release the read lock by delegating to dt_read_unlock() on the child object of dt. */
737 static void lod_object_read_unlock(const struct lu_env *env,
738 struct dt_object *dt)
740 dt_read_unlock(env, dt_object_child(dt));
/* Release the write lock by delegating to dt_write_unlock() on the child object of dt. */
743 static void lod_object_write_unlock(const struct lu_env *env,
744 struct dt_object *dt)
746 dt_write_unlock(env, dt_object_child(dt));
/* Report the write-lock state by returning dt_write_locked() of the child object of dt. */
749 static int lod_object_write_locked(const struct lu_env *env,
750 struct dt_object *dt)
752 return dt_write_locked(env, dt_object_child(dt));
/*
 * Get the attributes of a lod object.
 *
 * Fetches the attributes from the child object first; the visible
 * condition singles out non-directories and errors at that point. For a
 * directory the striping is loaded and, when stripes exist, each stripe
 * is queried with dt_attr_get() into the per-thread lti_attr and merged
 * into attr: nlink is summed minus 2 per stripe (see comment below),
 * size is summed, and atime, ctime and mtime take the larger value. Each
 * merge is applied only when the field is valid in both attr and the
 * stripe attributes. A debug line reports the FID, stripe count, nlink
 * and size.
 * NOTE(review): return lines and the initial values given to attr before
 * the loop are not visible in this extract.
 * NOTE(review): the statement guarded by dt_object_exists() is not
 * visible; the test has the opposite polarity of the == 0 tests used in
 * lod_index_try() and lod_attr_set() - confirm which stripes it skips.
 */
755 static int lod_attr_get(const struct lu_env *env,
756 struct dt_object *dt,
757 struct lu_attr *attr,
758 struct lustre_capa *capa)
760 struct lod_object *lo = lod_dt_obj(dt);
765 rc = dt_attr_get(env, dt_object_child(dt), attr, capa);
766 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr) || rc != 0)
769 rc = lod_load_striping_locked(env, lo);
773 if (lo->ldo_stripenr == 0)
778 for (i = 0; i < lo->ldo_stripenr; i++) {
779 struct lu_attr *sub_attr = &lod_env_info(env)->lti_attr;
781 LASSERT(lo->ldo_stripe[i]);
782 if (dt_object_exists(lo->ldo_stripe[i]))
785 rc = dt_attr_get(env, lo->ldo_stripe[i], sub_attr, capa);
789 /* -2 for . and .. on each stripe */
790 if (sub_attr->la_valid & LA_NLINK && attr->la_valid & LA_NLINK)
791 attr->la_nlink += sub_attr->la_nlink - 2;
792 if (sub_attr->la_valid & LA_SIZE && attr->la_valid & LA_SIZE)
793 attr->la_size += sub_attr->la_size;
795 if (sub_attr->la_valid & LA_ATIME &&
796 attr->la_valid & LA_ATIME &&
797 attr->la_atime < sub_attr->la_atime)
798 attr->la_atime = sub_attr->la_atime;
800 if (sub_attr->la_valid & LA_CTIME &&
801 attr->la_valid & LA_CTIME &&
802 attr->la_ctime < sub_attr->la_ctime)
803 attr->la_ctime = sub_attr->la_ctime;
805 if (sub_attr->la_valid & LA_MTIME &&
806 attr->la_valid & LA_MTIME &&
807 attr->la_mtime < sub_attr->la_mtime)
808 attr->la_mtime = sub_attr->la_mtime;
811 CDEBUG(D_INFO, DFID" stripe_count %d nlink %u size "LPU64"\n",
812 PFID(lu_object_fid(&dt->do_lu)), lo->ldo_stripenr,
813 attr->la_nlink, attr->la_size);
819 * Mark all sub-stripes of the striped directory as dead.
/*
 * Implementation note on the visible flow: only directories are
 * considered, and only when stripes exist after the striping is loaded.
 * The LMV EA is read into the per-thread lti_ea_store, its magic is
 * rewritten to LMV_MAGIC_STRIPE and LMV_HASH_FLAG_DEAD is ORed into the
 * hash type. Then, per stripe, the EA is either declared or written with
 * LU_XATTR_REPLACE (XATTR_NAME_LMV in the visible dt_xattr_set() call).
 * NOTE(review): the last parameter is absent from this extract; callers
 * pass true from lod_declare_attr_set() and false from lod_attr_set().
 * NOTE(review): lmv_master_mdt_index is stored without cpu_to_le32(),
 * unlike the neighbouring fields and unlike lod_prep_lmv_md() - confirm
 * that this is intended on big-endian hosts.
 * NOTE(review): return lines and the buffer pointer assignment are not
 * visible in this extract.
 */
821 static int lod_mark_dead_object(const struct lu_env *env,
822 struct dt_object *dt,
823 struct thandle *handle,
826 struct lod_object *lo = lod_dt_obj(dt);
827 struct lmv_mds_md_v1 *lmv;
828 __u32 dead_hash_type;
834 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
837 rc = lod_load_striping_locked(env, lo);
841 if (lo->ldo_stripenr == 0)
844 rc = lod_get_lmv_ea(env, lo);
848 lmv = lod_env_info(env)->lti_ea_store;
849 lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
850 dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD;
851 lmv->lmv_hash_type = cpu_to_le32(dead_hash_type);
852 for (i = 0; i < lo->ldo_stripenr; i++) {
855 lmv->lmv_master_mdt_index = i;
857 buf.lb_len = sizeof(*lmv);
859 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf,
861 LU_XATTR_REPLACE, handle);
863 rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf,
864 XATTR_NAME_LMV, LU_XATTR_REPLACE,
865 handle, BYPASS_CAPA);
/*
 * Declare an attribute update on a lod object and, where needed, on its
 * stripes.
 *
 * A request whose only valid field is LA_FLAGS and which carries
 * LUSTRE_SLAVE_DEAD_FL first goes through lod_mark_dead_object() in
 * declare mode. The change is then declared on the child object. The
 * visible tests that follow look at LA_UID|LA_GID (and the
 * OBD_FAIL_LFSCK_BAD_OWNER fault point) for non-directories and at
 * UID/GID/MODE/ATIME/MTIME/CTIME otherwise. After lod_load_striping()
 * the change is declared on every non-NULL stripe, with a CERROR on
 * failure. The two OBD_FAIL_LFSCK_* branches at the end additionally
 * declare a delete or a replace of the LOV xattr on a child object that
 * exists and is not remote.
 * NOTE(review): the results of dt_declare_xattr_del() and
 * dt_declare_xattr_set() in those two branches are not stored.
 * NOTE(review): return lines and the statements guarded by the valid-mask
 * tests are absent from this extract.
 */
874 static int lod_declare_attr_set(const struct lu_env *env,
875 struct dt_object *dt,
876 const struct lu_attr *attr,
877 struct thandle *handle)
879 struct dt_object *next = dt_object_child(dt);
880 struct lod_object *lo = lod_dt_obj(dt);
884 /* Set dead object on all other stripes */
885 if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
886 attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
887 rc = lod_mark_dead_object(env, dt, handle, true);
892 * declare setattr on the local object
894 rc = dt_declare_attr_set(env, next, attr, handle);
898 /* osp_declare_attr_set() ignores all attributes other than
899 * UID, GID, and size, and osp_attr_set() ignores all but UID
900 * and GID. Declaration of size attr setting happens through
901 * lod_declare_init_size(), and not through this function.
902 * Therefore we need not load striping unless ownership is
903 * changing. This should save memory and (we hope) speed up
905 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
906 if (!(attr->la_valid & (LA_UID | LA_GID)))
909 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
912 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
913 LA_ATIME | LA_MTIME | LA_CTIME)))
917 * load striping information, notice we don't do this when object
918 * is being initialized as we don't need this information till
919 * few specific cases like destroy, chown
921 rc = lod_load_striping(env, lo);
925 if (lo->ldo_stripenr == 0)
929 * if object is striped declare changes on the stripes
931 LASSERT(lo->ldo_stripe);
932 for (i = 0; i < lo->ldo_stripenr; i++) {
933 if (likely(lo->ldo_stripe[i] != NULL)) {
934 rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr,
937 CERROR("failed declaration: %d\n", rc);
943 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
944 dt_object_exists(next) != 0 &&
945 dt_object_remote(next) == 0)
946 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
948 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
949 dt_object_exists(next) &&
950 dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
951 struct lod_thread_info *info = lod_env_info(env);
952 struct lu_buf *buf = &info->lti_buf;
954 buf->lb_buf = info->lti_ea_store;
955 buf->lb_len = info->lti_ea_store_size;
956 dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
957 LU_XATTR_REPLACE, handle);
/*
 * Apply an attribute update to a lod object and, where needed, to its
 * stripes. The visible checks follow the same order as in
 * lod_declare_attr_set().
 *
 * A request whose only valid field is LA_FLAGS and which carries
 * LUSTRE_SLAVE_DEAD_FL first goes through lod_mark_dead_object() in
 * execute mode. The change is then applied to the child object. After
 * the valid-mask tests, the change is applied to every non-NULL stripe;
 * stripes are additionally tested with dt_object_exists() == 0.
 * The OBD_FAIL_LFSCK_LOST_STRIPE branch deletes the LOV xattr of the
 * child. The OBD_FAIL_LFSCK_CHANGE_STRIPE branch reads the LOV EA,
 * selects the first object entry according to the magic (V1 layout or a
 * cast to lov_mds_md_v3), converts its ost id to a FID and back, and
 * replaces the LOV xattr with the buffer.
 * NOTE(review): the CERROR text says failed declaration although this is
 * the execution path; it matches the message in lod_declare_attr_set().
 * NOTE(review): no striping load is visible here, unlike in
 * lod_declare_attr_set() - presumably the declare step has already
 * loaded it; confirm.
 * NOTE(review): results of dt_xattr_del() and dt_xattr_set() in the
 * fault-injection branches are not stored; return lines, the rc1 check
 * and a line between the two FID conversions are absent from this
 * extract.
 */
963 static int lod_attr_set(const struct lu_env *env,
964 struct dt_object *dt,
965 const struct lu_attr *attr,
966 struct thandle *handle,
967 struct lustre_capa *capa)
969 struct dt_object *next = dt_object_child(dt);
970 struct lod_object *lo = lod_dt_obj(dt);
974 /* Set dead object on all other stripes */
975 if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
976 attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
977 rc = lod_mark_dead_object(env, dt, handle, false);
982 * apply changes to the local object
984 rc = dt_attr_set(env, next, attr, handle, capa);
988 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
989 if (!(attr->la_valid & (LA_UID | LA_GID)))
992 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
995 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
996 LA_ATIME | LA_MTIME | LA_CTIME)))
1000 if (lo->ldo_stripenr == 0)
1004 * if object is striped, apply changes to all the stripes
1006 LASSERT(lo->ldo_stripe);
1007 for (i = 0; i < lo->ldo_stripenr; i++) {
1008 if (likely(lo->ldo_stripe[i] != NULL)) {
1009 if (dt_object_exists(lo->ldo_stripe[i]) == 0)
1012 rc = dt_attr_set(env, lo->ldo_stripe[i], attr,
1015 CERROR("failed declaration: %d\n", rc);
1021 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1022 dt_object_exists(next) != 0 &&
1023 dt_object_remote(next) == 0)
1024 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
1026 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1027 dt_object_exists(next) &&
1028 dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1029 struct lod_thread_info *info = lod_env_info(env);
1030 struct lu_buf *buf = &info->lti_buf;
1031 struct ost_id *oi = &info->lti_ostid;
1032 struct lu_fid *fid = &info->lti_fid;
1033 struct lov_mds_md_v1 *lmm;
1034 struct lov_ost_data_v1 *objs;
1038 rc1 = lod_get_lov_ea(env, lo);
1042 buf->lb_buf = info->lti_ea_store;
1043 buf->lb_len = info->lti_ea_store_size;
1044 lmm = info->lti_ea_store;
1045 magic = le32_to_cpu(lmm->lmm_magic);
1046 if (magic == LOV_MAGIC_V1)
1047 objs = &(lmm->lmm_objects[0]);
1049 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1050 ostid_le_to_cpu(&objs->l_ost_oi, oi);
1051 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
1053 fid_to_ostid(fid, oi);
1054 ostid_cpu_to_le(oi, &objs->l_ost_oi);
1055 dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
1056 LU_XATTR_REPLACE, handle, BYPASS_CAPA);
/*
 * Get an extended attribute of a lod object.
 *
 * Asks the child object first; the visible condition singles out the
 * case of -ENODATA on a directory. For that case the FID of dt is
 * compared with the root FID from dt_root_get(); when dt is the root and
 * name is XATTR_NAME_LOV, a struct lov_user_md in buf is filled from the
 * lod descriptor (magic LOV_USER_MAGIC_V1, default object id, pattern,
 * default stripe size, count and offset, all stored little-endian),
 * provided buf->lb_len is at least sizeof(*lum).
 * NOTE(review): the return values for the NULL-buffer, too-small-buffer
 * and success cases are not visible in this extract.
 */
1062 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
1063 struct lu_buf *buf, const char *name,
1064 struct lustre_capa *capa)
1066 struct lod_thread_info *info = lod_env_info(env);
1067 struct lod_device *dev = lu2lod_dev(dt->do_lu.lo_dev);
1071 rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
1072 if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
1076 * lod returns default striping on the real root of the device
1077 * this is like the root stores default striping for the whole
1078 * filesystem. historically we've been using a different approach
1079 * and store it in the config.
1081 dt_root_get(env, dev->lod_child, &info->lti_fid);
1082 is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
1084 if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
1085 struct lov_user_md *lum = buf->lb_buf;
1086 struct lov_desc *desc = &dev->lod_desc;
1088 if (buf->lb_buf == NULL) {
1090 } else if (buf->lb_len >= sizeof(*lum)) {
1091 lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
1092 lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
1093 lmm_oi_set_id(&lum->lmm_oi, 0);
1094 lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
1095 lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
1096 lum->lmm_stripe_size = cpu_to_le32(
1097 desc->ld_default_stripe_size);
1098 lum->lmm_stripe_count = cpu_to_le16(
1099 desc->ld_default_stripe_count);
1100 lum->lmm_stripe_offset = cpu_to_le16(
1101 desc->ld_default_stripe_offset);
/*
 * Validate a user-supplied LMV striping descriptor.
 *
 * Jumps to out with -EINVAL when the magic is not LMV_USER_MAGIC or when
 * the stripe count is zero. The CERROR reports the magic, stripe offset
 * and stripe count together with rc, prefixed with the obd name of lod.
 * NOTE(review): the out label and the condition guarding the CERROR are
 * not visible in this extract.
 */
1111 static int lod_verify_md_striping(struct lod_device *lod,
1112 const struct lmv_user_md_v1 *lum)
1117 if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
1118 GOTO(out, rc = -EINVAL);
1120 if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
1121 GOTO(out, rc = -EINVAL);
1124 CERROR("%s: invalid lmv_user_md: magic = %x, "
1125 "stripe_offset = %d, stripe_count = %u: rc = %d\n",
1126 lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
1127 (int)le32_to_cpu(lum->lum_stripe_offset),
1128 le32_to_cpu(lum->lum_stripe_count), rc);
1133 * Master LMVEA will be same as slave LMVEA, except
1134 * 1. different magic
1135 * 2. No lmv_stripe_fids on slave
1136 * 3. lmv_master_mdt_index on slave LMV EA will be stripe_index.
/*
 * Implementation note: the slave EA starts as a structure copy of the
 * master EA; only the magic is overridden here, with LMV_MAGIC_STRIPE
 * stored little-endian. Setting the per-stripe master MDT index is left
 * to code outside this function.
 */
1138 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
1139 const struct lmv_mds_md_v1 *master_lmv)
1141 *slave_lmv = *master_lmv;
1142 slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
/*
 * Build the master LMV EA of a striped directory in the per-thread EA
 * buffer.
 *
 * Requires a striped directory with at least one stripe (asserted).
 * Grows lti_ea_store when it is smaller than lmv_mds_md_size() for the
 * stripe count, then fills magic (LMV_MAGIC), stripe count, hash type,
 * master MDT index, master FID and one FID per stripe, all converted to
 * little-endian. Every stripe pointer is asserted to be non-NULL. On
 * completion lmv_buf points at lti_ea_store with length lmm_size and
 * ldo_dir_striping_cached is set on lo.
 * NOTE(review): mdtidx is presumably produced by the lod_fld_lookup()
 * call, whose remaining arguments are not visible; error-return lines
 * are absent from this extract as well.
 */
1145 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
1146 struct lu_buf *lmv_buf)
1148 struct lod_thread_info *info = lod_env_info(env);
1149 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1150 struct lod_object *lo = lod_dt_obj(dt);
1151 struct lmv_mds_md_v1 *lmm1;
1154 int type = LU_SEQ_RANGE_ANY;
1160 LASSERT(lo->ldo_dir_striped != 0);
1161 LASSERT(lo->ldo_stripenr > 0);
1162 stripe_count = lo->ldo_stripenr;
1163 lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
1164 if (info->lti_ea_store_size < lmm_size) {
1165 rc = lod_ea_store_resize(info, lmm_size);
1170 lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
1171 lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
1172 lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
1173 lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
1174 rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
1179 lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
1180 fid_cpu_to_le(&lmm1->lmv_master_fid, lu_object_fid(&dt->do_lu));
1181 for (i = 0; i < lo->ldo_stripenr; i++) {
1182 struct dt_object *dto;
1184 dto = lo->ldo_stripe[i];
1185 LASSERT(dto != NULL);
1186 fid_cpu_to_le(&lmm1->lmv_stripe_fids[i],
1187 lu_object_fid(&dto->do_lu));
1190 lmv_buf->lb_buf = info->lti_ea_store;
1191 lmv_buf->lb_len = lmm_size;
1192 lo->ldo_dir_striping_cached = 1;
/*
 * Parse an LMV EA from buf and set up the stripe objects of lo.
 *
 * Visible checks, in order: the LMV_HASH_FLAG_MIGRATION hash flag, the
 * LMV_MAGIC_STRIPE magic (marks lo as a slave stripe), a magic other
 * than LMV_MAGIC_V1, and a stripe count of at most 1. Past those, lo is
 * asserted to have no stripe array yet and an array of stripe pointers
 * is allocated. Per stripe, the FID is converted from little-endian,
 * validated with fid_is_sane() (-ESTALE otherwise), resolved to an index
 * with lod_fld_lookup(), mapped either to the local child device (index
 * equal to the node id of the site) or to the target found with
 * LTD_TGT(), and located with dt_locate_at().
 * Finally the array and both stripe counters are stored in lo.
 * NOTE(review): the visible call to lod_object_free_striping() is
 * presumably the error path; its guard is not visible.
 * NOTE(review): the statements guarded by the early checks, the
 * allocation-failure check, the NULL-target test and the stripe[i]
 * assignment are absent from this extract.
 */
1197 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
1198 const struct lu_buf *buf)
1200 struct lod_thread_info *info = lod_env_info(env);
1201 struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1202 struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
1203 struct dt_object **stripe;
1204 union lmv_mds_md *lmm = buf->lb_buf;
1205 struct lmv_mds_md_v1 *lmv1 = &lmm->lmv_md_v1;
1206 struct lu_fid *fid = &info->lti_fid;
1211 if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
1214 if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
1215 lo->ldo_dir_slave_stripe = 1;
1219 if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
1222 if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1)
1225 LASSERT(lo->ldo_stripe == NULL);
1226 OBD_ALLOC(stripe, sizeof(stripe[0]) *
1227 (le32_to_cpu(lmv1->lmv_stripe_count)));
1231 for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
1232 struct dt_device *tgt_dt;
1233 struct dt_object *dto;
1234 int type = LU_SEQ_RANGE_ANY;
1237 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
1238 if (!fid_is_sane(fid))
1239 GOTO(out, rc = -ESTALE);
1241 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
1245 if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
1246 tgt_dt = lod->lod_child;
1248 struct lod_tgt_desc *tgt;
1250 tgt = LTD_TGT(ltd, idx);
1252 GOTO(out, rc = -ESTALE);
1253 tgt_dt = tgt->ltd_tgt;
1256 dto = dt_locate_at(env, tgt_dt, fid,
1257 lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1260 GOTO(out, rc = PTR_ERR(dto));
1265 lo->ldo_stripe = stripe;
1266 lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count);
1267 lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
1269 lod_object_free_striping(env, lo);
/*
 * Choose MDT targets for the stripes of a new striped directory \a dt and
 * declare, in transaction \a th, every update needed to create them.
 *
 * Steps visible in this function:
 *  - the requested stripe count is read from lum->lum_stripe_count and
 *    capped at lod_remote_mdt_count + 1;
 *  - the branch using lum_stripe_offset and lod->lod_child (per the inline
 *    comment, the master stripe) allocates its FID from lod_child_exp;
 *  - the other stripes start from the index after idx_array[i - 1] and scan
 *    up to lod_remote_mdt_count candidates, skipping indices that are not
 *    set in ltd_tgt_bitmap or are already present in idx_array, probing the
 *    target with dt_statfs() and allocating a FID from tgt->ltd_exp;
 *  - each chosen FID is instantiated with dt_locate_at() and LOC_F_NEW;
 *  - ldo_dir_striped, ldo_stripe, ldo_stripenr (= number of stripes actually
 *    obtained) and ldo_stripes_allocated are set; zero stripes is -ENOSPC;
 *  - the master LMV EA is built by lod_prep_lmv_md() and a slave copy by
 *    lod_prep_slave_lmv_md();
 *  - per stripe the following are declared: object creation, the dot and
 *    dotdot entries, the default LOV xattr when one is cached, the slave LMV
 *    xattr, the link EA, the FID:index name entry in the master object and
 *    a reference on the master object;
 *  - finally the LMV xattr on the master object is declared.
 *
 * The tail puts every non-NULL stripe object, frees the stripe array and
 * resets ldo_stripenr, ldo_stripes_allocated and ldo_stripe (presumably only
 * on failure -- TODO confirm), then frees idx_array and slave_lmm.
 *
 * \retval -ENOMEM	an allocation failed
 * \retval -ENOSPC	no stripe could be allocated
 * \retval -EINVAL	dt_try_as_dir() failed on the master or on a stripe
 * \retval negative	PTR_ERR() of a failed dt_locate_at()
 */
1274 static int lod_prep_md_striped_create(const struct lu_env *env,
1275 struct dt_object *dt,
1276 struct lu_attr *attr,
1277 const struct lmv_user_md_v1 *lum,
1278 struct dt_object_format *dof,
1281 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1282 struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
1283 struct lod_object *lo = lod_dt_obj(dt);
1284 struct lod_thread_info *info = lod_env_info(env);
1285 struct dt_object **stripe;
1286 struct lu_buf lmv_buf;
1287 struct lu_buf slave_lmv_buf;
1288 struct lmv_mds_md_v1 *lmm;
1289 struct lmv_mds_md_v1 *slave_lmm = NULL;
1297 /* The lum has been verified in lod_verify_md_striping */
1298 LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
1299 LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
1301 stripe_count = le32_to_cpu(lum->lum_stripe_count);
1303 /* shrink the stripe_count to the available MDT count */
1304 if (stripe_count > lod->lod_remote_mdt_count + 1)
1305 stripe_count = lod->lod_remote_mdt_count + 1;
1307 OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
/* idx_array records the MDT index chosen for each stripe so far */
1311 OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
1312 if (idx_array == NULL)
1313 GOTO(out_free, rc = -ENOMEM);
1315 for (i = 0; i < stripe_count; i++) {
1316 struct lod_tgt_desc *tgt = NULL;
1317 struct dt_object *dto;
1318 struct lu_fid fid = { 0 };
1320 struct lu_object_conf conf = { 0 };
1321 struct dt_device *tgt_dt = NULL;
1324 /* Right now, master stripe and master object are
1325 * on the same MDT */
1326 idx = le32_to_cpu(lum->lum_stripe_offset);
1327 rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
1331 tgt_dt = lod->lod_child;
/* continue the search right after the previously chosen MDT index */
1335 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
1337 for (j = 0; j < lod->lod_remote_mdt_count;
1338 j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
1339 bool already_allocated = false;
1342 CDEBUG(D_INFO, "try idx %d, mdt cnt %d,"
1343 " allocated %d, last allocated %d\n", idx,
1344 lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1346 /* Find next available target */
1347 if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
1350 /* check whether the idx already exists
1351 * in current allocated array */
1352 for (k = 0; k < i; k++) {
1353 if (idx_array[k] == idx) {
1354 already_allocated = true;
1359 if (already_allocated)
1362 /* check the status of the OSP */
1363 tgt = LTD_TGT(ltd, idx);
1367 tgt_dt = tgt->ltd_tgt;
1368 rc = dt_statfs(env, tgt_dt, NULL);
1370 /* this OSP doesn't feel well */
1375 rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
1384 /* Can not allocate more stripes */
1385 if (j == lod->lod_remote_mdt_count) {
1386 CDEBUG(D_INFO, "%s: require stripes %d only get %d\n",
1387 lod2obd(lod)->obd_name, stripe_count, i - 1);
1391 CDEBUG(D_INFO, "idx %d, mdt cnt %d,"
1392 " allocated %d, last allocated %d\n", idx,
1393 lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1396 /* tgt_dt and fid must be ready after search available OSP
1397 * in the above loop */
1398 LASSERT(tgt_dt != NULL);
1399 LASSERT(fid_is_sane(&fid));
1400 conf.loc_flags = LOC_F_NEW;
1401 dto = dt_locate_at(env, tgt_dt, &fid,
1402 dt->do_lu.lo_dev->ld_site->ls_top_dev,
1405 GOTO(out_put, rc = PTR_ERR(dto));
/* i is the number of stripes obtained, which may be below stripe_count */
1410 lo->ldo_dir_striped = 1;
1411 lo->ldo_stripe = stripe;
1412 lo->ldo_stripenr = i;
1413 lo->ldo_stripes_allocated = stripe_count;
1415 if (lo->ldo_stripenr == 0)
1416 GOTO(out_put, rc = -ENOSPC);
1418 rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1421 lmm = lmv_buf.lb_buf;
1423 OBD_ALLOC_PTR(slave_lmm);
1424 if (slave_lmm == NULL)
1425 GOTO(out_put, rc = -ENOMEM);
1427 lod_prep_slave_lmv_md(slave_lmm, lmm);
1428 slave_lmv_buf.lb_buf = slave_lmm;
1429 slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1431 if (!dt_try_as_dir(env, dt_object_child(dt)))
1432 GOTO(out_put, rc = -EINVAL);
1434 for (i = 0; i < lo->ldo_stripenr; i++) {
1435 struct dt_object *dto = stripe[i];
1436 char *stripe_name = info->lti_key;
1437 struct lu_name *sname;
1438 struct linkea_data ldata = { 0 };
1439 struct lu_buf linkea_buf;
1441 rc = dt_declare_create(env, dto, attr, NULL, dof, th);
1445 if (!dt_try_as_dir(env, dto))
1446 GOTO(out_put, rc = -EINVAL);
1448 rc = dt_declare_insert(env, dto,
1449 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1450 (const struct dt_key *)dot, th);
1454 /* master stripe FID will be put to .. */
1455 rc = dt_declare_insert(env, dto,
1456 (const struct dt_rec *)lu_object_fid(&dt->do_lu),
1457 (const struct dt_key *)dotdot, th);
1461 /* probably nothing to inherit */
1462 if (lo->ldo_striping_cached &&
1463 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1464 lo->ldo_def_stripenr,
1465 lo->ldo_def_stripe_offset)) {
1466 struct lov_user_md_v3 *v3;
1468 /* sigh, lti_ea_store has been used for lmv_buf,
1469 * so we have to allocate buffer for default
1473 GOTO(out_put, rc = -ENOMEM);
1475 memset(v3, 0, sizeof(*v3));
1476 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1477 v3->lmm_stripe_count =
1478 cpu_to_le16(lo->ldo_def_stripenr);
1479 v3->lmm_stripe_offset =
1480 cpu_to_le16(lo->ldo_def_stripe_offset);
1481 v3->lmm_stripe_size =
1482 cpu_to_le32(lo->ldo_def_stripe_size);
1483 if (lo->ldo_pool != NULL)
1484 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
1485 sizeof(v3->lmm_pool_name));
1487 info->lti_buf.lb_buf = v3;
1488 info->lti_buf.lb_len = sizeof(*v3);
1489 rc = dt_declare_xattr_set(env, dto,
1498 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1499 rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf,
1500 XATTR_NAME_LMV, 0, th);
1504 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1505 PFID(lu_object_fid(&dto->do_lu)), i);
1507 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
1508 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
1512 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
1516 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1517 linkea_buf.lb_len = ldata.ld_leh->leh_len;
1518 rc = dt_declare_xattr_set(env, dto, &linkea_buf,
1519 XATTR_NAME_LINK, 0, th);
1523 rc = dt_declare_insert(env, dt_object_child(dt),
1524 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1525 (const struct dt_key *)stripe_name, th);
1529 rc = dt_declare_ref_add(env, dt_object_child(dt), th);
1534 rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
1535 XATTR_NAME_LMV, 0, th);
1541 for (i = 0; i < stripe_count; i++)
1542 if (stripe[i] != NULL)
1543 lu_object_put(env, &stripe[i]->do_lu);
1544 OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count);
1545 lo->ldo_stripenr = 0;
1546 lo->ldo_stripes_allocated = 0;
1547 lo->ldo_stripe = NULL;
1551 if (idx_array != NULL)
1552 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
1553 if (slave_lmm != NULL)
1554 OBD_FREE_PTR(slave_lmm);
1560 * Declare create striped md object.
/*
 * Declare the creation of a striped directory from the user LMV EA carried
 * in \a lum_buf.
 *
 * The EA is logged, a zero stripe count is special-cased (the action is not
 * visible here -- presumably nothing to declare), the EA is validated with
 * lod_verify_md_striping() and the per-stripe work is delegated to
 * lod_prep_md_striped_create(). The striping held by the object is released
 * with lod_object_free_striping(); per the inline comment this happens when
 * preparing the stripes failed.
 *
 * \param lum_buf	buffer whose lb_buf holds a struct lmv_user_md_v1;
 *			lb_buf must not be NULL (asserted)
 */
1562 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
1563 struct dt_object *dt,
1564 struct lu_attr *attr,
1565 const struct lu_buf *lum_buf,
1566 struct dt_object_format *dof,
1569 struct lod_object *lo = lod_dt_obj(dt);
1570 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1571 struct lmv_user_md_v1 *lum;
1575 lum = lum_buf->lb_buf;
1576 LASSERT(lum != NULL);
1578 CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
1579 le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
1580 (int)le32_to_cpu(lum->lum_stripe_offset));
1582 if (le32_to_cpu(lum->lum_stripe_count) == 0)
1585 rc = lod_verify_md_striping(lod, lum);
1589 /* prepare dir striped objects */
1590 rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
1592 /* failed to create striping, let's reset
1593 * config so that others don't get confused */
1594 lod_object_free_striping(env, lo);
/*
 * Declare an xattr set on a directory and on each of its stripes.
 *
 * For XATTR_NAME_DEFAULT_LMV the buffer is first validated with
 * lod_verify_md_striping(). The set is then declared on the child object,
 * the striping of the directory is loaded with lod_load_striping(), and the
 * same set is declared on every entry of ldo_stripe. A stripe count of zero
 * is special-cased before the loop (presumably an early return).
 *
 * \param buf	value to set; must be non-NULL with a non-NULL lb_buf when
 *		\a name is XATTR_NAME_DEFAULT_LMV (asserted)
 */
1601 static int lod_dir_declare_xattr_set(const struct lu_env *env,
1602 struct dt_object *dt,
1603 const struct lu_buf *buf,
1604 const char *name, int fl,
1607 struct dt_object *next = dt_object_child(dt);
1608 struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
1609 struct lod_object *lo = lod_dt_obj(dt);
1614 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1615 struct lmv_user_md_v1 *lum;
1617 LASSERT(buf != NULL && buf->lb_buf != NULL);
1619 rc = lod_verify_md_striping(d, lum);
1624 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1628 /* set xattr to each stripes, if needed */
1629 rc = lod_load_striping(env, lo);
1633 if (lo->ldo_stripenr == 0)
1636 for (i = 0; i < lo->ldo_stripenr; i++) {
1637 LASSERT(lo->ldo_stripe[i]);
1638 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
1648 * LOV xattr is a storage for striping, and LOD owns this xattr.
1649 * but LOD allows others to control striping to some extent
1650 * - to reset striping
1651 * - to set new defined striping
1652 * - to set new semi-defined striping
1653 * - number of stripes is defined
1654 * - number of stripes + osts are defined
/*
 * Declare an xattr set, dispatching on the object type and the xattr name.
 *
 *  - XATTR_NAME_LOV without LU_XATTR_REPLACE on a regular file, or on an
 *    object whose mode is still 0: treated as a striping request and handed
 *    to lod_declare_striped_object(). Attributes are read with dt_attr_get()
 *    for an existing object; the zeroed attr with S_IFREG is presumably the
 *    branch for an object that does not exist yet.
 *  - any xattr on a directory: lod_dir_declare_xattr_set().
 *  - everything else: declared directly on the child object.
 */
1657 static int lod_declare_xattr_set(const struct lu_env *env,
1658 struct dt_object *dt,
1659 const struct lu_buf *buf,
1660 const char *name, int fl,
1663 struct dt_object *next = dt_object_child(dt);
1664 struct lu_attr *attr = &lod_env_info(env)->lti_attr;
1670 * allow to declare predefined striping on a new (!mode) object
1671 * which is supposed to be replay of regular file creation
1672 * (when LOV setting is declared)
1673 * LU_XATTR_REPLACE is set to indicate a layout swap
1675 mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
1676 if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
1677 !(fl & LU_XATTR_REPLACE)) {
1679 * this is a request to manipulate object's striping
1681 if (dt_object_exists(dt)) {
1682 rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1686 memset(attr, 0, sizeof(*attr));
1687 attr->la_valid = LA_TYPE | LA_MODE;
1688 attr->la_mode = S_IFREG;
1690 rc = lod_declare_striped_object(env, dt, attr, buf, th);
1691 } else if (S_ISDIR(mode)) {
1692 rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
1694 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
/*
 * Drop the default striping cached on \a lo: clears ldo_striping_cached and
 * ldo_def_striping_set, releases the pool name through
 * lod_object_set_pool(lo, NULL), zeroes the default stripe size and count,
 * and clears ldo_dir_striping_cached when ldo_dir_stripe is allocated.
 * ldo_def_stripe_offset is left untouched.
 */
1700 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
1702 lo->ldo_striping_cached = 0;
1703 lo->ldo_def_striping_set = 0;
1704 lod_object_set_pool(lo, NULL);
1705 lo->ldo_def_stripe_size = 0;
1706 lo->ldo_def_stripenr = 0;
1707 if (lo->ldo_dir_stripe != NULL)
1708 lo->ldo_dir_striping_cached = 0;
/*
 * Set xattr \a name on the child object and, for a directory, on every
 * stripe in ldo_stripe as well.
 *
 * The stripe loop is only reached when the first dt_xattr_set() returned 0
 * and the object is a directory; a stripe count of zero is special-cased
 * before the loop (presumably an early return).
 */
1711 static int lod_xattr_set_internal(const struct lu_env *env,
1712 struct dt_object *dt,
1713 const struct lu_buf *buf,
1714 const char *name, int fl, struct thandle *th,
1715 struct lustre_capa *capa)
1717 struct dt_object *next = dt_object_child(dt);
1718 struct lod_object *lo = lod_dt_obj(dt);
1723 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1724 if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1727 if (lo->ldo_stripenr == 0)
1730 for (i = 0; i < lo->ldo_stripenr; i++) {
1731 LASSERT(lo->ldo_stripe[i]);
1732 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
/*
 * Delete xattr \a name from the child object and, for a directory, from
 * every stripe in ldo_stripe as well.
 *
 * The stripe loop is only reached when the first dt_xattr_del() returned 0
 * and the object is a directory; a stripe count of zero is special-cased
 * before the loop (presumably an early return).
 */
1741 static int lod_xattr_del_internal(const struct lu_env *env,
1742 struct dt_object *dt,
1743 const char *name, struct thandle *th,
1744 struct lustre_capa *capa)
1746 struct dt_object *next = dt_object_child(dt);
1747 struct lod_object *lo = lod_dt_obj(dt);
1752 rc = dt_xattr_del(env, next, name, th, capa);
1753 if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1756 if (lo->ldo_stripenr == 0)
1759 for (i = 0; i < lo->ldo_stripenr; i++) {
1760 LASSERT(lo->ldo_stripe[i]);
1761 rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th,
/*
 * Set or delete the default LOV striping stored on a directory.
 *
 * The cached default striping of the directory is cleared first, then the
 * buffer is validated with lod_verify_striping(). When size, count and
 * offset are all default values (LOVEA_DELETE_VALUES) and the magic is
 * LOV_USER_MAGIC_V1, the xattr is removed through lod_xattr_del_internal();
 * otherwise (presumably the else branch) it is written through
 * lod_xattr_set_internal().
 *
 * \param buf	must be non-NULL with a non-NULL lb_buf (asserted)
 */
1770 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
1771 struct dt_object *dt,
1772 const struct lu_buf *buf,
1773 const char *name, int fl,
1775 struct lustre_capa *capa)
1777 struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
1778 struct lod_object *l = lod_dt_obj(dt);
1779 struct lov_user_md_v1 *lum;
1780 struct lov_user_md_v3 *v3 = NULL;
1784 /* If it is striped dir, we should clear the stripe cache for
1785 * slave stripe as well, but there are no effective way to
1786 * notify the LOD on the slave MDT, so we do not cache stripe
1787 * information for slave stripe for now. XXX*/
1788 lod_lov_stripe_cache_clear(l);
1789 LASSERT(buf != NULL && buf->lb_buf != NULL);
1792 rc = lod_verify_striping(d, buf, false);
1796 if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1799 /* if { size, offset, count } = { 0, -1, 0 } and no pool
1800 * (i.e. all default values specified) then delete default
1801 * striping from dir. */
1803 "set default striping: sz %u # %u offset %d %s %s\n",
1804 (unsigned)lum->lmm_stripe_size,
1805 (unsigned)lum->lmm_stripe_count,
1806 (int)lum->lmm_stripe_offset,
1807 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1809 if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1810 (lum->lmm_stripe_count),
1811 (lum->lmm_stripe_offset)) &&
1812 lum->lmm_magic == LOV_USER_MAGIC_V1) {
1813 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1817 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
/*
 * Set or delete the default LMV (directory striping) EA on a directory and
 * refresh the cached default stripe count.
 *
 * When stripe count and offset are default values (LMVEA_DELETE_VALUES) and
 * the magic is LMV_USER_MAGIC, the xattr is removed through
 * lod_xattr_del_internal(); otherwise (presumably the else branch) it is
 * written through lod_xattr_set_internal(). Afterwards ldo_dir_stripe is
 * allocated if missing, ldo_dir_striping_cached is cleared,
 * ldo_dir_def_striping_set is raised and ldo_dir_def_stripenr is taken from
 * the EA.
 *
 * \param buf	must be non-NULL with a non-NULL lb_buf (asserted)
 */
1823 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
1824 struct dt_object *dt,
1825 const struct lu_buf *buf,
1826 const char *name, int fl,
1828 struct lustre_capa *capa)
1830 struct lod_object *l = lod_dt_obj(dt);
1831 struct lmv_user_md_v1 *lum;
1835 LASSERT(buf != NULL && buf->lb_buf != NULL);
1838 CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
1839 le32_to_cpu(lum->lum_stripe_count),
1840 (int)le32_to_cpu(lum->lum_stripe_offset));
1842 if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
1843 le32_to_cpu(lum->lum_stripe_offset)) &&
1844 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
1845 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1849 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
1854 /* Update default stripe cache */
1855 if (l->ldo_dir_stripe == NULL) {
1856 OBD_ALLOC_PTR(l->ldo_dir_stripe);
1857 if (l->ldo_dir_stripe == NULL)
1861 l->ldo_dir_striping_cached = 0;
1862 l->ldo_dir_def_striping_set = 1;
1863 l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count);
/*
 * Execution-phase creation of the stripes of a striped directory; the
 * stripes themselves are expected to have been chosen in the declare phase
 * (see the inline comment), so a zero ldo_stripenr is special-cased.
 *
 * Visible steps:
 *  - a non-directory object is special-cased up front (action not visible);
 *  - attributes are read from the child object, la_valid is narrowed to
 *    LA_TYPE | LA_MODE and the format is set to DFT_DIR;
 *  - the master LMV EA is built by lod_prep_lmv_md() and a slave copy by
 *    lod_prep_slave_lmv_md();
 *  - per stripe: dt_create() under the stripe write lock, insertion of the
 *    dot and dotdot entries, the cached default LOV xattr when present, the
 *    slave LMV xattr with lmv_master_mdt_index set to the stripe index, the
 *    link EA, the FID:index name entry in the master object and a reference
 *    on the master object;
 *  - the LMV xattr is finally set on the master object and slave_lmm freed.
 *
 * Mirrors the declarations made by lod_prep_md_striped_create().
 */
1868 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
1869 const struct lu_buf *buf, const char *name,
1870 int fl, struct thandle *th,
1871 struct lustre_capa *capa)
1873 struct lod_object *lo = lod_dt_obj(dt);
1874 struct lod_thread_info *info = lod_env_info(env);
1875 struct lu_attr *attr = &info->lti_attr;
1876 struct dt_object_format *dof = &info->lti_format;
1877 struct lu_buf lmv_buf;
1878 struct lu_buf slave_lmv_buf;
1879 struct lmv_mds_md_v1 *lmm;
1880 struct lmv_mds_md_v1 *slave_lmm = NULL;
1885 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
1888 /* The stripes are supposed to be allocated in declare phase,
1889 * if there are no stripes being allocated, it will skip */
1890 if (lo->ldo_stripenr == 0)
1893 rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA);
1897 attr->la_valid = LA_TYPE | LA_MODE;
1898 dof->dof_type = DFT_DIR;
1900 rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1903 lmm = lmv_buf.lb_buf;
1905 OBD_ALLOC_PTR(slave_lmm);
1906 if (slave_lmm == NULL)
1909 lod_prep_slave_lmv_md(slave_lmm, lmm);
1910 slave_lmv_buf.lb_buf = slave_lmm;
1911 slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1913 for (i = 0; i < lo->ldo_stripenr; i++) {
1914 struct dt_object *dto;
1915 char *stripe_name = info->lti_key;
1916 struct lu_name *sname;
1917 struct linkea_data ldata = { 0 };
1918 struct lu_buf linkea_buf;
1920 dto = lo->ldo_stripe[i];
1921 dt_write_lock(env, dto, MOR_TGT_CHILD);
1922 rc = dt_create(env, dto, attr, NULL, dof, th);
1923 dt_write_unlock(env, dto);
1927 rc = dt_insert(env, dto,
1928 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1929 (const struct dt_key *)dot, th, capa, 0);
1933 rc = dt_insert(env, dto,
1934 (struct dt_rec *)lu_object_fid(&dt->do_lu),
1935 (const struct dt_key *)dotdot, th, capa, 0);
1939 if (lo->ldo_striping_cached &&
1940 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1941 lo->ldo_def_stripenr,
1942 lo->ldo_def_stripe_offset)) {
1943 struct lov_user_md_v3 *v3;
1945 /* sigh, lti_ea_store has been used for lmv_buf,
1946 * so we have to allocate buffer for default
1952 memset(v3, 0, sizeof(*v3));
1953 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1954 v3->lmm_stripe_count =
1955 cpu_to_le16(lo->ldo_def_stripenr);
1956 v3->lmm_stripe_offset =
1957 cpu_to_le16(lo->ldo_def_stripe_offset);
1958 v3->lmm_stripe_size =
1959 cpu_to_le32(lo->ldo_def_stripe_size);
1960 if (lo->ldo_pool != NULL)
1961 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
1962 sizeof(v3->lmm_pool_name));
1964 info->lti_buf.lb_buf = v3;
1965 info->lti_buf.lb_len = sizeof(*v3);
1966 rc = dt_xattr_set(env, dto, &info->lti_buf,
1967 XATTR_NAME_LOV, 0, th, capa);
1973 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1974 rc = dt_xattr_set(env, dto, &slave_lmv_buf, XATTR_NAME_LMV,
1979 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1980 PFID(lu_object_fid(&dto->do_lu)), i);
1982 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
1983 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
1987 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
1991 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1992 linkea_buf.lb_len = ldata.ld_leh->leh_len;
1993 rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK,
1994 0, th, BYPASS_CAPA);
1998 rc = dt_insert(env, dt_object_child(dt),
1999 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
2000 (const struct dt_key *)stripe_name, th, capa, 0);
2004 rc = dt_ref_add(env, dt_object_child(dt), th);
2009 rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
2013 if (slave_lmm != NULL)
2014 OBD_FREE_PTR(slave_lmm);
/*
 * Common body for declaring and executing the striping set-up of a new
 * directory. The callers in this file pass true as the last argument for
 * the declare phase and false for the execute phase; each of the three
 * sections below pairs a declare call with an execute call, presumably
 * selected by that flag.
 *
 *  1. Own striping: when ldo_stripenr / ldo_dir_stripe_offset are not the
 *     default values, a struct lmv_user_md_v1 is built in lti_ea_store and
 *     passed to lod_declare_xattr_set_lmv() or lod_xattr_set_lmv().
 *  2. Default LMV inherited from the parent: when cached and not default
 *     values, a struct lmv_user_md_v1 is built and passed to
 *     lod_dir_declare_xattr_set() or lod_xattr_set_default_lmv_on_dir()
 *     under XATTR_NAME_DEFAULT_LMV.
 *  3. Default LOV inherited from the parent: when cached and not default
 *     values, a struct lov_user_md_v3 (including the pool name, if any) is
 *     built and passed to lod_dir_declare_xattr_set() or
 *     lod_xattr_set_lov_on_dir() under XATTR_NAME_LOV.
 *
 * lti_ea_store is grown with lod_ea_store_resize() whenever it is smaller
 * than the structure about to be built, and lti_buf is pointed at it.
 */
2019 int lod_dir_striping_create_internal(const struct lu_env *env,
2020 struct dt_object *dt,
2021 struct lu_attr *attr,
2022 struct dt_object_format *dof,
2026 struct lod_thread_info *info = lod_env_info(env);
2027 struct lod_object *lo = lod_dt_obj(dt);
2031 if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr,
2032 lo->ldo_dir_stripe_offset)) {
2033 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2034 int stripe_count = lo->ldo_stripenr;
2036 if (info->lti_ea_store_size < sizeof(*v1)) {
2037 rc = lod_ea_store_resize(info, sizeof(*v1));
2040 v1 = info->lti_ea_store;
2043 memset(v1, 0, sizeof(*v1));
2044 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2045 v1->lum_stripe_count = cpu_to_le32(stripe_count);
2046 v1->lum_stripe_offset =
2047 cpu_to_le32(lo->ldo_dir_stripe_offset);
2049 info->lti_buf.lb_buf = v1;
2050 info->lti_buf.lb_len = sizeof(*v1);
2053 rc = lod_declare_xattr_set_lmv(env, dt, attr,
2054 &info->lti_buf, dof, th);
2056 rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
2057 XATTR_NAME_LMV, 0, th,
2063 /* Transfer default LMV striping from the parent */
2064 if (lo->ldo_dir_striping_cached &&
2065 !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
2066 lo->ldo_dir_def_stripe_offset)) {
2067 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2068 int def_stripe_count = lo->ldo_dir_def_stripenr;
2070 if (info->lti_ea_store_size < sizeof(*v1)) {
2071 rc = lod_ea_store_resize(info, sizeof(*v1));
2074 v1 = info->lti_ea_store;
2077 memset(v1, 0, sizeof(*v1));
2078 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2079 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
2080 v1->lum_stripe_offset =
2081 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
2083 cpu_to_le32(lo->ldo_dir_def_hash_type);
2085 info->lti_buf.lb_buf = v1;
2086 info->lti_buf.lb_len = sizeof(*v1);
2088 rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2089 XATTR_NAME_DEFAULT_LMV,
2092 rc = lod_xattr_set_default_lmv_on_dir(env, dt,
2094 XATTR_NAME_DEFAULT_LMV, 0,
2100 /* Transfer default LOV striping from the parent */
2101 if (lo->ldo_striping_cached &&
2102 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2103 lo->ldo_def_stripenr,
2104 lo->ldo_def_stripe_offset)) {
2105 struct lov_user_md_v3 *v3 = info->lti_ea_store;
2107 if (info->lti_ea_store_size < sizeof(*v3)) {
2108 rc = lod_ea_store_resize(info, sizeof(*v3));
2111 v3 = info->lti_ea_store;
2114 memset(v3, 0, sizeof(*v3));
2115 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2116 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
2117 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
2118 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
2119 if (lo->ldo_pool != NULL)
2120 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
2121 sizeof(v3->lmm_pool_name));
2123 info->lti_buf.lb_buf = v3;
2124 info->lti_buf.lb_len = sizeof(*v3);
2127 rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2128 XATTR_NAME_LOV, 0, th);
2130 rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
2131 XATTR_NAME_LOV, 0, th,
/*
 * Declare-phase entry point for directory striping creation: forwards to
 * lod_dir_striping_create_internal() with the last argument set to true and
 * returns its result.
 */
2140 static int lod_declare_dir_striping_create(const struct lu_env *env,
2141 struct dt_object *dt,
2142 struct lu_attr *attr,
2143 struct dt_object_format *dof,
2146 return lod_dir_striping_create_internal(env, dt, attr, dof, th, true);
/*
 * Execute-phase entry point for directory striping creation: forwards to
 * lod_dir_striping_create_internal() with the last argument set to false
 * and returns its result.
 */
2149 static int lod_dir_striping_create(const struct lu_env *env,
2150 struct dt_object *dt,
2151 struct lu_attr *attr,
2152 struct dt_object_format *dof,
2155 return lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
/*
 * Set an xattr, dispatching on the object type and the xattr name.
 *
 *  - directory + XATTR_NAME_LMV: when the buffer carries
 *    LMV_HASH_FLAG_MIGRATION in lmv_hash_type the EA is written directly to
 *    the child object; the other visible call, lod_dir_striping_create(),
 *    is presumably the else branch that creates the stripes;
 *  - directory + XATTR_NAME_LOV: lod_xattr_set_lov_on_dir();
 *  - directory + XATTR_NAME_DEFAULT_LMV: lod_xattr_set_default_lmv_on_dir();
 *  - regular file + XATTR_NAME_LOV: with LU_XATTR_REPLACE the cached
 *    striping is freed and the EA written to the child object, otherwise
 *    (presumably the else branch) lod_striping_create() is called;
 *  - any other xattr: lod_xattr_set_internal().
 */
2158 static int lod_xattr_set(const struct lu_env *env,
2159 struct dt_object *dt, const struct lu_buf *buf,
2160 const char *name, int fl, struct thandle *th,
2161 struct lustre_capa *capa)
2163 struct dt_object *next = dt_object_child(dt);
2167 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2168 strcmp(name, XATTR_NAME_LMV) == 0) {
2169 struct lmv_mds_md_v1 *lmm = buf->lb_buf;
2171 if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
2172 LMV_HASH_FLAG_MIGRATION)
2173 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2175 rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
2180 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2181 strcmp(name, XATTR_NAME_LOV) == 0) {
2183 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
2185 } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2186 strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
2188 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
2191 } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2192 !strcmp(name, XATTR_NAME_LOV)) {
2193 /* in case of lov EA swap, just set it
2194 * if not, it is a replay so check striping match what we
2195 * already have during req replay, declare_xattr_set()
2196 * defines striping, then create() does the work
2198 if (fl & LU_XATTR_REPLACE) {
2199 /* free stripes, then update disk */
2200 lod_object_free_striping(env, lod_dt_obj(dt));
2201 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2203 rc = lod_striping_create(env, dt, NULL, NULL, th);
2208 /* then all other xattr */
2209 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
/*
 * Declare the deletion of xattr \a name by forwarding to
 * dt_declare_xattr_del() on the child object; stripes are not touched here.
 */
2214 static int lod_declare_xattr_del(const struct lu_env *env,
2215 struct dt_object *dt, const char *name,
2218 return dt_declare_xattr_del(env, dt_object_child(dt), name, th);
/*
 * Delete xattr \a name from the child object. When the xattr is
 * XATTR_NAME_LOV the striping held by the LOD object is released first with
 * lod_object_free_striping().
 */
2221 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
2222 const char *name, struct thandle *th,
2223 struct lustre_capa *capa)
2225 if (!strcmp(name, XATTR_NAME_LOV))
2226 lod_object_free_striping(env, lod_dt_obj(dt));
2227 return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
/*
 * List the xattrs of the object by forwarding to dt_xattr_list() on the
 * child object and returning its result unchanged.
 */
2230 static int lod_xattr_list(const struct lu_env *env,
2231 struct dt_object *dt, struct lu_buf *buf,
2232 struct lustre_capa *capa)
2234 return dt_xattr_list(env, dt_object_child(dt), buf, capa);
/*
 * Replace the pool name stored in o->ldo_pool.
 *
 * The visible code frees the current string (strlen + 1 bytes) and stores a
 * freshly allocated copy of \a pool made with strcpy(). The guards around
 * both halves are not visible here; presumably the free runs only when a
 * pool is already set and the copy only when \a pool is non-NULL (another
 * function in this file passes NULL to clear the pool) -- TODO confirm.
 *
 * \param pool	NUL-terminated pool name, or NULL (see above)
 */
2237 int lod_object_set_pool(struct lod_object *o, char *pool)
2242 len = strlen(o->ldo_pool);
2243 OBD_FREE(o->ldo_pool, len + 1);
2248 OBD_ALLOC(o->ldo_pool, len + 1);
2249 if (o->ldo_pool == NULL)
2251 strcpy(o->ldo_pool, pool);
/*
 * Return nonzero when \a is_reg is set and the sequence of \a fid is not
 * FID_SEQ_LOCAL_FILE, zero otherwise.
 */
2256 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
2258 return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
/*
 * Read the default LOV EA of directory \a lp and cache it in the object.
 *
 * The child object is write-locked around the whole operation. When
 * lod_get_lov_ea() returns fewer bytes than struct lov_user_md, the cache is
 * marked valid with no default striping set (size and count 0, offset -1).
 * Otherwise the EA in lti_ea_store is byte-swapped if its magic is stored
 * swapped, rejected (rc = 0, nothing cached) when the magic is neither
 * LOV_MAGIC_V1 nor LOV_MAGIC_V3 or the pattern is neither LOV_PATTERN_RAID0
 * nor 0, and finally copied into ldo_def_stripenr, ldo_def_stripe_size and
 * ldo_def_stripe_offset. A non-empty V3 pool name is stored through
 * lod_object_set_pool().
 */
2262 static int lod_cache_parent_lov_striping(const struct lu_env *env,
2263 struct lod_object *lp)
2265 struct lod_thread_info *info = lod_env_info(env);
2266 struct lov_user_md_v1 *v1 = NULL;
2267 struct lov_user_md_v3 *v3 = NULL;
2271 /* called from MDD without parent being write locked,
2273 dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2274 rc = lod_get_lov_ea(env, lp);
2278 if (rc < sizeof(struct lov_user_md)) {
2279 /* don't lookup for non-existing or invalid striping */
2280 lp->ldo_def_striping_set = 0;
2281 lp->ldo_striping_cached = 1;
2282 lp->ldo_def_stripe_size = 0;
2283 lp->ldo_def_stripenr = 0;
2284 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
2285 GOTO(unlock, rc = 0);
/* magic stored in the opposite byte order: swab the EA in place */
2289 v1 = info->lti_ea_store;
2290 if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) {
2291 lustre_swab_lov_user_md_v1(v1);
2292 } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
2293 v3 = (struct lov_user_md_v3 *)v1;
2294 lustre_swab_lov_user_md_v3(v3);
2297 if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
2298 GOTO(unlock, rc = 0);
2300 if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
2301 GOTO(unlock, rc = 0);
2303 CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
2304 PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
2305 (int)v1->lmm_stripe_count,
2306 (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
2308 lp->ldo_def_stripenr = v1->lmm_stripe_count;
2309 lp->ldo_def_stripe_size = v1->lmm_stripe_size;
2310 lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
2311 lp->ldo_striping_cached = 1;
2312 lp->ldo_def_striping_set = 1;
2313 if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
2314 /* XXX: sanity check here */
2315 v3 = (struct lov_user_md_v3 *) v1;
2316 if (v3->lmm_pool_name[0])
2317 lod_object_set_pool(lp, v3->lmm_pool_name);
2321 dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
/*
 * Read the default LMV EA of directory \a lp and cache it in the object.
 *
 * The child object is write-locked around the whole operation. When
 * lod_get_default_lmv_ea() returns fewer bytes than struct lmv_user_md, the
 * cache is marked valid with no default striping set (count 0, offset -1,
 * hash type LMV_HASH_TYPE_FNV_1A_64). Otherwise stripe count, stripe offset
 * and hash type are converted from little endian out of lti_ea_store and
 * both ldo_dir_def_striping_set and ldo_dir_striping_cached are raised.
 */
2326 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
2327 struct lod_object *lp)
2329 struct lod_thread_info *info = lod_env_info(env);
2330 struct lmv_user_md_v1 *v1 = NULL;
2334 /* called from MDD without parent being write locked,
2336 dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2337 rc = lod_get_default_lmv_ea(env, lp);
2341 if (rc < sizeof(struct lmv_user_md)) {
2342 /* don't lookup for non-existing or invalid striping */
2343 lp->ldo_dir_def_striping_set = 0;
2344 lp->ldo_dir_striping_cached = 1;
2345 lp->ldo_dir_def_stripenr = 0;
2346 lp->ldo_dir_def_stripe_offset =
2347 (typeof(v1->lum_stripe_offset))(-1);
2348 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
2349 GOTO(unlock, rc = 0);
2353 v1 = info->lti_ea_store;
2355 lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
2356 lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
2357 lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
2358 lp->ldo_dir_def_striping_set = 1;
2359 lp->ldo_dir_striping_cached = 1;
2363 dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
/*
 * Make sure the default striping of parent \a lp is cached.
 *
 * Loads the striping of \a lp, caches the default LOV striping when
 * ldo_striping_cached is not set, and additionally caches the default LMV
 * striping when the child being created is a directory (child_mode) and
 * ldo_dir_striping_cached is not set.
 */
2367 static int lod_cache_parent_striping(const struct lu_env *env,
2368 struct lod_object *lp,
2374 rc = lod_load_striping(env, lp);
2378 if (!lp->ldo_striping_cached) {
2379 /* we haven't tried to get default striping for
2380 * the directory yet, let's cache it in the object */
2381 rc = lod_cache_parent_lov_striping(env, lp);
2386 if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
2387 rc = lod_cache_parent_lmv_striping(env, lp);
2393 * used to transfer default striping data to the object being created
/*
 * Initialise the allocation hint \a ah and seed the striping parameters of
 * \a child from \a parent before the child is declared and created.
 *
 * Visible flow:
 *  - when a parent is given, its striping is loaded;
 *  - the child must not have stripes yet (asserted);
 *  - do_ah_init() of the next layer is called when the child object does
 *    not exist yet or is remote; a remote parent is passed down as NULL;
 *  - directory child: ldo_dir_stripe is allocated on child and parent if
 *    missing, the parent defaults are cached, cached LOV and LMV defaults
 *    are copied to the child, and the stripe count / offset / hash type of
 *    the child come from, in order, the EA supplied in ah->dah_eadata (when
 *    it verifies and asks for more than one stripe), the parent default LMV
 *    striping, or no striping (count 0, offset -1);
 *  - other children: lod_object_will_be_striped() is consulted (the action
 *    on a negative answer is not visible, presumably an early exit), then
 *    stripe count, stripe size, offset and pool are taken from the parent
 *    defaults and, where still zero, from the filesystem-wide lov_desc;
 *  - finally, a parent that is a slave stripe gets its cached default
 *    striping cleared again.
 *
 * NOTE(review): nextp and lp stay NULL when \a parent is NULL, yet
 * dt_object_remote(nextp) and lp->ldo_dir_stripe are evaluated in code whose
 * guards are not visible here; confirm that callers always supply a parent
 * on those paths.
 */
2395 static void lod_ah_init(const struct lu_env *env,
2396 struct dt_allocation_hint *ah,
2397 struct dt_object *parent,
2398 struct dt_object *child,
2401 struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
2402 struct dt_object *nextp = NULL;
2403 struct dt_object *nextc;
2404 struct lod_object *lp = NULL;
2405 struct lod_object *lc;
2406 struct lov_desc *desc;
2412 if (likely(parent)) {
2413 nextp = dt_object_child(parent);
2414 lp = lod_dt_obj(parent);
2415 rc = lod_load_striping(env, lp);
2420 nextc = dt_object_child(child);
2421 lc = lod_dt_obj(child);
2423 LASSERT(lc->ldo_stripenr == 0);
2424 LASSERT(lc->ldo_stripe == NULL);
2427 * local object may want some hints
2428 * in case of late striping creation, ->ah_init()
2429 * can be called with local object existing
2431 if (!dt_object_exists(nextc) || dt_object_remote(nextc))
2432 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
2433 NULL : nextp, nextc, child_mode);
2435 if (S_ISDIR(child_mode)) {
2436 if (lc->ldo_dir_stripe == NULL) {
2437 OBD_ALLOC_PTR(lc->ldo_dir_stripe);
2438 if (lc->ldo_dir_stripe == NULL)
2442 if (lp->ldo_dir_stripe == NULL) {
2443 OBD_ALLOC_PTR(lp->ldo_dir_stripe);
2444 if (lp->ldo_dir_stripe == NULL)
2448 rc = lod_cache_parent_striping(env, lp, child_mode);
2452 /* transfer defaults to new directory */
2453 if (lp->ldo_striping_cached) {
2455 lod_object_set_pool(lc, lp->ldo_pool);
2456 lc->ldo_def_stripenr = lp->ldo_def_stripenr;
2457 lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
2458 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2459 lc->ldo_striping_cached = 1;
2460 lc->ldo_def_striping_set = 1;
2461 CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
2462 (int)lc->ldo_def_stripe_size,
2463 (int)lc->ldo_def_stripe_offset,
2464 (int)lc->ldo_def_stripenr);
2467 /* transfer dir defaults to new directory */
2468 if (lp->ldo_dir_striping_cached) {
2469 lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
2470 lc->ldo_dir_def_stripe_offset =
2471 lp->ldo_dir_def_stripe_offset;
2472 lc->ldo_dir_def_hash_type =
2473 lp->ldo_dir_def_hash_type;
2474 lc->ldo_dir_striping_cached = 1;
2475 lc->ldo_dir_def_striping_set = 1;
2476 CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
2477 (int)lc->ldo_dir_def_stripenr,
2478 (int)lc->ldo_dir_def_stripe_offset,
2479 lc->ldo_dir_def_hash_type);
2482 /* It should always honour the specified stripes */
2483 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
2484 const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
2486 rc = lod_verify_md_striping(d, lum1);
2488 le32_to_cpu(lum1->lum_stripe_count) > 1) {
2489 /* Directory will be striped only if
2490 * stripe_count > 1 */
2492 le32_to_cpu(lum1->lum_stripe_count);
2493 lc->ldo_dir_stripe_offset =
2494 le32_to_cpu(lum1->lum_stripe_offset);
2495 lc->ldo_dir_hash_type =
2496 le32_to_cpu(lum1->lum_hash_type);
2497 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
2499 (int)lc->ldo_dir_stripe_offset);
2501 /* then check whether there is default stripes from parent */
2502 } else if (lp->ldo_dir_def_striping_set) {
2503 /* If there are default dir stripe from parent */
2504 lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
2505 lc->ldo_dir_stripe_offset =
2506 lp->ldo_dir_def_stripe_offset;
2507 lc->ldo_dir_hash_type =
2508 lp->ldo_dir_def_hash_type;
2509 CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
2511 (int)lc->ldo_dir_stripe_offset);
2513 /* set default stripe for this directory */
2514 lc->ldo_stripenr = 0;
2515 lc->ldo_dir_stripe_offset = -1;
2518 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
2519 lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
2525 * if object is going to be striped over OSTs, transfer default
2526 * striping information to the child, so that we can use it
2527 * during declaration and creation
2529 if (!lod_object_will_be_striped(S_ISREG(child_mode),
2530 lu_object_fid(&child->do_lu)))
2533 * try from the parent
2535 if (likely(parent)) {
2536 lod_cache_parent_striping(env, lp, child_mode);
2538 lc->ldo_def_stripe_offset = (__u16) -1;
2540 if (lp->ldo_def_striping_set) {
2542 lod_object_set_pool(lc, lp->ldo_pool);
2543 lc->ldo_stripenr = lp->ldo_def_stripenr;
2544 lc->ldo_stripe_size = lp->ldo_def_stripe_size;
2545 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2546 CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
2547 lc->ldo_stripenr, lc->ldo_stripe_size,
2548 lp->ldo_pool ? lp->ldo_pool : "");
2553 * if the parent doesn't provide with specific pattern, grab fs-wide one
2555 desc = &d->lod_desc;
2556 if (lc->ldo_stripenr == 0)
2557 lc->ldo_stripenr = desc->ld_default_stripe_count;
2558 if (lc->ldo_stripe_size == 0)
2559 lc->ldo_stripe_size = desc->ld_default_stripe_size;
2560 CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
2561 lc->ldo_stripenr, lc->ldo_stripe_size,
2562 lc->ldo_pool ? lc->ldo_pool : "");
2565 /* we do not cache stripe information for slave stripe, see
2566 * lod_xattr_set_lov_on_dir */
2567 if (lp != NULL && lp->ldo_dir_slave_stripe)
2568 lod_lov_stripe_cache_clear(lp);
/*
 * Thin alias for do_div().  As the usage note in lod_declare_init_size()
 * states, ll_do_div64(a, b) returns a % b and leaves a / b in a, i.e. the
 * first argument is modified in place.
 */
2573 #define ll_do_div64(aaa,bbb) do_div((aaa), (bbb))
2575 * this function handles a special case when truncate was done
2576 * on a stripeless object and now striping is being created
2577 * we can't lose that size, so we have to propagate it to newly
/*
 * Declare the size update needed on one stripe when the underlying object
 * already carries a size.
 *
 * The attributes of the child object (dt_object_child(dt)) are read into the
 * per-thread lti_attr buffer, la_size is mapped onto a stripe index plus a
 * size inside that stripe, and an LA_SIZE-only attr_set is declared on that
 * stripe within transaction th.  The lti_attr buffer is overwritten.
 *
 * env - execution environment; supplies the per-thread lu_attr scratch
 * dt  - LOD object whose striping is being created
 * th  - transaction handle the declaration is attached to
 *
 * Return value: presumably rc from the dt_* calls below - TODO confirm.
 */
2580 static int lod_declare_init_size(const struct lu_env *env,
2581 struct dt_object *dt, struct thandle *th)
2583 struct dt_object *next = dt_object_child(dt);
2584 struct lod_object *lo = lod_dt_obj(dt);
2585 struct lu_attr *attr = &lod_env_info(env)->lti_attr;
2586 uint64_t size, offs;
2590 /* XXX: we support the simplest (RAID0) striping so far */
2591 LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
2592 LASSERT(lo->ldo_stripe_size > 0);
/* fetch the current attributes of the underlying (child) object */
2594 rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
2595 LASSERT(attr->la_valid & LA_SIZE);
/*
 * NOTE(review): the assertion above is evaluated directly after
 * dt_attr_get(); confirm that attr->la_valid is meaningful when that call
 * fails, or that rc is tested first.
 */
2599 size = attr->la_size;
2603 /* ll_do_div64(a, b) returns a % b, and a = a / b */
2604 ll_do_div64(size, (__u64) lo->ldo_stripe_size);
2605 stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
/*
 * At this point size holds (la_size / stripe_size) / stripenr and stripe
 * holds (la_size / stripe_size) % stripenr, the index used below.
 * NOTE(review): ldo_stripenr is used as a divisor although the first
 * assertion tolerates ldo_stripenr == 0 - confirm callers exclude that.
 */
2607 size = size * lo->ldo_stripe_size;
2608 offs = attr->la_size;
2609 size += ll_do_div64(offs, lo->ldo_stripe_size);
/* only the size is to be set on the selected stripe */
2611 attr->la_valid = LA_SIZE;
2612 attr->la_size = size;
2614 rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
2620 * Create declaration of striped object
/*
 * Declare everything needed to give dt a striping.
 *
 * Visible steps:
 *  - when OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO) fires, the striping is
 *    released and the function leaves through the "out" label with -ENOMEM;
 *  - when the child object is not remote, lod_qos_prep_create() is asked to
 *    prepare the stripes and the LOV xattr buffer length is computed from
 *    ldo_stripenr, using LOV_MAGIC_V3 when ldo_pool is set and LOV_MAGIC_V1
 *    otherwise;
 *  - per the in-line comment, for remote objects the caller-supplied lovea
 *    (asserted non-NULL) is used as the xattr buffer instead;
 *  - the XATTR_NAME_LOV xattr set is declared on the child object;
 *  - when the child object already exists, lod_declare_init_size() declares
 *    the propagation of its size.
 *
 * The per-thread lti_buf is used as scratch and is overwritten.
 *
 * env   - execution environment
 * dt    - LOD object to be striped
 * attr  - attributes handed to lod_qos_prep_create()
 * lovea - striping EA handed to lod_qos_prep_create(); also the xattr
 *         buffer in the lovea-based path
 * th    - transaction handle
 */
2622 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
2623 struct lu_attr *attr,
2624 const struct lu_buf *lovea, struct thandle *th)
2626 struct lod_thread_info *info = lod_env_info(env);
2627 struct dt_object *next = dt_object_child(dt);
2628 struct lod_object *lo = lod_dt_obj(dt);
2632 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
2633 /* failed to create striping, let's reset
2634 * config so that others don't get confused */
2635 lod_object_free_striping(env, lo);
2636 GOTO(out, rc = -ENOMEM);
2639 if (!dt_object_remote(next)) {
2640 /* choose OST and generate appropriate objects */
2641 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
2643 /* failed to create striping, let's reset
2644 * config so that others don't get confused */
2645 lod_object_free_striping(env, lo);
2650 * declare storage for striping data
2652 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
2653 lo->ldo_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1);
2655 /* LOD can not choose OST objects for remote objects, i.e.
2656 * stripes must be ready before that. Right now, it can only
2657 * happen during migrate, i.e. migrate process needs to create
2658 * remote regular file (mdd_migrate_create), then the migrate
2659 * process will provide stripeEA. */
2660 LASSERT(lovea != NULL);
2661 info->lti_buf = *lovea;
2664 rc = dt_declare_xattr_set(env, next, &info->lti_buf,
2665 XATTR_NAME_LOV, 0, th);
2670 * if striping is created with local object's size > 0,
2671 * we have to propagate this size to specific object
2672 * the case is possible only when local object was created previously
2674 if (dt_object_exists(next))
2675 rc = lod_declare_init_size(env, dt, th);
/*
 * do_declare_create handler of lod_obj_ops.
 *
 * Declares creation of the child object first, then adds what the object
 * type requires:
 *  - DFT_SYM: the body operations of dt are switched to lod_body_lnk_ops;
 *  - DFT_REGULAR: ldo_stripenr is reset to 0 when the caller did not ask
 *    for stripes (dof->u.dof_reg.striped == 0); if stripes remain,
 *    lod_declare_striped_object() is invoked;
 *  - DFT_DIR: lod_declare_dir_striping_create() is invoked when
 *    ldo_dir_stripe is set.
 *
 * env  - execution environment
 * dt   - LOD object being created
 * attr - attributes of the new object
 * hint - allocation hint, forwarded to the child layer
 * dof  - object format; dof_type selects the branch taken
 */
2681 static int lod_declare_object_create(const struct lu_env *env,
2682 struct dt_object *dt,
2683 struct lu_attr *attr,
2684 struct dt_allocation_hint *hint,
2685 struct dt_object_format *dof,
2688 struct dt_object *next = dt_object_child(dt);
2689 struct lod_object *lo = lod_dt_obj(dt);
2698 * first of all, we declare creation of local object
2700 rc = dt_declare_create(env, next, attr, hint, dof, th);
/* symlinks get the pass-through body operations */
2704 if (dof->dof_type == DFT_SYM)
2705 dt->do_body_ops = &lod_body_lnk_ops;
2708 * it's lod_ah_init() who has decided the object will striped
2710 if (dof->dof_type == DFT_REGULAR) {
2711 /* callers don't want stripes */
2712 /* XXX: all tricky interactions with ->ah_make_hint() decided
2713 * to use striping, then ->declare_create() behaving differently
2714 * should be cleaned */
2715 if (dof->u.dof_reg.striped == 0)
2716 lo->ldo_stripenr = 0;
2717 if (lo->ldo_stripenr > 0)
2718 rc = lod_declare_striped_object(env, dt, attr,
2720 } else if (dof->dof_type == DFT_DIR) {
2721 /* Orphan object (like migrating object) does not have
2722 * lod_dir_stripe, see lod_ah_init */
2723 if (lo->ldo_dir_stripe != NULL)
2724 rc = lod_declare_dir_striping_create(env, dt, attr,
/*
 * Create the stripe objects of dt and store the resulting striping EA.
 *
 * Every entry of ldo_stripe[0 .. ldo_stripenr - 1] is asserted non-NULL and
 * created through dt_create() with a NULL allocation hint; afterwards
 * lod_generate_and_set_lovea() is called for the object.  The striping must
 * not be marked as cached (ldo_striping_cached == 0) on entry.
 *
 * env  - execution environment
 * dt   - LOD object whose stripes are created
 * attr - attributes applied to each stripe
 * dof  - object format passed to each dt_create()
 */
2731 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
2732 struct lu_attr *attr, struct dt_object_format *dof,
2735 struct lod_object *lo = lod_dt_obj(dt);
2739 LASSERT(lo->ldo_striping_cached == 0);
2741 /* create all underlying objects */
2742 for (i = 0; i < lo->ldo_stripenr; i++) {
2743 LASSERT(lo->ldo_stripe[i]);
2744 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
2750 rc = lod_generate_and_set_lovea(env, lo, th);
/*
 * do_create handler of lod_obj_ops.
 *
 * Creates the child object and then, for a regular file that has a stripe
 * array and for which the caller requested striping
 * (dof->u.dof_reg.striped != 0), creates the stripes through
 * lod_striping_create().
 *
 * env  - execution environment
 * dt   - LOD object to create
 * attr - attributes of the new object
 * hint - allocation hint, forwarded to the child layer
 * dof  - object format
 * th   - transaction handle
 */
2755 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
2756 struct lu_attr *attr,
2757 struct dt_allocation_hint *hint,
2758 struct dt_object_format *dof, struct thandle *th)
2760 struct dt_object *next = dt_object_child(dt);
2761 struct lod_object *lo = lod_dt_obj(dt);
2765 /* create local object */
2766 rc = dt_create(env, next, attr, hint, dof, th);
/* the object type is taken from the mode cached in the object header */
2770 if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2771 lo->ldo_stripe && dof->u.dof_reg.striped != 0)
2772 rc = lod_striping_create(env, dt, attr, dof, th);
/*
 * do_declare_destroy handler of lod_obj_ops.
 *
 * Visible steps:
 *  - the striping is loaded through lod_load_striping();
 *  - for a directory, the child object is switched to directory index
 *    operations (do_index_try with dt_directory_features) and, per stripe,
 *    a reference drop on the child and the deletion of a name built with
 *    the format DFID":%d" in the per-thread lti_key buffer are declared;
 *  - destruction of the child object is declared;
 *  - destruction of every non-NULL stripe is declared.
 *
 * An OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) test sits between the last
 * two steps; presumably it skips the stripe declarations - TODO confirm.
 *
 * env - execution environment
 * dt  - LOD object about to be destroyed
 */
2777 static int lod_declare_object_destroy(const struct lu_env *env,
2778 struct dt_object *dt,
2781 struct dt_object *next = dt_object_child(dt);
2782 struct lod_object *lo = lod_dt_obj(dt);
2783 struct lod_thread_info *info = lod_env_info(env);
2784 char *stripe_name = info->lti_key;
2789 * load striping information, notice we don't do this when object
2790 * is being initialized as we don't need this information till
2791 * few specific cases like destroy, chown
2793 rc = lod_load_striping(env, lo);
2797 /* declare destroy for all underlying objects */
2798 if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2799 rc = next->do_ops->do_index_try(env, next,
2800 &dt_directory_features);
2804 for (i = 0; i < lo->ldo_stripenr; i++) {
2805 rc = dt_declare_ref_del(env, next, th);
2808 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2809 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2811 rc = dt_declare_delete(env, next,
2812 (const struct dt_key *)stripe_name, th);
2818 * we declare destroy for the local object
2820 rc = dt_declare_destroy(env, next, th);
2824 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2827 /* declare destroy all striped objects */
2828 for (i = 0; i < lo->ldo_stripenr; i++) {
2829 if (likely(lo->ldo_stripe[i] != NULL)) {
2830 rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
/*
 * do_destroy handler of lod_obj_ops; execution counterpart of
 * lod_declare_object_destroy().
 *
 * Visible steps:
 *  - for a directory, the child object is switched to directory index
 *    operations and, per stripe, a reference is dropped on the child and
 *    the entry named with the format DFID":%d" (built in the per-thread
 *    lti_key buffer) is deleted from it;
 *  - the child object is destroyed;
 *  - every non-NULL stripe is destroyed; while
 *    OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) is true only the stripe
 *    whose index equals cfs_fail_val is destroyed.
 *
 * An OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) test precedes the stripe
 * loop; presumably it skips that loop - TODO confirm.
 *
 * env - execution environment
 * dt  - LOD object to destroy
 * th  - transaction handle
 */
2839 static int lod_object_destroy(const struct lu_env *env,
2840 struct dt_object *dt, struct thandle *th)
2842 struct dt_object *next = dt_object_child(dt);
2843 struct lod_object *lo = lod_dt_obj(dt);
2844 struct lod_thread_info *info = lod_env_info(env);
2845 char *stripe_name = info->lti_key;
2849 /* destroy sub-stripe of master object */
2850 if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2851 rc = next->do_ops->do_index_try(env, next,
2852 &dt_directory_features);
2856 for (i = 0; i < lo->ldo_stripenr; i++) {
2857 rc = dt_ref_del(env, next, th);
2861 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2862 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2865 CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
2866 PFID(lu_object_fid(&dt->do_lu)), stripe_name,
2867 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
2869 rc = dt_delete(env, next,
2870 (const struct dt_key *)stripe_name,
2876 rc = dt_destroy(env, next, th);
2880 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2883 /* destroy all striped objects */
2884 for (i = 0; i < lo->ldo_stripenr; i++) {
2885 if (likely(lo->ldo_stripe[i] != NULL) &&
2886 (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
2887 i == cfs_fail_val)) {
2888 rc = dt_destroy(env, lo->ldo_stripe[i], th);
/*
 * do_declare_ref_add handler of lod_obj_ops: forwards the declaration
 * unchanged to the child object and returns its result.
 */
2897 static int lod_declare_ref_add(const struct lu_env *env,
2898 struct dt_object *dt, struct thandle *th)
2900 return dt_declare_ref_add(env, dt_object_child(dt), th);
/*
 * do_ref_add handler of lod_obj_ops: forwards the reference increment
 * unchanged to the child object and returns its result.
 */
2903 static int lod_ref_add(const struct lu_env *env,
2904 struct dt_object *dt, struct thandle *th)
2906 return dt_ref_add(env, dt_object_child(dt), th);
/*
 * do_declare_ref_del handler of lod_obj_ops: forwards the declaration
 * unchanged to the child object and returns its result.
 */
2909 static int lod_declare_ref_del(const struct lu_env *env,
2910 struct dt_object *dt, struct thandle *th)
2912 return dt_declare_ref_del(env, dt_object_child(dt), th);
/*
 * do_ref_del handler of lod_obj_ops: forwards the reference decrement
 * unchanged to the child object and returns its result.
 */
2915 static int lod_ref_del(const struct lu_env *env,
2916 struct dt_object *dt, struct thandle *th)
2918 return dt_ref_del(env, dt_object_child(dt), th);
/*
 * do_capa_get handler of lod_obj_ops: forwards the request (old capability
 * and opc) unchanged to the child object and returns whatever it returns.
 */
2921 static struct obd_capa *lod_capa_get(const struct lu_env *env,
2922 struct dt_object *dt,
2923 struct lustre_capa *old, __u64 opc)
2925 return dt_capa_get(env, dt_object_child(dt), old, opc);
/*
 * do_object_sync handler of lod_obj_ops: forwards start and end unchanged
 * to the child object and returns its result.  Stripes are not touched here.
 */
2928 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
2929 __u64 start, __u64 end)
2931 return dt_object_sync(env, dt_object_child(dt), start, end);
/*
 * Bookkeeping for the locks taken on the stripes of a striped directory.
 * lod_object_lock() allocates it with one lsl_handle slot per stripe and
 * publishes it through einfo->ei_cbdata; lod_object_unlock() frees it.
 * lsl_handle is a trailing array whose length is fixed at allocation time.
 */
2934 struct lod_slave_locks {
2936 struct lustre_handle lsl_handle[0];
/*
 * Release the stripe locks recorded in einfo->ei_cbdata.
 *
 * The walk starts at index 1; slot 0 is never examined here.  Only handles
 * for which lustre_handle_is_used() is true are unlocked.  The first
 * non-zero unlock result is kept in rc; later ones do not replace it.
 *
 * Side effect: einfo->ei_cbdata is redirected to the individual handle
 * before each dt_object_unlock() call; the visible callers reset it
 * afterwards.  The container itself is not freed here.
 *
 * env    - execution environment
 * dt     - striped LOD object
 * einfo  - enqueue info whose ei_cbdata holds a struct lod_slave_locks
 * policy - lock policy, forwarded to dt_object_unlock()
 */
2939 static int lod_object_unlock_internal(const struct lu_env *env,
2940 struct dt_object *dt,
2941 struct ldlm_enqueue_info *einfo,
2942 ldlm_policy_data_t *policy)
2944 struct lod_object *lo = lod_dt_obj(dt);
2945 struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
2950 if (slave_locks == NULL)
2953 for (i = 1; i < slave_locks->lsl_lock_count; i++) {
2954 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
2957 einfo->ei_cbdata = &slave_locks->lsl_handle[i];
2958 rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
2961 rc = rc == 0 ? rc1 : rc;
/*
 * do_object_unlock handler of lod_obj_ops.
 *
 * Releases the stripe locks taken by lod_object_lock() and frees the
 * struct lod_slave_locks container found in einfo->ei_cbdata, which is then
 * cleared.  Guards are evaluated first for a NULL container, a
 * non-directory object, a failed lod_load_striping(), and a remote child
 * with at most one stripe.
 *
 * env    - execution environment
 * dt     - LOD object to unlock
 * einfo  - enqueue info; ei_cbdata carries the container
 * policy - lock policy, forwarded to the stripe unlock calls
 */
2968 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
2969 struct ldlm_enqueue_info *einfo,
2970 union ldlm_policy_data *policy)
2972 struct lod_object *lo = lod_dt_obj(dt);
2973 struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
2974 int slave_locks_size;
2978 if (slave_locks == NULL)
2981 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2984 rc = lod_load_striping(env, lo);
2988 /* Note: for remote lock for single stripe dir, MDT will cancel
2989 * the lock by lockh directly */
2990 if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
2993 /* Only cancel slave lock for striped dir */
2994 rc = lod_object_unlock_internal(env, dt, einfo, policy);
/*
 * The size is rebuilt from lsl_lock_count, which lod_object_lock() sets to
 * the stripe count it used when allocating the container.
 */
2996 slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
2997 sizeof(slave_locks->lsl_handle[0]);
2998 OBD_FREE(slave_locks, slave_locks_size);
2999 einfo->ei_cbdata = NULL;
/*
 * do_object_lock handler of lod_obj_ops.
 *
 * Two modes are visible:
 *  - einfo->ei_enq_slave not set: dt is asserted to be remote and the
 *    request is forwarded to the child object;
 *  - otherwise, for a directory with more than one stripe, a
 *    struct lod_slave_locks with ldo_stripenr handle slots is allocated and
 *    stripes 1 .. ldo_stripenr - 1 are locked one by one, each with a
 *    resource name built from the stripe FID in the per-thread lti_res_id.
 *    On success the container is published in einfo->ei_cbdata.
 *
 * On failure with a container allocated, the locks already taken are
 * released through lod_object_unlock_internal(), the container is freed and
 * einfo->ei_cbdata is cleared.
 *
 * Side effect: einfo->ei_res_id is left pointing at the per-thread
 * lti_res_id buffer.
 *
 * env    - execution environment
 * dt     - LOD object to lock
 * lh     - lock handle, used by the forwarded (remote) request
 * einfo  - enqueue info; ei_cbdata receives the container
 * policy - lock policy
 */
3004 static int lod_object_lock(const struct lu_env *env,
3005 struct dt_object *dt,
3006 struct lustre_handle *lh,
3007 struct ldlm_enqueue_info *einfo,
3008 union ldlm_policy_data *policy)
3010 struct lod_object *lo = lod_dt_obj(dt);
3013 int slave_locks_size;
3014 struct lod_slave_locks *slave_locks = NULL;
3017 /* remote object lock */
3018 if (!einfo->ei_enq_slave) {
3019 LASSERT(dt_object_remote(dt));
3020 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
3024 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
3027 rc = lod_load_striping(env, lo);
3032 if (lo->ldo_stripenr <= 1)
/* header plus one handle slot per stripe; slot 0 is never filled below */
3035 slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
3036 sizeof(slave_locks->lsl_handle[0]);
3037 /* Freed in lod_object_unlock */
3038 OBD_ALLOC(slave_locks, slave_locks_size);
3039 if (slave_locks == NULL)
3041 slave_locks->lsl_lock_count = lo->ldo_stripenr;
3043 /* striped directory lock */
3044 for (i = 1; i < lo->ldo_stripenr; i++) {
3045 struct lustre_handle lockh;
3046 struct ldlm_res_id *res_id;
3048 res_id = &lod_env_info(env)->lti_res_id;
3049 fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
3051 einfo->ei_res_id = res_id;
/*
 * NOTE(review): ldo_stripe[i] has already been dereferenced for the FID
 * above, so the assertion below cannot catch a NULL stripe in time.
 */
3053 LASSERT(lo->ldo_stripe[i]);
3054 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
3058 slave_locks->lsl_handle[i] = lockh;
3061 einfo->ei_cbdata = slave_locks;
/* error path: undo the stripe locks taken so far and drop the container */
3064 if (rc != 0 && slave_locks != NULL) {
3065 einfo->ei_cbdata = slave_locks;
3066 lod_object_unlock_internal(env, dt, einfo, policy);
3067 OBD_FREE(slave_locks, slave_locks_size);
3068 einfo->ei_cbdata = NULL;
/*
 * Object operation table of the LOD layer.  Each dt_object_operations slot
 * is bound to the lod_* handler of the matching name: locking, attribute
 * and extended-attribute access, allocation hints, create/destroy,
 * index probing, reference counting, capabilities, sync and
 * distributed object lock/unlock.
 */
3074 struct dt_object_operations lod_obj_ops = {
3075 .do_read_lock = lod_object_read_lock,
3076 .do_write_lock = lod_object_write_lock,
3077 .do_read_unlock = lod_object_read_unlock,
3078 .do_write_unlock = lod_object_write_unlock,
3079 .do_write_locked = lod_object_write_locked,
3080 .do_attr_get = lod_attr_get,
3081 .do_declare_attr_set = lod_declare_attr_set,
3082 .do_attr_set = lod_attr_set,
3083 .do_xattr_get = lod_xattr_get,
3084 .do_declare_xattr_set = lod_declare_xattr_set,
3085 .do_xattr_set = lod_xattr_set,
3086 .do_declare_xattr_del = lod_declare_xattr_del,
3087 .do_xattr_del = lod_xattr_del,
3088 .do_xattr_list = lod_xattr_list,
3089 .do_ah_init = lod_ah_init,
3090 .do_declare_create = lod_declare_object_create,
3091 .do_create = lod_object_create,
3092 .do_declare_destroy = lod_declare_object_destroy,
3093 .do_destroy = lod_object_destroy,
3094 .do_index_try = lod_index_try,
3095 .do_declare_ref_add = lod_declare_ref_add,
3096 .do_ref_add = lod_ref_add,
3097 .do_declare_ref_del = lod_declare_ref_del,
3098 .do_ref_del = lod_ref_del,
3099 .do_capa_get = lod_capa_get,
3100 .do_object_sync = lod_object_sync,
3101 .do_object_lock = lod_object_lock,
3102 .do_object_unlock = lod_object_unlock,
/*
 * dbo_read handler of lod_body_lnk_ops: calls the child object's own
 * dbo_read with the same buffer, position and capability and returns its
 * result.
 */
3105 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
3106 struct lu_buf *buf, loff_t *pos,
3107 struct lustre_capa *capa)
3109 struct dt_object *next = dt_object_child(dt);
3110 return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
/*
 * dbo_declare_write handler of lod_body_lnk_ops: declares the record write
 * on the child object through dt_declare_record_write() and returns its
 * result.
 */
3113 static ssize_t lod_declare_write(const struct lu_env *env,
3114 struct dt_object *dt,
3115 const struct lu_buf *buf, loff_t pos,
3118 return dt_declare_record_write(env, dt_object_child(dt),
/*
 * dbo_write handler of lod_body_lnk_ops: calls the child object's own
 * dbo_write with all arguments passed through unchanged and returns its
 * result.
 */
3122 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
3123 const struct lu_buf *buf, loff_t *pos,
3124 struct thandle *th, struct lustre_capa *capa, int iq)
3126 struct dt_object *next = dt_object_child(dt);
3128 return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
/*
 * Body operations installed on symlink objects (see
 * lod_declare_object_create() and lod_object_start()); every entry simply
 * delegates to the child object.
 */
3131 static const struct dt_body_operations lod_body_lnk_ops = {
3132 .dbo_read = lod_read,
3133 .dbo_declare_write = lod_declare_write,
3134 .dbo_write = lod_write
/*
 * loo_object_init handler of lod_lu_obj_ops.
 *
 * Locates the device that owns the object's FID and stacks a child-layer
 * object from that device underneath lo:
 *  - lod_fld_lookup() yields a target index and a sequence type;
 *  - an MDT sequence whose index equals the local node id selects the
 *    local child device (lod->lod_child);
 *  - any other MDT sequence selects from lod_mdt_descs, an OST sequence
 *    from lod_ost_descs; the target must be inside the table and flagged
 *    in its bitmap, and both its descriptor and device are asserted;
 *  - the chosen device allocates the child object, which is attached with
 *    lu_object_add().
 *
 * env  - execution environment
 * lo   - LOD-layer object being initialised
 * conf - object configuration; not referenced in the visible code
 */
3137 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
3138 const struct lu_object_conf *conf)
3140 struct lod_device *lod = lu2lod_dev(lo->lo_dev);
3141 struct lu_device *cdev = NULL;
3142 struct lu_object *cobj;
3143 struct lod_tgt_descs *ltd = NULL;
3144 struct lod_tgt_desc *tgt;
3146 int type = LU_SEQ_RANGE_ANY;
3150 rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
/* pick the device according to where the FID lives */
3154 if (type == LU_SEQ_RANGE_MDT &&
3155 idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
3156 cdev = &lod->lod_child->dd_lu_dev;
3157 } else if (type == LU_SEQ_RANGE_MDT) {
3158 ltd = &lod->lod_mdt_descs;
3160 } else if (type == LU_SEQ_RANGE_OST) {
3161 ltd = &lod->lod_ost_descs;
3168 if (ltd->ltd_tgts_size > idx &&
3169 cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
3170 tgt = LTD_TGT(ltd, idx);
3172 LASSERT(tgt != NULL);
3173 LASSERT(tgt->ltd_tgt != NULL);
3175 cdev = &(tgt->ltd_tgt->dd_lu_dev);
3177 lod_putref(lod, ltd);
3180 if (unlikely(cdev == NULL))
/* let the selected device create its slice and stack it under lo */
3183 cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
3184 if (unlikely(cobj == NULL))
3187 lu_object_add(lo, cobj);
/*
 * Drop all in-memory striping state of lo.
 *
 * Frees ldo_dir_stripe when set.  When a stripe array exists, every
 * non-NULL stripe gets lu_object_put() and the array itself is freed.
 * ldo_stripe, ldo_stripes_allocated, ldo_stripenr and ldo_pattern are
 * reset.  Nothing on disk is modified by the visible code.
 *
 * env - execution environment
 * lo  - LOD object whose striping is released
 */
3192 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
3196 if (lo->ldo_dir_stripe != NULL) {
3197 OBD_FREE_PTR(lo->ldo_dir_stripe);
3198 lo->ldo_dir_stripe = NULL;
3201 if (lo->ldo_stripe) {
3202 LASSERT(lo->ldo_stripes_allocated > 0);
3204 for (i = 0; i < lo->ldo_stripenr; i++) {
3205 if (lo->ldo_stripe[i])
3206 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
/*
 * i is reused as the byte size of the array; it is derived from the
 * allocated slot count, not from the number of stripes in use.
 */
3209 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
3210 OBD_FREE(lo->ldo_stripe, i);
3211 lo->ldo_stripe = NULL;
3212 lo->ldo_stripes_allocated = 0;
3214 lo->ldo_stripenr = 0;
3215 lo->ldo_pattern = 0;
3219 * ->start is called once all slices are initialized, including header's
3220 * cache for mode (object type). using the type we can initialize ops
/*
 * loo_object_start handler of lod_lu_obj_ops: when the mode cached in the
 * object header says the object is a symlink, install lod_body_lnk_ops as
 * its body operations.
 */
3222 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
3224 if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
3225 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
/*
 * loo_object_free handler of lod_lu_obj_ops: releases the striping state,
 * clears the pool through lod_object_set_pool(mo, NULL) and returns the
 * lod_object to the lod_object_kmem slab cache.
 */
3229 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
3231 struct lod_object *mo = lu2lod_obj(o);
3234 * release all underlying object pinned
3237 lod_object_free_striping(env, mo);
3239 lod_object_set_pool(mo, NULL);
3242 OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
/*
 * loo_object_release handler of lod_lu_obj_ops.  No statements are visible
 * in its body; it only carries the open question below.
 */
3245 static void lod_object_release(const struct lu_env *env, struct lu_object *o)
3247 /* XXX: shouldn't we release everything here in case if object
3248 * creation failed before? */
/*
 * loo_object_print handler of lod_lu_obj_ops: emits
 * LUSTRE_LOD_NAME"-object@<address>" through the printer callback p and
 * returns the callback's result.  The const qualifier of l is cast away
 * only to obtain the enclosing lod_object pointer for printing.
 */
3251 static int lod_object_print(const struct lu_env *env, void *cookie,
3252 lu_printer_t p, const struct lu_object *l)
3254 struct lod_object *o = lu2lod_obj((struct lu_object *) l);
3256 return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
/*
 * lu_object operation table of the LOD layer: binds the init, start, free,
 * release and print hooks to the lod_object_* handlers defined in this
 * file.
 */
3259 struct lu_object_operations lod_lu_obj_ops = {
3260 .loo_object_init = lod_object_init,
3261 .loo_object_start = lod_object_start,
3262 .loo_object_free = lod_object_free,
3263 .loo_object_release = lod_object_release,
3264 .loo_object_print = lod_object_print,