Whamcloud - gitweb
improve handling recoverable errors
authorshadow <shadow>
Tue, 2 Oct 2007 14:32:38 +0000 (14:32 +0000)
committershadow <shadow>
Tue, 2 Oct 2007 14:32:38 +0000 (14:32 +0000)
b=11710
i=green
i=johann

16 files changed:
lustre/ChangeLog
lustre/include/lustre_net.h
lustre/include/obd.h
lustre/include/obd_ost.h
lustre/include/obd_support.h
lustre/ldlm/ldlm_lib.c
lustre/llite/file.c
lustre/llite/lproc_llite.c
lustre/llite/rw.c
lustre/osc/lproc_osc.c
lustre/osc/osc_internal.h
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/tests/sanity.sh

index 071864e..2bce9ed 100644 (file)
@@ -350,6 +350,13 @@ Details    : Modify targets/2.6-vanilla.target.in.
              Add config file kernel-2.6.18-2.6-vanilla-x86_64.config.
              Add config file kernel-2.6.18-2.6-vanilla-x86_64-smp.config.
 
              Add config file kernel-2.6.18-2.6-vanilla-x86_64.config.
              Add config file kernel-2.6.18-2.6-vanilla-x86_64-smp.config.
 
+Severity   : major
+Bugzilla   : 11710
+Description: improve handling of recoverable errors
+Details    : if request processing fails with an error that is recoverable on
+             the server, the request should be resent; otherwise the page is
+             released from cache and marked as error.
+
 --------------------------------------------------------------------------------
 
 2007-08-10         Cluster File Systems, Inc. <info@clusterfs.com>
 --------------------------------------------------------------------------------
 
 2007-08-10         Cluster File Systems, Inc. <info@clusterfs.com>
index 49092fd..9d1cf20 100644 (file)
@@ -394,8 +394,9 @@ struct ptlrpc_request {
         void  *rq_cb_data;
 
         struct ptlrpc_bulk_desc *rq_bulk;       /* client side bulk */
         void  *rq_cb_data;
 
         struct ptlrpc_bulk_desc *rq_bulk;       /* client side bulk */
-        time_t rq_sent;                         /* when request sent, seconds */
-
+        time_t rq_sent;                         /* when request sent, seconds,
+                                                 * or time when request should
+                                                 * be sent */
         /* Multi-rpc bits */
         struct list_head rq_set_chain;
         struct ptlrpc_request_set *rq_set;
         /* Multi-rpc bits */
         struct list_head rq_set_chain;
         struct ptlrpc_request_set *rq_set;
index a11b770..e86e153 100644 (file)
@@ -376,6 +376,7 @@ struct filter_obd {
 #define OSC_MAX_RIF_MAX         256
 #define OSC_MAX_DIRTY_DEFAULT  (OSC_MAX_RIF_DEFAULT * 4)
 #define OSC_MAX_DIRTY_MB_MAX   2048     /* arbitrary, but < MAX_LONG bytes */
 #define OSC_MAX_RIF_MAX         256
 #define OSC_MAX_DIRTY_DEFAULT  (OSC_MAX_RIF_DEFAULT * 4)
 #define OSC_MAX_DIRTY_MB_MAX   2048     /* arbitrary, but < MAX_LONG bytes */
+#define OSC_DEFAULT_RESENDS      10
 
 #define MDC_MAX_RIF_DEFAULT       8
 #define MDC_MAX_RIF_MAX         512
 
 #define MDC_MAX_RIF_DEFAULT       8
 #define MDC_MAX_RIF_MAX         512
@@ -465,6 +466,8 @@ struct client_obd {
 
         /* sequence manager */
         struct lu_client_seq    *cl_seq;
 
         /* sequence manager */
         struct lu_client_seq    *cl_seq;
+
+        atomic_t                 cl_resends; /* resend count */
 };
 #define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid)
 
 };
 #define obd2cli_tgt(obd) ((char *)(obd)->u.cli.cl_target_uuid.uuid)
 
index 23bbb04..12beb63 100644 (file)
@@ -18,7 +18,7 @@ struct osc_brw_async_args {
         int                aa_requested_nob;
         int                aa_nio_count;
         obd_count          aa_page_count;
         int                aa_requested_nob;
         int                aa_nio_count;
         obd_count          aa_page_count;
-        int                aa_retries;
+        int                aa_resends;
         struct brw_page  **aa_ppga;
         struct client_obd *aa_cli;
         struct list_head   aa_oaps;
         struct brw_page  **aa_ppga;
         struct client_obd *aa_cli;
         struct list_head   aa_oaps;
index f6717a1..8452374 100644 (file)
@@ -145,6 +145,7 @@ extern int obd_race_state;
 #define OBD_FAIL_OST_DROP_REQ            0x21d
 #define OBD_FAIL_OST_SETATTR_CREDITS     0x21e
 #define OBD_FAIL_OST_HOLD_WRITE_RPC      0x21f
 #define OBD_FAIL_OST_DROP_REQ            0x21d
 #define OBD_FAIL_OST_SETATTR_CREDITS     0x21e
 #define OBD_FAIL_OST_HOLD_WRITE_RPC      0x21f
+#define OBD_FAIL_OST_BRW_WRITE_BULK2     0x220
 
 #define OBD_FAIL_LDLM                    0x300
 #define OBD_FAIL_LDLM_NAMESPACE_NEW      0x301
 
 #define OBD_FAIL_LDLM                    0x300
 #define OBD_FAIL_LDLM_NAMESPACE_NEW      0x301
@@ -175,6 +176,7 @@ extern int obd_race_state;
 #define OBD_FAIL_OSC_SHUTDOWN            0x407
 #define OBD_FAIL_OSC_CHECKSUM_RECEIVE    0x408
 #define OBD_FAIL_OSC_CHECKSUM_SEND       0x409
 #define OBD_FAIL_OSC_SHUTDOWN            0x407
 #define OBD_FAIL_OSC_CHECKSUM_RECEIVE    0x408
 #define OBD_FAIL_OSC_CHECKSUM_SEND       0x409
+#define OBD_FAIL_OSC_BRW_PREP_REQ2       0x40a
 
 #define OBD_FAIL_PTLRPC                  0x500
 #define OBD_FAIL_PTLRPC_ACK              0x501
 
 #define OBD_FAIL_PTLRPC                  0x500
 #define OBD_FAIL_PTLRPC_ACK              0x501
index 90a7945..94add82 100644 (file)
@@ -277,6 +277,7 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
 #ifdef ENABLE_CHECKSUM
         cli->cl_checksum = 1;
 #endif
 #ifdef ENABLE_CHECKSUM
         cli->cl_checksum = 1;
 #endif
+        atomic_set(&cli->cl_resends, OSC_DEFAULT_RESENDS);
 
         /* This value may be changed at connect time in
            ptlrpc_connect_interpret. */
 
         /* This value may be changed at connect time in
            ptlrpc_connect_interpret. */
index 4dd95ab..dc07a8a 100644 (file)
@@ -751,9 +751,10 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
         struct page *page;
         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
         struct lustre_handle lockh;
         struct page *page;
         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
         struct lustre_handle lockh;
-        ENTRY;
+        struct address_space *mapping = inode->i_mapping;
 
 
-        memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
+        ENTRY;
+        tmpex = lock->l_policy_data;
         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
                i_size_read(inode));
         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
                i_size_read(inode));
@@ -798,8 +799,8 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
         for (i = start; i <= end; i += (j + skip)) {
                 j = min(count - (i % count), end - i + 1);
                 LASSERT(j > 0);
         for (i = start; i <= end; i += (j + skip)) {
                 j = min(count - (i % count), end - i + 1);
                 LASSERT(j > 0);
-                LASSERT(inode->i_mapping);
-                if (ll_teardown_mmaps(inode->i_mapping,
+                LASSERT(mapping);
+                if (ll_teardown_mmaps(mapping,
                                       (__u64)i << CFS_PAGE_SHIFT,
                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
                         break;
                                       (__u64)i << CFS_PAGE_SHIFT,
                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
                         break;
@@ -824,14 +825,14 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
                          start, i, end);
 
                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
                          start, i, end);
 
-                if (!mapping_has_pages(inode->i_mapping)) {
+                if (!mapping_has_pages(mapping)) {
                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
                         break;
                 }
 
                 cond_resched();
 
                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
                         break;
                 }
 
                 cond_resched();
 
-                page = find_get_page(inode->i_mapping, i);
+                page = find_get_page(mapping, i);
                 if (page == NULL)
                         continue;
                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
                 if (page == NULL)
                         continue;
                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
@@ -841,13 +842,19 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
                 /* page->mapping to check with racing against teardown */
                 if (!discard && clear_page_dirty_for_io(page)) {
                         rc = ll_call_writepage(inode, page);
                 /* page->mapping to check with racing against teardown */
                 if (!discard && clear_page_dirty_for_io(page)) {
                         rc = ll_call_writepage(inode, page);
-                        if (rc != 0)
-                                CERROR("writepage inode %lu(%p) of page %p "
-                                       "failed: %d\n", inode->i_ino, inode,
-                                       page, rc);
                         /* either waiting for io to complete or reacquiring
                          * the lock that the failed writepage released */
                         lock_page(page);
                         /* either waiting for io to complete or reacquiring
                          * the lock that the failed writepage released */
                         lock_page(page);
+                        wait_on_page_writeback(page);
+                        if (rc != 0) {
+                                CERROR("writepage inode %lu(%p) of page %p "
+                                       "failed: %d\n", inode->i_ino, inode,
+                                       page, rc);
+                                if (rc == -ENOSPC)
+                                        set_bit(AS_ENOSPC, &mapping->flags);
+                                else
+                                        set_bit(AS_EIO, &mapping->flags);
+                        }
                 }
 
                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
                 }
 
                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
@@ -864,7 +871,7 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
                          * lock_page() */
                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
                         if (llap)
                          * lock_page() */
                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
                         if (llap)
-                                ll_ra_accounting(llap, inode->i_mapping);
+                                ll_ra_accounting(llap, mapping);
                         ll_truncate_complete_page(page);
                 }
                 unlock_page(page);
                         ll_truncate_complete_page(page);
                 }
                 unlock_page(page);
index f2a6922..32a48d3 100644 (file)
@@ -736,7 +736,7 @@ static int llite_dump_pgcache_seq_show(struct seq_file *seq, void *v)
         /* 2.4 doesn't seem to have SEQ_START_TOKEN, so we implement
          * it in our own state */
         if (dummy_llap->llap_magic == 0) {
         /* 2.4 doesn't seem to have SEQ_START_TOKEN, so we implement
          * it in our own state */
         if (dummy_llap->llap_magic == 0) {
-                seq_printf(seq, "gener |  llap  cookie  origin wq du | page "
+                seq_printf(seq, "gener |  llap  cookie  origin wq du wb | page "
                                 "inode index count [ page flags ]\n");
                 return 0;
         }
                                 "inode index count [ page flags ]\n");
                 return 0;
         }
@@ -751,13 +751,17 @@ static int llite_dump_pgcache_seq_show(struct seq_file *seq, void *v)
                 LASSERTF(llap->llap_origin < LLAP__ORIGIN_MAX, "%u\n",
                          llap->llap_origin);
 
                 LASSERTF(llap->llap_origin < LLAP__ORIGIN_MAX, "%u\n",
                          llap->llap_origin);
 
-                seq_printf(seq, "%5lu | %p %p %s %s %s | %p %p %lu %u [",
+                seq_printf(seq," %5lu | %p %p %s %s %s %s | %p %lu/%u(%p) "
+                           "%lu %u [",
                            sbi->ll_pglist_gen,
                            llap, llap->llap_cookie,
                            llap_origins[llap->llap_origin],
                            llap->llap_write_queued ? "wq" : "- ",
                            llap->llap_defer_uptodate ? "du" : "- ",
                            sbi->ll_pglist_gen,
                            llap, llap->llap_cookie,
                            llap_origins[llap->llap_origin],
                            llap->llap_write_queued ? "wq" : "- ",
                            llap->llap_defer_uptodate ? "du" : "- ",
-                           page, page->mapping->host, page->index,
+                           PageWriteback(page) ? "wb" : "-",
+                           page, page->mapping->host->i_ino,
+                           page->mapping->host->i_generation,
+                           page->mapping->host, page->index,
                            page_count(page));
                 seq_page_flag(seq, page, locked, has_flags);
                 seq_page_flag(seq, page, error, has_flags);
                            page_count(page));
                 seq_page_flag(seq, page, locked, has_flags);
                 seq_page_flag(seq, page, error, has_flags);
@@ -767,9 +771,10 @@ static int llite_dump_pgcache_seq_show(struct seq_file *seq, void *v)
 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,12))
                 seq_page_flag(seq, page, highmem, has_flags);
 #endif
 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,12))
                 seq_page_flag(seq, page, highmem, has_flags);
 #endif
+                seq_page_flag(seq, page, writeback, has_flags);
                 if (!has_flags)
                         seq_puts(seq, "-]\n");
                 if (!has_flags)
                         seq_puts(seq, "-]\n");
-                else 
+                else
                         seq_puts(seq, "]\n");
         }
 
                         seq_puts(seq, "]\n");
         }
 
index 8587902..ac97e87 100644 (file)
@@ -515,18 +515,16 @@ int llap_shrink_cache(struct ll_sb_info *sbi, int shrink_fraction)
                         continue;
                 }
 
                         continue;
                 }
 
-                if (llap->llap_write_queued || PageDirty(page) ||
-                    (!PageUptodate(page) &&
-                     llap->llap_origin != LLAP_ORIGIN_READAHEAD))
-                        keep = 1;
-                else
-                        keep = 0;
+               keep = (llap->llap_write_queued || PageDirty(page) ||
+                      PageWriteback(page) || (!PageUptodate(page) &&
+                      llap->llap_origin != LLAP_ORIGIN_READAHEAD));
 
 
-                LL_CDEBUG_PAGE(D_PAGE, page,"%s LRU page: %s%s%s%s origin %s\n",
+                LL_CDEBUG_PAGE(D_PAGE, page,"%s LRU page: %s%s%s%s%s origin %s\n",
                                keep ? "keep" : "drop",
                                llap->llap_write_queued ? "wq " : "",
                                PageDirty(page) ? "pd " : "",
                                PageUptodate(page) ? "" : "!pu ",
                                keep ? "keep" : "drop",
                                llap->llap_write_queued ? "wq " : "",
                                PageDirty(page) ? "pd " : "",
                                PageUptodate(page) ? "" : "!pu ",
+                               PageWriteback(page) ? "wb" : "",
                                llap->llap_defer_uptodate ? "" : "!du",
                                llap_origins[llap->llap_origin]);
 
                                llap->llap_defer_uptodate ? "" : "!du",
                                llap_origins[llap->llap_origin]);
 
@@ -904,11 +902,16 @@ int ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
         } else {
                 if (cmd & OBD_BRW_READ) {
                         llap->llap_defer_uptodate = 0;
         } else {
                 if (cmd & OBD_BRW_READ) {
                         llap->llap_defer_uptodate = 0;
-                } else {
-                        ll_redirty_page(page);
-                        ret = 1;
                 }
                 SetPageError(page);
                 }
                 SetPageError(page);
+#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
+                if (rc == -ENOSPC)
+                        set_bit(AS_ENOSPC, &page->mapping->flags);
+                else
+                        set_bit(AS_EIO, &page->mapping->flags);
+#else
+                page->mapping->gfp_mask |= AS_EIO_MASK;
+#endif
         }
 
         unlock_page(page);
         }
 
         unlock_page(page);
@@ -1439,7 +1442,9 @@ out:
                 if (PageWriteback(page)) {
                         end_page_writeback(page);
                 }
                 if (PageWriteback(page)) {
                         end_page_writeback(page);
                 }
-                ll_redirty_page(page);
+                /* resend the page only if its IO has not been started */
+                if (!PageError(page))
+                        ll_redirty_page(page);
                 unlock_page(page);
         }
         RETURN(rc);
                 unlock_page(page);
         }
         RETURN(rc);
index eabdc3f..3c40725 100644 (file)
@@ -303,6 +303,32 @@ static int osc_wr_checksum(struct file *file, const char *buffer,
         return count;
 }
 
         return count;
 }
 
+static int osc_rd_resend_count(char *page, char **start, off_t off, int count,
+                               int *eof, void *data)
+{
+        struct obd_device *obd = data;
+
+        return snprintf(page, count, "%u\n", atomic_read(&obd->u.cli.cl_resends)); 
+}
+
+static int osc_wr_resend_count(struct file *file, const char *buffer,
+                               unsigned long count, void *data)
+{
+        struct obd_device *obd = data;
+        int val, rc;
+
+        rc = lprocfs_write_helper(buffer, count, &val);
+        if (rc)
+                return rc;
+
+        if (val < 0)
+               return -EINVAL;
+
+        atomic_set(&obd->u.cli.cl_resends, val);
+
+        return count;
+}
+
 static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "uuid",            lprocfs_rd_uuid,        0, 0 },
         { "ping",            0, lprocfs_wr_ping,        0 },
 static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "uuid",            lprocfs_rd_uuid,        0, 0 },
         { "ping",            0, lprocfs_wr_ping,        0 },
@@ -330,6 +356,7 @@ static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "prealloc_last_id", osc_rd_prealloc_last_id, 0, 0 },
         { "checksums",       osc_rd_checksum, osc_wr_checksum, 0 },
         { "sptlrpc",         sptlrpc_lprocfs_rd, 0, 0 },
         { "prealloc_last_id", osc_rd_prealloc_last_id, 0, 0 },
         { "checksums",       osc_rd_checksum, osc_wr_checksum, 0 },
         { "sptlrpc",         sptlrpc_lprocfs_rd, 0, 0 },
+        { "resend_count",    osc_rd_resend_count, osc_wr_resend_count, 0},
         { 0 }
 };
 
         { 0 }
 };
 
@@ -464,3 +491,4 @@ int lproc_osc_attach_seqstat(struct obd_device *dev)
 
 LPROCFS_INIT_VARS(osc, lprocfs_module_vars, lprocfs_obd_vars)
 #endif /* LPROCFS */
 
 LPROCFS_INIT_VARS(osc, lprocfs_module_vars, lprocfs_obd_vars)
 #endif /* LPROCFS */
+
index e3e6013..72ce3ec 100644 (file)
@@ -73,4 +73,17 @@ static inline int lproc_osc_attach_seqstat(struct obd_device *dev) {return 0;}
         ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
 #endif
 
         ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
 #endif
 
+static inline int osc_recoverable_error(int rc)
+{
+        return (rc == -EIO || rc == -EROFS || rc == -ENOMEM || rc == -EAGAIN);
+}
+
+/* return 1 if osc should resend the request */
+static inline int osc_should_resend(int resend, struct client_obd *cli)
+{
+        return atomic_read(&cli->cl_resends) ? 
+                atomic_read(&cli->cl_resends) > resend : 1; 
+}
+
+
 #endif /* OSC_INTERNAL_H */
 #endif /* OSC_INTERNAL_H */
index 11f0a2b..81c3ced 100644 (file)
@@ -63,6 +63,9 @@ extern quota_interface_t osc_quota_interface;
 
 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
 
 
 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
 
+/* by default 10s */
+atomic_t osc_resend_time; 
+
 /* Pack OSC object metadata for disk storage (LE byte order). */
 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                       struct lov_stripe_md *lsm)
 /* Pack OSC object metadata for disk storage (LE byte order). */
 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                       struct lov_stripe_md *lsm)
@@ -949,6 +952,9 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         struct osc_brw_async_args *aa;
 
         ENTRY;
         struct osc_brw_async_args *aa;
 
         ENTRY;
+        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
+        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
+
         if ((cmd & OBD_BRW_WRITE) != 0) {
                 opc = OST_WRITE;
                 pool = cli->cl_import->imp_rq_pool;
         if ((cmd & OBD_BRW_WRITE) != 0) {
                 opc = OST_WRITE;
                 pool = cli->cl_import->imp_rq_pool;
@@ -967,7 +973,6 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         if (ocapa)
                 size[REQ_REC_OFF + 3] = sizeof(*capa);
 
         if (ocapa)
                 size[REQ_REC_OFF + 3] = sizeof(*capa);
 
-        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
                                    size, NULL, pool, NULL);
         if (req == NULL)
         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
                                    size, NULL, pool, NULL);
         if (req == NULL)
@@ -1076,7 +1081,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         aa->aa_requested_nob = requested_nob;
         aa->aa_nio_count = niocount;
         aa->aa_page_count = page_count;
         aa->aa_requested_nob = requested_nob;
         aa->aa_nio_count = niocount;
         aa->aa_page_count = page_count;
-        aa->aa_retries = 5;     /*retry for checksum errors; lprocfs? */
+        aa->aa_resends = 0;
         aa->aa_ppga = pga;
         aa->aa_cli = cli;
         INIT_LIST_HEAD(&aa->aa_oaps);
         aa->aa_ppga = pga;
         aa->aa_cli = cli;
         INIT_LIST_HEAD(&aa->aa_oaps);
@@ -1283,9 +1288,15 @@ static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                             struct obd_capa *ocapa)
 {
         struct ptlrpc_request *req;
                             struct obd_capa *ocapa)
 {
         struct ptlrpc_request *req;
-        int                    rc, retries = 5; /* lprocfs? */
+        int                    rc;
+        cfs_waitq_t            waitq;
+        int                    resends = 0;
+        struct l_wait_info     lwi;
+
         ENTRY;
 
         ENTRY;
 
+        cfs_waitq_init(&waitq);
+
 restart_bulk:
         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                   page_count, pga, &req, ocapa);
 restart_bulk:
         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                   page_count, pga, &req, ocapa);
@@ -1303,59 +1314,76 @@ restart_bulk:
         rc = osc_brw_fini_request(req, rc);
 
         ptlrpc_req_finished(req);
         rc = osc_brw_fini_request(req, rc);
 
         ptlrpc_req_finished(req);
-        if (rc == -EAGAIN) {
-                if (retries-- > 0)
-                        goto restart_bulk;
-                rc = -EIO;
+        if (osc_recoverable_error(rc)) {
+                resends++;
+                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
+                        CERROR("too many resend retries, returning error\n");
+                        RETURN(-EIO);
+                }
+
+                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
+                l_wait_event(waitq, 0, &lwi);
+
+                goto restart_bulk;
         }
         }
+        
         RETURN (rc);
 }
 
         RETURN (rc);
 }
 
-int osc_brw_redo_request(struct ptlrpc_request *req,
+int osc_brw_redo_request(struct ptlrpc_request *request,
                          struct osc_brw_async_args *aa)
 {
         struct ptlrpc_request *new_req;
                          struct osc_brw_async_args *aa)
 {
         struct ptlrpc_request *new_req;
-        struct ptlrpc_request_set *set = req->rq_set;
+        struct ptlrpc_request_set *set = request->rq_set;
         struct osc_brw_async_args *new_aa;
         struct osc_async_page *oap;
         int rc = 0;
         ENTRY;
 
         struct osc_brw_async_args *new_aa;
         struct osc_async_page *oap;
         int rc = 0;
         ENTRY;
 
-        if (aa->aa_retries-- <= 0) {
-                CERROR("too many checksum retries, returning error\n");
+        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
+                CERROR("too many resend retries, returning error\n");
                 RETURN(-EIO);
         }
                 RETURN(-EIO);
         }
+        
+        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
+/*
+        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
+                ocapa = lustre_unpack_capa(request->rq_reqmsg,
+                                           REQ_REC_OFF + 3);
+*/
+        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
+                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
+                                  aa->aa_cli, aa->aa_oa,
+                                  NULL /* lsm unused by osc currently */,
+                                  aa->aa_page_count, aa->aa_ppga, 
+                                  &new_req, NULL /* ocapa */);
+        if (rc)
+                RETURN(rc);
 
 
-        DEBUG_REQ(D_ERROR, req, "redo for checksum error");
+        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
+   
         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                 if (oap->oap_request != NULL) {
         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                 if (oap->oap_request != NULL) {
-                        LASSERTF(req == oap->oap_request,
+                        LASSERTF(request == oap->oap_request,
                                  "request %p != oap_request %p\n",
                                  "request %p != oap_request %p\n",
-                                 req, oap->oap_request);
+                                 request, oap->oap_request);
                         if (oap->oap_interrupted) {
                         if (oap->oap_interrupted) {
-                                ptlrpc_mark_interrupted(oap->oap_request);
-                                rc = -EINTR;
-                                break;
+                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
+                                ptlrpc_req_finished(new_req);                        
+                                RETURN(-EINTR);
                         }
                 }
         }
                         }
                 }
         }
-        if (rc)
-                RETURN(rc);
-        /* TODO-MERGE: and where to get ocapa?? */
-        rc = osc_brw_prep_request(lustre_msg_get_opc(req->rq_reqmsg) ==
-                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
-                                  aa->aa_cli, aa->aa_oa,
-                                  NULL /* lsm unused by osc currently */,
-                                  aa->aa_page_count, aa->aa_ppga, &new_req,
-                                  NULL /* ocapa */);
-        if (rc)
-                RETURN(rc);
-
         /* New request takes over pga and oaps from old request.
          * Note that copying a list_head doesn't work, need to move it... */
         /* New request takes over pga and oaps from old request.
          * Note that copying a list_head doesn't work, need to move it... */
-        new_req->rq_interpret_reply = req->rq_interpret_reply;
-        new_req->rq_async_args = req->rq_async_args;
+        aa->aa_resends++;
+        new_req->rq_interpret_reply = request->rq_interpret_reply;
+        new_req->rq_async_args = request->rq_async_args;
+        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
+
         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
+
         INIT_LIST_HEAD(&new_aa->aa_oaps);
         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
         INIT_LIST_HEAD(&aa->aa_oaps);
         INIT_LIST_HEAD(&new_aa->aa_oaps);
         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
         INIT_LIST_HEAD(&aa->aa_oaps);
@@ -1366,6 +1394,9 @@ int osc_brw_redo_request(struct ptlrpc_request *req,
                         oap->oap_request = ptlrpc_request_addref(new_req);
                 }
         }
                         oap->oap_request = ptlrpc_request_addref(new_req);
                 }
         }
+        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
+
+        DEBUG_REQ(D_INFO, new_req, "new request");
 
         ptlrpc_set_add_req(set, new_req);
 
 
         ptlrpc_set_add_req(set, new_req);
 
@@ -1380,7 +1411,7 @@ static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
         ENTRY;
 
         rc = osc_brw_fini_request(req, rc);
         ENTRY;
 
         rc = osc_brw_fini_request(req, rc);
-        if (rc == -EAGAIN) {
+        if (osc_recoverable_error(rc)) {
                 rc = osc_brw_redo_request(req, aa);
                 if (rc == 0)
                         RETURN(0);
                 rc = osc_brw_redo_request(req, aa);
                 if (rc == 0)
                         RETURN(0);
@@ -1795,7 +1826,7 @@ unlock:
  * the app does an fsync.  As long as errors persist we force future rpcs to be
  * sync so that the app can get a sync error and break the cycle of queueing
  * pages for which writeback will fail. */
  * the app does an fsync.  As long as errors persist we force future rpcs to be
  * sync so that the app can get a sync error and break the cycle of queueing
  * pages for which writeback will fail. */
-static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
+static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
                            int rc)
 {
         if (rc) {
                            int rc)
 {
         if (rc) {
@@ -1808,7 +1839,7 @@ static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
 
         }
 
 
         }
 
-        if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
+        if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
                 ar->ar_force_sync = 0;
 }
 
                 ar->ar_force_sync = 0;
 }
 
@@ -1832,18 +1863,21 @@ static void osc_oap_to_pending(struct osc_async_page *oap)
 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                               struct osc_async_page *oap, int sent, int rc)
 {
 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                               struct osc_async_page *oap, int sent, int rc)
 {
+        __u64 xid = 0;
+
         ENTRY;
         ENTRY;
+        if (oap->oap_request != NULL) {
+                xid = ptlrpc_req_xid(oap->oap_request);
+                ptlrpc_req_finished(oap->oap_request);
+                oap->oap_request = NULL;
+        }
+
         oap->oap_async_flags = 0;
         oap->oap_interrupted = 0;
 
         if (oap->oap_cmd & OBD_BRW_WRITE) {
         oap->oap_async_flags = 0;
         oap->oap_interrupted = 0;
 
         if (oap->oap_cmd & OBD_BRW_WRITE) {
-                osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
-                osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
-        }
-
-        if (oap->oap_request != NULL) {
-                ptlrpc_req_finished(oap->oap_request);
-                oap->oap_request = NULL;
+                osc_process_ar(&cli->cl_ar, xid, rc);
+                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
         }
 
         if (rc == 0 && oa != NULL) {
         }
 
         if (rc == 0 && oa != NULL) {
@@ -1888,11 +1922,10 @@ static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
 
         rc = osc_brw_fini_request(req, rc);
         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
 
         rc = osc_brw_fini_request(req, rc);
         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
-        if (rc == -EAGAIN) {
+        if (osc_recoverable_error(rc)) {
                 rc = osc_brw_redo_request(req, aa);
                 if (rc == 0)
                         RETURN(0);
                 rc = osc_brw_redo_request(req, aa);
                 if (rc == 0)
                         RETURN(0);
-                GOTO(out, rc);
         }
 
         cli = aa->aa_cli;
         }
 
         cli = aa->aa_cli;
@@ -1920,8 +1953,7 @@ static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
         client_obd_list_unlock(&cli->cl_loi_list_lock);
 
         OBDO_FREE(aa->aa_oa);
         client_obd_list_unlock(&cli->cl_loi_list_lock);
 
         OBDO_FREE(aa->aa_oa);
-        rc = 0;
-out:
+        
         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
         RETURN(rc);
 }
         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
         RETURN(rc);
 }
@@ -3776,7 +3808,6 @@ struct obd_ops osc_obd_ops = {
         .o_llog_finish          = osc_llog_finish,
         .o_process_config       = osc_process_config,
 };
         .o_llog_finish          = osc_llog_finish,
         .o_process_config       = osc_process_config,
 };
-
 int __init osc_init(void)
 {
         struct lprocfs_static_vars lvars;
 int __init osc_init(void)
 {
         struct lprocfs_static_vars lvars;
index 9b1526a..cf051b4 100644 (file)
@@ -679,8 +679,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
                 GOTO(out, rc = -EIO);
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
                 GOTO(out, rc = -EIO);
 
-        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
-                         (obd_timeout + 1) / 4);
+        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
 
         /* Check if there is eviction in progress, and if so, wait for it to
          * finish */
 
         /* Check if there is eviction in progress, and if so, wait for it to
          * finish */
@@ -936,10 +935,11 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
                 GOTO(out, rc = -EIO);
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
                 GOTO(out, rc = -EIO);
+        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
+                GOTO(out, rc = -EFAULT);
 
         /* pause before transaction has been started */
 
         /* pause before transaction has been started */
-        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
-                         (obd_timeout + 1) / 4);
+        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
 
         /* Check if there is eviction in progress, and if so, wait for it to
          * finish */
 
         /* Check if there is eviction in progress, and if so, wait for it to
          * finish */
index 8547338..18ca9d2 100644 (file)
@@ -680,10 +680,10 @@ static int after_reply(struct ptlrpc_request *req)
                 /* Either we've been evicted, or the server has failed for
                  * some reason. Try to reconnect, and if that fails, punt to
                  * the upcall. */
                 /* Either we've been evicted, or the server has failed for
                  * some reason. Try to reconnect, and if that fails, punt to
                  * the upcall. */
-                if (rc == -ENOTCONN || rc == -ENODEV) {
+                if (ll_rpc_recoverable_error(rc)) {
                         if (req->rq_send_state != LUSTRE_IMP_FULL ||
                             imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
                         if (req->rq_send_state != LUSTRE_IMP_FULL ||
                             imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
-                                RETURN(-ENOTCONN);
+                                RETURN(rc);
                         }
                         ptlrpc_request_handle_notconn(req);
                         RETURN(rc);
                         }
                         ptlrpc_request_handle_notconn(req);
                         RETURN(rc);
@@ -737,6 +737,9 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
         ENTRY;
 
         LASSERT(req->rq_phase == RQ_PHASE_NEW);
         ENTRY;
 
         LASSERT(req->rq_phase == RQ_PHASE_NEW);
+        if (req->rq_sent && (req->rq_sent > CURRENT_SECONDS))
+                RETURN (0);
+        
         req->rq_phase = RQ_PHASE_RPC;
 
         imp = req->rq_import;
         req->rq_phase = RQ_PHASE_RPC;
 
         imp = req->rq_import;
@@ -824,6 +827,9 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                     ptlrpc_send_new_req(req)) {
                         force_timer_recalc = 1;
                 }
                     ptlrpc_send_new_req(req)) {
                         force_timer_recalc = 1;
                 }
+                /* delayed send - skip */
+                if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
+                        continue;
 
                 if (!(req->rq_phase == RQ_PHASE_RPC ||
                       req->rq_phase == RQ_PHASE_BULK ||
 
                 if (!(req->rq_phase == RQ_PHASE_RPC ||
                       req->rq_phase == RQ_PHASE_BULK ||
@@ -1180,13 +1186,18 @@ int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
 
                 /* request in-flight? */
                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
 
                 /* request in-flight? */
                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
-                      (req->rq_phase == RQ_PHASE_BULK)))
+                      (req->rq_phase == RQ_PHASE_BULK) ||
+                      (req->rq_phase == RQ_PHASE_NEW)))
                         continue;
 
                 if (req->rq_timedout)   /* already timed out */
                         continue;
 
                         continue;
 
                 if (req->rq_timedout)   /* already timed out */
                         continue;
 
-                deadline = req->rq_sent + req->rq_timeout;
+                if (req->rq_phase == RQ_PHASE_NEW)
+                        deadline = req->rq_sent;
+                else
+                        deadline = req->rq_sent + req->rq_timeout;
+
                 if (deadline <= now)    /* actually expired already */
                         timeout = 1;    /* ASAP */
                 else if (timeout == 0 || timeout > deadline - now)
                 if (deadline <= now)    /* actually expired already */
                         timeout = 1;    /* ASAP */
                 else if (timeout == 0 || timeout > deadline - now)
index cc1bf80..efa11e5 100644 (file)
@@ -157,7 +157,7 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req);
 
 /* pers.c */
 void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc);
 
 /* pers.c */
 void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc);
-void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, cfs_page_t *page, 
+void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, cfs_page_t *page,
                           int pageoffset, int len);
 void ptl_rpc_wipe_bulk_pages(struct ptlrpc_bulk_desc *desc);
 
                           int pageoffset, int len);
 void ptl_rpc_wipe_bulk_pages(struct ptlrpc_bulk_desc *desc);
 
@@ -204,4 +204,8 @@ void sptlrpc_gc_stop_thread(void);
 int  __init sptlrpc_init(void);
 void __exit sptlrpc_fini(void);
 
 int  __init sptlrpc_init(void);
 void __exit sptlrpc_fini(void);
 
+static inline int ll_rpc_recoverable_error(int rc)
+{ 
+        return (rc == -ENOTCONN || rc == -ENODEV);
+}
 #endif /* PTLRPC_INTERNAL_H */
 #endif /* PTLRPC_INTERNAL_H */
index 626e23e..013de80 100644 (file)
@@ -3986,15 +3986,306 @@ test_117() # bug 10891
 }
 run_test 117 "verify fsfilt_extend =========="
 
 }
 run_test 117 "verify fsfilt_extend =========="
 
-test_118() #bug 11710
+# Reset async IO behavior after error case
+reset_async() {
+       FILE=$DIR/reset_async
+
+       # Ensure all OSCs are cleared
+       $LSTRIPE $FILE 0 -1 -1
+        dd if=/dev/zero of=$FILE bs=64k count=$OSTCOUNT
+       sync
+        rm $FILE
+}
+
+test_118a() #bug 11710
 {
 {
-       sync; sleep 1; sync
-       multiop $DIR/$tfile oO_CREAT:O_RDWR:O_SYNC:w4096c;
-       dirty=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+       reset_async
+       
+       multiop $DIR/$tfile oO_CREAT:O_RDWR:O_SYNC:w4096c
+       DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+               return 1;
+        fi
+}
+run_test 118a "verify O_SYNC works =========="
+
+test_118b()
+{
+       reset_async
+
+       #define OBD_FAIL_OST_ENOENT 0x217
+       sysctl -w lustre.fail_loc=0x217
+       multiop $DIR/$tfile oO_CREAT:O_RDWR:O_SYNC:w4096c
+       RC=$?
+       sysctl -w lustre.fail_loc=0
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+
+       if [[ $RC -eq 0 ]]; then
+               error "Must return error due to dropped pages, rc=$RC"
+               return 1;
+       fi
+
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+               return 1;
+       fi
+
+       echo "Dirty pages not leaked on ENOENT"
+
+       # Due to the above error the OSC will issue all RPCs synchronously
+       # until a subsequent RPC completes successfully without error.
+       multiop $DIR/$tfile Ow4096yc
+       rm -f $DIR/$tfile
+       
+       return 0
+}
+run_test 118b "Reclaim dirty pages on fatal error =========="
+
+test_118c()
+{
+       reset_async
+
+       #define OBD_FAIL_OST_EROFS               0x216
+       sysctl -w lustre.fail_loc=0x216
+
+       # multiop should block due to fsync until pages are written
+       multiop $DIR/$tfile oO_CREAT:O_RDWR:O_SYNC:w4096c &
+       MULTIPID=$!
+       sleep 1
+
+       if [[ `ps h -o comm -p $MULTIPID` != "multiop" ]]; then
+               error "Multiop failed to block on fsync, pid=$MULTIPID"
+       fi
+
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $WRITEBACK -eq 0 ]]; then
+               error "No page in writeback, writeback=$WRITEBACK"
+       fi
+
+       sysctl -w lustre.fail_loc=0
+        wait $MULTIPID
+       RC=$?
+       if [[ $RC -ne 0 ]]; then
+               error "Multiop fsync failed, rc=$RC"
+       fi
+
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+       fi
        
        
-       return $dirty
+       rm -f $DIR/$tfile
+       echo "Dirty pages flushed via fsync on EROFS"
+       return 0
+}
+run_test 118c "Fsync blocks on EROFS until dirty pages are flushed =========="
+
+test_118d()
+{
+       reset_async
+
+       #define OBD_FAIL_OST_BRW_PAUSE_BULK 0x214
+       sysctl -w lustre.fail_loc=0x214
+       # multiop should block due to fsync until pages are written
+       multiop $DIR/$tfile oO_CREAT:O_RDWR:O_SYNC:w4096c &     
+       MULTIPID=$!
+       sleep 1
+
+       if [[ `ps h -o comm -p $MULTIPID` != "multiop" ]]; then
+               error "Multiop failed to block on fsync, pid=$MULTIPID"
+       fi
+
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $WRITEBACK -eq 0 ]]; then
+               error "No page in writeback, writeback=$WRITEBACK"
+       fi
+
+        wait $MULTIPID || error "Multiop fsync failed, rc=$?"
+
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)  
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+       fi
+
+       rm -f $DIR/$tfile
+       echo "Dirty pages gaurenteed flushed via fsync"
+       return 0
+}
+run_test 118d "Fsync validation inject a delay of the bulk =========="
+
+test_118f() {
+        reset_async
+
+        #define OBD_FAIL_OSC_BRW_PREP_REQ2        0x40a
+        sysctl -w lustre.fail_loc=0x8000040a
+
+       # Should simulate EINVAL error which is fatal
+        multiop $DIR/$tfile oO_CREAT:O_RDWR:O_SYNC:w4096c
+        RC=$?
+
+       if [[ $RC -eq 0 ]]; then
+               error "Must return error due to dropped pages, rc=$RC"
+       fi
+
+        LOCKED=$(grep -c locked $LPROC/llite/*/dump_page_cache)
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $LOCKED -ne 0 ]]; then
+               error "Locked pages remain in cache, locked=$LOCKED"
+       fi
+
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+       fi
+
+       rm -f $DIR/$tfile
+       echo "No pages locked after fsync"
+
+        reset_async
+       return 0
+}
+run_test 118f "Simulate unrecoverable OSC side error =========="
+
+test_118g() {
+        reset_async
+
+       #define OBD_FAIL_OSC_BRW_PREP_REQ        0x406
+        sysctl -w lustre.fail_loc=0x406
+
+       # simulate local -ENOMEM
+        multiop $DIR/$tfile oO_CREAT:O_RDWR:O_SYNC:w4096c
+        RC=$?
+       
+        sysctl -w lustre.fail_loc=0
+       if [[ $RC -eq 0 ]]; then
+               error "Must return error due to dropped pages, rc=$RC"
+       fi
+
+        LOCKED=$(grep -c locked $LPROC/llite/*/dump_page_cache)
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $LOCKED -ne 0 ]]; then
+               error "Locked pages remain in cache, locked=$LOCKED"
+       fi
+       
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+       fi
+
+       rm -f $DIR/$tfile
+       echo "No pages locked after fsync"
+
+        reset_async
+       return 0
+}
+run_test 118g "Don't stay in wait if we got local -ENOMEM  =========="
+
+test_118h() {
+        reset_async
+
+       #define OBD_FAIL_OST_BRW_WRITE_BULK      0x20e
+        sysctl -w lustre.fail_loc=0x20e
+       # Should simulate an EIO error, which is recoverable and should be handled by timeout
+        multiop $DIR/$tfile oO_CREAT:O_RDWR:O_SYNC:w4096c
+        RC=$?
+       
+        sysctl -w lustre.fail_loc=0
+       if [[ $RC -eq 0 ]]; then
+               error "Must return error due to dropped pages, rc=$RC"
+       fi
+
+        LOCKED=$(grep -c locked $LPROC/llite/*/dump_page_cache)
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $LOCKED -ne 0 ]]; then
+               error "Locked pages remain in cache, locked=$LOCKED"
+       fi
+       
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+       fi
+
+       rm -f $DIR/$tfile
+       echo "No pages locked after fsync"
+
+       return 0
+}
+run_test 118h "Verify timeout in handling recoverables errors  =========="
+
+test_118i() {
+        reset_async
+
+       #define OBD_FAIL_OST_BRW_WRITE_BULK      0x20e
+        sysctl -w lustre.fail_loc=0x20e
+       
+       # Should simulate an EIO error, which is recoverable and should be handled by timeout
+        multiop $DIR/$tfile oO_CREAT:O_RDWR:O_SYNC:w4096c &
+       PID=$!
+       sleep 5
+       sysctl -w lustre.fail_loc=0
+       
+       wait $PID
+        RC=$?
+       if [[ $RC -ne 0 ]]; then
+               error "got error, but should be not, rc=$RC"
+       fi
+
+        LOCKED=$(grep -c locked $LPROC/llite/*/dump_page_cache)
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $LOCKED -ne 0 ]]; then
+               error "Locked pages remain in cache, locked=$LOCKED"
+       fi
+       
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+       fi
+
+       rm -f $DIR/$tfile
+       echo "No pages locked after fsync"
+
+       return 0
+}
+run_test 118i "Fix error before timeout in recoverable error  =========="
+
+test_118j() {
+        reset_async
+
+       #define OBD_FAIL_OST_BRW_WRITE_BULK2     0x220
+        sysctl -w lustre.fail_loc=0x220
+
+       # return -EFAULT from OST
+        multiop $DIR/$tfile oO_CREAT:O_RDWR:O_SYNC:w4096c
+        RC=$?
+        sysctl -w lustre.fail_loc=0x0
+       if [[ $RC -eq 0 ]]; then
+               error "Must return error due to dropped pages, rc=$RC"
+       fi
+
+        LOCKED=$(grep -c locked $LPROC/llite/*/dump_page_cache)
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $LOCKED -ne 0 ]]; then
+               error "Locked pages remain in cache, locked=$LOCKED"
+       fi
+       
+       # in recoverable error on OST we want resend and stay until it finished
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+       fi
+
+       rm -f $DIR/$tfile
+       echo "No pages locked after fsync"
+
+       return 0
 }
 }
-run_test 118 "verify O_SYNC work"
+run_test 118j "Simulate unrecoverable OST side error =========="
 
 test_119a() # bug 11737
 {
 
 test_119a() # bug 11737
 {