From d70d88daf0f86595dc074f80c54ea734738973f9 Mon Sep 17 00:00:00 2001 From: lincent Date: Mon, 17 Oct 2005 14:01:33 +0000 Subject: [PATCH] change portals mechnism to lnet mecnism in mountconf --- lustre/ptlrpc/client.c | 23 ++++++----- lustre/ptlrpc/connection.c | 84 +++++++++++++++++++++++-------------- lustre/ptlrpc/events.c | 39 +++++++++++------- lustre/ptlrpc/import.c | 41 ++++++++++++------ lustre/ptlrpc/lproc_ptlrpc.c | 9 ++-- lustre/ptlrpc/niobuf.c | 85 +++++++++++++++++++++++++++++++------- lustre/ptlrpc/pack_generic.c | 12 +++--- lustre/ptlrpc/ptlrpc_module.c | 1 + lustre/ptlrpc/recov_thread.c | 2 +- lustre/ptlrpc/service.c | 96 +++++++++++++++++++++++++++---------------- 10 files changed, 265 insertions(+), 127 deletions(-) diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 3016b20..a99cb48 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -49,16 +49,17 @@ void ptlrpc_init_client(int req_portal, int rep_portal, char *name, struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid) { struct ptlrpc_connection *c; - lnet_process_id_t peer; + lnet_nid_t self; + lnet_process_id_t peer; int err; - err = ptlrpc_uuid_to_peer(uuid, &peer); + err = ptlrpc_uuid_to_peer(uuid, &peer, &self); if (err != 0) { CERROR("cannot find peer %s!\n", uuid->uuid); return NULL; } - c = ptlrpc_get_connection(peer, uuid); + c = ptlrpc_get_connection(peer, self, uuid); if (c) { memcpy(c->c_remote_uuid.uuid, uuid->uuid, sizeof(c->c_remote_uuid.uuid)); @@ -72,16 +73,18 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid) void ptlrpc_readdress_connection(struct ptlrpc_connection *conn, struct obd_uuid *uuid) { + lnet_nid_t self; lnet_process_id_t peer; - int err; + int err; - err = ptlrpc_uuid_to_peer(uuid, &peer); + err = ptlrpc_uuid_to_peer(uuid, &peer, &self); if (err != 0) { CERROR("cannot find peer %s!\n", uuid->uuid); return; } - memcpy(&conn->c_peer, &peer, sizeof (peer)); + conn->c_peer = peer; + conn->c_self = self; return; } @@ -636,7 +639,7 @@ static int after_reply(struct ptlrpc_request *req) spin_unlock_irqrestore(&imp->imp_lock, flags); req->rq_commit_cb(req); spin_lock_irqsave(&imp->imp_lock, flags); - } + } if (req->rq_transno > imp->imp_max_transno) imp->imp_max_transno = req->rq_transno; @@ -1494,13 +1497,13 @@ restart: lwi = LWI_INTR(interrupted_request, req); rc = l_wait_event(req->rq_reply_waitq, (req->rq_send_state == imp->imp_state || - req->rq_err), + req->rq_err || req->rq_intr), &lwi); - DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)", + DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d/%d == 1)", current->comm, ptlrpc_import_state_name(imp->imp_state), ptlrpc_import_state_name(req->rq_send_state), - req->rq_err); + req->rq_err, req->rq_intr); spin_lock_irqsave(&imp->imp_lock, flags); list_del_init(&req->rq_list); diff --git a/lustre/ptlrpc/connection.c b/lustre/ptlrpc/connection.c index 4bd52d2..fc55e25 100644 --- a/lustre/ptlrpc/connection.c +++ b/lustre/ptlrpc/connection.c @@ -46,61 +46,85 @@ void ptlrpc_dump_connections(void) list_for_each(tmp, &conn_list) { c = list_entry(tmp, struct ptlrpc_connection, c_link); - CERROR("Connection %p/%s has refcount %d (nid=%s)\n", + CERROR("Connection %p/%s has refcount %d (nid=%s->%s)\n", c, c->c_remote_uuid.uuid, atomic_read(&c->c_refcount), + libcfs_nid2str(c->c_self), libcfs_nid2str(c->c_peer.nid)); } EXIT; } -struct ptlrpc_connection *ptlrpc_get_connection(lnet_process_id_t peer, - struct obd_uuid *uuid) +struct ptlrpc_connection* +ptlrpc_lookup_conn_locked (lnet_process_id_t peer) { - struct list_head *tmp, *pos; struct ptlrpc_connection *c; - ENTRY; + struct list_head *tmp; - CDEBUG(D_INFO, "peer is %s\n", libcfs_id2str(peer)); - - spin_lock(&conn_lock); list_for_each(tmp, &conn_list) { c = list_entry(tmp, struct ptlrpc_connection, c_link); - if (memcmp(&peer, &c->c_peer, sizeof(peer)) == 0) { - ptlrpc_connection_addref(c); - GOTO(out, c); - } + + if (peer.nid == c->c_peer.nid && + peer.pid == c->c_peer.pid) + return ptlrpc_connection_addref(c); } - list_for_each_safe(tmp, pos, &conn_unused_list) { + list_for_each(tmp, &conn_unused_list) { c = list_entry(tmp, struct ptlrpc_connection, c_link); - if (memcmp(&peer, &c->c_peer, sizeof(peer)) == 0) { - ptlrpc_connection_addref(c); + + if (peer.nid == c->c_peer.nid && + peer.pid == c->c_peer.pid) { list_del(&c->c_link); list_add(&c->c_link, &conn_list); - GOTO(out, c); + return ptlrpc_connection_addref(c); } } - /* FIXME: this should be a slab once we can validate slab addresses - * without OOPSing */ - OBD_ALLOC_GFP(c, sizeof(*c), GFP_ATOMIC); - + return NULL; +} + + +struct ptlrpc_connection *ptlrpc_get_connection(lnet_process_id_t peer, + lnet_nid_t self, struct obd_uuid *uuid) +{ + struct ptlrpc_connection *c; + struct ptlrpc_connection *c2; + ENTRY; + + CDEBUG(D_INFO, "self %s peer %s\n", + libcfs_nid2str(self), libcfs_id2str(peer)); + + spin_lock(&conn_lock); + + c = ptlrpc_lookup_conn_locked(peer); + + spin_unlock(&conn_lock); + + if (c != NULL) + RETURN (c); + + OBD_ALLOC(c, sizeof(*c)); if (c == NULL) - GOTO(out, c); + RETURN (NULL); - if (uuid && uuid->uuid) /* XXX ???? */ + atomic_set(&c->c_refcount, 1); + c->c_peer = peer; + c->c_self = self; + if (uuid != NULL) obd_str2uuid(&c->c_remote_uuid, uuid->uuid); - atomic_set(&c->c_refcount, 0); - memcpy(&c->c_peer, &peer, sizeof(c->c_peer)); - - ptlrpc_connection_addref(c); - list_add(&c->c_link, &conn_list); + spin_lock(&conn_lock); - EXIT; - out: + c2 = ptlrpc_lookup_conn_locked(peer); + if (c2 == NULL) + list_add(&c->c_link, &conn_list); + spin_unlock(&conn_lock); - return c; + + if (c2 == NULL) + RETURN (c); + + OBD_FREE(c, sizeof(*c)); + RETURN (c2); } int ptlrpc_put_connection(struct ptlrpc_connection *c) diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 5c77baa..da6f8ad 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -202,6 +202,7 @@ void request_in_callback(lnet_event_t *ev) req->rq_reqlen = ev->mlength; do_gettimeofday(&req->rq_arrival_time); req->rq_peer = ev->initiator; + req->rq_self = ev->target.nid; req->rq_rqbd = rqbd; req->rq_phase = RQ_PHASE_NEW; #if CRAY_XT3 @@ -341,7 +342,8 @@ static void ptlrpc_master_callback(lnet_event_t *ev) callback (ev); } -int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, lnet_process_id_t *peer) +int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, + lnet_process_id_t *peer, lnet_nid_t *self) { int best_dist = 0; int best_order = 0; @@ -350,17 +352,24 @@ int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, lnet_process_id_t *peer) int portals_compatibility; int dist; int order; - lnet_nid_t nid; + lnet_nid_t dst_nid; + lnet_nid_t src_nid; - portals_compatibility = LNetCtl(IOC_PORTAL_PORTALS_COMPATIBILITY, NULL); - - /* Choose the matching UUID that's closest */ - peer->pid = LUSTRE_SRV_PTL_PID; + portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL); + + peer->pid = LUSTRE_SRV_LNET_PID; - while (lustre_uuid_to_peer(uuid->uuid, &nid, count++) == 0) { - dist = LNetDist(nid, &order); + /* Choose the matching UUID that's closest */ + while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) { + dist = LNetDist(dst_nid, &src_nid, &order); if (dist < 0) continue; + + if (dist == 0) { /* local! use loopback LND */ + peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0); + rc = 0; + break; + } LASSERT (order >= 0); if (rc < 0 || @@ -374,18 +383,18 @@ int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, lnet_process_id_t *peer) * NET, so if I'm reading new config logs, or * getting configured by (new) lconf I can * still talk to old servers. */ - nid = PTL_MKNID(0, PTL_NIDADDR(nid)); + dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid)); + src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid)); } - peer->nid = nid; + peer->nid = dst_nid; + *self = src_nid; rc = 0; } } CDEBUG(D_WARNING,"%s->%s\n", uuid->uuid, libcfs_id2str(*peer)); if (rc != 0) - LCONSOLE_ERROR("I couldn't find a NID for %s. Are the networks " - "alive? Is routing set up?\n", uuid->uuid); - + CERROR("No NID found for %s\n", uuid->uuid); return rc; } @@ -432,7 +441,7 @@ lnet_pid_t ptl_get_pid(void) #ifndef __KERNEL__ pid = getpid(); #else - pid = LUSTRE_SRV_PTL_PID; + pid = LUSTRE_SRV_LNET_PID; #endif return pid; } @@ -514,7 +523,7 @@ liblustre_check_events (int timeout) if (rc == 0) RETURN(0); - LASSERT (rc == -EOVERFLOW || rc == 0); + LASSERT (rc == -EOVERFLOW || rc == 1); /* liblustre: no asynch callback so we can't affort to miss any * events... */ diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c index 87185f4..83c9d68 100644 --- a/lustre/ptlrpc/import.c +++ b/lustre/ptlrpc/import.c @@ -24,11 +24,6 @@ */ #define DEBUG_SUBSYSTEM S_RPC - -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif - #ifdef __KERNEL__ # include # include @@ -96,7 +91,6 @@ int ptlrpc_init_import(struct obd_import *imp) return 0; } -EXPORT_SYMBOL(ptlrpc_init_import); #define UUID_STR "_UUID" static void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uuid_len) @@ -108,7 +102,7 @@ static void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uu if (*uuid_len < strlen(UUID_STR)) return; - + if (!strncmp(*uuid_start + *uuid_len - strlen(UUID_STR), UUID_STR, strlen(UUID_STR))) *uuid_len -= strlen(UUID_STR); @@ -268,7 +262,7 @@ static int import_select_connection(struct obd_import *imp) RETURN(-EINVAL); } - if (imp->imp_conn_current && + if (imp->imp_conn_current && imp->imp_conn_current->oic_item.next != &imp->imp_conn_list) { imp_conn = list_entry(imp->imp_conn_current->oic_item.next, struct obd_import_conn, oic_item); @@ -289,6 +283,10 @@ static int import_select_connection(struct obd_import *imp) dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn); class_export_put(dlmexp); + if (imp->imp_conn_current && (imp->imp_conn_current != imp_conn)) { + LCONSOLE_WARN("Changing connection for %s to %s\n", + imp->imp_obd->obd_name, imp_conn->oic_uuid.uuid); + } imp->imp_conn_current = imp_conn; CDEBUG(D_HA, "%s: import %p using connection %s\n", imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid); @@ -377,8 +375,12 @@ int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid) aa->pcaa_peer_committed = committed_before_reconnect; aa->pcaa_initial_connect = initial_connect; - if (aa->pcaa_initial_connect) + if (aa->pcaa_initial_connect) { imp->imp_replayable = 1; + /* On an initial connect, we don't know which one of a + failover server pair is up. Don't wait long. */ + request->rq_timeout = max((int)(obd_timeout / 20), 5); + } DEBUG_REQ(D_RPCTRACE, request, "(re)connect request"); ptlrpcd_add_req(request); @@ -390,7 +392,6 @@ out: RETURN(rc); } -EXPORT_SYMBOL(ptlrpc_connect_import); static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp) { @@ -448,7 +449,7 @@ static int ptlrpc_connect_interpret(struct ptlrpc_request *request, /* All imports are pingable */ imp->imp_pingable = 1; - + if (aa->pcaa_initial_connect) { if (msg_flags & MSG_CONNECT_REPLAYABLE) { CDEBUG(D_HA, "connected to replayable target: %s\n", @@ -538,7 +539,23 @@ finish: RETURN(0); } } else { + struct obd_connect_data *ocd; + + ocd = lustre_swab_repbuf(request, 0, + sizeof *ocd, lustre_swab_connect); + if (ocd == NULL) { + CERROR("Wrong connect data from server\n"); + rc = -EPROTO; + GOTO(out, rc); + } spin_lock_irqsave(&imp->imp_lock, flags); + /* + * check that server granted subset of flags we asked for. + */ + LASSERT((ocd->ocd_connect_flags & + imp->imp_connect_data.ocd_connect_flags) == + ocd->ocd_connect_flags); + imp->imp_connect_data = *ocd; if (imp->imp_conn_current != NULL) { list_del(&imp->imp_conn_current->oic_item); list_add(&imp->imp_conn_current->oic_item, @@ -581,7 +598,7 @@ static int completed_replay_interpret(struct ptlrpc_request *req, ptlrpc_import_recovery_state_machine(req->rq_import); } else { CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, " - "reconnecting\n", + "reconnecting\n", req->rq_import->imp_obd->obd_name, req->rq_status); ptlrpc_connect_import(req->rq_import, NULL); } diff --git a/lustre/ptlrpc/lproc_ptlrpc.c b/lustre/ptlrpc/lproc_ptlrpc.c index a4393cc..39491f4 100644 --- a/lustre/ptlrpc/lproc_ptlrpc.c +++ b/lustre/ptlrpc/lproc_ptlrpc.c @@ -74,6 +74,8 @@ struct ll_rpc_opcode { { MDS_SET_INFO, "mds_set_info" }, { MDS_QUOTACHECK, "mds_quotacheck" }, { MDS_QUOTACTL, "mds_quotactl" }, + // { MDS_GETXATTR, "mds_getxattr" }, + // { MDS_SETXATTR, "mds_setxattr" }, { LDLM_ENQUEUE, "ldlm_enqueue" }, { LDLM_CONVERT, "ldlm_convert" }, { LDLM_CANCEL, "ldlm_cancel" }, @@ -334,9 +336,10 @@ static int ptlrpc_lprocfs_svc_req_history_show(struct seq_file *s, void *iter) * must be just as careful as the service's request * parser. Currently I only print stuff here I know is OK * to look at coz it was set up in request_in_callback()!!! */ - seq_printf(s, LPD64":%s:"LPD64":%d:%s ", - req->rq_history_seq, libcfs_id2str(req->rq_peer), - req->rq_xid, req->rq_reqlen,ptlrpc_rqphase2str(req)); + seq_printf(s, LPD64":%s:%s:"LPD64":%d:%s ", + req->rq_history_seq, libcfs_nid2str(req->rq_self), + libcfs_id2str(req->rq_peer), req->rq_xid, + req->rq_reqlen,ptlrpc_rqphase2str(req)); if (svc->srv_request_history_print_fn == NULL) seq_printf(s, "\n"); diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 3698319..e1b3219 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -68,7 +68,8 @@ static int ptl_send_buf (lnet_handle_md_t *mdh, void *base, int len, CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n", len, portal, xid); - rc = LNetPut (*mdh, ack, conn->c_peer, portal, xid, 0, 0); + rc = LNetPut (conn->c_self, *mdh, ack, + conn->c_peer, portal, xid, 0, 0); if (rc != 0) { int rc2; /* We're going to get an UNLINK event when I unlink below, @@ -85,11 +86,11 @@ static int ptl_send_buf (lnet_handle_md_t *mdh, void *base, int len, int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc) { - int rc; - int rc2; - lnet_process_id_t peer; - lnet_md_t md; - __u64 xid; + struct ptlrpc_connection *conn = desc->bd_export->exp_connection; + int rc; + int rc2; + lnet_md_t md; + __u64 xid; ENTRY; if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_PUT_NET)) @@ -100,7 +101,6 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc) LASSERT (desc->bd_type == BULK_PUT_SOURCE || desc->bd_type == BULK_GET_SINK); desc->bd_success = 0; - peer = desc->bd_export->exp_connection->c_peer; md.user_ptr = &desc->bd_cbid; md.eq_handle = ptlrpc_eq_h; @@ -125,24 +125,25 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc) xid = desc->bd_req->rq_xid; CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d " "id %s xid "LPX64"\n", desc->bd_iov_count, - desc->bd_nob, desc->bd_portal, libcfs_id2str(peer), xid); + desc->bd_nob, desc->bd_portal, + libcfs_id2str(conn->c_peer), xid); /* Network is about to get at the memory */ desc->bd_network_rw = 1; if (desc->bd_type == BULK_PUT_SOURCE) - rc = LNetPut (desc->bd_md_h, LNET_ACK_REQ, peer, - desc->bd_portal, xid, 0, 0); + rc = LNetPut (conn->c_self, desc->bd_md_h, LNET_ACK_REQ, + conn->c_peer, desc->bd_portal, xid, 0, 0); else - rc = LNetGet (desc->bd_md_h, peer, - desc->bd_portal, xid, 0); + rc = LNetGet (conn->c_self, desc->bd_md_h, + conn->c_peer, desc->bd_portal, xid, 0); if (rc != 0) { /* Can't send, so we unlink the MD bound above. The UNLINK * event this creates will signal completion with failure, * so we return SUCCESS here! */ CERROR("Transfer(%s, %d, "LPX64") failed: %d\n", - libcfs_id2str(peer), desc->bd_portal, xid, rc); + libcfs_id2str(conn->c_peer), desc->bd_portal, xid, rc); rc2 = LNetMDUnlink(desc->bd_md_h); LASSERT (rc2 == 0); } @@ -335,7 +336,7 @@ int ptlrpc_send_reply (struct ptlrpc_request *req, int may_be_difficult) req->rq_repmsg->opc = req->rq_reqmsg->opc; if (req->rq_export == NULL) - conn = ptlrpc_get_connection(req->rq_peer, NULL); + conn = ptlrpc_get_connection(req->rq_peer, req->rq_self, NULL); else conn = ptlrpc_connection_addref(req->rq_export->exp_connection); @@ -376,6 +377,62 @@ int ptlrpc_error(struct ptlrpc_request *req) RETURN(rc); } +int ptl_send_rpc_nowait(struct ptlrpc_request *request) +{ + int rc; + struct ptlrpc_connection *connection; + unsigned long flags; + ENTRY; + + LASSERT (request->rq_type == PTL_RPC_MSG_REQUEST); + + if (request->rq_import->imp_obd && + request->rq_import->imp_obd->obd_fail) { + CDEBUG(D_HA, "muting rpc for failed imp obd %s\n", + request->rq_import->imp_obd->obd_name); + /* this prevents us from waiting in ptlrpc_queue_wait */ + request->rq_err = 1; + RETURN(-ENODEV); + } + + connection = request->rq_import->imp_connection; + + request->rq_reqmsg->handle = request->rq_import->imp_remote_handle; + request->rq_reqmsg->type = PTL_RPC_MSG_REQUEST; + request->rq_reqmsg->conn_cnt = request->rq_import->imp_conn_cnt; + + spin_lock_irqsave (&request->rq_lock, flags); + /* If the MD attach succeeds, there _will_ be a reply_in callback */ + request->rq_receiving_reply = 0; + /* Clear any flags that may be present from previous sends. */ + request->rq_replied = 0; + request->rq_err = 0; + request->rq_timedout = 0; + request->rq_net_err = 0; + request->rq_resend = 0; + request->rq_restart = 0; + spin_unlock_irqrestore (&request->rq_lock, flags); + + ptlrpc_request_addref(request); /* +1 ref for the SENT callback */ + + request->rq_sent = CURRENT_SECONDS; + ptlrpc_pinger_sending_on_import(request->rq_import); + rc = ptl_send_buf(&request->rq_req_md_h, + request->rq_reqmsg, request->rq_reqlen, + LNET_NOACK_REQ, &request->rq_req_cbid, + connection, + request->rq_request_portal, + request->rq_xid); + if (rc == 0) { + ptlrpc_lprocfs_rpc_sent(request); + } else { + ptlrpc_req_finished (request); /* drop callback ref */ + } + + return rc; +} + + int ptl_send_rpc(struct ptlrpc_request *request) { int rc; diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index bf07199..1abbdab 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -850,8 +850,8 @@ void lustre_swab_qdata(struct qunit_data *d) void lustre_assert_wire_constants(void) { /* Wire protocol assertions generated by 'wirecheck' - * running on Linux schnapps.adilger.int 2.4.28 #2 Thu Dec 16 14:35:03 MST 2004 i686 i686 i38 - * with gcc version 3.3.2 20040108 (Red Hat Linux 3.3.2-6) */ + * running on Linux mustang 2.6.12-1.1456_FC4smp #1 SMP Thu Sep 22 02:22:14 EDT 2005 i686 i68 + * with gcc version 4.0.1 20050727 (Red Hat 4.0.1-5) */ /* Constants... */ @@ -951,7 +951,7 @@ void lustre_assert_wire_constants(void) (long long)MDS_QUOTACHECK); LASSERTF(MDS_QUOTACTL == 48, " found %lld\n", (long long)MDS_QUOTACTL); - LASSERTF(MDS_LAST_OPC == 49, " found %lld\n", + LASSERTF(MDS_LAST_OPC == 51, " found %lld\n", (long long)MDS_LAST_OPC); LASSERTF(REINT_SETATTR == 1, " found %lld\n", (long long)REINT_SETATTR); @@ -1503,6 +1503,8 @@ void lustre_assert_wire_constants(void) (long long)OBD_BRW_SYNC); LASSERTF(OBD_BRW_FROM_GRANT == 32, " found %lld\n", (long long)OBD_BRW_FROM_GRANT); + LASSERTF(OBD_BRW_NOQUOTA == 256, " found %lld\n", + (long long)OBD_BRW_NOQUOTA); /* Checks for struct ost_body */ LASSERTF((int)sizeof(struct ost_body) == 208, " found %lld\n", @@ -1957,10 +1959,6 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ldlm_flock, end)); LASSERTF((int)sizeof(((struct ldlm_flock *)0)->end) == 8, " found %lld\n", (long long)(int)sizeof(((struct ldlm_flock *)0)->end)); - LASSERTF((int)offsetof(struct ldlm_flock, blocking_export) == 16, " found %lld\n", - (long long)(int)offsetof(struct ldlm_flock, blocking_export)); - LASSERTF((int)sizeof(((struct ldlm_flock *)0)->blocking_export) == 8, " found %lld\n", - (long long)(int)sizeof(((struct ldlm_flock *)0)->blocking_export)); LASSERTF((int)offsetof(struct ldlm_flock, blocking_pid) == 24, " found %lld\n", (long long)(int)offsetof(struct ldlm_flock, blocking_pid)); LASSERTF((int)sizeof(((struct ldlm_flock *)0)->blocking_pid) == 4, " found %lld\n", diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index 4df0fe7..35d91ee 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -95,6 +95,7 @@ EXPORT_SYMBOL(ptlrpc_reply); EXPORT_SYMBOL(ptlrpc_error); EXPORT_SYMBOL(ptlrpc_resend_req); EXPORT_SYMBOL(ptl_send_rpc); +EXPORT_SYMBOL(ptl_send_rpc_nowait); /* client.c */ EXPORT_SYMBOL(ptlrpc_init_client); diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c index 2218b09..1c6ada7 100644 --- a/lustre/ptlrpc/recov_thread.c +++ b/lustre/ptlrpc/recov_thread.c @@ -515,7 +515,7 @@ static int log_process_thread(void *args) SIGNAL_MASK_UNLOCK(current, flags); unlock_kernel(); - rc = llog_create(ctxt, &llh, &logid, NULL, NULL); + rc = llog_create(ctxt, &llh, &logid, NULL); if (rc) { CERROR("llog_create failed %d\n", rc); RETURN(rc); diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 6f43533..af61984 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -97,7 +97,7 @@ ptlrpc_free_rqbd (struct ptlrpc_request_buffer_desc *rqbd) { struct ptlrpc_service *svc = rqbd->rqbd_service; unsigned long flags; - + LASSERT (rqbd->rqbd_refcount == 0); LASSERT (list_empty(&rqbd->rqbd_reqs)); @@ -414,7 +414,7 @@ ptlrpc_server_free_request(struct ptlrpc_request *req) spin_lock_irqsave(&svc->srv_lock, flags); - /* schedule request buffer for re-use. + /* schedule request buffer for re-use. * NB I can only do this after I've disposed of their * reqs; particularly the embedded req */ list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds); @@ -598,7 +598,7 @@ put_conn: } static int -ptlrpc_server_handle_reply (struct ptlrpc_service *svc) +ptlrpc_server_handle_reply (struct ptlrpc_service *svc) { struct ptlrpc_reply_state *rs; unsigned long flags; @@ -651,13 +651,13 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc) * in mds_steal_ack_locks() */ CWARN("All locks stolen from rs %p x"LPD64".t"LPD64 " o%d NID %s\n", - rs, + rs, rs->rs_xid, rs->rs_transno, - rs->rs_msg.opc, + rs->rs_msg.opc, libcfs_nid2str(exp->exp_connection->c_peer.nid)); } - if ((!been_handled && rs->rs_on_net) || + if ((!been_handled && rs->rs_on_net) || nlocks > 0) { spin_unlock_irqrestore(&svc->srv_lock, flags); @@ -668,7 +668,7 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc) } while (nlocks-- > 0) - ldlm_lock_decref(&rs->rs_locks[nlocks], + ldlm_lock_decref(&rs->rs_locks[nlocks], rs->rs_modes[nlocks]); spin_lock_irqsave(&svc->srv_lock, flags); @@ -680,14 +680,14 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc) /* Off the net */ svc->srv_n_difficult_replies--; spin_unlock_irqrestore(&svc->srv_lock, flags); - + class_export_put (exp); rs->rs_export = NULL; ptlrpc_rs_decref (rs); atomic_dec (&svc->srv_outstanding_replies); RETURN(1); } - + /* still on the net; callback will schedule */ spin_unlock_irqrestore (&svc->srv_lock, flags); RETURN(1); @@ -696,7 +696,7 @@ ptlrpc_server_handle_reply (struct ptlrpc_service *svc) #ifndef __KERNEL__ /* FIXME make use of timeout later */ int -liblustre_check_services (void *arg) +liblustre_check_services (void *arg) { int did_something = 0; int rc; @@ -778,11 +778,13 @@ static int ptlrpc_main(void *arg) struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg; struct ptlrpc_service *svc = data->svc; struct ptlrpc_thread *thread = data->thread; + struct ptlrpc_reply_state *rs; struct lc_watchdog *watchdog; unsigned long flags; #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,4) struct group_info *ginfo = NULL; #endif + int rc = 0; ENTRY; lock_kernel(); @@ -799,17 +801,47 @@ static int ptlrpc_main(void *arg) THREAD_NAME(current->comm, sizeof(current->comm) - 1, "%s", data->name); unlock_kernel(); +#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,9) && CONFIG_NUMA + /* we need to do this before any per-thread allocation is done so that + * we get the per-thread allocations on local node. bug 7342 */ + if (svc->srv_cpu_affinity) { + int cpu, num_cpu; + + for (cpu = 0, num_cpu = 0; cpu < NR_CPUS; cpu++) { + if (!cpu_online(cpu)) + continue; + if (num_cpu == thread->t_id % num_online_cpus()) + break; + num_cpu++; + } + set_cpus_allowed(current, node_to_cpumask(cpu_to_node(cpu))); + } +#endif + #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,4) ginfo = groups_alloc(0); if (!ginfo) { - thread->t_flags = SVC_RUNNING; - wake_up(&thread->t_ctl_waitq); - return (-ENOMEM); + rc = -ENOMEM; + goto out; } + set_current_groups(ginfo); put_group_info(ginfo); #endif + if (svc->srv_init != NULL) { + rc = svc->srv_init(thread); + if (rc) + goto out; + } + + /* Alloc reply state structure for this one */ + OBD_ALLOC_GFP(rs, svc->srv_max_reply_size, GFP_KERNEL); + if (!rs) { + rc = -ENOMEM; + goto out_srv_init; + } + /* Record that the thread is running */ thread->t_flags = SVC_RUNNING; /* @@ -823,7 +855,11 @@ static int ptlrpc_main(void *arg) spin_lock_irqsave(&svc->srv_lock, flags); svc->srv_nthreads++; + list_add(&rs->rs_list, &svc->srv_free_rs_list); spin_unlock_irqrestore(&svc->srv_lock, flags); + wake_up(&svc->srv_free_rs_waitq); + + CDEBUG(D_NET, "service thread %d started\n", thread->t_id); /* XXX maintain a list of all managed devices: insert here */ @@ -871,12 +907,16 @@ static int ptlrpc_main(void *arg) } } + lc_watchdog_delete(watchdog); + +out_srv_init: /* * deconstruct service specific state created by ptlrpc_start_thread() */ if (svc->srv_done != NULL) svc->srv_done(thread); +out: spin_lock_irqsave(&svc->srv_lock, flags); svc->srv_nthreads--; /* must know immediately */ @@ -885,10 +925,10 @@ static int ptlrpc_main(void *arg) spin_unlock_irqrestore(&svc->srv_lock, flags); - lc_watchdog_delete(watchdog); + CDEBUG(D_NET, "service thread %d exiting: rc %d\n", thread->t_id, rc); + thread->t_id = rc; - CDEBUG(D_NET, "service thread exiting, process %d\n", current->pid); - return 0; + return rc; } static void ptlrpc_stop_thread(struct ptlrpc_service *svc, @@ -957,7 +997,6 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc, struct ptlrpc_svc_data d; struct ptlrpc_thread *thread; unsigned long flags; - struct ptlrpc_reply_state *rs; int rc; ENTRY; @@ -966,22 +1005,10 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc, RETURN(-ENOMEM); init_waitqueue_head(&thread->t_ctl_waitq); thread->t_id = id; - - if (svc->srv_init != NULL) { - rc = svc->srv_init(thread); - if (rc != 0) - RETURN(rc); - } - /* Alloc reply state structure for this one */ - OBD_ALLOC_GFP(rs, svc->srv_max_reply_size, GFP_KERNEL); - if (!rs) - RETURN(-ENOMEM); spin_lock_irqsave(&svc->srv_lock, flags); - list_add(&rs->rs_list, &svc->srv_free_rs_list); list_add(&thread->t_link, &svc->srv_threads); spin_unlock_irqrestore(&svc->srv_lock, flags); - wake_up(&svc->srv_free_rs_waitq); d.dev = dev; d.svc = svc; @@ -999,15 +1026,14 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc, list_del(&thread->t_link); spin_unlock_irqrestore(&svc->srv_lock, flags); - if (svc->srv_done != NULL) - svc->srv_done(thread); - OBD_FREE(thread, sizeof(*thread)); RETURN(rc); } - l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi); + l_wait_event(thread->t_ctl_waitq, + thread->t_flags & (SVC_RUNNING | SVC_STOPPED), &lwi); - RETURN(0); + rc = (thread->t_flags & SVC_STOPPED) ? thread->t_id : 0; + RETURN(rc); } #endif @@ -1084,7 +1110,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) list_entry(service->srv_request_queue.next, struct ptlrpc_request, rq_list); - + list_del(&req->rq_list); service->srv_n_queued_reqs--; service->srv_n_active_reqs++; -- 1.8.3.1