Whamcloud - gitweb
- fixed replay of meta-seq alloc whereas super-seq replay is also needed if ctl and...
authoryury <yury>
Sat, 14 Oct 2006 12:49:32 +0000 (12:49 +0000)
committeryury <yury>
Sat, 14 Oct 2006 12:49:32 +0000 (12:49 +0000)
lustre/fid/fid_handler.c
lustre/fid/fid_request.c
lustre/include/lustre_fid.h
lustre/llite/namei.c
lustre/tests/cfg/lmv.sh

index ea44aed..ffb2f02 100644 (file)
@@ -196,6 +196,23 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
                          * before failure.
                          */
                         space->lr_end = in->lr_start + LUSTRE_SEQ_SUPER_WIDTH;
+
+                        if (!seq->lss_cli) {
+                                CERROR("%s: No sequence controller "
+                                       "is attached.\n", seq->lss_name);
+                                RETURN(-ENODEV);
+                        }
+
+                        /* 
+                         * Let controller know that this is recovery and last
+                         * obtained range from it was @space.
+                         */
+                        rc = seq_client_replay_super(seq->lss_cli, space, env);
+                        if (rc) {
+                                CERROR("%s: Can't replay super-sequence, "
+                                       "rc %d\n", seq->lss_name, rc);
+                                RETURN(rc);
+                        }
                 } else {
                         /*
                          * Update super start by end from client's range. Super
@@ -499,21 +516,21 @@ int seq_server_init(struct lu_server_seq *seq,
         /* Request backing store for saved sequence info. */
         rc = seq_store_read(seq, env);
         if (rc == -ENODATA) {
-                CDEBUG(D_INFO|D_WARNING, "%s: No data found "
-                       "on storage\n", seq->lss_name);
-
+                
                 /* Nothing is read, init by default value. */
                 seq->lss_space = is_srv ?
                         LUSTRE_SEQ_ZERO_RANGE:
                         LUSTRE_SEQ_SPACE_RANGE;
 
-                if (!is_srv) {
-                        /* Save default controller value to store. */
-                        rc = seq_store_write(seq, env);
-                        if (rc) {
-                                CERROR("%s: Can't write space data, "
-                                       "rc %d\n", seq->lss_name, rc);
-                        }
+                CDEBUG(D_INFO|D_WARNING, "%s: No data found "
+                       "on store. Initialize space\n",
+                       seq->lss_name);
+
+                /* Save default controller value to store. */
+                rc = seq_store_write(seq, env);
+                if (rc) {
+                        CERROR("%s: Can't write space data, "
+                               "rc %d\n", seq->lss_name, rc);
                 }
         } else if (rc) {
                CERROR("%s: Can't read space data, rc %d\n",
index 2e62ea5..180d8fe 100644 (file)
@@ -50,7 +50,8 @@
 #include "fid_internal.h"
 
 static int seq_client_rpc(struct lu_client_seq *seq,
-                          struct lu_range *space,
+                          struct lu_range *input,
+                          struct lu_range *output,
                           __u32 opc, const char *opcname)
 {
         int rc, size[3] = { sizeof(struct ptlrpc_body),
@@ -73,13 +74,16 @@ static int seq_client_rpc(struct lu_client_seq *seq,
         req_capsule_init(&pill, req, RCL_CLIENT, NULL);
         req_capsule_set(&pill, &RQF_SEQ_QUERY);
 
-        /* init operation code */
+        /* Init operation code */
         op = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
         *op = opc;
 
-        /* zero out input range, this is not recovery yet. */
+        /* Zero out input range, this is not recovery yet. */
         in = req_capsule_client_get(&pill, &RMF_SEQ_RANGE);
-        range_zero(in);
+        if (input != NULL)
+                *in = *input;
+        else
+                range_zero(in);
 
         size[1] = sizeof(struct lu_range);
         ptlrpc_req_set_repsize(req, 2, size);
@@ -100,25 +104,28 @@ static int seq_client_rpc(struct lu_client_seq *seq,
                 GOTO(out_req, rc);
 
         out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
-        *space = *out;
+        *output = *out;
 
-        if (!range_is_sane(space)) {
+        if (!range_is_sane(output)) {
                 CERROR("%s: Invalid range received from server: "
-                       DRANGE"\n", seq->lcs_name, PRANGE(space));
+                       DRANGE"\n", seq->lcs_name, PRANGE(output));
                 GOTO(out_req, rc = -EINVAL);
         }
 
-        if (range_is_exhausted(space)) {
+        if (range_is_exhausted(output)) {
                 CERROR("%s: Range received from server is exhausted: "
-                       DRANGE"]\n", seq->lcs_name, PRANGE(space));
+                       DRANGE"]\n", seq->lcs_name, PRANGE(output));
                 GOTO(out_req, rc = -EINVAL);
         }
 
-        /* Save server out to request for recovery case. */
+        /* 
+         * Save server response to request for recovery case, it will be sent to
+         * server later if needed.
+         */
         *in = *out;
 
         CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n",
-               seq->lcs_name, opcname, PRANGE(space));
+               seq->lcs_name, opcname, PRANGE(output));
 
         EXIT;
 out_req:
@@ -127,7 +134,7 @@ out_req:
         return rc;
 }
 
-/* request sequence-controller node to allocate new super-sequence. */
+/* Request sequence-controller node to allocate new super-sequence. */
 static int __seq_client_alloc_super(struct lu_client_seq *seq,
                                     const struct lu_env *env)
 {
@@ -137,11 +144,10 @@ static int __seq_client_alloc_super(struct lu_client_seq *seq,
         if (seq->lcs_srv) {
                 LASSERT(env != NULL);
                 rc = seq_server_alloc_super(seq->lcs_srv, NULL,
-                                            &seq->lcs_space,
-                                            env);
+                                            &seq->lcs_space, env);
         } else {
 #endif
-                rc = seq_client_rpc(seq, &seq->lcs_space,
+                rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
                                     SEQ_ALLOC_SUPER, "super");
 #ifdef __KERNEL__
         }
@@ -163,7 +169,50 @@ int seq_client_alloc_super(struct lu_client_seq *seq,
 }
 EXPORT_SYMBOL(seq_client_alloc_super);
 
-/* request sequence-controller node to allocate new meta-sequence. */
+/* Request sequence-controller node to allocate new super-sequence. */
+static int __seq_client_replay_super(struct lu_client_seq *seq,
+                                     struct lu_range *range,
+                                     const struct lu_env *env)
+{
+        int rc = 0;
+
+#ifdef __KERNEL__
+        if (seq->lcs_srv) {
+                LASSERT(env != NULL);
+                rc = seq_server_alloc_super(seq->lcs_srv, range,
+                                            &seq->lcs_space, env);
+        } else {
+#endif
+#if 0
+                /* 
+                 * XXX: Seems we do not need to replay in case of remote
+                 * controller. Lustre anyway supports onlu signle failure
+                 * recovery.
+                 */
+                rc = seq_client_rpc(seq, range, &seq->lcs_space,
+                                    SEQ_ALLOC_SUPER, "super");
+#endif
+#ifdef __KERNEL__
+        }
+#endif
+        return rc;
+}
+
+int seq_client_replay_super(struct lu_client_seq *seq,
+                            struct lu_range *range,
+                            const struct lu_env *env)
+{
+        int rc;
+        ENTRY;
+
+        down(&seq->lcs_sem);
+        rc = __seq_client_replay_super(seq, range, env);
+        up(&seq->lcs_sem);
+
+        RETURN(rc);
+}
+
+/* Request sequence-controller node to allocate new meta-sequence. */
 static int __seq_client_alloc_meta(struct lu_client_seq *seq,
                                    const struct lu_env *env)
 {
@@ -177,7 +226,7 @@ static int __seq_client_alloc_meta(struct lu_client_seq *seq,
                                            env);
         } else {
 #endif
-                rc = seq_client_rpc(seq, &seq->lcs_space,
+                rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
                                     SEQ_ALLOC_META, "meta");
 #ifdef __KERNEL__
         }
@@ -270,7 +319,6 @@ int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
                         GOTO(out, rc);
                 }
 
-                /* init new fid */
                 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
                 seq->lcs_fid.f_seq = seqnr;
                 seq->lcs_fid.f_ver = 0;
@@ -281,8 +329,8 @@ int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
                  */
                 rc = 1;
 
-                CDEBUG(D_INFO|D_WARNING, "%s: Switch to sequence ["LPX64"]\n",
-                       seq->lcs_name, seqnr);
+                CDEBUG(D_INFO|D_WARNING, "%s: Switch to sequence "
+                       "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
         } else {
                 seq->lcs_fid.f_oid++;
                 rc = 0;
@@ -291,7 +339,8 @@ int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
         *fid = seq->lcs_fid;
         LASSERT(fid_is_sane(fid));
 
-        CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid));
+        CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,
+               PFID(fid));
 
         EXIT;
 out:
index 62a6e21..9a70219 100644 (file)
@@ -189,6 +189,10 @@ void seq_client_fini(struct lu_client_seq *seq);
 int seq_client_alloc_super(struct lu_client_seq *seq,
                            const struct lu_env *env);
 
+int seq_client_replay_super(struct lu_client_seq *seq,
+                            struct lu_range *range,
+                            const struct lu_env *env);
+
 int seq_client_alloc_meta(struct lu_client_seq *seq,
                           const struct lu_env *env);
 
index f0ff372..a6757d7 100644 (file)
@@ -970,7 +970,7 @@ static int ll_mkdir_generic(struct inode *dir, struct qstr *name,
         /* Allocate new fid. */
         err = ll_fid_md_alloc(ll_i2sbi(dir), &op_data->fid2, &hint);
         if (err) {
-                CERROR("can't allocate new fid, rc %d\n", err);
+                CERROR("Can't allocate new fid, rc %d\n", err);
                 LBUG();
         }
 
index 7eaf4e0..f89c12f 100644 (file)
@@ -10,6 +10,7 @@ ostfailover_HOST=${ostfailover_HOST}
 mds1_HOST=${mds1_HOST:-$mds_HOST}
 mds2_HOST=$mds1_HOST
 mds3_HOST=$mds1_HOST
+mds4_HOST=$mds1_HOST
 ost2_HOST=${ost2_HOST:-$ost_HOST}
 gks_HOST=${gks_HOST:-$mds_HOST}
 
@@ -26,6 +27,7 @@ TMP=${TMP:-/tmp}
 
 MDSDEV=${MDSDEV:-$TMP/${FSNAME}-mdt1}
 MDSCOUNT=${MDSCOUNT:-3}
+test $MDSCOUNT -gt 4 && MDSCOUNT=4
 MDSDEVBASE=${MDSDEVBASE:-$TMP/${FSNAME}-mdt}
 MDSSIZE=${MDSSIZE:-100000}