Whamcloud - gitweb
LU-5099 api: transfer object type via dt_insert API
[fs/lustre-release.git] / lustre / lod / lod_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2013, Intel Corporation.
27  */
28 /*
29  * lustre/lod/lod_object.c
30  *
31  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
32  */
33
34 #define DEBUG_SUBSYSTEM S_MDS
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
41
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46 #include <md_object.h>
47 #include <lustre_linkea.h>
48
49 #include "lod_internal.h"
50
51 static const char dot[] = ".";
52 static const char dotdot[] = "..";
53
54 extern struct kmem_cache *lod_object_kmem;
55 static const struct dt_body_operations lod_body_lnk_ops;
56
57 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
58                             struct dt_rec *rec, const struct dt_key *key,
59                             struct lustre_capa *capa)
60 {
61         struct dt_object *next = dt_object_child(dt);
62         return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
63 }
64
/**
 * Declare an index insertion on the lower-layer object.
 *
 * Pure pass-through: the declaration is made against the child object
 * returned by dt_object_child() within the transaction \a handle.
 *
 * \retval		whatever dt_declare_insert() returns.
 */
static int lod_declare_index_insert(const struct lu_env *env,
				    struct dt_object *dt,
				    const struct dt_rec *rec,
				    const struct dt_key *key,
				    struct thandle *handle)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_declare_insert(env, child, rec, key, handle);
}
73
/**
 * Insert the (\a key, \a rec) pair into the index of the lower-layer object.
 *
 * Pure pass-through to dt_insert() on the child object; \a capa and \a ign
 * are handed over unchanged.
 *
 * \retval		whatever dt_insert() returns.
 */
static int lod_index_insert(const struct lu_env *env,
			    struct dt_object *dt,
			    const struct dt_rec *rec,
			    const struct dt_key *key,
			    struct thandle *th,
			    struct lustre_capa *capa,
			    int ign)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_insert(env, child, rec, key, th, capa, ign);
}
84
/**
 * Declare an index deletion on the lower-layer object.
 *
 * Pure pass-through to dt_declare_delete() on the child object.
 *
 * \retval		whatever dt_declare_delete() returns.
 */
static int lod_declare_index_delete(const struct lu_env *env,
				    struct dt_object *dt,
				    const struct dt_key *key,
				    struct thandle *th)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_declare_delete(env, child, key, th);
}
92
/**
 * Delete \a key from the index of the lower-layer object.
 *
 * Pure pass-through to dt_delete() on the child object.
 *
 * \retval		whatever dt_delete() returns.
 */
static int lod_index_delete(const struct lu_env *env,
			    struct dt_object *dt,
			    const struct dt_key *key,
			    struct thandle *th,
			    struct lustre_capa *capa)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_delete(env, child, key, th, capa);
}
101
102 static struct dt_it *lod_it_init(const struct lu_env *env,
103                                  struct dt_object *dt, __u32 attr,
104                                  struct lustre_capa *capa)
105 {
106         struct dt_object        *next = dt_object_child(dt);
107         struct lod_it           *it = &lod_env_info(env)->lti_it;
108         struct dt_it            *it_next;
109
110
111         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
112         if (IS_ERR(it_next))
113                 return it_next;
114
115         /* currently we do not use more than one iterator per thread
116          * so we store it in thread info. if at some point we need
117          * more active iterators in a single thread, we can allocate
118          * additional ones */
119         LASSERT(it->lit_obj == NULL);
120
121         it->lit_it = it_next;
122         it->lit_obj = next;
123
124         return (struct dt_it *)it;
125 }
126
/*
 * Sanity check for a pass-through iterator: both the object and the
 * sub-iterator must be set, i.e. the iterator must be in use.
 * The \a env argument is accepted but not referenced by the macro body.
 */
#define LOD_CHECK_IT(env, it)                                   \
do {                                                            \
        LASSERT((it)->lit_obj != NULL);                         \
        LASSERT((it)->lit_it != NULL);                          \
} while (0)
132
133 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
134 {
135         struct lod_it *it = (struct lod_it *)di;
136
137         LOD_CHECK_IT(env, it);
138         it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
139
140         /* the iterator not in use any more */
141         it->lit_obj = NULL;
142         it->lit_it = NULL;
143 }
144
145 int lod_it_get(const struct lu_env *env, struct dt_it *di,
146                const struct dt_key *key)
147 {
148         const struct lod_it *it = (const struct lod_it *)di;
149
150         LOD_CHECK_IT(env, it);
151         return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
152 }
153
154 void lod_it_put(const struct lu_env *env, struct dt_it *di)
155 {
156         struct lod_it *it = (struct lod_it *)di;
157
158         LOD_CHECK_IT(env, it);
159         return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
160 }
161
162 int lod_it_next(const struct lu_env *env, struct dt_it *di)
163 {
164         struct lod_it *it = (struct lod_it *)di;
165
166         LOD_CHECK_IT(env, it);
167         return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
168 }
169
170 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
171 {
172         const struct lod_it *it = (const struct lod_it *)di;
173
174         LOD_CHECK_IT(env, it);
175         return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
176 }
177
178 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
179 {
180         struct lod_it *it = (struct lod_it *)di;
181
182         LOD_CHECK_IT(env, it);
183         return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
184 }
185
186 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
187                struct dt_rec *rec, __u32 attr)
188 {
189         const struct lod_it *it = (const struct lod_it *)di;
190
191         LOD_CHECK_IT(env, it);
192         return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec,
193                                                      attr);
194 }
195
196 int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di,
197                     __u32 attr)
198 {
199         const struct lod_it *it = (const struct lod_it *)di;
200
201         LOD_CHECK_IT(env, it);
202         return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it,
203                                                           attr);
204 }
205
206 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
207 {
208         const struct lod_it *it = (const struct lod_it *)di;
209
210         LOD_CHECK_IT(env, it);
211         return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
212 }
213
214 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
215 {
216         const struct lod_it *it = (const struct lod_it *)di;
217
218         LOD_CHECK_IT(env, it);
219         return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
220 }
221
222 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
223                    void *key_rec)
224 {
225         const struct lod_it *it = (const struct lod_it *)di;
226
227         LOD_CHECK_IT(env, it);
228         return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it,
229                                                          key_rec);
230 }
231
232 static struct dt_index_operations lod_index_ops = {
233         .dio_lookup             = lod_index_lookup,
234         .dio_declare_insert     = lod_declare_index_insert,
235         .dio_insert             = lod_index_insert,
236         .dio_declare_delete     = lod_declare_index_delete,
237         .dio_delete             = lod_index_delete,
238         .dio_it = {
239                 .init           = lod_it_init,
240                 .fini           = lod_it_fini,
241                 .get            = lod_it_get,
242                 .put            = lod_it_put,
243                 .next           = lod_it_next,
244                 .key            = lod_it_key,
245                 .key_size       = lod_it_key_size,
246                 .rec            = lod_it_rec,
247                 .rec_size       = lod_it_rec_size,
248                 .store          = lod_it_store,
249                 .load           = lod_it_load,
250                 .key_rec        = lod_it_key_rec,
251         }
252 };
253
254 /**
255  * Implementation of dt_index_operations:: dio_it.init
256  *
257  * This function is to initialize the iterator for striped directory,
258  * basically these lod_striped_it_xxx will just locate the stripe
259  * and call the correspondent api of its next lower layer.
260  *
261  * \param[in] env       execution environment.
262  * \param[in] dt        the striped directory object to be iterated.
263  * \param[in] attr      the attribute of iterator, mostly used to indicate
264  *                      the entry attribute in the object to be iterated.
265  * \param[in] capa      capability(useless in current implementation)
266  *
267  * \retval      initialized iterator(dt_it) if successful initialize the
268  *              iteration. lit_stripe_index will be used to indicate the
269  *              current iterate position among stripes.
270  * \retval      ERR pointer if initialization is failed.
271  */
272 static struct dt_it *lod_striped_it_init(const struct lu_env *env,
273                                          struct dt_object *dt, __u32 attr,
274                                          struct lustre_capa *capa)
275 {
276         struct lod_object       *lo = lod_dt_obj(dt);
277         struct dt_object        *next;
278         struct lod_it           *it = &lod_env_info(env)->lti_it;
279         struct dt_it            *it_next;
280         ENTRY;
281
282         LASSERT(lo->ldo_stripenr > 0);
283         next = lo->ldo_stripe[0];
284         LASSERT(next != NULL);
285         LASSERT(next->do_index_ops != NULL);
286
287         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
288         if (IS_ERR(it_next))
289                 return it_next;
290
291         /* currently we do not use more than one iterator per thread
292          * so we store it in thread info. if at some point we need
293          * more active iterators in a single thread, we can allocate
294          * additional ones */
295         LASSERT(it->lit_obj == NULL);
296
297         it->lit_stripe_index = 0;
298         it->lit_attr = attr;
299         it->lit_it = it_next;
300         it->lit_obj = dt;
301
302         return (struct dt_it *)it;
303 }
304
/*
 * Sanity check for a striped-directory iterator: the iterator must be in
 * use (object and sub-iterator set), the directory \a lo must have at least
 * one stripe, and the current stripe index must be within the stripe count.
 * The \a env argument is accepted but not referenced by the macro body.
 */
#define LOD_CHECK_STRIPED_IT(env, it, lo)                       \
do {                                                            \
        LASSERT((it)->lit_obj != NULL);                         \
        LASSERT((it)->lit_it != NULL);                          \
        LASSERT((lo)->ldo_stripenr > 0);                        \
        LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr);   \
} while (0)
312
313 /**
314  * Implementation of dt_index_operations:: dio_it.fini
315  *
316  * This function is to finish the iterator for striped directory.
317  *
318  * \param[in] env       execution environment.
319  * \param[in] di        the iterator for the striped directory
320  *
321  */
322 static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
323 {
324         struct lod_it           *it = (struct lod_it *)di;
325         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
326         struct dt_object        *next;
327
328         LOD_CHECK_STRIPED_IT(env, it, lo);
329
330         next = lo->ldo_stripe[it->lit_stripe_index];
331         LASSERT(next != NULL);
332         LASSERT(next->do_index_ops != NULL);
333
334         next->do_index_ops->dio_it.fini(env, it->lit_it);
335
336         /* the iterator not in use any more */
337         it->lit_obj = NULL;
338         it->lit_it = NULL;
339         it->lit_stripe_index = 0;
340 }
341
342 /**
343  * Implementation of dt_index_operations:: dio_it.get
344  *
345  * This function is to position the iterator with given key
346  *
347  * \param[in] env       execution environment.
348  * \param[in] di        the iterator for striped directory.
349  * \param[in] key       the key the iterator will be positioned.
350  *
351  * \retval      0 if successfully position iterator by the key.
352  * \retval      negative error if position is failed.
353  */
354 static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
355                               const struct dt_key *key)
356 {
357         const struct lod_it     *it = (const struct lod_it *)di;
358         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
359         struct dt_object        *next;
360         ENTRY;
361
362         LOD_CHECK_STRIPED_IT(env, it, lo);
363
364         next = lo->ldo_stripe[it->lit_stripe_index];
365         LASSERT(next != NULL);
366         LASSERT(next->do_index_ops != NULL);
367
368         return next->do_index_ops->dio_it.get(env, it->lit_it, key);
369 }
370
371 /**
372  * Implementation of dt_index_operations:: dio_it.put
373  *
374  * This function is supposed to be the pair of it_get, but currently do
375  * nothing. see (osd_it_ea_put or osd_index_it_put)
376  */
377 static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
378 {
379         struct lod_it           *it = (struct lod_it *)di;
380         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
381         struct dt_object        *next;
382
383         LOD_CHECK_STRIPED_IT(env, it, lo);
384
385         next = lo->ldo_stripe[it->lit_stripe_index];
386         LASSERT(next != NULL);
387         LASSERT(next->do_index_ops != NULL);
388
389         return next->do_index_ops->dio_it.put(env, it->lit_it);
390 }
391
392 /**
393  * Implementation of dt_index_operations:: dio_it.next
394  *
395  * This function is to position the iterator to the next entry, if current
396  * stripe is finished by checking the return value of next() in current
397  * stripe. it will go to next stripe. In the mean time, the sub-iterator
398  * for next stripe needs to be initialized.
399  *
400  * \param[in] env       execution environment.
401  * \param[in] di        the iterator for striped directory.
402  *
403  * \retval      0 if successfully position iterator to the next entry.
404  * \retval      negative error if position is failed.
405  */
406 static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
407 {
408         struct lod_it           *it = (struct lod_it *)di;
409         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
410         struct dt_object        *next;
411         struct dt_it            *it_next;
412         int                     rc;
413         ENTRY;
414
415         LOD_CHECK_STRIPED_IT(env, it, lo);
416
417         next = lo->ldo_stripe[it->lit_stripe_index];
418         LASSERT(next != NULL);
419         LASSERT(next->do_index_ops != NULL);
420 again:
421         rc = next->do_index_ops->dio_it.next(env, it->lit_it);
422         if (rc < 0)
423                 RETURN(rc);
424
425         if (rc == 0 && it->lit_stripe_index == 0)
426                 RETURN(rc);
427
428         if (rc == 0 && it->lit_stripe_index > 0) {
429                 struct lu_dirent *ent;
430
431                 ent = (struct lu_dirent *)lod_env_info(env)->lti_key;
432
433                 rc = next->do_index_ops->dio_it.rec(env, it->lit_it,
434                                                     (struct dt_rec *)ent,
435                                                     it->lit_attr);
436                 if (rc != 0)
437                         RETURN(rc);
438
439                 /* skip . and .. for slave stripe */
440                 if ((strncmp(ent->lde_name, ".",
441                              le16_to_cpu(ent->lde_namelen)) == 0 &&
442                      le16_to_cpu(ent->lde_namelen) == 1) ||
443                     (strncmp(ent->lde_name, "..",
444                              le16_to_cpu(ent->lde_namelen)) == 0 &&
445                      le16_to_cpu(ent->lde_namelen) == 2))
446                         goto again;
447
448                 RETURN(rc);
449         }
450
451         /* go to next stripe */
452         if (it->lit_stripe_index + 1 >= lo->ldo_stripenr)
453                 RETURN(1);
454
455         it->lit_stripe_index++;
456
457         next->do_index_ops->dio_it.put(env, it->lit_it);
458         next->do_index_ops->dio_it.fini(env, it->lit_it);
459
460         rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
461         if (rc != 0)
462                 RETURN(rc);
463
464         next = lo->ldo_stripe[it->lit_stripe_index];
465         LASSERT(next != NULL);
466         LASSERT(next->do_index_ops != NULL);
467
468         it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr,
469                                                   BYPASS_CAPA);
470         if (!IS_ERR(it_next)) {
471                 it->lit_it = it_next;
472                 goto again;
473         } else {
474                 rc = PTR_ERR(it_next);
475         }
476
477         RETURN(rc);
478 }
479
480 /**
481  * Implementation of dt_index_operations:: dio_it.key
482  *
483  * This function is to get the key of the iterator at current position.
484  *
485  * \param[in] env       execution environment.
486  * \param[in] di        the iterator for striped directory.
487  *
488  * \retval      key(dt_key) if successfully get the key.
489  * \retval      negative error if can not get the key.
490  */
491 static struct dt_key *lod_striped_it_key(const struct lu_env *env,
492                                          const struct dt_it *di)
493 {
494         const struct lod_it     *it = (const struct lod_it *)di;
495         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
496         struct dt_object        *next;
497
498         LOD_CHECK_STRIPED_IT(env, it, lo);
499
500         next = lo->ldo_stripe[it->lit_stripe_index];
501         LASSERT(next != NULL);
502         LASSERT(next->do_index_ops != NULL);
503
504         return next->do_index_ops->dio_it.key(env, it->lit_it);
505 }
506
507 /**
508  * Implementation of dt_index_operations:: dio_it.key_size
509  *
510  * This function is to get the key_size of current key.
511  *
512  * \param[in] env       execution environment.
513  * \param[in] di        the iterator for striped directory.
514  *
515  * \retval      key_size if successfully get the key_size.
516  * \retval      negative error if can not get the key_size.
517  */
518 static int lod_striped_it_key_size(const struct lu_env *env,
519                                    const struct dt_it *di)
520 {
521         struct lod_it           *it = (struct lod_it *)di;
522         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
523         struct dt_object        *next;
524
525         LOD_CHECK_STRIPED_IT(env, it, lo);
526
527         next = lo->ldo_stripe[it->lit_stripe_index];
528         LASSERT(next != NULL);
529         LASSERT(next->do_index_ops != NULL);
530
531         return next->do_index_ops->dio_it.key_size(env, it->lit_it);
532 }
533
534 /**
535  * Implementation of dt_index_operations:: dio_it.rec
536  *
537  * This function is to get the record at current position.
538  *
539  * \param[in] env       execution environment.
540  * \param[in] di        the iterator for striped directory.
541  * \param[in] attr      the attribute of iterator, mostly used to indicate
542  *                      the entry attribute in the object to be iterated.
543  * \param[out] rec      hold the return record.
544  *
545  * \retval      0 if successfully get the entry.
546  * \retval      negative error if can not get entry.
547  */
548 static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
549                               struct dt_rec *rec, __u32 attr)
550 {
551         const struct lod_it     *it = (const struct lod_it *)di;
552         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
553         struct dt_object        *next;
554
555         LOD_CHECK_STRIPED_IT(env, it, lo);
556
557         next = lo->ldo_stripe[it->lit_stripe_index];
558         LASSERT(next != NULL);
559         LASSERT(next->do_index_ops != NULL);
560
561         return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
562 }
563
564 /**
565  * Implementation of dt_index_operations:: dio_it.rec_size
566  *
567  * This function is to get the record_size at current record.
568  *
569  * \param[in] env       execution environment.
570  * \param[in] di        the iterator for striped directory.
571  * \param[in] attr      the attribute of iterator, mostly used to indicate
572  *                      the entry attribute in the object to be iterated.
573  *
574  * \retval      rec_size if successfully get the entry size.
575  * \retval      negative error if can not get entry size.
576  */
577 static int lod_striped_it_rec_size(const struct lu_env *env,
578                                    const struct dt_it *di, __u32 attr)
579 {
580         struct lod_it           *it = (struct lod_it *)di;
581         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
582         struct dt_object        *next;
583
584         LOD_CHECK_STRIPED_IT(env, it, lo);
585
586         next = lo->ldo_stripe[it->lit_stripe_index];
587         LASSERT(next != NULL);
588         LASSERT(next->do_index_ops != NULL);
589
590         return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
591 }
592
593 /**
594  * Implementation of dt_index_operations:: dio_it.store
595  *
596  * This function will a cookie for current position of the iterator head,
597  * so that user can use this cookie to load/start the iterator next time.
598  *
599  * \param[in] env       execution environment.
600  * \param[in] di        the iterator for striped directory.
601  *
602  * \retval      the cookie.
603  */
604 static __u64 lod_striped_it_store(const struct lu_env *env,
605                                   const struct dt_it *di)
606 {
607         const struct lod_it     *it = (const struct lod_it *)di;
608         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
609         struct dt_object        *next;
610
611         LOD_CHECK_STRIPED_IT(env, it, lo);
612
613         next = lo->ldo_stripe[it->lit_stripe_index];
614         LASSERT(next != NULL);
615         LASSERT(next->do_index_ops != NULL);
616
617         return next->do_index_ops->dio_it.store(env, it->lit_it);
618 }
619
620 /**
621  * Implementation of dt_index_operations:: dio_it.load
622  *
623  * This function will position the iterator with the given hash(usually
624  * get from store),
625  *
626  * \param[in] env       execution environment.
627  * \param[in] di        the iterator for striped directory.
628  * \param[in] hash      the given hash.
629  *
630  * \retval      >0 if successfuly load the iterator to the given position.
631  * \retval      <0 if load is failed.
632  */
633 static int lod_striped_it_load(const struct lu_env *env,
634                                const struct dt_it *di, __u64 hash)
635 {
636         const struct lod_it     *it = (const struct lod_it *)di;
637         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
638         struct dt_object        *next;
639
640         LOD_CHECK_STRIPED_IT(env, it, lo);
641
642         next = lo->ldo_stripe[it->lit_stripe_index];
643         LASSERT(next != NULL);
644         LASSERT(next->do_index_ops != NULL);
645
646         return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
647 }
648
649 static struct dt_index_operations lod_striped_index_ops = {
650         .dio_lookup             = lod_index_lookup,
651         .dio_declare_insert     = lod_declare_index_insert,
652         .dio_insert             = lod_index_insert,
653         .dio_declare_delete     = lod_declare_index_delete,
654         .dio_delete             = lod_index_delete,
655         .dio_it = {
656                 .init           = lod_striped_it_init,
657                 .fini           = lod_striped_it_fini,
658                 .get            = lod_striped_it_get,
659                 .put            = lod_striped_it_put,
660                 .next           = lod_striped_it_next,
661                 .key            = lod_striped_it_key,
662                 .key_size       = lod_striped_it_key_size,
663                 .rec            = lod_striped_it_rec,
664                 .rec_size       = lod_striped_it_rec_size,
665                 .store          = lod_striped_it_store,
666                 .load           = lod_striped_it_load,
667         }
668 };
669
670 /**
671  * Append the FID for each shard of the striped directory after the
672  * given LMV EA header.
673  *
674  * To simplify striped directory and the consistency verification,
675  * we only store the LMV EA header on disk, for both master object
676  * and slave objects. When someone wants to know the whole LMV EA,
677  * such as client readdir(), we can build the entire LMV EA on the
678  * MDT side (in RAM) via iterating the sub-directory entries that
679  * are contained in the master object of the stripe directory.
680  *
681  * For the master object of the striped directory, the valid name
682  * for each shard is composed of the ${shard_FID}:${shard_idx}.
683  *
684  * There may be holes in the LMV EA if some shards' name entries
685  * are corrupted or lost.
686  *
687  * \param[in] env       pointer to the thread context
688  * \param[in] lo        pointer to the master object of the striped directory
689  * \param[in] buf       pointer to the lu_buf which will hold the LMV EA
690  * \param[in] resize    whether re-allocate the buffer if it is not big enough
691  *
692  * \retval              positive size of the LMV EA
693  * \retval              0 for nothing to be loaded
694  * \retval              negative error number on failure
695  */
696 int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo,
697                         struct lu_buf *buf, bool resize)
698 {
699         struct lu_dirent        *ent    =
700                         (struct lu_dirent *)lod_env_info(env)->lti_key;
701         struct lod_device       *lod    = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
702         struct dt_object        *obj    = dt_object_child(&lo->ldo_obj);
703         struct lmv_mds_md_v1    *lmv1   = buf->lb_buf;
704         struct dt_it            *it;
705         const struct dt_it_ops  *iops;
706         __u32                    stripes;
707         __u32                    magic  = le32_to_cpu(lmv1->lmv_magic);
708         int                      size;
709         int                      rc;
710         ENTRY;
711
712         /* If it is not a striped directory, then load nothing. */
713         if (magic != LMV_MAGIC_V1)
714                 RETURN(0);
715
716         /* If it is in migration (or failure), then load nothing. */
717         if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
718                 RETURN(0);
719
720         stripes = le32_to_cpu(lmv1->lmv_stripe_count);
721         if (stripes < 1)
722                 RETURN(0);
723
724         size = lmv_mds_md_size(stripes, magic);
725         if (buf->lb_len < size) {
726                 struct lu_buf tbuf;
727
728                 if (!resize)
729                         RETURN(-ERANGE);
730
731                 tbuf = *buf;
732                 buf->lb_buf = NULL;
733                 buf->lb_len = 0;
734                 lu_buf_alloc(buf, size);
735                 lmv1 = buf->lb_buf;
736                 if (lmv1 == NULL)
737                         RETURN(-ENOMEM);
738
739                 memcpy(buf->lb_buf, tbuf.lb_buf, tbuf.lb_len);
740         }
741
742         if (unlikely(!dt_try_as_dir(env, obj)))
743                 RETURN(-ENOTDIR);
744
745         memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid));
746         iops = &obj->do_index_ops->dio_it;
747         it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
748         if (IS_ERR(it))
749                 RETURN(PTR_ERR(it));
750
751         rc = iops->load(env, it, 0);
752         if (rc == 0)
753                 rc = iops->next(env, it);
754         else if (rc > 0)
755                 rc = 0;
756
757         while (rc == 0) {
758                 char             name[FID_LEN + 2] = "";
759                 struct lu_fid    fid;
760                 __u32            index;
761                 int              len;
762
763                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
764                 if (rc != 0)
765                         break;
766
767                 rc = -EIO;
768
769                 fid_le_to_cpu(&fid, &ent->lde_fid);
770                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
771                 if (ent->lde_name[0] == '.') {
772                         if (ent->lde_namelen == 1)
773                                 goto next;
774
775                         if (ent->lde_namelen == 2 && ent->lde_name[1] == '.')
776                                 goto next;
777                 }
778
779                 len = snprintf(name, FID_LEN + 1, DFID":", PFID(&ent->lde_fid));
780                 /* The ent->lde_name is composed of ${FID}:${index} */
781                 if (ent->lde_namelen < len + 1 ||
782                     memcmp(ent->lde_name, name, len) != 0) {
783                         CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
784                                "%s: invalid shard name %.*s with the FID "DFID
785                                " for the striped directory "DFID", %s\n",
786                                lod2obd(lod)->obd_name, ent->lde_namelen,
787                                ent->lde_name, PFID(&fid),
788                                PFID(lu_object_fid(&obj->do_lu)),
789                                lod->lod_lmv_failout ? "failout" : "skip");
790
791                         if (lod->lod_lmv_failout)
792                                 break;
793
794                         goto next;
795                 }
796
797                 index = 0;
798                 do {
799                         if (ent->lde_name[len] < '0' ||
800                             ent->lde_name[len] > '9') {
801                                 CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
802                                        "%s: invalid shard name %.*s with the "
803                                        "FID "DFID" for the striped directory "
804                                        DFID", %s\n",
805                                        lod2obd(lod)->obd_name, ent->lde_namelen,
806                                        ent->lde_name, PFID(&fid),
807                                        PFID(lu_object_fid(&obj->do_lu)),
808                                        lod->lod_lmv_failout ?
809                                        "failout" : "skip");
810
811                                 if (lod->lod_lmv_failout)
812                                         break;
813
814                                 goto next;
815                         }
816
817                         index = index * 10 + ent->lde_name[len++] - '0';
818                 } while (len < ent->lde_namelen);
819
820                 if (len == ent->lde_namelen) {
821                         /* Out of LMV EA range. */
822                         if (index >= stripes) {
823                                 CERROR("%s: the shard %.*s for the striped "
824                                        "directory "DFID" is out of the known "
825                                        "LMV EA range [0 - %u], failout\n",
826                                        lod2obd(lod)->obd_name, ent->lde_namelen,
827                                        ent->lde_name,
828                                        PFID(lu_object_fid(&obj->do_lu)),
829                                        stripes - 1);
830
831                                 break;
832                         }
833
834                         /* The slot has been occupied. */
835                         if (!fid_is_zero(&lmv1->lmv_stripe_fids[index])) {
836                                 struct lu_fid fid0;
837
838                                 fid_le_to_cpu(&fid0,
839                                         &lmv1->lmv_stripe_fids[index]);
840                                 CERROR("%s: both the shard "DFID" and "DFID
841                                        " for the striped directory "DFID
842                                        " claim the same LMV EA slot at the "
843                                        "index %d, failout\n",
844                                        lod2obd(lod)->obd_name,
845                                        PFID(&fid0), PFID(&fid),
846                                        PFID(lu_object_fid(&obj->do_lu)), index);
847
848                                 break;
849                         }
850
851                         /* stored as LE mode */
852                         lmv1->lmv_stripe_fids[index] = ent->lde_fid;
853
854 next:
855                         rc = iops->next(env, it);
856                 }
857         }
858
859         iops->put(env, it);
860         iops->fini(env, it);
861
862         RETURN(rc > 0 ? lmv_mds_md_size(stripes, magic) : rc);
863 }
864
865 /**
866  * Implementation of dt_object_operations:: do_index_try
867  *
868  * This function will try to initialize the index api pointer for the
869  * given object, usually it the entry point of the index api. i.e.
870  * the index object should be initialized in index_try, then start
871  * using index api. For striped directory, it will try to initialize
872  * all of its sub_stripes.
873  *
874  * \param[in] env       execution environment.
875  * \param[in] dt        the index object to be initialized.
876  * \param[in] feat      the features of this object, for example fixed or
877  *                      variable key size etc.
878  *
879  * \retval      >0 if the initialization is successful.
880  * \retval      <0 if the initialization is failed.
881  */
882 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
883                          const struct dt_index_features *feat)
884 {
885         struct lod_object       *lo = lod_dt_obj(dt);
886         struct dt_object        *next = dt_object_child(dt);
887         int                     rc;
888         ENTRY;
889
890         LASSERT(next->do_ops);
891         LASSERT(next->do_ops->do_index_try);
892
893         rc = lod_load_striping_locked(env, lo);
894         if (rc != 0)
895                 RETURN(rc);
896
897         rc = next->do_ops->do_index_try(env, next, feat);
898         if (rc != 0)
899                 RETURN(rc);
900
901         if (lo->ldo_stripenr > 0) {
902                 int i;
903
904                 for (i = 0; i < lo->ldo_stripenr; i++) {
905                         if (dt_object_exists(lo->ldo_stripe[i]) == 0)
906                                 continue;
907                         rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
908                                                 lo->ldo_stripe[i], feat);
909                         if (rc != 0)
910                                 RETURN(rc);
911                 }
912                 dt->do_index_ops = &lod_striped_index_ops;
913         } else {
914                 dt->do_index_ops = &lod_index_ops;
915         }
916
917         RETURN(rc);
918 }
919
/**
 * Take the read lock of the object below LOD.
 *
 * LOD keeps no lock of its own here; the request is forwarded to the
 * child of \a dt together with \a role.
 */
static void lod_object_read_lock(const struct lu_env *env,
				 struct dt_object *dt, unsigned role)
{
	struct dt_object *child = dt_object_child(dt);

	dt_read_lock(env, child, role);
}
925
/**
 * Take the write lock of the object below LOD.
 *
 * The request is forwarded to the child of \a dt together with \a role.
 */
static void lod_object_write_lock(const struct lu_env *env,
				  struct dt_object *dt, unsigned role)
{
	struct dt_object *child = dt_object_child(dt);

	dt_write_lock(env, child, role);
}
931
/**
 * Drop the read lock of the object below LOD by forwarding the request
 * to the child of \a dt.
 */
static void lod_object_read_unlock(const struct lu_env *env,
				   struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	dt_read_unlock(env, child);
}
937
/**
 * Drop the write lock of the object below LOD by forwarding the request
 * to the child of \a dt.
 */
static void lod_object_write_unlock(const struct lu_env *env,
				    struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	dt_write_unlock(env, child);
}
943
/**
 * Report the write-lock state of the object below LOD.
 *
 * \retval		whatever dt_write_locked() returns for the child
 *			of \a dt
 */
static int lod_object_write_locked(const struct lu_env *env,
				   struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_write_locked(env, child);
}
949
/**
 * Fetch the attributes of an object from the layer below LOD.
 *
 * Nothing is merged from the stripes here: for a striped directory the
 * client merges the attributes of all sub-stripes itself (see
 * lmv_merge_attr()), and no MDD logic depends on the directory
 * nlink/size/time, so the nlink and size of the master inode are
 * sufficient for now.
 *
 * \retval		whatever dt_attr_get() returns for the child
 *			of \a dt
 */
static int lod_attr_get(const struct lu_env *env,
			struct dt_object *dt,
			struct lu_attr *attr,
			struct lustre_capa *capa)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_attr_get(env, child, attr, capa);
}
961
962 /**
963  * Mark all of sub-stripes dead of the striped directory.
964  **/
965 static int lod_mark_dead_object(const struct lu_env *env,
966                                 struct dt_object *dt,
967                                 struct thandle *handle,
968                                 bool declare)
969 {
970         struct lod_object       *lo = lod_dt_obj(dt);
971         struct lmv_mds_md_v1    *lmv;
972         __u32                   dead_hash_type;
973         int                     rc;
974         int                     i;
975
976         ENTRY;
977
978         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
979                 RETURN(0);
980
981         rc = lod_load_striping_locked(env, lo);
982         if (rc != 0)
983                 RETURN(rc);
984
985         if (lo->ldo_stripenr == 0)
986                 RETURN(0);
987
988         rc = lod_get_lmv_ea(env, lo);
989         if (rc <= 0)
990                 RETURN(rc);
991
992         lmv = lod_env_info(env)->lti_ea_store;
993         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
994         dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD;
995         lmv->lmv_hash_type = cpu_to_le32(dead_hash_type);
996         for (i = 0; i < lo->ldo_stripenr; i++) {
997                 struct lu_buf buf;
998
999                 lmv->lmv_master_mdt_index = i;
1000                 buf.lb_buf = lmv;
1001                 buf.lb_len = sizeof(*lmv);
1002                 if (declare) {
1003                         rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf,
1004                                                   XATTR_NAME_LMV,
1005                                                   LU_XATTR_REPLACE, handle);
1006                 } else {
1007                         rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf,
1008                                           XATTR_NAME_LMV, LU_XATTR_REPLACE,
1009                                           handle, BYPASS_CAPA);
1010                 }
1011                 if (rc != 0)
1012                         break;
1013         }
1014
1015         RETURN(rc);
1016 }
1017
1018 static int lod_declare_attr_set(const struct lu_env *env,
1019                                 struct dt_object *dt,
1020                                 const struct lu_attr *attr,
1021                                 struct thandle *handle)
1022 {
1023         struct dt_object  *next = dt_object_child(dt);
1024         struct lod_object *lo = lod_dt_obj(dt);
1025         int                rc, i;
1026         ENTRY;
1027
1028         /* Set dead object on all other stripes */
1029         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
1030             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
1031                 rc = lod_mark_dead_object(env, dt, handle, true);
1032                 RETURN(rc);
1033         }
1034
1035         /*
1036          * declare setattr on the local object
1037          */
1038         rc = dt_declare_attr_set(env, next, attr, handle);
1039         if (rc)
1040                 RETURN(rc);
1041
1042         /* osp_declare_attr_set() ignores all attributes other than
1043          * UID, GID, and size, and osp_attr_set() ignores all but UID
1044          * and GID.  Declaration of size attr setting happens through
1045          * lod_declare_init_size(), and not through this function.
1046          * Therefore we need not load striping unless ownership is
1047          * changing.  This should save memory and (we hope) speed up
1048          * rename(). */
1049         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
1050                 if (!(attr->la_valid & (LA_UID | LA_GID)))
1051                         RETURN(rc);
1052
1053                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
1054                         RETURN(0);
1055         } else {
1056                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
1057                                         LA_ATIME | LA_MTIME | LA_CTIME)))
1058                         RETURN(rc);
1059         }
1060         /*
1061          * load striping information, notice we don't do this when object
1062          * is being initialized as we don't need this information till
1063          * few specific cases like destroy, chown
1064          */
1065         rc = lod_load_striping(env, lo);
1066         if (rc)
1067                 RETURN(rc);
1068
1069         if (lo->ldo_stripenr == 0)
1070                 RETURN(0);
1071
1072         /*
1073          * if object is striped declare changes on the stripes
1074          */
1075         LASSERT(lo->ldo_stripe);
1076         for (i = 0; i < lo->ldo_stripenr; i++) {
1077                 if (likely(lo->ldo_stripe[i] != NULL)) {
1078                         rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr,
1079                                                  handle);
1080                         if (rc != 0) {
1081                                 CERROR("failed declaration: %d\n", rc);
1082                                 break;
1083                         }
1084                 }
1085         }
1086
1087         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1088             dt_object_exists(next) != 0 &&
1089             dt_object_remote(next) == 0)
1090                 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
1091
1092         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1093             dt_object_exists(next) &&
1094             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1095                 struct lod_thread_info *info = lod_env_info(env);
1096                 struct lu_buf *buf = &info->lti_buf;
1097
1098                 buf->lb_buf = info->lti_ea_store;
1099                 buf->lb_len = info->lti_ea_store_size;
1100                 dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
1101                                      LU_XATTR_REPLACE, handle);
1102         }
1103
1104         RETURN(rc);
1105 }
1106
1107 static int lod_attr_set(const struct lu_env *env,
1108                         struct dt_object *dt,
1109                         const struct lu_attr *attr,
1110                         struct thandle *handle,
1111                         struct lustre_capa *capa)
1112 {
1113         struct dt_object        *next = dt_object_child(dt);
1114         struct lod_object       *lo = lod_dt_obj(dt);
1115         int                     rc, i;
1116         ENTRY;
1117
1118         /* Set dead object on all other stripes */
1119         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
1120             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
1121                 rc = lod_mark_dead_object(env, dt, handle, false);
1122                 RETURN(rc);
1123         }
1124
1125         /*
1126          * apply changes to the local object
1127          */
1128         rc = dt_attr_set(env, next, attr, handle, capa);
1129         if (rc)
1130                 RETURN(rc);
1131
1132         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
1133                 if (!(attr->la_valid & (LA_UID | LA_GID)))
1134                         RETURN(rc);
1135
1136                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
1137                         RETURN(0);
1138         } else {
1139                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
1140                                         LA_ATIME | LA_MTIME | LA_CTIME)))
1141                         RETURN(rc);
1142         }
1143
1144         if (lo->ldo_stripenr == 0)
1145                 RETURN(0);
1146
1147         /*
1148          * if object is striped, apply changes to all the stripes
1149          */
1150         LASSERT(lo->ldo_stripe);
1151         for (i = 0; i < lo->ldo_stripenr; i++) {
1152                 if (likely(lo->ldo_stripe[i] != NULL)) {
1153                         if (dt_object_exists(lo->ldo_stripe[i]) == 0)
1154                                 continue;
1155
1156                         rc = dt_attr_set(env, lo->ldo_stripe[i], attr,
1157                                          handle, capa);
1158                         if (rc != 0) {
1159                                 CERROR("failed declaration: %d\n", rc);
1160                                 break;
1161                         }
1162                 }
1163         }
1164
1165         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1166             dt_object_exists(next) != 0 &&
1167             dt_object_remote(next) == 0)
1168                 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
1169
1170         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1171             dt_object_exists(next) &&
1172             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1173                 struct lod_thread_info *info = lod_env_info(env);
1174                 struct lu_buf *buf = &info->lti_buf;
1175                 struct ost_id *oi = &info->lti_ostid;
1176                 struct lu_fid *fid = &info->lti_fid;
1177                 struct lov_mds_md_v1 *lmm;
1178                 struct lov_ost_data_v1 *objs;
1179                 __u32 magic;
1180                 int rc1;
1181
1182                 rc1 = lod_get_lov_ea(env, lo);
1183                 if (rc1  <= 0)
1184                         RETURN(rc);
1185
1186                 buf->lb_buf = info->lti_ea_store;
1187                 buf->lb_len = info->lti_ea_store_size;
1188                 lmm = info->lti_ea_store;
1189                 magic = le32_to_cpu(lmm->lmm_magic);
1190                 if (magic == LOV_MAGIC_V1)
1191                         objs = &(lmm->lmm_objects[0]);
1192                 else
1193                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1194                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
1195                 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
1196                 fid->f_oid--;
1197                 fid_to_ostid(fid, oi);
1198                 ostid_cpu_to_le(oi, &objs->l_ost_oi);
1199                 dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
1200                              LU_XATTR_REPLACE, handle, BYPASS_CAPA);
1201         }
1202
1203         RETURN(rc);
1204 }
1205
1206 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
1207                          struct lu_buf *buf, const char *name,
1208                          struct lustre_capa *capa)
1209 {
1210         struct lod_thread_info  *info = lod_env_info(env);
1211         struct lod_device       *dev = lu2lod_dev(dt->do_lu.lo_dev);
1212         int                      rc, is_root;
1213         ENTRY;
1214
1215         rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
1216         if (strcmp(name, XATTR_NAME_LMV) == 0) {
1217                 struct lmv_mds_md_v1    *lmv1;
1218                 int                      rc1 = 0;
1219
1220                 if (rc > sizeof(*lmv1))
1221                         RETURN(rc);
1222
1223                 if (rc < sizeof(*lmv1))
1224                         RETURN(rc = rc > 0 ? -EINVAL : rc);
1225
1226                 if (buf->lb_buf == NULL || buf->lb_len == 0) {
1227                         CLASSERT(sizeof(*lmv1) <= sizeof(info->lti_key));
1228
1229                         info->lti_buf.lb_buf = info->lti_key;
1230                         info->lti_buf.lb_len = sizeof(*lmv1);
1231                         rc = dt_xattr_get(env, dt_object_child(dt),
1232                                           &info->lti_buf, name, capa);
1233                         if (unlikely(rc != sizeof(*lmv1)))
1234                                 RETURN(rc = rc > 0 ? -EINVAL : rc);
1235
1236                         lmv1 = info->lti_buf.lb_buf;
1237                         /* The on-disk LMV EA only contains header, but the
1238                          * returned LMV EA size should contain the space for
1239                          * the FIDs of all shards of the striped directory. */
1240                         if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1)
1241                                 rc = lmv_mds_md_size(
1242                                         le32_to_cpu(lmv1->lmv_stripe_count),
1243                                         LMV_MAGIC_V1);
1244                 } else {
1245                         rc1 = lod_load_lmv_shards(env, lod_dt_obj(dt),
1246                                                   buf, false);
1247                 }
1248
1249                 RETURN(rc = rc1 != 0 ? rc1 : rc);
1250         }
1251
1252         if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
1253                 RETURN(rc);
1254
1255         /*
1256          * lod returns default striping on the real root of the device
1257          * this is like the root stores default striping for the whole
1258          * filesystem. historically we've been using a different approach
1259          * and store it in the config.
1260          */
1261         dt_root_get(env, dev->lod_child, &info->lti_fid);
1262         is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
1263
1264         if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
1265                 struct lov_user_md *lum = buf->lb_buf;
1266                 struct lov_desc    *desc = &dev->lod_desc;
1267
1268                 if (buf->lb_buf == NULL) {
1269                         rc = sizeof(*lum);
1270                 } else if (buf->lb_len >= sizeof(*lum)) {
1271                         lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
1272                         lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
1273                         lmm_oi_set_id(&lum->lmm_oi, 0);
1274                         lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
1275                         lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
1276                         lum->lmm_stripe_size = cpu_to_le32(
1277                                                 desc->ld_default_stripe_size);
1278                         lum->lmm_stripe_count = cpu_to_le16(
1279                                                 desc->ld_default_stripe_count);
1280                         lum->lmm_stripe_offset = cpu_to_le16(
1281                                                 desc->ld_default_stripe_offset);
1282                         rc = sizeof(*lum);
1283                 } else {
1284                         rc = -ERANGE;
1285                 }
1286         }
1287
1288         RETURN(rc);
1289 }
1290
1291 static int lod_verify_md_striping(struct lod_device *lod,
1292                                   const struct lmv_user_md_v1 *lum)
1293 {
1294         int     rc = 0;
1295         ENTRY;
1296
1297         if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
1298                 GOTO(out, rc = -EINVAL);
1299
1300         if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
1301                 GOTO(out, rc = -EINVAL);
1302 out:
1303         if (rc != 0)
1304                 CERROR("%s: invalid lmv_user_md: magic = %x, "
1305                        "stripe_offset = %d, stripe_count = %u: rc = %d\n",
1306                        lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
1307                        (int)le32_to_cpu(lum->lum_stripe_offset),
1308                        le32_to_cpu(lum->lum_stripe_count), rc);
1309         return rc;
1310 }
1311
1312 /**
1313  * Master LMVEA will be same as slave LMVEA, except
1314  * 1. different magic
1315  * 2. lmv_master_mdt_index on slave LMV EA will be stripe_index.
1316  */
1317 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
1318                                   const struct lmv_mds_md_v1 *master_lmv)
1319 {
1320         *slave_lmv = *master_lmv;
1321         slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
1322 }
1323
1324 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
1325                     struct lu_buf *lmv_buf)
1326 {
1327         struct lod_thread_info  *info = lod_env_info(env);
1328         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1329         struct lod_object       *lo = lod_dt_obj(dt);
1330         struct lmv_mds_md_v1    *lmm1;
1331         int                     stripe_count;
1332         int                     type = LU_SEQ_RANGE_ANY;
1333         int                     rc;
1334         __u32                   mdtidx;
1335         ENTRY;
1336
1337         LASSERT(lo->ldo_dir_striped != 0);
1338         LASSERT(lo->ldo_stripenr > 0);
1339         stripe_count = lo->ldo_stripenr;
1340         /* Only store the LMV EA heahder on the disk. */
1341         if (info->lti_ea_store_size < sizeof(*lmm1)) {
1342                 rc = lod_ea_store_resize(info, sizeof(*lmm1));
1343                 if (rc != 0)
1344                         RETURN(rc);
1345         } else {
1346                 memset(info->lti_ea_store, 0, sizeof(*lmm1));
1347         }
1348
1349         lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
1350         lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
1351         lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
1352         lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
1353         rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
1354                             &mdtidx, &type);
1355         if (rc != 0)
1356                 RETURN(rc);
1357
1358         lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
1359         lmv_buf->lb_buf = info->lti_ea_store;
1360         lmv_buf->lb_len = sizeof(*lmm1);
1361         lo->ldo_dir_striping_cached = 1;
1362
1363         RETURN(rc);
1364 }
1365
1366 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
1367                            const struct lu_buf *buf)
1368 {
1369         struct lod_thread_info  *info = lod_env_info(env);
1370         struct lod_device       *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1371         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1372         struct dt_object        **stripe;
1373         union lmv_mds_md        *lmm = buf->lb_buf;
1374         struct lmv_mds_md_v1    *lmv1 = &lmm->lmv_md_v1;
1375         struct lu_fid           *fid = &info->lti_fid;
1376         int                     i;
1377         int                     rc = 0;
1378         ENTRY;
1379
1380         if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
1381                 RETURN(0);
1382
1383         if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
1384                 lo->ldo_dir_slave_stripe = 1;
1385                 RETURN(0);
1386         }
1387
1388         if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
1389                 RETURN(-EINVAL);
1390
1391         if (le32_to_cpu(lmv1->lmv_stripe_count) < 1)
1392                 RETURN(0);
1393
1394         LASSERT(lo->ldo_stripe == NULL);
1395         OBD_ALLOC(stripe, sizeof(stripe[0]) *
1396                   (le32_to_cpu(lmv1->lmv_stripe_count)));
1397         if (stripe == NULL)
1398                 RETURN(-ENOMEM);
1399
1400         for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
1401                 struct dt_device        *tgt_dt;
1402                 struct dt_object        *dto;
1403                 int                     type = LU_SEQ_RANGE_ANY;
1404                 __u32                   idx;
1405
1406                 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
1407                 if (!fid_is_sane(fid))
1408                         GOTO(out, rc = -ESTALE);
1409
1410                 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
1411                 if (rc != 0)
1412                         GOTO(out, rc);
1413
1414                 if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
1415                         tgt_dt = lod->lod_child;
1416                 } else {
1417                         struct lod_tgt_desc     *tgt;
1418
1419                         tgt = LTD_TGT(ltd, idx);
1420                         if (tgt == NULL)
1421                                 GOTO(out, rc = -ESTALE);
1422                         tgt_dt = tgt->ltd_tgt;
1423                 }
1424
1425                 dto = dt_locate_at(env, tgt_dt, fid,
1426                                   lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1427                                   NULL);
1428                 if (IS_ERR(dto))
1429                         GOTO(out, rc = PTR_ERR(dto));
1430
1431                 stripe[i] = dto;
1432         }
1433 out:
1434         lo->ldo_stripe = stripe;
1435         lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count);
1436         lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
1437         if (rc != 0)
1438                 lod_object_free_striping(env, lo);
1439
1440         RETURN(rc);
1441 }
1442
1443 static int lod_prep_md_striped_create(const struct lu_env *env,
1444                                       struct dt_object *dt,
1445                                       struct lu_attr *attr,
1446                                       const struct lmv_user_md_v1 *lum,
1447                                       struct dt_object_format *dof,
1448                                       struct thandle *th)
1449 {
1450         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1451         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1452         struct lod_object       *lo = lod_dt_obj(dt);
1453         struct lod_thread_info  *info = lod_env_info(env);
1454         struct dt_object        **stripe;
1455         struct lu_buf           lmv_buf;
1456         struct lu_buf           slave_lmv_buf;
1457         struct lmv_mds_md_v1    *lmm;
1458         struct lmv_mds_md_v1    *slave_lmm = NULL;
1459         struct dt_insert_rec    *rec = &info->lti_dt_rec;
1460         int                     stripe_count;
1461         int                     *idx_array;
1462         int                     rc = 0;
1463         int                     i;
1464         int                     j;
1465         ENTRY;
1466
1467         /* The lum has been verifed in lod_verify_md_striping */
1468         LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
1469         LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
1470
1471         stripe_count = le32_to_cpu(lum->lum_stripe_count);
1472
1473         /* shrink the stripe_count to the avaible MDT count */
1474         if (stripe_count > lod->lod_remote_mdt_count + 1)
1475                 stripe_count = lod->lod_remote_mdt_count + 1;
1476
1477         OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
1478         if (stripe == NULL)
1479                 RETURN(-ENOMEM);
1480
1481         OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
1482         if (idx_array == NULL)
1483                 GOTO(out_free, rc = -ENOMEM);
1484
1485         for (i = 0; i < stripe_count; i++) {
1486                 struct lod_tgt_desc     *tgt = NULL;
1487                 struct dt_object        *dto;
1488                 struct lu_fid           fid = { 0 };
1489                 int                     idx;
1490                 struct lu_object_conf   conf = { 0 };
1491                 struct dt_device        *tgt_dt = NULL;
1492
1493                 if (i == 0) {
1494                         /* Right now, master stripe and master object are
1495                          * on the same MDT */
1496                         idx = le32_to_cpu(lum->lum_stripe_offset);
1497                         rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
1498                                            NULL);
1499                         if (rc < 0)
1500                                 GOTO(out_put, rc);
1501                         tgt_dt = lod->lod_child;
1502                         goto next;
1503                 }
1504
1505                 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
1506
1507                 for (j = 0; j < lod->lod_remote_mdt_count;
1508                      j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
1509                         bool already_allocated = false;
1510                         int k;
1511
1512                         CDEBUG(D_INFO, "try idx %d, mdt cnt %d,"
1513                                " allocated %d, last allocated %d\n", idx,
1514                                lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1515
1516                         /* Find next available target */
1517                         if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
1518                                 continue;
1519
1520                         /* check whether the idx already exists
1521                          * in current allocated array */
1522                         for (k = 0; k < i; k++) {
1523                                 if (idx_array[k] == idx) {
1524                                         already_allocated = true;
1525                                         break;
1526                                 }
1527                         }
1528
1529                         if (already_allocated)
1530                                 continue;
1531
1532                         /* check the status of the OSP */
1533                         tgt = LTD_TGT(ltd, idx);
1534                         if (tgt == NULL)
1535                                 continue;
1536
1537                         tgt_dt = tgt->ltd_tgt;
1538                         rc = dt_statfs(env, tgt_dt, NULL);
1539                         if (rc) {
1540                                 /* this OSP doesn't feel well */
1541                                 rc = 0;
1542                                 continue;
1543                         }
1544
1545                         rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
1546                         if (rc < 0) {
1547                                 rc = 0;
1548                                 continue;
1549                         }
1550
1551                         break;
1552                 }
1553
1554                 /* Can not allocate more stripes */
1555                 if (j == lod->lod_remote_mdt_count) {
1556                         CDEBUG(D_INFO, "%s: require stripes %d only get %d\n",
1557                                lod2obd(lod)->obd_name, stripe_count, i - 1);
1558                         break;
1559                 }
1560
1561                 CDEBUG(D_INFO, "idx %d, mdt cnt %d,"
1562                        " allocated %d, last allocated %d\n", idx,
1563                        lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1564
1565 next:
1566                 /* tgt_dt and fid must be ready after search avaible OSP
1567                  * in the above loop */
1568                 LASSERT(tgt_dt != NULL);
1569                 LASSERT(fid_is_sane(&fid));
1570                 conf.loc_flags = LOC_F_NEW;
1571                 dto = dt_locate_at(env, tgt_dt, &fid,
1572                                    dt->do_lu.lo_dev->ld_site->ls_top_dev,
1573                                    &conf);
1574                 if (IS_ERR(dto))
1575                         GOTO(out_put, rc = PTR_ERR(dto));
1576                 stripe[i] = dto;
1577                 idx_array[i] = idx;
1578         }
1579
1580         lo->ldo_dir_striped = 1;
1581         lo->ldo_stripe = stripe;
1582         lo->ldo_stripenr = i;
1583         lo->ldo_stripes_allocated = stripe_count;
1584
1585         if (lo->ldo_stripenr == 0)
1586                 GOTO(out_put, rc = -ENOSPC);
1587
1588         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1589         if (rc != 0)
1590                 GOTO(out_put, rc);
1591         lmm = lmv_buf.lb_buf;
1592
1593         OBD_ALLOC_PTR(slave_lmm);
1594         if (slave_lmm == NULL)
1595                 GOTO(out_put, rc = -ENOMEM);
1596
1597         lod_prep_slave_lmv_md(slave_lmm, lmm);
1598         slave_lmv_buf.lb_buf = slave_lmm;
1599         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1600
1601         if (!dt_try_as_dir(env, dt_object_child(dt)))
1602                 GOTO(out_put, rc = -EINVAL);
1603
1604         rec->rec_type = S_IFDIR;
1605         for (i = 0; i < lo->ldo_stripenr; i++) {
1606                 struct dt_object        *dto            = stripe[i];
1607                 char                    *stripe_name    = info->lti_key;
1608                 struct lu_name          *sname;
1609                 struct linkea_data       ldata          = { 0 };
1610                 struct lu_buf            linkea_buf;
1611
1612                 rc = dt_declare_create(env, dto, attr, NULL, dof, th);
1613                 if (rc != 0)
1614                         GOTO(out_put, rc);
1615
1616                 if (!dt_try_as_dir(env, dto))
1617                         GOTO(out_put, rc = -EINVAL);
1618
1619                 rec->rec_fid = lu_object_fid(&dto->do_lu);
1620                 rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
1621                                        (const struct dt_key *)dot, th);
1622                 if (rc != 0)
1623                         GOTO(out_put, rc);
1624
1625                 /* master stripe FID will be put to .. */
1626                 rec->rec_fid = lu_object_fid(&dt->do_lu);
1627                 rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
1628                                        (const struct dt_key *)dotdot, th);
1629                 if (rc != 0)
1630                         GOTO(out_put, rc);
1631
1632                 /* probably nothing to inherite */
1633                 if (lo->ldo_striping_cached &&
1634                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1635                                          lo->ldo_def_stripenr,
1636                                          lo->ldo_def_stripe_offset)) {
1637                         struct lov_user_md_v3   *v3;
1638
1639                         /* sigh, lti_ea_store has been used for lmv_buf,
1640                          * so we have to allocate buffer for default
1641                          * stripe EA */
1642                         OBD_ALLOC_PTR(v3);
1643                         if (v3 == NULL)
1644                                 GOTO(out_put, rc = -ENOMEM);
1645
1646                         memset(v3, 0, sizeof(*v3));
1647                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1648                         v3->lmm_stripe_count =
1649                                 cpu_to_le16(lo->ldo_def_stripenr);
1650                         v3->lmm_stripe_offset =
1651                                 cpu_to_le16(lo->ldo_def_stripe_offset);
1652                         v3->lmm_stripe_size =
1653                                 cpu_to_le32(lo->ldo_def_stripe_size);
1654                         if (lo->ldo_pool != NULL)
1655                                 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
1656                                         sizeof(v3->lmm_pool_name));
1657
1658                         info->lti_buf.lb_buf = v3;
1659                         info->lti_buf.lb_len = sizeof(*v3);
1660                         rc = dt_declare_xattr_set(env, dto,
1661                                                   &info->lti_buf,
1662                                                   XATTR_NAME_LOV,
1663                                                   0, th);
1664                         OBD_FREE_PTR(v3);
1665                         if (rc != 0)
1666                                 GOTO(out_put, rc);
1667                 }
1668
1669                 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1670                 rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf,
1671                                           XATTR_NAME_LMV, 0, th);
1672                 if (rc != 0)
1673                         GOTO(out_put, rc);
1674
1675                 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1676                         PFID(lu_object_fid(&dto->do_lu)), i);
1677
1678                 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
1679                 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
1680                 if (rc != 0)
1681                         GOTO(out_put, rc);
1682
1683                 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
1684                 if (rc != 0)
1685                         GOTO(out_put, rc);
1686
1687                 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1688                 linkea_buf.lb_len = ldata.ld_leh->leh_len;
1689                 rc = dt_declare_xattr_set(env, dto, &linkea_buf,
1690                                           XATTR_NAME_LINK, 0, th);
1691                 if (rc != 0)
1692                         GOTO(out_put, rc);
1693
1694                 rec->rec_fid = lu_object_fid(&dto->do_lu);
1695                 rc = dt_declare_insert(env, dt_object_child(dt),
1696                                        (const struct dt_rec *)rec,
1697                                        (const struct dt_key *)stripe_name, th);
1698                 if (rc != 0)
1699                         GOTO(out_put, rc);
1700
1701                 rc = dt_declare_ref_add(env, dt_object_child(dt), th);
1702                 if (rc != 0)
1703                         GOTO(out_put, rc);
1704         }
1705
1706         rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
1707                                   XATTR_NAME_LMV, 0, th);
1708         if (rc != 0)
1709                 GOTO(out_put, rc);
1710
1711 out_put:
1712         if (rc < 0) {
1713                 for (i = 0; i < stripe_count; i++)
1714                         if (stripe[i] != NULL)
1715                                 lu_object_put(env, &stripe[i]->do_lu);
1716                 OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count);
1717                 lo->ldo_stripenr = 0;
1718                 lo->ldo_stripes_allocated = 0;
1719                 lo->ldo_stripe = NULL;
1720         }
1721
1722 out_free:
1723         if (idx_array != NULL)
1724                 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
1725         if (slave_lmm != NULL)
1726                 OBD_FREE_PTR(slave_lmm);
1727
1728         RETURN(rc);
1729 }
1730
1731 /**
1732  * Declare create striped md object.
1733  */
1734 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
1735                                      struct dt_object *dt,
1736                                      struct lu_attr *attr,
1737                                      const struct lu_buf *lum_buf,
1738                                      struct dt_object_format *dof,
1739                                      struct thandle *th)
1740 {
1741         struct lod_object       *lo = lod_dt_obj(dt);
1742         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1743         struct lmv_user_md_v1   *lum;
1744         int                     rc;
1745         ENTRY;
1746
1747         lum = lum_buf->lb_buf;
1748         LASSERT(lum != NULL);
1749
1750         CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
1751                le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
1752                (int)le32_to_cpu(lum->lum_stripe_offset));
1753
1754         if (le32_to_cpu(lum->lum_stripe_count) == 0)
1755                 GOTO(out, rc = 0);
1756
1757         rc = lod_verify_md_striping(lod, lum);
1758         if (rc != 0)
1759                 GOTO(out, rc);
1760
1761         /* prepare dir striped objects */
1762         rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
1763         if (rc != 0) {
1764                 /* failed to create striping, let's reset
1765                  * config so that others don't get confused */
1766                 lod_object_free_striping(env, lo);
1767                 GOTO(out, rc);
1768         }
1769 out:
1770         RETURN(rc);
1771 }
1772
1773 static int lod_dir_declare_xattr_set(const struct lu_env *env,
1774                                      struct dt_object *dt,
1775                                      const struct lu_buf *buf,
1776                                      const char *name, int fl,
1777                                      struct thandle *th)
1778 {
1779         struct dt_object        *next = dt_object_child(dt);
1780         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
1781         struct lod_object       *lo = lod_dt_obj(dt);
1782         int                     i;
1783         int                     rc;
1784         ENTRY;
1785
1786         if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1787                 struct lmv_user_md_v1 *lum;
1788
1789                 LASSERT(buf != NULL && buf->lb_buf != NULL);
1790                 lum = buf->lb_buf;
1791                 rc = lod_verify_md_striping(d, lum);
1792                 if (rc != 0)
1793                         RETURN(rc);
1794         }
1795
1796         rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1797         if (rc != 0)
1798                 RETURN(rc);
1799
1800         /* set xattr to each stripes, if needed */
1801         rc = lod_load_striping(env, lo);
1802         if (rc != 0)
1803                 RETURN(rc);
1804
1805         /* Note: Do not set LinkEA on sub-stripes, otherwise
1806          * it will confuse the fid2path process(see mdt_path_current()).
1807          * The linkEA between master and sub-stripes is set in
1808          * lod_xattr_set_lmv(). */
1809         if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
1810                 RETURN(0);
1811
1812         for (i = 0; i < lo->ldo_stripenr; i++) {
1813                 LASSERT(lo->ldo_stripe[i]);
1814                 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
1815                                           name, fl, th);
1816                 if (rc != 0)
1817                         break;
1818         }
1819
1820         RETURN(rc);
1821 }
1822
1823 /*
1824  * LOV xattr is a storage for striping, and LOD owns this xattr.
1825  * but LOD allows others to control striping to some extent
1826  * - to reset strping
1827  * - to set new defined striping
1828  * - to set new semi-defined striping
1829  *   - number of stripes is defined
1830  *   - number of stripes + osts are defined
1831  *   - ??
1832  */
1833 static int lod_declare_xattr_set(const struct lu_env *env,
1834                                  struct dt_object *dt,
1835                                  const struct lu_buf *buf,
1836                                  const char *name, int fl,
1837                                  struct thandle *th)
1838 {
1839         struct dt_object *next = dt_object_child(dt);
1840         struct lu_attr   *attr = &lod_env_info(env)->lti_attr;
1841         __u32             mode;
1842         int               rc;
1843         ENTRY;
1844
1845         /*
1846          * allow to declare predefined striping on a new (!mode) object
1847          * which is supposed to be replay of regular file creation
1848          * (when LOV setting is declared)
1849          * LU_XATTR_REPLACE is set to indicate a layout swap
1850          */
1851         mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
1852         if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
1853              !(fl & LU_XATTR_REPLACE)) {
1854                 /*
1855                  * this is a request to manipulate object's striping
1856                  */
1857                 if (dt_object_exists(dt)) {
1858                         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1859                         if (rc)
1860                                 RETURN(rc);
1861                 } else {
1862                         memset(attr, 0, sizeof(*attr));
1863                         attr->la_valid = LA_TYPE | LA_MODE;
1864                         attr->la_mode = S_IFREG;
1865                 }
1866                 rc = lod_declare_striped_object(env, dt, attr, buf, th);
1867         } else if (S_ISDIR(mode)) {
1868                 rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
1869         } else {
1870                 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1871         }
1872
1873         RETURN(rc);
1874 }
1875
1876 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
1877 {
1878         lo->ldo_striping_cached = 0;
1879         lo->ldo_def_striping_set = 0;
1880         lod_object_set_pool(lo, NULL);
1881         lo->ldo_def_stripe_size = 0;
1882         lo->ldo_def_stripenr = 0;
1883         if (lo->ldo_dir_stripe != NULL)
1884                 lo->ldo_dir_striping_cached = 0;
1885 }
1886
1887 static int lod_xattr_set_internal(const struct lu_env *env,
1888                                   struct dt_object *dt,
1889                                   const struct lu_buf *buf,
1890                                   const char *name, int fl, struct thandle *th,
1891                                   struct lustre_capa *capa)
1892 {
1893         struct dt_object        *next = dt_object_child(dt);
1894         struct lod_object       *lo = lod_dt_obj(dt);
1895         int                     rc;
1896         int                     i;
1897         ENTRY;
1898
1899         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1900         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1901                 RETURN(rc);
1902
1903         /* Note: Do not set LinkEA on sub-stripes, otherwise
1904          * it will confuse the fid2path process(see mdt_path_current()).
1905          * The linkEA between master and sub-stripes is set in
1906          * lod_xattr_set_lmv(). */
1907         if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
1908                 RETURN(0);
1909
1910         for (i = 0; i < lo->ldo_stripenr; i++) {
1911                 LASSERT(lo->ldo_stripe[i]);
1912                 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
1913                                   capa);
1914                 if (rc != 0)
1915                         break;
1916         }
1917
1918         RETURN(rc);
1919 }
1920
1921 static int lod_xattr_del_internal(const struct lu_env *env,
1922                                   struct dt_object *dt,
1923                                   const char *name, struct thandle *th,
1924                                   struct lustre_capa *capa)
1925 {
1926         struct dt_object        *next = dt_object_child(dt);
1927         struct lod_object       *lo = lod_dt_obj(dt);
1928         int                     rc;
1929         int                     i;
1930         ENTRY;
1931
1932         rc = dt_xattr_del(env, next, name, th, capa);
1933         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1934                 RETURN(rc);
1935
1936         if (lo->ldo_stripenr == 0)
1937                 RETURN(rc);
1938
1939         for (i = 0; i < lo->ldo_stripenr; i++) {
1940                 LASSERT(lo->ldo_stripe[i]);
1941                 rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th,
1942                                   capa);
1943                 if (rc != 0)
1944                         break;
1945         }
1946
1947         RETURN(rc);
1948 }
1949
1950 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
1951                                     struct dt_object *dt,
1952                                     const struct lu_buf *buf,
1953                                     const char *name, int fl,
1954                                     struct thandle *th,
1955                                     struct lustre_capa *capa)
1956 {
1957         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
1958         struct lod_object       *l = lod_dt_obj(dt);
1959         struct lov_user_md_v1   *lum;
1960         struct lov_user_md_v3   *v3 = NULL;
1961         int                      rc;
1962         ENTRY;
1963
1964         /* If it is striped dir, we should clear the stripe cache for
1965          * slave stripe as well, but there are no effective way to
1966          * notify the LOD on the slave MDT, so we do not cache stripe
1967          * information for slave stripe for now. XXX*/
1968         lod_lov_stripe_cache_clear(l);
1969         LASSERT(buf != NULL && buf->lb_buf != NULL);
1970         lum = buf->lb_buf;
1971
1972         rc = lod_verify_striping(d, buf, false);
1973         if (rc)
1974                 RETURN(rc);
1975
1976         if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1977                 v3 = buf->lb_buf;
1978
1979         /* if { size, offset, count } = { 0, -1, 0 } and no pool
1980          * (i.e. all default values specified) then delete default
1981          * striping from dir. */
1982         CDEBUG(D_OTHER,
1983                 "set default striping: sz %u # %u offset %d %s %s\n",
1984                 (unsigned)lum->lmm_stripe_size,
1985                 (unsigned)lum->lmm_stripe_count,
1986                 (int)lum->lmm_stripe_offset,
1987                 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1988
1989         if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1990                                 (lum->lmm_stripe_count),
1991                                 (lum->lmm_stripe_offset)) &&
1992                         lum->lmm_magic == LOV_USER_MAGIC_V1) {
1993                 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1994                 if (rc == -ENODATA)
1995                         rc = 0;
1996         } else {
1997                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
1998         }
1999
2000         RETURN(rc);
2001 }
2002
2003 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
2004                                             struct dt_object *dt,
2005                                             const struct lu_buf *buf,
2006                                             const char *name, int fl,
2007                                             struct thandle *th,
2008                                             struct lustre_capa *capa)
2009 {
2010         struct lod_object       *l = lod_dt_obj(dt);
2011         struct lmv_user_md_v1   *lum;
2012         int                      rc;
2013         ENTRY;
2014
2015         LASSERT(buf != NULL && buf->lb_buf != NULL);
2016         lum = buf->lb_buf;
2017
2018         CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
2019               le32_to_cpu(lum->lum_stripe_count),
2020               (int)le32_to_cpu(lum->lum_stripe_offset));
2021
2022         if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
2023                                  le32_to_cpu(lum->lum_stripe_offset)) &&
2024                                 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
2025                 rc = lod_xattr_del_internal(env, dt, name, th, capa);
2026                 if (rc == -ENODATA)
2027                         rc = 0;
2028         } else {
2029                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
2030                 if (rc != 0)
2031                         RETURN(rc);
2032         }
2033
2034         /* Update default stripe cache */
2035         if (l->ldo_dir_stripe == NULL) {
2036                 OBD_ALLOC_PTR(l->ldo_dir_stripe);
2037                 if (l->ldo_dir_stripe == NULL)
2038                         RETURN(-ENOMEM);
2039         }
2040
2041         l->ldo_dir_striping_cached = 0;
2042         l->ldo_dir_def_striping_set = 1;
2043         l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count);
2044
2045         RETURN(rc);
2046 }
2047
2048 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
2049                              const struct lu_buf *buf, const char *name,
2050                              int fl, struct thandle *th,
2051                              struct lustre_capa *capa)
2052 {
2053         struct lod_object       *lo = lod_dt_obj(dt);
2054         struct lod_thread_info  *info = lod_env_info(env);
2055         struct lu_attr          *attr = &info->lti_attr;
2056         struct dt_object_format *dof = &info->lti_format;
2057         struct lu_buf           lmv_buf;
2058         struct lu_buf           slave_lmv_buf;
2059         struct lmv_mds_md_v1    *lmm;
2060         struct lmv_mds_md_v1    *slave_lmm = NULL;
2061         struct dt_insert_rec    *rec = &info->lti_dt_rec;
2062         int                     i;
2063         int                     rc;
2064         ENTRY;
2065
2066         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2067                 RETURN(-ENOTDIR);
2068
2069         /* The stripes are supposed to be allocated in declare phase,
2070          * if there are no stripes being allocated, it will skip */
2071         if (lo->ldo_stripenr == 0)
2072                 RETURN(0);
2073
2074         rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA);
2075         if (rc != 0)
2076                 RETURN(rc);
2077
2078         attr->la_valid = LA_TYPE | LA_MODE;
2079         dof->dof_type = DFT_DIR;
2080
2081         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
2082         if (rc != 0)
2083                 RETURN(rc);
2084         lmm = lmv_buf.lb_buf;
2085
2086         OBD_ALLOC_PTR(slave_lmm);
2087         if (slave_lmm == NULL)
2088                 RETURN(-ENOMEM);
2089
2090         lod_prep_slave_lmv_md(slave_lmm, lmm);
2091         slave_lmv_buf.lb_buf = slave_lmm;
2092         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
2093
2094         rec->rec_type = S_IFDIR;
2095         for (i = 0; i < lo->ldo_stripenr; i++) {
2096                 struct dt_object        *dto;
2097                 char                    *stripe_name    = info->lti_key;
2098                 struct lu_name          *sname;
2099                 struct linkea_data       ldata          = { 0 };
2100                 struct lu_buf            linkea_buf;
2101
2102                 dto = lo->ldo_stripe[i];
2103                 dt_write_lock(env, dto, MOR_TGT_CHILD);
2104                 rc = dt_create(env, dto, attr, NULL, dof, th);
2105                 dt_write_unlock(env, dto);
2106                 if (rc != 0)
2107                         RETURN(rc);
2108
2109                 rec->rec_fid = lu_object_fid(&dto->do_lu);
2110                 rc = dt_insert(env, dto, (const struct dt_rec *)rec,
2111                                (const struct dt_key *)dot, th, capa, 0);
2112                 if (rc != 0)
2113                         RETURN(rc);
2114
2115                 rec->rec_fid = lu_object_fid(&dt->do_lu);
2116                 rc = dt_insert(env, dto, (struct dt_rec *)rec,
2117                                (const struct dt_key *)dotdot, th, capa, 0);
2118                 if (rc != 0)
2119                         RETURN(rc);
2120
2121                 if (lo->ldo_striping_cached &&
2122                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2123                                          lo->ldo_def_stripenr,
2124                                          lo->ldo_def_stripe_offset)) {
2125                         struct lov_user_md_v3   *v3;
2126
2127                         /* sigh, lti_ea_store has been used for lmv_buf,
2128                          * so we have to allocate buffer for default
2129                          * stripe EA */
2130                         OBD_ALLOC_PTR(v3);
2131                         if (v3 == NULL)
2132                                 GOTO(out, rc);
2133
2134                         memset(v3, 0, sizeof(*v3));
2135                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2136                         v3->lmm_stripe_count =
2137                                 cpu_to_le16(lo->ldo_def_stripenr);
2138                         v3->lmm_stripe_offset =
2139                                 cpu_to_le16(lo->ldo_def_stripe_offset);
2140                         v3->lmm_stripe_size =
2141                                 cpu_to_le32(lo->ldo_def_stripe_size);
2142                         if (lo->ldo_pool != NULL)
2143                                 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
2144                                         sizeof(v3->lmm_pool_name));
2145
2146                         info->lti_buf.lb_buf = v3;
2147                         info->lti_buf.lb_len = sizeof(*v3);
2148                         rc = dt_xattr_set(env, dto, &info->lti_buf,
2149                                           XATTR_NAME_LOV, 0, th, capa);
2150                         OBD_FREE_PTR(v3);
2151                         if (rc != 0)
2152                                 GOTO(out, rc);
2153                 }
2154
2155                 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
2156                 rc = dt_xattr_set(env, dto, &slave_lmv_buf, XATTR_NAME_LMV,
2157                                   fl, th, capa);
2158                 if (rc != 0)
2159                         GOTO(out, rc);
2160
2161                 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2162                          PFID(lu_object_fid(&dto->do_lu)), i);
2163
2164                 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
2165                 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
2166                 if (rc != 0)
2167                         GOTO(out, rc);
2168
2169                 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
2170                 if (rc != 0)
2171                         GOTO(out, rc);
2172
2173                 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
2174                 linkea_buf.lb_len = ldata.ld_leh->leh_len;
2175                 rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK,
2176                                   0, th, BYPASS_CAPA);
2177                 if (rc != 0)
2178                         GOTO(out, rc);
2179
2180                 rec->rec_fid = lu_object_fid(&dto->do_lu);
2181                 rc = dt_insert(env, dt_object_child(dt),
2182                                (const struct dt_rec *)rec,
2183                                (const struct dt_key *)stripe_name, th, capa, 0);
2184                 if (rc != 0)
2185                         GOTO(out, rc);
2186
2187                 rc = dt_ref_add(env, dt_object_child(dt), th);
2188                 if (rc != 0)
2189                         GOTO(out, rc);
2190         }
2191
2192         rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
2193                           fl, th, capa);
2194
2195 out:
2196         if (slave_lmm != NULL)
2197                 OBD_FREE_PTR(slave_lmm);
2198
2199         RETURN(rc);
2200 }
2201
2202 int lod_dir_striping_create_internal(const struct lu_env *env,
2203                                      struct dt_object *dt,
2204                                      struct lu_attr *attr,
2205                                      struct dt_object_format *dof,
2206                                      struct thandle *th,
2207                                      bool declare)
2208 {
2209         struct lod_thread_info  *info = lod_env_info(env);
2210         struct lod_object       *lo = lod_dt_obj(dt);
2211         int                     rc;
2212         ENTRY;
2213
2214         if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr,
2215                                  lo->ldo_dir_stripe_offset)) {
2216                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2217                 int stripe_count = lo->ldo_stripenr;
2218
2219                 if (info->lti_ea_store_size < sizeof(*v1)) {
2220                         rc = lod_ea_store_resize(info, sizeof(*v1));
2221                         if (rc != 0)
2222                                 RETURN(rc);
2223                         v1 = info->lti_ea_store;
2224                 }
2225
2226                 memset(v1, 0, sizeof(*v1));
2227                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2228                 v1->lum_stripe_count = cpu_to_le32(stripe_count);
2229                 v1->lum_stripe_offset =
2230                                 cpu_to_le32(lo->ldo_dir_stripe_offset);
2231
2232                 info->lti_buf.lb_buf = v1;
2233                 info->lti_buf.lb_len = sizeof(*v1);
2234
2235                 if (declare)
2236                         rc = lod_declare_xattr_set_lmv(env, dt, attr,
2237                                                        &info->lti_buf, dof, th);
2238                 else
2239                         rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
2240                                                XATTR_NAME_LMV, 0, th,
2241                                                BYPASS_CAPA);
2242                 if (rc != 0)
2243                         RETURN(rc);
2244         }
2245
2246         /* Transfer default LMV striping from the parent */
2247         if (lo->ldo_dir_striping_cached &&
2248             !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
2249                                  lo->ldo_dir_def_stripe_offset)) {
2250                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2251                 int def_stripe_count = lo->ldo_dir_def_stripenr;
2252
2253                 if (info->lti_ea_store_size < sizeof(*v1)) {
2254                         rc = lod_ea_store_resize(info, sizeof(*v1));
2255                         if (rc != 0)
2256                                 RETURN(rc);
2257                         v1 = info->lti_ea_store;
2258                 }
2259
2260                 memset(v1, 0, sizeof(*v1));
2261                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2262                 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
2263                 v1->lum_stripe_offset =
2264                                 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
2265                 v1->lum_hash_type =
2266                                 cpu_to_le32(lo->ldo_dir_def_hash_type);
2267
2268                 info->lti_buf.lb_buf = v1;
2269                 info->lti_buf.lb_len = sizeof(*v1);
2270                 if (declare)
2271                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2272                                                        XATTR_NAME_DEFAULT_LMV,
2273                                                        0, th);
2274                 else
2275                         rc = lod_xattr_set_default_lmv_on_dir(env, dt,
2276                                                   &info->lti_buf,
2277                                                   XATTR_NAME_DEFAULT_LMV, 0,
2278                                                   th, BYPASS_CAPA);
2279                 if (rc != 0)
2280                         RETURN(rc);
2281         }
2282
2283         /* Transfer default LOV striping from the parent */
2284         if (lo->ldo_striping_cached &&
2285             !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2286                                  lo->ldo_def_stripenr,
2287                                  lo->ldo_def_stripe_offset)) {
2288                 struct lov_user_md_v3 *v3 = info->lti_ea_store;
2289
2290                 if (info->lti_ea_store_size < sizeof(*v3)) {
2291                         rc = lod_ea_store_resize(info, sizeof(*v3));
2292                         if (rc != 0)
2293                                 RETURN(rc);
2294                         v3 = info->lti_ea_store;
2295                 }
2296
2297                 memset(v3, 0, sizeof(*v3));
2298                 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2299                 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
2300                 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
2301                 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
2302                 if (lo->ldo_pool != NULL)
2303                         strlcpy(v3->lmm_pool_name, lo->ldo_pool,
2304                                 sizeof(v3->lmm_pool_name));
2305
2306                 info->lti_buf.lb_buf = v3;
2307                 info->lti_buf.lb_len = sizeof(*v3);
2308
2309                 if (declare)
2310                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2311                                                        XATTR_NAME_LOV, 0, th);
2312                 else
2313                         rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
2314                                                       XATTR_NAME_LOV, 0, th,
2315                                                       BYPASS_CAPA);
2316                 if (rc != 0)
2317                         RETURN(rc);
2318         }
2319
2320         RETURN(0);
2321 }
2322
/**
 * Declaration-phase entry point for directory striping creation.
 *
 * Forwards all arguments to lod_dir_striping_create_internal() with the
 * "declare" flag set to true.
 *
 * \retval whatever lod_dir_striping_create_internal() returns
 */
static int lod_declare_dir_striping_create(const struct lu_env *env,
					   struct dt_object *dt,
					   struct lu_attr *attr,
					   struct dt_object_format *dof,
					   struct thandle *th)
{
	const bool declare_only = true;

	return lod_dir_striping_create_internal(env, dt, attr, dof, th,
						declare_only);
}
2331
/**
 * Execution-phase entry point for directory striping creation.
 *
 * Forwards all arguments to lod_dir_striping_create_internal() with the
 * "declare" flag set to false.
 *
 * \retval whatever lod_dir_striping_create_internal() returns
 */
static int lod_dir_striping_create(const struct lu_env *env,
				   struct dt_object *dt,
				   struct lu_attr *attr,
				   struct dt_object_format *dof,
				   struct thandle *th)
{
	const bool declare_only = false;

	return lod_dir_striping_create_internal(env, dt, attr, dof, th,
						declare_only);
}
2340
2341 static int lod_xattr_set(const struct lu_env *env,
2342                          struct dt_object *dt, const struct lu_buf *buf,
2343                          const char *name, int fl, struct thandle *th,
2344                          struct lustre_capa *capa)
2345 {
2346         struct dt_object        *next = dt_object_child(dt);
2347         int                      rc;
2348         ENTRY;
2349
2350         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2351             strcmp(name, XATTR_NAME_LMV) == 0) {
2352                 struct lmv_mds_md_v1 *lmm = buf->lb_buf;
2353
2354                 if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
2355                                                 LMV_HASH_FLAG_MIGRATION)
2356                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2357                 else
2358                         rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
2359
2360                 RETURN(rc);
2361         }
2362
2363         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2364             strcmp(name, XATTR_NAME_LOV) == 0) {
2365                 /* default LOVEA */
2366                 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
2367                 RETURN(rc);
2368         } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2369                    strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
2370                 /* default LMVEA */
2371                 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
2372                                                       th, capa);
2373                 RETURN(rc);
2374         } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2375                    !strcmp(name, XATTR_NAME_LOV)) {
2376                 /* in case of lov EA swap, just set it
2377                  * if not, it is a replay so check striping match what we
2378                  * already have during req replay, declare_xattr_set()
2379                  * defines striping, then create() does the work
2380                 */
2381                 if (fl & LU_XATTR_REPLACE) {
2382                         /* free stripes, then update disk */
2383                         lod_object_free_striping(env, lod_dt_obj(dt));
2384                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2385                 } else {
2386                         rc = lod_striping_create(env, dt, NULL, NULL, th);
2387                 }
2388                 RETURN(rc);
2389         }
2390
2391         /* then all other xattr */
2392         rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
2393
2394         RETURN(rc);
2395 }
2396
/**
 * Declare removal of extended attribute \a name.
 *
 * The declaration is passed unchanged to the child object of \a dt.
 */
static int lod_declare_xattr_del(const struct lu_env *env,
				 struct dt_object *dt, const char *name,
				 struct thandle *th)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_declare_xattr_del(env, child, name, th);
}
2403
2404 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
2405                          const char *name, struct thandle *th,
2406                          struct lustre_capa *capa)
2407 {
2408         if (!strcmp(name, XATTR_NAME_LOV))
2409                 lod_object_free_striping(env, lod_dt_obj(dt));
2410         return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
2411 }
2412
/**
 * List extended attributes.
 *
 * The request is passed unchanged to the child object of \a dt.
 */
static int lod_xattr_list(const struct lu_env *env,
			  struct dt_object *dt, struct lu_buf *buf,
			  struct lustre_capa *capa)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_xattr_list(env, child, buf, capa);
}
2419
2420 int lod_object_set_pool(struct lod_object *o, char *pool)
2421 {
2422         int len;
2423
2424         if (o->ldo_pool) {
2425                 len = strlen(o->ldo_pool);
2426                 OBD_FREE(o->ldo_pool, len + 1);
2427                 o->ldo_pool = NULL;
2428         }
2429         if (pool) {
2430                 len = strlen(pool);
2431                 OBD_ALLOC(o->ldo_pool, len + 1);
2432                 if (o->ldo_pool == NULL)
2433                         return -ENOMEM;
2434                 strcpy(o->ldo_pool, pool);
2435         }
2436         return 0;
2437 }
2438
2439 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
2440 {
2441         return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
2442 }
2443
2444
2445 static int lod_cache_parent_lov_striping(const struct lu_env *env,
2446                                          struct lod_object *lp)
2447 {
2448         struct lod_thread_info  *info = lod_env_info(env);
2449         struct lov_user_md_v1   *v1 = NULL;
2450         struct lov_user_md_v3   *v3 = NULL;
2451         int                      rc;
2452         ENTRY;
2453
2454         /* called from MDD without parent being write locked,
2455          * lock it here */
2456         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2457         rc = lod_get_lov_ea(env, lp);
2458         if (rc < 0)
2459                 GOTO(unlock, rc);
2460
2461         if (rc < sizeof(struct lov_user_md)) {
2462                 /* don't lookup for non-existing or invalid striping */
2463                 lp->ldo_def_striping_set = 0;
2464                 lp->ldo_striping_cached = 1;
2465                 lp->ldo_def_stripe_size = 0;
2466                 lp->ldo_def_stripenr = 0;
2467                 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
2468                 GOTO(unlock, rc = 0);
2469         }
2470
2471         rc = 0;
2472         v1 = info->lti_ea_store;
2473         if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) {
2474                 lustre_swab_lov_user_md_v1(v1);
2475         } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
2476                 v3 = (struct lov_user_md_v3 *)v1;
2477                 lustre_swab_lov_user_md_v3(v3);
2478         }
2479
2480         if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
2481                 GOTO(unlock, rc = 0);
2482
2483         if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
2484                 GOTO(unlock, rc = 0);
2485
2486         CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
2487                PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
2488                (int)v1->lmm_stripe_count,
2489                (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
2490
2491         lp->ldo_def_stripenr = v1->lmm_stripe_count;
2492         lp->ldo_def_stripe_size = v1->lmm_stripe_size;
2493         lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
2494         lp->ldo_striping_cached = 1;
2495         lp->ldo_def_striping_set = 1;
2496         if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
2497                 /* XXX: sanity check here */
2498                 v3 = (struct lov_user_md_v3 *) v1;
2499                 if (v3->lmm_pool_name[0])
2500                         lod_object_set_pool(lp, v3->lmm_pool_name);
2501         }
2502         EXIT;
2503 unlock:
2504         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2505         return rc;
2506 }
2507
2508
2509 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
2510                                          struct lod_object *lp)
2511 {
2512         struct lod_thread_info  *info = lod_env_info(env);
2513         struct lmv_user_md_v1   *v1 = NULL;
2514         int                      rc;
2515         ENTRY;
2516
2517         /* called from MDD without parent being write locked,
2518          * lock it here */
2519         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2520         rc = lod_get_default_lmv_ea(env, lp);
2521         if (rc < 0)
2522                 GOTO(unlock, rc);
2523
2524         if (rc < sizeof(struct lmv_user_md)) {
2525                 /* don't lookup for non-existing or invalid striping */
2526                 lp->ldo_dir_def_striping_set = 0;
2527                 lp->ldo_dir_striping_cached = 1;
2528                 lp->ldo_dir_def_stripenr = 0;
2529                 lp->ldo_dir_def_stripe_offset =
2530                                         (typeof(v1->lum_stripe_offset))(-1);
2531                 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
2532                 GOTO(unlock, rc = 0);
2533         }
2534
2535         rc = 0;
2536         v1 = info->lti_ea_store;
2537
2538         lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
2539         lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
2540         lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
2541         lp->ldo_dir_def_striping_set = 1;
2542         lp->ldo_dir_striping_cached = 1;
2543
2544         EXIT;
2545 unlock:
2546         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2547         return rc;
2548 }
2549
2550 static int lod_cache_parent_striping(const struct lu_env *env,
2551                                      struct lod_object *lp,
2552                                      umode_t child_mode)
2553 {
2554         int rc = 0;
2555         ENTRY;
2556
2557         rc = lod_load_striping(env, lp);
2558         if (rc != 0)
2559                 RETURN(rc);
2560
2561         if (!lp->ldo_striping_cached) {
2562                 /* we haven't tried to get default striping for
2563                  * the directory yet, let's cache it in the object */
2564                 rc = lod_cache_parent_lov_striping(env, lp);
2565                 if (rc != 0)
2566                         RETURN(rc);
2567         }
2568
2569         if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
2570                 rc = lod_cache_parent_lmv_striping(env, lp);
2571
2572         RETURN(rc);
2573 }
2574
2575 /**
2576  * used to transfer default striping data to the object being created
2577  */
2578 static void lod_ah_init(const struct lu_env *env,
2579                         struct dt_allocation_hint *ah,
2580                         struct dt_object *parent,
2581                         struct dt_object *child,
2582                         umode_t child_mode)
2583 {
2584         struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
2585         struct dt_object  *nextp = NULL;
2586         struct dt_object  *nextc;
2587         struct lod_object *lp = NULL;
2588         struct lod_object *lc;
2589         struct lov_desc   *desc;
2590         int               rc;
2591         ENTRY;
2592
2593         LASSERT(child);
2594
2595         if (likely(parent)) {
2596                 nextp = dt_object_child(parent);
2597                 lp = lod_dt_obj(parent);
2598                 rc = lod_load_striping(env, lp);
2599                 if (rc != 0)
2600                         return;
2601         }
2602
2603         nextc = dt_object_child(child);
2604         lc = lod_dt_obj(child);
2605
2606         LASSERT(lc->ldo_stripenr == 0);
2607         LASSERT(lc->ldo_stripe == NULL);
2608
2609         /*
2610          * local object may want some hints
2611          * in case of late striping creation, ->ah_init()
2612          * can be called with local object existing
2613          */
2614         if (!dt_object_exists(nextc) || dt_object_remote(nextc))
2615                 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
2616                                           NULL : nextp, nextc, child_mode);
2617
2618         if (S_ISDIR(child_mode)) {
2619                 if (lc->ldo_dir_stripe == NULL) {
2620                         OBD_ALLOC_PTR(lc->ldo_dir_stripe);
2621                         if (lc->ldo_dir_stripe == NULL)
2622                                 return;
2623                 }
2624
2625                 if (lp->ldo_dir_stripe == NULL) {
2626                         OBD_ALLOC_PTR(lp->ldo_dir_stripe);
2627                         if (lp->ldo_dir_stripe == NULL)
2628                                 return;
2629                 }
2630
2631                 rc = lod_cache_parent_striping(env, lp, child_mode);
2632                 if (rc != 0)
2633                         return;
2634
2635                 /* transfer defaults to new directory */
2636                 if (lp->ldo_striping_cached) {
2637                         if (lp->ldo_pool)
2638                                 lod_object_set_pool(lc, lp->ldo_pool);
2639                         lc->ldo_def_stripenr = lp->ldo_def_stripenr;
2640                         lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
2641                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2642                         lc->ldo_striping_cached = 1;
2643                         lc->ldo_def_striping_set = 1;
2644                         CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
2645                                (int)lc->ldo_def_stripe_size,
2646                                (int)lc->ldo_def_stripe_offset,
2647                                (int)lc->ldo_def_stripenr);
2648                 }
2649
2650                 /* transfer dir defaults to new directory */
2651                 if (lp->ldo_dir_striping_cached) {
2652                         lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
2653                         lc->ldo_dir_def_stripe_offset =
2654                                                   lp->ldo_dir_def_stripe_offset;
2655                         lc->ldo_dir_def_hash_type =
2656                                                   lp->ldo_dir_def_hash_type;
2657                         lc->ldo_dir_striping_cached = 1;
2658                         lc->ldo_dir_def_striping_set = 1;
2659                         CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
2660                                (int)lc->ldo_dir_def_stripenr,
2661                                (int)lc->ldo_dir_def_stripe_offset,
2662                                lc->ldo_dir_def_hash_type);
2663                 }
2664
2665                 /* It should always honour the specified stripes */
2666                 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
2667                         const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
2668
2669                         rc = lod_verify_md_striping(d, lum1);
2670                         if (rc == 0 &&
2671                                 le32_to_cpu(lum1->lum_stripe_count) > 1) {
2672                                 /* Directory will be striped only if
2673                                  * stripe_count > 1 */
2674                                 lc->ldo_stripenr =
2675                                         le32_to_cpu(lum1->lum_stripe_count);
2676                                 lc->ldo_dir_stripe_offset =
2677                                         le32_to_cpu(lum1->lum_stripe_offset);
2678                                 lc->ldo_dir_hash_type =
2679                                         le32_to_cpu(lum1->lum_hash_type);
2680                                 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
2681                                        lc->ldo_stripenr,
2682                                        (int)lc->ldo_dir_stripe_offset);
2683                         }
2684                 /* then check whether there is default stripes from parent */
2685                 } else if (lp->ldo_dir_def_striping_set) {
2686                         /* If there are default dir stripe from parent */
2687                         lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
2688                         lc->ldo_dir_stripe_offset =
2689                                         lp->ldo_dir_def_stripe_offset;
2690                         lc->ldo_dir_hash_type =
2691                                         lp->ldo_dir_def_hash_type;
2692                         CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
2693                                lc->ldo_stripenr,
2694                                (int)lc->ldo_dir_stripe_offset);
2695                 } else {
2696                         /* set default stripe for this directory */
2697                         lc->ldo_stripenr = 0;
2698                         lc->ldo_dir_stripe_offset = -1;
2699                 }
2700
2701                 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
2702                        lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
2703
2704                 goto out;
2705         }
2706
2707         /*
2708          * if object is going to be striped over OSTs, transfer default
2709          * striping information to the child, so that we can use it
2710          * during declaration and creation
2711          */
2712         if (!lod_object_will_be_striped(S_ISREG(child_mode),
2713                                         lu_object_fid(&child->do_lu)))
2714                 goto out;
2715         /*
2716          * try from the parent
2717          */
2718         if (likely(parent)) {
2719                 lod_cache_parent_striping(env, lp, child_mode);
2720
2721                 lc->ldo_def_stripe_offset = (__u16) -1;
2722
2723                 if (lp->ldo_def_striping_set) {
2724                         if (lp->ldo_pool)
2725                                 lod_object_set_pool(lc, lp->ldo_pool);
2726                         lc->ldo_stripenr = lp->ldo_def_stripenr;
2727                         lc->ldo_stripe_size = lp->ldo_def_stripe_size;
2728                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2729                         CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
2730                                lc->ldo_stripenr, lc->ldo_stripe_size,
2731                                lp->ldo_pool ? lp->ldo_pool : "");
2732                 }
2733         }
2734
2735         /*
2736          * if the parent doesn't provide with specific pattern, grab fs-wide one
2737          */
2738         desc = &d->lod_desc;
2739         if (lc->ldo_stripenr == 0)
2740                 lc->ldo_stripenr = desc->ld_default_stripe_count;
2741         if (lc->ldo_stripe_size == 0)
2742                 lc->ldo_stripe_size = desc->ld_default_stripe_size;
2743         CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
2744                lc->ldo_stripenr, lc->ldo_stripe_size,
2745                lc->ldo_pool ? lc->ldo_pool : "");
2746
2747 out:
2748         /* we do not cache stripe information for slave stripe, see
2749          * lod_xattr_set_lov_on_dir */
2750         if (lp != NULL && lp->ldo_dir_slave_stripe)
2751                 lod_lov_stripe_cache_clear(lp);
2752
2753         EXIT;
2754 }
2755
2756 #define ll_do_div64(aaa,bbb)    do_div((aaa), (bbb))
2757 /*
2758  * this function handles a special case when truncate was done
2759  * on a stripeless object and now striping is being created
2760  * we can't lose that size, so we have to propagate it to newly
2761  * created object
2762  */
2763 static int lod_declare_init_size(const struct lu_env *env,
2764                                  struct dt_object *dt, struct thandle *th)
2765 {
2766         struct dt_object   *next = dt_object_child(dt);
2767         struct lod_object  *lo = lod_dt_obj(dt);
2768         struct lu_attr     *attr = &lod_env_info(env)->lti_attr;
2769         uint64_t            size, offs;
2770         int                 rc, stripe;
2771         ENTRY;
2772
2773         /* XXX: we support the simplest (RAID0) striping so far */
2774         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
2775         LASSERT(lo->ldo_stripe_size > 0);
2776
2777         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
2778         LASSERT(attr->la_valid & LA_SIZE);
2779         if (rc)
2780                 RETURN(rc);
2781
2782         size = attr->la_size;
2783         if (size == 0)
2784                 RETURN(0);
2785
2786         /* ll_do_div64(a, b) returns a % b, and a = a / b */
2787         ll_do_div64(size, (__u64) lo->ldo_stripe_size);
2788         stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
2789
2790         size = size * lo->ldo_stripe_size;
2791         offs = attr->la_size;
2792         size += ll_do_div64(offs, lo->ldo_stripe_size);
2793
2794         attr->la_valid = LA_SIZE;
2795         attr->la_size = size;
2796
2797         rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
2798
2799         RETURN(rc);
2800 }
2801
2802 /**
2803  * Create declaration of striped object
2804  */
2805 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
2806                                struct lu_attr *attr,
2807                                const struct lu_buf *lovea, struct thandle *th)
2808 {
2809         struct lod_thread_info  *info = lod_env_info(env);
2810         struct dt_object        *next = dt_object_child(dt);
2811         struct lod_object       *lo = lod_dt_obj(dt);
2812         int                      rc;
2813         ENTRY;
2814
2815         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
2816                 /* failed to create striping, let's reset
2817                  * config so that others don't get confused */
2818                 lod_object_free_striping(env, lo);
2819                 GOTO(out, rc = -ENOMEM);
2820         }
2821
2822         if (!dt_object_remote(next)) {
2823                 /* choose OST and generate appropriate objects */
2824                 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
2825                 if (rc) {
2826                         /* failed to create striping, let's reset
2827                          * config so that others don't get confused */
2828                         lod_object_free_striping(env, lo);
2829                         GOTO(out, rc);
2830                 }
2831
2832                 /*
2833                  * declare storage for striping data
2834                  */
2835                 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
2836                                 lo->ldo_pool ?  LOV_MAGIC_V3 : LOV_MAGIC_V1);
2837         } else {
2838                 /* LOD can not choose OST objects for remote objects, i.e.
2839                  * stripes must be ready before that. Right now, it can only
2840                  * happen during migrate, i.e. migrate process needs to create
2841                  * remote regular file (mdd_migrate_create), then the migrate
2842                  * process will provide stripeEA. */
2843                 LASSERT(lovea != NULL);
2844                 info->lti_buf = *lovea;
2845         }
2846
2847         rc = dt_declare_xattr_set(env, next, &info->lti_buf,
2848                                   XATTR_NAME_LOV, 0, th);
2849         if (rc)
2850                 GOTO(out, rc);
2851
2852         /*
2853          * if striping is created with local object's size > 0,
2854          * we have to propagate this size to specific object
2855          * the case is possible only when local object was created previously
2856          */
2857         if (dt_object_exists(next))
2858                 rc = lod_declare_init_size(env, dt, th);
2859
2860 out:
2861         RETURN(rc);
2862 }
2863
2864 static int lod_declare_object_create(const struct lu_env *env,
2865                                      struct dt_object *dt,
2866                                      struct lu_attr *attr,
2867                                      struct dt_allocation_hint *hint,
2868                                      struct dt_object_format *dof,
2869                                      struct thandle *th)
2870 {
2871         struct dt_object   *next = dt_object_child(dt);
2872         struct lod_object  *lo = lod_dt_obj(dt);
2873         int                 rc;
2874         ENTRY;
2875
2876         LASSERT(dof);
2877         LASSERT(attr);
2878         LASSERT(th);
2879
2880         /*
2881          * first of all, we declare creation of local object
2882          */
2883         rc = dt_declare_create(env, next, attr, hint, dof, th);
2884         if (rc)
2885                 GOTO(out, rc);
2886
2887         if (dof->dof_type == DFT_SYM)
2888                 dt->do_body_ops = &lod_body_lnk_ops;
2889
2890         /*
2891          * it's lod_ah_init() who has decided the object will striped
2892          */
2893         if (dof->dof_type == DFT_REGULAR) {
2894                 /* callers don't want stripes */
2895                 /* XXX: all tricky interactions with ->ah_make_hint() decided
2896                  * to use striping, then ->declare_create() behaving differently
2897                  * should be cleaned */
2898                 if (dof->u.dof_reg.striped == 0)
2899                         lo->ldo_stripenr = 0;
2900                 if (lo->ldo_stripenr > 0)
2901                         rc = lod_declare_striped_object(env, dt, attr,
2902                                                         NULL, th);
2903         } else if (dof->dof_type == DFT_DIR) {
2904                 /* Orphan object (like migrating object) does not have
2905                  * lod_dir_stripe, see lod_ah_init */
2906                 if (lo->ldo_dir_stripe != NULL)
2907                         rc = lod_declare_dir_striping_create(env, dt, attr,
2908                                                              dof, th);
2909         }
2910 out:
2911         RETURN(rc);
2912 }
2913
2914 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
2915                         struct lu_attr *attr, struct dt_object_format *dof,
2916                         struct thandle *th)
2917 {
2918         struct lod_object *lo = lod_dt_obj(dt);
2919         int                rc = 0, i;
2920         ENTRY;
2921
2922         LASSERT(lo->ldo_striping_cached == 0);
2923
2924         /* create all underlying objects */
2925         for (i = 0; i < lo->ldo_stripenr; i++) {
2926                 LASSERT(lo->ldo_stripe[i]);
2927                 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
2928
2929                 if (rc)
2930                         break;
2931         }
2932         if (rc == 0)
2933                 rc = lod_generate_and_set_lovea(env, lo, th);
2934
2935         RETURN(rc);
2936 }
2937
2938 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
2939                              struct lu_attr *attr,
2940                              struct dt_allocation_hint *hint,
2941                              struct dt_object_format *dof, struct thandle *th)
2942 {
2943         struct dt_object   *next = dt_object_child(dt);
2944         struct lod_object  *lo = lod_dt_obj(dt);
2945         int                 rc;
2946         ENTRY;
2947
2948         /* create local object */
2949         rc = dt_create(env, next, attr, hint, dof, th);
2950         if (rc != 0)
2951                 RETURN(rc);
2952
2953         if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2954             lo->ldo_stripe && dof->u.dof_reg.striped != 0)
2955                 rc = lod_striping_create(env, dt, attr, dof, th);
2956
2957         RETURN(rc);
2958 }
2959
2960 static int lod_declare_object_destroy(const struct lu_env *env,
2961                                       struct dt_object *dt,
2962                                       struct thandle *th)
2963 {
2964         struct dt_object   *next = dt_object_child(dt);
2965         struct lod_object  *lo = lod_dt_obj(dt);
2966         struct lod_thread_info *info = lod_env_info(env);
2967         char               *stripe_name = info->lti_key;
2968         int                 rc, i;
2969         ENTRY;
2970
2971         /*
2972          * load striping information, notice we don't do this when object
2973          * is being initialized as we don't need this information till
2974          * few specific cases like destroy, chown
2975          */
2976         rc = lod_load_striping(env, lo);
2977         if (rc)
2978                 RETURN(rc);
2979
2980         /* declare destroy for all underlying objects */
2981         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2982                 rc = next->do_ops->do_index_try(env, next,
2983                                                 &dt_directory_features);
2984                 if (rc != 0)
2985                         RETURN(rc);
2986
2987                 for (i = 0; i < lo->ldo_stripenr; i++) {
2988                         rc = dt_declare_ref_del(env, next, th);
2989                         if (rc != 0)
2990                                 RETURN(rc);
2991                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2992                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2993                                 i);
2994                         rc = dt_declare_delete(env, next,
2995                                         (const struct dt_key *)stripe_name, th);
2996                         if (rc != 0)
2997                                 RETURN(rc);
2998                 }
2999         }
3000         /*
3001          * we declare destroy for the local object
3002          */
3003         rc = dt_declare_destroy(env, next, th);
3004         if (rc)
3005                 RETURN(rc);
3006
3007         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
3008                 RETURN(0);
3009
3010         /* declare destroy all striped objects */
3011         for (i = 0; i < lo->ldo_stripenr; i++) {
3012                 if (likely(lo->ldo_stripe[i] != NULL)) {
3013                         rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
3014                         if (rc != 0)
3015                                 break;
3016                 }
3017         }
3018
3019         RETURN(rc);
3020 }
3021
3022 static int lod_object_destroy(const struct lu_env *env,
3023                 struct dt_object *dt, struct thandle *th)
3024 {
3025         struct dt_object  *next = dt_object_child(dt);
3026         struct lod_object *lo = lod_dt_obj(dt);
3027         struct lod_thread_info *info = lod_env_info(env);
3028         char               *stripe_name = info->lti_key;
3029         int                rc, i;
3030         ENTRY;
3031
3032         /* destroy sub-stripe of master object */
3033         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
3034                 rc = next->do_ops->do_index_try(env, next,
3035                                                 &dt_directory_features);
3036                 if (rc != 0)
3037                         RETURN(rc);
3038
3039                 for (i = 0; i < lo->ldo_stripenr; i++) {
3040                         rc = dt_ref_del(env, next, th);
3041                         if (rc != 0)
3042                                 RETURN(rc);
3043
3044                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
3045                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
3046                                 i);
3047
3048                         CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
3049                                PFID(lu_object_fid(&dt->do_lu)), stripe_name,
3050                                PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
3051
3052                         rc = dt_delete(env, next,
3053                                        (const struct dt_key *)stripe_name,
3054                                        th, BYPASS_CAPA);
3055                         if (rc != 0)
3056                                 RETURN(rc);
3057                 }
3058         }
3059         rc = dt_destroy(env, next, th);
3060         if (rc != 0)
3061                 RETURN(rc);
3062
3063         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
3064                 RETURN(0);
3065
3066         /* destroy all striped objects */
3067         for (i = 0; i < lo->ldo_stripenr; i++) {
3068                 if (likely(lo->ldo_stripe[i] != NULL) &&
3069                     (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
3070                      i == cfs_fail_val)) {
3071                         rc = dt_destroy(env, lo->ldo_stripe[i], th);
3072                         if (rc != 0)
3073                                 break;
3074                 }
3075         }
3076
3077         RETURN(rc);
3078 }
3079
/**
 * Declare a reference increase by delegating to the child object.
 *
 * \param[in] env       execution environment
 * \param[in] dt        object whose child receives the declaration
 * \param[in] th        transaction handle
 *
 * \retval              whatever dt_declare_ref_add() returns for the child
 */
static int lod_declare_ref_add(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_declare_ref_add(env, child, th);
}
3085
/**
 * Increase the reference count by delegating to the child object.
 *
 * \param[in] env       execution environment
 * \param[in] dt        object whose child is updated
 * \param[in] th        transaction handle
 *
 * \retval              whatever dt_ref_add() returns for the child
 */
static int lod_ref_add(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_ref_add(env, child, th);
}
3091
/**
 * Declare a reference decrease by delegating to the child object.
 *
 * \param[in] env       execution environment
 * \param[in] dt        object whose child receives the declaration
 * \param[in] th        transaction handle
 *
 * \retval              whatever dt_declare_ref_del() returns for the child
 */
static int lod_declare_ref_del(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_declare_ref_del(env, child, th);
}
3097
/**
 * Decrease the reference count by delegating to the child object.
 *
 * \param[in] env       execution environment
 * \param[in] dt        object whose child is updated
 * \param[in] th        transaction handle
 *
 * \retval              whatever dt_ref_del() returns for the child
 */
static int lod_ref_del(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_ref_del(env, child, th);
}
3103
3104 static struct obd_capa *lod_capa_get(const struct lu_env *env,
3105                                      struct dt_object *dt,
3106                                      struct lustre_capa *old, __u64 opc)
3107 {
3108         return dt_capa_get(env, dt_object_child(dt), old, opc);
3109 }
3110
3111 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
3112                            __u64 start, __u64 end)
3113 {
3114         return dt_object_sync(env, dt_object_child(dt), start, end);
3115 }
3116
3117 struct lod_slave_locks  {
3118         int                     lsl_lock_count;
3119         struct lustre_handle    lsl_handle[0];
3120 };
3121
3122 static int lod_object_unlock_internal(const struct lu_env *env,
3123                                       struct dt_object *dt,
3124                                       struct ldlm_enqueue_info *einfo,
3125                                       ldlm_policy_data_t *policy)
3126 {
3127         struct lod_object       *lo = lod_dt_obj(dt);
3128         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
3129         int                     rc = 0;
3130         int                     i;
3131         ENTRY;
3132
3133         if (slave_locks == NULL)
3134                 RETURN(0);
3135
3136         for (i = 1; i < slave_locks->lsl_lock_count; i++) {
3137                 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
3138                         int     rc1;
3139
3140                         einfo->ei_cbdata = &slave_locks->lsl_handle[i];
3141                         rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
3142                                                policy);
3143                         if (rc1 < 0)
3144                                 rc = rc == 0 ? rc1 : rc;
3145                 }
3146         }
3147
3148         RETURN(rc);
3149 }
3150
3151 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
3152                              struct ldlm_enqueue_info *einfo,
3153                              union ldlm_policy_data *policy)
3154 {
3155         struct lod_object       *lo = lod_dt_obj(dt);
3156         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
3157         int                     slave_locks_size;
3158         int                     rc;
3159         ENTRY;
3160
3161         if (slave_locks == NULL)
3162                 RETURN(0);
3163
3164         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
3165                 RETURN(-ENOTDIR);
3166
3167         rc = lod_load_striping(env, lo);
3168         if (rc != 0)
3169                 RETURN(rc);
3170
3171         /* Note: for remote lock for single stripe dir, MDT will cancel
3172          * the lock by lockh directly */
3173         if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
3174                 RETURN(0);
3175
3176         /* Only cancel slave lock for striped dir */
3177         rc = lod_object_unlock_internal(env, dt, einfo, policy);
3178
3179         slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
3180                            sizeof(slave_locks->lsl_handle[0]);
3181         OBD_FREE(slave_locks, slave_locks_size);
3182         einfo->ei_cbdata = NULL;
3183
3184         RETURN(rc);
3185 }
3186
3187 static int lod_object_lock(const struct lu_env *env,
3188                            struct dt_object *dt,
3189                            struct lustre_handle *lh,
3190                            struct ldlm_enqueue_info *einfo,
3191                            union ldlm_policy_data *policy)
3192 {
3193         struct lod_object       *lo = lod_dt_obj(dt);
3194         int                     rc = 0;
3195         int                     i;
3196         int                     slave_locks_size;
3197         struct lod_slave_locks  *slave_locks = NULL;
3198         ENTRY;
3199
3200         /* remote object lock */
3201         if (!einfo->ei_enq_slave) {
3202                 LASSERT(dt_object_remote(dt));
3203                 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
3204                                       policy);
3205         }
3206
3207         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
3208                 RETURN(-ENOTDIR);
3209
3210         rc = lod_load_striping(env, lo);
3211         if (rc != 0)
3212                 RETURN(rc);
3213
3214         /* No stripes */
3215         if (lo->ldo_stripenr <= 1)
3216                 RETURN(0);
3217
3218         slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
3219                            sizeof(slave_locks->lsl_handle[0]);
3220         /* Freed in lod_object_unlock */
3221         OBD_ALLOC(slave_locks, slave_locks_size);
3222         if (slave_locks == NULL)
3223                 RETURN(-ENOMEM);
3224         slave_locks->lsl_lock_count = lo->ldo_stripenr;
3225
3226         /* striped directory lock */
3227         for (i = 1; i < lo->ldo_stripenr; i++) {
3228                 struct lustre_handle    lockh;
3229                 struct ldlm_res_id      *res_id;
3230
3231                 res_id = &lod_env_info(env)->lti_res_id;
3232                 fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
3233                                        res_id);
3234                 einfo->ei_res_id = res_id;
3235
3236                 LASSERT(lo->ldo_stripe[i]);
3237                 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
3238                                     policy);
3239                 if (rc != 0)
3240                         GOTO(out, rc);
3241                 slave_locks->lsl_handle[i] = lockh;
3242         }
3243
3244         einfo->ei_cbdata = slave_locks;
3245
3246 out:
3247         if (rc != 0 && slave_locks != NULL) {
3248                 einfo->ei_cbdata = slave_locks;
3249                 lod_object_unlock_internal(env, dt, einfo, policy);
3250                 OBD_FREE(slave_locks, slave_locks_size);
3251                 einfo->ei_cbdata = NULL;
3252         }
3253
3254         RETURN(rc);
3255 }
3256
3257 struct dt_object_operations lod_obj_ops = {
3258         .do_read_lock           = lod_object_read_lock,
3259         .do_write_lock          = lod_object_write_lock,
3260         .do_read_unlock         = lod_object_read_unlock,
3261         .do_write_unlock        = lod_object_write_unlock,
3262         .do_write_locked        = lod_object_write_locked,
3263         .do_attr_get            = lod_attr_get,
3264         .do_declare_attr_set    = lod_declare_attr_set,
3265         .do_attr_set            = lod_attr_set,
3266         .do_xattr_get           = lod_xattr_get,
3267         .do_declare_xattr_set   = lod_declare_xattr_set,
3268         .do_xattr_set           = lod_xattr_set,
3269         .do_declare_xattr_del   = lod_declare_xattr_del,
3270         .do_xattr_del           = lod_xattr_del,
3271         .do_xattr_list          = lod_xattr_list,
3272         .do_ah_init             = lod_ah_init,
3273         .do_declare_create      = lod_declare_object_create,
3274         .do_create              = lod_object_create,
3275         .do_declare_destroy     = lod_declare_object_destroy,
3276         .do_destroy             = lod_object_destroy,
3277         .do_index_try           = lod_index_try,
3278         .do_declare_ref_add     = lod_declare_ref_add,
3279         .do_ref_add             = lod_ref_add,
3280         .do_declare_ref_del     = lod_declare_ref_del,
3281         .do_ref_del             = lod_ref_del,
3282         .do_capa_get            = lod_capa_get,
3283         .do_object_sync         = lod_object_sync,
3284         .do_object_lock         = lod_object_lock,
3285         .do_object_unlock       = lod_object_unlock,
3286 };
3287
3288 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
3289                         struct lu_buf *buf, loff_t *pos,
3290                         struct lustre_capa *capa)
3291 {
3292         struct dt_object *next = dt_object_child(dt);
3293         return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
3294 }
3295
3296 static ssize_t lod_declare_write(const struct lu_env *env,
3297                                  struct dt_object *dt,
3298                                  const struct lu_buf *buf, loff_t pos,
3299                                  struct thandle *th)
3300 {
3301         return dt_declare_record_write(env, dt_object_child(dt),
3302                                        buf, pos, th);
3303 }
3304
3305 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
3306                          const struct lu_buf *buf, loff_t *pos,
3307                          struct thandle *th, struct lustre_capa *capa, int iq)
3308 {
3309         struct dt_object *next = dt_object_child(dt);
3310         LASSERT(next);
3311         return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
3312 }
3313
3314 static const struct dt_body_operations lod_body_lnk_ops = {
3315         .dbo_read               = lod_read,
3316         .dbo_declare_write      = lod_declare_write,
3317         .dbo_write              = lod_write
3318 };
3319
3320 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
3321                            const struct lu_object_conf *conf)
3322 {
3323         struct lod_device       *lod    = lu2lod_dev(lo->lo_dev);
3324         struct lu_device        *cdev   = NULL;
3325         struct lu_object        *cobj;
3326         struct lod_tgt_descs    *ltd    = NULL;
3327         struct lod_tgt_desc     *tgt;
3328         mdsno_t                  idx    = 0;
3329         int                      type   = LU_SEQ_RANGE_ANY;
3330         int                      rc;
3331         ENTRY;
3332
3333         rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
3334         if (rc != 0)
3335                 RETURN(rc);
3336
3337         if (type == LU_SEQ_RANGE_MDT &&
3338             idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
3339                 cdev = &lod->lod_child->dd_lu_dev;
3340         } else if (type == LU_SEQ_RANGE_MDT) {
3341                 ltd = &lod->lod_mdt_descs;
3342                 lod_getref(ltd);
3343         } else if (type == LU_SEQ_RANGE_OST) {
3344                 ltd = &lod->lod_ost_descs;
3345                 lod_getref(ltd);
3346         } else {
3347                 LBUG();
3348         }
3349
3350         if (ltd != NULL) {
3351                 if (ltd->ltd_tgts_size > idx &&
3352                     cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
3353                         tgt = LTD_TGT(ltd, idx);
3354
3355                         LASSERT(tgt != NULL);
3356                         LASSERT(tgt->ltd_tgt != NULL);
3357
3358                         cdev = &(tgt->ltd_tgt->dd_lu_dev);
3359                 }
3360                 lod_putref(lod, ltd);
3361         }
3362
3363         if (unlikely(cdev == NULL))
3364                 RETURN(-ENOENT);
3365
3366         cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
3367         if (unlikely(cobj == NULL))
3368                 RETURN(-ENOMEM);
3369
3370         lu_object_add(lo, cobj);
3371
3372         RETURN(0);
3373 }
3374
3375 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
3376 {
3377         int i;
3378
3379         if (lo->ldo_dir_stripe != NULL) {
3380                 OBD_FREE_PTR(lo->ldo_dir_stripe);
3381                 lo->ldo_dir_stripe = NULL;
3382         }
3383
3384         if (lo->ldo_stripe) {
3385                 LASSERT(lo->ldo_stripes_allocated > 0);
3386
3387                 for (i = 0; i < lo->ldo_stripenr; i++) {
3388                         if (lo->ldo_stripe[i])
3389                                 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
3390                 }
3391
3392                 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
3393                 OBD_FREE(lo->ldo_stripe, i);
3394                 lo->ldo_stripe = NULL;
3395                 lo->ldo_stripes_allocated = 0;
3396         }
3397         lo->ldo_stripenr = 0;
3398         lo->ldo_pattern = 0;
3399 }
3400
3401 /*
3402  * ->start is called once all slices are initialized, including header's
3403  * cache for mode (object type). using the type we can initialize ops
3404  */
3405 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
3406 {
3407         if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
3408                 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
3409         return 0;
3410 }
3411
3412 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
3413 {
3414         struct lod_object *mo = lu2lod_obj(o);
3415
3416         /*
3417          * release all underlying object pinned
3418          */
3419
3420         lod_object_free_striping(env, mo);
3421
3422         lod_object_set_pool(mo, NULL);
3423
3424         lu_object_fini(o);
3425         OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
3426 }
3427
/**
 * Release callback of the LOD object slice; currently does nothing.
 *
 * \param[in] env       execution environment (unused)
 * \param[in] o         object slice (unused)
 */
static void lod_object_release(const struct lu_env *env, struct lu_object *o)
{
        /* XXX: should everything be released here in case object
         * creation failed earlier? */
}
3433
3434 static int lod_object_print(const struct lu_env *env, void *cookie,
3435                             lu_printer_t p, const struct lu_object *l)
3436 {
3437         struct lod_object *o = lu2lod_obj((struct lu_object *) l);
3438
3439         return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
3440 }
3441
3442 struct lu_object_operations lod_lu_obj_ops = {
3443         .loo_object_init        = lod_object_init,
3444         .loo_object_start       = lod_object_start,
3445         .loo_object_free        = lod_object_free,
3446         .loo_object_release     = lod_object_release,
3447         .loo_object_print       = lod_object_print,
3448 };