Whamcloud - gitweb
b=16919
authorgreen <green>
Fri, 9 Jan 2009 04:03:08 +0000 (04:03 +0000)
committergreen <green>
Fri, 9 Jan 2009 04:03:08 +0000 (04:03 +0000)
r=adilger,shadow

Implement replay of bulk write RPCs, allowing servers to reply to write RPCs
before all the metadata has been committed to disk.

18 files changed:
lustre/ChangeLog
lustre/include/lustre/lustre_idl.h
lustre/include/obd.h
lustre/include/obd_support.h
lustre/llite/rw.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_io.c
lustre/obdfilter/filter_io_26.c
lustre/obdfilter/lproc_obdfilter.c
lustre/osc/osc_request.c
lustre/ost/lproc_ost.c
lustre/ost/ost_handler.c
lustre/ost/ost_internal.h
lustre/ptlrpc/client.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pinger.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/tests/replay-single.sh

index 6947046..7141d2c 100644 (file)
@@ -65,6 +65,12 @@ Bugzilla   : 18049
 Descriptoin: aborting recovery hang on MDS
 Details    : don't throttle destroy RPCs for the MDT.
 
+Severity   : enhancement
+Bugzilla   : 16919
+Description: Don't sync journal after every i/o
+Details    : Implement write RPC replay to allow server replies for write RPCs
+            before data is on disk.
+
 -------------------------------------------------------------------------------
 2008-12-31 Sun Microsystems, Inc.
        * version 1.8.0
index a092647..b30225a 100644 (file)
@@ -635,6 +635,7 @@ extern void lustre_swab_obd_statfs (struct obd_statfs *os);
 #define OBD_BRW_DROP            0x80 /* drop the page after IO */
 #define OBD_BRW_NOQUOTA        0x100
 #define OBD_BRW_SRVLOCK        0x200 /* Client holds no lock over this page */
+#define OBD_BRW_ASYNC          0x400 /* Server may delay commit to disk */
 
 #define OBD_OBJECT_EOF 0xffffffffffffffffULL
 
index a11e312..045b9e9 100644 (file)
@@ -371,6 +371,7 @@ struct filter_obd {
 
         int                      fo_fmd_max_num; /* per exp filter_mod_data */
         int                      fo_fmd_max_age; /* jiffies to fmd expiry */
+        int                      fo_syncjournal; /* sync journal on writes */
         struct llog_commit_master *fo_lcm;
 };
 
@@ -597,6 +598,7 @@ struct ost_obd {
         struct ptlrpc_service *ost_create_service;
         struct ptlrpc_service *ost_io_service;
         struct semaphore       ost_health_sem;
+        int                    ost_sync_on_lock_cancel;
 };
 
 struct echo_client_obd {
index 224034d..6e2b234 100644 (file)
@@ -83,6 +83,8 @@ extern unsigned int obd_alloc_fail_rate;
 #define OBD_RECOVERY_FACTOR (3) /* times obd_timeout */
 /* Change recovery-small 26b time if you change this */
 #define PING_INTERVAL max(obd_timeout / 4, 1U)
+/* a bit more than maximal journal commit time in seconds */
+#define PING_INTERVAL_SHORT 7
 /* Client may skip 1 ping; we must wait at least 2.5. But for multiple
  * failover targets the client only pings one server at a time, and pings
  * can be lost on a loaded network. Since eviction has serious consequences,
index 8449fc4..dbd1ab5 100644 (file)
@@ -926,13 +926,14 @@ static int queue_or_sync_write(struct obd_export *exp, struct inode *inode,
         struct obd_io_group *oig;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         int rc, noquot = llap->llap_ignore_quota ? OBD_BRW_NOQUOTA : 0;
+        int brwflags = OBD_BRW_ASYNC;
         ENTRY;
 
         /* _make_ready only sees llap once we've unlocked the page */
         llap->llap_write_queued = 1;
         rc = obd_queue_async_io(exp, ll_i2info(inode)->lli_smd, NULL,
                                 llap->llap_cookie, OBD_BRW_WRITE | noquot,
-                                0, 0, 0, async_flags);
+                                0, 0, brwflags, async_flags);
         if (rc == 0) {
                 LL_CDEBUG_PAGE(D_PAGE, llap->llap_page, "write queued\n");
                 llap_write_pending(inode, llap);
index 7438201..d906c40 100644 (file)
@@ -1933,6 +1933,7 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
         filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
         filter->fo_fmd_max_num = FILTER_FMD_MAX_NUM_DEFAULT;
         filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT;
+        filter->fo_syncjournal = 0; /* Don't sync journals on i/o by default */
 
         rc = filter_prep(obd);
         if (rc)
@@ -3353,16 +3354,22 @@ int filter_recreate(struct obd_device *obd, struct obdo *oa)
         ENTRY;
 
         if (oa->o_id > filter_last_id(&obd->u.filter, oa->o_gr)) {
-                CERROR("recreate objid "LPU64" > last id "LPU64"\n",
-                       oa->o_id, filter_last_id(&obd->u.filter, oa->o_gr));
-                RETURN(-EINVAL);
-        }
-
-        if ((oa->o_valid & OBD_MD_FLFLAGS) == 0) {
-                oa->o_valid |= OBD_MD_FLFLAGS;
-                oa->o_flags = OBD_FL_RECREATE_OBJS;
+                if (!obd->obd_recovering ||
+                    oa->o_id > filter_last_id(&obd->u.filter, oa->o_gr) +
+                    OST_MAX_PRECREATE) {
+                        CERROR("recreate objid "LPU64" > last id "LPU64"\n",
+                               oa->o_id, filter_last_id(&obd->u.filter,
+                               oa->o_gr));
+                        RETURN(-EINVAL);
+                }
+                diff = oa->o_id - filter_last_id(&obd->u.filter, oa->o_gr);
         } else {
-                oa->o_flags |= OBD_FL_RECREATE_OBJS;
+                if ((oa->o_valid & OBD_MD_FLFLAGS) == 0) {
+                        oa->o_valid |= OBD_MD_FLFLAGS;
+                        oa->o_flags = OBD_FL_RECREATE_OBJS;
+                } else {
+                        oa->o_flags |= OBD_FL_RECREATE_OBJS;
+                }
         }
 
         down(&obd->u.filter.fo_create_lock);
index 1d56114..27f38cf 100644 (file)
@@ -629,9 +629,36 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         cleanup_phase = 2;
 
         if (dentry->d_inode == NULL) {
-                CERROR("%s: trying to BRW to non-existent file "LPU64"\n",
-                       obd->obd_name, obj->ioo_id);
-                GOTO(cleanup, rc = -ENOENT);
+                if (exp->exp_obd->obd_recovering) {
+                        struct obdo *noa = oa;
+
+                        if (oa == NULL) {
+                                OBDO_ALLOC(noa);
+                                if (noa == NULL)
+                                        GOTO(recreate_out, rc = -ENOMEM);
+                                noa->o_id = obj->ioo_id;
+                                noa->o_valid = OBD_MD_FLID;
+                        }
+
+                        if (filter_recreate(exp->exp_obd, noa) == 0) {
+                                f_dput(dentry);
+                                dentry = filter_fid2dentry(exp->exp_obd, NULL,
+                                                           obj->ioo_gr,
+                                                           obj->ioo_id);
+                        }
+                        if (oa == NULL)
+                                OBDO_FREE(noa);
+                }
+    recreate_out:
+                if (IS_ERR(dentry) || dentry->d_inode == NULL) {
+                        CERROR("%s: BRW to missing obj "LPU64"/"LPU64":rc %d\n",
+                               exp->exp_obd->obd_name,
+                               obj->ioo_id, obj->ioo_gr,
+                               IS_ERR(dentry) ? (int)PTR_ERR(dentry) : -ENOENT);
+                        if (IS_ERR(dentry))
+                                cleanup_phase = 1;
+                        GOTO(cleanup, rc = -ENOENT);
+                }
         }
 
         rc = filter_map_remote_to_local(objcount, obj, nb, pages, res);
index 5ad3dda..2689ae7 100644 (file)
@@ -492,7 +492,11 @@ int filter_direct_io(int rw, struct dentry *dchild, struct filter_iobuf *iobuf,
                                 rc = rc2;
                 }
 
-                rc2 =fsfilt_commit_async(obd,inode,oti->oti_handle,wait_handle);
+                if (wait_handle)
+                        rc2 = fsfilt_commit_async(obd, inode, oti->oti_handle,
+                                                  wait_handle);
+                else
+                        rc2 = fsfilt_commit(obd, inode, oti->oti_handle, 0);
                 if (rc == 0)
                         rc = rc2;
                 if (rc != 0)
@@ -553,10 +557,11 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         int i, err, cleanup_phase = 0;
         struct obd_device *obd = exp->exp_obd;
         struct filter_obd *fo = &obd->u.filter;
-        void *wait_handle;
+        void *wait_handle = NULL;
         int total_size = 0;
         int rec_pending = 0;
         unsigned int qcids[MAXQUOTAS] = {0, 0};
+        int sync_journal_commit = obd->u.filter.fo_syncjournal;
         ENTRY;
 
         LASSERT(oti != NULL);
@@ -621,6 +626,10 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
                     (lnb->flags & (OBD_BRW_FROM_GRANT | OBD_BRW_SYNC)) ==
                     OBD_BRW_FROM_GRANT)
                         iobuf->dr_ignore_quota = 1;
+
+                if (!(lnb->flags & OBD_BRW_ASYNC)) {
+                        sync_journal_commit = 1;
+                }
         }
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
@@ -685,7 +694,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
 
         /* filter_direct_io drops i_mutex */
         rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr,
-                              oti, &wait_handle);
+                              oti, sync_journal_commit ? &wait_handle : NULL);
         if (rc == 0)
                 obdo_from_inode(oa, inode,
                                 FILTER_VALID_FLAGS |OBD_MD_FLUID |OBD_MD_FLGID);
@@ -696,13 +705,18 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
 
         fsfilt_check_slow(obd, now, "direct_io");
 
-        err = fsfilt_commit_wait(obd, inode, wait_handle);
+        if (wait_handle)
+                err = fsfilt_commit_wait(obd, inode, wait_handle);
+        else
+                err = 0;
+
         if (err) {
                 CERROR("Failure to commit OST transaction (%d)?\n", err);
-                rc = err;
+                if (rc == 0)
+                        rc = err;
         }
 
-        if (obd->obd_replayable && !rc)
+        if (obd->obd_replayable && !rc && wait_handle)
                 LASSERTF(oti->oti_transno <= obd->obd_last_committed,
                          "oti_transno "LPU64" last_committed "LPU64"\n",
                          oti->oti_transno, obd->obd_last_committed);
index efd5f67..52801fc 100644 (file)
@@ -237,6 +237,34 @@ static int lprocfs_filter_wr_wcache(struct file *file, const char *buffer,
         return count;
 }
 
+/* proc read handler: print fo_syncjournal followed by a newline into page */
+int lprocfs_filter_rd_syncjournal(char *page, char **start, off_t off,
+                                  int count, int *eof, void *data)
+{
+        struct obd_device *filter_dev = data;
+        struct filter_obd *filter = &filter_dev->u.filter;
+
+        return snprintf(page, count, "%u\n", filter->fo_syncjournal);
+}
+
+/* proc write handler: parse a number and store it as a 0/1 fo_syncjournal.
+ * Negative input gives -EINVAL; a parse error is returned unchanged. */
+int lprocfs_filter_wr_syncjournal(struct file *file, const char *buffer,
+                                  unsigned long count, void *data)
+{
+        struct obd_device *filter_dev = data;
+        int enable;
+        int err = lprocfs_write_helper(buffer, count, &enable);
+
+        if (err != 0)
+                return err;
+        if (enable < 0)
+                return -EINVAL;
+
+        filter_dev->u.filter.fo_syncjournal = (enable != 0);
+        return count;
+}
+
 static struct lprocfs_vars lprocfs_filter_obd_vars[] = {
         { "uuid",         lprocfs_rd_uuid,          0, 0 },
         { "blocksize",    lprocfs_rd_blksize,       0, 0 },
@@ -290,6 +318,8 @@ static struct lprocfs_vars lprocfs_filter_obd_vars[] = {
                               lprocfs_obd_wr_stale_export_age, 0},
         { "flush_stale_exports", 0, lprocfs_obd_wr_flush_stale_exports, 0 },
 #endif
+        { "sync_journal", lprocfs_filter_rd_syncjournal,
+                          lprocfs_filter_wr_syncjournal, 0 },
         { 0 }
 };
 
index b7d986f..f194942 100644 (file)
@@ -930,7 +930,7 @@ static int check_write_rcs(struct ptlrpc_request *req,
 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
 {
         if (p1->flag != p2->flag) {
-                unsigned mask = ~OBD_BRW_FROM_GRANT;
+                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_ASYNC);
 
                 /* warn if we try to combine flags that we don't know to be
                  * safe to combine */
@@ -2031,7 +2031,7 @@ static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
                 }
                 OBDO_FREE(aa->aa_oa);
         } else { /* from async_internal() */
-                int i;
+                obd_count i;
                 for (i = 0; i < aa->aa_page_count; i++)
                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
         }
@@ -2040,6 +2040,7 @@ static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
         client_obd_list_unlock(&cli->cl_loi_list_lock);
 
         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
+
         RETURN(rc);
 }
 
index 9cb8912..9bb91ba 100644 (file)
 #include "ost_internal.h"
 
 #ifdef LPROCFS
+static char *sync_on_cancel_states[] = {"never",
+                                        "blocking",
+                                        "always" };
+
+/* proc read handler: print the current sync_on_lock_cancel state name */
+int lprocfs_ost_rd_ost_sync_on_lock_cancel(char *page, char **start, off_t off,
+                                           int count, int *eof, void *data)
+{
+        struct obd_device *ost_dev = data;
+        const char *state;
+
+        state = sync_on_cancel_states[ost_dev->u.ost.ost_sync_on_lock_cancel];
+        return snprintf(page, count, "%s\n", state);
+}
+
+/* proc write handler for "sync_on_lock_cancel".  Accepts a state name from
+ * sync_on_cancel_states[] (matched as a prefix of the input) or, failing
+ * that, the number parsed by lprocfs_write_helper().  The value must name a
+ * valid state; returns count on success, -EINVAL or the helper's error
+ * otherwise.
+ * NOTE(review): buffer is read directly by memcmp(); if it can be a
+ * user-space pointer it has to be copied in first -- confirm. */
+int lprocfs_ost_wr_ost_sync_on_lock_cancel(struct file *file,
+                                           const char *buffer,
+                                           unsigned long count, void *data)
+{
+        struct obd_device *obd = data;
+        int val = -1;
+        int i;
+
+        for (i = 0 ; i < NUM_SYNC_ON_CANCEL_STATES; i++) {
+                unsigned long len = strlen(sync_on_cancel_states[i]);
+
+                /* never compare past the count bytes that were written */
+                if (count >= len &&
+                    memcmp(buffer, sync_on_cancel_states[i], len) == 0) {
+                        val = i;
+                        break;
+                }
+        }
+        if (val == -1) {
+                int rc;
+                rc = lprocfs_write_helper(buffer, count, &val);
+                if (rc)
+                        return rc;
+        }
+
+        if (val < 0 || val >= NUM_SYNC_ON_CANCEL_STATES)
+                return -EINVAL;
+
+        obd->u.ost.ost_sync_on_lock_cancel = val;
+        return count;
+}
+
 static struct lprocfs_vars lprocfs_ost_obd_vars[] = {
         { "uuid",            lprocfs_rd_uuid,   0, 0 },
+        { "sync_on_lock_cancel", lprocfs_ost_rd_ost_sync_on_lock_cancel,
+                                 lprocfs_ost_wr_ost_sync_on_lock_cancel, 0 },
         { 0 }
 };
 
index 9eb25aa..fe24fb8 100644 (file)
@@ -1241,6 +1241,37 @@ static int ost_handle_quota_adjust_qunit(struct ptlrpc_request *req)
 }
 #endif
 
+/* On cancel of a PW/GROUP lock, sync the locked extent to disk first when
+ * ost_sync_on_lock_cancel requests it (always, or only with CBPENDING set).
+ * A failed sync is only logged. Returns ldlm_server_blocking_ast() result. */
+int ost_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                     void *data, int flag)
+{
+        struct obd_device *obd = lock->l_export->exp_obd;
+        if (flag == LDLM_CB_CANCELING &&
+            (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) &&
+            (obd->u.ost.ost_sync_on_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
+             (obd->u.ost.ost_sync_on_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
+              (lock->l_flags & LDLM_FL_CBPENDING)))) {
+                struct obdo *oa;
+                int rc = -ENOMEM; /* used if obdo allocation fails */
+
+                OBDO_ALLOC(oa);
+                if (oa != NULL) {
+                        oa->o_id = lock->l_resource->lr_name.name[0];
+                        oa->o_valid = OBD_MD_FLID;
+                        rc = obd_sync(lock->l_export, oa, NULL,
+                                      lock->l_policy_data.l_extent.start,
+                                      lock->l_policy_data.l_extent.end);
+                        OBDO_FREE(oa);
+                }
+                if (rc)
+                        CERROR("Error %d syncing data on lock cancel\n", rc);
+        }
+
+        return ldlm_server_blocking_ast(lock, desc, data, flag);
+}
+
 static int ost_filter_recovery_request(struct ptlrpc_request *req,
                                        struct obd_device *obd, int *process)
 {
@@ -1773,7 +1804,7 @@ static int ost_handle(struct ptlrpc_request *req)
                 CDEBUG(D_INODE, "enqueue\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
-                                         ldlm_server_blocking_ast,
+                                         ost_blocking_ast,
                                          ldlm_server_glimpse_ast);
                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
                 break;
@@ -1877,6 +1908,9 @@ static int ost_setup(struct obd_device *obd, obd_count len, void *buf)
 
         sema_init(&ost->ost_health_sem, 1);
 
+        /* Always sync on lock cancel */
+        ost->ost_sync_on_lock_cancel = ALWAYS_SYNC_ON_CANCEL;
+
         if (oss_num_threads) {
                 /* If oss_num_threads is set, it is the min and the max. */
                 if (oss_num_threads > OSS_THREADS_MAX)
index 058db78..f0dc910 100644 (file)
@@ -80,4 +80,11 @@ static void lprocfs_ost_init_vars(struct lprocfs_static_vars *lvars)
 }
 #endif
 
+enum {
+        NEVER_SYNC_ON_CANCEL = 0,
+        BLOCKING_SYNC_ON_CANCEL = 1,
+        ALWAYS_SYNC_ON_CANCEL = 2,
+        NUM_SYNC_ON_CANCEL_STATES
+};
+
 #endif /* OST_INTERNAL_H */
index fb16181..1e77223 100644 (file)
@@ -161,11 +161,13 @@ void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
 
         desc->bd_nob += len;
 
+        cfs_page_pin(page);
         ptlrpc_add_bulk_page(desc, page, pageoffset, len);
 }
 
 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
 {
+        int i;
         ENTRY;
 
         LASSERT(desc != NULL);
@@ -177,6 +179,9 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
         else
                 class_import_put(desc->bd_import);
 
+        for (i = 0; i < desc->bd_iov_count ; i++)
+                cfs_page_unpin(desc->bd_iov[i].kiov_page);
+
         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
                                 bd_iov[desc->bd_max_iov]));
         EXIT;
@@ -985,6 +990,9 @@ static int after_reply(struct ptlrpc_request *req)
                         imp->imp_peer_committed_transno =
                                 lustre_msg_get_last_committed(req->rq_repmsg);
                 ptlrpc_free_committed(imp);
+                /* transno not yet committed by the server: ping again soon */
+                if (req->rq_transno > imp->imp_peer_committed_transno)
+                        ptlrpc_pinger_commit_expected(imp);
                 spin_unlock(&imp->imp_lock);
         }
 
@@ -2213,8 +2221,6 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         ENTRY;
 
         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
-        /* Not handling automatic bulk replay yet (or ever?) */
-        LASSERT(req->rq_bulk == NULL);
 
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = ptlrpc_req_async_args(req);
index 6207437..ac317a9 100644 (file)
@@ -241,10 +241,14 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
 
         /* XXX Registering the same xid on retried bulk makes my head
          * explode trying to understand how the original request's bulk
-         * might interfere with the retried request -eeb */
-        LASSERTF (!desc->bd_registered || req->rq_xid != desc->bd_last_xid,
-                  "registered: %d  rq_xid: "LPU64" bd_last_xid: "LPU64"\n",
-                  desc->bd_registered, req->rq_xid, desc->bd_last_xid);
+         * might interfere with the retried request -eeb
+         * On the other hand replaying with the same xid is fine, since
+         * we are guaranteed old request have completed. -green */
+        LASSERTF(!(desc->bd_registered &&
+                 req->rq_send_state != LUSTRE_IMP_REPLAY) ||
+                 req->rq_xid != desc->bd_last_xid,
+                 "registered: %d  rq_xid: "LPU64" bd_last_xid: "LPU64"\n",
+                 desc->bd_registered, req->rq_xid, desc->bd_last_xid);
         desc->bd_registered = 1;
         desc->bd_last_xid = req->rq_xid;
 
index 4b9e1a0..be4955a 100644 (file)
@@ -108,10 +108,10 @@ int ptlrpc_ping(struct obd_import *imp)
 }
 EXPORT_SYMBOL(ptlrpc_ping);
 
-void ptlrpc_update_next_ping(struct obd_import *imp)
+void ptlrpc_update_next_ping(struct obd_import *imp, int soon)
 {
 #ifdef ENABLE_PINGER
-        int time = PING_INTERVAL;
+        int time = soon ? PING_INTERVAL_SHORT : PING_INTERVAL;
         if (imp->imp_state == LUSTRE_IMP_DISCON) {
                 int dtime = max_t(int, CONNECTION_SWITCH_MIN,
                                   AT_OFF ? 0 :
@@ -209,7 +209,7 @@ static int ptlrpc_pinger_main(void *arg)
                         if (cfs_time_after(imp->imp_next_ping,
                                            cfs_time_add(this_ping, 
                                                         cfs_time_seconds(PING_INTERVAL))))
-                                ptlrpc_update_next_ping(imp);
+                                ptlrpc_update_next_ping(imp, 0);
                 }
                 mutex_up(&pinger_sem);
                 /* update memory usage info */
@@ -317,7 +317,12 @@ int ptlrpc_stop_pinger(void)
 
 void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
 {
-        ptlrpc_update_next_ping(imp);
+        ptlrpc_update_next_ping(imp, 0);
+}
+
+void ptlrpc_pinger_commit_expected(struct obd_import *imp)
+{
+        ptlrpc_update_next_ping(imp, 1);
 }
 
 int ptlrpc_pinger_add_import(struct obd_import *imp)
@@ -332,7 +337,7 @@ int ptlrpc_pinger_add_import(struct obd_import *imp)
         /* if we add to pinger we want recovery on this import */
         imp->imp_obd->obd_no_recov = 0;
 
-        ptlrpc_update_next_ping(imp);
+        ptlrpc_update_next_ping(imp, 0);
         /* XXX sort, blah blah */
         list_add_tail(&imp->imp_pinger_chain, &pinger_imports);
         class_import_get(imp);
@@ -685,7 +690,7 @@ void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
 {
 #ifdef ENABLE_PINGER
         mutex_down(&pinger_sem);
-        ptlrpc_update_next_ping(imp);
+        ptlrpc_update_next_ping(imp, 0);
         if (pinger_args.pd_set == NULL &&
             time_before(imp->imp_next_ping, pinger_args.pd_next_ping)) {
                 CDEBUG(D_HA, "set next ping to "CFS_TIME_T"(cur "CFS_TIME_T")\n",
index 6fdb453..bdcdf48 100644 (file)
@@ -95,6 +95,7 @@ void ptl_rpc_wipe_bulk_pages(struct ptlrpc_bulk_desc *desc);
 int ptlrpc_start_pinger(void);
 int ptlrpc_stop_pinger(void);
 void ptlrpc_pinger_sending_on_import(struct obd_import *imp);
+void ptlrpc_pinger_commit_expected(struct obd_import *imp);
 void ptlrpc_pinger_wake_up(void);
 void ptlrpc_ping_import_soon(struct obd_import *imp);
 #ifdef __KERNEL__
index 5573475..f3868b1 100755 (executable)
@@ -1895,6 +1895,42 @@ test_73c() {
 }
 run_test 73c "open(O_CREAT), unlink, replay, reconnect at last_replay, close"
 
+test_80() {
+    # write with journal sync off on ost1, fail ost1, then compare md5sums
+    do_facet ost1 "lctl set_param -n obdfilter.${ost1_svc}.sync_journal 0"
+    replay_barrier ost1
+    lfs setstripe -i 0 -c 1 $DIR/$tfile
+    dd if=/dev/urandom of=$DIR/$tfile bs=1024k count=8 || error "Cannot write"
+    cksum=`md5sum $DIR/$tfile | awk '{print $1}'`
+    cancel_lru_locks osc
+    fail ost1
+    dd if=$DIR/$tfile of=/dev/null bs=1024k count=8 || error "Cannot read"
+    cksum2=`md5sum $DIR/$tfile | awk '{print $1}'`
+    if [ -z "$cksum" ] || [ "$cksum" != "$cksum2" ] ; then # empty sum fails too
+       error "New checksum $cksum2 does not match original $cksum"
+    fi
+}
+run_test 80 "write replay"
+
+test_80b() {
+    # like test_80, but also write 8 bytes at offset 64 before failing ost1
+    do_facet ost1 "lctl set_param -n obdfilter.${ost1_svc}.sync_journal 0"
+    replay_barrier ost1
+    lfs setstripe -i 0 -c 1 $DIR/$tfile
+    dd if=/dev/urandom of=$DIR/$tfile bs=1024k count=8 || error "Cannot write"
+    sleep 1 # Give it a chance to flush dirty data
+    echo TESTTEST | dd of=$DIR/$tfile bs=1 count=8 seek=64
+    cksum=`md5sum $DIR/$tfile | awk '{print $1}'`
+    cancel_lru_locks osc
+    fail ost1
+    dd if=$DIR/$tfile of=/dev/null bs=1024k count=8 || error "Cannot read"
+    cksum2=`md5sum $DIR/$tfile | awk '{print $1}'`
+    if [ -z "$cksum" ] || [ "$cksum" != "$cksum2" ] ; then # empty sum fails too
+       error "New checksum $cksum2 does not match original $cksum"
+    fi
+}
+run_test 80b "write replay with changed data (checksum resend)"
+
 equals_msg `basename $0`: test complete, cleaning up
 check_and_cleanup_lustre
 [ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG && grep -q FAIL $TESTSUITELOG && exit 1 || true