Whamcloud - gitweb
LU-5939 hsm: make HSM modification requests replayable
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
index 1699d29..56fcc0d 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 #include <linux/miscdevice.h>
 #include <linux/init.h>
 #include <linux/utsname.h>
+#include <linux/kthread.h>
+#include <linux/user_namespace.h>
+#ifdef HAVE_UIDGID_HEADER
+# include <linux/uidgid.h>
+#endif
 
 #include <lustre_acl.h>
 #include <lustre_ioctl.h>
@@ -50,8 +55,8 @@
 #include <lprocfs_status.h>
 #include <lustre_param.h>
 #include <lustre_log.h>
+#include <lustre_kernelcomm.h>
 #include <cl_object.h>
-#include <lclient.h>
 
 #include "mdc_internal.h"
 
@@ -64,8 +69,9 @@ struct mdc_renew_capa_args {
 
 static int mdc_cleanup(struct obd_device *obd);
 
-int mdc_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
-                    const struct req_msg_field *field, struct obd_capa **oc)
+static int mdc_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
+                          const struct req_msg_field *field,
+                          struct obd_capa **oc)
 {
         struct lustre_capa *capa;
         struct obd_capa *c;
@@ -150,8 +156,8 @@ out:
 }
 
 /* This should be mdc_get_info("rootfid") */
-int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid,
-                  struct obd_capa **pc)
+static int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid,
+                        struct obd_capa **pc)
 {
         return send_getstatus(class_exp2cliimp(exp), rootfid, pc,
                               LUSTRE_IMP_FULL, 0);
@@ -216,8 +222,8 @@ static int mdc_getattr_common(struct obd_export *exp,
         RETURN(0);
 }
 
-int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
-                struct ptlrpc_request **request)
+static int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
+                      struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
         int                    rc;
@@ -261,8 +267,8 @@ int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
         RETURN(rc);
 }
 
-int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
-                     struct ptlrpc_request **request)
+static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
+                           struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
         int                    rc;
@@ -308,11 +314,11 @@ int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
 }
 
 static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
-                            const struct lu_fid *fid,
-                            struct obd_capa *oc, int opcode, obd_valid valid,
-                            const char *xattr_name, const char *input,
-                            int input_size, int output_size, int flags,
-                            __u32 suppgid, struct ptlrpc_request **request)
+                           const struct lu_fid *fid,
+                           struct obd_capa *oc, int opcode, u64 valid,
+                           const char *xattr_name, const char *input,
+                           int input_size, int output_size, int flags,
+                           __u32 suppgid, struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
         int   xattr_namelen = 0;
@@ -418,10 +424,12 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
         RETURN(rc);
 }
 
-int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
-                 struct obd_capa *oc, obd_valid valid, const char *xattr_name,
-                 const char *input, int input_size, int output_size,
-                 int flags, __u32 suppgid, struct ptlrpc_request **request)
+static int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
+                       struct obd_capa *oc, u64 valid,
+                       const char *xattr_name,
+                       const char *input, int input_size, int output_size,
+                       int flags, __u32 suppgid,
+                       struct ptlrpc_request **request)
 {
         return mdc_xattr_common(exp, &RQF_MDS_REINT_SETXATTR,
                                 fid, oc, MDS_REINT, valid, xattr_name,
@@ -429,10 +437,11 @@ int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
                                 suppgid, request);
 }
 
-int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid,
-                 struct obd_capa *oc, obd_valid valid, const char *xattr_name,
-                 const char *input, int input_size, int output_size,
-                 int flags, struct ptlrpc_request **request)
+static int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid,
+                       struct obd_capa *oc, u64 valid,
+                       const char *xattr_name,
+                       const char *input, int input_size, int output_size,
+                       int flags, struct ptlrpc_request **request)
 {
         return mdc_xattr_common(exp, &RQF_MDS_GETXATTR,
                                 fid, oc, MDS_GETXATTR, valid, xattr_name,
@@ -520,7 +529,7 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
                 if (rc < 0)
                         GOTO(out, rc);
 
-                if (rc < sizeof(*md->lsm)) {
+               if (rc < (typeof(rc))sizeof(*md->lsm)) {
                         CDEBUG(D_INFO, "lsm size too small: "
                                "rc < sizeof (*md->lsm) (%d < %d)\n",
                                rc, (int)sizeof(*md->lsm));
@@ -555,7 +564,7 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
                        if (rc < 0)
                                GOTO(out, rc);
 
-                       if (rc < sizeof(*md->lmv)) {
+                       if (rc < (typeof(rc))sizeof(*md->lmv)) {
                                CDEBUG(D_INFO, "size too small:  "
                                       "rc < sizeof(*md->lmv) (%d < %d)\n",
                                        rc, (int)sizeof(*md->lmv));
@@ -631,10 +640,6 @@ int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
         RETURN(0);
 }
 
-/**
- * Handles both OPEN and SETATTR RPCs for OPEN-CLOSE and SETATTR-DONE_WRITING
- * RPC chains.
- */
 void mdc_replay_open(struct ptlrpc_request *req)
 {
         struct md_open_data *mod = req->rq_cb_data;
@@ -671,17 +676,18 @@ void mdc_replay_open(struct ptlrpc_request *req)
                 __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg);
                 struct mdt_ioepoch *epoch;
 
-                LASSERT(opc == MDS_CLOSE || opc == MDS_DONE_WRITING);
-                epoch = req_capsule_client_get(&close_req->rq_pill,
-                                               &RMF_MDT_EPOCH);
-                LASSERT(epoch);
+               LASSERT(opc == MDS_CLOSE);
+               epoch = req_capsule_client_get(&close_req->rq_pill,
+                                              &RMF_MDT_EPOCH);
+               LASSERT(epoch);
 
-                if (och != NULL)
-                        LASSERT(!memcmp(&old, &epoch->handle, sizeof(old)));
-                DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
-               epoch->handle = body->mbo_handle;
-        }
-        EXIT;
+               if (och != NULL)
+                       LASSERT(!memcmp(&old, &epoch->mio_handle, sizeof(old)));
+
+               DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
+               epoch->mio_handle = body->mbo_handle;
+       }
+       EXIT;
 }
 
 void mdc_commit_open(struct ptlrpc_request *req)
@@ -817,24 +823,8 @@ int mdc_clear_open_replay_data(struct obd_export *exp,
         RETURN(0);
 }
 
-/* Prepares the request for the replay by the given reply */
-static void mdc_close_handle_reply(struct ptlrpc_request *req,
-                                   struct md_op_data *op_data, int rc) {
-        struct mdt_body  *repbody;
-        struct mdt_ioepoch *epoch;
-
-        if (req && rc == -EAGAIN) {
-                repbody = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
-                epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
-
-                epoch->flags |= MF_SOM_AU;
-               if (repbody->mbo_valid & OBD_MD_FLGETATTRLOCK)
-                        op_data->op_flags |= MF_GETATTR_LOCK;
-        }
-}
-
-int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
-              struct md_open_data *mod, struct ptlrpc_request **request)
+static int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
+                    struct md_open_data *mod, struct ptlrpc_request **request)
 {
        struct obd_device     *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
@@ -947,126 +937,10 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
                 obd_mod_put(mod);
         }
         *request = req;
-        mdc_close_handle_reply(req, op_data, rc);
-        RETURN(rc < 0 ? rc : saved_rc);
-}
-
-int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
-                     struct md_open_data *mod)
-{
-        struct obd_device     *obd = class_exp2obd(exp);
-        struct ptlrpc_request *req;
-        int                    rc;
-        ENTRY;
-
-        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
-                                   &RQF_MDS_DONE_WRITING);
-        if (req == NULL)
-                RETURN(-ENOMEM);
-
-        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
-        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_DONE_WRITING);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
-
-        if (mod != NULL) {
-                LASSERTF(mod->mod_open_req != NULL &&
-                         mod->mod_open_req->rq_type != LI_POISON,
-                         "POISONED setattr %p!\n", mod->mod_open_req);
-
-                mod->mod_close_req = req;
-                DEBUG_REQ(D_HA, mod->mod_open_req, "matched setattr");
-                /* We no longer want to preserve this setattr for replay even
-                 * though the open was committed. b=3632, b=3633 */
-               spin_lock(&mod->mod_open_req->rq_lock);
-               mod->mod_open_req->rq_replay = 0;
-               spin_unlock(&mod->mod_open_req->rq_lock);
-        }
-
-        mdc_close_pack(req, op_data);
-        ptlrpc_request_set_replen(req);
-
-        mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
-        rc = ptlrpc_queue_wait(req);
-        mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
-
-        if (rc == -ESTALE) {
-                /**
-                 * it can be allowed error after 3633 if open or setattr were
-                 * committed and server failed before close was sent.
-                 * Let's check if mod exists and return no error in that case
-                 */
-                if (mod) {
-                        LASSERT(mod->mod_open_req != NULL);
-                        if (mod->mod_open_req->rq_committed)
-                                rc = 0;
-                }
-        }
-
-        if (mod) {
-                if (rc != 0)
-                        mod->mod_close_req = NULL;
-               LASSERT(mod->mod_open_req != NULL);
-               mdc_free_open(mod);
 
-                /* Since now, mod is accessed through setattr req only,
-                 * thus DW req does not keep a reference on mod anymore. */
-                obd_mod_put(mod);
-        }
-
-        mdc_close_handle_reply(req, op_data, rc);
-        ptlrpc_req_finished(req);
-        RETURN(rc);
+       RETURN(rc < 0 ? rc : saved_rc);
 }
 
-#ifdef HAVE_SPLIT_SUPPORT
-int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
-                 const struct page *page, int offset)
-{
-        struct ptlrpc_request   *req;
-        struct ptlrpc_bulk_desc *desc;
-        int                      rc;
-        ENTRY;
-
-        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_WRITEPAGE);
-        if (req == NULL)
-                RETURN(-ENOMEM);
-
-        /* FIXME: capa doesn't support split yet */
-        mdc_set_capa_size(req, &RMF_CAPA1, NULL);
-
-        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_WRITEPAGE);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
-
-        req->rq_request_portal = MDS_READPAGE_PORTAL;
-        ptlrpc_at_set_req_timeout(req);
-
-       desc = ptlrpc_prep_bulk_imp(req, 1, 1,BULK_GET_SOURCE, MDS_BULK_PORTAL);
-       if (desc == NULL)
-               GOTO(out, rc = -ENOMEM);
-
-        /* NB req now owns desc and will free it when it gets freed. */
-        ptlrpc_prep_bulk_page(desc, (struct page *)page, 0, offset);
-        mdc_readdir_pack(req, 0, offset, fid, NULL);
-
-        ptlrpc_request_set_replen(req);
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                GOTO(out, rc);
-
-        rc = sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk);
-out:
-        ptlrpc_req_finished(req);
-        return rc;
-}
-EXPORT_SYMBOL(mdc_sendpage);
-#endif
-
 static int mdc_getpage(struct obd_export *exp, const struct lu_fid *fid,
                       __u64 offset, struct obd_capa *oc,
                       struct page **pages, int npages,
@@ -1250,7 +1124,7 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash,
  *  |        |
  * .|--------v-------   -----.
  * |s|e|f|p|ent|ent| ... |ent|
- * '--|--------------   -----'   Each CFS_PAGE contains a single
+ * '--|--------------   -----'   Each PAGE contains a single
  *    '------.                   lu_dirpage.
  * .---------v-------   -----.
  * |s|e|f|p|ent| 0 | ... | 0 |
@@ -1260,26 +1134,26 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash,
  * larger than LU_PAGE_SIZE, a single host page may contain multiple
  * lu_dirpages. After reading the lu_dirpages from the MDS, the
  * ldp_hash_end of the first lu_dirpage refers to the one immediately
- * after it in the same CFS_PAGE (arrows simplified for brevity, but
+ * after it in the same PAGE (arrows simplified for brevity, but
  * in general e0==s1, e1==s2, etc.):
  *
  * .--------------------   -----.
  * |s0|e0|f0|p|ent|ent| ... |ent|
  * |---v----------------   -----|
  * |s1|e1|f1|p|ent|ent| ... |ent|
- * |---v----------------   -----|  Here, each CFS_PAGE contains
+ * |---v----------------   -----|  Here, each PAGE contains
  *             ...                 multiple lu_dirpages.
  * |---v----------------   -----|
  * |s'|e'|f'|p|ent|ent| ... |ent|
  * '---|----------------   -----'
  *     v
  * .----------------------------.
- * |        next CFS_PAGE       |
+ * |        next PAGE           |
  *
  * This structure is transformed into a single logical lu_dirpage as follows:
  *
  * - Replace e0 with e' so the request for the next lu_dirpage gets the page
- *   labeled 'next CFS_PAGE'.
+ *   labeled 'next PAGE'.
  *
  * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
  *   a hash collision with the next page exists.
@@ -1308,8 +1182,8 @@ static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs)
                        /* Advance dp to next lu_dirpage. */
                        dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
 
-                       /* Check if we've reached the end of the CFS_PAGE. */
-                       if (!((unsigned long)dp & ~CFS_PAGE_MASK))
+                       /* Check if we've reached the end of the PAGE. */
+                       if (!((unsigned long)dp & ~PAGE_MASK))
                                break;
 
                        /* Save the hash and flags of this lu_dirpage. */
@@ -1349,6 +1223,14 @@ struct readpage_param {
        struct md_callback      *rp_cb;
 };
 
+#ifndef HAVE_DELETE_FROM_PAGE_CACHE
+static inline void delete_from_page_cache(struct page *page)
+{
+       remove_from_page_cache(page);
+       page_cache_release(page);
+}
+#endif
+
 /**
  * Read pages from server.
  *
@@ -1356,7 +1238,7 @@ struct readpage_param {
  * a header lu_dirpage which describes the start/end hash, and whether this
  * page is empty (contains no dir entry) or hash collide with next page.
  * After client receives reply, several pages will be integrated into dir page
- * in CFS_PAGE_SIZE (if CFS_PAGE_SIZE greater than LU_PAGE_SIZE), and the
+ * in PAGE_SIZE (if PAGE_SIZE greater than LU_PAGE_SIZE), and the
  * lu_dirpage for this integrated page will be adjusted.
  **/
 static int mdc_read_page_remote(void *data, struct page *page0)
@@ -1398,7 +1280,10 @@ static int mdc_read_page_remote(void *data, struct page *page0)
 
        rc = mdc_getpage(rp->rp_exp, fid, rp->rp_off, op_data->op_capa1,
                         page_pool, npages, &req);
-       if (rc == 0) {
+       if (rc < 0) {
+               /* page0 is special, which was added into page cache early */
+               delete_from_page_cache(page0);
+       } else {
                int lu_pgs;
 
                rd_pgs = (req->rq_bulk->bd_nob_transferred +
@@ -1407,15 +1292,14 @@ static int mdc_read_page_remote(void *data, struct page *page0)
                                                        LU_PAGE_SHIFT;
                LASSERT(!(req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
 
-               CDEBUG(D_INODE, "read %d(%d)/%d pages\n", rd_pgs, lu_pgs,
-                      op_data->op_npages);
+               CDEBUG(D_INODE, "read %d(%d) pages\n", rd_pgs, lu_pgs);
 
                mdc_adjust_dirpages(page_pool, rd_pgs, lu_pgs);
 
                SetPageUptodate(page0);
        }
-
        unlock_page(page0);
+
        ptlrpc_req_finished(req);
        CDEBUG(D_CACHE, "read %d/%d pages\n", rd_pgs, npages);
        for (i = 1; i < npages; i++) {
@@ -1541,7 +1425,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data,
                                            rp_param.rp_hash64),
                               mdc_read_page_remote, &rp_param);
        if (IS_ERR(page)) {
-               CERROR("%s: read cache page: "DFID" at "LPU64": rc %ld\n",
+               CDEBUG(D_INFO, "%s: read cache page: "DFID" at "LPU64": %ld\n",
                       exp->exp_obd->obd_name, PFID(&op_data->op_fid1),
                       rp_param.rp_off, PTR_ERR(page));
                GOTO(out_unlock, rc = PTR_ERR(page));
@@ -1689,7 +1573,7 @@ static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf)
         /* Val is struct getinfo_fid2path result plus path */
         vallen = sizeof(*gf) + gf->gf_pathlen;
 
-       rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
+       rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf);
        if (rc != 0 && rc != -EREMOTE)
                GOTO(out, rc);
 
@@ -1732,7 +1616,10 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       rc = mdc_queue_wait(req);
+       mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+       rc = ptlrpc_queue_wait(req);
+       mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+
        GOTO(out, rc);
 out:
        ptlrpc_req_finished(req);
@@ -1914,10 +1801,11 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       rc = mdc_queue_wait(req);
-       GOTO(out, rc);
+       mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+       rc = ptlrpc_queue_wait(req);
+       mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
 
-       EXIT;
+       GOTO(out, rc);
 out:
        ptlrpc_req_finished(req);
        return rc;
@@ -1973,7 +1861,10 @@ static int mdc_ioc_hsm_request(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       rc = mdc_queue_wait(req);
+       mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+       rc = ptlrpc_queue_wait(req);
+       mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+
        GOTO(out, rc);
 
 out:
@@ -1981,7 +1872,7 @@ out:
        return rc;
 }
 
-static struct kuc_hdr *changelog_kuc_hdr(char *buf, int len, int flags)
+static struct kuc_hdr *changelog_kuc_hdr(char *buf, size_t len, __u32 flags)
 {
        struct kuc_hdr *lh = (struct kuc_hdr *)buf;
 
@@ -1995,46 +1886,50 @@ static struct kuc_hdr *changelog_kuc_hdr(char *buf, int len, int flags)
        return lh;
 }
 
-#define D_CHANGELOG 0
-
 struct changelog_show {
-       __u64           cs_startrec;
-       __u32           cs_flags;
-       struct file     *cs_fp;
-       char            *cs_buf;
-       struct obd_device *cs_obd;
+       __u64                            cs_startrec;
+       enum changelog_send_flag         cs_flags;
+       struct file                     *cs_fp;
+       char                            *cs_buf;
+       struct obd_device               *cs_obd;
 };
 
+static inline char *cs_obd_name(struct changelog_show *cs)
+{
+       return cs->cs_obd->obd_name;
+}
+
 static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh,
                             struct llog_rec_hdr *hdr, void *data)
 {
-       struct changelog_show *cs = data;
-       struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
-       struct kuc_hdr *lh;
-       int len, rc;
+       struct changelog_show           *cs = data;
+       struct llog_changelog_rec       *rec = (struct llog_changelog_rec *)hdr;
+       struct kuc_hdr                  *lh;
+       size_t                           len;
+       int                              rc;
        ENTRY;
 
        if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
                rc = -EINVAL;
                CERROR("%s: not a changelog rec %x/%d: rc = %d\n",
-                      cs->cs_obd->obd_name, rec->cr_hdr.lrh_type,
+                      cs_obd_name(cs), rec->cr_hdr.lrh_type,
                       rec->cr.cr_type, rc);
                RETURN(rc);
        }
 
        if (rec->cr.cr_index < cs->cs_startrec) {
                /* Skip entries earlier than what we are interested in */
-               CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64"\n",
+               CDEBUG(D_HSM, "rec="LPU64" start="LPU64"\n",
                       rec->cr.cr_index, cs->cs_startrec);
                RETURN(0);
        }
 
-       CDEBUG(D_CHANGELOG, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID
-               " %.*s\n", rec->cr.cr_index, rec->cr.cr_type,
-               changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
-               rec->cr.cr_flags & CLF_FLAGMASK,
-               PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
-               rec->cr.cr_namelen, changelog_rec_name(&rec->cr));
+       CDEBUG(D_HSM, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID" %.*s\n",
+              rec->cr.cr_index, rec->cr.cr_type,
+              changelog_type2str(rec->cr.cr_type), rec->cr.cr_time,
+              rec->cr.cr_flags & CLF_FLAGMASK,
+              PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid),
+              rec->cr.cr_namelen, changelog_rec_name(&rec->cr));
 
        len = sizeof(*lh) + changelog_rec_size(&rec->cr) + rec->cr.cr_namelen;
 
@@ -2043,38 +1938,43 @@ static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh,
         memcpy(lh + 1, &rec->cr, len - sizeof(*lh));
 
         rc = libcfs_kkuc_msg_put(cs->cs_fp, lh);
-        CDEBUG(D_CHANGELOG, "kucmsg fp %p len %d rc %d\n", cs->cs_fp, len,rc);
+       CDEBUG(D_HSM, "kucmsg fp %p len %zu rc %d\n", cs->cs_fp, len, rc);
 
         RETURN(rc);
 }
 
 static int mdc_changelog_send_thread(void *csdata)
 {
-       struct changelog_show *cs = csdata;
-       struct llog_ctxt *ctxt = NULL;
-       struct llog_handle *llh = NULL;
-       struct kuc_hdr *kuch;
-       int rc;
+       struct changelog_show   *cs = csdata;
+       struct llog_ctxt        *ctxt = NULL;
+       struct llog_handle      *llh = NULL;
+       struct kuc_hdr          *kuch;
+       enum llog_flag           flags = LLOG_F_IS_CAT;
+       int                      rc;
 
-       CDEBUG(D_CHANGELOG, "changelog to fp=%p start "LPU64"\n",
+       CDEBUG(D_HSM, "changelog to fp=%p start "LPU64"\n",
               cs->cs_fp, cs->cs_startrec);
 
        OBD_ALLOC(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE);
        if (cs->cs_buf == NULL)
                GOTO(out, rc = -ENOMEM);
 
-        /* Set up the remote catalog handle */
-        ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT);
-        if (ctxt == NULL)
-                GOTO(out, rc = -ENOENT);
+       /* Set up the remote catalog handle */
+       ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT);
+       if (ctxt == NULL)
+               GOTO(out, rc = -ENOENT);
        rc = llog_open(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG,
                       LLOG_OPEN_EXISTS);
        if (rc) {
                CERROR("%s: fail to open changelog catalog: rc = %d\n",
-                      cs->cs_obd->obd_name, rc);
+                      cs_obd_name(cs), rc);
                GOTO(out, rc);
        }
-       rc = llog_init_handle(NULL, llh, LLOG_F_IS_CAT, NULL);
+
+       if (cs->cs_flags & CHANGELOG_FLAG_JOBID)
+               flags |= LLOG_F_EXT_JOBID;
+
+       rc = llog_init_handle(NULL, llh, flags, NULL);
        if (rc) {
                CERROR("llog_init_handle failed %d\n", rc);
                GOTO(out, rc);
@@ -2082,19 +1982,17 @@ static int mdc_changelog_send_thread(void *csdata)
 
        rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0);
 
-        /* Send EOF no matter what our result */
-        if ((kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch),
-                                      cs->cs_flags))) {
-                kuch->kuc_msgtype = CL_EOF;
-                libcfs_kkuc_msg_put(cs->cs_fp, kuch);
-        }
+       /* Send EOF no matter what our result */
+       kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch), cs->cs_flags);
+       kuch->kuc_msgtype = CL_EOF;
+       libcfs_kkuc_msg_put(cs->cs_fp, kuch);
 
 out:
        fput(cs->cs_fp);
        if (llh)
                llog_cat_close(NULL, llh);
-        if (ctxt)
-                llog_ctxt_put(ctxt);
+       if (ctxt)
+               llog_ctxt_put(ctxt);
        if (cs->cs_buf)
                OBD_FREE(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE);
        OBD_FREE_PTR(cs);
@@ -2128,12 +2026,12 @@ static int mdc_ioc_changelog_send(struct obd_device *obd,
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
                CERROR("%s: cannot start changelog thread: rc = %d\n",
-                      obd->obd_name, rc);
+                      cs_obd_name(cs), rc);
                OBD_FREE_PTR(cs);
        } else {
                rc = 0;
-               CDEBUG(D_CHANGELOG, "%s: started changelog thread\n",
-                      obd->obd_name);
+               CDEBUG(D_HSM, "%s: started changelog thread\n",
+                      cs_obd_name(cs));
        }
 
        return rc;
@@ -2286,7 +2184,7 @@ out:
 }
 
 static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
-                         void *karg, void *uarg)
+                        void *karg, void __user *uarg)
 {
         struct obd_device *obd = exp->exp_obd;
         struct obd_ioctl_data *data = karg;
@@ -2420,9 +2318,9 @@ out:
        return rc;
 }
 
-int mdc_get_info_rpc(struct obd_export *exp,
-                     obd_count keylen, void *key,
-                     int vallen, void *val)
+static int mdc_get_info_rpc(struct obd_export *exp,
+                           u32 keylen, void *key,
+                           u32 vallen, void *val)
 {
         struct obd_import      *imp = class_exp2cliimp(exp);
         struct ptlrpc_request  *req;
@@ -2437,7 +2335,7 @@ int mdc_get_info_rpc(struct obd_export *exp,
         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_KEY,
                              RCL_CLIENT, keylen);
         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VALLEN,
-                             RCL_CLIENT, sizeof(__u32));
+                            RCL_CLIENT, sizeof(vallen));
 
         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GET_INFO);
         if (rc) {
@@ -2448,7 +2346,7 @@ int mdc_get_info_rpc(struct obd_export *exp,
         tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_KEY);
         memcpy(tmp, key, keylen);
         tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_VALLEN);
-        memcpy(tmp, &vallen, sizeof(__u32));
+       memcpy(tmp, &vallen, sizeof(vallen));
 
         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VAL,
                              RCL_SERVER, vallen);
@@ -2485,7 +2383,7 @@ static void lustre_swab_hai(struct hsm_action_item *h)
 static void lustre_swab_hal(struct hsm_action_list *h)
 {
        struct hsm_action_item  *hai;
-       int                      i;
+       __u32                    i;
 
        __swab32s(&h->hal_version);
        __swab32s(&h->hal_count);
@@ -2534,7 +2432,7 @@ static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
  * @param val KUC message (kuc_hdr + hsm_action_list)
  * @param len total length of message
  */
-static int mdc_hsm_copytool_send(int len, void *val)
+static int mdc_hsm_copytool_send(size_t len, void *val)
 {
        struct kuc_hdr          *lh = (struct kuc_hdr *)val;
        struct hsm_action_list  *hal = (struct hsm_action_list *)(lh + 1);
@@ -2542,8 +2440,8 @@ static int mdc_hsm_copytool_send(int len, void *val)
        ENTRY;
 
        if (len < sizeof(*lh) + sizeof(*hal)) {
-               CERROR("Short HSM message %d < %d\n", len,
-                      (int) (sizeof(*lh) + sizeof(*hal)));
+               CERROR("Short HSM message %zu < %zu\n", len,
+                      sizeof(*lh) + sizeof(*hal));
                RETURN(-EPROTO);
        }
        if (lh->kuc_magic == __swab16(KUC_MAGIC)) {
@@ -2602,11 +2500,11 @@ static int mdc_kuc_reregister(struct obd_import *imp)
                                         (void *)imp);
 }
 
-int mdc_set_info_async(const struct lu_env *env,
-                      struct obd_export *exp,
-                      obd_count keylen, void *key,
-                      obd_count vallen, void *val,
-                      struct ptlrpc_request_set *set)
+static int mdc_set_info_async(const struct lu_env *env,
+                             struct obd_export *exp,
+                             u32 keylen, void *key,
+                             u32 vallen, void *val,
+                             struct ptlrpc_request_set *set)
 {
        struct obd_import       *imp = class_exp2cliimp(exp);
        int                      rc;
@@ -2650,29 +2548,35 @@ int mdc_set_info_async(const struct lu_env *env,
                 RETURN(rc);
         }
 
+       if (KEY_IS(KEY_DEFAULT_EASIZE)) {
+               __u32 *default_easize = val;
+
+               exp->exp_obd->u.cli.cl_default_mds_easize = *default_easize;
+               RETURN(0);
+       }
+
        CERROR("Unknown key %s\n", (char *)key);
        RETURN(-EINVAL);
 }
 
-int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
-                __u32 keylen, void *key, __u32 *vallen, void *val,
-                struct lov_stripe_md *lsm)
+static int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
+                       __u32 keylen, void *key, __u32 *vallen, void *val)
 {
        int rc = -EINVAL;
 
        if (KEY_IS(KEY_MAX_EASIZE)) {
-               int mdsize, *max_easize;
+               __u32 mdsize, *max_easize;
 
                if (*vallen != sizeof(int))
                        RETURN(-EINVAL);
-               mdsize = *(int *)val;
+               mdsize = *(__u32 *)val;
                if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize)
                        exp->exp_obd->u.cli.cl_max_mds_easize = mdsize;
                max_easize = val;
                *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize;
                RETURN(0);
        } else if (KEY_IS(KEY_DEFAULT_EASIZE)) {
-               int *default_easize;
+               __u32 *default_easize;
 
                if (*vallen != sizeof(int))
                        RETURN(-EINVAL);
@@ -2680,18 +2584,15 @@ int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
                *default_easize = exp->exp_obd->u.cli.cl_default_mds_easize;
                RETURN(0);
        } else if (KEY_IS(KEY_MAX_COOKIESIZE)) {
-               int mdsize, *max_cookiesize;
+               __u32 *max_cookiesize;
 
                if (*vallen != sizeof(int))
                        RETURN(-EINVAL);
-               mdsize = *(int *)val;
-               if (mdsize > exp->exp_obd->u.cli.cl_max_mds_cookiesize)
-                       exp->exp_obd->u.cli.cl_max_mds_cookiesize = mdsize;
                max_cookiesize = val;
                *max_cookiesize = exp->exp_obd->u.cli.cl_max_mds_cookiesize;
                RETURN(0);
        } else if (KEY_IS(KEY_DEFAULT_COOKIESIZE)) {
-               int *default_cookiesize;
+               __u32 *default_cookiesize;
 
                if (*vallen != sizeof(int))
                        RETURN(-EINVAL);
@@ -2709,7 +2610,7 @@ int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
                 *data = imp->imp_connect_data;
                 RETURN(0);
         } else if (KEY_IS(KEY_TGT_COUNT)) {
-                *((int *)val) = 1;
+               *((__u32 *)val) = 1;
                 RETURN(0);
         }
 
@@ -2718,8 +2619,8 @@ int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
         RETURN(rc);
 }
 
-int mdc_fsync(struct obd_export *exp, const struct lu_fid *fid,
-             struct obd_capa *oc, struct ptlrpc_request **request)
+static int mdc_fsync(struct obd_export *exp, const struct lu_fid *fid,
+                    struct obd_capa *oc, struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
         int                    rc;
@@ -2812,7 +2713,8 @@ int mdc_fid_alloc(const struct lu_env *env, struct obd_export *exp,
        RETURN(seq_client_alloc_fid(env, seq, fid));
 }
 
-struct obd_uuid *mdc_get_uuid(struct obd_export *exp) {
+static struct obd_uuid *mdc_get_uuid(struct obd_export *exp)
+{
         struct client_obd *cli = &exp->exp_obd->u.cli;
         return &cli->cl_target_uuid;
 }
@@ -2844,10 +2746,43 @@ static int mdc_resource_inode_free(struct ldlm_resource *res)
        return 0;
 }
 
-struct ldlm_valblock_ops inode_lvbo = {
+static struct ldlm_valblock_ops inode_lvbo = {
        .lvbo_free = mdc_resource_inode_free
 };
 
+static int mdc_llog_init(struct obd_device *obd)
+{
+       struct obd_llog_group   *olg = &obd->obd_olg;
+       struct llog_ctxt        *ctxt;
+       int                      rc;
+
+       ENTRY;
+
+       rc = llog_setup(NULL, obd, olg, LLOG_CHANGELOG_REPL_CTXT, obd,
+                       &llog_client_ops);
+       if (rc < 0)
+               RETURN(rc);
+
+       ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT);
+       llog_initiator_connect(ctxt);
+       llog_ctxt_put(ctxt);
+
+       RETURN(0);
+}
+
+static void mdc_llog_finish(struct obd_device *obd)
+{
+       struct llog_ctxt *ctxt;
+
+       ENTRY;
+
+       ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
+       if (ctxt != NULL)
+               llog_cleanup(NULL, ctxt);
+
+       EXIT;
+}
+
 static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
 {
        struct client_obd               *cli = &obd->u.cli;
@@ -2871,9 +2806,9 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
         rc = client_obd_setup(obd, cfg);
         if (rc)
                 GOTO(err_close_lock, rc);
-#ifdef LPROCFS
+#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_mdc_obd_vars;
-       lprocfs_seq_obd_setup(obd);
+       lprocfs_obd_setup(obd);
        lprocfs_alloc_md_stats(obd, 0);
 #endif
        sptlrpc_lprocfs_cliobd_attach(obd);
@@ -2883,7 +2818,7 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
 
        obd->obd_namespace->ns_lvbo = &inode_lvbo;
 
-        rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
+       rc = mdc_llog_init(obd);
         if (rc) {
                 mdc_cleanup(obd);
                 CERROR("failed to setup llogging subsystems\n");
@@ -2908,8 +2843,9 @@ err_rpc_lock:
  * a large number of stripes is possible.  If a larger reply buffer is
  * required it will be reallocated in the ptlrpc layer due to overflow.
  */
-static int mdc_init_ea_size(struct obd_export *exp, int easize,
-                           int def_easize, int cookiesize, int def_cookiesize)
+static int mdc_init_ea_size(struct obd_export *exp, __u32 easize,
+                           __u32 def_easize, __u32 cookiesize,
+                           __u32 def_cookiesize)
 {
        struct obd_device *obd = exp->exp_obd;
        struct client_obd *cli = &obd->u.cli;
@@ -2947,10 +2883,7 @@ static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
                 ptlrpc_lprocfs_unregister_obd(obd);
                 lprocfs_obd_cleanup(obd);
                lprocfs_free_md_stats(obd);
-
-                rc = obd_llog_finish(obd, 0);
-                if (rc != 0)
-                        CERROR("failed to cleanup llogging subsystems\n");
+               mdc_llog_finish(obd);
                 break;
         }
         RETURN(rc);
@@ -2968,55 +2901,18 @@ static int mdc_cleanup(struct obd_device *obd)
         return client_obd_cleanup(obd);
 }
 
-
-static int mdc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
-                         struct obd_device *tgt, int *index)
-{
-       struct llog_ctxt        *ctxt;
-       int                      rc;
-
-       ENTRY;
-
-       LASSERT(olg == &obd->obd_olg);
-
-       rc = llog_setup(NULL, obd, olg, LLOG_CHANGELOG_REPL_CTXT, tgt,
-                       &llog_client_ops);
-       if (rc)
-               RETURN(rc);
-
-       ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT);
-       llog_initiator_connect(ctxt);
-       llog_ctxt_put(ctxt);
-
-       RETURN(0);
-}
-
-static int mdc_llog_finish(struct obd_device *obd, int count)
-{
-       struct llog_ctxt *ctxt;
-
-       ENTRY;
-
-       ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT);
-       if (ctxt)
-               llog_cleanup(NULL, ctxt);
-
-       RETURN(0);
-}
-
-static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf)
+static int mdc_process_config(struct obd_device *obd, size_t len, void *buf)
 {
         struct lustre_cfg *lcfg = buf;
-       int rc = class_process_proc_seq_param(PARAM_MDC, obd->obd_vars,
-                                             lcfg, obd);
+       int rc = class_process_proc_param(PARAM_MDC, obd->obd_vars, lcfg, obd);
        return (rc > 0 ? 0: rc);
 }
 
 
 /* get remote permission for current user on fid */
-int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
-                        struct obd_capa *oc, __u32 suppgid,
-                        struct ptlrpc_request **request)
+static int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid,
+                              struct obd_capa *oc, __u32 suppgid,
+                              struct ptlrpc_request **request)
 {
         struct ptlrpc_request  *req;
         int                    rc;
@@ -3107,7 +3003,7 @@ static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc,
         RETURN(0);
 }
 
-struct obd_ops mdc_obd_ops = {
+static struct obd_ops mdc_obd_ops = {
         .o_owner            = THIS_MODULE,
         .o_setup            = mdc_setup,
         .o_precleanup       = mdc_precleanup,
@@ -3123,8 +3019,6 @@ struct obd_ops mdc_obd_ops = {
        .o_fid_fini         = client_fid_fini,
         .o_fid_alloc        = mdc_fid_alloc,
         .o_import_event     = mdc_import_event,
-        .o_llog_init        = mdc_llog_init,
-        .o_llog_finish      = mdc_llog_finish,
         .o_get_info         = mdc_get_info,
         .o_process_config   = mdc_process_config,
         .o_get_uuid         = mdc_get_uuid,
@@ -3132,13 +3026,12 @@ struct obd_ops mdc_obd_ops = {
         .o_quotacheck       = mdc_quotacheck
 };
 
-struct md_ops mdc_md_ops = {
+static struct md_ops mdc_md_ops = {
         .m_getstatus        = mdc_getstatus,
         .m_null_inode      = mdc_null_inode,
         .m_find_cbdata      = mdc_find_cbdata,
         .m_close            = mdc_close,
         .m_create           = mdc_create,
-        .m_done_writing     = mdc_done_writing,
         .m_enqueue          = mdc_enqueue,
         .m_getattr          = mdc_getattr,
         .m_getattr_name     = mdc_getattr_name,
@@ -3166,12 +3059,9 @@ struct md_ops mdc_md_ops = {
         .m_revalidate_lock      = mdc_revalidate_lock
 };
 
-int __init mdc_init(void)
+static int __init mdc_init(void)
 {
        return class_register_type(&mdc_obd_ops, &mdc_md_ops, true, NULL,
-#ifndef HAVE_ONLY_PROCFS_SEQ
-                                  NULL,
-#endif
                                   LUSTRE_MDC_NAME, NULL);
 }