Whamcloud - gitweb
LU-6406 tests: fix undersized fsname and FID strings
[fs/lustre-release.git] / lustre / lod / lod_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2014, Intel Corporation.
27  */
28 /*
29  * lustre/lod/lod_object.c
30  *
31  * This file contains implementations of methods for the OSD API
32  * for the Logical Object Device (LOD) layer, which provides a virtual
33  * local OSD object interface to the MDD layer, and abstracts the
34  * addressing of local (OSD) and remote (OSP) objects. The API is
35  * described in the file lustre/include/dt_object.h and in
36  * lustre/doc/osd-api.txt.
37  *
38  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_MDS
42
43 #include <obd.h>
44 #include <obd_class.h>
45 #include <lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
48
49 #include <lustre_fid.h>
50 #include <lustre_param.h>
51 #include <lustre_fid.h>
52 #include <lustre_lmv.h>
53 #include <md_object.h>
54 #include <lustre_linkea.h>
55
56 #include "lod_internal.h"
57
58 static const char dot[] = ".";
59 static const char dotdot[] = "..";
60
61 static const struct dt_body_operations lod_body_lnk_ops;
62
63 /**
64  * Implementation of dt_index_operations::dio_lookup
65  *
66  * Used with regular (non-striped) objects.
67  *
68  * \see dt_index_operations::dio_lookup() in the API description for details.
69  */
70 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
71                             struct dt_rec *rec, const struct dt_key *key)
72 {
73         struct dt_object *next = dt_object_child(dt);
74         return next->do_index_ops->dio_lookup(env, next, rec, key);
75 }
76
77 /**
78  * Implementation of dt_index_operations::dio_declare_insert.
79  *
80  * Used with regular (non-striped) objects.
81  *
82  * \see dt_index_operations::dio_declare_insert() in the API description
83  * for details.
84  */
85 static int lod_declare_index_insert(const struct lu_env *env,
86                                     struct dt_object *dt,
87                                     const struct dt_rec *rec,
88                                     const struct dt_key *key,
89                                     struct thandle *th)
90 {
91         return lod_sub_object_declare_insert(env, dt_object_child(dt),
92                                              rec, key, th);
93 }
94
95 /**
96  * Implementation of dt_index_operations::dio_insert.
97  *
98  * Used with regular (non-striped) objects
99  *
100  * \see dt_index_operations::dio_insert() in the API description for details.
101  */
102 static int lod_index_insert(const struct lu_env *env,
103                             struct dt_object *dt,
104                             const struct dt_rec *rec,
105                             const struct dt_key *key,
106                             struct thandle *th,
107                             int ign)
108 {
109         return lod_sub_object_index_insert(env, dt_object_child(dt), rec, key,
110                                            th, ign);
111 }
112
113 /**
114  * Implementation of dt_index_operations::dio_declare_delete.
115  *
116  * Used with regular (non-striped) objects.
117  *
118  * \see dt_index_operations::dio_declare_delete() in the API description
119  * for details.
120  */
121 static int lod_declare_index_delete(const struct lu_env *env,
122                                     struct dt_object *dt,
123                                     const struct dt_key *key,
124                                     struct thandle *th)
125 {
126         return lod_sub_object_declare_delete(env, dt_object_child(dt), key,
127                                              th);
128 }
129
130 /**
131  * Implementation of dt_index_operations::dio_delete.
132  *
133  * Used with regular (non-striped) objects.
134  *
135  * \see dt_index_operations::dio_delete() in the API description for details.
136  */
137 static int lod_index_delete(const struct lu_env *env,
138                             struct dt_object *dt,
139                             const struct dt_key *key,
140                             struct thandle *th)
141 {
142         return lod_sub_object_delete(env, dt_object_child(dt), key, th);
143 }
144
145 /**
146  * Implementation of dt_it_ops::init.
147  *
148  * Used with regular (non-striped) objects.
149  *
150  * \see dt_it_ops::init() in the API description for details.
151  */
152 static struct dt_it *lod_it_init(const struct lu_env *env,
153                                  struct dt_object *dt, __u32 attr)
154 {
155         struct dt_object        *next = dt_object_child(dt);
156         struct lod_it           *it = &lod_env_info(env)->lti_it;
157         struct dt_it            *it_next;
158
159         it_next = next->do_index_ops->dio_it.init(env, next, attr);
160         if (IS_ERR(it_next))
161                 return it_next;
162
163         /* currently we do not use more than one iterator per thread
164          * so we store it in thread info. if at some point we need
165          * more active iterators in a single thread, we can allocate
166          * additional ones */
167         LASSERT(it->lit_obj == NULL);
168
169         it->lit_it = it_next;
170         it->lit_obj = next;
171
172         return (struct dt_it *)it;
173 }
174
175 #define LOD_CHECK_IT(env, it)                                   \
176 do {                                                            \
177         LASSERT((it)->lit_obj != NULL);                         \
178         LASSERT((it)->lit_it != NULL);                          \
179 } while (0)
180
181 /**
182  * Implementation of dt_index_operations::dio_it.fini.
183  *
184  * Used with regular (non-striped) objects.
185  *
186  * \see dt_index_operations::dio_it.fini() in the API description for details.
187  */
188 static void lod_it_fini(const struct lu_env *env, struct dt_it *di)
189 {
190         struct lod_it *it = (struct lod_it *)di;
191
192         LOD_CHECK_IT(env, it);
193         it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
194
195         /* the iterator not in use any more */
196         it->lit_obj = NULL;
197         it->lit_it = NULL;
198 }
199
200 /**
201  * Implementation of dt_it_ops::get.
202  *
203  * Used with regular (non-striped) objects.
204  *
205  * \see dt_it_ops::get() in the API description for details.
206  */
207 static int lod_it_get(const struct lu_env *env, struct dt_it *di,
208                       const struct dt_key *key)
209 {
210         const struct lod_it *it = (const struct lod_it *)di;
211
212         LOD_CHECK_IT(env, it);
213         return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
214 }
215
216 /**
217  * Implementation of dt_it_ops::put.
218  *
219  * Used with regular (non-striped) objects.
220  *
221  * \see dt_it_ops::put() in the API description for details.
222  */
223 static void lod_it_put(const struct lu_env *env, struct dt_it *di)
224 {
225         struct lod_it *it = (struct lod_it *)di;
226
227         LOD_CHECK_IT(env, it);
228         return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
229 }
230
231 /**
232  * Implementation of dt_it_ops::next.
233  *
234  * Used with regular (non-striped) objects
235  *
236  * \see dt_it_ops::next() in the API description for details.
237  */
238 static int lod_it_next(const struct lu_env *env, struct dt_it *di)
239 {
240         struct lod_it *it = (struct lod_it *)di;
241
242         LOD_CHECK_IT(env, it);
243         return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
244 }
245
246 /**
247  * Implementation of dt_it_ops::key.
248  *
249  * Used with regular (non-striped) objects.
250  *
251  * \see dt_it_ops::key() in the API description for details.
252  */
253 static struct dt_key *lod_it_key(const struct lu_env *env,
254                                  const struct dt_it *di)
255 {
256         const struct lod_it *it = (const struct lod_it *)di;
257
258         LOD_CHECK_IT(env, it);
259         return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
260 }
261
262 /**
263  * Implementation of dt_it_ops::key_size.
264  *
265  * Used with regular (non-striped) objects.
266  *
267  * \see dt_it_ops::key_size() in the API description for details.
268  */
269 static int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
270 {
271         struct lod_it *it = (struct lod_it *)di;
272
273         LOD_CHECK_IT(env, it);
274         return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
275 }
276
277 /**
278  * Implementation of dt_it_ops::rec.
279  *
280  * Used with regular (non-striped) objects.
281  *
282  * \see dt_it_ops::rec() in the API description for details.
283  */
284 static int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
285                       struct dt_rec *rec, __u32 attr)
286 {
287         const struct lod_it *it = (const struct lod_it *)di;
288
289         LOD_CHECK_IT(env, it);
290         return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec,
291                                                      attr);
292 }
293
294 /**
295  * Implementation of dt_it_ops::rec_size.
296  *
297  * Used with regular (non-striped) objects.
298  *
299  * \see dt_it_ops::rec_size() in the API description for details.
300  */
301 static int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di,
302                            __u32 attr)
303 {
304         const struct lod_it *it = (const struct lod_it *)di;
305
306         LOD_CHECK_IT(env, it);
307         return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it,
308                                                           attr);
309 }
310
311 /**
312  * Implementation of dt_it_ops::store.
313  *
314  * Used with regular (non-striped) objects.
315  *
316  * \see dt_it_ops::store() in the API description for details.
317  */
318 static __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
319 {
320         const struct lod_it *it = (const struct lod_it *)di;
321
322         LOD_CHECK_IT(env, it);
323         return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
324 }
325
326 /**
327  * Implementation of dt_it_ops::load.
328  *
329  * Used with regular (non-striped) objects.
330  *
331  * \see dt_it_ops::load() in the API description for details.
332  */
333 static int lod_it_load(const struct lu_env *env, const struct dt_it *di,
334                        __u64 hash)
335 {
336         const struct lod_it *it = (const struct lod_it *)di;
337
338         LOD_CHECK_IT(env, it);
339         return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
340 }
341
342 /**
343  * Implementation of dt_it_ops::key_rec.
344  *
345  * Used with regular (non-striped) objects.
346  *
347  * \see dt_it_ops::rec() in the API description for details.
348  */
349 static int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
350                           void *key_rec)
351 {
352         const struct lod_it *it = (const struct lod_it *)di;
353
354         LOD_CHECK_IT(env, it);
355         return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it,
356                                                          key_rec);
357 }
358
359 static struct dt_index_operations lod_index_ops = {
360         .dio_lookup             = lod_index_lookup,
361         .dio_declare_insert     = lod_declare_index_insert,
362         .dio_insert             = lod_index_insert,
363         .dio_declare_delete     = lod_declare_index_delete,
364         .dio_delete             = lod_index_delete,
365         .dio_it = {
366                 .init           = lod_it_init,
367                 .fini           = lod_it_fini,
368                 .get            = lod_it_get,
369                 .put            = lod_it_put,
370                 .next           = lod_it_next,
371                 .key            = lod_it_key,
372                 .key_size       = lod_it_key_size,
373                 .rec            = lod_it_rec,
374                 .rec_size       = lod_it_rec_size,
375                 .store          = lod_it_store,
376                 .load           = lod_it_load,
377                 .key_rec        = lod_it_key_rec,
378         }
379 };
380
381 /**
382  * Implementation of dt_it_ops::init.
383  *
384  * Used with striped objects. Internally just initializes the iterator
385  * on the first stripe.
386  *
387  * \see dt_it_ops::init() in the API description for details.
388  */
389 static struct dt_it *lod_striped_it_init(const struct lu_env *env,
390                                          struct dt_object *dt, __u32 attr)
391 {
392         struct lod_object       *lo = lod_dt_obj(dt);
393         struct dt_object        *next;
394         struct lod_it           *it = &lod_env_info(env)->lti_it;
395         struct dt_it            *it_next;
396         ENTRY;
397
398         LASSERT(lo->ldo_stripenr > 0);
399         next = lo->ldo_stripe[0];
400         LASSERT(next != NULL);
401         LASSERT(next->do_index_ops != NULL);
402
403         it_next = next->do_index_ops->dio_it.init(env, next, attr);
404         if (IS_ERR(it_next))
405                 return it_next;
406
407         /* currently we do not use more than one iterator per thread
408          * so we store it in thread info. if at some point we need
409          * more active iterators in a single thread, we can allocate
410          * additional ones */
411         LASSERT(it->lit_obj == NULL);
412
413         it->lit_stripe_index = 0;
414         it->lit_attr = attr;
415         it->lit_it = it_next;
416         it->lit_obj = dt;
417
418         return (struct dt_it *)it;
419 }
420
421 #define LOD_CHECK_STRIPED_IT(env, it, lo)                       \
422 do {                                                            \
423         LASSERT((it)->lit_obj != NULL);                         \
424         LASSERT((it)->lit_it != NULL);                          \
425         LASSERT((lo)->ldo_stripenr > 0);                        \
426         LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr);   \
427 } while (0)
428
429 /**
430  * Implementation of dt_it_ops::fini.
431  *
432  * Used with striped objects.
433  *
434  * \see dt_it_ops::fini() in the API description for details.
435  */
436 static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
437 {
438         struct lod_it           *it = (struct lod_it *)di;
439         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
440         struct dt_object        *next;
441
442         LOD_CHECK_STRIPED_IT(env, it, lo);
443
444         next = lo->ldo_stripe[it->lit_stripe_index];
445         LASSERT(next != NULL);
446         LASSERT(next->do_index_ops != NULL);
447
448         next->do_index_ops->dio_it.fini(env, it->lit_it);
449
450         /* the iterator not in use any more */
451         it->lit_obj = NULL;
452         it->lit_it = NULL;
453         it->lit_stripe_index = 0;
454 }
455
456 /**
457  * Implementation of dt_it_ops::get.
458  *
459  * Right now it's not used widely, only to reset the iterator to the
460  * initial position. It should be possible to implement a full version
461  * which chooses a correct stripe to be able to position with any key.
462  *
463  * \see dt_it_ops::get() in the API description for details.
464  */
465 static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
466                               const struct dt_key *key)
467 {
468         const struct lod_it     *it = (const struct lod_it *)di;
469         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
470         struct dt_object        *next;
471         ENTRY;
472
473         LOD_CHECK_STRIPED_IT(env, it, lo);
474
475         next = lo->ldo_stripe[it->lit_stripe_index];
476         LASSERT(next != NULL);
477         LASSERT(next->do_index_ops != NULL);
478
479         return next->do_index_ops->dio_it.get(env, it->lit_it, key);
480 }
481
482 /**
483  * Implementation of dt_it_ops::put.
484  *
485  * Used with striped objects.
486  *
487  * \see dt_it_ops::put() in the API description for details.
488  */
489 static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
490 {
491         struct lod_it           *it = (struct lod_it *)di;
492         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
493         struct dt_object        *next;
494
495         LOD_CHECK_STRIPED_IT(env, it, lo);
496
497         next = lo->ldo_stripe[it->lit_stripe_index];
498         LASSERT(next != NULL);
499         LASSERT(next->do_index_ops != NULL);
500
501         return next->do_index_ops->dio_it.put(env, it->lit_it);
502 }
503
504 /**
505  * Implementation of dt_it_ops::next.
506  *
507  * Used with striped objects. When the end of the current stripe is
508  * reached, the method takes the next stripe's iterator.
509  *
510  * \see dt_it_ops::next() in the API description for details.
511  */
512 static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
513 {
514         struct lod_it           *it = (struct lod_it *)di;
515         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
516         struct dt_object        *next;
517         struct dt_it            *it_next;
518         int                     rc;
519         ENTRY;
520
521         LOD_CHECK_STRIPED_IT(env, it, lo);
522
523         next = lo->ldo_stripe[it->lit_stripe_index];
524         LASSERT(next != NULL);
525         LASSERT(next->do_index_ops != NULL);
526 again:
527         rc = next->do_index_ops->dio_it.next(env, it->lit_it);
528         if (rc < 0)
529                 RETURN(rc);
530
531         if (rc == 0 && it->lit_stripe_index == 0)
532                 RETURN(rc);
533
534         if (rc == 0 && it->lit_stripe_index > 0) {
535                 struct lu_dirent *ent;
536
537                 ent = (struct lu_dirent *)lod_env_info(env)->lti_key;
538
539                 rc = next->do_index_ops->dio_it.rec(env, it->lit_it,
540                                                     (struct dt_rec *)ent,
541                                                     it->lit_attr);
542                 if (rc != 0)
543                         RETURN(rc);
544
545                 /* skip . and .. for slave stripe */
546                 if ((strncmp(ent->lde_name, ".",
547                              le16_to_cpu(ent->lde_namelen)) == 0 &&
548                      le16_to_cpu(ent->lde_namelen) == 1) ||
549                     (strncmp(ent->lde_name, "..",
550                              le16_to_cpu(ent->lde_namelen)) == 0 &&
551                      le16_to_cpu(ent->lde_namelen) == 2))
552                         goto again;
553
554                 RETURN(rc);
555         }
556
557         /* go to next stripe */
558         if (it->lit_stripe_index + 1 >= lo->ldo_stripenr)
559                 RETURN(1);
560
561         it->lit_stripe_index++;
562
563         next->do_index_ops->dio_it.put(env, it->lit_it);
564         next->do_index_ops->dio_it.fini(env, it->lit_it);
565
566         rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
567         if (rc != 0)
568                 RETURN(rc);
569
570         next = lo->ldo_stripe[it->lit_stripe_index];
571         LASSERT(next != NULL);
572         LASSERT(next->do_index_ops != NULL);
573
574         it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr);
575         if (!IS_ERR(it_next)) {
576                 it->lit_it = it_next;
577                 goto again;
578         } else {
579                 rc = PTR_ERR(it_next);
580         }
581
582         RETURN(rc);
583 }
584
585 /**
586  * Implementation of dt_it_ops::key.
587  *
588  * Used with striped objects.
589  *
590  * \see dt_it_ops::key() in the API description for details.
591  */
592 static struct dt_key *lod_striped_it_key(const struct lu_env *env,
593                                          const struct dt_it *di)
594 {
595         const struct lod_it     *it = (const struct lod_it *)di;
596         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
597         struct dt_object        *next;
598
599         LOD_CHECK_STRIPED_IT(env, it, lo);
600
601         next = lo->ldo_stripe[it->lit_stripe_index];
602         LASSERT(next != NULL);
603         LASSERT(next->do_index_ops != NULL);
604
605         return next->do_index_ops->dio_it.key(env, it->lit_it);
606 }
607
608 /**
609  * Implementation of dt_it_ops::key_size.
610  *
611  * Used with striped objects.
612  *
613  * \see dt_it_ops::size() in the API description for details.
614  */
615 static int lod_striped_it_key_size(const struct lu_env *env,
616                                    const struct dt_it *di)
617 {
618         struct lod_it           *it = (struct lod_it *)di;
619         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
620         struct dt_object        *next;
621
622         LOD_CHECK_STRIPED_IT(env, it, lo);
623
624         next = lo->ldo_stripe[it->lit_stripe_index];
625         LASSERT(next != NULL);
626         LASSERT(next->do_index_ops != NULL);
627
628         return next->do_index_ops->dio_it.key_size(env, it->lit_it);
629 }
630
631 /**
632  * Implementation of dt_it_ops::rec.
633  *
634  * Used with striped objects.
635  *
636  * \see dt_it_ops::rec() in the API description for details.
637  */
638 static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
639                               struct dt_rec *rec, __u32 attr)
640 {
641         const struct lod_it     *it = (const struct lod_it *)di;
642         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
643         struct dt_object        *next;
644
645         LOD_CHECK_STRIPED_IT(env, it, lo);
646
647         next = lo->ldo_stripe[it->lit_stripe_index];
648         LASSERT(next != NULL);
649         LASSERT(next->do_index_ops != NULL);
650
651         return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
652 }
653
654 /**
655  * Implementation of dt_it_ops::rec_size.
656  *
657  * Used with striped objects.
658  *
659  * \see dt_it_ops::rec_size() in the API description for details.
660  */
661 static int lod_striped_it_rec_size(const struct lu_env *env,
662                                    const struct dt_it *di, __u32 attr)
663 {
664         struct lod_it           *it = (struct lod_it *)di;
665         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
666         struct dt_object        *next;
667
668         LOD_CHECK_STRIPED_IT(env, it, lo);
669
670         next = lo->ldo_stripe[it->lit_stripe_index];
671         LASSERT(next != NULL);
672         LASSERT(next->do_index_ops != NULL);
673
674         return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
675 }
676
677 /**
678  * Implementation of dt_it_ops::store.
679  *
680  * Used with striped objects.
681  *
682  * \see dt_it_ops::store() in the API description for details.
683  */
684 static __u64 lod_striped_it_store(const struct lu_env *env,
685                                   const struct dt_it *di)
686 {
687         const struct lod_it     *it = (const struct lod_it *)di;
688         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
689         struct dt_object        *next;
690
691         LOD_CHECK_STRIPED_IT(env, it, lo);
692
693         next = lo->ldo_stripe[it->lit_stripe_index];
694         LASSERT(next != NULL);
695         LASSERT(next->do_index_ops != NULL);
696
697         return next->do_index_ops->dio_it.store(env, it->lit_it);
698 }
699
700 /**
701  * Implementation of dt_it_ops::load.
702  *
703  * Used with striped objects.
704  *
705  * \see dt_it_ops::load() in the API description for details.
706  */
707 static int lod_striped_it_load(const struct lu_env *env,
708                                const struct dt_it *di, __u64 hash)
709 {
710         const struct lod_it     *it = (const struct lod_it *)di;
711         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
712         struct dt_object        *next;
713
714         LOD_CHECK_STRIPED_IT(env, it, lo);
715
716         next = lo->ldo_stripe[it->lit_stripe_index];
717         LASSERT(next != NULL);
718         LASSERT(next->do_index_ops != NULL);
719
720         return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
721 }
722
723 static struct dt_index_operations lod_striped_index_ops = {
724         .dio_lookup             = lod_index_lookup,
725         .dio_declare_insert     = lod_declare_index_insert,
726         .dio_insert             = lod_index_insert,
727         .dio_declare_delete     = lod_declare_index_delete,
728         .dio_delete             = lod_index_delete,
729         .dio_it = {
730                 .init           = lod_striped_it_init,
731                 .fini           = lod_striped_it_fini,
732                 .get            = lod_striped_it_get,
733                 .put            = lod_striped_it_put,
734                 .next           = lod_striped_it_next,
735                 .key            = lod_striped_it_key,
736                 .key_size       = lod_striped_it_key_size,
737                 .rec            = lod_striped_it_rec,
738                 .rec_size       = lod_striped_it_rec_size,
739                 .store          = lod_striped_it_store,
740                 .load           = lod_striped_it_load,
741         }
742 };
743
744 /**
745  * Append the FID for each shard of the striped directory after the
746  * given LMV EA header.
747  *
748  * To simplify striped directory and the consistency verification,
749  * we only store the LMV EA header on disk, for both master object
750  * and slave objects. When someone wants to know the whole LMV EA,
751  * such as client readdir(), we can build the entrie LMV EA on the
752  * MDT side (in RAM) via iterating the sub-directory entries that
753  * are contained in the master object of the stripe directory.
754  *
755  * For the master object of the striped directroy, the valid name
756  * for each shard is composed of the ${shard_FID}:${shard_idx}.
757  *
758  * There may be holes in the LMV EA if some shards' name entries
759  * are corrupted or lost.
760  *
761  * \param[in] env       pointer to the thread context
762  * \param[in] lo        pointer to the master object of the striped directory
763  * \param[in] buf       pointer to the lu_buf which will hold the LMV EA
764  * \param[in] resize    whether re-allocate the buffer if it is not big enough
765  *
766  * \retval              positive size of the LMV EA
767  * \retval              0 for nothing to be loaded
768  * \retval              negative error number on failure
769  */
770 int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo,
771                         struct lu_buf *buf, bool resize)
772 {
773         struct lu_dirent        *ent    =
774                         (struct lu_dirent *)lod_env_info(env)->lti_key;
775         struct lod_device       *lod    = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
776         struct dt_object        *obj    = dt_object_child(&lo->ldo_obj);
777         struct lmv_mds_md_v1    *lmv1   = buf->lb_buf;
778         struct dt_it            *it;
779         const struct dt_it_ops  *iops;
780         __u32                    stripes;
781         __u32                    magic  = le32_to_cpu(lmv1->lmv_magic);
782         size_t                   lmv1_size;
783         int                      rc;
784         ENTRY;
785
786         /* If it is not a striped directory, then load nothing. */
787         if (magic != LMV_MAGIC_V1)
788                 RETURN(0);
789
790         /* If it is in migration (or failure), then load nothing. */
791         if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
792                 RETURN(0);
793
794         stripes = le32_to_cpu(lmv1->lmv_stripe_count);
795         if (stripes < 1)
796                 RETURN(0);
797
798         rc = lmv_mds_md_size(stripes, magic);
799         if (rc < 0)
800                 RETURN(rc);
801         lmv1_size = rc;
802         if (buf->lb_len < lmv1_size) {
803                 struct lu_buf tbuf;
804
805                 if (!resize)
806                         RETURN(-ERANGE);
807
808                 tbuf = *buf;
809                 buf->lb_buf = NULL;
810                 buf->lb_len = 0;
811                 lu_buf_alloc(buf, lmv1_size);
812                 lmv1 = buf->lb_buf;
813                 if (lmv1 == NULL)
814                         RETURN(-ENOMEM);
815
816                 memcpy(buf->lb_buf, tbuf.lb_buf, tbuf.lb_len);
817         }
818
819         if (unlikely(!dt_try_as_dir(env, obj)))
820                 RETURN(-ENOTDIR);
821
822         memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid));
823         iops = &obj->do_index_ops->dio_it;
824         it = iops->init(env, obj, LUDA_64BITHASH);
825         if (IS_ERR(it))
826                 RETURN(PTR_ERR(it));
827
828         rc = iops->load(env, it, 0);
829         if (rc == 0)
830                 rc = iops->next(env, it);
831         else if (rc > 0)
832                 rc = 0;
833
834         while (rc == 0) {
835                 char             name[FID_LEN + 2] = "";
836                 struct lu_fid    fid;
837                 __u32            index;
838                 int              len;
839
840                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
841                 if (rc != 0)
842                         break;
843
844                 rc = -EIO;
845
846                 fid_le_to_cpu(&fid, &ent->lde_fid);
847                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
848                 if (ent->lde_name[0] == '.') {
849                         if (ent->lde_namelen == 1)
850                                 goto next;
851
852                         if (ent->lde_namelen == 2 && ent->lde_name[1] == '.')
853                                 goto next;
854                 }
855
856                 len = snprintf(name, sizeof(name),
857                                DFID":", PFID(&ent->lde_fid));
858                 /* The ent->lde_name is composed of ${FID}:${index} */
859                 if (ent->lde_namelen < len + 1 ||
860                     memcmp(ent->lde_name, name, len) != 0) {
861                         CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
862                                "%s: invalid shard name %.*s with the FID "DFID
863                                " for the striped directory "DFID", %s\n",
864                                lod2obd(lod)->obd_name, ent->lde_namelen,
865                                ent->lde_name, PFID(&fid),
866                                PFID(lu_object_fid(&obj->do_lu)),
867                                lod->lod_lmv_failout ? "failout" : "skip");
868
869                         if (lod->lod_lmv_failout)
870                                 break;
871
872                         goto next;
873                 }
874
875                 index = 0;
876                 do {
877                         if (ent->lde_name[len] < '0' ||
878                             ent->lde_name[len] > '9') {
879                                 CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
880                                        "%s: invalid shard name %.*s with the "
881                                        "FID "DFID" for the striped directory "
882                                        DFID", %s\n",
883                                        lod2obd(lod)->obd_name, ent->lde_namelen,
884                                        ent->lde_name, PFID(&fid),
885                                        PFID(lu_object_fid(&obj->do_lu)),
886                                        lod->lod_lmv_failout ?
887                                        "failout" : "skip");
888
889                                 if (lod->lod_lmv_failout)
890                                         break;
891
892                                 goto next;
893                         }
894
895                         index = index * 10 + ent->lde_name[len++] - '0';
896                 } while (len < ent->lde_namelen);
897
898                 if (len == ent->lde_namelen) {
899                         /* Out of LMV EA range. */
900                         if (index >= stripes) {
901                                 CERROR("%s: the shard %.*s for the striped "
902                                        "directory "DFID" is out of the known "
903                                        "LMV EA range [0 - %u], failout\n",
904                                        lod2obd(lod)->obd_name, ent->lde_namelen,
905                                        ent->lde_name,
906                                        PFID(lu_object_fid(&obj->do_lu)),
907                                        stripes - 1);
908
909                                 break;
910                         }
911
912                         /* The slot has been occupied. */
913                         if (!fid_is_zero(&lmv1->lmv_stripe_fids[index])) {
914                                 struct lu_fid fid0;
915
916                                 fid_le_to_cpu(&fid0,
917                                         &lmv1->lmv_stripe_fids[index]);
918                                 CERROR("%s: both the shard "DFID" and "DFID
919                                        " for the striped directory "DFID
920                                        " claim the same LMV EA slot at the "
921                                        "index %d, failout\n",
922                                        lod2obd(lod)->obd_name,
923                                        PFID(&fid0), PFID(&fid),
924                                        PFID(lu_object_fid(&obj->do_lu)), index);
925
926                                 break;
927                         }
928
929                         /* stored as LE mode */
930                         lmv1->lmv_stripe_fids[index] = ent->lde_fid;
931
932 next:
933                         rc = iops->next(env, it);
934                 }
935         }
936
937         iops->put(env, it);
938         iops->fini(env, it);
939
940         RETURN(rc > 0 ? lmv_mds_md_size(stripes, magic) : rc);
941 }
942
943 /**
944  * Implementation of dt_object_operations::do_index_try.
945  *
946  * \see dt_object_operations::do_index_try() in the API description for details.
947  */
948 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
949                          const struct dt_index_features *feat)
950 {
951         struct lod_object       *lo = lod_dt_obj(dt);
952         struct dt_object        *next = dt_object_child(dt);
953         int                     rc;
954         ENTRY;
955
956         LASSERT(next->do_ops);
957         LASSERT(next->do_ops->do_index_try);
958
959         rc = lod_load_striping_locked(env, lo);
960         if (rc != 0)
961                 RETURN(rc);
962
963         rc = next->do_ops->do_index_try(env, next, feat);
964         if (rc != 0)
965                 RETURN(rc);
966
967         if (lo->ldo_stripenr > 0) {
968                 int i;
969
970                 for (i = 0; i < lo->ldo_stripenr; i++) {
971                         if (dt_object_exists(lo->ldo_stripe[i]) == 0)
972                                 continue;
973                         rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
974                                                 lo->ldo_stripe[i], feat);
975                         if (rc != 0)
976                                 RETURN(rc);
977                 }
978                 dt->do_index_ops = &lod_striped_index_ops;
979         } else {
980                 dt->do_index_ops = &lod_index_ops;
981         }
982
983         RETURN(rc);
984 }
985
986 /**
987  * Implementation of dt_object_operations::do_read_lock.
988  *
989  * \see dt_object_operations::do_read_lock() in the API description for details.
990  */
991 static void lod_object_read_lock(const struct lu_env *env,
992                                  struct dt_object *dt, unsigned role)
993 {
994         dt_read_lock(env, dt_object_child(dt), role);
995 }
996
997 /**
998  * Implementation of dt_object_operations::do_write_lock.
999  *
1000  * \see dt_object_operations::do_write_lock() in the API description for
1001  * details.
1002  */
1003 static void lod_object_write_lock(const struct lu_env *env,
1004                                   struct dt_object *dt, unsigned role)
1005 {
1006         dt_write_lock(env, dt_object_child(dt), role);
1007 }
1008
1009 /**
1010  * Implementation of dt_object_operations::do_read_unlock.
1011  *
1012  * \see dt_object_operations::do_read_unlock() in the API description for
1013  * details.
1014  */
1015 static void lod_object_read_unlock(const struct lu_env *env,
1016                                    struct dt_object *dt)
1017 {
1018         dt_read_unlock(env, dt_object_child(dt));
1019 }
1020
1021 /**
1022  * Implementation of dt_object_operations::do_write_unlock.
1023  *
1024  * \see dt_object_operations::do_write_unlock() in the API description for
1025  * details.
1026  */
1027 static void lod_object_write_unlock(const struct lu_env *env,
1028                                     struct dt_object *dt)
1029 {
1030         dt_write_unlock(env, dt_object_child(dt));
1031 }
1032
1033 /**
1034  * Implementation of dt_object_operations::do_write_locked.
1035  *
1036  * \see dt_object_operations::do_write_locked() in the API description for
1037  * details.
1038  */
1039 static int lod_object_write_locked(const struct lu_env *env,
1040                                    struct dt_object *dt)
1041 {
1042         return dt_write_locked(env, dt_object_child(dt));
1043 }
1044
1045 /**
1046  * Implementation of dt_object_operations::do_attr_get.
1047  *
1048  * \see dt_object_operations::do_attr_get() in the API description for details.
1049  */
1050 static int lod_attr_get(const struct lu_env *env,
1051                         struct dt_object *dt,
1052                         struct lu_attr *attr)
1053 {
1054         /* Note: for striped directory, client will merge attributes
1055          * from all of the sub-stripes see lmv_merge_attr(), and there
1056          * no MDD logic depend on directory nlink/size/time, so we can
1057          * always use master inode nlink and size for now. */
1058         return dt_attr_get(env, dt_object_child(dt), attr);
1059 }
1060
1061 /**
1062  * Mark all of the striped directory sub-stripes dead.
1063  *
1064  * When a striped object is a subject to removal, we have
1065  * to mark all the stripes to prevent further access to
1066  * them (e.g. create a new file in those). So we mark
1067  * all the stripes with LMV_HASH_FLAG_DEAD. The function
1068  * can be used to declare the changes and to apply them.
1069  * If the object isn't striped, then just return success.
1070  *
1071  * \param[in] env       execution environment
1072  * \param[in] dt        the striped object
1073  * \param[in] handle    transaction handle
1074  * \param[in] declare   whether to declare the change or apply
1075  *
1076  * \retval              0 on success
1077  * \retval              negative if failed
1078  **/
1079 static int lod_mark_dead_object(const struct lu_env *env,
1080                                 struct dt_object *dt,
1081                                 struct thandle *th,
1082                                 bool declare)
1083 {
1084         struct lod_object       *lo = lod_dt_obj(dt);
1085         struct lmv_mds_md_v1    *lmv;
1086         __u32                   dead_hash_type;
1087         int                     rc;
1088         int                     i;
1089
1090         ENTRY;
1091
1092         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
1093                 RETURN(0);
1094
1095         rc = lod_load_striping_locked(env, lo);
1096         if (rc != 0)
1097                 RETURN(rc);
1098
1099         if (lo->ldo_stripenr == 0)
1100                 RETURN(0);
1101
1102         rc = lod_get_lmv_ea(env, lo);
1103         if (rc <= 0)
1104                 RETURN(rc);
1105
1106         lmv = lod_env_info(env)->lti_ea_store;
1107         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
1108         dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD;
1109         lmv->lmv_hash_type = cpu_to_le32(dead_hash_type);
1110         for (i = 0; i < lo->ldo_stripenr; i++) {
1111                 struct lu_buf buf;
1112
1113                 lmv->lmv_master_mdt_index = i;
1114                 buf.lb_buf = lmv;
1115                 buf.lb_len = sizeof(*lmv);
1116                 if (declare) {
1117                         rc = lod_sub_object_declare_xattr_set(env,
1118                                                 lo->ldo_stripe[i], &buf,
1119                                                 XATTR_NAME_LMV,
1120                                                 LU_XATTR_REPLACE, th);
1121                 } else {
1122                         rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i],
1123                                                       &buf, XATTR_NAME_LMV,
1124                                                       LU_XATTR_REPLACE, th);
1125                 }
1126                 if (rc != 0)
1127                         break;
1128         }
1129
1130         RETURN(rc);
1131 }
1132
1133 /**
1134  * Implementation of dt_object_operations::do_declare_attr_set.
1135  *
1136  * If the object is striped, then apply the changes to all the stripes.
1137  *
1138  * \see dt_object_operations::do_declare_attr_set() in the API description
1139  * for details.
1140  */
1141 static int lod_declare_attr_set(const struct lu_env *env,
1142                                 struct dt_object *dt,
1143                                 const struct lu_attr *attr,
1144                                 struct thandle *th)
1145 {
1146         struct dt_object  *next = dt_object_child(dt);
1147         struct lod_object *lo = lod_dt_obj(dt);
1148         int                rc, i;
1149         ENTRY;
1150
1151         /* Set dead object on all other stripes */
1152         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
1153             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
1154                 rc = lod_mark_dead_object(env, dt, th, true);
1155                 RETURN(rc);
1156         }
1157
1158         /*
1159          * declare setattr on the local object
1160          */
1161         rc = lod_sub_object_declare_attr_set(env, next, attr, th);
1162         if (rc)
1163                 RETURN(rc);
1164
1165         /* osp_declare_attr_set() ignores all attributes other than
1166          * UID, GID, and size, and osp_attr_set() ignores all but UID
1167          * and GID.  Declaration of size attr setting happens through
1168          * lod_declare_init_size(), and not through this function.
1169          * Therefore we need not load striping unless ownership is
1170          * changing.  This should save memory and (we hope) speed up
1171          * rename(). */
1172         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
1173                 if (!(attr->la_valid & (LA_UID | LA_GID)))
1174                         RETURN(rc);
1175
1176                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
1177                         RETURN(0);
1178         } else {
1179                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
1180                                         LA_ATIME | LA_MTIME | LA_CTIME |
1181                                         LA_FLAGS)))
1182                         RETURN(rc);
1183         }
1184         /*
1185          * load striping information, notice we don't do this when object
1186          * is being initialized as we don't need this information till
1187          * few specific cases like destroy, chown
1188          */
1189         rc = lod_load_striping(env, lo);
1190         if (rc)
1191                 RETURN(rc);
1192
1193         if (lo->ldo_stripenr == 0)
1194                 RETURN(0);
1195
1196         /*
1197          * if object is striped declare changes on the stripes
1198          */
1199         LASSERT(lo->ldo_stripe);
1200         for (i = 0; i < lo->ldo_stripenr; i++) {
1201                 if (lo->ldo_stripe[i] == NULL)
1202                         continue;
1203                 rc = lod_sub_object_declare_attr_set(env,
1204                                         lo->ldo_stripe[i], attr,
1205                                         th);
1206                 if (rc != 0)
1207                         RETURN(rc);
1208         }
1209
1210         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1211             dt_object_exists(next) != 0 &&
1212             dt_object_remote(next) == 0)
1213                 lod_sub_object_declare_xattr_del(env, next,
1214                                                 XATTR_NAME_LOV, th);
1215
1216         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1217             dt_object_exists(next) &&
1218             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1219                 struct lod_thread_info *info = lod_env_info(env);
1220                 struct lu_buf *buf = &info->lti_buf;
1221
1222                 buf->lb_buf = info->lti_ea_store;
1223                 buf->lb_len = info->lti_ea_store_size;
1224                 lod_sub_object_declare_xattr_set(env, next, buf,
1225                                                  XATTR_NAME_LOV,
1226                                                  LU_XATTR_REPLACE, th);
1227         }
1228
1229         RETURN(rc);
1230 }
1231
1232 /**
1233  * Implementation of dt_object_operations::do_attr_set.
1234  *
1235  * If the object is striped, then apply the changes to all or subset of
1236  * the stripes depending on the object type and specific attributes.
1237  *
1238  * \see dt_object_operations::do_attr_set() in the API description for details.
1239  */
1240 static int lod_attr_set(const struct lu_env *env,
1241                         struct dt_object *dt,
1242                         const struct lu_attr *attr,
1243                         struct thandle *th)
1244 {
1245         struct dt_object        *next = dt_object_child(dt);
1246         struct lod_object       *lo = lod_dt_obj(dt);
1247         int                     rc, i;
1248         ENTRY;
1249
1250         /* Set dead object on all other stripes */
1251         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
1252             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
1253                 rc = lod_mark_dead_object(env, dt, th, false);
1254                 RETURN(rc);
1255         }
1256
1257         /*
1258          * apply changes to the local object
1259          */
1260         rc = lod_sub_object_attr_set(env, next, attr, th);
1261         if (rc)
1262                 RETURN(rc);
1263
1264         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
1265                 if (!(attr->la_valid & (LA_UID | LA_GID)))
1266                         RETURN(rc);
1267
1268                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
1269                         RETURN(0);
1270         } else {
1271                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
1272                                         LA_ATIME | LA_MTIME | LA_CTIME |
1273                                         LA_FLAGS)))
1274                         RETURN(rc);
1275         }
1276
1277         if (lo->ldo_stripenr == 0)
1278                 RETURN(0);
1279
1280         /*
1281          * if object is striped, apply changes to all the stripes
1282          */
1283         LASSERT(lo->ldo_stripe);
1284         for (i = 0; i < lo->ldo_stripenr; i++) {
1285                 if (unlikely(lo->ldo_stripe[i] == NULL))
1286                         continue;
1287
1288                 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
1289                     (dt_object_exists(lo->ldo_stripe[i]) == 0))
1290                         continue;
1291
1292                 rc = lod_sub_object_attr_set(env, lo->ldo_stripe[i], attr, th);
1293                 if (rc != 0)
1294                         break;
1295         }
1296
1297         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1298             dt_object_exists(next) != 0 &&
1299             dt_object_remote(next) == 0)
1300                 rc = lod_sub_object_xattr_del(env, next, XATTR_NAME_LOV, th);
1301
1302         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1303             dt_object_exists(next) &&
1304             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1305                 struct lod_thread_info *info = lod_env_info(env);
1306                 struct lu_buf *buf = &info->lti_buf;
1307                 struct ost_id *oi = &info->lti_ostid;
1308                 struct lu_fid *fid = &info->lti_fid;
1309                 struct lov_mds_md_v1 *lmm;
1310                 struct lov_ost_data_v1 *objs;
1311                 __u32 magic;
1312                 int rc1;
1313
1314                 rc1 = lod_get_lov_ea(env, lo);
1315                 if (rc1  <= 0)
1316                         RETURN(rc);
1317
1318                 buf->lb_buf = info->lti_ea_store;
1319                 buf->lb_len = info->lti_ea_store_size;
1320                 lmm = info->lti_ea_store;
1321                 magic = le32_to_cpu(lmm->lmm_magic);
1322                 if (magic == LOV_MAGIC_V1)
1323                         objs = &(lmm->lmm_objects[0]);
1324                 else
1325                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1326                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
1327                 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
1328                 fid->f_oid--;
1329                 fid_to_ostid(fid, oi);
1330                 ostid_cpu_to_le(oi, &objs->l_ost_oi);
1331
1332                 rc = lod_sub_object_xattr_set(env, next, buf, XATTR_NAME_LOV,
1333                                               LU_XATTR_REPLACE, th);
1334         }
1335
1336         RETURN(rc);
1337 }
1338
1339 /**
1340  * Implementation of dt_object_operations::do_xattr_get.
1341  *
1342  * If LOV EA is requested from the root object and it's not
1343  * found, then return default striping for the filesystem.
1344  *
1345  * \see dt_object_operations::do_xattr_get() in the API description for details.
1346  */
1347 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
1348                          struct lu_buf *buf, const char *name)
1349 {
1350         struct lod_thread_info  *info = lod_env_info(env);
1351         struct lod_device       *dev = lu2lod_dev(dt->do_lu.lo_dev);
1352         int                      rc, is_root;
1353         ENTRY;
1354
1355         rc = dt_xattr_get(env, dt_object_child(dt), buf, name);
1356         if (strcmp(name, XATTR_NAME_LMV) == 0) {
1357                 struct lmv_mds_md_v1    *lmv1;
1358                 int                      rc1 = 0;
1359
1360                 if (rc > (typeof(rc))sizeof(*lmv1))
1361                         RETURN(rc);
1362
1363                 if (rc < (typeof(rc))sizeof(*lmv1))
1364                         RETURN(rc = rc > 0 ? -EINVAL : rc);
1365
1366                 if (buf->lb_buf == NULL || buf->lb_len == 0) {
1367                         CLASSERT(sizeof(*lmv1) <= sizeof(info->lti_key));
1368
1369                         info->lti_buf.lb_buf = info->lti_key;
1370                         info->lti_buf.lb_len = sizeof(*lmv1);
1371                         rc = dt_xattr_get(env, dt_object_child(dt),
1372                                           &info->lti_buf, name);
1373                         if (unlikely(rc != sizeof(*lmv1)))
1374                                 RETURN(rc = rc > 0 ? -EINVAL : rc);
1375
1376                         lmv1 = info->lti_buf.lb_buf;
1377                         /* The on-disk LMV EA only contains header, but the
1378                          * returned LMV EA size should contain the space for
1379                          * the FIDs of all shards of the striped directory. */
1380                         if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1)
1381                                 rc = lmv_mds_md_size(
1382                                         le32_to_cpu(lmv1->lmv_stripe_count),
1383                                         LMV_MAGIC_V1);
1384                 } else {
1385                         rc1 = lod_load_lmv_shards(env, lod_dt_obj(dt),
1386                                                   buf, false);
1387                 }
1388
1389                 RETURN(rc = rc1 != 0 ? rc1 : rc);
1390         }
1391
1392         if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
1393                 RETURN(rc);
1394
1395         /*
1396          * lod returns default striping on the real root of the device
1397          * this is like the root stores default striping for the whole
1398          * filesystem. historically we've been using a different approach
1399          * and store it in the config.
1400          */
1401         dt_root_get(env, dev->lod_child, &info->lti_fid);
1402         is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
1403
1404         if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
1405                 struct lov_user_md *lum = buf->lb_buf;
1406                 struct lov_desc    *desc = &dev->lod_desc;
1407
1408                 if (buf->lb_buf == NULL) {
1409                         rc = sizeof(*lum);
1410                 } else if (buf->lb_len >= sizeof(*lum)) {
1411                         lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
1412                         lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
1413                         lmm_oi_set_id(&lum->lmm_oi, 0);
1414                         lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
1415                         lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
1416                         lum->lmm_stripe_size = cpu_to_le32(
1417                                                 desc->ld_default_stripe_size);
1418                         lum->lmm_stripe_count = cpu_to_le16(
1419                                                 desc->ld_default_stripe_count);
1420                         lum->lmm_stripe_offset = cpu_to_le16(
1421                                                 desc->ld_default_stripe_offset);
1422                         rc = sizeof(*lum);
1423                 } else {
1424                         rc = -ERANGE;
1425                 }
1426         }
1427
1428         RETURN(rc);
1429 }
1430
1431 /**
1432  * Verify LVM EA.
1433  *
1434  * Checks that the magic of the stripe is sane.
1435  *
1436  * \param[in] lod       lod device
1437  * \param[in] lum       a buffer storing LMV EA to verify
1438  *
1439  * \retval              0 if the EA is sane
1440  * \retval              negative otherwise
1441  */
1442 static int lod_verify_md_striping(struct lod_device *lod,
1443                                   const struct lmv_user_md_v1 *lum)
1444 {
1445         if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC)) {
1446                 CERROR("%s: invalid lmv_user_md: magic = %x, "
1447                        "stripe_offset = %d, stripe_count = %u: rc = %d\n",
1448                        lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
1449                        (int)le32_to_cpu(lum->lum_stripe_offset),
1450                        le32_to_cpu(lum->lum_stripe_count), -EINVAL);
1451                 return -EINVAL;
1452         }
1453
1454         return 0;
1455 }
1456
1457 /**
1458  * Initialize LMV EA for a slave.
1459  *
1460  * Initialize slave's LMV EA from the master's LMV EA.
1461  *
1462  * \param[in] master_lmv        a buffer containing master's EA
1463  * \param[out] slave_lmv        a buffer where slave's EA will be stored
1464  *
1465  */
1466 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
1467                                   const struct lmv_mds_md_v1 *master_lmv)
1468 {
1469         *slave_lmv = *master_lmv;
1470         slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
1471 }
1472
1473 /**
1474  * Generate LMV EA.
1475  *
1476  * Generate LMV EA from the object passed as \a dt. The object must have
1477  * the stripes created and initialized.
1478  *
1479  * \param[in] env       execution environment
1480  * \param[in] dt        object
1481  * \param[out] lmv_buf  buffer storing generated LMV EA
1482  *
1483  * \retval              0 on success
1484  * \retval              negative if failed
1485  */
1486 static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
1487                            struct lu_buf *lmv_buf)
1488 {
1489         struct lod_thread_info  *info = lod_env_info(env);
1490         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1491         struct lod_object       *lo = lod_dt_obj(dt);
1492         struct lmv_mds_md_v1    *lmm1;
1493         int                     stripe_count;
1494         int                     type = LU_SEQ_RANGE_ANY;
1495         int                     rc;
1496         __u32                   mdtidx;
1497         ENTRY;
1498
1499         LASSERT(lo->ldo_dir_striped != 0);
1500         LASSERT(lo->ldo_stripenr > 0);
1501         stripe_count = lo->ldo_stripenr;
1502         /* Only store the LMV EA heahder on the disk. */
1503         if (info->lti_ea_store_size < sizeof(*lmm1)) {
1504                 rc = lod_ea_store_resize(info, sizeof(*lmm1));
1505                 if (rc != 0)
1506                         RETURN(rc);
1507         } else {
1508                 memset(info->lti_ea_store, 0, sizeof(*lmm1));
1509         }
1510
1511         lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
1512         lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
1513         lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
1514         lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
1515         rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
1516                             &mdtidx, &type);
1517         if (rc != 0)
1518                 RETURN(rc);
1519
1520         lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
1521         lmv_buf->lb_buf = info->lti_ea_store;
1522         lmv_buf->lb_len = sizeof(*lmm1);
1523
1524         RETURN(rc);
1525 }
1526
1527 /**
1528  * Create in-core represenation for a striped directory.
1529  *
1530  * Parse the buffer containing LMV EA and instantiate LU objects
1531  * representing the stripe objects. The pointers to the objects are
1532  * stored in ldo_stripe field of \a lo. This function is used when
1533  * we need to access an already created object (i.e. load from a disk).
1534  *
1535  * \param[in] env       execution environment
1536  * \param[in] lo        lod object
1537  * \param[in] buf       buffer containing LMV EA
1538  *
1539  * \retval              0 on success
1540  * \retval              negative if failed
1541  */
1542 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
1543                            const struct lu_buf *buf)
1544 {
1545         struct lod_thread_info  *info = lod_env_info(env);
1546         struct lod_device       *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1547         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1548         struct dt_object        **stripe;
1549         union lmv_mds_md        *lmm = buf->lb_buf;
1550         struct lmv_mds_md_v1    *lmv1 = &lmm->lmv_md_v1;
1551         struct lu_fid           *fid = &info->lti_fid;
1552         unsigned int            i;
1553         int                     rc = 0;
1554         ENTRY;
1555
1556         if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
1557                 RETURN(0);
1558
1559         if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
1560                 lo->ldo_dir_slave_stripe = 1;
1561                 RETURN(0);
1562         }
1563
1564         if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
1565                 RETURN(-EINVAL);
1566
1567         if (le32_to_cpu(lmv1->lmv_stripe_count) < 1)
1568                 RETURN(0);
1569
1570         LASSERT(lo->ldo_stripe == NULL);
1571         OBD_ALLOC(stripe, sizeof(stripe[0]) *
1572                   (le32_to_cpu(lmv1->lmv_stripe_count)));
1573         if (stripe == NULL)
1574                 RETURN(-ENOMEM);
1575
1576         for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
1577                 struct dt_device        *tgt_dt;
1578                 struct dt_object        *dto;
1579                 int                     type = LU_SEQ_RANGE_ANY;
1580                 __u32                   idx;
1581
1582                 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
1583                 if (!fid_is_sane(fid))
1584                         GOTO(out, rc = -ESTALE);
1585
1586                 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
1587                 if (rc != 0)
1588                         GOTO(out, rc);
1589
1590                 if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
1591                         tgt_dt = lod->lod_child;
1592                 } else {
1593                         struct lod_tgt_desc     *tgt;
1594
1595                         tgt = LTD_TGT(ltd, idx);
1596                         if (tgt == NULL)
1597                                 GOTO(out, rc = -ESTALE);
1598                         tgt_dt = tgt->ltd_tgt;
1599                 }
1600
1601                 dto = dt_locate_at(env, tgt_dt, fid,
1602                                   lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1603                                   NULL);
1604                 if (IS_ERR(dto))
1605                         GOTO(out, rc = PTR_ERR(dto));
1606
1607                 stripe[i] = dto;
1608         }
1609 out:
1610         lo->ldo_stripe = stripe;
1611         lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count);
1612         lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
1613         if (rc != 0)
1614                 lod_object_free_striping(env, lo);
1615
1616         RETURN(rc);
1617 }
1618
1619 /**
1620  * Create a striped directory.
1621  *
1622  * Create a striped directory with a given stripe pattern on the specified MDTs.
1623  * A striped directory is represented as a regular directory - an index listing
1624  * all the stripes. The stripes point back to the master object with ".." and
1625  * LinkEA. The master object gets LMV EA which identifies it as a striped
1626  * directory. The function allocates FIDs for all the stripes.
1627  *
1628  * \param[in] env       execution environment
1629  * \param[in] dt        object
1630  * \param[in] attr      attributes to initialize the objects with
1631  * \param[in] lum       a pattern specifying the number of stripes and
1632  *                      MDT to start from
1633  * \param[in] dof       type of objects to be created
1634  * \param[in] th        transaction handle
1635  *
1636  * \retval              0 on success
1637  * \retval              negative if failed
1638  */
1639 static int lod_dir_declare_create_stripes(const struct lu_env *env,
1640                                           struct dt_object *dt,
1641                                           struct lu_attr *attr,
1642                                           struct dt_object_format *dof,
1643                                           struct thandle *th)
1644 {
1645         struct lod_thread_info  *info = lod_env_info(env);
1646         struct lu_buf           lmv_buf;
1647         struct lu_buf           slave_lmv_buf;
1648         struct lmv_mds_md_v1    *lmm;
1649         struct lmv_mds_md_v1    *slave_lmm = NULL;
1650         struct dt_insert_rec    *rec = &info->lti_dt_rec;
1651         struct lod_object       *lo = lod_dt_obj(dt);
1652         int                     rc;
1653         __u32                   i;
1654         ENTRY;
1655
1656         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1657         if (rc != 0)
1658                 GOTO(out, rc);
1659         lmm = lmv_buf.lb_buf;
1660
1661         OBD_ALLOC_PTR(slave_lmm);
1662         if (slave_lmm == NULL)
1663                 GOTO(out, rc = -ENOMEM);
1664
1665         lod_prep_slave_lmv_md(slave_lmm, lmm);
1666         slave_lmv_buf.lb_buf = slave_lmm;
1667         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1668
1669         if (!dt_try_as_dir(env, dt_object_child(dt)))
1670                 GOTO(out, rc = -EINVAL);
1671
1672         rec->rec_type = S_IFDIR;
1673         for (i = 0; i < lo->ldo_stripenr; i++) {
1674                 struct dt_object        *dto = lo->ldo_stripe[i];
1675                 char                    *stripe_name = info->lti_key;
1676                 struct lu_name          *sname;
1677                 struct linkea_data       ldata          = { NULL };
1678                 struct lu_buf           linkea_buf;
1679
1680                 rc = lod_sub_object_declare_create(env, dto, attr, NULL,
1681                                                    dof, th);
1682                 if (rc != 0)
1683                         GOTO(out, rc);
1684
1685                 if (!dt_try_as_dir(env, dto))
1686                         GOTO(out, rc = -EINVAL);
1687
1688                 rc = lod_sub_object_declare_ref_add(env, dto, th);
1689                 if (rc != 0)
1690                         GOTO(out, rc);
1691
1692                 rec->rec_fid = lu_object_fid(&dto->do_lu);
1693                 rc = lod_sub_object_declare_insert(env, dto,
1694                                         (const struct dt_rec *)rec,
1695                                         (const struct dt_key *)dot, th);
1696                 if (rc != 0)
1697                         GOTO(out, rc);
1698
1699                 /* master stripe FID will be put to .. */
1700                 rec->rec_fid = lu_object_fid(&dt->do_lu);
1701                 rc = lod_sub_object_declare_insert(env, dto,
1702                                         (const struct dt_rec *)rec,
1703                                         (const struct dt_key *)dotdot,
1704                                         th);
1705                 if (rc != 0)
1706                         GOTO(out, rc);
1707
1708                 if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) ||
1709                     cfs_fail_val != i) {
1710                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) &&
1711                             cfs_fail_val == i)
1712                                 slave_lmm->lmv_master_mdt_index =
1713                                                         cpu_to_le32(i + 1);
1714                         else
1715                                 slave_lmm->lmv_master_mdt_index =
1716                                                         cpu_to_le32(i);
1717                         rc = lod_sub_object_declare_xattr_set(env, dto,
1718                                         &slave_lmv_buf, XATTR_NAME_LMV, 0, th);
1719                         if (rc != 0)
1720                                 GOTO(out, rc);
1721                 }
1722
1723                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
1724                     cfs_fail_val == i)
1725                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
1726                                 PFID(lu_object_fid(&dto->do_lu)), i + 1);
1727                 else
1728                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
1729                                 PFID(lu_object_fid(&dto->do_lu)), i);
1730
1731                 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
1732                 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
1733                 if (rc != 0)
1734                         GOTO(out, rc);
1735
1736                 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
1737                 if (rc != 0)
1738                         GOTO(out, rc);
1739
1740                 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1741                 linkea_buf.lb_len = ldata.ld_leh->leh_len;
1742                 rc = lod_sub_object_declare_xattr_set(env, dto, &linkea_buf,
1743                                           XATTR_NAME_LINK, 0, th);
1744                 if (rc != 0)
1745                         GOTO(out, rc);
1746
1747                 rec->rec_fid = lu_object_fid(&dto->do_lu);
1748                 rc = lod_sub_object_declare_insert(env, dt_object_child(dt),
1749                                        (const struct dt_rec *)rec,
1750                                        (const struct dt_key *)stripe_name,
1751                                        th);
1752                 if (rc != 0)
1753                         GOTO(out, rc);
1754
1755                 rc = lod_sub_object_declare_ref_add(env, dt_object_child(dt),
1756                                                     th);
1757                 if (rc != 0)
1758                         GOTO(out, rc);
1759         }
1760
1761         rc = lod_sub_object_declare_xattr_set(env, dt_object_child(dt),
1762                                 &lmv_buf, XATTR_NAME_LMV, 0, th);
1763         if (rc != 0)
1764                 GOTO(out, rc);
1765 out:
1766         if (slave_lmm != NULL)
1767                 OBD_FREE_PTR(slave_lmm);
1768
1769         RETURN(rc);
1770 }
1771
1772 static int lod_prep_md_striped_create(const struct lu_env *env,
1773                                       struct dt_object *dt,
1774                                       struct lu_attr *attr,
1775                                       const struct lmv_user_md_v1 *lum,
1776                                       struct dt_object_format *dof,
1777                                       struct thandle *th)
1778 {
1779         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1780         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1781         struct lod_object       *lo = lod_dt_obj(dt);
1782         struct dt_object        **stripe;
1783         __u32                   stripe_count;
1784         int                     *idx_array;
1785         int                     rc = 0;
1786         __u32                   i;
1787         __u32                   j;
1788         ENTRY;
1789
1790         /* The lum has been verifed in lod_verify_md_striping */
1791         LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
1792         LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
1793
1794         stripe_count = le32_to_cpu(lum->lum_stripe_count);
1795
1796         /* shrink the stripe_count to the avaible MDT count */
1797         if (stripe_count > lod->lod_remote_mdt_count + 1)
1798                 stripe_count = lod->lod_remote_mdt_count + 1;
1799
1800         OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
1801         if (stripe == NULL)
1802                 RETURN(-ENOMEM);
1803
1804         OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
1805         if (idx_array == NULL)
1806                 GOTO(out_free, rc = -ENOMEM);
1807
1808         for (i = 0; i < stripe_count; i++) {
1809                 struct lod_tgt_desc     *tgt = NULL;
1810                 struct dt_object        *dto;
1811                 struct lu_fid           fid = { 0 };
1812                 int                     idx;
1813                 struct lu_object_conf   conf = { 0 };
1814                 struct dt_device        *tgt_dt = NULL;
1815
1816                 if (i == 0) {
1817                         /* Right now, master stripe and master object are
1818                          * on the same MDT */
1819                         idx = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
1820                         rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
1821                                            NULL);
1822                         if (rc < 0)
1823                                 GOTO(out_put, rc);
1824                         tgt_dt = lod->lod_child;
1825                         goto next;
1826                 }
1827
1828                 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
1829
1830                 for (j = 0; j < lod->lod_remote_mdt_count;
1831                      j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
1832                         bool already_allocated = false;
1833                         __u32 k;
1834
1835                         CDEBUG(D_INFO, "try idx %d, mdt cnt %u,"
1836                                " allocated %u, last allocated %d\n", idx,
1837                                lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1838
1839                         /* Find next available target */
1840                         if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
1841                                 continue;
1842
1843                         /* check whether the idx already exists
1844                          * in current allocated array */
1845                         for (k = 0; k < i; k++) {
1846                                 if (idx_array[k] == idx) {
1847                                         already_allocated = true;
1848                                         break;
1849                                 }
1850                         }
1851
1852                         if (already_allocated)
1853                                 continue;
1854
1855                         /* check the status of the OSP */
1856                         tgt = LTD_TGT(ltd, idx);
1857                         if (tgt == NULL)
1858                                 continue;
1859
1860                         tgt_dt = tgt->ltd_tgt;
1861                         rc = dt_statfs(env, tgt_dt, NULL);
1862                         if (rc) {
1863                                 /* this OSP doesn't feel well */
1864                                 rc = 0;
1865                                 continue;
1866                         }
1867
1868                         rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
1869                         if (rc < 0) {
1870                                 rc = 0;
1871                                 continue;
1872                         }
1873
1874                         break;
1875                 }
1876
1877                 /* Can not allocate more stripes */
1878                 if (j == lod->lod_remote_mdt_count) {
1879                         CDEBUG(D_INFO, "%s: require stripes %u only get %d\n",
1880                                lod2obd(lod)->obd_name, stripe_count, i - 1);
1881                         break;
1882                 }
1883
1884                 CDEBUG(D_INFO, "idx %d, mdt cnt %u,"
1885                        " allocated %u, last allocated %d\n", idx,
1886                        lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1887
1888 next:
1889                 /* tgt_dt and fid must be ready after search avaible OSP
1890                  * in the above loop */
1891                 LASSERT(tgt_dt != NULL);
1892                 LASSERT(fid_is_sane(&fid));
1893                 conf.loc_flags = LOC_F_NEW;
1894                 dto = dt_locate_at(env, tgt_dt, &fid,
1895                                    dt->do_lu.lo_dev->ld_site->ls_top_dev,
1896                                    &conf);
1897                 if (IS_ERR(dto))
1898                         GOTO(out_put, rc = PTR_ERR(dto));
1899                 stripe[i] = dto;
1900                 idx_array[i] = idx;
1901         }
1902
1903         lo->ldo_dir_striped = 1;
1904         lo->ldo_stripe = stripe;
1905         lo->ldo_stripenr = i;
1906         lo->ldo_stripes_allocated = stripe_count;
1907
1908         if (lo->ldo_stripenr == 0)
1909                 GOTO(out_put, rc = -ENOSPC);
1910
1911         rc = lod_dir_declare_create_stripes(env, dt, attr, dof, th);
1912         if (rc != 0)
1913                 GOTO(out_put, rc);
1914
1915 out_put:
1916         if (rc < 0) {
1917                 for (i = 0; i < stripe_count; i++)
1918                         if (stripe[i] != NULL)
1919                                 lu_object_put(env, &stripe[i]->do_lu);
1920                 OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count);
1921                 lo->ldo_stripenr = 0;
1922                 lo->ldo_stripes_allocated = 0;
1923                 lo->ldo_stripe = NULL;
1924         }
1925
1926 out_free:
1927         if (idx_array != NULL)
1928                 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
1929
1930         RETURN(rc);
1931 }
1932
1933 /**
1934  * Declare create striped md object.
1935  *
1936  * The function declares intention to create a striped directory. This is a
1937  * wrapper for lod_prep_md_striped_create(). The only additional functionality
1938  * is to verify pattern \a lum_buf is good. Check that function for the details.
1939  *
1940  * \param[in] env       execution environment
1941  * \param[in] dt        object
1942  * \param[in] attr      attributes to initialize the objects with
1943  * \param[in] lum_buf   a pattern specifying the number of stripes and
1944  *                      MDT to start from
1945  * \param[in] dof       type of objects to be created
1946  * \param[in] th        transaction handle
1947  *
1948  * \retval              0 on success
1949  * \retval              negative if failed
1950  *
1951  */
1952 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
1953                                      struct dt_object *dt,
1954                                      struct lu_attr *attr,
1955                                      const struct lu_buf *lum_buf,
1956                                      struct dt_object_format *dof,
1957                                      struct thandle *th)
1958 {
1959         struct lod_object       *lo = lod_dt_obj(dt);
1960         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1961         struct lmv_user_md_v1   *lum;
1962         int                     rc;
1963         ENTRY;
1964
1965         lum = lum_buf->lb_buf;
1966         LASSERT(lum != NULL);
1967
1968         CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
1969                le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
1970                (int)le32_to_cpu(lum->lum_stripe_offset));
1971
1972         if (le32_to_cpu(lum->lum_stripe_count) == 0)
1973                 GOTO(out, rc = 0);
1974
1975         rc = lod_verify_md_striping(lod, lum);
1976         if (rc != 0)
1977                 GOTO(out, rc);
1978
1979         /* prepare dir striped objects */
1980         rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
1981         if (rc != 0) {
1982                 /* failed to create striping, let's reset
1983                  * config so that others don't get confused */
1984                 lod_object_free_striping(env, lo);
1985                 GOTO(out, rc);
1986         }
1987 out:
1988         RETURN(rc);
1989 }
1990
1991
1992 /**
1993  * Implementation of dt_object_operations::do_declare_xattr_set.
1994  *
1995  * Used with regular (non-striped) objects. Basically it
1996  * initializes the striping information and applies the
1997  * change to all the stripes.
1998  *
1999  * \see dt_object_operations::do_declare_xattr_set() in the API description
2000  * for details.
2001  */
2002 static int lod_dir_declare_xattr_set(const struct lu_env *env,
2003                                      struct dt_object *dt,
2004                                      const struct lu_buf *buf,
2005                                      const char *name, int fl,
2006                                      struct thandle *th)
2007 {
2008         struct dt_object        *next = dt_object_child(dt);
2009         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
2010         struct lod_object       *lo = lod_dt_obj(dt);
2011         int                     i;
2012         int                     rc;
2013         ENTRY;
2014
2015         if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
2016                 struct lmv_user_md_v1 *lum;
2017
2018                 LASSERT(buf != NULL && buf->lb_buf != NULL);
2019                 lum = buf->lb_buf;
2020                 rc = lod_verify_md_striping(d, lum);
2021                 if (rc != 0)
2022                         RETURN(rc);
2023         }
2024
2025         rc = lod_sub_object_declare_xattr_set(env, next, buf, name, fl, th);
2026         if (rc != 0)
2027                 RETURN(rc);
2028
2029         /* set xattr to each stripes, if needed */
2030         rc = lod_load_striping(env, lo);
2031         if (rc != 0)
2032                 RETURN(rc);
2033
2034         /* Note: Do not set LinkEA on sub-stripes, otherwise
2035          * it will confuse the fid2path process(see mdt_path_current()).
2036          * The linkEA between master and sub-stripes is set in
2037          * lod_xattr_set_lmv(). */
2038         if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
2039                 RETURN(0);
2040
2041         for (i = 0; i < lo->ldo_stripenr; i++) {
2042                 LASSERT(lo->ldo_stripe[i]);
2043
2044                 rc = lod_sub_object_declare_xattr_set(env, lo->ldo_stripe[i],
2045                                                 buf, name, fl, th);
2046                 if (rc != 0)
2047                         break;
2048         }
2049
2050         RETURN(rc);
2051 }
2052
2053 /**
2054  * Implementation of dt_object_operations::do_declare_xattr_set.
2055  *
2056  * \see dt_object_operations::do_declare_xattr_set() in the API description
2057  * for details.
2058  *
2059  * the extension to the API:
2060  *   - declaring LOVEA requests striping creation
2061  *   - LU_XATTR_REPLACE means layout swap
2062  */
2063 static int lod_declare_xattr_set(const struct lu_env *env,
2064                                  struct dt_object *dt,
2065                                  const struct lu_buf *buf,
2066                                  const char *name, int fl,
2067                                  struct thandle *th)
2068 {
2069         struct dt_object *next = dt_object_child(dt);
2070         struct lu_attr   *attr = &lod_env_info(env)->lti_attr;
2071         __u32             mode;
2072         int               rc;
2073         ENTRY;
2074
2075         /*
2076          * allow to declare predefined striping on a new (!mode) object
2077          * which is supposed to be replay of regular file creation
2078          * (when LOV setting is declared)
2079          * LU_XATTR_REPLACE is set to indicate a layout swap
2080          */
2081         mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
2082         if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
2083              !(fl & LU_XATTR_REPLACE)) {
2084                 /*
2085                  * this is a request to manipulate object's striping
2086                  */
2087                 if (dt_object_exists(dt)) {
2088                         rc = dt_attr_get(env, next, attr);
2089                         if (rc)
2090                                 RETURN(rc);
2091                 } else {
2092                         memset(attr, 0, sizeof(*attr));
2093                         attr->la_valid = LA_TYPE | LA_MODE;
2094                         attr->la_mode = S_IFREG;
2095                 }
2096                 rc = lod_declare_striped_object(env, dt, attr, buf, th);
2097         } else if (S_ISDIR(mode)) {
2098                 rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
2099         } else {
2100                 rc = lod_sub_object_declare_xattr_set(env, next, buf, name,
2101                                                       fl, th);
2102         }
2103
2104         RETURN(rc);
2105 }
2106
2107 /**
2108  * Resets cached default striping in the object.
2109  *
2110  * \param[in] lo        object
2111  */
2112 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
2113 {
2114         lo->ldo_def_striping_set = 0;
2115         lo->ldo_def_striping_cached = 0;
2116         lod_object_set_pool(lo, NULL);
2117         lo->ldo_def_stripe_size = 0;
2118         lo->ldo_def_stripenr = 0;
2119         if (lo->ldo_dir_stripe != NULL)
2120                 lo->ldo_dir_def_striping_cached = 0;
2121 }
2122
2123 /**
2124  * Apply xattr changes to the object.
2125  *
2126  * Applies xattr changes to the object and the stripes if the latter exist.
2127  *
2128  * \param[in] env       execution environment
2129  * \param[in] dt        object
2130  * \param[in] buf       buffer pointing to the new value of xattr
2131  * \param[in] name      name of xattr
2132  * \param[in] fl        flags
2133  * \param[in] th        transaction handle
2134  *
2135  * \retval              0 on success
2136  * \retval              negative if failed
2137  */
2138 static int lod_xattr_set_internal(const struct lu_env *env,
2139                                   struct dt_object *dt,
2140                                   const struct lu_buf *buf,
2141                                   const char *name, int fl, struct thandle *th)
2142 {
2143         struct dt_object        *next = dt_object_child(dt);
2144         struct lod_object       *lo = lod_dt_obj(dt);
2145         int                     rc;
2146         int                     i;
2147         ENTRY;
2148
2149         rc = lod_sub_object_xattr_set(env, next, buf, name, fl, th);
2150         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
2151                 RETURN(rc);
2152
2153         /* Note: Do not set LinkEA on sub-stripes, otherwise
2154          * it will confuse the fid2path process(see mdt_path_current()).
2155          * The linkEA between master and sub-stripes is set in
2156          * lod_xattr_set_lmv(). */
2157         if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
2158                 RETURN(0);
2159
2160         for (i = 0; i < lo->ldo_stripenr; i++) {
2161                 LASSERT(lo->ldo_stripe[i]);
2162
2163                 rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i], buf, name,
2164                                               fl, th);
2165                 if (rc != 0)
2166                         break;
2167         }
2168
2169         RETURN(rc);
2170 }
2171
2172 /**
2173  * Delete an extended attribute.
2174  *
2175  * Deletes specified xattr from the object and the stripes if the latter exist.
2176  *
2177  * \param[in] env       execution environment
2178  * \param[in] dt        object
2179  * \param[in] name      name of xattr
2180  * \param[in] th        transaction handle
2181  *
2182  * \retval              0 on success
2183  * \retval              negative if failed
2184  */
2185 static int lod_xattr_del_internal(const struct lu_env *env,
2186                                   struct dt_object *dt,
2187                                   const char *name, struct thandle *th)
2188 {
2189         struct dt_object        *next = dt_object_child(dt);
2190         struct lod_object       *lo = lod_dt_obj(dt);
2191         int                     rc;
2192         int                     i;
2193         ENTRY;
2194
2195         rc = lod_sub_object_xattr_del(env, next, name, th);
2196         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
2197                 RETURN(rc);
2198
2199         if (lo->ldo_stripenr == 0)
2200                 RETURN(rc);
2201
2202         for (i = 0; i < lo->ldo_stripenr; i++) {
2203                 LASSERT(lo->ldo_stripe[i]);
2204
2205                 rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name,
2206                                               th);
2207                 if (rc != 0)
2208                         break;
2209         }
2210
2211         RETURN(rc);
2212 }
2213
2214 /**
2215  * Set default striping on a directory.
2216  *
2217  * Sets specified striping on a directory object unless it matches the default
2218  * striping (LOVEA_DELETE_VALUES() macro). In the latter case remove existing
2219  * EA. This striping will be used when regular file is being created in this
2220  * directory.
2221  *
2222  * \param[in] env       execution environment
2223  * \param[in] dt        the striped object
2224  * \param[in] buf       buffer with the striping
2225  * \param[in] name      name of EA
2226  * \param[in] fl        xattr flag (see OSD API description)
2227  * \param[in] th        transaction handle
2228  *
2229  * \retval              0 on success
2230  * \retval              negative if failed
2231  */
2232 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
2233                                     struct dt_object *dt,
2234                                     const struct lu_buf *buf,
2235                                     const char *name, int fl,
2236                                     struct thandle *th)
2237 {
2238         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
2239         struct lod_object       *l = lod_dt_obj(dt);
2240         struct lov_user_md_v1   *lum;
2241         struct lov_user_md_v3   *v3 = NULL;
2242         const char              *pool_name = NULL;
2243         int                      rc;
2244         ENTRY;
2245
2246         /* If it is striped dir, we should clear the stripe cache for
2247          * slave stripe as well, but there are no effective way to
2248          * notify the LOD on the slave MDT, so we do not cache stripe
2249          * information for slave stripe for now. XXX*/
2250         lod_lov_stripe_cache_clear(l);
2251         LASSERT(buf != NULL && buf->lb_buf != NULL);
2252         lum = buf->lb_buf;
2253
2254         rc = lod_verify_striping(d, buf, false);
2255         if (rc)
2256                 RETURN(rc);
2257
2258         if (lum->lmm_magic == LOV_USER_MAGIC_V3) {
2259                 v3 = buf->lb_buf;
2260                 if (v3->lmm_pool_name[0] != '\0')
2261                         pool_name = v3->lmm_pool_name;
2262         }
2263
2264         /* if { size, offset, count } = { 0, -1, 0 } and no pool
2265          * (i.e. all default values specified) then delete default
2266          * striping from dir. */
2267         CDEBUG(D_OTHER,
2268                 "set default striping: sz %u # %u offset %d %s %s\n",
2269                 (unsigned)lum->lmm_stripe_size,
2270                 (unsigned)lum->lmm_stripe_count,
2271                 (int)lum->lmm_stripe_offset,
2272                 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
2273
2274         if (LOVEA_DELETE_VALUES(lum->lmm_stripe_size, lum->lmm_stripe_count,
2275                                 lum->lmm_stripe_offset, pool_name)) {
2276                 rc = lod_xattr_del_internal(env, dt, name, th);
2277                 if (rc == -ENODATA)
2278                         rc = 0;
2279         } else {
2280                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th);
2281         }
2282
2283         RETURN(rc);
2284 }
2285
2286 /**
2287  * Set default striping on a directory object.
2288  *
2289  * Sets specified striping on a directory object unless it matches the default
2290  * striping (LOVEA_DELETE_VALUES() macro). In the latter case remove existing
2291  * EA. This striping will be used when a new directory is being created in the
2292  * directory.
2293  *
2294  * \param[in] env       execution environment
2295  * \param[in] dt        the striped object
2296  * \param[in] buf       buffer with the striping
2297  * \param[in] name      name of EA
2298  * \param[in] fl        xattr flag (see OSD API description)
2299  * \param[in] th        transaction handle
2300  *
2301  * \retval              0 on success
2302  * \retval              negative if failed
2303  */
2304 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
2305                                             struct dt_object *dt,
2306                                             const struct lu_buf *buf,
2307                                             const char *name, int fl,
2308                                             struct thandle *th)
2309 {
2310         struct lod_object       *l = lod_dt_obj(dt);
2311         struct lmv_user_md_v1   *lum;
2312         int                      rc;
2313         ENTRY;
2314
2315         LASSERT(buf != NULL && buf->lb_buf != NULL);
2316         lum = buf->lb_buf;
2317
2318         CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
2319               le32_to_cpu(lum->lum_stripe_count),
2320               (int)le32_to_cpu(lum->lum_stripe_offset));
2321
2322         if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
2323                                  le32_to_cpu(lum->lum_stripe_offset)) &&
2324                                 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
2325                 rc = lod_xattr_del_internal(env, dt, name, th);
2326                 if (rc == -ENODATA)
2327                         rc = 0;
2328         } else {
2329                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th);
2330                 if (rc != 0)
2331                         RETURN(rc);
2332         }
2333
2334         /* Update default stripe cache */
2335         if (l->ldo_dir_stripe == NULL) {
2336                 OBD_ALLOC_PTR(l->ldo_dir_stripe);
2337                 if (l->ldo_dir_stripe == NULL)
2338                         RETURN(-ENOMEM);
2339         }
2340
2341         l->ldo_dir_def_striping_cached = 0;
2342         RETURN(rc);
2343 }
2344
2345 /**
2346  * Turn directory into a striped directory.
2347  *
2348  * During replay the client sends the striping created before MDT
2349  * failure, then the layer above LOD sends this defined striping
2350  * using ->do_xattr_set(), so LOD uses this method to replay creation
2351  * of the stripes. Notice the original information for the striping
2352  * (#stripes, FIDs, etc) was transferred in declare path.
2353  *
2354  * \param[in] env       execution environment
2355  * \param[in] dt        the striped object
2356  * \param[in] buf       not used currently
2357  * \param[in] name      not used currently
2358  * \param[in] fl        xattr flag (see OSD API description)
2359  * \param[in] th        transaction handle
2360  *
2361  * \retval              0 on success
2362  * \retval              negative if failed
2363  */
2364 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
2365                              const struct lu_buf *buf, const char *name,
2366                              int fl, struct thandle *th)
2367 {
2368         struct lod_object       *lo = lod_dt_obj(dt);
2369         struct lod_thread_info  *info = lod_env_info(env);
2370         struct lu_attr          *attr = &info->lti_attr;
2371         struct dt_object_format *dof = &info->lti_format;
2372         struct lu_buf           lmv_buf;
2373         struct lu_buf           slave_lmv_buf;
2374         struct lmv_mds_md_v1    *lmm;
2375         struct lmv_mds_md_v1    *slave_lmm = NULL;
2376         struct dt_insert_rec    *rec = &info->lti_dt_rec;
2377         int                     i;
2378         int                     rc;
2379         ENTRY;
2380
2381         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2382                 RETURN(-ENOTDIR);
2383
2384         /* The stripes are supposed to be allocated in declare phase,
2385          * if there are no stripes being allocated, it will skip */
2386         if (lo->ldo_stripenr == 0)
2387                 RETURN(0);
2388
2389         rc = dt_attr_get(env, dt_object_child(dt), attr);
2390         if (rc != 0)
2391                 RETURN(rc);
2392
2393         attr->la_valid = LA_ATIME | LA_MTIME | LA_CTIME |
2394                          LA_MODE | LA_UID | LA_GID | LA_TYPE;
2395         dof->dof_type = DFT_DIR;
2396
2397         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
2398         if (rc != 0)
2399                 RETURN(rc);
2400         lmm = lmv_buf.lb_buf;
2401
2402         OBD_ALLOC_PTR(slave_lmm);
2403         if (slave_lmm == NULL)
2404                 RETURN(-ENOMEM);
2405
2406         lod_prep_slave_lmv_md(slave_lmm, lmm);
2407         slave_lmv_buf.lb_buf = slave_lmm;
2408         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
2409
2410         rec->rec_type = S_IFDIR;
2411         for (i = 0; i < lo->ldo_stripenr; i++) {
2412                 struct dt_object *dto;
2413                 char             *stripe_name = info->lti_key;
2414                 struct lu_name          *sname;
2415                 struct linkea_data       ldata          = { NULL };
2416                 struct lu_buf            linkea_buf;
2417
2418                 dto = lo->ldo_stripe[i];
2419
2420                 dt_write_lock(env, dto, MOR_TGT_CHILD);
2421                 rc = lod_sub_object_create(env, dto, attr, NULL, dof,
2422                                            th);
2423                 if (rc != 0) {
2424                         dt_write_unlock(env, dto);
2425                         GOTO(out, rc);
2426                 }
2427
2428                 rc = lod_sub_object_ref_add(env, dto, th);
2429                 dt_write_unlock(env, dto);
2430                 if (rc != 0)
2431                         GOTO(out, rc);
2432
2433                 rec->rec_fid = lu_object_fid(&dto->do_lu);
2434                 rc = lod_sub_object_index_insert(env, dto,
2435                                 (const struct dt_rec *)rec,
2436                                 (const struct dt_key *)dot, th, 0);
2437                 if (rc != 0)
2438                         GOTO(out, rc);
2439
2440                 rec->rec_fid = lu_object_fid(&dt->do_lu);
2441                 rc = lod_sub_object_index_insert(env, dto, (struct dt_rec *)rec,
2442                                (const struct dt_key *)dotdot, th, 0);
2443                 if (rc != 0)
2444                         GOTO(out, rc);
2445
2446                 if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) ||
2447                     cfs_fail_val != i) {
2448                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) &&
2449                             cfs_fail_val == i)
2450                                 slave_lmm->lmv_master_mdt_index =
2451                                                         cpu_to_le32(i + 1);
2452                         else
2453                                 slave_lmm->lmv_master_mdt_index =
2454                                                         cpu_to_le32(i);
2455
2456                         rc = lod_sub_object_xattr_set(env, dto, &slave_lmv_buf,
2457                                                       XATTR_NAME_LMV, fl, th);
2458                         if (rc != 0)
2459                                 GOTO(out, rc);
2460                 }
2461
2462                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
2463                     cfs_fail_val == i)
2464                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2465                                  PFID(lu_object_fid(&dto->do_lu)), i + 1);
2466                 else
2467                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2468                                  PFID(lu_object_fid(&dto->do_lu)), i);
2469
2470                 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
2471                 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
2472                 if (rc != 0)
2473                         GOTO(out, rc);
2474
2475                 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
2476                 if (rc != 0)
2477                         GOTO(out, rc);
2478
2479                 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
2480                 linkea_buf.lb_len = ldata.ld_leh->leh_len;
2481                 rc = lod_sub_object_xattr_set(env, dto, &linkea_buf,
2482                                         XATTR_NAME_LINK, 0, th);
2483                 if (rc != 0)
2484                         GOTO(out, rc);
2485
2486                 rec->rec_fid = lu_object_fid(&dto->do_lu);
2487                 rc = lod_sub_object_index_insert(env, dt_object_child(dt),
2488                                (const struct dt_rec *)rec,
2489                                (const struct dt_key *)stripe_name, th, 0);
2490                 if (rc != 0)
2491                         GOTO(out, rc);
2492
2493                 rc = lod_sub_object_ref_add(env, dt_object_child(dt), th);
2494                 if (rc != 0)
2495                         GOTO(out, rc);
2496         }
2497
2498         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV))
2499                 rc = lod_sub_object_xattr_set(env, dt_object_child(dt),
2500                                               &lmv_buf, XATTR_NAME_LMV, fl, th);
2501 out:
2502         if (slave_lmm != NULL)
2503                 OBD_FREE_PTR(slave_lmm);
2504
2505         RETURN(rc);
2506 }
2507
2508 /**
2509  * Helper function to declare/execute creation of a striped directory
2510  *
2511  * Called in declare/create object path, prepare striping for a directory
2512  * and prepare defaults data striping for the objects to be created in
2513  * that directory. Notice the function calls "declaration" or "execution"
2514  * methods depending on \a declare param. This is a consequence of the
2515  * current approach while we don't have natural distributed transactions:
2516  * we basically execute non-local updates in the declare phase. So, the
2517  * arguments for the both phases are the same and this is the reason for
2518  * this function to exist.
2519  *
2520  * \param[in] env       execution environment
2521  * \param[in] dt        object
2522  * \param[in] attr      attributes the stripes will be created with
2523  * \param[in] dof       format of stripes (see OSD API description)
2524  * \param[in] th        transaction handle
2525  * \param[in] declare   where to call "declare" or "execute" methods
2526  *
2527  * \retval              0 on success
2528  * \retval              negative if failed
2529  */
2530 static int lod_dir_striping_create_internal(const struct lu_env *env,
2531                                             struct dt_object *dt,
2532                                             struct lu_attr *attr,
2533                                             struct dt_object_format *dof,
2534                                             struct thandle *th,
2535                                             bool declare)
2536 {
2537         struct lod_thread_info  *info = lod_env_info(env);
2538         struct lod_object       *lo = lod_dt_obj(dt);
2539         int                     rc;
2540         ENTRY;
2541
2542         if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr,
2543                                  lo->ldo_dir_stripe_offset)) {
2544                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2545                 int stripe_count = lo->ldo_stripenr;
2546
2547                 if (info->lti_ea_store_size < sizeof(*v1)) {
2548                         rc = lod_ea_store_resize(info, sizeof(*v1));
2549                         if (rc != 0)
2550                                 RETURN(rc);
2551                         v1 = info->lti_ea_store;
2552                 }
2553
2554                 memset(v1, 0, sizeof(*v1));
2555                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2556                 v1->lum_stripe_count = cpu_to_le32(stripe_count);
2557                 v1->lum_stripe_offset =
2558                                 cpu_to_le32(lo->ldo_dir_stripe_offset);
2559
2560                 info->lti_buf.lb_buf = v1;
2561                 info->lti_buf.lb_len = sizeof(*v1);
2562
2563                 if (declare)
2564                         rc = lod_declare_xattr_set_lmv(env, dt, attr,
2565                                                        &info->lti_buf, dof, th);
2566                 else
2567                         rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
2568                                                XATTR_NAME_LMV, 0, th);
2569                 if (rc != 0)
2570                         RETURN(rc);
2571         }
2572
2573         /* Transfer default LMV striping from the parent */
2574         if (lo->ldo_dir_def_striping_set &&
2575             !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
2576                                  lo->ldo_dir_def_stripe_offset)) {
2577                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2578                 int def_stripe_count = lo->ldo_dir_def_stripenr;
2579
2580                 if (info->lti_ea_store_size < sizeof(*v1)) {
2581                         rc = lod_ea_store_resize(info, sizeof(*v1));
2582                         if (rc != 0)
2583                                 RETURN(rc);
2584                         v1 = info->lti_ea_store;
2585                 }
2586
2587                 memset(v1, 0, sizeof(*v1));
2588                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2589                 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
2590                 v1->lum_stripe_offset =
2591                                 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
2592                 v1->lum_hash_type =
2593                                 cpu_to_le32(lo->ldo_dir_def_hash_type);
2594
2595                 info->lti_buf.lb_buf = v1;
2596                 info->lti_buf.lb_len = sizeof(*v1);
2597                 if (declare)
2598                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2599                                                        XATTR_NAME_DEFAULT_LMV,
2600                                                        0, th);
2601                 else
2602                         rc = lod_xattr_set_default_lmv_on_dir(env, dt,
2603                                                   &info->lti_buf,
2604                                                   XATTR_NAME_DEFAULT_LMV, 0,
2605                                                   th);
2606                 if (rc != 0)
2607                         RETURN(rc);
2608         }
2609
2610         /* Transfer default LOV striping from the parent */
2611         if (lo->ldo_def_striping_set &&
2612             !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2613                                  lo->ldo_def_stripenr,
2614                                  lo->ldo_def_stripe_offset,
2615                                  lo->ldo_pool)) {
2616                 struct lov_user_md_v3 *v3 = info->lti_ea_store;
2617
2618                 if (info->lti_ea_store_size < sizeof(*v3)) {
2619                         rc = lod_ea_store_resize(info, sizeof(*v3));
2620                         if (rc != 0)
2621                                 RETURN(rc);
2622                         v3 = info->lti_ea_store;
2623                 }
2624
2625                 memset(v3, 0, sizeof(*v3));
2626                 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2627                 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
2628                 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
2629                 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
2630                 if (lo->ldo_pool != NULL)
2631                         strlcpy(v3->lmm_pool_name, lo->ldo_pool,
2632                                 sizeof(v3->lmm_pool_name));
2633
2634                 info->lti_buf.lb_buf = v3;
2635                 info->lti_buf.lb_len = sizeof(*v3);
2636
2637                 if (declare)
2638                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2639                                                        XATTR_NAME_LOV, 0, th);
2640                 else
2641                         rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
2642                                                       XATTR_NAME_LOV, 0, th);
2643                 if (rc != 0)
2644                         RETURN(rc);
2645         }
2646
2647         RETURN(0);
2648 }
2649
2650 static int lod_declare_dir_striping_create(const struct lu_env *env,
2651                                            struct dt_object *dt,
2652                                            struct lu_attr *attr,
2653                                            struct dt_object_format *dof,
2654                                            struct thandle *th)
2655 {
2656         return lod_dir_striping_create_internal(env, dt, attr, dof, th, true);
2657 }
2658
2659 static int lod_dir_striping_create(const struct lu_env *env,
2660                                    struct dt_object *dt,
2661                                    struct lu_attr *attr,
2662                                    struct dt_object_format *dof,
2663                                    struct thandle *th)
2664 {
2665         struct lod_object *lo = lod_dt_obj(dt);
2666         int rc;
2667
2668         rc = lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
2669         if (rc == 0)
2670                 lo->ldo_striping_cached = 1;
2671
2672         return rc;
2673 }
2674
2675 /**
2676  * Implementation of dt_object_operations::do_xattr_set.
2677  *
2678  * Sets specified extended attribute on the object. Three types of EAs are
2679  * special:
2680  *   LOV EA - stores striping for a regular file or default striping (when set
2681  *            on a directory)
2682  *   LMV EA - stores a marker for the striped directories
2683  *   DMV EA - stores default directory striping
2684  *
2685  * When striping is applied to a non-striped existing object (this is called
2686  * late striping), then LOD notices the caller wants to turn the object into a
2687  * striped one. The stripe objects are created and appropriate EA is set:
2688  * LOV EA storing all the stripes directly or LMV EA storing just a small header
2689  * with striping configuration.
2690  *
2691  * \see dt_object_operations::do_xattr_set() in the API description for details.
2692  */
2693 static int lod_xattr_set(const struct lu_env *env,
2694                          struct dt_object *dt, const struct lu_buf *buf,
2695                          const char *name, int fl, struct thandle *th)
2696 {
2697         struct dt_object        *next = dt_object_child(dt);
2698         int                      rc;
2699         ENTRY;
2700
2701         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2702             strcmp(name, XATTR_NAME_LMV) == 0) {
2703                 struct lmv_mds_md_v1 *lmm = buf->lb_buf;
2704
2705                 if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
2706                                                 LMV_HASH_FLAG_MIGRATION)
2707                         rc = lod_sub_object_xattr_set(env, next, buf, name, fl,
2708                                                       th);
2709                 else
2710                         rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
2711
2712                 RETURN(rc);
2713         }
2714
2715         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2716             strcmp(name, XATTR_NAME_LOV) == 0) {
2717                 /* default LOVEA */
2718                 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th);
2719                 RETURN(rc);
2720         } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2721                    strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
2722                 /* default LMVEA */
2723                 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
2724                                                       th);
2725                 RETURN(rc);
2726         } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2727                    !strcmp(name, XATTR_NAME_LOV)) {
2728                 /* in case of lov EA swap, just set it
2729                  * if not, it is a replay so check striping match what we
2730                  * already have during req replay, declare_xattr_set()
2731                  * defines striping, then create() does the work */
2732                 if (fl & LU_XATTR_REPLACE) {
2733                         /* free stripes, then update disk */
2734                         lod_object_free_striping(env, lod_dt_obj(dt));
2735
2736                         rc = lod_sub_object_xattr_set(env, next, buf, name,
2737                                                       fl, th);
2738                 } else if (dt_object_remote(dt)) {
2739                         /* This only happens during migration, see
2740                          * mdd_migrate_create(), in which Master MDT will
2741                          * create a remote target object, and only set
2742                          * (migrating) stripe EA on the remote object,
2743                          * and does not need creating each stripes. */
2744                         rc = lod_sub_object_xattr_set(env, next, buf, name,
2745                                                       fl, th);
2746                 } else {
2747                         rc = lod_striping_create(env, dt, NULL, NULL, th);
2748                 }
2749                 RETURN(rc);
2750         }
2751
2752         /* then all other xattr */
2753         rc = lod_xattr_set_internal(env, dt, buf, name, fl, th);
2754
2755         RETURN(rc);
2756 }
2757
2758 /**
2759  * Implementation of dt_object_operations::do_declare_xattr_del.
2760  *
2761  * \see dt_object_operations::do_declare_xattr_del() in the API description
2762  * for details.
2763  */
2764 static int lod_declare_xattr_del(const struct lu_env *env,
2765                                  struct dt_object *dt, const char *name,
2766                                  struct thandle *th)
2767 {
2768         struct lod_object       *lo = lod_dt_obj(dt);
2769         int                     rc;
2770         int                     i;
2771         ENTRY;
2772
2773         rc = lod_sub_object_declare_xattr_del(env, dt_object_child(dt),
2774                                               name, th);
2775         if (rc != 0)
2776                 RETURN(rc);
2777
2778         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2779                 RETURN(0);
2780
2781         /* set xattr to each stripes, if needed */
2782         rc = lod_load_striping(env, lo);
2783         if (rc != 0)
2784                 RETURN(rc);
2785
2786         if (lo->ldo_stripenr == 0)
2787                 RETURN(0);
2788
2789         for (i = 0; i < lo->ldo_stripenr; i++) {
2790                 LASSERT(lo->ldo_stripe[i]);
2791                 rc = lod_sub_object_declare_xattr_del(env, lo->ldo_stripe[i],
2792                                                       name, th);
2793                 if (rc != 0)
2794                         break;
2795         }
2796
2797         RETURN(rc);
2798 }
2799
2800 /**
2801  * Implementation of dt_object_operations::do_xattr_del.
2802  *
2803  * If EA storing a regular striping is being deleted, then release
2804  * all the references to the stripe objects in core.
2805  *
2806  * \see dt_object_operations::do_xattr_del() in the API description for details.
2807  */
2808 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
2809                          const char *name, struct thandle *th)
2810 {
2811         struct dt_object        *next = dt_object_child(dt);
2812         struct lod_object       *lo = lod_dt_obj(dt);
2813         int                     rc;
2814         int                     i;
2815         ENTRY;
2816
2817         if (!strcmp(name, XATTR_NAME_LOV))
2818                 lod_object_free_striping(env, lod_dt_obj(dt));
2819
2820         rc = lod_sub_object_xattr_del(env, next, name, th);
2821         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
2822                 RETURN(rc);
2823
2824         if (lo->ldo_stripenr == 0)
2825                 RETURN(0);
2826
2827         for (i = 0; i < lo->ldo_stripenr; i++) {
2828                 LASSERT(lo->ldo_stripe[i]);
2829
2830                 rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name, th);
2831                 if (rc != 0)
2832                         break;
2833         }
2834
2835         RETURN(rc);
2836 }
2837
2838 /**
2839  * Implementation of dt_object_operations::do_xattr_list.
2840  *
2841  * \see dt_object_operations::do_xattr_list() in the API description
2842  * for details.
2843  */
2844 static int lod_xattr_list(const struct lu_env *env,
2845                           struct dt_object *dt, const struct lu_buf *buf)
2846 {
2847         return dt_xattr_list(env, dt_object_child(dt), buf);
2848 }
2849
2850 /**
2851  * Initialize a pool the object belongs to.
2852  *
2853  * When a striped object is being created, striping configuration
2854  * may demand the stripes are allocated on a limited set of the
2855  * targets. These limited sets are known as "pools". So we copy
2856  * a pool name into the object and later actual creation methods
2857  * (like lod_object_create()) will use this information to allocate
2858  * the stripes properly.
2859  *
2860  * \param[in] o         object
2861  * \param[in] pool      pool name
2862  */
2863 int lod_object_set_pool(struct lod_object *o, char *pool)
2864 {
2865         int len;
2866
2867         if (o->ldo_pool) {
2868                 len = strlen(o->ldo_pool);
2869                 OBD_FREE(o->ldo_pool, len + 1);
2870                 o->ldo_pool = NULL;
2871         }
2872         if (pool) {
2873                 len = strlen(pool);
2874                 OBD_ALLOC(o->ldo_pool, len + 1);
2875                 if (o->ldo_pool == NULL)
2876                         return -ENOMEM;
2877                 strcpy(o->ldo_pool, pool);
2878         }
2879         return 0;
2880 }
2881
2882 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
2883 {
2884         return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
2885 }
2886
2887
2888 /**
2889  * Cache default regular striping in the object.
2890  *
2891  * To improve performance of striped regular object creation we cache
2892  * default LOV striping (if it exists) in the parent directory object.
2893  *
2894  * \param[in] env               execution environment
2895  * \param[in] lp                object
2896  *
2897  * \retval              0 on success
2898  * \retval              negative if failed
2899  */
2900 static int lod_cache_parent_lov_striping(const struct lu_env *env,
2901                                          struct lod_object *lp)
2902 {
2903         struct lod_thread_info  *info = lod_env_info(env);
2904         struct lov_user_md_v1   *v1 = NULL;
2905         struct lov_user_md_v3   *v3 = NULL;
2906         int                      rc;
2907         ENTRY;
2908
2909         /* called from MDD without parent being write locked,
2910          * lock it here */
2911         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2912         rc = lod_get_lov_ea(env, lp);
2913         if (rc < 0)
2914                 GOTO(unlock, rc);
2915
2916         if (rc < (typeof(rc))sizeof(struct lov_user_md)) {
2917                 /* don't lookup for non-existing or invalid striping */
2918                 lp->ldo_def_striping_set = 0;
2919                 lp->ldo_def_striping_cached = 1;
2920                 lp->ldo_def_stripe_size = 0;
2921                 lp->ldo_def_stripenr = 0;
2922                 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
2923                 GOTO(unlock, rc = 0);
2924         }
2925
2926         rc = 0;
2927         v1 = info->lti_ea_store;
2928         if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) {
2929                 lustre_swab_lov_user_md_v1(v1);
2930         } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
2931                 v3 = (struct lov_user_md_v3 *)v1;
2932                 lustre_swab_lov_user_md_v3(v3);
2933         }
2934
2935         if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
2936                 GOTO(unlock, rc = 0);
2937
2938         if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
2939                 GOTO(unlock, rc = 0);
2940
2941         CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
2942                PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
2943                (int)v1->lmm_stripe_count,
2944                (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
2945
2946         lp->ldo_def_stripenr = v1->lmm_stripe_count;
2947         lp->ldo_def_stripe_size = v1->lmm_stripe_size;
2948         lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
2949         lp->ldo_def_striping_cached = 1;
2950         lp->ldo_def_striping_set = 1;
2951         if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
2952                 /* XXX: sanity check here */
2953                 v3 = (struct lov_user_md_v3 *) v1;
2954                 if (v3->lmm_pool_name[0])
2955                         lod_object_set_pool(lp, v3->lmm_pool_name);
2956         }
2957         EXIT;
2958 unlock:
2959         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2960         return rc;
2961 }
2962
2963
2964 /**
2965  * Cache default directory striping in the object.
2966  *
2967  * To improve performance of striped directory creation we cache default
2968  * directory striping (if it exists) in the parent directory object.
2969  *
2970  * \param[in] env               execution environment
2971  * \param[in] lp                object
2972  *
2973  * \retval              0 on success
2974  * \retval              negative if failed
2975  */
2976 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
2977                                          struct lod_object *lp)
2978 {
2979         struct lod_thread_info  *info = lod_env_info(env);
2980         struct lmv_user_md_v1   *v1 = NULL;
2981         int                      rc;
2982         ENTRY;
2983
2984         /* called from MDD without parent being write locked,
2985          * lock it here */
2986         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2987         rc = lod_get_default_lmv_ea(env, lp);
2988         if (rc < 0)
2989                 GOTO(unlock, rc);
2990
2991         if (rc < (typeof(rc))sizeof(struct lmv_user_md)) {
2992                 /* don't lookup for non-existing or invalid striping */
2993                 lp->ldo_dir_def_striping_set = 0;
2994                 lp->ldo_dir_def_striping_cached = 1;
2995                 lp->ldo_dir_def_stripenr = 0;
2996                 lp->ldo_dir_def_stripe_offset =
2997                                         (typeof(v1->lum_stripe_offset))(-1);
2998                 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
2999                 GOTO(unlock, rc = 0);
3000         }
3001
3002         rc = 0;
3003         v1 = info->lti_ea_store;
3004
3005         lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
3006         lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
3007         lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
3008         lp->ldo_dir_def_striping_set = 1;
3009         lp->ldo_dir_def_striping_cached = 1;
3010
3011         EXIT;
3012 unlock:
3013         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
3014         return rc;
3015 }
3016
3017 /**
3018  * Cache default striping in the object.
3019  *
3020  * To improve performance of striped object creation we cache default striping
3021  * (if it exists) in the parent directory object. We always cache default
3022  * striping for the regular files (stored in LOV EA) and we cache default
3023  * striping for the directories if requested by \a child_mode (when a new
3024  * directory is being created).
3025  *
3026  * \param[in] env               execution environment
3027  * \param[in] lp                object
3028  * \param[in] child_mode        new object's mode
3029  *
3030  * \retval              0 on success
3031  * \retval              negative if failed
3032  */
3033 static int lod_cache_parent_striping(const struct lu_env *env,
3034                                      struct lod_object *lp,
3035                                      umode_t child_mode)
3036 {
3037         int rc = 0;
3038         ENTRY;
3039
3040         if (!lp->ldo_def_striping_cached) {
3041                 /* we haven't tried to get default striping for
3042                  * the directory yet, let's cache it in the object */
3043                 rc = lod_cache_parent_lov_striping(env, lp);
3044                 if (rc != 0)
3045                         RETURN(rc);
3046         }
3047
3048         /* If the parent is on the remote MDT, we should always
3049          * try to refresh the default stripeEA cache, because we
3050          * do not cache default striping information for remote
3051          * object. */
3052         if (S_ISDIR(child_mode) && (!lp->ldo_dir_def_striping_cached ||
3053                                     dt_object_remote(&lp->ldo_obj)))
3054                 rc = lod_cache_parent_lmv_striping(env, lp);
3055
3056         RETURN(rc);
3057 }
3058
3059 /**
3060  * Implementation of dt_object_operations::do_ah_init.
3061  *
3062  * This method is used to make a decision on the striping configuration for the
3063  * object being created. It can be taken from the \a parent object if it exists,
3064  * or filesystem's default. The resulting configuration (number of stripes,
3065  * stripe size/offset, pool name, etc) is stored in the object itself and will
3066  * be used by the methods like ->doo_declare_create().
3067  *
3068  * \see dt_object_operations::do_ah_init() in the API description for details.
3069  */
3070 static void lod_ah_init(const struct lu_env *env,
3071                         struct dt_allocation_hint *ah,
3072                         struct dt_object *parent,
3073                         struct dt_object *child,
3074                         umode_t child_mode)
3075 {
3076         struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
3077         struct dt_object  *nextp = NULL;
3078         struct dt_object  *nextc;
3079         struct lod_object *lp = NULL;
3080         struct lod_object *lc;
3081         struct lov_desc   *desc;
3082         int               rc;
3083         ENTRY;
3084
3085         LASSERT(child);
3086
3087         if (likely(parent)) {
3088                 nextp = dt_object_child(parent);
3089                 lp = lod_dt_obj(parent);
3090                 rc = lod_load_striping(env, lp);
3091                 if (rc != 0)
3092                         return;
3093         }
3094
3095         nextc = dt_object_child(child);
3096         lc = lod_dt_obj(child);
3097
3098         LASSERT(lc->ldo_stripenr == 0);
3099         LASSERT(lc->ldo_stripe == NULL);
3100
3101         /*
3102          * local object may want some hints
3103          * in case of late striping creation, ->ah_init()
3104          * can be called with local object existing
3105          */
3106         if (!dt_object_exists(nextc) || dt_object_remote(nextc)) {
3107                 struct dt_object *obj;
3108
3109                 obj = (nextp != NULL && dt_object_remote(nextp)) ? NULL : nextp;
3110                 nextc->do_ops->do_ah_init(env, ah, obj, nextc, child_mode);
3111         }
3112
3113         if (S_ISDIR(child_mode)) {
3114                 if (lc->ldo_dir_stripe == NULL) {
3115                         OBD_ALLOC_PTR(lc->ldo_dir_stripe);
3116                         if (lc->ldo_dir_stripe == NULL)
3117                                 return;
3118                 }
3119
3120                 LASSERT(lp != NULL);
3121                 if (lp->ldo_dir_stripe == NULL) {
3122                         OBD_ALLOC_PTR(lp->ldo_dir_stripe);
3123                         if (lp->ldo_dir_stripe == NULL)
3124                                 return;
3125                 }
3126
3127                 rc = lod_cache_parent_striping(env, lp, child_mode);
3128                 if (rc != 0)
3129                         return;
3130
3131                 /* transfer defaults to new directory */
3132                 if (lp->ldo_def_striping_set) {
3133                         if (lp->ldo_pool)
3134                                 lod_object_set_pool(lc, lp->ldo_pool);
3135                         lc->ldo_def_stripenr = lp->ldo_def_stripenr;
3136                         lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
3137                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
3138                         lc->ldo_def_striping_set = 1;
3139                         lc->ldo_def_striping_cached = 1;
3140                         CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
3141                                (int)lc->ldo_def_stripe_size,
3142                                (int)lc->ldo_def_stripe_offset,
3143                                (int)lc->ldo_def_stripenr);
3144                 }
3145
3146                 /* transfer dir defaults to new directory */
3147                 if (lp->ldo_dir_def_striping_set) {
3148                         lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
3149                         lc->ldo_dir_def_stripe_offset =
3150                                                   lp->ldo_dir_def_stripe_offset;
3151                         lc->ldo_dir_def_hash_type =
3152                                                   lp->ldo_dir_def_hash_type;
3153                         lc->ldo_dir_def_striping_set = 1;
3154                         lc->ldo_dir_def_striping_cached = 1;
3155                         CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
3156                                (int)lc->ldo_dir_def_stripenr,
3157                                (int)lc->ldo_dir_def_stripe_offset,
3158                                lc->ldo_dir_def_hash_type);
3159                 }
3160
3161                 /* It should always honour the specified stripes */
3162                 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
3163                         const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
3164
3165                         rc = lod_verify_md_striping(d, lum1);
3166                         if (rc == 0 &&
3167                                 le32_to_cpu(lum1->lum_stripe_count) > 1) {
3168                                 /* Directory will be striped only if
3169                                  * stripe_count > 1 */
3170                                 lc->ldo_stripenr =
3171                                         le32_to_cpu(lum1->lum_stripe_count);
3172                                 lc->ldo_dir_stripe_offset =
3173                                         le32_to_cpu(lum1->lum_stripe_offset);
3174                                 lc->ldo_dir_hash_type =
3175                                         le32_to_cpu(lum1->lum_hash_type);
3176                                 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
3177                                        lc->ldo_stripenr,
3178                                        (int)lc->ldo_dir_stripe_offset);
3179                         }
3180                 /* then check whether there is default stripes from parent */
3181                 } else if (lp->ldo_dir_def_striping_set) {
3182                         /* If there are default dir stripe from parent */
3183                         lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
3184                         lc->ldo_dir_stripe_offset =
3185                                         lp->ldo_dir_def_stripe_offset;
3186                         lc->ldo_dir_hash_type =
3187                                         lp->ldo_dir_def_hash_type;
3188                         CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
3189                                lc->ldo_stripenr,
3190                                (int)lc->ldo_dir_stripe_offset);
3191                 } else {
3192                         /* set default stripe for this directory */
3193                         lc->ldo_stripenr = 0;
3194                         lc->ldo_dir_stripe_offset = -1;
3195                 }
3196
3197                 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
3198                        lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
3199
3200                 goto out;
3201         }
3202
3203         /*
3204          * if object is going to be striped over OSTs, transfer default
3205          * striping information to the child, so that we can use it
3206          * during declaration and creation
3207          */
3208         if (!lod_object_will_be_striped(S_ISREG(child_mode),
3209                                         lu_object_fid(&child->do_lu)))
3210                 goto out;
3211         /*
3212          * try from the parent
3213          */
3214         if (likely(parent)) {
3215                 lod_cache_parent_striping(env, lp, child_mode);
3216
3217                 lc->ldo_def_stripe_offset = LOV_OFFSET_DEFAULT;
3218
3219                 if (lp->ldo_def_striping_set) {
3220                         if (lp->ldo_pool)
3221                                 lod_object_set_pool(lc, lp->ldo_pool);
3222                         lc->ldo_stripenr = lp->ldo_def_stripenr;
3223                         lc->ldo_stripe_size = lp->ldo_def_stripe_size;
3224                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
3225                         CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
3226                                lc->ldo_stripenr, lc->ldo_stripe_size,
3227                                lp->ldo_pool ? lp->ldo_pool : "");
3228                 }
3229         }
3230
3231         /*
3232          * if the parent doesn't provide with specific pattern, grab fs-wide one
3233          */
3234         desc = &d->lod_desc;
3235         if (lc->ldo_stripenr == 0)
3236                 lc->ldo_stripenr = desc->ld_default_stripe_count;
3237         if (lc->ldo_stripe_size == 0)
3238                 lc->ldo_stripe_size = desc->ld_default_stripe_size;
3239         CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
3240                lc->ldo_stripenr, lc->ldo_stripe_size,
3241                lc->ldo_pool ? lc->ldo_pool : "");
3242
3243 out:
3244         /* we do not cache stripe information for slave stripe, see
3245          * lod_xattr_set_lov_on_dir */
3246         if (lp != NULL && lp->ldo_dir_slave_stripe)
3247                 lod_lov_stripe_cache_clear(lp);
3248
3249         EXIT;
3250 }
3251
3252 #define ll_do_div64(aaa,bbb)    do_div((aaa), (bbb))
3253 /**
3254  * Size initialization on late striping.
3255  *
3256  * Propagate the size of a truncated object to a deferred striping.
3257  * This function handles a special case when truncate was done on a
3258  * non-striped object and now while the striping is being created
3259  * we can't lose that size, so we have to propagate it to the stripes
3260  * being created.
3261  *
3262  * \param[in] env       execution environment
3263  * \param[in] dt        object
3264  * \param[in] th        transaction handle
3265  *
3266  * \retval              0 on success
3267  * \retval              negative if failed
3268  */
3269 static int lod_declare_init_size(const struct lu_env *env,
3270                                  struct dt_object *dt, struct thandle *th)
3271 {
3272         struct dt_object   *next = dt_object_child(dt);
3273         struct lod_object  *lo = lod_dt_obj(dt);
3274         struct lu_attr     *attr = &lod_env_info(env)->lti_attr;
3275         uint64_t            size, offs;
3276         int                 rc, stripe;
3277         ENTRY;
3278
3279         /* XXX: we support the simplest (RAID0) striping so far */
3280         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
3281         LASSERT(lo->ldo_stripe_size > 0);
3282
3283         rc = dt_attr_get(env, next, attr);
3284         LASSERT(attr->la_valid & LA_SIZE);
3285         if (rc)
3286                 RETURN(rc);
3287
3288         size = attr->la_size;
3289         if (size == 0)
3290                 RETURN(0);
3291
3292         /* ll_do_div64(a, b) returns a % b, and a = a / b */
3293         ll_do_div64(size, (__u64) lo->ldo_stripe_size);
3294         stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
3295
3296         size = size * lo->ldo_stripe_size;
3297         offs = attr->la_size;
3298         size += ll_do_div64(offs, lo->ldo_stripe_size);
3299
3300         attr->la_valid = LA_SIZE;
3301         attr->la_size = size;
3302
3303         rc = lod_sub_object_declare_attr_set(env, lo->ldo_stripe[stripe], attr,
3304                                              th);
3305
3306         RETURN(rc);
3307 }
3308
3309 /**
3310  * Declare creation of striped object.
3311  *
3312  * The function declares creation stripes for a regular object. The function
3313  * also declares whether the stripes will be created with non-zero size if
3314  * previously size was set non-zero on the master object. If object \a dt is
3315  * not local, then only fully defined striping can be applied in \a lovea.
3316  * Otherwise \a lovea can be in the form of pattern, see lod_qos_parse_config()
3317  * for the details.
3318  *
3319  * \param[in] env       execution environment
3320  * \param[in] dt        object
3321  * \param[in] attr      attributes the stripes will be created with
3322  * \param[in] lovea     a buffer containing striping description
3323  * \param[in] th        transaction handle
3324  *
3325  * \retval              0 on success
3326  * \retval              negative if failed
3327  */
3328 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
3329                                struct lu_attr *attr,
3330                                const struct lu_buf *lovea, struct thandle *th)
3331 {
3332         struct lod_thread_info  *info = lod_env_info(env);
3333         struct dt_object        *next = dt_object_child(dt);
3334         struct lod_object       *lo = lod_dt_obj(dt);
3335         int                      rc;
3336         ENTRY;
3337
3338         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
3339                 /* failed to create striping, let's reset
3340                  * config so that others don't get confused */
3341                 lod_object_free_striping(env, lo);
3342                 GOTO(out, rc = -ENOMEM);
3343         }
3344
3345         if (!dt_object_remote(next)) {
3346                 /* choose OST and generate appropriate objects */
3347                 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
3348                 if (rc) {
3349                         /* failed to create striping, let's reset
3350                          * config so that others don't get confused */
3351                         lod_object_free_striping(env, lo);
3352                         GOTO(out, rc);
3353                 }
3354
3355                 /*
3356                  * declare storage for striping data
3357                  */
3358                 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
3359                                 lo->ldo_pool ?  LOV_MAGIC_V3 : LOV_MAGIC_V1);
3360         } else {
3361                 /* LOD can not choose OST objects for remote objects, i.e.
3362                  * stripes must be ready before that. Right now, it can only
3363                  * happen during migrate, i.e. migrate process needs to create
3364                  * remote regular file (mdd_migrate_create), then the migrate
3365                  * process will provide stripeEA. */
3366                 LASSERT(lovea != NULL);
3367                 info->lti_buf = *lovea;
3368         }
3369
3370         rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf,
3371                                               XATTR_NAME_LOV, 0, th);
3372         if (rc)
3373                 GOTO(out, rc);
3374
3375         /*
3376          * if striping is created with local object's size > 0,
3377          * we have to propagate this size to specific object
3378          * the case is possible only when local object was created previously
3379          */
3380         if (dt_object_exists(next))
3381                 rc = lod_declare_init_size(env, dt, th);
3382
3383 out:
3384         RETURN(rc);
3385 }
3386
3387 /**
3388  * Implementation of dt_object_operations::do_declare_create.
3389  *
3390  * The method declares creation of a new object. If the object will be striped,
3391  * then helper functions are called to find FIDs for the stripes, declare
3392  * creation of the stripes and declare initialization of the striping
3393  * information to be stored in the master object.
3394  *
3395  * \see dt_object_operations::do_declare_create() in the API description
3396  * for details.
3397  */
3398 static int lod_declare_object_create(const struct lu_env *env,
3399                                      struct dt_object *dt,
3400                                      struct lu_attr *attr,
3401                                      struct dt_allocation_hint *hint,
3402                                      struct dt_object_format *dof,
3403                                      struct thandle *th)
3404 {
3405         struct dt_object   *next = dt_object_child(dt);
3406         struct lod_object  *lo = lod_dt_obj(dt);
3407         int                 rc;
3408         ENTRY;
3409
3410         LASSERT(dof);
3411         LASSERT(attr);
3412         LASSERT(th);
3413
3414         /*
3415          * first of all, we declare creation of local object
3416          */
3417         rc = lod_sub_object_declare_create(env, next, attr, hint, dof, th);
3418         if (rc != 0)
3419                 GOTO(out, rc);
3420
3421         if (dof->dof_type == DFT_SYM)
3422                 dt->do_body_ops = &lod_body_lnk_ops;
3423
3424         /*
3425          * it's lod_ah_init() that has decided the object will be striped
3426          */
3427         if (dof->dof_type == DFT_REGULAR) {
3428                 /* callers don't want stripes */
3429                 /* XXX: all tricky interactions with ->ah_make_hint() decided
3430                  * to use striping, then ->declare_create() behaving differently
3431                  * should be cleaned */
3432                 if (dof->u.dof_reg.striped == 0)
3433                         lo->ldo_stripenr = 0;
3434                 if (lo->ldo_stripenr > 0)
3435                         rc = lod_declare_striped_object(env, dt, attr,
3436                                                         NULL, th);
3437         } else if (dof->dof_type == DFT_DIR) {
3438                 struct seq_server_site *ss;
3439
3440                 ss = lu_site2seq(dt->do_lu.lo_dev->ld_site);
3441
3442                 /* If the parent has default stripeEA, and client
3443                  * did not find it before sending create request,
3444                  * then MDT will return -EREMOTE, and client will
3445                  * retrieve the default stripeEA and re-create the
3446                  * sub directory.
3447                  *
3448                  * Note: if dah_eadata != NULL, it means creating the
3449                  * striped directory with specified stripeEA, then it
3450                  * should ignore the default stripeEA */
3451                 if ((hint == NULL || hint->dah_eadata == NULL) &&
3452                     lo->ldo_dir_stripe_offset != -1 &&
3453                     lo->ldo_dir_stripe_offset != ss->ss_node_id)
3454                         GOTO(out, rc = -EREMOTE);
3455
3456                 /* Orphan object (like migrating object) does not have
3457                  * lod_dir_stripe, see lod_ah_init */
3458                 if (lo->ldo_dir_stripe != NULL)
3459                         rc = lod_declare_dir_striping_create(env, dt, attr,
3460                                                              dof, th);
3461         }
3462 out:
3463         RETURN(rc);
3464 }
3465
3466 /**
3467  * Creation of a striped regular object.
3468  *
3469  * The function is called to create the stripe objects for a regular
3470  * striped file. This can happen at the initial object creation or
3471  * when the caller asks LOD to do so using ->do_xattr_set() method
3472  * (so called late striping). Notice all the information are already
3473  * prepared in the form of the list of objects (ldo_stripe field).
3474  * This is done during declare phase.
3475  *
3476  * \param[in] env       execution environment
3477  * \param[in] dt        object
3478  * \param[in] attr      attributes the stripes will be created with
3479  * \param[in] dof       format of stripes (see OSD API description)
3480  * \param[in] th        transaction handle
3481  *
3482  * \retval              0 on success
3483  * \retval              negative if failed
3484  */
3485 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
3486                         struct lu_attr *attr, struct dt_object_format *dof,
3487                         struct thandle *th)
3488 {
3489         struct lod_object *lo = lod_dt_obj(dt);
3490         int                rc = 0, i;
3491         ENTRY;
3492
3493         LASSERT(lo->ldo_striping_cached == 0);
3494
3495         /* create all underlying objects */
3496         for (i = 0; i < lo->ldo_stripenr; i++) {
3497                 LASSERT(lo->ldo_stripe[i]);
3498                 rc = lod_sub_object_create(env, lo->ldo_stripe[i], attr, NULL,
3499                                            dof, th);
3500                 if (rc)
3501                         break;
3502         }
3503
3504         if (rc == 0) {
3505                 rc = lod_generate_and_set_lovea(env, lo, th);
3506                 if (rc == 0)
3507                         lo->ldo_striping_cached = 1;
3508         }
3509
3510         RETURN(rc);
3511 }
3512
3513 /**
3514  * Implementation of dt_object_operations::do_create.
3515  *
3516  * If any of preceeding methods (like ->do_declare_create(),
3517  * ->do_ah_init(), etc) chose to create a striped object,
3518  * then this method will create the master and the stripes.
3519  *
3520  * \see dt_object_operations::do_create() in the API description for details.
3521  */
3522 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
3523                              struct lu_attr *attr,
3524                              struct dt_allocation_hint *hint,
3525                              struct dt_object_format *dof, struct thandle *th)
3526 {
3527         struct lod_object  *lo = lod_dt_obj(dt);
3528         int                 rc;
3529         ENTRY;
3530
3531         /* create local object */
3532         rc = lod_sub_object_create(env, dt_object_child(dt), attr, hint, dof,
3533                                    th);
3534         if (rc != 0)
3535                 RETURN(rc);
3536
3537         if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
3538             lo->ldo_stripe && dof->u.dof_reg.striped != 0)
3539                 rc = lod_striping_create(env, dt, attr, dof, th);
3540
3541         RETURN(rc);
3542 }
3543
3544 /**
3545  * Implementation of dt_object_operations::do_declare_destroy.
3546  *
3547  * If the object is a striped directory, then the function declares reference
3548  * removal from the master object (this is an index) to the stripes and declares
3549  * destroy of all the stripes. In all the cases, it declares an intention to
3550  * destroy the object itself.
3551  *
3552  * \see dt_object_operations::do_declare_destroy() in the API description
3553  * for details.
3554  */
3555 static int lod_declare_object_destroy(const struct lu_env *env,
3556                                       struct dt_object *dt,
3557                                       struct thandle *th)
3558 {
3559         struct dt_object   *next = dt_object_child(dt);
3560         struct lod_object  *lo = lod_dt_obj(dt);
3561         struct lod_thread_info *info = lod_env_info(env);
3562         char               *stripe_name = info->lti_key;
3563         int                 rc, i;
3564         ENTRY;
3565
3566         /*
3567          * load striping information, notice we don't do this when object
3568          * is being initialized as we don't need this information till
3569          * few specific cases like destroy, chown
3570          */
3571         rc = lod_load_striping(env, lo);
3572         if (rc)
3573                 RETURN(rc);
3574
3575         /* declare destroy for all underlying objects */
3576         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
3577                 rc = next->do_ops->do_index_try(env, next,
3578                                                 &dt_directory_features);
3579                 if (rc != 0)
3580                         RETURN(rc);
3581
3582                 for (i = 0; i < lo->ldo_stripenr; i++) {
3583                         rc = lod_sub_object_declare_ref_del(env, next, th);
3584                         if (rc != 0)
3585                                 RETURN(rc);
3586
3587                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
3588                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
3589                                 i);
3590                         rc = lod_sub_object_declare_delete(env, next,
3591                                         (const struct dt_key *)stripe_name, th);
3592                         if (rc != 0)
3593                                 RETURN(rc);
3594                 }
3595         }
3596
3597         /*
3598          * we declare destroy for the local object
3599          */
3600         rc = lod_sub_object_declare_destroy(env, next, th);
3601         if (rc)
3602                 RETURN(rc);
3603
3604         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
3605                 RETURN(0);
3606
3607         /* declare destroy all striped objects */
3608         for (i = 0; i < lo->ldo_stripenr; i++) {
3609                 if (lo->ldo_stripe[i] == NULL)
3610                         continue;
3611
3612                 if (S_ISDIR(dt->do_lu.lo_header->loh_attr))
3613                         rc = lod_sub_object_declare_ref_del(env,
3614                                         lo->ldo_stripe[i], th);
3615
3616                 rc = lod_sub_object_declare_destroy(env, lo->ldo_stripe[i],
3617                                         th);
3618                 if (rc != 0)
3619                         break;
3620         }
3621
3622         RETURN(rc);
3623 }
3624
3625 /**
3626  * Implementation of dt_object_operations::do_destroy.
3627  *
3628  * If the object is a striped directory, then the function removes references
3629  * from the master object (this is an index) to the stripes and destroys all
3630  * the stripes. In all the cases, the function destroys the object itself.
3631  *
3632  * \see dt_object_operations::do_destroy() in the API description for details.
3633  */
3634 static int lod_object_destroy(const struct lu_env *env,
3635                 struct dt_object *dt, struct thandle *th)
3636 {
3637         struct dt_object  *next = dt_object_child(dt);
3638         struct lod_object *lo = lod_dt_obj(dt);
3639         struct lod_thread_info *info = lod_env_info(env);
3640         char               *stripe_name = info->lti_key;
3641         unsigned int       i;
3642         int                rc;
3643         ENTRY;
3644
3645         /* destroy sub-stripe of master object */
3646         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
3647                 rc = next->do_ops->do_index_try(env, next,
3648                                                 &dt_directory_features);
3649                 if (rc != 0)
3650                         RETURN(rc);
3651
3652                 for (i = 0; i < lo->ldo_stripenr; i++) {
3653                         rc = lod_sub_object_ref_del(env, next, th);
3654                         if (rc != 0)
3655                                 RETURN(rc);
3656
3657                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
3658                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
3659                                 i);
3660
3661                         CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
3662                                PFID(lu_object_fid(&dt->do_lu)), stripe_name,
3663                                PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
3664
3665                         rc = lod_sub_object_delete(env, next,
3666                                        (const struct dt_key *)stripe_name, th);
3667                         if (rc != 0)
3668                                 RETURN(rc);
3669                 }
3670         }
3671
3672         rc = lod_sub_object_destroy(env, next, th);
3673         if (rc != 0)
3674                 RETURN(rc);
3675
3676         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
3677                 RETURN(0);
3678
3679         /* destroy all striped objects */
3680         for (i = 0; i < lo->ldo_stripenr; i++) {
3681                 if (likely(lo->ldo_stripe[i] != NULL) &&
3682                     (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
3683                      i == cfs_fail_val)) {
3684                         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
3685                                 dt_write_lock(env, lo->ldo_stripe[i],
3686                                               MOR_TGT_CHILD);
3687                                 rc = lod_sub_object_ref_del(env,
3688                                                 lo->ldo_stripe[i], th);
3689                                 dt_write_unlock(env, lo->ldo_stripe[i]);
3690                                 if (rc != 0)
3691                                         break;
3692                         }
3693
3694                         rc = lod_sub_object_destroy(env, lo->ldo_stripe[i], th);
3695                         if (rc != 0)
3696                                 break;
3697                 }
3698         }
3699
3700         RETURN(rc);
3701 }
3702
3703 /**
3704  * Implementation of dt_object_operations::do_declare_ref_add.
3705  *
3706  * \see dt_object_operations::do_declare_ref_add() in the API description
3707  * for details.
3708  */
3709 static int lod_declare_ref_add(const struct lu_env *env,
3710                                struct dt_object *dt, struct thandle *th)
3711 {
3712         return lod_sub_object_declare_ref_add(env, dt_object_child(dt), th);
3713 }
3714
3715 /**
3716  * Implementation of dt_object_operations::do_ref_add.
3717  *
3718  * \see dt_object_operations::do_ref_add() in the API description for details.
3719  */
3720 static int lod_ref_add(const struct lu_env *env,
3721                        struct dt_object *dt, struct thandle *th)
3722 {
3723         return lod_sub_object_ref_add(env, dt_object_child(dt), th);
3724 }
3725
3726 /**
3727  * Implementation of dt_object_operations::do_declare_ref_del.
3728  *
3729  * \see dt_object_operations::do_declare_ref_del() in the API description
3730  * for details.
3731  */
3732 static int lod_declare_ref_del(const struct lu_env *env,
3733                                struct dt_object *dt, struct thandle *th)
3734 {
3735         return lod_sub_object_declare_ref_del(env, dt_object_child(dt), th);
3736 }
3737
3738 /**
3739  * Implementation of dt_object_operations::do_ref_del
3740  *
3741  * \see dt_object_operations::do_ref_del() in the API description for details.
3742  */
3743 static int lod_ref_del(const struct lu_env *env,
3744                        struct dt_object *dt, struct thandle *th)
3745 {
3746         return lod_sub_object_ref_del(env, dt_object_child(dt), th);
3747 }
3748
3749 /**
3750  * Implementation of dt_object_operations::do_object_sync.
3751  *
3752  * \see dt_object_operations::do_object_sync() in the API description
3753  * for details.
3754  */
3755 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
3756                            __u64 start, __u64 end)
3757 {
3758         return dt_object_sync(env, dt_object_child(dt), start, end);
3759 }
3760
3761 struct lod_slave_locks  {
3762         int                     lsl_lock_count;
3763         struct lustre_handle    lsl_handle[0];
3764 };
3765
3766 /**
3767  * Release LDLM locks on the stripes of a striped directory.
3768  *
3769  * Iterates over all the locks taken on the stripe objects and
3770  * release them using ->do_object_unlock() method.
3771  *
3772  * \param[in] env       execution environment
3773  * \param[in] dt        striped object
3774  * \param[in] einfo     lock description
3775  * \param[in] policy    data describing requested lock
3776  *
3777  * \retval              0 on success
3778  * \retval              negative if failed
3779  */
3780 static int lod_object_unlock_internal(const struct lu_env *env,
3781                                       struct dt_object *dt,
3782                                       struct ldlm_enqueue_info *einfo,
3783                                       ldlm_policy_data_t *policy)
3784 {
3785         struct lod_object       *lo = lod_dt_obj(dt);
3786         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
3787         int                     rc = 0;
3788         int                     i;
3789         ENTRY;
3790
3791         if (slave_locks == NULL)
3792                 RETURN(0);
3793
3794         for (i = 1; i < slave_locks->lsl_lock_count; i++) {
3795                 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
3796                         int     rc1;
3797
3798                         einfo->ei_cbdata = &slave_locks->lsl_handle[i];
3799                         rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
3800                                                policy);
3801                         if (rc1 < 0)
3802                                 rc = rc == 0 ? rc1 : rc;
3803                 }
3804         }
3805
3806         RETURN(rc);
3807 }
3808
3809 /**
3810  * Implementation of dt_object_operations::do_object_unlock.
3811  *
3812  * Used to release LDLM lock(s).
3813  *
3814  * \see dt_object_operations::do_object_unlock() in the API description
3815  * for details.
3816  */
3817 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
3818                              struct ldlm_enqueue_info *einfo,
3819                              union ldlm_policy_data *policy)
3820 {
3821         struct lod_object       *lo = lod_dt_obj(dt);
3822         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
3823         int                     slave_locks_size;
3824         int                     rc;
3825         ENTRY;
3826
3827         if (slave_locks == NULL)
3828                 RETURN(0);
3829
3830         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
3831                 RETURN(-ENOTDIR);
3832
3833         rc = lod_load_striping(env, lo);
3834         if (rc != 0)
3835                 RETURN(rc);
3836
3837         /* Note: for remote lock for single stripe dir, MDT will cancel
3838          * the lock by lockh directly */
3839         if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
3840                 RETURN(0);
3841
3842         /* Only cancel slave lock for striped dir */
3843         rc = lod_object_unlock_internal(env, dt, einfo, policy);
3844
3845         slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
3846                            sizeof(slave_locks->lsl_handle[0]);
3847         OBD_FREE(slave_locks, slave_locks_size);
3848         einfo->ei_cbdata = NULL;
3849
3850         RETURN(rc);
3851 }
3852
3853 /**
3854  * Implementation of dt_object_operations::do_object_lock.
3855  *
3856  * Used to get LDLM lock on the non-striped and striped objects.
3857  *
3858  * \see dt_object_operations::do_object_lock() in the API description
3859  * for details.
3860  */
3861 static int lod_object_lock(const struct lu_env *env,
3862                            struct dt_object *dt,
3863                            struct lustre_handle *lh,
3864                            struct ldlm_enqueue_info *einfo,
3865                            union ldlm_policy_data *policy)
3866 {
3867         struct lod_object       *lo = lod_dt_obj(dt);
3868         int                     rc = 0;
3869         int                     i;
3870         int                     slave_locks_size;
3871         struct lod_slave_locks  *slave_locks = NULL;
3872         ENTRY;
3873
3874         /* remote object lock */
3875         if (!einfo->ei_enq_slave) {
3876                 LASSERT(dt_object_remote(dt));
3877                 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
3878                                       policy);
3879         }
3880
3881         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
3882                 RETURN(-ENOTDIR);
3883
3884         rc = lod_load_striping(env, lo);
3885         if (rc != 0)
3886                 RETURN(rc);
3887
3888         /* No stripes */
3889         if (lo->ldo_stripenr <= 1)
3890                 RETURN(0);
3891
3892         slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
3893                            sizeof(slave_locks->lsl_handle[0]);
3894         /* Freed in lod_object_unlock */
3895         OBD_ALLOC(slave_locks, slave_locks_size);
3896         if (slave_locks == NULL)
3897                 RETURN(-ENOMEM);
3898         slave_locks->lsl_lock_count = lo->ldo_stripenr;
3899
3900         /* striped directory lock */
3901         for (i = 1; i < lo->ldo_stripenr; i++) {
3902                 struct lustre_handle    lockh;
3903                 struct ldlm_res_id      *res_id;
3904
3905                 res_id = &lod_env_info(env)->lti_res_id;
3906                 fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
3907                                        res_id);
3908                 einfo->ei_res_id = res_id;
3909
3910                 LASSERT(lo->ldo_stripe[i]);
3911                 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
3912                                     policy);
3913                 if (rc != 0)
3914                         GOTO(out, rc);
3915                 slave_locks->lsl_handle[i] = lockh;
3916         }
3917
3918         einfo->ei_cbdata = slave_locks;
3919
3920 out:
3921         if (rc != 0 && slave_locks != NULL) {
3922                 einfo->ei_cbdata = slave_locks;
3923                 lod_object_unlock_internal(env, dt, einfo, policy);
3924                 OBD_FREE(slave_locks, slave_locks_size);
3925                 einfo->ei_cbdata = NULL;
3926         }
3927
3928         RETURN(rc);
3929 }
3930
3931 struct dt_object_operations lod_obj_ops = {
3932         .do_read_lock           = lod_object_read_lock,
3933         .do_write_lock          = lod_object_write_lock,
3934         .do_read_unlock         = lod_object_read_unlock,
3935         .do_write_unlock        = lod_object_write_unlock,
3936         .do_write_locked        = lod_object_write_locked,
3937         .do_attr_get            = lod_attr_get,
3938         .do_declare_attr_set    = lod_declare_attr_set,
3939         .do_attr_set            = lod_attr_set,
3940         .do_xattr_get           = lod_xattr_get,
3941         .do_declare_xattr_set   = lod_declare_xattr_set,
3942         .do_xattr_set           = lod_xattr_set,
3943         .do_declare_xattr_del   = lod_declare_xattr_del,
3944         .do_xattr_del           = lod_xattr_del,
3945         .do_xattr_list          = lod_xattr_list,
3946         .do_ah_init             = lod_ah_init,
3947         .do_declare_create      = lod_declare_object_create,
3948         .do_create              = lod_object_create,
3949         .do_declare_destroy     = lod_declare_object_destroy,
3950         .do_destroy             = lod_object_destroy,
3951         .do_index_try           = lod_index_try,
3952         .do_declare_ref_add     = lod_declare_ref_add,
3953         .do_ref_add             = lod_ref_add,
3954         .do_declare_ref_del     = lod_declare_ref_del,
3955         .do_ref_del             = lod_ref_del,
3956         .do_object_sync         = lod_object_sync,
3957         .do_object_lock         = lod_object_lock,
3958         .do_object_unlock       = lod_object_unlock,
3959 };
3960
3961 /**
3962  * Implementation of dt_body_operations::dbo_read.
3963  *
3964  * \see dt_body_operations::dbo_read() in the API description for details.
3965  */
3966 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
3967                         struct lu_buf *buf, loff_t *pos)
3968 {
3969         struct dt_object *next = dt_object_child(dt);
3970         return next->do_body_ops->dbo_read(env, next, buf, pos);
3971 }
3972
3973 /**
3974  * Implementation of dt_body_operations::dbo_declare_write.
3975  *
3976  * \see dt_body_operations::dbo_declare_write() in the API description
3977  * for details.
3978  */
3979 static ssize_t lod_declare_write(const struct lu_env *env,
3980                                  struct dt_object *dt,
3981                                  const struct lu_buf *buf, loff_t pos,
3982                                  struct thandle *th)
3983 {
3984         return lod_sub_object_declare_write(env, dt_object_child(dt), buf, pos,
3985                                             th);
3986 }
3987
3988 /**
3989  * Implementation of dt_body_operations::dbo_write.
3990  *
3991  * \see dt_body_operations::dbo_write() in the API description for details.
3992  */
3993 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
3994                          const struct lu_buf *buf, loff_t *pos,
3995                          struct thandle *th, int iq)
3996 {
3997         return lod_sub_object_write(env, dt_object_child(dt), buf, pos, th, iq);
3998 }
3999
4000 static const struct dt_body_operations lod_body_lnk_ops = {
4001         .dbo_read               = lod_read,
4002         .dbo_declare_write      = lod_declare_write,
4003         .dbo_write              = lod_write
4004 };
4005
4006 /**
4007  * Implementation of lu_object_operations::loo_object_init.
4008  *
4009  * The function determines the type and the index of the target device using
4010  * sequence of the object's FID. Then passes control down to the
4011  * corresponding device:
4012  *  OSD for the local objects, OSP for remote
4013  *
4014  * \see lu_object_operations::loo_object_init() in the API description
4015  * for details.
4016  */
4017 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
4018                            const struct lu_object_conf *conf)
4019 {
4020         struct lod_device       *lod    = lu2lod_dev(lo->lo_dev);
4021         struct lu_device        *cdev   = NULL;
4022         struct lu_object        *cobj;
4023         struct lod_tgt_descs    *ltd    = NULL;
4024         struct lod_tgt_desc     *tgt;
4025         u32                      idx    = 0;
4026         int                      type   = LU_SEQ_RANGE_ANY;
4027         int                      rc;
4028         ENTRY;
4029
4030         rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
4031         if (rc != 0) {
4032                 /* Note: Sometimes, it will Return EAGAIN here, see
4033                  * ptrlpc_import_delay_req(), which might confuse
4034                  * lu_object_find_at() and make it wait there incorrectly.
4035                  * so we convert it to EIO here.*/
4036                 if (rc == -EAGAIN)
4037                         rc = -EIO;
4038
4039                 RETURN(rc);
4040         }
4041
4042         if (type == LU_SEQ_RANGE_MDT &&
4043             idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
4044                 cdev = &lod->lod_child->dd_lu_dev;
4045         } else if (type == LU_SEQ_RANGE_MDT) {
4046                 ltd = &lod->lod_mdt_descs;
4047                 lod_getref(ltd);
4048         } else if (type == LU_SEQ_RANGE_OST) {
4049                 ltd = &lod->lod_ost_descs;
4050                 lod_getref(ltd);
4051         } else {
4052                 LBUG();
4053         }
4054
4055         if (ltd != NULL) {
4056                 if (ltd->ltd_tgts_size > idx &&
4057                     cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
4058                         tgt = LTD_TGT(ltd, idx);
4059
4060                         LASSERT(tgt != NULL);
4061                         LASSERT(tgt->ltd_tgt != NULL);
4062
4063                         cdev = &(tgt->ltd_tgt->dd_lu_dev);
4064                 }
4065                 lod_putref(lod, ltd);
4066         }
4067
4068         if (unlikely(cdev == NULL))
4069                 RETURN(-ENOENT);
4070
4071         cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
4072         if (unlikely(cobj == NULL))
4073                 RETURN(-ENOMEM);
4074
4075         lu_object_add(lo, cobj);
4076
4077         RETURN(0);
4078 }
4079
4080 /**
4081  *
4082  * Release resources associated with striping.
4083  *
4084  * If the object is striped (regular or directory), then release
4085  * the stripe objects references and free the ldo_stripe array.
4086  *
4087  * \param[in] env       execution environment
4088  * \param[in] lo        object
4089  */
4090 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
4091 {
4092         int i;
4093
4094         if (lo->ldo_dir_stripe != NULL) {
4095                 OBD_FREE_PTR(lo->ldo_dir_stripe);
4096                 lo->ldo_dir_stripe = NULL;
4097         }
4098
4099         if (lo->ldo_stripe) {
4100                 LASSERT(lo->ldo_stripes_allocated > 0);
4101
4102                 for (i = 0; i < lo->ldo_stripenr; i++) {
4103                         if (lo->ldo_stripe[i])
4104                                 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
4105                 }
4106
4107                 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
4108                 OBD_FREE(lo->ldo_stripe, i);
4109                 lo->ldo_stripe = NULL;
4110                 lo->ldo_stripes_allocated = 0;
4111         }
4112         lo->ldo_striping_cached = 0;
4113         lo->ldo_stripenr = 0;
4114         lo->ldo_pattern = 0;
4115 }
4116
4117 /**
4118  * Implementation of lu_object_operations::loo_object_start.
4119  *
4120  * \see lu_object_operations::loo_object_start() in the API description
4121  * for details.
4122  */
4123 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
4124 {
4125         if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
4126                 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
4127         return 0;
4128 }
4129
4130 /**
4131  * Implementation of lu_object_operations::loo_object_free.
4132  *
4133  * \see lu_object_operations::loo_object_free() in the API description
4134  * for details.
4135  */
4136 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
4137 {
4138         struct lod_object *mo = lu2lod_obj(o);
4139
4140         /*
4141          * release all underlying object pinned
4142          */
4143
4144         lod_object_free_striping(env, mo);
4145
4146         lod_object_set_pool(mo, NULL);
4147
4148         lu_object_fini(o);
4149         OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
4150 }
4151
4152 /**
4153  * Implementation of lu_object_operations::loo_object_release.
4154  *
4155  * \see lu_object_operations::loo_object_release() in the API description
4156  * for details.
4157  */
4158 static void lod_object_release(const struct lu_env *env, struct lu_object *o)
4159 {
4160         /* XXX: shouldn't we release everything here in case if object
4161          * creation failed before? */
4162 }
4163
4164 /**
4165  * Implementation of lu_object_operations::loo_object_print.
4166  *
4167  * \see lu_object_operations::loo_object_print() in the API description
4168  * for details.
4169  */
4170 static int lod_object_print(const struct lu_env *env, void *cookie,
4171                             lu_printer_t p, const struct lu_object *l)
4172 {
4173         struct lod_object *o = lu2lod_obj((struct lu_object *) l);
4174
4175         return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
4176 }
4177
4178 struct lu_object_operations lod_lu_obj_ops = {
4179         .loo_object_init        = lod_object_init,
4180         .loo_object_start       = lod_object_start,
4181         .loo_object_free        = lod_object_free,
4182         .loo_object_release     = lod_object_release,
4183         .loo_object_print       = lod_object_print,
4184 };