4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright 2009 Sun Microsystems, Inc. All rights reserved
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2013, Intel Corporation.
29 * lustre/lod/lod_object.c
31 * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
34 #define DEBUG_SUBSYSTEM S_MDS
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46 #include <md_object.h>
48 #include "lod_internal.h"
50 static const char dot[] = ".";
51 static const char dotdot[] = "..";
53 extern struct kmem_cache *lod_object_kmem;
54 static const struct dt_body_operations lod_body_lnk_ops;
56 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
57 struct dt_rec *rec, const struct dt_key *key,
58 struct lustre_capa *capa)
60 struct dt_object *next = dt_object_child(dt);
61 return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
64 static int lod_declare_index_insert(const struct lu_env *env,
66 const struct dt_rec *rec,
67 const struct dt_key *key,
68 struct thandle *handle)
70 return dt_declare_insert(env, dt_object_child(dt), rec, key, handle);
73 static int lod_index_insert(const struct lu_env *env,
75 const struct dt_rec *rec,
76 const struct dt_key *key,
78 struct lustre_capa *capa,
81 return dt_insert(env, dt_object_child(dt), rec, key, th, capa, ign);
84 static int lod_declare_index_delete(const struct lu_env *env,
86 const struct dt_key *key,
89 return dt_declare_delete(env, dt_object_child(dt), key, th);
92 static int lod_index_delete(const struct lu_env *env,
94 const struct dt_key *key,
96 struct lustre_capa *capa)
98 return dt_delete(env, dt_object_child(dt), key, th, capa);
101 static struct dt_it *lod_it_init(const struct lu_env *env,
102 struct dt_object *dt, __u32 attr,
103 struct lustre_capa *capa)
105 struct dt_object *next = dt_object_child(dt);
106 struct lod_it *it = &lod_env_info(env)->lti_it;
107 struct dt_it *it_next;
110 it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
114 /* currently we do not use more than one iterator per thread
115 * so we store it in thread info. if at some point we need
116 * more active iterators in a single thread, we can allocate
118 LASSERT(it->lit_obj == NULL);
120 it->lit_it = it_next;
123 return (struct dt_it *)it;
126 #define LOD_CHECK_IT(env, it) \
128 LASSERT((it)->lit_obj != NULL); \
129 LASSERT((it)->lit_it != NULL); \
132 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
134 struct lod_it *it = (struct lod_it *)di;
136 LOD_CHECK_IT(env, it);
137 it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
139 /* the iterator not in use any more */
144 int lod_it_get(const struct lu_env *env, struct dt_it *di,
145 const struct dt_key *key)
147 const struct lod_it *it = (const struct lod_it *)di;
149 LOD_CHECK_IT(env, it);
150 return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
153 void lod_it_put(const struct lu_env *env, struct dt_it *di)
155 struct lod_it *it = (struct lod_it *)di;
157 LOD_CHECK_IT(env, it);
158 return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
161 int lod_it_next(const struct lu_env *env, struct dt_it *di)
163 struct lod_it *it = (struct lod_it *)di;
165 LOD_CHECK_IT(env, it);
166 return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
169 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
171 const struct lod_it *it = (const struct lod_it *)di;
173 LOD_CHECK_IT(env, it);
174 return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
177 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
179 struct lod_it *it = (struct lod_it *)di;
181 LOD_CHECK_IT(env, it);
182 return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
185 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
186 struct dt_rec *rec, __u32 attr)
188 const struct lod_it *it = (const struct lod_it *)di;
190 LOD_CHECK_IT(env, it);
191 return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec,
195 int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di,
198 const struct lod_it *it = (const struct lod_it *)di;
200 LOD_CHECK_IT(env, it);
201 return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it,
205 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
207 const struct lod_it *it = (const struct lod_it *)di;
209 LOD_CHECK_IT(env, it);
210 return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
213 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
215 const struct lod_it *it = (const struct lod_it *)di;
217 LOD_CHECK_IT(env, it);
218 return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
221 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
224 const struct lod_it *it = (const struct lod_it *)di;
226 LOD_CHECK_IT(env, it);
227 return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it,
231 static struct dt_index_operations lod_index_ops = {
232 .dio_lookup = lod_index_lookup,
233 .dio_declare_insert = lod_declare_index_insert,
234 .dio_insert = lod_index_insert,
235 .dio_declare_delete = lod_declare_index_delete,
236 .dio_delete = lod_index_delete,
244 .key_size = lod_it_key_size,
246 .rec_size = lod_it_rec_size,
247 .store = lod_it_store,
249 .key_rec = lod_it_key_rec,
254 * Implementation of dt_index_operations:: dio_it.init
256 * This function is to initialize the iterator for striped directory,
257 * basically these lod_striped_it_xxx will just locate the stripe
258 * and call the correspondent api of its next lower layer.
260 * \param[in] env execution environment.
261 * \param[in] dt the striped directory object to be iterated.
262 * \param[in] attr the attribute of iterator, mostly used to indicate
263 * the entry attribute in the object to be iterated.
264 * \param[in] capa capability(useless in current implementation)
266 * \retval initialized iterator(dt_it) if successful initialize the
267 * iteration. lit_stripe_index will be used to indicate the
268 * current iterate position among stripes.
269 * \retval ERR pointer if initialization is failed.
271 static struct dt_it *lod_striped_it_init(const struct lu_env *env,
272 struct dt_object *dt, __u32 attr,
273 struct lustre_capa *capa)
275 struct lod_object *lo = lod_dt_obj(dt);
276 struct dt_object *next;
277 struct lod_it *it = &lod_env_info(env)->lti_it;
278 struct dt_it *it_next;
281 LASSERT(lo->ldo_stripenr > 0);
282 next = lo->ldo_stripe[0];
283 LASSERT(next != NULL);
284 LASSERT(next->do_index_ops != NULL);
286 it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
290 /* currently we do not use more than one iterator per thread
291 * so we store it in thread info. if at some point we need
292 * more active iterators in a single thread, we can allocate
294 LASSERT(it->lit_obj == NULL);
296 it->lit_stripe_index = 0;
298 it->lit_it = it_next;
301 return (struct dt_it *)it;
304 #define LOD_CHECK_STRIPED_IT(env, it, lo) \
306 LASSERT((it)->lit_obj != NULL); \
307 LASSERT((it)->lit_it != NULL); \
308 LASSERT((lo)->ldo_stripenr > 0); \
309 LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr); \
313 * Implementation of dt_index_operations:: dio_it.fini
315 * This function is to finish the iterator for striped directory.
317 * \param[in] env execution environment.
318 * \param[in] di the iterator for the striped directory
321 static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
323 struct lod_it *it = (struct lod_it *)di;
324 struct lod_object *lo = lod_dt_obj(it->lit_obj);
325 struct dt_object *next;
327 LOD_CHECK_STRIPED_IT(env, it, lo);
329 next = lo->ldo_stripe[it->lit_stripe_index];
330 LASSERT(next != NULL);
331 LASSERT(next->do_index_ops != NULL);
333 next->do_index_ops->dio_it.fini(env, it->lit_it);
335 /* the iterator not in use any more */
338 it->lit_stripe_index = 0;
342 * Implementation of dt_index_operations:: dio_it.get
344 * This function is to position the iterator with given key
346 * \param[in] env execution environment.
347 * \param[in] di the iterator for striped directory.
348 * \param[in] key the key the iterator will be positioned.
350 * \retval 0 if successfully position iterator by the key.
351 * \retval negative error if position is failed.
353 static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
354 const struct dt_key *key)
356 const struct lod_it *it = (const struct lod_it *)di;
357 struct lod_object *lo = lod_dt_obj(it->lit_obj);
358 struct dt_object *next;
361 LOD_CHECK_STRIPED_IT(env, it, lo);
363 next = lo->ldo_stripe[it->lit_stripe_index];
364 LASSERT(next != NULL);
365 LASSERT(next->do_index_ops != NULL);
367 return next->do_index_ops->dio_it.get(env, it->lit_it, key);
371 * Implementation of dt_index_operations:: dio_it.put
373 * This function is supposed to be the pair of it_get, but currently do
374 * nothing. see (osd_it_ea_put or osd_index_it_put)
376 static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
378 struct lod_it *it = (struct lod_it *)di;
379 struct lod_object *lo = lod_dt_obj(it->lit_obj);
380 struct dt_object *next;
382 LOD_CHECK_STRIPED_IT(env, it, lo);
384 next = lo->ldo_stripe[it->lit_stripe_index];
385 LASSERT(next != NULL);
386 LASSERT(next->do_index_ops != NULL);
388 return next->do_index_ops->dio_it.put(env, it->lit_it);
392 * Implementation of dt_index_operations:: dio_it.next
394 * This function is to position the iterator to the next entry, if current
395 * stripe is finished by checking the return value of next() in current
396 * stripe. it will go to next stripe. In the mean time, the sub-iterator
397 * for next stripe needs to be initialized.
399 * \param[in] env execution environment.
400 * \param[in] di the iterator for striped directory.
402 * \retval 0 if successfully position iterator to the next entry.
403 * \retval negative error if position is failed.
405 static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
407 struct lod_it *it = (struct lod_it *)di;
408 struct lod_object *lo = lod_dt_obj(it->lit_obj);
409 struct dt_object *next;
410 struct dt_it *it_next;
414 LOD_CHECK_STRIPED_IT(env, it, lo);
416 next = lo->ldo_stripe[it->lit_stripe_index];
417 LASSERT(next != NULL);
418 LASSERT(next->do_index_ops != NULL);
420 rc = next->do_index_ops->dio_it.next(env, it->lit_it);
424 if (rc == 0 && it->lit_stripe_index == 0)
427 if (rc == 0 && it->lit_stripe_index > 0) {
428 struct lu_dirent *ent;
430 ent = (struct lu_dirent *)lod_env_info(env)->lti_key;
432 rc = next->do_index_ops->dio_it.rec(env, it->lit_it,
433 (struct dt_rec *)ent,
438 /* skip . and .. for slave stripe */
439 if ((strncmp(ent->lde_name, ".",
440 le16_to_cpu(ent->lde_namelen)) == 0 &&
441 le16_to_cpu(ent->lde_namelen) == 1) ||
442 (strncmp(ent->lde_name, "..",
443 le16_to_cpu(ent->lde_namelen)) == 0 &&
444 le16_to_cpu(ent->lde_namelen) == 2))
450 /* go to next stripe */
451 if (it->lit_stripe_index + 1 >= lo->ldo_stripenr)
454 it->lit_stripe_index++;
456 next->do_index_ops->dio_it.put(env, it->lit_it);
457 next->do_index_ops->dio_it.fini(env, it->lit_it);
459 rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
463 next = lo->ldo_stripe[it->lit_stripe_index];
464 LASSERT(next != NULL);
465 LASSERT(next->do_index_ops != NULL);
467 it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr,
469 if (!IS_ERR(it_next)) {
470 it->lit_it = it_next;
473 rc = PTR_ERR(it_next);
480 * Implementation of dt_index_operations:: dio_it.key
482 * This function is to get the key of the iterator at current position.
484 * \param[in] env execution environment.
485 * \param[in] di the iterator for striped directory.
487 * \retval key(dt_key) if successfully get the key.
488 * \retval negative error if can not get the key.
490 static struct dt_key *lod_striped_it_key(const struct lu_env *env,
491 const struct dt_it *di)
493 const struct lod_it *it = (const struct lod_it *)di;
494 struct lod_object *lo = lod_dt_obj(it->lit_obj);
495 struct dt_object *next;
497 LOD_CHECK_STRIPED_IT(env, it, lo);
499 next = lo->ldo_stripe[it->lit_stripe_index];
500 LASSERT(next != NULL);
501 LASSERT(next->do_index_ops != NULL);
503 return next->do_index_ops->dio_it.key(env, it->lit_it);
507 * Implementation of dt_index_operations:: dio_it.key_size
509 * This function is to get the key_size of current key.
511 * \param[in] env execution environment.
512 * \param[in] di the iterator for striped directory.
514 * \retval key_size if successfully get the key_size.
515 * \retval negative error if can not get the key_size.
517 static int lod_striped_it_key_size(const struct lu_env *env,
518 const struct dt_it *di)
520 struct lod_it *it = (struct lod_it *)di;
521 struct lod_object *lo = lod_dt_obj(it->lit_obj);
522 struct dt_object *next;
524 LOD_CHECK_STRIPED_IT(env, it, lo);
526 next = lo->ldo_stripe[it->lit_stripe_index];
527 LASSERT(next != NULL);
528 LASSERT(next->do_index_ops != NULL);
530 return next->do_index_ops->dio_it.key_size(env, it->lit_it);
534 * Implementation of dt_index_operations:: dio_it.rec
536 * This function is to get the record at current position.
538 * \param[in] env execution environment.
539 * \param[in] di the iterator for striped directory.
540 * \param[in] attr the attribute of iterator, mostly used to indicate
541 * the entry attribute in the object to be iterated.
542 * \param[out] rec hold the return record.
544 * \retval 0 if successfully get the entry.
545 * \retval negative error if can not get entry.
547 static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
548 struct dt_rec *rec, __u32 attr)
550 const struct lod_it *it = (const struct lod_it *)di;
551 struct lod_object *lo = lod_dt_obj(it->lit_obj);
552 struct dt_object *next;
554 LOD_CHECK_STRIPED_IT(env, it, lo);
556 next = lo->ldo_stripe[it->lit_stripe_index];
557 LASSERT(next != NULL);
558 LASSERT(next->do_index_ops != NULL);
560 return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
564 * Implementation of dt_index_operations:: dio_it.rec_size
566 * This function is to get the record_size at current record.
568 * \param[in] env execution environment.
569 * \param[in] di the iterator for striped directory.
570 * \param[in] attr the attribute of iterator, mostly used to indicate
571 * the entry attribute in the object to be iterated.
573 * \retval rec_size if successfully get the entry size.
574 * \retval negative error if can not get entry size.
576 static int lod_striped_it_rec_size(const struct lu_env *env,
577 const struct dt_it *di, __u32 attr)
579 struct lod_it *it = (struct lod_it *)di;
580 struct lod_object *lo = lod_dt_obj(it->lit_obj);
581 struct dt_object *next;
583 LOD_CHECK_STRIPED_IT(env, it, lo);
585 next = lo->ldo_stripe[it->lit_stripe_index];
586 LASSERT(next != NULL);
587 LASSERT(next->do_index_ops != NULL);
589 return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
593 * Implementation of dt_index_operations:: dio_it.store
595 * This function will a cookie for current position of the iterator head,
596 * so that user can use this cookie to load/start the iterator next time.
598 * \param[in] env execution environment.
599 * \param[in] di the iterator for striped directory.
601 * \retval the cookie.
603 static __u64 lod_striped_it_store(const struct lu_env *env,
604 const struct dt_it *di)
606 const struct lod_it *it = (const struct lod_it *)di;
607 struct lod_object *lo = lod_dt_obj(it->lit_obj);
608 struct dt_object *next;
610 LOD_CHECK_STRIPED_IT(env, it, lo);
612 next = lo->ldo_stripe[it->lit_stripe_index];
613 LASSERT(next != NULL);
614 LASSERT(next->do_index_ops != NULL);
616 return next->do_index_ops->dio_it.store(env, it->lit_it);
620 * Implementation of dt_index_operations:: dio_it.load
622 * This function will position the iterator with the given hash(usually
625 * \param[in] env execution environment.
626 * \param[in] di the iterator for striped directory.
627 * \param[in] hash the given hash.
629 * \retval >0 if successfuly load the iterator to the given position.
630 * \retval <0 if load is failed.
632 static int lod_striped_it_load(const struct lu_env *env,
633 const struct dt_it *di, __u64 hash)
635 const struct lod_it *it = (const struct lod_it *)di;
636 struct lod_object *lo = lod_dt_obj(it->lit_obj);
637 struct dt_object *next;
639 LOD_CHECK_STRIPED_IT(env, it, lo);
641 next = lo->ldo_stripe[it->lit_stripe_index];
642 LASSERT(next != NULL);
643 LASSERT(next->do_index_ops != NULL);
645 return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
648 static struct dt_index_operations lod_striped_index_ops = {
649 .dio_lookup = lod_index_lookup,
650 .dio_declare_insert = lod_declare_index_insert,
651 .dio_insert = lod_index_insert,
652 .dio_declare_delete = lod_declare_index_delete,
653 .dio_delete = lod_index_delete,
655 .init = lod_striped_it_init,
656 .fini = lod_striped_it_fini,
657 .get = lod_striped_it_get,
658 .put = lod_striped_it_put,
659 .next = lod_striped_it_next,
660 .key = lod_striped_it_key,
661 .key_size = lod_striped_it_key_size,
662 .rec = lod_striped_it_rec,
663 .rec_size = lod_striped_it_rec_size,
664 .store = lod_striped_it_store,
665 .load = lod_striped_it_load,
670 * Implementation of dt_object_operations:: do_index_try
672 * This function will try to initialize the index api pointer for the
673 * given object, usually it the entry point of the index api. i.e.
674 * the index object should be initialized in index_try, then start
675 * using index api. For striped directory, it will try to initialize
676 * all of its sub_stripes.
678 * \param[in] env execution environment.
679 * \param[in] dt the index object to be initialized.
680 * \param[in] feat the features of this object, for example fixed or
681 * variable key size etc.
683 * \retval >0 if the initialization is successful.
684 * \retval <0 if the initialization is failed.
686 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
687 const struct dt_index_features *feat)
689 struct lod_object *lo = lod_dt_obj(dt);
690 struct dt_object *next = dt_object_child(dt);
694 LASSERT(next->do_ops);
695 LASSERT(next->do_ops->do_index_try);
697 rc = lod_load_striping_locked(env, lo);
701 rc = next->do_ops->do_index_try(env, next, feat);
705 if (lo->ldo_stripenr > 0) {
708 for (i = 0; i < lo->ldo_stripenr; i++) {
709 if (dt_object_exists(lo->ldo_stripe[i]) == 0)
711 rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
712 lo->ldo_stripe[i], feat);
716 dt->do_index_ops = &lod_striped_index_ops;
718 dt->do_index_ops = &lod_index_ops;
724 static void lod_object_read_lock(const struct lu_env *env,
725 struct dt_object *dt, unsigned role)
727 dt_read_lock(env, dt_object_child(dt), role);
730 static void lod_object_write_lock(const struct lu_env *env,
731 struct dt_object *dt, unsigned role)
733 dt_write_lock(env, dt_object_child(dt), role);
736 static void lod_object_read_unlock(const struct lu_env *env,
737 struct dt_object *dt)
739 dt_read_unlock(env, dt_object_child(dt));
742 static void lod_object_write_unlock(const struct lu_env *env,
743 struct dt_object *dt)
745 dt_write_unlock(env, dt_object_child(dt));
748 static int lod_object_write_locked(const struct lu_env *env,
749 struct dt_object *dt)
751 return dt_write_locked(env, dt_object_child(dt));
754 static int lod_attr_get(const struct lu_env *env,
755 struct dt_object *dt,
756 struct lu_attr *attr,
757 struct lustre_capa *capa)
759 struct lod_object *lo = lod_dt_obj(dt);
764 rc = dt_attr_get(env, dt_object_child(dt), attr, capa);
765 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr) || rc != 0)
768 rc = lod_load_striping_locked(env, lo);
772 if (lo->ldo_stripenr == 0)
777 for (i = 0; i < lo->ldo_stripenr; i++) {
778 struct lu_attr *sub_attr = &lod_env_info(env)->lti_attr;
780 LASSERT(lo->ldo_stripe[i]);
781 if (dt_object_exists(lo->ldo_stripe[i]))
784 rc = dt_attr_get(env, lo->ldo_stripe[i], sub_attr, capa);
788 /* -2 for . and .. on each stripe */
789 if (sub_attr->la_valid & LA_NLINK && attr->la_valid & LA_NLINK)
790 attr->la_nlink += sub_attr->la_nlink - 2;
791 if (sub_attr->la_valid & LA_SIZE && attr->la_valid & LA_SIZE)
792 attr->la_size += sub_attr->la_size;
794 if (sub_attr->la_valid & LA_ATIME &&
795 attr->la_valid & LA_ATIME &&
796 attr->la_atime < sub_attr->la_atime)
797 attr->la_atime = sub_attr->la_atime;
799 if (sub_attr->la_valid & LA_CTIME &&
800 attr->la_valid & LA_CTIME &&
801 attr->la_ctime < sub_attr->la_ctime)
802 attr->la_ctime = sub_attr->la_ctime;
804 if (sub_attr->la_valid & LA_MTIME &&
805 attr->la_valid & LA_MTIME &&
806 attr->la_mtime < sub_attr->la_mtime)
807 attr->la_mtime = sub_attr->la_mtime;
810 CDEBUG(D_INFO, DFID" stripe_count %d nlink %u size "LPU64"\n",
811 PFID(lu_object_fid(&dt->do_lu)), lo->ldo_stripenr,
812 attr->la_nlink, attr->la_size);
818 * Mark all of sub-stripes dead of the striped directory.
820 static int lod_mark_dead_object(const struct lu_env *env,
821 struct dt_object *dt,
822 struct thandle *handle,
825 struct lod_object *lo = lod_dt_obj(dt);
826 struct lmv_mds_md_v1 *lmv;
827 __u32 dead_hash_type;
833 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
836 rc = lod_load_striping_locked(env, lo);
840 if (lo->ldo_stripenr == 0)
843 rc = lod_get_lmv_ea(env, lo);
847 lmv = lod_env_info(env)->lti_ea_store;
848 lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
849 dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD;
850 lmv->lmv_hash_type = cpu_to_le32(dead_hash_type);
851 for (i = 0; i < lo->ldo_stripenr; i++) {
854 lmv->lmv_master_mdt_index = i;
856 buf.lb_len = sizeof(*lmv);
858 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf,
860 LU_XATTR_REPLACE, handle);
862 rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf,
863 XATTR_NAME_LMV, LU_XATTR_REPLACE,
864 handle, BYPASS_CAPA);
873 static int lod_declare_attr_set(const struct lu_env *env,
874 struct dt_object *dt,
875 const struct lu_attr *attr,
876 struct thandle *handle)
878 struct dt_object *next = dt_object_child(dt);
879 struct lod_object *lo = lod_dt_obj(dt);
883 /* Set dead object on all other stripes */
884 if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
885 attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
886 rc = lod_mark_dead_object(env, dt, handle, true);
891 * declare setattr on the local object
893 rc = dt_declare_attr_set(env, next, attr, handle);
897 /* osp_declare_attr_set() ignores all attributes other than
898 * UID, GID, and size, and osp_attr_set() ignores all but UID
899 * and GID. Declaration of size attr setting happens through
900 * lod_declare_init_size(), and not through this function.
901 * Therefore we need not load striping unless ownership is
902 * changing. This should save memory and (we hope) speed up
904 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
905 if (!(attr->la_valid & (LA_UID | LA_GID)))
908 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
911 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
912 LA_ATIME | LA_MTIME | LA_CTIME)))
916 * load striping information, notice we don't do this when object
917 * is being initialized as we don't need this information till
918 * few specific cases like destroy, chown
920 rc = lod_load_striping(env, lo);
924 if (lo->ldo_stripenr == 0)
928 * if object is striped declare changes on the stripes
930 LASSERT(lo->ldo_stripe);
931 for (i = 0; i < lo->ldo_stripenr; i++) {
932 if (likely(lo->ldo_stripe[i] != NULL)) {
933 rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr,
936 CERROR("failed declaration: %d\n", rc);
942 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
943 dt_object_exists(next) != 0 &&
944 dt_object_remote(next) == 0)
945 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
947 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
948 dt_object_exists(next) &&
949 dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
950 struct lod_thread_info *info = lod_env_info(env);
951 struct lu_buf *buf = &info->lti_buf;
953 buf->lb_buf = info->lti_ea_store;
954 buf->lb_len = info->lti_ea_store_size;
955 dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
956 LU_XATTR_REPLACE, handle);
962 static int lod_attr_set(const struct lu_env *env,
963 struct dt_object *dt,
964 const struct lu_attr *attr,
965 struct thandle *handle,
966 struct lustre_capa *capa)
968 struct dt_object *next = dt_object_child(dt);
969 struct lod_object *lo = lod_dt_obj(dt);
973 /* Set dead object on all other stripes */
974 if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
975 attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
976 rc = lod_mark_dead_object(env, dt, handle, false);
981 * apply changes to the local object
983 rc = dt_attr_set(env, next, attr, handle, capa);
987 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
988 if (!(attr->la_valid & (LA_UID | LA_GID)))
991 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
994 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
995 LA_ATIME | LA_MTIME | LA_CTIME)))
999 if (lo->ldo_stripenr == 0)
1003 * if object is striped, apply changes to all the stripes
1005 LASSERT(lo->ldo_stripe);
1006 for (i = 0; i < lo->ldo_stripenr; i++) {
1007 if (likely(lo->ldo_stripe[i] != NULL)) {
1008 if (dt_object_exists(lo->ldo_stripe[i]) == 0)
1011 rc = dt_attr_set(env, lo->ldo_stripe[i], attr,
1014 CERROR("failed declaration: %d\n", rc);
1020 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1021 dt_object_exists(next) != 0 &&
1022 dt_object_remote(next) == 0)
1023 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
1025 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1026 dt_object_exists(next) &&
1027 dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1028 struct lod_thread_info *info = lod_env_info(env);
1029 struct lu_buf *buf = &info->lti_buf;
1030 struct ost_id *oi = &info->lti_ostid;
1031 struct lu_fid *fid = &info->lti_fid;
1032 struct lov_mds_md_v1 *lmm;
1033 struct lov_ost_data_v1 *objs;
1037 rc1 = lod_get_lov_ea(env, lo);
1041 buf->lb_buf = info->lti_ea_store;
1042 buf->lb_len = info->lti_ea_store_size;
1043 lmm = info->lti_ea_store;
1044 magic = le32_to_cpu(lmm->lmm_magic);
1045 if (magic == LOV_MAGIC_V1)
1046 objs = &(lmm->lmm_objects[0]);
1048 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1049 ostid_le_to_cpu(&objs->l_ost_oi, oi);
1050 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
1052 fid_to_ostid(fid, oi);
1053 ostid_cpu_to_le(oi, &objs->l_ost_oi);
1054 dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
1055 LU_XATTR_REPLACE, handle, BYPASS_CAPA);
1061 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
1062 struct lu_buf *buf, const char *name,
1063 struct lustre_capa *capa)
1065 struct lod_thread_info *info = lod_env_info(env);
1066 struct lod_device *dev = lu2lod_dev(dt->do_lu.lo_dev);
1070 rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
1071 if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
1075 * lod returns default striping on the real root of the device
1076 * this is like the root stores default striping for the whole
1077 * filesystem. historically we've been using a different approach
1078 * and store it in the config.
1080 dt_root_get(env, dev->lod_child, &info->lti_fid);
1081 is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
1083 if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
1084 struct lov_user_md *lum = buf->lb_buf;
1085 struct lov_desc *desc = &dev->lod_desc;
1087 if (buf->lb_buf == NULL) {
1089 } else if (buf->lb_len >= sizeof(*lum)) {
1090 lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
1091 lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
1092 lmm_oi_set_id(&lum->lmm_oi, 0);
1093 lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
1094 lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
1095 lum->lmm_stripe_size = cpu_to_le32(
1096 desc->ld_default_stripe_size);
1097 lum->lmm_stripe_count = cpu_to_le16(
1098 desc->ld_default_stripe_count);
1099 lum->lmm_stripe_offset = cpu_to_le16(
1100 desc->ld_default_stripe_offset);
1110 static int lod_verify_md_striping(struct lod_device *lod,
1111 const struct lmv_user_md_v1 *lum)
1116 if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
1117 GOTO(out, rc = -EINVAL);
1119 if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
1120 GOTO(out, rc = -EINVAL);
1123 CERROR("%s: invalid lmv_user_md: magic = %x, "
1124 "stripe_offset = %d, stripe_count = %u: rc = %d\n",
1125 lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
1126 (int)le32_to_cpu(lum->lum_stripe_offset),
1127 le32_to_cpu(lum->lum_stripe_count), rc);
1132 * Master LMVEA will be same as slave LMVEA, except
1133 * 1. different magic
1134 * 2. No lmv_stripe_fids on slave
1135 * 3. lmv_master_mdt_index on slave LMV EA will be stripe_index.
1137 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
1138 const struct lmv_mds_md_v1 *master_lmv)
1140 *slave_lmv = *master_lmv;
1141 slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
1144 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
1145 struct lu_buf *lmv_buf)
1147 struct lod_thread_info *info = lod_env_info(env);
1148 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1149 struct lod_object *lo = lod_dt_obj(dt);
1150 struct lmv_mds_md_v1 *lmm1;
1153 int type = LU_SEQ_RANGE_ANY;
1159 LASSERT(lo->ldo_dir_striped != 0);
1160 LASSERT(lo->ldo_stripenr > 0);
1161 stripe_count = lo->ldo_stripenr;
1162 lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
1163 if (info->lti_ea_store_size < lmm_size) {
1164 rc = lod_ea_store_resize(info, lmm_size);
1169 lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
1170 lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
1171 lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
1172 lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
1173 rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
1178 lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
1179 fid_cpu_to_le(&lmm1->lmv_master_fid, lu_object_fid(&dt->do_lu));
1180 for (i = 0; i < lo->ldo_stripenr; i++) {
1181 struct dt_object *dto;
1183 dto = lo->ldo_stripe[i];
1184 LASSERT(dto != NULL);
1185 fid_cpu_to_le(&lmm1->lmv_stripe_fids[i],
1186 lu_object_fid(&dto->do_lu));
1189 lmv_buf->lb_buf = info->lti_ea_store;
1190 lmv_buf->lb_len = lmm_size;
1191 lo->ldo_dir_striping_cached = 1;
1196 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
1197 const struct lu_buf *buf)
1199 struct lod_thread_info *info = lod_env_info(env);
1200 struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1201 struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
1202 struct dt_object **stripe;
1203 union lmv_mds_md *lmm = buf->lb_buf;
1204 struct lmv_mds_md_v1 *lmv1 = &lmm->lmv_md_v1;
1205 struct lu_fid *fid = &info->lti_fid;
1210 if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
1213 if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
1214 lo->ldo_dir_slave_stripe = 1;
1218 if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
1221 if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1)
1224 LASSERT(lo->ldo_stripe == NULL);
1225 OBD_ALLOC(stripe, sizeof(stripe[0]) *
1226 (le32_to_cpu(lmv1->lmv_stripe_count)));
1230 for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
1231 struct dt_device *tgt_dt;
1232 struct dt_object *dto;
1233 int type = LU_SEQ_RANGE_ANY;
1236 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
1237 if (!fid_is_sane(fid))
1238 GOTO(out, rc = -ESTALE);
1240 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
1244 if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
1245 tgt_dt = lod->lod_child;
1247 struct lod_tgt_desc *tgt;
1249 tgt = LTD_TGT(ltd, idx);
1251 GOTO(out, rc = -ESTALE);
1252 tgt_dt = tgt->ltd_tgt;
1255 dto = dt_locate_at(env, tgt_dt, fid,
1256 lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1259 GOTO(out, rc = PTR_ERR(dto));
1264 lo->ldo_stripe = stripe;
1265 lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count);
1266 lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
1268 lod_object_free_striping(env, lo);
1273 static int lod_prep_md_striped_create(const struct lu_env *env,
1274 struct dt_object *dt,
1275 struct lu_attr *attr,
1276 const struct lmv_user_md_v1 *lum,
1277 struct dt_object_format *dof,
1280 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1281 struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
1282 struct lod_object *lo = lod_dt_obj(dt);
1283 struct lod_thread_info *info = lod_env_info(env);
1284 struct dt_object **stripe;
1285 struct lu_buf lmv_buf;
1286 struct lu_buf slave_lmv_buf;
1287 struct lmv_mds_md_v1 *lmm;
1288 struct lmv_mds_md_v1 *slave_lmm = NULL;
1296 /* The lum has been verifed in lod_verify_md_striping */
1297 LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
1298 LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
1300 stripe_count = le32_to_cpu(lum->lum_stripe_count);
1302 /* shrink the stripe_count to the avaible MDT count */
1303 if (stripe_count > lod->lod_remote_mdt_count + 1)
1304 stripe_count = lod->lod_remote_mdt_count + 1;
1306 OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
1310 OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
1311 if (idx_array == NULL)
1312 GOTO(out_free, rc = -ENOMEM);
1314 for (i = 0; i < stripe_count; i++) {
1315 struct lod_tgt_desc *tgt = NULL;
1316 struct dt_object *dto;
1317 struct lu_fid fid = { 0 };
1319 struct lu_object_conf conf = { 0 };
1320 struct dt_device *tgt_dt = NULL;
1323 /* Right now, master stripe and master object are
1324 * on the same MDT */
1325 idx = le32_to_cpu(lum->lum_stripe_offset);
1326 rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
1330 tgt_dt = lod->lod_child;
1334 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
1336 for (j = 0; j < lod->lod_remote_mdt_count;
1337 j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
1338 bool already_allocated = false;
1341 CDEBUG(D_INFO, "try idx %d, mdt cnt %d,"
1342 " allocated %d, last allocated %d\n", idx,
1343 lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1345 /* Find next available target */
1346 if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
1349 /* check whether the idx already exists
1350 * in current allocated array */
1351 for (k = 0; k < i; k++) {
1352 if (idx_array[k] == idx) {
1353 already_allocated = true;
1358 if (already_allocated)
1361 /* check the status of the OSP */
1362 tgt = LTD_TGT(ltd, idx);
1366 tgt_dt = tgt->ltd_tgt;
1367 rc = dt_statfs(env, tgt_dt, NULL);
1369 /* this OSP doesn't feel well */
1374 rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
1383 /* Can not allocate more stripes */
1384 if (j == lod->lod_remote_mdt_count) {
1385 CDEBUG(D_INFO, "%s: require stripes %d only get %d\n",
1386 lod2obd(lod)->obd_name, stripe_count, i - 1);
1390 CDEBUG(D_INFO, "idx %d, mdt cnt %d,"
1391 " allocated %d, last allocated %d\n", idx,
1392 lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1395 /* tgt_dt and fid must be ready after search avaible OSP
1396 * in the above loop */
1397 LASSERT(tgt_dt != NULL);
1398 LASSERT(fid_is_sane(&fid));
1399 conf.loc_flags = LOC_F_NEW;
1400 dto = dt_locate_at(env, tgt_dt, &fid,
1401 dt->do_lu.lo_dev->ld_site->ls_top_dev,
1404 GOTO(out_put, rc = PTR_ERR(dto));
1409 lo->ldo_dir_striped = 1;
1410 lo->ldo_stripe = stripe;
1411 lo->ldo_stripenr = i;
1412 lo->ldo_stripes_allocated = stripe_count;
1414 if (lo->ldo_stripenr == 0)
1415 GOTO(out_put, rc = -ENOSPC);
1417 rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1420 lmm = lmv_buf.lb_buf;
1422 OBD_ALLOC_PTR(slave_lmm);
1423 if (slave_lmm == NULL)
1424 GOTO(out_put, rc = -ENOMEM);
1426 lod_prep_slave_lmv_md(slave_lmm, lmm);
1427 slave_lmv_buf.lb_buf = slave_lmm;
1428 slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1430 if (!dt_try_as_dir(env, dt_object_child(dt)))
1431 GOTO(out_put, rc = -EINVAL);
1433 for (i = 0; i < lo->ldo_stripenr; i++) {
1434 struct dt_object *dto = stripe[i];
1435 char *stripe_name = info->lti_key;
1437 rc = dt_declare_create(env, dto, attr, NULL, dof, th);
1441 if (!dt_try_as_dir(env, dto))
1442 GOTO(out_put, rc = -EINVAL);
1444 rc = dt_declare_insert(env, dto,
1445 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1446 (const struct dt_key *)dot, th);
1450 /* master stripe FID will be put to .. */
1451 rc = dt_declare_insert(env, dto,
1452 (const struct dt_rec *)lu_object_fid(&dt->do_lu),
1453 (const struct dt_key *)dotdot, th);
1457 /* probably nothing to inherite */
1458 if (lo->ldo_striping_cached &&
1459 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1460 lo->ldo_def_stripenr,
1461 lo->ldo_def_stripe_offset)) {
1462 struct lov_user_md_v3 *v3;
1464 /* sigh, lti_ea_store has been used for lmv_buf,
1465 * so we have to allocate buffer for default
1469 GOTO(out_put, rc = -ENOMEM);
1471 memset(v3, 0, sizeof(*v3));
1472 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1473 v3->lmm_stripe_count =
1474 cpu_to_le16(lo->ldo_def_stripenr);
1475 v3->lmm_stripe_offset =
1476 cpu_to_le16(lo->ldo_def_stripe_offset);
1477 v3->lmm_stripe_size =
1478 cpu_to_le32(lo->ldo_def_stripe_size);
1480 strncpy(v3->lmm_pool_name, lo->ldo_pool,
1483 info->lti_buf.lb_buf = v3;
1484 info->lti_buf.lb_len = sizeof(*v3);
1485 rc = dt_declare_xattr_set(env, dto,
1494 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1495 rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf,
1496 XATTR_NAME_LMV, 0, th);
1500 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1501 PFID(lu_object_fid(&dto->do_lu)), i);
1502 rc = dt_declare_insert(env, dt_object_child(dt),
1503 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1504 (const struct dt_key *)stripe_name, th);
1508 rc = dt_declare_ref_add(env, dt_object_child(dt), th);
1513 rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
1514 XATTR_NAME_LMV, 0, th);
1520 for (i = 0; i < stripe_count; i++)
1521 if (stripe[i] != NULL)
1522 lu_object_put(env, &stripe[i]->do_lu);
1523 OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count);
1524 lo->ldo_stripenr = 0;
1525 lo->ldo_stripes_allocated = 0;
1526 lo->ldo_stripe = NULL;
1530 if (idx_array != NULL)
1531 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
1532 if (slave_lmm != NULL)
1533 OBD_FREE_PTR(slave_lmm);
1539 * Declare create striped md object.
1541 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
1542 struct dt_object *dt,
1543 struct lu_attr *attr,
1544 const struct lu_buf *lum_buf,
1545 struct dt_object_format *dof,
1548 struct lod_object *lo = lod_dt_obj(dt);
1549 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1550 struct lmv_user_md_v1 *lum;
1554 lum = lum_buf->lb_buf;
1555 LASSERT(lum != NULL);
1557 CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
1558 le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
1559 (int)le32_to_cpu(lum->lum_stripe_offset));
1561 if (le32_to_cpu(lum->lum_stripe_count) == 0)
1564 rc = lod_verify_md_striping(lod, lum);
1568 /* prepare dir striped objects */
1569 rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
1571 /* failed to create striping, let's reset
1572 * config so that others don't get confused */
1573 lod_object_free_striping(env, lo);
1580 static int lod_dir_declare_xattr_set(const struct lu_env *env,
1581 struct dt_object *dt,
1582 const struct lu_buf *buf,
1583 const char *name, int fl,
1586 struct dt_object *next = dt_object_child(dt);
1587 struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
1588 struct lod_object *lo = lod_dt_obj(dt);
1593 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1594 struct lmv_user_md_v1 *lum;
1596 LASSERT(buf != NULL && buf->lb_buf != NULL);
1598 rc = lod_verify_md_striping(d, lum);
1603 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1607 /* set xattr to each stripes, if needed */
1608 rc = lod_load_striping(env, lo);
1612 if (lo->ldo_stripenr == 0)
1615 for (i = 0; i < lo->ldo_stripenr; i++) {
1616 LASSERT(lo->ldo_stripe[i]);
1617 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
1627 * LOV xattr is a storage for striping, and LOD owns this xattr.
1628 * but LOD allows others to control striping to some extent
1629 * - to reset strping
1630 * - to set new defined striping
1631 * - to set new semi-defined striping
1632 * - number of stripes is defined
1633 * - number of stripes + osts are defined
1636 static int lod_declare_xattr_set(const struct lu_env *env,
1637 struct dt_object *dt,
1638 const struct lu_buf *buf,
1639 const char *name, int fl,
1642 struct dt_object *next = dt_object_child(dt);
1643 struct lu_attr *attr = &lod_env_info(env)->lti_attr;
1649 * allow to declare predefined striping on a new (!mode) object
1650 * which is supposed to be replay of regular file creation
1651 * (when LOV setting is declared)
1652 * LU_XATTR_REPLACE is set to indicate a layout swap
1654 mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
1655 if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
1656 !(fl & LU_XATTR_REPLACE)) {
1658 * this is a request to manipulate object's striping
1660 if (dt_object_exists(dt)) {
1661 rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1665 memset(attr, 0, sizeof(*attr));
1666 attr->la_valid = LA_TYPE | LA_MODE;
1667 attr->la_mode = S_IFREG;
1669 rc = lod_declare_striped_object(env, dt, attr, buf, th);
1670 } else if (S_ISDIR(mode)) {
1671 rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
1673 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1679 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
1681 lo->ldo_striping_cached = 0;
1682 lo->ldo_def_striping_set = 0;
1683 lod_object_set_pool(lo, NULL);
1684 lo->ldo_def_stripe_size = 0;
1685 lo->ldo_def_stripenr = 0;
1686 if (lo->ldo_dir_stripe != NULL)
1687 lo->ldo_dir_striping_cached = 0;
1690 static int lod_xattr_set_internal(const struct lu_env *env,
1691 struct dt_object *dt,
1692 const struct lu_buf *buf,
1693 const char *name, int fl, struct thandle *th,
1694 struct lustre_capa *capa)
1696 struct dt_object *next = dt_object_child(dt);
1697 struct lod_object *lo = lod_dt_obj(dt);
1702 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1703 if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1706 if (lo->ldo_stripenr == 0)
1709 for (i = 0; i < lo->ldo_stripenr; i++) {
1710 LASSERT(lo->ldo_stripe[i]);
1711 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
1720 static int lod_xattr_del_internal(const struct lu_env *env,
1721 struct dt_object *dt,
1722 const char *name, struct thandle *th,
1723 struct lustre_capa *capa)
1725 struct dt_object *next = dt_object_child(dt);
1726 struct lod_object *lo = lod_dt_obj(dt);
1731 rc = dt_xattr_del(env, next, name, th, capa);
1732 if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1735 if (lo->ldo_stripenr == 0)
1738 for (i = 0; i < lo->ldo_stripenr; i++) {
1739 LASSERT(lo->ldo_stripe[i]);
1740 rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th,
1749 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
1750 struct dt_object *dt,
1751 const struct lu_buf *buf,
1752 const char *name, int fl,
1754 struct lustre_capa *capa)
1756 struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
1757 struct lod_object *l = lod_dt_obj(dt);
1758 struct lov_user_md_v1 *lum;
1759 struct lov_user_md_v3 *v3 = NULL;
1763 /* If it is striped dir, we should clear the stripe cache for
1764 * slave stripe as well, but there are no effective way to
1765 * notify the LOD on the slave MDT, so we do not cache stripe
1766 * information for slave stripe for now. XXX*/
1767 lod_lov_stripe_cache_clear(l);
1768 LASSERT(buf != NULL && buf->lb_buf != NULL);
1771 rc = lod_verify_striping(d, buf, 0);
1775 if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1778 /* if { size, offset, count } = { 0, -1, 0 } and no pool
1779 * (i.e. all default values specified) then delete default
1780 * striping from dir. */
1782 "set default striping: sz %u # %u offset %d %s %s\n",
1783 (unsigned)lum->lmm_stripe_size,
1784 (unsigned)lum->lmm_stripe_count,
1785 (int)lum->lmm_stripe_offset,
1786 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1788 if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1789 (lum->lmm_stripe_count),
1790 (lum->lmm_stripe_offset)) &&
1791 lum->lmm_magic == LOV_USER_MAGIC_V1) {
1792 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1796 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
1802 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
1803 struct dt_object *dt,
1804 const struct lu_buf *buf,
1805 const char *name, int fl,
1807 struct lustre_capa *capa)
1809 struct lod_object *l = lod_dt_obj(dt);
1810 struct lmv_user_md_v1 *lum;
1814 LASSERT(buf != NULL && buf->lb_buf != NULL);
1817 CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
1818 le32_to_cpu(lum->lum_stripe_count),
1819 (int)le32_to_cpu(lum->lum_stripe_offset));
1821 if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
1822 le32_to_cpu(lum->lum_stripe_offset)) &&
1823 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
1824 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1828 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
1833 /* Update default stripe cache */
1834 if (l->ldo_dir_stripe == NULL) {
1835 OBD_ALLOC_PTR(l->ldo_dir_stripe);
1836 if (l->ldo_dir_stripe == NULL)
1840 l->ldo_dir_striping_cached = 0;
1841 l->ldo_dir_def_striping_set = 1;
1842 l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count);
1847 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
1848 const struct lu_buf *buf, const char *name,
1849 int fl, struct thandle *th,
1850 struct lustre_capa *capa)
1852 struct lod_object *lo = lod_dt_obj(dt);
1853 struct lod_thread_info *info = lod_env_info(env);
1854 struct lu_attr *attr = &info->lti_attr;
1855 struct dt_object_format *dof = &info->lti_format;
1856 struct lu_buf lmv_buf;
1857 struct lu_buf slave_lmv_buf;
1858 struct lmv_mds_md_v1 *lmm;
1859 struct lmv_mds_md_v1 *slave_lmm = NULL;
1864 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
1867 /* The stripes are supposed to be allocated in declare phase,
1868 * if there are no stripes being allocated, it will skip */
1869 if (lo->ldo_stripenr == 0)
1872 rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA);
1876 attr->la_valid = LA_TYPE | LA_MODE;
1877 dof->dof_type = DFT_DIR;
1879 rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1882 lmm = lmv_buf.lb_buf;
1884 OBD_ALLOC_PTR(slave_lmm);
1885 if (slave_lmm == NULL)
1888 lod_prep_slave_lmv_md(slave_lmm, lmm);
1889 slave_lmv_buf.lb_buf = slave_lmm;
1890 slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1892 for (i = 0; i < lo->ldo_stripenr; i++) {
1893 struct dt_object *dto;
1894 char *stripe_name = info->lti_key;
1896 dto = lo->ldo_stripe[i];
1897 dt_write_lock(env, dto, MOR_TGT_CHILD);
1898 rc = dt_create(env, dto, attr, NULL, dof, th);
1899 dt_write_unlock(env, dto);
1903 rc = dt_insert(env, dto,
1904 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1905 (const struct dt_key *)dot, th, capa, 0);
1909 rc = dt_insert(env, dto,
1910 (struct dt_rec *)lu_object_fid(&dt->do_lu),
1911 (const struct dt_key *)dotdot, th, capa, 0);
1915 if (lo->ldo_striping_cached &&
1916 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1917 lo->ldo_def_stripenr,
1918 lo->ldo_def_stripe_offset)) {
1919 struct lov_user_md_v3 *v3;
1921 /* sigh, lti_ea_store has been used for lmv_buf,
1922 * so we have to allocate buffer for default
1928 memset(v3, 0, sizeof(*v3));
1929 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1930 v3->lmm_stripe_count =
1931 cpu_to_le16(lo->ldo_def_stripenr);
1932 v3->lmm_stripe_offset =
1933 cpu_to_le16(lo->ldo_def_stripe_offset);
1934 v3->lmm_stripe_size =
1935 cpu_to_le32(lo->ldo_def_stripe_size);
1937 strncpy(v3->lmm_pool_name, lo->ldo_pool,
1940 info->lti_buf.lb_buf = v3;
1941 info->lti_buf.lb_len = sizeof(*v3);
1942 rc = dt_xattr_set(env, dto, &info->lti_buf,
1943 XATTR_NAME_LOV, 0, th, capa);
1949 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1950 rc = dt_xattr_set(env, dto, &slave_lmv_buf, XATTR_NAME_LMV,
1955 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1956 PFID(lu_object_fid(&dto->do_lu)), i);
1957 rc = dt_insert(env, dt_object_child(dt),
1958 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1959 (const struct dt_key *)stripe_name, th, capa, 0);
1963 rc = dt_ref_add(env, dt_object_child(dt), th);
1968 rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
1972 if (slave_lmm != NULL)
1973 OBD_FREE_PTR(slave_lmm);
1978 int lod_dir_striping_create_internal(const struct lu_env *env,
1979 struct dt_object *dt,
1980 struct lu_attr *attr,
1981 struct dt_object_format *dof,
1985 struct lod_thread_info *info = lod_env_info(env);
1986 struct lod_object *lo = lod_dt_obj(dt);
1990 if (lo->ldo_dir_def_striping_set &&
1991 !LMVEA_DELETE_VALUES(lo->ldo_stripenr,
1992 lo->ldo_dir_stripe_offset)) {
1993 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
1994 int stripe_count = lo->ldo_stripenr;
1996 if (info->lti_ea_store_size < sizeof(*v1)) {
1997 rc = lod_ea_store_resize(info, sizeof(*v1));
2000 v1 = info->lti_ea_store;
2003 memset(v1, 0, sizeof(*v1));
2004 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2005 v1->lum_stripe_count = cpu_to_le32(stripe_count);
2006 v1->lum_stripe_offset =
2007 cpu_to_le32(lo->ldo_dir_stripe_offset);
2009 info->lti_buf.lb_buf = v1;
2010 info->lti_buf.lb_len = sizeof(*v1);
2013 rc = lod_declare_xattr_set_lmv(env, dt, attr,
2014 &info->lti_buf, dof, th);
2016 rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
2017 XATTR_NAME_LMV, 0, th,
2023 /* Transfer default LMV striping from the parent */
2024 if (lo->ldo_dir_striping_cached &&
2025 !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
2026 lo->ldo_dir_def_stripe_offset)) {
2027 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2028 int def_stripe_count = lo->ldo_dir_def_stripenr;
2030 if (info->lti_ea_store_size < sizeof(*v1)) {
2031 rc = lod_ea_store_resize(info, sizeof(*v1));
2034 v1 = info->lti_ea_store;
2037 memset(v1, 0, sizeof(*v1));
2038 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2039 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
2040 v1->lum_stripe_offset =
2041 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
2043 cpu_to_le32(lo->ldo_dir_def_hash_type);
2045 info->lti_buf.lb_buf = v1;
2046 info->lti_buf.lb_len = sizeof(*v1);
2048 rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2049 XATTR_NAME_DEFAULT_LMV,
2052 rc = lod_xattr_set_default_lmv_on_dir(env, dt,
2054 XATTR_NAME_DEFAULT_LMV, 0,
2060 /* Transfer default LOV striping from the parent */
2061 if (lo->ldo_striping_cached &&
2062 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2063 lo->ldo_def_stripenr,
2064 lo->ldo_def_stripe_offset)) {
2065 struct lov_user_md_v3 *v3 = info->lti_ea_store;
2067 if (info->lti_ea_store_size < sizeof(*v3)) {
2068 rc = lod_ea_store_resize(info, sizeof(*v3));
2071 v3 = info->lti_ea_store;
2074 memset(v3, 0, sizeof(*v3));
2075 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2076 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
2077 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
2078 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
2080 strncpy(v3->lmm_pool_name, lo->ldo_pool,
2083 info->lti_buf.lb_buf = v3;
2084 info->lti_buf.lb_len = sizeof(*v3);
2087 rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2088 XATTR_NAME_LOV, 0, th);
2090 rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
2091 XATTR_NAME_LOV, 0, th,
2100 static int lod_declare_dir_striping_create(const struct lu_env *env,
2101 struct dt_object *dt,
2102 struct lu_attr *attr,
2103 struct dt_object_format *dof,
2106 return lod_dir_striping_create_internal(env, dt, attr, dof, th, true);
2109 static int lod_dir_striping_create(const struct lu_env *env,
2110 struct dt_object *dt,
2111 struct lu_attr *attr,
2112 struct dt_object_format *dof,
2115 return lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
2118 static int lod_xattr_set(const struct lu_env *env,
2119 struct dt_object *dt, const struct lu_buf *buf,
2120 const char *name, int fl, struct thandle *th,
2121 struct lustre_capa *capa)
2123 struct dt_object *next = dt_object_child(dt);
2127 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2128 strcmp(name, XATTR_NAME_LMV) == 0) {
2129 struct lmv_mds_md_v1 *lmm = buf->lb_buf;
2131 if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
2132 LMV_HASH_FLAG_MIGRATION)
2133 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2135 rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
2140 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2141 strcmp(name, XATTR_NAME_LOV) == 0) {
2143 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
2145 } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2146 strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
2148 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
2151 } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2152 !strcmp(name, XATTR_NAME_LOV)) {
2153 /* in case of lov EA swap, just set it
2154 * if not, it is a replay so check striping match what we
2155 * already have during req replay, declare_xattr_set()
2156 * defines striping, then create() does the work
2158 if (fl & LU_XATTR_REPLACE) {
2159 /* free stripes, then update disk */
2160 lod_object_free_striping(env, lod_dt_obj(dt));
2161 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2163 rc = lod_striping_create(env, dt, NULL, NULL, th);
2168 /* then all other xattr */
2169 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
2174 static int lod_declare_xattr_del(const struct lu_env *env,
2175 struct dt_object *dt, const char *name,
2178 return dt_declare_xattr_del(env, dt_object_child(dt), name, th);
2181 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
2182 const char *name, struct thandle *th,
2183 struct lustre_capa *capa)
2185 if (!strcmp(name, XATTR_NAME_LOV))
2186 lod_object_free_striping(env, lod_dt_obj(dt));
2187 return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
2190 static int lod_xattr_list(const struct lu_env *env,
2191 struct dt_object *dt, struct lu_buf *buf,
2192 struct lustre_capa *capa)
2194 return dt_xattr_list(env, dt_object_child(dt), buf, capa);
2197 int lod_object_set_pool(struct lod_object *o, char *pool)
2202 len = strlen(o->ldo_pool);
2203 OBD_FREE(o->ldo_pool, len + 1);
2208 OBD_ALLOC(o->ldo_pool, len + 1);
2209 if (o->ldo_pool == NULL)
2211 strcpy(o->ldo_pool, pool);
2216 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
2218 return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
2222 static int lod_cache_parent_lov_striping(const struct lu_env *env,
2223 struct lod_object *lp)
2225 struct lod_thread_info *info = lod_env_info(env);
2226 struct lov_user_md_v1 *v1 = NULL;
2227 struct lov_user_md_v3 *v3 = NULL;
2231 /* called from MDD without parent being write locked,
2233 dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2234 rc = lod_get_lov_ea(env, lp);
2238 if (rc < sizeof(struct lov_user_md)) {
2239 /* don't lookup for non-existing or invalid striping */
2240 lp->ldo_def_striping_set = 0;
2241 lp->ldo_striping_cached = 1;
2242 lp->ldo_def_stripe_size = 0;
2243 lp->ldo_def_stripenr = 0;
2244 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
2245 GOTO(unlock, rc = 0);
2249 v1 = info->lti_ea_store;
2250 if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) {
2251 lustre_swab_lov_user_md_v1(v1);
2252 } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
2253 v3 = (struct lov_user_md_v3 *)v1;
2254 lustre_swab_lov_user_md_v3(v3);
2257 if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
2258 GOTO(unlock, rc = 0);
2260 if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
2261 GOTO(unlock, rc = 0);
2263 CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
2264 PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
2265 (int)v1->lmm_stripe_count,
2266 (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
2268 lp->ldo_def_stripenr = v1->lmm_stripe_count;
2269 lp->ldo_def_stripe_size = v1->lmm_stripe_size;
2270 lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
2271 lp->ldo_striping_cached = 1;
2272 lp->ldo_def_striping_set = 1;
2273 if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
2274 /* XXX: sanity check here */
2275 v3 = (struct lov_user_md_v3 *) v1;
2276 if (v3->lmm_pool_name[0])
2277 lod_object_set_pool(lp, v3->lmm_pool_name);
2281 dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2286 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
2287 struct lod_object *lp)
2289 struct lod_thread_info *info = lod_env_info(env);
2290 struct lmv_user_md_v1 *v1 = NULL;
2294 /* called from MDD without parent being write locked,
2296 dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2297 rc = lod_get_default_lmv_ea(env, lp);
2301 if (rc < sizeof(struct lmv_user_md)) {
2302 /* don't lookup for non-existing or invalid striping */
2303 lp->ldo_dir_def_striping_set = 0;
2304 lp->ldo_dir_striping_cached = 1;
2305 lp->ldo_dir_def_stripenr = 0;
2306 lp->ldo_dir_def_stripe_offset =
2307 (typeof(v1->lum_stripe_offset))(-1);
2308 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
2309 GOTO(unlock, rc = 0);
2313 v1 = info->lti_ea_store;
2315 lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
2316 lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
2317 lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
2318 lp->ldo_dir_def_striping_set = 1;
2319 lp->ldo_dir_striping_cached = 1;
2323 dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2327 static int lod_cache_parent_striping(const struct lu_env *env,
2328 struct lod_object *lp,
2334 rc = lod_load_striping(env, lp);
2338 if (!lp->ldo_striping_cached) {
2339 /* we haven't tried to get default striping for
2340 * the directory yet, let's cache it in the object */
2341 rc = lod_cache_parent_lov_striping(env, lp);
2346 if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
2347 rc = lod_cache_parent_lmv_striping(env, lp);
2353 * used to transfer default striping data to the object being created
2355 static void lod_ah_init(const struct lu_env *env,
2356 struct dt_allocation_hint *ah,
2357 struct dt_object *parent,
2358 struct dt_object *child,
2361 struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
2362 struct dt_object *nextp = NULL;
2363 struct dt_object *nextc;
2364 struct lod_object *lp = NULL;
2365 struct lod_object *lc;
2366 struct lov_desc *desc;
2372 if (likely(parent)) {
2373 nextp = dt_object_child(parent);
2374 lp = lod_dt_obj(parent);
2375 rc = lod_load_striping(env, lp);
2380 nextc = dt_object_child(child);
2381 lc = lod_dt_obj(child);
2383 LASSERT(lc->ldo_stripenr == 0);
2384 LASSERT(lc->ldo_stripe == NULL);
2387 * local object may want some hints
2388 * in case of late striping creation, ->ah_init()
2389 * can be called with local object existing
2391 if (!dt_object_exists(nextc) || dt_object_remote(nextc))
2392 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
2393 NULL : nextp, nextc, child_mode);
2395 if (S_ISDIR(child_mode)) {
2396 if (lc->ldo_dir_stripe == NULL) {
2397 OBD_ALLOC_PTR(lc->ldo_dir_stripe);
2398 if (lc->ldo_dir_stripe == NULL)
2402 if (lp->ldo_dir_stripe == NULL) {
2403 OBD_ALLOC_PTR(lp->ldo_dir_stripe);
2404 if (lp->ldo_dir_stripe == NULL)
2408 rc = lod_cache_parent_striping(env, lp, child_mode);
2412 /* transfer defaults to new directory */
2413 if (lp->ldo_striping_cached) {
2415 lod_object_set_pool(lc, lp->ldo_pool);
2416 lc->ldo_def_stripenr = lp->ldo_def_stripenr;
2417 lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
2418 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2419 lc->ldo_striping_cached = 1;
2420 lc->ldo_def_striping_set = 1;
2421 CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
2422 (int)lc->ldo_def_stripe_size,
2423 (int)lc->ldo_def_stripe_offset,
2424 (int)lc->ldo_def_stripenr);
2427 /* transfer dir defaults to new directory */
2428 if (lp->ldo_dir_striping_cached) {
2429 lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
2430 lc->ldo_dir_def_stripe_offset =
2431 lp->ldo_dir_def_stripe_offset;
2432 lc->ldo_dir_def_hash_type =
2433 lp->ldo_dir_def_hash_type;
2434 lc->ldo_dir_striping_cached = 1;
2435 lc->ldo_dir_def_striping_set = 1;
2436 CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
2437 (int)lc->ldo_dir_def_stripenr,
2438 (int)lc->ldo_dir_def_stripe_offset,
2439 lc->ldo_dir_def_hash_type);
2442 /* If the directory is specified with certain stripes */
2443 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
2444 const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
2446 rc = lod_verify_md_striping(d, lum1);
2448 le32_to_cpu(lum1->lum_stripe_count) > 1) {
2449 /* Directory will be striped only if
2450 * stripe_count > 1 */
2452 le32_to_cpu(lum1->lum_stripe_count);
2453 lc->ldo_dir_stripe_offset =
2454 le32_to_cpu(lum1->lum_stripe_offset);
2455 lc->ldo_dir_hash_type =
2456 le32_to_cpu(lum1->lum_hash_type);
2457 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
2459 (int)lc->ldo_dir_stripe_offset);
2461 } else if (lp->ldo_dir_def_striping_set) {
2462 /* If there are default dir stripe from parent */
2463 lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
2464 lc->ldo_dir_stripe_offset =
2465 lp->ldo_dir_def_stripe_offset;
2466 lc->ldo_dir_hash_type =
2467 lp->ldo_dir_def_hash_type;
2468 CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
2470 (int)lc->ldo_dir_stripe_offset);
2472 /* set default stripe for this directory */
2473 lc->ldo_stripenr = 0;
2474 lc->ldo_dir_stripe_offset = -1;
2477 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
2478 lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
2484 * if object is going to be striped over OSTs, transfer default
2485 * striping information to the child, so that we can use it
2486 * during declaration and creation
2488 if (!lod_object_will_be_striped(S_ISREG(child_mode),
2489 lu_object_fid(&child->do_lu)))
2492 * try from the parent
2494 if (likely(parent)) {
2495 lod_cache_parent_striping(env, lp, child_mode);
2497 lc->ldo_def_stripe_offset = (__u16) -1;
2499 if (lp->ldo_def_striping_set) {
2501 lod_object_set_pool(lc, lp->ldo_pool);
2502 lc->ldo_stripenr = lp->ldo_def_stripenr;
2503 lc->ldo_stripe_size = lp->ldo_def_stripe_size;
2504 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2505 CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
2506 lc->ldo_stripenr, lc->ldo_stripe_size,
2507 lp->ldo_pool ? lp->ldo_pool : "");
2512 * if the parent doesn't provide with specific pattern, grab fs-wide one
2514 desc = &d->lod_desc;
2515 if (lc->ldo_stripenr == 0)
2516 lc->ldo_stripenr = desc->ld_default_stripe_count;
2517 if (lc->ldo_stripe_size == 0)
2518 lc->ldo_stripe_size = desc->ld_default_stripe_size;
2519 CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
2520 lc->ldo_stripenr, lc->ldo_stripe_size,
2521 lc->ldo_pool ? lc->ldo_pool : "");
2524 /* we do not cache stripe information for slave stripe, see
2525 * lod_xattr_set_lov_on_dir */
2526 if (lp != NULL && lp->ldo_dir_slave_stripe)
2527 lod_lov_stripe_cache_clear(lp);
2532 #define ll_do_div64(aaa,bbb) do_div((aaa), (bbb))
2534 * this function handles a special case when truncate was done
2535 * on a stripeless object and now striping is being created
2536 * we can't lose that size, so we have to propagate it to newly
2539 static int lod_declare_init_size(const struct lu_env *env,
2540 struct dt_object *dt, struct thandle *th)
2542 struct dt_object *next = dt_object_child(dt);
2543 struct lod_object *lo = lod_dt_obj(dt);
2544 struct lu_attr *attr = &lod_env_info(env)->lti_attr;
2545 uint64_t size, offs;
2549 /* XXX: we support the simplest (RAID0) striping so far */
2550 LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
2551 LASSERT(lo->ldo_stripe_size > 0);
2553 rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
2554 LASSERT(attr->la_valid & LA_SIZE);
2558 size = attr->la_size;
2562 /* ll_do_div64(a, b) returns a % b, and a = a / b */
2563 ll_do_div64(size, (__u64) lo->ldo_stripe_size);
2564 stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
2566 size = size * lo->ldo_stripe_size;
2567 offs = attr->la_size;
2568 size += ll_do_div64(offs, lo->ldo_stripe_size);
2570 attr->la_valid = LA_SIZE;
2571 attr->la_size = size;
2573 rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
2579 * Create declaration of striped object
2581 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
2582 struct lu_attr *attr,
2583 const struct lu_buf *lovea, struct thandle *th)
2585 struct lod_thread_info *info = lod_env_info(env);
2586 struct dt_object *next = dt_object_child(dt);
2587 struct lod_object *lo = lod_dt_obj(dt);
2591 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
2592 /* failed to create striping, let's reset
2593 * config so that others don't get confused */
2594 lod_object_free_striping(env, lo);
2595 GOTO(out, rc = -ENOMEM);
2598 if (!dt_object_remote(next)) {
2599 /* choose OST and generate appropriate objects */
2600 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
2602 /* failed to create striping, let's reset
2603 * config so that others don't get confused */
2604 lod_object_free_striping(env, lo);
2609 * declare storage for striping data
2611 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
2612 lo->ldo_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1);
2614 /* LOD can not choose OST objects for remote objects, i.e.
2615 * stripes must be ready before that. Right now, it can only
2616 * happen during migrate, i.e. migrate process needs to create
2617 * remote regular file (mdd_migrate_create), then the migrate
2618 * process will provide stripeEA. */
2619 LASSERT(lovea != NULL);
2620 info->lti_buf = *lovea;
2623 rc = dt_declare_xattr_set(env, next, &info->lti_buf,
2624 XATTR_NAME_LOV, 0, th);
2629 * if striping is created with local object's size > 0,
2630 * we have to propagate this size to specific object
2631 * the case is possible only when local object was created previously
2633 if (dt_object_exists(next))
2634 rc = lod_declare_init_size(env, dt, th);
2640 static int lod_declare_object_create(const struct lu_env *env,
2641 struct dt_object *dt,
2642 struct lu_attr *attr,
2643 struct dt_allocation_hint *hint,
2644 struct dt_object_format *dof,
2647 struct dt_object *next = dt_object_child(dt);
2648 struct lod_object *lo = lod_dt_obj(dt);
2657 * first of all, we declare creation of local object
2659 rc = dt_declare_create(env, next, attr, hint, dof, th);
2663 if (dof->dof_type == DFT_SYM)
2664 dt->do_body_ops = &lod_body_lnk_ops;
2667 * it's lod_ah_init() who has decided the object will striped
2669 if (dof->dof_type == DFT_REGULAR) {
2670 /* callers don't want stripes */
2671 /* XXX: all tricky interactions with ->ah_make_hint() decided
2672 * to use striping, then ->declare_create() behaving differently
2673 * should be cleaned */
2674 if (dof->u.dof_reg.striped == 0)
2675 lo->ldo_stripenr = 0;
2676 if (lo->ldo_stripenr > 0)
2677 rc = lod_declare_striped_object(env, dt, attr,
2679 } else if (dof->dof_type == DFT_DIR) {
2680 /* Orphan object (like migrating object) does not have
2681 * lod_dir_stripe, see lod_ah_init */
2682 if (lo->ldo_dir_stripe != NULL)
2683 rc = lod_declare_dir_striping_create(env, dt, attr,
2690 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
2691 struct lu_attr *attr, struct dt_object_format *dof,
2694 struct lod_object *lo = lod_dt_obj(dt);
2698 LASSERT(lo->ldo_striping_cached == 0);
2700 /* create all underlying objects */
2701 for (i = 0; i < lo->ldo_stripenr; i++) {
2702 LASSERT(lo->ldo_stripe[i]);
2703 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
2709 rc = lod_generate_and_set_lovea(env, lo, th);
2714 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
2715 struct lu_attr *attr,
2716 struct dt_allocation_hint *hint,
2717 struct dt_object_format *dof, struct thandle *th)
2719 struct dt_object *next = dt_object_child(dt);
2720 struct lod_object *lo = lod_dt_obj(dt);
2724 /* create local object */
2725 rc = dt_create(env, next, attr, hint, dof, th);
2729 if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2730 lo->ldo_stripe && dof->u.dof_reg.striped != 0)
2731 rc = lod_striping_create(env, dt, attr, dof, th);
2736 static int lod_declare_object_destroy(const struct lu_env *env,
2737 struct dt_object *dt,
2740 struct dt_object *next = dt_object_child(dt);
2741 struct lod_object *lo = lod_dt_obj(dt);
2742 struct lod_thread_info *info = lod_env_info(env);
2743 char *stripe_name = info->lti_key;
2748 * load striping information, notice we don't do this when object
2749 * is being initialized as we don't need this information till
2750 * few specific cases like destroy, chown
2752 rc = lod_load_striping(env, lo);
2756 /* declare destroy for all underlying objects */
2757 if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2758 rc = next->do_ops->do_index_try(env, next,
2759 &dt_directory_features);
2763 for (i = 0; i < lo->ldo_stripenr; i++) {
2764 rc = dt_declare_ref_del(env, next, th);
2767 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2768 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2770 rc = dt_declare_delete(env, next,
2771 (const struct dt_key *)stripe_name, th);
2777 * we declare destroy for the local object
2779 rc = dt_declare_destroy(env, next, th);
2783 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2786 /* declare destroy all striped objects */
2787 for (i = 0; i < lo->ldo_stripenr; i++) {
2788 if (likely(lo->ldo_stripe[i] != NULL)) {
2789 rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
2798 static int lod_object_destroy(const struct lu_env *env,
2799 struct dt_object *dt, struct thandle *th)
2801 struct dt_object *next = dt_object_child(dt);
2802 struct lod_object *lo = lod_dt_obj(dt);
2803 struct lod_thread_info *info = lod_env_info(env);
2804 char *stripe_name = info->lti_key;
2808 /* destroy sub-stripe of master object */
2809 if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2810 rc = next->do_ops->do_index_try(env, next,
2811 &dt_directory_features);
2815 for (i = 0; i < lo->ldo_stripenr; i++) {
2816 rc = dt_ref_del(env, next, th);
2820 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2821 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2824 CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
2825 PFID(lu_object_fid(&dt->do_lu)), stripe_name,
2826 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
2828 rc = dt_delete(env, next,
2829 (const struct dt_key *)stripe_name,
2835 rc = dt_destroy(env, next, th);
2839 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2842 /* destroy all striped objects */
2843 for (i = 0; i < lo->ldo_stripenr; i++) {
2844 if (likely(lo->ldo_stripe[i] != NULL) &&
2845 (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
2846 i == cfs_fail_val)) {
2847 rc = dt_destroy(env, lo->ldo_stripe[i], th);
2856 static int lod_declare_ref_add(const struct lu_env *env,
2857 struct dt_object *dt, struct thandle *th)
2859 return dt_declare_ref_add(env, dt_object_child(dt), th);
2862 static int lod_ref_add(const struct lu_env *env,
2863 struct dt_object *dt, struct thandle *th)
2865 return dt_ref_add(env, dt_object_child(dt), th);
2868 static int lod_declare_ref_del(const struct lu_env *env,
2869 struct dt_object *dt, struct thandle *th)
2871 return dt_declare_ref_del(env, dt_object_child(dt), th);
2874 static int lod_ref_del(const struct lu_env *env,
2875 struct dt_object *dt, struct thandle *th)
2877 return dt_ref_del(env, dt_object_child(dt), th);
2880 static struct obd_capa *lod_capa_get(const struct lu_env *env,
2881 struct dt_object *dt,
2882 struct lustre_capa *old, __u64 opc)
2884 return dt_capa_get(env, dt_object_child(dt), old, opc);
2887 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
2888 __u64 start, __u64 end)
2890 return dt_object_sync(env, dt_object_child(dt), start, end);
2893 struct lod_slave_locks {
2895 struct lustre_handle lsl_handle[0];
2898 static int lod_object_unlock_internal(const struct lu_env *env,
2899 struct dt_object *dt,
2900 struct ldlm_enqueue_info *einfo,
2901 ldlm_policy_data_t *policy)
2903 struct lod_object *lo = lod_dt_obj(dt);
2904 struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
2909 if (slave_locks == NULL)
2912 for (i = 1; i < slave_locks->lsl_lock_count; i++) {
2913 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
2916 einfo->ei_cbdata = &slave_locks->lsl_handle[i];
2917 rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
2920 rc = rc == 0 ? rc1 : rc;
2927 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
2928 struct ldlm_enqueue_info *einfo,
2929 union ldlm_policy_data *policy)
2931 struct lod_object *lo = lod_dt_obj(dt);
2932 struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
2933 int slave_locks_size;
2937 if (slave_locks == NULL)
2940 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2943 rc = lod_load_striping(env, lo);
2947 /* Note: for remote lock for single stripe dir, MDT will cancel
2948 * the lock by lockh directly */
2949 if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
2952 /* Only cancel slave lock for striped dir */
2953 rc = lod_object_unlock_internal(env, dt, einfo, policy);
2955 slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
2956 sizeof(slave_locks->lsl_handle[0]);
2957 OBD_FREE(slave_locks, slave_locks_size);
2958 einfo->ei_cbdata = NULL;
2963 static int lod_object_lock(const struct lu_env *env,
2964 struct dt_object *dt,
2965 struct lustre_handle *lh,
2966 struct ldlm_enqueue_info *einfo,
2967 union ldlm_policy_data *policy)
2969 struct lod_object *lo = lod_dt_obj(dt);
2972 int slave_locks_size;
2973 struct lod_slave_locks *slave_locks = NULL;
2976 /* remote object lock */
2977 if (!einfo->ei_enq_slave) {
2978 LASSERT(dt_object_remote(dt));
2979 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
2983 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2986 rc = lod_load_striping(env, lo);
2991 if (lo->ldo_stripenr <= 1)
2994 slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
2995 sizeof(slave_locks->lsl_handle[0]);
2996 /* Freed in lod_object_unlock */
2997 OBD_ALLOC(slave_locks, slave_locks_size);
2998 if (slave_locks == NULL)
3000 slave_locks->lsl_lock_count = lo->ldo_stripenr;
3002 /* striped directory lock */
3003 for (i = 1; i < lo->ldo_stripenr; i++) {
3004 struct lustre_handle lockh;
3005 struct ldlm_res_id *res_id;
3007 res_id = &lod_env_info(env)->lti_res_id;
3008 fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
3010 einfo->ei_res_id = res_id;
3012 LASSERT(lo->ldo_stripe[i]);
3013 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
3017 slave_locks->lsl_handle[i] = lockh;
3020 einfo->ei_cbdata = slave_locks;
3023 if (rc != 0 && slave_locks != NULL) {
3024 einfo->ei_cbdata = slave_locks;
3025 lod_object_unlock_internal(env, dt, einfo, policy);
3026 OBD_FREE(slave_locks, slave_locks_size);
3027 einfo->ei_cbdata = NULL;
3033 struct dt_object_operations lod_obj_ops = {
3034 .do_read_lock = lod_object_read_lock,
3035 .do_write_lock = lod_object_write_lock,
3036 .do_read_unlock = lod_object_read_unlock,
3037 .do_write_unlock = lod_object_write_unlock,
3038 .do_write_locked = lod_object_write_locked,
3039 .do_attr_get = lod_attr_get,
3040 .do_declare_attr_set = lod_declare_attr_set,
3041 .do_attr_set = lod_attr_set,
3042 .do_xattr_get = lod_xattr_get,
3043 .do_declare_xattr_set = lod_declare_xattr_set,
3044 .do_xattr_set = lod_xattr_set,
3045 .do_declare_xattr_del = lod_declare_xattr_del,
3046 .do_xattr_del = lod_xattr_del,
3047 .do_xattr_list = lod_xattr_list,
3048 .do_ah_init = lod_ah_init,
3049 .do_declare_create = lod_declare_object_create,
3050 .do_create = lod_object_create,
3051 .do_declare_destroy = lod_declare_object_destroy,
3052 .do_destroy = lod_object_destroy,
3053 .do_index_try = lod_index_try,
3054 .do_declare_ref_add = lod_declare_ref_add,
3055 .do_ref_add = lod_ref_add,
3056 .do_declare_ref_del = lod_declare_ref_del,
3057 .do_ref_del = lod_ref_del,
3058 .do_capa_get = lod_capa_get,
3059 .do_object_sync = lod_object_sync,
3060 .do_object_lock = lod_object_lock,
3061 .do_object_unlock = lod_object_unlock,
3064 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
3065 struct lu_buf *buf, loff_t *pos,
3066 struct lustre_capa *capa)
3068 struct dt_object *next = dt_object_child(dt);
3069 return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
3072 static ssize_t lod_declare_write(const struct lu_env *env,
3073 struct dt_object *dt,
3074 const struct lu_buf *buf, loff_t pos,
3077 return dt_declare_record_write(env, dt_object_child(dt),
3081 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
3082 const struct lu_buf *buf, loff_t *pos,
3083 struct thandle *th, struct lustre_capa *capa, int iq)
3085 struct dt_object *next = dt_object_child(dt);
3087 return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
3090 static const struct dt_body_operations lod_body_lnk_ops = {
3091 .dbo_read = lod_read,
3092 .dbo_declare_write = lod_declare_write,
3093 .dbo_write = lod_write
3096 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
3097 const struct lu_object_conf *conf)
3099 struct lod_device *lod = lu2lod_dev(lo->lo_dev);
3100 struct lu_device *cdev = NULL;
3101 struct lu_object *cobj;
3102 struct lod_tgt_descs *ltd = NULL;
3103 struct lod_tgt_desc *tgt;
3105 int type = LU_SEQ_RANGE_ANY;
3109 rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
3113 if (type == LU_SEQ_RANGE_MDT &&
3114 idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
3115 cdev = &lod->lod_child->dd_lu_dev;
3116 } else if (type == LU_SEQ_RANGE_MDT) {
3117 ltd = &lod->lod_mdt_descs;
3119 } else if (type == LU_SEQ_RANGE_OST) {
3120 ltd = &lod->lod_ost_descs;
3127 if (ltd->ltd_tgts_size > idx &&
3128 cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
3129 tgt = LTD_TGT(ltd, idx);
3131 LASSERT(tgt != NULL);
3132 LASSERT(tgt->ltd_tgt != NULL);
3134 cdev = &(tgt->ltd_tgt->dd_lu_dev);
3136 lod_putref(lod, ltd);
3139 if (unlikely(cdev == NULL))
3142 cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
3143 if (unlikely(cobj == NULL))
3146 lu_object_add(lo, cobj);
3151 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
3155 if (lo->ldo_dir_stripe != NULL) {
3156 OBD_FREE_PTR(lo->ldo_dir_stripe);
3157 lo->ldo_dir_stripe = NULL;
3160 if (lo->ldo_stripe) {
3161 LASSERT(lo->ldo_stripes_allocated > 0);
3163 for (i = 0; i < lo->ldo_stripenr; i++) {
3164 if (lo->ldo_stripe[i])
3165 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
3168 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
3169 OBD_FREE(lo->ldo_stripe, i);
3170 lo->ldo_stripe = NULL;
3171 lo->ldo_stripes_allocated = 0;
3173 lo->ldo_stripenr = 0;
3174 lo->ldo_pattern = 0;
3178 * ->start is called once all slices are initialized, including header's
3179 * cache for mode (object type). using the type we can initialize ops
3181 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
3183 if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
3184 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
3188 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
3190 struct lod_object *mo = lu2lod_obj(o);
3193 * release all underlying object pinned
3196 lod_object_free_striping(env, mo);
3198 lod_object_set_pool(mo, NULL);
3201 OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
3204 static void lod_object_release(const struct lu_env *env, struct lu_object *o)
3206 /* XXX: shouldn't we release everything here in case if object
3207 * creation failed before? */
3210 static int lod_object_print(const struct lu_env *env, void *cookie,
3211 lu_printer_t p, const struct lu_object *l)
3213 struct lod_object *o = lu2lod_obj((struct lu_object *) l);
3215 return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
3218 struct lu_object_operations lod_lu_obj_ops = {
3219 .loo_object_init = lod_object_init,
3220 .loo_object_start = lod_object_start,
3221 .loo_object_free = lod_object_free,
3222 .loo_object_release = lod_object_release,
3223 .loo_object_print = lod_object_print,