* Ask client for new range, assign that range to ->seq_space and write
* seq state to backing store should be atomic.
*/
- down(&seq->lss_sem);
+ cfs_down(&seq->lss_sem);
if (cli == NULL) {
CDEBUG(D_INFO, "%s: Detached sequence client %s\n",
seq->lss_name, cli->lcs_name);
seq->lss_cli = cli;
+ cli->lcs_space.lsr_mdt = seq->lss_site->ms_node_id;
EXIT;
out_up:
- up(&seq->lss_sem);
+ cfs_up(&seq->lss_sem);
return rc;
}
EXPORT_SYMBOL(seq_server_set_cli);
-/*
+/**
* On controller node, allocate new super sequence for regular sequence server.
+ * As this super sequence controller, this node suppose to maintain fld
+ * and update index.
+ * \a out range always has currect mds node number of requester.
*/
+
static int __seq_server_alloc_super(struct lu_server_seq *seq,
- struct lu_range *in,
- struct lu_range *out,
+ struct lu_seq_range *in,
+ struct lu_seq_range *out,
const struct lu_env *env)
{
- struct lu_range *space = &seq->lss_space;
- int rc;
+ struct lu_seq_range *space = &seq->lss_space;
+ struct thandle *th;
+ __u64 mdt = out->lsr_mdt;
+ int rc, credit;
ENTRY;
LASSERT(range_is_sane(space));
CDEBUG(D_INFO, "%s: Input seq range: "
DRANGE"\n", seq->lss_name, PRANGE(in));
- if (in->lr_end > space->lr_start)
- space->lr_start = in->lr_end;
+ if (in->lsr_end > space->lsr_start)
+ space->lsr_start = in->lsr_end;
*out = *in;
CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
"Only "LPU64" sequences left\n", seq->lss_name,
range_space(space));
*out = *space;
- space->lr_start = space->lr_end;
+ space->lsr_start = space->lsr_end;
} else if (range_is_exhausted(space)) {
CERROR("%s: Sequences space is exhausted\n",
seq->lss_name);
range_alloc(out, space, seq->lss_width);
}
}
+ out->lsr_mdt = mdt;
+
+ credit = SEQ_TXN_STORE_CREDITS + FLD_TXN_INDEX_INSERT_CREDITS;
- rc = seq_store_write(seq, env);
+ th = seq_store_trans_start(seq, env, credit);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ rc = seq_store_write(seq, env, th);
if (rc) {
CERROR("%s: Can't write space data, rc %d\n",
seq->lss_name, rc);
- RETURN(rc);
+ goto out;
+ }
+
+ rc = fld_server_create(seq->lss_site->ms_server_fld,
+ env, out, th);
+ if (rc) {
+ CERROR("%s: Can't Update fld database, rc %d\n",
+ seq->lss_name, rc);
}
- CDEBUG(D_INFO, "%s: Allocated super-sequence "
- DRANGE"\n", seq->lss_name, PRANGE(out));
+out:
+ seq_store_trans_stop(seq, env, th);
+
+ CDEBUG(D_INFO, "%s: super-sequence allocation rc = %d "
+ DRANGE"\n", seq->lss_name, rc, PRANGE(out));
RETURN(rc);
}
int seq_server_alloc_super(struct lu_server_seq *seq,
- struct lu_range *in,
- struct lu_range *out,
+ struct lu_seq_range *in,
+ struct lu_seq_range *out,
const struct lu_env *env)
{
int rc;
ENTRY;
- down(&seq->lss_sem);
+ cfs_down(&seq->lss_sem);
rc = __seq_server_alloc_super(seq, in, out, env);
- up(&seq->lss_sem);
+ cfs_up(&seq->lss_sem);
RETURN(rc);
}
static int __seq_server_alloc_meta(struct lu_server_seq *seq,
- struct lu_range *in,
- struct lu_range *out,
+ struct lu_seq_range *in,
+ struct lu_seq_range *out,
const struct lu_env *env)
{
- struct lu_range *space = &seq->lss_space;
+ struct lu_seq_range *space = &seq->lss_space;
+ struct thandle *th;
int rc = 0;
+
ENTRY;
LASSERT(range_is_sane(space));
CDEBUG(D_INFO, "%s: Input seq range: "
DRANGE"\n", seq->lss_name, PRANGE(in));
- if (range_is_exhausted(space)) {
+ if (in->lsr_end <= space->lsr_start) {
/*
- * Server cannot send empty range to client, this is why
- * we check here that range from client is "newer" than
- * exhausted super.
+ * Client is replaying a fairly old range, server
+ * don't need to do any allocation.
*/
- LASSERT(in->lr_end > space->lr_start);
-
+ } else if (range_is_exhausted(space)) {
/*
* Start is set to end of last allocated, because it
* *is* already allocated so we take that into account
* and do not use for other allocations.
*/
- space->lr_start = in->lr_end;
+ space->lsr_start = in->lsr_end;
/*
- * End is set to in->lr_start + super sequence
- * allocation unit. That is because in->lr_start is
+ * End is set to in->lsr_start + super sequence
+ * allocation unit. That is because in->lsr_start is
* first seq in new allocated range from controller
* before failure.
*/
- space->lr_end = in->lr_start + LUSTRE_SEQ_SUPER_WIDTH;
+ space->lsr_end = in->lsr_start + LUSTRE_SEQ_SUPER_WIDTH;
if (!seq->lss_cli) {
CERROR("%s: No sequence controller "
* obtained range from it was @space.
*/
rc = seq_client_replay_super(seq->lss_cli, space, env);
+
if (rc) {
CERROR("%s: Can't replay super-sequence, "
"rc %d\n", seq->lss_name, rc);
* Update super start by end from client's range. Super
* end should not be changed if range was not exhausted.
*/
- if (in->lr_end > space->lr_start)
- space->lr_start = in->lr_end;
+ space->lsr_start = in->lsr_end;
+ }
+
+ /* sending replay_super to update fld as only super sequence
+ * server can update fld.
+ * we are sending meta sequence to fld rather than super
+ * sequence, but fld server can handle range merging. */
+
+ in->lsr_mdt = space->lsr_mdt;
+ rc = seq_client_replay_super(seq->lss_cli, in, env);
+
+ if (rc) {
+ CERROR("%s: Can't replay super-sequence, "
+ "rc %d\n", seq->lss_name, rc);
+ RETURN(rc);
}
*out = *in;
range_alloc(out, space, seq->lss_width);
}
- rc = seq_store_write(seq, env);
+ th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ rc = seq_store_write(seq, env, th);
if (rc) {
CERROR("%s: Can't write space data, rc %d\n",
seq->lss_name, rc);
DRANGE"\n", seq->lss_name, PRANGE(out));
}
+ seq_store_trans_stop(seq, env, th);
RETURN(rc);
}
int seq_server_alloc_meta(struct lu_server_seq *seq,
- struct lu_range *in,
- struct lu_range *out,
+ struct lu_seq_range *in,
+ struct lu_seq_range *out,
const struct lu_env *env)
{
int rc;
ENTRY;
- down(&seq->lss_sem);
+ cfs_down(&seq->lss_sem);
rc = __seq_server_alloc_meta(seq, in, out, env);
- up(&seq->lss_sem);
+ cfs_up(&seq->lss_sem);
RETURN(rc);
}
static int seq_server_handle(struct lu_site *site,
const struct lu_env *env,
- __u32 opc, struct lu_range *in,
- struct lu_range *out)
+ __u32 opc, struct lu_seq_range *in,
+ struct lu_seq_range *out)
{
int rc;
struct md_site *mite;
const struct lu_env *env,
struct seq_thread_info *info)
{
- struct lu_range *out, *in = NULL;
+ struct lu_seq_range *out, *in = NULL, *tmp;
struct lu_site *site;
int rc = -EPROTO;
__u32 *opc;
if (out == NULL)
RETURN(err_serious(-EPROTO));
+ tmp = req_capsule_client_get(info->sti_pill, &RMF_SEQ_RANGE);
+
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
- in = req_capsule_client_get(info->sti_pill,
- &RMF_SEQ_RANGE);
+ in = tmp;
- LASSERT(!range_is_zero(in) && range_is_sane(in));
+ if (range_is_zero(in) || !range_is_sane(in)) {
+ CERROR("Replayed seq range is invalid: "
+ DRANGE"\n", PRANGE(in));
+ RETURN(err_serious(-EINVAL));
+ }
}
+ /* seq client passed mdt id, we need to pass that using out
+ * range parameter */
+ out->lsr_mdt = tmp->lsr_mdt;
rc = seq_server_handle(site, env, *opc, in, out);
} else
rc = err_serious(-EPROTO);
struct dt_device *dev,
const char *prefix,
enum lu_mgr_type type,
+ struct md_site *ms,
const struct lu_env *env)
{
+ struct thandle *th;
int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
ENTRY;
seq->lss_cli = NULL;
seq->lss_type = type;
+ seq->lss_site = ms;
range_init(&seq->lss_space);
- sema_init(&seq->lss_sem, 1);
+ cfs_sema_init(&seq->lss_sem, 1);
seq->lss_width = is_srv ?
LUSTRE_SEQ_META_WIDTH : LUSTRE_SEQ_SUPER_WIDTH;
rc = seq_store_init(seq, env, dev);
if (rc)
GOTO(out, rc);
-
/* Request backing store for saved sequence info. */
rc = seq_store_read(seq, env);
if (rc == -ENODATA) {
LUSTRE_SEQ_ZERO_RANGE:
LUSTRE_SEQ_SPACE_RANGE;
+ seq->lss_space.lsr_mdt = ms->ms_node_id;
CDEBUG(D_INFO, "%s: No data found "
"on store. Initialize space\n",
seq->lss_name);
+ th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
/* Save default controller value to store. */
- rc = seq_store_write(seq, env);
+ rc = seq_store_write(seq, env, th);
if (rc) {
CERROR("%s: Can't write space data, "
"rc %d\n", seq->lss_name, rc);
}
+ seq_store_trans_stop(seq, env, th);
} else if (rc) {
CERROR("%s: Can't read space data, rc %d\n",
seq->lss_name, rc);
static void __exit fid_mod_exit(void)
{
+ llo_local_obj_unregister(&llod_seq_srv);
+ llo_local_obj_unregister(&llod_seq_ctl);
+
lu_context_key_degister(&seq_thread_key);
if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) {
lprocfs_remove(&seq_type_proc_dir);