Whamcloud - gitweb
- recovery support in seq-mgr.
[fs/lustre-release.git] / lustre / fid / fid_handler.c
index 582ecde..d2a5e35 100644 (file)
@@ -131,7 +131,8 @@ EXPORT_SYMBOL(seq_server_set_cli);
 /* on controller node, allocate new super sequence for regular sequence
  * server. */
 static int __seq_server_alloc_super(struct lu_server_seq *seq,
-                                    struct lu_range *range,
+                                    struct lu_range *in,
+                                    struct lu_range *out,
                                     const struct lu_context *ctx)
 {
         struct lu_range *space = &seq->lss_space;
@@ -140,53 +141,59 @@ static int __seq_server_alloc_super(struct lu_server_seq *seq,
 
         LASSERT(range_is_sane(space));
 
-        if (range_space(space) < seq->lss_super_width) {
-                CWARN("sequences space is going to exhaust soon. "
-                      "Can allocate only "LPU64" sequences\n",
-                      range_space(space));
-                *range = *space;
-                space->lr_start = space->lr_end;
-                rc = 0;
-        } else if (range_is_exhausted(space)) {
-                CERROR("sequences space is exhausted\n");
-                rc = -ENOSPC;
-        } else {
-                range_alloc(range, space, seq->lss_super_width);
-                rc = 0;
-        }
+        if (in != NULL) {
+                CDEBUG(D_INFO, "%s: recovery - use input range "
+                       DRANGE"\n", seq->lss_name, PRANGE(in));
 
-        if (rc == 0) {
-                rc = seq_store_write(seq, ctx);
-                if (rc) {
-                        CERROR("can't save state, rc = %d\n",
-                               rc);
+                if (in->lr_start > space->lr_start)
+                        space->lr_start = in->lr_start;
+                *out = *in;
+        } else {
+                if (range_space(space) < seq->lss_super_width) {
+                        CWARN("sequences space is going to exhaust soon. "
+                              "Can allocate only "LPU64" sequences\n",
+                              range_space(space));
+                        *out = *space;
+                        space->lr_start = space->lr_end;
+                } else if (range_is_exhausted(space)) {
+                        CERROR("sequences space is exhausted\n");
+                        RETURN(-ENOSPC);
+                } else {
+                        range_alloc(out, space, seq->lss_super_width);
                 }
         }
 
-        if (rc == 0) {
-                CDEBUG(D_INFO, "%s: allocated super-sequence "
-                       DRANGE"\n", seq->lss_name, PRANGE(range));
+        rc = seq_store_write(seq, ctx);
+        if (rc) {
+                CERROR("can't save state, rc = %d\n",
+                       rc);
+                RETURN(rc);
         }
 
+        CDEBUG(D_INFO, "%s: allocated super-sequence "
+               DRANGE"\n", seq->lss_name, PRANGE(out));
+
         RETURN(rc);
 }
 
 int seq_server_alloc_super(struct lu_server_seq *seq,
-                           struct lu_range *range,
+                           struct lu_range *in,
+                           struct lu_range *out,
                            const struct lu_context *ctx)
 {
         int rc;
         ENTRY;
 
         down(&seq->lss_sem);
-        rc = __seq_server_alloc_super(seq, range, ctx);
+        rc = __seq_server_alloc_super(seq, in, out, ctx);
         up(&seq->lss_sem);
 
         RETURN(rc);
 }
 
 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
-                                   struct lu_range *range,
+                                   struct lu_range *in,
+                                   struct lu_range *out,
                                    const struct lu_context *ctx)
 {
         struct lu_range *super = &seq->lss_super;
@@ -195,26 +202,60 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
 
         LASSERT(range_is_sane(super));
 
-        /* XXX: here we should avoid cascading RPCs using kind of async
-         * preallocation when meta-sequence is close to exhausting. */
-        if (range_is_exhausted(super)) {
-                if (!seq->lss_cli) {
-                        CERROR("no seq-controller client is setup\n");
-                        RETURN(-EOPNOTSUPP);
+        /* 
+         * This is recovery case. Adjust super range if input range looks like
+         * it is allocated from new super.
+         */
+        if (in != NULL) {
+                CDEBUG(D_INFO, "%s: recovery - use input range "
+                       DRANGE"\n", seq->lss_name, PRANGE(in));
+
+                if (range_is_exhausted(super)) {
+                        LASSERT(in->lr_start > super->lr_start);
+
+                        /* 
+                         * Server cannot send to client empty range, this is why
+                         * we check here that range from client is "newer" than
+                         * exhausted super.
+                         */
+                        super->lr_start = in->lr_start;
+                        
+                        super->lr_end = super->lr_start +
+                                LUSTRE_SEQ_SUPER_WIDTH;
+                } else {
+                        /* 
+                         * Update super start by start from client's range. End
+                         * should not be changed if range was not exhausted.
+                         */
+                        if (in->lr_start > super->lr_start)
+                                super->lr_start = in->lr_start;
                 }
 
-                rc = seq_client_alloc_super(seq->lss_cli);
-                if (rc) {
-                        CERROR("can't allocate new super-sequence, "
-                               "rc %d\n", rc);
-                        RETURN(rc);
+                *out = *in;
+        } else {
+                /*
+                 * XXX: avoid cascading RPCs using kind of async preallocation
+                 * when meta-sequence is close to exhausting.
+                 */
+                if (range_is_exhausted(super)) {
+                        if (!seq->lss_cli) {
+                                CERROR("no seq-controller client is setup\n");
+                                RETURN(-EOPNOTSUPP);
+                        }
+
+                        rc = seq_client_alloc_super(seq->lss_cli);
+                        if (rc) {
+                                CERROR("can't allocate new super-sequence, "
+                                       "rc %d\n", rc);
+                                RETURN(rc);
+                        }
+
+                        /* saving new range into allocation space. */
+                        *super = seq->lss_cli->lcs_range;
+                        LASSERT(range_is_sane(super));
                 }
-
-                /* saving new range into allocation space. */
-                *super = seq->lss_cli->lcs_range;
-                LASSERT(range_is_sane(super));
+                range_alloc(out, super, seq->lss_meta_width);
         }
-        range_alloc(range, super, seq->lss_meta_width);
 
         rc = seq_store_write(seq, ctx);
         if (rc) {
@@ -224,21 +265,22 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
 
         if (rc == 0) {
                 CDEBUG(D_INFO, "%s: allocated meta-sequence "
-                       DRANGE"\n", seq->lss_name, PRANGE(range));
+                       DRANGE"\n", seq->lss_name, PRANGE(out));
         }
 
         RETURN(rc);
 }
 
 int seq_server_alloc_meta(struct lu_server_seq *seq,
-                          struct lu_range *range,
+                          struct lu_range *in,
+                          struct lu_range *out,
                           const struct lu_context *ctx)
 {
         int rc;
         ENTRY;
 
         down(&seq->lss_sem);
-        rc = __seq_server_alloc_meta(seq, range, ctx);
+        rc = __seq_server_alloc_meta(seq, in, out, ctx);
         up(&seq->lss_sem);
 
         RETURN(rc);
@@ -246,7 +288,8 @@ int seq_server_alloc_meta(struct lu_server_seq *seq,
 
 static int seq_server_handle(struct lu_site *site,
                              const struct lu_context *ctx,
-                             __u32 opc, struct lu_range *out)
+                             __u32 opc, struct lu_range *in,
+                             struct lu_range *out)
 {
         int rc;
         ENTRY;
@@ -259,7 +302,7 @@ static int seq_server_handle(struct lu_site *site,
                         RETURN(-EINVAL);
                 }
                 rc = seq_server_alloc_meta(site->ls_server_seq,
-                                           out, ctx);
+                                           in, out, ctx);
                 break;
         case SEQ_ALLOC_SUPER:
                 if (!site->ls_control_seq) {
@@ -268,7 +311,7 @@ static int seq_server_handle(struct lu_site *site,
                         RETURN(-EINVAL);
                 }
                 rc = seq_server_alloc_super(site->ls_control_seq,
-                                            out, ctx);
+                                            in, out, ctx);
                 break;
         default:
                 rc = -EINVAL;
@@ -281,8 +324,8 @@ static int seq_server_handle(struct lu_site *site,
 static int seq_req_handle(struct ptlrpc_request *req,
                           struct seq_thread_info *info)
 {
+        struct lu_range *out, *in = NULL;
         struct lu_site *site;
-        struct lu_range *out;
         int rc = -EPROTO;
         __u32 *opc;
         ENTRY;
@@ -304,8 +347,15 @@ static int seq_req_handle(struct ptlrpc_request *req,
                 if (out == NULL)
                         RETURN(-EPROTO);
 
+                if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                        in = req_capsule_server_get(&info->sti_pill,
+                                                    &RMF_SEQ_RANGE);
+
+                        LASSERT(!range_is_zero(in) && range_is_sane(in));
+                }
+        
                 ctx = req->rq_svc_thread->t_ctx;
-                rc = seq_server_handle(site, ctx, *opc, out);
+                rc = seq_server_handle(site, ctx, *opc, in, out);
         }
 
         RETURN(rc);