4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright 2009 Sun Microsystems, Inc. All rights reserved
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2013, Intel Corporation.
29 * lustre/lod/lod_object.c
31 * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
34 #define DEBUG_SUBSYSTEM S_MDS
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46 #include <md_object.h>
47 #include <lustre_linkea.h>
49 #include "lod_internal.h"
/* Entry-name strings "." and ".."; presumably used for directory entries
 * elsewhere in this file -- TODO confirm against the users. */
static const char dot[] = ".";
static const char dotdot[] = "..";

/* Defined in another translation unit. */
extern struct kmem_cache *lod_object_kmem;
/* Forward declaration only; no initializer is visible at this point. */
static const struct dt_body_operations lod_body_lnk_ops;
57 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
58 struct dt_rec *rec, const struct dt_key *key,
59 struct lustre_capa *capa)
61 struct dt_object *next = dt_object_child(dt);
62 return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
/**
 * Declare an index insertion on the child object.
 *
 * \param[in] env	execution environment
 * \param[in] dt	lod object whose child receives the declaration
 * \param[in] rec	record to be inserted
 * \param[in] key	key to be inserted
 * \param[in] handle	transaction handle
 *
 * \retval		the value returned by dt_declare_insert()
 */
static int lod_declare_index_insert(const struct lu_env *env,
				    struct dt_object *dt,
				    const struct dt_rec *rec,
				    const struct dt_key *key,
				    struct thandle *handle)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_declare_insert(env, child, rec, key, handle);
}
/**
 * Insert a (key, record) pair into the index of the child object.
 *
 * \param[in] env	execution environment
 * \param[in] dt	lod object whose child is modified
 * \param[in] rec	record to insert
 * \param[in] key	key to insert
 * \param[in] th	transaction handle
 * \param[in] capa	capability, passed through unchanged
 * \param[in] ign	passed through unchanged to dt_insert()
 *
 * \retval		the value returned by dt_insert()
 */
static int lod_index_insert(const struct lu_env *env,
			    struct dt_object *dt,
			    const struct dt_rec *rec,
			    const struct dt_key *key,
			    struct thandle *th,
			    struct lustre_capa *capa,
			    int ign)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_insert(env, child, rec, key, th, capa, ign);
}
/**
 * Declare an index deletion on the child object.
 *
 * \param[in] env	execution environment
 * \param[in] dt	lod object whose child receives the declaration
 * \param[in] key	key to be deleted
 * \param[in] th	transaction handle
 *
 * \retval		the value returned by dt_declare_delete()
 */
static int lod_declare_index_delete(const struct lu_env *env,
				    struct dt_object *dt,
				    const struct dt_key *key,
				    struct thandle *th)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_declare_delete(env, child, key, th);
}
/**
 * Delete \a key from the index of the child object.
 *
 * \param[in] env	execution environment
 * \param[in] dt	lod object whose child is modified
 * \param[in] key	key to delete
 * \param[in] th	transaction handle
 * \param[in] capa	capability, passed through unchanged
 *
 * \retval		the value returned by dt_delete()
 */
static int lod_index_delete(const struct lu_env *env,
			    struct dt_object *dt,
			    const struct dt_key *key,
			    struct thandle *th,
			    struct lustre_capa *capa)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_delete(env, child, key, th, capa);
}
102 static struct dt_it *lod_it_init(const struct lu_env *env,
103 struct dt_object *dt, __u32 attr,
104 struct lustre_capa *capa)
106 struct dt_object *next = dt_object_child(dt);
107 struct lod_it *it = &lod_env_info(env)->lti_it;
108 struct dt_it *it_next;
111 it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
115 /* currently we do not use more than one iterator per thread
116 * so we store it in thread info. if at some point we need
117 * more active iterators in a single thread, we can allocate
119 LASSERT(it->lit_obj == NULL);
121 it->lit_it = it_next;
124 return (struct dt_it *)it;
/* Sanity check for a plain lod iterator: it must be bound to both an
 * object and a lower-layer iterator.  \a env is accepted but unused. */
#define LOD_CHECK_IT(env, it)					\
do {								\
	LASSERT((it)->lit_obj != NULL);				\
	LASSERT((it)->lit_it != NULL);				\
} while (0)
133 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
135 struct lod_it *it = (struct lod_it *)di;
137 LOD_CHECK_IT(env, it);
138 it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
140 /* the iterator not in use any more */
145 int lod_it_get(const struct lu_env *env, struct dt_it *di,
146 const struct dt_key *key)
148 const struct lod_it *it = (const struct lod_it *)di;
150 LOD_CHECK_IT(env, it);
151 return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
154 void lod_it_put(const struct lu_env *env, struct dt_it *di)
156 struct lod_it *it = (struct lod_it *)di;
158 LOD_CHECK_IT(env, it);
159 return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
162 int lod_it_next(const struct lu_env *env, struct dt_it *di)
164 struct lod_it *it = (struct lod_it *)di;
166 LOD_CHECK_IT(env, it);
167 return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
170 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
172 const struct lod_it *it = (const struct lod_it *)di;
174 LOD_CHECK_IT(env, it);
175 return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
178 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
180 struct lod_it *it = (struct lod_it *)di;
182 LOD_CHECK_IT(env, it);
183 return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
186 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
187 struct dt_rec *rec, __u32 attr)
189 const struct lod_it *it = (const struct lod_it *)di;
191 LOD_CHECK_IT(env, it);
192 return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec,
196 int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di,
199 const struct lod_it *it = (const struct lod_it *)di;
201 LOD_CHECK_IT(env, it);
202 return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it,
206 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
208 const struct lod_it *it = (const struct lod_it *)di;
210 LOD_CHECK_IT(env, it);
211 return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
214 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
216 const struct lod_it *it = (const struct lod_it *)di;
218 LOD_CHECK_IT(env, it);
219 return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
222 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
225 const struct lod_it *it = (const struct lod_it *)di;
227 LOD_CHECK_IT(env, it);
228 return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it,
232 static struct dt_index_operations lod_index_ops = {
233 .dio_lookup = lod_index_lookup,
234 .dio_declare_insert = lod_declare_index_insert,
235 .dio_insert = lod_index_insert,
236 .dio_declare_delete = lod_declare_index_delete,
237 .dio_delete = lod_index_delete,
245 .key_size = lod_it_key_size,
247 .rec_size = lod_it_rec_size,
248 .store = lod_it_store,
250 .key_rec = lod_it_key_rec,
255 * Implementation of dt_index_operations:: dio_it.init
257 * This function is to initialize the iterator for striped directory,
258 * basically these lod_striped_it_xxx will just locate the stripe
259 * and call the correspondent api of its next lower layer.
261 * \param[in] env execution environment.
262 * \param[in] dt the striped directory object to be iterated.
263 * \param[in] attr the attribute of iterator, mostly used to indicate
264 * the entry attribute in the object to be iterated.
265 * \param[in] capa capability(useless in current implementation)
267 * \retval initialized iterator(dt_it) if successful initialize the
268 * iteration. lit_stripe_index will be used to indicate the
269 * current iterate position among stripes.
270 * \retval ERR pointer if initialization is failed.
272 static struct dt_it *lod_striped_it_init(const struct lu_env *env,
273 struct dt_object *dt, __u32 attr,
274 struct lustre_capa *capa)
276 struct lod_object *lo = lod_dt_obj(dt);
277 struct dt_object *next;
278 struct lod_it *it = &lod_env_info(env)->lti_it;
279 struct dt_it *it_next;
282 LASSERT(lo->ldo_stripenr > 0);
283 next = lo->ldo_stripe[0];
284 LASSERT(next != NULL);
285 LASSERT(next->do_index_ops != NULL);
287 it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
291 /* currently we do not use more than one iterator per thread
292 * so we store it in thread info. if at some point we need
293 * more active iterators in a single thread, we can allocate
295 LASSERT(it->lit_obj == NULL);
297 it->lit_stripe_index = 0;
299 it->lit_it = it_next;
302 return (struct dt_it *)it;
/* Sanity check for a striped-directory iterator: it must be bound to an
 * object and a lower-layer iterator, and its stripe index must lie
 * inside the stripe array of \a lo.  \a env is accepted but unused. */
#define LOD_CHECK_STRIPED_IT(env, it, lo)			\
do {								\
	LASSERT((it)->lit_obj != NULL);				\
	LASSERT((it)->lit_it != NULL);				\
	LASSERT((lo)->ldo_stripenr > 0);			\
	LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr);	\
} while (0)
314 * Implementation of dt_index_operations:: dio_it.fini
316 * This function is to finish the iterator for striped directory.
318 * \param[in] env execution environment.
319 * \param[in] di the iterator for the striped directory
322 static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
324 struct lod_it *it = (struct lod_it *)di;
325 struct lod_object *lo = lod_dt_obj(it->lit_obj);
326 struct dt_object *next;
328 LOD_CHECK_STRIPED_IT(env, it, lo);
330 next = lo->ldo_stripe[it->lit_stripe_index];
331 LASSERT(next != NULL);
332 LASSERT(next->do_index_ops != NULL);
334 next->do_index_ops->dio_it.fini(env, it->lit_it);
336 /* the iterator not in use any more */
339 it->lit_stripe_index = 0;
343 * Implementation of dt_index_operations:: dio_it.get
345 * This function is to position the iterator with given key
347 * \param[in] env execution environment.
348 * \param[in] di the iterator for striped directory.
349 * \param[in] key the key the iterator will be positioned.
351 * \retval 0 if successfully position iterator by the key.
352 * \retval negative error if position is failed.
354 static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
355 const struct dt_key *key)
357 const struct lod_it *it = (const struct lod_it *)di;
358 struct lod_object *lo = lod_dt_obj(it->lit_obj);
359 struct dt_object *next;
362 LOD_CHECK_STRIPED_IT(env, it, lo);
364 next = lo->ldo_stripe[it->lit_stripe_index];
365 LASSERT(next != NULL);
366 LASSERT(next->do_index_ops != NULL);
368 return next->do_index_ops->dio_it.get(env, it->lit_it, key);
372 * Implementation of dt_index_operations:: dio_it.put
374 * This function is supposed to be the pair of it_get, but currently do
375 * nothing. see (osd_it_ea_put or osd_index_it_put)
377 static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
379 struct lod_it *it = (struct lod_it *)di;
380 struct lod_object *lo = lod_dt_obj(it->lit_obj);
381 struct dt_object *next;
383 LOD_CHECK_STRIPED_IT(env, it, lo);
385 next = lo->ldo_stripe[it->lit_stripe_index];
386 LASSERT(next != NULL);
387 LASSERT(next->do_index_ops != NULL);
389 return next->do_index_ops->dio_it.put(env, it->lit_it);
393 * Implementation of dt_index_operations:: dio_it.next
395 * This function is to position the iterator to the next entry, if current
396 * stripe is finished by checking the return value of next() in current
397 * stripe. it will go to next stripe. In the mean time, the sub-iterator
398 * for next stripe needs to be initialized.
400 * \param[in] env execution environment.
401 * \param[in] di the iterator for striped directory.
403 * \retval 0 if successfully position iterator to the next entry.
404 * \retval negative error if position is failed.
406 static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
408 struct lod_it *it = (struct lod_it *)di;
409 struct lod_object *lo = lod_dt_obj(it->lit_obj);
410 struct dt_object *next;
411 struct dt_it *it_next;
415 LOD_CHECK_STRIPED_IT(env, it, lo);
417 next = lo->ldo_stripe[it->lit_stripe_index];
418 LASSERT(next != NULL);
419 LASSERT(next->do_index_ops != NULL);
421 rc = next->do_index_ops->dio_it.next(env, it->lit_it);
425 if (rc == 0 && it->lit_stripe_index == 0)
428 if (rc == 0 && it->lit_stripe_index > 0) {
429 struct lu_dirent *ent;
431 ent = (struct lu_dirent *)lod_env_info(env)->lti_key;
433 rc = next->do_index_ops->dio_it.rec(env, it->lit_it,
434 (struct dt_rec *)ent,
439 /* skip . and .. for slave stripe */
440 if ((strncmp(ent->lde_name, ".",
441 le16_to_cpu(ent->lde_namelen)) == 0 &&
442 le16_to_cpu(ent->lde_namelen) == 1) ||
443 (strncmp(ent->lde_name, "..",
444 le16_to_cpu(ent->lde_namelen)) == 0 &&
445 le16_to_cpu(ent->lde_namelen) == 2))
451 /* go to next stripe */
452 if (it->lit_stripe_index + 1 >= lo->ldo_stripenr)
455 it->lit_stripe_index++;
457 next->do_index_ops->dio_it.put(env, it->lit_it);
458 next->do_index_ops->dio_it.fini(env, it->lit_it);
460 rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
464 next = lo->ldo_stripe[it->lit_stripe_index];
465 LASSERT(next != NULL);
466 LASSERT(next->do_index_ops != NULL);
468 it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr,
470 if (!IS_ERR(it_next)) {
471 it->lit_it = it_next;
474 rc = PTR_ERR(it_next);
481 * Implementation of dt_index_operations:: dio_it.key
483 * This function is to get the key of the iterator at current position.
485 * \param[in] env execution environment.
486 * \param[in] di the iterator for striped directory.
488 * \retval key(dt_key) if successfully get the key.
489 * \retval negative error if can not get the key.
491 static struct dt_key *lod_striped_it_key(const struct lu_env *env,
492 const struct dt_it *di)
494 const struct lod_it *it = (const struct lod_it *)di;
495 struct lod_object *lo = lod_dt_obj(it->lit_obj);
496 struct dt_object *next;
498 LOD_CHECK_STRIPED_IT(env, it, lo);
500 next = lo->ldo_stripe[it->lit_stripe_index];
501 LASSERT(next != NULL);
502 LASSERT(next->do_index_ops != NULL);
504 return next->do_index_ops->dio_it.key(env, it->lit_it);
508 * Implementation of dt_index_operations:: dio_it.key_size
510 * This function is to get the key_size of current key.
512 * \param[in] env execution environment.
513 * \param[in] di the iterator for striped directory.
515 * \retval key_size if successfully get the key_size.
516 * \retval negative error if can not get the key_size.
518 static int lod_striped_it_key_size(const struct lu_env *env,
519 const struct dt_it *di)
521 struct lod_it *it = (struct lod_it *)di;
522 struct lod_object *lo = lod_dt_obj(it->lit_obj);
523 struct dt_object *next;
525 LOD_CHECK_STRIPED_IT(env, it, lo);
527 next = lo->ldo_stripe[it->lit_stripe_index];
528 LASSERT(next != NULL);
529 LASSERT(next->do_index_ops != NULL);
531 return next->do_index_ops->dio_it.key_size(env, it->lit_it);
535 * Implementation of dt_index_operations:: dio_it.rec
537 * This function is to get the record at current position.
539 * \param[in] env execution environment.
540 * \param[in] di the iterator for striped directory.
541 * \param[in] attr the attribute of iterator, mostly used to indicate
542 * the entry attribute in the object to be iterated.
543 * \param[out] rec hold the return record.
545 * \retval 0 if successfully get the entry.
546 * \retval negative error if can not get entry.
548 static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
549 struct dt_rec *rec, __u32 attr)
551 const struct lod_it *it = (const struct lod_it *)di;
552 struct lod_object *lo = lod_dt_obj(it->lit_obj);
553 struct dt_object *next;
555 LOD_CHECK_STRIPED_IT(env, it, lo);
557 next = lo->ldo_stripe[it->lit_stripe_index];
558 LASSERT(next != NULL);
559 LASSERT(next->do_index_ops != NULL);
561 return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
565 * Implementation of dt_index_operations:: dio_it.rec_size
567 * This function is to get the record_size at current record.
569 * \param[in] env execution environment.
570 * \param[in] di the iterator for striped directory.
571 * \param[in] attr the attribute of iterator, mostly used to indicate
572 * the entry attribute in the object to be iterated.
574 * \retval rec_size if successfully get the entry size.
575 * \retval negative error if can not get entry size.
577 static int lod_striped_it_rec_size(const struct lu_env *env,
578 const struct dt_it *di, __u32 attr)
580 struct lod_it *it = (struct lod_it *)di;
581 struct lod_object *lo = lod_dt_obj(it->lit_obj);
582 struct dt_object *next;
584 LOD_CHECK_STRIPED_IT(env, it, lo);
586 next = lo->ldo_stripe[it->lit_stripe_index];
587 LASSERT(next != NULL);
588 LASSERT(next->do_index_ops != NULL);
590 return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
594 * Implementation of dt_index_operations:: dio_it.store
596 * This function will a cookie for current position of the iterator head,
597 * so that user can use this cookie to load/start the iterator next time.
599 * \param[in] env execution environment.
600 * \param[in] di the iterator for striped directory.
602 * \retval the cookie.
604 static __u64 lod_striped_it_store(const struct lu_env *env,
605 const struct dt_it *di)
607 const struct lod_it *it = (const struct lod_it *)di;
608 struct lod_object *lo = lod_dt_obj(it->lit_obj);
609 struct dt_object *next;
611 LOD_CHECK_STRIPED_IT(env, it, lo);
613 next = lo->ldo_stripe[it->lit_stripe_index];
614 LASSERT(next != NULL);
615 LASSERT(next->do_index_ops != NULL);
617 return next->do_index_ops->dio_it.store(env, it->lit_it);
621 * Implementation of dt_index_operations:: dio_it.load
623 * This function will position the iterator with the given hash(usually
626 * \param[in] env execution environment.
627 * \param[in] di the iterator for striped directory.
628 * \param[in] hash the given hash.
630 * \retval >0 if successfuly load the iterator to the given position.
631 * \retval <0 if load is failed.
633 static int lod_striped_it_load(const struct lu_env *env,
634 const struct dt_it *di, __u64 hash)
636 const struct lod_it *it = (const struct lod_it *)di;
637 struct lod_object *lo = lod_dt_obj(it->lit_obj);
638 struct dt_object *next;
640 LOD_CHECK_STRIPED_IT(env, it, lo);
642 next = lo->ldo_stripe[it->lit_stripe_index];
643 LASSERT(next != NULL);
644 LASSERT(next->do_index_ops != NULL);
646 return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
649 static struct dt_index_operations lod_striped_index_ops = {
650 .dio_lookup = lod_index_lookup,
651 .dio_declare_insert = lod_declare_index_insert,
652 .dio_insert = lod_index_insert,
653 .dio_declare_delete = lod_declare_index_delete,
654 .dio_delete = lod_index_delete,
656 .init = lod_striped_it_init,
657 .fini = lod_striped_it_fini,
658 .get = lod_striped_it_get,
659 .put = lod_striped_it_put,
660 .next = lod_striped_it_next,
661 .key = lod_striped_it_key,
662 .key_size = lod_striped_it_key_size,
663 .rec = lod_striped_it_rec,
664 .rec_size = lod_striped_it_rec_size,
665 .store = lod_striped_it_store,
666 .load = lod_striped_it_load,
671 * Append the FID for each shard of the striped directory after the
672 * given LMV EA header.
674 * To simplify striped directory and the consistency verification,
675 * we only store the LMV EA header on disk, for both master object
676 * and slave objects. When someone wants to know the whole LMV EA,
677 * such as client readdir(), we can build the entrie LMV EA on the
678 * MDT side (in RAM) via iterating the sub-directory entries that
679 * are contained in the master object of the stripe directory.
681 * For the master object of the striped directroy, the valid name
682 * for each shard is composed of the ${shard_FID}:${shard_idx}.
684 * There may be holes in the LMV EA if some shards' name entries
685 * are corrupted or lost.
687 * \param[in] env pointer to the thread context
688 * \param[in] lo pointer to the master object of the striped directory
689 * \param[in] buf pointer to the lu_buf which will hold the LMV EA
690 * \param[in] resize whether re-allocate the buffer if it is not big enough
692 * \retval positive size of the LMV EA
693 * \retval 0 for nothing to be loaded
694 * \retval negative error number on failure
696 int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo,
697 struct lu_buf *buf, bool resize)
699 struct lu_dirent *ent =
700 (struct lu_dirent *)lod_env_info(env)->lti_key;
701 struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
702 struct dt_object *obj = dt_object_child(&lo->ldo_obj);
703 struct lmv_mds_md_v1 *lmv1 = buf->lb_buf;
705 const struct dt_it_ops *iops;
707 __u32 magic = le32_to_cpu(lmv1->lmv_magic);
712 /* If it is not a striped directory, then load nothing. */
713 if (magic != LMV_MAGIC_V1)
716 /* If it is in migration (or failure), then load nothing. */
717 if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
720 stripes = le32_to_cpu(lmv1->lmv_stripe_count);
724 size = lmv_mds_md_size(stripes, magic);
725 if (buf->lb_len < size) {
734 lu_buf_alloc(buf, size);
739 memcpy(buf->lb_buf, tbuf.lb_buf, tbuf.lb_len);
742 if (unlikely(!dt_try_as_dir(env, obj)))
745 memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid));
746 iops = &obj->do_index_ops->dio_it;
747 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
751 rc = iops->load(env, it, 0);
753 rc = iops->next(env, it);
758 char name[FID_LEN + 2] = "";
763 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
769 fid_le_to_cpu(&fid, &ent->lde_fid);
770 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
771 if (ent->lde_name[0] == '.') {
772 if (ent->lde_namelen == 1)
775 if (ent->lde_namelen == 2 && ent->lde_name[1] == '.')
779 len = snprintf(name, FID_LEN + 1, DFID":", PFID(&ent->lde_fid));
780 /* The ent->lde_name is composed of ${FID}:${index} */
781 if (ent->lde_namelen < len + 1 ||
782 memcmp(ent->lde_name, name, len) != 0) {
783 CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
784 "%s: invalid shard name %.*s with the FID "DFID
785 " for the striped directory "DFID", %s\n",
786 lod2obd(lod)->obd_name, ent->lde_namelen,
787 ent->lde_name, PFID(&fid),
788 PFID(lu_object_fid(&obj->do_lu)),
789 lod->lod_lmv_failout ? "failout" : "skip");
791 if (lod->lod_lmv_failout)
799 if (ent->lde_name[len] < '0' ||
800 ent->lde_name[len] > '9') {
801 CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
802 "%s: invalid shard name %.*s with the "
803 "FID "DFID" for the striped directory "
805 lod2obd(lod)->obd_name, ent->lde_namelen,
806 ent->lde_name, PFID(&fid),
807 PFID(lu_object_fid(&obj->do_lu)),
808 lod->lod_lmv_failout ?
811 if (lod->lod_lmv_failout)
817 index = index * 10 + ent->lde_name[len++] - '0';
818 } while (len < ent->lde_namelen);
820 if (len == ent->lde_namelen) {
821 /* Out of LMV EA range. */
822 if (index >= stripes) {
823 CERROR("%s: the shard %.*s for the striped "
824 "directory "DFID" is out of the known "
825 "LMV EA range [0 - %u], failout\n",
826 lod2obd(lod)->obd_name, ent->lde_namelen,
828 PFID(lu_object_fid(&obj->do_lu)),
834 /* The slot has been occupied. */
835 if (!fid_is_zero(&lmv1->lmv_stripe_fids[index])) {
839 &lmv1->lmv_stripe_fids[index]);
840 CERROR("%s: both the shard "DFID" and "DFID
841 " for the striped directory "DFID
842 " claim the same LMV EA slot at the "
843 "index %d, failout\n",
844 lod2obd(lod)->obd_name,
845 PFID(&fid0), PFID(&fid),
846 PFID(lu_object_fid(&obj->do_lu)), index);
851 /* stored as LE mode */
852 lmv1->lmv_stripe_fids[index] = ent->lde_fid;
855 rc = iops->next(env, it);
862 RETURN(rc > 0 ? lmv_mds_md_size(stripes, magic) : rc);
866 * Implementation of dt_object_operations:: do_index_try
868 * This function will try to initialize the index api pointer for the
869 * given object, usually it the entry point of the index api. i.e.
870 * the index object should be initialized in index_try, then start
871 * using index api. For striped directory, it will try to initialize
872 * all of its sub_stripes.
874 * \param[in] env execution environment.
875 * \param[in] dt the index object to be initialized.
876 * \param[in] feat the features of this object, for example fixed or
877 * variable key size etc.
879 * \retval >0 if the initialization is successful.
880 * \retval <0 if the initialization is failed.
882 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
883 const struct dt_index_features *feat)
885 struct lod_object *lo = lod_dt_obj(dt);
886 struct dt_object *next = dt_object_child(dt);
890 LASSERT(next->do_ops);
891 LASSERT(next->do_ops->do_index_try);
893 rc = lod_load_striping_locked(env, lo);
897 rc = next->do_ops->do_index_try(env, next, feat);
901 if (lo->ldo_stripenr > 0) {
904 for (i = 0; i < lo->ldo_stripenr; i++) {
905 if (dt_object_exists(lo->ldo_stripe[i]) == 0)
907 rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
908 lo->ldo_stripe[i], feat);
912 dt->do_index_ops = &lod_striped_index_ops;
914 dt->do_index_ops = &lod_index_ops;
/**
 * Take the read lock of the child object.
 *
 * \param[in] env	execution environment
 * \param[in] dt	lod object whose child is locked
 * \param[in] role	passed through to dt_read_lock()
 */
static void lod_object_read_lock(const struct lu_env *env,
				 struct dt_object *dt, unsigned role)
{
	struct dt_object *child = dt_object_child(dt);

	dt_read_lock(env, child, role);
}
/**
 * Take the write lock of the child object.
 *
 * \param[in] env	execution environment
 * \param[in] dt	lod object whose child is locked
 * \param[in] role	passed through to dt_write_lock()
 */
static void lod_object_write_lock(const struct lu_env *env,
				  struct dt_object *dt, unsigned role)
{
	struct dt_object *child = dt_object_child(dt);

	dt_write_lock(env, child, role);
}
/**
 * Drop the read lock of the child object.
 *
 * \param[in] env	execution environment
 * \param[in] dt	lod object whose child is unlocked
 */
static void lod_object_read_unlock(const struct lu_env *env,
				   struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	dt_read_unlock(env, child);
}
/**
 * Drop the write lock of the child object.
 *
 * \param[in] env	execution environment
 * \param[in] dt	lod object whose child is unlocked
 */
static void lod_object_write_unlock(const struct lu_env *env,
				    struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	dt_write_unlock(env, child);
}
/**
 * Query the write-lock state of the child object.
 *
 * \param[in] env	execution environment
 * \param[in] dt	lod object whose child is queried
 *
 * \retval		the value returned by dt_write_locked()
 */
static int lod_object_write_locked(const struct lu_env *env,
				   struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_write_locked(env, child);
}
/**
 * Fetch the attributes of the child (master) object.
 *
 * \param[in] env	execution environment
 * \param[in] dt	lod object whose child is queried
 * \param[out] attr	attributes filled in by dt_attr_get()
 * \param[in] capa	capability, passed through unchanged
 *
 * \retval		the value returned by dt_attr_get()
 */
static int lod_attr_get(const struct lu_env *env,
			struct dt_object *dt,
			struct lu_attr *attr,
			struct lustre_capa *capa)
{
	struct dt_object *child = dt_object_child(dt);

	/* Note: for striped directory, client will merge attributes
	 * from all of the sub-stripes see lmv_merge_attr(), and there
	 * no MDD logic depend on directory nlink/size/time, so we can
	 * always use master inode nlink and size for now. */
	return dt_attr_get(env, child, attr, capa);
}
963 * Mark all of sub-stripes dead of the striped directory.
965 static int lod_mark_dead_object(const struct lu_env *env,
966 struct dt_object *dt,
967 struct thandle *handle,
970 struct lod_object *lo = lod_dt_obj(dt);
971 struct lmv_mds_md_v1 *lmv;
972 __u32 dead_hash_type;
978 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
981 rc = lod_load_striping_locked(env, lo);
985 if (lo->ldo_stripenr == 0)
988 rc = lod_get_lmv_ea(env, lo);
992 lmv = lod_env_info(env)->lti_ea_store;
993 lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
994 dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD;
995 lmv->lmv_hash_type = cpu_to_le32(dead_hash_type);
996 for (i = 0; i < lo->ldo_stripenr; i++) {
999 lmv->lmv_master_mdt_index = i;
1001 buf.lb_len = sizeof(*lmv);
1003 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf,
1005 LU_XATTR_REPLACE, handle);
1007 rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf,
1008 XATTR_NAME_LMV, LU_XATTR_REPLACE,
1009 handle, BYPASS_CAPA);
1018 static int lod_declare_attr_set(const struct lu_env *env,
1019 struct dt_object *dt,
1020 const struct lu_attr *attr,
1021 struct thandle *handle)
1023 struct dt_object *next = dt_object_child(dt);
1024 struct lod_object *lo = lod_dt_obj(dt);
1028 /* Set dead object on all other stripes */
1029 if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
1030 attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
1031 rc = lod_mark_dead_object(env, dt, handle, true);
1036 * declare setattr on the local object
1038 rc = dt_declare_attr_set(env, next, attr, handle);
1042 /* osp_declare_attr_set() ignores all attributes other than
1043 * UID, GID, and size, and osp_attr_set() ignores all but UID
1044 * and GID. Declaration of size attr setting happens through
1045 * lod_declare_init_size(), and not through this function.
1046 * Therefore we need not load striping unless ownership is
1047 * changing. This should save memory and (we hope) speed up
1049 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
1050 if (!(attr->la_valid & (LA_UID | LA_GID)))
1053 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
1056 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
1057 LA_ATIME | LA_MTIME | LA_CTIME)))
1061 * load striping information, notice we don't do this when object
1062 * is being initialized as we don't need this information till
1063 * few specific cases like destroy, chown
1065 rc = lod_load_striping(env, lo);
1069 if (lo->ldo_stripenr == 0)
1073 * if object is striped declare changes on the stripes
1075 LASSERT(lo->ldo_stripe);
1076 for (i = 0; i < lo->ldo_stripenr; i++) {
1077 if (likely(lo->ldo_stripe[i] != NULL)) {
1078 rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr,
1081 CERROR("failed declaration: %d\n", rc);
1087 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1088 dt_object_exists(next) != 0 &&
1089 dt_object_remote(next) == 0)
1090 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
1092 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1093 dt_object_exists(next) &&
1094 dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1095 struct lod_thread_info *info = lod_env_info(env);
1096 struct lu_buf *buf = &info->lti_buf;
1098 buf->lb_buf = info->lti_ea_store;
1099 buf->lb_len = info->lti_ea_store_size;
1100 dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
1101 LU_XATTR_REPLACE, handle);
1107 static int lod_attr_set(const struct lu_env *env,
1108 struct dt_object *dt,
1109 const struct lu_attr *attr,
1110 struct thandle *handle,
1111 struct lustre_capa *capa)
1113 struct dt_object *next = dt_object_child(dt);
1114 struct lod_object *lo = lod_dt_obj(dt);
1118 /* Set dead object on all other stripes */
1119 if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
1120 attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
1121 rc = lod_mark_dead_object(env, dt, handle, false);
1126 * apply changes to the local object
1128 rc = dt_attr_set(env, next, attr, handle, capa);
1132 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
1133 if (!(attr->la_valid & (LA_UID | LA_GID)))
1136 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
1139 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
1140 LA_ATIME | LA_MTIME | LA_CTIME)))
1144 if (lo->ldo_stripenr == 0)
1148 * if object is striped, apply changes to all the stripes
1150 LASSERT(lo->ldo_stripe);
1151 for (i = 0; i < lo->ldo_stripenr; i++) {
1152 if (unlikely(lo->ldo_stripe[i] == NULL))
1154 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
1155 (dt_object_exists(lo->ldo_stripe[i]) == 0))
1158 rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
1160 CERROR("failed declaration: %d\n", rc);
1165 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1166 dt_object_exists(next) != 0 &&
1167 dt_object_remote(next) == 0)
1168 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
1170 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1171 dt_object_exists(next) &&
1172 dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1173 struct lod_thread_info *info = lod_env_info(env);
1174 struct lu_buf *buf = &info->lti_buf;
1175 struct ost_id *oi = &info->lti_ostid;
1176 struct lu_fid *fid = &info->lti_fid;
1177 struct lov_mds_md_v1 *lmm;
1178 struct lov_ost_data_v1 *objs;
1182 rc1 = lod_get_lov_ea(env, lo);
1186 buf->lb_buf = info->lti_ea_store;
1187 buf->lb_len = info->lti_ea_store_size;
1188 lmm = info->lti_ea_store;
1189 magic = le32_to_cpu(lmm->lmm_magic);
1190 if (magic == LOV_MAGIC_V1)
1191 objs = &(lmm->lmm_objects[0]);
1193 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1194 ostid_le_to_cpu(&objs->l_ost_oi, oi);
1195 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
1197 fid_to_ostid(fid, oi);
1198 ostid_cpu_to_le(oi, &objs->l_ost_oi);
1199 dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
1200 LU_XATTR_REPLACE, handle, BYPASS_CAPA);
/*
 * Fetch extended attribute 'name' of 'dt' into 'buf'.
 *
 * The request is first forwarded to the child object via dt_xattr_get().
 * Two attribute names receive additional handling in the code below:
 *
 *  - XATTR_NAME_LMV: the returned size is compared with
 *    sizeof(struct lmv_mds_md_v1). When the caller supplied no buffer, the
 *    header is re-read into the lti_key scratch area of lod_thread_info
 *    and, for LMV_MAGIC_V1, rc is recomputed with lmv_mds_md_size() from
 *    the stored stripe count. lod_load_lmv_shards() is also called on this
 *    path and a non-zero result from it replaces rc.
 *
 *  - XATTR_NAME_LOV on the device root (object FID equal to the FID
 *    returned by dt_root_get() for dev->lod_child): when 'buf' can hold a
 *    struct lov_user_md it is filled with the defaults kept in
 *    dev->lod_desc (pattern, stripe size, stripe count, stripe offset),
 *    converted to little-endian.
 *
 * NOTE(review): the statements guarded by the "rc >" size comparison and by
 * the -ENODATA / S_ISDIR test are presumably early returns of rc -- confirm
 * against the complete function body.
 */
1206 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
1207 struct lu_buf *buf, const char *name,
1208 struct lustre_capa *capa)
1210 struct lod_thread_info *info = lod_env_info(env);
1211 struct lod_device *dev = lu2lod_dev(dt->do_lu.lo_dev);
1215 rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
1216 if (strcmp(name, XATTR_NAME_LMV) == 0) {
1217 struct lmv_mds_md_v1 *lmv1;
1220 if (rc > (typeof(rc))sizeof(*lmv1))
1223 if (rc < (typeof(rc))sizeof(*lmv1))
1224 RETURN(rc = rc > 0 ? -EINVAL : rc);
/* No caller buffer: read just the header into the lti_key scratch area. */
1226 if (buf->lb_buf == NULL || buf->lb_len == 0) {
1227 CLASSERT(sizeof(*lmv1) <= sizeof(info->lti_key));
1229 info->lti_buf.lb_buf = info->lti_key;
1230 info->lti_buf.lb_len = sizeof(*lmv1);
1231 rc = dt_xattr_get(env, dt_object_child(dt),
1232 &info->lti_buf, name, capa);
1233 if (unlikely(rc != sizeof(*lmv1)))
1234 RETURN(rc = rc > 0 ? -EINVAL : rc);
1236 lmv1 = info->lti_buf.lb_buf;
1237 /* The on-disk LMV EA only contains header, but the
1238 * returned LMV EA size should contain the space for
1239 * the FIDs of all shards of the striped directory. */
1240 if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1)
1241 rc = lmv_mds_md_size(
1242 le32_to_cpu(lmv1->lmv_stripe_count),
1245 rc1 = lod_load_lmv_shards(env, lod_dt_obj(dt),
1249 RETURN(rc = rc1 != 0 ? rc1 : rc);
1252 if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
1256 * lod returns default striping on the real root of the device
1257 * this is like the root stores default striping for the whole
1258 * filesystem. historically we've been using a different approach
1259 * and store it in the config.
1261 dt_root_get(env, dev->lod_child, &info->lti_fid);
1262 is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
1264 if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
1265 struct lov_user_md *lum = buf->lb_buf;
1266 struct lov_desc *desc = &dev->lod_desc;
1268 if (buf->lb_buf == NULL) {
1270 } else if (buf->lb_len >= sizeof(*lum)) {
1271 lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
1272 lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
1273 lmm_oi_set_id(&lum->lmm_oi, 0);
1274 lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
1275 lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
1276 lum->lmm_stripe_size = cpu_to_le32(
1277 desc->ld_default_stripe_size);
1278 lum->lmm_stripe_count = cpu_to_le16(
1279 desc->ld_default_stripe_count);
1280 lum->lmm_stripe_offset = cpu_to_le16(
1281 desc->ld_default_stripe_offset);
/*
 * Validate a user-supplied LMV (directory striping) descriptor.
 *
 * lod: LOD device; used in the visible code only to obtain the obd name
 *      printed by the error message
 * lum: descriptor to check; its fields are read through le32_to_cpu()
 *
 * The visible checks reject a magic other than LMV_USER_MAGIC and a stripe
 * count of zero, both with rc = -EINVAL via GOTO(out, ...). The CERROR
 * reports magic, stripe offset, stripe count and rc.
 * NOTE(review): the CERROR is presumably emitted only when rc != 0 --
 * confirm against the complete function body.
 */
1291 static int lod_verify_md_striping(struct lod_device *lod,
1292 const struct lmv_user_md_v1 *lum)
1297 if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
1298 GOTO(out, rc = -EINVAL);
1300 if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
1301 GOTO(out, rc = -EINVAL);
1304 CERROR("%s: invalid lmv_user_md: magic = %x, "
1305 "stripe_offset = %d, stripe_count = %u: rc = %d\n",
1306 lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
1307 (int)le32_to_cpu(lum->lum_stripe_offset),
1308 le32_to_cpu(lum->lum_stripe_count), rc);
1313 * Master LMVEA will be same as slave LMVEA, except
1314 * 1. different magic
1315 * 2. lmv_master_mdt_index on slave LMV EA will be stripe_index.
/*
 * Derive the LMV EA stored on a stripe from the master's LMV EA.
 *
 * slave_lmv:  output header
 * master_lmv: header to copy from
 *
 * The whole header is copied by struct assignment and only lmv_magic is
 * replaced with LMV_MAGIC_STRIPE (little-endian). lmv_master_mdt_index is
 * not modified here; the callers in this file overwrite it with the stripe
 * index afterwards.
 */
1317 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
1318 const struct lmv_mds_md_v1 *master_lmv)
1320 *slave_lmv = *master_lmv;
1321 slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
/*
 * Prepare the on-disk LMV EA header of the striped directory 'dt'.
 *
 * env:     execution environment; supplies the lod_thread_info scratch area
 * dt:      striped directory; must already be marked striped and have at
 *          least one stripe (both conditions are LASSERTed)
 * lmv_buf: output; on completion it aliases info->lti_ea_store (it does not
 *          own a copy) with lb_len = sizeof(struct lmv_mds_md_v1)
 *
 * lti_ea_store is resized through lod_ea_store_resize() when it is smaller
 * than the header. The header records LMV_MAGIC, the stripe count taken
 * from lo->ldo_stripenr, lo->ldo_dir_hash_type and the master MDT index
 * (mdtidx, presumably filled in by the lod_fld_lookup() call on the
 * directory FID -- confirm). ldo_dir_striping_cached is set before return.
 */
1324 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
1325 struct lu_buf *lmv_buf)
1327 struct lod_thread_info *info = lod_env_info(env);
1328 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1329 struct lod_object *lo = lod_dt_obj(dt);
1330 struct lmv_mds_md_v1 *lmm1;
1332 int type = LU_SEQ_RANGE_ANY;
1337 LASSERT(lo->ldo_dir_striped != 0);
1338 LASSERT(lo->ldo_stripenr > 0);
1339 stripe_count = lo->ldo_stripenr;
1340 /* Only store the LMV EA header on the disk. */
1341 if (info->lti_ea_store_size < sizeof(*lmm1)) {
1342 rc = lod_ea_store_resize(info, sizeof(*lmm1));
1346 memset(info->lti_ea_store, 0, sizeof(*lmm1));
1349 lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
1350 lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
1351 lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
1352 lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
1353 rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
1358 lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
1359 lmv_buf->lb_buf = info->lti_ea_store;
1360 lmv_buf->lb_len = sizeof(*lmm1);
1361 lo->ldo_dir_striping_cached = 1;
/*
 * Instantiate the stripe objects of a striped directory from its LMV EA.
 *
 * env: execution environment
 * lo:  directory object whose ldo_stripe array is to be populated; it must
 *      not have a stripe array yet (LASSERTed)
 * buf: buffer holding the LMV EA (header followed by lmv_stripe_fids[])
 *
 * Visible behaviour:
 *  - an EA with magic LMV_MAGIC_STRIPE marks 'lo' as a slave stripe
 *    (ldo_dir_slave_stripe = 1);
 *  - LMV_HASH_FLAG_MIGRATION, a magic other than LMV_MAGIC_V1 and a stripe
 *    count below 1 are tested for (NOTE(review): each test presumably
 *    returns without building stripes -- confirm);
 *  - for every stripe FID the owning MDT index is resolved with
 *    lod_fld_lookup(); the local child device is used when the index equals
 *    this node's ss_node_id, otherwise the target is taken from
 *    LTD_TGT(ltd, idx), and the object is obtained with dt_locate_at();
 *  - an insane FID or an unusable target yields -ESTALE;
 *  - on completion the array, ldo_stripenr and ldo_stripes_allocated are
 *    stored in 'lo'; lod_object_free_striping() is called on the cleanup
 *    path (presumably only when rc != 0 -- confirm).
 */
1366 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
1367 const struct lu_buf *buf)
1369 struct lod_thread_info *info = lod_env_info(env);
1370 struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1371 struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
1372 struct dt_object **stripe;
1373 union lmv_mds_md *lmm = buf->lb_buf;
1374 struct lmv_mds_md_v1 *lmv1 = &lmm->lmv_md_v1;
1375 struct lu_fid *fid = &info->lti_fid;
1380 if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
1383 if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
1384 lo->ldo_dir_slave_stripe = 1;
1388 if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
1391 if (le32_to_cpu(lmv1->lmv_stripe_count) < 1)
1394 LASSERT(lo->ldo_stripe == NULL);
1395 OBD_ALLOC(stripe, sizeof(stripe[0]) *
1396 (le32_to_cpu(lmv1->lmv_stripe_count)));
1400 for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
1401 struct dt_device *tgt_dt;
1402 struct dt_object *dto;
1403 int type = LU_SEQ_RANGE_ANY;
1406 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
1407 if (!fid_is_sane(fid))
1408 GOTO(out, rc = -ESTALE);
1410 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
/* Stripe lives on this MDT: use the local child device directly. */
1414 if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
1415 tgt_dt = lod->lod_child;
1417 struct lod_tgt_desc *tgt;
1419 tgt = LTD_TGT(ltd, idx);
1421 GOTO(out, rc = -ESTALE);
1422 tgt_dt = tgt->ltd_tgt;
1425 dto = dt_locate_at(env, tgt_dt, fid,
1426 lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1429 GOTO(out, rc = PTR_ERR(dto));
1434 lo->ldo_stripe = stripe;
1435 lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count);
1436 lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
1438 lod_object_free_striping(env, lo);
/*
 * Allocate the stripes of a new striped directory and declare every update
 * needed to create them.
 *
 * env:  execution environment
 * dt:   master directory object
 * attr: attributes passed to dt_declare_create() for each stripe
 * lum:  user LMV descriptor; magic and stripe count are LASSERTed, having
 *       been checked earlier by lod_verify_md_striping()
 * dof:  object format passed to dt_declare_create()
 *
 * Phases visible below:
 *  1. The requested stripe count is clamped to lod_remote_mdt_count + 1 and
 *     two arrays of that size are allocated: stripe[] (objects) and
 *     idx_array[] (MDT index chosen for each stripe).
 *  2. Per stripe, a FID is allocated. One path takes the index from
 *     lum_stripe_offset and allocates from lod_child_exp on the local child
 *     device (presumably the first stripe only -- confirm). The other path
 *     starts after the previously chosen index and scans up to
 *     lod_remote_mdt_count candidates, testing the target bitmap, whether
 *     the index is already present in idx_array[] and dt_statfs() on the
 *     target, then allocates from tgt->ltd_exp. The stripe object is then
 *     obtained with dt_locate_at() using LOC_F_NEW.
 *  3. The result is recorded in 'lo'; zero stripes yields -ENOSPC.
 *  4. For every stripe the following are declared: object creation, the "."
 *     and ".." entries (".." refers to the master FID), the default LOV EA
 *     when one is cached on 'lo', the slave LMV EA with
 *     lmv_master_mdt_index = stripe index, a linkEA entry named
 *     "<stripe FID>:<index>" pointing at the master, the insertion of that
 *     name into the master directory and a reference increment on it.
 *  5. The master's own LMV EA is declared last.
 *
 * The cleanup code puts every non-NULL stripe, frees stripe[] and resets the
 * striping fields of 'lo'; idx_array and slave_lmm are freed when non-NULL.
 * NOTE(review): the labels separating the failure-only cleanup from the
 * common exit are not visible here -- confirm which parts run on success.
 */
1443 static int lod_prep_md_striped_create(const struct lu_env *env,
1444 struct dt_object *dt,
1445 struct lu_attr *attr,
1446 const struct lmv_user_md_v1 *lum,
1447 struct dt_object_format *dof,
1450 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1451 struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
1452 struct lod_object *lo = lod_dt_obj(dt);
1453 struct lod_thread_info *info = lod_env_info(env);
1454 struct dt_object **stripe;
1455 struct lu_buf lmv_buf;
1456 struct lu_buf slave_lmv_buf;
1457 struct lmv_mds_md_v1 *lmm;
1458 struct lmv_mds_md_v1 *slave_lmm = NULL;
1459 struct dt_insert_rec *rec = &info->lti_dt_rec;
1467 /* The lum has been verified in lod_verify_md_striping */
1468 LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
1469 LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
1471 stripe_count = le32_to_cpu(lum->lum_stripe_count);
1473 /* shrink the stripe_count to the available MDT count */
1474 if (stripe_count > lod->lod_remote_mdt_count + 1)
1475 stripe_count = lod->lod_remote_mdt_count + 1;
1477 OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
1481 OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
1482 if (idx_array == NULL)
1483 GOTO(out_free, rc = -ENOMEM);
1485 for (i = 0; i < stripe_count; i++) {
1486 struct lod_tgt_desc *tgt = NULL;
1487 struct dt_object *dto;
1488 struct lu_fid fid = { 0 };
1490 struct lu_object_conf conf = { 0 };
1491 struct dt_device *tgt_dt = NULL;
1494 /* Right now, master stripe and master object are
1495 * on the same MDT */
1496 idx = le32_to_cpu(lum->lum_stripe_offset);
1497 rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
1501 tgt_dt = lod->lod_child;
/* Continue the search right after the MDT used by the previous stripe. */
1505 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
1507 for (j = 0; j < lod->lod_remote_mdt_count;
1508 j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
1509 bool already_allocated = false;
1512 CDEBUG(D_INFO, "try idx %d, mdt cnt %u,"
1513 " allocated %u, last allocated %d\n", idx,
1514 lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1516 /* Find next available target */
1517 if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
1520 /* check whether the idx already exists
1521 * in current allocated array */
1522 for (k = 0; k < i; k++) {
1523 if (idx_array[k] == idx) {
1524 already_allocated = true;
1529 if (already_allocated)
1532 /* check the status of the OSP */
1533 tgt = LTD_TGT(ltd, idx);
1537 tgt_dt = tgt->ltd_tgt;
1538 rc = dt_statfs(env, tgt_dt, NULL);
1540 /* this OSP doesn't feel well */
1545 rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
1554 /* Can not allocate more stripes */
1555 if (j == lod->lod_remote_mdt_count) {
1556 CDEBUG(D_INFO, "%s: require stripes %u only get %d\n",
1557 lod2obd(lod)->obd_name, stripe_count, i - 1);
1561 CDEBUG(D_INFO, "idx %d, mdt cnt %u,"
1562 " allocated %u, last allocated %d\n", idx,
1563 lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1566 /* tgt_dt and fid must be ready after search available OSP
1567 * in the above loop */
1568 LASSERT(tgt_dt != NULL);
1569 LASSERT(fid_is_sane(&fid));
1570 conf.loc_flags = LOC_F_NEW;
1571 dto = dt_locate_at(env, tgt_dt, &fid,
1572 dt->do_lu.lo_dev->ld_site->ls_top_dev,
1575 GOTO(out_put, rc = PTR_ERR(dto));
1580 lo->ldo_dir_striped = 1;
1581 lo->ldo_stripe = stripe;
1582 lo->ldo_stripenr = i;
1583 lo->ldo_stripes_allocated = stripe_count;
1585 if (lo->ldo_stripenr == 0)
1586 GOTO(out_put, rc = -ENOSPC);
1588 rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1591 lmm = lmv_buf.lb_buf;
1593 OBD_ALLOC_PTR(slave_lmm);
1594 if (slave_lmm == NULL)
1595 GOTO(out_put, rc = -ENOMEM);
1597 lod_prep_slave_lmv_md(slave_lmm, lmm);
1598 slave_lmv_buf.lb_buf = slave_lmm;
1599 slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1601 if (!dt_try_as_dir(env, dt_object_child(dt)))
1602 GOTO(out_put, rc = -EINVAL);
1604 rec->rec_type = S_IFDIR;
1605 for (i = 0; i < lo->ldo_stripenr; i++) {
1606 struct dt_object *dto = stripe[i];
1607 char *stripe_name = info->lti_key;
1608 struct lu_name *sname;
1609 struct linkea_data ldata = { 0 };
1610 struct lu_buf linkea_buf;
1612 rc = dt_declare_create(env, dto, attr, NULL, dof, th);
1616 if (!dt_try_as_dir(env, dto))
1617 GOTO(out_put, rc = -EINVAL);
1619 rec->rec_fid = lu_object_fid(&dto->do_lu);
1620 rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
1621 (const struct dt_key *)dot, th);
1625 /* master stripe FID will be put to .. */
1626 rec->rec_fid = lu_object_fid(&dt->do_lu);
1627 rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
1628 (const struct dt_key *)dotdot, th);
1632 /* probably nothing to inherit */
1633 if (lo->ldo_striping_cached &&
1634 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1635 lo->ldo_def_stripenr,
1636 lo->ldo_def_stripe_offset)) {
1637 struct lov_user_md_v3 *v3;
1639 /* sigh, lti_ea_store has been used for lmv_buf,
1640 * so we have to allocate buffer for default
1644 GOTO(out_put, rc = -ENOMEM);
1646 memset(v3, 0, sizeof(*v3));
1647 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1648 v3->lmm_stripe_count =
1649 cpu_to_le16(lo->ldo_def_stripenr);
1650 v3->lmm_stripe_offset =
1651 cpu_to_le16(lo->ldo_def_stripe_offset);
1652 v3->lmm_stripe_size =
1653 cpu_to_le32(lo->ldo_def_stripe_size);
1654 if (lo->ldo_pool != NULL)
1655 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
1656 sizeof(v3->lmm_pool_name));
1658 info->lti_buf.lb_buf = v3;
1659 info->lti_buf.lb_len = sizeof(*v3);
1660 rc = dt_declare_xattr_set(env, dto,
1669 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1670 rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf,
1671 XATTR_NAME_LMV, 0, th);
1675 snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
1676 PFID(lu_object_fid(&dto->do_lu)), i);
1678 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
1679 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
1683 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
1687 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1688 linkea_buf.lb_len = ldata.ld_leh->leh_len;
1689 rc = dt_declare_xattr_set(env, dto, &linkea_buf,
1690 XATTR_NAME_LINK, 0, th);
1694 rec->rec_fid = lu_object_fid(&dto->do_lu);
1695 rc = dt_declare_insert(env, dt_object_child(dt),
1696 (const struct dt_rec *)rec,
1697 (const struct dt_key *)stripe_name, th);
1701 rc = dt_declare_ref_add(env, dt_object_child(dt), th);
1706 rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
1707 XATTR_NAME_LMV, 0, th);
1713 for (i = 0; i < stripe_count; i++)
1714 if (stripe[i] != NULL)
1715 lu_object_put(env, &stripe[i]->do_lu);
1716 OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count);
1717 lo->ldo_stripenr = 0;
1718 lo->ldo_stripes_allocated = 0;
1719 lo->ldo_stripe = NULL;
1723 if (idx_array != NULL)
1724 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
1725 if (slave_lmm != NULL)
1726 OBD_FREE_PTR(slave_lmm);
1732 * Declare create striped md object.
/*
 * Declare the creation of a striped directory from a user LMV descriptor.
 *
 * env:     execution environment
 * dt:      directory being created
 * attr:    attributes forwarded to lod_prep_md_striped_create()
 * lum_buf: buffer holding a struct lmv_user_md_v1; lb_buf must not be NULL
 *          (LASSERTed)
 * dof:     object format forwarded to lod_prep_md_striped_create()
 *
 * The descriptor is logged, tested for a zero stripe count
 * (NOTE(review): presumably an early exit with nothing to declare --
 * confirm), validated with lod_verify_md_striping() and then handed to
 * lod_prep_md_striped_create(). When that fails the striping state of the
 * object is reset through lod_object_free_striping().
 */
1734 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
1735 struct dt_object *dt,
1736 struct lu_attr *attr,
1737 const struct lu_buf *lum_buf,
1738 struct dt_object_format *dof,
1741 struct lod_object *lo = lod_dt_obj(dt);
1742 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1743 struct lmv_user_md_v1 *lum;
1747 lum = lum_buf->lb_buf;
1748 LASSERT(lum != NULL);
1750 CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
1751 le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
1752 (int)le32_to_cpu(lum->lum_stripe_offset));
1754 if (le32_to_cpu(lum->lum_stripe_count) == 0)
1757 rc = lod_verify_md_striping(lod, lum);
1761 /* prepare dir striped objects */
1762 rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
1764 /* failed to create striping, let's reset
1765 * config so that others don't get confused */
1766 lod_object_free_striping(env, lo);
/*
 * Declare setting extended attribute 'name' on directory 'dt' and, for a
 * striped directory, on each of its stripes.
 *
 * env:  execution environment
 * dt:   directory object
 * buf:  attribute value
 * name: attribute name
 * fl:   flags forwarded to dt_declare_xattr_set()
 *
 * For XATTR_NAME_DEFAULT_LMV the value is first validated with
 * lod_verify_md_striping(). The update is then declared on the child
 * object, the striping is loaded with lod_load_striping(), and the same
 * declaration is repeated on every stripe unless there are no stripes or
 * the attribute is XATTR_NAME_LINK.
 */
1773 static int lod_dir_declare_xattr_set(const struct lu_env *env,
1774 struct dt_object *dt,
1775 const struct lu_buf *buf,
1776 const char *name, int fl,
1779 struct dt_object *next = dt_object_child(dt);
1780 struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
1781 struct lod_object *lo = lod_dt_obj(dt);
1786 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1787 struct lmv_user_md_v1 *lum;
1789 LASSERT(buf != NULL && buf->lb_buf != NULL);
1791 rc = lod_verify_md_striping(d, lum);
1796 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1800 /* set xattr to each stripes, if needed */
1801 rc = lod_load_striping(env, lo);
1805 /* Note: Do not set LinkEA on sub-stripes, otherwise
1806 * it will confuse the fid2path process(see mdt_path_current()).
1807 * The linkEA between master and sub-stripes is set in
1808 * lod_xattr_set_lmv(). */
1809 if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
1812 for (i = 0; i < lo->ldo_stripenr; i++) {
1813 LASSERT(lo->ldo_stripe[i]);
1814 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
1824 * LOV xattr is a storage for striping, and LOD owns this xattr.
1825 * but LOD allows others to control striping to some extent
1826 * - to reset striping
1827 * - to set new defined striping
1828 * - to set new semi-defined striping
1829 * - number of stripes is defined
1830 * - number of stripes + osts are defined
/*
 * Declare setting extended attribute 'name' on 'dt', dispatching on the
 * object type recorded in its header (loh_attr & S_IFMT).
 *
 * env:  execution environment
 * dt:   target object
 * buf:  attribute value
 * name: attribute name
 * fl:   flags; LU_XATTR_REPLACE excludes the striping path below
 *
 * Dispatch visible below:
 *  - regular file, or object with no mode yet, with XATTR_NAME_LOV and
 *    without LU_XATTR_REPLACE: treated as a striping request and passed to
 *    lod_declare_striped_object(). The attributes come from dt_attr_get()
 *    when the object exists; the zeroed attr with la_mode = S_IFREG is
 *    presumably the branch for a not-yet-existing object -- confirm;
 *  - directory: lod_dir_declare_xattr_set();
 *  - anything else: declared directly on the child object.
 */
1833 static int lod_declare_xattr_set(const struct lu_env *env,
1834 struct dt_object *dt,
1835 const struct lu_buf *buf,
1836 const char *name, int fl,
1839 struct dt_object *next = dt_object_child(dt);
1840 struct lu_attr *attr = &lod_env_info(env)->lti_attr;
1846 * allow to declare predefined striping on a new (!mode) object
1847 * which is supposed to be replay of regular file creation
1848 * (when LOV setting is declared)
1849 * LU_XATTR_REPLACE is set to indicate a layout swap
1851 mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
1852 if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
1853 !(fl & LU_XATTR_REPLACE)) {
1855 * this is a request to manipulate object's striping
1857 if (dt_object_exists(dt)) {
1858 rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1862 memset(attr, 0, sizeof(*attr));
1863 attr->la_valid = LA_TYPE | LA_MODE;
1864 attr->la_mode = S_IFREG;
1866 rc = lod_declare_striped_object(env, dt, attr, buf, th);
1867 } else if (S_ISDIR(mode)) {
1868 rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
1870 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
/*
 * Invalidate the default-striping values cached on 'lo'.
 *
 * Clears the "cached" and "set" flags of the default LOV striping, drops the
 * pool name via lod_object_set_pool(lo, NULL) and zeroes the default stripe
 * size and count. ldo_dir_striping_cached is cleared only when
 * lo->ldo_dir_stripe is allocated.
 */
1876 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
1878 lo->ldo_striping_cached = 0;
1879 lo->ldo_def_striping_set = 0;
1880 lod_object_set_pool(lo, NULL);
1881 lo->ldo_def_stripe_size = 0;
1882 lo->ldo_def_stripenr = 0;
1883 if (lo->ldo_dir_stripe != NULL)
1884 lo->ldo_dir_striping_cached = 0;
/*
 * Set extended attribute 'name' on 'dt' and propagate it to the stripes of
 * a striped directory.
 *
 * env:  execution environment
 * dt:   target object
 * buf:  attribute value
 * name: attribute name
 * fl:   flags forwarded to dt_xattr_set()
 * th:   transaction handle
 * capa: capability forwarded to dt_xattr_set()
 *
 * The attribute is written to the child object first. The per-stripe loop
 * is reached only for directories on which that write succeeded, and is
 * skipped when there are no stripes or when the attribute is
 * XATTR_NAME_LINK.
 */
1887 static int lod_xattr_set_internal(const struct lu_env *env,
1888 struct dt_object *dt,
1889 const struct lu_buf *buf,
1890 const char *name, int fl, struct thandle *th,
1891 struct lustre_capa *capa)
1893 struct dt_object *next = dt_object_child(dt);
1894 struct lod_object *lo = lod_dt_obj(dt);
1899 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1900 if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1903 /* Note: Do not set LinkEA on sub-stripes, otherwise
1904 * it will confuse the fid2path process(see mdt_path_current()).
1905 * The linkEA between master and sub-stripes is set in
1906 * lod_xattr_set_lmv(). */
1907 if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
1910 for (i = 0; i < lo->ldo_stripenr; i++) {
1911 LASSERT(lo->ldo_stripe[i]);
1912 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
/*
 * Delete extended attribute 'name' from 'dt' and from the stripes of a
 * striped directory.
 *
 * env:  execution environment
 * dt:   target object
 * name: attribute name
 * th:   transaction handle
 * capa: capability forwarded to dt_xattr_del()
 *
 * The attribute is removed from the child object first. The per-stripe loop
 * is reached only for directories on which that removal succeeded and which
 * have at least one stripe. Unlike lod_xattr_set_internal(), no special case
 * for XATTR_NAME_LINK is visible here.
 */
1921 static int lod_xattr_del_internal(const struct lu_env *env,
1922 struct dt_object *dt,
1923 const char *name, struct thandle *th,
1924 struct lustre_capa *capa)
1926 struct dt_object *next = dt_object_child(dt);
1927 struct lod_object *lo = lod_dt_obj(dt);
1932 rc = dt_xattr_del(env, next, name, th, capa);
1933 if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1936 if (lo->ldo_stripenr == 0)
1939 for (i = 0; i < lo->ldo_stripenr; i++) {
1940 LASSERT(lo->ldo_stripe[i]);
1941 rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th,
/*
 * Set or delete the default LOV striping stored on directory 'dt'.
 *
 * env:  execution environment
 * dt:   directory object
 * buf:  buffer holding a struct lov_user_md_v1 / _v3; must be non-NULL with
 *       a non-NULL lb_buf (LASSERTed)
 * name: attribute name, passed through to the set/delete helper
 * fl:   flags forwarded to lod_xattr_set_internal()
 * capa: capability forwarded to the set/delete helper
 *
 * The cached default striping of the object is cleared first, then the
 * value is checked with lod_verify_striping(). When stripe size, count and
 * offset satisfy LOVEA_DELETE_VALUES() and the magic is LOV_USER_MAGIC_V1
 * (i.e. no pool is given) the attribute is deleted through
 * lod_xattr_del_internal(); the lod_xattr_set_internal() call is
 * presumably the alternative branch -- confirm against the complete body.
 */
1950 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
1951 struct dt_object *dt,
1952 const struct lu_buf *buf,
1953 const char *name, int fl,
1955 struct lustre_capa *capa)
1957 struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
1958 struct lod_object *l = lod_dt_obj(dt);
1959 struct lov_user_md_v1 *lum;
1960 struct lov_user_md_v3 *v3 = NULL;
1964 /* If it is striped dir, we should clear the stripe cache for
1965 * slave stripe as well, but there are no effective way to
1966 * notify the LOD on the slave MDT, so we do not cache stripe
1967 * information for slave stripe for now. XXX*/
1968 lod_lov_stripe_cache_clear(l);
1969 LASSERT(buf != NULL && buf->lb_buf != NULL);
1972 rc = lod_verify_striping(d, buf, false);
1976 if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1979 /* if { size, offset, count } = { 0, -1, 0 } and no pool
1980 * (i.e. all default values specified) then delete default
1981 * striping from dir. */
1983 "set default striping: sz %u # %u offset %d %s %s\n",
1984 (unsigned)lum->lmm_stripe_size,
1985 (unsigned)lum->lmm_stripe_count,
1986 (int)lum->lmm_stripe_offset,
1987 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1989 if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1990 (lum->lmm_stripe_count),
1991 (lum->lmm_stripe_offset)) &&
1992 lum->lmm_magic == LOV_USER_MAGIC_V1) {
1993 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1997 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
/*
 * Set or delete the default LMV (directory striping) attribute of 'dt' and
 * refresh the values cached on the object.
 *
 * env:  execution environment
 * dt:   directory object
 * buf:  buffer holding a struct lmv_user_md_v1; must be non-NULL with a
 *       non-NULL lb_buf (LASSERTed)
 * name: attribute name, passed through to the set/delete helper
 * fl:   flags forwarded to lod_xattr_set_internal()
 * capa: capability forwarded to the set/delete helper
 *
 * When stripe count and offset satisfy LMVEA_DELETE_VALUES() and the magic
 * is LMV_USER_MAGIC the attribute is deleted through
 * lod_xattr_del_internal(); the lod_xattr_set_internal() call is presumably
 * the alternative branch -- confirm. Afterwards l->ldo_dir_stripe is
 * allocated if missing, ldo_dir_striping_cached is cleared,
 * ldo_dir_def_striping_set is set and ldo_dir_def_stripenr takes the new
 * stripe count.
 */
2003 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
2004 struct dt_object *dt,
2005 const struct lu_buf *buf,
2006 const char *name, int fl,
2008 struct lustre_capa *capa)
2010 struct lod_object *l = lod_dt_obj(dt);
2011 struct lmv_user_md_v1 *lum;
2015 LASSERT(buf != NULL && buf->lb_buf != NULL);
2018 CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
2019 le32_to_cpu(lum->lum_stripe_count),
2020 (int)le32_to_cpu(lum->lum_stripe_offset));
2022 if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
2023 le32_to_cpu(lum->lum_stripe_offset)) &&
2024 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
2025 rc = lod_xattr_del_internal(env, dt, name, th, capa);
2029 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
2034 /* Update default stripe cache */
2035 if (l->ldo_dir_stripe == NULL) {
2036 OBD_ALLOC_PTR(l->ldo_dir_stripe);
2037 if (l->ldo_dir_stripe == NULL)
2041 l->ldo_dir_striping_cached = 0;
2042 l->ldo_dir_def_striping_set = 1;
2043 l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count);
/*
 * Execute phase of striped directory creation: create every stripe that was
 * allocated in the declare phase and write the LMV EAs.
 *
 * env:  execution environment
 * dt:   master directory object
 * buf, name, fl: not referenced in the visible statements
 * th:   transaction handle
 * capa: capability forwarded to most dt_insert()/dt_xattr_set() calls
 *
 * Tests for a non-directory and for zero stripes precede the work
 * (NOTE(review): presumably early returns -- confirm). The master's
 * attributes are read with dt_attr_get() and reused for the stripes with
 * la_valid restricted to LA_TYPE | LA_MODE and dof_type = DFT_DIR.
 *
 * For each stripe, in order: dt_create() under the stripe's write lock, the
 * "." and ".." entries (".." refers to the master FID), the default LOV EA
 * when one is cached on 'lo', the slave LMV EA with
 * lmv_master_mdt_index = stripe index, a linkEA entry named
 * "<stripe FID>:<index>" pointing at the master (written with BYPASS_CAPA),
 * the insertion of that name into the master directory and dt_ref_add() on
 * the master. The master's own LMV EA is written last and slave_lmm is
 * freed on exit.
 *
 * The sequence mirrors the declarations made in
 * lod_prep_md_striped_create(); the stripe name here is formatted with
 * "%d" while the declare side uses "%u".
 */
2048 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
2049 const struct lu_buf *buf, const char *name,
2050 int fl, struct thandle *th,
2051 struct lustre_capa *capa)
2053 struct lod_object *lo = lod_dt_obj(dt);
2054 struct lod_thread_info *info = lod_env_info(env);
2055 struct lu_attr *attr = &info->lti_attr;
2056 struct dt_object_format *dof = &info->lti_format;
2057 struct lu_buf lmv_buf;
2058 struct lu_buf slave_lmv_buf;
2059 struct lmv_mds_md_v1 *lmm;
2060 struct lmv_mds_md_v1 *slave_lmm = NULL;
2061 struct dt_insert_rec *rec = &info->lti_dt_rec;
2066 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2069 /* The stripes are supposed to be allocated in declare phase,
2070 * if there are no stripes being allocated, it will skip */
2071 if (lo->ldo_stripenr == 0)
2074 rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA);
2078 attr->la_valid = LA_TYPE | LA_MODE;
2079 dof->dof_type = DFT_DIR;
2081 rc = lod_prep_lmv_md(env, dt, &lmv_buf);
2084 lmm = lmv_buf.lb_buf;
2086 OBD_ALLOC_PTR(slave_lmm);
2087 if (slave_lmm == NULL)
2090 lod_prep_slave_lmv_md(slave_lmm, lmm);
2091 slave_lmv_buf.lb_buf = slave_lmm;
2092 slave_lmv_buf.lb_len = sizeof(*slave_lmm);
2094 rec->rec_type = S_IFDIR;
2095 for (i = 0; i < lo->ldo_stripenr; i++) {
2096 struct dt_object *dto;
2097 char *stripe_name = info->lti_key;
2098 struct lu_name *sname;
2099 struct linkea_data ldata = { 0 };
2100 struct lu_buf linkea_buf;
2102 dto = lo->ldo_stripe[i];
2103 dt_write_lock(env, dto, MOR_TGT_CHILD);
2104 rc = dt_create(env, dto, attr, NULL, dof, th);
2105 dt_write_unlock(env, dto);
2109 rec->rec_fid = lu_object_fid(&dto->do_lu);
2110 rc = dt_insert(env, dto, (const struct dt_rec *)rec,
2111 (const struct dt_key *)dot, th, capa, 0);
2115 rec->rec_fid = lu_object_fid(&dt->do_lu);
2116 rc = dt_insert(env, dto, (struct dt_rec *)rec,
2117 (const struct dt_key *)dotdot, th, capa, 0);
2121 if (lo->ldo_striping_cached &&
2122 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2123 lo->ldo_def_stripenr,
2124 lo->ldo_def_stripe_offset)) {
2125 struct lov_user_md_v3 *v3;
2127 /* sigh, lti_ea_store has been used for lmv_buf,
2128 * so we have to allocate buffer for default
2134 memset(v3, 0, sizeof(*v3));
2135 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2136 v3->lmm_stripe_count =
2137 cpu_to_le16(lo->ldo_def_stripenr);
2138 v3->lmm_stripe_offset =
2139 cpu_to_le16(lo->ldo_def_stripe_offset);
2140 v3->lmm_stripe_size =
2141 cpu_to_le32(lo->ldo_def_stripe_size);
2142 if (lo->ldo_pool != NULL)
2143 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
2144 sizeof(v3->lmm_pool_name));
2146 info->lti_buf.lb_buf = v3;
2147 info->lti_buf.lb_len = sizeof(*v3);
2148 rc = dt_xattr_set(env, dto, &info->lti_buf,
2149 XATTR_NAME_LOV, 0, th, capa);
2155 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
2156 rc = dt_xattr_set(env, dto, &slave_lmv_buf, XATTR_NAME_LMV,
2161 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2162 PFID(lu_object_fid(&dto->do_lu)), i);
2164 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
2165 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
2169 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
2173 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
2174 linkea_buf.lb_len = ldata.ld_leh->leh_len;
2175 rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK,
2176 0, th, BYPASS_CAPA);
2180 rec->rec_fid = lu_object_fid(&dto->do_lu);
2181 rc = dt_insert(env, dt_object_child(dt),
2182 (const struct dt_rec *)rec,
2183 (const struct dt_key *)stripe_name, th, capa, 0);
2187 rc = dt_ref_add(env, dt_object_child(dt), th);
2192 rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
2196 if (slave_lmm != NULL)
2197 OBD_FREE_PTR(slave_lmm);
/*
 * Common body for declaring and executing the striping-related updates of a
 * newly created directory.
 *
 * env:  execution environment
 * dt:   directory being created
 * attr: attributes forwarded to lod_declare_xattr_set_lmv()
 * dof:  object format forwarded to lod_declare_xattr_set_lmv()
 *
 * The final argument is true when called from
 * lod_declare_dir_striping_create() and false from
 * lod_dir_striping_create(); it presumably selects, in each step, between
 * the declare call and the execute call shown below -- confirm.
 *
 * Three steps, each building its descriptor in info->lti_ea_store (resized
 * on demand) and passing it through info->lti_buf:
 *  1. The directory's own LMV striping, unless ldo_stripenr and
 *     ldo_dir_stripe_offset satisfy LMVEA_DELETE_VALUES():
 *     lod_declare_xattr_set_lmv() / lod_xattr_set_lmv().
 *  2. The default LMV striping inherited from the parent, when it is cached
 *     and not a delete value: lod_dir_declare_xattr_set() /
 *     lod_xattr_set_default_lmv_on_dir() with XATTR_NAME_DEFAULT_LMV.
 *  3. The default LOV striping inherited from the parent, when it is cached
 *     and not a delete value: lod_dir_declare_xattr_set() /
 *     lod_xattr_set_lov_on_dir() with XATTR_NAME_LOV, including the pool
 *     name when lo->ldo_pool is set.
 */
2202 int lod_dir_striping_create_internal(const struct lu_env *env,
2203 struct dt_object *dt,
2204 struct lu_attr *attr,
2205 struct dt_object_format *dof,
2209 struct lod_thread_info *info = lod_env_info(env);
2210 struct lod_object *lo = lod_dt_obj(dt);
2214 if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr,
2215 lo->ldo_dir_stripe_offset)) {
2216 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2217 int stripe_count = lo->ldo_stripenr;
2219 if (info->lti_ea_store_size < sizeof(*v1)) {
2220 rc = lod_ea_store_resize(info, sizeof(*v1));
2223 v1 = info->lti_ea_store;
2226 memset(v1, 0, sizeof(*v1));
2227 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2228 v1->lum_stripe_count = cpu_to_le32(stripe_count);
2229 v1->lum_stripe_offset =
2230 cpu_to_le32(lo->ldo_dir_stripe_offset);
2232 info->lti_buf.lb_buf = v1;
2233 info->lti_buf.lb_len = sizeof(*v1);
2236 rc = lod_declare_xattr_set_lmv(env, dt, attr,
2237 &info->lti_buf, dof, th);
2239 rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
2240 XATTR_NAME_LMV, 0, th,
2246 /* Transfer default LMV striping from the parent */
2247 if (lo->ldo_dir_striping_cached &&
2248 !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
2249 lo->ldo_dir_def_stripe_offset)) {
2250 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2251 int def_stripe_count = lo->ldo_dir_def_stripenr;
2253 if (info->lti_ea_store_size < sizeof(*v1)) {
2254 rc = lod_ea_store_resize(info, sizeof(*v1));
2257 v1 = info->lti_ea_store;
2260 memset(v1, 0, sizeof(*v1));
2261 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2262 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
2263 v1->lum_stripe_offset =
2264 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
2266 cpu_to_le32(lo->ldo_dir_def_hash_type);
2268 info->lti_buf.lb_buf = v1;
2269 info->lti_buf.lb_len = sizeof(*v1);
2271 rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2272 XATTR_NAME_DEFAULT_LMV,
2275 rc = lod_xattr_set_default_lmv_on_dir(env, dt,
2277 XATTR_NAME_DEFAULT_LMV, 0,
2283 /* Transfer default LOV striping from the parent */
2284 if (lo->ldo_striping_cached &&
2285 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2286 lo->ldo_def_stripenr,
2287 lo->ldo_def_stripe_offset)) {
2288 struct lov_user_md_v3 *v3 = info->lti_ea_store;
2290 if (info->lti_ea_store_size < sizeof(*v3)) {
2291 rc = lod_ea_store_resize(info, sizeof(*v3));
2294 v3 = info->lti_ea_store;
2297 memset(v3, 0, sizeof(*v3));
2298 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2299 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
2300 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
2301 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
2302 if (lo->ldo_pool != NULL)
2303 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
2304 sizeof(v3->lmm_pool_name));
2306 info->lti_buf.lb_buf = v3;
2307 info->lti_buf.lb_len = sizeof(*v3);
2310 rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2311 XATTR_NAME_LOV, 0, th);
2313 rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
2314 XATTR_NAME_LOV, 0, th,
/*
 * Declare phase of directory striping creation: forwards all arguments to
 * lod_dir_striping_create_internal() with its last argument set to true.
 */
2323 static int lod_declare_dir_striping_create(const struct lu_env *env,
2324 struct dt_object *dt,
2325 struct lu_attr *attr,
2326 struct dt_object_format *dof,
2329 return lod_dir_striping_create_internal(env, dt, attr, dof, th, true);
/*
 * Execute phase of directory striping creation: forwards all arguments to
 * lod_dir_striping_create_internal() with its last argument set to false.
 */
2332 static int lod_dir_striping_create(const struct lu_env *env,
2333 struct dt_object *dt,
2334 struct lu_attr *attr,
2335 struct dt_object_format *dof,
2338 return lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
/*
 * Set extended attribute 'name' on 'dt', dispatching on object type and
 * attribute name.
 *
 * env:  execution environment
 * dt:   target object
 * buf:  attribute value
 * name: attribute name
 * fl:   flags; LU_XATTR_REPLACE selects the layout-swap path for LOV
 * th:   transaction handle
 * capa: capability forwarded to the lower layers
 *
 * Dispatch visible below:
 *  - directory + XATTR_NAME_LMV: a value carrying LMV_HASH_FLAG_MIGRATION
 *    is written straight to the child object; lod_dir_striping_create() is
 *    presumably the alternative branch -- confirm;
 *  - directory + XATTR_NAME_LOV: lod_xattr_set_lov_on_dir();
 *  - directory + XATTR_NAME_DEFAULT_LMV: lod_xattr_set_default_lmv_on_dir();
 *  - regular file + XATTR_NAME_LOV: with LU_XATTR_REPLACE the in-memory
 *    striping is freed and the EA is written to the child object;
 *    lod_striping_create() is presumably the alternative branch -- confirm;
 *  - everything else: lod_xattr_set_internal().
 */
2341 static int lod_xattr_set(const struct lu_env *env,
2342 struct dt_object *dt, const struct lu_buf *buf,
2343 const char *name, int fl, struct thandle *th,
2344 struct lustre_capa *capa)
2346 struct dt_object *next = dt_object_child(dt);
2350 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2351 strcmp(name, XATTR_NAME_LMV) == 0) {
2352 struct lmv_mds_md_v1 *lmm = buf->lb_buf;
2354 if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
2355 LMV_HASH_FLAG_MIGRATION)
2356 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2358 rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
2363 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2364 strcmp(name, XATTR_NAME_LOV) == 0) {
2366 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
2368 } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2369 strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
2371 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
2374 } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2375 !strcmp(name, XATTR_NAME_LOV)) {
2376 /* in case of lov EA swap, just set it
2377 * if not, it is a replay so check striping match what we
2378 * already have during req replay, declare_xattr_set()
2379 * defines striping, then create() does the work
2381 if (fl & LU_XATTR_REPLACE) {
2382 /* free stripes, then update disk */
2383 lod_object_free_striping(env, lod_dt_obj(dt));
2384 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2386 rc = lod_striping_create(env, dt, NULL, NULL, th);
2391 /* then all other xattr */
2392 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
/*
 * Declare the removal of extended attribute 'name' from 'dt'; the request
 * is passed unchanged to the child object via dt_declare_xattr_del().
 */
2397 static int lod_declare_xattr_del(const struct lu_env *env,
2398 struct dt_object *dt, const char *name,
2401 return dt_declare_xattr_del(env, dt_object_child(dt), name, th);
/*
 * Remove extended attribute 'name' from 'dt'.
 *
 * When the attribute is XATTR_NAME_LOV the in-memory striping of the object
 * is released first with lod_object_free_striping(); the deletion itself is
 * performed on the child object and its result is returned.
 */
2404 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
2405 const char *name, struct thandle *th,
2406 struct lustre_capa *capa)
2408 if (!strcmp(name, XATTR_NAME_LOV))
2409 lod_object_free_striping(env, lod_dt_obj(dt));
2410 return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
/*
 * List the extended attributes of 'dt' into 'buf'; the request is passed
 * unchanged to the child object via dt_xattr_list() and its result is
 * returned.
 */
2413 static int lod_xattr_list(const struct lu_env *env,
2414 struct dt_object *dt, struct lu_buf *buf,
2415 struct lustre_capa *capa)
2417 return dt_xattr_list(env, dt_object_child(dt), buf, capa);
/*
 * Replace the pool name attached to object 'o'.
 *
 * o:    object whose ldo_pool string is updated
 * pool: new pool name; lod_lov_stripe_cache_clear() in this file passes NULL
 *
 * The visible statements free the current o->ldo_pool string
 * (strlen() + 1 bytes) and store a newly allocated copy of 'pool' made with
 * strcpy() into a buffer of len + 1 bytes.
 * NOTE(review): the conditions guarding each half and the return values are
 * not visible; presumably the old name is freed only when set, a NULL
 * 'pool' just clears it, and allocation failure is reported -- confirm.
 */
2420 int lod_object_set_pool(struct lod_object *o, char *pool)
2425 len = strlen(o->ldo_pool);
2426 OBD_FREE(o->ldo_pool, len + 1);
2431 OBD_ALLOC(o->ldo_pool, len + 1);
2432 if (o->ldo_pool == NULL)
2434 strcpy(o->ldo_pool, pool);
/*
 * Return non-zero when 'is_reg' is set and the sequence of 'fid' is not
 * FID_SEQ_LOCAL_FILE, zero otherwise.
 */
2439 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
2441 return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
/*
 * Read the default LOV striping stored on directory 'lp' and cache it in
 * the object.
 *
 * env: execution environment; the EA is examined in info->lti_ea_store
 * lp:  parent directory object
 *
 * The child object is write-locked around the whole operation. After
 * lod_get_lov_ea():
 *  - a result smaller than sizeof(struct lov_user_md) is cached as "no
 *    default striping": ldo_def_striping_set = 0, ldo_striping_cached = 1,
 *    stripe size and count 0, stripe offset -1, and rc becomes 0;
 *  - otherwise the EA is byte-swapped when its magic matches the swabbed
 *    V1 or V3 user magic, and nothing is cached (rc = 0) when the magic is
 *    neither LOV_MAGIC_V1 nor LOV_MAGIC_V3 or when the pattern is neither
 *    LOV_PATTERN_RAID0 nor 0;
 *  - for an accepted EA the stripe count, size and offset are stored, both
 *    flags are set, and for LOV_USER_MAGIC_V3 a non-empty pool name is
 *    stored through lod_object_set_pool().
 */
2445 static int lod_cache_parent_lov_striping(const struct lu_env *env,
2446 struct lod_object *lp)
2448 struct lod_thread_info *info = lod_env_info(env);
2449 struct lov_user_md_v1 *v1 = NULL;
2450 struct lov_user_md_v3 *v3 = NULL;
2454 /* called from MDD without parent being write locked,
2456 dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2457 rc = lod_get_lov_ea(env, lp);
2461 if (rc < (typeof(rc))sizeof(struct lov_user_md)) {
2462 /* don't lookup for non-existing or invalid striping */
2463 lp->ldo_def_striping_set = 0;
2464 lp->ldo_striping_cached = 1;
2465 lp->ldo_def_stripe_size = 0;
2466 lp->ldo_def_stripenr = 0;
2467 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
2468 GOTO(unlock, rc = 0);
2472 v1 = info->lti_ea_store;
2473 if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) {
2474 lustre_swab_lov_user_md_v1(v1);
2475 } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
2476 v3 = (struct lov_user_md_v3 *)v1;
2477 lustre_swab_lov_user_md_v3(v3);
2480 if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
2481 GOTO(unlock, rc = 0);
2483 if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
2484 GOTO(unlock, rc = 0);
2486 CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
2487 PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
2488 (int)v1->lmm_stripe_count,
2489 (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
2491 lp->ldo_def_stripenr = v1->lmm_stripe_count;
2492 lp->ldo_def_stripe_size = v1->lmm_stripe_size;
2493 lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
2494 lp->ldo_striping_cached = 1;
2495 lp->ldo_def_striping_set = 1;
2496 if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
2497 /* XXX: sanity check here */
2498 v3 = (struct lov_user_md_v3 *) v1;
2499 if (v3->lmm_pool_name[0])
2500 lod_object_set_pool(lp, v3->lmm_pool_name);
2504 dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
/*
 * Read the default LMV (directory striping) EA stored on directory 'lp' and
 * cache it in the object.
 *
 * env: execution environment; the EA is examined in info->lti_ea_store
 * lp:  parent directory object
 *
 * The child object is write-locked around the whole operation. After
 * lod_get_default_lmv_ea():
 *  - a result smaller than sizeof(struct lmv_user_md) is cached as "no
 *    default": ldo_dir_def_striping_set = 0, ldo_dir_striping_cached = 1,
 *    stripe count 0, stripe offset -1, hash type LMV_HASH_TYPE_FNV_1A_64,
 *    and rc becomes 0;
 *  - otherwise the stripe count, stripe offset and hash type are converted
 *    with le32_to_cpu() and stored, and both flags are set.
 */
2509 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
2510 struct lod_object *lp)
2512 struct lod_thread_info *info = lod_env_info(env);
2513 struct lmv_user_md_v1 *v1 = NULL;
2517 /* called from MDD without parent being write locked,
2519 dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2520 rc = lod_get_default_lmv_ea(env, lp);
2524 if (rc < (typeof(rc))sizeof(struct lmv_user_md)) {
2525 /* don't lookup for non-existing or invalid striping */
2526 lp->ldo_dir_def_striping_set = 0;
2527 lp->ldo_dir_striping_cached = 1;
2528 lp->ldo_dir_def_stripenr = 0;
2529 lp->ldo_dir_def_stripe_offset =
2530 (typeof(v1->lum_stripe_offset))(-1);
2531 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
2532 GOTO(unlock, rc = 0);
2536 v1 = info->lti_ea_store;
2538 lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
2539 lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
2540 lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
2541 lp->ldo_dir_def_striping_set = 1;
2542 lp->ldo_dir_striping_cached = 1;
2546 dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
/*
 * Make sure the default striping of the parent lp is cached in the object.
 *
 * The striping of lp is loaded with lod_load_striping().  The default LOV
 * striping is then cached via lod_cache_parent_lov_striping() unless
 * ldo_striping_cached is already set.  When the child being created is a
 * directory (S_ISDIR(child_mode)) the default LMV striping is cached as
 * well, unless ldo_dir_striping_cached is already set.
 */
2550 static int lod_cache_parent_striping(const struct lu_env *env,
2551 struct lod_object *lp,
2557 rc = lod_load_striping(env, lp);
2561 if (!lp->ldo_striping_cached) {
2562 /* we haven't tried to get default striping for
2563 * the directory yet, let's cache it in the object */
2564 rc = lod_cache_parent_lov_striping(env, lp);
/* Directory defaults (LMV) only matter when a directory is being created. */
2569 if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
2570 rc = lod_cache_parent_lmv_striping(env, lp);
2576 * used to transfer default striping data to the object being created
/*
 * Initialize the allocation hint ah and seed the striping parameters of the
 * object child that is about to be created under parent.
 *
 * What the visible code does, in order:
 *  - when parent is given, its striping is loaded with lod_load_striping();
 *  - the next-layer child's ->do_ah_init() is called if that object does not
 *    exist yet or is remote; a remote next-layer parent is passed as NULL;
 *  - directory child: ldo_dir_stripe is allocated on both child and parent
 *    if missing, the parent defaults are cached with
 *    lod_cache_parent_striping(), and the cached file defaults (ldo_def_*)
 *    and directory defaults (ldo_dir_def_*) are copied to the child.  The
 *    layout of the new directory itself comes from ah->dah_eadata when that
 *    is supplied (it is checked by lod_verify_md_striping() and used when
 *    its stripe count is greater than 1), otherwise from the parent's
 *    directory defaults, otherwise stripe count 0 and offset -1;
 *  - other children: lod_object_will_be_striped() is consulted first; the
 *    parent defaults (ldo_def_striping_set) are copied to the child, and
 *    zero stripe count / stripe size fall back to the filesystem-wide
 *    values in d->lod_desc;
 *  - at the end the LOV stripe cache of a slave-stripe parent is cleared.
 *
 * NOTE(review): nextp and lp are initialized to NULL and only assigned
 * inside the likely(parent) branch, yet nextp is handed to
 * dt_object_remote() and lp is dereferenced in the S_ISDIR branch without a
 * visible NULL check.  Confirm that callers never pass a NULL parent on
 * those paths.
 */
2578 static void lod_ah_init(const struct lu_env *env,
2579 struct dt_allocation_hint *ah,
2580 struct dt_object *parent,
2581 struct dt_object *child,
2584 struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
2585 struct dt_object *nextp = NULL;
2586 struct dt_object *nextc;
2587 struct lod_object *lp = NULL;
2588 struct lod_object *lc;
2589 struct lov_desc *desc;
2595 if (likely(parent)) {
2596 nextp = dt_object_child(parent);
2597 lp = lod_dt_obj(parent);
2598 rc = lod_load_striping(env, lp);
2603 nextc = dt_object_child(child);
2604 lc = lod_dt_obj(child);
2606 LASSERT(lc->ldo_stripenr == 0);
2607 LASSERT(lc->ldo_stripe == NULL);
2610 * local object may want some hints
2611 * in case of late striping creation, ->ah_init()
2612 * can be called with local object existing
2614 if (!dt_object_exists(nextc) || dt_object_remote(nextc))
2615 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
2616 NULL : nextp, nextc, child_mode);
2618 if (S_ISDIR(child_mode)) {
2619 if (lc->ldo_dir_stripe == NULL) {
2620 OBD_ALLOC_PTR(lc->ldo_dir_stripe);
2621 if (lc->ldo_dir_stripe == NULL)
2625 if (lp->ldo_dir_stripe == NULL) {
2626 OBD_ALLOC_PTR(lp->ldo_dir_stripe);
2627 if (lp->ldo_dir_stripe == NULL)
2631 rc = lod_cache_parent_striping(env, lp, child_mode);
2635 /* transfer defaults to new directory */
2636 if (lp->ldo_striping_cached) {
2638 lod_object_set_pool(lc, lp->ldo_pool);
2639 lc->ldo_def_stripenr = lp->ldo_def_stripenr;
2640 lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
2641 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2642 lc->ldo_striping_cached = 1;
2643 lc->ldo_def_striping_set = 1;
2644 CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
2645 (int)lc->ldo_def_stripe_size,
2646 (int)lc->ldo_def_stripe_offset,
2647 (int)lc->ldo_def_stripenr);
2650 /* transfer dir defaults to new directory */
2651 if (lp->ldo_dir_striping_cached) {
2652 lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
2653 lc->ldo_dir_def_stripe_offset =
2654 lp->ldo_dir_def_stripe_offset;
2655 lc->ldo_dir_def_hash_type =
2656 lp->ldo_dir_def_hash_type;
2657 lc->ldo_dir_striping_cached = 1;
2658 lc->ldo_dir_def_striping_set = 1;
2659 CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
2660 (int)lc->ldo_dir_def_stripenr,
2661 (int)lc->ldo_dir_def_stripe_offset,
2662 lc->ldo_dir_def_hash_type);
2665 /* It should always honour the specified stripes */
2666 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
2667 const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
2669 rc = lod_verify_md_striping(d, lum1);
2671 le32_to_cpu(lum1->lum_stripe_count) > 1) {
2672 /* Directory will be striped only if
2673 * stripe_count > 1 */
2675 le32_to_cpu(lum1->lum_stripe_count);
2676 lc->ldo_dir_stripe_offset =
2677 le32_to_cpu(lum1->lum_stripe_offset);
2678 lc->ldo_dir_hash_type =
2679 le32_to_cpu(lum1->lum_hash_type);
2680 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
2682 (int)lc->ldo_dir_stripe_offset);
2684 /* then check whether there is default stripes from parent */
2685 } else if (lp->ldo_dir_def_striping_set) {
2686 /* If there are default dir stripe from parent */
2687 lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
2688 lc->ldo_dir_stripe_offset =
2689 lp->ldo_dir_def_stripe_offset;
2690 lc->ldo_dir_hash_type =
2691 lp->ldo_dir_def_hash_type;
2692 CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
2694 (int)lc->ldo_dir_stripe_offset);
2696 /* set default stripe for this directory */
2697 lc->ldo_stripenr = 0;
2698 lc->ldo_dir_stripe_offset = -1;
2701 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
2702 lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
2708 * if object is going to be striped over OSTs, transfer default
2709 * striping information to the child, so that we can use it
2710 * during declaration and creation
2712 if (!lod_object_will_be_striped(S_ISREG(child_mode),
2713 lu_object_fid(&child->do_lu)))
2716 * try from the parent
2718 if (likely(parent)) {
2719 lod_cache_parent_striping(env, lp, child_mode);
2721 lc->ldo_def_stripe_offset = (__u16) -1;
2723 if (lp->ldo_def_striping_set) {
2725 lod_object_set_pool(lc, lp->ldo_pool);
2726 lc->ldo_stripenr = lp->ldo_def_stripenr;
2727 lc->ldo_stripe_size = lp->ldo_def_stripe_size;
2728 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2729 CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
2730 lc->ldo_stripenr, lc->ldo_stripe_size,
2731 lp->ldo_pool ? lp->ldo_pool : "");
2736 * if the parent doesn't provide with specific pattern, grab fs-wide one
2738 desc = &d->lod_desc;
2739 if (lc->ldo_stripenr == 0)
2740 lc->ldo_stripenr = desc->ld_default_stripe_count;
2741 if (lc->ldo_stripe_size == 0)
2742 lc->ldo_stripe_size = desc->ld_default_stripe_size;
2743 CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
2744 lc->ldo_stripenr, lc->ldo_stripe_size,
2745 lc->ldo_pool ? lc->ldo_pool : "");
2748 /* we do not cache stripe information for slave stripe, see
2749 * lod_xattr_set_lov_on_dir */
2750 if (lp != NULL && lp->ldo_dir_slave_stripe)
2751 lod_lov_stripe_cache_clear(lp);
/* Local alias for do_div(): aaa is divided in place by bbb. */
2756 #define ll_do_div64(aaa,bbb) do_div((aaa), (bbb))
2758 * this function handles a special case when truncate was done
2759 * on a stripeless object and now striping is being created
2760 * we can't lose that size, so we have to propagate it to newly
/*
 * Declare the size update on one stripe object for a file that already has
 * a size recorded on its next-layer (local) object.
 *
 * The attributes of the next-layer object are fetched with dt_attr_get().
 * From la_size the code derives:
 *   chunks = la_size / ldo_stripe_size
 *   stripe = chunks % ldo_stripenr          (index into lo->ldo_stripe[])
 *   size   = (chunks / ldo_stripenr) * ldo_stripe_size
 *            + la_size % ldo_stripe_size
 * and then declares an attribute set carrying only LA_SIZE = size on
 * lo->ldo_stripe[stripe].
 *
 * NOTE(review): LASSERT(attr->la_valid & LA_SIZE) appears right after
 * dt_attr_get(), before any visible check of rc; if the call can fail
 * without filling attr the assertion tests stale data -- confirm.
 * NOTE(review): the first LASSERT tolerates ldo_stripenr == 0, but
 * ldo_stripenr is later used as a divisor; confirm that a zero stripe count
 * cannot reach the division.
 */
2763 static int lod_declare_init_size(const struct lu_env *env,
2764 struct dt_object *dt, struct thandle *th)
2766 struct dt_object *next = dt_object_child(dt);
2767 struct lod_object *lo = lod_dt_obj(dt);
2768 struct lu_attr *attr = &lod_env_info(env)->lti_attr;
2769 uint64_t size, offs;
2773 /* XXX: we support the simplest (RAID0) striping so far */
2774 LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
2775 LASSERT(lo->ldo_stripe_size > 0);
2777 rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
2778 LASSERT(attr->la_valid & LA_SIZE);
2782 size = attr->la_size;
2786 /* ll_do_div64(a, b) returns a % b, and a = a / b */
2787 ll_do_div64(size, (__u64) lo->ldo_stripe_size);
2788 stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
/* size now counts whole stripe rounds; turn it back into bytes and add
 * the partial chunk at the end of the file */
2790 size = size * lo->ldo_stripe_size;
2791 offs = attr->la_size;
2792 size += ll_do_div64(offs, lo->ldo_stripe_size);
/* attr is reused to carry only the computed size to the stripe object */
2794 attr->la_valid = LA_SIZE;
2795 attr->la_size = size;
2797 rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
2803 * Create declaration of striped object
/*
 * Declare the creation of the striping for object dt in transaction th.
 *
 * Visible behaviour:
 *  - the OBD_FAIL_MDS_ALLOC_OBDO fault-injection point frees the striping
 *    with lod_object_free_striping() and fails with -ENOMEM;
 *  - when the next-layer object is not remote, lod_qos_prep_create()
 *    chooses the OSTs and generates the stripe objects, and the LOV EA
 *    buffer length is computed with lov_mds_md_size() (LOV_MAGIC_V3 when a
 *    pool is set, LOV_MAGIC_V1 otherwise);
 *  - otherwise the caller must supply the EA in lovea (asserted non-NULL)
 *    and that buffer is used as is;
 *  - the XATTR_NAME_LOV xattr set is declared on the next-layer object;
 *  - if the next-layer object already exists, lod_declare_init_size()
 *    declares propagation of its size to a stripe.
 *
 * env:   execution environment
 * dt:    object to be striped
 * attr:  attributes passed on to lod_qos_prep_create()
 * lovea: optional striping EA supplied by the caller
 * th:    transaction handle
 */
2805 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
2806 struct lu_attr *attr,
2807 const struct lu_buf *lovea, struct thandle *th)
2809 struct lod_thread_info *info = lod_env_info(env);
2810 struct dt_object *next = dt_object_child(dt);
2811 struct lod_object *lo = lod_dt_obj(dt);
2815 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
2816 /* failed to create striping, let's reset
2817 * config so that others don't get confused */
2818 lod_object_free_striping(env, lo);
2819 GOTO(out, rc = -ENOMEM);
2822 if (!dt_object_remote(next)) {
2823 /* choose OST and generate appropriate objects */
2824 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
2826 /* failed to create striping, let's reset
2827 * config so that others don't get confused */
2828 lod_object_free_striping(env, lo);
2833 * declare storage for striping data
2835 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
2836 lo->ldo_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1);
2838 /* LOD can not choose OST objects for remote objects, i.e.
2839 * stripes must be ready before that. Right now, it can only
2840 * happen during migrate, i.e. migrate process needs to create
2841 * remote regular file (mdd_migrate_create), then the migrate
2842 * process will provide stripeEA. */
2843 LASSERT(lovea != NULL);
2844 info->lti_buf = *lovea;
2847 rc = dt_declare_xattr_set(env, next, &info->lti_buf,
2848 XATTR_NAME_LOV, 0, th);
2853 * if striping is created with local object's size > 0,
2854 * we have to propagate this size to specific object
2855 * the case is possible only when local object was created previously
2857 if (dt_object_exists(next))
2858 rc = lod_declare_init_size(env, dt, th);
/*
 * Declare the creation of object dt, including its striping if any.
 *
 * The creation of the next-layer object is declared first with
 * dt_declare_create().  Then, depending on dof->dof_type:
 *  - DFT_SYM: the body operations of dt are set to lod_body_lnk_ops;
 *  - DFT_REGULAR: the stripe count is reset to 0 when the caller did not
 *    ask for striping (dof->u.dof_reg.striped == 0); with a positive stripe
 *    count the striping is declared via lod_declare_striped_object();
 *  - DFT_DIR: when ldo_dir_stripe is allocated, the directory striping is
 *    declared via lod_declare_dir_striping_create().
 */
2864 static int lod_declare_object_create(const struct lu_env *env,
2865 struct dt_object *dt,
2866 struct lu_attr *attr,
2867 struct dt_allocation_hint *hint,
2868 struct dt_object_format *dof,
2871 struct dt_object *next = dt_object_child(dt);
2872 struct lod_object *lo = lod_dt_obj(dt);
2881 * first of all, we declare creation of local object
2883 rc = dt_declare_create(env, next, attr, hint, dof, th);
2887 if (dof->dof_type == DFT_SYM)
2888 dt->do_body_ops = &lod_body_lnk_ops;
2891 * it's lod_ah_init() who has decided the object will striped
2893 if (dof->dof_type == DFT_REGULAR) {
2894 /* callers don't want stripes */
2895 /* XXX: all tricky interactions with ->ah_make_hint() decided
2896 * to use striping, then ->declare_create() behaving differently
2897 * should be cleaned */
2898 if (dof->u.dof_reg.striped == 0)
2899 lo->ldo_stripenr = 0;
2900 if (lo->ldo_stripenr > 0)
2901 rc = lod_declare_striped_object(env, dt, attr,
2903 } else if (dof->dof_type == DFT_DIR) {
2904 /* Orphan object (like migrating object) does not have
2905 * lod_dir_stripe, see lod_ah_init */
2906 if (lo->ldo_dir_stripe != NULL)
2907 rc = lod_declare_dir_striping_create(env, dt, attr,
/*
 * Create every stripe object of dt and store the resulting striping EA.
 *
 * Asserts that ldo_striping_cached is 0, calls dt_create() on each of the
 * ldo_stripenr entries of lo->ldo_stripe[] (each asserted non-NULL) with a
 * NULL allocation hint, then calls lod_generate_and_set_lovea().
 */
2914 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
2915 struct lu_attr *attr, struct dt_object_format *dof,
2918 struct lod_object *lo = lod_dt_obj(dt);
2922 LASSERT(lo->ldo_striping_cached == 0);
2924 /* create all underlying objects */
2925 for (i = 0; i < lo->ldo_stripenr; i++) {
2926 LASSERT(lo->ldo_stripe[i]);
2927 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
2933 rc = lod_generate_and_set_lovea(env, lo, th);
/*
 * Create object dt: the next-layer object first, then its stripes.
 *
 * lod_striping_create() is called only for a regular file (according to the
 * mode cached in the object header) that has a stripe array and for which
 * the caller requested striping (dof->u.dof_reg.striped != 0).
 */
2938 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
2939 struct lu_attr *attr,
2940 struct dt_allocation_hint *hint,
2941 struct dt_object_format *dof, struct thandle *th)
2943 struct dt_object *next = dt_object_child(dt);
2944 struct lod_object *lo = lod_dt_obj(dt);
2948 /* create local object */
2949 rc = dt_create(env, next, attr, hint, dof, th);
2953 if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2954 lo->ldo_stripe && dof->u.dof_reg.striped != 0)
2955 rc = lod_striping_create(env, dt, attr, dof, th);
/*
 * Declare the destruction of object dt and of all its stripes.
 *
 * The striping is loaded on demand with lod_load_striping().  For a
 * directory the next-layer object is switched to directory index
 * operations (do_index_try with dt_directory_features) and, per stripe, a
 * reference drop on the next-layer object and the deletion of the stripe
 * name entry (formatted as DFID":%d" into info->lti_key) are declared.
 * Then the destroy of the next-layer object is declared, the
 * OBD_FAIL_LFSCK_LOST_MDTOBJ fault-injection point is checked, and finally
 * a destroy is declared for every non-NULL stripe object.
 */
2960 static int lod_declare_object_destroy(const struct lu_env *env,
2961 struct dt_object *dt,
2964 struct dt_object *next = dt_object_child(dt);
2965 struct lod_object *lo = lod_dt_obj(dt);
2966 struct lod_thread_info *info = lod_env_info(env);
2967 char *stripe_name = info->lti_key;
2972 * load striping information, notice we don't do this when object
2973 * is being initialized as we don't need this information till
2974 * few specific cases like destroy, chown
2976 rc = lod_load_striping(env, lo);
2980 /* declare destroy for all underlying objects */
2981 if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2982 rc = next->do_ops->do_index_try(env, next,
2983 &dt_directory_features);
2987 for (i = 0; i < lo->ldo_stripenr; i++) {
2988 rc = dt_declare_ref_del(env, next, th);
2991 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2992 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2994 rc = dt_declare_delete(env, next,
2995 (const struct dt_key *)stripe_name, th);
3001 * we declare destroy for the local object
3003 rc = dt_declare_destroy(env, next, th);
3007 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
3010 /* declare destroy all striped objects */
3011 for (i = 0; i < lo->ldo_stripenr; i++) {
3012 if (likely(lo->ldo_stripe[i] != NULL)) {
3013 rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
/*
 * Destroy object dt together with its stripes.
 *
 * For a directory, the next-layer object is switched to directory index
 * operations and, per stripe, one reference is dropped from the next-layer
 * object and the stripe name entry (DFID":%d" formatted into
 * info->lti_key) is deleted from it.  The next-layer object is then
 * destroyed, the OBD_FAIL_LFSCK_LOST_MDTOBJ fault-injection point is
 * checked, and each non-NULL stripe object is destroyed.  While
 * OBD_FAIL_LFSCK_LOST_SPEOBJ is active only the stripe whose index equals
 * cfs_fail_val is destroyed.
 */
3022 static int lod_object_destroy(const struct lu_env *env,
3023 struct dt_object *dt, struct thandle *th)
3025 struct dt_object *next = dt_object_child(dt);
3026 struct lod_object *lo = lod_dt_obj(dt);
3027 struct lod_thread_info *info = lod_env_info(env);
3028 char *stripe_name = info->lti_key;
3033 /* destroy sub-stripe of master object */
3034 if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
3035 rc = next->do_ops->do_index_try(env, next,
3036 &dt_directory_features);
3040 for (i = 0; i < lo->ldo_stripenr; i++) {
3041 rc = dt_ref_del(env, next, th);
3045 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
3046 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
3049 CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
3050 PFID(lu_object_fid(&dt->do_lu)), stripe_name,
3051 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
3053 rc = dt_delete(env, next,
3054 (const struct dt_key *)stripe_name,
3060 rc = dt_destroy(env, next, th);
3064 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
3067 /* destroy all striped objects */
3068 for (i = 0; i < lo->ldo_stripenr; i++) {
3069 if (likely(lo->ldo_stripe[i] != NULL) &&
3070 (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
3071 i == cfs_fail_val)) {
3072 rc = dt_destroy(env, lo->ldo_stripe[i], th);
/* Declare a reference increment: forwarded unchanged to the next-layer
 * object of dt. */
3081 static int lod_declare_ref_add(const struct lu_env *env,
3082 struct dt_object *dt, struct thandle *th)
3084 return dt_declare_ref_add(env, dt_object_child(dt), th);
/* Increment the reference count: forwarded unchanged to the next-layer
 * object of dt. */
3087 static int lod_ref_add(const struct lu_env *env,
3088 struct dt_object *dt, struct thandle *th)
3090 return dt_ref_add(env, dt_object_child(dt), th);
/* Declare a reference decrement: forwarded unchanged to the next-layer
 * object of dt. */
3093 static int lod_declare_ref_del(const struct lu_env *env,
3094 struct dt_object *dt, struct thandle *th)
3096 return dt_declare_ref_del(env, dt_object_child(dt), th);
/* Decrement the reference count: forwarded unchanged to the next-layer
 * object of dt. */
3099 static int lod_ref_del(const struct lu_env *env,
3100 struct dt_object *dt, struct thandle *th)
3102 return dt_ref_del(env, dt_object_child(dt), th);
/* Capability lookup: forwarded unchanged to the next-layer object of dt. */
3105 static struct obd_capa *lod_capa_get(const struct lu_env *env,
3106 struct dt_object *dt,
3107 struct lustre_capa *old, __u64 opc)
3109 return dt_capa_get(env, dt_object_child(dt), old, opc);
/* Sync the range [start, end]: forwarded unchanged to the next-layer object
 * of dt; stripe objects are not touched here. */
3112 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
3113 __u64 start, __u64 end)
3115 return dt_object_sync(env, dt_object_child(dt), start, end);
/*
 * Lock handles taken on the stripes of a striped directory.
 *
 * Allocated in lod_object_lock() with one lsl_handle[] slot per stripe and
 * handed around through einfo->ei_cbdata; lod_object_lock() and
 * lod_object_unlock_internal() only use slots from index 1 upwards.
 */
3118 struct lod_slave_locks {
3120 struct lustre_handle lsl_handle[0];
/*
 * Release the slave locks recorded in einfo->ei_cbdata.
 *
 * Walks slots 1 .. lsl_lock_count - 1 of the lod_slave_locks structure and,
 * for each handle that lustre_handle_is_used() reports as used, points
 * einfo->ei_cbdata at that handle and calls dt_object_unlock() on the
 * matching stripe.  The first non-zero result is kept in rc while the
 * remaining locks are still processed.  The structure itself is not freed
 * here, and einfo->ei_cbdata is left pointing at the last handle handled;
 * the callers visible in this file reset it afterwards.
 */
3123 static int lod_object_unlock_internal(const struct lu_env *env,
3124 struct dt_object *dt,
3125 struct ldlm_enqueue_info *einfo,
3126 ldlm_policy_data_t *policy)
3128 struct lod_object *lo = lod_dt_obj(dt);
3129 struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
3134 if (slave_locks == NULL)
3137 for (i = 1; i < slave_locks->lsl_lock_count; i++) {
3138 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
3141 einfo->ei_cbdata = &slave_locks->lsl_handle[i];
3142 rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
3145 rc = rc == 0 ? rc1 : rc;
/*
 * Unlock a striped directory: release its slave locks and free the
 * bookkeeping structure allocated by lod_object_lock().
 *
 * Early checks visible here: no slave locks recorded in einfo->ei_cbdata,
 * object is not a directory, striping cannot be loaded, or the directory
 * has at most one stripe and its next-layer object is remote.  Otherwise
 * lod_object_unlock_internal() cancels the slave locks, the
 * lod_slave_locks structure is freed with the same size formula used for
 * its allocation, and einfo->ei_cbdata is reset to NULL.
 */
3152 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
3153 struct ldlm_enqueue_info *einfo,
3154 union ldlm_policy_data *policy)
3156 struct lod_object *lo = lod_dt_obj(dt);
3157 struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
3158 int slave_locks_size;
3162 if (slave_locks == NULL)
3165 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
3168 rc = lod_load_striping(env, lo);
3172 /* Note: for remote lock for single stripe dir, MDT will cancel
3173 * the lock by lockh directly */
3174 if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
3177 /* Only cancel slave lock for striped dir */
3178 rc = lod_object_unlock_internal(env, dt, einfo, policy);
3180 slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
3181 sizeof(slave_locks->lsl_handle[0]);
3182 OBD_FREE(slave_locks, slave_locks_size);
3183 einfo->ei_cbdata = NULL;
/*
 * Lock object dt; for a striped directory also lock its slave stripes.
 *
 * When einfo->ei_enq_slave is not set the object is asserted to be remote
 * and the request is forwarded to the next-layer object.  Otherwise the
 * striping is loaded and, for a directory with more than one stripe, a
 * lod_slave_locks structure with one handle slot per stripe is allocated.
 * Stripes 1 .. ldo_stripenr - 1 are then locked one by one: the resource
 * name is built from the stripe FID into the per-thread lti_res_id, and the
 * returned handle is stored in lsl_handle[i].  On success the structure is
 * published through einfo->ei_cbdata (it is freed in lod_object_unlock()).
 * On failure the locks already taken are released with
 * lod_object_unlock_internal(), the structure is freed and
 * einfo->ei_cbdata is cleared.
 *
 * NOTE(review): LASSERT(lo->ldo_stripe[i]) comes after lo->ldo_stripe[i]
 * has already been used to build the resource name, so it cannot catch a
 * NULL stripe before that use -- consider moving it up.
 */
3188 static int lod_object_lock(const struct lu_env *env,
3189 struct dt_object *dt,
3190 struct lustre_handle *lh,
3191 struct ldlm_enqueue_info *einfo,
3192 union ldlm_policy_data *policy)
3194 struct lod_object *lo = lod_dt_obj(dt);
3197 int slave_locks_size;
3198 struct lod_slave_locks *slave_locks = NULL;
3201 /* remote object lock */
3202 if (!einfo->ei_enq_slave) {
3203 LASSERT(dt_object_remote(dt));
3204 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
3208 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
3211 rc = lod_load_striping(env, lo);
3216 if (lo->ldo_stripenr <= 1)
3219 slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
3220 sizeof(slave_locks->lsl_handle[0]);
3221 /* Freed in lod_object_unlock */
3222 OBD_ALLOC(slave_locks, slave_locks_size);
3223 if (slave_locks == NULL)
3225 slave_locks->lsl_lock_count = lo->ldo_stripenr;
3227 /* striped directory lock */
3228 for (i = 1; i < lo->ldo_stripenr; i++) {
3229 struct lustre_handle lockh;
3230 struct ldlm_res_id *res_id;
3232 res_id = &lod_env_info(env)->lti_res_id;
3233 fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
3235 einfo->ei_res_id = res_id;
3237 LASSERT(lo->ldo_stripe[i]);
3238 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
3242 slave_locks->lsl_handle[i] = lockh;
3245 einfo->ei_cbdata = slave_locks;
3248 if (rc != 0 && slave_locks != NULL) {
3249 einfo->ei_cbdata = slave_locks;
3250 lod_object_unlock_internal(env, dt, einfo, policy);
3251 OBD_FREE(slave_locks, slave_locks_size);
3252 einfo->ei_cbdata = NULL;
/* dt_object operation table of the LOD layer: binds each dt_object method
 * to its lod_* implementation in this file. */
3258 struct dt_object_operations lod_obj_ops = {
3259 .do_read_lock = lod_object_read_lock,
3260 .do_write_lock = lod_object_write_lock,
3261 .do_read_unlock = lod_object_read_unlock,
3262 .do_write_unlock = lod_object_write_unlock,
3263 .do_write_locked = lod_object_write_locked,
3264 .do_attr_get = lod_attr_get,
3265 .do_declare_attr_set = lod_declare_attr_set,
3266 .do_attr_set = lod_attr_set,
3267 .do_xattr_get = lod_xattr_get,
3268 .do_declare_xattr_set = lod_declare_xattr_set,
3269 .do_xattr_set = lod_xattr_set,
3270 .do_declare_xattr_del = lod_declare_xattr_del,
3271 .do_xattr_del = lod_xattr_del,
3272 .do_xattr_list = lod_xattr_list,
3273 .do_ah_init = lod_ah_init,
3274 .do_declare_create = lod_declare_object_create,
3275 .do_create = lod_object_create,
3276 .do_declare_destroy = lod_declare_object_destroy,
3277 .do_destroy = lod_object_destroy,
3278 .do_index_try = lod_index_try,
3279 .do_declare_ref_add = lod_declare_ref_add,
3280 .do_ref_add = lod_ref_add,
3281 .do_declare_ref_del = lod_declare_ref_del,
3282 .do_ref_del = lod_ref_del,
3283 .do_capa_get = lod_capa_get,
3284 .do_object_sync = lod_object_sync,
3285 .do_object_lock = lod_object_lock,
3286 .do_object_unlock = lod_object_unlock,
/* Body read: forwarded unchanged to the dbo_read method of the next-layer
 * object of dt. */
3289 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
3290 struct lu_buf *buf, loff_t *pos,
3291 struct lustre_capa *capa)
3293 struct dt_object *next = dt_object_child(dt);
3294 return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
/* Declare a body write: forwarded to dt_declare_record_write() on the
 * next-layer object of dt. */
3297 static ssize_t lod_declare_write(const struct lu_env *env,
3298 struct dt_object *dt,
3299 const struct lu_buf *buf, loff_t pos,
3302 return dt_declare_record_write(env, dt_object_child(dt),
/* Body write: forwarded unchanged to the dbo_write method of the next-layer
 * object of dt. */
3306 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
3307 const struct lu_buf *buf, loff_t *pos,
3308 struct thandle *th, struct lustre_capa *capa, int iq)
3310 struct dt_object *next = dt_object_child(dt);
3312 return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
/* Body operations installed on symlink objects (see the DFT_SYM case in
 * lod_declare_object_create() and the S_ISLNK case in lod_object_start());
 * every method forwards to the next-layer object. */
3315 static const struct dt_body_operations lod_body_lnk_ops = {
3316 .dbo_read = lod_read,
3317 .dbo_declare_write = lod_declare_write,
3318 .dbo_write = lod_write
/*
 * Initialize the LOD slice of an object: pick the device that holds the
 * object and stack that device's slice below the LOD one.
 *
 * lod_fld_lookup() resolves the FID of lo into a target index and a
 * sequence-range type.  An MDT range whose index equals the node id of
 * this site maps to the local child device (lod->lod_child).  Any other
 * MDT range selects the MDT target table (lod_mdt_descs) and an OST range
 * selects the OST target table (lod_ost_descs); if the index is within the
 * table and set in its bitmap, the device of that target is used.  The
 * table reference is dropped with lod_putref().  With a device found, the
 * sub-object is allocated through its ldo_object_alloc method and attached
 * with lu_object_add().
 */
3321 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
3322 const struct lu_object_conf *conf)
3324 struct lod_device *lod = lu2lod_dev(lo->lo_dev);
3325 struct lu_device *cdev = NULL;
3326 struct lu_object *cobj;
3327 struct lod_tgt_descs *ltd = NULL;
3328 struct lod_tgt_desc *tgt;
3330 int type = LU_SEQ_RANGE_ANY;
3334 rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
3338 if (type == LU_SEQ_RANGE_MDT &&
3339 idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
3340 cdev = &lod->lod_child->dd_lu_dev;
3341 } else if (type == LU_SEQ_RANGE_MDT) {
3342 ltd = &lod->lod_mdt_descs;
3344 } else if (type == LU_SEQ_RANGE_OST) {
3345 ltd = &lod->lod_ost_descs;
3352 if (ltd->ltd_tgts_size > idx &&
3353 cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
3354 tgt = LTD_TGT(ltd, idx);
3356 LASSERT(tgt != NULL);
3357 LASSERT(tgt->ltd_tgt != NULL);
3359 cdev = &(tgt->ltd_tgt->dd_lu_dev);
3361 lod_putref(lod, ltd);
3364 if (unlikely(cdev == NULL))
3367 cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
3368 if (unlikely(cobj == NULL))
3371 lu_object_add(lo, cobj);
/*
 * Drop all in-memory striping state of lo.
 *
 * Frees ldo_dir_stripe if allocated.  If a stripe array exists, releases
 * the reference on every non-NULL stripe object with lu_object_put() and
 * frees the array, whose size is derived from ldo_stripes_allocated (not
 * from ldo_stripenr).  Finally resets ldo_stripenr and ldo_pattern to 0.
 */
3376 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
3380 if (lo->ldo_dir_stripe != NULL) {
3381 OBD_FREE_PTR(lo->ldo_dir_stripe);
3382 lo->ldo_dir_stripe = NULL;
3385 if (lo->ldo_stripe) {
3386 LASSERT(lo->ldo_stripes_allocated > 0);
3388 for (i = 0; i < lo->ldo_stripenr; i++) {
3389 if (lo->ldo_stripe[i])
3390 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
/* i is reused here as the byte size of the stripe pointer array */
3393 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
3394 OBD_FREE(lo->ldo_stripe, i);
3395 lo->ldo_stripe = NULL;
3396 lo->ldo_stripes_allocated = 0;
3398 lo->ldo_stripenr = 0;
3399 lo->ldo_pattern = 0;
3403 * ->start is called once all slices are initialized, including header's
3404 * cache for mode (object type). using the type we can initialize ops
/* Install the symlink body operations (lod_body_lnk_ops) when the mode
 * cached in the object header says the object is a symbolic link. */
3406 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
3408 if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
3409 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
/* Free the LOD object: drop its striping state, clear its pool name by
 * setting it to NULL, and return the object to the lod_object_kmem slab. */
3413 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
3415 struct lod_object *mo = lu2lod_obj(o);
3418 * release all underlying object pinned
3421 lod_object_free_striping(env, mo);
3423 lod_object_set_pool(mo, NULL);
3426 OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
/* Release hook of the LOD object; no statement is visible in its body,
 * only the open question recorded below. */
3429 static void lod_object_release(const struct lu_env *env, struct lu_object *o)
3431 /* XXX: shouldn't we release everything here in case if object
3432 * creation failed before? */
/* Print a one-item description of the LOD object (its address, prefixed by
 * LUSTRE_LOD_NAME) through the printer callback p and return its result. */
3435 static int lod_object_print(const struct lu_env *env, void *cookie,
3436 lu_printer_t p, const struct lu_object *l)
3438 struct lod_object *o = lu2lod_obj((struct lu_object *) l);
3440 return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
/* lu_object life-cycle operation table of the LOD layer: init, start, free,
 * release and print hooks defined above. */
3443 struct lu_object_operations lod_lu_obj_ops = {
3444 .loo_object_init = lod_object_init,
3445 .loo_object_start = lod_object_start,
3446 .loo_object_free = lod_object_free,
3447 .loo_object_release = lod_object_release,
3448 .loo_object_print = lod_object_print,