Whamcloud - gitweb
LU-4629 mdc: fix issue found by Klocwork Insight tool
[fs/lustre-release.git] / lustre / mdc / mdc_reint.c
index ecab6c5..a233011 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_MDC
 
 #ifdef __KERNEL__
-#ifndef AUTOCONF_INCLUDED
-# include <linux/config.h>
-#endif
 # include <linux/module.h>
 # include <linux/kernel.h>
 #else
@@ -76,16 +70,26 @@ static int mdc_reint(struct ptlrpc_request *request,
 /* Find and cancel locally locks matched by inode @bits & @mode in the resource
  * found by @fid. Found locks are added into @cancel list. Returns the amount of
  * locks added to @cancels list. */
-int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid,
-                            struct list_head *cancels, ldlm_mode_t mode,
+int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
+                            cfs_list_t *cancels, ldlm_mode_t mode,
                             __u64 bits)
 {
+       struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
         ldlm_policy_data_t policy = {{0}};
         struct ldlm_res_id res_id;
         struct ldlm_resource *res;
         int count;
         ENTRY;
 
+       /* Return, i.e. cancel nothing, only if ELC is supported (flag in
+        * export) but disabled through procfs (flag in NS).
+        *
+        * This distinguishes from a case when ELC is not supported originally,
+        * when we still want to cancel locks in advance and just cancel them
+        * locally, without sending any RPC. */
+       if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
+               RETURN(0);
+
         fid_build_reg_res_name(fid, &res_id);
         res = ldlm_resource_get(exp->exp_obd->obd_namespace,
                                 NULL, &res_id, 0, 0);
@@ -101,19 +105,6 @@ int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid,
         RETURN(count);
 }
 
-static int mdc_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
-                            struct list_head *cancels, int count)
-{
-        return ldlm_prep_elc_req(exp, req, LUSTRE_MDS_VERSION, MDS_REINT,
-                                 0, cancels, count);
-}
-
-/* If mdc_setattr is called with an 'iattr', then it is a normal RPC that
- * should take the normal semaphore and go to the normal portal.
- *
- * If it is called with iattr->ia_valid & ATTR_FROM_OPEN, then it is a
- * magic open-path setattr that should take the setattr semaphore and
- * go to the setattr portal. */
 int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
                 void *ea, int ealen, void *ea2, int ea2len,
                 struct ptlrpc_request **request, struct md_open_data **mod)
@@ -131,10 +122,11 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
         bits = MDS_INODELOCK_UPDATE;
         if (op_data->op_attr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
                 bits |= MDS_INODELOCK_LOOKUP;
-        if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
-            (fid_is_sane(&op_data->op_fid1)))
-                count = mdc_resource_get_unused(exp, &op_data->op_fid1,
-                                                &cancels, LCK_EX, bits);
+       if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
+           (fid_is_sane(&op_data->op_fid1)) &&
+           !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
+               count = mdc_resource_get_unused(exp, &op_data->op_fid1,
+                                               &cancels, LCK_EX, bits);
         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                    &RQF_MDS_REINT_SETATTR);
         if (req == NULL) {
@@ -149,19 +141,13 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT,
                              ea2len);
 
-        rc = mdc_prep_elc_req(exp, req, &cancels, count);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
+       rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
 
-        if (op_data->op_attr.ia_valid & ATTR_FROM_OPEN) {
-                req->rq_request_portal = MDS_SETATTR_PORTAL;
-                ptlrpc_at_set_req_timeout(req);
-                rpc_lock = obd->u.cli.cl_setattr_lock;
-        } else {
-                rpc_lock = obd->u.cli.cl_rpc_lock;
-        }
+        rpc_lock = obd->u.cli.cl_rpc_lock;
 
         if (op_data->op_attr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
                 CDEBUG(D_INODE, "setting mtime "CFS_TIME_T
@@ -185,6 +171,7 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
                         req->rq_cb_data = *mod;
                         (*mod)->mod_open_req = req;
                         req->rq_commit_cb = mdc_commit_open;
+                       (*mod)->mod_is_create = true;
                         /**
                          * Take an extra reference on \var mod, it protects \var
                          * mod from being freed on eviction (commit callback is
@@ -199,7 +186,7 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
 
         /* Save the obtained info in the original RPC for the replay case. */
         if (rc == 0 && (op_data->op_flags & MF_EPOCH_OPEN)) {
-                struct mdt_epoch *epoch;
+                struct mdt_ioepoch *epoch;
                 struct mdt_body  *body;
 
                 epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
@@ -216,12 +203,13 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
                 rc = 0;
         }
         *request = req;
-        if (rc && req->rq_commit_cb) {
-                /* Put an extra reference on \var mod on error case. */
-                obd_mod_put(*mod);
-                req->rq_commit_cb(req);
-        }
-        RETURN(rc);
+       if (rc && req->rq_commit_cb) {
+               /* Put an extra reference on \var mod on error case. */
+               if (mod != NULL && *mod != NULL)
+                       obd_mod_put(*mod);
+               req->rq_commit_cb(req);
+       }
+       RETURN(rc);
 }
 
 int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
@@ -231,7 +219,9 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
 {
         struct ptlrpc_request *req;
         int level, rc;
-        int count = 0;
+        int count, resends = 0;
+        struct obd_import *import = exp->exp_obd->u.cli.cl_import;
+        int generation = import->imp_generation;
         CFS_LIST_HEAD(cancels);
         ENTRY;
 
@@ -248,6 +238,8 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
                 }
         }
 
+rebuild:
+        count = 0;
         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
             (fid_is_sane(&op_data->op_fid1)))
                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
@@ -266,11 +258,11 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
                              data && datalen ? datalen : 0);
 
-        rc = mdc_prep_elc_req(exp, req, &cancels, count);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
+       rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
 
         /*
          * mdc_create_pack() fills msg->bufs[1] with name and msg->bufs[2] with
@@ -281,6 +273,15 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
 
         ptlrpc_request_set_replen(req);
 
+       /* ask ptlrpc not to resend on EINPROGRESS since we have our own retry
+        * logic here */
+       req->rq_no_retry_einprogress = 1;
+
+        if (resends) {
+                req->rq_generation_set = 1;
+                req->rq_import_generation = generation;
+                req->rq_sent = cfs_time_current_sec() + resends;
+        }
         level = LUSTRE_IMP_FULL;
  resend:
         rc = mdc_reint(req, exp->exp_obd->u.cli.cl_rpc_lock, level);
@@ -289,6 +290,22 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
         if (rc == -ERESTARTSYS) {
                 level = LUSTRE_IMP_RECOVER;
                 goto resend;
+        } else if (rc == -EINPROGRESS) {
+                /* Retry create infinitely until succeed or get other
+                 * error code. */
+                ptlrpc_req_finished(req);
+                resends++;
+
+                CDEBUG(D_HA, "%s: resend:%d create on "DFID"/"DFID"\n",
+                       exp->exp_obd->obd_name, resends,
+                       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
+
+                if (generation == import->imp_generation) {
+                        goto rebuild;
+                } else {
+                        CDEBUG(D_HA, "resend cross eviction\n");
+                        RETURN(-EIO);
+                }
         } else if (rc == 0) {
                 struct mdt_body *body;
                 struct lustre_capa *capa;
@@ -318,16 +335,18 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
 
         LASSERT(req == NULL);
 
-        if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
-            (fid_is_sane(&op_data->op_fid1)))
-                count = mdc_resource_get_unused(exp, &op_data->op_fid1,
-                                                &cancels, LCK_EX,
-                                                MDS_INODELOCK_UPDATE);
-        if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
-            (fid_is_sane(&op_data->op_fid3)))
-                count += mdc_resource_get_unused(exp, &op_data->op_fid3,
-                                                 &cancels, LCK_EX,
-                                                 MDS_INODELOCK_FULL);
+       if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
+           (fid_is_sane(&op_data->op_fid1)) &&
+           !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
+               count = mdc_resource_get_unused(exp, &op_data->op_fid1,
+                                               &cancels, LCK_EX,
+                                               MDS_INODELOCK_UPDATE);
+       if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
+           (fid_is_sane(&op_data->op_fid3)) &&
+           !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
+               count += mdc_resource_get_unused(exp, &op_data->op_fid3,
+                                                &cancels, LCK_EX,
+                                                MDS_INODELOCK_FULL);
         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                    &RQF_MDS_REINT_UNLINK);
         if (req == NULL) {
@@ -338,19 +357,19 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                              op_data->op_namelen + 1);
 
-        rc = mdc_prep_elc_req(exp, req, &cancels, count);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
+       rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
 
-        mdc_unlink_pack(req, op_data);
+       mdc_unlink_pack(req, op_data);
 
-        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
-                             obd->u.cli.cl_max_mds_easize);
-        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
-                             obd->u.cli.cl_max_mds_cookiesize);
-        ptlrpc_request_set_replen(req);
+       req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+                            obd->u.cli.cl_default_mds_easize);
+       req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
+                            obd->u.cli.cl_default_mds_cookiesize);
+       ptlrpc_request_set_replen(req);
 
         *request = req;
 
@@ -390,11 +409,11 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                              op_data->op_namelen + 1);
 
-        rc = mdc_prep_elc_req(exp, req, &cancels, count);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
+       rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
 
         mdc_link_pack(req, op_data);
         ptlrpc_request_set_replen(req);
@@ -450,22 +469,22 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, oldlen + 1);
         req_capsule_set_size(&req->rq_pill, &RMF_SYMTGT, RCL_CLIENT, newlen+1);
 
-        rc = mdc_prep_elc_req(exp, req, &cancels, count);
-        if (rc) {
-                ptlrpc_request_free(req);
-                RETURN(rc);
-        }
+       rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
 
         if (exp_connect_cancelset(exp) && req)
                 ldlm_cli_cancel_list(&cancels, count, req, 0);
 
         mdc_rename_pack(req, op_data, old, oldlen, new, newlen);
 
-        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
-                             obd->u.cli.cl_max_mds_easize);
-        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
-                             obd->u.cli.cl_max_mds_cookiesize);
-        ptlrpc_request_set_replen(req);
+       req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+                            obd->u.cli.cl_default_mds_easize);
+       req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
+                            obd->u.cli.cl_default_mds_cookiesize);
+       ptlrpc_request_set_replen(req);
 
         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
         *request = req;