Whamcloud - gitweb
LU-6485 libcfs: embed kr_data into kkuc_reg
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
index 09cfac5..06daca0 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 #include <linux/miscdevice.h>
 #include <linux/init.h>
 #include <linux/utsname.h>
+#include <linux/kthread.h>
+#include <linux/user_namespace.h>
+#ifdef HAVE_UIDGID_HEADER
+# include <linux/uidgid.h>
+#endif
 
 #include <lustre_acl.h>
 #include <lustre_ioctl.h>
 #include <lprocfs_status.h>
 #include <lustre_param.h>
 #include <lustre_log.h>
+#include <lustre_kernelcomm.h>
 #include <cl_object.h>
 
 #include "mdc_internal.h"
 
 #define REQUEST_MINOR 244
 
-struct mdc_renew_capa_args {
-        struct obd_capa        *ra_oc;
-        renew_capa_cb_t         ra_cb;
-};
-
 static int mdc_cleanup(struct obd_device *obd);
 
-int mdc_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
-                    const struct req_msg_field *field, struct obd_capa **oc)
-{
-        struct lustre_capa *capa;
-        struct obd_capa *c;
-        ENTRY;
-
-        /* swabbed already in mdc_enqueue */
-        capa = req_capsule_server_get(&req->rq_pill, field);
-        if (capa == NULL)
-                RETURN(-EPROTO);
-
-        c = alloc_capa(CAPA_SITE_CLIENT);
-        if (IS_ERR(c)) {
-                CDEBUG(D_INFO, "alloc capa failed!\n");
-                RETURN(PTR_ERR(c));
-        } else {
-                c->c_capa = *capa;
-                *oc = c;
-                RETURN(0);
-        }
-}
-
 static inline int mdc_queue_wait(struct ptlrpc_request *req)
 {
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
@@ -104,56 +82,41 @@ static inline int mdc_queue_wait(struct ptlrpc_request *req)
        return rc;
 }
 
-/* Helper that implements most of mdc_getstatus and signal_completed_replay. */
-/* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
-static int send_getstatus(struct obd_import *imp, struct lu_fid *rootfid,
-                          struct obd_capa **pc, int level, int msg_flags)
+static int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid)
 {
-        struct ptlrpc_request *req;
-        struct mdt_body       *body;
-        int                    rc;
-        ENTRY;
+       struct ptlrpc_request   *req;
+       struct mdt_body         *body;
+       int                      rc;
 
-        req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_GETSTATUS,
-                                        LUSTRE_MDS_VERSION, MDS_GETSTATUS);
-        if (req == NULL)
-                RETURN(-ENOMEM);
+       ENTRY;
 
-        mdc_pack_body(req, NULL, NULL, 0, 0, -1, 0);
-        lustre_msg_add_flags(req->rq_reqmsg, msg_flags);
-        req->rq_send_state = level;
+       req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
+                                       &RQF_MDS_GETSTATUS,
+                                       LUSTRE_MDS_VERSION, MDS_GETSTATUS);
+       if (req == NULL)
+               RETURN(-ENOMEM);
 
-        ptlrpc_request_set_replen(req);
+       mdc_pack_body(req, NULL, 0, 0, -1, 0);
+       req->rq_send_state = LUSTRE_IMP_FULL;
 
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                GOTO(out, rc);
+       ptlrpc_request_set_replen(req);
 
-        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
-        if (body == NULL)
-                GOTO(out, rc = -EPROTO);
+       rc = ptlrpc_queue_wait(req);
+       if (rc)
+               GOTO(out, rc);
 
-       if (body->mbo_valid & OBD_MD_FLMDSCAPA) {
-               rc = mdc_unpack_capa(NULL, req, &RMF_CAPA1, pc);
-               if (rc)
-                       GOTO(out, rc);
-       }
+       body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+       if (body == NULL)
+               GOTO(out, rc = -EPROTO);
 
        *rootfid = body->mbo_fid1;
        CDEBUG(D_NET, "root fid="DFID", last_committed="LPU64"\n",
               PFID(rootfid), lustre_msg_get_last_committed(req->rq_repmsg));
-        EXIT;
+       EXIT;
 out:
-        ptlrpc_req_finished(req);
-        return rc;
-}
+       ptlrpc_req_finished(req);
 
-/* This should be mdc_get_info("rootfid") */
-int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid,
-                  struct obd_capa **pc)
-{
-        return send_getstatus(class_exp2cliimp(exp), rootfid, pc,
-                              LUSTRE_IMP_FULL, 0);
+       return rc;
 }
 
 /*
@@ -205,18 +168,11 @@ static int mdc_getattr_common(struct obd_export *exp,
                         RETURN(-EPROTO);
         }
 
-       if (body->mbo_valid & OBD_MD_FLMDSCAPA) {
-                struct lustre_capa *capa;
-                capa = req_capsule_server_get(pill, &RMF_CAPA1);
-                if (capa == NULL)
-                        RETURN(-EPROTO);
-        }
-
         RETURN(0);
 }
 
-int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
-                struct ptlrpc_request **request)
+static int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
+                      struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
         int                    rc;
@@ -232,16 +188,14 @@ int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
-
         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
         if (rc) {
                 ptlrpc_request_free(req);
                 RETURN(rc);
         }
 
-        mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
-                      op_data->op_valid, op_data->op_mode, -1, 0);
+       mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
+                     op_data->op_mode, -1, 0);
 
         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
                              op_data->op_mode);
@@ -260,8 +214,8 @@ int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
         RETURN(rc);
 }
 
-int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
-                     struct ptlrpc_request **request)
+static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
+                           struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
         int                    rc;
@@ -273,7 +227,6 @@ int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                              op_data->op_namelen + 1);
 
@@ -283,9 +236,8 @@ int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
                 RETURN(rc);
         }
 
-        mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
-                      op_data->op_valid, op_data->op_mode,
-                      op_data->op_suppgids[0], 0);
+       mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
+                     op_data->op_mode, op_data->op_suppgids[0], 0);
 
         if (op_data->op_name) {
                 char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
@@ -307,11 +259,10 @@ int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
 }
 
 static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
-                            const struct lu_fid *fid,
-                            struct obd_capa *oc, int opcode, obd_valid valid,
-                            const char *xattr_name, const char *input,
-                            int input_size, int output_size, int flags,
-                            __u32 suppgid, struct ptlrpc_request **request)
+                           const struct lu_fid *fid, int opcode, u64 valid,
+                           const char *xattr_name, const char *input,
+                           int input_size, int output_size, int flags,
+                           __u32 suppgid, struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
         int   xattr_namelen = 0;
@@ -324,7 +275,6 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        mdc_set_capa_size(req, &RMF_CAPA1, oc);
         if (xattr_name) {
                 xattr_namelen = strlen(xattr_name) + 1;
                 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
@@ -381,11 +331,9 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
                 rec->sx_time   = cfs_time_current_sec();
                 rec->sx_size   = output_size;
                 rec->sx_flags  = flags;
-
-                mdc_pack_capa(req, &RMF_CAPA1, oc);
-        } else {
-                mdc_pack_body(req, fid, oc, valid, output_size, suppgid, flags);
-        }
+       } else {
+               mdc_pack_body(req, fid, valid, output_size, suppgid, flags);
+       }
 
         if (xattr_name) {
                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
@@ -403,12 +351,12 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
 
         /* make rpc */
         if (opcode == MDS_REINT)
-                mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+               mdc_get_mod_rpc_slot(req, NULL);
 
         rc = ptlrpc_queue_wait(req);
 
         if (opcode == MDS_REINT)
-                mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+               mdc_put_mod_rpc_slot(req, NULL);
 
         if (rc)
                 ptlrpc_req_finished(req);
@@ -417,26 +365,27 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
         RETURN(rc);
 }
 
-int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
-                 struct obd_capa *oc, obd_valid valid, const char *xattr_name,
-                 const char *input, int input_size, int output_size,
-                 int flags, __u32 suppgid, struct ptlrpc_request **request)
+static int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
+                       u64 valid, const char *xattr_name,
+                       const char *input, int input_size, int output_size,
+                       int flags, __u32 suppgid,
+                       struct ptlrpc_request **request)
 {
-        return mdc_xattr_common(exp, &RQF_MDS_REINT_SETXATTR,
-                                fid, oc, MDS_REINT, valid, xattr_name,
-                                input, input_size, output_size, flags,
-                                suppgid, request);
+       return mdc_xattr_common(exp, &RQF_MDS_REINT_SETXATTR,
+                               fid, MDS_REINT, valid, xattr_name,
+                               input, input_size, output_size, flags,
+                               suppgid, request);
 }
 
-int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid,
-                 struct obd_capa *oc, obd_valid valid, const char *xattr_name,
-                 const char *input, int input_size, int output_size,
-                 int flags, struct ptlrpc_request **request)
+static int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid,
+                       u64 valid, const char *xattr_name,
+                       const char *input, int input_size, int output_size,
+                       int flags, struct ptlrpc_request **request)
 {
-        return mdc_xattr_common(exp, &RQF_MDS_GETXATTR,
-                                fid, oc, MDS_GETXATTR, valid, xattr_name,
-                                input, input_size, output_size, flags,
-                                -1, request);
+       return mdc_xattr_common(exp, &RQF_MDS_GETXATTR,
+                               fid, MDS_GETXATTR, valid, xattr_name,
+                               input, input_size, output_size, flags,
+                               -1, request);
 }
 
 #ifdef CONFIG_FS_POSIX_ACL
@@ -495,9 +444,6 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
         LASSERT(md->body != NULL);
 
        if (md->body->mbo_valid & OBD_MD_FLEASIZE) {
-               int lmmsize;
-               struct lov_mds_md *lmm;
-
                if (!S_ISREG(md->body->mbo_mode)) {
                        CDEBUG(D_INFO, "OBD_MD_FLEASIZE set, should be a "
                               "regular file, but is not\n");
@@ -510,25 +456,15 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
                        GOTO(out, rc = -EPROTO);
                }
 
-               lmmsize = md->body->mbo_eadatasize;
-                lmm = req_capsule_server_sized_get(pill, &RMF_MDT_MD, lmmsize);
-                if (!lmm)
-                        GOTO(out, rc = -EPROTO);
-
-                rc = obd_unpackmd(dt_exp, &md->lsm, lmm, lmmsize);
-                if (rc < 0)
-                        GOTO(out, rc);
-
-               if (rc < (typeof(rc))sizeof(*md->lsm)) {
-                        CDEBUG(D_INFO, "lsm size too small: "
-                               "rc < sizeof (*md->lsm) (%d < %d)\n",
-                               rc, (int)sizeof(*md->lsm));
-                        GOTO(out, rc = -EPROTO);
-                }
-
+               md->layout.lb_len = md->body->mbo_eadatasize;
+               md->layout.lb_buf = req_capsule_server_sized_get(pill,
+                                                       &RMF_MDT_MD,
+                                                       md->layout.lb_len);
+               if (md->layout.lb_buf == NULL)
+                       GOTO(out, rc = -EPROTO);
        } else if (md->body->mbo_valid & OBD_MD_FLDIREA) {
-               int lmvsize;
-               struct lov_mds_md *lmv;
+               const union lmv_mds_md *lmv;
+               size_t lmv_size;
 
                if (!S_ISDIR(md->body->mbo_mode)) {
                        CDEBUG(D_INFO, "OBD_MD_FLDIREA set, should be a "
@@ -536,21 +472,20 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
                        GOTO(out, rc = -EPROTO);
                }
 
-               if (md->body->mbo_eadatasize == 0) {
+               lmv_size = md->body->mbo_eadatasize;
+               if (lmv_size == 0) {
                        CDEBUG(D_INFO, "OBD_MD_FLDIREA is set, "
                               "but eadatasize 0\n");
                        RETURN(-EPROTO);
                }
 
                if (md->body->mbo_valid & OBD_MD_MEA) {
-                       lmvsize = md->body->mbo_eadatasize;
                        lmv = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
-                                                          lmvsize);
-                       if (!lmv)
+                                                          lmv_size);
+                       if (lmv == NULL)
                                GOTO(out, rc = -EPROTO);
 
-                       rc = obd_unpackmd(md_exp, (void *)&md->lmv, lmv,
-                                         lmvsize);
+                       rc = md_unpackmd(md_exp, &md->lmv, lmv, lmv_size);
                        if (rc < 0)
                                GOTO(out, rc);
 
@@ -586,40 +521,13 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
 #endif
                 }
         }
-       if (md->body->mbo_valid & OBD_MD_FLMDSCAPA) {
-                struct obd_capa *oc = NULL;
-
-                rc = mdc_unpack_capa(NULL, req, &RMF_CAPA1, &oc);
-                if (rc)
-                        GOTO(out, rc);
-                md->mds_capa = oc;
-        }
-
-       if (md->body->mbo_valid & OBD_MD_FLOSSCAPA) {
-                struct obd_capa *oc = NULL;
-
-                rc = mdc_unpack_capa(NULL, req, &RMF_CAPA2, &oc);
-                if (rc)
-                        GOTO(out, rc);
-                md->oss_capa = oc;
-        }
 
         EXIT;
 out:
         if (rc) {
-                if (md->oss_capa) {
-                        capa_put(md->oss_capa);
-                        md->oss_capa = NULL;
-                }
-                if (md->mds_capa) {
-                        capa_put(md->mds_capa);
-                        md->mds_capa = NULL;
-                }
 #ifdef CONFIG_FS_POSIX_ACL
                 posix_acl_release(md->posix_acl);
 #endif
-                if (md->lsm)
-                        obd_free_memmd(dt_exp, &md->lsm);
         }
         return rc;
 }
@@ -630,10 +538,6 @@ int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
         RETURN(0);
 }
 
-/**
- * Handles both OPEN and SETATTR RPCs for OPEN-CLOSE and SETATTR-DONE_WRITING
- * RPC chains.
- */
 void mdc_replay_open(struct ptlrpc_request *req)
 {
         struct md_open_data *mod = req->rq_cb_data;
@@ -670,17 +574,18 @@ void mdc_replay_open(struct ptlrpc_request *req)
                 __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg);
                 struct mdt_ioepoch *epoch;
 
-                LASSERT(opc == MDS_CLOSE || opc == MDS_DONE_WRITING);
-                epoch = req_capsule_client_get(&close_req->rq_pill,
-                                               &RMF_MDT_EPOCH);
-                LASSERT(epoch);
+               LASSERT(opc == MDS_CLOSE);
+               epoch = req_capsule_client_get(&close_req->rq_pill,
+                                              &RMF_MDT_EPOCH);
+               LASSERT(epoch);
 
-                if (och != NULL)
-                        LASSERT(!memcmp(&old, &epoch->handle, sizeof(old)));
-                DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
-               epoch->handle = body->mbo_handle;
-        }
-        EXIT;
+               if (och != NULL)
+                       LASSERT(!memcmp(&old, &epoch->mio_handle, sizeof(old)));
+
+               DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
+               epoch->mio_handle = body->mbo_handle;
+       }
+       EXIT;
 }
 
 void mdc_commit_open(struct ptlrpc_request *req)
@@ -816,24 +721,8 @@ int mdc_clear_open_replay_data(struct obd_export *exp,
         RETURN(0);
 }
 
-/* Prepares the request for the replay by the given reply */
-static void mdc_close_handle_reply(struct ptlrpc_request *req,
-                                   struct md_op_data *op_data, int rc) {
-        struct mdt_body  *repbody;
-        struct mdt_ioepoch *epoch;
-
-        if (req && rc == -EAGAIN) {
-                repbody = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
-                epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
-
-                epoch->flags |= MF_SOM_AU;
-               if (repbody->mbo_valid & OBD_MD_FLGETATTRLOCK)
-                        op_data->op_flags |= MF_GETATTR_LOCK;
-        }
-}
-
-int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
-              struct md_open_data *mod, struct ptlrpc_request **request)
+static int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
+                    struct md_open_data *mod, struct ptlrpc_request **request)
 {
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
@@ -842,9 +731,8 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
        int                    saved_rc = 0;
        ENTRY;
 
-       req_fmt = &RQF_MDS_CLOSE;
        if (op_data->op_bias & MDS_HSM_RELEASE) {
-               req_fmt = &RQF_MDS_RELEASE_CLOSE;
+               req_fmt = &RQF_MDS_INTENT_CLOSE;
 
                /* allocate a FID for volatile file */
                rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
@@ -854,6 +742,10 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
                        /* save the errcode and proceed to close */
                        saved_rc = rc;
                }
+       } else if (op_data->op_bias & MDS_CLOSE_LAYOUT_SWAP) {
+               req_fmt = &RQF_MDS_INTENT_CLOSE;
+       } else {
+               req_fmt = &RQF_MDS_CLOSE;
        }
 
        *request = NULL;
@@ -861,8 +753,6 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
-
         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_CLOSE);
         if (rc) {
                 ptlrpc_request_free(req);
@@ -897,14 +787,12 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
 
        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
                             obd->u.cli.cl_default_mds_easize);
-       req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
-                            obd->u.cli.cl_default_mds_cookiesize);
 
         ptlrpc_request_set_replen(req);
 
-        mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
-        rc = ptlrpc_queue_wait(req);
-        mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
+       mdc_get_mod_rpc_slot(req, NULL);
+       rc = ptlrpc_queue_wait(req);
+       mdc_put_mod_rpc_slot(req, NULL);
 
         if (req->rq_repmsg == NULL) {
                 CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req,
@@ -946,129 +834,12 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
                 obd_mod_put(mod);
         }
         *request = req;
-        mdc_close_handle_reply(req, op_data, rc);
-        RETURN(rc < 0 ? rc : saved_rc);
-}
-
-int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
-                     struct md_open_data *mod)
-{
-        struct obd_device     *obd = class_exp2obd(exp);
-        struct ptlrpc_request *req;
-        int                    rc;
-        ENTRY;
-
-        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
-                                   &RQF_MDS_DONE_WRITING);
-        if (req == NULL)
-                RETURN(-ENOMEM);
-
-        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
-        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_DONE_WRITING);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
-
-        if (mod != NULL) {
-                LASSERTF(mod->mod_open_req != NULL &&
-                         mod->mod_open_req->rq_type != LI_POISON,
-                         "POISONED setattr %p!\n", mod->mod_open_req);
-
-                mod->mod_close_req = req;
-                DEBUG_REQ(D_HA, mod->mod_open_req, "matched setattr");
-                /* We no longer want to preserve this setattr for replay even
-                 * though the open was committed. b=3632, b=3633 */
-               spin_lock(&mod->mod_open_req->rq_lock);
-               mod->mod_open_req->rq_replay = 0;
-               spin_unlock(&mod->mod_open_req->rq_lock);
-        }
-
-        mdc_close_pack(req, op_data);
-        ptlrpc_request_set_replen(req);
-
-        mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
-        rc = ptlrpc_queue_wait(req);
-        mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
-
-        if (rc == -ESTALE) {
-                /**
-                 * it can be allowed error after 3633 if open or setattr were
-                 * committed and server failed before close was sent.
-                 * Let's check if mod exists and return no error in that case
-                 */
-                if (mod) {
-                        LASSERT(mod->mod_open_req != NULL);
-                        if (mod->mod_open_req->rq_committed)
-                                rc = 0;
-                }
-        }
-
-        if (mod) {
-                if (rc != 0)
-                        mod->mod_close_req = NULL;
-               LASSERT(mod->mod_open_req != NULL);
-               mdc_free_open(mod);
-
-                /* Since now, mod is accessed through setattr req only,
-                 * thus DW req does not keep a reference on mod anymore. */
-                obd_mod_put(mod);
-        }
 
-        mdc_close_handle_reply(req, op_data, rc);
-        ptlrpc_req_finished(req);
-        RETURN(rc);
-}
-
-#ifdef HAVE_SPLIT_SUPPORT
-int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
-                 const struct page *page, int offset)
-{
-        struct ptlrpc_request   *req;
-        struct ptlrpc_bulk_desc *desc;
-        int                      rc;
-        ENTRY;
-
-        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_WRITEPAGE);
-        if (req == NULL)
-                RETURN(-ENOMEM);
-
-        /* FIXME: capa doesn't support split yet */
-        mdc_set_capa_size(req, &RMF_CAPA1, NULL);
-
-        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_WRITEPAGE);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
-
-        req->rq_request_portal = MDS_READPAGE_PORTAL;
-        ptlrpc_at_set_req_timeout(req);
-
-       desc = ptlrpc_prep_bulk_imp(req, 1, 1,BULK_GET_SOURCE, MDS_BULK_PORTAL);
-       if (desc == NULL)
-               GOTO(out, rc = -ENOMEM);
-
-        /* NB req now owns desc and will free it when it gets freed. */
-        ptlrpc_prep_bulk_page(desc, (struct page *)page, 0, offset);
-        mdc_readdir_pack(req, 0, offset, fid, NULL);
-
-        ptlrpc_request_set_replen(req);
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                GOTO(out, rc);
-
-        rc = sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk);
-out:
-        ptlrpc_req_finished(req);
-        return rc;
+       RETURN(rc < 0 ? rc : saved_rc);
 }
-EXPORT_SYMBOL(mdc_sendpage);
-#endif
 
 static int mdc_getpage(struct obd_export *exp, const struct lu_fid *fid,
-                      __u64 offset, struct obd_capa *oc,
-                      struct page **pages, int npages,
+                      u64 offset, struct page **pages, int npages,
                       struct ptlrpc_request **request)
 {
        struct ptlrpc_request   *req;
@@ -1088,8 +859,6 @@ restart_bulk:
        if (req == NULL)
                RETURN(-ENOMEM);
 
-       mdc_set_capa_size(req, &RMF_CAPA1, oc);
-
        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_READPAGE);
        if (rc) {
                ptlrpc_request_free(req);
@@ -1099,8 +868,10 @@ restart_bulk:
        req->rq_request_portal = MDS_READPAGE_PORTAL;
        ptlrpc_at_set_req_timeout(req);
 
-       desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK,
-                                   MDS_BULK_PORTAL);
+       desc = ptlrpc_prep_bulk_imp(req, npages, 1,
+                                   PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
+                                   MDS_BULK_PORTAL,
+                                   &ptlrpc_bulk_kiov_pin_ops);
        if (desc == NULL) {
                ptlrpc_request_free(req);
                RETURN(-ENOMEM);
@@ -1108,9 +879,10 @@ restart_bulk:
 
        /* NB req now owns desc and will free it when it gets freed */
        for (i = 0; i < npages; i++)
-               ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
+               desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
+                                                PAGE_CACHE_SIZE);
 
-       mdc_readdir_pack(req, offset, PAGE_CACHE_SIZE * npages, fid, oc);
+       mdc_readdir_pack(req, offset, PAGE_CACHE_SIZE * npages, fid);
 
        ptlrpc_request_set_replen(req);
        rc = ptlrpc_queue_wait(req);
@@ -1249,7 +1021,7 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash,
  *  |        |
  * .|--------v-------   -----.
  * |s|e|f|p|ent|ent| ... |ent|
- * '--|--------------   -----'   Each CFS_PAGE contains a single
+ * '--|--------------   -----'   Each PAGE contains a single
  *    '------.                   lu_dirpage.
  * .---------v-------   -----.
  * |s|e|f|p|ent| 0 | ... | 0 |
@@ -1259,26 +1031,26 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash,
  * larger than LU_PAGE_SIZE, a single host page may contain multiple
  * lu_dirpages. After reading the lu_dirpages from the MDS, the
  * ldp_hash_end of the first lu_dirpage refers to the one immediately
- * after it in the same CFS_PAGE (arrows simplified for brevity, but
+ * after it in the same PAGE (arrows simplified for brevity, but
  * in general e0==s1, e1==s2, etc.):
  *
  * .--------------------   -----.
  * |s0|e0|f0|p|ent|ent| ... |ent|
  * |---v----------------   -----|
  * |s1|e1|f1|p|ent|ent| ... |ent|
- * |---v----------------   -----|  Here, each CFS_PAGE contains
+ * |---v----------------   -----|  Here, each PAGE contains
  *             ...                 multiple lu_dirpages.
  * |---v----------------   -----|
  * |s'|e'|f'|p|ent|ent| ... |ent|
  * '---|----------------   -----'
  *     v
  * .----------------------------.
- * |        next CFS_PAGE       |
+ * |        next PAGE           |
  *
  * This structure is transformed into a single logical lu_dirpage as follows:
  *
  * - Replace e0 with e' so the request for the next lu_dirpage gets the page
- *   labeled 'next CFS_PAGE'.
+ *   labeled 'next PAGE'.
  *
  * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
  *   a hash collision with the next page exists.
@@ -1307,8 +1079,8 @@ static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs)
                        /* Advance dp to next lu_dirpage. */
                        dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
 
-                       /* Check if we've reached the end of the CFS_PAGE. */
-                       if (!((unsigned long)dp & ~CFS_PAGE_MASK))
+                       /* Check if we've reached the end of the PAGE. */
+                       if (!((unsigned long)dp & ~PAGE_MASK))
                                break;
 
                        /* Save the hash and flags of this lu_dirpage. */
@@ -1348,6 +1120,14 @@ struct readpage_param {
        struct md_callback      *rp_cb;
 };
 
+#ifndef HAVE_DELETE_FROM_PAGE_CACHE
+static inline void delete_from_page_cache(struct page *page)
+{
+       remove_from_page_cache(page);
+       page_cache_release(page);
+}
+#endif
+
 /**
  * Read pages from server.
  *
@@ -1355,7 +1135,7 @@ struct readpage_param {
  * a header lu_dirpage which describes the start/end hash, and whether this
  * page is empty (contains no dir entry) or hash collide with next page.
  * After client receives reply, several pages will be integrated into dir page
- * in CFS_PAGE_SIZE (if CFS_PAGE_SIZE greater than LU_PAGE_SIZE), and the
+ * in PAGE_SIZE (if PAGE_SIZE greater than LU_PAGE_SIZE), and the
  * lu_dirpage for this integrated page will be adjusted.
  **/
 static int mdc_read_page_remote(void *data, struct page *page0)
@@ -1395,9 +1175,11 @@ static int mdc_read_page_remote(void *data, struct page *page0)
                page_pool[npages] = page;
        }
 
-       rc = mdc_getpage(rp->rp_exp, fid, rp->rp_off, op_data->op_capa1,
-                        page_pool, npages, &req);
-       if (rc == 0) {
+       rc = mdc_getpage(rp->rp_exp, fid, rp->rp_off, page_pool, npages, &req);
+       if (rc < 0) {
+               /* page0 is special, which was added into page cache early */
+               delete_from_page_cache(page0);
+       } else {
                int lu_pgs;
 
                rd_pgs = (req->rq_bulk->bd_nob_transferred +
@@ -1412,8 +1194,8 @@ static int mdc_read_page_remote(void *data, struct page *page0)
 
                SetPageUptodate(page0);
        }
-
        unlock_page(page0);
+
        ptlrpc_req_finished(req);
        CDEBUG(D_CACHE, "read %d/%d pages\n", rd_pgs, npages);
        for (i = 1; i < npages; i++) {
@@ -1687,7 +1469,7 @@ static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf)
         /* Val is struct getinfo_fid2path result plus path */
         vallen = sizeof(*gf) + gf->gf_pathlen;
 
-       rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
+       rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf);
        if (rc != 0 && rc != -EREMOTE)
                GOTO(out, rc);
 
@@ -1718,7 +1500,7 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp,
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);
 
-       mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, -1, 0);
+       mdc_pack_body(req, NULL, OBD_MD_FLRMTPERM, 0, -1, 0);
 
        /* Copy hsm_progress struct */
        req_hpk = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_PROGRESS);
@@ -1730,7 +1512,10 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       rc = mdc_queue_wait(req);
+       mdc_get_mod_rpc_slot(req, NULL);
+       rc = ptlrpc_queue_wait(req);
+       mdc_put_mod_rpc_slot(req, NULL);
+
        GOTO(out, rc);
 out:
        ptlrpc_req_finished(req);
@@ -1750,7 +1535,7 @@ static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archives)
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);
 
-       mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, -1, 0);
+       mdc_pack_body(req, NULL, OBD_MD_FLRMTPERM, 0, -1, 0);
 
        /* Copy hsm_progress struct */
        archive_mask = req_capsule_client_get(&req->rq_pill,
@@ -1783,16 +1568,14 @@ static int mdc_ioc_hsm_current_action(struct obd_export *exp,
        if (req == NULL)
                RETURN(-ENOMEM);
 
-       mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
-
        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_ACTION);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
 
-       mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
-                     OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0);
+       mdc_pack_body(req, &op_data->op_fid1, OBD_MD_FLRMTPERM, 0,
+                     op_data->op_suppgids[0], 0);
 
        ptlrpc_request_set_replen(req);
 
@@ -1825,7 +1608,7 @@ static int mdc_ioc_hsm_ct_unregister(struct obd_import *imp)
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);
 
-       mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, -1, 0);
+       mdc_pack_body(req, NULL, OBD_MD_FLRMTPERM, 0, -1, 0);
 
        ptlrpc_request_set_replen(req);
 
@@ -1850,16 +1633,14 @@ static int mdc_ioc_hsm_state_get(struct obd_export *exp,
        if (req == NULL)
                RETURN(-ENOMEM);
 
-       mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
-
        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_GET);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
 
-       mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
-                     OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0);
+       mdc_pack_body(req, &op_data->op_fid1, OBD_MD_FLRMTPERM, 0,
+                     op_data->op_suppgids[0], 0);
 
        ptlrpc_request_set_replen(req);
 
@@ -1893,16 +1674,14 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp,
        if (req == NULL)
                RETURN(-ENOMEM);
 
-       mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
-
        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_SET);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
 
-       mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
-                     OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0);
+       mdc_pack_body(req, &op_data->op_fid1, OBD_MD_FLRMTPERM, 0,
+                     op_data->op_suppgids[0], 0);
 
        /* Copy states */
        req_hss = req_capsule_client_get(&req->rq_pill, &RMF_HSM_STATE_SET);
@@ -1912,10 +1691,11 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       rc = mdc_queue_wait(req);
-       GOTO(out, rc);
+       mdc_get_mod_rpc_slot(req, NULL);
+       rc = ptlrpc_queue_wait(req);
+       mdc_put_mod_rpc_slot(req, NULL);
 
-       EXIT;
+       GOTO(out, rc);
 out:
        ptlrpc_req_finished(req);
        return rc;
@@ -1948,7 +1728,7 @@ static int mdc_ioc_hsm_request(struct obd_export *exp,
                RETURN(rc);
        }
 
-       mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, -1, 0);
+       mdc_pack_body(req, NULL, OBD_MD_FLRMTPERM, 0, -1, 0);
 
        /* Copy hsm_request struct */
        req_hr = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_REQUEST);
@@ -1971,7 +1751,10 @@ static int mdc_ioc_hsm_request(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       rc = mdc_queue_wait(req);
+       mdc_get_mod_rpc_slot(req, NULL);
+       rc = ptlrpc_queue_wait(req);
+       mdc_put_mod_rpc_slot(req, NULL);
+
        GOTO(out, rc);
 
 out:
@@ -2066,10 +1849,10 @@ static int mdc_changelog_send_thread(void *csdata)
        if (cs->cs_buf == NULL)
                GOTO(out, rc = -ENOMEM);
 
-        /* Set up the remote catalog handle */
-        ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT);
-        if (ctxt == NULL)
-                GOTO(out, rc = -ENOENT);
+       /* Set up the remote catalog handle */
+       ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT);
+       if (ctxt == NULL)
+               GOTO(out, rc = -ENOENT);
        rc = llog_open(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG,
                       LLOG_OPEN_EXISTS);
        if (rc) {
@@ -2089,19 +1872,17 @@ static int mdc_changelog_send_thread(void *csdata)
 
        rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0);
 
-        /* Send EOF no matter what our result */
-        if ((kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch),
-                                      cs->cs_flags))) {
-                kuch->kuc_msgtype = CL_EOF;
-                libcfs_kkuc_msg_put(cs->cs_fp, kuch);
-        }
+       /* Send EOF no matter what our result */
+       kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch), cs->cs_flags);
+       kuch->kuc_msgtype = CL_EOF;
+       libcfs_kkuc_msg_put(cs->cs_fp, kuch);
 
 out:
        fput(cs->cs_fp);
        if (llh)
                llog_cat_close(NULL, llh);
-        if (ctxt)
-                llog_ctxt_put(ctxt);
+       if (ctxt)
+               llog_ctxt_put(ctxt);
        if (cs->cs_buf)
                OBD_FREE(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE);
        OBD_FREE_PTR(cs);
@@ -2149,53 +1930,6 @@ static int mdc_ioc_changelog_send(struct obd_device *obd,
 static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
                                 struct lustre_kernelcomm *lk);
 
-static int mdc_quotacheck(struct obd_device *unused, struct obd_export *exp,
-                          struct obd_quotactl *oqctl)
-{
-        struct client_obd       *cli = &exp->exp_obd->u.cli;
-        struct ptlrpc_request   *req;
-        struct obd_quotactl     *body;
-        int                      rc;
-        ENTRY;
-
-        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
-                                        &RQF_MDS_QUOTACHECK, LUSTRE_MDS_VERSION,
-                                        MDS_QUOTACHECK);
-        if (req == NULL)
-                RETURN(-ENOMEM);
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
-        *body = *oqctl;
-
-        ptlrpc_request_set_replen(req);
-
-        /* the next poll will find -ENODATA, that means quotacheck is
-         * going on */
-        cli->cl_qchk_stat = -ENODATA;
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                cli->cl_qchk_stat = rc;
-        ptlrpc_req_finished(req);
-        RETURN(rc);
-}
-
-static int mdc_quota_poll_check(struct obd_export *exp,
-                                struct if_quotacheck *qchk)
-{
-        struct client_obd *cli = &exp->exp_obd->u.cli;
-        int rc;
-        ENTRY;
-
-        qchk->obd_uuid = cli->cl_target_uuid;
-        memcpy(qchk->obd_type, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME));
-
-        rc = cli->cl_qchk_stat;
-        /* the client is not the previous one */
-        if (rc == CL_NOT_QUOTACHECKED)
-                rc = -EINTR;
-        RETURN(rc);
-}
-
 static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp,
                         struct obd_quotactl *oqctl)
 {
@@ -2246,9 +1980,9 @@ static int mdc_ioc_swap_layouts(struct obd_export *exp,
 
        /* When the MDT will get the MDS_SWAP_LAYOUTS RPC the
         * first thing it will do is to cancel the 2 layout
-        * locks hold by this client.
+        * locks held by this client.
         * So the client must cancel its layout locks on the 2 fids
-        * with the request RPC to avoid extra RPC round trips
+        * with the request RPC to avoid extra RPC round trips.
         */
        count = mdc_resource_get_unused(exp, &op_data->op_fid1, &cancels,
                                        LCK_EX, MDS_INODELOCK_LAYOUT |
@@ -2264,9 +1998,6 @@ static int mdc_ioc_swap_layouts(struct obd_export *exp,
                RETURN(-ENOMEM);
        }
 
-       mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
-       mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
-
        rc = mdc_prep_elc_req(exp, req, MDS_SWAP_LAYOUTS, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
@@ -2351,9 +2082,6 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         case IOC_OSC_SET_ACTIVE:
                 rc = ptlrpc_set_import_active(imp, data->ioc_offset);
                 GOTO(out, rc);
-        case OBD_IOC_POLL_QUOTACHECK:
-                rc = mdc_quota_poll_check(exp, (struct if_quotacheck *)karg);
-                GOTO(out, rc);
         case OBD_IOC_PING_TARGET:
                 rc = ptlrpc_obd_ping(obd);
                 GOTO(out, rc);
@@ -2427,9 +2155,9 @@ out:
        return rc;
 }
 
-int mdc_get_info_rpc(struct obd_export *exp,
-                     obd_count keylen, void *key,
-                     int vallen, void *val)
+static int mdc_get_info_rpc(struct obd_export *exp,
+                           u32 keylen, void *key,
+                           u32 vallen, void *val)
 {
         struct obd_import      *imp = class_exp2cliimp(exp);
         struct ptlrpc_request  *req;
@@ -2444,7 +2172,7 @@ int mdc_get_info_rpc(struct obd_export *exp,
         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_KEY,
                              RCL_CLIENT, keylen);
         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VALLEN,
-                             RCL_CLIENT, sizeof(__u32));
+                            RCL_CLIENT, sizeof(vallen));
 
         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GET_INFO);
         if (rc) {
@@ -2455,7 +2183,7 @@ int mdc_get_info_rpc(struct obd_export *exp,
         tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_KEY);
         memcpy(tmp, key, keylen);
         tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_VALLEN);
-        memcpy(tmp, &vallen, sizeof(__u32));
+       memcpy(tmp, &vallen, sizeof(vallen));
 
         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VAL,
                              RCL_SERVER, vallen);
@@ -2609,11 +2337,11 @@ static int mdc_kuc_reregister(struct obd_import *imp)
                                         (void *)imp);
 }
 
-int mdc_set_info_async(const struct lu_env *env,
-                      struct obd_export *exp,
-                      obd_count keylen, void *key,
-                      obd_count vallen, void *val,
-                      struct ptlrpc_request_set *set)
+static int mdc_set_info_async(const struct lu_env *env,
+                             struct obd_export *exp,
+                             u32 keylen, void *key,
+                             u32 vallen, void *val,
+                             struct ptlrpc_request_set *set)
 {
        struct obd_import       *imp = class_exp2cliimp(exp);
        int                      rc;
@@ -2657,13 +2385,19 @@ int mdc_set_info_async(const struct lu_env *env,
                 RETURN(rc);
         }
 
+       if (KEY_IS(KEY_DEFAULT_EASIZE)) {
+               __u32 *default_easize = val;
+
+               exp->exp_obd->u.cli.cl_default_mds_easize = *default_easize;
+               RETURN(0);
+       }
+
        CERROR("Unknown key %s\n", (char *)key);
        RETURN(-EINVAL);
 }
 
-int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
-                __u32 keylen, void *key, __u32 *vallen, void *val,
-                struct lov_stripe_md *lsm)
+static int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
+                       __u32 keylen, void *key, __u32 *vallen, void *val)
 {
        int rc = -EINVAL;
 
@@ -2686,26 +2420,6 @@ int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
                default_easize = val;
                *default_easize = exp->exp_obd->u.cli.cl_default_mds_easize;
                RETURN(0);
-       } else if (KEY_IS(KEY_MAX_COOKIESIZE)) {
-               __u32 mdsize, *max_cookiesize;
-
-               if (*vallen != sizeof(int))
-                       RETURN(-EINVAL);
-               mdsize = *(int *)val;
-               if (mdsize > exp->exp_obd->u.cli.cl_max_mds_cookiesize)
-                       exp->exp_obd->u.cli.cl_max_mds_cookiesize = mdsize;
-               max_cookiesize = val;
-               *max_cookiesize = exp->exp_obd->u.cli.cl_max_mds_cookiesize;
-               RETURN(0);
-       } else if (KEY_IS(KEY_DEFAULT_COOKIESIZE)) {
-               __u32 *default_cookiesize;
-
-               if (*vallen != sizeof(int))
-                       RETURN(-EINVAL);
-               default_cookiesize = val;
-               *default_cookiesize =
-                       exp->exp_obd->u.cli.cl_default_mds_cookiesize;
-               RETURN(0);
         } else if (KEY_IS(KEY_CONN_DATA)) {
                 struct obd_import *imp = class_exp2cliimp(exp);
                 struct obd_connect_data *data = val;
@@ -2725,8 +2439,8 @@ int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
         RETURN(rc);
 }
 
-int mdc_fsync(struct obd_export *exp, const struct lu_fid *fid,
-             struct obd_capa *oc, struct ptlrpc_request **request)
+static int mdc_fsync(struct obd_export *exp, const struct lu_fid *fid,
+                    struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
         int                    rc;
@@ -2737,15 +2451,13 @@ int mdc_fsync(struct obd_export *exp, const struct lu_fid *fid,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        mdc_set_capa_size(req, &RMF_CAPA1, oc);
-
         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_SYNC);
         if (rc) {
                 ptlrpc_request_free(req);
                 RETURN(rc);
         }
 
-        mdc_pack_body(req, fid, oc, 0, 0, -1, 0);
+       mdc_pack_body(req, fid, 0, 0, -1, 0);
 
         ptlrpc_request_set_replen(req);
 
@@ -2819,7 +2531,8 @@ int mdc_fid_alloc(const struct lu_env *env, struct obd_export *exp,
        RETURN(seq_client_alloc_fid(env, seq, fid));
 }
 
-struct obd_uuid *mdc_get_uuid(struct obd_export *exp) {
+static struct obd_uuid *mdc_get_uuid(struct obd_export *exp)
+{
         struct client_obd *cli = &exp->exp_obd->u.cli;
         return &cli->cl_target_uuid;
 }
@@ -2851,7 +2564,7 @@ static int mdc_resource_inode_free(struct ldlm_resource *res)
        return 0;
 }
 
-struct ldlm_valblock_ops inode_lvbo = {
+static struct ldlm_valblock_ops inode_lvbo = {
        .lvbo_free = mdc_resource_inode_free
 };
 
@@ -2890,28 +2603,17 @@ static void mdc_llog_finish(struct obd_device *obd)
 
 static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
 {
-       struct client_obd               *cli = &obd->u.cli;
        int                             rc;
        ENTRY;
 
-        OBD_ALLOC(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
-        if (!cli->cl_rpc_lock)
-                RETURN(-ENOMEM);
-        mdc_init_rpc_lock(cli->cl_rpc_lock);
-
        rc = ptlrpcd_addref();
        if (rc < 0)
-               GOTO(err_rpc_lock, rc);
-
-        OBD_ALLOC(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
-        if (!cli->cl_close_lock)
-                GOTO(err_ptlrpcd_decref, rc = -ENOMEM);
-        mdc_init_rpc_lock(cli->cl_close_lock);
+               RETURN(rc);
 
         rc = client_obd_setup(obd, cfg);
         if (rc)
-                GOTO(err_close_lock, rc);
-#ifdef LPROCFS
+               GOTO(err_ptlrpcd_decref, rc);
+#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_mdc_obd_vars;
        lprocfs_obd_setup(obd);
        lprocfs_alloc_md_stats(obd, 0);
@@ -2927,30 +2629,26 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
         if (rc) {
                 mdc_cleanup(obd);
                 CERROR("failed to setup llogging subsystems\n");
+               RETURN(rc);
         }
 
         RETURN(rc);
 
-err_close_lock:
-        OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
 err_ptlrpcd_decref:
         ptlrpcd_decref();
-err_rpc_lock:
-        OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
         RETURN(rc);
 }
 
-/* Initialize the default and maximum LOV EA and cookie sizes.  This allows
+/* Initialize the default and maximum LOV EA sizes.  This allows
  * us to make MDS RPCs with large enough reply buffers to hold a default
- * sized EA and cookie without having to calculate this (via a call into the
+ * sized EA without having to calculate this (via a call into the
  * LOV + OSCs) each time we make an RPC.  The maximum size is also tracked
  * but not used to avoid wastefully vmalloc()'ing large reply buffers when
  * a large number of stripes is possible.  If a larger reply buffer is
  * required it will be reallocated in the ptlrpc layer due to overflow.
  */
 static int mdc_init_ea_size(struct obd_export *exp, __u32 easize,
-                           __u32 def_easize, __u32 cookiesize,
-                           __u32 def_cookiesize)
+                           __u32 def_easize)
 {
        struct obd_device *obd = exp->exp_obd;
        struct client_obd *cli = &obd->u.cli;
@@ -2962,12 +2660,6 @@ static int mdc_init_ea_size(struct obd_export *exp, __u32 easize,
        if (cli->cl_default_mds_easize < def_easize)
                cli->cl_default_mds_easize = def_easize;
 
-       if (cli->cl_max_mds_cookiesize < cookiesize)
-               cli->cl_max_mds_cookiesize = cookiesize;
-
-       if (cli->cl_default_mds_cookiesize < def_cookiesize)
-               cli->cl_default_mds_cookiesize = def_cookiesize;
-
        RETURN(0);
 }
 
@@ -2982,7 +2674,7 @@ static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
         case OBD_CLEANUP_EXPORTS:
                /* Failsafe, ok if racy */
                if (obd->obd_type->typ_refcnt <= 1)
-                       libcfs_kkuc_group_rem(0, KUC_GRP_HSM, NULL);
+                       libcfs_kkuc_group_rem(0, KUC_GRP_HSM);
 
                 obd_cleanup_client_import(obd);
                 ptlrpc_lprocfs_unregister_obd(obd);
@@ -2996,17 +2688,12 @@ static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
 
 static int mdc_cleanup(struct obd_device *obd)
 {
-        struct client_obd *cli = &obd->u.cli;
-
-        OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock));
-        OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock));
-
         ptlrpcd_decref();
 
         return client_obd_cleanup(obd);
 }
 
-static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf)
+static int mdc_process_config(struct obd_device *obd, size_t len, void *buf)
 {
         struct lustre_cfg *lcfg = buf;
        int rc = class_process_proc_param(PARAM_MDC, obd->obd_vars, lcfg, obd);
@@ -3015,9 +2702,8 @@ static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf)
 
 
 /* get remote permission for current user on fid */
-int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
-                        struct obd_capa *oc, __u32 suppgid,
-                        struct ptlrpc_request **request)
+static int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
+                              u32 suppgid, struct ptlrpc_request **request)
 {
         struct ptlrpc_request  *req;
         int                    rc;
@@ -3030,15 +2716,13 @@ int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        mdc_set_capa_size(req, &RMF_CAPA1, oc);
-
         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR);
         if (rc) {
                 ptlrpc_request_free(req);
                 RETURN(rc);
         }
 
-        mdc_pack_body(req, fid, oc, OBD_MD_FLRMTPERM, 0, suppgid, 0);
+       mdc_pack_body(req, fid, OBD_MD_FLRMTPERM, 0, suppgid, 0);
 
         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
                              sizeof(struct mdt_remote_perm));
@@ -3053,62 +2737,7 @@ int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
         RETURN(rc);
 }
 
-static int mdc_interpret_renew_capa(const struct lu_env *env,
-                                    struct ptlrpc_request *req, void *args,
-                                    int status)
-{
-        struct mdc_renew_capa_args *ra = args;
-        struct mdt_body *body = NULL;
-        struct lustre_capa *capa;
-        ENTRY;
-
-        if (status)
-                GOTO(out, capa = ERR_PTR(status));
-
-        body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
-        if (body == NULL)
-                GOTO(out, capa = ERR_PTR(-EFAULT));
-
-       if ((body->mbo_valid & OBD_MD_FLOSSCAPA) == 0)
-                GOTO(out, capa = ERR_PTR(-ENOENT));
-
-        capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA2);
-        if (!capa)
-                GOTO(out, capa = ERR_PTR(-EFAULT));
-        EXIT;
-out:
-        ra->ra_cb(ra->ra_oc, capa);
-        return 0;
-}
-
-static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc,
-                          renew_capa_cb_t cb)
-{
-        struct ptlrpc_request *req;
-        struct mdc_renew_capa_args *ra;
-        ENTRY;
-
-        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_MDS_GETATTR,
-                                        LUSTRE_MDS_VERSION, MDS_GETATTR);
-        if (req == NULL)
-                RETURN(-ENOMEM);
-
-        /* NB, OBD_MD_FLOSSCAPA is set here, but it doesn't necessarily mean the
-         * capa to renew is oss capa.
-         */
-        mdc_pack_body(req, &oc->c_capa.lc_fid, oc, OBD_MD_FLOSSCAPA, 0, -1, 0);
-        ptlrpc_request_set_replen(req);
-
-        CLASSERT(sizeof(*ra) <= sizeof(req->rq_async_args));
-        ra = ptlrpc_req_async_args(req);
-        ra->ra_oc = oc;
-        ra->ra_cb = cb;
-        req->rq_interpret_reply = mdc_interpret_renew_capa;
-        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
-        RETURN(0);
-}
-
-struct obd_ops mdc_obd_ops = {
+static struct obd_ops mdc_obd_ops = {
         .o_owner            = THIS_MODULE,
         .o_setup            = mdc_setup,
         .o_precleanup       = mdc_precleanup,
@@ -3128,16 +2757,14 @@ struct obd_ops mdc_obd_ops = {
         .o_process_config   = mdc_process_config,
         .o_get_uuid         = mdc_get_uuid,
         .o_quotactl         = mdc_quotactl,
-        .o_quotacheck       = mdc_quotacheck
 };
 
-struct md_ops mdc_md_ops = {
+static struct md_ops mdc_md_ops = {
         .m_getstatus        = mdc_getstatus,
         .m_null_inode      = mdc_null_inode,
         .m_find_cbdata      = mdc_find_cbdata,
         .m_close            = mdc_close,
         .m_create           = mdc_create,
-        .m_done_writing     = mdc_done_writing,
         .m_enqueue          = mdc_enqueue,
         .m_getattr          = mdc_getattr,
         .m_getattr_name     = mdc_getattr_name,
@@ -3158,14 +2785,12 @@ struct md_ops mdc_md_ops = {
         .m_free_lustre_md   = mdc_free_lustre_md,
         .m_set_open_replay_data = mdc_set_open_replay_data,
         .m_clear_open_replay_data = mdc_clear_open_replay_data,
-        .m_renew_capa       = mdc_renew_capa,
-        .m_unpack_capa      = mdc_unpack_capa,
         .m_get_remote_perm  = mdc_get_remote_perm,
         .m_intent_getattr_async = mdc_intent_getattr_async,
         .m_revalidate_lock      = mdc_revalidate_lock
 };
 
-int __init mdc_init(void)
+static int __init mdc_init(void)
 {
        return class_register_type(&mdc_obd_ops, &mdc_md_ops, true, NULL,
                                   LUSTRE_MDC_NAME, NULL);