#endif /* !CDEBUG_ENTRY_EXIT */
+#define RETURN_EXIT \
+do { \
+ EXIT_NESTING; \
+ return; \
+} while (0)
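+/*
+ * A minimal usage sketch (hypothetical function, for illustration only):
+ * RETURN_EXIT replaces a bare "return" in void functions which begin
+ * with ENTRY, so the debug nesting stays balanced:
+ *
+ *    static void example(struct obd_export *exp)
+ *    {
+ *            ENTRY;
+ *            if (exp == NULL)
+ *                    RETURN_EXIT;
+ *            ...
+ *            EXIT;
+ *    }
+ */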
struct libcfs_debug_msg_data {
cfs_debug_limit_state_t *msg_cdls;
unsigned int ns_max_unused;
unsigned int ns_max_age;
+ unsigned int ns_timeouts;
/**
* Seconds.
*/
exp_flvr_changed:1,
exp_flvr_adapt:1,
exp_libclient:1; /* liblustre client? */
+ struct list_head exp_queued_rpc; /* RPC to be handled */
/* also protected by exp_lock */
enum lustre_sec_part exp_sp_peer;
struct sptlrpc_flavor exp_flvr; /* current */
#define MGS_MAXREPSIZE (9 * 1024)
/* Absolute limits */
-#define OSS_THREADS_MIN 2
+#define OSS_THREADS_MIN 3 /* difficult replies, HPQ, others */
#define OSS_THREADS_MAX 512
#define OST_NBUFS (64 * num_online_cpus())
#define OST_BUFSIZE (8 * 1024)
struct lu_context;
struct lu_env;
+struct ldlm_lock;
+
+struct ptlrpc_hpreq_ops {
+ /**
+ * Check if the lock handle of the given lock is the same as
+ * taken from the request.
+ */
+ int (*hpreq_lock_match)(struct ptlrpc_request *, struct ldlm_lock *);
+ /**
+ * Check if the request is a high priority one.
+ */
+ int (*hpreq_check)(struct ptlrpc_request *);
+};
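+/*
+ * These callbacks are assigned per request by the service's
+ * svc_hpreq_handler (see ost_hpreq_handler() below): hpreq_lock_match
+ * is called under exp_lock from ldlm_lock_busy() and
+ * ldlm_lock_reorder_req(), hpreq_check from ptlrpc_server_hpreq_check().
+ */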
+
/**
* Represents remote procedure call.
*/
struct list_head rq_list;
struct list_head rq_timed_list; /* server-side early replies */
struct list_head rq_history_list; /* server-side history */
+ struct list_head rq_exp_list; /* server-side per-export list */
+ struct ptlrpc_hpreq_ops *rq_ops; /* server-side hp handlers */
__u64 rq_history_seq; /* history sequence # */
int rq_status;
spinlock_t rq_lock;
rq_early:1, rq_must_unlink:1,
/* server-side flags */
rq_packed_final:1, /* packed final reply */
- rq_sent_final:1; /* stop sending early replies */
+ rq_sent_final:1, /* stop sending early replies */
+ rq_hp:1; /* high priority RPC */
enum rq_phase rq_phase; /* one of RQ_PHASE_* */
enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"), \
FLAG(req->rq_no_resend, "N"), \
FLAG(req->rq_waiting, "W"), \
- FLAG(req->rq_wait_ctx, "C")
+ FLAG(req->rq_wait_ctx, "C"), FLAG(req->rq_hp, "H")
-#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s"
+#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s%s"
void _debug_req(struct ptlrpc_request *req, __u32 mask,
struct libcfs_debug_msg_data *data, const char *fmt, ...)
typedef int (*svc_handler_t)(struct ptlrpc_request *req);
typedef void (*svcreq_printfn_t)(void *, struct ptlrpc_request *);
+typedef int (*svc_hpreq_handler_t)(struct ptlrpc_request *);
+
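+/* Default srv_hpreq_ratio: up to 10 high priority requests are handled
+ * before one normal request when both queues are busy (see
+ * ptlrpc_server_request_get()). */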
+#define PTLRPC_SVC_HP_RATIO 10
struct ptlrpc_service {
struct list_head srv_list; /* chain thru all services */
int srv_threads_running; /* # running threads */
int srv_n_difficult_replies; /* # 'difficult' replies */
int srv_n_active_reqs; /* # reqs being served */
+ int srv_n_hpreq; /* # HPreqs being served */
cfs_duration_t srv_rqbd_timeout; /* timeout before re-posting reqs, in tick */
int srv_watchdog_factor; /* soft watchdog timeout multiplier */
unsigned srv_cpu_affinity:1; /* bind threads to CPUs */
cfs_timer_t srv_at_timer; /* early reply timer */
int srv_n_queued_reqs; /* # reqs in either of the queues below */
+ int srv_hpreq_count; /* # hp requests handled */
+ int srv_hpreq_ratio; /* # hp per lp reqs to handle */
struct list_head srv_req_in_queue; /* incoming reqs */
struct list_head srv_request_queue; /* reqs waiting for service */
+ struct list_head srv_request_hpq; /* high priority queue */
struct list_head srv_request_history; /* request history */
__u64 srv_request_seq; /* next request sequence # */
struct list_head srv_threads; /* service thread list */
svc_handler_t srv_handler;
+ svc_hpreq_handler_t srv_hpreq_handler; /* hp request handler */
char *srv_name; /* only statically allocated strings here; we don't clean them */
char *srv_thread_name; /* only statically allocated strings here; we don't clean them */
cfs_proc_dir_entry_t *proc_entry,
svcreq_printfn_t,
int min_threads, int max_threads,
- char *threadname, __u32 ctx_tags);
+ char *threadname, __u32 ctx_tags,
+ svc_hpreq_handler_t);
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc);
int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc);
int liblustre_check_services (void *arg);
void ptlrpc_daemonize(char *name);
int ptlrpc_service_health_check(struct ptlrpc_service *);
+void ptlrpc_hpreq_reorder(struct ptlrpc_request *req);
struct ptlrpc_svc_data {
#define OBD_FAIL_PTLRPC_DUMP_LOG 0x50e
#define OBD_FAIL_PTLRPC_LONG_UNLINK 0x50f
+#define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT 0x510
+#define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT 0x511
#define OBD_FAIL_OBD_PING_NET 0x600
#define OBD_FAIL_OBD_LOG_CANCEL_NET 0x601
CFS_INIT_LIST_HEAD(&lock->l_cache_locks_list);
lu_ref_init(&lock->l_reference);
lu_ref_add(&lock->l_reference, "hash", lock);
+ lock->l_callback_timeout = 0;
RETURN(lock);
}
data->msg_fn, data->msg_line, fmt, args,
" ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
"res: \?\? rrc=\?\? type: \?\?\? flags: %x remote: "
- LPX64" expref: %d pid: %u\n", lock,
+ LPX64" expref: %d pid: %u timeout: %lu\n", lock,
lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
lock->l_readers, lock->l_writers,
ldlm_lockname[lock->l_granted_mode],
lock->l_flags, lock->l_remote_handle.cookie,
lock->l_export ?
atomic_read(&lock->l_export->exp_refcount) : -99,
- lock->l_pid);
+ lock->l_pid, lock->l_callback_timeout);
va_end(args);
return;
}
" ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
"res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64
"] (req "LPU64"->"LPU64") flags: %x remote: "LPX64
- " expref: %d pid: %u\n",
+ " expref: %d pid: %u timeout %lu\n",
lock->l_resource->lr_namespace->ns_name, lock,
lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
lock->l_readers, lock->l_writers,
lock->l_flags, lock->l_remote_handle.cookie,
lock->l_export ?
atomic_read(&lock->l_export->exp_refcount) : -99,
- lock->l_pid);
+ lock->l_pid, lock->l_callback_timeout);
break;
case LDLM_FLOCK:
" ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
"res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d "
"["LPU64"->"LPU64"] flags: %x remote: "LPX64
- " expref: %d pid: %u\n",
+ " expref: %d pid: %u timeout: %lu\n",
lock->l_resource->lr_namespace->ns_name, lock,
lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
lock->l_readers, lock->l_writers,
lock->l_flags, lock->l_remote_handle.cookie,
lock->l_export ?
atomic_read(&lock->l_export->exp_refcount) : -99,
- lock->l_pid);
+ lock->l_pid, lock->l_callback_timeout);
break;
case LDLM_IBITS:
" ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
"res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "
"flags: %x remote: "LPX64" expref: %d "
- "pid %u\n",
+ "pid: %u timeout: %lu\n",
lock->l_resource->lr_namespace->ns_name,
lock, lock->l_handle.h_cookie,
atomic_read (&lock->l_refc),
lock->l_flags, lock->l_remote_handle.cookie,
lock->l_export ?
atomic_read(&lock->l_export->exp_refcount) : -99,
- lock->l_pid);
+ lock->l_pid, lock->l_callback_timeout);
break;
default:
data->msg_fn, data->msg_line, fmt, args,
" ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
"res: "LPU64"/"LPU64" rrc: %d type: %s flags: %x "
- "remote: "LPX64" expref: %d pid: %u\n",
+ "remote: "LPX64" expref: %d pid: %u timeout %lu\n",
lock->l_resource->lr_namespace->ns_name,
lock, lock->l_handle.h_cookie,
atomic_read (&lock->l_refc),
lock->l_flags, lock->l_remote_handle.cookie,
lock->l_export ?
atomic_read(&lock->l_export->exp_refcount) : -99,
- lock->l_pid);
+ lock->l_pid, lock->l_callback_timeout);
break;
}
va_end(args);
static int ldlm_add_waiting_lock(struct ldlm_lock *lock);
+/**
+ * Check if there is a request in the export request list
+ * which prevents the lock canceling.
+ */
+static int ldlm_lock_busy(struct ldlm_lock *lock)
+{
+ struct ptlrpc_request *req;
+ int match = 0;
+ ENTRY;
+
+ if (lock->l_export == NULL)
+ RETURN(0);
+
+ spin_lock(&lock->l_export->exp_lock);
+ list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
+ if (req->rq_ops->hpreq_lock_match) {
+ match = req->rq_ops->hpreq_lock_match(req, lock);
+ if (match)
+ break;
+ }
+ }
+ spin_unlock(&lock->l_export->exp_lock);
+ RETURN(match);
+}
+
/* This is called from within a timer interrupt and cannot schedule */
static void waiting_locks_callback(unsigned long unused)
{
while (!list_empty(&waiting_locks_list)) {
lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
l_pending_chain);
-
if (cfs_time_after(lock->l_callback_timeout, cfs_time_current()) ||
(lock->l_req_mode == LCK_GROUP))
break;
goto repeat;
}
+ /* Check if we need to prolong timeout */
+ if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
+ ldlm_lock_busy(lock)) {
+ int cont = 1;
+
+ if (lock->l_pending_chain.next == &waiting_locks_list)
+ cont = 0;
+
+ LDLM_LOCK_GET(lock);
+ spin_unlock_bh(&waiting_locks_spinlock);
+ LDLM_DEBUG(lock, "prolong the busy lock");
+ ldlm_refresh_waiting_lock(lock);
+ spin_lock_bh(&waiting_locks_spinlock);
+
+ if (!cont) {
+ LDLM_LOCK_PUT(lock);
+ break;
+ }
+
+ LDLM_LOCK_PUT(lock);
+ continue;
+ }
+ lock->l_resource->lr_namespace->ns_timeouts++;
LDLM_ERROR(lock, "lock callback timer expired after %lds: "
"evicting client at %s ",
cfs_time_current_sec() - lock->l_enqueued_time.tv_sec,
*/
static int __ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
- int timeout;
+ cfs_time_t timeout;
cfs_time_t timeout_rounded;
if (!list_empty(&lock->l_pending_chain))
return 0;
- timeout = ldlm_get_enq_timeout(lock);
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
+ OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
+ timeout = 2;
+ else
+ timeout = ldlm_get_enq_timeout(lock);
- lock->l_callback_timeout = cfs_time_shift(timeout);
+ timeout = cfs_time_shift(timeout);
+ if (likely(cfs_time_after(timeout, lock->l_callback_timeout)))
+ lock->l_callback_timeout = timeout;
timeout_rounded = round_timeout(lock->l_callback_timeout);
LDLM_DEBUG(lock, "refreshed");
return 1;
}
-
#else /* !__KERNEL__ */
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
RETURN(rc);
}
+/**
+ * Check if there are requests in the export request list which prevent
+ * the lock canceling and make these requests high priority ones.
+ */
+static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
+{
+ struct ptlrpc_request *req;
+ ENTRY;
+
+ if (lock->l_export == NULL) {
+ LDLM_DEBUG(lock, "client lock: no-op");
+ RETURN_EXIT;
+ }
+
+ spin_lock(&lock->l_export->exp_lock);
+ list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
+ if (!req->rq_hp && req->rq_ops->hpreq_lock_match &&
+ req->rq_ops->hpreq_lock_match(req, lock))
+ ptlrpc_hpreq_reorder(req);
+ }
+ spin_unlock(&lock->l_export->exp_lock);
+ EXIT;
+}
+
/*
* ->l_blocking_ast() method for server-side locks. This is invoked when newly
* enqueued server lock conflicts with given one.
ldlm_lock_dump(D_ERROR, lock, 0);
}
+ ldlm_lock_reorder_req(lock);
+
req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
&RQF_LDLM_BL_CALLBACK,
LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK);
ldlm_svc_proc_dir, NULL,
ldlm_min_threads, ldlm_max_threads,
"ldlm_cb",
- LCT_MD_THREAD|LCT_DT_THREAD);
+ LCT_MD_THREAD|LCT_DT_THREAD, NULL);
if (!ldlm_state->ldlm_cb_service) {
CERROR("failed to start service\n");
ldlm_svc_proc_dir, NULL,
ldlm_min_threads, ldlm_max_threads,
"ldlm_cn",
- LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD);
+ LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
+ NULL);
if (!ldlm_state->ldlm_cancel_service) {
CERROR("failed to start service\n");
timeout = timeout + (timeout >> 1); /* 150% */
return max(timeout, ldlm_enqueue_min);
}
+EXPORT_SYMBOL(ldlm_get_enq_timeout);
static int is_granted_or_cancelled(struct ldlm_lock *lock)
{
lock_vars[0].write_fptr = lprocfs_wr_uint;
lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+ snprintf(lock_name, MAX_STRING_SIZE, "%s/lock_timeouts",
+ ns->ns_name);
+ lock_vars[0].data = &ns->ns_timeouts;
+ lock_vars[0].read_fptr = lprocfs_rd_uint;
+ lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+
snprintf(lock_name, MAX_STRING_SIZE, "%s/max_nolock_bytes",
ns->ns_name);
lock_vars[0].data = &ns->ns_max_nolock_size;
ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE;
ns->ns_max_age = LDLM_DEFAULT_MAX_ALIVE;
ns->ns_ctime_age_limit = LDLM_CTIME_AGE_LIMIT;
+ ns->ns_timeouts = 0;
spin_lock_init(&ns->ns_unused_lock);
ns->ns_orig_connect_flags = 0;
ns->ns_connect_flags = 0;
mgs_handle, LUSTRE_MGS_NAME,
obd->obd_proc_entry, target_print_req,
MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
- "ll_mgs", LCT_MD_THREAD);
+ "ll_mgs", LCT_MD_THREAD, NULL);
if (!mgs->mgs_service) {
CERROR("failed to start service\n");
LASSERT(list_empty(&exp->exp_outstanding_replies));
LASSERT(list_empty(&exp->exp_req_replay_queue));
+ LASSERT(list_empty(&exp->exp_queued_rpc));
obd_destroy_export(exp);
class_decref(obd, "export", exp);
CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
+ CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
class_handle_hash(&export->exp_handle, export_handle_addref);
export->exp_last_request_time = cfs_time_current_sec();
spin_lock_init(&export->exp_lock);
/* check that we do support OBD_CONNECT_TRUNCLOCK. */
CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
- if (body == NULL)
- RETURN(-EFAULT);
+ /* ost_body is verified and swabbed in ost_hpreq_handler() */
+ body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+ LASSERT(body != NULL);
oinfo.oi_oa = &body->oa;
oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size;
struct ost_prolong_data {
struct obd_export *opd_exp;
ldlm_policy_data_t opd_policy;
+ struct obdo *opd_oa;
ldlm_mode_t opd_mode;
+ int opd_lock_match;
};
static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
return LDLM_ITER_CONTINUE;
}
+ /* Fill the obdo with the matched lock handle.
+ * XXX: in some cases the IO RPC may be covered by several locks, even
+ * for the write case, so this may need to become a list of handles. */
+ if (opd->opd_oa && !(opd->opd_oa->o_valid & OBD_MD_FLHANDLE)) {
+ opd->opd_oa->o_handle.cookie = lock->l_handle.h_cookie;
+ opd->opd_oa->o_valid |= OBD_MD_FLHANDLE;
+ }
+
if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
/* ignore locks not being cancelled */
return LDLM_ITER_CONTINUE;
/* OK. this is a possible lock the user holds doing I/O
* let's refresh eviction timer for it */
ldlm_refresh_waiting_lock(lock);
+ opd->opd_lock_match = 1;
return LDLM_ITER_CONTINUE;
}
-static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
- struct niobuf_remote *nb, struct obdo *oa,
- ldlm_mode_t mode)
+static int ost_rw_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
+ struct niobuf_remote *nb, struct obdo *oa,
+ ldlm_mode_t mode)
{
struct ldlm_res_id res_id;
int nrbufs = obj->ioo_bufcnt;
- struct ost_prolong_data opd;
+ struct ost_prolong_data opd = { 0 };
ENTRY;
osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id);
lock = ldlm_handle2lock(&oa->o_handle);
if (lock != NULL) {
ost_prolong_locks_iter(lock, &opd);
+ if (opd.opd_lock_match) {
+ LDLM_LOCK_PUT(lock);
+ RETURN(1);
+ }
+
+ /* Check if the lock covers the whole IO region,
+ * otherwise iterate through the resource. */
+ if (lock->l_policy_data.l_extent.end >=
+ opd.opd_policy.l_extent.end &&
+ lock->l_policy_data.l_extent.start <=
+ opd.opd_policy.l_extent.start) {
+ LDLM_LOCK_PUT(lock);
+ RETURN(0);
+ }
LDLM_LOCK_PUT(lock);
- EXIT;
- return;
}
}
+ opd.opd_oa = oa;
ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,
ost_prolong_locks_iter, &opd);
-
- EXIT;
+ RETURN(opd.opd_lock_match);
}
static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
struct l_wait_info lwi;
struct lustre_handle lockh = { 0 };
__u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
- int objcount, niocount, npages, nob = 0, rc, i;
+ int niocount, npages, nob = 0, rc, i;
int no_reply = 0;
ENTRY;
if (exp->exp_failed)
GOTO(out, rc = -ENOTCONN);
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
- if (body == NULL) {
- CERROR("Missing/short ost_body\n");
- GOTO(out, rc = -EFAULT);
- }
-
- objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
- sizeof(*ioo);
- if (objcount == 0) {
- CERROR("Missing/short ioobj\n");
- GOTO(out, rc = -EFAULT);
- }
- if (objcount > 1) {
- CERROR("too many ioobjs (%d)\n", objcount);
- GOTO(out, rc = -EFAULT);
- }
+ /* ost_body, ioobj & niobuf_remote are verified and swabbed in
+ * ost_hpreq_handler(). */
+ body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+ LASSERT(body != NULL);
- ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*ioo),
- lustre_swab_obd_ioobj);
- if (ioo == NULL) {
- CERROR("Missing/short ioobj\n");
- GOTO(out, rc = -EFAULT);
- }
+ ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioo));
+ LASSERT(ioo != NULL);
niocount = ioo->ioo_bufcnt;
- if (niocount > PTLRPC_MAX_BRW_PAGES) {
- DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)",
- niocount);
- GOTO(out, rc = -EFAULT);
- }
-
- remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
- niocount * sizeof(*remote_nb),
- lustre_swab_niobuf_remote);
- if (remote_nb == NULL) {
- CERROR("Missing/short niobuf\n");
- GOTO(out, rc = -EFAULT);
- }
- if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
- for (i = 1; i < niocount; i++)
- lustre_swab_niobuf_remote (&remote_nb[i]);
- }
+ remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+ niocount * sizeof(*remote_nb));
+ LASSERT(remote_nb != NULL);
if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 3);
if (desc == NULL) /* XXX: check all cleanup stuff */
GOTO(out, rc = -ENOMEM);
- ost_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR);
+ ost_rw_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR);
nob = 0;
for (i = 0; i < npages; i++) {
__u32 *rcs;
__u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
int objcount, niocount, npages;
- int rc, swab, i, j;
+ int rc, i, j;
obd_count client_cksum = 0, server_cksum = 0;
cksum_type_t cksum_type = OBD_CKSUM_CRC32;
int no_reply = 0;
if (exp->exp_failed)
GOTO(out, rc = -ENOTCONN);
- swab = lustre_msg_swabbed(req->rq_reqmsg);
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
- if (body == NULL) {
- CERROR("Missing/short ost_body\n");
- GOTO(out, rc = -EFAULT);
- }
+ /* ost_body, ioobj & niobuf_remote are verified and swabbed in
+ * ost_hpreq_handler(). */
+ body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+ LASSERT(body != NULL);
- lustre_set_req_swabbed(req, REQ_REC_OFF + 1);
objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
- sizeof(*ioo);
- if (objcount == 0) {
- CERROR("Missing/short ioobj\n");
- GOTO(out, rc = -EFAULT);
- }
- if (objcount > 1) {
- CERROR("too many ioobjs (%d)\n", objcount);
- GOTO(out, rc = -EFAULT);
- }
-
+ sizeof(*ioo);
ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
objcount * sizeof(*ioo));
- LASSERT (ioo != NULL);
- for (niocount = i = 0; i < objcount; i++) {
- if (swab)
- lustre_swab_obd_ioobj(&ioo[i]);
- if (ioo[i].ioo_bufcnt == 0) {
- CERROR("ioo[%d] has zero bufcnt\n", i);
- GOTO(out, rc = -EFAULT);
- }
+ LASSERT(ioo != NULL);
+ for (niocount = i = 0; i < objcount; i++)
niocount += ioo[i].ioo_bufcnt;
- }
- if (niocount > PTLRPC_MAX_BRW_PAGES) {
- DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)",
- niocount);
- GOTO(out, rc = -EFAULT);
- }
-
- remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
- niocount * sizeof(*remote_nb),
- lustre_swab_niobuf_remote);
- if (remote_nb == NULL) {
- CERROR("Missing/short niobuf\n");
- GOTO(out, rc = -EFAULT);
- }
- if (swab) { /* swab the remaining niobufs */
- for (i = 1; i < niocount; i++)
- lustre_swab_niobuf_remote (&remote_nb[i]);
- }
+ remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+ niocount * sizeof(*remote_nb));
+ LASSERT(remote_nb != NULL);
if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 3);
GOTO(out_lock, rc = -ETIMEDOUT);
}
- ost_prolong_locks(exp, ioo, remote_nb,&body->oa, LCK_PW);
+ ost_rw_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW);
/* obd_preprw clobbers oa->valid, so save what we need */
if (body->oa.o_valid & OBD_MD_FLCKSUM) {
return rc;
}
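+/**
+ * Check if the granted mode of the lock is compatible with this
+ * read/write RPC and the lock extent intersects the RPC IO region,
+ * i.e. whether cancelling the lock must wait for this request.
+ */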
+static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
+ struct ldlm_lock *lock)
+{
+ struct niobuf_remote *nb;
+ struct obd_ioobj *ioo;
+ struct ost_body *body;
+ int objcount, niocount;
+ int mode, opc, i;
+ __u64 start, end;
+ ENTRY;
+
+ opc = lustre_msg_get_opc(req->rq_reqmsg);
+ LASSERT(opc == OST_READ || opc == OST_WRITE);
+
+ /* As the request may be covered by several locks, do not look at
+ * o_handle, look at the RPC IO region. */
+ body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
+ lustre_swab_obdo);
+ objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
+ sizeof(*ioo);
+ ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
+ objcount * sizeof(*ioo));
+ LASSERT(ioo != NULL);
+ for (niocount = i = 0; i < objcount; i++)
+ niocount += ioo[i].ioo_bufcnt;
+
+ nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+ niocount * sizeof(*nb));
+ LASSERT(nb != NULL);
+
+ mode = LCK_PW;
+ if (opc == OST_READ)
+ mode |= LCK_PR;
+
+ start = nb[0].offset & CFS_PAGE_MASK;
+ end = (nb[ioo->ioo_bufcnt - 1].offset +
+ nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK;
+
+ if (!(lock->l_granted_mode & mode))
+ RETURN(0);
+
+ if (lock->l_policy_data.l_extent.end < start ||
+ lock->l_policy_data.l_extent.start > end)
+ RETURN(0);
+
+ RETURN(1);
+}
+
+/**
+ * Swab buffers needed to call ost_rw_prolong_locks() and call it.
+ * Return the value from ost_rw_prolong_locks() which is non-zero if
+ * there is a cancelled lock which is waiting for this IO request.
+ */
+static int ost_rw_hpreq_check(struct ptlrpc_request *req)
+{
+ struct niobuf_remote *nb;
+ struct obd_ioobj *ioo;
+ struct ost_body *body;
+ int objcount, niocount;
+ int mode, opc, i;
+ ENTRY;
+
+ opc = lustre_msg_get_opc(req->rq_reqmsg);
+ LASSERT(opc == OST_READ || opc == OST_WRITE);
+
+ body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+ LASSERT(body != NULL);
+
+ objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
+ sizeof(*ioo);
+ ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
+ objcount * sizeof(*ioo));
+ LASSERT(ioo != NULL);
+
+ for (niocount = i = 0; i < objcount; i++)
+ niocount += ioo[i].ioo_bufcnt;
+ nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+ niocount * sizeof(*nb));
+ LASSERT(nb != NULL);
+ LASSERT(niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK));
+
+ mode = LCK_PW;
+ if (opc == OST_READ)
+ mode |= LCK_PR;
+ RETURN(ost_rw_prolong_locks(req->rq_export, ioo, nb, &body->oa, mode));
+}
+
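+/**
+ * Refresh the eviction timers of locks being cancelled which cover the
+ * OST_PUNCH region. Returns non-zero if such a lock was found.
+ */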
+static int ost_punch_prolong_locks(struct obd_export *exp, struct obdo *oa)
+{
+ struct ldlm_res_id res_id = { .name = { oa->o_id } };
+ struct ost_prolong_data opd = { 0 };
+ __u64 start, end;
+ ENTRY;
+
+ start = oa->o_size;
+ end = start + oa->o_blocks;
+
+ opd.opd_mode = LCK_PW;
+ opd.opd_exp = exp;
+ opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK;
+ if (oa->o_blocks == OBD_OBJECT_EOF || end < start)
+ opd.opd_policy.l_extent.end = OBD_OBJECT_EOF;
+ else
+ opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK;
+
+ CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
+ res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
+ opd.opd_policy.l_extent.end);
+
+ opd.opd_oa = oa;
+ ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,
+ ost_prolong_locks_iter, &opd);
+ RETURN(opd.opd_lock_match);
+}
+
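+/** Check if the lock is the one cached in the OST_PUNCH request's o_handle. */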
+static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
+ struct ldlm_lock *lock)
+{
+ struct ost_body *body;
+ ENTRY;
+
+ body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
+ lustre_swab_obdo);
+ LASSERT(body != NULL);
+
+ if (body->oa.o_valid & OBD_MD_FLHANDLE &&
+ body->oa.o_handle.cookie == lock->l_handle.h_cookie)
+ RETURN(1);
+ RETURN(0);
+}
+
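+/**
+ * Like ost_rw_hpreq_check() but for OST_PUNCH; the buffers were already
+ * verified and swabbed in ost_hpreq_handler().
+ */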
+static int ost_punch_hpreq_check(struct ptlrpc_request *req)
+{
+ struct ost_body *body = lustre_msg_buf(req->rq_reqmsg,
+ REQ_REC_OFF, sizeof(*body));
+ LASSERT(body != NULL);
+ LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
+ !(body->oa.o_flags & OBD_FL_TRUNCLOCK));
+
+ RETURN(ost_punch_prolong_locks(req->rq_export, &body->oa));
+}
+
+struct ptlrpc_hpreq_ops ost_hpreq_rw = {
+ .hpreq_lock_match = ost_rw_hpreq_lock_match,
+ .hpreq_check = ost_rw_hpreq_check,
+};
+
+struct ptlrpc_hpreq_ops ost_hpreq_punch = {
+ .hpreq_lock_match = ost_punch_hpreq_lock_match,
+ .hpreq_check = ost_punch_hpreq_check,
+};
+
+/** Assign high priority operations to the request if needed. */
+static int ost_hpreq_handler(struct ptlrpc_request *req)
+{
+ ENTRY;
+ if (req->rq_export) {
+ int opc = lustre_msg_get_opc(req->rq_reqmsg);
+ struct ost_body *body;
+
+ if (opc == OST_READ || opc == OST_WRITE) {
+ struct niobuf_remote *nb;
+ struct obd_ioobj *ioo;
+ int objcount, niocount;
+ int swab, i;
+
+ body = lustre_swab_reqbuf(req, REQ_REC_OFF,
+ sizeof(*body),
+ lustre_swab_obdo);
+ if (!body) {
+ CERROR("Missing/short ost_body\n");
+ RETURN(-EFAULT);
+ }
+ objcount = lustre_msg_buflen(req->rq_reqmsg,
+ REQ_REC_OFF + 1) /
+ sizeof(*ioo);
+ if (objcount == 0) {
+ CERROR("Missing/short ioobj\n");
+ RETURN(-EFAULT);
+ }
+ if (objcount > 1) {
+ CERROR("too many ioobjs (%d)\n", objcount);
+ RETURN(-EFAULT);
+ }
+
+ swab = !lustre_req_swabbed(req, REQ_REC_OFF + 1) &&
+ lustre_msg_swabbed(req->rq_reqmsg);
+ ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1,
+ objcount * sizeof(*ioo),
+ lustre_swab_obd_ioobj);
+ if (!ioo) {
+ CERROR("Missing/short ioobj\n");
+ RETURN(-EFAULT);
+ }
+ for (niocount = i = 0; i < objcount; i++) {
+ if (i > 0 && swab)
+ lustre_swab_obd_ioobj(&ioo[i]);
+ if (ioo[i].ioo_bufcnt == 0) {
+ CERROR("ioo[%d] has zero bufcnt\n", i);
+ RETURN(-EFAULT);
+ }
+ niocount += ioo[i].ioo_bufcnt;
+ }
+ if (niocount > PTLRPC_MAX_BRW_PAGES) {
+ DEBUG_REQ(D_ERROR, req, "bulk has too many "
+ "pages (%d)", niocount);
+ RETURN(-EFAULT);
+ }
+
+ swab = !lustre_req_swabbed(req, REQ_REC_OFF + 2) &&
+ lustre_msg_swabbed(req->rq_reqmsg);
+ nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
+ niocount * sizeof(*nb),
+ lustre_swab_niobuf_remote);
+ if (!nb) {
+ CERROR("Missing/short niobuf\n");
+ RETURN(-EFAULT);
+ }
+
+ if (swab) {
+ /* swab remaining niobufs */
+ for (i = 1; i < niocount; i++)
+ lustre_swab_niobuf_remote(&nb[i]);
+ }
+
+ if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
+ req->rq_ops = &ost_hpreq_rw;
+ } else if (opc == OST_PUNCH) {
+ body = lustre_swab_reqbuf(req, REQ_REC_OFF,
+ sizeof(*body),
+ lustre_swab_obdo);
+ if (!body) {
+ CERROR("Missing/short ost_body\n");
+ RETURN(-EFAULT);
+ }
+
+ if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
+ !(body->oa.o_flags & OBD_FL_TRUNCLOCK))
+ req->rq_ops = &ost_hpreq_punch;
+ }
+ }
+ RETURN(0);
+}
+
/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
int ost_handle(struct ptlrpc_request *req)
{
/* Ensure a 4x range for dynamic threads */
if (oss_min_threads > OSS_THREADS_MAX / 4)
oss_min_threads = OSS_THREADS_MAX / 4;
- oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4);
+ oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1);
}
ost->ost_service =
ost_handle, LUSTRE_OSS_NAME,
obd->obd_proc_entry, target_print_req,
oss_min_threads, oss_max_threads,
- "ll_ost", LCT_DT_THREAD);
+ "ll_ost", LCT_DT_THREAD, NULL);
if (ost->ost_service == NULL) {
CERROR("failed to start service\n");
GOTO(out_lprocfs, rc = -ENOMEM);
ost_handle, "ost_create",
obd->obd_proc_entry, target_print_req,
oss_min_create_threads, oss_max_create_threads,
- "ll_ost_creat", LCT_DT_THREAD);
+ "ll_ost_creat", LCT_DT_THREAD, NULL);
if (ost->ost_create_service == NULL) {
CERROR("failed to start OST create service\n");
GOTO(out_service, rc = -ENOMEM);
ost_handle, "ost_io",
obd->obd_proc_entry, target_print_req,
oss_min_threads, oss_max_threads,
- "ll_ost_io", LCT_DT_THREAD);
+ "ll_ost_io", LCT_DT_THREAD, ost_hpreq_handler);
if (ost->ost_io_service == NULL) {
CERROR("failed to start OST I/O service\n");
GOTO(out_create, rc = -ENOMEM);
CFS_INIT_LIST_HEAD(&request->rq_ctx_chain);
CFS_INIT_LIST_HEAD(&request->rq_set_chain);
CFS_INIT_LIST_HEAD(&request->rq_history_list);
+ CFS_INIT_LIST_HEAD(&request->rq_exp_list);
cfs_waitq_init(&request->rq_reply_waitq);
request->rq_xid = ptlrpc_next_xid();
atomic_set(&request->rq_refcount, 1);
LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
+ LASSERTF(list_empty(&request->rq_exp_list), "req %p\n", request);
LASSERTF(!request->rq_replay, "req %p\n", request);
LASSERT(request->rq_cli_ctx);
return rc;
}
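+/* /proc handlers for the per-service "high_priority_ratio" tunable */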
+static int ptlrpc_lprocfs_rd_hp_ratio(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ struct ptlrpc_service *svc = data;
+ int rc = snprintf(page, count, "%d\n", svc->srv_hpreq_ratio);
+ return rc;
+}
+
+static int ptlrpc_lprocfs_wr_hp_ratio(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ struct ptlrpc_service *svc = data;
+ int rc, val;
+
+ rc = lprocfs_write_helper(buffer, count, &val);
+ if (rc < 0)
+ return rc;
+ if (val < 0)
+ return -ERANGE;
+
+ spin_lock(&svc->srv_lock);
+ svc->srv_hpreq_ratio = val;
+ spin_unlock(&svc->srv_lock);
+ return count;
+}
+
void ptlrpc_lprocfs_register_service(struct proc_dir_entry *entry,
struct ptlrpc_service *svc)
{
{.name = "timeouts",
.read_fptr = ptlrpc_lprocfs_rd_timeouts,
.data = svc},
+ {.name = "high_priority_ratio",
+ .read_fptr = ptlrpc_lprocfs_rd_hp_ratio,
+ .write_fptr = ptlrpc_lprocfs_wr_hp_ratio,
+ .data = svc},
{NULL}
};
static struct file_operations req_history_fops = {
void *lustre_swab_reqbuf(struct ptlrpc_request *req, int index, int min_size,
void *swabber)
{
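+ /* Swab each buffer at most once; repeated calls just return it. */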
+ if (lustre_req_swabbed(req, index))
+ return lustre_msg_buf(req->rq_reqmsg, index, min_size);
+
lustre_set_req_swabbed(req, index);
return lustre_swab_buf(req->rq_reqmsg, index, min_size, swabber);
}
void *lustre_swab_repbuf(struct ptlrpc_request *req, int index, int min_size,
void *swabber)
{
+ if (lustre_rep_swabbed(req, index))
+ return lustre_msg_buf(req->rq_repmsg, index, min_size);
+
lustre_set_rep_swabbed(req, index);
return lustre_swab_buf(req->rq_repmsg, index, min_size, swabber);
}
EXPORT_SYMBOL(ptlrpc_unregister_service);
EXPORT_SYMBOL(ptlrpc_daemonize);
EXPORT_SYMBOL(ptlrpc_service_health_check);
+EXPORT_SYMBOL(ptlrpc_hpreq_reorder);
/* pack_generic.c */
EXPORT_SYMBOL(lustre_msg_swabbed);
c->psc_watchdog_factor,
h, name, proc_entry,
prntfn, c->psc_min_threads, c->psc_max_threads,
- threadname, c->psc_ctx_tags);
+ threadname, c->psc_ctx_tags, NULL);
}
EXPORT_SYMBOL(ptlrpc_init_svc_conf);
cfs_proc_dir_entry_t *proc_entry,
svcreq_printfn_t svcreq_printfn,
int min_threads, int max_threads,
- char *threadname, __u32 ctx_tags)
+ char *threadname, __u32 ctx_tags,
+ svc_hpreq_handler_t hp_handler)
{
int rc;
struct ptlrpc_service *service;
service->srv_threads_max = max_threads;
service->srv_thread_name = threadname;
service->srv_ctx_tags = ctx_tags;
+ service->srv_hpreq_handler = hp_handler;
+ service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
+ service->srv_hpreq_count = 0;
+ service->srv_n_hpreq = 0;
rc = LNetSetLazyPortal(service->srv_req_portal);
LASSERT (rc == 0);
CFS_INIT_LIST_HEAD(&service->srv_request_queue);
+ CFS_INIT_LIST_HEAD(&service->srv_request_hpq);
CFS_INIT_LIST_HEAD(&service->srv_idle_rqbds);
CFS_INIT_LIST_HEAD(&service->srv_active_rqbds);
CFS_INIT_LIST_HEAD(&service->srv_history_rqbds);
{
struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+ if (req->rq_export) {
+ class_export_put(req->rq_export);
+ req->rq_export = NULL;
+ }
+
if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
DEBUG_REQ(D_INFO, req, "free req");
static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
{
struct obd_export *oldest_exp;
- time_t oldest_time;
+ time_t oldest_time, new_time;
ENTRY;
of the list, we can be really lazy here - we don't have to evict
at the exact right moment. Eventually, all silent exports
will make it to the top of the list. */
- exp->exp_last_request_time = max(exp->exp_last_request_time,
- cfs_time_current_sec() + extra_delay);
+ /* Ignore renewals of one second or less. */
+ new_time = cfs_time_current_sec() + extra_delay;
+ if (exp->exp_last_request_time + 1 /* second */ >= new_time)
+ RETURN_EXIT;
+
+ exp->exp_last_request_time = new_time;
CDEBUG(D_HA, "updating export %s at "CFS_TIME_T" exp %p\n",
exp->exp_client_uuid.uuid,
exp->exp_last_request_time, exp);
if (list_empty(&exp->exp_obd_chain_timed)) {
/* this one is not timed */
spin_unlock(&exp->exp_obd->obd_dev_lock);
- EXIT;
- return;
+ RETURN_EXIT;
}
list_move_tail(&exp->exp_obd_chain_timed,
RETURN(0);
}
+/**
+ * Put the request to the export list if the request may become
+ * a high priority one.
+ */
+static int ptlrpc_hpreq_init(struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
+{
+ int rc;
+ ENTRY;
+
+ if (svc->srv_hpreq_handler) {
+ rc = svc->srv_hpreq_handler(req);
+ if (rc)
+ RETURN(rc);
+ }
+ if (req->rq_export && req->rq_ops) {
+ spin_lock(&req->rq_export->exp_lock);
+ list_add(&req->rq_exp_list, &req->rq_export->exp_queued_rpc);
+ spin_unlock(&req->rq_export->exp_lock);
+ }
+
+ RETURN(0);
+}
+
+/** Remove the request from the export list. */
+static void ptlrpc_hpreq_fini(struct ptlrpc_request *req)
+{
+ ENTRY;
+ if (req->rq_export && req->rq_ops) {
+ spin_lock(&req->rq_export->exp_lock);
+ list_del_init(&req->rq_exp_list);
+ spin_unlock(&req->rq_export->exp_lock);
+ }
+ EXIT;
+}
+
+/**
+ * Make the request a high priority one.
+ *
+ * All the high priority requests are queued in a separate FIFO
+ * ptlrpc_service::srv_request_hpq list which is parallel to
+ * ptlrpc_service::srv_request_queue list but has a higher priority
+ * for handling.
+ *
+ * \see ptlrpc_server_handle_request().
+ */
+static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
+{
+ ENTRY;
+ LASSERT(svc != NULL);
+ spin_lock(&req->rq_lock);
+ if (req->rq_hp == 0) {
+ int opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+ /* Add to the high priority queue. */
+ list_move_tail(&req->rq_list, &svc->srv_request_hpq);
+ req->rq_hp = 1;
+ if (opc != OBD_PING)
+ DEBUG_REQ(D_NET, req, "high priority req");
+ }
+ spin_unlock(&req->rq_lock);
+ EXIT;
+}
+
+void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
+{
+ struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+ ENTRY;
+
+ spin_lock(&svc->srv_lock);
+ /* It may happen that the request is already taken for the processing
+ * but still in the export list, do not re-add it into the HP list. */
+ if (req->rq_phase == RQ_PHASE_NEW)
+ ptlrpc_hpreq_reorder_nolock(svc, req);
+ spin_unlock(&svc->srv_lock);
+ EXIT;
+}
+
+/** Check if the request is a high priority one. */
+static int ptlrpc_server_hpreq_check(struct ptlrpc_request *req)
+{
+ int opc, rc = 0;
+ ENTRY;
+
+ /* Check by request opc. */
+ opc = lustre_msg_get_opc(req->rq_reqmsg);
+ if (opc == OBD_PING)
+ RETURN(1);
+
+ /* Perform request specific check. */
+ if (req->rq_ops && req->rq_ops->hpreq_check)
+ rc = req->rq_ops->hpreq_check(req);
+ RETURN(rc);
+}
+
+/** Add a request to the regular or the high priority queue depending on its priority. */
+static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
+{
+ int rc;
+ ENTRY;
+
+ rc = ptlrpc_server_hpreq_check(req);
+ if (rc < 0)
+ RETURN(rc);
+
+ spin_lock(&svc->srv_lock);
+ /* Before inserting the request into the queue, check if it is not
+ * inserted yet, or even already handled -- it may happen due to
+ * a racing ldlm_server_blocking_ast(). */
+ if (req->rq_phase == RQ_PHASE_NEW && list_empty(&req->rq_list)) {
+ if (rc)
+ ptlrpc_hpreq_reorder_nolock(svc, req);
+ else
+ list_add_tail(&req->rq_list, &svc->srv_request_queue);
+ }
+ spin_unlock(&svc->srv_lock);
+
+ RETURN(0);
+}
+
+/* Only allow normal priority requests on a service that has a high-priority
+ * queue if forced (i.e. cleanup), if there are other high priority requests
+ * already being processed (i.e. those threads can service more high-priority
+ * requests), or if there are enough idle threads that a later thread can do
+ * a high priority request. */
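+/* E.g. with 8 running threads, normal requests are picked only while
+ * fewer than 6 requests are being actively served. */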
+static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
+{
+ return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
+ svc->srv_n_active_reqs < svc->srv_threads_running - 2;
+}
+
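+/**
+ * Pick the next request to handle: take a normal one if that is allowed
+ * and the HP queue is empty or has already been served srv_hpreq_ratio
+ * times in a row; otherwise take a high priority one.
+ */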
+static struct ptlrpc_request *
+ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
+{
+ struct ptlrpc_request *req = NULL;
+ ENTRY;
+
+ if (ptlrpc_server_allow_normal(svc, force) &&
+ !list_empty(&svc->srv_request_queue) &&
+ (list_empty(&svc->srv_request_hpq) ||
+ svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
+ req = list_entry(svc->srv_request_queue.next,
+ struct ptlrpc_request, rq_list);
+ svc->srv_hpreq_count = 0;
+ } else if (!list_empty(&svc->srv_request_hpq)) {
+ req = list_entry(svc->srv_request_hpq.next,
+ struct ptlrpc_request, rq_list);
+ svc->srv_hpreq_count++;
+ }
+ RETURN(req);
+}
+
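+/** Check if there is a request the calling thread is allowed to handle now. */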
+static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
+{
+ return ((ptlrpc_server_allow_normal(svc, force) &&
+ !list_empty(&svc->srv_request_queue)) ||
+ !list_empty(&svc->srv_request_hpq));
+}
+
/* Handle freshly incoming reqs, add to timed early reply list,
pass on to regular request queue */
static int
"illegal security flavor,");
}
- class_export_put(req->rq_export);
- req->rq_export = NULL;
if (rc)
goto err_req;
+ ptlrpc_update_export_timer(req->rq_export, 0);
}
/* req_in handling should/must be fast */
}
ptlrpc_at_add_timed(req);
+ rc = ptlrpc_hpreq_init(svc, req);
+ if (rc)
+ GOTO(err_req, rc);
/* Move it over to the request processing queue */
- spin_lock(&svc->srv_lock);
- list_add_tail(&req->rq_list, &svc->srv_request_queue);
+ rc = ptlrpc_server_request_add(svc, req);
+ if (rc)
+ GOTO(err_req, rc);
cfs_waitq_signal(&svc->srv_waitq);
- spin_unlock(&svc->srv_lock);
RETURN(1);
err_req:
struct timeval work_start;
struct timeval work_end;
long timediff;
- int rc;
+ int opc, rc;
+ int fail_opc = 0;
ENTRY;
LASSERT(svc);
spin_lock(&svc->srv_lock);
- if (unlikely(list_empty (&svc->srv_request_queue) ||
+ if (unlikely(!ptlrpc_server_request_pending(svc, 0) ||
(
#ifndef __KERNEL__
/* !@%$# liblustre only has 1 thread */
* That means we always need at least 2 service threads. */
spin_unlock(&svc->srv_lock);
RETURN(0);
+ }
+
+ request = ptlrpc_server_request_get(svc, 0);
+ if (request == NULL) {
+ spin_unlock(&svc->srv_lock);
+ RETURN(0);
+ }
+
+ opc = lustre_msg_get_opc(request->rq_reqmsg);
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT))
+ fail_opc = OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT;
+ else if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
+ fail_opc = OBD_FAIL_PTLRPC_HPREQ_TIMEOUT;
+
+ if (unlikely(fail_opc)) {
+ if (request->rq_export && request->rq_ops) {
+ spin_unlock(&svc->srv_lock);
+ OBD_FAIL_TIMEOUT(fail_opc, 4);
+ spin_lock(&svc->srv_lock);
+ request = ptlrpc_server_request_get(svc, 0);
+ if (request == NULL) {
+ spin_unlock(&svc->srv_lock);
+ RETURN(0);
+ }
+ LASSERT(ptlrpc_server_request_pending(svc, 0));
+ }
}
- request = list_entry (svc->srv_request_queue.next,
- struct ptlrpc_request, rq_list);
- list_del_init (&request->rq_list);
+ list_del_init(&request->rq_list);
svc->srv_n_queued_reqs--;
svc->srv_n_active_reqs++;
+ if (request->rq_hp)
+ svc->srv_n_hpreq++;
+ /* The phase is changed under the lock here because we need to know
+ * the request is under processing (see ptlrpc_hpreq_reorder()). */
+ ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
spin_unlock(&svc->srv_lock);
+ ptlrpc_hpreq_fini(request);
+
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
libcfs_debug_dumplog();
if (thread)
request->rq_svc_thread->t_env->le_ses = &request->rq_session;
- request->rq_export = class_conn2export(
- lustre_msg_get_handle(request->rq_reqmsg));
-
if (likely(request->rq_export)) {
if (unlikely(ptlrpc_check_req(request)))
goto put_conn;
goto put_rpc_export;
}
- ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
-
CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc "
"%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
(request->rq_export ?
if (export != NULL)
class_export_rpc_put(export);
put_conn:
- if (likely(request->rq_export != NULL))
- class_export_put(request->rq_export);
-
lu_context_exit(&request->rq_session);
lu_context_fini(&request->rq_session);
}
out_req:
+ spin_lock(&svc->srv_lock);
+ if (request->rq_hp)
+ svc->srv_n_hpreq--;
+ spin_unlock(&svc->srv_lock);
ptlrpc_server_finish_request(request);
RETURN(1);
svc->srv_rqbd_timeout == 0) ||
!list_empty(&svc->srv_req_in_queue) ||
!list_empty(&svc->srv_reply_queue) ||
- (!list_empty(&svc->srv_request_queue) &&
+ (ptlrpc_server_request_pending(svc, 0) &&
(svc->srv_n_active_reqs <
(svc->srv_threads_running - 1))) ||
svc->srv_at_check,
ptlrpc_at_check_timed(svc);
/* don't handle requests in the last thread */
- if (!list_empty (&svc->srv_request_queue) &&
+ if (ptlrpc_server_request_pending(svc, 0) &&
(svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) {
lu_context_enter(&env.le_ctx);
ptlrpc_server_handle_request(svc, thread);
service->srv_n_active_reqs++;
ptlrpc_server_finish_request(req);
}
- while (!list_empty(&service->srv_request_queue)) {
- struct ptlrpc_request *req =
- list_entry(service->srv_request_queue.next,
- struct ptlrpc_request,
- rq_list);
+ while (ptlrpc_server_request_pending(service, 1)) {
+ struct ptlrpc_request *req;
+ req = ptlrpc_server_request_get(service, 1);
list_del(&req->rq_list);
service->srv_n_queued_reqs--;
service->srv_n_active_reqs++;
-
+ ptlrpc_hpreq_fini(req);
ptlrpc_server_finish_request(req);
}
LASSERT(service->srv_n_queued_reqs == 0);
do_gettimeofday(&right_now);
spin_lock(&svc->srv_lock);
- if (list_empty(&svc->srv_request_queue)) {
+ if (!ptlrpc_server_request_pending(svc, 1)) {
spin_unlock(&svc->srv_lock);
return 0;
}
/* How long has the next entry been waiting? */
- request = list_entry(svc->srv_request_queue.next,
- struct ptlrpc_request, rq_list);
+ if (list_empty(&svc->srv_request_queue))
+ request = list_entry(svc->srv_request_hpq.next,
+ struct ptlrpc_request, rq_list);
+ else
+ request = list_entry(svc->srv_request_queue.next,
+ struct ptlrpc_request, rq_list);
timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
spin_unlock(&svc->srv_lock);
# End commit on sharing tests
+test_33() { #16129
+ for OPER in notimeout timeout ; do
+ rm $DIR1/$tfile 2>/dev/null
+ lock_in=0;
+ for f in `lctl get_param -n ldlm/namespaces/*/lock_timeouts`; do
+ lock_in=$(($lock_in + $f))
+ done
+ if [ $OPER == "timeout" ] ; then
+ for j in `seq $OSTCOUNT`; do
+ #define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT 0x510
+ do_facet ost$j lctl set_param fail_loc=0x510
+ done
+ echo lock should expire
+ else
+ for j in `seq $OSTCOUNT`; do
+ #define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT 0x511
+ do_facet ost$j lctl set_param fail_loc=0x511
+ done
+ echo lock should not expire
+ fi
+ echo writing on client1
+ dd if=/dev/zero of=$DIR1/$tfile count=100 conv=notrunc > /dev/null 2>&1
+ sync &
+ # wait for the flush
+ sleep 1
+ echo reading on client2
+ dd of=/dev/null if=$DIR2/$tfile > /dev/null 2>&1
+ # wait for a lock timeout
+ sleep 4
+ lock_out=0
+ for f in `lctl get_param -n ldlm/namespaces/*/lock_timeouts`; do
+ lock_out=$(($lock_out + $f))
+ done
+ if [ $OPER == "timeout" ] ; then
+ if [ $lock_in == $lock_out ]; then
+ error "no lock timeout happened"
+ else
+ echo "success"
+ fi
+ else
+ if [ $lock_in != $lock_out ]; then
+ error "lock timeout happened"
+ else
+ echo "success"
+ fi
+ fi
+ done
+}
+run_test 33 "no lock timeout under IO"
+
log "cleanup: ======================================================"
check_and_cleanup_lustre