Whamcloud - gitweb
Remove global atomic counter for MDS opens, it just slows things down.
[fs/lustre-release.git] / lustre / mds / mds_open.c
index 2f3e7d0..244b593 100644 (file)
@@ -389,7 +389,11 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
                                            0, &lsm, rec->ur_eadata);
                         if (rc)
                                 GOTO(out_oa, rc);
-                }
+                } else {
+                        /* Per-directory striping default code removed, because
+                         * it uses the same unnamed EA storage as the directory
+                         * striping for CMD. -p */
+                } 
                 LASSERT(oa->o_gr >= FILTER_GROUP_FIRST_MDS);
                 rc = obd_create(mds->mds_osc_exp, oa, &lsm, &oti);
                 if (rc) {
@@ -619,8 +623,8 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
         /* atomically create objects if necessary */
         down(&dchild->d_inode->i_sem);
         mode = dchild->d_inode->i_mode;
-        if ((S_ISREG(mode) && (body->valid & OBD_MD_FLEASIZE)) || 
-            (S_ISDIR(mode) && (body->valid & OBD_MD_FLDIREA))) {
+        if ((S_ISREG(mode) && !(body->valid & OBD_MD_FLEASIZE)) || 
+            (S_ISDIR(mode) && !(body->valid & OBD_MD_FLDIREA))) {
                 rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
                                  dchild->d_inode, 0);
                 if (rc) {
@@ -637,8 +641,20 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
                         up(&dchild->d_inode->i_sem);
                         RETURN(rc);
                 }
+                if (S_ISREG(dchild->d_inode->i_mode) &&
+                    (body->valid & OBD_MD_FLEASIZE)) {
+                        rc = mds_revalidate_lov_ea(obd, dchild->d_inode,
+                                                   req->rq_repmsg, 2);
+                        if (!rc)
+                                rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
+                                                 dchild->d_inode, 0);
+                        if (rc) {
+                                up(&dchild->d_inode->i_sem);
+                                RETURN(rc);
+                        }
+                }
         }
-        /* If the inode has EA data, then OSTs hold size, mtime */
+        /* If the inode has no EA data, then MDSs hold size, mtime */
         if (S_ISREG(dchild->d_inode->i_mode) &&
             !(body->valid & OBD_MD_FLEASIZE)) {
                 body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
@@ -802,6 +818,10 @@ int mds_open(struct mds_update_record *rec, int offset,
         int mea_size, update_mode;
         ENTRY;
 
+        DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u name %*s mode %o",
+                  rec->ur_fid1->id, rec->ur_fid1->generation,
+                  rec->ur_namelen - 1, rec->ur_name, rec->ur_mode);
+
         parent_lockh[0].cookie = 0;
         parent_lockh[1].cookie = 0;
 
@@ -821,8 +841,10 @@ int mds_open(struct mds_update_record *rec, int offset,
          * opened this file and is only replaying the RPC, so we open the
          * inode by fid (at some large expense in security). */
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
-                DEBUG_REQ(D_HA, req, "open replay, disp: "LPX64"\n",
-                          rep->lock_policy_res1);
+                DEBUG_REQ(D_HA, req, "open replay");
+                CDEBUG(D_HA, "open fid "LPU64"/%u name %*s mode %o\n",
+                          rec->ur_fid2->id, rec->ur_fid2->generation,
+                          rec->ur_namelen - 1, rec->ur_name, rec->ur_mode);
 
                 LASSERT(rec->ur_fid2->id);
 
@@ -893,9 +915,13 @@ int mds_open(struct mds_update_record *rec, int offset,
                  * should live at this MDS or at another one */
                 int i;
                 i = mea_name2idx(mea, rec->ur_name, rec->ur_namelen - 1);
-                if (mea->mea_master != i) {
-                        CERROR("inapropriate MDS(%d) for %s. should be %d\n",
-                                mea->mea_master, rec->ur_name, i);
+                if (mea->mea_master != mea->mea_fids[i].mds) {
+                        CDEBUG(D_OTHER,
+                               "%s: inapropriate MDS(%d) for %lu/%u:%s."
+                               " should be %d(%d)\n", obd->obd_name,
+                               mea->mea_master, dparent->d_inode->i_ino,
+                               dparent->d_inode->i_generation, rec->ur_name,
+                               mea->mea_fids[i].mds, i);
                         GOTO(cleanup, rc = -ERESTART);
                 }
         }
@@ -957,15 +983,16 @@ got_child:
                 unsigned long ino = rec->ur_fid2->id;
                 struct iattr iattr;
                 struct inode *inode;
-
-                if ((rc = mds_try_to_split_dir(obd, dparent, &mea, 0))) {
-                        if (rc > 0) {
-                                /* dir got splitted */
-                                GOTO(cleanup, rc = -ERESTART);
-                        } else {
-                                /* error happened during spitting */
-                                GOTO(cleanup, rc);
-                        }
+                rc = mds_try_to_split_dir(obd, dparent, &mea, 0, update_mode);
+                CDEBUG(D_OTHER, "%s: splitted %lu/%u - %d\n",
+                       obd->obd_name, dparent->d_inode->i_ino,
+                       dparent->d_inode->i_generation, rc);
+                if (rc > 0) {
+                        /* dir got splitted */
+                        GOTO(cleanup, rc = -ERESTART);
+                } else if (rc < 0) {
+                        /* error happened during spitting */
+                        GOTO(cleanup, rc);
                 }
 
                 if (!(rec->ur_flags & MDS_OPEN_CREAT)) {
@@ -1084,24 +1111,6 @@ got_child:
                 }
         }
 
-        if (rc == 0) {
-                struct ldlm_res_id res_id = { . name = {0} };
-                ldlm_policy_data_t policy;
-                int flags = 0;
-                res_id.name[0] = dchild->d_inode->i_ino;
-                res_id.name[1] = dchild->d_inode->i_generation;
-                policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP |
-                                                MDS_INODELOCK_UPDATE;
-                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                                      res_id, LDLM_IBITS, &policy,
-                                      LCK_PR, &flags,
-                                      mds_blocking_ast,
-                                      ldlm_completion_ast, NULL, NULL,
-                                      NULL, 0, NULL, child_lockh);
-                if (rc == 0)
-                        cleanup_phase = 3;
-        }
-
         /* Step 5: mds_open it */
         rc = mds_finish_open(req, dchild, body, rec->ur_flags, &handle, rec,
                              rep);
@@ -1112,9 +1121,6 @@ got_child:
                                 req, rc, rep ? rep->lock_policy_res1 : 0);
 
         switch (cleanup_phase) {
-        case 3:
-                if (rc)
-                        ldlm_lock_decref(child_lockh, LCK_PR);
         case 2:
                 if (rc && created) {
                         int err = vfs_unlink(dparent->d_inode, dchild);
@@ -1124,7 +1130,9 @@ got_child:
                                        err);
                         }
                 } else if (created) {
+#if 0
                         mds_lock_new_child(obd, dchild->d_inode, NULL);
+#endif
                 }
                 l_dput(dchild);
         case 1:
@@ -1141,12 +1149,8 @@ got_child:
                 else
                         ptlrpc_save_lock (req, parent_lockh, parent_mode);
         }
-        if (rc == 0)
-                atomic_inc(&mds->mds_open_count);
         if (mea)
                 OBD_FREE(mea, mea_size);
-        if ((cleanup_phase != 3) && !rc)
-                rc = ENOLCK;
         RETURN(rc);
 }
 
@@ -1172,6 +1176,7 @@ int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
         struct mds_body *request_body = NULL, *reply_body = NULL;
         struct dentry_params dp;
         struct iattr iattr = { 0 };
+        struct llog_create_locks *lcl = NULL;
         ENTRY;
 
         if (req && req->rq_reqmsg != NULL)
@@ -1233,15 +1238,6 @@ int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
                         GOTO(cleanup, rc);
                 }
 
-                if (req != NULL && req->rq_repmsg != NULL &&
-                    (reply_body->valid & OBD_MD_FLEASIZE) &&
-                    mds_log_op_unlink(obd, pending_child->d_inode, lmm,
-                                      req->rq_repmsg->buflens[1],
-                                      lustre_msg_buf(req->rq_repmsg, 2, 0),
-                                      req->rq_repmsg->buflens[2]) > 0) {
-                        reply_body->valid |= OBD_MD_FLCOOKIE;
-                }
-
                 pending_child->d_fsdata = (void *) &dp;
                 dp.p_inum = 0;
                 dp.p_ptr = req;
@@ -1252,6 +1248,15 @@ int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
                 if (rc)
                         CERROR("error unlinking orphan %s: rc %d\n",fidname,rc);
 
+                if (req != NULL && req->rq_repmsg != NULL &&
+                    (reply_body->valid & OBD_MD_FLEASIZE) &&
+                    mds_log_op_unlink(obd, pending_child->d_inode,
+                                                lmm, req->rq_repmsg->buflens[1],
+                                                lustre_msg_buf(req->rq_repmsg, 2, 0),
+                                                req->rq_repmsg->buflens[2], &lcl) > 0) {
+                        reply_body->valid |= OBD_MD_FLCOOKIE;
+                }
+
                 goto out; /* Don't bother updating attrs on unlinked inode */
         }
 
@@ -1317,7 +1322,6 @@ out:
         mds_mfd_destroy(mfd);
 
  cleanup:
-        atomic_dec(&mds->mds_open_count);
         if (req != NULL && reply_body != NULL) {
                 rc = mds_finish_transno(mds, pending_dir, handle, req, rc, 0);
         } else if (handle) {
@@ -1331,6 +1335,8 @@ out:
 
         switch (cleanup_phase) {
         case 2:
+                if (lcl != NULL)
+                        ptlrpc_save_llog_lock(req, lcl);
                 dput(pending_child);
         case 1:
                 up(&pending_dir->i_sem);
@@ -1359,6 +1365,13 @@ int mds_close(struct ptlrpc_request *req)
                 MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
         }
 
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                DEBUG_REQ(D_HA, req, "close replay\n");
+                memcpy(lustre_msg_buf(req->rq_repmsg, 2, 0),
+                       lustre_msg_buf(req->rq_reqmsg, 1, 0),
+                       req->rq_repmsg->buflens[2]);
+        }
+
         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
         if (body == NULL) {
                 CERROR("Can't unpack body\n");