Whamcloud - gitweb
ORNL-28 recovery: rework extend_recovery_timer()
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 0a86137..e94ade4 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -341,6 +344,7 @@ static int mdt_getstatus(struct mdt_thread_info *info)
 
 static int mdt_statfs(struct mdt_thread_info *info)
 {
+        struct ptlrpc_request *req = mdt_info_req(info);
         struct md_device      *next  = info->mti_mdt->mdt_child;
         struct ptlrpc_service *svc;
         struct obd_statfs     *osfs;
@@ -367,6 +371,10 @@ static int mdt_statfs(struct mdt_thread_info *info)
                                               &info->mti_u.ksfs);
                 statfs_pack(osfs, &info->mti_u.ksfs);
         }
+
+        if (rc == 0)
+                mdt_counter_incr(req->rq_export, LPROC_MDT_STATFS);
+
         RETURN(rc);
 }
 
@@ -667,6 +675,7 @@ static int mdt_renew_capa(struct mdt_thread_info *info)
 
 static int mdt_getattr(struct mdt_thread_info *info)
 {
+        struct ptlrpc_request   *req = mdt_info_req(info);
         struct mdt_object       *obj = info->mti_object;
         struct req_capsule      *pill = info->mti_pill;
         struct mdt_body         *reqbody;
@@ -728,6 +737,9 @@ static int mdt_getattr(struct mdt_thread_info *info)
                 mdt_exit_ucred(info);
         EXIT;
 out_shrink:
+        if (rc == 0)
+                mdt_counter_incr(req->rq_export, LPROC_MDT_GETATTR);
+
         mdt_shrink_reply(info);
         return rc;
 }
@@ -818,7 +830,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         struct lu_name         *lname     = NULL;
         const char             *name      = NULL;
         int                     namelen   = 0;
-        struct mdt_lock_handle *lhp;
+        struct mdt_lock_handle *lhp       = NULL;
         struct ldlm_lock       *lock;
         struct ldlm_res_id     *res_id;
         int                     is_resent;
@@ -848,9 +860,13 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 if (namelen == 0) {
                         reqbody = req_capsule_client_get(info->mti_pill,
                                                          &RMF_MDT_BODY);
-                        LASSERT(fid_is_sane(&reqbody->fid2));
-                        name = NULL;
+                        if (unlikely(reqbody == NULL))
+                                RETURN(err_serious(-EFAULT));
 
+                        if (unlikely(!fid_is_sane(&reqbody->fid2)))
+                                RETURN(err_serious(-EINVAL));
+
+                        name = NULL;
                         CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
                                "ldlm_rep = %p\n",
                                PFID(mdt_object_fid(parent)), PFID(&reqbody->fid2),
@@ -924,16 +940,15 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 RETURN(rc);
         }
 
-        /* step 1: lock parent */
-        lhp = &info->mti_lh[MDT_LH_PARENT];
-        mdt_lock_pdo_init(lhp, LCK_PR, name, namelen);
-        rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
-                             MDT_LOCAL_LOCK);
-
-        if (unlikely(rc != 0))
-                RETURN(rc);
-
         if (lname) {
+                /* step 1: lock parent */
+                lhp = &info->mti_lh[MDT_LH_PARENT];
+                mdt_lock_pdo_init(lhp, LCK_PR, name, namelen);
+                rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
+                                     MDT_LOCAL_LOCK);
+                if (unlikely(rc != 0))
+                        RETURN(rc);
+
                 /* step 2: lookup child's fid by name */
                 rc = mdo_lookup(info->mti_env, next, lname, child_fid,
                                 &info->mti_spec);
@@ -993,7 +1008,7 @@ relock:
                         LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
                                         &child->mot_obj.mo_lu,
                                         "Object doesn't exist!\n");
-                        GOTO(out_child, rc = -ESTALE);
+                        GOTO(out_child, rc = -ENOENT);
                 }
 
                 if (!(child_bits & MDS_INODELOCK_UPDATE)) {
@@ -1057,7 +1072,8 @@ relock:
 out_child:
         mdt_object_put(info->mti_env, child);
 out_parent:
-        mdt_object_unlock(info, parent, lhp, 1);
+        if (lhp)
+                mdt_object_unlock(info, parent, lhp, 1);
         return rc;
 }
 
@@ -1203,7 +1219,7 @@ static int mdt_disconnect(struct mdt_thread_info *info)
 }
 
 static int mdt_sendpage(struct mdt_thread_info *info,
-                        struct lu_rdpg *rdpg)
+                        struct lu_rdpg *rdpg, int nob)
 {
         struct ptlrpc_request   *req = mdt_info_req(info);
         struct obd_export       *exp = req->rq_export;
@@ -1211,7 +1227,6 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         struct l_wait_info      *lwi = &info->mti_u.rdpg.mti_wait_info;
         int                      tmpcount;
         int                      tmpsize;
-        int                      timeout;
         int                      i;
         int                      rc;
         ENTRY;
@@ -1221,63 +1236,16 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         if (desc == NULL)
                 RETURN(-ENOMEM);
 
-        for (i = 0, tmpcount = rdpg->rp_count;
-                i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
+        for (i = 0, tmpcount = nob;
+                i < rdpg->rp_npages && tmpcount > 0; i++, tmpcount -= tmpsize) {
                 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
                 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
         }
 
-        LASSERT(desc->bd_nob == rdpg->rp_count);
-        rc = sptlrpc_svc_wrap_bulk(req, desc);
-        if (rc)
-                GOTO(free_desc, rc);
-
-        rc = ptlrpc_start_bulk_transfer(desc);
-        if (rc)
-                GOTO(free_desc, rc);
-
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
-                GOTO(abort_bulk, rc = 0);
-
-        do {
-                timeout = (int) req->rq_deadline - cfs_time_current_sec();
-                if (timeout < 0)
-                        CERROR("Req deadline already passed %lu (now: %lu)\n",
-                               req->rq_deadline, cfs_time_current_sec());
-                *lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(max(timeout, 1)),
-                                            cfs_time_seconds(1), NULL, NULL);
-                rc = l_wait_event(desc->bd_waitq,
-                                  !ptlrpc_server_bulk_active(desc) ||
-                                  exp->exp_failed ||
-                                  exp->exp_abort_active_req, lwi);
-                LASSERT (rc == 0 || rc == -ETIMEDOUT);
-        } while ((rc == -ETIMEDOUT) &&
-                 (req->rq_deadline > cfs_time_current_sec()));
-
-        if (rc == 0) {
-                if (desc->bd_success &&
-                    desc->bd_nob_transferred == rdpg->rp_count)
-                        GOTO(free_desc, rc);
-
-                rc = -ETIMEDOUT;
-                if (exp->exp_abort_active_req || exp->exp_failed)
-                        GOTO(abort_bulk, rc);
-        }
-
-        DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s",
-                  (rc == -ETIMEDOUT) ? "timeout" : "network error",
-                  desc->bd_nob_transferred, rdpg->rp_count,
-                  exp->exp_client_uuid.uuid,
-                  exp->exp_connection->c_remote_uuid.uuid);
-
-        class_fail_export(exp);
-
-        EXIT;
-abort_bulk:
-        ptlrpc_abort_bulk(desc);
-free_desc:
+        LASSERT(desc->bd_nob == nob);
+        rc = target_bulk_io(exp, desc, lwi);
         ptlrpc_free_bulk(desc);
-        return rc;
+        RETURN(rc);
 }
 
 #ifdef HAVE_SPLIT_SUPPORT
@@ -1485,8 +1453,12 @@ static int mdt_readpage(struct mdt_thread_info *info)
         }
 
         rdpg->rp_attrs = reqbody->mode;
-        rdpg->rp_count  = reqbody->nlink;
-        rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
+        if (info->mti_exp->exp_connect_flags & OBD_CONNECT_64BITHASH)
+                rdpg->rp_attrs |= LUDA_64BITHASH;
+        rdpg->rp_count  = min_t(unsigned int, reqbody->nlink,
+                                PTLRPC_MAX_BRW_SIZE);
+        rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1) >>
+                          CFS_PAGE_SHIFT;
         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
         if (rdpg->rp_pages == NULL)
                 RETURN(-ENOMEM);
@@ -1499,11 +1471,11 @@ static int mdt_readpage(struct mdt_thread_info *info)
 
         /* call lower layers to fill allocated pages with directory data */
         rc = mo_readpage(info->mti_env, mdt_object_child(object), rdpg);
-        if (rc)
+        if (rc < 0)
                 GOTO(free_rdpg, rc);
 
         /* send pages to client */
-        rc = mdt_sendpage(info, rdpg);
+        rc = mdt_sendpage(info, rdpg, rc);
 
         EXIT;
 free_rdpg:
@@ -1678,6 +1650,7 @@ static int mdt_object_sync(struct mdt_thread_info *info)
 
 static int mdt_sync(struct mdt_thread_info *info)
 {
+        struct ptlrpc_request *req = mdt_info_req(info);
         struct req_capsule *pill = info->mti_pill;
         struct mdt_body *body;
         int rc;
@@ -1725,6 +1698,9 @@ static int mdt_sync(struct mdt_thread_info *info)
                 } else
                         rc = err_serious(rc);
         }
+        if (rc == 0)
+                mdt_counter_incr(req->rq_export, LPROC_MDT_SYNC);
+
         RETURN(rc);
 }
 
@@ -2048,7 +2024,7 @@ static int mdt_sec_ctx_handle(struct mdt_thread_info *info)
                         sptlrpc_svc_ctx_invalidate(req);
         }
 
-        OBD_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, obd_fail_val);
+        CFS_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, cfs_fail_val);
 
         return rc;
 }
@@ -3404,6 +3380,10 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
         }
         rep->lock_policy_res2 = clear_serious(rc);
 
+        if (rep->lock_policy_res2 == -ENOENT &&
+            mdt_get_disposition(rep, DISP_LOOKUP_NEG))
+                rep->lock_policy_res2 = 0;
+
         if (rc == -ENOTCONN || rc == -ENODEV ||
             rc == -EOVERFLOW) { /**< if VBR failure then return error */
                 /*
@@ -4524,14 +4504,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                         CERROR("CMD Operation not allowed in IOP mode\n");
                         GOTO(err_lmi, rc = -EINVAL);
                 }
-                /* Read recovery timeouts */
-                if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_soft)
-                        obd->obd_recovery_timeout =
-                                lsi->lsi_lmd->lmd_recovery_time_soft;
 
-                if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_hard)
-                        obd->obd_recovery_time_hard =
-                                lsi->lsi_lmd->lmd_recovery_time_hard;
+                obd->u.obt.obt_magic = OBT_MAGIC;
         }
 
         cfs_rwlock_init(&m->mdt_sptlrpc_lock);
@@ -4748,15 +4722,21 @@ static int mdt_process_config(const struct lu_env *env,
 
                 /*
                  * For interoperability between 1.8 and 2.0,
-                 * skip old "mdt.group_upcall" param.
                  */
                 {
+                        /* Skip old "mdt.group_upcall" param. */
                         char *param = lustre_cfg_string(cfg, 1);
                         if (param && !strncmp("mdt.group_upcall", param, 16)) {
                                 CWARN("For 1.8 interoperability, skip this"
                                        " mdt.group_upcall. It is obsolete\n");
                                 break;
                         }
+                        /* Rename old "mdt.quota_type" to "mdd.quota_type. */
+                        if (param && !strncmp("mdt.quota_type", param, 14)) {
+                                CWARN("Found old param mdt.quota_type, changed"
+                                      " it to mdd.quota_type.\n");
+                                param[2] = 'd';
+                        }
                 }
 
                 lprocfs_mdt_init_vars(&lvars);
@@ -4806,6 +4786,7 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                 lu_object_add_top(h, o);
                 o->lo_ops = &mdt_obj_ops;
                 cfs_sema_init(&mo->mot_ioepoch_sem, 1);
+                cfs_sema_init(&mo->mot_lov_sem, 1);
                 RETURN(o);
         } else
                 RETURN(NULL);
@@ -4914,6 +4895,24 @@ static int mdt_connect_internal(struct obd_export *exp,
                 if (!mdt->mdt_som_conf)
                         data->ocd_connect_flags &= ~OBD_CONNECT_SOM;
 
+                if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
+                        data->ocd_brw_size = min(data->ocd_brw_size,
+                               (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT));
+                        if (data->ocd_brw_size == 0) {
+                                CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64
+                                       " ocd_version: %x ocd_grant: %d "
+                                       "ocd_index: %u ocd_brw_size is "
+                                       "unexpectedly zero, network data "
+                                       "corruption? Refusing connection of this"
+                                       " client\n",
+                                       exp->exp_obd->obd_name,
+                                       exp->exp_client_uuid.uuid,
+                                       exp, data->ocd_connect_flags, data->ocd_version,
+                                       data->ocd_grant, data->ocd_index);
+                                return -EPROTO;
+                        }
+                }
+
                 cfs_spin_lock(&exp->exp_lock);
                 exp->exp_connect_flags = data->ocd_connect_flags;
                 cfs_spin_unlock(&exp->exp_lock);
@@ -5024,6 +5023,9 @@ static int mdt_obd_connect(const struct lu_env *env,
         if (rc)
                 GOTO(out, rc);
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_TGT_RCVG_FLAG))
+                lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
+
         rc = mdt_connect_internal(lexp, mdt, data);
         if (rc == 0) {
                 struct mdt_thread_info *mti;
@@ -5139,8 +5141,11 @@ static int mdt_export_cleanup(struct obd_export *exp)
                         ma->ma_need = 0;
                         /* It is not for setattr, just tell MDD to send
                          * DESTROY RPC to OSS if needed */
-                        ma->ma_attr_flags = MDS_CLOSE_CLEANUP;
                         ma->ma_valid = MA_FLAGS;
+                        ma->ma_attr_flags = MDS_CLOSE_CLEANUP;
+                        /* Don't unlink orphan on failover umount, LU-184 */
+                        if (exp->exp_flags & OBD_OPT_FAILOVER)
+                                ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
                         mdt_mfd_close(info, mfd);
                 }
                 OBD_FREE_LARGE(ma->ma_cookie, cookie_size);
@@ -5153,7 +5158,7 @@ out_lmm:
         info->mti_mdt = NULL;
         /* cleanup client slot early */
         /* Do not erase record for recoverable client. */
-        if (!obd->obd_fail || exp->exp_failed)
+        if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed)
                 mdt_client_del(&env, mdt);
         lu_env_fini(&env);
 
@@ -5289,11 +5294,11 @@ static int mdt_upcall(const struct lu_env *env, struct md_device *md,
         RETURN(rc);
 }
 
-static int mdt_obd_notify(struct obd_device *host,
+static int mdt_obd_notify(struct obd_device *obd,
                           struct obd_device *watched,
                           enum obd_notify_event ev, void *data)
 {
-        struct mdt_device *mdt = mdt_dev(host->obd_lu_dev);
+        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
 #ifdef HAVE_QUOTA_SUPPORT
         struct md_device *next = mdt->mdt_child;
 #endif
@@ -5451,6 +5456,7 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
         struct mdt_lock_handle  *lh;
         int rc;
         ENTRY;
+
         CDEBUG(D_IOCTL, "getting version for "DFID"\n", PFID(fid));
         if (!fid_is_sane(fid))
                 RETURN(-EINVAL);
@@ -5470,6 +5476,9 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
                  * fid, this is error to find remote object here
                  */
                 CERROR("nonlocal object "DFID"\n", PFID(fid));
+        } else if (rc == 0) {
+                 *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
+                rc = -ENOENT;
         } else {
                 version = mo_version_get(mti->mti_env, mdt_object_child(obj));
                *(__u64 *)data->ioc_inlbuf2 = version;