Whamcloud - gitweb
LU-3569 ofd: packing ost_idx in IDIF
[fs/lustre-release.git] / lustre / lod / lod_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2013, Intel Corporation.
27  */
28 /*
29  * lustre/lod/lod_object.c
30  *
31  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
32  */
33
34 #define DEBUG_SUBSYSTEM S_MDS
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
41
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <obd_lov.h>
46
47 #include "lod_internal.h"
48
49 extern struct kmem_cache *lod_object_kmem;
50 static const struct dt_body_operations lod_body_lnk_ops;
51
52 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
53                             struct dt_rec *rec, const struct dt_key *key,
54                             struct lustre_capa *capa)
55 {
56         struct dt_object *next = dt_object_child(dt);
57         return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
58 }
59
60 static int lod_declare_index_insert(const struct lu_env *env,
61                                     struct dt_object *dt,
62                                     const struct dt_rec *rec,
63                                     const struct dt_key *key,
64                                     struct thandle *handle)
65 {
66         return dt_declare_insert(env, dt_object_child(dt), rec, key, handle);
67 }
68
69 static int lod_index_insert(const struct lu_env *env,
70                             struct dt_object *dt,
71                             const struct dt_rec *rec,
72                             const struct dt_key *key,
73                             struct thandle *th,
74                             struct lustre_capa *capa,
75                             int ign)
76 {
77         return dt_insert(env, dt_object_child(dt), rec, key, th, capa, ign);
78 }
79
80 static int lod_declare_index_delete(const struct lu_env *env,
81                                     struct dt_object *dt,
82                                     const struct dt_key *key,
83                                     struct thandle *th)
84 {
85         return dt_declare_delete(env, dt_object_child(dt), key, th);
86 }
87
88 static int lod_index_delete(const struct lu_env *env,
89                             struct dt_object *dt,
90                             const struct dt_key *key,
91                             struct thandle *th,
92                             struct lustre_capa *capa)
93 {
94         return dt_delete(env, dt_object_child(dt), key, th, capa);
95 }
96
97 static struct dt_it *lod_it_init(const struct lu_env *env,
98                                  struct dt_object *dt, __u32 attr,
99                                  struct lustre_capa *capa)
100 {
101         struct dt_object        *next = dt_object_child(dt);
102         struct lod_it           *it = &lod_env_info(env)->lti_it;
103         struct dt_it            *it_next;
104
105
106         it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
107         if (IS_ERR(it_next))
108                 return it_next;
109
110         /* currently we do not use more than one iterator per thread
111          * so we store it in thread info. if at some point we need
112          * more active iterators in a single thread, we can allocate
113          * additional ones */
114         LASSERT(it->lit_obj == NULL);
115
116         it->lit_it = it_next;
117         it->lit_obj = next;
118
119         return (struct dt_it *)it;
120 }
121
122 #define LOD_CHECK_IT(env, it)                                   \
123 {                                                               \
124         LASSERT((it)->lit_obj != NULL);                         \
125         LASSERT((it)->lit_it != NULL);                          \
126 } while(0)
127
128 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
129 {
130         struct lod_it *it = (struct lod_it *)di;
131
132         LOD_CHECK_IT(env, it);
133         it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
134
135         /* the iterator not in use any more */
136         it->lit_obj = NULL;
137         it->lit_it = NULL;
138 }
139
140 int lod_it_get(const struct lu_env *env, struct dt_it *di,
141                const struct dt_key *key)
142 {
143         const struct lod_it *it = (const struct lod_it *)di;
144
145         LOD_CHECK_IT(env, it);
146         return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
147 }
148
149 void lod_it_put(const struct lu_env *env, struct dt_it *di)
150 {
151         struct lod_it *it = (struct lod_it *)di;
152
153         LOD_CHECK_IT(env, it);
154         return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
155 }
156
157 int lod_it_next(const struct lu_env *env, struct dt_it *di)
158 {
159         struct lod_it *it = (struct lod_it *)di;
160
161         LOD_CHECK_IT(env, it);
162         return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
163 }
164
165 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
166 {
167         const struct lod_it *it = (const struct lod_it *)di;
168
169         LOD_CHECK_IT(env, it);
170         return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
171 }
172
173 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
174 {
175         struct lod_it *it = (struct lod_it *)di;
176
177         LOD_CHECK_IT(env, it);
178         return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
179 }
180
181 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
182                struct dt_rec *rec, __u32 attr)
183 {
184         const struct lod_it *it = (const struct lod_it *)di;
185
186         LOD_CHECK_IT(env, it);
187         return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
188 }
189
190 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
191 {
192         const struct lod_it *it = (const struct lod_it *)di;
193
194         LOD_CHECK_IT(env, it);
195         return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
196 }
197
198 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
199 {
200         const struct lod_it *it = (const struct lod_it *)di;
201
202         LOD_CHECK_IT(env, it);
203         return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
204 }
205
206 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
207                    void* key_rec)
208 {
209         const struct lod_it *it = (const struct lod_it *)di;
210
211         LOD_CHECK_IT(env, it);
212         return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it, key_rec);
213 }
214
215 static struct dt_index_operations lod_index_ops = {
216         .dio_lookup             = lod_index_lookup,
217         .dio_declare_insert     = lod_declare_index_insert,
218         .dio_insert             = lod_index_insert,
219         .dio_declare_delete     = lod_declare_index_delete,
220         .dio_delete             = lod_index_delete,
221         .dio_it = {
222                 .init           = lod_it_init,
223                 .fini           = lod_it_fini,
224                 .get            = lod_it_get,
225                 .put            = lod_it_put,
226                 .next           = lod_it_next,
227                 .key            = lod_it_key,
228                 .key_size       = lod_it_key_size,
229                 .rec            = lod_it_rec,
230                 .store          = lod_it_store,
231                 .load           = lod_it_load,
232                 .key_rec        = lod_it_key_rec,
233         }
234 };
235
236 static void lod_object_read_lock(const struct lu_env *env,
237                                  struct dt_object *dt, unsigned role)
238 {
239         dt_read_lock(env, dt_object_child(dt), role);
240 }
241
242 static void lod_object_write_lock(const struct lu_env *env,
243                                   struct dt_object *dt, unsigned role)
244 {
245         dt_write_lock(env, dt_object_child(dt), role);
246 }
247
248 static void lod_object_read_unlock(const struct lu_env *env,
249                                    struct dt_object *dt)
250 {
251         dt_read_unlock(env, dt_object_child(dt));
252 }
253
254 static void lod_object_write_unlock(const struct lu_env *env,
255                                     struct dt_object *dt)
256 {
257         dt_write_unlock(env, dt_object_child(dt));
258 }
259
260 static int lod_object_write_locked(const struct lu_env *env,
261                                    struct dt_object *dt)
262 {
263         return dt_write_locked(env, dt_object_child(dt));
264 }
265
266 static int lod_attr_get(const struct lu_env *env,
267                         struct dt_object *dt,
268                         struct lu_attr *attr,
269                         struct lustre_capa *capa)
270 {
271         return dt_attr_get(env, dt_object_child(dt), attr, capa);
272 }
273
274 static int lod_declare_attr_set(const struct lu_env *env,
275                                 struct dt_object *dt,
276                                 const struct lu_attr *attr,
277                                 struct thandle *handle)
278 {
279         struct dt_object  *next = dt_object_child(dt);
280         struct lod_object *lo = lod_dt_obj(dt);
281         int                rc, i;
282         ENTRY;
283
284         /*
285          * declare setattr on the local object
286          */
287         rc = dt_declare_attr_set(env, next, attr, handle);
288         if (rc)
289                 RETURN(rc);
290
291         /* osp_declare_attr_set() ignores all attributes other than
292          * UID, GID, and size, and osp_attr_set() ignores all but UID
293          * and GID.  Declaration of size attr setting happens through
294          * lod_declare_init_size(), and not through this function.
295          * Therefore we need not load striping unless ownership is
296          * changing.  This should save memory and (we hope) speed up
297          * rename(). */
298         if (!(attr->la_valid & (LA_UID | LA_GID)))
299                 RETURN(rc);
300
301         /*
302          * load striping information, notice we don't do this when object
303          * is being initialized as we don't need this information till
304          * few specific cases like destroy, chown
305          */
306         rc = lod_load_striping(env, lo);
307         if (rc)
308                 RETURN(rc);
309
310         /*
311          * if object is striped declare changes on the stripes
312          */
313         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
314         for (i = 0; i < lo->ldo_stripenr; i++) {
315                 LASSERT(lo->ldo_stripe[i]);
316                 rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle);
317                 if (rc) {
318                         CERROR("failed declaration: %d\n", rc);
319                         break;
320                 }
321         }
322
323         RETURN(rc);
324 }
325
326 static int lod_attr_set(const struct lu_env *env,
327                         struct dt_object *dt,
328                         const struct lu_attr *attr,
329                         struct thandle *handle,
330                         struct lustre_capa *capa)
331 {
332         struct dt_object  *next = dt_object_child(dt);
333         struct lod_object *lo = lod_dt_obj(dt);
334         int                rc, i;
335         ENTRY;
336
337         /*
338          * apply changes to the local object
339          */
340         rc = dt_attr_set(env, next, attr, handle, capa);
341         if (rc)
342                 RETURN(rc);
343
344         if (!(attr->la_valid & (LA_UID | LA_GID)))
345                 RETURN(rc);
346
347         /*
348          * if object is striped, apply changes to all the stripes
349          */
350         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
351         for (i = 0; i < lo->ldo_stripenr; i++) {
352                 LASSERT(lo->ldo_stripe[i]);
353                 rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
354                 if (rc) {
355                         CERROR("failed declaration: %d\n", rc);
356                         break;
357                 }
358         }
359
360         RETURN(rc);
361 }
362
363 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
364                          struct lu_buf *buf, const char *name,
365                          struct lustre_capa *capa)
366 {
367         struct lod_thread_info  *info = lod_env_info(env);
368         struct lod_device       *dev = lu2lod_dev(dt->do_lu.lo_dev);
369         int                      rc, is_root;
370         ENTRY;
371
372         rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
373         if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
374                 RETURN(rc);
375
376         /*
377          * lod returns default striping on the real root of the device
378          * this is like the root stores default striping for the whole
379          * filesystem. historically we've been using a different approach
380          * and store it in the config.
381          */
382         dt_root_get(env, dev->lod_child, &info->lti_fid);
383         is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
384
385         if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
386                 struct lov_user_md *lum = buf->lb_buf;
387                 struct lov_desc    *desc = &dev->lod_desc;
388
389                 if (buf->lb_buf == NULL) {
390                         rc = sizeof(*lum);
391                 } else if (buf->lb_len >= sizeof(*lum)) {
392                         lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
393                         lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
394                         lmm_oi_set_id(&lum->lmm_oi, 0);
395                         lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
396                         lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
397                         lum->lmm_stripe_size = cpu_to_le32(
398                                                 desc->ld_default_stripe_size);
399                         lum->lmm_stripe_count = cpu_to_le16(
400                                                 desc->ld_default_stripe_count);
401                         lum->lmm_stripe_offset = cpu_to_le16(
402                                                 desc->ld_default_stripe_offset);
403                         rc = sizeof(*lum);
404                 } else {
405                         rc = -ERANGE;
406                 }
407         }
408
409         RETURN(rc);
410 }
411
412 /*
413  * LOV xattr is a storage for striping, and LOD owns this xattr.
414  * but LOD allows others to control striping to some extent
415  * - to reset strping
416  * - to set new defined striping
417  * - to set new semi-defined striping
418  *   - number of stripes is defined
419  *   - number of stripes + osts are defined
420  *   - ??
421  */
422 static int lod_declare_xattr_set(const struct lu_env *env,
423                                  struct dt_object *dt,
424                                  const struct lu_buf *buf,
425                                  const char *name, int fl,
426                                  struct thandle *th)
427 {
428         struct dt_object *next = dt_object_child(dt);
429         struct lu_attr   *attr = &lod_env_info(env)->lti_attr;
430         __u32             mode;
431         int               rc;
432         ENTRY;
433
434         /*
435          * allow to declare predefined striping on a new (!mode) object
436          * which is supposed to be replay of regular file creation
437          * (when LOV setting is declared)
438          * LU_XATTR_REPLACE is set to indicate a layout swap
439          */
440         mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
441         if ((S_ISREG(mode) || !mode) && !strcmp(name, XATTR_NAME_LOV) &&
442              !(fl & LU_XATTR_REPLACE)) {
443                 /*
444                  * this is a request to manipulate object's striping
445                  */
446                 if (dt_object_exists(dt)) {
447                         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
448                         if (rc)
449                                 RETURN(rc);
450                 } else {
451                         memset(attr, 0, sizeof(*attr));
452                         attr->la_valid = LA_TYPE | LA_MODE;
453                         attr->la_mode = S_IFREG;
454                 }
455                 rc = lod_declare_striped_object(env, dt, attr, buf, th);
456                 if (rc)
457                         RETURN(rc);
458         }
459
460         rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
461
462         RETURN(rc);
463 }
464
465 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
466                                     struct dt_object *dt,
467                                     const struct lu_buf *buf,
468                                     const char *name, int fl,
469                                     struct thandle *th,
470                                     struct lustre_capa *capa)
471 {
472         struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
473         struct dt_object        *next = dt_object_child(dt);
474         struct lod_object       *l = lod_dt_obj(dt);
475         struct lov_user_md_v1   *lum;
476         struct lov_user_md_v3   *v3 = NULL;
477         int                      rc;
478         ENTRY;
479
480         LASSERT(l->ldo_stripe == NULL);
481         l->ldo_striping_cached = 0;
482         l->ldo_def_striping_set = 0;
483         lod_object_set_pool(l, NULL);
484         l->ldo_def_stripe_size = 0;
485         l->ldo_def_stripenr = 0;
486
487         LASSERT(buf != NULL && buf->lb_buf != NULL);
488         lum = buf->lb_buf;
489
490         rc = lod_verify_striping(d, buf, 0);
491         if (rc)
492                 RETURN(rc);
493
494         if (lum->lmm_magic == LOV_USER_MAGIC_V3)
495                 v3 = buf->lb_buf;
496
497         /* if { size, offset, count } = { 0, -1, 0 } and no pool
498          * (i.e. all default values specified) then delete default
499          * striping from dir. */
500         CDEBUG(D_OTHER,
501                 "set default striping: sz %u # %u offset %d %s %s\n",
502                 (unsigned)lum->lmm_stripe_size,
503                 (unsigned)lum->lmm_stripe_count,
504                 (int)lum->lmm_stripe_offset,
505                 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
506
507         if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
508                                 (lum->lmm_stripe_count),
509                                 (lum->lmm_stripe_offset)) &&
510                         lum->lmm_magic == LOV_USER_MAGIC_V1) {
511                 rc = dt_xattr_del(env, next, name, th, capa);
512                 if (rc == -ENODATA)
513                         rc = 0;
514         } else {
515                 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
516         }
517
518         RETURN(rc);
519 }
520
521 static int lod_xattr_set(const struct lu_env *env,
522                          struct dt_object *dt, const struct lu_buf *buf,
523                          const char *name, int fl, struct thandle *th,
524                          struct lustre_capa *capa)
525 {
526         struct dt_object        *next = dt_object_child(dt);
527         __u32                    attr;
528         int                      rc;
529         ENTRY;
530
531         attr = dt->do_lu.lo_header->loh_attr & S_IFMT;
532         if (S_ISDIR(attr)) {
533                 if (strncmp(name, XATTR_NAME_LOV, strlen(XATTR_NAME_LOV)) == 0)
534                         rc = lod_xattr_set_lov_on_dir(env, dt, buf, name,
535                                                       fl, th, capa);
536                 else
537                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
538
539         } else if (S_ISREG(attr) && !strcmp(name, XATTR_NAME_LOV)) {
540                 /* in case of lov EA swap, just set it
541                  * if not, it is a replay so check striping match what we
542                  * already have during req replay, declare_xattr_set()
543                  * defines striping, then create() does the work
544                 */
545                 if (fl & LU_XATTR_REPLACE) {
546                         /* free stripes, then update disk */
547                         lod_object_free_striping(env, lod_dt_obj(dt));
548                         rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
549                 } else {
550                         rc = lod_striping_create(env, dt, NULL, NULL, th);
551                 }
552                 RETURN(rc);
553         } else {
554                 /*
555                  * behave transparantly for all other EAs
556                  */
557                 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
558         }
559
560         RETURN(rc);
561 }
562
563 static int lod_declare_xattr_del(const struct lu_env *env,
564                                  struct dt_object *dt, const char *name,
565                                  struct thandle *th)
566 {
567         return dt_declare_xattr_del(env, dt_object_child(dt), name, th);
568 }
569
570 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
571                          const char *name, struct thandle *th,
572                          struct lustre_capa *capa)
573 {
574         if (!strcmp(name, XATTR_NAME_LOV))
575                 lod_object_free_striping(env, lod_dt_obj(dt));
576         return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
577 }
578
579 static int lod_xattr_list(const struct lu_env *env,
580                           struct dt_object *dt, struct lu_buf *buf,
581                           struct lustre_capa *capa)
582 {
583         return dt_xattr_list(env, dt_object_child(dt), buf, capa);
584 }
585
586 int lod_object_set_pool(struct lod_object *o, char *pool)
587 {
588         int len;
589
590         if (o->ldo_pool) {
591                 len = strlen(o->ldo_pool);
592                 OBD_FREE(o->ldo_pool, len + 1);
593                 o->ldo_pool = NULL;
594         }
595         if (pool) {
596                 len = strlen(pool);
597                 OBD_ALLOC(o->ldo_pool, len + 1);
598                 if (o->ldo_pool == NULL)
599                         return -ENOMEM;
600                 strcpy(o->ldo_pool, pool);
601         }
602         return 0;
603 }
604
605 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
606 {
607         return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
608 }
609
610 static int lod_cache_parent_striping(const struct lu_env *env,
611                                      struct lod_object *lp)
612 {
613         struct lov_user_md_v1   *v1 = NULL;
614         struct lov_user_md_v3   *v3 = NULL;
615         int                      rc;
616         ENTRY;
617
618         /* dt_ah_init() is called from MDD without parent being write locked
619          * lock it here */
620         dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
621         if (lp->ldo_striping_cached)
622                 GOTO(unlock, rc = 0);
623
624         rc = lod_get_lov_ea(env, lp);
625         if (rc < 0)
626                 GOTO(unlock, rc);
627
628         if (rc < sizeof(struct lov_user_md)) {
629                 /* don't lookup for non-existing or invalid striping */
630                 lp->ldo_def_striping_set = 0;
631                 lp->ldo_striping_cached = 1;
632                 lp->ldo_def_stripe_size = 0;
633                 lp->ldo_def_stripenr = 0;
634                 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
635                 GOTO(unlock, rc = 0);
636         }
637
638         v1 = (struct lov_user_md_v1 *)lod_env_info(env)->lti_ea_store;
639         if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1))
640                 lustre_swab_lov_user_md_v1(v1);
641         else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3))
642                 lustre_swab_lov_user_md_v3(v3);
643
644         if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
645                 GOTO(unlock, rc = 0);
646
647         if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
648                 GOTO(unlock, rc = 0);
649
650         lp->ldo_def_stripenr = v1->lmm_stripe_count;
651         lp->ldo_def_stripe_size = v1->lmm_stripe_size;
652         lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
653         lp->ldo_striping_cached = 1;
654         lp->ldo_def_striping_set = 1;
655
656         if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
657                 /* XXX: sanity check here */
658                 v3 = (struct lov_user_md_v3 *) v1;
659                 if (v3->lmm_pool_name[0])
660                         lod_object_set_pool(lp, v3->lmm_pool_name);
661         }
662
663         CDEBUG(D_OTHER, "def. striping: # %d, sz %d, off %d %s%s on "DFID"\n",
664                lp->ldo_def_stripenr, lp->ldo_def_stripe_size,
665                lp->ldo_def_stripe_offset, v3 ? "from " : "",
666                v3 ? lp->ldo_pool : "", PFID(lu_object_fid(&lp->ldo_obj.do_lu)));
667
668         EXIT;
669 unlock:
670         dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
671         return rc;
672 }
673
674 /**
675  * used to transfer default striping data to the object being created
676  */
677 static void lod_ah_init(const struct lu_env *env,
678                         struct dt_allocation_hint *ah,
679                         struct dt_object *parent,
680                         struct dt_object *child,
681                         umode_t child_mode)
682 {
683         struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
684         struct dt_object  *nextp = NULL;
685         struct dt_object  *nextc;
686         struct lod_object *lp = NULL;
687         struct lod_object *lc;
688         struct lov_desc   *desc;
689         ENTRY;
690
691         LASSERT(child);
692
693         if (likely(parent)) {
694                 nextp = dt_object_child(parent);
695                 lp = lod_dt_obj(parent);
696         }
697
698         nextc = dt_object_child(child);
699         lc = lod_dt_obj(child);
700
701         LASSERT(lc->ldo_stripenr == 0);
702         LASSERT(lc->ldo_stripe == NULL);
703
704         /*
705          * local object may want some hints
706          * in case of late striping creation, ->ah_init()
707          * can be called with local object existing
708          */
709         if (!dt_object_exists(nextc) || dt_object_remote(nextc))
710                 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
711                                           NULL : nextp, nextc, child_mode);
712
713         if (S_ISDIR(child_mode)) {
714                 if (lp->ldo_striping_cached == 0) {
715                         /* we haven't tried to get default striping for
716                          * the directory yet, let's cache it in the object */
717                         lod_cache_parent_striping(env, lp);
718                 }
719                 /* transfer defaults to new directory */
720                 if (lp->ldo_striping_cached) {
721                         if (lp->ldo_pool)
722                                 lod_object_set_pool(lc, lp->ldo_pool);
723                         lc->ldo_def_stripenr = lp->ldo_def_stripenr;
724                         lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
725                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
726                         lc->ldo_striping_cached = 1;
727                         lc->ldo_def_striping_set = 1;
728                         CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
729                                (int)lc->ldo_def_stripenr,
730                                (int)lc->ldo_def_stripe_size,
731                                (int)lc->ldo_def_stripe_offset);
732                 }
733                 return;
734         }
735
736         /*
737          * if object is going to be striped over OSTs, transfer default
738          * striping information to the child, so that we can use it
739          * during declaration and creation
740          */
741         if (!lod_object_will_be_striped(S_ISREG(child_mode),
742                                         lu_object_fid(&child->do_lu)))
743                 return;
744
745         /*
746          * try from the parent
747          */
748         if (likely(parent)) {
749                 if (lp->ldo_striping_cached == 0) {
750                         /* we haven't tried to get default striping for
751                          * the directory yet, let's cache it in the object */
752                         lod_cache_parent_striping(env, lp);
753                 }
754
755                 lc->ldo_def_stripe_offset = (__u16) -1;
756
757                 if (lp->ldo_def_striping_set) {
758                         if (lp->ldo_pool)
759                                 lod_object_set_pool(lc, lp->ldo_pool);
760                         lc->ldo_stripenr = lp->ldo_def_stripenr;
761                         lc->ldo_stripe_size = lp->ldo_def_stripe_size;
762                         lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
763                         CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
764                                lc->ldo_stripenr, lc->ldo_stripe_size,
765                                lp->ldo_pool ? lp->ldo_pool : "");
766                 }
767         }
768
769         /*
770          * if the parent doesn't provide with specific pattern, grab fs-wide one
771          */
772         desc = &d->lod_desc;
773         if (lc->ldo_stripenr == 0)
774                 lc->ldo_stripenr = desc->ld_default_stripe_count;
775         if (lc->ldo_stripe_size == 0)
776                 lc->ldo_stripe_size = desc->ld_default_stripe_size;
777         CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
778                lc->ldo_stripenr, lc->ldo_stripe_size,
779                lc->ldo_pool ? lc->ldo_pool : "");
780
781         EXIT;
782 }
783
784 #define ll_do_div64(aaa,bbb)    do_div((aaa), (bbb))
785 /*
786  * this function handles a special case when truncate was done
787  * on a stripeless object and now striping is being created
788  * we can't lose that size, so we have to propagate it to newly
789  * created object
790  */
791 static int lod_declare_init_size(const struct lu_env *env,
792                                  struct dt_object *dt, struct thandle *th)
793 {
794         struct dt_object   *next = dt_object_child(dt);
795         struct lod_object  *lo = lod_dt_obj(dt);
796         struct lu_attr     *attr = &lod_env_info(env)->lti_attr;
797         uint64_t            size, offs;
798         int                 rc, stripe;
799         ENTRY;
800
801         /* XXX: we support the simplest (RAID0) striping so far */
802         LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
803         LASSERT(lo->ldo_stripe_size > 0);
804
805         rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
806         LASSERT(attr->la_valid & LA_SIZE);
807         if (rc)
808                 RETURN(rc);
809
810         size = attr->la_size;
811         if (size == 0)
812                 RETURN(0);
813
814         /* ll_do_div64(a, b) returns a % b, and a = a / b */
815         ll_do_div64(size, (__u64) lo->ldo_stripe_size);
816         stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
817
818         size = size * lo->ldo_stripe_size;
819         offs = attr->la_size;
820         size += ll_do_div64(offs, lo->ldo_stripe_size);
821
822         attr->la_valid = LA_SIZE;
823         attr->la_size = size;
824
825         rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
826
827         RETURN(rc);
828 }
829
830
831 /**
832  * Create declaration of striped object
833  */
834 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
835                                struct lu_attr *attr,
836                                const struct lu_buf *lovea, struct thandle *th)
837 {
838         struct lod_thread_info  *info = lod_env_info(env);
839         struct dt_object        *next = dt_object_child(dt);
840         struct lod_object       *lo = lod_dt_obj(dt);
841         int                      rc;
842         ENTRY;
843
844         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
845                 /* failed to create striping, let's reset
846                  * config so that others don't get confused */
847                 lod_object_free_striping(env, lo);
848                 GOTO(out, rc = -ENOMEM);
849         }
850
851         /* choose OST and generate appropriate objects */
852         rc = lod_qos_prep_create(env, lo, attr, lovea, th);
853         if (rc) {
854                 /* failed to create striping, let's reset
855                  * config so that others don't get confused */
856                 lod_object_free_striping(env, lo);
857                 GOTO(out, rc);
858         }
859
860         /*
861          * declare storage for striping data
862          */
863         info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
864                                 lo->ldo_pool ?  LOV_MAGIC_V3 : LOV_MAGIC_V1);
865         rc = dt_declare_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV,
866                                   0, th);
867         if (rc)
868                 GOTO(out, rc);
869
870         /*
871          * if striping is created with local object's size > 0,
872          * we have to propagate this size to specific object
873          * the case is possible only when local object was created previously
874          */
875         if (dt_object_exists(next))
876                 rc = lod_declare_init_size(env, dt, th);
877
878 out:
879         RETURN(rc);
880 }
881
882 static int lod_declare_object_create(const struct lu_env *env,
883                                      struct dt_object *dt,
884                                      struct lu_attr *attr,
885                                      struct dt_allocation_hint *hint,
886                                      struct dt_object_format *dof,
887                                      struct thandle *th)
888 {
889         struct dt_object   *next = dt_object_child(dt);
890         struct lod_object  *lo = lod_dt_obj(dt);
891         int                 rc;
892         ENTRY;
893
894         LASSERT(dof);
895         LASSERT(attr);
896         LASSERT(th);
897
898         /*
899          * first of all, we declare creation of local object
900          */
901         rc = dt_declare_create(env, next, attr, hint, dof, th);
902         if (rc)
903                 GOTO(out, rc);
904
905         if (dof->dof_type == DFT_SYM)
906                 dt->do_body_ops = &lod_body_lnk_ops;
907
908         /*
909          * it's lod_ah_init() who has decided the object will striped
910          */
911         if (dof->dof_type == DFT_REGULAR) {
912                 /* callers don't want stripes */
913                 /* XXX: all tricky interactions with ->ah_make_hint() decided
914                  * to use striping, then ->declare_create() behaving differently
915                  * should be cleaned */
916                 if (dof->u.dof_reg.striped == 0)
917                         lo->ldo_stripenr = 0;
918                 if (lo->ldo_stripenr > 0)
919                         rc = lod_declare_striped_object(env, dt, attr,
920                                                         NULL, th);
921         } else if (dof->dof_type == DFT_DIR && lo->ldo_striping_cached) {
922                 struct lod_thread_info *info = lod_env_info(env);
923
924                 struct lov_user_md_v3 *v3;
925
926                 if (LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
927                                         lo->ldo_def_stripenr,
928                                         lo->ldo_def_stripe_offset))
929                         RETURN(0);
930
931                 OBD_ALLOC_PTR(v3);
932                 if (v3 == NULL)
933                         RETURN(-ENOMEM);
934
935                 v3->lmm_magic = cpu_to_le32(LOV_MAGIC_V3);
936                 v3->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
937                 fid_to_lmm_oi(lu_object_fid(&dt->do_lu), &v3->lmm_oi);
938                 lmm_oi_cpu_to_le(&v3->lmm_oi, &v3->lmm_oi);
939                 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
940                 v3->lmm_stripe_count = cpu_to_le32(lo->ldo_def_stripenr);
941                 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
942                 if (lo->ldo_pool)
943                         strncpy(v3->lmm_pool_name, lo->ldo_pool,
944                                 LOV_MAXPOOLNAME);
945
946                 info->lti_buf.lb_buf = v3;
947                 info->lti_buf.lb_len = sizeof(*v3);
948
949                 /* to transfer default striping from the parent */
950                 rc = dt_declare_xattr_set(env, next, &info->lti_buf,
951                                           XATTR_NAME_LOV, 0, th);
952                 OBD_FREE_PTR(v3);
953         }
954
955 out:
956         RETURN(rc);
957 }
958
959 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
960                         struct lu_attr *attr, struct dt_object_format *dof,
961                         struct thandle *th)
962 {
963         struct lod_object *lo = lod_dt_obj(dt);
964         int                rc = 0, i;
965         ENTRY;
966
967         LASSERT(lo->ldo_striping_cached == 0);
968
969         /* create all underlying objects */
970         for (i = 0; i < lo->ldo_stripenr; i++) {
971                 LASSERT(lo->ldo_stripe[i]);
972                 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
973
974                 if (rc)
975                         break;
976         }
977         if (rc == 0)
978                 rc = lod_generate_and_set_lovea(env, lo, th);
979
980         RETURN(rc);
981 }
982
983 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
984                              struct lu_attr *attr,
985                              struct dt_allocation_hint *hint,
986                              struct dt_object_format *dof, struct thandle *th)
987 {
988         struct dt_object   *next = dt_object_child(dt);
989         struct lod_object  *lo = lod_dt_obj(dt);
990         int                 rc;
991         ENTRY;
992
993         /* create local object */
994         rc = dt_create(env, next, attr, hint, dof, th);
995
996         if (rc == 0) {
997                 if (S_ISDIR(dt->do_lu.lo_header->loh_attr))
998                         rc = lod_store_def_striping(env, dt, th);
999                 else if (lo->ldo_stripe)
1000                         rc = lod_striping_create(env, dt, attr, dof, th);
1001         }
1002
1003         RETURN(rc);
1004 }
1005
1006 static int lod_declare_object_destroy(const struct lu_env *env,
1007                                       struct dt_object *dt,
1008                                       struct thandle *th)
1009 {
1010         struct dt_object   *next = dt_object_child(dt);
1011         struct lod_object  *lo = lod_dt_obj(dt);
1012         int                 rc, i;
1013         ENTRY;
1014
1015         /*
1016          * we declare destroy for the local object
1017          */
1018         rc = dt_declare_destroy(env, next, th);
1019         if (rc)
1020                 RETURN(rc);
1021
1022         /*
1023          * load striping information, notice we don't do this when object
1024          * is being initialized as we don't need this information till
1025          * few specific cases like destroy, chown
1026          */
1027         rc = lod_load_striping(env, lo);
1028         if (rc)
1029                 RETURN(rc);
1030
1031         /* declare destroy for all underlying objects */
1032         for (i = 0; i < lo->ldo_stripenr; i++) {
1033                 LASSERT(lo->ldo_stripe[i]);
1034                 rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
1035
1036                 if (rc)
1037                         break;
1038         }
1039
1040         RETURN(rc);
1041 }
1042
1043 static int lod_object_destroy(const struct lu_env *env,
1044                 struct dt_object *dt, struct thandle *th)
1045 {
1046         struct dt_object  *next = dt_object_child(dt);
1047         struct lod_object *lo = lod_dt_obj(dt);
1048         int                rc, i;
1049         ENTRY;
1050
1051         /* destroy local object */
1052         rc = dt_destroy(env, next, th);
1053         if (rc)
1054                 RETURN(rc);
1055
1056         /* destroy all underlying objects */
1057         for (i = 0; i < lo->ldo_stripenr; i++) {
1058                 LASSERT(lo->ldo_stripe[i]);
1059                 rc = dt_destroy(env, lo->ldo_stripe[i], th);
1060                 if (rc)
1061                         break;
1062         }
1063
1064         RETURN(rc);
1065 }
1066
1067 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
1068                          const struct dt_index_features *feat)
1069 {
1070         struct dt_object *next = dt_object_child(dt);
1071         int               rc;
1072         ENTRY;
1073
1074         LASSERT(next->do_ops);
1075         LASSERT(next->do_ops->do_index_try);
1076
1077         rc = next->do_ops->do_index_try(env, next, feat);
1078         if (next->do_index_ops && dt->do_index_ops == NULL)
1079                 dt->do_index_ops = &lod_index_ops;
1080
1081         RETURN(rc);
1082 }
1083
1084 static int lod_declare_ref_add(const struct lu_env *env,
1085                                struct dt_object *dt, struct thandle *th)
1086 {
1087         return dt_declare_ref_add(env, dt_object_child(dt), th);
1088 }
1089
1090 static int lod_ref_add(const struct lu_env *env,
1091                        struct dt_object *dt, struct thandle *th)
1092 {
1093         return dt_ref_add(env, dt_object_child(dt), th);
1094 }
1095
1096 static int lod_declare_ref_del(const struct lu_env *env,
1097                                struct dt_object *dt, struct thandle *th)
1098 {
1099         return dt_declare_ref_del(env, dt_object_child(dt), th);
1100 }
1101
1102 static int lod_ref_del(const struct lu_env *env,
1103                        struct dt_object *dt, struct thandle *th)
1104 {
1105         return dt_ref_del(env, dt_object_child(dt), th);
1106 }
1107
1108 static struct obd_capa *lod_capa_get(const struct lu_env *env,
1109                                      struct dt_object *dt,
1110                                      struct lustre_capa *old, __u64 opc)
1111 {
1112         return dt_capa_get(env, dt_object_child(dt), old, opc);
1113 }
1114
1115 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt)
1116 {
1117         return dt_object_sync(env, dt_object_child(dt));
1118 }
1119
1120 static int lod_object_lock(const struct lu_env *env,
1121                            struct dt_object *dt, struct lustre_handle *lh,
1122                            struct ldlm_enqueue_info *einfo,
1123                            void *policy)
1124 {
1125         struct dt_object   *next = dt_object_child(dt);
1126         int              rc;
1127         ENTRY;
1128
1129         /*
1130          * declare setattr on the local object
1131          */
1132         rc = dt_object_lock(env, next, lh, einfo, policy);
1133
1134         RETURN(rc);
1135 }
1136
1137 struct dt_object_operations lod_obj_ops = {
1138         .do_read_lock           = lod_object_read_lock,
1139         .do_write_lock          = lod_object_write_lock,
1140         .do_read_unlock         = lod_object_read_unlock,
1141         .do_write_unlock        = lod_object_write_unlock,
1142         .do_write_locked        = lod_object_write_locked,
1143         .do_attr_get            = lod_attr_get,
1144         .do_declare_attr_set    = lod_declare_attr_set,
1145         .do_attr_set            = lod_attr_set,
1146         .do_xattr_get           = lod_xattr_get,
1147         .do_declare_xattr_set   = lod_declare_xattr_set,
1148         .do_xattr_set           = lod_xattr_set,
1149         .do_declare_xattr_del   = lod_declare_xattr_del,
1150         .do_xattr_del           = lod_xattr_del,
1151         .do_xattr_list          = lod_xattr_list,
1152         .do_ah_init             = lod_ah_init,
1153         .do_declare_create      = lod_declare_object_create,
1154         .do_create              = lod_object_create,
1155         .do_declare_destroy     = lod_declare_object_destroy,
1156         .do_destroy             = lod_object_destroy,
1157         .do_index_try           = lod_index_try,
1158         .do_declare_ref_add     = lod_declare_ref_add,
1159         .do_ref_add             = lod_ref_add,
1160         .do_declare_ref_del     = lod_declare_ref_del,
1161         .do_ref_del             = lod_ref_del,
1162         .do_capa_get            = lod_capa_get,
1163         .do_object_sync         = lod_object_sync,
1164         .do_object_lock         = lod_object_lock,
1165 };
1166
1167 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
1168                         struct lu_buf *buf, loff_t *pos,
1169                         struct lustre_capa *capa)
1170 {
1171         struct dt_object *next = dt_object_child(dt);
1172         return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
1173 }
1174
1175 static ssize_t lod_declare_write(const struct lu_env *env,
1176                                  struct dt_object *dt,
1177                                  const loff_t size, loff_t pos,
1178                                  struct thandle *th)
1179 {
1180         return dt_declare_record_write(env, dt_object_child(dt),
1181                                        size, pos, th);
1182 }
1183
1184 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
1185                          const struct lu_buf *buf, loff_t *pos,
1186                          struct thandle *th, struct lustre_capa *capa, int iq)
1187 {
1188         struct dt_object *next = dt_object_child(dt);
1189         LASSERT(next);
1190         return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
1191 }
1192
1193 static const struct dt_body_operations lod_body_lnk_ops = {
1194         .dbo_read               = lod_read,
1195         .dbo_declare_write      = lod_declare_write,
1196         .dbo_write              = lod_write
1197 };
1198
1199 static int lod_object_init(const struct lu_env *env, struct lu_object *o,
1200                            const struct lu_object_conf *conf)
1201 {
1202         struct lod_device *d = lu2lod_dev(o->lo_dev);
1203         struct lu_object  *below;
1204         struct lu_device  *under;
1205         ENTRY;
1206
1207         /*
1208          * create local object
1209          */
1210         under = &d->lod_child->dd_lu_dev;
1211         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
1212         if (below == NULL)
1213                 RETURN(-ENOMEM);
1214
1215         lu_object_add(o, below);
1216
1217         RETURN(0);
1218 }
1219
1220 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
1221 {
1222         int i;
1223
1224         if (lo->ldo_stripe) {
1225                 LASSERT(lo->ldo_stripes_allocated > 0);
1226
1227                 for (i = 0; i < lo->ldo_stripenr; i++) {
1228                         if (lo->ldo_stripe[i])
1229                                 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
1230                 }
1231
1232                 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
1233                 OBD_FREE(lo->ldo_stripe, i);
1234                 lo->ldo_stripe = NULL;
1235                 lo->ldo_stripes_allocated = 0;
1236         }
1237         lo->ldo_stripenr = 0;
1238         lo->ldo_pattern = 0;
1239 }
1240
1241 /*
1242  * ->start is called once all slices are initialized, including header's
1243  * cache for mode (object type). using the type we can initialize ops
1244  */
1245 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
1246 {
1247         if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
1248                 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
1249         return 0;
1250 }
1251
1252 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
1253 {
1254         struct lod_object *mo = lu2lod_obj(o);
1255
1256         /*
1257          * release all underlying object pinned
1258          */
1259
1260         lod_object_free_striping(env, mo);
1261
1262         lod_object_set_pool(mo, NULL);
1263
1264         lu_object_fini(o);
1265         OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
1266 }
1267
1268 static void lod_object_release(const struct lu_env *env, struct lu_object *o)
1269 {
1270         /* XXX: shouldn't we release everything here in case if object
1271          * creation failed before? */
1272 }
1273
1274 static int lod_object_print(const struct lu_env *env, void *cookie,
1275                             lu_printer_t p, const struct lu_object *l)
1276 {
1277         struct lod_object *o = lu2lod_obj((struct lu_object *) l);
1278
1279         return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
1280 }
1281
1282 struct lu_object_operations lod_lu_obj_ops = {
1283         .loo_object_init        = lod_object_init,
1284         .loo_object_start       = lod_object_start,
1285         .loo_object_free        = lod_object_free,
1286         .loo_object_release     = lod_object_release,
1287         .loo_object_print       = lod_object_print,
1288 };
1289
1290 /**
1291  * Init remote lod object
1292  */
1293 static int lod_robject_init(const struct lu_env *env, struct lu_object *lo,
1294                             const struct lu_object_conf *conf)
1295 {
1296         struct lod_device *lod = lu2lod_dev(lo->lo_dev);
1297         struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
1298         struct lu_device  *c_dev = NULL;
1299         struct lu_object  *c_obj;
1300         int i;
1301         ENTRY;
1302
1303         lod_getref(ltd);
1304         if (ltd->ltd_tgts_size > 0) {
1305                 cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
1306                         struct lod_tgt_desc *tgt;
1307                         tgt = LTD_TGT(ltd, i);
1308                         LASSERT(tgt && tgt->ltd_tgt);
1309                         if (tgt->ltd_index ==
1310                             lu2lod_obj(lo)->ldo_mds_num) {
1311                                 c_dev = &(tgt->ltd_tgt->dd_lu_dev);
1312                                 break;
1313                         }
1314                 }
1315         }
1316         lod_putref(lod, ltd);
1317
1318         if (unlikely(c_dev == NULL))
1319                 RETURN(-ENOENT);
1320
1321         c_obj = c_dev->ld_ops->ldo_object_alloc(env, lo->lo_header, c_dev);
1322         if (unlikely(c_obj == NULL))
1323                 RETURN(-ENOMEM);
1324
1325         lu_object_add(lo, c_obj);
1326
1327         RETURN(0);
1328 }
1329
1330 struct lu_object_operations lod_lu_robj_ops = {
1331         .loo_object_init      = lod_robject_init,
1332         .loo_object_start     = lod_object_start,
1333         .loo_object_free      = lod_object_free,
1334         .loo_object_release   = lod_object_release,
1335         .loo_object_print     = lod_object_print,
1336 };