static int mdt_mfd_open(struct mdt_thread_info *info,
struct mdt_object *p,
struct mdt_object *o,
- int flags, int created)
+ int flags,
+ int created,
+ struct ldlm_reply *rep)
{
struct mdt_export_data *med;
struct mdt_file_data *mfd;
struct md_attr *ma = &info->mti_attr;
struct lu_attr *la = &ma->ma_attr;
struct ptlrpc_request *req = mdt_info_req(info);
- struct ldlm_reply *ldlm_rep;
int rc = 0;
int isreg, isdir, islnk;
ENTRY;
- repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+ LASSERT(ma->ma_valid & MA_INODE);
- if (!created) {
- /* we have to get attr & lov ea for this object*/
- ma->ma_need = MA_INODE | MA_LOV;
- rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
- if (rc)
- RETURN(rc);
- }
+ repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+ repbody->eadatasize = 0;
+ repbody->aclsize = 0;
isreg = S_ISREG(la->la_mode);
isdir = S_ISDIR(la->la_mode);
islnk = S_ISLNK(la->la_mode);
- if (ma->ma_valid & MA_INODE)
- mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
+ mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
- /* we need to return the existing object's fid back, so it is done
- * here, after preparing the reply */
- if (!created && (flags & MDS_OPEN_EXCL) && (flags & MDS_OPEN_CREAT))
- RETURN(-EEXIST);
-
- /* if we are following a symlink, don't open;
+ /* if we are following a symlink, don't open; and
* do not return open handle for special nodes as client required
*/
if (islnk || (!isreg && !isdir &&
info->mti_trans_flags |= MDT_NONEED_TRANSNO;
RETURN(0);
}
+
+ intent_set_disposition(rep, DISP_OPEN_OPEN);
+ /* we need to return the existing object's fid back, so it is done
+ * here, after preparing the reply */
+ if (!created && (flags & MDS_OPEN_EXCL) && (flags & MDS_OPEN_CREAT))
+ RETURN(-EEXIST);
+
/* This can't be done earlier, we need to return reply body */
if (isdir) {
if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
&RMF_MDT_MD,
RCL_SERVER);
- LASSERT(p != NULL);
+ /* TODO: handle REPLAY (p == NULL)*/
rc = mdt_create_data(info, p, o);
if (rc)
RETURN(rc);
}
+
CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n",
ma->ma_valid, ma->ma_lmm_size);
- repbody->eadatasize = 0;
- repbody->aclsize = 0;
if (ma->ma_valid & MA_LOV) {
LASSERT(ma->ma_lmm_size);
* did not get ACL before shrink. */
mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);
- ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
- intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
if (flags & FMODE_WRITE) {
/* FIXME: in recovery, need to pass old epoch here */
if (rc)
RETURN(rc);
- /* (1) client wants transno when open to keep a ref count for replay;
- * see after_reply() and mdc_close_commit();
- * (2) we need to record the transaction related stuff onto disk;
- * The question is: when do a read_only open, do we still need transno?
- */
- if (!created) {
- struct txn_param txn;
- struct thandle *th;
- struct dt_device *dt = info->mti_mdt->mdt_bottom;
- txn.tp_credits = 1;
-
- LASSERT(dt);
- th = dt->dd_ops->dt_trans_start(info->mti_ctxt, dt, &txn);
- if (!IS_ERR(th))
- dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
- else
- RETURN(PTR_ERR(th));
- }
-
mfd = mdt_mfd_new();
if (mfd != NULL) {
/* keep a reference on this object for this open,
RETURN(rc);
}
-int mdt_open_by_fid(struct mdt_thread_info* info, const struct lu_fid *fid,
- __u32 flags)
+int mdt_open_replay(struct mdt_thread_info* info, const struct lu_fid *fid,
+ struct ldlm_reply *rep, __u32 flags)
{
+ struct md_attr *ma = &info->mti_attr;
struct mdt_object *o;
int rc;
ENTRY;
o = mdt_object_find(info->mti_ctxt, info->mti_mdt, fid);
- if (!IS_ERR(o)) {
- if (lu_object_exists(&o->mot_obj.mo_lu) > 0) {
- if (flags & MDS_OPEN_EXCL &&
- flags & MDS_OPEN_CREAT)
+ if (IS_ERR(o))
+ RETURN(rc = PTR_ERR(o));
+
+ rc = lu_object_exists(&o->mot_obj.mo_lu);
+
+ if (rc > 0) {
+ if (flags & MDS_OPEN_EXCL && flags & MDS_OPEN_CREAT)
rc = -EEXIST;
+ else {
+ struct txn_param txn;
+ struct thandle *th;
+ struct dt_device *dt = info->mti_mdt->mdt_bottom;
+ txn.tp_credits = 1;
+
+ LASSERT(dt);
+ th = dt->dd_ops->dt_trans_start(info->mti_ctxt,
+ dt, &txn);
+ if (!IS_ERR(th)) {
+ dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
+ rc = mo_attr_get(info->mti_ctxt,
+ mdt_object_child(o), ma);
+ }
else
- rc = mdt_mfd_open(info, NULL, o, flags, 0);
- } else {
- rc = -ENOENT;
- if (flags & MDS_OPEN_CREAT) {
- info->mti_attr.ma_need = MA_INODE | MA_LOV;
- rc = mo_object_create(info->mti_ctxt,
- mdt_object_child(o),
- &info->mti_spec,
- &info->mti_attr);
- if (rc == 0)
- rc = mdt_mfd_open(info, NULL, o,
- flags, 1);
+ rc = PTR_ERR(th);
+ if (rc == 0) {
+ intent_set_disposition(rep, DISP_LOOKUP_EXECD);
+ intent_set_disposition(rep, DISP_LOOKUP_POS);
+ intent_set_disposition(rep, DISP_OPEN_OPEN);
+ rc = mdt_mfd_open(info, NULL, o, flags, 0, rep);
}
- }
- mdt_object_put(info->mti_ctxt, o);
- } else
- rc = PTR_ERR(o);
-
+ }
+ } else if (rc == 0) {
+ rc = -ENOENT;
+ } else {
+ struct mdt_body *repbody;
+ repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+ repbody->fid1 = *fid;
+ repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+ rc = 0;
+ }
+ mdt_object_put(info->mti_ctxt, o);
RETURN(rc);
}
int mdt_pin(struct mdt_thread_info* info)
{
- struct mdt_body *body;
- int rc;
ENTRY;
-
- rc = req_capsule_pack(&info->mti_pill);
- if (rc == 0) {
- body = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
- rc = mdt_open_by_fid(info, &body->fid1, body->flags);
- }
- RETURN(rc);
+ RETURN(-EOPNOTSUPP);
}
-int mdt_reint_open(struct mdt_thread_info *info)
+int mdt_open(struct mdt_thread_info *info)
{
struct mdt_device *mdt = info->mti_mdt;
struct mdt_object *parent;
ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
ma->ma_lmm_size = mdt->mdt_max_mdsize;
+ ma->ma_need = MA_INODE | MA_LOV;
- if (rr->rr_name[0] == 0) {
- /* reint partial remote open */
- result = mdt_open_by_fid(info, rr->rr_fid1, create_flags);
- RETURN(result);
- }
-
- /* we now have no resent message, so it must be an intent */
- /*TODO: remove this and add MDS_CHECK_RESENT if resent enabled*/
LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
CDEBUG(D_INODE, "I am going to create "DFID"/("DFID":%s) "
if (result == -ENOENT) {
/* not found and with MDS_OPEN_CREAT: let's create it */
- ma->ma_need = MA_INODE | MA_LOV;
+ intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
result = mdo_create(info->mti_ctxt,
mdt_object_child(parent),
rr->rr_name,
mdt_object_child(child),
&info->mti_spec,
&info->mti_attr);
- intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
if (result != 0)
GOTO(out_child, result);
created = 1;
+ } else {
+ /* (1) client wants transno when open to keep a ref count for replay;
+ * see after_reply() and mdc_close_commit();
+ * (2) we need to record the transaction related stuff onto disk;
+ * The question is: when do a read_only open, do we still need transno?
+ */
+ struct txn_param txn;
+ struct thandle *th;
+ struct dt_device *dt = info->mti_mdt->mdt_bottom;
+ txn.tp_credits = 1;
+
+ LASSERT(dt);
+ th = dt->dd_ops->dt_trans_start(info->mti_ctxt, dt, &txn);
+ if (!IS_ERR(th)) {
+ dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
+ /* we have to get attr & lov ea for this object*/
+ result = mo_attr_get(info->mti_ctxt,
+ mdt_object_child(child), ma);
+ if (result)
+ GOTO(out_child, result);
+ } else
+ GOTO(out_child, result = PTR_ERR(th));
}
/* Try to open it now. */
- result = mdt_mfd_open(info, parent, child, create_flags, created);
+ result = mdt_mfd_open(info, parent, child, create_flags,
+ created, ldlm_rep);
GOTO(finish_open, result);
finish_open: