Whamcloud - gitweb
b=22755 Don't consume grant twice on recoverable resend
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
index 17d4685..859e290 100644 (file)
@@ -109,7 +109,7 @@ void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
         fed->fed_grant -= oa->o_dropped;
         fed->fed_dirty = oa->o_dirty;
 
-        if (oa->o_flags & OBD_FL_SHRINK_GRANT) {
+        if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_SHRINK_GRANT) {
                 obd_size left_space = filter_grant_space_left(exp);
                 struct filter_obd *filter = &exp->exp_obd->u.filter;
 
@@ -401,7 +401,8 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                 cfs_spin_lock(&obd->obd_osfs_lock);
                 filter_grant_incoming(exp, oa);
 
-                if (!(oa->o_flags & OBD_FL_SHRINK_GRANT))
+                if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
+                    !(oa->o_flags & OBD_FL_SHRINK_GRANT))
                         oa->o_grant = 0;
                 cfs_spin_unlock(&obd->obd_osfs_lock);
         }
@@ -529,6 +530,14 @@ static int filter_grant_check(struct obd_export *exp, struct obdo *oa,
         int blocksize = exp->exp_obd->u.obt.obt_sb->s_blocksize;
         unsigned long used = 0, ungranted = 0, using;
         int i, rc = -ENOSPC, obj, n = 0;
+        int resend = 0;
+
+        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+            (oa->o_flags & OBD_FL_RECOV_RESEND)) {
+                resend = 1;
+                CDEBUG(D_CACHE, "Recoverable resend arrived, skipping "
+                       "accounting\n");
+        }
 
         LASSERT_SPIN_LOCKED(&exp->exp_obd->obd_osfs_lock);
 
@@ -545,7 +554,12 @@ static int filter_grant_check(struct obd_export *exp, struct obdo *oa,
 
                         if ((lnb[n].flags & OBD_BRW_FROM_GRANT) &&
                             (oa->o_valid & OBD_MD_FLGRANT)) {
-                                if (fed->fed_grant < used + bytes) {
+                                if (resend) {
+                                        /* this is a recoverable resent */
+                                        lnb[n].flags |= OBD_BRW_GRANTED;
+                                        rc = 0;
+                                        continue;
+                                } else if (fed->fed_grant < used + bytes) {
                                         CDEBUG(D_CACHE,
                                                "%s: cli %s/%p claims %ld+%d "
                                                "GRANT, real grant %lu idx %d\n",
@@ -681,9 +695,36 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         cleanup_phase = 2;
 
         if (dentry->d_inode == NULL) {
-                CERROR("%s: trying to BRW to non-existent file "LPU64"\n",
-                       obd->obd_name, obj->ioo_id);
-                GOTO(cleanup, rc = -ENOENT);
+                if (exp->exp_obd->obd_recovering) {
+                        struct obdo *noa = oa;
+
+                        if (oa == NULL) {
+                                OBDO_ALLOC(noa);
+                                if (noa == NULL)
+                                        GOTO(recreate_out, rc = -ENOMEM);
+                                noa->o_id = obj->ioo_id;
+                                noa->o_valid = OBD_MD_FLID;
+                        }
+
+                        if (filter_create(exp, noa, NULL, oti) == 0) {
+                                f_dput(dentry);
+                                dentry = filter_fid2dentry(exp->exp_obd, NULL,
+                                                           obj->ioo_seq,
+                                                           obj->ioo_id);
+                        }
+                        if (oa == NULL)
+                                OBDO_FREE(noa);
+                }
+    recreate_out:
+                if (IS_ERR(dentry) || dentry->d_inode == NULL) {
+                        CERROR("%s: BRW to missing obj "LPU64"/"LPU64":rc %d\n",
+                               exp->exp_obd->obd_name,
+                               obj->ioo_id, obj->ioo_seq,
+                               IS_ERR(dentry) ? (int)PTR_ERR(dentry) : -ENOENT);
+                        if (IS_ERR(dentry))
+                                cleanup_phase = 1;
+                        GOTO(cleanup, rc = -ENOENT);
+                }
         }
 
         if (oa->o_valid & (OBD_MD_FLUID | OBD_MD_FLGID) &&
@@ -706,6 +747,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
          * filter truncates are serialized by i_alloc_sem, allowing
          * multiple writes or single truncate. */
         down_read(&dentry->d_inode->i_alloc_sem);
+        fsfilt_check_slow(obd, now, "i_alloc_sem");
 
         /* Don't update inode timestamps if this write is older than a
          * setattr which modifies the timestamps. b=10150 */
@@ -739,7 +781,9 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
          * so no sense in allocating it some more. We either return the grant
          * back to the client if we have plenty of space or we don't return
          * anything if we are short. This was decided in filter_grant_incoming*/
-        if (oa->o_valid & OBD_MD_FLGRANT &&!(oa->o_valid & OBD_FL_SHRINK_GRANT))
+        if ((oa->o_valid & OBD_MD_FLGRANT) &&
+            (!(oa->o_valid & OBD_MD_FLFLAGS) ||
+             !(oa->o_flags & OBD_FL_SHRINK_GRANT)))
                 oa->o_grant = filter_grant(exp, oa->o_grant, oa->o_undirty,
                                            left, 1);
 
@@ -999,6 +1043,7 @@ int filter_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                 lnb[i].page = pga[i].pg;
                 rnb[i].offset = pga[i].off;
                 rnb[i].len = pga[i].count;
+                lnb[i].flags = rnb[i].flags = pga[i].flag;
         }
 
         obdo_to_ioobj(oinfo->oi_oa, &ioo);