Whamcloud - gitweb
LU-7039 llog: update llog header and size 69/16969/35
author: Di Wang <di.wang@intel.com>
Mon, 26 Oct 2015 10:02:29 +0000 (03:02 -0700)
committer: Oleg Drokin <oleg.drokin@intel.com>
Tue, 26 Jan 2016 20:20:00 +0000 (20:20 +0000)
Once an update request fails due to eviction or other failures,
all of the update requests in the sending list should return failure,
because after the failure, the update log in the following
requests will have a wrong llog bitmap. So once this happens, it
will:

1. Invalidate all of the requests in the sending list.
2. lod_sub will update the llog header from the remote target.
3. Then the sending list can accept new requests.

This change also includes a few other fixes for llog corruption:

1. The size in the OSP cache is not safe, because no lock
protects it. So we add lgh_write_offset to the loghandle to
track the write offset for the remote update llog, and revalidate
the offset when updating the llog header.

2. Roll back the lgh_index and bitmap if adding new records
fails.

Add replay-single.sh test 118 to verify this case.

Signed-off-by: Di Wang <di.wang@intel.com>
Change-Id: I2d3a700d3363867ac60aeb6b7641eceb65dfe12a
Reviewed-on: http://review.whamcloud.com/16969
Tested-by: Jenkins
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
14 files changed:
lustre/include/lustre_log.h
lustre/include/obd_support.h
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_osd.c
lustre/osp/osp_dev.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/osp/osp_trans.c
lustre/target/out_handler.c
lustre/target/update_trans.c
lustre/tests/replay-single.sh
lustre/tests/sanity.sh

index b526aaa..71bff91 100644 (file)
@@ -104,6 +104,8 @@ int llog_is_empty(const struct lu_env *env, struct llog_ctxt *ctxt,
 int llog_backup(const struct lu_env *env, struct obd_device *obd,
                struct llog_ctxt *ctxt, struct llog_ctxt *bak_ctxt,
                char *name, char *backup);
 int llog_backup(const struct lu_env *env, struct obd_device *obd,
                struct llog_ctxt *ctxt, struct llog_ctxt *bak_ctxt,
                char *name, char *backup);
+int llog_read_header(const struct lu_env *env, struct llog_handle *handle,
+                    const struct obd_uuid *uuid);
 
 /* llog_process flags */
 #define LLOG_FLAG_NODEAMON 0x0001
 
 /* llog_process flags */
 #define LLOG_FLAG_NODEAMON 0x0001
@@ -271,6 +273,8 @@ struct llog_handle {
        int                      lgh_last_idx;
        int                      lgh_cur_idx; /* used during llog_process */
        __u64                    lgh_cur_offset; /* used during llog_process */
        int                      lgh_last_idx;
        int                      lgh_cur_idx; /* used during llog_process */
        __u64                    lgh_cur_offset; /* used during llog_process */
+       /* used during llog_osd_write_rec */
+       __u64                    lgh_write_offset;
        struct llog_ctxt        *lgh_ctxt;
        union {
                struct plain_handle_data         phd;
        struct llog_ctxt        *lgh_ctxt;
        union {
                struct plain_handle_data         phd;
@@ -280,6 +284,8 @@ struct llog_handle {
        void                    *private_data;
        struct llog_operations  *lgh_logops;
        atomic_t                 lgh_refcount;
        void                    *private_data;
        struct llog_operations  *lgh_logops;
        atomic_t                 lgh_refcount;
+
+       __u32                   lgh_stale:1;
 };
 
 /* llog_osd.c */
 };
 
 /* llog_osd.c */
index c1056ee..aedd2c8 100644 (file)
@@ -568,6 +568,7 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_SPLIT_UPDATE_REC      0x1702
 #define OBD_FAIL_LARGE_STRIPE          0x1703
 #define OBD_FAIL_OUT_ENOSPC             0x1704
 #define OBD_FAIL_SPLIT_UPDATE_REC      0x1702
 #define OBD_FAIL_LARGE_STRIPE          0x1703
 #define OBD_FAIL_OUT_ENOSPC             0x1704
+#define OBD_FAIL_INVALIDATE_UPDATE     0x1705
 
 /* MIGRATE */
 #define OBD_FAIL_MIGRATE_NET_REP               0x1800
 
 /* MIGRATE */
 #define OBD_FAIL_MIGRATE_NET_REP               0x1800
index 4f46275..56e941e 100644 (file)
@@ -291,12 +291,12 @@ out_trans:
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-static int llog_read_header(const struct lu_env *env,
-                           struct llog_handle *handle,
-                           struct obd_uuid *uuid)
+int llog_read_header(const struct lu_env *env, struct llog_handle *handle,
+                    const struct obd_uuid *uuid)
 {
        struct llog_operations *lop;
        int rc;
 {
        struct llog_operations *lop;
        int rc;
+       ENTRY;
 
        rc = llog_handle2ops(handle, &lop);
        if (rc)
 
        rc = llog_handle2ops(handle, &lop);
        if (rc)
@@ -311,6 +311,7 @@ static int llog_read_header(const struct lu_env *env,
 
                /* lrh_len should be initialized in llog_init_handle */
                handle->lgh_last_idx = 0; /* header is record with index 0 */
 
                /* lrh_len should be initialized in llog_init_handle */
                handle->lgh_last_idx = 0; /* header is record with index 0 */
+               handle->lgh_write_offset = 0;
                llh->llh_count = 1;         /* for the header record */
                llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
                LASSERT(handle->lgh_ctxt->loc_chunk_size >=
                llh->llh_count = 1;         /* for the header record */
                llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
                LASSERT(handle->lgh_ctxt->loc_chunk_size >=
@@ -322,13 +323,19 @@ static int llog_read_header(const struct lu_env *env,
                        memcpy(&llh->llh_tgtuuid, uuid,
                               sizeof(llh->llh_tgtuuid));
                llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
                        memcpy(&llh->llh_tgtuuid, uuid,
                               sizeof(llh->llh_tgtuuid));
                llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
+               /* Since update llog header might also call this function,
+                * let's reset the bitmap to 0 here */
+               memset(LLOG_HDR_BITMAP(llh), 0, llh->llh_hdr.lrh_len -
+                                               llh->llh_bitmap_offset -
+                                               sizeof(llh->llh_tail));
                ext2_set_bit(0, LLOG_HDR_BITMAP(llh));
                LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len;
                LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index;
                rc = 0;
        }
                ext2_set_bit(0, LLOG_HDR_BITMAP(llh));
                LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len;
                LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index;
                rc = 0;
        }
-       return rc;
+       RETURN(rc);
 }
 }
+EXPORT_SYMBOL(llog_read_header);
 
 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
                     int flags, struct obd_uuid *uuid)
 
 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
                     int flags, struct obd_uuid *uuid)
@@ -571,17 +578,30 @@ out:
                cd->lpcd_last_idx = last_called_index;
 
        if (unlikely(rc == -EIO && loghandle->lgh_obj != NULL)) {
                cd->lpcd_last_idx = last_called_index;
 
        if (unlikely(rc == -EIO && loghandle->lgh_obj != NULL)) {
-               /* something bad happened to the processing of a local
-                * llog file, probably I/O error or the log got corrupted..
-                * to be able to finally release the log we discard any
-                * remaining bits in the header */
-               CERROR("Local llog found corrupted\n");
-               while (index <= last_index) {
-                       if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh)) != 0)
-                               llog_cancel_rec(lpi->lpi_env, loghandle, index);
-                       index++;
+               if (dt_object_remote(loghandle->lgh_obj)) {
+                       /* If it is remote object, then -EIO might means
+                        * disconnection or eviction, let's return -EAGAIN,
+                        * so for update recovery log processing, it will
+                        * retry until the umount or abort recovery, see
+                        * lod_sub_recovery_thread() */
+                       CERROR("%s retry remote llog process\n",
+                              loghandle->lgh_ctxt->loc_obd->obd_name);
+                       rc = -EAGAIN;
+               } else {
+                       /* something bad happened to the processing of a local
+                        * llog file, probably I/O error or the log got
+                        * corrupted to be able to finally release the log we
+                        * discard any remaining bits in the header */
+                       CERROR("Local llog found corrupted\n");
+                       while (index <= last_index) {
+                               if (ext2_test_bit(index,
+                                                 LLOG_HDR_BITMAP(llh)) != 0)
+                                       llog_cancel_rec(lpi->lpi_env, loghandle,
+                                                       index);
+                               index++;
+                       }
+                       rc = 0;
                }
                }
-               rc = 0;
        }
 
        OBD_FREE_LARGE(buf, chunk_size);
        }
 
        OBD_FREE_LARGE(buf, chunk_size);
index 82de341..9f1f4ea 100644 (file)
@@ -176,10 +176,11 @@ static int llog_cat_new_log(const struct lu_env *env,
 
        loghandle->lgh_hdr->llh_cat_idx = rec->lid_hdr.lrh_index;
 out:
 
        loghandle->lgh_hdr->llh_cat_idx = rec->lid_hdr.lrh_index;
 out:
-       if (handle != NULL)
+       if (handle != NULL) {
+               handle->th_result = rc >= 0 ? 0 : rc;
                dt_trans_stop(env, dt, handle);
                dt_trans_stop(env, dt, handle);
-
-       RETURN(0);
+       }
+       RETURN(rc);
 
 out_destroy:
        /* to signal llog_cat_close() it shouldn't try to destroy the llog,
 
 out_destroy:
        /* to signal llog_cat_close() it shouldn't try to destroy the llog,
@@ -382,6 +383,40 @@ next:
        RETURN(loghandle);
 }
 
        RETURN(loghandle);
 }
 
+static int llog_cat_update_header(const struct lu_env *env,
+                          struct llog_handle *cathandle)
+{
+       struct llog_handle *loghandle;
+       int rc;
+       ENTRY;
+
+       /* refresh llog */
+       down_write(&cathandle->lgh_lock);
+       if (!cathandle->lgh_stale) {
+               up_write(&cathandle->lgh_lock);
+               RETURN(0);
+       }
+       list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
+                           u.phd.phd_entry) {
+               if (!llog_exist(loghandle))
+                       continue;
+
+               rc = llog_read_header(env, loghandle, NULL);
+               if (rc != 0) {
+                       up_write(&cathandle->lgh_lock);
+                       GOTO(out, rc);
+               }
+       }
+       rc = llog_read_header(env, cathandle, NULL);
+       if (rc == 0)
+               cathandle->lgh_stale = 0;
+       up_write(&cathandle->lgh_lock);
+       if (rc != 0)
+               GOTO(out, rc);
+out:
+       RETURN(rc);
+}
+
 /* Add a single record to the recovery log(s) using a catalog
  * Returns as llog_write_record
  *
 /* Add a single record to the recovery log(s) using a catalog
  * Returns as llog_write_record
  *
@@ -495,17 +530,31 @@ int llog_cat_declare_add_rec(const struct lu_env *env,
                         * on the success of this transaction. So let's
                         * create the llog object synchronously here to
                         * remove the dependency. */
                         * on the success of this transaction. So let's
                         * create the llog object synchronously here to
                         * remove the dependency. */
+create_again:
                        down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
                        loghandle = cathandle->u.chd.chd_current_log;
                        down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
                        down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
                        loghandle = cathandle->u.chd.chd_current_log;
                        down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
-                       if (!llog_exist(loghandle))
+                       if (cathandle->lgh_stale) {
+                               up_write(&loghandle->lgh_lock);
+                               up_read(&cathandle->lgh_lock);
+                               GOTO(out, rc = -EIO);
+                       }
+                       if (!llog_exist(loghandle)) {
                                rc = llog_cat_new_log(env, cathandle, loghandle,
                                                      NULL);
                                rc = llog_cat_new_log(env, cathandle, loghandle,
                                                      NULL);
+                               if (rc == -ESTALE)
+                                       cathandle->lgh_stale = 1;
+                       }
                        up_write(&loghandle->lgh_lock);
                        up_read(&cathandle->lgh_lock);
                        up_write(&loghandle->lgh_lock);
                        up_read(&cathandle->lgh_lock);
-                       if (rc < 0)
+                       if (rc == -ESTALE) {
+                               rc = llog_cat_update_header(env, cathandle);
+                               if (rc != 0)
+                                       GOTO(out, rc);
+                               goto create_again;
+                       } else if (rc < 0) {
                                GOTO(out, rc);
                                GOTO(out, rc);
-
+                       }
                } else {
                        rc = llog_declare_create(env,
                                        cathandle->u.chd.chd_current_log, th);
                } else {
                        rc = llog_declare_create(env,
                                        cathandle->u.chd.chd_current_log, th);
@@ -515,11 +564,27 @@ int llog_cat_declare_add_rec(const struct lu_env *env,
                                               &lirec->lid_hdr, -1, th);
                }
        }
                                               &lirec->lid_hdr, -1, th);
                }
        }
+
+write_again:
        /* declare records in the llogs */
        rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
                                    rec, -1, th);
        /* declare records in the llogs */
        rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
                                    rec, -1, th);
-       if (rc)
+       if (rc == -ESTALE) {
+               down_write(&cathandle->lgh_lock);
+               if (cathandle->lgh_stale) {
+                       up_write(&cathandle->lgh_lock);
+                       GOTO(out, rc = -EIO);
+               }
+
+               cathandle->lgh_stale = 1;
+               up_write(&cathandle->lgh_lock);
+               rc = llog_cat_update_header(env, cathandle);
+               if (rc != 0)
+                       GOTO(out, rc);
+               goto write_again;
+       } else if (rc < 0) {
                GOTO(out, rc);
                GOTO(out, rc);
+       }
 
        next = cathandle->u.chd.chd_next_log;
        if (next) {
 
        next = cathandle->u.chd.chd_next_log;
        if (next) {
index 0503e66..8432d39 100644 (file)
@@ -280,6 +280,7 @@ static int llog_osd_read_header(const struct lu_env *env,
 
        handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
        handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
 
        handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
        handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
+       handle->lgh_write_offset = lgi->lgi_attr.la_size;
 
        RETURN(0);
 }
 
        RETURN(0);
 }
@@ -382,7 +383,8 @@ static int llog_osd_write_rec(const struct lu_env *env,
        struct dt_object        *o;
        __u32                   chunk_size;
        size_t                   left;
        struct dt_object        *o;
        __u32                   chunk_size;
        size_t                   left;
-
+       __u32                   orig_last_idx;
+       __u64                   orig_write_offset;
        ENTRY;
 
        LASSERT(env);
        ENTRY;
 
        LASSERT(env);
@@ -551,6 +553,8 @@ static int llog_osd_write_rec(const struct lu_env *env,
                RETURN(-ENOSPC);
 
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
                RETURN(-ENOSPC);
 
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
+       orig_last_idx = loghandle->lgh_last_idx;
+       orig_write_offset = loghandle->lgh_write_offset;
        lgi->lgi_off = lgi->lgi_attr.la_size;
        left = chunk_size - (lgi->lgi_off & (chunk_size - 1));
        /* NOTE: padding is a record, but no bit is set */
        lgi->lgi_off = lgi->lgi_attr.la_size;
        left = chunk_size - (lgi->lgi_off & (chunk_size - 1));
        /* NOTE: padding is a record, but no bit is set */
@@ -560,6 +564,10 @@ static int llog_osd_write_rec(const struct lu_env *env,
                rc = llog_osd_pad(env, o, &lgi->lgi_off, left, index, th);
                if (rc)
                        RETURN(rc);
                rc = llog_osd_pad(env, o, &lgi->lgi_off, left, index, th);
                if (rc)
                        RETURN(rc);
+
+               if (dt_object_remote(o))
+                       loghandle->lgh_write_offset = lgi->lgi_off;
+
                loghandle->lgh_last_idx++; /* for pad rec */
        }
        /* if it's the last idx in log file, then return -ENOSPC
                loghandle->lgh_last_idx++; /* for pad rec */
        }
        /* if it's the last idx in log file, then return -ENOSPC
@@ -661,6 +669,9 @@ out_unlock:
         * records. This also allows to handle Catalog wrap around case */
        if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
                lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
         * records. This also allows to handle Catalog wrap around case */
        if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
                lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
+       } else if (dt_object_remote(o)) {
+               lgi->lgi_off = max_t(__u64, loghandle->lgh_write_offset,
+                                    lgi->lgi_off);
        } else {
                rc = dt_attr_get(env, o, &lgi->lgi_attr);
                if (rc)
        } else {
                rc = dt_attr_get(env, o, &lgi->lgi_attr);
                if (rc)
@@ -677,8 +688,11 @@ out_unlock:
        if (rc < 0)
                GOTO(out, rc);
 
        if (rc < 0)
                GOTO(out, rc);
 
-       CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u off"LPU64"\n",
-              POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len,
+       if (dt_object_remote(o))
+               loghandle->lgh_write_offset = lgi->lgi_off;
+
+       CDEBUG(D_HA, "added record "DFID": idx: %u, %u off"LPU64"\n",
+              PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len,
               lgi->lgi_off);
        if (reccookie != NULL) {
                reccookie->lgc_lgl = loghandle->lgh_id;
               lgi->lgi_off);
        if (reccookie != NULL) {
                reccookie->lgc_lgl = loghandle->lgh_id;
@@ -701,11 +715,15 @@ out:
        mutex_unlock(&loghandle->lgh_hdr_mutex);
 
        /* restore llog last_idx */
        mutex_unlock(&loghandle->lgh_hdr_mutex);
 
        /* restore llog last_idx */
-       if (--loghandle->lgh_last_idx == 0 &&
+       if (dt_object_remote(o)) {
+               loghandle->lgh_last_idx = orig_last_idx;
+               loghandle->lgh_write_offset = orig_write_offset;
+       } else if (--loghandle->lgh_last_idx == 0 &&
            (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
                /* catalog had just wrap-around case */
                loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
        }
            (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
                /* catalog had just wrap-around case */
                loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
        }
+
        LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
 
        RETURN(rc);
        LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
 
        RETURN(rc);
@@ -822,9 +840,6 @@ static int llog_osd_next_block(const struct lu_env *env,
        if (len == 0 || len & (chunk_size - 1))
                RETURN(-EINVAL);
 
        if (len == 0 || len & (chunk_size - 1))
                RETURN(-EINVAL);
 
-       CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n",
-              next_idx, *cur_idx, *cur_offset);
-
        LASSERT(loghandle);
        LASSERT(loghandle->lgh_ctxt);
 
        LASSERT(loghandle);
        LASSERT(loghandle->lgh_ctxt);
 
@@ -838,6 +853,10 @@ static int llog_osd_next_block(const struct lu_env *env,
        if (rc)
                GOTO(out, rc);
 
        if (rc)
                GOTO(out, rc);
 
+       CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off"
+              LPU64"), size %llu\n", next_idx, *cur_idx,
+              *cur_offset, lgi->lgi_attr.la_size);
+
        while (*cur_offset < lgi->lgi_attr.la_size) {
                struct llog_rec_hdr     *rec, *last_rec;
                struct llog_rec_tail    *tail;
        while (*cur_offset < lgi->lgi_attr.la_size) {
                struct llog_rec_hdr     *rec, *last_rec;
                struct llog_rec_tail    *tail;
@@ -899,7 +918,16 @@ static int llog_osd_next_block(const struct lu_env *env,
 
                if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
                        lustre_swab_llog_rec(last_rec);
 
                if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
                        lustre_swab_llog_rec(last_rec);
-               LASSERT(last_rec->lrh_index == tail->lrt_index);
+
+               if (last_rec->lrh_index != tail->lrt_index) {
+                       CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
+                              "offset "LPU64" last_rec idx %u tail idx %u\n",
+                              o->do_lu.lo_dev->ld_obd->obd_name,
+                              POSTID(&loghandle->lgh_id.lgl_oi),
+                              loghandle->lgh_id.lgl_ogen, *cur_offset,
+                              last_rec->lrh_index, tail->lrt_index);
+                       GOTO(out, rc = -EINVAL);
+               }
 
                *cur_idx = tail->lrt_index;
 
 
                *cur_idx = tail->lrt_index;
 
index e4af6b6..b2a6da9 100644 (file)
@@ -1614,6 +1614,9 @@ static int osp_import_event(struct obd_device *obd, struct obd_import *imp,
                CDEBUG(D_HA, "got connected\n");
                break;
        case IMP_EVENT_INVALIDATE:
                CDEBUG(D_HA, "got connected\n");
                break;
        case IMP_EVENT_INVALIDATE:
+               if (d->opd_connect_mdt)
+                       osp_invalidate_request(d);
+
                if (obd->obd_namespace == NULL)
                        break;
                ldlm_namespace_cleanup(obd->obd_namespace, LDLM_FL_LOCAL_ONLY);
                if (obd->obd_namespace == NULL)
                        break;
                ldlm_namespace_cleanup(obd->obd_namespace, LDLM_FL_LOCAL_ONLY);
index 2a32244..727cf6e 100644 (file)
@@ -101,9 +101,6 @@ struct osp_update_request_sub {
        struct list_head                ours_list;
 };
 
        struct list_head                ours_list;
 };
 
-/**
- * Tracking the updates being executed on this dt_device.
- */
 struct osp_update_request {
        int                             our_flags;
        /* update request result */
 struct osp_update_request {
        int                             our_flags;
        /* update request result */
@@ -118,10 +115,15 @@ struct osp_update_request {
 
        /* points to thandle if this update request belongs to one */
        struct osp_thandle              *our_th;
 
        /* points to thandle if this update request belongs to one */
        struct osp_thandle              *our_th;
+
+       __u64                           our_version;
+       /* protect our_list and flag */
+       spinlock_t                      our_list_lock;
        /* linked to the list(ou_list) in osp_updates */
        struct list_head                our_list;
        __u32                           our_batchid;
        /* linked to the list(ou_list) in osp_updates */
        struct list_head                our_list;
        __u32                           our_batchid;
-       __u32                           our_req_sent:1;
+       __u32                           our_req_ready:1;
+
 };
 
 struct osp_updates {
 };
 
 struct osp_updates {
@@ -288,7 +290,8 @@ struct osp_object {
        struct lu_object_header opo_header;
        struct dt_object        opo_obj;
        unsigned int            opo_reserved:1,
        struct lu_object_header opo_header;
        struct dt_object        opo_obj;
        unsigned int            opo_reserved:1,
-                               opo_non_exist:1;
+                               opo_non_exist:1,
+                               opo_stale:1;
 
        /* read/write lock for md osp object */
        struct rw_semaphore     opo_sem;
 
        /* read/write lock for md osp object */
        struct rw_semaphore     opo_sem;
@@ -353,7 +356,6 @@ struct osp_thandle {
        struct list_head         ot_stop_dcb_list;
        struct osp_update_request *ot_our;
        atomic_t                 ot_refcount;
        struct list_head         ot_stop_dcb_list;
        struct osp_update_request *ot_our;
        atomic_t                 ot_refcount;
-       __u64                    ot_version;
 };
 
 static inline struct osp_thandle *
 };
 
 static inline struct osp_thandle *
@@ -688,7 +690,8 @@ struct osp_update_request *osp_update_request_create(struct dt_device *dt);
 void osp_update_request_destroy(struct osp_update_request *update);
 
 int osp_send_update_thread(void *arg);
 void osp_update_request_destroy(struct osp_update_request *update);
 
 int osp_send_update_thread(void *arg);
-int osp_check_and_set_rpc_version(struct osp_thandle *oth);
+int osp_check_and_set_rpc_version(struct osp_thandle *oth,
+                                 struct osp_object *obj);
 
 void osp_thandle_destroy(struct osp_thandle *oth);
 static inline void osp_thandle_get(struct osp_thandle *oth)
 
 void osp_thandle_destroy(struct osp_thandle *oth);
 static inline void osp_thandle_get(struct osp_thandle *oth)
@@ -714,6 +717,7 @@ struct thandle *osp_get_storage_thandle(const struct lu_env *env,
                                        struct osp_device *osp);
 void osp_trans_callback(const struct lu_env *env,
                        struct osp_thandle *oth, int rc);
                                        struct osp_device *osp);
 void osp_trans_callback(const struct lu_env *env,
                        struct osp_thandle *oth, int rc);
+void osp_invalidate_request(struct osp_device *osp);
 /* osp_object.c */
 int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
                 struct lu_attr *attr);
 /* osp_object.c */
 int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
                 struct lu_attr *attr);
index f9afd25..26996f6 100644 (file)
@@ -1068,7 +1068,20 @@ static ssize_t osp_md_declare_write(const struct lu_env *env,
                                    const struct lu_buf *buf,
                                    loff_t pos, struct thandle *th)
 {
                                    const struct lu_buf *buf,
                                    loff_t pos, struct thandle *th)
 {
-       return osp_trans_update_request_create(th);
+       struct osp_device *osp = dt2osp_dev(th->th_dev);
+       int rc;
+
+       rc = osp_trans_update_request_create(th);
+       if (rc != 0)
+               return rc;
+
+       if (osp->opd_update == NULL)
+               return 0;
+
+       if (dt2osp_obj(dt)->opo_stale)
+               return -ESTALE;
+
+       return 0;
 }
 
 /**
 }
 
 /**
@@ -1100,13 +1113,17 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
        update = thandle_to_osp_update_request(th);
        LASSERT(update != NULL);
 
        update = thandle_to_osp_update_request(th);
        LASSERT(update != NULL);
 
+       CDEBUG(D_INFO, "write "DFID" offset = "LPU64" length = %zu\n",
+              PFID(lu_object_fid(&dt->do_lu)), *pos, buf->lb_len);
+
        rc = osp_update_rpc_pack(env, write, update, OUT_WRITE,
                                 lu_object_fid(&dt->do_lu), buf, *pos);
        if (rc < 0)
                RETURN(rc);
 
        rc = osp_update_rpc_pack(env, write, update, OUT_WRITE,
                                 lu_object_fid(&dt->do_lu), buf, *pos);
        if (rc < 0)
                RETURN(rc);
 
-       CDEBUG(D_INFO, "write "DFID" offset = "LPU64" length = %zu\n",
-              PFID(lu_object_fid(&dt->do_lu)), *pos, buf->lb_len);
+       rc = osp_check_and_set_rpc_version(oth, obj);
+       if (rc < 0)
+               RETURN(rc);
 
        /* XXX: how about the write error happened later? */
        *pos += buf->lb_len;
 
        /* XXX: how about the write error happened later? */
        *pos += buf->lb_len;
@@ -1116,10 +1133,6 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
            obj->opo_ooa->ooa_attr.la_size < *pos)
                obj->opo_ooa->ooa_attr.la_size = *pos;
 
            obj->opo_ooa->ooa_attr.la_size < *pos)
                obj->opo_ooa->ooa_attr.la_size = *pos;
 
-       rc = osp_check_and_set_rpc_version(oth);
-       if (rc < 0)
-               RETURN(rc);
-
        RETURN(buf->lb_len);
 }
 
        RETURN(buf->lb_len);
 }
 
@@ -1157,6 +1170,9 @@ static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt,
                GOTO(out_update, rc);
        }
 
                GOTO(out_update, rc);
        }
 
+       CDEBUG(D_INFO, "%s "DFID" read offset %llu size %zu\n",
+              dt_dev->dd_lu_dev.ld_obd->obd_name,
+              PFID(lu_object_fid(&dt->do_lu)), *pos, rbuf->lb_len);
        rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, update,
                                 &req);
        if (rc != 0)
        rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, update,
                                 &req);
        if (rc != 0)
index 41aa529..a71af4e 100644 (file)
@@ -554,7 +554,7 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
 
        if (obj->opo_ooa != NULL) {
                spin_lock(&obj->opo_lock);
 
        if (obj->opo_ooa != NULL) {
                spin_lock(&obj->opo_lock);
-               if (obj->opo_ooa->ooa_attr.la_valid != 0) {
+               if (obj->opo_ooa->ooa_attr.la_valid != 0 && !obj->opo_stale) {
                        *attr = obj->opo_ooa->ooa_attr;
                        spin_unlock(&obj->opo_lock);
 
                        *attr = obj->opo_ooa->ooa_attr;
                        spin_unlock(&obj->opo_lock);
 
@@ -603,7 +603,12 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
        if (rc != 0)
                GOTO(out, rc);
 
        if (rc != 0)
                GOTO(out, rc);
 
-       GOTO(out, rc = 0);
+       spin_lock(&obj->opo_lock);
+       if (obj->opo_stale)
+               obj->opo_stale = 0;
+       spin_unlock(&obj->opo_lock);
+
+       GOTO(out, rc);
 
 out:
        if (req != NULL)
 
 out:
        if (req != NULL)
index f6a5bbf..368bec5 100644 (file)
@@ -194,6 +194,7 @@ struct osp_update_request *osp_update_request_create(struct dt_device *dt)
        INIT_LIST_HEAD(&our->our_req_list);
        INIT_LIST_HEAD(&our->our_cb_items);
        INIT_LIST_HEAD(&our->our_list);
        INIT_LIST_HEAD(&our->our_req_list);
        INIT_LIST_HEAD(&our->our_cb_items);
        INIT_LIST_HEAD(&our->our_list);
+       spin_lock_init(&our->our_list_lock);
 
        osp_object_update_request_create(our, OUT_UPDATE_INIT_BUFFER_SIZE);
        return our;
 
        osp_object_update_request_create(our, OUT_UPDATE_INIT_BUFFER_SIZE);
        return our;
@@ -468,7 +469,62 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-static void osp_trans_stop_cb(struct osp_thandle *oth, int result)
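+/**
+ * Invalidate all objects in the osp thandle
+ *
+ * Invalidate the cached attributes of every object written by the update
+ * request of this thandle and mark those objects stale. Called from
+ * osp_trans_stop_cb() when the transaction result is negative.
+ *
+ * \param[in] env      execution environment
+ * \param[in] oth      osp thandle.
+ */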
+static void osp_thandle_invalidate_object(const struct lu_env *env,
+                                         struct osp_thandle *oth)
+{
+       struct osp_update_request *our = oth->ot_our;
+       struct osp_update_request_sub *ours;
+
+       if (our == NULL)
+               return;
+
+       list_for_each_entry(ours, &our->our_req_list, ours_list) {
+               struct object_update_request *our_req = ours->ours_req;
+               unsigned int i;
+               struct lu_object *obj;
+               struct osp_object *osp_obj;
+
+               for (i = 0; i < our_req->ourq_count; i++) {
+                       struct object_update *update;
+
+                       update = object_update_request_get(our_req, i, NULL);
+                       if (update == NULL)
+                               break;
+
+                       if (update->ou_type != OUT_WRITE)
+                               continue;
+
+                       if (!fid_is_sane(&update->ou_fid))
+                               continue;
+
+                       obj = lu_object_find_slice(env,
+                                       &oth->ot_super.th_dev->dd_lu_dev,
+                                       &update->ou_fid, NULL);
+                       if (IS_ERR(obj))
+                               break;
+
+                       osp_obj = lu2osp_obj(obj);
+                       if (osp_obj->opo_ooa != NULL) {
+                               spin_lock(&osp_obj->opo_lock);
+                               osp_obj->opo_ooa->ooa_attr.la_valid = 0;
+                               osp_obj->opo_stale = 1;
+                               spin_unlock(&osp_obj->opo_lock);
+                       }
+                       lu_object_put(env, obj);
+               }
+       }
+}
+
+static void osp_trans_stop_cb(const struct lu_env *env,
+                             struct osp_thandle *oth, int result)
 {
        struct dt_txn_commit_cb *dcb;
        struct dt_txn_commit_cb *tmp;
 {
        struct dt_txn_commit_cb *dcb;
        struct dt_txn_commit_cb *tmp;
@@ -482,6 +538,9 @@ static void osp_trans_stop_cb(struct osp_thandle *oth, int result)
                list_del_init(&dcb->dcb_linkage);
                dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
        }
                list_del_init(&dcb->dcb_linkage);
                dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
        }
+
+       if (result < 0)
+               osp_thandle_invalidate_object(env, oth);
 }
 
 /**
 }
 
 /**
@@ -575,12 +634,12 @@ static int osp_update_interpret(const struct lu_env *env,
                reply = req_capsule_server_sized_get(&req->rq_pill,
                                                     &RMF_OUT_UPDATE_REPLY,
                                                     OUT_UPDATE_REPLY_SIZE);
                reply = req_capsule_server_sized_get(&req->rq_pill,
                                                     &RMF_OUT_UPDATE_REPLY,
                                                     OUT_UPDATE_REPLY_SIZE);
-               if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
-                       rc1 = -EPROTO;
-               else
+               if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC) {
+                       if (rc == 0)
+                               rc = -EPROTO;
+               } else {
                        count = reply->ourp_count;
                        count = reply->ourp_count;
-       } else {
-               rc1 = rc;
+               }
        }
 
        list_for_each_entry_safe(ouc, next, &our->our_cb_items, ouc_list) {
        }
 
        list_for_each_entry_safe(ouc, next, &our->our_cb_items, ouc_list) {
@@ -589,18 +648,21 @@ static int osp_update_interpret(const struct lu_env *env,
                /* The peer may only have handled some requests (indicated
                 * by the 'count') in the packaged OUT RPC, we can only get
                 * results for the handled part. */
                /* The peer may only have handled some requests (indicated
                 * by the 'count') in the packaged OUT RPC, we can only get
                 * results for the handled part. */
-               if (index < count && reply->ourp_lens[index] > 0) {
+               if (index < count && reply->ourp_lens[index] > 0 && rc >= 0) {
                        struct object_update_result *result;
 
                        result = object_update_result_get(reply, index, NULL);
                        if (result == NULL)
                        struct object_update_result *result;
 
                        result = object_update_result_get(reply, index, NULL);
                        if (result == NULL)
-                               rc1 = -EPROTO;
+                               rc1 = rc = -EPROTO;
                        else
                        else
-                               rc1 = result->our_rc;
-               } else {
-                       rc1 = rc;
-                       if (unlikely(rc1 == 0))
+                               rc1 = rc = result->our_rc;
+               } else if (rc1 >= 0) {
+                       /* The peer did not handle these requests, let's return
+                        * -EINVAL to the update interpreter for now */
+                       if (rc >= 0)
                                rc1 = -EINVAL;
                                rc1 = -EINVAL;
+                       else
+                               rc1 = rc;
                }
 
                if (ouc->ouc_interpreter != NULL)
                }
 
                if (ouc->ouc_interpreter != NULL)
@@ -617,13 +679,13 @@ static int osp_update_interpret(const struct lu_env *env,
        if (oth != NULL) {
                /* oth and osp_update_requests will be destroyed in
                 * osp_thandle_put */
        if (oth != NULL) {
                /* oth and osp_update_requests will be destroyed in
                 * osp_thandle_put */
-               osp_trans_stop_cb(oth, rc);
+               osp_trans_stop_cb(env, oth, rc);
                osp_thandle_put(oth);
        } else {
                osp_update_request_destroy(our);
        }
 
                osp_thandle_put(oth);
        } else {
                osp_update_request_destroy(our);
        }
 
-       RETURN(0);
+       RETURN(rc);
 }
 
 /**
 }
 
 /**
@@ -1003,7 +1065,7 @@ void osp_trans_callback(const struct lu_env *env,
                        osp_update_callback_fini(env, ouc);
                }
        }
                        osp_update_callback_fini(env, ouc);
                }
        }
-       osp_trans_stop_cb(oth, rc);
+       osp_trans_stop_cb(env, oth, rc);
        osp_trans_commit_cb(oth, rc);
 }
 
        osp_trans_commit_cb(oth, rc);
 }
 
@@ -1035,7 +1097,6 @@ static int osp_send_update_req(const struct lu_env *env,
        ENTRY;
 
        LASSERT(oth != NULL);
        ENTRY;
 
        LASSERT(oth != NULL);
-       LASSERT(our->our_req_sent == 0);
        rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
                                 our, &req);
        if (rc != 0) {
        rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
                                 our, &req);
        if (rc != 0) {
@@ -1168,16 +1229,17 @@ struct thandle *osp_get_storage_thandle(const struct lu_env *env,
 /**
  * Set version for the transaction
  *
 /**
  * Set version for the transaction
  *
- * Set the version for the transaction, then the osp RPC will be
- * sent in the order of version, i.e. the transaction with lower
- * version will be sent first.
+ * Set the version for the transaction and add the request to
+ * the sending list, then after transaction stop, the request
+ * will be picked in the order of version, by sending thread.
  *
  * \param [in] oth     osp thandle to be set version.
  *
  * \retval             0 if set version succeeds
  *                      negative errno if set version fails.
  */
  *
  * \param [in] oth     osp thandle to be set version.
  *
  * \retval             0 if set version succeeds
  *                      negative errno if set version fails.
  */
-int osp_check_and_set_rpc_version(struct osp_thandle *oth)
+int osp_check_and_set_rpc_version(struct osp_thandle *oth,
+                                 struct osp_object *obj)
 {
        struct osp_device *osp = dt2osp_dev(oth->ot_super.th_dev);
        struct osp_updates *ou = osp->opd_update;
 {
        struct osp_device *osp = dt2osp_dev(oth->ot_super.th_dev);
        struct osp_updates *ou = osp->opd_update;
@@ -1185,15 +1247,30 @@ int osp_check_and_set_rpc_version(struct osp_thandle *oth)
        if (ou == NULL)
                return -EIO;
 
        if (ou == NULL)
                return -EIO;
 
-       if (oth->ot_version != 0)
+       if (oth->ot_our->our_version != 0)
                return 0;
 
        spin_lock(&ou->ou_lock);
                return 0;
 
        spin_lock(&ou->ou_lock);
-       oth->ot_version = ou->ou_version++;
+       spin_lock(&oth->ot_our->our_list_lock);
+       if (obj->opo_stale) {
+               spin_unlock(&oth->ot_our->our_list_lock);
+               spin_unlock(&ou->ou_lock);
+               return -ESTALE;
+       }
+
+       /* Assign the version and add it to the sending list */
+       osp_thandle_get(oth);
+       oth->ot_our->our_version = ou->ou_version++;
+       list_add_tail(&oth->ot_our->our_list,
+                     &osp->opd_update->ou_list);
+       oth->ot_our->our_req_ready = 0;
+       spin_unlock(&oth->ot_our->our_list_lock);
        spin_unlock(&ou->ou_lock);
 
        spin_unlock(&ou->ou_lock);
 
+       LASSERT(oth->ot_super.th_wait_submit == 1);
        CDEBUG(D_INFO, "%s: version "LPU64" oth:version %p:"LPU64"\n",
        CDEBUG(D_INFO, "%s: version "LPU64" oth:version %p:"LPU64"\n",
-              osp->opd_obd->obd_name, ou->ou_version, oth, oth->ot_version);
+              osp->opd_obd->obd_name, ou->ou_version, oth,
+              oth->ot_our->our_version);
 
        return 0;
 }
 
        return 0;
 }
@@ -1221,38 +1298,90 @@ osp_get_next_request(struct osp_updates *ou, struct osp_update_request **ourp)
        spin_lock(&ou->ou_lock);
        list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
                LASSERT(our->our_th != NULL);
        spin_lock(&ou->ou_lock);
        list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
                LASSERT(our->our_th != NULL);
-               CDEBUG(D_INFO, "our %p version "LPU64" rpc_version "LPU64"\n",
-                      our, our->our_th->ot_version, ou->ou_rpc_version);
-               if (our->our_th->ot_version == 0) {
-                       list_del_init(&our->our_list);
-                       *ourp = our;
-                       got_req = true;
-                       break;
-               }
-
+               CDEBUG(D_HA, "ou %p version "LPU64" rpc_version "LPU64"\n",
+                      ou, our->our_version, ou->ou_rpc_version);
+               spin_lock(&our->our_list_lock);
                /* Find next osp_update_request in the list */
                /* Find next osp_update_request in the list */
-               if (our->our_th->ot_version == ou->ou_rpc_version) {
+               if (our->our_version == ou->ou_rpc_version &&
+                   our->our_req_ready) {
                        list_del_init(&our->our_list);
                        list_del_init(&our->our_list);
+                       spin_unlock(&our->our_list_lock);
                        *ourp = our;
                        got_req = true;
                        break;
                }
                        *ourp = our;
                        got_req = true;
                        break;
                }
+               spin_unlock(&our->our_list_lock);
        }
        spin_unlock(&ou->ou_lock);
 
        return got_req;
 }
 
        }
        spin_unlock(&ou->ou_lock);
 
        return got_req;
 }
 
-static void osp_update_rpc_version(struct osp_updates *ou,
-                                  struct osp_thandle *oth)
+/**
+ * Invalidate update request
+ *
+ * Invalidate the update requests in the OSP sending list, so that all
+ * requests in the sending list return an error. This happens when one
+ * update request (one that writes the llog) fails or the OSP is
+ * evicted by the remote target. See osp_send_update_thread().
+ *
+ * \param[in] osp      OSP device whose update requests will be
+ *                      invalidated.
+ **/
+void osp_invalidate_request(struct osp_device *osp)
 {
 {
-       if (oth->ot_version == 0)
+       struct lu_env env;
+       struct osp_updates *ou = osp->opd_update;
+       struct osp_update_request *our;
+       struct osp_update_request *tmp;
+       LIST_HEAD(list);
+       int                     rc;
+       ENTRY;
+
+       if (ou == NULL)
+               return;
+
+       rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
+       if (rc < 0) {
+               CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
+                      rc);
                return;
                return;
+       }
+
+       INIT_LIST_HEAD(&list);
 
 
-       LASSERT(oth->ot_version == ou->ou_rpc_version);
        spin_lock(&ou->ou_lock);
        spin_lock(&ou->ou_lock);
-       ou->ou_rpc_version++;
+       /* invalidate all of the requests in the sending list */
+       list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
+               spin_lock(&our->our_list_lock);
+               if (our->our_req_ready)
+                       list_move(&our->our_list, &list);
+               else
+                       list_del_init(&our->our_list);
+
+               if (our->our_th->ot_super.th_result == 0)
+                       our->our_th->ot_super.th_result = -EIO;
+
+               if (our->our_version >= ou->ou_rpc_version)
+                       ou->ou_rpc_version = our->our_version + 1;
+               spin_unlock(&our->our_list_lock);
+
+               CDEBUG(D_HA, "%s invalidate our %p\n", osp->opd_obd->obd_name,
+                      our);
+       }
+
        spin_unlock(&ou->ou_lock);
        spin_unlock(&ou->ou_lock);
+
+       /* fail the ready requests that were moved off the sending list */
+       list_for_each_entry_safe(our, tmp, &list, our_list) {
+               spin_lock(&our->our_list_lock);
+               list_del_init(&our->our_list);
+               spin_unlock(&our->our_list_lock);
+               osp_trans_callback(&env, our->our_th,
+                                  our->our_th->ot_super.th_result);
+               osp_thandle_put(our->our_th);
+       }
+       lu_env_fini(&env);
 }
 
 /**
 }
 
 /**
@@ -1293,32 +1422,43 @@ int osp_send_update_thread(void *arg)
                our = NULL;
                l_wait_event(ou->ou_waitq,
                             !osp_send_update_thread_running(osp) ||
                our = NULL;
                l_wait_event(ou->ou_waitq,
                             !osp_send_update_thread_running(osp) ||
-                            osp_get_next_request(ou, &our),
-                            &lwi);
+                            osp_get_next_request(ou, &our), &lwi);
 
                if (!osp_send_update_thread_running(osp)) {
 
                if (!osp_send_update_thread_running(osp)) {
-                       if (our != NULL && our->our_th != NULL) {
+                       if (our != NULL) {
                                osp_trans_callback(&env, our->our_th, -EINTR);
                                osp_thandle_put(our->our_th);
                        }
                        break;
                }
 
                                osp_trans_callback(&env, our->our_th, -EINTR);
                                osp_thandle_put(our->our_th);
                        }
                        break;
                }
 
-               if (our->our_req_sent == 0) {
-                       if (our->our_th != NULL &&
-                           our->our_th->ot_super.th_result != 0)
-                               osp_trans_callback(&env, our->our_th,
-                                       our->our_th->ot_super.th_result);
-                       else
-                               rc = osp_send_update_req(&env, osp, our);
+               LASSERT(our->our_th != NULL);
+               if (our->our_th->ot_super.th_result != 0) {
+                       osp_trans_callback(&env, our->our_th,
+                               our->our_th->ot_super.th_result);
+                       rc = our->our_th->ot_super.th_result;
+               } else if (OBD_FAIL_CHECK(OBD_FAIL_INVALIDATE_UPDATE)) {
+                       rc = -EIO;
+                       osp_trans_callback(&env, our->our_th, rc);
+               } else {
+                       rc = osp_send_update_req(&env, osp, our);
                }
 
                }
 
-               if (our->our_th != NULL) {
-                       /* Update the rpc version */
-                       osp_update_rpc_version(ou, our->our_th);
-                       /* Balanced for thandle_get in osp_trans_trigger() */
-                       osp_thandle_put(our->our_th);
-               }
+               /* Update the rpc version */
+               spin_lock(&ou->ou_lock);
+               if (our->our_version == ou->ou_rpc_version)
+                       ou->ou_rpc_version++;
+               spin_unlock(&ou->ou_lock);
+
+               /* If one update request fails, let's fail all of the requests
+                * in the sending list, because the requests in the sending
+                * list are dependent on each other; continuing to send these
+                * requests might cause llog or filesystem corruption */
+               if (rc < 0)
+                       osp_invalidate_request(osp);
+
+               /* Balanced for thandle_get in osp_check_and_set_rpc_version */
+               osp_thandle_put(our->our_th);
        }
 
        thread->t_flags = SVC_STOPPED;
        }
 
        thread->t_flags = SVC_STOPPED;
@@ -1329,36 +1469,6 @@ int osp_send_update_thread(void *arg)
 }
 
 /**
 }
 
 /**
- * Trigger the request for remote updates.
- *
- * Add the request to the sending list, and wake up osp update
- * sending thread.
- *
- * \param[in] env              pointer to the thread context
- * \param[in] osp              pointer to the OSP device
- * \param[in] oth              pointer to the transaction handler
- *
- */
-static void osp_trans_trigger(const struct lu_env *env,
-                            struct osp_device *osp,
-                            struct osp_thandle *oth)
-{
-
-       CDEBUG(D_INFO, "%s: add oth %p with version "LPU64"\n",
-              osp->opd_obd->obd_name, oth, oth->ot_version);
-
-       LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
-       osp_thandle_get(oth);
-       LASSERT(oth->ot_our != NULL);
-       spin_lock(&osp->opd_update->ou_lock);
-       list_add_tail(&oth->ot_our->our_list,
-                     &osp->opd_update->ou_list);
-       spin_unlock(&osp->opd_update->ou_lock);
-
-       wake_up(&osp->opd_update->ou_waitq);
-}
-
-/**
  * The OSP layer dt_device_operations::dt_trans_start() interface
  * to start the transaction.
  *
  * The OSP layer dt_device_operations::dt_trans_start() interface
  * to start the transaction.
  *
@@ -1392,7 +1502,7 @@ int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
  * to stop the transaction.
  *
  * If the transaction is a remote transaction, related remote
  * to stop the transaction.
  *
  * If the transaction is a remote transaction, related remote
- * updates will be triggered here via osp_trans_trigger().
+ * updates will be triggered at the end of this function.
  *
  * For synchronous mode update or any failed update, the request
  * will be destroyed explicitly when the osp_trans_stop().
  *
  * For synchronous mode update or any failed update, the request
  * will be destroyed explicitly when the osp_trans_stop().
@@ -1440,17 +1550,25 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                GOTO(out, rc = -EIO);
        }
 
                GOTO(out, rc = -EIO);
        }
 
-       if (th->th_sync) {
-               /* if th_sync is set, then it needs to be sent
-                * right away. Note: even thought the RPC has been
-                * sent, it still needs to be added to the sending
-                * list (see osp_trans_trigger()), so ou_rpc_version
-                * can be updated correctly. */
+       CDEBUG(D_HA, "%s: add oth %p with version "LPU64"\n",
+              osp->opd_obd->obd_name, oth, our->our_version);
+
+       LASSERT(our->our_req_ready == 0);
+       spin_lock(&our->our_list_lock);
+       if (likely(!list_empty(&our->our_list))) {
+               /* notify sending thread */
+               our->our_req_ready = 1;
+               wake_up(&osp->opd_update->ou_waitq);
+               spin_unlock(&our->our_list_lock);
+       } else if (th->th_result == 0) {
+               /* if the request does not need to be serialized
+                * (a read-only request, etc.), let's send it right away */
+               spin_unlock(&our->our_list_lock);
                rc = osp_send_update_req(env, osp, our);
                rc = osp_send_update_req(env, osp, our);
-               our->our_req_sent = 1;
+       } else {
+               spin_unlock(&our->our_list_lock);
+               osp_trans_callback(env, oth, th->th_result);
        }
        }
-
-       osp_trans_trigger(env, osp, oth);
 out:
        osp_thandle_put(oth);
 
 out:
        osp_thandle_put(oth);
 
index eb86c58..4e2a258 100644 (file)
@@ -946,7 +946,7 @@ int out_handle(struct tgt_session_info *tsi)
 
                oub = req_capsule_client_get(pill, &RMF_OUT_UPDATE_BUF);
                if (oub == NULL)
 
                oub = req_capsule_client_get(pill, &RMF_OUT_UPDATE_BUF);
                if (oub == NULL)
-                       GOTO(out_free, rc = -EPROTO);
+                       GOTO(out_free, rc = err_serious(-EPROTO));
 
                desc = ptlrpc_prep_bulk_exp(pill->rc_req, update_buf_count,
                                            PTLRPC_BULK_OPS_COUNT,
 
                desc = ptlrpc_prep_bulk_exp(pill->rc_req, update_buf_count,
                                            PTLRPC_BULK_OPS_COUNT,
@@ -960,7 +960,7 @@ int out_handle(struct tgt_session_info *tsi)
                tmp = oub;
                for (i = 0; i < update_buf_count; i++, tmp++) {
                        if (tmp->oub_size >= OUT_MAXREQSIZE)
                tmp = oub;
                for (i = 0; i < update_buf_count; i++, tmp++) {
                        if (tmp->oub_size >= OUT_MAXREQSIZE)
-                               GOTO(out_free, rc = -EPROTO);
+                               GOTO(out_free, rc = err_serious(-EPROTO));
 
                        OBD_ALLOC(update_bufs[i], tmp->oub_size);
                        if (update_bufs[i] == NULL)
 
                        OBD_ALLOC(update_bufs[i], tmp->oub_size);
                        if (update_bufs[i] == NULL)
@@ -973,11 +973,11 @@ int out_handle(struct tgt_session_info *tsi)
                pill->rc_req->rq_bulk_write = 1;
                rc = sptlrpc_svc_prep_bulk(pill->rc_req, desc);
                if (rc != 0)
                pill->rc_req->rq_bulk_write = 1;
                rc = sptlrpc_svc_prep_bulk(pill->rc_req, desc);
                if (rc != 0)
-                       GOTO(out_free, rc);
+                       GOTO(out_free, err_serious(rc));
 
                rc = target_bulk_io(pill->rc_req->rq_export, desc, &lwi);
                if (rc < 0)
 
                rc = target_bulk_io(pill->rc_req->rq_export, desc, &lwi);
                if (rc < 0)
-                       GOTO(out_free, rc);
+                       GOTO(out_free, err_serious(rc));
        }
        /* validate the request and calculate the total update count and
         * set it to reply */
        }
        /* validate the request and calculate the total update count and
         * set it to reply */
index a7b42d3..95d1fb6 100644 (file)
@@ -83,9 +83,9 @@ static void top_multiple_thandle_dump(struct top_multiple_thandle *tmt,
        list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
                struct sub_thandle_cookie *stc;
 
        list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
                struct sub_thandle_cookie *stc;
 
-               CDEBUG(mask, "st %p obd %s committed %d sub_th %p\n",
+               CDEBUG(mask, "st %p obd %s committed %d stopped %d sub_th %p\n",
                       st, st->st_dt->dd_lu_dev.ld_obd->obd_name,
                       st, st->st_dt->dd_lu_dev.ld_obd->obd_name,
-                      st->st_committed, st->st_sub_th);
+                      st->st_committed, st->st_stopped, st->st_sub_th);
 
                list_for_each_entry(stc, &st->st_cookie_list, stc_list) {
                        CDEBUG(mask, " cookie "DOSTID": %u\n",
 
                list_for_each_entry(stc, &st->st_cookie_list, stc_list) {
                        CDEBUG(mask, " cookie "DOSTID": %u\n",
index a775227..cd0aa1e 100755 (executable)
@@ -4330,6 +4330,35 @@ test_117() {
 }
 run_test 117 "DNE: cross MDT unlink, fail MDT1 and MDT2"
 
 }
 run_test 117 "DNE: cross MDT unlink, fail MDT1 and MDT2"
 
+test_118() {
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.64) ] &&
+               skip "Do not support large update log before 2.7.64" &&
+               return 0
+
+       mkdir -p $DIR/$tdir
+
+       $LFS setdirstripe -c2 $DIR/$tdir/striped_dir ||
+               error "setdirstripe fails"
+       $LFS setdirstripe -c2 $DIR/$tdir/striped_dir1 ||
+               error "setdirstripe fails 1"
+       rm -rf $DIR/$tdir/striped_dir* || error "rmdir fails"
+
+       # OBD_FAIL_INVALIDATE_UPDATE       0x1705
+       do_facet mds1 "lctl set_param fail_loc=0x1705"
+       $LFS setdirstripe -c2 $DIR/$tdir/striped_dir
+       $LFS setdirstripe -c2 $DIR/$tdir/striped_dir1
+       do_facet mds1 "lctl set_param fail_loc=0x0"
+
+       replay_barrier mds1
+       $LFS setdirstripe -c2 $DIR/$tdir/striped_dir
+       $LFS setdirstripe -c2 $DIR/$tdir/striped_dir1
+       fail mds1
+
+       true
+}
+run_test 118 "invalidate osp update will not cause update log corruption"
+
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status
index 5985805..7185c99 100755 (executable)
@@ -14152,6 +14152,8 @@ test_300p() {
                        error "create striped directory should fail"
 
        [ -e $DIR/$tdir/bad_striped_dir ] && error "striped dir exists"
                        error "create striped directory should fail"
 
        [ -e $DIR/$tdir/bad_striped_dir ] && error "striped dir exists"
+
+       $LFS setdirstripe -c2 $DIR/$tdir/bad_striped_dir
        true
 }
 run_test 300p "create striped directory without space"
        true
 }
 run_test 300p "create striped directory without space"