From: vitaly Date: Wed, 19 Nov 2008 21:14:46 +0000 (+0000) Subject: Branch b1_8_gate X-Git-Tag: v1_9_120~123 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=f31b79be5a0380df3ed05c16fa43feca2bf5905c Branch b1_8_gate b=16129 i=adilger i=green - a high priority request list is added into service; - once a lock is canceled, all the IO requests, including coming ones, under this lock, are moved into this list; - PING is also added into this list; - once a lock cancel timeout occurs, the timeout is prolonged if there is an IO rpc under this lock; - another request list is added into the export, used to speed up the rpc-lock matching. --- diff --git a/libcfs/include/libcfs/libcfs_debug.h b/libcfs/include/libcfs/libcfs_debug.h index 8487c39..d2663c8 100644 --- a/libcfs/include/libcfs/libcfs_debug.h +++ b/libcfs/include/libcfs/libcfs_debug.h @@ -294,6 +294,11 @@ do { \ #endif /* !CDEBUG_ENTRY_EXIT */ +#define RETURN_EXIT \ +do { \ + EXIT_NESTING; \ + return; \ +} while (0) struct libcfs_debug_msg_data { cfs_debug_limit_state_t *msg_cdls; diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index aa0e358..3eac378 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -432,6 +432,7 @@ struct ldlm_namespace { unsigned int ns_max_unused; unsigned int ns_max_age; + unsigned int ns_timeouts; /** * Seconds. */ diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index b2a5c86..94033ef 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -148,6 +148,7 @@ struct obd_export { exp_flvr_changed:1, exp_flvr_adapt:1, exp_libclient:1; /* liblustre client? 
*/ + struct list_head exp_queued_rpc; /* RPC to be handled */ /* also protected by exp_lock */ enum lustre_sec_part exp_sp_peer; struct sptlrpc_flavor exp_flvr; /* current */ diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 9d7f95b..0561fea 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -169,7 +169,7 @@ #define MGS_MAXREPSIZE (9 * 1024) /* Absolute limits */ -#define OSS_THREADS_MIN 2 +#define OSS_THREADS_MIN 3 /* difficult replies, HPQ, others */ #define OSS_THREADS_MAX 512 #define OST_NBUFS (64 * num_online_cpus()) #define OST_BUFSIZE (8 * 1024) @@ -317,6 +317,20 @@ struct ptlrpc_request_pool { struct lu_context; struct lu_env; +struct ldlm_lock; + +struct ptlrpc_hpreq_ops { + /** + * Check if the lock handle of the given lock is the same as + * taken from the request. + */ + int (*hpreq_lock_match)(struct ptlrpc_request *, struct ldlm_lock *); + /** + * Check if the request is a high priority one. + */ + int (*hpreq_check)(struct ptlrpc_request *); +}; + /** * Represents remote procedure call. 
*/ @@ -325,6 +339,8 @@ struct ptlrpc_request { struct list_head rq_list; struct list_head rq_timed_list; /* server-side early replies */ struct list_head rq_history_list; /* server-side history */ + struct list_head rq_exp_list; /* server-side per-export list */ + struct ptlrpc_hpreq_ops *rq_ops; /* server-side hp handlers */ __u64 rq_history_seq; /* history sequence # */ int rq_status; spinlock_t rq_lock; @@ -348,7 +364,8 @@ struct ptlrpc_request { rq_early:1, rq_must_unlink:1, /* server-side flags */ rq_packed_final:1, /* packed final reply */ - rq_sent_final:1; /* stop sending early replies */ + rq_sent_final:1, /* stop sending early replies */ + rq_hp:1; /* high priority RPC */ enum rq_phase rq_phase; /* one of RQ_PHASE_* */ enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */ @@ -548,9 +565,9 @@ ptlrpc_rqphase2str(struct ptlrpc_request *req) FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"), \ FLAG(req->rq_no_resend, "N"), \ FLAG(req->rq_waiting, "W"), \ - FLAG(req->rq_wait_ctx, "C") + FLAG(req->rq_wait_ctx, "C"), FLAG(req->rq_hp, "H") -#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s" +#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s%s" void _debug_req(struct ptlrpc_request *req, __u32 mask, struct libcfs_debug_msg_data *data, const char *fmt, ...) 
@@ -659,6 +676,9 @@ struct ptlrpc_request_buffer_desc { typedef int (*svc_handler_t)(struct ptlrpc_request *req); typedef void (*svcreq_printfn_t)(void *, struct ptlrpc_request *); +typedef int (*svc_hpreq_handler_t)(struct ptlrpc_request *); + +#define PTLRPC_SVC_HP_RATIO 10 struct ptlrpc_service { struct list_head srv_list; /* chain thru all services */ @@ -673,6 +693,7 @@ struct ptlrpc_service { int srv_threads_running; /* # running threads */ int srv_n_difficult_replies; /* # 'difficult' replies */ int srv_n_active_reqs; /* # reqs being served */ + int srv_n_hpreq; /* # HPreqs being served */ cfs_duration_t srv_rqbd_timeout; /* timeout before re-posting reqs, in tick */ int srv_watchdog_factor; /* soft watchdog timeout mutiplier */ unsigned srv_cpu_affinity:1; /* bind threads to CPUs */ @@ -690,8 +711,11 @@ struct ptlrpc_service { cfs_timer_t srv_at_timer; /* early reply timer */ int srv_n_queued_reqs; /* # reqs in either of the queues below */ + int srv_hpreq_count; /* # hp requests handled */ + int srv_hpreq_ratio; /* # hp per lp reqs to handle */ struct list_head srv_req_in_queue; /* incoming reqs */ struct list_head srv_request_queue; /* reqs waiting for service */ + struct list_head srv_request_hpq; /* high priority queue */ struct list_head srv_request_history; /* request history */ __u64 srv_request_seq; /* next request sequence # */ @@ -716,6 +740,7 @@ struct ptlrpc_service { struct list_head srv_threads; /* service thread list */ svc_handler_t srv_handler; + svc_hpreq_handler_t srv_hpreq_handler; /* hp request handler */ char *srv_name; /* only statically allocated strings here; we don't clean them */ char *srv_thread_name; /* only statically allocated strings here; we don't clean them */ @@ -973,7 +998,8 @@ struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, cfs_proc_dir_entry_t *proc_entry, svcreq_printfn_t, int min_threads, int max_threads, - char *threadname, __u32 ctx_tags); + char *threadname, __u32 ctx_tags, + 
svc_hpreq_handler_t); void ptlrpc_stop_all_threads(struct ptlrpc_service *svc); int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc); @@ -982,6 +1008,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service); int liblustre_check_services (void *arg); void ptlrpc_daemonize(char *name); int ptlrpc_service_health_check(struct ptlrpc_service *); +void ptlrpc_hpreq_reorder(struct ptlrpc_request *req); struct ptlrpc_svc_data { diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 50379c5..a02b764 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -292,6 +292,8 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_FAIL_PTLRPC_DUMP_LOG 0x50e #define OBD_FAIL_PTLRPC_LONG_UNLINK 0x50f +#define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT 0x510 +#define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT 0x511 #define OBD_FAIL_OBD_PING_NET 0x600 #define OBD_FAIL_OBD_LOG_CANCEL_NET 0x601 diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 50d4504..1ec702a 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -369,6 +369,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource) CFS_INIT_LIST_HEAD(&lock->l_cache_locks_list); lu_ref_init(&lock->l_reference); lu_ref_add(&lock->l_reference, "hash", lock); + lock->l_callback_timeout = 0; RETURN(lock); } @@ -1874,7 +1875,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, data->msg_fn, data->msg_line, fmt, args, " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: \?\? rrc=\?\? type: \?\?\? flags: %x remote: " - LPX64" expref: %d pid: %u\n", lock, + LPX64" expref: %d pid: %u timeout: %lu\n", lock, lock->l_handle.h_cookie, atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers, ldlm_lockname[lock->l_granted_mode], @@ -1882,7 +1883,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, lock->l_flags, lock->l_remote_handle.cookie, lock->l_export ? 
atomic_read(&lock->l_export->exp_refcount) : -99, - lock->l_pid); + lock->l_pid, lock->l_callback_timeout); va_end(args); return; } @@ -1894,7 +1895,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64 "] (req "LPU64"->"LPU64") flags: %x remote: "LPX64 - " expref: %d pid: %u\n", + " expref: %d pid: %u timeout %lu\n", lock->l_resource->lr_namespace->ns_name, lock, lock->l_handle.h_cookie, atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers, @@ -1910,7 +1911,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, lock->l_flags, lock->l_remote_handle.cookie, lock->l_export ? atomic_read(&lock->l_export->exp_refcount) : -99, - lock->l_pid); + lock->l_pid, lock->l_callback_timeout); break; case LDLM_FLOCK: @@ -1919,7 +1920,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d " "["LPU64"->"LPU64"] flags: %x remote: "LPX64 - " expref: %d pid: %u\n", + " expref: %d pid: %u timeout: %lu\n", lock->l_resource->lr_namespace->ns_name, lock, lock->l_handle.h_cookie, atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers, @@ -1935,7 +1936,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, lock->l_flags, lock->l_remote_handle.cookie, lock->l_export ? 
atomic_read(&lock->l_export->exp_refcount) : -99, - lock->l_pid); + lock->l_pid, lock->l_callback_timeout); break; case LDLM_IBITS: @@ -1944,7 +1945,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s " "flags: %x remote: "LPX64" expref: %d " - "pid %u\n", + "pid: %u timeout: %lu\n", lock->l_resource->lr_namespace->ns_name, lock, lock->l_handle.h_cookie, atomic_read (&lock->l_refc), @@ -1959,7 +1960,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, lock->l_flags, lock->l_remote_handle.cookie, lock->l_export ? atomic_read(&lock->l_export->exp_refcount) : -99, - lock->l_pid); + lock->l_pid, lock->l_callback_timeout); break; default: @@ -1967,7 +1968,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, data->msg_fn, data->msg_line, fmt, args, " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: "LPU64"/"LPU64" rrc: %d type: %s flags: %x " - "remote: "LPX64" expref: %d pid: %u\n", + "remote: "LPX64" expref: %d pid: %u timeout %lu\n", lock->l_resource->lr_namespace->ns_name, lock, lock->l_handle.h_cookie, atomic_read (&lock->l_refc), @@ -1981,7 +1982,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level, lock->l_flags, lock->l_remote_handle.cookie, lock->l_export ? atomic_read(&lock->l_export->exp_refcount) : -99, - lock->l_pid); + lock->l_pid, lock->l_callback_timeout); break; } va_end(args); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index b809cba..9d6bb38 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -238,6 +238,31 @@ static int expired_lock_main(void *arg) static int ldlm_add_waiting_lock(struct ldlm_lock *lock); +/** + * Check if there is a request in the export request list + * which prevents the lock canceling. 
+ */ +static int ldlm_lock_busy(struct ldlm_lock *lock) +{ + struct ptlrpc_request *req; + int match = 0; + ENTRY; + + if (lock->l_export == NULL) + return 0; + + spin_lock(&lock->l_export->exp_lock); + list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) { + if (req->rq_ops->hpreq_lock_match) { + match = req->rq_ops->hpreq_lock_match(req, lock); + if (match) + break; + } + } + spin_unlock(&lock->l_export->exp_lock); + RETURN(match); +} + /* This is called from within a timer interrupt and cannot schedule */ static void waiting_locks_callback(unsigned long unused) { @@ -248,7 +273,6 @@ repeat: while (!list_empty(&waiting_locks_list)) { lock = list_entry(waiting_locks_list.next, struct ldlm_lock, l_pending_chain); - if (cfs_time_after(lock->l_callback_timeout, cfs_time_current()) || (lock->l_req_mode == LCK_GROUP)) break; @@ -286,6 +310,29 @@ repeat: goto repeat; } + /* Check if we need to prolong timeout */ + if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) && + ldlm_lock_busy(lock)) { + int cont = 1; + + if (lock->l_pending_chain.next == &waiting_locks_list) + cont = 0; + + LDLM_LOCK_GET(lock); + spin_unlock_bh(&waiting_locks_spinlock); + LDLM_DEBUG(lock, "prolong the busy lock"); + ldlm_refresh_waiting_lock(lock); + spin_lock_bh(&waiting_locks_spinlock); + + if (!cont) { + LDLM_LOCK_PUT(lock); + break; + } + + LDLM_LOCK_PUT(lock); + continue; + } + lock->l_resource->lr_namespace->ns_timeouts++; LDLM_ERROR(lock, "lock callback timer expired after %lds: " "evicting client at %s ", cfs_time_current_sec()- lock->l_enqueued_time.tv_sec, @@ -335,15 +382,21 @@ repeat: */ static int __ldlm_add_waiting_lock(struct ldlm_lock *lock) { - int timeout; + cfs_time_t timeout; cfs_time_t timeout_rounded; if (!list_empty(&lock->l_pending_chain)) return 0; - timeout = ldlm_get_enq_timeout(lock); + if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) || + OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT)) + timeout = 2; + else + timeout = ldlm_get_enq_timeout(lock); - 
lock->l_callback_timeout = cfs_time_shift(timeout); + timeout = cfs_time_shift(timeout); + if (likely(cfs_time_after(timeout, lock->l_callback_timeout))) + lock->l_callback_timeout = timeout; timeout_rounded = round_timeout(lock->l_callback_timeout); @@ -475,7 +528,6 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock) LDLM_DEBUG(lock, "refreshed"); return 1; } - #else /* !__KERNEL__ */ static int ldlm_add_waiting_lock(struct ldlm_lock *lock) @@ -621,6 +673,30 @@ static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req, RETURN(rc); } +/** + * Check if there are requests in the export request list which prevent + * the lock canceling and make these requests high priority ones. + */ +static void ldlm_lock_reorder_req(struct ldlm_lock *lock) +{ + struct ptlrpc_request *req; + ENTRY; + + if (lock->l_export == NULL) { + LDLM_DEBUG(lock, "client lock: no-op"); + RETURN_EXIT; + } + + spin_lock(&lock->l_export->exp_lock); + list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) { + if (!req->rq_hp && req->rq_ops->hpreq_lock_match && + req->rq_ops->hpreq_lock_match(req, lock)) + ptlrpc_hpreq_reorder(req); + } + spin_unlock(&lock->l_export->exp_lock); + EXIT; +} + /* * ->l_blocking_ast() method for server-side locks. This is invoked when newly * enqueued server lock conflicts with given one. 
@@ -650,6 +726,8 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, ldlm_lock_dump(D_ERROR, lock, 0); } + ldlm_lock_reorder_req(lock); + req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse, &RQF_LDLM_BL_CALLBACK, LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK); @@ -2193,7 +2271,7 @@ static int ldlm_setup(void) ldlm_svc_proc_dir, NULL, ldlm_min_threads, ldlm_max_threads, "ldlm_cb", - LCT_MD_THREAD|LCT_DT_THREAD); + LCT_MD_THREAD|LCT_DT_THREAD, NULL); if (!ldlm_state->ldlm_cb_service) { CERROR("failed to start service\n"); @@ -2208,7 +2286,8 @@ static int ldlm_setup(void) ldlm_svc_proc_dir, NULL, ldlm_min_threads, ldlm_max_threads, "ldlm_cn", - LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD); + LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD, + NULL); if (!ldlm_state->ldlm_cancel_service) { CERROR("failed to start service\n"); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 92068f6..1317844 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -120,6 +120,7 @@ int ldlm_get_enq_timeout(struct ldlm_lock *lock) timeout = timeout + (timeout >> 1); /* 150% */ return max(timeout, ldlm_enqueue_min); } +EXPORT_SYMBOL(ldlm_get_enq_timeout); static int is_granted_or_cancelled(struct ldlm_lock *lock) { diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 1b4e46e..320a870 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -287,6 +287,12 @@ void ldlm_proc_namespace(struct ldlm_namespace *ns) lock_vars[0].write_fptr = lprocfs_wr_uint; lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); + snprintf(lock_name, MAX_STRING_SIZE, "%s/lock_timeouts", + ns->ns_name); + lock_vars[0].data = &ns->ns_timeouts; + lock_vars[0].read_fptr = lprocfs_rd_uint; + lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); + snprintf(lock_name, MAX_STRING_SIZE, "%s/max_nolock_bytes", ns->ns_name); lock_vars[0].data = &ns->ns_max_nolock_size; @@ -370,6 +376,7 @@ struct ldlm_namespace *ldlm_namespace_new(struct 
obd_device *obd, char *name, ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE; ns->ns_max_age = LDLM_DEFAULT_MAX_ALIVE; ns->ns_ctime_age_limit = LDLM_CTIME_AGE_LIMIT; + ns->ns_timeouts = 0; spin_lock_init(&ns->ns_unused_lock); ns->ns_orig_connect_flags = 0; ns->ns_connect_flags = 0; diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c index e724962..9fbe6c2 100644 --- a/lustre/mgs/mgs_handler.c +++ b/lustre/mgs/mgs_handler.c @@ -248,7 +248,7 @@ static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg) mgs_handle, LUSTRE_MGS_NAME, obd->obd_proc_entry, target_print_req, MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX, - "ll_mgs", LCT_MD_THREAD); + "ll_mgs", LCT_MD_THREAD, NULL); if (!mgs->mgs_service) { CERROR("failed to start service\n"); diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index edd6dfb..5adafa3 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -717,6 +717,7 @@ static void class_export_destroy(struct obd_export *exp) LASSERT(list_empty(&exp->exp_outstanding_replies)); LASSERT(list_empty(&exp->exp_req_replay_queue)); + LASSERT(list_empty(&exp->exp_queued_rpc)); obd_destroy_export(exp); class_decref(obd, "export", exp); @@ -745,6 +746,7 @@ struct obd_export *class_new_export(struct obd_device *obd, CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies); CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue); CFS_INIT_LIST_HEAD(&export->exp_handle.h_link); + CFS_INIT_LIST_HEAD(&export->exp_queued_rpc); class_handle_hash(&export->exp_handle, export_handle_addref); export->exp_last_request_time = cfs_time_current_sec(); spin_lock_init(&export->exp_lock); diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 8b2b7fb..bdea74e 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -300,10 +300,9 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, /* check that we do support OBD_CONNECT_TRUNCLOCK. 
*/ CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK); - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) - RETURN(-EFAULT); + /* ost_body is verified and swabbed in ost_hpreq_handler() */ + body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); oinfo.oi_oa = &body->oa; oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size; @@ -493,7 +492,9 @@ static void ost_brw_lock_put(int mode, struct ost_prolong_data { struct obd_export *opd_exp; ldlm_policy_data_t opd_policy; + struct obdo *opd_oa; ldlm_mode_t opd_mode; + int opd_lock_match; }; static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data) @@ -523,6 +524,14 @@ static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data) return LDLM_ITER_CONTINUE; } + /* Fill the obdo with the matched lock handle. + * XXX: it is possible in some cases the IO RPC is covered by several + * locks, even for the write case, so it may need to be a lock list. */ + if (opd->opd_oa && !(opd->opd_oa->o_valid & OBD_MD_FLHANDLE)) { + opd->opd_oa->o_handle.cookie = lock->l_handle.h_cookie; + opd->opd_oa->o_valid |= OBD_MD_FLHANDLE; + } + if (!(lock->l_flags & LDLM_FL_AST_SENT)) { /* ignore locks not being cancelled */ return LDLM_ITER_CONTINUE; @@ -531,17 +540,18 @@ static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data) /* OK. 
this is a possible lock the user holds doing I/O * let's refresh eviction timer for it */ ldlm_refresh_waiting_lock(lock); + opd->opd_lock_match = 1; return LDLM_ITER_CONTINUE; } -static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj, - struct niobuf_remote *nb, struct obdo *oa, - ldlm_mode_t mode) +static int ost_rw_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj, + struct niobuf_remote *nb, struct obdo *oa, + ldlm_mode_t mode) { struct ldlm_res_id res_id; int nrbufs = obj->ioo_bufcnt; - struct ost_prolong_data opd; + struct ost_prolong_data opd = { 0 }; ENTRY; osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id); @@ -562,16 +572,28 @@ static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj, lock = ldlm_handle2lock(&oa->o_handle); if (lock != NULL) { ost_prolong_locks_iter(lock, &opd); + if (opd.opd_lock_match) { + LDLM_LOCK_PUT(lock); + RETURN(1); + } + + /* Check if the lock covers the whole IO region, + * otherwise iterate through the resource. 
*/ + if (lock->l_policy_data.l_extent.end >= + opd.opd_policy.l_extent.end && + lock->l_policy_data.l_extent.start <= + opd.opd_policy.l_extent.start) { + LDLM_LOCK_PUT(lock); + RETURN(0); + } LDLM_LOCK_PUT(lock); - EXIT; - return; } } + opd.opd_oa = oa; ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id, ost_prolong_locks_iter, &opd); - - EXIT; + RETURN(opd.opd_lock_match); } static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) @@ -586,7 +608,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) struct l_wait_info lwi; struct lustre_handle lockh = { 0 }; __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; - int objcount, niocount, npages, nob = 0, rc, i; + int niocount, npages, nob = 0, rc, i; int no_reply = 0; ENTRY; @@ -608,49 +630,18 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) if (exp->exp_failed) GOTO(out, rc = -ENOTCONN); - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR("Missing/short ost_body\n"); - GOTO(out, rc = -EFAULT); - } - - objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / - sizeof(*ioo); - if (objcount == 0) { - CERROR("Missing/short ioobj\n"); - GOTO(out, rc = -EFAULT); - } - if (objcount > 1) { - CERROR("too many ioobjs (%d)\n", objcount); - GOTO(out, rc = -EFAULT); - } + /* ost_body, ioobj & noibuf_remote are verified and swabbed in + * ost_rw_hpreq_check(). 
*/ + body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); - ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*ioo), - lustre_swab_obd_ioobj); - if (ioo == NULL) { - CERROR("Missing/short ioobj\n"); - GOTO(out, rc = -EFAULT); - } + ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioo)); + LASSERT(ioo != NULL); niocount = ioo->ioo_bufcnt; - if (niocount > PTLRPC_MAX_BRW_PAGES) { - DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)", - niocount); - GOTO(out, rc = -EFAULT); - } - - remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2, - niocount * sizeof(*remote_nb), - lustre_swab_niobuf_remote); - if (remote_nb == NULL) { - CERROR("Missing/short niobuf\n"); - GOTO(out, rc = -EFAULT); - } - if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */ - for (i = 1; i < niocount; i++) - lustre_swab_niobuf_remote (&remote_nb[i]); - } + remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, + niocount * sizeof(*remote_nb)); + LASSERT(remote_nb != NULL); if (body->oa.o_valid & OBD_MD_FLOSSCAPA) capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 3); @@ -695,7 +686,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) if (desc == NULL) /* XXX: check all cleanup stuff */ GOTO(out, rc = -ENOMEM); - ost_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR); + ost_rw_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR); nob = 0; for (i = 0; i < npages; i++) { @@ -860,7 +851,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) __u32 *rcs; __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) }; int objcount, niocount, npages; - int rc, swab, i, j; + int rc, i, j; obd_count client_cksum = 0, server_cksum = 0; cksum_type_t cksum_type = OBD_CKSUM_CRC32; int no_reply = 0; @@ -888,56 +879,22 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) if (exp->exp_failed) GOTO(out, rc = -ENOTCONN); 
- swab = lustre_msg_swabbed(req->rq_reqmsg); - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR("Missing/short ost_body\n"); - GOTO(out, rc = -EFAULT); - } + /* ost_body, ioobj & noibuf_remote are verified and swabbed in + * ost_rw_hpreq_check(). */ + body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); - lustre_set_req_swabbed(req, REQ_REC_OFF + 1); objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / - sizeof(*ioo); - if (objcount == 0) { - CERROR("Missing/short ioobj\n"); - GOTO(out, rc = -EFAULT); - } - if (objcount > 1) { - CERROR("too many ioobjs (%d)\n", objcount); - GOTO(out, rc = -EFAULT); - } - + sizeof(*ioo); ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, objcount * sizeof(*ioo)); - LASSERT (ioo != NULL); - for (niocount = i = 0; i < objcount; i++) { - if (swab) - lustre_swab_obd_ioobj(&ioo[i]); - if (ioo[i].ioo_bufcnt == 0) { - CERROR("ioo[%d] has zero bufcnt\n", i); - GOTO(out, rc = -EFAULT); - } + LASSERT(ioo != NULL); + for (niocount = i = 0; i < objcount; i++) niocount += ioo[i].ioo_bufcnt; - } - if (niocount > PTLRPC_MAX_BRW_PAGES) { - DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)", - niocount); - GOTO(out, rc = -EFAULT); - } - - remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2, - niocount * sizeof(*remote_nb), - lustre_swab_niobuf_remote); - if (remote_nb == NULL) { - CERROR("Missing/short niobuf\n"); - GOTO(out, rc = -EFAULT); - } - if (swab) { /* swab the remaining niobufs */ - for (i = 1; i < niocount; i++) - lustre_swab_niobuf_remote (&remote_nb[i]); - } + remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, + niocount * sizeof(*remote_nb)); + LASSERT(remote_nb != NULL); if (body->oa.o_valid & OBD_MD_FLOSSCAPA) capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 3); @@ -975,7 +932,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) GOTO(out_lock, rc = -ETIMEDOUT); } - 
ost_prolong_locks(exp, ioo, remote_nb,&body->oa, LCK_PW); + ost_rw_prolong_locks(exp, ioo, remote_nb,&body->oa, LCK_PW); /* obd_preprw clobbers oa->valid, so save what we need */ if (body->oa.o_valid & OBD_MD_FLCKSUM) { @@ -1590,6 +1547,250 @@ int ost_msg_check_version(struct lustre_msg *msg) return rc; } +static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req, + struct ldlm_lock *lock) +{ + struct niobuf_remote *nb; + struct obd_ioobj *ioo; + struct ost_body *body; + int objcount, niocount; + int mode, opc, i; + __u64 start, end; + ENTRY; + + opc = lustre_msg_get_opc(req->rq_reqmsg); + LASSERT(opc == OST_READ || opc == OST_WRITE); + + /* As the request may be covered by several locks, do not look at + * o_handle, look at the RPC IO region. */ + body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), + lustre_swab_obdo); + objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / + sizeof(*ioo); + ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, + objcount * sizeof(*ioo)); + LASSERT(ioo != NULL); + for (niocount = i = 0; i < objcount; i++) + niocount += ioo[i].ioo_bufcnt; + + nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, + niocount * sizeof(*nb)); + LASSERT(nb != NULL); + + mode = LCK_PW; + if (opc == OST_READ) + mode |= LCK_PR; + + start = nb[0].offset & CFS_PAGE_MASK; + end = (nb[ioo->ioo_bufcnt - 1].offset + + nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK; + + if (!(lock->l_granted_mode & mode)) + RETURN(0); + + if (lock->l_policy_data.l_extent.end < start || + lock->l_policy_data.l_extent.start > end) + RETURN(0); + + RETURN(1); +} + +/** + * Swab buffers needed to call ost_rw_prolong_locks() and call it. + * Return the value from ost_rw_prolong_locks() which is non-zero if + * there is a cancelled lock which is waiting for this IO request. 
+ */ +static int ost_rw_hpreq_check(struct ptlrpc_request *req) +{ + struct niobuf_remote *nb; + struct obd_ioobj *ioo; + struct ost_body *body; + int objcount, niocount; + int mode, opc, i; + ENTRY; + + opc = lustre_msg_get_opc(req->rq_reqmsg); + LASSERT(opc == OST_READ || opc == OST_WRITE); + + body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); + + objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / + sizeof(*ioo); + ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, + objcount * sizeof(*ioo)); + LASSERT(ioo != NULL); + + for (niocount = i = 0; i < objcount; i++) + niocount += ioo[i].ioo_bufcnt; + nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, + niocount * sizeof(*nb)); + LASSERT(nb != NULL); + LASSERT(niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK)); + + mode = LCK_PW; + if (opc == OST_READ) + mode |= LCK_PR; + RETURN(ost_rw_prolong_locks(req->rq_export, ioo, nb, &body->oa, mode)); +} + +static int ost_punch_prolong_locks(struct obd_export *exp, struct obdo *oa) +{ + struct ldlm_res_id res_id = { .name = { oa->o_id } }; + struct ost_prolong_data opd = { 0 }; + __u64 start, end; + ENTRY; + + start = oa->o_size; + end = start + oa->o_blocks; + + opd.opd_mode = LCK_PW; + opd.opd_exp = exp; + opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK; + if (oa->o_blocks == OBD_OBJECT_EOF || end < start) + opd.opd_policy.l_extent.end = OBD_OBJECT_EOF; + else + opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK; + + CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n", + res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start, + opd.opd_policy.l_extent.end); + + opd.opd_oa = oa; + ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id, + ost_prolong_locks_iter, &opd); + RETURN(opd.opd_lock_match); +} + +static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req, + struct ldlm_lock *lock) +{ + struct ost_body *body; + ENTRY; + + body = lustre_swab_reqbuf(req, REQ_REC_OFF, 
sizeof(*body), + lustre_swab_obdo); + LASSERT(body != NULL); + + if (body->oa.o_valid & OBD_MD_FLHANDLE && + body->oa.o_handle.cookie == lock->l_handle.h_cookie) + RETURN(1); + RETURN(0); +} + +static int ost_punch_hpreq_check(struct ptlrpc_request *req) +{ + struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, + REQ_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); + LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) || + !(body->oa.o_flags & OBD_FL_TRUNCLOCK)); + + RETURN(ost_punch_prolong_locks(req->rq_export, &body->oa)); +} + +struct ptlrpc_hpreq_ops ost_hpreq_rw = { + .hpreq_lock_match = ost_rw_hpreq_lock_match, + .hpreq_check = ost_rw_hpreq_check, +}; + +struct ptlrpc_hpreq_ops ost_hpreq_punch = { + .hpreq_lock_match = ost_punch_hpreq_lock_match, + .hpreq_check = ost_punch_hpreq_check, +}; + +/** Assign high priority operations to the request if needed. */ +static int ost_hpreq_handler(struct ptlrpc_request *req) +{ + ENTRY; + if (req->rq_export) { + int opc = lustre_msg_get_opc(req->rq_reqmsg); + struct ost_body *body; + + if (opc == OST_READ || opc == OST_WRITE) { + struct niobuf_remote *nb; + struct obd_ioobj *ioo; + int objcount, niocount; + int swab, i; + + body = lustre_swab_reqbuf(req, REQ_REC_OFF, + sizeof(*body), + lustre_swab_obdo); + if (!body) { + CERROR("Missing/short ost_body\n"); + RETURN(-EFAULT); + } + objcount = lustre_msg_buflen(req->rq_reqmsg, + REQ_REC_OFF + 1) / + sizeof(*ioo); + if (objcount == 0) { + CERROR("Missing/short ioobj\n"); + RETURN(-EFAULT); + } + if (objcount > 1) { + CERROR("too many ioobjs (%d)\n", objcount); + RETURN(-EFAULT); + } + + swab = !lustre_req_swabbed(req, REQ_REC_OFF + 1) && + lustre_msg_swabbed(req->rq_reqmsg); + ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, + objcount * sizeof(*ioo), + lustre_swab_obd_ioobj); + if (!ioo) { + CERROR("Missing/short ioobj\n"); + RETURN(-EFAULT); + } + for (niocount = i = 0; i < objcount; i++) { + if (i > 0 && swab) + lustre_swab_obd_ioobj(&ioo[i]); + if (ioo[i].ioo_bufcnt == 0) { 
+ CERROR("ioo[%d] has zero bufcnt\n", i); + RETURN(-EFAULT); + } + niocount += ioo[i].ioo_bufcnt; + } + if (niocount > PTLRPC_MAX_BRW_PAGES) { + DEBUG_REQ(D_ERROR, req, "bulk has too many " + "pages (%d)", niocount); + RETURN(-EFAULT); + } + + swab = !lustre_req_swabbed(req, REQ_REC_OFF + 2) && + lustre_msg_swabbed(req->rq_reqmsg); + nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2, + niocount * sizeof(*nb), + lustre_swab_niobuf_remote); + if (!nb) { + CERROR("Missing/short niobuf\n"); + RETURN(-EFAULT); + } + + if (swab) { + /* swab remaining niobufs */ + for (i = 1; i < niocount; i++) + lustre_swab_niobuf_remote(&nb[i]); + } + + if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK)) + req->rq_ops = &ost_hpreq_rw; + } else if (opc == OST_PUNCH) { + body = lustre_swab_reqbuf(req, REQ_REC_OFF, + sizeof(*body), + lustre_swab_obdo); + if (!body) { + CERROR("Missing/short ost_body\n"); + RETURN(-EFAULT); + } + + if (!(body->oa.o_valid & OBD_MD_FLFLAGS) || + !(body->oa.o_flags & OBD_FL_TRUNCLOCK)) + req->rq_ops = &ost_hpreq_punch; + } + } + RETURN(0); +} + /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */ int ost_handle(struct ptlrpc_request *req) { @@ -1955,7 +2156,7 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg) /* Insure a 4x range for dynamic threads */ if (oss_min_threads > OSS_THREADS_MAX / 4) oss_min_threads = OSS_THREADS_MAX / 4; - oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4); + oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1); } ost->ost_service = @@ -1965,7 +2166,7 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg) ost_handle, LUSTRE_OSS_NAME, obd->obd_proc_entry, target_print_req, oss_min_threads, oss_max_threads, - "ll_ost", LCT_DT_THREAD); + "ll_ost", LCT_DT_THREAD, NULL); if (ost->ost_service == NULL) { CERROR("failed to start service\n"); GOTO(out_lprocfs, rc = -ENOMEM); @@ -1994,7 +2195,7 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* 
lcfg) ost_handle, "ost_create", obd->obd_proc_entry, target_print_req, oss_min_create_threads, oss_max_create_threads, - "ll_ost_creat", LCT_DT_THREAD); + "ll_ost_creat", LCT_DT_THREAD, NULL); if (ost->ost_create_service == NULL) { CERROR("failed to start OST create service\n"); GOTO(out_service, rc = -ENOMEM); @@ -2011,7 +2212,7 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg) ost_handle, "ost_io", obd->obd_proc_entry, target_print_req, oss_min_threads, oss_max_threads, - "ll_ost_io", LCT_DT_THREAD); + "ll_ost_io", LCT_DT_THREAD, ost_hpreq_handler); if (ost->ost_io_service == NULL) { CERROR("failed to start OST I/O service\n"); GOTO(out_create, rc = -ENOMEM); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 000904c..045365c 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -538,6 +538,7 @@ static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request, CFS_INIT_LIST_HEAD(&request->rq_ctx_chain); CFS_INIT_LIST_HEAD(&request->rq_set_chain); CFS_INIT_LIST_HEAD(&request->rq_history_list); + CFS_INIT_LIST_HEAD(&request->rq_exp_list); cfs_waitq_init(&request->rq_reply_waitq); request->rq_xid = ptlrpc_next_xid(); atomic_set(&request->rq_refcount, 1); @@ -1705,6 +1706,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked) LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */ LASSERTF(list_empty(&request->rq_list), "req %p\n", request); LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request); + LASSERTF(list_empty(&request->rq_exp_list), "req %p\n", request); LASSERTF(!request->rq_replay, "req %p\n", request); LASSERT(request->rq_cli_ctx); diff --git a/lustre/ptlrpc/lproc_ptlrpc.c b/lustre/ptlrpc/lproc_ptlrpc.c index 29b805c..d5af015 100644 --- a/lustre/ptlrpc/lproc_ptlrpc.c +++ b/lustre/ptlrpc/lproc_ptlrpc.c @@ -514,6 +514,32 @@ static int ptlrpc_lprocfs_rd_timeouts(char *page, char **start, off_t off, return rc; } +static int 
ptlrpc_lprocfs_rd_hp_ratio(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct ptlrpc_service *svc = data; + int rc = snprintf(page, count, "%d", svc->srv_hpreq_ratio); + return rc; +} + +static int ptlrpc_lprocfs_wr_hp_ratio(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct ptlrpc_service *svc = data; + int rc, val; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc < 0) + return rc; + if (val < 0) + return -ERANGE; + + spin_lock(&svc->srv_lock); + svc->srv_hpreq_ratio = val; + spin_unlock(&svc->srv_lock); + return count; +} + void ptlrpc_lprocfs_register_service(struct proc_dir_entry *entry, struct ptlrpc_service *svc) { @@ -529,6 +555,10 @@ void ptlrpc_lprocfs_register_service(struct proc_dir_entry *entry, {.name = "timeouts", .read_fptr = ptlrpc_lprocfs_rd_timeouts, .data = svc}, + {.name = "high_priority_ratio", + .read_fptr = ptlrpc_lprocfs_rd_hp_ratio, + .write_fptr = ptlrpc_lprocfs_wr_hp_ratio, + .data = svc}, {NULL} }; static struct file_operations req_history_fops = { diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 2b9466a..8739098 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -754,6 +754,9 @@ void *lustre_swab_buf(struct lustre_msg *msg, int index, int min_size, void *lustre_swab_reqbuf(struct ptlrpc_request *req, int index, int min_size, void *swabber) { + if (lustre_req_swabbed(req, index)) + return lustre_msg_buf(req->rq_reqmsg, index, min_size); + lustre_set_req_swabbed(req, index); return lustre_swab_buf(req->rq_reqmsg, index, min_size, swabber); } @@ -761,6 +764,9 @@ void *lustre_swab_reqbuf(struct ptlrpc_request *req, int index, int min_size, void *lustre_swab_repbuf(struct ptlrpc_request *req, int index, int min_size, void *swabber) { + if (lustre_rep_swabbed(req, index)) + return lustre_msg_buf(req->rq_repmsg, index, min_size); + lustre_set_rep_swabbed(req, index); return lustre_swab_buf(req->rq_repmsg, 
index, min_size, swabber); } diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index d8e7c86..03adc07 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -211,6 +211,7 @@ EXPORT_SYMBOL(ptlrpc_start_thread); EXPORT_SYMBOL(ptlrpc_unregister_service); EXPORT_SYMBOL(ptlrpc_daemonize); EXPORT_SYMBOL(ptlrpc_service_health_check); +EXPORT_SYMBOL(ptlrpc_hpreq_reorder); /* pack_generic.c */ EXPORT_SYMBOL(lustre_msg_swabbed); diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index d8d737d..a0bf907 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -300,7 +300,7 @@ struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c, c->psc_watchdog_factor, h, name, proc_entry, prntfn, c->psc_min_threads, c->psc_max_threads, - threadname, c->psc_ctx_tags); + threadname, c->psc_ctx_tags, NULL); } EXPORT_SYMBOL(ptlrpc_init_svc_conf); @@ -320,7 +320,8 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size, cfs_proc_dir_entry_t *proc_entry, svcreq_printfn_t svcreq_printfn, int min_threads, int max_threads, - char *threadname, __u32 ctx_tags) + char *threadname, __u32 ctx_tags, + svc_hpreq_handler_t hp_handler) { int rc; struct ptlrpc_service *service; @@ -355,11 +356,16 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size, service->srv_threads_max = max_threads; service->srv_thread_name = threadname; service->srv_ctx_tags = ctx_tags; + service->srv_hpreq_handler = hp_handler; + service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO; + service->srv_hpreq_count = 0; + service->srv_n_hpreq = 0; rc = LNetSetLazyPortal(service->srv_req_portal); LASSERT (rc == 0); CFS_INIT_LIST_HEAD(&service->srv_request_queue); + CFS_INIT_LIST_HEAD(&service->srv_request_hpq); CFS_INIT_LIST_HEAD(&service->srv_idle_rqbds); CFS_INIT_LIST_HEAD(&service->srv_active_rqbds); CFS_INIT_LIST_HEAD(&service->srv_history_rqbds); @@ -520,6 +526,11 @@ static void 
ptlrpc_server_finish_request(struct ptlrpc_request *req) { struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service; + if (req->rq_export) { + class_export_put(req->rq_export); + req->rq_export = NULL; + } + if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */ DEBUG_REQ(D_INFO, req, "free req"); @@ -537,7 +548,7 @@ static void ptlrpc_server_finish_request(struct ptlrpc_request *req) static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay) { struct obd_export *oldest_exp; - time_t oldest_time; + time_t oldest_time, new_time; ENTRY; @@ -548,9 +559,13 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay) of the list, we can be really lazy here - we don't have to evict at the exact right moment. Eventually, all silent exports will make it to the top of the list. */ - exp->exp_last_request_time = max(exp->exp_last_request_time, - cfs_time_current_sec() + extra_delay); + /* Do not pay attention on 1sec or smaller renewals. */ + new_time = cfs_time_current_sec() + extra_delay; + if (exp->exp_last_request_time + 1 /*second */ >= new_time) + RETURN_EXIT; + + exp->exp_last_request_time = new_time; CDEBUG(D_HA, "updating export %s at "CFS_TIME_T" exp %p\n", exp->exp_client_uuid.uuid, exp->exp_last_request_time, exp); @@ -563,8 +578,7 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay) if (list_empty(&exp->exp_obd_chain_timed)) { /* this one is not timed */ spin_unlock(&exp->exp_obd->obd_dev_lock); - EXIT; - return; + RETURN_EXIT; } list_move_tail(&exp->exp_obd_chain_timed, @@ -924,6 +938,167 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc) RETURN(0); } +/** + * Put the request to the export list if the request may become + * a high priority one. 
+ */ +static int ptlrpc_hpreq_init(struct ptlrpc_service *svc, + struct ptlrpc_request *req) +{ + int rc; + ENTRY; + + if (svc->srv_hpreq_handler) { + rc = svc->srv_hpreq_handler(req); + if (rc) + RETURN(rc); + } + if (req->rq_export && req->rq_ops) { + spin_lock(&req->rq_export->exp_lock); + list_add(&req->rq_exp_list, &req->rq_export->exp_queued_rpc); + spin_unlock(&req->rq_export->exp_lock); + } + + RETURN(0); +} + +/** Remove the request from the export list. */ +static void ptlrpc_hpreq_fini(struct ptlrpc_request *req) +{ + ENTRY; + if (req->rq_export && req->rq_ops) { + spin_lock(&req->rq_export->exp_lock); + list_del_init(&req->rq_exp_list); + spin_unlock(&req->rq_export->exp_lock); + } + EXIT; +} + +/** + * Make the request a high priority one. + * + * All the high priority requests are queued in a separate FIFO + * ptlrpc_service::srv_request_hpq list which is parallel to + * ptlrpc_service::srv_request_queue list but has a higher priority + * for handling. + * + * \see ptlrpc_server_handle_request(). + */ +static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc, + struct ptlrpc_request *req) +{ + ENTRY; + LASSERT(svc != NULL); + spin_lock(&req->rq_lock); + if (req->rq_hp == 0) { + int opc = lustre_msg_get_opc(req->rq_reqmsg); + + /* Add to the high priority queue. */ + list_move_tail(&req->rq_list, &svc->srv_request_hpq); + req->rq_hp = 1; + if (opc != OBD_PING) + DEBUG_REQ(D_NET, req, "high priority req"); + } + spin_unlock(&req->rq_lock); + EXIT; +} + +void ptlrpc_hpreq_reorder(struct ptlrpc_request *req) +{ + struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service; + ENTRY; + + spin_lock(&svc->srv_lock); + /* It may happen that the request is already taken for the processing + * but still in the export list, do not re-add it into the HP list. */ + if (req->rq_phase == RQ_PHASE_NEW) + ptlrpc_hpreq_reorder_nolock(svc, req); + spin_unlock(&svc->srv_lock); + EXIT; +} + +/** Check if the request if a high priority one. 
*/ +static int ptlrpc_server_hpreq_check(struct ptlrpc_request *req) +{ + int opc, rc = 0; + ENTRY; + + /* Check by request opc. */ + opc = lustre_msg_get_opc(req->rq_reqmsg); + if (opc == OBD_PING) + RETURN(1); + + /* Perform request specific check. */ + if (req->rq_ops && req->rq_ops->hpreq_check) + rc = req->rq_ops->hpreq_check(req); + RETURN(rc); +} + +/** Add a request into the service queue, as high priority if it qualifies. */ +static int ptlrpc_server_request_add(struct ptlrpc_service *svc, + struct ptlrpc_request *req) +{ + int rc; + ENTRY; + + rc = ptlrpc_server_hpreq_check(req); + if (rc < 0) + RETURN(rc); + + spin_lock(&svc->srv_lock); + /* Before inserting the request into the queue, check if it is not + * inserted yet, or even already handled -- it may happen due to + * a racing ldlm_server_blocking_ast(). */ + if (req->rq_phase == RQ_PHASE_NEW && list_empty(&req->rq_list)) { + if (rc) + ptlrpc_hpreq_reorder_nolock(svc, req); + else + list_add_tail(&req->rq_list, &svc->srv_request_queue); + } + spin_unlock(&svc->srv_lock); + + RETURN(0); +} + +/* Only allow normal priority requests on a service that has a high-priority + * queue if forced (i.e. cleanup), if there are other high priority requests + * already being processed (i.e. those threads can service more high-priority + * requests), or if there are enough idle threads that a later thread can do + * a high priority request. 
*/ +static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force) +{ + return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 || + svc->srv_n_active_reqs < svc->srv_threads_running - 2; +} + +static struct ptlrpc_request * +ptlrpc_server_request_get(struct ptlrpc_service *svc, int force) +{ + struct ptlrpc_request *req = NULL; + ENTRY; + + if (ptlrpc_server_allow_normal(svc, force) && + !list_empty(&svc->srv_request_queue) && + (list_empty(&svc->srv_request_hpq) || + svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) { + req = list_entry(svc->srv_request_queue.next, + struct ptlrpc_request, rq_list); + svc->srv_hpreq_count = 0; + } else if (!list_empty(&svc->srv_request_hpq)) { + req = list_entry(svc->srv_request_hpq.next, + struct ptlrpc_request, rq_list); + svc->srv_hpreq_count++; + } + RETURN(req); +} + +static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force) +{ + return ((ptlrpc_server_allow_normal(svc, force) && + !list_empty(&svc->srv_request_queue)) || + !list_empty(&svc->srv_request_hpq)); +} + /* Handle freshly incoming reqs, add to timed early reply list, pass on to regular request queue */ static int @@ -1003,10 +1178,9 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc) "illegal security flavor,"); } - class_export_put(req->rq_export); - req->rq_export = NULL; if (rc) goto err_req; + ptlrpc_update_export_timer(req->rq_export, 0); } /* req_in handling should/must be fast */ @@ -1027,12 +1201,15 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc) } ptlrpc_at_add_timed(req); + rc = ptlrpc_hpreq_init(svc, req); + if (rc) + GOTO(err_req, rc); /* Move it over to the request processing queue */ - spin_lock(&svc->srv_lock); - list_add_tail(&req->rq_list, &svc->srv_request_queue); + rc = ptlrpc_server_request_add(svc, req); + if (rc) + GOTO(err_req, rc); cfs_waitq_signal(&svc->srv_waitq); - spin_unlock(&svc->srv_lock); RETURN(1); err_req: @@ -1054,13 +1231,14 @@ ptlrpc_server_handle_request(struct 
ptlrpc_service *svc, struct timeval work_start; struct timeval work_end; long timediff; - int rc; + int opc, rc; + int fail_opc = 0; ENTRY; LASSERT(svc); spin_lock(&svc->srv_lock); - if (unlikely(list_empty (&svc->srv_request_queue) || + if (unlikely(!ptlrpc_server_request_pending(svc, 0) || ( #ifndef __KERNEL__ /* !@%$# liblustre only has 1 thread */ @@ -1073,16 +1251,47 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc, * That means we always need at least 2 service threads. */ spin_unlock(&svc->srv_lock); RETURN(0); + } + + request = ptlrpc_server_request_get(svc, 0); + if (request == NULL) { + spin_unlock(&svc->srv_lock); + RETURN(0); + } + + opc = lustre_msg_get_opc(request->rq_reqmsg); + if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT)) + fail_opc = OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT; + else if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT)) + fail_opc = OBD_FAIL_PTLRPC_HPREQ_TIMEOUT; + + if (unlikely(fail_opc)) { + if (request->rq_export && request->rq_ops) { + spin_unlock(&svc->srv_lock); + OBD_FAIL_TIMEOUT(fail_opc, 4); + spin_lock(&svc->srv_lock); + request = ptlrpc_server_request_get(svc, 0); + if (request == NULL) { + spin_unlock(&svc->srv_lock); + RETURN(0); + } + LASSERT(ptlrpc_server_request_pending(svc, 0)); + } } - request = list_entry (svc->srv_request_queue.next, - struct ptlrpc_request, rq_list); - list_del_init (&request->rq_list); + list_del_init(&request->rq_list); svc->srv_n_queued_reqs--; svc->srv_n_active_reqs++; + if (request->rq_hp) + svc->srv_n_hpreq++; + /* The phase is changed under the lock here because we need to know + * the request is under processing (see ptlrpc_hpreq_reorder()). 
*/ + ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET); spin_unlock(&svc->srv_lock); + ptlrpc_hpreq_fini(request); + if(OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG)) libcfs_debug_dumplog(); @@ -1115,9 +1324,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc, if (thread) request->rq_svc_thread->t_env->le_ses = &request->rq_session; - request->rq_export = class_conn2export( - lustre_msg_get_handle(request->rq_reqmsg)); - if (likely(request->rq_export)) { if (unlikely(ptlrpc_check_req(request))) goto put_conn; @@ -1138,8 +1344,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc, goto put_rpc_export; } - ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET); - CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc " "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(), (request->rq_export ? @@ -1170,9 +1374,6 @@ put_rpc_export: if (export != NULL) class_export_rpc_put(export); put_conn: - if (likely(request->rq_export != NULL)) - class_export_put(request->rq_export); - lu_context_exit(&request->rq_session); lu_context_fini(&request->rq_session); @@ -1217,6 +1418,10 @@ put_conn: } out_req: + spin_lock(&svc->srv_lock); + if (request->rq_hp) + svc->srv_n_hpreq--; + spin_unlock(&svc->srv_lock); ptlrpc_server_finish_request(request); RETURN(1); @@ -1515,7 +1720,7 @@ static int ptlrpc_main(void *arg) svc->srv_rqbd_timeout == 0) || !list_empty(&svc->srv_req_in_queue) || !list_empty(&svc->srv_reply_queue) || - (!list_empty(&svc->srv_request_queue) && + (ptlrpc_server_request_pending(svc, 0) && (svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) || svc->srv_at_check, @@ -1550,7 +1755,7 @@ static int ptlrpc_main(void *arg) ptlrpc_at_check_timed(svc); /* don't handle requests in the last thread */ - if (!list_empty (&svc->srv_request_queue) && + if (ptlrpc_server_request_pending(svc, 0) && (svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) { lu_context_enter(&env.le_ctx); ptlrpc_server_handle_request(svc, thread); @@ -1806,16 +2011,14 @@ int 
ptlrpc_unregister_service(struct ptlrpc_service *service) service->srv_n_active_reqs++; ptlrpc_server_finish_request(req); } - while (!list_empty(&service->srv_request_queue)) { - struct ptlrpc_request *req = - list_entry(service->srv_request_queue.next, - struct ptlrpc_request, - rq_list); + while (ptlrpc_server_request_pending(service, 1)) { + struct ptlrpc_request *req; + req = ptlrpc_server_request_get(service, 1); list_del(&req->rq_list); service->srv_n_queued_reqs--; service->srv_n_active_reqs++; - + ptlrpc_hpreq_fini(req); ptlrpc_server_finish_request(req); } LASSERT(service->srv_n_queued_reqs == 0); @@ -1879,14 +2082,18 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc) do_gettimeofday(&right_now); spin_lock(&svc->srv_lock); - if (list_empty(&svc->srv_request_queue)) { + if (!ptlrpc_server_request_pending(svc, 1)) { spin_unlock(&svc->srv_lock); return 0; } /* How long has the next entry been waiting? */ - request = list_entry(svc->srv_request_queue.next, - struct ptlrpc_request, rq_list); + if (list_empty(&svc->srv_request_queue)) + request = list_entry(svc->srv_request_hpq.next, + struct ptlrpc_request, rq_list); + else + request = list_entry(svc->srv_request_queue.next, + struct ptlrpc_request, rq_list); timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL); spin_unlock(&svc->srv_lock); diff --git a/lustre/tests/sanityN.sh b/lustre/tests/sanityN.sh index 589573c..08b4899 100644 --- a/lustre/tests/sanityN.sh +++ b/lustre/tests/sanityN.sh @@ -779,6 +779,56 @@ run_test 33a "commit on sharing, cross crete/delete, 2 clients, benchmark" # End commit on sharing tests +test_33() { #16129 + for OPER in notimeout timeout ; do + rm $DIR1/$tfile 2>/dev/null + lock_in=0; + for f in `lctl get_param -n ldlm/namespaces/*/lock_timeouts`; do + lock_in=$(($lock_in + $f)) + done + if [ $OPER == "timeout" ] ; then + for j in `seq $OSTCOUNT`; do + #define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT 0x510 + do_facet ost$j lctl set_param fail_loc=0x510 + done + 
echo lock should expire + else + for j in `seq $OSTCOUNT`; do + #define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT 0x511 + do_facet ost$j lctl set_param fail_loc=0x511 + done + echo lock should not expire + fi + echo writing on client1 + dd if=/dev/zero of=$DIR1/$tfile count=100 conv=notrunc > /dev/null 2>&1 + sync & + # wait for the flush + sleep 1 + echo reading on client2 + dd of=/dev/null if=$DIR2/$tfile > /dev/null 2>&1 + # wait for a lock timeout + sleep 4 + lock_out=0 + for f in `lctl get_param -n ldlm/namespaces/*/lock_timeouts`; do + lock_out=$(($lock_out + $f)) + done + if [ $OPER == "timeout" ] ; then + if [ $lock_in == $lock_out ]; then + error "no lock timeout happened" + else + echo "success" + fi + else + if [ $lock_in != $lock_out ]; then + error "lock timeout happened" + else + echo "success" + fi + fi + done +} +run_test 33 "no lock timeout under IO" + log "cleanup: ======================================================" check_and_cleanup_lustre