From 230252cb2333cf26ff4045766546e63783235abf Mon Sep 17 00:00:00 2001 From: niu Date: Thu, 15 Jan 2004 03:54:12 +0000 Subject: [PATCH] * refine local rpc code * update from HEAD --- lustre/ptlrpc/ptlrpc_local.c | 289 +++++++------------------------------------ 1 file changed, 42 insertions(+), 247 deletions(-) diff --git a/lustre/ptlrpc/ptlrpc_local.c b/lustre/ptlrpc/ptlrpc_local.c index bf72175..99b618d 100644 --- a/lustre/ptlrpc/ptlrpc_local.c +++ b/lustre/ptlrpc/ptlrpc_local.c @@ -36,21 +36,12 @@ #include #include "ptlrpc_internal.h" - -struct ptlrpc_local_cs *ost_local_svc = NULL; -struct ptlrpc_local_cs *mdt_local_svc = NULL; - static int local_handle(struct ptlrpc_request *req, svc_handler_t handler) { int rc = 0; unsigned long flags; ENTRY; -/* - SIGNAL_MASK_LOCK(current, flags); - sigfillset(¤t->blocked); - RECALC_SIGPENDING; - SIGNAL_MASK_UNLOCK(current, flags); -*/ + req->rq_export = class_conn2export(&req->rq_reqmsg->handle); if (req->rq_export) { req->rq_connection = req->rq_export->exp_connection; @@ -68,10 +59,11 @@ static int local_handle(struct ptlrpc_request *req, svc_handler_t handler) req->rq_export->exp_last_request_time = LTIME_S(CURRENT_TIME); } else { + /* connection must has been established */ + LBUG(); /* create a (hopefully temporary) connection that will be used * to send the reply if this call doesn't create an export. * XXX revisit this when we revamp ptlrpc */ - LBUG(); req->rq_connection = ptlrpc_get_connection(&req->rq_peer, NULL); } @@ -101,97 +93,10 @@ out_putconn: } -static int localrpc_main(void *arg) -{ - struct ptlrpc_local_cs *svc = arg; - struct ptlrpc_thread *thread = svc->svc_thread; - svc_handler_t handler = svc->srv_handler; - unsigned long flags; - int rc; - - lock_kernel(); - ptlrpc_daemonize(); - - SIGNAL_MASK_LOCK(current, flags); - sigfillset(¤t->blocked); - RECALC_SIGPENDING; - SIGNAL_MASK_UNLOCK(current, flags); - - THREAD_NAME(current->comm, "%s", "localrpc"); - unlock_kernel(); - - thread->t_flags = SVC_RUNNING; - wake_up(&thread->t_ctl_waitq); - - while (1) { - struct ptlrpc_request *req; - struct l_wait_info lwi = { 0 }; - - l_wait_event(svc->req_waitq, - !list_empty(&svc->req_list) || - thread->t_flags & SVC_STOPPING, - &lwi); - - if (thread->t_flags & SVC_STOPPING) { - struct list_head *tmp, *n; - list_for_each_safe(tmp, n, &svc->req_list) { - req = list_entry(tmp, struct ptlrpc_request, - rq_list); - list_del_init(&req->rq_list); - } - break; - } - - spin_lock_irqsave(&svc->req_lock, flags); - req = list_entry(svc->req_list.next, struct ptlrpc_request, rq_list); - list_del_init(&req->rq_list); - spin_unlock_irqrestore(&svc->req_lock, flags); - - rc = local_handle(req, handler); - wake_up(&req->rq_reply_waitq); - - } - thread->t_flags = SVC_STOPPED; - wake_up(&thread->t_ctl_waitq); - RETURN(0); - -} - -static int localrpc_start_thread(struct ptlrpc_local_cs *svc) -{ - struct l_wait_info lwi = { 0 }; - struct ptlrpc_thread *thread; - int rc; - ENTRY; - - OBD_ALLOC(thread, sizeof(*thread)); - if (thread == NULL) - RETURN(-ENOMEM); - init_waitqueue_head(&thread->t_ctl_waitq); - - svc->svc_thread = thread; - - /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we - * just drop the VM and FILES in ptlrpc_daemonize() right away. - */ - rc = kernel_thread(localrpc_main, svc, CLONE_VM | CLONE_FILES); - if (rc < 0) { - CERROR("cannot start thread: %d\n", rc); - GOTO(out_free, rc); - } - l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi); - - RETURN(0); -out_free: - OBD_FREE(thread, sizeof(*thread)); - RETURN(rc); -} - -static int local_svc_available(struct ptlrpc_request *req) +int local_svc_available(struct ptlrpc_request *req) { int req_portal = req->rq_request_portal; - int req_opc = req->rq_reqmsg->opc; - int av_portal = 0, av_opc = 1; + int av_portal = 0; av_portal = (req_portal == MDS_REQUEST_PORTAL || req_portal == MDS_SETATTR_PORTAL || @@ -199,161 +104,55 @@ static int local_svc_available(struct ptlrpc_request *req) req_portal == OST_REQUEST_PORTAL || req_portal == OST_CREATE_PORTAL) ? 1 : 0; - /* XXX For debug: only LDLM_ENQUEUE available for local rpc */ - av_opc = (req_opc == LDLM_ENQUEUE) ? 1 : 0; - - return av_portal & av_opc; - -} - -static int _local_rpc_init(struct ptlrpc_local_cs **local_svc, - svc_handler_t handler) -{ - int rc = 0; - struct ptlrpc_local_cs *svc = NULL; - - OBD_ALLOC(svc, sizeof(*svc)); - if (svc == NULL) - RETURN( -ENOMEM); - - svc->srv_handler = handler; - INIT_LIST_HEAD(&svc->req_list); - spin_lock_init(&svc->req_lock); - init_waitqueue_head(&svc->req_waitq); - - rc = localrpc_start_thread(svc); - if (rc) { - OBD_FREE(svc, sizeof(*svc)); - RETURN(rc); - } - - *local_svc = svc; - RETURN(rc); -} - -int local_rpc_init(char *type, struct ptlrpc_local_cs **svc, - svc_handler_t handler) -{ - int rc = 0; - - if (strcmp(type, "ost") == 0) { - if (ost_local_svc == NULL) - rc = _local_rpc_init(&ost_local_svc, handler); - *svc = ost_local_svc; - - } else if (strcmp(type, "mdt") == 0) { - if (mdt_local_svc == NULL) - rc = _local_rpc_init(&mdt_local_svc, handler); - *svc = mdt_local_svc; - } else { - LBUG(); - } - - RETURN(rc); -} - -static void _local_rpc_cleanup(struct ptlrpc_local_cs *svc) -{ - unsigned long flags; - struct l_wait_info lw = { 0 }; - - spin_lock_irqsave(&svc->req_lock, flags); - svc->svc_thread->t_flags = SVC_STOPPING; - spin_unlock_irqrestore(&svc->req_lock, flags); - wake_up(&svc->req_waitq); - - l_wait_event(svc->svc_thread->t_ctl_waitq, - svc->svc_thread->t_flags & SVC_STOPPED, - &lw); - - OBD_FREE(svc->svc_thread, sizeof(*svc->svc_thread)); - OBD_FREE(svc, sizeof(*svc)); -} + return av_portal; -void local_rpc_cleanup(char *type) -{ - if (strcmp(type, "ost") == 0 && ost_local_svc != NULL) { - _local_rpc_cleanup(ost_local_svc); - ost_local_svc = NULL; - } else if (strcmp(type, "mdt") == 0 && mdt_local_svc != NULL) { - _local_rpc_cleanup(mdt_local_svc); - mdt_local_svc = NULL; - } } -#define SAME_THREAD int local_send_rpc(struct ptlrpc_request *req) { - struct client_obd *cli = &req->rq_import->imp_obd->u.cli; int rc; unsigned long flags; - struct l_wait_info lwi = { 0 }; struct obd_ucred ucred; void *journal_info; - int ngroups; + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; ENTRY; - /* XXX tight restriction for debug */ - if (local_svc_available(req) && cli->local_rpc != NULL && - req->rq_bulk == NULL) { - struct ptlrpc_local_cs *svc = cli->local_rpc; + LASSERT (req->rq_replen != 0); + req->rq_reqmsg->handle = req->rq_import->imp_remote_handle; + req->rq_reqmsg->conn_cnt = req->rq_import->imp_conn_cnt; - req->rq_reqmsg->handle = req->rq_import->imp_remote_handle; - req->rq_reqmsg->conn_cnt = req->rq_import->imp_conn_cnt; - - LASSERT (req->rq_replen != 0); - - spin_lock_irqsave (&req->rq_lock, flags); - req->rq_receiving_reply = 1; - req->rq_replied = 0; - req->rq_err = 0; - req->rq_timedout = 0; - req->rq_resend = 0; - req->rq_restart = 0; - spin_unlock_irqrestore (&request->rq_lock, flags); - - req->rq_sent = LTIME_S(CURRENT_TIME); -// ptlrpc_pinger_sending_on_import(req->rq_import); - req->rq_type = PTL_LOCAL_MSG_REQUEST; - -#ifndef SAME_THREAD - spin_lock_irqsave(&svc->req_lock, flags); - list_del_init(&req->rq_list); - list_add_tail(&req->rq_list, &svc->req_list); - spin_unlock_irqrestore(&svc->req_lock, flags); - wake_up(&svc->req_waitq); - - l_wait_event(req->rq_reply_waitq, req->rq_replied, &lwi); -#else - -/* - // for nesting journal - journal_info = current->journal_info; - current->journal_info = NULL; -*/ - ucred.ouc_fsuid = current->fsuid; - ucred.ouc_fsgid = current->fsgid; - ucred.ouc_cap = current->cap_effective; - ngroups = current->ngroups; - current->fsuid = current->fsgid = 0; - current->ngroups = 0; - current->cap_effective = -1; + spin_lock_irqsave (&req->rq_lock, flags); + req->rq_receiving_reply = 1; + req->rq_replied = 0; + req->rq_err = 0; + req->rq_timedout = 0; + req->rq_resend = 0; + req->rq_restart = 0; + spin_unlock_irqrestore (&request->rq_lock, flags); - rc = local_handle(req, svc->srv_handler); + req->rq_sent = LTIME_S(CURRENT_TIME); + ptlrpc_pinger_sending_on_import(req->rq_import); + req->rq_type = PTL_LOCAL_MSG_REQUEST; + + ucred.ouc_fsuid = current->fsuid; + ucred.ouc_fsgid = current->fsgid; + ucred.ouc_cap = current->cap_effective; + current->fsuid = current->fsgid = 0; + current->cap_effective = -1; + + /* for nesting journal on MDS and OST*/ + journal_info = current->journal_info; + current->journal_info = NULL; + + rc = local_handle(req, cli->srv_handler); + + current->journal_info = journal_info; - current->fsuid = ucred.ouc_fsuid; - current->fsgid = ucred.ouc_fsgid; - current->cap_effective = ucred.ouc_cap; - current->ngroups = ngroups; + current->fsuid = ucred.ouc_fsuid; + current->fsgid = ucred.ouc_fsgid; + current->cap_effective = ucred.ouc_cap; -// current->journal_info = journal_info; -#endif //SAME_THREAD - ptlrpc_lprocfs_rpc_sent(req); - - } else { - rc = -EOPNOTSUPP; - } - + ptlrpc_lprocfs_rpc_sent(req); RETURN(rc); } @@ -405,6 +204,7 @@ int local_bulk_move(struct ptlrpc_bulk_desc *desc) char *src_addr, *dst_addr; int len, i; unsigned long flags; + ENTRY; if (desc->bd_type == BULK_GET_SINK) { source = desc->bd_req->rq_bulk; @@ -418,7 +218,7 @@ int local_bulk_move(struct ptlrpc_bulk_desc *desc) LASSERT(source); LASSERT(dest); - /* XXX need more investigation for sparse file case */ + /* XXX need more investigation for sparse file case ? */ if (source->bd_page_count != dest->bd_page_count) goto done; @@ -443,12 +243,7 @@ done: desc->bd_network_rw = 0; desc->bd_complete = 1; spin_unlock_irqrestore(&desc->bd_lock, flags); -/* - if (desc->bd_req->rq_set != NULL) - wake_up (&desc->bd_req->rq_set->set_waitq); - else - wake_up (&desc->bd_req->rq_reply_waitq); -*/ + RETURN(0); } -- 1.8.3.1