Whamcloud - gitweb
added some open recovery prototype code.
authorhuanghua <huanghua>
Thu, 31 Aug 2006 14:21:10 +0000 (14:21 +0000)
committerhuanghua <huanghua>
Thu, 31 Aug 2006 14:21:10 +0000 (14:21 +0000)
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/mdt/mdt_recovery.c

index 137e594..ce51010 100644 (file)
@@ -1546,9 +1546,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
         struct ldlm_reply      *ldlm_rep;
         struct mdt_lock_handle  tmp_lock;
         struct mdt_lock_handle *lhc = &tmp_lock;
-        struct mdt_device      *mdt = info->mti_mdt;
         __u64                   child_bits;
-        int                     rc;
 
         ENTRY;
 
index 8d4929a..e34ed29 100644 (file)
@@ -402,6 +402,7 @@ int mdt_done_writing(struct mdt_thread_info *info);
 void mdt_shrink_reply(struct mdt_thread_info *info, int offset);
 int mdt_handle_last_unlink(struct mdt_thread_info *, struct mdt_object *,
                            const struct md_attr *);
+void mdt_reconstruct_open(struct mdt_thread_info *);
 
 void mdt_dump_lmm(int level, struct lov_mds_md *lmm);
 
index b62c0a1..6fc5e66 100644 (file)
@@ -300,6 +300,186 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
         RETURN(rc);
 }
 
+extern void mdt_req_from_mcd(struct ptlrpc_request *req,
+                             struct mdt_client_data *mcd);
+
+
+/* TODO: much of the code here can be shared with mdt_open() 
+ *       and mdt_mfd_open(). This will be done later. :-)
+ */
+void mdt_reconstruct_open(struct mdt_thread_info *info)
+{
+        struct lu_fid          *child_fid = &info->mti_tmp_fid1;
+        __u32                   flags  = info->mti_spec.sp_cr_flags;
+        struct req_capsule      *pill  = &info->mti_pill;
+        struct ptlrpc_request   *req   = mdt_info_req(info);
+        struct mdt_export_data  *med   = &req->rq_export->exp_mdt_data;
+        struct mdt_client_data  *mcd   = med->med_mcd;
+        struct md_attr          *ma    = &info->mti_attr;
+        struct lu_attr          *la    = &ma->ma_attr;
+        struct mdt_device       *mdt   = info->mti_mdt;
+        struct mdt_reint_record *rr    = &info->mti_rr;
+        const struct lu_context *ctxt  = info->mti_ctxt;
+        struct mdt_object       *parent;
+        struct mdt_object       *child;
+        struct ldlm_reply       *ldlm_rep;
+        struct mdt_body         *repbody;
+        struct mdt_file_data    *mfd;
+        struct list_head        *h;
+        int                      result;
+        int                      isreg, isdir, islnk;
+        ENTRY;
+
+        LASSERT(pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
+
+        mdt_req_from_mcd(req, med->med_mcd);
+        mdt_set_disposition(info, ldlm_rep, le32_to_cpu(mcd->mcd_last_data));
+
+        if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) && req->rq_status) {
+                /* we did not create successfully, return error to client. */
+                EXIT;
+                return;
+        }
+
+        mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
+        mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
+        parent = mdt_object_find(ctxt, mdt, rr->rr_fid1);
+        if (IS_ERR(parent)) {
+                /* FIXME: should this be assert? */
+                CERROR("Cannot find parent "DFID" while reconstruct open\n",
+                        PFID(rr->rr_fid1));
+                GOTO(out, result = PTR_ERR(parent));
+        }
+
+        result = mdo_lookup(ctxt, mdt_object_child(parent),
+                            rr->rr_name, child_fid);
+        if (result == 0)
+                mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
+        else {
+                if (result == -ENOENT || result == -ESTALE) {
+                        mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
+                        result = -ENOENT;
+                }
+                GOTO(out_parent, result);
+        }
+        
+        LASSERT(lu_fid_eq(rr->rr_fid2, child_fid));
+        child = mdt_object_find(ctxt, mdt, child_fid);
+        if (IS_ERR(child))
+                GOTO(out_parent, result = PTR_ERR(child));
+
+        ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
+        ma->ma_lmm_size = mdt->mdt_max_mdsize;
+        ma->ma_need = MA_INODE | MA_LOV;
+        repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
+        repbody->eadatasize = 0;
+        repbody->aclsize = 0;
+
+        result = mo_attr_get(ctxt, mdt_object_child(child), ma);
+        if (result == -EREMOTE) {
+                /* the object is on remote node
+                 * return its FID for remote open */
+                repbody->fid1 = *mdt_object_fid(child);
+                repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+                GOTO(out_child, result = 0);
+        } else if(result != 0)
+                GOTO(out_child, result);
+
+        LASSERT(ma->ma_valid & MA_INODE);
+        isreg = S_ISREG(la->la_mode);
+        isdir = S_ISDIR(la->la_mode);
+        islnk = S_ISLNK(la->la_mode);
+        mdt_pack_attr2body(repbody, la, mdt_object_fid(child));
+
+        /* if we are following a symlink, don't open; and
+         * do not return open handle for special nodes as client required
+         */
+        if (islnk || (!isreg && !isdir &&
+            (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
+                info->mti_trans_flags |= MDT_NONEED_TRANSNO;
+                GOTO(out_child, 0);
+        }
+
+        mdt_set_disposition(info, ldlm_rep, DISP_OPEN_OPEN);
+        if (!mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) &&
+             (flags & MDS_OPEN_EXCL) && (flags & MDS_OPEN_CREAT))
+                GOTO(out_child, result = -EEXIST);
+
+        if (isdir) {
+                if (flags & (MDS_OPEN_CREAT | FMODE_WRITE))
+                        /* we are trying to create or
+                         * write an existing dir. */
+                        GOTO(out_child, result = -EISDIR);
+        } else if (flags & MDS_OPEN_DIRECTORY)
+                GOTO(out_child, result = -ENOTDIR);
+
+        /* at this point, regular file should have lov */
+        LASSERT(ergo(isreg, ma->ma_valid & MA_LOV));
+
+        if (ma->ma_valid & MA_LOV) {
+                LASSERT(ma->ma_lmm_size);
+                repbody->eadatasize = ma->ma_lmm_size;
+                if (isdir)
+                        repbody->valid |= OBD_MD_FLDIREA;
+                else
+                        repbody->valid |= OBD_MD_FLEASIZE;
+        }
+        mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);
+
+        if (flags & FMODE_WRITE) {
+                /* FIXME: in recovery, need to pass old epoch here */
+                result = mdt_get_write_access(mdt, child, 0);
+                if (result == 0)
+                        repbody->io_epoch = child->mot_io_epoch;
+        } else if (flags & MDS_FMODE_EXEC)
+                result = mdt_deny_write_access(mdt, child);
+        if (result)
+                GOTO(out_child, result);
+
+        result = mo_open(ctxt, mdt_object_child(child), flags);
+        if (result)
+                GOTO(out_child, result);
+
+        mfd = NULL;
+        spin_lock(&med->med_open_lock);
+        list_for_each(h, &med->med_open_head) {
+                mfd = list_entry(h, struct mdt_file_data, mfd_list);
+                if (mfd->mfd_xid == req->rq_xid) {
+                        break;
+                }
+                mfd = NULL;
+        }
+        spin_unlock(&med->med_open_lock);
+
+        if (mfd != NULL) {
+                repbody->handle.cookie = mfd->mfd_handle.h_cookie;
+                GOTO(out_child, 0);
+        }
+
+        mfd = mdt_mfd_new();
+        if (mfd != NULL) {
+                mdt_object_get(ctxt, child);
+
+                mfd->mfd_mode = flags;
+                mfd->mfd_object = child;
+                mfd->mfd_xid = req->rq_xid;
+
+                med = &req->rq_export->exp_mdt_data;
+                spin_lock(&med->med_open_lock);
+                list_add(&mfd->mfd_list, &med->med_open_head);
+                spin_unlock(&med->med_open_lock);
+                repbody->handle.cookie = mfd->mfd_handle.h_cookie;
+        } else
+                result = -ENOMEM;
+        EXIT;
+out_child:
+        mdt_object_put(ctxt, child);
+out_parent:
+        mdt_object_put(ctxt, parent);
+out:
+        req->rq_status = result;
+}
+
 static int mdt_open_by_fid(struct mdt_thread_info* info, 
                            const struct lu_fid *fid,
                            struct ldlm_reply *rep, 
@@ -319,29 +499,20 @@ static int mdt_open_by_fid(struct mdt_thread_info* info,
         if (rc > 0) {
                 /* successfully found the child object */
                 if (flags & MDS_OPEN_EXCL && flags & MDS_OPEN_CREAT)
-                                rc = -EEXIST;
+                        rc = -EEXIST;
                 else {
-                        struct txn_param txn;
-                        struct thandle *th;
-                        struct dt_device *dt = info->mti_mdt->mdt_bottom;
-                        txn.tp_credits = 1;
-
-                        LASSERT(dt);
-                        th = dt->dd_ops->dt_trans_start(info->mti_ctxt, 
-                                                        dt, &txn);
-                        if (!IS_ERR(th)) {
-                                dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
-                                rc = mo_attr_get(info->mti_ctxt, 
-                                                 mdt_object_child(o), ma);
-                        }
-                        else
-                                rc = PTR_ERR(th);
-                        if (rc == 0) {
-                                mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
-                                mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
-                                mdt_set_disposition(info, rep, DISP_OPEN_OPEN);
+                        const struct lu_context *ctxt = info->mti_ctxt;
+                        struct mdt_device *mdt = info->mti_mdt;
+
+                        spin_lock(&mdt->mdt_transno_lock);
+                        info->mti_transno = ++ mdt->mdt_last_transno;
+                        spin_unlock(&mdt->mdt_transno_lock);
+                        
+                        mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
+                        mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
+                        rc = mo_attr_get(ctxt, mdt_object_child(o), ma);
+                        if (rc == 0)
                                 rc = mdt_mfd_open(info, NULL, o, flags, 0, rep);
-                        }
                 } 
         } else if (rc == 0) {
                 rc = -ENOENT;
index 8557633..0b4928f 100644 (file)
@@ -663,8 +663,8 @@ void mdt_fs_cleanup(const struct lu_context *ctx, struct mdt_device *mdt)
 }
 
 /* reconstruction code */
-static inline void mdt_req_from_mcd(struct ptlrpc_request *req,
-                                    struct mdt_client_data *mcd)
+void mdt_req_from_mcd(struct ptlrpc_request *req,
+                      struct mdt_client_data *mcd)
 {
         DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
                   mcd->mcd_last_transno, mcd->mcd_last_result);
@@ -744,16 +744,6 @@ static void mdt_reconstruct_setattr(struct mdt_thread_info *mti)
         mdt_object_put(mti->mti_ctxt, obj);
 }
 
-static void mdt_reconstruct_open(struct mdt_thread_info *mti)
-{
-        struct ptlrpc_request *req = mdt_info_req(mti);
-        struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
-
-        /*TODO: after open recovery task will be finished */
-
-        mdt_req_from_mcd(req, med->med_mcd);
-}
-
 typedef void (*mdt_reconstructor)(struct mdt_thread_info *mti);
 
 static mdt_reconstructor reconstructors[REINT_MAX] = {