Whamcloud - gitweb
add EXEC & WRITE pretection support on file to pass sanity 43
authorhuanghua <huanghua>
Sat, 5 Aug 2006 17:02:50 +0000 (17:02 +0000)
committerhuanghua <huanghua>
Sat, 5 Aug 2006 17:02:50 +0000 (17:02 +0000)
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c

index 3d907c4..b5c2f6b 100644 (file)
@@ -193,6 +193,8 @@ void mdt_pack_attr2body(struct mdt_body *b, const struct lu_attr *attr,
         if (fid) {
                 b->fid1 = *fid;
                 b->valid |= OBD_MD_FLID;
+                CDEBUG(D_INODE, ""DFID3": nlink=%d, mode=%o, size="LPU64"\n",
+                                PFID3(fid), b->nlink, b->mode, b->size);
         }
 }
 
@@ -2271,6 +2273,7 @@ static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
         m->mdt_max_mdsize = MAX_MD_SIZE;
         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
 
+        spin_lock_init(&m->mdt_epoch_lock);
         /* Temporary. should parse mount option. */
         m->mdt_opts.mo_user_xattr = 0;
         m->mdt_opts.mo_acl = 0;
@@ -2621,7 +2624,7 @@ static int mdt_destroy_export(struct obd_export *export)
                 class_handle_unhash(&mfd->mfd_handle);
                 list_del_init(&mfd->mfd_list);
                 spin_unlock(&med->med_open_lock);
-                mdt_mfd_close(&ctxt, mfd);
+                mdt_mfd_close(&ctxt, mdt, mfd);
                 spin_lock(&med->med_open_lock);
         }
         spin_unlock(&med->med_open_lock);
index d651a2c..929f4aa 100644 (file)
@@ -118,10 +118,18 @@ struct mdt_device {
                 signed int         mo_acl        :1;
                 signed int         mo_compat_resname:1;
         } mdt_opts;
+        
+        /* lock to pretect epoch and write count
+         * because we need not allocate memory, spinlock is fast.
+         */
+        spinlock_t                 mdt_epoch_lock;
+        __u64                      mdt_io_epoch;
+
         /* Transaction related stuff here */
         spinlock_t                 mdt_transno_lock;
         __u64                      mdt_last_transno;
         __u64                      mdt_last_committed;
+
         /* transaction callbacks */
         struct dt_txn_callback     mdt_txn_cb;
         /* last_rcvd file */
@@ -145,6 +153,8 @@ struct mdt_device {
 struct mdt_object {
         struct lu_object_header mot_header;
         struct md_object        mot_obj;
+        __u64                   mot_io_epoch;
+        atomic_t                mot_writecount;
 };
 
 struct mdt_lock_handle {
@@ -357,7 +367,7 @@ int mdt_lock_new_child(struct mdt_thread_info *info,
 
 int mdt_reint_open(struct mdt_thread_info *info);
 
-void mdt_mfd_close(const struct lu_context *ctxt,
+void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt,
                    struct mdt_file_data *mfd);
 
 int mdt_close(struct mdt_thread_info *info);
index 711faf3..60c0777 100644 (file)
@@ -83,6 +83,87 @@ static int mdt_create_data_obj(struct mdt_thread_info *info,
                                mrr->rr_eadatalen, ma);
 }
 
+
+/*The following four functions are copied from MDS */
+
+/* Write access to a file: executors cause a negative count,
+ * writers a positive count.  The semaphore is needed to perform
+ * a check for the sign and then increment or decrement atomically.
+ *
+ * This code is closely tied to the allocation of the d_fsdata and the
+ * MDS epoch, so we use the same semaphore for the whole lot.
+ *
+ * FIXME and TODO : handle the epoch!
+ * epoch argument is nonzero during recovery */
+static int mdt_get_write_access(struct mdt_device *mdt, struct mdt_object *o,
+                                __u64 epoch)
+{
+        int rc = 0;
+        ENTRY;
+
+        spin_lock(&mdt->mdt_epoch_lock);
+
+        if (atomic_read(&o->mot_writecount) < 0) {
+                rc = -ETXTBSY;
+        } else {
+                if (o->mot_io_epoch != 0) {
+                        CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID3"\n",
+                               o->mot_io_epoch, PFID3(mdt_object_fid(o)));
+                } else {
+                        if (epoch > mdt->mdt_io_epoch)
+                                mdt->mdt_io_epoch = epoch;
+                        else
+                                mdt->mdt_io_epoch++;
+                        o->mot_io_epoch = mdt->mdt_io_epoch;
+                        CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID3"\n",
+                               mdt->mdt_io_epoch, PFID3(mdt_object_fid(o)));
+                }
+                atomic_inc(&o->mot_writecount);
+        }
+        spin_unlock(&mdt->mdt_epoch_lock);
+        RETURN(rc);
+}
+
+static int mdt_put_write_access(struct mdt_device *mdt, struct mdt_object *o)
+{
+        int rc;
+        ENTRY;
+
+        spin_lock(&mdt->mdt_epoch_lock);
+        atomic_dec(&o->mot_writecount);
+        rc = atomic_read(&o->mot_writecount);
+        if (rc == 0)
+                o->mot_io_epoch = 0;
+        spin_unlock(&mdt->mdt_epoch_lock);
+        RETURN(rc);
+}
+
+static int mdt_deny_write_access(struct mdt_device *mdt, struct mdt_object *o)
+{
+        int rc = 0;
+        ENTRY;
+        spin_lock(&mdt->mdt_epoch_lock);
+        if (atomic_read(&o->mot_writecount) > 0) {
+                rc = -ETXTBSY;
+        } else
+                atomic_dec(&o->mot_writecount);
+        spin_unlock(&mdt->mdt_epoch_lock);
+        RETURN(rc);
+}
+
+static void mdt_allow_write_access(struct mdt_object *o)
+{
+        ENTRY;
+        atomic_inc(&o->mot_writecount);
+        EXIT;
+}
+
+int mdt_query_write_access(struct mdt_object *o)
+{
+        ENTRY;
+        RETURN(atomic_read(&o->mot_writecount));
+}
+
 static int mdt_mfd_open(struct mdt_thread_info *info,
                         struct mdt_object *p,
                         struct mdt_object *o,
@@ -90,6 +171,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
 {
         struct mdt_export_data *med;
         struct mdt_file_data   *mfd;
+        struct mdt_device      *mdt = info->mti_mdt;
         struct mdt_body        *repbody;
         struct md_attr         *ma = &info->mti_attr;
         struct lu_attr         *la = &ma->ma_attr;
@@ -99,7 +181,6 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
         int                     isreg, isdir, islnk;
         ENTRY;
 
-        med = &req->rq_export->exp_mdt_data;
         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
 
         if (!created) {
@@ -114,7 +195,9 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
         if (ma->ma_valid & MA_INODE)
                 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
 
-        /* if we are following a symlink, don't open */
+        /* if we are following a symlink, don't open
+         * do not return open handle for special nodes as client required
+         */
         if (islnk || (!isreg && !isdir &&
             (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
                 info->mti_trans_flags |= MDT_NONEED_TANSNO; 
@@ -130,13 +213,6 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
         } else if (flags & MDS_OPEN_DIRECTORY)
                 RETURN(-ENOTDIR);
 
-
-        if (!isreg && !isdir &&
-            (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))
-                /* If client supports this, do not return open handle
-                *  for special nodes */
-                RETURN(0);
-
         if ((isreg) && !(ma->ma_valid & MA_LOV)) {
                 /*No EA, check whether it is will set regEA and dirEA
                  *since in above attr get, these size might be zero,
@@ -150,8 +226,8 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                         RETURN(rc);
         }
 
-        CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64"\n", ma->ma_valid);
-        CDEBUG(D_INODE, "after open, lmm_size = %d\n", ma->ma_lmm_size);
+        CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n", 
+                        ma->ma_valid, ma->ma_lmm_size);
         repbody->eadatasize = 0;
         repbody->aclsize = 0;
 
@@ -162,21 +238,25 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                 else
                         repbody->valid |= OBD_MD_FLEASIZE;
         }
-
-        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
-        
-        intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
         /*FIXME: should determine the offset dynamicly, 
          *did not get ACL before shrink*/
         lustre_shrink_reply(req, 2, repbody->eadatasize, 1);
         lustre_shrink_reply(req, repbody->eadatasize ? 3 : 2, repbody->aclsize,
                             0);
 
+        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+        intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
+
         if (flags & FMODE_WRITE) {
-                /*mds_get_write_access*/
+                /* FIXME: in recovery, need to pass old epoch here */
+                rc = mdt_get_write_access(mdt, o, 0);
+                if (rc == 0)
+                        repbody->io_epoch = o->mot_io_epoch;
         } else if (flags & MDS_FMODE_EXEC) {
-                /*mds_deny_write_access*/
+                rc = mdt_deny_write_access(mdt, o);
         }
+        if (rc)
+                RETURN(rc);
 
         /* (1) client wants transno when open to keep a ref count for replay;
          *     see after_reply() and mdc_close_commit();
@@ -207,6 +287,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                 mfd->mfd_object = o;
                 mfd->mfd_xid = mdt_info_req(info)->rq_xid;
 
+                med = &req->rq_export->exp_mdt_data;
                 spin_lock(&med->med_open_lock);
                 list_add(&mfd->mfd_list, &med->med_open_head);
                 spin_unlock(&med->med_open_lock);
@@ -299,9 +380,13 @@ int mdt_reint_open(struct mdt_thread_info *info)
         /*TODO: remove this and add MDS_CHECK_RESENT if resent enabled*/
         LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
 
-        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+        CDEBUG(D_INODE, "I am going to create "DFID3"/("DFID3":%s) flag=%x\n",
+                        PFID3(rr->rr_fid1), PFID3(rr->rr_fid2), 
+                        rr->rr_name, la->la_flags);
 
+        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
+
         lh = &info->mti_lh[MDT_LH_PARENT];
         if (!(la->la_flags & MDS_OPEN_CREAT))
                 lh->mlh_mode = LCK_CR;
@@ -368,15 +453,16 @@ out:
         return result;
 }
 
-void mdt_mfd_close(const struct lu_context *ctxt,
+void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt,
                    struct mdt_file_data *mfd)
 {
+        struct mdt_object *o = mfd->mfd_object;
         ENTRY;
 
         if (mfd->mfd_mode & FMODE_WRITE) {
-                /*mdt_put_write_access*/
+                mdt_put_write_access(mdt, o);
         } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
-                /*mdt_allow_write_access*/
+                mdt_allow_write_access(o);
         }
 
         /* release reference on this object.
@@ -421,7 +507,7 @@ int mdt_close(struct mdt_thread_info *info)
                 if (rc == 0)
                         rc = mdt_handle_last_unlink(info, o, ma);
 
-                mdt_mfd_close(info->mti_ctxt, mfd);
+                mdt_mfd_close(info->mti_ctxt, info->mti_mdt, mfd);
         }
         mdt_shrink_reply(info);
         RETURN(rc);