X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Ftarget%2Ftgt_main.c;h=4d3923723b2f19d3f9cbf0b6aa939461be79c4f0;hb=2c9ff6dffdf4320af95c9db9af07a416529275f0;hp=a920716e32859122d1aaa6d63dc1a5d28bae23d9;hpb=0addfa9fa1d48cc9fa5eb05026848e55382f81a8;p=fs%2Flustre-release.git diff --git a/lustre/target/tgt_main.c b/lustre/target/tgt_main.c index a920716..4d39237 100644 --- a/lustre/target/tgt_main.c +++ b/lustre/target/tgt_main.c @@ -21,7 +21,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2012, 2014, Intel Corporation. + * Copyright (c) 2012, 2016, Intel Corporation. */ /* * lustre/target/tgt_main.c @@ -37,6 +37,112 @@ #include "tgt_internal.h" #include "../ptlrpc/ptlrpc_internal.h" +/* + * Save cross-MDT lock in lut_slc_locks. + * + * Lock R/W count is not saved, but released in unlock (not canceled remotely), + * instead only a refcount is taken, so that the remote MDT where the object + * resides can detect conflict with this lock there. + * + * \param lut target + * \param lock cross-MDT lock to save + * \param transno when the transaction with this transno is committed, this lock + * can be canceled. + */ +void tgt_save_slc_lock(struct lu_target *lut, struct ldlm_lock *lock, + __u64 transno) +{ + spin_lock(&lut->lut_slc_locks_guard); + lock_res_and_lock(lock); + if (ldlm_is_cbpending(lock)) { + /* if it was canceld by server, don't save, because remote MDT + * will do Sync-on-Cancel. */ + LDLM_LOCK_PUT(lock); + } else { + lock->l_transno = transno; + /* if this lock is in the list already, there are two operations + * both use this lock, and save it after use, so for the second + * one, just put the refcount. */ + if (list_empty(&lock->l_slc_link)) + list_add_tail(&lock->l_slc_link, &lut->lut_slc_locks); + else + LDLM_LOCK_PUT(lock); + } + unlock_res_and_lock(lock); + spin_unlock(&lut->lut_slc_locks_guard); +} +EXPORT_SYMBOL(tgt_save_slc_lock); + +/* + * Discard cross-MDT lock from lut_slc_locks. + * + * This is called upon BAST, just remove lock from lut_slc_locks and put lock + * refcount. The BAST will cancel this lock. + * + * \param lut target + * \param lock cross-MDT lock to discard + */ +void tgt_discard_slc_lock(struct lu_target *lut, struct ldlm_lock *lock) +{ + spin_lock(&lut->lut_slc_locks_guard); + lock_res_and_lock(lock); + /* may race with tgt_cancel_slc_locks() */ + if (lock->l_transno != 0) { + LASSERT(!list_empty(&lock->l_slc_link)); + LASSERT(ldlm_is_cbpending(lock)); + list_del_init(&lock->l_slc_link); + lock->l_transno = 0; + LDLM_LOCK_PUT(lock); + } + unlock_res_and_lock(lock); + spin_unlock(&lut->lut_slc_locks_guard); +} +EXPORT_SYMBOL(tgt_discard_slc_lock); + +/* + * Cancel cross-MDT locks upon transaction commit. + * + * Remove cross-MDT locks from lut_slc_locks, cancel them and put lock refcount. + * + * \param lut target + * \param transno transaction with this number was committed. + */ +void tgt_cancel_slc_locks(struct lu_target *lut, __u64 transno) +{ + struct ldlm_lock *lock, *next; + LIST_HEAD(list); + struct lustre_handle lockh; + + spin_lock(&lut->lut_slc_locks_guard); + list_for_each_entry_safe(lock, next, &lut->lut_slc_locks, + l_slc_link) { + lock_res_and_lock(lock); + LASSERT(lock->l_transno != 0); + if (lock->l_transno > transno) { + unlock_res_and_lock(lock); + continue; + } + /* ouch, another operation is using it after it's saved */ + if (lock->l_readers != 0 || lock->l_writers != 0) { + unlock_res_and_lock(lock); + continue; + } + /* set CBPENDING so that this lock won't be used again */ + ldlm_set_cbpending(lock); + lock->l_transno = 0; + list_move(&lock->l_slc_link, &list); + unlock_res_and_lock(lock); + } + spin_unlock(&lut->lut_slc_locks_guard); + + list_for_each_entry_safe(lock, next, &list, l_slc_link) { + list_del_init(&lock->l_slc_link); + ldlm_lock2handle(lock, &lockh); + ldlm_cli_cancel(&lockh, LCF_ASYNC); + LDLM_LOCK_PUT(lock); + } +} + int tgt_init(const struct lu_env *env, struct lu_target *lut, struct obd_device *obd, struct dt_device *dt, struct tgt_opc_slice *slice, int request_fail_id, @@ -46,7 +152,7 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut, struct lu_attr attr; struct lu_fid fid; struct dt_object *o; - int rc = 0; + int i, rc = 0; ENTRY; @@ -75,11 +181,15 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut, spin_lock_init(&lut->lut_flags_lock); lut->lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL; + spin_lock_init(&lut->lut_slc_locks_guard); + INIT_LIST_HEAD(&lut->lut_slc_locks); + /* last_rcvd initialization is needed by replayable targets only */ if (!obd->obd_replayable) RETURN(0); spin_lock_init(&lut->lut_translock); + spin_lock_init(&lut->lut_client_bitmap_lock); OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); if (lut->lut_client_bitmap == NULL) @@ -97,13 +207,13 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut, rc = PTR_ERR(o); CERROR("%s: cannot open LAST_RCVD: rc = %d\n", tgt_name(lut), rc); - GOTO(out, rc); + GOTO(out_put, rc); } lut->lut_last_rcvd = o; rc = tgt_server_data_init(env, lut); if (rc < 0) - GOTO(out, rc); + GOTO(out_put, rc); /* prepare transactions callbacks */ lut->lut_txn_cb.dtc_txn_start = tgt_txn_start_cb; @@ -123,7 +233,7 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut, OBD_ALLOC(lut->lut_reply_bitmap, LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *)); if (lut->lut_reply_bitmap == NULL) - GOTO(out, rc); + GOTO(out, rc = -ENOMEM); memset(&attr, 0, sizeof(attr)); attr.la_valid = LA_MODE; @@ -145,22 +255,36 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut, if (rc < 0) GOTO(out, rc); + atomic_set(&lut->lut_sync_count, 0); + RETURN(0); + out: + dt_txn_callback_del(lut->lut_bottom, &lut->lut_txn_cb); +out_put: + obd->u.obt.obt_magic = 0; + obd->u.obt.obt_lut = NULL; if (lut->lut_last_rcvd != NULL) { - lu_object_put(env, &lut->lut_last_rcvd->do_lu); - dt_txn_callback_del(lut->lut_bottom, &lut->lut_txn_cb); + dt_object_put(env, lut->lut_last_rcvd); + lut->lut_last_rcvd = NULL; } - lut->lut_last_rcvd = NULL; if (lut->lut_client_bitmap != NULL) OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); lut->lut_client_bitmap = NULL; if (lut->lut_reply_data != NULL) - lu_object_put(env, &lut->lut_reply_data->do_lu); + dt_object_put(env, lut->lut_reply_data); lut->lut_reply_data = NULL; - if (lut->lut_reply_bitmap != NULL) + if (lut->lut_reply_bitmap != NULL) { + for (i = 0; i < LUT_REPLY_SLOTS_MAX_CHUNKS; i++) { + if (lut->lut_reply_bitmap[i] != NULL) + OBD_FREE_LARGE(lut->lut_reply_bitmap[i], + BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) * + sizeof(long)); + lut->lut_reply_bitmap[i] = NULL; + } OBD_FREE(lut->lut_reply_bitmap, LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *)); + } lut->lut_reply_bitmap = NULL; return rc; } @@ -187,12 +311,12 @@ void tgt_fini(const struct lu_env *env, struct lu_target *lut) sptlrpc_rule_set_free(&lut->lut_sptlrpc_rset); if (lut->lut_reply_data != NULL) - lu_object_put(env, &lut->lut_reply_data->do_lu); + dt_object_put(env, lut->lut_reply_data); lut->lut_reply_data = NULL; if (lut->lut_reply_bitmap != NULL) { for (i = 0; i < LUT_REPLY_SLOTS_MAX_CHUNKS; i++) { if (lut->lut_reply_bitmap[i] != NULL) - OBD_FREE(lut->lut_reply_bitmap[i], + OBD_FREE_LARGE(lut->lut_reply_bitmap[i], BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) * sizeof(long)); lut->lut_reply_bitmap[i] = NULL; @@ -207,7 +331,7 @@ void tgt_fini(const struct lu_env *env, struct lu_target *lut) } if (lut->lut_last_rcvd) { dt_txn_callback_del(lut->lut_bottom, &lut->lut_txn_cb); - lu_object_put(env, &lut->lut_last_rcvd->do_lu); + dt_object_put(env, lut->lut_last_rcvd); lut->lut_last_rcvd = NULL; } EXIT; @@ -280,13 +404,14 @@ int tgt_mod_init(void) { ENTRY; - tgt_page_to_corrupt = alloc_page(GFP_IOFS); + tgt_page_to_corrupt = alloc_page(GFP_KERNEL); tgt_key_init_generic(&tgt_thread_key, NULL); lu_context_key_register_many(&tgt_thread_key, NULL); tgt_ses_key_init_generic(&tgt_session_key, NULL); lu_context_key_register_many(&tgt_session_key, NULL); + barrier_init(); update_info_init(); @@ -295,8 +420,9 @@ int tgt_mod_init(void) void tgt_mod_exit(void) { + barrier_fini(); if (tgt_page_to_corrupt != NULL) - page_cache_release(tgt_page_to_corrupt); + put_page(tgt_page_to_corrupt); lu_context_key_degister(&tgt_thread_key); lu_context_key_degister(&tgt_session_key);