Whamcloud - gitweb
LU-3336 lfsck: regenerate lost layout EA
[fs/lustre-release.git] / lustre / lod / lod_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2013, Intel Corporation.
27  */
28 /*
29  * lustre/lod/lod_object.c
30  *
31  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
32  */
33
34 #define DEBUG_SUBSYSTEM S_MDS
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
41
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46 #include <obd_lov.h>
47 #include <md_object.h>
48
49 #include "lod_internal.h"
50
/* Directory entry names "." and ".."; in this file they are cast to
 * struct dt_key and used as index keys when declaring the self and
 * parent entries of slave stripe directories. */
51 static const char dot[] = ".";
52 static const char dotdot[] = "..";
53
/* Defined outside this file; presumably the allocation cache for LOD
 * objects (judging by the name) -- TODO confirm against its definition. */
54 extern struct kmem_cache *lod_object_kmem;
/* Forward declaration only; the definition is not in this part of the
 * file. */
55 static const struct dt_body_operations lod_body_lnk_ops;
56
57 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
58                             struct dt_rec *rec, const struct dt_key *key,
59                             struct lustre_capa *capa)
60 {
61         struct dt_object *next = dt_object_child(dt);
62         return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
63 }
64
/**
 * Declare an index insertion.
 *
 * Forwards the declaration to the child object via dt_declare_insert().
 *
 * \param[in] env     execution environment
 * \param[in] dt      LOD index object
 * \param[in] rec     record to be inserted
 * \param[in] key     key to be inserted
 * \param[in] handle  transaction handle
 *
 * \retval the value returned by dt_declare_insert() on the child
 */
static int lod_declare_index_insert(const struct lu_env *env,
                                    struct dt_object *dt,
                                    const struct dt_rec *rec,
                                    const struct dt_key *key,
                                    struct thandle *handle)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_declare_insert(env, child, rec, key, handle);
}
73
/**
 * Insert a (key, record) pair into an index object.
 *
 * Forwards the insertion to the child object via dt_insert().
 *
 * \param[in] env   execution environment
 * \param[in] dt    LOD index object
 * \param[in] rec   record to insert
 * \param[in] key   key to insert
 * \param[in] th    transaction handle
 * \param[in] capa  capability, passed through unchanged
 * \param[in] ign   passed through unchanged to dt_insert()
 *
 * \retval the value returned by dt_insert() on the child
 */
static int lod_index_insert(const struct lu_env *env,
                            struct dt_object *dt,
                            const struct dt_rec *rec,
                            const struct dt_key *key,
                            struct thandle *th,
                            struct lustre_capa *capa,
                            int ign)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_insert(env, child, rec, key, th, capa, ign);
}
84
/**
 * Declare an index deletion.
 *
 * Forwards the declaration to the child object via dt_declare_delete().
 *
 * \param[in] env  execution environment
 * \param[in] dt   LOD index object
 * \param[in] key  key to be deleted
 * \param[in] th   transaction handle
 *
 * \retval the value returned by dt_declare_delete() on the child
 */
static int lod_declare_index_delete(const struct lu_env *env,
                                    struct dt_object *dt,
                                    const struct dt_key *key,
                                    struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_declare_delete(env, child, key, th);
}
92
/**
 * Delete a key from an index object.
 *
 * Forwards the deletion to the child object via dt_delete().
 *
 * \param[in] env   execution environment
 * \param[in] dt    LOD index object
 * \param[in] key   key to delete
 * \param[in] th    transaction handle
 * \param[in] capa  capability, passed through unchanged
 *
 * \retval the value returned by dt_delete() on the child
 */
static int lod_index_delete(const struct lu_env *env,
                            struct dt_object *dt,
                            const struct dt_key *key,
                            struct thandle *th,
                            struct lustre_capa *capa)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_delete(env, child, key, th, capa);
}
101
102 static struct dt_it *lod_it_init(const struct lu_env *env,
103                                  struct dt_object *dt, __u32 attr,
104                                  struct lustre_capa *capa)
105 {
106         struct dt_object        *next = dt_object_child(dt);
107         struct lod_it           *it = &lod_env_info(env)->lti_it;
108         struct dt_it            *it_next;
109
110
111         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
112         if (IS_ERR(it_next))
113                 return it_next;
114
115         /* currently we do not use more than one iterator per thread
116          * so we store it in thread info. if at some point we need
117          * more active iterators in a single thread, we can allocate
118          * additional ones */
119         LASSERT(it->lit_obj == NULL);
120
121         it->lit_it = it_next;
122         it->lit_obj = next;
123
124         return (struct dt_it *)it;
125 }
126
/*
 * Sanity-check a LOD iterator before it is used: both the wrapped child
 * object (lit_obj) and the wrapped child iterator (lit_it) must be set.
 *
 * \a env is accepted so that call sites read uniformly; the macro does
 * not use it.
 *
 * The body is wrapped in do { } while (0) so that the macro expands to
 * exactly one statement and stays correct when used as the unbraced body
 * of an if/else.
 */
#define LOD_CHECK_IT(env, it)                                   \
do {                                                            \
        LASSERT((it)->lit_obj != NULL);                         \
        LASSERT((it)->lit_it != NULL);                          \
} while (0)
132
133 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
134 {
135         struct lod_it *it = (struct lod_it *)di;
136
137         LOD_CHECK_IT(env, it);
138         it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
139
140         /* the iterator not in use any more */
141         it->lit_obj = NULL;
142         it->lit_it = NULL;
143 }
144
145 int lod_it_get(const struct lu_env *env, struct dt_it *di,
146                const struct dt_key *key)
147 {
148         const struct lod_it *it = (const struct lod_it *)di;
149
150         LOD_CHECK_IT(env, it);
151         return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
152 }
153
154 void lod_it_put(const struct lu_env *env, struct dt_it *di)
155 {
156         struct lod_it *it = (struct lod_it *)di;
157
158         LOD_CHECK_IT(env, it);
159         return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
160 }
161
162 int lod_it_next(const struct lu_env *env, struct dt_it *di)
163 {
164         struct lod_it *it = (struct lod_it *)di;
165
166         LOD_CHECK_IT(env, it);
167         return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
168 }
169
170 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
171 {
172         const struct lod_it *it = (const struct lod_it *)di;
173
174         LOD_CHECK_IT(env, it);
175         return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
176 }
177
178 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
179 {
180         struct lod_it *it = (struct lod_it *)di;
181
182         LOD_CHECK_IT(env, it);
183         return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
184 }
185
186 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
187                struct dt_rec *rec, __u32 attr)
188 {
189         const struct lod_it *it = (const struct lod_it *)di;
190
191         LOD_CHECK_IT(env, it);
192         return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
193 }
194
195 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
196 {
197         const struct lod_it *it = (const struct lod_it *)di;
198
199         LOD_CHECK_IT(env, it);
200         return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
201 }
202
203 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
204 {
205         const struct lod_it *it = (const struct lod_it *)di;
206
207         LOD_CHECK_IT(env, it);
208         return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
209 }
210
211 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
212                    void* key_rec)
213 {
214         const struct lod_it *it = (const struct lod_it *)di;
215
216         LOD_CHECK_IT(env, it);
217         return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it, key_rec);
218 }
219
220 static struct dt_index_operations lod_index_ops = {
221         .dio_lookup             = lod_index_lookup,
222         .dio_declare_insert     = lod_declare_index_insert,
223         .dio_insert             = lod_index_insert,
224         .dio_declare_delete     = lod_declare_index_delete,
225         .dio_delete             = lod_index_delete,
226         .dio_it = {
227                 .init           = lod_it_init,
228                 .fini           = lod_it_fini,
229                 .get            = lod_it_get,
230                 .put            = lod_it_put,
231                 .next           = lod_it_next,
232                 .key            = lod_it_key,
233                 .key_size       = lod_it_key_size,
234                 .rec            = lod_it_rec,
235                 .store          = lod_it_store,
236                 .load           = lod_it_load,
237                 .key_rec        = lod_it_key_rec,
238         }
239 };
240
/**
 * Take the read lock of the child object via dt_read_lock().
 *
 * \param[in] env   execution environment
 * \param[in] dt    LOD object
 * \param[in] role  passed through unchanged to dt_read_lock()
 */
static void lod_object_read_lock(const struct lu_env *env,
                                 struct dt_object *dt, unsigned role)
{
        struct dt_object *child = dt_object_child(dt);

        dt_read_lock(env, child, role);
}
246
/**
 * Take the write lock of the child object via dt_write_lock().
 *
 * \param[in] env   execution environment
 * \param[in] dt    LOD object
 * \param[in] role  passed through unchanged to dt_write_lock()
 */
static void lod_object_write_lock(const struct lu_env *env,
                                  struct dt_object *dt, unsigned role)
{
        struct dt_object *child = dt_object_child(dt);

        dt_write_lock(env, child, role);
}
252
/**
 * Drop the read lock of the child object via dt_read_unlock().
 *
 * \param[in] env  execution environment
 * \param[in] dt   LOD object
 */
static void lod_object_read_unlock(const struct lu_env *env,
                                   struct dt_object *dt)
{
        struct dt_object *child = dt_object_child(dt);

        dt_read_unlock(env, child);
}
258
/**
 * Drop the write lock of the child object via dt_write_unlock().
 *
 * \param[in] env  execution environment
 * \param[in] dt   LOD object
 */
static void lod_object_write_unlock(const struct lu_env *env,
                                    struct dt_object *dt)
{
        struct dt_object *child = dt_object_child(dt);

        dt_write_unlock(env, child);
}
264
/**
 * Query the write-lock state of the child object via dt_write_locked().
 *
 * \param[in] env  execution environment
 * \param[in] dt   LOD object
 *
 * \retval the value returned by dt_write_locked() on the child
 */
static int lod_object_write_locked(const struct lu_env *env,
                                   struct dt_object *dt)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_write_locked(env, child);
}
270
/**
 * Read the attributes of an object.
 *
 * Forwards the request to the child object via dt_attr_get().
 *
 * \param[in]  env   execution environment
 * \param[in]  dt    LOD object
 * \param[out] attr  attribute buffer handed to the child
 * \param[in]  capa  capability, passed through unchanged
 *
 * \retval the value returned by dt_attr_get() on the child
 */
static int lod_attr_get(const struct lu_env *env,
                        struct dt_object *dt,
                        struct lu_attr *attr,
                        struct lustre_capa *capa)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_attr_get(env, child, attr, capa);
}
278
279 static int lod_declare_attr_set(const struct lu_env *env,
280                                 struct dt_object *dt,
281                                 const struct lu_attr *attr,
282                                 struct thandle *handle)
283 {
284         struct dt_object  *next = dt_object_child(dt);
285         struct lod_object *lo = lod_dt_obj(dt);
286         int                rc, i;
287         ENTRY;
288
289         /*
290          * declare setattr on the local object
291          */
292         rc = dt_declare_attr_set(env, next, attr, handle);
293         if (rc)
294                 RETURN(rc);
295
296         /* osp_declare_attr_set() ignores all attributes other than
297          * UID, GID, and size, and osp_attr_set() ignores all but UID
298          * and GID.  Declaration of size attr setting happens through
299          * lod_declare_init_size(), and not through this function.
300          * Therefore we need not load striping unless ownership is
301          * changing.  This should save memory and (we hope) speed up
302          * rename(). */
303         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
304                 if (!(attr->la_valid & (LA_UID | LA_GID)))
305                         RETURN(rc);
306
307                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
308                         RETURN(0);
309         } else {
310                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
311                                         LA_ATIME | LA_MTIME | LA_CTIME)))
312                         RETURN(rc);
313         }
314         /*
315          * load striping information, notice we don't do this when object
316          * is being initialized as we don't need this information till
317          * few specific cases like destroy, chown
318          */
319         rc = lod_load_striping(env, lo);
320         if (rc)
321                 RETURN(rc);
322
323         if (lo->ldo_stripenr == 0)
324                 RETURN(0);
325
326         if (!(attr->la_valid & ~(LA_ATIME | LA_MTIME | LA_CTIME))) {
327                 struct lu_attr   *la = &lod_env_info(env)->lti_attr;
328                 bool             setattr_time = false;
329
330                 rc = dt_attr_get(env, dt_object_child(dt), la,
331                                  BYPASS_CAPA);
332                 if (rc != 0)
333                         RETURN(rc);
334
335                 /* If it will only setattr time, it will only set
336                  * time < current_time */
337                 if ((attr->la_valid & LA_ATIME &&
338                      attr->la_atime < la->la_atime) ||
339                     (attr->la_valid & LA_CTIME &&
340                      attr->la_ctime < la->la_ctime) ||
341                     (attr->la_valid & LA_MTIME &&
342                      attr->la_mtime < la->la_mtime))
343                         setattr_time = true;
344
345                 if (!setattr_time)
346                         RETURN(0);
347         }
348         /*
349          * if object is striped declare changes on the stripes
350          */
351         LASSERT(lo->ldo_stripe);
352         for (i = 0; i < lo->ldo_stripenr; i++) {
353                 LASSERT(lo->ldo_stripe[i]);
354
355                 rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle);
356                 if (rc) {
357                         CERROR("failed declaration: %d\n", rc);
358                         break;
359                 }
360         }
361
362         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
363             dt_object_exists(next) != 0 &&
364             dt_object_remote(next) == 0)
365                 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
366
367         RETURN(rc);
368 }
369
370 static int lod_attr_set(const struct lu_env *env,
371                         struct dt_object *dt,
372                         const struct lu_attr *attr,
373                         struct thandle *handle,
374                         struct lustre_capa *capa)
375 {
376         struct dt_object  *next = dt_object_child(dt);
377         struct lod_object *lo = lod_dt_obj(dt);
378         int                rc, i;
379         ENTRY;
380
381         /*
382          * apply changes to the local object
383          */
384         rc = dt_attr_set(env, next, attr, handle, capa);
385         if (rc)
386                 RETURN(rc);
387
388         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
389                 if (!(attr->la_valid & (LA_UID | LA_GID)))
390                         RETURN(rc);
391
392                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
393                         RETURN(0);
394         } else {
395                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
396                                         LA_ATIME | LA_MTIME | LA_CTIME)))
397                         RETURN(rc);
398         }
399
400         if (lo->ldo_stripenr == 0)
401                 RETURN(0);
402
403         if (!(attr->la_valid & ~(LA_ATIME | LA_MTIME | LA_CTIME))) {
404                 struct lu_attr   *la = &lod_env_info(env)->lti_attr;
405                 bool             setattr_time = false;
406
407                 rc = dt_attr_get(env, dt_object_child(dt), la,
408                                  BYPASS_CAPA);
409                 if (rc != 0)
410                         RETURN(rc);
411
412                 /* If it will only setattr time, it will only set
413                  * time < current_time */
414                 if ((attr->la_valid & LA_ATIME &&
415                      attr->la_atime < la->la_atime) ||
416                     (attr->la_valid & LA_CTIME &&
417                      attr->la_ctime < la->la_ctime) ||
418                     (attr->la_valid & LA_MTIME &&
419                      attr->la_mtime < la->la_mtime))
420                         setattr_time = true;
421
422                 if (!setattr_time)
423                         RETURN(0);
424         }
425
426         /*
427          * if object is striped, apply changes to all the stripes
428          */
429         LASSERT(lo->ldo_stripe);
430         for (i = 0; i < lo->ldo_stripenr; i++) {
431                 LASSERT(lo->ldo_stripe[i]);
432                 rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
433                 if (rc) {
434                         CERROR("failed declaration: %d\n", rc);
435                         break;
436                 }
437         }
438
439         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
440             dt_object_exists(next) != 0 &&
441             dt_object_remote(next) == 0)
442                 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
443
444         RETURN(rc);
445 }
446
447 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
448                          struct lu_buf *buf, const char *name,
449                          struct lustre_capa *capa)
450 {
451         struct lod_thread_info  *info = lod_env_info(env);
452         struct lod_device       *dev = lu2lod_dev(dt->do_lu.lo_dev);
453         int                      rc, is_root;
454         ENTRY;
455
456         rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
457         if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
458                 RETURN(rc);
459
460         /*
461          * lod returns default striping on the real root of the device
462          * this is like the root stores default striping for the whole
463          * filesystem. historically we've been using a different approach
464          * and store it in the config.
465          */
466         dt_root_get(env, dev->lod_child, &info->lti_fid);
467         is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
468
469         if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
470                 struct lov_user_md *lum = buf->lb_buf;
471                 struct lov_desc    *desc = &dev->lod_desc;
472
473                 if (buf->lb_buf == NULL) {
474                         rc = sizeof(*lum);
475                 } else if (buf->lb_len >= sizeof(*lum)) {
476                         lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
477                         lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
478                         lmm_oi_set_id(&lum->lmm_oi, 0);
479                         lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
480                         lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
481                         lum->lmm_stripe_size = cpu_to_le32(
482                                                 desc->ld_default_stripe_size);
483                         lum->lmm_stripe_count = cpu_to_le16(
484                                                 desc->ld_default_stripe_count);
485                         lum->lmm_stripe_offset = cpu_to_le16(
486                                                 desc->ld_default_stripe_offset);
487                         rc = sizeof(*lum);
488                 } else {
489                         rc = -ERANGE;
490                 }
491         }
492
493         RETURN(rc);
494 }
495
496 static int lod_verify_md_striping(struct lod_device *lod,
497                                   const struct lmv_user_md_v1 *lum)
498 {
499         int     rc = 0;
500         ENTRY;
501
502         if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
503                 GOTO(out, rc = -EINVAL);
504
505         if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
506                 GOTO(out, rc = -EINVAL);
507
508         if (unlikely(le32_to_cpu(lum->lum_stripe_count) >
509                                 lod->lod_remote_mdt_count + 1))
510                 GOTO(out, rc = -EINVAL);
511 out:
512         if (rc != 0)
513                 CERROR("%s: invalid lmv_user_md: magic = %x, "
514                        "stripe_offset = %d, stripe_count = %u: rc = %d\n",
515                        lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
516                        (int)le32_to_cpu(lum->lum_stripe_offset),
517                        le32_to_cpu(lum->lum_stripe_count), rc);
518         return rc;
519 }
520
521 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
522                     struct lu_buf *lmv_buf)
523 {
524         struct lod_thread_info  *info = lod_env_info(env);
525         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
526         struct lod_object       *lo = lod_dt_obj(dt);
527         struct lmv_mds_md_v1    *lmm1;
528         int                     stripe_count;
529         int                     lmm_size;
530         int                     type = LU_SEQ_RANGE_ANY;
531         int                     i;
532         int                     rc;
533         __u32                   mdtidx;
534         ENTRY;
535
536         LASSERT(lo->ldo_dir_striped != 0);
537         LASSERT(lo->ldo_stripenr > 0);
538         stripe_count = lo->ldo_stripenr + 1;
539         lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
540         if (info->lti_ea_store_size < lmm_size) {
541                 rc = lod_ea_store_resize(info, lmm_size);
542                 if (rc != 0)
543                         RETURN(rc);
544         }
545
546         lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
547         lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
548         lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
549         lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
550         rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
551                             &mdtidx, &type);
552         if (rc != 0)
553                 RETURN(rc);
554
555         lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
556         fid_cpu_to_le(&lmm1->lmv_stripe_fids[0], lu_object_fid(&dt->do_lu));
557         for (i = 0; i < lo->ldo_stripenr; i++) {
558                 struct dt_object *dto;
559
560                 dto = lo->ldo_stripe[i];
561                 LASSERT(dto != NULL);
562                 fid_cpu_to_le(&lmm1->lmv_stripe_fids[i + 1],
563                               lu_object_fid(&dto->do_lu));
564         }
565
566         lmv_buf->lb_buf = info->lti_ea_store;
567         lmv_buf->lb_len = lmm_size;
568         lo->ldo_dir_striping_cached = 1;
569
570         RETURN(rc);
571 }
572
573 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
574                            const struct lu_buf *buf)
575 {
576         struct lod_thread_info  *info = lod_env_info(env);
577         struct lod_device       *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
578         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
579         struct dt_object        **stripe;
580         union lmv_mds_md        *lmm = buf->lb_buf;
581         struct lmv_mds_md_v1    *lmv1 = &lmm->lmv_md_v1;
582         struct lu_fid           *fid = &info->lti_fid;
583         int                     i;
584         int                     rc = 0;
585         ENTRY;
586
587         if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
588                 RETURN(-EINVAL);
589
590         if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1)
591                 RETURN(0);
592
593         fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[0]);
594         /* Do not load striping information for slave inode */
595         if (!lu_fid_eq(fid, lu_object_fid(&lo->ldo_obj.do_lu))) {
596                 lo->ldo_dir_slave_stripe = 1;
597                 RETURN(0);
598         }
599
600         LASSERT(lo->ldo_stripe == NULL);
601         OBD_ALLOC(stripe, sizeof(stripe[0]) *
602                   (le32_to_cpu(lmv1->lmv_stripe_count) - 1));
603         if (stripe == NULL)
604                 RETURN(-ENOMEM);
605
606         /* skip master stripe */
607         for (i = 1; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
608                 struct lod_tgt_desc     *tgt;
609                 int                     idx;
610                 int                     type = LU_SEQ_RANGE_ANY;
611                 struct dt_object        *dto;
612
613                 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
614                 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
615                 if (rc != 0)
616                         GOTO(out, rc);
617
618                 tgt = LTD_TGT(ltd, idx);
619                 if (tgt == NULL)
620                         GOTO(out, rc = -ESTALE);
621
622                 dto = dt_locate_at(env, tgt->ltd_tgt, fid,
623                                   lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
624                                   NULL);
625                 if (IS_ERR(dto))
626                         GOTO(out, rc = PTR_ERR(dto));
627
628                 stripe[i - 1] = dto;
629         }
630 out:
631         lo->ldo_stripe = stripe;
632         lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count) - 1;
633         lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count) - 1;
634         if (rc != 0)
635                 lod_object_free_striping(env, lo);
636
637         RETURN(rc);
638 }
639
640 static int lod_prep_md_striped_create(const struct lu_env *env,
641                                       struct dt_object *dt,
642                                       struct lu_attr *attr,
643                                       const struct lmv_user_md_v1 *lum,
644                                       struct thandle *th)
645 {
646         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
647         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
648         struct lod_object       *lo = lod_dt_obj(dt);
649         struct dt_object        **stripe;
650         struct lu_buf           lmv_buf;
651         int                     stripe_count;
652         int                     *idx_array;
653         int                     rc = 0;
654         int                     i;
655         int                     j;
656         ENTRY;
657
658         /* The lum has been verified in lod_verify_md_striping */
659         LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
660         LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
661
662         /* Do not need allocated master stripe */
663         stripe_count = le32_to_cpu(lum->lum_stripe_count);
664         OBD_ALLOC(stripe, sizeof(stripe[0]) * (stripe_count - 1));
665         if (stripe == NULL)
666                 RETURN(-ENOMEM);
667
668         OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
669         if (idx_array == NULL)
670                 GOTO(out_free, rc = -ENOMEM);
671
672         idx_array[0] = le32_to_cpu(lum->lum_stripe_offset);
673         for (i = 1; i < stripe_count; i++) {
674                 struct lod_tgt_desc     *tgt;
675                 struct dt_object        *dto;
676                 struct lu_fid           fid;
677                 int                     idx;
678                 struct lu_object_conf   conf = { 0 };
679
680                 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
681
682                 for (j = 0; j < lod->lod_remote_mdt_count;
683                      j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
684                         bool already_allocated = false;
685                         int k;
686
687                         CDEBUG(D_INFO, "try idx %d, mdt cnt %d,"
688                                " allocated %d, last allocated %d\n", idx,
689                                lod->lod_remote_mdt_count, i, idx_array[i - 1]);
690
691                         /* Find next available target */
692                         if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
693                                 continue;
694
695                         /* check whether the idx already exists
696                          * in current allocated array */
697                         for (k = 0; k < i; k++) {
698                                 if (idx_array[k] == idx) {
699                                         already_allocated = true;
700                                         break;
701                                 }
702                         }
703
704                         if (already_allocated)
705                                 continue;
706
707                         break;
708                 }
709
710                 /* Can not allocate more stripes */
711                 if (j == lod->lod_remote_mdt_count) {
712                         CDEBUG(D_INFO, "%s: require stripes %d only get %d\n",
713                                lod2obd(lod)->obd_name, stripe_count, i - 1);
714                         break;
715                 }
716
717                 CDEBUG(D_INFO, "idx %d, mdt cnt %d,"
718                        " allocated %d, last allocated %d\n", idx,
719                        lod->lod_remote_mdt_count, i, idx_array[i - 1]);
720
721                 tgt = LTD_TGT(ltd, idx);
722                 LASSERT(tgt != NULL);
723
724                 rc = obd_fid_alloc(tgt->ltd_exp, &fid, NULL);
725                 if (rc < 0)
726                         GOTO(out_put, rc);
727                 rc = 0;
728
729                 conf.loc_flags = LOC_F_NEW;
730                 dto = dt_locate_at(env, tgt->ltd_tgt, &fid,
731                                   dt->do_lu.lo_dev->ld_site->ls_top_dev, &conf);
732                 if (IS_ERR(dto))
733                         GOTO(out_put, rc = PTR_ERR(dto));
734                 stripe[i - 1] = dto;
735                 idx_array[i] = idx;
736         }
737
738         lo->ldo_dir_striped = 1;
739         lo->ldo_stripe = stripe;
740         lo->ldo_stripenr = i - 1;
741         lo->ldo_stripes_allocated = stripe_count - 1;
742
743         if (lo->ldo_stripenr == 0)
744                 GOTO(out_put, rc = -ENOSPC);
745
746         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
747         if (rc != 0)
748                 GOTO(out_put, rc);
749
750         for (i = 0; i < lo->ldo_stripenr; i++) {
751                 struct dt_object *dto;
752
753                 dto = stripe[i];
754                 /* only create slave striped object */
755                 rc = dt_declare_create(env, dto, attr, NULL, NULL, th);
756                 if (rc != 0)
757                         GOTO(out_put, rc);
758
759                 if (!dt_try_as_dir(env, dto))
760                         GOTO(out_put, rc = -EINVAL);
761
762                 rc = dt_declare_insert(env, dto,
763                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
764                      (const struct dt_key *)dot, th);
765                 if (rc != 0)
766                         GOTO(out_put, rc);
767
768                 /* master stripe FID will be put to .. */
769                 rc = dt_declare_insert(env, dto,
770                      (const struct dt_rec *)lu_object_fid(&dt->do_lu),
771                      (const struct dt_key *)dotdot, th);
772                 if (rc != 0)
773                         GOTO(out_put, rc);
774
775                 /* probably nothing to inherit */
776                 if (lo->ldo_striping_cached &&
777                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
778                                          lo->ldo_def_stripenr,
779                                          lo->ldo_def_stripe_offset)) {
780                         struct lod_thread_info  *info;
781                         struct lov_user_md_v3   *v3;
782
783                         /* sigh, lti_ea_store has been used for lmv_buf,
784                          * so we have to allocate buffer for default
785                          * stripe EA */
786                         OBD_ALLOC_PTR(v3);
787                         if (v3 == NULL)
788                                 GOTO(out_put, rc = -ENOMEM);
789
790                         memset(v3, 0, sizeof(*v3));
791                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
792                         v3->lmm_stripe_count =
793                                 cpu_to_le32(lo->ldo_def_stripenr);
794                         v3->lmm_stripe_offset =
795                                 cpu_to_le32(lo->ldo_def_stripe_offset);
796                         v3->lmm_stripe_size =
797                                 cpu_to_le32(lo->ldo_def_stripe_size);
798                         if (lo->ldo_pool)
799                                 strncpy(v3->lmm_pool_name, lo->ldo_pool,
800                                         LOV_MAXPOOLNAME);
801
802                         info = lod_env_info(env);
803                         info->lti_buf.lb_buf = v3;
804                         info->lti_buf.lb_len = sizeof(*v3);
805                         rc = dt_declare_xattr_set(env, dto,
806                                                   &info->lti_buf,
807                                                   XATTR_NAME_LOV,
808                                                   0, th);
809                         OBD_FREE_PTR(v3);
810                         if (rc != 0)
811                                 GOTO(out_put, rc);
812                 }
813                 rc = dt_declare_xattr_set(env, dto, &lmv_buf, XATTR_NAME_LMV, 0,
814                                           th);
815                 if (rc != 0)
816                         GOTO(out_put, rc);
817         }
818
819         rc = dt_declare_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, 0, th);
820         if (rc != 0)
821                 GOTO(out_put, rc);
822
823 out_put:
824         if (rc < 0) {
825                 for (i = 0; i < stripe_count - 1; i++)
826                         if (stripe[i] != NULL)
827                                 lu_object_put(env, &stripe[i]->do_lu);
828                 OBD_FREE(stripe, sizeof(stripe[0]) * (stripe_count - 1));
829         }
830
831 out_free:
832         if (idx_array != NULL)
833                 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
834
835         RETURN(rc);
836 }
837
838 /**
839  * Declare create striped md object.
840  */
841 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
842                                      struct dt_object *dt,
843                                      struct lu_attr *attr,
844                                      const struct lu_buf *lum_buf,
845                                      struct thandle *th)
846 {
847         struct lod_object       *lo = lod_dt_obj(dt);
848         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
849         struct lmv_user_md_v1   *lum;
850         int                     rc;
851         ENTRY;
852
853         lum = lum_buf->lb_buf;
854         LASSERT(lum != NULL);
855
856         CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
857                le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
858                (int)le32_to_cpu(lum->lum_stripe_offset));
859
860         if (le32_to_cpu(lum->lum_stripe_count) <= 1)
861                 GOTO(out, rc = 0);
862
863         rc = lod_verify_md_striping(lod, lum);
864         if (rc != 0)
865                 GOTO(out, rc);
866
867         /* prepare dir striped objects */
868         rc = lod_prep_md_striped_create(env, dt, attr, lum, th);
869         if (rc != 0) {
870                 /* failed to create striping, let's reset
871                  * config so that others don't get confused */
872                 lod_object_free_striping(env, lo);
873                 GOTO(out, rc);
874         }
875 out:
876         RETURN(rc);
877 }
878
879 /*
880  * LOV xattr is a storage for striping, and LOD owns this xattr.
881  * but LOD allows others to control striping to some extent
882  * - to reset strping
883  * - to set new defined striping
884  * - to set new semi-defined striping
885  *   - number of stripes is defined
886  *   - number of stripes + osts are defined
887  *   - ??
888  */
889 static int lod_declare_xattr_set(const struct lu_env *env,
890                                  struct dt_object *dt,
891                                  const struct lu_buf *buf,
892                                  const char *name, int fl,
893                                  struct thandle *th)
894 {
895         struct dt_object *next = dt_object_child(dt);
896         struct lu_attr   *attr = &lod_env_info(env)->lti_attr;
897         __u32             mode;
898         int               rc;
899         ENTRY;
900
901         /*
902          * allow to declare predefined striping on a new (!mode) object
903          * which is supposed to be replay of regular file creation
904          * (when LOV setting is declared)
905          * LU_XATTR_REPLACE is set to indicate a layout swap
906          */
907         mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
908         if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
909              !(fl & LU_XATTR_REPLACE)) {
910                 /*
911                  * this is a request to manipulate object's striping
912                  */
913                 if (dt_object_exists(dt)) {
914                         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
915                         if (rc)
916                                 RETURN(rc);
917                 } else {
918                         memset(attr, 0, sizeof(*attr));
919                         attr->la_valid = LA_TYPE | LA_MODE;
920                         attr->la_mode = S_IFREG;
921                 }
922                 rc = lod_declare_striped_object(env, dt, attr, buf, th);
923         } else if (S_ISDIR(mode)) {
924                 struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
925                 struct lod_object       *lo = lod_dt_obj(dt);
926                 int                     i;
927
928                 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
929                         struct lmv_user_md_v1 *lum;
930
931                         LASSERT(buf != NULL && buf->lb_buf != NULL);
932                         lum = buf->lb_buf;
933                         rc = lod_verify_md_striping(d, lum);
934                         if (rc != 0)
935                                 RETURN(rc);
936                 }
937
938                 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
939                 if (rc != 0)
940                         RETURN(rc);
941
942                 /* set xattr to each stripes, if needed */
943                 rc = lod_load_striping(env, lo);
944                 if (rc != 0)
945                         RETURN(rc);
946
947                 if (lo->ldo_stripenr == 0)
948                         RETURN(rc);
949
950                 for (i = 0; i < lo->ldo_stripenr; i++) {
951                         LASSERT(lo->ldo_stripe[i]);
952                         rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
953                                                   name, fl, th);
954                         if (rc != 0)
955                                 break;
956                 }
957         } else {
958                 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
959         }
960
961         RETURN(rc);
962 }
963
964 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
965 {
966         lo->ldo_striping_cached = 0;
967         lo->ldo_def_striping_set = 0;
968         lod_object_set_pool(lo, NULL);
969         lo->ldo_def_stripe_size = 0;
970         lo->ldo_def_stripenr = 0;
971 }
972
973 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
974                                     struct dt_object *dt,
975                                     const struct lu_buf *buf,
976                                     const char *name, int fl,
977                                     struct thandle *th,
978                                     struct lustre_capa *capa)
979 {
980         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
981         struct dt_object        *next = dt_object_child(dt);
982         struct lod_object       *l = lod_dt_obj(dt);
983         struct lov_user_md_v1   *lum;
984         struct lov_user_md_v3   *v3 = NULL;
985         int                      rc;
986         ENTRY;
987
988         /* If it is striped dir, we should clear the stripe cache for
989          * slave stripe as well, but there are no effective way to
990          * notify the LOD on the slave MDT, so we do not cache stripe
991          * information for slave stripe for now. XXX*/
992         lod_lov_stripe_cache_clear(l);
993         LASSERT(buf != NULL && buf->lb_buf != NULL);
994         lum = buf->lb_buf;
995
996         rc = lod_verify_striping(d, buf, 0);
997         if (rc)
998                 RETURN(rc);
999
1000         if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1001                 v3 = buf->lb_buf;
1002
1003         /* if { size, offset, count } = { 0, -1, 0 } and no pool
1004          * (i.e. all default values specified) then delete default
1005          * striping from dir. */
1006         CDEBUG(D_OTHER,
1007                 "set default striping: sz %u # %u offset %d %s %s\n",
1008                 (unsigned)lum->lmm_stripe_size,
1009                 (unsigned)lum->lmm_stripe_count,
1010                 (int)lum->lmm_stripe_offset,
1011                 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1012
1013         if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1014                                 (lum->lmm_stripe_count),
1015                                 (lum->lmm_stripe_offset)) &&
1016                         lum->lmm_magic == LOV_USER_MAGIC_V1) {
1017                 rc = dt_xattr_del(env, next, name, th, capa);
1018                 if (rc == -ENODATA)
1019                         rc = 0;
1020         } else {
1021                 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1022         }
1023
1024         RETURN(rc);
1025 }
1026
1027 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
1028                                             struct dt_object *dt,
1029                                             const struct lu_buf *buf,
1030                                             const char *name, int fl,
1031                                             struct thandle *th,
1032                                             struct lustre_capa *capa)
1033 {
1034         struct dt_object        *next = dt_object_child(dt);
1035         struct lod_object       *l = lod_dt_obj(dt);
1036         struct lmv_user_md_v1   *lum;
1037         int                      rc;
1038         ENTRY;
1039
1040         LASSERT(buf != NULL && buf->lb_buf != NULL);
1041         lum = buf->lb_buf;
1042
1043         CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
1044               le32_to_cpu(lum->lum_stripe_count),
1045               (int)le32_to_cpu(lum->lum_stripe_offset));
1046
1047         if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
1048                                  le32_to_cpu(lum->lum_stripe_offset)) &&
1049                                 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
1050                 rc = dt_xattr_del(env, next, name, th, capa);
1051                 if (rc == -ENODATA)
1052                         rc = 0;
1053         } else {
1054                 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1055                 if (rc != 0)
1056                         RETURN(rc);
1057
1058                 /* Update default stripe cache */
1059                 if (l->ldo_dir_stripe == NULL) {
1060                         OBD_ALLOC_PTR(l->ldo_dir_stripe);
1061                         if (l->ldo_dir_stripe == NULL)
1062                                 RETURN(-ENOMEM);
1063                 }
1064
1065                 l->ldo_dir_striping_cached = 0;
1066                 l->ldo_dir_def_striping_set = 1;
1067                 l->ldo_dir_def_stripenr =
1068                         le32_to_cpu(lum->lum_stripe_count) - 1;
1069         }
1070
1071         RETURN(rc);
1072 }
1073
1074 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
1075                              const struct lu_buf *buf, const char *name,
1076                              int fl, struct thandle *th,
1077                              struct lustre_capa *capa)
1078 {
1079         struct lod_object       *lo = lod_dt_obj(dt);
1080         struct lu_buf           lmv_buf;
1081         int                     i;
1082         int                     rc;
1083         ENTRY;
1084
1085         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
1086                 RETURN(-ENOTDIR);
1087
1088         /* The stripes are supposed to be allocated in declare phase,
1089          * if there are no stripes being allocated, it will skip */
1090         if (lo->ldo_stripenr == 0)
1091                 RETURN(0);
1092
1093         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1094         if (rc != 0)
1095                 RETURN(rc);
1096
1097         for (i = 0; i < lo->ldo_stripenr; i++) {
1098                 struct dt_object *dto;
1099                 struct lu_attr  *attr = &lod_env_info(env)->lti_attr;
1100
1101                 dto = lo->ldo_stripe[i];
1102                 memset(attr, 0, sizeof(*attr));
1103                 attr->la_valid = LA_TYPE | LA_MODE;
1104                 attr->la_mode = S_IFDIR;
1105                 rc = dt_create(env, dto, attr, NULL, NULL, th);
1106                 if (rc != 0)
1107                         RETURN(rc);
1108
1109                 rc = dt_insert(env, dto,
1110                               (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1111                               (const struct dt_key *)dot, th, capa, 0);
1112                 if (rc != 0)
1113                         RETURN(rc);
1114
1115                 rc = dt_insert(env, dto,
1116                               (struct dt_rec *)lu_object_fid(&dt->do_lu),
1117                               (const struct dt_key *)dotdot, th, capa, 0);
1118                 if (rc != 0)
1119                         RETURN(rc);
1120
1121                 if (lo->ldo_striping_cached &&
1122                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1123                                          lo->ldo_def_stripenr,
1124                                          lo->ldo_def_stripe_offset)) {
1125                         struct lod_thread_info  *info;
1126                         struct lov_user_md_v3   *v3;
1127
1128                         /* sigh, lti_ea_store has been used for lmv_buf,
1129                          * so we have to allocate buffer for default
1130                          * stripe EA */
1131                         OBD_ALLOC_PTR(v3);
1132                         if (v3 == NULL)
1133                                 RETURN(-ENOMEM);
1134
1135                         memset(v3, 0, sizeof(*v3));
1136                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1137                         v3->lmm_stripe_count =
1138                                 cpu_to_le32(lo->ldo_def_stripenr);
1139                         v3->lmm_stripe_offset =
1140                                 cpu_to_le32(lo->ldo_def_stripe_offset);
1141                         v3->lmm_stripe_size =
1142                                 cpu_to_le32(lo->ldo_def_stripe_size);
1143                         if (lo->ldo_pool)
1144                                 strncpy(v3->lmm_pool_name, lo->ldo_pool,
1145                                         LOV_MAXPOOLNAME);
1146
1147                         info = lod_env_info(env);
1148                         info->lti_buf.lb_buf = v3;
1149                         info->lti_buf.lb_len = sizeof(*v3);
1150                         rc = dt_xattr_set(env, dto, &info->lti_buf,
1151                                           XATTR_NAME_LOV, 0, th, capa);
1152                         OBD_FREE_PTR(v3);
1153                         if (rc != 0)
1154                                 RETURN(rc);
1155                 }
1156
1157                 rc = dt_xattr_set(env, dto, &lmv_buf, XATTR_NAME_LMV, fl, th,
1158                                   capa);
1159         }
1160
1161         rc = dt_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, fl, th, capa);
1162
1163         RETURN(rc);
1164 }
1165
1166 static int lod_xattr_set(const struct lu_env *env,
1167                          struct dt_object *dt, const struct lu_buf *buf,
1168                          const char *name, int fl, struct thandle *th,
1169                          struct lustre_capa *capa)
1170 {
1171         struct lod_object       *lo = lod_dt_obj(dt);
1172         struct dt_object        *next = dt_object_child(dt);
1173         __u32                    attr;
1174         int                      rc;
1175         int                     i;
1176         ENTRY;
1177
1178         attr = dt->do_lu.lo_header->loh_attr & S_IFMT;
1179         if (S_ISDIR(attr) && strcmp(name, XATTR_NAME_LOV) == 0) {
1180                 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
1181         } else if (S_ISREG(attr) && !strcmp(name, XATTR_NAME_LOV)) {
1182                 /* in case of lov EA swap, just set it
1183                  * if not, it is a replay so check striping match what we
1184                  * already have during req replay, declare_xattr_set()
1185                  * defines striping, then create() does the work
1186                 */
1187                 if (fl & LU_XATTR_REPLACE) {
1188                         /* free stripes, then update disk */
1189                         lod_object_free_striping(env, lod_dt_obj(dt));
1190                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1191                 } else {
1192                         rc = lod_striping_create(env, dt, NULL, NULL, th);
1193                 }
1194         } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1195                 if (!S_ISDIR(attr))
1196                         RETURN(-ENOTDIR);
1197                 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
1198                                                       th, capa);
1199         } else {
1200                 /*
1201                  * behave transparantly for all other EAs
1202                  */
1203                 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1204         }
1205
1206         if (rc != 0 || !S_ISDIR(attr))
1207                 RETURN(rc);
1208
1209         if (lo->ldo_stripenr == 0)
1210                 RETURN(rc);
1211
1212         for (i = 0; i < lo->ldo_stripenr; i++) {
1213                 LASSERT(lo->ldo_stripe[i]);
1214                 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
1215                                   capa);
1216                 if (rc != 0)
1217                         break;
1218         }
1219
1220         RETURN(rc);
1221 }
1222
/*
 * Declare the removal of xattr \a name; the declaration is forwarded to
 * the object of the next layer.
 */
static int lod_declare_xattr_del(const struct lu_env *env,
                                 struct dt_object *dt, const char *name,
                                 struct thandle *th)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_declare_xattr_del(env, child, name, th);
}
1229
1230 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
1231                          const char *name, struct thandle *th,
1232                          struct lustre_capa *capa)
1233 {
1234         if (!strcmp(name, XATTR_NAME_LOV))
1235                 lod_object_free_striping(env, lod_dt_obj(dt));
1236         return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
1237 }
1238
/*
 * List the xattrs of an object; the request is forwarded to the object of
 * the next layer and its result is returned unchanged.
 */
static int lod_xattr_list(const struct lu_env *env,
                          struct dt_object *dt, struct lu_buf *buf,
                          struct lustre_capa *capa)
{
        struct dt_object *child = dt_object_child(dt);

        return dt_xattr_list(env, child, buf, capa);
}
1245
1246 int lod_object_set_pool(struct lod_object *o, char *pool)
1247 {
1248         int len;
1249
1250         if (o->ldo_pool) {
1251                 len = strlen(o->ldo_pool);
1252                 OBD_FREE(o->ldo_pool, len + 1);
1253                 o->ldo_pool = NULL;
1254         }
1255         if (pool) {
1256                 len = strlen(pool);
1257                 OBD_ALLOC(o->ldo_pool, len + 1);
1258                 if (o->ldo_pool == NULL)
1259                         return -ENOMEM;
1260                 strcpy(o->ldo_pool, pool);
1261         }
1262         return 0;
1263 }
1264
1265 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
1266 {
1267         return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
1268 }
1269
1270
1271 static int lod_cache_parent_lov_striping(const struct lu_env *env,
1272                                          struct lod_object *lp)
1273 {
1274         struct lod_thread_info  *info = lod_env_info(env);
1275         struct lov_user_md_v1   *v1 = NULL;
1276         struct lov_user_md_v3   *v3 = NULL;
1277         int                      rc;
1278         ENTRY;
1279
1280         /* called from MDD without parent being write locked,
1281          * lock it here */
1282         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
1283         rc = lod_get_lov_ea(env, lp);
1284         if (rc < 0)
1285                 GOTO(unlock, rc);
1286
1287         if (rc < sizeof(struct lov_user_md)) {
1288                 /* don't lookup for non-existing or invalid striping */
1289                 lp->ldo_def_striping_set = 0;
1290                 lp->ldo_striping_cached = 1;
1291                 lp->ldo_def_stripe_size = 0;
1292                 lp->ldo_def_stripenr = 0;
1293                 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
1294                 GOTO(unlock, rc = 0);
1295         }
1296
1297         rc = 0;
1298         v1 = info->lti_ea_store;
1299         if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1))
1300                 lustre_swab_lov_user_md_v1(v1);
1301         else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3))
1302                 lustre_swab_lov_user_md_v3(v3);
1303
1304         if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
1305                 GOTO(unlock, rc = 0);
1306
1307         if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
1308                 GOTO(unlock, rc = 0);
1309
1310         lp->ldo_def_stripenr = v1->lmm_stripe_count;
1311         lp->ldo_def_stripe_size = v1->lmm_stripe_size;
1312         lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
1313         lp->ldo_striping_cached = 1;
1314         lp->ldo_def_striping_set = 1;
1315         if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
1316                 /* XXX: sanity check here */
1317                 v3 = (struct lov_user_md_v3 *) v1;
1318                 if (v3->lmm_pool_name[0])
1319                         lod_object_set_pool(lp, v3->lmm_pool_name);
1320         }
1321         EXIT;
1322 unlock:
1323         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
1324         return rc;
1325 }
1326
1327
1328 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
1329                                          struct lod_object *lp)
1330 {
1331         struct lod_thread_info  *info = lod_env_info(env);
1332         struct lmv_user_md_v1   *v1 = NULL;
1333         int                      rc;
1334         ENTRY;
1335
1336         /* called from MDD without parent being write locked,
1337          * lock it here */
1338         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
1339         rc = lod_get_default_lmv_ea(env, lp);
1340         if (rc < 0)
1341                 GOTO(unlock, rc);
1342
1343         if (rc < sizeof(struct lmv_user_md)) {
1344                 /* don't lookup for non-existing or invalid striping */
1345                 lp->ldo_dir_def_striping_set = 0;
1346                 lp->ldo_dir_striping_cached = 1;
1347                 lp->ldo_dir_def_stripenr = 0;
1348                 lp->ldo_dir_def_stripe_offset =
1349                                         (typeof(v1->lum_stripe_offset))(-1);
1350                 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
1351                 GOTO(unlock, rc = 0);
1352         }
1353
1354         rc = 0;
1355         v1 = info->lti_ea_store;
1356
1357         lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count) - 1;
1358         lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
1359         lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
1360         lp->ldo_dir_def_striping_set = 1;
1361         lp->ldo_dir_striping_cached = 1;
1362
1363         EXIT;
1364 unlock:
1365         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
1366         return rc;
1367 }
1368
1369 static int lod_cache_parent_striping(const struct lu_env *env,
1370                                      struct lod_object *lp,
1371                                      umode_t child_mode)
1372 {
1373         int rc = 0;
1374         ENTRY;
1375
1376         rc = lod_load_striping(env, lp);
1377         if (rc != 0)
1378                 RETURN(rc);
1379
1380         if (!lp->ldo_striping_cached) {
1381                 /* we haven't tried to get default striping for
1382                  * the directory yet, let's cache it in the object */
1383                 rc = lod_cache_parent_lov_striping(env, lp);
1384                 if (rc != 0)
1385                         RETURN(rc);
1386         }
1387
1388         if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
1389                 rc = lod_cache_parent_lmv_striping(env, lp);
1390
1391         RETURN(rc);
1392 }
1393
1394 /**
1395  * used to transfer default striping data to the object being created
1396  */
1397 static void lod_ah_init(const struct lu_env *env,
1398                         struct dt_allocation_hint *ah,
1399                         struct dt_object *parent,
1400                         struct dt_object *child,
1401                         umode_t child_mode)
1402 {
1403         struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
1404         struct dt_object  *nextp = NULL;
1405         struct dt_object  *nextc;
1406         struct lod_object *lp = NULL;
1407         struct lod_object *lc;
1408         struct lov_desc   *desc;
1409         ENTRY;
1410
1411         LASSERT(child);
1412
1413         if (likely(parent)) {
1414                 nextp = dt_object_child(parent);
1415                 lp = lod_dt_obj(parent);
1416         }
1417
1418         nextc = dt_object_child(child);
1419         lc = lod_dt_obj(child);
1420
1421         LASSERT(lc->ldo_stripenr == 0);
1422         LASSERT(lc->ldo_stripe == NULL);
1423
1424         /*
1425          * local object may want some hints
1426          * in case of late striping creation, ->ah_init()
1427          * can be called with local object existing
1428          */
1429         if (!dt_object_exists(nextc) || dt_object_remote(nextc))
1430                 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
1431                                           NULL : nextp, nextc, child_mode);
1432
1433         if (S_ISDIR(child_mode)) {
1434                 int rc;
1435
1436                 if (lc->ldo_dir_stripe == NULL) {
1437                         OBD_ALLOC_PTR(lc->ldo_dir_stripe);
1438                         if (lc->ldo_dir_stripe == NULL)
1439                                 return;
1440                 }
1441
1442                 if (lp->ldo_dir_stripe == NULL) {
1443                         OBD_ALLOC_PTR(lp->ldo_dir_stripe);
1444                         if (lp->ldo_dir_stripe == NULL)
1445                                 return;
1446                 }
1447
1448                 rc = lod_cache_parent_striping(env, lp, child_mode);
1449                 if (rc != 0)
1450                         return;
1451
1452                 /* transfer defaults to new directory */
1453                 if (lp->ldo_striping_cached) {
1454                         if (lp->ldo_pool)
1455                                 lod_object_set_pool(lc, lp->ldo_pool);
1456                         lc->ldo_def_stripenr = lp->ldo_def_stripenr;
1457                         lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
1458                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
1459                         lc->ldo_striping_cached = 1;
1460                         lc->ldo_def_striping_set = 1;
1461                         CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
1462                                (int)lc->ldo_def_stripe_size,
1463                                (int)lc->ldo_def_stripe_offset,
1464                                (int)lc->ldo_def_stripenr);
1465                 }
1466
1467                 /* transfer dir defaults to new directory */
1468                 if (lp->ldo_dir_striping_cached) {
1469                         lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
1470                         lc->ldo_dir_def_stripe_offset =
1471                                                   lp->ldo_dir_def_stripe_offset;
1472                         lc->ldo_dir_def_hash_type =
1473                                                   lp->ldo_dir_def_hash_type;
1474                         lc->ldo_dir_striping_cached = 1;
1475                         lc->ldo_dir_def_striping_set = 1;
1476                         CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
1477                                (int)lc->ldo_dir_def_stripenr,
1478                                (int)lc->ldo_dir_def_stripe_offset,
1479                                lc->ldo_dir_def_hash_type);
1480                 }
1481
1482                 /* If the directory is specified with certain stripes */
1483                 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
1484                         const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
1485                         int rc;
1486
1487                         rc = lod_verify_md_striping(d, lum1);
1488                         if (rc == 0 &&
1489                                 le32_to_cpu(lum1->lum_stripe_count) > 1) {
1490                                 /* Directory will be striped only if
1491                                  * stripe_count > 1 */
1492                                 lc->ldo_stripenr =
1493                                         le32_to_cpu(lum1->lum_stripe_count) - 1;
1494                                 lc->ldo_dir_stripe_offset =
1495                                         le32_to_cpu(lum1->lum_stripe_offset);
1496                                 lc->ldo_dir_hash_type =
1497                                         le32_to_cpu(lum1->lum_hash_type);
1498                                 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
1499                                        lc->ldo_stripenr,
1500                                        (int)lc->ldo_dir_stripe_offset);
1501                         }
1502                 } else if (lp->ldo_dir_def_striping_set) {
1503                         /* If there are default dir stripe from parent */
1504                         lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
1505                         lc->ldo_dir_stripe_offset =
1506                                         lp->ldo_dir_def_stripe_offset;
1507                         lc->ldo_dir_hash_type =
1508                                         lp->ldo_dir_def_hash_type;
1509                         CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
1510                                lc->ldo_stripenr,
1511                                (int)lc->ldo_dir_stripe_offset);
1512                 } else {
1513                         /* set default stripe for this directory */
1514                         lc->ldo_stripenr = 0;
1515                         lc->ldo_dir_stripe_offset = -1;
1516                 }
1517
1518                 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
1519                        lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
1520
1521                 goto out;
1522         }
1523
1524         /*
1525          * if object is going to be striped over OSTs, transfer default
1526          * striping information to the child, so that we can use it
1527          * during declaration and creation
1528          */
1529         if (!lod_object_will_be_striped(S_ISREG(child_mode),
1530                                         lu_object_fid(&child->do_lu)))
1531                 goto out;
1532         /*
1533          * try from the parent
1534          */
1535         if (likely(parent)) {
1536                 lod_cache_parent_striping(env, lp, child_mode);
1537
1538                 lc->ldo_def_stripe_offset = (__u16) -1;
1539
1540                 if (lp->ldo_def_striping_set) {
1541                         if (lp->ldo_pool)
1542                                 lod_object_set_pool(lc, lp->ldo_pool);
1543                         lc->ldo_stripenr = lp->ldo_def_stripenr;
1544                         lc->ldo_stripe_size = lp->ldo_def_stripe_size;
1545                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
1546                         CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
1547                                lc->ldo_stripenr, lc->ldo_stripe_size,
1548                                lp->ldo_pool ? lp->ldo_pool : "");
1549                 }
1550         }
1551
1552         /*
1553          * if the parent doesn't provide with specific pattern, grab fs-wide one
1554          */
1555         desc = &d->lod_desc;
1556         if (lc->ldo_stripenr == 0)
1557                 lc->ldo_stripenr = desc->ld_default_stripe_count;
1558         if (lc->ldo_stripe_size == 0)
1559                 lc->ldo_stripe_size = desc->ld_default_stripe_size;
1560         CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
1561                lc->ldo_stripenr, lc->ldo_stripe_size,
1562                lc->ldo_pool ? lc->ldo_pool : "");
1563
1564 out:
1565         /* we do not cache stripe information for slave stripe, see
1566          * lod_xattr_set_lov_on_dir */
1567         if (lp != NULL && lp->ldo_dir_slave_stripe)
1568                 lod_lov_stripe_cache_clear(lp);
1569
1570         EXIT;
1571 }
1572
1573 #define ll_do_div64(aaa,bbb)    do_div((aaa), (bbb))
1574 /*
1575  * this function handles a special case when truncate was done
1576  * on a stripeless object and now striping is being created
1577  * we can't lose that size, so we have to propagate it to newly
1578  * created object
1579  */
1580 static int lod_declare_init_size(const struct lu_env *env,
1581                                  struct dt_object *dt, struct thandle *th)
1582 {
1583         struct dt_object   *next = dt_object_child(dt);
1584         struct lod_object  *lo = lod_dt_obj(dt);
1585         struct lu_attr     *attr = &lod_env_info(env)->lti_attr;
1586         uint64_t            size, offs;
1587         int                 rc, stripe;
1588         ENTRY;
1589
1590         /* XXX: we support the simplest (RAID0) striping so far */
1591         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
1592         LASSERT(lo->ldo_stripe_size > 0);
1593
1594         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1595         LASSERT(attr->la_valid & LA_SIZE);
1596         if (rc)
1597                 RETURN(rc);
1598
1599         size = attr->la_size;
1600         if (size == 0)
1601                 RETURN(0);
1602
1603         /* ll_do_div64(a, b) returns a % b, and a = a / b */
1604         ll_do_div64(size, (__u64) lo->ldo_stripe_size);
1605         stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
1606
1607         size = size * lo->ldo_stripe_size;
1608         offs = attr->la_size;
1609         size += ll_do_div64(offs, lo->ldo_stripe_size);
1610
1611         attr->la_valid = LA_SIZE;
1612         attr->la_size = size;
1613
1614         rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
1615
1616         RETURN(rc);
1617 }
1618
1619 /**
1620  * Create declaration of striped object
1621  */
1622 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
1623                                struct lu_attr *attr,
1624                                const struct lu_buf *lovea, struct thandle *th)
1625 {
1626         struct lod_thread_info  *info = lod_env_info(env);
1627         struct dt_object        *next = dt_object_child(dt);
1628         struct lod_object       *lo = lod_dt_obj(dt);
1629         int                      rc;
1630         ENTRY;
1631
1632         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
1633                 /* failed to create striping, let's reset
1634                  * config so that others don't get confused */
1635                 lod_object_free_striping(env, lo);
1636                 GOTO(out, rc = -ENOMEM);
1637         }
1638
1639         /* choose OST and generate appropriate objects */
1640         rc = lod_qos_prep_create(env, lo, attr, lovea, th);
1641         if (rc) {
1642                 /* failed to create striping, let's reset
1643                  * config so that others don't get confused */
1644                 lod_object_free_striping(env, lo);
1645                 GOTO(out, rc);
1646         }
1647
1648         /*
1649          * declare storage for striping data
1650          */
1651         info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
1652                                 lo->ldo_pool ?  LOV_MAGIC_V3 : LOV_MAGIC_V1);
1653         rc = dt_declare_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV,
1654                                   0, th);
1655         if (rc)
1656                 GOTO(out, rc);
1657
1658         /*
1659          * if striping is created with local object's size > 0,
1660          * we have to propagate this size to specific object
1661          * the case is possible only when local object was created previously
1662          */
1663         if (dt_object_exists(next))
1664                 rc = lod_declare_init_size(env, dt, th);
1665
1666 out:
1667         RETURN(rc);
1668 }
1669
1670 int lod_dir_striping_create_internal(const struct lu_env *env,
1671                                      struct dt_object *dt,
1672                                      struct lu_attr *attr,
1673                                      const struct dt_object_format *dof,
1674                                      struct thandle *th,
1675                                      bool declare)
1676 {
1677         struct lod_thread_info  *info = lod_env_info(env);
1678         struct dt_object        *next = dt_object_child(dt);
1679         struct lod_object       *lo = lod_dt_obj(dt);
1680         int                     rc;
1681         ENTRY;
1682
1683         if (lo->ldo_dir_def_striping_set &&
1684             !LMVEA_DELETE_VALUES(lo->ldo_stripenr,
1685                                  lo->ldo_dir_stripe_offset)) {
1686                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
1687                 int stripe_count = lo->ldo_stripenr + 1;
1688
1689                 if (info->lti_ea_store_size < sizeof(*v1)) {
1690                         rc = lod_ea_store_resize(info, sizeof(*v1));
1691                         if (rc != 0)
1692                                 RETURN(rc);
1693                         v1 = info->lti_ea_store;
1694                 }
1695
1696                 memset(v1, 0, sizeof(*v1));
1697                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
1698                 v1->lum_stripe_count = cpu_to_le32(stripe_count);
1699                 v1->lum_stripe_offset =
1700                                 cpu_to_le32(lo->ldo_dir_stripe_offset);
1701
1702                 info->lti_buf.lb_buf = v1;
1703                 info->lti_buf.lb_len = sizeof(*v1);
1704
1705                 if (declare)
1706                         rc = lod_declare_xattr_set_lmv(env, dt, attr,
1707                                                        &info->lti_buf, th);
1708                 else
1709                         rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
1710                                                XATTR_NAME_LMV, 0, th,
1711                                                BYPASS_CAPA);
1712                 if (rc != 0)
1713                         RETURN(rc);
1714         }
1715
1716         /* Transfer default LMV striping from the parent */
1717         if (lo->ldo_dir_striping_cached &&
1718             !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
1719                                  lo->ldo_dir_def_stripe_offset)) {
1720                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
1721                 int def_stripe_count = lo->ldo_dir_def_stripenr + 1;
1722
1723                 if (info->lti_ea_store_size < sizeof(*v1)) {
1724                         rc = lod_ea_store_resize(info, sizeof(*v1));
1725                         if (rc != 0)
1726                                 RETURN(rc);
1727                         v1 = info->lti_ea_store;
1728                 }
1729
1730                 memset(v1, 0, sizeof(*v1));
1731                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
1732                 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
1733                 v1->lum_stripe_offset =
1734                                 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
1735                 v1->lum_hash_type =
1736                                 cpu_to_le32(lo->ldo_dir_def_hash_type);
1737
1738                 info->lti_buf.lb_buf = v1;
1739                 info->lti_buf.lb_len = sizeof(*v1);
1740                 if (declare)
1741                         rc = dt_declare_xattr_set(env, next, &info->lti_buf,
1742                                                   XATTR_NAME_DEFAULT_LMV, 0,
1743                                                   th);
1744                 else
1745                         rc = dt_xattr_set(env, next, &info->lti_buf,
1746                                            XATTR_NAME_DEFAULT_LMV, 0, th,
1747                                            BYPASS_CAPA);
1748                 if (rc != 0)
1749                         RETURN(rc);
1750         }
1751
1752         /* Transfer default LOV striping from the parent */
1753         if (lo->ldo_striping_cached &&
1754             !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1755                                  lo->ldo_def_stripenr,
1756                                  lo->ldo_def_stripe_offset)) {
1757                 struct lov_user_md_v3 *v3 = info->lti_ea_store;
1758
1759                 if (info->lti_ea_store_size < sizeof(*v3)) {
1760                         rc = lod_ea_store_resize(info, sizeof(*v3));
1761                         if (rc != 0)
1762                                 RETURN(rc);
1763                         v3 = info->lti_ea_store;
1764                 }
1765
1766                 memset(v3, 0, sizeof(*v3));
1767                 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1768                 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
1769                 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
1770                 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
1771                 if (lo->ldo_pool)
1772                         strncpy(v3->lmm_pool_name, lo->ldo_pool,
1773                                 LOV_MAXPOOLNAME);
1774
1775                 info->lti_buf.lb_buf = v3;
1776                 info->lti_buf.lb_len = sizeof(*v3);
1777
1778                 if (declare)
1779                         rc = dt_declare_xattr_set(env, next, &info->lti_buf,
1780                                                   XATTR_NAME_LOV, 0, th);
1781                 else
1782                         rc = dt_xattr_set(env, next, &info->lti_buf,
1783                                           XATTR_NAME_LOV, 0, th,
1784                                           BYPASS_CAPA);
1785                 if (rc != 0)
1786                         RETURN(rc);
1787         }
1788
1789         RETURN(0);
1790 }
1791
1792 static int lod_declare_dir_striping_create(const struct lu_env *env,
1793                                            struct dt_object *dt,
1794                                            struct lu_attr *attr,
1795                                            struct dt_object_format *dof,
1796                                            struct thandle *th)
1797 {
1798         return lod_dir_striping_create_internal(env, dt, attr, dof, th, true);
1799 }
1800
1801 static int lod_dir_striping_create(const struct lu_env *env,
1802                                    struct dt_object *dt,
1803                                    struct lu_attr *attr,
1804                                    struct dt_object_format *dof,
1805                                    struct thandle *th)
1806 {
1807         return lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
1808 }
1809
1810 static int lod_declare_object_create(const struct lu_env *env,
1811                                      struct dt_object *dt,
1812                                      struct lu_attr *attr,
1813                                      struct dt_allocation_hint *hint,
1814                                      struct dt_object_format *dof,
1815                                      struct thandle *th)
1816 {
1817         struct dt_object   *next = dt_object_child(dt);
1818         struct lod_object  *lo = lod_dt_obj(dt);
1819         int                 rc;
1820         ENTRY;
1821
1822         LASSERT(dof);
1823         LASSERT(attr);
1824         LASSERT(th);
1825
1826         /*
1827          * first of all, we declare creation of local object
1828          */
1829         rc = dt_declare_create(env, next, attr, hint, dof, th);
1830         if (rc)
1831                 GOTO(out, rc);
1832
1833         if (dof->dof_type == DFT_SYM)
1834                 dt->do_body_ops = &lod_body_lnk_ops;
1835
1836         /*
1837          * it's lod_ah_init() who has decided the object will striped
1838          */
1839         if (dof->dof_type == DFT_REGULAR) {
1840                 /* callers don't want stripes */
1841                 /* XXX: all tricky interactions with ->ah_make_hint() decided
1842                  * to use striping, then ->declare_create() behaving differently
1843                  * should be cleaned */
1844                 if (dof->u.dof_reg.striped == 0)
1845                         lo->ldo_stripenr = 0;
1846                 if (lo->ldo_stripenr > 0)
1847                         rc = lod_declare_striped_object(env, dt, attr,
1848                                                         NULL, th);
1849         } else if (dof->dof_type == DFT_DIR) {
1850                 rc = lod_declare_dir_striping_create(env, dt, attr, dof, th);
1851         }
1852 out:
1853         RETURN(rc);
1854 }
1855
1856 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
1857                         struct lu_attr *attr, struct dt_object_format *dof,
1858                         struct thandle *th)
1859 {
1860         struct lod_object *lo = lod_dt_obj(dt);
1861         int                rc = 0, i;
1862         ENTRY;
1863
1864         LASSERT(lo->ldo_striping_cached == 0);
1865
1866         /* create all underlying objects */
1867         for (i = 0; i < lo->ldo_stripenr; i++) {
1868                 LASSERT(lo->ldo_stripe[i]);
1869                 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
1870
1871                 if (rc)
1872                         break;
1873         }
1874         if (rc == 0)
1875                 rc = lod_generate_and_set_lovea(env, lo, th);
1876
1877         RETURN(rc);
1878 }
1879
1880 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
1881                              struct lu_attr *attr,
1882                              struct dt_allocation_hint *hint,
1883                              struct dt_object_format *dof, struct thandle *th)
1884 {
1885         struct dt_object   *next = dt_object_child(dt);
1886         struct lod_object  *lo = lod_dt_obj(dt);
1887         int                 rc;
1888         ENTRY;
1889
1890         /* create local object */
1891         rc = dt_create(env, next, attr, hint, dof, th);
1892
1893         if (rc == 0) {
1894                 if (S_ISDIR(dt->do_lu.lo_header->loh_attr))
1895                         rc = lod_dir_striping_create(env, dt, attr, dof, th);
1896                 else if (lo->ldo_stripe && dof->u.dof_reg.striped != 0)
1897                         rc = lod_striping_create(env, dt, attr, dof, th);
1898         }
1899
1900         RETURN(rc);
1901 }
1902
1903 static int lod_declare_object_destroy(const struct lu_env *env,
1904                                       struct dt_object *dt,
1905                                       struct thandle *th)
1906 {
1907         struct dt_object   *next = dt_object_child(dt);
1908         struct lod_object  *lo = lod_dt_obj(dt);
1909         int                 rc, i;
1910         ENTRY;
1911
1912         /*
1913          * we declare destroy for the local object
1914          */
1915         rc = dt_declare_destroy(env, next, th);
1916         if (rc)
1917                 RETURN(rc);
1918
1919         /*
1920          * load striping information, notice we don't do this when object
1921          * is being initialized as we don't need this information till
1922          * few specific cases like destroy, chown
1923          */
1924         rc = lod_load_striping(env, lo);
1925         if (rc)
1926                 RETURN(rc);
1927
1928         /* declare destroy for all underlying objects */
1929         for (i = 0; i < lo->ldo_stripenr; i++) {
1930                 LASSERT(lo->ldo_stripe[i]);
1931                 rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
1932
1933                 if (rc)
1934                         break;
1935         }
1936
1937         RETURN(rc);
1938 }
1939
1940 static int lod_object_destroy(const struct lu_env *env,
1941                 struct dt_object *dt, struct thandle *th)
1942 {
1943         struct dt_object  *next = dt_object_child(dt);
1944         struct lod_object *lo = lod_dt_obj(dt);
1945         int                rc, i;
1946         ENTRY;
1947
1948         /* destroy local object */
1949         rc = dt_destroy(env, next, th);
1950         if (rc)
1951                 RETURN(rc);
1952
1953         /* destroy all underlying objects */
1954         for (i = 0; i < lo->ldo_stripenr; i++) {
1955                 LASSERT(lo->ldo_stripe[i]);
1956                 /* for striped directory, next == ldo_stripe[0] */
1957                 if (next != lo->ldo_stripe[i]) {
1958                         rc = dt_destroy(env, lo->ldo_stripe[i], th);
1959                         if (rc)
1960                                 break;
1961                 }
1962         }
1963
1964         RETURN(rc);
1965 }
1966
1967 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
1968                          const struct dt_index_features *feat)
1969 {
1970         struct dt_object *next = dt_object_child(dt);
1971         int               rc;
1972         ENTRY;
1973
1974         LASSERT(next->do_ops);
1975         LASSERT(next->do_ops->do_index_try);
1976
1977         rc = next->do_ops->do_index_try(env, next, feat);
1978         if (next->do_index_ops && dt->do_index_ops == NULL)
1979                 dt->do_index_ops = &lod_index_ops;
1980
1981         RETURN(rc);
1982 }
1983
/* Declare a reference-count increment: forwarded to the child object. */
static int lod_declare_ref_add(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_declare_ref_add(env, next, th);
}
1989
/* Reference-count increment: forwarded to the child object. */
static int lod_ref_add(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_ref_add(env, next, th);
}
1995
/* Declare a reference-count decrement: forwarded to the child object. */
static int lod_declare_ref_del(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_declare_ref_del(env, next, th);
}
2001
/* Reference-count decrement: forwarded to the child object. */
static int lod_ref_del(const struct lu_env *env,
                       struct dt_object *dt, struct thandle *th)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_ref_del(env, next, th);
}
2007
2008 static struct obd_capa *lod_capa_get(const struct lu_env *env,
2009                                      struct dt_object *dt,
2010                                      struct lustre_capa *old, __u64 opc)
2011 {
2012         return dt_capa_get(env, dt_object_child(dt), old, opc);
2013 }
2014
/* Sync request: forwarded to the child object. */
static int lod_object_sync(const struct lu_env *env, struct dt_object *dt)
{
        struct dt_object *next = dt_object_child(dt);

        return dt_object_sync(env, next);
}
2019
/*
 * Lock handles of the stripes of a striped directory.
 *
 * Allocated by lod_object_lock() with one handle slot per stripe, passed
 * around through einfo->ei_cbdata and freed by lod_object_unlock().
 */
struct lod_slave_locks  {
        /* number of slots in lsl_handle[] */
        int                     lsl_lock_count;
        /* slot i holds the lock handle taken on ldo_stripe[i] */
        struct lustre_handle    lsl_handle[0];
};
2024
2025 static int lod_object_unlock_internal(const struct lu_env *env,
2026                                       struct dt_object *dt,
2027                                       struct ldlm_enqueue_info *einfo,
2028                                       ldlm_policy_data_t *policy)
2029 {
2030         struct lod_object       *lo = lod_dt_obj(dt);
2031         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
2032         int                     rc = 0;
2033         int                     i;
2034         ENTRY;
2035
2036         if (slave_locks == NULL)
2037                 RETURN(0);
2038
2039         for (i = 0; i < slave_locks->lsl_lock_count; i++) {
2040                 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
2041                         int     rc1;
2042
2043                         einfo->ei_cbdata = &slave_locks->lsl_handle[i];
2044                         rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
2045                                                policy);
2046                         if (rc1 < 0)
2047                                 rc = rc == 0 ? rc1 : rc;
2048                 }
2049         }
2050
2051         RETURN(rc);
2052 }
2053
2054 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
2055                              struct ldlm_enqueue_info *einfo,
2056                              union ldlm_policy_data *policy)
2057 {
2058         struct lod_object       *lo = lod_dt_obj(dt);
2059         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
2060         int                     slave_locks_size;
2061         int                     rc;
2062         ENTRY;
2063
2064         if (slave_locks == NULL)
2065                 RETURN(0);
2066
2067         rc = lod_load_striping(env, lo);
2068         if (rc != 0)
2069                 RETURN(rc);
2070
2071         /* Note: for remote lock for single stripe dir, MDT will cancel
2072          * the lock by lockh directly */
2073         if (lo->ldo_stripenr == 0 && dt_object_remote(dt_object_child(dt)))
2074                 RETURN(0);
2075
2076         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2077                 RETURN(-ENOTDIR);
2078
2079         /* Only cancel slave lock for striped dir */
2080         rc = lod_object_unlock_internal(env, dt, einfo, policy);
2081
2082         slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
2083                            sizeof(slave_locks->lsl_handle[0]);
2084         OBD_FREE(slave_locks, slave_locks_size);
2085         einfo->ei_cbdata = NULL;
2086
2087         RETURN(rc);
2088 }
2089
2090 static int lod_object_lock(const struct lu_env *env,
2091                            struct dt_object *dt,
2092                            struct lustre_handle *lh,
2093                            struct ldlm_enqueue_info *einfo,
2094                            union ldlm_policy_data *policy)
2095 {
2096         struct lod_object       *lo = lod_dt_obj(dt);
2097         int                     rc = 0;
2098         int                     i;
2099         int                     slave_locks_size;
2100         struct lod_slave_locks  *slave_locks = NULL;
2101         ENTRY;
2102
2103         /* remote object lock */
2104         if (!einfo->ei_enq_slave) {
2105                 LASSERT(dt_object_remote(dt));
2106                 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
2107                                       policy);
2108         }
2109
2110         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2111                 RETURN(-ENOTDIR);
2112
2113         rc = lod_load_striping(env, lo);
2114         if (rc != 0)
2115                 RETURN(rc);
2116
2117         /* No stripes */
2118         if (lo->ldo_stripenr == 0)
2119                 RETURN(0);
2120
2121         slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
2122                            sizeof(slave_locks->lsl_handle[0]);
2123         /* Freed in lod_object_unlock */
2124         OBD_ALLOC(slave_locks, slave_locks_size);
2125         if (slave_locks == NULL)
2126                 RETURN(-ENOMEM);
2127         slave_locks->lsl_lock_count = lo->ldo_stripenr;
2128
2129         /* striped directory lock */
2130         for (i = 0; i < lo->ldo_stripenr; i++) {
2131                 struct lustre_handle    lockh;
2132
2133                 LASSERT(lo->ldo_stripe[i]);
2134                 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
2135                                     policy);
2136                 if (rc != 0)
2137                         GOTO(out, rc);
2138
2139                 slave_locks->lsl_handle[i] = lockh;
2140         }
2141
2142         einfo->ei_cbdata = slave_locks;
2143
2144 out:
2145         if (rc != 0 && slave_locks != NULL) {
2146                 einfo->ei_cbdata = slave_locks;
2147                 lod_object_unlock_internal(env, dt, einfo, policy);
2148                 OBD_FREE(slave_locks, slave_locks_size);
2149                 einfo->ei_cbdata = NULL;
2150         }
2151
2152         RETURN(rc);
2153 }
2154
/*
 * dt_object operations of a LOD object: each dt_object_operations method
 * is bound to the corresponding lod_* handler.
 */
struct dt_object_operations lod_obj_ops = {
        .do_read_lock           = lod_object_read_lock,
        .do_write_lock          = lod_object_write_lock,
        .do_read_unlock         = lod_object_read_unlock,
        .do_write_unlock        = lod_object_write_unlock,
        .do_write_locked        = lod_object_write_locked,
        .do_attr_get            = lod_attr_get,
        .do_declare_attr_set    = lod_declare_attr_set,
        .do_attr_set            = lod_attr_set,
        .do_xattr_get           = lod_xattr_get,
        .do_declare_xattr_set   = lod_declare_xattr_set,
        .do_xattr_set           = lod_xattr_set,
        .do_declare_xattr_del   = lod_declare_xattr_del,
        .do_xattr_del           = lod_xattr_del,
        .do_xattr_list          = lod_xattr_list,
        .do_ah_init             = lod_ah_init,
        .do_declare_create      = lod_declare_object_create,
        .do_create              = lod_object_create,
        .do_declare_destroy     = lod_declare_object_destroy,
        .do_destroy             = lod_object_destroy,
        .do_index_try           = lod_index_try,
        .do_declare_ref_add     = lod_declare_ref_add,
        .do_ref_add             = lod_ref_add,
        .do_declare_ref_del     = lod_declare_ref_del,
        .do_ref_del             = lod_ref_del,
        .do_capa_get            = lod_capa_get,
        .do_object_sync         = lod_object_sync,
        .do_object_lock         = lod_object_lock,
        .do_object_unlock       = lod_object_unlock,
};
2185
2186 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
2187                         struct lu_buf *buf, loff_t *pos,
2188                         struct lustre_capa *capa)
2189 {
2190         struct dt_object *next = dt_object_child(dt);
2191         return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
2192 }
2193
2194 static ssize_t lod_declare_write(const struct lu_env *env,
2195                                  struct dt_object *dt,
2196                                  const loff_t size, loff_t pos,
2197                                  struct thandle *th)
2198 {
2199         return dt_declare_record_write(env, dt_object_child(dt),
2200                                        size, pos, th);
2201 }
2202
2203 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
2204                          const struct lu_buf *buf, loff_t *pos,
2205                          struct thandle *th, struct lustre_capa *capa, int iq)
2206 {
2207         struct dt_object *next = dt_object_child(dt);
2208         LASSERT(next);
2209         return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
2210 }
2211
/*
 * Body operations installed on symlink objects (see lod_object_start()
 * and the DFT_SYM case of lod_declare_object_create()); every method
 * forwards to the child object.
 */
static const struct dt_body_operations lod_body_lnk_ops = {
        .dbo_read               = lod_read,
        .dbo_declare_write      = lod_declare_write,
        .dbo_write              = lod_write
};
2217
2218 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
2219                            const struct lu_object_conf *conf)
2220 {
2221         struct lod_device       *lod    = lu2lod_dev(lo->lo_dev);
2222         struct lu_device        *cdev   = NULL;
2223         struct lu_object        *cobj;
2224         struct lod_tgt_descs    *ltd    = NULL;
2225         struct lod_tgt_desc     *tgt;
2226         mdsno_t                  idx    = 0;
2227         int                      type   = LU_SEQ_RANGE_ANY;
2228         int                      rc;
2229         ENTRY;
2230
2231         rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
2232         if (rc != 0)
2233                 RETURN(rc);
2234
2235         if (type == LU_SEQ_RANGE_MDT &&
2236             idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
2237                 cdev = &lod->lod_child->dd_lu_dev;
2238         } else if (type == LU_SEQ_RANGE_MDT) {
2239                 ltd = &lod->lod_mdt_descs;
2240                 lod_getref(ltd);
2241         } else if (type == LU_SEQ_RANGE_OST) {
2242                 ltd = &lod->lod_ost_descs;
2243                 lod_getref(ltd);
2244         } else {
2245                 LBUG();
2246         }
2247
2248         if (ltd != NULL) {
2249                 if (ltd->ltd_tgts_size > idx &&
2250                     cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
2251                         tgt = LTD_TGT(ltd, idx);
2252
2253                         LASSERT(tgt != NULL);
2254                         LASSERT(tgt->ltd_tgt != NULL);
2255
2256                         cdev = &(tgt->ltd_tgt->dd_lu_dev);
2257                 }
2258                 lod_putref(lod, ltd);
2259         }
2260
2261         if (unlikely(cdev == NULL))
2262                 RETURN(-ENOENT);
2263
2264         cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
2265         if (unlikely(cobj == NULL))
2266                 RETURN(-ENOMEM);
2267
2268         lu_object_add(lo, cobj);
2269
2270         RETURN(0);
2271 }
2272
2273 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
2274 {
2275         int i;
2276
2277         if (lo->ldo_dir_stripe != NULL) {
2278                 OBD_FREE_PTR(lo->ldo_dir_stripe);
2279                 lo->ldo_dir_stripe = NULL;
2280         }
2281
2282         if (lo->ldo_stripe) {
2283                 LASSERT(lo->ldo_stripes_allocated > 0);
2284
2285                 for (i = 0; i < lo->ldo_stripenr; i++) {
2286                         if (lo->ldo_stripe[i])
2287                                 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
2288                 }
2289
2290                 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
2291                 OBD_FREE(lo->ldo_stripe, i);
2292                 lo->ldo_stripe = NULL;
2293                 lo->ldo_stripes_allocated = 0;
2294         }
2295         lo->ldo_stripenr = 0;
2296         lo->ldo_pattern = 0;
2297 }
2298
2299 /*
2300  * ->start is called once all slices are initialized, including header's
2301  * cache for mode (object type). using the type we can initialize ops
2302  */
2303 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
2304 {
2305         if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
2306                 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
2307         return 0;
2308 }
2309
2310 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
2311 {
2312         struct lod_object *mo = lu2lod_obj(o);
2313
2314         /*
2315          * release all underlying object pinned
2316          */
2317
2318         lod_object_free_striping(env, mo);
2319
2320         lod_object_set_pool(mo, NULL);
2321
2322         lu_object_fini(o);
2323         OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
2324 }
2325
/*
 * ->loo_object_release method.  Intentionally empty for now.
 */
static void lod_object_release(const struct lu_env *env, struct lu_object *o)
{
        /* XXX: open question - should everything be released here when
         * an earlier object creation has failed? */
}
2331
2332 static int lod_object_print(const struct lu_env *env, void *cookie,
2333                             lu_printer_t p, const struct lu_object *l)
2334 {
2335         struct lod_object *o = lu2lod_obj((struct lu_object *) l);
2336
2337         return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
2338 }
2339
2340 struct lu_object_operations lod_lu_obj_ops = {
2341         .loo_object_init        = lod_object_init,
2342         .loo_object_start       = lod_object_start,
2343         .loo_object_free        = lod_object_free,
2344         .loo_object_release     = lod_object_release,
2345         .loo_object_print       = lod_object_print,
2346 };