X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_lockd.c;h=4d9e72f8b9510cbab4bc664033473594b8fa1674;hp=a2d284cd062006d83441f3cd2d721d57e6c89137;hb=63851b5816bb30687fbf3750380d6b448e9400f1;hpb=1b2547843817b4b7adbeb87ea9b070d9cac35c90 diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index a2d284c..4d9e72f 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -95,25 +95,25 @@ static inline unsigned int ldlm_get_rq_timeout(void) struct ldlm_bl_pool { spinlock_t blp_lock; - /* - * blp_prio_list is used for callbacks that should be handled - * as a priority. It is used for LDLM_FL_DISCARD_DATA requests. - * see bug 13843 - */ - cfs_list_t blp_prio_list; - - /* - * blp_list is used for all other callbacks which are likely - * to take longer to process. - */ - cfs_list_t blp_list; - - cfs_waitq_t blp_waitq; - struct completion blp_comp; - cfs_atomic_t blp_num_threads; - cfs_atomic_t blp_busy_threads; - int blp_min_threads; - int blp_max_threads; + /* + * blp_prio_list is used for callbacks that should be handled + * as a priority. It is used for LDLM_FL_DISCARD_DATA requests. + * see bug 13843 + */ + cfs_list_t blp_prio_list; + + /* + * blp_list is used for all other callbacks which are likely + * to take longer to process. + */ + cfs_list_t blp_list; + + wait_queue_head_t blp_waitq; + struct completion blp_comp; + atomic_t blp_num_threads; + atomic_t blp_busy_threads; + int blp_min_threads; + int blp_max_threads; }; struct ldlm_bl_work_item { @@ -147,10 +147,10 @@ static spinlock_t waiting_locks_spinlock; /* BH lock (timer) */ * All access to it should be under waiting_locks_spinlock. */ static cfs_list_t waiting_locks_list; -static cfs_timer_t waiting_locks_timer; +static struct timer_list waiting_locks_timer; static struct expired_lock_thread { - cfs_waitq_t elt_waitq; + wait_queue_head_t elt_waitq; int elt_state; int elt_dump; cfs_list_t elt_expired_locks; @@ -173,20 +173,20 @@ static inline int have_expired_locks(void) */ static int expired_lock_main(void *arg) { - cfs_list_t *expired = &expired_lock_thread.elt_expired_locks; - struct l_wait_info lwi = { 0 }; - int do_dump; + cfs_list_t *expired = &expired_lock_thread.elt_expired_locks; + struct l_wait_info lwi = { 0 }; + int do_dump; - ENTRY; + ENTRY; - expired_lock_thread.elt_state = ELT_READY; - cfs_waitq_signal(&expired_lock_thread.elt_waitq); + expired_lock_thread.elt_state = ELT_READY; + wake_up(&expired_lock_thread.elt_waitq); - while (1) { - l_wait_event(expired_lock_thread.elt_waitq, - have_expired_locks() || - expired_lock_thread.elt_state == ELT_TERMINATE, - &lwi); + while (1) { + l_wait_event(expired_lock_thread.elt_waitq, + have_expired_locks() || + expired_lock_thread.elt_state == ELT_TERMINATE, + &lwi); spin_lock_bh(&waiting_locks_spinlock); if (expired_lock_thread.elt_dump) { @@ -201,17 +201,17 @@ static int expired_lock_main(void *arg) libcfs_run_lbug_upcall(&msgdata); spin_lock_bh(&waiting_locks_spinlock); - expired_lock_thread.elt_dump = 0; - } + expired_lock_thread.elt_dump = 0; + } - do_dump = 0; + do_dump = 0; - while (!cfs_list_empty(expired)) { - struct obd_export *export; - struct ldlm_lock *lock; + while (!cfs_list_empty(expired)) { + struct obd_export *export; + struct ldlm_lock *lock; - lock = cfs_list_entry(expired->next, struct ldlm_lock, - l_pending_chain); + lock = cfs_list_entry(expired->next, struct ldlm_lock, + l_pending_chain); if ((void *)lock < LP_POISON + PAGE_CACHE_SIZE && (void *)lock >= LP_POISON) { 
spin_unlock_bh(&waiting_locks_spinlock); @@ -221,19 +221,19 @@ static int expired_lock_main(void *arg) cfs_list_del_init(&lock->l_pending_chain); if ((void *)lock->l_export < LP_POISON + PAGE_CACHE_SIZE && - (void *)lock->l_export >= LP_POISON) { - CERROR("lock with free export on elt list %p\n", - lock->l_export); - lock->l_export = NULL; - LDLM_ERROR(lock, "free export"); - /* release extra ref grabbed by - * ldlm_add_waiting_lock() or - * ldlm_failed_ast() */ - LDLM_LOCK_RELEASE(lock); - continue; - } + (void *)lock->l_export >= LP_POISON) { + CERROR("lock with free export on elt list %p\n", + lock->l_export); + lock->l_export = NULL; + LDLM_ERROR(lock, "free export"); + /* release extra ref grabbed by + * ldlm_add_waiting_lock() or + * ldlm_failed_ast() */ + LDLM_LOCK_RELEASE(lock); + continue; + } - if (lock->l_flags & LDLM_FL_DESTROYED) { + if (ldlm_is_destroyed(lock)) { /* release the lock refcount where * waiting_locks_callback() founds */ LDLM_LOCK_RELEASE(lock); @@ -254,18 +254,18 @@ static int expired_lock_main(void *arg) } spin_unlock_bh(&waiting_locks_spinlock); - if (do_dump && obd_dump_on_eviction) { - CERROR("dump the log upon eviction\n"); - libcfs_debug_dumplog(); - } + if (do_dump && obd_dump_on_eviction) { + CERROR("dump the log upon eviction\n"); + libcfs_debug_dumplog(); + } - if (expired_lock_thread.elt_state == ELT_TERMINATE) - break; - } + if (expired_lock_thread.elt_state == ELT_TERMINATE) + break; + } - expired_lock_thread.elt_state = ELT_STOPPED; - cfs_waitq_signal(&expired_lock_thread.elt_waitq); - RETURN(0); + expired_lock_thread.elt_state = ELT_STOPPED; + wake_up(&expired_lock_thread.elt_waitq); + RETURN(0); } static int ldlm_add_waiting_lock(struct ldlm_lock *lock); @@ -312,53 +312,6 @@ static void waiting_locks_callback(unsigned long unused) (lock->l_req_mode == LCK_GROUP)) break; - if (ptlrpc_check_suspend()) { - /* there is a case when we talk to one mds, holding - * lock from another mds. this way we easily can get - * here, if second mds is being recovered. so, we - * suspend timeouts. 
bug 6019 */ - - LDLM_ERROR(lock, "recharge timeout: %s@%s nid %s ", - lock->l_export->exp_client_uuid.uuid, - lock->l_export->exp_connection->c_remote_uuid.uuid, - libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid)); - - cfs_list_del_init(&lock->l_pending_chain); - if (lock->l_flags & LDLM_FL_DESTROYED) { - /* relay the lock refcount decrease to - * expired lock thread */ - cfs_list_add(&lock->l_pending_chain, - &expired_lock_thread.elt_expired_locks); - } else { - __ldlm_add_waiting_lock(lock, - ldlm_get_enq_timeout(lock)); - } - continue; - } - - /* if timeout overlaps the activation time of suspended timeouts - * then extend it to give a chance for client to reconnect */ - if (cfs_time_before(cfs_time_sub(lock->l_callback_timeout, - cfs_time_seconds(obd_timeout)/2), - ptlrpc_suspend_wakeup_time())) { - LDLM_ERROR(lock, "extend timeout due to recovery: %s@%s nid %s ", - lock->l_export->exp_client_uuid.uuid, - lock->l_export->exp_connection->c_remote_uuid.uuid, - libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid)); - - cfs_list_del_init(&lock->l_pending_chain); - if (lock->l_flags & LDLM_FL_DESTROYED) { - /* relay the lock refcount decrease to - * expired lock thread */ - cfs_list_add(&lock->l_pending_chain, - &expired_lock_thread.elt_expired_locks); - } else { - __ldlm_add_waiting_lock(lock, - ldlm_get_enq_timeout(lock)); - } - continue; - } - /* Check if we need to prolong timeout */ if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) && ldlm_lock_busy(lock)) { @@ -386,7 +339,7 @@ static void waiting_locks_callback(unsigned long unused) ldlm_lock_to_ns(lock)->ns_timeouts++; LDLM_ERROR(lock, "lock callback timer expired after %lds: " "evicting client at %s ", - cfs_time_current_sec()- lock->l_last_activity, + cfs_time_current_sec() - lock->l_last_activity, libcfs_nid2str( lock->l_export->exp_connection->c_peer.nid)); @@ -403,7 +356,7 @@ static void waiting_locks_callback(unsigned long unused) if (obd_dump_on_timeout && need_dump) expired_lock_thread.elt_dump = __LINE__; - cfs_waitq_signal(&expired_lock_thread.elt_waitq); + wake_up(&expired_lock_thread.elt_waitq); } /* @@ -468,13 +421,13 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock) int timeout = ldlm_get_enq_timeout(lock); /* NB: must be called with hold of lock_res_and_lock() */ - LASSERT(lock->l_flags & LDLM_FL_RES_LOCKED); - lock->l_flags |= LDLM_FL_WAITED; + LASSERT(ldlm_is_res_locked(lock)); + ldlm_set_waited(lock); - LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)); + LASSERT(!ldlm_is_cancel_on_block(lock)); spin_lock_bh(&waiting_locks_spinlock); - if (lock->l_flags & LDLM_FL_DESTROYED) { + if (ldlm_is_destroyed(lock)) { static cfs_time_t next; spin_unlock_bh(&waiting_locks_spinlock); LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)"); @@ -619,8 +572,7 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout) # ifdef HAVE_SERVER_SUPPORT static int ldlm_add_waiting_lock(struct ldlm_lock *lock) { - LASSERT((lock->l_flags & (LDLM_FL_RES_LOCKED|LDLM_FL_CANCEL_ON_BLOCK)) - == LDLM_FL_RES_LOCKED); + LASSERT(ldlm_is_res_locked(lock) && !ldlm_is_cancel_on_block(lock)); RETURN(1); } @@ -650,7 +602,7 @@ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, LDLM_LOCK_GET(lock); cfs_list_add(&lock->l_pending_chain, &expired_lock_thread.elt_expired_locks); - cfs_waitq_signal(&expired_lock_thread.elt_waitq); + wake_up(&expired_lock_thread.elt_waitq); spin_unlock_bh(&waiting_locks_spinlock); #else class_fail_export(lock->l_export); @@ -674,7 +626,7 @@ static int 
ldlm_handle_ast_error(struct ldlm_lock *lock, libcfs_nid2str(peer.nid)); ldlm_lock_cancel(lock); rc = -ERESTART; - } else if (lock->l_flags & LDLM_FL_CANCEL) { + } else if (ldlm_is_cancel(lock)) { LDLM_DEBUG(lock, "%s AST timeout from nid %s, but " "cancel was received (AST reply lost?)", ast_type, libcfs_nid2str(peer.nid)); @@ -765,9 +717,9 @@ static int ldlm_cb_interpret(const struct lu_env *env, LDLM_LOCK_RELEASE(lock); if (rc == -ERESTART) - cfs_atomic_inc(&arg->restart); + atomic_inc(&arg->restart); - RETURN(0); + RETURN(0); } static inline int ldlm_ast_fini(struct ptlrpc_request *req, @@ -782,7 +734,7 @@ static inline int ldlm_ast_fini(struct ptlrpc_request *req, rc = ptl_send_rpc(req, 1); ptlrpc_req_finished(req); if (rc == 0) - cfs_atomic_inc(&arg->restart); + atomic_inc(&arg->restart); } else { LDLM_LOCK_GET(lock); ptlrpc_set_add_req(arg->set, req); @@ -878,20 +830,20 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, RETURN(0); } - if (lock->l_flags & LDLM_FL_DESTROYED) { + if (ldlm_is_destroyed(lock)) { /* What's the point? */ unlock_res_and_lock(lock); ptlrpc_req_finished(req); RETURN(0); } - if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) + if (ldlm_is_cancel_on_block(lock)) instant_cancel = 1; body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); body->lock_handle[0] = lock->l_remote_handle; body->lock_desc = *desc; - body->lock_flags |= ldlm_flags_to_wire(lock->l_flags & LDLM_AST_FLAGS); + body->lock_flags |= ldlm_flags_to_wire(lock->l_flags & LDLM_FL_AST_MASK); LDLM_DEBUG(lock, "server preparing blocking AST"); @@ -910,6 +862,8 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, if (AT_OFF) req->rq_timeout = ldlm_get_rq_timeout(); + lock->l_last_activity = cfs_time_current_sec(); + if (lock->l_export && lock->l_export->exp_nid_stats && lock->l_export->exp_nid_stats->nid_ldlm_stats) lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats, @@ -946,6 +900,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data) total_enqueue_wait = cfs_time_sub(cfs_time_current_sec(), lock->l_last_activity); + if (OBD_FAIL_PRECHECK(OBD_FAIL_OST_LDLM_REPLY_NET)) { + LDLM_DEBUG(lock, "dropping CP AST"); + RETURN(0); + } + req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse, &RQF_LDLM_CP_CALLBACK); if (req == NULL) @@ -1002,6 +961,8 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data) LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)", total_enqueue_wait); + lock->l_last_activity = cfs_time_current_sec(); + /* Server-side enqueue wait time estimate, used in __ldlm_add_waiting_lock to set future enqueue timers */ if (total_enqueue_wait < ldlm_get_enq_timeout(lock)) @@ -1026,11 +987,11 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data) /* We only send real blocking ASTs after the lock is granted */ lock_res_and_lock(lock); - if (lock->l_flags & LDLM_FL_AST_SENT) { + if (ldlm_is_ast_sent(lock)) { body->lock_flags |= ldlm_flags_to_wire(LDLM_FL_AST_SENT); /* Copy AST flags like LDLM_FL_DISCARD_DATA. */ body->lock_flags |= ldlm_flags_to_wire(lock->l_flags & - LDLM_AST_FLAGS); + LDLM_FL_AST_MASK); /* We might get here prior to ldlm_handle_enqueue setting * LDLM_FL_CANCEL_ON_BLOCK flag. 
Then we will put this lock @@ -1038,7 +999,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data) * ldlm_handle_enqueue will call ldlm_lock_cancel() still, * that would not only cancel the lock, but will also remove * it from waiting list */ - if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) { + if (ldlm_is_cancel_on_block(lock)) { unlock_res_and_lock(lock); ldlm_lock_cancel(lock); instant_cancel = 1; @@ -1118,6 +1079,8 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data) if (AT_OFF) req->rq_timeout = ldlm_get_rq_timeout(); + lock->l_last_activity = cfs_time_current_sec(); + req->rq_interpret_reply = ldlm_cb_interpret; if (lock->l_export && lock->l_export->exp_nid_stats && @@ -1274,7 +1237,8 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, } #endif - if (unlikely(flags & LDLM_FL_REPLAY)) { + if (unlikely((flags & LDLM_FL_REPLAY) || + (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))) { /* Find an existing lock in the per-export lock hash */ /* In the function below, .hs_keycmp resolves to * ldlm_export_lock_keycmp() */ @@ -1284,17 +1248,18 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, if (lock != NULL) { DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie " LPX64, lock->l_handle.h_cookie); + flags |= LDLM_FL_RESENT; GOTO(existing_lock, rc = 0); - } + } } - /* The lock's callback data might be set in the policy function */ - lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name, - dlm_req->lock_desc.l_resource.lr_type, - dlm_req->lock_desc.l_req_mode, + /* The lock's callback data might be set in the policy function */ + lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name, + dlm_req->lock_desc.l_resource.lr_type, + dlm_req->lock_desc.l_req_mode, cbs, NULL, 0, LVB_T_NONE); - if (!lock) - GOTO(out, rc = -ENOMEM); + if (IS_ERR(lock)) + GOTO(out, rc = PTR_ERR(lock)); lock->l_last_activity = cfs_time_current_sec(); lock->l_remote_handle = dlm_req->lock_handle[0]; @@ -1364,9 +1329,9 @@ existing_lock: /* Now take into account flags to be inherited from original lock request both in reply to client and in our own lock flags. */ - dlm_rep->lock_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS; + dlm_rep->lock_flags |= dlm_req->lock_flags & LDLM_FL_INHERIT_MASK; lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags & - LDLM_INHERIT_FLAGS); + LDLM_FL_INHERIT_MASK); /* Don't move a pending lock onto the export if it has already been * disconnected due to eviction (bug 5683) or server umount (bug 24324). @@ -1375,7 +1340,7 @@ existing_lock: OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) { LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export); rc = -ENOTCONN; - } else if (lock->l_flags & LDLM_FL_AST_SENT) { + } else if (ldlm_is_ast_sent(lock)) { dlm_rep->lock_flags |= ldlm_flags_to_wire(LDLM_FL_AST_SENT); if (lock->l_granted_mode == lock->l_req_mode) { /* @@ -1398,7 +1363,7 @@ existing_lock: if ((dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN || dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) && req->rq_export->exp_libclient) { - if (unlikely(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) || + if (unlikely(!ldlm_is_cancel_on_block(lock) || !(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK))){ CERROR("Granting sync lock to libclient. " "req fl %d, rep fl %d, lock fl "LPX64"\n", @@ -1431,11 +1396,11 @@ existing_lock: /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this * ldlm_reprocess_all. If this moves, revisit that code. 
-phil */ - if (lock) { - LDLM_DEBUG(lock, "server-side enqueue handler, sending reply" - "(err=%d, rc=%d)", err, rc); + if (!IS_ERR(lock)) { + LDLM_DEBUG(lock, "server-side enqueue handler, sending reply" + "(err=%d, rc=%d)", err, rc); - if (rc == 0) { + if (rc == 0) { if (req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER) && ldlm_lvbo_size(lock) > 0) { @@ -1448,13 +1413,19 @@ existing_lock: req, lock); buflen = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER); - buflen = ldlm_lvbo_fill(lock, buf, buflen); - if (buflen >= 0) - req_capsule_shrink(&req->rq_pill, - &RMF_DLM_LVB, - buflen, RCL_SERVER); - else + if (buflen > 0) { + buflen = ldlm_lvbo_fill(lock, buf, + buflen); + if (buflen >= 0) + req_capsule_shrink( + &req->rq_pill, + &RMF_DLM_LVB, + buflen, RCL_SERVER); + else + rc = buflen; + } else { rc = buflen; + } } } else { lock_res_and_lock(lock); @@ -1696,10 +1667,10 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns, LDLM_DEBUG(lock, "client blocking AST callback handler"); lock_res_and_lock(lock); - lock->l_flags |= LDLM_FL_CBPENDING; + ldlm_set_cbpending(lock); - if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) - lock->l_flags |= LDLM_FL_CANCEL; + if (ldlm_is_cancel_on_block(lock)) + ldlm_set_cancel(lock); do_ast = (!lock->l_readers && !lock->l_writers); unlock_res_and_lock(lock); @@ -1740,10 +1711,10 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) { int to = cfs_time_seconds(1); while (to > 0) { - cfs_schedule_timeout_and_set_state( - CFS_TASK_INTERRUPTIBLE, to); + schedule_timeout_and_set_state( + TASK_INTERRUPTIBLE, to); if (lock->l_granted_mode == lock->l_req_mode || - lock->l_flags & LDLM_FL_DESTROYED) + ldlm_is_destroyed(lock)) break; } } @@ -1768,7 +1739,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, * variable length */ void *lvb_data; - OBD_ALLOC(lvb_data, lvb_len); + OBD_ALLOC_LARGE(lvb_data, lvb_len); if (lvb_data == NULL) { LDLM_ERROR(lock, "No memory: %d.\n", lvb_len); GOTO(out, rc = -ENOMEM); @@ -1776,6 +1747,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, lock_res_and_lock(lock); LASSERT(lock->l_lvb_data == NULL); + lock->l_lvb_type = LVB_T_LAYOUT; lock->l_lvb_data = lvb_data; lock->l_lvb_len = lvb_len; unlock_res_and_lock(lock); @@ -1783,7 +1755,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, } lock_res_and_lock(lock); - if ((lock->l_flags & LDLM_FL_DESTROYED) || + if (ldlm_is_destroyed(lock) || lock->l_granted_mode == lock->l_req_mode) { /* bug 11300: the lock has already been granted */ unlock_res_and_lock(lock); @@ -1826,7 +1798,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, /* BL_AST locks are not needed in LRU. * Let ldlm_cancel_lru() be fast. 
*/ ldlm_lock_remove_from_lru(lock); - lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST; + lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST; LDLM_DEBUG(lock, "completion AST includes blocking AST"); } @@ -1857,9 +1829,9 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, out: if (rc < 0) { lock_res_and_lock(lock); - lock->l_flags |= LDLM_FL_FAILED; + ldlm_set_failed(lock); unlock_res_and_lock(lock); - cfs_waitq_signal(&lock->l_waitq); + wake_up(&lock->l_waitq); } LDLM_LOCK_RELEASE(lock); } @@ -1932,7 +1904,7 @@ static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, spin_lock(&blp->blp_lock); if (blwi->blwi_lock && - blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) { + ldlm_is_discard_data(blwi->blwi_lock)) { /* add LDLM_FL_DISCARD_DATA requests to the priority list */ cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list); } else { @@ -1941,7 +1913,7 @@ static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, } spin_unlock(&blp->blp_lock); - cfs_waitq_signal(&blp->blp_waitq); + wake_up(&blp->blp_waitq); /* can not check blwi->blwi_flags as blwi could be already freed in LCF_ASYNC mode */ @@ -2163,16 +2135,6 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) rc = ldlm_handle_setinfo(req); ldlm_callback_reply(req, rc); RETURN(0); - case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */ - CERROR("shouldn't be handling OBD_LOG_CANCEL on DLM thread\n"); - req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL); - if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET)) - RETURN(0); - rc = llog_origin_handle_cancel(req); - if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP)) - RETURN(0); - ldlm_callback_reply(req, rc); - RETURN(0); case LLOG_ORIGIN_HANDLE_CREATE: req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE); if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) @@ -2248,22 +2210,21 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) RETURN(0); } - if ((lock->l_flags & LDLM_FL_FAIL_LOC) && + if (ldlm_is_fail_loc(lock) && lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE); /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */ lock_res_and_lock(lock); lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags & - LDLM_AST_FLAGS); + LDLM_FL_AST_MASK); if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) { /* If somebody cancels lock and cache is already dropped, * or lock is failed before cp_ast received on client, * we can tell the server we have no lock. Otherwise, we * should send cancel after dropping the cache. */ - if (((lock->l_flags & LDLM_FL_CANCELING) && - (lock->l_flags & LDLM_FL_BL_DONE)) || - (lock->l_flags & LDLM_FL_FAILED)) { + if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) || + ldlm_is_failed(lock)) { LDLM_DEBUG(lock, "callback on lock " LPX64" - lock disappeared\n", dlm_req->lock_handle[0].cookie); @@ -2277,7 +2238,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) /* BL_AST locks are not needed in LRU. * Let ldlm_cancel_lru() be fast. 
*/ ldlm_lock_remove_from_lru(lock); - lock->l_flags |= LDLM_FL_BL_AST; + ldlm_set_bl_ast(lock); } unlock_res_and_lock(lock); @@ -2294,7 +2255,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) case LDLM_BL_CALLBACK: CDEBUG(D_INODE, "blocking ast\n"); req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK); - if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) { + if (!ldlm_is_cancel_on_block(lock)) { rc = ldlm_callback_reply(req, 0); if (req->rq_no_reply || rc) ldlm_callback_errmsg(req, "Normal process", rc, @@ -2374,15 +2335,6 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req) if (rc) break; RETURN(0); - case OBD_LOG_CANCEL: - req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL); - if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET)) - RETURN(0); - rc = llog_origin_handle_cancel(req); - if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP)) - RETURN(0); - ldlm_callback_reply(req, rc); - RETURN(0); default: CERROR("invalid opcode %d\n", lustre_msg_get_opc(req->rq_reqmsg)); @@ -2444,7 +2396,7 @@ static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req) if (lock == NULL) continue; - rc = !!(lock->l_flags & LDLM_FL_AST_SENT); + rc = ldlm_is_ast_sent(lock) ? 1 : 0; if (rc) LDLM_DEBUG(lock, "hpreq cancel lock"); LDLM_LOCK_PUT(lock); @@ -2499,7 +2451,7 @@ int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd, return 0; } - if (lock->l_flags & LDLM_FL_AST_SENT) { + if (ldlm_is_ast_sent(lock)) { unlock_res_and_lock(lock); return 0; } @@ -2507,7 +2459,7 @@ int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd, LASSERT(lock->l_blocking_ast); LASSERT(!lock->l_blocking_lock); - lock->l_flags |= LDLM_FL_AST_SENT; + ldlm_set_ast_sent(lock); if (lock->l_export && lock->l_export->exp_lock_hash) { /* NB: it's safe to call cfs_hash_del() even lock isn't * in exp_lock_hash. 
*/ @@ -2548,22 +2500,22 @@ static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp) static unsigned int num_bl = 0; spin_lock(&blp->blp_lock); - /* process a request from the blp_list at least every blp_num_threads */ - if (!cfs_list_empty(&blp->blp_list) && - (cfs_list_empty(&blp->blp_prio_list) || num_bl == 0)) - blwi = cfs_list_entry(blp->blp_list.next, - struct ldlm_bl_work_item, blwi_entry); - else - if (!cfs_list_empty(&blp->blp_prio_list)) - blwi = cfs_list_entry(blp->blp_prio_list.next, - struct ldlm_bl_work_item, - blwi_entry); - - if (blwi) { - if (++num_bl >= cfs_atomic_read(&blp->blp_num_threads)) - num_bl = 0; - cfs_list_del(&blwi->blwi_entry); - } + /* process a request from the blp_list at least every blp_num_threads */ + if (!cfs_list_empty(&blp->blp_list) && + (cfs_list_empty(&blp->blp_prio_list) || num_bl == 0)) + blwi = cfs_list_entry(blp->blp_list.next, + struct ldlm_bl_work_item, blwi_entry); + else + if (!cfs_list_empty(&blp->blp_prio_list)) + blwi = cfs_list_entry(blp->blp_prio_list.next, + struct ldlm_bl_work_item, + blwi_entry); + + if (blwi) { + if (++num_bl >= atomic_read(&blp->blp_num_threads)) + num_bl = 0; + cfs_list_del(&blwi->blwi_entry); + } spin_unlock(&blp->blp_lock); return blwi; @@ -2582,16 +2534,16 @@ static int ldlm_bl_thread_main(void *arg); static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp) { struct ldlm_bl_thread_data bltd = { .bltd_blp = blp }; - cfs_task_t *task; + struct task_struct *task; init_completion(&bltd.bltd_comp); - bltd.bltd_num = cfs_atomic_read(&blp->blp_num_threads); + bltd.bltd_num = atomic_read(&blp->blp_num_threads); snprintf(bltd.bltd_name, sizeof(bltd.bltd_name) - 1, "ldlm_bl_%02d", bltd.bltd_num); task = kthread_run(ldlm_bl_thread_main, &bltd, bltd.bltd_name); if (IS_ERR(task)) { CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n", - cfs_atomic_read(&blp->blp_num_threads), PTR_ERR(task)); + atomic_read(&blp->blp_num_threads), PTR_ERR(task)); return PTR_ERR(task); } wait_for_completion(&bltd.bltd_comp); @@ -2609,51 +2561,50 @@ static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp) static int ldlm_bl_thread_main(void *arg) { struct ldlm_bl_pool *blp; + struct ldlm_bl_thread_data *bltd = arg; ENTRY; - { - struct ldlm_bl_thread_data *bltd = arg; - - blp = bltd->bltd_blp; + blp = bltd->bltd_blp; - cfs_atomic_inc(&blp->blp_num_threads); - cfs_atomic_inc(&blp->blp_busy_threads); + atomic_inc(&blp->blp_num_threads); + atomic_inc(&blp->blp_busy_threads); - complete(&bltd->bltd_comp); - /* cannot use bltd after this, it is only on caller's stack */ - } + complete(&bltd->bltd_comp); + /* cannot use bltd after this, it is only on caller's stack */ - while (1) { - struct l_wait_info lwi = { 0 }; - struct ldlm_bl_work_item *blwi = NULL; - int busy; + while (1) { + struct l_wait_info lwi = { 0 }; + struct ldlm_bl_work_item *blwi = NULL; + int busy; - blwi = ldlm_bl_get_work(blp); + blwi = ldlm_bl_get_work(blp); - if (blwi == NULL) { - cfs_atomic_dec(&blp->blp_busy_threads); - l_wait_event_exclusive(blp->blp_waitq, - (blwi = ldlm_bl_get_work(blp)) != NULL, - &lwi); - busy = cfs_atomic_inc_return(&blp->blp_busy_threads); - } else { - busy = cfs_atomic_read(&blp->blp_busy_threads); - } + if (blwi == NULL) { + atomic_dec(&blp->blp_busy_threads); + l_wait_event_exclusive(blp->blp_waitq, + (blwi = ldlm_bl_get_work(blp)) != NULL, + &lwi); + busy = atomic_inc_return(&blp->blp_busy_threads); + } else { + busy = atomic_read(&blp->blp_busy_threads); + } - if (blwi->blwi_ns == NULL) - /* added by ldlm_cleanup() */ - 
break; + if (blwi->blwi_ns == NULL) + /* added by ldlm_cleanup() */ + break; - /* Not fatal if racy and have a few too many threads */ - if (unlikely(busy < blp->blp_max_threads && - busy >= cfs_atomic_read(&blp->blp_num_threads) && - !blwi->blwi_mem_pressure)) - /* discard the return value, we tried */ - ldlm_bl_thread_start(blp); + /* Not fatal if racy and have a few too many threads */ + if (unlikely(busy < blp->blp_max_threads && + busy >= atomic_read(&blp->blp_num_threads) && + !blwi->blwi_mem_pressure)) + /* discard the return value, we tried */ + ldlm_bl_thread_start(blp); if (blwi->blwi_mem_pressure) memory_pressure_set(); + OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4); + if (blwi->blwi_count) { int count; /* The special case when we cancel locks in LRU @@ -2676,12 +2627,12 @@ static int ldlm_bl_thread_main(void *arg) OBD_FREE(blwi, sizeof(*blwi)); else complete(&blwi->blwi_comp); - } + } - cfs_atomic_dec(&blp->blp_busy_threads); - cfs_atomic_dec(&blp->blp_num_threads); + atomic_dec(&blp->blp_busy_threads); + atomic_dec(&blp->blp_num_threads); complete(&blp->blp_comp); - RETURN(0); + RETURN(0); } #endif @@ -2794,6 +2745,7 @@ static cfs_hash_ops_t ldlm_export_lock_ops = { int ldlm_init_export(struct obd_export *exp) { + int rc; ENTRY; exp->exp_lock_hash = @@ -2809,7 +2761,14 @@ int ldlm_init_export(struct obd_export *exp) if (!exp->exp_lock_hash) RETURN(-ENOMEM); + rc = ldlm_init_flock_export(exp); + if (rc) + GOTO(err, rc); + RETURN(0); +err: + ldlm_destroy_export(exp); + RETURN(rc); } EXPORT_SYMBOL(ldlm_init_export); @@ -2827,11 +2786,15 @@ EXPORT_SYMBOL(ldlm_destroy_export); static int ldlm_setup(void) { static struct ptlrpc_service_conf conf; - struct ldlm_bl_pool *blp = NULL; - int rc = 0; + struct ldlm_bl_pool *blp = NULL; #ifdef __KERNEL__ - int i; +# ifdef HAVE_SERVER_SUPPORT + struct task_struct *task; +# endif + int i; #endif + int rc = 0; + ENTRY; if (ldlm_state != NULL) @@ -2935,11 +2898,11 @@ static int ldlm_setup(void) ldlm_state->ldlm_bl_pool = blp; spin_lock_init(&blp->blp_lock); - CFS_INIT_LIST_HEAD(&blp->blp_list); - CFS_INIT_LIST_HEAD(&blp->blp_prio_list); - cfs_waitq_init(&blp->blp_waitq); - cfs_atomic_set(&blp->blp_num_threads, 0); - cfs_atomic_set(&blp->blp_busy_threads, 0); + CFS_INIT_LIST_HEAD(&blp->blp_list); + CFS_INIT_LIST_HEAD(&blp->blp_prio_list); + init_waitqueue_head(&blp->blp_waitq); + atomic_set(&blp->blp_num_threads, 0); + atomic_set(&blp->blp_busy_threads, 0); #ifdef __KERNEL__ if (ldlm_num_threads == 0) { @@ -2960,19 +2923,20 @@ static int ldlm_setup(void) # ifdef HAVE_SERVER_SUPPORT CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks); expired_lock_thread.elt_state = ELT_STOPPED; - cfs_waitq_init(&expired_lock_thread.elt_waitq); + init_waitqueue_head(&expired_lock_thread.elt_waitq); CFS_INIT_LIST_HEAD(&waiting_locks_list); spin_lock_init(&waiting_locks_spinlock); cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0); - rc = PTR_ERR(kthread_run(expired_lock_main, NULL, "ldlm_elt")); - if (IS_ERR_VALUE(rc)) { + task = kthread_run(expired_lock_main, NULL, "ldlm_elt"); + if (IS_ERR(task)) { + rc = PTR_ERR(task); CERROR("Cannot start ldlm expired-lock thread: %d\n", rc); GOTO(out, rc); } - cfs_wait_event(expired_lock_thread.elt_waitq, + wait_event(expired_lock_thread.elt_waitq, expired_lock_thread.elt_state == ELT_READY); # endif /* HAVE_SERVER_SUPPORT */ @@ -3007,14 +2971,14 @@ static int ldlm_cleanup(void) if (ldlm_state->ldlm_bl_pool != NULL) { struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool; - while 
(cfs_atomic_read(&blp->blp_num_threads) > 0) { + while (atomic_read(&blp->blp_num_threads) > 0) { struct ldlm_bl_work_item blwi = { .blwi_ns = NULL }; init_completion(&blp->blp_comp); spin_lock(&blp->blp_lock); cfs_list_add_tail(&blwi.blwi_entry, &blp->blp_list); - cfs_waitq_signal(&blp->blp_waitq); + wake_up(&blp->blp_waitq); spin_unlock(&blp->blp_lock); wait_for_completion(&blp->blp_comp); @@ -3037,8 +3001,8 @@ static int ldlm_cleanup(void) # ifdef HAVE_SERVER_SUPPORT if (expired_lock_thread.elt_state != ELT_STOPPED) { expired_lock_thread.elt_state = ELT_TERMINATE; - cfs_waitq_signal(&expired_lock_thread.elt_waitq); - cfs_wait_event(expired_lock_thread.elt_waitq, + wake_up(&expired_lock_thread.elt_waitq); + wait_event(expired_lock_thread.elt_waitq, expired_lock_thread.elt_state == ELT_STOPPED); } # endif
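Illustrative sketch (not from the patch): the bulk of the hunks above mechanically replace libcfs wrappers with native Linux kernel primitives — cfs_waitq_t becomes wait_queue_head_t, cfs_waitq_signal() becomes wake_up(), cfs_wait_event() becomes wait_event(), cfs_atomic_t becomes atomic_t, cfs_timer_t becomes struct timer_list, cfs_task_t becomes struct task_struct — and ldlm_setup() is also fixed to keep the task pointer from kthread_run() and test it with IS_ERR() instead of feeding a possibly-valid pointer through PTR_ERR(). The fragment below is a minimal, self-contained illustration of that target idiom; all demo_* identifiers are hypothetical stand-ins, only the kernel APIs are real.

/* Minimal sketch of the native-kernel idiom the patch converts to.
 * demo_* names are hypothetical; wait_queue_head_t, atomic_t,
 * kthread_run(), wake_up(), wait_event(), IS_ERR()/PTR_ERR() are
 * the real kernel interfaces used by the patch. */
#include <linux/kthread.h>
#include <linux/wait.h>
#include <linux/atomic.h>
#include <linux/err.h>

static wait_queue_head_t demo_waitq;
static atomic_t demo_ready = ATOMIC_INIT(0);

static int demo_thread_main(void *arg)
{
	/* signal readiness the same way expired_lock_main() now does:
	 * publish state, then wake_up() the waiter */
	atomic_set(&demo_ready, 1);
	wake_up(&demo_waitq);
	return 0;
}

static int demo_start(void)
{
	struct task_struct *task;

	init_waitqueue_head(&demo_waitq);

	/* keep the task pointer and test it with IS_ERR(); do not pass a
	 * valid pointer through PTR_ERR() as the old ldlm_setup() did */
	task = kthread_run(demo_thread_main, NULL, "demo_thread");
	if (IS_ERR(task))
		return PTR_ERR(task);

	/* wait_event() replaces cfs_wait_event(); the condition is
	 * re-checked after every wakeup */
	wait_event(demo_waitq, atomic_read(&demo_ready) == 1);
	return 0;
}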
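Illustrative sketch (not from the patch): many hunks also swap open-coded flag tests such as "lock->l_flags & LDLM_FL_DESTROYED" for wrapper macros (ldlm_is_destroyed(), ldlm_set_cbpending(), ldlm_is_cancel_on_block(), ...). The real wrappers are generated in lustre_dlm_flags.h, so the bit values and names below (DEMO_FL_*, demo_is_*) are stand-ins, not the Lustre definitions; the sketch only shows the shape of such test/set helpers over a 64-bit flags word.

/* Illustrative only: predicate/set helpers over a 64-bit flags word,
 * mirroring the style of ldlm_is_*() / ldlm_set_*(). DEMO_* names
 * and bit positions are hypothetical. */
#include <linux/types.h>

struct demo_lock {
	__u64 l_flags;
};

#define DEMO_FL_DESTROYED	(1ULL << 0)	/* stand-in bit value */
#define DEMO_FL_CBPENDING	(1ULL << 1)	/* stand-in bit value */

static inline bool demo_is_destroyed(const struct demo_lock *lock)
{
	return (lock->l_flags & DEMO_FL_DESTROYED) != 0;
}

static inline void demo_set_cbpending(struct demo_lock *lock)
{
	lock->l_flags |= DEMO_FL_CBPENDING;
}

static inline void demo_clear_cbpending(struct demo_lock *lock)
{
	lock->l_flags &= ~DEMO_FL_CBPENDING;
}

Wrappers of this shape let call sites read as predicates (e.g. "if (ldlm_is_destroyed(lock))") and keep each flag's bit value defined in a single place.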