Whamcloud - gitweb
LU-4788 lfsck: replace cfs_list_t with list_head
[fs/lustre-release.git] / lustre / lod / lod_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2013, Intel Corporation.
27  */
28 /*
29  * lustre/lod/lod_object.c
30  *
31  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
32  */
33
34 #define DEBUG_SUBSYSTEM S_MDS
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
41
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46 #include <md_object.h>
47 #include <lustre_linkea.h>
48
49 #include "lod_internal.h"
50
51 static const char dot[] = ".";
52 static const char dotdot[] = "..";
53
54 extern struct kmem_cache *lod_object_kmem;
55 static const struct dt_body_operations lod_body_lnk_ops;
56
57 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
58                             struct dt_rec *rec, const struct dt_key *key,
59                             struct lustre_capa *capa)
60 {
61         struct dt_object *next = dt_object_child(dt);
62         return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
63 }
64
/**
 * Implementation of dt_index_operations::dio_declare_insert
 *
 * Declare the insertion on the child object of \a dt.
 *
 * \param[in] env       execution environment
 * \param[in] dt        object whose child receives the declaration
 * \param[in] rec       record to be inserted
 * \param[in] key       key to be inserted
 * \param[in] handle    transaction handle
 *
 * \retval              whatever dt_declare_insert() returns
 */
static int lod_declare_index_insert(const struct lu_env *env,
                                    struct dt_object *dt,
                                    const struct dt_rec *rec,
                                    const struct dt_key *key,
                                    struct thandle *handle)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_declare_insert(env, child, rec, key, handle);
}
73
/**
 * Implementation of dt_index_operations::dio_insert
 *
 * Perform the insertion on the child object of \a dt.
 *
 * \param[in] env       execution environment
 * \param[in] dt        object whose child receives the entry
 * \param[in] rec       record to be inserted
 * \param[in] key       key to be inserted
 * \param[in] th        transaction handle
 * \param[in] capa      capability passed through to dt_insert()
 * \param[in] ign       passed through unchanged to dt_insert()
 *
 * \retval              whatever dt_insert() returns
 */
static int lod_index_insert(const struct lu_env *env,
                            struct dt_object *dt,
                            const struct dt_rec *rec,
                            const struct dt_key *key,
                            struct thandle *th,
                            struct lustre_capa *capa,
                            int ign)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_insert(env, child, rec, key, th, capa, ign);
}
84
/**
 * Implementation of dt_index_operations::dio_declare_delete
 *
 * Declare the deletion on the child object of \a dt.
 *
 * \param[in] env       execution environment
 * \param[in] dt        object whose child receives the declaration
 * \param[in] key       key to be deleted
 * \param[in] th        transaction handle
 *
 * \retval              whatever dt_declare_delete() returns
 */
static int lod_declare_index_delete(const struct lu_env *env,
                                    struct dt_object *dt,
                                    const struct dt_key *key,
                                    struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_declare_delete(env, child, key, th);
}
92
/**
 * Implementation of dt_index_operations::dio_delete
 *
 * Perform the deletion on the child object of \a dt.
 *
 * \param[in] env       execution environment
 * \param[in] dt        object whose child loses the entry
 * \param[in] key       key to be deleted
 * \param[in] th        transaction handle
 * \param[in] capa      capability passed through to dt_delete()
 *
 * \retval              whatever dt_delete() returns
 */
static int lod_index_delete(const struct lu_env *env,
                            struct dt_object *dt,
                            const struct dt_key *key,
                            struct thandle *th,
                            struct lustre_capa *capa)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_delete(env, child, key, th, capa);
}
101
102 static struct dt_it *lod_it_init(const struct lu_env *env,
103                                  struct dt_object *dt, __u32 attr,
104                                  struct lustre_capa *capa)
105 {
106         struct dt_object        *next = dt_object_child(dt);
107         struct lod_it           *it = &lod_env_info(env)->lti_it;
108         struct dt_it            *it_next;
109
110
111         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
112         if (IS_ERR(it_next))
113                 return it_next;
114
115         /* currently we do not use more than one iterator per thread
116          * so we store it in thread info. if at some point we need
117          * more active iterators in a single thread, we can allocate
118          * additional ones */
119         LASSERT(it->lit_obj == NULL);
120
121         it->lit_it = it_next;
122         it->lit_obj = next;
123
124         return (struct dt_it *)it;
125 }
126
/*
 * Assert that a (non-striped) lod_it is in use: both the object and the
 * lower-layer iterator must be set.  The env argument is not used.
 */
#define LOD_CHECK_IT(env, it)           \
do {                                    \
        LASSERT((it)->lit_obj != NULL); \
        LASSERT((it)->lit_it != NULL);  \
} while (0)
132
133 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
134 {
135         struct lod_it *it = (struct lod_it *)di;
136
137         LOD_CHECK_IT(env, it);
138         it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
139
140         /* the iterator not in use any more */
141         it->lit_obj = NULL;
142         it->lit_it = NULL;
143 }
144
145 int lod_it_get(const struct lu_env *env, struct dt_it *di,
146                const struct dt_key *key)
147 {
148         const struct lod_it *it = (const struct lod_it *)di;
149
150         LOD_CHECK_IT(env, it);
151         return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
152 }
153
154 void lod_it_put(const struct lu_env *env, struct dt_it *di)
155 {
156         struct lod_it *it = (struct lod_it *)di;
157
158         LOD_CHECK_IT(env, it);
159         return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
160 }
161
162 int lod_it_next(const struct lu_env *env, struct dt_it *di)
163 {
164         struct lod_it *it = (struct lod_it *)di;
165
166         LOD_CHECK_IT(env, it);
167         return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
168 }
169
170 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
171 {
172         const struct lod_it *it = (const struct lod_it *)di;
173
174         LOD_CHECK_IT(env, it);
175         return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
176 }
177
178 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
179 {
180         struct lod_it *it = (struct lod_it *)di;
181
182         LOD_CHECK_IT(env, it);
183         return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
184 }
185
186 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
187                struct dt_rec *rec, __u32 attr)
188 {
189         const struct lod_it *it = (const struct lod_it *)di;
190
191         LOD_CHECK_IT(env, it);
192         return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec,
193                                                      attr);
194 }
195
196 int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di,
197                     __u32 attr)
198 {
199         const struct lod_it *it = (const struct lod_it *)di;
200
201         LOD_CHECK_IT(env, it);
202         return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it,
203                                                           attr);
204 }
205
206 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
207 {
208         const struct lod_it *it = (const struct lod_it *)di;
209
210         LOD_CHECK_IT(env, it);
211         return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
212 }
213
214 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
215 {
216         const struct lod_it *it = (const struct lod_it *)di;
217
218         LOD_CHECK_IT(env, it);
219         return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
220 }
221
222 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
223                    void *key_rec)
224 {
225         const struct lod_it *it = (const struct lod_it *)di;
226
227         LOD_CHECK_IT(env, it);
228         return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it,
229                                                          key_rec);
230 }
231
232 static struct dt_index_operations lod_index_ops = {
233         .dio_lookup             = lod_index_lookup,
234         .dio_declare_insert     = lod_declare_index_insert,
235         .dio_insert             = lod_index_insert,
236         .dio_declare_delete     = lod_declare_index_delete,
237         .dio_delete             = lod_index_delete,
238         .dio_it = {
239                 .init           = lod_it_init,
240                 .fini           = lod_it_fini,
241                 .get            = lod_it_get,
242                 .put            = lod_it_put,
243                 .next           = lod_it_next,
244                 .key            = lod_it_key,
245                 .key_size       = lod_it_key_size,
246                 .rec            = lod_it_rec,
247                 .rec_size       = lod_it_rec_size,
248                 .store          = lod_it_store,
249                 .load           = lod_it_load,
250                 .key_rec        = lod_it_key_rec,
251         }
252 };
253
254 /**
255  * Implementation of dt_index_operations:: dio_it.init
256  *
257  * This function is to initialize the iterator for striped directory,
258  * basically these lod_striped_it_xxx will just locate the stripe
259  * and call the correspondent api of its next lower layer.
260  *
261  * \param[in] env       execution environment.
262  * \param[in] dt        the striped directory object to be iterated.
263  * \param[in] attr      the attribute of iterator, mostly used to indicate
264  *                      the entry attribute in the object to be iterated.
265  * \param[in] capa      capability(useless in current implementation)
266  *
267  * \retval      initialized iterator(dt_it) if successful initialize the
268  *              iteration. lit_stripe_index will be used to indicate the
269  *              current iterate position among stripes.
270  * \retval      ERR pointer if initialization is failed.
271  */
272 static struct dt_it *lod_striped_it_init(const struct lu_env *env,
273                                          struct dt_object *dt, __u32 attr,
274                                          struct lustre_capa *capa)
275 {
276         struct lod_object       *lo = lod_dt_obj(dt);
277         struct dt_object        *next;
278         struct lod_it           *it = &lod_env_info(env)->lti_it;
279         struct dt_it            *it_next;
280         ENTRY;
281
282         LASSERT(lo->ldo_stripenr > 0);
283         next = lo->ldo_stripe[0];
284         LASSERT(next != NULL);
285         LASSERT(next->do_index_ops != NULL);
286
287         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
288         if (IS_ERR(it_next))
289                 return it_next;
290
291         /* currently we do not use more than one iterator per thread
292          * so we store it in thread info. if at some point we need
293          * more active iterators in a single thread, we can allocate
294          * additional ones */
295         LASSERT(it->lit_obj == NULL);
296
297         it->lit_stripe_index = 0;
298         it->lit_attr = attr;
299         it->lit_it = it_next;
300         it->lit_obj = dt;
301
302         return (struct dt_it *)it;
303 }
304
/*
 * Assert that a striped-directory lod_it is in use and that its stripe
 * index lies inside the stripe array of lo.  The env argument is not used.
 */
#define LOD_CHECK_STRIPED_IT(env, it, lo)                               \
do {                                                                    \
        LASSERT((it)->lit_obj != NULL);                                 \
        LASSERT((it)->lit_it != NULL);                                  \
        LASSERT((lo)->ldo_stripenr > 0);                                \
        LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr);           \
} while (0)
312
313 /**
314  * Implementation of dt_index_operations:: dio_it.fini
315  *
316  * This function is to finish the iterator for striped directory.
317  *
318  * \param[in] env       execution environment.
319  * \param[in] di        the iterator for the striped directory
320  *
321  */
322 static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
323 {
324         struct lod_it           *it = (struct lod_it *)di;
325         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
326         struct dt_object        *next;
327
328         LOD_CHECK_STRIPED_IT(env, it, lo);
329
330         next = lo->ldo_stripe[it->lit_stripe_index];
331         LASSERT(next != NULL);
332         LASSERT(next->do_index_ops != NULL);
333
334         next->do_index_ops->dio_it.fini(env, it->lit_it);
335
336         /* the iterator not in use any more */
337         it->lit_obj = NULL;
338         it->lit_it = NULL;
339         it->lit_stripe_index = 0;
340 }
341
342 /**
343  * Implementation of dt_index_operations:: dio_it.get
344  *
345  * This function is to position the iterator with given key
346  *
347  * \param[in] env       execution environment.
348  * \param[in] di        the iterator for striped directory.
349  * \param[in] key       the key the iterator will be positioned.
350  *
351  * \retval      0 if successfully position iterator by the key.
352  * \retval      negative error if position is failed.
353  */
354 static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
355                               const struct dt_key *key)
356 {
357         const struct lod_it     *it = (const struct lod_it *)di;
358         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
359         struct dt_object        *next;
360         ENTRY;
361
362         LOD_CHECK_STRIPED_IT(env, it, lo);
363
364         next = lo->ldo_stripe[it->lit_stripe_index];
365         LASSERT(next != NULL);
366         LASSERT(next->do_index_ops != NULL);
367
368         return next->do_index_ops->dio_it.get(env, it->lit_it, key);
369 }
370
371 /**
372  * Implementation of dt_index_operations:: dio_it.put
373  *
374  * This function is supposed to be the pair of it_get, but currently do
375  * nothing. see (osd_it_ea_put or osd_index_it_put)
376  */
377 static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
378 {
379         struct lod_it           *it = (struct lod_it *)di;
380         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
381         struct dt_object        *next;
382
383         LOD_CHECK_STRIPED_IT(env, it, lo);
384
385         next = lo->ldo_stripe[it->lit_stripe_index];
386         LASSERT(next != NULL);
387         LASSERT(next->do_index_ops != NULL);
388
389         return next->do_index_ops->dio_it.put(env, it->lit_it);
390 }
391
392 /**
393  * Implementation of dt_index_operations:: dio_it.next
394  *
395  * This function is to position the iterator to the next entry, if current
396  * stripe is finished by checking the return value of next() in current
397  * stripe. it will go to next stripe. In the mean time, the sub-iterator
398  * for next stripe needs to be initialized.
399  *
400  * \param[in] env       execution environment.
401  * \param[in] di        the iterator for striped directory.
402  *
403  * \retval      0 if successfully position iterator to the next entry.
404  * \retval      negative error if position is failed.
405  */
406 static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
407 {
408         struct lod_it           *it = (struct lod_it *)di;
409         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
410         struct dt_object        *next;
411         struct dt_it            *it_next;
412         int                     rc;
413         ENTRY;
414
415         LOD_CHECK_STRIPED_IT(env, it, lo);
416
417         next = lo->ldo_stripe[it->lit_stripe_index];
418         LASSERT(next != NULL);
419         LASSERT(next->do_index_ops != NULL);
420 again:
421         rc = next->do_index_ops->dio_it.next(env, it->lit_it);
422         if (rc < 0)
423                 RETURN(rc);
424
425         if (rc == 0 && it->lit_stripe_index == 0)
426                 RETURN(rc);
427
428         if (rc == 0 && it->lit_stripe_index > 0) {
429                 struct lu_dirent *ent;
430
431                 ent = (struct lu_dirent *)lod_env_info(env)->lti_key;
432
433                 rc = next->do_index_ops->dio_it.rec(env, it->lit_it,
434                                                     (struct dt_rec *)ent,
435                                                     it->lit_attr);
436                 if (rc != 0)
437                         RETURN(rc);
438
439                 /* skip . and .. for slave stripe */
440                 if ((strncmp(ent->lde_name, ".",
441                              le16_to_cpu(ent->lde_namelen)) == 0 &&
442                      le16_to_cpu(ent->lde_namelen) == 1) ||
443                     (strncmp(ent->lde_name, "..",
444                              le16_to_cpu(ent->lde_namelen)) == 0 &&
445                      le16_to_cpu(ent->lde_namelen) == 2))
446                         goto again;
447
448                 RETURN(rc);
449         }
450
451         /* go to next stripe */
452         if (it->lit_stripe_index + 1 >= lo->ldo_stripenr)
453                 RETURN(1);
454
455         it->lit_stripe_index++;
456
457         next->do_index_ops->dio_it.put(env, it->lit_it);
458         next->do_index_ops->dio_it.fini(env, it->lit_it);
459
460         rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
461         if (rc != 0)
462                 RETURN(rc);
463
464         next = lo->ldo_stripe[it->lit_stripe_index];
465         LASSERT(next != NULL);
466         LASSERT(next->do_index_ops != NULL);
467
468         it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr,
469                                                   BYPASS_CAPA);
470         if (!IS_ERR(it_next)) {
471                 it->lit_it = it_next;
472                 goto again;
473         } else {
474                 rc = PTR_ERR(it_next);
475         }
476
477         RETURN(rc);
478 }
479
480 /**
481  * Implementation of dt_index_operations:: dio_it.key
482  *
483  * This function is to get the key of the iterator at current position.
484  *
485  * \param[in] env       execution environment.
486  * \param[in] di        the iterator for striped directory.
487  *
488  * \retval      key(dt_key) if successfully get the key.
489  * \retval      negative error if can not get the key.
490  */
491 static struct dt_key *lod_striped_it_key(const struct lu_env *env,
492                                          const struct dt_it *di)
493 {
494         const struct lod_it     *it = (const struct lod_it *)di;
495         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
496         struct dt_object        *next;
497
498         LOD_CHECK_STRIPED_IT(env, it, lo);
499
500         next = lo->ldo_stripe[it->lit_stripe_index];
501         LASSERT(next != NULL);
502         LASSERT(next->do_index_ops != NULL);
503
504         return next->do_index_ops->dio_it.key(env, it->lit_it);
505 }
506
507 /**
508  * Implementation of dt_index_operations:: dio_it.key_size
509  *
510  * This function is to get the key_size of current key.
511  *
512  * \param[in] env       execution environment.
513  * \param[in] di        the iterator for striped directory.
514  *
515  * \retval      key_size if successfully get the key_size.
516  * \retval      negative error if can not get the key_size.
517  */
518 static int lod_striped_it_key_size(const struct lu_env *env,
519                                    const struct dt_it *di)
520 {
521         struct lod_it           *it = (struct lod_it *)di;
522         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
523         struct dt_object        *next;
524
525         LOD_CHECK_STRIPED_IT(env, it, lo);
526
527         next = lo->ldo_stripe[it->lit_stripe_index];
528         LASSERT(next != NULL);
529         LASSERT(next->do_index_ops != NULL);
530
531         return next->do_index_ops->dio_it.key_size(env, it->lit_it);
532 }
533
534 /**
535  * Implementation of dt_index_operations:: dio_it.rec
536  *
537  * This function is to get the record at current position.
538  *
539  * \param[in] env       execution environment.
540  * \param[in] di        the iterator for striped directory.
541  * \param[in] attr      the attribute of iterator, mostly used to indicate
542  *                      the entry attribute in the object to be iterated.
543  * \param[out] rec      hold the return record.
544  *
545  * \retval      0 if successfully get the entry.
546  * \retval      negative error if can not get entry.
547  */
548 static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
549                               struct dt_rec *rec, __u32 attr)
550 {
551         const struct lod_it     *it = (const struct lod_it *)di;
552         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
553         struct dt_object        *next;
554
555         LOD_CHECK_STRIPED_IT(env, it, lo);
556
557         next = lo->ldo_stripe[it->lit_stripe_index];
558         LASSERT(next != NULL);
559         LASSERT(next->do_index_ops != NULL);
560
561         return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
562 }
563
564 /**
565  * Implementation of dt_index_operations:: dio_it.rec_size
566  *
567  * This function is to get the record_size at current record.
568  *
569  * \param[in] env       execution environment.
570  * \param[in] di        the iterator for striped directory.
571  * \param[in] attr      the attribute of iterator, mostly used to indicate
572  *                      the entry attribute in the object to be iterated.
573  *
574  * \retval      rec_size if successfully get the entry size.
575  * \retval      negative error if can not get entry size.
576  */
577 static int lod_striped_it_rec_size(const struct lu_env *env,
578                                    const struct dt_it *di, __u32 attr)
579 {
580         struct lod_it           *it = (struct lod_it *)di;
581         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
582         struct dt_object        *next;
583
584         LOD_CHECK_STRIPED_IT(env, it, lo);
585
586         next = lo->ldo_stripe[it->lit_stripe_index];
587         LASSERT(next != NULL);
588         LASSERT(next->do_index_ops != NULL);
589
590         return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
591 }
592
593 /**
594  * Implementation of dt_index_operations:: dio_it.store
595  *
596  * This function will a cookie for current position of the iterator head,
597  * so that user can use this cookie to load/start the iterator next time.
598  *
599  * \param[in] env       execution environment.
600  * \param[in] di        the iterator for striped directory.
601  *
602  * \retval      the cookie.
603  */
604 static __u64 lod_striped_it_store(const struct lu_env *env,
605                                   const struct dt_it *di)
606 {
607         const struct lod_it     *it = (const struct lod_it *)di;
608         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
609         struct dt_object        *next;
610
611         LOD_CHECK_STRIPED_IT(env, it, lo);
612
613         next = lo->ldo_stripe[it->lit_stripe_index];
614         LASSERT(next != NULL);
615         LASSERT(next->do_index_ops != NULL);
616
617         return next->do_index_ops->dio_it.store(env, it->lit_it);
618 }
619
620 /**
621  * Implementation of dt_index_operations:: dio_it.load
622  *
623  * This function will position the iterator with the given hash(usually
624  * get from store),
625  *
626  * \param[in] env       execution environment.
627  * \param[in] di        the iterator for striped directory.
628  * \param[in] hash      the given hash.
629  *
630  * \retval      >0 if successfuly load the iterator to the given position.
631  * \retval      <0 if load is failed.
632  */
633 static int lod_striped_it_load(const struct lu_env *env,
634                                const struct dt_it *di, __u64 hash)
635 {
636         const struct lod_it     *it = (const struct lod_it *)di;
637         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
638         struct dt_object        *next;
639
640         LOD_CHECK_STRIPED_IT(env, it, lo);
641
642         next = lo->ldo_stripe[it->lit_stripe_index];
643         LASSERT(next != NULL);
644         LASSERT(next->do_index_ops != NULL);
645
646         return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
647 }
648
649 static struct dt_index_operations lod_striped_index_ops = {
650         .dio_lookup             = lod_index_lookup,
651         .dio_declare_insert     = lod_declare_index_insert,
652         .dio_insert             = lod_index_insert,
653         .dio_declare_delete     = lod_declare_index_delete,
654         .dio_delete             = lod_index_delete,
655         .dio_it = {
656                 .init           = lod_striped_it_init,
657                 .fini           = lod_striped_it_fini,
658                 .get            = lod_striped_it_get,
659                 .put            = lod_striped_it_put,
660                 .next           = lod_striped_it_next,
661                 .key            = lod_striped_it_key,
662                 .key_size       = lod_striped_it_key_size,
663                 .rec            = lod_striped_it_rec,
664                 .rec_size       = lod_striped_it_rec_size,
665                 .store          = lod_striped_it_store,
666                 .load           = lod_striped_it_load,
667         }
668 };
669
670 /**
671  * Append the FID for each shard of the striped directory after the
672  * given LMV EA header.
673  *
674  * To simplify striped directory and the consistency verification,
675  * we only store the LMV EA header on disk, for both master object
676  * and slave objects. When someone wants to know the whole LMV EA,
677  * such as client readdir(), we can build the entire LMV EA on the
678  * MDT side (in RAM) via iterating the sub-directory entries that
679  * are contained in the master object of the stripe directory.
680  *
681  * For the master object of the striped directory, the valid name
682  * for each shard is composed of the ${shard_FID}:${shard_idx}.
683  *
684  * There may be holes in the LMV EA if some shards' name entries
685  * are corrupted or lost.
686  *
687  * \param[in] env       pointer to the thread context
688  * \param[in] lo        pointer to the master object of the striped directory
689  * \param[in] buf       pointer to the lu_buf which will hold the LMV EA
690  * \param[in] resize    whether re-allocate the buffer if it is not big enough
691  *
692  * \retval              positive size of the LMV EA
693  * \retval              0 for nothing to be loaded
694  * \retval              negative error number on failure
695  */
696 int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo,
697                         struct lu_buf *buf, bool resize)
698 {
699         struct lu_dirent        *ent    =
700                         (struct lu_dirent *)lod_env_info(env)->lti_key;
701         struct lod_device       *lod    = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
702         struct dt_object        *obj    = dt_object_child(&lo->ldo_obj);
703         struct lmv_mds_md_v1    *lmv1   = buf->lb_buf;
704         struct dt_it            *it;
705         const struct dt_it_ops  *iops;
706         __u32                    stripes;
707         __u32                    magic  = le32_to_cpu(lmv1->lmv_magic);
708         size_t                   lmv1_size;
709         int                      rc;
710         ENTRY;
711
712         /* If it is not a striped directory, then load nothing. */
713         if (magic != LMV_MAGIC_V1)
714                 RETURN(0);
715
716         /* If it is in migration (or failure), then load nothing. */
717         if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
718                 RETURN(0);
719
720         stripes = le32_to_cpu(lmv1->lmv_stripe_count);
721         if (stripes < 1)
722                 RETURN(0);
723
724         rc = lmv_mds_md_size(stripes, magic);
725         if (rc < 0)
726                 RETURN(rc);
727         lmv1_size = rc;
728         if (buf->lb_len < lmv1_size) {
729                 struct lu_buf tbuf;
730
731                 if (!resize)
732                         RETURN(-ERANGE);
733
734                 tbuf = *buf;
735                 buf->lb_buf = NULL;
736                 buf->lb_len = 0;
737                 lu_buf_alloc(buf, lmv1_size);
738                 lmv1 = buf->lb_buf;
739                 if (lmv1 == NULL)
740                         RETURN(-ENOMEM);
741
742                 memcpy(buf->lb_buf, tbuf.lb_buf, tbuf.lb_len);
743         }
744
745         if (unlikely(!dt_try_as_dir(env, obj)))
746                 RETURN(-ENOTDIR);
747
748         memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid));
749         iops = &obj->do_index_ops->dio_it;
750         it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
751         if (IS_ERR(it))
752                 RETURN(PTR_ERR(it));
753
754         rc = iops->load(env, it, 0);
755         if (rc == 0)
756                 rc = iops->next(env, it);
757         else if (rc > 0)
758                 rc = 0;
759
760         while (rc == 0) {
761                 char             name[FID_LEN + 2] = "";
762                 struct lu_fid    fid;
763                 __u32            index;
764                 int              len;
765
766                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
767                 if (rc != 0)
768                         break;
769
770                 rc = -EIO;
771
772                 fid_le_to_cpu(&fid, &ent->lde_fid);
773                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
774                 if (ent->lde_name[0] == '.') {
775                         if (ent->lde_namelen == 1)
776                                 goto next;
777
778                         if (ent->lde_namelen == 2 && ent->lde_name[1] == '.')
779                                 goto next;
780                 }
781
782                 len = snprintf(name, FID_LEN + 1, DFID":", PFID(&ent->lde_fid));
783                 /* The ent->lde_name is composed of ${FID}:${index} */
784                 if (ent->lde_namelen < len + 1 ||
785                     memcmp(ent->lde_name, name, len) != 0) {
786                         CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
787                                "%s: invalid shard name %.*s with the FID "DFID
788                                " for the striped directory "DFID", %s\n",
789                                lod2obd(lod)->obd_name, ent->lde_namelen,
790                                ent->lde_name, PFID(&fid),
791                                PFID(lu_object_fid(&obj->do_lu)),
792                                lod->lod_lmv_failout ? "failout" : "skip");
793
794                         if (lod->lod_lmv_failout)
795                                 break;
796
797                         goto next;
798                 }
799
800                 index = 0;
801                 do {
802                         if (ent->lde_name[len] < '0' ||
803                             ent->lde_name[len] > '9') {
804                                 CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
805                                        "%s: invalid shard name %.*s with the "
806                                        "FID "DFID" for the striped directory "
807                                        DFID", %s\n",
808                                        lod2obd(lod)->obd_name, ent->lde_namelen,
809                                        ent->lde_name, PFID(&fid),
810                                        PFID(lu_object_fid(&obj->do_lu)),
811                                        lod->lod_lmv_failout ?
812                                        "failout" : "skip");
813
814                                 if (lod->lod_lmv_failout)
815                                         break;
816
817                                 goto next;
818                         }
819
820                         index = index * 10 + ent->lde_name[len++] - '0';
821                 } while (len < ent->lde_namelen);
822
823                 if (len == ent->lde_namelen) {
824                         /* Out of LMV EA range. */
825                         if (index >= stripes) {
826                                 CERROR("%s: the shard %.*s for the striped "
827                                        "directory "DFID" is out of the known "
828                                        "LMV EA range [0 - %u], failout\n",
829                                        lod2obd(lod)->obd_name, ent->lde_namelen,
830                                        ent->lde_name,
831                                        PFID(lu_object_fid(&obj->do_lu)),
832                                        stripes - 1);
833
834                                 break;
835                         }
836
837                         /* The slot has been occupied. */
838                         if (!fid_is_zero(&lmv1->lmv_stripe_fids[index])) {
839                                 struct lu_fid fid0;
840
841                                 fid_le_to_cpu(&fid0,
842                                         &lmv1->lmv_stripe_fids[index]);
843                                 CERROR("%s: both the shard "DFID" and "DFID
844                                        " for the striped directory "DFID
845                                        " claim the same LMV EA slot at the "
846                                        "index %d, failout\n",
847                                        lod2obd(lod)->obd_name,
848                                        PFID(&fid0), PFID(&fid),
849                                        PFID(lu_object_fid(&obj->do_lu)), index);
850
851                                 break;
852                         }
853
854                         /* stored as LE mode */
855                         lmv1->lmv_stripe_fids[index] = ent->lde_fid;
856
857 next:
858                         rc = iops->next(env, it);
859                 }
860         }
861
862         iops->put(env, it);
863         iops->fini(env, it);
864
865         RETURN(rc > 0 ? lmv_mds_md_size(stripes, magic) : rc);
866 }
867
868 /**
869  * Implementation of dt_object_operations:: do_index_try
870  *
871  * This function will try to initialize the index api pointer for the
872  * given object, usually it the entry point of the index api. i.e.
873  * the index object should be initialized in index_try, then start
874  * using index api. For striped directory, it will try to initialize
875  * all of its sub_stripes.
876  *
877  * \param[in] env       execution environment.
878  * \param[in] dt        the index object to be initialized.
879  * \param[in] feat      the features of this object, for example fixed or
880  *                      variable key size etc.
881  *
882  * \retval      >0 if the initialization is successful.
883  * \retval      <0 if the initialization is failed.
884  */
885 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
886                          const struct dt_index_features *feat)
887 {
888         struct lod_object       *lo = lod_dt_obj(dt);
889         struct dt_object        *next = dt_object_child(dt);
890         int                     rc;
891         ENTRY;
892
893         LASSERT(next->do_ops);
894         LASSERT(next->do_ops->do_index_try);
895
896         rc = lod_load_striping_locked(env, lo);
897         if (rc != 0)
898                 RETURN(rc);
899
900         rc = next->do_ops->do_index_try(env, next, feat);
901         if (rc != 0)
902                 RETURN(rc);
903
904         if (lo->ldo_stripenr > 0) {
905                 int i;
906
907                 for (i = 0; i < lo->ldo_stripenr; i++) {
908                         if (dt_object_exists(lo->ldo_stripe[i]) == 0)
909                                 continue;
910                         rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
911                                                 lo->ldo_stripe[i], feat);
912                         if (rc != 0)
913                                 RETURN(rc);
914                 }
915                 dt->do_index_ops = &lod_striped_index_ops;
916         } else {
917                 dt->do_index_ops = &lod_index_ops;
918         }
919
920         RETURN(rc);
921 }
922
/* Forward a read-lock request on \a dt to its child object. */
static void lod_object_read_lock(const struct lu_env *env,
                                 struct dt_object *dt, unsigned role)
{
        struct dt_object *child = dt_object_child(dt);

        dt_read_lock(env, child, role);
}
928
/* Forward a write-lock request on \a dt to its child object. */
static void lod_object_write_lock(const struct lu_env *env,
                                  struct dt_object *dt, unsigned role)
{
        struct dt_object *child = dt_object_child(dt);

        dt_write_lock(env, child, role);
}
934
/* Forward a read-unlock request on \a dt to its child object. */
static void lod_object_read_unlock(const struct lu_env *env,
                                   struct dt_object *dt)
{
        struct dt_object *child = dt_object_child(dt);

        dt_read_unlock(env, child);
}
940
/* Forward a write-unlock request on \a dt to its child object. */
static void lod_object_write_unlock(const struct lu_env *env,
                                    struct dt_object *dt)
{
        struct dt_object *child = dt_object_child(dt);

        dt_write_unlock(env, child);
}
946
/* Return whatever dt_write_locked() reports for the child object of \a dt. */
static int lod_object_write_locked(const struct lu_env *env,
                                   struct dt_object *dt)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_write_locked(env, child);
}
952
/*
 * Read the attributes of \a dt from its child object into \a attr and
 * return the result of dt_attr_get().
 *
 * Only the master object is consulted even for a striped directory: the
 * client merges attributes from all of the sub-stripes itself (see
 * lmv_merge_attr()) and no MDD logic depends on directory nlink/size/time,
 * so the master inode nlink and size are sufficient for now.
 */
static int lod_attr_get(const struct lu_env *env,
                        struct dt_object *dt,
                        struct lu_attr *attr,
                        struct lustre_capa *capa)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_attr_get(env, child, attr, capa);
}
964
965 /**
966  * Mark all of sub-stripes dead of the striped directory.
967  **/
968 static int lod_mark_dead_object(const struct lu_env *env,
969                                 struct dt_object *dt,
970                                 struct thandle *handle,
971                                 bool declare)
972 {
973         struct lod_object       *lo = lod_dt_obj(dt);
974         struct lmv_mds_md_v1    *lmv;
975         __u32                   dead_hash_type;
976         int                     rc;
977         int                     i;
978
979         ENTRY;
980
981         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
982                 RETURN(0);
983
984         rc = lod_load_striping_locked(env, lo);
985         if (rc != 0)
986                 RETURN(rc);
987
988         if (lo->ldo_stripenr == 0)
989                 RETURN(0);
990
991         rc = lod_get_lmv_ea(env, lo);
992         if (rc <= 0)
993                 RETURN(rc);
994
995         lmv = lod_env_info(env)->lti_ea_store;
996         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
997         dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD;
998         lmv->lmv_hash_type = cpu_to_le32(dead_hash_type);
999         for (i = 0; i < lo->ldo_stripenr; i++) {
1000                 struct lu_buf buf;
1001
1002                 lmv->lmv_master_mdt_index = i;
1003                 buf.lb_buf = lmv;
1004                 buf.lb_len = sizeof(*lmv);
1005                 if (declare) {
1006                         rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf,
1007                                                   XATTR_NAME_LMV,
1008                                                   LU_XATTR_REPLACE, handle);
1009                 } else {
1010                         rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf,
1011                                           XATTR_NAME_LMV, LU_XATTR_REPLACE,
1012                                           handle, BYPASS_CAPA);
1013                 }
1014                 if (rc != 0)
1015                         break;
1016         }
1017
1018         RETURN(rc);
1019 }
1020
1021 static int lod_declare_attr_set(const struct lu_env *env,
1022                                 struct dt_object *dt,
1023                                 const struct lu_attr *attr,
1024                                 struct thandle *handle)
1025 {
1026         struct dt_object  *next = dt_object_child(dt);
1027         struct lod_object *lo = lod_dt_obj(dt);
1028         int                rc, i;
1029         ENTRY;
1030
1031         /* Set dead object on all other stripes */
1032         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
1033             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
1034                 rc = lod_mark_dead_object(env, dt, handle, true);
1035                 RETURN(rc);
1036         }
1037
1038         /*
1039          * declare setattr on the local object
1040          */
1041         rc = dt_declare_attr_set(env, next, attr, handle);
1042         if (rc)
1043                 RETURN(rc);
1044
1045         /* osp_declare_attr_set() ignores all attributes other than
1046          * UID, GID, and size, and osp_attr_set() ignores all but UID
1047          * and GID.  Declaration of size attr setting happens through
1048          * lod_declare_init_size(), and not through this function.
1049          * Therefore we need not load striping unless ownership is
1050          * changing.  This should save memory and (we hope) speed up
1051          * rename(). */
1052         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
1053                 if (!(attr->la_valid & (LA_UID | LA_GID)))
1054                         RETURN(rc);
1055
1056                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
1057                         RETURN(0);
1058         } else {
1059                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
1060                                         LA_ATIME | LA_MTIME | LA_CTIME)))
1061                         RETURN(rc);
1062         }
1063         /*
1064          * load striping information, notice we don't do this when object
1065          * is being initialized as we don't need this information till
1066          * few specific cases like destroy, chown
1067          */
1068         rc = lod_load_striping(env, lo);
1069         if (rc)
1070                 RETURN(rc);
1071
1072         if (lo->ldo_stripenr == 0)
1073                 RETURN(0);
1074
1075         /*
1076          * if object is striped declare changes on the stripes
1077          */
1078         LASSERT(lo->ldo_stripe);
1079         for (i = 0; i < lo->ldo_stripenr; i++) {
1080                 if (likely(lo->ldo_stripe[i] != NULL)) {
1081                         rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr,
1082                                                  handle);
1083                         if (rc != 0) {
1084                                 CERROR("failed declaration: %d\n", rc);
1085                                 break;
1086                         }
1087                 }
1088         }
1089
1090         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1091             dt_object_exists(next) != 0 &&
1092             dt_object_remote(next) == 0)
1093                 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
1094
1095         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1096             dt_object_exists(next) &&
1097             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1098                 struct lod_thread_info *info = lod_env_info(env);
1099                 struct lu_buf *buf = &info->lti_buf;
1100
1101                 buf->lb_buf = info->lti_ea_store;
1102                 buf->lb_len = info->lti_ea_store_size;
1103                 dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
1104                                      LU_XATTR_REPLACE, handle);
1105         }
1106
1107         RETURN(rc);
1108 }
1109
1110 static int lod_attr_set(const struct lu_env *env,
1111                         struct dt_object *dt,
1112                         const struct lu_attr *attr,
1113                         struct thandle *handle,
1114                         struct lustre_capa *capa)
1115 {
1116         struct dt_object        *next = dt_object_child(dt);
1117         struct lod_object       *lo = lod_dt_obj(dt);
1118         int                     rc, i;
1119         ENTRY;
1120
1121         /* Set dead object on all other stripes */
1122         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
1123             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
1124                 rc = lod_mark_dead_object(env, dt, handle, false);
1125                 RETURN(rc);
1126         }
1127
1128         /*
1129          * apply changes to the local object
1130          */
1131         rc = dt_attr_set(env, next, attr, handle, capa);
1132         if (rc)
1133                 RETURN(rc);
1134
1135         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
1136                 if (!(attr->la_valid & (LA_UID | LA_GID)))
1137                         RETURN(rc);
1138
1139                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
1140                         RETURN(0);
1141         } else {
1142                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
1143                                         LA_ATIME | LA_MTIME | LA_CTIME)))
1144                         RETURN(rc);
1145         }
1146
1147         if (lo->ldo_stripenr == 0)
1148                 RETURN(0);
1149
1150         /*
1151          * if object is striped, apply changes to all the stripes
1152          */
1153         LASSERT(lo->ldo_stripe);
1154         for (i = 0; i < lo->ldo_stripenr; i++) {
1155                 if (unlikely(lo->ldo_stripe[i] == NULL))
1156                         continue;
1157                 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
1158                     (dt_object_exists(lo->ldo_stripe[i]) == 0))
1159                         continue;
1160
1161                 rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
1162                 if (rc != 0) {
1163                         CERROR("failed declaration: %d\n", rc);
1164                         break;
1165                 }
1166         }
1167
1168         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1169             dt_object_exists(next) != 0 &&
1170             dt_object_remote(next) == 0)
1171                 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
1172
1173         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1174             dt_object_exists(next) &&
1175             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1176                 struct lod_thread_info *info = lod_env_info(env);
1177                 struct lu_buf *buf = &info->lti_buf;
1178                 struct ost_id *oi = &info->lti_ostid;
1179                 struct lu_fid *fid = &info->lti_fid;
1180                 struct lov_mds_md_v1 *lmm;
1181                 struct lov_ost_data_v1 *objs;
1182                 __u32 magic;
1183                 int rc1;
1184
1185                 rc1 = lod_get_lov_ea(env, lo);
1186                 if (rc1  <= 0)
1187                         RETURN(rc);
1188
1189                 buf->lb_buf = info->lti_ea_store;
1190                 buf->lb_len = info->lti_ea_store_size;
1191                 lmm = info->lti_ea_store;
1192                 magic = le32_to_cpu(lmm->lmm_magic);
1193                 if (magic == LOV_MAGIC_V1)
1194                         objs = &(lmm->lmm_objects[0]);
1195                 else
1196                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1197                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
1198                 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
1199                 fid->f_oid--;
1200                 fid_to_ostid(fid, oi);
1201                 ostid_cpu_to_le(oi, &objs->l_ost_oi);
1202                 dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
1203                              LU_XATTR_REPLACE, handle, BYPASS_CAPA);
1204         }
1205
1206         RETURN(rc);
1207 }
1208
1209 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
1210                          struct lu_buf *buf, const char *name,
1211                          struct lustre_capa *capa)
1212 {
1213         struct lod_thread_info  *info = lod_env_info(env);
1214         struct lod_device       *dev = lu2lod_dev(dt->do_lu.lo_dev);
1215         int                      rc, is_root;
1216         ENTRY;
1217
1218         rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
1219         if (strcmp(name, XATTR_NAME_LMV) == 0) {
1220                 struct lmv_mds_md_v1    *lmv1;
1221                 int                      rc1 = 0;
1222
1223                 if (rc > (typeof(rc))sizeof(*lmv1))
1224                         RETURN(rc);
1225
1226                 if (rc < (typeof(rc))sizeof(*lmv1))
1227                         RETURN(rc = rc > 0 ? -EINVAL : rc);
1228
1229                 if (buf->lb_buf == NULL || buf->lb_len == 0) {
1230                         CLASSERT(sizeof(*lmv1) <= sizeof(info->lti_key));
1231
1232                         info->lti_buf.lb_buf = info->lti_key;
1233                         info->lti_buf.lb_len = sizeof(*lmv1);
1234                         rc = dt_xattr_get(env, dt_object_child(dt),
1235                                           &info->lti_buf, name, capa);
1236                         if (unlikely(rc != sizeof(*lmv1)))
1237                                 RETURN(rc = rc > 0 ? -EINVAL : rc);
1238
1239                         lmv1 = info->lti_buf.lb_buf;
1240                         /* The on-disk LMV EA only contains header, but the
1241                          * returned LMV EA size should contain the space for
1242                          * the FIDs of all shards of the striped directory. */
1243                         if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1)
1244                                 rc = lmv_mds_md_size(
1245                                         le32_to_cpu(lmv1->lmv_stripe_count),
1246                                         LMV_MAGIC_V1);
1247                 } else {
1248                         rc1 = lod_load_lmv_shards(env, lod_dt_obj(dt),
1249                                                   buf, false);
1250                 }
1251
1252                 RETURN(rc = rc1 != 0 ? rc1 : rc);
1253         }
1254
1255         if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
1256                 RETURN(rc);
1257
1258         /*
1259          * lod returns default striping on the real root of the device
1260          * this is like the root stores default striping for the whole
1261          * filesystem. historically we've been using a different approach
1262          * and store it in the config.
1263          */
1264         dt_root_get(env, dev->lod_child, &info->lti_fid);
1265         is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
1266
1267         if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
1268                 struct lov_user_md *lum = buf->lb_buf;
1269                 struct lov_desc    *desc = &dev->lod_desc;
1270
1271                 if (buf->lb_buf == NULL) {
1272                         rc = sizeof(*lum);
1273                 } else if (buf->lb_len >= sizeof(*lum)) {
1274                         lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
1275                         lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
1276                         lmm_oi_set_id(&lum->lmm_oi, 0);
1277                         lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
1278                         lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
1279                         lum->lmm_stripe_size = cpu_to_le32(
1280                                                 desc->ld_default_stripe_size);
1281                         lum->lmm_stripe_count = cpu_to_le16(
1282                                                 desc->ld_default_stripe_count);
1283                         lum->lmm_stripe_offset = cpu_to_le16(
1284                                                 desc->ld_default_stripe_offset);
1285                         rc = sizeof(*lum);
1286                 } else {
1287                         rc = -ERANGE;
1288                 }
1289         }
1290
1291         RETURN(rc);
1292 }
1293
1294 static int lod_verify_md_striping(struct lod_device *lod,
1295                                   const struct lmv_user_md_v1 *lum)
1296 {
1297         int     rc = 0;
1298         ENTRY;
1299
1300         if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
1301                 GOTO(out, rc = -EINVAL);
1302
1303         if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
1304                 GOTO(out, rc = -EINVAL);
1305 out:
1306         if (rc != 0)
1307                 CERROR("%s: invalid lmv_user_md: magic = %x, "
1308                        "stripe_offset = %d, stripe_count = %u: rc = %d\n",
1309                        lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
1310                        (int)le32_to_cpu(lum->lum_stripe_offset),
1311                        le32_to_cpu(lum->lum_stripe_count), rc);
1312         return rc;
1313 }
1314
1315 /**
1316  * Master LMVEA will be same as slave LMVEA, except
1317  * 1. different magic
1318  * 2. lmv_master_mdt_index on slave LMV EA will be stripe_index.
1319  */
1320 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
1321                                   const struct lmv_mds_md_v1 *master_lmv)
1322 {
1323         *slave_lmv = *master_lmv;
1324         slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
1325 }
1326
1327 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
1328                     struct lu_buf *lmv_buf)
1329 {
1330         struct lod_thread_info  *info = lod_env_info(env);
1331         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1332         struct lod_object       *lo = lod_dt_obj(dt);
1333         struct lmv_mds_md_v1    *lmm1;
1334         int                     stripe_count;
1335         int                     type = LU_SEQ_RANGE_ANY;
1336         int                     rc;
1337         __u32                   mdtidx;
1338         ENTRY;
1339
1340         LASSERT(lo->ldo_dir_striped != 0);
1341         LASSERT(lo->ldo_stripenr > 0);
1342         stripe_count = lo->ldo_stripenr;
1343         /* Only store the LMV EA heahder on the disk. */
1344         if (info->lti_ea_store_size < sizeof(*lmm1)) {
1345                 rc = lod_ea_store_resize(info, sizeof(*lmm1));
1346                 if (rc != 0)
1347                         RETURN(rc);
1348         } else {
1349                 memset(info->lti_ea_store, 0, sizeof(*lmm1));
1350         }
1351
1352         lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
1353         lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
1354         lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
1355         lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
1356         rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
1357                             &mdtidx, &type);
1358         if (rc != 0)
1359                 RETURN(rc);
1360
1361         lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
1362         lmv_buf->lb_buf = info->lti_ea_store;
1363         lmv_buf->lb_len = sizeof(*lmm1);
1364         lo->ldo_dir_striping_cached = 1;
1365
1366         RETURN(rc);
1367 }
1368
1369 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
1370                            const struct lu_buf *buf)
1371 {
1372         struct lod_thread_info  *info = lod_env_info(env);
1373         struct lod_device       *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1374         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1375         struct dt_object        **stripe;
1376         union lmv_mds_md        *lmm = buf->lb_buf;
1377         struct lmv_mds_md_v1    *lmv1 = &lmm->lmv_md_v1;
1378         struct lu_fid           *fid = &info->lti_fid;
1379         unsigned int            i;
1380         int                     rc = 0;
1381         ENTRY;
1382
1383         if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
1384                 RETURN(0);
1385
1386         if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
1387                 lo->ldo_dir_slave_stripe = 1;
1388                 RETURN(0);
1389         }
1390
1391         if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
1392                 RETURN(-EINVAL);
1393
1394         if (le32_to_cpu(lmv1->lmv_stripe_count) < 1)
1395                 RETURN(0);
1396
1397         LASSERT(lo->ldo_stripe == NULL);
1398         OBD_ALLOC(stripe, sizeof(stripe[0]) *
1399                   (le32_to_cpu(lmv1->lmv_stripe_count)));
1400         if (stripe == NULL)
1401                 RETURN(-ENOMEM);
1402
1403         for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
1404                 struct dt_device        *tgt_dt;
1405                 struct dt_object        *dto;
1406                 int                     type = LU_SEQ_RANGE_ANY;
1407                 __u32                   idx;
1408
1409                 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
1410                 if (!fid_is_sane(fid))
1411                         GOTO(out, rc = -ESTALE);
1412
1413                 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
1414                 if (rc != 0)
1415                         GOTO(out, rc);
1416
1417                 if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
1418                         tgt_dt = lod->lod_child;
1419                 } else {
1420                         struct lod_tgt_desc     *tgt;
1421
1422                         tgt = LTD_TGT(ltd, idx);
1423                         if (tgt == NULL)
1424                                 GOTO(out, rc = -ESTALE);
1425                         tgt_dt = tgt->ltd_tgt;
1426                 }
1427
1428                 dto = dt_locate_at(env, tgt_dt, fid,
1429                                   lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1430                                   NULL);
1431                 if (IS_ERR(dto))
1432                         GOTO(out, rc = PTR_ERR(dto));
1433
1434                 stripe[i] = dto;
1435         }
1436 out:
1437         lo->ldo_stripe = stripe;
1438         lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count);
1439         lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
1440         if (rc != 0)
1441                 lod_object_free_striping(env, lo);
1442
1443         RETURN(rc);
1444 }
1445
1446 static int lod_prep_md_striped_create(const struct lu_env *env,
1447                                       struct dt_object *dt,
1448                                       struct lu_attr *attr,
1449                                       const struct lmv_user_md_v1 *lum,
1450                                       struct dt_object_format *dof,
1451                                       struct thandle *th)
1452 {
1453         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1454         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1455         struct lod_object       *lo = lod_dt_obj(dt);
1456         struct lod_thread_info  *info = lod_env_info(env);
1457         struct dt_object        **stripe;
1458         struct lu_buf           lmv_buf;
1459         struct lu_buf           slave_lmv_buf;
1460         struct lmv_mds_md_v1    *lmm;
1461         struct lmv_mds_md_v1    *slave_lmm = NULL;
1462         struct dt_insert_rec    *rec = &info->lti_dt_rec;
1463         __u32                   stripe_count;
1464         int                     *idx_array;
1465         int                     rc = 0;
1466         __u32                   i;
1467         __u32                   j;
1468         ENTRY;
1469
1470         /* The lum has been verifed in lod_verify_md_striping */
1471         LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
1472         LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
1473
1474         stripe_count = le32_to_cpu(lum->lum_stripe_count);
1475
1476         /* shrink the stripe_count to the avaible MDT count */
1477         if (stripe_count > lod->lod_remote_mdt_count + 1)
1478                 stripe_count = lod->lod_remote_mdt_count + 1;
1479
1480         OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
1481         if (stripe == NULL)
1482                 RETURN(-ENOMEM);
1483
1484         OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
1485         if (idx_array == NULL)
1486                 GOTO(out_free, rc = -ENOMEM);
1487
1488         for (i = 0; i < stripe_count; i++) {
1489                 struct lod_tgt_desc     *tgt = NULL;
1490                 struct dt_object        *dto;
1491                 struct lu_fid           fid = { 0 };
1492                 int                     idx;
1493                 struct lu_object_conf   conf = { 0 };
1494                 struct dt_device        *tgt_dt = NULL;
1495
1496                 if (i == 0) {
1497                         /* Right now, master stripe and master object are
1498                          * on the same MDT */
1499                         idx = le32_to_cpu(lum->lum_stripe_offset);
1500                         rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
1501                                            NULL);
1502                         if (rc < 0)
1503                                 GOTO(out_put, rc);
1504                         tgt_dt = lod->lod_child;
1505                         goto next;
1506                 }
1507
1508                 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
1509
1510                 for (j = 0; j < lod->lod_remote_mdt_count;
1511                      j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
1512                         bool already_allocated = false;
1513                         __u32 k;
1514
1515                         CDEBUG(D_INFO, "try idx %d, mdt cnt %u,"
1516                                " allocated %u, last allocated %d\n", idx,
1517                                lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1518
1519                         /* Find next available target */
1520                         if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
1521                                 continue;
1522
1523                         /* check whether the idx already exists
1524                          * in current allocated array */
1525                         for (k = 0; k < i; k++) {
1526                                 if (idx_array[k] == idx) {
1527                                         already_allocated = true;
1528                                         break;
1529                                 }
1530                         }
1531
1532                         if (already_allocated)
1533                                 continue;
1534
1535                         /* check the status of the OSP */
1536                         tgt = LTD_TGT(ltd, idx);
1537                         if (tgt == NULL)
1538                                 continue;
1539
1540                         tgt_dt = tgt->ltd_tgt;
1541                         rc = dt_statfs(env, tgt_dt, NULL);
1542                         if (rc) {
1543                                 /* this OSP doesn't feel well */
1544                                 rc = 0;
1545                                 continue;
1546                         }
1547
1548                         rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
1549                         if (rc < 0) {
1550                                 rc = 0;
1551                                 continue;
1552                         }
1553
1554                         break;
1555                 }
1556
1557                 /* Can not allocate more stripes */
1558                 if (j == lod->lod_remote_mdt_count) {
1559                         CDEBUG(D_INFO, "%s: require stripes %u only get %d\n",
1560                                lod2obd(lod)->obd_name, stripe_count, i - 1);
1561                         break;
1562                 }
1563
1564                 CDEBUG(D_INFO, "idx %d, mdt cnt %u,"
1565                        " allocated %u, last allocated %d\n", idx,
1566                        lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1567
1568 next:
1569                 /* tgt_dt and fid must be ready after search avaible OSP
1570                  * in the above loop */
1571                 LASSERT(tgt_dt != NULL);
1572                 LASSERT(fid_is_sane(&fid));
1573                 conf.loc_flags = LOC_F_NEW;
1574                 dto = dt_locate_at(env, tgt_dt, &fid,
1575                                    dt->do_lu.lo_dev->ld_site->ls_top_dev,
1576                                    &conf);
1577                 if (IS_ERR(dto))
1578                         GOTO(out_put, rc = PTR_ERR(dto));
1579                 stripe[i] = dto;
1580                 idx_array[i] = idx;
1581         }
1582
1583         lo->ldo_dir_striped = 1;
1584         lo->ldo_stripe = stripe;
1585         lo->ldo_stripenr = i;
1586         lo->ldo_stripes_allocated = stripe_count;
1587
1588         if (lo->ldo_stripenr == 0)
1589                 GOTO(out_put, rc = -ENOSPC);
1590
1591         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1592         if (rc != 0)
1593                 GOTO(out_put, rc);
1594         lmm = lmv_buf.lb_buf;
1595
1596         OBD_ALLOC_PTR(slave_lmm);
1597         if (slave_lmm == NULL)
1598                 GOTO(out_put, rc = -ENOMEM);
1599
1600         lod_prep_slave_lmv_md(slave_lmm, lmm);
1601         slave_lmv_buf.lb_buf = slave_lmm;
1602         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1603
1604         if (!dt_try_as_dir(env, dt_object_child(dt)))
1605                 GOTO(out_put, rc = -EINVAL);
1606
1607         rec->rec_type = S_IFDIR;
1608         for (i = 0; i < lo->ldo_stripenr; i++) {
1609                 struct dt_object        *dto            = stripe[i];
1610                 char                    *stripe_name    = info->lti_key;
1611                 struct lu_name          *sname;
1612                 struct linkea_data       ldata          = { 0 };
1613                 struct lu_buf            linkea_buf;
1614
1615                 rc = dt_declare_create(env, dto, attr, NULL, dof, th);
1616                 if (rc != 0)
1617                         GOTO(out_put, rc);
1618
1619                 if (!dt_try_as_dir(env, dto))
1620                         GOTO(out_put, rc = -EINVAL);
1621
1622                 rec->rec_fid = lu_object_fid(&dto->do_lu);
1623                 rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
1624                                        (const struct dt_key *)dot, th);
1625                 if (rc != 0)
1626                         GOTO(out_put, rc);
1627
1628                 /* master stripe FID will be put to .. */
1629                 rec->rec_fid = lu_object_fid(&dt->do_lu);
1630                 rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
1631                                        (const struct dt_key *)dotdot, th);
1632                 if (rc != 0)
1633                         GOTO(out_put, rc);
1634
1635                 /* probably nothing to inherite */
1636                 if (lo->ldo_striping_cached &&
1637                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1638                                          lo->ldo_def_stripenr,
1639                                          lo->ldo_def_stripe_offset)) {
1640                         struct lov_user_md_v3   *v3;
1641
1642                         /* sigh, lti_ea_store has been used for lmv_buf,
1643                          * so we have to allocate buffer for default
1644                          * stripe EA */
1645                         OBD_ALLOC_PTR(v3);
1646                         if (v3 == NULL)
1647                                 GOTO(out_put, rc = -ENOMEM);
1648
1649                         memset(v3, 0, sizeof(*v3));
1650                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1651                         v3->lmm_stripe_count =
1652                                 cpu_to_le16(lo->ldo_def_stripenr);
1653                         v3->lmm_stripe_offset =
1654                                 cpu_to_le16(lo->ldo_def_stripe_offset);
1655                         v3->lmm_stripe_size =
1656                                 cpu_to_le32(lo->ldo_def_stripe_size);
1657                         if (lo->ldo_pool != NULL)
1658                                 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
1659                                         sizeof(v3->lmm_pool_name));
1660
1661                         info->lti_buf.lb_buf = v3;
1662                         info->lti_buf.lb_len = sizeof(*v3);
1663                         rc = dt_declare_xattr_set(env, dto,
1664                                                   &info->lti_buf,
1665                                                   XATTR_NAME_LOV,
1666                                                   0, th);
1667                         OBD_FREE_PTR(v3);
1668                         if (rc != 0)
1669                                 GOTO(out_put, rc);
1670                 }
1671
1672                 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1673                 rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf,
1674                                           XATTR_NAME_LMV, 0, th);
1675                 if (rc != 0)
1676                         GOTO(out_put, rc);
1677
1678                 snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
1679                         PFID(lu_object_fid(&dto->do_lu)), i);
1680
1681                 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
1682                 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
1683                 if (rc != 0)
1684                         GOTO(out_put, rc);
1685
1686                 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
1687                 if (rc != 0)
1688                         GOTO(out_put, rc);
1689
1690                 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1691                 linkea_buf.lb_len = ldata.ld_leh->leh_len;
1692                 rc = dt_declare_xattr_set(env, dto, &linkea_buf,
1693                                           XATTR_NAME_LINK, 0, th);
1694                 if (rc != 0)
1695                         GOTO(out_put, rc);
1696
1697                 rec->rec_fid = lu_object_fid(&dto->do_lu);
1698                 rc = dt_declare_insert(env, dt_object_child(dt),
1699                                        (const struct dt_rec *)rec,
1700                                        (const struct dt_key *)stripe_name, th);
1701                 if (rc != 0)
1702                         GOTO(out_put, rc);
1703
1704                 rc = dt_declare_ref_add(env, dt_object_child(dt), th);
1705                 if (rc != 0)
1706                         GOTO(out_put, rc);
1707         }
1708
1709         rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
1710                                   XATTR_NAME_LMV, 0, th);
1711         if (rc != 0)
1712                 GOTO(out_put, rc);
1713
1714 out_put:
1715         if (rc < 0) {
1716                 for (i = 0; i < stripe_count; i++)
1717                         if (stripe[i] != NULL)
1718                                 lu_object_put(env, &stripe[i]->do_lu);
1719                 OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count);
1720                 lo->ldo_stripenr = 0;
1721                 lo->ldo_stripes_allocated = 0;
1722                 lo->ldo_stripe = NULL;
1723         }
1724
1725 out_free:
1726         if (idx_array != NULL)
1727                 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
1728         if (slave_lmm != NULL)
1729                 OBD_FREE_PTR(slave_lmm);
1730
1731         RETURN(rc);
1732 }
1733
1734 /**
1735  * Declare create striped md object.
1736  */
1737 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
1738                                      struct dt_object *dt,
1739                                      struct lu_attr *attr,
1740                                      const struct lu_buf *lum_buf,
1741                                      struct dt_object_format *dof,
1742                                      struct thandle *th)
1743 {
1744         struct lod_object       *lo = lod_dt_obj(dt);
1745         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1746         struct lmv_user_md_v1   *lum;
1747         int                     rc;
1748         ENTRY;
1749
1750         lum = lum_buf->lb_buf;
1751         LASSERT(lum != NULL);
1752
1753         CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
1754                le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
1755                (int)le32_to_cpu(lum->lum_stripe_offset));
1756
1757         if (le32_to_cpu(lum->lum_stripe_count) == 0)
1758                 GOTO(out, rc = 0);
1759
1760         rc = lod_verify_md_striping(lod, lum);
1761         if (rc != 0)
1762                 GOTO(out, rc);
1763
1764         /* prepare dir striped objects */
1765         rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
1766         if (rc != 0) {
1767                 /* failed to create striping, let's reset
1768                  * config so that others don't get confused */
1769                 lod_object_free_striping(env, lo);
1770                 GOTO(out, rc);
1771         }
1772 out:
1773         RETURN(rc);
1774 }
1775
1776 static int lod_dir_declare_xattr_set(const struct lu_env *env,
1777                                      struct dt_object *dt,
1778                                      const struct lu_buf *buf,
1779                                      const char *name, int fl,
1780                                      struct thandle *th)
1781 {
1782         struct dt_object        *next = dt_object_child(dt);
1783         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
1784         struct lod_object       *lo = lod_dt_obj(dt);
1785         int                     i;
1786         int                     rc;
1787         ENTRY;
1788
1789         if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1790                 struct lmv_user_md_v1 *lum;
1791
1792                 LASSERT(buf != NULL && buf->lb_buf != NULL);
1793                 lum = buf->lb_buf;
1794                 rc = lod_verify_md_striping(d, lum);
1795                 if (rc != 0)
1796                         RETURN(rc);
1797         }
1798
1799         rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1800         if (rc != 0)
1801                 RETURN(rc);
1802
1803         /* set xattr to each stripes, if needed */
1804         rc = lod_load_striping(env, lo);
1805         if (rc != 0)
1806                 RETURN(rc);
1807
1808         /* Note: Do not set LinkEA on sub-stripes, otherwise
1809          * it will confuse the fid2path process(see mdt_path_current()).
1810          * The linkEA between master and sub-stripes is set in
1811          * lod_xattr_set_lmv(). */
1812         if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
1813                 RETURN(0);
1814
1815         for (i = 0; i < lo->ldo_stripenr; i++) {
1816                 LASSERT(lo->ldo_stripe[i]);
1817                 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
1818                                           name, fl, th);
1819                 if (rc != 0)
1820                         break;
1821         }
1822
1823         RETURN(rc);
1824 }
1825
1826 /*
1827  * LOV xattr is a storage for striping, and LOD owns this xattr.
1828  * but LOD allows others to control striping to some extent
1829  * - to reset strping
1830  * - to set new defined striping
1831  * - to set new semi-defined striping
1832  *   - number of stripes is defined
1833  *   - number of stripes + osts are defined
1834  *   - ??
1835  */
1836 static int lod_declare_xattr_set(const struct lu_env *env,
1837                                  struct dt_object *dt,
1838                                  const struct lu_buf *buf,
1839                                  const char *name, int fl,
1840                                  struct thandle *th)
1841 {
1842         struct dt_object *next = dt_object_child(dt);
1843         struct lu_attr   *attr = &lod_env_info(env)->lti_attr;
1844         __u32             mode;
1845         int               rc;
1846         ENTRY;
1847
1848         /*
1849          * allow to declare predefined striping on a new (!mode) object
1850          * which is supposed to be replay of regular file creation
1851          * (when LOV setting is declared)
1852          * LU_XATTR_REPLACE is set to indicate a layout swap
1853          */
1854         mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
1855         if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
1856              !(fl & LU_XATTR_REPLACE)) {
1857                 /*
1858                  * this is a request to manipulate object's striping
1859                  */
1860                 if (dt_object_exists(dt)) {
1861                         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1862                         if (rc)
1863                                 RETURN(rc);
1864                 } else {
1865                         memset(attr, 0, sizeof(*attr));
1866                         attr->la_valid = LA_TYPE | LA_MODE;
1867                         attr->la_mode = S_IFREG;
1868                 }
1869                 rc = lod_declare_striped_object(env, dt, attr, buf, th);
1870         } else if (S_ISDIR(mode)) {
1871                 rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
1872         } else {
1873                 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1874         }
1875
1876         RETURN(rc);
1877 }
1878
1879 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
1880 {
1881         lo->ldo_striping_cached = 0;
1882         lo->ldo_def_striping_set = 0;
1883         lod_object_set_pool(lo, NULL);
1884         lo->ldo_def_stripe_size = 0;
1885         lo->ldo_def_stripenr = 0;
1886         if (lo->ldo_dir_stripe != NULL)
1887                 lo->ldo_dir_striping_cached = 0;
1888 }
1889
1890 static int lod_xattr_set_internal(const struct lu_env *env,
1891                                   struct dt_object *dt,
1892                                   const struct lu_buf *buf,
1893                                   const char *name, int fl, struct thandle *th,
1894                                   struct lustre_capa *capa)
1895 {
1896         struct dt_object        *next = dt_object_child(dt);
1897         struct lod_object       *lo = lod_dt_obj(dt);
1898         int                     rc;
1899         int                     i;
1900         ENTRY;
1901
1902         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1903         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1904                 RETURN(rc);
1905
1906         /* Note: Do not set LinkEA on sub-stripes, otherwise
1907          * it will confuse the fid2path process(see mdt_path_current()).
1908          * The linkEA between master and sub-stripes is set in
1909          * lod_xattr_set_lmv(). */
1910         if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
1911                 RETURN(0);
1912
1913         for (i = 0; i < lo->ldo_stripenr; i++) {
1914                 LASSERT(lo->ldo_stripe[i]);
1915                 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
1916                                   capa);
1917                 if (rc != 0)
1918                         break;
1919         }
1920
1921         RETURN(rc);
1922 }
1923
1924 static int lod_xattr_del_internal(const struct lu_env *env,
1925                                   struct dt_object *dt,
1926                                   const char *name, struct thandle *th,
1927                                   struct lustre_capa *capa)
1928 {
1929         struct dt_object        *next = dt_object_child(dt);
1930         struct lod_object       *lo = lod_dt_obj(dt);
1931         int                     rc;
1932         int                     i;
1933         ENTRY;
1934
1935         rc = dt_xattr_del(env, next, name, th, capa);
1936         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1937                 RETURN(rc);
1938
1939         if (lo->ldo_stripenr == 0)
1940                 RETURN(rc);
1941
1942         for (i = 0; i < lo->ldo_stripenr; i++) {
1943                 LASSERT(lo->ldo_stripe[i]);
1944                 rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th,
1945                                   capa);
1946                 if (rc != 0)
1947                         break;
1948         }
1949
1950         RETURN(rc);
1951 }
1952
1953 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
1954                                     struct dt_object *dt,
1955                                     const struct lu_buf *buf,
1956                                     const char *name, int fl,
1957                                     struct thandle *th,
1958                                     struct lustre_capa *capa)
1959 {
1960         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
1961         struct lod_object       *l = lod_dt_obj(dt);
1962         struct lov_user_md_v1   *lum;
1963         struct lov_user_md_v3   *v3 = NULL;
1964         int                      rc;
1965         ENTRY;
1966
1967         /* If it is striped dir, we should clear the stripe cache for
1968          * slave stripe as well, but there are no effective way to
1969          * notify the LOD on the slave MDT, so we do not cache stripe
1970          * information for slave stripe for now. XXX*/
1971         lod_lov_stripe_cache_clear(l);
1972         LASSERT(buf != NULL && buf->lb_buf != NULL);
1973         lum = buf->lb_buf;
1974
1975         rc = lod_verify_striping(d, buf, false);
1976         if (rc)
1977                 RETURN(rc);
1978
1979         if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1980                 v3 = buf->lb_buf;
1981
1982         /* if { size, offset, count } = { 0, -1, 0 } and no pool
1983          * (i.e. all default values specified) then delete default
1984          * striping from dir. */
1985         CDEBUG(D_OTHER,
1986                 "set default striping: sz %u # %u offset %d %s %s\n",
1987                 (unsigned)lum->lmm_stripe_size,
1988                 (unsigned)lum->lmm_stripe_count,
1989                 (int)lum->lmm_stripe_offset,
1990                 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1991
1992         if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1993                                 (lum->lmm_stripe_count),
1994                                 (lum->lmm_stripe_offset)) &&
1995                         lum->lmm_magic == LOV_USER_MAGIC_V1) {
1996                 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1997                 if (rc == -ENODATA)
1998                         rc = 0;
1999         } else {
2000                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
2001         }
2002
2003         RETURN(rc);
2004 }
2005
2006 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
2007                                             struct dt_object *dt,
2008                                             const struct lu_buf *buf,
2009                                             const char *name, int fl,
2010                                             struct thandle *th,
2011                                             struct lustre_capa *capa)
2012 {
2013         struct lod_object       *l = lod_dt_obj(dt);
2014         struct lmv_user_md_v1   *lum;
2015         int                      rc;
2016         ENTRY;
2017
2018         LASSERT(buf != NULL && buf->lb_buf != NULL);
2019         lum = buf->lb_buf;
2020
2021         CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
2022               le32_to_cpu(lum->lum_stripe_count),
2023               (int)le32_to_cpu(lum->lum_stripe_offset));
2024
2025         if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
2026                                  le32_to_cpu(lum->lum_stripe_offset)) &&
2027                                 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
2028                 rc = lod_xattr_del_internal(env, dt, name, th, capa);
2029                 if (rc == -ENODATA)
2030                         rc = 0;
2031         } else {
2032                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
2033                 if (rc != 0)
2034                         RETURN(rc);
2035         }
2036
2037         /* Update default stripe cache */
2038         if (l->ldo_dir_stripe == NULL) {
2039                 OBD_ALLOC_PTR(l->ldo_dir_stripe);
2040                 if (l->ldo_dir_stripe == NULL)
2041                         RETURN(-ENOMEM);
2042         }
2043
2044         l->ldo_dir_striping_cached = 0;
2045         l->ldo_dir_def_striping_set = 1;
2046         l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count);
2047
2048         RETURN(rc);
2049 }
2050
2051 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
2052                              const struct lu_buf *buf, const char *name,
2053                              int fl, struct thandle *th,
2054                              struct lustre_capa *capa)
2055 {
2056         struct lod_object       *lo = lod_dt_obj(dt);
2057         struct lod_thread_info  *info = lod_env_info(env);
2058         struct lu_attr          *attr = &info->lti_attr;
2059         struct dt_object_format *dof = &info->lti_format;
2060         struct lu_buf           lmv_buf;
2061         struct lu_buf           slave_lmv_buf;
2062         struct lmv_mds_md_v1    *lmm;
2063         struct lmv_mds_md_v1    *slave_lmm = NULL;
2064         struct dt_insert_rec    *rec = &info->lti_dt_rec;
2065         int                     i;
2066         int                     rc;
2067         ENTRY;
2068
2069         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2070                 RETURN(-ENOTDIR);
2071
2072         /* The stripes are supposed to be allocated in declare phase,
2073          * if there are no stripes being allocated, it will skip */
2074         if (lo->ldo_stripenr == 0)
2075                 RETURN(0);
2076
2077         rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA);
2078         if (rc != 0)
2079                 RETURN(rc);
2080
2081         attr->la_valid = LA_TYPE | LA_MODE;
2082         dof->dof_type = DFT_DIR;
2083
2084         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
2085         if (rc != 0)
2086                 RETURN(rc);
2087         lmm = lmv_buf.lb_buf;
2088
2089         OBD_ALLOC_PTR(slave_lmm);
2090         if (slave_lmm == NULL)
2091                 RETURN(-ENOMEM);
2092
2093         lod_prep_slave_lmv_md(slave_lmm, lmm);
2094         slave_lmv_buf.lb_buf = slave_lmm;
2095         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
2096
2097         rec->rec_type = S_IFDIR;
2098         for (i = 0; i < lo->ldo_stripenr; i++) {
2099                 struct dt_object        *dto;
2100                 char                    *stripe_name    = info->lti_key;
2101                 struct lu_name          *sname;
2102                 struct linkea_data       ldata          = { 0 };
2103                 struct lu_buf            linkea_buf;
2104
2105                 dto = lo->ldo_stripe[i];
2106                 dt_write_lock(env, dto, MOR_TGT_CHILD);
2107                 rc = dt_create(env, dto, attr, NULL, dof, th);
2108                 dt_write_unlock(env, dto);
2109                 if (rc != 0)
2110                         RETURN(rc);
2111
2112                 rec->rec_fid = lu_object_fid(&dto->do_lu);
2113                 rc = dt_insert(env, dto, (const struct dt_rec *)rec,
2114                                (const struct dt_key *)dot, th, capa, 0);
2115                 if (rc != 0)
2116                         RETURN(rc);
2117
2118                 rec->rec_fid = lu_object_fid(&dt->do_lu);
2119                 rc = dt_insert(env, dto, (struct dt_rec *)rec,
2120                                (const struct dt_key *)dotdot, th, capa, 0);
2121                 if (rc != 0)
2122                         RETURN(rc);
2123
2124                 if (lo->ldo_striping_cached &&
2125                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2126                                          lo->ldo_def_stripenr,
2127                                          lo->ldo_def_stripe_offset)) {
2128                         struct lov_user_md_v3   *v3;
2129
2130                         /* sigh, lti_ea_store has been used for lmv_buf,
2131                          * so we have to allocate buffer for default
2132                          * stripe EA */
2133                         OBD_ALLOC_PTR(v3);
2134                         if (v3 == NULL)
2135                                 GOTO(out, rc);
2136
2137                         memset(v3, 0, sizeof(*v3));
2138                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2139                         v3->lmm_stripe_count =
2140                                 cpu_to_le16(lo->ldo_def_stripenr);
2141                         v3->lmm_stripe_offset =
2142                                 cpu_to_le16(lo->ldo_def_stripe_offset);
2143                         v3->lmm_stripe_size =
2144                                 cpu_to_le32(lo->ldo_def_stripe_size);
2145                         if (lo->ldo_pool != NULL)
2146                                 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
2147                                         sizeof(v3->lmm_pool_name));
2148
2149                         info->lti_buf.lb_buf = v3;
2150                         info->lti_buf.lb_len = sizeof(*v3);
2151                         rc = dt_xattr_set(env, dto, &info->lti_buf,
2152                                           XATTR_NAME_LOV, 0, th, capa);
2153                         OBD_FREE_PTR(v3);
2154                         if (rc != 0)
2155                                 GOTO(out, rc);
2156                 }
2157
2158                 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
2159                 rc = dt_xattr_set(env, dto, &slave_lmv_buf, XATTR_NAME_LMV,
2160                                   fl, th, capa);
2161                 if (rc != 0)
2162                         GOTO(out, rc);
2163
2164                 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2165                          PFID(lu_object_fid(&dto->do_lu)), i);
2166
2167                 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
2168                 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
2169                 if (rc != 0)
2170                         GOTO(out, rc);
2171
2172                 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
2173                 if (rc != 0)
2174                         GOTO(out, rc);
2175
2176                 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
2177                 linkea_buf.lb_len = ldata.ld_leh->leh_len;
2178                 rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK,
2179                                   0, th, BYPASS_CAPA);
2180                 if (rc != 0)
2181                         GOTO(out, rc);
2182
2183                 rec->rec_fid = lu_object_fid(&dto->do_lu);
2184                 rc = dt_insert(env, dt_object_child(dt),
2185                                (const struct dt_rec *)rec,
2186                                (const struct dt_key *)stripe_name, th, capa, 0);
2187                 if (rc != 0)
2188                         GOTO(out, rc);
2189
2190                 rc = dt_ref_add(env, dt_object_child(dt), th);
2191                 if (rc != 0)
2192                         GOTO(out, rc);
2193         }
2194
2195         rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
2196                           fl, th, capa);
2197
2198 out:
2199         if (slave_lmm != NULL)
2200                 OBD_FREE_PTR(slave_lmm);
2201
2202         RETURN(rc);
2203 }
2204
2205 int lod_dir_striping_create_internal(const struct lu_env *env,
2206                                      struct dt_object *dt,
2207                                      struct lu_attr *attr,
2208                                      struct dt_object_format *dof,
2209                                      struct thandle *th,
2210                                      bool declare)
2211 {
2212         struct lod_thread_info  *info = lod_env_info(env);
2213         struct lod_object       *lo = lod_dt_obj(dt);
2214         int                     rc;
2215         ENTRY;
2216
2217         if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr,
2218                                  lo->ldo_dir_stripe_offset)) {
2219                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2220                 int stripe_count = lo->ldo_stripenr;
2221
2222                 if (info->lti_ea_store_size < sizeof(*v1)) {
2223                         rc = lod_ea_store_resize(info, sizeof(*v1));
2224                         if (rc != 0)
2225                                 RETURN(rc);
2226                         v1 = info->lti_ea_store;
2227                 }
2228
2229                 memset(v1, 0, sizeof(*v1));
2230                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2231                 v1->lum_stripe_count = cpu_to_le32(stripe_count);
2232                 v1->lum_stripe_offset =
2233                                 cpu_to_le32(lo->ldo_dir_stripe_offset);
2234
2235                 info->lti_buf.lb_buf = v1;
2236                 info->lti_buf.lb_len = sizeof(*v1);
2237
2238                 if (declare)
2239                         rc = lod_declare_xattr_set_lmv(env, dt, attr,
2240                                                        &info->lti_buf, dof, th);
2241                 else
2242                         rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
2243                                                XATTR_NAME_LMV, 0, th,
2244                                                BYPASS_CAPA);
2245                 if (rc != 0)
2246                         RETURN(rc);
2247         }
2248
2249         /* Transfer default LMV striping from the parent */
2250         if (lo->ldo_dir_striping_cached &&
2251             !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
2252                                  lo->ldo_dir_def_stripe_offset)) {
2253                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2254                 int def_stripe_count = lo->ldo_dir_def_stripenr;
2255
2256                 if (info->lti_ea_store_size < sizeof(*v1)) {
2257                         rc = lod_ea_store_resize(info, sizeof(*v1));
2258                         if (rc != 0)
2259                                 RETURN(rc);
2260                         v1 = info->lti_ea_store;
2261                 }
2262
2263                 memset(v1, 0, sizeof(*v1));
2264                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2265                 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
2266                 v1->lum_stripe_offset =
2267                                 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
2268                 v1->lum_hash_type =
2269                                 cpu_to_le32(lo->ldo_dir_def_hash_type);
2270
2271                 info->lti_buf.lb_buf = v1;
2272                 info->lti_buf.lb_len = sizeof(*v1);
2273                 if (declare)
2274                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2275                                                        XATTR_NAME_DEFAULT_LMV,
2276                                                        0, th);
2277                 else
2278                         rc = lod_xattr_set_default_lmv_on_dir(env, dt,
2279                                                   &info->lti_buf,
2280                                                   XATTR_NAME_DEFAULT_LMV, 0,
2281                                                   th, BYPASS_CAPA);
2282                 if (rc != 0)
2283                         RETURN(rc);
2284         }
2285
2286         /* Transfer default LOV striping from the parent */
2287         if (lo->ldo_striping_cached &&
2288             !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2289                                  lo->ldo_def_stripenr,
2290                                  lo->ldo_def_stripe_offset)) {
2291                 struct lov_user_md_v3 *v3 = info->lti_ea_store;
2292
2293                 if (info->lti_ea_store_size < sizeof(*v3)) {
2294                         rc = lod_ea_store_resize(info, sizeof(*v3));
2295                         if (rc != 0)
2296                                 RETURN(rc);
2297                         v3 = info->lti_ea_store;
2298                 }
2299
2300                 memset(v3, 0, sizeof(*v3));
2301                 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2302                 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
2303                 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
2304                 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
2305                 if (lo->ldo_pool != NULL)
2306                         strlcpy(v3->lmm_pool_name, lo->ldo_pool,
2307                                 sizeof(v3->lmm_pool_name));
2308
2309                 info->lti_buf.lb_buf = v3;
2310                 info->lti_buf.lb_len = sizeof(*v3);
2311
2312                 if (declare)
2313                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2314                                                        XATTR_NAME_LOV, 0, th);
2315                 else
2316                         rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
2317                                                       XATTR_NAME_LOV, 0, th,
2318                                                       BYPASS_CAPA);
2319                 if (rc != 0)
2320                         RETURN(rc);
2321         }
2322
2323         RETURN(0);
2324 }
2325
/**
 * Declare phase of directory striping creation.
 *
 * Forwards to lod_dir_striping_create_internal() with the "declare" flag
 * set, so only the declare variants of the xattr operations are issued.
 *
 * \param[in] env	execution environment
 * \param[in] dt	directory object
 * \param[in] attr	attributes passed through to the internal helper
 * \param[in] dof	object format passed through to the internal helper
 * \param[in] th	transaction handle
 *
 * \retval		whatever lod_dir_striping_create_internal() returns
 */
static int lod_declare_dir_striping_create(const struct lu_env *env,
					   struct dt_object *dt,
					   struct lu_attr *attr,
					   struct dt_object_format *dof,
					   struct thandle *th)
{
	const bool declare = true;

	return lod_dir_striping_create_internal(env, dt, attr, dof, th,
						declare);
}
2334
/**
 * Execution phase of directory striping creation.
 *
 * Forwards to lod_dir_striping_create_internal() with the "declare" flag
 * cleared, so the xattrs are actually written within \a th.
 *
 * \param[in] env	execution environment
 * \param[in] dt	directory object
 * \param[in] attr	attributes passed through to the internal helper
 * \param[in] dof	object format passed through to the internal helper
 * \param[in] th	transaction handle
 *
 * \retval		whatever lod_dir_striping_create_internal() returns
 */
static int lod_dir_striping_create(const struct lu_env *env,
				   struct dt_object *dt,
				   struct lu_attr *attr,
				   struct dt_object_format *dof,
				   struct thandle *th)
{
	const bool declare = false;

	return lod_dir_striping_create_internal(env, dt, attr, dof, th,
						declare);
}
2343
2344 static int lod_xattr_set(const struct lu_env *env,
2345                          struct dt_object *dt, const struct lu_buf *buf,
2346                          const char *name, int fl, struct thandle *th,
2347                          struct lustre_capa *capa)
2348 {
2349         struct dt_object        *next = dt_object_child(dt);
2350         int                      rc;
2351         ENTRY;
2352
2353         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2354             strcmp(name, XATTR_NAME_LMV) == 0) {
2355                 struct lmv_mds_md_v1 *lmm = buf->lb_buf;
2356
2357                 if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
2358                                                 LMV_HASH_FLAG_MIGRATION)
2359                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2360                 else
2361                         rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
2362
2363                 RETURN(rc);
2364         }
2365
2366         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2367             strcmp(name, XATTR_NAME_LOV) == 0) {
2368                 /* default LOVEA */
2369                 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
2370                 RETURN(rc);
2371         } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2372                    strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
2373                 /* default LMVEA */
2374                 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
2375                                                       th, capa);
2376                 RETURN(rc);
2377         } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2378                    !strcmp(name, XATTR_NAME_LOV)) {
2379                 /* in case of lov EA swap, just set it
2380                  * if not, it is a replay so check striping match what we
2381                  * already have during req replay, declare_xattr_set()
2382                  * defines striping, then create() does the work
2383                 */
2384                 if (fl & LU_XATTR_REPLACE) {
2385                         /* free stripes, then update disk */
2386                         lod_object_free_striping(env, lod_dt_obj(dt));
2387                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2388                 } else {
2389                         rc = lod_striping_create(env, dt, NULL, NULL, th);
2390                 }
2391                 RETURN(rc);
2392         }
2393
2394         /* then all other xattr */
2395         rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
2396
2397         RETURN(rc);
2398 }
2399
/**
 * Declare removal of an extended attribute.
 *
 * The declaration is passed unchanged to the child object of \a dt.
 *
 * \param[in] env	execution environment
 * \param[in] dt	object owning the xattr
 * \param[in] name	xattr name
 * \param[in] th	transaction handle
 *
 * \retval		result of dt_declare_xattr_del() on the child object
 */
static int lod_declare_xattr_del(const struct lu_env *env,
				 struct dt_object *dt, const char *name,
				 struct thandle *th)
{
	struct dt_object *next = dt_object_child(dt);

	return dt_declare_xattr_del(env, next, name, th);
}
2406
2407 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
2408                          const char *name, struct thandle *th,
2409                          struct lustre_capa *capa)
2410 {
2411         if (!strcmp(name, XATTR_NAME_LOV))
2412                 lod_object_free_striping(env, lod_dt_obj(dt));
2413         return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
2414 }
2415
/**
 * List extended attributes.
 *
 * The request is passed unchanged to the child object of \a dt.
 *
 * \param[in] env	execution environment
 * \param[in] dt	object to list xattrs of
 * \param[out] buf	buffer handed to the lower layer
 * \param[in] capa	capability passed to the lower layer
 *
 * \retval		result of dt_xattr_list() on the child object
 */
static int lod_xattr_list(const struct lu_env *env,
			  struct dt_object *dt, struct lu_buf *buf,
			  struct lustre_capa *capa)
{
	struct dt_object *next = dt_object_child(dt);

	return dt_xattr_list(env, next, buf, capa);
}
2422
2423 int lod_object_set_pool(struct lod_object *o, char *pool)
2424 {
2425         int len;
2426
2427         if (o->ldo_pool) {
2428                 len = strlen(o->ldo_pool);
2429                 OBD_FREE(o->ldo_pool, len + 1);
2430                 o->ldo_pool = NULL;
2431         }
2432         if (pool) {
2433                 len = strlen(pool);
2434                 OBD_ALLOC(o->ldo_pool, len + 1);
2435                 if (o->ldo_pool == NULL)
2436                         return -ENOMEM;
2437                 strcpy(o->ldo_pool, pool);
2438         }
2439         return 0;
2440 }
2441
2442 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
2443 {
2444         return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
2445 }
2446
2447
2448 static int lod_cache_parent_lov_striping(const struct lu_env *env,
2449                                          struct lod_object *lp)
2450 {
2451         struct lod_thread_info  *info = lod_env_info(env);
2452         struct lov_user_md_v1   *v1 = NULL;
2453         struct lov_user_md_v3   *v3 = NULL;
2454         int                      rc;
2455         ENTRY;
2456
2457         /* called from MDD without parent being write locked,
2458          * lock it here */
2459         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2460         rc = lod_get_lov_ea(env, lp);
2461         if (rc < 0)
2462                 GOTO(unlock, rc);
2463
2464         if (rc < (typeof(rc))sizeof(struct lov_user_md)) {
2465                 /* don't lookup for non-existing or invalid striping */
2466                 lp->ldo_def_striping_set = 0;
2467                 lp->ldo_striping_cached = 1;
2468                 lp->ldo_def_stripe_size = 0;
2469                 lp->ldo_def_stripenr = 0;
2470                 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
2471                 GOTO(unlock, rc = 0);
2472         }
2473
2474         rc = 0;
2475         v1 = info->lti_ea_store;
2476         if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) {
2477                 lustre_swab_lov_user_md_v1(v1);
2478         } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
2479                 v3 = (struct lov_user_md_v3 *)v1;
2480                 lustre_swab_lov_user_md_v3(v3);
2481         }
2482
2483         if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
2484                 GOTO(unlock, rc = 0);
2485
2486         if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
2487                 GOTO(unlock, rc = 0);
2488
2489         CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
2490                PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
2491                (int)v1->lmm_stripe_count,
2492                (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
2493
2494         lp->ldo_def_stripenr = v1->lmm_stripe_count;
2495         lp->ldo_def_stripe_size = v1->lmm_stripe_size;
2496         lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
2497         lp->ldo_striping_cached = 1;
2498         lp->ldo_def_striping_set = 1;
2499         if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
2500                 /* XXX: sanity check here */
2501                 v3 = (struct lov_user_md_v3 *) v1;
2502                 if (v3->lmm_pool_name[0])
2503                         lod_object_set_pool(lp, v3->lmm_pool_name);
2504         }
2505         EXIT;
2506 unlock:
2507         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2508         return rc;
2509 }
2510
2511
2512 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
2513                                          struct lod_object *lp)
2514 {
2515         struct lod_thread_info  *info = lod_env_info(env);
2516         struct lmv_user_md_v1   *v1 = NULL;
2517         int                      rc;
2518         ENTRY;
2519
2520         /* called from MDD without parent being write locked,
2521          * lock it here */
2522         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2523         rc = lod_get_default_lmv_ea(env, lp);
2524         if (rc < 0)
2525                 GOTO(unlock, rc);
2526
2527         if (rc < (typeof(rc))sizeof(struct lmv_user_md)) {
2528                 /* don't lookup for non-existing or invalid striping */
2529                 lp->ldo_dir_def_striping_set = 0;
2530                 lp->ldo_dir_striping_cached = 1;
2531                 lp->ldo_dir_def_stripenr = 0;
2532                 lp->ldo_dir_def_stripe_offset =
2533                                         (typeof(v1->lum_stripe_offset))(-1);
2534                 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
2535                 GOTO(unlock, rc = 0);
2536         }
2537
2538         rc = 0;
2539         v1 = info->lti_ea_store;
2540
2541         lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
2542         lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
2543         lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
2544         lp->ldo_dir_def_striping_set = 1;
2545         lp->ldo_dir_striping_cached = 1;
2546
2547         EXIT;
2548 unlock:
2549         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2550         return rc;
2551 }
2552
2553 static int lod_cache_parent_striping(const struct lu_env *env,
2554                                      struct lod_object *lp,
2555                                      umode_t child_mode)
2556 {
2557         int rc = 0;
2558         ENTRY;
2559
2560         rc = lod_load_striping(env, lp);
2561         if (rc != 0)
2562                 RETURN(rc);
2563
2564         if (!lp->ldo_striping_cached) {
2565                 /* we haven't tried to get default striping for
2566                  * the directory yet, let's cache it in the object */
2567                 rc = lod_cache_parent_lov_striping(env, lp);
2568                 if (rc != 0)
2569                         RETURN(rc);
2570         }
2571
2572         if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
2573                 rc = lod_cache_parent_lmv_striping(env, lp);
2574
2575         RETURN(rc);
2576 }
2577
2578 /**
2579  * used to transfer default striping data to the object being created
2580  */
2581 static void lod_ah_init(const struct lu_env *env,
2582                         struct dt_allocation_hint *ah,
2583                         struct dt_object *parent,
2584                         struct dt_object *child,
2585                         umode_t child_mode)
2586 {
2587         struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
2588         struct dt_object  *nextp = NULL;
2589         struct dt_object  *nextc;
2590         struct lod_object *lp = NULL;
2591         struct lod_object *lc;
2592         struct lov_desc   *desc;
2593         int               rc;
2594         ENTRY;
2595
2596         LASSERT(child);
2597
2598         if (likely(parent)) {
2599                 nextp = dt_object_child(parent);
2600                 lp = lod_dt_obj(parent);
2601                 rc = lod_load_striping(env, lp);
2602                 if (rc != 0)
2603                         return;
2604         }
2605
2606         nextc = dt_object_child(child);
2607         lc = lod_dt_obj(child);
2608
2609         LASSERT(lc->ldo_stripenr == 0);
2610         LASSERT(lc->ldo_stripe == NULL);
2611
2612         /*
2613          * local object may want some hints
2614          * in case of late striping creation, ->ah_init()
2615          * can be called with local object existing
2616          */
2617         if (!dt_object_exists(nextc) || dt_object_remote(nextc))
2618                 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
2619                                           NULL : nextp, nextc, child_mode);
2620
2621         if (S_ISDIR(child_mode)) {
2622                 if (lc->ldo_dir_stripe == NULL) {
2623                         OBD_ALLOC_PTR(lc->ldo_dir_stripe);
2624                         if (lc->ldo_dir_stripe == NULL)
2625                                 return;
2626                 }
2627
2628                 if (lp->ldo_dir_stripe == NULL) {
2629                         OBD_ALLOC_PTR(lp->ldo_dir_stripe);
2630                         if (lp->ldo_dir_stripe == NULL)
2631                                 return;
2632                 }
2633
2634                 rc = lod_cache_parent_striping(env, lp, child_mode);
2635                 if (rc != 0)
2636                         return;
2637
2638                 /* transfer defaults to new directory */
2639                 if (lp->ldo_striping_cached) {
2640                         if (lp->ldo_pool)
2641                                 lod_object_set_pool(lc, lp->ldo_pool);
2642                         lc->ldo_def_stripenr = lp->ldo_def_stripenr;
2643                         lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
2644                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2645                         lc->ldo_striping_cached = 1;
2646                         lc->ldo_def_striping_set = 1;
2647                         CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
2648                                (int)lc->ldo_def_stripe_size,
2649                                (int)lc->ldo_def_stripe_offset,
2650                                (int)lc->ldo_def_stripenr);
2651                 }
2652
2653                 /* transfer dir defaults to new directory */
2654                 if (lp->ldo_dir_striping_cached) {
2655                         lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
2656                         lc->ldo_dir_def_stripe_offset =
2657                                                   lp->ldo_dir_def_stripe_offset;
2658                         lc->ldo_dir_def_hash_type =
2659                                                   lp->ldo_dir_def_hash_type;
2660                         lc->ldo_dir_striping_cached = 1;
2661                         lc->ldo_dir_def_striping_set = 1;
2662                         CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
2663                                (int)lc->ldo_dir_def_stripenr,
2664                                (int)lc->ldo_dir_def_stripe_offset,
2665                                lc->ldo_dir_def_hash_type);
2666                 }
2667
2668                 /* It should always honour the specified stripes */
2669                 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
2670                         const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
2671
2672                         rc = lod_verify_md_striping(d, lum1);
2673                         if (rc == 0 &&
2674                                 le32_to_cpu(lum1->lum_stripe_count) > 1) {
2675                                 /* Directory will be striped only if
2676                                  * stripe_count > 1 */
2677                                 lc->ldo_stripenr =
2678                                         le32_to_cpu(lum1->lum_stripe_count);
2679                                 lc->ldo_dir_stripe_offset =
2680                                         le32_to_cpu(lum1->lum_stripe_offset);
2681                                 lc->ldo_dir_hash_type =
2682                                         le32_to_cpu(lum1->lum_hash_type);
2683                                 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
2684                                        lc->ldo_stripenr,
2685                                        (int)lc->ldo_dir_stripe_offset);
2686                         }
2687                 /* then check whether there is default stripes from parent */
2688                 } else if (lp->ldo_dir_def_striping_set) {
2689                         /* If there are default dir stripe from parent */
2690                         lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
2691                         lc->ldo_dir_stripe_offset =
2692                                         lp->ldo_dir_def_stripe_offset;
2693                         lc->ldo_dir_hash_type =
2694                                         lp->ldo_dir_def_hash_type;
2695                         CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
2696                                lc->ldo_stripenr,
2697                                (int)lc->ldo_dir_stripe_offset);
2698                 } else {
2699                         /* set default stripe for this directory */
2700                         lc->ldo_stripenr = 0;
2701                         lc->ldo_dir_stripe_offset = -1;
2702                 }
2703
2704                 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
2705                        lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
2706
2707                 goto out;
2708         }
2709
2710         /*
2711          * if object is going to be striped over OSTs, transfer default
2712          * striping information to the child, so that we can use it
2713          * during declaration and creation
2714          */
2715         if (!lod_object_will_be_striped(S_ISREG(child_mode),
2716                                         lu_object_fid(&child->do_lu)))
2717                 goto out;
2718         /*
2719          * try from the parent
2720          */
2721         if (likely(parent)) {
2722                 lod_cache_parent_striping(env, lp, child_mode);
2723
2724                 lc->ldo_def_stripe_offset = (__u16) -1;
2725
2726                 if (lp->ldo_def_striping_set) {
2727                         if (lp->ldo_pool)
2728                                 lod_object_set_pool(lc, lp->ldo_pool);
2729                         lc->ldo_stripenr = lp->ldo_def_stripenr;
2730                         lc->ldo_stripe_size = lp->ldo_def_stripe_size;
2731                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2732                         CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
2733                                lc->ldo_stripenr, lc->ldo_stripe_size,
2734                                lp->ldo_pool ? lp->ldo_pool : "");
2735                 }
2736         }
2737
2738         /*
2739          * if the parent doesn't provide with specific pattern, grab fs-wide one
2740          */
2741         desc = &d->lod_desc;
2742         if (lc->ldo_stripenr == 0)
2743                 lc->ldo_stripenr = desc->ld_default_stripe_count;
2744         if (lc->ldo_stripe_size == 0)
2745                 lc->ldo_stripe_size = desc->ld_default_stripe_size;
2746         CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
2747                lc->ldo_stripenr, lc->ldo_stripe_size,
2748                lc->ldo_pool ? lc->ldo_pool : "");
2749
2750 out:
2751         /* we do not cache stripe information for slave stripe, see
2752          * lod_xattr_set_lov_on_dir */
2753         if (lp != NULL && lp->ldo_dir_slave_stripe)
2754                 lod_lov_stripe_cache_clear(lp);
2755
2756         EXIT;
2757 }
2758
2759 #define ll_do_div64(aaa,bbb)    do_div((aaa), (bbb))
2760 /*
2761  * this function handles a special case when truncate was done
2762  * on a stripeless object and now striping is being created
2763  * we can't lose that size, so we have to propagate it to newly
2764  * created object
2765  */
2766 static int lod_declare_init_size(const struct lu_env *env,
2767                                  struct dt_object *dt, struct thandle *th)
2768 {
2769         struct dt_object   *next = dt_object_child(dt);
2770         struct lod_object  *lo = lod_dt_obj(dt);
2771         struct lu_attr     *attr = &lod_env_info(env)->lti_attr;
2772         uint64_t            size, offs;
2773         int                 rc, stripe;
2774         ENTRY;
2775
2776         /* XXX: we support the simplest (RAID0) striping so far */
2777         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
2778         LASSERT(lo->ldo_stripe_size > 0);
2779
2780         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
2781         LASSERT(attr->la_valid & LA_SIZE);
2782         if (rc)
2783                 RETURN(rc);
2784
2785         size = attr->la_size;
2786         if (size == 0)
2787                 RETURN(0);
2788
2789         /* ll_do_div64(a, b) returns a % b, and a = a / b */
2790         ll_do_div64(size, (__u64) lo->ldo_stripe_size);
2791         stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
2792
2793         size = size * lo->ldo_stripe_size;
2794         offs = attr->la_size;
2795         size += ll_do_div64(offs, lo->ldo_stripe_size);
2796
2797         attr->la_valid = LA_SIZE;
2798         attr->la_size = size;
2799
2800         rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
2801
2802         RETURN(rc);
2803 }
2804
2805 /**
2806  * Create declaration of striped object
2807  */
2808 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
2809                                struct lu_attr *attr,
2810                                const struct lu_buf *lovea, struct thandle *th)
2811 {
2812         struct lod_thread_info  *info = lod_env_info(env);
2813         struct dt_object        *next = dt_object_child(dt);
2814         struct lod_object       *lo = lod_dt_obj(dt);
2815         int                      rc;
2816         ENTRY;
2817
2818         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
2819                 /* failed to create striping, let's reset
2820                  * config so that others don't get confused */
2821                 lod_object_free_striping(env, lo);
2822                 GOTO(out, rc = -ENOMEM);
2823         }
2824
2825         if (!dt_object_remote(next)) {
2826                 /* choose OST and generate appropriate objects */
2827                 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
2828                 if (rc) {
2829                         /* failed to create striping, let's reset
2830                          * config so that others don't get confused */
2831                         lod_object_free_striping(env, lo);
2832                         GOTO(out, rc);
2833                 }
2834
2835                 /*
2836                  * declare storage for striping data
2837                  */
2838                 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
2839                                 lo->ldo_pool ?  LOV_MAGIC_V3 : LOV_MAGIC_V1);
2840         } else {
2841                 /* LOD can not choose OST objects for remote objects, i.e.
2842                  * stripes must be ready before that. Right now, it can only
2843                  * happen during migrate, i.e. migrate process needs to create
2844                  * remote regular file (mdd_migrate_create), then the migrate
2845                  * process will provide stripeEA. */
2846                 LASSERT(lovea != NULL);
2847                 info->lti_buf = *lovea;
2848         }
2849
2850         rc = dt_declare_xattr_set(env, next, &info->lti_buf,
2851                                   XATTR_NAME_LOV, 0, th);
2852         if (rc)
2853                 GOTO(out, rc);
2854
2855         /*
2856          * if striping is created with local object's size > 0,
2857          * we have to propagate this size to specific object
2858          * the case is possible only when local object was created previously
2859          */
2860         if (dt_object_exists(next))
2861                 rc = lod_declare_init_size(env, dt, th);
2862
2863 out:
2864         RETURN(rc);
2865 }
2866
2867 static int lod_declare_object_create(const struct lu_env *env,
2868                                      struct dt_object *dt,
2869                                      struct lu_attr *attr,
2870                                      struct dt_allocation_hint *hint,
2871                                      struct dt_object_format *dof,
2872                                      struct thandle *th)
2873 {
2874         struct dt_object   *next = dt_object_child(dt);
2875         struct lod_object  *lo = lod_dt_obj(dt);
2876         int                 rc;
2877         ENTRY;
2878
2879         LASSERT(dof);
2880         LASSERT(attr);
2881         LASSERT(th);
2882
2883         /*
2884          * first of all, we declare creation of local object
2885          */
2886         rc = dt_declare_create(env, next, attr, hint, dof, th);
2887         if (rc)
2888                 GOTO(out, rc);
2889
2890         if (dof->dof_type == DFT_SYM)
2891                 dt->do_body_ops = &lod_body_lnk_ops;
2892
2893         /*
2894          * it's lod_ah_init() who has decided the object will striped
2895          */
2896         if (dof->dof_type == DFT_REGULAR) {
2897                 /* callers don't want stripes */
2898                 /* XXX: all tricky interactions with ->ah_make_hint() decided
2899                  * to use striping, then ->declare_create() behaving differently
2900                  * should be cleaned */
2901                 if (dof->u.dof_reg.striped == 0)
2902                         lo->ldo_stripenr = 0;
2903                 if (lo->ldo_stripenr > 0)
2904                         rc = lod_declare_striped_object(env, dt, attr,
2905                                                         NULL, th);
2906         } else if (dof->dof_type == DFT_DIR) {
2907                 /* Orphan object (like migrating object) does not have
2908                  * lod_dir_stripe, see lod_ah_init */
2909                 if (lo->ldo_dir_stripe != NULL)
2910                         rc = lod_declare_dir_striping_create(env, dt, attr,
2911                                                              dof, th);
2912         }
2913 out:
2914         RETURN(rc);
2915 }
2916
2917 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
2918                         struct lu_attr *attr, struct dt_object_format *dof,
2919                         struct thandle *th)
2920 {
2921         struct lod_object *lo = lod_dt_obj(dt);
2922         int                rc = 0, i;
2923         ENTRY;
2924
2925         LASSERT(lo->ldo_striping_cached == 0);
2926
2927         /* create all underlying objects */
2928         for (i = 0; i < lo->ldo_stripenr; i++) {
2929                 LASSERT(lo->ldo_stripe[i]);
2930                 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
2931
2932                 if (rc)
2933                         break;
2934         }
2935         if (rc == 0)
2936                 rc = lod_generate_and_set_lovea(env, lo, th);
2937
2938         RETURN(rc);
2939 }
2940
2941 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
2942                              struct lu_attr *attr,
2943                              struct dt_allocation_hint *hint,
2944                              struct dt_object_format *dof, struct thandle *th)
2945 {
2946         struct dt_object   *next = dt_object_child(dt);
2947         struct lod_object  *lo = lod_dt_obj(dt);
2948         int                 rc;
2949         ENTRY;
2950
2951         /* create local object */
2952         rc = dt_create(env, next, attr, hint, dof, th);
2953         if (rc != 0)
2954                 RETURN(rc);
2955
2956         if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2957             lo->ldo_stripe && dof->u.dof_reg.striped != 0)
2958                 rc = lod_striping_create(env, dt, attr, dof, th);
2959
2960         RETURN(rc);
2961 }
2962
2963 static int lod_declare_object_destroy(const struct lu_env *env,
2964                                       struct dt_object *dt,
2965                                       struct thandle *th)
2966 {
2967         struct dt_object   *next = dt_object_child(dt);
2968         struct lod_object  *lo = lod_dt_obj(dt);
2969         struct lod_thread_info *info = lod_env_info(env);
2970         char               *stripe_name = info->lti_key;
2971         int                 rc, i;
2972         ENTRY;
2973
2974         /*
2975          * load striping information, notice we don't do this when object
2976          * is being initialized as we don't need this information till
2977          * few specific cases like destroy, chown
2978          */
2979         rc = lod_load_striping(env, lo);
2980         if (rc)
2981                 RETURN(rc);
2982
2983         /* declare destroy for all underlying objects */
2984         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2985                 rc = next->do_ops->do_index_try(env, next,
2986                                                 &dt_directory_features);
2987                 if (rc != 0)
2988                         RETURN(rc);
2989
2990                 for (i = 0; i < lo->ldo_stripenr; i++) {
2991                         rc = dt_declare_ref_del(env, next, th);
2992                         if (rc != 0)
2993                                 RETURN(rc);
2994                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2995                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2996                                 i);
2997                         rc = dt_declare_delete(env, next,
2998                                         (const struct dt_key *)stripe_name, th);
2999                         if (rc != 0)
3000                                 RETURN(rc);
3001                 }
3002         }
3003         /*
3004          * we declare destroy for the local object
3005          */
3006         rc = dt_declare_destroy(env, next, th);
3007         if (rc)
3008                 RETURN(rc);
3009
3010         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
3011                 RETURN(0);
3012
3013         /* declare destroy all striped objects */
3014         for (i = 0; i < lo->ldo_stripenr; i++) {
3015                 if (likely(lo->ldo_stripe[i] != NULL)) {
3016                         rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
3017                         if (rc != 0)
3018                                 break;
3019                 }
3020         }
3021
3022         RETURN(rc);
3023 }
3024
3025 static int lod_object_destroy(const struct lu_env *env,
3026                 struct dt_object *dt, struct thandle *th)
3027 {
3028         struct dt_object  *next = dt_object_child(dt);
3029         struct lod_object *lo = lod_dt_obj(dt);
3030         struct lod_thread_info *info = lod_env_info(env);
3031         char               *stripe_name = info->lti_key;
3032         unsigned int       i;
3033         int                rc;
3034         ENTRY;
3035
3036         /* destroy sub-stripe of master object */
3037         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
3038                 rc = next->do_ops->do_index_try(env, next,
3039                                                 &dt_directory_features);
3040                 if (rc != 0)
3041                         RETURN(rc);
3042
3043                 for (i = 0; i < lo->ldo_stripenr; i++) {
3044                         rc = dt_ref_del(env, next, th);
3045                         if (rc != 0)
3046                                 RETURN(rc);
3047
3048                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
3049                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
3050                                 i);
3051
3052                         CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
3053                                PFID(lu_object_fid(&dt->do_lu)), stripe_name,
3054                                PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
3055
3056                         rc = dt_delete(env, next,
3057                                        (const struct dt_key *)stripe_name,
3058                                        th, BYPASS_CAPA);
3059                         if (rc != 0)
3060                                 RETURN(rc);
3061                 }
3062         }
3063         rc = dt_destroy(env, next, th);
3064         if (rc != 0)
3065                 RETURN(rc);
3066
3067         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
3068                 RETURN(0);
3069
3070         /* destroy all striped objects */
3071         for (i = 0; i < lo->ldo_stripenr; i++) {
3072                 if (likely(lo->ldo_stripe[i] != NULL) &&
3073                     (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
3074                      i == cfs_fail_val)) {
3075                         rc = dt_destroy(env, lo->ldo_stripe[i], th);
3076                         if (rc != 0)
3077                                 break;
3078                 }
3079         }
3080
3081         RETURN(rc);
3082 }
3083
/**
 * Declare a reference increment by forwarding the request to the object
 * returned by dt_object_child().
 *
 * \param[in] env       execution environment
 * \param[in] dt        object
 * \param[in] th        transaction handle
 *
 * \retval              whatever dt_declare_ref_add() returns for the child
 */
static int lod_declare_ref_add(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_declare_ref_add(env, child, th);
}
3089
/**
 * Increment the reference count by forwarding the request to the object
 * returned by dt_object_child().
 *
 * \param[in] env       execution environment
 * \param[in] dt        object
 * \param[in] th        transaction handle
 *
 * \retval              whatever dt_ref_add() returns for the child
 */
static int lod_ref_add(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_ref_add(env, child, th);
}
3095
/**
 * Declare a reference decrement by forwarding the request to the object
 * returned by dt_object_child().
 *
 * \param[in] env       execution environment
 * \param[in] dt        object
 * \param[in] th        transaction handle
 *
 * \retval              whatever dt_declare_ref_del() returns for the child
 */
static int lod_declare_ref_del(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_declare_ref_del(env, child, th);
}
3101
/**
 * Decrement the reference count by forwarding the request to the object
 * returned by dt_object_child().
 *
 * \param[in] env       execution environment
 * \param[in] dt        object
 * \param[in] th        transaction handle
 *
 * \retval              whatever dt_ref_del() returns for the child
 */
static int lod_ref_del(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_ref_del(env, child, th);
}
3107
3108 static struct obd_capa *lod_capa_get(const struct lu_env *env,
3109                                      struct dt_object *dt,
3110                                      struct lustre_capa *old, __u64 opc)
3111 {
3112         return dt_capa_get(env, dt_object_child(dt), old, opc);
3113 }
3114
3115 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
3116                            __u64 start, __u64 end)
3117 {
3118         return dt_object_sync(env, dt_object_child(dt), start, end);
3119 }
3120
3121 struct lod_slave_locks  {
3122         int                     lsl_lock_count;
3123         struct lustre_handle    lsl_handle[0];
3124 };
3125
3126 static int lod_object_unlock_internal(const struct lu_env *env,
3127                                       struct dt_object *dt,
3128                                       struct ldlm_enqueue_info *einfo,
3129                                       ldlm_policy_data_t *policy)
3130 {
3131         struct lod_object       *lo = lod_dt_obj(dt);
3132         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
3133         int                     rc = 0;
3134         int                     i;
3135         ENTRY;
3136
3137         if (slave_locks == NULL)
3138                 RETURN(0);
3139
3140         for (i = 1; i < slave_locks->lsl_lock_count; i++) {
3141                 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
3142                         int     rc1;
3143
3144                         einfo->ei_cbdata = &slave_locks->lsl_handle[i];
3145                         rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
3146                                                policy);
3147                         if (rc1 < 0)
3148                                 rc = rc == 0 ? rc1 : rc;
3149                 }
3150         }
3151
3152         RETURN(rc);
3153 }
3154
3155 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
3156                              struct ldlm_enqueue_info *einfo,
3157                              union ldlm_policy_data *policy)
3158 {
3159         struct lod_object       *lo = lod_dt_obj(dt);
3160         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
3161         int                     slave_locks_size;
3162         int                     rc;
3163         ENTRY;
3164
3165         if (slave_locks == NULL)
3166                 RETURN(0);
3167
3168         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
3169                 RETURN(-ENOTDIR);
3170
3171         rc = lod_load_striping(env, lo);
3172         if (rc != 0)
3173                 RETURN(rc);
3174
3175         /* Note: for remote lock for single stripe dir, MDT will cancel
3176          * the lock by lockh directly */
3177         if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
3178                 RETURN(0);
3179
3180         /* Only cancel slave lock for striped dir */
3181         rc = lod_object_unlock_internal(env, dt, einfo, policy);
3182
3183         slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
3184                            sizeof(slave_locks->lsl_handle[0]);
3185         OBD_FREE(slave_locks, slave_locks_size);
3186         einfo->ei_cbdata = NULL;
3187
3188         RETURN(rc);
3189 }
3190
3191 static int lod_object_lock(const struct lu_env *env,
3192                            struct dt_object *dt,
3193                            struct lustre_handle *lh,
3194                            struct ldlm_enqueue_info *einfo,
3195                            union ldlm_policy_data *policy)
3196 {
3197         struct lod_object       *lo = lod_dt_obj(dt);
3198         int                     rc = 0;
3199         int                     i;
3200         int                     slave_locks_size;
3201         struct lod_slave_locks  *slave_locks = NULL;
3202         ENTRY;
3203
3204         /* remote object lock */
3205         if (!einfo->ei_enq_slave) {
3206                 LASSERT(dt_object_remote(dt));
3207                 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
3208                                       policy);
3209         }
3210
3211         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
3212                 RETURN(-ENOTDIR);
3213
3214         rc = lod_load_striping(env, lo);
3215         if (rc != 0)
3216                 RETURN(rc);
3217
3218         /* No stripes */
3219         if (lo->ldo_stripenr <= 1)
3220                 RETURN(0);
3221
3222         slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
3223                            sizeof(slave_locks->lsl_handle[0]);
3224         /* Freed in lod_object_unlock */
3225         OBD_ALLOC(slave_locks, slave_locks_size);
3226         if (slave_locks == NULL)
3227                 RETURN(-ENOMEM);
3228         slave_locks->lsl_lock_count = lo->ldo_stripenr;
3229
3230         /* striped directory lock */
3231         for (i = 1; i < lo->ldo_stripenr; i++) {
3232                 struct lustre_handle    lockh;
3233                 struct ldlm_res_id      *res_id;
3234
3235                 res_id = &lod_env_info(env)->lti_res_id;
3236                 fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
3237                                        res_id);
3238                 einfo->ei_res_id = res_id;
3239
3240                 LASSERT(lo->ldo_stripe[i]);
3241                 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
3242                                     policy);
3243                 if (rc != 0)
3244                         GOTO(out, rc);
3245                 slave_locks->lsl_handle[i] = lockh;
3246         }
3247
3248         einfo->ei_cbdata = slave_locks;
3249
3250 out:
3251         if (rc != 0 && slave_locks != NULL) {
3252                 einfo->ei_cbdata = slave_locks;
3253                 lod_object_unlock_internal(env, dt, einfo, policy);
3254                 OBD_FREE(slave_locks, slave_locks_size);
3255                 einfo->ei_cbdata = NULL;
3256         }
3257
3258         RETURN(rc);
3259 }
3260
3261 struct dt_object_operations lod_obj_ops = {
3262         .do_read_lock           = lod_object_read_lock,
3263         .do_write_lock          = lod_object_write_lock,
3264         .do_read_unlock         = lod_object_read_unlock,
3265         .do_write_unlock        = lod_object_write_unlock,
3266         .do_write_locked        = lod_object_write_locked,
3267         .do_attr_get            = lod_attr_get,
3268         .do_declare_attr_set    = lod_declare_attr_set,
3269         .do_attr_set            = lod_attr_set,
3270         .do_xattr_get           = lod_xattr_get,
3271         .do_declare_xattr_set   = lod_declare_xattr_set,
3272         .do_xattr_set           = lod_xattr_set,
3273         .do_declare_xattr_del   = lod_declare_xattr_del,
3274         .do_xattr_del           = lod_xattr_del,
3275         .do_xattr_list          = lod_xattr_list,
3276         .do_ah_init             = lod_ah_init,
3277         .do_declare_create      = lod_declare_object_create,
3278         .do_create              = lod_object_create,
3279         .do_declare_destroy     = lod_declare_object_destroy,
3280         .do_destroy             = lod_object_destroy,
3281         .do_index_try           = lod_index_try,
3282         .do_declare_ref_add     = lod_declare_ref_add,
3283         .do_ref_add             = lod_ref_add,
3284         .do_declare_ref_del     = lod_declare_ref_del,
3285         .do_ref_del             = lod_ref_del,
3286         .do_capa_get            = lod_capa_get,
3287         .do_object_sync         = lod_object_sync,
3288         .do_object_lock         = lod_object_lock,
3289         .do_object_unlock       = lod_object_unlock,
3290 };
3291
3292 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
3293                         struct lu_buf *buf, loff_t *pos,
3294                         struct lustre_capa *capa)
3295 {
3296         struct dt_object *next = dt_object_child(dt);
3297         return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
3298 }
3299
3300 static ssize_t lod_declare_write(const struct lu_env *env,
3301                                  struct dt_object *dt,
3302                                  const struct lu_buf *buf, loff_t pos,
3303                                  struct thandle *th)
3304 {
3305         return dt_declare_record_write(env, dt_object_child(dt),
3306                                        buf, pos, th);
3307 }
3308
3309 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
3310                          const struct lu_buf *buf, loff_t *pos,
3311                          struct thandle *th, struct lustre_capa *capa, int iq)
3312 {
3313         struct dt_object *next = dt_object_child(dt);
3314         LASSERT(next);
3315         return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
3316 }
3317
3318 static const struct dt_body_operations lod_body_lnk_ops = {
3319         .dbo_read               = lod_read,
3320         .dbo_declare_write      = lod_declare_write,
3321         .dbo_write              = lod_write
3322 };
3323
3324 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
3325                            const struct lu_object_conf *conf)
3326 {
3327         struct lod_device       *lod    = lu2lod_dev(lo->lo_dev);
3328         struct lu_device        *cdev   = NULL;
3329         struct lu_object        *cobj;
3330         struct lod_tgt_descs    *ltd    = NULL;
3331         struct lod_tgt_desc     *tgt;
3332         u32                      idx    = 0;
3333         int                      type   = LU_SEQ_RANGE_ANY;
3334         int                      rc;
3335         ENTRY;
3336
3337         rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
3338         if (rc != 0)
3339                 RETURN(rc);
3340
3341         if (type == LU_SEQ_RANGE_MDT &&
3342             idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
3343                 cdev = &lod->lod_child->dd_lu_dev;
3344         } else if (type == LU_SEQ_RANGE_MDT) {
3345                 ltd = &lod->lod_mdt_descs;
3346                 lod_getref(ltd);
3347         } else if (type == LU_SEQ_RANGE_OST) {
3348                 ltd = &lod->lod_ost_descs;
3349                 lod_getref(ltd);
3350         } else {
3351                 LBUG();
3352         }
3353
3354         if (ltd != NULL) {
3355                 if (ltd->ltd_tgts_size > idx &&
3356                     cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
3357                         tgt = LTD_TGT(ltd, idx);
3358
3359                         LASSERT(tgt != NULL);
3360                         LASSERT(tgt->ltd_tgt != NULL);
3361
3362                         cdev = &(tgt->ltd_tgt->dd_lu_dev);
3363                 }
3364                 lod_putref(lod, ltd);
3365         }
3366
3367         if (unlikely(cdev == NULL))
3368                 RETURN(-ENOENT);
3369
3370         cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
3371         if (unlikely(cobj == NULL))
3372                 RETURN(-ENOMEM);
3373
3374         lu_object_add(lo, cobj);
3375
3376         RETURN(0);
3377 }
3378
3379 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
3380 {
3381         int i;
3382
3383         if (lo->ldo_dir_stripe != NULL) {
3384                 OBD_FREE_PTR(lo->ldo_dir_stripe);
3385                 lo->ldo_dir_stripe = NULL;
3386         }
3387
3388         if (lo->ldo_stripe) {
3389                 LASSERT(lo->ldo_stripes_allocated > 0);
3390
3391                 for (i = 0; i < lo->ldo_stripenr; i++) {
3392                         if (lo->ldo_stripe[i])
3393                                 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
3394                 }
3395
3396                 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
3397                 OBD_FREE(lo->ldo_stripe, i);
3398                 lo->ldo_stripe = NULL;
3399                 lo->ldo_stripes_allocated = 0;
3400         }
3401         lo->ldo_stripenr = 0;
3402         lo->ldo_pattern = 0;
3403 }
3404
3405 /*
3406  * ->start is called once all slices are initialized, including header's
3407  * cache for mode (object type). using the type we can initialize ops
3408  */
3409 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
3410 {
3411         if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
3412                 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
3413         return 0;
3414 }
3415
3416 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
3417 {
3418         struct lod_object *mo = lu2lod_obj(o);
3419
3420         /*
3421          * release all underlying object pinned
3422          */
3423
3424         lod_object_free_striping(env, mo);
3425
3426         lod_object_set_pool(mo, NULL);
3427
3428         lu_object_fini(o);
3429         OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
3430 }
3431
/**
 * Release callback of the LOD slice; intentionally does nothing.
 *
 * \param[in] env       execution environment
 * \param[in] o         object being released
 */
static void lod_object_release(const struct lu_env *env, struct lu_object *o)
{
        /*
         * XXX: shouldn't we release everything here in case the object
         * creation failed before?
         */
}
3437
3438 static int lod_object_print(const struct lu_env *env, void *cookie,
3439                             lu_printer_t p, const struct lu_object *l)
3440 {
3441         struct lod_object *o = lu2lod_obj((struct lu_object *) l);
3442
3443         return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
3444 }
3445
3446 struct lu_object_operations lod_lu_obj_ops = {
3447         .loo_object_init        = lod_object_init,
3448         .loo_object_start       = lod_object_start,
3449         .loo_object_free        = lod_object_free,
3450         .loo_object_release     = lod_object_release,
3451         .loo_object_print       = lod_object_print,
3452 };