Whamcloud - gitweb
LU-9771 flr: read support for flr 85/29085/12
authorJinshan Xiong <jinshan.xiong@intel.com>
Fri, 15 Sep 2017 20:32:06 +0000 (20:32 +0000)
committerJinshan Xiong <jinshan.xiong@intel.com>
Fri, 17 Nov 2017 08:00:36 +0000 (08:00 +0000)
Avoid stale mirrors to read;
If preferred mirror is inaccessible, try next one, ndelay RPC
is implemented to make the error-out quick. ndelay RPC has
rq_no_delay bit set that can be applied to brw and lock RPC.

Test-Parameters: testlist=sanity-flr
Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Change-Id: I52143079edd1566ecb3a734ed88dab19f882c2fc
Reviewed-on: https://review.whamcloud.com/29085
Tested-by: Jenkins
Reviewed-by: Bobi Jam <bobijam@hotmail.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
25 files changed:
lustre/include/cl_object.h
lustre/include/lustre_dlm_flags.h
lustre/include/lustre_osc.h
lustre/include/obd_support.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/ldlm/ldlm_request.c
lustre/llite/file.c
lustre/llite/glimpse.c
lustre/llite/rw.c
lustre/llite/vvp_internal.h
lustre/llite/vvp_io.c
lustre/llite/vvp_page.c
lustre/lov/lov_cl_internal.h
lustre/lov/lov_io.c
lustre/lov/lov_lock.c
lustre/lov/lov_object.c
lustre/lov/lov_page.c
lustre/obdclass/cl_io.c
lustre/osc/osc_cache.c
lustre/osc/osc_io.c
lustre/osc/osc_lock.c
lustre/osc/osc_request.c
lustre/tests/rwv.c
lustre/tests/sanity-flr.sh
lustre/tests/test-framework.sh

index 33281fc..cb1c5ab 100644 (file)
@@ -1299,7 +1299,7 @@ struct cl_page_list {
        struct task_struct      *pl_owner;
 };
 
-/** 
+/**
  * A 2-queue of pages. A convenience data-type for common use case, 2-queue
  * contains an incoming page list and an outgoing page list.
  */
@@ -1763,6 +1763,7 @@ struct cl_io_pt {
        struct iov_iter          cip_iter;
        struct file             *cip_file;
        enum cl_io_type          cip_iot;
+       unsigned int             cip_need_restart:1;
        loff_t                   cip_pos;
        size_t                   cip_count;
        ssize_t                  cip_result;
@@ -1893,7 +1894,20 @@ struct cl_io {
        /** Set to 1 if parallel execution is allowed for current I/O? */
                             ci_pio:1,
        /* Tell sublayers not to expand LDLM locks requested for this IO */
-                            ci_lock_no_expand:1;
+                            ci_lock_no_expand:1,
+       /**
+        * Set if non-delay RPC should be used for this IO.
+        *
+        * If this file has multiple mirrors, and if the OSTs of the current
+        * mirror is inaccessible, non-delay RPC would error out quickly so
+        * that the upper layer can try to access the next mirror.
+        */
+                            ci_ndelay:1;
+       /**
+        * How many times the read has retried before this one.
+        * Set by the top level and consumed by the LOV.
+        */
+       unsigned             ci_ndelay_tried;
        /**
         * Number of pages owned by this IO. For invariant checking.
         */
@@ -2355,13 +2369,12 @@ struct cl_io *cl_io_top(struct cl_io *io);
 void cl_io_print(const struct lu_env *env, void *cookie,
                  lu_printer_t printer, const struct cl_io *io);
 
-#define CL_IO_SLICE_CLEAN(foo_io, base)                                 \
-do {                                                                    \
-        typeof(foo_io) __foo_io = (foo_io);                             \
-                                                                        \
-        CLASSERT(offsetof(typeof(*__foo_io), base) == 0);               \
-        memset(&__foo_io->base + 1, 0,                                  \
-               (sizeof *__foo_io) - sizeof __foo_io->base);             \
+#define CL_IO_SLICE_CLEAN(foo_io, base)                                        \
+do {                                                                   \
+       typeof(foo_io) __foo_io = (foo_io);                             \
+                                                                       \
+       memset(&__foo_io->base, 0,                                      \
+              sizeof(*__foo_io) - offsetof(typeof(*__foo_io), base));  \
 } while (0)
 
 /** @} cl_io */
index 7576f16..43c24b5 100644 (file)
 #define ldlm_is_cos_enabled(_l)          LDLM_TEST_FLAG((_l), 1ULL << 57)
 #define ldlm_set_cos_enabled(_l)         LDLM_SET_FLAG((_l), 1ULL << 57)
 
+/**
+ * This flags means to use non-delay RPC to send dlm request RPC.
+ */
+#define LDLM_FL_NDELAY                  0x0400000000000000ULL /* bit  58 */
+#define ldlm_is_ndelay(_l)              LDLM_TEST_FLAG((_l), 1ULL << 58)
+#define ldlm_set_ndelay(_l)             LDLM_SET_FLAG((_l), 1ULL << 58)
+
 /** l_flags bits marked as "ast" bits */
 #define LDLM_FL_AST_MASK                (LDLM_FL_FLOCK_DEADLOCK                |\
                                         LDLM_FL_DISCARD_DATA)
index 88fe0dd..0bb766c 100644 (file)
@@ -595,7 +595,7 @@ int osc_teardown_async_page(const struct lu_env *env, struct osc_object *obj,
 int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
                         struct osc_page *ops);
 int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
-                        struct list_head *list, int cmd, int brw_flags);
+                        struct list_head *list, int brw_flags);
 int osc_cache_truncate_start(const struct lu_env *env, struct osc_object *obj,
                             __u64 size, struct osc_extent **extp);
 void osc_cache_truncate_end(const struct lu_env *env, struct osc_extent *ext);
@@ -929,7 +929,9 @@ struct osc_extent {
                                oe_hp:1,
        /** this extent should be written back asap. set if one of pages is
         * called by page WB daemon, or sync write or reading requests. */
-                               oe_urgent:1;
+                               oe_urgent:1,
+       /** Non-delay RPC should be used for this extent. */
+                               oe_ndelay:1;
        /** how many grants allocated for this extent.
         *  Grant allocated for this extent. There is no grant allocated
         *  for reading extents and sync write extents. */
index e82f36d..ee48781 100644 (file)
@@ -608,6 +608,9 @@ extern char obd_jobid_var[];
 /* LMV */
 #define OBD_FAIL_UNKNOWN_LMV_STRIPE            0x1901
 
+/* FLR */
+#define OBD_FAIL_FLR_GLIMPSE_IMMUTABLE         0x1A00
+
 /* DT */
 #define OBD_FAIL_DT_DECLARE_ATTR_GET           0x2000
 #define OBD_FAIL_DT_ATTR_GET                   0x2001
index 3e8f706..0b44ba4 100644 (file)
@@ -1255,6 +1255,9 @@ struct hsm_state_set {
 #define OBD_BRW_READ            0x01
 #define OBD_BRW_WRITE           0x02
 #define OBD_BRW_RWMASK          (OBD_BRW_READ | OBD_BRW_WRITE)
+#define OBD_BRW_NDELAY         0x04 /* Non-delay RPC should be issued for
+                                     * this page. Non-delay RPCs have bit
+                                     * rq_no_delay set. */
 #define OBD_BRW_SYNC            0x08 /* this page is a part of synchronous
                                       * transfer and is not accounted in
                                       * the grant. */
index d1d068f..2032d99 100644 (file)
@@ -970,6 +970,14 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                         DLM_LOCKREQ_OFF, len, (int)sizeof(*body));
        }
 
+       if (*flags & LDLM_FL_NDELAY) {
+               DEBUG_REQ(D_DLMTRACE, req, "enque lock with no delay\n");
+               req->rq_no_resend = req->rq_no_delay = 1;
+               /* probably set a shorter timeout value and handle ETIMEDOUT
+                * in osc_lock_upcall() correctly */
+               /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
+       }
+
        /* Dump lock data into the request buffer */
        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
        ldlm_lock2desc(lock, &body->lock_desc);
index 7f43784..542a026 100644 (file)
@@ -1126,6 +1126,10 @@ static void ll_io_init(struct cl_io *io, struct file *file, enum cl_io_type iot)
                io->ci_pio = !io->u.ci_rw.rw_append;
        else
                io->ci_pio = 0;
+
+       /* FLR: only use non-delay I/O for read as there is only one
+        * avaliable mirror for write. */
+       io->ci_ndelay = !(iot == CIT_WRITE);
 }
 
 static int ll_file_io_ptask(struct cfs_ptask *ptask)
@@ -1139,16 +1143,15 @@ static int ll_file_io_ptask(struct cfs_ptask *ptask)
        __u16 refcheck;
        ENTRY;
 
-       env = cl_env_get(&refcheck);
-       if (IS_ERR(env))
-               RETURN(PTR_ERR(env));
-
        CDEBUG(D_VFSTRACE, "%s: %s range: [%llu, %llu)\n",
                file_dentry(file)->d_name.name,
                pt->cip_iot == CIT_READ ? "read" : "write",
                pos, pos + pt->cip_count);
 
-restart:
+       env = cl_env_get(&refcheck);
+       if (IS_ERR(env))
+               RETURN(PTR_ERR(env));
+
        io = vvp_env_thread_io(env);
        ll_io_init(io, file, pt->cip_iot);
        io->u.ci_rw.rw_iter = pt->cip_iter;
@@ -1190,25 +1193,15 @@ restart:
        }
 
        cl_io_fini(env, io);
+       cl_env_put(env, &refcheck);
 
-       if ((rc == 0 || rc == -ENODATA) &&
-           pt->cip_result < pt->cip_count &&
-           io->ci_need_restart) {
-               CDEBUG(D_VFSTRACE,
-                       "%s: restart %s range: [%llu, %llu) ret: %zd, rc: %d\n",
-                       file_dentry(file)->d_name.name,
-                       pt->cip_iot == CIT_READ ? "read" : "write",
-                       pos, pos + pt->cip_count - pt->cip_result,
-                       pt->cip_result, rc);
-               goto restart;
-       }
+       pt->cip_need_restart = io->ci_need_restart;
 
        CDEBUG(D_VFSTRACE, "%s: %s ret: %zd, rc: %d\n",
                file_dentry(file)->d_name.name,
                pt->cip_iot == CIT_READ ? "read" : "write",
                pt->cip_result, rc);
 
-       cl_env_put(env, &refcheck);
        RETURN(pt->cip_result > 0 ? 0 : rc);
 }
 
@@ -1226,6 +1219,8 @@ ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
        loff_t                  pos = *ppos;
        ssize_t                 result = 0;
        int                     rc = 0;
+       unsigned                retried = 0;
+       bool                    restarted = false;
 
        ENTRY;
 
@@ -1239,9 +1234,10 @@ restart:
        if (args->via_io_subtype == IO_NORMAL) {
                io->u.ci_rw.rw_iter = *args->u.normal.via_iter;
                io->u.ci_rw.rw_iocb = *args->u.normal.via_iocb;
-       } else {
-               io->ci_pio = 0;
        }
+       if (args->via_io_subtype != IO_NORMAL || restarted)
+               io->ci_pio = 0;
+       io->ci_ndelay_tried = retried;
 
        if (cl_io_rw_init(env, io, iot, pos, count) == 0) {
                bool range_locked = false;
@@ -1324,12 +1320,20 @@ restart:
 out:
        cl_io_fini(env, io);
 
+       CDEBUG(D_VFSTRACE,
+              "%s: %d io complete with rc: %d, result: %zd, restart: %d\n",
+              file->f_path.dentry->d_name.name,
+              iot, rc, result, io->ci_need_restart);
+
        if ((rc == 0 || rc == -ENODATA) && count > 0 && io->ci_need_restart) {
                CDEBUG(D_VFSTRACE,
                        "%s: restart %s range: [%llu, %llu) ret: %zd, rc: %d\n",
                        file_dentry(file)->d_name.name,
                        iot == CIT_READ ? "read" : "write",
                        pos, pos + count, result, rc);
+               /* preserve the tried count for FLR */
+               retried = io->ci_ndelay_tried;
+               restarted = true;
                goto restart;
        }
 
index 7c9ec0d..94467aa 100644 (file)
@@ -187,27 +187,31 @@ int cl_glimpse_size0(struct inode *inode, int agl)
         struct lu_env          *env = NULL;
         struct cl_io           *io  = NULL;
        __u16                   refcheck;
-        int                     result;
+       int                     retried = 0;
+       int                     result;
 
-        ENTRY;
+       ENTRY;
 
        result = cl_io_get(inode, &env, &io, &refcheck);
        if (result <= 0)
                RETURN(result);
 
        do {
-               io->ci_need_restart = 0;
-               io->ci_verify_layout = 1;
+               io->ci_ndelay_tried = retried++;
+               io->ci_ndelay = io->ci_verify_layout = 1;
                result = cl_io_init(env, io, CIT_GLIMPSE, io->ci_obj);
-                if (result > 0)
-                        /*
-                         * nothing to do for this io. This currently happens
-                         * when stripe sub-object's are not yet created.
-                         */
-                        result = io->ci_result;
-                else if (result == 0)
-                        result = cl_glimpse_lock(env, io, inode, io->ci_obj,
+               if (result > 0) {
+                       /*
+                        * nothing to do for this io. This currently happens
+                        * when stripe sub-object's are not yet created.
+                        */
+                       result = io->ci_result;
+               } else if (result == 0) {
+                       result = cl_glimpse_lock(env, io, inode, io->ci_obj,
                                                 agl);
+                       if (!agl && result == -EWOULDBLOCK)
+                               io->ci_need_restart = 1;
+               }
 
                OBD_FAIL_TIMEOUT(OBD_FAIL_GLIMPSE_DELAY, 2);
                cl_io_fini(env, io);
index 0b0b0c2..69b6208 100644 (file)
@@ -1084,6 +1084,7 @@ static int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
        struct ll_file_data       *fd     = LUSTRE_FPRIVATE(file);
        struct ll_readahead_state *ras    = &fd->fd_ras;
        struct cl_2queue          *queue  = &io->ci_queue;
+       struct cl_sync_io         *anchor = NULL;
        struct vvp_page           *vpg;
        int                        rc = 0;
        bool                       uptodate;
@@ -1111,6 +1112,10 @@ static int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
                cl_page_export(env, page, 1);
                cl_page_disown(env, io, page);
        } else {
+               anchor = &vvp_env_info(env)->vti_anchor;
+               cl_sync_io_init(anchor, 1, &cl_sync_io_end);
+               page->cp_sync_io = anchor;
+
                cl_2queue_add(queue, page);
        }
 
@@ -1127,6 +1132,26 @@ static int ll_io_read_page(const struct lu_env *env, struct cl_io *io,
        if (queue->c2_qin.pl_nr > 0)
                rc = cl_io_submit_rw(env, io, CRT_READ, queue);
 
+       if (anchor != NULL && !cl_page_is_owned(page, io)) { /* have sent */
+               rc = cl_sync_io_wait(env, anchor, 0);
+
+               cl_page_assume(env, io, page);
+               cl_page_list_del(env, &queue->c2_qout, page);
+
+               if (!PageUptodate(cl_page_vmpage(page))) {
+                       /* Failed to read a mirror, discard this page so that
+                        * new page can be created with new mirror.
+                        *
+                        * TODO: this is not needed after page reinit
+                        * route is implemented */
+                       cl_page_discard(env, io, page);
+               }
+               cl_page_disown(env, io, page);
+       }
+
+       /* TODO: discard all pages until page reinit route is implemented */
+       cl_page_list_discard(env, io, &queue->c2_qin);
+
        /*
         * Unlock unsent pages in case of error.
         */
index e1f3041..058086d 100644 (file)
@@ -131,6 +131,7 @@ struct vvp_thread_info {
        struct cl_lock_descr    vti_descr;
        struct cl_io            vti_io;
        struct cl_attr          vti_attr;
+       struct cl_sync_io       vti_anchor;
 };
 
 static inline struct vvp_thread_info *vvp_env_info(const struct lu_env *env)
index 886b816..c9c19b5 100644 (file)
@@ -298,7 +298,9 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
        struct cl_object *obj = io->ci_obj;
        struct vvp_io    *vio = cl2vvp_io(env, ios);
        struct inode     *inode = vvp_object_inode(obj);
+       __u32             gen = 0;
        int rc;
+       ENTRY;
 
        CLOBINVRNT(env, obj, vvp_object_invariant(obj));
 
@@ -320,18 +322,40 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
                 * block on layout lock held by the MDT
                 * as MDT will not send new layout in lvb (see LU-3124)
                 * we have to explicitly fetch it, all this will be done
-                * by ll_layout_refresh()
+                * by ll_layout_refresh().
+                * Even if ll_layout_restore() returns zero, it doesn't mean
+                * that restore has been successful. Therefore it sets
+                * ci_verify_layout so that it will check layout at the end
+                * of this function.
                 */
-               if (rc == 0) {
-                       io->ci_restore_needed = 0;
-                       io->ci_need_restart = 1;
-                       io->ci_verify_layout = 1;
-               } else {
+               if (rc) {
                        io->ci_restore_needed = 1;
                        io->ci_need_restart = 0;
                        io->ci_verify_layout = 0;
                        io->ci_result = rc;
+                       GOTO(out, rc);
                }
+
+               io->ci_restore_needed = 0;
+
+               /* Even if ll_layout_restore() returns zero, it doesn't mean
+                * that restore has been successful. Therefore it should verify
+                * if there was layout change and restart I/O correspondingly.
+                */
+               ll_layout_refresh(inode, &gen);
+               io->ci_need_restart = vio->vui_layout_gen != gen;
+               if (io->ci_need_restart) {
+                       CDEBUG(D_VFSTRACE,
+                              DFID" layout changed from %d to %d.\n",
+                              PFID(lu_object_fid(&obj->co_lu)),
+                              vio->vui_layout_gen, gen);
+                       /* today successful restore is the only possible
+                        * case */
+                       /* restore was done, clear restoring state */
+                       ll_file_clear_flag(ll_i2info(vvp_object_inode(obj)),
+                                          LLIF_FILE_RESTORING);
+               }
+               GOTO(out, 0);
        }
 
        /**
@@ -368,11 +392,11 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
                io->ci_result = rc;
                if (!rc)
                        io->ci_need_restart = 1;
+               GOTO(out, rc);
        }
 
-       if (!io->ci_ignore_layout && io->ci_verify_layout) {
-               __u32 gen = 0;
-
+       if (!io->ci_need_restart &&
+           !io->ci_ignore_layout && io->ci_verify_layout) {
                /* check layout version */
                ll_layout_refresh(inode, &gen);
                io->ci_need_restart = vio->vui_layout_gen != gen;
@@ -381,13 +405,11 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
                               DFID" layout changed from %d to %d.\n",
                               PFID(lu_object_fid(&obj->co_lu)),
                               vio->vui_layout_gen, gen);
-                       /* today successful restore is the only possible
-                        * case */
-                       /* restore was done, clear restoring state */
-                       ll_file_clear_flag(ll_i2info(vvp_object_inode(obj)),
-                                          LLIF_FILE_RESTORING);
                }
+               GOTO(out, 0);
        }
+out:
+       EXIT;
 }
 
 static void vvp_io_fault_fini(const struct lu_env *env,
@@ -755,6 +777,7 @@ static int vvp_io_read_start(const struct lu_env *env,
        size_t tot = vio->vui_tot_count;
        int exceed = 0;
        int result;
+       ENTRY;
 
        CLOBINVRNT(env, obj, vvp_object_invariant(obj));
 
@@ -766,13 +789,16 @@ static int vvp_io_read_start(const struct lu_env *env,
                down_read(&lli->lli_trunc_sem);
 
        if (!can_populate_pages(env, io, inode))
-               return 0;
+               RETURN(0);
 
-       result = vvp_prep_size(env, obj, io, range->cir_pos, tot, &exceed);
+       /* Unless this is reading a sparse file, otherwise the lock has already
+        * been acquired so vvp_prep_size() is an empty op. */
+       result = vvp_prep_size(env, obj, io, range->cir_pos, range->cir_count,
+                               &exceed);
        if (result != 0)
-               return result;
+               RETURN(result);
        else if (exceed != 0)
-               goto out;
+               GOTO(out, result);
 
        LU_OBJECT_HEADER(D_INODE, env, &obj->co_lu,
                         "Read ino %lu, %lu bytes, offset %lld, size %llu\n",
@@ -815,6 +841,7 @@ static int vvp_io_read_start(const struct lu_env *env,
                CERROR("Wrong IO type %u\n", vio->vui_io_subtype);
                LBUG();
        }
+       GOTO(out, result);
 
 out:
        if (result >= 0) {
index 47d4863..42545a9 100644 (file)
@@ -269,8 +269,14 @@ static void vvp_page_completion_read(const struct lu_env *env,
        if (ioret == 0)  {
                if (!vpg->vpg_defer_uptodate)
                        cl_page_export(env, page, 1);
-       } else {
+       } else if (vpg->vpg_defer_uptodate) {
                vpg->vpg_defer_uptodate = 0;
+               if (ioret == -EWOULDBLOCK) {
+                       /* mirror read failed, it needs to destroy the page
+                        * because subpage would be from wrong osc when trying
+                        * to read from a new mirror */
+                       ll_invalidate_page(vmpage);
+               }
        }
 
        if (page->cp_sync_io == NULL)
index f97747d..375675c 100644 (file)
@@ -178,7 +178,7 @@ struct lov_layout_raid0 {
         * object. This field is reset to 0 when attributes of
         * any sub-object change.
         */
-       int                    lo_attr_valid;
+       bool                   lo_attr_valid;
        /**
         * Array of sub-objects. Allocated when top-object is
         * created (lov_init_raid0()).
@@ -216,15 +216,27 @@ struct lov_layout_dom {
 };
 
 struct lov_layout_entry {
-       __u32 lle_type;
-       struct lu_extent lle_extent;
+       __u32                           lle_type;
+       unsigned int                    lle_valid:1;
+       struct lu_extent                *lle_extent;
+       struct lov_stripe_md_entry      *lle_lsme;
        struct lov_comp_layout_entry_ops *lle_comp_ops;
        union {
-               struct lov_layout_raid0 lle_raid0;
-               struct lov_layout_dom lle_dom;
+               struct lov_layout_raid0 lle_raid0;
+               struct lov_layout_dom   lle_dom;
        };
 };
 
+struct lov_mirror_entry {
+       unsigned short  lre_mirror_id;
+       unsigned short  lre_preferred:1,
+                       lre_valid:1;    /* set if at least one of components
+                                        * in this mirror is valid */
+       unsigned short  lre_start;      /* index to lo_entries, start index of
+                                        * this mirror */
+       unsigned short  lre_end;        /* end index of this mirror */
+};
+
 /**
  * lov-specific file state.
  *
@@ -280,9 +292,36 @@ struct lov_object {
                } released;
                struct lov_layout_composite {
                        /**
-                        * Current valid entry count of entries.
+                        * flags of lov_comp_md_v1::lcm_flags. Mainly used
+                        * by FLR.
+                        */
+                       uint32_t        lo_flags;
+                       /**
+                        * For FLR: index of preferred mirror to read.
+                        * Preferred mirror is initialized by the preferred
+                        * bit of lsme. It can be changed when the preferred
+                        * is inaccessible.
+                        * In order to make lov_lsm_entry() return the same
+                        * mirror in the same IO context, it's only possible
+                        * to change the preferred mirror when the
+                        * lo_active_ios reaches zero.
                         */
-                       unsigned int lo_entry_count;
+                       int             lo_preferred_mirror;
+                       /**
+                        * For FLR: the lock to protect access to
+                        * lo_preferred_mirror.
+                        */
+                       spinlock_t      lo_write_lock;
+                       /**
+                        * For FLR: Number of (valid) mirrors.
+                        */
+                       unsigned        lo_mirror_count;
+                       struct lov_mirror_entry *lo_mirrors;
+                       /**
+                        * Current entry count of lo_entries, include
+                        * invalid entries.
+                        */
+                       unsigned int    lo_entry_count;
                        struct lov_layout_entry *lo_entries;
                } composite;
        } u;
@@ -293,11 +332,80 @@ struct lov_object {
        struct task_struct            *lo_owner;
 };
 
-#define lov_foreach_layout_entry(lov, entry)                   \
-       for (entry = &lov->u.composite.lo_entries[0];           \
-            entry < &lov->u.composite.lo_entries               \
-                       [lov->u.composite.lo_entry_count];      \
-            entry++)
+static inline struct lov_layout_raid0 *lov_r0(struct lov_object *lov, int i)
+{
+       LASSERT(lov->lo_type == LLT_COMP);
+       LASSERTF(i < lov->u.composite.lo_entry_count,
+                "entry %d entry_count %d", i, lov->u.composite.lo_entry_count);
+
+       return &lov->u.composite.lo_entries[i].lle_raid0;
+}
+
+static inline struct lov_stripe_md_entry *lov_lse(struct lov_object *lov, int i)
+{
+       LASSERT(lov->lo_lsm != NULL);
+       LASSERT(i < lov->lo_lsm->lsm_entry_count);
+
+       return lov->lo_lsm->lsm_entries[i];
+}
+
+static inline unsigned lov_flr_state(const struct lov_object *lov)
+{
+       if (lov->lo_type != LLT_COMP)
+               return LCM_FL_NOT_FLR;
+
+       return lov->u.composite.lo_flags & LCM_FL_FLR_MASK;
+}
+
+static inline bool lov_is_flr(const struct lov_object *lov)
+{
+       return lov_flr_state(lov) != LCM_FL_NOT_FLR;
+}
+
+static inline struct lov_layout_entry *lov_entry(struct lov_object *lov, int i)
+{
+       LASSERT(lov->lo_type == LLT_COMP);
+       LASSERTF(i < lov->u.composite.lo_entry_count,
+                "entry %d entry_count %d", i, lov->u.composite.lo_entry_count);
+
+       return &lov->u.composite.lo_entries[i];
+}
+
+#define lov_for_layout_entry(lov, entry, start, end)                   \
+       for (entry = lov_entry(lov, start);                             \
+            entry <= lov_entry(lov, end); entry++)
+
+#define lov_foreach_layout_entry(lov, entry)                           \
+       lov_for_layout_entry(lov, entry, 0,                             \
+                            (lov)->u.composite.lo_entry_count - 1)
+
+#define lov_foreach_mirror_layout_entry(lov, entry, lre)               \
+       lov_for_layout_entry(lov, entry, (lre)->lre_start, (lre)->lre_end)
+
+static inline struct lov_mirror_entry *
+lov_mirror_entry(struct lov_object *lov, int i)
+{
+       LASSERT(i < lov->u.composite.lo_mirror_count);
+       return &lov->u.composite.lo_mirrors[i];
+}
+
+#define lov_foreach_mirror_entry(lov, lre)                             \
+       for (lre = lov_mirror_entry(lov, 0);                            \
+            lre <= lov_mirror_entry(lov,                               \
+                               lov->u.composite.lo_mirror_count - 1);  \
+            lre++)
+
+static inline unsigned
+lov_layout_entry_index(struct lov_object *lov, struct lov_layout_entry *entry)
+{
+       struct lov_layout_entry *first = &lov->u.composite.lo_entries[0];
+       unsigned index = (unsigned)(entry - first);
+
+       LASSERT(entry >= first);
+       LASSERT(index < lov->u.composite.lo_entry_count);
+
+       return index;
+}
 
 /**
  * State lov_lock keeps for each sub-lock.
@@ -413,6 +521,26 @@ struct lov_io_sub {
 struct lov_io {
         /** super-class */
         struct cl_io_slice lis_cl;
+
+       /**
+        * FLR: index to lo_mirrors. Valid only if lov_is_flr() returns true.
+        *
+        * The mirror index of this io. Preserved over cl_io_init()
+        * if io->ci_ndelay_tried is greater than zero.
+        */
+       int                     lis_mirror_index;
+       /**
+        * FLR: the layout gen when lis_mirror_index was cached. The
+        * mirror index makes sense only when the layout gen doesn't
+        * change.
+        */
+       int                     lis_mirror_layout_gen;
+
+       /**
+        * fields below this will be initialized in lov_io_init().
+        */
+       unsigned                lis_preserved;
+
         /**
          * Pointer to the object slice. This is a duplicate of
          * lov_io::lis_cl::cis_object.
@@ -455,6 +583,7 @@ struct lov_io {
         * All sub-io's created in this lov_io.
         */
        struct list_head        lis_subios;
+
 };
 
 struct lov_session {
@@ -519,10 +648,25 @@ struct lu_object *lovsub_object_alloc(const struct lu_env *env,
 struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov);
 int lov_page_stripe(const struct cl_page *page);
 int lov_lsm_entry(const struct lov_stripe_md *lsm, __u64 offset);
+int lov_io_layout_at(struct lov_io *lio, __u64 offset);
 
 #define lov_foreach_target(lov, var)                    \
         for (var = 0; var < lov_targets_nr(lov); ++var)
 
+static inline struct lu_extent *lov_io_extent(struct lov_io *io, int i)
+{
+       return &lov_lse(io->lis_object, i)->lsme_extent;
+}
+
+/**
+ * For layout entries within @ext.
+ */
+#define lov_foreach_io_layout(ind, lio, ext)                           \
+       for (ind = lov_io_layout_at(lio, (ext)->e_start);               \
+            ind >= 0 &&                                                \
+            lu_extent_is_overlapped(lov_io_extent(lio, ind), ext);     \
+            ind = lov_io_layout_at(lio, lov_io_extent(lio, ind)->e_end))
+
 /*****************************************************************************
  *
  * Type conversions.
@@ -691,32 +835,6 @@ static inline struct lov_thread_info *lov_env_info(const struct lu_env *env)
         return info;
 }
 
-static inline struct lov_layout_entry *lov_entry(struct lov_object *lov, int i)
-{
-       LASSERT(lov->lo_type == LLT_COMP);
-       LASSERTF(i < lov->u.composite.lo_entry_count,
-                "entry %d entry_count %d", i, lov->u.composite.lo_entry_count);
-
-       return &lov->u.composite.lo_entries[i];
-}
-
-static inline struct lov_layout_raid0 *lov_r0(struct lov_object *lov, int i)
-{
-       LASSERT(lov->lo_type == LLT_COMP);
-       LASSERTF(i < lov->u.composite.lo_entry_count,
-                "entry %d entry_count %d", i, lov->u.composite.lo_entry_count);
-
-       return &lov->u.composite.lo_entries[i].lle_raid0;
-}
-
-static inline struct lov_stripe_md_entry *lov_lse(struct lov_object *lov, int i)
-{
-       LASSERT(lov->lo_lsm != NULL);
-       LASSERT(i < lov->lo_lsm->lsm_entry_count);
-
-       return lov->lo_lsm->lsm_entries[i];
-}
-
 /* lov_pack.c */
 int lov_getstripe(const struct lu_env *env, struct lov_object *obj,
                  struct lov_stripe_md *lsm, struct lov_user_md __user *lump,
index b100973..8866e4f 100644 (file)
@@ -89,6 +89,15 @@ static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
        EXIT;
 }
 
+static inline bool
+is_index_within_mirror(struct lov_object *lov, int index, int mirror_index)
+{
+       struct lov_layout_composite *comp = &lov->u.composite;
+       struct lov_mirror_entry *lre = &comp->lo_mirrors[mirror_index];
+
+       return (index >= lre->lre_start && index <= lre->lre_end);
+}
+
 static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
                           struct lov_io_sub *sub)
 {
@@ -106,6 +115,11 @@ static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
                     !lov_r0(lov, index)->lo_sub[stripe]))
                RETURN(-EIO);
 
+       LASSERTF(is_index_within_mirror(lov, index, lio->lis_mirror_index),
+                DFID "iot = %d, index = %d, mirror = %d\n",
+                PFID(lu_object_fid(lov2lu(lov))), io->ci_type, index,
+                lio->lis_mirror_index);
+
        /* obtain new environment */
        sub->sub_env = cl_env_get(&sub->sub_refcheck);
        if (IS_ERR(sub->sub_env))
@@ -124,6 +138,7 @@ static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
        sub_io->ci_noatime = io->ci_noatime;
        sub_io->ci_pio = io->ci_pio;
        sub_io->ci_lock_no_expand = io->ci_lock_no_expand;
+       sub_io->ci_ndelay = io->ci_ndelay;
 
        result = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj);
 
@@ -200,9 +215,102 @@ static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
        RETURN(0);
 }
 
+static int lov_io_mirror_init(struct lov_io *lio, struct lov_object *obj,
+                              struct cl_io *io)
+{
+       struct lov_layout_composite *comp = &obj->u.composite;
+       int index;
+       int i;
+       ENTRY;
+
+       if (!lov_is_flr(obj)) {
+               LASSERT(comp->lo_preferred_mirror == 0);
+               lio->lis_mirror_index = comp->lo_preferred_mirror;
+               io->ci_ndelay = 0;
+               RETURN(0);
+       }
+
+       if (io->ci_ndelay_tried == 0 || /* first time to try */
+           /* reset the mirror index if layout has changed */
+           lio->lis_mirror_layout_gen != obj->lo_lsm->lsm_layout_gen) {
+               lio->lis_mirror_layout_gen = obj->lo_lsm->lsm_layout_gen;
+               index = lio->lis_mirror_index = comp->lo_preferred_mirror;
+       } else {
+               index = lio->lis_mirror_index;
+               LASSERT(index >= 0);
+
+               /* move mirror index to the next one */
+               index = (index + 1) % comp->lo_mirror_count;
+       }
+
+       for (i = 0; i < comp->lo_mirror_count; i++) {
+               struct lu_extent ext = { .e_start = lio->lis_pos,
+                                        .e_end   = lio->lis_pos + 1 };
+               struct lov_mirror_entry *lre;
+               struct lov_layout_entry *lle;
+               bool found = false;
+
+               lre = &comp->lo_mirrors[(index + i) % comp->lo_mirror_count];
+               if (!lre->lre_valid)
+                       continue;
+
+               lov_foreach_mirror_layout_entry(obj, lle, lre) {
+                       if (!lle->lle_valid)
+                               continue;
+
+                       if (lu_extent_is_overlapped(&ext, lle->lle_extent)) {
+                               found = true;
+                               break;
+                       }
+               }
+
+               if (found) {
+                       index = (index + i) % comp->lo_mirror_count;
+                       break;
+               }
+       }
+       if (i == comp->lo_mirror_count) {
+               CERROR(DFID": failed to find a component covering "
+                      "I/O region at %llu\n",
+                      PFID(lu_object_fid(lov2lu(obj))), lio->lis_pos);
+
+               dump_lsm(D_ERROR, obj->lo_lsm);
+
+               RETURN(-EIO);
+       }
+
+       CDEBUG(D_VFSTRACE, DFID ": flr state: %d, move mirror from %d to %d, "
+              "have retried: %d, mirror count: %d\n",
+              PFID(lu_object_fid(lov2lu(obj))), lov_flr_state(obj),
+              lio->lis_mirror_index, index, io->ci_ndelay_tried,
+              comp->lo_mirror_count);
+
+       lio->lis_mirror_index = index;
+
+       /* FLR: if all mirrors have been tried once, most likely the network
+        * of this client has been partitioned. We should relinquish CPU for
+        * a while before trying again. */
+       ++io->ci_ndelay_tried;
+       if (io->ci_ndelay && io->ci_ndelay_tried >= comp->lo_mirror_count) {
+               set_current_state(TASK_INTERRUPTIBLE);
+               schedule_timeout(msecs_to_jiffies(MSEC_PER_SEC)); /* 10ms */
+               if (signal_pending(current))
+                       RETURN(-EINTR);
+
+               /* reset retry counter */
+               io->ci_ndelay_tried = 1;
+       }
+
+       CDEBUG(D_VFSTRACE, "use %sdelayed RPC state for this IO\n",
+              io->ci_ndelay ? "non-" : "");
+
+       RETURN(0);
+}
+
 static int lov_io_slice_init(struct lov_io *lio,
                             struct lov_object *obj, struct cl_io *io)
 {
+       int result = 0;
        ENTRY;
 
        io->ci_result = 0;
@@ -266,7 +374,8 @@ static int lov_io_slice_init(struct lov_io *lio,
                lio->lis_pos = 0;
                lio->lis_endpos = OBD_OBJECT_EOF;
 
-               if ((obj->lo_lsm->lsm_flags & LCM_FL_FLR_MASK) == LCM_FL_RDONLY)
+               if (lov_flr_state(obj) == LCM_FL_RDONLY &&
+                   !OBD_FAIL_CHECK(OBD_FAIL_FLR_GLIMPSE_IMMUTABLE))
                        RETURN(1); /* SoM is accurate, no need glimpse */
                break;
 
@@ -279,7 +388,8 @@ static int lov_io_slice_init(struct lov_io *lio,
                 LBUG();
         }
 
-       RETURN(0);
+       result = lov_io_mirror_init(lio, obj, io);
+       RETURN(result);
 }
 
 static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
@@ -406,7 +516,6 @@ static int lov_io_iter_init(const struct lu_env *env,
        struct lov_io        *lio = cl2lov_io(env, ios);
        struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
        struct lov_io_sub    *sub;
-       struct lov_layout_entry *le;
        struct lu_extent ext;
        int index;
        int rc = 0;
@@ -416,20 +525,15 @@ static int lov_io_iter_init(const struct lu_env *env,
        ext.e_start = lio->lis_pos;
        ext.e_end = lio->lis_endpos;
 
-       index = 0;
-       lov_foreach_layout_entry(lio->lis_object, le) {
-               struct lov_layout_raid0 *r0 = &le->lle_raid0;
+       lov_foreach_io_layout(index, lio, &ext) {
+               struct lov_layout_raid0 *r0 = lov_r0(lio->lis_object, index);
                u64 start;
                u64 end;
                int stripe;
 
-               index++;
-               if (!lu_extent_is_overlapped(&ext, &le->lle_extent))
-                       continue;
-
                CDEBUG(D_VFSTRACE, "component[%d] flags %#x\n",
-                      index - 1, lsm->lsm_entries[index - 1]->lsme_flags);
-               if (!lsm_entry_inited(lsm, index - 1)) {
+                      index, lsm->lsm_entries[index]->lsme_flags);
+               if (!lsm_entry_inited(lsm, index)) {
                        /* truncate IO will trigger write intent as well, and
                         * it's handled in lov_io_setattr_iter_init() */
                        if (io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io)) {
@@ -446,7 +550,7 @@ static int lov_io_iter_init(const struct lu_env *env,
                }
 
                for (stripe = 0; stripe < r0->lo_nr; stripe++) {
-                       if (!lov_stripe_intersects(lsm, index - 1, stripe,
+                       if (!lov_stripe_intersects(lsm, index, stripe,
                                                   &ext, &start, &end))
                                continue;
 
@@ -461,7 +565,7 @@ static int lov_io_iter_init(const struct lu_env *env,
 
                        end = lov_offset_mod(end, 1);
                        sub = lov_sub_get(env, lio,
-                                         lov_comp_index(index - 1, stripe));
+                                         lov_comp_index(index, stripe));
                        if (IS_ERR(sub)) {
                                rc = PTR_ERR(sub);
                                break;
@@ -504,7 +608,7 @@ static int lov_io_rw_iter_init(const struct lu_env *env,
        if (cl_io_is_append(io))
                RETURN(lov_io_iter_init(env, ios));
 
-       index = lov_lsm_entry(lsm, range->cir_pos);
+       index = lov_io_layout_at(lio, range->cir_pos);
        if (index < 0) { /* non-existing layout component */
                if (io->ci_type == CIT_READ) {
                        /* TODO: it needs to detect the next component and
@@ -580,7 +684,7 @@ static int lov_io_setattr_iter_init(const struct lu_env *env,
        ENTRY;
 
        if (cl_io_is_trunc(io) && lio->lis_pos > 0) {
-               index = lov_lsm_entry(lsm, lio->lis_pos - 1);
+               index = lov_io_layout_at(lio, lio->lis_pos - 1);
                /* no entry found for such offset */
                if (index < 0) {
                        RETURN(io->ci_result = -ENODATA);
@@ -724,7 +828,7 @@ static int lov_io_read_ahead(const struct lu_env *env,
        ENTRY;
 
        offset = cl_offset(obj, start);
-       index = lov_lsm_entry(loo->lo_lsm, offset);
+       index = lov_io_layout_at(lio, offset);
        if (index < 0 || !lsm_entry_inited(loo->lo_lsm, index))
                RETURN(-ENODATA);
 
@@ -762,7 +866,7 @@ static int lov_io_read_ahead(const struct lu_env *env,
                                               ra_end, stripe);
 
        /* boundary of current component */
-       ra_end = cl_index(obj, (loff_t)lov_lse(loo, index)->lsme_extent.e_end);
+       ra_end = cl_index(obj, (loff_t)lov_io_extent(lio, index)->e_end);
        if (ra_end != CL_PAGE_EOF && ra->cra_end >= ra_end)
                ra->cra_end = ra_end - 1;
 
@@ -1210,8 +1314,8 @@ int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
                LASSERTF(0, "invalid type %d\n", io->ci_type);
                result = -EOPNOTSUPP;
                break;
-       case CIT_MISC:
        case CIT_GLIMPSE:
+       case CIT_MISC:
        case CIT_FSYNC:
        case CIT_LADVISE:
        case CIT_DATA_VERSION:
@@ -1245,4 +1349,45 @@ int lov_io_init_released(const struct lu_env *env, struct cl_object *obj,
        io->ci_result = result < 0 ? result : 0;
        RETURN(result);
 }
+
+/**
+ * Return the index in composite:lo_entries by the file offset
+ */
+int lov_io_layout_at(struct lov_io *lio, __u64 offset)
+{
+       struct lov_object *lov = lio->lis_object;
+       struct lov_layout_composite *comp = &lov->u.composite;
+       int start_index = 0;
+       int end_index = comp->lo_entry_count - 1;
+       int i;
+
+       LASSERT(lov->lo_type == LLT_COMP);
+
+       /* This is actual file offset so nothing can cover eof. */
+       if (offset == LUSTRE_EOF)
+               return -1;
+
+       if (lov_is_flr(lov)) {
+               struct lov_mirror_entry *lre;
+
+               LASSERT(lio->lis_mirror_index >= 0);
+
+               lre = &comp->lo_mirrors[lio->lis_mirror_index];
+               start_index = lre->lre_start;
+               end_index = lre->lre_end;
+       }
+
+       for (i = start_index; i <= end_index; i++) {
+               struct lov_layout_entry *lle = lov_entry(lov, i);
+
+               if ((offset >= lle->lle_extent->e_start &&
+                    offset < lle->lle_extent->e_end) ||
+                   (offset == OBD_OBJECT_EOF &&
+                    lle->lle_extent->e_end == OBD_OBJECT_EOF))
+                       return i;
+       }
+
+       return -1;
+}
+
 /** @} lov */
index efa4cc1..4f1172c 100644 (file)
@@ -133,15 +133,9 @@ static struct lov_lock *lov_lock_sub_init(const struct lu_env *env,
                ext.e_end  = cl_offset(obj, lock->cll_descr.cld_end + 1);
 
        nr = 0;
-       for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start);
-            index >= 0 && index < lov->lo_lsm->lsm_entry_count; index++) {
+       lov_foreach_io_layout(index, lov_env_io(env), &ext) {
                struct lov_layout_raid0 *r0 = lov_r0(lov, index);
 
-               /* assume lsm entries are sorted. */
-               if (!lu_extent_is_overlapped(&ext,
-                                            &lov_lse(lov, index)->lsme_extent))
-                       break;
-
                for (i = 0; i < r0->lo_nr; i++) {
                        if (likely(r0->lo_sub[i] != NULL) && /* spare layout */
                            lov_stripe_intersects(lov->lo_lsm, index, i,
@@ -161,14 +155,9 @@ static struct lov_lock *lov_lock_sub_init(const struct lu_env *env,
 
        lovlck->lls_nr = nr;
        nr = 0;
-       for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start);
-            index >= 0 && index < lov->lo_lsm->lsm_entry_count; index++) {
+       lov_foreach_io_layout(index, lov_env_io(env), &ext) {
                struct lov_layout_raid0 *r0 = lov_r0(lov, index);
 
-               /* assume lsm entries are sorted. */
-               if (!lu_extent_is_overlapped(&ext,
-                                            &lov_lse(lov, index)->lsme_extent))
-                       break;
                for (i = 0; i < r0->lo_nr; ++i) {
                        struct lov_lock_sub *lls = &lovlck->lls_sub[nr];
                        struct cl_lock_descr *descr = &lls->sub_lock.cll_descr;
index 5a285d0..d8d479b 100644 (file)
@@ -454,8 +454,8 @@ static int lov_attr_get_dom(const struct lu_env *env, struct lov_object *lov,
         * client's setattr RPC, so do not count anything beyond
         * component end. Alternatively, check that limit on server
         * and do not allow size overflow there. */
-       if (attr->cat_size > lle->lle_extent.e_end)
-               attr->cat_size = lle->lle_extent.e_end;
+       if (attr->cat_size > lle->lle_extent->e_end)
+               attr->cat_size = lle->lle_extent->e_end;
 
        attr->cat_kms = attr->cat_size;
 
@@ -629,10 +629,13 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
 {
        struct lov_layout_composite *comp = &state->composite;
        struct lov_layout_entry *lle;
+       struct lov_mirror_entry *lre;
        unsigned int entry_count;
        unsigned int psz = 0;
+       unsigned int mirror_count;
+       int flr_state = lsm->lsm_flags & LCM_FL_FLR_MASK;
        int result = 0;
-       int i;
+       int i, j;
 
        ENTRY;
 
@@ -641,18 +644,36 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
        lov->lo_lsm = lsm_addref(lsm);
        lov->lo_layout_invalid = true;
 
+       dump_lsm(D_INODE, lsm);
+
        entry_count = lsm->lsm_entry_count;
-       comp->lo_entry_count = entry_count;
+
+       spin_lock_init(&comp->lo_write_lock);
+       comp->lo_flags = lsm->lsm_flags;
+       comp->lo_mirror_count = lsm->lsm_mirror_count + 1;
+       comp->lo_entry_count = lsm->lsm_entry_count;
+       comp->lo_preferred_mirror = -1;
+
+       if (equi(flr_state == LCM_FL_NOT_FLR, comp->lo_mirror_count > 1))
+               RETURN(-EINVAL);
+
+       OBD_ALLOC(comp->lo_mirrors,
+                 comp->lo_mirror_count * sizeof(*comp->lo_mirrors));
+       if (comp->lo_mirrors == NULL)
+               RETURN(-ENOMEM);
 
        OBD_ALLOC(comp->lo_entries, entry_count * sizeof(*comp->lo_entries));
        if (comp->lo_entries == NULL)
                RETURN(-ENOMEM);
 
        /* Initiate all entry types and extents data at first */
-       for (i = 0; i < entry_count; i++) {
+       for (i = 0, j = 0, mirror_count = 1; i < entry_count; i++) {
+               int mirror_id = 0;
+
                lle = &comp->lo_entries[i];
 
-               lle->lle_type = lov_entry_type(lsm->lsm_entries[i]);
+               lle->lle_lsme = lsm->lsm_entries[i];
+               lle->lle_type = lov_entry_type(lle->lle_lsme);
                switch (lle->lle_type) {
                case LOV_PATTERN_RAID0:
                        lle->lle_comp_ops = &raid0_ops;
@@ -667,30 +688,96 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
                        dump_lsm(D_ERROR, lsm);
                        RETURN(-EIO);
                }
-               lle->lle_extent = lsm->lsm_entries[i]->lsme_extent;
+
+               lle->lle_extent = &lle->lle_lsme->lsme_extent;
+               lle->lle_valid = !(lle->lle_lsme->lsme_flags & LCME_FL_STALE);
+
+               if (flr_state != LCM_FL_NOT_FLR)
+                       mirror_id = mirror_id_of(lle->lle_lsme->lsme_id);
+
+               lre = &comp->lo_mirrors[j];
+               if (i > 0) {
+                       if (mirror_id == lre->lre_mirror_id) {
+                               lre->lre_valid |= lle->lle_valid;
+                               lre->lre_end = i;
+                               continue;
+                       }
+
+                       /* new mirror detected, assume that the mirrors
+                        * are shorted in layout */
+                       ++mirror_count;
+                       ++j;
+                       if (j >= comp->lo_mirror_count)
+                               break;
+
+                       lre = &comp->lo_mirrors[j];
+               }
+
+               /* entries must be sorted by mirrors */
+               lre->lre_mirror_id = mirror_id;
+               lre->lre_start = lre->lre_end = i;
+               lre->lre_preferred = (lle->lle_lsme->lsme_flags &
+                                       LCME_FL_PREFERRED);
+               lre->lre_valid = lle->lle_valid;
+       }
+
+       /* sanity check for FLR */
+       if (mirror_count != comp->lo_mirror_count) {
+               CDEBUG(D_INODE, DFID
+                      " doesn't have the # of mirrors it claims, %u/%u\n",
+                      PFID(lu_object_fid(lov2lu(lov))), mirror_count,
+                      comp->lo_mirror_count + 1);
+
+               GOTO(out, result = -EINVAL);
        }
 
-       i = 0;
        lov_foreach_layout_entry(lov, lle) {
+               int index = lov_layout_entry_index(lov, lle);
+
                /**
                 * If the component has not been init-ed on MDS side, for
                 * PFL layout, we'd know that the components beyond this one
                 * will be dynamically init-ed later on file write/trunc ops.
                 */
-               if (lsm_entry_inited(lsm, i)) {
-                       result = lle->lle_comp_ops->lco_init(env, dev, lov, i,
-                                                            conf, lle);
-                       if (result < 0)
-                               break;
+               if (!lsme_inited(lle->lle_lsme))
+                       continue;
 
-                       LASSERT(ergo(psz > 0, psz == result));
-                       psz = result;
-               }
-               i++;
+               result = lle->lle_comp_ops->lco_init(env, dev, lov, index,
+                                                    conf, lle);
+               if (result < 0)
+                       break;
+
+               LASSERT(ergo(psz > 0, psz == result));
+               psz = result;
        }
+
        if (psz > 0)
                cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
 
+       /* decide the preferred mirror */
+       mirror_count = 0, i = 0;
+       lov_foreach_mirror_entry(lov, lre) {
+               i++;
+               if (!lre->lre_valid)
+                       continue;
+
+               mirror_count++; /* valid mirror */
+
+               if (lre->lre_preferred || comp->lo_preferred_mirror < 0)
+                       comp->lo_preferred_mirror = i - 1;
+       }
+       if (mirror_count == 0) {
+               CDEBUG(D_INODE, DFID
+                      " doesn't have any valid mirrors\n",
+                      PFID(lu_object_fid(lov2lu(lov))));
+
+               GOTO(out, result = -EINVAL);
+       }
+
+       LASSERT(comp->lo_preferred_mirror >= 0);
+
+       EXIT;
+out:
        return result > 0 ? 0 : result;
 }
 
@@ -768,6 +855,14 @@ static void lov_fini_composite(const struct lu_env *env,
                comp->lo_entries = NULL;
        }
 
+       if (comp->lo_mirrors != NULL) {
+               OBD_FREE(comp->lo_mirrors,
+                        comp->lo_mirror_count * sizeof(*comp->lo_mirrors));
+               comp->lo_mirrors = NULL;
+       }
+
+       memset(comp, 0, sizeof(*comp));
+
        dump_lsm(D_INODE, lov->lo_lsm);
        lov_free_memmd(&lov->lo_lsm);
 
@@ -854,7 +949,6 @@ static int lov_attr_get_composite(const struct lu_env *env,
        struct lov_object       *lov = cl2lov(obj);
        struct lov_layout_entry *entry;
        int                      result = 0;
-       int                      index = 0;
 
        ENTRY;
 
@@ -862,18 +956,20 @@ static int lov_attr_get_composite(const struct lu_env *env,
        attr->cat_blocks = 0;
        lov_foreach_layout_entry(lov, entry) {
                struct cl_attr *lov_attr = NULL;
+               int index = lov_layout_entry_index(lov, entry);
+
+               if (!entry->lle_valid)
+                       continue;
 
                /* PFL: This component has not been init-ed. */
                if (!lsm_entry_inited(lov->lo_lsm, index))
-                       break;
+                       continue;
 
                result = entry->lle_comp_ops->lco_getattr(env, lov, index,
                                                          entry, &lov_attr);
                if (result < 0)
                        RETURN(result);
 
-               index++;
-
                if (lov_attr == NULL)
                        continue;
 
@@ -895,6 +991,7 @@ static int lov_attr_get_composite(const struct lu_env *env,
                if (attr->cat_mtime < lov_attr->cat_mtime)
                        attr->cat_mtime = lov_attr->cat_mtime;
        }
+
        RETURN(0);
 }
 
@@ -1089,12 +1186,11 @@ static int lov_layout_change(const struct lu_env *unused,
        CDEBUG(D_INODE, DFID "Apply new layout lov %p, type %d\n",
               PFID(lu_object_fid(lov2lu(lov))), lov, llt);
 
-       lov->lo_type = LLT_EMPTY;
-
        /* page bufsize fixup */
        cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
                lov_page_slice_fixup(lov, NULL);
 
+       lov->lo_type = llt;
        rc = new_ops->llo_init(env, lov_dev, lov, lsm, conf, state);
        if (rc != 0) {
                struct obd_device *obd = lov2obd(lov_dev->ld_lov);
@@ -1104,11 +1200,10 @@ static int lov_layout_change(const struct lu_env *unused,
                new_ops->llo_delete(env, lov, state);
                new_ops->llo_fini(env, lov, state);
                /* this file becomes an EMPTY file. */
+               lov->lo_type = LLT_EMPTY;
                GOTO(out, rc);
        }
 
-       lov->lo_type = llt;
-
 out:
        cl_env_put(env, &refcheck);
        RETURN(rc);
@@ -1264,7 +1359,7 @@ int lov_page_init(const struct lu_env *env, struct cl_object *obj,
 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
                struct cl_io *io)
 {
-       CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
+       CL_IO_SLICE_CLEAN(lov_env_io(env), lis_preserved);
 
        CDEBUG(D_INODE, DFID "io %p type %d ignore/verify layout %d/%d\n",
               PFID(lu_object_fid(&obj->co_lu)), io, io->ci_type,
@@ -1786,6 +1881,7 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
        if (start_entry == -1 || end_entry == -1)
                GOTO(out_fm_local, rc = -EINVAL);
 
+       /* TODO: rewrite it with lov_foreach_io_layout() */
        for (entry = start_entry; entry <= end_entry; entry++) {
                lsme = lsm->lsm_entries[entry];
 
index 869c0b8..5ab3da5 100644 (file)
@@ -82,7 +82,7 @@ int lov_page_init_composite(const struct lu_env *env, struct cl_object *obj,
        ENTRY;
 
        offset = cl_offset(obj, index);
-       entry = lov_lsm_entry(loo->lo_lsm, offset);
+       entry = lov_io_layout_at(lio, offset);
        if (entry < 0 || !lsm_entry_inited(loo->lo_lsm, entry)) {
                /* non-existing layout component */
                lov_page_init_empty(env, obj, page, index);
index 54db7ac..2470386 100644 (file)
@@ -189,9 +189,12 @@ EXPORT_SYMBOL(cl_io_sub_init);
 int cl_io_init(const struct lu_env *env, struct cl_io *io,
                enum cl_io_type iot, struct cl_object *obj)
 {
-        LASSERT(obj == cl_object_top(obj));
+       LASSERT(obj == cl_object_top(obj));
 
-        return cl_io_init0(env, io, iot, obj);
+       /* clear I/O restart from previous instance */
+       io->ci_need_restart = 0;
+
+       return cl_io_init0(env, io, iot, obj);
 }
 EXPORT_SYMBOL(cl_io_init);
 
@@ -881,6 +884,11 @@ int cl_io_loop(const struct lu_env *env, struct cl_io *io)
                cl_io_iter_fini(env, io);
        } while (!rc && io->ci_continue);
 
+       if (rc == -EWOULDBLOCK && io->ci_ndelay) {
+               io->ci_need_restart = 1;
+               rc = 0;
+       }
+
        CDEBUG(D_VFSTRACE, "loop type %u done: nob: %zu, rc: %d %s\n",
                io->ci_type, io->ci_nob, rc,
                io->ci_continue ? "continue" : "stop");
@@ -900,8 +908,11 @@ int cl_io_loop(const struct lu_env *env, struct cl_io *io)
                        pt->cip_iot == CIT_READ ? "read" : "write",
                        pt->cip_pos, pt->cip_pos + pt->cip_count,
                        pt->cip_result, rc2);
-               if (rc2)
-                       rc = rc ? rc : rc2;
+
+               /* save the result of ptask */
+               rc = rc ? : rc2;
+               io->ci_need_restart |= pt->cip_need_restart;
+
                if (!short_io) {
                        if (!rc2) /* IO is done by this task successfully */
                                io->ci_nob += pt->cip_result;
@@ -1146,6 +1157,7 @@ void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
                cl_page_discard(env, io, page);
        EXIT;
 }
+EXPORT_SYMBOL(cl_page_list_discard);
 
 /**
  * Initialize dual page queue.
index d5b19e2..101e666 100644 (file)
@@ -1970,6 +1970,7 @@ static int try_to_add_extent_for_io(struct client_obd *cli,
 
                if (tmp->oe_srvlock != ext->oe_srvlock ||
                    !tmp->oe_grants != !ext->oe_grants ||
+                   tmp->oe_ndelay != ext->oe_ndelay ||
                    tmp->oe_no_merge || ext->oe_no_merge)
                        RETURN(0);
 
@@ -2720,7 +2721,7 @@ int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
 }
 
 int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
-                        struct list_head *list, int cmd, int brw_flags)
+                        struct list_head *list, int brw_flags)
 {
        struct client_obd     *cli = osc_cli(obj);
        struct osc_extent     *ext;
@@ -2758,7 +2759,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
                RETURN(-ENOMEM);
        }
 
-       ext->oe_rw = !!(cmd & OBD_BRW_READ);
+       ext->oe_rw = !!(brw_flags & OBD_BRW_READ);
        ext->oe_sync = 1;
        ext->oe_no_merge = !can_merge;
        ext->oe_urgent = 1;
@@ -2766,6 +2767,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
        ext->oe_end = ext->oe_max_end = end;
        ext->oe_obj = obj;
        ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
+       ext->oe_ndelay = !!(brw_flags & OBD_BRW_NDELAY);
        ext->oe_nr_pages = page_count;
        ext->oe_mppr = mppr;
        list_splice_init(list, &ext->oe_pages);
@@ -2773,7 +2775,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
        osc_object_lock(obj);
        /* Reuse the initial refcount for RPC, don't drop it */
        osc_extent_state_set(ext, OES_LOCK_DONE);
-       if (cmd & OBD_BRW_WRITE) {
+       if (!ext->oe_rw) { /* write */
                list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
                osc_update_pending(obj, OBD_BRW_WRITE, page_count);
        } else {
index 99733f3..6650f0a 100644 (file)
@@ -119,7 +119,6 @@ int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios,
        struct cl_page_list *qout     = &queue->c2_qout;
        unsigned int queued = 0;
        int result = 0;
-       int cmd;
        int brw_flags;
        unsigned int max_pages;
 
@@ -131,8 +130,10 @@ int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios,
        cli = osc_cli(osc);
        max_pages = cli->cl_max_pages_per_rpc;
 
-       cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
        brw_flags = osc_io_srvlock(cl2osc_io(env, ios)) ? OBD_BRW_SRVLOCK : 0;
+       brw_flags |= crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
+       if (crt == CRT_READ && ios->cis_io->ci_ndelay)
+               brw_flags |= OBD_BRW_NDELAY;
 
         /*
          * NOTE: here @page is a top-level page. This is done to avoid
@@ -186,7 +187,7 @@ int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios,
 
                if (++queued == max_pages) {
                        queued = 0;
-                       result = osc_queue_sync_pages(env, osc, &list, cmd,
+                       result = osc_queue_sync_pages(env, osc, &list,
                                                      brw_flags);
                        if (result < 0)
                                break;
@@ -194,7 +195,7 @@ int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios,
        }
 
        if (queued > 0)
-               result = osc_queue_sync_pages(env, osc, &list, cmd, brw_flags);
+               result = osc_queue_sync_pages(env, osc, &list, brw_flags);
 
        /* Update c/mtime for sync write. LU-7310 */
        if (crt == CRT_WRITE && qout->pl_nr > 0 && result == 0) {
index ae62a40..c0e1472 100644 (file)
@@ -306,6 +306,8 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
                                    NULL, &oscl->ols_lvb);
                /* Hide the error. */
                rc = 0;
+       } else if (rc < 0 && oscl->ols_flags & LDLM_FL_NDELAY) {
+               rc = -EWOULDBLOCK;
        }
 
        if (oscl->ols_owner != NULL)
@@ -1184,6 +1186,8 @@ int osc_lock_init(const struct lu_env *env,
                oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
                oscl->ols_glimpse = 1;
        }
+       if (io->ci_ndelay && cl_object_same(io->ci_obj, obj))
+               oscl->ols_flags |= LDLM_FL_NDELAY;
        osc_lock_build_einfo(env, lock, cl2osc(obj), &oscl->ols_einfo);
 
        cl_lock_slice_add(lock, &oscl->ols_cl, obj, &osc_lock_ops);
index 5c6438c..9cb2c6d 100644 (file)
@@ -1778,7 +1778,7 @@ static int brw_interpret(const struct lu_env *env,
         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
         /* When server return -EINPROGRESS, client should always retry
          * regardless of the number of times the bulk was resent already. */
-       if (osc_recoverable_error(rc)) {
+       if (osc_recoverable_error(rc) && !req->rq_no_delay) {
                if (req->rq_import_generation !=
                    req->rq_import->imp_generation) {
                        CDEBUG(D_HA, "%s: resend cross eviction for object: "
@@ -1859,7 +1859,8 @@ static int brw_interpret(const struct lu_env *env,
 
        list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
                list_del_init(&ext->oe_link);
-               osc_extent_finish(env, ext, 1, rc);
+               osc_extent_finish(env, ext, 1,
+                                 rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
        }
        LASSERT(list_empty(&aa->aa_exts));
        LASSERT(list_empty(&aa->aa_oaps));
@@ -1927,6 +1928,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        int                             page_count = 0;
        bool                            soft_sync = false;
        bool                            interrupted = false;
+       bool                            ndelay = false;
        int                             i;
        int                             grant = 0;
        int                             rc;
@@ -1983,6 +1985,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                        if (oap->oap_interrupted)
                                interrupted = true;
                }
+               if (ext->oe_ndelay)
+                       ndelay = true;
        }
 
        /* first page in the list */
@@ -2012,6 +2016,12 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        oap->oap_request = ptlrpc_request_addref(req);
        if (interrupted && !req->rq_intr)
                ptlrpc_mark_interrupted(req);
+       if (ndelay) {
+               req->rq_no_resend = req->rq_no_delay = 1;
+               /* probably set a shorter timeout value.
+                * to handle ETIMEDOUT in brw_interpret() correctly. */
+               /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
+       }
 
        /* Need to update the timestamps after the request is built in case
         * we race with setattr (locally or in queue at OST).  If OST gets
index d82249c..d37ccba 100644 (file)
 #define ACT_SEEK        4
 #define ACT_READHOLE    8
 #define ACT_VERIFY      16
+#define ACT_OUTPUT     32
 
 void usage()
 {
-       printf("usage: rwv -f filename <-r|-w> [-a] [-z] [-d] [-v]"
-                "[-s offset] -n iovcnt SIZE1 SIZE2 SIZE3...\n");
-        printf("-a  append IO (O_APPEND)\n");
-        printf("-r  file read (O_RDONLY)\n");
-        printf("-w  file write (O_WRONLY)\n");
-        printf("-s  set the start pos of the read/write test\n");
-        printf("-z  test for read hitting hole\n");
-        printf("-d  create flags (O_LOV_DELAY_CREATE)\n");
-        printf("-v  verify the data content of read\n");
+       printf("usage: rwv -f filename <-r|-w> [-a] [-z] [-d] [-v]");
+       printf(" [-s offset] [-o[outf]] -n iovcnt SIZE1 SIZE2 SIZE3...\n");
+       printf("-a  append IO (O_APPEND)\n");
+       printf("-r  file read (O_RDONLY)\n");
+       printf("-w  file write (O_WRONLY)\n");
+       printf("-s  set the start pos of the read/write test\n");
+       printf("-z  test for read hitting hole\n");
+       printf("-d  create flags (O_LOV_DELAY_CREATE)\n");
+       printf("-v  verify the data content of read\n");
+       printf("-o  write the file content of read to an optional file\n");
 }
 
 int data_verify(struct iovec *iov, int iovcnt, char c)
@@ -91,6 +93,7 @@ int main(int argc, char** argv)
         int flags = 0;
         int iovcnt = 0;
         int act = ACT_NONE;
+       int out_fd = -1;
         char pad = 0xba;
         char *end;
         char *fname = "FILE";
@@ -98,7 +101,7 @@ int main(int argc, char** argv)
         struct iovec *iov;
         off64_t offset = 0;
 
-        while ((c = getopt(argc, argv, "f:n:s:rwahvdz")) != -1) {
+       while ((c = getopt(argc, argv, "f:n:s:rwahvdzo::")) != -1) {
                 switch (c) {
                 case 'f':
                         fname = optarg;
@@ -122,12 +125,14 @@ int main(int argc, char** argv)
                                 return 1;
                         }
                         break;
-                case 'w':
-                        act |= ACT_WRITE;
-                        break;
-                case 'r':
-                        act |= ACT_READ;
-                        break;
+               case 'w':
+                       act |= ACT_WRITE;
+                       flags |= O_WRONLY | O_CREAT;
+                       break;
+               case 'r':
+                       act |= ACT_READ;
+                       flags |= O_RDONLY;
+                       break;
                 case 'a':
                         flags |= O_APPEND;
                         break;
@@ -141,6 +146,13 @@ int main(int argc, char** argv)
                 case 'v':
                         act |= ACT_VERIFY;
                         break;
+               case 'o':
+                       act |= ACT_OUTPUT;
+                       if (optarg != NULL)
+                               out_fd = open(optarg, O_WRONLY|O_CREAT, 0644);
+                       else
+                               out_fd = fileno(stdout);
+                       break;
                 case 'h':
                         usage();
                         break;
@@ -157,6 +169,11 @@ int main(int argc, char** argv)
                 return 1;
         }
 
+       if (act & ACT_OUTPUT && (!(act & ACT_READ) || out_fd < 0)) {
+               printf("-o not in read mode or cannot open the output file");
+               return 1;
+       }
+
         if (argc - optind < iovcnt) {
                 printf("Not enough parameters for iov size\n");
                 return 1;
@@ -189,17 +206,17 @@ int main(int argc, char** argv)
                 len += iv->iov_len;
         }
 
-        fd = open(fname, O_LARGEFILE | O_RDWR | O_CREAT | flags, 0644);
-        if (fd == -1) {
-                printf("Cannot open %s:%s\n", fname, strerror(errno));
-                return 1;
-        }
+       fd = open(fname, O_LARGEFILE | flags, 0644);
+       if (fd == -1) {
+               printf("Cannot open %s:%s\n", fname, strerror(errno));
+               return 1;
+       }
 
-        if ((act & ACT_SEEK) && (lseek64(fd, offset, SEEK_SET) < 0)) {
-                printf("Cannot seek %s\n", strerror(errno));
+       if ((act & ACT_SEEK) && (lseek64(fd, offset, SEEK_SET) < 0)) {
+               printf("Cannot seek %s\n", strerror(errno));
                rc = 1;
                goto out;
-        }
+       }
 
         if (act & ACT_WRITE) {
                 rc = writev(fd, iov, iovcnt);
@@ -223,11 +240,23 @@ int main(int argc, char** argv)
                        rc = 1;
                        goto out;
                }
+
+               if (act & ACT_OUTPUT) {
+                       rc = writev(out_fd, iov, iovcnt);
+                       if (rc != len) {
+                               printf("write error: %s rc = %d\n",
+                                      strerror(errno), rc);
+                               rc = 1;
+                               goto out;
+                       }
+               }
         }
 
         rc = 0;
 out:
         if (iov)
                 free(iov);
+       if (out_fd >= 0)
+               close(out_fd);
        return rc;
 }
index 101a5ff..8e069e1 100644 (file)
@@ -61,6 +61,35 @@ get_mirror_ids() {
        echo ${#mirror_array[@]}
 }
 
+drop_client_cache() {
+       echo 3 > /proc/sys/vm/drop_caches
+}
+
+stop_osts() {
+       local idx
+
+       for idx in "$@"; do
+               stop ost$idx
+       done
+
+       for idx in "$@"; do
+               wait_osc_import_state client ost$idx DISCONN
+       done
+}
+
+start_osts() {
+       local idx
+
+       for idx in "$@"; do
+               start ost$idx $(ostdevname $idx) $OST_MOUNT_OPTS ||
+                       error "start ost$idx failed"
+       done
+
+       for idx in "$@"; do
+               wait_osc_import_state client ost$idx FULL
+       done
+}
+
 # command line test cases
 test_1() {
        local tf=$DIR/$tfile
@@ -180,6 +209,20 @@ test_21() {
 }
 run_test 21 "glimpse should report accurate i_blocks"
 
+get_osc_lock_count() {
+       local lock_count=0
+
+       for idx in "$@"; do
+               local osc_name
+               local count
+
+               osc_name=${FSNAME}-OST$(printf "%04x" $((idx-1)))-osc-'ffff*'
+               count=$($LCTL get_param -n ldlm.namespaces.$osc_name.lock_count)
+               lock_count=$((lock_count + count))
+       done
+       echo $lock_count
+}
+
 test_22() {
        local tf=$DIR/$tfile
 
@@ -197,9 +240,7 @@ test_22() {
        local new_size_blocks=$(stat --format="%b %s" $tf)
 
        # make sure there is no lock cached
-       local lock_count=$($LCTL get_param -n \
-               ldlm.namespaces.${FSNAME}-OST0000-osc-ffff*.lock_count)
-       [ $lock_count -eq 0 ] || error "glimpse requests were sent"
+       [ $(get_osc_lock_count 1) -eq 0 ] || error "glimpse requests were sent"
 
        [ "$new_size_blocks" = "$size_blocks" ] ||
                echo "size expected: $size_blocks, actual: $new_size_blocks"
@@ -208,6 +249,226 @@ test_22() {
 }
 run_test 22 "no glimpse to OSTs for READ_ONLY files"
 
+test_31() {
+       local tf=$DIR/$tfile
+
+       $LFS setstripe -E EOF -o 0 $tf
+       $LFS setstripe --component-add --mirror -o 1 $tf
+
+       #define OBD_FAIL_GLIMPSE_IMMUTABLE 0x1A00
+       $LCTL set_param fail_loc=0x1A00
+
+       local ost_idx
+       for ((ost_idx = 1; ost_idx <= 2; ost_idx++)); do
+               cancel_lru_locks osc
+               stop_osts $ost_idx
+
+               local tmpfile=$(mktemp)
+               stat --format="%b %s" $tf > $tmpfile  &
+               local pid=$!
+
+               local cnt=0
+               while [ $cnt -le 5 ]; do
+                       kill -0 $pid > /dev/null 2>&1 || break
+                       sleep 1
+                       ((cnt += 1))
+               done
+               kill -0 $pid > /dev/null 2>&1 &&
+                       error "stat process stuck due to unavailable OSTs"
+
+               # make sure glimpse request has been sent
+               [ $(get_osc_lock_count 1 2) -ne 0 ] ||
+                       error "OST $ost_idx: no glimpse request was sent"
+
+               start_osts $ost_idx
+       done
+}
+run_test 31 "make sure glimpse request can be retried"
+
+test_32() {
+       [[ $OSTCOUNT -lt 2 ]] && skip "need >= 2 OSTs" && return
+       rm -f $DIR/$tfile $DIR/$tfile-2
+
+       $LFS setstripe -E EOF -o 0 $DIR/$tfile
+       dd if=/dev/urandom of=$DIR/$tfile bs=1M count=$((RANDOM % 10 + 2))
+
+       local fsize=$(stat -c %s $DIR/$tfile)
+       [[ $fsize -ne 0 ]] || error "file size is (wrongly) zero"
+
+       local cksum=$(md5sum $DIR/$tfile)
+
+       # create a new mirror in sync mode
+       $LFS setstripe --component-add --mirror -o 1 $DIR/$tfile
+
+       # make sure the mirrored file was created successfully
+       [ $(get_mirror_ids $DIR/$tfile) -eq 2 ] ||
+               { $LFS getstripe $DIR/$tfile; error "expected 2 mirrors"; }
+
+       drop_client_cache
+       stop_osts 1
+
+       # check size is correct, glimpse request should go to the 2nd mirror
+       $CHECKSTAT -t file -s $fsize $DIR/$tfile ||
+               error "file size error $fsize vs. $(stat -c %s $DIR/$tfile)"
+
+       echo "reading file from the 2nd mirror and verify checksum"
+       [[ "$cksum" == "$(md5sum $DIR/$tfile)" ]] ||
+               error "checksum error: expected $cksum"
+
+       start_osts 1
+}
+run_test 32 "data should be mirrored to newly created mirror"
+
+test_33() {
+       [[ $OSTCOUNT -lt 2 ]] && skip "need >= 2 OSTs" && return
+
+       rm -f $DIR/$tfile $DIR/$tfile-2
+
+       # create a file with two mirrors
+       $LFS setstripe -E EOF -o 0 $DIR/$tfile
+       local max_count=100
+       local count=0
+       while [ $count -lt $max_count ]; do
+               echo "ost1" >> $DIR/$tfile
+               count=$((count + 1));
+       done
+
+       # tmp file that will be used as mirror
+       $LFS setstripe -E EOF -o 1 $DIR/$tfile-2
+       count=0
+       while [ $count -lt $max_count ]; do
+               echo "ost2" >> $DIR/$tfile-2
+               count=$((count + 1));
+       done
+
+       # create a mirrored file
+       $LFS setstripe --component-add --mirror=$DIR/$tfile-2 $DIR/$tfile
+
+       # make sure that $tfile has two mirrors and $tfile-2 has no stripe
+       [ $(get_mirror_ids $DIR/$tfile) -eq 2 ] ||
+               { $LFS getstripe $DIR/$tfile; error "expected count 2"; }
+       $LFS getstripe $DIR/$tfile-2 | grep -q "no stripe info" ||
+               { $LFS getstripe $DIR/$tfile; error "expected no stripe"; }
+
+       # execpted file size
+       local fsize=$((5 * max_count))
+       $CHECKSTAT -t file -s $fsize $DIR/$tfile ||
+               error "mirrored file size is not $fsize"
+
+       # read file - all OSTs are available
+       echo "reading file (data should be provided by ost1)... "
+       local rs=$(cat $DIR/$tfile | head -1)
+       [[ "$rs" == "ost1" ]] ||
+               error "file content error: expected: \"ost1\", actual: \"$rs\""
+
+       # read file again with ost1 failed
+       stop_osts 1
+       drop_client_cache
+
+       echo "reading file (data should be provided by ost2)..."
+       local rs=$(cat $DIR/$tfile | head -1)
+       [[ "$rs" == "ost2" ]] ||
+               error "file content error: expected: \"ost2\", actual: \"$rs\""
+
+       # remount ost1
+       start_osts 1
+
+       # read file again with ost2 failed
+       $LCTL set_param ldlm.namespaces.lustre-*-osc-ffff*.lru_size=clear
+
+       fail ost2 &
+       sleep 1
+
+       # check size, glimpse should work
+       $CHECKSTAT -t file -s $fsize $DIR/$tfile ||
+               error "mirrored file size is not $fsize"
+
+       echo "reading file (data should be provided by ost1)..."
+       local rs=$(cat $DIR/$tfile | head -1)
+       [[ "$rs" == "ost1" ]] ||
+               error "file content error: expected: \"ost1\", actual: \"$rs\""
+
+       wait_osc_import_state client ost2 FULL
+}
+run_test 33 "read can choose available mirror to read"
+
+test_34a() {
+       [[ $OSTCOUNT -lt 4 ]] && skip "need >= 4 OSTs" && return
+
+       rm -f $DIR/$tfile $DIR/$tfile-2 $DIR/$tfile-ref
+
+       # reference file
+       $LFS setstripe -o 0 $DIR/$tfile-ref
+       dd if=/dev/urandom of=$DIR/$tfile-ref bs=1M count=3
+
+       # create a file with two mirrors
+       $LFS setstripe -E -1 -o 0,1 -S 1M $DIR/$tfile
+       dd if=$DIR/$tfile-ref of=$DIR/$tfile bs=1M
+
+       $LFS setstripe -E -1 -o 2,3 -S 1M $DIR/$tfile-2
+       dd if=$DIR/$tfile-ref of=$DIR/$tfile-2 bs=1M
+
+       $CHECKSTAT -t file -s $((3 * 1024 * 1024)) $DIR/$tfile ||
+               error "mirrored file size is not 3M"
+
+       # merge a mirrored file
+       $LFS setstripe --component-add --mirror=$DIR/$tfile-2 $DIR/$tfile
+
+       cancel_lru_locks osc
+
+       # stop two OSTs, so the 2nd stripe of the 1st mirror and
+       # the 1st stripe of the 2nd mirror will be inaccessible, ...
+       stop_osts 2 3
+
+       echo "comparing files ... "
+
+       # however, read can still return the correct data. It should return
+       # the 1st stripe from mirror 1 and 2st stripe from mirror 2.
+       cmp -n 2097152 <(rwv -f $DIR/$tfile -r -o -n 1 2097152) \
+               $DIR/$tfile-ref || error "file reading error"
+
+       start_osts 2 3
+}
+run_test 34a "read mirrored file with multiple stripes"
+
+test_34b() {
+       [[ $OSTCOUNT -lt 4 ]] && skip "need >= 4 OSTs" && return
+
+       rm -f $DIR/$tfile $DIR/$tfile-2 $DIR/$tfile-ref
+
+       # reference file
+       $LFS setstripe -o 0 $DIR/$tfile-ref
+       dd if=/dev/urandom of=$DIR/$tfile-ref bs=1M count=3
+
+       $LFS setstripe -E 1M -S 1M -o 0 -E eof -o 1 $DIR/$tfile
+       dd if=$DIR/$tfile-ref of=$DIR/$tfile bs=1M
+
+       $LFS setstripe -E 1M -S 1M -o 2 -E eof -o 3 $DIR/$tfile-2
+       dd if=$DIR/$tfile-ref of=$DIR/$tfile-2 bs=1M
+
+       $CHECKSTAT -t file -s $((3 * 1024 * 1024)) $DIR/$tfile ||
+               error "mirrored file size is not 3M"
+
+       # merge a mirrored file
+       $LFS setstripe --component-add --mirror=$DIR/$tfile-2 $DIR/$tfile
+
+       cancel_lru_locks osc
+
+       # stop two OSTs, so the 2nd component of the 1st mirror and
+       # the 1st component of the 2nd mirror will be inaccessible, ...
+       stop_osts 2 3
+
+       echo "comparing files ... "
+
+       # however, read can still return the correct data. It should return
+       # the 1st stripe from mirror 1 and 2st stripe from mirror 2.
+       cmp -n 2097152 <(rwv -f $DIR/$tfile -r -o -n 1 2097152) \
+               $DIR/$tfile-ref || error "file reading error"
+
+       start_osts 2 3
+}
+run_test 34b "read mirrored file with multiple components"
+
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status
index 34d8580..6bc3f5d 100755 (executable)
@@ -6319,7 +6319,7 @@ convert_facet2label() {
 }
 
 get_clientosc_proc_path() {
-       echo "${1}-osc-*"
+       echo "${1}-osc-ffff*"
 }
 
 # If the 2.0 MDS was mounted on 1.8 device, then the OSC and LOV names