Whamcloud - gitweb
LU-4753 endianness: Fix cpu_to_leXX size mismatch
[fs/lustre-release.git] / lustre / lod / lod_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2013, Intel Corporation.
27  */
28 /*
29  * lustre/lod/lod_object.c
30  *
31  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
32  */
33
34 #define DEBUG_SUBSYSTEM S_MDS
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
41
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
46 #include <md_object.h>
47
48 #include "lod_internal.h"
49
50 static const char dot[] = ".";
51 static const char dotdot[] = "..";
52
53 extern struct kmem_cache *lod_object_kmem;
54 static const struct dt_body_operations lod_body_lnk_ops;
55
56 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
57                             struct dt_rec *rec, const struct dt_key *key,
58                             struct lustre_capa *capa)
59 {
60         struct dt_object *next = dt_object_child(dt);
61         return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
62 }
63
64 static int lod_declare_index_insert(const struct lu_env *env,
65                                     struct dt_object *dt,
66                                     const struct dt_rec *rec,
67                                     const struct dt_key *key,
68                                     struct thandle *handle)
69 {
70         return dt_declare_insert(env, dt_object_child(dt), rec, key, handle);
71 }
72
73 static int lod_index_insert(const struct lu_env *env,
74                             struct dt_object *dt,
75                             const struct dt_rec *rec,
76                             const struct dt_key *key,
77                             struct thandle *th,
78                             struct lustre_capa *capa,
79                             int ign)
80 {
81         return dt_insert(env, dt_object_child(dt), rec, key, th, capa, ign);
82 }
83
84 static int lod_declare_index_delete(const struct lu_env *env,
85                                     struct dt_object *dt,
86                                     const struct dt_key *key,
87                                     struct thandle *th)
88 {
89         return dt_declare_delete(env, dt_object_child(dt), key, th);
90 }
91
92 static int lod_index_delete(const struct lu_env *env,
93                             struct dt_object *dt,
94                             const struct dt_key *key,
95                             struct thandle *th,
96                             struct lustre_capa *capa)
97 {
98         return dt_delete(env, dt_object_child(dt), key, th, capa);
99 }
100
101 static struct dt_it *lod_it_init(const struct lu_env *env,
102                                  struct dt_object *dt, __u32 attr,
103                                  struct lustre_capa *capa)
104 {
105         struct dt_object        *next = dt_object_child(dt);
106         struct lod_it           *it = &lod_env_info(env)->lti_it;
107         struct dt_it            *it_next;
108
109
110         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
111         if (IS_ERR(it_next))
112                 return it_next;
113
114         /* currently we do not use more than one iterator per thread
115          * so we store it in thread info. if at some point we need
116          * more active iterators in a single thread, we can allocate
117          * additional ones */
118         LASSERT(it->lit_obj == NULL);
119
120         it->lit_it = it_next;
121         it->lit_obj = next;
122
123         return (struct dt_it *)it;
124 }
125
126 #define LOD_CHECK_IT(env, it)                                   \
127 {                                                               \
128         LASSERT((it)->lit_obj != NULL);                         \
129         LASSERT((it)->lit_it != NULL);                          \
130 } while(0)
131
132 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
133 {
134         struct lod_it *it = (struct lod_it *)di;
135
136         LOD_CHECK_IT(env, it);
137         it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
138
139         /* the iterator not in use any more */
140         it->lit_obj = NULL;
141         it->lit_it = NULL;
142 }
143
144 int lod_it_get(const struct lu_env *env, struct dt_it *di,
145                const struct dt_key *key)
146 {
147         const struct lod_it *it = (const struct lod_it *)di;
148
149         LOD_CHECK_IT(env, it);
150         return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
151 }
152
153 void lod_it_put(const struct lu_env *env, struct dt_it *di)
154 {
155         struct lod_it *it = (struct lod_it *)di;
156
157         LOD_CHECK_IT(env, it);
158         return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
159 }
160
161 int lod_it_next(const struct lu_env *env, struct dt_it *di)
162 {
163         struct lod_it *it = (struct lod_it *)di;
164
165         LOD_CHECK_IT(env, it);
166         return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
167 }
168
169 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
170 {
171         const struct lod_it *it = (const struct lod_it *)di;
172
173         LOD_CHECK_IT(env, it);
174         return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
175 }
176
177 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
178 {
179         struct lod_it *it = (struct lod_it *)di;
180
181         LOD_CHECK_IT(env, it);
182         return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
183 }
184
185 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
186                struct dt_rec *rec, __u32 attr)
187 {
188         const struct lod_it *it = (const struct lod_it *)di;
189
190         LOD_CHECK_IT(env, it);
191         return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
192 }
193
194 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
195 {
196         const struct lod_it *it = (const struct lod_it *)di;
197
198         LOD_CHECK_IT(env, it);
199         return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
200 }
201
202 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
203 {
204         const struct lod_it *it = (const struct lod_it *)di;
205
206         LOD_CHECK_IT(env, it);
207         return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
208 }
209
210 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
211                    void* key_rec)
212 {
213         const struct lod_it *it = (const struct lod_it *)di;
214
215         LOD_CHECK_IT(env, it);
216         return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it, key_rec);
217 }
218
219 static struct dt_index_operations lod_index_ops = {
220         .dio_lookup             = lod_index_lookup,
221         .dio_declare_insert     = lod_declare_index_insert,
222         .dio_insert             = lod_index_insert,
223         .dio_declare_delete     = lod_declare_index_delete,
224         .dio_delete             = lod_index_delete,
225         .dio_it = {
226                 .init           = lod_it_init,
227                 .fini           = lod_it_fini,
228                 .get            = lod_it_get,
229                 .put            = lod_it_put,
230                 .next           = lod_it_next,
231                 .key            = lod_it_key,
232                 .key_size       = lod_it_key_size,
233                 .rec            = lod_it_rec,
234                 .store          = lod_it_store,
235                 .load           = lod_it_load,
236                 .key_rec        = lod_it_key_rec,
237         }
238 };
239
240 static void lod_object_read_lock(const struct lu_env *env,
241                                  struct dt_object *dt, unsigned role)
242 {
243         dt_read_lock(env, dt_object_child(dt), role);
244 }
245
246 static void lod_object_write_lock(const struct lu_env *env,
247                                   struct dt_object *dt, unsigned role)
248 {
249         dt_write_lock(env, dt_object_child(dt), role);
250 }
251
252 static void lod_object_read_unlock(const struct lu_env *env,
253                                    struct dt_object *dt)
254 {
255         dt_read_unlock(env, dt_object_child(dt));
256 }
257
258 static void lod_object_write_unlock(const struct lu_env *env,
259                                     struct dt_object *dt)
260 {
261         dt_write_unlock(env, dt_object_child(dt));
262 }
263
264 static int lod_object_write_locked(const struct lu_env *env,
265                                    struct dt_object *dt)
266 {
267         return dt_write_locked(env, dt_object_child(dt));
268 }
269
270 static int lod_attr_get(const struct lu_env *env,
271                         struct dt_object *dt,
272                         struct lu_attr *attr,
273                         struct lustre_capa *capa)
274 {
275         return dt_attr_get(env, dt_object_child(dt), attr, capa);
276 }
277
278 static int lod_declare_attr_set(const struct lu_env *env,
279                                 struct dt_object *dt,
280                                 const struct lu_attr *attr,
281                                 struct thandle *handle)
282 {
283         struct dt_object  *next = dt_object_child(dt);
284         struct lod_object *lo = lod_dt_obj(dt);
285         int                rc, i;
286         ENTRY;
287
288         /*
289          * declare setattr on the local object
290          */
291         rc = dt_declare_attr_set(env, next, attr, handle);
292         if (rc)
293                 RETURN(rc);
294
295         /* osp_declare_attr_set() ignores all attributes other than
296          * UID, GID, and size, and osp_attr_set() ignores all but UID
297          * and GID.  Declaration of size attr setting happens through
298          * lod_declare_init_size(), and not through this function.
299          * Therefore we need not load striping unless ownership is
300          * changing.  This should save memory and (we hope) speed up
301          * rename(). */
302         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
303                 if (!(attr->la_valid & (LA_UID | LA_GID)))
304                         RETURN(rc);
305
306                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
307                         RETURN(0);
308         } else {
309                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
310                                         LA_ATIME | LA_MTIME | LA_CTIME)))
311                         RETURN(rc);
312         }
313         /*
314          * load striping information, notice we don't do this when object
315          * is being initialized as we don't need this information till
316          * few specific cases like destroy, chown
317          */
318         rc = lod_load_striping(env, lo);
319         if (rc)
320                 RETURN(rc);
321
322         if (lo->ldo_stripenr == 0)
323                 RETURN(0);
324
325         if (!(attr->la_valid & ~(LA_ATIME | LA_MTIME | LA_CTIME))) {
326                 struct lu_attr   *la = &lod_env_info(env)->lti_attr;
327                 bool             setattr_time = false;
328
329                 rc = dt_attr_get(env, dt_object_child(dt), la,
330                                  BYPASS_CAPA);
331                 if (rc != 0)
332                         RETURN(rc);
333
334                 /* If it will only setattr time, it will only set
335                  * time < current_time */
336                 if ((attr->la_valid & LA_ATIME &&
337                      attr->la_atime < la->la_atime) ||
338                     (attr->la_valid & LA_CTIME &&
339                      attr->la_ctime < la->la_ctime) ||
340                     (attr->la_valid & LA_MTIME &&
341                      attr->la_mtime < la->la_mtime))
342                         setattr_time = true;
343
344                 if (!setattr_time)
345                         RETURN(0);
346         }
347         /*
348          * if object is striped declare changes on the stripes
349          */
350         LASSERT(lo->ldo_stripe);
351         for (i = 0; i < lo->ldo_stripenr; i++) {
352                 LASSERT(lo->ldo_stripe[i]);
353
354                 rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle);
355                 if (rc) {
356                         CERROR("failed declaration: %d\n", rc);
357                         break;
358                 }
359         }
360
361         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
362             dt_object_exists(next) != 0 &&
363             dt_object_remote(next) == 0)
364                 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
365
366         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
367             dt_object_exists(next) &&
368             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
369                 struct lod_thread_info *info = lod_env_info(env);
370                 struct lu_buf *buf = &info->lti_buf;
371
372                 buf->lb_buf = info->lti_ea_store;
373                 buf->lb_len = info->lti_ea_store_size;
374                 dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV,
375                                      LU_XATTR_REPLACE, handle);
376         }
377
378         RETURN(rc);
379 }
380
381 static int lod_attr_set(const struct lu_env *env,
382                         struct dt_object *dt,
383                         const struct lu_attr *attr,
384                         struct thandle *handle,
385                         struct lustre_capa *capa)
386 {
387         struct dt_object  *next = dt_object_child(dt);
388         struct lod_object *lo = lod_dt_obj(dt);
389         int                rc, i;
390         ENTRY;
391
392         /*
393          * apply changes to the local object
394          */
395         rc = dt_attr_set(env, next, attr, handle, capa);
396         if (rc)
397                 RETURN(rc);
398
399         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
400                 if (!(attr->la_valid & (LA_UID | LA_GID)))
401                         RETURN(rc);
402
403                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
404                         RETURN(0);
405         } else {
406                 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
407                                         LA_ATIME | LA_MTIME | LA_CTIME)))
408                         RETURN(rc);
409         }
410
411         if (lo->ldo_stripenr == 0)
412                 RETURN(0);
413
414         if (!(attr->la_valid & ~(LA_ATIME | LA_MTIME | LA_CTIME))) {
415                 struct lu_attr   *la = &lod_env_info(env)->lti_attr;
416                 bool             setattr_time = false;
417
418                 rc = dt_attr_get(env, dt_object_child(dt), la,
419                                  BYPASS_CAPA);
420                 if (rc != 0)
421                         RETURN(rc);
422
423                 /* If it will only setattr time, it will only set
424                  * time < current_time */
425                 if ((attr->la_valid & LA_ATIME &&
426                      attr->la_atime < la->la_atime) ||
427                     (attr->la_valid & LA_CTIME &&
428                      attr->la_ctime < la->la_ctime) ||
429                     (attr->la_valid & LA_MTIME &&
430                      attr->la_mtime < la->la_mtime))
431                         setattr_time = true;
432
433                 if (!setattr_time)
434                         RETURN(0);
435         }
436
437         /*
438          * if object is striped, apply changes to all the stripes
439          */
440         LASSERT(lo->ldo_stripe);
441         for (i = 0; i < lo->ldo_stripenr; i++) {
442                 LASSERT(lo->ldo_stripe[i]);
443                 rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
444                 if (rc) {
445                         CERROR("failed declaration: %d\n", rc);
446                         break;
447                 }
448         }
449
450         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
451             dt_object_exists(next) != 0 &&
452             dt_object_remote(next) == 0)
453                 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
454
455         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) &&
456             dt_object_exists(next) &&
457             dt_object_remote(next) == 0 && S_ISREG(attr->la_mode)) {
458                 struct lod_thread_info *info = lod_env_info(env);
459                 struct lu_buf *buf = &info->lti_buf;
460                 struct ost_id *oi = &info->lti_ostid;
461                 struct lu_fid *fid = &info->lti_fid;
462                 struct lov_mds_md_v1 *lmm;
463                 struct lov_ost_data_v1 *objs;
464                 __u32 magic;
465                 int rc1;
466
467                 rc1 = lod_get_lov_ea(env, lo);
468                 if (rc1  <= 0)
469                         RETURN(rc);
470
471                 buf->lb_buf = info->lti_ea_store;
472                 buf->lb_len = info->lti_ea_store_size;
473                 lmm = info->lti_ea_store;
474                 magic = le32_to_cpu(lmm->lmm_magic);
475                 if (magic == LOV_MAGIC_V1)
476                         objs = &(lmm->lmm_objects[0]);
477                 else
478                         objs = &((struct lov_mds_md_v3 *)lmm)->lmm_objects[0];
479                 ostid_le_to_cpu(&objs->l_ost_oi, oi);
480                 ostid_to_fid(fid, oi, le32_to_cpu(objs->l_ost_idx));
481                 fid->f_oid--;
482                 fid_to_ostid(fid, oi);
483                 ostid_cpu_to_le(oi, &objs->l_ost_oi);
484                 dt_xattr_set(env, next, buf, XATTR_NAME_LOV,
485                              LU_XATTR_REPLACE, handle, BYPASS_CAPA);
486         }
487
488         RETURN(rc);
489 }
490
491 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
492                          struct lu_buf *buf, const char *name,
493                          struct lustre_capa *capa)
494 {
495         struct lod_thread_info  *info = lod_env_info(env);
496         struct lod_device       *dev = lu2lod_dev(dt->do_lu.lo_dev);
497         int                      rc, is_root;
498         ENTRY;
499
500         rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
501         if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
502                 RETURN(rc);
503
504         /*
505          * lod returns default striping on the real root of the device
506          * this is like the root stores default striping for the whole
507          * filesystem. historically we've been using a different approach
508          * and store it in the config.
509          */
510         dt_root_get(env, dev->lod_child, &info->lti_fid);
511         is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
512
513         if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
514                 struct lov_user_md *lum = buf->lb_buf;
515                 struct lov_desc    *desc = &dev->lod_desc;
516
517                 if (buf->lb_buf == NULL) {
518                         rc = sizeof(*lum);
519                 } else if (buf->lb_len >= sizeof(*lum)) {
520                         lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
521                         lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
522                         lmm_oi_set_id(&lum->lmm_oi, 0);
523                         lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
524                         lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
525                         lum->lmm_stripe_size = cpu_to_le32(
526                                                 desc->ld_default_stripe_size);
527                         lum->lmm_stripe_count = cpu_to_le16(
528                                                 desc->ld_default_stripe_count);
529                         lum->lmm_stripe_offset = cpu_to_le16(
530                                                 desc->ld_default_stripe_offset);
531                         rc = sizeof(*lum);
532                 } else {
533                         rc = -ERANGE;
534                 }
535         }
536
537         RETURN(rc);
538 }
539
540 static int lod_verify_md_striping(struct lod_device *lod,
541                                   const struct lmv_user_md_v1 *lum)
542 {
543         int     rc = 0;
544         ENTRY;
545
546         if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
547                 GOTO(out, rc = -EINVAL);
548
549         if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
550                 GOTO(out, rc = -EINVAL);
551
552         if (unlikely(le32_to_cpu(lum->lum_stripe_count) >
553                                 lod->lod_remote_mdt_count + 1))
554                 GOTO(out, rc = -EINVAL);
555 out:
556         if (rc != 0)
557                 CERROR("%s: invalid lmv_user_md: magic = %x, "
558                        "stripe_offset = %d, stripe_count = %u: rc = %d\n",
559                        lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
560                        (int)le32_to_cpu(lum->lum_stripe_offset),
561                        le32_to_cpu(lum->lum_stripe_count), rc);
562         return rc;
563 }
564
565 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
566                     struct lu_buf *lmv_buf)
567 {
568         struct lod_thread_info  *info = lod_env_info(env);
569         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
570         struct lod_object       *lo = lod_dt_obj(dt);
571         struct lmv_mds_md_v1    *lmm1;
572         int                     stripe_count;
573         int                     lmm_size;
574         int                     type = LU_SEQ_RANGE_ANY;
575         int                     i;
576         int                     rc;
577         __u32                   mdtidx;
578         ENTRY;
579
580         LASSERT(lo->ldo_dir_striped != 0);
581         LASSERT(lo->ldo_stripenr > 0);
582         stripe_count = lo->ldo_stripenr + 1;
583         lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
584         if (info->lti_ea_store_size < lmm_size) {
585                 rc = lod_ea_store_resize(info, lmm_size);
586                 if (rc != 0)
587                         RETURN(rc);
588         }
589
590         lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
591         lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
592         lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
593         lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
594         rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
595                             &mdtidx, &type);
596         if (rc != 0)
597                 RETURN(rc);
598
599         lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
600         fid_cpu_to_le(&lmm1->lmv_stripe_fids[0], lu_object_fid(&dt->do_lu));
601         for (i = 0; i < lo->ldo_stripenr; i++) {
602                 struct dt_object *dto;
603
604                 dto = lo->ldo_stripe[i];
605                 LASSERT(dto != NULL);
606                 fid_cpu_to_le(&lmm1->lmv_stripe_fids[i + 1],
607                               lu_object_fid(&dto->do_lu));
608         }
609
610         lmv_buf->lb_buf = info->lti_ea_store;
611         lmv_buf->lb_len = lmm_size;
612         lo->ldo_dir_striping_cached = 1;
613
614         RETURN(rc);
615 }
616
617 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
618                            const struct lu_buf *buf)
619 {
620         struct lod_thread_info  *info = lod_env_info(env);
621         struct lod_device       *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
622         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
623         struct dt_object        **stripe;
624         union lmv_mds_md        *lmm = buf->lb_buf;
625         struct lmv_mds_md_v1    *lmv1 = &lmm->lmv_md_v1;
626         struct lu_fid           *fid = &info->lti_fid;
627         int                     i;
628         int                     rc = 0;
629         ENTRY;
630
631         if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_MIGRATE)
632                 RETURN(0);
633
634         if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
635                 RETURN(-EINVAL);
636
637         if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1)
638                 RETURN(0);
639
640         fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[0]);
641         /* Do not load striping information for slave inode */
642         if (!lu_fid_eq(fid, lu_object_fid(&lo->ldo_obj.do_lu))) {
643                 lo->ldo_dir_slave_stripe = 1;
644                 RETURN(0);
645         }
646
647         LASSERT(lo->ldo_stripe == NULL);
648         OBD_ALLOC(stripe, sizeof(stripe[0]) *
649                   (le32_to_cpu(lmv1->lmv_stripe_count) - 1));
650         if (stripe == NULL)
651                 RETURN(-ENOMEM);
652
653         /* skip master stripe */
654         for (i = 1; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
655                 struct lod_tgt_desc     *tgt;
656                 int                     idx;
657                 int                     type = LU_SEQ_RANGE_ANY;
658                 struct dt_object        *dto;
659
660                 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
661                 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
662                 if (rc != 0)
663                         GOTO(out, rc);
664
665                 tgt = LTD_TGT(ltd, idx);
666                 if (tgt == NULL)
667                         GOTO(out, rc = -ESTALE);
668
669                 dto = dt_locate_at(env, tgt->ltd_tgt, fid,
670                                   lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
671                                   NULL);
672                 if (IS_ERR(dto))
673                         GOTO(out, rc = PTR_ERR(dto));
674
675                 stripe[i - 1] = dto;
676         }
677 out:
678         lo->ldo_stripe = stripe;
679         lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count) - 1;
680         lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count) - 1;
681         if (rc != 0)
682                 lod_object_free_striping(env, lo);
683
684         RETURN(rc);
685 }
686
687 static int lod_prep_md_striped_create(const struct lu_env *env,
688                                       struct dt_object *dt,
689                                       struct lu_attr *attr,
690                                       const struct lmv_user_md_v1 *lum,
691                                       struct thandle *th)
692 {
693         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
694         struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
695         struct lod_object       *lo = lod_dt_obj(dt);
696         struct dt_object        **stripe;
697         struct lu_buf           lmv_buf;
698         int                     stripe_count;
699         int                     *idx_array;
700         int                     rc = 0;
701         int                     i;
702         int                     j;
703         ENTRY;
704
705         /* The lum has been verifed in lod_verify_md_striping */
706         LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
707         LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
708
709         /* Do not need allocated master stripe */
710         stripe_count = le32_to_cpu(lum->lum_stripe_count);
711         OBD_ALLOC(stripe, sizeof(stripe[0]) * (stripe_count - 1));
712         if (stripe == NULL)
713                 RETURN(-ENOMEM);
714
715         OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
716         if (idx_array == NULL)
717                 GOTO(out_free, rc = -ENOMEM);
718
719         idx_array[0] = le32_to_cpu(lum->lum_stripe_offset);
720         for (i = 1; i < stripe_count; i++) {
721                 struct lod_tgt_desc     *tgt;
722                 struct dt_object        *dto;
723                 struct lu_fid           fid;
724                 int                     idx;
725                 struct lu_object_conf   conf = { 0 };
726
727                 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
728
729                 for (j = 0; j < lod->lod_remote_mdt_count;
730                      j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
731                         bool already_allocated = false;
732                         int k;
733
734                         CDEBUG(D_INFO, "try idx %d, mdt cnt %d,"
735                                " allocated %d, last allocated %d\n", idx,
736                                lod->lod_remote_mdt_count, i, idx_array[i - 1]);
737
738                         /* Find next avaible target */
739                         if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
740                                 continue;
741
742                         /* check whether the idx already exists
743                          * in current allocated array */
744                         for (k = 0; k < i; k++) {
745                                 if (idx_array[k] == idx) {
746                                         already_allocated = true;
747                                         break;
748                                 }
749                         }
750
751                         if (already_allocated)
752                                 continue;
753
754                         break;
755                 }
756
757                 /* Can not allocate more stripes */
758                 if (j == lod->lod_remote_mdt_count) {
759                         CDEBUG(D_INFO, "%s: require stripes %d only get %d\n",
760                                lod2obd(lod)->obd_name, stripe_count, i - 1);
761                         break;
762                 }
763
764                 CDEBUG(D_INFO, "idx %d, mdt cnt %d,"
765                        " allocated %d, last allocated %d\n", idx,
766                        lod->lod_remote_mdt_count, i, idx_array[i - 1]);
767
768                 tgt = LTD_TGT(ltd, idx);
769                 LASSERT(tgt != NULL);
770
771                 rc = obd_fid_alloc(tgt->ltd_exp, &fid, NULL);
772                 if (rc < 0)
773                         GOTO(out_put, rc);
774                 rc = 0;
775
776                 conf.loc_flags = LOC_F_NEW;
777                 dto = dt_locate_at(env, tgt->ltd_tgt, &fid,
778                                   dt->do_lu.lo_dev->ld_site->ls_top_dev, &conf);
779                 if (IS_ERR(dto))
780                         GOTO(out_put, rc = PTR_ERR(dto));
781                 stripe[i - 1] = dto;
782                 idx_array[i] = idx;
783         }
784
785         lo->ldo_dir_striped = 1;
786         lo->ldo_stripe = stripe;
787         lo->ldo_stripenr = i - 1;
788         lo->ldo_stripes_allocated = stripe_count - 1;
789
790         if (lo->ldo_stripenr == 0)
791                 GOTO(out_put, rc = -ENOSPC);
792
793         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
794         if (rc != 0)
795                 GOTO(out_put, rc);
796
797         for (i = 0; i < lo->ldo_stripenr; i++) {
798                 struct dt_object *dto;
799
800                 dto = stripe[i];
801                 /* only create slave striped object */
802                 rc = dt_declare_create(env, dto, attr, NULL, NULL, th);
803                 if (rc != 0)
804                         GOTO(out_put, rc);
805
806                 if (!dt_try_as_dir(env, dto))
807                         GOTO(out_put, rc = -EINVAL);
808
809                 rc = dt_declare_insert(env, dto,
810                      (const struct dt_rec *)lu_object_fid(&dto->do_lu),
811                      (const struct dt_key *)dot, th);
812                 if (rc != 0)
813                         GOTO(out_put, rc);
814
815                 /* master stripe FID will be put to .. */
816                 rc = dt_declare_insert(env, dto,
817                      (const struct dt_rec *)lu_object_fid(&dt->do_lu),
818                      (const struct dt_key *)dotdot, th);
819                 if (rc != 0)
820                         GOTO(out_put, rc);
821
822                 /* probably nothing to inherite */
823                 if (lo->ldo_striping_cached &&
824                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
825                                          lo->ldo_def_stripenr,
826                                          lo->ldo_def_stripe_offset)) {
827                         struct lod_thread_info  *info;
828                         struct lov_user_md_v3   *v3;
829
830                         /* sigh, lti_ea_store has been used for lmv_buf,
831                          * so we have to allocate buffer for default
832                          * stripe EA */
833                         OBD_ALLOC_PTR(v3);
834                         if (v3 == NULL)
835                                 GOTO(out_put, rc = -ENOMEM);
836
837                         memset(v3, 0, sizeof(*v3));
838                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
839                         v3->lmm_stripe_count =
840                                 cpu_to_le16(lo->ldo_def_stripenr);
841                         v3->lmm_stripe_offset =
842                                 cpu_to_le16(lo->ldo_def_stripe_offset);
843                         v3->lmm_stripe_size =
844                                 cpu_to_le32(lo->ldo_def_stripe_size);
845                         if (lo->ldo_pool)
846                                 strncpy(v3->lmm_pool_name, lo->ldo_pool,
847                                         LOV_MAXPOOLNAME);
848
849                         info = lod_env_info(env);
850                         info->lti_buf.lb_buf = v3;
851                         info->lti_buf.lb_len = sizeof(*v3);
852                         rc = dt_declare_xattr_set(env, dto,
853                                                   &info->lti_buf,
854                                                   XATTR_NAME_LOV,
855                                                   0, th);
856                         OBD_FREE_PTR(v3);
857                         if (rc != 0)
858                                 GOTO(out_put, rc);
859                 }
860                 rc = dt_declare_xattr_set(env, dto, &lmv_buf, XATTR_NAME_LMV, 0,
861                                           th);
862                 if (rc != 0)
863                         GOTO(out_put, rc);
864         }
865
866         rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
867                                   XATTR_NAME_LMV, 0, th);
868         if (rc != 0)
869                 GOTO(out_put, rc);
870
871 out_put:
872         if (rc < 0) {
873                 for (i = 0; i < stripe_count - 1; i++)
874                         if (stripe[i] != NULL)
875                                 lu_object_put(env, &stripe[i]->do_lu);
876                 OBD_FREE(stripe, sizeof(stripe[0]) * (stripe_count - 1));
877         }
878
879 out_free:
880         if (idx_array != NULL)
881                 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
882
883         RETURN(rc);
884 }
885
886 /**
887  * Declare create striped md object.
888  */
889 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
890                                      struct dt_object *dt,
891                                      struct lu_attr *attr,
892                                      const struct lu_buf *lum_buf,
893                                      struct thandle *th)
894 {
895         struct lod_object       *lo = lod_dt_obj(dt);
896         struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
897         struct lmv_user_md_v1   *lum;
898         int                     rc;
899         ENTRY;
900
901         lum = lum_buf->lb_buf;
902         LASSERT(lum != NULL);
903
904         CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
905                le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
906                (int)le32_to_cpu(lum->lum_stripe_offset));
907
908         if (le32_to_cpu(lum->lum_stripe_count) <= 1)
909                 GOTO(out, rc = 0);
910
911         rc = lod_verify_md_striping(lod, lum);
912         if (rc != 0)
913                 GOTO(out, rc);
914
915         /* prepare dir striped objects */
916         rc = lod_prep_md_striped_create(env, dt, attr, lum, th);
917         if (rc != 0) {
918                 /* failed to create striping, let's reset
919                  * config so that others don't get confused */
920                 lod_object_free_striping(env, lo);
921                 GOTO(out, rc);
922         }
923 out:
924         RETURN(rc);
925 }
926
927 /*
928  * LOV xattr is a storage for striping, and LOD owns this xattr.
929  * but LOD allows others to control striping to some extent
930  * - to reset strping
931  * - to set new defined striping
932  * - to set new semi-defined striping
933  *   - number of stripes is defined
934  *   - number of stripes + osts are defined
935  *   - ??
936  */
937 static int lod_declare_xattr_set(const struct lu_env *env,
938                                  struct dt_object *dt,
939                                  const struct lu_buf *buf,
940                                  const char *name, int fl,
941                                  struct thandle *th)
942 {
943         struct dt_object *next = dt_object_child(dt);
944         struct lu_attr   *attr = &lod_env_info(env)->lti_attr;
945         __u32             mode;
946         int               rc;
947         ENTRY;
948
949         /*
950          * allow to declare predefined striping on a new (!mode) object
951          * which is supposed to be replay of regular file creation
952          * (when LOV setting is declared)
953          * LU_XATTR_REPLACE is set to indicate a layout swap
954          */
955         mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
956         if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
957              !(fl & LU_XATTR_REPLACE)) {
958                 /*
959                  * this is a request to manipulate object's striping
960                  */
961                 if (dt_object_exists(dt)) {
962                         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
963                         if (rc)
964                                 RETURN(rc);
965                 } else {
966                         memset(attr, 0, sizeof(*attr));
967                         attr->la_valid = LA_TYPE | LA_MODE;
968                         attr->la_mode = S_IFREG;
969                 }
970                 rc = lod_declare_striped_object(env, dt, attr, buf, th);
971         } else if (S_ISDIR(mode)) {
972                 struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
973                 struct lod_object       *lo = lod_dt_obj(dt);
974                 int                     i;
975
976                 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
977                         struct lmv_user_md_v1 *lum;
978
979                         LASSERT(buf != NULL && buf->lb_buf != NULL);
980                         lum = buf->lb_buf;
981                         rc = lod_verify_md_striping(d, lum);
982                         if (rc != 0)
983                                 RETURN(rc);
984                 }
985
986                 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
987                 if (rc != 0)
988                         RETURN(rc);
989
990                 /* set xattr to each stripes, if needed */
991                 rc = lod_load_striping(env, lo);
992                 if (rc != 0)
993                         RETURN(rc);
994
995                 if (lo->ldo_stripenr == 0)
996                         RETURN(rc);
997
998                 for (i = 0; i < lo->ldo_stripenr; i++) {
999                         LASSERT(lo->ldo_stripe[i]);
1000                         rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
1001                                                   name, fl, th);
1002                         if (rc != 0)
1003                                 break;
1004                 }
1005         } else {
1006                 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
1007         }
1008
1009         RETURN(rc);
1010 }
1011
1012 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
1013 {
1014         lo->ldo_striping_cached = 0;
1015         lo->ldo_def_striping_set = 0;
1016         lod_object_set_pool(lo, NULL);
1017         lo->ldo_def_stripe_size = 0;
1018         lo->ldo_def_stripenr = 0;
1019 }
1020
1021 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
1022                                     struct dt_object *dt,
1023                                     const struct lu_buf *buf,
1024                                     const char *name, int fl,
1025                                     struct thandle *th,
1026                                     struct lustre_capa *capa)
1027 {
1028         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
1029         struct dt_object        *next = dt_object_child(dt);
1030         struct lod_object       *l = lod_dt_obj(dt);
1031         struct lov_user_md_v1   *lum;
1032         struct lov_user_md_v3   *v3 = NULL;
1033         int                      rc;
1034         ENTRY;
1035
1036         /* If it is striped dir, we should clear the stripe cache for
1037          * slave stripe as well, but there are no effective way to
1038          * notify the LOD on the slave MDT, so we do not cache stripe
1039          * information for slave stripe for now. XXX*/
1040         lod_lov_stripe_cache_clear(l);
1041         LASSERT(buf != NULL && buf->lb_buf != NULL);
1042         lum = buf->lb_buf;
1043
1044         rc = lod_verify_striping(d, buf, 0);
1045         if (rc)
1046                 RETURN(rc);
1047
1048         if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1049                 v3 = buf->lb_buf;
1050
1051         /* if { size, offset, count } = { 0, -1, 0 } and no pool
1052          * (i.e. all default values specified) then delete default
1053          * striping from dir. */
1054         CDEBUG(D_OTHER,
1055                 "set default striping: sz %u # %u offset %d %s %s\n",
1056                 (unsigned)lum->lmm_stripe_size,
1057                 (unsigned)lum->lmm_stripe_count,
1058                 (int)lum->lmm_stripe_offset,
1059                 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1060
1061         if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1062                                 (lum->lmm_stripe_count),
1063                                 (lum->lmm_stripe_offset)) &&
1064                         lum->lmm_magic == LOV_USER_MAGIC_V1) {
1065                 rc = dt_xattr_del(env, next, name, th, capa);
1066                 if (rc == -ENODATA)
1067                         rc = 0;
1068         } else {
1069                 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1070         }
1071
1072         RETURN(rc);
1073 }
1074
1075 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
1076                                             struct dt_object *dt,
1077                                             const struct lu_buf *buf,
1078                                             const char *name, int fl,
1079                                             struct thandle *th,
1080                                             struct lustre_capa *capa)
1081 {
1082         struct dt_object        *next = dt_object_child(dt);
1083         struct lod_object       *l = lod_dt_obj(dt);
1084         struct lmv_user_md_v1   *lum;
1085         int                      rc;
1086         ENTRY;
1087
1088         LASSERT(buf != NULL && buf->lb_buf != NULL);
1089         lum = buf->lb_buf;
1090
1091         CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
1092               le32_to_cpu(lum->lum_stripe_count),
1093               (int)le32_to_cpu(lum->lum_stripe_offset));
1094
1095         if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
1096                                  le32_to_cpu(lum->lum_stripe_offset)) &&
1097                                 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
1098                 rc = dt_xattr_del(env, next, name, th, capa);
1099                 if (rc == -ENODATA)
1100                         rc = 0;
1101         } else {
1102                 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1103                 if (rc != 0)
1104                         RETURN(rc);
1105
1106                 /* Update default stripe cache */
1107                 if (l->ldo_dir_stripe == NULL) {
1108                         OBD_ALLOC_PTR(l->ldo_dir_stripe);
1109                         if (l->ldo_dir_stripe == NULL)
1110                                 RETURN(-ENOMEM);
1111                 }
1112
1113                 l->ldo_dir_striping_cached = 0;
1114                 l->ldo_dir_def_striping_set = 1;
1115                 l->ldo_dir_def_stripenr =
1116                         le32_to_cpu(lum->lum_stripe_count) - 1;
1117         }
1118
1119         RETURN(rc);
1120 }
1121
1122 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
1123                              const struct lu_buf *buf, const char *name,
1124                              int fl, struct thandle *th,
1125                              struct lustre_capa *capa)
1126 {
1127         struct lod_object       *lo = lod_dt_obj(dt);
1128         struct lu_buf           lmv_buf;
1129         int                     i;
1130         int                     rc;
1131         ENTRY;
1132
1133         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
1134                 RETURN(-ENOTDIR);
1135
1136         /* The stripes are supposed to be allocated in declare phase,
1137          * if there are no stripes being allocated, it will skip */
1138         if (lo->ldo_stripenr == 0)
1139                 RETURN(0);
1140
1141         rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1142         if (rc != 0)
1143                 RETURN(rc);
1144
1145         for (i = 0; i < lo->ldo_stripenr; i++) {
1146                 struct dt_object *dto;
1147                 struct lu_attr  *attr = &lod_env_info(env)->lti_attr;
1148
1149                 dto = lo->ldo_stripe[i];
1150                 memset(attr, 0, sizeof(*attr));
1151                 attr->la_valid = LA_TYPE | LA_MODE;
1152                 attr->la_mode = S_IFDIR;
1153                 rc = dt_create(env, dto, attr, NULL, NULL, th);
1154                 if (rc != 0)
1155                         RETURN(rc);
1156
1157                 rc = dt_insert(env, dto,
1158                               (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1159                               (const struct dt_key *)dot, th, capa, 0);
1160                 if (rc != 0)
1161                         RETURN(rc);
1162
1163                 rc = dt_insert(env, dto,
1164                               (struct dt_rec *)lu_object_fid(&dt->do_lu),
1165                               (const struct dt_key *)dotdot, th, capa, 0);
1166                 if (rc != 0)
1167                         RETURN(rc);
1168
1169                 if (lo->ldo_striping_cached &&
1170                     !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1171                                          lo->ldo_def_stripenr,
1172                                          lo->ldo_def_stripe_offset)) {
1173                         struct lod_thread_info  *info;
1174                         struct lov_user_md_v3   *v3;
1175
1176                         /* sigh, lti_ea_store has been used for lmv_buf,
1177                          * so we have to allocate buffer for default
1178                          * stripe EA */
1179                         OBD_ALLOC_PTR(v3);
1180                         if (v3 == NULL)
1181                                 RETURN(-ENOMEM);
1182
1183                         memset(v3, 0, sizeof(*v3));
1184                         v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1185                         v3->lmm_stripe_count =
1186                                 cpu_to_le16(lo->ldo_def_stripenr);
1187                         v3->lmm_stripe_offset =
1188                                 cpu_to_le16(lo->ldo_def_stripe_offset);
1189                         v3->lmm_stripe_size =
1190                                 cpu_to_le32(lo->ldo_def_stripe_size);
1191                         if (lo->ldo_pool)
1192                                 strncpy(v3->lmm_pool_name, lo->ldo_pool,
1193                                         LOV_MAXPOOLNAME);
1194
1195                         info = lod_env_info(env);
1196                         info->lti_buf.lb_buf = v3;
1197                         info->lti_buf.lb_len = sizeof(*v3);
1198                         rc = dt_xattr_set(env, dto, &info->lti_buf,
1199                                           XATTR_NAME_LOV, 0, th, capa);
1200                         OBD_FREE_PTR(v3);
1201                         if (rc != 0)
1202                                 RETURN(rc);
1203                 }
1204
1205                 rc = dt_xattr_set(env, dto, &lmv_buf, XATTR_NAME_LMV, fl, th,
1206                                   capa);
1207         }
1208
1209         rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
1210                           fl, th, capa);
1211
1212         RETURN(rc);
1213 }
1214
1215 static int lod_xattr_set(const struct lu_env *env,
1216                          struct dt_object *dt, const struct lu_buf *buf,
1217                          const char *name, int fl, struct thandle *th,
1218                          struct lustre_capa *capa)
1219 {
1220         struct lod_object       *lo = lod_dt_obj(dt);
1221         struct dt_object        *next = dt_object_child(dt);
1222         __u32                    attr;
1223         int                      rc;
1224         int                     i;
1225         ENTRY;
1226
1227         attr = dt->do_lu.lo_header->loh_attr & S_IFMT;
1228         if (S_ISDIR(attr) && strcmp(name, XATTR_NAME_LOV) == 0) {
1229                 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
1230         } else if (S_ISREG(attr) && !strcmp(name, XATTR_NAME_LOV)) {
1231                 /* in case of lov EA swap, just set it
1232                  * if not, it is a replay so check striping match what we
1233                  * already have during req replay, declare_xattr_set()
1234                  * defines striping, then create() does the work
1235                 */
1236                 if (fl & LU_XATTR_REPLACE) {
1237                         /* free stripes, then update disk */
1238                         lod_object_free_striping(env, lod_dt_obj(dt));
1239                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1240                 } else {
1241                         rc = lod_striping_create(env, dt, NULL, NULL, th);
1242                 }
1243         } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1244                 if (!S_ISDIR(attr))
1245                         RETURN(-ENOTDIR);
1246                 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
1247                                                       th, capa);
1248         } else {
1249                 /*
1250                  * behave transparantly for all other EAs
1251                  */
1252                 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1253         }
1254
1255         if (rc != 0 || !S_ISDIR(attr))
1256                 RETURN(rc);
1257
1258         if (lo->ldo_stripenr == 0)
1259                 RETURN(rc);
1260
1261         for (i = 0; i < lo->ldo_stripenr; i++) {
1262                 LASSERT(lo->ldo_stripe[i]);
1263                 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
1264                                   capa);
1265                 if (rc != 0)
1266                         break;
1267         }
1268
1269         RETURN(rc);
1270 }
1271
1272 static int lod_declare_xattr_del(const struct lu_env *env,
1273                                  struct dt_object *dt, const char *name,
1274                                  struct thandle *th)
1275 {
1276         return dt_declare_xattr_del(env, dt_object_child(dt), name, th);
1277 }
1278
1279 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
1280                          const char *name, struct thandle *th,
1281                          struct lustre_capa *capa)
1282 {
1283         if (!strcmp(name, XATTR_NAME_LOV))
1284                 lod_object_free_striping(env, lod_dt_obj(dt));
1285         return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
1286 }
1287
1288 static int lod_xattr_list(const struct lu_env *env,
1289                           struct dt_object *dt, struct lu_buf *buf,
1290                           struct lustre_capa *capa)
1291 {
1292         return dt_xattr_list(env, dt_object_child(dt), buf, capa);
1293 }
1294
1295 int lod_object_set_pool(struct lod_object *o, char *pool)
1296 {
1297         int len;
1298
1299         if (o->ldo_pool) {
1300                 len = strlen(o->ldo_pool);
1301                 OBD_FREE(o->ldo_pool, len + 1);
1302                 o->ldo_pool = NULL;
1303         }
1304         if (pool) {
1305                 len = strlen(pool);
1306                 OBD_ALLOC(o->ldo_pool, len + 1);
1307                 if (o->ldo_pool == NULL)
1308                         return -ENOMEM;
1309                 strcpy(o->ldo_pool, pool);
1310         }
1311         return 0;
1312 }
1313
1314 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
1315 {
1316         return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
1317 }
1318
1319
1320 static int lod_cache_parent_lov_striping(const struct lu_env *env,
1321                                          struct lod_object *lp)
1322 {
1323         struct lod_thread_info  *info = lod_env_info(env);
1324         struct lov_user_md_v1   *v1 = NULL;
1325         struct lov_user_md_v3   *v3 = NULL;
1326         int                      rc;
1327         ENTRY;
1328
1329         /* called from MDD without parent being write locked,
1330          * lock it here */
1331         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
1332         rc = lod_get_lov_ea(env, lp);
1333         if (rc < 0)
1334                 GOTO(unlock, rc);
1335
1336         if (rc < sizeof(struct lov_user_md)) {
1337                 /* don't lookup for non-existing or invalid striping */
1338                 lp->ldo_def_striping_set = 0;
1339                 lp->ldo_striping_cached = 1;
1340                 lp->ldo_def_stripe_size = 0;
1341                 lp->ldo_def_stripenr = 0;
1342                 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
1343                 GOTO(unlock, rc = 0);
1344         }
1345
1346         rc = 0;
1347         v1 = info->lti_ea_store;
1348         if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1))
1349                 lustre_swab_lov_user_md_v1(v1);
1350         else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3))
1351                 lustre_swab_lov_user_md_v3(v3);
1352
1353         if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
1354                 GOTO(unlock, rc = 0);
1355
1356         if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
1357                 GOTO(unlock, rc = 0);
1358
1359         CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
1360                PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
1361                (int)v1->lmm_stripe_count,
1362                (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
1363
1364         lp->ldo_def_stripenr = v1->lmm_stripe_count;
1365         lp->ldo_def_stripe_size = v1->lmm_stripe_size;
1366         lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
1367         lp->ldo_striping_cached = 1;
1368         lp->ldo_def_striping_set = 1;
1369         if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
1370                 /* XXX: sanity check here */
1371                 v3 = (struct lov_user_md_v3 *) v1;
1372                 if (v3->lmm_pool_name[0])
1373                         lod_object_set_pool(lp, v3->lmm_pool_name);
1374         }
1375         EXIT;
1376 unlock:
1377         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
1378         return rc;
1379 }
1380
1381
1382 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
1383                                          struct lod_object *lp)
1384 {
1385         struct lod_thread_info  *info = lod_env_info(env);
1386         struct lmv_user_md_v1   *v1 = NULL;
1387         int                      rc;
1388         ENTRY;
1389
1390         /* called from MDD without parent being write locked,
1391          * lock it here */
1392         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
1393         rc = lod_get_default_lmv_ea(env, lp);
1394         if (rc < 0)
1395                 GOTO(unlock, rc);
1396
1397         if (rc < sizeof(struct lmv_user_md)) {
1398                 /* don't lookup for non-existing or invalid striping */
1399                 lp->ldo_dir_def_striping_set = 0;
1400                 lp->ldo_dir_striping_cached = 1;
1401                 lp->ldo_dir_def_stripenr = 0;
1402                 lp->ldo_dir_def_stripe_offset =
1403                                         (typeof(v1->lum_stripe_offset))(-1);
1404                 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
1405                 GOTO(unlock, rc = 0);
1406         }
1407
1408         rc = 0;
1409         v1 = info->lti_ea_store;
1410
1411         lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count) - 1;
1412         lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
1413         lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
1414         lp->ldo_dir_def_striping_set = 1;
1415         lp->ldo_dir_striping_cached = 1;
1416
1417         EXIT;
1418 unlock:
1419         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
1420         return rc;
1421 }
1422
1423 static int lod_cache_parent_striping(const struct lu_env *env,
1424                                      struct lod_object *lp,
1425                                      umode_t child_mode)
1426 {
1427         int rc = 0;
1428         ENTRY;
1429
1430         rc = lod_load_striping(env, lp);
1431         if (rc != 0)
1432                 RETURN(rc);
1433
1434         if (!lp->ldo_striping_cached) {
1435                 /* we haven't tried to get default striping for
1436                  * the directory yet, let's cache it in the object */
1437                 rc = lod_cache_parent_lov_striping(env, lp);
1438                 if (rc != 0)
1439                         RETURN(rc);
1440         }
1441
1442         if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
1443                 rc = lod_cache_parent_lmv_striping(env, lp);
1444
1445         RETURN(rc);
1446 }
1447
1448 /**
1449  * used to transfer default striping data to the object being created
1450  */
1451 static void lod_ah_init(const struct lu_env *env,
1452                         struct dt_allocation_hint *ah,
1453                         struct dt_object *parent,
1454                         struct dt_object *child,
1455                         umode_t child_mode)
1456 {
1457         struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
1458         struct dt_object  *nextp = NULL;
1459         struct dt_object  *nextc;
1460         struct lod_object *lp = NULL;
1461         struct lod_object *lc;
1462         struct lov_desc   *desc;
1463         ENTRY;
1464
1465         LASSERT(child);
1466
1467         if (likely(parent)) {
1468                 nextp = dt_object_child(parent);
1469                 lp = lod_dt_obj(parent);
1470         }
1471
1472         nextc = dt_object_child(child);
1473         lc = lod_dt_obj(child);
1474
1475         LASSERT(lc->ldo_stripenr == 0);
1476         LASSERT(lc->ldo_stripe == NULL);
1477
1478         /*
1479          * local object may want some hints
1480          * in case of late striping creation, ->ah_init()
1481          * can be called with local object existing
1482          */
1483         if (!dt_object_exists(nextc) || dt_object_remote(nextc))
1484                 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
1485                                           NULL : nextp, nextc, child_mode);
1486
1487         if (S_ISDIR(child_mode)) {
1488                 int rc;
1489
1490                 if (lc->ldo_dir_stripe == NULL) {
1491                         OBD_ALLOC_PTR(lc->ldo_dir_stripe);
1492                         if (lc->ldo_dir_stripe == NULL)
1493                                 return;
1494                 }
1495
1496                 if (lp->ldo_dir_stripe == NULL) {
1497                         OBD_ALLOC_PTR(lp->ldo_dir_stripe);
1498                         if (lp->ldo_dir_stripe == NULL)
1499                                 return;
1500                 }
1501
1502                 rc = lod_cache_parent_striping(env, lp, child_mode);
1503                 if (rc != 0)
1504                         return;
1505
1506                 /* transfer defaults to new directory */
1507                 if (lp->ldo_striping_cached) {
1508                         if (lp->ldo_pool)
1509                                 lod_object_set_pool(lc, lp->ldo_pool);
1510                         lc->ldo_def_stripenr = lp->ldo_def_stripenr;
1511                         lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
1512                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
1513                         lc->ldo_striping_cached = 1;
1514                         lc->ldo_def_striping_set = 1;
1515                         CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
1516                                (int)lc->ldo_def_stripe_size,
1517                                (int)lc->ldo_def_stripe_offset,
1518                                (int)lc->ldo_def_stripenr);
1519                 }
1520
1521                 /* transfer dir defaults to new directory */
1522                 if (lp->ldo_dir_striping_cached) {
1523                         lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
1524                         lc->ldo_dir_def_stripe_offset =
1525                                                   lp->ldo_dir_def_stripe_offset;
1526                         lc->ldo_dir_def_hash_type =
1527                                                   lp->ldo_dir_def_hash_type;
1528                         lc->ldo_dir_striping_cached = 1;
1529                         lc->ldo_dir_def_striping_set = 1;
1530                         CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
1531                                (int)lc->ldo_dir_def_stripenr,
1532                                (int)lc->ldo_dir_def_stripe_offset,
1533                                lc->ldo_dir_def_hash_type);
1534                 }
1535
1536                 /* If the directory is specified with certain stripes */
1537                 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
1538                         const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
1539                         int rc;
1540
1541                         rc = lod_verify_md_striping(d, lum1);
1542                         if (rc == 0 &&
1543                                 le32_to_cpu(lum1->lum_stripe_count) > 1) {
1544                                 /* Directory will be striped only if
1545                                  * stripe_count > 1 */
1546                                 lc->ldo_stripenr =
1547                                         le32_to_cpu(lum1->lum_stripe_count) - 1;
1548                                 lc->ldo_dir_stripe_offset =
1549                                         le32_to_cpu(lum1->lum_stripe_offset);
1550                                 lc->ldo_dir_hash_type =
1551                                         le32_to_cpu(lum1->lum_hash_type);
1552                                 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
1553                                        lc->ldo_stripenr,
1554                                        (int)lc->ldo_dir_stripe_offset);
1555                         }
1556                 } else if (lp->ldo_dir_def_striping_set) {
1557                         /* If there are default dir stripe from parent */
1558                         lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
1559                         lc->ldo_dir_stripe_offset =
1560                                         lp->ldo_dir_def_stripe_offset;
1561                         lc->ldo_dir_hash_type =
1562                                         lp->ldo_dir_def_hash_type;
1563                         CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
1564                                lc->ldo_stripenr,
1565                                (int)lc->ldo_dir_stripe_offset);
1566                 } else {
1567                         /* set default stripe for this directory */
1568                         lc->ldo_stripenr = 0;
1569                         lc->ldo_dir_stripe_offset = -1;
1570                 }
1571
1572                 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
1573                        lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
1574
1575                 goto out;
1576         }
1577
1578         /*
1579          * if object is going to be striped over OSTs, transfer default
1580          * striping information to the child, so that we can use it
1581          * during declaration and creation
1582          */
1583         if (!lod_object_will_be_striped(S_ISREG(child_mode),
1584                                         lu_object_fid(&child->do_lu)))
1585                 goto out;
1586         /*
1587          * try from the parent
1588          */
1589         if (likely(parent)) {
1590                 lod_cache_parent_striping(env, lp, child_mode);
1591
1592                 lc->ldo_def_stripe_offset = (__u16) -1;
1593
1594                 if (lp->ldo_def_striping_set) {
1595                         if (lp->ldo_pool)
1596                                 lod_object_set_pool(lc, lp->ldo_pool);
1597                         lc->ldo_stripenr = lp->ldo_def_stripenr;
1598                         lc->ldo_stripe_size = lp->ldo_def_stripe_size;
1599                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
1600                         CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
1601                                lc->ldo_stripenr, lc->ldo_stripe_size,
1602                                lp->ldo_pool ? lp->ldo_pool : "");
1603                 }
1604         }
1605
1606         /*
1607          * if the parent doesn't provide with specific pattern, grab fs-wide one
1608          */
1609         desc = &d->lod_desc;
1610         if (lc->ldo_stripenr == 0)
1611                 lc->ldo_stripenr = desc->ld_default_stripe_count;
1612         if (lc->ldo_stripe_size == 0)
1613                 lc->ldo_stripe_size = desc->ld_default_stripe_size;
1614         CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
1615                lc->ldo_stripenr, lc->ldo_stripe_size,
1616                lc->ldo_pool ? lc->ldo_pool : "");
1617
1618 out:
1619         /* we do not cache stripe information for slave stripe, see
1620          * lod_xattr_set_lov_on_dir */
1621         if (lp != NULL && lp->ldo_dir_slave_stripe)
1622                 lod_lov_stripe_cache_clear(lp);
1623
1624         EXIT;
1625 }
1626
1627 #define ll_do_div64(aaa,bbb)    do_div((aaa), (bbb))
1628 /*
1629  * this function handles a special case when truncate was done
1630  * on a stripeless object and now striping is being created
1631  * we can't lose that size, so we have to propagate it to newly
1632  * created object
1633  */
1634 static int lod_declare_init_size(const struct lu_env *env,
1635                                  struct dt_object *dt, struct thandle *th)
1636 {
1637         struct dt_object   *next = dt_object_child(dt);
1638         struct lod_object  *lo = lod_dt_obj(dt);
1639         struct lu_attr     *attr = &lod_env_info(env)->lti_attr;
1640         uint64_t            size, offs;
1641         int                 rc, stripe;
1642         ENTRY;
1643
1644         /* XXX: we support the simplest (RAID0) striping so far */
1645         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
1646         LASSERT(lo->ldo_stripe_size > 0);
1647
1648         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1649         LASSERT(attr->la_valid & LA_SIZE);
1650         if (rc)
1651                 RETURN(rc);
1652
1653         size = attr->la_size;
1654         if (size == 0)
1655                 RETURN(0);
1656
1657         /* ll_do_div64(a, b) returns a % b, and a = a / b */
1658         ll_do_div64(size, (__u64) lo->ldo_stripe_size);
1659         stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
1660
1661         size = size * lo->ldo_stripe_size;
1662         offs = attr->la_size;
1663         size += ll_do_div64(offs, lo->ldo_stripe_size);
1664
1665         attr->la_valid = LA_SIZE;
1666         attr->la_size = size;
1667
1668         rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
1669
1670         RETURN(rc);
1671 }
1672
1673 /**
1674  * Create declaration of striped object
1675  */
1676 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
1677                                struct lu_attr *attr,
1678                                const struct lu_buf *lovea, struct thandle *th)
1679 {
1680         struct lod_thread_info  *info = lod_env_info(env);
1681         struct dt_object        *next = dt_object_child(dt);
1682         struct lod_object       *lo = lod_dt_obj(dt);
1683         int                      rc;
1684         ENTRY;
1685
1686         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
1687                 /* failed to create striping, let's reset
1688                  * config so that others don't get confused */
1689                 lod_object_free_striping(env, lo);
1690                 GOTO(out, rc = -ENOMEM);
1691         }
1692
1693         if (!dt_object_remote(next)) {
1694                 /* choose OST and generate appropriate objects */
1695                 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
1696                 if (rc) {
1697                         /* failed to create striping, let's reset
1698                          * config so that others don't get confused */
1699                         lod_object_free_striping(env, lo);
1700                         GOTO(out, rc);
1701                 }
1702
1703                 /*
1704                  * declare storage for striping data
1705                  */
1706                 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
1707                                 lo->ldo_pool ?  LOV_MAGIC_V3 : LOV_MAGIC_V1);
1708         } else {
1709                 /* LOD can not choose OST objects for remote objects, i.e.
1710                  * stripes must be ready before that. Right now, it can only
1711                  * happen during migrate, i.e. migrate process needs to create
1712                  * remote regular file (mdd_migrate_create), then the migrate
1713                  * process will provide stripeEA. */
1714                 LASSERT(lovea != NULL);
1715                 info->lti_buf = *lovea;
1716         }
1717
1718         rc = dt_declare_xattr_set(env, next, &info->lti_buf,
1719                                   XATTR_NAME_LOV, 0, th);
1720         if (rc)
1721                 GOTO(out, rc);
1722
1723         /*
1724          * if striping is created with local object's size > 0,
1725          * we have to propagate this size to specific object
1726          * the case is possible only when local object was created previously
1727          */
1728         if (dt_object_exists(next))
1729                 rc = lod_declare_init_size(env, dt, th);
1730
1731 out:
1732         RETURN(rc);
1733 }
1734
1735 int lod_dir_striping_create_internal(const struct lu_env *env,
1736                                      struct dt_object *dt,
1737                                      struct lu_attr *attr,
1738                                      const struct dt_object_format *dof,
1739                                      struct thandle *th,
1740                                      bool declare)
1741 {
1742         struct lod_thread_info  *info = lod_env_info(env);
1743         struct dt_object        *next = dt_object_child(dt);
1744         struct lod_object       *lo = lod_dt_obj(dt);
1745         int                     rc;
1746         ENTRY;
1747
1748         if (lo->ldo_dir_def_striping_set &&
1749             !LMVEA_DELETE_VALUES(lo->ldo_stripenr,
1750                                  lo->ldo_dir_stripe_offset)) {
1751                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
1752                 int stripe_count = lo->ldo_stripenr + 1;
1753
1754                 if (info->lti_ea_store_size < sizeof(*v1)) {
1755                         rc = lod_ea_store_resize(info, sizeof(*v1));
1756                         if (rc != 0)
1757                                 RETURN(rc);
1758                         v1 = info->lti_ea_store;
1759                 }
1760
1761                 memset(v1, 0, sizeof(*v1));
1762                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
1763                 v1->lum_stripe_count = cpu_to_le32(stripe_count);
1764                 v1->lum_stripe_offset =
1765                                 cpu_to_le32(lo->ldo_dir_stripe_offset);
1766
1767                 info->lti_buf.lb_buf = v1;
1768                 info->lti_buf.lb_len = sizeof(*v1);
1769
1770                 if (declare)
1771                         rc = lod_declare_xattr_set_lmv(env, dt, attr,
1772                                                        &info->lti_buf, th);
1773                 else
1774                         rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
1775                                                XATTR_NAME_LMV, 0, th,
1776                                                BYPASS_CAPA);
1777                 if (rc != 0)
1778                         RETURN(rc);
1779         }
1780
1781         /* Transfer default LMV striping from the parent */
1782         if (lo->ldo_dir_striping_cached &&
1783             !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
1784                                  lo->ldo_dir_def_stripe_offset)) {
1785                 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
1786                 int def_stripe_count = lo->ldo_dir_def_stripenr + 1;
1787
1788                 if (info->lti_ea_store_size < sizeof(*v1)) {
1789                         rc = lod_ea_store_resize(info, sizeof(*v1));
1790                         if (rc != 0)
1791                                 RETURN(rc);
1792                         v1 = info->lti_ea_store;
1793                 }
1794
1795                 memset(v1, 0, sizeof(*v1));
1796                 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
1797                 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
1798                 v1->lum_stripe_offset =
1799                                 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
1800                 v1->lum_hash_type =
1801                                 cpu_to_le32(lo->ldo_dir_def_hash_type);
1802
1803                 info->lti_buf.lb_buf = v1;
1804                 info->lti_buf.lb_len = sizeof(*v1);
1805                 if (declare)
1806                         rc = dt_declare_xattr_set(env, next, &info->lti_buf,
1807                                                   XATTR_NAME_DEFAULT_LMV, 0,
1808                                                   th);
1809                 else
1810                         rc = dt_xattr_set(env, next, &info->lti_buf,
1811                                            XATTR_NAME_DEFAULT_LMV, 0, th,
1812                                            BYPASS_CAPA);
1813                 if (rc != 0)
1814                         RETURN(rc);
1815         }
1816
1817         /* Transfer default LOV striping from the parent */
1818         if (lo->ldo_striping_cached &&
1819             !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1820                                  lo->ldo_def_stripenr,
1821                                  lo->ldo_def_stripe_offset)) {
1822                 struct lov_user_md_v3 *v3 = info->lti_ea_store;
1823
1824                 if (info->lti_ea_store_size < sizeof(*v3)) {
1825                         rc = lod_ea_store_resize(info, sizeof(*v3));
1826                         if (rc != 0)
1827                                 RETURN(rc);
1828                         v3 = info->lti_ea_store;
1829                 }
1830
1831                 memset(v3, 0, sizeof(*v3));
1832                 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1833                 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
1834                 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
1835                 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
1836                 if (lo->ldo_pool)
1837                         strncpy(v3->lmm_pool_name, lo->ldo_pool,
1838                                 LOV_MAXPOOLNAME);
1839
1840                 info->lti_buf.lb_buf = v3;
1841                 info->lti_buf.lb_len = sizeof(*v3);
1842
1843                 if (declare)
1844                         rc = dt_declare_xattr_set(env, next, &info->lti_buf,
1845                                                   XATTR_NAME_LOV, 0, th);
1846                 else
1847                         rc = dt_xattr_set(env, next, &info->lti_buf,
1848                                           XATTR_NAME_LOV, 0, th,
1849                                           BYPASS_CAPA);
1850                 if (rc != 0)
1851                         RETURN(rc);
1852         }
1853
1854         RETURN(0);
1855 }
1856
1857 static int lod_declare_dir_striping_create(const struct lu_env *env,
1858                                            struct dt_object *dt,
1859                                            struct lu_attr *attr,
1860                                            struct dt_object_format *dof,
1861                                            struct thandle *th)
1862 {
1863         return lod_dir_striping_create_internal(env, dt, attr, dof, th, true);
1864 }
1865
1866 static int lod_dir_striping_create(const struct lu_env *env,
1867                                    struct dt_object *dt,
1868                                    struct lu_attr *attr,
1869                                    struct dt_object_format *dof,
1870                                    struct thandle *th)
1871 {
1872         return lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
1873 }
1874
1875 static int lod_declare_object_create(const struct lu_env *env,
1876                                      struct dt_object *dt,
1877                                      struct lu_attr *attr,
1878                                      struct dt_allocation_hint *hint,
1879                                      struct dt_object_format *dof,
1880                                      struct thandle *th)
1881 {
1882         struct dt_object   *next = dt_object_child(dt);
1883         struct lod_object  *lo = lod_dt_obj(dt);
1884         int                 rc;
1885         ENTRY;
1886
1887         LASSERT(dof);
1888         LASSERT(attr);
1889         LASSERT(th);
1890
1891         /*
1892          * first of all, we declare creation of local object
1893          */
1894         rc = dt_declare_create(env, next, attr, hint, dof, th);
1895         if (rc)
1896                 GOTO(out, rc);
1897
1898         if (dof->dof_type == DFT_SYM)
1899                 dt->do_body_ops = &lod_body_lnk_ops;
1900
1901         /*
1902          * it's lod_ah_init() who has decided the object will striped
1903          */
1904         if (dof->dof_type == DFT_REGULAR) {
1905                 /* callers don't want stripes */
1906                 /* XXX: all tricky interactions with ->ah_make_hint() decided
1907                  * to use striping, then ->declare_create() behaving differently
1908                  * should be cleaned */
1909                 if (dof->u.dof_reg.striped == 0)
1910                         lo->ldo_stripenr = 0;
1911                 if (lo->ldo_stripenr > 0)
1912                         rc = lod_declare_striped_object(env, dt, attr,
1913                                                         NULL, th);
1914         } else if (dof->dof_type == DFT_DIR) {
1915                 /* Orphan object (like migrating object) does not have
1916                  * lod_dir_stripe, see lod_ah_init */
1917                 if (lo->ldo_dir_stripe != NULL)
1918                         rc = lod_declare_dir_striping_create(env, dt, attr,
1919                                                              dof, th);
1920         }
1921 out:
1922         RETURN(rc);
1923 }
1924
1925 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
1926                         struct lu_attr *attr, struct dt_object_format *dof,
1927                         struct thandle *th)
1928 {
1929         struct lod_object *lo = lod_dt_obj(dt);
1930         int                rc = 0, i;
1931         ENTRY;
1932
1933         LASSERT(lo->ldo_striping_cached == 0);
1934
1935         /* create all underlying objects */
1936         for (i = 0; i < lo->ldo_stripenr; i++) {
1937                 LASSERT(lo->ldo_stripe[i]);
1938                 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
1939
1940                 if (rc)
1941                         break;
1942         }
1943         if (rc == 0)
1944                 rc = lod_generate_and_set_lovea(env, lo, th);
1945
1946         RETURN(rc);
1947 }
1948
1949 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
1950                              struct lu_attr *attr,
1951                              struct dt_allocation_hint *hint,
1952                              struct dt_object_format *dof, struct thandle *th)
1953 {
1954         struct dt_object   *next = dt_object_child(dt);
1955         struct lod_object  *lo = lod_dt_obj(dt);
1956         int                 rc;
1957         ENTRY;
1958
1959         /* create local object */
1960         rc = dt_create(env, next, attr, hint, dof, th);
1961
1962         if (rc == 0) {
1963                 if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
1964                     lo->ldo_dir_stripe != NULL)
1965                         rc = lod_dir_striping_create(env, dt, attr, dof, th);
1966                 else if (lo->ldo_stripe && dof->u.dof_reg.striped != 0)
1967                         rc = lod_striping_create(env, dt, attr, dof, th);
1968         }
1969
1970         RETURN(rc);
1971 }
1972
1973 static int lod_declare_object_destroy(const struct lu_env *env,
1974                                       struct dt_object *dt,
1975                                       struct thandle *th)
1976 {
1977         struct dt_object   *next = dt_object_child(dt);
1978         struct lod_object  *lo = lod_dt_obj(dt);
1979         int                 rc, i;
1980         ENTRY;
1981
1982         /*
1983          * we declare destroy for the local object
1984          */
1985         rc = dt_declare_destroy(env, next, th);
1986         if (rc)
1987                 RETURN(rc);
1988
1989         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
1990                 RETURN(0);
1991
1992         /*
1993          * load striping information, notice we don't do this when object
1994          * is being initialized as we don't need this information till
1995          * few specific cases like destroy, chown
1996          */
1997         rc = lod_load_striping(env, lo);
1998         if (rc)
1999                 RETURN(rc);
2000
2001         /* declare destroy for all underlying objects */
2002         for (i = 0; i < lo->ldo_stripenr; i++) {
2003                 LASSERT(lo->ldo_stripe[i]);
2004                 rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
2005
2006                 if (rc)
2007                         break;
2008         }
2009
2010         RETURN(rc);
2011 }
2012
2013 static int lod_object_destroy(const struct lu_env *env,
2014                 struct dt_object *dt, struct thandle *th)
2015 {
2016         struct dt_object  *next = dt_object_child(dt);
2017         struct lod_object *lo = lod_dt_obj(dt);
2018         int                rc, i;
2019         ENTRY;
2020
2021         /* destroy local object */
2022         rc = dt_destroy(env, next, th);
2023         if (rc)
2024                 RETURN(rc);
2025
2026         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
2027                 RETURN(0);
2028
2029         /* destroy all underlying objects */
2030         for (i = 0; i < lo->ldo_stripenr; i++) {
2031                 LASSERT(lo->ldo_stripe[i]);
2032                 /* for striped directory, next == ldo_stripe[0] */
2033                 if (next != lo->ldo_stripe[i]) {
2034                         rc = dt_destroy(env, lo->ldo_stripe[i], th);
2035                         if (rc)
2036                                 break;
2037                 }
2038         }
2039
2040         RETURN(rc);
2041 }
2042
2043 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
2044                          const struct dt_index_features *feat)
2045 {
2046         struct dt_object *next = dt_object_child(dt);
2047         int               rc;
2048         ENTRY;
2049
2050         LASSERT(next->do_ops);
2051         LASSERT(next->do_ops->do_index_try);
2052
2053         rc = next->do_ops->do_index_try(env, next, feat);
2054         if (next->do_index_ops && dt->do_index_ops == NULL)
2055                 dt->do_index_ops = &lod_index_ops;
2056
2057         RETURN(rc);
2058 }
2059
2060 static int lod_declare_ref_add(const struct lu_env *env,
2061                                struct dt_object *dt, struct thandle *th)
2062 {
2063         return dt_declare_ref_add(env, dt_object_child(dt), th);
2064 }
2065
2066 static int lod_ref_add(const struct lu_env *env,
2067                        struct dt_object *dt, struct thandle *th)
2068 {
2069         return dt_ref_add(env, dt_object_child(dt), th);
2070 }
2071
2072 static int lod_declare_ref_del(const struct lu_env *env,
2073                                struct dt_object *dt, struct thandle *th)
2074 {
2075         return dt_declare_ref_del(env, dt_object_child(dt), th);
2076 }
2077
2078 static int lod_ref_del(const struct lu_env *env,
2079                        struct dt_object *dt, struct thandle *th)
2080 {
2081         return dt_ref_del(env, dt_object_child(dt), th);
2082 }
2083
2084 static struct obd_capa *lod_capa_get(const struct lu_env *env,
2085                                      struct dt_object *dt,
2086                                      struct lustre_capa *old, __u64 opc)
2087 {
2088         return dt_capa_get(env, dt_object_child(dt), old, opc);
2089 }
2090
2091 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt)
2092 {
2093         return dt_object_sync(env, dt_object_child(dt));
2094 }
2095
2096 struct lod_slave_locks  {
2097         int                     lsl_lock_count;
2098         struct lustre_handle    lsl_handle[0];
2099 };
2100
2101 static int lod_object_unlock_internal(const struct lu_env *env,
2102                                       struct dt_object *dt,
2103                                       struct ldlm_enqueue_info *einfo,
2104                                       ldlm_policy_data_t *policy)
2105 {
2106         struct lod_object       *lo = lod_dt_obj(dt);
2107         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
2108         int                     rc = 0;
2109         int                     i;
2110         ENTRY;
2111
2112         if (slave_locks == NULL)
2113                 RETURN(0);
2114
2115         for (i = 0; i < slave_locks->lsl_lock_count; i++) {
2116                 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
2117                         int     rc1;
2118
2119                         einfo->ei_cbdata = &slave_locks->lsl_handle[i];
2120                         rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
2121                                                policy);
2122                         if (rc1 < 0)
2123                                 rc = rc == 0 ? rc1 : rc;
2124                 }
2125         }
2126
2127         RETURN(rc);
2128 }
2129
2130 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
2131                              struct ldlm_enqueue_info *einfo,
2132                              union ldlm_policy_data *policy)
2133 {
2134         struct lod_object       *lo = lod_dt_obj(dt);
2135         struct lod_slave_locks  *slave_locks = einfo->ei_cbdata;
2136         int                     slave_locks_size;
2137         int                     rc;
2138         ENTRY;
2139
2140         if (slave_locks == NULL)
2141                 RETURN(0);
2142
2143         rc = lod_load_striping(env, lo);
2144         if (rc != 0)
2145                 RETURN(rc);
2146
2147         /* Note: for remote lock for single stripe dir, MDT will cancel
2148          * the lock by lockh directly */
2149         if (lo->ldo_stripenr == 0 && dt_object_remote(dt_object_child(dt)))
2150                 RETURN(0);
2151
2152         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2153                 RETURN(-ENOTDIR);
2154
2155         /* Only cancel slave lock for striped dir */
2156         rc = lod_object_unlock_internal(env, dt, einfo, policy);
2157
2158         slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
2159                            sizeof(slave_locks->lsl_handle[0]);
2160         OBD_FREE(slave_locks, slave_locks_size);
2161         einfo->ei_cbdata = NULL;
2162
2163         RETURN(rc);
2164 }
2165
2166 static int lod_object_lock(const struct lu_env *env,
2167                            struct dt_object *dt,
2168                            struct lustre_handle *lh,
2169                            struct ldlm_enqueue_info *einfo,
2170                            union ldlm_policy_data *policy)
2171 {
2172         struct lod_object       *lo = lod_dt_obj(dt);
2173         int                     rc = 0;
2174         int                     i;
2175         int                     slave_locks_size;
2176         struct lod_slave_locks  *slave_locks = NULL;
2177         ENTRY;
2178
2179         /* remote object lock */
2180         if (!einfo->ei_enq_slave) {
2181                 LASSERT(dt_object_remote(dt));
2182                 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
2183                                       policy);
2184         }
2185
2186         if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2187                 RETURN(-ENOTDIR);
2188
2189         rc = lod_load_striping(env, lo);
2190         if (rc != 0)
2191                 RETURN(rc);
2192
2193         /* No stripes */
2194         if (lo->ldo_stripenr == 0)
2195                 RETURN(0);
2196
2197         slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
2198                            sizeof(slave_locks->lsl_handle[0]);
2199         /* Freed in lod_object_unlock */
2200         OBD_ALLOC(slave_locks, slave_locks_size);
2201         if (slave_locks == NULL)
2202                 RETURN(-ENOMEM);
2203         slave_locks->lsl_lock_count = lo->ldo_stripenr;
2204
2205         /* striped directory lock */
2206         for (i = 0; i < lo->ldo_stripenr; i++) {
2207                 struct lustre_handle    lockh;
2208                 struct ldlm_res_id      *res_id;
2209
2210                 res_id = &lod_env_info(env)->lti_res_id;
2211                 fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
2212                                        res_id);
2213                 einfo->ei_res_id = res_id;
2214
2215                 LASSERT(lo->ldo_stripe[i]);
2216                 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
2217                                     policy);
2218                 if (rc != 0)
2219                         GOTO(out, rc);
2220
2221                 slave_locks->lsl_handle[i] = lockh;
2222         }
2223
2224         einfo->ei_cbdata = slave_locks;
2225
2226 out:
2227         if (rc != 0 && slave_locks != NULL) {
2228                 einfo->ei_cbdata = slave_locks;
2229                 lod_object_unlock_internal(env, dt, einfo, policy);
2230                 OBD_FREE(slave_locks, slave_locks_size);
2231                 einfo->ei_cbdata = NULL;
2232         }
2233
2234         RETURN(rc);
2235 }
2236
2237 struct dt_object_operations lod_obj_ops = {
2238         .do_read_lock           = lod_object_read_lock,
2239         .do_write_lock          = lod_object_write_lock,
2240         .do_read_unlock         = lod_object_read_unlock,
2241         .do_write_unlock        = lod_object_write_unlock,
2242         .do_write_locked        = lod_object_write_locked,
2243         .do_attr_get            = lod_attr_get,
2244         .do_declare_attr_set    = lod_declare_attr_set,
2245         .do_attr_set            = lod_attr_set,
2246         .do_xattr_get           = lod_xattr_get,
2247         .do_declare_xattr_set   = lod_declare_xattr_set,
2248         .do_xattr_set           = lod_xattr_set,
2249         .do_declare_xattr_del   = lod_declare_xattr_del,
2250         .do_xattr_del           = lod_xattr_del,
2251         .do_xattr_list          = lod_xattr_list,
2252         .do_ah_init             = lod_ah_init,
2253         .do_declare_create      = lod_declare_object_create,
2254         .do_create              = lod_object_create,
2255         .do_declare_destroy     = lod_declare_object_destroy,
2256         .do_destroy             = lod_object_destroy,
2257         .do_index_try           = lod_index_try,
2258         .do_declare_ref_add     = lod_declare_ref_add,
2259         .do_ref_add             = lod_ref_add,
2260         .do_declare_ref_del     = lod_declare_ref_del,
2261         .do_ref_del             = lod_ref_del,
2262         .do_capa_get            = lod_capa_get,
2263         .do_object_sync         = lod_object_sync,
2264         .do_object_lock         = lod_object_lock,
2265         .do_object_unlock       = lod_object_unlock,
2266 };
2267
2268 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
2269                         struct lu_buf *buf, loff_t *pos,
2270                         struct lustre_capa *capa)
2271 {
2272         struct dt_object *next = dt_object_child(dt);
2273         return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
2274 }
2275
2276 static ssize_t lod_declare_write(const struct lu_env *env,
2277                                  struct dt_object *dt,
2278                                  const struct lu_buf *buf, loff_t pos,
2279                                  struct thandle *th)
2280 {
2281         return dt_declare_record_write(env, dt_object_child(dt),
2282                                        buf, pos, th);
2283 }
2284
2285 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
2286                          const struct lu_buf *buf, loff_t *pos,
2287                          struct thandle *th, struct lustre_capa *capa, int iq)
2288 {
2289         struct dt_object *next = dt_object_child(dt);
2290         LASSERT(next);
2291         return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
2292 }
2293
2294 static const struct dt_body_operations lod_body_lnk_ops = {
2295         .dbo_read               = lod_read,
2296         .dbo_declare_write      = lod_declare_write,
2297         .dbo_write              = lod_write
2298 };
2299
2300 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
2301                            const struct lu_object_conf *conf)
2302 {
2303         struct lod_device       *lod    = lu2lod_dev(lo->lo_dev);
2304         struct lu_device        *cdev   = NULL;
2305         struct lu_object        *cobj;
2306         struct lod_tgt_descs    *ltd    = NULL;
2307         struct lod_tgt_desc     *tgt;
2308         mdsno_t                  idx    = 0;
2309         int                      type   = LU_SEQ_RANGE_ANY;
2310         int                      rc;
2311         ENTRY;
2312
2313         rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
2314         if (rc != 0)
2315                 RETURN(rc);
2316
2317         if (type == LU_SEQ_RANGE_MDT &&
2318             idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
2319                 cdev = &lod->lod_child->dd_lu_dev;
2320         } else if (type == LU_SEQ_RANGE_MDT) {
2321                 ltd = &lod->lod_mdt_descs;
2322                 lod_getref(ltd);
2323         } else if (type == LU_SEQ_RANGE_OST) {
2324                 ltd = &lod->lod_ost_descs;
2325                 lod_getref(ltd);
2326         } else {
2327                 LBUG();
2328         }
2329
2330         if (ltd != NULL) {
2331                 if (ltd->ltd_tgts_size > idx &&
2332                     cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
2333                         tgt = LTD_TGT(ltd, idx);
2334
2335                         LASSERT(tgt != NULL);
2336                         LASSERT(tgt->ltd_tgt != NULL);
2337
2338                         cdev = &(tgt->ltd_tgt->dd_lu_dev);
2339                 }
2340                 lod_putref(lod, ltd);
2341         }
2342
2343         if (unlikely(cdev == NULL))
2344                 RETURN(-ENOENT);
2345
2346         cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
2347         if (unlikely(cobj == NULL))
2348                 RETURN(-ENOMEM);
2349
2350         lu_object_add(lo, cobj);
2351
2352         RETURN(0);
2353 }
2354
2355 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
2356 {
2357         int i;
2358
2359         if (lo->ldo_dir_stripe != NULL) {
2360                 OBD_FREE_PTR(lo->ldo_dir_stripe);
2361                 lo->ldo_dir_stripe = NULL;
2362         }
2363
2364         if (lo->ldo_stripe) {
2365                 LASSERT(lo->ldo_stripes_allocated > 0);
2366
2367                 for (i = 0; i < lo->ldo_stripenr; i++) {
2368                         if (lo->ldo_stripe[i])
2369                                 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
2370                 }
2371
2372                 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
2373                 OBD_FREE(lo->ldo_stripe, i);
2374                 lo->ldo_stripe = NULL;
2375                 lo->ldo_stripes_allocated = 0;
2376         }
2377         lo->ldo_stripenr = 0;
2378         lo->ldo_pattern = 0;
2379 }
2380
2381 /*
2382  * ->start is called once all slices are initialized, including header's
2383  * cache for mode (object type). using the type we can initialize ops
2384  */
2385 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
2386 {
2387         if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
2388                 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
2389         return 0;
2390 }
2391
2392 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
2393 {
2394         struct lod_object *mo = lu2lod_obj(o);
2395
2396         /*
2397          * release all underlying object pinned
2398          */
2399
2400         lod_object_free_striping(env, mo);
2401
2402         lod_object_set_pool(mo, NULL);
2403
2404         lu_object_fini(o);
2405         OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
2406 }
2407
2408 static void lod_object_release(const struct lu_env *env, struct lu_object *o)
2409 {
2410         /* XXX: shouldn't we release everything here in case if object
2411          * creation failed before? */
2412 }
2413
2414 static int lod_object_print(const struct lu_env *env, void *cookie,
2415                             lu_printer_t p, const struct lu_object *l)
2416 {
2417         struct lod_object *o = lu2lod_obj((struct lu_object *) l);
2418
2419         return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
2420 }
2421
2422 struct lu_object_operations lod_lu_obj_ops = {
2423         .loo_object_init        = lod_object_init,
2424         .loo_object_start       = lod_object_start,
2425         .loo_object_free        = lod_object_free,
2426         .loo_object_release     = lod_object_release,
2427         .loo_object_print       = lod_object_print,
2428 };