Whamcloud - gitweb
LU-5939 hsm: make HSM modification requests replayable
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
index 7fb41ec..56fcc0d 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 #include <linux/miscdevice.h>
 #include <linux/init.h>
 #include <linux/utsname.h>
+#include <linux/kthread.h>
+#include <linux/user_namespace.h>
+#ifdef HAVE_UIDGID_HEADER
+# include <linux/uidgid.h>
+#endif
 
 #include <lustre_acl.h>
 #include <lustre_ioctl.h>
@@ -50,6 +55,7 @@
 #include <lprocfs_status.h>
 #include <lustre_param.h>
 #include <lustre_log.h>
+#include <lustre_kernelcomm.h>
 #include <cl_object.h>
 
 #include "mdc_internal.h"
@@ -308,11 +314,11 @@ static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
 }
 
 static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
-                            const struct lu_fid *fid,
-                            struct obd_capa *oc, int opcode, obd_valid valid,
-                            const char *xattr_name, const char *input,
-                            int input_size, int output_size, int flags,
-                            __u32 suppgid, struct ptlrpc_request **request)
+                           const struct lu_fid *fid,
+                           struct obd_capa *oc, int opcode, u64 valid,
+                           const char *xattr_name, const char *input,
+                           int input_size, int output_size, int flags,
+                           __u32 suppgid, struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
         int   xattr_namelen = 0;
@@ -419,7 +425,7 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
 }
 
 static int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
-                       struct obd_capa *oc, obd_valid valid,
+                       struct obd_capa *oc, u64 valid,
                        const char *xattr_name,
                        const char *input, int input_size, int output_size,
                        int flags, __u32 suppgid,
@@ -432,7 +438,7 @@ static int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid,
 }
 
 static int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid,
-                       struct obd_capa *oc, obd_valid valid,
+                       struct obd_capa *oc, u64 valid,
                        const char *xattr_name,
                        const char *input, int input_size, int output_size,
                        int flags, struct ptlrpc_request **request)
@@ -634,10 +640,6 @@ int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
         RETURN(0);
 }
 
-/**
- * Handles both OPEN and SETATTR RPCs for OPEN-CLOSE and SETATTR-DONE_WRITING
- * RPC chains.
- */
 void mdc_replay_open(struct ptlrpc_request *req)
 {
         struct md_open_data *mod = req->rq_cb_data;
@@ -674,17 +676,18 @@ void mdc_replay_open(struct ptlrpc_request *req)
                 __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg);
                 struct mdt_ioepoch *epoch;
 
-                LASSERT(opc == MDS_CLOSE || opc == MDS_DONE_WRITING);
-                epoch = req_capsule_client_get(&close_req->rq_pill,
-                                               &RMF_MDT_EPOCH);
-                LASSERT(epoch);
+               LASSERT(opc == MDS_CLOSE);
+               epoch = req_capsule_client_get(&close_req->rq_pill,
+                                              &RMF_MDT_EPOCH);
+               LASSERT(epoch);
 
-                if (och != NULL)
-                        LASSERT(!memcmp(&old, &epoch->handle, sizeof(old)));
-                DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
-               epoch->handle = body->mbo_handle;
-        }
-        EXIT;
+               if (och != NULL)
+                       LASSERT(!memcmp(&old, &epoch->mio_handle, sizeof(old)));
+
+               DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
+               epoch->mio_handle = body->mbo_handle;
+       }
+       EXIT;
 }
 
 void mdc_commit_open(struct ptlrpc_request *req)
@@ -820,22 +823,6 @@ int mdc_clear_open_replay_data(struct obd_export *exp,
         RETURN(0);
 }
 
-/* Prepares the request for the replay by the given reply */
-static void mdc_close_handle_reply(struct ptlrpc_request *req,
-                                   struct md_op_data *op_data, int rc) {
-        struct mdt_body  *repbody;
-        struct mdt_ioepoch *epoch;
-
-        if (req && rc == -EAGAIN) {
-                repbody = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
-                epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
-
-                epoch->flags |= MF_SOM_AU;
-               if (repbody->mbo_valid & OBD_MD_FLGETATTRLOCK)
-                        op_data->op_flags |= MF_GETATTR_LOCK;
-        }
-}
-
 static int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
                     struct md_open_data *mod, struct ptlrpc_request **request)
 {
@@ -950,125 +937,9 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
                 obd_mod_put(mod);
         }
         *request = req;
-        mdc_close_handle_reply(req, op_data, rc);
-        RETURN(rc < 0 ? rc : saved_rc);
-}
 
-static int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data,
-                           struct md_open_data *mod)
-{
-        struct obd_device     *obd = class_exp2obd(exp);
-        struct ptlrpc_request *req;
-        int                    rc;
-        ENTRY;
-
-        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
-                                   &RQF_MDS_DONE_WRITING);
-        if (req == NULL)
-                RETURN(-ENOMEM);
-
-        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
-        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_DONE_WRITING);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
-
-        if (mod != NULL) {
-                LASSERTF(mod->mod_open_req != NULL &&
-                         mod->mod_open_req->rq_type != LI_POISON,
-                         "POISONED setattr %p!\n", mod->mod_open_req);
-
-                mod->mod_close_req = req;
-                DEBUG_REQ(D_HA, mod->mod_open_req, "matched setattr");
-                /* We no longer want to preserve this setattr for replay even
-                 * though the open was committed. b=3632, b=3633 */
-               spin_lock(&mod->mod_open_req->rq_lock);
-               mod->mod_open_req->rq_replay = 0;
-               spin_unlock(&mod->mod_open_req->rq_lock);
-        }
-
-        mdc_close_pack(req, op_data);
-        ptlrpc_request_set_replen(req);
-
-        mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL);
-        rc = ptlrpc_queue_wait(req);
-        mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL);
-
-        if (rc == -ESTALE) {
-                /**
-                 * it can be allowed error after 3633 if open or setattr were
-                 * committed and server failed before close was sent.
-                 * Let's check if mod exists and return no error in that case
-                 */
-                if (mod) {
-                        LASSERT(mod->mod_open_req != NULL);
-                        if (mod->mod_open_req->rq_committed)
-                                rc = 0;
-                }
-        }
-
-        if (mod) {
-                if (rc != 0)
-                        mod->mod_close_req = NULL;
-               LASSERT(mod->mod_open_req != NULL);
-               mdc_free_open(mod);
-
-                /* Since now, mod is accessed through setattr req only,
-                 * thus DW req does not keep a reference on mod anymore. */
-                obd_mod_put(mod);
-        }
-
-        mdc_close_handle_reply(req, op_data, rc);
-        ptlrpc_req_finished(req);
-        RETURN(rc);
-}
-
-#ifdef HAVE_SPLIT_SUPPORT
-int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
-                 const struct page *page, int offset)
-{
-        struct ptlrpc_request   *req;
-        struct ptlrpc_bulk_desc *desc;
-        int                      rc;
-        ENTRY;
-
-        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_WRITEPAGE);
-        if (req == NULL)
-                RETURN(-ENOMEM);
-
-        /* FIXME: capa doesn't support split yet */
-        mdc_set_capa_size(req, &RMF_CAPA1, NULL);
-
-        rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_WRITEPAGE);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
-
-        req->rq_request_portal = MDS_READPAGE_PORTAL;
-        ptlrpc_at_set_req_timeout(req);
-
-       desc = ptlrpc_prep_bulk_imp(req, 1, 1,BULK_GET_SOURCE, MDS_BULK_PORTAL);
-       if (desc == NULL)
-               GOTO(out, rc = -ENOMEM);
-
-        /* NB req now owns desc and will free it when it gets freed. */
-        ptlrpc_prep_bulk_page(desc, (struct page *)page, 0, offset);
-        mdc_readdir_pack(req, 0, offset, fid, NULL);
-
-        ptlrpc_request_set_replen(req);
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                GOTO(out, rc);
-
-        rc = sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk);
-out:
-        ptlrpc_req_finished(req);
-        return rc;
+       RETURN(rc < 0 ? rc : saved_rc);
 }
-EXPORT_SYMBOL(mdc_sendpage);
-#endif
 
 static int mdc_getpage(struct obd_export *exp, const struct lu_fid *fid,
                       __u64 offset, struct obd_capa *oc,
@@ -1253,7 +1124,7 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash,
  *  |        |
  * .|--------v-------   -----.
  * |s|e|f|p|ent|ent| ... |ent|
- * '--|--------------   -----'   Each CFS_PAGE contains a single
+ * '--|--------------   -----'   Each PAGE contains a single
  *    '------.                   lu_dirpage.
  * .---------v-------   -----.
  * |s|e|f|p|ent| 0 | ... | 0 |
@@ -1263,26 +1134,26 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash,
  * larger than LU_PAGE_SIZE, a single host page may contain multiple
  * lu_dirpages. After reading the lu_dirpages from the MDS, the
  * ldp_hash_end of the first lu_dirpage refers to the one immediately
- * after it in the same CFS_PAGE (arrows simplified for brevity, but
+ * after it in the same PAGE (arrows simplified for brevity, but
  * in general e0==s1, e1==s2, etc.):
  *
  * .--------------------   -----.
  * |s0|e0|f0|p|ent|ent| ... |ent|
  * |---v----------------   -----|
  * |s1|e1|f1|p|ent|ent| ... |ent|
- * |---v----------------   -----|  Here, each CFS_PAGE contains
+ * |---v----------------   -----|  Here, each PAGE contains
  *             ...                 multiple lu_dirpages.
  * |---v----------------   -----|
  * |s'|e'|f'|p|ent|ent| ... |ent|
  * '---|----------------   -----'
  *     v
  * .----------------------------.
- * |        next CFS_PAGE       |
+ * |        next PAGE           |
  *
  * This structure is transformed into a single logical lu_dirpage as follows:
  *
  * - Replace e0 with e' so the request for the next lu_dirpage gets the page
- *   labeled 'next CFS_PAGE'.
+ *   labeled 'next PAGE'.
  *
  * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
  *   a hash collision with the next page exists.
@@ -1311,8 +1182,8 @@ static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs)
                        /* Advance dp to next lu_dirpage. */
                        dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
 
-                       /* Check if we've reached the end of the CFS_PAGE. */
-                       if (!((unsigned long)dp & ~CFS_PAGE_MASK))
+                       /* Check if we've reached the end of the PAGE. */
+                       if (!((unsigned long)dp & ~PAGE_MASK))
                                break;
 
                        /* Save the hash and flags of this lu_dirpage. */
@@ -1352,6 +1223,14 @@ struct readpage_param {
        struct md_callback      *rp_cb;
 };
 
+#ifndef HAVE_DELETE_FROM_PAGE_CACHE
+static inline void delete_from_page_cache(struct page *page)
+{
+       remove_from_page_cache(page);
+       page_cache_release(page);
+}
+#endif
+
 /**
  * Read pages from server.
  *
@@ -1359,7 +1238,7 @@ struct readpage_param {
  * a header lu_dirpage which describes the start/end hash, and whether this
  * page is empty (contains no dir entry) or hash collide with next page.
  * After client receives reply, several pages will be integrated into dir page
- * in CFS_PAGE_SIZE (if CFS_PAGE_SIZE greater than LU_PAGE_SIZE), and the
+ * in PAGE_SIZE (if PAGE_SIZE greater than LU_PAGE_SIZE), and the
  * lu_dirpage for this integrated page will be adjusted.
  **/
 static int mdc_read_page_remote(void *data, struct page *page0)
@@ -1401,7 +1280,10 @@ static int mdc_read_page_remote(void *data, struct page *page0)
 
        rc = mdc_getpage(rp->rp_exp, fid, rp->rp_off, op_data->op_capa1,
                         page_pool, npages, &req);
-       if (rc == 0) {
+       if (rc < 0) {
+               /* page0 is special, which was added into page cache early */
+               delete_from_page_cache(page0);
+       } else {
                int lu_pgs;
 
                rd_pgs = (req->rq_bulk->bd_nob_transferred +
@@ -1416,8 +1298,8 @@ static int mdc_read_page_remote(void *data, struct page *page0)
 
                SetPageUptodate(page0);
        }
-
        unlock_page(page0);
+
        ptlrpc_req_finished(req);
        CDEBUG(D_CACHE, "read %d/%d pages\n", rd_pgs, npages);
        for (i = 1; i < npages; i++) {
@@ -1691,7 +1573,7 @@ static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf)
         /* Val is struct getinfo_fid2path result plus path */
         vallen = sizeof(*gf) + gf->gf_pathlen;
 
-       rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
+       rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf);
        if (rc != 0 && rc != -EREMOTE)
                GOTO(out, rc);
 
@@ -1734,7 +1616,10 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       rc = mdc_queue_wait(req);
+       mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+       rc = ptlrpc_queue_wait(req);
+       mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+
        GOTO(out, rc);
 out:
        ptlrpc_req_finished(req);
@@ -1916,10 +1801,11 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       rc = mdc_queue_wait(req);
-       GOTO(out, rc);
+       mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+       rc = ptlrpc_queue_wait(req);
+       mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
 
-       EXIT;
+       GOTO(out, rc);
 out:
        ptlrpc_req_finished(req);
        return rc;
@@ -1975,7 +1861,10 @@ static int mdc_ioc_hsm_request(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       rc = mdc_queue_wait(req);
+       mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+       rc = ptlrpc_queue_wait(req);
+       mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
+
        GOTO(out, rc);
 
 out:
@@ -2070,10 +1959,10 @@ static int mdc_changelog_send_thread(void *csdata)
        if (cs->cs_buf == NULL)
                GOTO(out, rc = -ENOMEM);
 
-        /* Set up the remote catalog handle */
-        ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT);
-        if (ctxt == NULL)
-                GOTO(out, rc = -ENOENT);
+       /* Set up the remote catalog handle */
+       ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT);
+       if (ctxt == NULL)
+               GOTO(out, rc = -ENOENT);
        rc = llog_open(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG,
                       LLOG_OPEN_EXISTS);
        if (rc) {
@@ -2093,19 +1982,17 @@ static int mdc_changelog_send_thread(void *csdata)
 
        rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0);
 
-        /* Send EOF no matter what our result */
-        if ((kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch),
-                                      cs->cs_flags))) {
-                kuch->kuc_msgtype = CL_EOF;
-                libcfs_kkuc_msg_put(cs->cs_fp, kuch);
-        }
+       /* Send EOF no matter what our result */
+       kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch), cs->cs_flags);
+       kuch->kuc_msgtype = CL_EOF;
+       libcfs_kkuc_msg_put(cs->cs_fp, kuch);
 
 out:
        fput(cs->cs_fp);
        if (llh)
                llog_cat_close(NULL, llh);
-        if (ctxt)
-                llog_ctxt_put(ctxt);
+       if (ctxt)
+               llog_ctxt_put(ctxt);
        if (cs->cs_buf)
                OBD_FREE(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE);
        OBD_FREE_PTR(cs);
@@ -2432,8 +2319,8 @@ out:
 }
 
 static int mdc_get_info_rpc(struct obd_export *exp,
-                           obd_count keylen, void *key,
-                           int vallen, void *val)
+                           u32 keylen, void *key,
+                           u32 vallen, void *val)
 {
         struct obd_import      *imp = class_exp2cliimp(exp);
         struct ptlrpc_request  *req;
@@ -2448,7 +2335,7 @@ static int mdc_get_info_rpc(struct obd_export *exp,
         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_KEY,
                              RCL_CLIENT, keylen);
         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VALLEN,
-                             RCL_CLIENT, sizeof(__u32));
+                            RCL_CLIENT, sizeof(vallen));
 
         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GET_INFO);
         if (rc) {
@@ -2459,7 +2346,7 @@ static int mdc_get_info_rpc(struct obd_export *exp,
         tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_KEY);
         memcpy(tmp, key, keylen);
         tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_VALLEN);
-        memcpy(tmp, &vallen, sizeof(__u32));
+       memcpy(tmp, &vallen, sizeof(vallen));
 
         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VAL,
                              RCL_SERVER, vallen);
@@ -2615,8 +2502,8 @@ static int mdc_kuc_reregister(struct obd_import *imp)
 
 static int mdc_set_info_async(const struct lu_env *env,
                              struct obd_export *exp,
-                             obd_count keylen, void *key,
-                             obd_count vallen, void *val,
+                             u32 keylen, void *key,
+                             u32 vallen, void *val,
                              struct ptlrpc_request_set *set)
 {
        struct obd_import       *imp = class_exp2cliimp(exp);
@@ -2661,13 +2548,19 @@ static int mdc_set_info_async(const struct lu_env *env,
                 RETURN(rc);
         }
 
+       if (KEY_IS(KEY_DEFAULT_EASIZE)) {
+               __u32 *default_easize = val;
+
+               exp->exp_obd->u.cli.cl_default_mds_easize = *default_easize;
+               RETURN(0);
+       }
+
        CERROR("Unknown key %s\n", (char *)key);
        RETURN(-EINVAL);
 }
 
 static int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
-                       __u32 keylen, void *key, __u32 *vallen, void *val,
-                       struct lov_stripe_md *lsm)
+                       __u32 keylen, void *key, __u32 *vallen, void *val)
 {
        int rc = -EINVAL;
 
@@ -2691,13 +2584,10 @@ static int mdc_get_info(const struct lu_env *env, struct obd_export *exp,
                *default_easize = exp->exp_obd->u.cli.cl_default_mds_easize;
                RETURN(0);
        } else if (KEY_IS(KEY_MAX_COOKIESIZE)) {
-               __u32 mdsize, *max_cookiesize;
+               __u32 *max_cookiesize;
 
                if (*vallen != sizeof(int))
                        RETURN(-EINVAL);
-               mdsize = *(int *)val;
-               if (mdsize > exp->exp_obd->u.cli.cl_max_mds_cookiesize)
-                       exp->exp_obd->u.cli.cl_max_mds_cookiesize = mdsize;
                max_cookiesize = val;
                *max_cookiesize = exp->exp_obd->u.cli.cl_max_mds_cookiesize;
                RETURN(0);
@@ -2916,7 +2806,7 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
         rc = client_obd_setup(obd, cfg);
         if (rc)
                 GOTO(err_close_lock, rc);
-#ifdef LPROCFS
+#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_mdc_obd_vars;
        lprocfs_obd_setup(obd);
        lprocfs_alloc_md_stats(obd, 0);
@@ -3011,7 +2901,7 @@ static int mdc_cleanup(struct obd_device *obd)
         return client_obd_cleanup(obd);
 }
 
-static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf)
+static int mdc_process_config(struct obd_device *obd, size_t len, void *buf)
 {
         struct lustre_cfg *lcfg = buf;
        int rc = class_process_proc_param(PARAM_MDC, obd->obd_vars, lcfg, obd);
@@ -3142,7 +3032,6 @@ static struct md_ops mdc_md_ops = {
         .m_find_cbdata      = mdc_find_cbdata,
         .m_close            = mdc_close,
         .m_create           = mdc_create,
-        .m_done_writing     = mdc_done_writing,
         .m_enqueue          = mdc_enqueue,
         .m_getattr          = mdc_getattr,
         .m_getattr_name     = mdc_getattr_name,