/* on controller node, allocate new super sequence for regular sequence
* server. */
static int __seq_server_alloc_super(struct lu_server_seq *seq,
- struct lu_range *range,
+ struct lu_range *in,
+ struct lu_range *out,
const struct lu_context *ctx)
{
struct lu_range *space = &seq->lss_space;
LASSERT(range_is_sane(space));
- if (range_space(space) < seq->lss_super_width) {
- CWARN("sequences space is going to exhaust soon. "
- "Can allocate only "LPU64" sequences\n",
- range_space(space));
- *range = *space;
- space->lr_start = space->lr_end;
- rc = 0;
- } else if (range_is_exhausted(space)) {
- CERROR("sequences space is exhausted\n");
- rc = -ENOSPC;
- } else {
- range_alloc(range, space, seq->lss_super_width);
- rc = 0;
- }
+ if (in != NULL) {
+ CDEBUG(D_INFO, "%s: recovery - use input range "
+ DRANGE"\n", seq->lss_name, PRANGE(in));
- if (rc == 0) {
- rc = seq_store_write(seq, ctx);
- if (rc) {
- CERROR("can't save state, rc = %d\n",
- rc);
+ if (in->lr_start > space->lr_start)
+ space->lr_start = in->lr_start;
+ *out = *in;
+ } else {
+ if (range_space(space) < seq->lss_super_width) {
+ CWARN("sequences space is going to exhaust soon. "
+ "Can allocate only "LPU64" sequences\n",
+ range_space(space));
+ *out = *space;
+ space->lr_start = space->lr_end;
+ } else if (range_is_exhausted(space)) {
+ CERROR("sequences space is exhausted\n");
+ RETURN(-ENOSPC);
+ } else {
+ range_alloc(out, space, seq->lss_super_width);
}
}
- if (rc == 0) {
- CDEBUG(D_INFO, "%s: allocated super-sequence "
- DRANGE"\n", seq->lss_name, PRANGE(range));
+ rc = seq_store_write(seq, ctx);
+ if (rc) {
+ CERROR("can't save state, rc = %d\n",
+ rc);
+ RETURN(rc);
}
+ CDEBUG(D_INFO, "%s: allocated super-sequence "
+ DRANGE"\n", seq->lss_name, PRANGE(out));
+
RETURN(rc);
}
int seq_server_alloc_super(struct lu_server_seq *seq,
- struct lu_range *range,
+ struct lu_range *in,
+ struct lu_range *out,
const struct lu_context *ctx)
{
int rc;
ENTRY;
down(&seq->lss_sem);
- rc = __seq_server_alloc_super(seq, range, ctx);
+ rc = __seq_server_alloc_super(seq, in, out, ctx);
up(&seq->lss_sem);
RETURN(rc);
}
static int __seq_server_alloc_meta(struct lu_server_seq *seq,
- struct lu_range *range,
+ struct lu_range *in,
+ struct lu_range *out,
const struct lu_context *ctx)
{
struct lu_range *super = &seq->lss_super;
LASSERT(range_is_sane(super));
- /* XXX: here we should avoid cascading RPCs using kind of async
- * preallocation when meta-sequence is close to exhausting. */
- if (range_is_exhausted(super)) {
- if (!seq->lss_cli) {
- CERROR("no seq-controller client is setup\n");
- RETURN(-EOPNOTSUPP);
+ /*
+ * This is recovery case. Adjust super range if input range looks like
+ * it is allocated from new super.
+ */
+ if (in != NULL) {
+ CDEBUG(D_INFO, "%s: recovery - use input range "
+ DRANGE"\n", seq->lss_name, PRANGE(in));
+
+ if (range_is_exhausted(super)) {
+ LASSERT(in->lr_start > super->lr_start);
+
+ /*
+ * Server cannot send to client empty range, this is why
+ * we check here that range from client is "newer" than
+ * exhausted super.
+ */
+ super->lr_start = in->lr_start;
+
+ super->lr_end = super->lr_start +
+ LUSTRE_SEQ_SUPER_WIDTH;
+ } else {
+ /*
+ * Update super start by start from client's range. End
+ * should not be changed if range was not exhausted.
+ */
+ if (in->lr_start > super->lr_start)
+ super->lr_start = in->lr_start;
}
- rc = seq_client_alloc_super(seq->lss_cli);
- if (rc) {
- CERROR("can't allocate new super-sequence, "
- "rc %d\n", rc);
- RETURN(rc);
+ *out = *in;
+ } else {
+ /*
+ * XXX: avoid cascading RPCs using kind of async preallocation
+ * when meta-sequence is close to exhausting.
+ */
+ if (range_is_exhausted(super)) {
+ if (!seq->lss_cli) {
+ CERROR("no seq-controller client is setup\n");
+ RETURN(-EOPNOTSUPP);
+ }
+
+ rc = seq_client_alloc_super(seq->lss_cli);
+ if (rc) {
+ CERROR("can't allocate new super-sequence, "
+ "rc %d\n", rc);
+ RETURN(rc);
+ }
+
+ /* saving new range into allocation space. */
+ *super = seq->lss_cli->lcs_range;
+ LASSERT(range_is_sane(super));
}
-
- /* saving new range into allocation space. */
- *super = seq->lss_cli->lcs_range;
- LASSERT(range_is_sane(super));
+ range_alloc(out, super, seq->lss_meta_width);
}
- range_alloc(range, super, seq->lss_meta_width);
rc = seq_store_write(seq, ctx);
if (rc) {
if (rc == 0) {
CDEBUG(D_INFO, "%s: allocated meta-sequence "
- DRANGE"\n", seq->lss_name, PRANGE(range));
+ DRANGE"\n", seq->lss_name, PRANGE(out));
}
RETURN(rc);
}
int seq_server_alloc_meta(struct lu_server_seq *seq,
- struct lu_range *range,
+ struct lu_range *in,
+ struct lu_range *out,
const struct lu_context *ctx)
{
int rc;
ENTRY;
down(&seq->lss_sem);
- rc = __seq_server_alloc_meta(seq, range, ctx);
+ rc = __seq_server_alloc_meta(seq, in, out, ctx);
up(&seq->lss_sem);
RETURN(rc);
static int seq_server_handle(struct lu_site *site,
const struct lu_context *ctx,
- __u32 opc, struct lu_range *out)
+ __u32 opc, struct lu_range *in,
+ struct lu_range *out)
{
int rc;
ENTRY;
RETURN(-EINVAL);
}
rc = seq_server_alloc_meta(site->ls_server_seq,
- out, ctx);
+ in, out, ctx);
break;
case SEQ_ALLOC_SUPER:
if (!site->ls_control_seq) {
RETURN(-EINVAL);
}
rc = seq_server_alloc_super(site->ls_control_seq,
- out, ctx);
+ in, out, ctx);
break;
default:
rc = -EINVAL;
static int seq_req_handle(struct ptlrpc_request *req,
struct seq_thread_info *info)
{
+ struct lu_range *out, *in = NULL;
struct lu_site *site;
- struct lu_range *out;
int rc = -EPROTO;
__u32 *opc;
ENTRY;
if (out == NULL)
RETURN(-EPROTO);
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+ in = req_capsule_server_get(&info->sti_pill,
+ &RMF_SEQ_RANGE);
+
+ LASSERT(!range_is_zero(in) && range_is_sane(in));
+ }
+
ctx = req->rq_svc_thread->t_ctx;
- rc = seq_server_handle(site, ctx, *opc, out);
+ rc = seq_server_handle(site, ctx, *opc, in, out);
}
RETURN(rc);