Whamcloud - gitweb
- introduce the struct md_create_spec. It contains the various type-dependent
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 0af5147..2f05e25 100644 (file)
@@ -31,6 +31,8 @@
 #endif
 #define DEBUG_SUBSYSTEM S_MDS
 
+#include <linux/lustre_acl.h>
+#include <lustre_mds.h>
 #include "mdt_internal.h"
 
 /* we do nothing because we do not have refcount now */
@@ -70,15 +72,96 @@ static void mdt_mfd_free(struct mdt_file_data *mfd)
         OBD_FREE_PTR(mfd);
 }
 
-static int mdt_create_data_obj(struct mdt_thread_info *info, 
+static int mdt_create_data_obj(struct mdt_thread_info *info,
                               struct mdt_object *p, struct mdt_object *o)
 {
         struct md_attr   *ma = &info->mti_attr;
         struct mdt_reint_record *mrr = &info->mti_rr;
 
-        return mdo_create_data_object(info->mti_ctxt, mdt_object_child(p),
-                                 mdt_object_child(o), mrr->rr_eadata, 
-                                 mrr->rr_eadatalen, ma);
+        return mdo_create_data(info->mti_ctxt, mdt_object_child(p),
+                               mdt_object_child(o), mrr->rr_eadata,
+                               mrr->rr_eadatalen, ma);
+}
+
+
+/*The following four functions are copied from MDS */
+
+/* Write access to a file: executors cause a negative count,
+ * writers a positive count.  A lock (here the mdt_epoch_lock spinlock) is
+ * needed to check the sign and then increment or decrement atomically.
+ *
+ * The same lock also covers the device-wide mdt_io_epoch and the
+ * per-object mot_io_epoch, which are updated alongside the count.
+ *
+ * FIXME and TODO : handle the epoch!
+ * epoch argument is nonzero during recovery */
+static int mdt_get_write_access(struct mdt_device *mdt, struct mdt_object *o,
+                                __u64 epoch)
+{
+        int rc = 0; /* 0 on success, -ETXTBSY when the count is negative */
+        ENTRY;
+
+        spin_lock(&mdt->mdt_epoch_lock);
+
+        if (atomic_read(&o->mot_writecount) < 0) {
+                rc = -ETXTBSY; /* negative count: refuse the writer */
+        } else {
+                if (o->mot_io_epoch != 0) { /* object already has an epoch */
+                        CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID3"\n",
+                               o->mot_io_epoch, PFID3(mdt_object_fid(o)));
+                } else {
+                        if (epoch > mdt->mdt_io_epoch)
+                                mdt->mdt_io_epoch = epoch; /* adopt caller's */
+                        else
+                                mdt->mdt_io_epoch++; /* start the next epoch */
+                        o->mot_io_epoch = mdt->mdt_io_epoch;
+                        CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID3"\n",
+                               mdt->mdt_io_epoch, PFID3(mdt_object_fid(o)));
+                }
+                atomic_inc(&o->mot_writecount); /* count this writer */
+        }
+        spin_unlock(&mdt->mdt_epoch_lock);
+        RETURN(rc);
+}
+
+static int mdt_put_write_access(struct mdt_device *mdt, struct mdt_object *o)
+{
+        int rc; /* write count left after this put; returned to caller */
+        ENTRY;
+
+        spin_lock(&mdt->mdt_epoch_lock);
+        atomic_dec(&o->mot_writecount); /* drop one write reference */
+        rc = atomic_read(&o->mot_writecount);
+        if (rc == 0)
+                o->mot_io_epoch = 0; /* count reached zero: clear the epoch */
+        spin_unlock(&mdt->mdt_epoch_lock);
+        RETURN(rc);
+}
+
+static int mdt_deny_write_access(struct mdt_device *mdt, struct mdt_object *o)
+{
+        int rc = 0; /* 0 on success, -ETXTBSY when the count is positive */
+        ENTRY;
+        spin_lock(&mdt->mdt_epoch_lock);
+        if (atomic_read(&o->mot_writecount) > 0) {
+                rc = -ETXTBSY; /* positive count: refuse to deny writes */
+        } else
+                atomic_dec(&o->mot_writecount); /* push the count negative */
+        spin_unlock(&mdt->mdt_epoch_lock);
+        RETURN(rc);
+}
+
+static void mdt_allow_write_access(struct mdt_object *o)
+{
+        ENTRY; /* NOTE(review): mdt_epoch_lock is not taken here - confirm */
+        atomic_inc(&o->mot_writecount); /* undo one mdt_deny_write_access() */
+        EXIT;
+}
+
+int mdt_query_write_access(struct mdt_object *o)
+{
+        ENTRY;
+        RETURN(atomic_read(&o->mot_writecount)); /* raw count, no locking */
+}
 
 static int mdt_mfd_open(struct mdt_thread_info *info,
@@ -88,78 +171,94 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
 {
         struct mdt_export_data *med;
         struct mdt_file_data   *mfd;
+        struct mdt_device      *mdt = info->mti_mdt;
         struct mdt_body        *repbody;
         struct md_attr         *ma = &info->mti_attr;
         struct lu_attr         *la = &ma->ma_attr;
         struct ptlrpc_request  *req = mdt_info_req(info);
+        struct ldlm_reply      *ldlm_rep;
         int                     rc = 0;
+        int                     isreg, isdir, islnk;
         ENTRY;
 
-        med = &req->rq_export->exp_mdt_data;
         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
 
         if (!created) {
                 /* we have to get attr & lov ea for this object*/
                 rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
+                if (rc)
+                        RETURN(rc);
         }
-        if (rc == 0){
-                if (!S_ISREG(la->la_mode) &&
-                    !S_ISDIR(la->la_mode) &&
-                    (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH ||
-                     S_ISLNK(la->la_mode)))
-                        /* If client supports this, do not return open handle
-                        *  for special nodes */
-                        RETURN(0);
-                if ((S_ISREG(la->la_mode) || S_ISDIR(la->la_mode))
-                     && !created && !(ma->ma_valid & MA_LOV)) {
-                        /*No EA, check whether it is will set regEA and dirEA
-                         *since in above attr get, these size might be zero,
-                         *so reset it, to retrieve the MD after create obj*/
-                        ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
-                                                            &RMF_MDT_MD,
-                                                            RCL_SERVER);
-                        LASSERT(p != NULL);
-                        rc = mdt_create_data_obj(info, p, o);
-                        if (rc)
-                                RETURN(rc);
+        isreg = S_ISREG(la->la_mode);
+        isdir = S_ISDIR(la->la_mode);
+        islnk = S_ISLNK(la->la_mode);
+        if (ma->ma_valid & MA_INODE)
+                mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
+
+        /* If we are following a symlink, don't open it; also do not
+         * return an open handle for special nodes if the client asked so.
+         */
+        if (islnk || (!isreg && !isdir &&
+            (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
+                info->mti_trans_flags |= MDT_NONEED_TANSNO; 
+                RETURN(0);
+        }
+        /* FIXME:maybe this can be done earlier? */
+        if (isdir) {
+                if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
+                        /* we are trying to create or
+                         * write an existing dir. */
+                        RETURN(-EISDIR);
                 }
-                /* FIXME:maybe this can be done earlier? */
-                if (S_ISDIR(la->la_mode)) {
-                        if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
-                                /* we are trying to create or
-                                 * write an existing dir. */
-                                rc = -EISDIR;
-                        }
-                } else if (flags & MDS_OPEN_DIRECTORY)
-                        rc = -ENOTDIR;
+        } else if (flags & MDS_OPEN_DIRECTORY)
+                RETURN(-ENOTDIR);
+
+        if ((isreg) && !(ma->ma_valid & MA_LOV)) {
+                /* No EA: check whether regEA or dirEA will be set.
+                 * The size may be zero after the attr get above, so
+                 * reset it to retrieve the MD after creating the obj. */
+                ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
+                                                       &RMF_MDT_MD,
+                                                       RCL_SERVER);
+                LASSERT(p != NULL);
+                /*XXX: Tom, do we need this?
+                rc = mdt_create_data_obj(info, p, o);
+                if (rc)
+                        RETURN(rc);
+                */
         }
-        if (rc != 0)
-                RETURN(rc);
 
-        CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64"\n", ma->ma_valid);
-        CDEBUG(D_INODE, "after open, lmm_size = %d\n", ma->ma_lmm_size);
+        CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n", 
+                        ma->ma_valid, ma->ma_lmm_size);
         repbody->eadatasize = 0;
         repbody->aclsize = 0;
 
-        if (ma->ma_valid & MA_INODE)
-                mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
         if (ma->ma_lmm_size && ma->ma_valid & MA_LOV) {
                 repbody->eadatasize = ma->ma_lmm_size;
-                if (S_ISDIR(la->la_mode))
+                if (isdir)
                         repbody->valid |= OBD_MD_FLDIREA;
                 else
                         repbody->valid |= OBD_MD_FLEASIZE;
         }
-
-        /*FIXME: should determine the offset dynamicly */
+        /* FIXME: should determine the offset dynamically;
+         * the ACL has not been fetched before the shrink. */
         lustre_shrink_reply(req, 2, repbody->eadatasize, 1);
-        lustre_shrink_reply(req, repbody->eadatasize ? 3 : 2, repbody->aclsize, 0);
+        lustre_shrink_reply(req, repbody->eadatasize ? 3 : 2, repbody->aclsize,
+                            0);
+
+        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+        intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
 
         if (flags & FMODE_WRITE) {
-                /*mds_get_write_access*/
+                /* FIXME: in recovery, need to pass old epoch here */
+                rc = mdt_get_write_access(mdt, o, 0);
+                if (rc == 0)
+                        repbody->io_epoch = o->mot_io_epoch;
         } else if (flags & MDS_FMODE_EXEC) {
-                /*mds_deny_write_access*/
+                rc = mdt_deny_write_access(mdt, o);
         }
+        if (rc)
+                RETURN(rc);
 
         /* (1) client wants transno when open to keep a ref count for replay;
          *     see after_reply() and mdc_close_commit();
@@ -174,9 +273,9 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
 
                 LASSERT(dt);
                 th = dt->dd_ops->dt_trans_start(info->mti_ctxt, dt, &txn);
-                if (!IS_ERR(th)) 
+                if (!IS_ERR(th))
                         dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
-                else 
+                else
                         RETURN(PTR_ERR(th));
         }
 
@@ -190,6 +289,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                 mfd->mfd_object = o;
                 mfd->mfd_xid = mdt_info_req(info)->rq_xid;
 
+                med = &req->rq_export->exp_mdt_data;
                 spin_lock(&med->med_open_lock);
                 list_add(&mfd->mfd_list, &med->med_open_head);
                 spin_unlock(&med->med_open_lock);
@@ -222,6 +322,7 @@ int mdt_open_by_fid(struct mdt_thread_info* info, const struct lu_fid *fid,
                         if (la->la_flags & MDS_OPEN_CREAT) {
                                 rc = mo_object_create(info->mti_ctxt,
                                                       mdt_object_child(o),
+                                                      &info->mti_spec,
                                                       &info->mti_attr);
                                 if (rc == 0)
                                         rc = mdt_mfd_open(info, NULL, o, flags, 1);
@@ -263,11 +364,15 @@ int mdt_reint_open(struct mdt_thread_info *info)
         struct mdt_reint_record *rr = &info->mti_rr;
         ENTRY;
 
-        ma->ma_lmm = req_capsule_server_get(&info->mti_pill,
-                                            &RMF_MDT_MD);
-        ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
-                                               &RMF_MDT_MD,
-                                               RCL_SERVER);
+        req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
+                             mdt->mdt_max_mdsize);
+
+        result = req_capsule_pack(&info->mti_pill);
+        if (result)
+                RETURN(result);
+
+        ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
+        ma->ma_lmm_size = mdt->mdt_max_mdsize;
 
         if (rr->rr_name[0] == 0) {
                 /* reint partial remote open */
@@ -278,9 +383,13 @@ int mdt_reint_open(struct mdt_thread_info *info)
         /*TODO: remove this and add MDS_CHECK_RESENT if resent enabled*/
         LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
 
-        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+        CDEBUG(D_INODE, "I am going to create "DFID3"/("DFID3":%s) flag=%x\n",
+                        PFID3(rr->rr_fid1), PFID3(rr->rr_fid2), 
+                        rr->rr_name, la->la_flags);
 
+        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
+
         lh = &info->mti_lh[MDT_LH_PARENT];
         if (!(la->la_flags & MDS_OPEN_CREAT))
                 lh->mlh_mode = LCK_CR;
@@ -319,8 +428,9 @@ int mdt_reint_open(struct mdt_thread_info *info)
                                     mdt_object_child(parent),
                                     rr->rr_name,
                                     mdt_object_child(child),
-                                    rr->rr_tgt, rr->rr_eadata,
-                                    rr->rr_eadatalen, &info->mti_attr);
+                                    &info->mti_spec,
+                                    /* rr->rr_tgt, rr->rr_eadata, rr->rr_eadatalen,*/
+                                    &info->mti_attr);
                 intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
                 if (result != 0)
                         GOTO(out_child, result);
@@ -329,7 +439,6 @@ int mdt_reint_open(struct mdt_thread_info *info)
 
         /* Open it now. */
         result = mdt_mfd_open(info, parent, child, la->la_flags, created);
-        intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
         GOTO(finish_open, result);
 
 finish_open:
@@ -348,15 +457,16 @@ out:
         return result;
 }
 
-void mdt_mfd_close(const struct lu_context *ctxt,
+void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt,
                    struct mdt_file_data *mfd)
 {
+        struct mdt_object *o = mfd->mfd_object;
         ENTRY;
 
         if (mfd->mfd_mode & FMODE_WRITE) {
-                /*mdt_put_write_access*/
+                mdt_put_write_access(mdt, o);
         } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
-                /*mdt_allow_write_access*/
+                mdt_allow_write_access(o);
         }
 
         /* release reference on this object.
@@ -393,15 +503,15 @@ int mdt_close(struct mdt_thread_info *info)
                 spin_unlock(&med->med_open_lock);
 
                 o = mfd->mfd_object;
-                ma->ma_lmm = req_capsule_server_get(&info->mti_pill, 
+                ma->ma_lmm = req_capsule_server_get(&info->mti_pill,
                                                     &RMF_MDT_MD);
                 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
                                                        &RMF_MDT_MD, RCL_SERVER);
                 rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
                 if (rc == 0)
-                        rc = mdt_handle_last_unlink(info, o);
+                        rc = mdt_handle_last_unlink(info, o, ma);
 
-                mdt_mfd_close(info->mti_ctxt, mfd);
+                mdt_mfd_close(info->mti_ctxt, info->mti_mdt, mfd);
         }
         mdt_shrink_reply(info);
         RETURN(rc);