#include <portals/types.h>
#include "ptlrpc_internal.h"
-
-struct ptlrpc_local_cs *ost_local_svc = NULL;
-struct ptlrpc_local_cs *mdt_local_svc = NULL;
-
static int local_handle(struct ptlrpc_request *req, svc_handler_t handler)
{
int rc = 0;
unsigned long flags;
ENTRY;
-/*
- SIGNAL_MASK_LOCK(current, flags);
- sigfillset(¤t->blocked);
- RECALC_SIGPENDING;
- SIGNAL_MASK_UNLOCK(current, flags);
-*/
+
req->rq_export = class_conn2export(&req->rq_reqmsg->handle);
if (req->rq_export) {
req->rq_connection = req->rq_export->exp_connection;
req->rq_export->exp_last_request_time =
LTIME_S(CURRENT_TIME);
} else {
+ /* connection must has been established */
+ LBUG();
/* create a (hopefully temporary) connection that will be used
* to send the reply if this call doesn't create an export.
* XXX revisit this when we revamp ptlrpc */
- LBUG();
req->rq_connection =
ptlrpc_get_connection(&req->rq_peer, NULL);
}
}
-static int localrpc_main(void *arg)
-{
- struct ptlrpc_local_cs *svc = arg;
- struct ptlrpc_thread *thread = svc->svc_thread;
- svc_handler_t handler = svc->srv_handler;
- unsigned long flags;
- int rc;
-
- lock_kernel();
- ptlrpc_daemonize();
-
- SIGNAL_MASK_LOCK(current, flags);
- sigfillset(¤t->blocked);
- RECALC_SIGPENDING;
- SIGNAL_MASK_UNLOCK(current, flags);
-
- THREAD_NAME(current->comm, "%s", "localrpc");
- unlock_kernel();
-
- thread->t_flags = SVC_RUNNING;
- wake_up(&thread->t_ctl_waitq);
-
- while (1) {
- struct ptlrpc_request *req;
- struct l_wait_info lwi = { 0 };
-
- l_wait_event(svc->req_waitq,
- !list_empty(&svc->req_list) ||
- thread->t_flags & SVC_STOPPING,
- &lwi);
-
- if (thread->t_flags & SVC_STOPPING) {
- struct list_head *tmp, *n;
- list_for_each_safe(tmp, n, &svc->req_list) {
- req = list_entry(tmp, struct ptlrpc_request,
- rq_list);
- list_del_init(&req->rq_list);
- }
- break;
- }
-
- spin_lock_irqsave(&svc->req_lock, flags);
- req = list_entry(svc->req_list.next, struct ptlrpc_request, rq_list);
- list_del_init(&req->rq_list);
- spin_unlock_irqrestore(&svc->req_lock, flags);
-
- rc = local_handle(req, handler);
- wake_up(&req->rq_reply_waitq);
-
- }
- thread->t_flags = SVC_STOPPED;
- wake_up(&thread->t_ctl_waitq);
- RETURN(0);
-
-}
-
-static int localrpc_start_thread(struct ptlrpc_local_cs *svc)
-{
- struct l_wait_info lwi = { 0 };
- struct ptlrpc_thread *thread;
- int rc;
- ENTRY;
-
- OBD_ALLOC(thread, sizeof(*thread));
- if (thread == NULL)
- RETURN(-ENOMEM);
- init_waitqueue_head(&thread->t_ctl_waitq);
-
- svc->svc_thread = thread;
-
- /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
- * just drop the VM and FILES in ptlrpc_daemonize() right away.
- */
- rc = kernel_thread(localrpc_main, svc, CLONE_VM | CLONE_FILES);
- if (rc < 0) {
- CERROR("cannot start thread: %d\n", rc);
- GOTO(out_free, rc);
- }
- l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi);
-
- RETURN(0);
-out_free:
- OBD_FREE(thread, sizeof(*thread));
- RETURN(rc);
-}
-
-static int local_svc_available(struct ptlrpc_request *req)
+int local_svc_available(struct ptlrpc_request *req)
{
int req_portal = req->rq_request_portal;
- int req_opc = req->rq_reqmsg->opc;
- int av_portal = 0, av_opc = 1;
+ int av_portal = 0;
av_portal = (req_portal == MDS_REQUEST_PORTAL ||
req_portal == MDS_SETATTR_PORTAL ||
req_portal == OST_REQUEST_PORTAL ||
req_portal == OST_CREATE_PORTAL) ? 1 : 0;
- /* XXX For debug: only LDLM_ENQUEUE available for local rpc */
- av_opc = (req_opc == LDLM_ENQUEUE) ? 1 : 0;
-
- return av_portal & av_opc;
-
-}
-
-static int _local_rpc_init(struct ptlrpc_local_cs **local_svc,
- svc_handler_t handler)
-{
- int rc = 0;
- struct ptlrpc_local_cs *svc = NULL;
-
- OBD_ALLOC(svc, sizeof(*svc));
- if (svc == NULL)
- RETURN( -ENOMEM);
-
- svc->srv_handler = handler;
- INIT_LIST_HEAD(&svc->req_list);
- spin_lock_init(&svc->req_lock);
- init_waitqueue_head(&svc->req_waitq);
-
- rc = localrpc_start_thread(svc);
- if (rc) {
- OBD_FREE(svc, sizeof(*svc));
- RETURN(rc);
- }
-
- *local_svc = svc;
- RETURN(rc);
-}
-
-int local_rpc_init(char *type, struct ptlrpc_local_cs **svc,
- svc_handler_t handler)
-{
- int rc = 0;
-
- if (strcmp(type, "ost") == 0) {
- if (ost_local_svc == NULL)
- rc = _local_rpc_init(&ost_local_svc, handler);
- *svc = ost_local_svc;
-
- } else if (strcmp(type, "mdt") == 0) {
- if (mdt_local_svc == NULL)
- rc = _local_rpc_init(&mdt_local_svc, handler);
- *svc = mdt_local_svc;
- } else {
- LBUG();
- }
-
- RETURN(rc);
-}
-
-static void _local_rpc_cleanup(struct ptlrpc_local_cs *svc)
-{
- unsigned long flags;
- struct l_wait_info lw = { 0 };
-
- spin_lock_irqsave(&svc->req_lock, flags);
- svc->svc_thread->t_flags = SVC_STOPPING;
- spin_unlock_irqrestore(&svc->req_lock, flags);
- wake_up(&svc->req_waitq);
-
- l_wait_event(svc->svc_thread->t_ctl_waitq,
- svc->svc_thread->t_flags & SVC_STOPPED,
- &lw);
-
- OBD_FREE(svc->svc_thread, sizeof(*svc->svc_thread));
- OBD_FREE(svc, sizeof(*svc));
-}
+ return av_portal;
-void local_rpc_cleanup(char *type)
-{
- if (strcmp(type, "ost") == 0 && ost_local_svc != NULL) {
- _local_rpc_cleanup(ost_local_svc);
- ost_local_svc = NULL;
- } else if (strcmp(type, "mdt") == 0 && mdt_local_svc != NULL) {
- _local_rpc_cleanup(mdt_local_svc);
- mdt_local_svc = NULL;
- }
}
-#define SAME_THREAD
int local_send_rpc(struct ptlrpc_request *req)
{
- struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
int rc;
unsigned long flags;
- struct l_wait_info lwi = { 0 };
struct obd_ucred ucred;
void *journal_info;
- int ngroups;
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
ENTRY;
- /* XXX tight restriction for debug */
- if (local_svc_available(req) && cli->local_rpc != NULL &&
- req->rq_bulk == NULL) {
- struct ptlrpc_local_cs *svc = cli->local_rpc;
+ LASSERT (req->rq_replen != 0);
+ req->rq_reqmsg->handle = req->rq_import->imp_remote_handle;
+ req->rq_reqmsg->conn_cnt = req->rq_import->imp_conn_cnt;
- req->rq_reqmsg->handle = req->rq_import->imp_remote_handle;
- req->rq_reqmsg->conn_cnt = req->rq_import->imp_conn_cnt;
-
- LASSERT (req->rq_replen != 0);
-
- spin_lock_irqsave (&req->rq_lock, flags);
- req->rq_receiving_reply = 1;
- req->rq_replied = 0;
- req->rq_err = 0;
- req->rq_timedout = 0;
- req->rq_resend = 0;
- req->rq_restart = 0;
- spin_unlock_irqrestore (&request->rq_lock, flags);
-
- req->rq_sent = LTIME_S(CURRENT_TIME);
-// ptlrpc_pinger_sending_on_import(req->rq_import);
- req->rq_type = PTL_LOCAL_MSG_REQUEST;
-
-#ifndef SAME_THREAD
- spin_lock_irqsave(&svc->req_lock, flags);
- list_del_init(&req->rq_list);
- list_add_tail(&req->rq_list, &svc->req_list);
- spin_unlock_irqrestore(&svc->req_lock, flags);
- wake_up(&svc->req_waitq);
-
- l_wait_event(req->rq_reply_waitq, req->rq_replied, &lwi);
-#else
-
-/*
- // for nesting journal
- journal_info = current->journal_info;
- current->journal_info = NULL;
-*/
- ucred.ouc_fsuid = current->fsuid;
- ucred.ouc_fsgid = current->fsgid;
- ucred.ouc_cap = current->cap_effective;
- ngroups = current->ngroups;
- current->fsuid = current->fsgid = 0;
- current->ngroups = 0;
- current->cap_effective = -1;
+ spin_lock_irqsave (&req->rq_lock, flags);
+ req->rq_receiving_reply = 1;
+ req->rq_replied = 0;
+ req->rq_err = 0;
+ req->rq_timedout = 0;
+ req->rq_resend = 0;
+ req->rq_restart = 0;
+ spin_unlock_irqrestore (&request->rq_lock, flags);
- rc = local_handle(req, svc->srv_handler);
+ req->rq_sent = LTIME_S(CURRENT_TIME);
+ ptlrpc_pinger_sending_on_import(req->rq_import);
+ req->rq_type = PTL_LOCAL_MSG_REQUEST;
+
+ ucred.ouc_fsuid = current->fsuid;
+ ucred.ouc_fsgid = current->fsgid;
+ ucred.ouc_cap = current->cap_effective;
+ current->fsuid = current->fsgid = 0;
+ current->cap_effective = -1;
+
+ /* for nesting journal on MDS and OST*/
+ journal_info = current->journal_info;
+ current->journal_info = NULL;
+
+ rc = local_handle(req, cli->srv_handler);
+
+ current->journal_info = journal_info;
- current->fsuid = ucred.ouc_fsuid;
- current->fsgid = ucred.ouc_fsgid;
- current->cap_effective = ucred.ouc_cap;
- current->ngroups = ngroups;
+ current->fsuid = ucred.ouc_fsuid;
+ current->fsgid = ucred.ouc_fsgid;
+ current->cap_effective = ucred.ouc_cap;
-// current->journal_info = journal_info;
-#endif //SAME_THREAD
- ptlrpc_lprocfs_rpc_sent(req);
-
- } else {
- rc = -EOPNOTSUPP;
- }
-
+ ptlrpc_lprocfs_rpc_sent(req);
RETURN(rc);
}
char *src_addr, *dst_addr;
int len, i;
unsigned long flags;
+ ENTRY;
if (desc->bd_type == BULK_GET_SINK) {
source = desc->bd_req->rq_bulk;
LASSERT(source);
LASSERT(dest);
- /* XXX need more investigation for sparse file case */
+ /* XXX need more investigation for sparse file case ? */
if (source->bd_page_count != dest->bd_page_count)
goto done;
desc->bd_network_rw = 0;
desc->bd_complete = 1;
spin_unlock_irqrestore(&desc->bd_lock, flags);
-/*
- if (desc->bd_req->rq_set != NULL)
- wake_up (&desc->bd_req->rq_set->set_waitq);
- else
- wake_up (&desc->bd_req->rq_reply_waitq);
-*/
+
RETURN(0);
}