Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index 42a6e78..ffdc67c 100644 (file)
@@ -1776,6 +1776,108 @@ struct mdt_object *mdt_object_find(const struct lu_env *env,
         RETURN(m);
 }
 
+/**
+ * Asyncronous commit for mdt device.
+ *
+ * Pass asynchonous commit call down the MDS stack.
+ *
+ * \param env environment
+ * \param mdt the mdt device
+ */
+static void mdt_device_commit_async(const struct lu_env *env,
+                                    struct mdt_device *mdt)
+{
+        struct dt_device *dt = mdt->mdt_bottom;
+        int rc;
+
+        rc = dt->dd_ops->dt_commit_async(env, dt);
+        if (unlikely(rc != 0))
+                CWARN("async commit start failed with rc = %d", rc);
+}
+
+/**
+ * Mark the lock as "synchonous".
+ *
+ * Mark the lock to deffer transaction commit to the unlock time.
+ *
+ * \param lock the lock to mark as "synchonous"
+ *
+ * \see mdt_is_lock_sync
+ * \see mdt_save_lock
+ */
+static inline void mdt_set_lock_sync(struct ldlm_lock *lock)
+{
+        lock->l_ast_data = (void*)1;
+}
+
+/**
+ * Check whehter the lock "synchonous" or not.
+ *
+ * \param lock the lock to check
+ * \retval 1 the lock is "synchonous"
+ * \retval 0 the lock isn't "synchronous"
+ *
+ * \see mdt_set_lock_sync
+ * \see mdt_save_lock
+ */
+static inline int mdt_is_lock_sync(struct ldlm_lock *lock)
+{
+        return lock->l_ast_data != NULL;
+}
+
+/**
+ * Blocking AST for mdt locks.
+ *
+ * Starts transaction commit if in case of COS lock conflict or
+ * deffers such a commit to the mdt_save_lock.
+ *
+ * \param lock the lock which blocks a request or cancelling lock
+ * \param desc unused
+ * \param data unused
+ * \param flag indicates whether this cancelling or blocking callback
+ * \retval 0
+ * \see ldlm_blocking_ast_nocheck
+ */
+int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                     void *data, int flag)
+{
+        struct obd_device *obd = lock->l_resource->lr_namespace->ns_obd;
+        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+        int rc;
+        ENTRY;
+
+        if (flag == LDLM_CB_CANCELING)
+                RETURN(0);
+        lock_res_and_lock(lock);
+        if (lock->l_blocking_ast != mdt_blocking_ast) {
+                unlock_res_and_lock(lock);
+                RETURN(0);
+        }
+        if (mdt_cos_is_enabled(mdt) &&
+            lock->l_req_mode & (LCK_PW | LCK_EX) &&
+            lock->l_blocking_lock != NULL &&
+            lock->l_client_cookie != lock->l_blocking_lock->l_client_cookie) {
+                mdt_set_lock_sync(lock);
+        }
+        rc = ldlm_blocking_ast_nocheck(lock);
+
+        /* There is no lock conflict if l_blocking_lock == NULL,
+         * it indicates a blocking ast sent from ldlm_lock_decref_internal
+         * when the last reference to a local lock was released */
+        if (lock->l_req_mode == LCK_COS && lock->l_blocking_lock != NULL) {
+                struct lu_env env;
+
+                rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
+                if (unlikely(rc != 0))
+                        CWARN("lu_env initialization failed with rc = %d,"
+                              "cannot start asynchronous commit\n", rc);
+                else
+                        mdt_device_commit_async(&env, mdt);
+                lu_env_fini(&env);
+        }
+        RETURN(rc);
+}
+
 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
                     struct mdt_lock_handle *lh, __u64 ibits, int locality)
 {
@@ -1832,7 +1934,8 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
                          */
                         policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
                         rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
-                                          policy, res_id, LDLM_FL_ATOMIC_CB);
+                                          policy, res_id, LDLM_FL_ATOMIC_CB,
+                                          &info->mti_exp->exp_handle.h_cookie);
                         if (unlikely(rc))
                                 RETURN(rc);
                 }
@@ -1852,8 +1955,8 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
          * fix it up and turn FL_LOCAL flag off.
          */
         rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
-                          res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB);
-
+                          res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
+                          &info->mti_exp->exp_handle.h_cookie);
         if (rc)
                 GOTO(out, rc);
 
@@ -1865,36 +1968,79 @@ out:
         RETURN(rc);
 }
 
-static inline
-void mdt_save_lock(struct ptlrpc_request *req, struct lustre_handle *h,
+/**
+ * Save a lock within request object.
+ *
+ * Keep the lock referenced until whether client ACK or transaction
+ * commit happens or release the lock immediately depending on input
+ * parameters. If COS is ON, a write lock is converted to COS lock
+ * before saving.
+ *
+ * \param info thead info object
+ * \param h lock handle
+ * \param mode lock mode
+ * \param decref force immediate lock releasing
+ */
+static
+void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
                    ldlm_mode_t mode, int decref)
 {
         ENTRY;
 
         if (lustre_handle_is_used(h)) {
-                if (decref)
+                if (decref || !info->mti_has_trans ||
+                    !(mode & (LCK_PW | LCK_EX))){
                         mdt_fid_unlock(h, mode);
-                else
-                        ptlrpc_save_lock(req, h, mode);
+                } else {
+                        struct mdt_device *mdt = info->mti_mdt;
+                        struct ldlm_lock *lock = ldlm_handle2lock(h);
+                        struct ptlrpc_request *req = mdt_info_req(info);
+                        int no_ack = 0;
+
+                        LASSERTF(lock != NULL, "no lock for cookie "LPX64"\n",
+                                 h->cookie);
+                        CDEBUG(D_HA, "request = %p reply state = %p"
+                               " transno = "LPD64"\n",
+                               req, req->rq_reply_state, req->rq_transno);
+                        if (mdt_cos_is_enabled(mdt)) {
+                                no_ack = 1;
+                                ldlm_lock_downgrade(lock, LCK_COS);
+                                mode = LCK_COS;
+                        }
+                        ptlrpc_save_lock(req, h, mode, no_ack);
+                        if (mdt_is_lock_sync(lock)) {
+                                CDEBUG(D_HA, "found sync-lock,"
+                                       " async commit started\n");
+                                mdt_device_commit_async(info->mti_env,
+                                                        mdt);
+                        }
+                        LDLM_LOCK_PUT(lock);
+                }
                 h->cookie = 0ull;
         }
 
         EXIT;
 }
 
-/*
- * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
- * to save this lock in req.  when transaction committed, req will be released,
- * and lock will, too.
+/**
+ * Unlock mdt object.
+ *
+ * Immeditely release the regular lock and the PDO lock or save the
+ * lock in reqeuest and keep them referenced until client ACK or
+ * transaction commit.
+ *
+ * \param info thread info object
+ * \param o mdt object
+ * \param h mdt lock handle referencing regular and PDO locks
+ * \param decref force immediate lock releasing
  */
 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
                        struct mdt_lock_handle *lh, int decref)
 {
-        struct ptlrpc_request *req = mdt_info_req(info);
         ENTRY;
 
-        mdt_save_lock(req, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
-        mdt_save_lock(req, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
+        mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
+        mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
 
         EXIT;
 }
@@ -3388,7 +3534,7 @@ static void mdt_stop_ptlrpc_service(struct mdt_device *m)
                 ptlrpc_unregister_service(m->mdt_fld_service);
                 m->mdt_fld_service = NULL;
         }
-        ENTRY;
+        EXIT;
 }
 
 static int mdt_start_ptlrpc_service(struct mdt_device *m)
@@ -3944,6 +4090,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 
         m->mdt_opts.mo_user_xattr = 0;
         m->mdt_opts.mo_acl = 0;
+        m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
         lmi = server_get_mount_2(dev);
         if (lmi == NULL) {
                 CERROR("Cannot get mount info for %s!\n", dev);
@@ -4715,7 +4862,6 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 rc = mdt_device_sync(&env, mdt);
                 break;
         case OBD_IOC_SET_READONLY:
-                rc = dt->dd_ops->dt_sync(&env, dt);
                 dt->dd_ops->dt_ro(&env, dt);
                 break;
         case OBD_IOC_ABORT_RECOVERY:
@@ -4837,6 +4983,42 @@ struct md_ucred *mdt_ucred(const struct mdt_thread_info *info)
         return md_ucred(info->mti_env);
 }
 
+/**
+ * Enable/disable COS.
+ *
+ * Set/Clear the COS flag in mdt options.
+ *
+ * \param mdt mdt device
+ * \param val 0 disables COS, other values enable COS
+ */
+void mdt_enable_cos(struct mdt_device *mdt, int val)
+{
+        struct lu_env env;
+        int rc;
+
+        mdt->mdt_opts.mo_cos = !!val;
+        rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
+        if (unlikely(rc != 0)) {
+                CWARN("lu_env initialization failed with rc = %d,"
+                      "cannot sync\n", rc);
+                return;
+        }
+        mdt_device_sync(&env, mdt);
+        lu_env_fini(&env);
+}
+
+/**
+ * Check COS status.
+ *
+ * Return COS flag status/
+ *
+ * \param mdt mdt device
+ */
+int mdt_cos_is_enabled(struct mdt_device *mdt)
+{
+        return mdt->mdt_opts.mo_cos != 0;
+}
+
 /* type constructor/destructor: mdt_type_init, mdt_type_fini */
 LU_TYPE_INIT_FINI(mdt, &mdt_thread_key, &mdt_txn_key);