RETURN(m);
}
+/**
+ * Asyncronous commit for mdt device.
+ *
+ * Pass asynchonous commit call down the MDS stack.
+ *
+ * \param env environment
+ * \param mdt the mdt device
+ */
+static void mdt_device_commit_async(const struct lu_env *env,
+ struct mdt_device *mdt)
+{
+ struct dt_device *dt = mdt->mdt_bottom;
+ int rc;
+
+ rc = dt->dd_ops->dt_commit_async(env, dt);
+ if (unlikely(rc != 0))
+ CWARN("async commit start failed with rc = %d", rc);
+}
+
+/**
+ * Mark the lock as "synchonous".
+ *
+ * Mark the lock to deffer transaction commit to the unlock time.
+ *
+ * \param lock the lock to mark as "synchonous"
+ *
+ * \see mdt_is_lock_sync
+ * \see mdt_save_lock
+ */
+static inline void mdt_set_lock_sync(struct ldlm_lock *lock)
+{
+ lock->l_ast_data = (void*)1;
+}
+
+/**
+ * Check whehter the lock "synchonous" or not.
+ *
+ * \param lock the lock to check
+ * \retval 1 the lock is "synchonous"
+ * \retval 0 the lock isn't "synchronous"
+ *
+ * \see mdt_set_lock_sync
+ * \see mdt_save_lock
+ */
+static inline int mdt_is_lock_sync(struct ldlm_lock *lock)
+{
+ return lock->l_ast_data != NULL;
+}
+
+/**
+ * Blocking AST for mdt locks.
+ *
+ * Starts transaction commit if in case of COS lock conflict or
+ * deffers such a commit to the mdt_save_lock.
+ *
+ * \param lock the lock which blocks a request or cancelling lock
+ * \param desc unused
+ * \param data unused
+ * \param flag indicates whether this cancelling or blocking callback
+ * \retval 0
+ * \see ldlm_blocking_ast_nocheck
+ */
+int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+ void *data, int flag)
+{
+ struct obd_device *obd = lock->l_resource->lr_namespace->ns_obd;
+ struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+ int rc;
+ ENTRY;
+
+ if (flag == LDLM_CB_CANCELING)
+ RETURN(0);
+ lock_res_and_lock(lock);
+ if (lock->l_blocking_ast != mdt_blocking_ast) {
+ unlock_res_and_lock(lock);
+ RETURN(0);
+ }
+ if (mdt_cos_is_enabled(mdt) &&
+ lock->l_req_mode & (LCK_PW | LCK_EX) &&
+ lock->l_blocking_lock != NULL &&
+ lock->l_client_cookie != lock->l_blocking_lock->l_client_cookie) {
+ mdt_set_lock_sync(lock);
+ }
+ rc = ldlm_blocking_ast_nocheck(lock);
+
+ /* There is no lock conflict if l_blocking_lock == NULL,
+ * it indicates a blocking ast sent from ldlm_lock_decref_internal
+ * when the last reference to a local lock was released */
+ if (lock->l_req_mode == LCK_COS && lock->l_blocking_lock != NULL) {
+ struct lu_env env;
+
+ rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
+ if (unlikely(rc != 0))
+ CWARN("lu_env initialization failed with rc = %d,"
+ "cannot start asynchronous commit\n", rc);
+ else
+ mdt_device_commit_async(&env, mdt);
+ lu_env_fini(&env);
+ }
+ RETURN(rc);
+}
+
int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
struct mdt_lock_handle *lh, __u64 ibits, int locality)
{
*/
policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
- policy, res_id, LDLM_FL_ATOMIC_CB);
+ policy, res_id, LDLM_FL_ATOMIC_CB,
+ &info->mti_exp->exp_handle.h_cookie);
if (unlikely(rc))
RETURN(rc);
}
* fix it up and turn FL_LOCAL flag off.
*/
rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
- res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB);
-
+ res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
+ &info->mti_exp->exp_handle.h_cookie);
if (rc)
GOTO(out, rc);
RETURN(rc);
}
-static inline
-void mdt_save_lock(struct ptlrpc_request *req, struct lustre_handle *h,
+/**
+ * Save a lock within request object.
+ *
+ * Keep the lock referenced until whether client ACK or transaction
+ * commit happens or release the lock immediately depending on input
+ * parameters. If COS is ON, a write lock is converted to COS lock
+ * before saving.
+ *
+ * \param info thead info object
+ * \param h lock handle
+ * \param mode lock mode
+ * \param decref force immediate lock releasing
+ */
+static
+void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
ldlm_mode_t mode, int decref)
{
ENTRY;
if (lustre_handle_is_used(h)) {
- if (decref)
+ if (decref || !info->mti_has_trans ||
+ !(mode & (LCK_PW | LCK_EX))){
mdt_fid_unlock(h, mode);
- else
- ptlrpc_save_lock(req, h, mode);
+ } else {
+ struct mdt_device *mdt = info->mti_mdt;
+ struct ldlm_lock *lock = ldlm_handle2lock(h);
+ struct ptlrpc_request *req = mdt_info_req(info);
+ int no_ack = 0;
+
+ LASSERTF(lock != NULL, "no lock for cookie "LPX64"\n",
+ h->cookie);
+ CDEBUG(D_HA, "request = %p reply state = %p"
+ " transno = "LPD64"\n",
+ req, req->rq_reply_state, req->rq_transno);
+ if (mdt_cos_is_enabled(mdt)) {
+ no_ack = 1;
+ ldlm_lock_downgrade(lock, LCK_COS);
+ mode = LCK_COS;
+ }
+ ptlrpc_save_lock(req, h, mode, no_ack);
+ if (mdt_is_lock_sync(lock)) {
+ CDEBUG(D_HA, "found sync-lock,"
+ " async commit started\n");
+ mdt_device_commit_async(info->mti_env,
+ mdt);
+ }
+ LDLM_LOCK_PUT(lock);
+ }
h->cookie = 0ull;
}
EXIT;
}
-/*
- * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
- * to save this lock in req. when transaction committed, req will be released,
- * and lock will, too.
+/**
+ * Unlock mdt object.
+ *
+ * Immeditely release the regular lock and the PDO lock or save the
+ * lock in reqeuest and keep them referenced until client ACK or
+ * transaction commit.
+ *
+ * \param info thread info object
+ * \param o mdt object
+ * \param h mdt lock handle referencing regular and PDO locks
+ * \param decref force immediate lock releasing
*/
void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
struct mdt_lock_handle *lh, int decref)
{
- struct ptlrpc_request *req = mdt_info_req(info);
ENTRY;
- mdt_save_lock(req, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
- mdt_save_lock(req, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
+ mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
+ mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
EXIT;
}
ptlrpc_unregister_service(m->mdt_fld_service);
m->mdt_fld_service = NULL;
}
- ENTRY;
+ EXIT;
}
static int mdt_start_ptlrpc_service(struct mdt_device *m)
m->mdt_opts.mo_user_xattr = 0;
m->mdt_opts.mo_acl = 0;
+ m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
lmi = server_get_mount_2(dev);
if (lmi == NULL) {
CERROR("Cannot get mount info for %s!\n", dev);
rc = mdt_device_sync(&env, mdt);
break;
case OBD_IOC_SET_READONLY:
- rc = dt->dd_ops->dt_sync(&env, dt);
dt->dd_ops->dt_ro(&env, dt);
break;
case OBD_IOC_ABORT_RECOVERY:
return md_ucred(info->mti_env);
}
+/**
+ * Enable/disable COS.
+ *
+ * Set/Clear the COS flag in mdt options.
+ *
+ * \param mdt mdt device
+ * \param val 0 disables COS, other values enable COS
+ */
+void mdt_enable_cos(struct mdt_device *mdt, int val)
+{
+ struct lu_env env;
+ int rc;
+
+ mdt->mdt_opts.mo_cos = !!val;
+ rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
+ if (unlikely(rc != 0)) {
+ CWARN("lu_env initialization failed with rc = %d,"
+ "cannot sync\n", rc);
+ return;
+ }
+ mdt_device_sync(&env, mdt);
+ lu_env_fini(&env);
+}
+
+/**
+ * Check COS status.
+ *
+ * Return COS flag status/
+ *
+ * \param mdt mdt device
+ */
+int mdt_cos_is_enabled(struct mdt_device *mdt)
+{
+ return mdt->mdt_opts.mo_cos != 0;
+}
+
/* type constructor/destructor: mdt_type_init, mdt_type_fini */
LU_TYPE_INIT_FINI(mdt, &mdt_thread_key, &mdt_txn_key);