Whamcloud - gitweb
some cleanup for MDS_OPEN. prepare for open replay handling.
authorhuanghua <huanghua>
Fri, 25 Aug 2006 14:55:45 +0000 (14:55 +0000)
committerhuanghua <huanghua>
Fri, 25 Aug 2006 14:55:45 +0000 (14:55 +0000)
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c

index 7b88226..9a2602f 100644 (file)
@@ -381,7 +381,7 @@ int mdt_lock_new_child(struct mdt_thread_info *info,
                        struct mdt_object *o,
                        struct mdt_lock_handle *child_lockh);
 
-int mdt_reint_open(struct mdt_thread_info *info);
+int mdt_open(struct mdt_thread_info *info);
 
 void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt,
                    struct mdt_file_data *mfd, struct md_attr *ma);
index 2602974..93841d2 100644 (file)
@@ -177,7 +177,9 @@ int mdt_query_write_access(struct mdt_device *mdt, struct mdt_object *o)
 static int mdt_mfd_open(struct mdt_thread_info *info,
                         struct mdt_object *p,
                         struct mdt_object *o,
-                        int flags, int created)
+                        int flags, 
+                        int created,
+                        struct ldlm_reply *rep)
 {
         struct mdt_export_data *med;
         struct mdt_file_data   *mfd;
@@ -186,33 +188,22 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
         struct md_attr         *ma = &info->mti_attr;
         struct lu_attr         *la = &ma->ma_attr;
         struct ptlrpc_request  *req = mdt_info_req(info);
-        struct ldlm_reply      *ldlm_rep;
         int                     rc = 0;
         int                     isreg, isdir, islnk;
         ENTRY;
 
-        repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+        LASSERT(ma->ma_valid & MA_INODE);
 
-        if (!created) {
-                /* we have to get attr & lov ea for this object*/
-                ma->ma_need = MA_INODE | MA_LOV;
-                rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
-                if (rc)
-                        RETURN(rc);
-        }
+        repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+        repbody->eadatasize = 0;
+        repbody->aclsize = 0;
 
         isreg = S_ISREG(la->la_mode);
         isdir = S_ISDIR(la->la_mode);
         islnk = S_ISLNK(la->la_mode);
-        if (ma->ma_valid & MA_INODE)
-                mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
+        mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
 
-        /* we need to return the existing object's fid back, so it is done
-         * here, after preparing the reply */
-        if (!created && (flags & MDS_OPEN_EXCL) && (flags & MDS_OPEN_CREAT))
-                RETURN(-EEXIST);
-
-        /* if we are following a symlink, don't open;
+        /* if we are following a symlink, don't open; and
          * do not return open handle for special nodes as client required
          */
         if (islnk || (!isreg && !isdir &&
@@ -220,6 +211,13 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                 info->mti_trans_flags |= MDT_NONEED_TRANSNO;
                 RETURN(0);
         }
+
+        intent_set_disposition(rep, DISP_OPEN_OPEN);
+        /* we need to return the existing object's fid back, so it is done
+         * here, after preparing the reply */
+        if (!created && (flags & MDS_OPEN_EXCL) && (flags & MDS_OPEN_CREAT))
+                RETURN(-EEXIST);
+
         /* This can't be done earlier, we need to return reply body */
         if (isdir) {
                 if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
@@ -237,15 +235,14 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
                                                        &RMF_MDT_MD,
                                                        RCL_SERVER);
-                LASSERT(p != NULL);
+                /* TODO: handle REPLAY (p == NULL)*/
                 rc = mdt_create_data(info, p, o);
                 if (rc)
                         RETURN(rc);
         }
+
         CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n",
                         ma->ma_valid, ma->ma_lmm_size);
-        repbody->eadatasize = 0;
-        repbody->aclsize = 0;
 
         if (ma->ma_valid & MA_LOV) {
                 LASSERT(ma->ma_lmm_size);
@@ -259,8 +256,6 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
          * did not get ACL before shrink. */
         mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);
 
-        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
-        intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
 
         if (flags & FMODE_WRITE) {
                 /* FIXME: in recovery, need to pass old epoch here */
@@ -277,25 +272,6 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
         if (rc)
                 RETURN(rc);
         
-        /* (1) client wants transno when open to keep a ref count for replay;
-         *     see after_reply() and mdc_close_commit();
-         * (2) we need to record the transaction related stuff onto disk;
-         * The question is: when do a read_only open, do we still need transno?
-         */
-        if (!created) {
-                struct txn_param txn;
-                struct thandle *th;
-                struct dt_device *dt = info->mti_mdt->mdt_bottom;
-                txn.tp_credits = 1;
-
-                LASSERT(dt);
-                th = dt->dd_ops->dt_trans_start(info->mti_ctxt, dt, &txn);
-                if (!IS_ERR(th))
-                        dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
-                else
-                        RETURN(PTR_ERR(th));
-        }
-
         mfd = mdt_mfd_new();
         if (mfd != NULL) {
                 /* keep a reference on this object for this open,
@@ -319,56 +295,66 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
         RETURN(rc);
 }
 
-int mdt_open_by_fid(struct mdt_thread_info* info, const struct lu_fid *fid,
-                    __u32 flags)
+int mdt_open_replay(struct mdt_thread_info* info, const struct lu_fid *fid,
+                    struct ldlm_reply *rep, __u32 flags)
 {
+        struct md_attr    *ma = &info->mti_attr;
         struct mdt_object *o;
         int                rc;
         ENTRY;
 
         o = mdt_object_find(info->mti_ctxt, info->mti_mdt, fid);
-        if (!IS_ERR(o)) {
-                if (lu_object_exists(&o->mot_obj.mo_lu) > 0) {
-                        if (flags & MDS_OPEN_EXCL &&
-                            flags & MDS_OPEN_CREAT)
+        if (IS_ERR(o)) 
+                RETURN(rc = PTR_ERR(o));
+
+        rc = lu_object_exists(&o->mot_obj.mo_lu);
+
+        if (rc > 0) {
+                if (flags & MDS_OPEN_EXCL && flags & MDS_OPEN_CREAT)
                                 rc = -EEXIST;
+                else {
+                        struct txn_param txn;
+                        struct thandle *th;
+                        struct dt_device *dt = info->mti_mdt->mdt_bottom;
+                        txn.tp_credits = 1;
+
+                        LASSERT(dt);
+                        th = dt->dd_ops->dt_trans_start(info->mti_ctxt, 
+                                                        dt, &txn);
+                        if (!IS_ERR(th)) {
+                                dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
+                                rc = mo_attr_get(info->mti_ctxt, 
+                                                 mdt_object_child(o), ma);
+                        }
                         else
-                                rc = mdt_mfd_open(info, NULL, o, flags, 0);
-                } else {
-                        rc = -ENOENT;
-                        if (flags & MDS_OPEN_CREAT) {
-                                info->mti_attr.ma_need = MA_INODE | MA_LOV;
-                                rc = mo_object_create(info->mti_ctxt,
-                                                      mdt_object_child(o),
-                                                      &info->mti_spec,
-                                                      &info->mti_attr);
-                                if (rc == 0)
-                                        rc = mdt_mfd_open(info, NULL, o, 
-                                                          flags, 1);
+                                rc = PTR_ERR(th);
+                        if (rc == 0) {
+                                intent_set_disposition(rep, DISP_LOOKUP_EXECD);
+                                intent_set_disposition(rep, DISP_LOOKUP_POS);
+                                intent_set_disposition(rep, DISP_OPEN_OPEN);
+                                rc = mdt_mfd_open(info, NULL, o, flags, 0, rep);
                         }
-                }
-                mdt_object_put(info->mti_ctxt, o);
-        } else
-                rc = PTR_ERR(o);
-
+                } 
+        } else if (rc == 0) {
+                rc = -ENOENT;
+        } else  {
+                struct mdt_body *repbody;
+                repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+                repbody->fid1 = *fid;
+                repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+                rc = 0;
+        }
+        mdt_object_put(info->mti_ctxt, o);
         RETURN(rc);
 }
 
 int mdt_pin(struct mdt_thread_info* info)
 {
-        struct mdt_body *body;
-        int rc;
         ENTRY;
-
-        rc = req_capsule_pack(&info->mti_pill);
-        if (rc == 0) {
-                body = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
-                rc = mdt_open_by_fid(info, &body->fid1, body->flags);
-        }
-        RETURN(rc);
+        RETURN(-EOPNOTSUPP);
 }
 
-int mdt_reint_open(struct mdt_thread_info *info)
+int mdt_open(struct mdt_thread_info *info)
 {
         struct mdt_device      *mdt = info->mti_mdt;
         struct mdt_object      *parent;
@@ -393,15 +379,8 @@ int mdt_reint_open(struct mdt_thread_info *info)
 
         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
         ma->ma_lmm_size = mdt->mdt_max_mdsize;
+        ma->ma_need = MA_INODE | MA_LOV;
 
-        if (rr->rr_name[0] == 0) {
-                /* reint partial remote open */
-                result = mdt_open_by_fid(info, rr->rr_fid1, create_flags);
-                RETURN(result);
-        }
-
-        /* we now have no resent message, so it must be an intent */
-        /*TODO: remove this and add MDS_CHECK_RESENT if resent enabled*/
         LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
 
         CDEBUG(D_INODE, "I am going to create "DFID"/("DFID":%s) "
@@ -451,21 +430,43 @@ int mdt_reint_open(struct mdt_thread_info *info)
 
         if (result == -ENOENT) {
                 /* not found and with MDS_OPEN_CREAT: let's create it */
-                ma->ma_need = MA_INODE | MA_LOV;
+                intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
                 result = mdo_create(info->mti_ctxt,
                                     mdt_object_child(parent),
                                     rr->rr_name,
                                     mdt_object_child(child),
                                     &info->mti_spec,
                                     &info->mti_attr);
-                intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
                 if (result != 0)
                         GOTO(out_child, result);
                 created = 1;
+        } else {
+        /* (1) client wants transno when open to keep a ref count for replay;
+         *     see after_reply() and mdc_close_commit();
+         * (2) we need to record the transaction related stuff onto disk;
+         * The question is: when do a read_only open, do we still need transno?
+         */
+                struct txn_param txn;
+                struct thandle *th;
+                struct dt_device *dt = info->mti_mdt->mdt_bottom;
+                txn.tp_credits = 1;
+
+                LASSERT(dt);
+                th = dt->dd_ops->dt_trans_start(info->mti_ctxt, dt, &txn);
+                if (!IS_ERR(th)) {
+                        dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
+                        /* we have to get attr & lov ea for this object*/
+                        result = mo_attr_get(info->mti_ctxt, 
+                                             mdt_object_child(child), ma);
+                        if (result)
+                                GOTO(out_child, result);
+                } else
+                        GOTO(out_child, result = PTR_ERR(th));
         }
 
         /* Try to open it now. */
-        result = mdt_mfd_open(info, parent, child, create_flags, created);
+        result = mdt_mfd_open(info, parent, child, create_flags, 
+                              created, ldlm_rep);
         GOTO(finish_open, result);
 
 finish_open:
index 67ffcbf..555ff3a 100644 (file)
@@ -577,7 +577,7 @@ static mdt_reinter reinters[REINT_MAX] = {
         [REINT_LINK] = mdt_reint_link,
         [REINT_UNLINK] = mdt_reint_unlink,
         [REINT_RENAME] = mdt_reint_rename,
-        [REINT_OPEN] = mdt_reint_open
+        [REINT_OPEN] = mdt_open
 };
 
 int mdt_reint_rec(struct mdt_thread_info *info)