4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright 2009 Sun Microsystems, Inc. All rights reserved
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2013, Intel Corporation.
29 * lustre/lod/lod_object.c
31 * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
34 #define DEBUG_SUBSYSTEM S_MDS
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46 #include <md_object.h>
48 #include "lod_internal.h"
/* Names of the "self" and "parent" directory entries. */
static const char dot[] = ".";
static const char dotdot[] = "..";

/* Defined outside this file; presumably the cache lod objects come from. */
extern struct kmem_cache *lod_object_kmem;
/* Forward declaration of the body operations table. */
static const struct dt_body_operations lod_body_lnk_ops;
56 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
57 struct dt_rec *rec, const struct dt_key *key,
58 struct lustre_capa *capa)
60 struct dt_object *next = dt_object_child(dt);
61 return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
/**
 * Implementation of dt_index_operations::dio_declare_insert
 *
 * Declare the insertion of (\a key, \a rec) on the child object of \a dt.
 *
 * \retval		result of dt_declare_insert() on the child
 */
static int lod_declare_index_insert(const struct lu_env *env,
				    struct dt_object *dt,
				    const struct dt_rec *rec,
				    const struct dt_key *key,
				    struct thandle *handle)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_declare_insert(env, child, rec, key, handle);
}
/**
 * Implementation of dt_index_operations::dio_insert
 *
 * Insert (\a key, \a rec) into the child object of \a dt. All arguments
 * are passed through to dt_insert() unchanged.
 *
 * \retval		result of dt_insert() on the child
 */
static int lod_index_insert(const struct lu_env *env,
			    struct dt_object *dt,
			    const struct dt_rec *rec,
			    const struct dt_key *key,
			    struct thandle *th,
			    struct lustre_capa *capa,
			    int ign)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_insert(env, child, rec, key, th, capa, ign);
}
/**
 * Implementation of dt_index_operations::dio_declare_delete
 *
 * Declare the removal of \a key from the child object of \a dt.
 *
 * \retval		result of dt_declare_delete() on the child
 */
static int lod_declare_index_delete(const struct lu_env *env,
				    struct dt_object *dt,
				    const struct dt_key *key,
				    struct thandle *th)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_declare_delete(env, child, key, th);
}
/**
 * Implementation of dt_index_operations::dio_delete
 *
 * Remove \a key from the child object of \a dt.
 *
 * \retval		result of dt_delete() on the child
 */
static int lod_index_delete(const struct lu_env *env,
			    struct dt_object *dt,
			    const struct dt_key *key,
			    struct thandle *th,
			    struct lustre_capa *capa)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_delete(env, child, key, th, capa);
}
101 static struct dt_it *lod_it_init(const struct lu_env *env,
102 struct dt_object *dt, __u32 attr,
103 struct lustre_capa *capa)
105 struct dt_object *next = dt_object_child(dt);
106 struct lod_it *it = &lod_env_info(env)->lti_it;
107 struct dt_it *it_next;
110 it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
114 /* currently we do not use more than one iterator per thread
115 * so we store it in thread info. if at some point we need
116 * more active iterators in a single thread, we can allocate
118 LASSERT(it->lit_obj == NULL);
120 it->lit_it = it_next;
123 return (struct dt_it *)it;
/* Assert that a lod iterator is bound to an object and to a lower iterator.
 * \a env is accepted for symmetry with the callers but is not used. */
#define LOD_CHECK_IT(env, it)					\
do {								\
	LASSERT((it)->lit_obj != NULL);				\
	LASSERT((it)->lit_it != NULL);				\
} while (0)
132 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
134 struct lod_it *it = (struct lod_it *)di;
136 LOD_CHECK_IT(env, it);
137 it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
139 /* the iterator not in use any more */
144 int lod_it_get(const struct lu_env *env, struct dt_it *di,
145 const struct dt_key *key)
147 const struct lod_it *it = (const struct lod_it *)di;
149 LOD_CHECK_IT(env, it);
150 return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
153 void lod_it_put(const struct lu_env *env, struct dt_it *di)
155 struct lod_it *it = (struct lod_it *)di;
157 LOD_CHECK_IT(env, it);
158 return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
161 int lod_it_next(const struct lu_env *env, struct dt_it *di)
163 struct lod_it *it = (struct lod_it *)di;
165 LOD_CHECK_IT(env, it);
166 return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
169 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
171 const struct lod_it *it = (const struct lod_it *)di;
173 LOD_CHECK_IT(env, it);
174 return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
177 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
179 struct lod_it *it = (struct lod_it *)di;
181 LOD_CHECK_IT(env, it);
182 return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
185 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
186 struct dt_rec *rec, __u32 attr)
188 const struct lod_it *it = (const struct lod_it *)di;
190 LOD_CHECK_IT(env, it);
191 return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec,
195 int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di,
198 const struct lod_it *it = (const struct lod_it *)di;
200 LOD_CHECK_IT(env, it);
201 return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it,
205 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
207 const struct lod_it *it = (const struct lod_it *)di;
209 LOD_CHECK_IT(env, it);
210 return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
213 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
215 const struct lod_it *it = (const struct lod_it *)di;
217 LOD_CHECK_IT(env, it);
218 return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
221 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
224 const struct lod_it *it = (const struct lod_it *)di;
226 LOD_CHECK_IT(env, it);
227 return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it,
231 static struct dt_index_operations lod_index_ops = {
232 .dio_lookup = lod_index_lookup,
233 .dio_declare_insert = lod_declare_index_insert,
234 .dio_insert = lod_index_insert,
235 .dio_declare_delete = lod_declare_index_delete,
236 .dio_delete = lod_index_delete,
244 .key_size = lod_it_key_size,
246 .rec_size = lod_it_rec_size,
247 .store = lod_it_store,
249 .key_rec = lod_it_key_rec,
254 * Implementation of dt_index_operations:: dio_it.init
256 * This function is to initialize the iterator for striped directory,
257 * basically these lod_striped_it_xxx will just locate the stripe
258 * and call the correspondent api of its next lower layer.
260 * \param[in] env execution environment.
261 * \param[in] dt the striped directory object to be iterated.
262 * \param[in] attr the attribute of iterator, mostly used to indicate
263 * the entry attribute in the object to be iterated.
264 * \param[in] capa capability(useless in current implementation)
266 * \retval initialized iterator(dt_it) if successful initialize the
267 * iteration. lit_stripe_index will be used to indicate the
268 * current iterate position among stripes.
269 * \retval ERR pointer if initialization is failed.
271 static struct dt_it *lod_striped_it_init(const struct lu_env *env,
272 struct dt_object *dt, __u32 attr,
273 struct lustre_capa *capa)
275 struct lod_object *lo = lod_dt_obj(dt);
276 struct dt_object *next;
277 struct lod_it *it = &lod_env_info(env)->lti_it;
278 struct dt_it *it_next;
281 LASSERT(lo->ldo_stripenr > 0);
282 next = lo->ldo_stripe[0];
283 LASSERT(next != NULL);
284 LASSERT(next->do_index_ops != NULL);
286 it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
290 /* currently we do not use more than one iterator per thread
291 * so we store it in thread info. if at some point we need
292 * more active iterators in a single thread, we can allocate
294 LASSERT(it->lit_obj == NULL);
296 it->lit_stripe_index = 0;
298 it->lit_it = it_next;
301 return (struct dt_it *)it;
/* Assert that a striped-directory iterator is bound to an object and a
 * lower iterator, and that its stripe index is inside the stripe array
 * of \a lo. \a env is not used. */
#define LOD_CHECK_STRIPED_IT(env, it, lo)			\
do {								\
	LASSERT((it)->lit_obj != NULL);				\
	LASSERT((it)->lit_it != NULL);				\
	LASSERT((lo)->ldo_stripenr > 0);			\
	LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr);	\
} while (0)
313 * Implementation of dt_index_operations:: dio_it.fini
315 * This function is to finish the iterator for striped directory.
317 * \param[in] env execution environment.
318 * \param[in] di the iterator for the striped directory
321 static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
323 struct lod_it *it = (struct lod_it *)di;
324 struct lod_object *lo = lod_dt_obj(it->lit_obj);
325 struct dt_object *next;
327 LOD_CHECK_STRIPED_IT(env, it, lo);
329 next = lo->ldo_stripe[it->lit_stripe_index];
330 LASSERT(next != NULL);
331 LASSERT(next->do_index_ops != NULL);
333 next->do_index_ops->dio_it.fini(env, it->lit_it);
335 /* the iterator not in use any more */
338 it->lit_stripe_index = 0;
342 * Implementation of dt_index_operations:: dio_it.get
344 * This function is to position the iterator with given key
346 * \param[in] env execution environment.
347 * \param[in] di the iterator for striped directory.
348 * \param[in] key the key the iterator will be positioned.
350 * \retval 0 if successfully position iterator by the key.
351 * \retval negative error if position is failed.
353 static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
354 const struct dt_key *key)
356 const struct lod_it *it = (const struct lod_it *)di;
357 struct lod_object *lo = lod_dt_obj(it->lit_obj);
358 struct dt_object *next;
361 LOD_CHECK_STRIPED_IT(env, it, lo);
363 next = lo->ldo_stripe[it->lit_stripe_index];
364 LASSERT(next != NULL);
365 LASSERT(next->do_index_ops != NULL);
367 return next->do_index_ops->dio_it.get(env, it->lit_it, key);
371 * Implementation of dt_index_operations:: dio_it.put
373 * This function is supposed to be the pair of it_get, but currently do
374 * nothing. see (osd_it_ea_put or osd_index_it_put)
376 static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
378 struct lod_it *it = (struct lod_it *)di;
379 struct lod_object *lo = lod_dt_obj(it->lit_obj);
380 struct dt_object *next;
382 LOD_CHECK_STRIPED_IT(env, it, lo);
384 next = lo->ldo_stripe[it->lit_stripe_index];
385 LASSERT(next != NULL);
386 LASSERT(next->do_index_ops != NULL);
388 return next->do_index_ops->dio_it.put(env, it->lit_it);
392 * Implementation of dt_index_operations:: dio_it.next
394 * This function is to position the iterator to the next entry, if current
395 * stripe is finished by checking the return value of next() in current
396 * stripe. it will go to next stripe. In the mean time, the sub-iterator
397 * for next stripe needs to be initialized.
399 * \param[in] env execution environment.
400 * \param[in] di the iterator for striped directory.
402 * \retval 0 if successfully position iterator to the next entry.
403 * \retval negative error if position is failed.
405 static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
407 struct lod_it *it = (struct lod_it *)di;
408 struct lod_object *lo = lod_dt_obj(it->lit_obj);
409 struct dt_object *next;
410 struct dt_it *it_next;
414 LOD_CHECK_STRIPED_IT(env, it, lo);
416 next = lo->ldo_stripe[it->lit_stripe_index];
417 LASSERT(next != NULL);
418 LASSERT(next->do_index_ops != NULL);
420 rc = next->do_index_ops->dio_it.next(env, it->lit_it);
424 if (rc == 0 && it->lit_stripe_index == 0)
427 if (rc == 0 && it->lit_stripe_index > 0) {
428 struct lu_dirent *ent;
430 ent = (struct lu_dirent *)lod_env_info(env)->lti_key;
432 rc = next->do_index_ops->dio_it.rec(env, it->lit_it,
433 (struct dt_rec *)ent,
438 /* skip . and .. for slave stripe */
439 if ((strncmp(ent->lde_name, ".",
440 le16_to_cpu(ent->lde_namelen)) == 0 &&
441 le16_to_cpu(ent->lde_namelen) == 1) ||
442 (strncmp(ent->lde_name, "..",
443 le16_to_cpu(ent->lde_namelen)) == 0 &&
444 le16_to_cpu(ent->lde_namelen) == 2))
450 /* go to next stripe */
451 if (it->lit_stripe_index + 1 >= lo->ldo_stripenr)
454 it->lit_stripe_index++;
456 next->do_index_ops->dio_it.put(env, it->lit_it);
457 next->do_index_ops->dio_it.fini(env, it->lit_it);
459 rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
463 next = lo->ldo_stripe[it->lit_stripe_index];
464 LASSERT(next != NULL);
465 LASSERT(next->do_index_ops != NULL);
467 it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr,
469 if (!IS_ERR(it_next)) {
470 it->lit_it = it_next;
473 rc = PTR_ERR(it_next);
480 * Implementation of dt_index_operations:: dio_it.key
482 * This function is to get the key of the iterator at current position.
484 * \param[in] env execution environment.
485 * \param[in] di the iterator for striped directory.
487 * \retval key(dt_key) if successfully get the key.
488 * \retval negative error if can not get the key.
490 static struct dt_key *lod_striped_it_key(const struct lu_env *env,
491 const struct dt_it *di)
493 const struct lod_it *it = (const struct lod_it *)di;
494 struct lod_object *lo = lod_dt_obj(it->lit_obj);
495 struct dt_object *next;
497 LOD_CHECK_STRIPED_IT(env, it, lo);
499 next = lo->ldo_stripe[it->lit_stripe_index];
500 LASSERT(next != NULL);
501 LASSERT(next->do_index_ops != NULL);
503 return next->do_index_ops->dio_it.key(env, it->lit_it);
507 * Implementation of dt_index_operations:: dio_it.key_size
509 * This function is to get the key_size of current key.
511 * \param[in] env execution environment.
512 * \param[in] di the iterator for striped directory.
514 * \retval key_size if successfully get the key_size.
515 * \retval negative error if can not get the key_size.
517 static int lod_striped_it_key_size(const struct lu_env *env,
518 const struct dt_it *di)
520 struct lod_it *it = (struct lod_it *)di;
521 struct lod_object *lo = lod_dt_obj(it->lit_obj);
522 struct dt_object *next;
524 LOD_CHECK_STRIPED_IT(env, it, lo);
526 next = lo->ldo_stripe[it->lit_stripe_index];
527 LASSERT(next != NULL);
528 LASSERT(next->do_index_ops != NULL);
530 return next->do_index_ops->dio_it.key_size(env, it->lit_it);
534 * Implementation of dt_index_operations:: dio_it.rec
536 * This function is to get the record at current position.
538 * \param[in] env execution environment.
539 * \param[in] di the iterator for striped directory.
540 * \param[in] attr the attribute of iterator, mostly used to indicate
541 * the entry attribute in the object to be iterated.
542 * \param[out] rec hold the return record.
544 * \retval 0 if successfully get the entry.
545 * \retval negative error if can not get entry.
547 static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
548 struct dt_rec *rec, __u32 attr)
550 const struct lod_it *it = (const struct lod_it *)di;
551 struct lod_object *lo = lod_dt_obj(it->lit_obj);
552 struct dt_object *next;
554 LOD_CHECK_STRIPED_IT(env, it, lo);
556 next = lo->ldo_stripe[it->lit_stripe_index];
557 LASSERT(next != NULL);
558 LASSERT(next->do_index_ops != NULL);
560 return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
564 * Implementation of dt_index_operations:: dio_it.rec_size
566 * This function is to get the record_size at current record.
568 * \param[in] env execution environment.
569 * \param[in] di the iterator for striped directory.
570 * \param[in] attr the attribute of iterator, mostly used to indicate
571 * the entry attribute in the object to be iterated.
573 * \retval rec_size if successfully get the entry size.
574 * \retval negative error if can not get entry size.
576 static int lod_striped_it_rec_size(const struct lu_env *env,
577 const struct dt_it *di, __u32 attr)
579 struct lod_it *it = (struct lod_it *)di;
580 struct lod_object *lo = lod_dt_obj(it->lit_obj);
581 struct dt_object *next;
583 LOD_CHECK_STRIPED_IT(env, it, lo);
585 next = lo->ldo_stripe[it->lit_stripe_index];
586 LASSERT(next != NULL);
587 LASSERT(next->do_index_ops != NULL);
589 return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
593 * Implementation of dt_index_operations:: dio_it.store
595 * This function will a cookie for current position of the iterator head,
596 * so that user can use this cookie to load/start the iterator next time.
598 * \param[in] env execution environment.
599 * \param[in] di the iterator for striped directory.
601 * \retval the cookie.
603 static __u64 lod_striped_it_store(const struct lu_env *env,
604 const struct dt_it *di)
606 const struct lod_it *it = (const struct lod_it *)di;
607 struct lod_object *lo = lod_dt_obj(it->lit_obj);
608 struct dt_object *next;
610 LOD_CHECK_STRIPED_IT(env, it, lo);
612 next = lo->ldo_stripe[it->lit_stripe_index];
613 LASSERT(next != NULL);
614 LASSERT(next->do_index_ops != NULL);
616 return next->do_index_ops->dio_it.store(env, it->lit_it);
620 * Implementation of dt_index_operations:: dio_it.load
622 * This function will position the iterator with the given hash(usually
625 * \param[in] env execution environment.
626 * \param[in] di the iterator for striped directory.
627 * \param[in] hash the given hash.
629 * \retval >0 if successfuly load the iterator to the given position.
630 * \retval <0 if load is failed.
632 static int lod_striped_it_load(const struct lu_env *env,
633 const struct dt_it *di, __u64 hash)
635 const struct lod_it *it = (const struct lod_it *)di;
636 struct lod_object *lo = lod_dt_obj(it->lit_obj);
637 struct dt_object *next;
639 LOD_CHECK_STRIPED_IT(env, it, lo);
641 next = lo->ldo_stripe[it->lit_stripe_index];
642 LASSERT(next != NULL);
643 LASSERT(next->do_index_ops != NULL);
645 return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
648 static struct dt_index_operations lod_striped_index_ops = {
649 .dio_lookup = lod_index_lookup,
650 .dio_declare_insert = lod_declare_index_insert,
651 .dio_insert = lod_index_insert,
652 .dio_declare_delete = lod_declare_index_delete,
653 .dio_delete = lod_index_delete,
655 .init = lod_striped_it_init,
656 .fini = lod_striped_it_fini,
657 .get = lod_striped_it_get,
658 .put = lod_striped_it_put,
659 .next = lod_striped_it_next,
660 .key = lod_striped_it_key,
661 .key_size = lod_striped_it_key_size,
662 .rec = lod_striped_it_rec,
663 .rec_size = lod_striped_it_rec_size,
664 .store = lod_striped_it_store,
665 .load = lod_striped_it_load,
670 * Implementation of dt_object_operations:: do_index_try
672 * This function will try to initialize the index api pointer for the
673 * given object, usually it the entry point of the index api. i.e.
674 * the index object should be initialized in index_try, then start
675 * using index api. For striped directory, it will try to initialize
676 * all of its sub_stripes.
678 * \param[in] env execution environment.
679 * \param[in] dt the index object to be initialized.
680 * \param[in] feat the features of this object, for example fixed or
681 * variable key size etc.
683 * \retval >0 if the initialization is successful.
684 * \retval <0 if the initialization is failed.
686 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
687 const struct dt_index_features *feat)
689 struct lod_object *lo = lod_dt_obj(dt);
690 struct dt_object *next = dt_object_child(dt);
694 LASSERT(next->do_ops);
695 LASSERT(next->do_ops->do_index_try);
697 rc = lod_load_striping_locked(env, lo);
701 rc = next->do_ops->do_index_try(env, next, feat);
705 if (lo->ldo_stripenr > 0) {
708 for (i = 0; i < lo->ldo_stripenr; i++) {
709 if (dt_object_exists(lo->ldo_stripe[i]) == 0)
711 rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
712 lo->ldo_stripe[i], feat);
716 dt->do_index_ops = &lod_striped_index_ops;
718 dt->do_index_ops = &lod_index_ops;
/* Take the read lock of the child object of \a dt with the given \a role. */
static void lod_object_read_lock(const struct lu_env *env,
				 struct dt_object *dt, unsigned role)
{
	struct dt_object *child = dt_object_child(dt);

	dt_read_lock(env, child, role);
}
/* Take the write lock of the child object of \a dt with the given \a role. */
static void lod_object_write_lock(const struct lu_env *env,
				  struct dt_object *dt, unsigned role)
{
	struct dt_object *child = dt_object_child(dt);

	dt_write_lock(env, child, role);
}
/* Drop the read lock of the child object of \a dt. */
static void lod_object_read_unlock(const struct lu_env *env,
				   struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	dt_read_unlock(env, child);
}
/* Drop the write lock of the child object of \a dt. */
static void lod_object_write_unlock(const struct lu_env *env,
				    struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	dt_write_unlock(env, child);
}
/* Return dt_write_locked() for the child object of \a dt. */
static int lod_object_write_locked(const struct lu_env *env,
				   struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_write_locked(env, child);
}
754 static int lod_attr_get(const struct lu_env *env,
755 struct dt_object *dt,
756 struct lu_attr *attr,
757 struct lustre_capa *capa)
759 struct lod_object *lo = lod_dt_obj(dt);
764 rc = dt_attr_get(env, dt_object_child(dt), attr, capa);
765 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr) || rc != 0)
768 rc = lod_load_striping_locked(env, lo);
772 if (lo->ldo_stripenr == 0)
777 for (i = 0; i < lo->ldo_stripenr; i++) {
778 struct lu_attr *sub_attr = &lod_env_info(env)->lti_attr;
780 LASSERT(lo->ldo_stripe[i]);
781 if (dt_object_exists(lo->ldo_stripe[i]))
784 rc = dt_attr_get(env, lo->ldo_stripe[i], sub_attr, capa);
788 /* -2 for . and .. on each stripe */
789 if (sub_attr->la_valid & LA_NLINK && attr->la_valid & LA_NLINK)
790 attr->la_nlink += sub_attr->la_nlink - 2;
791 if (sub_attr->la_valid & LA_SIZE && attr->la_valid & LA_SIZE)
792 attr->la_size += sub_attr->la_size;
794 if (sub_attr->la_valid & LA_ATIME &&
795 attr->la_valid & LA_ATIME &&
796 attr->la_atime < sub_attr->la_atime)
797 attr->la_atime = sub_attr->la_atime;
799 if (sub_attr->la_valid & LA_CTIME &&
800 attr->la_valid & LA_CTIME &&
801 attr->la_ctime < sub_attr->la_ctime)
802 attr->la_ctime = sub_attr->la_ctime;
804 if (sub_attr->la_valid & LA_MTIME &&
805 attr->la_valid & LA_MTIME &&
806 attr->la_mtime < sub_attr->la_mtime)
807 attr->la_mtime = sub_attr->la_mtime;
810 CDEBUG(D_INFO, DFID" stripe_count %d nlink %u size "LPU64"\n",
811 PFID(lu_object_fid(&dt->do_lu)), lo->ldo_stripenr,
812 attr->la_nlink, attr->la_size);
818 * Mark all of sub-stripes dead of the striped directory.
820 static int lod_mark_dead_object(const struct lu_env *env,
821 struct dt_object *dt,
822 struct thandle *handle,
825 struct lod_object *lo = lod_dt_obj(dt);
826 struct lmv_mds_md_v1 *lmv;
827 __u32 dead_hash_type;
833 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
836 rc = lod_load_striping_locked(env, lo);
840 if (lo->ldo_stripenr == 0)
843 rc = lod_get_lmv_ea(env, lo);
847 lmv = lod_env_info(env)->lti_ea_store;
848 lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
849 dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD;
850 lmv->lmv_hash_type = cpu_to_le32(dead_hash_type);
851 for (i = 0; i < lo->ldo_stripenr; i++) {
854 lmv->lmv_master_mdt_index = i;
856 buf.lb_len = sizeof(*lmv);
858 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf,
860 LU_XATTR_REPLACE, handle);
862 rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf,
863 XATTR_NAME_LMV, LU_XATTR_REPLACE,
864 handle, BYPASS_CAPA);
873 static int lod_declare_attr_set(const struct lu_env *env,
874 struct dt_object *dt,
875 const struct lu_attr *attr,
876 struct thandle *handle)
878 struct dt_object *next = dt_object_child(dt);
879 struct lod_object *lo = lod_dt_obj(dt);
883 /* Set dead object on all other stripes */
884 if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
885 attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
886 rc = lod_mark_dead_object(env, dt, handle, true);
891 * declare setattr on the local object
893 rc = dt_declare_attr_set(env, next, attr, handle);
897 /* osp_declare_attr_set() ignores all attributes other than
898 * UID, GID, and size, and osp_attr_set() ignores all but UID
899 * and GID. Declaration of size attr setting happens through
900 * lod_declare_init_size(), and not through this function.
901 * Therefore we need not load striping unless ownership is
902 * changing. This should save memory and (we hope) speed up
904 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
905 if (!(attr->la_valid & (LA_UID | LA_GID)))
908 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
911 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
912 LA_ATIME | LA_MTIME | LA_CTIME)))
916 * load striping information, notice we don't do this when object
917 * is being initialized as we don't need this information till
918 * few specific cases like destroy, chown
920 rc = lod_load_striping(env, lo);
924 if (lo->ldo_stripenr == 0)
928 * if object is striped declare changes on the stripes
930 LASSERT(lo->ldo_stripe);
931 for (i = 0; i < lo->ldo_stripenr; i++) {
932 LASSERT(lo->ldo_stripe[i]);
934 rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle);
936 CERROR("failed declaration: %d\n", rc);
941 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
942 dt_object_exists(next) != 0 &&
943 dt_object_remote(next) == 0)
944 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
946 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
947 dt_object_exists(next) &&
948 dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
949 struct lod_thread_info *info = lod_env_info(env);
950 struct lu_buf *buf = &info->lti_buf;
952 buf->lb_buf = info->lti_ea_store;
953 buf->lb_len = info->lti_ea_store_size;
954 dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
955 LU_XATTR_REPLACE, handle);
961 static int lod_attr_set(const struct lu_env *env,
962 struct dt_object *dt,
963 const struct lu_attr *attr,
964 struct thandle *handle,
965 struct lustre_capa *capa)
967 struct dt_object *next = dt_object_child(dt);
968 struct lod_object *lo = lod_dt_obj(dt);
972 /* Set dead object on all other stripes */
973 if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
974 attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
975 rc = lod_mark_dead_object(env, dt, handle, false);
980 * apply changes to the local object
982 rc = dt_attr_set(env, next, attr, handle, capa);
986 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
987 if (!(attr->la_valid & (LA_UID | LA_GID)))
990 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
993 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
994 LA_ATIME | LA_MTIME | LA_CTIME)))
998 if (lo->ldo_stripenr == 0)
1002 * if object is striped, apply changes to all the stripes
1004 LASSERT(lo->ldo_stripe);
1005 for (i = 0; i < lo->ldo_stripenr; i++) {
1006 LASSERT(lo->ldo_stripe[i]);
1007 if (dt_object_exists(lo->ldo_stripe[i]) == 0)
1009 rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
1011 CERROR("failed declaration: %d\n", rc);
1016 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1017 dt_object_exists(next) != 0 &&
1018 dt_object_remote(next) == 0)
1019 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
1021 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1022 dt_object_exists(next) &&
1023 dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1024 struct lod_thread_info *info = lod_env_info(env);
1025 struct lu_buf *buf = &info->lti_buf;
1026 struct ost_id *oi = &info->lti_ostid;
1027 struct lu_fid *fid = &info->lti_fid;
1028 struct lov_mds_md_v1 *lmm;
1029 struct lov_ost_data_v1 *objs;
1033 rc1 = lod_get_lov_ea(env, lo);
1037 buf->lb_buf = info->lti_ea_store;
1038 buf->lb_len = info->lti_ea_store_size;
1039 lmm = info->lti_ea_store;
1040 magic = le32_to_cpu(lmm->lmm_magic);
1041 if (magic == LOV_MAGIC_V1)
1042 objs = &(lmm->lmm_objects[0]);
1044 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1045 ostid_le_to_cpu(&objs->l_ost_oi, oi);
1046 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
1048 fid_to_ostid(fid, oi);
1049 ostid_cpu_to_le(oi, &objs->l_ost_oi);
1050 dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
1051 LU_XATTR_REPLACE, handle, BYPASS_CAPA);
1057 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
1058 struct lu_buf *buf, const char *name,
1059 struct lustre_capa *capa)
1061 struct lod_thread_info *info = lod_env_info(env);
1062 struct lod_device *dev = lu2lod_dev(dt->do_lu.lo_dev);
1066 rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
1067 if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
1071 * lod returns default striping on the real root of the device
1072 * this is like the root stores default striping for the whole
1073 * filesystem. historically we've been using a different approach
1074 * and store it in the config.
1076 dt_root_get(env, dev->lod_child, &info->lti_fid);
1077 is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
1079 if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
1080 struct lov_user_md *lum = buf->lb_buf;
1081 struct lov_desc *desc = &dev->lod_desc;
1083 if (buf->lb_buf == NULL) {
1085 } else if (buf->lb_len >= sizeof(*lum)) {
1086 lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
1087 lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
1088 lmm_oi_set_id(&lum->lmm_oi, 0);
1089 lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
1090 lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
1091 lum->lmm_stripe_size = cpu_to_le32(
1092 desc->ld_default_stripe_size);
1093 lum->lmm_stripe_count = cpu_to_le16(
1094 desc->ld_default_stripe_count);
1095 lum->lmm_stripe_offset = cpu_to_le16(
1096 desc->ld_default_stripe_offset);
1106 static int lod_verify_md_striping(struct lod_device *lod,
1107 const struct lmv_user_md_v1 *lum)
1112 if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
1113 GOTO(out, rc = -EINVAL);
1115 if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
1116 GOTO(out, rc = -EINVAL);
1119 CERROR("%s: invalid lmv_user_md: magic = %x, "
1120 "stripe_offset = %d, stripe_count = %u: rc = %d\n",
1121 lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
1122 (int)le32_to_cpu(lum->lum_stripe_offset),
1123 le32_to_cpu(lum->lum_stripe_count), rc);
1128 * Master LMVEA will be same as slave LMVEA, except
1129 * 1. different magic
1130 * 2. No lmv_stripe_fids on slave
1131 * 3. lmv_master_mdt_index on slave LMV EA will be stripe_index.
1133 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
1134 const struct lmv_mds_md_v1 *master_lmv)
1136 *slave_lmv = *master_lmv;
1137 slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
1140 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
1141 struct lu_buf *lmv_buf)
1143 struct lod_thread_info *info = lod_env_info(env);
1144 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1145 struct lod_object *lo = lod_dt_obj(dt);
1146 struct lmv_mds_md_v1 *lmm1;
1149 int type = LU_SEQ_RANGE_ANY;
1155 LASSERT(lo->ldo_dir_striped != 0);
1156 LASSERT(lo->ldo_stripenr > 0);
1157 stripe_count = lo->ldo_stripenr;
1158 lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
1159 if (info->lti_ea_store_size < lmm_size) {
1160 rc = lod_ea_store_resize(info, lmm_size);
1165 lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
1166 lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
1167 lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
1168 lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
1169 rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
1174 lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
1175 fid_cpu_to_le(&lmm1->lmv_master_fid, lu_object_fid(&dt->do_lu));
1176 for (i = 0; i < lo->ldo_stripenr; i++) {
1177 struct dt_object *dto;
1179 dto = lo->ldo_stripe[i];
1180 LASSERT(dto != NULL);
1181 fid_cpu_to_le(&lmm1->lmv_stripe_fids[i],
1182 lu_object_fid(&dto->do_lu));
1185 lmv_buf->lb_buf = info->lti_ea_store;
1186 lmv_buf->lb_len = lmm_size;
1187 lo->ldo_dir_striping_cached = 1;
/*
 * Instantiate the stripes of a striped directory from its LMV EA.
 *
 * @buf holds the little-endian LMV EA. An EA with magic LMV_MAGIC_STRIPE
 * marks @lo as a slave stripe. For a V1 EA every stripe FID is converted
 * to CPU order, sanity-checked, mapped to an MDT index with
 * lod_fld_lookup() and located either on the local child device or on
 * the matching remote target; the resulting array is then stored in @lo
 * together with the stripe count.
 *
 * NOTE(review): this listing omits some lines of the function (braces,
 * rc checks, returns), so not every early-exit condition is visible.
 */
1192 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
1193 const struct lu_buf *buf)
1195 struct lod_thread_info *info = lod_env_info(env);
1196 struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1197 struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
1198 struct dt_object **stripe;
1199 union lmv_mds_md *lmm = buf->lb_buf;
1200 struct lmv_mds_md_v1 *lmv1 = &lmm->lmv_md_v1;
1201 struct lu_fid *fid = &info->lti_fid;
/* the migration flag is tested first; the action taken is not visible here */
1206 if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
1209 if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
1210 lo->ldo_dir_slave_stripe = 1;
1214 if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
1217 if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1)
1220 LASSERT(lo->ldo_stripe == NULL);
/* one dt_object pointer per stripe recorded in the EA */
1221 OBD_ALLOC(stripe, sizeof(stripe[0]) *
1222 (le32_to_cpu(lmv1->lmv_stripe_count)));
1226 for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
1227 struct dt_device *tgt_dt;
1228 struct dt_object *dto;
1229 int type = LU_SEQ_RANGE_ANY;
1232 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
/* a stripe FID that is not sane makes the whole EA stale */
1233 if (!fid_is_sane(fid))
1234 GOTO(out, rc = -ESTALE);
1236 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
/* an index equal to this node's id selects the local child device */
1240 if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
1241 tgt_dt = lod->lod_child;
1243 struct lod_tgt_desc *tgt;
1245 tgt = LTD_TGT(ltd, idx);
1247 GOTO(out, rc = -ESTALE);
1248 tgt_dt = tgt->ltd_tgt;
1251 dto = dt_locate_at(env, tgt_dt, fid,
1252 lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1255 GOTO(out, rc = PTR_ERR(dto));
/* publish the parsed stripes on the object */
1260 lo->ldo_stripe = stripe;
1261 lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count);
1262 lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
/* NOTE(review): presumably reached only on error, to drop partial striping */
1264 lod_object_free_striping(env, lo);
/*
 * Prepare the stripes of a new striped directory and declare every
 * update needed to create them.
 *
 * The stripe count requested in @lum is capped at
 * lod_remote_mdt_count + 1. A FID is allocated for each stripe and the
 * stripe object is located with LOC_F_NEW. For stripes other than the
 * master one, MDT indices are scanned starting after the previously
 * chosen index, skipping indices that are not set in the target bitmap
 * or that were already used. The stripes are then attached to the
 * lod_object and, per stripe, the create, the "." and ".." inserts, the
 * inherited default LOV EA (if cached), the slave LMV EA and the name
 * entry in the master directory are declared, followed by the LMV EA of
 * the master object.
 *
 * NOTE(review): this listing omits some lines of the function (braces,
 * rc checks, labels, the tail of the parameter list), so the exact
 * error-path structure is not visible.
 */
1269 static int lod_prep_md_striped_create(const struct lu_env *env,
1270 struct dt_object *dt,
1271 struct lu_attr *attr,
1272 const struct lmv_user_md_v1 *lum,
1273 struct dt_object_format *dof,
1276 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1277 struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
1278 struct lod_object *lo = lod_dt_obj(dt);
1279 struct lod_thread_info *info = lod_env_info(env);
1280 struct dt_object **stripe;
1281 struct lu_buf lmv_buf;
1282 struct lu_buf slave_lmv_buf;
1283 struct lmv_mds_md_v1 *lmm;
1284 struct lmv_mds_md_v1 *slave_lmm = NULL;
1292 /* The lum has been verified in lod_verify_md_striping */
1293 LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
1294 LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
1296 stripe_count = le32_to_cpu(lum->lum_stripe_count);
1298 /* shrink the stripe_count to the available MDT count */
1299 if (stripe_count > lod->lod_remote_mdt_count + 1)
1300 stripe_count = lod->lod_remote_mdt_count + 1;
1302 OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
/* idx_array remembers the MDT index chosen for each stripe */
1306 OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
1307 if (idx_array == NULL)
1308 GOTO(out_free, rc = -ENOMEM);
1310 for (i = 0; i < stripe_count; i++) {
1311 struct lod_tgt_desc *tgt = NULL;
1312 struct dt_object *dto;
1313 struct lu_fid fid = { 0 };
1315 struct lu_object_conf conf = { 0 };
1316 struct dt_device *tgt_dt = NULL;
1319 /* Right now, master stripe and master object are
1320 * on the same MDT */
1321 idx = le32_to_cpu(lum->lum_stripe_offset);
1322 rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
1326 tgt_dt = lod->lod_child;
/* start the search right after the index used by the previous stripe */
1330 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
1332 for (j = 0; j < lod->lod_remote_mdt_count;
1333 j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
1334 bool already_allocated = false;
1337 CDEBUG(D_INFO, "try idx %d, mdt cnt %d,"
1338 " allocated %d, last allocated %d\n", idx,
1339 lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1341 /* Find next available target */
1342 if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
1345 /* check whether the idx already exists
1346 * in current allocated array */
1347 for (k = 0; k < i; k++) {
1348 if (idx_array[k] == idx) {
1349 already_allocated = true;
1354 if (already_allocated)
1357 /* check the status of the OSP */
1358 tgt = LTD_TGT(ltd, idx);
1362 tgt_dt = tgt->ltd_tgt;
1363 rc = dt_statfs(env, tgt_dt, NULL);
1365 /* this OSP doesn't feel well */
1370 rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
1379 /* Can not allocate more stripes */
/* NOTE(review): stripes 0..i-1 exist at this point, so "i - 1" in the
 * message below may under-report the count by one - confirm */
1380 if (j == lod->lod_remote_mdt_count) {
1381 CDEBUG(D_INFO, "%s: require stripes %d only get %d\n",
1382 lod2obd(lod)->obd_name, stripe_count, i - 1);
1386 CDEBUG(D_INFO, "idx %d, mdt cnt %d,"
1387 " allocated %d, last allocated %d\n", idx,
1388 lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1391 /* tgt_dt and fid must be ready after search available OSP
1392 * in the above loop */
1393 LASSERT(tgt_dt != NULL);
1394 LASSERT(fid_is_sane(&fid));
1395 conf.loc_flags = LOC_F_NEW;
1396 dto = dt_locate_at(env, tgt_dt, &fid,
1397 dt->do_lu.lo_dev->ld_site->ls_top_dev,
1400 GOTO(out_put, rc = PTR_ERR(dto));
/* attach the stripes; i is the number of stripes actually obtained */
1405 lo->ldo_dir_striped = 1;
1406 lo->ldo_stripe = stripe;
1407 lo->ldo_stripenr = i;
1408 lo->ldo_stripes_allocated = stripe_count;
1410 if (lo->ldo_stripenr == 0)
1411 GOTO(out_put, rc = -ENOSPC);
1413 rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1416 lmm = lmv_buf.lb_buf;
/* the slave EA is derived from the master EA and reused for every stripe */
1418 OBD_ALLOC_PTR(slave_lmm);
1419 if (slave_lmm == NULL)
1420 GOTO(out_put, rc = -ENOMEM);
1422 lod_prep_slave_lmv_md(slave_lmm, lmm);
1423 slave_lmv_buf.lb_buf = slave_lmm;
1424 slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1426 if (!dt_try_as_dir(env, dt_object_child(dt)))
1427 GOTO(out_put, rc = -EINVAL);
1429 for (i = 0; i < lo->ldo_stripenr; i++) {
1430 struct dt_object *dto = stripe[i];
1431 char *stripe_name = info->lti_key;
1433 rc = dt_declare_create(env, dto, attr, NULL, dof, th);
1437 if (!dt_try_as_dir(env, dto))
1438 GOTO(out_put, rc = -EINVAL);
1440 rc = dt_declare_insert(env, dto,
1441 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1442 (const struct dt_key *)dot, th);
1446 /* master stripe FID will be put to .. */
1447 rc = dt_declare_insert(env, dto,
1448 (const struct dt_rec *)lu_object_fid(&dt->do_lu),
1449 (const struct dt_key *)dotdot, th);
1453 /* probably nothing to inherit */
1454 if (lo->ldo_striping_cached &&
1455 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1456 lo->ldo_def_stripenr,
1457 lo->ldo_def_stripe_offset)) {
1458 struct lov_user_md_v3 *v3;
1460 /* sigh, lti_ea_store has been used for lmv_buf,
1461 * so we have to allocate buffer for default
1465 GOTO(out_put, rc = -ENOMEM);
1467 memset(v3, 0, sizeof(*v3));
1468 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1469 v3->lmm_stripe_count =
1470 cpu_to_le16(lo->ldo_def_stripenr);
1471 v3->lmm_stripe_offset =
1472 cpu_to_le16(lo->ldo_def_stripe_offset);
1473 v3->lmm_stripe_size =
1474 cpu_to_le32(lo->ldo_def_stripe_size);
1476 strncpy(v3->lmm_pool_name, lo->ldo_pool,
1479 info->lti_buf.lb_buf = v3;
1480 info->lti_buf.lb_len = sizeof(*v3);
1481 rc = dt_declare_xattr_set(env, dto,
1490 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1491 rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf,
1492 XATTR_NAME_LMV, 0, th);
1496 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1497 PFID(lu_object_fid(&dto->do_lu)), i);
1498 rc = dt_declare_insert(env, dt_object_child(dt),
1499 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1500 (const struct dt_key *)stripe_name, th);
1504 rc = dt_declare_ref_add(env, dt_object_child(dt), th);
1509 rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
1510 XATTR_NAME_LMV, 0, th);
1516 for (i = 0; i < stripe_count; i++)
1517 if (stripe[i] != NULL)
1518 lu_object_put(env, &stripe[i]->do_lu);
1519 OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count);
1520 lo->ldo_stripenr = 0;
1521 lo->ldo_stripes_allocated = 0;
1522 lo->ldo_stripe = NULL;
1526 if (idx_array != NULL)
1527 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
1528 if (slave_lmm != NULL)
1529 OBD_FREE_PTR(slave_lmm);
/*
 * Declaration step for setting the LMV EA that makes @dt a striped
 * directory: logs the requested striping, validates the user EA in
 * @lum_buf with lod_verify_md_striping() and prepares the stripes with
 * lod_prep_md_striped_create(). The striping set up on the object is
 * dropped again with lod_object_free_striping() (presumably only when
 * preparation fails - the guarding condition is not visible here).
 *
 * NOTE(review): a stripe count of zero is tested explicitly; the action
 * taken for it is on a line missing from this listing.
 */
1535 * Declare create striped md object.
1537 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
1538 struct dt_object *dt,
1539 struct lu_attr *attr,
1540 const struct lu_buf *lum_buf,
1541 struct dt_object_format *dof,
1544 struct lod_object *lo = lod_dt_obj(dt);
1545 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1546 struct lmv_user_md_v1 *lum;
1550 lum = lum_buf->lb_buf;
1551 LASSERT(lum != NULL);
1553 CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
1554 le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
1555 (int)le32_to_cpu(lum->lum_stripe_offset));
1557 if (le32_to_cpu(lum->lum_stripe_count) == 0)
1560 rc = lod_verify_md_striping(lod, lum);
1564 /* prepare dir striped objects */
1565 rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
1567 /* failed to create striping, let's reset
1568 * config so that others don't get confused */
1569 lod_object_free_striping(env, lo);
/*
 * Declare an xattr update on a directory.
 *
 * A default-LMV EA (XATTR_NAME_DEFAULT_LMV) is validated with
 * lod_verify_md_striping() first. The update is then declared on the
 * underlying object and, after the striping has been loaded, on every
 * stripe of the directory.
 *
 * NOTE(review): the assignment of "lum" and the rc checks are on lines
 * missing from this listing.
 */
1576 static int lod_dir_declare_xattr_set(const struct lu_env *env,
1577 struct dt_object *dt,
1578 const struct lu_buf *buf,
1579 const char *name, int fl,
1582 struct dt_object *next = dt_object_child(dt);
1583 struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
1584 struct lod_object *lo = lod_dt_obj(dt);
1589 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1590 struct lmv_user_md_v1 *lum;
1592 LASSERT(buf != NULL && buf->lb_buf != NULL);
1594 rc = lod_verify_md_striping(d, lum);
1599 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1603 /* set xattr to each stripes, if needed */
1604 rc = lod_load_striping(env, lo);
/* a directory without stripes needs nothing beyond the declaration above */
1608 if (lo->ldo_stripenr == 0)
1611 for (i = 0; i < lo->ldo_stripenr; i++) {
1612 LASSERT(lo->ldo_stripe[i]);
1613 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
/*
 * Entry point for declaring an xattr update; dispatches on object type:
 *  - regular (or not yet typed) object + XATTR_NAME_LOV without
 *    LU_XATTR_REPLACE: declare a striped object via
 *    lod_declare_striped_object(), using the object's current attributes
 *    if it exists or a synthetic S_IFREG attribute otherwise;
 *  - directory: lod_dir_declare_xattr_set();
 *  - anything else: pass the declaration to the underlying object.
 */
1623 * LOV xattr is a storage for striping, and LOD owns this xattr.
1624 * but LOD allows others to control striping to some extent
1625 * - to reset strping
1626 * - to set new defined striping
1627 * - to set new semi-defined striping
1628 * - number of stripes is defined
1629 * - number of stripes + osts are defined
1632 static int lod_declare_xattr_set(const struct lu_env *env,
1633 struct dt_object *dt,
1634 const struct lu_buf *buf,
1635 const char *name, int fl,
1638 struct dt_object *next = dt_object_child(dt);
1639 struct lu_attr *attr = &lod_env_info(env)->lti_attr;
1645 * allow to declare predefined striping on a new (!mode) object
1646 * which is supposed to be replay of regular file creation
1647 * (when LOV setting is declared)
1648 * LU_XATTR_REPLACE is set to indicate a layout swap
1650 mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
1651 if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
1652 !(fl & LU_XATTR_REPLACE)) {
1654 * this is a request to manipulate object's striping
1656 if (dt_object_exists(dt)) {
1657 rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1661 memset(attr, 0, sizeof(*attr));
1662 attr->la_valid = LA_TYPE | LA_MODE;
1663 attr->la_mode = S_IFREG;
1665 rc = lod_declare_striped_object(env, dt, attr, buf, th);
1666 } else if (S_ISDIR(mode)) {
1667 rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
1669 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
/*
 * Forget the default striping cached on @lo: clears the cached/set
 * flags, releases the pool name through lod_object_set_pool(lo, NULL)
 * and zeroes the default stripe size and count. The directory-striping
 * cached flag is reset only when lo->ldo_dir_stripe is allocated.
 */
1675 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
1677 lo->ldo_striping_cached = 0;
1678 lo->ldo_def_striping_set = 0;
1679 lod_object_set_pool(lo, NULL);
1680 lo->ldo_def_stripe_size = 0;
1681 lo->ldo_def_stripenr = 0;
1682 if (lo->ldo_dir_stripe != NULL)
1683 lo->ldo_dir_striping_cached = 0;
/*
 * Set xattr @name on the underlying object of @dt and, when @dt is a
 * directory with stripes, on each stripe as well. Processing stops after
 * the first step if it failed or if @dt is not a directory.
 *
 * NOTE(review): the statements following the two "if" tests are on lines
 * missing from this listing (presumably returns).
 */
1686 static int lod_xattr_set_internal(const struct lu_env *env,
1687 struct dt_object *dt,
1688 const struct lu_buf *buf,
1689 const char *name, int fl, struct thandle *th,
1690 struct lustre_capa *capa)
1692 struct dt_object *next = dt_object_child(dt);
1693 struct lod_object *lo = lod_dt_obj(dt);
1698 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1699 if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1702 if (lo->ldo_stripenr == 0)
/* propagate the same xattr to every stripe */
1705 for (i = 0; i < lo->ldo_stripenr; i++) {
1706 LASSERT(lo->ldo_stripe[i]);
1707 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
/*
 * Delete xattr @name from the underlying object of @dt and, when @dt is
 * a directory with stripes, from each stripe as well. Mirrors
 * lod_xattr_set_internal().
 *
 * NOTE(review): the statements following the two "if" tests are on lines
 * missing from this listing (presumably returns).
 */
1716 static int lod_xattr_del_internal(const struct lu_env *env,
1717 struct dt_object *dt,
1718 const char *name, struct thandle *th,
1719 struct lustre_capa *capa)
1721 struct dt_object *next = dt_object_child(dt);
1722 struct lod_object *lo = lod_dt_obj(dt);
1727 rc = dt_xattr_del(env, next, name, th, capa);
1728 if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1731 if (lo->ldo_stripenr == 0)
/* remove the xattr from every stripe too */
1734 for (i = 0; i < lo->ldo_stripenr; i++) {
1735 LASSERT(lo->ldo_stripe[i]);
1736 rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th,
/*
 * Set (or reset) the default LOV striping stored on directory @dt.
 *
 * The cached default striping of the directory is cleared first, then
 * the EA in @buf is validated with lod_verify_striping(). If the EA is a
 * V1 EA whose size/count/offset are all "default" values
 * (LOVEA_DELETE_VALUES), the xattr is deleted instead of being written.
 *
 * NOTE(review): the assignments of "lum"/"v3" and the rc checks are on
 * lines missing from this listing.
 */
1745 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
1746 struct dt_object *dt,
1747 const struct lu_buf *buf,
1748 const char *name, int fl,
1750 struct lustre_capa *capa)
1752 struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
1753 struct lod_object *l = lod_dt_obj(dt);
1754 struct lov_user_md_v1 *lum;
1755 struct lov_user_md_v3 *v3 = NULL;
1759 /* If it is striped dir, we should clear the stripe cache for
1760 * slave stripe as well, but there are no effective way to
1761 * notify the LOD on the slave MDT, so we do not cache stripe
1762 * information for slave stripe for now. XXX*/
1763 lod_lov_stripe_cache_clear(l);
1764 LASSERT(buf != NULL && buf->lb_buf != NULL);
1767 rc = lod_verify_striping(d, buf, 0);
1771 if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1774 /* if { size, offset, count } = { 0, -1, 0 } and no pool
1775 * (i.e. all default values specified) then delete default
1776 * striping from dir. */
1778 "set default striping: sz %u # %u offset %d %s %s\n",
1779 (unsigned)lum->lmm_stripe_size,
1780 (unsigned)lum->lmm_stripe_count,
1781 (int)lum->lmm_stripe_offset,
1782 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1784 if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1785 (lum->lmm_stripe_count),
1786 (lum->lmm_stripe_offset)) &&
1787 lum->lmm_magic == LOV_USER_MAGIC_V1) {
1788 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1792 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
/*
 * Set (or reset) the default LMV striping stored on directory @dt.
 *
 * When the user EA carries only "default" values (LMVEA_DELETE_VALUES)
 * and the plain LMV_USER_MAGIC, the xattr is deleted; otherwise it is
 * written. Afterwards the in-memory default-stripe state of the object
 * is updated, allocating ldo_dir_stripe on first use.
 *
 * NOTE(review): the assignment of "lum" and the rc checks are on lines
 * missing from this listing.
 */
1798 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
1799 struct dt_object *dt,
1800 const struct lu_buf *buf,
1801 const char *name, int fl,
1803 struct lustre_capa *capa)
1805 struct lod_object *l = lod_dt_obj(dt);
1806 struct lmv_user_md_v1 *lum;
1810 LASSERT(buf != NULL && buf->lb_buf != NULL);
1813 CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
1814 le32_to_cpu(lum->lum_stripe_count),
1815 (int)le32_to_cpu(lum->lum_stripe_offset));
1817 if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
1818 le32_to_cpu(lum->lum_stripe_offset)) &&
1819 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
1820 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1824 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
1829 /* Update default stripe cache */
1830 if (l->ldo_dir_stripe == NULL) {
1831 OBD_ALLOC_PTR(l->ldo_dir_stripe);
1832 if (l->ldo_dir_stripe == NULL)
1836 l->ldo_dir_striping_cached = 0;
1837 l->ldo_dir_def_striping_set = 1;
1838 l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count);
/*
 * Execution step that creates the stripes of striped directory @dt and
 * writes the LMV EAs.
 *
 * The stripes themselves are expected to have been set up during the
 * declare phase. For each stripe the function creates the object under
 * its write lock, inserts "." and "..", writes the inherited default LOV
 * EA (if cached), writes the slave LMV EA with the stripe index, and
 * inserts a "<FID>:<index>" name entry in the master directory together
 * with a reference. Finally the master LMV EA is written to @dt.
 *
 * NOTE(review): this listing omits some lines of the function (braces,
 * rc checks, labels), so the exact error-path structure is not visible.
 */
1843 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
1844 const struct lu_buf *buf, const char *name,
1845 int fl, struct thandle *th,
1846 struct lustre_capa *capa)
1848 struct lod_object *lo = lod_dt_obj(dt);
1849 struct lod_thread_info *info = lod_env_info(env);
1850 struct lu_attr *attr = &info->lti_attr;
1851 struct dt_object_format *dof = &info->lti_format;
1852 struct lu_buf lmv_buf;
1853 struct lu_buf slave_lmv_buf;
1854 struct lmv_mds_md_v1 *lmm;
1855 struct lmv_mds_md_v1 *slave_lmm = NULL;
1860 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
1863 /* The stripes are supposed to be allocated in declare phase,
1864 * if there are no stripes being allocated, it will skip */
1865 if (lo->ldo_stripenr == 0)
/* stripes are created with the master's attributes, restricted to type/mode */
1868 rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA);
1872 attr->la_valid = LA_TYPE | LA_MODE;
1873 dof->dof_type = DFT_DIR;
1875 rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1878 lmm = lmv_buf.lb_buf;
1880 OBD_ALLOC_PTR(slave_lmm);
1881 if (slave_lmm == NULL)
1884 lod_prep_slave_lmv_md(slave_lmm, lmm);
1885 slave_lmv_buf.lb_buf = slave_lmm;
1886 slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1888 for (i = 0; i < lo->ldo_stripenr; i++) {
1889 struct dt_object *dto;
1890 char *stripe_name = info->lti_key;
1892 dto = lo->ldo_stripe[i];
1893 dt_write_lock(env, dto, MOR_TGT_CHILD);
1894 rc = dt_create(env, dto, attr, NULL, dof, th);
1895 dt_write_unlock(env, dto);
1899 rc = dt_insert(env, dto,
1900 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1901 (const struct dt_key *)dot, th, capa, 0);
/* ".." of every stripe points at the master object */
1905 rc = dt_insert(env, dto,
1906 (struct dt_rec *)lu_object_fid(&dt->do_lu),
1907 (const struct dt_key *)dotdot, th, capa, 0);
1911 if (lo->ldo_striping_cached &&
1912 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1913 lo->ldo_def_stripenr,
1914 lo->ldo_def_stripe_offset)) {
1915 struct lov_user_md_v3 *v3;
1917 /* sigh, lti_ea_store has been used for lmv_buf,
1918 * so we have to allocate buffer for default
1924 memset(v3, 0, sizeof(*v3));
1925 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1926 v3->lmm_stripe_count =
1927 cpu_to_le16(lo->ldo_def_stripenr);
1928 v3->lmm_stripe_offset =
1929 cpu_to_le16(lo->ldo_def_stripe_offset);
1930 v3->lmm_stripe_size =
1931 cpu_to_le32(lo->ldo_def_stripe_size);
1933 strncpy(v3->lmm_pool_name, lo->ldo_pool,
1936 info->lti_buf.lb_buf = v3;
1937 info->lti_buf.lb_len = sizeof(*v3);
1938 rc = dt_xattr_set(env, dto, &info->lti_buf,
1939 XATTR_NAME_LOV, 0, th, capa);
1945 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1946 rc = dt_xattr_set(env, dto, &slave_lmv_buf, XATTR_NAME_LMV,
1951 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1952 PFID(lu_object_fid(&dto->do_lu)), i);
1953 rc = dt_insert(env, dt_object_child(dt),
1954 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1955 (const struct dt_key *)stripe_name, th, capa, 0);
1959 rc = dt_ref_add(env, dt_object_child(dt), th);
1964 rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
1968 if (slave_lmm != NULL)
1969 OBD_FREE_PTR(slave_lmm);
/*
 * Common body for declaring and executing the striping-related EAs of a
 * new directory. Three independent steps are performed, each building a
 * user EA in the per-thread lti_ea_store buffer (resized if too small):
 *  1. the directory's own LMV striping (stripe count / offset);
 *  2. the default LMV striping inherited from the parent;
 *  3. the default LOV striping inherited from the parent (V3 EA, with
 *     pool name).
 * For each step either the declare variant or the set variant is called.
 *
 * NOTE(review): the tail of the parameter list and the tests selecting
 * declare vs. set are on lines missing from this listing; the two
 * callers visible in this file pass true (declare) and false (execute)
 * as the last argument.
 */
1974 int lod_dir_striping_create_internal(const struct lu_env *env,
1975 struct dt_object *dt,
1976 struct lu_attr *attr,
1977 struct dt_object_format *dof,
1981 struct lod_thread_info *info = lod_env_info(env);
1982 struct lod_object *lo = lod_dt_obj(dt);
/* step 1: the directory's own striping */
1986 if (lo->ldo_dir_def_striping_set &&
1987 !LMVEA_DELETE_VALUES(lo->ldo_stripenr,
1988 lo->ldo_dir_stripe_offset)) {
1989 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
1990 int stripe_count = lo->ldo_stripenr;
1992 if (info->lti_ea_store_size < sizeof(*v1)) {
1993 rc = lod_ea_store_resize(info, sizeof(*v1));
/* re-read the pointer: the resize presumably replaces the buffer */
1996 v1 = info->lti_ea_store;
1999 memset(v1, 0, sizeof(*v1));
2000 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2001 v1->lum_stripe_count = cpu_to_le32(stripe_count);
2002 v1->lum_stripe_offset =
2003 cpu_to_le32(lo->ldo_dir_stripe_offset);
2005 info->lti_buf.lb_buf = v1;
2006 info->lti_buf.lb_len = sizeof(*v1);
2009 rc = lod_declare_xattr_set_lmv(env, dt, attr,
2010 &info->lti_buf, dof, th);
2012 rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
2013 XATTR_NAME_LMV, 0, th,
2019 /* Transfer default LMV striping from the parent */
2020 if (lo->ldo_dir_striping_cached &&
2021 !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
2022 lo->ldo_dir_def_stripe_offset)) {
2023 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2024 int def_stripe_count = lo->ldo_dir_def_stripenr;
2026 if (info->lti_ea_store_size < sizeof(*v1)) {
2027 rc = lod_ea_store_resize(info, sizeof(*v1));
2030 v1 = info->lti_ea_store;
2033 memset(v1, 0, sizeof(*v1));
2034 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2035 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
2036 v1->lum_stripe_offset =
2037 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
2039 cpu_to_le32(lo->ldo_dir_def_hash_type);
2041 info->lti_buf.lb_buf = v1;
2042 info->lti_buf.lb_len = sizeof(*v1);
2044 rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2045 XATTR_NAME_DEFAULT_LMV,
2048 rc = lod_xattr_set_default_lmv_on_dir(env, dt,
2050 XATTR_NAME_DEFAULT_LMV, 0,
2056 /* Transfer default LOV striping from the parent */
2057 if (lo->ldo_striping_cached &&
2058 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2059 lo->ldo_def_stripenr,
2060 lo->ldo_def_stripe_offset)) {
2061 struct lov_user_md_v3 *v3 = info->lti_ea_store;
2063 if (info->lti_ea_store_size < sizeof(*v3)) {
2064 rc = lod_ea_store_resize(info, sizeof(*v3));
2067 v3 = info->lti_ea_store;
2070 memset(v3, 0, sizeof(*v3));
2071 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2072 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
2073 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
2074 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
2076 strncpy(v3->lmm_pool_name, lo->ldo_pool,
2079 info->lti_buf.lb_buf = v3;
2080 info->lti_buf.lb_len = sizeof(*v3);
2083 rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2084 XATTR_NAME_LOV, 0, th);
2086 rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
2087 XATTR_NAME_LOV, 0, th,
/*
 * Declare phase of directory striping creation: forwards to
 * lod_dir_striping_create_internal() with the final argument set to true.
 */
2096 static int lod_declare_dir_striping_create(const struct lu_env *env,
2097 struct dt_object *dt,
2098 struct lu_attr *attr,
2099 struct dt_object_format *dof,
2102 return lod_dir_striping_create_internal(env, dt, attr, dof, th, true);
/*
 * Execution phase of directory striping creation: forwards to
 * lod_dir_striping_create_internal() with the final argument set to false.
 */
2105 static int lod_dir_striping_create(const struct lu_env *env,
2106 struct dt_object *dt,
2107 struct lu_attr *attr,
2108 struct dt_object_format *dof,
2111 return lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
/*
 * Entry point for setting an xattr; dispatches on object type and name:
 *  - directory + XATTR_NAME_LMV: an EA flagged LMV_HASH_FLAG_MIGRATION is
 *    written straight to the underlying object, otherwise the striping
 *    prepared earlier is created via lod_dir_striping_create();
 *  - directory + XATTR_NAME_LOV: lod_xattr_set_lov_on_dir();
 *  - directory + XATTR_NAME_DEFAULT_LMV:
 *    lod_xattr_set_default_lmv_on_dir();
 *  - regular file + XATTR_NAME_LOV: with LU_XATTR_REPLACE the in-memory
 *    striping is freed and the EA written, otherwise
 *    lod_striping_create() is called;
 *  - everything else: lod_xattr_set_internal().
 *
 * NOTE(review): the returns/else lines between the branches are missing
 * from this listing.
 */
2114 static int lod_xattr_set(const struct lu_env *env,
2115 struct dt_object *dt, const struct lu_buf *buf,
2116 const char *name, int fl, struct thandle *th,
2117 struct lustre_capa *capa)
2119 struct dt_object *next = dt_object_child(dt);
2123 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2124 strcmp(name, XATTR_NAME_LMV) == 0) {
2125 struct lmv_mds_md_v1 *lmm = buf->lb_buf;
2127 if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
2128 LMV_HASH_FLAG_MIGRATION)
2129 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2131 rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
2136 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2137 strcmp(name, XATTR_NAME_LOV) == 0) {
2139 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
2141 } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2142 strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
2144 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
2147 } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2148 !strcmp(name, XATTR_NAME_LOV)) {
2149 /* in case of lov EA swap, just set it
2150 * if not, it is a replay so check striping match what we
2151 * already have during req replay, declare_xattr_set()
2152 * defines striping, then create() does the work
2154 if (fl & LU_XATTR_REPLACE) {
2155 /* free stripes, then update disk */
2156 lod_object_free_striping(env, lod_dt_obj(dt));
2157 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2159 rc = lod_striping_create(env, dt, NULL, NULL, th);
2164 /* then all other xattr */
2165 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
/*
 * Declare deletion of xattr @name: passed straight through to the
 * underlying object.
 */
2170 static int lod_declare_xattr_del(const struct lu_env *env,
2171 struct dt_object *dt, const char *name,
2174 return dt_declare_xattr_del(env, dt_object_child(dt), name, th);
/*
 * Delete xattr @name from the underlying object. When the LOV EA is
 * being removed, the in-memory striping of the object is freed first.
 */
2177 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
2178 const char *name, struct thandle *th,
2179 struct lustre_capa *capa)
2181 if (!strcmp(name, XATTR_NAME_LOV))
2182 lod_object_free_striping(env, lod_dt_obj(dt));
2183 return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
/*
 * List the xattrs of the object: passed straight through to the
 * underlying object.
 */
2186 static int lod_xattr_list(const struct lu_env *env,
2187 struct dt_object *dt, struct lu_buf *buf,
2188 struct lustre_capa *capa)
2190 return dt_xattr_list(env, dt_object_child(dt), buf, capa);
/*
 * Replace the pool name stored in @o with a private copy of @pool.
 *
 * The existing name is released with OBD_FREE(strlen + 1) and a new
 * buffer of len + 1 bytes is allocated and filled with strcpy().
 *
 * NOTE(review): the NULL tests guarding both halves, the computation of
 * "len" for the new name and the return statements are on lines missing
 * from this listing; callers in this file pass NULL to clear the pool,
 * so a NULL @pool is presumably handled there - confirm.
 */
2193 int lod_object_set_pool(struct lod_object *o, char *pool)
2198 len = strlen(o->ldo_pool);
2199 OBD_FREE(o->ldo_pool, len + 1);
2204 OBD_ALLOC(o->ldo_pool, len + 1);
2205 if (o->ldo_pool == NULL)
2207 strcpy(o->ldo_pool, pool);
/*
 * Return non-zero when an object is going to be striped: @is_reg must be
 * non-zero and the FID sequence must not be FID_SEQ_LOCAL_FILE.
 */
2212 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
2214 return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
/*
 * Cache the default LOV striping of parent directory @lp in the object.
 *
 * The LOV EA is read with lod_get_lov_ea() while the underlying object
 * is write-locked. An EA shorter than struct lov_user_md is recorded as
 * "no default striping". Otherwise the EA is byte-swapped if it was
 * stored in the opposite endianness, checked for a known magic and a
 * RAID0 (or zero) pattern, and its stripe count, size and offset are
 * stored in @lp; for a V3 EA a non-empty pool name is stored as well.
 *
 * NOTE(review): some lines of this function (rc check after the read,
 * labels, return) are not visible in this listing.
 */
2218 static int lod_cache_parent_lov_striping(const struct lu_env *env,
2219 struct lod_object *lp)
2221 struct lod_thread_info *info = lod_env_info(env);
2222 struct lov_user_md_v1 *v1 = NULL;
2223 struct lov_user_md_v3 *v3 = NULL;
2227 /* called from MDD without parent being write locked,
2229 dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2230 rc = lod_get_lov_ea(env, lp);
2234 if (rc < sizeof(struct lov_user_md)) {
2235 /* don't lookup for non-existing or invalid striping */
2236 lp->ldo_def_striping_set = 0;
2237 lp->ldo_striping_cached = 1;
2238 lp->ldo_def_stripe_size = 0;
2239 lp->ldo_def_stripenr = 0;
2240 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
2241 GOTO(unlock, rc = 0);
2245 v1 = info->lti_ea_store;
/*
 * Swab an EA stored in the opposite endianness. v3 has not been pointed
 * at the buffer yet (it is still NULL here), so the V3 swab must be
 * given the EA buffer itself rather than v3.
 */
2246 if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1))
2247 lustre_swab_lov_user_md_v1(v1);
2248 else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3))
2249 lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)v1);
2251 if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
2252 GOTO(unlock, rc = 0);
2254 if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
2255 GOTO(unlock, rc = 0);
2257 CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
2258 PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
2259 (int)v1->lmm_stripe_count,
2260 (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
2262 lp->ldo_def_stripenr = v1->lmm_stripe_count;
2263 lp->ldo_def_stripe_size = v1->lmm_stripe_size;
2264 lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
2265 lp->ldo_striping_cached = 1;
2266 lp->ldo_def_striping_set = 1;
2267 if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
2268 /* XXX: sanity check here */
2269 v3 = (struct lov_user_md_v3 *) v1;
2270 if (v3->lmm_pool_name[0])
2271 lod_object_set_pool(lp, v3->lmm_pool_name);
2275 dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
/*
 * Cache the default LMV (directory) striping of parent @lp in the
 * object.
 *
 * The default LMV EA is read with lod_get_default_lmv_ea() while the
 * underlying object is write-locked. An EA shorter than
 * struct lmv_user_md is recorded as "no default striping" with offset -1
 * and hash type LMV_HASH_TYPE_FNV_1A_64; otherwise the stripe count,
 * offset and hash type are converted from little-endian and stored.
 *
 * NOTE(review): some lines of this function (rc check after the read,
 * labels, return) are not visible in this listing.
 */
2280 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
2281 struct lod_object *lp)
2283 struct lod_thread_info *info = lod_env_info(env);
2284 struct lmv_user_md_v1 *v1 = NULL;
2288 /* called from MDD without parent being write locked,
2290 dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2291 rc = lod_get_default_lmv_ea(env, lp);
2295 if (rc < sizeof(struct lmv_user_md)) {
2296 /* don't lookup for non-existing or invalid striping */
2297 lp->ldo_dir_def_striping_set = 0;
2298 lp->ldo_dir_striping_cached = 1;
2299 lp->ldo_dir_def_stripenr = 0;
2300 lp->ldo_dir_def_stripe_offset =
2301 (typeof(v1->lum_stripe_offset))(-1);
2302 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
2303 GOTO(unlock, rc = 0);
2307 v1 = info->lti_ea_store;
/* the EA is kept little-endian; convert each field on the way in */
2309 lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
2310 lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
2311 lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
2312 lp->ldo_dir_def_striping_set = 1;
2313 lp->ldo_dir_striping_cached = 1;
2317 dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
/*
 * Make sure the default striping of parent @lp is cached.
 *
 * After loading the parent's own striping, the default LOV striping is
 * cached if it has not been yet; when the child being created is a
 * directory, the default LMV striping is cached as well.
 *
 * NOTE(review): the child_mode parameter declaration and the rc checks
 * are on lines missing from this listing.
 */
2321 static int lod_cache_parent_striping(const struct lu_env *env,
2322 struct lod_object *lp,
2328 rc = lod_load_striping(env, lp);
2332 if (!lp->ldo_striping_cached) {
2333 /* we haven't tried to get default striping for
2334 * the directory yet, let's cache it in the object */
2335 rc = lod_cache_parent_lov_striping(env, lp);
2340 if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
2341 rc = lod_cache_parent_lmv_striping(env, lp);
/*
 * Initialise the allocation hint for @child and seed the child's
 * lod_object with striping defaults.
 *
 * For a directory child the parent's cached default LOV and default LMV
 * striping are copied to the child, and the child's own directory
 * striping is taken from the EA supplied in @ah (if it verifies and asks
 * for more than one stripe), else from the parent's default, else reset
 * to "no striping". For a child that will be striped over OSTs the
 * stripe count/size/offset and pool come from the parent's default,
 * falling back to the filesystem-wide values in lod_desc.
 *
 * NOTE(review): this listing omits some lines of the function (the
 * child_mode parameter, rc checks, returns/labels), so the exact exits
 * between the sections are not visible.
 */
2347 * used to transfer default striping data to the object being created
2349 static void lod_ah_init(const struct lu_env *env,
2350 struct dt_allocation_hint *ah,
2351 struct dt_object *parent,
2352 struct dt_object *child,
2355 struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
2356 struct dt_object *nextp = NULL;
2357 struct dt_object *nextc;
2358 struct lod_object *lp = NULL;
2359 struct lod_object *lc;
2360 struct lov_desc *desc;
2366 if (likely(parent)) {
2367 nextp = dt_object_child(parent);
2368 lp = lod_dt_obj(parent);
2369 rc = lod_load_striping(env, lp);
2374 nextc = dt_object_child(child);
2375 lc = lod_dt_obj(child);
2377 LASSERT(lc->ldo_stripenr == 0);
2378 LASSERT(lc->ldo_stripe == NULL);
2381 * local object may want some hints
2382 * in case of late striping creation, ->ah_init()
2383 * can be called with local object existing
/* NOTE(review): nextp stays NULL when parent is NULL, yet it is passed
 * to dt_object_remote() below - confirm that helper tolerates NULL */
2385 if (!dt_object_exists(nextc) || dt_object_remote(nextc))
2386 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
2387 NULL : nextp, nextc, child_mode);
/* NOTE(review): lp is dereferenced throughout the directory branch
 * without the NULL test used elsewhere in this function - presumably a
 * directory is never created without a parent; confirm */
2389 if (S_ISDIR(child_mode)) {
2390 if (lc->ldo_dir_stripe == NULL) {
2391 OBD_ALLOC_PTR(lc->ldo_dir_stripe);
2392 if (lc->ldo_dir_stripe == NULL)
2396 if (lp->ldo_dir_stripe == NULL) {
2397 OBD_ALLOC_PTR(lp->ldo_dir_stripe);
2398 if (lp->ldo_dir_stripe == NULL)
2402 rc = lod_cache_parent_striping(env, lp, child_mode);
2406 /* transfer defaults to new directory */
2407 if (lp->ldo_striping_cached) {
2409 lod_object_set_pool(lc, lp->ldo_pool);
2410 lc->ldo_def_stripenr = lp->ldo_def_stripenr;
2411 lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
2412 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2413 lc->ldo_striping_cached = 1;
2414 lc->ldo_def_striping_set = 1;
2415 CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
2416 (int)lc->ldo_def_stripe_size,
2417 (int)lc->ldo_def_stripe_offset,
2418 (int)lc->ldo_def_stripenr);
2421 /* transfer dir defaults to new directory */
2422 if (lp->ldo_dir_striping_cached) {
2423 lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
2424 lc->ldo_dir_def_stripe_offset =
2425 lp->ldo_dir_def_stripe_offset;
2426 lc->ldo_dir_def_hash_type =
2427 lp->ldo_dir_def_hash_type;
2428 lc->ldo_dir_striping_cached = 1;
2429 lc->ldo_dir_def_striping_set = 1;
2430 CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
2431 (int)lc->ldo_dir_def_stripenr,
2432 (int)lc->ldo_dir_def_stripe_offset,
2433 lc->ldo_dir_def_hash_type);
2436 /* If the directory is specified with certain stripes */
2437 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
2438 const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
2440 rc = lod_verify_md_striping(d, lum1);
2442 le32_to_cpu(lum1->lum_stripe_count) > 1) {
2443 /* Directory will be striped only if
2444 * stripe_count > 1 */
2446 le32_to_cpu(lum1->lum_stripe_count);
2447 lc->ldo_dir_stripe_offset =
2448 le32_to_cpu(lum1->lum_stripe_offset);
2449 lc->ldo_dir_hash_type =
2450 le32_to_cpu(lum1->lum_hash_type);
2451 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
2453 (int)lc->ldo_dir_stripe_offset);
2455 } else if (lp->ldo_dir_def_striping_set) {
2456 /* If there are default dir stripe from parent */
2457 lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
2458 lc->ldo_dir_stripe_offset =
2459 lp->ldo_dir_def_stripe_offset;
2460 lc->ldo_dir_hash_type =
2461 lp->ldo_dir_def_hash_type;
2462 CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
2464 (int)lc->ldo_dir_stripe_offset);
2466 /* set default stripe for this directory */
2467 lc->ldo_stripenr = 0;
2468 lc->ldo_dir_stripe_offset = -1;
2471 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
2472 lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
2478 * if object is going to be striped over OSTs, transfer default
2479 * striping information to the child, so that we can use it
2480 * during declaration and creation
2482 if (!lod_object_will_be_striped(S_ISREG(child_mode),
2483 lu_object_fid(&child->do_lu)))
2486 * try from the parent
2488 if (likely(parent)) {
2489 lod_cache_parent_striping(env, lp, child_mode);
2491 lc->ldo_def_stripe_offset = (__u16) -1;
2493 if (lp->ldo_def_striping_set) {
2495 lod_object_set_pool(lc, lp->ldo_pool);
2496 lc->ldo_stripenr = lp->ldo_def_stripenr;
2497 lc->ldo_stripe_size = lp->ldo_def_stripe_size;
2498 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2499 CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
2500 lc->ldo_stripenr, lc->ldo_stripe_size,
2501 lp->ldo_pool ? lp->ldo_pool : "");
2506 * if the parent doesn't provide with specific pattern, grab fs-wide one
2508 desc = &d->lod_desc;
2509 if (lc->ldo_stripenr == 0)
2510 lc->ldo_stripenr = desc->ld_default_stripe_count;
2511 if (lc->ldo_stripe_size == 0)
2512 lc->ldo_stripe_size = desc->ld_default_stripe_size;
2513 CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
2514 lc->ldo_stripenr, lc->ldo_stripe_size,
2515 lc->ldo_pool ? lc->ldo_pool : "");
2518 /* we do not cache stripe information for slave stripe, see
2519 * lod_xattr_set_lov_on_dir */
2520 if (lp != NULL && lp->ldo_dir_slave_stripe)
2521 lod_lov_stripe_cache_clear(lp);
/*
 * Alias for do_div(). NOTE(review): the caller in this file relies on
 * do_div() dividing its first argument in place and evaluating to the
 * remainder - confirm against the kernel definition.
 */
2526 #define ll_do_div64(aaa,bbb) do_div((aaa), (bbb))
/*
 * Declare the attribute update that carries the current size of @dt over
 * to the stripe holding its last byte.
 *
 * The size is read from the underlying object, split into a stripe index
 * and a per-stripe size (whole stripe-size units per stripe plus the
 * offset within the last unit), and a size-only attr_set is declared on
 * that stripe.
 *
 * NOTE(review): the rc check after dt_attr_get() and any early exit for
 * a zero size are on lines missing from this listing. As shown, la_valid
 * is asserted before any visible test of rc - confirm that a failed
 * dt_attr_get() cannot trip the assertion.
 */
2528 * this function handles a special case when truncate was done
2529 * on a stripeless object and now striping is being created
2530 * we can't lose that size, so we have to propagate it to newly
2533 static int lod_declare_init_size(const struct lu_env *env,
2534 struct dt_object *dt, struct thandle *th)
2536 struct dt_object *next = dt_object_child(dt);
2537 struct lod_object *lo = lod_dt_obj(dt);
2538 struct lu_attr *attr = &lod_env_info(env)->lti_attr;
2539 uint64_t size, offs;
2543 /* XXX: we support the simplest (RAID0) striping so far */
2544 LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
2545 LASSERT(lo->ldo_stripe_size > 0);
2547 rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
2548 LASSERT(attr->la_valid & LA_SIZE);
2552 size = attr->la_size;
2556 /* ll_do_div64(a, b) returns a % b, and a = a / b */
2557 ll_do_div64(size, (__u64) lo->ldo_stripe_size);
2558 stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
/* size now counts full stripe units on that stripe; scale back to bytes
 * and add the offset inside the last unit */
2560 size = size * lo->ldo_stripe_size;
2561 offs = attr->la_size;
2562 size += ll_do_div64(offs, lo->ldo_stripe_size);
2564 attr->la_valid = LA_SIZE;
2565 attr->la_size = size;
2567 rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
2573 * Create declaration of striped object
/*
 * Declare, in transaction th, what is needed to create striping for dt.
 *
 * Visible steps:
 *  - an OBD_FAIL_MDS_ALLOC_OBDO fault-injection check that frees any
 *    striping on the object and leaves through GOTO(out, rc = -ENOMEM);
 *  - for a non-remote child, lod_qos_prep_create() is called with attr and
 *    lovea, and lti_buf.lb_len is sized with lov_mds_md_size() using
 *    LOV_MAGIC_V3 when ldo_pool is set and LOV_MAGIC_V1 otherwise;
 *  - on the other path lovea is asserted non-NULL and copied into lti_buf;
 *  - the XATTR_NAME_LOV xattr set is declared on the child with lti_buf;
 *  - when the child already exists, lod_declare_init_size() is declared.
 *
 * NOTE(review): the branch, error-check and return lines between these
 * steps are not visible in this excerpt, so the exact conditions under
 * which each step runs are to be confirmed against the full file.
 */
2575 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
2576 struct lu_attr *attr,
2577 const struct lu_buf *lovea, struct thandle *th)
2579 struct lod_thread_info *info = lod_env_info(env);
2580 struct dt_object *next = dt_object_child(dt);
2581 struct lod_object *lo = lod_dt_obj(dt);
2585 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
2586 /* failed to create striping, let's reset
2587 * config so that others don't get confused */
2588 lod_object_free_striping(env, lo);
2589 GOTO(out, rc = -ENOMEM);
2592 if (!dt_object_remote(next)) {
2593 /* choose OST and generate appropriate objects */
2594 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
2596 /* failed to create striping, let's reset
2597 * config so that others don't get confused */
2598 lod_object_free_striping(env, lo);
2603 * declare storage for striping data
2605 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
2606 lo->ldo_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1);
2608 /* LOD can not choose OST objects for remote objects, i.e.
2609 * stripes must be ready before that. Right now, it can only
2610 * happen during migrate, i.e. migrate process needs to create
2611 * remote regular file (mdd_migrate_create), then the migrate
2612 * process will provide stripeEA. */
2613 LASSERT(lovea != NULL);
2614 info->lti_buf = *lovea;
2617 rc = dt_declare_xattr_set(env, next, &info->lti_buf,
2618 XATTR_NAME_LOV, 0, th);
2623 * if striping is created with local object's size > 0,
2624 * we have to propagate this size to specific object
2625 * the case is possible only when local object was created previously
2627 if (dt_object_exists(next))
2628 rc = lod_declare_init_size(env, dt, th);
/*
 * Declare creation of dt: first the child object, then any striping.
 *
 * Visible steps:
 *  - dt_declare_create() is called on the child with attr, hint and dof;
 *  - for DFT_SYM objects, do_body_ops is pointed at lod_body_lnk_ops;
 *  - for DFT_REGULAR objects, ldo_stripenr is forced to 0 when
 *    dof->u.dof_reg.striped is 0, and lod_declare_striped_object() is
 *    called only when ldo_stripenr is still greater than 0;
 *  - for DFT_DIR objects, lod_declare_dir_striping_create() is called only
 *    when ldo_dir_stripe is non-NULL.
 *
 * NOTE(review): the last parameter line, the local rc declaration, the
 * check of rc after dt_declare_create() and the trailing arguments of the
 * two striping calls are not visible in this excerpt.
 */
2634 static int lod_declare_object_create(const struct lu_env *env,
2635 struct dt_object *dt,
2636 struct lu_attr *attr,
2637 struct dt_allocation_hint *hint,
2638 struct dt_object_format *dof,
2641 struct dt_object *next = dt_object_child(dt);
2642 struct lod_object *lo = lod_dt_obj(dt);
2651 * first of all, we declare creation of local object
2653 rc = dt_declare_create(env, next, attr, hint, dof, th);
2657 if (dof->dof_type == DFT_SYM)
2658 dt->do_body_ops = &lod_body_lnk_ops;
2661 * it's lod_ah_init() who has decided the object will striped
2663 if (dof->dof_type == DFT_REGULAR) {
2664 /* callers don't want stripes */
2665 /* XXX: all tricky interactions with ->ah_make_hint() decided
2666 * to use striping, then ->declare_create() behaving differently
2667 * should be cleaned */
2668 if (dof->u.dof_reg.striped == 0)
2669 lo->ldo_stripenr = 0;
2670 if (lo->ldo_stripenr > 0)
2671 rc = lod_declare_striped_object(env, dt, attr,
2673 } else if (dof->dof_type == DFT_DIR) {
2674 /* Orphan object (like migrating object) does not have
2675 * lod_dir_stripe, see lod_ah_init */
2676 if (lo->ldo_dir_stripe != NULL)
2677 rc = lod_declare_dir_striping_create(env, dt, attr,
/*
 * Create every stripe object of dt and then store the layout.
 *
 * Asserts that ldo_striping_cached is 0, calls dt_create() on each of the
 * ldo_stripenr entries of ldo_stripe (asserting each is non-NULL, passing
 * a NULL allocation hint), and finally calls lod_generate_and_set_lovea().
 *
 * NOTE(review): the last parameter line, the declarations of i and rc and
 * the handling of a dt_create() failure inside the loop are not visible in
 * this excerpt.
 */
2684 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
2685 struct lu_attr *attr, struct dt_object_format *dof,
2688 struct lod_object *lo = lod_dt_obj(dt);
2692 LASSERT(lo->ldo_striping_cached == 0);
2694 /* create all underlying objects */
2695 for (i = 0; i < lo->ldo_stripenr; i++) {
2696 LASSERT(lo->ldo_stripe[i]);
2697 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
2703 rc = lod_generate_and_set_lovea(env, lo, th);
/*
 * Create dt: the child object first, then the stripes when applicable.
 *
 * dt_create() is invoked on the child with the caller arguments. The
 * stripes are created through lod_striping_create() only when the header
 * mode is a regular file, ldo_stripe is set and dof->u.dof_reg.striped is
 * non-zero.
 *
 * NOTE(review): the rc declaration and the check of rc between the two
 * steps are not visible in this excerpt.
 */
2708 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
2709 struct lu_attr *attr,
2710 struct dt_allocation_hint *hint,
2711 struct dt_object_format *dof, struct thandle *th)
2713 struct dt_object *next = dt_object_child(dt);
2714 struct lod_object *lo = lod_dt_obj(dt);
2718 /* create local object */
2719 rc = dt_create(env, next, attr, hint, dof, th);
2723 if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2724 lo->ldo_stripe && dof->u.dof_reg.striped != 0)
2725 rc = lod_striping_create(env, dt, attr, dof, th);
/*
 * Declare destruction of dt and of its stripes.
 *
 * Visible steps:
 *  - lod_load_striping() is called on the object;
 *  - for a directory, do_index_try() is called on the child with
 *    dt_directory_features, and for each stripe a ref_del and a delete of
 *    an entry are declared on the child; the entry name is formatted into
 *    the per-thread lti_key buffer from the stripe FID (DFID":%d" format);
 *  - dt_declare_destroy() is called on the child;
 *  - an OBD_FAIL_LFSCK_LOST_MDTOBJ fault-injection check follows;
 *  - dt_declare_destroy() is called on every stripe object.
 *
 * NOTE(review): the th parameter line, the i and rc declarations, the last
 * snprintf argument, the error checks and the statement guarded by the
 * fault-injection check are not visible in this excerpt.
 */
2730 static int lod_declare_object_destroy(const struct lu_env *env,
2731 struct dt_object *dt,
2734 struct dt_object *next = dt_object_child(dt);
2735 struct lod_object *lo = lod_dt_obj(dt);
2736 struct lod_thread_info *info = lod_env_info(env);
2737 char *stripe_name = info->lti_key;
2742 * load striping information, notice we don't do this when object
2743 * is being initialized as we don't need this information till
2744 * few specific cases like destroy, chown
2746 rc = lod_load_striping(env, lo);
2750 /* declare destroy for all underlying objects */
2751 if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2752 rc = next->do_ops->do_index_try(env, next,
2753 &dt_directory_features);
2757 for (i = 0; i < lo->ldo_stripenr; i++) {
2758 rc = dt_declare_ref_del(env, next, th);
2761 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2762 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2764 rc = dt_declare_delete(env, next,
2765 (const struct dt_key *)stripe_name, th);
2771 * we declare destroy for the local object
2773 rc = dt_declare_destroy(env, next, th);
2777 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2780 /* declare destroy for all underlying objects */
2781 for (i = 0; i < lo->ldo_stripenr; i++) {
2782 LASSERT(lo->ldo_stripe[i]);
2783 rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
/*
 * Destroy dt and its stripes within transaction th.
 *
 * Mirrors the visible steps of lod_declare_object_destroy(): for a
 * directory the per-stripe entries are removed from the child, then the
 * child is destroyed, then every stripe object is destroyed.
 *
 * NOTE(review): the i and rc declarations, the last snprintf argument, the
 * error checks after each call and the statement guarded by the
 * OBD_FAIL_LFSCK_LOST_MDTOBJ check are not visible in this excerpt.
 */
2791 static int lod_object_destroy(const struct lu_env *env,
2792 struct dt_object *dt, struct thandle *th)
2794 struct dt_object *next = dt_object_child(dt);
2795 struct lod_object *lo = lod_dt_obj(dt);
2796 struct lod_thread_info *info = lod_env_info(env);
/* Entry names are built in the per-thread lti_key scratch buffer. */
2797 char *stripe_name = info->lti_key;
2801 /* destroy sub-stripe of master object */
2802 if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2803 rc = next->do_ops->do_index_try(env, next,
2804 &dt_directory_features);
2808 for (i = 0; i < lo->ldo_stripenr; i++) {
/* One reference is dropped on the child for each stripe. */
2809 rc = dt_ref_del(env, next, th);
2813 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2814 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2817 CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
2818 PFID(lu_object_fid(&dt->do_lu)), stripe_name,
2819 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
2821 rc = dt_delete(env, next,
2822 (const struct dt_key *)stripe_name,
2828 rc = dt_destroy(env, next, th);
2832 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2835 /* destroy all striped objects */
2836 for (i = 0; i < lo->ldo_stripenr; i++) {
2837 LASSERT(lo->ldo_stripe[i]);
2838 rc = dt_destroy(env, lo->ldo_stripe[i], th);
/* Declare a reference increment: forwards to dt_declare_ref_add() on the
 * child object of dt and returns its result. */
2846 static int lod_declare_ref_add(const struct lu_env *env,
2847 struct dt_object *dt, struct thandle *th)
2849 return dt_declare_ref_add(env, dt_object_child(dt), th);
/* Increment the reference count: forwards to dt_ref_add() on the child
 * object of dt and returns its result. */
2852 static int lod_ref_add(const struct lu_env *env,
2853 struct dt_object *dt, struct thandle *th)
2855 return dt_ref_add(env, dt_object_child(dt), th);
/* Declare a reference decrement: forwards to dt_declare_ref_del() on the
 * child object of dt and returns its result. */
2858 static int lod_declare_ref_del(const struct lu_env *env,
2859 struct dt_object *dt, struct thandle *th)
2861 return dt_declare_ref_del(env, dt_object_child(dt), th);
/* Decrement the reference count: forwards to dt_ref_del() on the child
 * object of dt and returns its result. */
2864 static int lod_ref_del(const struct lu_env *env,
2865 struct dt_object *dt, struct thandle *th)
2867 return dt_ref_del(env, dt_object_child(dt), th);
/* Obtain a capability: forwards old and opc to dt_capa_get() on the child
 * object of dt and returns whatever that call returns. */
2870 static struct obd_capa *lod_capa_get(const struct lu_env *env,
2871 struct dt_object *dt,
2872 struct lustre_capa *old, __u64 opc)
2874 return dt_capa_get(env, dt_object_child(dt), old, opc);
/* Sync the range [start, end] as given by the caller: forwards to
 * dt_object_sync() on the child object of dt and returns its result.
 * NOTE(review): whether end is inclusive is decided by the child; confirm. */
2877 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
2878 __u64 start, __u64 end)
2880 return dt_object_sync(env, dt_object_child(dt), start, end);
/*
 * Holder for the lock handles taken on the stripes of a striped directory.
 *
 * lsl_handle is a zero-length trailing array; lod_object_lock() allocates
 * the structure as sizeof(*slave_locks) plus one lustre_handle per stripe
 * and indexes the array by stripe number.
 *
 * NOTE(review): the lsl_lock_count member used by the lock and unlock
 * paths is declared on a line not visible in this excerpt.
 */
2883 struct lod_slave_locks {
2885 struct lustre_handle lsl_handle[0];
/*
 * Release the stripe locks recorded in einfo->ei_cbdata.
 *
 * The ei_cbdata pointer is read as a struct lod_slave_locks. Slots 1 up to
 * lsl_lock_count - 1 are visited; for each handle that
 * lustre_handle_is_used() reports as used, ei_cbdata is temporarily pointed
 * at that handle and dt_object_unlock() is called on the matching stripe.
 * The first non-zero result is kept in rc while later stripes are still
 * processed.
 *
 * Side effect: einfo->ei_cbdata is overwritten inside the loop and is not
 * restored here; both visible callers reset it afterwards.
 *
 * NOTE(review): the rc, rc1 and i declarations, the statement guarded by
 * the NULL check and the final return are not visible in this excerpt.
 */
2888 static int lod_object_unlock_internal(const struct lu_env *env,
2889 struct dt_object *dt,
2890 struct ldlm_enqueue_info *einfo,
2891 ldlm_policy_data_t *policy)
2893 struct lod_object *lo = lod_dt_obj(dt);
2894 struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
2899 if (slave_locks == NULL)
/* Slot 0 is skipped: the loop starts at index 1. */
2902 for (i = 1; i < slave_locks->lsl_lock_count; i++) {
2903 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
2906 einfo->ei_cbdata = &slave_locks->lsl_handle[i];
2907 rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
/* Keep the first error seen; later results do not replace it. */
2910 rc = rc == 0 ? rc1 : rc;
/*
 * Unlock the stripes of a striped directory and free the handle array.
 *
 * Visible steps: checks for a NULL ei_cbdata and for a non-directory mode,
 * loads the striping, checks for the case of at most one stripe on a remote
 * child, calls lod_object_unlock_internal(), then frees the
 * lod_slave_locks allocation (size recomputed from lsl_lock_count) and
 * clears einfo->ei_cbdata.
 *
 * NOTE(review): the statements guarded by each of the visible if-checks,
 * the rc declaration and the final return are not visible in this excerpt.
 */
2917 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
2918 struct ldlm_enqueue_info *einfo,
2919 union ldlm_policy_data *policy)
2921 struct lod_object *lo = lod_dt_obj(dt);
2922 struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
2923 int slave_locks_size;
2927 if (slave_locks == NULL)
2930 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2933 rc = lod_load_striping(env, lo);
2937 /* Note: for remote lock for single stripe dir, MDT will cancel
2938 * the lock by lockh directly */
2939 if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
2942 /* Only cancel slave lock for striped dir */
2943 rc = lod_object_unlock_internal(env, dt, einfo, policy);
/* Same size formula as the allocation in lod_object_lock(). */
2945 slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
2946 sizeof(slave_locks->lsl_handle[0]);
2947 OBD_FREE(slave_locks, slave_locks_size);
2948 einfo->ei_cbdata = NULL;
/*
 * Lock dt, taking one lock per stripe for a striped directory.
 *
 * When einfo->ei_enq_slave is not set, dt is asserted to be remote and the
 * request is forwarded to dt_object_lock() on the child object. Otherwise
 * the visible code checks the directory mode, loads the striping, checks
 * for at most one stripe, allocates a lod_slave_locks sized for
 * ldo_stripenr handles, and locks stripes 1 up to ldo_stripenr - 1 using a
 * resource id built from each stripe FID in the per-thread lti_res_id
 * buffer. Each obtained handle is stored at the stripe index; slot 0 is
 * not written by the visible loop. After the loop the array is published
 * through einfo->ei_cbdata. If rc is non-zero while the array is
 * allocated, the locks taken so far are released through
 * lod_object_unlock_internal(), the array is freed and ei_cbdata is
 * cleared.
 *
 * NOTE(review): the rc and i declarations, the statements guarded by the
 * visible if-checks, the trailing call arguments, the loop error check and
 * the final return are not visible in this excerpt.
 */
2953 static int lod_object_lock(const struct lu_env *env,
2954 struct dt_object *dt,
2955 struct lustre_handle *lh,
2956 struct ldlm_enqueue_info *einfo,
2957 union ldlm_policy_data *policy)
2959 struct lod_object *lo = lod_dt_obj(dt);
2962 int slave_locks_size;
2963 struct lod_slave_locks *slave_locks = NULL;
2966 /* remote object lock */
2967 if (!einfo->ei_enq_slave) {
2968 LASSERT(dt_object_remote(dt));
2969 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
2973 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2976 rc = lod_load_striping(env, lo);
2981 if (lo->ldo_stripenr <= 1)
2984 slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
2985 sizeof(slave_locks->lsl_handle[0]);
2986 /* Freed in lod_object_unlock */
2987 OBD_ALLOC(slave_locks, slave_locks_size);
2988 if (slave_locks == NULL)
2990 slave_locks->lsl_lock_count = lo->ldo_stripenr;
2992 /* striped directory lock */
2993 for (i = 1; i < lo->ldo_stripenr; i++) {
2994 struct lustre_handle lockh;
2995 struct ldlm_res_id *res_id;
2997 res_id = &lod_env_info(env)->lti_res_id;
/* NOTE(review): ldo_stripe[i] is dereferenced here, before the LASSERT
 * on the same pointer a few lines below. */
2998 fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
3000 einfo->ei_res_id = res_id;
3002 LASSERT(lo->ldo_stripe[i]);
3003 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
3007 slave_locks->lsl_handle[i] = lockh;
3010 einfo->ei_cbdata = slave_locks;
/* Failure path: ei_cbdata is set so the helper can find the handles. */
3013 if (rc != 0 && slave_locks != NULL) {
3014 einfo->ei_cbdata = slave_locks;
3015 lod_object_unlock_internal(env, dt, einfo, policy);
3016 OBD_FREE(slave_locks, slave_locks_size);
3017 einfo->ei_cbdata = NULL;
/* Object operation table for LOD objects: each dt_object_operations slot
 * is bound to the lod_* handler of the matching name. Handlers not defined
 * in this part of the file are defined elsewhere in it. */
3023 struct dt_object_operations lod_obj_ops = {
3024 .do_read_lock = lod_object_read_lock,
3025 .do_write_lock = lod_object_write_lock,
3026 .do_read_unlock = lod_object_read_unlock,
3027 .do_write_unlock = lod_object_write_unlock,
3028 .do_write_locked = lod_object_write_locked,
3029 .do_attr_get = lod_attr_get,
3030 .do_declare_attr_set = lod_declare_attr_set,
3031 .do_attr_set = lod_attr_set,
3032 .do_xattr_get = lod_xattr_get,
3033 .do_declare_xattr_set = lod_declare_xattr_set,
3034 .do_xattr_set = lod_xattr_set,
3035 .do_declare_xattr_del = lod_declare_xattr_del,
3036 .do_xattr_del = lod_xattr_del,
3037 .do_xattr_list = lod_xattr_list,
3038 .do_ah_init = lod_ah_init,
3039 .do_declare_create = lod_declare_object_create,
3040 .do_create = lod_object_create,
3041 .do_declare_destroy = lod_declare_object_destroy,
3042 .do_destroy = lod_object_destroy,
3043 .do_index_try = lod_index_try,
3044 .do_declare_ref_add = lod_declare_ref_add,
3045 .do_ref_add = lod_ref_add,
3046 .do_declare_ref_del = lod_declare_ref_del,
3047 .do_ref_del = lod_ref_del,
3048 .do_capa_get = lod_capa_get,
3049 .do_object_sync = lod_object_sync,
3050 .do_object_lock = lod_object_lock,
3051 .do_object_unlock = lod_object_unlock,
/* Read from the body of dt: forwards buf, pos and capa to the dbo_read
 * method of the child object and returns its result. The child is assumed
 * to have do_body_ops set; no check is made here. */
3054 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
3055 struct lu_buf *buf, loff_t *pos,
3056 struct lustre_capa *capa)
3058 struct dt_object *next = dt_object_child(dt);
3059 return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
/* Declare a body write: forwards to dt_declare_record_write() on the child
 * object of dt.
 * NOTE(review): the last parameter line and the trailing call arguments
 * are not visible in this excerpt. */
3062 static ssize_t lod_declare_write(const struct lu_env *env,
3063 struct dt_object *dt,
3064 const struct lu_buf *buf, loff_t pos,
3067 return dt_declare_record_write(env, dt_object_child(dt),
/* Write to the body of dt: forwards every argument to the dbo_write method
 * of the child object and returns its result. The child is assumed to have
 * do_body_ops set; no check is made here. */
3071 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
3072 const struct lu_buf *buf, loff_t *pos,
3073 struct thandle *th, struct lustre_capa *capa, int iq)
3075 struct dt_object *next = dt_object_child(dt);
3077 return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
/* Body operation table that passes read, declare_write and write through
 * to the child object. It is installed on symlink objects by
 * lod_declare_object_create() and lod_object_start(). */
3080 static const struct dt_body_operations lod_body_lnk_ops = {
3081 .dbo_read = lod_read,
3082 .dbo_declare_write = lod_declare_write,
3083 .dbo_write = lod_write
/*
 * Initialise the LOD slice of an object by attaching the lower-layer slice.
 *
 * lod_fld_lookup() resolves the object FID to a target index and a
 * sequence-range type. An MDT range whose index equals the local node id
 * selects the local child device; any other MDT range selects the MDT
 * target table and an OST range selects the OST target table. For a table
 * lookup the index is checked against ltd_tgts_size and the target bitmap
 * before the device of the matching target is taken, and lod_putref() is
 * called on the table. The selected device then allocates the lower object
 * through ldo_object_alloc() and it is linked with lu_object_add().
 *
 * NOTE(review): the idx and rc declarations, the reference acquisition
 * that pairs with lod_putref(), the fallback for other range types and the
 * statements guarded by the two unlikely() checks are not visible in this
 * excerpt.
 */
3086 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
3087 const struct lu_object_conf *conf)
3089 struct lod_device *lod = lu2lod_dev(lo->lo_dev);
3090 struct lu_device *cdev = NULL;
3091 struct lu_object *cobj;
3092 struct lod_tgt_descs *ltd = NULL;
3093 struct lod_tgt_desc *tgt;
3095 int type = LU_SEQ_RANGE_ANY;
3099 rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
3103 if (type == LU_SEQ_RANGE_MDT &&
3104 idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
3105 cdev = &lod->lod_child->dd_lu_dev;
3106 } else if (type == LU_SEQ_RANGE_MDT) {
3107 ltd = &lod->lod_mdt_descs;
3109 } else if (type == LU_SEQ_RANGE_OST) {
3110 ltd = &lod->lod_ost_descs;
/* cdev stays NULL when the index is out of range or not set in the
 * bitmap; that case is caught by the cdev == NULL check below. */
3117 if (ltd->ltd_tgts_size > idx &&
3118 cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
3119 tgt = LTD_TGT(ltd, idx);
3121 LASSERT(tgt != NULL);
3122 LASSERT(tgt->ltd_tgt != NULL);
3124 cdev = &(tgt->ltd_tgt->dd_lu_dev);
3126 lod_putref(lod, ltd);
3129 if (unlikely(cdev == NULL))
3132 cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
3133 if (unlikely(cobj == NULL))
3136 lu_object_add(lo, cobj);
/*
 * Drop all striping state held by lo.
 *
 * Frees ldo_dir_stripe when set. When ldo_stripe is set, puts the
 * lu_object of every non-NULL entry among the first ldo_stripenr slots,
 * frees the pointer array using ldo_stripes_allocated for its size, and
 * clears both the pointer and the allocated count. ldo_stripenr and
 * ldo_pattern are reset to 0.
 *
 * NOTE(review): the declaration of i and the exact nesting of the last two
 * resets are not visible in this excerpt.
 */
3141 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
3145 if (lo->ldo_dir_stripe != NULL) {
3146 OBD_FREE_PTR(lo->ldo_dir_stripe);
3147 lo->ldo_dir_stripe = NULL;
3150 if (lo->ldo_stripe) {
3151 LASSERT(lo->ldo_stripes_allocated > 0);
3153 for (i = 0; i < lo->ldo_stripenr; i++) {
3154 if (lo->ldo_stripe[i])
3155 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
/* i is reused here as the byte size of the pointer array. */
3158 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
3159 OBD_FREE(lo->ldo_stripe, i);
3160 lo->ldo_stripe = NULL;
3161 lo->ldo_stripes_allocated = 0;
3163 lo->ldo_stripenr = 0;
3164 lo->ldo_pattern = 0;
3168 * ->start is called once all slices are initialized, including header's
3169 * cache for mode (object type). using the type we can initialize ops
/* Start hook for the LOD slice: when the header mode (masked with S_IFMT)
 * is a symlink, the object body operations are set to lod_body_lnk_ops.
 * NOTE(review): the return statement is not visible in this excerpt. */
3171 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
3173 if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
3174 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
/* Free the LOD slice: releases the striping state through
 * lod_object_free_striping(), clears the pool through
 * lod_object_set_pool(mo, NULL), and returns the object to the
 * lod_object_kmem slab cache.
 * NOTE(review): statements on the lines missing between these calls are
 * not visible in this excerpt. */
3178 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
3180 struct lod_object *mo = lu2lod_obj(o);
3183 * release all underlying object pinned
3186 lod_object_free_striping(env, mo);
3188 lod_object_set_pool(mo, NULL);
3191 OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
/* Release hook for the LOD slice. No statement is visible in the body;
 * the existing comment records an open question about cleanup after a
 * failed creation. */
3194 static void lod_object_release(const struct lu_env *env, struct lu_object *o)
3196 /* XXX: shouldn't we release everything here in case if object
3197 * creation failed before? */
/* Print hook for the LOD slice: emits LUSTRE_LOD_NAME followed by
 * -object@ and the address of the lod_object through the printer callback
 * p, and returns the value returned by that callback. The const qualifier
 * on l is cast away only to reuse lu2lod_obj(). */
3200 static int lod_object_print(const struct lu_env *env, void *cookie,
3201 lu_printer_t p, const struct lu_object *l)
3203 struct lod_object *o = lu2lod_obj((struct lu_object *) l);
3205 return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
3208 struct lu_object_operations lod_lu_obj_ops = {
3209 .loo_object_init = lod_object_init,
3210 .loo_object_start = lod_object_start,
3211 .loo_object_free = lod_object_free,
3212 .loo_object_release = lod_object_release,
3213 .loo_object_print = lod_object_print,