Whamcloud - gitweb
LU-4675 lfsck: new pattern flag for partially repaired file
[fs/lustre-release.git] / lustre / lod / lod_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2013, Intel Corporation.
27  */
28 /*
29  * lustre/lod/lod_object.c
30  *
31  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
32  */
33
34 #define DEBUG_SUBSYSTEM S_MDS
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
41
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46 #include <md_object.h>
47
48 #include "lod_internal.h"
49
/* The strings "." and ".."; neither is referenced in the visible part of
 * this file. */
50 static const char dot[] = ".";
51 static const char dotdot[] = "..";
52
/* Slab cache declared here and defined in another translation unit. */
53 extern struct kmem_cache *lod_object_kmem;
/* Forward declaration; the initialized definition is not visible in this
 * part of the file. */
54 static const struct dt_body_operations lod_body_lnk_ops;
55
56 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
57                             struct dt_rec *rec, const struct dt_key *key,
58                             struct lustre_capa *capa)
59 {
60         struct dt_object *next = dt_object_child(dt);
61         return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
62 }
63
/**
 * Implementation of dt_index_operations::dio_declare_insert
 *
 * Declares the insertion on the child object via dt_declare_insert().
 *
 * \retval		the value returned by dt_declare_insert()
 */
static int lod_declare_index_insert(const struct lu_env *env,
				    struct dt_object *dt,
				    const struct dt_rec *rec,
				    const struct dt_key *key,
				    struct thandle *handle)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_declare_insert(env, child, rec, key, handle);
}
72
/**
 * Implementation of dt_index_operations::dio_insert
 *
 * Inserts the (key, rec) pair into the child object via dt_insert(); all
 * arguments, including \a ign, are passed through untouched.
 *
 * \retval		the value returned by dt_insert()
 */
static int lod_index_insert(const struct lu_env *env,
			    struct dt_object *dt,
			    const struct dt_rec *rec,
			    const struct dt_key *key,
			    struct thandle *th,
			    struct lustre_capa *capa,
			    int ign)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_insert(env, child, rec, key, th, capa, ign);
}
83
/**
 * Implementation of dt_index_operations::dio_declare_delete
 *
 * Declares the deletion on the child object via dt_declare_delete().
 *
 * \retval		the value returned by dt_declare_delete()
 */
static int lod_declare_index_delete(const struct lu_env *env,
				    struct dt_object *dt,
				    const struct dt_key *key,
				    struct thandle *th)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_declare_delete(env, child, key, th);
}
91
/**
 * Implementation of dt_index_operations::dio_delete
 *
 * Deletes \a key from the child object via dt_delete().
 *
 * \retval		the value returned by dt_delete()
 */
static int lod_index_delete(const struct lu_env *env,
			    struct dt_object *dt,
			    const struct dt_key *key,
			    struct thandle *th,
			    struct lustre_capa *capa)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_delete(env, child, key, th, capa);
}
100
101 static struct dt_it *lod_it_init(const struct lu_env *env,
102                                  struct dt_object *dt, __u32 attr,
103                                  struct lustre_capa *capa)
104 {
105         struct dt_object        *next = dt_object_child(dt);
106         struct lod_it           *it = &lod_env_info(env)->lti_it;
107         struct dt_it            *it_next;
108
109
110         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
111         if (IS_ERR(it_next))
112                 return it_next;
113
114         /* currently we do not use more than one iterator per thread
115          * so we store it in thread info. if at some point we need
116          * more active iterators in a single thread, we can allocate
117          * additional ones */
118         LASSERT(it->lit_obj == NULL);
119
120         it->lit_it = it_next;
121         it->lit_obj = next;
122
123         return (struct dt_it *)it;
124 }
125
/*
 * Assert that a (non-striped) lod_it is in use: both the object it iterates
 * and the lower-layer iterator must be set. The \a env argument is accepted
 * but not used by the expansion.
 */
126 #define LOD_CHECK_IT(env, it)                                   \
127 do {                                                            \
128         LASSERT((it)->lit_obj != NULL);                         \
129         LASSERT((it)->lit_it != NULL);                          \
130 } while (0)
131
132 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
133 {
134         struct lod_it *it = (struct lod_it *)di;
135
136         LOD_CHECK_IT(env, it);
137         it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
138
139         /* the iterator not in use any more */
140         it->lit_obj = NULL;
141         it->lit_it = NULL;
142 }
143
144 int lod_it_get(const struct lu_env *env, struct dt_it *di,
145                const struct dt_key *key)
146 {
147         const struct lod_it *it = (const struct lod_it *)di;
148
149         LOD_CHECK_IT(env, it);
150         return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
151 }
152
153 void lod_it_put(const struct lu_env *env, struct dt_it *di)
154 {
155         struct lod_it *it = (struct lod_it *)di;
156
157         LOD_CHECK_IT(env, it);
158         return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
159 }
160
161 int lod_it_next(const struct lu_env *env, struct dt_it *di)
162 {
163         struct lod_it *it = (struct lod_it *)di;
164
165         LOD_CHECK_IT(env, it);
166         return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
167 }
168
169 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
170 {
171         const struct lod_it *it = (const struct lod_it *)di;
172
173         LOD_CHECK_IT(env, it);
174         return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
175 }
176
177 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
178 {
179         struct lod_it *it = (struct lod_it *)di;
180
181         LOD_CHECK_IT(env, it);
182         return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
183 }
184
185 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
186                struct dt_rec *rec, __u32 attr)
187 {
188         const struct lod_it *it = (const struct lod_it *)di;
189
190         LOD_CHECK_IT(env, it);
191         return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec,
192                                                      attr);
193 }
194
195 int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di,
196                     __u32 attr)
197 {
198         const struct lod_it *it = (const struct lod_it *)di;
199
200         LOD_CHECK_IT(env, it);
201         return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it,
202                                                           attr);
203 }
204
205 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
206 {
207         const struct lod_it *it = (const struct lod_it *)di;
208
209         LOD_CHECK_IT(env, it);
210         return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
211 }
212
213 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
214 {
215         const struct lod_it *it = (const struct lod_it *)di;
216
217         LOD_CHECK_IT(env, it);
218         return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
219 }
220
221 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
222                    void *key_rec)
223 {
224         const struct lod_it *it = (const struct lod_it *)di;
225
226         LOD_CHECK_IT(env, it);
227         return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it,
228                                                          key_rec);
229 }
230
231 static struct dt_index_operations lod_index_ops = {
232         .dio_lookup             = lod_index_lookup,
233         .dio_declare_insert     = lod_declare_index_insert,
234         .dio_insert             = lod_index_insert,
235         .dio_declare_delete     = lod_declare_index_delete,
236         .dio_delete             = lod_index_delete,
237         .dio_it = {
238                 .init           = lod_it_init,
239                 .fini           = lod_it_fini,
240                 .get            = lod_it_get,
241                 .put            = lod_it_put,
242                 .next           = lod_it_next,
243                 .key            = lod_it_key,
244                 .key_size       = lod_it_key_size,
245                 .rec            = lod_it_rec,
246                 .rec_size       = lod_it_rec_size,
247                 .store          = lod_it_store,
248                 .load           = lod_it_load,
249                 .key_rec        = lod_it_key_rec,
250         }
251 };
252
253 /**
254  * Implementation of dt_index_operations:: dio_it.init
255  *
256  * This function is to initialize the iterator for striped directory,
257  * basically these lod_striped_it_xxx will just locate the stripe
258  * and call the correspondent api of its next lower layer.
259  *
260  * \param[in] env       execution environment.
261  * \param[in] dt        the striped directory object to be iterated.
262  * \param[in] attr      the attribute of iterator, mostly used to indicate
263  *                      the entry attribute in the object to be iterated.
264  * \param[in] capa      capability(useless in current implementation)
265  *
266  * \retval      initialized iterator(dt_it) if successful initialize the
267  *              iteration. lit_stripe_index will be used to indicate the
268  *              current iterate position among stripes.
269  * \retval      ERR pointer if initialization is failed.
270  */
271 static struct dt_it *lod_striped_it_init(const struct lu_env *env,
272                                          struct dt_object *dt, __u32 attr,
273                                          struct lustre_capa *capa)
274 {
275         struct lod_object       *lo = lod_dt_obj(dt);
276         struct dt_object        *next;
277         struct lod_it           *it = &lod_env_info(env)->lti_it;
278         struct dt_it            *it_next;
279         ENTRY;
280
281         LASSERT(lo->ldo_stripenr > 0);
282         next = lo->ldo_stripe[0];
283         LASSERT(next != NULL);
284         LASSERT(next->do_index_ops != NULL);
285
286         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
287         if (IS_ERR(it_next))
288                 return it_next;
289
290         /* currently we do not use more than one iterator per thread
291          * so we store it in thread info. if at some point we need
292          * more active iterators in a single thread, we can allocate
293          * additional ones */
294         LASSERT(it->lit_obj == NULL);
295
296         it->lit_stripe_index = 0;
297         it->lit_attr = attr;
298         it->lit_it = it_next;
299         it->lit_obj = dt;
300
301         return (struct dt_it *)it;
302 }
303
/*
 * Assert that a striped-directory iterator is in use and consistent with
 * its object \a lo: object and lower-layer iterator are set, the directory
 * has at least one stripe, and the current stripe index is below the stripe
 * count. The \a env argument is accepted but not used by the expansion.
 */
304 #define LOD_CHECK_STRIPED_IT(env, it, lo)                       \
305 do {                                                            \
306         LASSERT((it)->lit_obj != NULL);                         \
307         LASSERT((it)->lit_it != NULL);                          \
308         LASSERT((lo)->ldo_stripenr > 0);                        \
309         LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr);   \
310 } while (0)
311
312 /**
313  * Implementation of dt_index_operations:: dio_it.fini
314  *
315  * This function is to finish the iterator for striped directory.
316  *
317  * \param[in] env       execution environment.
318  * \param[in] di        the iterator for the striped directory
319  *
320  */
321 static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
322 {
323         struct lod_it           *it = (struct lod_it *)di;
324         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
325         struct dt_object        *next;
326
327         LOD_CHECK_STRIPED_IT(env, it, lo);
328
329         next = lo->ldo_stripe[it->lit_stripe_index];
330         LASSERT(next != NULL);
331         LASSERT(next->do_index_ops != NULL);
332
333         next->do_index_ops->dio_it.fini(env, it->lit_it);
334
335         /* the iterator not in use any more */
336         it->lit_obj = NULL;
337         it->lit_it = NULL;
338         it->lit_stripe_index = 0;
339 }
340
341 /**
342  * Implementation of dt_index_operations:: dio_it.get
343  *
344  * This function is to position the iterator with given key
345  *
346  * \param[in] env       execution environment.
347  * \param[in] di        the iterator for striped directory.
348  * \param[in] key       the key the iterator will be positioned.
349  *
350  * \retval      0 if successfully position iterator by the key.
351  * \retval      negative error if position is failed.
352  */
353 static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
354                               const struct dt_key *key)
355 {
356         const struct lod_it     *it = (const struct lod_it *)di;
357         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
358         struct dt_object        *next;
359         ENTRY;
360
361         LOD_CHECK_STRIPED_IT(env, it, lo);
362
363         next = lo->ldo_stripe[it->lit_stripe_index];
364         LASSERT(next != NULL);
365         LASSERT(next->do_index_ops != NULL);
366
367         return next->do_index_ops->dio_it.get(env, it->lit_it, key);
368 }
369
370 /**
371  * Implementation of dt_index_operations:: dio_it.put
372  *
373  * This function is supposed to be the pair of it_get, but currently do
374  * nothing. see (osd_it_ea_put or osd_index_it_put)
375  */
376 static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
377 {
378         struct lod_it           *it = (struct lod_it *)di;
379         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
380         struct dt_object        *next;
381
382         LOD_CHECK_STRIPED_IT(env, it, lo);
383
384         next = lo->ldo_stripe[it->lit_stripe_index];
385         LASSERT(next != NULL);
386         LASSERT(next->do_index_ops != NULL);
387
388         return next->do_index_ops->dio_it.put(env, it->lit_it);
389 }
390
391 /**
392  * Implementation of dt_index_operations:: dio_it.next
393  *
394  * This function is to position the iterator to the next entry, if current
395  * stripe is finished by checking the return value of next() in current
396  * stripe. it will go to next stripe. In the mean time, the sub-iterator
397  * for next stripe needs to be initialized.
398  *
399  * \param[in] env       execution environment.
400  * \param[in] di        the iterator for striped directory.
401  *
402  * \retval      0 if successfully position iterator to the next entry.
403  * \retval      negative error if position is failed.
404  */
405 static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
406 {
407         struct lod_it           *it = (struct lod_it *)di;
408         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
409         struct dt_object        *next;
410         struct dt_it            *it_next;
411         int                     rc;
412         ENTRY;
413
414         LOD_CHECK_STRIPED_IT(env, it, lo);
415
416         next = lo->ldo_stripe[it->lit_stripe_index];
417         LASSERT(next != NULL);
418         LASSERT(next->do_index_ops != NULL);
419 again:
420         rc = next->do_index_ops->dio_it.next(env, it->lit_it);
421         if (rc < 0)
422                 RETURN(rc);
423
424         if (rc == 0 && it->lit_stripe_index == 0)
425                 RETURN(rc);
426
427         if (rc == 0 && it->lit_stripe_index > 0) {
428                 struct lu_dirent *ent;
429
430                 ent = (struct lu_dirent *)lod_env_info(env)->lti_key;
431
432                 rc = next->do_index_ops->dio_it.rec(env, it->lit_it,
433                                                     (struct dt_rec *)ent,
434                                                     it->lit_attr);
435                 if (rc != 0)
436                         RETURN(rc);
437
438                 /* skip . and .. for slave stripe */
439                 if ((strncmp(ent->lde_name, ".",
440                              le16_to_cpu(ent->lde_namelen)) == 0 &&
441                      le16_to_cpu(ent->lde_namelen) == 1) ||
442                     (strncmp(ent->lde_name, "..",
443                              le16_to_cpu(ent->lde_namelen)) == 0 &&
444                      le16_to_cpu(ent->lde_namelen) == 2))
445                         goto again;
446
447                 RETURN(rc);
448         }
449
450         /* go to next stripe */
451         if (it->lit_stripe_index + 1 >= lo->ldo_stripenr)
452                 RETURN(1);
453
454         it->lit_stripe_index++;
455
456         next->do_index_ops->dio_it.put(env, it->lit_it);
457         next->do_index_ops->dio_it.fini(env, it->lit_it);
458
459         rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
460         if (rc != 0)
461                 RETURN(rc);
462
463         next = lo->ldo_stripe[it->lit_stripe_index];
464         LASSERT(next != NULL);
465         LASSERT(next->do_index_ops != NULL);
466
467         it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr,
468                                                   BYPASS_CAPA);
469         if (!IS_ERR(it_next)) {
470                 it->lit_it = it_next;
471                 goto again;
472         } else {
473                 rc = PTR_ERR(it_next);
474         }
475
476         RETURN(rc);
477 }
478
479 /**
480  * Implementation of dt_index_operations:: dio_it.key
481  *
482  * This function is to get the key of the iterator at current position.
483  *
484  * \param[in] env       execution environment.
485  * \param[in] di        the iterator for striped directory.
486  *
487  * \retval      key(dt_key) if successfully get the key.
488  * \retval      negative error if can not get the key.
489  */
490 static struct dt_key *lod_striped_it_key(const struct lu_env *env,
491                                          const struct dt_it *di)
492 {
493         const struct lod_it     *it = (const struct lod_it *)di;
494         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
495         struct dt_object        *next;
496
497         LOD_CHECK_STRIPED_IT(env, it, lo);
498
499         next = lo->ldo_stripe[it->lit_stripe_index];
500         LASSERT(next != NULL);
501         LASSERT(next->do_index_ops != NULL);
502
503         return next->do_index_ops->dio_it.key(env, it->lit_it);
504 }
505
506 /**
507  * Implementation of dt_index_operations:: dio_it.key_size
508  *
509  * This function is to get the key_size of current key.
510  *
511  * \param[in] env       execution environment.
512  * \param[in] di        the iterator for striped directory.
513  *
514  * \retval      key_size if successfully get the key_size.
515  * \retval      negative error if can not get the key_size.
516  */
517 static int lod_striped_it_key_size(const struct lu_env *env,
518                                    const struct dt_it *di)
519 {
520         struct lod_it           *it = (struct lod_it *)di;
521         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
522         struct dt_object        *next;
523
524         LOD_CHECK_STRIPED_IT(env, it, lo);
525
526         next = lo->ldo_stripe[it->lit_stripe_index];
527         LASSERT(next != NULL);
528         LASSERT(next->do_index_ops != NULL);
529
530         return next->do_index_ops->dio_it.key_size(env, it->lit_it);
531 }
532
533 /**
534  * Implementation of dt_index_operations:: dio_it.rec
535  *
536  * This function is to get the record at current position.
537  *
538  * \param[in] env       execution environment.
539  * \param[in] di        the iterator for striped directory.
540  * \param[in] attr      the attribute of iterator, mostly used to indicate
541  *                      the entry attribute in the object to be iterated.
542  * \param[out] rec      hold the return record.
543  *
544  * \retval      0 if successfully get the entry.
545  * \retval      negative error if can not get entry.
546  */
547 static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
548                               struct dt_rec *rec, __u32 attr)
549 {
550         const struct lod_it     *it = (const struct lod_it *)di;
551         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
552         struct dt_object        *next;
553
554         LOD_CHECK_STRIPED_IT(env, it, lo);
555
556         next = lo->ldo_stripe[it->lit_stripe_index];
557         LASSERT(next != NULL);
558         LASSERT(next->do_index_ops != NULL);
559
560         return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
561 }
562
563 /**
564  * Implementation of dt_index_operations:: dio_it.rec_size
565  *
566  * This function is to get the record_size at current record.
567  *
568  * \param[in] env       execution environment.
569  * \param[in] di        the iterator for striped directory.
570  * \param[in] attr      the attribute of iterator, mostly used to indicate
571  *                      the entry attribute in the object to be iterated.
572  *
573  * \retval      rec_size if successfully get the entry size.
574  * \retval      negative error if can not get entry size.
575  */
576 static int lod_striped_it_rec_size(const struct lu_env *env,
577                                    const struct dt_it *di, __u32 attr)
578 {
579         struct lod_it           *it = (struct lod_it *)di;
580         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
581         struct dt_object        *next;
582
583         LOD_CHECK_STRIPED_IT(env, it, lo);
584
585         next = lo->ldo_stripe[it->lit_stripe_index];
586         LASSERT(next != NULL);
587         LASSERT(next->do_index_ops != NULL);
588
589         return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
590 }
591
592 /**
593  * Implementation of dt_index_operations:: dio_it.store
594  *
595  * This function will a cookie for current position of the iterator head,
596  * so that user can use this cookie to load/start the iterator next time.
597  *
598  * \param[in] env       execution environment.
599  * \param[in] di        the iterator for striped directory.
600  *
601  * \retval      the cookie.
602  */
603 static __u64 lod_striped_it_store(const struct lu_env *env,
604                                   const struct dt_it *di)
605 {
606         const struct lod_it     *it = (const struct lod_it *)di;
607         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
608         struct dt_object        *next;
609
610         LOD_CHECK_STRIPED_IT(env, it, lo);
611
612         next = lo->ldo_stripe[it->lit_stripe_index];
613         LASSERT(next != NULL);
614         LASSERT(next->do_index_ops != NULL);
615
616         return next->do_index_ops->dio_it.store(env, it->lit_it);
617 }
618
619 /**
620  * Implementation of dt_index_operations:: dio_it.load
621  *
622  * This function will position the iterator with the given hash(usually
623  * get from store),
624  *
625  * \param[in] env       execution environment.
626  * \param[in] di        the iterator for striped directory.
627  * \param[in] hash      the given hash.
628  *
629  * \retval      >0 if successfuly load the iterator to the given position.
630  * \retval      <0 if load is failed.
631  */
632 static int lod_striped_it_load(const struct lu_env *env,
633                                const struct dt_it *di, __u64 hash)
634 {
635         const struct lod_it     *it = (const struct lod_it *)di;
636         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
637         struct dt_object        *next;
638
639         LOD_CHECK_STRIPED_IT(env, it, lo);
640
641         next = lo->ldo_stripe[it->lit_stripe_index];
642         LASSERT(next != NULL);
643         LASSERT(next->do_index_ops != NULL);
644
645         return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
646 }
647
648 static struct dt_index_operations lod_striped_index_ops = {
649         .dio_lookup             = lod_index_lookup,
650         .dio_declare_insert     = lod_declare_index_insert,
651         .dio_insert             = lod_index_insert,
652         .dio_declare_delete     = lod_declare_index_delete,
653         .dio_delete             = lod_index_delete,
654         .dio_it = {
655                 .init           = lod_striped_it_init,
656                 .fini           = lod_striped_it_fini,
657                 .get            = lod_striped_it_get,
658                 .put            = lod_striped_it_put,
659                 .next           = lod_striped_it_next,
660                 .key            = lod_striped_it_key,
661                 .key_size       = lod_striped_it_key_size,
662                 .rec            = lod_striped_it_rec,
663                 .rec_size       = lod_striped_it_rec_size,
664                 .store          = lod_striped_it_store,
665                 .load           = lod_striped_it_load,
666         }
667 };
668
669 /**
670  * Implementation of dt_object_operations:: do_index_try
671  *
672  * This function will try to initialize the index api pointer for the
673  * given object, usually it the entry point of the index api. i.e.
674  * the index object should be initialized in index_try, then start
675  * using index api. For striped directory, it will try to initialize
676  * all of its sub_stripes.
677  *
678  * \param[in] env       execution environment.
679  * \param[in] dt        the index object to be initialized.
680  * \param[in] feat      the features of this object, for example fixed or
681  *                      variable key size etc.
682  *
683  * \retval      >0 if the initialization is successful.
684  * \retval      <0 if the initialization is failed.
685  */
686 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
687                          const struct dt_index_features *feat)
688 {
689         struct lod_object       *lo = lod_dt_obj(dt);
690         struct dt_object        *next = dt_object_child(dt);
691         int                     rc;
692         ENTRY;
693
694         LASSERT(next->do_ops);
695         LASSERT(next->do_ops->do_index_try);
696
697         rc = lod_load_striping_locked(env, lo);
698         if (rc != 0)
699                 RETURN(rc);
700
701         rc = next->do_ops->do_index_try(env, next, feat);
702         if (rc != 0)
703                 RETURN(rc);
704
705         if (lo->ldo_stripenr > 0) {
706                 int i;
707
708                 for (i = 0; i < lo->ldo_stripenr; i++) {
709                         if (dt_object_exists(lo->ldo_stripe[i]) == 0)
710                                 continue;
711                         rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
712                                                 lo->ldo_stripe[i], feat);
713                         if (rc != 0)
714                                 RETURN(rc);
715                 }
716                 dt->do_index_ops = &lod_striped_index_ops;
717         } else {
718                 dt->do_index_ops = &lod_index_ops;
719         }
720
721         RETURN(rc);
722 }
723
/* Take the read lock of the child object with the given \a role. */
static void lod_object_read_lock(const struct lu_env *env,
				 struct dt_object *dt, unsigned role)
{
	struct dt_object *child = dt_object_child(dt);

	dt_read_lock(env, child, role);
}
729
/* Take the write lock of the child object with the given \a role. */
static void lod_object_write_lock(const struct lu_env *env,
				  struct dt_object *dt, unsigned role)
{
	struct dt_object *child = dt_object_child(dt);

	dt_write_lock(env, child, role);
}
735
/* Drop the read lock of the child object. */
static void lod_object_read_unlock(const struct lu_env *env,
				   struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	dt_read_unlock(env, child);
}
741
/* Drop the write lock of the child object. */
static void lod_object_write_unlock(const struct lu_env *env,
				    struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	dt_write_unlock(env, child);
}
747
/* Return what dt_write_locked() reports for the child object. */
static int lod_object_write_locked(const struct lu_env *env,
				   struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_write_locked(env, child);
}
753
754 static int lod_attr_get(const struct lu_env *env,
755                         struct dt_object *dt,
756                         struct lu_attr *attr,
757                         struct lustre_capa *capa)
758 {
759         struct lod_object *lo = lod_dt_obj(dt);
760         int i;
761         int rc;
762         ENTRY;
763
764         rc = dt_attr_get(env, dt_object_child(dt), attr, capa);
765         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr) || rc != 0)
766                 RETURN(rc);
767
768         rc = lod_load_striping_locked(env, lo);
769         if (rc)
770                 RETURN(rc);
771
772         if (lo->ldo_stripenr == 0)
773                 RETURN(rc);
774
775         attr->la_nlink = 2;
776         attr->la_size = 0;
777         for (i = 0; i < lo->ldo_stripenr; i++) {
778                 struct lu_attr *sub_attr = &lod_env_info(env)->lti_attr;
779
780                 LASSERT(lo->ldo_stripe[i]);
781                 if (dt_object_exists(lo->ldo_stripe[i]))
782                         continue;
783
784                 rc = dt_attr_get(env, lo->ldo_stripe[i], sub_attr, capa);
785                 if (rc != 0)
786                         break;
787
788                 /* -2 for . and .. on each stripe */
789                 if (sub_attr->la_valid & LA_NLINK && attr->la_valid & LA_NLINK)
790                         attr->la_nlink += sub_attr->la_nlink - 2;
791                 if (sub_attr->la_valid & LA_SIZE && attr->la_valid & LA_SIZE)
792                         attr->la_size += sub_attr->la_size;
793
794                 if (sub_attr->la_valid & LA_ATIME &&
795                     attr->la_valid & LA_ATIME &&
796                     attr->la_atime < sub_attr->la_atime)
797                         attr->la_atime = sub_attr->la_atime;
798
799                 if (sub_attr->la_valid & LA_CTIME &&
800                     attr->la_valid & LA_CTIME &&
801                     attr->la_ctime < sub_attr->la_ctime)
802                         attr->la_ctime = sub_attr->la_ctime;
803
804                 if (sub_attr->la_valid & LA_MTIME &&
805                     attr->la_valid & LA_MTIME &&
806                     attr->la_mtime < sub_attr->la_mtime)
807                         attr->la_mtime = sub_attr->la_mtime;
808         }
809
810         CDEBUG(D_INFO, DFID" stripe_count %d nlink %u size "LPU64"\n",
811                PFID(lu_object_fid(&dt->do_lu)), lo->ldo_stripenr,
812                attr->la_nlink, attr->la_size);
813
814         RETURN(rc);
815 }
816
817 /**
818  * Mark all of sub-stripes dead of the striped directory.
819  **/
820 static int lod_mark_dead_object(const struct lu_env *env,
821                                 struct dt_object *dt,
822                                 struct thandle *handle,
823                                 bool declare)
824 {
825         struct lod_object       *lo = lod_dt_obj(dt);
826         struct lmv_mds_md_v1    *lmv;
827         __u32                   dead_hash_type;
828         int                     rc;
829         int                     i;
830
831         ENTRY;
832
833         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
834                 RETURN(0);
835
836         rc = lod_load_striping_locked(env, lo);
837         if (rc != 0)
838                 RETURN(rc);
839
840         if (lo->ldo_stripenr == 0)
841                 RETURN(0);
842
843         rc = lod_get_lmv_ea(env, lo);
844         if (rc <= 0)
845                 RETURN(rc);
846
847         lmv = lod_env_info(env)->lti_ea_store;
848         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
849         dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD;
850         lmv->lmv_hash_type = cpu_to_le32(dead_hash_type);
851         for (i = 0; i < lo->ldo_stripenr; i++) {
852                 struct lu_buf buf;
853
854                 lmv->lmv_master_mdt_index = i;
855                 buf.lb_buf = lmv;
856                 buf.lb_len = sizeof(*lmv);
857                 if (declare) {
858                         rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf,
859                                                   XATTR_NAME_LMV,
860                                                   LU_XATTR_REPLACE, handle);
861                 } else {
862                         rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf,
863                                           XATTR_NAME_LMV, LU_XATTR_REPLACE,
864                                           handle, BYPASS_CAPA);
865                 }
866                 if (rc != 0)
867                         break;
868         }
869
870         RETURN(rc);
871 }
872
873 static int lod_declare_attr_set(const struct lu_env *env,
874                                 struct dt_object *dt,
875                                 const struct lu_attr *attr,
876                                 struct thandle *handle)
877 {
878         struct dt_object  *next = dt_object_child(dt);
879         struct lod_object *lo = lod_dt_obj(dt);
880         int                rc, i;
881         ENTRY;
882
883         /* Set dead object on all other stripes */
884         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
885             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
886                 rc = lod_mark_dead_object(env, dt, handle, true);
887                 RETURN(rc);
888         }
889
890         /*
891          * declare setattr on the local object
892          */
893         rc = dt_declare_attr_set(env, next, attr, handle);
894         if (rc)
895                 RETURN(rc);
896
897         /* osp_declare_attr_set() ignores all attributes other than
898          * UID, GID, and size, and osp_attr_set() ignores all but UID
899          * and GID.  Declaration of size attr setting happens through
900          * lod_declare_init_size(), and not through this function.
901          * Therefore we need not load striping unless ownership is
902          * changing.  This should save memory and (we hope) speed up
903          * rename(). */
904         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
905                 if (!(attr->la_valid & (LA_UID | LA_GID)))
906                         RETURN(rc);
907
908                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
909                         RETURN(0);
910         } else {
911                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
912                                         LA_ATIME | LA_MTIME | LA_CTIME)))
913                         RETURN(rc);
914         }
915         /*
916          * load striping information, notice we don't do this when object
917          * is being initialized as we don't need this information till
918          * few specific cases like destroy, chown
919          */
920         rc = lod_load_striping(env, lo);
921         if (rc)
922                 RETURN(rc);
923
924         if (lo->ldo_stripenr == 0)
925                 RETURN(0);
926
927         /*
928          * if object is striped declare changes on the stripes
929          */
930         LASSERT(lo->ldo_stripe);
931         for (i = 0; i < lo->ldo_stripenr; i++) {
932                 if (likely(lo->ldo_stripe[i] != NULL)) {
933                         rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr,
934                                                  handle);
935                         if (rc != 0) {
936                                 CERROR("failed declaration: %d\n", rc);
937                                 break;
938                         }
939                 }
940         }
941
942         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
943             dt_object_exists(next) != 0 &&
944             dt_object_remote(next) == 0)
945                 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
946
947         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
948             dt_object_exists(next) &&
949             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
950                 struct lod_thread_info *info = lod_env_info(env);
951                 struct lu_buf *buf = &info->lti_buf;
952
953                 buf->lb_buf = info->lti_ea_store;
954                 buf->lb_len = info->lti_ea_store_size;
955                 dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
956                                      LU_XATTR_REPLACE, handle);
957         }
958
959         RETURN(rc);
960 }
961
962 static int lod_attr_set(const struct lu_env *env,
963                         struct dt_object *dt,
964                         const struct lu_attr *attr,
965                         struct thandle *handle,
966                         struct lustre_capa *capa)
967 {
968         struct dt_object        *next = dt_object_child(dt);
969         struct lod_object       *lo = lod_dt_obj(dt);
970         int                     rc, i;
971         ENTRY;
972
973         /* Set dead object on all other stripes */
974         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
975             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
976                 rc = lod_mark_dead_object(env, dt, handle, false);
977                 RETURN(rc);
978         }
979
980         /*
981          * apply changes to the local object
982          */
983         rc = dt_attr_set(env, next, attr, handle, capa);
984         if (rc)
985                 RETURN(rc);
986
987         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
988                 if (!(attr->la_valid & (LA_UID | LA_GID)))
989                         RETURN(rc);
990
991                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
992                         RETURN(0);
993         } else {
994                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
995                                         LA_ATIME | LA_MTIME | LA_CTIME)))
996                         RETURN(rc);
997         }
998
999         if (lo->ldo_stripenr == 0)
1000                 RETURN(0);
1001
1002         /*
1003          * if object is striped, apply changes to all the stripes
1004          */
1005         LASSERT(lo->ldo_stripe);
1006         for (i = 0; i < lo->ldo_stripenr; i++) {
1007                 if (likely(lo->ldo_stripe[i] != NULL)) {
1008                         if (dt_object_exists(lo->ldo_stripe[i]) == 0)
1009                                 continue;
1010
1011                         rc = dt_attr_set(env, lo->ldo_stripe[i], attr,
1012                                          handle, capa);
1013                         if (rc != 0) {
1014                                 CERROR("failed declaration: %d\n", rc);
1015                                 break;
1016                         }
1017                 }
1018         }
1019
1020         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1021             dt_object_exists(next) != 0 &&
1022             dt_object_remote(next) == 0)
1023                 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
1024
1025         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1026             dt_object_exists(next) &&
1027             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1028                 struct lod_thread_info *info = lod_env_info(env);
1029                 struct lu_buf *buf = &info->lti_buf;
1030                 struct ost_id *oi = &info->lti_ostid;
1031                 struct lu_fid *fid = &info->lti_fid;
1032                 struct lov_mds_md_v1 *lmm;
1033                 struct lov_ost_data_v1 *objs;
1034                 __u32 magic;
1035                 int rc1;
1036
1037                 rc1 = lod_get_lov_ea(env, lo);
1038                 if (rc1  <= 0)
1039                         RETURN(rc);
1040
1041                 buf->lb_buf = info->lti_ea_store;
1042                 buf->lb_len = info->lti_ea_store_size;
1043                 lmm = info->lti_ea_store;
1044                 magic = le32_to_cpu(lmm->lmm_magic);
1045                 if (magic == LOV_MAGIC_V1)
1046                         objs = &(lmm->lmm_objects[0]);
1047                 else
1048                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1049                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
1050                 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
1051                 fid->f_oid--;
1052                 fid_to_ostid(fid, oi);
1053                 ostid_cpu_to_le(oi, &objs->l_ost_oi);
1054                 dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
1055                              LU_XATTR_REPLACE, handle, BYPASS_CAPA);
1056         }
1057
1058         RETURN(rc);
1059 }
1060
1061 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
1062                          struct lu_buf *buf, const char *name,
1063                          struct lustre_capa *capa)
1064 {
1065         struct lod_thread_info  *info = lod_env_info(env);
1066         struct lod_device       *dev = lu2lod_dev(dt->do_lu.lo_dev);
1067         int                      rc, is_root;
1068         ENTRY;
1069
1070         rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
1071         if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
1072                 RETURN(rc);
1073
1074         /*
1075          * lod returns default striping on the real root of the device
1076          * this is like the root stores default striping for the whole
1077          * filesystem. historically we've been using a different approach
1078          * and store it in the config.
1079          */
1080         dt_root_get(env, dev->lod_child, &info->lti_fid);
1081         is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
1082
1083         if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
1084                 struct lov_user_md *lum = buf->lb_buf;
1085                 struct lov_desc    *desc = &dev->lod_desc;
1086
1087                 if (buf->lb_buf == NULL) {
1088                         rc = sizeof(*lum);
1089                 } else if (buf->lb_len >= sizeof(*lum)) {
1090                         lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
1091                         lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
1092                         lmm_oi_set_id(&lum->lmm_oi, 0);
1093                         lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
1094                         lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
1095                         lum->lmm_stripe_size = cpu_to_le32(
1096                                                 desc->ld_default_stripe_size);
1097                         lum->lmm_stripe_count = cpu_to_le16(
1098                                                 desc->ld_default_stripe_count);
1099                         lum->lmm_stripe_offset = cpu_to_le16(
1100                                                 desc->ld_default_stripe_offset);
1101                         rc = sizeof(*lum);
1102                 } else {
1103                         rc = -ERANGE;
1104                 }
1105         }
1106
1107         RETURN(rc);
1108 }
1109
1110 static int lod_verify_md_striping(struct lod_device *lod,
1111                                   const struct lmv_user_md_v1 *lum)
1112 {
1113         int     rc = 0;
1114         ENTRY;
1115
1116         if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
1117                 GOTO(out, rc = -EINVAL);
1118
1119         if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
1120                 GOTO(out, rc = -EINVAL);
1121 out:
1122         if (rc != 0)
1123                 CERROR("%s: invalid lmv_user_md: magic = %x, "
1124                        "stripe_offset = %d, stripe_count = %u: rc = %d\n",
1125                        lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
1126                        (int)le32_to_cpu(lum->lum_stripe_offset),
1127                        le32_to_cpu(lum->lum_stripe_count), rc);
1128         return rc;
1129 }
1130
1131 /**
1132  * Master LMVEA will be same as slave LMVEA, except
1133  * 1. different magic
1134  * 2. No lmv_stripe_fids on slave
1135  * 3. lmv_master_mdt_index on slave LMV EA will be stripe_index.
1136  */
1137 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
1138                                   const struct lmv_mds_md_v1 *master_lmv)
1139 {
1140         *slave_lmv = *master_lmv;
1141         slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
1142 }
1143
1144 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
1145                     struct lu_buf *lmv_buf)
1146 {
1147         struct lod_thread_info  *info = lod_env_info(env);
1148         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1149         struct lod_object       *lo = lod_dt_obj(dt);
1150         struct lmv_mds_md_v1    *lmm1;
1151         int                     stripe_count;
1152         int                     lmm_size;
1153         int                     type = LU_SEQ_RANGE_ANY;
1154         int                     i;
1155         int                     rc;
1156         __u32                   mdtidx;
1157         ENTRY;
1158
1159         LASSERT(lo->ldo_dir_striped != 0);
1160         LASSERT(lo->ldo_stripenr > 0);
1161         stripe_count = lo->ldo_stripenr;
1162         lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
1163         if (info->lti_ea_store_size < lmm_size) {
1164                 rc = lod_ea_store_resize(info, lmm_size);
1165                 if (rc != 0)
1166                         RETURN(rc);
1167         }
1168
1169         lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
1170         lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
1171         lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
1172         lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
1173         rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
1174                             &mdtidx, &type);
1175         if (rc != 0)
1176                 RETURN(rc);
1177
1178         lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
1179         fid_cpu_to_le(&lmm1->lmv_master_fid, lu_object_fid(&dt->do_lu));
1180         for (i = 0; i < lo->ldo_stripenr; i++) {
1181                 struct dt_object *dto;
1182
1183                 dto = lo->ldo_stripe[i];
1184                 LASSERT(dto != NULL);
1185                 fid_cpu_to_le(&lmm1->lmv_stripe_fids[i],
1186                               lu_object_fid(&dto->do_lu));
1187         }
1188
1189         lmv_buf->lb_buf = info->lti_ea_store;
1190         lmv_buf->lb_len = lmm_size;
1191         lo->ldo_dir_striping_cached = 1;
1192
1193         RETURN(rc);
1194 }
1195
1196 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
1197                            const struct lu_buf *buf)
1198 {
1199         struct lod_thread_info  *info = lod_env_info(env);
1200         struct lod_device       *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1201         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1202         struct dt_object        **stripe;
1203         union lmv_mds_md        *lmm = buf->lb_buf;
1204         struct lmv_mds_md_v1    *lmv1 = &lmm->lmv_md_v1;
1205         struct lu_fid           *fid = &info->lti_fid;
1206         int                     i;
1207         int                     rc = 0;
1208         ENTRY;
1209
1210         if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
1211                 RETURN(0);
1212
1213         if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
1214                 lo->ldo_dir_slave_stripe = 1;
1215                 RETURN(0);
1216         }
1217
1218         if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
1219                 RETURN(-EINVAL);
1220
1221         if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1)
1222                 RETURN(0);
1223
1224         LASSERT(lo->ldo_stripe == NULL);
1225         OBD_ALLOC(stripe, sizeof(stripe[0]) *
1226                   (le32_to_cpu(lmv1->lmv_stripe_count)));
1227         if (stripe == NULL)
1228                 RETURN(-ENOMEM);
1229
1230         for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
1231                 struct dt_device        *tgt_dt;
1232                 struct dt_object        *dto;
1233                 int                     type = LU_SEQ_RANGE_ANY;
1234                 __u32                   idx;
1235
1236                 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
1237                 if (!fid_is_sane(fid))
1238                         GOTO(out, rc = -ESTALE);
1239
1240                 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
1241                 if (rc != 0)
1242                         GOTO(out, rc);
1243
1244                 if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
1245                         tgt_dt = lod->lod_child;
1246                 } else {
1247                         struct lod_tgt_desc     *tgt;
1248
1249                         tgt = LTD_TGT(ltd, idx);
1250                         if (tgt == NULL)
1251                                 GOTO(out, rc = -ESTALE);
1252                         tgt_dt = tgt->ltd_tgt;
1253                 }
1254
1255                 dto = dt_locate_at(env, tgt_dt, fid,
1256                                   lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1257                                   NULL);
1258                 if (IS_ERR(dto))
1259                         GOTO(out, rc = PTR_ERR(dto));
1260
1261                 stripe[i] = dto;
1262         }
1263 out:
1264         lo->ldo_stripe = stripe;
1265         lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count);
1266         lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
1267         if (rc != 0)
1268                 lod_object_free_striping(env, lo);
1269
1270         RETURN(rc);
1271 }
1272
1273 static int lod_prep_md_striped_create(const struct lu_env *env,
1274                                       struct dt_object *dt,
1275                                       struct lu_attr *attr,
1276                                       const struct lmv_user_md_v1 *lum,
1277                                       struct dt_object_format *dof,
1278                                       struct thandle *th)
1279 {
1280         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1281         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1282         struct lod_object       *lo = lod_dt_obj(dt);
1283         struct lod_thread_info  *info = lod_env_info(env);
1284         struct dt_object        **stripe;
1285         struct lu_buf           lmv_buf;
1286         struct lu_buf           slave_lmv_buf;
1287         struct lmv_mds_md_v1    *lmm;
1288         struct lmv_mds_md_v1    *slave_lmm = NULL;
1289         int                     stripe_count;
1290         int                     *idx_array;
1291         int                     rc = 0;
1292         int                     i;
1293         int                     j;
1294         ENTRY;
1295
1296         /* The lum has been verifed in lod_verify_md_striping */
1297         LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
1298         LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
1299
1300         stripe_count = le32_to_cpu(lum->lum_stripe_count);
1301
1302         /* shrink the stripe_count to the avaible MDT count */
1303         if (stripe_count > lod->lod_remote_mdt_count + 1)
1304                 stripe_count = lod->lod_remote_mdt_count + 1;
1305
1306         OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
1307         if (stripe == NULL)
1308                 RETURN(-ENOMEM);
1309
1310         OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
1311         if (idx_array == NULL)
1312                 GOTO(out_free, rc = -ENOMEM);
1313
1314         for (i = 0; i < stripe_count; i++) {
1315                 struct lod_tgt_desc     *tgt = NULL;
1316                 struct dt_object        *dto;
1317                 struct lu_fid           fid = { 0 };
1318                 int                     idx;
1319                 struct lu_object_conf   conf = { 0 };
1320                 struct dt_device        *tgt_dt = NULL;
1321
1322                 if (i == 0) {
1323                         /* Right now, master stripe and master object are
1324                          * on the same MDT */
1325                         idx = le32_to_cpu(lum->lum_stripe_offset);
1326                         rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
1327                                            NULL);
1328                         if (rc < 0)
1329                                 GOTO(out_put, rc);
1330                         tgt_dt = lod->lod_child;
1331                         goto next;
1332                 }
1333
1334                 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
1335
1336                 for (j = 0; j < lod->lod_remote_mdt_count;
1337                      j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
1338                         bool already_allocated = false;
1339                         int k;
1340
1341                         CDEBUG(D_INFO, "try idx %d, mdt cnt %d,"
1342                                " allocated %d, last allocated %d\n", idx,
1343                                lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1344
1345                         /* Find next available target */
1346                         if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
1347                                 continue;
1348
1349                         /* check whether the idx already exists
1350                          * in current allocated array */
1351                         for (k = 0; k < i; k++) {
1352                                 if (idx_array[k] == idx) {
1353                                         already_allocated = true;
1354                                         break;
1355                                 }
1356                         }
1357
1358                         if (already_allocated)
1359                                 continue;
1360
1361                         /* check the status of the OSP */
1362                         tgt = LTD_TGT(ltd, idx);
1363                         if (tgt == NULL)
1364                                 continue;
1365
1366                         tgt_dt = tgt->ltd_tgt;
1367                         rc = dt_statfs(env, tgt_dt, NULL);
1368                         if (rc) {
1369                                 /* this OSP doesn't feel well */
1370                                 rc = 0;
1371                                 continue;
1372                         }
1373
1374                         rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
1375                         if (rc < 0) {
1376                                 rc = 0;
1377                                 continue;
1378                         }
1379
1380                         break;
1381                 }
1382
1383                 /* Can not allocate more stripes */
1384                 if (j == lod->lod_remote_mdt_count) {
1385                         CDEBUG(D_INFO, "%s: require stripes %d only get %d\n",
1386                                lod2obd(lod)->obd_name, stripe_count, i - 1);
1387                         break;
1388                 }
1389
1390                 CDEBUG(D_INFO, "idx %d, mdt cnt %d,"
1391                        " allocated %d, last allocated %d\n", idx,
1392                        lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1393
1394 next:
1395                 /* tgt_dt and fid must be ready after search avaible OSP
1396                  * in the above loop */
1397                 LASSERT(tgt_dt != NULL);
1398                 LASSERT(fid_is_sane(&fid));
1399                 conf.loc_flags = LOC_F_NEW;
1400                 dto = dt_locate_at(env, tgt_dt, &fid,
1401                                    dt->do_lu.lo_dev->ld_site->ls_top_dev,
1402                                    &conf);
1403                 if (IS_ERR(dto))
1404                         GOTO(out_put, rc = PTR_ERR(dto));
1405                 stripe[i] = dto;
1406                 idx_array[i] = idx;
1407         }
1408
1409         lo->ldo_dir_striped = 1;
1410         lo->ldo_stripe = stripe;
1411         lo->ldo_stripenr = i;
1412         lo->ldo_stripes_allocated = stripe_count;
1413
1414         if (lo->ldo_stripenr == 0)
1415                 GOTO(out_put, rc = -ENOSPC);
1416
1417         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1418         if (rc != 0)
1419                 GOTO(out_put, rc);
1420         lmm = lmv_buf.lb_buf;
1421
1422         OBD_ALLOC_PTR(slave_lmm);
1423         if (slave_lmm == NULL)
1424                 GOTO(out_put, rc = -ENOMEM);
1425
1426         lod_prep_slave_lmv_md(slave_lmm, lmm);
1427         slave_lmv_buf.lb_buf = slave_lmm;
1428         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1429
1430         if (!dt_try_as_dir(env, dt_object_child(dt)))
1431                 GOTO(out_put, rc = -EINVAL);
1432
1433         for (i = 0; i < lo->ldo_stripenr; i++) {
1434                 struct dt_object *dto = stripe[i];
1435                 char             *stripe_name = info->lti_key;
1436
1437                 rc = dt_declare_create(env, dto, attr, NULL, dof, th);
1438                 if (rc != 0)
1439                         GOTO(out_put, rc);
1440
1441                 if (!dt_try_as_dir(env, dto))
1442                         GOTO(out_put, rc = -EINVAL);
1443
1444                 rc = dt_declare_insert(env, dto,
1445                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1446                      (const struct dt_key *)dot, th);
1447                 if (rc != 0)
1448                         GOTO(out_put, rc);
1449
1450                 /* master stripe FID will be put to .. */
1451                 rc = dt_declare_insert(env, dto,
1452                      (const struct dt_rec *)lu_object_fid(&dt->do_lu),
1453                      (const struct dt_key *)dotdot, th);
1454                 if (rc != 0)
1455                         GOTO(out_put, rc);
1456
1457                 /* probably nothing to inherite */
1458                 if (lo->ldo_striping_cached &&
1459                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1460                                          lo->ldo_def_stripenr,
1461                                          lo->ldo_def_stripe_offset)) {
1462                         struct lov_user_md_v3   *v3;
1463
1464                         /* sigh, lti_ea_store has been used for lmv_buf,
1465                          * so we have to allocate buffer for default
1466                          * stripe EA */
1467                         OBD_ALLOC_PTR(v3);
1468                         if (v3 == NULL)
1469                                 GOTO(out_put, rc = -ENOMEM);
1470
1471                         memset(v3, 0, sizeof(*v3));
1472                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1473                         v3->lmm_stripe_count =
1474                                 cpu_to_le16(lo->ldo_def_stripenr);
1475                         v3->lmm_stripe_offset =
1476                                 cpu_to_le16(lo->ldo_def_stripe_offset);
1477                         v3->lmm_stripe_size =
1478                                 cpu_to_le32(lo->ldo_def_stripe_size);
1479                         if (lo->ldo_pool)
1480                                 strncpy(v3->lmm_pool_name, lo->ldo_pool,
1481                                         LOV_MAXPOOLNAME);
1482
1483                         info->lti_buf.lb_buf = v3;
1484                         info->lti_buf.lb_len = sizeof(*v3);
1485                         rc = dt_declare_xattr_set(env, dto,
1486                                                   &info->lti_buf,
1487                                                   XATTR_NAME_LOV,
1488                                                   0, th);
1489                         OBD_FREE_PTR(v3);
1490                         if (rc != 0)
1491                                 GOTO(out_put, rc);
1492                 }
1493
1494                 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1495                 rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf,
1496                                           XATTR_NAME_LMV, 0, th);
1497                 if (rc != 0)
1498                         GOTO(out_put, rc);
1499
1500                 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1501                         PFID(lu_object_fid(&dto->do_lu)), i);
1502                 rc = dt_declare_insert(env, dt_object_child(dt),
1503                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1504                      (const struct dt_key *)stripe_name, th);
1505                 if (rc != 0)
1506                         GOTO(out_put, rc);
1507
1508                 rc = dt_declare_ref_add(env, dt_object_child(dt), th);
1509                 if (rc != 0)
1510                         GOTO(out_put, rc);
1511         }
1512
1513         rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
1514                                   XATTR_NAME_LMV, 0, th);
1515         if (rc != 0)
1516                 GOTO(out_put, rc);
1517
1518 out_put:
1519         if (rc < 0) {
1520                 for (i = 0; i < stripe_count; i++)
1521                         if (stripe[i] != NULL)
1522                                 lu_object_put(env, &stripe[i]->do_lu);
1523                 OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count);
1524                 lo->ldo_stripenr = 0;
1525                 lo->ldo_stripes_allocated = 0;
1526                 lo->ldo_stripe = NULL;
1527         }
1528
1529 out_free:
1530         if (idx_array != NULL)
1531                 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
1532         if (slave_lmm != NULL)
1533                 OBD_FREE_PTR(slave_lmm);
1534
1535         RETURN(rc);
1536 }
1537
1538 /**
1539  * Declare create striped md object.
1540  */
1541 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
1542                                      struct dt_object *dt,
1543                                      struct lu_attr *attr,
1544                                      const struct lu_buf *lum_buf,
1545                                      struct dt_object_format *dof,
1546                                      struct thandle *th)
1547 {
1548         struct lod_object       *lo = lod_dt_obj(dt);
1549         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1550         struct lmv_user_md_v1   *lum;
1551         int                     rc;
1552         ENTRY;
1553
1554         lum = lum_buf->lb_buf;
1555         LASSERT(lum != NULL);
1556
1557         CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
1558                le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
1559                (int)le32_to_cpu(lum->lum_stripe_offset));
1560
1561         if (le32_to_cpu(lum->lum_stripe_count) == 0)
1562                 GOTO(out, rc = 0);
1563
1564         rc = lod_verify_md_striping(lod, lum);
1565         if (rc != 0)
1566                 GOTO(out, rc);
1567
1568         /* prepare dir striped objects */
1569         rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
1570         if (rc != 0) {
1571                 /* failed to create striping, let's reset
1572                  * config so that others don't get confused */
1573                 lod_object_free_striping(env, lo);
1574                 GOTO(out, rc);
1575         }
1576 out:
1577         RETURN(rc);
1578 }
1579
1580 static int lod_dir_declare_xattr_set(const struct lu_env *env,
1581                                      struct dt_object *dt,
1582                                      const struct lu_buf *buf,
1583                                      const char *name, int fl,
1584                                      struct thandle *th)
1585 {
1586         struct dt_object        *next = dt_object_child(dt);
1587         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
1588         struct lod_object       *lo = lod_dt_obj(dt);
1589         int                     i;
1590         int                     rc;
1591         ENTRY;
1592
1593         if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1594                 struct lmv_user_md_v1 *lum;
1595
1596                 LASSERT(buf != NULL && buf->lb_buf != NULL);
1597                 lum = buf->lb_buf;
1598                 rc = lod_verify_md_striping(d, lum);
1599                 if (rc != 0)
1600                         RETURN(rc);
1601         }
1602
1603         rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1604         if (rc != 0)
1605                 RETURN(rc);
1606
1607         /* set xattr to each stripes, if needed */
1608         rc = lod_load_striping(env, lo);
1609         if (rc != 0)
1610                 RETURN(rc);
1611
1612         if (lo->ldo_stripenr == 0)
1613                 RETURN(rc);
1614
1615         for (i = 0; i < lo->ldo_stripenr; i++) {
1616                 LASSERT(lo->ldo_stripe[i]);
1617                 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
1618                                           name, fl, th);
1619                 if (rc != 0)
1620                         break;
1621         }
1622
1623         RETURN(rc);
1624 }
1625
1626 /*
1627  * LOV xattr is a storage for striping, and LOD owns this xattr.
1628  * but LOD allows others to control striping to some extent
1629  * - to reset strping
1630  * - to set new defined striping
1631  * - to set new semi-defined striping
1632  *   - number of stripes is defined
1633  *   - number of stripes + osts are defined
1634  *   - ??
1635  */
1636 static int lod_declare_xattr_set(const struct lu_env *env,
1637                                  struct dt_object *dt,
1638                                  const struct lu_buf *buf,
1639                                  const char *name, int fl,
1640                                  struct thandle *th)
1641 {
1642         struct dt_object *next = dt_object_child(dt);
1643         struct lu_attr   *attr = &lod_env_info(env)->lti_attr;
1644         __u32             mode;
1645         int               rc;
1646         ENTRY;
1647
1648         /*
1649          * allow to declare predefined striping on a new (!mode) object
1650          * which is supposed to be replay of regular file creation
1651          * (when LOV setting is declared)
1652          * LU_XATTR_REPLACE is set to indicate a layout swap
1653          */
1654         mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
1655         if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
1656              !(fl & LU_XATTR_REPLACE)) {
1657                 /*
1658                  * this is a request to manipulate object's striping
1659                  */
1660                 if (dt_object_exists(dt)) {
1661                         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1662                         if (rc)
1663                                 RETURN(rc);
1664                 } else {
1665                         memset(attr, 0, sizeof(*attr));
1666                         attr->la_valid = LA_TYPE | LA_MODE;
1667                         attr->la_mode = S_IFREG;
1668                 }
1669                 rc = lod_declare_striped_object(env, dt, attr, buf, th);
1670         } else if (S_ISDIR(mode)) {
1671                 rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
1672         } else {
1673                 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1674         }
1675
1676         RETURN(rc);
1677 }
1678
1679 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
1680 {
1681         lo->ldo_striping_cached = 0;
1682         lo->ldo_def_striping_set = 0;
1683         lod_object_set_pool(lo, NULL);
1684         lo->ldo_def_stripe_size = 0;
1685         lo->ldo_def_stripenr = 0;
1686         if (lo->ldo_dir_stripe != NULL)
1687                 lo->ldo_dir_striping_cached = 0;
1688 }
1689
1690 static int lod_xattr_set_internal(const struct lu_env *env,
1691                                   struct dt_object *dt,
1692                                   const struct lu_buf *buf,
1693                                   const char *name, int fl, struct thandle *th,
1694                                   struct lustre_capa *capa)
1695 {
1696         struct dt_object        *next = dt_object_child(dt);
1697         struct lod_object       *lo = lod_dt_obj(dt);
1698         int                     rc;
1699         int                     i;
1700         ENTRY;
1701
1702         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1703         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1704                 RETURN(rc);
1705
1706         if (lo->ldo_stripenr == 0)
1707                 RETURN(rc);
1708
1709         for (i = 0; i < lo->ldo_stripenr; i++) {
1710                 LASSERT(lo->ldo_stripe[i]);
1711                 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
1712                                   capa);
1713                 if (rc != 0)
1714                         break;
1715         }
1716
1717         RETURN(rc);
1718 }
1719
1720 static int lod_xattr_del_internal(const struct lu_env *env,
1721                                   struct dt_object *dt,
1722                                   const char *name, struct thandle *th,
1723                                   struct lustre_capa *capa)
1724 {
1725         struct dt_object        *next = dt_object_child(dt);
1726         struct lod_object       *lo = lod_dt_obj(dt);
1727         int                     rc;
1728         int                     i;
1729         ENTRY;
1730
1731         rc = dt_xattr_del(env, next, name, th, capa);
1732         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1733                 RETURN(rc);
1734
1735         if (lo->ldo_stripenr == 0)
1736                 RETURN(rc);
1737
1738         for (i = 0; i < lo->ldo_stripenr; i++) {
1739                 LASSERT(lo->ldo_stripe[i]);
1740                 rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th,
1741                                   capa);
1742                 if (rc != 0)
1743                         break;
1744         }
1745
1746         RETURN(rc);
1747 }
1748
1749 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
1750                                     struct dt_object *dt,
1751                                     const struct lu_buf *buf,
1752                                     const char *name, int fl,
1753                                     struct thandle *th,
1754                                     struct lustre_capa *capa)
1755 {
1756         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
1757         struct lod_object       *l = lod_dt_obj(dt);
1758         struct lov_user_md_v1   *lum;
1759         struct lov_user_md_v3   *v3 = NULL;
1760         int                      rc;
1761         ENTRY;
1762
1763         /* If it is striped dir, we should clear the stripe cache for
1764          * slave stripe as well, but there are no effective way to
1765          * notify the LOD on the slave MDT, so we do not cache stripe
1766          * information for slave stripe for now. XXX*/
1767         lod_lov_stripe_cache_clear(l);
1768         LASSERT(buf != NULL && buf->lb_buf != NULL);
1769         lum = buf->lb_buf;
1770
1771         rc = lod_verify_striping(d, buf, 0);
1772         if (rc)
1773                 RETURN(rc);
1774
1775         if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1776                 v3 = buf->lb_buf;
1777
1778         /* if { size, offset, count } = { 0, -1, 0 } and no pool
1779          * (i.e. all default values specified) then delete default
1780          * striping from dir. */
1781         CDEBUG(D_OTHER,
1782                 "set default striping: sz %u # %u offset %d %s %s\n",
1783                 (unsigned)lum->lmm_stripe_size,
1784                 (unsigned)lum->lmm_stripe_count,
1785                 (int)lum->lmm_stripe_offset,
1786                 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1787
1788         if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1789                                 (lum->lmm_stripe_count),
1790                                 (lum->lmm_stripe_offset)) &&
1791                         lum->lmm_magic == LOV_USER_MAGIC_V1) {
1792                 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1793                 if (rc == -ENODATA)
1794                         rc = 0;
1795         } else {
1796                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
1797         }
1798
1799         RETURN(rc);
1800 }
1801
1802 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
1803                                             struct dt_object *dt,
1804                                             const struct lu_buf *buf,
1805                                             const char *name, int fl,
1806                                             struct thandle *th,
1807                                             struct lustre_capa *capa)
1808 {
1809         struct lod_object       *l = lod_dt_obj(dt);
1810         struct lmv_user_md_v1   *lum;
1811         int                      rc;
1812         ENTRY;
1813
1814         LASSERT(buf != NULL && buf->lb_buf != NULL);
1815         lum = buf->lb_buf;
1816
1817         CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
1818               le32_to_cpu(lum->lum_stripe_count),
1819               (int)le32_to_cpu(lum->lum_stripe_offset));
1820
1821         if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
1822                                  le32_to_cpu(lum->lum_stripe_offset)) &&
1823                                 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
1824                 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1825                 if (rc == -ENODATA)
1826                         rc = 0;
1827         } else {
1828                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
1829                 if (rc != 0)
1830                         RETURN(rc);
1831         }
1832
1833         /* Update default stripe cache */
1834         if (l->ldo_dir_stripe == NULL) {
1835                 OBD_ALLOC_PTR(l->ldo_dir_stripe);
1836                 if (l->ldo_dir_stripe == NULL)
1837                         RETURN(-ENOMEM);
1838         }
1839
1840         l->ldo_dir_striping_cached = 0;
1841         l->ldo_dir_def_striping_set = 1;
1842         l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count);
1843
1844         RETURN(rc);
1845 }
1846
1847 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
1848                              const struct lu_buf *buf, const char *name,
1849                              int fl, struct thandle *th,
1850                              struct lustre_capa *capa)
1851 {
1852         struct lod_object       *lo = lod_dt_obj(dt);
1853         struct lod_thread_info  *info = lod_env_info(env);
1854         struct lu_attr          *attr = &info->lti_attr;
1855         struct dt_object_format *dof = &info->lti_format;
1856         struct lu_buf           lmv_buf;
1857         struct lu_buf           slave_lmv_buf;
1858         struct lmv_mds_md_v1    *lmm;
1859         struct lmv_mds_md_v1    *slave_lmm = NULL;
1860         int                     i;
1861         int                     rc;
1862         ENTRY;
1863
1864         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
1865                 RETURN(-ENOTDIR);
1866
1867         /* The stripes are supposed to be allocated in declare phase,
1868          * if there are no stripes being allocated, it will skip */
1869         if (lo->ldo_stripenr == 0)
1870                 RETURN(0);
1871
1872         rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA);
1873         if (rc != 0)
1874                 RETURN(rc);
1875
1876         attr->la_valid = LA_TYPE | LA_MODE;
1877         dof->dof_type = DFT_DIR;
1878
1879         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1880         if (rc != 0)
1881                 RETURN(rc);
1882         lmm = lmv_buf.lb_buf;
1883
1884         OBD_ALLOC_PTR(slave_lmm);
1885         if (slave_lmm == NULL)
1886                 RETURN(-ENOMEM);
1887
1888         lod_prep_slave_lmv_md(slave_lmm, lmm);
1889         slave_lmv_buf.lb_buf = slave_lmm;
1890         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1891
1892         for (i = 0; i < lo->ldo_stripenr; i++) {
1893                 struct dt_object *dto;
1894                 char             *stripe_name = info->lti_key;
1895
1896                 dto = lo->ldo_stripe[i];
1897                 dt_write_lock(env, dto, MOR_TGT_CHILD);
1898                 rc = dt_create(env, dto, attr, NULL, dof, th);
1899                 dt_write_unlock(env, dto);
1900                 if (rc != 0)
1901                         RETURN(rc);
1902
1903                 rc = dt_insert(env, dto,
1904                               (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1905                               (const struct dt_key *)dot, th, capa, 0);
1906                 if (rc != 0)
1907                         RETURN(rc);
1908
1909                 rc = dt_insert(env, dto,
1910                               (struct dt_rec *)lu_object_fid(&dt->do_lu),
1911                               (const struct dt_key *)dotdot, th, capa, 0);
1912                 if (rc != 0)
1913                         RETURN(rc);
1914
1915                 if (lo->ldo_striping_cached &&
1916                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1917                                          lo->ldo_def_stripenr,
1918                                          lo->ldo_def_stripe_offset)) {
1919                         struct lov_user_md_v3   *v3;
1920
1921                         /* sigh, lti_ea_store has been used for lmv_buf,
1922                          * so we have to allocate buffer for default
1923                          * stripe EA */
1924                         OBD_ALLOC_PTR(v3);
1925                         if (v3 == NULL)
1926                                 GOTO(out, rc);
1927
1928                         memset(v3, 0, sizeof(*v3));
1929                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1930                         v3->lmm_stripe_count =
1931                                 cpu_to_le16(lo->ldo_def_stripenr);
1932                         v3->lmm_stripe_offset =
1933                                 cpu_to_le16(lo->ldo_def_stripe_offset);
1934                         v3->lmm_stripe_size =
1935                                 cpu_to_le32(lo->ldo_def_stripe_size);
1936                         if (lo->ldo_pool)
1937                                 strncpy(v3->lmm_pool_name, lo->ldo_pool,
1938                                         LOV_MAXPOOLNAME);
1939
1940                         info->lti_buf.lb_buf = v3;
1941                         info->lti_buf.lb_len = sizeof(*v3);
1942                         rc = dt_xattr_set(env, dto, &info->lti_buf,
1943                                           XATTR_NAME_LOV, 0, th, capa);
1944                         OBD_FREE_PTR(v3);
1945                         if (rc != 0)
1946                                 GOTO(out, rc);
1947                 }
1948
1949                 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1950                 rc = dt_xattr_set(env, dto, &slave_lmv_buf, XATTR_NAME_LMV,
1951                                   fl, th, capa);
1952                 if (rc != 0)
1953                         GOTO(out, rc);
1954
1955                 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1956                          PFID(lu_object_fid(&dto->do_lu)), i);
1957                 rc = dt_insert(env, dt_object_child(dt),
1958                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1959                      (const struct dt_key *)stripe_name, th, capa, 0);
1960                 if (rc != 0)
1961                         GOTO(out, rc);
1962
1963                 rc = dt_ref_add(env, dt_object_child(dt), th);
1964                 if (rc != 0)
1965                         GOTO(out, rc);
1966         }
1967
1968         rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
1969                           fl, th, capa);
1970
1971 out:
1972         if (slave_lmm != NULL)
1973                 OBD_FREE_PTR(slave_lmm);
1974
1975         RETURN(rc);
1976 }
1977
1978 int lod_dir_striping_create_internal(const struct lu_env *env,
1979                                      struct dt_object *dt,
1980                                      struct lu_attr *attr,
1981                                      struct dt_object_format *dof,
1982                                      struct thandle *th,
1983                                      bool declare)
1984 {
1985         struct lod_thread_info  *info = lod_env_info(env);
1986         struct lod_object       *lo = lod_dt_obj(dt);
1987         int                     rc;
1988         ENTRY;
1989
1990         if (lo->ldo_dir_def_striping_set &&
1991             !LMVEA_DELETE_VALUES(lo->ldo_stripenr,
1992                                  lo->ldo_dir_stripe_offset)) {
1993                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
1994                 int stripe_count = lo->ldo_stripenr;
1995
1996                 if (info->lti_ea_store_size < sizeof(*v1)) {
1997                         rc = lod_ea_store_resize(info, sizeof(*v1));
1998                         if (rc != 0)
1999                                 RETURN(rc);
2000                         v1 = info->lti_ea_store;
2001                 }
2002
2003                 memset(v1, 0, sizeof(*v1));
2004                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2005                 v1->lum_stripe_count = cpu_to_le32(stripe_count);
2006                 v1->lum_stripe_offset =
2007                                 cpu_to_le32(lo->ldo_dir_stripe_offset);
2008
2009                 info->lti_buf.lb_buf = v1;
2010                 info->lti_buf.lb_len = sizeof(*v1);
2011
2012                 if (declare)
2013                         rc = lod_declare_xattr_set_lmv(env, dt, attr,
2014                                                        &info->lti_buf, dof, th);
2015                 else
2016                         rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
2017                                                XATTR_NAME_LMV, 0, th,
2018                                                BYPASS_CAPA);
2019                 if (rc != 0)
2020                         RETURN(rc);
2021         }
2022
2023         /* Transfer default LMV striping from the parent */
2024         if (lo->ldo_dir_striping_cached &&
2025             !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
2026                                  lo->ldo_dir_def_stripe_offset)) {
2027                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2028                 int def_stripe_count = lo->ldo_dir_def_stripenr;
2029
2030                 if (info->lti_ea_store_size < sizeof(*v1)) {
2031                         rc = lod_ea_store_resize(info, sizeof(*v1));
2032                         if (rc != 0)
2033                                 RETURN(rc);
2034                         v1 = info->lti_ea_store;
2035                 }
2036
2037                 memset(v1, 0, sizeof(*v1));
2038                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2039                 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
2040                 v1->lum_stripe_offset =
2041                                 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
2042                 v1->lum_hash_type =
2043                                 cpu_to_le32(lo->ldo_dir_def_hash_type);
2044
2045                 info->lti_buf.lb_buf = v1;
2046                 info->lti_buf.lb_len = sizeof(*v1);
2047                 if (declare)
2048                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2049                                                        XATTR_NAME_DEFAULT_LMV,
2050                                                        0, th);
2051                 else
2052                         rc = lod_xattr_set_default_lmv_on_dir(env, dt,
2053                                                   &info->lti_buf,
2054                                                   XATTR_NAME_DEFAULT_LMV, 0,
2055                                                   th, BYPASS_CAPA);
2056                 if (rc != 0)
2057                         RETURN(rc);
2058         }
2059
2060         /* Transfer default LOV striping from the parent */
2061         if (lo->ldo_striping_cached &&
2062             !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2063                                  lo->ldo_def_stripenr,
2064                                  lo->ldo_def_stripe_offset)) {
2065                 struct lov_user_md_v3 *v3 = info->lti_ea_store;
2066
2067                 if (info->lti_ea_store_size < sizeof(*v3)) {
2068                         rc = lod_ea_store_resize(info, sizeof(*v3));
2069                         if (rc != 0)
2070                                 RETURN(rc);
2071                         v3 = info->lti_ea_store;
2072                 }
2073
2074                 memset(v3, 0, sizeof(*v3));
2075                 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2076                 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
2077                 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
2078                 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
2079                 if (lo->ldo_pool)
2080                         strncpy(v3->lmm_pool_name, lo->ldo_pool,
2081                                 LOV_MAXPOOLNAME);
2082
2083                 info->lti_buf.lb_buf = v3;
2084                 info->lti_buf.lb_len = sizeof(*v3);
2085
2086                 if (declare)
2087                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2088                                                        XATTR_NAME_LOV, 0, th);
2089                 else
2090                         rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
2091                                                       XATTR_NAME_LOV, 0, th,
2092                                                       BYPASS_CAPA);
2093                 if (rc != 0)
2094                         RETURN(rc);
2095         }
2096
2097         RETURN(0);
2098 }
2099
/**
 * Declare phase of the directory striping creation.
 *
 * Calls lod_dir_striping_create_internal() with its declare flag set
 * to true and returns its result.
 */
static int lod_declare_dir_striping_create(const struct lu_env *env,
					   struct dt_object *dt,
					   struct lu_attr *attr,
					   struct dt_object_format *dof,
					   struct thandle *th)
{
	int rc;

	rc = lod_dir_striping_create_internal(env, dt, attr, dof, th, true);

	return rc;
}
2108
/**
 * Execution phase of the directory striping creation.
 *
 * Calls lod_dir_striping_create_internal() with its declare flag set
 * to false and returns its result.
 */
static int lod_dir_striping_create(const struct lu_env *env,
				   struct dt_object *dt,
				   struct lu_attr *attr,
				   struct dt_object_format *dof,
				   struct thandle *th)
{
	int rc;

	rc = lod_dir_striping_create_internal(env, dt, attr, dof, th, false);

	return rc;
}
2117
2118 static int lod_xattr_set(const struct lu_env *env,
2119                          struct dt_object *dt, const struct lu_buf *buf,
2120                          const char *name, int fl, struct thandle *th,
2121                          struct lustre_capa *capa)
2122 {
2123         struct dt_object        *next = dt_object_child(dt);
2124         int                      rc;
2125         ENTRY;
2126
2127         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2128             strcmp(name, XATTR_NAME_LMV) == 0) {
2129                 struct lmv_mds_md_v1 *lmm = buf->lb_buf;
2130
2131                 if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
2132                                                 LMV_HASH_FLAG_MIGRATION)
2133                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2134                 else
2135                         rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
2136
2137                 RETURN(rc);
2138         }
2139
2140         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2141             strcmp(name, XATTR_NAME_LOV) == 0) {
2142                 /* default LOVEA */
2143                 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
2144                 RETURN(rc);
2145         } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2146                    strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
2147                 /* default LMVEA */
2148                 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
2149                                                       th, capa);
2150                 RETURN(rc);
2151         } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2152                    !strcmp(name, XATTR_NAME_LOV)) {
2153                 /* in case of lov EA swap, just set it
2154                  * if not, it is a replay so check striping match what we
2155                  * already have during req replay, declare_xattr_set()
2156                  * defines striping, then create() does the work
2157                 */
2158                 if (fl & LU_XATTR_REPLACE) {
2159                         /* free stripes, then update disk */
2160                         lod_object_free_striping(env, lod_dt_obj(dt));
2161                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2162                 } else {
2163                         rc = lod_striping_create(env, dt, NULL, NULL, th);
2164                 }
2165                 RETURN(rc);
2166         }
2167
2168         /* then all other xattr */
2169         rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
2170
2171         RETURN(rc);
2172 }
2173
/**
 * Declare the deletion of an xattr.
 *
 * The declaration is forwarded unchanged to the child object.
 */
static int lod_declare_xattr_del(const struct lu_env *env,
				 struct dt_object *dt, const char *name,
				 struct thandle *th)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_declare_xattr_del(env, child, name, th);
}
2180
2181 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
2182                          const char *name, struct thandle *th,
2183                          struct lustre_capa *capa)
2184 {
2185         if (!strcmp(name, XATTR_NAME_LOV))
2186                 lod_object_free_striping(env, lod_dt_obj(dt));
2187         return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
2188 }
2189
/**
 * List the xattrs of an object.
 *
 * The request is forwarded unchanged to the child object.
 */
static int lod_xattr_list(const struct lu_env *env,
			  struct dt_object *dt, struct lu_buf *buf,
			  struct lustre_capa *capa)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_xattr_list(env, child, buf, capa);
}
2196
2197 int lod_object_set_pool(struct lod_object *o, char *pool)
2198 {
2199         int len;
2200
2201         if (o->ldo_pool) {
2202                 len = strlen(o->ldo_pool);
2203                 OBD_FREE(o->ldo_pool, len + 1);
2204                 o->ldo_pool = NULL;
2205         }
2206         if (pool) {
2207                 len = strlen(pool);
2208                 OBD_ALLOC(o->ldo_pool, len + 1);
2209                 if (o->ldo_pool == NULL)
2210                         return -ENOMEM;
2211                 strcpy(o->ldo_pool, pool);
2212         }
2213         return 0;
2214 }
2215
2216 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
2217 {
2218         return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
2219 }
2220
2221
2222 static int lod_cache_parent_lov_striping(const struct lu_env *env,
2223                                          struct lod_object *lp)
2224 {
2225         struct lod_thread_info  *info = lod_env_info(env);
2226         struct lov_user_md_v1   *v1 = NULL;
2227         struct lov_user_md_v3   *v3 = NULL;
2228         int                      rc;
2229         ENTRY;
2230
2231         /* called from MDD without parent being write locked,
2232          * lock it here */
2233         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2234         rc = lod_get_lov_ea(env, lp);
2235         if (rc < 0)
2236                 GOTO(unlock, rc);
2237
2238         if (rc < sizeof(struct lov_user_md)) {
2239                 /* don't lookup for non-existing or invalid striping */
2240                 lp->ldo_def_striping_set = 0;
2241                 lp->ldo_striping_cached = 1;
2242                 lp->ldo_def_stripe_size = 0;
2243                 lp->ldo_def_stripenr = 0;
2244                 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
2245                 GOTO(unlock, rc = 0);
2246         }
2247
2248         rc = 0;
2249         v1 = info->lti_ea_store;
2250         if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) {
2251                 lustre_swab_lov_user_md_v1(v1);
2252         } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
2253                 v3 = (struct lov_user_md_v3 *)v1;
2254                 lustre_swab_lov_user_md_v3(v3);
2255         }
2256
2257         if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
2258                 GOTO(unlock, rc = 0);
2259
2260         if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
2261                 GOTO(unlock, rc = 0);
2262
2263         CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
2264                PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
2265                (int)v1->lmm_stripe_count,
2266                (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
2267
2268         lp->ldo_def_stripenr = v1->lmm_stripe_count;
2269         lp->ldo_def_stripe_size = v1->lmm_stripe_size;
2270         lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
2271         lp->ldo_striping_cached = 1;
2272         lp->ldo_def_striping_set = 1;
2273         if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
2274                 /* XXX: sanity check here */
2275                 v3 = (struct lov_user_md_v3 *) v1;
2276                 if (v3->lmm_pool_name[0])
2277                         lod_object_set_pool(lp, v3->lmm_pool_name);
2278         }
2279         EXIT;
2280 unlock:
2281         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2282         return rc;
2283 }
2284
2285
2286 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
2287                                          struct lod_object *lp)
2288 {
2289         struct lod_thread_info  *info = lod_env_info(env);
2290         struct lmv_user_md_v1   *v1 = NULL;
2291         int                      rc;
2292         ENTRY;
2293
2294         /* called from MDD without parent being write locked,
2295          * lock it here */
2296         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2297         rc = lod_get_default_lmv_ea(env, lp);
2298         if (rc < 0)
2299                 GOTO(unlock, rc);
2300
2301         if (rc < sizeof(struct lmv_user_md)) {
2302                 /* don't lookup for non-existing or invalid striping */
2303                 lp->ldo_dir_def_striping_set = 0;
2304                 lp->ldo_dir_striping_cached = 1;
2305                 lp->ldo_dir_def_stripenr = 0;
2306                 lp->ldo_dir_def_stripe_offset =
2307                                         (typeof(v1->lum_stripe_offset))(-1);
2308                 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
2309                 GOTO(unlock, rc = 0);
2310         }
2311
2312         rc = 0;
2313         v1 = info->lti_ea_store;
2314
2315         lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
2316         lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
2317         lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
2318         lp->ldo_dir_def_striping_set = 1;
2319         lp->ldo_dir_striping_cached = 1;
2320
2321         EXIT;
2322 unlock:
2323         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2324         return rc;
2325 }
2326
2327 static int lod_cache_parent_striping(const struct lu_env *env,
2328                                      struct lod_object *lp,
2329                                      umode_t child_mode)
2330 {
2331         int rc = 0;
2332         ENTRY;
2333
2334         rc = lod_load_striping(env, lp);
2335         if (rc != 0)
2336                 RETURN(rc);
2337
2338         if (!lp->ldo_striping_cached) {
2339                 /* we haven't tried to get default striping for
2340                  * the directory yet, let's cache it in the object */
2341                 rc = lod_cache_parent_lov_striping(env, lp);
2342                 if (rc != 0)
2343                         RETURN(rc);
2344         }
2345
2346         if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
2347                 rc = lod_cache_parent_lmv_striping(env, lp);
2348
2349         RETURN(rc);
2350 }
2351
2352 /**
2353  * used to transfer default striping data to the object being created
2354  */
2355 static void lod_ah_init(const struct lu_env *env,
2356                         struct dt_allocation_hint *ah,
2357                         struct dt_object *parent,
2358                         struct dt_object *child,
2359                         umode_t child_mode)
2360 {
2361         struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
2362         struct dt_object  *nextp = NULL;
2363         struct dt_object  *nextc;
2364         struct lod_object *lp = NULL;
2365         struct lod_object *lc;
2366         struct lov_desc   *desc;
2367         int               rc;
2368         ENTRY;
2369
2370         LASSERT(child);
2371
2372         if (likely(parent)) {
2373                 nextp = dt_object_child(parent);
2374                 lp = lod_dt_obj(parent);
2375                 rc = lod_load_striping(env, lp);
2376                 if (rc != 0)
2377                         return;
2378         }
2379
2380         nextc = dt_object_child(child);
2381         lc = lod_dt_obj(child);
2382
2383         LASSERT(lc->ldo_stripenr == 0);
2384         LASSERT(lc->ldo_stripe == NULL);
2385
2386         /*
2387          * local object may want some hints
2388          * in case of late striping creation, ->ah_init()
2389          * can be called with local object existing
2390          */
2391         if (!dt_object_exists(nextc) || dt_object_remote(nextc))
2392                 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
2393                                           NULL : nextp, nextc, child_mode);
2394
2395         if (S_ISDIR(child_mode)) {
2396                 if (lc->ldo_dir_stripe == NULL) {
2397                         OBD_ALLOC_PTR(lc->ldo_dir_stripe);
2398                         if (lc->ldo_dir_stripe == NULL)
2399                                 return;
2400                 }
2401
2402                 if (lp->ldo_dir_stripe == NULL) {
2403                         OBD_ALLOC_PTR(lp->ldo_dir_stripe);
2404                         if (lp->ldo_dir_stripe == NULL)
2405                                 return;
2406                 }
2407
2408                 rc = lod_cache_parent_striping(env, lp, child_mode);
2409                 if (rc != 0)
2410                         return;
2411
2412                 /* transfer defaults to new directory */
2413                 if (lp->ldo_striping_cached) {
2414                         if (lp->ldo_pool)
2415                                 lod_object_set_pool(lc, lp->ldo_pool);
2416                         lc->ldo_def_stripenr = lp->ldo_def_stripenr;
2417                         lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
2418                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2419                         lc->ldo_striping_cached = 1;
2420                         lc->ldo_def_striping_set = 1;
2421                         CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
2422                                (int)lc->ldo_def_stripe_size,
2423                                (int)lc->ldo_def_stripe_offset,
2424                                (int)lc->ldo_def_stripenr);
2425                 }
2426
2427                 /* transfer dir defaults to new directory */
2428                 if (lp->ldo_dir_striping_cached) {
2429                         lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
2430                         lc->ldo_dir_def_stripe_offset =
2431                                                   lp->ldo_dir_def_stripe_offset;
2432                         lc->ldo_dir_def_hash_type =
2433                                                   lp->ldo_dir_def_hash_type;
2434                         lc->ldo_dir_striping_cached = 1;
2435                         lc->ldo_dir_def_striping_set = 1;
2436                         CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
2437                                (int)lc->ldo_dir_def_stripenr,
2438                                (int)lc->ldo_dir_def_stripe_offset,
2439                                lc->ldo_dir_def_hash_type);
2440                 }
2441
2442                 /* If the directory is specified with certain stripes */
2443                 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
2444                         const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
2445
2446                         rc = lod_verify_md_striping(d, lum1);
2447                         if (rc == 0 &&
2448                                 le32_to_cpu(lum1->lum_stripe_count) > 1) {
2449                                 /* Directory will be striped only if
2450                                  * stripe_count > 1 */
2451                                 lc->ldo_stripenr =
2452                                         le32_to_cpu(lum1->lum_stripe_count);
2453                                 lc->ldo_dir_stripe_offset =
2454                                         le32_to_cpu(lum1->lum_stripe_offset);
2455                                 lc->ldo_dir_hash_type =
2456                                         le32_to_cpu(lum1->lum_hash_type);
2457                                 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
2458                                        lc->ldo_stripenr,
2459                                        (int)lc->ldo_dir_stripe_offset);
2460                         }
2461                 } else if (lp->ldo_dir_def_striping_set) {
2462                         /* If there are default dir stripe from parent */
2463                         lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
2464                         lc->ldo_dir_stripe_offset =
2465                                         lp->ldo_dir_def_stripe_offset;
2466                         lc->ldo_dir_hash_type =
2467                                         lp->ldo_dir_def_hash_type;
2468                         CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
2469                                lc->ldo_stripenr,
2470                                (int)lc->ldo_dir_stripe_offset);
2471                 } else {
2472                         /* set default stripe for this directory */
2473                         lc->ldo_stripenr = 0;
2474                         lc->ldo_dir_stripe_offset = -1;
2475                 }
2476
2477                 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
2478                        lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
2479
2480                 goto out;
2481         }
2482
2483         /*
2484          * if object is going to be striped over OSTs, transfer default
2485          * striping information to the child, so that we can use it
2486          * during declaration and creation
2487          */
2488         if (!lod_object_will_be_striped(S_ISREG(child_mode),
2489                                         lu_object_fid(&child->do_lu)))
2490                 goto out;
2491         /*
2492          * try from the parent
2493          */
2494         if (likely(parent)) {
2495                 lod_cache_parent_striping(env, lp, child_mode);
2496
2497                 lc->ldo_def_stripe_offset = (__u16) -1;
2498
2499                 if (lp->ldo_def_striping_set) {
2500                         if (lp->ldo_pool)
2501                                 lod_object_set_pool(lc, lp->ldo_pool);
2502                         lc->ldo_stripenr = lp->ldo_def_stripenr;
2503                         lc->ldo_stripe_size = lp->ldo_def_stripe_size;
2504                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2505                         CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
2506                                lc->ldo_stripenr, lc->ldo_stripe_size,
2507                                lp->ldo_pool ? lp->ldo_pool : "");
2508                 }
2509         }
2510
2511         /*
2512          * if the parent doesn't provide with specific pattern, grab fs-wide one
2513          */
2514         desc = &d->lod_desc;
2515         if (lc->ldo_stripenr == 0)
2516                 lc->ldo_stripenr = desc->ld_default_stripe_count;
2517         if (lc->ldo_stripe_size == 0)
2518                 lc->ldo_stripe_size = desc->ld_default_stripe_size;
2519         CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
2520                lc->ldo_stripenr, lc->ldo_stripe_size,
2521                lc->ldo_pool ? lc->ldo_pool : "");
2522
2523 out:
2524         /* we do not cache stripe information for slave stripe, see
2525          * lod_xattr_set_lov_on_dir */
2526         if (lp != NULL && lp->ldo_dir_slave_stripe)
2527                 lod_lov_stripe_cache_clear(lp);
2528
2529         EXIT;
2530 }
2531
2532 #define ll_do_div64(aaa,bbb)    do_div((aaa), (bbb))
2533 /*
2534  * this function handles a special case when truncate was done
2535  * on a stripeless object and now striping is being created
2536  * we can't lose that size, so we have to propagate it to newly
2537  * created object
2538  */
2539 static int lod_declare_init_size(const struct lu_env *env,
2540                                  struct dt_object *dt, struct thandle *th)
2541 {
2542         struct dt_object   *next = dt_object_child(dt);
2543         struct lod_object  *lo = lod_dt_obj(dt);
2544         struct lu_attr     *attr = &lod_env_info(env)->lti_attr;
2545         uint64_t            size, offs;
2546         int                 rc, stripe;
2547         ENTRY;
2548
2549         /* XXX: we support the simplest (RAID0) striping so far */
2550         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
2551         LASSERT(lo->ldo_stripe_size > 0);
2552
2553         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
2554         LASSERT(attr->la_valid & LA_SIZE);
2555         if (rc)
2556                 RETURN(rc);
2557
2558         size = attr->la_size;
2559         if (size == 0)
2560                 RETURN(0);
2561
2562         /* ll_do_div64(a, b) returns a % b, and a = a / b */
2563         ll_do_div64(size, (__u64) lo->ldo_stripe_size);
2564         stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
2565
2566         size = size * lo->ldo_stripe_size;
2567         offs = attr->la_size;
2568         size += ll_do_div64(offs, lo->ldo_stripe_size);
2569
2570         attr->la_valid = LA_SIZE;
2571         attr->la_size = size;
2572
2573         rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
2574
2575         RETURN(rc);
2576 }
2577
2578 /**
2579  * Create declaration of striped object
2580  */
2581 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
2582                                struct lu_attr *attr,
2583                                const struct lu_buf *lovea, struct thandle *th)
2584 {
2585         struct lod_thread_info  *info = lod_env_info(env);
2586         struct dt_object        *next = dt_object_child(dt);
2587         struct lod_object       *lo = lod_dt_obj(dt);
2588         int                      rc;
2589         ENTRY;
2590
2591         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
2592                 /* failed to create striping, let's reset
2593                  * config so that others don't get confused */
2594                 lod_object_free_striping(env, lo);
2595                 GOTO(out, rc = -ENOMEM);
2596         }
2597
2598         if (!dt_object_remote(next)) {
2599                 /* choose OST and generate appropriate objects */
2600                 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
2601                 if (rc) {
2602                         /* failed to create striping, let's reset
2603                          * config so that others don't get confused */
2604                         lod_object_free_striping(env, lo);
2605                         GOTO(out, rc);
2606                 }
2607
2608                 /*
2609                  * declare storage for striping data
2610                  */
2611                 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
2612                                 lo->ldo_pool ?  LOV_MAGIC_V3 : LOV_MAGIC_V1);
2613         } else {
2614                 /* LOD can not choose OST objects for remote objects, i.e.
2615                  * stripes must be ready before that. Right now, it can only
2616                  * happen during migrate, i.e. migrate process needs to create
2617                  * remote regular file (mdd_migrate_create), then the migrate
2618                  * process will provide stripeEA. */
2619                 LASSERT(lovea != NULL);
2620                 info->lti_buf = *lovea;
2621         }
2622
2623         rc = dt_declare_xattr_set(env, next, &info->lti_buf,
2624                                   XATTR_NAME_LOV, 0, th);
2625         if (rc)
2626                 GOTO(out, rc);
2627
2628         /*
2629          * if striping is created with local object's size > 0,
2630          * we have to propagate this size to specific object
2631          * the case is possible only when local object was created previously
2632          */
2633         if (dt_object_exists(next))
2634                 rc = lod_declare_init_size(env, dt, th);
2635
2636 out:
2637         RETURN(rc);
2638 }
2639
2640 static int lod_declare_object_create(const struct lu_env *env,
2641                                      struct dt_object *dt,
2642                                      struct lu_attr *attr,
2643                                      struct dt_allocation_hint *hint,
2644                                      struct dt_object_format *dof,
2645                                      struct thandle *th)
2646 {
2647         struct dt_object   *next = dt_object_child(dt);
2648         struct lod_object  *lo = lod_dt_obj(dt);
2649         int                 rc;
2650         ENTRY;
2651
2652         LASSERT(dof);
2653         LASSERT(attr);
2654         LASSERT(th);
2655
2656         /*
2657          * first of all, we declare creation of local object
2658          */
2659         rc = dt_declare_create(env, next, attr, hint, dof, th);
2660         if (rc)
2661                 GOTO(out, rc);
2662
2663         if (dof->dof_type == DFT_SYM)
2664                 dt->do_body_ops = &lod_body_lnk_ops;
2665
2666         /*
2667          * it's lod_ah_init() who has decided the object will striped
2668          */
2669         if (dof->dof_type == DFT_REGULAR) {
2670                 /* callers don't want stripes */
2671                 /* XXX: all tricky interactions with ->ah_make_hint() decided
2672                  * to use striping, then ->declare_create() behaving differently
2673                  * should be cleaned */
2674                 if (dof->u.dof_reg.striped == 0)
2675                         lo->ldo_stripenr = 0;
2676                 if (lo->ldo_stripenr > 0)
2677                         rc = lod_declare_striped_object(env, dt, attr,
2678                                                         NULL, th);
2679         } else if (dof->dof_type == DFT_DIR) {
2680                 /* Orphan object (like migrating object) does not have
2681                  * lod_dir_stripe, see lod_ah_init */
2682                 if (lo->ldo_dir_stripe != NULL)
2683                         rc = lod_declare_dir_striping_create(env, dt, attr,
2684                                                              dof, th);
2685         }
2686 out:
2687         RETURN(rc);
2688 }
2689
2690 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
2691                         struct lu_attr *attr, struct dt_object_format *dof,
2692                         struct thandle *th)
2693 {
2694         struct lod_object *lo = lod_dt_obj(dt);
2695         int                rc = 0, i;
2696         ENTRY;
2697
2698         LASSERT(lo->ldo_striping_cached == 0);
2699
2700         /* create all underlying objects */
2701         for (i = 0; i < lo->ldo_stripenr; i++) {
2702                 LASSERT(lo->ldo_stripe[i]);
2703                 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
2704
2705                 if (rc)
2706                         break;
2707         }
2708         if (rc == 0)
2709                 rc = lod_generate_and_set_lovea(env, lo, th);
2710
2711         RETURN(rc);
2712 }
2713
2714 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
2715                              struct lu_attr *attr,
2716                              struct dt_allocation_hint *hint,
2717                              struct dt_object_format *dof, struct thandle *th)
2718 {
2719         struct dt_object   *next = dt_object_child(dt);
2720         struct lod_object  *lo = lod_dt_obj(dt);
2721         int                 rc;
2722         ENTRY;
2723
2724         /* create local object */
2725         rc = dt_create(env, next, attr, hint, dof, th);
2726         if (rc != 0)
2727                 RETURN(rc);
2728
2729         if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2730             lo->ldo_stripe && dof->u.dof_reg.striped != 0)
2731                 rc = lod_striping_create(env, dt, attr, dof, th);
2732
2733         RETURN(rc);
2734 }
2735
2736 static int lod_declare_object_destroy(const struct lu_env *env,
2737                                       struct dt_object *dt,
2738                                       struct thandle *th)
2739 {
2740         struct dt_object   *next = dt_object_child(dt);
2741         struct lod_object  *lo = lod_dt_obj(dt);
2742         struct lod_thread_info *info = lod_env_info(env);
2743         char               *stripe_name = info->lti_key;
2744         int                 rc, i;
2745         ENTRY;
2746
2747         /*
2748          * load striping information, notice we don't do this when object
2749          * is being initialized as we don't need this information till
2750          * few specific cases like destroy, chown
2751          */
2752         rc = lod_load_striping(env, lo);
2753         if (rc)
2754                 RETURN(rc);
2755
2756         /* declare destroy for all underlying objects */
2757         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2758                 rc = next->do_ops->do_index_try(env, next,
2759                                                 &dt_directory_features);
2760                 if (rc != 0)
2761                         RETURN(rc);
2762
2763                 for (i = 0; i < lo->ldo_stripenr; i++) {
2764                         rc = dt_declare_ref_del(env, next, th);
2765                         if (rc != 0)
2766                                 RETURN(rc);
2767                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2768                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2769                                 i);
2770                         rc = dt_declare_delete(env, next,
2771                                         (const struct dt_key *)stripe_name, th);
2772                         if (rc != 0)
2773                                 RETURN(rc);
2774                 }
2775         }
2776         /*
2777          * we declare destroy for the local object
2778          */
2779         rc = dt_declare_destroy(env, next, th);
2780         if (rc)
2781                 RETURN(rc);
2782
2783         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2784                 RETURN(0);
2785
2786         /* declare destroy all striped objects */
2787         for (i = 0; i < lo->ldo_stripenr; i++) {
2788                 if (likely(lo->ldo_stripe[i] != NULL)) {
2789                         rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
2790                         if (rc != 0)
2791                                 break;
2792                 }
2793         }
2794
2795         RETURN(rc);
2796 }
2797
2798 static int lod_object_destroy(const struct lu_env *env,
2799                 struct dt_object *dt, struct thandle *th)
2800 {
2801         struct dt_object  *next = dt_object_child(dt);
2802         struct lod_object *lo = lod_dt_obj(dt);
2803         struct lod_thread_info *info = lod_env_info(env);
2804         char               *stripe_name = info->lti_key;
2805         int                rc, i;
2806         ENTRY;
2807
2808         /* destroy sub-stripe of master object */
2809         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2810                 rc = next->do_ops->do_index_try(env, next,
2811                                                 &dt_directory_features);
2812                 if (rc != 0)
2813                         RETURN(rc);
2814
2815                 for (i = 0; i < lo->ldo_stripenr; i++) {
2816                         rc = dt_ref_del(env, next, th);
2817                         if (rc != 0)
2818                                 RETURN(rc);
2819
2820                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2821                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2822                                 i);
2823
2824                         CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
2825                                PFID(lu_object_fid(&dt->do_lu)), stripe_name,
2826                                PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
2827
2828                         rc = dt_delete(env, next,
2829                                        (const struct dt_key *)stripe_name,
2830                                        th, BYPASS_CAPA);
2831                         if (rc != 0)
2832                                 RETURN(rc);
2833                 }
2834         }
2835         rc = dt_destroy(env, next, th);
2836         if (rc != 0)
2837                 RETURN(rc);
2838
2839         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2840                 RETURN(0);
2841
2842         /* destroy all striped objects */
2843         for (i = 0; i < lo->ldo_stripenr; i++) {
2844                 if (likely(lo->ldo_stripe[i] != NULL) &&
2845                     (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
2846                      i == cfs_fail_val)) {
2847                         rc = dt_destroy(env, lo->ldo_stripe[i], th);
2848                         if (rc != 0)
2849                                 break;
2850                 }
2851         }
2852
2853         RETURN(rc);
2854 }
2855
/* Declare a reference increment; forwarded to the lower-layer object. */
static int lod_declare_ref_add(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_declare_ref_add(env, next, th);
}
2861
/* Increment the reference count; forwarded to the lower-layer object. */
static int lod_ref_add(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_ref_add(env, next, th);
}
2867
/* Declare a reference decrement; forwarded to the lower-layer object. */
static int lod_declare_ref_del(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_declare_ref_del(env, next, th);
}
2873
/* Decrement the reference count; forwarded to the lower-layer object. */
static int lod_ref_del(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_ref_del(env, next, th);
}
2879
2880 static struct obd_capa *lod_capa_get(const struct lu_env *env,
2881                                      struct dt_object *dt,
2882                                      struct lustre_capa *old, __u64 opc)
2883 {
2884         return dt_capa_get(env, dt_object_child(dt), old, opc);
2885 }
2886
2887 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
2888                            __u64 start, __u64 end)
2889 {
2890         return dt_object_sync(env, dt_object_child(dt), start, end);
2891 }
2892
2893 struct lod_slave_locks  {
2894         int                     lsl_lock_count;
2895         struct lustre_handle    lsl_handle[0];
2896 };
2897
2898 static int lod_object_unlock_internal(const struct lu_env *env,
2899                                       struct dt_object *dt,
2900                                       struct ldlm_enqueue_info *einfo,
2901                                       ldlm_policy_data_t *policy)
2902 {
2903         struct lod_object       *lo = lod_dt_obj(dt);
2904         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
2905         int                     rc = 0;
2906         int                     i;
2907         ENTRY;
2908
2909         if (slave_locks == NULL)
2910                 RETURN(0);
2911
2912         for (i = 1; i < slave_locks->lsl_lock_count; i++) {
2913                 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
2914                         int     rc1;
2915
2916                         einfo->ei_cbdata = &slave_locks->lsl_handle[i];
2917                         rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
2918                                                policy);
2919                         if (rc1 < 0)
2920                                 rc = rc == 0 ? rc1 : rc;
2921                 }
2922         }
2923
2924         RETURN(rc);
2925 }
2926
2927 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
2928                              struct ldlm_enqueue_info *einfo,
2929                              union ldlm_policy_data *policy)
2930 {
2931         struct lod_object       *lo = lod_dt_obj(dt);
2932         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
2933         int                     slave_locks_size;
2934         int                     rc;
2935         ENTRY;
2936
2937         if (slave_locks == NULL)
2938                 RETURN(0);
2939
2940         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2941                 RETURN(-ENOTDIR);
2942
2943         rc = lod_load_striping(env, lo);
2944         if (rc != 0)
2945                 RETURN(rc);
2946
2947         /* Note: for remote lock for single stripe dir, MDT will cancel
2948          * the lock by lockh directly */
2949         if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
2950                 RETURN(0);
2951
2952         /* Only cancel slave lock for striped dir */
2953         rc = lod_object_unlock_internal(env, dt, einfo, policy);
2954
2955         slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
2956                            sizeof(slave_locks->lsl_handle[0]);
2957         OBD_FREE(slave_locks, slave_locks_size);
2958         einfo->ei_cbdata = NULL;
2959
2960         RETURN(rc);
2961 }
2962
2963 static int lod_object_lock(const struct lu_env *env,
2964                            struct dt_object *dt,
2965                            struct lustre_handle *lh,
2966                            struct ldlm_enqueue_info *einfo,
2967                            union ldlm_policy_data *policy)
2968 {
2969         struct lod_object       *lo = lod_dt_obj(dt);
2970         int                     rc = 0;
2971         int                     i;
2972         int                     slave_locks_size;
2973         struct lod_slave_locks  *slave_locks = NULL;
2974         ENTRY;
2975
2976         /* remote object lock */
2977         if (!einfo->ei_enq_slave) {
2978                 LASSERT(dt_object_remote(dt));
2979                 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
2980                                       policy);
2981         }
2982
2983         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2984                 RETURN(-ENOTDIR);
2985
2986         rc = lod_load_striping(env, lo);
2987         if (rc != 0)
2988                 RETURN(rc);
2989
2990         /* No stripes */
2991         if (lo->ldo_stripenr <= 1)
2992                 RETURN(0);
2993
2994         slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
2995                            sizeof(slave_locks->lsl_handle[0]);
2996         /* Freed in lod_object_unlock */
2997         OBD_ALLOC(slave_locks, slave_locks_size);
2998         if (slave_locks == NULL)
2999                 RETURN(-ENOMEM);
3000         slave_locks->lsl_lock_count = lo->ldo_stripenr;
3001
3002         /* striped directory lock */
3003         for (i = 1; i < lo->ldo_stripenr; i++) {
3004                 struct lustre_handle    lockh;
3005                 struct ldlm_res_id      *res_id;
3006
3007                 res_id = &lod_env_info(env)->lti_res_id;
3008                 fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
3009                                        res_id);
3010                 einfo->ei_res_id = res_id;
3011
3012                 LASSERT(lo->ldo_stripe[i]);
3013                 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
3014                                     policy);
3015                 if (rc != 0)
3016                         GOTO(out, rc);
3017                 slave_locks->lsl_handle[i] = lockh;
3018         }
3019
3020         einfo->ei_cbdata = slave_locks;
3021
3022 out:
3023         if (rc != 0 && slave_locks != NULL) {
3024                 einfo->ei_cbdata = slave_locks;
3025                 lod_object_unlock_internal(env, dt, einfo, policy);
3026                 OBD_FREE(slave_locks, slave_locks_size);
3027                 einfo->ei_cbdata = NULL;
3028         }
3029
3030         RETURN(rc);
3031 }
3032
3033 struct dt_object_operations lod_obj_ops = {
3034         .do_read_lock           = lod_object_read_lock,
3035         .do_write_lock          = lod_object_write_lock,
3036         .do_read_unlock         = lod_object_read_unlock,
3037         .do_write_unlock        = lod_object_write_unlock,
3038         .do_write_locked        = lod_object_write_locked,
3039         .do_attr_get            = lod_attr_get,
3040         .do_declare_attr_set    = lod_declare_attr_set,
3041         .do_attr_set            = lod_attr_set,
3042         .do_xattr_get           = lod_xattr_get,
3043         .do_declare_xattr_set   = lod_declare_xattr_set,
3044         .do_xattr_set           = lod_xattr_set,
3045         .do_declare_xattr_del   = lod_declare_xattr_del,
3046         .do_xattr_del           = lod_xattr_del,
3047         .do_xattr_list          = lod_xattr_list,
3048         .do_ah_init             = lod_ah_init,
3049         .do_declare_create      = lod_declare_object_create,
3050         .do_create              = lod_object_create,
3051         .do_declare_destroy     = lod_declare_object_destroy,
3052         .do_destroy             = lod_object_destroy,
3053         .do_index_try           = lod_index_try,
3054         .do_declare_ref_add     = lod_declare_ref_add,
3055         .do_ref_add             = lod_ref_add,
3056         .do_declare_ref_del     = lod_declare_ref_del,
3057         .do_ref_del             = lod_ref_del,
3058         .do_capa_get            = lod_capa_get,
3059         .do_object_sync         = lod_object_sync,
3060         .do_object_lock         = lod_object_lock,
3061         .do_object_unlock       = lod_object_unlock,
3062 };
3063
3064 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
3065                         struct lu_buf *buf, loff_t *pos,
3066                         struct lustre_capa *capa)
3067 {
3068         struct dt_object *next = dt_object_child(dt);
3069         return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
3070 }
3071
3072 static ssize_t lod_declare_write(const struct lu_env *env,
3073                                  struct dt_object *dt,
3074                                  const struct lu_buf *buf, loff_t pos,
3075                                  struct thandle *th)
3076 {
3077         return dt_declare_record_write(env, dt_object_child(dt),
3078                                        buf, pos, th);
3079 }
3080
3081 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
3082                          const struct lu_buf *buf, loff_t *pos,
3083                          struct thandle *th, struct lustre_capa *capa, int iq)
3084 {
3085         struct dt_object *next = dt_object_child(dt);
3086         LASSERT(next);
3087         return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
3088 }
3089
3090 static const struct dt_body_operations lod_body_lnk_ops = {
3091         .dbo_read               = lod_read,
3092         .dbo_declare_write      = lod_declare_write,
3093         .dbo_write              = lod_write
3094 };
3095
3096 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
3097                            const struct lu_object_conf *conf)
3098 {
3099         struct lod_device       *lod    = lu2lod_dev(lo->lo_dev);
3100         struct lu_device        *cdev   = NULL;
3101         struct lu_object        *cobj;
3102         struct lod_tgt_descs    *ltd    = NULL;
3103         struct lod_tgt_desc     *tgt;
3104         mdsno_t                  idx    = 0;
3105         int                      type   = LU_SEQ_RANGE_ANY;
3106         int                      rc;
3107         ENTRY;
3108
3109         rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
3110         if (rc != 0)
3111                 RETURN(rc);
3112
3113         if (type == LU_SEQ_RANGE_MDT &&
3114             idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
3115                 cdev = &lod->lod_child->dd_lu_dev;
3116         } else if (type == LU_SEQ_RANGE_MDT) {
3117                 ltd = &lod->lod_mdt_descs;
3118                 lod_getref(ltd);
3119         } else if (type == LU_SEQ_RANGE_OST) {
3120                 ltd = &lod->lod_ost_descs;
3121                 lod_getref(ltd);
3122         } else {
3123                 LBUG();
3124         }
3125
3126         if (ltd != NULL) {
3127                 if (ltd->ltd_tgts_size > idx &&
3128                     cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
3129                         tgt = LTD_TGT(ltd, idx);
3130
3131                         LASSERT(tgt != NULL);
3132                         LASSERT(tgt->ltd_tgt != NULL);
3133
3134                         cdev = &(tgt->ltd_tgt->dd_lu_dev);
3135                 }
3136                 lod_putref(lod, ltd);
3137         }
3138
3139         if (unlikely(cdev == NULL))
3140                 RETURN(-ENOENT);
3141
3142         cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
3143         if (unlikely(cobj == NULL))
3144                 RETURN(-ENOMEM);
3145
3146         lu_object_add(lo, cobj);
3147
3148         RETURN(0);
3149 }
3150
3151 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
3152 {
3153         int i;
3154
3155         if (lo->ldo_dir_stripe != NULL) {
3156                 OBD_FREE_PTR(lo->ldo_dir_stripe);
3157                 lo->ldo_dir_stripe = NULL;
3158         }
3159
3160         if (lo->ldo_stripe) {
3161                 LASSERT(lo->ldo_stripes_allocated > 0);
3162
3163                 for (i = 0; i < lo->ldo_stripenr; i++) {
3164                         if (lo->ldo_stripe[i])
3165                                 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
3166                 }
3167
3168                 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
3169                 OBD_FREE(lo->ldo_stripe, i);
3170                 lo->ldo_stripe = NULL;
3171                 lo->ldo_stripes_allocated = 0;
3172         }
3173         lo->ldo_stripenr = 0;
3174         lo->ldo_pattern = 0;
3175 }
3176
3177 /*
3178  * ->start is called once all slices are initialized, including header's
3179  * cache for mode (object type). using the type we can initialize ops
3180  */
3181 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
3182 {
3183         if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
3184                 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
3185         return 0;
3186 }
3187
3188 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
3189 {
3190         struct lod_object *mo = lu2lod_obj(o);
3191
3192         /*
3193          * release all underlying object pinned
3194          */
3195
3196         lod_object_free_striping(env, mo);
3197
3198         lod_object_set_pool(mo, NULL);
3199
3200         lu_object_fini(o);
3201         OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
3202 }
3203
/*
 * Release hook for LOD objects; currently a no-op.
 *
 * XXX: should everything be released here in case object creation
 * failed earlier?
 */
static void lod_object_release(const struct lu_env *env,
			       struct lu_object *o)
{
}
3209
3210 static int lod_object_print(const struct lu_env *env, void *cookie,
3211                             lu_printer_t p, const struct lu_object *l)
3212 {
3213         struct lod_object *o = lu2lod_obj((struct lu_object *) l);
3214
3215         return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
3216 }
3217
3218 struct lu_object_operations lod_lu_obj_ops = {
3219         .loo_object_init        = lod_object_init,
3220         .loo_object_start       = lod_object_start,
3221         .loo_object_free        = lod_object_free,
3222         .loo_object_release     = lod_object_release,
3223         .loo_object_print       = lod_object_print,
3224 };