rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
if (rc == ELDLM_LOCK_ABORTED) {
/* Before we return, swab the reply */
- reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF,
- sizeof(*reply),
- lustre_swab_ldlm_reply);
- if (reply == NULL) {
- CERROR("Can't unpack ldlm_reply\n");
+ reply = req_capsule_server_get(&req->rq_pill,
+ &RMF_DLM_REP);
+ if (reply == NULL)
rc = -EPROTO;
- }
if (lvb_len) {
- void *tmplvb;
- tmplvb = lustre_swab_repbuf(req,
- DLM_REPLY_REC_OFF,
- lvb_len,
- lvb_swabber);
+ struct ost_lvb *tmplvb;
+
+ req_capsule_set_size(&req->rq_pill,
+ &RMF_DLM_LVB, RCL_SERVER,
+ lvb_len);
+ tmplvb = req_capsule_server_swab_get(&req->rq_pill,
+ &RMF_DLM_LVB,
+ lvb_swabber);
if (tmplvb == NULL)
GOTO(cleanup, rc = -EPROTO);
if (lvb != NULL)
GOTO(cleanup, rc);
}
- reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
- lustre_swab_ldlm_reply);
- if (reply == NULL) {
- CERROR("Can't unpack ldlm_reply\n");
+ reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+ if (reply == NULL)
GOTO(cleanup, rc = -EPROTO);
- }
/* lock enqueued on the server */
cleanup_phase = 0;
* clobber the LVB with an older one. */
if (lvb_len && (lock->l_req_mode != lock->l_granted_mode)) {
void *tmplvb;
- tmplvb = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, lvb_len,
- lvb_swabber);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+ lvb_len);
+ tmplvb = req_capsule_server_swab_get(&req->rq_pill,
+ &RMF_DLM_LVB,
+ lvb_swabber);
if (tmplvb == NULL)
GOTO(cleanup, rc = -EPROTO);
memcpy(lock->l_lvb_data, tmplvb, lvb_len);
/* Cancel lru locks and pack them into the enqueue request. Pack there the given
* @count locks in @cancels. */
-struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
- int opc, int bufcount, int *size,
- int bufoff, int canceloff,
- struct list_head *cancels, int count)
+int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
+ int version, int opc, int canceloff,
+ struct list_head *cancels, int count)
{
- struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
- int flags, avail, to_free, pack = 0;
- struct ldlm_request *dlm = NULL;
- struct ptlrpc_request *req;
- CFS_LIST_HEAD(head);
+ struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+ struct req_capsule *pill = &req->rq_pill;
+ struct ldlm_request *dlm = NULL;
+ int flags, avail, to_free, bufcount, pack = 0;
+ int rc;
ENTRY;
- if (cancels == NULL)
- cancels = &head;
+
+ LASSERT(cancels != NULL);
+
if (exp_connect_cancelset(exp)) {
/* Estimate the amount of available space in the request. */
- avail = ldlm_req_handles_avail(exp, size, bufcount,
- bufoff, canceloff);
+ bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
+ avail = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
+ bufcount, bufcount - 1, canceloff);
flags = ns_connect_lru_resize(ns) ?
LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
to_free = !ns_connect_lru_resize(ns) &&
pack = count;
else
pack = avail;
- size[bufoff] = ldlm_request_bufsize(pack, opc);
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
+ ldlm_request_bufsize(count, opc));
}
- req = ptlrpc_prep_req(class_exp2cliimp(exp), version,
- opc, bufcount, size, NULL);
- if (exp_connect_cancelset(exp) && req) {
+
+ rc = ptlrpc_request_pack(req, version, opc);
+ if (rc) {
+ ldlm_lock_list_put(cancels, l_bl_ast, count);
+ RETURN(rc);
+ }
+
+ if (exp_connect_cancelset(exp)) {
if (canceloff) {
- dlm = lustre_msg_buf(req->rq_reqmsg, bufoff,
- sizeof(*dlm));
+ dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
+ LASSERT(dlm);
/* Skip first lock handler in ldlm_request_pack(),
* this method will increment @lock_count according
* to the lock handle amount actually written to
dlm->lock_count = canceloff;
}
/* Pack into the request @pack lock handles. */
- ldlm_cli_cancel_list(cancels, pack, req, bufoff, 0);
+ ldlm_cli_cancel_list(cancels, pack, req, 0);
/* Prepare and send separate cancel rpc for others. */
- ldlm_cli_cancel_list(cancels, count - pack, NULL, 0, 0);
+ ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
} else {
ldlm_lock_list_put(cancels, l_bl_ast, count);
}
- RETURN(req);
+ RETURN(0);
}
-struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
- int bufcount, int *size,
- struct list_head *cancels,
- int count)
+int ldlm_prep_enqueue_req(struct obd_export *exp,
+ struct ptlrpc_request *req,
+ struct list_head *cancels,
+ int count)
{
- return ldlm_prep_elc_req(exp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
- bufcount, size, DLM_LOCKREQ_OFF,
+ return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
}
struct lustre_handle *lockh, int async)
{
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
- struct ldlm_lock *lock;
- struct ldlm_request *body;
- struct ldlm_reply *reply;
- int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
- [DLM_LOCKREQ_OFF] = sizeof(*body),
- [DLM_REPLY_REC_OFF] = lvb_len };
- int is_replay = *flags & LDLM_FL_REPLAY;
- int req_passed_in = 1, rc, err;
+ struct ldlm_lock *lock;
+ struct ldlm_request *body;
+ int is_replay = *flags & LDLM_FL_REPLAY;
+ int req_passed_in = 1;
+ int rc, err;
struct ptlrpc_request *req;
ENTRY;
/* lock not sent to server yet */
if (reqp == NULL || *reqp == NULL) {
- req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
+ req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
+ &RQF_LDLM_ENQUEUE,
+ LUSTRE_DLM_VERSION,
+ LDLM_ENQUEUE);
if (req == NULL) {
failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
LDLM_LOCK_PUT(lock);
if (reqp)
*reqp = req;
} else {
+ int len;
+
req = *reqp;
- LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) >=
- sizeof(*body), "buflen[%d] = %d, not "LPSZ"\n",
- DLM_LOCKREQ_OFF,
- lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF),
- sizeof(*body));
+ len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ,
+ RCL_CLIENT);
+ LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n",
+ DLM_LOCKREQ_OFF, len, sizeof(*body));
}
lock->l_conn_export = exp;
lock->l_blocking_ast = einfo->ei_cb_bl;
/* Dump lock data into the request buffer */
- body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+ body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
ldlm_lock2desc(lock, &body->lock_desc);
body->lock_flags = *flags;
body->lock_handle[0] = *lockh;
/* Continue as normal. */
if (!req_passed_in) {
- size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
- ptlrpc_req_set_repsize(req, 2 + (lvb_len > 0), size);
+ if (lvb_len > 0) {
+ req_capsule_extend(&req->rq_pill,
+ &RQF_LDLM_ENQUEUE_LVB);
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
+ RCL_SERVER, lvb_len);
+ }
+ ptlrpc_request_set_replen(req);
}
/*
accounting */
int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
{
- struct ldlm_request *body;
- struct ldlm_reply *reply;
- struct ldlm_lock *lock;
- struct ldlm_resource *res;
+ struct ldlm_request *body;
+ struct ldlm_reply *reply;
+ struct ldlm_lock *lock;
+ struct ldlm_resource *res;
struct ptlrpc_request *req;
- int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
- [DLM_LOCKREQ_OFF] = sizeof(*body) };
- int rc;
+ int rc;
ENTRY;
lock = ldlm_handle2lock(lockh);
LDLM_DEBUG(lock, "client-side convert");
- req = ptlrpc_prep_req(class_exp2cliimp(lock->l_conn_export),
- LUSTRE_DLM_VERSION, LDLM_CONVERT, 2, size, NULL);
- if (!req)
- GOTO(out, rc = -ENOMEM);
+ req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export),
+ &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION,
+ LDLM_CONVERT);
+ if (req == NULL) {
+ LDLM_LOCK_PUT(lock);
+ RETURN(-ENOMEM);
+ }
- body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+ body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
body->lock_handle[0] = lock->l_remote_handle;
body->lock_desc.l_req_mode = new_mode;
body->lock_flags = *flags;
- size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
- ptlrpc_req_set_repsize(req, 2, size);
+ ptlrpc_request_set_replen(req);
rc = ptlrpc_queue_wait(req);
if (rc != ELDLM_OK)
GOTO(out, rc);
- reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
- lustre_swab_ldlm_reply);
- if (reply == NULL) {
- CERROR ("Can't unpack ldlm_reply\n");
- GOTO (out, rc = -EPROTO);
- }
+ reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+ if (reply == NULL)
+ GOTO(out, rc = -EPROTO);
if (req->rq_status)
GOTO(out, rc = req->rq_status);
/* Pack @count locks in @head into the ldlm_request buffer
of the request @req. */
-static void ldlm_cancel_pack(struct ptlrpc_request *req, int off,
+static void ldlm_cancel_pack(struct ptlrpc_request *req,
struct list_head *head, int count)
{
struct ldlm_request *dlm;
int max, packed = 0;
ENTRY;
- dlm = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*dlm));
+ dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
LASSERT(dlm != NULL);
/* Check the room in the request buffer. */
- max = lustre_msg_buflen(req->rq_reqmsg, off) -
+ max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) -
sizeof(struct ldlm_request);
max /= sizeof(struct lustre_handle);
max += LDLM_LOCKREQ_HANDLES;
int count, int flags)
{
struct ptlrpc_request *req = NULL;
- struct ldlm_request *body;
- int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
- [DLM_LOCKREQ_OFF] = sizeof(*body) };
struct obd_import *imp;
int free, sent = 0;
int rc = 0;
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
RETURN(count);
- free = ldlm_req_handles_avail(exp, size, 2, DLM_LOCKREQ_OFF, 0);
- if (count > free)
- count = free;
-
- size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_CANCEL);
while (1) {
+ int bufcount;
+ struct req_capsule *pill;
imp = class_exp2cliimp(exp);
if (imp == NULL || imp->imp_invalid) {
CDEBUG(D_DLMTRACE,
RETURN(count);
}
- req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, 2,
- size, NULL);
- if (!req)
+ req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL);
+ if (req == NULL)
GOTO(out, rc = -ENOMEM);
+ pill = &req->rq_pill;
+ bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
+
+ free = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
+ bufcount, bufcount, 0);
+ if (count > free)
+ count = free;
+
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
+ ldlm_request_bufsize(count, LDLM_CANCEL));
+
+ rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL);
+ if (rc) {
+ ptlrpc_request_free(req);
+ GOTO(out, rc);
+ }
req->rq_no_resend = 1;
req->rq_no_delay = 1;
req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
- body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
- sizeof(*body));
- ldlm_cancel_pack(req, DLM_LOCKREQ_OFF, cancels, count);
+ ldlm_cancel_pack(req, cancels, count);
- ptlrpc_req_set_repsize(req, 1, NULL);
+ ptlrpc_request_set_replen(req);
if (flags & LDLM_FL_ASYNC) {
ptlrpcd_add_req(req);
sent = count;
count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
LDLM_FL_BL_AST, flags);
}
- ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+ ldlm_cli_cancel_list(&cancels, count, NULL, 0);
RETURN(0);
}
}
if (bl_ast > 0) {
count -= bl_ast;
- ldlm_cli_cancel_list(&head, bl_ast, NULL, 0, 0);
+ ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
}
RETURN(count);
/* If an error occurred in ASYNC mode, or
* this is SYNC mode, cancel the list. */
- ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+ ldlm_cli_cancel_list(&cancels, count, NULL, 0);
RETURN(count);
}
* buffer of the request.
* Destroy @cancels at the end. */
int ldlm_cli_cancel_list(struct list_head *cancels, int count,
- struct ptlrpc_request *req, int off, int flags)
+ struct ptlrpc_request *req, int flags)
{
struct ldlm_lock *lock;
int res = 0;
if (exp_connect_cancelset(lock->l_conn_export)) {
res = count;
if (req)
- ldlm_cancel_pack(req, off, cancels, count);
+ ldlm_cancel_pack(req, cancels, count);
else
res = ldlm_cli_cancel_req(lock->l_conn_export,
cancels, count,
count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
0, flags, opaque);
- rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0, flags);
+ rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
if (rc != ELDLM_OK)
CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
static int replay_lock_interpret(struct ptlrpc_request *req,
struct ldlm_async_args *aa, int rc)
{
- struct ldlm_lock *lock;
+ struct ldlm_lock *lock;
struct ldlm_reply *reply;
ENTRY;
GOTO(out, rc);
- reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
- lustre_swab_ldlm_reply);
- if (reply == NULL) {
- CERROR("Can't unpack ldlm_reply\n");
- GOTO (out, rc = -EPROTO);
- }
+ reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+ if (reply == NULL)
+ GOTO(out, rc = -EPROTO);
lock = ldlm_handle2lock(&aa->lock_handle);
if (!lock) {
static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
{
struct ptlrpc_request *req;
- struct ldlm_request *body;
- struct ldlm_reply *reply;
struct ldlm_async_args *aa;
- int buffers = 2;
- int size[3] = { sizeof(struct ptlrpc_body) };
+ struct ldlm_request *body;
int flags;
ENTRY;
else
flags = LDLM_FL_REPLAY;
- size[DLM_LOCKREQ_OFF] = sizeof(*body);
- req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE, 2, size,
- NULL);
- if (!req)
+ req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
+ LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
+ if (req == NULL)
RETURN(-ENOMEM);
/* We're part of recovery, so don't wait for it. */
req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
- body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+ body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
ldlm_lock2desc(lock, &body->lock_desc);
body->lock_flags = flags;
ldlm_lock2handle(lock, &body->lock_handle[0]);
- size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
if (lock->l_lvb_len != 0) {
- buffers = 3;
- size[DLM_REPLY_REC_OFF] = lock->l_lvb_len;
+ req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+ lock->l_lvb_len);
}
- ptlrpc_req_set_repsize(req, buffers, size);
+ ptlrpc_request_set_replen(req);
/* notify the server we've replayed all requests.
* also, we mark the request to be put on a dedicated
* queue to be processed after all request replays.