Whamcloud - gitweb
change portals mechnism to lnet mecnism in mountconf
authorlincent <lincent>
Mon, 17 Oct 2005 14:01:33 +0000 (14:01 +0000)
committerlincent <lincent>
Mon, 17 Oct 2005 14:01:33 +0000 (14:01 +0000)
lustre/ptlrpc/client.c
lustre/ptlrpc/connection.c
lustre/ptlrpc/events.c
lustre/ptlrpc/import.c
lustre/ptlrpc/lproc_ptlrpc.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/recov_thread.c
lustre/ptlrpc/service.c

index 3016b20..a99cb48 100644 (file)
@@ -49,16 +49,17 @@ void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
 {
         struct ptlrpc_connection *c;
-        lnet_process_id_t          peer;
+        lnet_nid_t                self;
+        lnet_process_id_t         peer;
         int                       err;
 
-        err = ptlrpc_uuid_to_peer(uuid, &peer);
+        err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
         if (err != 0) {
                 CERROR("cannot find peer %s!\n", uuid->uuid);
                 return NULL;
         }
 
-        c = ptlrpc_get_connection(peer, uuid);
+        c = ptlrpc_get_connection(peer, self, uuid);
         if (c) {
                 memcpy(c->c_remote_uuid.uuid,
                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
@@ -72,16 +73,18 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
                                  struct obd_uuid *uuid)
 {
+        lnet_nid_t        self;
         lnet_process_id_t peer;
-        int              err;
+        int               err;
 
-        err = ptlrpc_uuid_to_peer(uuid, &peer);
+        err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
         if (err != 0) {
                 CERROR("cannot find peer %s!\n", uuid->uuid);
                 return;
         }
 
-        memcpy(&conn->c_peer, &peer, sizeof (peer));
+        conn->c_peer = peer;
+        conn->c_self = self;
         return;
 }
 
@@ -636,7 +639,7 @@ static int after_reply(struct ptlrpc_request *req)
                         spin_unlock_irqrestore(&imp->imp_lock, flags);
                         req->rq_commit_cb(req);
                         spin_lock_irqsave(&imp->imp_lock, flags);
-               }
+                }
 
                 if (req->rq_transno > imp->imp_max_transno)
                         imp->imp_max_transno = req->rq_transno;
@@ -1494,13 +1497,13 @@ restart:
                 lwi = LWI_INTR(interrupted_request, req);
                 rc = l_wait_event(req->rq_reply_waitq,
                                   (req->rq_send_state == imp->imp_state ||
-                                   req->rq_err),
+                                   req->rq_err || req->rq_intr),
                                   &lwi);
-                DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
+                DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d/%d == 1)",
                           current->comm,
                           ptlrpc_import_state_name(imp->imp_state),
                           ptlrpc_import_state_name(req->rq_send_state),
-                          req->rq_err);
+                          req->rq_err, req->rq_intr);
 
                 spin_lock_irqsave(&imp->imp_lock, flags);
                 list_del_init(&req->rq_list);
index 4bd52d2..fc55e25 100644 (file)
@@ -46,61 +46,85 @@ void ptlrpc_dump_connections(void)
 
         list_for_each(tmp, &conn_list) {
                 c = list_entry(tmp, struct ptlrpc_connection, c_link);
-                CERROR("Connection %p/%s has refcount %d (nid=%s)\n",
+                CERROR("Connection %p/%s has refcount %d (nid=%s->%s)\n",
                        c, c->c_remote_uuid.uuid, atomic_read(&c->c_refcount),
+                       libcfs_nid2str(c->c_self), 
                        libcfs_nid2str(c->c_peer.nid));
         }
         EXIT;
 }
 
-struct ptlrpc_connection *ptlrpc_get_connection(lnet_process_id_t peer,
-                                                struct obd_uuid *uuid)
+struct ptlrpc_connection*
+ptlrpc_lookup_conn_locked (lnet_process_id_t peer)
 {
-        struct list_head *tmp, *pos;
         struct ptlrpc_connection *c;
-        ENTRY;
+        struct list_head         *tmp;
 
-        CDEBUG(D_INFO, "peer is %s\n", libcfs_id2str(peer));
-
-        spin_lock(&conn_lock);
         list_for_each(tmp, &conn_list) {
                 c = list_entry(tmp, struct ptlrpc_connection, c_link);
-                if (memcmp(&peer, &c->c_peer, sizeof(peer)) == 0) {
-                        ptlrpc_connection_addref(c);
-                        GOTO(out, c);
-                }
+
+                if (peer.nid == c->c_peer.nid &&
+                    peer.pid == c->c_peer.pid)
+                        return ptlrpc_connection_addref(c);
         }
 
-        list_for_each_safe(tmp, pos, &conn_unused_list) {
+        list_for_each(tmp, &conn_unused_list) {
                 c = list_entry(tmp, struct ptlrpc_connection, c_link);
-                if (memcmp(&peer, &c->c_peer, sizeof(peer)) == 0) {
-                        ptlrpc_connection_addref(c);
+
+                if (peer.nid == c->c_peer.nid &&
+                    peer.pid == c->c_peer.pid) {
                         list_del(&c->c_link);
                         list_add(&c->c_link, &conn_list);
-                        GOTO(out, c);
+                        return ptlrpc_connection_addref(c);
                 }
         }
 
-        /* FIXME: this should be a slab once we can validate slab addresses
-         * without OOPSing */
-        OBD_ALLOC_GFP(c, sizeof(*c), GFP_ATOMIC);
-       
+        return NULL;
+}
+
+
+struct ptlrpc_connection *ptlrpc_get_connection(lnet_process_id_t peer,
+                                                lnet_nid_t self, struct obd_uuid *uuid)
+{
+        struct ptlrpc_connection *c;
+        struct ptlrpc_connection *c2;
+        ENTRY;
+
+        CDEBUG(D_INFO, "self %s peer %s\n", 
+               libcfs_nid2str(self), libcfs_id2str(peer));
+
+        spin_lock(&conn_lock);
+
+        c = ptlrpc_lookup_conn_locked(peer);
+        
+        spin_unlock(&conn_lock);
+
+        if (c != NULL)
+                RETURN (c);
+        
+        OBD_ALLOC(c, sizeof(*c));
         if (c == NULL)
-                GOTO(out, c);
+                RETURN (NULL);
 
-        if (uuid && uuid->uuid)                         /* XXX ???? */
+        atomic_set(&c->c_refcount, 1);
+        c->c_peer = peer;
+        c->c_self = self;
+        if (uuid != NULL)
                 obd_str2uuid(&c->c_remote_uuid, uuid->uuid);
-        atomic_set(&c->c_refcount, 0);
-        memcpy(&c->c_peer, &peer, sizeof(c->c_peer));
-
-        ptlrpc_connection_addref(c);
 
-        list_add(&c->c_link, &conn_list);
+        spin_lock(&conn_lock);
 
-        EXIT;
- out:
+        c2 = ptlrpc_lookup_conn_locked(peer);
+        if (c2 == NULL)
+                list_add(&c->c_link, &conn_list);
+        
         spin_unlock(&conn_lock);
-        return c;
+
+        if (c2 == NULL)
+                RETURN (c);
+        
+        OBD_FREE(c, sizeof(*c));
+        RETURN (c2);
 }
 
 int ptlrpc_put_connection(struct ptlrpc_connection *c)
index 5c77baa..da6f8ad 100644 (file)
@@ -202,6 +202,7 @@ void request_in_callback(lnet_event_t *ev)
                 req->rq_reqlen = ev->mlength;
         do_gettimeofday(&req->rq_arrival_time);
         req->rq_peer = ev->initiator;
+        req->rq_self = ev->target.nid;
         req->rq_rqbd = rqbd;
         req->rq_phase = RQ_PHASE_NEW;
 #if CRAY_XT3
@@ -341,7 +342,8 @@ static void ptlrpc_master_callback(lnet_event_t *ev)
         callback (ev);
 }
 
-int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, lnet_process_id_t *peer)
+int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, 
+                         lnet_process_id_t *peer, lnet_nid_t *self)
 {
         int               best_dist = 0;
         int               best_order = 0;
@@ -350,17 +352,24 @@ int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, lnet_process_id_t *peer)
         int               portals_compatibility;
         int               dist;
         int               order;
-        lnet_nid_t        nid;
+        lnet_nid_t        dst_nid;
+        lnet_nid_t        src_nid;
 
-        portals_compatibility = LNetCtl(IOC_PORTAL_PORTALS_COMPATIBILITY, NULL);
-        
-        /* Choose the matching UUID that's closest */
-        peer->pid = LUSTRE_SRV_PTL_PID;
+        portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL);
+
+        peer->pid = LUSTRE_SRV_LNET_PID;
 
-        while (lustre_uuid_to_peer(uuid->uuid, &nid, count++) == 0) {
-                dist = LNetDist(nid, &order);
+        /* Choose the matching UUID that's closest */
+        while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {
+                dist = LNetDist(dst_nid, &src_nid, &order);
                 if (dist < 0)
                         continue;
+
+                if (dist == 0) {                /* local! use loopback LND */
+                        peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
+                        rc = 0;
+                        break;
+                }
                 
                 LASSERT (order >= 0);
                 if (rc < 0 ||
@@ -374,18 +383,18 @@ int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, lnet_process_id_t *peer)
                                  * NET, so if I'm reading new config logs, or
                                  * getting configured by (new) lconf I can
                                  * still talk to old servers. */
-                                nid = PTL_MKNID(0, PTL_NIDADDR(nid));
+                                dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid));
+                                src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid));
                         }
-                        peer->nid = nid;
+                        peer->nid = dst_nid;
+                        *self = src_nid;
                         rc = 0;
                 }
         }
 
         CDEBUG(D_WARNING,"%s->%s\n", uuid->uuid, libcfs_id2str(*peer));
         if (rc != 0) 
-                LCONSOLE_ERROR("I couldn't find a NID for %s. Are the networks "
-                               "alive? Is routing set up?\n", uuid->uuid);
-
+                CERROR("No NID found for %s\n", uuid->uuid);
         return rc;
 }
 
@@ -432,7 +441,7 @@ lnet_pid_t ptl_get_pid(void)
 #ifndef  __KERNEL__
         pid = getpid();
 #else
-        pid = LUSTRE_SRV_PTL_PID;
+        pid = LUSTRE_SRV_LNET_PID;
 #endif
         return pid;
 }
@@ -514,7 +523,7 @@ liblustre_check_events (int timeout)
         if (rc == 0)
                 RETURN(0);
         
-        LASSERT (rc == -EOVERFLOW || rc == 0);
+        LASSERT (rc == -EOVERFLOW || rc == 1);
         
         /* liblustre: no asynch callback so we can't affort to miss any
          * events... */
index 87185f4..83c9d68 100644 (file)
  */
 
 #define DEBUG_SUBSYSTEM S_RPC
-
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
 #ifdef __KERNEL__
 # include <linux/config.h>
 # include <linux/module.h>
@@ -96,7 +91,6 @@ int ptlrpc_init_import(struct obd_import *imp)
 
         return 0;
 }
-EXPORT_SYMBOL(ptlrpc_init_import);
 
 #define UUID_STR "_UUID"
 static void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uuid_len)
@@ -108,7 +102,7 @@ static void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uu
 
         if (*uuid_len < strlen(UUID_STR))
                 return;
-        
+
         if (!strncmp(*uuid_start + *uuid_len - strlen(UUID_STR),
                     UUID_STR, strlen(UUID_STR)))
                 *uuid_len -= strlen(UUID_STR);
@@ -268,7 +262,7 @@ static int import_select_connection(struct obd_import *imp)
                 RETURN(-EINVAL);
         }
 
-        if (imp->imp_conn_current && 
+        if (imp->imp_conn_current &&
             imp->imp_conn_current->oic_item.next != &imp->imp_conn_list) {
                 imp_conn = list_entry(imp->imp_conn_current->oic_item.next,
                                       struct obd_import_conn, oic_item);
@@ -289,6 +283,10 @@ static int import_select_connection(struct obd_import *imp)
         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
         class_export_put(dlmexp);
 
+        if (imp->imp_conn_current && (imp->imp_conn_current != imp_conn)) {
+                LCONSOLE_WARN("Changing connection for %s to %s\n",
+                              imp->imp_obd->obd_name, imp_conn->oic_uuid.uuid);
+        }
         imp->imp_conn_current = imp_conn;
         CDEBUG(D_HA, "%s: import %p using connection %s\n",
                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid);
@@ -377,8 +375,12 @@ int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
         aa->pcaa_peer_committed = committed_before_reconnect;
         aa->pcaa_initial_connect = initial_connect;
 
-        if (aa->pcaa_initial_connect)
+        if (aa->pcaa_initial_connect) {
                 imp->imp_replayable = 1;
+                /* On an initial connect, we don't know which one of a 
+                   failover server pair is up.  Don't wait long. */
+                request->rq_timeout = max((int)(obd_timeout / 20), 5);
+        }
 
         DEBUG_REQ(D_RPCTRACE, request, "(re)connect request");
         ptlrpcd_add_req(request);
@@ -390,7 +392,6 @@ out:
 
         RETURN(rc);
 }
-EXPORT_SYMBOL(ptlrpc_connect_import);
 
 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
 {
@@ -448,7 +449,7 @@ static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
 
         /* All imports are pingable */
         imp->imp_pingable = 1;
-        
+
         if (aa->pcaa_initial_connect) {
                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
                         CDEBUG(D_HA, "connected to replayable target: %s\n",
@@ -538,7 +539,23 @@ finish:
                         RETURN(0);
                 }
         } else {
+                struct obd_connect_data *ocd;
+
+                ocd = lustre_swab_repbuf(request, 0,
+                                         sizeof *ocd, lustre_swab_connect);
+                if (ocd == NULL) {
+                        CERROR("Wrong connect data from server\n");
+                        rc = -EPROTO;
+                        GOTO(out, rc);
+                }
                 spin_lock_irqsave(&imp->imp_lock, flags);
+                /*
+                 * check that server granted subset of flags we asked for.
+                 */
+                LASSERT((ocd->ocd_connect_flags &
+                         imp->imp_connect_data.ocd_connect_flags) ==
+                        ocd->ocd_connect_flags);
+                imp->imp_connect_data = *ocd;
                 if (imp->imp_conn_current != NULL) {
                         list_del(&imp->imp_conn_current->oic_item);
                         list_add(&imp->imp_conn_current->oic_item,
@@ -581,7 +598,7 @@ static int completed_replay_interpret(struct ptlrpc_request *req,
                 ptlrpc_import_recovery_state_machine(req->rq_import);
         } else {
                 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
-                       "reconnecting\n", 
+                       "reconnecting\n",
                        req->rq_import->imp_obd->obd_name, req->rq_status);
                 ptlrpc_connect_import(req->rq_import, NULL);
         }
index a4393cc..39491f4 100644 (file)
@@ -74,6 +74,8 @@ struct ll_rpc_opcode {
         { MDS_SET_INFO,     "mds_set_info" },
         { MDS_QUOTACHECK,   "mds_quotacheck" },
         { MDS_QUOTACTL,     "mds_quotactl" },
+ //       { MDS_GETXATTR,     "mds_getxattr" },
+ //       { MDS_SETXATTR,     "mds_setxattr" },
         { LDLM_ENQUEUE,     "ldlm_enqueue" },
         { LDLM_CONVERT,     "ldlm_convert" },
         { LDLM_CANCEL,      "ldlm_cancel" },
@@ -334,9 +336,10 @@ static int ptlrpc_lprocfs_svc_req_history_show(struct seq_file *s, void *iter)
                  * must be just as careful as the service's request
                  * parser. Currently I only print stuff here I know is OK
                  * to look at coz it was set up in request_in_callback()!!! */
-                seq_printf(s, LPD64":%s:"LPD64":%d:%s ",
-                           req->rq_history_seq, libcfs_id2str(req->rq_peer),
-                           req->rq_xid, req->rq_reqlen,ptlrpc_rqphase2str(req));
+                seq_printf(s, LPD64":%s:%s:"LPD64":%d:%s ",
+                           req->rq_history_seq, libcfs_nid2str(req->rq_self), 
+                           libcfs_id2str(req->rq_peer), req->rq_xid, 
+                           req->rq_reqlen,ptlrpc_rqphase2str(req));
 
                 if (svc->srv_request_history_print_fn == NULL)
                         seq_printf(s, "\n");
index 3698319..e1b3219 100644 (file)
@@ -68,7 +68,8 @@ static int ptl_send_buf (lnet_handle_md_t *mdh, void *base, int len,
         CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n",
                len, portal, xid);
 
-        rc = LNetPut (*mdh, ack, conn->c_peer, portal, xid, 0, 0);
+        rc = LNetPut (conn->c_self, *mdh, ack, 
+                      conn->c_peer, portal, xid, 0, 0);
         if (rc != 0) {
                 int rc2;
                 /* We're going to get an UNLINK event when I unlink below,
@@ -85,11 +86,11 @@ static int ptl_send_buf (lnet_handle_md_t *mdh, void *base, int len,
 
 int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
 {
-        int                 rc;
-        int                 rc2;
-        lnet_process_id_t    peer;
-        lnet_md_t            md;
-        __u64               xid;
+        struct ptlrpc_connection *conn = desc->bd_export->exp_connection;
+        int                       rc;
+        int                       rc2;
+        lnet_md_t                 md;
+        __u64                     xid;
         ENTRY;
 
         if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_PUT_NET)) 
@@ -100,7 +101,6 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
         LASSERT (desc->bd_type == BULK_PUT_SOURCE ||
                  desc->bd_type == BULK_GET_SINK);
         desc->bd_success = 0;
-        peer = desc->bd_export->exp_connection->c_peer;
 
         md.user_ptr = &desc->bd_cbid;
         md.eq_handle = ptlrpc_eq_h;
@@ -125,24 +125,25 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
         xid = desc->bd_req->rq_xid;
         CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d "
                "id %s xid "LPX64"\n", desc->bd_iov_count,
-               desc->bd_nob, desc->bd_portal, libcfs_id2str(peer), xid);
+               desc->bd_nob, desc->bd_portal, 
+               libcfs_id2str(conn->c_peer), xid);
 
         /* Network is about to get at the memory */
         desc->bd_network_rw = 1;
 
         if (desc->bd_type == BULK_PUT_SOURCE)
-                rc = LNetPut (desc->bd_md_h, LNET_ACK_REQ, peer,
-                              desc->bd_portal, xid, 0, 0);
+                rc = LNetPut (conn->c_self, desc->bd_md_h, LNET_ACK_REQ, 
+                              conn->c_peer, desc->bd_portal, xid, 0, 0);
         else
-                rc = LNetGet (desc->bd_md_h, peer,
-                              desc->bd_portal, xid, 0);
+                rc = LNetGet (conn->c_self, desc->bd_md_h, 
+                              conn->c_peer, desc->bd_portal, xid, 0);
 
         if (rc != 0) {
                 /* Can't send, so we unlink the MD bound above.  The UNLINK
                  * event this creates will signal completion with failure,
                  * so we return SUCCESS here! */
                 CERROR("Transfer(%s, %d, "LPX64") failed: %d\n",
-                       libcfs_id2str(peer), desc->bd_portal, xid, rc);
+                       libcfs_id2str(conn->c_peer), desc->bd_portal, xid, rc);
                 rc2 = LNetMDUnlink(desc->bd_md_h);
                 LASSERT (rc2 == 0);
         }
@@ -335,7 +336,7 @@ int ptlrpc_send_reply (struct ptlrpc_request *req, int may_be_difficult)
         req->rq_repmsg->opc    = req->rq_reqmsg->opc;
 
         if (req->rq_export == NULL) 
-                conn = ptlrpc_get_connection(req->rq_peer, NULL);
+                conn = ptlrpc_get_connection(req->rq_peer, req->rq_self, NULL);
         else
                 conn = ptlrpc_connection_addref(req->rq_export->exp_connection);
 
@@ -376,6 +377,62 @@ int ptlrpc_error(struct ptlrpc_request *req)
         RETURN(rc);
 }
 
+int ptl_send_rpc_nowait(struct ptlrpc_request *request)
+{
+        int rc;
+        struct ptlrpc_connection *connection;
+        unsigned long flags;
+        ENTRY;
+
+        LASSERT (request->rq_type == PTL_RPC_MSG_REQUEST);
+
+        if (request->rq_import->imp_obd &&
+            request->rq_import->imp_obd->obd_fail) {
+                CDEBUG(D_HA, "muting rpc for failed imp obd %s\n",
+                       request->rq_import->imp_obd->obd_name);
+                /* this prevents us from waiting in ptlrpc_queue_wait */
+                request->rq_err = 1;
+                RETURN(-ENODEV);
+        }
+        
+        connection = request->rq_import->imp_connection;
+
+        request->rq_reqmsg->handle = request->rq_import->imp_remote_handle;
+        request->rq_reqmsg->type = PTL_RPC_MSG_REQUEST;
+        request->rq_reqmsg->conn_cnt = request->rq_import->imp_conn_cnt;
+
+        spin_lock_irqsave (&request->rq_lock, flags);
+        /* If the MD attach succeeds, there _will_ be a reply_in callback */
+        request->rq_receiving_reply = 0;
+        /* Clear any flags that may be present from previous sends. */
+        request->rq_replied = 0;
+        request->rq_err = 0;
+        request->rq_timedout = 0;
+        request->rq_net_err = 0;
+        request->rq_resend = 0;
+        request->rq_restart = 0;
+        spin_unlock_irqrestore (&request->rq_lock, flags);
+
+        ptlrpc_request_addref(request);       /* +1 ref for the SENT callback */
+
+        request->rq_sent = CURRENT_SECONDS;
+        ptlrpc_pinger_sending_on_import(request->rq_import);
+        rc = ptl_send_buf(&request->rq_req_md_h, 
+                          request->rq_reqmsg, request->rq_reqlen,
+                          LNET_NOACK_REQ, &request->rq_req_cbid, 
+                          connection,
+                          request->rq_request_portal,
+                          request->rq_xid);
+        if (rc == 0) {
+                ptlrpc_lprocfs_rpc_sent(request);
+        } else {
+                ptlrpc_req_finished (request);          /* drop callback ref */
+        }
+
+        return rc;
+}
+
+
 int ptl_send_rpc(struct ptlrpc_request *request)
 {
         int rc;
index bf07199..1abbdab 100644 (file)
@@ -850,8 +850,8 @@ void lustre_swab_qdata(struct qunit_data *d)
 void lustre_assert_wire_constants(void)
 {
         /* Wire protocol assertions generated by 'wirecheck'
-         * running on Linux schnapps.adilger.int 2.4.28 #2 Thu Dec 16 14:35:03 MST 2004 i686 i686 i38
-         * with gcc version 3.3.2 20040108 (Red Hat Linux 3.3.2-6) */
+         * running on Linux mustang 2.6.12-1.1456_FC4smp #1 SMP Thu Sep 22 02:22:14 EDT 2005 i686 i68
+         * with gcc version 4.0.1 20050727 (Red Hat 4.0.1-5) */
 
 
         /* Constants... */
@@ -951,7 +951,7 @@ void lustre_assert_wire_constants(void)
                  (long long)MDS_QUOTACHECK);
         LASSERTF(MDS_QUOTACTL == 48, " found %lld\n",
                  (long long)MDS_QUOTACTL);
-        LASSERTF(MDS_LAST_OPC == 49, " found %lld\n",
+        LASSERTF(MDS_LAST_OPC == 51, " found %lld\n",
                  (long long)MDS_LAST_OPC);
         LASSERTF(REINT_SETATTR == 1, " found %lld\n",
                  (long long)REINT_SETATTR);
@@ -1503,6 +1503,8 @@ void lustre_assert_wire_constants(void)
                  (long long)OBD_BRW_SYNC);
         LASSERTF(OBD_BRW_FROM_GRANT == 32, " found %lld\n",
                  (long long)OBD_BRW_FROM_GRANT);
+        LASSERTF(OBD_BRW_NOQUOTA == 256, " found %lld\n",
+                 (long long)OBD_BRW_NOQUOTA);
 
         /* Checks for struct ost_body */
         LASSERTF((int)sizeof(struct ost_body) == 208, " found %lld\n",
@@ -1957,10 +1959,6 @@ void lustre_assert_wire_constants(void)
                  (long long)(int)offsetof(struct ldlm_flock, end));
         LASSERTF((int)sizeof(((struct ldlm_flock *)0)->end) == 8, " found %lld\n",
                  (long long)(int)sizeof(((struct ldlm_flock *)0)->end));
-        LASSERTF((int)offsetof(struct ldlm_flock, blocking_export) == 16, " found %lld\n",
-                 (long long)(int)offsetof(struct ldlm_flock, blocking_export));
-        LASSERTF((int)sizeof(((struct ldlm_flock *)0)->blocking_export) == 8, " found %lld\n",
-                 (long long)(int)sizeof(((struct ldlm_flock *)0)->blocking_export));
         LASSERTF((int)offsetof(struct ldlm_flock, blocking_pid) == 24, " found %lld\n",
                  (long long)(int)offsetof(struct ldlm_flock, blocking_pid));
         LASSERTF((int)sizeof(((struct ldlm_flock *)0)->blocking_pid) == 4, " found %lld\n",
index 4df0fe7..35d91ee 100644 (file)
@@ -95,6 +95,7 @@ EXPORT_SYMBOL(ptlrpc_reply);
 EXPORT_SYMBOL(ptlrpc_error);
 EXPORT_SYMBOL(ptlrpc_resend_req);
 EXPORT_SYMBOL(ptl_send_rpc);
+EXPORT_SYMBOL(ptl_send_rpc_nowait);
 
 /* client.c */
 EXPORT_SYMBOL(ptlrpc_init_client);
index 2218b09..1c6ada7 100644 (file)
@@ -515,7 +515,7 @@ static int log_process_thread(void *args)
         SIGNAL_MASK_UNLOCK(current, flags);
         unlock_kernel();
 
-        rc = llog_create(ctxt, &llh, &logid, NULL, NULL);
+        rc = llog_create(ctxt, &llh, &logid, NULL);
         if (rc) {
                 CERROR("llog_create failed %d\n", rc);
                 RETURN(rc);
index 6f43533..af61984 100644 (file)
@@ -97,7 +97,7 @@ ptlrpc_free_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
 {
         struct ptlrpc_service *svc = rqbd->rqbd_service;
         unsigned long          flags;
-        
+
         LASSERT (rqbd->rqbd_refcount == 0);
         LASSERT (list_empty(&rqbd->rqbd_reqs));
 
@@ -414,7 +414,7 @@ ptlrpc_server_free_request(struct ptlrpc_request *req)
 
                         spin_lock_irqsave(&svc->srv_lock, flags);
 
-                        /* schedule request buffer for re-use.  
+                        /* schedule request buffer for re-use.
                          * NB I can only do this after I've disposed of their
                          * reqs; particularly the embedded req */
                         list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
@@ -598,7 +598,7 @@ put_conn:
 }
 
 static int
-ptlrpc_server_handle_reply (struct ptlrpc_service *svc) 
+ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
 {
         struct ptlrpc_reply_state *rs;
         unsigned long              flags;
@@ -651,13 +651,13 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
                  * in mds_steal_ack_locks()  */
                 CWARN("All locks stolen from rs %p x"LPD64".t"LPD64
                       " o%d NID %s\n",
-                      rs, 
+                      rs,
                       rs->rs_xid, rs->rs_transno,
-                      rs->rs_msg.opc, 
+                      rs->rs_msg.opc,
                       libcfs_nid2str(exp->exp_connection->c_peer.nid));
         }
 
-        if ((!been_handled && rs->rs_on_net) || 
+        if ((!been_handled && rs->rs_on_net) ||
             nlocks > 0) {
                 spin_unlock_irqrestore(&svc->srv_lock, flags);
 
@@ -668,7 +668,7 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
                 }
 
                 while (nlocks-- > 0)
-                        ldlm_lock_decref(&rs->rs_locks[nlocks], 
+                        ldlm_lock_decref(&rs->rs_locks[nlocks],
                                          rs->rs_modes[nlocks]);
 
                 spin_lock_irqsave(&svc->srv_lock, flags);
@@ -680,14 +680,14 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
                 /* Off the net */
                 svc->srv_n_difficult_replies--;
                 spin_unlock_irqrestore(&svc->srv_lock, flags);
-                
+
                 class_export_put (exp);
                 rs->rs_export = NULL;
                 ptlrpc_rs_decref (rs);
                 atomic_dec (&svc->srv_outstanding_replies);
                 RETURN(1);
         }
-        
+
         /* still on the net; callback will schedule */
         spin_unlock_irqrestore (&svc->srv_lock, flags);
         RETURN(1);
@@ -696,7 +696,7 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
 #ifndef __KERNEL__
 /* FIXME make use of timeout later */
 int
-liblustre_check_services (void *arg) 
+liblustre_check_services (void *arg)
 {
         int  did_something = 0;
         int  rc;
@@ -778,11 +778,13 @@ static int ptlrpc_main(void *arg)
         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
         struct ptlrpc_service  *svc = data->svc;
         struct ptlrpc_thread   *thread = data->thread;
+        struct ptlrpc_reply_state *rs;
         struct lc_watchdog     *watchdog;
         unsigned long           flags;
 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,4)
         struct group_info *ginfo = NULL;
 #endif
+        int rc = 0;
         ENTRY;
 
         lock_kernel();
@@ -799,17 +801,47 @@ static int ptlrpc_main(void *arg)
         THREAD_NAME(current->comm, sizeof(current->comm) - 1, "%s", data->name);
         unlock_kernel();
 
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,9) && CONFIG_NUMA
+        /* we need to do this before any per-thread allocation is done so that
+         * we get the per-thread allocations on local node.  bug 7342 */
+        if (svc->srv_cpu_affinity) {
+                int cpu, num_cpu;
+
+                for (cpu = 0, num_cpu = 0; cpu < NR_CPUS; cpu++) {
+                        if (!cpu_online(cpu))
+                                continue;
+                        if (num_cpu == thread->t_id % num_online_cpus())
+                                break;
+                        num_cpu++;
+                }
+                set_cpus_allowed(current, node_to_cpumask(cpu_to_node(cpu)));
+        }
+#endif
+
 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,4)
         ginfo = groups_alloc(0);
         if (!ginfo) {
-                thread->t_flags = SVC_RUNNING;
-                wake_up(&thread->t_ctl_waitq);
-                return (-ENOMEM);
+                rc = -ENOMEM;
+                goto out;
         }
+
         set_current_groups(ginfo);
         put_group_info(ginfo);
 #endif
 
+        if (svc->srv_init != NULL) {
+                rc = svc->srv_init(thread);
+                if (rc)
+                        goto out;
+        }
+
+        /* Alloc reply state structure for this one */
+        OBD_ALLOC_GFP(rs, svc->srv_max_reply_size, GFP_KERNEL);
+        if (!rs) {
+                rc = -ENOMEM;
+                goto out_srv_init;
+        }
+
         /* Record that the thread is running */
         thread->t_flags = SVC_RUNNING;
         /*
@@ -823,7 +855,11 @@ static int ptlrpc_main(void *arg)
 
         spin_lock_irqsave(&svc->srv_lock, flags);
         svc->srv_nthreads++;
+        list_add(&rs->rs_list, &svc->srv_free_rs_list);
         spin_unlock_irqrestore(&svc->srv_lock, flags);
+        wake_up(&svc->srv_free_rs_waitq);
+
+        CDEBUG(D_NET, "service thread %d started\n", thread->t_id);
 
         /* XXX maintain a list of all managed devices: insert here */
 
@@ -871,12 +907,16 @@ static int ptlrpc_main(void *arg)
                 }
         }
 
+        lc_watchdog_delete(watchdog);
+
+out_srv_init:
         /*
          * deconstruct service specific state created by ptlrpc_start_thread()
          */
         if (svc->srv_done != NULL)
                 svc->srv_done(thread);
 
+out:
         spin_lock_irqsave(&svc->srv_lock, flags);
 
         svc->srv_nthreads--;                    /* must know immediately */
@@ -885,10 +925,10 @@ static int ptlrpc_main(void *arg)
 
         spin_unlock_irqrestore(&svc->srv_lock, flags);
 
-        lc_watchdog_delete(watchdog);
+        CDEBUG(D_NET, "service thread %d exiting: rc %d\n", thread->t_id, rc);
+        thread->t_id = rc;
 
-        CDEBUG(D_NET, "service thread exiting, process %d\n", current->pid);
-        return 0;
+        return rc;
 }
 
 static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
@@ -957,7 +997,6 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
         struct ptlrpc_svc_data d;
         struct ptlrpc_thread *thread;
         unsigned long flags;
-        struct ptlrpc_reply_state *rs;
         int rc;
         ENTRY;
 
@@ -966,22 +1005,10 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
                 RETURN(-ENOMEM);
         init_waitqueue_head(&thread->t_ctl_waitq);
         thread->t_id = id;
-          
-        if (svc->srv_init != NULL) {
-                rc = svc->srv_init(thread);
-                if (rc != 0)
-                        RETURN(rc);
-        }
 
-        /* Alloc reply state structure for this one */
-        OBD_ALLOC_GFP(rs, svc->srv_max_reply_size, GFP_KERNEL);
-        if (!rs)
-                RETURN(-ENOMEM);
         spin_lock_irqsave(&svc->srv_lock, flags);
-        list_add(&rs->rs_list, &svc->srv_free_rs_list);
         list_add(&thread->t_link, &svc->srv_threads);
         spin_unlock_irqrestore(&svc->srv_lock, flags);
-        wake_up(&svc->srv_free_rs_waitq);
 
         d.dev = dev;
         d.svc = svc;
@@ -999,15 +1026,14 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
                 list_del(&thread->t_link);
                 spin_unlock_irqrestore(&svc->srv_lock, flags);
 
-                if (svc->srv_done != NULL)
-                        svc->srv_done(thread);
-
                 OBD_FREE(thread, sizeof(*thread));
                 RETURN(rc);
         }
-        l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi);
+        l_wait_event(thread->t_ctl_waitq,
+                     thread->t_flags & (SVC_RUNNING | SVC_STOPPED), &lwi);
 
-        RETURN(0);
+        rc = (thread->t_flags & SVC_STOPPED) ? thread->t_id : 0;
+        RETURN(rc);
 }
 #endif
 
@@ -1084,7 +1110,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                         list_entry(service->srv_request_queue.next,
                                    struct ptlrpc_request,
                                    rq_list);
-                
+
                 list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;