Whamcloud - gitweb
LU-4690 lod: separate master object with master stripe
[fs/lustre-release.git] / lustre / lod / lod_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2013, Intel Corporation.
27  */
28 /*
29  * lustre/lod/lod_object.c
30  *
31  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
32  */
33
34 #define DEBUG_SUBSYSTEM S_MDS
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
41
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46 #include <md_object.h>
47
48 #include "lod_internal.h"
49
50 static const char dot[] = ".";
51 static const char dotdot[] = "..";
52
53 extern struct kmem_cache *lod_object_kmem;
54 static const struct dt_body_operations lod_body_lnk_ops;
55
56 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
57                             struct dt_rec *rec, const struct dt_key *key,
58                             struct lustre_capa *capa)
59 {
60         struct dt_object *next = dt_object_child(dt);
61         return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
62 }
63
/**
 * Implementation of dt_index_operations::dio_declare_insert
 *
 * Declare an index insertion on the child object of \a dt.
 *
 * \param[in] env	execution environment.
 * \param[in] dt	index object.
 * \param[in] rec	record to be inserted.
 * \param[in] key	key to be inserted.
 * \param[in] handle	transaction handle.
 *
 * \retval		the value returned by dt_declare_insert().
 */
static int lod_declare_index_insert(const struct lu_env *env,
				    struct dt_object *dt,
				    const struct dt_rec *rec,
				    const struct dt_key *key,
				    struct thandle *handle)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_declare_insert(env, child, rec, key, handle);
}
72
/**
 * Implementation of dt_index_operations::dio_insert
 *
 * Insert a (key, record) pair through the child object of \a dt.
 * All arguments other than \a dt are passed to dt_insert() unchanged.
 *
 * \param[in] env	execution environment.
 * \param[in] dt	index object.
 * \param[in] rec	record to insert.
 * \param[in] key	key to insert.
 * \param[in] th	transaction handle.
 * \param[in] capa	capability.
 * \param[in] ign	forwarded verbatim to dt_insert().
 *
 * \retval		the value returned by dt_insert().
 */
static int lod_index_insert(const struct lu_env *env,
			    struct dt_object *dt,
			    const struct dt_rec *rec,
			    const struct dt_key *key,
			    struct thandle *th,
			    struct lustre_capa *capa,
			    int ign)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_insert(env, child, rec, key, th, capa, ign);
}
83
/**
 * Implementation of dt_index_operations::dio_declare_delete
 *
 * Declare an index deletion on the child object of \a dt.
 *
 * \param[in] env	execution environment.
 * \param[in] dt	index object.
 * \param[in] key	key to be deleted.
 * \param[in] th	transaction handle.
 *
 * \retval		the value returned by dt_declare_delete().
 */
static int lod_declare_index_delete(const struct lu_env *env,
				    struct dt_object *dt,
				    const struct dt_key *key,
				    struct thandle *th)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_declare_delete(env, child, key, th);
}
91
/**
 * Implementation of dt_index_operations::dio_delete
 *
 * Delete \a key through the child object of \a dt.
 *
 * \param[in] env	execution environment.
 * \param[in] dt	index object.
 * \param[in] key	key to delete.
 * \param[in] th	transaction handle.
 * \param[in] capa	capability, passed through unchanged.
 *
 * \retval		the value returned by dt_delete().
 */
static int lod_index_delete(const struct lu_env *env,
			    struct dt_object *dt,
			    const struct dt_key *key,
			    struct thandle *th,
			    struct lustre_capa *capa)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_delete(env, child, key, th, capa);
}
100
101 static struct dt_it *lod_it_init(const struct lu_env *env,
102                                  struct dt_object *dt, __u32 attr,
103                                  struct lustre_capa *capa)
104 {
105         struct dt_object        *next = dt_object_child(dt);
106         struct lod_it           *it = &lod_env_info(env)->lti_it;
107         struct dt_it            *it_next;
108
109
110         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
111         if (IS_ERR(it_next))
112                 return it_next;
113
114         /* currently we do not use more than one iterator per thread
115          * so we store it in thread info. if at some point we need
116          * more active iterators in a single thread, we can allocate
117          * additional ones */
118         LASSERT(it->lit_obj == NULL);
119
120         it->lit_it = it_next;
121         it->lit_obj = next;
122
123         return (struct dt_it *)it;
124 }
125
/* Assert that iterator \a it is in use: both the object and the
 * lower-layer iterator must be set.  \a env is accepted but not used. */
#define LOD_CHECK_IT(env, it)					\
do {								\
	LASSERT((it)->lit_obj != NULL);				\
	LASSERT((it)->lit_it != NULL);				\
} while (0)
131
132 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
133 {
134         struct lod_it *it = (struct lod_it *)di;
135
136         LOD_CHECK_IT(env, it);
137         it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
138
139         /* the iterator not in use any more */
140         it->lit_obj = NULL;
141         it->lit_it = NULL;
142 }
143
144 int lod_it_get(const struct lu_env *env, struct dt_it *di,
145                const struct dt_key *key)
146 {
147         const struct lod_it *it = (const struct lod_it *)di;
148
149         LOD_CHECK_IT(env, it);
150         return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
151 }
152
153 void lod_it_put(const struct lu_env *env, struct dt_it *di)
154 {
155         struct lod_it *it = (struct lod_it *)di;
156
157         LOD_CHECK_IT(env, it);
158         return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
159 }
160
161 int lod_it_next(const struct lu_env *env, struct dt_it *di)
162 {
163         struct lod_it *it = (struct lod_it *)di;
164
165         LOD_CHECK_IT(env, it);
166         return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
167 }
168
169 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
170 {
171         const struct lod_it *it = (const struct lod_it *)di;
172
173         LOD_CHECK_IT(env, it);
174         return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
175 }
176
177 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
178 {
179         struct lod_it *it = (struct lod_it *)di;
180
181         LOD_CHECK_IT(env, it);
182         return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
183 }
184
185 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
186                struct dt_rec *rec, __u32 attr)
187 {
188         const struct lod_it *it = (const struct lod_it *)di;
189
190         LOD_CHECK_IT(env, it);
191         return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec,
192                                                      attr);
193 }
194
195 int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di,
196                     __u32 attr)
197 {
198         const struct lod_it *it = (const struct lod_it *)di;
199
200         LOD_CHECK_IT(env, it);
201         return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it,
202                                                           attr);
203 }
204
205 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
206 {
207         const struct lod_it *it = (const struct lod_it *)di;
208
209         LOD_CHECK_IT(env, it);
210         return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
211 }
212
213 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
214 {
215         const struct lod_it *it = (const struct lod_it *)di;
216
217         LOD_CHECK_IT(env, it);
218         return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
219 }
220
221 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
222                    void *key_rec)
223 {
224         const struct lod_it *it = (const struct lod_it *)di;
225
226         LOD_CHECK_IT(env, it);
227         return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it,
228                                                          key_rec);
229 }
230
231 static struct dt_index_operations lod_index_ops = {
232         .dio_lookup             = lod_index_lookup,
233         .dio_declare_insert     = lod_declare_index_insert,
234         .dio_insert             = lod_index_insert,
235         .dio_declare_delete     = lod_declare_index_delete,
236         .dio_delete             = lod_index_delete,
237         .dio_it = {
238                 .init           = lod_it_init,
239                 .fini           = lod_it_fini,
240                 .get            = lod_it_get,
241                 .put            = lod_it_put,
242                 .next           = lod_it_next,
243                 .key            = lod_it_key,
244                 .key_size       = lod_it_key_size,
245                 .rec            = lod_it_rec,
246                 .rec_size       = lod_it_rec_size,
247                 .store          = lod_it_store,
248                 .load           = lod_it_load,
249                 .key_rec        = lod_it_key_rec,
250         }
251 };
252
253 /**
254  * Implementation of dt_index_operations:: dio_it.init
255  *
256  * This function is to initialize the iterator for striped directory,
257  * basically these lod_striped_it_xxx will just locate the stripe
258  * and call the correspondent api of its next lower layer.
259  *
260  * \param[in] env       execution environment.
261  * \param[in] dt        the striped directory object to be iterated.
262  * \param[in] attr      the attribute of iterator, mostly used to indicate
263  *                      the entry attribute in the object to be iterated.
264  * \param[in] capa      capability(useless in current implementation)
265  *
266  * \retval      initialized iterator(dt_it) if successful initialize the
267  *              iteration. lit_stripe_index will be used to indicate the
268  *              current iterate position among stripes.
269  * \retval      ERR pointer if initialization is failed.
270  */
271 static struct dt_it *lod_striped_it_init(const struct lu_env *env,
272                                          struct dt_object *dt, __u32 attr,
273                                          struct lustre_capa *capa)
274 {
275         struct lod_object       *lo = lod_dt_obj(dt);
276         struct dt_object        *next;
277         struct lod_it           *it = &lod_env_info(env)->lti_it;
278         struct dt_it            *it_next;
279         ENTRY;
280
281         LASSERT(lo->ldo_stripenr > 0);
282         next = lo->ldo_stripe[0];
283         LASSERT(next != NULL);
284         LASSERT(next->do_index_ops != NULL);
285
286         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
287         if (IS_ERR(it_next))
288                 return it_next;
289
290         /* currently we do not use more than one iterator per thread
291          * so we store it in thread info. if at some point we need
292          * more active iterators in a single thread, we can allocate
293          * additional ones */
294         LASSERT(it->lit_obj == NULL);
295
296         it->lit_stripe_index = 0;
297         it->lit_attr = attr;
298         it->lit_it = it_next;
299         it->lit_obj = dt;
300
301         return (struct dt_it *)it;
302 }
303
/* Assert that striped iterator \a it is in use and that its stripe
 * index is valid for the striped object \a lo.  \a env is accepted but
 * not used. */
#define LOD_CHECK_STRIPED_IT(env, it, lo)			\
do {								\
	LASSERT((it)->lit_obj != NULL);				\
	LASSERT((it)->lit_it != NULL);				\
	LASSERT((lo)->ldo_stripenr > 0);			\
	LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr);	\
} while (0)
311
312 /**
313  * Implementation of dt_index_operations:: dio_it.fini
314  *
315  * This function is to finish the iterator for striped directory.
316  *
317  * \param[in] env       execution environment.
318  * \param[in] di        the iterator for the striped directory
319  *
320  */
321 static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
322 {
323         struct lod_it           *it = (struct lod_it *)di;
324         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
325         struct dt_object        *next;
326
327         LOD_CHECK_STRIPED_IT(env, it, lo);
328
329         next = lo->ldo_stripe[it->lit_stripe_index];
330         LASSERT(next != NULL);
331         LASSERT(next->do_index_ops != NULL);
332
333         next->do_index_ops->dio_it.fini(env, it->lit_it);
334
335         /* the iterator not in use any more */
336         it->lit_obj = NULL;
337         it->lit_it = NULL;
338         it->lit_stripe_index = 0;
339 }
340
341 /**
342  * Implementation of dt_index_operations:: dio_it.get
343  *
344  * This function is to position the iterator with given key
345  *
346  * \param[in] env       execution environment.
347  * \param[in] di        the iterator for striped directory.
348  * \param[in] key       the key the iterator will be positioned.
349  *
350  * \retval      0 if successfully position iterator by the key.
351  * \retval      negative error if position is failed.
352  */
353 static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
354                               const struct dt_key *key)
355 {
356         const struct lod_it     *it = (const struct lod_it *)di;
357         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
358         struct dt_object        *next;
359         ENTRY;
360
361         LOD_CHECK_STRIPED_IT(env, it, lo);
362
363         next = lo->ldo_stripe[it->lit_stripe_index];
364         LASSERT(next != NULL);
365         LASSERT(next->do_index_ops != NULL);
366
367         return next->do_index_ops->dio_it.get(env, it->lit_it, key);
368 }
369
370 /**
371  * Implementation of dt_index_operations:: dio_it.put
372  *
373  * This function is supposed to be the pair of it_get, but currently do
374  * nothing. see (osd_it_ea_put or osd_index_it_put)
375  */
376 static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
377 {
378         struct lod_it           *it = (struct lod_it *)di;
379         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
380         struct dt_object        *next;
381
382         LOD_CHECK_STRIPED_IT(env, it, lo);
383
384         next = lo->ldo_stripe[it->lit_stripe_index];
385         LASSERT(next != NULL);
386         LASSERT(next->do_index_ops != NULL);
387
388         return next->do_index_ops->dio_it.put(env, it->lit_it);
389 }
390
391 /**
392  * Implementation of dt_index_operations:: dio_it.next
393  *
394  * This function is to position the iterator to the next entry, if current
395  * stripe is finished by checking the return value of next() in current
396  * stripe. it will go to next stripe. In the mean time, the sub-iterator
397  * for next stripe needs to be initialized.
398  *
399  * \param[in] env       execution environment.
400  * \param[in] di        the iterator for striped directory.
401  *
402  * \retval      0 if successfully position iterator to the next entry.
403  * \retval      negative error if position is failed.
404  */
405 static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
406 {
407         struct lod_it           *it = (struct lod_it *)di;
408         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
409         struct dt_object        *next;
410         struct dt_it            *it_next;
411         int                     rc;
412         ENTRY;
413
414         LOD_CHECK_STRIPED_IT(env, it, lo);
415
416         next = lo->ldo_stripe[it->lit_stripe_index];
417         LASSERT(next != NULL);
418         LASSERT(next->do_index_ops != NULL);
419 again:
420         rc = next->do_index_ops->dio_it.next(env, it->lit_it);
421         if (rc < 0)
422                 RETURN(rc);
423
424         if (rc == 0 && it->lit_stripe_index == 0)
425                 RETURN(rc);
426
427         if (rc == 0 && it->lit_stripe_index > 0) {
428                 struct lu_dirent *ent;
429
430                 ent = (struct lu_dirent *)lod_env_info(env)->lti_key;
431
432                 rc = next->do_index_ops->dio_it.rec(env, it->lit_it,
433                                                     (struct dt_rec *)ent,
434                                                     it->lit_attr);
435                 if (rc != 0)
436                         RETURN(rc);
437
438                 /* skip . and .. for slave stripe */
439                 if ((strncmp(ent->lde_name, ".",
440                              le16_to_cpu(ent->lde_namelen)) == 0 &&
441                      le16_to_cpu(ent->lde_namelen) == 1) ||
442                     (strncmp(ent->lde_name, "..",
443                              le16_to_cpu(ent->lde_namelen)) == 0 &&
444                      le16_to_cpu(ent->lde_namelen) == 2))
445                         goto again;
446
447                 RETURN(rc);
448         }
449
450         /* go to next stripe */
451         if (it->lit_stripe_index + 1 >= lo->ldo_stripenr)
452                 RETURN(1);
453
454         it->lit_stripe_index++;
455
456         next->do_index_ops->dio_it.put(env, it->lit_it);
457         next->do_index_ops->dio_it.fini(env, it->lit_it);
458
459         rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
460         if (rc != 0)
461                 RETURN(rc);
462
463         next = lo->ldo_stripe[it->lit_stripe_index];
464         LASSERT(next != NULL);
465         LASSERT(next->do_index_ops != NULL);
466
467         it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr,
468                                                   BYPASS_CAPA);
469         if (!IS_ERR(it_next)) {
470                 it->lit_it = it_next;
471                 goto again;
472         } else {
473                 rc = PTR_ERR(it_next);
474         }
475
476         RETURN(rc);
477 }
478
479 /**
480  * Implementation of dt_index_operations:: dio_it.key
481  *
482  * This function is to get the key of the iterator at current position.
483  *
484  * \param[in] env       execution environment.
485  * \param[in] di        the iterator for striped directory.
486  *
487  * \retval      key(dt_key) if successfully get the key.
488  * \retval      negative error if can not get the key.
489  */
490 static struct dt_key *lod_striped_it_key(const struct lu_env *env,
491                                          const struct dt_it *di)
492 {
493         const struct lod_it     *it = (const struct lod_it *)di;
494         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
495         struct dt_object        *next;
496
497         LOD_CHECK_STRIPED_IT(env, it, lo);
498
499         next = lo->ldo_stripe[it->lit_stripe_index];
500         LASSERT(next != NULL);
501         LASSERT(next->do_index_ops != NULL);
502
503         return next->do_index_ops->dio_it.key(env, it->lit_it);
504 }
505
506 /**
507  * Implementation of dt_index_operations:: dio_it.key_size
508  *
509  * This function is to get the key_size of current key.
510  *
511  * \param[in] env       execution environment.
512  * \param[in] di        the iterator for striped directory.
513  *
514  * \retval      key_size if successfully get the key_size.
515  * \retval      negative error if can not get the key_size.
516  */
517 static int lod_striped_it_key_size(const struct lu_env *env,
518                                    const struct dt_it *di)
519 {
520         struct lod_it           *it = (struct lod_it *)di;
521         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
522         struct dt_object        *next;
523
524         LOD_CHECK_STRIPED_IT(env, it, lo);
525
526         next = lo->ldo_stripe[it->lit_stripe_index];
527         LASSERT(next != NULL);
528         LASSERT(next->do_index_ops != NULL);
529
530         return next->do_index_ops->dio_it.key_size(env, it->lit_it);
531 }
532
533 /**
534  * Implementation of dt_index_operations:: dio_it.rec
535  *
536  * This function is to get the record at current position.
537  *
538  * \param[in] env       execution environment.
539  * \param[in] di        the iterator for striped directory.
540  * \param[in] attr      the attribute of iterator, mostly used to indicate
541  *                      the entry attribute in the object to be iterated.
542  * \param[out] rec      hold the return record.
543  *
544  * \retval      0 if successfully get the entry.
545  * \retval      negative error if can not get entry.
546  */
547 static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
548                               struct dt_rec *rec, __u32 attr)
549 {
550         const struct lod_it     *it = (const struct lod_it *)di;
551         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
552         struct dt_object        *next;
553
554         LOD_CHECK_STRIPED_IT(env, it, lo);
555
556         next = lo->ldo_stripe[it->lit_stripe_index];
557         LASSERT(next != NULL);
558         LASSERT(next->do_index_ops != NULL);
559
560         return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
561 }
562
563 /**
564  * Implementation of dt_index_operations:: dio_it.rec_size
565  *
566  * This function is to get the record_size at current record.
567  *
568  * \param[in] env       execution environment.
569  * \param[in] di        the iterator for striped directory.
570  * \param[in] attr      the attribute of iterator, mostly used to indicate
571  *                      the entry attribute in the object to be iterated.
572  *
573  * \retval      rec_size if successfully get the entry size.
574  * \retval      negative error if can not get entry size.
575  */
576 static int lod_striped_it_rec_size(const struct lu_env *env,
577                                    const struct dt_it *di, __u32 attr)
578 {
579         struct lod_it           *it = (struct lod_it *)di;
580         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
581         struct dt_object        *next;
582
583         LOD_CHECK_STRIPED_IT(env, it, lo);
584
585         next = lo->ldo_stripe[it->lit_stripe_index];
586         LASSERT(next != NULL);
587         LASSERT(next->do_index_ops != NULL);
588
589         return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
590 }
591
592 /**
593  * Implementation of dt_index_operations:: dio_it.store
594  *
595  * This function will a cookie for current position of the iterator head,
596  * so that user can use this cookie to load/start the iterator next time.
597  *
598  * \param[in] env       execution environment.
599  * \param[in] di        the iterator for striped directory.
600  *
601  * \retval      the cookie.
602  */
603 static __u64 lod_striped_it_store(const struct lu_env *env,
604                                   const struct dt_it *di)
605 {
606         const struct lod_it     *it = (const struct lod_it *)di;
607         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
608         struct dt_object        *next;
609
610         LOD_CHECK_STRIPED_IT(env, it, lo);
611
612         next = lo->ldo_stripe[it->lit_stripe_index];
613         LASSERT(next != NULL);
614         LASSERT(next->do_index_ops != NULL);
615
616         return next->do_index_ops->dio_it.store(env, it->lit_it);
617 }
618
619 /**
620  * Implementation of dt_index_operations:: dio_it.load
621  *
622  * This function will position the iterator with the given hash(usually
623  * get from store),
624  *
625  * \param[in] env       execution environment.
626  * \param[in] di        the iterator for striped directory.
627  * \param[in] hash      the given hash.
628  *
629  * \retval      >0 if successfuly load the iterator to the given position.
630  * \retval      <0 if load is failed.
631  */
632 static int lod_striped_it_load(const struct lu_env *env,
633                                const struct dt_it *di, __u64 hash)
634 {
635         const struct lod_it     *it = (const struct lod_it *)di;
636         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
637         struct dt_object        *next;
638
639         LOD_CHECK_STRIPED_IT(env, it, lo);
640
641         next = lo->ldo_stripe[it->lit_stripe_index];
642         LASSERT(next != NULL);
643         LASSERT(next->do_index_ops != NULL);
644
645         return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
646 }
647
648 static struct dt_index_operations lod_striped_index_ops = {
649         .dio_lookup             = lod_index_lookup,
650         .dio_declare_insert     = lod_declare_index_insert,
651         .dio_insert             = lod_index_insert,
652         .dio_declare_delete     = lod_declare_index_delete,
653         .dio_delete             = lod_index_delete,
654         .dio_it = {
655                 .init           = lod_striped_it_init,
656                 .fini           = lod_striped_it_fini,
657                 .get            = lod_striped_it_get,
658                 .put            = lod_striped_it_put,
659                 .next           = lod_striped_it_next,
660                 .key            = lod_striped_it_key,
661                 .key_size       = lod_striped_it_key_size,
662                 .rec            = lod_striped_it_rec,
663                 .rec_size       = lod_striped_it_rec_size,
664                 .store          = lod_striped_it_store,
665                 .load           = lod_striped_it_load,
666         }
667 };
668
669 /**
670  * Implementation of dt_object_operations:: do_index_try
671  *
672  * This function will try to initialize the index api pointer for the
673  * given object, usually it the entry point of the index api. i.e.
674  * the index object should be initialized in index_try, then start
675  * using index api. For striped directory, it will try to initialize
676  * all of its sub_stripes.
677  *
678  * \param[in] env       execution environment.
679  * \param[in] dt        the index object to be initialized.
680  * \param[in] feat      the features of this object, for example fixed or
681  *                      variable key size etc.
682  *
683  * \retval      >0 if the initialization is successful.
684  * \retval      <0 if the initialization is failed.
685  */
686 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
687                          const struct dt_index_features *feat)
688 {
689         struct lod_object       *lo = lod_dt_obj(dt);
690         struct dt_object        *next = dt_object_child(dt);
691         int                     rc;
692         ENTRY;
693
694         LASSERT(next->do_ops);
695         LASSERT(next->do_ops->do_index_try);
696
697         rc = lod_load_striping_locked(env, lo);
698         if (rc != 0)
699                 RETURN(rc);
700
701         rc = next->do_ops->do_index_try(env, next, feat);
702         if (rc != 0)
703                 RETURN(rc);
704
705         if (lo->ldo_stripenr > 0) {
706                 int i;
707
708                 for (i = 0; i < lo->ldo_stripenr; i++) {
709                         if (dt_object_exists(lo->ldo_stripe[i]) == 0)
710                                 continue;
711                         rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
712                                                 lo->ldo_stripe[i], feat);
713                         if (rc != 0)
714                                 RETURN(rc);
715                 }
716                 dt->do_index_ops = &lod_striped_index_ops;
717         } else {
718                 dt->do_index_ops = &lod_index_ops;
719         }
720
721         RETURN(rc);
722 }
723
/* Take the read lock of the child object of \a dt; \a role is passed
 * to dt_read_lock() unchanged. */
static void lod_object_read_lock(const struct lu_env *env,
				 struct dt_object *dt, unsigned role)
{
	struct dt_object *child = dt_object_child(dt);

	dt_read_lock(env, child, role);
}
729
/* Take the write lock of the child object of \a dt; \a role is passed
 * to dt_write_lock() unchanged. */
static void lod_object_write_lock(const struct lu_env *env,
				  struct dt_object *dt, unsigned role)
{
	struct dt_object *child = dt_object_child(dt);

	dt_write_lock(env, child, role);
}
735
/* Release the read lock of the child object of \a dt. */
static void lod_object_read_unlock(const struct lu_env *env,
				   struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	dt_read_unlock(env, child);
}
741
/* Release the write lock of the child object of \a dt. */
static void lod_object_write_unlock(const struct lu_env *env,
				    struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	dt_write_unlock(env, child);
}
747
/* Report the result of dt_write_locked() on the child object of \a dt. */
static int lod_object_write_locked(const struct lu_env *env,
				   struct dt_object *dt)
{
	struct dt_object *child = dt_object_child(dt);

	return dt_write_locked(env, child);
}
753
754 static int lod_attr_get(const struct lu_env *env,
755                         struct dt_object *dt,
756                         struct lu_attr *attr,
757                         struct lustre_capa *capa)
758 {
759         struct lod_object *lo = lod_dt_obj(dt);
760         int i;
761         int rc;
762         ENTRY;
763
764         rc = dt_attr_get(env, dt_object_child(dt), attr, capa);
765         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr) || rc != 0)
766                 RETURN(rc);
767
768         rc = lod_load_striping_locked(env, lo);
769         if (rc)
770                 RETURN(rc);
771
772         if (lo->ldo_stripenr == 0)
773                 RETURN(rc);
774
775         attr->la_nlink = 2;
776         attr->la_size = 0;
777         for (i = 0; i < lo->ldo_stripenr; i++) {
778                 struct lu_attr *sub_attr = &lod_env_info(env)->lti_attr;
779
780                 LASSERT(lo->ldo_stripe[i]);
781                 if (dt_object_exists(lo->ldo_stripe[i]))
782                         continue;
783
784                 rc = dt_attr_get(env, lo->ldo_stripe[i], sub_attr, capa);
785                 if (rc != 0)
786                         break;
787
788                 /* -2 for . and .. on each stripe */
789                 if (sub_attr->la_valid & LA_NLINK && attr->la_valid & LA_NLINK)
790                         attr->la_nlink += sub_attr->la_nlink - 2;
791                 if (sub_attr->la_valid & LA_SIZE && attr->la_valid & LA_SIZE)
792                         attr->la_size += sub_attr->la_size;
793
794                 if (sub_attr->la_valid & LA_ATIME &&
795                     attr->la_valid & LA_ATIME &&
796                     attr->la_atime < sub_attr->la_atime)
797                         attr->la_atime = sub_attr->la_atime;
798
799                 if (sub_attr->la_valid & LA_CTIME &&
800                     attr->la_valid & LA_CTIME &&
801                     attr->la_ctime < sub_attr->la_ctime)
802                         attr->la_ctime = sub_attr->la_ctime;
803
804                 if (sub_attr->la_valid & LA_MTIME &&
805                     attr->la_valid & LA_MTIME &&
806                     attr->la_mtime < sub_attr->la_mtime)
807                         attr->la_mtime = sub_attr->la_mtime;
808         }
809
810         CDEBUG(D_INFO, DFID" stripe_count %d nlink %u size "LPU64"\n",
811                PFID(lu_object_fid(&dt->do_lu)), lo->ldo_stripenr,
812                attr->la_nlink, attr->la_size);
813
814         RETURN(rc);
815 }
816
817 /**
818  * Mark all of sub-stripes dead of the striped directory.
819  **/
820 static int lod_mark_dead_object(const struct lu_env *env,
821                                 struct dt_object *dt,
822                                 struct thandle *handle,
823                                 bool declare)
824 {
825         struct lod_object       *lo = lod_dt_obj(dt);
826         struct lmv_mds_md_v1    *lmv;
827         __u32                   dead_hash_type;
828         int                     rc;
829         int                     i;
830
831         ENTRY;
832
833         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
834                 RETURN(0);
835
836         rc = lod_load_striping_locked(env, lo);
837         if (rc != 0)
838                 RETURN(rc);
839
840         if (lo->ldo_stripenr == 0)
841                 RETURN(0);
842
843         rc = lod_get_lmv_ea(env, lo);
844         if (rc <= 0)
845                 RETURN(rc);
846
847         lmv = lod_env_info(env)->lti_ea_store;
848         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
849         dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD;
850         lmv->lmv_hash_type = cpu_to_le32(dead_hash_type);
851         for (i = 0; i < lo->ldo_stripenr; i++) {
852                 struct lu_buf buf;
853
854                 lmv->lmv_master_mdt_index = i;
855                 buf.lb_buf = lmv;
856                 buf.lb_len = sizeof(*lmv);
857                 if (declare) {
858                         rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf,
859                                                   XATTR_NAME_LMV,
860                                                   LU_XATTR_REPLACE, handle);
861                 } else {
862                         rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf,
863                                           XATTR_NAME_LMV, LU_XATTR_REPLACE,
864                                           handle, BYPASS_CAPA);
865                 }
866                 if (rc != 0)
867                         break;
868         }
869
870         RETURN(rc);
871 }
872
873 static int lod_declare_attr_set(const struct lu_env *env,
874                                 struct dt_object *dt,
875                                 const struct lu_attr *attr,
876                                 struct thandle *handle)
877 {
878         struct dt_object  *next = dt_object_child(dt);
879         struct lod_object *lo = lod_dt_obj(dt);
880         int                rc, i;
881         ENTRY;
882
883         /* Set dead object on all other stripes */
884         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
885             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
886                 rc = lod_mark_dead_object(env, dt, handle, true);
887                 RETURN(rc);
888         }
889
890         /*
891          * declare setattr on the local object
892          */
893         rc = dt_declare_attr_set(env, next, attr, handle);
894         if (rc)
895                 RETURN(rc);
896
897         /* osp_declare_attr_set() ignores all attributes other than
898          * UID, GID, and size, and osp_attr_set() ignores all but UID
899          * and GID.  Declaration of size attr setting happens through
900          * lod_declare_init_size(), and not through this function.
901          * Therefore we need not load striping unless ownership is
902          * changing.  This should save memory and (we hope) speed up
903          * rename(). */
904         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
905                 if (!(attr->la_valid & (LA_UID | LA_GID)))
906                         RETURN(rc);
907
908                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
909                         RETURN(0);
910         } else {
911                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
912                                         LA_ATIME | LA_MTIME | LA_CTIME)))
913                         RETURN(rc);
914         }
915         /*
916          * load striping information, notice we don't do this when object
917          * is being initialized as we don't need this information till
918          * few specific cases like destroy, chown
919          */
920         rc = lod_load_striping(env, lo);
921         if (rc)
922                 RETURN(rc);
923
924         if (lo->ldo_stripenr == 0)
925                 RETURN(0);
926
927         /*
928          * if object is striped declare changes on the stripes
929          */
930         LASSERT(lo->ldo_stripe);
931         for (i = 0; i < lo->ldo_stripenr; i++) {
932                 LASSERT(lo->ldo_stripe[i]);
933
934                 rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle);
935                 if (rc) {
936                         CERROR("failed declaration: %d\n", rc);
937                         break;
938                 }
939         }
940
941         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
942             dt_object_exists(next) != 0 &&
943             dt_object_remote(next) == 0)
944                 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
945
946         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
947             dt_object_exists(next) &&
948             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
949                 struct lod_thread_info *info = lod_env_info(env);
950                 struct lu_buf *buf = &info->lti_buf;
951
952                 buf->lb_buf = info->lti_ea_store;
953                 buf->lb_len = info->lti_ea_store_size;
954                 dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
955                                      LU_XATTR_REPLACE, handle);
956         }
957
958         RETURN(rc);
959 }
960
961 static int lod_attr_set(const struct lu_env *env,
962                         struct dt_object *dt,
963                         const struct lu_attr *attr,
964                         struct thandle *handle,
965                         struct lustre_capa *capa)
966 {
967         struct dt_object        *next = dt_object_child(dt);
968         struct lod_object       *lo = lod_dt_obj(dt);
969         int                     rc, i;
970         ENTRY;
971
972         /* Set dead object on all other stripes */
973         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
974             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
975                 rc = lod_mark_dead_object(env, dt, handle, false);
976                 RETURN(rc);
977         }
978
979         /*
980          * apply changes to the local object
981          */
982         rc = dt_attr_set(env, next, attr, handle, capa);
983         if (rc)
984                 RETURN(rc);
985
986         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
987                 if (!(attr->la_valid & (LA_UID | LA_GID)))
988                         RETURN(rc);
989
990                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
991                         RETURN(0);
992         } else {
993                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
994                                         LA_ATIME | LA_MTIME | LA_CTIME)))
995                         RETURN(rc);
996         }
997
998         if (lo->ldo_stripenr == 0)
999                 RETURN(0);
1000
1001         /*
1002          * if object is striped, apply changes to all the stripes
1003          */
1004         LASSERT(lo->ldo_stripe);
1005         for (i = 0; i < lo->ldo_stripenr; i++) {
1006                 LASSERT(lo->ldo_stripe[i]);
1007                 if (dt_object_exists(lo->ldo_stripe[i]) == 0)
1008                         continue;
1009                 rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
1010                 if (rc) {
1011                         CERROR("failed declaration: %d\n", rc);
1012                         break;
1013                 }
1014         }
1015
1016         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1017             dt_object_exists(next) != 0 &&
1018             dt_object_remote(next) == 0)
1019                 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
1020
1021         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1022             dt_object_exists(next) &&
1023             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1024                 struct lod_thread_info *info = lod_env_info(env);
1025                 struct lu_buf *buf = &info->lti_buf;
1026                 struct ost_id *oi = &info->lti_ostid;
1027                 struct lu_fid *fid = &info->lti_fid;
1028                 struct lov_mds_md_v1 *lmm;
1029                 struct lov_ost_data_v1 *objs;
1030                 __u32 magic;
1031                 int rc1;
1032
1033                 rc1 = lod_get_lov_ea(env, lo);
1034                 if (rc1  <= 0)
1035                         RETURN(rc);
1036
1037                 buf->lb_buf = info->lti_ea_store;
1038                 buf->lb_len = info->lti_ea_store_size;
1039                 lmm = info->lti_ea_store;
1040                 magic = le32_to_cpu(lmm->lmm_magic);
1041                 if (magic == LOV_MAGIC_V1)
1042                         objs = &(lmm->lmm_objects[0]);
1043                 else
1044                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1045                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
1046                 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
1047                 fid->f_oid--;
1048                 fid_to_ostid(fid, oi);
1049                 ostid_cpu_to_le(oi, &objs->l_ost_oi);
1050                 dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
1051                              LU_XATTR_REPLACE, handle, BYPASS_CAPA);
1052         }
1053
1054         RETURN(rc);
1055 }
1056
1057 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
1058                          struct lu_buf *buf, const char *name,
1059                          struct lustre_capa *capa)
1060 {
1061         struct lod_thread_info  *info = lod_env_info(env);
1062         struct lod_device       *dev = lu2lod_dev(dt->do_lu.lo_dev);
1063         int                      rc, is_root;
1064         ENTRY;
1065
1066         rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
1067         if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
1068                 RETURN(rc);
1069
1070         /*
1071          * lod returns default striping on the real root of the device
1072          * this is like the root stores default striping for the whole
1073          * filesystem. historically we've been using a different approach
1074          * and store it in the config.
1075          */
1076         dt_root_get(env, dev->lod_child, &info->lti_fid);
1077         is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
1078
1079         if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
1080                 struct lov_user_md *lum = buf->lb_buf;
1081                 struct lov_desc    *desc = &dev->lod_desc;
1082
1083                 if (buf->lb_buf == NULL) {
1084                         rc = sizeof(*lum);
1085                 } else if (buf->lb_len >= sizeof(*lum)) {
1086                         lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
1087                         lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
1088                         lmm_oi_set_id(&lum->lmm_oi, 0);
1089                         lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
1090                         lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
1091                         lum->lmm_stripe_size = cpu_to_le32(
1092                                                 desc->ld_default_stripe_size);
1093                         lum->lmm_stripe_count = cpu_to_le16(
1094                                                 desc->ld_default_stripe_count);
1095                         lum->lmm_stripe_offset = cpu_to_le16(
1096                                                 desc->ld_default_stripe_offset);
1097                         rc = sizeof(*lum);
1098                 } else {
1099                         rc = -ERANGE;
1100                 }
1101         }
1102
1103         RETURN(rc);
1104 }
1105
1106 static int lod_verify_md_striping(struct lod_device *lod,
1107                                   const struct lmv_user_md_v1 *lum)
1108 {
1109         int     rc = 0;
1110         ENTRY;
1111
1112         if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
1113                 GOTO(out, rc = -EINVAL);
1114
1115         if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
1116                 GOTO(out, rc = -EINVAL);
1117 out:
1118         if (rc != 0)
1119                 CERROR("%s: invalid lmv_user_md: magic = %x, "
1120                        "stripe_offset = %d, stripe_count = %u: rc = %d\n",
1121                        lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
1122                        (int)le32_to_cpu(lum->lum_stripe_offset),
1123                        le32_to_cpu(lum->lum_stripe_count), rc);
1124         return rc;
1125 }
1126
1127 /**
1128  * Master LMVEA will be same as slave LMVEA, except
1129  * 1. different magic
1130  * 2. No lmv_stripe_fids on slave
1131  * 3. lmv_master_mdt_index on slave LMV EA will be stripe_index.
1132  */
1133 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
1134                                   const struct lmv_mds_md_v1 *master_lmv)
1135 {
1136         *slave_lmv = *master_lmv;
1137         slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
1138 }
1139
1140 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
1141                     struct lu_buf *lmv_buf)
1142 {
1143         struct lod_thread_info  *info = lod_env_info(env);
1144         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1145         struct lod_object       *lo = lod_dt_obj(dt);
1146         struct lmv_mds_md_v1    *lmm1;
1147         int                     stripe_count;
1148         int                     lmm_size;
1149         int                     type = LU_SEQ_RANGE_ANY;
1150         int                     i;
1151         int                     rc;
1152         __u32                   mdtidx;
1153         ENTRY;
1154
1155         LASSERT(lo->ldo_dir_striped != 0);
1156         LASSERT(lo->ldo_stripenr > 0);
1157         stripe_count = lo->ldo_stripenr;
1158         lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
1159         if (info->lti_ea_store_size < lmm_size) {
1160                 rc = lod_ea_store_resize(info, lmm_size);
1161                 if (rc != 0)
1162                         RETURN(rc);
1163         }
1164
1165         lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
1166         lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
1167         lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
1168         lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
1169         rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
1170                             &mdtidx, &type);
1171         if (rc != 0)
1172                 RETURN(rc);
1173
1174         lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
1175         fid_cpu_to_le(&lmm1->lmv_master_fid, lu_object_fid(&dt->do_lu));
1176         for (i = 0; i < lo->ldo_stripenr; i++) {
1177                 struct dt_object *dto;
1178
1179                 dto = lo->ldo_stripe[i];
1180                 LASSERT(dto != NULL);
1181                 fid_cpu_to_le(&lmm1->lmv_stripe_fids[i],
1182                               lu_object_fid(&dto->do_lu));
1183         }
1184
1185         lmv_buf->lb_buf = info->lti_ea_store;
1186         lmv_buf->lb_len = lmm_size;
1187         lo->ldo_dir_striping_cached = 1;
1188
1189         RETURN(rc);
1190 }
1191
1192 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
1193                            const struct lu_buf *buf)
1194 {
1195         struct lod_thread_info  *info = lod_env_info(env);
1196         struct lod_device       *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1197         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1198         struct dt_object        **stripe;
1199         union lmv_mds_md        *lmm = buf->lb_buf;
1200         struct lmv_mds_md_v1    *lmv1 = &lmm->lmv_md_v1;
1201         struct lu_fid           *fid = &info->lti_fid;
1202         int                     i;
1203         int                     rc = 0;
1204         ENTRY;
1205
1206         if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
1207                 RETURN(0);
1208
1209         if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
1210                 lo->ldo_dir_slave_stripe = 1;
1211                 RETURN(0);
1212         }
1213
1214         if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
1215                 RETURN(-EINVAL);
1216
1217         if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1)
1218                 RETURN(0);
1219
1220         LASSERT(lo->ldo_stripe == NULL);
1221         OBD_ALLOC(stripe, sizeof(stripe[0]) *
1222                   (le32_to_cpu(lmv1->lmv_stripe_count)));
1223         if (stripe == NULL)
1224                 RETURN(-ENOMEM);
1225
1226         for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
1227                 struct dt_device        *tgt_dt;
1228                 struct dt_object        *dto;
1229                 int                     type = LU_SEQ_RANGE_ANY;
1230                 __u32                   idx;
1231
1232                 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
1233                 if (!fid_is_sane(fid))
1234                         GOTO(out, rc = -ESTALE);
1235
1236                 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
1237                 if (rc != 0)
1238                         GOTO(out, rc);
1239
1240                 if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
1241                         tgt_dt = lod->lod_child;
1242                 } else {
1243                         struct lod_tgt_desc     *tgt;
1244
1245                         tgt = LTD_TGT(ltd, idx);
1246                         if (tgt == NULL)
1247                                 GOTO(out, rc = -ESTALE);
1248                         tgt_dt = tgt->ltd_tgt;
1249                 }
1250
1251                 dto = dt_locate_at(env, tgt_dt, fid,
1252                                   lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1253                                   NULL);
1254                 if (IS_ERR(dto))
1255                         GOTO(out, rc = PTR_ERR(dto));
1256
1257                 stripe[i] = dto;
1258         }
1259 out:
1260         lo->ldo_stripe = stripe;
1261         lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count);
1262         lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
1263         if (rc != 0)
1264                 lod_object_free_striping(env, lo);
1265
1266         RETURN(rc);
1267 }
1268
1269 static int lod_prep_md_striped_create(const struct lu_env *env,
1270                                       struct dt_object *dt,
1271                                       struct lu_attr *attr,
1272                                       const struct lmv_user_md_v1 *lum,
1273                                       struct dt_object_format *dof,
1274                                       struct thandle *th)
1275 {
1276         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1277         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1278         struct lod_object       *lo = lod_dt_obj(dt);
1279         struct lod_thread_info  *info = lod_env_info(env);
1280         struct dt_object        **stripe;
1281         struct lu_buf           lmv_buf;
1282         struct lu_buf           slave_lmv_buf;
1283         struct lmv_mds_md_v1    *lmm;
1284         struct lmv_mds_md_v1    *slave_lmm = NULL;
1285         int                     stripe_count;
1286         int                     *idx_array;
1287         int                     rc = 0;
1288         int                     i;
1289         int                     j;
1290         ENTRY;
1291
1292         /* The lum has been verifed in lod_verify_md_striping */
1293         LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
1294         LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
1295
1296         stripe_count = le32_to_cpu(lum->lum_stripe_count);
1297
1298         /* shrink the stripe_count to the avaible MDT count */
1299         if (stripe_count > lod->lod_remote_mdt_count + 1)
1300                 stripe_count = lod->lod_remote_mdt_count + 1;
1301
1302         OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
1303         if (stripe == NULL)
1304                 RETURN(-ENOMEM);
1305
1306         OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
1307         if (idx_array == NULL)
1308                 GOTO(out_free, rc = -ENOMEM);
1309
1310         for (i = 0; i < stripe_count; i++) {
1311                 struct lod_tgt_desc     *tgt = NULL;
1312                 struct dt_object        *dto;
1313                 struct lu_fid           fid = { 0 };
1314                 int                     idx;
1315                 struct lu_object_conf   conf = { 0 };
1316                 struct dt_device        *tgt_dt = NULL;
1317
1318                 if (i == 0) {
1319                         /* Right now, master stripe and master object are
1320                          * on the same MDT */
1321                         idx = le32_to_cpu(lum->lum_stripe_offset);
1322                         rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
1323                                            NULL);
1324                         if (rc < 0)
1325                                 GOTO(out_put, rc);
1326                         tgt_dt = lod->lod_child;
1327                         goto next;
1328                 }
1329
1330                 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
1331
1332                 for (j = 0; j < lod->lod_remote_mdt_count;
1333                      j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
1334                         bool already_allocated = false;
1335                         int k;
1336
1337                         CDEBUG(D_INFO, "try idx %d, mdt cnt %d,"
1338                                " allocated %d, last allocated %d\n", idx,
1339                                lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1340
1341                         /* Find next available target */
1342                         if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
1343                                 continue;
1344
1345                         /* check whether the idx already exists
1346                          * in current allocated array */
1347                         for (k = 0; k < i; k++) {
1348                                 if (idx_array[k] == idx) {
1349                                         already_allocated = true;
1350                                         break;
1351                                 }
1352                         }
1353
1354                         if (already_allocated)
1355                                 continue;
1356
1357                         /* check the status of the OSP */
1358                         tgt = LTD_TGT(ltd, idx);
1359                         if (tgt == NULL)
1360                                 continue;
1361
1362                         tgt_dt = tgt->ltd_tgt;
1363                         rc = dt_statfs(env, tgt_dt, NULL);
1364                         if (rc) {
1365                                 /* this OSP doesn't feel well */
1366                                 rc = 0;
1367                                 continue;
1368                         }
1369
1370                         rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
1371                         if (rc < 0) {
1372                                 rc = 0;
1373                                 continue;
1374                         }
1375
1376                         break;
1377                 }
1378
1379                 /* Can not allocate more stripes */
1380                 if (j == lod->lod_remote_mdt_count) {
1381                         CDEBUG(D_INFO, "%s: require stripes %d only get %d\n",
1382                                lod2obd(lod)->obd_name, stripe_count, i - 1);
1383                         break;
1384                 }
1385
1386                 CDEBUG(D_INFO, "idx %d, mdt cnt %d,"
1387                        " allocated %d, last allocated %d\n", idx,
1388                        lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1389
1390 next:
1391                 /* tgt_dt and fid must be ready after search avaible OSP
1392                  * in the above loop */
1393                 LASSERT(tgt_dt != NULL);
1394                 LASSERT(fid_is_sane(&fid));
1395                 conf.loc_flags = LOC_F_NEW;
1396                 dto = dt_locate_at(env, tgt_dt, &fid,
1397                                    dt->do_lu.lo_dev->ld_site->ls_top_dev,
1398                                    &conf);
1399                 if (IS_ERR(dto))
1400                         GOTO(out_put, rc = PTR_ERR(dto));
1401                 stripe[i] = dto;
1402                 idx_array[i] = idx;
1403         }
1404
1405         lo->ldo_dir_striped = 1;
1406         lo->ldo_stripe = stripe;
1407         lo->ldo_stripenr = i;
1408         lo->ldo_stripes_allocated = stripe_count;
1409
1410         if (lo->ldo_stripenr == 0)
1411                 GOTO(out_put, rc = -ENOSPC);
1412
1413         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1414         if (rc != 0)
1415                 GOTO(out_put, rc);
1416         lmm = lmv_buf.lb_buf;
1417
1418         OBD_ALLOC_PTR(slave_lmm);
1419         if (slave_lmm == NULL)
1420                 GOTO(out_put, rc = -ENOMEM);
1421
1422         lod_prep_slave_lmv_md(slave_lmm, lmm);
1423         slave_lmv_buf.lb_buf = slave_lmm;
1424         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1425
1426         if (!dt_try_as_dir(env, dt_object_child(dt)))
1427                 GOTO(out_put, rc = -EINVAL);
1428
1429         for (i = 0; i < lo->ldo_stripenr; i++) {
1430                 struct dt_object *dto = stripe[i];
1431                 char             *stripe_name = info->lti_key;
1432
1433                 rc = dt_declare_create(env, dto, attr, NULL, dof, th);
1434                 if (rc != 0)
1435                         GOTO(out_put, rc);
1436
1437                 if (!dt_try_as_dir(env, dto))
1438                         GOTO(out_put, rc = -EINVAL);
1439
1440                 rc = dt_declare_insert(env, dto,
1441                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1442                      (const struct dt_key *)dot, th);
1443                 if (rc != 0)
1444                         GOTO(out_put, rc);
1445
1446                 /* master stripe FID will be put to .. */
1447                 rc = dt_declare_insert(env, dto,
1448                      (const struct dt_rec *)lu_object_fid(&dt->do_lu),
1449                      (const struct dt_key *)dotdot, th);
1450                 if (rc != 0)
1451                         GOTO(out_put, rc);
1452
1453                 /* probably nothing to inherite */
1454                 if (lo->ldo_striping_cached &&
1455                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1456                                          lo->ldo_def_stripenr,
1457                                          lo->ldo_def_stripe_offset)) {
1458                         struct lov_user_md_v3   *v3;
1459
1460                         /* sigh, lti_ea_store has been used for lmv_buf,
1461                          * so we have to allocate buffer for default
1462                          * stripe EA */
1463                         OBD_ALLOC_PTR(v3);
1464                         if (v3 == NULL)
1465                                 GOTO(out_put, rc = -ENOMEM);
1466
1467                         memset(v3, 0, sizeof(*v3));
1468                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1469                         v3->lmm_stripe_count =
1470                                 cpu_to_le16(lo->ldo_def_stripenr);
1471                         v3->lmm_stripe_offset =
1472                                 cpu_to_le16(lo->ldo_def_stripe_offset);
1473                         v3->lmm_stripe_size =
1474                                 cpu_to_le32(lo->ldo_def_stripe_size);
1475                         if (lo->ldo_pool)
1476                                 strncpy(v3->lmm_pool_name, lo->ldo_pool,
1477                                         LOV_MAXPOOLNAME);
1478
1479                         info->lti_buf.lb_buf = v3;
1480                         info->lti_buf.lb_len = sizeof(*v3);
1481                         rc = dt_declare_xattr_set(env, dto,
1482                                                   &info->lti_buf,
1483                                                   XATTR_NAME_LOV,
1484                                                   0, th);
1485                         OBD_FREE_PTR(v3);
1486                         if (rc != 0)
1487                                 GOTO(out_put, rc);
1488                 }
1489
1490                 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1491                 rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf,
1492                                           XATTR_NAME_LMV, 0, th);
1493                 if (rc != 0)
1494                         GOTO(out_put, rc);
1495
1496                 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1497                         PFID(lu_object_fid(&dto->do_lu)), i);
1498                 rc = dt_declare_insert(env, dt_object_child(dt),
1499                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1500                      (const struct dt_key *)stripe_name, th);
1501                 if (rc != 0)
1502                         GOTO(out_put, rc);
1503
1504                 rc = dt_declare_ref_add(env, dt_object_child(dt), th);
1505                 if (rc != 0)
1506                         GOTO(out_put, rc);
1507         }
1508
1509         rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
1510                                   XATTR_NAME_LMV, 0, th);
1511         if (rc != 0)
1512                 GOTO(out_put, rc);
1513
1514 out_put:
1515         if (rc < 0) {
1516                 for (i = 0; i < stripe_count; i++)
1517                         if (stripe[i] != NULL)
1518                                 lu_object_put(env, &stripe[i]->do_lu);
1519                 OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count);
1520                 lo->ldo_stripenr = 0;
1521                 lo->ldo_stripes_allocated = 0;
1522                 lo->ldo_stripe = NULL;
1523         }
1524
1525 out_free:
1526         if (idx_array != NULL)
1527                 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
1528         if (slave_lmm != NULL)
1529                 OBD_FREE_PTR(slave_lmm);
1530
1531         RETURN(rc);
1532 }
1533
1534 /**
1535  * Declare create striped md object.
1536  */
1537 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
1538                                      struct dt_object *dt,
1539                                      struct lu_attr *attr,
1540                                      const struct lu_buf *lum_buf,
1541                                      struct dt_object_format *dof,
1542                                      struct thandle *th)
1543 {
1544         struct lod_object       *lo = lod_dt_obj(dt);
1545         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1546         struct lmv_user_md_v1   *lum;
1547         int                     rc;
1548         ENTRY;
1549
1550         lum = lum_buf->lb_buf;
1551         LASSERT(lum != NULL);
1552
1553         CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
1554                le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
1555                (int)le32_to_cpu(lum->lum_stripe_offset));
1556
1557         if (le32_to_cpu(lum->lum_stripe_count) == 0)
1558                 GOTO(out, rc = 0);
1559
1560         rc = lod_verify_md_striping(lod, lum);
1561         if (rc != 0)
1562                 GOTO(out, rc);
1563
1564         /* prepare dir striped objects */
1565         rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
1566         if (rc != 0) {
1567                 /* failed to create striping, let's reset
1568                  * config so that others don't get confused */
1569                 lod_object_free_striping(env, lo);
1570                 GOTO(out, rc);
1571         }
1572 out:
1573         RETURN(rc);
1574 }
1575
1576 static int lod_dir_declare_xattr_set(const struct lu_env *env,
1577                                      struct dt_object *dt,
1578                                      const struct lu_buf *buf,
1579                                      const char *name, int fl,
1580                                      struct thandle *th)
1581 {
1582         struct dt_object        *next = dt_object_child(dt);
1583         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
1584         struct lod_object       *lo = lod_dt_obj(dt);
1585         int                     i;
1586         int                     rc;
1587         ENTRY;
1588
1589         if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1590                 struct lmv_user_md_v1 *lum;
1591
1592                 LASSERT(buf != NULL && buf->lb_buf != NULL);
1593                 lum = buf->lb_buf;
1594                 rc = lod_verify_md_striping(d, lum);
1595                 if (rc != 0)
1596                         RETURN(rc);
1597         }
1598
1599         rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1600         if (rc != 0)
1601                 RETURN(rc);
1602
1603         /* set xattr to each stripes, if needed */
1604         rc = lod_load_striping(env, lo);
1605         if (rc != 0)
1606                 RETURN(rc);
1607
1608         if (lo->ldo_stripenr == 0)
1609                 RETURN(rc);
1610
1611         for (i = 0; i < lo->ldo_stripenr; i++) {
1612                 LASSERT(lo->ldo_stripe[i]);
1613                 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
1614                                           name, fl, th);
1615                 if (rc != 0)
1616                         break;
1617         }
1618
1619         RETURN(rc);
1620 }
1621
1622 /*
1623  * LOV xattr is a storage for striping, and LOD owns this xattr.
1624  * but LOD allows others to control striping to some extent
1625  * - to reset strping
1626  * - to set new defined striping
1627  * - to set new semi-defined striping
1628  *   - number of stripes is defined
1629  *   - number of stripes + osts are defined
1630  *   - ??
1631  */
1632 static int lod_declare_xattr_set(const struct lu_env *env,
1633                                  struct dt_object *dt,
1634                                  const struct lu_buf *buf,
1635                                  const char *name, int fl,
1636                                  struct thandle *th)
1637 {
1638         struct dt_object *next = dt_object_child(dt);
1639         struct lu_attr   *attr = &lod_env_info(env)->lti_attr;
1640         __u32             mode;
1641         int               rc;
1642         ENTRY;
1643
1644         /*
1645          * allow to declare predefined striping on a new (!mode) object
1646          * which is supposed to be replay of regular file creation
1647          * (when LOV setting is declared)
1648          * LU_XATTR_REPLACE is set to indicate a layout swap
1649          */
1650         mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
1651         if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
1652              !(fl & LU_XATTR_REPLACE)) {
1653                 /*
1654                  * this is a request to manipulate object's striping
1655                  */
1656                 if (dt_object_exists(dt)) {
1657                         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1658                         if (rc)
1659                                 RETURN(rc);
1660                 } else {
1661                         memset(attr, 0, sizeof(*attr));
1662                         attr->la_valid = LA_TYPE | LA_MODE;
1663                         attr->la_mode = S_IFREG;
1664                 }
1665                 rc = lod_declare_striped_object(env, dt, attr, buf, th);
1666         } else if (S_ISDIR(mode)) {
1667                 rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
1668         } else {
1669                 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1670         }
1671
1672         RETURN(rc);
1673 }
1674
1675 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
1676 {
1677         lo->ldo_striping_cached = 0;
1678         lo->ldo_def_striping_set = 0;
1679         lod_object_set_pool(lo, NULL);
1680         lo->ldo_def_stripe_size = 0;
1681         lo->ldo_def_stripenr = 0;
1682         if (lo->ldo_dir_stripe != NULL)
1683                 lo->ldo_dir_striping_cached = 0;
1684 }
1685
1686 static int lod_xattr_set_internal(const struct lu_env *env,
1687                                   struct dt_object *dt,
1688                                   const struct lu_buf *buf,
1689                                   const char *name, int fl, struct thandle *th,
1690                                   struct lustre_capa *capa)
1691 {
1692         struct dt_object        *next = dt_object_child(dt);
1693         struct lod_object       *lo = lod_dt_obj(dt);
1694         int                     rc;
1695         int                     i;
1696         ENTRY;
1697
1698         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1699         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1700                 RETURN(rc);
1701
1702         if (lo->ldo_stripenr == 0)
1703                 RETURN(rc);
1704
1705         for (i = 0; i < lo->ldo_stripenr; i++) {
1706                 LASSERT(lo->ldo_stripe[i]);
1707                 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
1708                                   capa);
1709                 if (rc != 0)
1710                         break;
1711         }
1712
1713         RETURN(rc);
1714 }
1715
1716 static int lod_xattr_del_internal(const struct lu_env *env,
1717                                   struct dt_object *dt,
1718                                   const char *name, struct thandle *th,
1719                                   struct lustre_capa *capa)
1720 {
1721         struct dt_object        *next = dt_object_child(dt);
1722         struct lod_object       *lo = lod_dt_obj(dt);
1723         int                     rc;
1724         int                     i;
1725         ENTRY;
1726
1727         rc = dt_xattr_del(env, next, name, th, capa);
1728         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1729                 RETURN(rc);
1730
1731         if (lo->ldo_stripenr == 0)
1732                 RETURN(rc);
1733
1734         for (i = 0; i < lo->ldo_stripenr; i++) {
1735                 LASSERT(lo->ldo_stripe[i]);
1736                 rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th,
1737                                   capa);
1738                 if (rc != 0)
1739                         break;
1740         }
1741
1742         RETURN(rc);
1743 }
1744
1745 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
1746                                     struct dt_object *dt,
1747                                     const struct lu_buf *buf,
1748                                     const char *name, int fl,
1749                                     struct thandle *th,
1750                                     struct lustre_capa *capa)
1751 {
1752         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
1753         struct lod_object       *l = lod_dt_obj(dt);
1754         struct lov_user_md_v1   *lum;
1755         struct lov_user_md_v3   *v3 = NULL;
1756         int                      rc;
1757         ENTRY;
1758
1759         /* If it is striped dir, we should clear the stripe cache for
1760          * slave stripe as well, but there are no effective way to
1761          * notify the LOD on the slave MDT, so we do not cache stripe
1762          * information for slave stripe for now. XXX*/
1763         lod_lov_stripe_cache_clear(l);
1764         LASSERT(buf != NULL && buf->lb_buf != NULL);
1765         lum = buf->lb_buf;
1766
1767         rc = lod_verify_striping(d, buf, 0);
1768         if (rc)
1769                 RETURN(rc);
1770
1771         if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1772                 v3 = buf->lb_buf;
1773
1774         /* if { size, offset, count } = { 0, -1, 0 } and no pool
1775          * (i.e. all default values specified) then delete default
1776          * striping from dir. */
1777         CDEBUG(D_OTHER,
1778                 "set default striping: sz %u # %u offset %d %s %s\n",
1779                 (unsigned)lum->lmm_stripe_size,
1780                 (unsigned)lum->lmm_stripe_count,
1781                 (int)lum->lmm_stripe_offset,
1782                 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1783
1784         if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1785                                 (lum->lmm_stripe_count),
1786                                 (lum->lmm_stripe_offset)) &&
1787                         lum->lmm_magic == LOV_USER_MAGIC_V1) {
1788                 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1789                 if (rc == -ENODATA)
1790                         rc = 0;
1791         } else {
1792                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
1793         }
1794
1795         RETURN(rc);
1796 }
1797
1798 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
1799                                             struct dt_object *dt,
1800                                             const struct lu_buf *buf,
1801                                             const char *name, int fl,
1802                                             struct thandle *th,
1803                                             struct lustre_capa *capa)
1804 {
1805         struct lod_object       *l = lod_dt_obj(dt);
1806         struct lmv_user_md_v1   *lum;
1807         int                      rc;
1808         ENTRY;
1809
1810         LASSERT(buf != NULL && buf->lb_buf != NULL);
1811         lum = buf->lb_buf;
1812
1813         CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
1814               le32_to_cpu(lum->lum_stripe_count),
1815               (int)le32_to_cpu(lum->lum_stripe_offset));
1816
1817         if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
1818                                  le32_to_cpu(lum->lum_stripe_offset)) &&
1819                                 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
1820                 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1821                 if (rc == -ENODATA)
1822                         rc = 0;
1823         } else {
1824                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
1825                 if (rc != 0)
1826                         RETURN(rc);
1827         }
1828
1829         /* Update default stripe cache */
1830         if (l->ldo_dir_stripe == NULL) {
1831                 OBD_ALLOC_PTR(l->ldo_dir_stripe);
1832                 if (l->ldo_dir_stripe == NULL)
1833                         RETURN(-ENOMEM);
1834         }
1835
1836         l->ldo_dir_striping_cached = 0;
1837         l->ldo_dir_def_striping_set = 1;
1838         l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count);
1839
1840         RETURN(rc);
1841 }
1842
1843 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
1844                              const struct lu_buf *buf, const char *name,
1845                              int fl, struct thandle *th,
1846                              struct lustre_capa *capa)
1847 {
1848         struct lod_object       *lo = lod_dt_obj(dt);
1849         struct lod_thread_info  *info = lod_env_info(env);
1850         struct lu_attr          *attr = &info->lti_attr;
1851         struct dt_object_format *dof = &info->lti_format;
1852         struct lu_buf           lmv_buf;
1853         struct lu_buf           slave_lmv_buf;
1854         struct lmv_mds_md_v1    *lmm;
1855         struct lmv_mds_md_v1    *slave_lmm = NULL;
1856         int                     i;
1857         int                     rc;
1858         ENTRY;
1859
1860         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
1861                 RETURN(-ENOTDIR);
1862
1863         /* The stripes are supposed to be allocated in declare phase,
1864          * if there are no stripes being allocated, it will skip */
1865         if (lo->ldo_stripenr == 0)
1866                 RETURN(0);
1867
1868         rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA);
1869         if (rc != 0)
1870                 RETURN(rc);
1871
1872         attr->la_valid = LA_TYPE | LA_MODE;
1873         dof->dof_type = DFT_DIR;
1874
1875         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1876         if (rc != 0)
1877                 RETURN(rc);
1878         lmm = lmv_buf.lb_buf;
1879
1880         OBD_ALLOC_PTR(slave_lmm);
1881         if (slave_lmm == NULL)
1882                 RETURN(-ENOMEM);
1883
1884         lod_prep_slave_lmv_md(slave_lmm, lmm);
1885         slave_lmv_buf.lb_buf = slave_lmm;
1886         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1887
1888         for (i = 0; i < lo->ldo_stripenr; i++) {
1889                 struct dt_object *dto;
1890                 char             *stripe_name = info->lti_key;
1891
1892                 dto = lo->ldo_stripe[i];
1893                 dt_write_lock(env, dto, MOR_TGT_CHILD);
1894                 rc = dt_create(env, dto, attr, NULL, dof, th);
1895                 dt_write_unlock(env, dto);
1896                 if (rc != 0)
1897                         RETURN(rc);
1898
1899                 rc = dt_insert(env, dto,
1900                               (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1901                               (const struct dt_key *)dot, th, capa, 0);
1902                 if (rc != 0)
1903                         RETURN(rc);
1904
1905                 rc = dt_insert(env, dto,
1906                               (struct dt_rec *)lu_object_fid(&dt->do_lu),
1907                               (const struct dt_key *)dotdot, th, capa, 0);
1908                 if (rc != 0)
1909                         RETURN(rc);
1910
1911                 if (lo->ldo_striping_cached &&
1912                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1913                                          lo->ldo_def_stripenr,
1914                                          lo->ldo_def_stripe_offset)) {
1915                         struct lov_user_md_v3   *v3;
1916
1917                         /* sigh, lti_ea_store has been used for lmv_buf,
1918                          * so we have to allocate buffer for default
1919                          * stripe EA */
1920                         OBD_ALLOC_PTR(v3);
1921                         if (v3 == NULL)
1922                                 GOTO(out, rc);
1923
1924                         memset(v3, 0, sizeof(*v3));
1925                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1926                         v3->lmm_stripe_count =
1927                                 cpu_to_le16(lo->ldo_def_stripenr);
1928                         v3->lmm_stripe_offset =
1929                                 cpu_to_le16(lo->ldo_def_stripe_offset);
1930                         v3->lmm_stripe_size =
1931                                 cpu_to_le32(lo->ldo_def_stripe_size);
1932                         if (lo->ldo_pool)
1933                                 strncpy(v3->lmm_pool_name, lo->ldo_pool,
1934                                         LOV_MAXPOOLNAME);
1935
1936                         info->lti_buf.lb_buf = v3;
1937                         info->lti_buf.lb_len = sizeof(*v3);
1938                         rc = dt_xattr_set(env, dto, &info->lti_buf,
1939                                           XATTR_NAME_LOV, 0, th, capa);
1940                         OBD_FREE_PTR(v3);
1941                         if (rc != 0)
1942                                 GOTO(out, rc);
1943                 }
1944
1945                 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1946                 rc = dt_xattr_set(env, dto, &slave_lmv_buf, XATTR_NAME_LMV,
1947                                   fl, th, capa);
1948                 if (rc != 0)
1949                         GOTO(out, rc);
1950
1951                 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1952                          PFID(lu_object_fid(&dto->do_lu)), i);
1953                 rc = dt_insert(env, dt_object_child(dt),
1954                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1955                      (const struct dt_key *)stripe_name, th, capa, 0);
1956                 if (rc != 0)
1957                         GOTO(out, rc);
1958
1959                 rc = dt_ref_add(env, dt_object_child(dt), th);
1960                 if (rc != 0)
1961                         GOTO(out, rc);
1962         }
1963
1964         rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
1965                           fl, th, capa);
1966
1967 out:
1968         if (slave_lmm != NULL)
1969                 OBD_FREE_PTR(slave_lmm);
1970
1971         RETURN(rc);
1972 }
1973
1974 int lod_dir_striping_create_internal(const struct lu_env *env,
1975                                      struct dt_object *dt,
1976                                      struct lu_attr *attr,
1977                                      struct dt_object_format *dof,
1978                                      struct thandle *th,
1979                                      bool declare)
1980 {
1981         struct lod_thread_info  *info = lod_env_info(env);
1982         struct lod_object       *lo = lod_dt_obj(dt);
1983         int                     rc;
1984         ENTRY;
1985
1986         if (lo->ldo_dir_def_striping_set &&
1987             !LMVEA_DELETE_VALUES(lo->ldo_stripenr,
1988                                  lo->ldo_dir_stripe_offset)) {
1989                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
1990                 int stripe_count = lo->ldo_stripenr;
1991
1992                 if (info->lti_ea_store_size < sizeof(*v1)) {
1993                         rc = lod_ea_store_resize(info, sizeof(*v1));
1994                         if (rc != 0)
1995                                 RETURN(rc);
1996                         v1 = info->lti_ea_store;
1997                 }
1998
1999                 memset(v1, 0, sizeof(*v1));
2000                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2001                 v1->lum_stripe_count = cpu_to_le32(stripe_count);
2002                 v1->lum_stripe_offset =
2003                                 cpu_to_le32(lo->ldo_dir_stripe_offset);
2004
2005                 info->lti_buf.lb_buf = v1;
2006                 info->lti_buf.lb_len = sizeof(*v1);
2007
2008                 if (declare)
2009                         rc = lod_declare_xattr_set_lmv(env, dt, attr,
2010                                                        &info->lti_buf, dof, th);
2011                 else
2012                         rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
2013                                                XATTR_NAME_LMV, 0, th,
2014                                                BYPASS_CAPA);
2015                 if (rc != 0)
2016                         RETURN(rc);
2017         }
2018
2019         /* Transfer default LMV striping from the parent */
2020         if (lo->ldo_dir_striping_cached &&
2021             !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
2022                                  lo->ldo_dir_def_stripe_offset)) {
2023                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2024                 int def_stripe_count = lo->ldo_dir_def_stripenr;
2025
2026                 if (info->lti_ea_store_size < sizeof(*v1)) {
2027                         rc = lod_ea_store_resize(info, sizeof(*v1));
2028                         if (rc != 0)
2029                                 RETURN(rc);
2030                         v1 = info->lti_ea_store;
2031                 }
2032
2033                 memset(v1, 0, sizeof(*v1));
2034                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2035                 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
2036                 v1->lum_stripe_offset =
2037                                 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
2038                 v1->lum_hash_type =
2039                                 cpu_to_le32(lo->ldo_dir_def_hash_type);
2040
2041                 info->lti_buf.lb_buf = v1;
2042                 info->lti_buf.lb_len = sizeof(*v1);
2043                 if (declare)
2044                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2045                                                        XATTR_NAME_DEFAULT_LMV,
2046                                                        0, th);
2047                 else
2048                         rc = lod_xattr_set_default_lmv_on_dir(env, dt,
2049                                                   &info->lti_buf,
2050                                                   XATTR_NAME_DEFAULT_LMV, 0,
2051                                                   th, BYPASS_CAPA);
2052                 if (rc != 0)
2053                         RETURN(rc);
2054         }
2055
2056         /* Transfer default LOV striping from the parent */
2057         if (lo->ldo_striping_cached &&
2058             !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2059                                  lo->ldo_def_stripenr,
2060                                  lo->ldo_def_stripe_offset)) {
2061                 struct lov_user_md_v3 *v3 = info->lti_ea_store;
2062
2063                 if (info->lti_ea_store_size < sizeof(*v3)) {
2064                         rc = lod_ea_store_resize(info, sizeof(*v3));
2065                         if (rc != 0)
2066                                 RETURN(rc);
2067                         v3 = info->lti_ea_store;
2068                 }
2069
2070                 memset(v3, 0, sizeof(*v3));
2071                 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2072                 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
2073                 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
2074                 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
2075                 if (lo->ldo_pool)
2076                         strncpy(v3->lmm_pool_name, lo->ldo_pool,
2077                                 LOV_MAXPOOLNAME);
2078
2079                 info->lti_buf.lb_buf = v3;
2080                 info->lti_buf.lb_len = sizeof(*v3);
2081
2082                 if (declare)
2083                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2084                                                        XATTR_NAME_LOV, 0, th);
2085                 else
2086                         rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
2087                                                       XATTR_NAME_LOV, 0, th,
2088                                                       BYPASS_CAPA);
2089                 if (rc != 0)
2090                         RETURN(rc);
2091         }
2092
2093         RETURN(0);
2094 }
2095
/**
 * Declaration-phase wrapper around lod_dir_striping_create_internal().
 */
static int lod_declare_dir_striping_create(const struct lu_env *env,
					   struct dt_object *dt,
					   struct lu_attr *attr,
					   struct dt_object_format *dof,
					   struct thandle *th)
{
	const bool declare = true;

	return lod_dir_striping_create_internal(env, dt, attr, dof, th,
						declare);
}
2104
/**
 * Execution-phase wrapper around lod_dir_striping_create_internal().
 */
static int lod_dir_striping_create(const struct lu_env *env,
				   struct dt_object *dt,
				   struct lu_attr *attr,
				   struct dt_object_format *dof,
				   struct thandle *th)
{
	const bool declare = false;

	return lod_dir_striping_create_internal(env, dt, attr, dof, th,
						declare);
}
2113
2114 static int lod_xattr_set(const struct lu_env *env,
2115                          struct dt_object *dt, const struct lu_buf *buf,
2116                          const char *name, int fl, struct thandle *th,
2117                          struct lustre_capa *capa)
2118 {
2119         struct dt_object        *next = dt_object_child(dt);
2120         int                      rc;
2121         ENTRY;
2122
2123         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2124             strcmp(name, XATTR_NAME_LMV) == 0) {
2125                 struct lmv_mds_md_v1 *lmm = buf->lb_buf;
2126
2127                 if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
2128                                                 LMV_HASH_FLAG_MIGRATION)
2129                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2130                 else
2131                         rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
2132
2133                 RETURN(rc);
2134         }
2135
2136         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2137             strcmp(name, XATTR_NAME_LOV) == 0) {
2138                 /* default LOVEA */
2139                 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
2140                 RETURN(rc);
2141         } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2142                    strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
2143                 /* default LMVEA */
2144                 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
2145                                                       th, capa);
2146                 RETURN(rc);
2147         } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2148                    !strcmp(name, XATTR_NAME_LOV)) {
2149                 /* in case of lov EA swap, just set it
2150                  * if not, it is a replay so check striping match what we
2151                  * already have during req replay, declare_xattr_set()
2152                  * defines striping, then create() does the work
2153                 */
2154                 if (fl & LU_XATTR_REPLACE) {
2155                         /* free stripes, then update disk */
2156                         lod_object_free_striping(env, lod_dt_obj(dt));
2157                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2158                 } else {
2159                         rc = lod_striping_create(env, dt, NULL, NULL, th);
2160                 }
2161                 RETURN(rc);
2162         }
2163
2164         /* then all other xattr */
2165         rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
2166
2167         RETURN(rc);
2168 }
2169
/**
 * Declare the removal of an xattr; forwarded to the underlying object.
 */
static int lod_declare_xattr_del(const struct lu_env *env,
				 struct dt_object *dt, const char *name,
				 struct thandle *th)
{
	struct dt_object *next = dt_object_child(dt);

	return dt_declare_xattr_del(env, next, name, th);
}
2176
2177 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
2178                          const char *name, struct thandle *th,
2179                          struct lustre_capa *capa)
2180 {
2181         if (!strcmp(name, XATTR_NAME_LOV))
2182                 lod_object_free_striping(env, lod_dt_obj(dt));
2183         return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
2184 }
2185
/**
 * List the xattrs of an object; forwarded to the underlying object.
 */
static int lod_xattr_list(const struct lu_env *env,
			  struct dt_object *dt, struct lu_buf *buf,
			  struct lustre_capa *capa)
{
	struct dt_object *next = dt_object_child(dt);

	return dt_xattr_list(env, next, buf, capa);
}
2192
2193 int lod_object_set_pool(struct lod_object *o, char *pool)
2194 {
2195         int len;
2196
2197         if (o->ldo_pool) {
2198                 len = strlen(o->ldo_pool);
2199                 OBD_FREE(o->ldo_pool, len + 1);
2200                 o->ldo_pool = NULL;
2201         }
2202         if (pool) {
2203                 len = strlen(pool);
2204                 OBD_ALLOC(o->ldo_pool, len + 1);
2205                 if (o->ldo_pool == NULL)
2206                         return -ENOMEM;
2207                 strcpy(o->ldo_pool, pool);
2208         }
2209         return 0;
2210 }
2211
2212 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
2213 {
2214         return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
2215 }
2216
2217
2218 static int lod_cache_parent_lov_striping(const struct lu_env *env,
2219                                          struct lod_object *lp)
2220 {
2221         struct lod_thread_info  *info = lod_env_info(env);
2222         struct lov_user_md_v1   *v1 = NULL;
2223         struct lov_user_md_v3   *v3 = NULL;
2224         int                      rc;
2225         ENTRY;
2226
2227         /* called from MDD without parent being write locked,
2228          * lock it here */
2229         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2230         rc = lod_get_lov_ea(env, lp);
2231         if (rc < 0)
2232                 GOTO(unlock, rc);
2233
2234         if (rc < sizeof(struct lov_user_md)) {
2235                 /* don't lookup for non-existing or invalid striping */
2236                 lp->ldo_def_striping_set = 0;
2237                 lp->ldo_striping_cached = 1;
2238                 lp->ldo_def_stripe_size = 0;
2239                 lp->ldo_def_stripenr = 0;
2240                 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
2241                 GOTO(unlock, rc = 0);
2242         }
2243
2244         rc = 0;
2245         v1 = info->lti_ea_store;
2246         if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1))
2247                 lustre_swab_lov_user_md_v1(v1);
2248         else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3))
2249                 lustre_swab_lov_user_md_v3(v3);
2250
2251         if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
2252                 GOTO(unlock, rc = 0);
2253
2254         if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
2255                 GOTO(unlock, rc = 0);
2256
2257         CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
2258                PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
2259                (int)v1->lmm_stripe_count,
2260                (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
2261
2262         lp->ldo_def_stripenr = v1->lmm_stripe_count;
2263         lp->ldo_def_stripe_size = v1->lmm_stripe_size;
2264         lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
2265         lp->ldo_striping_cached = 1;
2266         lp->ldo_def_striping_set = 1;
2267         if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
2268                 /* XXX: sanity check here */
2269                 v3 = (struct lov_user_md_v3 *) v1;
2270                 if (v3->lmm_pool_name[0])
2271                         lod_object_set_pool(lp, v3->lmm_pool_name);
2272         }
2273         EXIT;
2274 unlock:
2275         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2276         return rc;
2277 }
2278
2279
2280 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
2281                                          struct lod_object *lp)
2282 {
2283         struct lod_thread_info  *info = lod_env_info(env);
2284         struct lmv_user_md_v1   *v1 = NULL;
2285         int                      rc;
2286         ENTRY;
2287
2288         /* called from MDD without parent being write locked,
2289          * lock it here */
2290         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2291         rc = lod_get_default_lmv_ea(env, lp);
2292         if (rc < 0)
2293                 GOTO(unlock, rc);
2294
2295         if (rc < sizeof(struct lmv_user_md)) {
2296                 /* don't lookup for non-existing or invalid striping */
2297                 lp->ldo_dir_def_striping_set = 0;
2298                 lp->ldo_dir_striping_cached = 1;
2299                 lp->ldo_dir_def_stripenr = 0;
2300                 lp->ldo_dir_def_stripe_offset =
2301                                         (typeof(v1->lum_stripe_offset))(-1);
2302                 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
2303                 GOTO(unlock, rc = 0);
2304         }
2305
2306         rc = 0;
2307         v1 = info->lti_ea_store;
2308
2309         lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
2310         lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
2311         lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
2312         lp->ldo_dir_def_striping_set = 1;
2313         lp->ldo_dir_striping_cached = 1;
2314
2315         EXIT;
2316 unlock:
2317         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2318         return rc;
2319 }
2320
2321 static int lod_cache_parent_striping(const struct lu_env *env,
2322                                      struct lod_object *lp,
2323                                      umode_t child_mode)
2324 {
2325         int rc = 0;
2326         ENTRY;
2327
2328         rc = lod_load_striping(env, lp);
2329         if (rc != 0)
2330                 RETURN(rc);
2331
2332         if (!lp->ldo_striping_cached) {
2333                 /* we haven't tried to get default striping for
2334                  * the directory yet, let's cache it in the object */
2335                 rc = lod_cache_parent_lov_striping(env, lp);
2336                 if (rc != 0)
2337                         RETURN(rc);
2338         }
2339
2340         if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
2341                 rc = lod_cache_parent_lmv_striping(env, lp);
2342
2343         RETURN(rc);
2344 }
2345
2346 /**
2347  * used to transfer default striping data to the object being created
2348  */
2349 static void lod_ah_init(const struct lu_env *env,
2350                         struct dt_allocation_hint *ah,
2351                         struct dt_object *parent,
2352                         struct dt_object *child,
2353                         umode_t child_mode)
2354 {
2355         struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
2356         struct dt_object  *nextp = NULL;
2357         struct dt_object  *nextc;
2358         struct lod_object *lp = NULL;
2359         struct lod_object *lc;
2360         struct lov_desc   *desc;
2361         int               rc;
2362         ENTRY;
2363
2364         LASSERT(child);
2365
2366         if (likely(parent)) {
2367                 nextp = dt_object_child(parent);
2368                 lp = lod_dt_obj(parent);
2369                 rc = lod_load_striping(env, lp);
2370                 if (rc != 0)
2371                         return;
2372         }
2373
2374         nextc = dt_object_child(child);
2375         lc = lod_dt_obj(child);
2376
2377         LASSERT(lc->ldo_stripenr == 0);
2378         LASSERT(lc->ldo_stripe == NULL);
2379
2380         /*
2381          * local object may want some hints
2382          * in case of late striping creation, ->ah_init()
2383          * can be called with local object existing
2384          */
2385         if (!dt_object_exists(nextc) || dt_object_remote(nextc))
2386                 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
2387                                           NULL : nextp, nextc, child_mode);
2388
2389         if (S_ISDIR(child_mode)) {
2390                 if (lc->ldo_dir_stripe == NULL) {
2391                         OBD_ALLOC_PTR(lc->ldo_dir_stripe);
2392                         if (lc->ldo_dir_stripe == NULL)
2393                                 return;
2394                 }
2395
2396                 if (lp->ldo_dir_stripe == NULL) {
2397                         OBD_ALLOC_PTR(lp->ldo_dir_stripe);
2398                         if (lp->ldo_dir_stripe == NULL)
2399                                 return;
2400                 }
2401
2402                 rc = lod_cache_parent_striping(env, lp, child_mode);
2403                 if (rc != 0)
2404                         return;
2405
2406                 /* transfer defaults to new directory */
2407                 if (lp->ldo_striping_cached) {
2408                         if (lp->ldo_pool)
2409                                 lod_object_set_pool(lc, lp->ldo_pool);
2410                         lc->ldo_def_stripenr = lp->ldo_def_stripenr;
2411                         lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
2412                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2413                         lc->ldo_striping_cached = 1;
2414                         lc->ldo_def_striping_set = 1;
2415                         CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
2416                                (int)lc->ldo_def_stripe_size,
2417                                (int)lc->ldo_def_stripe_offset,
2418                                (int)lc->ldo_def_stripenr);
2419                 }
2420
2421                 /* transfer dir defaults to new directory */
2422                 if (lp->ldo_dir_striping_cached) {
2423                         lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
2424                         lc->ldo_dir_def_stripe_offset =
2425                                                   lp->ldo_dir_def_stripe_offset;
2426                         lc->ldo_dir_def_hash_type =
2427                                                   lp->ldo_dir_def_hash_type;
2428                         lc->ldo_dir_striping_cached = 1;
2429                         lc->ldo_dir_def_striping_set = 1;
2430                         CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
2431                                (int)lc->ldo_dir_def_stripenr,
2432                                (int)lc->ldo_dir_def_stripe_offset,
2433                                lc->ldo_dir_def_hash_type);
2434                 }
2435
2436                 /* If the directory is specified with certain stripes */
2437                 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
2438                         const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
2439
2440                         rc = lod_verify_md_striping(d, lum1);
2441                         if (rc == 0 &&
2442                                 le32_to_cpu(lum1->lum_stripe_count) > 1) {
2443                                 /* Directory will be striped only if
2444                                  * stripe_count > 1 */
2445                                 lc->ldo_stripenr =
2446                                         le32_to_cpu(lum1->lum_stripe_count);
2447                                 lc->ldo_dir_stripe_offset =
2448                                         le32_to_cpu(lum1->lum_stripe_offset);
2449                                 lc->ldo_dir_hash_type =
2450                                         le32_to_cpu(lum1->lum_hash_type);
2451                                 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
2452                                        lc->ldo_stripenr,
2453                                        (int)lc->ldo_dir_stripe_offset);
2454                         }
2455                 } else if (lp->ldo_dir_def_striping_set) {
2456                         /* If there are default dir stripe from parent */
2457                         lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
2458                         lc->ldo_dir_stripe_offset =
2459                                         lp->ldo_dir_def_stripe_offset;
2460                         lc->ldo_dir_hash_type =
2461                                         lp->ldo_dir_def_hash_type;
2462                         CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
2463                                lc->ldo_stripenr,
2464                                (int)lc->ldo_dir_stripe_offset);
2465                 } else {
2466                         /* set default stripe for this directory */
2467                         lc->ldo_stripenr = 0;
2468                         lc->ldo_dir_stripe_offset = -1;
2469                 }
2470
2471                 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
2472                        lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
2473
2474                 goto out;
2475         }
2476
2477         /*
2478          * if object is going to be striped over OSTs, transfer default
2479          * striping information to the child, so that we can use it
2480          * during declaration and creation
2481          */
2482         if (!lod_object_will_be_striped(S_ISREG(child_mode),
2483                                         lu_object_fid(&child->do_lu)))
2484                 goto out;
2485         /*
2486          * try from the parent
2487          */
2488         if (likely(parent)) {
2489                 lod_cache_parent_striping(env, lp, child_mode);
2490
2491                 lc->ldo_def_stripe_offset = (__u16) -1;
2492
2493                 if (lp->ldo_def_striping_set) {
2494                         if (lp->ldo_pool)
2495                                 lod_object_set_pool(lc, lp->ldo_pool);
2496                         lc->ldo_stripenr = lp->ldo_def_stripenr;
2497                         lc->ldo_stripe_size = lp->ldo_def_stripe_size;
2498                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2499                         CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
2500                                lc->ldo_stripenr, lc->ldo_stripe_size,
2501                                lp->ldo_pool ? lp->ldo_pool : "");
2502                 }
2503         }
2504
2505         /*
2506          * if the parent doesn't provide with specific pattern, grab fs-wide one
2507          */
2508         desc = &d->lod_desc;
2509         if (lc->ldo_stripenr == 0)
2510                 lc->ldo_stripenr = desc->ld_default_stripe_count;
2511         if (lc->ldo_stripe_size == 0)
2512                 lc->ldo_stripe_size = desc->ld_default_stripe_size;
2513         CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
2514                lc->ldo_stripenr, lc->ldo_stripe_size,
2515                lc->ldo_pool ? lc->ldo_pool : "");
2516
2517 out:
2518         /* we do not cache stripe information for slave stripe, see
2519          * lod_xattr_set_lov_on_dir */
2520         if (lp != NULL && lp->ldo_dir_slave_stripe)
2521                 lod_lov_stripe_cache_clear(lp);
2522
2523         EXIT;
2524 }
2525
2526 #define ll_do_div64(aaa,bbb)    do_div((aaa), (bbb))
2527 /*
2528  * this function handles a special case when truncate was done
2529  * on a stripeless object and now striping is being created
2530  * we can't lose that size, so we have to propagate it to newly
2531  * created object
2532  */
2533 static int lod_declare_init_size(const struct lu_env *env,
2534                                  struct dt_object *dt, struct thandle *th)
2535 {
2536         struct dt_object   *next = dt_object_child(dt);
2537         struct lod_object  *lo = lod_dt_obj(dt);
2538         struct lu_attr     *attr = &lod_env_info(env)->lti_attr;
2539         uint64_t            size, offs;
2540         int                 rc, stripe;
2541         ENTRY;
2542
2543         /* XXX: we support the simplest (RAID0) striping so far */
2544         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
2545         LASSERT(lo->ldo_stripe_size > 0);
2546
2547         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
2548         LASSERT(attr->la_valid & LA_SIZE);
2549         if (rc)
2550                 RETURN(rc);
2551
2552         size = attr->la_size;
2553         if (size == 0)
2554                 RETURN(0);
2555
2556         /* ll_do_div64(a, b) returns a % b, and a = a / b */
2557         ll_do_div64(size, (__u64) lo->ldo_stripe_size);
2558         stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
2559
2560         size = size * lo->ldo_stripe_size;
2561         offs = attr->la_size;
2562         size += ll_do_div64(offs, lo->ldo_stripe_size);
2563
2564         attr->la_valid = LA_SIZE;
2565         attr->la_size = size;
2566
2567         rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
2568
2569         RETURN(rc);
2570 }
2571
2572 /**
2573  * Create declaration of striped object
2574  */
2575 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
2576                                struct lu_attr *attr,
2577                                const struct lu_buf *lovea, struct thandle *th)
2578 {
2579         struct lod_thread_info  *info = lod_env_info(env);
2580         struct dt_object        *next = dt_object_child(dt);
2581         struct lod_object       *lo = lod_dt_obj(dt);
2582         int                      rc;
2583         ENTRY;
2584
2585         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
2586                 /* failed to create striping, let's reset
2587                  * config so that others don't get confused */
2588                 lod_object_free_striping(env, lo);
2589                 GOTO(out, rc = -ENOMEM);
2590         }
2591
2592         if (!dt_object_remote(next)) {
2593                 /* choose OST and generate appropriate objects */
2594                 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
2595                 if (rc) {
2596                         /* failed to create striping, let's reset
2597                          * config so that others don't get confused */
2598                         lod_object_free_striping(env, lo);
2599                         GOTO(out, rc);
2600                 }
2601
2602                 /*
2603                  * declare storage for striping data
2604                  */
2605                 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
2606                                 lo->ldo_pool ?  LOV_MAGIC_V3 : LOV_MAGIC_V1);
2607         } else {
2608                 /* LOD can not choose OST objects for remote objects, i.e.
2609                  * stripes must be ready before that. Right now, it can only
2610                  * happen during migrate, i.e. migrate process needs to create
2611                  * remote regular file (mdd_migrate_create), then the migrate
2612                  * process will provide stripeEA. */
2613                 LASSERT(lovea != NULL);
2614                 info->lti_buf = *lovea;
2615         }
2616
2617         rc = dt_declare_xattr_set(env, next, &info->lti_buf,
2618                                   XATTR_NAME_LOV, 0, th);
2619         if (rc)
2620                 GOTO(out, rc);
2621
2622         /*
2623          * if striping is created with local object's size > 0,
2624          * we have to propagate this size to specific object
2625          * the case is possible only when local object was created previously
2626          */
2627         if (dt_object_exists(next))
2628                 rc = lod_declare_init_size(env, dt, th);
2629
2630 out:
2631         RETURN(rc);
2632 }
2633
2634 static int lod_declare_object_create(const struct lu_env *env,
2635                                      struct dt_object *dt,
2636                                      struct lu_attr *attr,
2637                                      struct dt_allocation_hint *hint,
2638                                      struct dt_object_format *dof,
2639                                      struct thandle *th)
2640 {
2641         struct dt_object   *next = dt_object_child(dt);
2642         struct lod_object  *lo = lod_dt_obj(dt);
2643         int                 rc;
2644         ENTRY;
2645
2646         LASSERT(dof);
2647         LASSERT(attr);
2648         LASSERT(th);
2649
2650         /*
2651          * first of all, we declare creation of local object
2652          */
2653         rc = dt_declare_create(env, next, attr, hint, dof, th);
2654         if (rc)
2655                 GOTO(out, rc);
2656
2657         if (dof->dof_type == DFT_SYM)
2658                 dt->do_body_ops = &lod_body_lnk_ops;
2659
2660         /*
2661          * it's lod_ah_init() who has decided the object will striped
2662          */
2663         if (dof->dof_type == DFT_REGULAR) {
2664                 /* callers don't want stripes */
2665                 /* XXX: all tricky interactions with ->ah_make_hint() decided
2666                  * to use striping, then ->declare_create() behaving differently
2667                  * should be cleaned */
2668                 if (dof->u.dof_reg.striped == 0)
2669                         lo->ldo_stripenr = 0;
2670                 if (lo->ldo_stripenr > 0)
2671                         rc = lod_declare_striped_object(env, dt, attr,
2672                                                         NULL, th);
2673         } else if (dof->dof_type == DFT_DIR) {
2674                 /* Orphan object (like migrating object) does not have
2675                  * lod_dir_stripe, see lod_ah_init */
2676                 if (lo->ldo_dir_stripe != NULL)
2677                         rc = lod_declare_dir_striping_create(env, dt, attr,
2678                                                              dof, th);
2679         }
2680 out:
2681         RETURN(rc);
2682 }
2683
2684 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
2685                         struct lu_attr *attr, struct dt_object_format *dof,
2686                         struct thandle *th)
2687 {
2688         struct lod_object *lo = lod_dt_obj(dt);
2689         int                rc = 0, i;
2690         ENTRY;
2691
2692         LASSERT(lo->ldo_striping_cached == 0);
2693
2694         /* create all underlying objects */
2695         for (i = 0; i < lo->ldo_stripenr; i++) {
2696                 LASSERT(lo->ldo_stripe[i]);
2697                 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
2698
2699                 if (rc)
2700                         break;
2701         }
2702         if (rc == 0)
2703                 rc = lod_generate_and_set_lovea(env, lo, th);
2704
2705         RETURN(rc);
2706 }
2707
2708 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
2709                              struct lu_attr *attr,
2710                              struct dt_allocation_hint *hint,
2711                              struct dt_object_format *dof, struct thandle *th)
2712 {
2713         struct dt_object   *next = dt_object_child(dt);
2714         struct lod_object  *lo = lod_dt_obj(dt);
2715         int                 rc;
2716         ENTRY;
2717
2718         /* create local object */
2719         rc = dt_create(env, next, attr, hint, dof, th);
2720         if (rc != 0)
2721                 RETURN(rc);
2722
2723         if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2724             lo->ldo_stripe && dof->u.dof_reg.striped != 0)
2725                 rc = lod_striping_create(env, dt, attr, dof, th);
2726
2727         RETURN(rc);
2728 }
2729
2730 static int lod_declare_object_destroy(const struct lu_env *env,
2731                                       struct dt_object *dt,
2732                                       struct thandle *th)
2733 {
2734         struct dt_object   *next = dt_object_child(dt);
2735         struct lod_object  *lo = lod_dt_obj(dt);
2736         struct lod_thread_info *info = lod_env_info(env);
2737         char               *stripe_name = info->lti_key;
2738         int                 rc, i;
2739         ENTRY;
2740
2741         /*
2742          * load striping information, notice we don't do this when object
2743          * is being initialized as we don't need this information till
2744          * few specific cases like destroy, chown
2745          */
2746         rc = lod_load_striping(env, lo);
2747         if (rc)
2748                 RETURN(rc);
2749
2750         /* declare destroy for all underlying objects */
2751         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2752                 rc = next->do_ops->do_index_try(env, next,
2753                                                 &dt_directory_features);
2754                 if (rc != 0)
2755                         RETURN(rc);
2756
2757                 for (i = 0; i < lo->ldo_stripenr; i++) {
2758                         rc = dt_declare_ref_del(env, next, th);
2759                         if (rc != 0)
2760                                 RETURN(rc);
2761                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2762                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2763                                 i);
2764                         rc = dt_declare_delete(env, next,
2765                                         (const struct dt_key *)stripe_name, th);
2766                         if (rc != 0)
2767                                 RETURN(rc);
2768                 }
2769         }
2770         /*
2771          * we declare destroy for the local object
2772          */
2773         rc = dt_declare_destroy(env, next, th);
2774         if (rc)
2775                 RETURN(rc);
2776
2777         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2778                 RETURN(0);
2779
2780         /* declare destroy for all underlying objects */
2781         for (i = 0; i < lo->ldo_stripenr; i++) {
2782                 LASSERT(lo->ldo_stripe[i]);
2783                 rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
2784                 if (rc != 0)
2785                         break;
2786         }
2787
2788         RETURN(rc);
2789 }
2790
2791 static int lod_object_destroy(const struct lu_env *env,
2792                 struct dt_object *dt, struct thandle *th)
2793 {
2794         struct dt_object  *next = dt_object_child(dt);
2795         struct lod_object *lo = lod_dt_obj(dt);
2796         struct lod_thread_info *info = lod_env_info(env);
2797         char               *stripe_name = info->lti_key;
2798         int                rc, i;
2799         ENTRY;
2800
2801         /* destroy sub-stripe of master object */
2802         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2803                 rc = next->do_ops->do_index_try(env, next,
2804                                                 &dt_directory_features);
2805                 if (rc != 0)
2806                         RETURN(rc);
2807
2808                 for (i = 0; i < lo->ldo_stripenr; i++) {
2809                         rc = dt_ref_del(env, next, th);
2810                         if (rc != 0)
2811                                 RETURN(rc);
2812
2813                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2814                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2815                                 i);
2816
2817                         CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
2818                                PFID(lu_object_fid(&dt->do_lu)), stripe_name,
2819                                PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
2820
2821                         rc = dt_delete(env, next,
2822                                        (const struct dt_key *)stripe_name,
2823                                        th, BYPASS_CAPA);
2824                         if (rc != 0)
2825                                 RETURN(rc);
2826                 }
2827         }
2828         rc = dt_destroy(env, next, th);
2829         if (rc != 0)
2830                 RETURN(rc);
2831
2832         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2833                 RETURN(0);
2834
2835         /* destroy all striped objects */
2836         for (i = 0; i < lo->ldo_stripenr; i++) {
2837                 LASSERT(lo->ldo_stripe[i]);
2838                 rc = dt_destroy(env, lo->ldo_stripe[i], th);
2839                 if (rc != 0)
2840                         break;
2841         }
2842
2843         RETURN(rc);
2844 }
2845
/* Forward the reference-add declaration to the object of the layer below. */
static int lod_declare_ref_add(const struct lu_env *env,
			       struct dt_object *dt, struct thandle *th)
{
	int rc;

	rc = dt_declare_ref_add(env, dt_object_child(dt), th);

	return rc;
}
2851
/* Forward the reference-add operation to the object of the layer below. */
static int lod_ref_add(const struct lu_env *env,
		       struct dt_object *dt, struct thandle *th)
{
	int rc;

	rc = dt_ref_add(env, dt_object_child(dt), th);

	return rc;
}
2857
/* Forward the reference-drop declaration to the object of the layer below. */
static int lod_declare_ref_del(const struct lu_env *env,
			       struct dt_object *dt, struct thandle *th)
{
	int rc;

	rc = dt_declare_ref_del(env, dt_object_child(dt), th);

	return rc;
}
2863
/* Forward the reference-drop operation to the object of the layer below. */
static int lod_ref_del(const struct lu_env *env,
		       struct dt_object *dt, struct thandle *th)
{
	int rc;

	rc = dt_ref_del(env, dt_object_child(dt), th);

	return rc;
}
2869
2870 static struct obd_capa *lod_capa_get(const struct lu_env *env,
2871                                      struct dt_object *dt,
2872                                      struct lustre_capa *old, __u64 opc)
2873 {
2874         return dt_capa_get(env, dt_object_child(dt), old, opc);
2875 }
2876
2877 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
2878                            __u64 start, __u64 end)
2879 {
2880         return dt_object_sync(env, dt_object_child(dt), start, end);
2881 }
2882
2883 struct lod_slave_locks  {
2884         int                     lsl_lock_count;
2885         struct lustre_handle    lsl_handle[0];
2886 };
2887
2888 static int lod_object_unlock_internal(const struct lu_env *env,
2889                                       struct dt_object *dt,
2890                                       struct ldlm_enqueue_info *einfo,
2891                                       ldlm_policy_data_t *policy)
2892 {
2893         struct lod_object       *lo = lod_dt_obj(dt);
2894         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
2895         int                     rc = 0;
2896         int                     i;
2897         ENTRY;
2898
2899         if (slave_locks == NULL)
2900                 RETURN(0);
2901
2902         for (i = 1; i < slave_locks->lsl_lock_count; i++) {
2903                 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
2904                         int     rc1;
2905
2906                         einfo->ei_cbdata = &slave_locks->lsl_handle[i];
2907                         rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
2908                                                policy);
2909                         if (rc1 < 0)
2910                                 rc = rc == 0 ? rc1 : rc;
2911                 }
2912         }
2913
2914         RETURN(rc);
2915 }
2916
2917 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
2918                              struct ldlm_enqueue_info *einfo,
2919                              union ldlm_policy_data *policy)
2920 {
2921         struct lod_object       *lo = lod_dt_obj(dt);
2922         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
2923         int                     slave_locks_size;
2924         int                     rc;
2925         ENTRY;
2926
2927         if (slave_locks == NULL)
2928                 RETURN(0);
2929
2930         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2931                 RETURN(-ENOTDIR);
2932
2933         rc = lod_load_striping(env, lo);
2934         if (rc != 0)
2935                 RETURN(rc);
2936
2937         /* Note: for remote lock for single stripe dir, MDT will cancel
2938          * the lock by lockh directly */
2939         if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
2940                 RETURN(0);
2941
2942         /* Only cancel slave lock for striped dir */
2943         rc = lod_object_unlock_internal(env, dt, einfo, policy);
2944
2945         slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
2946                            sizeof(slave_locks->lsl_handle[0]);
2947         OBD_FREE(slave_locks, slave_locks_size);
2948         einfo->ei_cbdata = NULL;
2949
2950         RETURN(rc);
2951 }
2952
2953 static int lod_object_lock(const struct lu_env *env,
2954                            struct dt_object *dt,
2955                            struct lustre_handle *lh,
2956                            struct ldlm_enqueue_info *einfo,
2957                            union ldlm_policy_data *policy)
2958 {
2959         struct lod_object       *lo = lod_dt_obj(dt);
2960         int                     rc = 0;
2961         int                     i;
2962         int                     slave_locks_size;
2963         struct lod_slave_locks  *slave_locks = NULL;
2964         ENTRY;
2965
2966         /* remote object lock */
2967         if (!einfo->ei_enq_slave) {
2968                 LASSERT(dt_object_remote(dt));
2969                 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
2970                                       policy);
2971         }
2972
2973         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2974                 RETURN(-ENOTDIR);
2975
2976         rc = lod_load_striping(env, lo);
2977         if (rc != 0)
2978                 RETURN(rc);
2979
2980         /* No stripes */
2981         if (lo->ldo_stripenr <= 1)
2982                 RETURN(0);
2983
2984         slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
2985                            sizeof(slave_locks->lsl_handle[0]);
2986         /* Freed in lod_object_unlock */
2987         OBD_ALLOC(slave_locks, slave_locks_size);
2988         if (slave_locks == NULL)
2989                 RETURN(-ENOMEM);
2990         slave_locks->lsl_lock_count = lo->ldo_stripenr;
2991
2992         /* striped directory lock */
2993         for (i = 1; i < lo->ldo_stripenr; i++) {
2994                 struct lustre_handle    lockh;
2995                 struct ldlm_res_id      *res_id;
2996
2997                 res_id = &lod_env_info(env)->lti_res_id;
2998                 fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
2999                                        res_id);
3000                 einfo->ei_res_id = res_id;
3001
3002                 LASSERT(lo->ldo_stripe[i]);
3003                 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
3004                                     policy);
3005                 if (rc != 0)
3006                         GOTO(out, rc);
3007                 slave_locks->lsl_handle[i] = lockh;
3008         }
3009
3010         einfo->ei_cbdata = slave_locks;
3011
3012 out:
3013         if (rc != 0 && slave_locks != NULL) {
3014                 einfo->ei_cbdata = slave_locks;
3015                 lod_object_unlock_internal(env, dt, einfo, policy);
3016                 OBD_FREE(slave_locks, slave_locks_size);
3017                 einfo->ei_cbdata = NULL;
3018         }
3019
3020         RETURN(rc);
3021 }
3022
3023 struct dt_object_operations lod_obj_ops = {
3024         .do_read_lock           = lod_object_read_lock,
3025         .do_write_lock          = lod_object_write_lock,
3026         .do_read_unlock         = lod_object_read_unlock,
3027         .do_write_unlock        = lod_object_write_unlock,
3028         .do_write_locked        = lod_object_write_locked,
3029         .do_attr_get            = lod_attr_get,
3030         .do_declare_attr_set    = lod_declare_attr_set,
3031         .do_attr_set            = lod_attr_set,
3032         .do_xattr_get           = lod_xattr_get,
3033         .do_declare_xattr_set   = lod_declare_xattr_set,
3034         .do_xattr_set           = lod_xattr_set,
3035         .do_declare_xattr_del   = lod_declare_xattr_del,
3036         .do_xattr_del           = lod_xattr_del,
3037         .do_xattr_list          = lod_xattr_list,
3038         .do_ah_init             = lod_ah_init,
3039         .do_declare_create      = lod_declare_object_create,
3040         .do_create              = lod_object_create,
3041         .do_declare_destroy     = lod_declare_object_destroy,
3042         .do_destroy             = lod_object_destroy,
3043         .do_index_try           = lod_index_try,
3044         .do_declare_ref_add     = lod_declare_ref_add,
3045         .do_ref_add             = lod_ref_add,
3046         .do_declare_ref_del     = lod_declare_ref_del,
3047         .do_ref_del             = lod_ref_del,
3048         .do_capa_get            = lod_capa_get,
3049         .do_object_sync         = lod_object_sync,
3050         .do_object_lock         = lod_object_lock,
3051         .do_object_unlock       = lod_object_unlock,
3052 };
3053
3054 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
3055                         struct lu_buf *buf, loff_t *pos,
3056                         struct lustre_capa *capa)
3057 {
3058         struct dt_object *next = dt_object_child(dt);
3059         return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
3060 }
3061
3062 static ssize_t lod_declare_write(const struct lu_env *env,
3063                                  struct dt_object *dt,
3064                                  const struct lu_buf *buf, loff_t pos,
3065                                  struct thandle *th)
3066 {
3067         return dt_declare_record_write(env, dt_object_child(dt),
3068                                        buf, pos, th);
3069 }
3070
3071 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
3072                          const struct lu_buf *buf, loff_t *pos,
3073                          struct thandle *th, struct lustre_capa *capa, int iq)
3074 {
3075         struct dt_object *next = dt_object_child(dt);
3076         LASSERT(next);
3077         return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
3078 }
3079
3080 static const struct dt_body_operations lod_body_lnk_ops = {
3081         .dbo_read               = lod_read,
3082         .dbo_declare_write      = lod_declare_write,
3083         .dbo_write              = lod_write
3084 };
3085
3086 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
3087                            const struct lu_object_conf *conf)
3088 {
3089         struct lod_device       *lod    = lu2lod_dev(lo->lo_dev);
3090         struct lu_device        *cdev   = NULL;
3091         struct lu_object        *cobj;
3092         struct lod_tgt_descs    *ltd    = NULL;
3093         struct lod_tgt_desc     *tgt;
3094         mdsno_t                  idx    = 0;
3095         int                      type   = LU_SEQ_RANGE_ANY;
3096         int                      rc;
3097         ENTRY;
3098
3099         rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
3100         if (rc != 0)
3101                 RETURN(rc);
3102
3103         if (type == LU_SEQ_RANGE_MDT &&
3104             idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
3105                 cdev = &lod->lod_child->dd_lu_dev;
3106         } else if (type == LU_SEQ_RANGE_MDT) {
3107                 ltd = &lod->lod_mdt_descs;
3108                 lod_getref(ltd);
3109         } else if (type == LU_SEQ_RANGE_OST) {
3110                 ltd = &lod->lod_ost_descs;
3111                 lod_getref(ltd);
3112         } else {
3113                 LBUG();
3114         }
3115
3116         if (ltd != NULL) {
3117                 if (ltd->ltd_tgts_size > idx &&
3118                     cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
3119                         tgt = LTD_TGT(ltd, idx);
3120
3121                         LASSERT(tgt != NULL);
3122                         LASSERT(tgt->ltd_tgt != NULL);
3123
3124                         cdev = &(tgt->ltd_tgt->dd_lu_dev);
3125                 }
3126                 lod_putref(lod, ltd);
3127         }
3128
3129         if (unlikely(cdev == NULL))
3130                 RETURN(-ENOENT);
3131
3132         cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
3133         if (unlikely(cobj == NULL))
3134                 RETURN(-ENOMEM);
3135
3136         lu_object_add(lo, cobj);
3137
3138         RETURN(0);
3139 }
3140
3141 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
3142 {
3143         int i;
3144
3145         if (lo->ldo_dir_stripe != NULL) {
3146                 OBD_FREE_PTR(lo->ldo_dir_stripe);
3147                 lo->ldo_dir_stripe = NULL;
3148         }
3149
3150         if (lo->ldo_stripe) {
3151                 LASSERT(lo->ldo_stripes_allocated > 0);
3152
3153                 for (i = 0; i < lo->ldo_stripenr; i++) {
3154                         if (lo->ldo_stripe[i])
3155                                 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
3156                 }
3157
3158                 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
3159                 OBD_FREE(lo->ldo_stripe, i);
3160                 lo->ldo_stripe = NULL;
3161                 lo->ldo_stripes_allocated = 0;
3162         }
3163         lo->ldo_stripenr = 0;
3164         lo->ldo_pattern = 0;
3165 }
3166
3167 /*
3168  * ->start is called once all slices are initialized, including header's
3169  * cache for mode (object type). using the type we can initialize ops
3170  */
3171 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
3172 {
3173         if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
3174                 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
3175         return 0;
3176 }
3177
3178 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
3179 {
3180         struct lod_object *mo = lu2lod_obj(o);
3181
3182         /*
3183          * release all underlying object pinned
3184          */
3185
3186         lod_object_free_striping(env, mo);
3187
3188         lod_object_set_pool(mo, NULL);
3189
3190         lu_object_fini(o);
3191         OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
3192 }
3193
/*
 * ->loo_object_release hook; currently does nothing.
 */
static void lod_object_release(const struct lu_env *env,
                               struct lu_object *o)
{
        /* XXX: if object creation failed earlier, should everything be
         * released here? */
}
3199
3200 static int lod_object_print(const struct lu_env *env, void *cookie,
3201                             lu_printer_t p, const struct lu_object *l)
3202 {
3203         struct lod_object *o = lu2lod_obj((struct lu_object *) l);
3204
3205         return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
3206 }
3207
3208 struct lu_object_operations lod_lu_obj_ops = {
3209         .loo_object_init        = lod_object_init,
3210         .loo_object_start       = lod_object_start,
3211         .loo_object_free        = lod_object_free,
3212         .loo_object_release     = lod_object_release,
3213         .loo_object_print       = lod_object_print,
3214 };