- client allocates new meta-seq uppon connect.
LASSERT(range_is_sane(space));
- if (space->lr_end - space->lr_start < LUSTRE_SEQ_SUPER_CHUNK) {
+ if (range_space(space) < LUSTRE_SEQ_SUPER_CHUNK) {
CWARN("sequences space is going to exhauste soon. "
"Only can allocate "LPU64" sequences\n",
space->lr_end - space->lr_start);
*range = *space;
space->lr_start = space->lr_end;
rc = 0;
- } else if (space->lr_start == space->lr_end) {
+ } else if (range_is_exhausted(space)) {
CERROR("sequences space is exhausted\n");
rc = -ENOSPC;
} else {
LASSERT(range_is_sane(super));
- /* XXX: here should avoid cascading RPCs using kind of async
+ /* XXX: here we should avoid cascading RPCs using kind of async
* preallocation when meta-sequence is close to exhausting. */
- if (super->lr_start == super->lr_end) {
+ if (range_is_exhausted(super)) {
if (!seq->seq_cli) {
CERROR("no seq-controller client is setup\n");
RETURN(-EOPNOTSUPP);
struct lu_client_seq *cli,
const struct lu_context *ctx)
{
- int rc;
+ int rc = 0;
ENTRY;
LASSERT(cli != NULL);
"sequence controller client %s\n",
cli->seq_exp->exp_client_uuid.uuid);
+ down(&seq->seq_sem);
+
/* assign controller */
seq->seq_cli = cli;
- /* allocate new super-sequence */
- rc = seq_client_alloc_super(cli);
- if (rc) {
- CERROR("can't allocate super-sequence, "
- "rc %d\n", rc);
- RETURN(rc);
- }
+ /* get new range from controller only if super-sequence is not yet
+ * initialized from backing store or something else. */
+ if (range_is_zero(&seq->seq_super)) {
+ /* release sema to avoid deadlock for case we're asking our
+ * selves. */
+ up(&seq->seq_sem);
+ rc = seq_client_alloc_super(cli);
+ down(&seq->seq_sem);
+
+ if (rc) {
+ CERROR("can't allocate super-sequence, "
+ "rc %d\n", rc);
+ RETURN(rc);
+ }
- /* take super-seq from client seq mgr */
- LASSERT(range_is_sane(&cli->seq_range));
+ /* take super-seq from client seq mgr */
+ LASSERT(range_is_sane(&cli->seq_range));
- down(&seq->seq_sem);
- seq->seq_super = cli->seq_range;
+ seq->seq_super = cli->seq_range;
- /* save init seq to backing store. */
- rc = seq_server_write_state(seq, ctx);
- if (rc) {
- CERROR("can't write sequence state, "
- "rc = %d\n", rc);
+ /* save init seq to backing store. */
+ rc = seq_server_write_state(seq, ctx);
+ if (rc) {
+ CERROR("can't write sequence state, "
+ "rc = %d\n", rc);
+ }
}
-
up(&seq->seq_sem);
-
RETURN(rc);
}
EXPORT_SYMBOL(seq_server_controller);
struct lu_range *range,
__u32 opc)
{
+ struct obd_export *exp = seq->seq_exp;
int repsize = sizeof(struct lu_range);
int rc, reqsize = sizeof(__u32);
struct ptlrpc_request *req;
__u32 *op;
ENTRY;
- req = ptlrpc_prep_req(class_exp2cliimp(seq->seq_exp),
- LUSTRE_MDS_VERSION, SEQ_QUERY,
- 1, &reqsize, NULL);
+ req = ptlrpc_prep_req(class_exp2cliimp(exp),
+ LUSTRE_MDS_VERSION,
+ SEQ_QUERY, 1, &reqsize,
+ NULL);
if (req == NULL)
RETURN(-ENOMEM);
req->rq_replen = lustre_msg_size(1, &repsize);
req->rq_request_portal = MDS_SEQ_PORTAL;
+
rc = ptlrpc_queue_wait(req);
if (rc)
GOTO(out_req, rc);
}
/* request sequence-controller node to allocate new super-sequence. */
-int
-seq_client_alloc_super(struct lu_client_seq *seq)
+static int
+__seq_client_alloc_super(struct lu_client_seq *seq)
{
int rc;
ENTRY;
- rc = seq_client_rpc(seq, &seq->seq_range,
- SEQ_ALLOC_SUPER);
+ rc = seq_client_rpc(seq, &seq->seq_range, SEQ_ALLOC_SUPER);
if (rc == 0) {
CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated super-sequence "
"["LPX64"-"LPX64"]\n", seq->seq_range.lr_start,
}
RETURN(rc);
}
+
+int
+seq_client_alloc_super(struct lu_client_seq *seq)
+{
+ int rc;
+ ENTRY;
+
+ down(&seq->seq_sem);
+ rc = __seq_client_alloc_super(seq);
+ up(&seq->seq_sem);
+
+ RETURN(rc);
+}
EXPORT_SYMBOL(seq_client_alloc_super);
/* request sequence-controller node to allocate new meta-sequence. */
-int
-seq_client_alloc_meta(struct lu_client_seq *seq)
+static int
+__seq_client_alloc_meta(struct lu_client_seq *seq)
{
int rc;
ENTRY;
- rc = seq_client_rpc(seq, &seq->seq_range,
- SEQ_ALLOC_META);
+ rc = seq_client_rpc(seq, &seq->seq_range, SEQ_ALLOC_META);
if (rc == 0) {
CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated meta-sequence "
"["LPX64"-"LPX64"]\n", seq->seq_range.lr_start,
}
RETURN(rc);
}
-EXPORT_SYMBOL(seq_client_alloc_meta);
-/* allocate new sequence for client (llite or MDC are expected to use this) */
int
-seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
+seq_client_alloc_meta(struct lu_client_seq *seq)
{
int rc;
ENTRY;
down(&seq->seq_sem);
+ rc = __seq_client_alloc_meta(seq);
+ up(&seq->seq_sem);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(seq_client_alloc_meta);
+
+/* allocate new sequence for client (llite or MDC are expected to use this) */
+static int
+__seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
+{
+ int rc = 0;
+ ENTRY;
+
LASSERT(range_is_sane(&seq->seq_range));
/* if we still have free sequences in meta-sequence we allocate new seq
- * from given range. */
- if (seq->seq_range.lr_end > seq->seq_range.lr_start) {
- *seqnr = seq->seq_range.lr_start;
- seq->seq_range.lr_start += 1;
- rc = 0;
- } else {
- /* meta-sequence is exhausted, request MDT to allocate new
- * meta-sequence for us. */
- rc = seq_client_alloc_meta(seq);
+ * from given range, if not - allocate new meta-sequence. */
+ if (range_space(&seq->seq_range) == 0) {
+ rc = __seq_client_alloc_meta(seq);
if (rc) {
CERROR("can't allocate new meta-sequence, "
"rc %d\n", rc);
}
-
- *seqnr = seq->seq_range.lr_start;
- seq->seq_range.lr_start += 1;
}
- up(&seq->seq_sem);
-
+
+ *seqnr = seq->seq_range.lr_start;
+ seq->seq_range.lr_start += 1;
+
if (rc == 0) {
- CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated sequence "
- "["LPX64"]\n", *seqnr);
+ CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated "
+ "sequence ["LPX64"]\n", *seqnr);
}
RETURN(rc);
}
+
+int
+seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
+{
+ int rc = 0;
+ ENTRY;
+
+ down(&seq->seq_sem);
+ rc = __seq_client_alloc_seq(seq, seqnr);
+ up(&seq->seq_sem);
+
+ RETURN(rc);
+}
EXPORT_SYMBOL(seq_client_alloc_seq);
int
{
/* allocate new sequence for case client hass no sequence at all
* or sequnece is exhausted and should be switched. */
- up(&seq->seq_sem);
- rc = seq_client_alloc_seq(seq, &seqnr);
- down(&seq->seq_sem);
+ rc = __seq_client_alloc_seq(seq, &seqnr);
if (rc) {
CERROR("can't allocate new sequence, "
"rc %d\n", rc);
seq->seq_flags = flags;
fid_zero(&seq->seq_fid);
+ range_zero(&seq->seq_range);
sema_init(&seq->seq_sem, 1);
-
- seq->seq_range.lr_end = 0;
- seq->seq_range.lr_start = 0;
-
- if (exp != NULL)
- seq->seq_exp = class_export_get(exp);
+ seq->seq_exp = class_export_get(exp);
CDEBUG(D_INFO|D_WARNING, "Client Sequence "
"Manager initialized\n");
req->rq_replen = lustre_msg_size(1, &mf_size);
req->rq_request_portal = MDS_FLD_PORTAL;
+
rc = ptlrpc_queue_wait(req);
if (rc)
GOTO(out_req, rc);
*/
struct dt_device_operations {
/*
- * Method for getting/setting device wide back stored config data,
- * like last used meta-sequence, etc.
- *
- * XXX this is ioctl()-like interface we want to get rid of.
- */
- int (*dt_config) (const struct lu_context *ctx,
- struct dt_device *dev, const char *name,
- void *buf, int size, int mode);
- /*
* Return device-wide statistics.
*/
int (*dt_statfs)(const struct lu_context *ctx,
__u64 lr_end;
};
+static inline __u64 range_space(struct lu_range *r)
+{
+ return r->lr_end - r->lr_start;
+}
+
+static inline void range_zero(struct lu_range *r)
+{
+ r->lr_start = r->lr_end = 0;
+}
+
static inline int range_is_sane(struct lu_range *r)
{
if (r->lr_end >= r->lr_start)
return 0;
}
+static inline int range_is_zero(struct lu_range *r)
+{
+ return (r->lr_start == 0 && r->lr_end == 0);
+}
+
+static inline int range_is_exhausted(struct lu_range *r)
+{
+ return range_space(r) == 0;
+}
+
struct lu_fid {
__u64 f_seq; /* holds fid sequence. Lustre should support 2 ^ 64
* objects, thus even if one sequence has one object we
};
struct md_device_operations {
- /* method for getting/setting device wide back stored config data, like
- * last used meta-sequence, etc. */
- int (*mdo_config) (const struct lu_context *ctx,
- struct md_device *m, const char *name,
- void *buf, int size, int mode);
-
/* meta-data device related handlers. */
int (*mdo_root_get)(const struct lu_context *ctx,
struct md_device *m, struct lu_fid *f);
if (cli->cl_seq == NULL)
RETURN(-ENOMEM);
- /* init client side of sequence-manager */
+ /* init client side sequence-manager */
rc = seq_client_init(cli->cl_seq, exp, 0);
+ if (rc)
+ GOTO(out_free_seq, rc);
+
+ /* pre-allocate meta-sequence */
+ rc = seq_client_alloc_meta(cli->cl_seq);
if (rc) {
- OBD_FREE_PTR(cli->cl_seq);
- cli->cl_seq = NULL;
+ CERROR("can't allocate new mata-sequence, "
+ "rc %d\n", rc);
+ GOTO(out_free_seq, rc);
}
RETURN(rc);
+
+out_free_seq:
+ OBD_FREE_PTR(cli->cl_seq);
+ cli->cl_seq = NULL;
+ return rc;
}
static int mdc_fid_fini(struct obd_export *exp)
RETURN(0);
}
-static int mdd_config(const struct lu_context *ctx, struct md_device *m,
- const char *name, void *buf, int size, int mode)
-{
- struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
- int rc;
- ENTRY;
-
- rc = mdd_child_ops(mdd)->dt_config(ctx, mdd->mdd_child,
- name, buf, size, mode);
- RETURN(rc);
-}
-
static int mdd_statfs(const struct lu_context *ctx,
struct md_device *m, struct kstatfs *sfs) {
struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
struct md_device_operations mdd_ops = {
.mdo_root_get = mdd_root_get,
- .mdo_config = mdd_config,
.mdo_statfs = mdd_statfs,
};
o->oo_inode ? o->oo_inode->i_generation : 0);
}
-static int osd_config(const struct lu_context *ctx,
- struct dt_device *d, const char *name,
- void *buf, int size, int mode)
-{
- if (mode == LUSTRE_CONFIG_GET) {
- /* to be continued */
- return 0;
- } else {
- /* to be continued */
- return 0;
- }
-}
-
static int osd_statfs(const struct lu_context *ctx,
struct dt_device *d, struct kstatfs *sfs)
{
static struct dt_device_operations osd_dt_ops = {
.dt_root_get = osd_root_get,
- .dt_config = osd_config,
.dt_statfs = osd_statfs,
.dt_trans_start = osd_trans_start,
.dt_trans_stop = osd_trans_stop,