int (*m_free_lustre_md)(struct obd_export *, struct lustre_md *);
- int (*m_merge_attr)(struct obd_export *,
+ int (*m_merge_attr)(struct obd_export *, const struct lu_fid *fid,
const struct lmv_stripe_md *lsm,
struct cl_attr *attr, ldlm_blocking_callback);
}
static inline int md_merge_attr(struct obd_export *exp,
+ const struct lu_fid *fid,
const struct lmv_stripe_md *lsm,
struct cl_attr *attr,
ldlm_blocking_callback cb)
if (rc)
return rc;
- return MDP(exp->exp_obd, merge_attr)(exp, lsm, attr, cb);
+ return MDP(exp->exp_obd, merge_attr)(exp, fid, lsm, attr, cb);
}
static inline int md_setxattr(struct obd_export *exp, const struct lu_fid *fid,
PFID(ll_inode2fid(inode)), inode, dentry->d_name.name);
/* Call getattr by fid, so do not provide name at all. */
- op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, 0, 0,
- LUSTRE_OPC_ANY, NULL);
+ op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode, inode,
+ NULL, 0, 0, LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
RETURN(0);
down_read(&lli->lli_lsm_sem);
- rc = md_merge_attr(ll_i2mdexp(inode), ll_i2info(inode)->lli_lsm_md,
+ rc = md_merge_attr(ll_i2mdexp(inode), &lli->lli_fid, lli->lli_lsm_md,
&attr, ll_md_blocking_ast);
up_read(&lli->lli_lsm_sem);
if (rc != 0)
GOTO(unlock, rc = -ENOMEM);
/* validate the lsm */
- rc = md_merge_attr(ll_i2mdexp(inode), lli->lli_lsm_md, attr,
- ll_md_blocking_ast);
+ rc = md_merge_attr(ll_i2mdexp(inode), &lli->lli_fid, lli->lli_lsm_md,
+ attr, ll_md_blocking_ast);
if (!rc) {
if (md->body->mbo_valid & OBD_MD_FLNLINK)
md->body->mbo_nlink = attr->cat_nlink;
}
int lmv_revalidate_slaves(struct obd_export *exp,
+ const struct lu_fid *pfid,
const struct lmv_stripe_md *lsm,
ldlm_blocking_callback cb_blocking,
int extra_lock_flags)
* which is not needed here.
*/
memset(op_data, 0, sizeof(*op_data));
- op_data->op_fid1 = fid;
+ op_data->op_fid1 = *pfid;
op_data->op_fid2 = fid;
tgt = lmv_tgt(lmv, lsm->lsm_md_oinfo[i].lmo_mds);
}
retry:
- tgt = lmv_locate_tgt(lmv, op_data);
+ if (op_data->op_name) {
+ tgt = lmv_locate_tgt(lmv, op_data);
+ if (!fid_is_sane(&op_data->op_fid2))
+ fid_zero(&op_data->op_fid2);
+ } else if (fid_is_sane(&op_data->op_fid2)) {
+ tgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
+ } else {
+ tgt = lmv_fid2tgt(lmv, &op_data->op_fid1);
+ }
if (IS_ERR(tgt))
RETURN(PTR_ERR(tgt));
- if (!fid_is_sane(&op_data->op_fid2))
- fid_zero(&op_data->op_fid2);
-
CDEBUG(D_INODE, "LOOKUP_INTENT with fid1="DFID", fid2="DFID
", name='%s' -> mds #%u\n",
PFID(&op_data->op_fid1), PFID(&op_data->op_fid2),
/* If RPC happens, lsm information will be revalidated
* during update_inode process (see ll_update_lsm_md) */
if (lmv_dir_striped(op_data->op_mea2)) {
- rc = lmv_revalidate_slaves(exp, op_data->op_mea2,
+ rc = lmv_revalidate_slaves(exp, &op_data->op_fid2,
+ op_data->op_mea2,
cb_blocking,
extra_lock_flags);
if (rc != 0)
struct lu_fid *fid, struct md_op_data *op_data);
int lmv_revalidate_slaves(struct obd_export *exp,
+ const struct lu_fid *pfid,
const struct lmv_stripe_md *lsm,
ldlm_blocking_callback cb_blocking,
int extra_lock_flags);
}
static int lmv_merge_attr(struct obd_export *exp,
+ const struct lu_fid *fid,
const struct lmv_stripe_md *lsm,
struct cl_attr *attr,
ldlm_blocking_callback cb_blocking)
if (!lmv_dir_striped(lsm))
return 0;
- rc = lmv_revalidate_slaves(exp, lsm, cb_blocking, 0);
+ rc = lmv_revalidate_slaves(exp, fid, lsm, cb_blocking, 0);
if (rc < 0)
return rc;
__u64 child_bits,
struct ldlm_reply *ldlm_rep)
{
- struct ptlrpc_request *req = mdt_info_req(info);
- struct mdt_body *reqbody = NULL;
- struct mdt_object *parent = info->mti_object;
- struct mdt_object *child;
- struct lu_fid *child_fid = &info->mti_tmp_fid1;
- struct lu_name *lname = NULL;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct mdt_body *reqbody = NULL;
+ struct mdt_object *parent = info->mti_object;
+ struct mdt_object *child = NULL;
+ struct lu_fid *child_fid = &info->mti_tmp_fid1;
+ struct lu_name *lname = NULL;
struct mdt_lock_handle *lhp = NULL;
- struct ldlm_lock *lock;
+ struct ldlm_lock *lock;
struct req_capsule *pill = info->mti_pill;
__u64 try_bits = 0;
bool is_resent;
mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
if (lu_name_is_valid(lname)) {
+ if (mdt_object_remote(parent)) {
+ CERROR("%s: parent "DFID" is on remote target\n",
+ mdt_obd_name(info->mti_mdt),
+ PFID(mdt_object_fid(parent)));
+ RETURN(-EPROTO);
+ }
+
CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
"ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
PNAME(lname), ldlm_rep);
RETURN(err_serious(-EPROTO));
*child_fid = reqbody->mbo_fid2;
-
if (unlikely(!fid_is_sane(child_fid)))
RETURN(err_serious(-EINVAL));
+ if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
+ mdt_object_get(info->mti_env, parent);
+ child = parent;
+ } else {
+ child = mdt_object_find(info->mti_env, info->mti_mdt,
+ child_fid);
+ if (IS_ERR(child))
+ RETURN(PTR_ERR(child));
+ }
+
+ if (mdt_object_remote(child)) {
+ CERROR("%s: child "DFID" is on remote target\n",
+ mdt_obd_name(info->mti_mdt),
+ PFID(mdt_object_fid(child)));
+ GOTO(out_child, rc = -EPROTO);
+ }
+
+ /* don't fetch LOOKUP lock if it's remote object */
+ rc = mdt_is_remote_object(info, parent, child);
+ if (rc < 0)
+ GOTO(out_child, rc);
+ if (rc)
+ child_bits &= ~MDS_INODELOCK_LOOKUP;
+
CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
"ldlm_rep = %p\n",
PFID(mdt_object_fid(parent)),
LU_OBJECT_DEBUG(D_INODE, info->mti_env,
&parent->mot_obj,
"Parent doesn't exist!");
- RETURN(-ESTALE);
- }
-
- if (mdt_object_remote(parent)) {
- CERROR("%s: parent "DFID" is on remote target\n",
- mdt_obd_name(info->mti_mdt),
- PFID(mdt_object_fid(parent)));
- RETURN(-EIO);
+ GOTO(out_child, rc = -ESTALE);
}
if (lu_name_is_valid(lname)) {
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
if (rc != 0)
- GOTO(out_parent, rc);
- }
-
- mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+ GOTO(unlock_parent, rc);
- /*
- *step 3: find the child object by fid & lock it.
- * regardless if it is local or remote.
- *
- *Note: LU-3240 (commit 762f2114d282a98ebfa4dbbeea9298a8088ad24e)
- * set parent dir fid the same as child fid in getattr by fid case
- * we should not lu_object_find() the object again, could lead
- * to hung if there is a concurrent unlink destroyed the object.
- */
- if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
- mdt_object_get(info->mti_env, parent);
- child = parent;
- } else {
child = mdt_object_find(info->mti_env, info->mti_mdt,
child_fid);
+ if (unlikely(IS_ERR(child)))
+ GOTO(unlock_parent, rc = PTR_ERR(child));
}
- if (unlikely(IS_ERR(child)))
- GOTO(out_parent, rc = PTR_ERR(child));
+ mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+
+ /* step 3: lock child regardless if it is local or remote. */
+ LASSERT(child);
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
if (!mdt_object_exists(child)) {
unlock_res_and_lock(lock);
}
LDLM_LOCK_PUT(lock);
- GOTO(out_parent, rc = 0);
+ GOTO(unlock_parent, rc = 0);
}
LDLM_LOCK_PUT(lock);
}
EXIT;
out_child:
- mdt_object_put(info->mti_env, child);
-out_parent:
+ if (child)
+ mdt_object_put(info->mti_env, child);
+unlock_parent:
if (lhp)
mdt_object_unlock(info, parent, lhp, 1);
return rc;
/* FLR: layout change API */
struct md_layout_change mti_mlc;
+
+ struct lu_seq_range mti_range;
};
extern struct lu_context_key mdt_thread_key;
struct mdt_object *o);
void mdt_restripe_update_add(struct mdt_thread_info *info,
struct mdt_object *o);
+int mdt_is_remote_object(struct mdt_thread_info *info,
+ struct mdt_object *parent,
+ struct mdt_object *child);
#endif /* _MDT_INTERNAL_H */
}
return rc;
}
+
+/* check whether two FIDs belong to different MDT. */
+static int mdt_fids_different_target(struct mdt_thread_info *info,
+ const struct lu_fid *fid1,
+ const struct lu_fid *fid2)
+{
+ const struct lu_env *env = info->mti_env;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct lu_seq_range *range = &info->mti_range;
+ struct seq_server_site *ss;
+ __u32 index1, index2;
+ int rc;
+
+ if (fid_seq(fid1) == fid_seq(fid2))
+ return 0;
+
+ ss = mdt->mdt_lu_dev.ld_site->ld_seq_site;
+
+ range->lsr_flags = LU_SEQ_RANGE_MDT;
+ rc = fld_server_lookup(env, ss->ss_server_fld, fid1->f_seq, range);
+ if (rc)
+ return rc;
+
+ index1 = range->lsr_index;
+
+ rc = fld_server_lookup(env, ss->ss_server_fld, fid2->f_seq, range);
+ if (rc)
+ return rc;
+
+ index2 = range->lsr_index;
+
+ return index1 != index2;
+}
+
+static bool mdt_object_is_shard(struct mdt_thread_info *info,
+ struct mdt_object *obj)
+{
+ struct lmv_mds_md_v1 *lmv = (struct lmv_mds_md_v1 *)info->mti_xattr_buf;
+ struct lu_buf buf;
+ int rc;
+
+ if (!mdt_object_exists(obj))
+ return false;
+
+ if (!S_ISDIR(lu_object_attr(&obj->mot_obj)))
+ return false;
+
+ buf.lb_buf = lmv;
+ buf.lb_len = sizeof(*lmv);
+ rc = mo_xattr_get(info->mti_env, mdt_object_child(obj), &buf,
+ XATTR_NAME_LMV);
+ if (rc < 0)
+ return false;
+
+ return lmv->lmv_magic == cpu_to_le32(LMV_MAGIC_STRIPE);
+}
+
+/**
+ * Check whether \a child is remote object on \a parent.
+ *
+ * \param[in] info thread environment
+ * \param[in] parent parent object, it's the same as child object in
+ * getattr_by_fid
+ * \param[in] child child object
+ *
+ * \retval 1 is remote object.
+ * \retval 0 isn't remote object.
+ * \retval < 1 error code
+ */
+int mdt_is_remote_object(struct mdt_thread_info *info,
+ struct mdt_object *parent,
+ struct mdt_object *child)
+{
+ struct lu_buf *buf = &info->mti_big_buf;
+ struct linkea_data ldata = { NULL };
+ struct link_ea_header *leh;
+ struct link_ea_entry *lee;
+ struct lu_name name;
+ struct lu_fid pfid;
+ int reclen;
+ int i;
+ int rc;
+
+ ENTRY;
+
+ if (fid_is_root(mdt_object_fid(child)))
+ RETURN(0);
+
+ if (likely(parent != child)) {
+ if (mdt_object_remote(parent) ^ mdt_object_remote(child)) {
+ /* don't treat shard as remote object, otherwise client
+ * need to revalidate shards all the time.
+ */
+ if (mdt_object_is_shard(info, child))
+ RETURN(0);
+ RETURN(1);
+ }
+
+ if (!mdt_object_remote(parent) && !mdt_object_remote(child))
+ RETURN(0);
+
+ rc = mdt_fids_different_target(info, mdt_object_fid(parent),
+ mdt_object_fid(child));
+ RETURN(rc);
+ }
+
+ /* client < 2.13.52 getattr_by_fid parent and child are the same */
+ buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+ if (!buf->lb_buf)
+ RETURN(-ENOMEM);
+
+ ldata.ld_buf = buf;
+ rc = mdt_links_read(info, child, &ldata);
+ /* can't read linkea, just assume it's remote object */
+ if (rc == -ENOENT || rc == -ENODATA)
+ RETURN(1);
+ if (rc)
+ RETURN(rc);
+
+ leh = buf->lb_buf;
+ lee = (struct link_ea_entry *)(leh + 1);
+ for (i = 0; i < leh->leh_reccount; i++) {
+ linkea_entry_unpack(lee, &reclen, &name, &pfid);
+ lee = (struct link_ea_entry *) ((char *)lee + reclen);
+ if (mdt_fids_different_target(info, &pfid,
+ mdt_object_fid(child)))
+ RETURN(1);
+ }
+
+ RETURN(0);
+}