Whamcloud - gitweb
fb43a8dd5d13d3cb2c8c4e98fed2de8a67bf6a7a
[fs/lustre-release.git] / lustre / lod / lod_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2013, Intel Corporation.
27  */
28 /*
29  * lustre/lod/lod_object.c
30  *
31  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
32  */
33
34 #define DEBUG_SUBSYSTEM S_MDS
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
41
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46 #include <md_object.h>
47 #include <lustre_linkea.h>
48
49 #include "lod_internal.h"
50
/* Name strings "." and ".."; not referenced in the visible part of this file,
 * presumably used for the self/parent directory entries -- TODO confirm. */
static const char dot[] = ".";
static const char dotdot[] = "..";

/* NOTE(review): presumably the slab cache lod objects are allocated from;
 * it is defined outside this file. */
extern struct kmem_cache *lod_object_kmem;
/* Forward declaration; the initialized definition is expected further down. */
static const struct dt_body_operations lod_body_lnk_ops;
56
57 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
58                             struct dt_rec *rec, const struct dt_key *key,
59                             struct lustre_capa *capa)
60 {
61         struct dt_object *next = dt_object_child(dt);
62         return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
63 }
64
/**
 * Implementation of dt_index_operations::dio_declare_insert
 *
 * Forwards the declaration to dt_declare_insert() on the child object.
 *
 * \retval      whatever dt_declare_insert() returns
 */
static int lod_declare_index_insert(const struct lu_env *env,
                                    struct dt_object *dt,
                                    const struct dt_rec *rec,
                                    const struct dt_key *key,
                                    struct thandle *handle)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_declare_insert(env, child, rec, key, handle);
}
73
/**
 * Implementation of dt_index_operations::dio_insert
 *
 * Forwards the insertion, with all arguments untouched, to dt_insert() on
 * the child object.
 *
 * \retval      whatever dt_insert() returns
 */
static int lod_index_insert(const struct lu_env *env,
                            struct dt_object *dt,
                            const struct dt_rec *rec,
                            const struct dt_key *key,
                            struct thandle *th,
                            struct lustre_capa *capa,
                            int ign)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_insert(env, child, rec, key, th, capa, ign);
}
84
/**
 * Implementation of dt_index_operations::dio_declare_delete
 *
 * Forwards the declaration to dt_declare_delete() on the child object.
 *
 * \retval      whatever dt_declare_delete() returns
 */
static int lod_declare_index_delete(const struct lu_env *env,
                                    struct dt_object *dt,
                                    const struct dt_key *key,
                                    struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_declare_delete(env, child, key, th);
}
92
/**
 * Implementation of dt_index_operations::dio_delete
 *
 * Forwards the deletion to dt_delete() on the child object.
 *
 * \retval      whatever dt_delete() returns
 */
static int lod_index_delete(const struct lu_env *env,
                            struct dt_object *dt,
                            const struct dt_key *key,
                            struct thandle *th,
                            struct lustre_capa *capa)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_delete(env, child, key, th, capa);
}
101
102 static struct dt_it *lod_it_init(const struct lu_env *env,
103                                  struct dt_object *dt, __u32 attr,
104                                  struct lustre_capa *capa)
105 {
106         struct dt_object        *next = dt_object_child(dt);
107         struct lod_it           *it = &lod_env_info(env)->lti_it;
108         struct dt_it            *it_next;
109
110
111         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
112         if (IS_ERR(it_next))
113                 return it_next;
114
115         /* currently we do not use more than one iterator per thread
116          * so we store it in thread info. if at some point we need
117          * more active iterators in a single thread, we can allocate
118          * additional ones */
119         LASSERT(it->lit_obj == NULL);
120
121         it->lit_it = it_next;
122         it->lit_obj = next;
123
124         return (struct dt_it *)it;
125 }
126
/*
 * Sanity check for a plain (non-striped) lod iterator: it must be bound to
 * an object and must carry a lower-layer iterator. The \a env argument is
 * accepted but not used by the expansion.
 */
#define LOD_CHECK_IT(env, iter)                                 \
do {                                                            \
        LASSERT((iter)->lit_obj != NULL);                       \
        LASSERT((iter)->lit_it != NULL);                        \
} while (0)
132
133 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
134 {
135         struct lod_it *it = (struct lod_it *)di;
136
137         LOD_CHECK_IT(env, it);
138         it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
139
140         /* the iterator not in use any more */
141         it->lit_obj = NULL;
142         it->lit_it = NULL;
143 }
144
145 int lod_it_get(const struct lu_env *env, struct dt_it *di,
146                const struct dt_key *key)
147 {
148         const struct lod_it *it = (const struct lod_it *)di;
149
150         LOD_CHECK_IT(env, it);
151         return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
152 }
153
154 void lod_it_put(const struct lu_env *env, struct dt_it *di)
155 {
156         struct lod_it *it = (struct lod_it *)di;
157
158         LOD_CHECK_IT(env, it);
159         return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
160 }
161
162 int lod_it_next(const struct lu_env *env, struct dt_it *di)
163 {
164         struct lod_it *it = (struct lod_it *)di;
165
166         LOD_CHECK_IT(env, it);
167         return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
168 }
169
170 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
171 {
172         const struct lod_it *it = (const struct lod_it *)di;
173
174         LOD_CHECK_IT(env, it);
175         return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
176 }
177
178 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
179 {
180         struct lod_it *it = (struct lod_it *)di;
181
182         LOD_CHECK_IT(env, it);
183         return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
184 }
185
186 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
187                struct dt_rec *rec, __u32 attr)
188 {
189         const struct lod_it *it = (const struct lod_it *)di;
190
191         LOD_CHECK_IT(env, it);
192         return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec,
193                                                      attr);
194 }
195
196 int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di,
197                     __u32 attr)
198 {
199         const struct lod_it *it = (const struct lod_it *)di;
200
201         LOD_CHECK_IT(env, it);
202         return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it,
203                                                           attr);
204 }
205
206 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
207 {
208         const struct lod_it *it = (const struct lod_it *)di;
209
210         LOD_CHECK_IT(env, it);
211         return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
212 }
213
214 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
215 {
216         const struct lod_it *it = (const struct lod_it *)di;
217
218         LOD_CHECK_IT(env, it);
219         return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
220 }
221
222 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
223                    void *key_rec)
224 {
225         const struct lod_it *it = (const struct lod_it *)di;
226
227         LOD_CHECK_IT(env, it);
228         return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it,
229                                                          key_rec);
230 }
231
232 static struct dt_index_operations lod_index_ops = {
233         .dio_lookup             = lod_index_lookup,
234         .dio_declare_insert     = lod_declare_index_insert,
235         .dio_insert             = lod_index_insert,
236         .dio_declare_delete     = lod_declare_index_delete,
237         .dio_delete             = lod_index_delete,
238         .dio_it = {
239                 .init           = lod_it_init,
240                 .fini           = lod_it_fini,
241                 .get            = lod_it_get,
242                 .put            = lod_it_put,
243                 .next           = lod_it_next,
244                 .key            = lod_it_key,
245                 .key_size       = lod_it_key_size,
246                 .rec            = lod_it_rec,
247                 .rec_size       = lod_it_rec_size,
248                 .store          = lod_it_store,
249                 .load           = lod_it_load,
250                 .key_rec        = lod_it_key_rec,
251         }
252 };
253
254 /**
255  * Implementation of dt_index_operations:: dio_it.init
256  *
257  * This function is to initialize the iterator for striped directory,
258  * basically these lod_striped_it_xxx will just locate the stripe
259  * and call the correspondent api of its next lower layer.
260  *
261  * \param[in] env       execution environment.
262  * \param[in] dt        the striped directory object to be iterated.
263  * \param[in] attr      the attribute of iterator, mostly used to indicate
264  *                      the entry attribute in the object to be iterated.
265  * \param[in] capa      capability(useless in current implementation)
266  *
267  * \retval      initialized iterator(dt_it) if successful initialize the
268  *              iteration. lit_stripe_index will be used to indicate the
269  *              current iterate position among stripes.
270  * \retval      ERR pointer if initialization is failed.
271  */
272 static struct dt_it *lod_striped_it_init(const struct lu_env *env,
273                                          struct dt_object *dt, __u32 attr,
274                                          struct lustre_capa *capa)
275 {
276         struct lod_object       *lo = lod_dt_obj(dt);
277         struct dt_object        *next;
278         struct lod_it           *it = &lod_env_info(env)->lti_it;
279         struct dt_it            *it_next;
280         ENTRY;
281
282         LASSERT(lo->ldo_stripenr > 0);
283         next = lo->ldo_stripe[0];
284         LASSERT(next != NULL);
285         LASSERT(next->do_index_ops != NULL);
286
287         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
288         if (IS_ERR(it_next))
289                 return it_next;
290
291         /* currently we do not use more than one iterator per thread
292          * so we store it in thread info. if at some point we need
293          * more active iterators in a single thread, we can allocate
294          * additional ones */
295         LASSERT(it->lit_obj == NULL);
296
297         it->lit_stripe_index = 0;
298         it->lit_attr = attr;
299         it->lit_it = it_next;
300         it->lit_obj = dt;
301
302         return (struct dt_it *)it;
303 }
304
/*
 * Sanity check for a striped-directory iterator: it must be bound to an
 * object, carry a lower-layer iterator, and its stripe index must lie
 * inside the stripe array of \a lobj. The \a env argument is accepted but
 * not used by the expansion.
 */
#define LOD_CHECK_STRIPED_IT(env, iter, lobj)                           \
do {                                                                    \
        LASSERT((iter)->lit_obj != NULL);                               \
        LASSERT((iter)->lit_it != NULL);                                \
        LASSERT((lobj)->ldo_stripenr > 0);                              \
        LASSERT((iter)->lit_stripe_index < (lobj)->ldo_stripenr);       \
} while (0)
312
313 /**
314  * Implementation of dt_index_operations:: dio_it.fini
315  *
316  * This function is to finish the iterator for striped directory.
317  *
318  * \param[in] env       execution environment.
319  * \param[in] di        the iterator for the striped directory
320  *
321  */
322 static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
323 {
324         struct lod_it           *it = (struct lod_it *)di;
325         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
326         struct dt_object        *next;
327
328         LOD_CHECK_STRIPED_IT(env, it, lo);
329
330         next = lo->ldo_stripe[it->lit_stripe_index];
331         LASSERT(next != NULL);
332         LASSERT(next->do_index_ops != NULL);
333
334         next->do_index_ops->dio_it.fini(env, it->lit_it);
335
336         /* the iterator not in use any more */
337         it->lit_obj = NULL;
338         it->lit_it = NULL;
339         it->lit_stripe_index = 0;
340 }
341
342 /**
343  * Implementation of dt_index_operations:: dio_it.get
344  *
345  * This function is to position the iterator with given key
346  *
347  * \param[in] env       execution environment.
348  * \param[in] di        the iterator for striped directory.
349  * \param[in] key       the key the iterator will be positioned.
350  *
351  * \retval      0 if successfully position iterator by the key.
352  * \retval      negative error if position is failed.
353  */
354 static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
355                               const struct dt_key *key)
356 {
357         const struct lod_it     *it = (const struct lod_it *)di;
358         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
359         struct dt_object        *next;
360         ENTRY;
361
362         LOD_CHECK_STRIPED_IT(env, it, lo);
363
364         next = lo->ldo_stripe[it->lit_stripe_index];
365         LASSERT(next != NULL);
366         LASSERT(next->do_index_ops != NULL);
367
368         return next->do_index_ops->dio_it.get(env, it->lit_it, key);
369 }
370
371 /**
372  * Implementation of dt_index_operations:: dio_it.put
373  *
374  * This function is supposed to be the pair of it_get, but currently do
375  * nothing. see (osd_it_ea_put or osd_index_it_put)
376  */
377 static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
378 {
379         struct lod_it           *it = (struct lod_it *)di;
380         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
381         struct dt_object        *next;
382
383         LOD_CHECK_STRIPED_IT(env, it, lo);
384
385         next = lo->ldo_stripe[it->lit_stripe_index];
386         LASSERT(next != NULL);
387         LASSERT(next->do_index_ops != NULL);
388
389         return next->do_index_ops->dio_it.put(env, it->lit_it);
390 }
391
392 /**
393  * Implementation of dt_index_operations:: dio_it.next
394  *
395  * This function is to position the iterator to the next entry, if current
396  * stripe is finished by checking the return value of next() in current
397  * stripe. it will go to next stripe. In the mean time, the sub-iterator
398  * for next stripe needs to be initialized.
399  *
400  * \param[in] env       execution environment.
401  * \param[in] di        the iterator for striped directory.
402  *
403  * \retval      0 if successfully position iterator to the next entry.
404  * \retval      negative error if position is failed.
405  */
406 static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
407 {
408         struct lod_it           *it = (struct lod_it *)di;
409         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
410         struct dt_object        *next;
411         struct dt_it            *it_next;
412         int                     rc;
413         ENTRY;
414
415         LOD_CHECK_STRIPED_IT(env, it, lo);
416
417         next = lo->ldo_stripe[it->lit_stripe_index];
418         LASSERT(next != NULL);
419         LASSERT(next->do_index_ops != NULL);
420 again:
421         rc = next->do_index_ops->dio_it.next(env, it->lit_it);
422         if (rc < 0)
423                 RETURN(rc);
424
425         if (rc == 0 && it->lit_stripe_index == 0)
426                 RETURN(rc);
427
428         if (rc == 0 && it->lit_stripe_index > 0) {
429                 struct lu_dirent *ent;
430
431                 ent = (struct lu_dirent *)lod_env_info(env)->lti_key;
432
433                 rc = next->do_index_ops->dio_it.rec(env, it->lit_it,
434                                                     (struct dt_rec *)ent,
435                                                     it->lit_attr);
436                 if (rc != 0)
437                         RETURN(rc);
438
439                 /* skip . and .. for slave stripe */
440                 if ((strncmp(ent->lde_name, ".",
441                              le16_to_cpu(ent->lde_namelen)) == 0 &&
442                      le16_to_cpu(ent->lde_namelen) == 1) ||
443                     (strncmp(ent->lde_name, "..",
444                              le16_to_cpu(ent->lde_namelen)) == 0 &&
445                      le16_to_cpu(ent->lde_namelen) == 2))
446                         goto again;
447
448                 RETURN(rc);
449         }
450
451         /* go to next stripe */
452         if (it->lit_stripe_index + 1 >= lo->ldo_stripenr)
453                 RETURN(1);
454
455         it->lit_stripe_index++;
456
457         next->do_index_ops->dio_it.put(env, it->lit_it);
458         next->do_index_ops->dio_it.fini(env, it->lit_it);
459
460         rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
461         if (rc != 0)
462                 RETURN(rc);
463
464         next = lo->ldo_stripe[it->lit_stripe_index];
465         LASSERT(next != NULL);
466         LASSERT(next->do_index_ops != NULL);
467
468         it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr,
469                                                   BYPASS_CAPA);
470         if (!IS_ERR(it_next)) {
471                 it->lit_it = it_next;
472                 goto again;
473         } else {
474                 rc = PTR_ERR(it_next);
475         }
476
477         RETURN(rc);
478 }
479
480 /**
481  * Implementation of dt_index_operations:: dio_it.key
482  *
483  * This function is to get the key of the iterator at current position.
484  *
485  * \param[in] env       execution environment.
486  * \param[in] di        the iterator for striped directory.
487  *
488  * \retval      key(dt_key) if successfully get the key.
489  * \retval      negative error if can not get the key.
490  */
491 static struct dt_key *lod_striped_it_key(const struct lu_env *env,
492                                          const struct dt_it *di)
493 {
494         const struct lod_it     *it = (const struct lod_it *)di;
495         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
496         struct dt_object        *next;
497
498         LOD_CHECK_STRIPED_IT(env, it, lo);
499
500         next = lo->ldo_stripe[it->lit_stripe_index];
501         LASSERT(next != NULL);
502         LASSERT(next->do_index_ops != NULL);
503
504         return next->do_index_ops->dio_it.key(env, it->lit_it);
505 }
506
507 /**
508  * Implementation of dt_index_operations:: dio_it.key_size
509  *
510  * This function is to get the key_size of current key.
511  *
512  * \param[in] env       execution environment.
513  * \param[in] di        the iterator for striped directory.
514  *
515  * \retval      key_size if successfully get the key_size.
516  * \retval      negative error if can not get the key_size.
517  */
518 static int lod_striped_it_key_size(const struct lu_env *env,
519                                    const struct dt_it *di)
520 {
521         struct lod_it           *it = (struct lod_it *)di;
522         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
523         struct dt_object        *next;
524
525         LOD_CHECK_STRIPED_IT(env, it, lo);
526
527         next = lo->ldo_stripe[it->lit_stripe_index];
528         LASSERT(next != NULL);
529         LASSERT(next->do_index_ops != NULL);
530
531         return next->do_index_ops->dio_it.key_size(env, it->lit_it);
532 }
533
534 /**
535  * Implementation of dt_index_operations:: dio_it.rec
536  *
537  * This function is to get the record at current position.
538  *
539  * \param[in] env       execution environment.
540  * \param[in] di        the iterator for striped directory.
541  * \param[in] attr      the attribute of iterator, mostly used to indicate
542  *                      the entry attribute in the object to be iterated.
543  * \param[out] rec      hold the return record.
544  *
545  * \retval      0 if successfully get the entry.
546  * \retval      negative error if can not get entry.
547  */
548 static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
549                               struct dt_rec *rec, __u32 attr)
550 {
551         const struct lod_it     *it = (const struct lod_it *)di;
552         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
553         struct dt_object        *next;
554
555         LOD_CHECK_STRIPED_IT(env, it, lo);
556
557         next = lo->ldo_stripe[it->lit_stripe_index];
558         LASSERT(next != NULL);
559         LASSERT(next->do_index_ops != NULL);
560
561         return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
562 }
563
564 /**
565  * Implementation of dt_index_operations:: dio_it.rec_size
566  *
567  * This function is to get the record_size at current record.
568  *
569  * \param[in] env       execution environment.
570  * \param[in] di        the iterator for striped directory.
571  * \param[in] attr      the attribute of iterator, mostly used to indicate
572  *                      the entry attribute in the object to be iterated.
573  *
574  * \retval      rec_size if successfully get the entry size.
575  * \retval      negative error if can not get entry size.
576  */
577 static int lod_striped_it_rec_size(const struct lu_env *env,
578                                    const struct dt_it *di, __u32 attr)
579 {
580         struct lod_it           *it = (struct lod_it *)di;
581         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
582         struct dt_object        *next;
583
584         LOD_CHECK_STRIPED_IT(env, it, lo);
585
586         next = lo->ldo_stripe[it->lit_stripe_index];
587         LASSERT(next != NULL);
588         LASSERT(next->do_index_ops != NULL);
589
590         return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
591 }
592
593 /**
594  * Implementation of dt_index_operations:: dio_it.store
595  *
596  * This function will a cookie for current position of the iterator head,
597  * so that user can use this cookie to load/start the iterator next time.
598  *
599  * \param[in] env       execution environment.
600  * \param[in] di        the iterator for striped directory.
601  *
602  * \retval      the cookie.
603  */
604 static __u64 lod_striped_it_store(const struct lu_env *env,
605                                   const struct dt_it *di)
606 {
607         const struct lod_it     *it = (const struct lod_it *)di;
608         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
609         struct dt_object        *next;
610
611         LOD_CHECK_STRIPED_IT(env, it, lo);
612
613         next = lo->ldo_stripe[it->lit_stripe_index];
614         LASSERT(next != NULL);
615         LASSERT(next->do_index_ops != NULL);
616
617         return next->do_index_ops->dio_it.store(env, it->lit_it);
618 }
619
620 /**
621  * Implementation of dt_index_operations:: dio_it.load
622  *
623  * This function will position the iterator with the given hash(usually
624  * get from store),
625  *
626  * \param[in] env       execution environment.
627  * \param[in] di        the iterator for striped directory.
628  * \param[in] hash      the given hash.
629  *
630  * \retval      >0 if successfuly load the iterator to the given position.
631  * \retval      <0 if load is failed.
632  */
633 static int lod_striped_it_load(const struct lu_env *env,
634                                const struct dt_it *di, __u64 hash)
635 {
636         const struct lod_it     *it = (const struct lod_it *)di;
637         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
638         struct dt_object        *next;
639
640         LOD_CHECK_STRIPED_IT(env, it, lo);
641
642         next = lo->ldo_stripe[it->lit_stripe_index];
643         LASSERT(next != NULL);
644         LASSERT(next->do_index_ops != NULL);
645
646         return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
647 }
648
649 static struct dt_index_operations lod_striped_index_ops = {
650         .dio_lookup             = lod_index_lookup,
651         .dio_declare_insert     = lod_declare_index_insert,
652         .dio_insert             = lod_index_insert,
653         .dio_declare_delete     = lod_declare_index_delete,
654         .dio_delete             = lod_index_delete,
655         .dio_it = {
656                 .init           = lod_striped_it_init,
657                 .fini           = lod_striped_it_fini,
658                 .get            = lod_striped_it_get,
659                 .put            = lod_striped_it_put,
660                 .next           = lod_striped_it_next,
661                 .key            = lod_striped_it_key,
662                 .key_size       = lod_striped_it_key_size,
663                 .rec            = lod_striped_it_rec,
664                 .rec_size       = lod_striped_it_rec_size,
665                 .store          = lod_striped_it_store,
666                 .load           = lod_striped_it_load,
667         }
668 };
669
670 /**
671  * Append the FID for each shard of the striped directory after the
672  * given LMV EA header.
673  *
674  * To simplify striped directory and the consistency verification,
675  * we only store the LMV EA header on disk, for both master object
676  * and slave objects. When someone wants to know the whole LMV EA,
 * such as client readdir(), we can build the entire LMV EA on the
678  * MDT side (in RAM) via iterating the sub-directory entries that
679  * are contained in the master object of the stripe directory.
680  *
 * For the master object of the striped directory, the valid name
682  * for each shard is composed of the ${shard_FID}:${shard_idx}.
683  *
684  * There may be holes in the LMV EA if some shards' name entries
685  * are corrupted or lost.
686  *
687  * \param[in] env       pointer to the thread context
688  * \param[in] lo        pointer to the master object of the striped directory
689  * \param[in] buf       pointer to the lu_buf which will hold the LMV EA
690  * \param[in] resize    whether re-allocate the buffer if it is not big enough
691  *
692  * \retval              positive size of the LMV EA
693  * \retval              0 for nothing to be loaded
694  * \retval              negative error number on failure
695  */
696 int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo,
697                         struct lu_buf *buf, bool resize)
698 {
699         struct lu_dirent        *ent    =
700                         (struct lu_dirent *)lod_env_info(env)->lti_key;
701         struct lod_device       *lod    = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
702         struct dt_object        *obj    = dt_object_child(&lo->ldo_obj);
703         struct lmv_mds_md_v1    *lmv1   = buf->lb_buf;
704         struct dt_it            *it;
705         const struct dt_it_ops  *iops;
706         __u32                    stripes;
707         __u32                    magic  = le32_to_cpu(lmv1->lmv_magic);
708         int                      size;
709         int                      rc;
710         ENTRY;
711
712         /* If it is not a striped directory, then load nothing. */
713         if (magic != LMV_MAGIC_V1)
714                 RETURN(0);
715
716         /* If it is in migration (or failure), then load nothing. */
717         if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
718                 RETURN(0);
719
720         stripes = le32_to_cpu(lmv1->lmv_stripe_count);
721         if (stripes < 1)
722                 RETURN(0);
723
724         size = lmv_mds_md_size(stripes, magic);
725         if (buf->lb_len < size) {
726                 struct lu_buf tbuf;
727
728                 if (!resize)
729                         RETURN(-ERANGE);
730
731                 tbuf = *buf;
732                 buf->lb_buf = NULL;
733                 buf->lb_len = 0;
734                 lu_buf_alloc(buf, size);
735                 lmv1 = buf->lb_buf;
736                 if (lmv1 == NULL)
737                         RETURN(-ENOMEM);
738
739                 memcpy(buf->lb_buf, tbuf.lb_buf, tbuf.lb_len);
740         }
741
742         if (unlikely(!dt_try_as_dir(env, obj)))
743                 RETURN(-ENOTDIR);
744
745         memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid));
746         iops = &obj->do_index_ops->dio_it;
747         it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA);
748         if (IS_ERR(it))
749                 RETURN(PTR_ERR(it));
750
751         rc = iops->load(env, it, 0);
752         if (rc == 0)
753                 rc = iops->next(env, it);
754         else if (rc > 0)
755                 rc = 0;
756
757         while (rc == 0) {
758                 char             name[FID_LEN + 2] = "";
759                 struct lu_fid    fid;
760                 __u32            index;
761                 int              len;
762
763                 rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
764                 if (rc != 0)
765                         break;
766
767                 rc = -EIO;
768
769                 fid_le_to_cpu(&fid, &ent->lde_fid);
770                 ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
771                 if (ent->lde_name[0] == '.') {
772                         if (ent->lde_namelen == 1)
773                                 goto next;
774
775                         if (ent->lde_namelen == 2 && ent->lde_name[1] == '.')
776                                 goto next;
777                 }
778
779                 len = snprintf(name, FID_LEN + 1, DFID":", PFID(&ent->lde_fid));
780                 /* The ent->lde_name is composed of ${FID}:${index} */
781                 if (ent->lde_namelen < len + 1 ||
782                     memcmp(ent->lde_name, name, len) != 0) {
783                         CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
784                                "%s: invalid shard name %.*s with the FID "DFID
785                                " for the striped directory "DFID", %s\n",
786                                lod2obd(lod)->obd_name, ent->lde_namelen,
787                                ent->lde_name, PFID(&fid),
788                                PFID(lu_object_fid(&obj->do_lu)),
789                                lod->lod_lmv_failout ? "failout" : "skip");
790
791                         if (lod->lod_lmv_failout)
792                                 break;
793
794                         goto next;
795                 }
796
797                 index = 0;
798                 do {
799                         if (ent->lde_name[len] < '0' ||
800                             ent->lde_name[len] > '9') {
801                                 CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO,
802                                        "%s: invalid shard name %.*s with the "
803                                        "FID "DFID" for the striped directory "
804                                        DFID", %s\n",
805                                        lod2obd(lod)->obd_name, ent->lde_namelen,
806                                        ent->lde_name, PFID(&fid),
807                                        PFID(lu_object_fid(&obj->do_lu)),
808                                        lod->lod_lmv_failout ?
809                                        "failout" : "skip");
810
811                                 if (lod->lod_lmv_failout)
812                                         break;
813
814                                 goto next;
815                         }
816
817                         index = index * 10 + ent->lde_name[len++] - '0';
818                 } while (len < ent->lde_namelen);
819
820                 if (len == ent->lde_namelen) {
821                         /* Out of LMV EA range. */
822                         if (index >= stripes) {
823                                 CERROR("%s: the shard %.*s for the striped "
824                                        "directory "DFID" is out of the known "
825                                        "LMV EA range [0 - %u], failout\n",
826                                        lod2obd(lod)->obd_name, ent->lde_namelen,
827                                        ent->lde_name,
828                                        PFID(lu_object_fid(&obj->do_lu)),
829                                        stripes - 1);
830
831                                 break;
832                         }
833
834                         /* The slot has been occupied. */
835                         if (!fid_is_zero(&lmv1->lmv_stripe_fids[index])) {
836                                 struct lu_fid fid0;
837
838                                 fid_le_to_cpu(&fid0,
839                                         &lmv1->lmv_stripe_fids[index]);
840                                 CERROR("%s: both the shard "DFID" and "DFID
841                                        " for the striped directory "DFID
842                                        " claim the same LMV EA slot at the "
843                                        "index %d, failout\n",
844                                        lod2obd(lod)->obd_name,
845                                        PFID(&fid0), PFID(&fid),
846                                        PFID(lu_object_fid(&obj->do_lu)), index);
847
848                                 break;
849                         }
850
851                         /* stored as LE mode */
852                         lmv1->lmv_stripe_fids[index] = ent->lde_fid;
853
854 next:
855                         rc = iops->next(env, it);
856                 }
857         }
858
859         iops->put(env, it);
860         iops->fini(env, it);
861
862         RETURN(rc > 0 ? lmv_mds_md_size(stripes, magic) : rc);
863 }
864
865 /**
866  * Implementation of dt_object_operations:: do_index_try
867  *
868  * This function will try to initialize the index api pointer for the
869  * given object, usually it the entry point of the index api. i.e.
870  * the index object should be initialized in index_try, then start
871  * using index api. For striped directory, it will try to initialize
872  * all of its sub_stripes.
873  *
874  * \param[in] env       execution environment.
875  * \param[in] dt        the index object to be initialized.
876  * \param[in] feat      the features of this object, for example fixed or
877  *                      variable key size etc.
878  *
879  * \retval      >0 if the initialization is successful.
880  * \retval      <0 if the initialization is failed.
881  */
882 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
883                          const struct dt_index_features *feat)
884 {
885         struct lod_object       *lo = lod_dt_obj(dt);
886         struct dt_object        *next = dt_object_child(dt);
887         int                     rc;
888         ENTRY;
889
890         LASSERT(next->do_ops);
891         LASSERT(next->do_ops->do_index_try);
892
893         rc = lod_load_striping_locked(env, lo);
894         if (rc != 0)
895                 RETURN(rc);
896
897         rc = next->do_ops->do_index_try(env, next, feat);
898         if (rc != 0)
899                 RETURN(rc);
900
901         if (lo->ldo_stripenr > 0) {
902                 int i;
903
904                 for (i = 0; i < lo->ldo_stripenr; i++) {
905                         if (dt_object_exists(lo->ldo_stripe[i]) == 0)
906                                 continue;
907                         rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
908                                                 lo->ldo_stripe[i], feat);
909                         if (rc != 0)
910                                 RETURN(rc);
911                 }
912                 dt->do_index_ops = &lod_striped_index_ops;
913         } else {
914                 dt->do_index_ops = &lod_index_ops;
915         }
916
917         RETURN(rc);
918 }
919
/* Forward the read-lock request, with the given role, to the child object. */
static void lod_object_read_lock(const struct lu_env *env,
				 struct dt_object *dt, unsigned role)
{
	struct dt_object *child = dt_object_child(dt);

	dt_read_lock(env, child, role);
}
925
/* Forward the write-lock request, with the given role, to the child object. */
static void lod_object_write_lock(const struct lu_env *env,
				  struct dt_object *dt, unsigned role)
{
	struct dt_object *child = dt_object_child(dt);

	dt_write_lock(env, child, role);
}
931
/* Forward the read-unlock request to the child object. */
static void lod_object_read_unlock(const struct lu_env *env,
				   struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	dt_read_unlock(env, child);
}
937
/* Forward the write-unlock request to the child object. */
static void lod_object_write_unlock(const struct lu_env *env,
				    struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	dt_write_unlock(env, child);
}
943
/* Return whatever dt_write_locked() reports for the child object. */
static int lod_object_write_locked(const struct lu_env *env,
				   struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_write_locked(env, child);
}
949
/*
 * Fetch the attributes from the child (master) object only.
 *
 * For a striped directory the client merges the attributes of all the
 * sub-stripes itself (see lmv_merge_attr()), and no MDD logic depends on
 * the directory nlink/size/time, so the master inode nlink and size are
 * always sufficient for now.
 */
static int lod_attr_get(const struct lu_env *env,
			struct dt_object *dt,
			struct lu_attr *attr,
			struct lustre_capa *capa)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_attr_get(env, child, attr, capa);
}
961
962 /**
963  * Mark all of sub-stripes dead of the striped directory.
964  **/
965 static int lod_mark_dead_object(const struct lu_env *env,
966                                 struct dt_object *dt,
967                                 struct thandle *handle,
968                                 bool declare)
969 {
970         struct lod_object       *lo = lod_dt_obj(dt);
971         struct lmv_mds_md_v1    *lmv;
972         __u32                   dead_hash_type;
973         int                     rc;
974         int                     i;
975
976         ENTRY;
977
978         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
979                 RETURN(0);
980
981         rc = lod_load_striping_locked(env, lo);
982         if (rc != 0)
983                 RETURN(rc);
984
985         if (lo->ldo_stripenr == 0)
986                 RETURN(0);
987
988         rc = lod_get_lmv_ea(env, lo);
989         if (rc <= 0)
990                 RETURN(rc);
991
992         lmv = lod_env_info(env)->lti_ea_store;
993         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
994         dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD;
995         lmv->lmv_hash_type = cpu_to_le32(dead_hash_type);
996         for (i = 0; i < lo->ldo_stripenr; i++) {
997                 struct lu_buf buf;
998
999                 lmv->lmv_master_mdt_index = i;
1000                 buf.lb_buf = lmv;
1001                 buf.lb_len = sizeof(*lmv);
1002                 if (declare) {
1003                         rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf,
1004                                                   XATTR_NAME_LMV,
1005                                                   LU_XATTR_REPLACE, handle);
1006                 } else {
1007                         rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf,
1008                                           XATTR_NAME_LMV, LU_XATTR_REPLACE,
1009                                           handle, BYPASS_CAPA);
1010                 }
1011                 if (rc != 0)
1012                         break;
1013         }
1014
1015         RETURN(rc);
1016 }
1017
1018 static int lod_declare_attr_set(const struct lu_env *env,
1019                                 struct dt_object *dt,
1020                                 const struct lu_attr *attr,
1021                                 struct thandle *handle)
1022 {
1023         struct dt_object  *next = dt_object_child(dt);
1024         struct lod_object *lo = lod_dt_obj(dt);
1025         int                rc, i;
1026         ENTRY;
1027
1028         /* Set dead object on all other stripes */
1029         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
1030             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
1031                 rc = lod_mark_dead_object(env, dt, handle, true);
1032                 RETURN(rc);
1033         }
1034
1035         /*
1036          * declare setattr on the local object
1037          */
1038         rc = dt_declare_attr_set(env, next, attr, handle);
1039         if (rc)
1040                 RETURN(rc);
1041
1042         /* osp_declare_attr_set() ignores all attributes other than
1043          * UID, GID, and size, and osp_attr_set() ignores all but UID
1044          * and GID.  Declaration of size attr setting happens through
1045          * lod_declare_init_size(), and not through this function.
1046          * Therefore we need not load striping unless ownership is
1047          * changing.  This should save memory and (we hope) speed up
1048          * rename(). */
1049         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
1050                 if (!(attr->la_valid & (LA_UID | LA_GID)))
1051                         RETURN(rc);
1052
1053                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
1054                         RETURN(0);
1055         } else {
1056                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
1057                                         LA_ATIME | LA_MTIME | LA_CTIME)))
1058                         RETURN(rc);
1059         }
1060         /*
1061          * load striping information, notice we don't do this when object
1062          * is being initialized as we don't need this information till
1063          * few specific cases like destroy, chown
1064          */
1065         rc = lod_load_striping(env, lo);
1066         if (rc)
1067                 RETURN(rc);
1068
1069         if (lo->ldo_stripenr == 0)
1070                 RETURN(0);
1071
1072         /*
1073          * if object is striped declare changes on the stripes
1074          */
1075         LASSERT(lo->ldo_stripe);
1076         for (i = 0; i < lo->ldo_stripenr; i++) {
1077                 if (likely(lo->ldo_stripe[i] != NULL)) {
1078                         rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr,
1079                                                  handle);
1080                         if (rc != 0) {
1081                                 CERROR("failed declaration: %d\n", rc);
1082                                 break;
1083                         }
1084                 }
1085         }
1086
1087         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1088             dt_object_exists(next) != 0 &&
1089             dt_object_remote(next) == 0)
1090                 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
1091
1092         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1093             dt_object_exists(next) &&
1094             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1095                 struct lod_thread_info *info = lod_env_info(env);
1096                 struct lu_buf *buf = &info->lti_buf;
1097
1098                 buf->lb_buf = info->lti_ea_store;
1099                 buf->lb_len = info->lti_ea_store_size;
1100                 dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
1101                                      LU_XATTR_REPLACE, handle);
1102         }
1103
1104         RETURN(rc);
1105 }
1106
1107 static int lod_attr_set(const struct lu_env *env,
1108                         struct dt_object *dt,
1109                         const struct lu_attr *attr,
1110                         struct thandle *handle,
1111                         struct lustre_capa *capa)
1112 {
1113         struct dt_object        *next = dt_object_child(dt);
1114         struct lod_object       *lo = lod_dt_obj(dt);
1115         int                     rc, i;
1116         ENTRY;
1117
1118         /* Set dead object on all other stripes */
1119         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
1120             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
1121                 rc = lod_mark_dead_object(env, dt, handle, false);
1122                 RETURN(rc);
1123         }
1124
1125         /*
1126          * apply changes to the local object
1127          */
1128         rc = dt_attr_set(env, next, attr, handle, capa);
1129         if (rc)
1130                 RETURN(rc);
1131
1132         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
1133                 if (!(attr->la_valid & (LA_UID | LA_GID)))
1134                         RETURN(rc);
1135
1136                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
1137                         RETURN(0);
1138         } else {
1139                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
1140                                         LA_ATIME | LA_MTIME | LA_CTIME)))
1141                         RETURN(rc);
1142         }
1143
1144         if (lo->ldo_stripenr == 0)
1145                 RETURN(0);
1146
1147         /*
1148          * if object is striped, apply changes to all the stripes
1149          */
1150         LASSERT(lo->ldo_stripe);
1151         for (i = 0; i < lo->ldo_stripenr; i++) {
1152                 if (likely(lo->ldo_stripe[i] != NULL)) {
1153                         if (dt_object_exists(lo->ldo_stripe[i]) == 0)
1154                                 continue;
1155
1156                         rc = dt_attr_set(env, lo->ldo_stripe[i], attr,
1157                                          handle, capa);
1158                         if (rc != 0) {
1159                                 CERROR("failed declaration: %d\n", rc);
1160                                 break;
1161                         }
1162                 }
1163         }
1164
1165         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1166             dt_object_exists(next) != 0 &&
1167             dt_object_remote(next) == 0)
1168                 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
1169
1170         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1171             dt_object_exists(next) &&
1172             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1173                 struct lod_thread_info *info = lod_env_info(env);
1174                 struct lu_buf *buf = &info->lti_buf;
1175                 struct ost_id *oi = &info->lti_ostid;
1176                 struct lu_fid *fid = &info->lti_fid;
1177                 struct lov_mds_md_v1 *lmm;
1178                 struct lov_ost_data_v1 *objs;
1179                 __u32 magic;
1180                 int rc1;
1181
1182                 rc1 = lod_get_lov_ea(env, lo);
1183                 if (rc1  <= 0)
1184                         RETURN(rc);
1185
1186                 buf->lb_buf = info->lti_ea_store;
1187                 buf->lb_len = info->lti_ea_store_size;
1188                 lmm = info->lti_ea_store;
1189                 magic = le32_to_cpu(lmm->lmm_magic);
1190                 if (magic == LOV_MAGIC_V1)
1191                         objs = &(lmm->lmm_objects[0]);
1192                 else
1193                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1194                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
1195                 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
1196                 fid->f_oid--;
1197                 fid_to_ostid(fid, oi);
1198                 ostid_cpu_to_le(oi, &objs->l_ost_oi);
1199                 dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
1200                              LU_XATTR_REPLACE, handle, BYPASS_CAPA);
1201         }
1202
1203         RETURN(rc);
1204 }
1205
1206 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
1207                          struct lu_buf *buf, const char *name,
1208                          struct lustre_capa *capa)
1209 {
1210         struct lod_thread_info  *info = lod_env_info(env);
1211         struct lod_device       *dev = lu2lod_dev(dt->do_lu.lo_dev);
1212         int                      rc, is_root;
1213         ENTRY;
1214
1215         rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
1216         if (strcmp(name, XATTR_NAME_LMV) == 0) {
1217                 struct lmv_mds_md_v1    *lmv1;
1218                 int                      rc1 = 0;
1219
1220                 if (rc > sizeof(*lmv1))
1221                         RETURN(rc);
1222
1223                 if (rc < sizeof(*lmv1))
1224                         RETURN(rc = rc > 0 ? -EINVAL : rc);
1225
1226                 if (buf->lb_buf == NULL || buf->lb_len == 0) {
1227                         CLASSERT(sizeof(*lmv1) <= sizeof(info->lti_key));
1228
1229                         info->lti_buf.lb_buf = info->lti_key;
1230                         info->lti_buf.lb_len = sizeof(*lmv1);
1231                         rc = dt_xattr_get(env, dt_object_child(dt),
1232                                           &info->lti_buf, name, capa);
1233                         if (unlikely(rc != sizeof(*lmv1)))
1234                                 RETURN(rc = rc > 0 ? -EINVAL : rc);
1235
1236                         lmv1 = info->lti_buf.lb_buf;
1237                         /* The on-disk LMV EA only contains header, but the
1238                          * returned LMV EA size should contain the space for
1239                          * the FIDs of all shards of the striped directory. */
1240                         if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1)
1241                                 rc = lmv_mds_md_size(
1242                                         le32_to_cpu(lmv1->lmv_stripe_count),
1243                                         LMV_MAGIC_V1);
1244                 } else {
1245                         rc1 = lod_load_lmv_shards(env, lod_dt_obj(dt),
1246                                                   buf, false);
1247                 }
1248
1249                 RETURN(rc = rc1 != 0 ? rc1 : rc);
1250         }
1251
1252         if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
1253                 RETURN(rc);
1254
1255         /*
1256          * lod returns default striping on the real root of the device
1257          * this is like the root stores default striping for the whole
1258          * filesystem. historically we've been using a different approach
1259          * and store it in the config.
1260          */
1261         dt_root_get(env, dev->lod_child, &info->lti_fid);
1262         is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
1263
1264         if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
1265                 struct lov_user_md *lum = buf->lb_buf;
1266                 struct lov_desc    *desc = &dev->lod_desc;
1267
1268                 if (buf->lb_buf == NULL) {
1269                         rc = sizeof(*lum);
1270                 } else if (buf->lb_len >= sizeof(*lum)) {
1271                         lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
1272                         lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
1273                         lmm_oi_set_id(&lum->lmm_oi, 0);
1274                         lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
1275                         lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
1276                         lum->lmm_stripe_size = cpu_to_le32(
1277                                                 desc->ld_default_stripe_size);
1278                         lum->lmm_stripe_count = cpu_to_le16(
1279                                                 desc->ld_default_stripe_count);
1280                         lum->lmm_stripe_offset = cpu_to_le16(
1281                                                 desc->ld_default_stripe_offset);
1282                         rc = sizeof(*lum);
1283                 } else {
1284                         rc = -ERANGE;
1285                 }
1286         }
1287
1288         RETURN(rc);
1289 }
1290
1291 static int lod_verify_md_striping(struct lod_device *lod,
1292                                   const struct lmv_user_md_v1 *lum)
1293 {
1294         int     rc = 0;
1295         ENTRY;
1296
1297         if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
1298                 GOTO(out, rc = -EINVAL);
1299
1300         if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
1301                 GOTO(out, rc = -EINVAL);
1302 out:
1303         if (rc != 0)
1304                 CERROR("%s: invalid lmv_user_md: magic = %x, "
1305                        "stripe_offset = %d, stripe_count = %u: rc = %d\n",
1306                        lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
1307                        (int)le32_to_cpu(lum->lum_stripe_offset),
1308                        le32_to_cpu(lum->lum_stripe_count), rc);
1309         return rc;
1310 }
1311
1312 /**
1313  * Master LMVEA will be same as slave LMVEA, except
1314  * 1. different magic
1315  * 2. lmv_master_mdt_index on slave LMV EA will be stripe_index.
1316  */
1317 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
1318                                   const struct lmv_mds_md_v1 *master_lmv)
1319 {
1320         *slave_lmv = *master_lmv;
1321         slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
1322 }
1323
1324 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
1325                     struct lu_buf *lmv_buf)
1326 {
1327         struct lod_thread_info  *info = lod_env_info(env);
1328         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1329         struct lod_object       *lo = lod_dt_obj(dt);
1330         struct lmv_mds_md_v1    *lmm1;
1331         int                     stripe_count;
1332         int                     type = LU_SEQ_RANGE_ANY;
1333         int                     rc;
1334         __u32                   mdtidx;
1335         ENTRY;
1336
1337         LASSERT(lo->ldo_dir_striped != 0);
1338         LASSERT(lo->ldo_stripenr > 0);
1339         stripe_count = lo->ldo_stripenr;
1340         /* Only store the LMV EA heahder on the disk. */
1341         if (info->lti_ea_store_size < sizeof(*lmm1)) {
1342                 rc = lod_ea_store_resize(info, sizeof(*lmm1));
1343                 if (rc != 0)
1344                         RETURN(rc);
1345         } else {
1346                 memset(info->lti_ea_store, 0, sizeof(*lmm1));
1347         }
1348
1349         lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
1350         lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
1351         lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
1352         lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
1353         rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
1354                             &mdtidx, &type);
1355         if (rc != 0)
1356                 RETURN(rc);
1357
1358         lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
1359         lmv_buf->lb_buf = info->lti_ea_store;
1360         lmv_buf->lb_len = sizeof(*lmm1);
1361         lo->ldo_dir_striping_cached = 1;
1362
1363         RETURN(rc);
1364 }
1365
1366 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
1367                            const struct lu_buf *buf)
1368 {
1369         struct lod_thread_info  *info = lod_env_info(env);
1370         struct lod_device       *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1371         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1372         struct dt_object        **stripe;
1373         union lmv_mds_md        *lmm = buf->lb_buf;
1374         struct lmv_mds_md_v1    *lmv1 = &lmm->lmv_md_v1;
1375         struct lu_fid           *fid = &info->lti_fid;
1376         int                     i;
1377         int                     rc = 0;
1378         ENTRY;
1379
1380         if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
1381                 RETURN(0);
1382
1383         if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
1384                 lo->ldo_dir_slave_stripe = 1;
1385                 RETURN(0);
1386         }
1387
1388         if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
1389                 RETURN(-EINVAL);
1390
1391         if (le32_to_cpu(lmv1->lmv_stripe_count) < 1)
1392                 RETURN(0);
1393
1394         LASSERT(lo->ldo_stripe == NULL);
1395         OBD_ALLOC(stripe, sizeof(stripe[0]) *
1396                   (le32_to_cpu(lmv1->lmv_stripe_count)));
1397         if (stripe == NULL)
1398                 RETURN(-ENOMEM);
1399
1400         for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
1401                 struct dt_device        *tgt_dt;
1402                 struct dt_object        *dto;
1403                 int                     type = LU_SEQ_RANGE_ANY;
1404                 __u32                   idx;
1405
1406                 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
1407                 if (!fid_is_sane(fid))
1408                         GOTO(out, rc = -ESTALE);
1409
1410                 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
1411                 if (rc != 0)
1412                         GOTO(out, rc);
1413
1414                 if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
1415                         tgt_dt = lod->lod_child;
1416                 } else {
1417                         struct lod_tgt_desc     *tgt;
1418
1419                         tgt = LTD_TGT(ltd, idx);
1420                         if (tgt == NULL)
1421                                 GOTO(out, rc = -ESTALE);
1422                         tgt_dt = tgt->ltd_tgt;
1423                 }
1424
1425                 dto = dt_locate_at(env, tgt_dt, fid,
1426                                   lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1427                                   NULL);
1428                 if (IS_ERR(dto))
1429                         GOTO(out, rc = PTR_ERR(dto));
1430
1431                 stripe[i] = dto;
1432         }
1433 out:
1434         lo->ldo_stripe = stripe;
1435         lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count);
1436         lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
1437         if (rc != 0)
1438                 lod_object_free_striping(env, lo);
1439
1440         RETURN(rc);
1441 }
1442
1443 static int lod_prep_md_striped_create(const struct lu_env *env,
1444                                       struct dt_object *dt,
1445                                       struct lu_attr *attr,
1446                                       const struct lmv_user_md_v1 *lum,
1447                                       struct dt_object_format *dof,
1448                                       struct thandle *th)
1449 {
1450         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1451         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1452         struct lod_object       *lo = lod_dt_obj(dt);
1453         struct lod_thread_info  *info = lod_env_info(env);
1454         struct dt_object        **stripe;
1455         struct lu_buf           lmv_buf;
1456         struct lu_buf           slave_lmv_buf;
1457         struct lmv_mds_md_v1    *lmm;
1458         struct lmv_mds_md_v1    *slave_lmm = NULL;
1459         int                     stripe_count;
1460         int                     *idx_array;
1461         int                     rc = 0;
1462         int                     i;
1463         int                     j;
1464         ENTRY;
1465
1466         /* The lum has been verifed in lod_verify_md_striping */
1467         LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
1468         LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
1469
1470         stripe_count = le32_to_cpu(lum->lum_stripe_count);
1471
1472         /* shrink the stripe_count to the avaible MDT count */
1473         if (stripe_count > lod->lod_remote_mdt_count + 1)
1474                 stripe_count = lod->lod_remote_mdt_count + 1;
1475
1476         OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
1477         if (stripe == NULL)
1478                 RETURN(-ENOMEM);
1479
1480         OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
1481         if (idx_array == NULL)
1482                 GOTO(out_free, rc = -ENOMEM);
1483
1484         for (i = 0; i < stripe_count; i++) {
1485                 struct lod_tgt_desc     *tgt = NULL;
1486                 struct dt_object        *dto;
1487                 struct lu_fid           fid = { 0 };
1488                 int                     idx;
1489                 struct lu_object_conf   conf = { 0 };
1490                 struct dt_device        *tgt_dt = NULL;
1491
1492                 if (i == 0) {
1493                         /* Right now, master stripe and master object are
1494                          * on the same MDT */
1495                         idx = le32_to_cpu(lum->lum_stripe_offset);
1496                         rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
1497                                            NULL);
1498                         if (rc < 0)
1499                                 GOTO(out_put, rc);
1500                         tgt_dt = lod->lod_child;
1501                         goto next;
1502                 }
1503
1504                 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
1505
1506                 for (j = 0; j < lod->lod_remote_mdt_count;
1507                      j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
1508                         bool already_allocated = false;
1509                         int k;
1510
1511                         CDEBUG(D_INFO, "try idx %d, mdt cnt %d,"
1512                                " allocated %d, last allocated %d\n", idx,
1513                                lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1514
1515                         /* Find next available target */
1516                         if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
1517                                 continue;
1518
1519                         /* check whether the idx already exists
1520                          * in current allocated array */
1521                         for (k = 0; k < i; k++) {
1522                                 if (idx_array[k] == idx) {
1523                                         already_allocated = true;
1524                                         break;
1525                                 }
1526                         }
1527
1528                         if (already_allocated)
1529                                 continue;
1530
1531                         /* check the status of the OSP */
1532                         tgt = LTD_TGT(ltd, idx);
1533                         if (tgt == NULL)
1534                                 continue;
1535
1536                         tgt_dt = tgt->ltd_tgt;
1537                         rc = dt_statfs(env, tgt_dt, NULL);
1538                         if (rc) {
1539                                 /* this OSP doesn't feel well */
1540                                 rc = 0;
1541                                 continue;
1542                         }
1543
1544                         rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
1545                         if (rc < 0) {
1546                                 rc = 0;
1547                                 continue;
1548                         }
1549
1550                         break;
1551                 }
1552
1553                 /* Can not allocate more stripes */
1554                 if (j == lod->lod_remote_mdt_count) {
1555                         CDEBUG(D_INFO, "%s: require stripes %d only get %d\n",
1556                                lod2obd(lod)->obd_name, stripe_count, i - 1);
1557                         break;
1558                 }
1559
1560                 CDEBUG(D_INFO, "idx %d, mdt cnt %d,"
1561                        " allocated %d, last allocated %d\n", idx,
1562                        lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1563
1564 next:
1565                 /* tgt_dt and fid must be ready after search avaible OSP
1566                  * in the above loop */
1567                 LASSERT(tgt_dt != NULL);
1568                 LASSERT(fid_is_sane(&fid));
1569                 conf.loc_flags = LOC_F_NEW;
1570                 dto = dt_locate_at(env, tgt_dt, &fid,
1571                                    dt->do_lu.lo_dev->ld_site->ls_top_dev,
1572                                    &conf);
1573                 if (IS_ERR(dto))
1574                         GOTO(out_put, rc = PTR_ERR(dto));
1575                 stripe[i] = dto;
1576                 idx_array[i] = idx;
1577         }
1578
1579         lo->ldo_dir_striped = 1;
1580         lo->ldo_stripe = stripe;
1581         lo->ldo_stripenr = i;
1582         lo->ldo_stripes_allocated = stripe_count;
1583
1584         if (lo->ldo_stripenr == 0)
1585                 GOTO(out_put, rc = -ENOSPC);
1586
1587         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1588         if (rc != 0)
1589                 GOTO(out_put, rc);
1590         lmm = lmv_buf.lb_buf;
1591
1592         OBD_ALLOC_PTR(slave_lmm);
1593         if (slave_lmm == NULL)
1594                 GOTO(out_put, rc = -ENOMEM);
1595
1596         lod_prep_slave_lmv_md(slave_lmm, lmm);
1597         slave_lmv_buf.lb_buf = slave_lmm;
1598         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1599
1600         if (!dt_try_as_dir(env, dt_object_child(dt)))
1601                 GOTO(out_put, rc = -EINVAL);
1602
1603         for (i = 0; i < lo->ldo_stripenr; i++) {
1604                 struct dt_object        *dto            = stripe[i];
1605                 char                    *stripe_name    = info->lti_key;
1606                 struct lu_name          *sname;
1607                 struct linkea_data       ldata          = { 0 };
1608                 struct lu_buf            linkea_buf;
1609
1610                 rc = dt_declare_create(env, dto, attr, NULL, dof, th);
1611                 if (rc != 0)
1612                         GOTO(out_put, rc);
1613
1614                 if (!dt_try_as_dir(env, dto))
1615                         GOTO(out_put, rc = -EINVAL);
1616
1617                 rc = dt_declare_insert(env, dto,
1618                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1619                      (const struct dt_key *)dot, th);
1620                 if (rc != 0)
1621                         GOTO(out_put, rc);
1622
1623                 /* master stripe FID will be put to .. */
1624                 rc = dt_declare_insert(env, dto,
1625                      (const struct dt_rec *)lu_object_fid(&dt->do_lu),
1626                      (const struct dt_key *)dotdot, th);
1627                 if (rc != 0)
1628                         GOTO(out_put, rc);
1629
1630                 /* probably nothing to inherite */
1631                 if (lo->ldo_striping_cached &&
1632                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1633                                          lo->ldo_def_stripenr,
1634                                          lo->ldo_def_stripe_offset)) {
1635                         struct lov_user_md_v3   *v3;
1636
1637                         /* sigh, lti_ea_store has been used for lmv_buf,
1638                          * so we have to allocate buffer for default
1639                          * stripe EA */
1640                         OBD_ALLOC_PTR(v3);
1641                         if (v3 == NULL)
1642                                 GOTO(out_put, rc = -ENOMEM);
1643
1644                         memset(v3, 0, sizeof(*v3));
1645                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1646                         v3->lmm_stripe_count =
1647                                 cpu_to_le16(lo->ldo_def_stripenr);
1648                         v3->lmm_stripe_offset =
1649                                 cpu_to_le16(lo->ldo_def_stripe_offset);
1650                         v3->lmm_stripe_size =
1651                                 cpu_to_le32(lo->ldo_def_stripe_size);
1652                         if (lo->ldo_pool != NULL)
1653                                 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
1654                                         sizeof(v3->lmm_pool_name));
1655
1656                         info->lti_buf.lb_buf = v3;
1657                         info->lti_buf.lb_len = sizeof(*v3);
1658                         rc = dt_declare_xattr_set(env, dto,
1659                                                   &info->lti_buf,
1660                                                   XATTR_NAME_LOV,
1661                                                   0, th);
1662                         OBD_FREE_PTR(v3);
1663                         if (rc != 0)
1664                                 GOTO(out_put, rc);
1665                 }
1666
1667                 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1668                 rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf,
1669                                           XATTR_NAME_LMV, 0, th);
1670                 if (rc != 0)
1671                         GOTO(out_put, rc);
1672
1673                 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1674                         PFID(lu_object_fid(&dto->do_lu)), i);
1675
1676                 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
1677                 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
1678                 if (rc != 0)
1679                         GOTO(out_put, rc);
1680
1681                 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
1682                 if (rc != 0)
1683                         GOTO(out_put, rc);
1684
1685                 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1686                 linkea_buf.lb_len = ldata.ld_leh->leh_len;
1687                 rc = dt_declare_xattr_set(env, dto, &linkea_buf,
1688                                           XATTR_NAME_LINK, 0, th);
1689                 if (rc != 0)
1690                         GOTO(out_put, rc);
1691
1692                 rc = dt_declare_insert(env, dt_object_child(dt),
1693                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1694                      (const struct dt_key *)stripe_name, th);
1695                 if (rc != 0)
1696                         GOTO(out_put, rc);
1697
1698                 rc = dt_declare_ref_add(env, dt_object_child(dt), th);
1699                 if (rc != 0)
1700                         GOTO(out_put, rc);
1701         }
1702
1703         rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
1704                                   XATTR_NAME_LMV, 0, th);
1705         if (rc != 0)
1706                 GOTO(out_put, rc);
1707
1708 out_put:
1709         if (rc < 0) {
1710                 for (i = 0; i < stripe_count; i++)
1711                         if (stripe[i] != NULL)
1712                                 lu_object_put(env, &stripe[i]->do_lu);
1713                 OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count);
1714                 lo->ldo_stripenr = 0;
1715                 lo->ldo_stripes_allocated = 0;
1716                 lo->ldo_stripe = NULL;
1717         }
1718
1719 out_free:
1720         if (idx_array != NULL)
1721                 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
1722         if (slave_lmm != NULL)
1723                 OBD_FREE_PTR(slave_lmm);
1724
1725         RETURN(rc);
1726 }
1727
1728 /**
1729  * Declare create striped md object.
1730  */
1731 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
1732                                      struct dt_object *dt,
1733                                      struct lu_attr *attr,
1734                                      const struct lu_buf *lum_buf,
1735                                      struct dt_object_format *dof,
1736                                      struct thandle *th)
1737 {
1738         struct lod_object       *lo = lod_dt_obj(dt);
1739         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1740         struct lmv_user_md_v1   *lum;
1741         int                     rc;
1742         ENTRY;
1743
1744         lum = lum_buf->lb_buf;
1745         LASSERT(lum != NULL);
1746
1747         CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
1748                le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
1749                (int)le32_to_cpu(lum->lum_stripe_offset));
1750
1751         if (le32_to_cpu(lum->lum_stripe_count) == 0)
1752                 GOTO(out, rc = 0);
1753
1754         rc = lod_verify_md_striping(lod, lum);
1755         if (rc != 0)
1756                 GOTO(out, rc);
1757
1758         /* prepare dir striped objects */
1759         rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
1760         if (rc != 0) {
1761                 /* failed to create striping, let's reset
1762                  * config so that others don't get confused */
1763                 lod_object_free_striping(env, lo);
1764                 GOTO(out, rc);
1765         }
1766 out:
1767         RETURN(rc);
1768 }
1769
1770 static int lod_dir_declare_xattr_set(const struct lu_env *env,
1771                                      struct dt_object *dt,
1772                                      const struct lu_buf *buf,
1773                                      const char *name, int fl,
1774                                      struct thandle *th)
1775 {
1776         struct dt_object        *next = dt_object_child(dt);
1777         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
1778         struct lod_object       *lo = lod_dt_obj(dt);
1779         int                     i;
1780         int                     rc;
1781         ENTRY;
1782
1783         if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1784                 struct lmv_user_md_v1 *lum;
1785
1786                 LASSERT(buf != NULL && buf->lb_buf != NULL);
1787                 lum = buf->lb_buf;
1788                 rc = lod_verify_md_striping(d, lum);
1789                 if (rc != 0)
1790                         RETURN(rc);
1791         }
1792
1793         rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1794         if (rc != 0)
1795                 RETURN(rc);
1796
1797         /* set xattr to each stripes, if needed */
1798         rc = lod_load_striping(env, lo);
1799         if (rc != 0)
1800                 RETURN(rc);
1801
1802         /* Note: Do not set LinkEA on sub-stripes, otherwise
1803          * it will confuse the fid2path process(see mdt_path_current()).
1804          * The linkEA between master and sub-stripes is set in
1805          * lod_xattr_set_lmv(). */
1806         if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
1807                 RETURN(0);
1808
1809         for (i = 0; i < lo->ldo_stripenr; i++) {
1810                 LASSERT(lo->ldo_stripe[i]);
1811                 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
1812                                           name, fl, th);
1813                 if (rc != 0)
1814                         break;
1815         }
1816
1817         RETURN(rc);
1818 }
1819
1820 /*
1821  * LOV xattr is a storage for striping, and LOD owns this xattr.
1822  * but LOD allows others to control striping to some extent
1823  * - to reset strping
1824  * - to set new defined striping
1825  * - to set new semi-defined striping
1826  *   - number of stripes is defined
1827  *   - number of stripes + osts are defined
1828  *   - ??
1829  */
1830 static int lod_declare_xattr_set(const struct lu_env *env,
1831                                  struct dt_object *dt,
1832                                  const struct lu_buf *buf,
1833                                  const char *name, int fl,
1834                                  struct thandle *th)
1835 {
1836         struct dt_object *next = dt_object_child(dt);
1837         struct lu_attr   *attr = &lod_env_info(env)->lti_attr;
1838         __u32             mode;
1839         int               rc;
1840         ENTRY;
1841
1842         /*
1843          * allow to declare predefined striping on a new (!mode) object
1844          * which is supposed to be replay of regular file creation
1845          * (when LOV setting is declared)
1846          * LU_XATTR_REPLACE is set to indicate a layout swap
1847          */
1848         mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
1849         if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
1850              !(fl & LU_XATTR_REPLACE)) {
1851                 /*
1852                  * this is a request to manipulate object's striping
1853                  */
1854                 if (dt_object_exists(dt)) {
1855                         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1856                         if (rc)
1857                                 RETURN(rc);
1858                 } else {
1859                         memset(attr, 0, sizeof(*attr));
1860                         attr->la_valid = LA_TYPE | LA_MODE;
1861                         attr->la_mode = S_IFREG;
1862                 }
1863                 rc = lod_declare_striped_object(env, dt, attr, buf, th);
1864         } else if (S_ISDIR(mode)) {
1865                 rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
1866         } else {
1867                 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1868         }
1869
1870         RETURN(rc);
1871 }
1872
1873 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
1874 {
1875         lo->ldo_striping_cached = 0;
1876         lo->ldo_def_striping_set = 0;
1877         lod_object_set_pool(lo, NULL);
1878         lo->ldo_def_stripe_size = 0;
1879         lo->ldo_def_stripenr = 0;
1880         if (lo->ldo_dir_stripe != NULL)
1881                 lo->ldo_dir_striping_cached = 0;
1882 }
1883
1884 static int lod_xattr_set_internal(const struct lu_env *env,
1885                                   struct dt_object *dt,
1886                                   const struct lu_buf *buf,
1887                                   const char *name, int fl, struct thandle *th,
1888                                   struct lustre_capa *capa)
1889 {
1890         struct dt_object        *next = dt_object_child(dt);
1891         struct lod_object       *lo = lod_dt_obj(dt);
1892         int                     rc;
1893         int                     i;
1894         ENTRY;
1895
1896         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1897         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1898                 RETURN(rc);
1899
1900         /* Note: Do not set LinkEA on sub-stripes, otherwise
1901          * it will confuse the fid2path process(see mdt_path_current()).
1902          * The linkEA between master and sub-stripes is set in
1903          * lod_xattr_set_lmv(). */
1904         if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0)
1905                 RETURN(0);
1906
1907         for (i = 0; i < lo->ldo_stripenr; i++) {
1908                 LASSERT(lo->ldo_stripe[i]);
1909                 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
1910                                   capa);
1911                 if (rc != 0)
1912                         break;
1913         }
1914
1915         RETURN(rc);
1916 }
1917
1918 static int lod_xattr_del_internal(const struct lu_env *env,
1919                                   struct dt_object *dt,
1920                                   const char *name, struct thandle *th,
1921                                   struct lustre_capa *capa)
1922 {
1923         struct dt_object        *next = dt_object_child(dt);
1924         struct lod_object       *lo = lod_dt_obj(dt);
1925         int                     rc;
1926         int                     i;
1927         ENTRY;
1928
1929         rc = dt_xattr_del(env, next, name, th, capa);
1930         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1931                 RETURN(rc);
1932
1933         if (lo->ldo_stripenr == 0)
1934                 RETURN(rc);
1935
1936         for (i = 0; i < lo->ldo_stripenr; i++) {
1937                 LASSERT(lo->ldo_stripe[i]);
1938                 rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th,
1939                                   capa);
1940                 if (rc != 0)
1941                         break;
1942         }
1943
1944         RETURN(rc);
1945 }
1946
1947 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
1948                                     struct dt_object *dt,
1949                                     const struct lu_buf *buf,
1950                                     const char *name, int fl,
1951                                     struct thandle *th,
1952                                     struct lustre_capa *capa)
1953 {
1954         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
1955         struct lod_object       *l = lod_dt_obj(dt);
1956         struct lov_user_md_v1   *lum;
1957         struct lov_user_md_v3   *v3 = NULL;
1958         int                      rc;
1959         ENTRY;
1960
1961         /* If it is striped dir, we should clear the stripe cache for
1962          * slave stripe as well, but there are no effective way to
1963          * notify the LOD on the slave MDT, so we do not cache stripe
1964          * information for slave stripe for now. XXX*/
1965         lod_lov_stripe_cache_clear(l);
1966         LASSERT(buf != NULL && buf->lb_buf != NULL);
1967         lum = buf->lb_buf;
1968
1969         rc = lod_verify_striping(d, buf, false);
1970         if (rc)
1971                 RETURN(rc);
1972
1973         if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1974                 v3 = buf->lb_buf;
1975
1976         /* if { size, offset, count } = { 0, -1, 0 } and no pool
1977          * (i.e. all default values specified) then delete default
1978          * striping from dir. */
1979         CDEBUG(D_OTHER,
1980                 "set default striping: sz %u # %u offset %d %s %s\n",
1981                 (unsigned)lum->lmm_stripe_size,
1982                 (unsigned)lum->lmm_stripe_count,
1983                 (int)lum->lmm_stripe_offset,
1984                 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1985
1986         if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1987                                 (lum->lmm_stripe_count),
1988                                 (lum->lmm_stripe_offset)) &&
1989                         lum->lmm_magic == LOV_USER_MAGIC_V1) {
1990                 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1991                 if (rc == -ENODATA)
1992                         rc = 0;
1993         } else {
1994                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
1995         }
1996
1997         RETURN(rc);
1998 }
1999
2000 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
2001                                             struct dt_object *dt,
2002                                             const struct lu_buf *buf,
2003                                             const char *name, int fl,
2004                                             struct thandle *th,
2005                                             struct lustre_capa *capa)
2006 {
2007         struct lod_object       *l = lod_dt_obj(dt);
2008         struct lmv_user_md_v1   *lum;
2009         int                      rc;
2010         ENTRY;
2011
2012         LASSERT(buf != NULL && buf->lb_buf != NULL);
2013         lum = buf->lb_buf;
2014
2015         CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
2016               le32_to_cpu(lum->lum_stripe_count),
2017               (int)le32_to_cpu(lum->lum_stripe_offset));
2018
2019         if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
2020                                  le32_to_cpu(lum->lum_stripe_offset)) &&
2021                                 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
2022                 rc = lod_xattr_del_internal(env, dt, name, th, capa);
2023                 if (rc == -ENODATA)
2024                         rc = 0;
2025         } else {
2026                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
2027                 if (rc != 0)
2028                         RETURN(rc);
2029         }
2030
2031         /* Update default stripe cache */
2032         if (l->ldo_dir_stripe == NULL) {
2033                 OBD_ALLOC_PTR(l->ldo_dir_stripe);
2034                 if (l->ldo_dir_stripe == NULL)
2035                         RETURN(-ENOMEM);
2036         }
2037
2038         l->ldo_dir_striping_cached = 0;
2039         l->ldo_dir_def_striping_set = 1;
2040         l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count);
2041
2042         RETURN(rc);
2043 }
2044
2045 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
2046                              const struct lu_buf *buf, const char *name,
2047                              int fl, struct thandle *th,
2048                              struct lustre_capa *capa)
2049 {
2050         struct lod_object       *lo = lod_dt_obj(dt);
2051         struct lod_thread_info  *info = lod_env_info(env);
2052         struct lu_attr          *attr = &info->lti_attr;
2053         struct dt_object_format *dof = &info->lti_format;
2054         struct lu_buf           lmv_buf;
2055         struct lu_buf           slave_lmv_buf;
2056         struct lmv_mds_md_v1    *lmm;
2057         struct lmv_mds_md_v1    *slave_lmm = NULL;
2058         int                     i;
2059         int                     rc;
2060         ENTRY;
2061
2062         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2063                 RETURN(-ENOTDIR);
2064
2065         /* The stripes are supposed to be allocated in declare phase,
2066          * if there are no stripes being allocated, it will skip */
2067         if (lo->ldo_stripenr == 0)
2068                 RETURN(0);
2069
2070         rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA);
2071         if (rc != 0)
2072                 RETURN(rc);
2073
2074         attr->la_valid = LA_TYPE | LA_MODE;
2075         dof->dof_type = DFT_DIR;
2076
2077         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
2078         if (rc != 0)
2079                 RETURN(rc);
2080         lmm = lmv_buf.lb_buf;
2081
2082         OBD_ALLOC_PTR(slave_lmm);
2083         if (slave_lmm == NULL)
2084                 RETURN(-ENOMEM);
2085
2086         lod_prep_slave_lmv_md(slave_lmm, lmm);
2087         slave_lmv_buf.lb_buf = slave_lmm;
2088         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
2089
2090         for (i = 0; i < lo->ldo_stripenr; i++) {
2091                 struct dt_object        *dto;
2092                 char                    *stripe_name    = info->lti_key;
2093                 struct lu_name          *sname;
2094                 struct linkea_data       ldata          = { 0 };
2095                 struct lu_buf            linkea_buf;
2096
2097                 dto = lo->ldo_stripe[i];
2098                 dt_write_lock(env, dto, MOR_TGT_CHILD);
2099                 rc = dt_create(env, dto, attr, NULL, dof, th);
2100                 dt_write_unlock(env, dto);
2101                 if (rc != 0)
2102                         RETURN(rc);
2103
2104                 rc = dt_insert(env, dto,
2105                               (const struct dt_rec *)lu_object_fid(&dto->do_lu),
2106                               (const struct dt_key *)dot, th, capa, 0);
2107                 if (rc != 0)
2108                         RETURN(rc);
2109
2110                 rc = dt_insert(env, dto,
2111                               (struct dt_rec *)lu_object_fid(&dt->do_lu),
2112                               (const struct dt_key *)dotdot, th, capa, 0);
2113                 if (rc != 0)
2114                         RETURN(rc);
2115
2116                 if (lo->ldo_striping_cached &&
2117                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2118                                          lo->ldo_def_stripenr,
2119                                          lo->ldo_def_stripe_offset)) {
2120                         struct lov_user_md_v3   *v3;
2121
2122                         /* sigh, lti_ea_store has been used for lmv_buf,
2123                          * so we have to allocate buffer for default
2124                          * stripe EA */
2125                         OBD_ALLOC_PTR(v3);
2126                         if (v3 == NULL)
2127                                 GOTO(out, rc);
2128
2129                         memset(v3, 0, sizeof(*v3));
2130                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2131                         v3->lmm_stripe_count =
2132                                 cpu_to_le16(lo->ldo_def_stripenr);
2133                         v3->lmm_stripe_offset =
2134                                 cpu_to_le16(lo->ldo_def_stripe_offset);
2135                         v3->lmm_stripe_size =
2136                                 cpu_to_le32(lo->ldo_def_stripe_size);
2137                         if (lo->ldo_pool != NULL)
2138                                 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
2139                                         sizeof(v3->lmm_pool_name));
2140
2141                         info->lti_buf.lb_buf = v3;
2142                         info->lti_buf.lb_len = sizeof(*v3);
2143                         rc = dt_xattr_set(env, dto, &info->lti_buf,
2144                                           XATTR_NAME_LOV, 0, th, capa);
2145                         OBD_FREE_PTR(v3);
2146                         if (rc != 0)
2147                                 GOTO(out, rc);
2148                 }
2149
2150                 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
2151                 rc = dt_xattr_set(env, dto, &slave_lmv_buf, XATTR_NAME_LMV,
2152                                   fl, th, capa);
2153                 if (rc != 0)
2154                         GOTO(out, rc);
2155
2156                 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2157                          PFID(lu_object_fid(&dto->do_lu)), i);
2158
2159                 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
2160                 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
2161                 if (rc != 0)
2162                         GOTO(out, rc);
2163
2164                 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
2165                 if (rc != 0)
2166                         GOTO(out, rc);
2167
2168                 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
2169                 linkea_buf.lb_len = ldata.ld_leh->leh_len;
2170                 rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK,
2171                                   0, th, BYPASS_CAPA);
2172                 if (rc != 0)
2173                         GOTO(out, rc);
2174
2175                 rc = dt_insert(env, dt_object_child(dt),
2176                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
2177                      (const struct dt_key *)stripe_name, th, capa, 0);
2178                 if (rc != 0)
2179                         GOTO(out, rc);
2180
2181                 rc = dt_ref_add(env, dt_object_child(dt), th);
2182                 if (rc != 0)
2183                         GOTO(out, rc);
2184         }
2185
2186         rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
2187                           fl, th, capa);
2188
2189 out:
2190         if (slave_lmm != NULL)
2191                 OBD_FREE_PTR(slave_lmm);
2192
2193         RETURN(rc);
2194 }
2195
2196 int lod_dir_striping_create_internal(const struct lu_env *env,
2197                                      struct dt_object *dt,
2198                                      struct lu_attr *attr,
2199                                      struct dt_object_format *dof,
2200                                      struct thandle *th,
2201                                      bool declare)
2202 {
2203         struct lod_thread_info  *info = lod_env_info(env);
2204         struct lod_object       *lo = lod_dt_obj(dt);
2205         int                     rc;
2206         ENTRY;
2207
2208         if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr,
2209                                  lo->ldo_dir_stripe_offset)) {
2210                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2211                 int stripe_count = lo->ldo_stripenr;
2212
2213                 if (info->lti_ea_store_size < sizeof(*v1)) {
2214                         rc = lod_ea_store_resize(info, sizeof(*v1));
2215                         if (rc != 0)
2216                                 RETURN(rc);
2217                         v1 = info->lti_ea_store;
2218                 }
2219
2220                 memset(v1, 0, sizeof(*v1));
2221                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2222                 v1->lum_stripe_count = cpu_to_le32(stripe_count);
2223                 v1->lum_stripe_offset =
2224                                 cpu_to_le32(lo->ldo_dir_stripe_offset);
2225
2226                 info->lti_buf.lb_buf = v1;
2227                 info->lti_buf.lb_len = sizeof(*v1);
2228
2229                 if (declare)
2230                         rc = lod_declare_xattr_set_lmv(env, dt, attr,
2231                                                        &info->lti_buf, dof, th);
2232                 else
2233                         rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
2234                                                XATTR_NAME_LMV, 0, th,
2235                                                BYPASS_CAPA);
2236                 if (rc != 0)
2237                         RETURN(rc);
2238         }
2239
2240         /* Transfer default LMV striping from the parent */
2241         if (lo->ldo_dir_striping_cached &&
2242             !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
2243                                  lo->ldo_dir_def_stripe_offset)) {
2244                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2245                 int def_stripe_count = lo->ldo_dir_def_stripenr;
2246
2247                 if (info->lti_ea_store_size < sizeof(*v1)) {
2248                         rc = lod_ea_store_resize(info, sizeof(*v1));
2249                         if (rc != 0)
2250                                 RETURN(rc);
2251                         v1 = info->lti_ea_store;
2252                 }
2253
2254                 memset(v1, 0, sizeof(*v1));
2255                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2256                 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
2257                 v1->lum_stripe_offset =
2258                                 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
2259                 v1->lum_hash_type =
2260                                 cpu_to_le32(lo->ldo_dir_def_hash_type);
2261
2262                 info->lti_buf.lb_buf = v1;
2263                 info->lti_buf.lb_len = sizeof(*v1);
2264                 if (declare)
2265                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2266                                                        XATTR_NAME_DEFAULT_LMV,
2267                                                        0, th);
2268                 else
2269                         rc = lod_xattr_set_default_lmv_on_dir(env, dt,
2270                                                   &info->lti_buf,
2271                                                   XATTR_NAME_DEFAULT_LMV, 0,
2272                                                   th, BYPASS_CAPA);
2273                 if (rc != 0)
2274                         RETURN(rc);
2275         }
2276
2277         /* Transfer default LOV striping from the parent */
2278         if (lo->ldo_striping_cached &&
2279             !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2280                                  lo->ldo_def_stripenr,
2281                                  lo->ldo_def_stripe_offset)) {
2282                 struct lov_user_md_v3 *v3 = info->lti_ea_store;
2283
2284                 if (info->lti_ea_store_size < sizeof(*v3)) {
2285                         rc = lod_ea_store_resize(info, sizeof(*v3));
2286                         if (rc != 0)
2287                                 RETURN(rc);
2288                         v3 = info->lti_ea_store;
2289                 }
2290
2291                 memset(v3, 0, sizeof(*v3));
2292                 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2293                 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
2294                 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
2295                 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
2296                 if (lo->ldo_pool != NULL)
2297                         strlcpy(v3->lmm_pool_name, lo->ldo_pool,
2298                                 sizeof(v3->lmm_pool_name));
2299
2300                 info->lti_buf.lb_buf = v3;
2301                 info->lti_buf.lb_len = sizeof(*v3);
2302
2303                 if (declare)
2304                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2305                                                        XATTR_NAME_LOV, 0, th);
2306                 else
2307                         rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
2308                                                       XATTR_NAME_LOV, 0, th,
2309                                                       BYPASS_CAPA);
2310                 if (rc != 0)
2311                         RETURN(rc);
2312         }
2313
2314         RETURN(0);
2315 }
2316
/**
 * Declaration-phase entry point for directory striping creation.
 *
 * Forwards all arguments to lod_dir_striping_create_internal() with the
 * "declare" flag set, so only the declare variants of the xattr setters
 * are invoked.
 *
 * \retval 0 on success, negative errno from the internal helper otherwise
 */
static int lod_declare_dir_striping_create(const struct lu_env *env,
                                           struct dt_object *dt,
                                           struct lu_attr *attr,
                                           struct dt_object_format *dof,
                                           struct thandle *th)
{
        const bool declare_only = true;

        return lod_dir_striping_create_internal(env, dt, attr, dof, th,
                                                declare_only);
}
2325
/**
 * Execution-phase entry point for directory striping creation.
 *
 * Forwards all arguments to lod_dir_striping_create_internal() with the
 * "declare" flag cleared, so the xattrs are actually written.
 *
 * \retval 0 on success, negative errno from the internal helper otherwise
 */
static int lod_dir_striping_create(const struct lu_env *env,
                                   struct dt_object *dt,
                                   struct lu_attr *attr,
                                   struct dt_object_format *dof,
                                   struct thandle *th)
{
        const bool declare_only = false;

        return lod_dir_striping_create_internal(env, dt, attr, dof, th,
                                                declare_only);
}
2334
2335 static int lod_xattr_set(const struct lu_env *env,
2336                          struct dt_object *dt, const struct lu_buf *buf,
2337                          const char *name, int fl, struct thandle *th,
2338                          struct lustre_capa *capa)
2339 {
2340         struct dt_object        *next = dt_object_child(dt);
2341         int                      rc;
2342         ENTRY;
2343
2344         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2345             strcmp(name, XATTR_NAME_LMV) == 0) {
2346                 struct lmv_mds_md_v1 *lmm = buf->lb_buf;
2347
2348                 if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
2349                                                 LMV_HASH_FLAG_MIGRATION)
2350                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2351                 else
2352                         rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
2353
2354                 RETURN(rc);
2355         }
2356
2357         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2358             strcmp(name, XATTR_NAME_LOV) == 0) {
2359                 /* default LOVEA */
2360                 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
2361                 RETURN(rc);
2362         } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2363                    strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
2364                 /* default LMVEA */
2365                 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
2366                                                       th, capa);
2367                 RETURN(rc);
2368         } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2369                    !strcmp(name, XATTR_NAME_LOV)) {
2370                 /* in case of lov EA swap, just set it
2371                  * if not, it is a replay so check striping match what we
2372                  * already have during req replay, declare_xattr_set()
2373                  * defines striping, then create() does the work
2374                 */
2375                 if (fl & LU_XATTR_REPLACE) {
2376                         /* free stripes, then update disk */
2377                         lod_object_free_striping(env, lod_dt_obj(dt));
2378                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2379                 } else {
2380                         rc = lod_striping_create(env, dt, NULL, NULL, th);
2381                 }
2382                 RETURN(rc);
2383         }
2384
2385         /* then all other xattr */
2386         rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
2387
2388         RETURN(rc);
2389 }
2390
/**
 * Declare removal of an extended attribute.
 *
 * Simply forwards the declaration to the object below LOD.
 */
static int lod_declare_xattr_del(const struct lu_env *env,
                                 struct dt_object *dt, const char *name,
                                 struct thandle *th)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_declare_xattr_del(env, next, name, th);
}
2397
2398 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
2399                          const char *name, struct thandle *th,
2400                          struct lustre_capa *capa)
2401 {
2402         if (!strcmp(name, XATTR_NAME_LOV))
2403                 lod_object_free_striping(env, lod_dt_obj(dt));
2404         return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
2405 }
2406
/**
 * List extended attributes.
 *
 * LOD adds nothing of its own here; the request is passed to the object
 * below.
 */
static int lod_xattr_list(const struct lu_env *env,
                          struct dt_object *dt, struct lu_buf *buf,
                          struct lustre_capa *capa)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_xattr_list(env, next, buf, capa);
}
2413
2414 int lod_object_set_pool(struct lod_object *o, char *pool)
2415 {
2416         int len;
2417
2418         if (o->ldo_pool) {
2419                 len = strlen(o->ldo_pool);
2420                 OBD_FREE(o->ldo_pool, len + 1);
2421                 o->ldo_pool = NULL;
2422         }
2423         if (pool) {
2424                 len = strlen(pool);
2425                 OBD_ALLOC(o->ldo_pool, len + 1);
2426                 if (o->ldo_pool == NULL)
2427                         return -ENOMEM;
2428                 strcpy(o->ldo_pool, pool);
2429         }
2430         return 0;
2431 }
2432
2433 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
2434 {
2435         return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
2436 }
2437
2438
2439 static int lod_cache_parent_lov_striping(const struct lu_env *env,
2440                                          struct lod_object *lp)
2441 {
2442         struct lod_thread_info  *info = lod_env_info(env);
2443         struct lov_user_md_v1   *v1 = NULL;
2444         struct lov_user_md_v3   *v3 = NULL;
2445         int                      rc;
2446         ENTRY;
2447
2448         /* called from MDD without parent being write locked,
2449          * lock it here */
2450         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2451         rc = lod_get_lov_ea(env, lp);
2452         if (rc < 0)
2453                 GOTO(unlock, rc);
2454
2455         if (rc < sizeof(struct lov_user_md)) {
2456                 /* don't lookup for non-existing or invalid striping */
2457                 lp->ldo_def_striping_set = 0;
2458                 lp->ldo_striping_cached = 1;
2459                 lp->ldo_def_stripe_size = 0;
2460                 lp->ldo_def_stripenr = 0;
2461                 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
2462                 GOTO(unlock, rc = 0);
2463         }
2464
2465         rc = 0;
2466         v1 = info->lti_ea_store;
2467         if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) {
2468                 lustre_swab_lov_user_md_v1(v1);
2469         } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
2470                 v3 = (struct lov_user_md_v3 *)v1;
2471                 lustre_swab_lov_user_md_v3(v3);
2472         }
2473
2474         if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
2475                 GOTO(unlock, rc = 0);
2476
2477         if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
2478                 GOTO(unlock, rc = 0);
2479
2480         CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
2481                PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
2482                (int)v1->lmm_stripe_count,
2483                (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
2484
2485         lp->ldo_def_stripenr = v1->lmm_stripe_count;
2486         lp->ldo_def_stripe_size = v1->lmm_stripe_size;
2487         lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
2488         lp->ldo_striping_cached = 1;
2489         lp->ldo_def_striping_set = 1;
2490         if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
2491                 /* XXX: sanity check here */
2492                 v3 = (struct lov_user_md_v3 *) v1;
2493                 if (v3->lmm_pool_name[0])
2494                         lod_object_set_pool(lp, v3->lmm_pool_name);
2495         }
2496         EXIT;
2497 unlock:
2498         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2499         return rc;
2500 }
2501
2502
2503 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
2504                                          struct lod_object *lp)
2505 {
2506         struct lod_thread_info  *info = lod_env_info(env);
2507         struct lmv_user_md_v1   *v1 = NULL;
2508         int                      rc;
2509         ENTRY;
2510
2511         /* called from MDD without parent being write locked,
2512          * lock it here */
2513         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2514         rc = lod_get_default_lmv_ea(env, lp);
2515         if (rc < 0)
2516                 GOTO(unlock, rc);
2517
2518         if (rc < sizeof(struct lmv_user_md)) {
2519                 /* don't lookup for non-existing or invalid striping */
2520                 lp->ldo_dir_def_striping_set = 0;
2521                 lp->ldo_dir_striping_cached = 1;
2522                 lp->ldo_dir_def_stripenr = 0;
2523                 lp->ldo_dir_def_stripe_offset =
2524                                         (typeof(v1->lum_stripe_offset))(-1);
2525                 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
2526                 GOTO(unlock, rc = 0);
2527         }
2528
2529         rc = 0;
2530         v1 = info->lti_ea_store;
2531
2532         lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
2533         lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
2534         lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
2535         lp->ldo_dir_def_striping_set = 1;
2536         lp->ldo_dir_striping_cached = 1;
2537
2538         EXIT;
2539 unlock:
2540         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2541         return rc;
2542 }
2543
2544 static int lod_cache_parent_striping(const struct lu_env *env,
2545                                      struct lod_object *lp,
2546                                      umode_t child_mode)
2547 {
2548         int rc = 0;
2549         ENTRY;
2550
2551         rc = lod_load_striping(env, lp);
2552         if (rc != 0)
2553                 RETURN(rc);
2554
2555         if (!lp->ldo_striping_cached) {
2556                 /* we haven't tried to get default striping for
2557                  * the directory yet, let's cache it in the object */
2558                 rc = lod_cache_parent_lov_striping(env, lp);
2559                 if (rc != 0)
2560                         RETURN(rc);
2561         }
2562
2563         if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
2564                 rc = lod_cache_parent_lmv_striping(env, lp);
2565
2566         RETURN(rc);
2567 }
2568
2569 /**
2570  * used to transfer default striping data to the object being created
2571  */
2572 static void lod_ah_init(const struct lu_env *env,
2573                         struct dt_allocation_hint *ah,
2574                         struct dt_object *parent,
2575                         struct dt_object *child,
2576                         umode_t child_mode)
2577 {
2578         struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
2579         struct dt_object  *nextp = NULL;
2580         struct dt_object  *nextc;
2581         struct lod_object *lp = NULL;
2582         struct lod_object *lc;
2583         struct lov_desc   *desc;
2584         int               rc;
2585         ENTRY;
2586
2587         LASSERT(child);
2588
2589         if (likely(parent)) {
2590                 nextp = dt_object_child(parent);
2591                 lp = lod_dt_obj(parent);
2592                 rc = lod_load_striping(env, lp);
2593                 if (rc != 0)
2594                         return;
2595         }
2596
2597         nextc = dt_object_child(child);
2598         lc = lod_dt_obj(child);
2599
2600         LASSERT(lc->ldo_stripenr == 0);
2601         LASSERT(lc->ldo_stripe == NULL);
2602
2603         /*
2604          * local object may want some hints
2605          * in case of late striping creation, ->ah_init()
2606          * can be called with local object existing
2607          */
2608         if (!dt_object_exists(nextc) || dt_object_remote(nextc))
2609                 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
2610                                           NULL : nextp, nextc, child_mode);
2611
2612         if (S_ISDIR(child_mode)) {
2613                 if (lc->ldo_dir_stripe == NULL) {
2614                         OBD_ALLOC_PTR(lc->ldo_dir_stripe);
2615                         if (lc->ldo_dir_stripe == NULL)
2616                                 return;
2617                 }
2618
2619                 if (lp->ldo_dir_stripe == NULL) {
2620                         OBD_ALLOC_PTR(lp->ldo_dir_stripe);
2621                         if (lp->ldo_dir_stripe == NULL)
2622                                 return;
2623                 }
2624
2625                 rc = lod_cache_parent_striping(env, lp, child_mode);
2626                 if (rc != 0)
2627                         return;
2628
2629                 /* transfer defaults to new directory */
2630                 if (lp->ldo_striping_cached) {
2631                         if (lp->ldo_pool)
2632                                 lod_object_set_pool(lc, lp->ldo_pool);
2633                         lc->ldo_def_stripenr = lp->ldo_def_stripenr;
2634                         lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
2635                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2636                         lc->ldo_striping_cached = 1;
2637                         lc->ldo_def_striping_set = 1;
2638                         CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
2639                                (int)lc->ldo_def_stripe_size,
2640                                (int)lc->ldo_def_stripe_offset,
2641                                (int)lc->ldo_def_stripenr);
2642                 }
2643
2644                 /* transfer dir defaults to new directory */
2645                 if (lp->ldo_dir_striping_cached) {
2646                         lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
2647                         lc->ldo_dir_def_stripe_offset =
2648                                                   lp->ldo_dir_def_stripe_offset;
2649                         lc->ldo_dir_def_hash_type =
2650                                                   lp->ldo_dir_def_hash_type;
2651                         lc->ldo_dir_striping_cached = 1;
2652                         lc->ldo_dir_def_striping_set = 1;
2653                         CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
2654                                (int)lc->ldo_dir_def_stripenr,
2655                                (int)lc->ldo_dir_def_stripe_offset,
2656                                lc->ldo_dir_def_hash_type);
2657                 }
2658
2659                 /* It should always honour the specified stripes */
2660                 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
2661                         const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
2662
2663                         rc = lod_verify_md_striping(d, lum1);
2664                         if (rc == 0 &&
2665                                 le32_to_cpu(lum1->lum_stripe_count) > 1) {
2666                                 /* Directory will be striped only if
2667                                  * stripe_count > 1 */
2668                                 lc->ldo_stripenr =
2669                                         le32_to_cpu(lum1->lum_stripe_count);
2670                                 lc->ldo_dir_stripe_offset =
2671                                         le32_to_cpu(lum1->lum_stripe_offset);
2672                                 lc->ldo_dir_hash_type =
2673                                         le32_to_cpu(lum1->lum_hash_type);
2674                                 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
2675                                        lc->ldo_stripenr,
2676                                        (int)lc->ldo_dir_stripe_offset);
2677                         }
2678                 /* then check whether there is default stripes from parent */
2679                 } else if (lp->ldo_dir_def_striping_set) {
2680                         /* If there are default dir stripe from parent */
2681                         lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
2682                         lc->ldo_dir_stripe_offset =
2683                                         lp->ldo_dir_def_stripe_offset;
2684                         lc->ldo_dir_hash_type =
2685                                         lp->ldo_dir_def_hash_type;
2686                         CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
2687                                lc->ldo_stripenr,
2688                                (int)lc->ldo_dir_stripe_offset);
2689                 } else {
2690                         /* set default stripe for this directory */
2691                         lc->ldo_stripenr = 0;
2692                         lc->ldo_dir_stripe_offset = -1;
2693                 }
2694
2695                 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
2696                        lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
2697
2698                 goto out;
2699         }
2700
2701         /*
2702          * if object is going to be striped over OSTs, transfer default
2703          * striping information to the child, so that we can use it
2704          * during declaration and creation
2705          */
2706         if (!lod_object_will_be_striped(S_ISREG(child_mode),
2707                                         lu_object_fid(&child->do_lu)))
2708                 goto out;
2709         /*
2710          * try from the parent
2711          */
2712         if (likely(parent)) {
2713                 lod_cache_parent_striping(env, lp, child_mode);
2714
2715                 lc->ldo_def_stripe_offset = (__u16) -1;
2716
2717                 if (lp->ldo_def_striping_set) {
2718                         if (lp->ldo_pool)
2719                                 lod_object_set_pool(lc, lp->ldo_pool);
2720                         lc->ldo_stripenr = lp->ldo_def_stripenr;
2721                         lc->ldo_stripe_size = lp->ldo_def_stripe_size;
2722                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2723                         CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
2724                                lc->ldo_stripenr, lc->ldo_stripe_size,
2725                                lp->ldo_pool ? lp->ldo_pool : "");
2726                 }
2727         }
2728
2729         /*
2730          * if the parent doesn't provide with specific pattern, grab fs-wide one
2731          */
2732         desc = &d->lod_desc;
2733         if (lc->ldo_stripenr == 0)
2734                 lc->ldo_stripenr = desc->ld_default_stripe_count;
2735         if (lc->ldo_stripe_size == 0)
2736                 lc->ldo_stripe_size = desc->ld_default_stripe_size;
2737         CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
2738                lc->ldo_stripenr, lc->ldo_stripe_size,
2739                lc->ldo_pool ? lc->ldo_pool : "");
2740
2741 out:
2742         /* we do not cache stripe information for slave stripe, see
2743          * lod_xattr_set_lov_on_dir */
2744         if (lp != NULL && lp->ldo_dir_slave_stripe)
2745                 lod_lov_stripe_cache_clear(lp);
2746
2747         EXIT;
2748 }
2749
2750 #define ll_do_div64(aaa,bbb)    do_div((aaa), (bbb))
2751 /*
2752  * this function handles a special case when truncate was done
2753  * on a stripeless object and now striping is being created
2754  * we can't lose that size, so we have to propagate it to newly
2755  * created object
2756  */
2757 static int lod_declare_init_size(const struct lu_env *env,
2758                                  struct dt_object *dt, struct thandle *th)
2759 {
2760         struct dt_object   *next = dt_object_child(dt);
2761         struct lod_object  *lo = lod_dt_obj(dt);
2762         struct lu_attr     *attr = &lod_env_info(env)->lti_attr;
2763         uint64_t            size, offs;
2764         int                 rc, stripe;
2765         ENTRY;
2766
2767         /* XXX: we support the simplest (RAID0) striping so far */
2768         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
2769         LASSERT(lo->ldo_stripe_size > 0);
2770
2771         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
2772         LASSERT(attr->la_valid & LA_SIZE);
2773         if (rc)
2774                 RETURN(rc);
2775
2776         size = attr->la_size;
2777         if (size == 0)
2778                 RETURN(0);
2779
2780         /* ll_do_div64(a, b) returns a % b, and a = a / b */
2781         ll_do_div64(size, (__u64) lo->ldo_stripe_size);
2782         stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
2783
2784         size = size * lo->ldo_stripe_size;
2785         offs = attr->la_size;
2786         size += ll_do_div64(offs, lo->ldo_stripe_size);
2787
2788         attr->la_valid = LA_SIZE;
2789         attr->la_size = size;
2790
2791         rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
2792
2793         RETURN(rc);
2794 }
2795
2796 /**
2797  * Create declaration of striped object
2798  */
2799 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
2800                                struct lu_attr *attr,
2801                                const struct lu_buf *lovea, struct thandle *th)
2802 {
2803         struct lod_thread_info  *info = lod_env_info(env);
2804         struct dt_object        *next = dt_object_child(dt);
2805         struct lod_object       *lo = lod_dt_obj(dt);
2806         int                      rc;
2807         ENTRY;
2808
2809         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
2810                 /* failed to create striping, let's reset
2811                  * config so that others don't get confused */
2812                 lod_object_free_striping(env, lo);
2813                 GOTO(out, rc = -ENOMEM);
2814         }
2815
2816         if (!dt_object_remote(next)) {
2817                 /* choose OST and generate appropriate objects */
2818                 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
2819                 if (rc) {
2820                         /* failed to create striping, let's reset
2821                          * config so that others don't get confused */
2822                         lod_object_free_striping(env, lo);
2823                         GOTO(out, rc);
2824                 }
2825
2826                 /*
2827                  * declare storage for striping data
2828                  */
2829                 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
2830                                 lo->ldo_pool ?  LOV_MAGIC_V3 : LOV_MAGIC_V1);
2831         } else {
2832                 /* LOD can not choose OST objects for remote objects, i.e.
2833                  * stripes must be ready before that. Right now, it can only
2834                  * happen during migrate, i.e. migrate process needs to create
2835                  * remote regular file (mdd_migrate_create), then the migrate
2836                  * process will provide stripeEA. */
2837                 LASSERT(lovea != NULL);
2838                 info->lti_buf = *lovea;
2839         }
2840
2841         rc = dt_declare_xattr_set(env, next, &info->lti_buf,
2842                                   XATTR_NAME_LOV, 0, th);
2843         if (rc)
2844                 GOTO(out, rc);
2845
2846         /*
2847          * if striping is created with local object's size > 0,
2848          * we have to propagate this size to specific object
2849          * the case is possible only when local object was created previously
2850          */
2851         if (dt_object_exists(next))
2852                 rc = lod_declare_init_size(env, dt, th);
2853
2854 out:
2855         RETURN(rc);
2856 }
2857
2858 static int lod_declare_object_create(const struct lu_env *env,
2859                                      struct dt_object *dt,
2860                                      struct lu_attr *attr,
2861                                      struct dt_allocation_hint *hint,
2862                                      struct dt_object_format *dof,
2863                                      struct thandle *th)
2864 {
2865         struct dt_object   *next = dt_object_child(dt);
2866         struct lod_object  *lo = lod_dt_obj(dt);
2867         int                 rc;
2868         ENTRY;
2869
2870         LASSERT(dof);
2871         LASSERT(attr);
2872         LASSERT(th);
2873
2874         /*
2875          * first of all, we declare creation of local object
2876          */
2877         rc = dt_declare_create(env, next, attr, hint, dof, th);
2878         if (rc)
2879                 GOTO(out, rc);
2880
2881         if (dof->dof_type == DFT_SYM)
2882                 dt->do_body_ops = &lod_body_lnk_ops;
2883
2884         /*
2885          * it's lod_ah_init() who has decided the object will striped
2886          */
2887         if (dof->dof_type == DFT_REGULAR) {
2888                 /* callers don't want stripes */
2889                 /* XXX: all tricky interactions with ->ah_make_hint() decided
2890                  * to use striping, then ->declare_create() behaving differently
2891                  * should be cleaned */
2892                 if (dof->u.dof_reg.striped == 0)
2893                         lo->ldo_stripenr = 0;
2894                 if (lo->ldo_stripenr > 0)
2895                         rc = lod_declare_striped_object(env, dt, attr,
2896                                                         NULL, th);
2897         } else if (dof->dof_type == DFT_DIR) {
2898                 /* Orphan object (like migrating object) does not have
2899                  * lod_dir_stripe, see lod_ah_init */
2900                 if (lo->ldo_dir_stripe != NULL)
2901                         rc = lod_declare_dir_striping_create(env, dt, attr,
2902                                                              dof, th);
2903         }
2904 out:
2905         RETURN(rc);
2906 }
2907
2908 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
2909                         struct lu_attr *attr, struct dt_object_format *dof,
2910                         struct thandle *th)
2911 {
2912         struct lod_object *lo = lod_dt_obj(dt);
2913         int                rc = 0, i;
2914         ENTRY;
2915
2916         LASSERT(lo->ldo_striping_cached == 0);
2917
2918         /* create all underlying objects */
2919         for (i = 0; i < lo->ldo_stripenr; i++) {
2920                 LASSERT(lo->ldo_stripe[i]);
2921                 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
2922
2923                 if (rc)
2924                         break;
2925         }
2926         if (rc == 0)
2927                 rc = lod_generate_and_set_lovea(env, lo, th);
2928
2929         RETURN(rc);
2930 }
2931
2932 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
2933                              struct lu_attr *attr,
2934                              struct dt_allocation_hint *hint,
2935                              struct dt_object_format *dof, struct thandle *th)
2936 {
2937         struct dt_object   *next = dt_object_child(dt);
2938         struct lod_object  *lo = lod_dt_obj(dt);
2939         int                 rc;
2940         ENTRY;
2941
2942         /* create local object */
2943         rc = dt_create(env, next, attr, hint, dof, th);
2944         if (rc != 0)
2945                 RETURN(rc);
2946
2947         if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2948             lo->ldo_stripe && dof->u.dof_reg.striped != 0)
2949                 rc = lod_striping_create(env, dt, attr, dof, th);
2950
2951         RETURN(rc);
2952 }
2953
2954 static int lod_declare_object_destroy(const struct lu_env *env,
2955                                       struct dt_object *dt,
2956                                       struct thandle *th)
2957 {
2958         struct dt_object   *next = dt_object_child(dt);
2959         struct lod_object  *lo = lod_dt_obj(dt);
2960         struct lod_thread_info *info = lod_env_info(env);
2961         char               *stripe_name = info->lti_key;
2962         int                 rc, i;
2963         ENTRY;
2964
2965         /*
2966          * load striping information, notice we don't do this when object
2967          * is being initialized as we don't need this information till
2968          * few specific cases like destroy, chown
2969          */
2970         rc = lod_load_striping(env, lo);
2971         if (rc)
2972                 RETURN(rc);
2973
2974         /* declare destroy for all underlying objects */
2975         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2976                 rc = next->do_ops->do_index_try(env, next,
2977                                                 &dt_directory_features);
2978                 if (rc != 0)
2979                         RETURN(rc);
2980
2981                 for (i = 0; i < lo->ldo_stripenr; i++) {
2982                         rc = dt_declare_ref_del(env, next, th);
2983                         if (rc != 0)
2984                                 RETURN(rc);
2985                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2986                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2987                                 i);
2988                         rc = dt_declare_delete(env, next,
2989                                         (const struct dt_key *)stripe_name, th);
2990                         if (rc != 0)
2991                                 RETURN(rc);
2992                 }
2993         }
2994         /*
2995          * we declare destroy for the local object
2996          */
2997         rc = dt_declare_destroy(env, next, th);
2998         if (rc)
2999                 RETURN(rc);
3000
3001         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
3002                 RETURN(0);
3003
3004         /* declare destroy all striped objects */
3005         for (i = 0; i < lo->ldo_stripenr; i++) {
3006                 if (likely(lo->ldo_stripe[i] != NULL)) {
3007                         rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
3008                         if (rc != 0)
3009                                 break;
3010                 }
3011         }
3012
3013         RETURN(rc);
3014 }
3015
3016 static int lod_object_destroy(const struct lu_env *env,
3017                 struct dt_object *dt, struct thandle *th)
3018 {
3019         struct dt_object  *next = dt_object_child(dt);
3020         struct lod_object *lo = lod_dt_obj(dt);
3021         struct lod_thread_info *info = lod_env_info(env);
3022         char               *stripe_name = info->lti_key;
3023         int                rc, i;
3024         ENTRY;
3025
3026         /* destroy sub-stripe of master object */
3027         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
3028                 rc = next->do_ops->do_index_try(env, next,
3029                                                 &dt_directory_features);
3030                 if (rc != 0)
3031                         RETURN(rc);
3032
3033                 for (i = 0; i < lo->ldo_stripenr; i++) {
3034                         rc = dt_ref_del(env, next, th);
3035                         if (rc != 0)
3036                                 RETURN(rc);
3037
3038                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
3039                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
3040                                 i);
3041
3042                         CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
3043                                PFID(lu_object_fid(&dt->do_lu)), stripe_name,
3044                                PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
3045
3046                         rc = dt_delete(env, next,
3047                                        (const struct dt_key *)stripe_name,
3048                                        th, BYPASS_CAPA);
3049                         if (rc != 0)
3050                                 RETURN(rc);
3051                 }
3052         }
3053         rc = dt_destroy(env, next, th);
3054         if (rc != 0)
3055                 RETURN(rc);
3056
3057         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
3058                 RETURN(0);
3059
3060         /* destroy all striped objects */
3061         for (i = 0; i < lo->ldo_stripenr; i++) {
3062                 if (likely(lo->ldo_stripe[i] != NULL) &&
3063                     (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
3064                      i == cfs_fail_val)) {
3065                         rc = dt_destroy(env, lo->ldo_stripe[i], th);
3066                         if (rc != 0)
3067                                 break;
3068                 }
3069         }
3070
3071         RETURN(rc);
3072 }
3073
/* Forward the ref_add declaration to the child object of \a dt. */
static int lod_declare_ref_add(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_declare_ref_add(env, child, th);
}
3079
/* Forward ref_add to the child object of \a dt. */
static int lod_ref_add(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_ref_add(env, child, th);
}
3085
/* Forward the ref_del declaration to the child object of \a dt. */
static int lod_declare_ref_del(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_declare_ref_del(env, child, th);
}
3091
/* Forward ref_del to the child object of \a dt. */
static int lod_ref_del(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_ref_del(env, child, th);
}
3097
3098 static struct obd_capa *lod_capa_get(const struct lu_env *env,
3099                                      struct dt_object *dt,
3100                                      struct lustre_capa *old, __u64 opc)
3101 {
3102         return dt_capa_get(env, dt_object_child(dt), old, opc);
3103 }
3104
3105 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
3106                            __u64 start, __u64 end)
3107 {
3108         return dt_object_sync(env, dt_object_child(dt), start, end);
3109 }
3110
3111 struct lod_slave_locks  {
3112         int                     lsl_lock_count;
3113         struct lustre_handle    lsl_handle[0];
3114 };
3115
3116 static int lod_object_unlock_internal(const struct lu_env *env,
3117                                       struct dt_object *dt,
3118                                       struct ldlm_enqueue_info *einfo,
3119                                       ldlm_policy_data_t *policy)
3120 {
3121         struct lod_object       *lo = lod_dt_obj(dt);
3122         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
3123         int                     rc = 0;
3124         int                     i;
3125         ENTRY;
3126
3127         if (slave_locks == NULL)
3128                 RETURN(0);
3129
3130         for (i = 1; i < slave_locks->lsl_lock_count; i++) {
3131                 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
3132                         int     rc1;
3133
3134                         einfo->ei_cbdata = &slave_locks->lsl_handle[i];
3135                         rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
3136                                                policy);
3137                         if (rc1 < 0)
3138                                 rc = rc == 0 ? rc1 : rc;
3139                 }
3140         }
3141
3142         RETURN(rc);
3143 }
3144
3145 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
3146                              struct ldlm_enqueue_info *einfo,
3147                              union ldlm_policy_data *policy)
3148 {
3149         struct lod_object       *lo = lod_dt_obj(dt);
3150         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
3151         int                     slave_locks_size;
3152         int                     rc;
3153         ENTRY;
3154
3155         if (slave_locks == NULL)
3156                 RETURN(0);
3157
3158         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
3159                 RETURN(-ENOTDIR);
3160
3161         rc = lod_load_striping(env, lo);
3162         if (rc != 0)
3163                 RETURN(rc);
3164
3165         /* Note: for remote lock for single stripe dir, MDT will cancel
3166          * the lock by lockh directly */
3167         if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
3168                 RETURN(0);
3169
3170         /* Only cancel slave lock for striped dir */
3171         rc = lod_object_unlock_internal(env, dt, einfo, policy);
3172
3173         slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
3174                            sizeof(slave_locks->lsl_handle[0]);
3175         OBD_FREE(slave_locks, slave_locks_size);
3176         einfo->ei_cbdata = NULL;
3177
3178         RETURN(rc);
3179 }
3180
3181 static int lod_object_lock(const struct lu_env *env,
3182                            struct dt_object *dt,
3183                            struct lustre_handle *lh,
3184                            struct ldlm_enqueue_info *einfo,
3185                            union ldlm_policy_data *policy)
3186 {
3187         struct lod_object       *lo = lod_dt_obj(dt);
3188         int                     rc = 0;
3189         int                     i;
3190         int                     slave_locks_size;
3191         struct lod_slave_locks  *slave_locks = NULL;
3192         ENTRY;
3193
3194         /* remote object lock */
3195         if (!einfo->ei_enq_slave) {
3196                 LASSERT(dt_object_remote(dt));
3197                 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
3198                                       policy);
3199         }
3200
3201         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
3202                 RETURN(-ENOTDIR);
3203
3204         rc = lod_load_striping(env, lo);
3205         if (rc != 0)
3206                 RETURN(rc);
3207
3208         /* No stripes */
3209         if (lo->ldo_stripenr <= 1)
3210                 RETURN(0);
3211
3212         slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
3213                            sizeof(slave_locks->lsl_handle[0]);
3214         /* Freed in lod_object_unlock */
3215         OBD_ALLOC(slave_locks, slave_locks_size);
3216         if (slave_locks == NULL)
3217                 RETURN(-ENOMEM);
3218         slave_locks->lsl_lock_count = lo->ldo_stripenr;
3219
3220         /* striped directory lock */
3221         for (i = 1; i < lo->ldo_stripenr; i++) {
3222                 struct lustre_handle    lockh;
3223                 struct ldlm_res_id      *res_id;
3224
3225                 res_id = &lod_env_info(env)->lti_res_id;
3226                 fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
3227                                        res_id);
3228                 einfo->ei_res_id = res_id;
3229
3230                 LASSERT(lo->ldo_stripe[i]);
3231                 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
3232                                     policy);
3233                 if (rc != 0)
3234                         GOTO(out, rc);
3235                 slave_locks->lsl_handle[i] = lockh;
3236         }
3237
3238         einfo->ei_cbdata = slave_locks;
3239
3240 out:
3241         if (rc != 0 && slave_locks != NULL) {
3242                 einfo->ei_cbdata = slave_locks;
3243                 lod_object_unlock_internal(env, dt, einfo, policy);
3244                 OBD_FREE(slave_locks, slave_locks_size);
3245                 einfo->ei_cbdata = NULL;
3246         }
3247
3248         RETURN(rc);
3249 }
3250
3251 struct dt_object_operations lod_obj_ops = {
3252         .do_read_lock           = lod_object_read_lock,
3253         .do_write_lock          = lod_object_write_lock,
3254         .do_read_unlock         = lod_object_read_unlock,
3255         .do_write_unlock        = lod_object_write_unlock,
3256         .do_write_locked        = lod_object_write_locked,
3257         .do_attr_get            = lod_attr_get,
3258         .do_declare_attr_set    = lod_declare_attr_set,
3259         .do_attr_set            = lod_attr_set,
3260         .do_xattr_get           = lod_xattr_get,
3261         .do_declare_xattr_set   = lod_declare_xattr_set,
3262         .do_xattr_set           = lod_xattr_set,
3263         .do_declare_xattr_del   = lod_declare_xattr_del,
3264         .do_xattr_del           = lod_xattr_del,
3265         .do_xattr_list          = lod_xattr_list,
3266         .do_ah_init             = lod_ah_init,
3267         .do_declare_create      = lod_declare_object_create,
3268         .do_create              = lod_object_create,
3269         .do_declare_destroy     = lod_declare_object_destroy,
3270         .do_destroy             = lod_object_destroy,
3271         .do_index_try           = lod_index_try,
3272         .do_declare_ref_add     = lod_declare_ref_add,
3273         .do_ref_add             = lod_ref_add,
3274         .do_declare_ref_del     = lod_declare_ref_del,
3275         .do_ref_del             = lod_ref_del,
3276         .do_capa_get            = lod_capa_get,
3277         .do_object_sync         = lod_object_sync,
3278         .do_object_lock         = lod_object_lock,
3279         .do_object_unlock       = lod_object_unlock,
3280 };
3281
3282 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
3283                         struct lu_buf *buf, loff_t *pos,
3284                         struct lustre_capa *capa)
3285 {
3286         struct dt_object *next = dt_object_child(dt);
3287         return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
3288 }
3289
3290 static ssize_t lod_declare_write(const struct lu_env *env,
3291                                  struct dt_object *dt,
3292                                  const struct lu_buf *buf, loff_t pos,
3293                                  struct thandle *th)
3294 {
3295         return dt_declare_record_write(env, dt_object_child(dt),
3296                                        buf, pos, th);
3297 }
3298
3299 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
3300                          const struct lu_buf *buf, loff_t *pos,
3301                          struct thandle *th, struct lustre_capa *capa, int iq)
3302 {
3303         struct dt_object *next = dt_object_child(dt);
3304         LASSERT(next);
3305         return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
3306 }
3307
3308 static const struct dt_body_operations lod_body_lnk_ops = {
3309         .dbo_read               = lod_read,
3310         .dbo_declare_write      = lod_declare_write,
3311         .dbo_write              = lod_write
3312 };
3313
3314 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
3315                            const struct lu_object_conf *conf)
3316 {
3317         struct lod_device       *lod    = lu2lod_dev(lo->lo_dev);
3318         struct lu_device        *cdev   = NULL;
3319         struct lu_object        *cobj;
3320         struct lod_tgt_descs    *ltd    = NULL;
3321         struct lod_tgt_desc     *tgt;
3322         mdsno_t                  idx    = 0;
3323         int                      type   = LU_SEQ_RANGE_ANY;
3324         int                      rc;
3325         ENTRY;
3326
3327         rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
3328         if (rc != 0)
3329                 RETURN(rc);
3330
3331         if (type == LU_SEQ_RANGE_MDT &&
3332             idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
3333                 cdev = &lod->lod_child->dd_lu_dev;
3334         } else if (type == LU_SEQ_RANGE_MDT) {
3335                 ltd = &lod->lod_mdt_descs;
3336                 lod_getref(ltd);
3337         } else if (type == LU_SEQ_RANGE_OST) {
3338                 ltd = &lod->lod_ost_descs;
3339                 lod_getref(ltd);
3340         } else {
3341                 LBUG();
3342         }
3343
3344         if (ltd != NULL) {
3345                 if (ltd->ltd_tgts_size > idx &&
3346                     cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
3347                         tgt = LTD_TGT(ltd, idx);
3348
3349                         LASSERT(tgt != NULL);
3350                         LASSERT(tgt->ltd_tgt != NULL);
3351
3352                         cdev = &(tgt->ltd_tgt->dd_lu_dev);
3353                 }
3354                 lod_putref(lod, ltd);
3355         }
3356
3357         if (unlikely(cdev == NULL))
3358                 RETURN(-ENOENT);
3359
3360         cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
3361         if (unlikely(cobj == NULL))
3362                 RETURN(-ENOMEM);
3363
3364         lu_object_add(lo, cobj);
3365
3366         RETURN(0);
3367 }
3368
3369 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
3370 {
3371         int i;
3372
3373         if (lo->ldo_dir_stripe != NULL) {
3374                 OBD_FREE_PTR(lo->ldo_dir_stripe);
3375                 lo->ldo_dir_stripe = NULL;
3376         }
3377
3378         if (lo->ldo_stripe) {
3379                 LASSERT(lo->ldo_stripes_allocated > 0);
3380
3381                 for (i = 0; i < lo->ldo_stripenr; i++) {
3382                         if (lo->ldo_stripe[i])
3383                                 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
3384                 }
3385
3386                 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
3387                 OBD_FREE(lo->ldo_stripe, i);
3388                 lo->ldo_stripe = NULL;
3389                 lo->ldo_stripes_allocated = 0;
3390         }
3391         lo->ldo_stripenr = 0;
3392         lo->ldo_pattern = 0;
3393 }
3394
3395 /*
3396  * ->start is called once all slices are initialized, including header's
3397  * cache for mode (object type). using the type we can initialize ops
3398  */
3399 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
3400 {
3401         if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
3402                 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
3403         return 0;
3404 }
3405
3406 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
3407 {
3408         struct lod_object *mo = lu2lod_obj(o);
3409
3410         /*
3411          * release all underlying object pinned
3412          */
3413
3414         lod_object_free_striping(env, mo);
3415
3416         lod_object_set_pool(mo, NULL);
3417
3418         lu_object_fini(o);
3419         OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
3420 }
3421
/*
 * Release hook for the lod slice of an object; it currently does nothing.
 */
static void lod_object_release(const struct lu_env *env, struct lu_object *o)
{
        /* XXX: should everything be released here in case object
         * creation failed earlier? */
}
3427
3428 static int lod_object_print(const struct lu_env *env, void *cookie,
3429                             lu_printer_t p, const struct lu_object *l)
3430 {
3431         struct lod_object *o = lu2lod_obj((struct lu_object *) l);
3432
3433         return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
3434 }
3435
3436 struct lu_object_operations lod_lu_obj_ops = {
3437         .loo_object_init        = lod_object_init,
3438         .loo_object_start       = lod_object_start,
3439         .loo_object_free        = lod_object_free,
3440         .loo_object_release     = lod_object_release,
3441         .loo_object_print       = lod_object_print,
3442 };