Whamcloud - gitweb
- recovery support in seq-mgr.
authoryury <yury>
Thu, 7 Sep 2006 10:58:55 +0000 (10:58 +0000)
committeryury <yury>
Thu, 7 Sep 2006 10:58:55 +0000 (10:58 +0000)
lustre/fid/fid_handler.c
lustre/fid/fid_internal.h
lustre/fid/fid_request.c
lustre/fld/fld_request.c
lustre/include/lustre_fid.h
lustre/include/lustre_net.h
lustre/ptlrpc/layout.c

index 582ecde..d2a5e35 100644 (file)
@@ -131,7 +131,8 @@ EXPORT_SYMBOL(seq_server_set_cli);
 /* on controller node, allocate new super sequence for regular sequence
  * server. */
 static int __seq_server_alloc_super(struct lu_server_seq *seq,
-                                    struct lu_range *range,
+                                    struct lu_range *in,
+                                    struct lu_range *out,
                                     const struct lu_context *ctx)
 {
         struct lu_range *space = &seq->lss_space;
@@ -140,53 +141,59 @@ static int __seq_server_alloc_super(struct lu_server_seq *seq,
 
         LASSERT(range_is_sane(space));
 
-        if (range_space(space) < seq->lss_super_width) {
-                CWARN("sequences space is going to exhaust soon. "
-                      "Can allocate only "LPU64" sequences\n",
-                      range_space(space));
-                *range = *space;
-                space->lr_start = space->lr_end;
-                rc = 0;
-        } else if (range_is_exhausted(space)) {
-                CERROR("sequences space is exhausted\n");
-                rc = -ENOSPC;
-        } else {
-                range_alloc(range, space, seq->lss_super_width);
-                rc = 0;
-        }
+        if (in != NULL) {
+                CDEBUG(D_INFO, "%s: recovery - use input range "
+                       DRANGE"\n", seq->lss_name, PRANGE(in));
 
-        if (rc == 0) {
-                rc = seq_store_write(seq, ctx);
-                if (rc) {
-                        CERROR("can't save state, rc = %d\n",
-                               rc);
+                if (in->lr_start > space->lr_start)
+                        space->lr_start = in->lr_start;
+                *out = *in;
+        } else {
+                if (range_space(space) < seq->lss_super_width) {
+                        CWARN("sequences space is going to exhaust soon. "
+                              "Can allocate only "LPU64" sequences\n",
+                              range_space(space));
+                        *out = *space;
+                        space->lr_start = space->lr_end;
+                } else if (range_is_exhausted(space)) {
+                        CERROR("sequences space is exhausted\n");
+                        RETURN(-ENOSPC);
+                } else {
+                        range_alloc(out, space, seq->lss_super_width);
                 }
         }
 
-        if (rc == 0) {
-                CDEBUG(D_INFO, "%s: allocated super-sequence "
-                       DRANGE"\n", seq->lss_name, PRANGE(range));
+        rc = seq_store_write(seq, ctx);
+        if (rc) {
+                CERROR("can't save state, rc = %d\n",
+                       rc);
+                RETURN(rc);
         }
 
+        CDEBUG(D_INFO, "%s: allocated super-sequence "
+               DRANGE"\n", seq->lss_name, PRANGE(out));
+
         RETURN(rc);
 }
 
 int seq_server_alloc_super(struct lu_server_seq *seq,
-                           struct lu_range *range,
+                           struct lu_range *in,
+                           struct lu_range *out,
                            const struct lu_context *ctx)
 {
         int rc;
         ENTRY;
 
         down(&seq->lss_sem);
-        rc = __seq_server_alloc_super(seq, range, ctx);
+        rc = __seq_server_alloc_super(seq, in, out, ctx);
         up(&seq->lss_sem);
 
         RETURN(rc);
 }
 
 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
-                                   struct lu_range *range,
+                                   struct lu_range *in,
+                                   struct lu_range *out,
                                    const struct lu_context *ctx)
 {
         struct lu_range *super = &seq->lss_super;
@@ -195,26 +202,60 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
 
         LASSERT(range_is_sane(super));
 
-        /* XXX: here we should avoid cascading RPCs using kind of async
-         * preallocation when meta-sequence is close to exhausting. */
-        if (range_is_exhausted(super)) {
-                if (!seq->lss_cli) {
-                        CERROR("no seq-controller client is setup\n");
-                        RETURN(-EOPNOTSUPP);
+        /* 
+         * This is recovery case. Adjust super range if input range looks like
+         * it is allocated from new super.
+         */
+        if (in != NULL) {
+                CDEBUG(D_INFO, "%s: recovery - use input range "
+                       DRANGE"\n", seq->lss_name, PRANGE(in));
+
+                if (range_is_exhausted(super)) {
+                        LASSERT(in->lr_start > super->lr_start);
+
+                        /* 
+                         * Server cannot send to client empty range, this is why
+                         * we check here that range from client is "newer" than
+                         * exhausted super.
+                         */
+                        super->lr_start = in->lr_start;
+                        
+                        super->lr_end = super->lr_start +
+                                LUSTRE_SEQ_SUPER_WIDTH;
+                } else {
+                        /* 
+                         * Update super start by start from client's range. End
+                         * should not be changed if range was not exhausted.
+                         */
+                        if (in->lr_start > super->lr_start)
+                                super->lr_start = in->lr_start;
                 }
 
-                rc = seq_client_alloc_super(seq->lss_cli);
-                if (rc) {
-                        CERROR("can't allocate new super-sequence, "
-                               "rc %d\n", rc);
-                        RETURN(rc);
+                *out = *in;
+        } else {
+                /*
+                 * XXX: avoid cascading RPCs using kind of async preallocation
+                 * when meta-sequence is close to exhausting.
+                 */
+                if (range_is_exhausted(super)) {
+                        if (!seq->lss_cli) {
+                                CERROR("no seq-controller client is setup\n");
+                                RETURN(-EOPNOTSUPP);
+                        }
+
+                        rc = seq_client_alloc_super(seq->lss_cli);
+                        if (rc) {
+                                CERROR("can't allocate new super-sequence, "
+                                       "rc %d\n", rc);
+                                RETURN(rc);
+                        }
+
+                        /* saving new range into allocation space. */
+                        *super = seq->lss_cli->lcs_range;
+                        LASSERT(range_is_sane(super));
                 }
-
-                /* saving new range into allocation space. */
-                *super = seq->lss_cli->lcs_range;
-                LASSERT(range_is_sane(super));
+                range_alloc(out, super, seq->lss_meta_width);
         }
-        range_alloc(range, super, seq->lss_meta_width);
 
         rc = seq_store_write(seq, ctx);
         if (rc) {
@@ -224,21 +265,22 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
 
         if (rc == 0) {
                 CDEBUG(D_INFO, "%s: allocated meta-sequence "
-                       DRANGE"\n", seq->lss_name, PRANGE(range));
+                       DRANGE"\n", seq->lss_name, PRANGE(out));
         }
 
         RETURN(rc);
 }
 
 int seq_server_alloc_meta(struct lu_server_seq *seq,
-                          struct lu_range *range,
+                          struct lu_range *in,
+                          struct lu_range *out,
                           const struct lu_context *ctx)
 {
         int rc;
         ENTRY;
 
         down(&seq->lss_sem);
-        rc = __seq_server_alloc_meta(seq, range, ctx);
+        rc = __seq_server_alloc_meta(seq, in, out, ctx);
         up(&seq->lss_sem);
 
         RETURN(rc);
@@ -246,7 +288,8 @@ int seq_server_alloc_meta(struct lu_server_seq *seq,
 
 static int seq_server_handle(struct lu_site *site,
                              const struct lu_context *ctx,
-                             __u32 opc, struct lu_range *out)
+                             __u32 opc, struct lu_range *in,
+                             struct lu_range *out)
 {
         int rc;
         ENTRY;
@@ -259,7 +302,7 @@ static int seq_server_handle(struct lu_site *site,
                         RETURN(-EINVAL);
                 }
                 rc = seq_server_alloc_meta(site->ls_server_seq,
-                                           out, ctx);
+                                           in, out, ctx);
                 break;
         case SEQ_ALLOC_SUPER:
                 if (!site->ls_control_seq) {
@@ -268,7 +311,7 @@ static int seq_server_handle(struct lu_site *site,
                         RETURN(-EINVAL);
                 }
                 rc = seq_server_alloc_super(site->ls_control_seq,
-                                            out, ctx);
+                                            in, out, ctx);
                 break;
         default:
                 rc = -EINVAL;
@@ -281,8 +324,8 @@ static int seq_server_handle(struct lu_site *site,
 static int seq_req_handle(struct ptlrpc_request *req,
                           struct seq_thread_info *info)
 {
+        struct lu_range *out, *in = NULL;
         struct lu_site *site;
-        struct lu_range *out;
         int rc = -EPROTO;
         __u32 *opc;
         ENTRY;
@@ -304,8 +347,15 @@ static int seq_req_handle(struct ptlrpc_request *req,
                 if (out == NULL)
                         RETURN(-EPROTO);
 
+                if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                        in = req_capsule_server_get(&info->sti_pill,
+                                                    &RMF_SEQ_RANGE);
+
+                        LASSERT(!range_is_zero(in) && range_is_sane(in));
+                }
+        
                 ctx = req->rq_svc_thread->t_ctx;
-                rc = seq_server_handle(site, ctx, *opc, out);
+                rc = seq_server_handle(site, ctx, *opc, in, out);
         }
 
         RETURN(rc);
index 79ebea6..3152dec 100644 (file)
@@ -44,7 +44,7 @@ struct seq_thread_info {
         struct txn_param        sti_txn;
         struct req_capsule      sti_pill;
         struct seq_store_record sti_record;
-        int                     sti_rep_buf_size[3];
+        int                     sti_rep_buf_size[4];
 };
 
 extern struct lu_context_key seq_thread_key;
index 8ce5f3c..1b509d6 100644 (file)
@@ -51,29 +51,34 @@ static int seq_client_rpc(struct lu_client_seq *seq,
                           struct lu_range *range,
                           __u32 opc, const char *opcname)
 {
-        int rc, size[2] = { sizeof(struct ptlrpc_body),
-                            sizeof(__u32) };
+        int rc, size[3] = { sizeof(struct ptlrpc_body),
+                            sizeof(__u32),
+                            sizeof(struct lu_range) };
         struct obd_export *exp = seq->lcs_exp;
         struct ptlrpc_request *req;
+        struct lu_range *out, *in;
         struct req_capsule pill;
-        struct lu_range *ran;
         __u32 *op;
         ENTRY;
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp),
                              LUSTRE_MDS_VERSION,
-                              SEQ_QUERY, 2, size,
+                              SEQ_QUERY, 3, size,
                               NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
         req_capsule_init(&pill, req, RCL_CLIENT, NULL);
-
         req_capsule_set(&pill, &RQF_SEQ_QUERY);
 
+        /* init operation code */
         op = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
         *op = opc;
 
+        /* zero out input range, this is not recovery yet. */
+        in = req_capsule_client_get(&pill, &RMF_SEQ_RANGE);
+        range_zero(in);
+        
         size[1] = sizeof(struct lu_range);
         ptlrpc_req_set_repsize(req, 2, size);
 
@@ -89,10 +94,8 @@ static int seq_client_rpc(struct lu_client_seq *seq,
         if (rc)
                 GOTO(out_req, rc);
 
-        ran = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
-        if (ran == NULL)
-                GOTO(out_req, rc = -EPROTO);
-        *range = *ran;
+        out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
+        *range = *out;
 
         if (!range_is_sane(range)) {
                 CERROR("invalid seq range obtained from server: "
@@ -106,11 +109,12 @@ static int seq_client_rpc(struct lu_client_seq *seq,
                 GOTO(out_req, rc = -EINVAL);
         }
 
-        if (rc == 0) {
-                CDEBUG(D_INFO, "%s: allocated %s-sequence "
-                       DRANGE"]\n", seq->lcs_name, opcname,
-                       PRANGE(range));
-        }
+        /* Save server out to request for recovery case. */
+        *in = *out;
+                
+        CDEBUG(D_INFO, "%s: allocated %s-sequence "
+               DRANGE"]\n", seq->lcs_name, opcname,
+               PRANGE(range));
         
         EXIT;
 out_req:
@@ -126,7 +130,7 @@ static int __seq_client_alloc_super(struct lu_client_seq *seq)
         
 #ifdef __KERNEL__
         if (seq->lcs_srv) {
-                rc = seq_server_alloc_super(seq->lcs_srv,
+                rc = seq_server_alloc_super(seq->lcs_srv, NULL,
                                             &seq->lcs_range,
                                             seq->lcs_ctx);
         } else {
@@ -159,7 +163,7 @@ static int __seq_client_alloc_meta(struct lu_client_seq *seq)
 
 #ifdef __KERNEL__
         if (seq->lcs_srv) {
-                rc = seq_server_alloc_meta(seq->lcs_srv,
+                rc = seq_server_alloc_meta(seq->lcs_srv, NULL,
                                            &seq->lcs_range,
                                            seq->lcs_ctx);
         } else {
index 7c1440f..aea71fe 100644 (file)
@@ -374,13 +374,13 @@ static int fld_client_rpc(struct obd_export *exp,
         LASSERT(exp != NULL);
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp),
-                              LUSTRE_MDS_VERSION, FLD_QUERY,
-                              3, size, NULL);
+                              LUSTRE_MDS_VERSION,
+                              FLD_QUERY, 3, size,
+                              NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
         req_capsule_init(&pill, req, RCL_CLIENT, NULL);
-
         req_capsule_set(&pill, &RQF_FLD_QUERY);
 
         op = req_capsule_client_get(&pill, &RMF_FLD_OPC);
index 2b0046c..0f06f41 100644 (file)
@@ -149,11 +149,13 @@ void seq_server_fini(struct lu_server_seq *seq,
                      const struct lu_context *ctx);
 
 int seq_server_alloc_super(struct lu_server_seq *seq,
-                           struct lu_range *range,
+                           struct lu_range *in,
+                           struct lu_range *out,
                            const struct lu_context *ctx);
 
 int seq_server_alloc_meta(struct lu_server_seq *seq,
-                          struct lu_range *range,
+                          struct lu_range *in,
+                          struct lu_range *out,
                           const struct lu_context *ctx);
 
 int seq_server_set_cli(struct lu_server_seq *seq,
index 8fa6dbd..a2fd0df 100644 (file)
 /* FLD_MAXREPSIZE == lustre_msg + __u32 padding + ptlrpc_body + md_fld */
 #define FLD_MAXREPSIZE  (160)
 
-/* SEQ_MAXREQSIZE == lustre_msg + __u32 padding + opc + ptlrpc_body + __u32 padding */
-#define SEQ_MAXREQSIZE  (152)
+/* SEQ_MAXREQSIZE == lustre_msg + __u32 padding + ptlrpc_body + opc + lu_range +
+ * __u32 padding */
+#define SEQ_MAXREQSIZE  (168)
 
 /* SEQ_MAXREPSIZE == lustre_msg + __u32 padding + ptlrpc_body + lu_range */
 #define SEQ_MAXREPSIZE  (160)
index 5226cba..21554fa 100644 (file)
@@ -80,7 +80,8 @@ static const struct req_msg_field *mds_statfs_server[] = {
 
 static const struct req_msg_field *seq_query_client[] = {
         &RMF_PTLRPC_BODY,
-        &RMF_SEQ_OPC
+        &RMF_SEQ_OPC,
+        &RMF_SEQ_RANGE
 };
 
 static const struct req_msg_field *seq_query_server[] = {