struct list_head cli_sent_head;
struct list_head cli_replied_head;
struct list_head cli_replay_head;
- struct list_head cli_ha_item;
+ struct list_head cli_dying_head;
+ struct list_head cli_ha_item;
void (*cli_recover)(struct ptlrpc_client *);
struct recovd_obd *cli_recovd;
struct ptlrpc_request {
int rq_type; /* one of PTL_RPC_REQUEST, PTL_RPC_REPLY, PTL_RPC_BULK */
- spinlock_t rq_lock;
struct list_head rq_list;
struct obd_device *rq_obd;
int rq_status;
ptl_handle_me_t b_me_h;
};
+struct ptlrpc_thread {
+ struct list_head t_link;
+
+ __u32 t_flags;
+ wait_queue_head_t t_ctl_waitq;
+};
+
struct ptlrpc_service {
time_t srv_time;
time_t srv_timeout;
/* event queue */
ptl_handle_eq_t srv_eq_h;
- __u32 srv_flags;
struct lustre_peer srv_self;
- ptl_process_id_t srv_id;
- struct task_struct *srv_thread;
- wait_queue_head_t srv_waitq;
- wait_queue_head_t srv_ctl_waitq;
+ wait_queue_head_t srv_waitq; /* all threads sleep on this */
spinlock_t srv_lock;
struct list_head srv_reqs;
+ struct list_head srv_threads;
int (*srv_handler)(struct obd_device *obddev,
struct ptlrpc_service *svc,
struct ptlrpc_request *req);
struct ptlrpc_service *
ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid,
svc_handler_t);
-void ptlrpc_stop_thread(struct ptlrpc_service *svc);
+void ptlrpc_stop_all_threads(struct ptlrpc_service *svc);
int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
char *name);
int rpc_unregister_service(struct ptlrpc_service *service);
struct ptlrpc_svc_data {
char *name;
struct ptlrpc_service *svc;
+ struct ptlrpc_thread *thread;
struct obd_device *dev;
};
INIT_LIST_HEAD(&cl->cli_sent_head);
INIT_LIST_HEAD(&cl->cli_replied_head);
INIT_LIST_HEAD(&cl->cli_replay_head);
+ INIT_LIST_HEAD(&cl->cli_dying_head);
spin_lock_init(&cl->cli_lock);
sema_init(&cl->cli_rpc_sem, 32);
}
break;
/* retain for replay if flagged */
+ list_del(&req->rq_list);
if (req->rq_flags & PTL_RPC_FL_RETAIN) {
- list_del(&req->rq_list);
list_add(&req->rq_list, &cli->cli_replay_head);
} else {
CDEBUG(D_INFO, "Marking request %p as committed ("
req->rq_transno, cli->cli_last_committed);
if (atomic_dec_and_test(&req->rq_refcount))
ptlrpc_free_req(req);
+ else
+ list_add(&req->rq_list, &cli->cli_dying_head);
}
}
list_for_each_safe(tmp, saved, &cli->cli_replied_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
/* We do this to prevent ptlrpc_free_req from taking cli_lock */
- CDEBUG(D_INFO, "Cleaning req %p from replied head.\n", req);
+ CDEBUG(D_INFO, "Cleaning req %p from replied list.\n", req);
list_del(&req->rq_list);
req->rq_client = NULL;
ptlrpc_free_req(req);
}
list_for_each_safe(tmp, saved, &cli->cli_sent_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
- CDEBUG(D_INFO, "Cleaning req %p from sent head.\n", req);
+ CDEBUG(D_INFO, "Cleaning req %p from sent list.\n", req);
list_del(&req->rq_list);
req->rq_client = NULL;
ptlrpc_free_req(req);
}
list_for_each_safe(tmp, saved, &cli->cli_replay_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
- CERROR("Request %p is on the replay head at cleanup!\n", req);
+ CERROR("Request %p is on the replay list at cleanup!\n", req);
list_del(&req->rq_list);
req->rq_client = NULL;
ptlrpc_free_req(req);
}
list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
- CDEBUG(D_INFO, "Cleaning req %p from sending head.\n", req);
+ CDEBUG(D_INFO, "Cleaning req %p from sending list.\n", req);
+ list_del(&req->rq_list);
+ req->rq_client = NULL;
+ ptlrpc_free_req(req);
+ }
+ list_for_each_safe(tmp, saved, &cli->cli_dying_head) {
+ req = list_entry(tmp, struct ptlrpc_request, rq_list);
+ CERROR("Request %p is on the dying list at cleanup!\n", req);
list_del(&req->rq_list);
req->rq_client = NULL;
ptlrpc_free_req(req);
extern int request_in_callback(ptl_event_t *ev, void *data);
extern int ptl_handled_rpc(struct ptlrpc_service *service, void *start);
-static int ptlrpc_check_event(struct ptlrpc_service *svc, ptl_event_t *event)
+static int ptlrpc_check_event(struct ptlrpc_service *svc,
+ struct ptlrpc_thread *thread, ptl_event_t *event)
{
int rc = 0;
ENTRY;
if (sigismember(&(current->pending.signal), SIGKILL) ||
sigismember(&(current->pending.signal), SIGTERM) ||
sigismember(&(current->pending.signal), SIGINT)) {
- svc->srv_flags |= SVC_KILLED;
+ thread->t_flags |= SVC_KILLED;
GOTO(out, rc = 1);
}
- if (svc->srv_flags & SVC_STOPPING)
+ if (thread->t_flags & SVC_STOPPING)
GOTO(out, rc = 1);
if (ptl_is_valid_handle(&svc->srv_eq_h)) {
err = PtlEQGet(svc->srv_eq_h, event);
if (err == PTL_OK) {
- svc->srv_flags |= SVC_EVENT;
+ thread->t_flags |= SVC_EVENT;
GOTO(out, rc = 1);
}
spin_lock_init(&service->srv_lock);
INIT_LIST_HEAD(&service->srv_reqs);
- init_waitqueue_head(&service->srv_ctl_waitq);
+ INIT_LIST_HEAD(&service->srv_threads);
init_waitqueue_head(&service->srv_waitq);
- service->srv_thread = NULL;
- service->srv_flags = 0;
-
service->srv_buf_size = bufsize;
service->srv_rep_portal = rep_portal;
service->srv_req_portal = req_portal;
}
service->srv_ring_length = RPC_RING_LENGTH;
- service->srv_id.nid = PTL_ID_ANY;
- service->srv_id.pid = PTL_ID_ANY;
rc = PtlEQAlloc(service->srv_self.peer_ni, 128, request_in_callback,
service, &(service->srv_eq_h));
LBUG();
}
- svc->srv_flags &= ~SVC_EVENT;
-
spin_unlock(&svc->srv_lock);
rc = svc->srv_handler(obddev, svc, &request);
ptlrpc_put_connection(request.rq_connection);
struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
struct obd_device *obddev = data->dev;
struct ptlrpc_service *svc = data->svc;
+ struct ptlrpc_thread *thread = data->thread;
ENTRY;
sprintf(current->comm, data->name);
- /* Record that the thread is running */
- svc->srv_thread = current;
- svc->srv_flags = SVC_RUNNING;
- wake_up(&svc->srv_ctl_waitq);
+ /* Record that the thread is running */
+ thread->t_flags = SVC_RUNNING;
+ wake_up(&thread->t_ctl_waitq);
/* XXX maintain a list of all managed devices: insert here */
while (1) {
ptl_event_t event;
- wait_event(svc->srv_waitq, ptlrpc_check_event(svc, &event));
+ wait_event(svc->srv_waitq,
+ ptlrpc_check_event(svc, thread, &event));
spin_lock(&svc->srv_lock);
- if (svc->srv_flags & SVC_SIGNAL) {
- svc->srv_flags &= ~SVC_SIGNAL;
+ if (thread->t_flags & SVC_SIGNAL) {
+ thread->t_flags &= ~SVC_SIGNAL;
spin_unlock(&svc->srv_lock);
EXIT;
break;
}
- if (svc->srv_flags & SVC_STOPPING) {
- svc->srv_flags &= ~SVC_STOPPING;
+ if (thread->t_flags & SVC_STOPPING) {
+ thread->t_flags &= ~SVC_STOPPING;
spin_unlock(&svc->srv_lock);
EXIT;
break;
}
- if (svc->srv_flags & SVC_EVENT) {
- svc->srv_flags &= ~SVC_EVENT;
+ if (thread->t_flags & SVC_EVENT) {
+ thread->t_flags &= ~SVC_EVENT;
rc = handle_incoming_request(obddev, svc, &event);
+ thread->t_flags &= ~SVC_EVENT;
continue;
}
break;
}
- svc->srv_thread = NULL;
- svc->srv_flags = SVC_STOPPED;
- wake_up(&svc->srv_ctl_waitq);
- CDEBUG(D_NET, "svc exiting process %d\n", current->pid);
+ thread->t_flags = SVC_STOPPED;
+ wake_up(&thread->t_ctl_waitq);
+ CDEBUG(D_NET, "service thread exiting, process %d\n", current->pid);
return 0;
}
-void ptlrpc_stop_thread(struct ptlrpc_service *svc)
+static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
+ struct ptlrpc_thread *thread)
{
- svc->srv_flags = SVC_STOPPING;
+ thread->t_flags = SVC_STOPPING;
wake_up(&svc->srv_waitq);
- wait_event_interruptible(svc->srv_ctl_waitq,
- (svc->srv_flags & SVC_STOPPED));
+ wait_event_interruptible(thread->t_ctl_waitq,
+ (thread->t_flags & SVC_STOPPED));
+}
+
+void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
+{
+ spin_lock(&svc->srv_lock);
+ while (!list_empty(&svc->srv_threads)) {
+ struct ptlrpc_thread *thread;
+ thread = list_entry(svc->srv_threads.next, struct ptlrpc_thread,
+ t_link);
+ spin_unlock(&svc->srv_lock);
+ ptlrpc_stop_thread(svc, thread);
+ spin_lock(&svc->srv_lock);
+ list_del(&thread->t_link);
+ OBD_FREE(thread, sizeof(*thread));
+ }
+ spin_unlock(&svc->srv_lock);
}
int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
- char *name)
+ char *name)
{
struct ptlrpc_svc_data d;
+ struct ptlrpc_thread *thread;
int rc;
ENTRY;
+ OBD_ALLOC(thread, sizeof(*thread));
+ if (thread == NULL) {
+ LBUG();
+ RETURN(-ENOMEM);
+ }
+ init_waitqueue_head(&thread->t_ctl_waitq);
+
d.dev = dev;
d.svc = svc;
d.name = name;
+ d.thread = thread;
- init_waitqueue_head(&svc->srv_waitq);
+ spin_lock(&svc->srv_lock);
+ list_add(&thread->t_link, &svc->srv_threads);
+ spin_unlock(&svc->srv_lock);
- init_waitqueue_head(&svc->srv_ctl_waitq);
rc = kernel_thread(ptlrpc_main, (void *) &d,
CLONE_VM | CLONE_FS | CLONE_FILES);
if (rc < 0) {
CERROR("cannot start thread\n");
RETURN(-EINVAL);
}
- wait_event(svc->srv_ctl_waitq, svc->srv_flags & SVC_RUNNING);
+ wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING);
RETURN(0);
}