Whamcloud - gitweb
- introduce the struct md_create_spec. It contains the various type-dependent
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 8a26b63..2f05e25 100644 (file)
@@ -31,6 +31,8 @@
 #endif
 #define DEBUG_SUBSYSTEM S_MDS
 
+#include <linux/lustre_acl.h>
+#include <lustre_mds.h>
 #include "mdt_internal.h"
 
 /* we do nothing because we do not have refcount now */
@@ -38,8 +40,8 @@ static void mdt_mfd_get(void *mfdp)
 {
 }
 
-/* Create a new mdt_file_data struct, initialize it, 
- * and insert it to global hash table */ 
+/* Create a new mdt_file_data struct, initialize it,
+ * and insert it to global hash table */
 static struct mdt_file_data *mdt_mfd_new(void)
 {
         struct mdt_file_data *mfd;
@@ -70,76 +72,211 @@ static void mdt_mfd_free(struct mdt_file_data *mfd)
         OBD_FREE_PTR(mfd);
 }
 
+/* Forward to mdo_create_data() for child \a o under parent \a p, passing
+ * the EA buffer and its length taken from the thread's reint record and
+ * the thread's attribute block.  Returns mdo_create_data()'s result. */
+static int mdt_create_data_obj(struct mdt_thread_info *info,
+                              struct mdt_object *p, struct mdt_object *o)
+{
+        struct mdt_reint_record *rec = &info->mti_rr;
+        int rc;
+
+        rc = mdo_create_data(info->mti_ctxt,
+                             mdt_object_child(p), mdt_object_child(o),
+                             rec->rr_eadata, rec->rr_eadatalen,
+                             &info->mti_attr);
+        return rc;
+}
+
+
+/*The following four functions are copied from MDS */
+
+/* Write access to a file: executors drive mot_writecount negative,
+ * writers drive it positive.  mdt_epoch_lock is held so that the sign
+ * check and the increment happen as one step, and so that the object
+ * and device I/O epochs are updated under that same lock.
+ *
+ * A writer joining an object whose mot_io_epoch is already non-zero
+ * continues that epoch.  Otherwise a new epoch is started: the device
+ * epoch becomes \a epoch when that is larger than the current device
+ * epoch, else it is incremented, and the object takes the result.
+ *
+ * FIXME and TODO : handle the epoch!
+ * epoch argument is nonzero during recovery
+ *
+ * Returns 0 on success, -ETXTBSY when the write count is negative. */
+static int mdt_get_write_access(struct mdt_device *mdt, struct mdt_object *o,
+                                __u64 epoch)
+{
+        ENTRY;
+
+        spin_lock(&mdt->mdt_epoch_lock);
+
+        /* a negative count means the object is held for execution */
+        if (atomic_read(&o->mot_writecount) < 0) {
+                spin_unlock(&mdt->mdt_epoch_lock);
+                RETURN(-ETXTBSY);
+        }
+
+        if (o->mot_io_epoch == 0) {
+                if (epoch > mdt->mdt_io_epoch)
+                        mdt->mdt_io_epoch = epoch;
+                else
+                        mdt->mdt_io_epoch++;
+                o->mot_io_epoch = mdt->mdt_io_epoch;
+                CDEBUG(D_INODE, "starting epoch "LPU64" for "DFID3"\n",
+                       mdt->mdt_io_epoch, PFID3(mdt_object_fid(o)));
+        } else {
+                CDEBUG(D_INODE, "continue epoch "LPU64" for "DFID3"\n",
+                       o->mot_io_epoch, PFID3(mdt_object_fid(o)));
+        }
+        atomic_inc(&o->mot_writecount);
+
+        spin_unlock(&mdt->mdt_epoch_lock);
+        RETURN(0);
+}
+
+/* Drop one writer reference on \a o under mdt_epoch_lock.  When the
+ * write count reaches zero the object's I/O epoch is cleared.
+ * Returns the write count read back after the decrement. */
+static int mdt_put_write_access(struct mdt_device *mdt, struct mdt_object *o)
+{
+        int remaining;
+        ENTRY;
+
+        spin_lock(&mdt->mdt_epoch_lock);
+
+        atomic_dec(&o->mot_writecount);
+        remaining = atomic_read(&o->mot_writecount);
+        if (!remaining)
+                o->mot_io_epoch = 0;
+
+        spin_unlock(&mdt->mdt_epoch_lock);
+
+        RETURN(remaining);
+}
+
+/* Register an executor on \a o: under mdt_epoch_lock, decrement the
+ * write count unless it is currently positive.
+ * Returns 0 on success, -ETXTBSY when the write count is positive. */
+static int mdt_deny_write_access(struct mdt_device *mdt, struct mdt_object *o)
+{
+        int rc;
+        ENTRY;
+
+        spin_lock(&mdt->mdt_epoch_lock);
+        if (atomic_read(&o->mot_writecount) <= 0) {
+                atomic_dec(&o->mot_writecount);
+                rc = 0;
+        } else {
+                rc = -ETXTBSY;
+        }
+        spin_unlock(&mdt->mdt_epoch_lock);
+
+        RETURN(rc);
+}
+
+/* Counterpart of mdt_deny_write_access(): add one back to the write
+ * count of \a o.  mdt_epoch_lock is not taken here; only the atomic
+ * increment is performed. */
+static void mdt_allow_write_access(struct mdt_object *o)
+{
+        ENTRY;
+
+        atomic_inc(&o->mot_writecount);
+
+        EXIT;
+}
+
+/* Return the current value of the write count of \a o (positive,
+ * zero or negative); the value is read without taking any lock. */
+int mdt_query_write_access(struct mdt_object *o)
+{
+        int writecount;
+        ENTRY;
+
+        writecount = atomic_read(&o->mot_writecount);
+        RETURN(writecount);
+}
+
 static int mdt_mfd_open(struct mdt_thread_info *info,
-                        struct mdt_object *o, 
+                        struct mdt_object *p,
+                        struct mdt_object *o,
                         int flags, int created)
 {
         struct mdt_export_data *med;
         struct mdt_file_data   *mfd;
+        struct mdt_device      *mdt = info->mti_mdt;
         struct mdt_body        *repbody;
         struct md_attr         *ma = &info->mti_attr;
         struct lu_attr         *la = &ma->ma_attr;
         struct ptlrpc_request  *req = mdt_info_req(info);
+        struct ldlm_reply      *ldlm_rep;
         int                     rc = 0;
+        int                     isreg, isdir, islnk;
         ENTRY;
 
-        med = &req->rq_export->exp_mdt_data;
         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
 
         if (!created) {
                 /* we have to get attr & lov ea for this object*/
-                rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), la);
-                if (rc == 0) {
-                        ma->ma_valid |= MA_INODE;
-                        if (S_ISREG(la->la_mode)) {
-                                rc = mo_xattr_get(info->mti_ctxt, 
-                                                  mdt_object_child(o),
-                                                  ma->ma_lmm, 
-                                                  ma->ma_lmm_size,
-                                                  XATTR_NAME_LOV);
-                                if (rc >= 0) {
-                                        ma->ma_lmm_size = rc;
-                                        rc = 0;
-                                        ma->ma_valid |= MA_LOV;
-                                }
-                        }
-                }
-        }
-        if (rc == 0){
-                if (!S_ISREG(la->la_mode) &&
-                    !S_ISDIR(la->la_mode) &&
-                    (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH ||
-                     S_ISLNK(la->la_mode)))
-                        /* If client supports this, do not return open handle
-                        *  for special nodes */
-                        RETURN(0);
-
-                /* FIXME:maybe this can be done earlier? */
-                if (S_ISDIR(la->la_mode)) {
-                        if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
-                                /* we are trying to create or 
-                                 * write an existing dir. */
-                                rc = -EISDIR;
-                        }
-                } else if (flags & MDS_OPEN_DIRECTORY) 
-                        rc = -ENOTDIR;
+                rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
+                if (rc)
+                        RETURN(rc);
         }
-        if (rc != 0)
-                RETURN(rc);
+        isreg = S_ISREG(la->la_mode);
+        isdir = S_ISDIR(la->la_mode);
+        islnk = S_ISLNK(la->la_mode);
         if (ma->ma_valid & MA_INODE)
                 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
+
+        /* if we are following a symlink, don't open it; likewise, do not
+         * return an open handle for special nodes when the client requests it
+         */
+        if (islnk || (!isreg && !isdir &&
+            (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) {
+                info->mti_trans_flags |= MDT_NONEED_TANSNO; 
+                RETURN(0);
+        }
+        /* FIXME:maybe this can be done earlier? */
+        if (isdir) {
+                if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) {
+                        /* we are trying to create or
+                         * write an existing dir. */
+                        RETURN(-EISDIR);
+                }
+        } else if (flags & MDS_OPEN_DIRECTORY)
+                RETURN(-ENOTDIR);
+
+        if ((isreg) && !(ma->ma_valid & MA_LOV)) {
+                /* No EA: check whether regEA and dirEA will be set.
+                 * The attr get above may have left this size zero, so
+                 * reset it to retrieve the MD after the object is created */
+                ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
+                                                       &RMF_MDT_MD,
+                                                       RCL_SERVER);
+                LASSERT(p != NULL);
+                /*XXX: Tom, do we need this?
+                rc = mdt_create_data_obj(info, p, o);
+                if (rc)
+                        RETURN(rc);
+                */
+        }
+
+        CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n", 
+                        ma->ma_valid, ma->ma_lmm_size);
+        repbody->eadatasize = 0;
+        repbody->aclsize = 0;
+
         if (ma->ma_lmm_size && ma->ma_valid & MA_LOV) {
                 repbody->eadatasize = ma->ma_lmm_size;
-                if (S_ISDIR(la->la_mode))
+                if (isdir)
                         repbody->valid |= OBD_MD_FLDIREA;
                 else
                         repbody->valid |= OBD_MD_FLEASIZE;
         }
+        /* FIXME: should determine the offset dynamically;
+         * the ACL has not been fetched before the shrink */
+        lustre_shrink_reply(req, 2, repbody->eadatasize, 1);
+        lustre_shrink_reply(req, repbody->eadatasize ? 3 : 2, repbody->aclsize,
+                            0);
+
+        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+        intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
 
         if (flags & FMODE_WRITE) {
-                /*mds_get_write_access*/
+                /* FIXME: in recovery, need to pass old epoch here */
+                rc = mdt_get_write_access(mdt, o, 0);
+                if (rc == 0)
+                        repbody->io_epoch = o->mot_io_epoch;
         } else if (flags & MDS_FMODE_EXEC) {
-                /*mds_deny_write_access*/
+                rc = mdt_deny_write_access(mdt, o);
+        }
+        if (rc)
+                RETURN(rc);
+
+        /* (1) the client wants a transno on open to keep a ref count for replay;
+         *     see after_reply() and mdc_close_commit();
+         * (2) we need to record the transaction-related state on disk.
+         * The open question: for a read-only open, do we still need a transno?
+         */
+        if (!created) {
+                struct txn_param txn;
+                struct thandle *th;
+                struct dt_device *dt = info->mti_mdt->mdt_bottom;
+                txn.tp_credits = 1;
+
+                LASSERT(dt);
+                th = dt->dd_ops->dt_trans_start(info->mti_ctxt, dt, &txn);
+                if (!IS_ERR(th))
+                        dt->dd_ops->dt_trans_stop(info->mti_ctxt, th);
+                else
+                        RETURN(PTR_ERR(th));
         }
 
         mfd = mdt_mfd_new();
@@ -152,12 +289,13 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                 mfd->mfd_object = o;
                 mfd->mfd_xid = mdt_info_req(info)->rq_xid;
 
+                med = &req->rq_export->exp_mdt_data;
                 spin_lock(&med->med_open_lock);
                 list_add(&mfd->mfd_list, &med->med_open_head);
                 spin_unlock(&med->med_open_lock);
 
                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
-        } else 
+        } else
                 rc = -ENOMEM;
 
         RETURN(rc);
@@ -173,20 +311,21 @@ int mdt_open_by_fid(struct mdt_thread_info* info, const struct lu_fid *fid,
 
         o = mdt_object_find(info->mti_ctxt, info->mti_mdt, fid);
         if (!IS_ERR(o)) {
-                if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu)) {
+                if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu) > 0) {
                         if (la->la_flags & MDS_OPEN_EXCL &&
                             la->la_flags & MDS_OPEN_CREAT)
                                 rc = -EEXIST;
-                        else 
-                                rc = mdt_mfd_open(info, o, flags, 0);
+                        else
+                                rc = mdt_mfd_open(info, NULL, o, flags, 0);
                 } else {
                         rc = -ENOENT;
                         if (la->la_flags & MDS_OPEN_CREAT) {
-                                rc = mo_object_create(info->mti_ctxt, 
+                                rc = mo_object_create(info->mti_ctxt,
                                                       mdt_object_child(o),
+                                                      &info->mti_spec,
                                                       &info->mti_attr);
                                 if (rc == 0)
-                                        rc = mdt_mfd_open(info, o, flags, 1);
+                                        rc = mdt_mfd_open(info, NULL, o, flags, 1);
                         }
                 }
                 mdt_object_put(info->mti_ctxt, o);
@@ -201,7 +340,7 @@ int mdt_pin(struct mdt_thread_info* info)
         struct mdt_body *body;
         int rc;
         ENTRY;
-        
+
         rc = req_capsule_pack(&info->mti_pill);
         if (rc == 0) {
                 body = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
@@ -224,12 +363,16 @@ int mdt_reint_open(struct mdt_thread_info *info)
         int                     created = 0;
         struct mdt_reint_record *rr = &info->mti_rr;
         ENTRY;
-        
-        ma->ma_lmm = req_capsule_server_get(&info->mti_pill,
-                                            &RMF_MDT_MD);
-        ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
-                                               &RMF_MDT_MD,
-                                               RCL_SERVER);
+
+        req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD, RCL_SERVER,
+                             mdt->mdt_max_mdsize);
+
+        result = req_capsule_pack(&info->mti_pill);
+        if (result)
+                RETURN(result);
+
+        ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
+        ma->ma_lmm_size = mdt->mdt_max_mdsize;
 
         if (rr->rr_name[0] == 0) {
                 /* reint partial remote open */
@@ -240,15 +383,19 @@ int mdt_reint_open(struct mdt_thread_info *info)
         /*TODO: remove this and add MDS_CHECK_RESENT if resent enabled*/
         LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
 
-        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+        CDEBUG(D_INODE, "I am going to create "DFID3"/("DFID3":%s) flag=%x\n",
+                        PFID3(rr->rr_fid1), PFID3(rr->rr_fid2), 
+                        rr->rr_name, la->la_flags);
 
+        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
+
         lh = &info->mti_lh[MDT_LH_PARENT];
         if (!(la->la_flags & MDS_OPEN_CREAT))
-                lh->mlh_mode = LCK_PR;
+                lh->mlh_mode = LCK_CR;
         else
-                lh->mlh_mode = LCK_PW;
-        parent = mdt_object_find_lock(info, rr->rr_fid1, lh, 
+                lh->mlh_mode = LCK_EX;
+        parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
                                       MDS_INODELOCK_UPDATE);
         if (IS_ERR(parent))
                 GOTO(out, result = PTR_ERR(parent));
@@ -266,8 +413,8 @@ int mdt_reint_open(struct mdt_thread_info *info)
                 /* new object will be created. see the following */
         } else {
                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
-                if (la->la_flags & MDS_OPEN_EXCL &&
-                    la->la_flags & MDS_OPEN_CREAT)
+                if ((la->la_flags & MDS_OPEN_EXCL &&
+                         la->la_flags & MDS_OPEN_CREAT))
                         GOTO(out_parent, result = -EEXIST);
         }
 
@@ -281,7 +428,8 @@ int mdt_reint_open(struct mdt_thread_info *info)
                                     mdt_object_child(parent),
                                     rr->rr_name,
                                     mdt_object_child(child),
-                                    rr->rr_tgt,
+                                    &info->mti_spec,
+                                    /* rr->rr_tgt, rr->rr_eadata, rr->rr_eadatalen,*/
                                     &info->mti_attr);
                 intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
                 if (result != 0)
@@ -290,8 +438,7 @@ int mdt_reint_open(struct mdt_thread_info *info)
         }
 
         /* Open it now. */
-        result = mdt_mfd_open(info, child, la->la_flags, created);
-        intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
+        result = mdt_mfd_open(info, parent, child, la->la_flags, created);
         GOTO(finish_open, result);
 
 finish_open:
@@ -301,24 +448,25 @@ finish_open:
                                      &info->mti_attr);
                 if (rc2 != 0)
                         CERROR("error in cleanup of open");
-        } 
+        }
 out_child:
         mdt_object_put(info->mti_ctxt, child);
 out_parent:
-        mdt_object_unlock_put(info, parent, lh);
+        mdt_object_unlock_put(info, parent, lh, result);
 out:
         return result;
 }
 
-int mdt_mfd_close(const struct lu_context *ctxt,
-                  struct mdt_file_data *mfd)
+void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt,
+                   struct mdt_file_data *mfd)
 {
+        struct mdt_object *o = mfd->mfd_object;
         ENTRY;
 
         if (mfd->mfd_mode & FMODE_WRITE) {
-                /*mdt_put_write_access*/
+                mdt_put_write_access(mdt, o);
         } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
-                /*mdt_allow_write_access*/
+                mdt_allow_write_access(o);
         }
 
         /* release reference on this object.
@@ -327,13 +475,15 @@ int mdt_mfd_close(const struct lu_context *ctxt,
         mdt_object_put(ctxt, mfd->mfd_object);
 
         mdt_mfd_free(mfd);
-        RETURN(0);
+        EXIT;
 }
 
 int mdt_close(struct mdt_thread_info *info)
 {
+        struct md_attr         *ma = &info->mti_attr;
         struct mdt_export_data *med;
         struct mdt_file_data   *mfd;
+        struct mdt_object      *o;
         int rc;
         ENTRY;
 
@@ -344,25 +494,36 @@ int mdt_close(struct mdt_thread_info *info)
         if (mfd == NULL) {
                 spin_unlock(&med->med_open_lock);
                 CDEBUG(D_INODE, "no handle for file close: fid = "DFID3
-                       ": cookie = "LPX64, PFID3(&info->mti_body->fid1),
+                       ": cookie = "LPX64"\n", PFID3(&info->mti_body->fid1),
                        info->mti_body->handle.cookie);
                 rc = -ESTALE;
         } else {
                 class_handle_unhash(&mfd->mfd_handle);
                 list_del_init(&mfd->mfd_list);
                 spin_unlock(&med->med_open_lock);
-        
-                rc = mdt_handle_last_unlink(info, mfd->mfd_object,
-                                            &RQF_MDS_CLOSE_LAST);
 
-                mdt_mfd_close(info->mti_ctxt, mfd);
+                o = mfd->mfd_object;
+                ma->ma_lmm = req_capsule_server_get(&info->mti_pill,
+                                                    &RMF_MDT_MD);
+                ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
+                                                       &RMF_MDT_MD, RCL_SERVER);
+                rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
+                if (rc == 0)
+                        rc = mdt_handle_last_unlink(info, o, ma);
+
+                mdt_mfd_close(info->mti_ctxt, info->mti_mdt, mfd);
         }
+        mdt_shrink_reply(info);
         RETURN(rc);
 }
 
+/* Handle a done-writing request: select the RQF_MDS_DONE_WRITING format
+ * on the request capsule and call req_capsule_pack() on it.
+ * Returns the result of req_capsule_pack(). */
 int mdt_done_writing(struct mdt_thread_info *info)
 {
+        int rc;
         ENTRY;
 
+        req_capsule_set(&info->mti_pill, &RQF_MDS_DONE_WRITING);
+        rc = req_capsule_pack(&info->mti_pill);
+
+        /* hand a packing failure back to the caller instead of masking it */
-        RETURN(0);
+        RETURN(rc);
 }