Whamcloud - gitweb
changes to protect directory splitting from concurrent modifies/lookups:
[fs/lustre-release.git] / lustre / mds / mds_reint.c
index 57f7c99..fe82cf2 100644 (file)
@@ -381,6 +381,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         struct dentry *de;
         struct inode *inode = NULL;
         struct lustre_handle lockh[2] = {{0}, {0}};
+        int parent_mode;
         void *handle = NULL;
         struct mds_logcancel_data *mlcd = NULL;
         int rc = 0, cleanup_phase = 0, err, locked = 0;
@@ -402,7 +403,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                 if (rec->ur_iattr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID) )
                         lockpart |= MDS_INODELOCK_LOOKUP;
                 de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
-                                           lockh, NULL, 0, lockpart);
+                                           lockh, &parent_mode, NULL, 0, lockpart);
                 if (IS_ERR(de))
                         GOTO(cleanup, rc = PTR_ERR(de));
                 locked = 1;
@@ -502,7 +503,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                 if (locked) {
 #ifdef S_PDIROPS
                         if (lockh[1].cookie != 0)
-                                ldlm_lock_decref(lockh + 1, LCK_CW);
+                                ldlm_lock_decref(lockh + 1, parent_mode);
 #endif
                         if (rc) {
                                 ldlm_lock_decref(lockh, LCK_PW);
@@ -557,6 +558,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         struct inode *dir = NULL;
         void *handle = NULL;
         struct lustre_handle lockh[2] = {{0}, {0}};
+        int parent_mode;
         int rc = 0, err, type = rec->ur_mode & S_IFMT, cleanup_phase = 0;
         int created = 0;
         struct dentry_params dp;
@@ -577,8 +579,8 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 GOTO(cleanup, rc = -ESTALE);
 
         dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, lockh,
-                                        rec->ur_name, rec->ur_namelen - 1,
-                                        MDS_INODELOCK_UPDATE);
+                                        &parent_mode, rec->ur_name,
+                                        rec->ur_namelen - 1, MDS_INODELOCK_UPDATE);
         if (IS_ERR(dparent)) {
                 rc = PTR_ERR(dparent);
                 CERROR("parent lookup error %d\n", rc);
@@ -858,7 +860,7 @@ cleanup:
         case 1: /* locked parent dentry */
 #ifdef S_PDIROPS
                 if (lockh[1].cookie != 0)
-                        ldlm_lock_decref(lockh + 1, LCK_CW);
+                        ldlm_lock_decref(lockh + 1, parent_mode);
 #endif
                 if (rc) {
                         ldlm_lock_decref(lockh, LCK_PW);
@@ -1185,7 +1187,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
                                 struct ll_fid *fid,
                                 struct lustre_handle *parent_lockh,
                                 struct dentry **dparentp, int parent_mode,
-                                __u64 parent_lockpart,
+                                __u64 parent_lockpart, int *update_mode,
                                 char *name, int namelen,
                                 struct lustre_handle *child_lockh,
                                 struct dentry **dchildp, int child_mode,
@@ -1215,23 +1217,19 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
 #ifdef S_PDIROPS
         parent_lockh[1].cookie = 0;
         if (name && IS_PDIROPS((*dparentp)->d_inode)) {
-                /* lock just dir { ino, generation } to flush client cache */
-                if (parent_mode == LCK_PW) {
-                        struct ldlm_res_id res_id = { .name = {0} };
-                        ldlm_policy_data_t policy;
-                        int flags = 0;
-                        res_id.name[0] = (*dparentp)->d_inode->i_ino;
-                        res_id.name[1] = (*dparentp)->d_inode->i_generation;
-                        policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
-                                              res_id, LDLM_IBITS,
-                                              &policy, LCK_CW, &flags,
-                                              mds_blocking_ast,
-                                              ldlm_completion_ast, NULL, NULL,
-                                              NULL, 0, NULL, parent_lockh+1);
-                        if (rc != ELDLM_OK)
-                                RETURN(-ENOLCK);
-                }
+                struct ldlm_res_id res_id = { .name = {0} };
+                ldlm_policy_data_t policy;
+                int flags = 0;
+                *update_mode = mds_lock_mode_for_dir(obd, *dparentp, parent_mode);
+                res_id.name[0] = (*dparentp)->d_inode->i_ino;
+                res_id.name[1] = (*dparentp)->d_inode->i_generation;
+                policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
+                                      LDLM_IBITS, &policy, *update_mode, &flags,
+                                      mds_blocking_ast, ldlm_completion_ast,
+                                      NULL, NULL, NULL, 0, NULL, parent_lockh+1);
+                if (rc != ELDLM_OK)
+                        RETURN(-ENOLCK);
 
                 parent_res_id.name[2] = full_name_hash(name, namelen - 1);
                 CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n",
@@ -1312,7 +1310,7 @@ cleanup:
                 case 1:
 #ifdef S_PDIROPS
                         if (parent_lockh[1].cookie)
-                                ldlm_lock_decref(parent_lockh + 1, LCK_CW);
+                                ldlm_lock_decref(parent_lockh + 1, *update_mode);
 #endif
                         l_dput(*dparentp);
                 default: ;
@@ -1371,7 +1369,8 @@ int mds_create_local_dentry(struct mds_update_record *rec,
         d_drop(new_child);
 
         child = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_EX,
-                                      lockh, NULL, 0, MDS_INODELOCK_UPDATE);
+                                      lockh, NULL, NULL, 0,
+                                      MDS_INODELOCK_UPDATE);
         if (IS_ERR(child)) {
                 CERROR("can't get victim\n");
                 GOTO(cleanup, rc = PTR_ERR(child));
@@ -1475,6 +1474,7 @@ static int mds_copy_unlink_reply(struct ptlrpc_request *master,
 static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset,
                                    struct ptlrpc_request *req,
                                    struct lustre_handle *parent_lockh,
+                                   int update_mode,
                                    struct dentry *dparent,
                                    struct lustre_handle *child_lockh,
                                    struct dentry *dchild)
@@ -1510,7 +1510,7 @@ static int mds_reint_unlink_remote(struct mds_update_record *rec, int offset,
 
 #ifdef S_PDIROPS
         if (parent_lockh[1].cookie != 0)
-                ldlm_lock_decref(parent_lockh + 1, LCK_CW);
+                ldlm_lock_decref(parent_lockh + 1, update_mode);
 #endif
         ldlm_lock_decref(child_lockh, LCK_EX);
         if (rc)
@@ -1538,6 +1538,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         void *handle = NULL;
         int rc = 0, log_unlink = 0, cleanup_phase = 0;
         int unlink_by_fid = 0;
+        int update_mode;
         ENTRY;
 
         LASSERT(offset == 0 || offset == 2);
@@ -1559,7 +1560,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         }
         rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
                                          parent_lockh, &dparent, LCK_PW,
-                                         MDS_INODELOCK_UPDATE,
+                                         MDS_INODELOCK_UPDATE, &update_mode,
                                          rec->ur_name, rec->ur_namelen,
                                          &child_lockh, &dchild, LCK_EX,
                                          MDS_INODELOCK_LOOKUP|MDS_INODELOCK_UPDATE);
@@ -1571,7 +1572,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 LASSERT(unlink_by_fid == 0);
                 LASSERT(dchild->d_mdsnum != mds->mds_num);
                 mds_reint_unlink_remote(rec, offset, req, parent_lockh,
-                                             dparent, &child_lockh, dchild);
+                                        update_mode, dparent, &child_lockh, dchild);
                 RETURN(0);
         }
 
@@ -1735,7 +1736,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         case 1: /* child and parent dentry, parent lock */
 #ifdef S_PDIROPS
                 if (parent_lockh[1].cookie != 0)
-                        ldlm_lock_decref(parent_lockh + 1, LCK_CW);
+                        ldlm_lock_decref(parent_lockh + 1, update_mode);
 #endif
                 if (rc)
                         ldlm_lock_decref(parent_lockh, LCK_PW);
@@ -1843,6 +1844,7 @@ static int mds_reint_link_to_remote(struct mds_update_record *rec,
         int rc = 0, cleanup_phase = 0;
         struct mdc_op_data op_data;
         struct ptlrpc_request *request = NULL;
+        int update_mode;
         ENTRY;
 
 #define fmt     "%s: request to link %u/%u/%u:%*s to foreign inode %u/%u/%u\n"
@@ -1856,8 +1858,8 @@ static int mds_reint_link_to_remote(struct mds_update_record *rec,
                   (unsigned)rec->ur_fid1->generation);
 
         de_tgt_dir = mds_fid2locked_dentry(obd, rec->ur_fid2, NULL, LCK_EX,
-                                           tgt_dir_lockh, rec->ur_name,
-                                           rec->ur_namelen - 1,
+                                           tgt_dir_lockh, &update_mode,
+                                           rec->ur_name, rec->ur_namelen - 1,
                                            MDS_INODELOCK_UPDATE);
         if (IS_ERR(de_tgt_dir))
                 GOTO(cleanup, rc = PTR_ERR(de_tgt_dir));
@@ -1903,12 +1905,12 @@ cleanup:
                         if (rc) {
                                 ldlm_lock_decref(tgt_dir_lockh, LCK_EX);
 #ifdef S_PDIROPS
-                                ldlm_lock_decref(tgt_dir_lockh + 1, LCK_CW);
+                                ldlm_lock_decref(tgt_dir_lockh + 1, update_mode);
 #endif
                         } else {
                                 ptlrpc_save_lock(req, tgt_dir_lockh, LCK_EX);
 #ifdef S_PDIROPS
-                                ptlrpc_save_lock(req, tgt_dir_lockh + 1, LCK_CW);
+                                ptlrpc_save_lock(req, tgt_dir_lockh+1, update_mode);
 #endif
                         }
                         l_dput(de_tgt_dir);
@@ -1937,8 +1939,8 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         ldlm_policy_data_t src_policy ={.l_inodebits = {MDS_INODELOCK_UPDATE}};
         ldlm_policy_data_t tgt_dir_policy =
                                        {.l_inodebits = {MDS_INODELOCK_UPDATE}};
-
         int rc = 0, cleanup_phase = 0;
+        int update_mode = 0;
         ENTRY;
 
         LASSERT(offset == 0);
@@ -1991,10 +1993,10 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
 #ifdef S_PDIROPS
         if (IS_PDIROPS(de_tgt_dir->d_inode)) {
                 int flags = 0;
-                /* Get a temp lock on just ino, gen to flush client cache */
+                update_mode = mds_lock_mode_for_dir(obd, de_tgt_dir, LCK_EX);
                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
                                       tgt_dir_res_id, LDLM_IBITS, &src_policy,
-                                      LCK_CW, &flags, mds_blocking_ast,
+                                      update_mode, &flags, mds_blocking_ast,
                                       ldlm_completion_ast, NULL, NULL,
                                       NULL, 0, NULL, tgt_dir_lockh + 1);
                 if (rc != ELDLM_OK)
@@ -2065,8 +2067,8 @@ cleanup:
                 }
         case 2: /* target dentry */
 #ifdef S_PDIROPS
-                if (tgt_dir_lockh[1].cookie)
-                        ldlm_lock_decref(tgt_dir_lockh + 1, LCK_CW);
+                if (tgt_dir_lockh[1].cookie && update_mode)
+                        ldlm_lock_decref(tgt_dir_lockh + 1, update_mode);
 #endif
                 if (de_tgt_dir)
                         l_dput(de_tgt_dir);
@@ -2216,10 +2218,11 @@ static int mds_get_parents_children_locked(struct obd_device *obd,
         dlm_handles[5].cookie = 0;
         dlm_handles[6].cookie = 0;
         if (IS_PDIROPS((*de_srcdirp)->d_inode)) {
-                /* Get a temp lock on just ino, gen to flush client cache */
+                /* Get a temp lock on just ino, gen to flush client cache and
+                 * to protect dirs from concurrent splitting */
                 rc = enqueue_ordered_locks(obd, &p1_res_id, &(dlm_handles[5]),
-                                           LCK_CW, &p_policy, &p2_res_id,
-                                           &(dlm_handles[6]),LCK_CW,&p_policy);
+                                           LCK_PW, &p_policy, &p2_res_id,
+                                           &(dlm_handles[6]),LCK_PW,&p_policy);
                 if (rc != ELDLM_OK)
                         GOTO(cleanup, rc);
 
@@ -2376,7 +2379,7 @@ static int mds_reint_rename_create_name(struct mds_update_record *rec,
         struct lustre_handle child_lockh = {0};
         int cleanup_phase = 0;
         void *handle = NULL;
-        int rc = 0;
+        int update_mode, rc = 0;
         ENTRY;
 
         /* another MDS executing rename operation has asked us
@@ -2393,7 +2396,7 @@ static int mds_reint_rename_create_name(struct mds_update_record *rec,
         child_lockh.cookie = 0;
         rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid2, parent_lockh,
                                          &de_srcdir,LCK_PW,MDS_INODELOCK_UPDATE,
-                                         rec->ur_tgt, rec->ur_tgtlen,
+                                         &update_mode, rec->ur_tgt, rec->ur_tgtlen,
                                          &child_lockh, &de_new, LCK_EX,
                                          MDS_INODELOCK_LOOKUP);
         if (rc)
@@ -2449,7 +2452,7 @@ cleanup:
                 case 1:
 #ifdef S_PDIROPS
                         if (parent_lockh[1].cookie != 0)
-                                ldlm_lock_decref(&parent_lockh[1], LCK_CW);
+                                ldlm_lock_decref(&parent_lockh[1], update_mode);
 #endif
                         ldlm_lock_decref(&parent_lockh[0], LCK_PW);
                         if (child_lockh.cookie != 0)
@@ -2478,7 +2481,7 @@ static int mds_reint_rename_to_remote(struct mds_update_record *rec, int offset,
         struct lustre_handle child_lockh = {0};
         struct mdc_op_data opdata;
         void *handle = NULL;
-        int rc = 0;
+        int update_mode, rc = 0;
         ENTRY;
 
         CDEBUG(D_OTHER, "%s: move name %s onto another mds%u\n",
@@ -2488,9 +2491,9 @@ static int mds_reint_rename_to_remote(struct mds_update_record *rec, int offset,
         child_lockh.cookie = 0;
         rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1, parent_lockh,
                                          &de_srcdir,LCK_PW,MDS_INODELOCK_UPDATE,
-                                         rec->ur_name, rec->ur_namelen,
-                                         &child_lockh, &de_old, LCK_EX,
-                                         MDS_INODELOCK_LOOKUP);
+                                         &update_mode, rec->ur_name, 
+                                         rec->ur_namelen, &child_lockh, &de_old,
+                                         LCK_EX, MDS_INODELOCK_LOOKUP);
         LASSERT(rc == 0);
         LASSERT(de_srcdir);
         LASSERT(de_srcdir->d_inode);
@@ -2539,7 +2542,7 @@ cleanup:
 
 #ifdef S_PDIROPS
         if (parent_lockh[1].cookie != 0)
-                ldlm_lock_decref(&parent_lockh[1], LCK_CW);
+                ldlm_lock_decref(&parent_lockh[1], update_mode);
 #endif
         ldlm_lock_decref(&parent_lockh[0], LCK_PW);
         if (child_lockh.cookie != 0)
@@ -2669,9 +2672,9 @@ cleanup:
         case 1:
 #ifdef S_PDIROPS
                 if (dlm_handles[5].cookie != 0)
-                        ldlm_lock_decref(&(dlm_handles[5]), LCK_CW);
+                        ldlm_lock_decref(&(dlm_handles[5]), LCK_PW);
                 if (dlm_handles[6].cookie != 0)
-                        ldlm_lock_decref(&(dlm_handles[6]), LCK_CW);
+                        ldlm_lock_decref(&(dlm_handles[6]), LCK_PW);
 #endif
                 if (rc) {
                         if (lock_count == 4)