* \see ldlm_blocking_ast_nocheck
*/
int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
- void *data, int flag)
+ void *data, int flag)
{
- struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
- struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
- int rc;
- ENTRY;
+ struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
+ struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+ bool commit_async = false;
+ int rc;
+ ENTRY;
- if (flag == LDLM_CB_CANCELING)
- RETURN(0);
+ if (flag == LDLM_CB_CANCELING)
+ RETURN(0);
- lock_res_and_lock(lock);
- if (lock->l_blocking_ast != mdt_blocking_ast) {
- unlock_res_and_lock(lock);
- RETURN(0);
- }
+ lock_res_and_lock(lock);
+ if (lock->l_blocking_ast != mdt_blocking_ast) {
+ unlock_res_and_lock(lock);
+ RETURN(0);
+ }
+ /* There is no lock conflict if l_blocking_lock == NULL,
+ * it indicates a blocking ast sent from ldlm_lock_decref_internal
+ * when the last reference to a local lock was released */
if (lock->l_req_mode & (LCK_PW | LCK_EX) &&
lock->l_blocking_lock != NULL) {
- if (mdt_cos_is_enabled(mdt) &&
- lock->l_client_cookie !=
- lock->l_blocking_lock->l_client_cookie)
- mdt_set_lock_sync(lock);
- else if (mdt_slc_is_enabled(mdt) &&
- ldlm_is_cos_incompat(lock->l_blocking_lock))
+ if (mdt_cos_is_enabled(mdt)) {
+ if (lock->l_client_cookie !=
+ lock->l_blocking_lock->l_client_cookie)
+ mdt_set_lock_sync(lock);
+ } else if (mdt_slc_is_enabled(mdt) &&
+ ldlm_is_cos_incompat(lock->l_blocking_lock)) {
mdt_set_lock_sync(lock);
+ /*
+ * we may do extra commit here, but there is a small
+ * window to miss a commit: lock was unlocked (saved),
+ * then a conflict lock queued and we come here, but
+ * REP-ACK not received, so lock was not converted to
+ * COS mode yet.
+ * Fortunately this window is quite small, so the
+ * extra commit should be rare (not to say distributed
+ * operation is rare too).
+ */
+ commit_async = true;
+ }
+ } else if (lock->l_req_mode == LCK_COS &&
+ lock->l_blocking_lock != NULL) {
+ commit_async = true;
}
- rc = ldlm_blocking_ast_nocheck(lock);
- /* There is no lock conflict if l_blocking_lock == NULL,
- * it indicates a blocking ast sent from ldlm_lock_decref_internal
- * when the last reference to a local lock was released */
- if (lock->l_req_mode == LCK_COS && lock->l_blocking_lock != NULL) {
- struct lu_env env;
+ rc = ldlm_blocking_ast_nocheck(lock);
+
+ if (commit_async) {
+ struct lu_env env;
rc = lu_env_init(&env, LCT_LOCAL);
if (unlikely(rc != 0))
CWARN("%s: lu_env initialization failed, cannot "
"start asynchronous commit: rc = %d\n",
obd->obd_name, rc);
- else
- mdt_device_commit_async(&env, mdt);
- lu_env_fini(&env);
- }
- RETURN(rc);
+ else
+ mdt_device_commit_async(&env, mdt);
+ lu_env_fini(&env);
+ }
+ RETURN(rc);
}
/*
struct mdt_device *mdt = info->mti_mdt;
struct ldlm_lock *lock = ldlm_handle2lock(h);
struct ptlrpc_request *req = mdt_info_req(info);
- int cos;
-
- cos = (mdt_cos_is_enabled(mdt) ||
- mdt_slc_is_enabled(mdt));
+ bool cos = mdt_cos_is_enabled(mdt);
+ bool convert_lock = !cos && mdt_slc_is_enabled(mdt);
LASSERTF(lock != NULL, "no lock for cookie %#llx\n",
h->cookie);
/* there is no request if mdt_object_unlock() is called
* from mdt_export_cleanup()->mdt_add_dirty_flag() */
if (likely(req != NULL)) {
- CDEBUG(D_HA, "request = %p reply state = %p"
- " transno = %lld\n", req,
- req->rq_reply_state, req->rq_transno);
+ LDLM_DEBUG(lock, "save lock request %p reply "
+ "state %p transno %lld\n", req,
+ req->rq_reply_state, req->rq_transno);
if (cos) {
ldlm_lock_downgrade(lock, LCK_COS);
mode = LCK_COS;
}
- ptlrpc_save_lock(req, h, mode, cos);
+ ptlrpc_save_lock(req, h, mode, cos,
+ convert_lock);
} else {
mdt_fid_unlock(h, mode);
}
* Puts a lock and its mode into reply state assotiated to request reply.
*/
void
-ptlrpc_save_lock(struct ptlrpc_request *req,
- struct lustre_handle *lock, int mode, int no_ack)
+ptlrpc_save_lock(struct ptlrpc_request *req, struct lustre_handle *lock,
+ int mode, bool no_ack, bool convert_lock)
{
- struct ptlrpc_reply_state *rs = req->rq_reply_state;
- int idx;
+ struct ptlrpc_reply_state *rs = req->rq_reply_state;
+ int idx;
- LASSERT(rs != NULL);
- LASSERT(rs->rs_nlocks < RS_MAX_LOCKS);
+ LASSERT(rs != NULL);
+ LASSERT(rs->rs_nlocks < RS_MAX_LOCKS);
- if (req->rq_export->exp_disconnected) {
- ldlm_lock_decref(lock, mode);
- } else {
- idx = rs->rs_nlocks++;
- rs->rs_locks[idx] = *lock;
- rs->rs_modes[idx] = mode;
- rs->rs_difficult = 1;
- rs->rs_no_ack = !!no_ack;
- }
+ if (req->rq_export->exp_disconnected) {
+ ldlm_lock_decref(lock, mode);
+ } else {
+ idx = rs->rs_nlocks++;
+ rs->rs_locks[idx] = *lock;
+ rs->rs_modes[idx] = mode;
+ rs->rs_difficult = 1;
+ rs->rs_no_ack = no_ack;
+ rs->rs_convert_lock = convert_lock;
+ }
}
EXPORT_SYMBOL(ptlrpc_save_lock);
int hrt_id; /* thread ID */
spinlock_t hrt_lock;
wait_queue_head_t hrt_waitq;
- struct list_head hrt_queue; /* RS queue */
+ struct list_head hrt_queue;
struct ptlrpc_hr_partition *hrt_partition;
};
{
struct ptlrpc_service_part *svcpt = rs->rs_svcpt;
struct ptlrpc_service *svc = svcpt->scp_service;
- struct obd_export *exp;
- int nlocks;
- int been_handled;
- ENTRY;
+ struct obd_export *exp;
+ int nlocks;
+ int been_handled;
+ ENTRY;
exp = rs->rs_export;
LASSERT(rs->rs_scheduled);
LASSERT(list_empty(&rs->rs_list));
- spin_lock(&exp->exp_lock);
- /* Noop if removed already */
- list_del_init(&rs->rs_exp_list);
- spin_unlock(&exp->exp_lock);
-
- /* The disk commit callback holds exp_uncommitted_replies_lock while it
- * iterates over newly committed replies, removing them from
- * exp_uncommitted_replies. It then drops this lock and schedules the
- * replies it found for handling here.
- *
- * We can avoid contention for exp_uncommitted_replies_lock between the
- * HRT threads and further commit callbacks by checking rs_committed
- * which is set in the commit callback while it holds both
- * rs_lock and exp_uncommitted_reples.
- *
- * If we see rs_committed clear, the commit callback _may_ not have
- * handled this reply yet and we race with it to grab
- * exp_uncommitted_replies_lock before removing the reply from
- * exp_uncommitted_replies. Note that if we lose the race and the
- * reply has already been removed, list_del_init() is a noop.
- *
- * If we see rs_committed set, we know the commit callback is handling,
- * or has handled this reply since store reordering might allow us to
- * see rs_committed set out of sequence. But since this is done
- * holding rs_lock, we can be sure it has all completed once we hold
- * rs_lock, which we do right next.
- */
+ /* The disk commit callback holds exp_uncommitted_replies_lock while it
+ * iterates over newly committed replies, removing them from
+ * exp_uncommitted_replies. It then drops this lock and schedules the
+ * replies it found for handling here.
+ *
+ * We can avoid contention for exp_uncommitted_replies_lock between the
+ * HRT threads and further commit callbacks by checking rs_committed
+ * which is set in the commit callback while it holds both
+ * rs_lock and exp_uncommitted_replies.
+ *
+ * If we see rs_committed clear, the commit callback _may_ not have
+ * handled this reply yet and we race with it to grab
+ * exp_uncommitted_replies_lock before removing the reply from
+ * exp_uncommitted_replies. Note that if we lose the race and the
+ * reply has already been removed, list_del_init() is a noop.
+ *
+ * If we see rs_committed set, we know the commit callback is handling,
+ * or has handled this reply since store reordering might allow us to
+ * see rs_committed set out of sequence. But since this is done
+ * holding rs_lock, we can be sure it has all completed once we hold
+ * rs_lock, which we do right next.
+ */
if (!rs->rs_committed) {
+ /* if rs was committed, no need to convert locks, don't check
+ * rs_committed here because rs may never be added into
+ * exp_uncommitted_replies and this flag may never be set, see
+ * target_send_reply() */
+ if (rs->rs_convert_lock &&
+ rs->rs_transno > exp->exp_last_committed) {
+ struct ldlm_lock *lock;
+
+ spin_lock(&rs->rs_lock);
+ if (rs->rs_convert_lock &&
+ rs->rs_transno > exp->exp_last_committed) {
+ nlocks = rs->rs_nlocks;
+ while (nlocks-- > 0)
+ rs->rs_modes[nlocks] = LCK_COS;
+ nlocks = rs->rs_nlocks;
+ rs->rs_convert_lock = 0;
+ /* clear rs_scheduled so that commit callback
+ * can schedule again */
+ rs->rs_scheduled = 0;
+ spin_unlock(&rs->rs_lock);
+
+ while (nlocks-- > 0) {
+ lock = ldlm_handle2lock(
+ &rs->rs_locks[nlocks]);
+ LASSERT(lock != NULL);
+ ldlm_lock_downgrade(lock, LCK_COS);
+ LDLM_LOCK_PUT(lock);
+ }
+ RETURN(0);
+ }
+ spin_unlock(&rs->rs_lock);
+ }
+
spin_lock(&exp->exp_uncommitted_replies_lock);
list_del_init(&rs->rs_obd_list);
spin_unlock(&exp->exp_uncommitted_replies_lock);
}
+ spin_lock(&exp->exp_lock);
+ /* Noop if removed already */
+ list_del_init(&rs->rs_exp_list);
+ spin_unlock(&exp->exp_lock);
+
spin_lock(&rs->rs_lock);
- been_handled = rs->rs_handled;
- rs->rs_handled = 1;
+ been_handled = rs->rs_handled;
+ rs->rs_handled = 1;
- nlocks = rs->rs_nlocks; /* atomic "steal", but */
- rs->rs_nlocks = 0; /* locks still on rs_locks! */
+ nlocks = rs->rs_nlocks; /* atomic "steal", but */
+ rs->rs_nlocks = 0; /* locks still on rs_locks! */
- if (nlocks == 0 && !been_handled) {
- /* If we see this, we should already have seen the warning
- * in mds_steal_ack_locks() */
+ if (nlocks == 0 && !been_handled) {
+ /* If we see this, we should already have seen the warning
+ * in mds_steal_ack_locks() */
CDEBUG(D_HA, "All locks stolen from rs %p x%lld.t%lld"
" o%d NID %s\n",
rs,
rs->rs_xid, rs->rs_transno, rs->rs_opc,
libcfs_nid2str(exp->exp_connection->c_peer.nid));
- }
+ }
- if ((!been_handled && rs->rs_on_net) || nlocks > 0) {
+ if ((!been_handled && rs->rs_on_net) || nlocks > 0) {
spin_unlock(&rs->rs_lock);
if (!been_handled && rs->rs_on_net) {
}
rs->rs_scheduled = 0;
+ rs->rs_convert_lock = 0;
if (!rs->rs_on_net) {
/* Off the net */