struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
{
struct ptlrpc_connection *c;
- lnet_process_id_t peer;
+ lnet_nid_t self;
+ lnet_process_id_t peer;
int err;
- err = ptlrpc_uuid_to_peer(uuid, &peer);
+ err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
if (err != 0) {
CERROR("cannot find peer %s!\n", uuid->uuid);
return NULL;
}
- c = ptlrpc_get_connection(peer, uuid);
+ c = ptlrpc_get_connection(peer, self, uuid);
if (c) {
memcpy(c->c_remote_uuid.uuid,
uuid->uuid, sizeof(c->c_remote_uuid.uuid));
void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
struct obd_uuid *uuid)
{
+ lnet_nid_t self;
lnet_process_id_t peer;
- int err;
+ int err;
- err = ptlrpc_uuid_to_peer(uuid, &peer);
+ err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
if (err != 0) {
CERROR("cannot find peer %s!\n", uuid->uuid);
return;
}
- memcpy(&conn->c_peer, &peer, sizeof (peer));
+ conn->c_peer = peer;
+ conn->c_self = self;
return;
}
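The reworked ptlrpc_uuid_to_peer() now returns both ends of the route: the remote lnet_process_id_t and the local NID LNet would source traffic from. A minimal caller sketch under that assumption (hypothetical helper name, error handling trimmed):

static int example_resolve(struct obd_uuid *uuid)
{
        lnet_process_id_t peer;
        lnet_nid_t self;
        int rc;

        rc = ptlrpc_uuid_to_peer(uuid, &peer, &self);
        if (rc != 0)
                return rc;              /* no NID found for this uuid */

        /* peer identifies the remote end; self is the local NID to
         * bind sends to, which ptlrpc_get_connection() now records
         * in c_self alongside c_peer. */
        return 0;
}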
spin_unlock_irqrestore(&imp->imp_lock, flags);
req->rq_commit_cb(req);
spin_lock_irqsave(&imp->imp_lock, flags);
- }
+ }
if (req->rq_transno > imp->imp_max_transno)
imp->imp_max_transno = req->rq_transno;
lwi = LWI_INTR(interrupted_request, req);
rc = l_wait_event(req->rq_reply_waitq,
(req->rq_send_state == imp->imp_state ||
- req->rq_err),
+ req->rq_err || req->rq_intr),
&lwi);
- DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
+ DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d/%d == 1)",
current->comm,
ptlrpc_import_state_name(imp->imp_state),
ptlrpc_import_state_name(req->rq_send_state),
- req->rq_err);
+ req->rq_err, req->rq_intr);
spin_lock_irqsave(&imp->imp_lock, flags);
list_del_init(&req->rq_list);
list_for_each(tmp, &conn_list) {
c = list_entry(tmp, struct ptlrpc_connection, c_link);
- CERROR("Connection %p/%s has refcount %d (nid=%s)\n",
+ CERROR("Connection %p/%s has refcount %d (nid=%s->%s)\n",
c, c->c_remote_uuid.uuid, atomic_read(&c->c_refcount),
+ libcfs_nid2str(c->c_self),
libcfs_nid2str(c->c_peer.nid));
}
EXIT;
}
-struct ptlrpc_connection *ptlrpc_get_connection(lnet_process_id_t peer,
- struct obd_uuid *uuid)
+struct ptlrpc_connection*
+ptlrpc_lookup_conn_locked (lnet_process_id_t peer)
{
- struct list_head *tmp, *pos;
struct ptlrpc_connection *c;
- ENTRY;
+ struct list_head *tmp;
- CDEBUG(D_INFO, "peer is %s\n", libcfs_id2str(peer));
-
- spin_lock(&conn_lock);
list_for_each(tmp, &conn_list) {
c = list_entry(tmp, struct ptlrpc_connection, c_link);
- if (memcmp(&peer, &c->c_peer, sizeof(peer)) == 0) {
- ptlrpc_connection_addref(c);
- GOTO(out, c);
- }
+
+ if (peer.nid == c->c_peer.nid &&
+ peer.pid == c->c_peer.pid)
+ return ptlrpc_connection_addref(c);
}
- list_for_each_safe(tmp, pos, &conn_unused_list) {
+ list_for_each(tmp, &conn_unused_list) {
c = list_entry(tmp, struct ptlrpc_connection, c_link);
- if (memcmp(&peer, &c->c_peer, sizeof(peer)) == 0) {
- ptlrpc_connection_addref(c);
+
+ if (peer.nid == c->c_peer.nid &&
+ peer.pid == c->c_peer.pid) {
list_del(&c->c_link);
list_add(&c->c_link, &conn_list);
- GOTO(out, c);
+ return ptlrpc_connection_addref(c);
}
}
- /* FIXME: this should be a slab once we can validate slab addresses
- * without OOPSing */
- OBD_ALLOC_GFP(c, sizeof(*c), GFP_ATOMIC);
-
+ return NULL;
+}
+
+
+struct ptlrpc_connection *ptlrpc_get_connection(lnet_process_id_t peer,
+ lnet_nid_t self, struct obd_uuid *uuid)
+{
+ struct ptlrpc_connection *c;
+ struct ptlrpc_connection *c2;
+ ENTRY;
+
+ CDEBUG(D_INFO, "self %s peer %s\n",
+ libcfs_nid2str(self), libcfs_id2str(peer));
+
+ spin_lock(&conn_lock);
+
+ c = ptlrpc_lookup_conn_locked(peer);
+
+ spin_unlock(&conn_lock);
+
+ if (c != NULL)
+ RETURN (c);
+
+ OBD_ALLOC(c, sizeof(*c));
if (c == NULL)
- GOTO(out, c);
+ RETURN (NULL);
- if (uuid && uuid->uuid) /* XXX ???? */
+ atomic_set(&c->c_refcount, 1);
+ c->c_peer = peer;
+ c->c_self = self;
+ if (uuid != NULL)
obd_str2uuid(&c->c_remote_uuid, uuid->uuid);
- atomic_set(&c->c_refcount, 0);
- memcpy(&c->c_peer, &peer, sizeof(c->c_peer));
-
- ptlrpc_connection_addref(c);
- list_add(&c->c_link, &conn_list);
+ spin_lock(&conn_lock);
- EXIT;
- out:
+ c2 = ptlrpc_lookup_conn_locked(peer);
+ if (c2 == NULL)
+ list_add(&c->c_link, &conn_list);
+
spin_unlock(&conn_lock);
- return c;
+
+ if (c2 == NULL)
+ RETURN (c);
+
+ OBD_FREE(c, sizeof(*c));
+ RETURN (c2);
}
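The rewrite above factors the list scan into ptlrpc_lookup_conn_locked() and moves allocation out from under conn_lock: the old code called OBD_ALLOC_GFP(..., GFP_ATOMIC) while holding the spinlock, whereas the new code allocates with plain OBD_ALLOC, retakes the lock, and repeats the lookup; if another thread created the same connection in the window, the fresh allocation is freed and the existing one is returned with its refcount bumped. A self-contained userspace sketch of the same optimistic-allocation pattern (pthreads; illustrative names, not Lustre APIs):

#include <pthread.h>
#include <stdlib.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static struct node { int key; struct node *next; } *head;

static struct node *lookup_locked(int key)
{
        struct node *n;

        for (n = head; n != NULL; n = n->next)
                if (n->key == key)
                        return n;
        return NULL;
}

struct node *get_node(int key)
{
        struct node *n, *n2;

        pthread_mutex_lock(&lock);
        n = lookup_locked(key);
        pthread_mutex_unlock(&lock);
        if (n != NULL)
                return n;

        n = malloc(sizeof(*n));         /* allocate outside the lock */
        if (n == NULL)
                return NULL;
        n->key = key;

        pthread_mutex_lock(&lock);
        n2 = lookup_locked(key);        /* re-check: did someone race us? */
        if (n2 == NULL) {
                n->next = head;         /* we won: publish our node */
                head = n;
        }
        pthread_mutex_unlock(&lock);

        if (n2 == NULL)
                return n;
        free(n);                        /* lost the race: use the winner's */
        return n2;
}

The occasional wasted allocation on a lost race is the price for never allocating, let alone sleeping, while the lock is held.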
int ptlrpc_put_connection(struct ptlrpc_connection *c)
req->rq_reqlen = ev->mlength;
do_gettimeofday(&req->rq_arrival_time);
req->rq_peer = ev->initiator;
+ req->rq_self = ev->target.nid;
req->rq_rqbd = rqbd;
req->rq_phase = RQ_PHASE_NEW;
#if CRAY_XT3
callback (ev);
}
-int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, lnet_process_id_t *peer)
+int ptlrpc_uuid_to_peer (struct obd_uuid *uuid,
+ lnet_process_id_t *peer, lnet_nid_t *self)
{
int best_dist = 0;
int best_order = 0;
int portals_compatibility;
int dist;
int order;
- lnet_nid_t nid;
+ lnet_nid_t dst_nid;
+ lnet_nid_t src_nid;
- portals_compatibility = LNetCtl(IOC_PORTAL_PORTALS_COMPATIBILITY, NULL);
-
- /* Choose the matching UUID that's closest */
- peer->pid = LUSTRE_SRV_PTL_PID;
+ portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL);
+
+ peer->pid = LUSTRE_SRV_LNET_PID;
- while (lustre_uuid_to_peer(uuid->uuid, &nid, count++) == 0) {
- dist = LNetDist(nid, &order);
+ /* Choose the matching UUID that's closest */
+ while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {
+ dist = LNetDist(dst_nid, &src_nid, &order);
if (dist < 0)
continue;
+
+ if (dist == 0) { /* local! use loopback LND */
+ peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
+ rc = 0;
+ break;
+ }
LASSERT (order >= 0);
if (rc < 0 ||
* NET, so if I'm reading new config logs, or
* getting configured by (new) lconf I can
* still talk to old servers. */
- nid = PTL_MKNID(0, PTL_NIDADDR(nid));
+ dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid));
+ src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid));
}
- peer->nid = nid;
+ peer->nid = dst_nid;
+ *self = src_nid;
rc = 0;
}
}
CDEBUG(D_WARNING,"%s->%s\n", uuid->uuid, libcfs_id2str(*peer));
if (rc != 0)
- LCONSOLE_ERROR("I couldn't find a NID for %s. Are the networks "
- "alive? Is routing set up?\n", uuid->uuid);
-
+ CERROR("No NID found for %s\n", uuid->uuid);
return rc;
}
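The selection loop above prefers the reachable NID with the smallest LNetDist() distance, and the best_dist/best_order bookkeeping implies ties are broken on the routing order; distance 0 short-circuits to the loopback LND because the peer is the local node. The core rule as a stand-alone sketch (made-up names, plain C):

struct candidate { int dist; int order; };

int pick_best(const struct candidate *c, int n)
{
        int best = -1;
        int i;

        for (i = 0; i < n; i++) {
                if (c[i].dist < 0)
                        continue;               /* unreachable */
                if (c[i].dist == 0)
                        return i;               /* local: use loopback */
                if (best < 0 ||
                    c[i].dist < c[best].dist ||
                    (c[i].dist == c[best].dist &&
                     c[i].order < c[best].order))
                        best = i;
        }
        return best;                            /* -1: no route at all */
}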
#ifndef __KERNEL__
pid = getpid();
#else
- pid = LUSTRE_SRV_PTL_PID;
+ pid = LUSTRE_SRV_LNET_PID;
#endif
return pid;
}
if (rc == 0)
RETURN(0);
- LASSERT (rc == -EOVERFLOW || rc == 0);
+ LASSERT (rc == -EOVERFLOW || rc == 1);
/* liblustre: no asynch callback so we can't afford to miss any
* events... */
*/
#define DEBUG_SUBSYSTEM S_RPC
-
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
#ifdef __KERNEL__
# include <linux/config.h>
# include <linux/module.h>
return 0;
}
-EXPORT_SYMBOL(ptlrpc_init_import);
#define UUID_STR "_UUID"
static void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uuid_len)
if (*uuid_len < strlen(UUID_STR))
return;
-
+
if (!strncmp(*uuid_start + *uuid_len - strlen(UUID_STR),
UUID_STR, strlen(UUID_STR)))
*uuid_len -= strlen(UUID_STR);
RETURN(-EINVAL);
}
- if (imp->imp_conn_current &&
+ if (imp->imp_conn_current &&
imp->imp_conn_current->oic_item.next != &imp->imp_conn_list) {
imp_conn = list_entry(imp->imp_conn_current->oic_item.next,
struct obd_import_conn, oic_item);
dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
class_export_put(dlmexp);
+ if (imp->imp_conn_current && (imp->imp_conn_current != imp_conn)) {
+ LCONSOLE_WARN("Changing connection for %s to %s\n",
+ imp->imp_obd->obd_name, imp_conn->oic_uuid.uuid);
+ }
imp->imp_conn_current = imp_conn;
CDEBUG(D_HA, "%s: import %p using connection %s\n",
imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid);
aa->pcaa_peer_committed = committed_before_reconnect;
aa->pcaa_initial_connect = initial_connect;
- if (aa->pcaa_initial_connect)
+ if (aa->pcaa_initial_connect) {
imp->imp_replayable = 1;
+ /* On an initial connect, we don't know which one of a
+ failover server pair is up. Don't wait long. */
+ request->rq_timeout = max((int)(obd_timeout / 20), 5);
+ }
DEBUG_REQ(D_RPCTRACE, request, "(re)connect request");
ptlrpcd_add_req(request);
RETURN(rc);
}
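With the then-default obd_timeout of 100 seconds, the initial-connect cap above works out to max(100/20, 5) = 5 seconds, and the max() keeps it from dropping below 5 seconds for shorter obd_timeout settings: the first connect should fail fast so the client can try the other address of a failover pair, rather than blocking for the full RPC timeout on a dead server.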
-EXPORT_SYMBOL(ptlrpc_connect_import);
static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
{
/* All imports are pingable */
imp->imp_pingable = 1;
-
+
if (aa->pcaa_initial_connect) {
if (msg_flags & MSG_CONNECT_REPLAYABLE) {
CDEBUG(D_HA, "connected to replayable target: %s\n",
RETURN(0);
}
} else {
+ struct obd_connect_data *ocd;
+
+ ocd = lustre_swab_repbuf(request, 0,
+ sizeof *ocd, lustre_swab_connect);
+ if (ocd == NULL) {
+ CERROR("Wrong connect data from server\n");
+ rc = -EPROTO;
+ GOTO(out, rc);
+ }
spin_lock_irqsave(&imp->imp_lock, flags);
+ /*
+ * check that the server granted a subset of the flags we asked for.
+ */
+ LASSERT((ocd->ocd_connect_flags &
+ imp->imp_connect_data.ocd_connect_flags) ==
+ ocd->ocd_connect_flags);
+ imp->imp_connect_data = *ocd;
if (imp->imp_conn_current != NULL) {
list_del(&imp->imp_conn_current->oic_item);
list_add(&imp->imp_conn_current->oic_item,
ptlrpc_import_recovery_state_machine(req->rq_import);
} else {
CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
- "reconnecting\n",
+ "reconnecting\n",
req->rq_import->imp_obd->obd_name, req->rq_status);
ptlrpc_connect_import(req->rq_import, NULL);
}
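The LASSERT in the connect-reply path above encodes a simple bitmask invariant: the server may grant any subset of the connect flags the client requested, but never a flag that was not requested. In isolation (plain C, illustrative values):

#include <assert.h>

/* Mirrors the subset assertion: every granted bit must also
 * have been requested. */
static void check_granted(unsigned long long asked,
                          unsigned long long granted)
{
        assert((granted & asked) == granted);
}

/* check_granted(0x5, 0x1) holds: 0x1 is a subset of 0x5.
 * check_granted(0x5, 0x2) would trip: bit 0x2 was never asked for. */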
{ MDS_SET_INFO, "mds_set_info" },
{ MDS_QUOTACHECK, "mds_quotacheck" },
{ MDS_QUOTACTL, "mds_quotactl" },
+ // { MDS_GETXATTR, "mds_getxattr" },
+ // { MDS_SETXATTR, "mds_setxattr" },
{ LDLM_ENQUEUE, "ldlm_enqueue" },
{ LDLM_CONVERT, "ldlm_convert" },
{ LDLM_CANCEL, "ldlm_cancel" },
* must be just as careful as the service's request
* parser. Currently I only print stuff here I know is OK
* to look at coz it was set up in request_in_callback()!!! */
- seq_printf(s, LPD64":%s:"LPD64":%d:%s ",
- req->rq_history_seq, libcfs_id2str(req->rq_peer),
- req->rq_xid, req->rq_reqlen,ptlrpc_rqphase2str(req));
+ seq_printf(s, LPD64":%s:%s:"LPD64":%d:%s ",
+ req->rq_history_seq, libcfs_nid2str(req->rq_self),
+ libcfs_id2str(req->rq_peer), req->rq_xid,
+ req->rq_reqlen,ptlrpc_rqphase2str(req));
if (svc->srv_request_history_print_fn == NULL)
seq_printf(s, "\n");
CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n",
len, portal, xid);
- rc = LNetPut (*mdh, ack, conn->c_peer, portal, xid, 0, 0);
+ rc = LNetPut (conn->c_self, *mdh, ack,
+ conn->c_peer, portal, xid, 0, 0);
if (rc != 0) {
int rc2;
/* We're going to get an UNLINK event when I unlink below,
int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
{
- int rc;
- int rc2;
- lnet_process_id_t peer;
- lnet_md_t md;
- __u64 xid;
+ struct ptlrpc_connection *conn = desc->bd_export->exp_connection;
+ int rc;
+ int rc2;
+ lnet_md_t md;
+ __u64 xid;
ENTRY;
if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_PUT_NET))
LASSERT (desc->bd_type == BULK_PUT_SOURCE ||
desc->bd_type == BULK_GET_SINK);
desc->bd_success = 0;
- peer = desc->bd_export->exp_connection->c_peer;
md.user_ptr = &desc->bd_cbid;
md.eq_handle = ptlrpc_eq_h;
xid = desc->bd_req->rq_xid;
CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d "
"id %s xid "LPX64"\n", desc->bd_iov_count,
- desc->bd_nob, desc->bd_portal, libcfs_id2str(peer), xid);
+ desc->bd_nob, desc->bd_portal,
+ libcfs_id2str(conn->c_peer), xid);
/* Network is about to get at the memory */
desc->bd_network_rw = 1;
if (desc->bd_type == BULK_PUT_SOURCE)
- rc = LNetPut (desc->bd_md_h, LNET_ACK_REQ, peer,
- desc->bd_portal, xid, 0, 0);
+ rc = LNetPut (conn->c_self, desc->bd_md_h, LNET_ACK_REQ,
+ conn->c_peer, desc->bd_portal, xid, 0, 0);
else
- rc = LNetGet (desc->bd_md_h, peer,
- desc->bd_portal, xid, 0);
+ rc = LNetGet (conn->c_self, desc->bd_md_h,
+ conn->c_peer, desc->bd_portal, xid, 0);
if (rc != 0) {
/* Can't send, so we unlink the MD bound above. The UNLINK
* event this creates will signal completion with failure,
* so we return SUCCESS here! */
CERROR("Transfer(%s, %d, "LPX64") failed: %d\n",
- libcfs_id2str(peer), desc->bd_portal, xid, rc);
+ libcfs_id2str(conn->c_peer), desc->bd_portal, xid, rc);
rc2 = LNetMDUnlink(desc->bd_md_h);
LASSERT (rc2 == 0);
}
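Passing conn->c_self to LNetPut()/LNetGet() here mirrors the request-in path, where rq_self is recorded from ev->target.nid: on a host with several interfaces, bulk transfers and replies are sourced from the same local NID the client originally addressed, rather than whichever interface LNet might otherwise choose.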
req->rq_repmsg->opc = req->rq_reqmsg->opc;
if (req->rq_export == NULL)
- conn = ptlrpc_get_connection(req->rq_peer, NULL);
+ conn = ptlrpc_get_connection(req->rq_peer, req->rq_self, NULL);
else
conn = ptlrpc_connection_addref(req->rq_export->exp_connection);
RETURN(rc);
}
+int ptl_send_rpc_nowait(struct ptlrpc_request *request)
+{
+ int rc;
+ struct ptlrpc_connection *connection;
+ unsigned long flags;
+ ENTRY;
+
+ LASSERT (request->rq_type == PTL_RPC_MSG_REQUEST);
+
+ if (request->rq_import->imp_obd &&
+ request->rq_import->imp_obd->obd_fail) {
+ CDEBUG(D_HA, "muting rpc for failed imp obd %s\n",
+ request->rq_import->imp_obd->obd_name);
+ /* this prevents us from waiting in ptlrpc_queue_wait */
+ request->rq_err = 1;
+ RETURN(-ENODEV);
+ }
+
+ connection = request->rq_import->imp_connection;
+
+ request->rq_reqmsg->handle = request->rq_import->imp_remote_handle;
+ request->rq_reqmsg->type = PTL_RPC_MSG_REQUEST;
+ request->rq_reqmsg->conn_cnt = request->rq_import->imp_conn_cnt;
+
+ spin_lock_irqsave (&request->rq_lock, flags);
+ /* No reply MD is attached on this path, so no reply_in callback */
+ request->rq_receiving_reply = 0;
+ /* Clear any flags that may be present from previous sends. */
+ request->rq_replied = 0;
+ request->rq_err = 0;
+ request->rq_timedout = 0;
+ request->rq_net_err = 0;
+ request->rq_resend = 0;
+ request->rq_restart = 0;
+ spin_unlock_irqrestore (&request->rq_lock, flags);
+
+ ptlrpc_request_addref(request); /* +1 ref for the SENT callback */
+
+ request->rq_sent = CURRENT_SECONDS;
+ ptlrpc_pinger_sending_on_import(request->rq_import);
+ rc = ptl_send_buf(&request->rq_req_md_h,
+ request->rq_reqmsg, request->rq_reqlen,
+ LNET_NOACK_REQ, &request->rq_req_cbid,
+ connection,
+ request->rq_request_portal,
+ request->rq_xid);
+ if (rc == 0) {
+ ptlrpc_lprocfs_rpc_sent(request);
+ } else {
+ ptlrpc_req_finished (request); /* drop callback ref */
+ }
+
+ return rc;
+}
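ptl_send_rpc_nowait() is the one-way variant of ptl_send_rpc(): it never attaches a reply MD (rq_receiving_reply stays 0) and sends with LNET_NOACK_REQ, so the only extra reference it takes is the one for the SENT callback. A hypothetical caller, for a request that must not expect a reply:

int rc = ptl_send_rpc_nowait(req);

if (rc != 0)
        CERROR("one-way send of x"LPU64" failed: %d\n",
               req->rq_xid, rc);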
+
+
int ptl_send_rpc(struct ptlrpc_request *request)
{
int rc;
void lustre_assert_wire_constants(void)
{
/* Wire protocol assertions generated by 'wirecheck'
- * running on Linux schnapps.adilger.int 2.4.28 #2 Thu Dec 16 14:35:03 MST 2004 i686 i686 i386 GNU/Linux
- * with gcc version 3.3.2 20040108 (Red Hat Linux 3.3.2-6) */
+ * running on Linux mustang 2.6.12-1.1456_FC4smp #1 SMP Thu Sep 22 02:22:14 EDT 2005 i686 i686 i386 GNU/Linux
+ * with gcc version 4.0.1 20050727 (Red Hat 4.0.1-5) */
/* Constants... */
(long long)MDS_QUOTACHECK);
LASSERTF(MDS_QUOTACTL == 48, " found %lld\n",
(long long)MDS_QUOTACTL);
- LASSERTF(MDS_LAST_OPC == 49, " found %lld\n",
+ LASSERTF(MDS_LAST_OPC == 51, " found %lld\n",
(long long)MDS_LAST_OPC);
LASSERTF(REINT_SETATTR == 1, " found %lld\n",
(long long)REINT_SETATTR);
(long long)OBD_BRW_SYNC);
LASSERTF(OBD_BRW_FROM_GRANT == 32, " found %lld\n",
(long long)OBD_BRW_FROM_GRANT);
+ LASSERTF(OBD_BRW_NOQUOTA == 256, " found %lld\n",
+ (long long)OBD_BRW_NOQUOTA);
/* Checks for struct ost_body */
LASSERTF((int)sizeof(struct ost_body) == 208, " found %lld\n",
(long long)(int)offsetof(struct ldlm_flock, end));
LASSERTF((int)sizeof(((struct ldlm_flock *)0)->end) == 8, " found %lld\n",
(long long)(int)sizeof(((struct ldlm_flock *)0)->end));
- LASSERTF((int)offsetof(struct ldlm_flock, blocking_export) == 16, " found %lld\n",
- (long long)(int)offsetof(struct ldlm_flock, blocking_export));
- LASSERTF((int)sizeof(((struct ldlm_flock *)0)->blocking_export) == 8, " found %lld\n",
- (long long)(int)sizeof(((struct ldlm_flock *)0)->blocking_export));
LASSERTF((int)offsetof(struct ldlm_flock, blocking_pid) == 24, " found %lld\n",
(long long)(int)offsetof(struct ldlm_flock, blocking_pid));
LASSERTF((int)sizeof(((struct ldlm_flock *)0)->blocking_pid) == 4, " found %lld\n",
EXPORT_SYMBOL(ptlrpc_error);
EXPORT_SYMBOL(ptlrpc_resend_req);
EXPORT_SYMBOL(ptl_send_rpc);
+EXPORT_SYMBOL(ptl_send_rpc_nowait);
/* client.c */
EXPORT_SYMBOL(ptlrpc_init_client);
SIGNAL_MASK_UNLOCK(current, flags);
unlock_kernel();
- rc = llog_create(ctxt, &llh, &logid, NULL, NULL);
+ rc = llog_create(ctxt, &llh, &logid, NULL);
if (rc) {
CERROR("llog_create failed %d\n", rc);
RETURN(rc);
{
struct ptlrpc_service *svc = rqbd->rqbd_service;
unsigned long flags;
-
+
LASSERT (rqbd->rqbd_refcount == 0);
LASSERT (list_empty(&rqbd->rqbd_reqs));
spin_lock_irqsave(&svc->srv_lock, flags);
- /* schedule request buffer for re-use.
+ /* schedule request buffer for re-use.
* NB I can only do this after I've disposed of their
* reqs; particularly the embedded req */
list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
}
static int
-ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
+ptlrpc_server_handle_reply (struct ptlrpc_service *svc)
{
struct ptlrpc_reply_state *rs;
unsigned long flags;
* in mds_steal_ack_locks() */
CWARN("All locks stolen from rs %p x"LPD64".t"LPD64
" o%d NID %s\n",
- rs,
+ rs,
rs->rs_xid, rs->rs_transno,
- rs->rs_msg.opc,
+ rs->rs_msg.opc,
libcfs_nid2str(exp->exp_connection->c_peer.nid));
}
- if ((!been_handled && rs->rs_on_net) ||
+ if ((!been_handled && rs->rs_on_net) ||
nlocks > 0) {
spin_unlock_irqrestore(&svc->srv_lock, flags);
}
while (nlocks-- > 0)
- ldlm_lock_decref(&rs->rs_locks[nlocks],
+ ldlm_lock_decref(&rs->rs_locks[nlocks],
rs->rs_modes[nlocks]);
spin_lock_irqsave(&svc->srv_lock, flags);
/* Off the net */
svc->srv_n_difficult_replies--;
spin_unlock_irqrestore(&svc->srv_lock, flags);
-
+
class_export_put (exp);
rs->rs_export = NULL;
ptlrpc_rs_decref (rs);
atomic_dec (&svc->srv_outstanding_replies);
RETURN(1);
}
-
+
/* still on the net; callback will schedule */
spin_unlock_irqrestore (&svc->srv_lock, flags);
RETURN(1);
#ifndef __KERNEL__
/* FIXME make use of timeout later */
int
-liblustre_check_services (void *arg)
+liblustre_check_services (void *arg)
{
int did_something = 0;
int rc;
struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
struct ptlrpc_service *svc = data->svc;
struct ptlrpc_thread *thread = data->thread;
+ struct ptlrpc_reply_state *rs;
struct lc_watchdog *watchdog;
unsigned long flags;
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,4)
struct group_info *ginfo = NULL;
#endif
+ int rc = 0;
ENTRY;
lock_kernel();
THREAD_NAME(current->comm, sizeof(current->comm) - 1, "%s", data->name);
unlock_kernel();
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,9) && CONFIG_NUMA
+ /* we need to do this before any per-thread allocation is done so that
+ * we get the per-thread allocations on the local node. bug 7342 */
+ if (svc->srv_cpu_affinity) {
+ int cpu, num_cpu;
+
+ for (cpu = 0, num_cpu = 0; cpu < NR_CPUS; cpu++) {
+ if (!cpu_online(cpu))
+ continue;
+ if (num_cpu == thread->t_id % num_online_cpus())
+ break;
+ num_cpu++;
+ }
+ set_cpus_allowed(current, node_to_cpumask(cpu_to_node(cpu)));
+ }
+#endif
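The block above walks the online CPUs to find the (t_id mod num_online_cpus())-th one and binds the thread to that CPU's whole NUMA node via node_to_cpumask(). A runnable userspace analogue (Linux-specific; simplified to pin a single CPU rather than a node):

#define _GNU_SOURCE
#include <sched.h>
#include <unistd.h>

/* Pin the calling thread to the (t_id mod N)-th online CPU. */
static int bind_round_robin(int t_id)
{
        long ncpu = sysconf(_SC_NPROCESSORS_ONLN);
        cpu_set_t mask;

        if (ncpu <= 0)
                return -1;
        CPU_ZERO(&mask);
        CPU_SET(t_id % ncpu, &mask);
        return sched_setaffinity(0, sizeof(mask), &mask); /* 0 == self */
}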
+
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,4)
ginfo = groups_alloc(0);
if (!ginfo) {
- thread->t_flags = SVC_RUNNING;
- wake_up(&thread->t_ctl_waitq);
- return (-ENOMEM);
+ rc = -ENOMEM;
+ goto out;
}
+
set_current_groups(ginfo);
put_group_info(ginfo);
#endif
+ if (svc->srv_init != NULL) {
+ rc = svc->srv_init(thread);
+ if (rc)
+ goto out;
+ }
+
+ /* Alloc reply state structure for this one */
+ OBD_ALLOC_GFP(rs, svc->srv_max_reply_size, GFP_KERNEL);
+ if (!rs) {
+ rc = -ENOMEM;
+ goto out_srv_init;
+ }
+
/* Record that the thread is running */
thread->t_flags = SVC_RUNNING;
/*
spin_lock_irqsave(&svc->srv_lock, flags);
svc->srv_nthreads++;
+ list_add(&rs->rs_list, &svc->srv_free_rs_list);
spin_unlock_irqrestore(&svc->srv_lock, flags);
+ wake_up(&svc->srv_free_rs_waitq);
+
+ CDEBUG(D_NET, "service thread %d started\n", thread->t_id);
/* XXX maintain a list of all managed devices: insert here */
}
}
+ lc_watchdog_delete(watchdog);
+
+out_srv_init:
/*
* deconstruct service specific state created by ptlrpc_start_thread()
*/
if (svc->srv_done != NULL)
svc->srv_done(thread);
+out:
spin_lock_irqsave(&svc->srv_lock, flags);
svc->srv_nthreads--; /* must know immediately */
spin_unlock_irqrestore(&svc->srv_lock, flags);
- lc_watchdog_delete(watchdog);
+ CDEBUG(D_NET, "service thread %d exiting: rc %d\n", thread->t_id, rc);
+ thread->t_id = rc;
- CDEBUG(D_NET, "service thread exiting, process %d\n", current->pid);
- return 0;
+ return rc;
}
static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
struct ptlrpc_svc_data d;
struct ptlrpc_thread *thread;
unsigned long flags;
- struct ptlrpc_reply_state *rs;
int rc;
ENTRY;
RETURN(-ENOMEM);
init_waitqueue_head(&thread->t_ctl_waitq);
thread->t_id = id;
-
- if (svc->srv_init != NULL) {
- rc = svc->srv_init(thread);
- if (rc != 0)
- RETURN(rc);
- }
- /* Alloc reply state structure for this one */
- OBD_ALLOC_GFP(rs, svc->srv_max_reply_size, GFP_KERNEL);
- if (!rs)
- RETURN(-ENOMEM);
spin_lock_irqsave(&svc->srv_lock, flags);
- list_add(&rs->rs_list, &svc->srv_free_rs_list);
list_add(&thread->t_link, &svc->srv_threads);
spin_unlock_irqrestore(&svc->srv_lock, flags);
- wake_up(&svc->srv_free_rs_waitq);
d.dev = dev;
d.svc = svc;
list_del(&thread->t_link);
spin_unlock_irqrestore(&svc->srv_lock, flags);
- if (svc->srv_done != NULL)
- svc->srv_done(thread);
-
OBD_FREE(thread, sizeof(*thread));
RETURN(rc);
}
- l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi);
+ l_wait_event(thread->t_ctl_waitq,
+ thread->t_flags & (SVC_RUNNING | SVC_STOPPED), &lwi);
- RETURN(0);
+ rc = (thread->t_flags & SVC_STOPPED) ? thread->t_id : 0;
+ RETURN(rc);
}
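Note the error plumbing behind this: a failing service thread stores its exit code in thread->t_id before it is marked SVC_STOPPED (see the thread body above), so ptlrpc_start_thread() can report the real startup failure here instead of assuming the thread came up.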
#endif
list_entry(service->srv_request_queue.next,
struct ptlrpc_request,
rq_list);
-
+
list_del(&req->rq_list);
service->srv_n_queued_reqs--;
service->srv_n_active_reqs++;