Whamcloud - gitweb
ORNL-28 recovery: rework extend_recovery_timer()
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index da0fc09..e94ade4 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  *
@@ -341,6 +344,7 @@ static int mdt_getstatus(struct mdt_thread_info *info)
 
 static int mdt_statfs(struct mdt_thread_info *info)
 {
+        struct ptlrpc_request *req = mdt_info_req(info);
         struct md_device      *next  = info->mti_mdt->mdt_child;
         struct ptlrpc_service *svc;
         struct obd_statfs     *osfs;
@@ -367,6 +371,10 @@ static int mdt_statfs(struct mdt_thread_info *info)
                                               &info->mti_u.ksfs);
                 statfs_pack(osfs, &info->mti_u.ksfs);
         }
+
+        if (rc == 0)
+                mdt_counter_incr(req->rq_export, LPROC_MDT_STATFS);
+
         RETURN(rc);
 }
 
@@ -667,6 +675,7 @@ static int mdt_renew_capa(struct mdt_thread_info *info)
 
 static int mdt_getattr(struct mdt_thread_info *info)
 {
+        struct ptlrpc_request   *req = mdt_info_req(info);
         struct mdt_object       *obj = info->mti_object;
         struct req_capsule      *pill = info->mti_pill;
         struct mdt_body         *reqbody;
@@ -728,6 +737,9 @@ static int mdt_getattr(struct mdt_thread_info *info)
                 mdt_exit_ucred(info);
         EXIT;
 out_shrink:
+        if (rc == 0)
+                mdt_counter_incr(req->rq_export, LPROC_MDT_GETATTR);
+
         mdt_shrink_reply(info);
         return rc;
 }
@@ -1207,7 +1219,7 @@ static int mdt_disconnect(struct mdt_thread_info *info)
 }
 
 static int mdt_sendpage(struct mdt_thread_info *info,
-                        struct lu_rdpg *rdpg)
+                        struct lu_rdpg *rdpg, int nob)
 {
         struct ptlrpc_request   *req = mdt_info_req(info);
         struct obd_export       *exp = req->rq_export;
@@ -1215,7 +1227,6 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         struct l_wait_info      *lwi = &info->mti_u.rdpg.mti_wait_info;
         int                      tmpcount;
         int                      tmpsize;
-        int                      timeout;
         int                      i;
         int                      rc;
         ENTRY;
@@ -1225,63 +1236,16 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         if (desc == NULL)
                 RETURN(-ENOMEM);
 
-        for (i = 0, tmpcount = rdpg->rp_count;
-                i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
+        for (i = 0, tmpcount = nob;
+                i < rdpg->rp_npages && tmpcount > 0; i++, tmpcount -= tmpsize) {
                 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
                 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
         }
 
-        LASSERT(desc->bd_nob == rdpg->rp_count);
-        rc = sptlrpc_svc_wrap_bulk(req, desc);
-        if (rc)
-                GOTO(free_desc, rc);
-
-        rc = ptlrpc_start_bulk_transfer(desc);
-        if (rc)
-                GOTO(free_desc, rc);
-
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
-                GOTO(abort_bulk, rc = 0);
-
-        do {
-                timeout = (int) req->rq_deadline - cfs_time_current_sec();
-                if (timeout < 0)
-                        CERROR("Req deadline already passed %lu (now: %lu)\n",
-                               req->rq_deadline, cfs_time_current_sec());
-                *lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(max(timeout, 1)),
-                                            cfs_time_seconds(1), NULL, NULL);
-                rc = l_wait_event(desc->bd_waitq,
-                                  !ptlrpc_server_bulk_active(desc) ||
-                                  exp->exp_failed ||
-                                  exp->exp_abort_active_req, lwi);
-                LASSERT (rc == 0 || rc == -ETIMEDOUT);
-        } while ((rc == -ETIMEDOUT) &&
-                 (req->rq_deadline > cfs_time_current_sec()));
-
-        if (rc == 0) {
-                if (desc->bd_success &&
-                    desc->bd_nob_transferred == rdpg->rp_count)
-                        GOTO(free_desc, rc);
-
-                rc = -ETIMEDOUT;
-                if (exp->exp_abort_active_req || exp->exp_failed)
-                        GOTO(abort_bulk, rc);
-        }
-
-        DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s",
-                  (rc == -ETIMEDOUT) ? "timeout" : "network error",
-                  desc->bd_nob_transferred, rdpg->rp_count,
-                  exp->exp_client_uuid.uuid,
-                  exp->exp_connection->c_remote_uuid.uuid);
-
-        class_fail_export(exp);
-
-        EXIT;
-abort_bulk:
-        ptlrpc_abort_bulk(desc);
-free_desc:
+        LASSERT(desc->bd_nob == nob);
+        rc = target_bulk_io(exp, desc, lwi);
         ptlrpc_free_bulk(desc);
-        return rc;
+        RETURN(rc);
 }
 
 #ifdef HAVE_SPLIT_SUPPORT
@@ -1489,8 +1453,12 @@ static int mdt_readpage(struct mdt_thread_info *info)
         }
 
         rdpg->rp_attrs = reqbody->mode;
-        rdpg->rp_count  = reqbody->nlink;
-        rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
+        if (info->mti_exp->exp_connect_flags & OBD_CONNECT_64BITHASH)
+                rdpg->rp_attrs |= LUDA_64BITHASH;
+        rdpg->rp_count  = min_t(unsigned int, reqbody->nlink,
+                                PTLRPC_MAX_BRW_SIZE);
+        rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1) >>
+                          CFS_PAGE_SHIFT;
         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
         if (rdpg->rp_pages == NULL)
                 RETURN(-ENOMEM);
@@ -1503,11 +1471,11 @@ static int mdt_readpage(struct mdt_thread_info *info)
 
         /* call lower layers to fill allocated pages with directory data */
         rc = mo_readpage(info->mti_env, mdt_object_child(object), rdpg);
-        if (rc)
+        if (rc < 0)
                 GOTO(free_rdpg, rc);
 
         /* send pages to client */
-        rc = mdt_sendpage(info, rdpg);
+        rc = mdt_sendpage(info, rdpg, rc);
 
         EXIT;
 free_rdpg:
@@ -1682,6 +1650,7 @@ static int mdt_object_sync(struct mdt_thread_info *info)
 
 static int mdt_sync(struct mdt_thread_info *info)
 {
+        struct ptlrpc_request *req = mdt_info_req(info);
         struct req_capsule *pill = info->mti_pill;
         struct mdt_body *body;
         int rc;
@@ -1729,6 +1698,9 @@ static int mdt_sync(struct mdt_thread_info *info)
                 } else
                         rc = err_serious(rc);
         }
+        if (rc == 0)
+                mdt_counter_incr(req->rq_export, LPROC_MDT_SYNC);
+
         RETURN(rc);
 }
 
@@ -2052,7 +2024,7 @@ static int mdt_sec_ctx_handle(struct mdt_thread_info *info)
                         sptlrpc_svc_ctx_invalidate(req);
         }
 
-        OBD_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, obd_fail_val);
+        CFS_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, cfs_fail_val);
 
         return rc;
 }
@@ -3408,6 +3380,10 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
         }
         rep->lock_policy_res2 = clear_serious(rc);
 
+        if (rep->lock_policy_res2 == -ENOENT &&
+            mdt_get_disposition(rep, DISP_LOOKUP_NEG))
+                rep->lock_policy_res2 = 0;
+
         if (rc == -ENOTCONN || rc == -ENODEV ||
             rc == -EOVERFLOW) { /**< if VBR failure then return error */
                 /*
@@ -4528,14 +4504,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                         CERROR("CMD Operation not allowed in IOP mode\n");
                         GOTO(err_lmi, rc = -EINVAL);
                 }
-                /* Read recovery timeouts */
-                if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_soft)
-                        obd->obd_recovery_timeout =
-                                lsi->lsi_lmd->lmd_recovery_time_soft;
 
-                if (lsi->lsi_lmd && lsi->lsi_lmd->lmd_recovery_time_hard)
-                        obd->obd_recovery_time_hard =
-                                lsi->lsi_lmd->lmd_recovery_time_hard;
+                obd->u.obt.obt_magic = OBT_MAGIC;
         }
 
         cfs_rwlock_init(&m->mdt_sptlrpc_lock);
@@ -4752,15 +4722,21 @@ static int mdt_process_config(const struct lu_env *env,
 
                 /*
                  * For interoperability between 1.8 and 2.0,
-                 * skip old "mdt.group_upcall" param.
                  */
                 {
+                        /* Skip old "mdt.group_upcall" param. */
                         char *param = lustre_cfg_string(cfg, 1);
                         if (param && !strncmp("mdt.group_upcall", param, 16)) {
                                 CWARN("For 1.8 interoperability, skip this"
                                        " mdt.group_upcall. It is obsolete\n");
                                 break;
                         }
+                        /* Rename old "mdt.quota_type" to "mdd.quota_type. */
+                        if (param && !strncmp("mdt.quota_type", param, 14)) {
+                                CWARN("Found old param mdt.quota_type, changed"
+                                      " it to mdd.quota_type.\n");
+                                param[2] = 'd';
+                        }
                 }
 
                 lprocfs_mdt_init_vars(&lvars);
@@ -4810,6 +4786,7 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                 lu_object_add_top(h, o);
                 o->lo_ops = &mdt_obj_ops;
                 cfs_sema_init(&mo->mot_ioepoch_sem, 1);
+                cfs_sema_init(&mo->mot_lov_sem, 1);
                 RETURN(o);
         } else
                 RETURN(NULL);
@@ -4918,6 +4895,24 @@ static int mdt_connect_internal(struct obd_export *exp,
                 if (!mdt->mdt_som_conf)
                         data->ocd_connect_flags &= ~OBD_CONNECT_SOM;
 
+                if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
+                        data->ocd_brw_size = min(data->ocd_brw_size,
+                               (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT));
+                        if (data->ocd_brw_size == 0) {
+                                CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64
+                                       " ocd_version: %x ocd_grant: %d "
+                                       "ocd_index: %u ocd_brw_size is "
+                                       "unexpectedly zero, network data "
+                                       "corruption? Refusing connection of this"
+                                       " client\n",
+                                       exp->exp_obd->obd_name,
+                                       exp->exp_client_uuid.uuid,
+                                       exp, data->ocd_connect_flags, data->ocd_version,
+                                       data->ocd_grant, data->ocd_index);
+                                return -EPROTO;
+                        }
+                }
+
                 cfs_spin_lock(&exp->exp_lock);
                 exp->exp_connect_flags = data->ocd_connect_flags;
                 cfs_spin_unlock(&exp->exp_lock);
@@ -5028,6 +5023,9 @@ static int mdt_obd_connect(const struct lu_env *env,
         if (rc)
                 GOTO(out, rc);
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_TGT_RCVG_FLAG))
+                lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
+
         rc = mdt_connect_internal(lexp, mdt, data);
         if (rc == 0) {
                 struct mdt_thread_info *mti;
@@ -5296,11 +5294,11 @@ static int mdt_upcall(const struct lu_env *env, struct md_device *md,
         RETURN(rc);
 }
 
-static int mdt_obd_notify(struct obd_device *host,
+static int mdt_obd_notify(struct obd_device *obd,
                           struct obd_device *watched,
                           enum obd_notify_event ev, void *data)
 {
-        struct mdt_device *mdt = mdt_dev(host->obd_lu_dev);
+        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
 #ifdef HAVE_QUOTA_SUPPORT
         struct md_device *next = mdt->mdt_child;
 #endif
@@ -5458,6 +5456,7 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
         struct mdt_lock_handle  *lh;
         int rc;
         ENTRY;
+
         CDEBUG(D_IOCTL, "getting version for "DFID"\n", PFID(fid));
         if (!fid_is_sane(fid))
                 RETURN(-EINVAL);
@@ -5477,6 +5476,9 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
                  * fid, this is error to find remote object here
                  */
                 CERROR("nonlocal object "DFID"\n", PFID(fid));
+        } else if (rc == 0) {
+                 *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
+                rc = -ENOENT;
         } else {
                 version = mo_version_get(mti->mti_env, mdt_object_child(obj));
                *(__u64 *)data->ioc_inlbuf2 = version;