Whamcloud - gitweb
LU-9679 modules: Use LIST_HEAD for declaring list_heads
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
index 855ce59..93b2e63 100644 (file)
@@ -38,6 +38,7 @@
 #include <linux/pagemap.h>
 #include <linux/user_namespace.h>
 #include <linux/utsname.h>
+#include <linux/delay.h>
 #ifdef HAVE_UIDGID_HEADER
 # include <linux/uidgid.h>
 #endif
@@ -348,7 +349,7 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
        /* Flush local XATTR locks to get rid of a possible cancel RPC */
        if (opcode == MDS_REINT && fid_is_sane(fid) &&
            exp->exp_connect_data.ocd_ibits_known & MDS_INODELOCK_XATTR) {
-               struct list_head cancels = LIST_HEAD_INIT(cancels);
+               LIST_HEAD(cancels);
                int count;
 
                /* Without that packing would fail */
@@ -451,6 +452,7 @@ static int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid,
        LASSERT(obd_md_valid == OBD_MD_FLXATTR ||
                obd_md_valid == OBD_MD_FLXATTRLS);
 
+       /* The below message is checked in sanity-selinux.sh test_20d */
        CDEBUG(D_INFO, "%s: get xattr '%s' for "DFID"\n",
               exp->exp_obd->obd_name, name, PFID(fid));
        rc = mdc_xattr_common(exp, &RQF_MDS_GETXATTR, fid, MDS_GETXATTR,
@@ -500,7 +502,7 @@ out:
        return rc;
 }
 
-#ifdef CONFIG_FS_POSIX_ACL
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
 static int mdc_unpack_acl(struct ptlrpc_request *req, struct lustre_md *md)
 {
         struct req_capsule     *pill = &req->rq_pill;
@@ -649,7 +651,7 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
                         rc = mdc_unpack_acl(req, md);
                         if (rc)
                                 GOTO(out, rc);
-#ifdef CONFIG_FS_POSIX_ACL
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
                 } else {
                         md->posix_acl = NULL;
 #endif
@@ -659,7 +661,7 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
         EXIT;
 out:
         if (rc) {
-#ifdef CONFIG_FS_POSIX_ACL
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
                 posix_acl_release(md->posix_acl);
 #endif
         }
@@ -683,7 +685,7 @@ void mdc_replay_open(struct ptlrpc_request *req)
 
        if (mod == NULL) {
                DEBUG_REQ(D_ERROR, req,
-                         "Can't properly replay without open data.");
+                         "cannot properly replay without open data");
                EXIT;
                return;
        }
@@ -780,14 +782,14 @@ int mdc_set_open_replay_data(struct obd_export *exp,
         /* Outgoing messages always in my byte order. */
         LASSERT(body != NULL);
 
-        /* Only if the import is replayable, we set replay_open data */
-        if (och && imp->imp_replayable) {
-                mod = obd_mod_alloc();
-                if (mod == NULL) {
-                        DEBUG_REQ(D_ERROR, open_req,
-                                  "Can't allocate md_open_data");
-                        RETURN(0);
-                }
+       /* Only if the import is replayable, we set replay_open data */
+       if (och && imp->imp_replayable) {
+               mod = obd_mod_alloc();
+               if (mod == NULL) {
+                       DEBUG_REQ(D_ERROR, open_req,
+                                 "cannot allocate md_open_data");
+                       RETURN(0);
+               }
 
                 /**
                  * Take a reference on \var mod, to be freed on mdc_close().
@@ -839,8 +841,9 @@ static void mdc_free_open(struct md_open_data *mod)
         * The worst thing is eviction if the client gets open lock
         **/
 
-       DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, "free open request rq_replay"
-                 "= %d\n", mod->mod_open_req->rq_replay);
+       DEBUG_REQ(D_RPCTRACE, mod->mod_open_req,
+                 "free open request, rq_replay=%d",
+                 mod->mod_open_req->rq_replay);
 
        ptlrpc_request_committed(mod->mod_open_req, committed);
        if (mod->mod_close_req)
@@ -930,7 +933,7 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
 
                mod->mod_close_req = req;
 
-               DEBUG_REQ(D_HA, mod->mod_open_req, "matched open");
+               DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, "matched open");
                /* We no longer want to preserve this open for replay even
                 * though the open was committed. b=3632, b=3633 */
                spin_lock(&mod->mod_open_req->rq_lock);
@@ -981,37 +984,37 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
        rc = ptlrpc_queue_wait(req);
        mdc_put_mod_rpc_slot(req, NULL);
 
-        if (req->rq_repmsg == NULL) {
-                CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req,
-                       req->rq_status);
-                if (rc == 0)
-                        rc = req->rq_status ?: -EIO;
-        } else if (rc == 0 || rc == -EAGAIN) {
-                struct mdt_body *body;
-
-                rc = lustre_msg_get_status(req->rq_repmsg);
-                if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
-                        DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err "
-                                  "= %d", rc);
-                        if (rc > 0)
-                                rc = -rc;
-                }
-                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
-                if (body == NULL)
-                        rc = -EPROTO;
-        } else if (rc == -ESTALE) {
-                /**
-                 * it can be allowed error after 3633 if open was committed and
-                 * server failed before close was sent. Let's check if mod
-                 * exists and return no error in that case
-                 */
-                if (mod) {
-                        DEBUG_REQ(D_HA, req, "Reset ESTALE = %d", rc);
-                        LASSERT(mod->mod_open_req != NULL);
-                        if (mod->mod_open_req->rq_committed)
-                                rc = 0;
-                }
-        }
+       if (req->rq_repmsg == NULL) {
+               CDEBUG(D_RPCTRACE, "request %p failed to send: rc = %d\n", req,
+                      req->rq_status);
+               if (rc == 0)
+                       rc = req->rq_status ?: -EIO;
+       } else if (rc == 0 || rc == -EAGAIN) {
+               struct mdt_body *body;
+
+               rc = lustre_msg_get_status(req->rq_repmsg);
+               if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
+                       DEBUG_REQ(D_ERROR, req,
+                                 "type = PTL_RPC_MSG_ERR: rc = %d", rc);
+                       if (rc > 0)
+                               rc = -rc;
+               }
+               body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+               if (body == NULL)
+                       rc = -EPROTO;
+       } else if (rc == -ESTALE) {
+               /**
+                * it can be allowed error after 3633 if open was committed and
+                * server failed before close was sent. Let's check if mod
+                * exists and return no error in that case
+                */
+               if (mod) {
+                       DEBUG_REQ(D_HA, req, "Reset ESTALE = %d", rc);
+                       LASSERT(mod->mod_open_req != NULL);
+                       if (mod->mod_open_req->rq_committed)
+                               rc = 0;
+               }
+       }
 
 out:
         if (mod) {
@@ -1033,14 +1036,11 @@ static int mdc_getpage(struct obd_export *exp, const struct lu_fid *fid,
        struct ptlrpc_request   *req;
        struct ptlrpc_bulk_desc *desc;
        int                      i;
-       wait_queue_head_t        waitq;
        int                      resends = 0;
-       struct l_wait_info       lwi;
        int                      rc;
        ENTRY;
 
        *request = NULL;
-       init_waitqueue_head(&waitq);
 
 restart_bulk:
        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE);
@@ -1085,9 +1085,7 @@ restart_bulk:
                               exp->exp_obd->obd_name, -EIO);
                        RETURN(-EIO);
                }
-               lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
-                                      NULL);
-               l_wait_event(waitq, 0, &lwi);
+               ssleep(resends);
 
                goto restart_bulk;
        }
@@ -1116,7 +1114,7 @@ static void mdc_release_page(struct page *page, int remove)
        if (remove) {
                lock_page(page);
                if (likely(page->mapping != NULL))
-                       truncate_complete_page(page->mapping, page);
+                       delete_from_page_cache(page);
                unlock_page(page);
        }
        put_page(page);
@@ -1132,16 +1130,17 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash,
         */
        unsigned long offset = hash_x_index(*hash, hash64);
        struct page *page;
+       unsigned long flags;
        int found;
 
-       xa_lock_irq(&mapping->i_pages);
+       xa_lock_irqsave(&mapping->i_pages, flags);
        found = radix_tree_gang_lookup(&mapping->page_tree,
                                       (void **)&page, offset, 1);
        if (found > 0 && !radix_tree_exceptional_entry(page)) {
                struct lu_dirpage *dp;
 
                get_page(page);
-               xa_unlock_irq(&mapping->i_pages);
+               xa_unlock_irqrestore(&mapping->i_pages, flags);
                /*
                 * In contrast to find_lock_page() we are sure that directory
                 * page cannot be truncated (while DLM lock is held) and,
@@ -1190,7 +1189,7 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash,
                        page = ERR_PTR(-EIO);
                }
        } else {
-               xa_unlock_irq(&mapping->i_pages);
+               xa_unlock_irqrestore(&mapping->i_pages, flags);
                page = NULL;
        }
        return page;
@@ -1252,12 +1251,12 @@ static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs)
        int i;
 
        for (i = 0; i < cfs_pgs; i++) {
-               struct lu_dirpage       *dp = kmap(pages[i]);
-               struct lu_dirpage       *first = dp;
-               struct lu_dirent        *end_dirent = NULL;
-               struct lu_dirent        *ent;
-               __u64           hash_end = le64_to_cpu(dp->ldp_hash_end);
-               __u32           flags = le32_to_cpu(dp->ldp_flags);
+               struct lu_dirpage *dp = kmap(pages[i]);
+               struct lu_dirpage *first = dp;
+               struct lu_dirent *end_dirent = NULL;
+               struct lu_dirent *ent;
+               __u64 hash_end = dp->ldp_hash_end;
+               __u32 flags = dp->ldp_flags;
 
                while (--lu_pgs > 0) {
                        ent = lu_dirent_start(dp);
@@ -1272,8 +1271,8 @@ static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs)
                                break;
 
                        /* Save the hash and flags of this lu_dirpage. */
-                       hash_end = le64_to_cpu(dp->ldp_hash_end);
-                       flags = le32_to_cpu(dp->ldp_flags);
+                       hash_end = dp->ldp_hash_end;
+                       flags = dp->ldp_flags;
 
                        /* Check if lu_dirpage contains no entries. */
                        if (end_dirent == NULL)
@@ -1308,14 +1307,6 @@ struct readpage_param {
        struct md_callback      *rp_cb;
 };
 
-#ifndef HAVE_DELETE_FROM_PAGE_CACHE
-static inline void delete_from_page_cache(struct page *page)
-{
-       remove_from_page_cache(page);
-       put_page(page);
-}
-#endif
-
 /**
  * Read pages from server.
  *
@@ -1610,8 +1601,7 @@ static int mdc_statfs_async(struct obd_export *exp,
        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = mdc_statfs_interpret;
 
-       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-       aa = ptlrpc_req_async_args(req);
+       aa = ptlrpc_req_async_args(aa, req);
        *aa = *oinfo;
 
        ptlrpcd_add_req(req);
@@ -2083,7 +2073,7 @@ static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp,
 static int mdc_ioc_swap_layouts(struct obd_export *exp,
                                struct md_op_data *op_data)
 {
-       struct list_head cancels = LIST_HEAD_INIT(cancels);
+       LIST_HEAD(cancels);
        struct ptlrpc_request   *req;
        int                      rc, count;
        struct mdc_swap_layouts *msl, *payload;
@@ -2574,6 +2564,81 @@ static int mdc_fsync(struct obd_export *exp, const struct lu_fid *fid,
         RETURN(rc);
 }
 
+struct mdc_rmfid_args {
+       int *mra_rcs;
+       int mra_nr;
+};
+
+int mdc_rmfid_interpret(const struct lu_env *env, struct ptlrpc_request *req,
+                         void *args, int rc)
+{
+       struct mdc_rmfid_args *aa;
+       int *rcs, size;
+       ENTRY;
+
+       if (!rc) {
+               aa = ptlrpc_req_async_args(aa, req);
+
+               size = req_capsule_get_size(&req->rq_pill, &RMF_RCS,
+                                           RCL_SERVER);
+               LASSERT(size == sizeof(int) * aa->mra_nr);
+               rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
+               LASSERT(rcs);
+               LASSERT(aa->mra_rcs);
+               LASSERT(aa->mra_nr);
+               memcpy(aa->mra_rcs, rcs, size);
+       }
+
+       RETURN(rc);
+}
+
+static int mdc_rmfid(struct obd_export *exp, struct fid_array *fa,
+                    int *rcs, struct ptlrpc_request_set *set)
+{
+       struct ptlrpc_request *req;
+       struct mdc_rmfid_args *aa;
+       struct mdt_body *b;
+       struct lu_fid *tmp;
+       int rc, flen;
+       ENTRY;
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_RMFID);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       flen = fa->fa_nr * sizeof(struct lu_fid);
+       req_capsule_set_size(&req->rq_pill, &RMF_FID_ARRAY,
+                            RCL_CLIENT, flen);
+       req_capsule_set_size(&req->rq_pill, &RMF_FID_ARRAY,
+                            RCL_SERVER, flen);
+       req_capsule_set_size(&req->rq_pill, &RMF_RCS,
+                            RCL_SERVER, fa->fa_nr * sizeof(__u32));
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_RMFID);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+       tmp = req_capsule_client_get(&req->rq_pill, &RMF_FID_ARRAY);
+       memcpy(tmp, fa->fa_fids, flen);
+
+       mdc_pack_body(req, NULL, 0, 0, -1, 0);
+       b = req_capsule_client_get(&req->rq_pill, &RMF_MDT_BODY);
+       b->mbo_ctime = ktime_get_real_seconds();
+
+       ptlrpc_request_set_replen(req);
+
+       LASSERT(rcs);
+       aa = ptlrpc_req_async_args(aa, req);
+       aa->mra_rcs = rcs;
+       aa->mra_nr = fa->fa_nr;
+       req->rq_interpret_reply = mdc_rmfid_interpret;
+
+       ptlrpc_set_add_req(set, req);
+       ptlrpc_check_set(NULL, set);
+
+       RETURN(rc);
+}
+
 static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
                            enum obd_import_event event)
 {
@@ -2856,32 +2921,33 @@ static struct obd_ops mdc_obd_ops = {
 
 static struct md_ops mdc_md_ops = {
        .m_get_root         = mdc_get_root,
-        .m_null_inode      = mdc_null_inode,
-        .m_close            = mdc_close,
-        .m_create           = mdc_create,
-        .m_enqueue          = mdc_enqueue,
-        .m_getattr          = mdc_getattr,
-        .m_getattr_name     = mdc_getattr_name,
-        .m_intent_lock      = mdc_intent_lock,
-        .m_link             = mdc_link,
-        .m_rename           = mdc_rename,
-        .m_setattr          = mdc_setattr,
-        .m_setxattr         = mdc_setxattr,
-        .m_getxattr         = mdc_getxattr,
+       .m_null_inode       = mdc_null_inode,
+       .m_close            = mdc_close,
+       .m_create           = mdc_create,
+       .m_enqueue          = mdc_enqueue,
+       .m_getattr          = mdc_getattr,
+       .m_getattr_name     = mdc_getattr_name,
+       .m_intent_lock      = mdc_intent_lock,
+       .m_link             = mdc_link,
+       .m_rename           = mdc_rename,
+       .m_setattr          = mdc_setattr,
+       .m_setxattr         = mdc_setxattr,
+       .m_getxattr         = mdc_getxattr,
        .m_fsync                = mdc_fsync,
        .m_file_resync          = mdc_file_resync,
        .m_read_page            = mdc_read_page,
-        .m_unlink           = mdc_unlink,
-        .m_cancel_unused    = mdc_cancel_unused,
-        .m_init_ea_size     = mdc_init_ea_size,
-        .m_set_lock_data    = mdc_set_lock_data,
-        .m_lock_match       = mdc_lock_match,
-        .m_get_lustre_md    = mdc_get_lustre_md,
-        .m_free_lustre_md   = mdc_free_lustre_md,
-        .m_set_open_replay_data = mdc_set_open_replay_data,
-        .m_clear_open_replay_data = mdc_clear_open_replay_data,
-        .m_intent_getattr_async = mdc_intent_getattr_async,
-        .m_revalidate_lock      = mdc_revalidate_lock
+       .m_unlink           = mdc_unlink,
+       .m_cancel_unused    = mdc_cancel_unused,
+       .m_init_ea_size     = mdc_init_ea_size,
+       .m_set_lock_data    = mdc_set_lock_data,
+       .m_lock_match       = mdc_lock_match,
+       .m_get_lustre_md    = mdc_get_lustre_md,
+       .m_free_lustre_md   = mdc_free_lustre_md,
+       .m_set_open_replay_data = mdc_set_open_replay_data,
+       .m_clear_open_replay_data = mdc_clear_open_replay_data,
+       .m_intent_getattr_async = mdc_intent_getattr_async,
+       .m_revalidate_lock      = mdc_revalidate_lock,
+       .m_rmfid                = mdc_rmfid,
 };
 
 static int __init mdc_init(void)