b=13766
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index 08281b0..9ccf69c 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -87,6 +87,20 @@ int ldlm_expired_completion_wait(void *data)
         RETURN(0);
 }
 
+static int is_granted_or_cancelled(struct ldlm_lock *lock)
+{
+        int ret = 0;
+
+        lock_res_and_lock(lock);
+        if (((lock->l_req_mode == lock->l_granted_mode) &&
+             !(lock->l_flags & LDLM_FL_CP_REQD)) ||
+            (lock->l_flags & LDLM_FL_FAILED))
+                ret = 1;
+        unlock_res_and_lock(lock);
+
+        return ret;
+}
+
 int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
         /* XXX ALLOCATE - 160 bytes */
@@ -139,9 +153,7 @@ noreproc:
         }
 
         /* Go to sleep until the lock is granted or cancelled. */
-        rc = l_wait_event(lock->l_waitq,
-                          ((lock->l_req_mode == lock->l_granted_mode) ||
-                           (lock->l_flags & LDLM_FL_FAILED)), &lwi);
+        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
 
         if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) {
                 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
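
As an illustrative aside (not part of the patch itself): the old wait condition treated the lock as done as soon as l_req_mode == l_granted_mode, while the new is_granted_or_cancelled() helper additionally requires LDLM_FL_CP_REQD to be clear and samples the flags under the resource lock. A minimal user-space sketch of the predicate change, using made-up flag values and a simplified lock struct rather than the kernel definitions:

#include <stdio.h>

/* Simplified stand-ins for the real ldlm flags and lock (assumed values). */
#define LDLM_FL_CP_REQD  0x01
#define LDLM_FL_FAILED   0x02

struct toy_lock {
        int req_mode;
        int granted_mode;
        int flags;
};

/* Old predicate: granted or failed, ignoring a pending completion AST. */
static int old_wait_cond(const struct toy_lock *l)
{
        return (l->req_mode == l->granted_mode) || (l->flags & LDLM_FL_FAILED);
}

/* New predicate: additionally require that LDLM_FL_CP_REQD is clear. */
static int is_granted_or_cancelled(const struct toy_lock *l)
{
        return ((l->req_mode == l->granted_mode) &&
                !(l->flags & LDLM_FL_CP_REQD)) ||
               (l->flags & LDLM_FL_FAILED);
}

int main(void)
{
        /* Granted mode already set, but a completion AST is still pending. */
        struct toy_lock l = { .req_mode = 1, .granted_mode = 1,
                              .flags = LDLM_FL_CP_REQD };

        printf("old condition wakes waiter: %d\n", old_wait_cond(&l));
        printf("new condition wakes waiter: %d\n", is_granted_or_cancelled(&l));
        return 0;
}
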
@@ -245,7 +257,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         ENTRY;
 
         LASSERT(!(*flags & LDLM_FL_REPLAY));
-        if (unlikely(ns->ns_client)) {
+        if (unlikely(ns_is_client(ns))) {
                 CERROR("Trying to enqueue local lock in a shadow namespace\n");
                 LBUG();
         }
@@ -334,19 +346,19 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
                 if (rc == ELDLM_LOCK_ABORTED) {
                         /* Before we return, swab the reply */
-                        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF,
-                                                   sizeof(*reply),
-                                                   lustre_swab_ldlm_reply);
-                        if (reply == NULL) {
-                                CERROR("Can't unpack ldlm_reply\n");
+                        reply = req_capsule_server_get(&req->rq_pill,
+                                                       &RMF_DLM_REP);
+                        if (reply == NULL)
                                 rc = -EPROTO;
-                        }
                         if (lvb_len) {
-                                void *tmplvb;
-                                tmplvb = lustre_swab_repbuf(req,
-                                                            DLM_REPLY_REC_OFF,
-                                                            lvb_len,
-                                                            lvb_swabber);
+                                struct ost_lvb *tmplvb;
+
+                                req_capsule_set_size(&req->rq_pill,
+                                                     &RMF_DLM_LVB, RCL_SERVER,
+                                                     lvb_len);
+                            tmplvb = req_capsule_server_swab_get(&req->rq_pill,
+                                                                 &RMF_DLM_LVB,
+                                                                 lvb_swabber);
                                 if (tmplvb == NULL)
                                         GOTO(cleanup, rc = -EPROTO);
                                 if (lvb != NULL)
@@ -356,12 +368,9 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                 GOTO(cleanup, rc);
         }
 
-        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
-                                   lustre_swab_ldlm_reply);
-        if (reply == NULL) {
-                CERROR("Can't unpack ldlm_reply\n");
+        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+        if (reply == NULL)
                 GOTO(cleanup, rc = -EPROTO);
-        }
 
         /* lock enqueued on the server */
         cleanup_phase = 0;
@@ -426,7 +435,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
              * bug 7311). */
             (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
                 lock_res_and_lock(lock);
-                lock->l_flags |= LDLM_FL_CBPENDING;
+                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                 unlock_res_and_lock(lock);
                 LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
         }
@@ -435,8 +444,12 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
          * clobber the LVB with an older one. */
         if (lvb_len && (lock->l_req_mode != lock->l_granted_mode)) {
                 void *tmplvb;
-                tmplvb = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, lvb_len,
-                                            lvb_swabber);
+
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                                     lvb_len);
+                tmplvb = req_capsule_server_swab_get(&req->rq_pill,
+                                                     &RMF_DLM_LVB,
+                                                     lvb_swabber);
                 if (tmplvb == NULL)
                         GOTO(cleanup, rc = -EPROTO);
                 memcpy(lock->l_lvb_data, tmplvb, lvb_len);
@@ -474,17 +487,18 @@ cleanup:
  * a single page on the send/receive side. XXX: 512 should be changed
  * to more adequate value. */
 static inline int ldlm_req_handles_avail(struct obd_export *exp,
-                                         int *size, int bufcount, int off)
+                                         int *size, int bufcount,
+                                         int bufoff, int off)
 {
         int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);
-        int old_size = size[DLM_LOCKREQ_OFF];
+        int old_size = size[bufoff];
 
-        size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request);
+        size[bufoff] = sizeof(struct ldlm_request);
         avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
                                  bufcount, size);
         avail /= sizeof(struct lustre_handle);
         avail += LDLM_LOCKREQ_HANDLES - off;
-        size[DLM_LOCKREQ_OFF] = old_size;
+        size[bufoff] = old_size;
 
         return avail;
 }
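
As an illustrative aside (not part of the patch itself): ldlm_req_handles_avail() answers "how many lock handles fit into this request while staying within roughly one page". Below is a stand-alone re-creation of the arithmetic; the message overhead, handle size and LDLM_LOCKREQ_HANDLES are assumed example values, not the real lustre_msg_size() output:

#include <stdio.h>

enum {
        TOY_PAGE_SIZE       = 4096,
        TOY_MAXREQSIZE      = 5 * 1024,  /* assumed LDLM_MAXREQSIZE */
        TOY_HANDLE_SIZE     = 8,         /* assumed sizeof(struct lustre_handle) */
        TOY_LOCKREQ_HANDLES = 2,         /* assumed LDLM_LOCKREQ_HANDLES */
};

static int min_int(int a, int b) { return a < b ? a : b; }

/* msg_overhead stands in for lustre_msg_size(): bytes taken by the ptlrpc
 * message with a minimal ldlm_request buffer; off is the number of handle
 * slots already reserved for other purposes (e.g. the enqueued lock). */
static int handles_avail(int msg_overhead, int off)
{
        int avail = min_int(TOY_MAXREQSIZE, TOY_PAGE_SIZE - 512);

        avail -= msg_overhead;                 /* bytes left for handles   */
        avail /= TOY_HANDLE_SIZE;              /* ... measured in handles  */
        avail += TOY_LOCKREQ_HANDLES - off;    /* handles already embedded */
        return avail;
}

int main(void)
{
        printf("pure cancel request: %d handles\n", handles_avail(300, 0));
        printf("enqueue with piggy-backed cancels: %d handles\n",
               handles_avail(300, 1));
        return 0;
}
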
@@ -493,53 +507,82 @@ static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
 {
         int size[2] = { sizeof(struct ptlrpc_body),
                         sizeof(struct ldlm_request) };
-        return ldlm_req_handles_avail(exp, size, 2, 0);
+        return ldlm_req_handles_avail(exp, size, 2, DLM_LOCKREQ_OFF, 0);
 }
 
 /* Cancel lru locks and pack them into the enqueue request. Pack there the given
  * @count locks in @cancels. */
-struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
-                                             int bufcount, int *size,
-                                             struct list_head *cancels,
-                                             int count)
+int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
+                      int version, int opc, int canceloff,
+                      struct list_head *cancels, int count)
 {
-        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
-        struct ldlm_request *dlm = NULL;
-        struct ptlrpc_request *req;
-        CFS_LIST_HEAD(head);
+        struct ldlm_namespace   *ns = exp->exp_obd->obd_namespace;
+        struct req_capsule      *pill = &req->rq_pill;
+        struct ldlm_request     *dlm = NULL;
+        int flags, avail, to_free, bufcount, pack = 0;
+        int rc;
         ENTRY;
-        
-        if (cancels == NULL)
-                cancels = &head;
+
+
+        LASSERT(cancels != NULL);
+
         if (exp_connect_cancelset(exp)) {
                 /* Estimate the amount of available space in the request. */
-                int avail = ldlm_req_handles_avail(exp, size, bufcount,
-                                                   LDLM_ENQUEUE_CANCEL_OFF);
-                LASSERT(avail >= count);
-                
+                bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
+                avail = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
+                                               bufcount, bufcount - 1, canceloff);
+                flags = ns_connect_lru_resize(ns) ? 
+                        LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
+                to_free = !ns_connect_lru_resize(ns) &&
+                          opc == LDLM_ENQUEUE ? 1 : 0;
+
                 /* Cancel lru locks here _only_ if the server supports 
                  * EARLY_CANCEL. Otherwise we have to send extra CANCEL
-                 * rpc right on enqueue, what will make it slower, vs. 
-                 * asynchronous rpc in blocking thread. */
-                count += ldlm_cancel_lru_local(ns, cancels, 1, avail - count,
-                                               LDLM_CANCEL_AGED);
-                size[DLM_LOCKREQ_OFF] =
-                        ldlm_request_bufsize(count, LDLM_ENQUEUE);
-        }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
-                              LDLM_ENQUEUE, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req) {
-                dlm = lustre_msg_buf(req->rq_reqmsg,
-                                     DLM_LOCKREQ_OFF, sizeof(*dlm));
-                /* Skip first lock handler in ldlm_request_pack(), this method
-                 * will incrment @lock_count according to the lock handle amount
-                 * actually written to the buffer. */
-                dlm->lock_count = LDLM_ENQUEUE_CANCEL_OFF;
-                ldlm_cli_cancel_list(cancels, count, req, DLM_LOCKREQ_OFF, 0);
+                 * rpc, which will make us slower. */
+                if (avail > count)
+                        count += ldlm_cancel_lru_local(ns, cancels, to_free,
+                                                       avail - count, 0, flags);
+                if (avail > count)
+                        pack = count;
+                else
+                        pack = avail;
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
+                                     ldlm_request_bufsize(count, opc));
+        }
+
+        rc = ptlrpc_request_pack(req, version, opc);
+        if (rc) {
+                ldlm_lock_list_put(cancels, l_bl_ast, count);
+                RETURN(rc);
+        }
+
+        if (exp_connect_cancelset(exp)) {
+                if (canceloff) {
+                        dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
+                        LASSERT(dlm);
+                        /* Skip the first lock handle in ldlm_request_pack();
+                         * this method will increment @lock_count according
+                         * to the number of lock handles actually written
+                         * to the buffer. */
+                        dlm->lock_count = canceloff;
+                }
+                /* Pack into the request @pack lock handles. */
+                ldlm_cli_cancel_list(cancels, pack, req, 0);
+                /* Prepare and send separate cancel rpc for others. */
+                ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
         } else {
                 ldlm_lock_list_put(cancels, l_bl_ast, count);
         }
-        RETURN(req);
+        RETURN(0);
+}
+
+int ldlm_prep_enqueue_req(struct obd_export *exp,
+                          struct ptlrpc_request *req,
+                          struct list_head *cancels,
+                          int count)
+{
+        return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
+                                 LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
 }
 
 /* If a request has some specific initialisation it is passed in @reqp,
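
As an illustrative aside (not part of the patch itself): in ldlm_prep_elc_req() above, at most @avail cancel handles are packed into the enqueue request itself; whatever was collected beyond that is handed to the second ldlm_cli_cancel_list() call and sent as a separate cancel rpc. A toy sketch of that split, detached from the kernel API:

#include <stdio.h>

/* Decide how many collected cancels are piggy-backed on the enqueue request
 * (pack) and how many must go out in a separate cancel rpc (separate). */
static void split_cancels(int avail, int count, int *pack, int *separate)
{
        *pack = (avail > count) ? count : avail;
        *separate = count - *pack;
}

int main(void)
{
        int pack, separate;

        split_cancels(50, 10, &pack, &separate);   /* everything fits       */
        printf("pack=%d separate=%d\n", pack, separate);

        split_cancels(50, 80, &pack, &separate);   /* overflow -> extra rpc */
        printf("pack=%d separate=%d\n", pack, separate);
        return 0;
}
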
@@ -556,14 +599,11 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                      struct lustre_handle *lockh, int async)
 {
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
-        struct ldlm_lock *lock;
-        struct ldlm_request *body;
-        struct ldlm_reply *reply;
-        int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                        [DLM_LOCKREQ_OFF]     = sizeof(*body),
-                        [DLM_REPLY_REC_OFF]   = lvb_len };
-        int is_replay = *flags & LDLM_FL_REPLAY;
-        int req_passed_in = 1, rc, err;
+        struct ldlm_lock      *lock;
+        struct ldlm_request   *body;
+        int                    is_replay = *flags & LDLM_FL_REPLAY;
+        int                    req_passed_in = 1;
+        int                    rc, err;
         struct ptlrpc_request *req;
         ENTRY;
 
@@ -610,7 +650,10 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
         /* lock not sent to server yet */
 
         if (reqp == NULL || *reqp == NULL) {
-                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
+                req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
+                                                &RQF_LDLM_ENQUEUE,
+                                                LUSTRE_DLM_VERSION,
+                                                LDLM_ENQUEUE);
                 if (req == NULL) {
                         failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
                         LDLM_LOCK_PUT(lock);
@@ -620,12 +663,13 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                 if (reqp)
                         *reqp = req;
         } else {
+                int len;
+
                 req = *reqp;
-                LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) >=
-                         sizeof(*body), "buflen[%d] = %d, not "LPSZ"\n",
-                         DLM_LOCKREQ_OFF,
-                         lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF),
-                         sizeof(*body));
+                len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ,
+                                           RCL_CLIENT);
+                LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n",
+                         DLM_LOCKREQ_OFF, len, sizeof(*body));
         }
 
         lock->l_conn_export = exp;
@@ -633,15 +677,20 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
         lock->l_blocking_ast = einfo->ei_cb_bl;
 
         /* Dump lock data into the request buffer */
-        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         ldlm_lock2desc(lock, &body->lock_desc);
         body->lock_flags = *flags;
         body->lock_handle[0] = *lockh;
 
         /* Continue as normal. */
         if (!req_passed_in) {
-                size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
-                ptlrpc_req_set_repsize(req, 2 + (lvb_len > 0), size);
+                if (lvb_len > 0) {
+                        req_capsule_extend(&req->rq_pill,
+                                           &RQF_LDLM_ENQUEUE_LVB);
+                        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
+                                             RCL_SERVER, lvb_len);
+                }
+                ptlrpc_request_set_replen(req);
         }
 
         /*
@@ -680,12 +729,12 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 }
 
 static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
-                                  int *flags)
+                                  __u32 *flags)
 {
         struct ldlm_resource *res;
         int rc;
         ENTRY;
-        if (lock->l_resource->lr_namespace->ns_client) {
+        if (ns_is_client(lock->l_resource->lr_namespace)) {
                 CERROR("Trying to cancel local lock\n");
                 LBUG();
         }
@@ -707,16 +756,14 @@ static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
  * conversion of locks which are on the waiting or converting queue */
 /* Caller of this code is supposed to take care of lock readers/writers
    accounting */
-int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
+int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
 {
-        struct ldlm_request *body;
-        struct ldlm_reply *reply;
-        struct ldlm_lock *lock;
-        struct ldlm_resource *res;
+        struct ldlm_request   *body;
+        struct ldlm_reply     *reply;
+        struct ldlm_lock      *lock;
+        struct ldlm_resource  *res;
         struct ptlrpc_request *req;
-        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                        [DLM_LOCKREQ_OFF]     = sizeof(*body) };
-        int rc;
+        int                    rc;
         ENTRY;
 
         lock = ldlm_handle2lock(lockh);
@@ -731,30 +778,29 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
 
         LDLM_DEBUG(lock, "client-side convert");
 
-        req = ptlrpc_prep_req(class_exp2cliimp(lock->l_conn_export),
-                              LUSTRE_DLM_VERSION, LDLM_CONVERT, 2, size, NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
+        req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export),
+                                        &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION,
+                                        LDLM_CONVERT);
+        if (req == NULL) {
+                LDLM_LOCK_PUT(lock);
+                RETURN(-ENOMEM);
+        }
 
-        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         body->lock_handle[0] = lock->l_remote_handle;
 
         body->lock_desc.l_req_mode = new_mode;
         body->lock_flags = *flags;
 
-        size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
-        ptlrpc_req_set_repsize(req, 2, size);
 
+        ptlrpc_request_set_replen(req);
         rc = ptlrpc_queue_wait(req);
         if (rc != ELDLM_OK)
                 GOTO(out, rc);
 
-        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
-                                   lustre_swab_ldlm_reply);
-        if (reply == NULL) {
-                CERROR ("Can't unpack ldlm_reply\n");
-                GOTO (out, rc = -EPROTO);
-        }
+        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+        if (reply == NULL)
+                GOTO(out, rc = -EPROTO);
 
         if (req->rq_status)
                 GOTO(out, rc = req->rq_status);
@@ -811,7 +857,7 @@ static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
                 }
                 ldlm_lock_cancel(lock);
         } else {
-                if (lock->l_resource->lr_namespace->ns_client) {
+                if (ns_is_client(lock->l_resource->lr_namespace)) {
                         LDLM_ERROR(lock, "Trying to cancel local lock");
                         LBUG();
                 }
@@ -826,7 +872,7 @@ static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
 
 /* Pack @count locks in @head into ldlm_request buffer at the offset @off,
    of the request @req. */
-static void ldlm_cancel_pack(struct ptlrpc_request *req, int off,
+static void ldlm_cancel_pack(struct ptlrpc_request *req,
                              struct list_head *head, int count)
 {
         struct ldlm_request *dlm;
@@ -834,11 +880,11 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req, int off,
         int max, packed = 0;
         ENTRY;
 
-        dlm = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*dlm));
+        dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         LASSERT(dlm != NULL);
 
         /* Check the room in the request buffer. */
-        max = lustre_msg_buflen(req->rq_reqmsg, off) - 
+        max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) - 
                 sizeof(struct ldlm_request);
         max /= sizeof(struct lustre_handle);
         max += LDLM_LOCKREQ_HANDLES;
@@ -866,9 +912,6 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                         int count, int flags)
 {
         struct ptlrpc_request *req = NULL;
-        struct ldlm_request *body;
-        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                [DLM_LOCKREQ_OFF]     = sizeof(*body) };
         struct obd_import *imp;
         int free, sent = 0;
         int rc = 0;
@@ -880,24 +923,36 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
                 RETURN(count);
 
-        free = ldlm_req_handles_avail(exp, size, 2, 0);
-        if (count > free)
-                count = free;
-
-        size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_CANCEL);
         while (1) {
+                int bufcount;
+                struct req_capsule *pill; 
                 imp = class_exp2cliimp(exp);
                 if (imp == NULL || imp->imp_invalid) {
-                        CDEBUG(D_HA, "skipping cancel on invalid import %p\n",
-                               imp);
-                        break;
+                        CDEBUG(D_DLMTRACE,
+                               "skipping cancel on invalid import %p\n", imp);
+                        RETURN(count);
                 }
 
-                req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, 2,
-                                      size, NULL);
-                if (!req)
+                req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL);
+                if (req == NULL)
                         GOTO(out, rc = -ENOMEM);
 
+                pill = &req->rq_pill;
+                bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
+
+                free = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
+                                              bufcount, bufcount, 0);
+                if (count > free)
+                        count = free;
+
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
+                                     ldlm_request_bufsize(count, LDLM_CANCEL));
+
+                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL);
+                if (rc) {
+                        ptlrpc_request_free(req);
+                        GOTO(out, rc);
+                }
                 req->rq_no_resend = 1;
                 req->rq_no_delay = 1;
 
@@ -905,11 +960,9 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                 req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
                 req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
 
-                body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
-                                      sizeof(*body));
-                ldlm_cancel_pack(req, DLM_LOCKREQ_OFF, cancels, count);
+                ldlm_cancel_pack(req, cancels, count);
 
-                ptlrpc_req_set_repsize(req, 1, NULL);
+                ptlrpc_request_set_replen(req);
                 if (flags & LDLM_FL_ASYNC) {
                         ptlrpcd_add_req(req);
                         sent = count;
@@ -942,11 +995,72 @@ out:
         return sent ? sent : rc;
 }
 
+static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
+{
+        LASSERT(imp != NULL);
+        return &imp->imp_obd->obd_namespace->ns_pool;
+}
+
+int ldlm_cli_update_pool(struct ptlrpc_request *req)
+{
+        __u64 old_slv, new_slv;
+        struct ldlm_pool *pl;
+        __u32 new_limit;
+        ENTRY;
+    
+        if (!imp_connect_lru_resize(req->rq_import))
+                RETURN(0);
+
+        /* In some cases the RPC may contain SLV and limit zeroed out. This
+         * is the case when the server does not support the lru resize
+         * feature. This is also possible in some recovery cases when server
+         * side reqs have no ref to the obd export and thus access to the
+         * server side namespace is not possible. */
+        if (lustre_msg_get_slv(req->rq_repmsg) == 0 || 
+            lustre_msg_get_limit(req->rq_repmsg) == 0) {
+                DEBUG_REQ(D_HA, req, "zero SLV or Limit found "
+                          "(SLV: "LPU64", Limit: %u)", 
+                          lustre_msg_get_slv(req->rq_repmsg), 
+                          lustre_msg_get_limit(req->rq_repmsg));
+                RETURN(0);
+        }
+
+        new_limit = lustre_msg_get_limit(req->rq_repmsg);
+        new_slv = lustre_msg_get_slv(req->rq_repmsg);
+        pl = ldlm_imp2pl(req->rq_import);
+        
+        spin_lock(&pl->pl_lock);
+        old_slv = ldlm_pool_get_slv(pl);
+        ldlm_pool_set_slv(pl, new_slv);
+        ldlm_pool_set_limit(pl, new_limit);
+
+        /* Check if we need to wake up the pools thread for a fast SLV
+         * change. This is only done when the thread's period is noticeably
+         * long, like 10s or more. */
+#if defined(__KERNEL__) && (LDLM_POOLS_THREAD_PERIOD >= 10)
+        {
+                __u64 fast_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE;
+                do_div(fast_change, 100);
+
+                /* Wake up the pools thread only if SLV has changed more than
+                 * 50% since the last update. In this case we want to react
+                 * asap. Otherwise it makes no sense to wake up pools, as they
+                 * are re-calculated every LDLM_POOLS_THREAD_PERIOD anyway. */
+                if (old_slv > new_slv && old_slv - new_slv > fast_change)
+                        ldlm_pools_wakeup();
+        }
+#endif
+        spin_unlock(&pl->pl_lock);
+        RETURN(0);
+}
+EXPORT_SYMBOL(ldlm_cli_update_pool);
+
 int ldlm_cli_cancel(struct lustre_handle *lockh)
 {
+        int avail, flags, count = 1, rc = 0;
+        struct ldlm_namespace *ns;
         struct ldlm_lock *lock;
         CFS_LIST_HEAD(cancels);
-        int rc = 0;
         ENTRY;
 
         /* concurrent cancels on the same handle can happen */
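
As an illustrative aside (not part of the patch itself): the new ldlm_cli_update_pool() wakes the pools thread early only when the server-supplied SLV dropped sharply between replies. A stand-alone version of that threshold check; LDLM_POOLS_FAST_SLV_CHANGE is assumed here to be 50 (percent), matching the "50%" comment, and may not be the real constant:

#include <stdio.h>
#include <stdint.h>

#define TOY_FAST_SLV_CHANGE 50  /* assumed LDLM_POOLS_FAST_SLV_CHANGE, in % */

/* Return 1 when the SLV fell by more than the threshold, i.e. when the pools
 * thread should be woken instead of waiting for its next periodic run. */
static int slv_changed_fast(uint64_t old_slv, uint64_t new_slv)
{
        uint64_t fast_change = old_slv * TOY_FAST_SLV_CHANGE / 100;

        return old_slv > new_slv && old_slv - new_slv > fast_change;
}

int main(void)
{
        printf("%d\n", slv_changed_fast(1000, 900)); /* -10%: no wakeup  */
        printf("%d\n", slv_changed_fast(1000, 400)); /* -60%: wake pools */
        return 0;
}
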
@@ -955,26 +1069,217 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                 LDLM_DEBUG_NOLOCK("lock is already being destroyed\n");
                 RETURN(0);
         }
-        
+
         rc = ldlm_cli_cancel_local(lock);
+        if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY) {
+                LDLM_LOCK_PUT(lock);
+                RETURN(rc < 0 ? rc : 0);
+        }
+        /* Even if the lock is marked as LDLM_FL_BL_AST, this is an
+         * LDLM_CANCEL rpc which goes to the canceld portal, so we can cancel
+         * other lru locks here and send them all as one LDLM_CANCEL rpc. */
+        LASSERT(list_empty(&lock->l_bl_ast));
         list_add(&lock->l_bl_ast, &cancels);
-
-        if (rc == LDLM_FL_BL_AST) {
-                rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1, 0);
-        } else if (rc == LDLM_FL_CANCELING) {
-                int avail = ldlm_cancel_handles_avail(lock->l_conn_export);
-                int count = 1;
+        if (exp_connect_cancelset(lock->l_conn_export)) {
+                avail = ldlm_cancel_handles_avail(lock->l_conn_export);
                 LASSERT(avail > 0);
-                count += ldlm_cancel_lru_local(lock->l_resource->lr_namespace,
-                                               &cancels, 0, avail - 1,
-                                               LDLM_CANCEL_AGED);
-                ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+
+                ns = lock->l_resource->lr_namespace;
+                flags = ns_connect_lru_resize(ns) ?
+                        LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
+                count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
+                                               LDLM_FL_BL_AST, flags);
         }
-        if (rc != LDLM_FL_CANCELING)
-                LDLM_LOCK_PUT(lock);
-        RETURN(rc < 0 ? rc : 0);
+        ldlm_cli_cancel_list(&cancels, count, NULL, 0);
+        RETURN(0);
 }
 
+/* XXX until we have compound requests and can cut cancels from generic rpc,
+ * we need to send cancels with the LDLM_FL_BL_AST flag as a separate rpc */
+static int ldlm_cancel_list(struct list_head *cancels, int count, int flags)
+{
+        CFS_LIST_HEAD(head);
+        struct ldlm_lock *lock, *next;
+        int left = 0, bl_ast = 0, rc;
+
+        left = count;
+        list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
+                if (left-- == 0)
+                        break;
+
+                if (flags & LDLM_FL_LOCAL_ONLY) {
+                        rc = LDLM_FL_LOCAL_ONLY;
+                        ldlm_lock_cancel(lock);
+                } else {
+                        rc = ldlm_cli_cancel_local(lock);
+                }
+                if (!(flags & LDLM_FL_BL_AST) && (rc == LDLM_FL_BL_AST)) {
+                        LDLM_DEBUG(lock, "Cancel lock separately");
+                        list_del_init(&lock->l_bl_ast);
+                        list_add(&lock->l_bl_ast, &head);
+                        bl_ast++;
+                        continue;
+                }
+                if (rc == LDLM_FL_LOCAL_ONLY) {
+                        /* CANCEL RPC should not be sent to server. */
+                        list_del_init(&lock->l_bl_ast);
+                        LDLM_LOCK_PUT(lock);
+                        count--;
+                }
+
+        }
+        if (bl_ast > 0) {
+                count -= bl_ast;
+                ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
+        }
+
+        RETURN(count);
+}
+
+/* Return LDLM_POLICY_KEEP_LOCK to stop lru processing and keep the current
+ * lock cached. Return LDLM_POLICY_CANCEL_LOCK otherwise. */
+static ldlm_policy_res_t ldlm_cancel_shrink_policy(struct ldlm_namespace *ns,
+                                                   struct ldlm_lock *lock,
+                                                   int unused, int added, 
+                                                   int count)
+{
+        int lock_cost;
+        __u64 page_nr;
+
+        /* Stop lru processing when we have reached the passed @count or
+         * checked all locks in lru. */
+        if (count && added >= count)
+                return LDLM_POLICY_KEEP_LOCK;
+
+        if (lock->l_resource->lr_type == LDLM_EXTENT) {
+                struct ldlm_extent *l_extent;
+
+                /* For all extent locks cost is 1 + number of pages in
+                 * their extent. */
+                l_extent = &lock->l_policy_data.l_extent;
+                page_nr = (l_extent->end - l_extent->start);
+                do_div(page_nr, CFS_PAGE_SIZE);
+
+#ifdef __KERNEL__
+                /* XXX: In fact this is an evil hack, we can't access the
+                 * inode here. To do this right we somehow need the number
+                 * of pages covered by the lock. This should be fixed later
+                 * when bug 10718 is landed. */
+                if (lock->l_ast_data != NULL) {
+                        struct inode *inode = lock->l_ast_data;
+                        if (page_nr > inode->i_mapping->nrpages)
+                                page_nr = inode->i_mapping->nrpages;
+                }
+#endif
+                lock_cost = 1 + page_nr;
+        } else {
+                /* For all locks which are not extent ones cost is 1 */
+                lock_cost = 1;
+        }
+
+        /* Keep all expensive locks in lru for the memory pressure time
+         * cancel policy. They may anyway be canceled by the lru resize
+         * policy if their CLV is not small enough. */
+        return lock_cost > ns->ns_shrink_thumb ?
+                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
+}
+
+/* Return LDLM_POLICY_KEEP_LOCK to stop lru processing and keep the current
+ * lock cached. Return LDLM_POLICY_CANCEL_LOCK otherwise. */
+static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
+                                                 struct ldlm_lock *lock, 
+                                                 int unused, int added, 
+                                                 int count)
+{
+        cfs_time_t cur = cfs_time_current();
+        struct ldlm_pool *pl = &ns->ns_pool;
+        __u64 slv, lvf, lv;
+        cfs_time_t la;
+
+        /* Stop lru processing when we have reached the passed @count or
+         * checked all locks in lru. */
+        if (count && added >= count)
+                return LDLM_POLICY_KEEP_LOCK;
+
+        spin_lock(&pl->pl_lock);
+        slv = ldlm_pool_get_slv(pl);
+        lvf = atomic_read(&pl->pl_lock_volume_factor);
+        spin_unlock(&pl->pl_lock);
+
+        la = cfs_duration_sec(cfs_time_sub(cur, 
+                              lock->l_last_used));
+
+        /* Stop when the SLV has not yet come from the server, or
+         * lv is smaller than slv. */
+        lv = lvf * la * unused;
+        return (slv == 1 || lv < slv) ? 
+                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
+}
+
+/* Return LDLM_POLICY_KEEP_LOCK to stop lru processing and keep the current
+ * lock cached. Return LDLM_POLICY_CANCEL_LOCK otherwise. */
+static ldlm_policy_res_t ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
+                                                   struct ldlm_lock *lock, 
+                                                   int unused, int added,
+                                                   int count)
+{
+        /* Stop lru processing when we have reached the passed @count or
+         * checked all locks in lru. */
+        return (added >= count) ? 
+                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
+}
+
+/* Return LDLM_POLICY_KEEP_LOCK to stop lru processing and keep the current
+ * lock cached. Return LDLM_POLICY_CANCEL_LOCK otherwise. */
+static ldlm_policy_res_t ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
+                                                 struct ldlm_lock *lock, 
+                                                 int unused, int added,
+                                                 int count)
+{
+        /* Stop lru processing if a young lock is found and we have reached
+         * the passed @count. */
+        return ((added >= count) && 
+                cfs_time_before(cfs_time_current(),
+                                cfs_time_add(lock->l_last_used,
+                                             ns->ns_max_age))) ? 
+                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
+}
+
+/* Return LDLM_POLICY_KEEP_LOCK to stop lru processing and keep the current
+ * lock cached. Return LDLM_POLICY_CANCEL_LOCK otherwise. */
+static ldlm_policy_res_t ldlm_cancel_default_policy(struct ldlm_namespace *ns,
+                                                    struct ldlm_lock *lock, 
+                                                    int unused, int added,
+                                                    int count)
+{
+        /* Stop lru processing when we have reached the passed @count or
+         * checked all locks in lru. */
+        return (added >= count) ? 
+                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
+}
+
+typedef ldlm_policy_res_t (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *, 
+                                                      struct ldlm_lock *, int, 
+                                                      int, int);
+
+static ldlm_cancel_lru_policy_t
+ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
+{
+        if (ns_connect_lru_resize(ns)) {
+                if (flags & LDLM_CANCEL_SHRINK)
+                        return ldlm_cancel_shrink_policy;
+                else if (flags & LDLM_CANCEL_LRUR)
+                        return ldlm_cancel_lrur_policy;
+                else if (flags & LDLM_CANCEL_PASSED)
+                        return ldlm_cancel_passed_policy;
+        } else {
+                if (flags & LDLM_CANCEL_AGED)
+                        return ldlm_cancel_aged_policy;
+        }
+        
+        return ldlm_cancel_default_policy;
+}
 /* - Free space in lru for @count new locks,
  *   redundant unused locks are canceled locally;
  * - also cancel locally unused aged locks;
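
As an illustrative aside (not part of the patch itself): the lru-resize policy above keeps a lock while its lock volume lv = lvf * la * unused stays below the server lock volume (slv), where la is the lock's idle time in seconds and unused is the current number of unused locks; slv == 1 is treated as "no SLV received from the server yet". A small numeric sketch with made-up values:

#include <stdio.h>
#include <stdint.h>

/* 1 = keep the lock in lru (stop scanning), 0 = let it be cancelled. */
static int lrur_keep_lock(uint64_t slv, uint64_t lvf,
                          uint64_t idle_sec, uint64_t unused)
{
        uint64_t lv = lvf * idle_sec * unused;

        /* Keep while no SLV has arrived yet, or the lock is still "cheap". */
        return slv == 1 || lv < slv;
}

int main(void)
{
        /* Example values only; the real slv/lvf come from the ldlm pool. */
        printf("fresh lock: keep=%d\n", lrur_keep_lock(1000000, 100, 2, 50));
        printf("stale lock: keep=%d\n", lrur_keep_lock(1000000, 100, 600, 50));
        printf("no slv yet: keep=%d\n", lrur_keep_lock(1, 100, 600, 50));
        return 0;
}
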
@@ -986,35 +1291,67 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
  * There are the following use cases: ldlm_cancel_resource_local(),
  * ldlm_cancel_lru_local() and ldlm_cli_cancel(), which check&set this
  * flag properly. As any attempt to cancel a lock rely on this flag,
- * l_bl_ast list is accessed later without any special locking. */
+ * l_bl_ast list is accessed later without any special locking.
+ *
+ * Calling policies for enabled lru resize:
+ * ----------------------------------------
+ * flags & LDLM_CANCEL_LRUR - use lru resize policy (SLV from server) to
+ *                            cancel not more than @count locks;
+ *
+ * flags & LDLM_CANCEL_PASSED - cancel @count number of old locks (located at
+ *                              the beginning of lru list);
+ *
+ * flags & LDLM_CANCEL_SHRINK - cancel not more than @count locks according to
+ *                              memory pressure policy function;
+ *
+ * flags & LDLM_CANCEL_AGED -   cancel locks according to the "aged policy".
+ */
 int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
-                          int count, int max, int flags)
+                          int count, int max, int cancel_flags, int flags)
 {
-        cfs_time_t cur = cfs_time_current();
-        struct ldlm_lock *lock, *next;
-        int rc, added = 0, left;
+        ldlm_cancel_lru_policy_t pf;
+        struct ldlm_lock *lock;
+        int added = 0, unused;
         ENTRY;
 
         spin_lock(&ns->ns_unused_lock);
-        count += ns->ns_nr_unused - ns->ns_max_unused;
+        unused = ns->ns_nr_unused;
+
+        if (!ns_connect_lru_resize(ns))
+                count += unused - ns->ns_max_unused;
+
+        pf = ldlm_cancel_lru_policy(ns, flags);
+        LASSERT(pf != NULL);
+        
         while (!list_empty(&ns->ns_unused_list)) {
+                /* For any flags, stop scanning if @max is reached. */
                 if (max && added >= max)
                         break;
 
                 list_for_each_entry(lock, &ns->ns_unused_list, l_lru) {
-                        /* somebody is already doing CANCEL or there is a
+                        /* Somebody is already doing CANCEL or there is a
                          * blocking request will send cancel. */
                         if (!(lock->l_flags & LDLM_FL_CANCELING) &&
-                            !(lock->l_flags & LDLM_FL_BL_AST))
+                            !(lock->l_flags & LDLM_FL_BL_AST)) 
                                 break;
                 }
                 if (&lock->l_lru == &ns->ns_unused_list)
                         break;
 
-                if ((added >= count) && 
-                    (!(flags & LDLM_CANCEL_AGED) ||
-                     cfs_time_before_64(cur, (__u64)ns->ns_max_age +
-                                        lock->l_last_used)))
+                /* Pass the lock through the policy filter and see if it
+                 * should stay in lru.
+                 *
+                 * Even for the shrinker policy we stop scanning if
+                 * we find a lock that should stay in the cache.
+                 * We should take lock age into account anyway,
+                 * as a new lock is a valuable resource even if
+                 * its weight is small.
+                 *
+                 * That is, for the shrinker policy we drop only
+                 * old locks, but additionally choose them by
+                 * their weight. Big extent locks will stay in
+                 * the cache. */
+                if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK)
                         break;
 
                 LDLM_LOCK_GET(lock); /* dropped by bl thread */
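
As an illustrative aside (not part of the patch itself): the shrink policy used by the scan above weights extent locks by their size: cost = 1 + pages spanned by the extent, capped by the pages the inode actually has cached, and only locks cheaper than ns_shrink_thumb are dropped under memory pressure. A stand-alone sketch of that cost calculation, assuming a 4 KiB page size and an example threshold:

#include <stdio.h>
#include <stdint.h>

#define TOY_PAGE_SIZE 4096  /* stand-in for CFS_PAGE_SIZE */

/* cost = 1 + pages covered by the extent, capped by how many pages are
 * actually cached (nrpages); non-extent locks would simply cost 1. */
static uint64_t extent_lock_cost(uint64_t start, uint64_t end, uint64_t nrpages)
{
        uint64_t page_nr = (end - start) / TOY_PAGE_SIZE;

        if (page_nr > nrpages)
                page_nr = nrpages;
        return 1 + page_nr;
}

int main(void)
{
        uint64_t shrink_thumb = 256;  /* example value for ns_shrink_thumb */
        uint64_t cost = extent_lock_cost(0, 64 << 20, 1000); /* 64 MB extent */

        printf("cost=%llu, kept under memory pressure: %d\n",
               (unsigned long long)cost, cost > shrink_thumb);
        return 0;
}
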
@@ -1036,57 +1373,74 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
                 }
                 LASSERT(!lock->l_readers && !lock->l_writers);
 
-                /* If we have chosen to canecl this lock voluntarily, we better
-                   send cancel notification to server, so that it frees
-                   appropriate state. This might lead to a race where while
-                   we are doing cancel here, server is also silently
-                   cancelling this lock. */
+                /* If we have chosen to cancel this lock voluntarily, we had
+                 * better send a cancel notification to the server, so that
+                 * it frees the appropriate state. This might lead to a race
+                 * where, while we are doing the cancel here, the server is
+                 * also silently cancelling this lock. */
                 lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK;
 
-                /* Setting the CBPENDING flag is a little misleading, but
-                 * prevents an important race; namely, once CBPENDING is set,
-                 * the lock can accumulate no more readers/writers.  Since
-                 * readers and writers are already zero here, ldlm_lock_decref
-                 * won't see this flag and call l_blocking_ast */
+                /* Setting the CBPENDING flag is a little misleading,
+                 * but prevents an important race; namely, once
+                 * CBPENDING is set, the lock can accumulate no more
+                 * readers/writers. Since readers and writers are
+                 * already zero here, ldlm_lock_decref() won't see
+                 * this flag and call l_blocking_ast */
                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;
-                /* We can't re-add to l_lru as it confuses the refcounting in
-                 * ldlm_lock_remove_from_lru() if an AST arrives after we drop
-                 * ns_lock below. We use l_bl_ast and can't use l_pending_chain
-                 * as it is used both on server and client nevertheles bug 5666
-                 * says it is used only on server. --umka */
 
+                /* We can't re-add to l_lru as it confuses the
+                 * refcounting in ldlm_lock_remove_from_lru() if an AST
+                 * arrives after we drop ns_lock below. We use l_bl_ast
+                 * and can't use l_pending_chain as it is used both on
+                 * server and client nevertheless bug 5666 says it is
+                 * used only on server */
                 LASSERT(list_empty(&lock->l_bl_ast));
                 list_add(&lock->l_bl_ast, cancels);
                 unlock_res_and_lock(lock);
                 spin_lock(&ns->ns_unused_lock);
                 added++;
+                unused--;
         }
         spin_unlock(&ns->ns_unused_lock);
+        RETURN(ldlm_cancel_list(cancels, added, cancel_flags));
+}
 
-        /* Handle only @added inserted locks. */
-        left = added;
-        list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
-                if (left-- == 0)
+/* Returns the number of locks which could be canceled next time
+ * ldlm_cancel_lru() is called. Used from the locks pool shrinker. */
+int ldlm_cancel_lru_estimate(struct ldlm_namespace *ns,
+                             int count, int max, int flags)
+{
+        ldlm_cancel_lru_policy_t pf;
+        struct ldlm_lock *lock;
+        int added = 0, unused;
+        ENTRY;
+
+        pf = ldlm_cancel_lru_policy(ns, flags);
+        LASSERT(pf != NULL);
+        spin_lock(&ns->ns_unused_lock);
+        unused = ns->ns_nr_unused;
+
+        list_for_each_entry(lock, &ns->ns_unused_list, l_lru) {
+                /* For any flags, stop scanning if @max is reached. */
+                if (max && added >= max)
                         break;
 
-                rc = ldlm_cli_cancel_local(lock);
-                if (rc == LDLM_FL_BL_AST) {
-                        CFS_LIST_HEAD(head);
+                /* Somebody is already doing CANCEL, or there is a
+                 * blocking request that will send a cancel. Let's not
+                 * count this lock. */
+                if ((lock->l_flags & LDLM_FL_CANCELING) ||
+                    (lock->l_flags & LDLM_FL_BL_AST)) 
+                        continue;
 
-                        LDLM_DEBUG(lock, "Cancel lock separately");
-                        list_del_init(&lock->l_bl_ast);
-                        list_add(&lock->l_bl_ast, &head);
-                        ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
-                        rc = LDLM_FL_LOCAL_ONLY;
-                }
-                if (rc == LDLM_FL_LOCAL_ONLY) {
-                        /* CANCEL RPC should not be sent to server. */
-                        list_del_init(&lock->l_bl_ast);
-                        LDLM_LOCK_PUT(lock);
-                        added--;
-                }
+                /* Pass the lock through the policy filter and see if it
+                 * should stay in lru. */
+                if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK)
+                        break;
 
-        } 
+                added++;
+                unused--;
+        }
+        spin_unlock(&ns->ns_unused_lock);
         RETURN(added);
 }
 
@@ -1094,7 +1448,8 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
  * in a thread and this function will return after the thread has been
  * asked to call the callback.  when called with LDLM_SYNC the blocking
  * callback will be performed in this function. */
-int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync, 
+                    int flags)
 {
         CFS_LIST_HEAD(cancels);
         int count, rc;
@@ -1103,7 +1458,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
 #ifndef __KERNEL__
         sync = LDLM_SYNC; /* force to be sync in user space */
 #endif
-        count = ldlm_cancel_lru_local(ns, &cancels, 0, 0, 0);
+        count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0, flags);
         if (sync == LDLM_ASYNC) {
                 rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count);
                 if (rc == 0)
@@ -1112,8 +1467,8 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
 
         /* If an error occured in ASYNC mode, or
          * this is SYNC mode, cancel the list. */
-        ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
-        RETURN(0);
+        ldlm_cli_cancel_list(&cancels, count, NULL, 0);
+        RETURN(count);
 }
 
 /* Find and cancel locally unused locks found on resource, matched to the
@@ -1123,10 +1478,10 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                                struct list_head *cancels,
                                ldlm_policy_data_t *policy,
                                ldlm_mode_t mode, int lock_flags,
-                               int flags, void *opaque)
+                               int cancel_flags, void *opaque)
 {
-        struct ldlm_lock *lock, *next;
-        int count = 0, left;
+        struct ldlm_lock *lock;
+        int count = 0;
         ENTRY;
 
         lock_res(res);
@@ -1139,7 +1494,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                 }
 
                 if (lock->l_readers || lock->l_writers) {
-                        if (flags & LDLM_FL_WARN) {
+                        if (cancel_flags & LDLM_FL_WARN) {
                                 LDLM_ERROR(lock, "lock in use");
                                 //LBUG();
                         }
@@ -1164,7 +1519,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
 
                 /* See CBPENDING comment in ldlm_cancel_lru */
                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
-                        lock_flags;
+                                 lock_flags;
 
                 LASSERT(list_empty(&lock->l_bl_ast));
                 list_add(&lock->l_bl_ast, cancels);
@@ -1173,35 +1528,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
         }
         unlock_res(res);
 
-        /* Handle only @count inserted locks. */
-        left = count;
-        list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
-                int rc = LDLM_FL_LOCAL_ONLY;
-
-                if (left-- == 0)
-                        break;
-                if (flags & LDLM_FL_LOCAL_ONLY)
-                        ldlm_lock_cancel(lock);
-                else
-                        rc = ldlm_cli_cancel_local(lock);
-
-                if (rc == LDLM_FL_BL_AST) {
-                        CFS_LIST_HEAD(head);
-
-                        LDLM_DEBUG(lock, "Cancel lock separately");
-                        list_del_init(&lock->l_bl_ast);
-                        list_add(&lock->l_bl_ast, &head);
-                        ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
-                        rc = LDLM_FL_LOCAL_ONLY;
-                }
-                if (rc == LDLM_FL_LOCAL_ONLY) {
-                        /* CANCEL RPC should not be sent to server. */
-                        list_del_init(&lock->l_bl_ast);
-                        LDLM_LOCK_PUT(lock);
-                        count--;
-                }
-        }
-        RETURN(count);
+        RETURN(ldlm_cancel_list(cancels, count, cancel_flags));
 }
 
 /* If @req is NULL, send CANCEL request to server with handles of locks 
@@ -1211,7 +1538,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
  * buffer at the offset @off.
  * Destroy @cancels at the end. */
 int ldlm_cli_cancel_list(struct list_head *cancels, int count,
-                         struct ptlrpc_request *req, int off, int flags)
+                         struct ptlrpc_request *req, int flags)
 {
         struct ldlm_lock *lock;
         int res = 0;
@@ -1233,10 +1560,11 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count,
                 if (exp_connect_cancelset(lock->l_conn_export)) {
                         res = count;
                         if (req)
-                                ldlm_cancel_pack(req, off, cancels, count);
+                                ldlm_cancel_pack(req, cancels, count);
                         else
                                 res = ldlm_cli_cancel_req(lock->l_conn_export,
-                                                          cancels, count, flags);
+                                                          cancels, count,
+                                                          flags);
                 } else {
                         res = ldlm_cli_cancel_req(lock->l_conn_export,
                                                   cancels, 1, flags);
@@ -1250,7 +1578,6 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count,
                 count -= res;
                 ldlm_lock_list_put(cancels, l_bl_ast, res);
         }
-        LASSERT(list_empty(cancels));
         LASSERT(count == 0);
         RETURN(0);
 }
@@ -1275,7 +1602,7 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
 
         count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
                                            0, flags, opaque);
-        rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0, flags);
+        rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
         if (rc != ELDLM_OK)
                 CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
 
@@ -1354,7 +1681,7 @@ int ldlm_cli_join_lru(struct ldlm_namespace *ns,
         int count = 0;
         ENTRY;
 
-        LASSERT(ns->ns_client == LDLM_NAMESPACE_CLIENT);
+        LASSERT(ns_is_client(ns));
 
         res = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0);
         if (res == NULL)
@@ -1370,12 +1697,7 @@ int ldlm_cli_join_lru(struct ldlm_namespace *ns,
                     !lock->l_readers && !lock->l_writers &&
                     !(lock->l_flags & LDLM_FL_LOCAL) &&
                     !(lock->l_flags & LDLM_FL_CBPENDING)) {
-                        lock->l_last_used = cfs_time_current();
-                        spin_lock(&ns->ns_unused_lock);
-                        LASSERT(ns->ns_nr_unused >= 0);
-                        list_add_tail(&lock->l_lru, &ns->ns_unused_list);
-                        ns->ns_nr_unused++;
-                        spin_unlock(&ns->ns_unused_lock);
+                        ldlm_lock_add_to_lru(lock);
                         lock->l_flags &= ~LDLM_FL_NO_LRU;
                         LDLM_DEBUG(lock, "join lock to lru");
                         count++;
@@ -1534,7 +1856,7 @@ static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
 static int replay_lock_interpret(struct ptlrpc_request *req,
                                  struct ldlm_async_args *aa, int rc)
 {
-        struct ldlm_lock *lock;
+        struct ldlm_lock  *lock;
         struct ldlm_reply *reply;
 
         ENTRY;
@@ -1543,12 +1865,9 @@ static int replay_lock_interpret(struct ptlrpc_request *req,
                 GOTO(out, rc);
 
 
-        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
-                                   lustre_swab_ldlm_reply);
-        if (reply == NULL) {
-                CERROR("Can't unpack ldlm_reply\n");
-                GOTO (out, rc = -EPROTO);
-        }
+        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+        if (reply == NULL)
+                GOTO(out, rc = -EPROTO);
 
         lock = ldlm_handle2lock(&aa->lock_handle);
         if (!lock) {
@@ -1575,11 +1894,8 @@ out:
 static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 {
         struct ptlrpc_request *req;
-        struct ldlm_request *body;
-        struct ldlm_reply *reply;
         struct ldlm_async_args *aa;
-        int buffers = 2;
-        int size[3] = { sizeof(struct ptlrpc_body) };
+        struct ldlm_request   *body;
         int flags;
         ENTRY;
 
@@ -1621,26 +1937,25 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
         else
                 flags = LDLM_FL_REPLAY;
 
-        size[DLM_LOCKREQ_OFF] = sizeof(*body);
-        req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE, 2, size,
-                              NULL);
-        if (!req)
+        req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
+                                        LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
+        if (req == NULL)
                 RETURN(-ENOMEM);
 
         /* We're part of recovery, so don't wait for it. */
         req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
 
-        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         ldlm_lock2desc(lock, &body->lock_desc);
         body->lock_flags = flags;
 
         ldlm_lock2handle(lock, &body->lock_handle[0]);
-        size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
         if (lock->l_lvb_len != 0) {
-                buffers = 3;
-                size[DLM_REPLY_REC_OFF] = lock->l_lvb_len;
+                req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                                     lock->l_lvb_len);
         }
-        ptlrpc_req_set_repsize(req, buffers, size);
+        ptlrpc_request_set_replen(req);
         /* notify the server we've replayed all requests.
          * also, we mark the request to be put on a dedicated
          * queue to be processed after all request replayes.