Whamcloud - gitweb
Land b1_8_gate onto b1_8 (20081218_1708)
[fs/lustre-release.git] / lustre / osc / osc_request.c
index a4f7394..6654809 100644 (file)
@@ -1,32 +1,37 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
- *   Author Peter Braam <braam@clusterfs.com>
+ * GPL HEADER START
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   You may have signed or agreed to another license before downloading
- *   this software.  If so, you are bound by the terms and conditions
- *   of that agreement, and the following does not apply to you.  See the
- *   LICENSE file included with this distribution for more information.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   If you did not agree to a different license, then this copy of Lustre
- *   is open source software; you can redistribute it and/or modify it
- *   under the terms of version 2 of the GNU General Public License as
- *   published by the Free Software Foundation.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   In either case, Lustre is distributed in the hope that it will be
- *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   license text for more details.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
- *  For testing and management it is treated as an obd_device,
- *  although * it does not export a full OBD method table (the
- *  requests are coming * in over the wire, so object target modules
- *  do not have a full * method table.)
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
  *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
 #ifndef EXPORT_SYMTAB
@@ -70,7 +75,7 @@ static quota_interface_t *quota_interface;
 extern quota_interface_t osc_quota_interface;
 
 /* by default 10s */
-atomic_t osc_resend_time; 
+atomic_t osc_resend_time;
 
 /* Pack OSC object metadata for disk storage (LE byte order). */
 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
@@ -191,7 +196,7 @@ static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
 {
         struct ptlrpc_request *req;
         struct ost_body *body;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         struct osc_async_args *aa;
         ENTRY;
 
@@ -207,7 +212,7 @@ static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
         req->rq_interpret_reply = osc_getattr_interpret;
 
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = (struct osc_async_args *)&req->rq_async_args;
+        aa = ptlrpc_req_async_args(req);
         aa->aa_oi = oinfo;
 
         ptlrpc_set_add_req(set, req);
@@ -218,7 +223,8 @@ static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
 {
         struct ptlrpc_request *req;
         struct ost_body *body;
-        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        int rc;
         ENTRY;
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
@@ -262,7 +268,8 @@ static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
 {
         struct ptlrpc_request *req;
         struct ost_body *body;
-        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        int rc;
         ENTRY;
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
@@ -320,12 +327,17 @@ static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
 {
         struct ptlrpc_request *req;
         struct ost_body *body;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
+        int bufcount = 2;
         struct osc_async_args *aa;
         ENTRY;
 
+        if (osc_exp_is_2_0_server(exp)) {
+                bufcount = 3;
+        }
+
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
-                              OST_SETATTR, 2, size, NULL);
+                              OST_SETATTR, bufcount, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -346,7 +358,7 @@ static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
                 req->rq_interpret_reply = osc_setattr_interpret;
 
                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-                aa = (struct osc_async_args *)&req->rq_async_args;
+                aa = ptlrpc_req_async_args(req);
                 aa->aa_oi = oinfo;
 
                 ptlrpc_set_add_req(rqset, req);
@@ -361,7 +373,8 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
         struct ptlrpc_request *req;
         struct ost_body *body;
         struct lov_stripe_md *lsm;
-        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        int rc;
         ENTRY;
 
         LASSERT(oa);
@@ -464,7 +477,7 @@ static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
         struct ptlrpc_request *req;
         struct osc_async_args *aa;
         struct ost_body *body;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         ENTRY;
 
         if (!oinfo->oi_oa) {
@@ -492,22 +505,46 @@ static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
 
         req->rq_interpret_reply = osc_punch_interpret;
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = (struct osc_async_args *)&req->rq_async_args;
+        aa = ptlrpc_req_async_args(req);
         aa->aa_oi = oinfo;
         ptlrpc_set_add_req(rqset, req);
 
         RETURN(0);
 }
 
-static int osc_sync(struct obd_export *exp, struct obdo *oa,
-                    struct lov_stripe_md *md, obd_size start, obd_size end)
+static int osc_sync_interpret(struct ptlrpc_request *req,
+                              struct osc_async_args *aa, int rc)
+{
+        struct ost_body *body;
+        ENTRY;
+
+        if (rc)
+                GOTO(out, rc);
+
+        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
+                                  lustre_swab_ost_body);
+        if (body == NULL) {
+                CERROR ("can't unpack ost_body\n");
+                GOTO(out, rc = -EPROTO);
+        }
+
+        *aa->aa_oi->oi_oa = body->oa;
+out:
+        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
+        RETURN(rc);
+}
+
+static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
+                    obd_size start, obd_size end,
+                    struct ptlrpc_request_set *set)
 {
         struct ptlrpc_request *req;
         struct ost_body *body;
-        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        struct osc_async_args *aa;
         ENTRY;
 
-        if (!oa) {
+        if (!oinfo->oi_oa) {
                 CERROR("oa NULL\n");
                 RETURN(-EINVAL);
         }
@@ -518,7 +555,7 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa,
                 RETURN(-ENOMEM);
 
         body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
-        memcpy(&body->oa, oa, sizeof(*oa));
+        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));
 
         /* overload the size and blocks fields in the oa with start/end */
         body->oa.o_size = start;
@@ -526,39 +563,31 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa,
         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
 
         ptlrpc_req_set_repsize(req, 2, size);
+        req->rq_interpret_reply = osc_sync_interpret;
 
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                GOTO(out, rc);
-
-        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
-                                  lustre_swab_ost_body);
-        if (body == NULL) {
-                CERROR ("can't unpack ost_body\n");
-                GOTO (out, rc = -EPROTO);
-        }
-
-        memcpy(oa, &body->oa, sizeof(*oa));
+        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+        aa = ptlrpc_req_async_args(req);
+        aa->aa_oi = oinfo;
 
-        EXIT;
- out:
-        ptlrpc_req_finished(req);
-        return rc;
+        ptlrpc_set_add_req(set, req);
+        RETURN (0);
 }
 
 /* Find and cancel locally locks matched by @mode in the resource found by
  * @objid. Found locks are added into @cancel list. Returns the amount of
  * locks added to @cancels list. */
-static int osc_resource_get_unused(struct obd_export *exp, __u64 objid,
+static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                    struct list_head *cancels, ldlm_mode_t mode,
                                    int lock_flags)
 {
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
-        struct ldlm_res_id res_id = { .name = { objid } };
-        struct ldlm_resource *res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
+        struct ldlm_res_id res_id;
+        struct ldlm_resource *res;
         int count;
         ENTRY;
 
+        osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
+        res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
         if (res == NULL)
                 RETURN(0);
 
@@ -613,7 +642,7 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         CFS_LIST_HEAD(cancels);
         struct ptlrpc_request *req;
         struct ost_body *body;
-        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
+        __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),
                         sizeof(struct ldlm_request) };
         int count, bufcount = 2;
         struct client_obd *cli = &exp->exp_obd->u.cli;
@@ -626,7 +655,7 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
 
         LASSERT(oa->o_id != 0);
 
-        count = osc_resource_get_unused(exp, oa->o_id, &cancels, LCK_PW,
+        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                         LDLM_FL_DISCARD_DATA);
         if (exp_connect_cancelset(exp))
                 bufcount = 3;
@@ -835,7 +864,7 @@ static void handle_short_read(int nob_read, obd_count page_count,
 
                 if (pga[i]->count > nob_read) {
                         /* EOF inside this page */
-                        ptr = cfs_kmap(pga[i]->pg) + 
+                        ptr = cfs_kmap(pga[i]->pg) +
                                 (pga[i]->off & ~CFS_PAGE_MASK);
                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                         cfs_kunmap(pga[i]->pg);
@@ -888,7 +917,7 @@ static int check_write_rcs(struct ptlrpc_request *req,
 
         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
-                       requested_nob, req->rq_bulk->bd_nob_transferred);
+                       req->rq_bulk->bd_nob_transferred, requested_nob);
                 return(-EPROTO);
         }
 
@@ -957,7 +986,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         struct ost_body         *body;
         struct obd_ioobj        *ioobj;
         struct niobuf_remote    *niobuf;
-        int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+        __u32 size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         int niocount, i, requested_nob, opc, rc;
         struct ptlrpc_request_pool *pool;
         struct osc_brw_async_args *aa;
@@ -1048,8 +1077,8 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         LASSERTF((void *)(niobuf - niocount) ==
                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
                                niocount * sizeof(*niobuf)),
-                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg, 
-                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)), 
+                "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
+                REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
                 (void *)(niobuf - niocount));
 
         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
@@ -1095,7 +1124,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         }
 
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = (struct osc_brw_async_args *)&req->rq_async_args;
+        aa = ptlrpc_req_async_args(req);
         aa->aa_oa = oa;
         aa->aa_requested_nob = requested_nob;
         aa->aa_nio_count = niocount;
@@ -1152,7 +1181,7 @@ static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
                            "["LPU64"-"LPU64"]\n",
                            msg, libcfs_nid2str(peer->nid),
                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
-                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation : 
+                           oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
                                                         (__u64)0,
                            oa->o_id,
                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
@@ -1168,7 +1197,7 @@ static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
 /* Note rc enters this function as number of bytes transferred */
 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 {
-        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
+        struct osc_brw_async_args *aa = ptlrpc_req_async_args(req);
         const lnet_process_id_t *peer =
                         &req->rq_import->imp_connection->c_peer;
         struct client_obd *cli = aa->aa_cli;
@@ -1262,7 +1291,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                 if (server_cksum == ~0 && rc > 0) {
                         CERROR("Protocol error: server %s set the 'checksum' "
                                "bit, but didn't send a checksum.  Not fatal, "
-                               "but please tell CFS.\n",
+                               "but please notify on http://bugzilla.lustre.org/\n",
                                libcfs_nid2str(peer->nid));
                 } else if (server_cksum != client_cksum) {
                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
@@ -1401,7 +1430,7 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
         new_req->rq_async_args = request->rq_async_args;
         new_req->rq_sent = CURRENT_SECONDS + aa->aa_resends;
 
-        new_aa = (struct osc_brw_async_args *)&new_req->rq_async_args;
+        new_aa = ptlrpc_req_async_args(new_req);
 
         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
@@ -1414,9 +1443,9 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
                 }
         }
 
-        /* use ptlrpc_set_add_req is safe because interpret functions work 
-         * in check_set context. only one way exist with access to request 
-         * from different thread got -EINTR - this way protected with 
+        /* use ptlrpc_set_add_req is safe because interpret functions work
+         * in check_set context. only one way exist with access to request
+         * from different thread got -EINTR - this way protected with
          * cl_loi_list_lock */
         ptlrpc_set_add_req(set, new_req);
 
@@ -1450,17 +1479,18 @@ static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa,
         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
                                   page_count, pga, &request);
 
-        aa = (struct osc_brw_async_args *)&request->rq_async_args;
+        CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
+        aa = ptlrpc_req_async_args(request);
         if (cmd == OBD_BRW_READ) {
                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
-                ptlrpc_lprocfs_brw(request, OST_READ, aa->aa_requested_nob);
         } else {
                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                  cli->cl_w_in_flight);
-                ptlrpc_lprocfs_brw(request, OST_WRITE, aa->aa_requested_nob);
         }
+        ptlrpc_lprocfs_brw(request, aa->aa_requested_nob);
+
         LASSERT(list_empty(&aa->aa_oaps));
 
         if (rc == 0) {
@@ -1762,6 +1792,25 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
         RETURN(0);
 }
 
+static int lop_makes_hprpc(struct loi_oap_pages *lop)
+{
+        struct osc_async_page *oap;
+        ENTRY;
+
+        if (list_empty(&lop->lop_urgent))
+                RETURN(0);
+
+        oap = list_entry(lop->lop_urgent.next,
+                         struct osc_async_page, oap_urgent_item);
+
+        if (oap->oap_async_flags & ASYNC_HP) {
+                CDEBUG(D_CACHE, "hp request forcing RPC\n");
+                RETURN(1);
+        }
+
+        RETURN(0);
+}
+
 static void on_list(struct list_head *item, struct list_head *list,
                     int should_be_on)
 {
@@ -1775,9 +1824,17 @@ static void on_list(struct list_head *item, struct list_head *list,
  * can find pages to build into rpcs quickly */
 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
 {
-        on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
-                lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
-                lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+        if (lop_makes_hprpc(&loi->loi_write_lop) ||
+            lop_makes_hprpc(&loi->loi_read_lop)) {
+                /* HP rpc */
+                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
+                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
+        } else {
+                on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
+                on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
+                        lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
+                        lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+        }
 
         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
                 loi->loi_write_lop.lop_num_pending);
@@ -1873,8 +1930,10 @@ static void osc_oap_to_pending(struct osc_async_page *oap)
         else
                 lop = &oap->oap_loi->loi_read_lop;
 
-        if (oap->oap_async_flags & ASYNC_URGENT)
+        if (oap->oap_async_flags & ASYNC_HP)
                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
+        else if (oap->oap_async_flags & ASYNC_URGENT)
+                list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
 }
@@ -1993,6 +2052,7 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
         void *caller_data = NULL;
         struct osc_async_page *oap;
         struct ldlm_lock *lock = NULL;
+        obd_valid valid;
         int i, rc;
 
         ENTRY;
@@ -2034,17 +2094,35 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                 CERROR("prep_req failed: %d\n", rc);
                 GOTO(out, req = ERR_PTR(rc));
         }
+        oa = &((struct ost_body *)lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
+                                                 sizeof(struct ost_body)))->oa;
 
         /* Need to update the timestamps after the request is built in case
          * we race with setattr (locally or in queue at OST).  If OST gets
          * later setattr before earlier BRW (as determined by the request xid),
          * the OST will not use BRW timestamps.  Sadly, there is no obvious
          * way to do this in a single call.  bug 10150 */
-        ops->ap_update_obdo(caller_data, cmd, oa,
-                            OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);
+        if (pga[0]->flag & OBD_BRW_SRVLOCK) {
+                /* in case of lockless read/write do not use inode's
+                 * timestamps because concurrent stat might fill the
+                 * inode with out-of-date times, send current
+                 * instead */
+                if (cmd & OBD_BRW_WRITE) {
+                        oa->o_mtime = oa->o_ctime = LTIME_S(CURRENT_TIME);
+                        oa->o_valid |= OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+                        valid = OBD_MD_FLATIME;
+                } else {
+                        oa->o_atime = LTIME_S(CURRENT_TIME);
+                        oa->o_valid |= OBD_MD_FLATIME;
+                        valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+                }
+        } else {
+                valid = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
+        }
+        ops->ap_update_obdo(caller_data, cmd, oa, valid);
 
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = (struct osc_brw_async_args *)&req->rq_async_args;
+        aa = ptlrpc_req_async_args(req);
         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
         list_splice(rpc_list, &aa->aa_oaps);
         CFS_INIT_LIST_HEAD(rpc_list);
@@ -2086,6 +2164,15 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
         int srvlock = 0;
         ENTRY;
 
+        /* If there are HP OAPs we need to handle at least 1 of them,
+         * move it the beginning of the pending list for that. */
+        if (!list_empty(&lop->lop_urgent)) {
+                oap = list_entry(lop->lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                if (oap->oap_async_flags & ASYNC_HP)
+                        list_move(&oap->oap_pending_item, &lop->lop_pending);
+        }
+
         /* first we find the pages we're allowed to work with */
         list_for_each_entry_safe(oap, tmp, &lop->lop_pending, oap_pending_item){
                 ops = oap->oap_caller_ops;
@@ -2234,21 +2321,20 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                 RETURN(PTR_ERR(req));
         }
 
-        aa = (struct osc_brw_async_args *)&req->rq_async_args;
+        aa = ptlrpc_req_async_args(req);
         if (cmd == OBD_BRW_READ) {
                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
-                ptlrpc_lprocfs_brw(req, OST_READ, aa->aa_requested_nob);
         } else {
                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
                                  cli->cl_w_in_flight);
                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
-                ptlrpc_lprocfs_brw(req, OST_WRITE, aa->aa_requested_nob);
         }
+        ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
 
         client_obd_list_lock(&cli->cl_loi_list_lock);
 
@@ -2283,7 +2369,8 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
 
 #define LOI_DEBUG(LOI, STR, args...)                                     \
         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
-               !list_empty(&(LOI)->loi_cli_item),                        \
+               !list_empty(&(LOI)->loi_ready_item) ||                    \
+               !list_empty(&(LOI)->loi_hp_ready_item),                   \
                (LOI)->loi_write_lop.lop_num_pending,                     \
                !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
                (LOI)->loi_read_lop.lop_num_pending,                      \
@@ -2295,11 +2382,15 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
 {
         ENTRY;
-        /* first return all objects which we already know to have
-         * pages ready to be stuffed into rpcs */
+        /* First return objects that have blocked locks so that they
+         * will be flushed quickly and other clients can get the lock,
+         * then objects which have pages ready to be stuffed into RPCs */
+        if (!list_empty(&cli->cl_loi_hp_ready_list))
+                RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
+                                  struct lov_oinfo, loi_hp_ready_item));
         if (!list_empty(&cli->cl_loi_ready_list))
                 RETURN(list_entry(cli->cl_loi_ready_list.next,
-                                  struct lov_oinfo, loi_cli_item));
+                                  struct lov_oinfo, loi_ready_item));
 
         /* then if we have cache waiters, return all objects with queued
          * writes.  This is especially important when many small files
@@ -2323,6 +2414,26 @@ struct lov_oinfo *osc_next_loi(struct client_obd *cli)
         RETURN(NULL);
 }
 
+static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
+{
+        struct osc_async_page *oap;
+        int hprpc = 0;
+
+        if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
+                oap = list_entry(loi->loi_write_lop.lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+        }
+
+        if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
+                oap = list_entry(loi->loi_write_lop.lop_urgent.next,
+                                 struct osc_async_page, oap_urgent_item);
+                hprpc = !!(oap->oap_async_flags & ASYNC_HP);
+        }
+
+        return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
+}
+
 /* called with the loi list lock held */
 static void osc_check_rpcs(struct client_obd *cli)
 {
@@ -2333,7 +2444,7 @@ static void osc_check_rpcs(struct client_obd *cli)
         while ((loi = osc_next_loi(cli)) != NULL) {
                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
 
-                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
+                if (osc_max_rpc_in_flight(cli, loi))
                         break;
 
                 /* attempt some read/write balancing by alternating between
@@ -2365,8 +2476,10 @@ static void osc_check_rpcs(struct client_obd *cli)
 
                 /* attempt some inter-object balancing by issueing rpcs
                  * for each object in turn */
-                if (!list_empty(&loi->loi_cli_item))
-                        list_del_init(&loi->loi_cli_item);
+                if (!list_empty(&loi->loi_hp_ready_item))
+                        list_del_init(&loi->loi_hp_ready_item);
+                if (!list_empty(&loi->loi_ready_item))
+                        list_del_init(&loi->loi_ready_item);
                 if (!list_empty(&loi->loi_write_item))
                         list_del_init(&loi->loi_write_item);
                 if (!list_empty(&loi->loi_read_item))
@@ -2523,9 +2636,9 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         spin_lock_init(&oap->oap_lock);
 
-        /* If the page was marked as notcacheable - don't add to any locks */ 
+        /* If the page was marked as notcacheable - don't add to any locks */
         if (!nocache) {
-                oid.name[0] = loi->loi_id;
+                osc_build_res_name(loi->loi_id, loi->loi_gr, &oid);
                 /* This is the only place where we can call cache_add_extent
                    without oap_lock, because this page is locked now, and
                    the lock we are adding it to is referenced, so cannot lose
@@ -2570,7 +2683,6 @@ static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                 RETURN(-EBUSY);
 
         /* check if the file's owner/group is over quota */
-#ifdef HAVE_QUOTA_SUPPORT
         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)){
                 struct obd_async_page_ops *ops;
                 struct obdo *oa;
@@ -2589,7 +2701,6 @@ static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm,
                 if (rc)
                         RETURN(rc);
         }
-#endif
 
         if (loi == NULL)
                 loi = lsm->lsm_oinfo[0];
@@ -2673,11 +2784,14 @@ static int osc_set_async_flags(struct obd_export *exp,
         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
                 oap->oap_async_flags |= ASYNC_READY;
 
-        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
-                if (list_empty(&oap->oap_rpc_item)) {
+        if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
+            list_empty(&oap->oap_rpc_item)) {
+                if (oap->oap_async_flags & ASYNC_HP)
                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
-                        loi_list_maint(cli, loi);
-                }
+                else
+                        list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
+                oap->oap_async_flags |= ASYNC_URGENT;
+                loi_list_maint(cli, loi);
         }
 
         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
@@ -2812,8 +2926,9 @@ static int osc_teardown_async_page(struct obd_export *exp,
 
         if (!list_empty(&oap->oap_urgent_item)) {
                 list_del_init(&oap->oap_urgent_item);
-                oap->oap_async_flags &= ~ASYNC_URGENT;
+                oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
         }
+
         if (!list_empty(&oap->oap_pending_item)) {
                 list_del_init(&oap->oap_pending_item);
                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
@@ -2833,12 +2948,12 @@ int osc_extent_blocking_cb(struct ldlm_lock *lock,
 {
         struct lustre_handle lockh = { 0 };
         int rc;
-        ENTRY;  
-                
+        ENTRY;
+
         if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                 LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
-                LBUG(); 
-        }       
+                LBUG();
+        }
 
         switch (flag) {
         case LDLM_CB_BLOCKING:
@@ -2904,9 +3019,10 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data,
 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
                              ldlm_iterator_t replace, void *data)
 {
-        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
+        struct ldlm_res_id res_id;
         struct obd_device *obd = class_exp2obd(exp);
 
+        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
         return 0;
 }
@@ -2990,7 +3106,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                        struct ldlm_enqueue_info *einfo,
                        struct ptlrpc_request_set *rqset)
 {
-        struct ldlm_res_id res_id = { .name = {oinfo->oi_md->lsm_object_id} };
+        struct ldlm_res_id res_id;
         struct obd_device *obd = exp->exp_obd;
         struct ldlm_reply *rep;
         struct ptlrpc_request *req = NULL;
@@ -2999,6 +3115,8 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
         int rc;
         ENTRY;
 
+        osc_build_res_name(oinfo->oi_md->lsm_object_id,
+                           oinfo->oi_md->lsm_object_gr, &res_id);
         /* Filesystem lock extents are extended to page boundaries so that
          * dealing with the page cache is a little smoother.  */
         oinfo->oi_policy.l_extent.start -=
@@ -3054,7 +3172,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
 
  no_match:
         if (intent) {
-                int size[3] = {
+                __u32 size[3] = {
                         [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                         [DLM_LOCKREQ_OFF + 1] = 0 };
@@ -3064,7 +3182,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                         RETURN(-ENOMEM);
 
                 size[DLM_LOCKREPLY_OFF] = sizeof(*rep);
-                size[DLM_REPLY_REC_OFF] = 
+                size[DLM_REPLY_REC_OFF] =
                         sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb);
                 ptlrpc_req_set_repsize(req, 3, size);
         }
@@ -3082,7 +3200,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 if (!rc) {
                         struct osc_enqueue_args *aa;
                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-                        aa = (struct osc_enqueue_args *)&req->rq_async_args;
+                        aa = ptlrpc_req_async_args(req);
                         aa->oa_oi = oinfo;
                         aa->oa_ei = einfo;
                         aa->oa_exp = exp;
@@ -3106,12 +3224,14 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                      __u32 type, ldlm_policy_data_t *policy, __u32 mode,
                      int *flags, void *data, struct lustre_handle *lockh)
 {
-        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
+        struct ldlm_res_id res_id;
         struct obd_device *obd = exp->exp_obd;
         int lflags = *flags;
         ldlm_mode_t rc;
         ENTRY;
 
+        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
+
         OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO);
 
         /* Filesystem lock extents are extended to page boundaries so that
@@ -3157,19 +3277,30 @@ static int osc_cancel_unused(struct obd_export *exp,
                              struct lov_stripe_md *lsm, int flags, void *opaque)
 {
         struct obd_device *obd = class_exp2obd(exp);
-        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
+        struct ldlm_res_id res_id, *resp = NULL;
+
+        if (lsm != NULL) {
+                resp = osc_build_res_name(lsm->lsm_object_id,
+                                          lsm->lsm_object_gr, &res_id);
+        }
+
+        return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
 
-        return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags,
-                                      opaque);
 }
 
 static int osc_join_lru(struct obd_export *exp,
                         struct lov_stripe_md *lsm, int join)
 {
         struct obd_device *obd = class_exp2obd(exp);
-        struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} };
+        struct ldlm_res_id res_id, *resp = NULL;
+
+        if (lsm != NULL) {
+                resp = osc_build_res_name(lsm->lsm_object_id,
+                                          lsm->lsm_object_gr, &res_id);
+        }
+
+        return ldlm_cli_join_lru(obd->obd_namespace, resp, join);
 
-        return ldlm_cli_join_lru(obd->obd_namespace, &res_id, join);
 }
 
 static int osc_statfs_interpret(struct ptlrpc_request *req,
@@ -3199,7 +3330,7 @@ static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
 {
         struct ptlrpc_request *req;
         struct osc_async_args *aa;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
+        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*oinfo->oi_osfs) };
         ENTRY;
 
         /* We could possibly pass max_age in the request (as an absolute
@@ -3224,7 +3355,7 @@ static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
 
         req->rq_interpret_reply = osc_statfs_interpret;
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = (struct osc_async_args *)&req->rq_async_args;
+        aa = ptlrpc_req_async_args(req);
         aa->aa_oi = oinfo;
 
         ptlrpc_set_add_req(rqset, req);
@@ -3237,10 +3368,11 @@ static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
         struct obd_statfs *msfs;
         struct ptlrpc_request *req;
         struct obd_import     *imp = NULL;
-        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
+        __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*osfs) };
+        int rc;
         ENTRY;
 
-        /*Since the request might also come from lprocfs, so we need 
+        /*Since the request might also come from lprocfs, so we need
          *sync this with client_disconnect_export Bug15684*/
         down_read(&obd->u.cli.cl_sem);
         if (obd->u.cli.cl_import)
@@ -3248,7 +3380,7 @@ static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
         up_read(&obd->u.cli.cl_sem);
         if (!imp)
                 RETURN(-ENODEV);
-  
+
         /* We could possibly pass max_age in the request (as an absolute
          * timestamp or a "seconds.usec ago") so the target can avoid doing
          * extra calls into the filesystem if that isn't necessary (e.g.
@@ -3295,32 +3427,48 @@ static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
  *
  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
  * the maximum number of OST indices which will fit in the user buffer.
- * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
+ * lmm_magic must be LOV_MAGIC_V1 or LOV_MAGIC_V3 (we only use 1 slot here).
  */
 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
 {
-        struct lov_user_md lum, *lumk;
+        /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
+        struct lov_user_md_v3 lum, *lumk;
         int rc = 0, lum_size;
+        struct lov_user_ost_data_v1 *lmm_objects;
         ENTRY;
 
         if (!lsm)
                 RETURN(-ENODATA);
 
-        if (copy_from_user(&lum, lump, sizeof(lum)))
+        /* we only need the header part from user space to get lmm_magic and
+         * lmm_stripe_count, (the header part is common to v1 and v3) */
+        lum_size = sizeof(struct lov_user_md_v1);
+        if (copy_from_user(&lum, lump, lum_size))
                 RETURN(-EFAULT);
 
-        if (lum.lmm_magic != LOV_USER_MAGIC)
+        if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
+            (lum.lmm_magic != LOV_USER_MAGIC_V3))
                 RETURN(-EINVAL);
 
+        /* lov_user_md_vX and lov_mds_md_vX must have the same size */
+        LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
+        LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
+        LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
+
+        /* we can use lov_mds_md_size() to compute lum_size
+         * because lov_user_md_vX and lov_mds_md_vX have the same size */
         if (lum.lmm_stripe_count > 0) {
-                lum_size = sizeof(lum) + sizeof(lum.lmm_objects[0]);
+                lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
                 OBD_ALLOC(lumk, lum_size);
                 if (!lumk)
                         RETURN(-ENOMEM);
-
-                lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id;
+                if (lum.lmm_magic == LOV_USER_MAGIC_V1)
+                        lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
+                else
+                        lmm_objects = &(lumk->lmm_objects[0]);
+                lmm_objects->l_object_id = lsm->lsm_object_id;
         } else {
-                lum_size = sizeof(lum);
+                lum_size = lov_mds_md_size(0, lum.lmm_magic);
                 lumk = &lum;
         }
 
@@ -3345,14 +3493,10 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         int err = 0;
         ENTRY;
 
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        MOD_INC_USE_COUNT;
-#else
         if (!try_module_get(THIS_MODULE)) {
                 CERROR("Can't get module. Is it alive?");
                 return -EINVAL;
         }
-#endif
         switch (cmd) {
         case OBD_IOC_LOV_GET_CONFIG: {
                 char *buf;
@@ -3418,7 +3562,7 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         case OBD_IOC_DESTROY: {
                 struct obdo            *oa;
 
-                if (!capable (CAP_SYS_ADMIN))
+                if (!cfs_capable(CFS_CAP_SYS_ADMIN))
                         GOTO (out, err = -EPERM);
                 oa = &data->ioc_obdo1;
 
@@ -3430,22 +3574,21 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 err = osc_destroy(exp, oa, NULL, NULL, NULL);
                 GOTO(out, err);
         }
+        case OBD_IOC_PING_TARGET:
+                err = ptlrpc_obd_ping(obd);
+                GOTO(out, err);
         default:
                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
                        cmd, cfs_curproc_comm());
                 GOTO(out, err = -ENOTTY);
         }
 out:
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        MOD_DEC_USE_COUNT;
-#else
         module_put(THIS_MODULE);
-#endif
         return err;
 }
 
 static int osc_get_info(struct obd_export *exp, obd_count keylen,
-                        void *key, __u32 *vallen, void *val)
+                        void *key, __u32 *vallen, void *val, struct lov_stripe_md *lsm)
 {
         ENTRY;
         if (!vallen || !val)
@@ -3460,7 +3603,8 @@ static int osc_get_info(struct obd_export *exp, obd_count keylen,
                 struct ptlrpc_request *req;
                 obd_id *reply;
                 char *bufs[2] = { NULL, key };
-                int rc, size[2] = { sizeof(struct ptlrpc_body), keylen };
+                __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
+                int rc;
 
                 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
                                       OST_GET_INFO, 2, size, bufs);
@@ -3483,7 +3627,39 @@ static int osc_get_info(struct obd_export *exp, obd_count keylen,
         out:
                 ptlrpc_req_finished(req);
                 RETURN(rc);
+        } else if (KEY_IS(KEY_FIEMAP)) {
+                struct ptlrpc_request *req;
+                struct ll_user_fiemap *reply;
+                char *bufs[2] = { NULL, key };
+                __u32 size[2] = { sizeof(struct ptlrpc_body), keylen };
+                int rc;
+
+                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
+                                      OST_GET_INFO, 2, size, bufs);
+                if (req == NULL)
+                        RETURN(-ENOMEM);
+
+                size[REPLY_REC_OFF] = *vallen;
+                ptlrpc_req_set_repsize(req, 2, size);
+
+                rc = ptlrpc_queue_wait(req);
+                if (rc)
+                        GOTO(out1, rc);
+                reply = lustre_swab_repbuf(req, REPLY_REC_OFF, *vallen,
+                                           lustre_swab_fiemap);
+                if (reply == NULL) {
+                        CERROR("Can't unpack FIEMAP reply.\n");
+                        GOTO(out1, rc = -EPROTO);
+                }
+
+                memcpy(val, reply, *vallen);
+
+        out1:
+                ptlrpc_req_finished(req);
+
+                RETURN(rc);
         }
+
         RETURN(-EINVAL);
 }
 
@@ -3523,7 +3699,7 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
         struct ptlrpc_request *req;
         struct obd_device  *obd = exp->exp_obd;
         struct obd_import *imp = class_exp2cliimp(exp);
-        int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
+        __u32 size[3] = { sizeof(struct ptlrpc_body), keylen, vallen };
         char *bufs[3] = { NULL, key, val };
         ENTRY;
 
@@ -3599,7 +3775,7 @@ static struct llog_operations osc_size_repl_logops = {
 
 static struct llog_operations osc_mds_ost_orig_logops;
 static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                         int count, struct llog_catid *catid, 
+                         int count, struct llog_catid *catid,
                          struct obd_uuid *uuid)
 {
         int rc;
@@ -3624,11 +3800,16 @@ static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt,
 
         rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL,
                         &osc_size_repl_logops);
-        if (rc) 
+        if (rc) {
+                struct llog_ctxt *ctxt = 
+                        llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
+                if (ctxt)
+                        llog_cleanup(ctxt);
                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
+        }
 out:
         if (rc) {
-                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n", 
+                CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
                        obd->obd_name, tgt->obd_name, count, catid, rc);
                 CERROR("logid "LPX64":0x%x\n",
                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
@@ -3657,7 +3838,8 @@ static int osc_llog_finish(struct obd_device *obd, int count)
 
 static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
                          struct obd_uuid *cluuid,
-                         struct obd_connect_data *data)
+                         struct obd_connect_data *data,
+                         void *localdata)
 {
         struct client_obd *cli = &obd->u.cli;
 
@@ -3685,14 +3867,21 @@ static int osc_reconnect(struct obd_export *exp, struct obd_device *obd,
 static int osc_disconnect(struct obd_export *exp)
 {
         struct obd_device *obd = class_exp2obd(exp);
-        struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
+        struct llog_ctxt  *ctxt;
         int rc;
 
-        if (obd->u.cli.cl_conn_count == 1)
-                /* flush any remaining cancel messages out to the target */
-                llog_sync(ctxt, exp);
-        
-        llog_ctxt_put(ctxt);
+        ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
+        if (ctxt) {
+                if (obd->u.cli.cl_conn_count == 1) {
+                        /* Flush any remaining cancel messages out to the 
+                         * target */
+                        llog_sync(ctxt, exp);
+                }
+                llog_ctxt_put(ctxt);
+        } else {
+                CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n", 
+                       obd);
+        }
 
         rc = client_disconnect_export(exp);
         return rc;
@@ -3841,12 +4030,17 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
                    client import will not have been cleaned. */
                 if (obd->u.cli.cl_import) {
                         struct obd_import *imp;
+                        down_write(&obd->u.cli.cl_sem);
                         imp = obd->u.cli.cl_import;
                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
                                obd->obd_name);
                         ptlrpc_invalidate_import(imp);
-                        ptlrpc_free_rq_pool(imp->imp_rq_pool);
+                        if (imp->imp_rq_pool) {
+                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
+                                imp->imp_rq_pool = NULL;
+                        }
                         class_destroy_import(imp);
+                        up_write(&obd->u.cli.cl_sem);
                         obd->u.cli.cl_import = NULL;
                 }
                 rc = obd_llog_finish(obd, 0);
@@ -4020,7 +4214,7 @@ static void /*__exit*/ osc_exit(void)
         class_unregister_type(LUSTRE_OSC_NAME);
 }
 
-MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
+MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
 MODULE_LICENSE("GPL");