void
ptlrpc_save_lock (struct ptlrpc_request *req,
- struct lustre_handle *lock, int mode)
+ struct lustre_handle *lock, int mode, int no_ack)
{
struct ptlrpc_reply_state *rs = req->rq_reply_state;
int idx;
rs->rs_locks[idx] = *lock;
rs->rs_modes[idx] = mode;
rs->rs_difficult = 1;
+ rs->rs_no_ack = !!no_ack;
}
void
ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
{
struct ptlrpc_service *svc = rs->rs_service;
+ ENTRY;
#ifdef CONFIG_SMP
LASSERT (spin_is_locked (&svc->srv_lock));
LASSERT (rs->rs_difficult);
rs->rs_scheduled_ever = 1; /* flag any notification attempt */
- if (rs->rs_scheduled) /* being set up or already notified */
+ if (rs->rs_scheduled) { /* being set up or already notified */
+ EXIT;
return;
+ }
rs->rs_scheduled = 1;
list_del (&rs->rs_list);
list_add (&rs->rs_list, &svc->srv_reply_queue);
cfs_waitq_signal (&svc->srv_waitq);
+ EXIT;
}
void
{
struct list_head *tmp;
struct list_head *nxt;
+ ENTRY;
/* Find any replies that have been committed and get their service
* to attend to complete them. */
}
spin_unlock(&obd->obd_uncommitted_replies_lock);
+ EXIT;
}
static int
req->rq_req_swab_mask = 0;
rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
- if (rc < 0) {
+ if (rc != 0) {
CERROR("error unpacking request: ptl %d from %s x"LPU64"\n",
svc->srv_req_portal, libcfs_id2str(req->rq_peer),
req->rq_xid);
goto err_req;
}
- if (rc > 0)
- lustre_set_req_swabbed(req, MSG_PTLRPC_HEADER_OFF);
-
rc = lustre_unpack_req_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
if (rc) {
CERROR ("error unpacking ptlrpc body: ptl %d from %s x"
if (!rs->rs_on_net) {
/* Off the net */
svc->srv_n_difficult_replies--;
+ if (svc->srv_n_difficult_replies == 0 && svc->srv_is_stopping)
+ /* wake up threads that are being stopped by
+ ptlrpc_unregister_service/ptlrpc_stop_threads
+ and sleep waiting svr_n_difficult_replies == 0 */
+ cfs_waitq_broadcast(&svc->srv_waitq);
spin_unlock(&svc->srv_lock);
class_export_put (exp);
struct ptlrpc_thread *thread)
{
struct l_wait_info lwi = { 0 };
+ ENTRY;
+ CDEBUG(D_RPCTRACE, "Stopping thread %p\n", thread);
spin_lock(&svc->srv_lock);
thread->t_flags = SVC_STOPPING;
spin_unlock(&svc->srv_lock);
spin_unlock(&svc->srv_lock);
OBD_FREE_PTR(thread);
+ EXIT;
}
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
{
struct ptlrpc_thread *thread;
+ ENTRY;
spin_lock(&svc->srv_lock);
while (!list_empty(&svc->srv_threads)) {
}
spin_unlock(&svc->srv_lock);
+ EXIT;
}
int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
struct l_wait_info lwi;
struct list_head *tmp;
struct ptlrpc_reply_state *rs, *t;
+ ENTRY;
+ service->srv_is_stopping = 1;
cfs_timer_disarm(&service->srv_at_timer);
ptlrpc_stop_all_threads(service);
cfs_timer_disarm(&service->srv_at_timer);
OBD_FREE_PTR(service);
- return 0;
+ RETURN(0);
}
/* Returns 0 if the service is healthy.