Whamcloud - gitweb
b=22768 mdt open error handling
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 4ce9a49..2aef7e7 100644 (file)
@@ -26,7 +26,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
@@ -85,8 +85,7 @@ struct mdt_file_data *mdt_handle2mfd(struct mdt_thread_info *info,
         LASSERT(handle != NULL);
         mfd = class_handle2object(handle->cookie);
         /* during dw/setattr replay the mfd can be found by old handle */
-        if (mfd == NULL &&
-            lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+        if (mfd == NULL && req_is_replay(req)) {
                 struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
                 cfs_list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
                         if (mfd->mfd_old_handle.cookie == handle->cookie)
@@ -231,7 +230,7 @@ static int mdt_som_attr_set(struct mdt_thread_info *info,
                 ma->ma_som->msd_size = la->la_valid & LA_SIZE ? la->la_size : 0;
                 ma->ma_som->msd_blocks = la->la_valid & LA_BLOCKS ?
                                          la->la_blocks : 0;
-                ma->ma_som->msd_mountid = mdt->mdt_mount_count;
+                ma->ma_som->msd_mountid = mdt->mdt_lut.lut_mount_count;
                 ma->ma_attr.la_valid &= LA_ATIME | LA_MTIME | LA_CTIME;
         } else {
                 ma->ma_som->msd_ioepoch = IOEPOCH_INVAL;
@@ -557,17 +556,17 @@ static void mdt_empty_transno(struct mdt_thread_info* info)
                 return;
         }
 
-        cfs_spin_lock(&mdt->mdt_transno_lock);
+        cfs_spin_lock(&mdt->mdt_lut.lut_translock);
         if (info->mti_transno == 0) {
-                info->mti_transno = ++ mdt->mdt_last_transno;
+                info->mti_transno = ++ mdt->mdt_lut.lut_last_transno;
         } else {
                 /* should be replay */
-                if (info->mti_transno > mdt->mdt_last_transno)
-                        mdt->mdt_last_transno = info->mti_transno;
+                if (info->mti_transno > mdt->mdt_lut.lut_last_transno)
+                        mdt->mdt_lut.lut_last_transno = info->mti_transno;
         }
-        cfs_spin_unlock(&mdt->mdt_transno_lock);
+        cfs_spin_unlock(&mdt->mdt_lut.lut_translock);
 
-        CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
+        CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
                         info->mti_transno,
                         req->rq_export->exp_obd->obd_last_committed);
 
@@ -667,7 +666,7 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                 mfd->mfd_xid = req->rq_xid;
 
                 /* replay handle */
-                if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                if (req_is_replay(req)) {
                         struct mdt_file_data *old_mfd;
                         /* Check wheather old cookie already exist in
                          * the list, becasue when do recovery, client
@@ -883,8 +882,8 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
         struct mdt_device       *mdt  = info->mti_mdt;
         struct req_capsule      *pill = info->mti_pill;
         struct ptlrpc_request   *req  = mdt_info_req(info);
-        struct mdt_export_data  *med  = &req->rq_export->exp_mdt_data;
-        struct lsd_client_data  *lcd  = med->med_lcd;
+        struct tg_export_data   *ted  = &req->rq_export->exp_target_data;
+        struct lsd_client_data  *lcd  = ted->ted_lcd;
         struct md_attr          *ma   = &info->mti_attr;
         struct mdt_reint_record *rr   = &info->mti_rr;
         __u32                   flags = info->mti_spec.sp_cr_flags;
@@ -908,7 +907,7 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
 
         ma->ma_valid = 0;
 
-        mdt_req_from_lcd(req, med->med_lcd);
+        mdt_req_from_lcd(req, lcd);
         mdt_set_disposition(info, ldlm_rep, lcd->lcd_last_data);
 
         CERROR("This is reconstruct open: disp="LPX64", result=%d\n",
@@ -1079,7 +1078,7 @@ static int mdt_open_anon_by_fid(struct mdt_thread_info* info,
                 mdt_set_disposition(info, rep, DISP_OPEN_LOCK);
         rc = mdt_finish_open(info, NULL, o, flags, 0, rep);
 
-        if (!(flags & MDS_OPEN_LOCK))
+        if (!(flags & MDS_OPEN_LOCK) || rc)
                 mdt_object_unlock(info, o, lhc, 1);
 
         GOTO(out, rc);
@@ -1194,7 +1193,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                PFID(rr->rr_fid2), create_flags,
                ma->ma_attr.la_mode, msg_flags);
 
-        if (msg_flags & MSG_REPLAY ||
+        if (req_is_replay(req) ||
             (req->rq_export->exp_libclient && create_flags&MDS_OPEN_HAS_EA)) {
                 /* This is a replay request or from liblustre with ea. */
                 result = mdt_open_by_fid(info, ldlm_rep);
@@ -1244,10 +1243,14 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         if (IS_ERR(parent))
                 GOTO(out, result = PTR_ERR(parent));
 
+        /* get and check version of parent */
+        result = mdt_version_get_check(info, parent, 0);
+        if (result)
+                GOTO(out_parent, result);
+
         fid_zero(child_fid);
 
         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
-
         result = mdo_lookup(info->mti_env, mdt_object_child(parent),
                             lname, child_fid, &info->mti_spec);
         LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
@@ -1284,17 +1287,23 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         if (IS_ERR(child))
                 GOTO(out_parent, result = PTR_ERR(child));
 
+        /** check version of child  */
+        rc = mdt_version_get_check(info, child, 1);
+        if (rc)
+                GOTO(out_child, result = rc);
+
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
         if (result == -ENOENT) {
+                /* save versions in reply */
+                mdt_version_get_save(info, parent, 0);
+                mdt_version_get_save(info, child, 1);
+
+                /* version of child will be changed */
+                info->mti_mos = child;
+
                 /* Not found and with MDS_OPEN_CREAT: let's create it. */
                 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
 
-                info->mti_mos[0] = parent;
-                info->mti_mos[1] = child;
-                result = mdt_version_get_check(info, 0);
-                if (result)
-                        GOTO(out_child, result);
-
                 /* Let lower layers know what is lock mode on directory. */
                 info->mti_spec.sp_cr_mode =
                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
@@ -1369,10 +1378,9 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         LASSERT(!lustre_handle_is_used(&lhc->mlh_reg_lh));
 
         /* get openlock if this is not replay and if a client requested it */
-        if (!(msg_flags & MSG_REPLAY) && create_flags & MDS_OPEN_LOCK) {
+        if (!req_is_replay(req) && create_flags & MDS_OPEN_LOCK) {
                 ldlm_mode_t lm;
 
-                LASSERT(!created);
                 if (create_flags & FMODE_WRITE)
                         lm = LCK_CW;
                 else if (create_flags & MDS_FMODE_EXEC)
@@ -1420,7 +1428,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 out_child:
         mdt_object_put(info->mti_env, child);
 out_parent:
-        mdt_object_unlock_put(info, parent, lh, result);
+        mdt_object_unlock_put(info, parent, lh, result || !created);
 out:
         if (result && result != -EREMOTE)
                 lustre_msg_set_transno(req->rq_repmsg, 0);
@@ -1653,13 +1661,13 @@ int mdt_done_writing(struct mdt_thread_info *info)
         info->mti_attr.ma_valid = 0;
 
         info->mti_attr.ma_lmm_size = info->mti_mdt->mdt_max_mdsize;
-        OBD_ALLOC(info->mti_attr.ma_lmm, info->mti_attr.ma_lmm_size);
+        OBD_ALLOC(info->mti_attr.ma_lmm, info->mti_mdt->mdt_max_mdsize);
         if (info->mti_attr.ma_lmm == NULL)
                 RETURN(-ENOMEM);
 
         rc = mdt_mfd_close(info, mfd);
 
-        OBD_FREE(info->mti_attr.ma_lmm, info->mti_attr.ma_lmm_size);
+        OBD_FREE(info->mti_attr.ma_lmm, info->mti_mdt->mdt_max_mdsize);
         mdt_empty_transno(info);
         RETURN(rc);
 }