Whamcloud - gitweb
Handle glimpse races where inode is evicted by client but still holding lock.
authoradilger <adilger>
Mon, 8 Mar 2004 20:08:55 +0000 (20:08 +0000)
committeradilger <adilger>
Mon, 8 Mar 2004 20:08:55 +0000 (20:08 +0000)
b=2818

lustre/ChangeLog
lustre/include/linux/lustre_dlm.h
lustre/include/linux/obd.h
lustre/ldlm/ldlm_lockd.c
lustre/llite/file.c
lustre/llite/namei.c
lustre/lov/lov_obd.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_lvb.c
lustre/osc/osc_request.c

index 274a565..21c3e3f 100644 (file)
@@ -1,3 +1,8 @@
+tbd  Cluster File Systems, Inc. <info@clusterfs.com>
+       * version 1.2.1
+       * bug fixes
+       - fixes for glimpse AST timeouts / incorrectly 0-sized files (2818)
+
 2004-03-04  Cluster File Systems, Inc. <info@clusterfs.com>
        * version 1.2.0
        * bug fixes
index b8515a3..d85d7a1 100644 (file)
@@ -29,6 +29,7 @@ typedef enum {
         ELDLM_LOCK_CHANGED = 300,
         ELDLM_LOCK_ABORTED = 301,
         ELDLM_LOCK_REPLACED = 302,
+        ELDLM_NO_LOCK_DATA = 303,
 
         ELDLM_NAMESPACE_EXISTS = 400,
         ELDLM_BAD_NAMESPACE    = 401
@@ -144,7 +145,7 @@ typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **,
 struct ldlm_valblock_ops {
         int (*lvbo_init)(struct ldlm_resource *res);
         int (*lvbo_update)(struct ldlm_resource *res, struct lustre_msg *m,
-                           int buf_idx);
+                           int buf_idx, int increase);
 };
 
 struct ldlm_namespace {
index 24ee1c2..242498e 100644 (file)
@@ -55,6 +55,7 @@ struct lov_oinfo {                 /* per-stripe data structure */
         struct list_head loi_cli_item;
         struct list_head loi_write_item;
 
+        int loi_kms_valid:1;
         __u64 loi_kms; /* known minimum size */
         __u64 loi_rss; /* recently seen size */
         __u64 loi_mtime; /* recently seen mtime */
index 5765d8c..b1c265d 100644 (file)
@@ -547,12 +547,15 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
         } else if (rc == -EINVAL) {
                 LDLM_DEBUG(lock, "lost the race -- client no longer has this "
                            "lock");
+        } else if (rc == -ELDLM_NO_LOCK_DATA) {
+                LDLM_DEBUG(lock, "lost a race -- client has a lock, but no "
+                           "inode");
         } else if (rc) {
                 LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
                            "glimpse AST", rc, req->rq_status);
         } else {
-                rc = res->lr_namespace->ns_lvbo->lvbo_update(res,
-                                                             req->rq_repmsg, 0);
+                rc = res->lr_namespace->ns_lvbo->lvbo_update
+                        (res, req->rq_repmsg, 0, 1);
         }
         ptlrpc_req_finished(req);
         RETURN(rc);
@@ -767,7 +770,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
                 if (res && res->lr_namespace->ns_lvbo &&
                     res->lr_namespace->ns_lvbo->lvbo_update) {
                         (void)res->lr_namespace->ns_lvbo->lvbo_update
-                                (res, NULL, 0);
+                                (res, NULL, 0, 0);
                                 //(res, req->rq_reqmsg, 1);
                 }
 
@@ -889,6 +892,7 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                                     struct ldlm_request *dlm_req,
                                     struct ldlm_lock *lock)
 {
+        int rc = -ENOSYS;
         ENTRY;
 
         l_lock(&ns->ns_lock);
@@ -897,10 +901,17 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
         if (lock->l_glimpse_ast != NULL) {
                 l_unlock(&ns->ns_lock);
                 l_check_no_ns_lock(ns);
-                lock->l_glimpse_ast(lock, req);
+                rc = lock->l_glimpse_ast(lock, req);
                 l_lock(&ns->ns_lock);
         }
 
+        if (req->rq_repmsg != NULL) {
+                ptlrpc_reply(req);
+        } else {
+                req->rq_status = rc;
+                ptlrpc_error(req);
+        }
+
         if (lock->l_granted_mode == LCK_PW &&
             !lock->l_readers && !lock->l_writers &&
             time_after(jiffies, lock->l_last_used + 10 * HZ)) {
index 2cbc22e..9e487d5 100644 (file)
@@ -549,64 +549,50 @@ int ll_async_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 }
 #endif
 
-/* This function is a disaster.  I hate the LOV. */
 static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
 {
         struct ptlrpc_request *req = reqp;
         struct inode *inode = ll_inode_from_lock(lock);
-        struct obd_export *exp;
         struct ll_inode_info *lli;
         struct ost_lvb *lvb;
-        struct {
-                int stripe_number;
-                __u64 size;
-                struct lov_stripe_md *lsm;
-        } data;
-        __u32 vallen = sizeof(data);
-        int rc, size = sizeof(*lvb);
+        int rc, size = sizeof(*lvb), stripe = 0;
         ENTRY;
 
         if (inode == NULL)
-                RETURN(0);
+                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
         lli = ll_i2info(inode);
         if (lli == NULL)
-                goto iput;
+                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
         if (lli->lli_smd == NULL)
-                goto iput;
-        exp = ll_i2obdexp(inode);
+                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
 
         /* First, find out which stripe index this lock corresponds to. */
         if (lli->lli_smd->lsm_stripe_count > 1)
-                data.stripe_number = ll_lock_to_stripe_offset(inode, lock);
-        else
-                data.stripe_number = 0;
-
-        data.size = inode->i_size;
-        data.lsm = lli->lli_smd;
-
-        rc = obd_get_info(exp, strlen("size_to_stripe"), "size_to_stripe",
-                          &vallen, &data);
-        if (rc != 0) {
-                CERROR("obd_get_info: rc = %d\n", rc);
-                LBUG();
-        }
-
-        LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> size "LPU64,
-                   inode->i_size, data.stripe_number, data.size);
+                stripe = ll_lock_to_stripe_offset(inode, lock);
 
         rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc) {
                 CERROR("lustre_pack_reply: %d\n", rc);
-                goto iput;
+                GOTO(iput, rc);
         }
 
         lvb = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*lvb));
-        lvb->lvb_size = data.size;
-        ptlrpc_reply(req);
+        lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe].loi_kms;
 
+        LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64,
+                   inode->i_size, stripe, lvb->lvb_size);
+        GOTO(iput, 0);
  iput:
         iput(inode);
-        RETURN(0);
+
+ out:
+        /* These errors are normal races, so we don't want to fill the console
+         * with messages by calling ptlrpc_error() */
+        if (rc == -ELDLM_NO_LOCK_DATA)
+                lustre_pack_reply(req, 0, NULL, NULL);
+
+        req->rq_status = rc;
+        return rc;
 }
 
 __u64 lov_merge_size(struct lov_stripe_md *lsm, int kms);
index 4c59d71..5784eb8 100644 (file)
@@ -104,6 +104,7 @@ int ll_set_inode(struct inode *inode, void *opaque)
         ll_read_inode2(inode, opaque);
         return 0;
 }
+
 struct inode *ll_iget(struct super_block *sb, ino_t hash,
                       struct lustre_md *md)
 {
index 4d3b635..6267ef5 100644 (file)
@@ -2057,6 +2057,7 @@ static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
                 /* XXX LOV STACKING: submd should be from the subobj */
                 submd->lsm_object_id = loi->loi_id;
                 submd->lsm_stripe_count = 0;
+                submd->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
                 submd->lsm_oinfo->loi_rss = loi->loi_rss;
                 submd->lsm_oinfo->loi_kms = loi->loi_kms;
                 loi->loi_mtime = submd->lsm_oinfo->loi_mtime;
@@ -2087,6 +2088,7 @@ static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
                                        LPU64", kms="LPU64"\n", loi->loi_rss,
                                        tmp);
                                 loi->loi_kms = tmp;
+                                loi->loi_kms_valid = 1;
                         } else {
                                 CDEBUG(D_INODE, "lock acquired, setting rss="
                                        LPU64"; leaving kms="LPU64", end="LPU64
index 0e8e458..5d9b641 100644 (file)
@@ -1136,12 +1136,12 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
 
         LASSERT(l->l_glimpse_ast != NULL);
         rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
+        if (rc != 0 && res->lr_namespace->ns_lvbo &&
+            res->lr_namespace->ns_lvbo->lvbo_update) {
+                res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
+        }
 
         down(&res->lr_lvb_sem);
-#if 0
-        if (res_lvb->lvb_size == reply_lvb->lvb_size)
-                LDLM_ERROR(l, "we lost the glimpse race!");
-#endif
         reply_lvb->lvb_size = res_lvb->lvb_size;
         up(&res->lr_lvb_sem);
 
@@ -1665,7 +1665,7 @@ static int filter_setattr(struct obd_export *exp, struct obdo *oa,
                         if (res->lr_namespace->ns_lvbo &&
                             res->lr_namespace->ns_lvbo->lvbo_update) {
                                 rc = res->lr_namespace->ns_lvbo->lvbo_update
-                                        (res, NULL, 0);
+                                        (res, NULL, 0, 0);
                         }
                         ldlm_resource_putref(res);
                 }
index 852aeaf..acfba4c 100644 (file)
@@ -102,9 +102,11 @@ static int filter_lvbo_init(struct ldlm_resource *res)
  *
  *   m != NULL : called by the DLM itself after a glimpse callback
  *   m == NULL : called by the filter after a disk write
+ *
+ *   If 'increase' is true, don't allow values to move backwards.
  */
 static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m,
-                              int buf_idx)
+                              int buf_idx, int increase)
 {
         int rc = 0;
         struct ost_lvb *lvb = res->lr_lvb_data;
@@ -137,13 +139,13 @@ static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m,
                         //GOTO(out, rc = -EPROTO);
                         GOTO(out, rc = 0);
                 }
-                if (new->lvb_size > lvb->lvb_size) {
+                if (new->lvb_size > lvb->lvb_size || !increase) {
                         CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size: "
                                LPU64" -> "LPU64"\n", res->lr_name.name[0],
                                lvb->lvb_size, new->lvb_size);
                         lvb->lvb_size = new->lvb_size;
                 }
-                if (new->lvb_mtime > lvb->lvb_mtime) {
+                if (new->lvb_mtime > lvb->lvb_mtime || !increase) {
                         CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb mtime: "
                                LPU64" -> "LPU64"\n", res->lr_name.name[0],
                                lvb->lvb_mtime, new->lvb_mtime);
@@ -170,10 +172,18 @@ static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m,
         oa->o_valid = OBD_MD_FLID;
         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
 
-        lvb->lvb_size = dentry->d_inode->i_size;
-        lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime);
-        CDEBUG(D_DLMTRACE, "res: "LPU64" disk lvb size: "LPU64", mtime: "
-               LPU64"\n", res->lr_name.name[0], lvb->lvb_size, lvb->lvb_mtime);
+        if (dentry->d_inode->i_size > lvb->lvb_size || !increase) {
+                CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size from disk: "
+                       LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                       lvb->lvb_size, dentry->d_inode->i_size);
+                lvb->lvb_size = dentry->d_inode->i_size;
+        }
+        if (dentry->d_inode->i_mtime > lvb->lvb_mtime || !increase) {
+                CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb mtime from disk: "
+                       LPU64" -> "LPU64"\n", res->lr_name.name[0],
+                       lvb->lvb_mtime,(__u64)LTIME_S(dentry->d_inode->i_mtime));
+                lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime);
+        }
         f_dput(dentry);
 
  out:
index 6858fe1..a6a3992 100644 (file)
@@ -2384,6 +2384,9 @@ static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
         policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
         policy->l_extent.end |= ~PAGE_MASK;
 
+        if (lsm->lsm_oinfo->loi_kms_valid == 0)
+                goto no_match;
+
         /* Next, search for already existing extent locks that will cover us */
         rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, policy, mode,
                              lockh);
@@ -2424,6 +2427,7 @@ static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
                 }
         }
 
+ no_match:
         rc = ldlm_cli_enqueue(exp, NULL, obd->obd_namespace, res_id, type,
                               policy, mode, flags, bl_cb, cp_cb, gl_cb, data,
                               &lvb, sizeof(lvb), lustre_swab_ost_lvb, lockh);