Whamcloud - gitweb
LU-3365 lmv: support DNE with HSM.
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
index 2853268..50f9600 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Intel Corporation.
+ * Copyright (c) 2011, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -41,6 +41,7 @@
 # include <linux/pagemap.h>
 # include <linux/miscdevice.h>
 # include <linux/init.h>
+# include <linux/utsname.h>
 #else
 # include <liblustre.h>
 #endif
@@ -48,7 +49,6 @@
 #include <lustre_acl.h>
 #include <obd_class.h>
 #include <lustre_fid.h>
-#include <md_object.h>
 #include <lprocfs_status.h>
 #include <lustre_param.h>
 #include <lustre_log.h>
@@ -87,6 +87,24 @@ int mdc_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
         }
 }
 
+static inline int mdc_queue_wait(struct ptlrpc_request *req)
+{
+       struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+       int rc;
+
+       /* mdc_enter_request() ensures that this client has no more
+        * than cl_max_rpcs_in_flight RPCs simultaneously inf light
+        * against an MDT. */
+       rc = mdc_enter_request(cli);
+       if (rc != 0)
+               return rc;
+
+       rc = ptlrpc_queue_wait(req);
+       mdc_exit_request(cli);
+
+       return rc;
+}
+
 /* Helper that implements most of mdc_getstatus and signal_completed_replay. */
 /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
 static int send_getstatus(struct obd_import *imp, struct lu_fid *rootfid,
@@ -208,6 +226,11 @@ int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
         int                    rc;
         ENTRY;
 
+       /* Single MDS without an LMV case */
+       if (op_data->op_flags & MF_GET_MDT_IDX) {
+               op_data->op_mds = 0;
+               RETURN(0);
+       }
         *request = NULL;
         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
         if (req == NULL)
@@ -448,7 +471,7 @@ static int mdc_unpack_acl(struct ptlrpc_request *req, struct lustre_md *md)
         if (!buf)
                 RETURN(-EPROTO);
 
-        acl = posix_acl_from_xattr(buf, body->aclsize);
+        acl = posix_acl_from_xattr(&init_user_ns, buf, body->aclsize);
         if (IS_ERR(acl)) {
                 rc = PTR_ERR(acl);
                 CERROR("convert xattr to acl: %d\n", rc);
@@ -992,9 +1015,9 @@ int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
         req->rq_request_portal = MDS_READPAGE_PORTAL;
         ptlrpc_at_set_req_timeout(req);
 
-        desc = ptlrpc_prep_bulk_imp(req, 1, BULK_GET_SOURCE, MDS_BULK_PORTAL);
-        if (desc == NULL)
-                GOTO(out, rc = -ENOMEM);
+       desc = ptlrpc_prep_bulk_imp(req, 1, 1,BULK_GET_SOURCE, MDS_BULK_PORTAL);
+       if (desc == NULL)
+               GOTO(out, rc = -ENOMEM);
 
         /* NB req now owns desc and will free it when it gets freed. */
         ptlrpc_prep_bulk_page(desc, (struct page *)page, 0, offset);
@@ -1044,8 +1067,8 @@ restart_bulk:
         req->rq_request_portal = MDS_READPAGE_PORTAL;
         ptlrpc_at_set_req_timeout(req);
 
-        desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, BULK_PUT_SINK,
-                                    MDS_BULK_PORTAL);
+       desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, 1, BULK_PUT_SINK,
+                                   MDS_BULK_PORTAL);
         if (desc == NULL) {
                 ptlrpc_request_free(req);
                 RETURN(-ENOMEM);
@@ -1053,10 +1076,10 @@ restart_bulk:
 
         /* NB req now owns desc and will free it when it gets freed */
         for (i = 0; i < op_data->op_npages; i++)
-               ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, CFS_PAGE_SIZE);
+               ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
 
         mdc_readdir_pack(req, op_data->op_offset,
-                         CFS_PAGE_SIZE * op_data->op_npages,
+                        PAGE_CACHE_SIZE * op_data->op_npages,
                          &op_data->op_fid1, op_data->op_capa1);
 
         ptlrpc_request_set_replen(req);
@@ -1087,7 +1110,7 @@ restart_bulk:
         if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) {
                 CERROR("Unexpected # bytes transferred: %d (%ld expected)\n",
                         req->rq_bulk->bd_nob_transferred,
-                        CFS_PAGE_SIZE * op_data->op_npages);
+                       PAGE_CACHE_SIZE * op_data->op_npages);
                 ptlrpc_req_finished(req);
                 RETURN(-EPROTO);
         }
@@ -1180,9 +1203,9 @@ static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf)
         /* Val is struct getinfo_fid2path result plus path */
         vallen = sizeof(*gf) + gf->gf_pathlen;
 
-        rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
-        if (rc)
-                GOTO(out, rc);
+       rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
+       if (rc != 0 && rc != -EREMOTE)
+               GOTO(out, rc);
 
         if (vallen <= sizeof(*gf))
                 GOTO(out, rc = -EPROTO);
@@ -1211,14 +1234,19 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp,
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);
 
+       mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
+
        /* Copy hsm_progress struct */
        req_hpk = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_PROGRESS);
-       LASSERT(req_hpk);
+       if (req_hpk == NULL)
+               GOTO(out, rc = -EPROTO);
+
        *req_hpk = *hpk;
+       req_hpk->hpk_errval = lustre_errno_hton(hpk->hpk_errval);
 
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
+       rc = mdc_queue_wait(req);
        GOTO(out, rc);
 out:
        ptlrpc_req_finished(req);
@@ -1238,15 +1266,19 @@ static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archives)
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);
 
+       mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
+
        /* Copy hsm_progress struct */
        archive_mask = req_capsule_client_get(&req->rq_pill,
                                              &RMF_MDS_HSM_ARCHIVE);
-       LASSERT(archive_mask);
+       if (archive_mask == NULL)
+               GOTO(out, rc = -EPROTO);
+
        *archive_mask = archives;
 
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
+       rc = mdc_queue_wait(req);
        GOTO(out, rc);
 out:
        ptlrpc_req_finished(req);
@@ -1280,7 +1312,7 @@ static int mdc_ioc_hsm_current_action(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
+       rc = mdc_queue_wait(req);
        if (rc)
                GOTO(out, rc);
 
@@ -1309,9 +1341,11 @@ static int mdc_ioc_hsm_ct_unregister(struct obd_import *imp)
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);
 
+       mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
+
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
+       rc = mdc_queue_wait(req);
        GOTO(out, rc);
 out:
        ptlrpc_req_finished(req);
@@ -1322,7 +1356,6 @@ static int mdc_ioc_hsm_state_get(struct obd_export *exp,
                                 struct md_op_data *op_data)
 {
        struct hsm_user_state   *hus = op_data->op_data;
-       struct obd_import       *imp = class_exp2cliimp(exp);
        struct hsm_user_state   *req_hus;
        struct ptlrpc_request   *req;
        int                      rc;
@@ -1346,13 +1379,9 @@ static int mdc_ioc_hsm_state_get(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
-       if (rc) {
-               /* check connection error first */
-               if (imp->imp_connect_error)
-                       rc = imp->imp_connect_error;
+       rc = mdc_queue_wait(req);
+       if (rc)
                GOTO(out, rc);
-       }
 
        req_hus = req_capsule_server_get(&req->rq_pill, &RMF_HSM_USER_STATE);
        if (req_hus == NULL)
@@ -1370,7 +1399,6 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp,
                                 struct md_op_data *op_data)
 {
        struct hsm_state_set    *hss = op_data->op_data;
-       struct obd_import       *imp = class_exp2cliimp(exp);
        struct hsm_state_set    *req_hss;
        struct ptlrpc_request   *req;
        int                      rc;
@@ -1394,18 +1422,14 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp,
 
        /* Copy states */
        req_hss = req_capsule_client_get(&req->rq_pill, &RMF_HSM_STATE_SET);
-       LASSERT(req_hss);
+       if (req_hss == NULL)
+               GOTO(out, rc = -EPROTO);
        *req_hss = *hss;
 
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
-       if (rc) {
-               /* check connection error first */
-               if (imp->imp_connect_error)
-                       rc = imp->imp_connect_error;
-               GOTO(out, rc);
-       }
+       rc = mdc_queue_wait(req);
+       GOTO(out, rc);
 
        EXIT;
 out:
@@ -1413,6 +1437,63 @@ out:
        return rc;
 }
 
+static int mdc_ioc_hsm_request(struct obd_export *exp,
+                              struct hsm_user_request *hur)
+{
+       struct obd_import       *imp = class_exp2cliimp(exp);
+       struct ptlrpc_request   *req;
+       struct hsm_request      *req_hr;
+       struct hsm_user_item    *req_hui;
+       char                    *req_opaque;
+       int                      rc;
+       ENTRY;
+
+       req = ptlrpc_request_alloc(imp, &RQF_MDS_HSM_REQUEST);
+       if (req == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       req_capsule_set_size(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM, RCL_CLIENT,
+                            hur->hur_request.hr_itemcount
+                            * sizeof(struct hsm_user_item));
+       req_capsule_set_size(&req->rq_pill, &RMF_GENERIC_DATA, RCL_CLIENT,
+                            hur->hur_request.hr_data_len);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_REQUEST);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
+
+       /* Copy hsm_request struct */
+       req_hr = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_REQUEST);
+       if (req_hr == NULL)
+               GOTO(out, rc = -EPROTO);
+       *req_hr = hur->hur_request;
+
+       /* Copy hsm_user_item structs */
+       req_hui = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM);
+       if (req_hui == NULL)
+               GOTO(out, rc = -EPROTO);
+       memcpy(req_hui, hur->hur_user_item,
+              hur->hur_request.hr_itemcount * sizeof(struct hsm_user_item));
+
+       /* Copy opaque field */
+       req_opaque = req_capsule_client_get(&req->rq_pill, &RMF_GENERIC_DATA);
+       if (req_opaque == NULL)
+               GOTO(out, rc = -EPROTO);
+       memcpy(req_opaque, hur_data(hur), hur->hur_request.hr_data_len);
+
+       ptlrpc_request_set_replen(req);
+
+       rc = mdc_queue_wait(req);
+       GOTO(out, rc);
+
+out:
+       ptlrpc_req_finished(req);
+       return rc;
+}
 
 static struct kuc_hdr *changelog_kuc_hdr(char *buf, int len, int flags)
 {
@@ -1431,35 +1512,36 @@ static struct kuc_hdr *changelog_kuc_hdr(char *buf, int len, int flags)
 #define D_CHANGELOG 0
 
 struct changelog_show {
-        __u64       cs_startrec;
-        __u32       cs_flags;
-        cfs_file_t *cs_fp;
-        char       *cs_buf;
-        struct obd_device *cs_obd;
+       __u64           cs_startrec;
+       __u32           cs_flags;
+       struct file     *cs_fp;
+       char            *cs_buf;
+       struct obd_device *cs_obd;
 };
 
-static int changelog_show_cb(const struct lu_env *env, struct llog_handle *llh,
+static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh,
                             struct llog_rec_hdr *hdr, void *data)
 {
-        struct changelog_show *cs = data;
-        struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
-        struct kuc_hdr *lh;
-        int len, rc;
-        ENTRY;
+       struct changelog_show *cs = data;
+       struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
+       struct kuc_hdr *lh;
+       int len, rc;
+       ENTRY;
 
-        if ((rec->cr_hdr.lrh_type != CHANGELOG_REC) ||
-            (rec->cr.cr_type >= CL_LAST)) {
-                CERROR("Not a changelog rec %d/%d\n", rec->cr_hdr.lrh_type,
-                       rec->cr.cr_type);
-                RETURN(-EINVAL);
-        }
+       if (rec->cr_hdr.lrh_type != CHANGELOG_REC) {
+               rc = -EINVAL;
+               CERROR("%s: not a changelog rec %x/%d: rc = %d\n",
+                      cs->cs_obd->obd_name, rec->cr_hdr.lrh_type,
+                      rec->cr.cr_type, rc);
+               RETURN(rc);
+       }
 
-        if (rec->cr.cr_index < cs->cs_startrec) {
-                /* Skip entries earlier than what we are interested in */
-                CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64"\n",
-                       rec->cr.cr_index, cs->cs_startrec);
-                RETURN(0);
-        }
+       if (rec->cr.cr_index < cs->cs_startrec) {
+               /* Skip entries earlier than what we are interested in */
+               CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64"\n",
+                      rec->cr.cr_index, cs->cs_startrec);
+               RETURN(0);
+       }
 
        CDEBUG(D_CHANGELOG, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID
                " %.*s\n", rec->cr.cr_index, rec->cr.cr_type,
@@ -1491,13 +1573,6 @@ static int mdc_changelog_send_thread(void *csdata)
         CDEBUG(D_CHANGELOG, "changelog to fp=%p start "LPU64"\n",
                cs->cs_fp, cs->cs_startrec);
 
-        /*
-         * It's important to daemonize here to close unused FDs.
-         * The write fd from pipe is already opened by the caller,
-         * so it's fine to clear all files here
-         */
-        cfs_daemonize("mdc_clg_send_thread");
-
         OBD_ALLOC(cs->cs_buf, CR_MAXSIZE);
         if (cs->cs_buf == NULL)
                 GOTO(out, rc = -ENOMEM);
@@ -1519,7 +1594,7 @@ static int mdc_changelog_send_thread(void *csdata)
                GOTO(out, rc);
        }
 
-       rc = llog_cat_process(NULL, llh, changelog_show_cb, cs, 0, 0);
+       rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0);
 
         /* Send EOF no matter what our result */
         if ((kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch),
@@ -1529,17 +1604,15 @@ static int mdc_changelog_send_thread(void *csdata)
         }
 
 out:
-        cfs_put_file(cs->cs_fp);
-        if (llh)
+       fput(cs->cs_fp);
+       if (llh)
                llog_cat_close(NULL, llh);
         if (ctxt)
                 llog_ctxt_put(ctxt);
-        if (cs->cs_buf)
-                OBD_FREE(cs->cs_buf, CR_MAXSIZE);
-        OBD_FREE_PTR(cs);
-        /* detach from parent process so we get cleaned up */
-        cfs_daemonize("cl_send");
-        return rc;
+       if (cs->cs_buf)
+               OBD_FREE(cs->cs_buf, CR_MAXSIZE);
+       OBD_FREE_PTR(cs);
+       return rc;
 }
 
 static int mdc_ioc_changelog_send(struct obd_device *obd,
@@ -1553,19 +1626,22 @@ static int mdc_ioc_changelog_send(struct obd_device *obd,
         if (!cs)
                 return -ENOMEM;
 
-        cs->cs_obd = obd;
-        cs->cs_startrec = icc->icc_recno;
-        /* matching cfs_put_file in mdc_changelog_send_thread */
-        cs->cs_fp = cfs_get_fd(icc->icc_id);
-        cs->cs_flags = icc->icc_flags;
-
-        /* New thread because we should return to user app before
-           writing into our pipe */
-        rc = cfs_create_thread(mdc_changelog_send_thread, cs, CFS_DAEMON_FLAGS);
-        if (rc >= 0) {
-                CDEBUG(D_CHANGELOG, "start changelog thread: %d\n", rc);
-                return 0;
-        }
+       cs->cs_obd = obd;
+       cs->cs_startrec = icc->icc_recno;
+       /* matching fput in mdc_changelog_send_thread */
+       cs->cs_fp = fget(icc->icc_id);
+       cs->cs_flags = icc->icc_flags;
+
+       /*
+        * New thread because we should return to user app before
+        * writing into our pipe
+        */
+       rc = PTR_ERR(kthread_run(mdc_changelog_send_thread, cs,
+                                "mdc_clg_send_thread"));
+       if (!IS_ERR_VALUE(rc)) {
+               CDEBUG(D_CHANGELOG, "start changelog thread\n");
+               return 0;
+       }
 
         CERROR("Failed to start changelog thread: %d\n", rc);
         OBD_FREE_PTR(cs);
@@ -1659,6 +1735,63 @@ static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp,
         RETURN(rc);
 }
 
+static int mdc_ioc_swap_layouts(struct obd_export *exp,
+                               struct md_op_data *op_data)
+{
+       CFS_LIST_HEAD(cancels);
+       struct ptlrpc_request   *req;
+       int                      rc, count;
+       struct mdc_swap_layouts *msl, *payload;
+       ENTRY;
+
+       msl = op_data->op_data;
+
+       /* When the MDT will get the MDS_SWAP_LAYOUTS RPC the
+        * first thing it will do is to cancel the 2 layout
+        * locks hold by this client.
+        * So the client must cancel its layout locks on the 2 fids
+        * with the request RPC to avoid extra RPC round trips
+        */
+       count = mdc_resource_get_unused(exp, &op_data->op_fid1, &cancels,
+                                       LCK_CR, MDS_INODELOCK_LAYOUT);
+       count += mdc_resource_get_unused(exp, &op_data->op_fid2, &cancels,
+                                        LCK_CR, MDS_INODELOCK_LAYOUT);
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                  &RQF_MDS_SWAP_LAYOUTS);
+       if (req == NULL) {
+               ldlm_lock_list_put(&cancels, l_bl_ast, count);
+               RETURN(-ENOMEM);
+       }
+
+       mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+       mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
+
+       rc = mdc_prep_elc_req(exp, req, MDS_SWAP_LAYOUTS, &cancels, count);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       mdc_swap_layouts_pack(req, op_data);
+
+       payload = req_capsule_client_get(&req->rq_pill, &RMF_SWAP_LAYOUTS);
+       LASSERT(payload);
+
+       *payload = *msl;
+
+       ptlrpc_request_set_replen(req);
+
+       rc = ptlrpc_queue_wait(req);
+       if (rc)
+               GOTO(out, rc);
+       EXIT;
+
+out:
+       ptlrpc_req_finished(req);
+       return rc;
+}
+
 static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                          void *karg, void *uarg)
 {
@@ -1691,6 +1824,9 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                GOTO(out, rc);
        case LL_IOC_HSM_CT_START:
                rc = mdc_ioc_hsm_ct_start(exp, karg);
+               /* ignore if it was already registered on this MDS. */
+               if (rc == -EEXIST)
+                       rc = 0;
                GOTO(out, rc);
        case LL_IOC_HSM_PROGRESS:
                rc = mdc_ioc_hsm_progress(exp, karg);
@@ -1703,6 +1839,9 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
        case LL_IOC_HSM_ACTION:
                rc = mdc_ioc_hsm_current_action(exp, karg);
                GOTO(out, rc);
+       case LL_IOC_HSM_REQUEST:
+               rc = mdc_ioc_hsm_request(exp, karg);
+               GOTO(out, rc);
         case OBD_IOC_CLIENT_RECOVER:
                 rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0);
                 if (rc < 0)
@@ -1747,18 +1886,18 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         GOTO(out, rc = -ENODEV);
 
                 /* copy UUID */
-                if (cfs_copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd),
-                                     min((int) data->ioc_plen2,
-                                         (int) sizeof(struct obd_uuid))))
-                        GOTO(out, rc = -EFAULT);
-
-                rc = mdc_statfs(NULL, obd->obd_self_export, &stat_buf,
-                                cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
-                                0);
-                if (rc != 0)
-                        GOTO(out, rc);
-
-                if (cfs_copy_to_user(data->ioc_pbuf1, &stat_buf,
+               if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd),
+                                min((int)data->ioc_plen2,
+                                    (int)sizeof(struct obd_uuid))))
+                       GOTO(out, rc = -EFAULT);
+
+               rc = mdc_statfs(NULL, obd->obd_self_export, &stat_buf,
+                               cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
+                               0);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               if (copy_to_user(data->ioc_pbuf1, &stat_buf,
                                      min((int) data->ioc_plen1,
                                          (int) sizeof(stat_buf))))
                         GOTO(out, rc = -EFAULT);
@@ -1783,13 +1922,18 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 OBD_FREE_PTR(oqctl);
                 break;
         }
-        case LL_IOC_GET_CONNECT_FLAGS: {
-                if (cfs_copy_to_user(uarg, &exp->exp_connect_flags,
-                                     sizeof(__u64)))
-                        GOTO(out, rc = -EFAULT);
-                else
-                        GOTO(out, rc = 0);
-        }
+       case LL_IOC_GET_CONNECT_FLAGS: {
+               if (copy_to_user(uarg,
+                                    exp_connect_flags_ptr(exp),
+                                    sizeof(__u64)))
+                       GOTO(out, rc = -EFAULT);
+               else
+                       GOTO(out, rc = 0);
+       }
+       case LL_IOC_LOV_SWAP_LAYOUTS: {
+               rc = mdc_ioc_swap_layouts(exp, karg);
+               break;
+       }
         default:
                 CERROR("mdc_ioctl(): unrecognised ioctl %#x\n", cmd);
                 GOTO(out, rc = -ENOTTY);
@@ -1834,19 +1978,20 @@ int mdc_get_info_rpc(struct obd_export *exp,
                              RCL_SERVER, vallen);
         ptlrpc_request_set_replen(req);
 
-        rc = ptlrpc_queue_wait(req);
-        if (rc == 0) {
-                tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL);
-                memcpy(val, tmp, vallen);
-                if (ptlrpc_rep_need_swab(req)) {
-                        if (KEY_IS(KEY_FID2PATH)) {
-                                lustre_swab_fid2path(val);
-                        }
-                }
-        }
-        ptlrpc_req_finished(req);
+       rc = ptlrpc_queue_wait(req);
+       /* -EREMOTE means the get_info result is partial, and it needs to
+        * continue on another MDT, see fid2path part in lmv_iocontrol */
+       if (rc == 0 || rc == -EREMOTE) {
+               tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL);
+               memcpy(val, tmp, vallen);
+               if (ptlrpc_rep_need_swab(req)) {
+                       if (KEY_IS(KEY_FID2PATH))
+                               lustre_swab_fid2path(val);
+               }
+       }
+       ptlrpc_req_finished(req);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static void lustre_swab_hai(struct hsm_action_item *h)
@@ -1868,7 +2013,7 @@ static void lustre_swab_hal(struct hsm_action_list *h)
 
        __swab32s(&h->hal_version);
        __swab32s(&h->hal_count);
-       __swab32s(&h->hal_archive_num);
+       __swab32s(&h->hal_archive_id);
        __swab64s(&h->hal_flags);
        hai = hai_zero(h);
        for (i = 0; i < h->hal_count; i++) {
@@ -1901,18 +2046,10 @@ static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
               lk->lk_uid, lk->lk_group, lk->lk_flags);
 
        if (lk->lk_flags & LK_FLG_STOP) {
-               rc = libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group);
                /* Unregister with the coordinator */
-               if (rc == 0)
-                       rc = mdc_ioc_hsm_ct_unregister(imp);
+               rc = mdc_ioc_hsm_ct_unregister(imp);
        } else {
-               cfs_file_t *fp = cfs_get_fd(lk->lk_wfd);
-               rc = libcfs_kkuc_group_add(fp, lk->lk_uid, lk->lk_group,
-                                          lk->lk_data);
-               if (rc && fp)
-                       cfs_put_file(fp);
-               if (rc == 0)
-                       rc = mdc_ioc_hsm_ct_register(imp, archive);
+               rc = mdc_ioc_hsm_ct_register(imp, archive);
        }
 
        return rc;
@@ -2237,7 +2374,7 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
         }
        case IMP_EVENT_ACTIVE:
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
-               /* restore re-establish kuc registration after reconnecting */
+               /* redo the kuc registration after reconnecting */
                if (rc == 0)
                        rc = mdc_kuc_reregister(imp);
                break;
@@ -2287,6 +2424,18 @@ static int mdc_cancel_for_recovery(struct ldlm_lock *lock)
         RETURN(1);
 }
 
+static int mdc_resource_inode_free(struct ldlm_resource *res)
+{
+       if (res->lr_lvb_inode)
+               res->lr_lvb_inode = NULL;
+
+       return 0;
+}
+
+struct ldlm_valblock_ops inode_lvbo = {
+       lvbo_free: mdc_resource_inode_free
+};
+
 static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
 {
         struct client_obd *cli = &obd->u.cli;
@@ -2316,6 +2465,8 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
 
         ns_register_cancel(obd->obd_namespace, mdc_cancel_for_recovery);
 
+       obd->obd_namespace->ns_lvbo = &inode_lvbo;
+
         rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
         if (rc) {
                 mdc_cleanup(obd);
@@ -2591,7 +2742,7 @@ struct obd_ops mdc_obd_ops = {
 
 struct md_ops mdc_md_ops = {
         .m_getstatus        = mdc_getstatus,
-        .m_change_cbdata    = mdc_change_cbdata,
+        .m_null_inode      = mdc_null_inode,
         .m_find_cbdata      = mdc_find_cbdata,
         .m_close            = mdc_close,
         .m_create           = mdc_create,