Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
index dca0fd9..795ca4a 100644 (file)
@@ -40,7 +40,7 @@
 # include <liblustre.h>
 #endif
 
-# include <lustre_dlm.h>
+#include <lustre_dlm.h>
 #include <libcfs/kp30.h>
 #include <lustre_net.h>
 #include <lustre/lustre_user.h>
@@ -63,9 +63,6 @@ extern quota_interface_t osc_quota_interface;
 
 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
 
-static quota_interface_t *quota_interface;
-extern quota_interface_t osc_quota_interface;
-
 /* Pack OSC object metadata for disk storage (LE byte order). */
 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
                       struct lov_stripe_md *lsm)
@@ -91,7 +88,9 @@ static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
 
         if (lsm) {
                 LASSERT(lsm->lsm_object_id);
+                LASSERT(lsm->lsm_object_gr);
                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
+                (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
         }
 
         RETURN(lmm_size);
@@ -144,7 +143,9 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
         if (lmm != NULL) {
                 /* XXX zero *lsmp? */
                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
+                (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
                 LASSERT((*lsmp)->lsm_object_id);
+                LASSERT((*lsmp)->lsm_object_gr);
         }
 
         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
@@ -152,6 +153,32 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
         RETURN(lsm_size);
 }
 
+static inline void osc_pack_capa(struct ptlrpc_request *req, int offset,
+                                 struct ost_body *body, void *capa)
+{
+        struct obd_capa *oc = (struct obd_capa *)capa;
+        struct lustre_capa *c;
+
+        if (!capa)
+                return;
+
+        c = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*c));
+        LASSERT(c);
+        capa_cpy(c, oc);
+        body->oa.o_valid |= OBD_MD_FLOSSCAPA;
+        DEBUG_CAPA(D_SEC, c, "pack");
+}
+
+static inline void osc_pack_req_body(struct ptlrpc_request *req, int offset,
+                                     struct obd_info *oinfo)
+{
+        struct ost_body *body;
+
+        body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
+        body->oa = *oinfo->oi_oa;
+        osc_pack_capa(req, offset + 1, body, oinfo->oi_capa);
+}
+
 static int osc_getattr_interpret(struct ptlrpc_request *req,
                                  struct osc_async_args *aa, int rc)
 {
@@ -185,17 +212,17 @@ static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
 {
         struct ptlrpc_request *req;
         struct ost_body *body;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         struct osc_async_args *aa;
         ENTRY;
 
+        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
-                              OST_GETATTR, 2, size,NULL);
+                              OST_GETATTR, 3, size,NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
-        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
+        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
 
         ptlrpc_req_set_repsize(req, 2, size);
         req->rq_interpret_reply = osc_getattr_interpret;
@@ -212,16 +239,16 @@ static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
 {
         struct ptlrpc_request *req;
         struct ost_body *body;
-        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         ENTRY;
 
+        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
-                              OST_GETATTR, 2, size, NULL);
+                              OST_GETATTR, 3, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
-        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
+        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
 
         ptlrpc_req_set_repsize(req, 2, size);
 
@@ -239,7 +266,7 @@ static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
         }
 
         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
-        memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
+        *oinfo->oi_oa = body->oa;
 
         /* This should really be sent by the OST */
         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
@@ -256,16 +283,18 @@ static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
 {
         struct ptlrpc_request *req;
         struct ost_body *body;
-        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         ENTRY;
 
+        LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
+                                        oinfo->oi_oa->o_gr > 0);
+        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
-                              OST_SETATTR, 2, size, NULL);
+                              OST_SETATTR, 3, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
-        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
+        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
 
         ptlrpc_req_set_repsize(req, 2, size);
 
@@ -278,7 +307,7 @@ static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
         if (body == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        memcpy(oinfo->oi_oa, &body->oa, sizeof(*oinfo->oi_oa));
+        *oinfo->oi_oa = body->oa;
 
         EXIT;
 out:
@@ -302,7 +331,7 @@ static int osc_setattr_interpret(struct ptlrpc_request *req,
                 GOTO(out, rc = -EPROTO);
         }
 
-        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
+        *aa->aa_oi->oi_oa = body->oa;
 out:
         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
         RETURN(rc);
@@ -313,25 +342,22 @@ static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                              struct ptlrpc_request_set *rqset)
 {
         struct ptlrpc_request *req;
-        struct ost_body *body;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct ost_body) };
         struct osc_async_args *aa;
         ENTRY;
 
+        size[REQ_REC_OFF + 1] = oinfo->oi_capa ? sizeof(struct lustre_capa) : 0;
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
-                              OST_SETATTR, 2, size, NULL);
+                              OST_SETATTR, 3, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
-
+        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
                 LASSERT(oti);
-                memcpy(obdo_logcookie(oinfo->oi_oa), oti->oti_logcookies,
-                       sizeof(*oti->oti_logcookies));
+                *obdo_logcookie(oinfo->oi_oa) = *oti->oti_logcookies;
         }
 
-        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
         ptlrpc_req_set_repsize(req, 2, size);
         /* do mds to ost setattr asynchronouly */
         if (!rqset) {
@@ -340,7 +366,7 @@ static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
         } else {
                 req->rq_interpret_reply = osc_setattr_interpret;
 
-                CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+                CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                 aa = (struct osc_async_args *)&req->rq_async_args;
                 aa->aa_oi = oinfo;
 
@@ -375,7 +401,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
                 GOTO(out, rc = -ENOMEM);
 
         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
-        memcpy(&body->oa, oa, sizeof(body->oa));
+        body->oa = *oa;
 
         ptlrpc_req_set_repsize(req, 2, size);
         if (oa->o_valid & OBD_MD_FLINLINE) {
@@ -398,7 +424,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
                 GOTO (out_req, rc = -EPROTO);
         }
 
-        memcpy(oa, &body->oa, sizeof(*oa));
+        *oa = body->oa;
 
         /* This should really be sent by the OST */
         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
@@ -409,6 +435,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
          * This needs to be fixed in a big way.
          */
         lsm->lsm_object_id = oa->o_id;
+        lsm->lsm_object_gr = oa->o_gr;
         *ea = lsm;
 
         if (oti != NULL) {
@@ -417,20 +444,18 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
                         if (!oti->oti_logcookies)
                                 oti_alloc_cookies(oti, 1);
-                        memcpy(oti->oti_logcookies, obdo_logcookie(oa),
-                               sizeof(oti->oti_onecookie));
+                        *oti->oti_logcookies = *obdo_logcookie(oa);
                 }
         }
 
         CDEBUG(D_HA, "transno: "LPD64"\n",
                lustre_msg_get_transno(req->rq_repmsg));
-        EXIT;
 out_req:
         ptlrpc_req_finished(req);
 out:
         if (rc && !*ea)
                 obd_free_memmd(exp, &lsm);
-        return rc;
+        RETURN(rc);
 }
 
 static int osc_punch_interpret(struct ptlrpc_request *req,
@@ -449,7 +474,7 @@ static int osc_punch_interpret(struct ptlrpc_request *req,
                 GOTO(out, rc = -EPROTO);
         }
 
-        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
+        *aa->aa_oi->oi_oa = body->oa;
 out:
         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
         RETURN(rc);
@@ -462,7 +487,7 @@ static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
         struct ptlrpc_request *req;
         struct osc_async_args *aa;
         struct ost_body *body;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         ENTRY;
 
         if (!oinfo->oi_oa) {
@@ -470,20 +495,17 @@ static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(-EINVAL);
         }
 
+        size[REQ_REC_OFF + 1] = oinfo->oi_capa? sizeof(struct lustre_capa) : 0;
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
-                              OST_PUNCH, 2, size, NULL);
+                              OST_PUNCH, 3, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
-        /* FIXME bug 249. Also see bug 7198 */
-        if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
-            OBD_CONNECT_REQPORTAL)
-                req->rq_request_portal = OST_IO_PORTAL;
-
-        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
-        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
+        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
 
+        osc_pack_req_body(req, REQ_REC_OFF, oinfo);
         /* overload the size and blocks fields in the oa with start/end */
+        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
         body->oa.o_size = oinfo->oi_policy.l_extent.start;
         body->oa.o_blocks = oinfo->oi_policy.l_extent.end;
         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
@@ -491,7 +513,7 @@ static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
         ptlrpc_req_set_repsize(req, 2, size);
 
         req->rq_interpret_reply = osc_punch_interpret;
-        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = (struct osc_async_args *)&req->rq_async_args;
         aa->aa_oi = oinfo;
         ptlrpc_set_add_req(rqset, req);
@@ -500,11 +522,12 @@ static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
 }
 
 static int osc_sync(struct obd_export *exp, struct obdo *oa,
-                    struct lov_stripe_md *md, obd_size start, obd_size end)
+                    struct lov_stripe_md *md, obd_size start, obd_size end,
+                    void *capa)
 {
         struct ptlrpc_request *req;
         struct ost_body *body;
-        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         ENTRY;
 
         if (!oa) {
@@ -512,19 +535,22 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa,
                 RETURN(-EINVAL);
         }
 
+        size[REQ_REC_OFF + 1] = capa ? sizeof(struct lustre_capa) : 0;
+
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
-                              OST_SYNC, 2, size, NULL);
+                              OST_SYNC, 3, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
-        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
-        memcpy(&body->oa, oa, sizeof(*oa));
-
         /* overload the size and blocks fields in the oa with start/end */
+        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+        body->oa = *oa;
         body->oa.o_size = start;
         body->oa.o_blocks = end;
         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
 
+        osc_pack_capa(req, REQ_REC_OFF + 1, body, capa);
+
         ptlrpc_req_set_repsize(req, 2, size);
 
         rc = ptlrpc_queue_wait(req);
@@ -538,7 +564,7 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa,
                 GOTO (out, rc = -EPROTO);
         }
 
-        memcpy(oa, &body->oa, sizeof(*oa));
+        *oa = body->oa;
 
         EXIT;
  out:
@@ -546,6 +572,28 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa,
         return rc;
 }
 
+/* Find and cancel locally locks matched by @mode in the resource found by
+ * @objid. Found locks are added into @cancel list. Returns the amount of
+ * locks added to @cancels list. */
+static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
+                                   struct list_head *cancels, ldlm_mode_t mode,
+                                   int lock_flags)
+{
+        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+        struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
+        struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
+        int count;
+        ENTRY;
+
+        if (res == NULL)
+                RETURN(0);
+
+        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
+                                           lock_flags, 0, NULL);
+        ldlm_resource_putref(res);
+        RETURN(count);
+}
+
 /* Destroy requests can be async always on the client, and we don't even really
  * care about the return code since the client cannot do anything at all about
  * a destroy failure.
@@ -560,9 +608,11 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
                        struct obd_export *md_export)
 {
+        CFS_LIST_HEAD(cancels);
         struct ptlrpc_request *req;
         struct ost_body *body;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
+        int count, bufcount = 2;
         ENTRY;
 
         if (!oa) {
@@ -570,24 +620,31 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                 RETURN(-EINVAL);
         }
 
+        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
+                                        LDLM_FL_DISCARD_DATA);
+        if (exp_connect_cancelset(exp) && count) {
+                bufcount = 3;
+                size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,
+                                                             OST_DESTROY);
+        }
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
-                              OST_DESTROY, 2, size, NULL);
+                              OST_DESTROY, bufcount, size, NULL);
+        if (exp_connect_cancelset(exp) && req)
+                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
+        else
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+
         if (!req)
                 RETURN(-ENOMEM);
 
-        /* FIXME bug 249. Also see bug 7198 */
-        if (class_exp2cliimp(exp)->imp_connect_data.ocd_connect_flags &
-            OBD_CONNECT_REQPORTAL)
-                req->rq_request_portal = OST_IO_PORTAL;
+        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
 
         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
-
-        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {
+        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
                        sizeof(*oti->oti_logcookies));
-        }
+        body->oa = *oa;
 
-        memcpy(&body->oa, oa, sizeof(*oa));
         ptlrpc_req_set_repsize(req, 2, size);
 
         ptlrpcd_add_req(req);
@@ -630,7 +687,8 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
 }
 
 /* caller must hold loi_list_lock */
-static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
+static void osc_consume_write_grant(struct client_obd *cli,
+                                    struct brw_page *pga)
 {
         atomic_inc(&obd_dirty_pages);
         cli->cl_dirty += CFS_PAGE_SIZE;
@@ -695,7 +753,7 @@ void osc_wake_cache_waiters(struct client_obd *cli)
         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
                 /* if we can't dirty more, we must wait until some is written */
                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
-                   ((atomic_read(&obd_dirty_pages)+1)>(obd_max_dirty_pages))) {
+                   (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
                                "osc max %ld, sys max %d\n", cli->cl_dirty,
                                cli->cl_dirty_max, obd_max_dirty_pages);
@@ -742,7 +800,8 @@ static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
 {
         client_obd_list_lock(&cli->cl_loi_list_lock);
         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
-        cli->cl_avail_grant += body->oa.o_grant;
+        if (body->oa.o_valid & OBD_MD_FLGRANT)
+                cli->cl_avail_grant += body->oa.o_grant;
         /* waiters are woken in brw_interpret_oap */
         client_obd_list_unlock(&cli->cl_loi_list_lock);
 }
@@ -763,7 +822,7 @@ static void handle_short_read(int nob_read, obd_count page_count,
 
                 if (pga[i]->count > nob_read) {
                         /* EOF inside this page */
-                        ptr = cfs_kmap(pga[i]->pg) + 
+                        ptr = cfs_kmap(pga[i]->pg) +
                                 (pga[i]->off & ~CFS_PAGE_MASK);
                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                         cfs_kunmap(pga[i]->pg);
@@ -840,7 +899,7 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
 }
 
 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
-                                   struct brw_page **pga)
+                                   struct brw_page **pga, int opc)
 {
         __u32 cksum = ~0;
         int i = 0;
@@ -853,7 +912,8 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
 
                 /* corrupt the data before we compute the checksum, to
                  * simulate an OST->client data error */
-                if (i == 0 &&OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
+                if (i == 0 && opc == OST_READ &&
+                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                         memcpy(ptr + off, "bad1", min(4, nob));
                 cksum = crc32_le(cksum, ptr + off, count);
                 cfs_kunmap(pga[i]->pg);
@@ -866,7 +926,7 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
         }
         /* For sending we only compute the wrong checksum instead
          * of corrupting the data so it is still correct on a redo */
-        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
+        if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                 cksum++;
 
         return cksum;
@@ -874,22 +934,32 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
 
 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                 struct lov_stripe_md *lsm, obd_count page_count,
-                                struct brw_page **pga,
-                                struct ptlrpc_request **reqp)
+                                struct brw_page **pga, 
+                                struct ptlrpc_request **reqp,
+                                struct obd_capa *ocapa)
 {
         struct ptlrpc_request   *req;
         struct ptlrpc_bulk_desc *desc;
         struct ost_body         *body;
         struct obd_ioobj        *ioobj;
         struct niobuf_remote    *niobuf;
-        int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         int niocount, i, requested_nob, opc, rc;
         struct ptlrpc_request_pool *pool;
+        struct lustre_capa      *capa;
         struct osc_brw_async_args *aa;
 
         ENTRY;
-        opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
-        pool = ((cmd & OBD_BRW_WRITE) != 0) ? cli->cl_import->imp_rq_pool :NULL;
+        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
+        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
+
+        if ((cmd & OBD_BRW_WRITE) != 0) {
+                opc = OST_WRITE;
+                pool = cli->cl_import->imp_rq_pool;
+        } else {
+                opc = OST_READ;
+                pool = NULL;
+        }
 
         for (niocount = i = 1; i < page_count; i++) {
                 if (!can_merge_pages(pga[i - 1], pga[i]))
@@ -898,17 +968,15 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
 
         size[REQ_REC_OFF + 1] = sizeof(*ioobj);
         size[REQ_REC_OFF + 2] = niocount * sizeof(*niobuf);
+        if (ocapa)
+                size[REQ_REC_OFF + 3] = sizeof(*capa);
 
-        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
-        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 4, size,
-                                   NULL, pool);
+        req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
+                                   size, NULL, pool, NULL);
         if (req == NULL)
                 RETURN (-ENOMEM);
 
-        /* FIXME bug 249. Also see bug 7198 */
-        if (cli->cl_import->imp_connect_data.ocd_connect_flags &
-            OBD_CONNECT_REQPORTAL)
-                req->rq_request_portal = OST_IO_PORTAL;
+        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
 
         if (opc == OST_WRITE)
                 desc = ptlrpc_prep_bulk_imp (req, page_count,
@@ -925,10 +993,16 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         niobuf = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                 niocount * sizeof(*niobuf));
 
-        memcpy(&body->oa, oa, sizeof(*oa));
+        body->oa = *oa;
 
         obdo_to_ioobj(oa, ioobj);
         ioobj->ioo_bufcnt = niocount;
+        if (ocapa) {
+                capa = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 3,
+                                      sizeof(*capa));
+                capa_cpy(capa, ocapa);
+                body->oa.o_valid |= OBD_MD_FLOSSCAPA;
+        }
 
         LASSERT (page_count > 0);
         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
@@ -978,7 +1052,8 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                 if (unlikely(cli->cl_checksum)) {
                         body->oa.o_valid |= OBD_MD_FLCKSUM;
                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
-                                                             page_count, pga);
+                                                             page_count, pga,
+                                                             OST_WRITE);
                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                                body->oa.o_cksum);
                         /* save this in 'oa', too, for later checking */
@@ -1005,7 +1080,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         aa->aa_requested_nob = requested_nob;
         aa->aa_nio_count = niocount;
         aa->aa_page_count = page_count;
-        aa->aa_retries = 5;     /*retry for checksum errors; lprocfs? */
+        aa->aa_resends = 0;
         aa->aa_ppga = pga;
         aa->aa_cli = cli;
         INIT_LIST_HEAD(&aa->aa_oaps);
@@ -1019,8 +1094,9 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
 }
 
 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
-                                 __u32 client_cksum, __u32 server_cksum, int nob,
-                                 obd_count page_count, struct brw_page **pga)
+                                __u32 client_cksum, __u32 server_cksum,
+                                int nob, obd_count page_count,
+                                struct brw_page **pga)
 {
         __u32 new_cksum;
         char *msg;
@@ -1030,28 +1106,31 @@ static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                 return 0;
         }
 
-        new_cksum = osc_checksum_bulk(nob, page_count, pga);
+        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
 
         if (new_cksum == server_cksum)
-                msg = "changed on the client after we checksummed it";
+                msg = "changed on the client after we checksummed it - "
+                      "likely false positive due to mmap IO (bug 11742)";
         else if (new_cksum == client_cksum)
                 msg = "changed in transit before arrival at OST";
         else
-                msg = "changed in transit AND doesn't match the original";
-
-        LCONSOLE_ERROR("BAD WRITE CHECKSUM: %s: from %s inum "LPU64"/"LPU64
-                       " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
-                       msg, libcfs_nid2str(peer->nid),
-                       oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
-                       oa->o_valid & OBD_MD_FLFID ? oa->o_generation : (__u64)0,
-                       oa->o_id,
-                       oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
-                       pga[0]->off,
-                       pga[page_count-1]->off + pga[page_count-1]->count - 1);
+                msg = "changed in transit AND doesn't match the original - "
+                      "likely false positive due to mmap IO (bug 11742)";
+
+        LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
+                           LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
+                           "["LPU64"-"LPU64"]\n",
+                           msg, libcfs_nid2str(peer->nid),
+                           oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
+                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
+                                                        (__u64)0,
+                           oa->o_id,
+                           oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
+                           pga[0]->off,
+                           pga[page_count-1]->off + pga[page_count-1]->count - 1);
         CERROR("original client csum %x, server csum %x, client csum now %x\n",
                client_cksum, server_cksum, new_cksum);
-
-        return 1;
+        return 1;        
 }
 
 /* Note rc enters this function as number of bytes transferred */
@@ -1101,10 +1180,13 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                 if (unlikely((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) &&
                              client_cksum &&
                              check_write_checksum(&body->oa, peer, client_cksum,
-                                                 body->oa.o_cksum,
-                                                 aa->aa_requested_nob,
-                                                 aa->aa_page_count,
-                                                 aa->aa_ppga)))
+                                                  body->oa.o_cksum,
+                                                  aa->aa_requested_nob,
+                                                  aa->aa_page_count,
+                                                  aa->aa_ppga)))
+                        RETURN(-EAGAIN);
+
+                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                         RETURN(-EAGAIN);
 
                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
@@ -1128,11 +1210,25 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
         if (rc < aa->aa_requested_nob)
                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
 
+        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
+                                         aa->aa_ppga))
+                GOTO(out, rc = -EAGAIN);
+
         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
                 static int cksum_counter;
-                __u32 server_cksum = body->oa.o_cksum;
+                __u32      server_cksum = body->oa.o_cksum;
+                char      *via;
+                char      *router;
+
                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
-                                                 aa->aa_ppga);
+                                                 aa->aa_ppga, OST_READ);
+
+                if (peer->nid == req->rq_bulk->bd_sender) {
+                        via = router = "";
+                } else {
+                        via = " via ";
+                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
+                }
 
                 if (server_cksum == ~0 && rc > 0) {
                         CERROR("Protocol error: server %s set the 'checksum' "
@@ -1140,21 +1236,23 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                                "but please tell CFS.\n",
                                libcfs_nid2str(peer->nid));
                 } else if (server_cksum != client_cksum) {
-                        LCONSOLE_ERROR("%s: BAD READ CHECKSUM: from %s inum "
-                                       LPU64"/"LPU64" object "LPU64"/"LPU64
-                                       " extent ["LPU64"-"LPU64"]\n",
-                                       req->rq_import->imp_obd->obd_name,
-                                       libcfs_nid2str(peer->nid),
-                                       body->oa.o_valid & OBD_MD_FLFID ?
+                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
+                                           "%s%s%s inum "LPU64"/"LPU64" object "
+                                           LPU64"/"LPU64" extent "
+                                           "["LPU64"-"LPU64"]\n",
+                                           req->rq_import->imp_obd->obd_name,
+                                           libcfs_nid2str(peer->nid),
+                                           via, router,
+                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                 body->oa.o_fid : (__u64)0,
-                                       body->oa.o_valid & OBD_MD_FLFID ?
+                                           body->oa.o_valid & OBD_MD_FLFID ?
                                                 body->oa.o_generation :(__u64)0,
-                                       body->oa.o_id,
-                                       body->oa.o_valid & OBD_MD_FLGROUP ?
+                                           body->oa.o_id,
+                                           body->oa.o_valid & OBD_MD_FLGROUP ?
                                                 body->oa.o_gr : (__u64)0,
-                                       aa->aa_ppga[0]->off,
-                                       aa->aa_ppga[aa->aa_page_count-1]->off +
-                                       aa->aa_ppga[aa->aa_page_count-1]->count -
+                                           aa->aa_ppga[0]->off,
+                                           aa->aa_ppga[aa->aa_page_count-1]->off +
+                                           aa->aa_ppga[aa->aa_page_count-1]->count -
                                                                         1);
                         CERROR("client %x, server %x\n",
                                client_cksum, server_cksum);
@@ -1178,42 +1276,57 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
         }
 out:
         if (rc >= 0)
-                memcpy(aa->aa_oa, &body->oa, sizeof(*aa->aa_oa));
+                *aa->aa_oa = body->oa;
 
         RETURN(rc);
 }
 
-static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
+static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                             struct lov_stripe_md *lsm,
-                            obd_count page_count, struct brw_page **pga)
+                            obd_count page_count, struct brw_page **pga,
+                            struct obd_capa *ocapa)
 {
-        struct ptlrpc_request *request;
-        int                    rc, retries = 5; /* lprocfs? */
+        struct ptlrpc_request *req;
+        int                    rc;
+        cfs_waitq_t            waitq;
+        int                    resends = 0;
+        struct l_wait_info     lwi;
+
         ENTRY;
 
+        cfs_waitq_init(&waitq);
+
 restart_bulk:
         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
-                                  page_count, pga, &request);
+                                  page_count, pga, &req, ocapa);
         if (rc != 0)
                 return (rc);
 
-        rc = ptlrpc_queue_wait(request);
+        rc = ptlrpc_queue_wait(req);
 
-        if (rc == -ETIMEDOUT && request->rq_resend) {
-                DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
-                ptlrpc_req_finished(request);
+        if (rc == -ETIMEDOUT && req->rq_resend) {
+                DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
+                ptlrpc_req_finished(req);
                 goto restart_bulk;
         }
 
-        rc = osc_brw_fini_request(request, rc);
+        rc = osc_brw_fini_request(req, rc);
+
+        ptlrpc_req_finished(req);
+        if (osc_recoverable_error(rc)) {
+                resends++;
+                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
+                        CERROR("too many resend retries, returning error\n");
+                        RETURN(-EIO);
+                }
+
+                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
+                l_wait_event(waitq, 0, &lwi);
 
-        ptlrpc_req_finished(request);
-        if (rc == -EAGAIN) {
-                if (retries-- > 0)
-                        goto restart_bulk;
-                rc = -EIO;
+                goto restart_bulk;
         }
-        RETURN(rc);
+        
+        RETURN (rc);
 }
 
 int osc_brw_redo_request(struct ptlrpc_request *request,
@@ -1226,40 +1339,50 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
         int rc = 0;
         ENTRY;
 
-        if (aa->aa_retries-- <= 0) {
-                CERROR("too many checksum retries, returning error\n");
+        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
+                CERROR("too many resend retries, returning error\n");
                 RETURN(-EIO);
         }
+        
+        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
+/*
+        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
+                ocapa = lustre_unpack_capa(request->rq_reqmsg,
+                                           REQ_REC_OFF + 3);
+*/
+        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
+                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
+                                  aa->aa_cli, aa->aa_oa,
+                                  NULL /* lsm unused by osc currently */,
+                                  aa->aa_page_count, aa->aa_ppga, 
+                                  &new_req, NULL /* ocapa */);
+        if (rc)
+                RETURN(rc);
 
-        DEBUG_REQ(D_ERROR, request, "redo for checksum error");
+        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
+   
         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                 if (oap->oap_request != NULL) {
                         LASSERTF(request == oap->oap_request,
                                  "request %p != oap_request %p\n",
                                  request, oap->oap_request);
                         if (oap->oap_interrupted) {
-                                ptlrpc_mark_interrupted(oap->oap_request);
-                                rc = -EINTR;
-                                break;
+                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
+                                ptlrpc_req_finished(new_req);                        
+                                RETURN(-EINTR);
                         }
                 }
         }
-        if (rc)
-                RETURN(rc);
-
-        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
-                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
-                                  aa->aa_cli, aa->aa_oa,
-                                  NULL /* lsm unused by osc currently */,
-                                  aa->aa_page_count, aa->aa_ppga, &new_req);
-        if (rc)
-                RETURN(rc);
-
         /* New request takes over pga and oaps from old request.
          * Note that copying a list_head doesn't work, need to move it... */
+        aa->aa_resends++;
         new_req->rq_interpret_reply = request->rq_interpret_reply;
         new_req->rq_async_args = request->rq_async_args;
+        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
+
         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
+
         INIT_LIST_HEAD(&new_aa->aa_oaps);
         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
         INIT_LIST_HEAD(&aa->aa_oaps);
@@ -1270,29 +1393,39 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
                         oap->oap_request = ptlrpc_request_addref(new_req);
                 }
         }
+        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
+
+        DEBUG_REQ(D_INFO, new_req, "new request");
 
         ptlrpc_set_add_req(set, new_req);
 
         RETURN(0);
 }
 
-static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
+static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
 {
         struct osc_brw_async_args *aa = data;
         int                        i;
+        int                        nob = rc;
         ENTRY;
 
-        rc = osc_brw_fini_request(request, rc);
-        if (rc == -EAGAIN) {
-                rc = osc_brw_redo_request(request, aa);
+        rc = osc_brw_fini_request(req, rc);
+        if (osc_recoverable_error(rc)) {
+                rc = osc_brw_redo_request(req, aa);
                 if (rc == 0)
                         RETURN(0);
         }
+        if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
+                atomic_add(nob, (atomic_t *)req->rq_set->set_countp);
 
-        spin_lock(&aa->aa_cli->cl_loi_list_lock);
+        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
+        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
+                aa->aa_cli->cl_w_in_flight--;
+        else
+                aa->aa_cli->cl_r_in_flight--;
         for (i = 0; i < aa->aa_page_count; i++)
                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
-        spin_unlock(&aa->aa_cli->cl_loi_list_lock);
+        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
 
         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
 
@@ -1301,35 +1434,56 @@ static int brw_interpret(struct ptlrpc_request *request, void *data, int rc)
 
 static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                           struct lov_stripe_md *lsm, obd_count page_count,
-                          struct brw_page **pga, struct ptlrpc_request_set *set)
+                          struct brw_page **pga, struct ptlrpc_request_set *set,
+                          struct obd_capa *ocapa)
 {
-        struct ptlrpc_request     *request;
+        struct ptlrpc_request     *req;
         struct client_obd         *cli = &exp->exp_obd->u.cli;
         int                        rc, i;
+        struct osc_brw_async_args *aa;
         ENTRY;
 
         /* Consume write credits even if doing a sync write -
          * otherwise we may run out of space on OST due to grant. */
-        spin_lock(&cli->cl_loi_list_lock);
-        for (i = 0; i < page_count; i++) {
-                if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
-                        osc_consume_write_grant(cli, pga[i]);
+        if (cmd == OBD_BRW_WRITE) {
+                spin_lock(&cli->cl_loi_list_lock);
+                for (i = 0; i < page_count; i++) {
+                        if (cli->cl_avail_grant >= CFS_PAGE_SIZE)
+                                osc_consume_write_grant(cli, pga[i]);
+                }
+                spin_unlock(&cli->cl_loi_list_lock);
         }
-        spin_unlock(&cli->cl_loi_list_lock);
 
-        rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
-                                  page_count, pga, &request);
+        rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
+                                  &req, ocapa);
 
-        if (rc == 0) {
-                request->rq_interpret_reply = brw_interpret;
-                ptlrpc_set_add_req(set, request);
+        aa = (struct osc_brw_async_args *)&req->rq_async_args;
+        if (cmd == OBD_BRW_READ) {
+                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
+                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
+                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
         } else {
-                spin_lock(&cli->cl_loi_list_lock);
+                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
+                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
+                                 cli->cl_w_in_flight);
+                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
+        }
+
+        if (rc == 0) {
+                req->rq_interpret_reply = brw_interpret;
+                ptlrpc_set_add_req(set, req);
+                client_obd_list_lock(&cli->cl_loi_list_lock);
+                if (cmd == OBD_BRW_READ)
+                        cli->cl_r_in_flight++;
+                else
+                        cli->cl_w_in_flight++;
+                client_obd_list_unlock(&cli->cl_loi_list_lock);
+        } else if (cmd == OBD_BRW_WRITE) {
+                client_obd_list_lock(&cli->cl_loi_list_lock);
                 for (i = 0; i < page_count; i++)
                         osc_release_write_grant(cli, pga[i], 0);
-                spin_unlock(&cli->cl_loi_list_lock);
+                client_obd_list_unlock(&cli->cl_loi_list_lock);
         }
-
         RETURN (rc);
 }
 
@@ -1355,7 +1509,7 @@ static void sort_brw_pages(struct brw_page **array, int num)
                 for (i = stride ; i < num ; i++) {
                         tmp = array[i];
                         j = i;
-                        while (j >= stride && array[j-stride]->off > tmp->off) {
+                        while (j >= stride && array[j - stride]->off > tmp->off) {
                                 array[j] = array[j - stride];
                                 j -= stride;
                         }
@@ -1371,7 +1525,7 @@ static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
         int i = 0;
 
         LASSERT (pages > 0);
-        offset = pg[i]->off & (~CFS_PAGE_MASK);
+        offset = pg[i]->off & ~CFS_PAGE_MASK;
 
         for (;;) {
                 pages--;
@@ -1382,7 +1536,7 @@ static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
                         return count;   /* doesn't end on page boundary */
 
                 i++;
-                offset = pg[i]->off & (~CFS_PAGE_MASK);
+                offset = pg[i]->off & ~CFS_PAGE_MASK;
                 if (offset != 0)        /* doesn't start on page boundary */
                         return count;
 
@@ -1456,14 +1610,14 @@ static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
                         *oinfo->oi_oa = *saved_oa;
                 } else if (page_count > pages_per_brw) {
                         /* save a copy of oa (brw will clobber it) */
-                        saved_oa = obdo_alloc();
+                        OBDO_ALLOC(saved_oa);
                         if (saved_oa == NULL)
                                 GOTO(out, rc = -ENOMEM);
                         *saved_oa = *oinfo->oi_oa;
                 }
 
                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
-                                      pages_per_brw, ppga);
+                                      pages_per_brw, ppga, oinfo->oi_capa);
 
                 if (rc != 0)
                         break;
@@ -1476,7 +1630,7 @@ out:
         osc_release_ppga(orig, page_count_orig);
 
         if (saved_oa != NULL)
-                obdo_free(saved_oa);
+                OBDO_FREE(saved_oa);
 
         RETURN(rc);
 }
@@ -1487,14 +1641,15 @@ static int osc_brw_async(int cmd, struct obd_export *exp,
                          struct ptlrpc_request_set *set)
 {
         struct brw_page **ppga, **orig;
+        struct client_obd *cli = &exp->exp_obd->u.cli;
         int page_count_orig;
         int rc = 0;
         ENTRY;
 
         if (cmd & OBD_BRW_CHECK) {
+                struct obd_import *imp = class_exp2cliimp(exp);
                 /* The caller just wants to know if there's a chance that this
                  * I/O can succeed */
-                struct obd_import *imp = class_exp2cliimp(exp);
 
                 if (imp == NULL || imp->imp_invalid)
                         RETURN(-EIO);
@@ -1512,28 +1667,27 @@ static int osc_brw_async(int cmd, struct obd_export *exp,
                 obd_count pages_per_brw;
 
                 pages_per_brw = min_t(obd_count, page_count,
-                    class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
+                                      cli->cl_max_pages_per_rpc);
 
                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
 
                 /* use ppga only if single RPC is going to fly */
                 if (pages_per_brw != page_count_orig || ppga != orig) {
-                        OBD_ALLOC(copy, pages_per_brw * sizeof(*copy));
+                        OBD_ALLOC(copy, sizeof(*copy) * pages_per_brw);
                         if (copy == NULL)
                                 GOTO(out, rc = -ENOMEM);
-                        memcpy(copy, ppga, pages_per_brw * sizeof(*copy));
+                        memcpy(copy, ppga, sizeof(*copy) * pages_per_brw);
                 } else
                         copy = ppga;
 
                 rc = async_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
-                                    pages_per_brw, copy, set);
+                                    pages_per_brw, copy, set, oinfo->oi_capa);
 
                 if (rc != 0) {
                         if (copy != ppga)
-                                OBD_FREE(copy, pages_per_brw * sizeof(*copy));
+                                OBD_FREE(copy, sizeof(*copy) * pages_per_brw);
                         break;
                 }
-
                 if (copy == orig) {
                         /* we passed it to async_internal() which is
                          * now responsible for releasing memory */
@@ -1560,6 +1714,7 @@ static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
 }
 
+
 /* This maintains the lists of pending pages to read/write for a given object
  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
  * to quickly find objects that are ready to send an RPC. */
@@ -1587,7 +1742,6 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
                 RETURN(1);
         }
-
         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
         optimal = cli->cl_max_pages_per_rpc;
         if (cmd & OBD_BRW_WRITE) {
@@ -1598,7 +1752,6 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                         RETURN(1);
                 }
-
                 /* +16 to avoid triggering rpcs that would want to include pages
                  * that are being queued but which can't be made ready until
                  * the queuer finishes with the page. this is a wart for
@@ -1696,7 +1849,7 @@ unlock:
  * the app does an fsync.  As long as errors persist we force future rpcs to be
  * sync so that the app can get a sync error and break the cycle of queueing
  * pages for which writeback will fail. */
-static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
+static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
                            int rc)
 {
         if (rc) {
@@ -1709,7 +1862,7 @@ static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
 
         }
 
-        if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
+        if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
                 ar->ar_force_sync = 0;
 }
 
@@ -1733,18 +1886,21 @@ static void osc_oap_to_pending(struct osc_async_page *oap)
 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                               struct osc_async_page *oap, int sent, int rc)
 {
+        __u64 xid = 0;
+
         ENTRY;
+        if (oap->oap_request != NULL) {
+                xid = ptlrpc_req_xid(oap->oap_request);
+                ptlrpc_req_finished(oap->oap_request);
+                oap->oap_request = NULL;
+        }
+
         oap->oap_async_flags = 0;
         oap->oap_interrupted = 0;
 
         if (oap->oap_cmd & OBD_BRW_WRITE) {
-                osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
-                osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
-        }
-
-        if (oap->oap_request != NULL) {
-                ptlrpc_req_finished(oap->oap_request);
-                oap->oap_request = NULL;
+                osc_process_ar(&cli->cl_ar, xid, rc);
+                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
         }
 
         if (rc == 0 && oa != NULL) {
@@ -1780,20 +1936,19 @@ static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
         EXIT;
 }
 
-static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
+static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
 {
-        struct osc_brw_async_args *aa = data;
         struct osc_async_page *oap, *tmp;
+        struct osc_brw_async_args *aa = data;
         struct client_obd *cli;
         ENTRY;
 
-        rc = osc_brw_fini_request(request, rc);
-        CDEBUG(D_INODE, "request %p aa %p rc %d\n", request, aa, rc);
-        if (rc == -EAGAIN) {
-                rc = osc_brw_redo_request(request, aa);
+        rc = osc_brw_fini_request(req, rc);
+        CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
+        if (osc_recoverable_error(rc)) {
+                rc = osc_brw_redo_request(req, aa);
                 if (rc == 0)
                         RETURN(0);
-                GOTO(out, rc);
         }
 
         cli = aa->aa_cli;
@@ -1803,7 +1958,7 @@ static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
          * is called so we know whether to go to sync BRWs or wait for more
          * RPCs to complete */
-        if (lustre_msg_get_opc(request->rq_reqmsg) == OST_WRITE)
+        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
                 cli->cl_w_in_flight--;
         else
                 cli->cl_r_in_flight--;
@@ -1820,10 +1975,8 @@ static int brw_interpret_oap(struct ptlrpc_request *request, void *data, int rc)
 
         client_obd_list_unlock(&cli->cl_loi_list_lock);
 
-        obdo_free(aa->aa_oa);
-
-        rc = 0;
-out:
+        OBDO_FREE(aa->aa_oa);
+        
         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
         RETURN(rc);
 }
@@ -1838,6 +1991,7 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
         struct obdo *oa = NULL;
         struct obd_async_page_ops *ops = NULL;
         void *caller_data = NULL;
+        struct obd_capa *ocapa;
         struct osc_async_page *oap;
         int i, rc;
 
@@ -1848,7 +2002,7 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
         if (pga == NULL)
                 RETURN(ERR_PTR(-ENOMEM));
 
-        oa = obdo_alloc();
+        OBDO_ALLOC(oa);
         if (oa == NULL)
                 GOTO(out, req = ERR_PTR(-ENOMEM));
 
@@ -1868,9 +2022,12 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
         /* always get the data for the obdo for the rpc */
         LASSERT(ops != NULL);
         ops->ap_fill_obdo(caller_data, cmd, oa);
+        ocapa = ops->ap_lookup_capa(caller_data, cmd);
 
         sort_brw_pages(pga, page_count);
-        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req);
+        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
+                                  pga, &req, ocapa);
+        capa_put(ocapa);
         if (rc != 0) {
                 CERROR("prep_req failed: %d\n", rc);
                 GOTO(out, req = ERR_PTR(rc));
@@ -1893,7 +2050,7 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
 out:
         if (IS_ERR(req)) {
                 if (oa)
-                        obdo_free(oa);
+                        OBDO_FREE(oa);
                 if (pga)
                         OBD_FREE(pga, sizeof(*pga) * page_count);
         }
@@ -1916,7 +2073,8 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
         ENTRY;
 
         /* first we find the pages we're allowed to work with */
-        list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
+        list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
+                                 oap_pending_item) {
                 ops = oap->oap_caller_ops;
 
                 LASSERT(oap->oap_magic == OAP_MAGIC);
@@ -2053,6 +2211,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
         }
 
         aa = (struct osc_brw_async_args *)&req->rq_async_args;
+
         if (cmd == OBD_BRW_READ) {
                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
@@ -2228,6 +2387,7 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
 {
         struct osc_cache_waiter ocw;
         struct l_wait_info lwi = { 0 };
+
         ENTRY;
 
         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
@@ -2345,7 +2505,7 @@ static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                 struct obd_async_page_ops *ops;
                 struct obdo *oa;
 
-                oa = obdo_alloc();
+                OBDO_ALLOC(oa);
                 if (oa == NULL)
                         RETURN(-ENOMEM);
 
@@ -2355,7 +2515,7 @@ static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                     NO_QUOTA)
                         rc = -EDQUOT;
 
-                obdo_free(oa);
+                OBDO_FREE(oa);
                 if (rc)
                         RETURN(rc);
         }
@@ -2606,8 +2766,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                 return;
         }
         lock_res_and_lock(lock);
-#ifdef __KERNEL__
-#ifdef __LINUX__
+#if defined (__KERNEL__) && defined (__LINUX__)
         /* Liang XXX: Darwin and Winnt checking should be added */
         if (lock->l_ast_data && lock->l_ast_data != data) {
                 struct inode *new_inode = data;
@@ -2622,7 +2781,6 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                          new_inode, new_inode->i_ino, new_inode->i_generation);
         }
 #endif
-#endif
         lock->l_ast_data = data;
         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
         unlock_res_and_lock(lock);
@@ -2632,9 +2790,12 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
                              ldlm_iterator_t replace, void *data)
 {
-        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
+        struct ldlm_res_id res_id = { .name = {0} };
         struct obd_device *obd = class_exp2obd(exp);
 
+        res_id.name[0] = lsm->lsm_object_id;
+        res_id.name[2] = lsm->lsm_object_gr;
+
         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
         return 0;
 }
@@ -2650,7 +2811,7 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                         struct ldlm_reply *rep;
 
                         /* swabbed by ldlm_cli_enqueue() */
-                        LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
+                        LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                              sizeof(*rep));
                         LASSERT(rep != NULL);
@@ -2674,7 +2835,7 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
 static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                  struct osc_enqueue_args *aa, int rc)
 {
-        int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
+        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
         struct ldlm_lock *lock;
 
@@ -2685,7 +2846,7 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req,
         /* Complete obtaining the lock procedure. */
         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                    aa->oa_ei->ei_mode,
-                                   &aa->oa_ei->ei_flags,
+                                   &aa->oa_oi->oi_flags,
                                    &lsm->lsm_oinfo[0]->loi_lvb,
                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                    lustre_swab_ost_lvb,
@@ -2712,16 +2873,21 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req,
  * is excluded from the cluster -- such scenarious make the life difficult, so
  * release locks just after they are obtained. */
 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
-                       struct obd_enqueue_info *einfo)
+                       struct ldlm_enqueue_info *einfo,
+                       struct ptlrpc_request_set *rqset)
 {
-        struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
+        struct ldlm_res_id res_id = { .name = {0} };
         struct obd_device *obd = exp->exp_obd;
         struct ldlm_reply *rep;
         struct ptlrpc_request *req = NULL;
-        int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
+        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
+        ldlm_mode_t mode;
         int rc;
         ENTRY;
 
+        res_id.name[0] = oinfo->oi_md->lsm_object_id;
+        res_id.name[2] = oinfo->oi_md->lsm_object_gr;
+
         /* Filesystem lock extents are extended to page boundaries so that
          * dealing with the page cache is a little smoother.  */
         oinfo->oi_policy.l_extent.start -=
@@ -2732,12 +2898,31 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 goto no_match;
 
         /* Next, search for already existing extent locks that will cover us */
-        rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
-                             einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
-                             oinfo->oi_lockh);
-        if (rc == 1) {
+        /* If we're trying to read, we also search for an existing PW lock.  The
+         * VFS and page cache already protect us locally, so lots of readers/
+         * writers can share a single PW lock.
+         *
+         * There are problems with conversion deadlocks, so instead of
+         * converting a read lock to a write lock, we'll just enqueue a new
+         * one.
+         *
+         * At some point we should cancel the read lock instead of making them
+         * send us a blocking callback, but there are problems with canceling
+         * locks out from other users right now, too. */
+        mode = einfo->ei_mode;
+        if (einfo->ei_mode == LCK_PR)
+                mode |= LCK_PW;
+        mode = ldlm_lock_match(obd->obd_namespace,
+                               oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
+                               einfo->ei_type, &oinfo->oi_policy, mode,
+                               oinfo->oi_lockh);
+        if (mode) {
+                /* addref the lock only if not async requests and PW lock is
+                 * matched whereas we asked for PR. */
+                if (!rqset && einfo->ei_mode != mode)
+                        ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
-                                        einfo->ei_flags);
+                                        oinfo->oi_flags);
                 if (intent) {
                         /* I would like to be able to ASSERT here that rss <=
                          * kms, but I can't, for reasons which are explained in
@@ -2748,84 +2933,51 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
 
                 /* For async requests, decref the lock. */
-                if (einfo->ei_rqset)
+                if (einfo->ei_mode != mode)
+                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
+                else if (rqset)
                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
 
                 RETURN(ELDLM_OK);
         }
 
-        /* If we're trying to read, we also search for an existing PW lock.  The
-         * VFS and page cache already protect us locally, so lots of readers/
-         * writers can share a single PW lock.
-         *
-         * There are problems with conversion deadlocks, so instead of
-         * converting a read lock to a write lock, we'll just enqueue a new
-         * one.
-         *
-         * At some point we should cancel the read lock instead of making them
-         * send us a blocking callback, but there are problems with canceling
-         * locks out from other users right now, too. */
-
-        if (einfo->ei_mode == LCK_PR) {
-                rc = ldlm_lock_match(obd->obd_namespace, einfo->ei_flags | LDLM_FL_LVB_READY,
-                                     &res_id, einfo->ei_type, &oinfo->oi_policy,
-                                     LCK_PW, oinfo->oi_lockh);
-                if (rc == 1) {
-                        /* FIXME: This is not incredibly elegant, but it might
-                         * be more elegant than adding another parameter to
-                         * lock_match.  I want a second opinion. */
-                        /* addref the lock only if not async requests. */
-                        if (!einfo->ei_rqset)
-                                ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
-                        osc_set_data_with_check(oinfo->oi_lockh,
-                                                einfo->ei_cbdata,
-                                                einfo->ei_flags);
-                        oinfo->oi_cb_up(oinfo, ELDLM_OK);
-                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
-                        RETURN(ELDLM_OK);
-                }
-        }
-
  no_match:
         if (intent) {
                 int size[3] = {
                         [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
-                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };
+                        [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
+                        [DLM_LOCKREQ_OFF + 1] = 0 };
 
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
-                                      LDLM_ENQUEUE, 2, size, NULL);
+                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                 if (req == NULL)
                         RETURN(-ENOMEM);
 
                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
-                size[DLM_REPLY_REC_OFF] = 
+                size[DLM_REPLY_REC_OFF] =
                         sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                 ptlrpc_req_set_repsize(req, 3, size);
         }
 
         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
-        einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;
+        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
 
-        rc = ldlm_cli_enqueue(exp, &req, res_id, einfo->ei_type,
-                              &oinfo->oi_policy, einfo->ei_mode,
-                              &einfo->ei_flags, einfo->ei_cb_bl,
-                              einfo->ei_cb_cp, einfo->ei_cb_gl,
-                              einfo->ei_cbdata,
+        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
+                              &oinfo->oi_policy, &oinfo->oi_flags,
                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                               lustre_swab_ost_lvb, oinfo->oi_lockh,
-                              einfo->ei_rqset ? 1 : 0);
-        if (einfo->ei_rqset) {
+                              rqset ? 1 : 0);
+        if (rqset) {
                 if (!rc) {
                         struct osc_enqueue_args *aa;
-                        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+                        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
                         aa = (struct osc_enqueue_args *)&req->rq_async_args;
                         aa->oa_oi = oinfo;
                         aa->oa_ei = einfo;
                         aa->oa_exp = exp;
 
                         req->rq_interpret_reply = osc_enqueue_interpret;
-                        ptlrpc_set_add_req(einfo->ei_rqset, req);
+                        ptlrpc_set_add_req(rqset, req);
                 } else if (intent) {
                         ptlrpc_req_finished(req);
                 }
@@ -2843,12 +2995,15 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                      int *flags, void *data, struct lustre_handle *lockh)
 {
-        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
+        struct ldlm_res_id res_id = { .name = {0} };
         struct obd_device *obd = exp->exp_obd;
-        int rc;
         int lflags = *flags;
+        ldlm_mode_t rc;
         ENTRY;
 
+        res_id.name[0] = lsm->lsm_object_id;
+        res_id.name[2] = lsm->lsm_object_gr;
+
         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
 
         /* Filesystem lock extents are extended to page boundaries so that
@@ -2857,28 +3012,21 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
         policy->l_extent.end |= ~CFS_PAGE_MASK;
 
         /* Next, search for already existing extent locks that will cover us */
-        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, &res_id, type,
-                             policy, mode, lockh);
-        if (rc) {
-                //if (!(*flags & LDLM_FL_TEST_LOCK))
-                        osc_set_data_with_check(lockh, data, lflags);
-                RETURN(rc);
-        }
         /* If we're trying to read, we also search for an existing PW lock.  The
          * VFS and page cache already protect us locally, so lots of readers/
          * writers can share a single PW lock. */
-        if (mode == LCK_PR) {
-                rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY, 
-                                     &res_id, type,
-                                     policy, LCK_PW, lockh);
-                if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
-                        /* FIXME: This is not incredibly elegant, but it might
-                         * be more elegant than adding another parameter to
-                         * lock_match.  I want a second opinion. */
-                        osc_set_data_with_check(lockh, data, lflags);
+        rc = mode;
+        if (mode == LCK_PR)
+                rc |= LCK_PW;
+        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
+                             &res_id, type, policy, rc, lockh);
+        if (rc) {
+                osc_set_data_with_check(lockh, data, lflags);
+                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                         ldlm_lock_addref(lockh, LCK_PR);
                         ldlm_lock_decref(lockh, LCK_PW);
                 }
+                RETURN(rc);
         }
         RETURN(rc);
 }
@@ -2897,22 +3045,34 @@ static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
 }
 
 static int osc_cancel_unused(struct obd_export *exp,
-                             struct lov_stripe_md *lsm, int flags, void *opaque)
+                             struct lov_stripe_md *lsm, int flags,
+                             void *opaque)
 {
         struct obd_device *obd = class_exp2obd(exp);
-        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
+        struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
 
-        return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
-                                      opaque);
+        if (lsm != NULL) {
+                res_id.name[0] = lsm->lsm_object_id;
+                res_id.name[2] = lsm->lsm_object_gr;
+                resp = &res_id;
+        }
+
+        return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
 }
 
 static int osc_join_lru(struct obd_export *exp,
                         struct lov_stripe_md *lsm, int join)
 {
         struct obd_device *obd = class_exp2obd(exp);
-        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
+        struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL;
 
-        return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
+        if (lsm != NULL) {
+                res_id.name[0] = lsm->lsm_object_id;
+                res_id.name[2] = lsm->lsm_object_gr;
+                resp = &res_id;
+        }
+
+        return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
 }
 
 static int osc_statfs_interpret(struct ptlrpc_request *req,
@@ -2960,7 +3120,7 @@ static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
         req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
 
         req->rq_interpret_reply = osc_statfs_interpret;
-        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+        CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = (struct osc_async_args *)&req->rq_async_args;
         aa->aa_oi = oinfo;
 
@@ -3037,12 +3197,14 @@ static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
                         RETURN(-ENOMEM);
 
                 lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
+                lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr;
         } else {
                 lum_size = sizeof(lum);
                 lumk = &lum;
         }
 
         lumk->lmm_object_id = lsm->lsm_object_id;
+        lumk->lmm_object_gr = lsm->lsm_object_gr;
         lumk->lmm_stripe_count = 1;
 
         if (copy_to_user(lump, lumk, lum_size))
@@ -3210,9 +3372,11 @@ static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
                                "ctxt %p: %d\n", ctxt, rc);
         }
 
+        spin_lock(&imp->imp_lock);
         imp->imp_server_timeout = 1;
-        CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
         imp->imp_pingable = 1;
+        spin_unlock(&imp->imp_lock);
+        CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
 
         RETURN(rc);
 }
@@ -3252,7 +3416,9 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
         if (KEY_IS(KEY_INIT_RECOV)) {
                 if (vallen != sizeof(int))
                         RETURN(-EINVAL);
+                spin_lock(&imp->imp_lock);
                 imp->imp_initial_recov = *(int *)val;
+                spin_unlock(&imp->imp_lock);
                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
                        exp->exp_obd->obd_name,
                        imp->imp_initial_recov);
@@ -3266,6 +3432,11 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
                 RETURN(0);
         }
 
+        if (KEY_IS(KEY_FLUSH_CTX)) {
+                sptlrpc_import_flush_my_ctx(imp);
+                RETURN(0);
+        }
+
         if (!set)
                 RETURN(-EINVAL);
 
@@ -3281,8 +3452,14 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        if (KEY_IS("mds_conn"))
+        if (KEY_IS(KEY_MDS_CONN)) {
+                struct osc_creator *oscc = &obd->u.cli.cl_oscc;
+
+                oscc->oscc_oa.o_gr = (*(__u32 *)val);
+                oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
+                LASSERT(oscc->oscc_oa.o_gr > 0);
                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
+        }
 
         ptlrpc_req_set_repsize(req, 1, NULL);
         ptlrpc_set_add_req(set, req);
@@ -3297,9 +3474,9 @@ static struct llog_operations osc_size_repl_logops = {
 };
 
 static struct llog_operations osc_mds_ost_orig_logops;
-static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                         int count, struct llog_catid *catid, 
-                         struct obd_uuid *uuid)
+static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
+                         struct obd_device *tgt, int count,
+                         struct llog_catid *catid, struct obd_uuid *uuid)
 {
         int rc;
         ENTRY;
@@ -3314,20 +3491,20 @@ static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
         }
         spin_unlock(&obd->obd_dev_lock);
 
-        rc = llog_setup(obd, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
+        rc = llog_setup(obd, llogs, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
                         &catid->lci_logid, &osc_mds_ost_orig_logops);
         if (rc) {
                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
                 GOTO (out, rc);
         }
 
-        rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
+        rc = llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                         &osc_size_repl_logops);
-        if (rc) 
+        if (rc)
                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
 out:
         if (rc) {
-                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n", 
+                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                        obd->obd_name, tgt->obd_name, count, catid, rc);
                 CERROR("logid "LPX64":0x%x\n",
                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
@@ -3415,7 +3592,11 @@ static int osc_import_event(struct obd_device *obd,
                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                         spin_unlock(&oscc->oscc_lock);
                 }
-
+                cli = &obd->u.cli;
+                client_obd_list_lock(&cli->cl_loi_list_lock);
+                cli->cl_avail_grant = 0;
+                cli->cl_lost_grant = 0;
+                client_obd_list_unlock(&cli->cl_loi_list_lock);
                 break;
         }
         case IMP_EVENT_INACTIVE: {
@@ -3428,8 +3609,6 @@ static int osc_import_event(struct obd_device *obd,
                 /* Reset grants */
                 cli = &obd->u.cli;
                 client_obd_list_lock(&cli->cl_loi_list_lock);
-                cli->cl_avail_grant = 0;
-                cli->cl_lost_grant = 0;
                 /* all pages go to failing rpcs due to the invalid import */
                 osc_check_rpcs(cli);
                 client_obd_list_unlock(&cli->cl_loi_list_lock);
@@ -3470,7 +3649,7 @@ static int osc_import_event(struct obd_device *obd,
         RETURN(rc);
 }
 
-int osc_setup(struct obd_device *obd, obd_count len, void *buf)
+int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 {
         int rc;
         ENTRY;
@@ -3480,7 +3659,7 @@ int osc_setup(struct obd_device *obd, obd_count len, void *buf)
         if (rc)
                 RETURN(rc);
 
-        rc = client_obd_setup(obd, len, buf);
+        rc = client_obd_setup(obd, lcfg);
         if (rc) {
                 ptlrpcd_decref();
         } else {
@@ -3520,6 +3699,9 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
                 ptlrpc_deactivate_import(imp);
+                spin_lock(&imp->imp_lock);
+                imp->imp_pingable = 0;
+                spin_unlock(&imp->imp_lock);
                 break;
         }
         case OBD_CLEANUP_EXPORTS: {
@@ -3597,6 +3779,7 @@ struct obd_ops osc_obd_ops = {
         .o_statfs_async         = osc_statfs_async,
         .o_packmd               = osc_packmd,
         .o_unpackmd             = osc_unpackmd,
+        .o_precreate            = osc_precreate,
         .o_create               = osc_create,
         .o_destroy              = osc_destroy,
         .o_getattr              = osc_getattr,
@@ -3627,7 +3810,6 @@ struct obd_ops osc_obd_ops = {
         .o_llog_finish          = osc_llog_finish,
         .o_process_config       = osc_process_config,
 };
-
 int __init osc_init(void)
 {
         struct lprocfs_static_vars lvars;
@@ -3641,8 +3823,8 @@ int __init osc_init(void)
         lquota_init(quota_interface);
         init_obd_quota_ops(quota_interface, &osc_obd_ops);
 
-        rc = class_register_type(&osc_obd_ops, lvars.module_vars,
-                                 LUSTRE_OSC_NAME);
+        rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
+                                 LUSTRE_OSC_NAME, NULL);
         if (rc) {
                 if (quota_interface)
                         PORTAL_SYMBOL_PUT(osc_quota_interface);