4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright 2009 Sun Microsystems, Inc. All rights reserved
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2013, Intel Corporation.
29 * lustre/lod/lod_object.c
31 * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
34 #define DEBUG_SUBSYSTEM S_MDS
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46 #include <md_object.h>
47 #include <lustre_linkea.h>
49 #include "lod_internal.h"
/*
 * File-scope names used by this file:
 *  - dot / dotdot: the "." and ".." directory entry names;
 *  - lod_object_kmem: a kmem_cache defined in another translation unit
 *    (presumably the cache lod objects are allocated from; confirm);
 *  - lod_body_lnk_ops: forward declaration of a body-operations table
 *    that is defined later in the file.
 */
51 static const char dot[] = ".";
52 static const char dotdot[] = "..";
54 extern struct kmem_cache *lod_object_kmem;
55 static const struct dt_body_operations lod_body_lnk_ops;
/*
 * Index lookup for a LOD object.
 *
 * Forwards the request unchanged to the dio_lookup method of the child
 * object obtained with dt_object_child(dt) and returns whatever the child
 * returns.  rec, key and capa are passed through untouched.
 */
57 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
58 struct dt_rec *rec, const struct dt_key *key,
59 struct lustre_capa *capa)
61 struct dt_object *next = dt_object_child(dt);
62 return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
/*
 * Declares an index insert for a LOD object.
 *
 * Passes rec, key and the transaction handle on to dt_declare_insert() for
 * the child object (dt_object_child(dt)); the child's result is returned
 * as-is.
 */
65 static int lod_declare_index_insert(const struct lu_env *env,
67 const struct dt_rec *rec,
68 const struct dt_key *key,
69 struct thandle *handle)
71 return dt_declare_insert(env, dt_object_child(dt), rec, key, handle);
/*
 * Inserts a (key, rec) pair into the index of a LOD object.
 *
 * Pure pass-through: calls dt_insert() on the child object
 * (dt_object_child(dt)) with the same rec, key, transaction handle,
 * capability and "ign" argument, and returns its result.
 */
74 static int lod_index_insert(const struct lu_env *env,
76 const struct dt_rec *rec,
77 const struct dt_key *key,
79 struct lustre_capa *capa,
82 return dt_insert(env, dt_object_child(dt), rec, key, th, capa, ign);
/*
 * Declares an index delete for a LOD object by calling
 * dt_declare_delete() on the child object (dt_object_child(dt)) with the
 * same key and transaction handle; the child's result is returned as-is.
 */
85 static int lod_declare_index_delete(const struct lu_env *env,
87 const struct dt_key *key,
90 return dt_declare_delete(env, dt_object_child(dt), key, th);
/*
 * Deletes the entry identified by key from the index of a LOD object.
 *
 * Pure pass-through to dt_delete() on the child object
 * (dt_object_child(dt)); key, transaction handle and capability are
 * forwarded unchanged and the child's result is returned.
 */
93 static int lod_index_delete(const struct lu_env *env,
95 const struct dt_key *key,
97 struct lustre_capa *capa)
99 return dt_delete(env, dt_object_child(dt), key, th, capa);
/*
 * Creates an index iterator for a (non-striped) LOD object.
 *
 * Asks the child object (dt_object_child(dt)) to create its own iterator
 * through dio_it.init, then stores that sub-iterator in the per-thread
 * struct lod_it (lod_env_info(env)->lti_it).  The per-thread slot must be
 * free on entry (lit_obj == NULL is asserted).
 *
 * Returns the per-thread lod_it cast to struct dt_it *.
 *
 * NOTE(review): some lines of this function are not visible in this view
 * (the handling of a failed child init and the assignment of lit_obj are
 * not shown) -- confirm against the full file.
 */
102 static struct dt_it *lod_it_init(const struct lu_env *env,
103 struct dt_object *dt, __u32 attr,
104 struct lustre_capa *capa)
106 struct dt_object *next = dt_object_child(dt);
107 struct lod_it *it = &lod_env_info(env)->lti_it;
108 struct dt_it *it_next;
111 it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
115 /* currently we do not use more than one iterator per thread
116 * so we store it in thread info. if at some point we need
117 * more active iterators in a single thread, we can allocate
119 LASSERT(it->lit_obj == NULL);
121 it->lit_it = it_next;
124 return (struct dt_it *)it;
/*
 * Sanity check used by the lod_it_* wrappers: asserts that the iterator
 * has both an object (lit_obj) and a sub-iterator (lit_it) attached.
 * The env argument is not referenced by the visible assertions.
 */
127 #define LOD_CHECK_IT(env, it) \
129 LASSERT((it)->lit_obj != NULL); \
130 LASSERT((it)->lit_it != NULL); \
/*
 * Finishes an iterator created by lod_it_init().
 *
 * After LOD_CHECK_IT verifies the iterator state, the sub-iterator
 * (lit_it) is released through the dio_it.fini method of the object
 * recorded in lit_obj.
 *
 * NOTE(review): the statements following the "not in use any more"
 * comment are not visible in this view; presumably they reset the
 * per-thread iterator fields -- confirm against the full file.
 */
133 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
135 struct lod_it *it = (struct lod_it *)di;
137 LOD_CHECK_IT(env, it);
138 it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
140 /* the iterator not in use any more */
/*
 * Positions the iterator at the given key.
 *
 * Checks the iterator with LOD_CHECK_IT, then forwards the call to the
 * dio_it.get method of lit_obj, operating on the sub-iterator lit_it.
 * The lower layer's return value is returned unchanged.
 */
145 int lod_it_get(const struct lu_env *env, struct dt_it *di,
146 const struct dt_key *key)
148 const struct lod_it *it = (const struct lod_it *)di;
150 LOD_CHECK_IT(env, it);
151 return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
/*
 * Counterpart of lod_it_get(): forwards put() to the dio_it.put method of
 * lit_obj for the sub-iterator lit_it, after LOD_CHECK_IT has validated
 * the iterator.
 *
 * NOTE(review): this void function uses "return <expression>;".  ISO C
 * does not allow a return statement with an expression in a void
 * function (compilers such as GCC accept it as an extension); consider
 * dropping the "return" keyword.
 */
154 void lod_it_put(const struct lu_env *env, struct dt_it *di)
156 struct lod_it *it = (struct lod_it *)di;
158 LOD_CHECK_IT(env, it);
159 return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
/*
 * Advances the iterator to the next entry.
 *
 * Validates the iterator with LOD_CHECK_IT and returns the result of the
 * dio_it.next method of lit_obj applied to the sub-iterator lit_it.
 */
162 int lod_it_next(const struct lu_env *env, struct dt_it *di)
164 struct lod_it *it = (struct lod_it *)di;
166 LOD_CHECK_IT(env, it);
167 return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
/*
 * Returns the key at the iterator's current position.
 *
 * Validates the iterator with LOD_CHECK_IT and returns the pointer
 * produced by the dio_it.key method of lit_obj for the sub-iterator
 * lit_it.
 */
170 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
172 const struct lod_it *it = (const struct lod_it *)di;
174 LOD_CHECK_IT(env, it);
175 return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
/*
 * Returns the size of the key at the iterator's current position.
 *
 * Validates the iterator with LOD_CHECK_IT and returns the result of the
 * dio_it.key_size method of lit_obj for the sub-iterator lit_it.
 *
 * NOTE(review): the cast drops the const qualifier of di, unlike the
 * neighbouring accessors which keep a const struct lod_it pointer.
 */
178 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
180 struct lod_it *it = (struct lod_it *)di;
182 LOD_CHECK_IT(env, it);
183 return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
/*
 * Fetches the record at the iterator's current position into rec.
 *
 * Validates the iterator with LOD_CHECK_IT and forwards to the dio_it.rec
 * method of lit_obj for the sub-iterator lit_it.  The tail of the argument
 * list is not visible in this view (presumably attr is passed through).
 */
186 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
187 struct dt_rec *rec, __u32 attr)
189 const struct lod_it *it = (const struct lod_it *)di;
191 LOD_CHECK_IT(env, it);
192 return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec,
/*
 * Returns the size of the record at the iterator's current position.
 *
 * Validates the iterator with LOD_CHECK_IT and forwards to the
 * dio_it.rec_size method of lit_obj for the sub-iterator lit_it.  The
 * remaining parameter and argument lines are not visible in this view.
 */
196 int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di,
199 const struct lod_it *it = (const struct lod_it *)di;
201 LOD_CHECK_IT(env, it);
202 return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it,
/*
 * Returns the 64-bit value produced by the lower layer's dio_it.store
 * method for the sub-iterator lit_it (a position cookie that can later be
 * handed to lod_it_load()), after validating the iterator with
 * LOD_CHECK_IT.
 */
206 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
208 const struct lod_it *it = (const struct lod_it *)di;
210 LOD_CHECK_IT(env, it);
211 return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
/*
 * Positions the iterator according to the given hash value.
 *
 * Validates the iterator with LOD_CHECK_IT and returns the result of the
 * dio_it.load method of lit_obj for the sub-iterator lit_it.
 */
214 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
216 const struct lod_it *it = (const struct lod_it *)di;
218 LOD_CHECK_IT(env, it);
219 return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
/*
 * Forwards key_rec() to the dio_it.key_rec method of lit_obj for the
 * sub-iterator lit_it, after validating the iterator with LOD_CHECK_IT.
 * The remaining parameter and argument lines are not visible in this
 * view.
 */
222 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
225 const struct lod_it *it = (const struct lod_it *)di;
227 LOD_CHECK_IT(env, it);
228 return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it,
/*
 * Index operations table built from the pass-through lod_index_* and
 * lod_it_* functions above.  lod_index_try() assigns this table to
 * dt->do_index_ops (the other table it can assign is
 * lod_striped_index_ops).
 *
 * NOTE(review): several initializer lines (the nested dio_it opener and
 * some of its members) are not visible in this view.
 */
232 static struct dt_index_operations lod_index_ops = {
233 .dio_lookup = lod_index_lookup,
234 .dio_declare_insert = lod_declare_index_insert,
235 .dio_insert = lod_index_insert,
236 .dio_declare_delete = lod_declare_index_delete,
237 .dio_delete = lod_index_delete,
245 .key_size = lod_it_key_size,
247 .rec_size = lod_it_rec_size,
248 .store = lod_it_store,
250 .key_rec = lod_it_key_rec,
255 * Implementation of dt_index_operations:: dio_it.init
257 * This function initializes the iterator for a striped directory.
258 * The lod_striped_it_xxx functions simply locate the current stripe
259 * and call the corresponding API of the next lower layer.
261 * \param[in] env execution environment.
262 * \param[in] dt the striped directory object to be iterated.
263 * \param[in] attr the attribute of iterator, mostly used to indicate
264 * the entry attribute in the object to be iterated.
265 * \param[in] capa capability(useless in current implementation)
267 * \retval initialized iterator(dt_it) if successful initialize the
268 * iteration. lit_stripe_index will be used to indicate the
269 * current iterate position among stripes.
270 * \retval ERR pointer if initialization is failed.
/*
 * Summary of the visible code: asserts that the directory has at least
 * one stripe, creates a sub-iterator on stripe 0 (lo->ldo_stripe[0])
 * through that stripe's dio_it.init, and records it in the per-thread
 * struct lod_it with lit_stripe_index = 0.  The per-thread slot must be
 * free on entry (lit_obj == NULL is asserted).  The lod_it is returned
 * cast to struct dt_it *.
 *
 * NOTE(review): lines are missing from this view (handling of a failed
 * init, and the assignments of lit_attr and lit_obj are not shown).
 */
272 static struct dt_it *lod_striped_it_init(const struct lu_env *env,
273 struct dt_object *dt, __u32 attr,
274 struct lustre_capa *capa)
276 struct lod_object *lo = lod_dt_obj(dt);
277 struct dt_object *next;
278 struct lod_it *it = &lod_env_info(env)->lti_it;
279 struct dt_it *it_next;
282 LASSERT(lo->ldo_stripenr > 0);
283 next = lo->ldo_stripe[0];
284 LASSERT(next != NULL);
285 LASSERT(next->do_index_ops != NULL);
287 it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
291 /* currently we do not use more than one iterator per thread
292 * so we store it in thread info. if at some point we need
293 * more active iterators in a single thread, we can allocate
295 LASSERT(it->lit_obj == NULL);
297 it->lit_stripe_index = 0;
299 it->lit_it = it_next;
302 return (struct dt_it *)it;
/*
 * Sanity check used by the lod_striped_it_* functions.  Asserts that the
 * iterator has an object and a sub-iterator attached, that the striped
 * object has at least one stripe, and that lit_stripe_index is a valid
 * index into the stripe array (strictly less than ldo_stripenr).
 */
305 #define LOD_CHECK_STRIPED_IT(env, it, lo) \
307 LASSERT((it)->lit_obj != NULL); \
308 LASSERT((it)->lit_it != NULL); \
309 LASSERT((lo)->ldo_stripenr > 0); \
310 LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr); \
314 * Implementation of dt_index_operations:: dio_it.fini
316 * This function is to finish the iterator for striped directory.
318 * \param[in] env execution environment.
319 * \param[in] di the iterator for the striped directory
/*
 * Summary of the visible code: validates the iterator with
 * LOD_CHECK_STRIPED_IT, releases the sub-iterator through the dio_it.fini
 * method of the stripe currently selected by lit_stripe_index, and resets
 * lit_stripe_index to 0.
 *
 * NOTE(review): the other statements that follow the "not in use any
 * more" comment are not visible in this view.
 */
322 static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
324 struct lod_it *it = (struct lod_it *)di;
325 struct lod_object *lo = lod_dt_obj(it->lit_obj);
326 struct dt_object *next;
328 LOD_CHECK_STRIPED_IT(env, it, lo);
330 next = lo->ldo_stripe[it->lit_stripe_index];
331 LASSERT(next != NULL);
332 LASSERT(next->do_index_ops != NULL);
334 next->do_index_ops->dio_it.fini(env, it->lit_it);
336 /* the iterator not in use any more */
339 it->lit_stripe_index = 0;
343 * Implementation of dt_index_operations:: dio_it.get
345 * This function is to position the iterator with given key
347 * \param[in] env execution environment.
348 * \param[in] di the iterator for striped directory.
349 * \param[in] key the key the iterator will be positioned.
351 * \retval 0 if successfully position iterator by the key.
352 * \retval negative error if position is failed.
/*
 * Summary of the visible code: validates the iterator, selects the stripe
 * at lit_stripe_index, asserts that it exists and has index operations,
 * and returns the result of that stripe's dio_it.get for the sub-iterator
 * lit_it and the given key.
 */
354 static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
355 const struct dt_key *key)
357 const struct lod_it *it = (const struct lod_it *)di;
358 struct lod_object *lo = lod_dt_obj(it->lit_obj);
359 struct dt_object *next;
362 LOD_CHECK_STRIPED_IT(env, it, lo);
364 next = lo->ldo_stripe[it->lit_stripe_index];
365 LASSERT(next != NULL);
366 LASSERT(next->do_index_ops != NULL);
368 return next->do_index_ops->dio_it.get(env, it->lit_it, key);
372 * Implementation of dt_index_operations:: dio_it.put
374 * This function is supposed to be the counterpart of it_get, but it
375 * currently does nothing. see (osd_it_ea_put or osd_index_it_put)
/*
 * Summary of the visible code: validates the iterator, selects the stripe
 * at lit_stripe_index, and forwards put() to that stripe's dio_it.put for
 * the sub-iterator lit_it.
 *
 * NOTE(review): this void function uses "return <expression>;".  ISO C
 * does not allow a return statement with an expression in a void
 * function (compilers such as GCC accept it as an extension); consider
 * dropping the "return" keyword.
 */
377 static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
379 struct lod_it *it = (struct lod_it *)di;
380 struct lod_object *lo = lod_dt_obj(it->lit_obj);
381 struct dt_object *next;
383 LOD_CHECK_STRIPED_IT(env, it, lo);
385 next = lo->ldo_stripe[it->lit_stripe_index];
386 LASSERT(next != NULL);
387 LASSERT(next->do_index_ops != NULL);
389 return next->do_index_ops->dio_it.put(env, it->lit_it);
393 * Implementation of dt_index_operations:: dio_it.next
395 * This function positions the iterator at the next entry. If the current
396 * stripe is finished (determined from the return value of next() on the
397 * current stripe), it moves on to the next stripe. In the meantime, the
398 * sub-iterator for the next stripe needs to be initialized.
400 * \param[in] env execution environment.
401 * \param[in] di the iterator for striped directory.
403 * \retval 0 if successfully position iterator to the next entry.
404 * \retval negative error if position is failed.
/*
 * Summary of the visible code:
 *  1. Validate the iterator and call dio_it.next on the current stripe's
 *     sub-iterator.
 *  2. A zero result on stripe 0 is handled by a branch whose body is not
 *     visible here.  A zero result on a later stripe reads the current
 *     entry into the per-thread lti_key buffer and tests whether its name
 *     is exactly "." or ".." (the action taken is not visible; per the
 *     in-line comment those entries are skipped for slave stripes).
 *  3. To move on, check that another stripe exists, increment
 *     lit_stripe_index, put and fini the old sub-iterator, then create a
 *     new sub-iterator on the newly selected stripe using the saved
 *     lit_attr.  On success the new sub-iterator is stored in lit_it;
 *     otherwise rc becomes PTR_ERR() of the failed init.
 *
 * NOTE(review): in the visible lines do_index_try() is invoked on "next"
 * before "next" is re-pointed at the new stripe, i.e. on the stripe that
 * was just finished.  Confirm whether the new stripe was intended.
 *
 * NOTE(review): many lines (declaration of rc, returns, loop labels or
 * gotos, braces) are missing from this view; do not rely on this summary
 * for exact control flow.
 */
406 static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
408 struct lod_it *it = (struct lod_it *)di;
409 struct lod_object *lo = lod_dt_obj(it->lit_obj);
410 struct dt_object *next;
411 struct dt_it *it_next;
415 LOD_CHECK_STRIPED_IT(env, it, lo);
417 next = lo->ldo_stripe[it->lit_stripe_index];
418 LASSERT(next != NULL);
419 LASSERT(next->do_index_ops != NULL);
421 rc = next->do_index_ops->dio_it.next(env, it->lit_it);
425 if (rc == 0 && it->lit_stripe_index == 0)
428 if (rc == 0 && it->lit_stripe_index > 0) {
429 struct lu_dirent *ent;
431 ent = (struct lu_dirent *)lod_env_info(env)->lti_key;
433 rc = next->do_index_ops->dio_it.rec(env, it->lit_it,
434 (struct dt_rec *)ent,
439 /* skip . and .. for slave stripe */
440 if ((strncmp(ent->lde_name, ".",
441 le16_to_cpu(ent->lde_namelen)) == 0 &&
442 le16_to_cpu(ent->lde_namelen) == 1) ||
443 (strncmp(ent->lde_name, "..",
444 le16_to_cpu(ent->lde_namelen)) == 0 &&
445 le16_to_cpu(ent->lde_namelen) == 2))
451 /* go to next stripe */
452 if (it->lit_stripe_index + 1 >= lo->ldo_stripenr)
455 it->lit_stripe_index++;
457 next->do_index_ops->dio_it.put(env, it->lit_it);
458 next->do_index_ops->dio_it.fini(env, it->lit_it);
460 rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
464 next = lo->ldo_stripe[it->lit_stripe_index];
465 LASSERT(next != NULL);
466 LASSERT(next->do_index_ops != NULL);
468 it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr,
470 if (!IS_ERR(it_next)) {
471 it->lit_it = it_next;
474 rc = PTR_ERR(it_next);
481 * Implementation of dt_index_operations:: dio_it.key
483 * This function is to get the key of the iterator at current position.
485 * \param[in] env execution environment.
486 * \param[in] di the iterator for striped directory.
488 * \retval key(dt_key) if successfully get the key.
489 * \retval negative error if can not get the key.
/*
 * Summary of the visible code: validates the iterator, selects the stripe
 * at lit_stripe_index, and returns the pointer produced by that stripe's
 * dio_it.key for the sub-iterator lit_it.
 */
491 static struct dt_key *lod_striped_it_key(const struct lu_env *env,
492 const struct dt_it *di)
494 const struct lod_it *it = (const struct lod_it *)di;
495 struct lod_object *lo = lod_dt_obj(it->lit_obj);
496 struct dt_object *next;
498 LOD_CHECK_STRIPED_IT(env, it, lo);
500 next = lo->ldo_stripe[it->lit_stripe_index];
501 LASSERT(next != NULL);
502 LASSERT(next->do_index_ops != NULL);
504 return next->do_index_ops->dio_it.key(env, it->lit_it);
508 * Implementation of dt_index_operations:: dio_it.key_size
510 * This function is to get the key_size of current key.
512 * \param[in] env execution environment.
513 * \param[in] di the iterator for striped directory.
515 * \retval key_size if successfully get the key_size.
516 * \retval negative error if can not get the key_size.
/*
 * Summary of the visible code: validates the iterator, selects the stripe
 * at lit_stripe_index, and returns the result of that stripe's
 * dio_it.key_size for the sub-iterator lit_it.
 *
 * NOTE(review): the cast drops the const qualifier of di.
 */
518 static int lod_striped_it_key_size(const struct lu_env *env,
519 const struct dt_it *di)
521 struct lod_it *it = (struct lod_it *)di;
522 struct lod_object *lo = lod_dt_obj(it->lit_obj);
523 struct dt_object *next;
525 LOD_CHECK_STRIPED_IT(env, it, lo);
527 next = lo->ldo_stripe[it->lit_stripe_index];
528 LASSERT(next != NULL);
529 LASSERT(next->do_index_ops != NULL);
531 return next->do_index_ops->dio_it.key_size(env, it->lit_it);
535 * Implementation of dt_index_operations:: dio_it.rec
537 * This function is to get the record at current position.
539 * \param[in] env execution environment.
540 * \param[in] di the iterator for striped directory.
541 * \param[in] attr the attribute of iterator, mostly used to indicate
542 * the entry attribute in the object to be iterated.
543 * \param[out] rec hold the return record.
545 * \retval 0 if successfully get the entry.
546 * \retval negative error if can not get entry.
/*
 * Summary of the visible code: validates the iterator, selects the stripe
 * at lit_stripe_index, and returns the result of that stripe's dio_it.rec
 * for the sub-iterator lit_it, passing rec and attr through unchanged.
 */
548 static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
549 struct dt_rec *rec, __u32 attr)
551 const struct lod_it *it = (const struct lod_it *)di;
552 struct lod_object *lo = lod_dt_obj(it->lit_obj);
553 struct dt_object *next;
555 LOD_CHECK_STRIPED_IT(env, it, lo);
557 next = lo->ldo_stripe[it->lit_stripe_index];
558 LASSERT(next != NULL);
559 LASSERT(next->do_index_ops != NULL);
561 return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
565 * Implementation of dt_index_operations:: dio_it.rec_size
567 * This function is to get the record_size at current record.
569 * \param[in] env execution environment.
570 * \param[in] di the iterator for striped directory.
571 * \param[in] attr the attribute of iterator, mostly used to indicate
572 * the entry attribute in the object to be iterated.
574 * \retval rec_size if successfully get the entry size.
575 * \retval negative error if can not get entry size.
/*
 * Summary of the visible code: validates the iterator, selects the stripe
 * at lit_stripe_index, and returns the result of that stripe's
 * dio_it.rec_size for the sub-iterator lit_it and the given attr.
 *
 * NOTE(review): the cast drops the const qualifier of di.
 */
577 static int lod_striped_it_rec_size(const struct lu_env *env,
578 const struct dt_it *di, __u32 attr)
580 struct lod_it *it = (struct lod_it *)di;
581 struct lod_object *lo = lod_dt_obj(it->lit_obj);
582 struct dt_object *next;
584 LOD_CHECK_STRIPED_IT(env, it, lo);
586 next = lo->ldo_stripe[it->lit_stripe_index];
587 LASSERT(next != NULL);
588 LASSERT(next->do_index_ops != NULL);
590 return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
594 * Implementation of dt_index_operations:: dio_it.store
596 * This function returns a cookie for the current position of the iterator
597 * head, so that the user can use this cookie to load/start the iterator next time.
599 * \param[in] env execution environment.
600 * \param[in] di the iterator for striped directory.
602 * \retval the cookie.
/*
 * Summary of the visible code: validates the iterator, selects the stripe
 * at lit_stripe_index, and returns the 64-bit value produced by that
 * stripe's dio_it.store for the sub-iterator lit_it.  The value is not
 * combined with the stripe index in the visible code.
 */
604 static __u64 lod_striped_it_store(const struct lu_env *env,
605 const struct dt_it *di)
607 const struct lod_it *it = (const struct lod_it *)di;
608 struct lod_object *lo = lod_dt_obj(it->lit_obj);
609 struct dt_object *next;
611 LOD_CHECK_STRIPED_IT(env, it, lo);
613 next = lo->ldo_stripe[it->lit_stripe_index];
614 LASSERT(next != NULL);
615 LASSERT(next->do_index_ops != NULL);
617 return next->do_index_ops->dio_it.store(env, it->lit_it);
621 * Implementation of dt_index_operations:: dio_it.load
623 * This function will position the iterator with the given hash(usually
626 * \param[in] env execution environment.
627 * \param[in] di the iterator for striped directory.
628 * \param[in] hash the given hash.
630 * \retval >0 if the iterator is successfully loaded to the given position.
631 * \retval <0 if load is failed.
/*
 * Summary of the visible code: validates the iterator, selects the stripe
 * at lit_stripe_index, and returns the result of that stripe's
 * dio_it.load for the sub-iterator lit_it and the given hash.  The
 * stripe index itself is not changed here.
 */
633 static int lod_striped_it_load(const struct lu_env *env,
634 const struct dt_it *di, __u64 hash)
636 const struct lod_it *it = (const struct lod_it *)di;
637 struct lod_object *lo = lod_dt_obj(it->lit_obj);
638 struct dt_object *next;
640 LOD_CHECK_STRIPED_IT(env, it, lo);
642 next = lo->ldo_stripe[it->lit_stripe_index];
643 LASSERT(next != NULL);
644 LASSERT(next->do_index_ops != NULL);
646 return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
/*
 * Index operations table for striped directories.  Lookup, insert and
 * delete reuse the same pass-through functions as lod_index_ops; only the
 * iterator methods are replaced by the stripe-aware lod_striped_it_*
 * variants.  lod_index_try() assigns this table to dt->do_index_ops
 * inside its ldo_stripenr > 0 branch.
 *
 * NOTE(review): the nested dio_it opener and the closing braces are not
 * visible in this view.
 */
649 static struct dt_index_operations lod_striped_index_ops = {
650 .dio_lookup = lod_index_lookup,
651 .dio_declare_insert = lod_declare_index_insert,
652 .dio_insert = lod_index_insert,
653 .dio_declare_delete = lod_declare_index_delete,
654 .dio_delete = lod_index_delete,
656 .init = lod_striped_it_init,
657 .fini = lod_striped_it_fini,
658 .get = lod_striped_it_get,
659 .put = lod_striped_it_put,
660 .next = lod_striped_it_next,
661 .key = lod_striped_it_key,
662 .key_size = lod_striped_it_key_size,
663 .rec = lod_striped_it_rec,
664 .rec_size = lod_striped_it_rec_size,
665 .store = lod_striped_it_store,
666 .load = lod_striped_it_load,
671 * Append the FID for each shard of the striped directory after the
672 * given LMV EA header.
674 * To simplify striped directory and the consistency verification,
675 * we only store the LMV EA header on disk, for both master object
676 * and slave objects. When someone wants to know the whole LMV EA,
677 * such as client readdir(), we can build the entire LMV EA on the
678 * MDT side (in RAM) via iterating the sub-directory entries that
679 * are contained in the master object of the stripe directory.
681 * For the master object of the striped directory, the valid name
682 * for each shard is composed of the ${shard_FID}:${shard_idx}.
684 * There may be holes in the LMV EA if some shards' name entries
685 * are corrupted or lost.
687 * \param[in] env pointer to the thread context
688 * \param[in] lo pointer to the master object of the striped directory
689 * \param[in] buf pointer to the lu_buf which will hold the LMV EA
690 * \param[in] resize whether re-allocate the buffer if it is not big enough
692 * \retval positive size of the LMV EA
693 * \retval 0 for nothing to be loaded
694 * \retval negative error number on failure
/*
 * Summary of the visible code:
 *  - buf must already hold an LMV header.  Only LMV_MAGIC_V1 headers
 *    without LMV_HASH_FLAG_MIGRATION are processed; the other cases are
 *    handled by early branches whose bodies are not visible.
 *  - The required size is computed with lmv_mds_md_size() from the stripe
 *    count; when buf is too small a bigger buffer is allocated with
 *    lu_buf_alloc() and the old contents are copied back.
 *  - The FID slots are zeroed, then the child object is iterated as a
 *    directory.  Entries "." and ".." are recognised separately.  Every
 *    other entry name must look like "<FID>:<decimal index>"; the index
 *    is parsed digit by digit.
 *  - A malformed name is logged at D_ERROR or D_INFO depending on
 *    lod->lod_lmv_failout, which also selects a "failout" or "skip"
 *    policy.
 *  - An index >= stripes, or an index whose slot is already occupied, is
 *    reported with CERROR.
 *  - Otherwise the entry's FID is stored (still little-endian) in
 *    lmv_stripe_fids[index].
 *  - The final RETURN yields lmv_mds_md_size(stripes, magic) when rc > 0,
 *    else rc.
 *
 * NOTE(review): the expected-name prefix is formatted from ent->lde_fid,
 * which has not been byte-swapped, while the converted copy "fid" is used
 * only for logging.  Confirm this is intended on big-endian hosts.
 *
 * NOTE(review): "name" is declared with FID_LEN + 2 bytes but snprintf()
 * is limited to FID_LEN + 1; confirm the longest DFID":" string fits.
 *
 * NOTE(review): many lines (declarations, returns, gotos, braces) are
 * missing from this view; do not rely on this summary for exact control
 * flow.
 */
696 int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo,
697 struct lu_buf *buf, bool resize)
699 struct lu_dirent *ent =
700 (struct lu_dirent *)lod_env_info(env)->lti_key;
701 struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
702 struct dt_object *obj = dt_object_child(&lo->ldo_obj);
703 struct lmv_mds_md_v1 *lmv1 = buf->lb_buf;
705 const struct dt_it_ops *iops;
707 __u32 magic = le32_to_cpu(lmv1->lmv_magic);
712 /* If it is not a striped directory, then load nothing. */
713 if (magic != LMV_MAGIC_V1)
716 /* If it is in migration (or failure), then load nothing. */
717 if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
720 stripes = le32_to_cpu(lmv1->lmv_stripe_count);
724 rc = lmv_mds_md_size(stripes, magic);
728 if (buf->lb_len < lmv1_size) {
737 lu_buf_alloc(buf, lmv1_size);
742 memcpy(buf->lb_buf, tbuf.lb_buf, tbuf.lb_len);
745 if (unlikely(!dt_try_as_dir(env, obj)))
748 memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid));
749 iops = &obj->do_index_ops->dio_it;
750 it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
754 rc = iops->load(env, it, 0);
756 rc = iops->next(env, it);
761 char name[FID_LEN + 2] = "";
766 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
772 fid_le_to_cpu(&fid, &ent->lde_fid);
773 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
774 if (ent->lde_name[0] == '.') {
775 if (ent->lde_namelen == 1)
778 if (ent->lde_namelen == 2 && ent->lde_name[1] == '.')
782 len = snprintf(name, FID_LEN + 1, DFID":", PFID(&ent->lde_fid));
783 /* The ent->lde_name is composed of ${FID}:${index} */
784 if (ent->lde_namelen < len + 1 ||
785 memcmp(ent->lde_name, name, len) != 0) {
786 CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
787 "%s: invalid shard name %.*s with the FID "DFID
788 " for the striped directory "DFID", %s\n",
789 lod2obd(lod)->obd_name, ent->lde_namelen,
790 ent->lde_name, PFID(&fid),
791 PFID(lu_object_fid(&obj->do_lu)),
792 lod->lod_lmv_failout ? "failout" : "skip");
794 if (lod->lod_lmv_failout)
802 if (ent->lde_name[len] < '0' ||
803 ent->lde_name[len] > '9') {
804 CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
805 "%s: invalid shard name %.*s with the "
806 "FID "DFID" for the striped directory "
808 lod2obd(lod)->obd_name, ent->lde_namelen,
809 ent->lde_name, PFID(&fid),
810 PFID(lu_object_fid(&obj->do_lu)),
811 lod->lod_lmv_failout ?
814 if (lod->lod_lmv_failout)
820 index = index * 10 + ent->lde_name[len++] - '0';
821 } while (len < ent->lde_namelen);
823 if (len == ent->lde_namelen) {
824 /* Out of LMV EA range. */
825 if (index >= stripes) {
826 CERROR("%s: the shard %.*s for the striped "
827 "directory "DFID" is out of the known "
828 "LMV EA range [0 - %u], failout\n",
829 lod2obd(lod)->obd_name, ent->lde_namelen,
831 PFID(lu_object_fid(&obj->do_lu)),
837 /* The slot has been occupied. */
838 if (!fid_is_zero(&lmv1->lmv_stripe_fids[index])) {
842 &lmv1->lmv_stripe_fids[index]);
843 CERROR("%s: both the shard "DFID" and "DFID
844 " for the striped directory "DFID
845 " claim the same LMV EA slot at the "
846 "index %d, failout\n",
847 lod2obd(lod)->obd_name,
848 PFID(&fid0), PFID(&fid),
849 PFID(lu_object_fid(&obj->do_lu)), index);
854 /* stored as LE mode */
855 lmv1->lmv_stripe_fids[index] = ent->lde_fid;
858 rc = iops->next(env, it);
865 RETURN(rc > 0 ? lmv_mds_md_size(stripes, magic) : rc);
869 * Implementation of dt_object_operations:: do_index_try
871 * This function will try to initialize the index api pointer for the
872 * given object, usually it is the entry point of the index api. i.e.
873 * the index object should be initialized in index_try, then start
874 * using index api. For striped directory, it will try to initialize
875 * all of its sub_stripes.
877 * \param[in] env execution environment.
878 * \param[in] dt the index object to be initialized.
879 * \param[in] feat the features of this object, for example fixed or
880 * variable key size etc.
882 * \retval >0 if the initialization is successful.
883 * \retval <0 if the initialization is failed.
/*
 * Summary of the visible code:
 *  - asserts that the child object provides do_ops->do_index_try;
 *  - loads the striping with lod_load_striping_locked();
 *  - calls do_index_try on the child object with the requested features;
 *  - when the object has stripes (ldo_stripenr > 0), calls do_index_try
 *    on each stripe (a dt_object_exists() == 0 test guards a statement
 *    that is not visible here) and installs lod_striped_index_ops;
 *  - lod_index_ops is installed on the other path (the "else" line itself
 *    is not visible in this view).
 *
 * NOTE(review): the rc checks and return statements are missing from this
 * view.
 */
885 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
886 const struct dt_index_features *feat)
888 struct lod_object *lo = lod_dt_obj(dt);
889 struct dt_object *next = dt_object_child(dt);
893 LASSERT(next->do_ops);
894 LASSERT(next->do_ops->do_index_try);
896 rc = lod_load_striping_locked(env, lo);
900 rc = next->do_ops->do_index_try(env, next, feat);
904 if (lo->ldo_stripenr > 0) {
907 for (i = 0; i < lo->ldo_stripenr; i++) {
908 if (dt_object_exists(lo->ldo_stripe[i]) == 0)
910 rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
911 lo->ldo_stripe[i], feat);
915 dt->do_index_ops = &lod_striped_index_ops;
917 dt->do_index_ops = &lod_index_ops;
/*
 * Takes the read lock of the child object (dt_object_child(dt)) via
 * dt_read_lock(), passing role through unchanged.  No LOD-level lock is
 * taken in the visible code.
 */
923 static void lod_object_read_lock(const struct lu_env *env,
924 struct dt_object *dt, unsigned role)
926 dt_read_lock(env, dt_object_child(dt), role);
/*
 * Takes the write lock of the child object (dt_object_child(dt)) via
 * dt_write_lock(), passing role through unchanged.
 */
929 static void lod_object_write_lock(const struct lu_env *env,
930 struct dt_object *dt, unsigned role)
932 dt_write_lock(env, dt_object_child(dt), role);
/*
 * Releases the read lock of the child object (dt_object_child(dt)) via
 * dt_read_unlock(); counterpart of lod_object_read_lock().
 */
935 static void lod_object_read_unlock(const struct lu_env *env,
936 struct dt_object *dt)
938 dt_read_unlock(env, dt_object_child(dt));
/*
 * Releases the write lock of the child object (dt_object_child(dt)) via
 * dt_write_unlock(); counterpart of lod_object_write_lock().
 */
941 static void lod_object_write_unlock(const struct lu_env *env,
942 struct dt_object *dt)
944 dt_write_unlock(env, dt_object_child(dt));
/*
 * Returns the result of dt_write_locked() for the child object
 * (dt_object_child(dt)), i.e. the lower layer's answer to "is this object
 * write-locked?".
 */
947 static int lod_object_write_locked(const struct lu_env *env,
948 struct dt_object *dt)
950 return dt_write_locked(env, dt_object_child(dt));
/*
 * Reads the attributes of a LOD object.
 *
 * Pure pass-through to dt_attr_get() on the child object
 * (dt_object_child(dt)); attributes of stripes are not consulted here.
 */
953 static int lod_attr_get(const struct lu_env *env,
954 struct dt_object *dt,
955 struct lu_attr *attr,
956 struct lustre_capa *capa)
958 /* Note: for a striped directory the client merges attributes
959 * from all of the sub-stripes, see lmv_merge_attr(), and no
960 * MDD logic depends on directory nlink/size/time, so we can
961 * always use master inode nlink and size for now. */
962 return dt_attr_get(env, dt_object_child(dt), attr, capa);
966 * Mark all sub-stripes of the striped directory as dead.
/*
 * Summary of the visible code:
 *  - Non-directories, objects without stripes, and failures of
 *    lod_load_striping_locked() or lod_get_lmv_ea() are handled by early
 *    branches whose bodies are not visible here.
 *  - The LMV header held in the per-thread lti_ea_store is rewritten:
 *    magic becomes LMV_MAGIC_STRIPE and LMV_HASH_FLAG_DEAD is OR-ed into
 *    the hash type.
 *  - For every stripe i, lmv_master_mdt_index is set to i and the header
 *    (sizeof(*lmv) bytes) is written as XATTR_NAME_LMV with
 *    LU_XATTR_REPLACE.  This is done with dt_declare_xattr_set() or
 *    dt_xattr_set(), selected by a condition not visible here.
 *
 * The fourth parameter is on a line missing from this view; callers pass
 * true from lod_declare_attr_set() and false from lod_attr_set().
 *
 * NOTE(review): lmv_magic and lmv_hash_type are stored with cpu_to_le32()
 * but lmv_master_mdt_index is assigned the raw loop index.  Confirm the
 * expected byte order of that field.
 */
968 static int lod_mark_dead_object(const struct lu_env *env,
969 struct dt_object *dt,
970 struct thandle *handle,
973 struct lod_object *lo = lod_dt_obj(dt);
974 struct lmv_mds_md_v1 *lmv;
975 __u32 dead_hash_type;
981 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
984 rc = lod_load_striping_locked(env, lo);
988 if (lo->ldo_stripenr == 0)
991 rc = lod_get_lmv_ea(env, lo);
995 lmv = lod_env_info(env)->lti_ea_store;
996 lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
997 dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD;
998 lmv->lmv_hash_type = cpu_to_le32(dead_hash_type);
999 for (i = 0; i < lo->ldo_stripenr; i++) {
1002 lmv->lmv_master_mdt_index = i;
1004 buf.lb_len = sizeof(*lmv);
1006 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf,
1008 LU_XATTR_REPLACE, handle);
1010 rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf,
1011 XATTR_NAME_LMV, LU_XATTR_REPLACE,
1012 handle, BYPASS_CAPA);
/*
 * Declares an attribute change on a LOD object and, where needed, on its
 * stripes.
 *
 * Summary of the visible code:
 *  - When the request sets only LA_FLAGS and includes
 *    LUSTRE_SLAVE_DEAD_FL, lod_mark_dead_object() is called in declare
 *    mode (last argument true).
 *  - The change is declared on the child object with
 *    dt_declare_attr_set().
 *  - Non-directories continue only when LA_UID or LA_GID is being set;
 *    an OBD_FAIL_LFSCK_BAD_OWNER fail-check follows.  Directories continue
 *    only for UID/GID/MODE/ATIME/MTIME/CTIME changes.
 *  - Striping is loaded with lod_load_striping(); if stripes exist the
 *    change is declared on every non-NULL stripe, logging via CERROR on
 *    the failure path.
 *  - Two fault-injection hooks (OBD_FAIL_LFSCK_LOST_STRIPE and
 *    OBD_FAIL_LFSCK_CHANGE_STRIPE) additionally declare a delete or a
 *    replace of XATTR_NAME_LOV on a local, existing child object.
 *
 * NOTE(review): the return values of dt_declare_xattr_del() and
 * dt_declare_xattr_set() in the fault-injection branches are not captured
 * in the visible lines.
 *
 * NOTE(review): rc checks, returns and braces are missing from this view.
 */
1021 static int lod_declare_attr_set(const struct lu_env *env,
1022 struct dt_object *dt,
1023 const struct lu_attr *attr,
1024 struct thandle *handle)
1026 struct dt_object *next = dt_object_child(dt);
1027 struct lod_object *lo = lod_dt_obj(dt);
1031 /* Set dead object on all other stripes */
1032 if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
1033 attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
1034 rc = lod_mark_dead_object(env, dt, handle, true);
1039 * declare setattr on the local object
1041 rc = dt_declare_attr_set(env, next, attr, handle);
1045 /* osp_declare_attr_set() ignores all attributes other than
1046 * UID, GID, and size, and osp_attr_set() ignores all but UID
1047 * and GID. Declaration of size attr setting happens through
1048 * lod_declare_init_size(), and not through this function.
1049 * Therefore we need not load striping unless ownership is
1050 * changing. This should save memory and (we hope) speed up
1052 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
1053 if (!(attr->la_valid & (LA_UID | LA_GID)))
1056 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
1059 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
1060 LA_ATIME | LA_MTIME | LA_CTIME)))
1064 * load striping information, notice we don't do this when object
1065 * is being initialized as we don't need this information till
1066 * few specific cases like destroy, chown
1068 rc = lod_load_striping(env, lo);
1072 if (lo->ldo_stripenr == 0)
1076 * if object is striped declare changes on the stripes
1078 LASSERT(lo->ldo_stripe);
1079 for (i = 0; i < lo->ldo_stripenr; i++) {
1080 if (likely(lo->ldo_stripe[i] != NULL)) {
1081 rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr,
1084 CERROR("failed declaration: %d\n", rc);
1090 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1091 dt_object_exists(next) != 0 &&
1092 dt_object_remote(next) == 0)
1093 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
1095 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1096 dt_object_exists(next) &&
1097 dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1098 struct lod_thread_info *info = lod_env_info(env);
1099 struct lu_buf *buf = &info->lti_buf;
1101 buf->lb_buf = info->lti_ea_store;
1102 buf->lb_len = info->lti_ea_store_size;
1103 dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
1104 LU_XATTR_REPLACE, handle);
/*
 * Applies an attribute change to a LOD object and, where needed, to its
 * stripes.  Execution counterpart of lod_declare_attr_set().
 *
 * Summary of the visible code:
 *  - When the request sets only LA_FLAGS and includes
 *    LUSTRE_SLAVE_DEAD_FL, lod_mark_dead_object() is called in execute
 *    mode (last argument false).
 *  - The change is applied to the child object with dt_attr_set().
 *  - The same UID/GID (non-directory) and UID/GID/MODE/time (directory)
 *    filters as in the declare path decide whether stripes are touched.
 *  - For each non-NULL stripe the change is applied with dt_attr_set().
 *    For directories a dt_object_exists() == 0 test guards a statement
 *    that is not visible here.
 *  - Fault injection: OBD_FAIL_LFSCK_LOST_STRIPE deletes XATTR_NAME_LOV.
 *    OBD_FAIL_LFSCK_CHANGE_STRIPE reloads the LOV EA, converts the first
 *    object's ost_id to a FID and back (the modification in between is
 *    not visible), and rewrites XATTR_NAME_LOV.
 *
 * NOTE(review): the CERROR text in the stripe loop says "failed
 * declaration" although this is the execution path; it looks copied from
 * lod_declare_attr_set().
 *
 * NOTE(review): the return values of dt_xattr_del() and dt_xattr_set() in
 * the fault-injection branches are not captured in the visible lines.
 *
 * NOTE(review): rc checks, returns and braces are missing from this view.
 */
1110 static int lod_attr_set(const struct lu_env *env,
1111 struct dt_object *dt,
1112 const struct lu_attr *attr,
1113 struct thandle *handle,
1114 struct lustre_capa *capa)
1116 struct dt_object *next = dt_object_child(dt);
1117 struct lod_object *lo = lod_dt_obj(dt);
1121 /* Set dead object on all other stripes */
1122 if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
1123 attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
1124 rc = lod_mark_dead_object(env, dt, handle, false);
1129 * apply changes to the local object
1131 rc = dt_attr_set(env, next, attr, handle, capa);
1135 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
1136 if (!(attr->la_valid & (LA_UID | LA_GID)))
1139 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
1142 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
1143 LA_ATIME | LA_MTIME | LA_CTIME)))
1147 if (lo->ldo_stripenr == 0)
1151 * if object is striped, apply changes to all the stripes
1153 LASSERT(lo->ldo_stripe);
1154 for (i = 0; i < lo->ldo_stripenr; i++) {
1155 if (unlikely(lo->ldo_stripe[i] == NULL))
1157 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
1158 (dt_object_exists(lo->ldo_stripe[i]) == 0))
1161 rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
1163 CERROR("failed declaration: %d\n", rc);
1168 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1169 dt_object_exists(next) != 0 &&
1170 dt_object_remote(next) == 0)
1171 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
1173 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1174 dt_object_exists(next) &&
1175 dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1176 struct lod_thread_info *info = lod_env_info(env);
1177 struct lu_buf *buf = &info->lti_buf;
1178 struct ost_id *oi = &info->lti_ostid;
1179 struct lu_fid *fid = &info->lti_fid;
1180 struct lov_mds_md_v1 *lmm;
1181 struct lov_ost_data_v1 *objs;
1185 rc1 = lod_get_lov_ea(env, lo);
1189 buf->lb_buf = info->lti_ea_store;
1190 buf->lb_len = info->lti_ea_store_size;
1191 lmm = info->lti_ea_store;
1192 magic = le32_to_cpu(lmm->lmm_magic);
1193 if (magic == LOV_MAGIC_V1)
1194 objs = &(lmm->lmm_objects[0]);
1196 objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1197 ostid_le_to_cpu(&objs->l_ost_oi, oi);
1198 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
1200 fid_to_ostid(fid, oi);
1201 ostid_cpu_to_le(oi, &objs->l_ost_oi);
1202 dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
1203 LU_XATTR_REPLACE, handle, BYPASS_CAPA);
/*
 * Reads an extended attribute of a LOD object, with special handling for
 * the LMV EA and for the default LOV EA on the device root.
 *
 * Summary of the visible code:
 *  - The xattr is first read from the child object with dt_xattr_get().
 *  - For XATTR_NAME_LMV the result is compared with
 *    sizeof(struct lmv_mds_md_v1).  A larger result is handled by a
 *    branch that is not visible here; a smaller positive result becomes
 *    -EINVAL.
 *  - When the caller supplied no buffer, the header is read into the
 *    per-thread lti_key scratch space.  For LMV_MAGIC_V1, rc is then set
 *    from lmv_mds_md_size() of the stripe count, i.e. a size that includes
 *    room for all shard FIDs.
 *  - Otherwise lod_load_lmv_shards() is called, and its non-zero result
 *    takes precedence over rc.
 *  - For any other name, processing continues only when the read returned
 *    -ENODATA on a directory.  If the object is the device root (its FID
 *    equals the one from dt_root_get()) and the name is XATTR_NAME_LOV, a
 *    lov_user_md (LOV_USER_MAGIC_V1) is synthesised from the defaults in
 *    dev->lod_desc when the buffer is large enough.  A NULL buffer is
 *    handled by a branch not visible here.
 *
 * NOTE(review): declarations of rc, rc1 and is_root, several returns and
 * closing braces are missing from this view.
 */
1209 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
1210 struct lu_buf *buf, const char *name,
1211 struct lustre_capa *capa)
1213 struct lod_thread_info *info = lod_env_info(env);
1214 struct lod_device *dev = lu2lod_dev(dt->do_lu.lo_dev);
1218 rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
1219 if (strcmp(name, XATTR_NAME_LMV) == 0) {
1220 struct lmv_mds_md_v1 *lmv1;
1223 if (rc > (typeof(rc))sizeof(*lmv1))
1226 if (rc < (typeof(rc))sizeof(*lmv1))
1227 RETURN(rc = rc > 0 ? -EINVAL : rc);
1229 if (buf->lb_buf == NULL || buf->lb_len == 0) {
1230 CLASSERT(sizeof(*lmv1) <= sizeof(info->lti_key));
1232 info->lti_buf.lb_buf = info->lti_key;
1233 info->lti_buf.lb_len = sizeof(*lmv1);
1234 rc = dt_xattr_get(env, dt_object_child(dt),
1235 &info->lti_buf, name, capa);
1236 if (unlikely(rc != sizeof(*lmv1)))
1237 RETURN(rc = rc > 0 ? -EINVAL : rc);
1239 lmv1 = info->lti_buf.lb_buf;
1240 /* The on-disk LMV EA only contains header, but the
1241 * returned LMV EA size should contain the space for
1242 * the FIDs of all shards of the striped directory. */
1243 if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1)
1244 rc = lmv_mds_md_size(
1245 le32_to_cpu(lmv1->lmv_stripe_count),
1248 rc1 = lod_load_lmv_shards(env, lod_dt_obj(dt),
1252 RETURN(rc = rc1 != 0 ? rc1 : rc);
1255 if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
1259 * lod returns default striping on the real root of the device
1260 * this is like the root stores default striping for the whole
1261 * filesystem. historically we've been using a different approach
1262 * and store it in the config.
1264 dt_root_get(env, dev->lod_child, &info->lti_fid);
1265 is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
1267 if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
1268 struct lov_user_md *lum = buf->lb_buf;
1269 struct lov_desc *desc = &dev->lod_desc;
1271 if (buf->lb_buf == NULL) {
1273 } else if (buf->lb_len >= sizeof(*lum)) {
1274 lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
1275 lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
1276 lmm_oi_set_id(&lum->lmm_oi, 0);
1277 lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
1278 lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
1279 lum->lmm_stripe_size = cpu_to_le32(
1280 desc->ld_default_stripe_size);
1281 lum->lmm_stripe_count = cpu_to_le16(
1282 desc->ld_default_stripe_count);
1283 lum->lmm_stripe_offset = cpu_to_le16(
1284 desc->ld_default_stripe_offset);
/*
 * Validate a user-supplied LMV (directory striping) descriptor.
 *
 * A magic other than LMV_USER_MAGIC or a stripe count of zero yields
 * -EINVAL.  The CERROR reports the device name, the descriptor's magic,
 * stripe offset and stripe count, and rc.
 *
 * \param[in] lod  LOD device, used for the name in the error message
 * \param[in] lum  descriptor to check; fields are read as little-endian
 */
1294 static int lod_verify_md_striping(struct lod_device *lod,
1295 const struct lmv_user_md_v1 *lum)
1300 if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
1301 GOTO(out, rc = -EINVAL);
1303 if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
1304 GOTO(out, rc = -EINVAL);
1307 CERROR("%s: invalid lmv_user_md: magic = %x, "
1308 "stripe_offset = %d, stripe_count = %u: rc = %d\n",
1309 lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
1310 (int)le32_to_cpu(lum->lum_stripe_offset),
1311 le32_to_cpu(lum->lum_stripe_count), rc);
1316 * Master LMVEA will be same as slave LMVEA, except
1317 * 1. different magic
1318 * 2. lmv_master_mdt_index on slave LMV EA will be stripe_index.
/*
 * Initialise \a slave_lmv as a structure copy of \a master_lmv and then
 * replace its magic with LMV_MAGIC_STRIPE (stored little-endian).
 */
1320 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
1321 const struct lmv_mds_md_v1 *master_lmv)
1323 *slave_lmv = *master_lmv;
1324 slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
/*
 * Build the on-disk LMV EA header for the striped directory \a dt.
 *
 * The header (magic, stripe count, hash type, master MDT index) is composed
 * in the per-thread lti_ea_store buffer, which is resized first if it is
 * smaller than struct lmv_mds_md_v1.  On completion \a lmv_buf points at
 * that buffer with a length of sizeof(struct lmv_mds_md_v1) and
 * ldo_dir_striping_cached is set on the object.
 *
 * The object is asserted to be a striped directory with at least one
 * stripe.  Because the result lives in lti_ea_store, it is only valid until
 * that buffer is reused.
 */
1327 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
1328 struct lu_buf *lmv_buf)
1330 struct lod_thread_info *info = lod_env_info(env);
1331 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1332 struct lod_object *lo = lod_dt_obj(dt);
1333 struct lmv_mds_md_v1 *lmm1;
1335 int type = LU_SEQ_RANGE_ANY;
1340 LASSERT(lo->ldo_dir_striped != 0);
1341 LASSERT(lo->ldo_stripenr > 0);
1342 stripe_count = lo->ldo_stripenr;
1343 /* Only store the LMV EA header on the disk. */
1344 if (info->lti_ea_store_size < sizeof(*lmm1)) {
1345 rc = lod_ea_store_resize(info, sizeof(*lmm1));
1349 memset(info->lti_ea_store, 0, sizeof(*lmm1));
1352 lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
1353 lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
1354 lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
1355 lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
/* Look up which MDT holds this object's FID for the master index. */
1356 rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
1361 lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
1362 lmv_buf->lb_buf = info->lti_ea_store;
1363 lmv_buf->lb_len = sizeof(*lmm1);
1364 lo->ldo_dir_striping_cached = 1;
/*
 * Parse the LMV EA in \a buf and instantiate the stripe objects of \a lo.
 *
 * The EA magic and hash type are inspected first (migration flag,
 * LMV_MAGIC_STRIPE for a slave stripe, LMV_MAGIC_V1 for a master).  For
 * each stripe FID in the EA the function converts it to CPU byte order,
 * rejects an insane FID with -ESTALE, maps it to an MDT index with
 * lod_fld_lookup(), picks the local child device when that index is this
 * node's id or the matching target from lod_mdt_descs otherwise, and
 * locates the object with dt_locate_at().  The stripe array and the two
 * stripe counters are then stored in \a lo.
 *
 * lo->ldo_stripe is asserted to be NULL on entry.
 */
1369 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
1370 const struct lu_buf *buf)
1372 struct lod_thread_info *info = lod_env_info(env);
1373 struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1374 struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
1375 struct dt_object **stripe;
1376 union lmv_mds_md *lmm = buf->lb_buf;
1377 struct lmv_mds_md_v1 *lmv1 = &lmm->lmv_md_v1;
1378 struct lu_fid *fid = &info->lti_fid;
1383 if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
1386 if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
1387 lo->ldo_dir_slave_stripe = 1;
1391 if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
1394 if (le32_to_cpu(lmv1->lmv_stripe_count) < 1)
1397 LASSERT(lo->ldo_stripe == NULL);
/* One dt_object pointer per stripe listed in the EA. */
1398 OBD_ALLOC(stripe, sizeof(stripe[0]) *
1399 (le32_to_cpu(lmv1->lmv_stripe_count)));
1403 for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
1404 struct dt_device *tgt_dt;
1405 struct dt_object *dto;
1406 int type = LU_SEQ_RANGE_ANY;
1409 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
1410 if (!fid_is_sane(fid))
1411 GOTO(out, rc = -ESTALE);
1413 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
/* A stripe on this node is reached through the local child device. */
1417 if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
1418 tgt_dt = lod->lod_child;
1420 struct lod_tgt_desc *tgt;
1422 tgt = LTD_TGT(ltd, idx);
1424 GOTO(out, rc = -ESTALE);
1425 tgt_dt = tgt->ltd_tgt;
1428 dto = dt_locate_at(env, tgt_dt, fid,
1429 lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1432 GOTO(out, rc = PTR_ERR(dto));
1437 lo->ldo_stripe = stripe;
1438 lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count);
1439 lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
1441 lod_object_free_striping(env, lo);
/*
 * Prepare the stripes of a new striped directory and declare their creation.
 *
 * The stripe count requested in \a lum is capped at the number of MDTs
 * (lod_remote_mdt_count + 1).  A FID and target device are then chosen for
 * each stripe: the master stripe uses the local child export and device,
 * the others are found by scanning MDT indices that are set in the target
 * bitmap, are not already recorded in idx_array and whose dt_statfs() is
 * checked, allocating the FID from that target's export.  Each stripe
 * object is located with dt_locate_at() using LOC_F_NEW.
 *
 * With the stripes attached to the lod_object, the function builds the
 * master LMV EA (lod_prep_lmv_md()) and a slave copy, then for every
 * stripe declares: object creation, the "." and ".." entries, the default
 * LOV EA when one is cached, the slave LMV EA, the link EA, the name entry
 * "<FID>:<index>" in the master object and a reference on the master.
 * The master LMV EA is declared after the loop.
 *
 * Visible error codes: -ENOMEM on allocation failure, -ENOSPC when no
 * stripe could be prepared, -EINVAL when an object cannot be used as a
 * directory, or the error returned by a failing callee.
 *
 * NOTE(review): the cleanup code at the end releases the stripe objects and
 * resets the stripe fields of the lod_object; presumably this is the
 * error-only path -- confirm against the full function.
 */
1446 static int lod_prep_md_striped_create(const struct lu_env *env,
1447 struct dt_object *dt,
1448 struct lu_attr *attr,
1449 const struct lmv_user_md_v1 *lum,
1450 struct dt_object_format *dof,
1453 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1454 struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
1455 struct lod_object *lo = lod_dt_obj(dt);
1456 struct lod_thread_info *info = lod_env_info(env);
1457 struct dt_object **stripe;
1458 struct lu_buf lmv_buf;
1459 struct lu_buf slave_lmv_buf;
1460 struct lmv_mds_md_v1 *lmm;
1461 struct lmv_mds_md_v1 *slave_lmm = NULL;
1462 struct dt_insert_rec *rec = &info->lti_dt_rec;
1470 /* The lum has been verified in lod_verify_md_striping */
1471 LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
1472 LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
1474 stripe_count = le32_to_cpu(lum->lum_stripe_count);
1476 /* shrink the stripe_count to the available MDT count */
1477 if (stripe_count > lod->lod_remote_mdt_count + 1)
1478 stripe_count = lod->lod_remote_mdt_count + 1;
1480 OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
/* idx_array remembers the MDT index chosen for each stripe. */
1484 OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
1485 if (idx_array == NULL)
1486 GOTO(out_free, rc = -ENOMEM);
1488 for (i = 0; i < stripe_count; i++) {
1489 struct lod_tgt_desc *tgt = NULL;
1490 struct dt_object *dto;
1491 struct lu_fid fid = { 0 };
1493 struct lu_object_conf conf = { 0 };
1494 struct dt_device *tgt_dt = NULL;
1497 /* Right now, master stripe and master object are
1498 * on the same MDT */
1499 idx = le32_to_cpu(lum->lum_stripe_offset);
1500 rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
1504 tgt_dt = lod->lod_child;
/* Start the search right after the previously chosen index, wrapping. */
1508 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
1510 for (j = 0; j < lod->lod_remote_mdt_count;
1511 j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
1512 bool already_allocated = false;
1515 CDEBUG(D_INFO, "try idx %d, mdt cnt %u,"
1516 " allocated %u, last allocated %d\n", idx,
1517 lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1519 /* Find next available target */
1520 if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
1523 /* check whether the idx already exists
1524 * in current allocated array */
1525 for (k = 0; k < i; k++) {
1526 if (idx_array[k] == idx) {
1527 already_allocated = true;
1532 if (already_allocated)
1535 /* check the status of the OSP */
1536 tgt = LTD_TGT(ltd, idx);
1540 tgt_dt = tgt->ltd_tgt;
1541 rc = dt_statfs(env, tgt_dt, NULL);
1543 /* this OSP doesn't feel well */
1548 rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
1557 /* Can not allocate more stripes */
1558 if (j == lod->lod_remote_mdt_count) {
1559 CDEBUG(D_INFO, "%s: require stripes %u only get %d\n",
1560 lod2obd(lod)->obd_name, stripe_count, i - 1);
1564 CDEBUG(D_INFO, "idx %d, mdt cnt %u,"
1565 " allocated %u, last allocated %d\n", idx,
1566 lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1569 /* tgt_dt and fid must be ready after searching for an available OSP
1570 * in the above loop */
1571 LASSERT(tgt_dt != NULL);
1572 LASSERT(fid_is_sane(&fid));
1573 conf.loc_flags = LOC_F_NEW;
1574 dto = dt_locate_at(env, tgt_dt, &fid,
1575 dt->do_lu.lo_dev->ld_site->ls_top_dev,
1578 GOTO(out_put, rc = PTR_ERR(dto));
/* i is the number of stripes actually prepared, which may be below
 * stripe_count; the array itself was sized for stripe_count. */
1583 lo->ldo_dir_striped = 1;
1584 lo->ldo_stripe = stripe;
1585 lo->ldo_stripenr = i;
1586 lo->ldo_stripes_allocated = stripe_count;
1588 if (lo->ldo_stripenr == 0)
1589 GOTO(out_put, rc = -ENOSPC);
1591 rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1594 lmm = lmv_buf.lb_buf;
1596 OBD_ALLOC_PTR(slave_lmm);
1597 if (slave_lmm == NULL)
1598 GOTO(out_put, rc = -ENOMEM);
1600 lod_prep_slave_lmv_md(slave_lmm, lmm);
1601 slave_lmv_buf.lb_buf = slave_lmm;
1602 slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1604 if (!dt_try_as_dir(env, dt_object_child(dt)))
1605 GOTO(out_put, rc = -EINVAL);
1607 rec->rec_type = S_IFDIR;
1608 for (i = 0; i < lo->ldo_stripenr; i++) {
1609 struct dt_object *dto = stripe[i];
1610 char *stripe_name = info->lti_key;
1611 struct lu_name *sname;
1612 struct linkea_data ldata = { 0 };
1613 struct lu_buf linkea_buf;
1615 rc = dt_declare_create(env, dto, attr, NULL, dof, th);
1619 if (!dt_try_as_dir(env, dto))
1620 GOTO(out_put, rc = -EINVAL);
1622 rec->rec_fid = lu_object_fid(&dto->do_lu);
1623 rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
1624 (const struct dt_key *)dot, th);
1628 /* master stripe FID will be put to .. */
1629 rec->rec_fid = lu_object_fid(&dt->do_lu);
1630 rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
1631 (const struct dt_key *)dotdot, th);
1635 /* probably nothing to inherit */
1636 if (lo->ldo_striping_cached &&
1637 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1638 lo->ldo_def_stripenr,
1639 lo->ldo_def_stripe_offset)) {
1640 struct lov_user_md_v3 *v3;
1642 /* sigh, lti_ea_store has been used for lmv_buf,
1643 * so we have to allocate buffer for default
1647 GOTO(out_put, rc = -ENOMEM);
1649 memset(v3, 0, sizeof(*v3));
1650 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1651 v3->lmm_stripe_count =
1652 cpu_to_le16(lo->ldo_def_stripenr);
1653 v3->lmm_stripe_offset =
1654 cpu_to_le16(lo->ldo_def_stripe_offset);
1655 v3->lmm_stripe_size =
1656 cpu_to_le32(lo->ldo_def_stripe_size);
1657 if (lo->ldo_pool != NULL)
1658 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
1659 sizeof(v3->lmm_pool_name));
1661 info->lti_buf.lb_buf = v3;
1662 info->lti_buf.lb_len = sizeof(*v3);
1663 rc = dt_declare_xattr_set(env, dto,
1672 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1673 rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf,
1674 XATTR_NAME_LMV, 0, th);
1678 snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
1679 PFID(lu_object_fid(&dto->do_lu)), i);
1681 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
1682 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
1686 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
1690 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1691 linkea_buf.lb_len = ldata.ld_leh->leh_len;
1692 rc = dt_declare_xattr_set(env, dto, &linkea_buf,
1693 XATTR_NAME_LINK, 0, th);
1697 rec->rec_fid = lu_object_fid(&dto->do_lu);
1698 rc = dt_declare_insert(env, dt_object_child(dt),
1699 (const struct dt_rec *)rec,
1700 (const struct dt_key *)stripe_name, th);
1704 rc = dt_declare_ref_add(env, dt_object_child(dt), th);
1709 rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
1710 XATTR_NAME_LMV, 0, th);
1716 for (i = 0; i < stripe_count; i++)
1717 if (stripe[i] != NULL)
1718 lu_object_put(env, &stripe[i]->do_lu);
1719 OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count);
1720 lo->ldo_stripenr = 0;
1721 lo->ldo_stripes_allocated = 0;
1722 lo->ldo_stripe = NULL;
1726 if (idx_array != NULL)
1727 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
1728 if (slave_lmm != NULL)
1729 OBD_FREE_PTR(slave_lmm);
1735 * Declare create striped md object.
/*
 * Declare the LMV xattr update that creates a striped directory.
 *
 * The user descriptor carried in \a lum_buf (asserted non-NULL) is logged,
 * checked with lod_verify_md_striping() and passed on to
 * lod_prep_md_striped_create().  lod_object_free_striping() resets the
 * object's striping so that a failed attempt leaves no partial state.
 *
 * NOTE(review): a zero stripe count is tested before verification; the
 * action taken is presumably an early return -- confirm.
 */
1737 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
1738 struct dt_object *dt,
1739 struct lu_attr *attr,
1740 const struct lu_buf *lum_buf,
1741 struct dt_object_format *dof,
1744 struct lod_object *lo = lod_dt_obj(dt);
1745 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
1746 struct lmv_user_md_v1 *lum;
1750 lum = lum_buf->lb_buf;
1751 LASSERT(lum != NULL);
1753 CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
1754 le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
1755 (int)le32_to_cpu(lum->lum_stripe_offset));
1757 if (le32_to_cpu(lum->lum_stripe_count) == 0)
1760 rc = lod_verify_md_striping(lod, lum);
1764 /* prepare dir striped objects */
1765 rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
1767 /* failed to create striping, let's reset
1768 * config so that others don't get confused */
1769 lod_object_free_striping(env, lo);
/*
 * Declare an xattr update on a directory.
 *
 * A XATTR_NAME_DEFAULT_LMV value (asserted non-NULL) is validated with
 * lod_verify_md_striping().  The update is declared on the child object,
 * the directory's striping is loaded, and the update is then declared on
 * each stripe object.
 *
 * NOTE(review): the stripe loop follows a test for ldo_stripenr == 0 or
 * XATTR_NAME_LINK; presumably that test returns early -- confirm.
 */
1776 static int lod_dir_declare_xattr_set(const struct lu_env *env,
1777 struct dt_object *dt,
1778 const struct lu_buf *buf,
1779 const char *name, int fl,
1782 struct dt_object *next = dt_object_child(dt);
1783 struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
1784 struct lod_object *lo = lod_dt_obj(dt);
1789 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1790 struct lmv_user_md_v1 *lum;
1792 LASSERT(buf != NULL && buf->lb_buf != NULL);
1794 rc = lod_verify_md_striping(d, lum);
1799 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1803 /* set xattr to each stripes, if needed */
1804 rc = lod_load_striping(env, lo);
1808 /* Note: Do not set LinkEA on sub-stripes, otherwise
1809 * it will confuse the fid2path process(see mdt_path_current()).
1810 * The linkEA between master and sub-stripes is set in
1811 * lod_xattr_set_lmv(). */
1812 if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
1815 for (i = 0; i < lo->ldo_stripenr; i++) {
1816 LASSERT(lo->ldo_stripe[i]);
1817 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
1827 * LOV xattr is a storage for striping, and LOD owns this xattr.
1828 * but LOD allows others to control striping to some extent
1829 * - to reset striping
1830 * - to set new defined striping
1831 * - to set new semi-defined striping
1832 * - number of stripes is defined
1833 * - number of stripes + osts are defined
/*
 * Declare an xattr update, dispatching on the object type.
 *
 * - XATTR_NAME_LOV without LU_XATTR_REPLACE on a regular file, or on an
 *   object whose mode is still 0, is a striping request: the attributes
 *   are read from the child for an existing object, or synthesised as
 *   S_IFREG with la_valid = LA_TYPE | LA_MODE, and
 *   lod_declare_striped_object() is called.
 * - Directories are handled by lod_dir_declare_xattr_set().
 * - Anything else is declared directly on the child object.
 */
1836 static int lod_declare_xattr_set(const struct lu_env *env,
1837 struct dt_object *dt,
1838 const struct lu_buf *buf,
1839 const char *name, int fl,
1842 struct dt_object *next = dt_object_child(dt);
1843 struct lu_attr *attr = &lod_env_info(env)->lti_attr;
1849 * allow to declare predefined striping on a new (!mode) object
1850 * which is supposed to be replay of regular file creation
1851 * (when LOV setting is declared)
1852 * LU_XATTR_REPLACE is set to indicate a layout swap
1854 mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
1855 if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
1856 !(fl & LU_XATTR_REPLACE)) {
1858 * this is a request to manipulate object's striping
1860 if (dt_object_exists(dt)) {
1861 rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1865 memset(attr, 0, sizeof(*attr));
1866 attr->la_valid = LA_TYPE | LA_MODE;
1867 attr->la_mode = S_IFREG;
1869 rc = lod_declare_striped_object(env, dt, attr, buf, th);
1870 } else if (S_ISDIR(mode)) {
1871 rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
1873 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
/*
 * Invalidate the default LOV striping cached in \a lo.
 *
 * Clears the cached/set flags, drops the pool name via
 * lod_object_set_pool(lo, NULL) and zeroes the default stripe size and
 * count.  When ldo_dir_stripe is allocated, the directory striping cache
 * flag is cleared as well.
 */
1879 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
1881 lo->ldo_striping_cached = 0;
1882 lo->ldo_def_striping_set = 0;
1883 lod_object_set_pool(lo, NULL);
1884 lo->ldo_def_stripe_size = 0;
1885 lo->ldo_def_stripenr = 0;
1886 if (lo->ldo_dir_stripe != NULL)
1887 lo->ldo_dir_striping_cached = 0;
/*
 * Set an xattr on the child object and then on every stripe object.
 *
 * The per-stripe loop comes after two tests: the first on the result of
 * the child update and on the object being a directory, the second on the
 * stripe count being zero or the xattr being the link EA.
 *
 * NOTE(review): both tests presumably return early, which would limit the
 * loop to striped directories and non-link xattrs -- confirm.
 */
1890 static int lod_xattr_set_internal(const struct lu_env *env,
1891 struct dt_object *dt,
1892 const struct lu_buf *buf,
1893 const char *name, int fl, struct thandle *th,
1894 struct lustre_capa *capa)
1896 struct dt_object *next = dt_object_child(dt);
1897 struct lod_object *lo = lod_dt_obj(dt);
1902 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1903 if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1906 /* Note: Do not set LinkEA on sub-stripes, otherwise
1907 * it will confuse the fid2path process(see mdt_path_current()).
1908 * The linkEA between master and sub-stripes is set in
1909 * lod_xattr_set_lmv(). */
1910 if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
1913 for (i = 0; i < lo->ldo_stripenr; i++) {
1914 LASSERT(lo->ldo_stripe[i]);
1915 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
/*
 * Delete an xattr from the child object and then from every stripe object.
 *
 * The per-stripe loop comes after a test on the result of the child
 * deletion and on the object being a directory, and a test on the stripe
 * count being zero.
 *
 * NOTE(review): both tests presumably return early -- confirm.
 */
1924 static int lod_xattr_del_internal(const struct lu_env *env,
1925 struct dt_object *dt,
1926 const char *name, struct thandle *th,
1927 struct lustre_capa *capa)
1929 struct dt_object *next = dt_object_child(dt);
1930 struct lod_object *lo = lod_dt_obj(dt);
1935 rc = dt_xattr_del(env, next, name, th, capa);
1936 if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1939 if (lo->ldo_stripenr == 0)
1942 for (i = 0; i < lo->ldo_stripenr; i++) {
1943 LASSERT(lo->ldo_stripe[i]);
1944 rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th,
/*
 * Set or delete the default LOV striping stored on a directory.
 *
 * The striping cached in the lod_object is cleared first, then the value
 * in \a buf (asserted non-NULL) is checked with lod_verify_striping().
 * A LOV_USER_MAGIC_V1 descriptor whose stripe size, count and offset
 * satisfy LOVEA_DELETE_VALUES() removes the xattr through
 * lod_xattr_del_internal(); lod_xattr_set_internal() is the storing path.
 *
 * NOTE(review): lmm_magic and the stripe fields are compared without byte
 * order conversion here; presumably the descriptor is already in CPU order
 * at this point -- confirm against lod_verify_striping().
 */
1953 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
1954 struct dt_object *dt,
1955 const struct lu_buf *buf,
1956 const char *name, int fl,
1958 struct lustre_capa *capa)
1960 struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
1961 struct lod_object *l = lod_dt_obj(dt);
1962 struct lov_user_md_v1 *lum;
1963 struct lov_user_md_v3 *v3 = NULL;
1967 /* If it is striped dir, we should clear the stripe cache for
1968 * slave stripe as well, but there are no effective way to
1969 * notify the LOD on the slave MDT, so we do not cache stripe
1970 * information for slave stripe for now. XXX*/
1971 lod_lov_stripe_cache_clear(l);
1972 LASSERT(buf != NULL && buf->lb_buf != NULL);
1975 rc = lod_verify_striping(d, buf, false);
1979 if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1982 /* if { size, offset, count } = { 0, -1, 0 } and no pool
1983 * (i.e. all default values specified) then delete default
1984 * striping from dir. */
1986 "set default striping: sz %u # %u offset %d %s %s\n",
1987 (unsigned)lum->lmm_stripe_size,
1988 (unsigned)lum->lmm_stripe_count,
1989 (int)lum->lmm_stripe_offset,
1990 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1992 if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1993 (lum->lmm_stripe_count),
1994 (lum->lmm_stripe_offset)) &&
1995 lum->lmm_magic == LOV_USER_MAGIC_V1) {
1996 rc = lod_xattr_del_internal(env, dt, name, th, capa);
2000 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
/*
 * Set or delete the default LMV striping stored on a directory.
 *
 * The descriptor in \a buf (asserted non-NULL) is logged.  When its stripe
 * count and offset satisfy LMVEA_DELETE_VALUES() and its magic is
 * LMV_USER_MAGIC, the xattr is removed through lod_xattr_del_internal();
 * lod_xattr_set_internal() is the storing path.  Afterwards the cached
 * default directory striping is refreshed: ldo_dir_stripe is allocated if
 * needed, ldo_dir_striping_cached is cleared, ldo_dir_def_striping_set is
 * raised and the default stripe count is taken from the descriptor.
 */
2006 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
2007 struct dt_object *dt,
2008 const struct lu_buf *buf,
2009 const char *name, int fl,
2011 struct lustre_capa *capa)
2013 struct lod_object *l = lod_dt_obj(dt);
2014 struct lmv_user_md_v1 *lum;
2018 LASSERT(buf != NULL && buf->lb_buf != NULL);
2021 CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
2022 le32_to_cpu(lum->lum_stripe_count),
2023 (int)le32_to_cpu(lum->lum_stripe_offset));
2025 if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
2026 le32_to_cpu(lum->lum_stripe_offset)) &&
2027 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
2028 rc = lod_xattr_del_internal(env, dt, name, th, capa);
2032 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
2037 /* Update default stripe cache */
2038 if (l->ldo_dir_stripe == NULL) {
2039 OBD_ALLOC_PTR(l->ldo_dir_stripe);
2040 if (l->ldo_dir_stripe == NULL)
2044 l->ldo_dir_striping_cached = 0;
2045 l->ldo_dir_def_striping_set = 1;
2046 l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count);
/*
 * Create the stripes of a striped directory and store its LMV EA.
 *
 * This is the execution counterpart of the declare phase, which is
 * expected to have allocated the stripes already.  The attributes of the
 * child object are fetched, the master LMV EA is built with
 * lod_prep_lmv_md() and a slave copy is prepared.  For each stripe the
 * function then:
 *  - creates the object as a directory (DFT_DIR) under its write lock;
 *  - inserts "." (the stripe's own FID) and ".." (the master's FID);
 *  - stores the cached default LOV EA, when one is cached and not equal
 *    to the delete values;
 *  - stores the slave LMV EA, with lmv_master_mdt_index set to the stripe
 *    index;
 *  - stores a link EA naming the stripe "<FID>:<index>" under the master;
 *  - inserts that name into the master object and adds a reference to it.
 * The master LMV EA is written after the loop and the slave buffer is
 * released.
 *
 * NOTE(review): the tests on S_ISDIR and on ldo_stripenr == 0 at the top
 * presumably return early -- confirm against the full function.
 */
2051 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
2052 const struct lu_buf *buf, const char *name,
2053 int fl, struct thandle *th,
2054 struct lustre_capa *capa)
2056 struct lod_object *lo = lod_dt_obj(dt);
2057 struct lod_thread_info *info = lod_env_info(env);
2058 struct lu_attr *attr = &info->lti_attr;
2059 struct dt_object_format *dof = &info->lti_format;
2060 struct lu_buf lmv_buf;
2061 struct lu_buf slave_lmv_buf;
2062 struct lmv_mds_md_v1 *lmm;
2063 struct lmv_mds_md_v1 *slave_lmm = NULL;
2064 struct dt_insert_rec *rec = &info->lti_dt_rec;
2069 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2072 /* The stripes are supposed to be allocated in declare phase,
2073 * if there are no stripes being allocated, it will skip */
2074 if (lo->ldo_stripenr == 0)
2077 rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA);
2081 attr->la_valid = LA_TYPE | LA_MODE;
2082 dof->dof_type = DFT_DIR;
2084 rc = lod_prep_lmv_md(env, dt, &lmv_buf);
2087 lmm = lmv_buf.lb_buf;
2089 OBD_ALLOC_PTR(slave_lmm);
2090 if (slave_lmm == NULL)
2093 lod_prep_slave_lmv_md(slave_lmm, lmm);
2094 slave_lmv_buf.lb_buf = slave_lmm;
2095 slave_lmv_buf.lb_len = sizeof(*slave_lmm);
2097 rec->rec_type = S_IFDIR;
2098 for (i = 0; i < lo->ldo_stripenr; i++) {
2099 struct dt_object *dto;
2100 char *stripe_name = info->lti_key;
2101 struct lu_name *sname;
2102 struct linkea_data ldata = { 0 };
2103 struct lu_buf linkea_buf;
2105 dto = lo->ldo_stripe[i];
2106 dt_write_lock(env, dto, MOR_TGT_CHILD);
2107 rc = dt_create(env, dto, attr, NULL, dof, th);
2108 dt_write_unlock(env, dto);
2112 rec->rec_fid = lu_object_fid(&dto->do_lu);
2113 rc = dt_insert(env, dto, (const struct dt_rec *)rec,
2114 (const struct dt_key *)dot, th, capa, 0);
2118 rec->rec_fid = lu_object_fid(&dt->do_lu);
2119 rc = dt_insert(env, dto, (struct dt_rec *)rec,
2120 (const struct dt_key *)dotdot, th, capa, 0);
2124 if (lo->ldo_striping_cached &&
2125 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2126 lo->ldo_def_stripenr,
2127 lo->ldo_def_stripe_offset)) {
2128 struct lov_user_md_v3 *v3;
2130 /* sigh, lti_ea_store has been used for lmv_buf,
2131 * so we have to allocate buffer for default
2137 memset(v3, 0, sizeof(*v3));
2138 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2139 v3->lmm_stripe_count =
2140 cpu_to_le16(lo->ldo_def_stripenr);
2141 v3->lmm_stripe_offset =
2142 cpu_to_le16(lo->ldo_def_stripe_offset);
2143 v3->lmm_stripe_size =
2144 cpu_to_le32(lo->ldo_def_stripe_size);
2145 if (lo->ldo_pool != NULL)
2146 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
2147 sizeof(v3->lmm_pool_name));
2149 info->lti_buf.lb_buf = v3;
2150 info->lti_buf.lb_len = sizeof(*v3);
2151 rc = dt_xattr_set(env, dto, &info->lti_buf,
2152 XATTR_NAME_LOV, 0, th, capa);
2158 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
2159 rc = dt_xattr_set(env, dto, &slave_lmv_buf, XATTR_NAME_LMV,
2164 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2165 PFID(lu_object_fid(&dto->do_lu)), i);
2167 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
2168 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
2172 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
2176 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
2177 linkea_buf.lb_len = ldata.ld_leh->leh_len;
2178 rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK,
2179 0, th, BYPASS_CAPA);
2183 rec->rec_fid = lu_object_fid(&dto->do_lu);
2184 rc = dt_insert(env, dt_object_child(dt),
2185 (const struct dt_rec *)rec,
2186 (const struct dt_key *)stripe_name, th, capa, 0);
2190 rc = dt_ref_add(env, dt_object_child(dt), th);
2195 rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
2199 if (slave_lmm != NULL)
2200 OBD_FREE_PTR(slave_lmm);
/*
 * Declare or perform the striping-related xattr updates of a new directory.
 *
 * Up to three updates are issued, each composed in lti_ea_store (resized
 * when too small) and passed through lti_buf:
 *  1. the LMV EA describing the directory's own stripes, unless
 *     LMVEA_DELETE_VALUES() holds for ldo_stripenr / ldo_dir_stripe_offset;
 *  2. the default LMV EA inherited from the parent, when the directory
 *     striping is cached and differs from the delete values;
 *  3. the default LOV EA inherited from the parent, when the LOV striping
 *     is cached and differs from the delete values.
 * All descriptors are written little-endian.
 *
 * NOTE(review): each step shows both a declare helper and a set helper;
 * the choice is presumably made on the trailing boolean argument that
 * lod_declare_dir_striping_create() passes as true and
 * lod_dir_striping_create() passes as false -- confirm against the full
 * signature.
 */
2205 int lod_dir_striping_create_internal(const struct lu_env *env,
2206 struct dt_object *dt,
2207 struct lu_attr *attr,
2208 struct dt_object_format *dof,
2212 struct lod_thread_info *info = lod_env_info(env);
2213 struct lod_object *lo = lod_dt_obj(dt);
2217 if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr,
2218 lo->ldo_dir_stripe_offset)) {
2219 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2220 int stripe_count = lo->ldo_stripenr;
2222 if (info->lti_ea_store_size < sizeof(*v1)) {
2223 rc = lod_ea_store_resize(info, sizeof(*v1));
/* Re-read the pointer: the resize may have replaced the buffer. */
2226 v1 = info->lti_ea_store;
2229 memset(v1, 0, sizeof(*v1));
2230 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2231 v1->lum_stripe_count = cpu_to_le32(stripe_count);
2232 v1->lum_stripe_offset =
2233 cpu_to_le32(lo->ldo_dir_stripe_offset);
2235 info->lti_buf.lb_buf = v1;
2236 info->lti_buf.lb_len = sizeof(*v1);
2239 rc = lod_declare_xattr_set_lmv(env, dt, attr,
2240 &info->lti_buf, dof, th);
2242 rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
2243 XATTR_NAME_LMV, 0, th,
2249 /* Transfer default LMV striping from the parent */
2250 if (lo->ldo_dir_striping_cached &&
2251 !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
2252 lo->ldo_dir_def_stripe_offset)) {
2253 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2254 int def_stripe_count = lo->ldo_dir_def_stripenr;
2256 if (info->lti_ea_store_size < sizeof(*v1)) {
2257 rc = lod_ea_store_resize(info, sizeof(*v1));
2260 v1 = info->lti_ea_store;
2263 memset(v1, 0, sizeof(*v1));
2264 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2265 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
2266 v1->lum_stripe_offset =
2267 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
2269 cpu_to_le32(lo->ldo_dir_def_hash_type);
2271 info->lti_buf.lb_buf = v1;
2272 info->lti_buf.lb_len = sizeof(*v1);
2274 rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2275 XATTR_NAME_DEFAULT_LMV,
2278 rc = lod_xattr_set_default_lmv_on_dir(env, dt,
2280 XATTR_NAME_DEFAULT_LMV, 0,
2286 /* Transfer default LOV striping from the parent */
2287 if (lo->ldo_striping_cached &&
2288 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2289 lo->ldo_def_stripenr,
2290 lo->ldo_def_stripe_offset)) {
2291 struct lov_user_md_v3 *v3 = info->lti_ea_store;
2293 if (info->lti_ea_store_size < sizeof(*v3)) {
2294 rc = lod_ea_store_resize(info, sizeof(*v3));
2297 v3 = info->lti_ea_store;
2300 memset(v3, 0, sizeof(*v3));
2301 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2302 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
2303 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
2304 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
2305 if (lo->ldo_pool != NULL)
2306 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
2307 sizeof(v3->lmm_pool_name));
2309 info->lti_buf.lb_buf = v3;
2310 info->lti_buf.lb_len = sizeof(*v3);
2313 rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2314 XATTR_NAME_LOV, 0, th);
2316 rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
2317 XATTR_NAME_LOV, 0, th,
/*
 * Declare-phase wrapper: calls lod_dir_striping_create_internal() with the
 * final argument set to true and returns its result.
 */
2326 static int lod_declare_dir_striping_create(const struct lu_env *env,
2327 struct dt_object *dt,
2328 struct lu_attr *attr,
2329 struct dt_object_format *dof,
2332 return lod_dir_striping_create_internal(env, dt, attr, dof, th, true);
/*
 * Execution-phase wrapper: calls lod_dir_striping_create_internal() with
 * the final argument set to false and returns its result.
 */
2335 static int lod_dir_striping_create(const struct lu_env *env,
2336 struct dt_object *dt,
2337 struct lu_attr *attr,
2338 struct dt_object_format *dof,
2341 return lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
/*
 * Set an xattr, dispatching on the object type and the xattr name.
 *
 * - directory + XATTR_NAME_LMV: a value whose hash type carries
 *   LMV_HASH_FLAG_MIGRATION is written straight to the child object;
 *   lod_dir_striping_create() is the other visible path.
 * - directory + XATTR_NAME_LOV: lod_xattr_set_lov_on_dir().
 * - directory + XATTR_NAME_DEFAULT_LMV: lod_xattr_set_default_lmv_on_dir().
 * - regular file + XATTR_NAME_LOV: with LU_XATTR_REPLACE the in-memory
 *   striping is freed and the value written to the child object;
 *   lod_striping_create() is the other visible path.
 * - any other xattr: lod_xattr_set_internal().
 */
2344 static int lod_xattr_set(const struct lu_env *env,
2345 struct dt_object *dt, const struct lu_buf *buf,
2346 const char *name, int fl, struct thandle *th,
2347 struct lustre_capa *capa)
2349 struct dt_object *next = dt_object_child(dt);
2353 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2354 strcmp(name, XATTR_NAME_LMV) == 0) {
2355 struct lmv_mds_md_v1 *lmm = buf->lb_buf;
2357 if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
2358 LMV_HASH_FLAG_MIGRATION)
2359 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2361 rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
2366 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2367 strcmp(name, XATTR_NAME_LOV) == 0) {
2369 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
2371 } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2372 strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
2374 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
2377 } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2378 !strcmp(name, XATTR_NAME_LOV)) {
2379 /* in case of lov EA swap, just set it
2380 * if not, it is a replay so check striping match what we
2381 * already have during req replay, declare_xattr_set()
2382 * defines striping, then create() does the work
2384 if (fl & LU_XATTR_REPLACE) {
2385 /* free stripes, then update disk */
2386 lod_object_free_striping(env, lod_dt_obj(dt));
2387 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2389 rc = lod_striping_create(env, dt, NULL, NULL, th);
2394 /* then all other xattr */
2395 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
/*
 * Declare removal of xattr \a name by forwarding the declaration to the
 * child object; the child's result is returned unchanged.
 */
2400 static int lod_declare_xattr_del(const struct lu_env *env,
2401 struct dt_object *dt, const char *name,
2404 return dt_declare_xattr_del(env, dt_object_child(dt), name, th);
/*
 * Remove xattr \a name from the child object.
 *
 * When the LOV xattr is the one being removed, the striping held in the
 * lod_object is released first so the in-memory state does not outlive
 * the on-disk layout.  Stripe objects are not visited by this function.
 */
2407 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
2408 const char *name, struct thandle *th,
2409 struct lustre_capa *capa)
2411 if (!strcmp(name, XATTR_NAME_LOV))
2412 lod_object_free_striping(env, lod_dt_obj(dt));
2413 return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
/*
 * List the xattrs of an object by forwarding the request to the child
 * object; the child's result is returned unchanged.
 */
2416 static int lod_xattr_list(const struct lu_env *env,
2417 struct dt_object *dt, struct lu_buf *buf,
2418 struct lustre_capa *capa)
2420 return dt_xattr_list(env, dt_object_child(dt), buf, capa);
/*
 * Replace the pool name stored in \a o.
 *
 * The existing ldo_pool string is freed using its strlen() + 1 as the
 * allocation size, and a buffer of len + 1 bytes is allocated to hold a
 * copy of \a pool.
 *
 * NOTE(review): lod_lov_stripe_cache_clear() calls this with pool == NULL,
 * so both the free and the allocate/copy halves are presumably guarded by
 * NULL checks -- confirm against the full function.
 */
2423 int lod_object_set_pool(struct lod_object *o, char *pool)
2428 len = strlen(o->ldo_pool);
2429 OBD_FREE(o->ldo_pool, len + 1);
2434 OBD_ALLOC(o->ldo_pool, len + 1);
2435 if (o->ldo_pool == NULL)
2437 strcpy(o->ldo_pool, pool);
/*
 * Return non-zero when \a is_reg is set and the sequence of \a fid is not
 * FID_SEQ_LOCAL_FILE, zero otherwise.
 */
2442 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
2444 return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
/*
 * Cache the default LOV striping stored on the parent directory \a lp.
 *
 * The LOV EA is read with lod_get_lov_ea() while the child object is
 * write-locked.  An EA shorter than struct lov_user_md is cached as "no
 * default striping": size 0, count 0, offset -1, with ldo_striping_cached
 * set and ldo_def_striping_set cleared.  Otherwise the EA held in
 * lti_ea_store is byte-swapped when its magic matches the swabbed V1 or V3
 * user magic.  Magics other than LOV_MAGIC_V1 / LOV_MAGIC_V3 and patterns
 * other than LOV_PATTERN_RAID0 or 0 leave with rc = 0 and nothing cached.
 * For an accepted EA the stripe count, size and offset are copied into
 * \a lp, both cache flags are set and, for V3 with a non-empty pool name,
 * the pool is recorded through lod_object_set_pool().
 */
2448 static int lod_cache_parent_lov_striping(const struct lu_env *env,
2449 struct lod_object *lp)
2451 struct lod_thread_info *info = lod_env_info(env);
2452 struct lov_user_md_v1 *v1 = NULL;
2453 struct lov_user_md_v3 *v3 = NULL;
2457 /* called from MDD without parent being write locked,
2459 dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2460 rc = lod_get_lov_ea(env, lp);
2464 if (rc < (typeof(rc))sizeof(struct lov_user_md)) {
2465 /* don't lookup for non-existing or invalid striping */
2466 lp->ldo_def_striping_set = 0;
2467 lp->ldo_striping_cached = 1;
2468 lp->ldo_def_stripe_size = 0;
2469 lp->ldo_def_stripenr = 0;
2470 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
2471 GOTO(unlock, rc = 0);
2475 v1 = info->lti_ea_store;
2476 if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) {
2477 lustre_swab_lov_user_md_v1(v1);
2478 } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
2479 v3 = (struct lov_user_md_v3 *)v1;
2480 lustre_swab_lov_user_md_v3(v3);
2483 if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
2484 GOTO(unlock, rc = 0);
2486 if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
2487 GOTO(unlock, rc = 0);
2489 CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
2490 PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
2491 (int)v1->lmm_stripe_count,
2492 (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
2494 lp->ldo_def_stripenr = v1->lmm_stripe_count;
2495 lp->ldo_def_stripe_size = v1->lmm_stripe_size;
2496 lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
2497 lp->ldo_striping_cached = 1;
2498 lp->ldo_def_striping_set = 1;
2499 if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
2500 /* XXX: sanity check here */
2501 v3 = (struct lov_user_md_v3 *) v1;
2502 if (v3->lmm_pool_name[0])
2503 lod_object_set_pool(lp, v3->lmm_pool_name);
2507 dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
/*
 * Cache the default LMV striping stored on the parent directory \a lp.
 *
 * The default LMV EA is read with lod_get_default_lmv_ea() while the child
 * object is write-locked.  An EA shorter than struct lmv_user_md is cached
 * as "no default": stripe count 0, offset -1, hash type
 * LMV_HASH_TYPE_FNV_1A_64, with ldo_dir_striping_cached set and
 * ldo_dir_def_striping_set cleared.  Otherwise the stripe count, offset
 * and hash type are taken from the EA in lti_ea_store, converted from
 * little-endian, and both flags are set.
 */
2512 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
2513 struct lod_object *lp)
2515 struct lod_thread_info *info = lod_env_info(env);
2516 struct lmv_user_md_v1 *v1 = NULL;
2520 /* called from MDD without parent being write locked,
2522 dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2523 rc = lod_get_default_lmv_ea(env, lp);
2527 if (rc < (typeof(rc))sizeof(struct lmv_user_md)) {
2528 /* don't lookup for non-existing or invalid striping */
2529 lp->ldo_dir_def_striping_set = 0;
2530 lp->ldo_dir_striping_cached = 1;
2531 lp->ldo_dir_def_stripenr = 0;
2532 lp->ldo_dir_def_stripe_offset =
2533 (typeof(v1->lum_stripe_offset))(-1);
2534 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
2535 GOTO(unlock, rc = 0);
2539 v1 = info->lti_ea_store;
2541 lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
2542 lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
2543 lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
2544 lp->ldo_dir_def_striping_set = 1;
2545 lp->ldo_dir_striping_cached = 1;
2549 dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
/*
 * Make sure the default striping of the parent lp is cached in the object.
 *
 * Calls lod_load_striping() first.  If ldo_striping_cached is still clear,
 * the default LOV striping is fetched with lod_cache_parent_lov_striping().
 * When child_mode describes a directory and ldo_dir_striping_cached is
 * clear, the default LMV striping is fetched as well with
 * lod_cache_parent_lmv_striping().
 *
 * NOTE(review): the checks of rc between these steps and the final return
 * are on lines missing from this listing; confirm how an error from one
 * step affects the following ones.
 */
2553 static int lod_cache_parent_striping(const struct lu_env *env,
2554 struct lod_object *lp,
2560 rc = lod_load_striping(env, lp);
2564 if (!lp->ldo_striping_cached) {
2565 /* we haven't tried to get default striping for
2566 * the directory yet, let's cache it in the object */
2567 rc = lod_cache_parent_lov_striping(env, lp);
/* directory striping defaults are only needed when the child is a
 * directory */
2572 if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
2573 rc = lod_cache_parent_lmv_striping(env, lp);
/*
 * Allocation-hint initialisation for an object that is about to be
 * created (installed as .do_ah_init in lod_obj_ops).
 *
 * Steps visible in this listing:
 *  1. if parent is set, take its lower object (nextp) and its lod object
 *     (lp) and load the parent's striping;
 *  2. assert that the child has no stripes yet and, when the lower child
 *     object does not exist or is remote, forward the hint to the lower
 *     layer's do_ah_init() (a remote parent is passed down as NULL);
 *  3. for a directory child: allocate ldo_dir_stripe for child and parent
 *     when missing, cache the parent's defaults, copy the cached default
 *     LOV and LMV settings to the child, then choose the child's own
 *     stripe offset / hash type (and stripe count) from ah->dah_eadata if
 *     supplied, else from the parent's default directory striping, else
 *     count 0 and offset -1;
 *  4. after the lod_object_will_be_striped() test: copy the parent's
 *     default stripe count, size, offset and pool to the child, then fall
 *     back to the device-wide lov_desc defaults for a zero count or size;
 *  5. finally call lod_lov_stripe_cache_clear() on a parent that is
 *     marked ldo_dir_slave_stripe.
 *
 * NOTE(review): early exits, else keywords and some assignment targets are
 * on lines missing from this listing (the embedded numbers skip), so the
 * exact control flow between the steps must be confirmed against the full
 * file.
 */
2579 * used to transfer default striping data to the object being created
2581 static void lod_ah_init(const struct lu_env *env,
2582 struct dt_allocation_hint *ah,
2583 struct dt_object *parent,
2584 struct dt_object *child,
2587 struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
2588 struct dt_object *nextp = NULL;
2589 struct dt_object *nextc;
2590 struct lod_object *lp = NULL;
2591 struct lod_object *lc;
2592 struct lov_desc *desc;
2598 if (likely(parent)) {
2599 nextp = dt_object_child(parent);
2600 lp = lod_dt_obj(parent);
2601 rc = lod_load_striping(env, lp);
2606 nextc = dt_object_child(child);
2607 lc = lod_dt_obj(child);
/* the child must not carry any striping before the hint is built */
2609 LASSERT(lc->ldo_stripenr == 0);
2610 LASSERT(lc->ldo_stripe == NULL);
2613 * local object may want some hints
2614 * in case of late striping creation, ->ah_init()
2615 * can be called with local object existing
/* NOTE(review): nextp is only assigned inside the "parent" branch above
 * and is otherwise NULL, yet it is handed to dt_object_remote() here;
 * confirm that a NULL parent cannot reach this call or that
 * dt_object_remote() accepts NULL. */
2617 if (!dt_object_exists(nextc) || dt_object_remote(nextc))
2618 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
2619 NULL : nextp, nextc, child_mode);
2621 if (S_ISDIR(child_mode)) {
2622 if (lc->ldo_dir_stripe == NULL) {
2623 OBD_ALLOC_PTR(lc->ldo_dir_stripe);
2624 if (lc->ldo_dir_stripe == NULL)
/* NOTE(review): lp is dereferenced throughout this directory branch but is
 * only set when parent != NULL; confirm directories are never initialised
 * with a NULL parent. */
2628 if (lp->ldo_dir_stripe == NULL) {
2629 OBD_ALLOC_PTR(lp->ldo_dir_stripe);
2630 if (lp->ldo_dir_stripe == NULL)
2634 rc = lod_cache_parent_striping(env, lp, child_mode);
2638 /* transfer defaults to new directory */
2639 if (lp->ldo_striping_cached) {
2641 lod_object_set_pool(lc, lp->ldo_pool);
2642 lc->ldo_def_stripenr = lp->ldo_def_stripenr;
2643 lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
2644 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2645 lc->ldo_striping_cached = 1;
2646 lc->ldo_def_striping_set = 1;
2647 CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
2648 (int)lc->ldo_def_stripe_size,
2649 (int)lc->ldo_def_stripe_offset,
2650 (int)lc->ldo_def_stripenr);
2653 /* transfer dir defaults to new directory */
2654 if (lp->ldo_dir_striping_cached) {
2655 lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
2656 lc->ldo_dir_def_stripe_offset =
2657 lp->ldo_dir_def_stripe_offset;
2658 lc->ldo_dir_def_hash_type =
2659 lp->ldo_dir_def_hash_type;
2660 lc->ldo_dir_striping_cached = 1;
2661 lc->ldo_dir_def_striping_set = 1;
2662 CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
2663 (int)lc->ldo_dir_def_stripenr,
2664 (int)lc->ldo_dir_def_stripe_offset,
2665 lc->ldo_dir_def_hash_type);
2668 /* It should always honour the specified stripes */
2669 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
2670 const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
/* the caller-supplied EA is validated before its little-endian fields are
 * used */
2672 rc = lod_verify_md_striping(d, lum1);
2674 le32_to_cpu(lum1->lum_stripe_count) > 1) {
2675 /* Directory will be striped only if
2676 * stripe_count > 1 */
2678 le32_to_cpu(lum1->lum_stripe_count);
2679 lc->ldo_dir_stripe_offset =
2680 le32_to_cpu(lum1->lum_stripe_offset);
2681 lc->ldo_dir_hash_type =
2682 le32_to_cpu(lum1->lum_hash_type);
2683 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
2685 (int)lc->ldo_dir_stripe_offset);
2687 /* then check whether there is default stripes from parent */
2688 } else if (lp->ldo_dir_def_striping_set) {
2689 /* If there are default dir stripe from parent */
2690 lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
2691 lc->ldo_dir_stripe_offset =
2692 lp->ldo_dir_def_stripe_offset;
2693 lc->ldo_dir_hash_type =
2694 lp->ldo_dir_def_hash_type;
2695 CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
2697 (int)lc->ldo_dir_stripe_offset);
2699 /* set default stripe for this directory */
2700 lc->ldo_stripenr = 0;
2701 lc->ldo_dir_stripe_offset = -1;
2704 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
2705 lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
2711 * if object is going to be striped over OSTs, transfer default
2712 * striping information to the child, so that we can use it
2713 * during declaration and creation
2715 if (!lod_object_will_be_striped(S_ISREG(child_mode),
2716 lu_object_fid(&child->do_lu)))
2719 * try from the parent
2721 if (likely(parent)) {
/* the return value of the caching call is not used here */
2722 lod_cache_parent_striping(env, lp, child_mode);
/* start from (__u16)-1; replaced just below when the parent has default
 * striping set */
2724 lc->ldo_def_stripe_offset = (__u16) -1;
2726 if (lp->ldo_def_striping_set) {
2728 lod_object_set_pool(lc, lp->ldo_pool);
2729 lc->ldo_stripenr = lp->ldo_def_stripenr;
2730 lc->ldo_stripe_size = lp->ldo_def_stripe_size;
2731 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2732 CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
2733 lc->ldo_stripenr, lc->ldo_stripe_size,
2734 lp->ldo_pool ? lp->ldo_pool : "");
2739 * if the parent doesn't provide with specific pattern, grab fs-wide one
2741 desc = &d->lod_desc;
2742 if (lc->ldo_stripenr == 0)
2743 lc->ldo_stripenr = desc->ld_default_stripe_count;
2744 if (lc->ldo_stripe_size == 0)
2745 lc->ldo_stripe_size = desc->ld_default_stripe_size;
2746 CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
2747 lc->ldo_stripenr, lc->ldo_stripe_size,
2748 lc->ldo_pool ? lc->ldo_pool : "");
2751 /* we do not cache stripe information for slave stripe, see
2752 * lod_xattr_set_lov_on_dir */
2753 if (lp != NULL && lp->ldo_dir_slave_stripe)
2754 lod_lov_stripe_cache_clear(lp);
/* Alias for do_div().  The caller in this file relies on the expression
 * evaluating to aaa % bbb while aaa itself is replaced by aaa / bbb. */
2759 #define ll_do_div64(aaa,bbb) do_div((aaa), (bbb))
/*
 * Declare the attribute update that carries the size of an existing local
 * object over to one of its freshly created stripes.
 *
 * The size is read from the lower object with dt_attr_get() into the
 * per-thread lti_attr, mapped onto the RAID0 layout (ldo_stripe_size,
 * ldo_stripenr) to find the stripe index and the size that stripe must
 * have, and then declared on lo->ldo_stripe[stripe] with
 * dt_declare_attr_set() using an attribute set that has only LA_SIZE
 * valid.
 *
 * NOTE(review): the first LASSERT accepts ldo_stripenr == 0, while the
 * code below divides by ldo_stripenr; confirm that one of the lines missing
 * from this listing returns early in that case.
 */
2761 * this function handles a special case when truncate was done
2762 * on a stripeless object and now striping is being created
2763 * we can't lose that size, so we have to propagate it to newly
2766 static int lod_declare_init_size(const struct lu_env *env,
2767 struct dt_object *dt, struct thandle *th)
2769 struct dt_object *next = dt_object_child(dt);
2770 struct lod_object *lo = lod_dt_obj(dt);
2771 struct lu_attr *attr = &lod_env_info(env)->lti_attr;
2772 uint64_t size, offs;
2776 /* XXX: we support the simplest (RAID0) striping so far */
2777 LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
2778 LASSERT(lo->ldo_stripe_size > 0);
/* NOTE(review): the LASSERT on la_valid directly follows dt_attr_get()
 * (embedded lines 2780 and 2781 are adjacent), so it is evaluated before
 * any check of rc; confirm attr is meaningful when dt_attr_get() fails. */
2780 rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
2781 LASSERT(attr->la_valid & LA_SIZE);
2785 size = attr->la_size;
/* Split la_size into: the chunk index (la_size / stripe_size), the stripe
 * that chunk falls on (chunk % stripenr) and the number of complete rounds
 * over all stripes (chunk / stripenr).  The size declared for that stripe
 * is rounds * stripe_size plus the remainder la_size % stripe_size. */
2789 /* ll_do_div64(a, b) returns a % b, and a = a / b */
2790 ll_do_div64(size, (__u64) lo->ldo_stripe_size);
2791 stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
2793 size = size * lo->ldo_stripe_size;
2794 offs = attr->la_size;
2795 size += ll_do_div64(offs, lo->ldo_stripe_size);
/* reuse the same lu_attr, now describing only the computed size */
2797 attr->la_valid = LA_SIZE;
2798 attr->la_size = size;
2800 rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
/*
 * Declare everything needed to give the object dt an OST striping.
 *
 * Visible steps:
 *  - fault injection: when OBD_FAIL_MDS_ALLOC_OBDO fires, the striping is
 *    freed and the function leaves through the out label with -ENOMEM;
 *  - for a non-remote lower object, lod_qos_prep_create() chooses the OSTs
 *    and prepares the stripe objects, and lti_buf.lb_len is set to the
 *    size returned by lov_mds_md_size() (LOV_MAGIC_V3 when a pool is set,
 *    LOV_MAGIC_V1 otherwise);
 *  - for the remaining case (presumably the else branch for a remote
 *    object; the keyword is on a line missing from this listing) the
 *    caller-supplied lovea is asserted non-NULL and copied into lti_buf;
 *  - the LOV xattr update is declared with dt_declare_xattr_set();
 *  - if the lower object already exists, lod_declare_init_size() declares
 *    the size propagation to the stripe.
 */
2806 * Create declaration of striped object
2808 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
2809 struct lu_attr *attr,
2810 const struct lu_buf *lovea, struct thandle *th)
2812 struct lod_thread_info *info = lod_env_info(env);
2813 struct dt_object *next = dt_object_child(dt);
2814 struct lod_object *lo = lod_dt_obj(dt);
2818 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
2819 /* failed to create striping, let's reset
2820 * config so that others don't get confused */
2821 lod_object_free_striping(env, lo);
2822 GOTO(out, rc = -ENOMEM);
2825 if (!dt_object_remote(next)) {
2826 /* choose OST and generate appropriate objects */
2827 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
2829 /* failed to create striping, let's reset
2830 * config so that others don't get confused */
2831 lod_object_free_striping(env, lo);
2836 * declare storage for striping data
2838 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
2839 lo->ldo_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1);
2841 /* LOD can not choose OST objects for remote objects, i.e.
2842 * stripes must be ready before that. Right now, it can only
2843 * happen during migrate, i.e. migrate process needs to create
2844 * remote regular file (mdd_migrate_create), then the migrate
2845 * process will provide stripeEA. */
2846 LASSERT(lovea != NULL);
2847 info->lti_buf = *lovea;
/* lti_buf now describes the LOV EA to be written in either case */
2850 rc = dt_declare_xattr_set(env, next, &info->lti_buf,
2851 XATTR_NAME_LOV, 0, th);
2856 * if striping is created with local object's size > 0,
2857 * we have to propagate this size to specific object
2858 * the case is possible only when local object was created previously
2860 if (dt_object_exists(next))
2861 rc = lod_declare_init_size(env, dt, th);
/*
 * Declare creation of an object (installed as .do_declare_create in
 * lod_obj_ops).
 *
 * The creation of the lower (local) object is declared first with
 * dt_declare_create().  Then, depending on dof->dof_type:
 *  - DFT_SYM: the object's body operations are switched to
 *    lod_body_lnk_ops;
 *  - DFT_REGULAR: ldo_stripenr is reset to 0 when the caller did not ask
 *    for striping (dof->u.dof_reg.striped == 0); if stripes remain, the
 *    striped object is declared through lod_declare_striped_object();
 *  - DFT_DIR: directory striping is declared through
 *    lod_declare_dir_striping_create() when ldo_dir_stripe is allocated.
 *
 * NOTE(review): the handling of rc after dt_declare_create() is on lines
 * missing from this listing.
 */
2867 static int lod_declare_object_create(const struct lu_env *env,
2868 struct dt_object *dt,
2869 struct lu_attr *attr,
2870 struct dt_allocation_hint *hint,
2871 struct dt_object_format *dof,
2874 struct dt_object *next = dt_object_child(dt);
2875 struct lod_object *lo = lod_dt_obj(dt);
2884 * first of all, we declare creation of local object
2886 rc = dt_declare_create(env, next, attr, hint, dof, th);
/* symlinks get the pass-through body operations defined in this file */
2890 if (dof->dof_type == DFT_SYM)
2891 dt->do_body_ops = &lod_body_lnk_ops;
2894 * it's lod_ah_init() who has decided the object will striped
2896 if (dof->dof_type == DFT_REGULAR) {
2897 /* callers don't want stripes */
2898 /* XXX: all tricky interactions with ->ah_make_hint() decided
2899 * to use striping, then ->declare_create() behaving differently
2900 * should be cleaned */
2901 if (dof->u.dof_reg.striped == 0)
2902 lo->ldo_stripenr = 0;
2903 if (lo->ldo_stripenr > 0)
2904 rc = lod_declare_striped_object(env, dt, attr,
2906 } else if (dof->dof_type == DFT_DIR) {
2907 /* Orphan object (like migrating object) does not have
2908 * lod_dir_stripe, see lod_ah_init */
2909 if (lo->ldo_dir_stripe != NULL)
2910 rc = lod_declare_dir_striping_create(env, dt, attr,
/*
 * Create the stripe objects of dt and store the resulting layout.
 *
 * Asserts that ldo_striping_cached is 0, calls dt_create() on every entry
 * of lo->ldo_stripe[0 .. ldo_stripenr-1] (each asserted non-NULL, created
 * with the caller's attr and dof and a NULL allocation hint), then calls
 * lod_generate_and_set_lovea().
 *
 * NOTE(review): the reaction to a failing dt_create() inside the loop is on
 * lines missing from this listing.
 */
2917 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
2918 struct lu_attr *attr, struct dt_object_format *dof,
2921 struct lod_object *lo = lod_dt_obj(dt);
2925 LASSERT(lo->ldo_striping_cached == 0);
2927 /* create all underlying objects */
2928 for (i = 0; i < lo->ldo_stripenr; i++) {
2929 LASSERT(lo->ldo_stripe[i]);
2930 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
2936 rc = lod_generate_and_set_lovea(env, lo, th);
/*
 * Create an object (installed as .do_create in lod_obj_ops).
 *
 * The lower object is created with dt_create(); afterwards, for a regular
 * file (per the mode cached in the object header) that has a stripe array
 * and whose format requests striping (dof->u.dof_reg.striped != 0), the
 * stripes are created through lod_striping_create().
 *
 * NOTE(review): the check of rc between the two calls is on a line missing
 * from this listing.
 */
2941 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
2942 struct lu_attr *attr,
2943 struct dt_allocation_hint *hint,
2944 struct dt_object_format *dof, struct thandle *th)
2946 struct dt_object *next = dt_object_child(dt);
2947 struct lod_object *lo = lod_dt_obj(dt);
2951 /* create local object */
2952 rc = dt_create(env, next, attr, hint, dof, th);
2956 if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2957 lo->ldo_stripe && dof->u.dof_reg.striped != 0)
2958 rc = lod_striping_create(env, dt, attr, dof, th);
/*
 * Declare destruction of an object and of its stripes (installed as
 * .do_declare_destroy in lod_obj_ops).
 *
 * Visible steps:
 *  - load the striping with lod_load_striping();
 *  - for a directory: try the directory index features on the lower
 *    object, then for every stripe declare one reference drop on the lower
 *    object and the deletion of an entry whose key is formatted into
 *    info->lti_key from the stripe FID (format DFID":%d");
 *  - declare destruction of the lower object;
 *  - test the OBD_FAIL_LFSCK_LOST_MDTOBJ fault-injection point (the action
 *    taken when it fires is on a line missing from this listing);
 *  - declare destruction of every non-NULL stripe object.
 */
2963 static int lod_declare_object_destroy(const struct lu_env *env,
2964 struct dt_object *dt,
2967 struct dt_object *next = dt_object_child(dt);
2968 struct lod_object *lo = lod_dt_obj(dt);
2969 struct lod_thread_info *info = lod_env_info(env);
2970 char *stripe_name = info->lti_key;
2975 * load striping information, notice we don't do this when object
2976 * is being initialized as we don't need this information till
2977 * few specific cases like destroy, chown
2979 rc = lod_load_striping(env, lo);
2983 /* declare destroy for all underlying objects */
2984 if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2985 rc = next->do_ops->do_index_try(env, next,
2986 &dt_directory_features);
/* NOTE(review): this loop dereferences lo->ldo_stripe[i] without the NULL
 * check that the final loop of this function applies; confirm a striped
 * directory can never have a NULL slot here. */
2990 for (i = 0; i < lo->ldo_stripenr; i++) {
2991 rc = dt_declare_ref_del(env, next, th);
2994 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2995 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2997 rc = dt_declare_delete(env, next,
2998 (const struct dt_key *)stripe_name, th);
3004 * we declare destroy for the local object
3006 rc = dt_declare_destroy(env, next, th);
3010 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
3013 /* declare destroy all striped objects */
3014 for (i = 0; i < lo->ldo_stripenr; i++) {
3015 if (likely(lo->ldo_stripe[i] != NULL)) {
3016 rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
/*
 * Destroy an object and its stripes (installed as .do_destroy in
 * lod_obj_ops).
 *
 * Visible steps:
 *  - for a directory: try the directory index features on the lower
 *    object, then for every stripe drop one reference on the lower object
 *    and delete the entry whose key is formatted into info->lti_key from
 *    the stripe FID (format DFID":%d");
 *  - destroy the lower object with dt_destroy();
 *  - test the OBD_FAIL_LFSCK_LOST_MDTOBJ fault-injection point (the action
 *    taken when it fires is on a line missing from this listing);
 *  - destroy every non-NULL stripe; while OBD_FAIL_LFSCK_LOST_SPEOBJ is
 *    active only the stripe whose index equals cfs_fail_val is destroyed.
 */
3025 static int lod_object_destroy(const struct lu_env *env,
3026 struct dt_object *dt, struct thandle *th)
3028 struct dt_object *next = dt_object_child(dt);
3029 struct lod_object *lo = lod_dt_obj(dt);
3030 struct lod_thread_info *info = lod_env_info(env);
3031 char *stripe_name = info->lti_key;
3036 /* destroy sub-stripe of master object */
3037 if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
3038 rc = next->do_ops->do_index_try(env, next,
3039 &dt_directory_features);
/* NOTE(review): this loop dereferences lo->ldo_stripe[i] without the NULL
 * check that the final loop of this function applies; confirm a striped
 * directory can never have a NULL slot here. */
3043 for (i = 0; i < lo->ldo_stripenr; i++) {
3044 rc = dt_ref_del(env, next, th);
3048 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
3049 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
3052 CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
3053 PFID(lu_object_fid(&dt->do_lu)), stripe_name,
3054 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
3056 rc = dt_delete(env, next,
3057 (const struct dt_key *)stripe_name,
3063 rc = dt_destroy(env, next, th);
3067 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
3070 /* destroy all striped objects */
3071 for (i = 0; i < lo->ldo_stripenr; i++) {
3072 if (likely(lo->ldo_stripe[i] != NULL) &&
3073 (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
3074 i == cfs_fail_val)) {
3075 rc = dt_destroy(env, lo->ldo_stripe[i], th);
/* Pass-through (.do_declare_ref_add): forwards the request unchanged to
 * the child dt object via dt_declare_ref_add() and returns its result. */
3084 static int lod_declare_ref_add(const struct lu_env *env,
3085 struct dt_object *dt, struct thandle *th)
3087 return dt_declare_ref_add(env, dt_object_child(dt), th);
/* Pass-through (.do_ref_add): forwards the request unchanged to the child
 * dt object via dt_ref_add() and returns its result. */
3090 static int lod_ref_add(const struct lu_env *env,
3091 struct dt_object *dt, struct thandle *th)
3093 return dt_ref_add(env, dt_object_child(dt), th);
/* Pass-through (.do_declare_ref_del): forwards the request unchanged to
 * the child dt object via dt_declare_ref_del() and returns its result. */
3096 static int lod_declare_ref_del(const struct lu_env *env,
3097 struct dt_object *dt, struct thandle *th)
3099 return dt_declare_ref_del(env, dt_object_child(dt), th);
/* Pass-through (.do_ref_del): forwards the request unchanged to the child
 * dt object via dt_ref_del() and returns its result. */
3102 static int lod_ref_del(const struct lu_env *env,
3103 struct dt_object *dt, struct thandle *th)
3105 return dt_ref_del(env, dt_object_child(dt), th);
/* Pass-through (.do_capa_get): forwards old and opc unchanged to the child
 * dt object via dt_capa_get() and returns the pointer it yields. */
3108 static struct obd_capa *lod_capa_get(const struct lu_env *env,
3109 struct dt_object *dt,
3110 struct lustre_capa *old, __u64 opc)
3112 return dt_capa_get(env, dt_object_child(dt), old, opc);
/* Pass-through (.do_object_sync): forwards the start/end range unchanged
 * to the child dt object via dt_object_sync() and returns its result. */
3115 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
3116 __u64 start, __u64 end)
3118 return dt_object_sync(env, dt_object_child(dt), start, end);
/*
 * Bookkeeping for the per-stripe locks of a striped directory.
 *
 * lod_object_lock() allocates one of these with room for ldo_stripenr
 * entries in the trailing lsl_handle[] array and records that number in
 * lsl_lock_count (a member declared on a line missing from this listing);
 * lod_object_unlock() uses lsl_lock_count to compute the size it frees.
 */
3121 struct lod_slave_locks {
/* variable-length tail, indexed by stripe number */
3123 struct lustre_handle lsl_handle[0];
/*
 * Release the per-stripe locks recorded in einfo->ei_cbdata.
 *
 * Walks slots 1 .. lsl_lock_count-1 (slot 0 is not touched here) and, for
 * every handle reported in use by lustre_handle_is_used(), points
 * einfo->ei_cbdata at that handle and calls dt_object_unlock() on the
 * matching stripe.  The first non-zero result is kept in rc; later
 * results do not overwrite it.
 *
 * Note that einfo->ei_cbdata is left pointing at the last handle that was
 * processed; the visible callers reset or free it afterwards.
 */
3126 static int lod_object_unlock_internal(const struct lu_env *env,
3127 struct dt_object *dt,
3128 struct ldlm_enqueue_info *einfo,
3129 ldlm_policy_data_t *policy)
3131 struct lod_object *lo = lod_dt_obj(dt);
3132 struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
3137 if (slave_locks == NULL)
3140 for (i = 1; i < slave_locks->lsl_lock_count; i++) {
3141 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
3144 einfo->ei_cbdata = &slave_locks->lsl_handle[i];
3145 rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
/* remember only the first failure */
3148 rc = rc == 0 ? rc1 : rc;
/*
 * Unlock a striped directory (installed as .do_object_unlock in
 * lod_obj_ops).
 *
 * Checks, in order: that einfo->ei_cbdata holds a slave-lock array, that
 * the object is a directory, that its striping loads, and the
 * "single stripe and remote" case described in the comment below.  The
 * result of each check is on a line missing from this listing.  When the
 * function proceeds, the per-stripe locks are released through
 * lod_object_unlock_internal(), the array is freed with the same size
 * formula used at allocation time, and einfo->ei_cbdata is cleared.
 */
3155 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
3156 struct ldlm_enqueue_info *einfo,
3157 union ldlm_policy_data *policy)
3159 struct lod_object *lo = lod_dt_obj(dt);
3160 struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
3161 int slave_locks_size;
3165 if (slave_locks == NULL)
3168 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
3171 rc = lod_load_striping(env, lo);
3175 /* Note: for remote lock for single stripe dir, MDT will cancel
3176 * the lock by lockh directly */
3177 if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
3180 /* Only cancel slave lock for striped dir */
3181 rc = lod_object_unlock_internal(env, dt, einfo, policy);
/* header plus one lustre_handle per recorded lock */
3183 slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
3184 sizeof(slave_locks->lsl_handle[0]);
3185 OBD_FREE(slave_locks, slave_locks_size);
3186 einfo->ei_cbdata = NULL;
/*
 * Lock an object (installed as .do_object_lock in lod_obj_ops).
 *
 * When einfo->ei_enq_slave is clear, the object is asserted to be remote
 * and the request is forwarded to the child dt object.
 *
 * Otherwise the function checks that the object is a directory, loads its
 * striping and checks for more than one stripe (the result of each check
 * is on a line missing from this listing).  It then allocates a
 * lod_slave_locks array with one handle slot per stripe, records the
 * stripe count in lsl_lock_count, and locks stripes 1 .. ldo_stripenr-1
 * one by one, each under a resource name built from that stripe's FID in
 * the per-thread lti_res_id.  The array is handed back through
 * einfo->ei_cbdata and is freed in lod_object_unlock().
 *
 * If rc is non-zero at the end while an array exists, the locks taken so
 * far are released through lod_object_unlock_internal(), the array is
 * freed and einfo->ei_cbdata is cleared.
 */
3191 static int lod_object_lock(const struct lu_env *env,
3192 struct dt_object *dt,
3193 struct lustre_handle *lh,
3194 struct ldlm_enqueue_info *einfo,
3195 union ldlm_policy_data *policy)
3197 struct lod_object *lo = lod_dt_obj(dt);
3200 int slave_locks_size;
3201 struct lod_slave_locks *slave_locks = NULL;
3204 /* remote object lock */
3205 if (!einfo->ei_enq_slave) {
3206 LASSERT(dt_object_remote(dt));
3207 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
3211 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
3214 rc = lod_load_striping(env, lo);
3219 if (lo->ldo_stripenr <= 1)
/* header plus one lustre_handle per stripe */
3222 slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
3223 sizeof(slave_locks->lsl_handle[0]);
3224 /* Freed in lod_object_unlock */
3225 OBD_ALLOC(slave_locks, slave_locks_size);
3226 if (slave_locks == NULL)
3228 slave_locks->lsl_lock_count = lo->ldo_stripenr;
3230 /* striped directory lock */
3231 for (i = 1; i < lo->ldo_stripenr; i++) {
3232 struct lustre_handle lockh;
3233 struct ldlm_res_id *res_id;
/* NOTE(review): lo->ldo_stripe[i] is dereferenced to build the resource
 * name before the LASSERT a few lines below checks it for NULL; the
 * assertion would be more useful ahead of the first use. */
3235 res_id = &lod_env_info(env)->lti_res_id;
3236 fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
3238 einfo->ei_res_id = res_id;
3240 LASSERT(lo->ldo_stripe[i]);
3241 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
3245 slave_locks->lsl_handle[i] = lockh;
3248 einfo->ei_cbdata = slave_locks;
/* error path: undo the stripe locks taken so far and release the array */
3251 if (rc != 0 && slave_locks != NULL) {
3252 einfo->ei_cbdata = slave_locks;
3253 lod_object_unlock_internal(env, dt, einfo, policy);
3254 OBD_FREE(slave_locks, slave_locks_size);
3255 einfo->ei_cbdata = NULL;
/*
 * dt_object_operations table of the LOD layer: every slot is bound to the
 * lod_* function of the matching name.
 */
3261 struct dt_object_operations lod_obj_ops = {
/* locking of the object itself */
3262 .do_read_lock = lod_object_read_lock,
3263 .do_write_lock = lod_object_write_lock,
3264 .do_read_unlock = lod_object_read_unlock,
3265 .do_write_unlock = lod_object_write_unlock,
3266 .do_write_locked = lod_object_write_locked,
/* attributes and extended attributes */
3267 .do_attr_get = lod_attr_get,
3268 .do_declare_attr_set = lod_declare_attr_set,
3269 .do_attr_set = lod_attr_set,
3270 .do_xattr_get = lod_xattr_get,
3271 .do_declare_xattr_set = lod_declare_xattr_set,
3272 .do_xattr_set = lod_xattr_set,
3273 .do_declare_xattr_del = lod_declare_xattr_del,
3274 .do_xattr_del = lod_xattr_del,
3275 .do_xattr_list = lod_xattr_list,
/* creation and destruction */
3276 .do_ah_init = lod_ah_init,
3277 .do_declare_create = lod_declare_object_create,
3278 .do_create = lod_object_create,
3279 .do_declare_destroy = lod_declare_object_destroy,
3280 .do_destroy = lod_object_destroy,
3281 .do_index_try = lod_index_try,
/* the entries below, up to do_object_sync, are plain pass-throughs to the
 * child dt object */
3282 .do_declare_ref_add = lod_declare_ref_add,
3283 .do_ref_add = lod_ref_add,
3284 .do_declare_ref_del = lod_declare_ref_del,
3285 .do_ref_del = lod_ref_del,
3286 .do_capa_get = lod_capa_get,
3287 .do_object_sync = lod_object_sync,
/* per-stripe locking of striped directories */
3288 .do_object_lock = lod_object_lock,
3289 .do_object_unlock = lod_object_unlock,
/* Pass-through (.dbo_read): calls the child dt object's own dbo_read body
 * operation with the same buffer, position and capability and returns its
 * result. */
3292 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
3293 struct lu_buf *buf, loff_t *pos,
3294 struct lustre_capa *capa)
3296 struct dt_object *next = dt_object_child(dt);
3297 return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
/* Pass-through (.dbo_declare_write): declares the write on the child dt
 * object via dt_declare_record_write() and returns its result. */
3300 static ssize_t lod_declare_write(const struct lu_env *env,
3301 struct dt_object *dt,
3302 const struct lu_buf *buf, loff_t pos,
3305 return dt_declare_record_write(env, dt_object_child(dt),
/* Pass-through (.dbo_write): calls the child dt object's own dbo_write
 * body operation with all arguments unchanged and returns its result. */
3309 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
3310 const struct lu_buf *buf, loff_t *pos,
3311 struct thandle *th, struct lustre_capa *capa, int iq)
3313 struct dt_object *next = dt_object_child(dt);
3315 return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
/*
 * Body operations installed on symlink objects (by lod_object_start() for
 * S_ISLNK objects and by lod_declare_object_create() for DFT_SYM); all
 * three entries forward to the child dt object.
 */
3318 static const struct dt_body_operations lod_body_lnk_ops = {
3319 .dbo_read = lod_read,
3320 .dbo_declare_write = lod_declare_write,
3321 .dbo_write = lod_write
/*
 * Initialise the LOD slice of an object (installed as .loo_object_init in
 * lod_lu_obj_ops): pick the lower device responsible for the object's FID
 * and attach a sub-object allocated by that device.
 *
 * lod_fld_lookup() yields a target index (idx) and a sequence type:
 *  - LU_SEQ_RANGE_MDT with idx equal to this site's ss_node_id: the device
 *    is lod->lod_child;
 *  - LU_SEQ_RANGE_MDT otherwise: the target is looked up in lod_mdt_descs;
 *  - LU_SEQ_RANGE_OST: the target is looked up in lod_ost_descs.
 * For a table lookup, idx must be below ltd_tgts_size and set in
 * ltd_tgt_bitmap; the target and its ltd_tgt are then asserted non-NULL
 * and its dd_lu_dev becomes the device.  lod_putref() is called on the
 * table afterwards (the matching get is on a line missing from this
 * listing).
 *
 * The sub-object is allocated through the chosen device's
 * ldo_object_alloc() and linked with lu_object_add().  The results of the
 * cdev == NULL and cobj == NULL checks are on lines missing from this
 * listing.  conf is not used in the visible lines.
 */
3324 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
3325 const struct lu_object_conf *conf)
3327 struct lod_device *lod = lu2lod_dev(lo->lo_dev);
3328 struct lu_device *cdev = NULL;
3329 struct lu_object *cobj;
3330 struct lod_tgt_descs *ltd = NULL;
3331 struct lod_tgt_desc *tgt;
3333 int type = LU_SEQ_RANGE_ANY;
3337 rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
3341 if (type == LU_SEQ_RANGE_MDT &&
3342 idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
3343 cdev = &lod->lod_child->dd_lu_dev;
3344 } else if (type == LU_SEQ_RANGE_MDT) {
3345 ltd = &lod->lod_mdt_descs;
3347 } else if (type == LU_SEQ_RANGE_OST) {
3348 ltd = &lod->lod_ost_descs;
/* only a slot that is inside the table and marked in the bitmap is used */
3355 if (ltd->ltd_tgts_size > idx &&
3356 cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
3357 tgt = LTD_TGT(ltd, idx);
3359 LASSERT(tgt != NULL);
3360 LASSERT(tgt->ltd_tgt != NULL);
3362 cdev = &(tgt->ltd_tgt->dd_lu_dev);
3364 lod_putref(lod, ltd);
3367 if (unlikely(cdev == NULL))
3370 cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
3371 if (unlikely(cobj == NULL))
3374 lu_object_add(lo, cobj);
/*
 * Drop all striping state held by lo.
 *
 * Frees ldo_dir_stripe when present.  When a stripe array exists, every
 * non-NULL stripe object in slots 0 .. ldo_stripenr-1 is released with
 * lu_object_put(), the array (ldo_stripes_allocated pointers) is freed and
 * both the pointer and the allocation count are reset.  ldo_stripenr and
 * ldo_pattern are reset to 0 afterwards.
 */
3379 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
3383 if (lo->ldo_dir_stripe != NULL) {
3384 OBD_FREE_PTR(lo->ldo_dir_stripe);
3385 lo->ldo_dir_stripe = NULL;
3388 if (lo->ldo_stripe) {
3389 LASSERT(lo->ldo_stripes_allocated > 0);
3391 for (i = 0; i < lo->ldo_stripenr; i++) {
3392 if (lo->ldo_stripe[i])
3393 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
/* i is reused here as the byte size of the pointer array */
3396 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
3397 OBD_FREE(lo->ldo_stripe, i);
3398 lo->ldo_stripe = NULL;
3399 lo->ldo_stripes_allocated = 0;
3401 lo->ldo_stripenr = 0;
3402 lo->ldo_pattern = 0;
/*
 * Object start hook (installed as .loo_object_start in lod_lu_obj_ops):
 * when the mode cached in the object header says the object is a symlink,
 * its body operations are set to lod_body_lnk_ops.
 */
3406 * ->start is called once all slices are initialized, including header's
3407 * cache for mode (object type). using the type we can initialize ops
3409 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
3411 if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
3412 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
/*
 * Free a lod object (installed as .loo_object_free in lod_lu_obj_ops):
 * releases its striping with lod_object_free_striping(), clears its pool
 * by calling lod_object_set_pool() with NULL, and returns the object to
 * the lod_object_kmem slab cache.
 */
3416 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
3418 struct lod_object *mo = lu2lod_obj(o);
3421 * release all underlying object pinned
3424 lod_object_free_striping(env, mo);
3426 lod_object_set_pool(mo, NULL);
3429 OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
/* Release hook (installed as .loo_object_release in lod_lu_obj_ops); no
 * statements are visible in its body, only the open question below. */
3432 static void lod_object_release(const struct lu_env *env, struct lu_object *o)
3434 /* XXX: shouldn't we release everything here in case if object
3435 * creation failed before? */
/* Print hook (installed as .loo_object_print in lod_lu_obj_ops): emits the
 * LOD name followed by "-object@" and the lod_object pointer through the
 * supplied printer callback, and returns the callback's result. */
3438 static int lod_object_print(const struct lu_env *env, void *cookie,
3439 lu_printer_t p, const struct lu_object *l)
/* the cast only drops the const qualifier for lu2lod_obj() */
3441 struct lod_object *o = lu2lod_obj((struct lu_object *) l);
3443 return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
/*
 * lu_object_operations table of the LOD layer: life-cycle hooks (init,
 * start, free, release) and the debug printer, each bound to the lod_*
 * function of the matching name.
 */
3446 struct lu_object_operations lod_lu_obj_ops = {
3447 .loo_object_init = lod_object_init,
3448 .loo_object_start = lod_object_start,
3449 .loo_object_free = lod_object_free,
3450 .loo_object_release = lod_object_release,
3451 .loo_object_print = lod_object_print,