X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_dir.c;h=15be75a1d10c24b5d751c7c7ee253a39d67619b0;hb=5bcec28c0b07bb154a5a2ae491a119c2fd7791da;hp=390ce748c8e07947463a461fe99d8bf07bf79e57;hpb=1ee003be6e768a47beac6787b34c962607bf56cd;p=fs%2Flustre-release.git diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index 390ce74..15be75a 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -26,8 +26,10 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, 2012, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -78,11 +80,14 @@ static struct lu_name lname_dotdot = { static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask); +static int mdd_declare_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + struct thandle *handle); static int mdd_links_add(const struct lu_env *env, struct mdd_object *mdd_obj, const struct lu_fid *pfid, const struct lu_name *lname, - struct thandle *handle); + struct thandle *handle, int first); static int mdd_links_rename(const struct lu_env *env, struct mdd_object *mdd_obj, const struct lu_fid *oldpfid, @@ -250,8 +255,8 @@ static int mdd_dir_is_empty(const struct lu_env *env, RETURN(-ENOTDIR); iops = &obj->do_index_ops->dio_it; - it = iops->init(env, obj, BYPASS_CAPA); - if (it != NULL) { + it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA); + if (!IS_ERR(it)) { result = iops->get(env, it, (const void *)""); if (result > 0) { int i; @@ -270,7 +275,7 @@ static int mdd_dir_is_empty(const struct lu_env *env, iops->put(env, it); iops->fini(env, it); } else - result = -ENOMEM; + result = PTR_ERR(it); RETURN(result); } @@ -488,43 +493,6 @@ int mdd_link_sanity_check(const struct lu_env *env, RETURN(rc); } -/** - * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and - * assign i_nlink to 1 which means the i_nlink for subdir count is incredible - * (maybe too large to be represented). It is a trick to break through the - * "i_nlink" limitation for subdir count. - */ -void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj, - struct thandle *handle) -{ - struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; - struct mdd_device *m = mdd_obj2mdd_dev(obj); - - if (!mdd_is_mnlink(obj)) { - if (S_ISDIR(mdd_object_type(obj))) { - if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA)) - return; - - if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) { - obj->mod_flags |= MNLINK_OBJ; - tmp_la->la_nlink = 1; - tmp_la->la_valid = LA_NLINK; - mdd_attr_set_internal(env, obj, tmp_la, handle, - 0); - return; - } - } - mdo_ref_add(env, obj, handle); - } -} - -void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj, - struct thandle *handle, int is_dot) -{ - if (!mdd_is_mnlink(obj) || is_dot) - mdo_ref_del(env, obj, handle); -} - static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj, const char *name, struct thandle *handle, struct lustre_capa *capa) @@ -578,7 +546,7 @@ static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj, rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa); if (rc == 0 && is_dir) { mdd_write_lock(env, pobj, MOR_TGT_PARENT); - __mdd_ref_add(env, pobj, handle); + mdo_ref_add(env, pobj, handle); mdd_write_unlock(env, pobj); } RETURN(rc); @@ -594,18 +562,86 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj, rc = __mdd_index_delete_only(env, pobj, name, handle, capa); if (rc == 0 && is_dir) { - int is_dot = 0; - - if (name != NULL && name[0] == '.' && name[1] == 0) - is_dot = 1; mdd_write_lock(env, pobj, MOR_TGT_PARENT); - __mdd_ref_del(env, pobj, handle, is_dot); + mdo_ref_del(env, pobj, handle); mdd_write_unlock(env, pobj); } RETURN(rc); } +int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd, + int reclen, struct thandle *handle) +{ + int rc; + + /* XXX: this is a temporary solution to declare llog changes + * will be fixed in 2.3 with new llog implementation */ + + LASSERT(mdd->mdd_capa); + + /* XXX: Since we use the 'mdd_capa' as fake llog object here, we + * have to set the parameter 'size' as INT_MAX or 0 to inform + * OSD that this record write is for a llog write or catalog + * header update, and osd declare function will reserve less + * credits for optimization purpose. + * + * Reserve 6 blocks for a llog write, since the llog file is + * usually small, reserve 2 blocks for catalog header update, + * because we know for sure that catalog header is already + * allocated. + * + * This hack should be removed in 2.3. + */ + + /* record itself */ + rc = dt_declare_record_write(env, mdd->mdd_capa, + DECLARE_LLOG_WRITE, 0, handle); + if (rc) + return rc; + + /* header will be updated as well */ + rc = dt_declare_record_write(env, mdd->mdd_capa, + DECLARE_LLOG_WRITE, 0, handle); + if (rc) + return rc; + + /* also we should be able to create new plain log */ + rc = dt_declare_create(env, mdd->mdd_capa, NULL, NULL, NULL, handle); + if (rc) + return rc; + + /* new record referencing new plain llog */ + rc = dt_declare_record_write(env, mdd->mdd_capa, + DECLARE_LLOG_WRITE, 0, handle); + if (rc) + return rc; + + /* catalog's header will be updated as well */ + rc = dt_declare_record_write(env, mdd->mdd_capa, + DECLARE_LLOG_REWRITE, 0, handle); + + return rc; +} + +int mdd_declare_changelog_store(const struct lu_env *env, + struct mdd_device *mdd, + const struct lu_name *fname, + struct thandle *handle) +{ + int reclen; + + /* Not recording */ + if (!(mdd->mdd_cl.mc_flags & CLM_ON)) + return 0; + + /* we'll be writing payload + llog header */ + reclen = sizeof(struct llog_changelog_rec); + if (fname) + reclen += llog_data_len(fname->ln_namelen); + + return mdd_declare_llog_record(env, mdd, reclen, handle); +} /** Store a namespace change changelog record * If this fails, we must fail the whole transaction; we don't @@ -619,6 +655,7 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj, static int mdd_changelog_ns_store(const struct lu_env *env, struct mdd_device *mdd, enum changelog_rec_type type, + int flags, struct mdd_object *target, struct mdd_object *parent, const struct lu_fid *tf, @@ -633,8 +670,11 @@ static int mdd_changelog_ns_store(const struct lu_env *env, int rc; ENTRY; + /* Not recording */ if (!(mdd->mdd_cl.mc_flags & CLM_ON)) RETURN(0); + if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0) + RETURN(0); LASSERT(parent != NULL); LASSERT(tname != NULL); @@ -647,7 +687,7 @@ static int mdd_changelog_ns_store(const struct lu_env *env, RETURN(-ENOMEM); rec = (struct llog_changelog_rec *)buf->lb_buf; - rec->cr.cr_flags = CLF_VERSION; + rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags); rec->cr.cr_type = (__u32)type; tfid = tf ? tf : mdo2fid(target); rec->cr.cr_tfid = *tfid; @@ -667,6 +707,40 @@ static int mdd_changelog_ns_store(const struct lu_env *env, return 0; } +static int mdd_declare_link(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *p, + struct mdd_object *c, + const struct lu_name *name, + struct thandle *handle) +{ + int rc; + + rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,handle); + if (rc) + return rc; + + rc = mdo_declare_ref_add(env, c, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, p, NULL, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, c, NULL, handle); + if (rc) + return rc; + + rc = mdd_declare_links_add(env, c, handle); + if (rc) + return rc; + + rc = mdd_declare_changelog_store(env, mdd, name, handle); + + return rc; +} + static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, struct md_object *src_obj, const struct lu_name *lname, struct md_attr *ma) @@ -680,6 +754,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, struct thandle *handle; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qids[MAXQUOTAS] = { 0, 0 }; int quota_opc = 0, rec_pending[MAXQUOTAS] = { 0, 0 }; @@ -698,18 +773,25 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, quota_opc = FSFILT_OP_LINK; mdd_quota_wrapper(la_tmp, qids); /* get block quota for parent */ - lquota_chkquota(mds_quota_interface_ref, obd, + lquota_chkquota(mds_quota_interface_ref, obd, exp, qids, rec_pending, 1, NULL, LQUOTA_FLAGS_BLK, data, 1); } } #endif - mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); + dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -725,7 +807,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, if (rc) GOTO(out_unlock, rc); - __mdd_ref_add(env, mdd_sobj, handle); + mdo_ref_add(env, mdd_sobj, handle); LASSERT(ma->ma_attr.la_valid & LA_CTIME); la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime; @@ -737,8 +819,10 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, la->la_valid = LA_CTIME; rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0); - if (rc == 0) - mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj), lname, handle); + if (rc == 0) { + mdd_links_add(env, mdd_sobj, + mdo2fid(mdd_tobj), lname, handle, 0); + } EXIT; out_unlock: @@ -746,8 +830,9 @@ out_unlock: mdd_pdo_write_unlock(env, mdd_tobj, dlh); out_trans: if (rc == 0) - rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, mdd_sobj, + rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj, mdd_tobj, NULL, lname, handle); +stop: mdd_trans_stop(env, mdd, rc, handle); out_pending: #ifdef HAVE_QUOTA_SUPPORT @@ -763,6 +848,20 @@ out_pending: return rc; } +int mdd_declare_finish_unlink(const struct lu_env *env, + struct mdd_object *obj, + struct md_attr *ma, + struct thandle *handle) +{ + int rc; + + rc = orph_declare_index_insert(env, obj, handle); + if (rc) + return rc; + + return mdd_declare_object_kill(env, obj, ma, handle); +} + /* caller should take a lock before calling */ int mdd_finish_unlink(const struct lu_env *env, struct mdd_object *obj, struct md_attr *ma, @@ -770,12 +869,15 @@ int mdd_finish_unlink(const struct lu_env *env, { int rc; int reset = 1; + int is_dir = S_ISDIR(ma->ma_attr.la_mode); ENTRY; LASSERT(mdd_write_locked(env, obj) != 0); - rc = mdd_iattr_get(env, obj, ma); - if (rc == 0 && ma->ma_attr.la_nlink == 0) { + /* read HSM flags, needed to set changelogs flags */ + ma->ma_need = MA_HSM | MA_INODE; + rc = mdd_attr_get_internal(env, obj, ma); + if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_dir)) { obj->mod_flags |= DEAD_OBJ; /* add new orphan and the object * will be deleted during mdd_close() */ @@ -793,11 +895,14 @@ int mdd_finish_unlink(const struct lu_env *env, PFID(mdd_object_fid(obj)), obj->mod_count); } else { - rc = mdd_object_kill(env, obj, ma); + rc = mdd_object_kill(env, obj, ma, th); if (rc == 0) reset = 0; } + /* get the i_nlink */ + ma->ma_need = MA_INODE; + rc = mdd_attr_get_internal(env, obj, ma); } if (reset) ma->ma_valid &= ~(MA_LOV | MA_COOKIE); @@ -820,6 +925,50 @@ int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj, RETURN(rc); } +static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd, + struct mdd_object *p, struct mdd_object *c, + const struct lu_name *name, struct md_attr *ma, + struct thandle *handle) +{ + int rc; + + rc = mdo_declare_index_delete(env, p, name->ln_name, handle); + if (rc) + return rc; + + rc = mdo_declare_ref_del(env, p, handle); + if (rc) + return rc; + + rc = mdo_declare_ref_del(env, c, handle); + if (rc) + return rc; + + rc = mdo_declare_ref_del(env, c, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, p, NULL, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, c, NULL, handle); + if (rc) + return rc; + + rc = mdd_declare_finish_unlink(env, c, ma, handle); + if (rc) + return rc; + + rc = mdd_declare_links_add(env, c, handle); + if (rc) + return rc; + + rc = mdd_declare_changelog_store(env, mdd, name, handle); + + return rc; +} + static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, struct md_object *cobj, const struct lu_name *lname, struct md_attr *ma) @@ -842,17 +991,22 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, int rc; ENTRY; - LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n", - PFID(mdd_object_fid(mdd_cobj))); - - rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP); - if (rc) - RETURN(rc); + if (mdd_object_exists(mdd_cobj) <= 0) + RETURN(-ENOENT); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); + rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj, + lname, ma, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); + dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -867,10 +1021,10 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, if (rc) GOTO(cleanup, rc); - __mdd_ref_del(env, mdd_cobj, handle, 0); + mdo_ref_del(env, mdd_cobj, handle); if (is_dir) /* unlink dot */ - __mdd_ref_del(env, mdd_cobj, handle, 1); + mdo_ref_del(env, mdd_cobj, handle); LASSERT(ma->ma_attr.la_valid & LA_CTIME); la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime; @@ -880,10 +1034,14 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, if (rc) GOTO(cleanup, rc); - la->la_valid = LA_CTIME; - rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0); - if (rc) - GOTO(cleanup, rc); + if (ma->ma_attr.la_nlink > 0 || mdd_cobj->mod_count > 0) { + /* update ctime of an unlinked file only if it is still + * opened or a link still exists */ + la->la_valid = LA_CTIME; + rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0); + if (rc) + GOTO(cleanup, rc); + } rc = mdd_finish_unlink(env, mdd_cobj, ma, handle); #ifdef HAVE_QUOTA_SUPPORT @@ -903,11 +1061,6 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, } } #endif - - if (rc == 0) - obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp, - sizeof(KEY_UNLINKED), KEY_UNLINKED, 0, - NULL, NULL); if (!is_dir) /* old files may not have link ea; ignore errors */ mdd_links_rename(env, mdd_cobj, mdo2fid(mdd_pobj), @@ -918,13 +1071,33 @@ cleanup: mdd_write_unlock(env, mdd_cobj); mdd_pdo_write_unlock(env, mdd_pobj, dlh); out_trans: - if (rc == 0) + if (rc == 0) { + int cl_flags; + + cl_flags = (ma->ma_attr.la_nlink == 0) ? CLF_UNLINK_LAST : 0; + if ((ma->ma_valid & MA_HSM) && + (ma->ma_hsm.mh_flags & HS_EXISTS)) + cl_flags |= CLF_UNLINK_HSM_EXISTS; + rc = mdd_changelog_ns_store(env, mdd, - is_dir ? CL_RMDIR : CL_UNLINK, - mdd_cobj, mdd_pobj, NULL, lname, - handle); + is_dir ? CL_RMDIR : CL_UNLINK, cl_flags, + mdd_cobj, mdd_pobj, NULL, lname, handle); + } +stop: mdd_trans_stop(env, mdd, rc, handle); +#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,3,50,0) + if (rc == 0 && ma->ma_valid & MA_COOKIE && ma->ma_valid & MA_LOV && + ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_UNLINK_DESTROY) + /* Since echo client is incapable of destorying ost object, + * it will destory the object here. */ + rc = mdd_lovobj_unlink(env, mdd, mdd_cobj, la, + ma->ma_lmm, ma->ma_lmm_size, + ma->ma_cookie, 1); +#else +#warning "please remove this after 2.4 (LOD/OSP)." +#endif + #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) /* Trigger dqrel on the owner of child and parent. If failed, @@ -971,6 +1144,7 @@ static int mdd_name_insert(const struct lu_env *env, #ifdef HAVE_QUOTA_SUPPORT struct md_ucred *uc = md_ucred(env); struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qids[MAXQUOTAS] = { 0, 0 }; int quota_opc = 0, rec_pending[MAXQUOTAS] = { 0, 0 }; @@ -979,6 +1153,10 @@ static int mdd_name_insert(const struct lu_env *env, int rc; ENTRY; + /* XXX: this code won't be used ever: + * DNE uses slightly different approach */ + LBUG(); + #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { if (!(ma->ma_attr_flags & MDS_QUOTA_IGNORE)) { @@ -992,7 +1170,7 @@ static int mdd_name_insert(const struct lu_env *env, mdd_quota_wrapper(la_tmp, qids); /* get block quota for parent */ lquota_chkquota(mds_quota_interface_ref, obd, - qids, rec_pending, 1, NULL, + exp, qids, rec_pending, 1, NULL, LQUOTA_FLAGS_BLK, data, 1); } } else { @@ -1000,11 +1178,12 @@ static int mdd_name_insert(const struct lu_env *env, } } #endif - mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP); - handle = mdd_trans_start(env, mdo2mdd(pobj)); + handle = mdd_trans_create(env, mdo2mdd(pobj)); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_trans_start(env, mdo2mdd(pobj), handle); + dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -1094,6 +1273,10 @@ static int mdd_name_remove(const struct lu_env *env, int rc; ENTRY; + /* XXX: this code won't be used ever: + * DNE uses slightly different approach */ + LBUG(); + #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; @@ -1105,11 +1288,12 @@ static int mdd_name_remove(const struct lu_env *env, } } #endif - mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_trans_start(env, mdd, handle); + dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -1194,6 +1378,7 @@ static int mdd_rename_tgt(const struct lu_env *env, struct thandle *handle; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qcids[MAXQUOTAS] = { 0, 0 }; unsigned int qpids[MAXQUOTAS] = { 0, 0 }; @@ -1203,6 +1388,10 @@ static int mdd_rename_tgt(const struct lu_env *env, int rc; ENTRY; + /* XXX: this code won't be used ever: + * DNE uses slightly different approach */ + LBUG(); + #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota && !tobj) { struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; @@ -1214,17 +1403,18 @@ static int mdd_rename_tgt(const struct lu_env *env, quota_popc = FSFILT_OP_LINK; mdd_quota_wrapper(la_tmp, qpids); /* get block quota for target parent */ - lquota_chkquota(mds_quota_interface_ref, obd, + lquota_chkquota(mds_quota_interface_ref, obd, exp, qpids, rec_pending, 1, NULL, LQUOTA_FLAGS_BLK, data, 1); } } #endif - mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_trans_start(env, mdd, handle); + dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -1262,11 +1452,11 @@ static int mdd_rename_tgt(const struct lu_env *env, * it must be local one. */ if (tobj && mdd_object_exists(mdd_tobj)) { - __mdd_ref_del(env, mdd_tobj, handle, 0); + mdo_ref_del(env, mdd_tobj, handle); /* Remove dot reference. */ if (S_ISDIR(ma->ma_attr.la_mode)) - __mdd_ref_del(env, mdd_tobj, handle, 1); + mdo_ref_del(env, mdd_tobj, handle); la->la_valid = LA_CTIME; rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0); @@ -1294,7 +1484,7 @@ out_trans: if (rc == 0) /* Bare EXT record with no RENAME in front of it signifies a partial slave op */ - rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj, + rc = mdd_changelog_ns_store(env, mdd, CL_EXT, 0, mdd_tobj, mdd_tpobj, NULL, lname, handle); mdd_trans_stop(env, mdd, rc, handle); @@ -1332,6 +1522,27 @@ static int mdd_cd_sanity_check(const struct lu_env *env, } +static int mdd_declare_create_data(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *obj, + int lmm_size, + struct thandle *handle) +{ + struct lu_buf *buf = &mdd_env_info(env)->mti_buf; + int rc; + + buf->lb_buf = NULL; + buf->lb_len = lmm_size; + rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV, + 0, handle); + if (rc) + return rc; + + rc = mdd_declare_lov_objid_update(env, mdd, handle); + + return rc; +} + static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, struct md_object *cobj, const struct md_op_spec *spec, struct md_attr *ma) @@ -1339,7 +1550,6 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, struct mdd_device *mdd = mdo2mdd(cobj); struct mdd_object *mdd_pobj = md2mdd_obj(pobj); struct mdd_object *son = md2mdd_obj(cobj); - struct lu_attr *attr = &ma->ma_attr; struct lov_mds_md *lmm = NULL; int lmm_size = 0; struct thandle *handle; @@ -1353,16 +1563,23 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, if (!md_should_create(spec->sp_cr_flags)) RETURN(0); lmm_size = ma->ma_lmm_size; - rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, - spec, attr); + + rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec, ma); if (rc) RETURN(rc); - mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_free, rc = PTR_ERR(handle)); + rc = mdd_declare_create_data(env, mdd, son, lmm_size, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); + /* * XXX: Setting the lov ea is not locked but setting the attr is locked? * Should this be fixed? @@ -1388,9 +1605,12 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, if (rc == 0) mdd_lov_objid_update(mdd, lmm); +stop: mdd_trans_stop(env, mdd, rc, handle); out_free: /* Finish mdd_lov_create() stuff. */ + /* if no_create == 0 (not replay), we free lmm allocated by + * mdd_lov_create() */ mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec); RETURN(rc); } @@ -1417,7 +1637,7 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, else if (unlikely(rc < 0)) { CERROR("Object "DFID" locates on remote server\n", PFID(mdo2fid(mdd_obj))); - LBUG(); + RETURN(-EINVAL); } /* The common filename length check. */ @@ -1445,6 +1665,26 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, RETURN(rc); } +int mdd_declare_object_initialize(const struct lu_env *env, + struct mdd_object *child, + struct md_attr *ma, + struct thandle *handle) +{ + int rc; + + rc = mdo_declare_attr_set(env, child, &ma->ma_attr, handle); + if (rc == 0 && S_ISDIR(ma->ma_attr.la_mode)) { + rc = mdo_declare_index_insert(env, child, mdo2fid(child), + dot, handle); + if (rc == 0) + rc = mdo_declare_ref_add(env, child, handle); + } + if (rc == 0) + mdd_declare_links_add(env, child, handle); + + return rc; +} + int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, const struct lu_name *lname, struct mdd_object *child, struct md_attr *ma, struct thandle *handle, @@ -1467,7 +1707,7 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, if (S_ISDIR(ma->ma_attr.la_mode)) { /* Add "." and ".." for newly created dir */ - __mdd_ref_add(env, child, handle); + mdo_ref_add(env, child, handle); rc = __mdd_index_insert_only(env, child, mdo2fid(child), dot, handle, BYPASS_CAPA); if (rc == 0) @@ -1475,10 +1715,10 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, dotdot, handle, BYPASS_CAPA); if (rc != 0) - __mdd_ref_del(env, child, handle, 1); + mdo_ref_del(env, child, handle); } if (rc == 0) - mdd_links_add(env, child, pfid, lname, handle); + mdd_links_add(env, child, pfid, lname, handle, 1); RETURN(rc); } @@ -1567,6 +1807,76 @@ static int mdd_create_sanity_check(const struct lu_env *env, RETURN(rc); } +static int mdd_declare_create(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *p, + struct mdd_object *c, + const struct lu_name *name, + struct md_attr *ma, + int lmm_size, + struct thandle *handle, + const struct md_op_spec *spec) +{ + struct lu_buf *buf = &mdd_env_info(env)->mti_buf; + int rc = 0; + + rc = mdd_declare_object_create_internal(env, p, c, ma, handle, spec); + if (rc) + GOTO(out, rc); + + /* if dir, then can inherit default ACl */ + buf->lb_buf = NULL; + buf->lb_len = lmm_size; + if (S_ISDIR(ma->ma_attr.la_mode)) { + rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_ACL_DEFAULT, + 0, handle); + if (rc == 0) + rc = mdo_declare_ref_add(env, p, handle); + } + if (rc) + GOTO(out, rc); + + rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_ACL_ACCESS, + 0, handle); + if (rc) + GOTO(out, rc); + + rc = mdd_declare_object_initialize(env, c, ma, handle); + if (rc) + GOTO(out, rc); + + rc = mdo_declare_index_insert(env, p, mdo2fid(c), + name->ln_name, handle); + if (rc) + GOTO(out, rc); + + rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, + 0, handle); + if (rc) + GOTO(out, rc); + + if (S_ISLNK(ma->ma_attr.la_mode)) { + rc = dt_declare_record_write(env, mdd_object_child(c), + strlen(spec->u.sp_symname), 0, + handle); + if (rc) + GOTO(out, rc); + } + rc = mdo_declare_attr_set(env, p, &ma->ma_attr, handle); + if (rc) + return rc; + + rc = mdd_declare_changelog_store(env, mdd, name, handle); + if (rc) + return rc; + + rc = mdd_declare_lov_objid_update(env, mdd, handle); + +out: + return rc; +} + + /* * Create object and insert it into namespace. */ @@ -1592,6 +1902,7 @@ static int mdd_create(const struct lu_env *env, int got_def_acl = 0; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qcids[MAXQUOTAS] = { 0, 0 }; unsigned int qpids[MAXQUOTAS] = { 0, 0 }; @@ -1655,8 +1966,9 @@ static int mdd_create(const struct lu_env *env, mdd_quota_wrapper(&ma->ma_attr, qcids); mdd_quota_wrapper(la_tmp, qpids); /* get file quota for child */ - lquota_chkquota(mds_quota_interface_ref, obd, qcids, - inode_pending, 1, NULL, 0, NULL, 0); + lquota_chkquota(mds_quota_interface_ref, obd, exp, + qcids, inode_pending, 1, NULL, 0, NULL, + 0); switch (ma->ma_attr.la_mode & S_IFMT) { case S_IFLNK: case S_IFDIR: @@ -1674,13 +1986,14 @@ static int mdd_create(const struct lu_env *env, /* get block quota for child and parent */ if (block_count) lquota_chkquota(mds_quota_interface_ref, obd, - qcids, block_pending, + exp, qcids, block_pending, block_count, NULL, LQUOTA_FLAGS_BLK, NULL, 0); if (!same) lquota_chkquota(mds_quota_interface_ref, obd, - qpids, parent_pending, 1, NULL, - LQUOTA_FLAGS_BLK, NULL, 0); + exp, qpids, parent_pending, 1, + NULL, LQUOTA_FLAGS_BLK, NULL, + 0); } } #endif @@ -1692,7 +2005,7 @@ static int mdd_create(const struct lu_env *env, if (S_ISREG(attr->la_mode)) { lmm_size = ma->ma_lmm_size; rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, - spec, attr); + spec, ma); if (rc) GOTO(out_pending, rc); } @@ -1712,11 +2025,19 @@ static int mdd_create(const struct lu_env *env, got_def_acl = 1; } - mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_free, rc = PTR_ERR(handle)); + rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, ma, + lmm_size, handle, spec); + if (rc) + GOTO(out_stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(out_stop, rc); + dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -1824,9 +2145,9 @@ cleanup: if (rc2 == 0) { mdd_write_lock(env, son, MOR_TGT_CHILD); - __mdd_ref_del(env, son, handle, 0); + mdo_ref_del(env, son, handle); if (initialized && S_ISDIR(attr->la_mode)) - __mdd_ref_del(env, son, handle, 1); + mdo_ref_del(env, son, handle); mdd_write_unlock(env, son); } } @@ -1842,10 +2163,11 @@ out_trans: S_ISDIR(attr->la_mode) ? CL_MKDIR : S_ISREG(attr->la_mode) ? CL_CREATE : S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD, - son, mdd_pobj, NULL, lname, handle); + 0, son, mdd_pobj, NULL, lname, handle); +out_stop: mdd_trans_stop(env, mdd, rc, handle); out_free: - /* finis lov_create stuff, free all temporary data */ + /* finish lov_create stuff, free all temporary data */ mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec); out_pending: #ifdef HAVE_QUOTA_SUPPORT @@ -1948,6 +2270,124 @@ static int mdd_rename_sanity_check(const struct lu_env *env, RETURN(rc); } +static int mdd_declare_rename(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *mdd_spobj, + struct mdd_object *mdd_tpobj, + struct mdd_object *mdd_sobj, + struct mdd_object *mdd_tobj, + const struct lu_name *sname, + const struct lu_name *tname, + struct md_attr *ma, + struct thandle *handle) +{ + int rc; + + LASSERT(mdd_spobj); + LASSERT(mdd_tpobj); + LASSERT(mdd_sobj); + + /* name from source dir */ + rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle); + if (rc) + return rc; + + /* .. from source child */ + if (S_ISDIR(mdd_object_type(mdd_sobj))) { + /* source child can be directory, + * counted by source dir's nlink */ + rc = mdo_declare_ref_del(env, mdd_spobj, handle); + if (rc) + return rc; + + rc = mdo_declare_index_delete(env, mdd_sobj, dotdot, handle); + if (rc) + return rc; + + rc = mdo_declare_index_insert(env, mdd_sobj, mdo2fid(mdd_tpobj), + dotdot, handle); + if (rc) + return rc; + + /* new target child can be directory, + * counted by target dir's nlink */ + rc = mdo_declare_ref_add(env, mdd_tpobj, handle); + if (rc) + return rc; + + } + + rc = mdo_declare_attr_set(env, mdd_spobj, NULL, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, mdd_sobj, NULL, handle); + if (rc) + return rc; + mdd_declare_links_add(env, mdd_sobj, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, mdd_tpobj, NULL, handle); + if (rc) + return rc; + + /* new name */ + rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj), + tname->ln_name, handle); + if (rc) + return rc; + + /* name from target dir (old name), we declare it unconditionally + * as mdd_rename() calls delete unconditionally as well. so just + * to balance declarations vs calls to change ... */ + rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name, handle); + if (rc) + return rc; + + if (mdd_tobj && mdd_object_exists(mdd_tobj)) { + /* delete target child in target parent directory */ + rc = mdo_declare_ref_del(env, mdd_tobj, handle); + if (rc) + return rc; + + if (S_ISDIR(mdd_object_type(mdd_tobj))) { + /* target child can be directory, + * delete "." reference in target child directory */ + rc = mdo_declare_ref_del(env, mdd_tobj, handle); + if (rc) + return rc; + + /* delete ".." reference in target parent directory */ + rc = mdo_declare_ref_del(env, mdd_tpobj, handle); + if (rc) + return rc; + } + + rc = mdo_declare_attr_set(env, mdd_tobj, NULL, handle); + if (rc) + return rc; + + mdd_declare_links_add(env, mdd_tobj, handle); + if (rc) + return rc; + + rc = mdd_declare_finish_unlink(env, mdd_tobj, ma, handle); + if (rc) + return rc; + } + + rc = mdd_declare_changelog_store(env, mdd, tname, handle); + if (rc) + return rc; + + rc = mdd_declare_changelog_store(env, mdd, sname, handle); + if (rc) + return rc; + + return rc; +} + /* src object can be remote that is why we use only fid and type of object */ static int mdd_rename(const struct lu_env *env, struct md_object *src_pobj, struct md_object *tgt_pobj, @@ -1972,6 +2412,7 @@ static int mdd_rename(const struct lu_env *env, #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qspids[MAXQUOTAS] = { 0, 0 }; unsigned int qtcids[MAXQUOTAS] = { 0, 0 }; @@ -2004,7 +2445,7 @@ static int mdd_rename(const struct lu_env *env, mdd_quota_wrapper(la_tmp, qtpids); /* get block quota for target parent */ lquota_chkquota(mds_quota_interface_ref, - obd, qtpids, + obd, exp, qtpids, rec_pending, 1, NULL, LQUOTA_FLAGS_BLK, data, 1); @@ -2013,11 +2454,21 @@ static int mdd_rename(const struct lu_env *env, } } #endif - mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP); - handle = mdd_trans_start(env, mdd); + mdd_sobj = mdd_object_find(env, mdd, lf); + + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj, + mdd_tobj, lsname, ltname, ma, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); + /* FIXME: Should consider tobj and sobj too in rename_lock. */ rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj); if (rc < 0) @@ -2043,7 +2494,6 @@ static int mdd_rename(const struct lu_env *env, if (sdlh == NULL || tdlh == NULL) GOTO(cleanup, rc = -ENOMEM); - mdd_sobj = mdd_object_find(env, mdd, lf); rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj, mdd_sobj, mdd_tobj, ma); if (rc) @@ -2056,7 +2506,7 @@ static int mdd_rename(const struct lu_env *env, GOTO(cleanup, rc); /* "mv dir1 dir2" needs "dir1/.." link update */ - if (is_dir && mdd_sobj) { + if (is_dir && mdd_sobj && !lu_fid_eq(spobj_fid, tpobj_fid)) { rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle, mdd_object_capa(env, mdd_sobj)); if (rc) @@ -2064,9 +2514,8 @@ static int mdd_rename(const struct lu_env *env, rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, dotdot, handle, mdd_object_capa(env, mdd_sobj)); - if (rc) { + if (rc) GOTO(fixup_spobj, rc); - } } /* Remove target name from target directory @@ -2116,11 +2565,11 @@ static int mdd_rename(const struct lu_env *env, rc = -EINVAL; goto cleanup; } - __mdd_ref_del(env, mdd_tobj, handle, 0); + mdo_ref_del(env, mdd_tobj, handle); /* Remove dot reference. */ if (is_dir) - __mdd_ref_del(env, mdd_tobj, handle, 1); + mdo_ref_del(env, mdd_tobj, handle); la->la_valid = LA_CTIME; rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0); @@ -2159,7 +2608,7 @@ static int mdd_rename(const struct lu_env *env, if (rc == -ENOENT) /* Old files might not have EA entry */ mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj), - lsname, handle); + lsname, handle, 0); mdd_write_unlock(env, mdd_sobj); /* We don't fail the transaction if the link ea can't be updated -- fid2path will use alternate lookup method. */ @@ -2216,12 +2665,20 @@ cleanup: mdd_pdo_write_unlock(env, mdd_spobj, sdlh); cleanup_unlocked: if (rc == 0) - rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, mdd_tobj, + rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, 0, mdd_tobj, mdd_spobj, lf, lsname, handle); - if (rc == 0) - rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj, - mdd_tpobj, lf, ltname, handle); + if (rc == 0) { + struct lu_fid zero_fid; + fid_zero(&zero_fid); + /* If the rename target exist, The CL_EXT record should save + * the target fid as tfid, otherwise, use zero fid. LU-543 */ + rc = mdd_changelog_ns_store(env, mdd, CL_EXT, 0, mdd_tobj, + mdd_tpobj, + mdd_tobj ? NULL : &zero_fid, + ltname, handle); + } +stop: mdd_trans_stop(env, mdd, rc, handle); if (mdd_sobj) mdd_object_put(env, mdd_sobj); @@ -2277,8 +2734,7 @@ struct lu_buf *mdd_links_get(const struct lu_env *env, rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); if (rc == -ERANGE) { /* Buf was too small, figure out what we need. */ - buf->lb_buf = NULL; - buf->lb_len = 0; + mdd_buf_put(buf); rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); if (rc < 0) return ERR_PTR(rc); @@ -2313,10 +2769,12 @@ struct lu_buf *mdd_links_get(const struct lu_env *env, static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname, const struct lu_fid *pfid) { - int reclen; + struct lu_fid tmpfid; + int reclen; - fid_cpu_to_be(&lee->lee_parent_fid, pfid); - strncpy(lee->lee_name, lname->ln_name, lname->ln_namelen); + fid_cpu_to_be(&tmpfid, pfid); + memcpy(&lee->lee_parent_fid, &tmpfid, sizeof(tmpfid)); + memcpy(lee->lee_name, lname->ln_name, lname->ln_namelen); reclen = sizeof(struct link_ea_entry) + lname->ln_namelen; lee->lee_reclen[0] = (reclen >> 8) & 0xff; @@ -2328,7 +2786,8 @@ void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen, struct lu_name *lname, struct lu_fid *pfid) { *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1]; - fid_be_to_cpu(pfid, &lee->lee_parent_fid); + memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid)); + fid_be_to_cpu(pfid, pfid); lname->ln_name = lee->lee_name; lname->ln_namelen = *reclen - sizeof(struct link_ea_entry); } @@ -2361,6 +2820,20 @@ static int __mdd_links_add(const struct lu_env *env, struct lu_buf *buf, return 0; } +static int mdd_declare_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + struct thandle *handle) +{ + int rc; + + /* XXX: max size? */ + rc = mdo_declare_xattr_set(env, mdd_obj, + mdd_buf_get_const(env, NULL, 4096), + XATTR_NAME_LINK, 0, handle); + + return rc; +} + /* For pathologic linkers, we don't want to spend lots of time scanning the * link ea. Limit ourseleves to something reasonable; links not in the EA * can be looked up via (slower) parent lookup. @@ -2371,7 +2844,7 @@ static int mdd_links_add(const struct lu_env *env, struct mdd_object *mdd_obj, const struct lu_fid *pfid, const struct lu_name *lname, - struct thandle *handle) + struct thandle *handle, int first) { struct lu_buf *buf; struct link_ea_header *leh; @@ -2381,7 +2854,7 @@ static int mdd_links_add(const struct lu_env *env, if (!mdd_linkea_enable) RETURN(0); - buf = mdd_links_get(env, mdd_obj); + buf = first ? ERR_PTR(-ENODATA) : mdd_links_get(env, mdd_obj); if (IS_ERR(buf)) { rc = PTR_ERR(buf); if (rc != -ENODATA) { @@ -2411,11 +2884,16 @@ static int mdd_links_add(const struct lu_env *env, rc = __mdd_xattr_set(env, mdd_obj, mdd_buf_get_const(env, buf->lb_buf, leh->leh_len), XATTR_NAME_LINK, 0, handle); - if (rc) - CERROR("link_ea add failed %d "DFID"\n", rc, - PFID(mdd_object_fid(mdd_obj))); + if (rc) { + if (rc == -ENOSPC) + CDEBUG(D_INODE, "link_ea add failed %d "DFID"\n", rc, + PFID(mdd_object_fid(mdd_obj))); + else + CERROR("link_ea add failed %d "DFID"\n", rc, + PFID(mdd_object_fid(mdd_obj))); + } - if (buf->lb_vmalloc) + if (buf->lb_len > OBD_ALLOC_BIG) /* if we vmalloced a large buffer drop it */ mdd_buf_put(buf); @@ -2450,8 +2928,12 @@ static int mdd_links_rename(const struct lu_env *env, buf = mdd_links_get(env, mdd_obj); if (IS_ERR(buf)) { rc = PTR_ERR(buf); - CERROR("link_ea read failed %d "DFID"\n", - rc, PFID(mdd_object_fid(mdd_obj))); + if (rc == -ENODATA) + CDEBUG(D_INODE, "link_ea read failed %d "DFID"\n", + rc, PFID(mdd_object_fid(mdd_obj))); + else + CERROR("link_ea read failed %d "DFID"\n", + rc, PFID(mdd_object_fid(mdd_obj))); RETURN(rc); } leh = buf->lb_buf; @@ -2498,7 +2980,7 @@ out: oldlname->ln_namelen, oldlname->ln_name, rc, PFID(mdd_object_fid(mdd_obj))); - if (buf->lb_vmalloc) + if (buf->lb_len > OBD_ALLOC_BIG) /* if we vmalloced a large buffer drop it */ mdd_buf_put(buf); @@ -2512,8 +2994,9 @@ const struct md_dir_operations mdd_dir_ops = { .mdo_rename = mdd_rename, .mdo_link = mdd_link, .mdo_unlink = mdd_unlink, + .mdo_lum_lmm_cmp = mdd_lum_lmm_cmp, .mdo_name_insert = mdd_name_insert, .mdo_name_remove = mdd_name_remove, .mdo_rename_tgt = mdd_rename_tgt, - .mdo_create_data = mdd_create_data + .mdo_create_data = mdd_create_data, };