#include <lustre_fid.h>
#include "fid_internal.h"
-/* XXX: this should use new req-layout interface */
-static int
-seq_client_rpc(struct lu_client_seq *seq,
- struct lu_range *range,
- __u32 opc)
+static int seq_client_rpc(struct lu_client_seq *seq,
+ struct lu_range *range,
+ __u32 opc, const char *opcname)
{
- struct obd_export *exp = seq->seq_exp;
- int repsize = sizeof(struct lu_range);
- int rc, reqsize = sizeof(__u32);
+ struct obd_export *exp = seq->lcs_exp;
+ int repsize[2] = { sizeof(struct ptlrpc_body),
+ sizeof(struct lu_range) };
+ int rc, reqsize[2] = { sizeof(struct ptlrpc_body),
+ sizeof(__u32) };
struct ptlrpc_request *req;
+ struct req_capsule pill;
struct lu_range *ran;
__u32 *op;
ENTRY;
- req = ptlrpc_prep_req(class_exp2cliimp(exp),
+ req = ptlrpc_prep_req(class_exp2cliimp(exp),
LUSTRE_MDS_VERSION,
- SEQ_QUERY, 1, &reqsize,
+ SEQ_QUERY, 2, reqsize,
NULL);
if (req == NULL)
RETURN(-ENOMEM);
- op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*op));
+ req_capsule_init(&pill, req, RCL_CLIENT, NULL);
+
+ req_capsule_set(&pill, &RQF_SEQ_QUERY);
+
+ op = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
*op = opc;
- req->rq_replen = lustre_msg_size(1, &repsize);
-
- req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
- SEQ_CTLR_PORTAL : SEQ_SRV_PORTAL;
+ ptlrpc_req_set_repsize(req, 2, repsize);
+
+ if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
+ req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
+ SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL;
+ } else {
+ req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
+ SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL;
+ }
rc = ptlrpc_queue_wait(req);
if (rc)
GOTO(out_req, rc);
- ran = lustre_swab_repbuf(req, 0, sizeof(*ran),
- lustre_swab_lu_range);
-
- if (ran == NULL) {
- CERROR("invalid range is returned\n");
+ ran = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
+ if (ran == NULL)
GOTO(out_req, rc = -EPROTO);
- }
*range = *ran;
-
- LASSERT(range_is_sane(range));
- LASSERT(!range_is_exhausted(range));
-
- EXIT;
-out_req:
- ptlrpc_req_finished(req);
- return rc;
-}
-static int
-__seq_client_alloc_opc(struct lu_client_seq *seq,
- int opc, const char *opcname)
-{
- int rc;
- ENTRY;
+ if (!range_is_sane(range)) {
+ CERROR("invalid seq range obtained from server: "
+ DRANGE"\n", PRANGE(range));
+ GOTO(out_req, rc = -EINVAL);
+ }
+
+ if (range_is_exhausted(range)) {
+ CERROR("seq range obtained from server is exhausted: "
+ DRANGE"]\n", PRANGE(range));
+ GOTO(out_req, rc = -EINVAL);
+ }
- rc = seq_client_rpc(seq, &seq->seq_range, opc);
if (rc == 0) {
- CDEBUG(D_INFO, "SEQ-MGR(cli): allocated "
- "%s-sequence ["LPX64"-"LPX64"]\n",
- opcname, seq->seq_range.lr_start,
- seq->seq_range.lr_end);
+ CDEBUG(D_INFO, "%s: allocated %s-sequence "
+ DRANGE"]\n", seq->lcs_name, opcname,
+ PRANGE(range));
}
- RETURN(rc);
+
+ EXIT;
+out_req:
+ req_capsule_fini(&pill);
+ ptlrpc_req_finished(req);
+ return rc;
}
/* request sequence-controller node to allocate new super-sequence. */
-static int
-__seq_client_alloc_super(struct lu_client_seq *seq)
+static int __seq_client_alloc_super(struct lu_client_seq *seq)
{
- ENTRY;
- RETURN(__seq_client_alloc_opc(seq, SEQ_ALLOC_SUPER, "super"));
+ return seq_client_rpc(seq, &seq->lcs_range,
+ SEQ_ALLOC_SUPER, "super");
}
-int
-seq_client_alloc_super(struct lu_client_seq *seq)
+int seq_client_alloc_super(struct lu_client_seq *seq)
{
int rc;
ENTRY;
-
- down(&seq->seq_sem);
+
+ down(&seq->lcs_sem);
rc = __seq_client_alloc_super(seq);
- up(&seq->seq_sem);
+ up(&seq->lcs_sem);
RETURN(rc);
}
EXPORT_SYMBOL(seq_client_alloc_super);
/* request sequence-controller node to allocate new meta-sequence. */
-static int
-__seq_client_alloc_meta(struct lu_client_seq *seq)
+static int __seq_client_alloc_meta(struct lu_client_seq *seq)
{
- ENTRY;
- RETURN(__seq_client_alloc_opc(seq, SEQ_ALLOC_META, "meta"));
+ return seq_client_rpc(seq, &seq->lcs_range,
+ SEQ_ALLOC_META, "meta");
}
-int
-seq_client_alloc_meta(struct lu_client_seq *seq)
+int seq_client_alloc_meta(struct lu_client_seq *seq)
{
int rc;
ENTRY;
- down(&seq->seq_sem);
+ down(&seq->lcs_sem);
rc = __seq_client_alloc_meta(seq);
- up(&seq->seq_sem);
+ up(&seq->lcs_sem);
RETURN(rc);
}
EXPORT_SYMBOL(seq_client_alloc_meta);
/* allocate new sequence for client (llite or MDC are expected to use this) */
-static int
-__seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
+static int __seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
{
int rc = 0;
ENTRY;
- LASSERT(range_is_sane(&seq->seq_range));
+ LASSERT(range_is_sane(&seq->lcs_range));
/* if we still have free sequences in meta-sequence we allocate new seq
* from given range, if not - allocate new meta-sequence. */
- if (range_space(&seq->seq_range) == 0) {
+ if (range_space(&seq->lcs_range) == 0) {
rc = __seq_client_alloc_meta(seq);
if (rc) {
CERROR("can't allocate new meta-sequence, "
RETURN(rc);
}
}
-
- *seqnr = seq->seq_range.lr_start;
- seq->seq_range.lr_start++;
-
- CDEBUG(D_INFO, "SEQ-MGR(cli): allocated "
- "sequence ["LPX64"]\n", *seqnr);
+
+ LASSERT(range_space(&seq->lcs_range) > 0);
+ *seqnr = seq->lcs_range.lr_start;
+ seq->lcs_range.lr_start++;
+
+ CDEBUG(D_INFO, "%s: allocated sequence ["LPX64"]\n",
+ seq->lcs_name, *seqnr);
RETURN(rc);
}
-int
-seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
+int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
{
int rc = 0;
ENTRY;
- down(&seq->seq_sem);
+ down(&seq->lcs_sem);
rc = __seq_client_alloc_seq(seq, seqnr);
- up(&seq->seq_sem);
+ up(&seq->lcs_sem);
RETURN(rc);
}
EXPORT_SYMBOL(seq_client_alloc_seq);
-int
-seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
+int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
{
- seqno_t seqnr = 0;
int rc;
ENTRY;
LASSERT(fid != NULL);
- down(&seq->seq_sem);
+ down(&seq->lcs_sem);
- if (!fid_is_sane(&seq->seq_fid) ||
- fid_oid(&seq->seq_fid) >= seq->seq_width)
+ if (!fid_is_sane(&seq->lcs_fid) ||
+ fid_oid(&seq->lcs_fid) >= seq->lcs_width)
{
- /* allocate new sequence for case client hass no sequence at all
- * or sequnece is exhausted and should be switched. */
+ seqno_t seqnr;
+
+ /* allocate new sequence for case client has no sequence at all
+ * or sequence is exhausted and should be switched. */
rc = __seq_client_alloc_seq(seq, &seqnr);
if (rc) {
CERROR("can't allocate new sequence, "
}
/* init new fid */
- seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
- seq->seq_fid.f_seq = seqnr;
- seq->seq_fid.f_ver = 0;
+ seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
+ seq->lcs_fid.f_seq = seqnr;
+ seq->lcs_fid.f_ver = 0;
- /* inform caller that sequnece switch is performed to allow it
+ /* inform caller that sequence switch is performed to allow it
* to setup FLD for it. */
- rc = -ERESTART;
+ rc = 1;
} else {
- seq->seq_fid.f_oid++;
+ seq->lcs_fid.f_oid++;
rc = 0;
}
- *fid = seq->seq_fid;
+ *fid = seq->lcs_fid;
LASSERT(fid_is_sane(fid));
-
- CDEBUG(D_INFO, "SEQ-MGR(cli): allocated FID "DFID3"\n",
- PFID3(fid));
+
+ CDEBUG(D_INFO, "%s: allocated FID "DFID"\n",
+ seq->lcs_name, PFID(fid));
EXIT;
out:
- up(&seq->seq_sem);
+ up(&seq->lcs_sem);
return rc;
}
EXPORT_SYMBOL(seq_client_alloc_fid);
+static void seq_client_proc_fini(struct lu_client_seq *seq);
+
#ifdef LPROCFS
-static int
-seq_client_proc_init(struct lu_client_seq *seq)
+static int seq_client_proc_init(struct lu_client_seq *seq)
{
int rc;
ENTRY;
- seq->seq_proc_dir = lprocfs_register(seq->seq_name,
+ seq->lcs_proc_dir = lprocfs_register(seq->lcs_name,
proc_lustre_root,
NULL, NULL);
-
- if (IS_ERR(seq->seq_proc_dir)) {
+
+ if (IS_ERR(seq->lcs_proc_dir)) {
CERROR("LProcFS failed in seq-init\n");
- rc = PTR_ERR(seq->seq_proc_dir);
- GOTO(err, rc);
+ rc = PTR_ERR(seq->lcs_proc_dir);
+ RETURN(rc);
}
- rc = lprocfs_add_vars(seq->seq_proc_dir,
+ rc = lprocfs_add_vars(seq->lcs_proc_dir,
seq_client_proc_list, seq);
if (rc) {
CERROR("can't init sequence manager "
"proc, rc %d\n", rc);
- GOTO(err_dir, rc);
+ GOTO(out_cleanup, rc);
}
RETURN(0);
-err_dir:
- lprocfs_remove(seq->seq_proc_dir);
-err:
- seq->seq_proc_dir = NULL;
+out_cleanup:
+ seq_client_proc_fini(seq);
return rc;
}
-static void
-seq_client_proc_fini(struct lu_client_seq *seq)
+static void seq_client_proc_fini(struct lu_client_seq *seq)
{
ENTRY;
- if (seq->seq_proc_dir) {
- lprocfs_remove(seq->seq_proc_dir);
- seq->seq_proc_dir = NULL;
+ if (seq->lcs_proc_dir) {
+ if (!IS_ERR(seq->lcs_proc_dir))
+ lprocfs_remove(seq->lcs_proc_dir);
+ seq->lcs_proc_dir = NULL;
}
EXIT;
}
+#else
+static int seq_client_proc_init(struct lu_client_seq *seq)
+{
+ return 0;
+}
+
+static void seq_client_proc_fini(struct lu_client_seq *seq)
+{
+ return;
+}
#endif
-int
-seq_client_init(struct lu_client_seq *seq,
- const char *uuid,
- struct obd_export *exp)
+int seq_client_init(struct lu_client_seq *seq,
+ const char *uuid,
+ struct obd_export *exp,
+ enum lu_cli_type type)
{
int rc = 0;
ENTRY;
LASSERT(exp != NULL);
-
- fid_zero(&seq->seq_fid);
- range_zero(&seq->seq_range);
- sema_init(&seq->seq_sem, 1);
- seq->seq_exp = class_export_get(exp);
- seq->seq_width = LUSTRE_SEQ_MAX_WIDTH;
- snprintf(seq->seq_name, sizeof(seq->seq_name),
- "%s-%s", LUSTRE_SEQ_NAME, uuid);
+ seq->lcs_type = type;
+ fid_zero(&seq->lcs_fid);
+ range_zero(&seq->lcs_range);
+ sema_init(&seq->lcs_sem, 1);
+ seq->lcs_exp = class_export_get(exp);
+ seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
-#ifdef LPROCFS
- rc = seq_client_proc_init(seq);
-#endif
+ snprintf(seq->lcs_name, sizeof(seq->lcs_name),
+ "%s-cli-%s", LUSTRE_SEQ_NAME, uuid);
+ rc = seq_client_proc_init(seq);
if (rc)
seq_client_fini(seq);
else
{
ENTRY;
-#ifdef LPROCFS
seq_client_proc_fini(seq);
-#endif
-
- if (seq->seq_exp != NULL) {
- class_export_put(seq->seq_exp);
- seq->seq_exp = NULL;
+
+ if (seq->lcs_exp != NULL) {
+ class_export_put(seq->lcs_exp);
+ seq->lcs_exp = NULL;
}
-
+
CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager\n");
-
+
EXIT;
}
EXPORT_SYMBOL(seq_client_fini);