Whamcloud - gitweb
b: 1963
authorniu <niu>
Fri, 9 Jan 2004 07:02:28 +0000 (07:02 +0000)
committerniu <niu>
Fri, 9 Jan 2004 07:02:28 +0000 (07:02 +0000)
The tag name is b_localprc NOT b_localrpc, sorry for make this mistake.
Following is description for how to trace the bug of local rpc in uml.

* create tag b_localprc and commit in localrpc patch for test and debug.

* How to active localrpc?
  Run lctl, then select MDT device, then 'probe 8'. thus some requests form
  MDC to MDT will go through local rpc.

* How to repeat the bug?
  Startup Lustre in uml by llmount.sh, then run tests/localrpc_test.sh.

* Process local rpc request in another thread.
  Comment off the SAME_THREAD in ptlrpc_local.c will make service hander of
  local rpc be executed in another thread, this way can pass test 99.

* For debug, I disabled all other local rpc request except the LDLM_ENQUEUE
  requests sent via ptlrpc_queue_wait.

lustre/ptlrpc/ptlrpc_local.c [new file with mode: 0644]
lustre/tests/localrpc_test.sh [new file with mode: 0755]

diff --git a/lustre/ptlrpc/ptlrpc_local.c b/lustre/ptlrpc/ptlrpc_local.c
new file mode 100644 (file)
index 0000000..bf72175
--- /dev/null
@@ -0,0 +1,456 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2002 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define DEBUG_SUBSYSTEM S_RPC
+
+#ifndef EXPORT_SYMTAB
+#define EXPORT_SYMTAB
+#endif
+
+#ifndef __KERNEL__
+#include <liblustre.h>
+#include <linux/kp30.h>
+#endif
+#include <linux/obd_support.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_net.h>
+#include <portals/types.h>
+#include "ptlrpc_internal.h"
+
+
+struct ptlrpc_local_cs *ost_local_svc = NULL;
+struct ptlrpc_local_cs *mdt_local_svc = NULL;
+
+static int local_handle(struct ptlrpc_request *req, svc_handler_t handler)
+{
+        int rc = 0;
+        unsigned long flags;
+        ENTRY;
+/*
+        SIGNAL_MASK_LOCK(current, flags);
+        sigfillset(&current->blocked);
+        RECALC_SIGPENDING;
+        SIGNAL_MASK_UNLOCK(current, flags);
+*/
+        req->rq_export = class_conn2export(&req->rq_reqmsg->handle);
+        if (req->rq_export) {
+                req->rq_connection = req->rq_export->exp_connection;
+                ptlrpc_connection_addref(req->rq_connection);
+                if (req->rq_reqmsg->conn_cnt < 
+                    req->rq_export->exp_conn_cnt) {
+                        DEBUG_REQ(D_ERROR, req,
+                                  "DROPPING req from old connection %d < %d",
+                                  req->rq_reqmsg->conn_cnt,
+                                  req->rq_export->exp_conn_cnt);
+                        rc = -EINVAL;
+                        goto out_putconn;
+                }
+
+                req->rq_export->exp_last_request_time =
+                        LTIME_S(CURRENT_TIME);
+        } else {
+                /* create a (hopefully temporary) connection that will be used
+                 * to send the reply if this call doesn't create an export.
+                 * XXX revisit this when we revamp ptlrpc */
+                LBUG();
+                req->rq_connection =
+                        ptlrpc_get_connection(&req->rq_peer, NULL);
+        }
+
+        rc = handler(req);
+
+        spin_lock_irqsave(&req->rq_lock, flags);
+        memset(req->rq_ack_locks, 0, sizeof(req->rq_ack_locks));
+        if (req->rq_bulk)
+                req->rq_bulk->bd_complete = 1;
+        req->rq_repmsg->type = (req->rq_type == PTL_LOCAL_MSG_ERR) ?
+                PTL_RPC_MSG_ERR : PTL_RPC_MSG_REPLY;
+        req->rq_repmsg->status = req->rq_status;
+        req->rq_repmsg->opc = req->rq_reqmsg->opc;
+        req->rq_receiving_reply = 0;
+        req->rq_replied = 1;
+        spin_unlock_irqrestore(&req->rq_lock, flags);
+
+out_putconn:
+        ptlrpc_put_connection(req->rq_connection);
+        if (req->rq_export != NULL) {
+                class_export_put(req->rq_export);
+                req->rq_export = NULL;
+        }
+        
+        RETURN(rc);
+
+}
+
+static int localrpc_main(void *arg)
+{
+        struct ptlrpc_local_cs *svc = arg;
+        struct ptlrpc_thread *thread = svc->svc_thread;
+        svc_handler_t handler = svc->srv_handler;
+        unsigned long flags;
+        int rc;
+        
+        lock_kernel();
+        ptlrpc_daemonize();
+
+        SIGNAL_MASK_LOCK(current, flags);
+        sigfillset(&current->blocked);
+        RECALC_SIGPENDING;
+        SIGNAL_MASK_UNLOCK(current, flags);
+
+        THREAD_NAME(current->comm, "%s", "localrpc");
+        unlock_kernel();
+
+        thread->t_flags = SVC_RUNNING;
+        wake_up(&thread->t_ctl_waitq);
+
+        while (1) {
+                struct ptlrpc_request *req;
+                struct l_wait_info lwi = { 0 };
+                
+                l_wait_event(svc->req_waitq, 
+                             !list_empty(&svc->req_list) || 
+                             thread->t_flags & SVC_STOPPING, 
+                             &lwi);
+
+                if (thread->t_flags & SVC_STOPPING) {
+                        struct list_head *tmp, *n;
+                        list_for_each_safe(tmp, n, &svc->req_list) {
+                                req = list_entry(tmp, struct ptlrpc_request, 
+                                                 rq_list);
+                                list_del_init(&req->rq_list);
+                        }
+                        break;
+                }
+                
+                spin_lock_irqsave(&svc->req_lock, flags);
+                req = list_entry(svc->req_list.next, struct ptlrpc_request, rq_list);
+                list_del_init(&req->rq_list);
+                spin_unlock_irqrestore(&svc->req_lock, flags);
+
+                rc = local_handle(req, handler);
+                wake_up(&req->rq_reply_waitq);
+        
+        }
+        thread->t_flags = SVC_STOPPED;
+        wake_up(&thread->t_ctl_waitq);
+        RETURN(0);
+
+}       
+
+static int localrpc_start_thread(struct ptlrpc_local_cs *svc)
+{
+        struct l_wait_info lwi = { 0 };
+        struct ptlrpc_thread *thread;
+        int rc;
+        ENTRY;
+
+        OBD_ALLOC(thread, sizeof(*thread));
+        if (thread == NULL) 
+                RETURN(-ENOMEM);
+        init_waitqueue_head(&thread->t_ctl_waitq);
+
+        svc->svc_thread = thread;
+
+        /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
+         * just drop the VM and FILES in ptlrpc_daemonize() right away.
+         */
+        rc = kernel_thread(localrpc_main, svc, CLONE_VM | CLONE_FILES);
+        if (rc < 0) {
+                CERROR("cannot start thread: %d\n", rc);
+                GOTO(out_free, rc);
+        }
+        l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi);
+        
+        RETURN(0);
+out_free:
+        OBD_FREE(thread, sizeof(*thread));
+        RETURN(rc);
+}
+
+static int local_svc_available(struct ptlrpc_request *req)
+{
+        int req_portal = req->rq_request_portal;
+        int req_opc = req->rq_reqmsg->opc;
+        int av_portal = 0, av_opc = 1;
+
+        av_portal =  (req_portal == MDS_REQUEST_PORTAL ||
+                      req_portal == MDS_SETATTR_PORTAL ||
+                      req_portal == MDS_READPAGE_PORTAL ||
+                      req_portal == OST_REQUEST_PORTAL ||
+                      req_portal == OST_CREATE_PORTAL) ? 1 : 0;
+        
+        /* XXX For debug: only LDLM_ENQUEUE available for local rpc */
+        av_opc = (req_opc == LDLM_ENQUEUE) ? 1 : 0;
+        
+        return av_portal & av_opc;
+
+}
+
+static int _local_rpc_init(struct ptlrpc_local_cs **local_svc, 
+                           svc_handler_t handler)
+{
+        int rc = 0;
+        struct ptlrpc_local_cs *svc = NULL;
+        
+        OBD_ALLOC(svc, sizeof(*svc));
+        if (svc == NULL)
+                RETURN( -ENOMEM);
+        svc->srv_handler = handler;
+        INIT_LIST_HEAD(&svc->req_list);
+        spin_lock_init(&svc->req_lock);
+        init_waitqueue_head(&svc->req_waitq);
+        
+        rc = localrpc_start_thread(svc);
+        if (rc) {
+                OBD_FREE(svc, sizeof(*svc));
+                RETURN(rc);
+        }
+
+        *local_svc = svc;
+        RETURN(rc);
+}
+
+int local_rpc_init(char *type, struct ptlrpc_local_cs **svc, 
+                   svc_handler_t handler)
+{
+        int rc = 0;
+
+        if (strcmp(type, "ost") == 0) {
+                if (ost_local_svc == NULL)
+                        rc = _local_rpc_init(&ost_local_svc, handler);
+                *svc = ost_local_svc;
+        
+        } else if (strcmp(type, "mdt") == 0) {
+                if (mdt_local_svc == NULL)
+                        rc = _local_rpc_init(&mdt_local_svc, handler);
+                *svc = mdt_local_svc;
+        } else {
+                LBUG();
+        }
+
+        RETURN(rc);
+}
+
+static void _local_rpc_cleanup(struct ptlrpc_local_cs *svc)
+{
+        unsigned long flags;
+        struct l_wait_info lw = { 0 };
+        
+        spin_lock_irqsave(&svc->req_lock, flags);
+        svc->svc_thread->t_flags = SVC_STOPPING;
+        spin_unlock_irqrestore(&svc->req_lock, flags);
+        wake_up(&svc->req_waitq);
+        
+        l_wait_event(svc->svc_thread->t_ctl_waitq,
+                     svc->svc_thread->t_flags & SVC_STOPPED,
+                     &lw);
+        
+        OBD_FREE(svc->svc_thread, sizeof(*svc->svc_thread));
+        OBD_FREE(svc, sizeof(*svc));
+}
+
+void local_rpc_cleanup(char *type)
+{
+        if (strcmp(type, "ost") == 0 && ost_local_svc != NULL) {
+                _local_rpc_cleanup(ost_local_svc);
+                ost_local_svc = NULL;
+        } else if (strcmp(type, "mdt") == 0 && mdt_local_svc != NULL) {
+                _local_rpc_cleanup(mdt_local_svc);
+                mdt_local_svc = NULL;
+        }
+}
+
+#define SAME_THREAD
+int local_send_rpc(struct ptlrpc_request *req)
+{
+        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+        int rc;
+        unsigned long flags;
+        struct l_wait_info lwi = { 0 };
+        struct obd_ucred ucred;
+        void *journal_info;
+        int ngroups;
+        ENTRY;
+        
+        /* XXX tight restriction for debug */
+        if (local_svc_available(req) && cli->local_rpc != NULL &&
+            req->rq_bulk == NULL) {
+                struct ptlrpc_local_cs *svc = cli->local_rpc;
+                
+                req->rq_reqmsg->handle = req->rq_import->imp_remote_handle;
+                req->rq_reqmsg->conn_cnt = req->rq_import->imp_conn_cnt;
+                
+                LASSERT (req->rq_replen != 0);
+               
+                spin_lock_irqsave (&req->rq_lock, flags);
+                req->rq_receiving_reply = 1;
+                req->rq_replied = 0;
+                req->rq_err = 0;
+                req->rq_timedout = 0;
+                req->rq_resend = 0;
+                req->rq_restart = 0;
+                spin_unlock_irqrestore (&request->rq_lock, flags);
+                
+                req->rq_sent = LTIME_S(CURRENT_TIME);
+//              ptlrpc_pinger_sending_on_import(req->rq_import);
+                req->rq_type = PTL_LOCAL_MSG_REQUEST;
+
+#ifndef SAME_THREAD        
+                spin_lock_irqsave(&svc->req_lock, flags);
+                list_del_init(&req->rq_list);
+                list_add_tail(&req->rq_list, &svc->req_list);
+                spin_unlock_irqrestore(&svc->req_lock, flags);
+                wake_up(&svc->req_waitq);
+                
+                l_wait_event(req->rq_reply_waitq, req->rq_replied, &lwi);
+#else
+
+/*
+                // for nesting journal
+                journal_info = current->journal_info;
+                current->journal_info = NULL;
+*/                
+                ucred.ouc_fsuid = current->fsuid;
+                ucred.ouc_fsgid = current->fsgid;
+                ucred.ouc_cap = current->cap_effective;
+                ngroups = current->ngroups;
+                current->fsuid = current->fsgid = 0;
+                current->ngroups = 0;
+                current->cap_effective = -1;
+                
+                rc = local_handle(req, svc->srv_handler);
+                
+                current->fsuid = ucred.ouc_fsuid;
+                current->fsgid = ucred.ouc_fsgid;
+                current->cap_effective = ucred.ouc_cap;
+                current->ngroups = ngroups;
+                
+//              current->journal_info = journal_info;
+#endif //SAME_THREAD
+                ptlrpc_lprocfs_rpc_sent(req);
+
+        } else {
+                rc = -EOPNOTSUPP;
+        }
+
+        RETURN(rc);
+}
+
+int local_reply(struct ptlrpc_request *req)
+{
+        unsigned long flags;
+        ENTRY;
+        
+        switch (req->rq_type) {
+        case PTL_LOCAL_MSG_REQUEST:
+                req->rq_type = PTL_LOCAL_MSG_REPLY;
+        case PTL_LOCAL_MSG_REPLY:
+        case PTL_LOCAL_MSG_ERR:
+                spin_lock_irqsave(&req->rq_lock, flags);
+                req->rq_want_ack = 0;
+                spin_unlock_irqrestore(&req->rq_lock, flags);
+                break;
+        default:
+                LBUG();
+                break;
+        }
+        RETURN(0);
+}
+
+int local_error(struct ptlrpc_request *req)
+{
+        ENTRY;
+
+        switch (req->rq_type) {
+        case PTL_LOCAL_MSG_REQUEST:
+        case PTL_LOCAL_MSG_REPLY:
+                req->rq_type = PTL_LOCAL_MSG_ERR;
+        case PTL_LOCAL_MSG_ERR:
+                break;
+        default:
+                LBUG();
+                break;
+        }
+        EXIT;
+
+        return local_reply(req);
+}
+
+int local_bulk_move(struct ptlrpc_bulk_desc *desc)
+{        
+        struct ptlrpc_bulk_desc *source, *dest;
+        struct ptlrpc_bulk_page *src_pg, *dst_pg;
+        struct list_head *src, *dst;
+        char *src_addr, *dst_addr;
+        int len, i;
+        unsigned long flags;
+
+        if (desc->bd_type == BULK_GET_SINK) {
+                source = desc->bd_req->rq_bulk;
+                dest = desc;
+        } else if (desc->bd_type == BULK_PUT_SOURCE) {
+                source = desc;
+                dest = desc->bd_req->rq_bulk;
+        } else {
+                LBUG();
+        }
+
+        LASSERT(source);
+        LASSERT(dest);
+        /* XXX need more investigation for sparse file case */
+        if (source->bd_page_count != dest->bd_page_count)
+                goto done;
+
+        src = source->bd_page_list.next;
+        dst = dest->bd_page_list.next;
+        for (i = 0; i < dest->bd_page_count; i++) {
+                src_pg = list_entry(src, struct ptlrpc_bulk_page, bp_link);
+                dst_pg = list_entry(dst, struct ptlrpc_bulk_page, bp_link);
+
+                len = MIN(src_pg->bp_buflen, dst_pg->bp_buflen);
+                src_addr = kmap(src_pg->bp_page) + src_pg->bp_pageoffset;
+                dst_addr = kmap(dst_pg->bp_page) + dst_pg->bp_pageoffset;
+                memcpy(dst_addr, src_addr, len);
+                kunmap(dst_pg->bp_page);
+                kunmap(src_pg->bp_page);
+
+                src = src->next;
+                dst = dst->next;
+        }
+done:
+        spin_lock_irqsave(&desc->bd_lock, flags);
+        desc->bd_network_rw = 0;
+        desc->bd_complete = 1;
+        spin_unlock_irqrestore(&desc->bd_lock, flags);
+/*        
+        if (desc->bd_req->rq_set != NULL)
+                wake_up (&desc->bd_req->rq_set->set_waitq);
+        else
+                wake_up (&desc->bd_req->rq_reply_waitq);
+*/
+        RETURN(0);
+        
+}
+
+
diff --git a/lustre/tests/localrpc_test.sh b/lustre/tests/localrpc_test.sh
new file mode 100755 (executable)
index 0000000..cb50cdd
--- /dev/null
@@ -0,0 +1,16 @@
+#!/bin/bash
+
+dmesg -n 1
+
+#sh llmount.sh
+../utils/lctl <<eof
+device 2
+probe 2
+eof
+
+dmesg -c > /dev/null
+
+dmesg -n 7
+#echo 0x300811 > /proc/sys/portals/debug
+
+START=:  CLEAN=: ONLY="99a 99b 99c" sh sanity.sh