Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 18bec0d..1446971 100644 (file)
@@ -40,7 +40,6 @@
 #include <linux/module.h>
 /*
  * struct OBD_{ALLOC,FREE}*()
- * MDT_FAIL_CHECK
  */
 #include <obd_support.h>
 /* struct ptlrpc_request */
@@ -286,7 +285,7 @@ static int mdt_getstatus(struct mdt_thread_info *info)
         if (rc)
                 RETURN(err_serious(rc));
 
-        if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
                 RETURN(err_serious(-ENOMEM));
 
         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
@@ -334,7 +333,7 @@ static int mdt_statfs(struct mdt_thread_info *info)
         if (rc)
                 RETURN(err_serious(rc));
 
-        if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
                 rc = err_serious(-ENOMEM);
         } else {
                 osfs = req_capsule_server_get(&info->mti_pill,&RMF_OBD_STATFS);
@@ -419,7 +418,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
         int                     rc;
         ENTRY;
 
-        if (unlikely(MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)))
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
                 RETURN(err_serious(-ENOMEM));
 
         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
@@ -1121,7 +1120,7 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         if (rc)
                 GOTO(free_desc, rc);
 
-        if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
                 GOTO(abort_bulk, rc);
 
         *lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
@@ -1263,7 +1262,7 @@ static int mdt_writepage(struct mdt_thread_info *info)
                 RETURN(err_serious(-ENOMEM));
 
         /* allocate the page for the desc */
-        page = alloc_pages(GFP_KERNEL, 0);
+        page = cfs_alloc_page(CFS_ALLOC_STD);
         if (page == NULL)
                 GOTO(desc_cleanup, rc = -ENOMEM);
 
@@ -1317,7 +1316,7 @@ static int mdt_writepage(struct mdt_thread_info *info)
 cleanup_lwi:
         OBD_FREE_PTR(lwi);
 cleanup_page:
-        __free_pages(page, 0);
+        __cfs_free_page(page);
 desc_cleanup:
         ptlrpc_free_bulk(desc);
         RETURN(rc);
@@ -1334,7 +1333,7 @@ static int mdt_readpage(struct mdt_thread_info *info)
         int                i;
         ENTRY;
 
-        if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
                 RETURN(err_serious(-ENOMEM));
 
         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
@@ -1364,7 +1363,7 @@ static int mdt_readpage(struct mdt_thread_info *info)
                 RETURN(-ENOMEM);
 
         for (i = 0; i < rdpg->rp_npages; ++i) {
-                rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
+                rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
                 if (rdpg->rp_pages[i] == NULL)
                         GOTO(free_rdpg, rc = -ENOMEM);
         }
@@ -1382,10 +1381,10 @@ free_rdpg:
 
         for (i = 0; i < rdpg->rp_npages; i++)
                 if (rdpg->rp_pages[i] != NULL)
-                        __free_pages(rdpg->rp_pages[i], 0);
+                        __cfs_free_page(rdpg->rp_pages[i]);
         OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
 
-        MDT_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
+        OBD_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
 
         return rc;
 }
@@ -1396,7 +1395,6 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
 {
         struct req_capsule      *pill = &info->mti_pill;
         struct mdt_device       *mdt = info->mti_mdt;
-        struct ptlrpc_request   *req = mdt_info_req(info);
         struct mdt_body         *repbody;
         int                      need_shrink = 0;
         int                      rc;
@@ -1426,7 +1424,7 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
                 repbody->aclsize = 0;
         }
 
-        if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK))
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK))
                 GOTO(out_shrink, rc = err_serious(-EFAULT));
 
         rc = mdt_reint_unpack(info, op);
@@ -1443,21 +1441,12 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
         if (rc != 0)
                 GOTO(out_ucred, rc = err_serious(rc));
 
-        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
-                struct mdt_client_data *mcd;
-
-                mcd = req->rq_export->exp_mdt_data.med_mcd;
-                if (req_xid_is_last(req)) {
-                        need_shrink = 0;
-                        mdt_reconstruct(info, lhc);
-                        rc = lustre_msg_get_status(req->rq_repmsg);
-                        GOTO(out_ucred, rc);
-                }
-                DEBUG_REQ(D_HA, req, "no reply for RESENT (xid "LPD64")",
-                          mcd->mcd_last_xid);
-        }
-
         need_shrink = 0;
+        if (mdt_check_resent(info, mdt_reconstruct, lhc)) {
+                rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
+                GOTO(out_ucred, rc);
+        }
+        
         rc = mdt_reint_rec(info, lhc);
         EXIT;
 out_ucred:
@@ -1548,7 +1537,7 @@ static int mdt_sync(struct mdt_thread_info *info)
         if (body == NULL)
                 RETURN(err_serious(-EINVAL));
 
-        if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
                 RETURN(err_serious(-ENOMEM));
 
         if (fid_seq(&body->fid1) == 0) {
@@ -1654,7 +1643,9 @@ static int mdt_enqueue(struct mdt_thread_info *info)
          * bits get corrupted somewhere in mdt_intent_policy().
          */
         req_bits = info->mti_dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
-        LASSERT(req_bits != 0);
+        /* This is disabled because we need to support liblustre flock.
+         * LASSERT(req_bits != 0);
+         */
 
         rc = ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
                                   req, info->mti_dlm_req, &cbs);
@@ -2034,7 +2025,7 @@ static int mdt_req_handle(struct mdt_thread_info *info,
         LASSERT(current->journal_info == NULL);
 
         /*
-         * Do not use *_FAIL_CHECK_ONCE() macros, because they will stop
+         * Mask out OBD_FAIL_ONCE, because that will stop
          * correct handling of failed req later in ldlm due to doing
          * obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED without actually
          * correct actions like it is done in target_send_reply_msg().
@@ -2044,7 +2035,7 @@ static int mdt_req_handle(struct mdt_thread_info *info,
                  * Set to info->mti_fail_id to handler fail_id, it will be used
                  * later, and better than use default fail_id.
                  */
-                if (OBD_FAIL_CHECK(h->mh_fail_id)) {
+                if (OBD_FAIL_CHECK(h->mh_fail_id && OBD_FAIL_MASK_LOC)) {
                         info->mti_fail_id = h->mh_fail_id;
                         RETURN(0);
                 }
@@ -2174,7 +2165,7 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
         if (req->rq_export) {
                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
                 info->mti_exp = req->rq_export;
-        }         else
+        } else
                 info->mti_mdt = NULL;
         info->mti_env = req->rq_svc_thread->t_env;
         ci = md_capainfo(info->mti_env);
@@ -2330,7 +2321,7 @@ static int mdt_handle0(struct ptlrpc_request *req,
 
         ENTRY;
 
-        MDT_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
+        OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
 
         LASSERT(current->journal_info == NULL);
 
@@ -2633,7 +2624,8 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
                         lh->mlh_reg_mode = lock->l_granted_mode;
 
                         LDLM_DEBUG(lock, "restoring lock cookie");
-                        DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
+                        DEBUG_REQ(D_DLMTRACE, req,
+                                  "restoring lock cookie "LPX64,
                                   lh->mlh_reg_lh.cookie);
                         if (old_lock)
                                 *old_lock = LDLM_LOCK_GET(lock);
@@ -2657,7 +2649,7 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
          */
         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
 
-        DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
+        DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
                   remote_hdl.cookie);
 }
 
@@ -3882,7 +3874,8 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
                  LUSTRE_MDT_NAME"-%p", m);
         m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
-                                              LDLM_NAMESPACE_SERVER);
+                                              LDLM_NAMESPACE_SERVER,
+                                              LDLM_NAMESPACE_GREEDY);
         if (m->mdt_namespace == NULL)
                 GOTO(err_fini_seq, rc = -ENOMEM);
 
@@ -4367,6 +4360,18 @@ out:
         return rc;
 }
 
+static void mdt_allow_cli(struct mdt_device *m, unsigned int flag)
+{
+        if (flag & CONFIG_LOG)
+                m->mdt_fl_cfglog = 1;
+        if (flag & CONFIG_SYNC)
+                m->mdt_fl_synced = 1;
+
+        if (m->mdt_fl_cfglog /* bz11778: && m->mdt_fl_synced */)
+                /* Open for clients */
+                m->mdt_md_dev.md_lu_dev.ld_obd->obd_no_conn = 0;
+}
+
 static int mdt_upcall(const struct lu_env *env, struct md_device *md,
                       enum md_upcall_event ev)
 {
@@ -4383,12 +4388,17 @@ static int mdt_upcall(const struct lu_env *env, struct md_device *md,
                                         &m->mdt_max_cookiesize);
                         CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
                                      m->mdt_max_mdsize, m->mdt_max_cookiesize);
+                        mdt_allow_cli(m, CONFIG_SYNC);
                         break;
                 case MD_NO_TRANS:
                         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
                         mti->mti_no_need_trans = 1;
                         CDEBUG(D_INFO, "disable mdt trans for this thread\n");
                         break;
+                case MD_LOV_CONFIG:
+                        /* Check that MDT is not yet configured */
+                        LASSERT(!m->mdt_fl_cfglog);
+                        break;
                 default:
                         CERROR("invalid event\n");
                         rc = -EINVAL;
@@ -4399,15 +4409,16 @@ static int mdt_upcall(const struct lu_env *env, struct md_device *md,
 
 static int mdt_obd_notify(struct obd_device *host,
                           struct obd_device *watched,
-                          enum obd_notify_event ev, void *owner)
+                          enum obd_notify_event ev, void *data)
 {
         ENTRY;
 
         switch (ev) {
         case OBD_NOTIFY_CONFIG:
-                host->obd_no_conn = 0;
+                mdt_allow_cli(mdt_dev(host->obd_lu_dev), (unsigned int)data);
+                break;
         default:
-                CDEBUG(D_INFO, "Notification 0x%x\n", ev);
+                CDEBUG(D_INFO, "Unhandled notification %#x\n", ev);
         }
         RETURN(0);
 }
@@ -4531,59 +4542,14 @@ static struct lu_device *mdt_device_alloc(const struct lu_env *env,
         return l;
 }
 
-/*
- * context key constructor/destructor
- */
-static void *mdt_key_init(const struct lu_context *ctx,
-                          struct lu_context_key *key)
-{
-        struct mdt_thread_info *info;
-
-        /*
-         * check that no high order allocations are incurred.
-         */
-        CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
-        OBD_ALLOC_PTR(info);
-        if (info == NULL)
-                info = ERR_PTR(-ENOMEM);
-        return info;
-}
-
-static void mdt_key_fini(const struct lu_context *ctx,
-                         struct lu_context_key *key, void *data)
-{
-        struct mdt_thread_info *info = data;
-        OBD_FREE_PTR(info);
-}
-
-struct lu_context_key mdt_thread_key = {
-        .lct_tags = LCT_MD_THREAD,
-        .lct_init = mdt_key_init,
-        .lct_fini = mdt_key_fini
-};
+/* context key constructor/destructor: mdt_key_init, mdt_key_fini */
+LU_KEY_INIT_FINI(mdt, struct mdt_thread_info);
 
-static void *mdt_txn_key_init(const struct lu_context *ctx,
-                              struct lu_context_key *key)
-{
-        struct mdt_txn_info *txi;
+/* context key: mdt_thread_key */
+LU_CONTEXT_KEY_DEFINE(mdt, LCT_MD_THREAD);
 
-        /*
-         * check that no high order allocations are incurred.
-         */
-        CLASSERT(CFS_PAGE_SIZE >= sizeof *txi);
-        OBD_ALLOC_PTR(txi);
-        if (txi == NULL)
-                txi = ERR_PTR(-ENOMEM);
-        memset(txi, 0, sizeof(*txi));
-        return txi;
-}
-
-static void mdt_txn_key_fini(const struct lu_context *ctx,
-                             struct lu_context_key *key, void *data)
-{
-        struct mdt_txn_info *txi = data;
-        OBD_FREE_PTR(txi);
-}
+/* context key constructor/destructor: mdt_txn_key_init, mdt_txn_key_fini */
+LU_KEY_INIT_FINI(mdt_txn, struct mdt_txn_info);
 
 struct lu_context_key mdt_txn_key = {
         .lct_tags = LCT_TX_HANDLE,
@@ -4596,24 +4562,8 @@ struct md_ucred *mdt_ucred(const struct mdt_thread_info *info)
         return md_ucred(info->mti_env);
 }
 
-static int mdt_type_init(struct lu_device_type *t)
-{
-        int rc;
-
-        LU_CONTEXT_KEY_INIT(&mdt_thread_key);
-        rc = lu_context_key_register(&mdt_thread_key);
-        if (rc == 0) {
-                LU_CONTEXT_KEY_INIT(&mdt_txn_key);
-                rc = lu_context_key_register(&mdt_txn_key);
-        }
-        return rc;
-}
-
-static void mdt_type_fini(struct lu_device_type *t)
-{
-        lu_context_key_degister(&mdt_thread_key);
-        lu_context_key_degister(&mdt_txn_key);
-}
+/* type constructor/destructor: mdt_type_init, mdt_type_fini */
+LU_TYPE_INIT_FINI(mdt, &mdt_thread_key, &mdt_txn_key);
 
 static struct lu_device_type_operations mdt_device_type_ops = {
         .ldto_init = mdt_type_init,