Whamcloud - gitweb
b=18857 enhance seq allocation scalability by updating seq data asynchronously.
authorpravin <Pravin.Shelar@sun.com>
Wed, 12 May 2010 14:25:24 +0000 (19:55 +0530)
committerRobert Read <robert.read@oracle.com>
Wed, 12 May 2010 19:04:51 +0000 (12:04 -0700)
this patch also removes seq replay. ref bug for details.

a=pravin,tappro
i=tappro
i=alexander.zarochentsev
i=pravin

lustre/fid/fid_handler.c
lustre/fid/fid_internal.h
lustre/fid/fid_request.c
lustre/fid/fid_store.c
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_fid.h
lustre/include/obd_support.h
lustre/lmv/lmv_fld.c
lustre/mdt/mdt_handler.c
lustre/tests/replay-single.sh

index e85d0ef..b19ba65 100644 (file)
@@ -54,6 +54,7 @@
 
 #include <obd.h>
 #include <obd_class.h>
+#include <lu_target.h>
 #include <dt_object.h>
 #include <md_object.h>
 #include <obd_support.h>
@@ -100,6 +101,18 @@ out_up:
         return rc;
 }
 EXPORT_SYMBOL(seq_server_set_cli);
+/*
+ * allocate \a w units of sequence from range \a from.
+ */
+static inline void range_alloc(struct lu_seq_range *to,
+                               struct lu_seq_range *from,
+                               __u64 width)
+{
+        width = min(range_space(from), width);
+        to->lsr_start = from->lsr_start;
+        to->lsr_end = from->lsr_start + width;
+        from->lsr_start += width;
+}
 
 /**
  * On controller node, allocate new super sequence for regular sequence server.
@@ -109,67 +122,24 @@ EXPORT_SYMBOL(seq_server_set_cli);
  */
 
 static int __seq_server_alloc_super(struct lu_server_seq *seq,
-                                    struct lu_seq_range *in,
                                     struct lu_seq_range *out,
                                     const struct lu_env *env)
 {
         struct lu_seq_range *space = &seq->lss_space;
-        struct thandle *th;
-        __u64 mdt = out->lsr_mdt;
-        int rc, credit;
+        int rc;
         ENTRY;
 
         LASSERT(range_is_sane(space));
 
-        if (in != NULL) {
-                CDEBUG(D_INFO, "%s: Input seq range: "
-                       DRANGE"\n", seq->lss_name, PRANGE(in));
-
-                if (in->lsr_end > space->lsr_start)
-                        space->lsr_start = in->lsr_end;
-                *out = *in;
-
-                CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
-                       seq->lss_name, PRANGE(space));
+        if (range_is_exhausted(space)) {
+                CERROR("%s: Sequences space is exhausted\n",
+                       seq->lss_name);
+                RETURN(-ENOSPC);
         } else {
-                if (range_space(space) < seq->lss_width) {
-                        CWARN("%s: Sequences space to be exhausted soon. "
-                              "Only "LPU64" sequences left\n", seq->lss_name,
-                              range_space(space));
-                        *out = *space;
-                        space->lsr_start = space->lsr_end;
-                } else if (range_is_exhausted(space)) {
-                        CERROR("%s: Sequences space is exhausted\n",
-                               seq->lss_name);
-                        RETURN(-ENOSPC);
-                } else {
-                        range_alloc(out, space, seq->lss_width);
-                }
-        }
-        out->lsr_mdt = mdt;
-
-        credit = SEQ_TXN_STORE_CREDITS + FLD_TXN_INDEX_INSERT_CREDITS;
-
-        th = seq_store_trans_start(seq, env, credit);
-        if (IS_ERR(th))
-                RETURN(PTR_ERR(th));
-
-        rc = seq_store_write(seq, env, th);
-        if (rc) {
-                CERROR("%s: Can't write space data, rc %d\n",
-                       seq->lss_name, rc);
-                goto out;
-        }
-
-        rc = fld_server_create(seq->lss_site->ms_server_fld,
-                               env, out, th);
-        if (rc) {
-                CERROR("%s: Can't Update fld database, rc %d\n",
-                       seq->lss_name, rc);
+                range_alloc(out, space, seq->lss_width);
         }
 
-out:
-        seq_store_trans_stop(seq, env, th);
+        rc = seq_store_update(env, seq, out, 1 /* sync */);
 
         CDEBUG(D_INFO, "%s: super-sequence allocation rc = %d "
                DRANGE"\n", seq->lss_name, rc, PRANGE(out));
@@ -178,7 +148,6 @@ out:
 }
 
 int seq_server_alloc_super(struct lu_server_seq *seq,
-                           struct lu_seq_range *in,
                            struct lu_seq_range *out,
                            const struct lu_env *env)
 {
@@ -186,145 +155,137 @@ int seq_server_alloc_super(struct lu_server_seq *seq,
         ENTRY;
 
         cfs_down(&seq->lss_sem);
-        rc = __seq_server_alloc_super(seq, in, out, env);
+        rc = __seq_server_alloc_super(seq, out, env);
         cfs_up(&seq->lss_sem);
 
         RETURN(rc);
 }
 
+static int __seq_set_init(const struct lu_env *env,
+                            struct lu_server_seq *seq)
+{
+        struct lu_seq_range *space = &seq->lss_space;
+        int rc;
+
+        range_alloc(&seq->lss_lowater_set, space, seq->lss_set_width);
+        range_alloc(&seq->lss_hiwater_set, space, seq->lss_set_width);
+
+        rc = seq_store_update(env, seq, NULL, 1);
+        seq->lss_set_transno = 0;
+
+        return rc;
+}
+
+/*
+ * This function implements new seq allocation algorithm using async
+ * updates to seq file on disk. ref bug 18857 for details.
+ * there are four variable to keep track of this process
+ *
+ * lss_space; - available lss_space
+ * lss_lowater_set; - lu_seq_range for all seqs before barrier, i.e. safe to use
+ * lss_hiwater_set; - lu_seq_range after barrier, i.e. allocated but may be
+ *                    not yet committed
+ *
+ * when lss_lowater_set reaches the end it is replaced with hiwater one and
+ * a write operation is initiated to allocate new hiwater range.
+ * if last seq write opearion is still not commited, current operation is
+ * flaged as sync write op.
+ */
+static int range_alloc_set(const struct lu_env *env,
+                            struct lu_seq_range *out,
+                            struct lu_server_seq *seq)
+{
+        struct lu_seq_range *space = &seq->lss_space;
+        struct lu_seq_range *loset = &seq->lss_lowater_set;
+        struct lu_seq_range *hiset = &seq->lss_hiwater_set;
+        int rc = 0;
+
+        if (range_is_zero(loset))
+                __seq_set_init(env, seq);
+
+        if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_ALLOC)) /* exhaust set */
+                loset->lsr_start = loset->lsr_end;
+
+        if (range_is_exhausted(loset)) {
+                /* reached high water mark. */
+                struct lu_device *dev = seq->lss_site->ms_lu.ls_top_dev;
+                struct lu_target *tg = dev->ld_obd->u.obt.obt_lut;
+                int obd_num_clients = dev->ld_obd->obd_num_exports;
+                __u64 set_sz;
+                int sync = 0;
+
+                /* calculate new seq width based on number of clients */
+                set_sz = max(seq->lss_set_width,
+                               obd_num_clients * seq->lss_width);
+                set_sz = min(range_space(space), set_sz);
+
+                /* Switch to hiwater range now */
+                loset = hiset;
+                /* allocate new hiwater range */
+                range_alloc(hiset, space, set_sz);
+
+                if (seq->lss_set_transno > dev->ld_obd->obd_last_committed)
+                        sync = 1;
+
+                /* update ondisk seq with new *space */
+                rc = seq_store_update(env, seq, NULL, sync);
+
+                /* set new hiwater transno */
+                cfs_spin_lock(&tg->lut_translock);
+                seq->lss_set_transno = tg->lut_last_transno;
+                cfs_spin_unlock(&tg->lut_translock);
+        }
+
+        LASSERTF(!range_is_exhausted(loset) || range_is_sane(loset),
+                 DRANGE"\n", PRANGE(loset));
+
+        if (rc == 0)
+                range_alloc(out, loset, seq->lss_width);
+
+        RETURN(rc);
+}
+
 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
-                                   struct lu_seq_range *in,
                                    struct lu_seq_range *out,
                                    const struct lu_env *env)
 {
         struct lu_seq_range *space = &seq->lss_space;
-        struct thandle *th;
         int rc = 0;
 
         ENTRY;
 
         LASSERT(range_is_sane(space));
 
-        /*
-         * This is recovery case. Adjust super range if input range looks like
-         * it is allocated from new super.
-         */
-        if (in != NULL) {
-                CDEBUG(D_INFO, "%s: Input seq range: "
-                       DRANGE"\n", seq->lss_name, PRANGE(in));
-
-                if (in->lsr_end <= space->lsr_start) {
-                        /*
-                         * Client is replaying a fairly old range, server
-                         * don't need to do any allocation.
-                         */
-                } else if (range_is_exhausted(space)) {
-                        /*
-                         * Start is set to end of last allocated, because it
-                         * *is* already allocated so we take that into account
-                         * and do not use for other allocations.
-                         */
-                        space->lsr_start = in->lsr_end;
-
-                        /*
-                         * End is set to in->lsr_start + super sequence
-                         * allocation unit. That is because in->lsr_start is
-                         * first seq in new allocated range from controller
-                         * before failure.
-                         */
-                        space->lsr_end = in->lsr_start + LUSTRE_SEQ_SUPER_WIDTH;
-
-                        if (!seq->lss_cli) {
-                                CERROR("%s: No sequence controller "
-                                       "is attached.\n", seq->lss_name);
-                                RETURN(-ENODEV);
-                        }
-
-                        /*
-                         * Let controller know that this is recovery and last
-                         * obtained range from it was @space.
-                         */
-                        rc = seq_client_replay_super(seq->lss_cli, space, env);
-
-                        if (rc) {
-                                CERROR("%s: Can't replay super-sequence, "
-                                       "rc %d\n", seq->lss_name, rc);
-                                RETURN(rc);
-                        }
-                } else {
-                        /*
-                         * Update super start by end from client's range. Super
-                         * end should not be changed if range was not exhausted.
-                         */
-                        space->lsr_start = in->lsr_end;
+        /* Check if available space ends and allocate new super seq */
+        if (range_is_exhausted(space)) {
+                if (!seq->lss_cli) {
+                        CERROR("%s: No sequence controller is attached.\n",
+                               seq->lss_name);
+                        RETURN(-ENODEV);
                 }
 
-                /* sending replay_super to update fld as only super sequence
-                 * server can update fld.
-                 * we are sending meta sequence to fld rather than super
-                 * sequence, but fld server can handle range merging. */
-
-                in->lsr_mdt = space->lsr_mdt;
-                rc = seq_client_replay_super(seq->lss_cli, in, env);
-
+                rc = seq_client_alloc_super(seq->lss_cli, env);
                 if (rc) {
-                        CERROR("%s: Can't replay super-sequence, "
-                                        "rc %d\n", seq->lss_name, rc);
+                        CERROR("%s: Can't allocate super-sequence, rc %d\n",
+                               seq->lss_name, rc);
                         RETURN(rc);
                 }
 
-                *out = *in;
-
-                CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
-                       seq->lss_name, PRANGE(space));
-        } else {
-                /*
-                 * XXX: Avoid cascading RPCs using kind of async preallocation
-                 * when meta-sequence is close to exhausting.
-                 */
-                if (range_is_exhausted(space)) {
-                        if (!seq->lss_cli) {
-                                CERROR("%s: No sequence controller "
-                                       "is attached.\n", seq->lss_name);
-                                RETURN(-ENODEV);
-                        }
-
-                        rc = seq_client_alloc_super(seq->lss_cli, env);
-                        if (rc) {
-                                CERROR("%s: Can't allocate super-sequence, "
-                                       "rc %d\n", seq->lss_name, rc);
-                                RETURN(rc);
-                        }
-
-                        /* Saving new range to allocation space. */
-                        *space = seq->lss_cli->lcs_space;
-                        LASSERT(range_is_sane(space));
-                }
-
-                range_alloc(out, space, seq->lss_width);
-        }
-
-        th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
-        if (IS_ERR(th))
-                RETURN(PTR_ERR(th));
-
-        rc = seq_store_write(seq, env, th);
-        if (rc) {
-                CERROR("%s: Can't write space data, rc %d\n",
-                      seq->lss_name, rc);
+                /* Saving new range to allocation space. */
+                *space = seq->lss_cli->lcs_space;
+                LASSERT(range_is_sane(space));
         }
 
+        rc = range_alloc_set(env, out, seq);
         if (rc == 0) {
                 CDEBUG(D_INFO, "%s: Allocated meta-sequence "
                        DRANGE"\n", seq->lss_name, PRANGE(out));
         }
 
-        seq_store_trans_stop(seq, env, th);
         RETURN(rc);
 }
 
 int seq_server_alloc_meta(struct lu_server_seq *seq,
-                          struct lu_seq_range *in,
                           struct lu_seq_range *out,
                           const struct lu_env *env)
 {
@@ -332,7 +293,7 @@ int seq_server_alloc_meta(struct lu_server_seq *seq,
         ENTRY;
 
         cfs_down(&seq->lss_sem);
-        rc = __seq_server_alloc_meta(seq, in, out, env);
+        rc = __seq_server_alloc_meta(seq, out, env);
         cfs_up(&seq->lss_sem);
 
         RETURN(rc);
@@ -341,8 +302,7 @@ EXPORT_SYMBOL(seq_server_alloc_meta);
 
 static int seq_server_handle(struct lu_site *site,
                              const struct lu_env *env,
-                             __u32 opc, struct lu_seq_range *in,
-                             struct lu_seq_range *out)
+                             __u32 opc, struct lu_seq_range *out)
 {
         int rc;
         struct md_site *mite;
@@ -356,8 +316,7 @@ static int seq_server_handle(struct lu_site *site,
                                "initialized\n");
                         RETURN(-EINVAL);
                 }
-                rc = seq_server_alloc_meta(mite->ms_server_seq,
-                                           in, out, env);
+                rc = seq_server_alloc_meta(mite->ms_server_seq, out, env);
                 break;
         case SEQ_ALLOC_SUPER:
                 if (!mite->ms_control_seq) {
@@ -365,8 +324,7 @@ static int seq_server_handle(struct lu_site *site,
                                "initialized\n");
                         RETURN(-EINVAL);
                 }
-                rc = seq_server_alloc_super(mite->ms_control_seq,
-                                            in, out, env);
+                rc = seq_server_alloc_super(mite->ms_control_seq, out, env);
                 break;
         default:
                 rc = -EINVAL;
@@ -380,12 +338,13 @@ static int seq_req_handle(struct ptlrpc_request *req,
                           const struct lu_env *env,
                           struct seq_thread_info *info)
 {
-        struct lu_seq_range *out, *in = NULL, *tmp;
+        struct lu_seq_range *out, *tmp;
         struct lu_site *site;
         int rc = -EPROTO;
         __u32 *opc;
         ENTRY;
 
+       LASSERT(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY));
         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
         LASSERT(site != NULL);
                        
@@ -401,20 +360,11 @@ static int seq_req_handle(struct ptlrpc_request *req,
 
                 tmp = req_capsule_client_get(info->sti_pill, &RMF_SEQ_RANGE);
 
-                if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
-                        in = tmp;
-
-                        if (range_is_zero(in) || !range_is_sane(in)) {
-                                CERROR("Replayed seq range is invalid: "
-                                       DRANGE"\n", PRANGE(in));
-                                RETURN(err_serious(-EINVAL));
-                        }
-                }
                 /* seq client passed mdt id, we need to pass that using out
                  * range parameter */
 
                 out->lsr_mdt = tmp->lsr_mdt;
-                rc = seq_server_handle(site, env, *opc, in, out);
+                rc = seq_server_handle(site, env, *opc, out);
         } else
                 rc = err_serious(-EPROTO);
 
@@ -455,6 +405,9 @@ static int seq_handle(struct ptlrpc_request *req)
 
         seq_thread_info_init(req, info);
         rc = seq_req_handle(req, env, info);
+        /* XXX: we don't need replay but MDT assign transno in any case,
+         * remove it manually before reply*/
+        lustre_msg_set_transno(req->rq_repmsg, 0);
         seq_thread_info_fini(info);
 
         return rc;
@@ -522,6 +475,7 @@ static void seq_server_proc_fini(struct lu_server_seq *seq)
 }
 #endif
 
+
 int seq_server_init(struct lu_server_seq *seq,
                     struct dt_device *dev,
                     const char *prefix,
@@ -529,17 +483,21 @@ int seq_server_init(struct lu_server_seq *seq,
                     struct md_site *ms,
                     const struct lu_env *env)
 {
-        struct thandle *th;
         int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
         ENTRY;
 
-       LASSERT(dev != NULL);
+        LASSERT(dev != NULL);
         LASSERT(prefix != NULL);
 
         seq->lss_cli = NULL;
         seq->lss_type = type;
         seq->lss_site = ms;
         range_init(&seq->lss_space);
+
+        range_init(&seq->lss_lowater_set);
+        range_init(&seq->lss_hiwater_set);
+        seq->lss_set_width = LUSTRE_SEQ_BATCH_WIDTH;
+
         cfs_sema_init(&seq->lss_sem, 1);
 
         seq->lss_width = is_srv ?
@@ -565,22 +523,16 @@ int seq_server_init(struct lu_server_seq *seq,
                        "on store. Initialize space\n",
                        seq->lss_name);
 
-                th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
-                if (IS_ERR(th))
-                        RETURN(PTR_ERR(th));
-
-                /* Save default controller value to store. */
-                rc = seq_store_write(seq, env, th);
+                rc = seq_store_update(env, seq, NULL, 0);
                 if (rc) {
                         CERROR("%s: Can't write space data, "
                                "rc %d\n", seq->lss_name, rc);
                 }
-                seq_store_trans_stop(seq, env, th);
         } else if (rc) {
-               CERROR("%s: Can't read space data, rc %d\n",
-                      seq->lss_name, rc);
-               GOTO(out, rc);
-       }
+                CERROR("%s: Can't read space data, rc %d\n",
+                       seq->lss_name, rc);
+                GOTO(out, rc);
+        }
 
         if (is_srv) {
                 LASSERT(range_is_sane(&seq->lss_space));
@@ -591,13 +543,13 @@ int seq_server_init(struct lu_server_seq *seq,
 
         rc  = seq_server_proc_init(seq);
         if (rc)
-               GOTO(out, rc);
+                GOTO(out, rc);
 
-       EXIT;
+        EXIT;
 out:
-       if (rc)
-               seq_server_fini(seq, env);
-       return rc;
+        if (rc)
+                seq_server_fini(seq, env);
+        return rc;
 }
 EXPORT_SYMBOL(seq_server_init);
 
index 03c5227..428044c 100644 (file)
@@ -75,19 +75,11 @@ int seq_store_init(struct lu_server_seq *seq,
 void seq_store_fini(struct lu_server_seq *seq,
                     const struct lu_env *env);
 
-int seq_store_write(struct lu_server_seq *seq,
-                    const struct lu_env *env,
-                    struct thandle *th);
-
 int seq_store_read(struct lu_server_seq *seq,
                    const struct lu_env *env);
 
-struct thandle * seq_store_trans_start(struct lu_server_seq *seq,
-                                       const struct lu_env *env,
-                                       int credits);
-void seq_store_trans_stop(struct lu_server_seq *seq,
-                          const struct lu_env *env,
-                          struct thandle *th);
+int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq,
+                     struct lu_seq_range *out, int sync);
 
 #ifdef LPROCFS
 extern struct lprocfs_vars seq_server_proc_list[];
index 62d7349..7b5d545 100644 (file)
 #include <lustre_mdc.h>
 #include "fid_internal.h"
 
-static int seq_client_rpc(struct lu_client_seq *seq, struct lu_seq_range *input,
+static int seq_client_rpc(struct lu_client_seq *seq,
                           struct lu_seq_range *output, __u32 opc,
                           const char *opcname)
 {
         struct obd_export     *exp = seq->lcs_exp;
         struct ptlrpc_request *req;
-        struct lu_seq_range       *out, *in;
+        struct lu_seq_range   *out, *in;
         __u32                 *op;
         int                    rc;
         ENTRY;
@@ -85,10 +85,7 @@ static int seq_client_rpc(struct lu_client_seq *seq, struct lu_seq_range *input,
 
         /* Zero out input range, this is not recovery yet. */
         in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE);
-        if (input != NULL)
-                *in = *input;
-        else
-                range_init(in);
+        range_init(in);
 
         ptlrpc_request_set_replen(req);
 
@@ -126,7 +123,6 @@ static int seq_client_rpc(struct lu_client_seq *seq, struct lu_seq_range *input,
                        DRANGE"]\n", seq->lcs_name, PRANGE(output));
                 GOTO(out_req, rc = -EINVAL);
         }
-        *in = *out;
 
         CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n",
                seq->lcs_name, opcname, PRANGE(output));
@@ -138,9 +134,8 @@ out_req:
 }
 
 /* Request sequence-controller node to allocate new super-sequence. */
-int seq_client_replay_super(struct lu_client_seq *seq,
-                            struct lu_seq_range *range,
-                            const struct lu_env *env)
+int seq_client_alloc_super(struct lu_client_seq *seq,
+                           const struct lu_env *env)
 {
         int rc;
         ENTRY;
@@ -150,11 +145,11 @@ int seq_client_replay_super(struct lu_client_seq *seq,
 #ifdef __KERNEL__
         if (seq->lcs_srv) {
                 LASSERT(env != NULL);
-                rc = seq_server_alloc_super(seq->lcs_srv, range,
-                                            &seq->lcs_space, env);
+                rc = seq_server_alloc_super(seq->lcs_srv, &seq->lcs_space,
+                                            env);
         } else {
 #endif
-                rc = seq_client_rpc(seq, range, &seq->lcs_space,
+                rc = seq_client_rpc(seq, &seq->lcs_space,
                                     SEQ_ALLOC_SUPER, "super");
 #ifdef __KERNEL__
         }
@@ -163,14 +158,6 @@ int seq_client_replay_super(struct lu_client_seq *seq,
         RETURN(rc);
 }
 
-/* Request sequence-controller node to allocate new super-sequence. */
-int seq_client_alloc_super(struct lu_client_seq *seq,
-                           const struct lu_env *env)
-{
-        ENTRY;
-        RETURN(seq_client_replay_super(seq, NULL, env));
-}
-
 /* Request sequence-controller node to allocate new meta-sequence. */
 static int seq_client_alloc_meta(struct lu_client_seq *seq,
                                  const struct lu_env *env)
@@ -181,11 +168,10 @@ static int seq_client_alloc_meta(struct lu_client_seq *seq,
 #ifdef __KERNEL__
         if (seq->lcs_srv) {
                 LASSERT(env != NULL);
-                rc = seq_server_alloc_meta(seq->lcs_srv, NULL,
-                                           &seq->lcs_space, env);
+                rc = seq_server_alloc_meta(seq->lcs_srv, &seq->lcs_space, env);
         } else {
 #endif
-                rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
+                rc = seq_client_rpc(seq, &seq->lcs_space,
                                     SEQ_ALLOC_META, "meta");
 #ifdef __KERNEL__
         }
index aea967b..feb34ef 100644 (file)
@@ -73,8 +73,9 @@ static struct lu_buf *seq_store_buf(struct seq_thread_info *info)
         return buf;
 }
 
-struct thandle * seq_store_trans_start(struct lu_server_seq *seq,
-                                       const struct lu_env *env, int credit)
+struct thandle *seq_store_trans_start(struct lu_server_seq *seq,
+                                      const struct lu_env *env, int credit,
+                                      int sync)
 {
         struct seq_thread_info *info;
         struct dt_device *dt_dev;
@@ -86,6 +87,8 @@ struct thandle * seq_store_trans_start(struct lu_server_seq *seq,
         LASSERT(info != NULL);
 
         txn_param_init(&info->sti_txn, credit);
+        if (sync)
+                txn_param_sync(&info->sti_txn);
 
         th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &info->sti_txn);
         return th;
@@ -137,6 +140,37 @@ int seq_store_write(struct lu_server_seq *seq,
         RETURN(rc);
 }
 
+int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq,
+                     struct lu_seq_range *out, int sync)
+{
+        struct thandle *th;
+        int rc;
+        int credits = SEQ_TXN_STORE_CREDITS;
+
+        if (out != NULL)
+                credits += FLD_TXN_INDEX_INSERT_CREDITS;
+
+        th = seq_store_trans_start(seq, env, credits, sync);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
+
+        rc = seq_store_write(seq, env, th);
+        if (rc) {
+                CERROR("%s: Can't write space data, rc %d\n",
+                       seq->lss_name, rc);
+        } else if (out != NULL) {
+                rc = fld_server_create(seq->lss_site->ms_server_fld,
+                                       env, out, th);
+                if (rc) {
+                        CERROR("%s: Can't Update fld database, rc %d\n",
+                               seq->lss_name, rc);
+                }
+        }
+
+        seq_store_trans_stop(seq, env, th);
+        return rc;
+}
+
 /*
  * This function implies that caller takes care about locking or locking is not
  * needed (init time).
index f331ec5..3a1581f 100644 (file)
@@ -216,18 +216,6 @@ static inline int range_within(const struct lu_seq_range *range,
         return s >= range->lsr_start && s < range->lsr_end;
 }
 
-/**
- * allocate \a w units of sequence from range \a from.
- */
-static inline void range_alloc(struct lu_seq_range *to,
-                               struct lu_seq_range *from,
-                               __u64 width)
-{
-        to->lsr_start = from->lsr_start;
-        to->lsr_end = from->lsr_start + width;
-        from->lsr_start += width;
-}
-
 static inline int range_is_sane(const struct lu_seq_range *range)
 {
         return (range->lsr_end >= range->lsr_start);
index 6f60da5..d00b9af 100644 (file)
@@ -79,6 +79,11 @@ enum {
         /* changed to 16 to avoid overflow in test11 */
         LUSTRE_SEQ_META_WIDTH = 0x0000000000000010ULL,
 
+         /*
+          * seq allocation pool size.
+          */
+        LUSTRE_SEQ_BATCH_WIDTH = LUSTRE_SEQ_META_WIDTH * 1000,
+
         /*
          * This is how many sequences may be in one super-sequence allocated to
          * MDTs.
@@ -183,6 +188,10 @@ struct lu_server_seq {
         /* Available sequences space */
         struct lu_seq_range         lss_space;
 
+        /* keeps highwater in lsr_end for seq allocation algorithm */
+        struct lu_seq_range         lss_lowater_set;
+        struct lu_seq_range         lss_hiwater_set;
+
         /*
          * Device for server side seq manager needs (saving sequences to backing
          * store).
@@ -216,6 +225,14 @@ struct lu_server_seq {
          */
         __u64                   lss_width;
 
+        /*
+         * minimum lss_alloc_set size that should be allocated from
+         * lss_space
+         */
+        __u64                   lss_set_width;
+
+        /* transaction no of seq update write operation */
+        __u64                   lss_set_transno;
         /**
          * Pointer to site object, required to access site fld.
          */
@@ -236,12 +253,10 @@ void seq_server_fini(struct lu_server_seq *seq,
                      const struct lu_env *env);
 
 int seq_server_alloc_super(struct lu_server_seq *seq,
-                           struct lu_seq_range *in,
                            struct lu_seq_range *out,
                            const struct lu_env *env);
 
 int seq_server_alloc_meta(struct lu_server_seq *seq,
-                          struct lu_seq_range *in,
                           struct lu_seq_range *out,
                           const struct lu_env *env);
 
index 591e098..fc51ca0 100644 (file)
@@ -407,7 +407,8 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LLOG_ORIGIN_HANDLE_WRITE_REC_NET   0x1307
 #define OBD_FAIL_LLOG_ORIGIN_HANDLE_CLOSE_NET       0x1308
 #define OBD_FAIL_LLOG_CATINFO_NET                   0x1309
-#define OBD_FAIL_MDS_SYNC_CAPA_SL                    0x1310
+#define OBD_FAIL_MDS_SYNC_CAPA_SL                   0x1310
+#define OBD_FAIL_SEQ_ALLOC                          0x1311
 
 /* Failure injection control */
 #define OBD_FAIL_MASK_SYS    0x0000FF00
index 8f4f94c..064031c 100644 (file)
@@ -70,7 +70,7 @@ int lmv_fld_lookup(struct lmv_obd *lmv,
         LASSERT(fid_is_sane(fid));
         rc = fld_client_lookup(&lmv->lmv_fld, fid_seq(fid), mds, NULL);
         if (rc) {
-                CERROR("Error while looking for mds number. Seq "LPU64
+                CERROR("Error while looking for mds number. Seq "LPX64
                        ", err = %d\n", fid_seq(fid), rc);
                 RETURN(rc);
         }
index bce519f..6c63afd 100644 (file)
@@ -3548,25 +3548,6 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
         RETURN(rc);
 }
 
-/*
- * Seq wrappers
- */
-static void mdt_seq_adjust(const struct lu_env *env,
-                          struct mdt_device *m, int lost)
-{
-        struct md_site *ms = mdt_md_site(m);
-        struct lu_seq_range out;
-        ENTRY;
-
-        LASSERT(ms && ms->ms_server_seq);
-        LASSERT(lost >= 0);
-        /* get extra seq from seq_server, moving it's range up */
-        while (lost-- > 0) {
-                seq_server_alloc_meta(ms->ms_server_seq, NULL, &out, env);
-        }
-        EXIT;
-}
-
 static int mdt_seq_fini(const struct lu_env *env,
                         struct mdt_device *m)
 {
@@ -5543,12 +5524,8 @@ int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
 #ifdef HAVE_QUOTA_SUPPORT
         struct md_device *next = mdt->mdt_child;
 #endif
-        int rc, lost;
+        int rc;
         ENTRY;
-        /* if some clients didn't participate in recovery then we can possibly
-         * lost sequence. Now we should increase sequence for safe value */
-        lost = obd->obd_max_recoverable_clients - obd->obd_connected_clients;
-        mdt_seq_adjust(env, mdt, lost);
 
         rc = ld->ld_ops->ldo_recovery_complete(env, ld);
 #ifdef HAVE_QUOTA_SUPPORT
index b9310f1..8ca4206 100755 (executable)
@@ -718,30 +718,30 @@ test_32() {
 }
 run_test 32 "close() notices client eviction; close() after client eviction"
 
-# Abort recovery before client complete
-test_33a() {   # was test_33
-    replay_barrier $SINGLEMDS
-    createmany -o $DIR/$tfile-%d 100
+test_33a() {
+    createmany -o $DIR/$tfile-%d 10
+    replay_barrier_nosync $SINGLEMDS
     fail_abort $SINGLEMDS
-    # this file should be gone, because the replay was aborted
-    $CHECKSTAT -t file $DIR/$tfile-* && return 3
-    unlinkmany $DIR/$tfile-%d 0 100
+    # recreate shouldn't fail
+    createmany -o $DIR/$tfile--%d 10 || return 1
+    rm $DIR/$tfile-* -f
     return 0
 }
-run_test 33a "abort recovery before client does replay"
+run_test 33a "fid seq shouldn't be reused after abort recovery"
+
+test_33b() {
+    #define OBD_FAIL_SEQ_ALLOC                          0x1311
+    do_facet $SINGLEMDS "lctl set_param fail_loc=0x1311"
 
-# Stale FID sequence bug 15962
-test_33b() {   # was test_33a
-    replay_barrier $SINGLEMDS
     createmany -o $DIR/$tfile-%d 10
+    replay_barrier_nosync $SINGLEMDS
     fail_abort $SINGLEMDS
-    unlinkmany $DIR/$tfile-%d 0 10
     # recreate shouldn't fail
-    createmany -o $DIR/$tfile-%d 10 || return 3
-    unlinkmany $DIR/$tfile-%d 0 10
+    createmany -o $DIR/$tfile--%d 10 || return 1
+    rm $DIR/$tfile-* -f
     return 0
 }
-run_test 33b "fid shouldn't be reused after abort recovery"
+run_test 33b "test fid seq allocation"
 
 test_34() {
     multiop_bg_pause $DIR/$tfile O_c || return 2