Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 88c46c5..795ca4a 100644 (file)
@@ -450,13 +450,12 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
 
         CDEBUG(D_HA, "transno: "LPD64"\n",
                lustre_msg_get_transno(req->rq_repmsg));
-        EXIT;
 out_req:
         ptlrpc_req_finished(req);
 out:
         if (rc && !*ea)
                 obd_free_memmd(exp, &lsm);
-        return rc;
+        RETURN(rc);
 }
 
 static int osc_punch_interpret(struct ptlrpc_request *req,
@@ -625,11 +624,12 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
                                         LDLM_FL_DISCARD_DATA);
         if (exp_connect_cancelset(exp) && count) {
                 bufcount = 3;
-                size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY);
+                size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,
+                                                             OST_DESTROY);
         }
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                               OST_DESTROY, bufcount, size, NULL);
-        if (req)
+        if (exp_connect_cancelset(exp) && req)
                 ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
         else
                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
@@ -800,7 +800,8 @@ static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
 {
         client_obd_list_lock(&cli->cl_loi_list_lock);
         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
-        cli->cl_avail_grant += body->oa.o_grant;
+        if (body->oa.o_valid & OBD_MD_FLGRANT)
+                cli->cl_avail_grant += body->oa.o_grant;
         /* waiters are woken in brw_interpret_oap */
         client_obd_list_unlock(&cli->cl_loi_list_lock);
 }
@@ -898,7 +899,7 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
 }
 
 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
-                                   struct brw_page **pga)
+                                   struct brw_page **pga, int opc)
 {
         __u32 cksum = ~0;
         int i = 0;
@@ -911,7 +912,7 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
 
                 /* corrupt the data before we compute the checksum, to
                  * simulate an OST->client data error */
-                if (i == 0 &&
+                if (i == 0 && opc == OST_READ &&
                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
                         memcpy(ptr + off, "bad1", min(4, nob));
                 cksum = crc32_le(cksum, ptr + off, count);
@@ -925,7 +926,7 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
         }
         /* For sending we only compute the wrong checksum instead
          * of corrupting the data so it is still correct on a redo */
-        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
+        if (opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OSC_CHECKSUM_SEND))
                 cksum++;
 
         return cksum;
@@ -949,6 +950,9 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         struct osc_brw_async_args *aa;
 
         ENTRY;
+        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM); /* Recoverable */
+        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ2, -EINVAL); /* Fatal */
+
         if ((cmd & OBD_BRW_WRITE) != 0) {
                 opc = OST_WRITE;
                 pool = cli->cl_import->imp_rq_pool;
@@ -967,7 +971,6 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         if (ocapa)
                 size[REQ_REC_OFF + 3] = sizeof(*capa);
 
-        OBD_FAIL_RETURN(OBD_FAIL_OSC_BRW_PREP_REQ, -ENOMEM);
         req = ptlrpc_prep_req_pool(cli->cl_import, LUSTRE_OST_VERSION, opc, 5,
                                    size, NULL, pool, NULL);
         if (req == NULL)
@@ -1049,7 +1052,8 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                 if (unlikely(cli->cl_checksum)) {
                         body->oa.o_valid |= OBD_MD_FLCKSUM;
                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
-                                                             page_count, pga);
+                                                             page_count, pga,
+                                                             OST_WRITE);
                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                                body->oa.o_cksum);
                         /* save this in 'oa', too, for later checking */
@@ -1076,7 +1080,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         aa->aa_requested_nob = requested_nob;
         aa->aa_nio_count = niocount;
         aa->aa_page_count = page_count;
-        aa->aa_retries = 5;     /*retry for checksum errors; lprocfs? */
+        aa->aa_resends = 0;
         aa->aa_ppga = pga;
         aa->aa_cli = cli;
         INIT_LIST_HEAD(&aa->aa_oaps);
@@ -1102,7 +1106,7 @@ static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                 return 0;
         }
 
-        new_cksum = osc_checksum_bulk(nob, page_count, pga);
+        new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE);
 
         if (new_cksum == server_cksum)
                 msg = "changed on the client after we checksummed it - "
@@ -1182,7 +1186,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                                                   aa->aa_ppga)))
                         RETURN(-EAGAIN);
 
-                sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk);
+                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
+                        RETURN(-EAGAIN);
 
                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                      aa->aa_page_count, aa->aa_ppga);
@@ -1205,6 +1210,10 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
         if (rc < aa->aa_requested_nob)
                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
 
+        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
+                                         aa->aa_ppga))
+                GOTO(out, rc = -EAGAIN);
+
         if (unlikely(body->oa.o_valid & OBD_MD_FLCKSUM)) {
                 static int cksum_counter;
                 __u32      server_cksum = body->oa.o_cksum;
@@ -1212,7 +1221,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                 char      *router;
 
                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
-                                                 aa->aa_ppga);
+                                                 aa->aa_ppga, OST_READ);
 
                 if (peer->nid == req->rq_bulk->bd_sender) {
                         via = router = "";
@@ -1269,8 +1278,6 @@ out:
         if (rc >= 0)
                 *aa->aa_oa = body->oa;
 
-        sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count, aa->aa_ppga);
-
         RETURN(rc);
 }
 
@@ -1280,9 +1287,15 @@ static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
                             struct obd_capa *ocapa)
 {
         struct ptlrpc_request *req;
-        int                    rc, retries = 5; /* lprocfs? */
+        int                    rc;
+        cfs_waitq_t            waitq;
+        int                    resends = 0;
+        struct l_wait_info     lwi;
+
         ENTRY;
 
+        cfs_waitq_init(&waitq);
+
 restart_bulk:
         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                   page_count, pga, &req, ocapa);
@@ -1300,59 +1313,76 @@ restart_bulk:
         rc = osc_brw_fini_request(req, rc);
 
         ptlrpc_req_finished(req);
-        if (rc == -EAGAIN) {
-                if (retries-- > 0)
-                        goto restart_bulk;
-                rc = -EIO;
+        if (osc_recoverable_error(rc)) {
+                resends++;
+                if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
+                        CERROR("too many resend retries, returning error\n");
+                        RETURN(-EIO);
+                }
+
+                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
+                l_wait_event(waitq, 0, &lwi);
+
+                goto restart_bulk;
         }
+        
         RETURN (rc);
 }
 
-int osc_brw_redo_request(struct ptlrpc_request *req,
+int osc_brw_redo_request(struct ptlrpc_request *request,
                          struct osc_brw_async_args *aa)
 {
         struct ptlrpc_request *new_req;
-        struct ptlrpc_request_set *set = req->rq_set;
+        struct ptlrpc_request_set *set = request->rq_set;
         struct osc_brw_async_args *new_aa;
         struct osc_async_page *oap;
         int rc = 0;
         ENTRY;
 
-        if (aa->aa_retries-- <= 0) {
-                CERROR("too many checksum retries, returning error\n");
+        if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
+                CERROR("too many resend retries, returning error\n");
                 RETURN(-EIO);
         }
+        
+        DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
+/*
+        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+        if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
+                ocapa = lustre_unpack_capa(request->rq_reqmsg,
+                                           REQ_REC_OFF + 3);
+*/
+        rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
+                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
+                                  aa->aa_cli, aa->aa_oa,
+                                  NULL /* lsm unused by osc currently */,
+                                  aa->aa_page_count, aa->aa_ppga, 
+                                  &new_req, NULL /* ocapa */);
+        if (rc)
+                RETURN(rc);
 
-        DEBUG_REQ(D_ERROR, req, "redo for checksum error");
+        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
+   
         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                 if (oap->oap_request != NULL) {
-                        LASSERTF(req == oap->oap_request,
+                        LASSERTF(request == oap->oap_request,
                                  "request %p != oap_request %p\n",
-                                 req, oap->oap_request);
+                                 request, oap->oap_request);
                         if (oap->oap_interrupted) {
-                                ptlrpc_mark_interrupted(oap->oap_request);
-                                rc = -EINTR;
-                                break;
+                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
+                                ptlrpc_req_finished(new_req);                        
+                                RETURN(-EINTR);
                         }
                 }
         }
-        if (rc)
-                RETURN(rc);
-        /* TODO-MERGE: and where to get ocapa?? */
-        rc = osc_brw_prep_request(lustre_msg_get_opc(req->rq_reqmsg) ==
-                                        OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
-                                  aa->aa_cli, aa->aa_oa,
-                                  NULL /* lsm unused by osc currently */,
-                                  aa->aa_page_count, aa->aa_ppga, &new_req,
-                                  NULL /* ocapa */);
-        if (rc)
-                RETURN(rc);
-
         /* New request takes over pga and oaps from old request.
          * Note that copying a list_head doesn't work, need to move it... */
-        new_req->rq_interpret_reply = req->rq_interpret_reply;
-        new_req->rq_async_args = req->rq_async_args;
+        aa->aa_resends++;
+        new_req->rq_interpret_reply = request->rq_interpret_reply;
+        new_req->rq_async_args = request->rq_async_args;
+        new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
+
         new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
+
         INIT_LIST_HEAD(&new_aa->aa_oaps);
         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
         INIT_LIST_HEAD(&aa->aa_oaps);
@@ -1363,6 +1393,9 @@ int osc_brw_redo_request(struct ptlrpc_request *req,
                         oap->oap_request = ptlrpc_request_addref(new_req);
                 }
         }
+        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
+
+        DEBUG_REQ(D_INFO, new_req, "new request");
 
         ptlrpc_set_add_req(set, new_req);
 
@@ -1377,7 +1410,7 @@ static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
         ENTRY;
 
         rc = osc_brw_fini_request(req, rc);
-        if (rc == -EAGAIN) {
+        if (osc_recoverable_error(rc)) {
                 rc = osc_brw_redo_request(req, aa);
                 if (rc == 0)
                         RETURN(0);
@@ -1385,10 +1418,14 @@ static int brw_interpret(struct ptlrpc_request *req, void *data, int rc)
         if ((rc >= 0) && req->rq_set && req->rq_set->set_countp)
                 atomic_add(nob, (atomic_t *)req->rq_set->set_countp);
 
-        spin_lock(&aa->aa_cli->cl_loi_list_lock);
+        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
+        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
+                aa->aa_cli->cl_w_in_flight--;
+        else
+                aa->aa_cli->cl_r_in_flight--;
         for (i = 0; i < aa->aa_page_count; i++)
                 osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
-        spin_unlock(&aa->aa_cli->cl_loi_list_lock);
+        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
 
         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
 
@@ -1403,6 +1440,7 @@ static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
         struct ptlrpc_request     *req;
         struct client_obd         *cli = &exp->exp_obd->u.cli;
         int                        rc, i;
+        struct osc_brw_async_args *aa;
         ENTRY;
 
         /* Consume write credits even if doing a sync write -
@@ -1418,14 +1456,33 @@ static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
 
         rc = osc_brw_prep_request(cmd, cli, oa, lsm, page_count, pga,
                                   &req, ocapa);
+
+        aa = (struct osc_brw_async_args *)&req->rq_async_args;
+        if (cmd == OBD_BRW_READ) {
+                lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
+                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
+                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
+        } else {
+                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
+                lprocfs_oh_tally(&cli->cl_write_rpc_hist,
+                                 cli->cl_w_in_flight);
+                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
+        }
+
         if (rc == 0) {
                 req->rq_interpret_reply = brw_interpret;
                 ptlrpc_set_add_req(set, req);
+                client_obd_list_lock(&cli->cl_loi_list_lock);
+                if (cmd == OBD_BRW_READ)
+                        cli->cl_r_in_flight++;
+                else
+                        cli->cl_w_in_flight++;
+                client_obd_list_unlock(&cli->cl_loi_list_lock);
         } else if (cmd == OBD_BRW_WRITE) {
-                spin_lock(&cli->cl_loi_list_lock);
+                client_obd_list_lock(&cli->cl_loi_list_lock);
                 for (i = 0; i < page_count; i++)
                         osc_release_write_grant(cli, pga[i], 0);
-                spin_unlock(&cli->cl_loi_list_lock);
+                client_obd_list_unlock(&cli->cl_loi_list_lock);
         }
         RETURN (rc);
 }
@@ -1792,7 +1849,7 @@ unlock:
  * the app does an fsync.  As long as errors persist we force future rpcs to be
  * sync so that the app can get a sync error and break the cycle of queueing
  * pages for which writeback will fail. */
-static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
+static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
                            int rc)
 {
         if (rc) {
@@ -1805,7 +1862,7 @@ static void osc_process_ar(struct osc_async_rc *ar, struct ptlrpc_request *req,
 
         }
 
-        if (ar->ar_force_sync && req && (ptlrpc_req_xid(req) >= ar->ar_min_xid))
+        if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
                 ar->ar_force_sync = 0;
 }
 
@@ -1829,18 +1886,21 @@ static void osc_oap_to_pending(struct osc_async_page *oap)
 static void osc_ap_completion(struct client_obd *cli, struct obdo *oa,
                               struct osc_async_page *oap, int sent, int rc)
 {
+        __u64 xid = 0;
+
         ENTRY;
+        if (oap->oap_request != NULL) {
+                xid = ptlrpc_req_xid(oap->oap_request);
+                ptlrpc_req_finished(oap->oap_request);
+                oap->oap_request = NULL;
+        }
+
         oap->oap_async_flags = 0;
         oap->oap_interrupted = 0;
 
         if (oap->oap_cmd & OBD_BRW_WRITE) {
-                osc_process_ar(&cli->cl_ar, oap->oap_request, rc);
-                osc_process_ar(&oap->oap_loi->loi_ar, oap->oap_request, rc);
-        }
-
-        if (oap->oap_request != NULL) {
-                ptlrpc_req_finished(oap->oap_request);
-                oap->oap_request = NULL;
+                osc_process_ar(&cli->cl_ar, xid, rc);
+                osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
         }
 
         if (rc == 0 && oa != NULL) {
@@ -1885,11 +1945,10 @@ static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
 
         rc = osc_brw_fini_request(req, rc);
         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
-        if (rc == -EAGAIN) {
+        if (osc_recoverable_error(rc)) {
                 rc = osc_brw_redo_request(req, aa);
                 if (rc == 0)
                         RETURN(0);
-                GOTO(out, rc);
         }
 
         cli = aa->aa_cli;
@@ -1917,8 +1976,7 @@ static int brw_interpret_oap(struct ptlrpc_request *req, void *data, int rc)
         client_obd_list_unlock(&cli->cl_loi_list_lock);
 
         OBDO_FREE(aa->aa_oa);
-        rc = 0;
-out:
+        
         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
         RETURN(rc);
 }
@@ -2708,8 +2766,7 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                 return;
         }
         lock_res_and_lock(lock);
-#ifdef __KERNEL__
-#ifdef __LINUX__
+#if defined (__KERNEL__) && defined (__LINUX__)
         /* Liang XXX: Darwin and Winnt checking should be added */
         if (lock->l_ast_data && lock->l_ast_data != data) {
                 struct inode *new_inode = data;
@@ -2724,7 +2781,6 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
                          new_inode, new_inode->i_ino, new_inode->i_generation);
         }
 #endif
-#endif
         lock->l_ast_data = data;
         lock->l_flags |= (flags & LDLM_FL_NO_LRU);
         unlock_res_and_lock(lock);
@@ -2755,7 +2811,7 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
                         struct ldlm_reply *rep;
 
                         /* swabbed by ldlm_cli_enqueue() */
-                        LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF);
+                        LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
                         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                              sizeof(*rep));
                         LASSERT(rep != NULL);
@@ -2779,7 +2835,7 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
 static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                  struct osc_enqueue_args *aa, int rc)
 {
-        int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
+        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
         struct ldlm_lock *lock;
 
@@ -2790,7 +2846,7 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req,
         /* Complete obtaining the lock procedure. */
         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                    aa->oa_ei->ei_mode,
-                                   &aa->oa_ei->ei_flags,
+                                   &aa->oa_oi->oi_flags,
                                    &lsm->lsm_oinfo[0]->loi_lvb,
                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                    lustre_swab_ost_lvb,
@@ -2817,13 +2873,15 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req,
  * is excluded from the cluster -- such scenarious make the life difficult, so
  * release locks just after they are obtained. */
 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
-                       struct obd_enqueue_info *einfo)
+                       struct ldlm_enqueue_info *einfo,
+                       struct ptlrpc_request_set *rqset)
 {
         struct ldlm_res_id res_id = { .name = {0} };
         struct obd_device *obd = exp->exp_obd;
         struct ldlm_reply *rep;
         struct ptlrpc_request *req = NULL;
-        int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
+        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
+        ldlm_mode_t mode;
         int rc;
         ENTRY;
 
@@ -2840,13 +2898,31 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 goto no_match;
 
         /* Next, search for already existing extent locks that will cover us */
-        rc = ldlm_lock_match(obd->obd_namespace,
-                             einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
-                             einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
-                             oinfo->oi_lockh);
-        if (rc == 1) {
+        /* If we're trying to read, we also search for an existing PW lock.  The
+         * VFS and page cache already protect us locally, so lots of readers/
+         * writers can share a single PW lock.
+         *
+         * There are problems with conversion deadlocks, so instead of
+         * converting a read lock to a write lock, we'll just enqueue a new
+         * one.
+         *
+         * At some point we should cancel the read lock instead of making them
+         * send us a blocking callback, but there are problems with canceling
+         * locks out from other users right now, too. */
+        mode = einfo->ei_mode;
+        if (einfo->ei_mode == LCK_PR)
+                mode |= LCK_PW;
+        mode = ldlm_lock_match(obd->obd_namespace,
+                               oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
+                               einfo->ei_type, &oinfo->oi_policy, mode,
+                               oinfo->oi_lockh);
+        if (mode) {
+                /* addref the lock only if not async requests and PW lock is
+                 * matched whereas we asked for PR. */
+                if (!rqset && einfo->ei_mode != mode)
+                        ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
-                                        einfo->ei_flags);
+                                        oinfo->oi_flags);
                 if (intent) {
                         /* I would like to be able to ASSERT here that rss <=
                          * kms, but I can't, for reasons which are explained in
@@ -2857,45 +2933,14 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
 
                 /* For async requests, decref the lock. */
-                if (einfo->ei_rqset)
+                if (einfo->ei_mode != mode)
+                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
+                else if (rqset)
                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
 
                 RETURN(ELDLM_OK);
         }
 
-        /* If we're trying to read, we also search for an existing PW lock.  The
-         * VFS and page cache already protect us locally, so lots of readers/
-         * writers can share a single PW lock.
-         *
-         * There are problems with conversion deadlocks, so instead of
-         * converting a read lock to a write lock, we'll just enqueue a new
-         * one.
-         *
-         * At some point we should cancel the read lock instead of making them
-         * send us a blocking callback, but there are problems with canceling
-         * locks out from other users right now, too. */
-
-        if (einfo->ei_mode == LCK_PR) {
-                rc = ldlm_lock_match(obd->obd_namespace,
-                                     einfo->ei_flags | LDLM_FL_LVB_READY,
-                                     &res_id, einfo->ei_type, &oinfo->oi_policy,
-                                     LCK_PW, oinfo->oi_lockh);
-                if (rc == 1) {
-                        /* FIXME: This is not incredibly elegant, but it might
-                         * be more elegant than adding another parameter to
-                         * lock_match.  I want a second opinion. */
-                        /* addref the lock only if not async requests. */
-                        if (!einfo->ei_rqset)
-                                ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
-                        osc_set_data_with_check(oinfo->oi_lockh,
-                                                einfo->ei_cbdata,
-                                                einfo->ei_flags);
-                        oinfo->oi_cb_up(oinfo, ELDLM_OK);
-                        ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
-                        RETURN(ELDLM_OK);
-                }
-        }
-
  no_match:
         if (intent) {
                 int size[3] = {
@@ -2914,18 +2959,15 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
         }
 
         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
-        einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;
+        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
 
-        rc = ldlm_cli_enqueue(exp, &req, &res_id, einfo->ei_type,
-                              &oinfo->oi_policy, einfo->ei_mode,
-                              &einfo->ei_flags, einfo->ei_cb_bl,
-                              einfo->ei_cb_cp, einfo->ei_cb_gl,
-                              einfo->ei_cbdata,
+        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
+                              &oinfo->oi_policy, &oinfo->oi_flags,
                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                               lustre_swab_ost_lvb, oinfo->oi_lockh,
-                              einfo->ei_rqset ? 1 : 0);
-        if (einfo->ei_rqset) {
+                              rqset ? 1 : 0);
+        if (rqset) {
                 if (!rc) {
                         struct osc_enqueue_args *aa;
                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
@@ -2935,7 +2977,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                         aa->oa_exp = exp;
 
                         req->rq_interpret_reply = osc_enqueue_interpret;
-                        ptlrpc_set_add_req(einfo->ei_rqset, req);
+                        ptlrpc_set_add_req(rqset, req);
                 } else if (intent) {
                         ptlrpc_req_finished(req);
                 }
@@ -2955,8 +2997,8 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
 {
         struct ldlm_res_id res_id = { .name = {0} };
         struct obd_device *obd = exp->exp_obd;
-        int rc;
         int lflags = *flags;
+        ldlm_mode_t rc;
         ENTRY;
 
         res_id.name[0] = lsm->lsm_object_id;
@@ -2970,28 +3012,21 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
         policy->l_extent.end |= ~CFS_PAGE_MASK;
 
         /* Next, search for already existing extent locks that will cover us */
-        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
-                             &res_id, type, policy, mode, lockh);
-        if (rc) {
-                //if (!(*flags & LDLM_FL_TEST_LOCK))
-                        osc_set_data_with_check(lockh, data, lflags);
-                RETURN(rc);
-        }
         /* If we're trying to read, we also search for an existing PW lock.  The
          * VFS and page cache already protect us locally, so lots of readers/
          * writers can share a single PW lock. */
-        if (mode == LCK_PR) {
-                rc = ldlm_lock_match(obd->obd_namespace,
-                                     lflags | LDLM_FL_LVB_READY, &res_id,
-                                     type, policy, LCK_PW, lockh);
-                if (rc == 1 && !(lflags & LDLM_FL_TEST_LOCK)) {
-                        /* FIXME: This is not incredibly elegant, but it might
-                         * be more elegant than adding another parameter to
-                         * lock_match.  I want a second opinion. */
-                        osc_set_data_with_check(lockh, data, lflags);
+        rc = mode;
+        if (mode == LCK_PR)
+                rc |= LCK_PW;
+        rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
+                             &res_id, type, policy, rc, lockh);
+        if (rc) {
+                osc_set_data_with_check(lockh, data, lflags);
+                if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
                         ldlm_lock_addref(lockh, LCK_PR);
                         ldlm_lock_decref(lockh, LCK_PW);
                 }
+                RETURN(rc);
         }
         RETURN(rc);
 }
@@ -3341,7 +3376,7 @@ static int osc_setinfo_mds_conn_interpret(struct ptlrpc_request *req,
         imp->imp_server_timeout = 1;
         imp->imp_pingable = 1;
         spin_unlock(&imp->imp_lock);
-        CDEBUG(D_HA, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
+        CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
 
         RETURN(rc);
 }
@@ -3417,7 +3452,7 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        if (KEY_IS("mds_conn")) {
+        if (KEY_IS(KEY_MDS_CONN)) {
                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
 
                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
@@ -3744,6 +3779,7 @@ struct obd_ops osc_obd_ops = {
         .o_statfs_async         = osc_statfs_async,
         .o_packmd               = osc_packmd,
         .o_unpackmd             = osc_unpackmd,
+        .o_precreate            = osc_precreate,
         .o_create               = osc_create,
         .o_destroy              = osc_destroy,
         .o_getattr              = osc_getattr,
@@ -3774,7 +3810,6 @@ struct obd_ops osc_obd_ops = {
         .o_llog_finish          = osc_llog_finish,
         .o_process_config       = osc_process_config,
 };
-
 int __init osc_init(void)
 {
         struct lprocfs_static_vars lvars;