Whamcloud - gitweb
25cc60d84a2c06fcf7271a7befaa65c171884d5d
[fs/lustre-release.git] / lustre / lod / lod_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2013, Intel Corporation.
27  */
28 /*
29  * lustre/lod/lod_object.c
30  *
31  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
32  */
33
34 #define DEBUG_SUBSYSTEM S_MDS
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
41
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46 #include <md_object.h>
47 #include <lustre_linkea.h>
48
49 #include "lod_internal.h"
50
51 static const char dot[] = ".";
52 static const char dotdot[] = "..";
53
54 extern struct kmem_cache *lod_object_kmem;
55 static const struct dt_body_operations lod_body_lnk_ops;
56
57 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
58                             struct dt_rec *rec, const struct dt_key *key,
59                             struct lustre_capa *capa)
60 {
61         struct dt_object *next = dt_object_child(dt);
62         return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
63 }
64
/**
 * Implementation of dt_index_operations::dio_declare_insert
 *
 * Declare the insertion on the object of the next lower layer.
 *
 * \param[in] env       execution environment.
 * \param[in] dt        index object the record will be inserted into.
 * \param[in] rec       record to be inserted.
 * \param[in] key       key of the record.
 * \param[in] handle    transaction handle.
 *
 * \retval      the value returned by dt_declare_insert() for the child.
 */
static int lod_declare_index_insert(const struct lu_env *env,
                                    struct dt_object *dt,
                                    const struct dt_rec *rec,
                                    const struct dt_key *key,
                                    struct thandle *handle)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_declare_insert(env, child, rec, key, handle);
}
73
/**
 * Implementation of dt_index_operations::dio_insert
 *
 * Insert the record through the object of the next lower layer.
 *
 * \param[in] env       execution environment.
 * \param[in] dt        index object the record is inserted into.
 * \param[in] rec       record to be inserted.
 * \param[in] key       key of the record.
 * \param[in] th        transaction handle.
 * \param[in] capa      capability, passed through unchanged.
 * \param[in] ign       passed through unchanged to dt_insert().
 *
 * \retval      the value returned by dt_insert() for the child.
 */
static int lod_index_insert(const struct lu_env *env,
                            struct dt_object *dt,
                            const struct dt_rec *rec,
                            const struct dt_key *key,
                            struct thandle *th,
                            struct lustre_capa *capa,
                            int ign)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_insert(env, child, rec, key, th, capa, ign);
}
84
/**
 * Implementation of dt_index_operations::dio_declare_delete
 *
 * Declare the deletion on the object of the next lower layer.
 *
 * \param[in] env       execution environment.
 * \param[in] dt        index object the key will be removed from.
 * \param[in] key       key to be removed.
 * \param[in] th        transaction handle.
 *
 * \retval      the value returned by dt_declare_delete() for the child.
 */
static int lod_declare_index_delete(const struct lu_env *env,
                                    struct dt_object *dt,
                                    const struct dt_key *key,
                                    struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_declare_delete(env, child, key, th);
}
92
/**
 * Implementation of dt_index_operations::dio_delete
 *
 * Remove the key through the object of the next lower layer.
 *
 * \param[in] env       execution environment.
 * \param[in] dt        index object the key is removed from.
 * \param[in] key       key to be removed.
 * \param[in] th        transaction handle.
 * \param[in] capa      capability, passed through unchanged.
 *
 * \retval      the value returned by dt_delete() for the child.
 */
static int lod_index_delete(const struct lu_env *env,
                            struct dt_object *dt,
                            const struct dt_key *key,
                            struct thandle *th,
                            struct lustre_capa *capa)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_delete(env, child, key, th, capa);
}
101
102 static struct dt_it *lod_it_init(const struct lu_env *env,
103                                  struct dt_object *dt, __u32 attr,
104                                  struct lustre_capa *capa)
105 {
106         struct dt_object        *next = dt_object_child(dt);
107         struct lod_it           *it = &lod_env_info(env)->lti_it;
108         struct dt_it            *it_next;
109
110
111         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
112         if (IS_ERR(it_next))
113                 return it_next;
114
115         /* currently we do not use more than one iterator per thread
116          * so we store it in thread info. if at some point we need
117          * more active iterators in a single thread, we can allocate
118          * additional ones */
119         LASSERT(it->lit_obj == NULL);
120
121         it->lit_it = it_next;
122         it->lit_obj = next;
123
124         return (struct dt_it *)it;
125 }
126
/*
 * Sanity check for a non-striped lod iterator: it must currently reference
 * both an object and a lower-layer iterator. The env argument is not used
 * by the expansion.
 */
#define LOD_CHECK_IT(env, it)                                   \
do {                                                            \
        LASSERT((it)->lit_obj != NULL);                         \
        LASSERT((it)->lit_it != NULL);                          \
} while (0)
132
133 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
134 {
135         struct lod_it *it = (struct lod_it *)di;
136
137         LOD_CHECK_IT(env, it);
138         it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
139
140         /* the iterator not in use any more */
141         it->lit_obj = NULL;
142         it->lit_it = NULL;
143 }
144
145 int lod_it_get(const struct lu_env *env, struct dt_it *di,
146                const struct dt_key *key)
147 {
148         const struct lod_it *it = (const struct lod_it *)di;
149
150         LOD_CHECK_IT(env, it);
151         return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
152 }
153
154 void lod_it_put(const struct lu_env *env, struct dt_it *di)
155 {
156         struct lod_it *it = (struct lod_it *)di;
157
158         LOD_CHECK_IT(env, it);
159         return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
160 }
161
162 int lod_it_next(const struct lu_env *env, struct dt_it *di)
163 {
164         struct lod_it *it = (struct lod_it *)di;
165
166         LOD_CHECK_IT(env, it);
167         return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
168 }
169
170 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
171 {
172         const struct lod_it *it = (const struct lod_it *)di;
173
174         LOD_CHECK_IT(env, it);
175         return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
176 }
177
178 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
179 {
180         struct lod_it *it = (struct lod_it *)di;
181
182         LOD_CHECK_IT(env, it);
183         return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
184 }
185
186 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
187                struct dt_rec *rec, __u32 attr)
188 {
189         const struct lod_it *it = (const struct lod_it *)di;
190
191         LOD_CHECK_IT(env, it);
192         return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec,
193                                                      attr);
194 }
195
196 int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di,
197                     __u32 attr)
198 {
199         const struct lod_it *it = (const struct lod_it *)di;
200
201         LOD_CHECK_IT(env, it);
202         return it->lit_obj->do_index_ops->dio_it.rec_size(env, it->lit_it,
203                                                           attr);
204 }
205
206 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
207 {
208         const struct lod_it *it = (const struct lod_it *)di;
209
210         LOD_CHECK_IT(env, it);
211         return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
212 }
213
214 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
215 {
216         const struct lod_it *it = (const struct lod_it *)di;
217
218         LOD_CHECK_IT(env, it);
219         return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
220 }
221
222 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
223                    void *key_rec)
224 {
225         const struct lod_it *it = (const struct lod_it *)di;
226
227         LOD_CHECK_IT(env, it);
228         return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it,
229                                                          key_rec);
230 }
231
/*
 * Index operations installed by lod_index_try() on objects that have no
 * stripes: every method forwards to the object of the next lower layer.
 */
static struct dt_index_operations lod_index_ops = {
        .dio_lookup             = lod_index_lookup,
        .dio_declare_insert     = lod_declare_index_insert,
        .dio_insert             = lod_index_insert,
        .dio_declare_delete     = lod_declare_index_delete,
        .dio_delete             = lod_index_delete,
        .dio_it = {
                .init           = lod_it_init,
                .fini           = lod_it_fini,
                .get            = lod_it_get,
                .put            = lod_it_put,
                .next           = lod_it_next,
                .key            = lod_it_key,
                .key_size       = lod_it_key_size,
                .rec            = lod_it_rec,
                .rec_size       = lod_it_rec_size,
                .store          = lod_it_store,
                .load           = lod_it_load,
                .key_rec        = lod_it_key_rec,
        }
};
253
254 /**
255  * Implementation of dt_index_operations:: dio_it.init
256  *
257  * This function is to initialize the iterator for striped directory,
258  * basically these lod_striped_it_xxx will just locate the stripe
259  * and call the correspondent api of its next lower layer.
260  *
261  * \param[in] env       execution environment.
262  * \param[in] dt        the striped directory object to be iterated.
263  * \param[in] attr      the attribute of iterator, mostly used to indicate
264  *                      the entry attribute in the object to be iterated.
265  * \param[in] capa      capability(useless in current implementation)
266  *
267  * \retval      initialized iterator(dt_it) if successful initialize the
268  *              iteration. lit_stripe_index will be used to indicate the
269  *              current iterate position among stripes.
270  * \retval      ERR pointer if initialization is failed.
271  */
272 static struct dt_it *lod_striped_it_init(const struct lu_env *env,
273                                          struct dt_object *dt, __u32 attr,
274                                          struct lustre_capa *capa)
275 {
276         struct lod_object       *lo = lod_dt_obj(dt);
277         struct dt_object        *next;
278         struct lod_it           *it = &lod_env_info(env)->lti_it;
279         struct dt_it            *it_next;
280         ENTRY;
281
282         LASSERT(lo->ldo_stripenr > 0);
283         next = lo->ldo_stripe[0];
284         LASSERT(next != NULL);
285         LASSERT(next->do_index_ops != NULL);
286
287         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
288         if (IS_ERR(it_next))
289                 return it_next;
290
291         /* currently we do not use more than one iterator per thread
292          * so we store it in thread info. if at some point we need
293          * more active iterators in a single thread, we can allocate
294          * additional ones */
295         LASSERT(it->lit_obj == NULL);
296
297         it->lit_stripe_index = 0;
298         it->lit_attr = attr;
299         it->lit_it = it_next;
300         it->lit_obj = dt;
301
302         return (struct dt_it *)it;
303 }
304
/*
 * Sanity check for a striped-directory iterator: it must reference an
 * object and a lower-layer iterator, the lod object must have at least one
 * stripe, and the current stripe index must be within the stripe count.
 * The env argument is not used by the expansion.
 */
#define LOD_CHECK_STRIPED_IT(env, it, lo)                       \
do {                                                            \
        LASSERT((it)->lit_obj != NULL);                         \
        LASSERT((it)->lit_it != NULL);                          \
        LASSERT((lo)->ldo_stripenr > 0);                        \
        LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr);   \
} while (0)
312
313 /**
314  * Implementation of dt_index_operations:: dio_it.fini
315  *
316  * This function is to finish the iterator for striped directory.
317  *
318  * \param[in] env       execution environment.
319  * \param[in] di        the iterator for the striped directory
320  *
321  */
322 static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di)
323 {
324         struct lod_it           *it = (struct lod_it *)di;
325         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
326         struct dt_object        *next;
327
328         LOD_CHECK_STRIPED_IT(env, it, lo);
329
330         next = lo->ldo_stripe[it->lit_stripe_index];
331         LASSERT(next != NULL);
332         LASSERT(next->do_index_ops != NULL);
333
334         next->do_index_ops->dio_it.fini(env, it->lit_it);
335
336         /* the iterator not in use any more */
337         it->lit_obj = NULL;
338         it->lit_it = NULL;
339         it->lit_stripe_index = 0;
340 }
341
342 /**
343  * Implementation of dt_index_operations:: dio_it.get
344  *
345  * This function is to position the iterator with given key
346  *
347  * \param[in] env       execution environment.
348  * \param[in] di        the iterator for striped directory.
349  * \param[in] key       the key the iterator will be positioned.
350  *
351  * \retval      0 if successfully position iterator by the key.
352  * \retval      negative error if position is failed.
353  */
354 static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di,
355                               const struct dt_key *key)
356 {
357         const struct lod_it     *it = (const struct lod_it *)di;
358         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
359         struct dt_object        *next;
360         ENTRY;
361
362         LOD_CHECK_STRIPED_IT(env, it, lo);
363
364         next = lo->ldo_stripe[it->lit_stripe_index];
365         LASSERT(next != NULL);
366         LASSERT(next->do_index_ops != NULL);
367
368         return next->do_index_ops->dio_it.get(env, it->lit_it, key);
369 }
370
371 /**
372  * Implementation of dt_index_operations:: dio_it.put
373  *
374  * This function is supposed to be the pair of it_get, but currently do
375  * nothing. see (osd_it_ea_put or osd_index_it_put)
376  */
377 static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di)
378 {
379         struct lod_it           *it = (struct lod_it *)di;
380         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
381         struct dt_object        *next;
382
383         LOD_CHECK_STRIPED_IT(env, it, lo);
384
385         next = lo->ldo_stripe[it->lit_stripe_index];
386         LASSERT(next != NULL);
387         LASSERT(next->do_index_ops != NULL);
388
389         return next->do_index_ops->dio_it.put(env, it->lit_it);
390 }
391
392 /**
393  * Implementation of dt_index_operations:: dio_it.next
394  *
395  * This function is to position the iterator to the next entry, if current
396  * stripe is finished by checking the return value of next() in current
397  * stripe. it will go to next stripe. In the mean time, the sub-iterator
398  * for next stripe needs to be initialized.
399  *
400  * \param[in] env       execution environment.
401  * \param[in] di        the iterator for striped directory.
402  *
403  * \retval      0 if successfully position iterator to the next entry.
404  * \retval      negative error if position is failed.
405  */
406 static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di)
407 {
408         struct lod_it           *it = (struct lod_it *)di;
409         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
410         struct dt_object        *next;
411         struct dt_it            *it_next;
412         int                     rc;
413         ENTRY;
414
415         LOD_CHECK_STRIPED_IT(env, it, lo);
416
417         next = lo->ldo_stripe[it->lit_stripe_index];
418         LASSERT(next != NULL);
419         LASSERT(next->do_index_ops != NULL);
420 again:
421         rc = next->do_index_ops->dio_it.next(env, it->lit_it);
422         if (rc < 0)
423                 RETURN(rc);
424
425         if (rc == 0 && it->lit_stripe_index == 0)
426                 RETURN(rc);
427
428         if (rc == 0 && it->lit_stripe_index > 0) {
429                 struct lu_dirent *ent;
430
431                 ent = (struct lu_dirent *)lod_env_info(env)->lti_key;
432
433                 rc = next->do_index_ops->dio_it.rec(env, it->lit_it,
434                                                     (struct dt_rec *)ent,
435                                                     it->lit_attr);
436                 if (rc != 0)
437                         RETURN(rc);
438
439                 /* skip . and .. for slave stripe */
440                 if ((strncmp(ent->lde_name, ".",
441                              le16_to_cpu(ent->lde_namelen)) == 0 &&
442                      le16_to_cpu(ent->lde_namelen) == 1) ||
443                     (strncmp(ent->lde_name, "..",
444                              le16_to_cpu(ent->lde_namelen)) == 0 &&
445                      le16_to_cpu(ent->lde_namelen) == 2))
446                         goto again;
447
448                 RETURN(rc);
449         }
450
451         /* go to next stripe */
452         if (it->lit_stripe_index + 1 >= lo->ldo_stripenr)
453                 RETURN(1);
454
455         it->lit_stripe_index++;
456
457         next->do_index_ops->dio_it.put(env, it->lit_it);
458         next->do_index_ops->dio_it.fini(env, it->lit_it);
459
460         rc = next->do_ops->do_index_try(env, next, &dt_directory_features);
461         if (rc != 0)
462                 RETURN(rc);
463
464         next = lo->ldo_stripe[it->lit_stripe_index];
465         LASSERT(next != NULL);
466         LASSERT(next->do_index_ops != NULL);
467
468         it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr,
469                                                   BYPASS_CAPA);
470         if (!IS_ERR(it_next)) {
471                 it->lit_it = it_next;
472                 goto again;
473         } else {
474                 rc = PTR_ERR(it_next);
475         }
476
477         RETURN(rc);
478 }
479
480 /**
481  * Implementation of dt_index_operations:: dio_it.key
482  *
483  * This function is to get the key of the iterator at current position.
484  *
485  * \param[in] env       execution environment.
486  * \param[in] di        the iterator for striped directory.
487  *
488  * \retval      key(dt_key) if successfully get the key.
489  * \retval      negative error if can not get the key.
490  */
491 static struct dt_key *lod_striped_it_key(const struct lu_env *env,
492                                          const struct dt_it *di)
493 {
494         const struct lod_it     *it = (const struct lod_it *)di;
495         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
496         struct dt_object        *next;
497
498         LOD_CHECK_STRIPED_IT(env, it, lo);
499
500         next = lo->ldo_stripe[it->lit_stripe_index];
501         LASSERT(next != NULL);
502         LASSERT(next->do_index_ops != NULL);
503
504         return next->do_index_ops->dio_it.key(env, it->lit_it);
505 }
506
507 /**
508  * Implementation of dt_index_operations:: dio_it.key_size
509  *
510  * This function is to get the key_size of current key.
511  *
512  * \param[in] env       execution environment.
513  * \param[in] di        the iterator for striped directory.
514  *
515  * \retval      key_size if successfully get the key_size.
516  * \retval      negative error if can not get the key_size.
517  */
518 static int lod_striped_it_key_size(const struct lu_env *env,
519                                    const struct dt_it *di)
520 {
521         struct lod_it           *it = (struct lod_it *)di;
522         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
523         struct dt_object        *next;
524
525         LOD_CHECK_STRIPED_IT(env, it, lo);
526
527         next = lo->ldo_stripe[it->lit_stripe_index];
528         LASSERT(next != NULL);
529         LASSERT(next->do_index_ops != NULL);
530
531         return next->do_index_ops->dio_it.key_size(env, it->lit_it);
532 }
533
534 /**
535  * Implementation of dt_index_operations:: dio_it.rec
536  *
537  * This function is to get the record at current position.
538  *
539  * \param[in] env       execution environment.
540  * \param[in] di        the iterator for striped directory.
541  * \param[in] attr      the attribute of iterator, mostly used to indicate
542  *                      the entry attribute in the object to be iterated.
543  * \param[out] rec      hold the return record.
544  *
545  * \retval      0 if successfully get the entry.
546  * \retval      negative error if can not get entry.
547  */
548 static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di,
549                               struct dt_rec *rec, __u32 attr)
550 {
551         const struct lod_it     *it = (const struct lod_it *)di;
552         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
553         struct dt_object        *next;
554
555         LOD_CHECK_STRIPED_IT(env, it, lo);
556
557         next = lo->ldo_stripe[it->lit_stripe_index];
558         LASSERT(next != NULL);
559         LASSERT(next->do_index_ops != NULL);
560
561         return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
562 }
563
564 /**
565  * Implementation of dt_index_operations:: dio_it.rec_size
566  *
567  * This function is to get the record_size at current record.
568  *
569  * \param[in] env       execution environment.
570  * \param[in] di        the iterator for striped directory.
571  * \param[in] attr      the attribute of iterator, mostly used to indicate
572  *                      the entry attribute in the object to be iterated.
573  *
574  * \retval      rec_size if successfully get the entry size.
575  * \retval      negative error if can not get entry size.
576  */
577 static int lod_striped_it_rec_size(const struct lu_env *env,
578                                    const struct dt_it *di, __u32 attr)
579 {
580         struct lod_it           *it = (struct lod_it *)di;
581         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
582         struct dt_object        *next;
583
584         LOD_CHECK_STRIPED_IT(env, it, lo);
585
586         next = lo->ldo_stripe[it->lit_stripe_index];
587         LASSERT(next != NULL);
588         LASSERT(next->do_index_ops != NULL);
589
590         return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr);
591 }
592
593 /**
594  * Implementation of dt_index_operations:: dio_it.store
595  *
596  * This function will a cookie for current position of the iterator head,
597  * so that user can use this cookie to load/start the iterator next time.
598  *
599  * \param[in] env       execution environment.
600  * \param[in] di        the iterator for striped directory.
601  *
602  * \retval      the cookie.
603  */
604 static __u64 lod_striped_it_store(const struct lu_env *env,
605                                   const struct dt_it *di)
606 {
607         const struct lod_it     *it = (const struct lod_it *)di;
608         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
609         struct dt_object        *next;
610
611         LOD_CHECK_STRIPED_IT(env, it, lo);
612
613         next = lo->ldo_stripe[it->lit_stripe_index];
614         LASSERT(next != NULL);
615         LASSERT(next->do_index_ops != NULL);
616
617         return next->do_index_ops->dio_it.store(env, it->lit_it);
618 }
619
620 /**
621  * Implementation of dt_index_operations:: dio_it.load
622  *
623  * This function will position the iterator with the given hash(usually
624  * get from store),
625  *
626  * \param[in] env       execution environment.
627  * \param[in] di        the iterator for striped directory.
628  * \param[in] hash      the given hash.
629  *
630  * \retval      >0 if successfuly load the iterator to the given position.
631  * \retval      <0 if load is failed.
632  */
633 static int lod_striped_it_load(const struct lu_env *env,
634                                const struct dt_it *di, __u64 hash)
635 {
636         const struct lod_it     *it = (const struct lod_it *)di;
637         struct lod_object       *lo = lod_dt_obj(it->lit_obj);
638         struct dt_object        *next;
639
640         LOD_CHECK_STRIPED_IT(env, it, lo);
641
642         next = lo->ldo_stripe[it->lit_stripe_index];
643         LASSERT(next != NULL);
644         LASSERT(next->do_index_ops != NULL);
645
646         return next->do_index_ops->dio_it.load(env, it->lit_it, hash);
647 }
648
/*
 * Index operations installed by lod_index_try() on striped directories.
 * Lookup/insert/delete are shared with lod_index_ops; only the iterator
 * methods are stripe aware. No .key_rec method is set here.
 */
static struct dt_index_operations lod_striped_index_ops = {
        .dio_lookup             = lod_index_lookup,
        .dio_declare_insert     = lod_declare_index_insert,
        .dio_insert             = lod_index_insert,
        .dio_declare_delete     = lod_declare_index_delete,
        .dio_delete             = lod_index_delete,
        .dio_it = {
                .init           = lod_striped_it_init,
                .fini           = lod_striped_it_fini,
                .get            = lod_striped_it_get,
                .put            = lod_striped_it_put,
                .next           = lod_striped_it_next,
                .key            = lod_striped_it_key,
                .key_size       = lod_striped_it_key_size,
                .rec            = lod_striped_it_rec,
                .rec_size       = lod_striped_it_rec_size,
                .store          = lod_striped_it_store,
                .load           = lod_striped_it_load,
        }
};
669
670 /**
671  * Implementation of dt_object_operations:: do_index_try
672  *
673  * This function will try to initialize the index api pointer for the
674  * given object, usually it the entry point of the index api. i.e.
675  * the index object should be initialized in index_try, then start
676  * using index api. For striped directory, it will try to initialize
677  * all of its sub_stripes.
678  *
679  * \param[in] env       execution environment.
680  * \param[in] dt        the index object to be initialized.
681  * \param[in] feat      the features of this object, for example fixed or
682  *                      variable key size etc.
683  *
684  * \retval      >0 if the initialization is successful.
685  * \retval      <0 if the initialization is failed.
686  */
687 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
688                          const struct dt_index_features *feat)
689 {
690         struct lod_object       *lo = lod_dt_obj(dt);
691         struct dt_object        *next = dt_object_child(dt);
692         int                     rc;
693         ENTRY;
694
695         LASSERT(next->do_ops);
696         LASSERT(next->do_ops->do_index_try);
697
698         rc = lod_load_striping_locked(env, lo);
699         if (rc != 0)
700                 RETURN(rc);
701
702         rc = next->do_ops->do_index_try(env, next, feat);
703         if (rc != 0)
704                 RETURN(rc);
705
706         if (lo->ldo_stripenr > 0) {
707                 int i;
708
709                 for (i = 0; i < lo->ldo_stripenr; i++) {
710                         if (dt_object_exists(lo->ldo_stripe[i]) == 0)
711                                 continue;
712                         rc = lo->ldo_stripe[i]->do_ops->do_index_try(env,
713                                                 lo->ldo_stripe[i], feat);
714                         if (rc != 0)
715                                 RETURN(rc);
716                 }
717                 dt->do_index_ops = &lod_striped_index_ops;
718         } else {
719                 dt->do_index_ops = &lod_index_ops;
720         }
721
722         RETURN(rc);
723 }
724
/**
 * Take the read lock on the object of the next lower layer.
 *
 * \param[in] env       execution environment.
 * \param[in] dt        object to be locked.
 * \param[in] role      passed through unchanged to dt_read_lock().
 */
static void lod_object_read_lock(const struct lu_env *env,
                                 struct dt_object *dt, unsigned role)
{
        struct dt_object *child = dt_object_child(dt);

        dt_read_lock(env, child, role);
}
730
/**
 * Take the write lock on the object of the next lower layer.
 *
 * \param[in] env       execution environment.
 * \param[in] dt        object to be locked.
 * \param[in] role      passed through unchanged to dt_write_lock().
 */
static void lod_object_write_lock(const struct lu_env *env,
                                  struct dt_object *dt, unsigned role)
{
        struct dt_object *child = dt_object_child(dt);

        dt_write_lock(env, child, role);
}
736
/**
 * Drop the read lock on the object of the next lower layer.
 *
 * \param[in] env       execution environment.
 * \param[in] dt        object to be unlocked.
 */
static void lod_object_read_unlock(const struct lu_env *env,
                                   struct dt_object *dt)
{
        struct dt_object *child = dt_object_child(dt);

        dt_read_unlock(env, child);
}
742
/**
 * Drop the write lock on the object of the next lower layer.
 *
 * \param[in] env       execution environment.
 * \param[in] dt        object to be unlocked.
 */
static void lod_object_write_unlock(const struct lu_env *env,
                                    struct dt_object *dt)
{
        struct dt_object *child = dt_object_child(dt);

        dt_write_unlock(env, child);
}
748
/**
 * Query the write-lock state of the object of the next lower layer.
 *
 * \param[in] env       execution environment.
 * \param[in] dt        object to be checked.
 *
 * \retval      the value returned by dt_write_locked() for the child.
 */
static int lod_object_write_locked(const struct lu_env *env,
                                   struct dt_object *dt)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_write_locked(env, child);
}
754
755 static int lod_attr_get(const struct lu_env *env,
756                         struct dt_object *dt,
757                         struct lu_attr *attr,
758                         struct lustre_capa *capa)
759 {
760         struct lod_object *lo = lod_dt_obj(dt);
761         int i;
762         int rc;
763         ENTRY;
764
765         rc = dt_attr_get(env, dt_object_child(dt), attr, capa);
766         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr) || rc != 0)
767                 RETURN(rc);
768
769         rc = lod_load_striping_locked(env, lo);
770         if (rc)
771                 RETURN(rc);
772
773         if (lo->ldo_stripenr == 0)
774                 RETURN(rc);
775
776         attr->la_nlink = 2;
777         attr->la_size = 0;
778         for (i = 0; i < lo->ldo_stripenr; i++) {
779                 struct lu_attr *sub_attr = &lod_env_info(env)->lti_attr;
780
781                 LASSERT(lo->ldo_stripe[i]);
782                 if (dt_object_exists(lo->ldo_stripe[i]))
783                         continue;
784
785                 rc = dt_attr_get(env, lo->ldo_stripe[i], sub_attr, capa);
786                 if (rc != 0)
787                         break;
788
789                 /* -2 for . and .. on each stripe */
790                 if (sub_attr->la_valid & LA_NLINK && attr->la_valid & LA_NLINK)
791                         attr->la_nlink += sub_attr->la_nlink - 2;
792                 if (sub_attr->la_valid & LA_SIZE && attr->la_valid & LA_SIZE)
793                         attr->la_size += sub_attr->la_size;
794
795                 if (sub_attr->la_valid & LA_ATIME &&
796                     attr->la_valid & LA_ATIME &&
797                     attr->la_atime < sub_attr->la_atime)
798                         attr->la_atime = sub_attr->la_atime;
799
800                 if (sub_attr->la_valid & LA_CTIME &&
801                     attr->la_valid & LA_CTIME &&
802                     attr->la_ctime < sub_attr->la_ctime)
803                         attr->la_ctime = sub_attr->la_ctime;
804
805                 if (sub_attr->la_valid & LA_MTIME &&
806                     attr->la_valid & LA_MTIME &&
807                     attr->la_mtime < sub_attr->la_mtime)
808                         attr->la_mtime = sub_attr->la_mtime;
809         }
810
811         CDEBUG(D_INFO, DFID" stripe_count %d nlink %u size "LPU64"\n",
812                PFID(lu_object_fid(&dt->do_lu)), lo->ldo_stripenr,
813                attr->la_nlink, attr->la_size);
814
815         RETURN(rc);
816 }
817
818 /**
819  * Mark all of sub-stripes dead of the striped directory.
820  **/
821 static int lod_mark_dead_object(const struct lu_env *env,
822                                 struct dt_object *dt,
823                                 struct thandle *handle,
824                                 bool declare)
825 {
826         struct lod_object       *lo = lod_dt_obj(dt);
827         struct lmv_mds_md_v1    *lmv;
828         __u32                   dead_hash_type;
829         int                     rc;
830         int                     i;
831
832         ENTRY;
833
834         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
835                 RETURN(0);
836
837         rc = lod_load_striping_locked(env, lo);
838         if (rc != 0)
839                 RETURN(rc);
840
841         if (lo->ldo_stripenr == 0)
842                 RETURN(0);
843
844         rc = lod_get_lmv_ea(env, lo);
845         if (rc <= 0)
846                 RETURN(rc);
847
848         lmv = lod_env_info(env)->lti_ea_store;
849         lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
850         dead_hash_type = le32_to_cpu(lmv->lmv_hash_type) | LMV_HASH_FLAG_DEAD;
851         lmv->lmv_hash_type = cpu_to_le32(dead_hash_type);
852         for (i = 0; i < lo->ldo_stripenr; i++) {
853                 struct lu_buf buf;
854
855                 lmv->lmv_master_mdt_index = i;
856                 buf.lb_buf = lmv;
857                 buf.lb_len = sizeof(*lmv);
858                 if (declare) {
859                         rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf,
860                                                   XATTR_NAME_LMV,
861                                                   LU_XATTR_REPLACE, handle);
862                 } else {
863                         rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf,
864                                           XATTR_NAME_LMV, LU_XATTR_REPLACE,
865                                           handle, BYPASS_CAPA);
866                 }
867                 if (rc != 0)
868                         break;
869         }
870
871         RETURN(rc);
872 }
873
874 static int lod_declare_attr_set(const struct lu_env *env,
875                                 struct dt_object *dt,
876                                 const struct lu_attr *attr,
877                                 struct thandle *handle)
878 {
879         struct dt_object  *next = dt_object_child(dt);
880         struct lod_object *lo = lod_dt_obj(dt);
881         int                rc, i;
882         ENTRY;
883
884         /* Set dead object on all other stripes */
885         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
886             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
887                 rc = lod_mark_dead_object(env, dt, handle, true);
888                 RETURN(rc);
889         }
890
891         /*
892          * declare setattr on the local object
893          */
894         rc = dt_declare_attr_set(env, next, attr, handle);
895         if (rc)
896                 RETURN(rc);
897
898         /* osp_declare_attr_set() ignores all attributes other than
899          * UID, GID, and size, and osp_attr_set() ignores all but UID
900          * and GID.  Declaration of size attr setting happens through
901          * lod_declare_init_size(), and not through this function.
902          * Therefore we need not load striping unless ownership is
903          * changing.  This should save memory and (we hope) speed up
904          * rename(). */
905         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
906                 if (!(attr->la_valid & (LA_UID | LA_GID)))
907                         RETURN(rc);
908
909                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
910                         RETURN(0);
911         } else {
912                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
913                                         LA_ATIME | LA_MTIME | LA_CTIME)))
914                         RETURN(rc);
915         }
916         /*
917          * load striping information, notice we don't do this when object
918          * is being initialized as we don't need this information till
919          * few specific cases like destroy, chown
920          */
921         rc = lod_load_striping(env, lo);
922         if (rc)
923                 RETURN(rc);
924
925         if (lo->ldo_stripenr == 0)
926                 RETURN(0);
927
928         /*
929          * if object is striped declare changes on the stripes
930          */
931         LASSERT(lo->ldo_stripe);
932         for (i = 0; i < lo->ldo_stripenr; i++) {
933                 if (likely(lo->ldo_stripe[i] != NULL)) {
934                         rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr,
935                                                  handle);
936                         if (rc != 0) {
937                                 CERROR("failed declaration: %d\n", rc);
938                                 break;
939                         }
940                 }
941         }
942
943         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
944             dt_object_exists(next) != 0 &&
945             dt_object_remote(next) == 0)
946                 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
947
948         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
949             dt_object_exists(next) &&
950             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
951                 struct lod_thread_info *info = lod_env_info(env);
952                 struct lu_buf *buf = &info->lti_buf;
953
954                 buf->lb_buf = info->lti_ea_store;
955                 buf->lb_len = info->lti_ea_store_size;
956                 dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
957                                      LU_XATTR_REPLACE, handle);
958         }
959
960         RETURN(rc);
961 }
962
963 static int lod_attr_set(const struct lu_env *env,
964                         struct dt_object *dt,
965                         const struct lu_attr *attr,
966                         struct thandle *handle,
967                         struct lustre_capa *capa)
968 {
969         struct dt_object        *next = dt_object_child(dt);
970         struct lod_object       *lo = lod_dt_obj(dt);
971         int                     rc, i;
972         ENTRY;
973
974         /* Set dead object on all other stripes */
975         if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) &&
976             attr->la_flags & LUSTRE_SLAVE_DEAD_FL) {
977                 rc = lod_mark_dead_object(env, dt, handle, false);
978                 RETURN(rc);
979         }
980
981         /*
982          * apply changes to the local object
983          */
984         rc = dt_attr_set(env, next, attr, handle, capa);
985         if (rc)
986                 RETURN(rc);
987
988         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
989                 if (!(attr->la_valid & (LA_UID | LA_GID)))
990                         RETURN(rc);
991
992                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
993                         RETURN(0);
994         } else {
995                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
996                                         LA_ATIME | LA_MTIME | LA_CTIME)))
997                         RETURN(rc);
998         }
999
1000         if (lo->ldo_stripenr == 0)
1001                 RETURN(0);
1002
1003         /*
1004          * if object is striped, apply changes to all the stripes
1005          */
1006         LASSERT(lo->ldo_stripe);
1007         for (i = 0; i < lo->ldo_stripenr; i++) {
1008                 if (likely(lo->ldo_stripe[i] != NULL)) {
1009                         if (dt_object_exists(lo->ldo_stripe[i]) == 0)
1010                                 continue;
1011
1012                         rc = dt_attr_set(env, lo->ldo_stripe[i], attr,
1013                                          handle, capa);
1014                         if (rc != 0) {
1015                                 CERROR("failed declaration: %d\n", rc);
1016                                 break;
1017                         }
1018                 }
1019         }
1020
1021         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
1022             dt_object_exists(next) != 0 &&
1023             dt_object_remote(next) == 0)
1024                 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
1025
1026         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
1027             dt_object_exists(next) &&
1028             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
1029                 struct lod_thread_info *info = lod_env_info(env);
1030                 struct lu_buf *buf = &info->lti_buf;
1031                 struct ost_id *oi = &info->lti_ostid;
1032                 struct lu_fid *fid = &info->lti_fid;
1033                 struct lov_mds_md_v1 *lmm;
1034                 struct lov_ost_data_v1 *objs;
1035                 __u32 magic;
1036                 int rc1;
1037
1038                 rc1 = lod_get_lov_ea(env, lo);
1039                 if (rc1  <= 0)
1040                         RETURN(rc);
1041
1042                 buf->lb_buf = info->lti_ea_store;
1043                 buf->lb_len = info->lti_ea_store_size;
1044                 lmm = info->lti_ea_store;
1045                 magic = le32_to_cpu(lmm->lmm_magic);
1046                 if (magic == LOV_MAGIC_V1)
1047                         objs = &(lmm->lmm_objects[0]);
1048                 else
1049                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
1050                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
1051                 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
1052                 fid->f_oid--;
1053                 fid_to_ostid(fid, oi);
1054                 ostid_cpu_to_le(oi, &objs->l_ost_oi);
1055                 dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
1056                              LU_XATTR_REPLACE, handle, BYPASS_CAPA);
1057         }
1058
1059         RETURN(rc);
1060 }
1061
1062 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
1063                          struct lu_buf *buf, const char *name,
1064                          struct lustre_capa *capa)
1065 {
1066         struct lod_thread_info  *info = lod_env_info(env);
1067         struct lod_device       *dev = lu2lod_dev(dt->do_lu.lo_dev);
1068         int                      rc, is_root;
1069         ENTRY;
1070
1071         rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
1072         if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
1073                 RETURN(rc);
1074
1075         /*
1076          * lod returns default striping on the real root of the device
1077          * this is like the root stores default striping for the whole
1078          * filesystem. historically we've been using a different approach
1079          * and store it in the config.
1080          */
1081         dt_root_get(env, dev->lod_child, &info->lti_fid);
1082         is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
1083
1084         if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
1085                 struct lov_user_md *lum = buf->lb_buf;
1086                 struct lov_desc    *desc = &dev->lod_desc;
1087
1088                 if (buf->lb_buf == NULL) {
1089                         rc = sizeof(*lum);
1090                 } else if (buf->lb_len >= sizeof(*lum)) {
1091                         lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
1092                         lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
1093                         lmm_oi_set_id(&lum->lmm_oi, 0);
1094                         lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
1095                         lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
1096                         lum->lmm_stripe_size = cpu_to_le32(
1097                                                 desc->ld_default_stripe_size);
1098                         lum->lmm_stripe_count = cpu_to_le16(
1099                                                 desc->ld_default_stripe_count);
1100                         lum->lmm_stripe_offset = cpu_to_le16(
1101                                                 desc->ld_default_stripe_offset);
1102                         rc = sizeof(*lum);
1103                 } else {
1104                         rc = -ERANGE;
1105                 }
1106         }
1107
1108         RETURN(rc);
1109 }
1110
1111 static int lod_verify_md_striping(struct lod_device *lod,
1112                                   const struct lmv_user_md_v1 *lum)
1113 {
1114         int     rc = 0;
1115         ENTRY;
1116
1117         if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
1118                 GOTO(out, rc = -EINVAL);
1119
1120         if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
1121                 GOTO(out, rc = -EINVAL);
1122 out:
1123         if (rc != 0)
1124                 CERROR("%s: invalid lmv_user_md: magic = %x, "
1125                        "stripe_offset = %d, stripe_count = %u: rc = %d\n",
1126                        lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
1127                        (int)le32_to_cpu(lum->lum_stripe_offset),
1128                        le32_to_cpu(lum->lum_stripe_count), rc);
1129         return rc;
1130 }
1131
1132 /**
1133  * Master LMVEA will be same as slave LMVEA, except
1134  * 1. different magic
1135  * 2. No lmv_stripe_fids on slave
1136  * 3. lmv_master_mdt_index on slave LMV EA will be stripe_index.
1137  */
1138 static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv,
1139                                   const struct lmv_mds_md_v1 *master_lmv)
1140 {
1141         *slave_lmv = *master_lmv;
1142         slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
1143 }
1144
1145 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
1146                     struct lu_buf *lmv_buf)
1147 {
1148         struct lod_thread_info  *info = lod_env_info(env);
1149         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1150         struct lod_object       *lo = lod_dt_obj(dt);
1151         struct lmv_mds_md_v1    *lmm1;
1152         int                     stripe_count;
1153         int                     lmm_size;
1154         int                     type = LU_SEQ_RANGE_ANY;
1155         int                     i;
1156         int                     rc;
1157         __u32                   mdtidx;
1158         ENTRY;
1159
1160         LASSERT(lo->ldo_dir_striped != 0);
1161         LASSERT(lo->ldo_stripenr > 0);
1162         stripe_count = lo->ldo_stripenr;
1163         lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
1164         if (info->lti_ea_store_size < lmm_size) {
1165                 rc = lod_ea_store_resize(info, lmm_size);
1166                 if (rc != 0)
1167                         RETURN(rc);
1168         }
1169
1170         lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
1171         lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
1172         lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
1173         lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
1174         rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
1175                             &mdtidx, &type);
1176         if (rc != 0)
1177                 RETURN(rc);
1178
1179         lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
1180         fid_cpu_to_le(&lmm1->lmv_master_fid, lu_object_fid(&dt->do_lu));
1181         for (i = 0; i < lo->ldo_stripenr; i++) {
1182                 struct dt_object *dto;
1183
1184                 dto = lo->ldo_stripe[i];
1185                 LASSERT(dto != NULL);
1186                 fid_cpu_to_le(&lmm1->lmv_stripe_fids[i],
1187                               lu_object_fid(&dto->do_lu));
1188         }
1189
1190         lmv_buf->lb_buf = info->lti_ea_store;
1191         lmv_buf->lb_len = lmm_size;
1192         lo->ldo_dir_striping_cached = 1;
1193
1194         RETURN(rc);
1195 }
1196
1197 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
1198                            const struct lu_buf *buf)
1199 {
1200         struct lod_thread_info  *info = lod_env_info(env);
1201         struct lod_device       *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
1202         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1203         struct dt_object        **stripe;
1204         union lmv_mds_md        *lmm = buf->lb_buf;
1205         struct lmv_mds_md_v1    *lmv1 = &lmm->lmv_md_v1;
1206         struct lu_fid           *fid = &info->lti_fid;
1207         int                     i;
1208         int                     rc = 0;
1209         ENTRY;
1210
1211         if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
1212                 RETURN(0);
1213
1214         if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) {
1215                 lo->ldo_dir_slave_stripe = 1;
1216                 RETURN(0);
1217         }
1218
1219         if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
1220                 RETURN(-EINVAL);
1221
1222         if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1)
1223                 RETURN(0);
1224
1225         LASSERT(lo->ldo_stripe == NULL);
1226         OBD_ALLOC(stripe, sizeof(stripe[0]) *
1227                   (le32_to_cpu(lmv1->lmv_stripe_count)));
1228         if (stripe == NULL)
1229                 RETURN(-ENOMEM);
1230
1231         for (i = 0; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
1232                 struct dt_device        *tgt_dt;
1233                 struct dt_object        *dto;
1234                 int                     type = LU_SEQ_RANGE_ANY;
1235                 __u32                   idx;
1236
1237                 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
1238                 if (!fid_is_sane(fid))
1239                         GOTO(out, rc = -ESTALE);
1240
1241                 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
1242                 if (rc != 0)
1243                         GOTO(out, rc);
1244
1245                 if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
1246                         tgt_dt = lod->lod_child;
1247                 } else {
1248                         struct lod_tgt_desc     *tgt;
1249
1250                         tgt = LTD_TGT(ltd, idx);
1251                         if (tgt == NULL)
1252                                 GOTO(out, rc = -ESTALE);
1253                         tgt_dt = tgt->ltd_tgt;
1254                 }
1255
1256                 dto = dt_locate_at(env, tgt_dt, fid,
1257                                   lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
1258                                   NULL);
1259                 if (IS_ERR(dto))
1260                         GOTO(out, rc = PTR_ERR(dto));
1261
1262                 stripe[i] = dto;
1263         }
1264 out:
1265         lo->ldo_stripe = stripe;
1266         lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count);
1267         lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
1268         if (rc != 0)
1269                 lod_object_free_striping(env, lo);
1270
1271         RETURN(rc);
1272 }
1273
1274 static int lod_prep_md_striped_create(const struct lu_env *env,
1275                                       struct dt_object *dt,
1276                                       struct lu_attr *attr,
1277                                       const struct lmv_user_md_v1 *lum,
1278                                       struct dt_object_format *dof,
1279                                       struct thandle *th)
1280 {
1281         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1282         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
1283         struct lod_object       *lo = lod_dt_obj(dt);
1284         struct lod_thread_info  *info = lod_env_info(env);
1285         struct dt_object        **stripe;
1286         struct lu_buf           lmv_buf;
1287         struct lu_buf           slave_lmv_buf;
1288         struct lmv_mds_md_v1    *lmm;
1289         struct lmv_mds_md_v1    *slave_lmm = NULL;
1290         int                     stripe_count;
1291         int                     *idx_array;
1292         int                     rc = 0;
1293         int                     i;
1294         int                     j;
1295         ENTRY;
1296
1297         /* The lum has been verifed in lod_verify_md_striping */
1298         LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
1299         LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
1300
1301         stripe_count = le32_to_cpu(lum->lum_stripe_count);
1302
1303         /* shrink the stripe_count to the avaible MDT count */
1304         if (stripe_count > lod->lod_remote_mdt_count + 1)
1305                 stripe_count = lod->lod_remote_mdt_count + 1;
1306
1307         OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
1308         if (stripe == NULL)
1309                 RETURN(-ENOMEM);
1310
1311         OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
1312         if (idx_array == NULL)
1313                 GOTO(out_free, rc = -ENOMEM);
1314
1315         for (i = 0; i < stripe_count; i++) {
1316                 struct lod_tgt_desc     *tgt = NULL;
1317                 struct dt_object        *dto;
1318                 struct lu_fid           fid = { 0 };
1319                 int                     idx;
1320                 struct lu_object_conf   conf = { 0 };
1321                 struct dt_device        *tgt_dt = NULL;
1322
1323                 if (i == 0) {
1324                         /* Right now, master stripe and master object are
1325                          * on the same MDT */
1326                         idx = le32_to_cpu(lum->lum_stripe_offset);
1327                         rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
1328                                            NULL);
1329                         if (rc < 0)
1330                                 GOTO(out_put, rc);
1331                         tgt_dt = lod->lod_child;
1332                         goto next;
1333                 }
1334
1335                 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
1336
1337                 for (j = 0; j < lod->lod_remote_mdt_count;
1338                      j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
1339                         bool already_allocated = false;
1340                         int k;
1341
1342                         CDEBUG(D_INFO, "try idx %d, mdt cnt %d,"
1343                                " allocated %d, last allocated %d\n", idx,
1344                                lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1345
1346                         /* Find next available target */
1347                         if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
1348                                 continue;
1349
1350                         /* check whether the idx already exists
1351                          * in current allocated array */
1352                         for (k = 0; k < i; k++) {
1353                                 if (idx_array[k] == idx) {
1354                                         already_allocated = true;
1355                                         break;
1356                                 }
1357                         }
1358
1359                         if (already_allocated)
1360                                 continue;
1361
1362                         /* check the status of the OSP */
1363                         tgt = LTD_TGT(ltd, idx);
1364                         if (tgt == NULL)
1365                                 continue;
1366
1367                         tgt_dt = tgt->ltd_tgt;
1368                         rc = dt_statfs(env, tgt_dt, NULL);
1369                         if (rc) {
1370                                 /* this OSP doesn't feel well */
1371                                 rc = 0;
1372                                 continue;
1373                         }
1374
1375                         rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL);
1376                         if (rc < 0) {
1377                                 rc = 0;
1378                                 continue;
1379                         }
1380
1381                         break;
1382                 }
1383
1384                 /* Can not allocate more stripes */
1385                 if (j == lod->lod_remote_mdt_count) {
1386                         CDEBUG(D_INFO, "%s: require stripes %d only get %d\n",
1387                                lod2obd(lod)->obd_name, stripe_count, i - 1);
1388                         break;
1389                 }
1390
1391                 CDEBUG(D_INFO, "idx %d, mdt cnt %d,"
1392                        " allocated %d, last allocated %d\n", idx,
1393                        lod->lod_remote_mdt_count, i, idx_array[i - 1]);
1394
1395 next:
1396                 /* tgt_dt and fid must be ready after search avaible OSP
1397                  * in the above loop */
1398                 LASSERT(tgt_dt != NULL);
1399                 LASSERT(fid_is_sane(&fid));
1400                 conf.loc_flags = LOC_F_NEW;
1401                 dto = dt_locate_at(env, tgt_dt, &fid,
1402                                    dt->do_lu.lo_dev->ld_site->ls_top_dev,
1403                                    &conf);
1404                 if (IS_ERR(dto))
1405                         GOTO(out_put, rc = PTR_ERR(dto));
1406                 stripe[i] = dto;
1407                 idx_array[i] = idx;
1408         }
1409
1410         lo->ldo_dir_striped = 1;
1411         lo->ldo_stripe = stripe;
1412         lo->ldo_stripenr = i;
1413         lo->ldo_stripes_allocated = stripe_count;
1414
1415         if (lo->ldo_stripenr == 0)
1416                 GOTO(out_put, rc = -ENOSPC);
1417
1418         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1419         if (rc != 0)
1420                 GOTO(out_put, rc);
1421         lmm = lmv_buf.lb_buf;
1422
1423         OBD_ALLOC_PTR(slave_lmm);
1424         if (slave_lmm == NULL)
1425                 GOTO(out_put, rc = -ENOMEM);
1426
1427         lod_prep_slave_lmv_md(slave_lmm, lmm);
1428         slave_lmv_buf.lb_buf = slave_lmm;
1429         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1430
1431         if (!dt_try_as_dir(env, dt_object_child(dt)))
1432                 GOTO(out_put, rc = -EINVAL);
1433
1434         for (i = 0; i < lo->ldo_stripenr; i++) {
1435                 struct dt_object        *dto            = stripe[i];
1436                 char                    *stripe_name    = info->lti_key;
1437                 struct lu_name          *sname;
1438                 struct linkea_data       ldata          = { 0 };
1439                 struct lu_buf            linkea_buf;
1440
1441                 rc = dt_declare_create(env, dto, attr, NULL, dof, th);
1442                 if (rc != 0)
1443                         GOTO(out_put, rc);
1444
1445                 if (!dt_try_as_dir(env, dto))
1446                         GOTO(out_put, rc = -EINVAL);
1447
1448                 rc = dt_declare_insert(env, dto,
1449                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1450                      (const struct dt_key *)dot, th);
1451                 if (rc != 0)
1452                         GOTO(out_put, rc);
1453
1454                 /* master stripe FID will be put to .. */
1455                 rc = dt_declare_insert(env, dto,
1456                      (const struct dt_rec *)lu_object_fid(&dt->do_lu),
1457                      (const struct dt_key *)dotdot, th);
1458                 if (rc != 0)
1459                         GOTO(out_put, rc);
1460
1461                 /* probably nothing to inherite */
1462                 if (lo->ldo_striping_cached &&
1463                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1464                                          lo->ldo_def_stripenr,
1465                                          lo->ldo_def_stripe_offset)) {
1466                         struct lov_user_md_v3   *v3;
1467
1468                         /* sigh, lti_ea_store has been used for lmv_buf,
1469                          * so we have to allocate buffer for default
1470                          * stripe EA */
1471                         OBD_ALLOC_PTR(v3);
1472                         if (v3 == NULL)
1473                                 GOTO(out_put, rc = -ENOMEM);
1474
1475                         memset(v3, 0, sizeof(*v3));
1476                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1477                         v3->lmm_stripe_count =
1478                                 cpu_to_le16(lo->ldo_def_stripenr);
1479                         v3->lmm_stripe_offset =
1480                                 cpu_to_le16(lo->ldo_def_stripe_offset);
1481                         v3->lmm_stripe_size =
1482                                 cpu_to_le32(lo->ldo_def_stripe_size);
1483                         if (lo->ldo_pool != NULL)
1484                                 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
1485                                         sizeof(v3->lmm_pool_name));
1486
1487                         info->lti_buf.lb_buf = v3;
1488                         info->lti_buf.lb_len = sizeof(*v3);
1489                         rc = dt_declare_xattr_set(env, dto,
1490                                                   &info->lti_buf,
1491                                                   XATTR_NAME_LOV,
1492                                                   0, th);
1493                         OBD_FREE_PTR(v3);
1494                         if (rc != 0)
1495                                 GOTO(out_put, rc);
1496                 }
1497
1498                 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1499                 rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf,
1500                                           XATTR_NAME_LMV, 0, th);
1501                 if (rc != 0)
1502                         GOTO(out_put, rc);
1503
1504                 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1505                         PFID(lu_object_fid(&dto->do_lu)), i);
1506
1507                 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
1508                 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
1509                 if (rc != 0)
1510                         GOTO(out_put, rc);
1511
1512                 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
1513                 if (rc != 0)
1514                         GOTO(out_put, rc);
1515
1516                 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1517                 linkea_buf.lb_len = ldata.ld_leh->leh_len;
1518                 rc = dt_declare_xattr_set(env, dto, &linkea_buf,
1519                                           XATTR_NAME_LINK, 0, th);
1520                 if (rc != 0)
1521                         GOTO(out_put, rc);
1522
1523                 rc = dt_declare_insert(env, dt_object_child(dt),
1524                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1525                      (const struct dt_key *)stripe_name, th);
1526                 if (rc != 0)
1527                         GOTO(out_put, rc);
1528
1529                 rc = dt_declare_ref_add(env, dt_object_child(dt), th);
1530                 if (rc != 0)
1531                         GOTO(out_put, rc);
1532         }
1533
1534         rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
1535                                   XATTR_NAME_LMV, 0, th);
1536         if (rc != 0)
1537                 GOTO(out_put, rc);
1538
1539 out_put:
1540         if (rc < 0) {
1541                 for (i = 0; i < stripe_count; i++)
1542                         if (stripe[i] != NULL)
1543                                 lu_object_put(env, &stripe[i]->do_lu);
1544                 OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count);
1545                 lo->ldo_stripenr = 0;
1546                 lo->ldo_stripes_allocated = 0;
1547                 lo->ldo_stripe = NULL;
1548         }
1549
1550 out_free:
1551         if (idx_array != NULL)
1552                 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
1553         if (slave_lmm != NULL)
1554                 OBD_FREE_PTR(slave_lmm);
1555
1556         RETURN(rc);
1557 }
1558
1559 /**
1560  * Declare create striped md object.
1561  */
1562 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
1563                                      struct dt_object *dt,
1564                                      struct lu_attr *attr,
1565                                      const struct lu_buf *lum_buf,
1566                                      struct dt_object_format *dof,
1567                                      struct thandle *th)
1568 {
1569         struct lod_object       *lo = lod_dt_obj(dt);
1570         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
1571         struct lmv_user_md_v1   *lum;
1572         int                     rc;
1573         ENTRY;
1574
1575         lum = lum_buf->lb_buf;
1576         LASSERT(lum != NULL);
1577
1578         CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
1579                le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
1580                (int)le32_to_cpu(lum->lum_stripe_offset));
1581
1582         if (le32_to_cpu(lum->lum_stripe_count) == 0)
1583                 GOTO(out, rc = 0);
1584
1585         rc = lod_verify_md_striping(lod, lum);
1586         if (rc != 0)
1587                 GOTO(out, rc);
1588
1589         /* prepare dir striped objects */
1590         rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th);
1591         if (rc != 0) {
1592                 /* failed to create striping, let's reset
1593                  * config so that others don't get confused */
1594                 lod_object_free_striping(env, lo);
1595                 GOTO(out, rc);
1596         }
1597 out:
1598         RETURN(rc);
1599 }
1600
1601 static int lod_dir_declare_xattr_set(const struct lu_env *env,
1602                                      struct dt_object *dt,
1603                                      const struct lu_buf *buf,
1604                                      const char *name, int fl,
1605                                      struct thandle *th)
1606 {
1607         struct dt_object        *next = dt_object_child(dt);
1608         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
1609         struct lod_object       *lo = lod_dt_obj(dt);
1610         int                     i;
1611         int                     rc;
1612         ENTRY;
1613
1614         if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1615                 struct lmv_user_md_v1 *lum;
1616
1617                 LASSERT(buf != NULL && buf->lb_buf != NULL);
1618                 lum = buf->lb_buf;
1619                 rc = lod_verify_md_striping(d, lum);
1620                 if (rc != 0)
1621                         RETURN(rc);
1622         }
1623
1624         rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1625         if (rc != 0)
1626                 RETURN(rc);
1627
1628         /* set xattr to each stripes, if needed */
1629         rc = lod_load_striping(env, lo);
1630         if (rc != 0)
1631                 RETURN(rc);
1632
1633         if (lo->ldo_stripenr == 0)
1634                 RETURN(rc);
1635
1636         for (i = 0; i < lo->ldo_stripenr; i++) {
1637                 LASSERT(lo->ldo_stripe[i]);
1638                 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
1639                                           name, fl, th);
1640                 if (rc != 0)
1641                         break;
1642         }
1643
1644         RETURN(rc);
1645 }
1646
1647 /*
1648  * LOV xattr is a storage for striping, and LOD owns this xattr.
1649  * but LOD allows others to control striping to some extent
1650  * - to reset strping
1651  * - to set new defined striping
1652  * - to set new semi-defined striping
1653  *   - number of stripes is defined
1654  *   - number of stripes + osts are defined
1655  *   - ??
1656  */
1657 static int lod_declare_xattr_set(const struct lu_env *env,
1658                                  struct dt_object *dt,
1659                                  const struct lu_buf *buf,
1660                                  const char *name, int fl,
1661                                  struct thandle *th)
1662 {
1663         struct dt_object *next = dt_object_child(dt);
1664         struct lu_attr   *attr = &lod_env_info(env)->lti_attr;
1665         __u32             mode;
1666         int               rc;
1667         ENTRY;
1668
1669         /*
1670          * allow to declare predefined striping on a new (!mode) object
1671          * which is supposed to be replay of regular file creation
1672          * (when LOV setting is declared)
1673          * LU_XATTR_REPLACE is set to indicate a layout swap
1674          */
1675         mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
1676         if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
1677              !(fl & LU_XATTR_REPLACE)) {
1678                 /*
1679                  * this is a request to manipulate object's striping
1680                  */
1681                 if (dt_object_exists(dt)) {
1682                         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1683                         if (rc)
1684                                 RETURN(rc);
1685                 } else {
1686                         memset(attr, 0, sizeof(*attr));
1687                         attr->la_valid = LA_TYPE | LA_MODE;
1688                         attr->la_mode = S_IFREG;
1689                 }
1690                 rc = lod_declare_striped_object(env, dt, attr, buf, th);
1691         } else if (S_ISDIR(mode)) {
1692                 rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
1693         } else {
1694                 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1695         }
1696
1697         RETURN(rc);
1698 }
1699
1700 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
1701 {
1702         lo->ldo_striping_cached = 0;
1703         lo->ldo_def_striping_set = 0;
1704         lod_object_set_pool(lo, NULL);
1705         lo->ldo_def_stripe_size = 0;
1706         lo->ldo_def_stripenr = 0;
1707         if (lo->ldo_dir_stripe != NULL)
1708                 lo->ldo_dir_striping_cached = 0;
1709 }
1710
1711 static int lod_xattr_set_internal(const struct lu_env *env,
1712                                   struct dt_object *dt,
1713                                   const struct lu_buf *buf,
1714                                   const char *name, int fl, struct thandle *th,
1715                                   struct lustre_capa *capa)
1716 {
1717         struct dt_object        *next = dt_object_child(dt);
1718         struct lod_object       *lo = lod_dt_obj(dt);
1719         int                     rc;
1720         int                     i;
1721         ENTRY;
1722
1723         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1724         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1725                 RETURN(rc);
1726
1727         if (lo->ldo_stripenr == 0)
1728                 RETURN(rc);
1729
1730         for (i = 0; i < lo->ldo_stripenr; i++) {
1731                 LASSERT(lo->ldo_stripe[i]);
1732                 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
1733                                   capa);
1734                 if (rc != 0)
1735                         break;
1736         }
1737
1738         RETURN(rc);
1739 }
1740
1741 static int lod_xattr_del_internal(const struct lu_env *env,
1742                                   struct dt_object *dt,
1743                                   const char *name, struct thandle *th,
1744                                   struct lustre_capa *capa)
1745 {
1746         struct dt_object        *next = dt_object_child(dt);
1747         struct lod_object       *lo = lod_dt_obj(dt);
1748         int                     rc;
1749         int                     i;
1750         ENTRY;
1751
1752         rc = dt_xattr_del(env, next, name, th, capa);
1753         if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr))
1754                 RETURN(rc);
1755
1756         if (lo->ldo_stripenr == 0)
1757                 RETURN(rc);
1758
1759         for (i = 0; i < lo->ldo_stripenr; i++) {
1760                 LASSERT(lo->ldo_stripe[i]);
1761                 rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th,
1762                                   capa);
1763                 if (rc != 0)
1764                         break;
1765         }
1766
1767         RETURN(rc);
1768 }
1769
1770 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
1771                                     struct dt_object *dt,
1772                                     const struct lu_buf *buf,
1773                                     const char *name, int fl,
1774                                     struct thandle *th,
1775                                     struct lustre_capa *capa)
1776 {
1777         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
1778         struct lod_object       *l = lod_dt_obj(dt);
1779         struct lov_user_md_v1   *lum;
1780         struct lov_user_md_v3   *v3 = NULL;
1781         int                      rc;
1782         ENTRY;
1783
1784         /* If it is striped dir, we should clear the stripe cache for
1785          * slave stripe as well, but there are no effective way to
1786          * notify the LOD on the slave MDT, so we do not cache stripe
1787          * information for slave stripe for now. XXX*/
1788         lod_lov_stripe_cache_clear(l);
1789         LASSERT(buf != NULL && buf->lb_buf != NULL);
1790         lum = buf->lb_buf;
1791
1792         rc = lod_verify_striping(d, buf, false);
1793         if (rc)
1794                 RETURN(rc);
1795
1796         if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1797                 v3 = buf->lb_buf;
1798
1799         /* if { size, offset, count } = { 0, -1, 0 } and no pool
1800          * (i.e. all default values specified) then delete default
1801          * striping from dir. */
1802         CDEBUG(D_OTHER,
1803                 "set default striping: sz %u # %u offset %d %s %s\n",
1804                 (unsigned)lum->lmm_stripe_size,
1805                 (unsigned)lum->lmm_stripe_count,
1806                 (int)lum->lmm_stripe_offset,
1807                 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1808
1809         if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1810                                 (lum->lmm_stripe_count),
1811                                 (lum->lmm_stripe_offset)) &&
1812                         lum->lmm_magic == LOV_USER_MAGIC_V1) {
1813                 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1814                 if (rc == -ENODATA)
1815                         rc = 0;
1816         } else {
1817                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
1818         }
1819
1820         RETURN(rc);
1821 }
1822
1823 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
1824                                             struct dt_object *dt,
1825                                             const struct lu_buf *buf,
1826                                             const char *name, int fl,
1827                                             struct thandle *th,
1828                                             struct lustre_capa *capa)
1829 {
1830         struct lod_object       *l = lod_dt_obj(dt);
1831         struct lmv_user_md_v1   *lum;
1832         int                      rc;
1833         ENTRY;
1834
1835         LASSERT(buf != NULL && buf->lb_buf != NULL);
1836         lum = buf->lb_buf;
1837
1838         CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
1839               le32_to_cpu(lum->lum_stripe_count),
1840               (int)le32_to_cpu(lum->lum_stripe_offset));
1841
1842         if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
1843                                  le32_to_cpu(lum->lum_stripe_offset)) &&
1844                                 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
1845                 rc = lod_xattr_del_internal(env, dt, name, th, capa);
1846                 if (rc == -ENODATA)
1847                         rc = 0;
1848         } else {
1849                 rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
1850                 if (rc != 0)
1851                         RETURN(rc);
1852         }
1853
1854         /* Update default stripe cache */
1855         if (l->ldo_dir_stripe == NULL) {
1856                 OBD_ALLOC_PTR(l->ldo_dir_stripe);
1857                 if (l->ldo_dir_stripe == NULL)
1858                         RETURN(-ENOMEM);
1859         }
1860
1861         l->ldo_dir_striping_cached = 0;
1862         l->ldo_dir_def_striping_set = 1;
1863         l->ldo_dir_def_stripenr = le32_to_cpu(lum->lum_stripe_count);
1864
1865         RETURN(rc);
1866 }
1867
1868 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
1869                              const struct lu_buf *buf, const char *name,
1870                              int fl, struct thandle *th,
1871                              struct lustre_capa *capa)
1872 {
1873         struct lod_object       *lo = lod_dt_obj(dt);
1874         struct lod_thread_info  *info = lod_env_info(env);
1875         struct lu_attr          *attr = &info->lti_attr;
1876         struct dt_object_format *dof = &info->lti_format;
1877         struct lu_buf           lmv_buf;
1878         struct lu_buf           slave_lmv_buf;
1879         struct lmv_mds_md_v1    *lmm;
1880         struct lmv_mds_md_v1    *slave_lmm = NULL;
1881         int                     i;
1882         int                     rc;
1883         ENTRY;
1884
1885         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
1886                 RETURN(-ENOTDIR);
1887
1888         /* The stripes are supposed to be allocated in declare phase,
1889          * if there are no stripes being allocated, it will skip */
1890         if (lo->ldo_stripenr == 0)
1891                 RETURN(0);
1892
1893         rc = dt_attr_get(env, dt_object_child(dt), attr, BYPASS_CAPA);
1894         if (rc != 0)
1895                 RETURN(rc);
1896
1897         attr->la_valid = LA_TYPE | LA_MODE;
1898         dof->dof_type = DFT_DIR;
1899
1900         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1901         if (rc != 0)
1902                 RETURN(rc);
1903         lmm = lmv_buf.lb_buf;
1904
1905         OBD_ALLOC_PTR(slave_lmm);
1906         if (slave_lmm == NULL)
1907                 RETURN(-ENOMEM);
1908
1909         lod_prep_slave_lmv_md(slave_lmm, lmm);
1910         slave_lmv_buf.lb_buf = slave_lmm;
1911         slave_lmv_buf.lb_len = sizeof(*slave_lmm);
1912
1913         for (i = 0; i < lo->ldo_stripenr; i++) {
1914                 struct dt_object        *dto;
1915                 char                    *stripe_name    = info->lti_key;
1916                 struct lu_name          *sname;
1917                 struct linkea_data       ldata          = { 0 };
1918                 struct lu_buf            linkea_buf;
1919
1920                 dto = lo->ldo_stripe[i];
1921                 dt_write_lock(env, dto, MOR_TGT_CHILD);
1922                 rc = dt_create(env, dto, attr, NULL, dof, th);
1923                 dt_write_unlock(env, dto);
1924                 if (rc != 0)
1925                         RETURN(rc);
1926
1927                 rc = dt_insert(env, dto,
1928                               (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1929                               (const struct dt_key *)dot, th, capa, 0);
1930                 if (rc != 0)
1931                         RETURN(rc);
1932
1933                 rc = dt_insert(env, dto,
1934                               (struct dt_rec *)lu_object_fid(&dt->do_lu),
1935                               (const struct dt_key *)dotdot, th, capa, 0);
1936                 if (rc != 0)
1937                         RETURN(rc);
1938
1939                 if (lo->ldo_striping_cached &&
1940                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1941                                          lo->ldo_def_stripenr,
1942                                          lo->ldo_def_stripe_offset)) {
1943                         struct lov_user_md_v3   *v3;
1944
1945                         /* sigh, lti_ea_store has been used for lmv_buf,
1946                          * so we have to allocate buffer for default
1947                          * stripe EA */
1948                         OBD_ALLOC_PTR(v3);
1949                         if (v3 == NULL)
1950                                 GOTO(out, rc);
1951
1952                         memset(v3, 0, sizeof(*v3));
1953                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1954                         v3->lmm_stripe_count =
1955                                 cpu_to_le16(lo->ldo_def_stripenr);
1956                         v3->lmm_stripe_offset =
1957                                 cpu_to_le16(lo->ldo_def_stripe_offset);
1958                         v3->lmm_stripe_size =
1959                                 cpu_to_le32(lo->ldo_def_stripe_size);
1960                         if (lo->ldo_pool != NULL)
1961                                 strlcpy(v3->lmm_pool_name, lo->ldo_pool,
1962                                         sizeof(v3->lmm_pool_name));
1963
1964                         info->lti_buf.lb_buf = v3;
1965                         info->lti_buf.lb_len = sizeof(*v3);
1966                         rc = dt_xattr_set(env, dto, &info->lti_buf,
1967                                           XATTR_NAME_LOV, 0, th, capa);
1968                         OBD_FREE_PTR(v3);
1969                         if (rc != 0)
1970                                 GOTO(out, rc);
1971                 }
1972
1973                 slave_lmm->lmv_master_mdt_index = cpu_to_le32(i);
1974                 rc = dt_xattr_set(env, dto, &slave_lmv_buf, XATTR_NAME_LMV,
1975                                   fl, th, capa);
1976                 if (rc != 0)
1977                         GOTO(out, rc);
1978
1979                 snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
1980                          PFID(lu_object_fid(&dto->do_lu)), i);
1981
1982                 sname = lod_name_get(env, stripe_name, strlen(stripe_name));
1983                 rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
1984                 if (rc != 0)
1985                         GOTO(out, rc);
1986
1987                 rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
1988                 if (rc != 0)
1989                         GOTO(out, rc);
1990
1991                 linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
1992                 linkea_buf.lb_len = ldata.ld_leh->leh_len;
1993                 rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK,
1994                                   0, th, BYPASS_CAPA);
1995                 if (rc != 0)
1996                         GOTO(out, rc);
1997
1998                 rc = dt_insert(env, dt_object_child(dt),
1999                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
2000                      (const struct dt_key *)stripe_name, th, capa, 0);
2001                 if (rc != 0)
2002                         GOTO(out, rc);
2003
2004                 rc = dt_ref_add(env, dt_object_child(dt), th);
2005                 if (rc != 0)
2006                         GOTO(out, rc);
2007         }
2008
2009         rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
2010                           fl, th, capa);
2011
2012 out:
2013         if (slave_lmm != NULL)
2014                 OBD_FREE_PTR(slave_lmm);
2015
2016         RETURN(rc);
2017 }
2018
2019 int lod_dir_striping_create_internal(const struct lu_env *env,
2020                                      struct dt_object *dt,
2021                                      struct lu_attr *attr,
2022                                      struct dt_object_format *dof,
2023                                      struct thandle *th,
2024                                      bool declare)
2025 {
2026         struct lod_thread_info  *info = lod_env_info(env);
2027         struct lod_object       *lo = lod_dt_obj(dt);
2028         int                     rc;
2029         ENTRY;
2030
2031         if (!LMVEA_DELETE_VALUES(lo->ldo_stripenr,
2032                                  lo->ldo_dir_stripe_offset)) {
2033                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2034                 int stripe_count = lo->ldo_stripenr;
2035
2036                 if (info->lti_ea_store_size < sizeof(*v1)) {
2037                         rc = lod_ea_store_resize(info, sizeof(*v1));
2038                         if (rc != 0)
2039                                 RETURN(rc);
2040                         v1 = info->lti_ea_store;
2041                 }
2042
2043                 memset(v1, 0, sizeof(*v1));
2044                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2045                 v1->lum_stripe_count = cpu_to_le32(stripe_count);
2046                 v1->lum_stripe_offset =
2047                                 cpu_to_le32(lo->ldo_dir_stripe_offset);
2048
2049                 info->lti_buf.lb_buf = v1;
2050                 info->lti_buf.lb_len = sizeof(*v1);
2051
2052                 if (declare)
2053                         rc = lod_declare_xattr_set_lmv(env, dt, attr,
2054                                                        &info->lti_buf, dof, th);
2055                 else
2056                         rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
2057                                                XATTR_NAME_LMV, 0, th,
2058                                                BYPASS_CAPA);
2059                 if (rc != 0)
2060                         RETURN(rc);
2061         }
2062
2063         /* Transfer default LMV striping from the parent */
2064         if (lo->ldo_dir_striping_cached &&
2065             !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
2066                                  lo->ldo_dir_def_stripe_offset)) {
2067                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
2068                 int def_stripe_count = lo->ldo_dir_def_stripenr;
2069
2070                 if (info->lti_ea_store_size < sizeof(*v1)) {
2071                         rc = lod_ea_store_resize(info, sizeof(*v1));
2072                         if (rc != 0)
2073                                 RETURN(rc);
2074                         v1 = info->lti_ea_store;
2075                 }
2076
2077                 memset(v1, 0, sizeof(*v1));
2078                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
2079                 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
2080                 v1->lum_stripe_offset =
2081                                 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
2082                 v1->lum_hash_type =
2083                                 cpu_to_le32(lo->ldo_dir_def_hash_type);
2084
2085                 info->lti_buf.lb_buf = v1;
2086                 info->lti_buf.lb_len = sizeof(*v1);
2087                 if (declare)
2088                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2089                                                        XATTR_NAME_DEFAULT_LMV,
2090                                                        0, th);
2091                 else
2092                         rc = lod_xattr_set_default_lmv_on_dir(env, dt,
2093                                                   &info->lti_buf,
2094                                                   XATTR_NAME_DEFAULT_LMV, 0,
2095                                                   th, BYPASS_CAPA);
2096                 if (rc != 0)
2097                         RETURN(rc);
2098         }
2099
2100         /* Transfer default LOV striping from the parent */
2101         if (lo->ldo_striping_cached &&
2102             !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
2103                                  lo->ldo_def_stripenr,
2104                                  lo->ldo_def_stripe_offset)) {
2105                 struct lov_user_md_v3 *v3 = info->lti_ea_store;
2106
2107                 if (info->lti_ea_store_size < sizeof(*v3)) {
2108                         rc = lod_ea_store_resize(info, sizeof(*v3));
2109                         if (rc != 0)
2110                                 RETURN(rc);
2111                         v3 = info->lti_ea_store;
2112                 }
2113
2114                 memset(v3, 0, sizeof(*v3));
2115                 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
2116                 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
2117                 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
2118                 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
2119                 if (lo->ldo_pool != NULL)
2120                         strlcpy(v3->lmm_pool_name, lo->ldo_pool,
2121                                 sizeof(v3->lmm_pool_name));
2122
2123                 info->lti_buf.lb_buf = v3;
2124                 info->lti_buf.lb_len = sizeof(*v3);
2125
2126                 if (declare)
2127                         rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf,
2128                                                        XATTR_NAME_LOV, 0, th);
2129                 else
2130                         rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf,
2131                                                       XATTR_NAME_LOV, 0, th,
2132                                                       BYPASS_CAPA);
2133                 if (rc != 0)
2134                         RETURN(rc);
2135         }
2136
2137         RETURN(0);
2138 }
2139
/*
 * Declare phase of the directory striping creation: forwards to
 * lod_dir_striping_create_internal() with the declare flag set.
 */
static int lod_declare_dir_striping_create(const struct lu_env *env,
                                           struct dt_object *dt,
                                           struct lu_attr *attr,
                                           struct dt_object_format *dof,
                                           struct thandle *th)
{
        const bool declare = true;

        return lod_dir_striping_create_internal(env, dt, attr, dof, th,
                                                declare);
}
2148
/*
 * Execution phase of the directory striping creation: forwards to
 * lod_dir_striping_create_internal() with the declare flag cleared.
 */
static int lod_dir_striping_create(const struct lu_env *env,
                                   struct dt_object *dt,
                                   struct lu_attr *attr,
                                   struct dt_object_format *dof,
                                   struct thandle *th)
{
        const bool declare = false;

        return lod_dir_striping_create_internal(env, dt, attr, dof, th,
                                                declare);
}
2157
2158 static int lod_xattr_set(const struct lu_env *env,
2159                          struct dt_object *dt, const struct lu_buf *buf,
2160                          const char *name, int fl, struct thandle *th,
2161                          struct lustre_capa *capa)
2162 {
2163         struct dt_object        *next = dt_object_child(dt);
2164         int                      rc;
2165         ENTRY;
2166
2167         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2168             strcmp(name, XATTR_NAME_LMV) == 0) {
2169                 struct lmv_mds_md_v1 *lmm = buf->lb_buf;
2170
2171                 if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
2172                                                 LMV_HASH_FLAG_MIGRATION)
2173                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2174                 else
2175                         rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
2176
2177                 RETURN(rc);
2178         }
2179
2180         if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2181             strcmp(name, XATTR_NAME_LOV) == 0) {
2182                 /* default LOVEA */
2183                 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
2184                 RETURN(rc);
2185         } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
2186                    strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
2187                 /* default LMVEA */
2188                 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
2189                                                       th, capa);
2190                 RETURN(rc);
2191         } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2192                    !strcmp(name, XATTR_NAME_LOV)) {
2193                 /* in case of lov EA swap, just set it
2194                  * if not, it is a replay so check striping match what we
2195                  * already have during req replay, declare_xattr_set()
2196                  * defines striping, then create() does the work
2197                 */
2198                 if (fl & LU_XATTR_REPLACE) {
2199                         /* free stripes, then update disk */
2200                         lod_object_free_striping(env, lod_dt_obj(dt));
2201                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
2202                 } else {
2203                         rc = lod_striping_create(env, dt, NULL, NULL, th);
2204                 }
2205                 RETURN(rc);
2206         }
2207
2208         /* then all other xattr */
2209         rc = lod_xattr_set_internal(env, dt, buf, name, fl, th, capa);
2210
2211         RETURN(rc);
2212 }
2213
/*
 * Declare the deletion of an xattr: passed straight to the underlying
 * object.
 */
static int lod_declare_xattr_del(const struct lu_env *env,
                                 struct dt_object *dt, const char *name,
                                 struct thandle *th)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_declare_xattr_del(env, next, name, th);
}
2220
2221 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
2222                          const char *name, struct thandle *th,
2223                          struct lustre_capa *capa)
2224 {
2225         if (!strcmp(name, XATTR_NAME_LOV))
2226                 lod_object_free_striping(env, lod_dt_obj(dt));
2227         return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
2228 }
2229
/*
 * List the xattrs of an object: passed straight to the underlying object.
 */
static int lod_xattr_list(const struct lu_env *env,
                          struct dt_object *dt, struct lu_buf *buf,
                          struct lustre_capa *capa)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_xattr_list(env, next, buf, capa);
}
2236
2237 int lod_object_set_pool(struct lod_object *o, char *pool)
2238 {
2239         int len;
2240
2241         if (o->ldo_pool) {
2242                 len = strlen(o->ldo_pool);
2243                 OBD_FREE(o->ldo_pool, len + 1);
2244                 o->ldo_pool = NULL;
2245         }
2246         if (pool) {
2247                 len = strlen(pool);
2248                 OBD_ALLOC(o->ldo_pool, len + 1);
2249                 if (o->ldo_pool == NULL)
2250                         return -ENOMEM;
2251                 strcpy(o->ldo_pool, pool);
2252         }
2253         return 0;
2254 }
2255
2256 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
2257 {
2258         return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
2259 }
2260
2261
2262 static int lod_cache_parent_lov_striping(const struct lu_env *env,
2263                                          struct lod_object *lp)
2264 {
2265         struct lod_thread_info  *info = lod_env_info(env);
2266         struct lov_user_md_v1   *v1 = NULL;
2267         struct lov_user_md_v3   *v3 = NULL;
2268         int                      rc;
2269         ENTRY;
2270
2271         /* called from MDD without parent being write locked,
2272          * lock it here */
2273         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2274         rc = lod_get_lov_ea(env, lp);
2275         if (rc < 0)
2276                 GOTO(unlock, rc);
2277
2278         if (rc < sizeof(struct lov_user_md)) {
2279                 /* don't lookup for non-existing or invalid striping */
2280                 lp->ldo_def_striping_set = 0;
2281                 lp->ldo_striping_cached = 1;
2282                 lp->ldo_def_stripe_size = 0;
2283                 lp->ldo_def_stripenr = 0;
2284                 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
2285                 GOTO(unlock, rc = 0);
2286         }
2287
2288         rc = 0;
2289         v1 = info->lti_ea_store;
2290         if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) {
2291                 lustre_swab_lov_user_md_v1(v1);
2292         } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) {
2293                 v3 = (struct lov_user_md_v3 *)v1;
2294                 lustre_swab_lov_user_md_v3(v3);
2295         }
2296
2297         if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
2298                 GOTO(unlock, rc = 0);
2299
2300         if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
2301                 GOTO(unlock, rc = 0);
2302
2303         CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
2304                PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
2305                (int)v1->lmm_stripe_count,
2306                (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
2307
2308         lp->ldo_def_stripenr = v1->lmm_stripe_count;
2309         lp->ldo_def_stripe_size = v1->lmm_stripe_size;
2310         lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
2311         lp->ldo_striping_cached = 1;
2312         lp->ldo_def_striping_set = 1;
2313         if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
2314                 /* XXX: sanity check here */
2315                 v3 = (struct lov_user_md_v3 *) v1;
2316                 if (v3->lmm_pool_name[0])
2317                         lod_object_set_pool(lp, v3->lmm_pool_name);
2318         }
2319         EXIT;
2320 unlock:
2321         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2322         return rc;
2323 }
2324
2325
2326 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
2327                                          struct lod_object *lp)
2328 {
2329         struct lod_thread_info  *info = lod_env_info(env);
2330         struct lmv_user_md_v1   *v1 = NULL;
2331         int                      rc;
2332         ENTRY;
2333
2334         /* called from MDD without parent being write locked,
2335          * lock it here */
2336         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
2337         rc = lod_get_default_lmv_ea(env, lp);
2338         if (rc < 0)
2339                 GOTO(unlock, rc);
2340
2341         if (rc < sizeof(struct lmv_user_md)) {
2342                 /* don't lookup for non-existing or invalid striping */
2343                 lp->ldo_dir_def_striping_set = 0;
2344                 lp->ldo_dir_striping_cached = 1;
2345                 lp->ldo_dir_def_stripenr = 0;
2346                 lp->ldo_dir_def_stripe_offset =
2347                                         (typeof(v1->lum_stripe_offset))(-1);
2348                 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
2349                 GOTO(unlock, rc = 0);
2350         }
2351
2352         rc = 0;
2353         v1 = info->lti_ea_store;
2354
2355         lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count);
2356         lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
2357         lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
2358         lp->ldo_dir_def_striping_set = 1;
2359         lp->ldo_dir_striping_cached = 1;
2360
2361         EXIT;
2362 unlock:
2363         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
2364         return rc;
2365 }
2366
2367 static int lod_cache_parent_striping(const struct lu_env *env,
2368                                      struct lod_object *lp,
2369                                      umode_t child_mode)
2370 {
2371         int rc = 0;
2372         ENTRY;
2373
2374         rc = lod_load_striping(env, lp);
2375         if (rc != 0)
2376                 RETURN(rc);
2377
2378         if (!lp->ldo_striping_cached) {
2379                 /* we haven't tried to get default striping for
2380                  * the directory yet, let's cache it in the object */
2381                 rc = lod_cache_parent_lov_striping(env, lp);
2382                 if (rc != 0)
2383                         RETURN(rc);
2384         }
2385
2386         if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
2387                 rc = lod_cache_parent_lmv_striping(env, lp);
2388
2389         RETURN(rc);
2390 }
2391
2392 /**
2393  * used to transfer default striping data to the object being created
2394  */
2395 static void lod_ah_init(const struct lu_env *env,
2396                         struct dt_allocation_hint *ah,
2397                         struct dt_object *parent,
2398                         struct dt_object *child,
2399                         umode_t child_mode)
2400 {
2401         struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
2402         struct dt_object  *nextp = NULL;
2403         struct dt_object  *nextc;
2404         struct lod_object *lp = NULL;
2405         struct lod_object *lc;
2406         struct lov_desc   *desc;
2407         int               rc;
2408         ENTRY;
2409
2410         LASSERT(child);
2411
2412         if (likely(parent)) {
2413                 nextp = dt_object_child(parent);
2414                 lp = lod_dt_obj(parent);
2415                 rc = lod_load_striping(env, lp);
2416                 if (rc != 0)
2417                         return;
2418         }
2419
2420         nextc = dt_object_child(child);
2421         lc = lod_dt_obj(child);
2422
2423         LASSERT(lc->ldo_stripenr == 0);
2424         LASSERT(lc->ldo_stripe == NULL);
2425
2426         /*
2427          * local object may want some hints
2428          * in case of late striping creation, ->ah_init()
2429          * can be called with local object existing
2430          */
2431         if (!dt_object_exists(nextc) || dt_object_remote(nextc))
2432                 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
2433                                           NULL : nextp, nextc, child_mode);
2434
2435         if (S_ISDIR(child_mode)) {
2436                 if (lc->ldo_dir_stripe == NULL) {
2437                         OBD_ALLOC_PTR(lc->ldo_dir_stripe);
2438                         if (lc->ldo_dir_stripe == NULL)
2439                                 return;
2440                 }
2441
2442                 if (lp->ldo_dir_stripe == NULL) {
2443                         OBD_ALLOC_PTR(lp->ldo_dir_stripe);
2444                         if (lp->ldo_dir_stripe == NULL)
2445                                 return;
2446                 }
2447
2448                 rc = lod_cache_parent_striping(env, lp, child_mode);
2449                 if (rc != 0)
2450                         return;
2451
2452                 /* transfer defaults to new directory */
2453                 if (lp->ldo_striping_cached) {
2454                         if (lp->ldo_pool)
2455                                 lod_object_set_pool(lc, lp->ldo_pool);
2456                         lc->ldo_def_stripenr = lp->ldo_def_stripenr;
2457                         lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
2458                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2459                         lc->ldo_striping_cached = 1;
2460                         lc->ldo_def_striping_set = 1;
2461                         CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
2462                                (int)lc->ldo_def_stripe_size,
2463                                (int)lc->ldo_def_stripe_offset,
2464                                (int)lc->ldo_def_stripenr);
2465                 }
2466
2467                 /* transfer dir defaults to new directory */
2468                 if (lp->ldo_dir_striping_cached) {
2469                         lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
2470                         lc->ldo_dir_def_stripe_offset =
2471                                                   lp->ldo_dir_def_stripe_offset;
2472                         lc->ldo_dir_def_hash_type =
2473                                                   lp->ldo_dir_def_hash_type;
2474                         lc->ldo_dir_striping_cached = 1;
2475                         lc->ldo_dir_def_striping_set = 1;
2476                         CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
2477                                (int)lc->ldo_dir_def_stripenr,
2478                                (int)lc->ldo_dir_def_stripe_offset,
2479                                lc->ldo_dir_def_hash_type);
2480                 }
2481
2482                 /* It should always honour the specified stripes */
2483                 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
2484                         const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
2485
2486                         rc = lod_verify_md_striping(d, lum1);
2487                         if (rc == 0 &&
2488                                 le32_to_cpu(lum1->lum_stripe_count) > 1) {
2489                                 /* Directory will be striped only if
2490                                  * stripe_count > 1 */
2491                                 lc->ldo_stripenr =
2492                                         le32_to_cpu(lum1->lum_stripe_count);
2493                                 lc->ldo_dir_stripe_offset =
2494                                         le32_to_cpu(lum1->lum_stripe_offset);
2495                                 lc->ldo_dir_hash_type =
2496                                         le32_to_cpu(lum1->lum_hash_type);
2497                                 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
2498                                        lc->ldo_stripenr,
2499                                        (int)lc->ldo_dir_stripe_offset);
2500                         }
2501                 /* then check whether there is default stripes from parent */
2502                 } else if (lp->ldo_dir_def_striping_set) {
2503                         /* If there are default dir stripe from parent */
2504                         lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
2505                         lc->ldo_dir_stripe_offset =
2506                                         lp->ldo_dir_def_stripe_offset;
2507                         lc->ldo_dir_hash_type =
2508                                         lp->ldo_dir_def_hash_type;
2509                         CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
2510                                lc->ldo_stripenr,
2511                                (int)lc->ldo_dir_stripe_offset);
2512                 } else {
2513                         /* set default stripe for this directory */
2514                         lc->ldo_stripenr = 0;
2515                         lc->ldo_dir_stripe_offset = -1;
2516                 }
2517
2518                 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
2519                        lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
2520
2521                 goto out;
2522         }
2523
2524         /*
2525          * if object is going to be striped over OSTs, transfer default
2526          * striping information to the child, so that we can use it
2527          * during declaration and creation
2528          */
2529         if (!lod_object_will_be_striped(S_ISREG(child_mode),
2530                                         lu_object_fid(&child->do_lu)))
2531                 goto out;
2532         /*
2533          * try from the parent
2534          */
2535         if (likely(parent)) {
2536                 lod_cache_parent_striping(env, lp, child_mode);
2537
2538                 lc->ldo_def_stripe_offset = (__u16) -1;
2539
2540                 if (lp->ldo_def_striping_set) {
2541                         if (lp->ldo_pool)
2542                                 lod_object_set_pool(lc, lp->ldo_pool);
2543                         lc->ldo_stripenr = lp->ldo_def_stripenr;
2544                         lc->ldo_stripe_size = lp->ldo_def_stripe_size;
2545                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
2546                         CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
2547                                lc->ldo_stripenr, lc->ldo_stripe_size,
2548                                lp->ldo_pool ? lp->ldo_pool : "");
2549                 }
2550         }
2551
2552         /*
2553          * if the parent doesn't provide with specific pattern, grab fs-wide one
2554          */
2555         desc = &d->lod_desc;
2556         if (lc->ldo_stripenr == 0)
2557                 lc->ldo_stripenr = desc->ld_default_stripe_count;
2558         if (lc->ldo_stripe_size == 0)
2559                 lc->ldo_stripe_size = desc->ld_default_stripe_size;
2560         CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
2561                lc->ldo_stripenr, lc->ldo_stripe_size,
2562                lc->ldo_pool ? lc->ldo_pool : "");
2563
2564 out:
2565         /* we do not cache stripe information for slave stripe, see
2566          * lod_xattr_set_lov_on_dir */
2567         if (lp != NULL && lp->ldo_dir_slave_stripe)
2568                 lod_lov_stripe_cache_clear(lp);
2569
2570         EXIT;
2571 }
2572
2573 #define ll_do_div64(aaa,bbb)    do_div((aaa), (bbb))
2574 /*
2575  * this function handles a special case when truncate was done
2576  * on a stripeless object and now striping is being created
2577  * we can't lose that size, so we have to propagate it to newly
2578  * created object
2579  */
2580 static int lod_declare_init_size(const struct lu_env *env,
2581                                  struct dt_object *dt, struct thandle *th)
2582 {
2583         struct dt_object   *next = dt_object_child(dt);
2584         struct lod_object  *lo = lod_dt_obj(dt);
2585         struct lu_attr     *attr = &lod_env_info(env)->lti_attr;
2586         uint64_t            size, offs;
2587         int                 rc, stripe;
2588         ENTRY;
2589
2590         /* XXX: we support the simplest (RAID0) striping so far */
2591         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
2592         LASSERT(lo->ldo_stripe_size > 0);
2593
2594         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
2595         LASSERT(attr->la_valid & LA_SIZE);
2596         if (rc)
2597                 RETURN(rc);
2598
2599         size = attr->la_size;
2600         if (size == 0)
2601                 RETURN(0);
2602
2603         /* ll_do_div64(a, b) returns a % b, and a = a / b */
2604         ll_do_div64(size, (__u64) lo->ldo_stripe_size);
2605         stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
2606
2607         size = size * lo->ldo_stripe_size;
2608         offs = attr->la_size;
2609         size += ll_do_div64(offs, lo->ldo_stripe_size);
2610
2611         attr->la_valid = LA_SIZE;
2612         attr->la_size = size;
2613
2614         rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
2615
2616         RETURN(rc);
2617 }
2618
2619 /**
2620  * Create declaration of striped object
2621  */
2622 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
2623                                struct lu_attr *attr,
2624                                const struct lu_buf *lovea, struct thandle *th)
2625 {
2626         struct lod_thread_info  *info = lod_env_info(env);
2627         struct dt_object        *next = dt_object_child(dt);
2628         struct lod_object       *lo = lod_dt_obj(dt);
2629         int                      rc;
2630         ENTRY;
2631
2632         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
2633                 /* failed to create striping, let's reset
2634                  * config so that others don't get confused */
2635                 lod_object_free_striping(env, lo);
2636                 GOTO(out, rc = -ENOMEM);
2637         }
2638
2639         if (!dt_object_remote(next)) {
2640                 /* choose OST and generate appropriate objects */
2641                 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
2642                 if (rc) {
2643                         /* failed to create striping, let's reset
2644                          * config so that others don't get confused */
2645                         lod_object_free_striping(env, lo);
2646                         GOTO(out, rc);
2647                 }
2648
2649                 /*
2650                  * declare storage for striping data
2651                  */
2652                 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
2653                                 lo->ldo_pool ?  LOV_MAGIC_V3 : LOV_MAGIC_V1);
2654         } else {
2655                 /* LOD can not choose OST objects for remote objects, i.e.
2656                  * stripes must be ready before that. Right now, it can only
2657                  * happen during migrate, i.e. migrate process needs to create
2658                  * remote regular file (mdd_migrate_create), then the migrate
2659                  * process will provide stripeEA. */
2660                 LASSERT(lovea != NULL);
2661                 info->lti_buf = *lovea;
2662         }
2663
2664         rc = dt_declare_xattr_set(env, next, &info->lti_buf,
2665                                   XATTR_NAME_LOV, 0, th);
2666         if (rc)
2667                 GOTO(out, rc);
2668
2669         /*
2670          * if striping is created with local object's size > 0,
2671          * we have to propagate this size to specific object
2672          * the case is possible only when local object was created previously
2673          */
2674         if (dt_object_exists(next))
2675                 rc = lod_declare_init_size(env, dt, th);
2676
2677 out:
2678         RETURN(rc);
2679 }
2680
2681 static int lod_declare_object_create(const struct lu_env *env,
2682                                      struct dt_object *dt,
2683                                      struct lu_attr *attr,
2684                                      struct dt_allocation_hint *hint,
2685                                      struct dt_object_format *dof,
2686                                      struct thandle *th)
2687 {
2688         struct dt_object   *next = dt_object_child(dt);
2689         struct lod_object  *lo = lod_dt_obj(dt);
2690         int                 rc;
2691         ENTRY;
2692
2693         LASSERT(dof);
2694         LASSERT(attr);
2695         LASSERT(th);
2696
2697         /*
2698          * first of all, we declare creation of local object
2699          */
2700         rc = dt_declare_create(env, next, attr, hint, dof, th);
2701         if (rc)
2702                 GOTO(out, rc);
2703
2704         if (dof->dof_type == DFT_SYM)
2705                 dt->do_body_ops = &lod_body_lnk_ops;
2706
2707         /*
2708          * it's lod_ah_init() who has decided the object will striped
2709          */
2710         if (dof->dof_type == DFT_REGULAR) {
2711                 /* callers don't want stripes */
2712                 /* XXX: all tricky interactions with ->ah_make_hint() decided
2713                  * to use striping, then ->declare_create() behaving differently
2714                  * should be cleaned */
2715                 if (dof->u.dof_reg.striped == 0)
2716                         lo->ldo_stripenr = 0;
2717                 if (lo->ldo_stripenr > 0)
2718                         rc = lod_declare_striped_object(env, dt, attr,
2719                                                         NULL, th);
2720         } else if (dof->dof_type == DFT_DIR) {
2721                 /* Orphan object (like migrating object) does not have
2722                  * lod_dir_stripe, see lod_ah_init */
2723                 if (lo->ldo_dir_stripe != NULL)
2724                         rc = lod_declare_dir_striping_create(env, dt, attr,
2725                                                              dof, th);
2726         }
2727 out:
2728         RETURN(rc);
2729 }
2730
2731 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
2732                         struct lu_attr *attr, struct dt_object_format *dof,
2733                         struct thandle *th)
2734 {
2735         struct lod_object *lo = lod_dt_obj(dt);
2736         int                rc = 0, i;
2737         ENTRY;
2738
2739         LASSERT(lo->ldo_striping_cached == 0);
2740
2741         /* create all underlying objects */
2742         for (i = 0; i < lo->ldo_stripenr; i++) {
2743                 LASSERT(lo->ldo_stripe[i]);
2744                 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
2745
2746                 if (rc)
2747                         break;
2748         }
2749         if (rc == 0)
2750                 rc = lod_generate_and_set_lovea(env, lo, th);
2751
2752         RETURN(rc);
2753 }
2754
2755 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
2756                              struct lu_attr *attr,
2757                              struct dt_allocation_hint *hint,
2758                              struct dt_object_format *dof, struct thandle *th)
2759 {
2760         struct dt_object   *next = dt_object_child(dt);
2761         struct lod_object  *lo = lod_dt_obj(dt);
2762         int                 rc;
2763         ENTRY;
2764
2765         /* create local object */
2766         rc = dt_create(env, next, attr, hint, dof, th);
2767         if (rc != 0)
2768                 RETURN(rc);
2769
2770         if (S_ISREG(dt->do_lu.lo_header->loh_attr) &&
2771             lo->ldo_stripe && dof->u.dof_reg.striped != 0)
2772                 rc = lod_striping_create(env, dt, attr, dof, th);
2773
2774         RETURN(rc);
2775 }
2776
2777 static int lod_declare_object_destroy(const struct lu_env *env,
2778                                       struct dt_object *dt,
2779                                       struct thandle *th)
2780 {
2781         struct dt_object   *next = dt_object_child(dt);
2782         struct lod_object  *lo = lod_dt_obj(dt);
2783         struct lod_thread_info *info = lod_env_info(env);
2784         char               *stripe_name = info->lti_key;
2785         int                 rc, i;
2786         ENTRY;
2787
2788         /*
2789          * load striping information, notice we don't do this when object
2790          * is being initialized as we don't need this information till
2791          * few specific cases like destroy, chown
2792          */
2793         rc = lod_load_striping(env, lo);
2794         if (rc)
2795                 RETURN(rc);
2796
2797         /* declare destroy for all underlying objects */
2798         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2799                 rc = next->do_ops->do_index_try(env, next,
2800                                                 &dt_directory_features);
2801                 if (rc != 0)
2802                         RETURN(rc);
2803
2804                 for (i = 0; i < lo->ldo_stripenr; i++) {
2805                         rc = dt_declare_ref_del(env, next, th);
2806                         if (rc != 0)
2807                                 RETURN(rc);
2808                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2809                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2810                                 i);
2811                         rc = dt_declare_delete(env, next,
2812                                         (const struct dt_key *)stripe_name, th);
2813                         if (rc != 0)
2814                                 RETURN(rc);
2815                 }
2816         }
2817         /*
2818          * we declare destroy for the local object
2819          */
2820         rc = dt_declare_destroy(env, next, th);
2821         if (rc)
2822                 RETURN(rc);
2823
2824         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2825                 RETURN(0);
2826
2827         /* declare destroy all striped objects */
2828         for (i = 0; i < lo->ldo_stripenr; i++) {
2829                 if (likely(lo->ldo_stripe[i] != NULL)) {
2830                         rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
2831                         if (rc != 0)
2832                                 break;
2833                 }
2834         }
2835
2836         RETURN(rc);
2837 }
2838
2839 static int lod_object_destroy(const struct lu_env *env,
2840                 struct dt_object *dt, struct thandle *th)
2841 {
2842         struct dt_object  *next = dt_object_child(dt);
2843         struct lod_object *lo = lod_dt_obj(dt);
2844         struct lod_thread_info *info = lod_env_info(env);
2845         char               *stripe_name = info->lti_key;
2846         int                rc, i;
2847         ENTRY;
2848
2849         /* destroy sub-stripe of master object */
2850         if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
2851                 rc = next->do_ops->do_index_try(env, next,
2852                                                 &dt_directory_features);
2853                 if (rc != 0)
2854                         RETURN(rc);
2855
2856                 for (i = 0; i < lo->ldo_stripenr; i++) {
2857                         rc = dt_ref_del(env, next, th);
2858                         if (rc != 0)
2859                                 RETURN(rc);
2860
2861                         snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
2862                                 PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)),
2863                                 i);
2864
2865                         CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n",
2866                                PFID(lu_object_fid(&dt->do_lu)), stripe_name,
2867                                PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)));
2868
2869                         rc = dt_delete(env, next,
2870                                        (const struct dt_key *)stripe_name,
2871                                        th, BYPASS_CAPA);
2872                         if (rc != 0)
2873                                 RETURN(rc);
2874                 }
2875         }
2876         rc = dt_destroy(env, next, th);
2877         if (rc != 0)
2878                 RETURN(rc);
2879
2880         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2881                 RETURN(0);
2882
2883         /* destroy all striped objects */
2884         for (i = 0; i < lo->ldo_stripenr; i++) {
2885                 if (likely(lo->ldo_stripe[i] != NULL) &&
2886                     (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) ||
2887                      i == cfs_fail_val)) {
2888                         rc = dt_destroy(env, lo->ldo_stripe[i], th);
2889                         if (rc != 0)
2890                                 break;
2891                 }
2892         }
2893
2894         RETURN(rc);
2895 }
2896
/**
 * Declare a reference increment; forwarded to the next layer's object.
 */
static int lod_declare_ref_add(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_declare_ref_add(env, next, th);
}
2902
/**
 * Increment the reference count; forwarded to the next layer's object.
 */
static int lod_ref_add(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_ref_add(env, next, th);
}
2908
/**
 * Declare a reference decrement; forwarded to the next layer's object.
 */
static int lod_declare_ref_del(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_declare_ref_del(env, next, th);
}
2914
/**
 * Decrement the reference count; forwarded to the next layer's object.
 */
static int lod_ref_del(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_ref_del(env, next, th);
}
2920
2921 static struct obd_capa *lod_capa_get(const struct lu_env *env,
2922                                      struct dt_object *dt,
2923                                      struct lustre_capa *old, __u64 opc)
2924 {
2925         return dt_capa_get(env, dt_object_child(dt), old, opc);
2926 }
2927
2928 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
2929                            __u64 start, __u64 end)
2930 {
2931         return dt_object_sync(env, dt_object_child(dt), start, end);
2932 }
2933
2934 struct lod_slave_locks  {
2935         int                     lsl_lock_count;
2936         struct lustre_handle    lsl_handle[0];
2937 };
2938
2939 static int lod_object_unlock_internal(const struct lu_env *env,
2940                                       struct dt_object *dt,
2941                                       struct ldlm_enqueue_info *einfo,
2942                                       ldlm_policy_data_t *policy)
2943 {
2944         struct lod_object       *lo = lod_dt_obj(dt);
2945         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
2946         int                     rc = 0;
2947         int                     i;
2948         ENTRY;
2949
2950         if (slave_locks == NULL)
2951                 RETURN(0);
2952
2953         for (i = 1; i < slave_locks->lsl_lock_count; i++) {
2954                 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
2955                         int     rc1;
2956
2957                         einfo->ei_cbdata = &slave_locks->lsl_handle[i];
2958                         rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
2959                                                policy);
2960                         if (rc1 < 0)
2961                                 rc = rc == 0 ? rc1 : rc;
2962                 }
2963         }
2964
2965         RETURN(rc);
2966 }
2967
2968 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
2969                              struct ldlm_enqueue_info *einfo,
2970                              union ldlm_policy_data *policy)
2971 {
2972         struct lod_object       *lo = lod_dt_obj(dt);
2973         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
2974         int                     slave_locks_size;
2975         int                     rc;
2976         ENTRY;
2977
2978         if (slave_locks == NULL)
2979                 RETURN(0);
2980
2981         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2982                 RETURN(-ENOTDIR);
2983
2984         rc = lod_load_striping(env, lo);
2985         if (rc != 0)
2986                 RETURN(rc);
2987
2988         /* Note: for remote lock for single stripe dir, MDT will cancel
2989          * the lock by lockh directly */
2990         if (lo->ldo_stripenr <= 1 && dt_object_remote(dt_object_child(dt)))
2991                 RETURN(0);
2992
2993         /* Only cancel slave lock for striped dir */
2994         rc = lod_object_unlock_internal(env, dt, einfo, policy);
2995
2996         slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
2997                            sizeof(slave_locks->lsl_handle[0]);
2998         OBD_FREE(slave_locks, slave_locks_size);
2999         einfo->ei_cbdata = NULL;
3000
3001         RETURN(rc);
3002 }
3003
3004 static int lod_object_lock(const struct lu_env *env,
3005                            struct dt_object *dt,
3006                            struct lustre_handle *lh,
3007                            struct ldlm_enqueue_info *einfo,
3008                            union ldlm_policy_data *policy)
3009 {
3010         struct lod_object       *lo = lod_dt_obj(dt);
3011         int                     rc = 0;
3012         int                     i;
3013         int                     slave_locks_size;
3014         struct lod_slave_locks  *slave_locks = NULL;
3015         ENTRY;
3016
3017         /* remote object lock */
3018         if (!einfo->ei_enq_slave) {
3019                 LASSERT(dt_object_remote(dt));
3020                 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
3021                                       policy);
3022         }
3023
3024         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
3025                 RETURN(-ENOTDIR);
3026
3027         rc = lod_load_striping(env, lo);
3028         if (rc != 0)
3029                 RETURN(rc);
3030
3031         /* No stripes */
3032         if (lo->ldo_stripenr <= 1)
3033                 RETURN(0);
3034
3035         slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
3036                            sizeof(slave_locks->lsl_handle[0]);
3037         /* Freed in lod_object_unlock */
3038         OBD_ALLOC(slave_locks, slave_locks_size);
3039         if (slave_locks == NULL)
3040                 RETURN(-ENOMEM);
3041         slave_locks->lsl_lock_count = lo->ldo_stripenr;
3042
3043         /* striped directory lock */
3044         for (i = 1; i < lo->ldo_stripenr; i++) {
3045                 struct lustre_handle    lockh;
3046                 struct ldlm_res_id      *res_id;
3047
3048                 res_id = &lod_env_info(env)->lti_res_id;
3049                 fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
3050                                        res_id);
3051                 einfo->ei_res_id = res_id;
3052
3053                 LASSERT(lo->ldo_stripe[i]);
3054                 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
3055                                     policy);
3056                 if (rc != 0)
3057                         GOTO(out, rc);
3058                 slave_locks->lsl_handle[i] = lockh;
3059         }
3060
3061         einfo->ei_cbdata = slave_locks;
3062
3063 out:
3064         if (rc != 0 && slave_locks != NULL) {
3065                 einfo->ei_cbdata = slave_locks;
3066                 lod_object_unlock_internal(env, dt, einfo, policy);
3067                 OBD_FREE(slave_locks, slave_locks_size);
3068                 einfo->ei_cbdata = NULL;
3069         }
3070
3071         RETURN(rc);
3072 }
3073
3074 struct dt_object_operations lod_obj_ops = {
3075         .do_read_lock           = lod_object_read_lock,
3076         .do_write_lock          = lod_object_write_lock,
3077         .do_read_unlock         = lod_object_read_unlock,
3078         .do_write_unlock        = lod_object_write_unlock,
3079         .do_write_locked        = lod_object_write_locked,
3080         .do_attr_get            = lod_attr_get,
3081         .do_declare_attr_set    = lod_declare_attr_set,
3082         .do_attr_set            = lod_attr_set,
3083         .do_xattr_get           = lod_xattr_get,
3084         .do_declare_xattr_set   = lod_declare_xattr_set,
3085         .do_xattr_set           = lod_xattr_set,
3086         .do_declare_xattr_del   = lod_declare_xattr_del,
3087         .do_xattr_del           = lod_xattr_del,
3088         .do_xattr_list          = lod_xattr_list,
3089         .do_ah_init             = lod_ah_init,
3090         .do_declare_create      = lod_declare_object_create,
3091         .do_create              = lod_object_create,
3092         .do_declare_destroy     = lod_declare_object_destroy,
3093         .do_destroy             = lod_object_destroy,
3094         .do_index_try           = lod_index_try,
3095         .do_declare_ref_add     = lod_declare_ref_add,
3096         .do_ref_add             = lod_ref_add,
3097         .do_declare_ref_del     = lod_declare_ref_del,
3098         .do_ref_del             = lod_ref_del,
3099         .do_capa_get            = lod_capa_get,
3100         .do_object_sync         = lod_object_sync,
3101         .do_object_lock         = lod_object_lock,
3102         .do_object_unlock       = lod_object_unlock,
3103 };
3104
3105 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
3106                         struct lu_buf *buf, loff_t *pos,
3107                         struct lustre_capa *capa)
3108 {
3109         struct dt_object *next = dt_object_child(dt);
3110         return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
3111 }
3112
3113 static ssize_t lod_declare_write(const struct lu_env *env,
3114                                  struct dt_object *dt,
3115                                  const struct lu_buf *buf, loff_t pos,
3116                                  struct thandle *th)
3117 {
3118         return dt_declare_record_write(env, dt_object_child(dt),
3119                                        buf, pos, th);
3120 }
3121
3122 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
3123                          const struct lu_buf *buf, loff_t *pos,
3124                          struct thandle *th, struct lustre_capa *capa, int iq)
3125 {
3126         struct dt_object *next = dt_object_child(dt);
3127         LASSERT(next);
3128         return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
3129 }
3130
3131 static const struct dt_body_operations lod_body_lnk_ops = {
3132         .dbo_read               = lod_read,
3133         .dbo_declare_write      = lod_declare_write,
3134         .dbo_write              = lod_write
3135 };
3136
3137 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
3138                            const struct lu_object_conf *conf)
3139 {
3140         struct lod_device       *lod    = lu2lod_dev(lo->lo_dev);
3141         struct lu_device        *cdev   = NULL;
3142         struct lu_object        *cobj;
3143         struct lod_tgt_descs    *ltd    = NULL;
3144         struct lod_tgt_desc     *tgt;
3145         mdsno_t                  idx    = 0;
3146         int                      type   = LU_SEQ_RANGE_ANY;
3147         int                      rc;
3148         ENTRY;
3149
3150         rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
3151         if (rc != 0)
3152                 RETURN(rc);
3153
3154         if (type == LU_SEQ_RANGE_MDT &&
3155             idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
3156                 cdev = &lod->lod_child->dd_lu_dev;
3157         } else if (type == LU_SEQ_RANGE_MDT) {
3158                 ltd = &lod->lod_mdt_descs;
3159                 lod_getref(ltd);
3160         } else if (type == LU_SEQ_RANGE_OST) {
3161                 ltd = &lod->lod_ost_descs;
3162                 lod_getref(ltd);
3163         } else {
3164                 LBUG();
3165         }
3166
3167         if (ltd != NULL) {
3168                 if (ltd->ltd_tgts_size > idx &&
3169                     cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
3170                         tgt = LTD_TGT(ltd, idx);
3171
3172                         LASSERT(tgt != NULL);
3173                         LASSERT(tgt->ltd_tgt != NULL);
3174
3175                         cdev = &(tgt->ltd_tgt->dd_lu_dev);
3176                 }
3177                 lod_putref(lod, ltd);
3178         }
3179
3180         if (unlikely(cdev == NULL))
3181                 RETURN(-ENOENT);
3182
3183         cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
3184         if (unlikely(cobj == NULL))
3185                 RETURN(-ENOMEM);
3186
3187         lu_object_add(lo, cobj);
3188
3189         RETURN(0);
3190 }
3191
3192 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
3193 {
3194         int i;
3195
3196         if (lo->ldo_dir_stripe != NULL) {
3197                 OBD_FREE_PTR(lo->ldo_dir_stripe);
3198                 lo->ldo_dir_stripe = NULL;
3199         }
3200
3201         if (lo->ldo_stripe) {
3202                 LASSERT(lo->ldo_stripes_allocated > 0);
3203
3204                 for (i = 0; i < lo->ldo_stripenr; i++) {
3205                         if (lo->ldo_stripe[i])
3206                                 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
3207                 }
3208
3209                 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
3210                 OBD_FREE(lo->ldo_stripe, i);
3211                 lo->ldo_stripe = NULL;
3212                 lo->ldo_stripes_allocated = 0;
3213         }
3214         lo->ldo_stripenr = 0;
3215         lo->ldo_pattern = 0;
3216 }
3217
3218 /*
3219  * ->start is called once all slices are initialized, including header's
3220  * cache for mode (object type). using the type we can initialize ops
3221  */
3222 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
3223 {
3224         if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
3225                 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
3226         return 0;
3227 }
3228
3229 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
3230 {
3231         struct lod_object *mo = lu2lod_obj(o);
3232
3233         /*
3234          * release all underlying object pinned
3235          */
3236
3237         lod_object_free_striping(env, mo);
3238
3239         lod_object_set_pool(mo, NULL);
3240
3241         lu_object_fini(o);
3242         OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
3243 }
3244
/*
 * Release hook of the LOD slice.  It currently does nothing.
 */
static void lod_object_release(const struct lu_env *env, struct lu_object *o)
{
        /* XXX: should whatever a failed object creation left behind be
         * released here? */
}
3250
3251 static int lod_object_print(const struct lu_env *env, void *cookie,
3252                             lu_printer_t p, const struct lu_object *l)
3253 {
3254         struct lod_object *o = lu2lod_obj((struct lu_object *) l);
3255
3256         return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
3257 }
3258
3259 struct lu_object_operations lod_lu_obj_ops = {
3260         .loo_object_init        = lod_object_init,
3261         .loo_object_start       = lod_object_start,
3262         .loo_object_free        = lod_object_free,
3263         .loo_object_release     = lod_object_release,
3264         .loo_object_print       = lod_object_print,
3265 };