Whamcloud - gitweb
d6dc2e5c5102cb87119b5307c1b2557f687311a7
[fs/lustre-release.git] / lustre / lod / lod_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2013, Intel Corporation.
27  */
28 /*
29  * lustre/lod/lod_object.c
30  *
31  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
32  */
33
34 #define DEBUG_SUBSYSTEM S_MDS
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
41
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46 #include <md_object.h>
47 #include <lustre_linkea.h>
48
49 #include "lod_internal.h"
50
51 static const char dot[] = ".";
52 static const char dotdot[] = "..";
53
54 extern struct kmem_cache *lod_object_kmem;
55 static const struct dt_body_operations lod_body_lnk_ops;
56
57 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
58                             struct dt_rec *rec, const struct dt_key *key,
59                             struct lustre_capa *capa)
60 {
61         struct dt_object *next = dt_object_child(dt);
62         return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
63 }
64
65 static int lod_declare_index_insert(const struct lu_env *env,
66                                     struct dt_object *dt,
67                                     const struct dt_rec *rec,
68                                     const struct dt_key *key,
69                                     struct thandle *handle)
70 {
71         return dt_declare_insert(env, dt_object_child(dt), rec, key, handle);
72 }
73
74 static int lod_index_insert(const struct lu_env *env,
75                             struct dt_object *dt,
76                             const struct dt_rec *rec,
77                             const struct dt_key *key,
78                             struct thandle *th,
79                             struct lustre_capa *capa,
80                             int ign)
81 {
82         return dt_insert(env, dt_object_child(dt), rec, key, th, capa, ign);
83 }
84
85 static int lod_declare_index_delete(const struct lu_env *env,
86                                     struct dt_object *dt,
87                                     const struct dt_key *key,
88                                     struct thandle *th)
89 {
90         return dt_declare_delete(env, dt_object_child(dt), key, th);
91 }
92
93 static int lod_index_delete(const struct lu_env *env,
94                             struct dt_object *dt,
95                             const struct dt_key *key,
96                             struct thandle *th,
97                             struct lustre_capa *capa)
98 {
99         return dt_delete(env, dt_object_child(dt), key, th, capa);
100 }
101
102 static struct dt_it *lod_it_init(const struct lu_env *env,
103                                  struct dt_object *dt, __u32 attr,
104                                  struct lustre_capa *capa)
105 {
106         struct dt_object        *next = dt_object_child(dt);
107         struct lod_it           *it = &lod_env_info(env)->lti_it;
108         struct dt_it            *it_next;
109
110
111         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
112         if (IS_ERR(it_next))
113                 return it_next;
114
115         /* currently we do not use more than one iterator per thread
116          * so we store it in thread info. if at some point we need
117          * more active iterators in a single thread, we can allocate
118          * additional ones */
119         LASSERT(it->lit_obj == NULL);
120
121         it->lit_it = it_next;
122         it->lit_obj = next;
123
124         return (struct dt_it *)it;
125 }
126
127 #define LOD_CHECK_IT(env, it)                                   \
128 do {                                                            \
129         LASSERT((it)->lit_obj != NULL);                         \
130         LASSERT((it)->lit_it != NULL);                          \
131 } while (0)
132
133 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
134 {
135         struct lod_it *it = (struct lod_it *)di;
136
137         LOD_CHECK_IT(env, it);
138         it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
139
140         /* the iterator not in use any more */
141         it->lit_obj = NULL;
142         it->lit_it = NULL;
143 }
144
145 int lod_it_get(const struct lu_env *env, struct dt_it *di,
146                const struct dt_key *key)
147 {
148         const struct lod_it *it = (const struct lod_it *)di;
149
150         LOD_CHECK_IT(env, it);
151         return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
152 }
153
154 void lod_it_put(const struct lu_env *env, struct dt_it *di)
155 {
156         struct lod_it *it = (struct lod_it *)di;
157
158         LOD_CHECK_IT(env, it);
159         return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
160 }
161
162 int lod_it_next(const struct lu_env *env, struct dt_it *di)
163 {
164         struct lod_it *it = (struct lod_it *)di;
165
166         LOD_CHECK_IT(env, it);
167         return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
168 }
169
170 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
171 {
172         const struct lod_it *it = (const struct lod_it *)di;
173
174         LOD_CHECK_IT(env, it);
175         return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
176 }
177
178 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
179 {
180         struct lod_it *it = (struct lod_it *)di;
181
182         LOD_CHECK_IT(env, it);
183         return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
184 }
185
186 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
187                struct dt_rec *rec, __u32 attr)
188 {
189         const struct lod_it *it = (const struct lod_it *)di;
190
191         LOD_CHECK_IT(env, it);
192         return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec,
193                                                      attr);
194 }
195
196 int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di,
197                     __u32 attr)
198 {
199         const struct lod_it *it = (const struct lod_it *)di;
200
201         LOD_CHECK_IT(env, it);
202         return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it,
203                                                           attr);
204 }
205
206 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
207 {
208         const struct lod_it *it = (const struct lod_it *)di;
209
210         LOD_CHECK_IT(env, it);
211         return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
212 }
213
214 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
215 {
216         const struct lod_it *it = (const struct lod_it *)di;
217
218         LOD_CHECK_IT(env, it);
219         return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
220 }
221
222 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
223                    void *key_rec)
224 {
225         const struct lod_it *it = (const struct lod_it *)di;
226
227         LOD_CHECK_IT(env, it);
228         return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it,
229                                                          key_rec);
230 }
231
232 static struct dt_index_operations lod_index_ops = {
233         .dio_lookup             = lod_index_lookup,
234         .dio_declare_insert     = lod_declare_index_insert,
235         .dio_insert             = lod_index_insert,
236         .dio_declare_delete     = lod_declare_index_delete,
237         .dio_delete             = lod_index_delete,
238         .dio_it = {
239                 .init           = lod_it_init,
240                 .fini           = lod_it_fini,
241                 .get            = lod_it_get,
242                 .put            = lod_it_put,
243                 .next           = lod_it_next,
244                 .key            = lod_it_key,
245                 .key_size       = lod_it_key_size,
246                 .rec            = lod_it_rec,
247                 .rec_size       = lod_it_rec_size,
248                 .store          = lod_it_store,
249                 .load           = lod_it_load,
250                 .key_rec        = lod_it_key_rec,
251         }
252 };
253
254 /**
255  * Implementation of dt_index_operations:: dio_it.init
256  *
257  * This function is to initialize the iterator for striped directory,
258  * basically these lod_striped_it_xxx will just locate the stripe
259  * and call the correspondent api of its next lower layer.
260  *
261  * \param[in] env       execution environment.
262  * \param[in] dt        the striped directory object to be iterated.
263  * \param[in] attr      the attribute of iterator, mostly used to indicate
264  *                      the entry attribute in the object to be iterated.
265  * \param[in] capa      capability(useless in current implementation)
266  *
267  * \retval      initialized iterator(dt_it) if successful initialize the
268  *              iteration. lit_stripe_index will be used to indicate the
269  *              current iterate position among stripes.
270  * \retval      ERR pointer if initialization is failed.
271  */
272 static struct dt_it *lod_striped_it_init(const struct lu_env *env,
273                                          struct dt_object *dt, __u32 attr,
274                                          struct lustre_capa *capa)
275 {
276         struct lod_object       *lo = lod_dt_obj(dt);
277         struct dt_object        *next;
278         struct lod_it           *it = &lod_env_info(env)->lti_it;
279         struct dt_it            *it_next;
280         ENTRY;
281
282         LASSERT(lo->ldo_stripenr > 0);
283         next = lo->ldo_stripe[0];
284         LASSERT(next != NULL);
285         LASSERT(next->do_index_ops != NULL);
286
287         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
288         if (IS_ERR(it_next))
289                 return it_next;
290
291         /* currently we do not use more than one iterator per thread
292          * so we store it in thread info. if at some point we need
293          * more active iterators in a single thread, we can allocate
294          * additional ones */
295         LASSERT(it->lit_obj == NULL);
296
297         it->lit_stripe_index = 0;
298         it->lit_attr = attr;
299         it->lit_it = it_next;
300         it->lit_obj = dt;
301
302         return (struct dt_it *)it;
303 }
304
305 #define LOD_CHECK_STRIPED_IT(env, it, lo)                       \
306 do {                                                            \
307         LASSERT((it)->lit_obj != NULL);                         \
308         LASSERT((it)->lit_it != NULL);                          \
309         LASSERT((lo)->ldo_stripenr > 0);                        \
310         LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr);   \
311 } while (0)
312
313 /**
314  * Implementation of dt_index_operations:: dio_it.fini
315  *
316  * This function is to finish the iterator for striped directory.
317  *
318  * \param[in] env       execution environment.
319  * \param[in] di        the iterator for the striped directory
320  *
321  */
322 static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
323 {
324         struct lod_it           *it = (struct lod_it *)di;
325         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
326         struct dt_object        *next;
327
328         LOD_CHECK_STRIPED_IT(env, it, lo);
329
330         next = lo->ldo_stripe[it->lit_stripe_index];
331         LASSERT(next != NULL);
332         LASSERT(next->do_index_ops != NULL);
333
334         next->do_index_ops->dio_it.fini(env, it->lit_it);
335
336         /* the iterator not in use any more */
337         it->lit_obj = NULL;
338         it->lit_it = NULL;
339         it->lit_stripe_index = 0;
340 }
341
342 /**
343  * Implementation of dt_index_operations:: dio_it.get
344  *
345  * This function is to position the iterator with given key
346  *
347  * \param[in] env       execution environment.
348  * \param[in] di        the iterator for striped directory.
349  * \param[in] key       the key the iterator will be positioned.
350  *
351  * \retval      0 if successfully position iterator by the key.
352  * \retval      negative error if position is failed.
353  */
354 static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
355                               const struct dt_key *key)
356 {
357         const struct lod_it     *it = (const struct lod_it *)di;
358         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
359         struct dt_object        *next;
360         ENTRY;
361
362         LOD_CHECK_STRIPED_IT(env, it, lo);
363
364         next = lo->ldo_stripe[it->lit_stripe_index];
365         LASSERT(next != NULL);
366         LASSERT(next->do_index_ops != NULL);
367
368         return next->do_index_ops->dio_it.get(env, it->lit_it, key);
369 }
370
371 /**
372  * Implementation of dt_index_operations:: dio_it.put
373  *
374  * This function is supposed to be the pair of it_get, but currently do
375  * nothing. see (osd_it_ea_put or osd_index_it_put)
376  */
377 static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
378 {
379         struct lod_it           *it = (struct lod_it *)di;
380         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
381         struct dt_object        *next;
382
383         LOD_CHECK_STRIPED_IT(env, it, lo);
384
385         next = lo->ldo_stripe[it->lit_stripe_index];
386         LASSERT(next != NULL);
387         LASSERT(next->do_index_ops != NULL);
388
389         return next->do_index_ops->dio_it.put(env, it->lit_it);
390 }
391
392 /**
393  * Implementation of dt_index_operations:: dio_it.next
394  *
395  * This function is to position the iterator to the next entry, if current
396  * stripe is finished by checking the return value of next() in current
397  * stripe. it will go to next stripe. In the mean time, the sub-iterator
398  * for next stripe needs to be initialized.
399  *
400  * \param[in] env       execution environment.
401  * \param[in] di        the iterator for striped directory.
402  *
403  * \retval      0 if successfully position iterator to the next entry.
404  * \retval      negative error if position is failed.
405  */
406 static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
407 {
408         struct lod_it           *it = (struct lod_it *)di;
409         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
410         struct dt_object        *next;
411         struct dt_it            *it_next;
412         int                     rc;
413         ENTRY;
414
415         LOD_CHECK_STRIPED_IT(env, it, lo);
416
417         next = lo->ldo_stripe[it->lit_stripe_index];
418         LASSERT(next != NULL);
419         LASSERT(next->do_index_ops != NULL);
420 again:
421         rc = next->do_index_ops->dio_it.next(env, it->lit_it);
422         if (rc < 0)
423                 RETURN(rc);
424
425         if (rc == 0 && it->lit_stripe_index == 0)
426                 RETURN(rc);
427
428         if (rc == 0 && it->lit_stripe_index > 0) {
429                 struct lu_dirent *ent;
430
431                 ent = (struct lu_dirent *)lod_env_info(env)->lti_key;
432
433                 rc = next->do_index_ops->dio_it.rec(env, it->lit_it,
434                                                     (struct dt_rec *)ent,
435                                                     it->lit_attr);
436                 if (rc != 0)
437                         RETURN(rc);
438
439                 /* skip . and .. for slave stripe */
440                 if ((strncmp(ent->lde_name, ".",
441                              le16_to_cpu(ent->lde_namelen)) == 0 &&
442                      le16_to_cpu(ent->lde_namelen) == 1) ||
443                     (strncmp(ent->lde_name, "..",
444                              le16_to_cpu(ent->lde_namelen)) == 0 &&
445                      le16_to_cpu(ent->lde_namelen) == 2))
446                         goto again;
447
448                 RETURN(rc);
449         }
450
451         /* go to next stripe */
452         if (it->lit_stripe_index + 1 >= lo->ldo_stripenr)
453                 RETURN(1);
454
455         it->lit_stripe_index++;
456
457         next->do_index_ops->dio_it.put(env, it->lit_it);
458         next->do_index_ops->dio_it.fini(env, it->lit_it);
459
460         rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
461         if (rc != 0)
462                 RETURN(rc);
463
464         next = lo->ldo_stripe[it->lit_stripe_index];
465         LASSERT(next != NULL);
466         LASSERT(next->do_index_ops != NULL);
467
468         it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr,
469                                                   BYPASS_CAPA);
470         if (!IS_ERR(it_next)) {
471                 it->lit_it = it_next;
472                 goto again;
473         } else {
474                 rc = PTR_ERR(it_next);
475         }
476
477         RETURN(rc);
478 }
479
480 /**
481  * Implementation of dt_index_operations:: dio_it.key
482  *
483  * This function is to get the key of the iterator at current position.
484  *
485  * \param[in] env       execution environment.
486  * \param[in] di        the iterator for striped directory.
487  *
488  * \retval      key(dt_key) if successfully get the key.
489  * \retval      negative error if can not get the key.
490  */
491 static struct dt_key *lod_striped_it_key(const struct lu_env *env,
492                                          const struct dt_it *di)
493 {
494         const struct lod_it     *it = (const struct lod_it *)di;
495         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
496         struct dt_object        *next;
497
498         LOD_CHECK_STRIPED_IT(env, it, lo);
499
500         next = lo->ldo_stripe[it->lit_stripe_index];
501         LASSERT(next != NULL);
502         LASSERT(next->do_index_ops != NULL);
503
504         return next->do_index_ops->dio_it.key(env, it->lit_it);
505 }
506
507 /**
508  * Implementation of dt_index_operations:: dio_it.key_size
509  *
510  * This function is to get the key_size of current key.
511  *
512  * \param[in] env       execution environment.
513  * \param[in] di        the iterator for striped directory.
514  *
515  * \retval      key_size if successfully get the key_size.
516  * \retval      negative error if can not get the key_size.
517  */
518 static int lod_striped_it_key_size(const struct lu_env *env,
519                                    const struct dt_it *di)
520 {
521         struct lod_it           *it = (struct lod_it *)di;
522         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
523         struct dt_object        *next;
524
525         LOD_CHECK_STRIPED_IT(env, it, lo);
526
527         next = lo->ldo_stripe[it->lit_stripe_index];
528         LASSERT(next != NULL);
529         LASSERT(next->do_index_ops != NULL);
530
531         return next->do_index_ops->dio_it.key_size(env, it->lit_it);
532 }
533
534 /**
535  * Implementation of dt_index_operations:: dio_it.rec
536  *
537  * This function is to get the record at current position.
538  *
539  * \param[in] env       execution environment.
540  * \param[in] di        the iterator for striped directory.
541  * \param[in] attr      the attribute of iterator, mostly used to indicate
542  *                      the entry attribute in the object to be iterated.
543  * \param[out] rec      hold the return record.
544  *
545  * \retval      0 if successfully get the entry.
546  * \retval      negative error if can not get entry.
547  */
548 static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
549                               struct dt_rec *rec, __u32 attr)
550 {
551         const struct lod_it     *it = (const struct lod_it *)di;
552         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
553         struct dt_object        *next;
554
555         LOD_CHECK_STRIPED_IT(env, it, lo);
556
557         next = lo->ldo_stripe[it->lit_stripe_index];
558         LASSERT(next != NULL);
559         LASSERT(next->do_index_ops != NULL);
560
561         return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
562 }
563
564 /**
565  * Implementation of dt_index_operations:: dio_it.rec_size
566  *
567  * This function is to get the record_size at current record.
568  *
569  * \param[in] env       execution environment.
570  * \param[in] di        the iterator for striped directory.
571  * \param[in] attr      the attribute of iterator, mostly used to indicate
572  *                      the entry attribute in the object to be iterated.
573  *
574  * \retval      rec_size if successfully get the entry size.
575  * \retval      negative error if can not get entry size.
576  */
577 static int lod_striped_it_rec_size(const struct lu_env *env,
578                                    const struct dt_it *di, __u32 attr)
579 {
580         struct lod_it           *it = (struct lod_it *)di;
581         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
582         struct dt_object        *next;
583
584         LOD_CHECK_STRIPED_IT(env, it, lo);
585
586         next = lo->ldo_stripe[it->lit_stripe_index];
587         LASSERT(next != NULL);
588         LASSERT(next->do_index_ops != NULL);
589
590         return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
591 }
592
593 /**
594  * Implementation of dt_index_operations:: dio_it.store
595  *
596  * This function will a cookie for current position of the iterator head,
597  * so that user can use this cookie to load/start the iterator next time.
598  *
599  * \param[in] env       execution environment.
600  * \param[in] di        the iterator for striped directory.
601  *
602  * \retval      the cookie.
603  */
604 static __u64 lod_striped_it_store(const struct lu_env *env,
605                                   const struct dt_it *di)
606 {
607         const struct lod_it     *it = (const struct lod_it *)di;
608         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
609         struct dt_object        *next;
610
611         LOD_CHECK_STRIPED_IT(env, it, lo);
612
613         next = lo->ldo_stripe[it->lit_stripe_index];
614         LASSERT(next != NULL);
615         LASSERT(next->do_index_ops != NULL);
616
617         return next->do_index_ops->dio_it.store(env, it->lit_it);
618 }
619
620 /**
621  * Implementation of dt_index_operations:: dio_it.load
622  *
623  * This function will position the iterator with the given hash(usually
624  * get from store),
625  *
626  * \param[in] env       execution environment.
627  * \param[in] di        the iterator for striped directory.
628  * \param[in] hash      the given hash.
629  *
630  * \retval      >0 if successfuly load the iterator to the given position.
631  * \retval      <0 if load is failed.
632  */
633 static int lod_striped_it_load(const struct lu_env *env,
634                                const struct dt_it *di, __u64 hash)
635 {
636         const struct lod_it     *it = (const struct lod_it *)di;
637         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
638         struct dt_object        *next;
639
640         LOD_CHECK_STRIPED_IT(env, it, lo);
641
642         next = lo->ldo_stripe[it->lit_stripe_index];
643         LASSERT(next != NULL);
644         LASSERT(next->do_index_ops != NULL);
645
646         return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
647 }
648
649 static struct dt_index_operations lod_striped_index_ops = {
650         .dio_lookup             = lod_index_lookup,
651         .dio_declare_insert     = lod_declare_index_insert,
652         .dio_insert             = lod_index_insert,
653         .dio_declare_delete     = lod_declare_index_delete,
654         .dio_delete             = lod_index_delete,
655         .dio_it = {
656                 .init           = lod_striped_it_init,
657                 .fini           = lod_striped_it_fini,
658                 .get            = lod_striped_it_get,
659                 .put            = lod_striped_it_put,
660                 .next           = lod_striped_it_next,
661                 .key            = lod_striped_it_key,
662                 .key_size       = lod_striped_it_key_size,
663                 .rec            = lod_striped_it_rec,
664                 .rec_size       = lod_striped_it_rec_size,
665                 .store          = lod_striped_it_store,
666                 .load           = lod_striped_it_load,
667         }
668 };
669
670 /**
671  * Implementation of dt_object_operations:: do_index_try
672  *
673  * This function will try to initialize the index api pointer for the
674  * given object, usually it the entry point of the index api. i.e.
675  * the index object should be initialized in index_try, then start
676  * using index api. For striped directory, it will try to initialize
677  * all of its sub_stripes.
678  *
679  * \param[in] env       execution environment.
680  * \param[in] dt        the index object to be initialized.
681  * \param[in] feat      the features of this object, for example fixed or
682  *                      variable key size etc.
683  *
684  * \retval      >0 if the initialization is successful.
685  * \retval      <0 if the initialization is failed.
686  */
687 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
688                          const struct dt_index_features *feat)
689 {
690         struct lod_object       *lo = lod_dt_obj(dt);
691         struct dt_object        *next = dt_object_child(dt);
692         int                     rc;
693         ENTRY;
694
695         LASSERT(next->do_ops);
696         LASSERT(next->do_ops->do_index_try);
697
698         rc = lod_load_striping_locked(env, lo);
699         if (rc != 0)
700                 RETURN(rc);
701
702         rc = next->do_ops->do_index_try(env, next, feat);
703         if (rc != 0)
704                 RETURN(rc);
705
706         if (lo->ldo_stripenr > 0) {
707                 int i;
708
709                 for (i = 0; i < lo->ldo_stripenr; i++) {
710                         if (dt_object_exists(lo->ldo_stripe[i]) == 0)
711                                 continue;
712                         rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
713                                                 lo->ldo_stripe[i], feat);
714                         if (rc != 0)
715                                 RETURN(rc);
716                 }
717                 dt->do_index_ops = &lod_striped_index_ops;
718         } else {
719                 dt->do_index_ops = &lod_index_ops;
720         }
721
722         RETURN(rc);
723 }
724
725 static void lod_object_read_lock(const struct lu_env *env,
726                                  struct dt_object *dt, unsigned role)
727 {
728         dt_read_lock(env, dt_object_child(dt), role);
729 }
730
731 static void lod_object_write_lock(const struct lu_env *env,
732                                   struct dt_object *dt, unsigned role)
733 {
734         dt_write_lock(env, dt_object_child(dt), role);
735 }
736
737 static void lod_object_read_unlock(const struct lu_env *env,
738                                    struct dt_object *dt)
739 {
740         dt_read_unlock(env, dt_object_child(dt));
741 }
742
743 static void lod_object_write_unlock(const struct lu_env *env,
744                                     struct dt_object *dt)
745 {
746         dt_write_unlock(env, dt_object_child(dt));
747 }
748
749 static int lod_object_write_locked(const struct lu_env *env,
750                                    struct dt_object *dt)
751 {
752         return dt_write_locked(env, dt_object_child(dt));
753 }
754
755 static int lod_attr_get(const struct lu_env *env,
756                         struct dt_object *dt,
757                         struct lu_attr *attr,
758                         struct lustre_capa *capa)
759 {
760         /* Note: for striped directory, client will merge attributes
761          * from all of the sub-stripes see lmv_merge_attr(), and there
762          * no MDD logic depend on directory nlink/size/time, so we can
763          * always use master inode nlink and size for now. */
764         return dt_attr_get(env, dt_object_child(dt), attr, capa);
765 }
766
767 /**
768  * Mark all of sub-stripes dead of the striped directory.
769  **/
770 static int lod_mark_dead_object(const struct lu_env *env,
771                                 struct dt_object *dt,
772                                 struct thandle *handle,
773                                 bool declare)
774 {
775         struct lod_object       *lo = lod_dt_obj(dt);
776         struct lmv_mds_md_v1    *lmv;
777         __u32                   dead_hash_type;
778         int                     rc;
779         int                     i;
780
781         ENTRY;
782
783         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
784                 RETURN(0);
785
786         rc = lod_load_striping_locked(env, lo);
787         if (rc != 0)
788                 RETURN(rc);
789
790         if (lo->ldo_stripenr == 0)
791                 RETURN(0);
792
793         rc = lod_get_lmv_ea(env, lo);
794         if (rc <= 0)
795                 RETURN(rc);
796
797         lmv = lod_env_info(env)->lti_ea_store;
798         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
799         dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD;
800         lmv->lmv_hash_type = cpu_to_le32(dead_hash_type);
801         for (i = 0; i < lo->ldo_stripenr; i++) {
802                 struct lu_buf buf;
803
804                 lmv->lmv_master_mdt_index = i;
805                 buf.lb_buf = lmv;
806                 buf.lb_len = sizeof(*lmv);
807                 if (declare) {
808                         rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf,
809                                                   XATTR_NAME_LMV,
810                                                   LU_XATTR_REPLACE, handle);
811                 } else {
812                         rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf,
813                                           XATTR_NAME_LMV, LU_XATTR_REPLACE,
814                                           handle, BYPASS_CAPA);
815                 }
816                 if (rc != 0)
817                         break;
818         }
819
820         RETURN(rc);
821 }
822
823 static int lod_declare_attr_set(const struct lu_env *env,
824                                 struct dt_object *dt,
825                                 const struct lu_attr *attr,
826                                 struct thandle *handle)
827 {
828         struct dt_object  *next = dt_object_child(dt);
829         struct lod_object *lo = lod_dt_obj(dt);
830         int                rc, i;
831         ENTRY;
832
833         /* Set dead object on all other stripes */
834         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
835             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
836                 rc = lod_mark_dead_object(env, dt, handle, true);
837                 RETURN(rc);
838         }
839
840         /*
841          * declare setattr on the local object
842          */
843         rc = dt_declare_attr_set(env, next, attr, handle);
844         if (rc)
845                 RETURN(rc);
846
847         /* osp_declare_attr_set() ignores all attributes other than
848          * UID, GID, and size, and osp_attr_set() ignores all but UID
849          * and GID.  Declaration of size attr setting happens through
850          * lod_declare_init_size(), and not through this function.
851          * Therefore we need not load striping unless ownership is
852          * changing.  This should save memory and (we hope) speed up
853          * rename(). */
854         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
855                 if (!(attr->la_valid & (LA_UID | LA_GID)))
856                         RETURN(rc);
857
858                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
859                         RETURN(0);
860         } else {
861                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
862                                         LA_ATIME | LA_MTIME | LA_CTIME)))
863                         RETURN(rc);
864         }
865         /*
866          * load striping information, notice we don't do this when object
867          * is being initialized as we don't need this information till
868          * few specific cases like destroy, chown
869          */
870         rc = lod_load_striping(env, lo);
871         if (rc)
872                 RETURN(rc);
873
874         if (lo->ldo_stripenr == 0)
875                 RETURN(0);
876
877         /*
878          * if object is striped declare changes on the stripes
879          */
880         LASSERT(lo->ldo_stripe);
881         for (i = 0; i < lo->ldo_stripenr; i++) {
882                 if (likely(lo->ldo_stripe[i] != NULL)) {
883                         rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr,
884                                                  handle);
885                         if (rc != 0) {
886                                 CERROR("failed declaration: %d\n", rc);
887                                 break;
888                         }
889                 }
890         }
891
892         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
893             dt_object_exists(next) != 0 &&
894             dt_object_remote(next) == 0)
895                 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
896
897         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
898             dt_object_exists(next) &&
899             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
900                 struct lod_thread_info *info = lod_env_info(env);
901                 struct lu_buf *buf = &info->lti_buf;
902
903                 buf->lb_buf = info->lti_ea_store;
904                 buf->lb_len = info->lti_ea_store_size;
905                 dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
906                                      LU_XATTR_REPLACE, handle);
907         }
908
909         RETURN(rc);
910 }
911
912 static int lod_attr_set(const struct lu_env *env,
913                         struct dt_object *dt,
914                         const struct lu_attr *attr,
915                         struct thandle *handle,
916                         struct lustre_capa *capa)
917 {
918         struct dt_object        *next = dt_object_child(dt);
919         struct lod_object       *lo = lod_dt_obj(dt);
920         int                     rc, i;
921         ENTRY;
922
923         /* Set dead object on all other stripes */
924         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
925             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
926                 rc = lod_mark_dead_object(env, dt, handle, false);
927                 RETURN(rc);
928         }
929
930         /*
931          * apply changes to the local object
932          */
933         rc = dt_attr_set(env, next, attr, handle, capa);
934         if (rc)
935                 RETURN(rc);
936
937         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
938                 if (!(attr->la_valid & (LA_UID | LA_GID)))
939                         RETURN(rc);
940
941                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
942                         RETURN(0);
943         } else {
944                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
945                                         LA_ATIME | LA_MTIME | LA_CTIME)))
946                         RETURN(rc);
947         }
948
949         if (lo->ldo_stripenr == 0)
950                 RETURN(0);
951
952         /*
953          * if object is striped, apply changes to all the stripes
954          */
955         LASSERT(lo->ldo_stripe);
956         for (i = 0; i < lo->ldo_stripenr; i++) {
957                 if (likely(lo->ldo_stripe[i] != NULL)) {
958                         if (dt_object_exists(lo->ldo_stripe[i]) == 0)
959                                 continue;
960
961                         rc = dt_attr_set(env, lo->ldo_stripe[i], attr,
962                                          handle, capa);
963                         if (rc != 0) {
964                                 CERROR("failed declaration: %d\n", rc);
965                                 break;
966                         }
967                 }
968         }
969
970         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
971             dt_object_exists(next) != 0 &&
972             dt_object_remote(next) == 0)
973                 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
974
975         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
976             dt_object_exists(next) &&
977             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
978                 struct lod_thread_info *info = lod_env_info(env);
979                 struct lu_buf *buf = &info->lti_buf;
980                 struct ost_id *oi = &info->lti_ostid;
981                 struct lu_fid *fid = &info->lti_fid;
982                 struct lov_mds_md_v1 *lmm;
983                 struct lov_ost_data_v1 *objs;
984                 __u32 magic;
985                 int rc1;
986
987                 rc1 = lod_get_lov_ea(env, lo);
988                 if (rc1  <= 0)
989                         RETURN(rc);
990
991                 buf->lb_buf = info->lti_ea_store;
992                 buf->lb_len = info->lti_ea_store_size;
993                 lmm = info->lti_ea_store;
994                 magic = le32_to_cpu(lmm->lmm_magic);
995                 if (magic == LOV_MAGIC_V1)
996                         objs = &(lmm->lmm_objects[0]);
997                 else
998                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
999                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
1000                 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
1001                 fid->f_oid--;
1002                 fid_to_ostid(fid, oi);
1003                 ostid_cpu_to_le(oi, &objs->l_ost_oi);
1004                 dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
1005                              LU_XATTR_REPLACE, handle, BYPASS_CAPA);
1006         }
1007
1008         RETURN(rc);
1009 }
1010
1011 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
1012                          struct lu_buf *buf, const char *name,
1013                          struct lustre_capa *capa)
1014 {
1015         struct lod_thread_info  *info = lod_env_info(env);
1016         struct lod_device       *dev = lu2lod_dev(dt->do_lu.lo_dev);
1017         int                      rc, is_root;
1018         ENTRY;
1019
1020         rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
1021         if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
1022                 RETURN(rc);
1023
1024         /*
1025          * lod returns default striping on the real root of the device
1026          * this is like the root stores default striping for the whole
1027          * filesystem. historically we've been using a different approach
1028          * and store it in the config.
1029          */
1030         dt_root_get(env, dev->lod_child, &info->lti_fid);
1031         is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
1032
1033         if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
1034                 struct lov_user_md *lum = buf->lb_buf;
1035                 struct lov_desc    *desc = &dev->lod_desc;
1036
1037                 if (buf->lb_buf == NULL) {
1038                         rc = sizeof(*lum);
1039                 } else if (buf->lb_len >= sizeof(*lum)) {
1040                         lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
1041                         lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
1042                         lmm_oi_set_id(&lum->lmm_oi, 0);
1043                         lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
1044                         lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
1045                         lum->lmm_stripe_size = cpu_to_le32(
1046                                                 desc->ld_default_stripe_size);
1047                         lum->lmm_stripe_count = cpu_to_le16(
1048                                                 desc->ld_default_stripe_count);
1049                         lum->lmm_stripe_offset = cpu_to_le16(
1050                                                 desc->ld_default_stripe_offset);
1051                         rc = sizeof(*lum);
1052                 } else {
1053                         rc = -ERANGE;
1054                 }
1055         }
1056
1057         RETURN(rc);
1058 }
1059
1060 static int lod_verify_md_striping(struct lod_device *lod,
1061                                   const struct lmv_user_md_v1 *lum)
1062 {
1063         int     rc = 0;
1064         ENTRY;
1065
1066         if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
1067                 GOTO(out, rc = -EINVAL);
1068
1069         if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
1070                 GOTO(out, rc = -EINVAL);
1071 out:
1072         if (rc != 0)
1073                 CERROR("%s: invalid lmv_user_md: magic = %x, "
1074                        "stripe_offset = %d, stripe_count = %u: rc = %d\n",
1075                        lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
1076                        (int)le32_to_cpu(lum->lum_stripe_offset),
1077                        le32_to_cpu(lum->lum_stripe_count), rc);
1078         return rc;
1079 }
1080
1081 /**
1082  * Master LMVEA will be same as slave LMVEA, except
1083  * 1. different magic
1084  * 2. No lmv_stripe_fids on slave
1085  * 3. lmv_master_mdt_index on slave LMV EA will be stripe_index.
1086  */
1087 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
1088                                   const struct lmv_mds_md_v1 *master_lmv)
1089 {
1090         *slave_lmv = *master_lmv;
1091         slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
1092 }
1093
1094 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
1095                     struct lu_buf *lmv_buf)
1096 {
1097         struct lod_thread_info  *info = lod_env_info(env);
1098         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1099         struct lod_object       *lo = lod_dt_obj(dt);
1100         struct lmv_mds_md_v1    *lmm1;
1101         int                     stripe_count;
1102         int                     lmm_size;
1103         int                     type = LU_SEQ_RANGE_ANY;
1104         int                     i;
1105         int                     rc;
1106         __u32                   mdtidx;
1107         ENTRY;
1108
1109         LASSERT(lo->ldo_dir_striped != 0);
1110         LASSERT(lo->ldo_stripenr > 0);
1111         stripe_count = lo->ldo_stripenr;
1112         lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
1113         if (info->lti_ea_store_size < lmm_size) {
1114                 rc = lod_ea_store_resize(info, lmm_size);
1115                 if (rc != 0)
1116                         RETURN(rc);
1117         }
1118
1119         lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
1120         lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
1121         lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
1122         lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
1123         rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
1124                             &mdtidx, &type);
1125         if (rc != 0)
1126                 RETURN(rc);
1127
1128         lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
1129         fid_cpu_to_le(&lmm1->lmv_master_fid, lu_object_fid(&dt->do_lu));
1130         for (i = 0; i < lo->ldo_stripenr; i++) {
1131                 struct dt_object *dto;
1132
1133                 dto = lo->ldo_stripe[i];
1134                 LASSERT(dto != NULL);
1135                 fid_cpu_to_le(&lmm1->lmv_stripe_fids[i],
1136                               lu_object_fid(&dto->do_lu));
1137         }
1138
1139         lmv_buf->lb_buf = info->lti_ea_store;
1140         lmv_buf->lb_len = lmm_size;
1141         lo->ldo_dir_striping_cached = 1;
1142
1143         RETURN(rc);
1144 }
1145
1146 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
1147                            const struct lu_buf *buf)
1148 {
1149         struct lod_thread_info  *info = lod_env_info(env);
1150         struct lod_device       *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1151         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1152         struct dt_object        **stripe;
1153         union lmv_mds_md        *lmm = buf->lb_buf;
1154         struct lmv_mds_md_v1    *lmv1 = &lmm->lmv_md_v1;
1155         struct lu_fid           *fid = &info->lti_fid;
1156         int                     i;
1157         int                     rc = 0;
1158         ENTRY;
1159
1160         if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
1161                 RETURN(0);
1162
1163         if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
1164                 lo->ldo_dir_slave_stripe = 1;
1165                 RETURN(0);
1166         }
1167
1168         if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
1169                 RETURN(-EINVAL);
1170
1171         if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1)
1172                 RETURN(0);
1173
1174         LASSERT(lo->ldo_stripe == NULL);
1175         OBD_ALLOC(stripe, sizeof(stripe[0]) *
1176                   (le32_to_cpu(lmv1->lmv_stripe_count)));
1177         if (stripe == NULL)
1178                 RETURN(-ENOMEM);
1179
1180         for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
1181                 struct dt_device        *tgt_dt;
1182                 struct dt_object        *dto;
1183                 int                     type = LU_SEQ_RANGE_ANY;
1184                 __u32                   idx;
1185
1186                 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
1187                 if (!fid_is_sane(fid))
1188                         GOTO(out, rc = -ESTALE);
1189
1190                 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
1191                 if (rc != 0)
1192                         GOTO(out, rc);
1193
1194                 if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
1195                         tgt_dt = lod->lod_child;
1196                 } else {
1197                         struct lod_tgt_desc     *tgt;
1198
1199                         tgt = LTD_TGT(ltd, idx);
1200                         if (tgt == NULL)
1201                                 GOTO(out, rc = -ESTALE);
1202                         tgt_dt = tgt->ltd_tgt;
1203                 }
1204
1205                 dto = dt_locate_at(env, tgt_dt, fid,
1206                                   lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1207                                   NULL);
1208                 if (IS_ERR(dto))
1209                         GOTO(out, rc = PTR_ERR(dto));
1210
1211                 stripe[i] = dto;
1212         }
1213 out:
1214         lo->ldo_stripe = stripe;
1215         lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count);
1216         lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
1217         if (rc != 0)
1218                 lod_object_free_striping(env, lo);
1219
1220         RETURN(rc);
1221 }
1222
1223 static int lod_prep_md_striped_create(const struct lu_env *env,
1224                                       struct dt_object *dt,
1225                                       struct lu_attr *attr,
1226                                       const struct lmv_user_md_v1 *lum,
1227                                       struct dt_object_format *dof,
1228                                       struct thandle *th)
1229 {
1230         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1231         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1232         struct lod_object       *lo = lod_dt_obj(dt);
1233         struct lod_thread_info  *info = lod_env_info(env);
1234         struct dt_object        **stripe;
1235         struct lu_buf           lmv_buf;
1236         struct lu_buf           slave_lmv_buf;
1237         struct lmv_mds_md_v1    *lmm;
1238         struct lmv_mds_md_v1    *slave_lmm = NULL;
1239         int                     stripe_count;
1240         int                     *idx_array;
1241         int                     rc = 0;
1242         int                     i;
1243         int                     j;
1244         ENTRY;
1245
1246         /* The lum has been verifed in lod_verify_md_striping */
1247         LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
1248         LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
1249
1250         stripe_count = le32_to_cpu(lum->lum_stripe_count);
1251
1252         /* shrink the stripe_count to the avaible MDT count */
1253         if (stripe_count > lod->lod_remote_mdt_count + 1)
1254                 stripe_count = lod->lod_remote_mdt_count + 1;
1255
1256         OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
1257         if (stripe == NULL)
1258                 RETURN(-ENOMEM);
1259
1260         OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
1261         if (idx_array == NULL)
1262                 GOTO(out_free, rc = -ENOMEM);
1263
1264         for (i = 0; i < stripe_count; i++) {
1265                 struct lod_tgt_desc     *tgt = NULL;
1266                 struct dt_object        *dto;
1267                 struct lu_fid           fid = { 0 };
1268                 int                     idx;
1269                 struct lu_object_conf   conf = { 0 };
1270                 struct dt_device        *tgt_dt = NULL;
1271
1272                 if (i == 0) {
1273                         /* Right now, master stripe and master object are
1274                          * on the same MDT */
1275                         idx = le32_to_cpu(lum->lum_stripe_offset);
1276                         rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
1277                                            NULL);
1278                         if (rc < 0)
1279                                 GOTO(out_put, rc);
1280                         tgt_dt = lod->lod_child;
1281                         goto next;
1282                 }
1283
1284                 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
1285
1286                 for (j = 0; j < lod->lod_remote_mdt_count;
1287                      j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
1288                         bool already_allocated = false;
1289                         int k;
1290
1291                         CDEBUG(D_INFO, "try idx %d, mdt cnt %d,"
1292                                " allocated %d, last allocated %d\n", idx,
1293                                lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1294
1295                         /* Find next available target */
1296                         if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
1297                                 continue;
1298
1299                         /* check whether the idx already exists
1300                          * in current allocated array */
1301                         for (k = 0; k < i; k++) {
1302                                 if (idx_array[k] == idx) {
1303                                         already_allocated = true;
1304                                         break;
1305                                 }
1306                         }
1307
1308                         if (already_allocated)
1309                                 continue;
1310
1311                         /* check the status of the OSP */
1312                         tgt = LTD_TGT(ltd, idx);
1313                         if (tgt == NULL)
1314                                 continue;
1315
1316                         tgt_dt = tgt->ltd_tgt;
1317                         rc = dt_statfs(env, tgt_dt, NULL);
1318                         if (rc) {
1319                                 /* this OSP doesn't feel well */
1320                                 rc = 0;
1321                                 continue;
1322                         }
1323
1324                         rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
1325                         if (rc < 0) {
1326                                 rc = 0;
1327                                 continue;
1328                         }
1329
1330                         break;
1331                 }
1332
1333                 /* Can not allocate more stripes */
1334                 if (j == lod->lod_remote_mdt_count) {
1335                         CDEBUG(D_INFO, "%s: require stripes %d only get %d\n",
1336                                lod2obd(lod)->obd_name, stripe_count, i - 1);
1337                         break;
1338                 }
1339
1340                 CDEBUG(D_INFO, "idx %d, mdt cnt %d,"
1341                        " allocated %d, last allocated %d\n", idx,
1342                        lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1343
1344 next:
1345                 /* tgt_dt and fid must be ready after search avaible OSP
1346                  * in the above loop */
1347                 LASSERT(tgt_dt != NULL);
1348                 LASSERT(fid_is_sane(&fid));
1349                 conf.loc_flags = LOC_F_NEW;
1350                 dto = dt_locate_at(env, tgt_dt, &fid,
1351                                    dt->do_lu.lo_dev->ld_site->ls_top_dev,
1352                                    &conf);
1353                 if (IS_ERR(dto))
1354                         GOTO(out_put, rc = PTR_ERR(dto));
1355                 stripe[i] = dto;
1356                 idx_array[i] = idx;
1357         }
1358
1359         lo->ldo_dir_striped = 1;
1360         lo->ldo_stripe = stripe;
1361         lo->ldo_stripenr = i;
1362         lo->ldo_stripes_allocated = stripe_count;
1363
1364         if (lo->ldo_stripenr == 0)
1365                 GOTO(out_put, rc = -ENOSPC);
1366
1367         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1368         if (rc != 0)
1369                 GOTO(out_put, rc);
1370         lmm = lmv_buf.lb_buf;
1371
1372         OBD_ALLOC_PTR(slave_lmm);
1373         if (slave_lmm == NULL)
1374                 GOTO(out_put, rc = -ENOMEM);
1375
1376         lod_prep_slave_lmv_md(slave_lmm, lmm);
1377         slave_lmv_buf.lb_buf = slave_lmm;
1378         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1379
1380         if (!dt_try_as_dir(env, dt_object_child(dt)))
1381                 GOTO(out_put, rc = -EINVAL);
1382
1383         for (i = 0; i < lo->ldo_stripenr; i++) {
1384                 struct dt_object        *dto            = stripe[i];
1385                 char                    *stripe_name    = info->lti_key;
1386                 struct lu_name          *sname;
1387                 struct linkea_data       ldata          = { 0 };
1388                 struct lu_buf            linkea_buf;
1389
1390                 rc = dt_declare_create(env, dto, attr, NULL, dof, th);
1391                 if (rc != 0)
1392                         GOTO(out_put, rc);
1393
1394                 if (!dt_try_as_dir(env, dto))
1395                         GOTO(out_put, rc = -EINVAL);
1396
1397                 rc = dt_declare_insert(env, dto,
1398                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1399                      (const struct dt_key *)dot, th);
1400                 if (rc != 0)
1401                         GOTO(out_put, rc);
1402
1403                 /* master stripe FID will be put to .. */
1404                 rc = dt_declare_insert(env, dto,
1405                      (const struct dt_rec *)lu_object_fid(&dt->do_lu),
1406                      (const struct dt_key *)dotdot, th);
1407                 if (rc != 0)
1408                         GOTO(out_put, rc);
1409
1410                 /* probably nothing to inherite */
1411                 if (lo->ldo_striping_cached &&
1412                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1413                                          lo->ldo_def_stripenr,
1414                                          lo->ldo_def_stripe_offset)) {
1415                         struct lov_user_md_v3   *v3;
1416
1417                         /* sigh, lti_ea_store has been used for lmv_buf,
1418                          * so we have to allocate buffer for default
1419                          * stripe EA */
1420                         OBD_ALLOC_PTR(v3);
1421                         if (v3 == NULL)
1422                                 GOTO(out_put, rc = -ENOMEM);
1423
1424                         memset(v3, 0, sizeof(*v3));
1425                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1426                         v3->lmm_stripe_count =
1427                                 cpu_to_le16(lo->ldo_def_stripenr);
1428                         v3->lmm_stripe_offset =
1429                                 cpu_to_le16(lo->ldo_def_stripe_offset);
1430                         v3->lmm_stripe_size =
1431                                 cpu_to_le32(lo->ldo_def_stripe_size);
1432                         if (lo->ldo_pool != NULL)
1433                                 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
1434                                         sizeof(v3->lmm_pool_name));
1435
1436                         info->lti_buf.lb_buf = v3;
1437                         info->lti_buf.lb_len = sizeof(*v3);
1438                         rc = dt_declare_xattr_set(env, dto,
1439                                                   &info->lti_buf,
1440                                                   XATTR_NAME_LOV,
1441                                                   0, th);
1442                         OBD_FREE_PTR(v3);
1443                         if (rc != 0)
1444                                 GOTO(out_put, rc);
1445                 }
1446
1447                 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1448                 rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf,
1449                                           XATTR_NAME_LMV, 0, th);
1450                 if (rc != 0)
1451                         GOTO(out_put, rc);
1452
1453                 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1454                         PFID(lu_object_fid(&dto->do_lu)), i);
1455
1456                 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
1457                 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
1458                 if (rc != 0)
1459                         GOTO(out_put, rc);
1460
1461                 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
1462                 if (rc != 0)
1463                         GOTO(out_put, rc);
1464
1465                 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1466                 linkea_buf.lb_len = ldata.ld_leh->leh_len;
1467                 rc = dt_declare_xattr_set(env, dto, &linkea_buf,
1468                                           XATTR_NAME_LINK, 0, th);
1469                 if (rc != 0)
1470                         GOTO(out_put, rc);
1471
1472                 rc = dt_declare_insert(env, dt_object_child(dt),
1473                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1474                      (const struct dt_key *)stripe_name, th);
1475                 if (rc != 0)
1476                         GOTO(out_put, rc);
1477
1478                 rc = dt_declare_ref_add(env, dt_object_child(dt), th);
1479                 if (rc != 0)
1480                         GOTO(out_put, rc);
1481         }
1482
1483         rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
1484                                   XATTR_NAME_LMV, 0, th);
1485         if (rc != 0)
1486                 GOTO(out_put, rc);
1487
1488 out_put:
1489         if (rc < 0) {
1490                 for (i = 0; i < stripe_count; i++)
1491                         if (stripe[i] != NULL)
1492                                 lu_object_put(env, &stripe[i]->do_lu);
1493                 OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count);
1494                 lo->ldo_stripenr = 0;
1495                 lo->ldo_stripes_allocated = 0;
1496                 lo->ldo_stripe = NULL;
1497         }
1498
1499 out_free:
1500         if (idx_array != NULL)
1501                 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
1502         if (slave_lmm != NULL)
1503                 OBD_FREE_PTR(slave_lmm);
1504
1505         RETURN(rc);
1506 }
1507
1508 /**
1509  * Declare create striped md object.
1510  */
1511 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
1512                                      struct dt_object *dt,
1513                                      struct lu_attr *attr,
1514                                      const struct lu_buf *lum_buf,
1515                                      struct dt_object_format *dof,
1516                                      struct thandle *th)
1517 {
1518         struct lod_object       *lo = lod_dt_obj(dt);
1519         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1520         struct lmv_user_md_v1   *lum;
1521         int                     rc;
1522         ENTRY;
1523
1524         lum = lum_buf->lb_buf;
1525         LASSERT(lum != NULL);
1526
1527         CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
1528                le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
1529                (int)le32_to_cpu(lum->lum_stripe_offset));
1530
1531         if (le32_to_cpu(lum->lum_stripe_count) == 0)
1532                 GOTO(out, rc = 0);
1533
1534         rc = lod_verify_md_striping(lod, lum);
1535         if (rc != 0)
1536                 GOTO(out, rc);
1537
1538         /* prepare dir striped objects */
1539         rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
1540         if (rc != 0) {
1541                 /* failed to create striping, let's reset
1542                  * config so that others don't get confused */
1543                 lod_object_free_striping(env, lo);
1544                 GOTO(out, rc);
1545         }
1546 out:
1547         RETURN(rc);
1548 }
1549
1550 static int lod_dir_declare_xattr_set(const struct lu_env *env,
1551                                      struct dt_object *dt,
1552                                      const struct lu_buf *buf,
1553                                      const char *name, int fl,
1554                                      struct thandle *th)
1555 {
1556         struct dt_object        *next = dt_object_child(dt);
1557         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
1558         struct lod_object       *lo = lod_dt_obj(dt);
1559         int                     i;
1560         int                     rc;
1561         ENTRY;
1562
1563         if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1564                 struct lmv_user_md_v1 *lum;
1565
1566                 LASSERT(buf != NULL && buf->lb_buf != NULL);
1567                 lum = buf->lb_buf;
1568                 rc = lod_verify_md_striping(d, lum);
1569                 if (rc != 0)
1570                         RETURN(rc);
1571         }
1572
1573         rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1574         if (rc != 0)
1575                 RETURN(rc);
1576
1577         /* set xattr to each stripes, if needed */
1578         rc = lod_load_striping(env, lo);
1579         if (rc != 0)
1580                 RETURN(rc);
1581
1582         if (lo->ldo_stripenr == 0)
1583                 RETURN(rc);
1584
1585         for (i = 0; i < lo->ldo_stripenr; i++) {
1586                 LASSERT(lo->ldo_stripe[i]);
1587                 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
1588                                           name, fl, th);
1589                 if (rc != 0)
1590                         break;
1591         }
1592
1593         RETURN(rc);
1594 }
1595
1596 /*
1597  * LOV xattr is a storage for striping, and LOD owns this xattr.
1598  * but LOD allows others to control striping to some extent
1599  * - to reset strping
1600  * - to set new defined striping
1601  * - to set new semi-defined striping
1602  *   - number of stripes is defined
1603  *   - number of stripes + osts are defined
1604  *   - ??
1605  */
1606 static int lod_declare_xattr_set(const struct lu_env *env,
1607                                  struct dt_object *dt,
1608                                  const struct lu_buf *buf,
1609                                  const char *name, int fl,
1610                                  struct thandle *th)
1611 {
1612         struct dt_object *next = dt_object_child(dt);
1613         struct lu_attr   *attr = &lod_env_info(env)->lti_attr;
1614         __u32             mode;
1615         int               rc;
1616         ENTRY;
1617
1618         /*
1619          * allow to declare predefined striping on a new (!mode) object
1620          * which is supposed to be replay of regular file creation
1621          * (when LOV setting is declared)
1622          * LU_XATTR_REPLACE is set to indicate a layout swap
1623          */
1624         mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
1625         if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
1626              !(fl & LU_XATTR_REPLACE)) {
1627                 /*
1628                  * this is a request to manipulate object's striping
1629                  */
1630                 if (dt_object_exists(dt)) {
1631                         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1632                         if (rc)
1633                                 RETURN(rc);
1634                 } else {
1635                         memset(attr, 0, sizeof(*attr));
1636                         attr->la_valid = LA_TYPE | LA_MODE;
1637                         attr->la_mode = S_IFREG;
1638                 }
1639                 rc = lod_declare_striped_object(env, dt, attr, buf, th);
1640         } else if (S_ISDIR(mode)) {
1641                 rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
1642         } else {
1643                 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1644         }
1645
1646         RETURN(rc);
1647 }
1648
1649 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
1650 {
1651         lo->ldo_striping_cached = 0;
1652         lo->ldo_def_striping_set = 0;
1653         lod_object_set_pool(lo, NULL);
1654         lo->ldo_def_stripe_size = 0;
1655         lo->ldo_def_stripenr = 0;
1656         if (lo->ldo_dir_stripe != NULL)
1657                 lo->ldo_dir_striping_cached = 0;
1658 }
1659
1660 static int lod_xattr_set_internal(const struct lu_env *env,
1661                                   struct dt_object *dt,
1662                                   const struct lu_buf *buf,
1663                                   const char *name, int fl, struct thandle *th,
1664                                   struct lustre_capa *capa)
1665 {
1666         struct dt_object        *next = dt_object_child(dt);
1667         struct lod_object       *lo = lod_dt_obj(dt);
1668         int                     rc;
1669         int                     i;
1670         ENTRY;
1671
1672         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1673         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1674                 RETURN(rc);
1675
1676         if (lo->ldo_stripenr == 0)
1677                 RETURN(rc);
1678
1679         for (i = 0; i < lo->ldo_stripenr; i++) {
1680                 LASSERT(lo->ldo_stripe[i]);
1681                 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
1682                                   capa);
1683                 if (rc != 0)
1684                         break;
1685         }
1686
1687         RETURN(rc);
1688 }
1689
1690 static int lod_xattr_del_internal(const struct lu_env *env,
1691                                   struct dt_object *dt,
1692                                   const char *name, struct thandle *th,
1693                                   struct lustre_capa *capa)
1694 {
1695         struct dt_object        *next = dt_object_child(dt);
1696         struct lod_object       *lo = lod_dt_obj(dt);
1697         int                     rc;
1698         int                     i;
1699         ENTRY;
1700
1701         rc = dt_xattr_del(env, next, name, th, capa);
1702         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1703                 RETURN(rc);
1704
1705         if (lo->ldo_stripenr == 0)
1706                 RETURN(rc);
1707
1708         for (i = 0; i < lo->ldo_stripenr; i++) {
1709                 LASSERT(lo->ldo_stripe[i]);
1710                 rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th,
1711                                   capa);
1712                 if (rc != 0)
1713                         break;
1714         }
1715
1716         RETURN(rc);
1717 }
1718
1719 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
1720                                     struct dt_object *dt,
1721                                     const struct lu_buf *buf,
1722                                     const char *name, int fl,
1723                                     struct thandle *th,
1724                                     struct lustre_capa *capa)
1725 {
1726         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
1727         struct lod_object       *l = lod_dt_obj(dt);
1728         struct lov_user_md_v1   *lum;
1729         struct lov_user_md_v3   *v3 = NULL;
1730         int                      rc;
1731         ENTRY;
1732
1733         /* If it is striped dir, we should clear the stripe cache for
1734          * slave stripe as well, but there are no effective way to
1735          * notify the LOD on the slave MDT, so we do not cache stripe
1736          * information for slave stripe for now. XXX*/
1737         lod_lov_stripe_cache_clear(l);
1738         LASSERT(buf != NULL && buf->lb_buf != NULL);
1739         lum = buf->lb_buf;
1740
1741         rc = lod_verify_striping(d, buf, false);
1742         if (rc)
1743                 RETURN(rc);
1744
1745         if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1746                 v3 = buf->lb_buf;
1747
1748         /* if { size, offset, count } = { 0, -1, 0 } and no pool
1749          * (i.e. all default values specified) then delete default
1750          * striping from dir. */
1751         CDEBUG(D_OTHER,
1752                 "set default striping: sz %u # %u offset %d %s %s\n",
1753                 (unsigned)lum->lmm_stripe_size,
1754                 (unsigned)lum->lmm_stripe_count,
1755                 (int)lum->lmm_stripe_offset,
1756                 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1757
1758         if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1759                                 (lum->lmm_stripe_count),
1760                                 (lum->lmm_stripe_offset)) &&
1761                         lum->lmm_magic == LOV_USER_MAGIC_V1) {
1762                 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1763                 if (rc == -ENODATA)
1764                         rc = 0;
1765         } else {
1766                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
1767         }
1768
1769         RETURN(rc);
1770 }
1771
1772 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
1773                                             struct dt_object *dt,
1774                                             const struct lu_buf *buf,
1775                                             const char *name, int fl,
1776                                             struct thandle *th,
1777                                             struct lustre_capa *capa)
1778 {
1779         struct lod_object       *l = lod_dt_obj(dt);
1780         struct lmv_user_md_v1   *lum;
1781         int                      rc;
1782         ENTRY;
1783
1784         LASSERT(buf != NULL && buf->lb_buf != NULL);
1785         lum = buf->lb_buf;
1786
1787         CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
1788               le32_to_cpu(lum->lum_stripe_count),
1789               (int)le32_to_cpu(lum->lum_stripe_offset));
1790
1791         if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
1792                                  le32_to_cpu(lum->lum_stripe_offset)) &&
1793                                 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
1794                 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1795                 if (rc == -ENODATA)
1796                         rc = 0;
1797         } else {
1798                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
1799                 if (rc != 0)
1800                         RETURN(rc);
1801         }
1802
1803         /* Update default stripe cache */
1804         if (l->ldo_dir_stripe == NULL) {
1805                 OBD_ALLOC_PTR(l->ldo_dir_stripe);
1806                 if (l->ldo_dir_stripe == NULL)
1807                         RETURN(-ENOMEM);
1808         }
1809
1810         l->ldo_dir_striping_cached = 0;
1811         l->ldo_dir_def_striping_set = 1;
1812         l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count);
1813
1814         RETURN(rc);
1815 }
1816
1817 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
1818                              const struct lu_buf *buf, const char *name,
1819                              int fl, struct thandle *th,
1820                              struct lustre_capa *capa)
1821 {
1822         struct lod_object       *lo = lod_dt_obj(dt);
1823         struct lod_thread_info  *info = lod_env_info(env);
1824         struct lu_attr          *attr = &info->lti_attr;
1825         struct dt_object_format *dof = &info->lti_format;
1826         struct lu_buf           lmv_buf;
1827         struct lu_buf           slave_lmv_buf;
1828         struct lmv_mds_md_v1    *lmm;
1829         struct lmv_mds_md_v1    *slave_lmm = NULL;
1830         int                     i;
1831         int                     rc;
1832         ENTRY;
1833
1834         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
1835                 RETURN(-ENOTDIR);
1836
1837         /* The stripes are supposed to be allocated in declare phase,
1838          * if there are no stripes being allocated, it will skip */
1839         if (lo->ldo_stripenr == 0)
1840                 RETURN(0);
1841
1842         rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA);
1843         if (rc != 0)
1844                 RETURN(rc);
1845
1846         attr->la_valid = LA_TYPE | LA_MODE;
1847         dof->dof_type = DFT_DIR;
1848
1849         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1850         if (rc != 0)
1851                 RETURN(rc);
1852         lmm = lmv_buf.lb_buf;
1853
1854         OBD_ALLOC_PTR(slave_lmm);
1855         if (slave_lmm == NULL)
1856                 RETURN(-ENOMEM);
1857
1858         lod_prep_slave_lmv_md(slave_lmm, lmm);
1859         slave_lmv_buf.lb_buf = slave_lmm;
1860         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1861
1862         for (i = 0; i < lo->ldo_stripenr; i++) {
1863                 struct dt_object        *dto;
1864                 char                    *stripe_name    = info->lti_key;
1865                 struct lu_name          *sname;
1866                 struct linkea_data       ldata          = { 0 };
1867                 struct lu_buf            linkea_buf;
1868
1869                 dto = lo->ldo_stripe[i];
1870                 dt_write_lock(env, dto, MOR_TGT_CHILD);
1871                 rc = dt_create(env, dto, attr, NULL, dof, th);
1872                 dt_write_unlock(env, dto);
1873                 if (rc != 0)
1874                         RETURN(rc);
1875
1876                 rc = dt_insert(env, dto,
1877                               (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1878                               (const struct dt_key *)dot, th, capa, 0);
1879                 if (rc != 0)
1880                         RETURN(rc);
1881
1882                 rc = dt_insert(env, dto,
1883                               (struct dt_rec *)lu_object_fid(&dt->do_lu),
1884                               (const struct dt_key *)dotdot, th, capa, 0);
1885                 if (rc != 0)
1886                         RETURN(rc);
1887
1888                 if (lo->ldo_striping_cached &&
1889                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1890                                          lo->ldo_def_stripenr,
1891                                          lo->ldo_def_stripe_offset)) {
1892                         struct lov_user_md_v3   *v3;
1893
1894                         /* sigh, lti_ea_store has been used for lmv_buf,
1895                          * so we have to allocate buffer for default
1896                          * stripe EA */
1897                         OBD_ALLOC_PTR(v3);
1898                         if (v3 == NULL)
1899                                 GOTO(out, rc);
1900
1901                         memset(v3, 0, sizeof(*v3));
1902                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1903                         v3->lmm_stripe_count =
1904                                 cpu_to_le16(lo->ldo_def_stripenr);
1905                         v3->lmm_stripe_offset =
1906                                 cpu_to_le16(lo->ldo_def_stripe_offset);
1907                         v3->lmm_stripe_size =
1908                                 cpu_to_le32(lo->ldo_def_stripe_size);
1909                         if (lo->ldo_pool != NULL)
1910                                 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
1911                                         sizeof(v3->lmm_pool_name));
1912
1913                         info->lti_buf.lb_buf = v3;
1914                         info->lti_buf.lb_len = sizeof(*v3);
1915                         rc = dt_xattr_set(env, dto, &info->lti_buf,
1916                                           XATTR_NAME_LOV, 0, th, capa);
1917                         OBD_FREE_PTR(v3);
1918                         if (rc != 0)
1919                                 GOTO(out, rc);
1920                 }
1921
1922                 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1923                 rc = dt_xattr_set(env, dto, &slave_lmv_buf, XATTR_NAME_LMV,
1924                                   fl, th, capa);
1925                 if (rc != 0)
1926                         GOTO(out, rc);
1927
1928                 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1929                          PFID(lu_object_fid(&dto->do_lu)), i);
1930
1931                 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
1932                 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
1933                 if (rc != 0)
1934                         GOTO(out, rc);
1935
1936                 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
1937                 if (rc != 0)
1938                         GOTO(out, rc);
1939
1940                 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1941                 linkea_buf.lb_len = ldata.ld_leh->leh_len;
1942                 rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK,
1943                                   0, th, BYPASS_CAPA);
1944                 if (rc != 0)
1945                         GOTO(out, rc);
1946
1947                 rc = dt_insert(env, dt_object_child(dt),
1948                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1949                      (const struct dt_key *)stripe_name, th, capa, 0);
1950                 if (rc != 0)
1951                         GOTO(out, rc);
1952
1953                 rc = dt_ref_add(env, dt_object_child(dt), th);
1954                 if (rc != 0)
1955                         GOTO(out, rc);
1956         }
1957
1958         rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
1959                           fl, th, capa);
1960
1961 out:
1962         if (slave_lmm != NULL)
1963                 OBD_FREE_PTR(slave_lmm);
1964
1965         RETURN(rc);
1966 }
1967
1968 int lod_dir_striping_create_internal(const struct lu_env *env,
1969                                      struct dt_object *dt,
1970                                      struct lu_attr *attr,
1971                                      struct dt_object_format *dof,
1972                                      struct thandle *th,
1973                                      bool declare)
1974 {
1975         struct lod_thread_info  *info = lod_env_info(env);
1976         struct lod_object       *lo = lod_dt_obj(dt);
1977         int                     rc;
1978         ENTRY;
1979
1980         if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr,
1981                                  lo->ldo_dir_stripe_offset)) {
1982                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
1983                 int stripe_count = lo->ldo_stripenr;
1984
1985                 if (info->lti_ea_store_size < sizeof(*v1)) {
1986                         rc = lod_ea_store_resize(info, sizeof(*v1));
1987                         if (rc != 0)
1988                                 RETURN(rc);
1989                         v1 = info->lti_ea_store;
1990                 }
1991
1992                 memset(v1, 0, sizeof(*v1));
1993                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
1994                 v1->lum_stripe_count = cpu_to_le32(stripe_count);
1995                 v1->lum_stripe_offset =
1996                                 cpu_to_le32(lo->ldo_dir_stripe_offset);
1997
1998                 info->lti_buf.lb_buf = v1;
1999                 info->lti_buf.lb_len = sizeof(*v1);
2000
2001                 if (declare)
2002                         rc = lod_declare_xattr_set_lmv(env, dt, attr,
2003                                                        &info->lti_buf, dof, th);
2004                 else
2005                         rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
2006                                                XATTR_NAME_LMV, 0, th,
2007                                                BYPASS_CAPA);
2008                 if (rc != 0)
2009                         RETURN(rc);
2010         }
2011
2012         /* Transfer default LMV striping from the parent */
2013         if (lo->ldo_dir_striping_cached &&
2014             !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
2015                                  lo->ldo_dir_def_stripe_offset)) {
2016                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2017                 int def_stripe_count = lo->ldo_dir_def_stripenr;
2018
2019                 if (info->lti_ea_store_size < sizeof(*v1)) {
2020                         rc = lod_ea_store_resize(info, sizeof(*v1));
2021                         if (rc != 0)
2022                                 RETURN(rc);
2023                         v1 = info->lti_ea_store;
2024                 }
2025
2026                 memset(v1, 0, sizeof(*v1));
2027                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2028                 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
2029                 v1->lum_stripe_offset =
2030                                 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
2031                 v1->lum_hash_type =
2032                                 cpu_to_le32(lo->ldo_dir_def_hash_type);
2033
2034                 info->lti_buf.lb_buf = v1;
2035                 info->lti_buf.lb_len = sizeof(*v1);
2036                 if (declare)
2037                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2038                                                        XATTR_NAME_DEFAULT_LMV,
2039                                                        0, th);
2040                 else
2041                         rc = lod_xattr_set_default_lmv_on_dir(env, dt,
2042                                                   &info->lti_buf,
2043                                                   XATTR_NAME_DEFAULT_LMV, 0,
2044                                                   th, BYPASS_CAPA);
2045                 if (rc != 0)
2046                         RETURN(rc);
2047         }
2048
2049         /* Transfer default LOV striping from the parent */
2050         if (lo->ldo_striping_cached &&
2051             !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2052                                  lo->ldo_def_stripenr,
2053                                  lo->ldo_def_stripe_offset)) {
2054                 struct lov_user_md_v3 *v3 = info->lti_ea_store;
2055
2056                 if (info->lti_ea_store_size < sizeof(*v3)) {
2057                         rc = lod_ea_store_resize(info, sizeof(*v3));
2058                         if (rc != 0)
2059                                 RETURN(rc);
2060                         v3 = info->lti_ea_store;
2061                 }
2062
2063                 memset(v3, 0, sizeof(*v3));
2064                 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2065                 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
2066                 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
2067                 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
2068                 if (lo->ldo_pool != NULL)
2069                         strlcpy(v3->lmm_pool_name, lo->ldo_pool,
2070                                 sizeof(v3->lmm_pool_name));
2071
2072                 info->lti_buf.lb_buf = v3;
2073                 info->lti_buf.lb_len = sizeof(*v3);
2074
2075                 if (declare)
2076                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2077                                                        XATTR_NAME_LOV, 0, th);
2078                 else
2079                         rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
2080                                                       XATTR_NAME_LOV, 0, th,
2081                                                       BYPASS_CAPA);
2082                 if (rc != 0)
2083                         RETURN(rc);
2084         }
2085
2086         RETURN(0);
2087 }
2088
2089 static int lod_declare_dir_striping_create(const struct lu_env *env,
2090                                            struct dt_object *dt,
2091                                            struct lu_attr *attr,
2092                                            struct dt_object_format *dof,
2093                                            struct thandle *th)
2094 {
2095         return lod_dir_striping_create_internal(env, dt, attr, dof, th, true);
2096 }
2097
2098 static int lod_dir_striping_create(const struct lu_env *env,
2099                                    struct dt_object *dt,
2100                                    struct lu_attr *attr,
2101                                    struct dt_object_format *dof,
2102                                    struct thandle *th)
2103 {
2104         return lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
2105 }
2106
2107 static int lod_xattr_set(const struct lu_env *env,
2108                          struct dt_object *dt, const struct lu_buf *buf,
2109                          const char *name, int fl, struct thandle *th,
2110                          struct lustre_capa *capa)
2111 {
2112         struct dt_object        *next = dt_object_child(dt);
2113         int                      rc;
2114         ENTRY;
2115
2116         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2117             strcmp(name, XATTR_NAME_LMV) == 0) {
2118                 struct lmv_mds_md_v1 *lmm = buf->lb_buf;
2119
2120                 if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
2121                                                 LMV_HASH_FLAG_MIGRATION)
2122                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2123                 else
2124                         rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
2125
2126                 RETURN(rc);
2127         }
2128
2129         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2130             strcmp(name, XATTR_NAME_LOV) == 0) {
2131                 /* default LOVEA */
2132                 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
2133                 RETURN(rc);
2134         } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2135                    strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
2136                 /* default LMVEA */
2137                 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
2138                                                       th, capa);
2139                 RETURN(rc);
2140         } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2141                    !strcmp(name, XATTR_NAME_LOV)) {
2142                 /* in case of lov EA swap, just set it
2143                  * if not, it is a replay so check striping match what we
2144                  * already have during req replay, declare_xattr_set()
2145                  * defines striping, then create() does the work
2146                 */
2147                 if (fl & LU_XATTR_REPLACE) {
2148                         /* free stripes, then update disk */
2149                         lod_object_free_striping(env, lod_dt_obj(dt));
2150                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2151                 } else {
2152                         rc = lod_striping_create(env, dt, NULL, NULL, th);
2153                 }
2154                 RETURN(rc);
2155         }
2156
2157         /* then all other xattr */
2158         rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
2159
2160         RETURN(rc);
2161 }
2162
2163 static int lod_declare_xattr_del(const struct lu_env *env,
2164                                  struct dt_object *dt, const char *name,
2165                                  struct thandle *th)
2166 {
2167         return dt_declare_xattr_del(env, dt_object_child(dt), name, th);
2168 }
2169
2170 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
2171                          const char *name, struct thandle *th,
2172                          struct lustre_capa *capa)
2173 {
2174         if (!strcmp(name, XATTR_NAME_LOV))
2175                 lod_object_free_striping(env, lod_dt_obj(dt));
2176         return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
2177 }
2178
2179 static int lod_xattr_list(const struct lu_env *env,
2180                           struct dt_object *dt, struct lu_buf *buf,
2181                           struct lustre_capa *capa)
2182 {
2183         return dt_xattr_list(env, dt_object_child(dt), buf, capa);
2184 }
2185
2186 int lod_object_set_pool(struct lod_object *o, char *pool)
2187 {
2188         int len;
2189
2190         if (o->ldo_pool) {
2191                 len = strlen(o->ldo_pool);
2192                 OBD_FREE(o->ldo_pool, len + 1);
2193                 o->ldo_pool = NULL;
2194         }
2195         if (pool) {
2196                 len = strlen(pool);
2197                 OBD_ALLOC(o->ldo_pool, len + 1);
2198                 if (o->ldo_pool == NULL)
2199                         return -ENOMEM;
2200                 strcpy(o->ldo_pool, pool);
2201         }
2202         return 0;
2203 }
2204
2205 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
2206 {
2207         return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
2208 }
2209
2210
2211 static int lod_cache_parent_lov_striping(const struct lu_env *env,
2212                                          struct lod_object *lp)
2213 {
2214         struct lod_thread_info  *info = lod_env_info(env);
2215         struct lov_user_md_v1   *v1 = NULL;
2216         struct lov_user_md_v3   *v3 = NULL;
2217         int                      rc;
2218         ENTRY;
2219
2220         /* called from MDD without parent being write locked,
2221          * lock it here */
2222         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2223         rc = lod_get_lov_ea(env, lp);
2224         if (rc < 0)
2225                 GOTO(unlock, rc);
2226
2227         if (rc < sizeof(struct lov_user_md)) {
2228                 /* don't lookup for non-existing or invalid striping */
2229                 lp->ldo_def_striping_set = 0;
2230                 lp->ldo_striping_cached = 1;
2231                 lp->ldo_def_stripe_size = 0;
2232                 lp->ldo_def_stripenr = 0;
2233                 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
2234                 GOTO(unlock, rc = 0);
2235         }
2236
2237         rc = 0;
2238         v1 = info->lti_ea_store;
2239         if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) {
2240                 lustre_swab_lov_user_md_v1(v1);
2241         } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
2242                 v3 = (struct lov_user_md_v3 *)v1;
2243                 lustre_swab_lov_user_md_v3(v3);
2244         }
2245
2246         if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
2247                 GOTO(unlock, rc = 0);
2248
2249         if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
2250                 GOTO(unlock, rc = 0);
2251
2252         CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
2253                PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
2254                (int)v1->lmm_stripe_count,
2255                (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
2256
2257         lp->ldo_def_stripenr = v1->lmm_stripe_count;
2258         lp->ldo_def_stripe_size = v1->lmm_stripe_size;
2259         lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
2260         lp->ldo_striping_cached = 1;
2261         lp->ldo_def_striping_set = 1;
2262         if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
2263                 /* XXX: sanity check here */
2264                 v3 = (struct lov_user_md_v3 *) v1;
2265                 if (v3->lmm_pool_name[0])
2266                         lod_object_set_pool(lp, v3->lmm_pool_name);
2267         }
2268         EXIT;
2269 unlock:
2270         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2271         return rc;
2272 }
2273
2274
2275 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
2276                                          struct lod_object *lp)
2277 {
2278         struct lod_thread_info  *info = lod_env_info(env);
2279         struct lmv_user_md_v1   *v1 = NULL;
2280         int                      rc;
2281         ENTRY;
2282
2283         /* called from MDD without parent being write locked,
2284          * lock it here */
2285         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2286         rc = lod_get_default_lmv_ea(env, lp);
2287         if (rc < 0)
2288                 GOTO(unlock, rc);
2289
2290         if (rc < sizeof(struct lmv_user_md)) {
2291                 /* don't lookup for non-existing or invalid striping */
2292                 lp->ldo_dir_def_striping_set = 0;
2293                 lp->ldo_dir_striping_cached = 1;
2294                 lp->ldo_dir_def_stripenr = 0;
2295                 lp->ldo_dir_def_stripe_offset =
2296                                         (typeof(v1->lum_stripe_offset))(-1);
2297                 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
2298                 GOTO(unlock, rc = 0);
2299         }
2300
2301         rc = 0;
2302         v1 = info->lti_ea_store;
2303
2304         lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
2305         lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
2306         lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
2307         lp->ldo_dir_def_striping_set = 1;
2308         lp->ldo_dir_striping_cached = 1;
2309
2310         EXIT;
2311 unlock:
2312         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2313         return rc;
2314 }
2315
2316 static int lod_cache_parent_striping(const struct lu_env *env,
2317                                      struct lod_object *lp,
2318                                      umode_t child_mode)
2319 {
2320         int rc = 0;
2321         ENTRY;
2322
2323         rc = lod_load_striping(env, lp);
2324         if (rc != 0)
2325                 RETURN(rc);
2326
2327         if (!lp->ldo_striping_cached) {
2328                 /* we haven't tried to get default striping for
2329                  * the directory yet, let's cache it in the object */
2330                 rc = lod_cache_parent_lov_striping(env, lp);
2331                 if (rc != 0)
2332                         RETURN(rc);
2333         }
2334
2335         if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
2336                 rc = lod_cache_parent_lmv_striping(env, lp);
2337
2338         RETURN(rc);
2339 }
2340
2341 /**
2342  * used to transfer default striping data to the object being created
2343  */
2344 static void lod_ah_init(const struct lu_env *env,
2345                         struct dt_allocation_hint *ah,
2346                         struct dt_object *parent,
2347                         struct dt_object *child,
2348                         umode_t child_mode)
2349 {
2350         struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
2351         struct dt_object  *nextp = NULL;
2352         struct dt_object  *nextc;
2353         struct lod_object *lp = NULL;
2354         struct lod_object *lc;
2355         struct lov_desc   *desc;
2356         int               rc;
2357         ENTRY;
2358
2359         LASSERT(child);
2360
2361         if (likely(parent)) {
2362                 nextp = dt_object_child(parent);
2363                 lp = lod_dt_obj(parent);
2364                 rc = lod_load_striping(env, lp);
2365                 if (rc != 0)
2366                         return;
2367         }
2368
2369         nextc = dt_object_child(child);
2370         lc = lod_dt_obj(child);
2371
2372         LASSERT(lc->ldo_stripenr == 0);
2373         LASSERT(lc->ldo_stripe == NULL);
2374
2375         /*
2376          * local object may want some hints
2377          * in case of late striping creation, ->ah_init()
2378          * can be called with local object existing
2379          */
2380         if (!dt_object_exists(nextc) || dt_object_remote(nextc))
2381                 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
2382                                           NULL : nextp, nextc, child_mode);
2383
2384         if (S_ISDIR(child_mode)) {
2385                 if (lc->ldo_dir_stripe == NULL) {
2386                         OBD_ALLOC_PTR(lc->ldo_dir_stripe);
2387                         if (lc->ldo_dir_stripe == NULL)
2388                                 return;
2389                 }
2390
2391                 if (lp->ldo_dir_stripe == NULL) {
2392                         OBD_ALLOC_PTR(lp->ldo_dir_stripe);
2393                         if (lp->ldo_dir_stripe == NULL)
2394                                 return;
2395                 }
2396
2397                 rc = lod_cache_parent_striping(env, lp, child_mode);
2398                 if (rc != 0)
2399                         return;
2400
2401                 /* transfer defaults to new directory */
2402                 if (lp->ldo_striping_cached) {
2403                         if (lp->ldo_pool)
2404                                 lod_object_set_pool(lc, lp->ldo_pool);
2405                         lc->ldo_def_stripenr = lp->ldo_def_stripenr;
2406                         lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
2407                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2408                         lc->ldo_striping_cached = 1;
2409                         lc->ldo_def_striping_set = 1;
2410                         CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
2411                                (int)lc->ldo_def_stripe_size,
2412                                (int)lc->ldo_def_stripe_offset,
2413                                (int)lc->ldo_def_stripenr);
2414                 }
2415
2416                 /* transfer dir defaults to new directory */
2417                 if (lp->ldo_dir_striping_cached) {
2418                         lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
2419                         lc->ldo_dir_def_stripe_offset =
2420                                                   lp->ldo_dir_def_stripe_offset;
2421                         lc->ldo_dir_def_hash_type =
2422                                                   lp->ldo_dir_def_hash_type;
2423                         lc->ldo_dir_striping_cached = 1;
2424                         lc->ldo_dir_def_striping_set = 1;
2425                         CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
2426                                (int)lc->ldo_dir_def_stripenr,
2427                                (int)lc->ldo_dir_def_stripe_offset,
2428                                lc->ldo_dir_def_hash_type);
2429                 }
2430
2431                 /* It should always honour the specified stripes */
2432                 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
2433                         const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
2434
2435                         rc = lod_verify_md_striping(d, lum1);
2436                         if (rc == 0 &&
2437                                 le32_to_cpu(lum1->lum_stripe_count) > 1) {
2438                                 /* Directory will be striped only if
2439                                  * stripe_count > 1 */
2440                                 lc->ldo_stripenr =
2441                                         le32_to_cpu(lum1->lum_stripe_count);
2442                                 lc->ldo_dir_stripe_offset =
2443                                         le32_to_cpu(lum1->lum_stripe_offset);
2444                                 lc->ldo_dir_hash_type =
2445                                         le32_to_cpu(lum1->lum_hash_type);
2446                                 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
2447                                        lc->ldo_stripenr,
2448                                        (int)lc->ldo_dir_stripe_offset);
2449                         }
2450                 /* then check whether there is default stripes from parent */
2451                 } else if (lp->ldo_dir_def_striping_set) {
2452                         /* If there are default dir stripe from parent */
2453                         lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
2454                         lc->ldo_dir_stripe_offset =
2455                                         lp->ldo_dir_def_stripe_offset;
2456                         lc->ldo_dir_hash_type =
2457                                         lp->ldo_dir_def_hash_type;
2458                         CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
2459                                lc->ldo_stripenr,
2460                                (int)lc->ldo_dir_stripe_offset);
2461                 } else {
2462                         /* set default stripe for this directory */
2463                         lc->ldo_stripenr = 0;
2464                         lc->ldo_dir_stripe_offset = -1;
2465                 }
2466
2467                 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
2468                        lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
2469
2470                 goto out;
2471         }
2472
2473         /*
2474          * if object is going to be striped over OSTs, transfer default
2475          * striping information to the child, so that we can use it
2476          * during declaration and creation
2477          */
2478         if (!lod_object_will_be_striped(S_ISREG(child_mode),
2479                                         lu_object_fid(&child->do_lu)))
2480                 goto out;
2481         /*
2482          * try from the parent
2483          */
2484         if (likely(parent)) {
2485                 lod_cache_parent_striping(env, lp, child_mode);
2486
2487                 lc->ldo_def_stripe_offset = (__u16) -1;
2488
2489                 if (lp->ldo_def_striping_set) {
2490                         if (lp->ldo_pool)
2491                                 lod_object_set_pool(lc, lp->ldo_pool);
2492                         lc->ldo_stripenr = lp->ldo_def_stripenr;
2493                         lc->ldo_stripe_size = lp->ldo_def_stripe_size;
2494                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2495                         CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
2496                                lc->ldo_stripenr, lc->ldo_stripe_size,
2497                                lp->ldo_pool ? lp->ldo_pool : "");
2498                 }
2499         }
2500
2501         /*
2502          * if the parent doesn't provide with specific pattern, grab fs-wide one
2503          */
2504         desc = &d->lod_desc;
2505         if (lc->ldo_stripenr == 0)
2506                 lc->ldo_stripenr = desc->ld_default_stripe_count;
2507         if (lc->ldo_stripe_size == 0)
2508                 lc->ldo_stripe_size = desc->ld_default_stripe_size;
2509         CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
2510                lc->ldo_stripenr, lc->ldo_stripe_size,
2511                lc->ldo_pool ? lc->ldo_pool : "");
2512
2513 out:
2514         /* we do not cache stripe information for slave stripe, see
2515          * lod_xattr_set_lov_on_dir */
2516         if (lp != NULL && lp->ldo_dir_slave_stripe)
2517                 lod_lov_stripe_cache_clear(lp);
2518
2519         EXIT;
2520 }
2521
2522 #define ll_do_div64(aaa,bbb)    do_div((aaa), (bbb))
2523 /*
2524  * this function handles a special case when truncate was done
2525  * on a stripeless object and now striping is being created
2526  * we can't lose that size, so we have to propagate it to newly
2527  * created object
2528  */
2529 static int lod_declare_init_size(const struct lu_env *env,
2530                                  struct dt_object *dt, struct thandle *th)
2531 {
2532         struct dt_object   *next = dt_object_child(dt);
2533         struct lod_object  *lo = lod_dt_obj(dt);
2534         struct lu_attr     *attr = &lod_env_info(env)->lti_attr;
2535         uint64_t            size, offs;
2536         int                 rc, stripe;
2537         ENTRY;
2538
2539         /* XXX: we support the simplest (RAID0) striping so far */
2540         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
2541         LASSERT(lo->ldo_stripe_size > 0);
2542
2543         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
2544         LASSERT(attr->la_valid & LA_SIZE);
2545         if (rc)
2546                 RETURN(rc);
2547
2548         size = attr->la_size;
2549         if (size == 0)
2550                 RETURN(0);
2551
2552         /* ll_do_div64(a, b) returns a % b, and a = a / b */
2553         ll_do_div64(size, (__u64) lo->ldo_stripe_size);
2554         stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
2555
2556         size = size * lo->ldo_stripe_size;
2557         offs = attr->la_size;
2558         size += ll_do_div64(offs, lo->ldo_stripe_size);
2559
2560         attr->la_valid = LA_SIZE;
2561         attr->la_size = size;
2562
2563         rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
2564
2565         RETURN(rc);
2566 }
2567
2568 /**
2569  * Create declaration of striped object
2570  */
2571 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
2572                                struct lu_attr *attr,
2573                                const struct lu_buf *lovea, struct thandle *th)
2574 {
2575         struct lod_thread_info  *info = lod_env_info(env);
2576         struct dt_object        *next = dt_object_child(dt);
2577         struct lod_object       *lo = lod_dt_obj(dt);
2578         int                      rc;
2579         ENTRY;
2580
2581         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
2582                 /* failed to create striping, let's reset
2583                  * config so that others don't get confused */
2584                 lod_object_free_striping(env, lo);
2585                 GOTO(out, rc = -ENOMEM);
2586         }
2587
2588         if (!dt_object_remote(next)) {
2589                 /* choose OST and generate appropriate objects */
2590                 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
2591                 if (rc) {
2592                         /* failed to create striping, let's reset
2593                          * config so that others don't get confused */
2594                         lod_object_free_striping(env, lo);
2595                         GOTO(out, rc);
2596                 }
2597
2598                 /*
2599                  * declare storage for striping data
2600                  */
2601                 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
2602                                 lo->ldo_pool ?  LOV_MAGIC_V3 : LOV_MAGIC_V1);
2603         } else {
2604                 /* LOD can not choose OST objects for remote objects, i.e.
2605                  * stripes must be ready before that. Right now, it can only
2606                  * happen during migrate, i.e. migrate process needs to create
2607                  * remote regular file (mdd_migrate_create), then the migrate
2608                  * process will provide stripeEA. */
2609                 LASSERT(lovea != NULL);
2610                 info->lti_buf = *lovea;
2611         }
2612
2613         rc = dt_declare_xattr_set(env, next, &info->lti_buf,
2614                                   XATTR_NAME_LOV, 0, th);
2615         if (rc)
2616                 GOTO(out, rc);
2617
2618         /*
2619          * if striping is created with local object's size > 0,
2620          * we have to propagate this size to specific object
2621          * the case is possible only when local object was created previously
2622          */
2623         if (dt_object_exists(next))
2624                 rc = lod_declare_init_size(env, dt, th);
2625
2626 out:
2627         RETURN(rc);
2628 }
2629
2630 static int lod_declare_object_create(const struct lu_env *env,
2631                                      struct dt_object *dt,
2632                                      struct lu_attr *attr,
2633                                      struct dt_allocation_hint *hint,
2634                                      struct dt_object_format *dof,
2635                                      struct thandle *th)
2636 {
2637         struct dt_object   *next = dt_object_child(dt);
2638         struct lod_object  *lo = lod_dt_obj(dt);
2639         int                 rc;
2640         ENTRY;
2641
2642         LASSERT(dof);
2643         LASSERT(attr);
2644         LASSERT(th);
2645
2646         /*
2647          * first of all, we declare creation of local object
2648          */
2649         rc = dt_declare_create(env, next, attr, hint, dof, th);
2650         if (rc)
2651                 GOTO(out, rc);
2652
2653         if (dof->dof_type == DFT_SYM)
2654                 dt->do_body_ops = &lod_body_lnk_ops;
2655
2656         /*
2657          * it's lod_ah_init() who has decided the object will striped
2658          */
2659         if (dof->dof_type == DFT_REGULAR) {
2660                 /* callers don't want stripes */
2661                 /* XXX: all tricky interactions with ->ah_make_hint() decided
2662                  * to use striping, then ->declare_create() behaving differently
2663                  * should be cleaned */
2664                 if (dof->u.dof_reg.striped == 0)
2665                         lo->ldo_stripenr = 0;
2666                 if (lo->ldo_stripenr > 0)
2667                         rc = lod_declare_striped_object(env, dt, attr,
2668                                                         NULL, th);
2669         } else if (dof->dof_type == DFT_DIR) {
2670                 /* Orphan object (like migrating object) does not have
2671                  * lod_dir_stripe, see lod_ah_init */
2672                 if (lo->ldo_dir_stripe != NULL)
2673                         rc = lod_declare_dir_striping_create(env, dt, attr,
2674                                                              dof, th);
2675         }
2676 out:
2677         RETURN(rc);
2678 }
2679
2680 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
2681                         struct lu_attr *attr, struct dt_object_format *dof,
2682                         struct thandle *th)
2683 {
2684         struct lod_object *lo = lod_dt_obj(dt);
2685         int                rc = 0, i;
2686         ENTRY;
2687
2688         LASSERT(lo->ldo_striping_cached == 0);
2689
2690         /* create all underlying objects */
2691         for (i = 0; i < lo->ldo_stripenr; i++) {
2692                 LASSERT(lo->ldo_stripe[i]);
2693                 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
2694
2695                 if (rc)
2696                         break;
2697         }
2698         if (rc == 0)
2699                 rc = lod_generate_and_set_lovea(env, lo, th);
2700
2701         RETURN(rc);
2702 }
2703
2704 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
2705                              struct lu_attr *attr,
2706                              struct dt_allocation_hint *hint,
2707                              struct dt_object_format *dof, struct thandle *th)
2708 {
2709         struct dt_object   *next = dt_object_child(dt);
2710         struct lod_object  *lo = lod_dt_obj(dt);
2711         int                 rc;
2712         ENTRY;
2713
2714         /* create local object */
2715         rc = dt_create(env, next, attr, hint, dof, th);
2716         if (rc != 0)
2717                 RETURN(rc);
2718
2719         if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2720             lo->ldo_stripe && dof->u.dof_reg.striped != 0)
2721                 rc = lod_striping_create(env, dt, attr, dof, th);
2722
2723         RETURN(rc);
2724 }
2725
2726 static int lod_declare_object_destroy(const struct lu_env *env,
2727                                       struct dt_object *dt,
2728                                       struct thandle *th)
2729 {
2730         struct dt_object   *next = dt_object_child(dt);
2731         struct lod_object  *lo = lod_dt_obj(dt);
2732         struct lod_thread_info *info = lod_env_info(env);
2733         char               *stripe_name = info->lti_key;
2734         int                 rc, i;
2735         ENTRY;
2736
2737         /*
2738          * load striping information, notice we don't do this when object
2739          * is being initialized as we don't need this information till
2740          * few specific cases like destroy, chown
2741          */
2742         rc = lod_load_striping(env, lo);
2743         if (rc)
2744                 RETURN(rc);
2745
2746         /* declare destroy for all underlying objects */
2747         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2748                 rc = next->do_ops->do_index_try(env, next,
2749                                                 &dt_directory_features);
2750                 if (rc != 0)
2751                         RETURN(rc);
2752
2753                 for (i = 0; i < lo->ldo_stripenr; i++) {
2754                         rc = dt_declare_ref_del(env, next, th);
2755                         if (rc != 0)
2756                                 RETURN(rc);
2757                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2758                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2759                                 i);
2760                         rc = dt_declare_delete(env, next,
2761                                         (const struct dt_key *)stripe_name, th);
2762                         if (rc != 0)
2763                                 RETURN(rc);
2764                 }
2765         }
2766         /*
2767          * we declare destroy for the local object
2768          */
2769         rc = dt_declare_destroy(env, next, th);
2770         if (rc)
2771                 RETURN(rc);
2772
2773         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2774                 RETURN(0);
2775
2776         /* declare destroy all striped objects */
2777         for (i = 0; i < lo->ldo_stripenr; i++) {
2778                 if (likely(lo->ldo_stripe[i] != NULL)) {
2779                         rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
2780                         if (rc != 0)
2781                                 break;
2782                 }
2783         }
2784
2785         RETURN(rc);
2786 }
2787
2788 static int lod_object_destroy(const struct lu_env *env,
2789                 struct dt_object *dt, struct thandle *th)
2790 {
2791         struct dt_object  *next = dt_object_child(dt);
2792         struct lod_object *lo = lod_dt_obj(dt);
2793         struct lod_thread_info *info = lod_env_info(env);
2794         char               *stripe_name = info->lti_key;
2795         int                rc, i;
2796         ENTRY;
2797
2798         /* destroy sub-stripe of master object */
2799         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2800                 rc = next->do_ops->do_index_try(env, next,
2801                                                 &dt_directory_features);
2802                 if (rc != 0)
2803                         RETURN(rc);
2804
2805                 for (i = 0; i < lo->ldo_stripenr; i++) {
2806                         rc = dt_ref_del(env, next, th);
2807                         if (rc != 0)
2808                                 RETURN(rc);
2809
2810                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2811                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2812                                 i);
2813
2814                         CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
2815                                PFID(lu_object_fid(&dt->do_lu)), stripe_name,
2816                                PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
2817
2818                         rc = dt_delete(env, next,
2819                                        (const struct dt_key *)stripe_name,
2820                                        th, BYPASS_CAPA);
2821                         if (rc != 0)
2822                                 RETURN(rc);
2823                 }
2824         }
2825         rc = dt_destroy(env, next, th);
2826         if (rc != 0)
2827                 RETURN(rc);
2828
2829         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2830                 RETURN(0);
2831
2832         /* destroy all striped objects */
2833         for (i = 0; i < lo->ldo_stripenr; i++) {
2834                 if (likely(lo->ldo_stripe[i] != NULL) &&
2835                     (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
2836                      i == cfs_fail_val)) {
2837                         rc = dt_destroy(env, lo->ldo_stripe[i], th);
2838                         if (rc != 0)
2839                                 break;
2840                 }
2841         }
2842
2843         RETURN(rc);
2844 }
2845
2846 static int lod_declare_ref_add(const struct lu_env *env,
2847                                struct dt_object *dt, struct thandle *th)
2848 {
2849         return dt_declare_ref_add(env, dt_object_child(dt), th);
2850 }
2851
2852 static int lod_ref_add(const struct lu_env *env,
2853                        struct dt_object *dt, struct thandle *th)
2854 {
2855         return dt_ref_add(env, dt_object_child(dt), th);
2856 }
2857
2858 static int lod_declare_ref_del(const struct lu_env *env,
2859                                struct dt_object *dt, struct thandle *th)
2860 {
2861         return dt_declare_ref_del(env, dt_object_child(dt), th);
2862 }
2863
2864 static int lod_ref_del(const struct lu_env *env,
2865                        struct dt_object *dt, struct thandle *th)
2866 {
2867         return dt_ref_del(env, dt_object_child(dt), th);
2868 }
2869
2870 static struct obd_capa *lod_capa_get(const struct lu_env *env,
2871                                      struct dt_object *dt,
2872                                      struct lustre_capa *old, __u64 opc)
2873 {
2874         return dt_capa_get(env, dt_object_child(dt), old, opc);
2875 }
2876
2877 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
2878                            __u64 start, __u64 end)
2879 {
2880         return dt_object_sync(env, dt_object_child(dt), start, end);
2881 }
2882
2883 struct lod_slave_locks  {
2884         int                     lsl_lock_count;
2885         struct lustre_handle    lsl_handle[0];
2886 };
2887
2888 static int lod_object_unlock_internal(const struct lu_env *env,
2889                                       struct dt_object *dt,
2890                                       struct ldlm_enqueue_info *einfo,
2891                                       ldlm_policy_data_t *policy)
2892 {
2893         struct lod_object       *lo = lod_dt_obj(dt);
2894         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
2895         int                     rc = 0;
2896         int                     i;
2897         ENTRY;
2898
2899         if (slave_locks == NULL)
2900                 RETURN(0);
2901
2902         for (i = 1; i < slave_locks->lsl_lock_count; i++) {
2903                 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
2904                         int     rc1;
2905
2906                         einfo->ei_cbdata = &slave_locks->lsl_handle[i];
2907                         rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
2908                                                policy);
2909                         if (rc1 < 0)
2910                                 rc = rc == 0 ? rc1 : rc;
2911                 }
2912         }
2913
2914         RETURN(rc);
2915 }
2916
2917 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
2918                              struct ldlm_enqueue_info *einfo,
2919                              union ldlm_policy_data *policy)
2920 {
2921         struct lod_object       *lo = lod_dt_obj(dt);
2922         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
2923         int                     slave_locks_size;
2924         int                     rc;
2925         ENTRY;
2926
2927         if (slave_locks == NULL)
2928                 RETURN(0);
2929
2930         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2931                 RETURN(-ENOTDIR);
2932
2933         rc = lod_load_striping(env, lo);
2934         if (rc != 0)
2935                 RETURN(rc);
2936
2937         /* Note: for remote lock for single stripe dir, MDT will cancel
2938          * the lock by lockh directly */
2939         if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
2940                 RETURN(0);
2941
2942         /* Only cancel slave lock for striped dir */
2943         rc = lod_object_unlock_internal(env, dt, einfo, policy);
2944
2945         slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
2946                            sizeof(slave_locks->lsl_handle[0]);
2947         OBD_FREE(slave_locks, slave_locks_size);
2948         einfo->ei_cbdata = NULL;
2949
2950         RETURN(rc);
2951 }
2952
2953 static int lod_object_lock(const struct lu_env *env,
2954                            struct dt_object *dt,
2955                            struct lustre_handle *lh,
2956                            struct ldlm_enqueue_info *einfo,
2957                            union ldlm_policy_data *policy)
2958 {
2959         struct lod_object       *lo = lod_dt_obj(dt);
2960         int                     rc = 0;
2961         int                     i;
2962         int                     slave_locks_size;
2963         struct lod_slave_locks  *slave_locks = NULL;
2964         ENTRY;
2965
2966         /* remote object lock */
2967         if (!einfo->ei_enq_slave) {
2968                 LASSERT(dt_object_remote(dt));
2969                 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
2970                                       policy);
2971         }
2972
2973         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2974                 RETURN(-ENOTDIR);
2975
2976         rc = lod_load_striping(env, lo);
2977         if (rc != 0)
2978                 RETURN(rc);
2979
2980         /* No stripes */
2981         if (lo->ldo_stripenr <= 1)
2982                 RETURN(0);
2983
2984         slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
2985                            sizeof(slave_locks->lsl_handle[0]);
2986         /* Freed in lod_object_unlock */
2987         OBD_ALLOC(slave_locks, slave_locks_size);
2988         if (slave_locks == NULL)
2989                 RETURN(-ENOMEM);
2990         slave_locks->lsl_lock_count = lo->ldo_stripenr;
2991
2992         /* striped directory lock */
2993         for (i = 1; i < lo->ldo_stripenr; i++) {
2994                 struct lustre_handle    lockh;
2995                 struct ldlm_res_id      *res_id;
2996
2997                 res_id = &lod_env_info(env)->lti_res_id;
2998                 fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
2999                                        res_id);
3000                 einfo->ei_res_id = res_id;
3001
3002                 LASSERT(lo->ldo_stripe[i]);
3003                 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
3004                                     policy);
3005                 if (rc != 0)
3006                         GOTO(out, rc);
3007                 slave_locks->lsl_handle[i] = lockh;
3008         }
3009
3010         einfo->ei_cbdata = slave_locks;
3011
3012 out:
3013         if (rc != 0 && slave_locks != NULL) {
3014                 einfo->ei_cbdata = slave_locks;
3015                 lod_object_unlock_internal(env, dt, einfo, policy);
3016                 OBD_FREE(slave_locks, slave_locks_size);
3017                 einfo->ei_cbdata = NULL;
3018         }
3019
3020         RETURN(rc);
3021 }
3022
3023 struct dt_object_operations lod_obj_ops = {
3024         .do_read_lock           = lod_object_read_lock,
3025         .do_write_lock          = lod_object_write_lock,
3026         .do_read_unlock         = lod_object_read_unlock,
3027         .do_write_unlock        = lod_object_write_unlock,
3028         .do_write_locked        = lod_object_write_locked,
3029         .do_attr_get            = lod_attr_get,
3030         .do_declare_attr_set    = lod_declare_attr_set,
3031         .do_attr_set            = lod_attr_set,
3032         .do_xattr_get           = lod_xattr_get,
3033         .do_declare_xattr_set   = lod_declare_xattr_set,
3034         .do_xattr_set           = lod_xattr_set,
3035         .do_declare_xattr_del   = lod_declare_xattr_del,
3036         .do_xattr_del           = lod_xattr_del,
3037         .do_xattr_list          = lod_xattr_list,
3038         .do_ah_init             = lod_ah_init,
3039         .do_declare_create      = lod_declare_object_create,
3040         .do_create              = lod_object_create,
3041         .do_declare_destroy     = lod_declare_object_destroy,
3042         .do_destroy             = lod_object_destroy,
3043         .do_index_try           = lod_index_try,
3044         .do_declare_ref_add     = lod_declare_ref_add,
3045         .do_ref_add             = lod_ref_add,
3046         .do_declare_ref_del     = lod_declare_ref_del,
3047         .do_ref_del             = lod_ref_del,
3048         .do_capa_get            = lod_capa_get,
3049         .do_object_sync         = lod_object_sync,
3050         .do_object_lock         = lod_object_lock,
3051         .do_object_unlock       = lod_object_unlock,
3052 };
3053
3054 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
3055                         struct lu_buf *buf, loff_t *pos,
3056                         struct lustre_capa *capa)
3057 {
3058         struct dt_object *next = dt_object_child(dt);
3059         return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
3060 }
3061
3062 static ssize_t lod_declare_write(const struct lu_env *env,
3063                                  struct dt_object *dt,
3064                                  const struct lu_buf *buf, loff_t pos,
3065                                  struct thandle *th)
3066 {
3067         return dt_declare_record_write(env, dt_object_child(dt),
3068                                        buf, pos, th);
3069 }
3070
3071 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
3072                          const struct lu_buf *buf, loff_t *pos,
3073                          struct thandle *th, struct lustre_capa *capa, int iq)
3074 {
3075         struct dt_object *next = dt_object_child(dt);
3076         LASSERT(next);
3077         return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
3078 }
3079
3080 static const struct dt_body_operations lod_body_lnk_ops = {
3081         .dbo_read               = lod_read,
3082         .dbo_declare_write      = lod_declare_write,
3083         .dbo_write              = lod_write
3084 };
3085
3086 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
3087                            const struct lu_object_conf *conf)
3088 {
3089         struct lod_device       *lod    = lu2lod_dev(lo->lo_dev);
3090         struct lu_device        *cdev   = NULL;
3091         struct lu_object        *cobj;
3092         struct lod_tgt_descs    *ltd    = NULL;
3093         struct lod_tgt_desc     *tgt;
3094         mdsno_t                  idx    = 0;
3095         int                      type   = LU_SEQ_RANGE_ANY;
3096         int                      rc;
3097         ENTRY;
3098
3099         rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
3100         if (rc != 0)
3101                 RETURN(rc);
3102
3103         if (type == LU_SEQ_RANGE_MDT &&
3104             idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
3105                 cdev = &lod->lod_child->dd_lu_dev;
3106         } else if (type == LU_SEQ_RANGE_MDT) {
3107                 ltd = &lod->lod_mdt_descs;
3108                 lod_getref(ltd);
3109         } else if (type == LU_SEQ_RANGE_OST) {
3110                 ltd = &lod->lod_ost_descs;
3111                 lod_getref(ltd);
3112         } else {
3113                 LBUG();
3114         }
3115
3116         if (ltd != NULL) {
3117                 if (ltd->ltd_tgts_size > idx &&
3118                     cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
3119                         tgt = LTD_TGT(ltd, idx);
3120
3121                         LASSERT(tgt != NULL);
3122                         LASSERT(tgt->ltd_tgt != NULL);
3123
3124                         cdev = &(tgt->ltd_tgt->dd_lu_dev);
3125                 }
3126                 lod_putref(lod, ltd);
3127         }
3128
3129         if (unlikely(cdev == NULL))
3130                 RETURN(-ENOENT);
3131
3132         cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
3133         if (unlikely(cobj == NULL))
3134                 RETURN(-ENOMEM);
3135
3136         lu_object_add(lo, cobj);
3137
3138         RETURN(0);
3139 }
3140
3141 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
3142 {
3143         int i;
3144
3145         if (lo->ldo_dir_stripe != NULL) {
3146                 OBD_FREE_PTR(lo->ldo_dir_stripe);
3147                 lo->ldo_dir_stripe = NULL;
3148         }
3149
3150         if (lo->ldo_stripe) {
3151                 LASSERT(lo->ldo_stripes_allocated > 0);
3152
3153                 for (i = 0; i < lo->ldo_stripenr; i++) {
3154                         if (lo->ldo_stripe[i])
3155                                 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
3156                 }
3157
3158                 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
3159                 OBD_FREE(lo->ldo_stripe, i);
3160                 lo->ldo_stripe = NULL;
3161                 lo->ldo_stripes_allocated = 0;
3162         }
3163         lo->ldo_stripenr = 0;
3164         lo->ldo_pattern = 0;
3165 }
3166
3167 /*
3168  * ->start is called once all slices are initialized, including header's
3169  * cache for mode (object type). using the type we can initialize ops
3170  */
3171 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
3172 {
3173         if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
3174                 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
3175         return 0;
3176 }
3177
3178 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
3179 {
3180         struct lod_object *mo = lu2lod_obj(o);
3181
3182         /*
3183          * release all underlying object pinned
3184          */
3185
3186         lod_object_free_striping(env, mo);
3187
3188         lod_object_set_pool(mo, NULL);
3189
3190         lu_object_fini(o);
3191         OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
3192 }
3193
3194 static void lod_object_release(const struct lu_env *env, struct lu_object *o)
3195 {
3196         /* XXX: shouldn't we release everything here in case if object
3197          * creation failed before? */
3198 }
3199
3200 static int lod_object_print(const struct lu_env *env, void *cookie,
3201                             lu_printer_t p, const struct lu_object *l)
3202 {
3203         struct lod_object *o = lu2lod_obj((struct lu_object *) l);
3204
3205         return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
3206 }
3207
3208 struct lu_object_operations lod_lu_obj_ops = {
3209         .loo_object_init        = lod_object_init,
3210         .loo_object_start       = lod_object_start,
3211         .loo_object_free        = lod_object_free,
3212         .loo_object_release     = lod_object_release,
3213         .loo_object_print       = lod_object_print,
3214 };