Whamcloud - gitweb
improve handling recoverable errors
authorshadow <shadow>
Fri, 22 Jun 2007 15:38:30 +0000 (15:38 +0000)
committershadow <shadow>
Fri, 22 Jun 2007 15:38:30 +0000 (15:38 +0000)
b=11710
i=green
i=wangdi

15 files changed:
lustre/ChangeLog
lustre/include/liblustre.h
lustre/include/linux/lustre_compat25.h
lustre/include/obd_ost.h
lustre/include/obd_support.h
lustre/llite/file.c
lustre/llite/lproc_llite.c
lustre/llite/rw.c
lustre/osc/lproc_osc.c
lustre/osc/osc_internal.h
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/tests/sanity.sh

index bf5fbc0..e9979a2 100644 (file)
@@ -378,6 +378,14 @@ Details    : improve checks for exported symbols. This allow run check without
              sources, but with Module.symvers shipped with kernel distribution.
              add check for truncate_complete_page used by patchless client.
 
+Severity   : major
+Bugzilla   : 11710
+Frequency  : always
+Description: improve handling recoverable errors
+Details    : if request processig with error which can be recoverable on server side
+            request should be resend, otherwise page released from cache and marked 
+            as error.
+
 --------------------------------------------------------------------------------
 
 2007-05-03  Cluster File Systems, Inc. <info@clusterfs.com>
index 1bcb9ab..efabc0d 100644 (file)
@@ -701,8 +701,12 @@ static inline void del_timer(struct timer_list *l)
         free(l);
 }
 
+
+
 typedef struct { volatile int counter; } atomic_t;
 
+#define ATOMIC_INIT(i)  { (i) }
+
 #define atomic_read(a) ((a)->counter)
 #define atomic_set(a,b) do {(a)->counter = b; } while (0)
 #define atomic_dec_and_test(a) ((--((a)->counter)) == 0)
@@ -711,7 +715,6 @@ typedef struct { volatile int counter; } atomic_t;
 #define atomic_dec(a)  do { (a)->counter--; } while (0)
 #define atomic_add(b,a)  do {(a)->counter += b;} while (0)
 #define atomic_sub(b,a)  do {(a)->counter -= b;} while (0)
-#define ATOMIC_INIT(i) { i }
 
 #ifndef likely
 #define likely(exp) (exp)
index 02248ac..6702e75 100644 (file)
@@ -359,6 +359,7 @@ static inline void cond_resched(void)
 #define CheckWriteback(page, cmd) 1
 #define set_page_writeback(page) do {} while (0)
 #define end_page_writeback(page) do {} while (0)
+#define wait_on_page_writeback(page do {} while (0)
 
 static inline int mapping_mapped(struct address_space *mapping)
 {
index d9accb5..ce21612 100644 (file)
@@ -18,7 +18,7 @@ struct osc_brw_async_args {
         int              aa_requested_nob;
         int              aa_nio_count;
         obd_count        aa_page_count;
-        int              aa_retries;
+        cfs_time_t       aa_start_send;
         struct brw_page **aa_ppga;
         struct client_obd *aa_cli;
         struct list_head aa_oaps;
index f4a3007..c70d881 100644 (file)
@@ -134,6 +134,7 @@ extern int obd_race_state;
 #define OBD_FAIL_OST_DROP_REQ            0x21d
 #define OBD_FAIL_OST_SETATTR_CREDITS     0x21e
 #define OBD_FAIL_OST_HOLD_WRITE_RPC      0x21f
+#define OBD_FAIL_OST_BRW_WRITE_BULK2     0x220
 
 #define OBD_FAIL_LDLM                    0x300
 #define OBD_FAIL_LDLM_NAMESPACE_NEW      0x301
@@ -164,6 +165,7 @@ extern int obd_race_state;
 #define OBD_FAIL_OSC_SHUTDOWN            0x407
 #define OBD_FAIL_OSC_CHECKSUM_RECEIVE    0x408
 #define OBD_FAIL_OSC_CHECKSUM_SEND       0x409
+#define OBD_FAIL_OSC_BRW_PREP_REQ2       0x40a
 
 #define OBD_FAIL_PTLRPC                  0x500
 #define OBD_FAIL_PTLRPC_ACK              0x501
index 76b5753..0e67513 100644 (file)
@@ -638,9 +638,10 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
         struct page *page;
         int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
         struct lustre_handle lockh;
-        ENTRY;
+        struct address_space *mapping = inode->i_mapping;
 
-        memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
+        ENTRY;
+        tmpex = lock->l_policy_data;
         CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
                inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
                inode->i_size);
@@ -683,8 +684,8 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
         for (i = start; i <= end; i += (j + skip)) {
                 j = min(count - (i % count), end - i + 1);
                 LASSERT(j > 0);
-                LASSERT(inode->i_mapping);
-                if (ll_teardown_mmaps(inode->i_mapping,
+                LASSERT(mapping);
+                if (ll_teardown_mmaps(mapping,
                                       (__u64)i << CFS_PAGE_SHIFT,
                                       ((__u64)(i+j) << CFS_PAGE_SHIFT) - 1) )
                         break;
@@ -709,14 +710,14 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
                          tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
                          start, i, end);
 
-                if (!mapping_has_pages(inode->i_mapping)) {
+                if (!mapping_has_pages(mapping)) {
                         CDEBUG(D_INODE|D_PAGE, "nothing left\n");
                         break;
                 }
 
                 cond_resched();
 
-                page = find_get_page(inode->i_mapping, i);
+                page = find_get_page(mapping, i);
                 if (page == NULL)
                         continue;
                 LL_CDEBUG_PAGE(D_PAGE, page, "lock page idx %lu ext "LPU64"\n",
@@ -726,12 +727,22 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
                 /* page->mapping to check with racing against teardown */
                 if (!discard && clear_page_dirty_for_io(page)) {
                         rc = ll_call_writepage(inode, page);
-                        if (rc != 0)
-                                CERROR("writepage of page %p failed: %d\n",
-                                       page, rc);
                         /* either waiting for io to complete or reacquiring
                          * the lock that the failed writepage released */
                         lock_page(page);
+                        wait_on_page_writeback(page);
+                        if (rc != 0) {
+                                CERROR("writepage of page %p failed: %d\n",
+                                       page, rc);
+#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
+                                if (rc == -ENOSPC)
+                                        set_bit(AS_ENOSPC, &mapping->flags);
+                                else
+                                        set_bit(AS_EIO, &mapping->flags);
+#else
+                                mapping->gfp_mask |= AS_EIO_MASK;
+#endif
+                        }
                 }
 
                 tmpex.l_extent.end = tmpex.l_extent.start + CFS_PAGE_SIZE - 1;
@@ -746,7 +757,7 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
                         // checking again to account for writeback's lock_page()
                         LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
                         if (llap)
-                                ll_ra_accounting(llap, inode->i_mapping);
+                                ll_ra_accounting(llap, mapping);
                         ll_truncate_complete_page(page);
                 }
                 unlock_page(page);
index 53591ec..dd8415a 100644 (file)
@@ -717,7 +717,7 @@ static int llite_dump_pgcache_seq_show(struct seq_file *seq, void *v)
         /* 2.4 doesn't seem to have SEQ_START_TOKEN, so we implement
          * it in our own state */
         if (dummy_llap->llap_magic == 0) {
-                seq_printf(seq, "gener |  llap  cookie  origin wq du | page "
+                seq_printf(seq, "gener |  llap  cookie  origin wq du wb | page "
                                 "inode index count [ page flags ]\n");
                 return 0;
         }
@@ -732,12 +732,13 @@ static int llite_dump_pgcache_seq_show(struct seq_file *seq, void *v)
                 LASSERTF(llap->llap_origin < LLAP__ORIGIN_MAX, "%u\n",
                          llap->llap_origin);
 
-                seq_printf(seq, "%5lu | %p %p %s %s %s | %p %p %lu %u [",
+                seq_printf(seq, "%5lu | %p %p %s %s %s %s | %p %p %lu %u [",
                            sbi->ll_pglist_gen,
                            llap, llap->llap_cookie,
                            llap_origins[llap->llap_origin],
                            llap->llap_write_queued ? "wq" : "- ",
                            llap->llap_defer_uptodate ? "du" : "- ",
+                           PageWriteback(page) ? "wb" : "-",
                            page, page->mapping->host, page->index,
                            page_count(page));
                 seq_page_flag(seq, page, locked, has_flags);
@@ -748,9 +749,10 @@ static int llite_dump_pgcache_seq_show(struct seq_file *seq, void *v)
 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,12))
                 seq_page_flag(seq, page, highmem, has_flags);
 #endif
+                seq_page_flag(seq, page, writeback, has_flags);
                 if (!has_flags)
                         seq_puts(seq, "-]\n");
-                else 
+                else
                         seq_puts(seq, "]\n");
         }
 
index a749101..5ef5aaa 100644 (file)
@@ -492,18 +492,16 @@ int llap_shrink_cache(struct ll_sb_info *sbi, int shrink_fraction)
                         continue;
                 }
 
-                if (llap->llap_write_queued || PageDirty(page) ||
-                    (!PageUptodate(page) &&
-                     llap->llap_origin != LLAP_ORIGIN_READAHEAD))
-                        keep = 1;
-                else
-                        keep = 0;
-
-                LL_CDEBUG_PAGE(D_PAGE, page,"%s LRU page: %s%s%s%s origin %s\n",
+               keep = (llap->llap_write_queued || PageDirty(page) ||
+                      PageWriteback(page) || (!PageUptodate(page) &&
+                      llap->llap_origin != LLAP_ORIGIN_READAHEAD));
+
+                LL_CDEBUG_PAGE(D_PAGE, page,"%s LRU page: %s%s%s%s%s origin %s\n",
                                keep ? "keep" : "drop",
                                llap->llap_write_queued ? "wq " : "",
                                PageDirty(page) ? "pd " : "",
                                PageUptodate(page) ? "" : "!pu ",
+                               PageWriteback(page) ? "wb" : "",
                                llap->llap_defer_uptodate ? "" : "!du",
                                llap_origins[llap->llap_origin]);
 
@@ -859,9 +857,6 @@ int ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
         } else {
                 if (cmd & OBD_BRW_READ) {
                         llap->llap_defer_uptodate = 0;
-                } else {
-                        ll_redirty_page(page);
-                        ret = 1;
                 }
                 SetPageError(page);
         }
@@ -1388,7 +1383,9 @@ out:
                 if (PageWriteback(page)) {
                         end_page_writeback(page);
                 }
-                ll_redirty_page(page);
+                /* resend page only for not started IO*/
+                if (!PageError(page))
+                        ll_redirty_page(page);
                 unlock_page(page);
         }
         RETURN(rc);
index 4a0f183..57766e4 100644 (file)
@@ -303,6 +303,30 @@ static int osc_wr_checksum(struct file *file, const char *buffer,
         return count;
 }
 
+static int osc_rd_resendtime(char *page, char **start, off_t off, int count,
+                             int *eof, void *data)
+{
+        return snprintf(page, count, CFS_TIME_T, 
+                        cfs_duration_sec(atomic_read(&osc_resend_time)));
+}
+
+static int osc_wr_resendtime(struct file *file, const char *buffer,
+                             unsigned long count, void *data)
+{
+        int val, rc;
+
+        rc = lprocfs_write_helper(buffer, count, &val);
+        if (rc)
+                return rc;
+
+        if (val < 0)
+               return -EINVAL;
+
+        atomic_set(&osc_resend_time, cfs_time_seconds(val));
+
+        return count;
+}
+
 static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "uuid",            lprocfs_rd_uuid,        0, 0 },
         { "ping",            0, lprocfs_wr_ping,        0 },
@@ -334,6 +358,7 @@ static struct lprocfs_vars lprocfs_obd_vars[] = {
 
 static struct lprocfs_vars lprocfs_module_vars[] = {
         { "num_refs",        lprocfs_rd_numrefs,     0, 0 },
+        { "resend_timeout",  osc_rd_resendtime, osc_wr_resendtime, 0},
         { 0 }
 };
 
@@ -463,3 +488,4 @@ int lproc_osc_attach_seqstat(struct obd_device *dev)
 
 LPROCFS_INIT_VARS(osc, lprocfs_module_vars, lprocfs_obd_vars)
 #endif /* LPROCFS */
+
index 18148c8..7fd68ba 100644 (file)
@@ -72,4 +72,29 @@ static inline int lproc_osc_attach_seqstat(struct obd_device *dev) {return 0;}
         ({ type __x = (x); type __y = (y); __x < __y ? __x: __y; })
 #endif
 
+static inline int osc_recoverable_error(int rc)
+{
+        return (rc == -EIO || rc == -EROFS || rc == -ENOMEM || rc == -EAGAIN);
+}
+
+/* osc_requests.c */
+
+/* how long time request will be resend after got a recoverable error.
+ * time measured in seconds */
+extern atomic_t osc_resend_time;
+/*default timeout is 10s */
+#define OSC_DEFAULT_TIMEOUT 10
+
+static inline int osc_should_resend(cfs_time_t start)
+{
+        cfs_duration_t resend = atomic_read(&osc_resend_time);
+        int ret;
+
+        ret = resend != 0 && 
+              (cfs_time_after(cfs_time_add(start, resend),
+               cfs_time_current()));
+
+        return ret;
+}
+
 #endif /* OSC_INTERNAL_H */
index 65833f7..ceb0c69 100644 (file)
@@ -66,6 +66,9 @@ static void osc_release_ppga(struct brw_page **ppga, obd_count count);
 static quota_interface_t *quota_interface;
 extern quota_interface_t osc_quota_interface;
 
+/* by default 10s */
+atomic_t osc_resend_time; 
+
 /* Pack OSC object metadata for disk storage (LE byte order). */
 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                       struct lov_stripe_md *lsm)
@@ -917,6 +920,9 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         struct osc_brw_async_args *aa;
 
         ENTRY;
+        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
+        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
+
         opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
         pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
 
@@ -928,7 +934,6 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
 
-        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
                                    NULL, pool);
         if (req == NULL)
@@ -1031,7 +1036,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         aa->aa_requested_nob = requested_nob;
         aa->aa_nio_count = niocount;
         aa->aa_page_count = page_count;
-        aa->aa_retries = 5;     /*retry for checksum errors; lprocfs? */
+        aa->aa_start_send = cfs_time_current();
         aa->aa_ppga = pga;
         aa->aa_cli = cli;
         INIT_LIST_HEAD(&aa->aa_oaps);
@@ -1230,7 +1235,8 @@ static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
                             obd_count page_count, struct brw_page **pga)
 {
         struct ptlrpc_request *request;
-        int                    rc, retries = 5; /* lprocfs? */
+        int                    rc;
+        cfs_time_t             start_send = cfs_time_current();
         ENTRY;
 
 restart_bulk:
@@ -1250,10 +1256,12 @@ restart_bulk:
         rc = osc_brw_fini_request(request, rc);
 
         ptlrpc_req_finished(request);
-        if (rc == -EAGAIN) {
-                if (retries-- > 0)
-                        goto restart_bulk;
-                rc = -EIO;
+        if (osc_recoverable_error(rc)) {
+                if (!osc_should_resend(start_send)) {
+                        CERROR("too many resend retries, returning error\n");
+                        RETURN(-EIO);
+                }
+                goto restart_bulk;
         }
         RETURN(rc);
 }
@@ -1268,12 +1276,12 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
         int rc = 0;
         ENTRY;
 
-        if (aa->aa_retries-- <= 0) {
-                CERROR("too many checksum retries, returning error\n");
+        if (!osc_should_resend(aa->aa_start_send)) {
+                CERROR("too many resend retries, returning error\n");
                 RETURN(-EIO);
         }
 
-        DEBUG_REQ(D_ERROR, request, "redo for checksum error");
+        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                 if (oap->oap_request != NULL) {
                         LASSERTF(request == oap->oap_request,
@@ -1326,7 +1334,8 @@ static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
         ENTRY;
 
         rc = osc_brw_fini_request(request, rc);
-        if (rc == -EAGAIN) {
+        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);  
+        if (osc_recoverable_error(rc)) {
                 rc = osc_brw_redo_request(request, aa);
                 if (rc == 0)
                         RETURN(0);
@@ -1743,7 +1752,7 @@ unlock:
  * the app does an fsync.  As long as errors persist we force future rpcs to be
  * sync so that the app can get a sync error and break the cycle of queueing
  * pages for which writeback will fail. */
-static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
+static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
                            int rc)
 {
         if (rc) {
@@ -1756,7 +1765,7 @@ static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
 
         }
 
-        if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
+        if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
                 ar->ar_force_sync = 0;
 }
 
@@ -1780,18 +1789,21 @@ static void osc_oap_to_pending(struct osc_async_page *oap)
 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                               struct osc_async_page *oap, int sent, int rc)
 {
+        __u64 xid = 0;
+
         ENTRY;
+        if (oap->oap_request != NULL) {
+                xid = ptlrpc_req_xid(oap->oap_request);
+                ptlrpc_req_finished(oap->oap_request);
+                oap->oap_request = NULL;
+        }
+
         oap->oap_async_flags = 0;
         oap->oap_interrupted = 0;
 
         if (oap->oap_cmd & OBD_BRW_WRITE) {
-                osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
-                osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
-        }
-
-        if (oap->oap_request != NULL) {
-                ptlrpc_req_finished(oap->oap_request);
-                oap->oap_request = NULL;
+                osc_process_ar(&cli->cl_ar, xid, rc);
+                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
         }
 
         if (rc == 0 && oa != NULL) {
@@ -1836,11 +1848,11 @@ static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
 
         rc = osc_brw_fini_request(request, rc);
         CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
-        if (rc == -EAGAIN) {
+
+       if (osc_recoverable_error(rc)) {
                 rc = osc_brw_redo_request(request, aa);
                 if (rc == 0)
                         RETURN(0);
-                GOTO(out, rc);
         }
 
         cli = aa->aa_cli;
@@ -1869,8 +1881,6 @@ static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
 
         obdo_free(aa->aa_oa);
 
-        rc = 0;
-out:
         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
         RETURN(rc);
 }
@@ -3681,13 +3691,13 @@ struct obd_ops osc_obd_ops = {
         .o_llog_finish          = osc_llog_finish,
         .o_process_config       = osc_process_config,
 };
-
 int __init osc_init(void)
 {
         struct lprocfs_static_vars lvars;
         int rc;
         ENTRY;
 
+        atomic_set(&osc_resend_time, cfs_time_seconds(OSC_DEFAULT_TIMEOUT));
         lprocfs_init_vars(osc, &lvars);
 
         request_module("lquota");
index 9ad72bc..9d0d065 100644 (file)
@@ -880,6 +880,8 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
                 GOTO(out, rc = -EIO);
+        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
+                GOTO(out, rc = -EFAULT);
 
         /* pause before transaction has been started */
         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
index 80a6b50..af45f4d 100644 (file)
@@ -512,10 +512,12 @@ static int ptlrpc_import_delay_req(struct obd_import *imp,
                 /* allow CONNECT even if import is invalid */ ;
         } else if (imp->imp_invalid) {
                 /* If the import has been invalidated (such as by an OST
-                 * failure), the request must fail with -EIO. */
+                 * failure) the request must fail with -EINVAL.  This
+                 * indicates the requests should be discarded, an -EIO
+                 * may result in a resend of the request. */
                 if (!imp->imp_deactive)
                         DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
-                *status = -EIO;
+                *status = -EINVAL;
         } else if (req->rq_import_generation != imp->imp_generation) {
                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
                 *status = -EIO;
@@ -630,7 +632,7 @@ static int after_reply(struct ptlrpc_request *req)
         /* Either we've been evicted, or the server has failed for
          * some reason. Try to reconnect, and if that fails, punt to the
          * upcall. */
-        if ((rc == -ENOTCONN) || (rc == -ENODEV)) {
+        if (ptlrpc_recoverable_error(rc)) {
                 if (req->rq_send_state != LUSTRE_IMP_FULL ||
                     imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
                         RETURN(-ENOTCONN);
index a9a9a8d..698d9bc 100644 (file)
@@ -116,7 +116,7 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req);
 
 /* pers.c */
 void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc);
-void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, cfs_page_t *page, 
+void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, cfs_page_t *page,
                           int pageoffset, int len);
 void ptl_rpc_wipe_bulk_pages(struct ptlrpc_bulk_desc *desc);
 
@@ -132,4 +132,8 @@ int ping_evictor_wake(struct obd_export *exp);
 #define ping_evictor_wake(exp)     1
 #endif
 
+static inline int ptlrpc_recoverable_error(int rc)
+{ 
+        return (rc == -ENOTCONN || rc == -ENODEV);
+}
 #endif /* PTLRPC_INTERNAL_H */
index 14f033f..98b5360 100644 (file)
@@ -3820,15 +3820,315 @@ test_117() # bug 10891
 }
 run_test 117 "verify fsfilt_extend =========="
 
-test_118() #bug 11710
+# Reset async IO behavior after error case
+reset_async() {
+       FILE=$DIR/reset_async
+
+       # Ensure all OSCs are cleared
+       $LSTRIPE $FILE 0 -1 -1
+        dd if=/dev/zero of=$FILE bs=64k count=$OSTCOUNT
+       sync
+        rm $FILE
+}
+
+test_118a() #bug 11710
 {
-       sync; sleep 1; sync
-       multiop $DIR/$tfile oO_CREAT:O_RDWR:O_SYNC:w4096c;
-       dirty=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+       reset_async
+
+       multiop $DIR/$tfile oO_CREAT:O_RDWR:O_SYNC:w4096c
+       DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+               return 1;
+        fi
+}
+run_test 118a "verify O_SYNC works =========="
+
+test_118b()
+{
+       reset_async
+
+       #define OBD_FAIL_OST_ENOENT 0x217
+       sysctl -w lustre.fail_loc=0x217
+       multiop $DIR/$tfile Ow4096yc
+       RC=$?
+       sysctl -w lustre.fail_loc=0
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+
+       if [[ $RC -eq 0 ]]; then
+               error "Must return error due to dropped pages, rc=$RC"
+               return 1;
+       fi
+
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+               return 1;
+       fi
+
+       echo "Dirty pages not leaked on ENOENT"
+
+       # Due to the above error the OSC will issue all RPCs syncronously
+       # until a subsequent RPC completes successfully without error.
+       multiop $DIR/$tfile Ow4096yc
+       rm -f $DIR/$tfile
+       
+       return 0
+}
+run_test 118b "Reclaim dirty pages on fatal error =========="
+
+test_118c()
+{
+       reset_async
+
+       #define OBD_FAIL_OST_EROFS               0x216
+       sysctl -w lustre.fail_loc=0x216
+
+       # multiop should block due to fsync until pages are written
+       multiop $DIR/$tfile Ow4096yc &
+       MULTIPID=$!
+       sleep 1
+
+       if [[ `ps h -o comm -p $MULTIPID` != "multiop" ]]; then
+               error "Multiop failed to block on fsync, pid=$MULTIPID"
+       fi
+
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $WRITEBACK -eq 0 ]]; then
+               error "No page in writeback, writeback=$WRITEBACK"
+       fi
+
+       sysctl -w lustre.fail_loc=0
+        wait $MULTIPID
+       RC=$?
+       if [[ $RC -ne 0 ]]; then
+               error "Multiop fsync failed, rc=$RC"
+       fi
+
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+       fi
+       
+       rm -f $DIR/$tfile
+       echo "Dirty pages flushed via fsync on EROFS"
+       return 0
+}
+run_test 118c "Fsync blocks on EROFS until dirty pages are flushed =========="
+
+test_118d()
+{
+       reset_async
+
+       #define OBD_FAIL_OST_BRW_PAUSE_BULK
+       sysctl -w lustre.fail_loc=0x214
+       # set 10s timeout
+       echo "10" > $LPROC/osc/resend_timeout
+       # multiop should block due to fsync until pages are written
+       multiop $DIR/$tfile Ow4096yc &
+       
+       MULTIPID=$!
+       if [[ `ps h -o comm -p $MULTIPID` != "multiop" ]]; then
+               error "Multiop failed to block on fsync, pid=$MULTIPID"
+       fi
+
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $WRITEBACK -eq 0 ]]; then
+               error "No page in writeback, writeback=$WRITEBACK"
+       fi
+
+        wait $MULTIPID || error "Multiop fsync failed, rc=$?"
+
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)  
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+       fi
+
+       rm -f $DIR/$tfile
+       echo "Dirty pages gaurenteed flushed via fsync"
+       return 0
+}
+run_test 118d "Fsync validation inject a delay of the bulk =========="
+
+test_118f() {
+        reset_async
+
+        #define OBD_FAIL_OSC_BRW_PREP_REQ2        0x40a
+        sysctl -w lustre.fail_loc=0x8000040a
+
+       # Should simulate EINVAL error which is fatal
+        multiop $DIR/$tfile Owy
+        RC=$?
+
+       if [[ $RC -eq 0 ]]; then
+               error "Must return error due to dropped pages, rc=$RC"
+       fi
+
+        LOCKED=$(grep -c locked $LPROC/llite/*/dump_page_cache)
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $LOCKED -ne 0 ]]; then
+               error "Locked pages remain in cache, locked=$LOCKED"
+       fi
        
-       return $dirty
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+       fi
+
+       rm -f $DIR/$tfile
+       echo "No pages locked after fsync"
+
+        reset_async
+       return 0
+}
+run_test 118f "Simulate unrecoverable OSC side error =========="
+
+test_118g() {
+        reset_async
+
+       #define OBD_FAIL_OSC_BRW_PREP_REQ        0x406
+        sysctl -w lustre.fail_loc=0x406
+
+       # simulate local -ENOMEM
+        multiop $DIR/$tfile Owy
+        RC=$?
+       
+        sysctl -w lustre.fail_loc=0
+       if [[ $RC -eq 0 ]]; then
+               error "Must return error due to dropped pages, rc=$RC"
+       fi
+
+        LOCKED=$(grep -c locked $LPROC/llite/*/dump_page_cache)
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $LOCKED -ne 0 ]]; then
+               error "Locked pages remain in cache, locked=$LOCKED"
+       fi
+       
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+       fi
+
+       rm -f $DIR/$tfile
+       echo "No pages locked after fsync"
+
+        reset_async
+       return 0
+}
+run_test 118g "Don't stay in wait if we got local -ENOMEM  =========="
+
+test_118h() {
+        reset_async
+
+       #define OBD_FAIL_OST_BRW_WRITE_BULK      0x20e
+        sysctl -w lustre.fail_loc=0x20e
+       # set 10s timeout
+       echo "10" > $LPROC/osc/resend_timeout
+       # Should simulate ENOMEM error which is recoverable and should be handled by timeout
+        multiop $DIR/$tfile Owy
+        RC=$?
+       
+        sysctl -w lustre.fail_loc=0
+       if [[ $RC -eq 0 ]]; then
+               error "Must return error due to dropped pages, rc=$RC"
+       fi
+
+        LOCKED=$(grep -c locked $LPROC/llite/*/dump_page_cache)
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $LOCKED -ne 0 ]]; then
+               error "Locked pages remain in cache, locked=$LOCKED"
+       fi
+       
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+       fi
+
+       rm -f $DIR/$tfile
+       echo "No pages locked after fsync"
+
+        reset_async
+       return 0
+}
+run_test 118h "Verify timeout in handling recoverables errors  =========="
+
+test_118i() {
+        reset_async
+
+       #define OBD_FAIL_OST_BRW_WRITE_BULK      0x20e
+        sysctl -w lustre.fail_loc=0x20e
+       
+       # set 10s timeout
+       echo "10" > $LPROC/osc/resend_timeout
+       # Should simulate ENOMEM error which is recoverable and should be handled by timeout
+        multiop $DIR/$tfile Owy &
+       PID=$!
+       sleep 5
+       sysctl -w lustre.fail_loc=0
+       
+       wait $PID
+        RC=$?
+       if [[ $RC -ne 0 ]]; then
+               error "got error, but should be not, rc=$RC"
+       fi
+
+        LOCKED=$(grep -c locked $LPROC/llite/*/dump_page_cache)
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $LOCKED -ne 0 ]]; then
+               error "Locked pages remain in cache, locked=$LOCKED"
+       fi
+       
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+       fi
+
+       rm -f $DIR/$tfile
+       echo "No pages locked after fsync"
+
+        reset_async
+       return 0
+}
+run_test 118i "Fix error before timeout in recoverable error  =========="
+
+test_118j() {
+        reset_async
+
+       #define OBD_FAIL_OST_BRW_WRITE_BULK2     0x220
+        sysctl -w lustre.fail_loc=0x220
+
+       # return -EIO from OST
+        multiop $DIR/$tfile Owy
+        RC=$?
+        sysctl -w lustre.fail_loc=0x0
+       if [[ $RC -eq 0 ]]; then
+               error "Must return error due to dropped pages, rc=$RC"
+       fi
+
+        LOCKED=$(grep -c locked $LPROC/llite/*/dump_page_cache)
+        DIRTY=$(grep -c dirty $LPROC/llite/*/dump_page_cache)
+        WRITEBACK=$(grep -c writeback $LPROC/llite/*/dump_page_cache)
+       if [[ $LOCKED -ne 0 ]]; then
+               error "Locked pages remain in cache, locked=$LOCKED"
+       fi
+       
+       # in recoverable error on OST we want resend and stay until it finished
+       if [[ $DIRTY -ne 0 || $WRITEBACK -ne 0 ]]; then
+               error "Dirty pages not flushed to disk, dirty=$DIRTY, writeback=$WRITEBACK"
+       fi
+
+
+       rm -f $DIR/$tfile
+       echo "No pages locked after fsync"
+
+        reset_async
+       return 0
 }
-run_test 118 "verify O_SYNC works"
+run_test 118j "Simulate unrecoverable OST side error =========="
 
 test_119a() # bug 11737
 {