Whamcloud - gitweb
b=14149
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index 852cea8..7110846 100644 (file)
@@ -346,19 +346,19 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
                 if (rc == ELDLM_LOCK_ABORTED) {
                         /* Before we return, swab the reply */
-                        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF,
-                                                   sizeof(*reply),
-                                                   lustre_swab_ldlm_reply);
-                        if (reply == NULL) {
-                                CERROR("Can't unpack ldlm_reply\n");
+                        reply = req_capsule_server_get(&req->rq_pill,
+                                                       &RMF_DLM_REP);
+                        if (reply == NULL)
                                 rc = -EPROTO;
-                        }
                         if (lvb_len) {
-                                void *tmplvb;
-                                tmplvb = lustre_swab_repbuf(req,
-                                                            DLM_REPLY_REC_OFF,
-                                                            lvb_len,
-                                                            lvb_swabber);
+                                struct ost_lvb *tmplvb;
+
+                                req_capsule_set_size(&req->rq_pill,
+                                                     &RMF_DLM_LVB, RCL_SERVER,
+                                                     lvb_len);
+                            tmplvb = req_capsule_server_swab_get(&req->rq_pill,
+                                                                 &RMF_DLM_LVB,
+                                                                 lvb_swabber);
                                 if (tmplvb == NULL)
                                         GOTO(cleanup, rc = -EPROTO);
                                 if (lvb != NULL)
@@ -368,12 +368,9 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                 GOTO(cleanup, rc);
         }
 
-        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
-                                   lustre_swab_ldlm_reply);
-        if (reply == NULL) {
-                CERROR("Can't unpack ldlm_reply\n");
+        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+        if (reply == NULL)
                 GOTO(cleanup, rc = -EPROTO);
-        }
 
         /* lock enqueued on the server */
         cleanup_phase = 0;
@@ -447,8 +444,12 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
          * clobber the LVB with an older one. */
         if (lvb_len && (lock->l_req_mode != lock->l_granted_mode)) {
                 void *tmplvb;
-                tmplvb = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, lvb_len,
-                                            lvb_swabber);
+
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                                     lvb_len);
+                tmplvb = req_capsule_server_swab_get(&req->rq_pill,
+                                                     &RMF_DLM_LVB,
+                                                     lvb_swabber);
                 if (tmplvb == NULL)
                         GOTO(cleanup, rc = -EPROTO);
                 memcpy(lock->l_lvb_data, tmplvb, lvb_len);
@@ -511,24 +512,25 @@ static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
 
 /* Cancel lru locks and pack them into the enqueue request. Pack there the given
  * @count locks in @cancels. */
-struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
-                                         int opc, int bufcount, int *size,
-                                         int bufoff, int canceloff,
-                                         struct list_head *cancels, int count)
+int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
+                      int version, int opc, int canceloff,
+                      struct list_head *cancels, int count)
 {
-        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
-        int flags, avail, to_free, pack = 0;
-        struct ldlm_request *dlm = NULL;
-        struct ptlrpc_request *req;
-        CFS_LIST_HEAD(head);
+        struct ldlm_namespace   *ns = exp->exp_obd->obd_namespace;
+        struct req_capsule      *pill = &req->rq_pill;
+        struct ldlm_request     *dlm = NULL;
+        int flags, avail, to_free, bufcount, pack = 0;
+        int rc;
         ENTRY;
 
-        if (cancels == NULL)
-                cancels = &head;
+
+        LASSERT(cancels != NULL);
+
         if (exp_connect_cancelset(exp)) {
                 /* Estimate the amount of available space in the request. */
-                avail = ldlm_req_handles_avail(exp, size, bufcount,
-                                               bufoff, canceloff);
+                bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
+                avail = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
+                                               bufcount, bufcount - 1, canceloff);
                 flags = ns_connect_lru_resize(ns) ? 
                         LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
                 to_free = !ns_connect_lru_resize(ns) &&
@@ -544,14 +546,20 @@ struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
                         pack = count;
                 else
                         pack = avail;
-                size[bufoff] = ldlm_request_bufsize(pack, opc);
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
+                                     ldlm_request_bufsize(count, opc));
         }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), version,
-                              opc, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req) {
+
+        rc = ptlrpc_request_pack(req, version, opc);
+        if (rc) {
+                ldlm_lock_list_put(cancels, l_bl_ast, count);
+                RETURN(rc);
+        }
+
+        if (exp_connect_cancelset(exp)) {
                 if (canceloff) {
-                        dlm = lustre_msg_buf(req->rq_reqmsg, bufoff,
-                                             sizeof(*dlm));
+                        dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
+                        LASSERT(dlm);
                         /* Skip first lock handler in ldlm_request_pack(),
                          * this method will incrment @lock_count according
                          * to the lock handle amount actually written to
@@ -559,22 +567,21 @@ struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
                         dlm->lock_count = canceloff;
                 }
                 /* Pack into the request @pack lock handles. */
-                ldlm_cli_cancel_list(cancels, pack, req, bufoff, 0);
+                ldlm_cli_cancel_list(cancels, pack, req, 0);
                 /* Prepare and send separate cancel rpc for others. */
-                ldlm_cli_cancel_list(cancels, count - pack, NULL, 0, 0);
+                ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
         } else {
                 ldlm_lock_list_put(cancels, l_bl_ast, count);
         }
-        RETURN(req);
+        RETURN(0);
 }
 
-struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
-                                             int bufcount, int *size,
-                                             struct list_head *cancels,
-                                             int count)
+int ldlm_prep_enqueue_req(struct obd_export *exp,
+                          struct ptlrpc_request *req,
+                          struct list_head *cancels,
+                          int count)
 {
-        return ldlm_prep_elc_req(exp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
-                                 bufcount, size, DLM_LOCKREQ_OFF,
+        return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
                                  LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
 }
 
@@ -592,14 +599,11 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                      struct lustre_handle *lockh, int async)
 {
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
-        struct ldlm_lock *lock;
-        struct ldlm_request *body;
-        struct ldlm_reply *reply;
-        int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                        [DLM_LOCKREQ_OFF]     = sizeof(*body),
-                        [DLM_REPLY_REC_OFF]   = lvb_len };
-        int is_replay = *flags & LDLM_FL_REPLAY;
-        int req_passed_in = 1, rc, err;
+        struct ldlm_lock      *lock;
+        struct ldlm_request   *body;
+        int                    is_replay = *flags & LDLM_FL_REPLAY;
+        int                    req_passed_in = 1;
+        int                    rc, err;
         struct ptlrpc_request *req;
         ENTRY;
 
@@ -646,7 +650,10 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
         /* lock not sent to server yet */
 
         if (reqp == NULL || *reqp == NULL) {
-                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
+                req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
+                                                &RQF_LDLM_ENQUEUE,
+                                                LUSTRE_DLM_VERSION,
+                                                LDLM_ENQUEUE);
                 if (req == NULL) {
                         failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
                         LDLM_LOCK_PUT(lock);
@@ -656,12 +663,13 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                 if (reqp)
                         *reqp = req;
         } else {
+                int len;
+
                 req = *reqp;
-                LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) >=
-                         sizeof(*body), "buflen[%d] = %d, not "LPSZ"\n",
-                         DLM_LOCKREQ_OFF,
-                         lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF),
-                         sizeof(*body));
+                len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ,
+                                           RCL_CLIENT);
+                LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n",
+                         DLM_LOCKREQ_OFF, len, sizeof(*body));
         }
 
         lock->l_conn_export = exp;
@@ -669,15 +677,20 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
         lock->l_blocking_ast = einfo->ei_cb_bl;
 
         /* Dump lock data into the request buffer */
-        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         ldlm_lock2desc(lock, &body->lock_desc);
         body->lock_flags = *flags;
         body->lock_handle[0] = *lockh;
 
         /* Continue as normal. */
         if (!req_passed_in) {
-                size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
-                ptlrpc_req_set_repsize(req, 2 + (lvb_len > 0), size);
+                if (lvb_len > 0) {
+                        req_capsule_extend(&req->rq_pill,
+                                           &RQF_LDLM_ENQUEUE_LVB);
+                        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
+                                             RCL_SERVER, lvb_len);
+                }
+                ptlrpc_request_set_replen(req);
         }
 
         /*
@@ -745,14 +758,12 @@ static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
    accounting */
 int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
 {
-        struct ldlm_request *body;
-        struct ldlm_reply *reply;
-        struct ldlm_lock *lock;
-        struct ldlm_resource *res;
+        struct ldlm_request   *body;
+        struct ldlm_reply     *reply;
+        struct ldlm_lock      *lock;
+        struct ldlm_resource  *res;
         struct ptlrpc_request *req;
-        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                        [DLM_LOCKREQ_OFF]     = sizeof(*body) };
-        int rc;
+        int                    rc;
         ENTRY;
 
         lock = ldlm_handle2lock(lockh);
@@ -767,30 +778,29 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
 
         LDLM_DEBUG(lock, "client-side convert");
 
-        req = ptlrpc_prep_req(class_exp2cliimp(lock->l_conn_export),
-                              LUSTRE_DLM_VERSION, LDLM_CONVERT, 2, size, NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
+        req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export),
+                                        &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION,
+                                        LDLM_CONVERT);
+        if (req == NULL) {
+                LDLM_LOCK_PUT(lock);
+                RETURN(-ENOMEM);
+        }
 
-        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         body->lock_handle[0] = lock->l_remote_handle;
 
         body->lock_desc.l_req_mode = new_mode;
         body->lock_flags = *flags;
 
-        size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
-        ptlrpc_req_set_repsize(req, 2, size);
 
+        ptlrpc_request_set_replen(req);
         rc = ptlrpc_queue_wait(req);
         if (rc != ELDLM_OK)
                 GOTO(out, rc);
 
-        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
-                                   lustre_swab_ldlm_reply);
-        if (reply == NULL) {
-                CERROR ("Can't unpack ldlm_reply\n");
-                GOTO (out, rc = -EPROTO);
-        }
+        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+        if (reply == NULL)
+                GOTO(out, rc = -EPROTO);
 
         if (req->rq_status)
                 GOTO(out, rc = req->rq_status);
@@ -862,7 +872,7 @@ static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
 
 /* Pack @count locks in @head into ldlm_request buffer at the offset @off,
    of the request @req. */
-static void ldlm_cancel_pack(struct ptlrpc_request *req, int off,
+static void ldlm_cancel_pack(struct ptlrpc_request *req,
                              struct list_head *head, int count)
 {
         struct ldlm_request *dlm;
@@ -870,11 +880,11 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req, int off,
         int max, packed = 0;
         ENTRY;
 
-        dlm = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*dlm));
+        dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         LASSERT(dlm != NULL);
 
         /* Check the room in the request buffer. */
-        max = lustre_msg_buflen(req->rq_reqmsg, off) - 
+        max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) - 
                 sizeof(struct ldlm_request);
         max /= sizeof(struct lustre_handle);
         max += LDLM_LOCKREQ_HANDLES;
@@ -902,9 +912,6 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                         int count, int flags)
 {
         struct ptlrpc_request *req = NULL;
-        struct ldlm_request *body;
-        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                [DLM_LOCKREQ_OFF]     = sizeof(*body) };
         struct obd_import *imp;
         int free, sent = 0;
         int rc = 0;
@@ -916,12 +923,9 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
                 RETURN(count);
 
-        free = ldlm_req_handles_avail(exp, size, 2, DLM_LOCKREQ_OFF, 0);
-        if (count > free)
-                count = free;
-
-        size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_CANCEL);
         while (1) {
+                int bufcount;
+                struct req_capsule *pill; 
                 imp = class_exp2cliimp(exp);
                 if (imp == NULL || imp->imp_invalid) {
                         CDEBUG(D_DLMTRACE,
@@ -929,11 +933,26 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                         RETURN(count);
                 }
 
-                req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, 2,
-                                      size, NULL);
-                if (!req)
+                req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL);
+                if (req == NULL)
                         GOTO(out, rc = -ENOMEM);
 
+                pill = &req->rq_pill;
+                bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
+
+                free = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
+                                              bufcount, bufcount, 0);
+                if (count > free)
+                        count = free;
+
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
+                                     ldlm_request_bufsize(count, LDLM_CANCEL));
+
+                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL);
+                if (rc) {
+                        ptlrpc_request_free(req);
+                        GOTO(out, rc);
+                }
                 req->rq_no_resend = 1;
                 req->rq_no_delay = 1;
 
@@ -941,11 +960,9 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                 req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
                 req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
 
-                body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
-                                      sizeof(*body));
-                ldlm_cancel_pack(req, DLM_LOCKREQ_OFF, cancels, count);
+                ldlm_cancel_pack(req, cancels, count);
 
-                ptlrpc_req_set_repsize(req, 1, NULL);
+                ptlrpc_request_set_replen(req);
                 if (flags & LDLM_FL_ASYNC) {
                         ptlrpcd_add_req(req);
                         sent = count;
@@ -1076,7 +1093,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                 count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
                                                LDLM_FL_BL_AST, flags);
         }
-        ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+        ldlm_cli_cancel_list(&cancels, count, NULL, 0);
         RETURN(0);
 }
 
@@ -1116,7 +1133,7 @@ static int ldlm_cancel_list(struct list_head *cancels, int count, int flags)
         }
         if (bl_ast > 0) {
                 count -= bl_ast;
-                ldlm_cli_cancel_list(&head, bl_ast, NULL, 0, 0);
+                ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
         }
 
         RETURN(count);
@@ -1384,7 +1401,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync,
 
         /* If an error occured in ASYNC mode, or
          * this is SYNC mode, cancel the list. */
-        ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+        ldlm_cli_cancel_list(&cancels, count, NULL, 0);
         RETURN(count);
 }
 
@@ -1455,7 +1472,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
  * buffer at the offset @off.
  * Destroy @cancels at the end. */
 int ldlm_cli_cancel_list(struct list_head *cancels, int count,
-                         struct ptlrpc_request *req, int off, int flags)
+                         struct ptlrpc_request *req, int flags)
 {
         struct ldlm_lock *lock;
         int res = 0;
@@ -1477,7 +1494,7 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count,
                 if (exp_connect_cancelset(lock->l_conn_export)) {
                         res = count;
                         if (req)
-                                ldlm_cancel_pack(req, off, cancels, count);
+                                ldlm_cancel_pack(req, cancels, count);
                         else
                                 res = ldlm_cli_cancel_req(lock->l_conn_export,
                                                           cancels, count,
@@ -1519,7 +1536,7 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
 
         count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
                                            0, flags, opaque);
-        rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0, flags);
+        rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
         if (rc != ELDLM_OK)
                 CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
 
@@ -1773,7 +1790,7 @@ static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
 static int replay_lock_interpret(struct ptlrpc_request *req,
                                  struct ldlm_async_args *aa, int rc)
 {
-        struct ldlm_lock *lock;
+        struct ldlm_lock  *lock;
         struct ldlm_reply *reply;
 
         ENTRY;
@@ -1782,12 +1799,9 @@ static int replay_lock_interpret(struct ptlrpc_request *req,
                 GOTO(out, rc);
 
 
-        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
-                                   lustre_swab_ldlm_reply);
-        if (reply == NULL) {
-                CERROR("Can't unpack ldlm_reply\n");
-                GOTO (out, rc = -EPROTO);
-        }
+        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+        if (reply == NULL)
+                GOTO(out, rc = -EPROTO);
 
         lock = ldlm_handle2lock(&aa->lock_handle);
         if (!lock) {
@@ -1814,11 +1828,8 @@ out:
 static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 {
         struct ptlrpc_request *req;
-        struct ldlm_request *body;
-        struct ldlm_reply *reply;
         struct ldlm_async_args *aa;
-        int buffers = 2;
-        int size[3] = { sizeof(struct ptlrpc_body) };
+        struct ldlm_request   *body;
         int flags;
         ENTRY;
 
@@ -1860,26 +1871,25 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
         else
                 flags = LDLM_FL_REPLAY;
 
-        size[DLM_LOCKREQ_OFF] = sizeof(*body);
-        req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE, 2, size,
-                              NULL);
-        if (!req)
+        req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
+                                        LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
+        if (req == NULL)
                 RETURN(-ENOMEM);
 
         /* We're part of recovery, so don't wait for it. */
         req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
 
-        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         ldlm_lock2desc(lock, &body->lock_desc);
         body->lock_flags = flags;
 
         ldlm_lock2handle(lock, &body->lock_handle[0]);
-        size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
         if (lock->l_lvb_len != 0) {
-                buffers = 3;
-                size[DLM_REPLY_REC_OFF] = lock->l_lvb_len;
+                req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                                     lock->l_lvb_len);
         }
-        ptlrpc_req_set_repsize(req, buffers, size);
+        ptlrpc_request_set_replen(req);
         /* notify the server we've replayed all requests.
          * also, we mark the request to be put on a dedicated
          * queue to be processed after all request replayes.