Whamcloud - gitweb
b=16919 Async journal commit support
authorOleg Drokin <green@linuxhacker.ru>
Fri, 23 Jul 2010 10:03:02 +0000 (14:03 +0400)
committerMikhail Pershin <tappro@sun.com>
Wed, 28 Jul 2010 12:07:46 +0000 (16:07 +0400)
Async journal commit for 2.0 branch. Also reverts patch from bug 22190 as no
longer needed.
Also included changes from b1_8 that were introduced in bug 22241.
Async commit is ENABLED by default.
i=adilger
i=tappro

19 files changed:
libcfs/include/libcfs/linux/linux-mem.h
libcfs/include/libcfs/user-mem.h
lustre/include/lustre/lustre_idl.h
lustre/include/obd.h
lustre/include/obd_support.h
lustre/obdecho/echo_client.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_io.c
lustre/obdfilter/filter_io_26.c
lustre/obdfilter/lproc_obdfilter.c
lustre/osc/osc_page.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pinger.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/tests/replay-single.sh

index 18c64e7..08b8e63 100644 (file)
@@ -98,6 +98,9 @@ static inline int cfs_page_count(cfs_page_t *page)
 
 #define cfs_page_index(p)       ((p)->index)
 
+#define cfs_page_pin(page) page_cache_get(page)
+#define cfs_page_unpin(page) page_cache_release(page)
+
 /*
  * Memory allocator
  * XXX Liang: move these declare to public file
index 3e72e20..df32135 100644 (file)
@@ -49,6 +49,8 @@ void cfs_kunmap(cfs_page_t *pg);
 #define cfs_get_page(p)                        __I_should_not_be_called__(at_all)
 #define cfs_page_count(p)              __I_should_not_be_called__(at_all)
 #define cfs_page_index(p)               ((p)->index)
+#define cfs_page_pin(page) do {} while (0)
+#define cfs_page_unpin(page) do {} while (0)
 
 /*
  * Memory allocator
index 2eaa676..e8df3f5 100644 (file)
@@ -1346,6 +1346,7 @@ extern void lustre_swab_obd_statfs (struct obd_statfs *os);
 #define OBD_BRW_NOCACHE         0x80 /* this page is a part of non-cached IO */
 #define OBD_BRW_NOQUOTA        0x100
 #define OBD_BRW_SRVLOCK        0x200 /* Client holds no lock over this page */
+#define OBD_BRW_ASYNC          0x400 /* Server may delay commit to disk */
 #define OBD_BRW_MEMALLOC       0x800 /* Client runs in the "kswapd" context */
 
 #define OBD_OBJECT_EOF 0xffffffffffffffffULL
index 79674c1..2c7da99 100644 (file)
@@ -345,6 +345,9 @@ struct filter_obd {
 
         int                      fo_fmd_max_num; /* per exp filter_mod_data */
         int                      fo_fmd_max_age; /* jiffies to fmd expiry */
+        unsigned long            fo_syncjournal:1, /* sync journal on writes */
+                                 fo_sync_lock_cancel:2;/* sync on lock cancel */
+
 
         /* sptlrpc stuff */
         cfs_rwlock_t             fo_sptlrpc_lock;
@@ -373,6 +376,14 @@ struct timeout_item {
 #define OSC_MAX_DIRTY_MB_MAX   2048     /* arbitrary, but < MAX_LONG bytes */
 #define OSC_DEFAULT_RESENDS      10
 
+/* possible values for fo_sync_lock_cancel */
+enum {
+        NEVER_SYNC_ON_CANCEL = 0,
+        BLOCKING_SYNC_ON_CANCEL = 1,
+        ALWAYS_SYNC_ON_CANCEL = 2,
+        NUM_SYNC_ON_CANCEL_STATES
+};
+
 #define MDC_MAX_RIF_DEFAULT       8
 #define MDC_MAX_RIF_MAX         512
 
@@ -1140,6 +1151,7 @@ enum obd_cleanup_stage {
 /*      KEY_SET_INFO in lustre_idl.h */
 #define KEY_SPTLRPC_CONF        "sptlrpc_conf"
 #define KEY_CONNECT_FLAG        "connect_flags"
+#define KEY_SYNC_LOCK_CANCEL    "sync_lock_cancel"
 
 
 struct lu_context;
index c944348..d9def8b 100644 (file)
@@ -121,6 +121,8 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_RECOVERY_TIME_SOFT          (obd_timeout * 3)
 /* Change recovery-small 26b time if you change this */
 #define PING_INTERVAL max(obd_timeout / 4, 1U)
+/* a bit more than maximal journal commit time in seconds */
+#define PING_INTERVAL_SHORT min(PING_INTERVAL, 7U)
 /* Client may skip 1 ping; we must wait at least 2.5. But for multiple
  * failover targets the client only pings one server at a time, and pings
  * can be lost on a loaded network. Since eviction has serious consequences,
index 912d8b6..b2c8815 100644 (file)
@@ -1515,6 +1515,7 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
         int                     rc;
         int                     verify;
         int                     gfp_mask;
+        int                     brw_flags = 0;
         ENTRY;
 
         verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
@@ -1534,6 +1535,9 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
         /* XXX think again with misaligned I/O */
         npages = count >> CFS_PAGE_SHIFT;
 
+        if (rw == OBD_BRW_WRITE)
+                brw_flags = OBD_BRW_ASYNC;
+
         OBD_ALLOC(pga, npages * sizeof(*pga));
         if (pga == NULL)
                 RETURN(-ENOMEM);
@@ -1558,7 +1562,7 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
                 pages[i] = pgp->pg;
                 pgp->count = CFS_PAGE_SIZE;
                 pgp->off = off;
-                pgp->flag = 0;
+                pgp->flag = brw_flags;
 
                 if (verify)
                         echo_client_page_debug_setup(lsm, pgp->pg, rw,
index 82dfe24..8d11d00 100644 (file)
@@ -2046,6 +2046,8 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
         filter->fo_readcache_max_filesize = FILTER_MAX_CACHE_SIZE;
         filter->fo_fmd_max_num = FILTER_FMD_MAX_NUM_DEFAULT;
         filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT;
+        filter->fo_syncjournal = 0; /* Don't sync journals on i/o by default */
+        filter_slc_set(filter); /* initialize sync on lock cancel */
 
         rc = filter_prep(obd);
         if (rc)
@@ -3921,8 +3923,8 @@ set_last_id:
         RETURN(rc);
 }
 
-static int filter_create(struct obd_export *exp, struct obdo *oa,
-                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
+int filter_create(struct obd_export *exp, struct obdo *oa,
+                  struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
         struct obd_device *obd = exp->exp_obd;
         struct filter_export_data *fed;
@@ -3961,7 +3963,8 @@ static int filter_create(struct obd_export *exp, struct obdo *oa,
 
         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
             (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
-                if (oa->o_id > filter_last_id(filter, oa->o_seq)) {
+                if (!obd->obd_recovering ||
+                    oa->o_id > filter_last_id(filter, oa->o_seq)) {
                         CERROR("recreate objid "LPU64" > last id "LPU64"\n",
                                oa->o_id, filter_last_id(filter, oa->o_seq));
                         rc = -EINVAL;
@@ -4337,6 +4340,12 @@ static int filter_get_info(struct obd_export *exp, __u32 keylen,
                 RETURN(rc);
         }
 
+        if (KEY_IS(KEY_SYNC_LOCK_CANCEL)) {
+                *((__u32 *) val) = obd->u.filter.fo_sync_lock_cancel;
+                *vallen = sizeof(__u32);
+                RETURN(0);
+        }
+
         CDEBUG(D_IOCTL, "invalid key\n");
         RETURN(-EINVAL);
 }
index 91d8ef9..7c73619 100644 (file)
@@ -152,7 +152,8 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
 int filter_setattr(struct obd_export *exp, struct obd_info *oinfo,
                    struct obd_trans_info *oti);
 
-struct dentry *filter_create_object(struct obd_device *obd, struct obdo *oa);
+int filter_create(struct obd_export *exp, struct obdo *oa,
+                  struct lov_stripe_md **ea, struct obd_trans_info *oti);
 
 struct obd_llog_group *filter_find_olg(struct obd_device *obd, int seq);
 
@@ -243,4 +244,15 @@ void blacklist_add(uid_t uid);
 void blacklist_del(uid_t uid);
 int blacklist_display(char *buf, int bufsize);
 
+/* sync on lock cancel is useless when we force a journal flush,
+ * and if we enable async journal commit, we should also turn on
+ * sync on lock cancel if it is not enabled already. */
+static inline void filter_slc_set(struct filter_obd *filter)
+{
+        if (filter->fo_syncjournal == 1)
+                filter->fo_sync_lock_cancel = NEVER_SYNC_ON_CANCEL;
+        else if (filter->fo_sync_lock_cancel == NEVER_SYNC_ON_CANCEL)
+                filter->fo_sync_lock_cancel = ALWAYS_SYNC_ON_CANCEL;
+}
+
 #endif /* _FILTER_INTERNAL_H */
index 17d4685..c53c435 100644 (file)
@@ -681,9 +681,36 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         cleanup_phase = 2;
 
         if (dentry->d_inode == NULL) {
-                CERROR("%s: trying to BRW to non-existent file "LPU64"\n",
-                       obd->obd_name, obj->ioo_id);
-                GOTO(cleanup, rc = -ENOENT);
+                if (exp->exp_obd->obd_recovering) {
+                        struct obdo *noa = oa;
+
+                        if (oa == NULL) {
+                                OBDO_ALLOC(noa);
+                                if (noa == NULL)
+                                        GOTO(recreate_out, rc = -ENOMEM);
+                                noa->o_id = obj->ioo_id;
+                                noa->o_valid = OBD_MD_FLID;
+                        }
+
+                        if (filter_create(exp, noa, NULL, oti) == 0) {
+                                f_dput(dentry);
+                                dentry = filter_fid2dentry(exp->exp_obd, NULL,
+                                                           obj->ioo_seq,
+                                                           obj->ioo_id);
+                        }
+                        if (oa == NULL)
+                                OBDO_FREE(noa);
+                }
+    recreate_out:
+                if (IS_ERR(dentry) || dentry->d_inode == NULL) {
+                        CERROR("%s: BRW to missing obj "LPU64"/"LPU64":rc %d\n",
+                               exp->exp_obd->obd_name,
+                               obj->ioo_id, obj->ioo_seq,
+                               IS_ERR(dentry) ? (int)PTR_ERR(dentry) : -ENOENT);
+                        if (IS_ERR(dentry))
+                                cleanup_phase = 1;
+                        GOTO(cleanup, rc = -ENOENT);
+                }
         }
 
         if (oa->o_valid & (OBD_MD_FLUID | OBD_MD_FLGID) &&
@@ -999,6 +1026,7 @@ int filter_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                 lnb[i].page = pga[i].pg;
                 rnb[i].offset = pga[i].off;
                 rnb[i].len = pga[i].count;
+                lnb[i].flags = rnb[i].flags = pga[i].flag;
         }
 
         obdo_to_ioobj(oinfo->oi_oa, &ioo);
index 7b39704..82cdfba 100644 (file)
@@ -510,8 +510,11 @@ int filter_direct_io(int rw, struct dentry *dchild, struct filter_iobuf *iobuf,
                                 rc = rc2;
                 }
 
-                rc2 = fsfilt_commit_async(obd,inode,oti->oti_handle,
-                                          wait_handle);
+                if (wait_handle)
+                        rc2 = fsfilt_commit_async(obd,inode,oti->oti_handle,
+                                                  wait_handle);
+                else
+                        rc2 = fsfilt_commit(obd, inode, oti->oti_handle, 0);
                 if (rc == 0)
                         rc = rc2;
                 if (rc != 0)
@@ -572,10 +575,11 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         int i, err, cleanup_phase = 0;
         struct obd_device *obd = exp->exp_obd;
         struct filter_obd *fo = &obd->u.filter;
-        void *wait_handle;
+        void *wait_handle = NULL;
         int total_size = 0;
         unsigned int qcids[MAXQUOTAS] = { oa->o_uid, oa->o_gid };
         int rec_pending[MAXQUOTAS] = { 0, 0 }, quota_pages = 0;
+        int sync_journal_commit = obd->u.filter.fo_syncjournal;
         ENTRY;
 
         LASSERT(oti != NULL);
@@ -643,6 +647,10 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
                     (flags & (OBD_BRW_FROM_GRANT | OBD_BRW_SYNC)) ==
                      OBD_BRW_FROM_GRANT)
                         iobuf->dr_ignore_quota = 1;
+
+                if (!(lnb->flags & OBD_BRW_ASYNC)) {
+                        sync_journal_commit = 1;
+                }
         }
 
         /* we try to get enough quota to write here, and let ldiskfs
@@ -716,7 +724,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
 
         /* filter_direct_io drops i_mutex */
         rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr,
-                              oti, &wait_handle);
+                              oti, sync_journal_commit ? &wait_handle : NULL);
 
         obdo_from_inode(oa, inode, NULL, rc == 0 ? FILTER_VALID_FLAGS : 0 |
                                                    OBD_MD_FLUID |OBD_MD_FLGID);
@@ -725,13 +733,18 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
 
         fsfilt_check_slow(obd, now, "direct_io");
 
-        err = fsfilt_commit_wait(obd, inode, wait_handle);
+        if (wait_handle)
+                err = fsfilt_commit_wait(obd, inode, wait_handle);
+        else
+                err = 0;
+
         if (err) {
                 CERROR("Failure to commit OST transaction (%d)?\n", err);
-                rc = err;
+                if (rc == 0)
+                        rc = err;
         }
 
-        if (obd->obd_replayable && !rc)
+        if (obd->obd_replayable && !rc && wait_handle)
                 LASSERTF(oti->oti_transno <= obd->obd_last_committed,
                          "oti_transno "LPU64" last_committed "LPU64"\n",
                          oti->oti_transno, obd->obd_last_committed);
index 92effc9..99bc28c 100644 (file)
@@ -354,6 +354,79 @@ int lprocfs_filter_wr_degraded(struct file *file, const char *buffer,
         return count;
 }
 
+int lprocfs_filter_rd_syncjournal(char *page, char **start, off_t off,
+                                  int count, int *eof, void *data)
+{
+        struct obd_device *obd = data;
+        int rc;
+
+        rc = snprintf(page, count, "%u\n", obd->u.filter.fo_syncjournal);
+        return rc;
+}
+
+int lprocfs_filter_wr_syncjournal(struct file *file, const char *buffer,
+                                  unsigned long count, void *data)
+{
+        struct obd_device *obd = data;
+        int val;
+        int rc;
+
+        rc = lprocfs_write_helper(buffer, count, &val);
+        if (rc)
+                return rc;
+
+        if (val < 0)
+                return -EINVAL;
+
+        obd->u.filter.fo_syncjournal = !!val;
+        filter_slc_set(&obd->u.filter);
+
+        return count;
+}
+
+static char *sync_on_cancel_states[] = {"never",
+                                        "blocking",
+                                        "always" };
+
+int lprocfs_filter_rd_sync_lock_cancel(char *page, char **start, off_t off,
+                                       int count, int *eof, void *data)
+{
+        struct obd_device *obd = data;
+        int rc;
+
+        rc = snprintf(page, count, "%s\n",
+                      sync_on_cancel_states[obd->u.filter.fo_sync_lock_cancel]);
+        return rc;
+}
+
+int lprocfs_filter_wr_sync_lock_cancel(struct file *file, const char *buffer,
+                                          unsigned long count, void *data)
+{
+        struct obd_device *obd = data;
+        int val = -1;
+        int i;
+
+        for (i = 0 ; i < NUM_SYNC_ON_CANCEL_STATES; i++) {
+                if (memcmp(buffer, sync_on_cancel_states[i],
+                    strlen(sync_on_cancel_states[i])) == 0) {
+                        val = i;
+                        break;
+                }
+        }
+        if (val == -1) {
+                int rc;
+                rc = lprocfs_write_helper(buffer, count, &val);
+                if (rc)
+                        return rc;
+        }
+
+        if (val < 0 || val > 2)
+                return -EINVAL;
+
+        obd->u.filter.fo_sync_lock_cancel = val;
+        return count;
+}
+
 static struct lprocfs_vars lprocfs_filter_obd_vars[] = {
         { "uuid",         lprocfs_rd_uuid,          0, 0 },
         { "blocksize",    lprocfs_rd_blksize,       0, 0 },
@@ -400,6 +473,10 @@ static struct lprocfs_vars lprocfs_filter_obd_vars[] = {
         { "mds_sync",     lprocfs_filter_rd_mds_sync, 0, 0},
         { "degraded",     lprocfs_filter_rd_degraded,
                           lprocfs_filter_wr_degraded, 0 },
+        { "sync_journal", lprocfs_filter_rd_syncjournal,
+                          lprocfs_filter_wr_syncjournal, 0 },
+        { "sync_on_lock_cancel", lprocfs_filter_rd_sync_lock_cancel,
+                                 lprocfs_filter_wr_sync_lock_cancel, 0 },
         { 0 }
 };
 
index 29568a7..167e3eb 100644 (file)
@@ -213,14 +213,15 @@ static int osc_page_cache_add(const struct lu_env *env,
         struct osc_page   *opg = cl2osc_page(slice);
         struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
         int result;
-        int brw_flags;
+        /* All cacheable IO is async-capable */
+        int brw_flags = OBD_BRW_ASYNC;
         int noquota = 0;
 
         LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0));
         ENTRY;
 
         /* Set the OBD_BRW_SRVLOCK before the page is queued. */
-        brw_flags = opg->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
+        brw_flags |= opg->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
         if (!client_is_remote(osc_export(obj)) &&
             cfs_capable(CFS_CAP_SYS_RESOURCE)) {
                 brw_flags |= OBD_BRW_NOQUOTA;
index 6a21fc2..b8914c3 100644 (file)
@@ -1177,7 +1177,7 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
 {
         if (p1->flag != p2->flag) {
                 unsigned mask = ~(OBD_BRW_FROM_GRANT|
-                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC);
+                                  OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
 
                 /* warn if we try to combine flags that we don't know to be
                  * safe to combine */
@@ -2216,7 +2216,7 @@ static int brw_interpret(const struct lu_env *env,
                 }
                 OBDO_FREE(aa->aa_oa);
         } else { /* from async_internal() */
-                int i;
+                obd_count i;
                 for (i = 0; i < aa->aa_page_count; i++)
                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
 
@@ -2229,6 +2229,7 @@ static int brw_interpret(const struct lu_env *env,
         if (!async)
                 cl_req_completion(env, aa->aa_clerq, rc);
         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
+
         RETURN(rc);
 }
 
index 2d00451..e2cde0b 100644 (file)
@@ -1323,14 +1323,6 @@ out_bulk:
         if (desc)
                 ptlrpc_free_bulk(desc);
 out:
-       /* XXX: don't send reply if obd rdonly mode, this can cause data loss
-        * on client, see bug 22190. Remove this when async bulk will be done.
-        * Meanwhile, if this is umount then don't reply anything. */
-        if (req->rq_export->exp_obd->obd_no_transno) {
-                no_reply = req->rq_export->exp_obd->obd_stopping;
-                rc = -EIO;
-        }
-
         if (rc == 0) {
                 oti_to_request(oti, req);
                 target_committed_to_req(req);
@@ -1731,6 +1723,45 @@ static int ost_connect_check_sptlrpc(struct ptlrpc_request *req)
         return rc;
 }
 
+/* Ensure that data and metadata are synced to the disk when lock is cancelled
+ * (if requested) */
+int ost_blocking_ast(struct ldlm_lock *lock,
+                             struct ldlm_lock_desc *desc,
+                             void *data, int flag)
+{
+        __u32 sync_lock_cancel = 0;
+        __u32 len = sizeof(sync_lock_cancel);
+        int rc = 0;
+        ENTRY;
+
+        rc = obd_get_info(lock->l_export, sizeof(KEY_SYNC_LOCK_CANCEL),
+                          KEY_SYNC_LOCK_CANCEL, &len, &sync_lock_cancel, NULL);
+
+        if (!rc && flag == LDLM_CB_CANCELING &&
+            (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) &&
+            (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
+             (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
+              lock->l_flags & LDLM_FL_CBPENDING))) {
+                struct obdo *oa;
+                int rc;
+
+                OBDO_ALLOC(oa);
+                oa->o_id = lock->l_resource->lr_name.name[0];
+                oa->o_seq = lock->l_resource->lr_name.name[1];
+                oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP;
+
+                rc = obd_sync(lock->l_export, oa, NULL,
+                              lock->l_policy_data.l_extent.start,
+                              lock->l_policy_data.l_extent.end, NULL);
+                if (rc)
+                        CERROR("Error %d syncing data on lock cancel\n", rc);
+
+                OBDO_FREE(oa);
+        }
+
+        return ldlm_server_blocking_ast(lock, desc, data, flag);
+}
+
 static int ost_filter_recovery_request(struct ptlrpc_request *req,
                                        struct obd_device *obd, int *process)
 {
@@ -2366,7 +2397,7 @@ int ost_handle(struct ptlrpc_request *req)
                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE))
                         RETURN(0);
                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
-                                         ldlm_server_blocking_ast,
+                                         ost_blocking_ast,
                                          ldlm_server_glimpse_ast);
                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
                 break;
index 4e1a4de..7a26e90 100644 (file)
@@ -194,6 +194,7 @@ void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
 
         desc->bd_nob += len;
 
+        cfs_page_pin(page);
         ptlrpc_add_bulk_page(desc, page, pageoffset, len);
 }
 
@@ -203,6 +204,7 @@ void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
  */
 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
 {
+        int i;
         ENTRY;
 
         LASSERT(desc != NULL);
@@ -217,6 +219,9 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
         else
                 class_import_put(desc->bd_import);
 
+        for (i = 0; i < desc->bd_iov_count ; i++)
+                cfs_page_unpin(desc->bd_iov[i].kiov_page);
+
         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
                                 bd_iov[desc->bd_max_iov]));
         EXIT;
@@ -1306,6 +1311,10 @@ static int after_reply(struct ptlrpc_request *req)
                                 lustre_msg_get_last_committed(req->rq_repmsg);
                 }
                 ptlrpc_free_committed(imp);
+
+                if (req->rq_transno > imp->imp_peer_committed_transno)
+                        ptlrpc_pinger_commit_expected(imp);
+
                 cfs_spin_unlock(&imp->imp_lock);
         }
 
@@ -2576,8 +2585,6 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         ENTRY;
 
         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
-        /* Not handling automatic bulk replay yet (or ever?) */
-        LASSERT(req->rq_bulk == NULL);
 
         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
         aa = ptlrpc_req_async_args(req);
index 537be4f..515014e 100644 (file)
@@ -254,10 +254,14 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
 
         /* XXX Registering the same xid on retried bulk makes my head
          * explode trying to understand how the original request's bulk
-         * might interfere with the retried request -eeb */
-        LASSERTF (!desc->bd_registered || req->rq_xid != desc->bd_last_xid,
-                  "registered: %d  rq_xid: "LPU64" bd_last_xid: "LPU64"\n",
-                  desc->bd_registered, req->rq_xid, desc->bd_last_xid);
+         * might interfere with the retried request -eeb
+         * On the other hand replaying with the same xid is fine, since
+         * we are guaranteed old request have completed. -green */
+        LASSERTF(!(desc->bd_registered &&
+                 req->rq_send_state != LUSTRE_IMP_REPLAY) ||
+                 req->rq_xid != desc->bd_last_xid,
+                 "registered: %d  rq_xid: "LPU64" bd_last_xid: "LPU64"\n",
+                 desc->bd_registered, req->rq_xid, desc->bd_last_xid);
         desc->bd_registered = 1;
         desc->bd_last_xid = req->rq_xid;
 
index d2f56f5..821f55c 100644 (file)
@@ -105,10 +105,10 @@ int ptlrpc_ping(struct obd_import *imp)
         RETURN(0);
 }
 
-void ptlrpc_update_next_ping(struct obd_import *imp)
+void ptlrpc_update_next_ping(struct obd_import *imp, int soon)
 {
 #ifdef ENABLE_PINGER
-        int time = PING_INTERVAL;
+        int time = soon ? PING_INTERVAL_SHORT : PING_INTERVAL;
         if (imp->imp_state == LUSTRE_IMP_DISCON) {
                 int dtime = max_t(int, CONNECTION_SWITCH_MIN,
                                   AT_OFF ? 0 :
@@ -296,7 +296,7 @@ static int ptlrpc_pinger_main(void *arg)
                             cfs_time_after(imp->imp_next_ping,
                                            cfs_time_add(this_ping,
                                                         cfs_time_seconds(PING_INTERVAL))))
-                                ptlrpc_update_next_ping(imp);
+                                ptlrpc_update_next_ping(imp, 0);
                 }
                 cfs_mutex_up(&pinger_sem);
                 /* update memory usage info */
@@ -406,7 +406,12 @@ int ptlrpc_stop_pinger(void)
 
 void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
 {
-        ptlrpc_update_next_ping(imp);
+        ptlrpc_update_next_ping(imp, 0);
+}
+
+void ptlrpc_pinger_commit_expected(struct obd_import *imp)
+{
+        ptlrpc_update_next_ping(imp, 1);
 }
 
 int ptlrpc_pinger_add_import(struct obd_import *imp)
@@ -420,7 +425,7 @@ int ptlrpc_pinger_add_import(struct obd_import *imp)
                imp->imp_obd->obd_uuid.uuid, obd2cli_tgt(imp->imp_obd));
         /* if we add to pinger we want recovery on this import */
         imp->imp_obd->obd_no_recov = 0;
-        ptlrpc_update_next_ping(imp);
+        ptlrpc_update_next_ping(imp, 0);
         /* XXX sort, blah blah */
         cfs_list_add_tail(&imp->imp_pinger_chain, &pinger_imports);
         class_import_get(imp);
@@ -902,7 +907,7 @@ void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
 {
 #ifdef ENABLE_PINGER
         cfs_mutex_down(&pinger_sem);
-        ptlrpc_update_next_ping(imp);
+        ptlrpc_update_next_ping(imp, 0);
         if (pinger_args.pd_set == NULL &&
             cfs_time_before(imp->imp_next_ping, pinger_args.pd_next_ping)) {
                 CDEBUG(D_HA, "set next ping to "CFS_TIME_T"(cur "CFS_TIME_T")\n",
@@ -913,6 +918,21 @@ void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
 #endif
 }
 
+void ptlrpc_pinger_commit_expected(struct obd_import *imp)
+{
+#ifdef ENABLE_PINGER
+        cfs_mutex_down(&pinger_sem);
+        ptlrpc_update_next_ping(imp, 1);
+        if (pinger_args.pd_set == NULL &&
+            cfs_time_before(imp->imp_next_ping, pinger_args.pd_next_ping)) {
+                CDEBUG(D_HA,"set next ping to "CFS_TIME_T"(cur "CFS_TIME_T")\n",
+                        imp->imp_next_ping, cfs_time_current());
+                pinger_args.pd_next_ping = imp->imp_next_ping;
+        }
+        cfs_mutex_up(&pinger_sem);
+#endif
+}
+
 int ptlrpc_add_timeout_client(int time, enum timeout_event event,
                               timeout_cb_t cb, void *data,
                               cfs_list_t *obd_list)
index 9186d5d..57bf5ba 100644 (file)
@@ -96,6 +96,7 @@ void lustre_put_emerg_rs(struct ptlrpc_reply_state *rs);
 int ptlrpc_start_pinger(void);
 int ptlrpc_stop_pinger(void);
 void ptlrpc_pinger_sending_on_import(struct obd_import *imp);
+void ptlrpc_pinger_commit_expected(struct obd_import *imp);
 void ptlrpc_pinger_wake_up(void);
 void ptlrpc_ping_import_soon(struct obd_import *imp);
 #ifdef __KERNEL__
index bdb9f40..1d073ec 100755 (executable)
@@ -2074,19 +2074,6 @@ test_84a() {
 }
 run_test 84a "stale open during export disconnect"
 
-test_85() { # bug 22190
-    local fail=0
-    do_facet ost1 "lctl set_param -n obdfilter.${ost1_svc}.sync_journal 1"
-
-    replay_barrier ost1
-    lfs setstripe -i 0 -c 1 $DIR/$tfile
-    dd oflag=dsync if=/dev/urandom of=$DIR/$tfile bs=4k count=100 || fail=1
-    fail_abort ost1
-    echo "FAIL $fail"
-    [ $fail -ne 0 ] || error "Write was successful"
-}
-run_test 85 "ensure there is no reply on bulk write if obd is in rdonly mode"
-
 test_86() {
         local clients=${CLIENTS:-$HOSTNAME}
 
@@ -2097,6 +2084,42 @@ test_86() {
 }
 run_test 86 "umount server after clear nid_stats should not hit LBUG"
 
+test_87() {
+    do_facet ost1 "lctl set_param -n obdfilter.${ost1_svc}.sync_journal 0"
+
+    replay_barrier ost1
+    lfs setstripe -i 0 -c 1 $DIR/$tfile
+    dd if=/dev/urandom of=$DIR/$tfile bs=1024k count=8 || error "Cannot write"
+    cksum=`md5sum $DIR/$tfile | awk '{print $1}'`
+    cancel_lru_locks osc
+    fail ost1
+    dd if=$DIR/$tfile of=/dev/null bs=1024k count=8 || error "Cannot read"
+    cksum2=`md5sum $DIR/$tfile | awk '{print $1}'`
+    if [ $cksum != $cksum2 ] ; then
+       error "New checksum $cksum2 does not match original $cksum"
+    fi
+}
+run_test 87 "write replay"
+
+test_87b() {
+    do_facet ost1 "lctl set_param -n obdfilter.${ost1_svc}.sync_journal 0"
+
+    replay_barrier ost1
+    lfs setstripe -i 0 -c 1 $DIR/$tfile
+    dd if=/dev/urandom of=$DIR/$tfile bs=1024k count=8 || error "Cannot write"
+    sleep 1 # Give it a chance to flush dirty data
+    echo TESTTEST | dd of=$DIR/$tfile bs=1 count=8 seek=64
+    cksum=`md5sum $DIR/$tfile | awk '{print $1}'`
+    cancel_lru_locks osc
+    fail ost1
+    dd if=$DIR/$tfile of=/dev/null bs=1024k count=8 || error "Cannot read"
+    cksum2=`md5sum $DIR/$tfile | awk '{print $1}'`
+    if [ $cksum != $cksum2 ] ; then
+       error "New checksum $cksum2 does not match original $cksum"
+    fi
+}
+run_test 87b "write replay with changed data (checksum resend)"
+
 equals_msg `basename $0`: test complete, cleaning up
 check_and_cleanup_lustre
 [ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG && grep -q FAIL $TESTSUITELOG && exit 1 || true