return lu_object_exists(&dt->do_lu);
}
+static inline int dt_object_remote(const struct dt_object *dt)
+{
+ return lu_object_remote(&dt->do_lu);
+}
+
static inline struct dt_object *lu2dt_obj(struct lu_object *o)
{
LASSERT(ergo(o != NULL, lu_device_is_dt(o->lo_dev)));
/**
- * \retval 1 iff object \a o exists on stable storage,
- * \retval 0 iff object \a o not exists on stable storage.
- * \retval -1 iff object \a o is on remote server.
+ * Check whether object exists, no matter on local or remote storage.
+ * Note: LOHA_EXISTS will be set once some one created the object,
+ * and it does not needs to be committed to storage.
*/
-static inline int lu_object_exists(const struct lu_object *o)
-{
- __u32 attr;
-
- attr = o->lo_header->loh_attr;
- if (attr & LOHA_REMOTE)
- return -1;
- else if (attr & LOHA_EXISTS)
- return +1;
- else
- return 0;
-}
+#define lu_object_exists(o) ((o)->lo_header->loh_attr & LOHA_EXISTS)
+
+/**
+ * Check whether object on the remote storage.
+ */
+#define lu_object_remote(o) unlikely((o)->lo_header->loh_attr & LOHA_REMOTE)
static inline int lu_object_assert_exists(const struct lu_object *o)
{
- return lu_object_exists(o) != 0;
+ return lu_object_exists(o);
}
static inline int lu_object_assert_not_exists(const struct lu_object *o)
{
- return lu_object_exists(o) <= 0;
+ return !lu_object_exists(o);
}
/**
/* llog_osd.c */
extern struct llog_operations llog_osd_ops;
int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d,
- int idx, int count, struct llog_catid *idarray);
+ int idx, int count,
+ struct llog_catid *idarray);
int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
- int idx, int count, struct llog_catid *idarray);
+ int idx, int count,
+ struct llog_catid *idarray);
#define LLOG_CTXT_FLAG_UNINITIALIZED 0x00000001
#define LLOG_CTXT_FLAG_STOP 0x00000002
struct lmv_obd *lmv = &obd->u.lmv;
struct lmv_tgt_desc *tgt = NULL;
struct mdt_body *body;
- int rc;
+ int rc;
ENTRY;
rc = lmv_check_connect(obd);
body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
if (body == NULL)
RETURN(-EPROTO);
- /*
- * Not cross-ref case, just get out of here.
- */
+
+ /* Not cross-ref case, just get out of here. */
if (likely(!(body->valid & OBD_MD_MDS)))
RETURN(0);
- /* Clearly this is a remote object, try remote MDT */
+ CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
+ exp->exp_obd->obd_name, PFID(&body->fid1));
+
+ /* This is a remote object, try remote MDT, Note: it may
+ * try more than 1 time here, Considering following case
+ * /mnt/lustre is root on MDT0, remote1 is on MDT1
+ * 1. Initially A does not know where remote1 is, it send
+ * unlink RPC to MDT0, MDT0 return -EREMOTE, it will
+ * resend unlink RPC to MDT1 (retry 1st time).
+ *
+ * 2. During the unlink RPC in flight,
+ * client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
+ * and create new remote1, but on MDT0
+ *
+ * 3. MDT1 get unlink RPC(from A), then do remote lock on
+ * /mnt/lustre, then lookup get fid of remote1, and find
+ * it is remote dir again, and replay -EREMOTE again.
+ *
+ * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
+ *
+ * In theory, it might try unlimited time here, but it should
+ * be very rare case. */
op_data->op_fid2 = body->fid1;
+ ptlrpc_req_finished(*request);
+ *request = NULL;
+
goto retry;
}
* in case of late striping creation, ->ah_init()
* can be called with local object existing
*/
- if (!dt_object_exists(nextc))
+ if (!dt_object_exists(nextc) || dt_object_remote(nextc))
nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode);
if (S_ISDIR(child_mode)) {
LASSERT(dof);
LASSERT(attr);
LASSERT(th);
- LASSERT(!dt_object_exists(next));
/*
* first of all, we declare creation of local object
parent = mdd_object_find(env, mdd, pfid);
if (IS_ERR(parent)) {
GOTO(out, rc = PTR_ERR(parent));
- } else if (mdd_object_exists(parent) < 0) {
+ } else if (mdd_object_remote(parent)) {
/*FIXME: Because of the restriction of rename in Phase I.
* If the parent is remote, we just assumed lf is not the
* parent of P1 for now */
int rc = 0;
ENTRY;
- if (cobj && mdd_object_exists(cobj))
+ if (cobj && mdd_object_exists(cobj))
RETURN(-EEXIST);
if (mdd_is_dead_obj(pobj))
if (unlikely(mdd_is_dead_obj(mdd_obj)))
RETURN(-ESTALE);
- rc = mdd_object_exists(mdd_obj);
- if (unlikely(rc == 0))
- RETURN(-ESTALE);
- else if (unlikely(rc < 0)) {
- CERROR("Object "DFID" locates on remote server\n",
- PFID(mdo2fid(mdd_obj)));
- RETURN(-EINVAL);
- }
+ if (mdd_object_remote(mdd_obj)) {
+ CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
+ mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
+ } else if (!mdd_object_exists(mdd_obj)) {
+ RETURN(-ESTALE);
+ }
/* The common filename length check. */
if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
RETURN(rc);
}
-int mdd_declare_object_initialize(const struct lu_env *env,
- struct mdd_object *child,
- struct lu_attr *attr,
- struct thandle *handle)
+static int mdd_declare_object_initialize(const struct lu_env *env,
+ struct mdd_object *parent,
+ struct mdd_object *child,
+ struct lu_attr *attr,
+ struct thandle *handle)
{
int rc;
+ ENTRY;
/*
* inode mode has been set in creation time, and it's based on umask,
dot, handle);
if (rc == 0)
rc = mdo_declare_ref_add(env, child, handle);
+
+ rc = mdo_declare_index_insert(env, child, mdo2fid(parent),
+ dotdot, handle);
}
if (rc == 0)
mdd_declare_links_add(env, child, handle);
- return rc;
+ RETURN(rc);
}
int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
GOTO(out, rc);
}
- rc = mdd_declare_object_initialize(env, c, attr, handle);
+ rc = mdd_declare_object_initialize(env, p, c, attr, handle);
if (rc)
GOTO(out, rc);
return lu_object_exists(mdd2lu_obj(obj));
}
+static inline int mdd_object_remote(struct mdd_object *obj)
+{
+ return lu_object_remote(mdd2lu_obj(obj));
+}
+
static inline const struct lu_fid *mdd_object_fid(struct mdd_object *obj)
{
return lu_object_fid(mdd2lu_obj(obj));
* if the object doesn't exist yet, then it's supposed to be created
* and declaration of the creation should be enough to insert ./..
*/
- if (mdd_object_exists(obj)) {
+ /* FIXME: remote object should not be awared by MDD layer, but local
+ * creation does not declare insert ./.. (comments above), which
+ * is required by remote directory creation.
+ * This remote check should be removed when mdd_object_exists check is
+ * removed.
+ */
+ if (mdd_object_exists(obj) || mdd_object_remote(obj)) {
rc = -ENOTDIR;
if (dt_try_as_dir(env, next))
rc = dt_declare_insert(env, next,
static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
{
- if (lu_object_exists(o))
+ if (lu_object_exists(o))
return mdd_get_flags(env, lu2mdd_obj(o));
else
return 0;
const struct md_op_spec *spec)
{
struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
+ struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
const struct dt_index_features *feat = spec->sp_feat;
int rc;
ENTRY;
}
}
- rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle);
+ rc = mdo_declare_create_obj(env, c, attr, hint, dof, handle);
RETURN(rc);
}
ma->ma_valid = 0;
- rc = mdt_object_exists(o);
- if (rc < 0) {
- /* This object is located on remote node.*/
- repbody->fid1 = *mdt_object_fid(o);
- repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
- GOTO(out, rc = 0);
- }
+ if (mdt_object_remote(o)) {
+ /* This object is located on remote node.*/
+ repbody->fid1 = *mdt_object_fid(o);
+ repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
+ GOTO(out, rc = 0);
+ }
buffer->lb_len = reqbody->eadatasize;
if (buffer->lb_len > 0)
repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
- /*
- * We save last checked parent fid to @repbody->fid1 for remote
- * directory case.
- */
- LASSERT(fid_is_sane(&body->fid2));
- LASSERT(mdt_object_exists(o) > 0);
- rc = mdo_is_subdir(info->mti_env, mdt_object_child(o),
- &body->fid2, &repbody->fid1);
- if (rc == 0 || rc == -EREMOTE)
- repbody->valid |= OBD_MD_FLID;
+ /*
+ * We save last checked parent fid to @repbody->fid1 for remote
+ * directory case.
+ */
+ LASSERT(fid_is_sane(&body->fid2));
+ LASSERT(mdt_object_exists(o) && !mdt_object_remote(o));
+ rc = mdo_is_subdir(info->mti_env, mdt_object_child(o),
+ &body->fid2, &repbody->fid1);
+ if (rc == 0 || rc == -EREMOTE)
+ repbody->valid |= OBD_MD_FLID;
- RETURN(rc);
+ RETURN(rc);
}
static int mdt_raw_lookup(struct mdt_thread_info *info,
}
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
- rc = mdt_object_exists(parent);
- if (unlikely(rc == 0)) {
+ if (unlikely(!mdt_object_exists(parent))) {
LU_OBJECT_DEBUG(D_INODE, info->mti_env,
&parent->mot_obj.mo_lu,
"Parent doesn't exist!\n");
RETURN(-ESTALE);
} else if (!info->mti_cross_ref) {
- LASSERTF(rc > 0, "Parent "DFID" is on remote server\n",
+ LASSERTF(!mdt_object_remote(parent),
+ "Parent "DFID" is on remote server\n",
PFID(mdt_object_fid(parent)));
}
if (lname) {
mdt_lock_handle_init(lhc);
mdt_lock_reg_init(lhc, LCK_PR);
- if (mdt_object_exists(child) == 0) {
- LU_OBJECT_DEBUG(D_INODE, info->mti_env,
- &child->mot_obj.mo_lu,
- "Object doesn't exist!\n");
- GOTO(out_child, rc = -ENOENT);
- }
+ if (!mdt_object_exists(child)) {
+ LU_OBJECT_DEBUG(D_INODE, info->mti_env,
+ &child->mot_obj.mo_lu,
+ "Object doesn't exist!\n");
+ GOTO(out_child, rc = -ENOENT);
+ }
if (!(child_bits & MDS_INODELOCK_UPDATE) &&
- mdt_object_exists(child) > 0) {
+ mdt_object_exists(child) && !mdt_object_remote(child)) {
struct md_attr *ma = &info->mti_attr;
ma->ma_valid = 0;
(unsigned long)res_id->name[1],
(unsigned long)res_id->name[2],
PFID(mdt_object_fid(child)));
- if (mdt_object_exists(child) > 0)
+ if (mdt_object_exists(child) && !mdt_object_remote(child))
mdt_pack_size2body(info, child);
}
if (lock)
int rc = 0;
ENTRY;
- LASSERT(mdt_object_exists(o) < 0);
+ LASSERT(mdt_object_remote(o));
LASSERT((ibits & MDS_INODELOCK_UPDATE));
LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
LASSERT(lh->mlh_type != MDT_NUL_LOCK);
- if (mdt_object_exists(o) < 0) {
+ if (mdt_object_remote(o)) {
if (locality == MDT_CROSS_LOCK) {
ibits &= ~MDS_INODELOCK_UPDATE;
ibits |= MDS_INODELOCK_LOOKUP;
LASSERT(lh->mlh_type != MDT_PDO_LOCK);
}
- if (lh->mlh_type == MDT_PDO_LOCK) {
+ if (lh->mlh_type == MDT_PDO_LOCK) {
/* check for exists after object is locked */
if (mdt_object_exists(o) == 0) {
/* Non-existent object shouldn't have PDO lock */
if (IS_ERR(obj))
RETURN(PTR_ERR(obj));
- rc = mdt_object_exists(obj);
- if (rc < 0) {
- rc = -EREMOTE;
- /**
- * before calling version get the correct MDS should be
- * fid, this is error to find remote object here
- */
- CERROR("nonlocal object "DFID"\n", PFID(fid));
- } else if (rc == 0) {
- *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
- rc = -ENOENT;
- } else {
- version = dt_version_get(mti->mti_env, mdt_obj2dt(obj));
- *(__u64 *)data->ioc_inlbuf2 = version;
- rc = 0;
- }
- mdt_object_unlock_put(mti, obj, lh, 1);
- RETURN(rc);
+ if (mdt_object_remote(obj)) {
+ rc = -EREMOTE;
+ /**
+ * before calling version get the correct MDS should be
+ * fid, this is error to find remote object here
+ */
+ CERROR("nonlocal object "DFID"\n", PFID(fid));
+ } else if (!mdt_object_exists(obj)) {
+ *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
+ rc = -ENOENT;
+ } else {
+ version = dt_version_get(mti->mti_env, mdt_obj2dt(obj));
+ *(__u64 *)data->ioc_inlbuf2 = version;
+ rc = 0;
+ }
+ mdt_object_unlock_put(mti, obj, lh, 1);
+ RETURN(rc);
}
/* ioctls on obd dev */
return lu_object_exists(&o->mot_obj.mo_lu);
}
+static inline int mdt_object_remote(const struct mdt_object *o)
+{
+ return lu_object_remote(&o->mot_obj.mo_lu);
+}
+
static inline const struct lu_fid *mdt_object_fid(const struct mdt_object *o)
{
return lu_object_fid(&o->mot_obj.mo_lu);
if (IS_ERR(obj))
GOTO(out, rc = PTR_ERR(obj));
- if (mdt_object_exists(obj) <= 0)
+ if (!mdt_object_exists(obj) || mdt_object_remote(obj))
GOTO(out, rc = -ENOENT);
child = mdt_object_child(obj);
mdt_export_evict(exp);
RETURN_EXIT;
}
- rc = mdt_object_exists(child);
- if (rc > 0) {
-
- mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA);
- rc = mdt_attr_get_complex(info, child, ma);
- if (rc == 0)
- rc = mdt_finish_open(info, parent, child,
- flags, 1, ldlm_rep);
- } else if (rc < 0) {
- /* the child object was created on remote server */
- repbody->fid1 = *rr->rr_fid2;
- repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
- rc = 0;
- } else if (rc == 0) {
- /* the child does not exist, we should do regular open */
- mdt_object_put(env, parent);
- mdt_object_put(env, child);
- GOTO(regular_open, 0);
- }
+
+ if (unlikely(mdt_object_remote(child))) {
+ /* the child object was created on remote server */
+ repbody->fid1 = *rr->rr_fid2;
+ repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+ rc = 0;
+ } else {
+ if (mdt_object_exists(child)) {
+ mdt_set_capainfo(info, 1, rr->rr_fid2,
+ BYPASS_CAPA);
+ rc = mdt_attr_get_complex(info, child, ma);
+ if (rc == 0)
+ rc = mdt_finish_open(info, parent,
+ child, flags,
+ 1, ldlm_rep);
+ } else {
+ /* the child does not exist, we should do
+ * regular open */
+ mdt_object_put(env, parent);
+ mdt_object_put(env, child);
+ GOTO(regular_open, 0);
+ }
+ }
mdt_object_put(env, parent);
mdt_object_put(env, child);
GOTO(out, rc);
if (IS_ERR(o))
RETURN(rc = PTR_ERR(o));
- rc = mdt_object_exists(o);
- if (rc > 0) {
- mdt_set_disposition(info, rep, (DISP_IT_EXECD |
- DISP_LOOKUP_EXECD |
- DISP_LOOKUP_POS));
-
- rc = mdt_attr_get_complex(info, o, ma);
- if (rc == 0)
- rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
- } else if (rc == 0) {
- rc = -ENOENT;
- } else {
+ if (unlikely(mdt_object_remote(o))) {
/* the child object was created on remote server */
struct mdt_body *repbody;
repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
repbody->fid1 = *rr->rr_fid2;
repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
rc = 0;
- }
+ } else {
+ if (mdt_object_exists(o)) {
+ mdt_set_disposition(info, rep, (DISP_IT_EXECD |
+ DISP_LOOKUP_EXECD |
+ DISP_LOOKUP_POS));
+
+ rc = mdt_attr_get_complex(info, o, ma);
+ if (rc == 0)
+ rc = mdt_finish_open(info, NULL, o, flags, 0,
+ rep);
+ } else {
+ rc = -ENOENT;
+ }
+ }
mdt_object_put(info->mti_env, o);
RETURN(rc);
if (IS_ERR(o))
RETURN(rc = PTR_ERR(o));
- rc = mdt_object_exists(o);
- if (rc == 0) {
- mdt_set_disposition(info, rep, (DISP_LOOKUP_EXECD |
- DISP_LOOKUP_NEG));
- GOTO(out, rc = -ENOENT);
- } else if (rc < 0) {
- CERROR("NFS remote open shouldn't happen.\n");
- GOTO(out, rc);
- }
- mdt_set_disposition(info, rep, (DISP_IT_EXECD |
- DISP_LOOKUP_EXECD));
+ if (mdt_object_remote(o)) {
+ CDEBUG(D_INFO, "%s: "DFID" is on remote MDT.\n",
+ info->mti_mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name,
+ PFID(rr->rr_fid2));
+ GOTO(out, rc = -EREMOTE);
+ } else if (!mdt_object_exists(o)) {
+ mdt_set_disposition(info, rep, (DISP_LOOKUP_EXECD |
+ DISP_LOOKUP_NEG));
+ GOTO(out, rc = -ENOENT);
+ }
+
+ mdt_set_disposition(info, rep, (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
rc = mdt_attr_get_complex(info, o, ma);
if (rc)
if (IS_ERR(o))
RETURN(rc = PTR_ERR(o));
- rc = mdt_object_exists(o);
- if (rc > 0) {
- /* Do permission check for cross-open. */
- rc = mo_permission(info->mti_env, NULL, mdt_object_child(o),
- NULL, flags | MDS_OPEN_CROSS);
- if (rc)
- goto out;
-
- mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
- rc = mdt_attr_get_complex(info, o, ma);
- if (rc == 0)
- rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
- } else if (rc == 0) {
- /*
- * Something is wrong here. lookup was positive but there is
- * no object!
- */
- CERROR("Cross-ref object doesn't exist!\n");
- rc = -EFAULT;
- } else {
- /* Something is wrong here, the object is on another MDS! */
- CERROR("The object isn't on this server! FLD error?\n");
- LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
- &o->mot_obj.mo_lu,
- "Object isn't on this server! FLD error?\n");
-
+ if (mdt_object_remote(o)) {
+ /* Something is wrong here, the object is on another MDS! */
+ CERROR("%s: "DFID" isn't on this server!: rc = %d\n",
+ mdt_obd_name(info->mti_mdt), PFID(fid), -EFAULT);
+ LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
+ &o->mot_obj.mo_lu,
+ "Object isn't on this server! FLD error?\n");
rc = -EFAULT;
+ } else {
+ if (mdt_object_exists(o)) {
+ /* Do permission check for cross-open. */
+ rc = mo_permission(info->mti_env, NULL,
+ mdt_object_child(o),
+ NULL, flags | MDS_OPEN_CROSS);
+ if (rc)
+ goto out;
+
+ mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
+ rc = mdt_attr_get_complex(info, o, ma);
+ if (rc == 0)
+ rc = mdt_finish_open(info, NULL, o, flags, 0,
+ rep);
+ } else {
+ /*
+ * Something is wrong here. lookup was positive but
+ * there is no object!
+ */
+ CERROR("%s: "DFID" doesn't exist!: rc = %d\n",
+ mdt_obd_name(info->mti_mdt), PFID(fid), -EFAULT);
+ rc = -EFAULT;
+ }
}
-
out:
mdt_object_put(info->mti_env, o);
RETURN(rc);
}
created = 1;
} else {
- /* We have to get attr & LOV EA, HSM bits for this object */
- ma->ma_need |= MA_HSM;
- result = mdt_attr_get_complex(info, child, ma);
/*
* The object is on remote node, return its FID for remote open.
*/
- if (result == -EREMOTE) {
+ if (mdt_object_remote(child)) {
/*
* Check if this lock already was sent to client and
* this is resent case. For resent case do not take lock
else
result = -EREMOTE;
GOTO(out_child, result);
- }
+ } else {
+ if (mdt_object_exists(child)) {
+ /* We have to get attr & LOV EA & HSM for this
+ * object */
+ ma->ma_need |= MA_HSM;
+ result = mdt_attr_get_complex(info, child, ma);
+ } else {
+ /*object non-exist!!!*/
+ LBUG();
+ }
+ }
}
LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
if (rc)
return rc;
- if (mti->mti_mos != NULL)
+ /* we probably should not set local transno to the remote object
+ * on another storage, What about VBR on remote object? XXX */
+ if (mti->mti_mos != NULL && !mdt_object_remote(mti->mti_mos))
rc = dt_declare_version_set(env, mdt_obj2dt(mti->mti_mos), th);
return rc;
LASSERT(req != NULL && req->rq_repmsg != NULL);
/** VBR: set new versions */
- if (txn->th_result == 0 && mti->mti_mos != NULL) {
+ /* we probably should not set local transno to the remote object
+ * on another storage, What about VBR on remote object? XXX */
+ if (txn->th_result == 0 && mti->mti_mos != NULL &&
+ !mdt_object_remote(mti->mti_mos)) {
+
dt_version_set(env, mdt_obj2dt(mti->mti_mos),
mti->mti_transno, txn);
mti->mti_mos = NULL;
struct mdt_object *o, __u64 *version)
{
LASSERT(o);
- if (mdt_object_exists(o) > 0 && !mdt_object_obf(o))
+ if (mdt_object_exists(o) && !mdt_object_remote(o) &&
+ !mdt_object_obf(o))
*version = dt_version_get(info->mti_env, mdt_obj2dt(o));
else
*version = ENOENT_VERSION;
if (likely(!IS_ERR(child))) {
struct md_object *next = mdt_object_child(parent);
- if (mdt_object_exists(child) < 0) {
+ if (mdt_object_remote(child)) {
struct seq_server_site *ss;
struct lu_ucred *uc = mdt_ucred(info);
int rc;
ENTRY;
- /* attr shouldn't be set on remote object */
- LASSERT(mdt_object_exists(mo) >= 0);
+ /* attr shouldn't be set on remote object */
+ LASSERT(!mdt_object_remote(mo));
lh = &info->mti_lh[MDT_LH_PARENT];
mdt_lock_reg_init(lh, LCK_PW);
parent_lh = &info->mti_lh[MDT_LH_PARENT];
lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
- if (mdt_object_exists(mp) < 0) {
+ if (mdt_object_remote(mp)) {
mdt_lock_reg_init(parent_lh, LCK_EX);
rc = mdt_remote_object_lock(info, mp, &parent_lh->mlh_rreg_lh,
parent_lh->mlh_rreg_mode,
child_lh = &info->mti_lh[MDT_LH_CHILD];
mdt_lock_reg_init(child_lh, LCK_EX);
- if (mdt_object_exists(mc) < 0) {
+ if (mdt_object_remote(mc)) {
struct mdt_body *repbody;
if (!fid_is_zero(rr->rr_fid2)) {
if (IS_ERR(ms))
GOTO(out_unlock_parent, rc = PTR_ERR(ms));
- if (mdt_object_exists(ms) < 0) {
+ if (mdt_object_remote(ms)) {
mdt_object_put(info->mti_env, ms);
CERROR("Target directory "DFID" is on another MDT\n",
PFID(rr->rr_fid1));
if (rc)
GOTO(out_put_target, rc);
- rc = mdt_object_exists(mtgtdir);
- if (rc == 0) {
- GOTO(out_put_target, rc = -ESTALE);
- } else if (rc > 0) {
- /* we lock the target dir if it is local */
- rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
- MDS_INODELOCK_UPDATE,
- MDT_LOCAL_LOCK);
- if (rc != 0)
- GOTO(out_put_target, rc);
- /* get and save correct version after locking */
- mdt_version_get_save(info, mtgtdir, 1);
- } else if (rc < 0) {
- CERROR("Source dir "DFID" target dir "DFID
+ if (unlikely(mdt_object_remote(mtgtdir))) {
+ CDEBUG(D_INFO, "Source dir "DFID" target dir "DFID
"on different MDTs\n", PFID(rr->rr_fid1),
PFID(rr->rr_fid2));
GOTO(out_put_target, rc = -EXDEV);
+ } else {
+ if (likely(mdt_object_exists(mtgtdir))) {
+ /* we lock the target dir if it is local */
+ rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
+ MDS_INODELOCK_UPDATE,
+ MDT_LOCAL_LOCK);
+ if (rc != 0)
+ GOTO(out_put_target, rc);
+ /* get and save correct version after locking */
+ mdt_version_get_save(info, mtgtdir, 1);
+ } else {
+ GOTO(out_put_target, rc = -ESTALE);
+ }
}
- }
+ }
/* step 3: find & lock the old object. */
lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
if (IS_ERR(mold))
GOTO(out_unlock_target, rc = PTR_ERR(mold));
- if (mdt_object_exists(mold) < 0) {
+ if (mdt_object_remote(mold)) {
mdt_object_put(info->mti_env, mold);
- CERROR("Source child "DFID" is on another MDT\n", PFID(old_fid));
+ CDEBUG(D_INFO, "Source child "DFID" is on another MDT\n",
+ PFID(old_fid));
GOTO(out_unlock_target, rc = -EXDEV);
}
GOTO(out_unlock_old, rc = -EPERM);
}
- if (mdt_object_exists(mnew) < 0) {
+ if (mdt_object_remote(mnew)) {
mdt_object_put(info->mti_env, mnew);
- CERROR("Source child "DFID" is on another MDT\n",
+ CDEBUG(D_INFO, "src child "DFID" is on another MDT\n",
PFID(new_fid));
GOTO(out_unlock_old, rc = -EXDEV);
}
lgi->lgi_off = idx * sizeof(*idarray);
lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID);
+
o = dt_locate(env, d, &lgi->lgi_fid);
if (IS_ERR(o))
RETURN(PTR_ERR(o));
lgi->lgi_off = idx * sizeof(*idarray);
lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID);
+
o = dt_locate(env, d, &lgi->lgi_fid);
if (IS_ERR(o))
RETURN(PTR_ERR(o));
osd->od_ost_map = NULL;
}
-int osd_create_agent_inode(const struct lu_env *env, struct osd_device *osd,
- struct osd_object *obj, struct osd_thandle *oh)
+int osd_add_to_agent(const struct lu_env *env, struct osd_device *osd,
+ struct osd_object *obj, struct osd_thandle *oh)
{
struct osd_mdobj_map *omm = osd->od_mdt_map;
struct osd_thread_info *oti = osd_oti_get(env);
- char *name_buf = oti->oti_name;
+ char *name = oti->oti_name;
struct dentry *agent;
struct dentry *parent;
int rc;
parent = omm->omm_agent_dentry;
- sprintf(name_buf, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
+ sprintf(name, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
agent = osd_child_dentry_by_inode(env, parent->d_inode,
- name_buf, strlen(name_buf));
+ name, strlen(name));
mutex_lock(&parent->d_inode->i_mutex);
rc = osd_ldiskfs_add_entry(oh->ot_handle, agent, obj->oo_inode, NULL);
+ LASSERTF(parent->d_inode->i_nlink > 1, "%s: agent inode nlink %d",
+ osd_name(osd), parent->d_inode->i_nlink);
parent->d_inode->i_nlink++;
mark_inode_dirty(parent->d_inode);
mutex_unlock(&parent->d_inode->i_mutex);
- if (rc != 0)
- CERROR("%s: "DFID" add agent error: rc = %d\n", osd_name(osd),
- PFID(lu_object_fid(&obj->oo_dt.do_lu)), rc);
RETURN(rc);
}
-int osd_delete_agent_inode(const struct lu_env *env, struct osd_device *osd,
- struct osd_object *obj, struct osd_thandle *oh)
+int osd_delete_from_agent(const struct lu_env *env, struct osd_device *osd,
+ struct osd_object *obj, struct osd_thandle *oh)
{
struct osd_mdobj_map *omm = osd->od_mdt_map;
struct osd_thread_info *oti = osd_oti_get(env);
int rc;
parent = omm->omm_agent_dentry;
- sprintf(name, DFID, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
+ sprintf(name, DFID_NOBRACE, PFID(lu_object_fid(&obj->oo_dt.do_lu)));
agent = osd_child_dentry_by_inode(env, parent->d_inode,
name, strlen(name));
mutex_lock(&parent->d_inode->i_mutex);
RETURN(-ENOENT);
}
rc = ldiskfs_delete_entry(oh->ot_handle, parent->d_inode, de, bh);
+ LASSERTF(parent->d_inode->i_nlink > 1, "%s: agent inode nlink %d",
+ osd_name(osd), parent->d_inode->i_nlink);
parent->d_inode->i_nlink--;
mark_inode_dirty(parent->d_inode);
mutex_unlock(&parent->d_inode->i_mutex);
const struct lu_fid *fid, struct osd_inode_id *id)
{
struct dentry *root;
- struct dentry *dentry;
- struct inode *inode;
- char *name;
- int rc = -ENOENT;
+ struct dentry *dentry;
+ struct inode *inode;
+ char *name;
+ int rc = -ENOENT;
ENTRY;
if (fid_is_last_id(fid)) {
{
struct osd_object *obj = osd_dt_obj(dt);
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LINVRNT(osd_invariant(obj));
if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
int rc;
LASSERT(handle != NULL);
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(osd_invariant(obj));
if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
return rc;
}
-
+/*
+ * Concurrency: no external locking is necessary.
+ */
static int osd_declare_object_create(const struct lu_env *env,
struct dt_object *dt,
struct lu_attr *attr,
ENTRY;
LINVRNT(osd_invariant(obj));
- LASSERT(!dt_object_exists(dt));
+ LASSERT(!dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(osd_write_locked(env, obj));
LASSERT(th != NULL);
if (result == 0)
result = __osd_oi_insert(env, obj, fid, th);
- LASSERT(ergo(result == 0, dt_object_exists(dt)));
+ LASSERT(ergo(result == 0,
+ dt_object_exists(dt) && !dt_object_remote(dt)));
+
LASSERT(osd_invariant(obj));
RETURN(result);
}
/* it will check/delete the agent inode for every dir
* destory, how to optimize it? unlink performance
* impaction XXX */
- result = osd_delete_agent_inode(env, osd, obj, oh);
+ result = osd_delete_from_agent(env, osd, obj, oh);
if (result != 0 && result != -ENOENT) {
CERROR("%s: delete agent inode "DFID": rc = %d\n",
osd_name(osd), PFID(fid), result);
ENTRY;
LASSERT(osd_invariant(obj));
- LASSERT(!dt_object_exists(dt));
+ LASSERT(!dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(osd_write_locked(env, obj));
LASSERT(th != NULL);
if (result == 0)
result = __osd_oi_insert(env, obj, fid, th);
- LASSERT(ergo(result == 0, dt_object_exists(dt)));
+ LASSERT(ergo(result == 0,
+ dt_object_exists(dt) && !dt_object_remote(dt)));
LINVRNT(osd_invariant(obj));
RETURN(result);
}
struct inode *inode = obj->oo_inode;
LINVRNT(osd_invariant(obj));
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(osd_write_locked(env, obj));
LASSERT(th != NULL);
{
struct osd_thandle *oh;
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(handle != NULL);
oh = container_of0(handle, struct osd_thandle, ot_super);
struct inode *inode = obj->oo_inode;
LINVRNT(osd_invariant(obj));
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(osd_write_locked(env, obj));
LASSERT(th != NULL);
return sizeof(dt_obj_version_t);
}
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
struct osd_thread_info *info = osd_oti_get(env);
struct dentry *dentry = &info->oti_obj_dentry;
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
{
struct osd_thandle *oh;
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(handle != NULL);
oh = container_of0(handle, struct osd_thandle, ot_super);
struct dentry *dentry = &info->oti_obj_dentry;
int rc;
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
LASSERT(handle != NULL);
if (!dev->od_fl_capa)
RETURN(ERR_PTR(-ENOENT));
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LINVRNT(osd_invariant(obj));
/* renewal sanity check */
struct osd_object *obj = osd_dt_obj(dt);
LINVRNT(osd_invariant(obj));
- LASSERT(dt_object_exists(dt));
if (osd_object_is_root(obj)) {
dt->do_index_ops = &osd_index_ea_ops;
result = 0;
} else if (feat == &dt_directory_features) {
dt->do_index_ops = &osd_index_ea_ops;
- if (S_ISDIR(obj->oo_inode->i_mode))
+ if (obj->oo_inode != NULL && S_ISDIR(obj->oo_inode->i_mode))
result = 0;
else
result = -ENOTDIR;
ENTRY;
LINVRNT(osd_invariant(obj));
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(bag->ic_object == obj->oo_inode);
LASSERT(handle != NULL);
int rc;
ENTRY;
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(handle != NULL);
oh = container_of0(handle, struct osd_thandle, ot_super);
ENTRY;
LINVRNT(osd_invariant(obj));
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(handle != NULL);
osd_trans_exec_op(env, handle, OSD_OT_DELETE);
if (rc != 0)
GOTO(out, rc);
+ /* For inode on the remote MDT, .. will point to
+ * /Agent directory. So do not try to lookup/delete
+ * remote inode for .. */
+ if (strcmp((char *)key, dotdot) == 0)
+ GOTO(out, rc = 0);
+
LASSERT(de != NULL);
rc = osd_get_fid_from_dentry(de, (struct dt_rec *)fid);
if (rc == 0 && osd_remote_fid(env, osd, fid)) {
ENTRY;
LASSERT(osd_invariant(obj));
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(bag->ic_object == obj->oo_inode);
if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
{
struct osd_thandle *oh;
- LASSERT(dt_object_exists(dt));
LASSERT(handle != NULL);
oh = container_of0(handle, struct osd_thandle, ot_super);
ENTRY;
LINVRNT(osd_invariant(obj));
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(bag->ic_object == obj->oo_inode);
LASSERT(th != NULL);
int rc;
ENTRY;
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(handle != NULL);
oh = container_of0(handle, struct osd_thandle, ot_super);
ENTRY;
LASSERT(osd_invariant(obj));
- LASSERT(dt_object_exists(dt));
+ LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
LASSERT(th != NULL);
osd_trans_exec_op(env, th, OSD_OT_INSERT);
/* If parent on remote MDT, we need put this object
* under AGENT */
oh = container_of(th, typeof(*oh), ot_super);
- rc = osd_create_agent_inode(env, osd, obj, oh);
+ rc = osd_add_to_agent(env, osd, obj, oh);
if (rc != 0) {
CERROR("%s: add agent "DFID" error: rc = %d\n",
osd_name(osd),
struct dentry *osd_agent_load(const struct osd_device *osd, int mdt_index,
int create);
-int osd_delete_agent_inode(const struct lu_env *env, struct osd_device *osd,
- struct osd_object *obj, struct osd_thandle *oh);
-int osd_create_agent_inode(const struct lu_env *env, struct osd_device *osd,
- struct osd_object *obj, struct osd_thandle *oh);
+int osd_delete_from_agent(const struct lu_env *env, struct osd_device *osd,
+ struct osd_object *obj, struct osd_thandle *oh);
+int osd_add_to_agent(const struct lu_env *env, struct osd_device *osd,
+ struct osd_object *obj, struct osd_thandle *oh);
/* osd_quota_fmt.c */
int walk_tree_dqentry(const struct lu_env *env, struct osd_object *obj,
RETURN(-EINVAL);
}
m->opd_index = idx;
+ m->opd_group = 0;
idx = tgt - src;
} else {
/* New OSC name fsname-OSTXXXX-osc-MDTXXXX */
RETURN(-EINVAL);
}
+ idx = simple_strtol(tgt + 4, &mdt, 16);
+ if (*mdt != '\0' || idx > INT_MAX || idx < 0) {
+ CERROR("%s: invalid OST index in '%s'\n",
+ m->opd_obd->obd_name, src);
+ RETURN(-EINVAL);
+ }
+
+ /* Get MDT index from the name and set it to opd_group,
+ * which will be used by OSP to connect with OST */
+ m->opd_group = idx;
if (tgt - src <= 12) {
- CERROR("%s: invalid target name %s\n",
+ CERROR("%s: invalid mdt index retrieve from %s\n",
m->opd_obd->obd_name, lustre_cfg_string(cfg, 0));
RETURN(-EINVAL);
}
struct dt_device opd_dt_dev;
/* corresponded OST index */
int opd_index;
+
+ /* corrsponded MDT index, which will be used when connecting to OST
+ * for validating the connection (see ofd_parse_connect_data) */
+ int opd_group;
/* device used to store persistent state (llogs, last ids) */
struct obd_export *opd_storage_exp;
struct dt_device *opd_storage;