X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Ftarget%2Ftgt_main.c;h=4d3923723b2f19d3f9cbf0b6aa939461be79c4f0;hb=e125515c4c48094379cf828085ddcd166861cb6a;hp=b478b3d20263fbfee116e7b40964d69335fbbc9e;hpb=f8b4e51fb85c078ae48c3c14472c520e850a2a2d;p=fs%2Flustre-release.git diff --git a/lustre/target/tgt_main.c b/lustre/target/tgt_main.c index b478b3d..4d39237 100644 --- a/lustre/target/tgt_main.c +++ b/lustre/target/tgt_main.c @@ -21,7 +21,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2012, 2015, Intel Corporation. + * Copyright (c) 2012, 2016, Intel Corporation. */ /* * lustre/target/tgt_main.c @@ -37,23 +37,22 @@ #include "tgt_internal.h" #include "../ptlrpc/ptlrpc_internal.h" -static spinlock_t uncommitted_slc_locks_guard; -static struct list_head uncommitted_slc_locks; - /* - * Save cross-MDT lock in uncommitted_slc_locks. + * Save cross-MDT lock in lut_slc_locks. * * Lock R/W count is not saved, but released in unlock (not canceled remotely), * instead only a refcount is taken, so that the remote MDT where the object * resides can detect conflict with this lock there. * + * \param lut target * \param lock cross-MDT lock to save * \param transno when the transaction with this transno is committed, this lock * can be canceled. */ -void tgt_save_slc_lock(struct ldlm_lock *lock, __u64 transno) +void tgt_save_slc_lock(struct lu_target *lut, struct ldlm_lock *lock, + __u64 transno) { - spin_lock(&uncommitted_slc_locks_guard); + spin_lock(&lut->lut_slc_locks_guard); lock_res_and_lock(lock); if (ldlm_is_cbpending(lock)) { /* if it was canceld by server, don't save, because remote MDT @@ -65,27 +64,27 @@ void tgt_save_slc_lock(struct ldlm_lock *lock, __u64 transno) * both use this lock, and save it after use, so for the second * one, just put the refcount. */ if (list_empty(&lock->l_slc_link)) - list_add_tail(&lock->l_slc_link, - &uncommitted_slc_locks); + list_add_tail(&lock->l_slc_link, &lut->lut_slc_locks); else LDLM_LOCK_PUT(lock); } unlock_res_and_lock(lock); - spin_unlock(&uncommitted_slc_locks_guard); + spin_unlock(&lut->lut_slc_locks_guard); } EXPORT_SYMBOL(tgt_save_slc_lock); /* - * Discard cross-MDT lock from uncommitted_slc_locks. + * Discard cross-MDT lock from lut_slc_locks. * - * This is called upon BAST, just remove lock from uncommitted_slc_locks and put - * lock refcount. The BAST will cancel this lock. + * This is called upon BAST, just remove lock from lut_slc_locks and put lock + * refcount. The BAST will cancel this lock. * + * \param lut target * \param lock cross-MDT lock to discard */ -void tgt_discard_slc_lock(struct ldlm_lock *lock) +void tgt_discard_slc_lock(struct lu_target *lut, struct ldlm_lock *lock) { - spin_lock(&uncommitted_slc_locks_guard); + spin_lock(&lut->lut_slc_locks_guard); lock_res_and_lock(lock); /* may race with tgt_cancel_slc_locks() */ if (lock->l_transno != 0) { @@ -96,26 +95,26 @@ void tgt_discard_slc_lock(struct ldlm_lock *lock) LDLM_LOCK_PUT(lock); } unlock_res_and_lock(lock); - spin_unlock(&uncommitted_slc_locks_guard); + spin_unlock(&lut->lut_slc_locks_guard); } EXPORT_SYMBOL(tgt_discard_slc_lock); /* * Cancel cross-MDT locks upon transaction commit. * - * Remove cross-MDT locks from uncommitted_slc_locks, cancel them and put lock - * refcount. + * Remove cross-MDT locks from lut_slc_locks, cancel them and put lock refcount. * + * \param lut target * \param transno transaction with this number was committed. */ -void tgt_cancel_slc_locks(__u64 transno) +void tgt_cancel_slc_locks(struct lu_target *lut, __u64 transno) { struct ldlm_lock *lock, *next; LIST_HEAD(list); struct lustre_handle lockh; - spin_lock(&uncommitted_slc_locks_guard); - list_for_each_entry_safe(lock, next, &uncommitted_slc_locks, + spin_lock(&lut->lut_slc_locks_guard); + list_for_each_entry_safe(lock, next, &lut->lut_slc_locks, l_slc_link) { lock_res_and_lock(lock); LASSERT(lock->l_transno != 0); @@ -134,7 +133,7 @@ void tgt_cancel_slc_locks(__u64 transno) list_move(&lock->l_slc_link, &list); unlock_res_and_lock(lock); } - spin_unlock(&uncommitted_slc_locks_guard); + spin_unlock(&lut->lut_slc_locks_guard); list_for_each_entry_safe(lock, next, &list, l_slc_link) { list_del_init(&lock->l_slc_link); @@ -153,7 +152,7 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut, struct lu_attr attr; struct lu_fid fid; struct dt_object *o; - int rc = 0; + int i, rc = 0; ENTRY; @@ -182,6 +181,9 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut, spin_lock_init(&lut->lut_flags_lock); lut->lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL; + spin_lock_init(&lut->lut_slc_locks_guard); + INIT_LIST_HEAD(&lut->lut_slc_locks); + /* last_rcvd initialization is needed by replayable targets only */ if (!obd->obd_replayable) RETURN(0); @@ -231,7 +233,7 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut, OBD_ALLOC(lut->lut_reply_bitmap, LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *)); if (lut->lut_reply_bitmap == NULL) - GOTO(out, rc); + GOTO(out, rc = -ENOMEM); memset(&attr, 0, sizeof(attr)); attr.la_valid = LA_MODE; @@ -263,18 +265,26 @@ out_put: obd->u.obt.obt_magic = 0; obd->u.obt.obt_lut = NULL; if (lut->lut_last_rcvd != NULL) { - lu_object_put(env, &lut->lut_last_rcvd->do_lu); + dt_object_put(env, lut->lut_last_rcvd); lut->lut_last_rcvd = NULL; } if (lut->lut_client_bitmap != NULL) OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); lut->lut_client_bitmap = NULL; if (lut->lut_reply_data != NULL) - lu_object_put(env, &lut->lut_reply_data->do_lu); + dt_object_put(env, lut->lut_reply_data); lut->lut_reply_data = NULL; - if (lut->lut_reply_bitmap != NULL) + if (lut->lut_reply_bitmap != NULL) { + for (i = 0; i < LUT_REPLY_SLOTS_MAX_CHUNKS; i++) { + if (lut->lut_reply_bitmap[i] != NULL) + OBD_FREE_LARGE(lut->lut_reply_bitmap[i], + BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) * + sizeof(long)); + lut->lut_reply_bitmap[i] = NULL; + } OBD_FREE(lut->lut_reply_bitmap, LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *)); + } lut->lut_reply_bitmap = NULL; return rc; } @@ -301,12 +311,12 @@ void tgt_fini(const struct lu_env *env, struct lu_target *lut) sptlrpc_rule_set_free(&lut->lut_sptlrpc_rset); if (lut->lut_reply_data != NULL) - lu_object_put(env, &lut->lut_reply_data->do_lu); + dt_object_put(env, lut->lut_reply_data); lut->lut_reply_data = NULL; if (lut->lut_reply_bitmap != NULL) { for (i = 0; i < LUT_REPLY_SLOTS_MAX_CHUNKS; i++) { if (lut->lut_reply_bitmap[i] != NULL) - OBD_FREE(lut->lut_reply_bitmap[i], + OBD_FREE_LARGE(lut->lut_reply_bitmap[i], BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) * sizeof(long)); lut->lut_reply_bitmap[i] = NULL; @@ -321,7 +331,7 @@ void tgt_fini(const struct lu_env *env, struct lu_target *lut) } if (lut->lut_last_rcvd) { dt_txn_callback_del(lut->lut_bottom, &lut->lut_txn_cb); - lu_object_put(env, &lut->lut_last_rcvd->do_lu); + dt_object_put(env, lut->lut_last_rcvd); lut->lut_last_rcvd = NULL; } EXIT; @@ -401,19 +411,18 @@ int tgt_mod_init(void) tgt_ses_key_init_generic(&tgt_session_key, NULL); lu_context_key_register_many(&tgt_session_key, NULL); + barrier_init(); update_info_init(); - spin_lock_init(&uncommitted_slc_locks_guard); - INIT_LIST_HEAD(&uncommitted_slc_locks); - RETURN(0); } void tgt_mod_exit(void) { + barrier_fini(); if (tgt_page_to_corrupt != NULL) - page_cache_release(tgt_page_to_corrupt); + put_page(tgt_page_to_corrupt); lu_context_key_degister(&tgt_thread_key); lu_context_key_degister(&tgt_session_key);