4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright 2009 Sun Microsystems, Inc. All rights reserved
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2013, Intel Corporation.
29 * lustre/lod/lod_object.c
31 * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
34 #define DEBUG_SUBSYSTEM S_MDS
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <lprocfs_status.h>
42 #include <lustre_fid.h>
43 #include <lustre_param.h>
44 #include <lustre_fid.h>
45 #include <lustre_lmv.h>
47 #include <md_object.h>
49 #include "lod_internal.h"
51 static const char dot[] = ".";
52 static const char dotdot[] = "..";
54 extern struct kmem_cache *lod_object_kmem;
55 static const struct dt_body_operations lod_body_lnk_ops;
57 static int lod_index_lookup(const struct lu_env *env, struct dt_object *dt,
58 struct dt_rec *rec, const struct dt_key *key,
59 struct lustre_capa *capa)
61 struct dt_object *next = dt_object_child(dt);
62 return next->do_index_ops->dio_lookup(env, next, rec, key, capa);
65 static int lod_declare_index_insert(const struct lu_env *env,
67 const struct dt_rec *rec,
68 const struct dt_key *key,
69 struct thandle *handle)
71 return dt_declare_insert(env, dt_object_child(dt), rec, key, handle);
74 static int lod_index_insert(const struct lu_env *env,
76 const struct dt_rec *rec,
77 const struct dt_key *key,
79 struct lustre_capa *capa,
82 return dt_insert(env, dt_object_child(dt), rec, key, th, capa, ign);
85 static int lod_declare_index_delete(const struct lu_env *env,
87 const struct dt_key *key,
90 return dt_declare_delete(env, dt_object_child(dt), key, th);
93 static int lod_index_delete(const struct lu_env *env,
95 const struct dt_key *key,
97 struct lustre_capa *capa)
99 return dt_delete(env, dt_object_child(dt), key, th, capa);
102 static struct dt_it *lod_it_init(const struct lu_env *env,
103 struct dt_object *dt, __u32 attr,
104 struct lustre_capa *capa)
106 struct dt_object *next = dt_object_child(dt);
107 struct lod_it *it = &lod_env_info(env)->lti_it;
108 struct dt_it *it_next;
111 it_next = next->do_index_ops->dio_it.init(env, next, attr, capa);
115 /* currently we do not use more than one iterator per thread
116 * so we store it in thread info. if at some point we need
117 * more active iterators in a single thread, we can allocate
119 LASSERT(it->lit_obj == NULL);
121 it->lit_it = it_next;
124 return (struct dt_it *)it;
127 #define LOD_CHECK_IT(env, it) \
129 LASSERT((it)->lit_obj != NULL); \
130 LASSERT((it)->lit_it != NULL); \
133 void lod_it_fini(const struct lu_env *env, struct dt_it *di)
135 struct lod_it *it = (struct lod_it *)di;
137 LOD_CHECK_IT(env, it);
138 it->lit_obj->do_index_ops->dio_it.fini(env, it->lit_it);
140 /* the iterator not in use any more */
145 int lod_it_get(const struct lu_env *env, struct dt_it *di,
146 const struct dt_key *key)
148 const struct lod_it *it = (const struct lod_it *)di;
150 LOD_CHECK_IT(env, it);
151 return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key);
154 void lod_it_put(const struct lu_env *env, struct dt_it *di)
156 struct lod_it *it = (struct lod_it *)di;
158 LOD_CHECK_IT(env, it);
159 return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it);
162 int lod_it_next(const struct lu_env *env, struct dt_it *di)
164 struct lod_it *it = (struct lod_it *)di;
166 LOD_CHECK_IT(env, it);
167 return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it);
170 struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di)
172 const struct lod_it *it = (const struct lod_it *)di;
174 LOD_CHECK_IT(env, it);
175 return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it);
178 int lod_it_key_size(const struct lu_env *env, const struct dt_it *di)
180 struct lod_it *it = (struct lod_it *)di;
182 LOD_CHECK_IT(env, it);
183 return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it);
186 int lod_it_rec(const struct lu_env *env, const struct dt_it *di,
187 struct dt_rec *rec, __u32 attr)
189 const struct lod_it *it = (const struct lod_it *)di;
191 LOD_CHECK_IT(env, it);
192 return it->lit_obj->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr);
195 __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di)
197 const struct lod_it *it = (const struct lod_it *)di;
199 LOD_CHECK_IT(env, it);
200 return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it);
203 int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash)
205 const struct lod_it *it = (const struct lod_it *)di;
207 LOD_CHECK_IT(env, it);
208 return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash);
211 int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di,
214 const struct lod_it *it = (const struct lod_it *)di;
216 LOD_CHECK_IT(env, it);
217 return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it, key_rec);
220 static struct dt_index_operations lod_index_ops = {
221 .dio_lookup = lod_index_lookup,
222 .dio_declare_insert = lod_declare_index_insert,
223 .dio_insert = lod_index_insert,
224 .dio_declare_delete = lod_declare_index_delete,
225 .dio_delete = lod_index_delete,
233 .key_size = lod_it_key_size,
235 .store = lod_it_store,
237 .key_rec = lod_it_key_rec,
241 static void lod_object_read_lock(const struct lu_env *env,
242 struct dt_object *dt, unsigned role)
244 dt_read_lock(env, dt_object_child(dt), role);
247 static void lod_object_write_lock(const struct lu_env *env,
248 struct dt_object *dt, unsigned role)
250 dt_write_lock(env, dt_object_child(dt), role);
253 static void lod_object_read_unlock(const struct lu_env *env,
254 struct dt_object *dt)
256 dt_read_unlock(env, dt_object_child(dt));
259 static void lod_object_write_unlock(const struct lu_env *env,
260 struct dt_object *dt)
262 dt_write_unlock(env, dt_object_child(dt));
265 static int lod_object_write_locked(const struct lu_env *env,
266 struct dt_object *dt)
268 return dt_write_locked(env, dt_object_child(dt));
271 static int lod_attr_get(const struct lu_env *env,
272 struct dt_object *dt,
273 struct lu_attr *attr,
274 struct lustre_capa *capa)
276 return dt_attr_get(env, dt_object_child(dt), attr, capa);
279 static int lod_declare_attr_set(const struct lu_env *env,
280 struct dt_object *dt,
281 const struct lu_attr *attr,
282 struct thandle *handle)
284 struct dt_object *next = dt_object_child(dt);
285 struct lod_object *lo = lod_dt_obj(dt);
290 * declare setattr on the local object
292 rc = dt_declare_attr_set(env, next, attr, handle);
296 /* osp_declare_attr_set() ignores all attributes other than
297 * UID, GID, and size, and osp_attr_set() ignores all but UID
298 * and GID. Declaration of size attr setting happens through
299 * lod_declare_init_size(), and not through this function.
300 * Therefore we need not load striping unless ownership is
301 * changing. This should save memory and (we hope) speed up
303 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
304 if (!(attr->la_valid & (LA_UID | LA_GID)))
307 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
310 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
311 LA_ATIME | LA_MTIME | LA_CTIME)))
315 * load striping information, notice we don't do this when object
316 * is being initialized as we don't need this information till
317 * few specific cases like destroy, chown
319 rc = lod_load_striping(env, lo);
323 if (lo->ldo_stripenr == 0)
326 if (!(attr->la_valid & ~(LA_ATIME | LA_MTIME | LA_CTIME))) {
327 struct lu_attr *la = &lod_env_info(env)->lti_attr;
328 bool setattr_time = false;
330 rc = dt_attr_get(env, dt_object_child(dt), la,
335 /* If it will only setattr time, it will only set
336 * time < current_time */
337 if ((attr->la_valid & LA_ATIME &&
338 attr->la_atime < la->la_atime) ||
339 (attr->la_valid & LA_CTIME &&
340 attr->la_ctime < la->la_ctime) ||
341 (attr->la_valid & LA_MTIME &&
342 attr->la_mtime < la->la_mtime))
349 * if object is striped declare changes on the stripes
351 LASSERT(lo->ldo_stripe);
352 for (i = 0; i < lo->ldo_stripenr; i++) {
353 LASSERT(lo->ldo_stripe[i]);
355 rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle);
357 CERROR("failed declaration: %d\n", rc);
362 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
363 dt_object_exists(next) != 0 &&
364 dt_object_remote(next) == 0)
365 dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle);
370 static int lod_attr_set(const struct lu_env *env,
371 struct dt_object *dt,
372 const struct lu_attr *attr,
373 struct thandle *handle,
374 struct lustre_capa *capa)
376 struct dt_object *next = dt_object_child(dt);
377 struct lod_object *lo = lod_dt_obj(dt);
382 * apply changes to the local object
384 rc = dt_attr_set(env, next, attr, handle, capa);
388 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
389 if (!(attr->la_valid & (LA_UID | LA_GID)))
392 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
395 if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
396 LA_ATIME | LA_MTIME | LA_CTIME)))
400 if (lo->ldo_stripenr == 0)
403 if (!(attr->la_valid & ~(LA_ATIME | LA_MTIME | LA_CTIME))) {
404 struct lu_attr *la = &lod_env_info(env)->lti_attr;
405 bool setattr_time = false;
407 rc = dt_attr_get(env, dt_object_child(dt), la,
412 /* If it will only setattr time, it will only set
413 * time < current_time */
414 if ((attr->la_valid & LA_ATIME &&
415 attr->la_atime < la->la_atime) ||
416 (attr->la_valid & LA_CTIME &&
417 attr->la_ctime < la->la_ctime) ||
418 (attr->la_valid & LA_MTIME &&
419 attr->la_mtime < la->la_mtime))
427 * if object is striped, apply changes to all the stripes
429 LASSERT(lo->ldo_stripe);
430 for (i = 0; i < lo->ldo_stripenr; i++) {
431 LASSERT(lo->ldo_stripe[i]);
432 rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
434 CERROR("failed declaration: %d\n", rc);
439 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) &&
440 dt_object_exists(next) != 0 &&
441 dt_object_remote(next) == 0)
442 dt_xattr_del(env, next, XATTR_NAME_LOV, handle, BYPASS_CAPA);
447 static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
448 struct lu_buf *buf, const char *name,
449 struct lustre_capa *capa)
451 struct lod_thread_info *info = lod_env_info(env);
452 struct lod_device *dev = lu2lod_dev(dt->do_lu.lo_dev);
456 rc = dt_xattr_get(env, dt_object_child(dt), buf, name, capa);
457 if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT))
461 * lod returns default striping on the real root of the device
462 * this is like the root stores default striping for the whole
463 * filesystem. historically we've been using a different approach
464 * and store it in the config.
466 dt_root_get(env, dev->lod_child, &info->lti_fid);
467 is_root = lu_fid_eq(&info->lti_fid, lu_object_fid(&dt->do_lu));
469 if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) {
470 struct lov_user_md *lum = buf->lb_buf;
471 struct lov_desc *desc = &dev->lod_desc;
473 if (buf->lb_buf == NULL) {
475 } else if (buf->lb_len >= sizeof(*lum)) {
476 lum->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1);
477 lmm_oi_set_seq(&lum->lmm_oi, FID_SEQ_LOV_DEFAULT);
478 lmm_oi_set_id(&lum->lmm_oi, 0);
479 lmm_oi_cpu_to_le(&lum->lmm_oi, &lum->lmm_oi);
480 lum->lmm_pattern = cpu_to_le32(desc->ld_pattern);
481 lum->lmm_stripe_size = cpu_to_le32(
482 desc->ld_default_stripe_size);
483 lum->lmm_stripe_count = cpu_to_le16(
484 desc->ld_default_stripe_count);
485 lum->lmm_stripe_offset = cpu_to_le16(
486 desc->ld_default_stripe_offset);
496 static int lod_verify_md_striping(struct lod_device *lod,
497 const struct lmv_user_md_v1 *lum)
502 if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
503 GOTO(out, rc = -EINVAL);
505 if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
506 GOTO(out, rc = -EINVAL);
508 if (unlikely(le32_to_cpu(lum->lum_stripe_count) >
509 lod->lod_remote_mdt_count + 1))
510 GOTO(out, rc = -EINVAL);
513 CERROR("%s: invalid lmv_user_md: magic = %x, "
514 "stripe_offset = %d, stripe_count = %u: rc = %d\n",
515 lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
516 (int)le32_to_cpu(lum->lum_stripe_offset),
517 le32_to_cpu(lum->lum_stripe_count), rc);
521 int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
522 struct lu_buf *lmv_buf)
524 struct lod_thread_info *info = lod_env_info(env);
525 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
526 struct lod_object *lo = lod_dt_obj(dt);
527 struct lmv_mds_md_v1 *lmm1;
530 int type = LU_SEQ_RANGE_ANY;
536 LASSERT(lo->ldo_dir_striped != 0);
537 LASSERT(lo->ldo_stripenr > 0);
538 stripe_count = lo->ldo_stripenr + 1;
539 lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
540 if (info->lti_ea_store_size < lmm_size) {
541 rc = lod_ea_store_resize(info, lmm_size);
546 lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
547 lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
548 lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
549 lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
550 rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
555 lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
556 fid_cpu_to_le(&lmm1->lmv_stripe_fids[0], lu_object_fid(&dt->do_lu));
557 for (i = 0; i < lo->ldo_stripenr; i++) {
558 struct dt_object *dto;
560 dto = lo->ldo_stripe[i];
561 LASSERT(dto != NULL);
562 fid_cpu_to_le(&lmm1->lmv_stripe_fids[i + 1],
563 lu_object_fid(&dto->do_lu));
566 lmv_buf->lb_buf = info->lti_ea_store;
567 lmv_buf->lb_len = lmm_size;
568 lo->ldo_dir_striping_cached = 1;
573 int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
574 const struct lu_buf *buf)
576 struct lod_thread_info *info = lod_env_info(env);
577 struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
578 struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
579 struct dt_object **stripe;
580 union lmv_mds_md *lmm = buf->lb_buf;
581 struct lmv_mds_md_v1 *lmv1 = &lmm->lmv_md_v1;
582 struct lu_fid *fid = &info->lti_fid;
587 if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
590 if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1)
593 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[0]);
594 /* Do not load striping information for slave inode */
595 if (!lu_fid_eq(fid, lu_object_fid(&lo->ldo_obj.do_lu))) {
596 lo->ldo_dir_slave_stripe = 1;
600 LASSERT(lo->ldo_stripe == NULL);
601 OBD_ALLOC(stripe, sizeof(stripe[0]) *
602 (le32_to_cpu(lmv1->lmv_stripe_count) - 1));
606 /* skip master stripe */
607 for (i = 1; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
608 struct lod_tgt_desc *tgt;
610 int type = LU_SEQ_RANGE_ANY;
611 struct dt_object *dto;
613 fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
614 rc = lod_fld_lookup(env, lod, fid, &idx, &type);
618 tgt = LTD_TGT(ltd, idx);
620 GOTO(out, rc = -ESTALE);
622 dto = dt_locate_at(env, tgt->ltd_tgt, fid,
623 lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
626 GOTO(out, rc = PTR_ERR(dto));
631 lo->ldo_stripe = stripe;
632 lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count) - 1;
633 lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count) - 1;
635 lod_object_free_striping(env, lo);
640 static int lod_prep_md_striped_create(const struct lu_env *env,
641 struct dt_object *dt,
642 struct lu_attr *attr,
643 const struct lmv_user_md_v1 *lum,
646 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
647 struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
648 struct lod_object *lo = lod_dt_obj(dt);
649 struct dt_object **stripe;
650 struct lu_buf lmv_buf;
658 /* The lum has been verifed in lod_verify_md_striping */
659 LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
660 LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
662 /* Do not need allocated master stripe */
663 stripe_count = le32_to_cpu(lum->lum_stripe_count);
664 OBD_ALLOC(stripe, sizeof(stripe[0]) * (stripe_count - 1));
668 OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
669 if (idx_array == NULL)
670 GOTO(out_free, rc = -ENOMEM);
672 idx_array[0] = le32_to_cpu(lum->lum_stripe_offset);
673 for (i = 1; i < stripe_count; i++) {
674 struct lod_tgt_desc *tgt;
675 struct dt_object *dto;
678 struct lu_object_conf conf = { 0 };
680 idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
682 for (j = 0; j < lod->lod_remote_mdt_count;
683 j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
684 bool already_allocated = false;
687 CDEBUG(D_INFO, "try idx %d, mdt cnt %d,"
688 " allocated %d, last allocated %d\n", idx,
689 lod->lod_remote_mdt_count, i, idx_array[i - 1]);
691 /* Find next avaible target */
692 if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
695 /* check whether the idx already exists
696 * in current allocated array */
697 for (k = 0; k < i; k++) {
698 if (idx_array[k] == idx) {
699 already_allocated = true;
704 if (already_allocated)
710 /* Can not allocate more stripes */
711 if (j == lod->lod_remote_mdt_count) {
712 CDEBUG(D_INFO, "%s: require stripes %d only get %d\n",
713 lod2obd(lod)->obd_name, stripe_count, i - 1);
717 CDEBUG(D_INFO, "idx %d, mdt cnt %d,"
718 " allocated %d, last allocated %d\n", idx,
719 lod->lod_remote_mdt_count, i, idx_array[i - 1]);
721 tgt = LTD_TGT(ltd, idx);
722 LASSERT(tgt != NULL);
724 rc = obd_fid_alloc(tgt->ltd_exp, &fid, NULL);
729 conf.loc_flags = LOC_F_NEW;
730 dto = dt_locate_at(env, tgt->ltd_tgt, &fid,
731 dt->do_lu.lo_dev->ld_site->ls_top_dev, &conf);
733 GOTO(out_put, rc = PTR_ERR(dto));
738 lo->ldo_dir_striped = 1;
739 lo->ldo_stripe = stripe;
740 lo->ldo_stripenr = i - 1;
741 lo->ldo_stripes_allocated = stripe_count - 1;
743 if (lo->ldo_stripenr == 0)
744 GOTO(out_put, rc = -ENOSPC);
746 rc = lod_prep_lmv_md(env, dt, &lmv_buf);
750 for (i = 0; i < lo->ldo_stripenr; i++) {
751 struct dt_object *dto;
754 /* only create slave striped object */
755 rc = dt_declare_create(env, dto, attr, NULL, NULL, th);
759 if (!dt_try_as_dir(env, dto))
760 GOTO(out_put, rc = -EINVAL);
762 rc = dt_declare_insert(env, dto,
763 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
764 (const struct dt_key *)dot, th);
768 /* master stripe FID will be put to .. */
769 rc = dt_declare_insert(env, dto,
770 (const struct dt_rec *)lu_object_fid(&dt->do_lu),
771 (const struct dt_key *)dotdot, th);
775 /* probably nothing to inherite */
776 if (lo->ldo_striping_cached &&
777 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
778 lo->ldo_def_stripenr,
779 lo->ldo_def_stripe_offset)) {
780 struct lod_thread_info *info;
781 struct lov_user_md_v3 *v3;
783 /* sigh, lti_ea_store has been used for lmv_buf,
784 * so we have to allocate buffer for default
788 GOTO(out_put, rc = -ENOMEM);
790 memset(v3, 0, sizeof(*v3));
791 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
792 v3->lmm_stripe_count =
793 cpu_to_le32(lo->ldo_def_stripenr);
794 v3->lmm_stripe_offset =
795 cpu_to_le32(lo->ldo_def_stripe_offset);
796 v3->lmm_stripe_size =
797 cpu_to_le32(lo->ldo_def_stripe_size);
799 strncpy(v3->lmm_pool_name, lo->ldo_pool,
802 info = lod_env_info(env);
803 info->lti_buf.lb_buf = v3;
804 info->lti_buf.lb_len = sizeof(*v3);
805 rc = dt_declare_xattr_set(env, dto,
813 rc = dt_declare_xattr_set(env, dto, &lmv_buf, XATTR_NAME_LMV, 0,
819 rc = dt_declare_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, 0, th);
825 for (i = 0; i < stripe_count - 1; i++)
826 if (stripe[i] != NULL)
827 lu_object_put(env, &stripe[i]->do_lu);
828 OBD_FREE(stripe, sizeof(stripe[0]) * (stripe_count - 1));
832 if (idx_array != NULL)
833 OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
839 * Declare create striped md object.
841 static int lod_declare_xattr_set_lmv(const struct lu_env *env,
842 struct dt_object *dt,
843 struct lu_attr *attr,
844 const struct lu_buf *lum_buf,
847 struct lod_object *lo = lod_dt_obj(dt);
848 struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
849 struct lmv_user_md_v1 *lum;
853 lum = lum_buf->lb_buf;
854 LASSERT(lum != NULL);
856 CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
857 le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
858 (int)le32_to_cpu(lum->lum_stripe_offset));
860 if (le32_to_cpu(lum->lum_stripe_count) <= 1)
863 rc = lod_verify_md_striping(lod, lum);
867 /* prepare dir striped objects */
868 rc = lod_prep_md_striped_create(env, dt, attr, lum, th);
870 /* failed to create striping, let's reset
871 * config so that others don't get confused */
872 lod_object_free_striping(env, lo);
880 * LOV xattr is a storage for striping, and LOD owns this xattr.
881 * but LOD allows others to control striping to some extent
883 * - to set new defined striping
884 * - to set new semi-defined striping
885 * - number of stripes is defined
886 * - number of stripes + osts are defined
889 static int lod_declare_xattr_set(const struct lu_env *env,
890 struct dt_object *dt,
891 const struct lu_buf *buf,
892 const char *name, int fl,
895 struct dt_object *next = dt_object_child(dt);
896 struct lu_attr *attr = &lod_env_info(env)->lti_attr;
902 * allow to declare predefined striping on a new (!mode) object
903 * which is supposed to be replay of regular file creation
904 * (when LOV setting is declared)
905 * LU_XATTR_REPLACE is set to indicate a layout swap
907 mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
908 if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
909 !(fl & LU_XATTR_REPLACE)) {
911 * this is a request to manipulate object's striping
913 if (dt_object_exists(dt)) {
914 rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
918 memset(attr, 0, sizeof(*attr));
919 attr->la_valid = LA_TYPE | LA_MODE;
920 attr->la_mode = S_IFREG;
922 rc = lod_declare_striped_object(env, dt, attr, buf, th);
923 } else if (S_ISDIR(mode)) {
924 struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
925 struct lod_object *lo = lod_dt_obj(dt);
928 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
929 struct lmv_user_md_v1 *lum;
931 LASSERT(buf != NULL && buf->lb_buf != NULL);
933 rc = lod_verify_md_striping(d, lum);
938 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
942 /* set xattr to each stripes, if needed */
943 rc = lod_load_striping(env, lo);
947 if (lo->ldo_stripenr == 0)
950 for (i = 0; i < lo->ldo_stripenr; i++) {
951 LASSERT(lo->ldo_stripe[i]);
952 rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf,
958 rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
964 static void lod_lov_stripe_cache_clear(struct lod_object *lo)
966 lo->ldo_striping_cached = 0;
967 lo->ldo_def_striping_set = 0;
968 lod_object_set_pool(lo, NULL);
969 lo->ldo_def_stripe_size = 0;
970 lo->ldo_def_stripenr = 0;
973 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
974 struct dt_object *dt,
975 const struct lu_buf *buf,
976 const char *name, int fl,
978 struct lustre_capa *capa)
980 struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
981 struct dt_object *next = dt_object_child(dt);
982 struct lod_object *l = lod_dt_obj(dt);
983 struct lov_user_md_v1 *lum;
984 struct lov_user_md_v3 *v3 = NULL;
988 /* If it is striped dir, we should clear the stripe cache for
989 * slave stripe as well, but there are no effective way to
990 * notify the LOD on the slave MDT, so we do not cache stripe
991 * information for slave stripe for now. XXX*/
992 lod_lov_stripe_cache_clear(l);
993 LASSERT(buf != NULL && buf->lb_buf != NULL);
996 rc = lod_verify_striping(d, buf, 0);
1000 if (lum->lmm_magic == LOV_USER_MAGIC_V3)
1003 /* if { size, offset, count } = { 0, -1, 0 } and no pool
1004 * (i.e. all default values specified) then delete default
1005 * striping from dir. */
1007 "set default striping: sz %u # %u offset %d %s %s\n",
1008 (unsigned)lum->lmm_stripe_size,
1009 (unsigned)lum->lmm_stripe_count,
1010 (int)lum->lmm_stripe_offset,
1011 v3 ? "from" : "", v3 ? v3->lmm_pool_name : "");
1013 if (LOVEA_DELETE_VALUES((lum->lmm_stripe_size),
1014 (lum->lmm_stripe_count),
1015 (lum->lmm_stripe_offset)) &&
1016 lum->lmm_magic == LOV_USER_MAGIC_V1) {
1017 rc = dt_xattr_del(env, next, name, th, capa);
1021 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1027 static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env,
1028 struct dt_object *dt,
1029 const struct lu_buf *buf,
1030 const char *name, int fl,
1032 struct lustre_capa *capa)
1034 struct dt_object *next = dt_object_child(dt);
1035 struct lod_object *l = lod_dt_obj(dt);
1036 struct lmv_user_md_v1 *lum;
1040 LASSERT(buf != NULL && buf->lb_buf != NULL);
1043 CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n",
1044 le32_to_cpu(lum->lum_stripe_count),
1045 (int)le32_to_cpu(lum->lum_stripe_offset));
1047 if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)),
1048 le32_to_cpu(lum->lum_stripe_offset)) &&
1049 le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) {
1050 rc = dt_xattr_del(env, next, name, th, capa);
1054 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1058 /* Update default stripe cache */
1059 if (l->ldo_dir_stripe == NULL) {
1060 OBD_ALLOC_PTR(l->ldo_dir_stripe);
1061 if (l->ldo_dir_stripe == NULL)
1065 l->ldo_dir_striping_cached = 0;
1066 l->ldo_dir_def_striping_set = 1;
1067 l->ldo_dir_def_stripenr =
1068 le32_to_cpu(lum->lum_stripe_count) - 1;
1074 static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
1075 const struct lu_buf *buf, const char *name,
1076 int fl, struct thandle *th,
1077 struct lustre_capa *capa)
1079 struct lod_object *lo = lod_dt_obj(dt);
1080 struct lu_buf lmv_buf;
1085 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
1088 /* The stripes are supposed to be allocated in declare phase,
1089 * if there are no stripes being allocated, it will skip */
1090 if (lo->ldo_stripenr == 0)
1093 rc = lod_prep_lmv_md(env, dt, &lmv_buf);
1097 for (i = 0; i < lo->ldo_stripenr; i++) {
1098 struct dt_object *dto;
1099 struct lu_attr *attr = &lod_env_info(env)->lti_attr;
1101 dto = lo->ldo_stripe[i];
1102 memset(attr, 0, sizeof(*attr));
1103 attr->la_valid = LA_TYPE | LA_MODE;
1104 attr->la_mode = S_IFDIR;
1105 rc = dt_create(env, dto, attr, NULL, NULL, th);
1109 rc = dt_insert(env, dto,
1110 (const struct dt_rec *)lu_object_fid(&dto->do_lu),
1111 (const struct dt_key *)dot, th, capa, 0);
1115 rc = dt_insert(env, dto,
1116 (struct dt_rec *)lu_object_fid(&dt->do_lu),
1117 (const struct dt_key *)dotdot, th, capa, 0);
1121 if (lo->ldo_striping_cached &&
1122 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1123 lo->ldo_def_stripenr,
1124 lo->ldo_def_stripe_offset)) {
1125 struct lod_thread_info *info;
1126 struct lov_user_md_v3 *v3;
1128 /* sigh, lti_ea_store has been used for lmv_buf,
1129 * so we have to allocate buffer for default
1135 memset(v3, 0, sizeof(*v3));
1136 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1137 v3->lmm_stripe_count =
1138 cpu_to_le32(lo->ldo_def_stripenr);
1139 v3->lmm_stripe_offset =
1140 cpu_to_le32(lo->ldo_def_stripe_offset);
1141 v3->lmm_stripe_size =
1142 cpu_to_le32(lo->ldo_def_stripe_size);
1144 strncpy(v3->lmm_pool_name, lo->ldo_pool,
1147 info = lod_env_info(env);
1148 info->lti_buf.lb_buf = v3;
1149 info->lti_buf.lb_len = sizeof(*v3);
1150 rc = dt_xattr_set(env, dto, &info->lti_buf,
1151 XATTR_NAME_LOV, 0, th, capa);
1157 rc = dt_xattr_set(env, dto, &lmv_buf, XATTR_NAME_LMV, fl, th,
1161 rc = dt_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, fl, th, capa);
1166 static int lod_xattr_set(const struct lu_env *env,
1167 struct dt_object *dt, const struct lu_buf *buf,
1168 const char *name, int fl, struct thandle *th,
1169 struct lustre_capa *capa)
1171 struct lod_object *lo = lod_dt_obj(dt);
1172 struct dt_object *next = dt_object_child(dt);
1178 attr = dt->do_lu.lo_header->loh_attr & S_IFMT;
1179 if (S_ISDIR(attr) && strcmp(name, XATTR_NAME_LOV) == 0) {
1180 rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
1181 } else if (S_ISREG(attr) && !strcmp(name, XATTR_NAME_LOV)) {
1182 /* in case of lov EA swap, just set it
1183 * if not, it is a replay so check striping match what we
1184 * already have during req replay, declare_xattr_set()
1185 * defines striping, then create() does the work
1187 if (fl & LU_XATTR_REPLACE) {
1188 /* free stripes, then update disk */
1189 lod_object_free_striping(env, lod_dt_obj(dt));
1190 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1192 rc = lod_striping_create(env, dt, NULL, NULL, th);
1194 } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1197 rc = lod_xattr_set_default_lmv_on_dir(env, dt, buf, name, fl,
1201 * behave transparantly for all other EAs
1203 rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
1206 if (rc != 0 || !S_ISDIR(attr))
1209 if (lo->ldo_stripenr == 0)
1212 for (i = 0; i < lo->ldo_stripenr; i++) {
1213 LASSERT(lo->ldo_stripe[i]);
1214 rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th,
1223 static int lod_declare_xattr_del(const struct lu_env *env,
1224 struct dt_object *dt, const char *name,
1227 return dt_declare_xattr_del(env, dt_object_child(dt), name, th);
1230 static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
1231 const char *name, struct thandle *th,
1232 struct lustre_capa *capa)
1234 if (!strcmp(name, XATTR_NAME_LOV))
1235 lod_object_free_striping(env, lod_dt_obj(dt));
1236 return dt_xattr_del(env, dt_object_child(dt), name, th, capa);
1239 static int lod_xattr_list(const struct lu_env *env,
1240 struct dt_object *dt, struct lu_buf *buf,
1241 struct lustre_capa *capa)
1243 return dt_xattr_list(env, dt_object_child(dt), buf, capa);
1246 int lod_object_set_pool(struct lod_object *o, char *pool)
1251 len = strlen(o->ldo_pool);
1252 OBD_FREE(o->ldo_pool, len + 1);
1257 OBD_ALLOC(o->ldo_pool, len + 1);
1258 if (o->ldo_pool == NULL)
1260 strcpy(o->ldo_pool, pool);
1265 static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fid)
1267 return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
1271 static int lod_cache_parent_lov_striping(const struct lu_env *env,
1272 struct lod_object *lp)
1274 struct lod_thread_info *info = lod_env_info(env);
1275 struct lov_user_md_v1 *v1 = NULL;
1276 struct lov_user_md_v3 *v3 = NULL;
1280 /* called from MDD without parent being write locked,
1282 dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
1283 rc = lod_get_lov_ea(env, lp);
1287 if (rc < sizeof(struct lov_user_md)) {
1288 /* don't lookup for non-existing or invalid striping */
1289 lp->ldo_def_striping_set = 0;
1290 lp->ldo_striping_cached = 1;
1291 lp->ldo_def_stripe_size = 0;
1292 lp->ldo_def_stripenr = 0;
1293 lp->ldo_def_stripe_offset = (typeof(v1->lmm_stripe_offset))(-1);
1294 GOTO(unlock, rc = 0);
1298 v1 = info->lti_ea_store;
1299 if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1))
1300 lustre_swab_lov_user_md_v1(v1);
1301 else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3))
1302 lustre_swab_lov_user_md_v3(v3);
1304 if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1)
1305 GOTO(unlock, rc = 0);
1307 if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
1308 GOTO(unlock, rc = 0);
1310 lp->ldo_def_stripenr = v1->lmm_stripe_count;
1311 lp->ldo_def_stripe_size = v1->lmm_stripe_size;
1312 lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
1313 lp->ldo_striping_cached = 1;
1314 lp->ldo_def_striping_set = 1;
1315 if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
1316 /* XXX: sanity check here */
1317 v3 = (struct lov_user_md_v3 *) v1;
1318 if (v3->lmm_pool_name[0])
1319 lod_object_set_pool(lp, v3->lmm_pool_name);
1323 dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
1328 static int lod_cache_parent_lmv_striping(const struct lu_env *env,
1329 struct lod_object *lp)
1331 struct lod_thread_info *info = lod_env_info(env);
1332 struct lmv_user_md_v1 *v1 = NULL;
1336 /* called from MDD without parent being write locked,
1338 dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
1339 rc = lod_get_default_lmv_ea(env, lp);
1343 if (rc < sizeof(struct lmv_user_md)) {
1344 /* don't lookup for non-existing or invalid striping */
1345 lp->ldo_dir_def_striping_set = 0;
1346 lp->ldo_dir_striping_cached = 1;
1347 lp->ldo_dir_def_stripenr = 0;
1348 lp->ldo_dir_def_stripe_offset =
1349 (typeof(v1->lum_stripe_offset))(-1);
1350 lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
1351 GOTO(unlock, rc = 0);
1355 v1 = info->lti_ea_store;
1357 lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count) - 1;
1358 lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
1359 lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
1360 lp->ldo_dir_def_striping_set = 1;
1361 lp->ldo_dir_striping_cached = 1;
1365 dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
1369 static int lod_cache_parent_striping(const struct lu_env *env,
1370 struct lod_object *lp,
1376 rc = lod_load_striping(env, lp);
1380 if (!lp->ldo_striping_cached) {
1381 /* we haven't tried to get default striping for
1382 * the directory yet, let's cache it in the object */
1383 rc = lod_cache_parent_lov_striping(env, lp);
1388 if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
1389 rc = lod_cache_parent_lmv_striping(env, lp);
1395 * used to transfer default striping data to the object being created
1397 static void lod_ah_init(const struct lu_env *env,
1398 struct dt_allocation_hint *ah,
1399 struct dt_object *parent,
1400 struct dt_object *child,
1403 struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev);
1404 struct dt_object *nextp = NULL;
1405 struct dt_object *nextc;
1406 struct lod_object *lp = NULL;
1407 struct lod_object *lc;
1408 struct lov_desc *desc;
1413 if (likely(parent)) {
1414 nextp = dt_object_child(parent);
1415 lp = lod_dt_obj(parent);
1418 nextc = dt_object_child(child);
1419 lc = lod_dt_obj(child);
1421 LASSERT(lc->ldo_stripenr == 0);
1422 LASSERT(lc->ldo_stripe == NULL);
1425 * local object may want some hints
1426 * in case of late striping creation, ->ah_init()
1427 * can be called with local object existing
1429 if (!dt_object_exists(nextc) || dt_object_remote(nextc))
1430 nextc->do_ops->do_ah_init(env, ah, dt_object_remote(nextp) ?
1431 NULL : nextp, nextc, child_mode);
1433 if (S_ISDIR(child_mode)) {
1436 if (lc->ldo_dir_stripe == NULL) {
1437 OBD_ALLOC_PTR(lc->ldo_dir_stripe);
1438 if (lc->ldo_dir_stripe == NULL)
1442 if (lp->ldo_dir_stripe == NULL) {
1443 OBD_ALLOC_PTR(lp->ldo_dir_stripe);
1444 if (lp->ldo_dir_stripe == NULL)
1448 rc = lod_cache_parent_striping(env, lp, child_mode);
1452 /* transfer defaults to new directory */
1453 if (lp->ldo_striping_cached) {
1455 lod_object_set_pool(lc, lp->ldo_pool);
1456 lc->ldo_def_stripenr = lp->ldo_def_stripenr;
1457 lc->ldo_def_stripe_size = lp->ldo_def_stripe_size;
1458 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
1459 lc->ldo_striping_cached = 1;
1460 lc->ldo_def_striping_set = 1;
1461 CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
1462 (int)lc->ldo_def_stripe_size,
1463 (int)lc->ldo_def_stripe_offset,
1464 (int)lc->ldo_def_stripenr);
1467 /* transfer dir defaults to new directory */
1468 if (lp->ldo_dir_striping_cached) {
1469 lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
1470 lc->ldo_dir_def_stripe_offset =
1471 lp->ldo_dir_def_stripe_offset;
1472 lc->ldo_dir_def_hash_type =
1473 lp->ldo_dir_def_hash_type;
1474 lc->ldo_dir_striping_cached = 1;
1475 lc->ldo_dir_def_striping_set = 1;
1476 CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
1477 (int)lc->ldo_dir_def_stripenr,
1478 (int)lc->ldo_dir_def_stripe_offset,
1479 lc->ldo_dir_def_hash_type);
1482 /* If the directory is specified with certain stripes */
1483 if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
1484 const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
1487 rc = lod_verify_md_striping(d, lum1);
1489 le32_to_cpu(lum1->lum_stripe_count) > 1) {
1490 /* Directory will be striped only if
1491 * stripe_count > 1 */
1493 le32_to_cpu(lum1->lum_stripe_count) - 1;
1494 lc->ldo_dir_stripe_offset =
1495 le32_to_cpu(lum1->lum_stripe_offset);
1496 lc->ldo_dir_hash_type =
1497 le32_to_cpu(lum1->lum_hash_type);
1498 CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
1500 (int)lc->ldo_dir_stripe_offset);
1502 } else if (lp->ldo_dir_def_striping_set) {
1503 /* If there are default dir stripe from parent */
1504 lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
1505 lc->ldo_dir_stripe_offset =
1506 lp->ldo_dir_def_stripe_offset;
1507 lc->ldo_dir_hash_type =
1508 lp->ldo_dir_def_hash_type;
1509 CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
1511 (int)lc->ldo_dir_stripe_offset);
1513 /* set default stripe for this directory */
1514 lc->ldo_stripenr = 0;
1515 lc->ldo_dir_stripe_offset = -1;
1518 CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
1519 lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
1525 * if object is going to be striped over OSTs, transfer default
1526 * striping information to the child, so that we can use it
1527 * during declaration and creation
1529 if (!lod_object_will_be_striped(S_ISREG(child_mode),
1530 lu_object_fid(&child->do_lu)))
1533 * try from the parent
1535 if (likely(parent)) {
1536 lod_cache_parent_striping(env, lp, child_mode);
1538 lc->ldo_def_stripe_offset = (__u16) -1;
1540 if (lp->ldo_def_striping_set) {
1542 lod_object_set_pool(lc, lp->ldo_pool);
1543 lc->ldo_stripenr = lp->ldo_def_stripenr;
1544 lc->ldo_stripe_size = lp->ldo_def_stripe_size;
1545 lc->ldo_def_stripe_offset = lp->ldo_def_stripe_offset;
1546 CDEBUG(D_OTHER, "striping from parent: #%d, sz %d %s\n",
1547 lc->ldo_stripenr, lc->ldo_stripe_size,
1548 lp->ldo_pool ? lp->ldo_pool : "");
1553 * if the parent doesn't provide with specific pattern, grab fs-wide one
1555 desc = &d->lod_desc;
1556 if (lc->ldo_stripenr == 0)
1557 lc->ldo_stripenr = desc->ld_default_stripe_count;
1558 if (lc->ldo_stripe_size == 0)
1559 lc->ldo_stripe_size = desc->ld_default_stripe_size;
1560 CDEBUG(D_OTHER, "final striping: # %d stripes, sz %d from %s\n",
1561 lc->ldo_stripenr, lc->ldo_stripe_size,
1562 lc->ldo_pool ? lc->ldo_pool : "");
1565 /* we do not cache stripe information for slave stripe, see
1566 * lod_xattr_set_lov_on_dir */
1567 if (lp != NULL && lp->ldo_dir_slave_stripe)
1568 lod_lov_stripe_cache_clear(lp);
1573 #define ll_do_div64(aaa,bbb) do_div((aaa), (bbb))
1575 * this function handles a special case when truncate was done
1576 * on a stripeless object and now striping is being created
1577 * we can't lose that size, so we have to propagate it to newly
1580 static int lod_declare_init_size(const struct lu_env *env,
1581 struct dt_object *dt, struct thandle *th)
1583 struct dt_object *next = dt_object_child(dt);
1584 struct lod_object *lo = lod_dt_obj(dt);
1585 struct lu_attr *attr = &lod_env_info(env)->lti_attr;
1586 uint64_t size, offs;
1590 /* XXX: we support the simplest (RAID0) striping so far */
1591 LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
1592 LASSERT(lo->ldo_stripe_size > 0);
1594 rc = dt_attr_get(env, next, attr, BYPASS_CAPA);
1595 LASSERT(attr->la_valid & LA_SIZE);
1599 size = attr->la_size;
1603 /* ll_do_div64(a, b) returns a % b, and a = a / b */
1604 ll_do_div64(size, (__u64) lo->ldo_stripe_size);
1605 stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr);
1607 size = size * lo->ldo_stripe_size;
1608 offs = attr->la_size;
1609 size += ll_do_div64(offs, lo->ldo_stripe_size);
1611 attr->la_valid = LA_SIZE;
1612 attr->la_size = size;
1614 rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th);
1620 * Create declaration of striped object
1622 int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
1623 struct lu_attr *attr,
1624 const struct lu_buf *lovea, struct thandle *th)
1626 struct lod_thread_info *info = lod_env_info(env);
1627 struct dt_object *next = dt_object_child(dt);
1628 struct lod_object *lo = lod_dt_obj(dt);
1632 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) {
1633 /* failed to create striping, let's reset
1634 * config so that others don't get confused */
1635 lod_object_free_striping(env, lo);
1636 GOTO(out, rc = -ENOMEM);
1639 /* choose OST and generate appropriate objects */
1640 rc = lod_qos_prep_create(env, lo, attr, lovea, th);
1642 /* failed to create striping, let's reset
1643 * config so that others don't get confused */
1644 lod_object_free_striping(env, lo);
1649 * declare storage for striping data
1651 info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
1652 lo->ldo_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1);
1653 rc = dt_declare_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV,
1659 * if striping is created with local object's size > 0,
1660 * we have to propagate this size to specific object
1661 * the case is possible only when local object was created previously
1663 if (dt_object_exists(next))
1664 rc = lod_declare_init_size(env, dt, th);
1670 int lod_dir_striping_create_internal(const struct lu_env *env,
1671 struct dt_object *dt,
1672 struct lu_attr *attr,
1673 const struct dt_object_format *dof,
1677 struct lod_thread_info *info = lod_env_info(env);
1678 struct dt_object *next = dt_object_child(dt);
1679 struct lod_object *lo = lod_dt_obj(dt);
1683 if (lo->ldo_dir_def_striping_set &&
1684 !LMVEA_DELETE_VALUES(lo->ldo_stripenr,
1685 lo->ldo_dir_stripe_offset)) {
1686 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
1687 int stripe_count = lo->ldo_stripenr + 1;
1689 if (info->lti_ea_store_size < sizeof(*v1)) {
1690 rc = lod_ea_store_resize(info, sizeof(*v1));
1693 v1 = info->lti_ea_store;
1696 memset(v1, 0, sizeof(*v1));
1697 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
1698 v1->lum_stripe_count = cpu_to_le32(stripe_count);
1699 v1->lum_stripe_offset =
1700 cpu_to_le32(lo->ldo_dir_stripe_offset);
1702 info->lti_buf.lb_buf = v1;
1703 info->lti_buf.lb_len = sizeof(*v1);
1706 rc = lod_declare_xattr_set_lmv(env, dt, attr,
1707 &info->lti_buf, th);
1709 rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
1710 XATTR_NAME_LMV, 0, th,
1716 /* Transfer default LMV striping from the parent */
1717 if (lo->ldo_dir_striping_cached &&
1718 !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
1719 lo->ldo_dir_def_stripe_offset)) {
1720 struct lmv_user_md_v1 *v1 = info->lti_ea_store;
1721 int def_stripe_count = lo->ldo_dir_def_stripenr + 1;
1723 if (info->lti_ea_store_size < sizeof(*v1)) {
1724 rc = lod_ea_store_resize(info, sizeof(*v1));
1727 v1 = info->lti_ea_store;
1730 memset(v1, 0, sizeof(*v1));
1731 v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
1732 v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
1733 v1->lum_stripe_offset =
1734 cpu_to_le32(lo->ldo_dir_def_stripe_offset);
1736 cpu_to_le32(lo->ldo_dir_def_hash_type);
1738 info->lti_buf.lb_buf = v1;
1739 info->lti_buf.lb_len = sizeof(*v1);
1741 rc = dt_declare_xattr_set(env, next, &info->lti_buf,
1742 XATTR_NAME_DEFAULT_LMV, 0,
1745 rc = dt_xattr_set(env, next, &info->lti_buf,
1746 XATTR_NAME_DEFAULT_LMV, 0, th,
1752 /* Transfer default LOV striping from the parent */
1753 if (lo->ldo_striping_cached &&
1754 !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
1755 lo->ldo_def_stripenr,
1756 lo->ldo_def_stripe_offset)) {
1757 struct lov_user_md_v3 *v3 = info->lti_ea_store;
1759 if (info->lti_ea_store_size < sizeof(*v3)) {
1760 rc = lod_ea_store_resize(info, sizeof(*v3));
1763 v3 = info->lti_ea_store;
1766 memset(v3, 0, sizeof(*v3));
1767 v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
1768 v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
1769 v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
1770 v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
1772 strncpy(v3->lmm_pool_name, lo->ldo_pool,
1775 info->lti_buf.lb_buf = v3;
1776 info->lti_buf.lb_len = sizeof(*v3);
1779 rc = dt_declare_xattr_set(env, next, &info->lti_buf,
1780 XATTR_NAME_LOV, 0, th);
1782 rc = dt_xattr_set(env, next, &info->lti_buf,
1783 XATTR_NAME_LOV, 0, th,
1792 static int lod_declare_dir_striping_create(const struct lu_env *env,
1793 struct dt_object *dt,
1794 struct lu_attr *attr,
1795 struct dt_object_format *dof,
1798 return lod_dir_striping_create_internal(env, dt, attr, dof, th, true);
1801 static int lod_dir_striping_create(const struct lu_env *env,
1802 struct dt_object *dt,
1803 struct lu_attr *attr,
1804 struct dt_object_format *dof,
1807 return lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
1810 static int lod_declare_object_create(const struct lu_env *env,
1811 struct dt_object *dt,
1812 struct lu_attr *attr,
1813 struct dt_allocation_hint *hint,
1814 struct dt_object_format *dof,
1817 struct dt_object *next = dt_object_child(dt);
1818 struct lod_object *lo = lod_dt_obj(dt);
1827 * first of all, we declare creation of local object
1829 rc = dt_declare_create(env, next, attr, hint, dof, th);
1833 if (dof->dof_type == DFT_SYM)
1834 dt->do_body_ops = &lod_body_lnk_ops;
1837 * it's lod_ah_init() who has decided the object will striped
1839 if (dof->dof_type == DFT_REGULAR) {
1840 /* callers don't want stripes */
1841 /* XXX: all tricky interactions with ->ah_make_hint() decided
1842 * to use striping, then ->declare_create() behaving differently
1843 * should be cleaned */
1844 if (dof->u.dof_reg.striped == 0)
1845 lo->ldo_stripenr = 0;
1846 if (lo->ldo_stripenr > 0)
1847 rc = lod_declare_striped_object(env, dt, attr,
1849 } else if (dof->dof_type == DFT_DIR) {
1850 rc = lod_declare_dir_striping_create(env, dt, attr, dof, th);
1856 int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
1857 struct lu_attr *attr, struct dt_object_format *dof,
1860 struct lod_object *lo = lod_dt_obj(dt);
1864 LASSERT(lo->ldo_striping_cached == 0);
1866 /* create all underlying objects */
1867 for (i = 0; i < lo->ldo_stripenr; i++) {
1868 LASSERT(lo->ldo_stripe[i]);
1869 rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th);
1875 rc = lod_generate_and_set_lovea(env, lo, th);
1880 static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
1881 struct lu_attr *attr,
1882 struct dt_allocation_hint *hint,
1883 struct dt_object_format *dof, struct thandle *th)
1885 struct dt_object *next = dt_object_child(dt);
1886 struct lod_object *lo = lod_dt_obj(dt);
1890 /* create local object */
1891 rc = dt_create(env, next, attr, hint, dof, th);
1894 if (S_ISDIR(dt->do_lu.lo_header->loh_attr))
1895 rc = lod_dir_striping_create(env, dt, attr, dof, th);
1896 else if (lo->ldo_stripe && dof->u.dof_reg.striped != 0)
1897 rc = lod_striping_create(env, dt, attr, dof, th);
1903 static int lod_declare_object_destroy(const struct lu_env *env,
1904 struct dt_object *dt,
1907 struct dt_object *next = dt_object_child(dt);
1908 struct lod_object *lo = lod_dt_obj(dt);
1913 * we declare destroy for the local object
1915 rc = dt_declare_destroy(env, next, th);
1920 * load striping information, notice we don't do this when object
1921 * is being initialized as we don't need this information till
1922 * few specific cases like destroy, chown
1924 rc = lod_load_striping(env, lo);
1928 /* declare destroy for all underlying objects */
1929 for (i = 0; i < lo->ldo_stripenr; i++) {
1930 LASSERT(lo->ldo_stripe[i]);
1931 rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
1940 static int lod_object_destroy(const struct lu_env *env,
1941 struct dt_object *dt, struct thandle *th)
1943 struct dt_object *next = dt_object_child(dt);
1944 struct lod_object *lo = lod_dt_obj(dt);
1948 /* destroy local object */
1949 rc = dt_destroy(env, next, th);
1953 /* destroy all underlying objects */
1954 for (i = 0; i < lo->ldo_stripenr; i++) {
1955 LASSERT(lo->ldo_stripe[i]);
1956 /* for striped directory, next == ldo_stripe[0] */
1957 if (next != lo->ldo_stripe[i]) {
1958 rc = dt_destroy(env, lo->ldo_stripe[i], th);
1967 static int lod_index_try(const struct lu_env *env, struct dt_object *dt,
1968 const struct dt_index_features *feat)
1970 struct dt_object *next = dt_object_child(dt);
1974 LASSERT(next->do_ops);
1975 LASSERT(next->do_ops->do_index_try);
1977 rc = next->do_ops->do_index_try(env, next, feat);
1978 if (next->do_index_ops && dt->do_index_ops == NULL)
1979 dt->do_index_ops = &lod_index_ops;
1984 static int lod_declare_ref_add(const struct lu_env *env,
1985 struct dt_object *dt, struct thandle *th)
1987 return dt_declare_ref_add(env, dt_object_child(dt), th);
1990 static int lod_ref_add(const struct lu_env *env,
1991 struct dt_object *dt, struct thandle *th)
1993 return dt_ref_add(env, dt_object_child(dt), th);
1996 static int lod_declare_ref_del(const struct lu_env *env,
1997 struct dt_object *dt, struct thandle *th)
1999 return dt_declare_ref_del(env, dt_object_child(dt), th);
2002 static int lod_ref_del(const struct lu_env *env,
2003 struct dt_object *dt, struct thandle *th)
2005 return dt_ref_del(env, dt_object_child(dt), th);
2008 static struct obd_capa *lod_capa_get(const struct lu_env *env,
2009 struct dt_object *dt,
2010 struct lustre_capa *old, __u64 opc)
2012 return dt_capa_get(env, dt_object_child(dt), old, opc);
2015 static int lod_object_sync(const struct lu_env *env, struct dt_object *dt)
2017 return dt_object_sync(env, dt_object_child(dt));
2020 struct lod_slave_locks {
2022 struct lustre_handle lsl_handle[0];
2025 static int lod_object_unlock_internal(const struct lu_env *env,
2026 struct dt_object *dt,
2027 struct ldlm_enqueue_info *einfo,
2028 ldlm_policy_data_t *policy)
2030 struct lod_object *lo = lod_dt_obj(dt);
2031 struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
2036 if (slave_locks == NULL)
2039 for (i = 0; i < slave_locks->lsl_lock_count; i++) {
2040 if (lustre_handle_is_used(&slave_locks->lsl_handle[i])) {
2043 einfo->ei_cbdata = &slave_locks->lsl_handle[i];
2044 rc1 = dt_object_unlock(env, lo->ldo_stripe[i], einfo,
2047 rc = rc == 0 ? rc1 : rc;
2054 static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
2055 struct ldlm_enqueue_info *einfo,
2056 union ldlm_policy_data *policy)
2058 struct lod_object *lo = lod_dt_obj(dt);
2059 struct lod_slave_locks *slave_locks = einfo->ei_cbdata;
2060 int slave_locks_size;
2064 if (slave_locks == NULL)
2067 rc = lod_load_striping(env, lo);
2071 /* Note: for remote lock for single stripe dir, MDT will cancel
2072 * the lock by lockh directly */
2073 if (lo->ldo_stripenr == 0 && dt_object_remote(dt_object_child(dt)))
2076 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2079 /* Only cancel slave lock for striped dir */
2080 rc = lod_object_unlock_internal(env, dt, einfo, policy);
2082 slave_locks_size = sizeof(*slave_locks) + slave_locks->lsl_lock_count *
2083 sizeof(slave_locks->lsl_handle[0]);
2084 OBD_FREE(slave_locks, slave_locks_size);
2085 einfo->ei_cbdata = NULL;
2090 static int lod_object_lock(const struct lu_env *env,
2091 struct dt_object *dt,
2092 struct lustre_handle *lh,
2093 struct ldlm_enqueue_info *einfo,
2094 union ldlm_policy_data *policy)
2096 struct lod_object *lo = lod_dt_obj(dt);
2099 int slave_locks_size;
2100 struct lod_slave_locks *slave_locks = NULL;
2103 /* remote object lock */
2104 if (!einfo->ei_enq_slave) {
2105 LASSERT(dt_object_remote(dt));
2106 return dt_object_lock(env, dt_object_child(dt), lh, einfo,
2110 if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
2113 rc = lod_load_striping(env, lo);
2118 if (lo->ldo_stripenr == 0)
2121 slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr *
2122 sizeof(slave_locks->lsl_handle[0]);
2123 /* Freed in lod_object_unlock */
2124 OBD_ALLOC(slave_locks, slave_locks_size);
2125 if (slave_locks == NULL)
2127 slave_locks->lsl_lock_count = lo->ldo_stripenr;
2129 /* striped directory lock */
2130 for (i = 0; i < lo->ldo_stripenr; i++) {
2131 struct lustre_handle lockh;
2133 LASSERT(lo->ldo_stripe[i]);
2134 rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
2139 slave_locks->lsl_handle[i] = lockh;
2142 einfo->ei_cbdata = slave_locks;
2145 if (rc != 0 && slave_locks != NULL) {
2146 einfo->ei_cbdata = slave_locks;
2147 lod_object_unlock_internal(env, dt, einfo, policy);
2148 OBD_FREE(slave_locks, slave_locks_size);
2149 einfo->ei_cbdata = NULL;
2155 struct dt_object_operations lod_obj_ops = {
2156 .do_read_lock = lod_object_read_lock,
2157 .do_write_lock = lod_object_write_lock,
2158 .do_read_unlock = lod_object_read_unlock,
2159 .do_write_unlock = lod_object_write_unlock,
2160 .do_write_locked = lod_object_write_locked,
2161 .do_attr_get = lod_attr_get,
2162 .do_declare_attr_set = lod_declare_attr_set,
2163 .do_attr_set = lod_attr_set,
2164 .do_xattr_get = lod_xattr_get,
2165 .do_declare_xattr_set = lod_declare_xattr_set,
2166 .do_xattr_set = lod_xattr_set,
2167 .do_declare_xattr_del = lod_declare_xattr_del,
2168 .do_xattr_del = lod_xattr_del,
2169 .do_xattr_list = lod_xattr_list,
2170 .do_ah_init = lod_ah_init,
2171 .do_declare_create = lod_declare_object_create,
2172 .do_create = lod_object_create,
2173 .do_declare_destroy = lod_declare_object_destroy,
2174 .do_destroy = lod_object_destroy,
2175 .do_index_try = lod_index_try,
2176 .do_declare_ref_add = lod_declare_ref_add,
2177 .do_ref_add = lod_ref_add,
2178 .do_declare_ref_del = lod_declare_ref_del,
2179 .do_ref_del = lod_ref_del,
2180 .do_capa_get = lod_capa_get,
2181 .do_object_sync = lod_object_sync,
2182 .do_object_lock = lod_object_lock,
2183 .do_object_unlock = lod_object_unlock,
2186 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
2187 struct lu_buf *buf, loff_t *pos,
2188 struct lustre_capa *capa)
2190 struct dt_object *next = dt_object_child(dt);
2191 return next->do_body_ops->dbo_read(env, next, buf, pos, capa);
2194 static ssize_t lod_declare_write(const struct lu_env *env,
2195 struct dt_object *dt,
2196 const loff_t size, loff_t pos,
2199 return dt_declare_record_write(env, dt_object_child(dt),
2203 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
2204 const struct lu_buf *buf, loff_t *pos,
2205 struct thandle *th, struct lustre_capa *capa, int iq)
2207 struct dt_object *next = dt_object_child(dt);
2209 return next->do_body_ops->dbo_write(env, next, buf, pos, th, capa, iq);
2212 static const struct dt_body_operations lod_body_lnk_ops = {
2213 .dbo_read = lod_read,
2214 .dbo_declare_write = lod_declare_write,
2215 .dbo_write = lod_write
2218 static int lod_object_init(const struct lu_env *env, struct lu_object *lo,
2219 const struct lu_object_conf *conf)
2221 struct lod_device *lod = lu2lod_dev(lo->lo_dev);
2222 struct lu_device *cdev = NULL;
2223 struct lu_object *cobj;
2224 struct lod_tgt_descs *ltd = NULL;
2225 struct lod_tgt_desc *tgt;
2227 int type = LU_SEQ_RANGE_ANY;
2231 rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type);
2235 if (type == LU_SEQ_RANGE_MDT &&
2236 idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) {
2237 cdev = &lod->lod_child->dd_lu_dev;
2238 } else if (type == LU_SEQ_RANGE_MDT) {
2239 ltd = &lod->lod_mdt_descs;
2241 } else if (type == LU_SEQ_RANGE_OST) {
2242 ltd = &lod->lod_ost_descs;
2249 if (ltd->ltd_tgts_size > idx &&
2250 cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) {
2251 tgt = LTD_TGT(ltd, idx);
2253 LASSERT(tgt != NULL);
2254 LASSERT(tgt->ltd_tgt != NULL);
2256 cdev = &(tgt->ltd_tgt->dd_lu_dev);
2258 lod_putref(lod, ltd);
2261 if (unlikely(cdev == NULL))
2264 cobj = cdev->ld_ops->ldo_object_alloc(env, lo->lo_header, cdev);
2265 if (unlikely(cobj == NULL))
2268 lu_object_add(lo, cobj);
2273 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
2277 if (lo->ldo_dir_stripe != NULL) {
2278 OBD_FREE_PTR(lo->ldo_dir_stripe);
2279 lo->ldo_dir_stripe = NULL;
2282 if (lo->ldo_stripe) {
2283 LASSERT(lo->ldo_stripes_allocated > 0);
2285 for (i = 0; i < lo->ldo_stripenr; i++) {
2286 if (lo->ldo_stripe[i])
2287 lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
2290 i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
2291 OBD_FREE(lo->ldo_stripe, i);
2292 lo->ldo_stripe = NULL;
2293 lo->ldo_stripes_allocated = 0;
2295 lo->ldo_stripenr = 0;
2296 lo->ldo_pattern = 0;
2300 * ->start is called once all slices are initialized, including header's
2301 * cache for mode (object type). using the type we can initialize ops
2303 static int lod_object_start(const struct lu_env *env, struct lu_object *o)
2305 if (S_ISLNK(o->lo_header->loh_attr & S_IFMT))
2306 lu2lod_obj(o)->ldo_obj.do_body_ops = &lod_body_lnk_ops;
2310 static void lod_object_free(const struct lu_env *env, struct lu_object *o)
2312 struct lod_object *mo = lu2lod_obj(o);
2315 * release all underlying object pinned
2318 lod_object_free_striping(env, mo);
2320 lod_object_set_pool(mo, NULL);
2323 OBD_SLAB_FREE_PTR(mo, lod_object_kmem);
2326 static void lod_object_release(const struct lu_env *env, struct lu_object *o)
2328 /* XXX: shouldn't we release everything here in case if object
2329 * creation failed before? */
2332 static int lod_object_print(const struct lu_env *env, void *cookie,
2333 lu_printer_t p, const struct lu_object *l)
2335 struct lod_object *o = lu2lod_obj((struct lu_object *) l);
2337 return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o);
2340 struct lu_object_operations lod_lu_obj_ops = {
2341 .loo_object_init = lod_object_init,
2342 .loo_object_start = lod_object_start,
2343 .loo_object_free = lod_object_free,
2344 .loo_object_release = lod_object_release,
2345 .loo_object_print = lod_object_print,