X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_dir.c;h=0e893c9a9f9f3816d7e590a8f90ee0e95ba5f353;hp=1fb0ffd27e7fa29c712eb1cfa3bd200674316b7d;hb=bc962bde3b109b99c924137ed281d9400637e295;hpb=5165cdd4b063d523e5ae261f47818b5ba2bbc7cc diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index 1fb0ffd..0e893c9 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -27,7 +27,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2011, 2012, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -63,18 +63,24 @@ static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj, static int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj, struct thandle *handle); -static int mdd_links_add(const struct lu_env *env, - struct mdd_object *mdd_obj, - const struct lu_fid *pfid, - const struct lu_name *lname, - struct thandle *handle, int first); +static inline int mdd_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *pfid, + const struct lu_name *lname, + struct thandle *handle, int first); +static inline int mdd_links_del(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *pfid, + const struct lu_name *lname, + struct thandle *handle); static int mdd_links_rename(const struct lu_env *env, - struct mdd_object *mdd_obj, - const struct lu_fid *oldpfid, - const struct lu_name *oldlname, - const struct lu_fid *newpfid, - const struct lu_name *newlname, - struct thandle *handle); + struct mdd_object *mdd_obj, + const struct lu_fid *oldpfid, + const struct lu_name *oldlname, + const struct lu_fid *newpfid, + const struct lu_name *newlname, + struct thandle *handle, + int first, int check); static int __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj, 
@@ -339,31 +345,31 @@ int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj, * VTX feature has been checked already, no need check again. */ static inline int mdd_is_sticky(const struct lu_env *env, - struct mdd_object *pobj, - struct mdd_object *cobj) + struct mdd_object *pobj, + struct mdd_object *cobj) { - struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; - struct md_ucred *uc = md_ucred(env); - int rc; + struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; + struct lu_ucred *uc = lu_ucred_assert(env); + int rc; - if (pobj) { - rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA); - if (rc) - return rc; + if (pobj) { + rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA); + if (rc) + return rc; - if (!(tmp_la->la_mode & S_ISVTX) || - (tmp_la->la_uid == uc->mu_fsuid)) - return 0; - } + if (!(tmp_la->la_mode & S_ISVTX) || + (tmp_la->la_uid == uc->uc_fsuid)) + return 0; + } - rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA); - if (rc) - return rc; + rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA); + if (rc) + return rc; - if (tmp_la->la_uid == uc->mu_fsuid) - return 0; + if (tmp_la->la_uid == uc->uc_fsuid) + return 0; - return !mdd_capable(uc, CFS_CAP_FOWNER); + return !mdd_capable(uc, CFS_CAP_FOWNER); } /* @@ -493,27 +499,28 @@ static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object * } static int __mdd_index_insert_only(const struct lu_env *env, - struct mdd_object *pobj, - const struct lu_fid *lf, const char *name, - struct thandle *handle, - struct lustre_capa *capa) + struct mdd_object *pobj, + const struct lu_fid *lf, const char *name, + struct thandle *handle, + struct lustre_capa *capa) { - struct dt_object *next = mdd_object_child(pobj); - int rc; - ENTRY; + struct dt_object *next = mdd_object_child(pobj); + int rc; + ENTRY; - if (dt_try_as_dir(env, next)) { - struct md_ucred *uc = md_ucred(env); + if (dt_try_as_dir(env, next)) { + struct lu_ucred *uc = lu_ucred_check(env); + int ignore_quota; - rc = 
next->do_index_ops->dio_insert(env, next, - (struct dt_rec*)lf, - (const struct dt_key *)name, - handle, capa, uc->mu_cap & - CFS_CAP_SYS_RESOURCE_MASK); - } else { - rc = -ENOTDIR; - } - RETURN(rc); + ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1; + rc = next->do_index_ops->dio_insert(env, next, + (struct dt_rec*)lf, + (const struct dt_key *)name, + handle, capa, ignore_quota); + } else { + rc = -ENOTDIR; + } + RETURN(rc); } /* insert named index, add reference if isdir */ @@ -610,19 +617,35 @@ int mdd_declare_changelog_store(const struct lu_env *env, const struct lu_name *fname, struct thandle *handle) { - int reclen; + struct obd_device *obd = mdd2obd_dev(mdd); + struct llog_ctxt *ctxt; + struct llog_changelog_rec *rec; + struct lu_buf *buf; + int reclen; + int rc; /* Not recording */ if (!(mdd->mdd_cl.mc_flags & CLM_ON)) return 0; - /* we'll be writing payload + llog header */ - reclen = sizeof(struct llog_changelog_rec); - if (fname) - reclen += fname->ln_namelen; - reclen = llog_data_len(reclen); + reclen = llog_data_len(sizeof(*rec) + + (fname != NULL ? 
fname->ln_namelen : 0)); + buf = mdd_buf_alloc(env, reclen); + if (buf->lb_buf == NULL) + return -ENOMEM; + + rec = buf->lb_buf; + rec->cr_hdr.lrh_len = reclen; + rec->cr_hdr.lrh_type = CHANGELOG_REC; - return mdd_declare_llog_record(env, mdd, reclen, handle); + ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT); + if (ctxt == NULL) + return -ENXIO; + + rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle); + llog_ctxt_put(ctxt); + + return rc; } static int mdd_declare_changelog_ext_store(const struct lu_env *env, @@ -631,21 +654,112 @@ static int mdd_declare_changelog_ext_store(const struct lu_env *env, const struct lu_name *sname, struct thandle *handle) { - int reclen; + struct obd_device *obd = mdd2obd_dev(mdd); + struct llog_ctxt *ctxt; + struct llog_changelog_ext_rec *rec; + struct lu_buf *buf; + int reclen; + int rc; /* Not recording */ if (!(mdd->mdd_cl.mc_flags & CLM_ON)) return 0; - /* we'll be writing payload + llog header */ - reclen = sizeof(struct llog_changelog_ext_rec); - if (tname) - reclen += tname->ln_namelen; - if (sname) - reclen += 1 + sname->ln_namelen; - reclen = llog_data_len(reclen); + reclen = llog_data_len(sizeof(*rec) + + (tname != NULL ? tname->ln_namelen : 0) + + (sname != NULL ? 
1 + sname->ln_namelen : 0)); + buf = mdd_buf_alloc(env, reclen); + if (buf->lb_buf == NULL) + return -ENOMEM; + + rec = buf->lb_buf; + rec->cr_hdr.lrh_len = reclen; + rec->cr_hdr.lrh_type = CHANGELOG_REC; + + ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT); + if (ctxt == NULL) + return -ENXIO; + + rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle); + llog_ctxt_put(ctxt); + + return rc; +} + +/** Add a changelog entry \a rec to the changelog llog + * \param mdd + * \param rec + * \param handle - currently ignored since llogs start their own transaction; + * this will hopefully be fixed in llog rewrite + * \retval 0 ok + */ +int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd, + struct llog_changelog_rec *rec, struct thandle *th) +{ + struct obd_device *obd = mdd2obd_dev(mdd); + struct llog_ctxt *ctxt; + int rc; + + rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen); + rec->cr_hdr.lrh_type = CHANGELOG_REC; + rec->cr.cr_time = cl_time(); + + spin_lock(&mdd->mdd_cl.mc_lock); + /* NB: I suppose it's possible llog_add adds out of order wrt cr_index, + * but as long as the MDD transactions are ordered correctly for e.g. + * rename conflicts, I don't think this should matter. 
*/ + rec->cr.cr_index = ++mdd->mdd_cl.mc_index; + spin_unlock(&mdd->mdd_cl.mc_lock); + + ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT); + if (ctxt == NULL) + return -ENXIO; + + rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th); + llog_ctxt_put(ctxt); + if (rc > 0) + rc = 0; + return rc; +} - return mdd_declare_llog_record(env, mdd, reclen, handle); +/** Add a changelog_ext entry \a rec to the changelog llog + * \param mdd + * \param rec + * \param handle - currently ignored since llogs start their own transaction; + * this will hopefully be fixed in llog rewrite + * \retval 0 ok + */ +int mdd_changelog_ext_store(const struct lu_env *env, struct mdd_device *mdd, + struct llog_changelog_ext_rec *rec, + struct thandle *th) +{ + struct obd_device *obd = mdd2obd_dev(mdd); + struct llog_ctxt *ctxt; + int rc; + + rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen); + /* llog_lvfs_write_rec sets the llog tail len */ + rec->cr_hdr.lrh_type = CHANGELOG_REC; + rec->cr.cr_time = cl_time(); + + spin_lock(&mdd->mdd_cl.mc_lock); + /* NB: I suppose it's possible llog_add adds out of order wrt cr_index, + * but as long as the MDD transactions are ordered correctly for e.g. + * rename conflicts, I don't think this should matter. 
*/ + rec->cr.cr_index = ++mdd->mdd_cl.mc_index; + spin_unlock(&mdd->mdd_cl.mc_lock); + + ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT); + if (ctxt == NULL) + return -ENXIO; + + /* nested journal transaction */ + rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, NULL, th); + llog_ctxt_put(ctxt); + if (rc > 0) + rc = 0; + + return rc; } /** Store a namespace change changelog record @@ -686,7 +800,7 @@ static int mdd_changelog_ns_store(const struct lu_env *env, buf = mdd_buf_alloc(env, reclen); if (buf->lb_buf == NULL) RETURN(-ENOMEM); - rec = (struct llog_changelog_rec *)buf->lb_buf; + rec = buf->lb_buf; rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags); rec->cr.cr_type = (__u32)type; @@ -697,7 +811,7 @@ static int mdd_changelog_ns_store(const struct lu_env *env, target->mod_cltime = cfs_time_current_64(); - rc = mdd_changelog_llog_write(mdd, rec, handle); + rc = mdd_changelog_store(env, mdd, rec, handle); if (rc < 0) { CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n", rc, type, tname->ln_name, PFID(&rec->cr.cr_tfid), @@ -749,14 +863,12 @@ static int mdd_changelog_ext_ns_store(const struct lu_env *env, LASSERT(tname != NULL); LASSERT(handle != NULL); - reclen = sizeof(*rec) + tname->ln_namelen; - if (sname != NULL) - reclen += 1 + sname->ln_namelen; - reclen = llog_data_len(reclen); + reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen + + (sname != NULL ? 
1 + sname->ln_namelen : 0)); buf = mdd_buf_alloc(env, reclen); if (buf->lb_buf == NULL) RETURN(-ENOMEM); - rec = (struct llog_changelog_ext_rec *)buf->lb_buf; + rec = buf->lb_buf; rec->cr.cr_flags = CLF_EXT_VERSION | (CLF_FLAGMASK & flags); rec->cr.cr_type = (__u32)type; @@ -766,7 +878,6 @@ static int mdd_changelog_ext_ns_store(const struct lu_env *env, rec->cr.cr_namelen = tname->ln_namelen; memcpy(rec->cr.cr_name, tname->ln_name, tname->ln_namelen); if (sname) { - LASSERT(sfid != NULL); rec->cr.cr_name[tname->ln_namelen] = '\0'; memcpy(rec->cr.cr_name + tname->ln_namelen + 1, sname->ln_name, sname->ln_namelen); @@ -780,7 +891,7 @@ static int mdd_changelog_ext_ns_store(const struct lu_env *env, fid_zero(&rec->cr.cr_tfid); } - rc = mdd_changelog_ext_llog_write(mdd, rec, handle); + rc = mdd_changelog_ext_store(env, mdd, rec, handle); if (rc < 0) { CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n", rc, type, tname->ln_name, PFID(sfid), PFID(tpfid)); @@ -859,17 +970,17 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, if (rc) GOTO(out_unlock, rc); - rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj), - name, handle, - mdd_object_capa(env, mdd_tobj)); - if (rc) - GOTO(out_unlock, rc); - rc = mdo_ref_add(env, mdd_sobj, handle); + if (rc) + GOTO(out_unlock, rc); + + + rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj), + name, handle, + mdd_object_capa(env, mdd_tobj)); if (rc != 0) { - __mdd_index_delete_only(env, mdd_tobj, name, handle, - mdd_object_capa(env, mdd_tobj)); - GOTO(out_unlock, rc); + mdo_ref_del(env, mdd_sobj, handle); + GOTO(out_unlock, rc); } LASSERT(ma->ma_attr.la_valid & LA_CTIME); @@ -902,15 +1013,15 @@ out_pending: } int mdd_declare_finish_unlink(const struct lu_env *env, - struct mdd_object *obj, - struct md_attr *ma, - struct thandle *handle) + struct mdd_object *obj, + struct md_attr *ma, + struct thandle *handle) { - int rc; + int rc; - rc = orph_declare_index_insert(env, obj, handle); - if 
(rc) - return rc; + rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle); + if (rc) + return rc; return mdo_declare_destroy(env, obj, handle); } @@ -1057,6 +1168,11 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, if (rc) GOTO(cleanup, rc); + rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle, + mdd_object_capa(env, mdd_pobj)); + if (rc) + GOTO(cleanup, rc); + rc = mdo_ref_del(env, mdd_cobj, handle); if (rc != 0) { __mdd_index_insert_only(env, mdd_pobj, mdo2fid(mdd_cobj), @@ -1065,11 +1181,6 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, GOTO(cleanup, rc); } - rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle, - mdd_object_capa(env, mdd_pobj)); - if (rc) - GOTO(cleanup, rc); - if (is_dir) /* unlink dot */ mdo_ref_del(env, mdd_cobj, handle); @@ -1107,9 +1218,8 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, mdd_object_capa(env, mdd_cobj)); if (!is_dir) - /* old files may not have link ea; ignore errors */ - mdd_links_rename(env, mdd_cobj, mdo2fid(mdd_pobj), - lname, NULL, NULL, handle); + /* old files may not have link ea; ignore errors */ + mdd_links_del(env, mdd_cobj, mdo2fid(mdd_pobj), lname, handle); /* if object is removed then we can't get its attrs, use last get */ if (cattr->la_nlink == 0) { @@ -1205,21 +1315,12 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen, spec->sp_cr_flags, spec->no_create); - if (spec->no_create) { - /* replay case */ + if (spec->no_create || spec->sp_cr_flags & MDS_OPEN_HAS_EA) { + /* replay case or lfs setstripe */ buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen); - } else if (!(spec->sp_cr_flags & MDS_OPEN_HAS_OBJS)) { - if (spec->sp_cr_flags & MDS_OPEN_HAS_EA) { - /* lfs setstripe */ - buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata, - spec->u.sp_ea.eadatalen); - } else { - buf = &LU_BUF_NULL; - } } else { - /* 
MDS_OPEN_HAS_OBJS is not used anymore ? */ - LBUG(); + buf = &LU_BUF_NULL; } rc = dt_declare_xattr_set(env, mdd_object_child(son), buf, @@ -1296,7 +1397,16 @@ int mdd_declare_object_initialize(const struct lu_env *env, { int rc; + /* + * inode mode has been set in creation time, and it's based on umask, + * la_mode and acl, don't set here again! (which will go wrong + * because below function doesn't consider umask). + * I'd suggest set all object attributes in creation time, see above. + */ + LASSERT(attr->la_valid & (LA_MODE | LA_TYPE)); + attr->la_valid &= ~(LA_MODE | LA_TYPE); rc = mdo_declare_attr_set(env, child, attr, handle); + attr->la_valid |= LA_MODE | LA_TYPE; if (rc == 0 && S_ISDIR(attr->la_mode)) { rc = mdo_declare_index_insert(env, child, mdo2fid(child), dot, handle); @@ -1326,9 +1436,19 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, * (2) maybe, the child attributes should be set in OSD when creation. */ + /* + * inode mode has been set in creation time, and it's based on umask, + * la_mode and acl, don't set here again! (which will go wrong + * because below function doesn't consider umask). + * I'd suggest set all object attributes in creation time, see above. + */ + LASSERT(attr->la_valid & (LA_MODE | LA_TYPE)); + attr->la_valid &= ~(LA_MODE | LA_TYPE); rc = mdd_attr_set_internal(env, child, attr, handle, 0); - if (rc != 0) - RETURN(rc); + /* arguments are supposed to stay the same */ + attr->la_valid |= LA_MODE | LA_TYPE; + if (rc != 0) + RETURN(rc); if (S_ISDIR(attr->la_mode)) { /* Add "." and ".." for newly created dir */ @@ -1368,7 +1488,7 @@ static int mdd_create_sanity_check(const struct lu_env *env, if (mdd_is_dead_obj(obj)) RETURN(-ENOENT); - /* + /* * In some cases this lookup is not needed - we know before if name * exists or not because MDT performs lookup for it. * name length check is done in lookup. 
@@ -1436,8 +1556,7 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd, struct thandle *handle, const struct md_op_spec *spec) { - struct mdd_thread_info *info = mdd_env_info(env); - int rc = 0; + int rc; rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec); if (rc) @@ -1457,7 +1576,7 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd, GOTO(out, rc); } - rc = mdo_declare_attr_set(env, c, &info->mti_pattr, handle); + rc = mdo_declare_attr_set(env, c, attr, handle); if (rc) GOTO(out, rc); @@ -1475,13 +1594,16 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd, } rc = mdd_declare_object_initialize(env, c, attr, handle); - if (rc) - GOTO(out, rc); + if (rc) + GOTO(out, rc); - rc = mdo_declare_index_insert(env, p, mdo2fid(c), - name->ln_name, handle); - if (rc) - GOTO(out, rc); + if (spec->sp_cr_flags & MDS_OPEN_VOLATILE) + rc = orph_declare_index_insert(env, c, attr->la_mode, handle); + else + rc = mdo_declare_index_insert(env, p, mdo2fid(c), + name->ln_name, handle); + if (rc) + GOTO(out, rc); /* replay case, create LOV EA from client data */ if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) { @@ -1503,9 +1625,11 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd, GOTO(out, rc); } - rc = mdo_declare_attr_set(env, p, attr, handle); - if (rc) - return rc; + if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) { + rc = mdo_declare_attr_set(env, p, attr, handle); + if (rc) + return rc; + } rc = mdd_declare_changelog_store(env, mdd, name, handle); if (rc) @@ -1523,19 +1647,19 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct md_object *child, struct md_op_spec *spec, struct md_attr* ma) { - struct mdd_thread_info *info = mdd_env_info(env); - struct lu_attr *la = &info->mti_la_for_fix; - struct mdd_object *mdd_pobj = md2mdd_obj(pobj); - struct mdd_object *son = 
md2mdd_obj(child); - struct mdd_device *mdd = mdo2mdd(pobj); - struct lu_attr *attr = &ma->ma_attr; - struct thandle *handle; - struct lu_attr *pattr = &info->mti_pattr; - struct dynlock_handle *dlh; - const char *name = lname->ln_name; - int rc, created = 0, initialized = 0, inserted = 0; - int got_def_acl = 0; - ENTRY; + struct mdd_thread_info *info = mdd_env_info(env); + struct lu_attr *la = &info->mti_la_for_fix; + struct mdd_object *mdd_pobj = md2mdd_obj(pobj); + struct mdd_object *son = md2mdd_obj(child); + struct mdd_device *mdd = mdo2mdd(pobj); + struct lu_attr *attr = &ma->ma_attr; + struct thandle *handle; + struct lu_attr *pattr = &info->mti_pattr; + struct dynlock_handle *dlh; + const char *name = lname->ln_name; + int rc, created = 0, initialized = 0, inserted = 0; + int got_def_acl = 0; + ENTRY; /* * Two operations have to be performed: @@ -1615,21 +1739,21 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj, if (rc) GOTO(out_stop, rc); - dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT); - if (dlh == NULL) - GOTO(out_trans, rc = -ENOMEM); + dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT); + if (dlh == NULL) + GOTO(out_trans, rc = -ENOMEM); - mdd_write_lock(env, son, MOR_TGT_CHILD); - rc = mdd_object_create_internal(env, mdd_pobj, son, attr, handle, spec); - if (rc) { - mdd_write_unlock(env, son); - GOTO(cleanup, rc); - } + mdd_write_lock(env, son, MOR_TGT_CHILD); + rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec); + if (rc) { + mdd_write_unlock(env, son); + GOTO(cleanup, rc); + } - created = 1; + created = 1; #ifdef CONFIG_FS_POSIX_ACL - if (got_def_acl) { + if (got_def_acl) { struct lu_buf *acl_buf; acl_buf = mdd_buf_get(env, info->mti_xattr_buf, got_def_acl); @@ -1638,10 +1762,10 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj, mdd_write_unlock(env, son); GOTO(cleanup, rc); } - } + } #endif - rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname, + rc = 
mdd_object_initialize(env, mdo2fid(mdd_pobj), lname, son, attr, handle, spec); /* @@ -1650,8 +1774,8 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj, * MDT calls this xattr_set(LOV) in a different transaction. * probably this way we code can be made better. */ - if (rc == 0 && - (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA))) { + if (rc == 0 && (spec->no_create || + (spec->sp_cr_flags & MDS_OPEN_HAS_EA))) { const struct lu_buf *buf; buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata, @@ -1659,26 +1783,33 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj, rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle, BYPASS_CAPA); } - mdd_write_unlock(env, son); - if (rc) - /* - * Object has no links, so it will be destroyed when last - * reference is released. (XXX not now.) - */ - GOTO(cleanup, rc); - initialized = 1; + if (rc == 0 && spec->sp_cr_flags & MDS_OPEN_VOLATILE) + rc = __mdd_orphan_add(env, son, handle); - rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son), - name, S_ISDIR(attr->la_mode), handle, - mdd_object_capa(env, mdd_pobj)); - if (rc) - GOTO(cleanup, rc); + mdd_write_unlock(env, son); + + if (rc != 0) + /* + * Object has no links, so it will be destroyed when last + * reference is released. (XXX not now.) 
+ */ + GOTO(cleanup, rc); - inserted = 1; + initialized = 1; + + if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) + rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son), + name, S_ISDIR(attr->la_mode), handle, + mdd_object_capa(env, mdd_pobj)); + + if (rc != 0) + GOTO(cleanup, rc); + + inserted = 1; if (S_ISLNK(attr->la_mode)) { - struct md_ucred *uc = md_ucred(env); + struct lu_ucred *uc = lu_ucred_assert(env); struct dt_object *dt = mdd_object_child(son); const char *target_name = spec->u.sp_symname; int sym_len = strlen(target_name); @@ -1686,10 +1817,10 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj, loff_t pos = 0; buf = mdd_buf_get_const(env, target_name, sym_len); - rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle, - mdd_object_capa(env, son), - uc->mu_cap & - CFS_CAP_SYS_RESOURCE_MASK); + rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle, + mdd_object_capa(env, son), + uc->uc_cap & + CFS_CAP_SYS_RESOURCE_MASK); if (rc == sym_len) rc = 0; @@ -1697,11 +1828,16 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj, GOTO(cleanup, rc = -EFAULT); } + /* volatile file creation does not update parent directory times */ + if (spec->sp_cr_flags & MDS_OPEN_VOLATILE) + GOTO(cleanup, rc = 0); + + /* update parent directory mtime/ctime */ *la = *attr; - la->la_valid = LA_CTIME | LA_MTIME; + la->la_valid = LA_CTIME | LA_MTIME; rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0); - if (rc) - GOTO(cleanup, rc); + if (rc) + GOTO(cleanup, rc); EXIT; cleanup: @@ -1709,9 +1845,12 @@ cleanup: int rc2; if (inserted != 0) { - rc2 = __mdd_index_delete(env, mdd_pobj, name, - S_ISDIR(attr->la_mode), - handle, BYPASS_CAPA); + if (spec->sp_cr_flags & MDS_OPEN_VOLATILE) + rc2 = __mdd_orphan_del(env, son, handle); + else + rc2 = __mdd_index_delete(env, mdd_pobj, name, + S_ISDIR(attr->la_mode), + handle, BYPASS_CAPA); if (rc2 != 0) goto out_stop; } @@ -1750,7 +1889,7 @@ out_stop: out_free: /* The child object 
shouldn't be cached anymore */ if (rc) - cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, + set_bit(LU_OBJECT_HEARD_BANSHEE, &child->mo_lu.lo_header->loh_flags); return rc; } @@ -1971,7 +2110,9 @@ static int mdd_rename(const struct lu_env *env, struct thandle *handle; const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj); const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj); - int is_dir; + bool is_dir; + bool tobj_ref = 0; + bool tobj_locked = 0; unsigned cl_flags = 0; int rc, rc2; ENTRY; @@ -2096,8 +2237,8 @@ static int mdd_rename(const struct lu_env *env, */ if (tobj && mdd_object_exists(mdd_tobj)) { mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD); + tobj_locked = 1; if (mdd_is_dead_obj(mdd_tobj)) { - mdd_write_unlock(env, mdd_tobj); /* shld not be dead, something is wrong */ CERROR("tobj is dead, something is wrong\n"); rc = -EINVAL; @@ -2108,31 +2249,51 @@ static int mdd_rename(const struct lu_env *env, /* Remove dot reference. */ if (S_ISDIR(tg_attr->la_mode)) mdo_ref_del(env, mdd_tobj, handle); + tobj_ref = 1; /* fetch updated nlink */ rc = mdd_la_get(env, mdd_tobj, tg_attr, mdd_object_capa(env, mdd_tobj)); - if (rc) + if (rc != 0) { + CERROR("%s: Failed to get nlink for tobj " + DFID": rc = %d\n", + mdd2obd_dev(mdd)->obd_name, + PFID(tpobj_fid), rc); GOTO(fixup_tpobj, rc); + } - la->la_valid = LA_CTIME; - rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0); - if (rc) - GOTO(fixup_tpobj, rc); + la->la_valid = LA_CTIME; + rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0); + if (rc != 0) { + CERROR("%s: Failed to set ctime for tobj " + DFID": rc = %d\n", + mdd2obd_dev(mdd)->obd_name, + PFID(tpobj_fid), rc); + GOTO(fixup_tpobj, rc); + } /* XXX: this transfer to ma will be removed with LOD/OSP */ ma->ma_attr = *tg_attr; ma->ma_valid |= MA_INODE; - rc = mdd_finish_unlink(env, mdd_tobj, ma, handle); - mdd_write_unlock(env, mdd_tobj); - if (rc) - GOTO(fixup_tpobj, rc); + rc = mdd_finish_unlink(env, mdd_tobj, ma, handle); + if (rc != 0) { + CERROR("%s: Failed 
to unlink tobj " + DFID": rc = %d\n", + mdd2obd_dev(mdd)->obd_name, + PFID(tpobj_fid), rc); + GOTO(fixup_tpobj, rc); + } /* fetch updated nlink */ rc = mdd_la_get(env, mdd_tobj, tg_attr, mdd_object_capa(env, mdd_tobj)); - if (rc) + if (rc != 0) { + CERROR("%s: Failed to get nlink for tobj " + DFID": rc = %d\n", + mdd2obd_dev(mdd)->obd_name, + PFID(tpobj_fid), rc); GOTO(fixup_tpobj, rc); + } /* XXX: this transfer to ma will be removed with LOD/OSP */ ma->ma_attr = *tg_attr; ma->ma_valid |= MA_INODE; @@ -2154,8 +2315,8 @@ static int mdd_rename(const struct lu_env *env, if (rc == 0 && mdd_sobj) { mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD); - rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname, - mdo2fid(mdd_tpobj), ltname, handle); + rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname, + mdo2fid(mdd_tpobj), ltname, handle, 0, 0); if (rc == -ENOENT) /* Old files might not have EA entry */ mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj), @@ -2177,6 +2338,12 @@ fixup_tpobj: if (mdd_tobj && mdd_object_exists(mdd_tobj) && !mdd_is_dead_obj(mdd_tobj)) { + if (tobj_ref) { + mdo_ref_add(env, mdd_tobj, handle); + if (is_dir) + mdo_ref_add(env, mdd_tobj, handle); + } + rc2 = __mdd_index_insert(env, mdd_tpobj, mdo2fid(mdd_tobj), tname, is_dir, handle, @@ -2210,6 +2377,8 @@ fixup_spobj2: CWARN("sp obj fix error %d\n",rc2); } cleanup: + if (tobj_locked) + mdd_write_unlock(env, mdd_tobj); if (likely(tdlh) && sdlh != tdlh) mdd_pdo_write_unlock(env, mdd_tpobj, tdlh); if (likely(sdlh)) @@ -2229,10 +2398,92 @@ out_pending: return rc; } -/** enable/disable storing of hardlink info */ -int mdd_linkea_enable = 1; -CFS_MODULE_PARM(mdd_linkea_enable, "d", int, 0644, - "record hardlink info in EAs"); +/** + * The data that link search is done on. + */ +struct mdd_link_data { + /** + * Buffer to keep link EA body. 
+ */ + struct lu_buf *ml_buf; + /** + * The matched header, entry and its length in the EA + */ + struct link_ea_header *ml_leh; + struct link_ea_entry *ml_lee; + int ml_reclen; +}; + +static int mdd_links_new(const struct lu_env *env, + struct mdd_link_data *ldata) +{ + ldata->ml_buf = mdd_buf_alloc(env, CFS_PAGE_SIZE); + if (ldata->ml_buf->lb_buf == NULL) + return -ENOMEM; + ldata->ml_leh = ldata->ml_buf->lb_buf; + ldata->ml_leh->leh_magic = LINK_EA_MAGIC; + ldata->ml_leh->leh_len = sizeof(struct link_ea_header); + ldata->ml_leh->leh_reccount = 0; + return 0; +} + +/** Read the link EA into a temp buffer. + * Uses the mdd_thread_info::mti_big_buf since it is generally large. + * A pointer to the buffer is stored in \a ldata::ml_buf. + * + * \retval 0 or error + */ +int mdd_links_read(const struct lu_env *env, + struct mdd_object *mdd_obj, + struct mdd_link_data *ldata) +{ + struct lustre_capa *capa; + struct link_ea_header *leh; + int rc; + + /* First try a small buf */ + LASSERT(env != NULL); + ldata->ml_buf = mdd_buf_alloc(env, CFS_PAGE_SIZE); + if (ldata->ml_buf->lb_buf == NULL) + return -ENOMEM; + + if (!mdd_object_exists(mdd_obj)) + return -ENODATA; + + capa = mdd_object_capa(env, mdd_obj); + rc = mdo_xattr_get(env, mdd_obj, ldata->ml_buf, + XATTR_NAME_LINK, capa); + if (rc == -ERANGE) { + /* Buf was too small, figure out what we need. 
*/ + mdd_buf_put(ldata->ml_buf); + rc = mdo_xattr_get(env, mdd_obj, ldata->ml_buf, + XATTR_NAME_LINK, capa); + if (rc < 0) + return rc; + ldata->ml_buf = mdd_buf_alloc(env, rc); + if (ldata->ml_buf->lb_buf == NULL) + return -ENOMEM; + rc = mdo_xattr_get(env, mdd_obj, ldata->ml_buf, + XATTR_NAME_LINK, capa); + } + if (rc < 0) + return rc; + + leh = ldata->ml_buf->lb_buf; + if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) { + leh->leh_magic = LINK_EA_MAGIC; + leh->leh_reccount = __swab32(leh->leh_reccount); + leh->leh_len = __swab64(leh->leh_len); + /* entries are swabbed by mdd_lee_unpack */ + } + if (leh->leh_magic != LINK_EA_MAGIC) + return -EINVAL; + if (leh->leh_reccount == 0) + return -ENODATA; + + ldata->ml_leh = leh; + return 0; +} /** Read the link EA into a temp buffer. * Uses the name_buf since it is generally large. @@ -2240,50 +2491,24 @@ CFS_MODULE_PARM(mdd_linkea_enable, "d", int, 0644, * \retval ptr to \a lu_buf (always \a mti_big_buf) */ struct lu_buf *mdd_links_get(const struct lu_env *env, - struct mdd_object *mdd_obj) + struct mdd_object *mdd_obj) { - struct lu_buf *buf; - struct lustre_capa *capa; - struct link_ea_header *leh; - int rc; - - /* First try a small buf */ - buf = mdd_buf_alloc(env, CFS_PAGE_SIZE); - if (buf->lb_buf == NULL) - return ERR_PTR(-ENOMEM); + struct mdd_link_data ldata = { 0 }; + int rc; - if (!mdd_object_exists(mdd_obj)) - return ERR_PTR(-ENODATA); - - capa = mdd_object_capa(env, mdd_obj); - rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); - if (rc == -ERANGE) { - /* Buf was too small, figure out what we need. 
*/ - mdd_buf_put(buf); - rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); - if (rc < 0) - return ERR_PTR(rc); - buf = mdd_buf_alloc(env, rc); - if (buf->lb_buf == NULL) - return ERR_PTR(-ENOMEM); - rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); - } - if (rc < 0) - return ERR_PTR(rc); - - leh = buf->lb_buf; - if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) { - leh->leh_magic = LINK_EA_MAGIC; - leh->leh_reccount = __swab32(leh->leh_reccount); - leh->leh_len = __swab64(leh->leh_len); - /* entries are swabbed by mdd_lee_unpack */ - } - if (leh->leh_magic != LINK_EA_MAGIC) - return ERR_PTR(-EINVAL); - if (leh->leh_reccount == 0) - return ERR_PTR(-ENODATA); + rc = mdd_links_read(env, mdd_obj, &ldata); + return rc ? ERR_PTR(rc) : ldata.ml_buf; +} - return buf; +static int mdd_links_write(const struct lu_env *env, + struct mdd_object *mdd_obj, + struct mdd_link_data *ldata, + struct thandle *handle) +{ + const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ml_buf->lb_buf, + ldata->ml_leh->leh_len); + return mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle, + mdd_object_capa(env, mdd_obj)); } /** Pack a link_ea_entry. 
@@ -2317,34 +2542,6 @@ void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen, lname->ln_namelen = *reclen - sizeof(struct link_ea_entry); } -/** Add a record to the end of link ea buf */ -static int __mdd_links_add(const struct lu_env *env, struct lu_buf *buf, - const struct lu_fid *pfid, - const struct lu_name *lname) -{ - struct link_ea_header *leh; - struct link_ea_entry *lee; - int reclen; - - if (lname == NULL || pfid == NULL) - return -EINVAL; - - /* Make sure our buf is big enough for the new one */ - leh = buf->lb_buf; - reclen = lname->ln_namelen + sizeof(struct link_ea_entry); - if (leh->leh_len + reclen > buf->lb_len) { - if (mdd_buf_grow(env, leh->leh_len + reclen) < 0) - return -ENOMEM; - } - - leh = buf->lb_buf; - lee = buf->lb_buf + leh->leh_len; - reclen = mdd_lee_pack(lee, lname, pfid); - leh->leh_len += reclen; - leh->leh_reccount++; - return 0; -} - static int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj, struct thandle *handle) @@ -2365,153 +2562,257 @@ static int mdd_declare_links_add(const struct lu_env *env, */ #define LINKEA_MAX_COUNT 128 -static int mdd_links_add(const struct lu_env *env, - struct mdd_object *mdd_obj, - const struct lu_fid *pfid, - const struct lu_name *lname, - struct thandle *handle, int first) +/** Add a record to the end of link ea buf */ +static int mdd_links_add_buf(const struct lu_env *env, + struct mdd_link_data *ldata, + const struct lu_name *lname, + const struct lu_fid *pfid) { - struct lu_buf *buf; - struct link_ea_header *leh; - int rc; - ENTRY; + LASSERT(ldata->ml_leh != NULL); - if (!mdd_linkea_enable) - RETURN(0); + if (lname == NULL || pfid == NULL) + return -EINVAL; - buf = first ? 
ERR_PTR(-ENODATA) : mdd_links_get(env, mdd_obj); - if (IS_ERR(buf)) { - rc = PTR_ERR(buf); - if (rc != -ENODATA) { - CERROR("link_ea read failed %d "DFID"\n", rc, - PFID(mdd_object_fid(mdd_obj))); - RETURN (rc); - } - /* empty EA; start one */ - buf = mdd_buf_alloc(env, CFS_PAGE_SIZE); - if (buf->lb_buf == NULL) - RETURN(-ENOMEM); - leh = buf->lb_buf; - leh->leh_magic = LINK_EA_MAGIC; - leh->leh_len = sizeof(struct link_ea_header); - leh->leh_reccount = 0; - } + /* Make sure our buf is big enough for the new one */ + if (ldata->ml_leh->leh_reccount > LINKEA_MAX_COUNT) + return -EOVERFLOW; - leh = buf->lb_buf; - if (leh->leh_reccount > LINKEA_MAX_COUNT) - RETURN(-EOVERFLOW); + ldata->ml_reclen = lname->ln_namelen + sizeof(struct link_ea_entry); + if (ldata->ml_leh->leh_len + ldata->ml_reclen > + ldata->ml_buf->lb_len) { + if (mdd_buf_grow(env, ldata->ml_leh->leh_len + + ldata->ml_reclen) < 0) + return -ENOMEM; + } - rc = __mdd_links_add(env, buf, pfid, lname); - if (rc) - RETURN(rc); + ldata->ml_leh = ldata->ml_buf->lb_buf; + ldata->ml_lee = ldata->ml_buf->lb_buf + ldata->ml_leh->leh_len; + ldata->ml_reclen = mdd_lee_pack(ldata->ml_lee, lname, pfid); + ldata->ml_leh->leh_len += ldata->ml_reclen; + ldata->ml_leh->leh_reccount++; + CDEBUG(D_INODE, "New link_ea name '%.*s' is added\n", + lname->ln_namelen, lname->ln_name); + return 0; +} - leh = buf->lb_buf; - rc = mdo_xattr_set(env, mdd_obj, - mdd_buf_get_const(env, buf->lb_buf, leh->leh_len), - XATTR_NAME_LINK, 0, handle, - mdd_object_capa(env, mdd_obj)); - if (rc) { - if (rc == -ENOSPC) - CDEBUG(D_INODE, "link_ea add failed %d "DFID"\n", rc, - PFID(mdd_object_fid(mdd_obj))); - else - CERROR("link_ea add failed %d "DFID"\n", rc, - PFID(mdd_object_fid(mdd_obj))); - } +/** Del the current record from the link ea buf */ +static void mdd_links_del_buf(const struct lu_env *env, + struct mdd_link_data *ldata, + const struct lu_name *lname) +{ + LASSERT(ldata->ml_leh != NULL); - if (buf->lb_len > OBD_ALLOC_BIG) - /* if we 
vmalloced a large buffer drop it */ - mdd_buf_put(buf); + ldata->ml_leh->leh_reccount--; + ldata->ml_leh->leh_len -= ldata->ml_reclen; + memmove(ldata->ml_lee, (char *)ldata->ml_lee + ldata->ml_reclen, + (char *)ldata->ml_leh + ldata->ml_leh->leh_len - + (char *)ldata->ml_lee); + CDEBUG(D_INODE, "Old link_ea name '%.*s' is removed\n", + lname->ln_namelen, lname->ln_name); - RETURN (rc); } -static int mdd_links_rename(const struct lu_env *env, - struct mdd_object *mdd_obj, - const struct lu_fid *oldpfid, - const struct lu_name *oldlname, - const struct lu_fid *newpfid, - const struct lu_name *newlname, - struct thandle *handle) +/** + * Check if such a link exists in linkEA. + * + * \param mdd_obj object being handled + * \param pfid parent fid the link to be found for + * \param lname name in the parent's directory entry pointing to this object + * \param ldata link data the search to be done on + * + * \retval 0 success + * \retval -ENOENT link does not exist + * \retval -ve on error + */ +static int mdd_links_find(const struct lu_env *env, + struct mdd_object *mdd_obj, + struct mdd_link_data *ldata, + const struct lu_name *lname, + const struct lu_fid *pfid) { - struct lu_buf *buf; - struct link_ea_header *leh; - struct link_ea_entry *lee; - struct lu_name *tmpname = &mdd_env_info(env)->mti_name; - struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid; - int reclen = 0; - int count; - int rc, rc2 = 0; - ENTRY; + struct lu_name *tmpname = &mdd_env_info(env)->mti_name2; + struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid; + int count; + + LASSERT(ldata->ml_leh != NULL); + + /* link #0 */ + ldata->ml_lee = (struct link_ea_entry *)(ldata->ml_leh + 1); + + for (count = 0; count < ldata->ml_leh->leh_reccount; count++) { + mdd_lee_unpack(ldata->ml_lee, &ldata->ml_reclen, + tmpname, tmpfid); + if (tmpname->ln_namelen == lname->ln_namelen && + lu_fid_eq(tmpfid, pfid) && + (strncmp(tmpname->ln_name, lname->ln_name, + tmpname->ln_namelen) == 0)) + break; + ldata->ml_lee = 
(struct link_ea_entry *)((char *)ldata->ml_lee + + ldata->ml_reclen); + } - if (!mdd_linkea_enable) - RETURN(0); + if (count == ldata->ml_leh->leh_reccount) { + CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n", + lname->ln_namelen, lname->ln_name); + return -ENOENT; + } + return 0; +} - if (mdd_obj->mod_flags & DEAD_OBJ) - /* No more links, don't bother */ - RETURN(0); +static int __mdd_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + struct mdd_link_data *ldata, + const struct lu_name *lname, + const struct lu_fid *pfid, + int first, int check) +{ + int rc; - buf = mdd_links_get(env, mdd_obj); - if (IS_ERR(buf)) { - rc = PTR_ERR(buf); - if (rc == -ENODATA) - CDEBUG(D_INODE, "link_ea read failed %d "DFID"\n", - rc, PFID(mdd_object_fid(mdd_obj))); - else - CERROR("link_ea read failed %d "DFID"\n", - rc, PFID(mdd_object_fid(mdd_obj))); - RETURN(rc); - } - leh = buf->lb_buf; - lee = (struct link_ea_entry *)(leh + 1); /* link #0 */ - - /* Find the old record */ - for(count = 0; count < leh->leh_reccount; count++) { - mdd_lee_unpack(lee, &reclen, tmpname, tmpfid); - if (tmpname->ln_namelen == oldlname->ln_namelen && - lu_fid_eq(tmpfid, oldpfid) && - (strncmp(tmpname->ln_name, oldlname->ln_name, - tmpname->ln_namelen) == 0)) - break; - lee = (struct link_ea_entry *)((char *)lee + reclen); - } - if ((count + 1) > leh->leh_reccount) { - CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n", - oldlname->ln_namelen, oldlname->ln_name); - GOTO(out, rc = -ENOENT); - } + if (ldata->ml_leh == NULL) { + rc = first ? 
-ENODATA : mdd_links_read(env, mdd_obj, ldata); + if (rc) { + if (rc != -ENODATA) + return rc; + rc = mdd_links_new(env, ldata); + if (rc) + return rc; + } + } - /* Remove the old record */ - leh->leh_reccount--; - leh->leh_len -= reclen; - memmove(lee, (char *)lee + reclen, (char *)leh + leh->leh_len - - (char *)lee); - - /* If renaming, add the new record */ - if (newpfid != NULL) { - /* if the add fails, we still delete the out-of-date old link */ - rc2 = __mdd_links_add(env, buf, newpfid, newlname); - leh = buf->lb_buf; - } + if (check) { + rc = mdd_links_find(env, mdd_obj, ldata, lname, pfid); + if (rc && rc != -ENOENT) + return rc; + if (rc == 0) + return -EEXIST; + } + + return mdd_links_add_buf(env, ldata, lname, pfid); +} + +static int __mdd_links_del(const struct lu_env *env, + struct mdd_object *mdd_obj, + struct mdd_link_data *ldata, + const struct lu_name *lname, + const struct lu_fid *pfid) +{ + int rc; + + if (ldata->ml_leh == NULL) { + rc = mdd_links_read(env, mdd_obj, ldata); + if (rc) + return rc; + } + + rc = mdd_links_find(env, mdd_obj, ldata, lname, pfid); + if (rc) + return rc; + + mdd_links_del_buf(env, ldata, lname); + return 0; +} + +static int mdd_links_rename(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *oldpfid, + const struct lu_name *oldlname, + const struct lu_fid *newpfid, + const struct lu_name *newlname, + struct thandle *handle, + int first, int check) +{ + struct mdd_link_data ldata = { 0 }; + int updated = 0; + int rc2 = 0; + int rc = 0; + ENTRY; + + LASSERT(oldpfid != NULL || newpfid != NULL); + + if (mdd_obj->mod_flags & DEAD_OBJ) + /* No more links, don't bother */ + RETURN(0); + + if (oldpfid != NULL) { + rc = __mdd_links_del(env, mdd_obj, &ldata, + oldlname, oldpfid); + if (rc) { + if ((check == 0) || + (rc != -ENODATA && rc != -ENOENT)) + GOTO(out, rc); + /* No changes done. 
*/ + rc = 0; + } else { + updated = 1; + } + } - rc = mdo_xattr_set(env, mdd_obj, - mdd_buf_get_const(env, buf->lb_buf, leh->leh_len), - XATTR_NAME_LINK, 0, handle, - mdd_object_capa(env, mdd_obj)); + /* If renaming, add the new record */ + if (newpfid != NULL) { + /* even if the add fails, we still delete the out-of-date + * old link */ + rc2 = __mdd_links_add(env, mdd_obj, &ldata, + newlname, newpfid, first, check); + if (rc2 == -EEXIST) + rc2 = 0; + else if (rc2 == 0) + updated = 1; + } + if (updated) + rc = mdd_links_write(env, mdd_obj, &ldata, handle); + EXIT; out: - if (rc == 0) - rc = rc2; - if (rc) - CDEBUG(D_INODE, "link_ea mv/unlink '%.*s' failed %d "DFID"\n", - oldlname->ln_namelen, oldlname->ln_name, rc, - PFID(mdd_object_fid(mdd_obj))); + if (rc == 0) + rc = rc2; + if (rc) { + int error = 1; + if (rc == -EOVERFLOW || rc == - ENOENT) + error = 0; + if (oldpfid == NULL) + CDEBUG(error ? D_ERROR : D_OTHER, + "link_ea add '%.*s' failed %d "DFID"\n", + newlname->ln_namelen, newlname->ln_name, + rc, PFID(mdd_object_fid(mdd_obj))); + else if (newpfid == NULL) + CDEBUG(error ? D_ERROR : D_OTHER, + "link_ea del '%.*s' failed %d "DFID"\n", + oldlname->ln_namelen, oldlname->ln_name, + rc, PFID(mdd_object_fid(mdd_obj))); + else + CDEBUG(error ? 
D_ERROR : D_OTHER, + "link_ea rename '%.*s'->'%.*s' failed %d " + DFID"\n", + oldlname->ln_namelen, oldlname->ln_name, + newlname->ln_namelen, newlname->ln_name, + rc, PFID(mdd_object_fid(mdd_obj))); + } + + if (ldata.ml_buf && ldata.ml_buf->lb_len > OBD_ALLOC_BIG) + /* if we vmalloced a large buffer drop it */ + mdd_buf_put(ldata.ml_buf); + + return rc; +} - if (buf->lb_len > OBD_ALLOC_BIG) - /* if we vmalloced a large buffer drop it */ - mdd_buf_put(buf); +static inline int mdd_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *pfid, + const struct lu_name *lname, + struct thandle *handle, int first) +{ + return mdd_links_rename(env, mdd_obj, NULL, NULL, + pfid, lname, handle, first, 0); +} - RETURN (rc); +static inline int mdd_links_del(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *pfid, + const struct lu_name *lname, + struct thandle *handle) +{ + return mdd_links_rename(env, mdd_obj, pfid, lname, + NULL, NULL, handle, 0, 0); } const struct md_dir_operations mdd_dir_ops = {