Whamcloud - gitweb
b=5492,5624,5654,5664,5672
authorphil <phil>
Wed, 2 Mar 2005 23:35:09 +0000 (23:35 +0000)
committerphil <phil>
Wed, 2 Mar 2005 23:35:09 +0000 (23:35 +0000)
Fundamentally changes the locking rules for the i_size and KMS.  The way in
which this was done is documented in bug 5654 comment #6, and I'll ask the
colibri team to either adopt or adapt this design, but in any case make it a
part of their documentation after they've merged it.

i_size and KMS sampling and updates are now protected by an lli_size_sem.
The reasons are many:

The truncate path has been reorganized not to hold this semaphore during RPCs,
which allows us to reverse the deadlock-inducing order with the ns_lock in
ll_pgcache_remove_extent. (bug 5492)

The introduction of the i_alloc_sem in 2.6 was wreaking havoc on our ability
to get our ordering right.  The truncate path was a festering boil of unlocking
and relocking which may well have been the source of other concurrency bugs.
Not using the i_sem for i_size updates eliminates this rat nest (bugs 5624,
5654)

Finally, the CMD team reported a similar inversion between a writing thread
and ptlrpcd (the writing thread has the i_sem, and won't release it until
ptlrpcd finishes the I/O; ptlrpcd wants the i_sem to finalize lock cancellation
after being evicted). (bugs 5664, 5672)

This has been running at NERSC for the last week, so I think it's ready for
more exposure.

lustre/ChangeLog
lustre/include/linux/lustre_lite.h
lustre/llite/file.c
lustre/llite/llite_lib.c
lustre/llite/rw.c
lustre/lov/lov_request.c
lustre/mds/mds_open.c

index 662945e..b7f610e 100644 (file)
@@ -33,6 +33,7 @@ tbd         Cluster File Systems, Inc. <info@clusterfs.com>
        - avoid needless client->OST connect, fix handle mismatch (5317)
        - fix DLM error path that led to out-of-sync client, long delays (5779)
        - support common vfs-enforced mount options (nodev,nosuid,noexec) (5637)
+       - fix several locking issues related to i_size (5492,5624,5654,5672)
        * miscellania
        - service request history (4965)
        - put {ll,lov,osc}_async_page structs in a single slab (4699)
index f659ef8..9316230 100644 (file)
@@ -71,6 +71,7 @@ struct ll_inode_info {
         struct lov_stripe_md   *lli_smd;
         char                   *lli_symlink_name;
         struct semaphore        lli_open_sem;
+        struct semaphore        lli_size_sem;
         __u64                   lli_maxbytes;
         __u64                   lli_io_epoch;
         unsigned long           lli_flags;
index 1763524..aa9a996 100644 (file)
@@ -507,12 +507,8 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                         goto iput;
                 ll_pgcache_remove_extent(inode, lsm, lock, stripe);
 
-                /* grabbing the i_sem will wait for write() to complete.  ns
-                 * lock hold times should be very short as ast processing
-                 * requires them and has a short timeout.  so, i_sem before ns
-                 * lock.*/
-                down(&inode->i_sem);
                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
+                down(&lli->lli_size_sem);
                 kms = ldlm_extent_shift_kms(lock,
                                             lsm->lsm_oinfo[stripe].loi_kms);
 
@@ -520,8 +516,8 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                         LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                    lsm->lsm_oinfo[stripe].loi_kms, kms);
                 lsm->lsm_oinfo[stripe].loi_kms = kms;
+                up(&lli->lli_size_sem);
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-                up(&inode->i_sem);
                 //ll_try_done_writing(inode);
         iput:
                 iput(inode);
@@ -671,8 +667,10 @@ int ll_glimpse_size(struct inode *inode)
                 RETURN(rc > 0 ? -EIO : rc);
         }
 
+        down(&lli->lli_size_sem);
         inode->i_size = lov_merge_size(lli->lli_smd, 0);
         inode->i_blocks = lov_merge_blocks(lli->lli_smd);
+        up(&lli->lli_size_sem);
         LTIME_S(inode->i_mtime) =
                 lov_merge_mtime(lli->lli_smd, LTIME_S(inode->i_mtime));
 
@@ -690,6 +688,7 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
                    int ast_flags)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ll_inode_info *lli = ll_i2info(inode);
         int rc;
         ENTRY;
 
@@ -724,9 +723,9 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
                  * when doing appending writes and effectively cancel the
                  * result of the truncate.  Getting the i_sem after the enqueue
                  * maintains the DLM -> i_sem acquiry order. */
-                down(&inode->i_sem);
+                down(&lli->lli_size_sem);
                 inode->i_size = lov_merge_size(lsm, 1);
-                up(&inode->i_sem);
+                up(&lli->lli_size_sem);
         }
 
         if (rc == 0)
@@ -787,15 +786,18 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
         if (rc != 0)
                 RETURN(rc);
 
+        down(&lli->lli_size_sem);
         kms = lov_merge_size(lsm, 1);
         if (*ppos + count - 1 > kms) {
                 /* A glimpse is necessary to determine whether we return a short
                  * read or some zeroes at the end of the buffer */
+                up(&lli->lli_size_sem);
                 retval = ll_glimpse_size(inode);
                 if (retval)
                         goto out;
         } else {
                 inode->i_size = kms;
+                up(&lli->lli_size_sem);
         }
 
         CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
@@ -1136,13 +1138,16 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_LLSEEK);
         if (origin == 2) { /* SEEK_END */
                 ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF }};
+                struct ll_inode_info *lli = ll_i2info(inode);
                 int rc;
 
                 rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh,0);
                 if (rc != 0)
                         RETURN(rc);
 
+                down(&lli->lli_size_sem);
                 offset += inode->i_size;
+                up(&lli->lli_size_sem);
         } else if (origin == 1) { /* SEEK_CUR */
                 offset += file->f_pos;
         }
@@ -1398,8 +1403,9 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it)
 int ll_getattr(struct vfsmount *mnt, struct dentry *de,
                struct lookup_intent *it, struct kstat *stat)
 {
-        int res = 0;
         struct inode *inode = de->d_inode;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        int res = 0;
 
         res = ll_inode_revalidate_it(de, it);
         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_GETATTR);
@@ -1417,9 +1423,13 @@ int ll_getattr(struct vfsmount *mnt, struct dentry *de,
         stat->atime = inode->i_atime;
         stat->mtime = inode->i_mtime;
         stat->ctime = inode->i_ctime;
-        stat->size = inode->i_size;
         stat->blksize = inode->i_blksize;
+
+        down(&lli->lli_size_sem);
+        stat->size = inode->i_size;
         stat->blocks = inode->i_blocks;
+        up(&lli->lli_size_sem);
+
         return 0;
 }
 #endif
index 9086073..b7a58c4 100644 (file)
@@ -397,6 +397,7 @@ void ll_options(char *options, char **ost, char **mdc, int *flags)
 void ll_lli_init(struct ll_inode_info *lli)
 {
         sema_init(&lli->lli_open_sem, 1);
+        sema_init(&lli->lli_size_sem, 1);
         lli->lli_flags = 0;
         lli->lli_maxbytes = PAGE_CACHE_MAXBYTES;
         spin_lock_init(&lli->lli_lock);
@@ -952,6 +953,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
          * inode ourselves so we can call obdo_from_inode() always. */
         if (ia_valid & (lsm ? ~(ATTR_SIZE | ATTR_FROM_OPEN | ATTR_RAW) : ~0)) {
                 struct lustre_md md;
+                int save_valid;
                 ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0);
 
                 rc = mdc_setattr(sbi->ll_mdc_exp, &op_data,
@@ -970,9 +972,16 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
                         RETURN(rc);
                 }
 
-                /* Won't invoke vmtruncate as we already cleared ATTR_SIZE,
-                 * but needed to set timestamps backwards on utime. */
+                /* We call inode_setattr to adjust timestamps, but we first
+                 * clear ATTR_SIZE to avoid invoking vmtruncate.
+                 *
+                 * NB: ATTR_SIZE will only be set at this point if the size
+                 * resides on the MDS, ie, this file has no objects. */
+                save_valid = attr->ia_valid;
+                attr->ia_valid &= ~ATTR_SIZE;
                 inode_setattr(inode, attr);
+                attr->ia_valid = save_valid;
+
                 ll_update_inode(inode, md.body, md.lsm);
                 ptlrpc_req_finished(request);
 
@@ -1013,6 +1022,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
                 ldlm_policy_data_t policy = { .l_extent = {attr->ia_size,
                                                            OBD_OBJECT_EOF } };
                 struct lustre_handle lockh = { 0 };
+                struct ll_inode_info *lli = ll_i2info(inode);
                 int err, ast_flags = 0;
                 /* XXX when we fix the AST intents to pass the discard-range
                  * XXX extent, make ast_flags always LDLM_AST_DISCARD_DATA
@@ -1020,38 +1030,20 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
                 if (attr->ia_size == 0)
                         ast_flags = LDLM_AST_DISCARD_DATA;
 
-                /* bug 1639: avoid write/truncate i_sem/DLM deadlock */
-                LASSERT(atomic_read(&inode->i_sem.count) <= 0);
-                up(&inode->i_sem);
-                UP_WRITE_I_ALLOC_SEM(inode);
                 rc = ll_extent_lock(NULL, inode, lsm, LCK_PW, &policy, &lockh,
                                     ast_flags);
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-                DOWN_WRITE_I_ALLOC_SEM(inode);
-                down(&inode->i_sem);
-#else
-                down(&inode->i_sem);
-                DOWN_WRITE_I_ALLOC_SEM(inode);
-#endif
                 if (rc != 0)
                         RETURN(rc);
 
+                down(&lli->lli_size_sem);
                 rc = vmtruncate(inode, attr->ia_size);
+                // if vmtruncate returned 0, then ll_truncate dropped _size_sem
+                if (rc != 0) {
+                        LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
+                        up(&lli->lli_size_sem);
+                }
 
-                /* We need to drop the semaphore here, because this unlock may
-                 * result in a cancellation, which will need the i_sem */
-                up(&inode->i_sem);
-                UP_WRITE_I_ALLOC_SEM(inode);
-                /* unlock now as we don't mind others file lockers racing with
-                 * the mds updates below? */
                 err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-                DOWN_WRITE_I_ALLOC_SEM(inode);
-                down(&inode->i_sem);
-#else
-                down(&inode->i_sem);
-                DOWN_WRITE_I_ALLOC_SEM(inode);
-#endif
                 if (err) {
                         CERROR("ll_extent_unlock failed: %d\n", err);
                         if (!rc)
index 3e947dd..8bbd1d8 100644 (file)
@@ -105,10 +105,13 @@ __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
 
 /* this isn't where truncate starts.   roughly:
  * sys_truncate->ll_setattr_raw->vmtruncate->ll_truncate
- * we grab the lock back in setattr_raw to avoid races. */
+ * we grab the lock back in setattr_raw to avoid races.
+ *
+ * must be called with lli_size_sem held */
 void ll_truncate(struct inode *inode)
 {
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+        struct ll_inode_info *lli = ll_i2info(inode);
         struct obdo oa;
         int rc;
         ENTRY;
@@ -118,37 +121,43 @@ void ll_truncate(struct inode *inode)
         if (!lsm) {
                 CDEBUG(D_INODE, "truncate on inode %lu with no objects\n",
                        inode->i_ino);
-                EXIT;
-                return;
+                GOTO(out_unlock, 0);
         }
 
         if (lov_merge_size(lsm, 0) == inode->i_size) {
                 CDEBUG(D_VFSTRACE, "skipping punch for "LPX64" (size = %llu)\n",
                        lsm->lsm_object_id, inode->i_size);
-        } else {
-                CDEBUG(D_INFO, "calling punch for "LPX64" (new size %llu)\n",
-                       lsm->lsm_object_id, inode->i_size);
+                GOTO(out_unlock, 0);
+        }
 
-                oa.o_id = lsm->lsm_object_id;
-                oa.o_valid = OBD_MD_FLID;
-                obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
-                                OBD_MD_FLATIME |OBD_MD_FLMTIME |OBD_MD_FLCTIME);
+        CDEBUG(D_INFO, "calling punch for "LPX64" (new size %llu)\n",
+               lsm->lsm_object_id, inode->i_size);
 
-                /* truncate == punch from new size to absolute end of file */
-                /* NB: must call obd_punch with i_sem held!  It updates kms! */
-                rc = obd_punch(ll_i2obdexp(inode), &oa, lsm, inode->i_size,
-                               OBD_OBJECT_EOF, NULL);
-                if (rc)
-                        CERROR("obd_truncate fails (%d) ino %lu\n", rc,
-                               inode->i_ino);
-                else
-                        obdo_to_inode(inode, &oa, OBD_MD_FLSIZE|OBD_MD_FLBLOCKS|
-                                      OBD_MD_FLATIME | OBD_MD_FLMTIME |
-                                      OBD_MD_FLCTIME);
-        }
+        oa.o_id = lsm->lsm_object_id;
+        oa.o_valid = OBD_MD_FLID;
+        obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE |
+                        OBD_MD_FLATIME |OBD_MD_FLMTIME |OBD_MD_FLCTIME);
+
+        obd_adjust_kms(ll_i2obdexp(inode), lsm, inode->i_size, 1);
+
+        LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
+        up(&lli->lli_size_sem);
 
+        rc = obd_punch(ll_i2obdexp(inode), &oa, lsm, inode->i_size,
+                       OBD_OBJECT_EOF, NULL);
+        if (rc)
+                CERROR("obd_truncate fails (%d) ino %lu\n", rc,
+                       inode->i_ino);
+        else
+                obdo_to_inode(inode, &oa, OBD_MD_FLSIZE|OBD_MD_FLBLOCKS|
+                              OBD_MD_FLATIME | OBD_MD_FLMTIME |
+                              OBD_MD_FLCTIME);
         EXIT;
         return;
+
+ out_unlock:
+        LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0);
+        up(&lli->lli_size_sem);
 } /* ll_truncate */
 
 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
@@ -199,7 +208,9 @@ int ll_prepare_write(struct file *file, struct page *page, unsigned from,
         /* If are writing to a new page, no need to read old data.  The extent
          * locking will have updated the KMS, and for our purposes here we can
          * treat it like i_size. */
+        down(&lli->lli_size_sem);
         kms = lov_merge_size(lsm, 1);
+        up(&lli->lli_size_sem);
         if (kms <= offset) {
                 LL_CDEBUG_PAGE(D_PAGE, page, "kms "LPU64" <= offset "LPU64"\n",
                                kms, offset);
@@ -641,6 +652,7 @@ int ll_commit_write(struct file *file, struct page *page, unsigned from,
 
 out:
         size = (((obd_off)page->index) << PAGE_SHIFT) + to;
+        down(&lli->lli_size_sem);
         if (rc == 0) {
                 obd_adjust_kms(exp, lsm, size, 0);
                 if (size > inode->i_size)
@@ -652,6 +664,7 @@ out:
                  * teardown our book-keeping here. */
                 ll_removepage(page);
         }
+        up(&lli->lli_size_sem);
         RETURN(rc);
 }
 
index e273c63..1ac7190 100644 (file)
@@ -1126,15 +1126,10 @@ int lov_update_setattr_set(struct lov_request_set *set,
 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
                          int rc)
 {
-        struct lov_stripe_md *lsm = set->set_md;
         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
         ENTRY;
 
         lov_update_set(set, req, rc);
-        if (rc == 0) {
-                struct lov_oinfo *loi = &lsm->lsm_oinfo[req->rq_stripe];
-                loi->loi_kms = loi->loi_rss = req->rq_extent.start;
-        }
         if (rc && !lov->tgts[req->rq_idx].active)
                 rc = 0;
         /* FIXME in raid1 regime, should return 0 */
index f135417..194bbb5 100644 (file)
@@ -814,6 +814,7 @@ int mds_open(struct mds_update_record *rec, int offset,
                         }
                         DEBUG_REQ(D_ERROR, req, "fid2 not set on open replay");
                 }
+                /* this LASSERT needs to go; handle more gracefully. */
                 LASSERT(rec->ur_fid2->id);
 
                 rc = mds_open_by_fid(req, rec->ur_fid2, body, rec->ur_flags,