Whamcloud - gitweb
- introduce the struct md_create_spec. It contains the various type-depended
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 397c78b..2f05e25 100644 (file)
@@ -83,6 +83,87 @@ static int mdt_create_data_obj(struct mdt_thread_info *info,
                                mrr->rr_eadatalen, ma);
 }
 
+
+/*The following four functions are copied from MDS */
+
+/* Write access to a file: executors cause a negative count,
+ * writers a positive count.  The semaphore is needed to perform
+ * a check for the sign and then increment or decrement atomically.
+ *
+ * This code is closely tied to the allocation of the d_fsdata and the
+ * MDS epoch, so we use the same semaphore for the whole lot.
+ *
+ * FIXME and TODO : handle the epoch!
+ * epoch argument is nonzero during recovery */
+static int mdt_get_write_access(struct mdt_device *mdt, struct mdt_object *o,
+                                __u64 epoch)
+{
+        int rc = 0;
+        ENTRY;
+
+        spin_lock(&mdt->mdt_epoch_lock);
+
+        if (atomic_read(&o->mot_writecount) < 0) {
+                rc = -ETXTBSY;
+        } else {
+                if (o->mot_io_epoch != 0) {
+                        CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID3"\n",
+                               o->mot_io_epoch, PFID3(mdt_object_fid(o)));
+                } else {
+                        if (epoch > mdt->mdt_io_epoch)
+                                mdt->mdt_io_epoch = epoch;
+                        else
+                                mdt->mdt_io_epoch++;
+                        o->mot_io_epoch = mdt->mdt_io_epoch;
+                        CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID3"\n",
+                               mdt->mdt_io_epoch, PFID3(mdt_object_fid(o)));
+                }
+                atomic_inc(&o->mot_writecount);
+        }
+        spin_unlock(&mdt->mdt_epoch_lock);
+        RETURN(rc);
+}
+
+static int mdt_put_write_access(struct mdt_device *mdt, struct mdt_object *o)
+{
+        int rc;
+        ENTRY;
+
+        spin_lock(&mdt->mdt_epoch_lock);
+        atomic_dec(&o->mot_writecount);
+        rc = atomic_read(&o->mot_writecount);
+        if (rc == 0)
+                o->mot_io_epoch = 0;
+        spin_unlock(&mdt->mdt_epoch_lock);
+        RETURN(rc);
+}
+
+static int mdt_deny_write_access(struct mdt_device *mdt, struct mdt_object *o)
+{
+        int rc = 0;
+        ENTRY;
+        spin_lock(&mdt->mdt_epoch_lock);
+        if (atomic_read(&o->mot_writecount) > 0) {
+                rc = -ETXTBSY;
+        } else
+                atomic_dec(&o->mot_writecount);
+        spin_unlock(&mdt->mdt_epoch_lock);
+        RETURN(rc);
+}
+
+static void mdt_allow_write_access(struct mdt_object *o)
+{
+        ENTRY;
+        atomic_inc(&o->mot_writecount);
+        EXIT;
+}
+
+int mdt_query_write_access(struct mdt_object *o)
+{
+        ENTRY;
+        RETURN(atomic_read(&o->mot_writecount));
+}
+
 static int mdt_mfd_open(struct mdt_thread_info *info,
                         struct mdt_object *p,
                         struct mdt_object *o,
@@ -90,6 +171,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
 {
         struct mdt_export_data *med;
         struct mdt_file_data   *mfd;
+        struct mdt_device      *mdt = info->mti_mdt;
         struct mdt_body        *repbody;
         struct md_attr         *ma = &info->mti_attr;
         struct lu_attr         *la = &ma->ma_attr;
@@ -99,7 +181,6 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
         int                     isreg, isdir, islnk;
         ENTRY;
 
-        med = &req->rq_export->exp_mdt_data;
         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
 
         if (!created) {
@@ -114,7 +195,9 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
         if (ma->ma_valid & MA_INODE)
                 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
 
-        /* if we are following a symlink, don't open */
+        /* if we are following a symlink, don't open
+         * do not return open handle for special nodes as client required
+         */
         if (islnk || (!isreg && !isdir &&
             (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
                 info->mti_trans_flags |= MDT_NONEED_TANSNO; 
@@ -130,13 +213,6 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
         } else if (flags & MDS_OPEN_DIRECTORY)
                 RETURN(-ENOTDIR);
 
-
-        if (!isreg && !isdir &&
-            (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))
-                /* If client supports this, do not return open handle
-                *  for special nodes */
-                RETURN(0);
-
         if ((isreg) && !(ma->ma_valid & MA_LOV)) {
                 /*No EA, check whether it is will set regEA and dirEA
                  *since in above attr get, these size might be zero,
@@ -145,13 +221,15 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                                                        &RMF_MDT_MD,
                                                        RCL_SERVER);
                 LASSERT(p != NULL);
+                /*XXX: Tom, do we need this?
                 rc = mdt_create_data_obj(info, p, o);
                 if (rc)
                         RETURN(rc);
+                */
         }
 
-        CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64"\n", ma->ma_valid);
-        CDEBUG(D_INODE, "after open, lmm_size = %d\n", ma->ma_lmm_size);
+        CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n", 
+                        ma->ma_valid, ma->ma_lmm_size);
         repbody->eadatasize = 0;
         repbody->aclsize = 0;
 
@@ -162,21 +240,25 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                 else
                         repbody->valid |= OBD_MD_FLEASIZE;
         }
-
-        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
-        
-        intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
         /*FIXME: should determine the offset dynamicly, 
          *did not get ACL before shrink*/
         lustre_shrink_reply(req, 2, repbody->eadatasize, 1);
         lustre_shrink_reply(req, repbody->eadatasize ? 3 : 2, repbody->aclsize,
                             0);
 
+        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+        intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
+
         if (flags & FMODE_WRITE) {
-                /*mds_get_write_access*/
+                /* FIXME: in recovery, need to pass old epoch here */
+                rc = mdt_get_write_access(mdt, o, 0);
+                if (rc == 0)
+                        repbody->io_epoch = o->mot_io_epoch;
         } else if (flags & MDS_FMODE_EXEC) {
-                /*mds_deny_write_access*/
+                rc = mdt_deny_write_access(mdt, o);
         }
+        if (rc)
+                RETURN(rc);
 
         /* (1) client wants transno when open to keep a ref count for replay;
          *     see after_reply() and mdc_close_commit();
@@ -207,6 +289,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                 mfd->mfd_object = o;
                 mfd->mfd_xid = mdt_info_req(info)->rq_xid;
 
+                med = &req->rq_export->exp_mdt_data;
                 spin_lock(&med->med_open_lock);
                 list_add(&mfd->mfd_list, &med->med_open_head);
                 spin_unlock(&med->med_open_lock);
@@ -239,6 +322,7 @@ int mdt_open_by_fid(struct mdt_thread_info* info, const struct lu_fid *fid,
                         if (la->la_flags & MDS_OPEN_CREAT) {
                                 rc = mo_object_create(info->mti_ctxt,
                                                       mdt_object_child(o),
+                                                      &info->mti_spec,
                                                       &info->mti_attr);
                                 if (rc == 0)
                                         rc = mdt_mfd_open(info, NULL, o, flags, 1);
@@ -288,8 +372,7 @@ int mdt_reint_open(struct mdt_thread_info *info)
                 RETURN(result);
 
         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
-        ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
-                                               &RMF_MDT_MD, RCL_SERVER);
+        ma->ma_lmm_size = mdt->mdt_max_mdsize;
 
         if (rr->rr_name[0] == 0) {
                 /* reint partial remote open */
@@ -300,9 +383,13 @@ int mdt_reint_open(struct mdt_thread_info *info)
         /*TODO: remove this and add MDS_CHECK_RESENT if resent enabled*/
         LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
 
-        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+        CDEBUG(D_INODE, "I am going to create "DFID3"/("DFID3":%s) flag=%x\n",
+                        PFID3(rr->rr_fid1), PFID3(rr->rr_fid2), 
+                        rr->rr_name, la->la_flags);
 
+        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
+
         lh = &info->mti_lh[MDT_LH_PARENT];
         if (!(la->la_flags & MDS_OPEN_CREAT))
                 lh->mlh_mode = LCK_CR;
@@ -341,8 +428,9 @@ int mdt_reint_open(struct mdt_thread_info *info)
                                     mdt_object_child(parent),
                                     rr->rr_name,
                                     mdt_object_child(child),
-                                    rr->rr_tgt, rr->rr_eadata,
-                                    rr->rr_eadatalen, &info->mti_attr);
+                                    &info->mti_spec,
+                                    /* rr->rr_tgt, rr->rr_eadata, rr->rr_eadatalen,*/
+                                    &info->mti_attr);
                 intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
                 if (result != 0)
                         GOTO(out_child, result);
@@ -369,15 +457,16 @@ out:
         return result;
 }
 
-void mdt_mfd_close(const struct lu_context *ctxt,
+void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt,
                    struct mdt_file_data *mfd)
 {
+        struct mdt_object *o = mfd->mfd_object;
         ENTRY;
 
         if (mfd->mfd_mode & FMODE_WRITE) {
-                /*mdt_put_write_access*/
+                mdt_put_write_access(mdt, o);
         } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
-                /*mdt_allow_write_access*/
+                mdt_allow_write_access(o);
         }
 
         /* release reference on this object.
@@ -422,7 +511,7 @@ int mdt_close(struct mdt_thread_info *info)
                 if (rc == 0)
                         rc = mdt_handle_last_unlink(info, o, ma);
 
-                mdt_mfd_close(info->mti_ctxt, mfd);
+                mdt_mfd_close(info->mti_ctxt, info->mti_mdt, mfd);
         }
         mdt_shrink_reply(info);
         RETURN(rc);