X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_dir.c;h=2c1ffecf16dbb429418ae41343fdfc5918972851;hb=ec20be97b9f977d3f4944523baaffb1bf95cf76c;hp=fa56fcdbd1505149ea2aa65fbf6c3044541af004;hpb=f2e5761d5b64f360b114d434f46fdc50d607a55c;p=fs%2Flustre-release.git diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index fa56fcd..2c1ffec 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -26,8 +26,11 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011 Whamcloud, Inc. + * */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -46,14 +49,21 @@ #define DEBUG_SUBSYSTEM S_MDS #include +#ifdef HAVE_EXT4_LDISKFS +#include +#else #include +#endif #include #include #include #include #include - +#ifdef HAVE_EXT4_LDISKFS +#include +#else #include +#endif #include #include #include @@ -71,6 +81,22 @@ static struct lu_name lname_dotdot = { static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask); +static int mdd_declare_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + struct thandle *handle); +static int mdd_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *pfid, + const struct lu_name *lname, + struct thandle *handle, int first); +static int mdd_links_rename(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *oldpfid, + const struct lu_name *oldlname, + const struct lu_fid *newpfid, + const struct lu_name *newlname, + struct thandle *handle); + static int __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask) @@ -89,9 +115,9 @@ __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj, return rc; } -static int 
mdd_lookup(const struct lu_env *env, - struct md_object *pobj, const struct lu_name *lname, - struct lu_fid* fid, struct md_op_spec *spec) +int mdd_lookup(const struct lu_env *env, + struct md_object *pobj, const struct lu_name *lname, + struct lu_fid* fid, struct md_op_spec *spec) { int rc; ENTRY; @@ -99,7 +125,6 @@ static int mdd_lookup(const struct lu_env *env, RETURN(rc); } - static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj, struct lu_fid *fid) { @@ -107,10 +132,10 @@ static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj, } /* - * For root fid use special function, whcih does not compare version component - * of fid. Vresion component is different for root fids on all MDTs. + * For root fid use special function, which does not compare version component + * of fid. Version component is different for root fids on all MDTs. */ -static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid) +int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid) { return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) && fid_oid(&mdd->mdd_root_fid) == fid_oid(fid); @@ -184,9 +209,8 @@ out: * * returns < 0: if error */ -static int mdd_is_subdir(const struct lu_env *env, - struct md_object *mo, const struct lu_fid *fid, - struct lu_fid *sfid) +int mdd_is_subdir(const struct lu_env *env, struct md_object *mo, + const struct lu_fid *fid, struct lu_fid *sfid) { struct mdd_device *mdd = mdo2mdd(mo); int rc; @@ -232,8 +256,8 @@ static int mdd_dir_is_empty(const struct lu_env *env, RETURN(-ENOTDIR); iops = &obj->do_index_ops->dio_it; - it = iops->init(env, obj, BYPASS_CAPA); - if (it != NULL) { + it = iops->init(env, obj, LUDA_64BITHASH, BYPASS_CAPA); + if (!IS_ERR(it)) { result = iops->get(env, it, (const void *)""); if (result > 0) { int i; @@ -252,7 +276,7 @@ static int mdd_dir_is_empty(const struct lu_env *env, iops->put(env, it); iops->fini(env, it); } else - result = -ENOMEM; + result = PTR_ERR(it); RETURN(result); } @@ 
-379,7 +403,13 @@ int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj, if (!mdd_object_exists(cobj)) RETURN(-ENOENT); + if (mdd_is_dead_obj(cobj)) + RETURN(-ESTALE); + if (pobj) { + if (!mdd_object_exists(pobj)) + RETURN(-ENOENT); + if (mdd_is_dead_obj(pobj)) RETURN(-ENOENT); @@ -436,6 +466,12 @@ int mdd_link_sanity_check(const struct lu_env *env, int rc = 0; ENTRY; + if (!mdd_object_exists(src_obj)) + RETURN(-ENOENT); + + if (mdd_is_dead_obj(src_obj)) + RETURN(-ESTALE); + /* Local ops, no lookup before link, check filename length here. */ if (lname && (lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len)) RETURN(-ENAMETOOLONG); @@ -458,47 +494,29 @@ int mdd_link_sanity_check(const struct lu_env *env, RETURN(rc); } -/** - * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and - * assign i_nlink to 1 which means the i_nlink for subdir count is incredible - * (maybe too large to be represented). It is a trick to break through the - * "i_nlink" limitation for subdir count. 
- */ -void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj, - struct thandle *handle) +static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj, + const char *name, struct thandle *handle, + struct lustre_capa *capa) { - struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; - struct mdd_device *m = mdd_obj2mdd_dev(obj); + struct dt_object *next = mdd_object_child(pobj); + int rc; + ENTRY; - if (!mdd_is_mnlink(obj)) { - if (S_ISDIR(mdd_object_type(obj))) { - if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA)) - return; - - if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) { - obj->mod_flags |= MNLINK_OBJ; - tmp_la->la_nlink = 1; - tmp_la->la_valid = LA_NLINK; - mdd_attr_set_internal(env, obj, tmp_la, handle, - 0); - return; - } - } - mdo_ref_add(env, obj, handle); - } -} + if (dt_try_as_dir(env, next)) { + rc = next->do_index_ops->dio_delete(env, next, + (struct dt_key *)name, + handle, capa); + } else + rc = -ENOTDIR; -void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj, - struct thandle *handle, int is_dot) -{ - if (!mdd_is_mnlink(obj) || is_dot) - mdo_ref_del(env, obj, handle); + RETURN(rc); } -/* insert named index, add reference if isdir */ -static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj, - const struct lu_fid *lf, const char *name, int is_dir, - struct thandle *handle, struct lustre_capa *capa) +static int __mdd_index_insert_only(const struct lu_env *env, + struct mdd_object *pobj, + const struct lu_fid *lf, const char *name, + struct thandle *handle, + struct lustre_capa *capa) { struct dt_object *next = mdd_object_child(pobj); int rc; @@ -508,20 +526,29 @@ static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj, struct md_ucred *uc = md_ucred(env); rc = next->do_index_ops->dio_insert(env, next, - __mdd_fid_rec(env, lf), + (struct dt_rec*)lf, (const struct dt_key *)name, handle, capa, uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK); } else { rc = 
-ENOTDIR; } + RETURN(rc); +} - if (rc == 0) { - if (is_dir) { - mdd_write_lock(env, pobj, MOR_TGT_PARENT); - __mdd_ref_add(env, pobj, handle); - mdd_write_unlock(env, pobj); - } +/* insert named index, add reference if isdir */ +static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj, + const struct lu_fid *lf, const char *name, int is_dir, + struct thandle *handle, struct lustre_capa *capa) +{ + int rc; + ENTRY; + + rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa); + if (rc == 0 && is_dir) { + mdd_write_lock(env, pobj, MOR_TGT_PARENT); + mdo_ref_add(env, pobj, handle); + mdd_write_unlock(env, pobj); } RETURN(rc); } @@ -531,50 +558,173 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj, const char *name, int is_dir, struct thandle *handle, struct lustre_capa *capa) { - struct dt_object *next = mdd_object_child(pobj); int rc; ENTRY; - if (dt_try_as_dir(env, next)) { - rc = next->do_index_ops->dio_delete(env, next, - (struct dt_key *)name, - handle, capa); - if (rc == 0 && is_dir) { - int is_dot = 0; - - if (name != NULL && name[0] == '.' 
&& name[1] == 0) - is_dot = 1; - mdd_write_lock(env, pobj, MOR_TGT_PARENT); - __mdd_ref_del(env, pobj, handle, is_dot); - mdd_write_unlock(env, pobj); - } - } else - rc = -ENOTDIR; + rc = __mdd_index_delete_only(env, pobj, name, handle, capa); + if (rc == 0 && is_dir) { + mdd_write_lock(env, pobj, MOR_TGT_PARENT); + mdo_ref_del(env, pobj, handle); + mdd_write_unlock(env, pobj); + } RETURN(rc); } -static int -__mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj, - const struct lu_fid *lf, const char *name, - struct thandle *handle, struct lustre_capa *capa) +int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd, + int reclen, struct thandle *handle) { - struct dt_object *next = mdd_object_child(pobj); - int rc; + int rc; + + /* XXX: this is a temporary solution to declare llog changes + * will be fixed in 2.3 with new llog implementation */ + + LASSERT(mdd->mdd_capa); + + /* record itself */ + rc = dt_declare_record_write(env, mdd->mdd_capa, reclen, 0, handle); + if (rc) + return rc; + + /* header will be updated as well */ + rc = dt_declare_record_write(env, mdd->mdd_capa, LLOG_CHUNK_SIZE, + 0, handle); + if (rc) + return rc; + + /* also we should be able to create new plain log */ + rc = dt_declare_create(env, mdd->mdd_capa, NULL, NULL, NULL, handle); + if (rc) + return rc; + + /* new record referencing new plain llog */ + rc = dt_declare_record_write(env, mdd->mdd_capa, + sizeof(struct llog_logid_rec), 0, handle); + if (rc) + return rc; + + /* catalog's header will be updated as well */ + rc = dt_declare_record_write(env, mdd->mdd_capa, LLOG_CHUNK_SIZE, + 0, handle); + + return rc; +} + +int mdd_declare_changelog_store(const struct lu_env *env, + struct mdd_device *mdd, + const struct lu_name *fname, + struct thandle *handle) +{ + int reclen; + + /* Not recording */ + if (!(mdd->mdd_cl.mc_flags & CLM_ON)) + return 0; + + /* we'll be writing payload + llog header */ + reclen = sizeof(struct llog_changelog_rec); + if 
(fname) + reclen += llog_data_len(fname->ln_namelen); + + return mdd_declare_llog_record(env, mdd, reclen, handle); +} + +/** Store a namespace change changelog record + * If this fails, we must fail the whole transaction; we don't + * want the change to commit without the log entry. + * \param target - mdd_object of change + * \param parent - parent dir/object + * \param tf - target lu_fid, overrides fid of \a target if this is non-null + * \param tname - target name string + * \param handle - transacion handle + */ +static int mdd_changelog_ns_store(const struct lu_env *env, + struct mdd_device *mdd, + enum changelog_rec_type type, + int flags, + struct mdd_object *target, + struct mdd_object *parent, + const struct lu_fid *tf, + const struct lu_name *tname, + struct thandle *handle) +{ + const struct lu_fid *tfid; + const struct lu_fid *tpfid = mdo2fid(parent); + struct llog_changelog_rec *rec; + struct lu_buf *buf; + int reclen; + int rc; ENTRY; - if (dt_try_as_dir(env, next)) { - struct md_ucred *uc = md_ucred(env); + /* Not recording */ + if (!(mdd->mdd_cl.mc_flags & CLM_ON)) + RETURN(0); + if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0) + RETURN(0); - rc = next->do_index_ops->dio_insert(env, next, - __mdd_fid_rec(env, lf), - (const struct dt_key *)name, - handle, capa, uc->mu_cap & - CFS_CAP_SYS_RESOURCE_MASK); - } else { - rc = -ENOTDIR; + LASSERT(parent != NULL); + LASSERT(tname != NULL); + LASSERT(handle != NULL); + + /* target */ + reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen); + buf = mdd_buf_alloc(env, reclen); + if (buf->lb_buf == NULL) + RETURN(-ENOMEM); + rec = (struct llog_changelog_rec *)buf->lb_buf; + + rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags); + rec->cr.cr_type = (__u32)type; + tfid = tf ? 
tf : mdo2fid(target); + rec->cr.cr_tfid = *tfid; + rec->cr.cr_pfid = *tpfid; + rec->cr.cr_namelen = tname->ln_namelen; + memcpy(rec->cr.cr_name, tname->ln_name, rec->cr.cr_namelen); + if (likely(target)) + target->mod_cltime = cfs_time_current_64(); + + rc = mdd_changelog_llog_write(mdd, rec, handle); + if (rc < 0) { + CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n", + rc, type, tname->ln_name, PFID(tfid), PFID(tpfid)); + return -EFAULT; } - RETURN(rc); + + return 0; +} + +static int mdd_declare_link(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *p, + struct mdd_object *c, + const struct lu_name *name, + struct thandle *handle) +{ + int rc; + + rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,handle); + if (rc) + return rc; + + rc = mdo_declare_ref_add(env, c, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, p, NULL, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, c, NULL, handle); + if (rc) + return rc; + + rc = mdd_declare_links_add(env, c, handle); + if (rc) + return rc; + + rc = mdd_declare_changelog_store(env, mdd, name, handle); + + return rc; } static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, @@ -590,9 +740,10 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, struct thandle *handle; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qids[MAXQUOTAS] = { 0, 0 }; - int quota_opc = 0, rec_pending = 0; + int quota_opc = 0, rec_pending[MAXQUOTAS] = { 0, 0 }; #endif int rc; ENTRY; @@ -603,21 +754,30 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, rc = mdd_la_get(env, mdd_tobj, la_tmp, BYPASS_CAPA); if (!rc) { + void *data = NULL; + mdd_data_get(env, mdd_tobj, &data); quota_opc = FSFILT_OP_LINK; mdd_quota_wrapper(la_tmp, qids); /* get block quota for parent */ - 
lquota_chkquota(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], 1, - &rec_pending, NULL, LQUOTA_FLAGS_BLK); + lquota_chkquota(mds_quota_interface_ref, obd, exp, + qids, rec_pending, 1, NULL, + LQUOTA_FLAGS_BLK, data, 1); } } #endif - mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); + dlh = mdd_pdo_write_lock(env, mdd_tobj, name, MOR_TGT_CHILD); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -633,7 +793,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, if (rc) GOTO(out_unlock, rc); - __mdd_ref_add(env, mdd_sobj, handle); + mdo_ref_add(env, mdd_sobj, handle); LASSERT(ma->ma_attr.la_valid & LA_CTIME); la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime; @@ -645,19 +805,26 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, la->la_valid = LA_CTIME; rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0); + if (rc == 0) { + mdd_links_add(env, mdd_sobj, + mdo2fid(mdd_tobj), lname, handle, 0); + } + EXIT; out_unlock: mdd_write_unlock(env, mdd_sobj); mdd_pdo_write_unlock(env, mdd_tobj, dlh); out_trans: + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj, + mdd_tobj, NULL, lname, handle); +stop: mdd_trans_stop(env, mdd, rc, handle); out_pending: #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) { - if (rec_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], - 1, 1); + lquota_pending_commit(mds_quota_interface_ref, obd, + qids, rec_pending, 1); /* Trigger dqacq for the parent owner. If failed, * the next call for lquota_chkquota will process it. 
*/ lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc, @@ -667,6 +834,20 @@ out_pending: return rc; } +int mdd_declare_finish_unlink(const struct lu_env *env, + struct mdd_object *obj, + struct md_attr *ma, + struct thandle *handle) +{ + int rc; + + rc = orph_declare_index_insert(env, obj, handle); + if (rc) + return rc; + + return mdd_declare_object_kill(env, obj, ma, handle); +} + /* caller should take a lock before calling */ int mdd_finish_unlink(const struct lu_env *env, struct mdd_object *obj, struct md_attr *ma, @@ -674,25 +855,40 @@ int mdd_finish_unlink(const struct lu_env *env, { int rc; int reset = 1; + int is_dir = S_ISDIR(ma->ma_attr.la_mode); ENTRY; - rc = mdd_iattr_get(env, obj, ma); - if (rc == 0 && ma->ma_attr.la_nlink == 0) { + LASSERT(mdd_write_locked(env, obj) != 0); + + /* read HSM flags, needed to set changelogs flags */ + ma->ma_need = MA_HSM | MA_INODE; + rc = mdd_attr_get_internal(env, obj, ma); + if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_dir)) { + obj->mod_flags |= DEAD_OBJ; /* add new orphan and the object * will be deleted during mdd_close() */ if (obj->mod_count) { rc = __mdd_orphan_add(env, obj, th); if (rc == 0) - obj->mod_flags |= ORPHAN_OBJ; - } - - obj->mod_flags |= DEAD_OBJ; - if (!(obj->mod_flags & ORPHAN_OBJ)) { - rc = mdd_object_kill(env, obj, ma); + CDEBUG(D_HA, "Object "DFID" is inserted into " + "orphan list, open count = %d\n", + PFID(mdd_object_fid(obj)), + obj->mod_count); + else + CERROR("Object "DFID" fail to be an orphan, " + "open count = %d, maybe cause failed " + "open replay\n", + PFID(mdd_object_fid(obj)), + obj->mod_count); + } else { + rc = mdd_object_kill(env, obj, ma, th); if (rc == 0) reset = 0; } + /* get the i_nlink */ + ma->ma_need = MA_INODE; + rc = mdd_attr_get_internal(env, obj, ma); } if (reset) ma->ma_valid &= ~(MA_LOV | MA_COOKIE); @@ -715,6 +911,50 @@ int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj, RETURN(rc); } +static int mdd_declare_unlink(const struct 
lu_env *env, struct mdd_device *mdd, + struct mdd_object *p, struct mdd_object *c, + const struct lu_name *name, struct md_attr *ma, + struct thandle *handle) +{ + int rc; + + rc = mdo_declare_index_delete(env, p, name->ln_name, handle); + if (rc) + return rc; + + rc = mdo_declare_ref_del(env, p, handle); + if (rc) + return rc; + + rc = mdo_declare_ref_del(env, c, handle); + if (rc) + return rc; + + rc = mdo_declare_ref_del(env, c, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, p, NULL, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, c, NULL, handle); + if (rc) + return rc; + + rc = mdd_declare_finish_unlink(env, c, ma, handle); + if (rc) + return rc; + + rc = mdd_declare_links_add(env, c, handle); + if (rc) + return rc; + + rc = mdd_declare_changelog_store(env, mdd, name, handle); + + return rc; +} + static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, struct md_object *cobj, const struct lu_name *lname, struct md_attr *ma) @@ -733,27 +973,31 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, unsigned int qpids[MAXQUOTAS] = { 0, 0 }; int quota_opc = 0; #endif - int rc, is_dir; + int is_dir = S_ISDIR(ma->ma_attr.la_mode); + int rc; ENTRY; - LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n", - PFID(mdd_object_fid(mdd_cobj))); - - rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP); - if (rc) - RETURN(rc); + if (mdd_object_exists(mdd_cobj) <= 0) + RETURN(-ENOENT); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); + rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj, + lname, ma, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD); - is_dir = S_ISDIR(ma->ma_attr.la_mode); rc = 
mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma); if (rc) GOTO(cleanup, rc); @@ -763,10 +1007,10 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, if (rc) GOTO(cleanup, rc); - __mdd_ref_del(env, mdd_cobj, handle, 0); + mdo_ref_del(env, mdd_cobj, handle); if (is_dir) /* unlink dot */ - __mdd_ref_del(env, mdd_cobj, handle, 1); + mdo_ref_del(env, mdd_cobj, handle); LASSERT(ma->ma_attr.la_valid & LA_CTIME); la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime; @@ -776,10 +1020,14 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, if (rc) GOTO(cleanup, rc); - la->la_valid = LA_CTIME; - rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0); - if (rc) - GOTO(cleanup, rc); + if (ma->ma_attr.la_nlink > 0 || mdd_cobj->mod_count > 0) { + /* update ctime of an unlinked file only if it is still + * opened or a link still exists */ + la->la_valid = LA_CTIME; + rc = mdd_attr_check_set_internal(env, mdd_cobj, la, handle, 0); + if (rc) + GOTO(cleanup, rc); + } rc = mdd_finish_unlink(env, mdd_cobj, ma, handle); #ifdef HAVE_QUOTA_SUPPORT @@ -799,16 +1047,30 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, } } #endif + if (!is_dir) + /* old files may not have link ea; ignore errors */ + mdd_links_rename(env, mdd_cobj, mdo2fid(mdd_pobj), + lname, NULL, NULL, handle); - if (rc == 0) - obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp, - sizeof(KEY_UNLINKED), KEY_UNLINKED, 0, - NULL, NULL); EXIT; cleanup: mdd_write_unlock(env, mdd_cobj); mdd_pdo_write_unlock(env, mdd_pobj, dlh); out_trans: + if (rc == 0) { + int cl_flags; + + cl_flags = (ma->ma_attr.la_nlink == 0) ? CLF_UNLINK_LAST : 0; + if ((ma->ma_valid & MA_HSM) && + (ma->ma_hsm.mh_flags & HS_EXISTS)) + cl_flags |= CLF_UNLINK_HSM_EXISTS; + + rc = mdd_changelog_ns_store(env, mdd, + is_dir ? 
CL_RMDIR : CL_UNLINK, cl_flags, + mdd_cobj, mdd_pobj, NULL, lname, handle); + } + +stop: mdd_trans_stop(env, mdd, rc, handle); #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) @@ -856,14 +1118,19 @@ static int mdd_name_insert(const struct lu_env *env, #ifdef HAVE_QUOTA_SUPPORT struct md_ucred *uc = md_ucred(env); struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qids[MAXQUOTAS] = { 0, 0 }; - int quota_opc = 0, rec_pending = 0; + int quota_opc = 0, rec_pending[MAXQUOTAS] = { 0, 0 }; cfs_cap_t save = uc->mu_cap; #endif int rc; ENTRY; + /* XXX: this code won't be used ever: + * DNE uses slightly different approach */ + LBUG(); + #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { if (!(ma->ma_attr_flags & MDS_QUOTA_IGNORE)) { @@ -871,24 +1138,26 @@ static int mdd_name_insert(const struct lu_env *env, rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA); if (!rc) { + void *data = NULL; + mdd_data_get(env, mdd_obj, &data); quota_opc = FSFILT_OP_LINK; mdd_quota_wrapper(la_tmp, qids); /* get block quota for parent */ lquota_chkquota(mds_quota_interface_ref, obd, - qids[USRQUOTA], qids[GRPQUOTA], - 1, &rec_pending, NULL, - LQUOTA_FLAGS_BLK); + exp, qids, rec_pending, 1, NULL, + LQUOTA_FLAGS_BLK, data, 1); } } else { uc->mu_cap |= CFS_CAP_SYS_RESOURCE_MASK; } } #endif - mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP); - handle = mdd_trans_start(env, mdo2mdd(pobj)); + handle = mdd_trans_create(env, mdo2mdd(pobj)); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_trans_start(env, mdo2mdd(pobj), handle); + dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -923,10 +1192,8 @@ out_pending: #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { if (quota_opc) { - if (rec_pending) - lquota_pending_commit(mds_quota_interface_ref, - obd, qids[USRQUOTA], - qids[GRPQUOTA], 1, 1); + 
lquota_pending_commit(mds_quota_interface_ref, + obd, qids, rec_pending, 1); /* Trigger dqacq for the parent owner. If failed, * the next call for lquota_chkquota will process it*/ lquota_adjust(mds_quota_interface_ref, obd, 0, qids, @@ -980,6 +1247,10 @@ static int mdd_name_remove(const struct lu_env *env, int rc; ENTRY; + /* XXX: this code won't be used ever: + * DNE uses slightly different approach */ + LBUG(); + #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; @@ -991,11 +1262,12 @@ static int mdd_name_remove(const struct lu_env *env, } } #endif - mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_trans_start(env, mdd, handle); + dlh = mdd_pdo_write_lock(env, mdd_obj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -1059,12 +1331,13 @@ static int mdd_rt_sanity_check(const struct lu_env *env, * processed in cmr_rename_tgt before mdd_rename_tgt and enable * MDS_PERM_BYPASS. * So check may_delete, but not check nlink of tgt_pobj. 
*/ - LASSERT(tobj); + rc = mdd_may_delete(env, tgt_pobj, tobj, ma, 1, 1); RETURN(rc); } +/* Partial rename op on slave MDD */ static int mdd_rename_tgt(const struct lu_env *env, struct md_object *pobj, struct md_object *tobj, const struct lu_fid *lf, const struct lu_name *lname, @@ -1079,34 +1352,43 @@ static int mdd_rename_tgt(const struct lu_env *env, struct thandle *handle; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qcids[MAXQUOTAS] = { 0, 0 }; unsigned int qpids[MAXQUOTAS] = { 0, 0 }; - int quota_opc = 0, rec_pending = 0; + int quota_copc = 0, quota_popc = 0; + int rec_pending[MAXQUOTAS] = { 0, 0 }; #endif int rc; ENTRY; + /* XXX: this code won't be used ever: + * DNE uses slightly different approach */ + LBUG(); + #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota && !tobj) { struct lu_attr *la_tmp = &mdd_env_info(env)->mti_la; rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA); if (!rc) { - quota_opc = FSFILT_OP_LINK; + void *data = NULL; + mdd_data_get(env, mdd_tpobj, &data); + quota_popc = FSFILT_OP_LINK; mdd_quota_wrapper(la_tmp, qpids); /* get block quota for target parent */ - lquota_chkquota(mds_quota_interface_ref, obd, - qpids[USRQUOTA], qpids[GRPQUOTA], 1, - &rec_pending, NULL, LQUOTA_FLAGS_BLK); + lquota_chkquota(mds_quota_interface_ref, obd, exp, + qpids, rec_pending, 1, NULL, + LQUOTA_FLAGS_BLK, data, 1); } } #endif - mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_trans_start(env, mdd, handle); + dlh = mdd_pdo_write_lock(env, mdd_tpobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -1144,11 +1426,11 @@ static int mdd_rename_tgt(const struct lu_env *env, * it must be local one. 
*/ if (tobj && mdd_object_exists(mdd_tobj)) { - __mdd_ref_del(env, mdd_tobj, handle, 0); + mdo_ref_del(env, mdd_tobj, handle); /* Remove dot reference. */ if (S_ISDIR(ma->ma_attr.la_mode)) - __mdd_ref_del(env, mdd_tobj, handle, 1); + mdo_ref_del(env, mdd_tobj, handle); la->la_valid = LA_CTIME; rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0); @@ -1162,7 +1444,7 @@ static int mdd_rename_tgt(const struct lu_env *env, #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota && ma->ma_valid & MA_INODE && ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) { - quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + quota_copc = FSFILT_OP_UNLINK_PARTIAL_CHILD; mdd_quota_wrapper(&ma->ma_attr, qcids); } #endif @@ -1173,21 +1455,26 @@ cleanup: mdd_write_unlock(env, mdd_tobj); mdd_pdo_write_unlock(env, mdd_tpobj, dlh); out_trans: + if (rc == 0) + /* Bare EXT record with no RENAME in front of it signifies + a partial slave op */ + rc = mdd_changelog_ns_store(env, mdd, CL_EXT, 0, mdd_tobj, + mdd_tpobj, NULL, lname, handle); + mdd_trans_stop(env, mdd, rc, handle); out_pending: #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { - if (rec_pending) + if (quota_popc) lquota_pending_commit(mds_quota_interface_ref, obd, - qpids[USRQUOTA], - qpids[GRPQUOTA], - 1, 1); - if (quota_opc) - /* Trigger dqrel/dqacq on the target owner of child and - * parent. If failed, the next call for lquota_chkquota + qpids, rec_pending, 1); + + if (quota_copc) + /* Trigger dqrel on the target owner of child. + * If failed, the next call for lquota_chkquota * will process it. 
*/ - lquota_adjust(mds_quota_interface_ref, obd, qcids, - qpids, rc, quota_opc); + lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, + rc, quota_copc); } #endif return rc; @@ -1209,6 +1496,27 @@ static int mdd_cd_sanity_check(const struct lu_env *env, } +static int mdd_declare_create_data(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *obj, + int lmm_size, + struct thandle *handle) +{ + struct lu_buf *buf = &mdd_env_info(env)->mti_buf; + int rc; + + buf->lb_buf = NULL; + buf->lb_len = lmm_size; + rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV, + 0, handle); + if (rc) + return rc; + + rc = mdd_declare_lov_objid_update(env, mdd, handle); + + return rc; +} + static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, struct md_object *cobj, const struct md_op_spec *spec, struct md_attr *ma) @@ -1216,7 +1524,6 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, struct mdd_device *mdd = mdo2mdd(cobj); struct mdd_object *mdd_pobj = md2mdd_obj(pobj); struct mdd_object *son = md2mdd_obj(cobj); - struct lu_attr *attr = &ma->ma_attr; struct lov_mds_md *lmm = NULL; int lmm_size = 0; struct thandle *handle; @@ -1229,17 +1536,24 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, if (!md_should_create(spec->sp_cr_flags)) RETURN(0); + lmm_size = ma->ma_lmm_size; - rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, - spec, attr); + rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec, ma); if (rc) RETURN(rc); - mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_free, rc = PTR_ERR(handle)); + rc = mdd_declare_create_data(env, mdd, son, lmm_size, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); + /* * XXX: Setting the lov ea is not locked but setting the attr is locked? 
* Should this be fixed? @@ -1265,13 +1579,17 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, if (rc == 0) mdd_lov_objid_update(mdd, lmm); +stop: mdd_trans_stop(env, mdd, rc, handle); out_free: /* Finish mdd_lov_create() stuff. */ + /* if no_create == 0 (not replay), we free lmm allocated by + * mdd_lov_create() */ mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec); RETURN(rc); } +/* Get fid from name and parent */ static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask) @@ -1281,7 +1599,6 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, struct mdd_object *mdd_obj = md2mdd_obj(pobj); struct mdd_device *m = mdo2mdd(pobj); struct dt_object *dir = mdd_object_child(mdd_obj); - struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack; int rc; ENTRY; @@ -1294,7 +1611,7 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, else if (unlikely(rc < 0)) { CERROR("Object "DFID" locates on remote server\n", PFID(mdo2fid(mdd_obj))); - LBUG(); + RETURN(-EINVAL); } /* The common filename length check. 
*/ @@ -1308,11 +1625,12 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, if (likely(S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir))) { + rc = dir->do_index_ops->dio_lookup(env, dir, - (struct dt_rec *)pack, key, + (struct dt_rec *)fid, key, mdd_object_capa(env, mdd_obj)); if (rc > 0) - rc = fid_unpack(pack, fid); + rc = 0; else if (rc == 0) rc = -ENOENT; } else @@ -1321,9 +1639,30 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, RETURN(rc); } +int mdd_declare_object_initialize(const struct lu_env *env, + struct mdd_object *child, + struct md_attr *ma, + struct thandle *handle) +{ + int rc; + + rc = mdo_declare_attr_set(env, child, &ma->ma_attr, handle); + if (rc == 0 && S_ISDIR(ma->ma_attr.la_mode)) { + rc = mdo_declare_index_insert(env, child, mdo2fid(child), + dot, handle); + if (rc == 0) + rc = mdo_declare_ref_add(env, child, handle); + } + if (rc == 0) + mdd_declare_links_add(env, child, handle); + + return rc; +} + int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, - struct mdd_object *child, struct md_attr *ma, - struct thandle *handle, const struct md_op_spec *spec) + const struct lu_name *lname, struct mdd_object *child, + struct md_attr *ma, struct thandle *handle, + const struct md_op_spec *spec) { int rc; ENTRY; @@ -1342,24 +1681,19 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, if (S_ISDIR(ma->ma_attr.la_mode)) { /* Add "." and ".." 
for newly created dir */ - __mdd_ref_add(env, child, handle); + mdo_ref_add(env, child, handle); rc = __mdd_index_insert_only(env, child, mdo2fid(child), dot, handle, BYPASS_CAPA); - if (rc == 0) { + if (rc == 0) rc = __mdd_index_insert_only(env, child, pfid, dotdot, handle, BYPASS_CAPA); - if (rc != 0) { - int rc2; - - rc2 = __mdd_index_delete(env, child, dot, 1, - handle, BYPASS_CAPA); - if (rc2 != 0) - CERROR("Failure to cleanup after dotdot" - " creation: %d (%d)\n", rc2, rc); - } - } + if (rc != 0) + mdo_ref_del(env, child, handle); } + if (rc == 0) + mdd_links_add(env, child, pfid, lname, handle, 1); + RETURN(rc); } @@ -1447,6 +1781,76 @@ static int mdd_create_sanity_check(const struct lu_env *env, RETURN(rc); } +static int mdd_declare_create(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *p, + struct mdd_object *c, + const struct lu_name *name, + struct md_attr *ma, + int lmm_size, + struct thandle *handle, + const struct md_op_spec *spec) +{ + struct lu_buf *buf = &mdd_env_info(env)->mti_buf; + int rc = 0; + + rc = mdd_declare_object_create_internal(env, p, c, ma, handle, spec); + if (rc) + GOTO(out, rc); + + /* if dir, then can inherit default ACl */ + buf->lb_buf = NULL; + buf->lb_len = lmm_size; + if (S_ISDIR(ma->ma_attr.la_mode)) { + rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_ACL_DEFAULT, + 0, handle); + if (rc == 0) + rc = mdo_declare_ref_add(env, p, handle); + } + if (rc) + GOTO(out, rc); + + rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_ACL_ACCESS, + 0, handle); + if (rc) + GOTO(out, rc); + + rc = mdd_declare_object_initialize(env, c, ma, handle); + if (rc) + GOTO(out, rc); + + rc = mdo_declare_index_insert(env, p, mdo2fid(c), + name->ln_name, handle); + if (rc) + GOTO(out, rc); + + rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, + 0, handle); + if (rc) + GOTO(out, rc); + + if (S_ISLNK(ma->ma_attr.la_mode)) { + rc = dt_declare_record_write(env, mdd_object_child(c), + strlen(spec->u.sp_symname), 0, + 
handle); + if (rc) + GOTO(out, rc); + } + rc = mdo_declare_attr_set(env, p, &ma->ma_attr, handle); + if (rc) + return rc; + + rc = mdd_declare_changelog_store(env, mdd, name, handle); + if (rc) + return rc; + + rc = mdd_declare_lov_objid_update(env, mdd, handle); + +out: + return rc; +} + + /* * Create object and insert it into namespace. */ @@ -1472,11 +1876,14 @@ static int mdd_create(const struct lu_env *env, int got_def_acl = 0; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qcids[MAXQUOTAS] = { 0, 0 }; unsigned int qpids[MAXQUOTAS] = { 0, 0 }; int quota_opc = 0, block_count = 0; - int inode_pending = 0, block_pending = 0, parent_pending = 0; + int inode_pending[MAXQUOTAS] = { 0, 0 }; + int block_pending[MAXQUOTAS] = { 0, 0 }; + int parent_pending[MAXQUOTAS] = { 0, 0 }; #endif ENTRY; @@ -1533,9 +1940,9 @@ static int mdd_create(const struct lu_env *env, mdd_quota_wrapper(&ma->ma_attr, qcids); mdd_quota_wrapper(la_tmp, qpids); /* get file quota for child */ - lquota_chkquota(mds_quota_interface_ref, obd, - qcids[USRQUOTA], qcids[GRPQUOTA], 1, - &inode_pending, NULL, 0); + lquota_chkquota(mds_quota_interface_ref, obd, exp, + qcids, inode_pending, 1, NULL, 0, NULL, + 0); switch (ma->ma_attr.la_mode & S_IFMT) { case S_IFLNK: case S_IFDIR: @@ -1553,15 +1960,14 @@ static int mdd_create(const struct lu_env *env, /* get block quota for child and parent */ if (block_count) lquota_chkquota(mds_quota_interface_ref, obd, - qcids[USRQUOTA], qcids[GRPQUOTA], - block_count, - &block_pending, NULL, - LQUOTA_FLAGS_BLK); + exp, qcids, block_pending, + block_count, NULL, + LQUOTA_FLAGS_BLK, NULL, 0); if (!same) lquota_chkquota(mds_quota_interface_ref, obd, - qpids[USRQUOTA], qpids[GRPQUOTA], 1, - &parent_pending, NULL, - LQUOTA_FLAGS_BLK); + exp, qpids, parent_pending, 1, + NULL, LQUOTA_FLAGS_BLK, NULL, + 0); } } #endif @@ -1571,8 +1977,9 @@ static int 
mdd_create(const struct lu_env *env, * first. */ if (S_ISREG(attr->la_mode)) { + lmm_size = ma->ma_lmm_size; rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, - spec, attr); + spec, ma); if (rc) GOTO(out_pending, rc); } @@ -1592,11 +1999,19 @@ static int mdd_create(const struct lu_env *env, got_def_acl = 1; } - mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP); - handle = mdd_trans_start(env, mdd); + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_free, rc = PTR_ERR(handle)); + rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, ma, + lmm_size, handle, spec); + if (rc) + GOTO(out_stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(out_stop, rc); + dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT); if (dlh == NULL) GOTO(out_trans, rc = -ENOMEM); @@ -1626,7 +2041,7 @@ static int mdd_create(const struct lu_env *env, } #endif - rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), + rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname, son, ma, handle, spec); mdd_write_unlock(env, son); if (rc) @@ -1704,9 +2119,9 @@ cleanup: if (rc2 == 0) { mdd_write_lock(env, son, MOR_TGT_CHILD); - __mdd_ref_del(env, son, handle, 0); + mdo_ref_del(env, son, handle); if (initialized && S_ISDIR(attr->la_mode)) - __mdd_ref_del(env, son, handle, 1); + mdo_ref_del(env, son, handle); mdd_write_unlock(env, son); } } @@ -1717,25 +2132,26 @@ cleanup: mdd_pdo_write_unlock(env, mdd_pobj, dlh); out_trans: + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, + S_ISDIR(attr->la_mode) ? CL_MKDIR : + S_ISREG(attr->la_mode) ? CL_CREATE : + S_ISLNK(attr->la_mode) ? 
CL_SOFTLINK : CL_MKNOD, + 0, son, mdd_pobj, NULL, lname, handle); +out_stop: mdd_trans_stop(env, mdd, rc, handle); out_free: - /* finis lov_create stuff, free all temporary data */ + /* finish lov_create stuff, free all temporary data */ mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec); out_pending: #ifdef HAVE_QUOTA_SUPPORT if (quota_opc) { - if (inode_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qcids[USRQUOTA], qcids[GRPQUOTA], - 1, 0); - if (block_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qcids[USRQUOTA], qcids[GRPQUOTA], - block_count, 1); - if (parent_pending) - lquota_pending_commit(mds_quota_interface_ref, obd, - qpids[USRQUOTA], qpids[GRPQUOTA], - 1, 1); + lquota_pending_commit(mds_quota_interface_ref, obd, qcids, + inode_pending, 0); + lquota_pending_commit(mds_quota_interface_ref, obd, qcids, + block_pending, 1); + lquota_pending_commit(mds_quota_interface_ref, obd, qpids, + parent_pending, 1); /* Trigger dqacq on the owner of child and parent. If failed, * the next call for lquota_chkquota will process it. */ lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc, @@ -1804,6 +2220,7 @@ static int mdd_rename_sanity_check(const struct lu_env *env, * the other case has been processed in cml_rename * before mdd_rename and enable MDS_PERM_BYPASS. 
*/ LASSERT(sobj); + rc = mdd_may_delete(env, src_pobj, sobj, ma, 1, 0); if (rc) RETURN(rc); @@ -1827,6 +2244,124 @@ static int mdd_rename_sanity_check(const struct lu_env *env, RETURN(rc); } +static int mdd_declare_rename(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *mdd_spobj, + struct mdd_object *mdd_tpobj, + struct mdd_object *mdd_sobj, + struct mdd_object *mdd_tobj, + const struct lu_name *sname, + const struct lu_name *tname, + struct md_attr *ma, + struct thandle *handle) +{ + int rc; + + LASSERT(mdd_spobj); + LASSERT(mdd_tpobj); + LASSERT(mdd_sobj); + + /* name from source dir */ + rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle); + if (rc) + return rc; + + /* .. from source child */ + if (S_ISDIR(mdd_object_type(mdd_sobj))) { + /* source child can be directory, + * counted by source dir's nlink */ + rc = mdo_declare_ref_del(env, mdd_spobj, handle); + if (rc) + return rc; + + rc = mdo_declare_index_delete(env, mdd_sobj, dotdot, handle); + if (rc) + return rc; + + rc = mdo_declare_index_insert(env, mdd_sobj, mdo2fid(mdd_tpobj), + dotdot, handle); + if (rc) + return rc; + + /* new target child can be directory, + * counted by target dir's nlink */ + rc = mdo_declare_ref_add(env, mdd_tpobj, handle); + if (rc) + return rc; + + } + + rc = mdo_declare_attr_set(env, mdd_spobj, NULL, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, mdd_sobj, NULL, handle); + if (rc) + return rc; + mdd_declare_links_add(env, mdd_sobj, handle); + if (rc) + return rc; + + rc = mdo_declare_attr_set(env, mdd_tpobj, NULL, handle); + if (rc) + return rc; + + /* new name */ + rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj), + tname->ln_name, handle); + if (rc) + return rc; + + /* name from target dir (old name), we declare it unconditionally + * as mdd_rename() calls delete unconditionally as well. so just + * to balance declarations vs calls to change ... 
*/ + rc = mdo_declare_index_delete(env, mdd_tpobj, tname->ln_name, handle); + if (rc) + return rc; + + if (mdd_tobj && mdd_object_exists(mdd_tobj)) { + /* delete target child in target parent directory */ + rc = mdo_declare_ref_del(env, mdd_tobj, handle); + if (rc) + return rc; + + if (S_ISDIR(mdd_object_type(mdd_tobj))) { + /* target child can be directory, + * delete "." reference in target child directory */ + rc = mdo_declare_ref_del(env, mdd_tobj, handle); + if (rc) + return rc; + + /* delete ".." reference in target parent directory */ + rc = mdo_declare_ref_del(env, mdd_tpobj, handle); + if (rc) + return rc; + } + + rc = mdo_declare_attr_set(env, mdd_tobj, NULL, handle); + if (rc) + return rc; + + mdd_declare_links_add(env, mdd_tobj, handle); + if (rc) + return rc; + + rc = mdd_declare_finish_unlink(env, mdd_tobj, ma, handle); + if (rc) + return rc; + } + + rc = mdd_declare_changelog_store(env, mdd, tname, handle); + if (rc) + return rc; + + rc = mdd_declare_changelog_store(env, mdd, sname, handle); + if (rc) + return rc; + + return rc; +} + /* src object can be remote that is why we use only fid and type of object */ static int mdd_rename(const struct lu_env *env, struct md_object *src_pobj, struct md_object *tgt_pobj, @@ -1837,24 +2372,27 @@ static int mdd_rename(const struct lu_env *env, const char *sname = lsname->ln_name; const char *tname = ltname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; - struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); + struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */ struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj); struct mdd_device *mdd = mdo2mdd(src_pobj); - struct mdd_object *mdd_sobj = NULL; + struct mdd_object *mdd_sobj = NULL; /* source object */ struct mdd_object *mdd_tobj = NULL; struct dynlock_handle *sdlh, *tdlh; struct thandle *handle; const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj); + const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj); int is_dir; - int rc; 
+ int rc, rc2; #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; + struct obd_export *exp = md_quota(env)->mq_exp; struct mds_obd *mds = &obd->u.mds; unsigned int qspids[MAXQUOTAS] = { 0, 0 }; unsigned int qtcids[MAXQUOTAS] = { 0, 0 }; unsigned int qtpids[MAXQUOTAS] = { 0, 0 }; - int quota_opc = 0, rec_pending = 0; + int quota_copc = 0, quota_popc = 0; + int rec_pending[MAXQUOTAS] = { 0, 0 }; #endif ENTRY; @@ -1875,24 +2413,36 @@ static int mdd_rename(const struct lu_env *env, rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA); if (!rc) { - quota_opc = FSFILT_OP_LINK; + void *data = NULL; + mdd_data_get(env, mdd_tpobj, &data); + quota_popc = FSFILT_OP_LINK; mdd_quota_wrapper(la_tmp, qtpids); /* get block quota for target parent */ lquota_chkquota(mds_quota_interface_ref, - obd, qtpids[USRQUOTA], - qtpids[GRPQUOTA], 1, - &rec_pending, NULL, - LQUOTA_FLAGS_BLK); + obd, exp, qtpids, + rec_pending, 1, NULL, + LQUOTA_FLAGS_BLK, + data, 1); } } } } #endif - mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP); - handle = mdd_trans_start(env, mdd); + mdd_sobj = mdd_object_find(env, mdd, lf); + + handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) GOTO(out_pending, rc = PTR_ERR(handle)); + rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj, + mdd_tobj, lsname, ltname, ma, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); + /* FIXME: Should consider tobj and sobj too in rename_lock. 
*/ rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj); if (rc < 0) @@ -1918,44 +2468,50 @@ static int mdd_rename(const struct lu_env *env, if (sdlh == NULL || tdlh == NULL) GOTO(cleanup, rc = -ENOMEM); - mdd_sobj = mdd_object_find(env, mdd, lf); rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj, mdd_sobj, mdd_tobj, ma); if (rc) GOTO(cleanup, rc); + /* Remove source name from source directory */ rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle, mdd_object_capa(env, mdd_spobj)); if (rc) GOTO(cleanup, rc); /* "mv dir1 dir2" needs "dir1/.." link update */ - if (is_dir) { - rc = __mdd_index_delete(env, mdd_sobj, dotdot, is_dir, handle, - mdd_object_capa(env, mdd_spobj)); + if (is_dir && mdd_sobj && !lu_fid_eq(spobj_fid, tpobj_fid)) { + rc = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle, + mdd_object_capa(env, mdd_sobj)); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_spobj2, rc); - rc = __mdd_index_insert(env, mdd_sobj, tpobj_fid, dotdot, - is_dir, handle, - mdd_object_capa(env, mdd_tpobj)); + rc = __mdd_index_insert_only(env, mdd_sobj, tpobj_fid, dotdot, + handle, mdd_object_capa(env, mdd_sobj)); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_spobj, rc); } - /* + /* Remove target name from target directory * Here tobj can be remote one, so we do index_delete unconditionally * and -ENOENT is allowed. 
*/ rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle, mdd_object_capa(env, mdd_tpobj)); - if (rc != 0 && rc != -ENOENT) - GOTO(cleanup, rc); + if (rc != 0) { + if (mdd_tobj) { + /* tname might have been renamed to something else */ + GOTO(fixup_spobj, rc); + } + if (rc != -ENOENT) + GOTO(fixup_spobj, rc); + } + /* Insert new fid with target name into target dir */ rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle, mdd_object_capa(env, mdd_tpobj)); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_tpobj, rc); LASSERT(ma->ma_attr.la_valid & LA_CTIME); la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime; @@ -1966,36 +2522,43 @@ static int mdd_rename(const struct lu_env *env, rc = mdd_attr_check_set_internal_locked(env, mdd_sobj, la, handle, 0); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_tpobj, rc); } - /* + /* Remove old target object * For tobj is remote case cmm layer has processed * and set tobj to NULL then. So when tobj is NOT NULL, * it must be local one. */ if (tobj && mdd_object_exists(mdd_tobj)) { mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD); - __mdd_ref_del(env, mdd_tobj, handle, 0); + if (mdd_is_dead_obj(mdd_tobj)) { + mdd_write_unlock(env, mdd_tobj); + /* should not be dead, something is wrong */ + CERROR("tobj is dead, something is wrong\n"); + rc = -EINVAL; + goto cleanup; + } + mdo_ref_del(env, mdd_tobj, handle); /* Remove dot reference. 
*/ if (is_dir) - __mdd_ref_del(env, mdd_tobj, handle, 1); + mdo_ref_del(env, mdd_tobj, handle); la->la_valid = LA_CTIME; rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_tpobj, rc); rc = mdd_finish_unlink(env, mdd_tobj, ma, handle); mdd_write_unlock(env, mdd_tobj); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_tpobj, rc); #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota && ma->ma_valid & MA_INODE && ma->ma_attr.la_nlink == 0 && mdd_tobj->mod_count == 0) { - quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + quota_copc = FSFILT_OP_UNLINK_PARTIAL_CHILD; mdd_quota_wrapper(&ma->ma_attr, qtcids); } #endif @@ -2004,7 +2567,7 @@ static int mdd_rename(const struct lu_env *env, la->la_valid = LA_CTIME | LA_MTIME; rc = mdd_attr_check_set_internal_locked(env, mdd_spobj, la, handle, 0); if (rc) - GOTO(cleanup, rc); + GOTO(fixup_tpobj, rc); if (mdd_spobj != mdd_tpobj) { la->la_valid = LA_CTIME | LA_MTIME; @@ -2012,40 +2575,392 @@ static int mdd_rename(const struct lu_env *env, handle, 0); } + if (rc == 0 && mdd_sobj) { + mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD); + rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname, + mdo2fid(mdd_tpobj), ltname, handle); + if (rc == -ENOENT) + /* Old files might not have EA entry */ + mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj), + lsname, handle, 0); + mdd_write_unlock(env, mdd_sobj); + /* We don't fail the transaction if the link ea can't be + updated -- fid2path will use alternate lookup method. 
*/ + rc = 0; + } + EXIT; + +fixup_tpobj: + if (rc) { + rc2 = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle, + BYPASS_CAPA); + if (rc2) + CWARN("tp obj fix error %d\n",rc2); + + if (mdd_tobj && mdd_object_exists(mdd_tobj) && + !mdd_is_dead_obj(mdd_tobj)) { + rc2 = __mdd_index_insert(env, mdd_tpobj, + mdo2fid(mdd_tobj), tname, + is_dir, handle, + BYPASS_CAPA); + + if (rc2) + CWARN("tp obj fix error %d\n",rc2); + } + } + +fixup_spobj: + if (rc && is_dir && mdd_sobj) { + rc2 = __mdd_index_delete_only(env, mdd_sobj, dotdot, handle, + BYPASS_CAPA); + + if (rc2) + CWARN("sp obj dotdot delete error %d\n",rc2); + + + rc2 = __mdd_index_insert_only(env, mdd_sobj, spobj_fid, + dotdot, handle, BYPASS_CAPA); + if (rc2) + CWARN("sp obj dotdot insert error %d\n",rc2); + } + +fixup_spobj2: + if (rc) { + rc2 = __mdd_index_insert(env, mdd_spobj, + lf, sname, is_dir, handle, BYPASS_CAPA); + if (rc2) + CWARN("sp obj fix error %d\n",rc2); + } cleanup: if (likely(tdlh) && sdlh != tdlh) mdd_pdo_write_unlock(env, mdd_tpobj, tdlh); if (likely(sdlh)) mdd_pdo_write_unlock(env, mdd_spobj, sdlh); cleanup_unlocked: + if (rc == 0) + rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, 0, mdd_tobj, + mdd_spobj, lf, lsname, handle); + if (rc == 0) { + struct lu_fid zero_fid; + fid_zero(&zero_fid); + /* If the rename target exist, The CL_EXT record should save + * the target fid as tfid, otherwise, use zero fid. LU-543 */ + rc = mdd_changelog_ns_store(env, mdd, CL_EXT, 0, mdd_tobj, + mdd_tpobj, + mdd_tobj ? NULL : &zero_fid, + ltname, handle); + } + +stop: mdd_trans_stop(env, mdd, rc, handle); if (mdd_sobj) mdd_object_put(env, mdd_sobj); out_pending: #ifdef HAVE_QUOTA_SUPPORT if (mds->mds_quota) { - if (rec_pending) + if (quota_popc) lquota_pending_commit(mds_quota_interface_ref, obd, - qtpids[USRQUOTA], - qtpids[GRPQUOTA], - 1, 1); - /* Trigger dqrel on the source owner of parent. - * If failed, the next call for lquota_chkquota will - * process it. 
*/ - lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc, - FSFILT_OP_UNLINK_PARTIAL_PARENT); - if (quota_opc) - /* Trigger dqrel/dqacq on the target owner of child and - * parent. If failed, the next call for lquota_chkquota + qtpids, rec_pending, 1); + + if (quota_copc) { + /* Trigger dqrel on the source owner of parent. + * If failed, the next call for lquota_chkquota will + * process it. */ + lquota_adjust(mds_quota_interface_ref, obd, 0, qspids, rc, + FSFILT_OP_UNLINK_PARTIAL_PARENT); + + /* Trigger dqrel on the target owner of child. + * If failed, the next call for lquota_chkquota * will process it. */ lquota_adjust(mds_quota_interface_ref, obd, qtcids, - qtpids, rc, quota_opc); + qtpids, rc, quota_copc); + } } #endif return rc; } +/** enable/disable storing of hardlink info */ +int mdd_linkea_enable = 1; +CFS_MODULE_PARM(mdd_linkea_enable, "d", int, 0644, + "record hardlink info in EAs"); + +/** Read the link EA into a temp buffer. + * Uses the name_buf since it is generally large. + * \retval IS_ERR err + * \retval ptr to \a lu_buf (always \a mti_big_buf) + */ +struct lu_buf *mdd_links_get(const struct lu_env *env, + struct mdd_object *mdd_obj) +{ + struct lu_buf *buf; + struct lustre_capa *capa; + struct link_ea_header *leh; + int rc; + + /* First try a small buf */ + buf = mdd_buf_alloc(env, CFS_PAGE_SIZE); + if (buf->lb_buf == NULL) + return ERR_PTR(-ENOMEM); + + capa = mdd_object_capa(env, mdd_obj); + rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); + if (rc == -ERANGE) { + /* Buf was too small, figure out what we need. 
*/ + mdd_buf_put(buf); + rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); + if (rc < 0) + return ERR_PTR(rc); + buf = mdd_buf_alloc(env, rc); + if (buf->lb_buf == NULL) + return ERR_PTR(-ENOMEM); + rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa); + } + if (rc < 0) + return ERR_PTR(rc); + + leh = buf->lb_buf; + if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) { + leh->leh_magic = LINK_EA_MAGIC; + leh->leh_reccount = __swab32(leh->leh_reccount); + leh->leh_len = __swab64(leh->leh_len); + /* entries are swabbed by mdd_lee_unpack */ + } + if (leh->leh_magic != LINK_EA_MAGIC) + return ERR_PTR(-EINVAL); + if (leh->leh_reccount == 0) + return ERR_PTR(-ENODATA); + + return buf; +} + +/** Pack a link_ea_entry. + * All elements are stored as chars to avoid alignment issues. + * Numbers are always big-endian + * \retval record length + */ +static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname, + const struct lu_fid *pfid) +{ + struct lu_fid tmpfid; + int reclen; + + fid_cpu_to_be(&tmpfid, pfid); + memcpy(&lee->lee_parent_fid, &tmpfid, sizeof(tmpfid)); + memcpy(lee->lee_name, lname->ln_name, lname->ln_namelen); + reclen = sizeof(struct link_ea_entry) + lname->ln_namelen; + + lee->lee_reclen[0] = (reclen >> 8) & 0xff; + lee->lee_reclen[1] = reclen & 0xff; + return reclen; +} + +void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen, + struct lu_name *lname, struct lu_fid *pfid) +{ + *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1]; + memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid)); + fid_be_to_cpu(pfid, pfid); + lname->ln_name = lee->lee_name; + lname->ln_namelen = *reclen - sizeof(struct link_ea_entry); +} + +/** Add a record to the end of link ea buf */ +static int __mdd_links_add(const struct lu_env *env, struct lu_buf *buf, + const struct lu_fid *pfid, + const struct lu_name *lname) +{ + struct link_ea_header *leh; + struct link_ea_entry *lee; + int reclen; + + if (lname == NULL || pfid == NULL) + return 
-EINVAL; + + /* Make sure our buf is big enough for the new one */ + leh = buf->lb_buf; + reclen = lname->ln_namelen + sizeof(struct link_ea_entry); + if (leh->leh_len + reclen > buf->lb_len) { + if (mdd_buf_grow(env, leh->leh_len + reclen) < 0) + return -ENOMEM; + } + + leh = buf->lb_buf; + lee = buf->lb_buf + leh->leh_len; + reclen = mdd_lee_pack(lee, lname, pfid); + leh->leh_len += reclen; + leh->leh_reccount++; + return 0; +} + +static int mdd_declare_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + struct thandle *handle) +{ + int rc; + + /* XXX: max size? */ + rc = mdo_declare_xattr_set(env, mdd_obj, + mdd_buf_get_const(env, NULL, 4096), + XATTR_NAME_LINK, 0, handle); + + return rc; +} + +/* For pathologic linkers, we don't want to spend lots of time scanning the + * link ea. Limit ourselves to something reasonable; links not in the EA + * can be looked up via (slower) parent lookup. + */ +#define LINKEA_MAX_COUNT 128 + +static int mdd_links_add(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *pfid, + const struct lu_name *lname, + struct thandle *handle, int first) +{ + struct lu_buf *buf; + struct link_ea_header *leh; + int rc; + ENTRY; + + if (!mdd_linkea_enable) + RETURN(0); + + buf = first ? 
ERR_PTR(-ENODATA) : mdd_links_get(env, mdd_obj); + if (IS_ERR(buf)) { + rc = PTR_ERR(buf); + if (rc != -ENODATA) { + CERROR("link_ea read failed %d "DFID"\n", rc, + PFID(mdd_object_fid(mdd_obj))); + RETURN (rc); + } + /* empty EA; start one */ + buf = mdd_buf_alloc(env, CFS_PAGE_SIZE); + if (buf->lb_buf == NULL) + RETURN(-ENOMEM); + leh = buf->lb_buf; + leh->leh_magic = LINK_EA_MAGIC; + leh->leh_len = sizeof(struct link_ea_header); + leh->leh_reccount = 0; + } + + leh = buf->lb_buf; + if (leh->leh_reccount > LINKEA_MAX_COUNT) + RETURN(-EOVERFLOW); + + rc = __mdd_links_add(env, buf, pfid, lname); + if (rc) + RETURN(rc); + + leh = buf->lb_buf; + rc = __mdd_xattr_set(env, mdd_obj, + mdd_buf_get_const(env, buf->lb_buf, leh->leh_len), + XATTR_NAME_LINK, 0, handle); + if (rc) { + if (rc == -ENOSPC) + CDEBUG(D_INODE, "link_ea add failed %d "DFID"\n", rc, + PFID(mdd_object_fid(mdd_obj))); + else + CERROR("link_ea add failed %d "DFID"\n", rc, + PFID(mdd_object_fid(mdd_obj))); + } + + if (buf->lb_len > OBD_ALLOC_BIG) + /* if we vmalloced a large buffer drop it */ + mdd_buf_put(buf); + + RETURN (rc); +} + +static int mdd_links_rename(const struct lu_env *env, + struct mdd_object *mdd_obj, + const struct lu_fid *oldpfid, + const struct lu_name *oldlname, + const struct lu_fid *newpfid, + const struct lu_name *newlname, + struct thandle *handle) +{ + struct lu_buf *buf; + struct link_ea_header *leh; + struct link_ea_entry *lee; + struct lu_name *tmpname = &mdd_env_info(env)->mti_name; + struct lu_fid *tmpfid = &mdd_env_info(env)->mti_fid; + int reclen = 0; + int count; + int rc, rc2 = 0; + ENTRY; + + if (!mdd_linkea_enable) + RETURN(0); + + if (mdd_obj->mod_flags & DEAD_OBJ) + /* No more links, don't bother */ + RETURN(0); + + buf = mdd_links_get(env, mdd_obj); + if (IS_ERR(buf)) { + rc = PTR_ERR(buf); + if (rc == -ENODATA) + CDEBUG(D_INODE, "link_ea read failed %d "DFID"\n", + rc, PFID(mdd_object_fid(mdd_obj))); + else + CERROR("link_ea read failed %d "DFID"\n", + rc, 
PFID(mdd_object_fid(mdd_obj))); + RETURN(rc); + } + leh = buf->lb_buf; + lee = (struct link_ea_entry *)(leh + 1); /* link #0 */ + + /* Find the old record */ + for(count = 0; count < leh->leh_reccount; count++) { + mdd_lee_unpack(lee, &reclen, tmpname, tmpfid); + if (tmpname->ln_namelen == oldlname->ln_namelen && + lu_fid_eq(tmpfid, oldpfid) && + (strncmp(tmpname->ln_name, oldlname->ln_name, + tmpname->ln_namelen) == 0)) + break; + lee = (struct link_ea_entry *)((char *)lee + reclen); + } + if ((count + 1) > leh->leh_reccount) { + CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n", + oldlname->ln_namelen, oldlname->ln_name); + GOTO(out, rc = -ENOENT); + } + + /* Remove the old record */ + leh->leh_reccount--; + leh->leh_len -= reclen; + memmove(lee, (char *)lee + reclen, (char *)leh + leh->leh_len - + (char *)lee); + + /* If renaming, add the new record */ + if (newpfid != NULL) { + /* if the add fails, we still delete the out-of-date old link */ + rc2 = __mdd_links_add(env, buf, newpfid, newlname); + leh = buf->lb_buf; + } + + rc = __mdd_xattr_set(env, mdd_obj, + mdd_buf_get_const(env, buf->lb_buf, leh->leh_len), + XATTR_NAME_LINK, 0, handle); + +out: + if (rc == 0) + rc = rc2; + if (rc) + CDEBUG(D_INODE, "link_ea mv/unlink '%.*s' failed %d "DFID"\n", + oldlname->ln_namelen, oldlname->ln_name, rc, + PFID(mdd_object_fid(mdd_obj))); + + if (buf->lb_len > OBD_ALLOC_BIG) + /* if we vmalloced a large buffer drop it */ + mdd_buf_put(buf); + + RETURN (rc); +} + const struct md_dir_operations mdd_dir_ops = { .mdo_is_subdir = mdd_is_subdir, .mdo_lookup = mdd_lookup, @@ -2053,8 +2968,9 @@ const struct md_dir_operations mdd_dir_ops = { .mdo_rename = mdd_rename, .mdo_link = mdd_link, .mdo_unlink = mdd_unlink, + .mdo_lum_lmm_cmp = mdd_lum_lmm_cmp, .mdo_name_insert = mdd_name_insert, .mdo_name_remove = mdd_name_remove, .mdo_rename_tgt = mdd_rename_tgt, - .mdo_create_data = mdd_create_data + .mdo_create_data = mdd_create_data, };