b=13766
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index 852cea8..9ccf69c 100644
@@ -346,19 +346,19 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
                 if (rc == ELDLM_LOCK_ABORTED) {
                         /* Before we return, swab the reply */
-                        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF,
-                                                   sizeof(*reply),
-                                                   lustre_swab_ldlm_reply);
-                        if (reply == NULL) {
-                                CERROR("Can't unpack ldlm_reply\n");
+                        reply = req_capsule_server_get(&req->rq_pill,
+                                                       &RMF_DLM_REP);
+                        if (reply == NULL)
                                 rc = -EPROTO;
-                        }
                         if (lvb_len) {
-                                void *tmplvb;
-                                tmplvb = lustre_swab_repbuf(req,
-                                                            DLM_REPLY_REC_OFF,
-                                                            lvb_len,
-                                                            lvb_swabber);
+                                struct ost_lvb *tmplvb;
+
+                                req_capsule_set_size(&req->rq_pill,
+                                                     &RMF_DLM_LVB, RCL_SERVER,
+                                                     lvb_len);
+                            tmplvb = req_capsule_server_swab_get(&req->rq_pill,
+                                                                 &RMF_DLM_LVB,
+                                                                 lvb_swabber);
                                 if (tmplvb == NULL)
                                         GOTO(cleanup, rc = -EPROTO);
                                 if (lvb != NULL)
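
These hunks replace the offset-based lustre_swab_repbuf() calls with the req_capsule API: a fixed-size reply field such as RMF_DLM_REP is fetched (and swabbed) by req_capsule_server_get(), while a variable-size field such as the LVB must have its server-side length declared before it can be fetched. A minimal sketch of the resulting pattern, with a hypothetical helper name and the surrounding enqueue logic elided:

/* Sketch only: unpack_enqueue_reply() is a hypothetical helper. */
static int unpack_enqueue_reply(struct ptlrpc_request *req,
                                void *lvb, int lvb_len, void *lvb_swabber)
{
        struct ldlm_reply *reply;
        void *tmplvb;

        /* Fixed-size field: the capsule format knows its length. */
        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
        if (reply == NULL)
                return -EPROTO;

        if (lvb_len == 0)
                return 0;

        /* Variable-size field: declare the expected length first,
         * then fetch and swab with the caller-supplied helper. */
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             lvb_len);
        tmplvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
                                             lvb_swabber);
        if (tmplvb == NULL)
                return -EPROTO;
        if (lvb != NULL)
                memcpy(lvb, tmplvb, lvb_len);
        return 0;
}
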
@@ -368,12 +368,9 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                 GOTO(cleanup, rc);
         }
 
-        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
-                                   lustre_swab_ldlm_reply);
-        if (reply == NULL) {
-                CERROR("Can't unpack ldlm_reply\n");
+        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+        if (reply == NULL)
                 GOTO(cleanup, rc = -EPROTO);
-        }
 
         /* lock enqueued on the server */
         cleanup_phase = 0;
@@ -447,8 +444,12 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
          * clobber the LVB with an older one. */
         if (lvb_len && (lock->l_req_mode != lock->l_granted_mode)) {
                 void *tmplvb;
-                tmplvb = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, lvb_len,
-                                            lvb_swabber);
+
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                                     lvb_len);
+                tmplvb = req_capsule_server_swab_get(&req->rq_pill,
+                                                     &RMF_DLM_LVB,
+                                                     lvb_swabber);
                 if (tmplvb == NULL)
                         GOTO(cleanup, rc = -EPROTO);
                 memcpy(lock->l_lvb_data, tmplvb, lvb_len);
@@ -511,24 +512,25 @@ static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
 
 /* Cancel lru locks and pack them into the enqueue request. Also pack the
  * given @count locks from @cancels into it. */
-struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
-                                         int opc, int bufcount, int *size,
-                                         int bufoff, int canceloff,
-                                         struct list_head *cancels, int count)
+int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
+                      int version, int opc, int canceloff,
+                      struct list_head *cancels, int count)
 {
-        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
-        int flags, avail, to_free, pack = 0;
-        struct ldlm_request *dlm = NULL;
-        struct ptlrpc_request *req;
-        CFS_LIST_HEAD(head);
+        struct ldlm_namespace   *ns = exp->exp_obd->obd_namespace;
+        struct req_capsule      *pill = &req->rq_pill;
+        struct ldlm_request     *dlm = NULL;
+        int flags, avail, to_free, bufcount, pack = 0;
+        int rc;
         ENTRY;
 
-        if (cancels == NULL)
-                cancels = &head;
+
+        LASSERT(cancels != NULL);
+
         if (exp_connect_cancelset(exp)) {
                 /* Estimate the amount of available space in the request. */
-                avail = ldlm_req_handles_avail(exp, size, bufcount,
-                                               bufoff, canceloff);
+                bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
+                avail = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
+                                               bufcount, bufcount - 1, canceloff);
                 flags = ns_connect_lru_resize(ns) ? 
                         LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
                 to_free = !ns_connect_lru_resize(ns) &&
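
The call above is the core of the conversion: the old explicit size[]/bufcount arguments are gone because the capsule already records how many client-side fields are filled and how large each one is. Roughly, under the layout this hunk assumes (the DLM request being the last filled client field):

/* Sketch: the capsule replaces the old size[]/bufcount arguments. */
struct req_capsule *pill = &req->rq_pill;
int bufcount, avail;

/* Number of client-side fields whose sizes are set. */
bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);

/* pill->rc_area[RCL_CLIENT] plays the role of the old size[] array;
 * the DLM request buffer sits at index bufcount - 1. */
avail = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
                               bufcount, bufcount - 1, canceloff);
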
@@ -544,14 +546,20 @@ struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
                         pack = count;
                 else
                         pack = avail;
-                size[bufoff] = ldlm_request_bufsize(pack, opc);
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
+                                     ldlm_request_bufsize(pack, opc));
+        }
+
+        rc = ptlrpc_request_pack(req, version, opc);
+        if (rc) {
+                ldlm_lock_list_put(cancels, l_bl_ast, count);
+                RETURN(rc);
         }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), version,
-                              opc, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req) {
+
+        if (exp_connect_cancelset(exp)) {
                 if (canceloff) {
-                        dlm = lustre_msg_buf(req->rq_reqmsg, bufoff,
-                                             sizeof(*dlm));
+                        dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
+                        LASSERT(dlm);
                         /* Skip first lock handler in ldlm_request_pack(),
                          * this method will increment @lock_count according
                          * to the lock handle amount actually written to
@@ -559,22 +567,21 @@ struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
                         dlm->lock_count = canceloff;
                 }
                 /* Pack into the request @pack lock handles. */
-                ldlm_cli_cancel_list(cancels, pack, req, bufoff, 0);
+                ldlm_cli_cancel_list(cancels, pack, req, 0);
                 /* Prepare and send separate cancel rpc for others. */
-                ldlm_cli_cancel_list(cancels, count - pack, NULL, 0, 0);
+                ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
         } else {
                 ldlm_lock_list_put(cancels, l_bl_ast, count);
         }
-        RETURN(req);
+        RETURN(0);
 }
 
-struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
-                                             int bufcount, int *size,
-                                             struct list_head *cancels,
-                                             int count)
+int ldlm_prep_enqueue_req(struct obd_export *exp,
+                          struct ptlrpc_request *req,
+                          struct list_head *cancels,
+                          int count)
 {
-        return ldlm_prep_elc_req(exp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
-                                 bufcount, size, DLM_LOCKREQ_OFF,
+        return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
                                  LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
 }
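
The calling convention is inverted here: the caller now allocates the request (so the capsule format is already attached) and ldlm_prep_elc_req()/ldlm_prep_enqueue_req() return an error code instead of the request. A rough caller-side sketch, assuming the RQF_LDLM_ENQUEUE format used elsewhere in this patch; on failure the prep function has already dropped the @cancels references, but freeing the request stays with the caller:

struct ptlrpc_request *req;
int rc;

req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
if (req == NULL)
        return -ENOMEM;

/* Packs early-cancels and calls ptlrpc_request_pack() internally. */
rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
if (rc) {
        ptlrpc_request_free(req);
        return rc;
}
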
 
@@ -592,14 +599,11 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                      struct lustre_handle *lockh, int async)
 {
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
-        struct ldlm_lock *lock;
-        struct ldlm_request *body;
-        struct ldlm_reply *reply;
-        int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                        [DLM_LOCKREQ_OFF]     = sizeof(*body),
-                        [DLM_REPLY_REC_OFF]   = lvb_len };
-        int is_replay = *flags & LDLM_FL_REPLAY;
-        int req_passed_in = 1, rc, err;
+        struct ldlm_lock      *lock;
+        struct ldlm_request   *body;
+        int                    is_replay = *flags & LDLM_FL_REPLAY;
+        int                    req_passed_in = 1;
+        int                    rc, err;
         struct ptlrpc_request *req;
         ENTRY;
 
@@ -646,7 +650,10 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
         /* lock not sent to server yet */
 
         if (reqp == NULL || *reqp == NULL) {
-                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
+                req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
+                                                &RQF_LDLM_ENQUEUE,
+                                                LUSTRE_DLM_VERSION,
+                                                LDLM_ENQUEUE);
                 if (req == NULL) {
                         failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
                         LDLM_LOCK_PUT(lock);
@@ -656,12 +663,13 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                 if (reqp)
                         *reqp = req;
         } else {
+                int len;
+
                 req = *reqp;
-                LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) >=
-                         sizeof(*body), "buflen[%d] = %d, not "LPSZ"\n",
-                         DLM_LOCKREQ_OFF,
-                         lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF),
-                         sizeof(*body));
+                len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ,
+                                           RCL_CLIENT);
+                LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n",
+                         DLM_LOCKREQ_OFF, len, (int)sizeof(*body));
         }
 
         lock->l_conn_export = exp;
@@ -669,15 +677,20 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
         lock->l_blocking_ast = einfo->ei_cb_bl;
 
         /* Dump lock data into the request buffer */
-        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         ldlm_lock2desc(lock, &body->lock_desc);
         body->lock_flags = *flags;
         body->lock_handle[0] = *lockh;
 
         /* Continue as normal. */
         if (!req_passed_in) {
-                size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
-                ptlrpc_req_set_repsize(req, 2 + (lvb_len > 0), size);
+                if (lvb_len > 0) {
+                        req_capsule_extend(&req->rq_pill,
+                                           &RQF_LDLM_ENQUEUE_LVB);
+                        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
+                                             RCL_SERVER, lvb_len);
+                }
+                ptlrpc_request_set_replen(req);
         }
 
         /*
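
When an LVB is expected in the reply, the base RQF_LDLM_ENQUEUE format is widened to RQF_LDLM_ENQUEUE_LVB and the server-side LVB length is pinned before the reply length is frozen; this replaces the old "2 + (lvb_len > 0)" buffer-count arithmetic. The same three-step shape recurs in replay_one_lock() at the end of this patch:

/* Sketch of the reply-shape negotiation, names as in the patch. */
if (lvb_len > 0) {
        /* Same fields as RQF_LDLM_ENQUEUE, plus RMF_DLM_LVB on the
         * server side. */
        req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             lvb_len);
}
/* Compute and fix the reply message length from the capsule. */
ptlrpc_request_set_replen(req);
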
@@ -745,14 +758,12 @@ static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
    accounting */
 int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
 {
-        struct ldlm_request *body;
-        struct ldlm_reply *reply;
-        struct ldlm_lock *lock;
-        struct ldlm_resource *res;
+        struct ldlm_request   *body;
+        struct ldlm_reply     *reply;
+        struct ldlm_lock      *lock;
+        struct ldlm_resource  *res;
         struct ptlrpc_request *req;
-        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                        [DLM_LOCKREQ_OFF]     = sizeof(*body) };
-        int rc;
+        int                    rc;
         ENTRY;
 
         lock = ldlm_handle2lock(lockh);
@@ -767,30 +778,29 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
 
         LDLM_DEBUG(lock, "client-side convert");
 
-        req = ptlrpc_prep_req(class_exp2cliimp(lock->l_conn_export),
-                              LUSTRE_DLM_VERSION, LDLM_CONVERT, 2, size, NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
+        req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export),
+                                        &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION,
+                                        LDLM_CONVERT);
+        if (req == NULL) {
+                LDLM_LOCK_PUT(lock);
+                RETURN(-ENOMEM);
+        }
 
-        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         body->lock_handle[0] = lock->l_remote_handle;
 
         body->lock_desc.l_req_mode = new_mode;
         body->lock_flags = *flags;
 
-        size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
-        ptlrpc_req_set_repsize(req, 2, size);
 
+        ptlrpc_request_set_replen(req);
         rc = ptlrpc_queue_wait(req);
         if (rc != ELDLM_OK)
                 GOTO(out, rc);
 
-        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
-                                   lustre_swab_ldlm_reply);
-        if (reply == NULL) {
-                CERROR ("Can't unpack ldlm_reply\n");
-                GOTO (out, rc = -EPROTO);
-        }
+        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+        if (reply == NULL)
+                GOTO(out, rc = -EPROTO);
 
         if (req->rq_status)
                 GOTO(out, rc = req->rq_status);
@@ -862,7 +872,7 @@ static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
 
 /* Pack @count locks in @head into the ldlm_request buffer
    of the request @req. */
-static void ldlm_cancel_pack(struct ptlrpc_request *req, int off,
+static void ldlm_cancel_pack(struct ptlrpc_request *req,
                              struct list_head *head, int count)
 {
         struct ldlm_request *dlm;
@@ -870,11 +880,11 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req, int off,
         int max, packed = 0;
         ENTRY;
 
-        dlm = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*dlm));
+        dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         LASSERT(dlm != NULL);
 
         /* Check the room in the request buffer. */
-        max = lustre_msg_buflen(req->rq_reqmsg, off) - 
+        max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) - 
                 sizeof(struct ldlm_request);
         max /= sizeof(struct lustre_handle);
         max += LDLM_LOCKREQ_HANDLES;
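
The capacity computation above works because a struct ldlm_request already embeds LDLM_LOCKREQ_HANDLES lock handles, so the total number of handles that fit is the embedded ones plus however many extra lustre_handle slots were reserved beyond the base structure; the reserved size is now read back from the capsule instead of lustre_msg_buflen(). Spelled out:

int buflen, max;

/* Space reserved for the DLM request buffer on the client side. */
buflen = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT);

/* Extra handles beyond those embedded in struct ldlm_request... */
max = (buflen - sizeof(struct ldlm_request)) / sizeof(struct lustre_handle);
/* ...plus the embedded ones. */
max += LDLM_LOCKREQ_HANDLES;
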
@@ -902,9 +912,6 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                         int count, int flags)
 {
         struct ptlrpc_request *req = NULL;
-        struct ldlm_request *body;
-        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                [DLM_LOCKREQ_OFF]     = sizeof(*body) };
         struct obd_import *imp;
         int free, sent = 0;
         int rc = 0;
@@ -916,12 +923,9 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
                 RETURN(count);
 
-        free = ldlm_req_handles_avail(exp, size, 2, DLM_LOCKREQ_OFF, 0);
-        if (count > free)
-                count = free;
-
-        size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_CANCEL);
         while (1) {
+                int bufcount;
+                struct req_capsule *pill; 
                 imp = class_exp2cliimp(exp);
                 if (imp == NULL || imp->imp_invalid) {
                         CDEBUG(D_DLMTRACE,
@@ -929,11 +933,26 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                         RETURN(count);
                 }
 
-                req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, 2,
-                                      size, NULL);
-                if (!req)
+                req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL);
+                if (req == NULL)
                         GOTO(out, rc = -ENOMEM);
 
+                pill = &req->rq_pill;
+                bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
+
+                free = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
+                                              bufcount, bufcount, 0);
+                if (count > free)
+                        count = free;
+
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
+                                     ldlm_request_bufsize(count, LDLM_CANCEL));
+
+                rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL);
+                if (rc) {
+                        ptlrpc_request_free(req);
+                        GOTO(out, rc);
+                }
                 req->rq_no_resend = 1;
                 req->rq_no_delay = 1;
 
@@ -941,11 +960,9 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                 req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
                 req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
 
-                body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
-                                      sizeof(*body));
-                ldlm_cancel_pack(req, DLM_LOCKREQ_OFF, cancels, count);
+                ldlm_cancel_pack(req, cancels, count);
 
-                ptlrpc_req_set_repsize(req, 1, NULL);
+                ptlrpc_request_set_replen(req);
                 if (flags & LDLM_FL_ASYNC) {
                         ptlrpcd_add_req(req);
                         sent = count;
@@ -986,30 +1003,43 @@ static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
 
 int ldlm_cli_update_pool(struct ptlrpc_request *req)
 {
+        __u64 old_slv, new_slv;
         struct ldlm_pool *pl;
+        __u32 new_limit;
         ENTRY;
     
         if (!imp_connect_lru_resize(req->rq_import))
                 RETURN(0);
 
-        if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
-            lustre_msg_get_limit(req->rq_repmsg) == 0)
+        /* In some cases RPC may contain slv and limit zeroed out. This is 
+         * the case when server does not support lru resize feature. This is
+         * also possible in some recovery cases when server side reqs have no
+         * ref to obd export and thus access to server side namespace is not
+         * possible. */
+        if (lustre_msg_get_slv(req->rq_repmsg) == 0 || 
+            lustre_msg_get_limit(req->rq_repmsg) == 0) {
+                DEBUG_REQ(D_HA, req, "zero SLV or Limit found "
+                          "(SLV: "LPU64", Limit: %u)", 
+                          lustre_msg_get_slv(req->rq_repmsg), 
+                          lustre_msg_get_limit(req->rq_repmsg));
                 RETURN(0);
+        }
 
+        new_limit = lustre_msg_get_limit(req->rq_repmsg);
+        new_slv = lustre_msg_get_slv(req->rq_repmsg);
         pl = ldlm_imp2pl(req->rq_import);
         
         spin_lock(&pl->pl_lock);
+        old_slv = ldlm_pool_get_slv(pl);
+        ldlm_pool_set_slv(pl, new_slv);
+        ldlm_pool_set_limit(pl, new_limit);
 
         /* Check if we need to wakeup pools thread for fast SLV change. 
          * This is only done when the threads period is noticeably long, like
          * 10s or more. */
 #if defined(__KERNEL__) && (LDLM_POOLS_THREAD_PERIOD >= 10)
         {
-                __u64 old_slv, new_slv, fast_change;
-
-                old_slv = ldlm_pool_get_slv(pl);
-                new_slv = lustre_msg_get_slv(req->rq_repmsg);
-                fast_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE;
+                __u64 fast_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE;
                 do_div(fast_change, 100);
 
                 /* Wake up pools thread only if SLV has changed more than 
@@ -1020,23 +1050,7 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
                         ldlm_pools_wakeup();
         }
 #endif
-        /* In some cases RPC may contain slv and limit zeroed out. This is 
-         * the case when server does not support lru resize feature. This is
-         * also possible in some recovery cases when server side reqs have no
-         * ref to obd export and thus access to server side namespace is no 
-         * possible. */
-        if (lustre_msg_get_slv(req->rq_repmsg) != 0 && 
-            lustre_msg_get_limit(req->rq_repmsg) != 0) {
-                ldlm_pool_set_slv(pl, lustre_msg_get_slv(req->rq_repmsg));
-                ldlm_pool_set_limit(pl, lustre_msg_get_limit(req->rq_repmsg));
-        } else {
-                DEBUG_REQ(D_HA, req, "zero SLV or Limit found "
-                          "(SLV: "LPU64", Limit: %u)", 
-                          lustre_msg_get_slv(req->rq_repmsg), 
-                          lustre_msg_get_limit(req->rq_repmsg));
-        }
         spin_unlock(&pl->pl_lock);
-
         RETURN(0);
 }
 EXPORT_SYMBOL(ldlm_cli_update_pool);
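
The reordering above snapshots the old SLV, installs the new SLV and limit under pl_lock, and only then runs the wakeup heuristic. The threshold arithmetic (kept from the old code) treats LDLM_POOLS_FAST_SLV_CHANGE as a percentage and uses do_div() because 64-bit division is not native on 32-bit kernels. The comparison itself falls outside the visible hunk, so the test below is an assumed reconstruction, not a quote:

__u64 fast_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE;

do_div(fast_change, 100);      /* fast_change = old_slv * pct / 100 */

/* Assumed form of the elided check: wake the pools thread only when
 * SLV moved by more than the fast-change threshold. */
if (new_slv < old_slv - fast_change || new_slv > old_slv + fast_change)
        ldlm_pools_wakeup();
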
@@ -1076,7 +1090,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                 count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
                                                LDLM_FL_BL_AST, flags);
         }
-        ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+        ldlm_cli_cancel_list(&cancels, count, NULL, 0);
         RETURN(0);
 }
 
@@ -1116,22 +1130,27 @@ static int ldlm_cancel_list(struct list_head *cancels, int count, int flags)
         }
         if (bl_ast > 0) {
                 count -= bl_ast;
-                ldlm_cli_cancel_list(&head, bl_ast, NULL, 0, 0);
+                ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
         }
 
         RETURN(count);
 }
 
-/* Return 1 if @lock should be canceled according to shrinker policy. 
- * Return zero otherwise. */
-static int ldlm_cancel_shrink_policy(struct ldlm_namespace *ns,
-                                     struct ldlm_lock *lock,
-                                     int unused, int added, 
-                                     int asked)
+/* Return LDLM_POLICY_KEEP_LOCK to stop lru processing and keep current lock
+ * cached; return LDLM_POLICY_CANCEL_LOCK otherwise. */
+static ldlm_policy_res_t ldlm_cancel_shrink_policy(struct ldlm_namespace *ns,
+                                                   struct ldlm_lock *lock,
+                                                   int unused, int added, 
+                                                   int count)
 {
         int lock_cost;
         __u64 page_nr;
 
+        /* Stop lru processing when we have reached the passed @count or
+         * checked all locks in lru. */
+        if (count && added >= count)
+                return LDLM_POLICY_KEEP_LOCK;
+
         if (lock->l_resource->lr_type == LDLM_EXTENT) {
                 struct ldlm_extent *l_extent;
 
@@ -1161,21 +1180,27 @@ static int ldlm_cancel_shrink_policy(struct ldlm_namespace *ns,
         /* Keep all expensive locks in lru for the memory pressure time
          * cancel policy. They may still be canceled by the lru resize
          * policy if their CLV is not small enough. */
-        return (lock_cost <= ns->ns_shrink_thumb);
+        return lock_cost > ns->ns_shrink_thumb ? 
+                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
 }
 
-/* Return 1 if @lock should be canceled according to lru resize policy. 
- * Return zero otherwise. */
-static int ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
-                                   struct ldlm_lock *lock, 
-                                   int unused, int added, 
-                                   int asked)
+/* Return LDLM_POLICY_KEEP_LOCK to stop lru processing and keep current lock
+ * cached; return LDLM_POLICY_CANCEL_LOCK otherwise. */
+static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
+                                                 struct ldlm_lock *lock, 
+                                                 int unused, int added, 
+                                                 int count)
 {
         cfs_time_t cur = cfs_time_current();
         struct ldlm_pool *pl = &ns->ns_pool;
         __u64 slv, lvf, lv;
         cfs_time_t la;
 
+        /* Stop lru processing when we have reached the passed @count or
+         * checked all locks in lru. */
+        if (count && added >= count)
+                return LDLM_POLICY_KEEP_LOCK;
+
         spin_lock(&pl->pl_lock);
         slv = ldlm_pool_get_slv(pl);
         lvf = atomic_read(&pl->pl_lock_volume_factor);
@@ -1187,40 +1212,55 @@ static int ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
         /* Stop when slv has not yet come from the server or
          * lv is smaller than it. */
         lv = lvf * la * unused;
-        return (slv > 1 && lv >= slv);
+        return (slv == 1 || lv < slv) ? 
+                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
+}
+
+/* Return LDLM_POLICY_KEEP_LOCK to stop lru processing and keep current lock
+ * cached; return LDLM_POLICY_CANCEL_LOCK otherwise. */
+static ldlm_policy_res_t ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
+                                                   struct ldlm_lock *lock, 
+                                                   int unused, int added,
+                                                   int count)
+{
+        /* Stop lru processing when we have reached the passed @count or
+         * checked all locks in lru. */
+        return (added >= count) ? 
+                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
 }
 
-/* Return 1 if @lock should be canceled according to passed policy. 
- * Return zero otherwise. */
-static int ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
-                                     struct ldlm_lock *lock, 
-                                     int unused, int added,
-                                     int asked)
+/* Return LDLM_POLICY_KEEP_LOCK to stop lru processing and keep current lock
+ * cached; return LDLM_POLICY_CANCEL_LOCK otherwise. */
+static ldlm_policy_res_t ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
+                                                 struct ldlm_lock *lock, 
+                                                 int unused, int added,
+                                                 int count)
 {
-        /* Do nothing here, we allow canceling all locks which
-         * are passed here from upper layer logic. So that locks
-         * number to be canceled will be limited by @count and
-         * @max in ldlm_cancel_lru_local(). */
-        return 1;
+        /* Stop lru processing if a young lock is found and we have reached
+         * the passed @count. */
+        return ((added >= count) && 
+                cfs_time_before(cfs_time_current(),
+                                cfs_time_add(lock->l_last_used,
+                                             ns->ns_max_age))) ? 
+                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
 }
 
-/* Return 1 if @lock should be canceled according to aged policy. 
- * Return zero otherwise. */
-static int ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
-                                   struct ldlm_lock *lock, 
-                                   int unused, int added,
-                                   int asked)
+/* Return LDLM_POLICY_KEEP_LOCK to stop lru processing and keep current lock
+ * cached; return LDLM_POLICY_CANCEL_LOCK otherwise. */
+static ldlm_policy_res_t ldlm_cancel_default_policy(struct ldlm_namespace *ns,
+                                                    struct ldlm_lock *lock, 
+                                                    int unused, int added,
+                                                    int count)
 {
-        /* Cancel old locks if reached asked limit. */
-        return !((added >= asked) && 
-                 cfs_time_before_64(cfs_time_current(),
-                                    cfs_time_add(lock->l_last_used,
-                                                 ns->ns_max_age)));
+        /* Stop lru processing when we have reached the passed @count or
+         * checked all locks in lru. */
+        return (added >= count) ? 
+                LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
 }
 
-typedef int (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *, 
-                                        struct ldlm_lock *, int, 
-                                        int, int);
+typedef ldlm_policy_res_t (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *, 
+                                                      struct ldlm_lock *, int, 
+                                                      int, int);
 
 static ldlm_cancel_lru_policy_t
 ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
@@ -1236,7 +1276,8 @@ ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
                 if (flags & LDLM_CANCEL_AGED)
                         return ldlm_cancel_aged_policy;
         }
-        return NULL;
+        
+        return ldlm_cancel_default_policy;
 }
  
 /* - Free space in lru for @count new locks,
@@ -1261,14 +1302,16 @@ ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
  *                              the beginning of lru list);
  *
  * flags & LDLM_CANCEL_SHRINK - cancel not more than @count locks according to
- *                              memory pressre policy function.
+ *                              memory pressure policy function;
+ *
+ * flags & LDLM_CANCEL_AGED -   cancel locks according to "aged policy".
  */
 int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
                           int count, int max, int cancel_flags, int flags)
 {
-        ldlm_cancel_lru_policy_t cancel_lru_policy_func;
-        int added = 0, unused, cancel;
-        struct ldlm_lock *lock, *next;
+        ldlm_cancel_lru_policy_t pf;
+        struct ldlm_lock *lock;
+        int added = 0, unused;
         ENTRY;
 
         spin_lock(&ns->ns_unused_lock);
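
Since ldlm_cancel_lru_policy() now always returns a callable (falling back to ldlm_cancel_default_policy instead of NULL), the scan loop rewritten in the next hunk loses its NULL branches and reduces to a single dispatch. In miniature:

ldlm_cancel_lru_policy_t pf = ldlm_cancel_lru_policy(ns, flags);

LASSERT(pf != NULL);    /* guaranteed now: default policy, never NULL */

list_for_each_entry(lock, &ns->ns_unused_list, l_lru) {
        /* One call decides both "cancel this lock" and "stop
         * scanning the lru". */
        if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK)
                break;
        /* ... queue @lock for cancel, added++, unused-- ... */
}
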
@@ -1277,90 +1320,130 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
         if (!ns_connect_lru_resize(ns))
                 count += unused - ns->ns_max_unused;
 
-        cancel_lru_policy_func = ldlm_cancel_lru_policy(ns, flags);
-     
-        list_for_each_entry_safe(lock, next, &ns->ns_unused_list, l_lru) {
-                /* Make sure that we skip locks being already in cancel. */
-                if ((lock->l_flags & LDLM_FL_CANCELING) ||
-                    (lock->l_flags & LDLM_FL_BL_AST))
-                        continue;
-
-                /* For any flags, stop scanning if @max or passed @count is
-                 * reached. */
-                if ((max && added >= max) || (count && added >= count))
+        pf = ldlm_cancel_lru_policy(ns, flags);
+        LASSERT(pf != NULL);
+        
+        while (!list_empty(&ns->ns_unused_list)) {
+                /* For any flags, stop scanning if @max is reached. */
+                if (max && added >= max)
                         break;
 
-                /* Pass the lock through the policy filter and see if it
-                 * should stay in lru. */
-                if (cancel_lru_policy_func != NULL) {
-                        cancel = cancel_lru_policy_func(ns, lock, unused, 
-                                                        added, count);
-                     
-                        /* Take next lock for shrink policy, we need to check
-                         * whole list. Stop scanning for other policies. */
-                        if ((flags & LDLM_CANCEL_SHRINK) && !cancel)
-                                continue;
-                        else if (!cancel)
+                list_for_each_entry(lock, &ns->ns_unused_list, l_lru) {
+                        /* Somebody is already doing CANCEL or there is a
+                         * blocking request which will send cancel. */
+                        if (!(lock->l_flags & LDLM_FL_CANCELING) &&
+                            !(lock->l_flags & LDLM_FL_BL_AST)) 
                                 break;
                 }
+                if (&lock->l_lru == &ns->ns_unused_list)
+                        break;
 
-                if (cancels != NULL) {
-                        LDLM_LOCK_GET(lock); /* dropped by bl thread */
-                        spin_unlock(&ns->ns_unused_lock);
-
-                        lock_res_and_lock(lock);
-                        /* Check flags again under the lock. */
-                        if ((lock->l_flags & LDLM_FL_CANCELING) ||
-                            (lock->l_flags & LDLM_FL_BL_AST) ||
-                            (ldlm_lock_remove_from_lru(lock) == 0)) {
-                                /* other thread is removing lock from lru or
-                                 * somebody is already doing CANCEL or
-                                 * there is a blocking request which will send
-                                 * cancel by itseft. */
-                                unlock_res_and_lock(lock);
-                                LDLM_LOCK_PUT(lock);
-                                spin_lock(&ns->ns_unused_lock);
-                                continue;
-                        }
-                        LASSERT(!lock->l_readers && !lock->l_writers);
-
-                        /* If we have chosen to cancel this lock voluntarily, we
-                         * better send cancel notification to server, so that it
-                         * frees appropriate state. This might lead to a race 
-                         * where while we are doing cancel here, server is also 
-                         * silently cancelling this lock. */
-                        lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK;
-
-                        /* Setting the CBPENDING flag is a little misleading,
-                         * but prevents an important race; namely, once
-                         * CBPENDING is set, the lock can accumulate no more
-                         * readers/writers. Since readers and writers are
-                         * already zero here, ldlm_lock_decref() won't see
-                         * this flag and call l_blocking_ast */
-                        lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;
-
-                        /* We can't re-add to l_lru as it confuses the
-                         * refcounting in ldlm_lock_remove_from_lru() if an AST
-                         * arrives after we drop ns_lock below. We use l_bl_ast
-                         * and can't use l_pending_chain as it is used both on
-                         * server and client nevertheless bug 5666 says it is
-                         * used only on server */
-                        LASSERT(list_empty(&lock->l_bl_ast));
-                        list_add(&lock->l_bl_ast, cancels);
+                /* Pass the lock through the policy filter and see if it
+                 * should stay in lru.
+                 *
+                 * Even for shrinker policy we stop scanning if
+                 * we find a lock that should stay in the cache.
+                 * We should take lock age into account anyway,
+                 * as a new lock is a valuable resource even if
+                 * its weight is small.
+                 *
+                 * That is, for shrinker policy we drop only
+                 * old locks, but additionally choose them by
+                 * their weight. Big extent locks will stay in
+                 * the cache. */
+                if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK)
+                        break;
+
+                LDLM_LOCK_GET(lock); /* dropped by bl thread */
+                spin_unlock(&ns->ns_unused_lock);
+
+                lock_res_and_lock(lock);
+                /* Check flags again under the lock. */
+                if ((lock->l_flags & LDLM_FL_CANCELING) ||
+                    (lock->l_flags & LDLM_FL_BL_AST) ||
+                    (ldlm_lock_remove_from_lru(lock) == 0)) {
+                        /* other thread is removing lock from lru or
+                         * somebody is already doing CANCEL or
+                         * there is a blocking request which will send
+                         * cancel by itself. */
                         unlock_res_and_lock(lock);
+                        LDLM_LOCK_PUT(lock);
                         spin_lock(&ns->ns_unused_lock);
+                        continue;
                 }
+                LASSERT(!lock->l_readers && !lock->l_writers);
+
+                /* If we have chosen to cancel this lock voluntarily, we
+                 * better send cancel notification to server, so that it
+                 * frees appropriate state. This might lead to a race 
+                 * where while we are doing cancel here, server is also 
+                 * silently cancelling this lock. */
+                lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK;
+
+                /* Setting the CBPENDING flag is a little misleading,
+                 * but prevents an important race; namely, once
+                 * CBPENDING is set, the lock can accumulate no more
+                 * readers/writers. Since readers and writers are
+                 * already zero here, ldlm_lock_decref() won't see
+                 * this flag and call l_blocking_ast */
+                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;
+
+                /* We can't re-add to l_lru as it confuses the
+                 * refcounting in ldlm_lock_remove_from_lru() if an AST
+                 * arrives after we drop ns_lock below. We use l_bl_ast
+                 * and can't use l_pending_chain as it is used both on
+                 * server and client nevertheless bug 5666 says it is
+                 * used only on server */
+                LASSERT(list_empty(&lock->l_bl_ast));
+                list_add(&lock->l_bl_ast, cancels);
+                unlock_res_and_lock(lock);
+                spin_lock(&ns->ns_unused_lock);
                 added++;
                 unused--;
         }
         spin_unlock(&ns->ns_unused_lock);
-  
-        if (cancels == NULL)
-                RETURN(added);
-
         RETURN(ldlm_cancel_list(cancels, added, cancel_flags));
 }
 
+/* Returns number of locks which could be canceled next time when 
+ * ldlm_cancel_lru() is called. Used from the locks pool shrinker. */
+int ldlm_cancel_lru_estimate(struct ldlm_namespace *ns,
+                             int count, int max, int flags)
+{
+        ldlm_cancel_lru_policy_t pf;
+        struct ldlm_lock *lock;
+        int added = 0, unused;
+        ENTRY;
+
+        pf = ldlm_cancel_lru_policy(ns, flags);
+        LASSERT(pf != NULL);
+        spin_lock(&ns->ns_unused_lock);
+        unused = ns->ns_nr_unused;
+
+        list_for_each_entry(lock, &ns->ns_unused_list, l_lru) {
+                /* For any flags, stop scanning if @max is reached. */
+                if (max && added >= max)
+                        break;
+
+                /* Somebody is already doing CANCEL or there is a
+                 * blocking request which will send cancel. Let's not count
+                 * this lock. */
+                if ((lock->l_flags & LDLM_FL_CANCELING) ||
+                    (lock->l_flags & LDLM_FL_BL_AST)) 
+                        continue;
+
+                /* Pass the lock through the policy filter and see if it
+                 * should stay in lru. */
+                if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK)
+                        break;
+
+                added++;
+                unused--;
+        }
+        spin_unlock(&ns->ns_unused_lock);
+        RETURN(added);
+}
+
 /* when called with LDLM_ASYNC the blocking callback will be handled
  * in a thread and this function will return after the thread has been
  * asked to call the callback.  when called with LDLM_SYNC the blocking
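
ldlm_cancel_lru_estimate() walks the same lru with the same policy filter but takes no references and flips no flags, so the pool shrinker can call it speculatively before committing to a real cancel pass. A hypothetical shrinker-side use; the flag and sync values are assumptions based on the policies above, and the trailing argument of ldlm_cancel_lru() is not visible in this hunk:

int nr;

/* How many lru locks would the shrink policy let go right now? */
nr = ldlm_cancel_lru_estimate(ns, 0, max, LDLM_CANCEL_SHRINK);

/* Only then pay for the real scan-and-cancel pass. */
if (nr > 0)
        ldlm_cancel_lru(ns, nr, LDLM_ASYNC, LDLM_CANCEL_SHRINK);
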
@@ -1384,7 +1467,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync,
 
         /* If an error occured in ASYNC mode, or
          * this is SYNC mode, cancel the list. */
-        ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+        ldlm_cli_cancel_list(&cancels, count, NULL, 0);
         RETURN(count);
 }
 
@@ -1455,7 +1538,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
  * buffer.
  * Destroy @cancels at the end. */
 int ldlm_cli_cancel_list(struct list_head *cancels, int count,
-                         struct ptlrpc_request *req, int off, int flags)
+                         struct ptlrpc_request *req, int flags)
 {
         struct ldlm_lock *lock;
         int res = 0;
@@ -1477,7 +1560,7 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count,
                 if (exp_connect_cancelset(lock->l_conn_export)) {
                         res = count;
                         if (req)
-                                ldlm_cancel_pack(req, off, cancels, count);
+                                ldlm_cancel_pack(req, cancels, count);
                         else
                                 res = ldlm_cli_cancel_req(lock->l_conn_export,
                                                           cancels, count,
@@ -1519,7 +1602,7 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
 
         count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
                                            0, flags, opaque);
-        rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0, flags);
+        rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
         if (rc != ELDLM_OK)
                 CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
 
@@ -1773,7 +1856,7 @@ static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
 static int replay_lock_interpret(struct ptlrpc_request *req,
                                  struct ldlm_async_args *aa, int rc)
 {
-        struct ldlm_lock *lock;
+        struct ldlm_lock  *lock;
         struct ldlm_reply *reply;
 
         ENTRY;
@@ -1782,12 +1865,9 @@ static int replay_lock_interpret(struct ptlrpc_request *req,
                 GOTO(out, rc);
 
 
-        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
-                                   lustre_swab_ldlm_reply);
-        if (reply == NULL) {
-                CERROR("Can't unpack ldlm_reply\n");
-                GOTO (out, rc = -EPROTO);
-        }
+        reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+        if (reply == NULL)
+                GOTO(out, rc = -EPROTO);
 
         lock = ldlm_handle2lock(&aa->lock_handle);
         if (!lock) {
@@ -1814,11 +1894,8 @@ out:
 static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 {
         struct ptlrpc_request *req;
-        struct ldlm_request *body;
-        struct ldlm_reply *reply;
         struct ldlm_async_args *aa;
-        int buffers = 2;
-        int size[3] = { sizeof(struct ptlrpc_body) };
+        struct ldlm_request   *body;
         int flags;
         ENTRY;
 
@@ -1860,26 +1937,25 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
         else
                 flags = LDLM_FL_REPLAY;
 
-        size[DLM_LOCKREQ_OFF] = sizeof(*body);
-        req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE, 2, size,
-                              NULL);
-        if (!req)
+        req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
+                                        LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
+        if (req == NULL)
                 RETURN(-ENOMEM);
 
         /* We're part of recovery, so don't wait for it. */
         req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
 
-        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
+        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
         ldlm_lock2desc(lock, &body->lock_desc);
         body->lock_flags = flags;
 
         ldlm_lock2handle(lock, &body->lock_handle[0]);
-        size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
         if (lock->l_lvb_len != 0) {
-                buffers = 3;
-                size[DLM_REPLY_REC_OFF] = lock->l_lvb_len;
+                req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
+                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                                     lock->l_lvb_len);
         }
-        ptlrpc_req_set_repsize(req, buffers, size);
+        ptlrpc_request_set_replen(req);
         /* notify the server we've replayed all requests.
          * also, we mark the request to be put on a dedicated
          * queue to be processed after all request replays.