RETURN(rc);
}
+extern void mdt_req_from_mcd(struct ptlrpc_request *req,
+ struct mdt_client_data *mcd);
+
+
+/* TODO: much of the code here can be shared with mdt_open()
+ * and mdt_mfd_open(). This will be done later. :-)
+ */
+void mdt_reconstruct_open(struct mdt_thread_info *info)
+{
+ struct lu_fid *child_fid = &info->mti_tmp_fid1;
+ __u32 flags = info->mti_spec.sp_cr_flags;
+ struct req_capsule *pill = &info->mti_pill;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
+ struct mdt_client_data *mcd = med->med_mcd;
+ struct md_attr *ma = &info->mti_attr;
+ struct lu_attr *la = &ma->ma_attr;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct mdt_reint_record *rr = &info->mti_rr;
+ const struct lu_context *ctxt = info->mti_ctxt;
+ struct mdt_object *parent;
+ struct mdt_object *child;
+ struct ldlm_reply *ldlm_rep;
+ struct mdt_body *repbody;
+ struct mdt_file_data *mfd;
+ struct list_head *h;
+ int result;
+ int isreg, isdir, islnk;
+ ENTRY;
+
+ LASSERT(pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
+
+ mdt_req_from_mcd(req, med->med_mcd);
+ mdt_set_disposition(info, ldlm_rep, le32_to_cpu(mcd->mcd_last_data));
+
+ if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) && req->rq_status) {
+ /* we did not create successfully, return error to client. */
+ EXIT;
+ return;
+ }
+
+ mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
+ mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
+ parent = mdt_object_find(ctxt, mdt, rr->rr_fid1);
+ if (IS_ERR(parent)) {
+ /* FIXME: should this be assert? */
+ CERROR("Cannot find parent "DFID" while reconstruct open\n",
+ PFID(rr->rr_fid1));
+ GOTO(out, result = PTR_ERR(parent));
+ }
+
+ result = mdo_lookup(ctxt, mdt_object_child(parent),
+ rr->rr_name, child_fid);
+ if (result == 0)
+ mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+ else {
+ if (result == -ENOENT || result == -ESTALE) {
+ mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
+ result = -ENOENT;
+ }
+ GOTO(out_parent, result);
+ }
+
+ LASSERT(lu_fid_eq(rr->rr_fid2, child_fid));
+ child = mdt_object_find(ctxt, mdt, child_fid);
+ if (IS_ERR(child))
+ GOTO(out_parent, result = PTR_ERR(child));
+
+ ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
+ ma->ma_lmm_size = mdt->mdt_max_mdsize;
+ ma->ma_need = MA_INODE | MA_LOV;
+ repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
+ repbody->eadatasize = 0;
+ repbody->aclsize = 0;
+
+ result = mo_attr_get(ctxt, mdt_object_child(child), ma);
+ if (result == -EREMOTE) {
+ /* the object is on remote node
+ * return its FID for remote open */
+ repbody->fid1 = *mdt_object_fid(child);
+ repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+ GOTO(out_child, result = 0);
+ } else if(result != 0)
+ GOTO(out_child, result);
+
+ LASSERT(ma->ma_valid & MA_INODE);
+ isreg = S_ISREG(la->la_mode);
+ isdir = S_ISDIR(la->la_mode);
+ islnk = S_ISLNK(la->la_mode);
+ mdt_pack_attr2body(repbody, la, mdt_object_fid(child));
+
+ /* if we are following a symlink, don't open; and
+ * do not return open handle for special nodes as client required
+ */
+ if (islnk || (!isreg && !isdir &&
+ (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
+ info->mti_trans_flags |= MDT_NONEED_TRANSNO;
+ GOTO(out_child, 0);
+ }
+
+ mdt_set_disposition(info, ldlm_rep, DISP_OPEN_OPEN);
+ if (!mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) &&
+ (flags & MDS_OPEN_EXCL) && (flags & MDS_OPEN_CREAT))
+ GOTO(out_child, result = -EEXIST);
+
+ if (isdir) {
+ if (flags & (MDS_OPEN_CREAT | FMODE_WRITE))
+ /* we are trying to create or
+ * write an existing dir. */
+ GOTO(out_child, result = -EISDIR);
+ } else if (flags & MDS_OPEN_DIRECTORY)
+ GOTO(out_child, result = -ENOTDIR);
+
+ /* at this point, regular file should have lov */
+ LASSERT(ergo(isreg, ma->ma_valid & MA_LOV));
+
+ if (ma->ma_valid & MA_LOV) {
+ LASSERT(ma->ma_lmm_size);
+ repbody->eadatasize = ma->ma_lmm_size;
+ if (isdir)
+ repbody->valid |= OBD_MD_FLDIREA;
+ else
+ repbody->valid |= OBD_MD_FLEASIZE;
+ }
+ mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);
+
+ if (flags & FMODE_WRITE) {
+ /* FIXME: in recovery, need to pass old epoch here */
+ result = mdt_get_write_access(mdt, child, 0);
+ if (result == 0)
+ repbody->io_epoch = child->mot_io_epoch;
+ } else if (flags & MDS_FMODE_EXEC)
+ result = mdt_deny_write_access(mdt, child);
+ if (result)
+ GOTO(out_child, result);
+
+ result = mo_open(ctxt, mdt_object_child(child), flags);
+ if (result)
+ GOTO(out_child, result);
+
+ mfd = NULL;
+ spin_lock(&med->med_open_lock);
+ list_for_each(h, &med->med_open_head) {
+ mfd = list_entry(h, struct mdt_file_data, mfd_list);
+ if (mfd->mfd_xid == req->rq_xid) {
+ break;
+ }
+ mfd = NULL;
+ }
+ spin_unlock(&med->med_open_lock);
+
+ if (mfd != NULL) {
+ repbody->handle.cookie = mfd->mfd_handle.h_cookie;
+ GOTO(out_child, 0);
+ }
+
+ mfd = mdt_mfd_new();
+ if (mfd != NULL) {
+ mdt_object_get(ctxt, child);
+
+ mfd->mfd_mode = flags;
+ mfd->mfd_object = child;
+ mfd->mfd_xid = req->rq_xid;
+
+ med = &req->rq_export->exp_mdt_data;
+ spin_lock(&med->med_open_lock);
+ list_add(&mfd->mfd_list, &med->med_open_head);
+ spin_unlock(&med->med_open_lock);
+ repbody->handle.cookie = mfd->mfd_handle.h_cookie;
+ } else
+ result = -ENOMEM;
+ EXIT;
+out_child:
+ mdt_object_put(ctxt, child);
+out_parent:
+ mdt_object_put(ctxt, parent);
+out:
+ req->rq_status = result;
+}
+
static int mdt_open_by_fid(struct mdt_thread_info* info,
const struct lu_fid *fid,
struct ldlm_reply *rep,
if (rc > 0) {
/* successfully found the child object */
if (flags & MDS_OPEN_EXCL && flags & MDS_OPEN_CREAT)
- rc = -EEXIST;
+ rc = -EEXIST;
else {
- struct txn_param txn;
- struct thandle *th;
- struct dt_device *dt = info->mti_mdt->mdt_bottom;
- txn.tp_credits = 1;
-
- LASSERT(dt);
- th = dt->dd_ops->dt_trans_start(info->mti_ctxt,
- dt, &txn);
- if (!IS_ERR(th)) {
- dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
- rc = mo_attr_get(info->mti_ctxt,
- mdt_object_child(o), ma);
- }
- else
- rc = PTR_ERR(th);
- if (rc == 0) {
- mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
- mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
- mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
+ const struct lu_context *ctxt = info->mti_ctxt;
+ struct mdt_device *mdt = info->mti_mdt;
+
+ spin_lock(&mdt->mdt_transno_lock);
+ info->mti_transno = ++ mdt->mdt_last_transno;
+ spin_unlock(&mdt->mdt_transno_lock);
+
+ mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
+ mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
+ rc = mo_attr_get(ctxt, mdt_object_child(o), ma);
+ if (rc == 0)
rc = mdt_mfd_open(info, NULL, o, flags, 0, rep);
- }
}
} else if (rc == 0) {
rc = -ENOENT;