return;
}
-extern ptl_handle_ni_t tcpnal_ni;
-
void *inter_module_get(char *arg);
/* cheats for now */
static inline void portals_run_lbug_upcall(char *file, const char *fn,
const int l){}
-#define LBUG() \
- do { \
- printf("!!!LBUG at %s:%d\n", __FILE__, __LINE__); \
- sleep(1000000); \
- } while (0)
-
-
-
/* completion */
struct completion {
unsigned int done;
#define PTLBD_BUFSIZE (32 * 1024)
#define PTLBD_MAXREQSIZE 1024
-struct ptlrpc_peer {
- ptl_process_id_t peer_id;
- struct ptlrpc_ni *peer_ni;
-};
-
struct ptlrpc_connection {
struct list_head c_link;
- struct ptlrpc_peer c_peer;
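+ /* peer's portals process id (NID + PID) */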
+ ptl_process_id_t c_peer;
struct obd_uuid c_remote_uuid;
atomic_t c_refcount;
};
#define RS_DEBUG 1
struct ptlrpc_reply_state {
- struct ptlrpc_cb_id rs_cb_id;
- struct list_head rs_list;
- struct list_head rs_exp_list;
- struct list_head rs_obd_list;
+ struct ptlrpc_cb_id rs_cb_id;
+ struct list_head rs_list;
+ struct list_head rs_exp_list;
+ struct list_head rs_obd_list;
#if RS_DEBUG
- struct list_head rs_debug_list;
+ struct list_head rs_debug_list;
#endif
/* updates to following flag serialised by srv_request_lock */
- unsigned int rs_difficult:1; /* ACK/commit stuff */
- unsigned int rs_scheduled:1; /* being handled? */
- unsigned int rs_scheduled_ever:1; /* any schedule attempts? */
- unsigned int rs_handled:1; /* been handled yet? */
- unsigned int rs_on_net:1; /* reply_out_callback pending? */
-
- int rs_size;
- __u64 rs_transno;
- __u64 rs_xid;
- struct obd_export *rs_export;
- struct ptlrpc_srv_ni *rs_srv_ni;
- ptl_handle_md_t rs_md_h;
- atomic_t rs_refcount;
+ unsigned int rs_difficult:1; /* ACK/commit stuff */
+ unsigned int rs_scheduled:1; /* being handled? */
+ unsigned int rs_scheduled_ever:1; /* any schedule attempts? */
+ unsigned int rs_handled:1; /* been handled yet? */
+ unsigned int rs_on_net:1; /* reply_out_callback pending? */
+
+ int rs_size;
+ __u64 rs_transno;
+ __u64 rs_xid;
+ struct obd_export *rs_export;
+ struct ptlrpc_service *rs_service;
+ ptl_handle_md_t rs_md_h;
+ atomic_t rs_refcount;
/* locks awaiting client reply ACK */
- int rs_nlocks;
- struct lustre_handle rs_locks[RS_MAX_LOCKS];
- ldlm_mode_t rs_modes[RS_MAX_LOCKS];
+ int rs_nlocks;
+ struct lustre_handle rs_locks[RS_MAX_LOCKS];
+ ldlm_mode_t rs_modes[RS_MAX_LOCKS];
/* last member: variable sized reply message */
- struct lustre_msg rs_msg;
+ struct lustre_msg rs_msg;
};
struct ptlrpc_request {
wait_queue_head_t rq_reply_waitq;
struct ptlrpc_cb_id rq_reply_cbid;
- struct ptlrpc_peer rq_peer; /* XXX see service.c can this be removed? */
- char rq_peerstr[PTL_NALFMT_SIZE];
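+ /* peer's portals id; format on demand with libcfs_id2str() */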
+ ptl_process_id_t rq_peer;
struct obd_export *rq_export;
struct obd_import *rq_import;
struct ptlrpc_request_buffer_desc {
struct list_head rqbd_list;
struct list_head rqbd_reqs;
- struct ptlrpc_srv_ni *rqbd_srv_ni;
+ struct ptlrpc_service *rqbd_service;
ptl_handle_md_t rqbd_md_h;
int rqbd_refcount;
char *rqbd_buffer;
struct ptlrpc_request rqbd_req;
};
-/* event queues are per-ni, because one day we may get a hardware
- * supported NAL that delivers events asynchonously wrt kernel portals
- * into the eq.
- */
-struct ptlrpc_ni { /* Generic interface state */
- char *pni_name;
- int pni_number;
- ptl_handle_ni_t pni_ni_h;
- ptl_handle_eq_t pni_eq_h;
-};
-
-struct ptlrpc_srv_ni {
- /* Interface-specific service state */
- struct ptlrpc_service *sni_service; /* owning service */
- struct ptlrpc_ni *sni_ni; /* network interface */
- struct list_head sni_active_rqbds; /* req buffers receiving */
- struct list_head sni_active_replies; /* all the active replies */
- int sni_nrqbd_receiving; /* # posted request buffers */
-};
-
typedef int (*svc_handler_t)(struct ptlrpc_request *req);
typedef void (*svcreq_printfn_t)(void *, struct ptlrpc_request *);
svcreq_printfn_t srv_request_history_print_fn; /* service-specific print fn */
struct list_head srv_idle_rqbds; /* request buffers to be reposted */
+ struct list_head srv_active_rqbds; /* req buffers receiving */
struct list_head srv_history_rqbds; /* request buffer history */
+ int srv_nrqbd_receiving; /* # posted request buffers */
int srv_n_history_rqbds; /* # request buffers in history */
int srv_max_history_rqbds; /* max # request buffers in history */
atomic_t srv_outstanding_replies;
+ struct list_head srv_active_replies; /* all the active replies */
struct list_head srv_reply_queue; /* replies waiting for service */
wait_queue_head_t srv_waitq; /* all threads sleep on this */
struct proc_dir_entry *srv_procroot;
struct lprocfs_stats *srv_stats;
-
- struct ptlrpc_srv_ni srv_interfaces[0];
};
-static inline char *ptlrpc_peernid2str(struct ptlrpc_peer *p, char *str)
-{
- LASSERT(p->peer_ni != NULL);
- return (portals_nid2str(p->peer_ni->pni_number, p->peer_id.nid, str));
-}
-
-static inline char *ptlrpc_id2str(struct ptlrpc_peer *p, char *str)
-{
- LASSERT(p->peer_ni != NULL);
- return (portals_id2str(p->peer_ni->pni_number, p->peer_id, str));
-}
-
/* ptlrpc/events.c */
-extern struct ptlrpc_ni ptlrpc_interfaces[];
-extern int ptlrpc_ninterfaces;
-extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid, struct ptlrpc_peer *peer);
+extern ptl_handle_ni_t ptlrpc_ni_h;
+extern ptl_handle_eq_t ptlrpc_eq_h;
+extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid, ptl_process_id_t *peer);
extern void request_out_callback (ptl_event_t *ev);
extern void reply_in_callback(ptl_event_t *ev);
extern void client_bulk_callback (ptl_event_t *ev);
extern void request_in_callback(ptl_event_t *ev);
extern void reply_out_callback(ptl_event_t *ev);
extern void server_bulk_callback (ptl_event_t *ev);
-extern int ptlrpc_default_nal(void);
/* ptlrpc/connection.c */
void ptlrpc_dump_connections(void);
void ptlrpc_readdress_connection(struct ptlrpc_connection *, struct obd_uuid *);
-struct ptlrpc_connection *ptlrpc_get_connection(struct ptlrpc_peer *peer,
+struct ptlrpc_connection *ptlrpc_get_connection(ptl_process_id_t peer,
struct obd_uuid *uuid);
int ptlrpc_put_connection(struct ptlrpc_connection *c);
struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *);
void class_uuid_unparse(class_uuid_t in, struct obd_uuid *out);
/* lustre_peer.c */
-int lustre_uuid_to_peer(char *uuid, __u32 *peer_nal, ptl_nid_t *peer_nid);
-int class_add_uuid(char *uuid, __u64 nid, __u32 nal);
+int lustre_uuid_to_peer(char *uuid, ptl_nid_t *peer_nid);
+int class_add_uuid(char *uuid, __u64 nid);
int class_del_uuid (char *uuid);
void class_init_uuidlist(void);
void class_exit_uuidlist(void);
/* NB the casts only avoid compiler warnings */
case 8:
snprintf(remote_uuid.uuid, sizeof remote_uuid,
- "NET_"LPX64"_UUID", (__u64)req->rq_peer.peer_id.nid);
+ "NET_"LPX64"_UUID", (__u64)req->rq_peer.nid);
break;
case 4:
snprintf(remote_uuid.uuid, sizeof remote_uuid,
- "NET_%x_UUID", (__u32)req->rq_peer.peer_id.nid);
+ "NET_%x_UUID", (__u32)req->rq_peer.nid);
break;
default:
LBUG();
if (export->exp_connection != NULL)
ptlrpc_put_connection(export->exp_connection);
- export->exp_connection = ptlrpc_get_connection(&req->rq_peer,
+ export->exp_connection = ptlrpc_get_connection(req->rq_peer,
&remote_uuid);
if (rc == EALREADY) {
struct ptlrpc_reply_state *rs;
struct obd_device *obd;
struct obd_export *exp;
- struct ptlrpc_srv_ni *sni;
struct ptlrpc_service *svc;
- sni = req->rq_rqbd->rqbd_srv_ni;
- svc = sni->sni_service;
+ svc = req->rq_rqbd->rqbd_service;
rs = req->rq_reply_state;
if (rs == NULL || !rs->rs_difficult) {
/* must be an export if locks saved */
LASSERT (req->rq_export != NULL);
/* req/reply consistent */
- LASSERT (rs->rs_srv_ni == sni);
+ LASSERT (rs->rs_service == svc);
/* "fresh" reply */
LASSERT (!rs->rs_scheduled);
list_add_tail (&rs->rs_list, &svc->srv_reply_queue);
wake_up (&svc->srv_waitq);
} else {
- list_add (&rs->rs_list, &sni->sni_active_replies);
+ list_add (&rs->rs_list, &svc->srv_active_replies);
rs->rs_scheduled = 0; /* allow notifier to schedule */
}
void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos)
{
- char str[PTL_NALFMT_SIZE];
struct obd_device *obd = NULL;
if (!((portal_debug | D_ERROR) & level))
if (lock->l_conn_export != NULL)
obd = lock->l_conn_export->exp_obd;
if (lock->l_export && lock->l_export->exp_connection) {
- CDEBUG(level, " Node: NID %s on %s (rhandle: "LPX64")\n",
- ptlrpc_peernid2str(&lock->l_export->exp_connection->c_peer, str),
- lock->l_export->exp_connection->c_peer.peer_ni->pni_name,
+ CDEBUG(level, " Node: NID %s (rhandle: "LPX64")\n",
+ libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid),
lock->l_remote_handle.cookie);
} else if (obd == NULL) {
CDEBUG(level, " Node: local\n");
} else {
struct obd_import *imp = obd->u.cli.cl_import;
- CDEBUG(level, " Node: NID %s on %s (rhandle: "LPX64")\n",
- ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
- imp->imp_connection->c_peer.peer_ni->pni_name,
+ CDEBUG(level, " Node: NID %s (rhandle: "LPX64")\n",
+ libcfs_nid2str(imp->imp_connection->c_peer.nid),
lock->l_remote_handle.cookie);
}
CDEBUG(level, " Resource: %p ("LPU64"/"LPU64")\n", lock->l_resource,
static void waiting_locks_callback(unsigned long unused)
{
struct ldlm_lock *lock, *last = NULL;
- char str[PTL_NALFMT_SIZE];
spin_lock_bh(&waiting_locks_spinlock);
while (!list_empty(&waiting_locks_list)) {
LDLM_ERROR(lock, "lock callback timer expired: evicting client "
"%s@%s nid %s ",lock->l_export->exp_client_uuid.uuid,
lock->l_export->exp_connection->c_remote_uuid.uuid,
- ptlrpc_peernid2str(&lock->l_export->exp_connection->c_peer,str));
+ libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
if (lock == last) {
LDLM_ERROR(lock, "waiting on lock multiple times");
const char *ast_type)
{
struct ptlrpc_connection *conn = lock->l_export->exp_connection;
- char str[PTL_NALFMT_SIZE];
-
- ptlrpc_peernid2str(&conn->c_peer, str);
+ char *str = libcfs_nid2str(conn->c_peer.nid);
LCONSOLE_ERROR("A client on nid %s was evicted from service %s.\n",
str, lock->l_export->exp_obd->obd_name);
LDLM_ERROR(lock, "%s AST failed (%d): evicting client %s@%s NID "LPX64
" (%s)", ast_type, rc, lock->l_export->exp_client_uuid.uuid,
- conn->c_remote_uuid.uuid, conn->c_peer.peer_id.nid, str);
+ conn->c_remote_uuid.uuid, conn->c_peer.nid, str);
ptlrpc_fail_export(lock->l_export);
}
struct ptlrpc_request *req, int rc,
const char *ast_type)
{
- struct ptlrpc_peer *peer = &req->rq_import->imp_connection->c_peer;
- char str[PTL_NALFMT_SIZE];
+ ptl_process_id_t peer = req->rq_import->imp_connection->c_peer;
if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
LASSERT(lock->l_export);
if (lock->l_export->exp_libclient) {
LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
" timeout, just cancelling lock", ast_type,
- ptlrpc_peernid2str(peer, str));
+ libcfs_nid2str(peer.nid));
ldlm_lock_cancel(lock);
rc = -ERESTART;
} else {
if (rc == -EINVAL)
LDLM_DEBUG(lock, "client (nid %s) returned %d"
" from %s AST - normal race",
- ptlrpc_peernid2str(peer, str),
+ libcfs_nid2str(peer.nid),
req->rq_repmsg->status, ast_type);
else
LDLM_ERROR(lock, "client (nid %s) returned %d "
- "from %s AST", ptlrpc_peernid2str(peer, str),
+ "from %s AST", libcfs_nid2str(peer.nid),
(req->rq_repmsg != NULL) ?
req->rq_repmsg->status : 0, ast_type);
ldlm_lock_cancel(lock);
" from client %s id %s\n",
dlm_req->lock_handle1.cookie,
req->rq_export->exp_client_uuid.uuid,
- req->rq_peerstr);
+ libcfs_id2str(req->rq_peer));
LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
"(cookie "LPU64")",
dlm_req->lock_handle1.cookie);
"export cookie "LPX64"; this is "
"normal if this node rebooted with a lock held\n",
req->rq_reqmsg->opc,
- req->rq_peerstr,
+ libcfs_id2str(req->rq_peer),
req->rq_reqmsg->handle.cookie);
dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
struct ldlm_request *dlm_req;
CERROR("operation %d from %s with bad export cookie "LPU64"\n",
- req->rq_reqmsg->opc, req->rq_peerstr,
+ req->rq_reqmsg->opc, libcfs_id2str(req->rq_peer),
req->rq_reqmsg->handle.cookie);
dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
rc = ptlrpc_queue_wait(req);
if (rc == ESTALE) {
- char str[PTL_NALFMT_SIZE];
CERROR("client/server (nid %s) out of sync"
" -- not fatal\n",
- ptlrpc_peernid2str(&req->rq_import->
- imp_connection->c_peer, str));
+ libcfs_nid2str(req->rq_import->
+ imp_connection->c_peer.nid));
} else if (rc == -ETIMEDOUT) {
ptlrpc_req_finished(req);
GOTO(restart, rc);
#include "llite_lib.h"
unsigned int portal_subsystem_debug = ~0 - (S_PORTALS | S_NAL);
-ptl_handle_ni_t tcpnal_ni;
struct task_struct *current;
-/* portals interfaces */
-ptl_handle_ni_t *
-kportal_get_ni (int nal)
-{
- switch (nal)
- {
- case SOCKNAL:
- return &tcpnal_ni;
- default:
- return NULL;
- }
-}
-
-inline void
-kportal_put_ni (int nal)
-{
- return;
-}
-
struct ldlm_namespace;
struct ldlm_res_id;
struct obd_import;
void *inter_module_get(char *arg)
{
- if (!strcmp(arg, "tcpnal_ni"))
- return &tcpnal_ni;
- else if (!strcmp(arg, "ldlm_cli_cancel_unused"))
+ if (!strcmp(arg, "ldlm_cli_cancel_unused"))
return ldlm_cli_cancel_unused;
else if (!strcmp(arg, "ldlm_namespace_cleanup"))
return ldlm_namespace_cleanup;
}
/* XXX move to proper place */
char *portals_nid2str(int nal, ptl_nid_t nid, char *str)
{
switch(nal){
int rc;
ENTRY;
- PtlInit();
- rc = PtlNIInit(procbridge_interface, 0, 0, 0, &tcpnal_ni);
- if (rc != 0) {
- CERROR("TCPNAL: PtlNIInit failed: error %d\n", rc);
- PtlFini();
- RETURN (rc);
- }
- PtlNIDebug(tcpnal_ni, ~0);
+ rc = PtlInit();
+ if (rc != PTL_OK)
+ CERROR("PtlInit failed: error %d\n", rc);
RETURN(rc);
}
void *inter_module_get(char *arg)
{
- if (!strcmp(arg, "tcpnal_ni"))
- return &tcpnal_ni;
- else if (!strcmp(arg, "ldlm_cli_cancel_unused"))
+ if (!strcmp(arg, "ldlm_cli_cancel_unused"))
return ldlm_cli_cancel_unused;
else if (!strcmp(arg, "ldlm_namespace_cleanup"))
return ldlm_namespace_cleanup;
}
/* XXX move to proper place */
char *portals_nid2str(int nal, ptl_nid_t nid, char *str)
{
switch(nal){
return str;
}
-ptl_handle_ni_t tcpnal_ni;
-
struct pingcli_args {
ptl_nid_t mynid;
ptl_nid_t nid;
struct task_struct *current;
/* portals interfaces */
-ptl_handle_ni_t *
-kportal_get_ni (int nal)
-{
- switch (nal)
- {
- case SOCKNAL:
- return &tcpnal_ni;
- default:
- return NULL;
- }
-}
-
-inline void
-kportal_put_ni (int nal)
-{
- return;
-}
-
int
kportal_nal_cmd(struct portals_cfg *pcfg)
{
{
int rc;
- PtlInit();
- rc = PtlNIInit(procbridge_interface, 0, 0, 0, &tcpnal_ni);
- if (rc != 0) {
- CERROR("ksocknal: PtlNIInit failed: error %d\n", rc);
- PtlFini();
- RETURN (rc);
- }
- PtlNIDebug(tcpnal_ni, ~0);
+ rc = PtlInit();
+ if (rc != PTL_OK)
+ CERROR("PtlInit failed: error %d\n", rc);
return rc;
}
struct ptlrpc_reply_state *rs =
list_entry(exp->exp_outstanding_replies.next,
struct ptlrpc_reply_state, rs_exp_list);
- struct ptlrpc_service *svc = rs->rs_srv_ni->sni_service;
+ struct ptlrpc_service *svc = rs->rs_service;
spin_lock(&svc->srv_lock);
list_del_init(&rs->rs_exp_list);
struct ptlrpc_reply_state *oldrep;
struct ptlrpc_service *svc;
unsigned long flags;
- char str[PTL_NALFMT_SIZE];
int i;
/* CAVEAT EMPTOR: spinlock order */
"new %d old %d\n", req->rq_xid,
req->rq_reqmsg->opc, oldrep->rs_msg.opc);
- svc = oldrep->rs_srv_ni->sni_service;
+ svc = oldrep->rs_service;
spin_lock (&svc->srv_lock);
list_del_init (&oldrep->rs_exp_list);
" o%d NID %s\n",
oldrep->rs_nlocks, oldrep,
oldrep->rs_xid, oldrep->rs_transno, oldrep->rs_msg.opc,
- ptlrpc_peernid2str(&exp->exp_connection->c_peer, str));
+ libcfs_nid2str(exp->exp_connection->c_peer.nid));
for (i = 0; i < oldrep->rs_nlocks; i++)
ptlrpc_save_lock(req,
case OBD_IOC_CLOSE_UUID: {
ptl_nid_t peer_nid;
- __u32 peer_nal;
CDEBUG(D_IOCTL, "closing all connections to uuid %s\n",
data->ioc_inlbuf1);
- lustre_uuid_to_peer(data->ioc_inlbuf1, &peer_nal, &peer_nid);
+ lustre_uuid_to_peer(data->ioc_inlbuf1, &peer_nid);
GOTO(out, err = 0);
}
#include <liblustre.h>
#include <linux/obd_class.h>
#include <linux/obd.h>
+#include <linux/lustre_mds.h>
+#include <linux/obd_ost.h>
#endif
#include <linux/lprocfs_status.h>
#include <linux/lustre_quota.h>
#include <linux/lprocfs_status.h>
struct uuid_nid_data {
- struct list_head head;
- ptl_nid_t nid;
- char *uuid;
- __u32 nal;
+ struct list_head un_list;
+ ptl_nid_t un_nid;
+ char *un_uuid;
};
/* FIXME: This should probably become more elegant than a global linked list */
class_del_uuid(NULL);
}
-int lustre_uuid_to_peer(char *uuid, __u32 *peer_nal, ptl_nid_t *peer_nid)
+int lustre_uuid_to_peer(char *uuid, ptl_nid_t *peer_nid)
{
struct list_head *tmp;
list_for_each(tmp, &g_uuid_list) {
struct uuid_nid_data *data =
- list_entry(tmp, struct uuid_nid_data, head);
+ list_entry(tmp, struct uuid_nid_data, un_list);
- if (strcmp(data->uuid, uuid) == 0) {
- *peer_nid = data->nid;
- *peer_nal = data->nal;
+ if (strcmp(data->un_uuid, uuid) == 0) {
+ *peer_nid = data->un_nid;
spin_unlock (&g_uuid_lock);
return 0;
return -1;
}
-int class_add_uuid(char *uuid, __u64 nid, __u32 nal)
+int class_add_uuid(char *uuid, __u64 nid)
{
struct uuid_nid_data *data;
int rc;
int nob = strnlen (uuid, PAGE_SIZE) + 1;
LASSERT(nid != 0);
- LASSERT(nal != 0);
if (nob > PAGE_SIZE)
return -EINVAL;
if (data == NULL)
return -ENOMEM;
- OBD_ALLOC(data->uuid, nob);
+ OBD_ALLOC(data->un_uuid, nob);
- if (data == NULL) {
+ if (data->un_uuid == NULL) {
OBD_FREE(data, sizeof(*data));
return -ENOMEM;
}
- CDEBUG(D_INFO, "add uuid %s "LPX64" %x\n", uuid, nid, nal);
- memcpy(data->uuid, uuid, nob);
- data->nid = nid;
- data->nal = nal;
+ CDEBUG(D_INFO, "add uuid %s "LPX64"\n", uuid, nid);
+ memcpy(data->un_uuid, uuid, nob);
+ data->un_nid = nid;
spin_lock (&g_uuid_lock);
- list_add(&data->head, &g_uuid_list);
+ list_add(&data->un_list, &g_uuid_list);
spin_unlock (&g_uuid_lock);
spin_lock (&g_uuid_lock);
list_for_each_safe(tmp, n, &g_uuid_list) {
- data = list_entry(tmp, struct uuid_nid_data, head);
+ data = list_entry(tmp, struct uuid_nid_data, un_list);
- if (uuid == NULL || strcmp(data->uuid, uuid) == 0) {
- list_del (&data->head);
- list_add (&data->head, &deathrow);
+ if (uuid == NULL || strcmp(data->un_uuid, uuid) == 0) {
+ list_del (&data->un_list);
+ list_add (&data->un_list, &deathrow);
if (uuid)
break;
}
}
do {
- data = list_entry(deathrow.next, struct uuid_nid_data, head);
+ data = list_entry(deathrow.next, struct uuid_nid_data, un_list);
- list_del (&data->head);
+ list_del (&data->un_list);
- OBD_FREE(data->uuid, strlen(data->uuid) + 1);
+ OBD_FREE(data->un_uuid, strlen(data->un_uuid) + 1);
OBD_FREE(data, sizeof(*data));
} while (!list_empty (&deathrow));
int class_process_config(struct lustre_cfg *lcfg)
{
struct obd_device *obd;
- char nidstr[PTL_NALFMT_SIZE];
int err;
LASSERT(lcfg && !IS_ERR(lcfg));
case LCFG_ADD_UUID: {
CDEBUG(D_IOCTL, "adding mapping from uuid %s to nid "LPX64
" (%s), nal %x\n", lustre_cfg_string(lcfg, 1),
- lcfg->lcfg_nid,
- portals_nid2str(lcfg->lcfg_nal, lcfg->lcfg_nid, nidstr),
+ lcfg->lcfg_nid, libcfs_nid2str(lcfg->lcfg_nid),
lcfg->lcfg_nal);
- err = class_add_uuid(lustre_cfg_string(lcfg, 1), lcfg->lcfg_nid,
- lcfg->lcfg_nal);
+ err = class_add_uuid(lustre_cfg_string(lcfg, 1), lcfg->lcfg_nid);
GOTO(out, err);
}
case LCFG_DEL_UUID: {
struct ptlrpc_reply_state *rs =
list_entry(exp->exp_outstanding_replies.next,
struct ptlrpc_reply_state, rs_exp_list);
- struct ptlrpc_service *svc = rs->rs_srv_ni->sni_service;
+ struct ptlrpc_service *svc = rs->rs_service;
spin_lock(&svc->srv_lock);
list_del_init(&rs->rs_exp_list);
#if CHECKSUM_BULK
if (oa->o_valid & OBD_MD_FLCKSUM) {
- const struct ptlrpc_peer *peer =
- &req->rq_import->imp_connection->c_peer;
+ const ptl_process_id_t peer =
+ req->rq_import->imp_connection->c_peer;
static int cksum_counter;
obd_count server_cksum = oa->o_cksum;
obd_count cksum = cksum_pages(rc, page_count, pga);
- char str[PTL_NALFMT_SIZE];
-
- portals_nid2str(peer->peer_ni->pni_number, peer->peer_nid, str);
+ char *str = libcfs_nid2str(peer.nid);
cksum_counter++;
if (server_cksum != cksum) {
CERROR("Bad checksum: server %x, client %x, server NID "
LPX64" (%s)\n", server_cksum, cksum,
- peer->peer_nid, str);
+ peer.nid, str);
cksum_counter = 0;
oa->o_cksum = cksum;
} else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
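+ /* x & -x == x only when x is a power of two, so this logs
+ * every 2^n-th good checksum to limit console noise */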
CWARN("Checksum %u from "LPX64" (%s) OK: %x\n",
- cksum_counter, peer->peer_nid, str, cksum);
+ cksum_counter, peer.nid, str, cksum);
}
CDEBUG(D_PAGE, "checksum %x\n", cksum);
} else {
if ((cksum_missed & (-cksum_missed)) == cksum_missed)
CERROR("Request checksum %u from "LPX64", no reply\n",
cksum_missed,
- req->rq_import->imp_connection->c_peer.peer_id.nid);
+ req->rq_import->imp_connection->c_peer.nid);
}
#endif
RETURN(0);
"evicting %s@%s id %s\n",
req->rq_export->exp_client_uuid.uuid,
req->rq_export->exp_connection->c_remote_uuid.uuid,
- req->rq_peerstr);
+ libcfs_id2str(req->rq_peer));
ptlrpc_fail_export(req->rq_export);
} else {
CERROR("ignoring bulk IO comms error: "
"client reconnected %s@%s id %s\n",
req->rq_export->exp_client_uuid.uuid,
req->rq_export->exp_connection->c_remote_uuid.uuid,
- req->rq_peerstr);
+ libcfs_id2str(req->rq_peer));
}
}
if (client_cksum != cksum) {
CERROR("Bad checksum: client %x, server %x id %s\n",
client_cksum, cksum,
- req->rq_peerstr);
+ libcfs_id2str(req->rq_peer));
cksum_counter = 1;
repbody->oa.o_cksum = cksum;
} else {
if ((cksum_counter & (-cksum_counter)) == cksum_counter)
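+ /* power-of-two test again: throttle "OK" messages */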
CWARN("Checksum %u from %s: %x OK\n",
cksum_counter,
- req->rq_peerstr,
- cksum);
+ libcfs_id2str(req->rq_peer), cksum);
}
}
#endif
req->rq_export->exp_obd->obd_name,
req->rq_export->exp_client_uuid.uuid,
req->rq_export->exp_connection->c_remote_uuid.uuid,
- req->rq_peerstr);
+ libcfs_id2str(req->rq_peer));
ptlrpc_fail_export(req->rq_export);
} else {
CERROR("ignoring bulk IO comms error: "
"client reconnected %s@%s id %s\n",
req->rq_export->exp_client_uuid.uuid,
req->rq_export->exp_connection->c_remote_uuid.uuid,
- req->rq_peerstr);
+ libcfs_id2str(req->rq_peer));
}
}
RETURN(rc);
if (req->rq_export == NULL) {
CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
- req->rq_reqmsg->opc, req->rq_peerstr);
+ req->rq_reqmsg->opc, libcfs_id2str(req->rq_peer));
req->rq_status = -ENOTCONN;
GOTO(out, rc = -ENOTCONN);
}
struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
{
struct ptlrpc_connection *c;
- struct ptlrpc_peer peer;
- int err;
+ ptl_process_id_t peer;
+ int err;
err = ptlrpc_uuid_to_peer(uuid, &peer);
if (err != 0) {
return NULL;
}
- c = ptlrpc_get_connection(&peer, uuid);
+ c = ptlrpc_get_connection(peer, uuid);
if (c) {
memcpy(c->c_remote_uuid.uuid,
uuid->uuid, sizeof(c->c_remote_uuid.uuid));
void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
struct obd_uuid *uuid)
{
- struct ptlrpc_peer peer;
- int err;
+ ptl_process_id_t peer;
+ int err;
err = ptlrpc_uuid_to_peer(uuid, &peer);
if (err != 0) {
static int ptlrpc_send_new_req(struct ptlrpc_request *req)
{
- char str[PTL_NALFMT_SIZE];
struct obd_import *imp;
unsigned long flags;
int rc;
spin_unlock_irqrestore(&imp->imp_lock, flags);
req->rq_reqmsg->status = current->pid;
- CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
- " %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
+ CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc"
+ " %s:%s:%d:"LPU64":%s:%d\n", current->comm,
imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
req->rq_xid,
- imp->imp_connection->c_peer.peer_ni->pni_name,
- ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
+ libcfs_nid2str(imp->imp_connection->c_peer.nid),
req->rq_reqmsg->opc);
rc = ptl_send_rpc(req);
int ptlrpc_check_set(struct ptlrpc_request_set *set)
{
- char str[PTL_NALFMT_SIZE];
unsigned long flags;
struct list_head *tmp;
int force_timer_recalc = 0;
req->rq_status);
}
- CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
- "opc %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
+ CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:"
+ "opc %s:%s:%d:"LPU64":%s:%d\n", current->comm,
imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
req->rq_xid,
- imp->imp_connection->c_peer.peer_ni->pni_name,
- ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
+ libcfs_nid2str(imp->imp_connection->c_peer.nid),
req->rq_reqmsg->opc);
set->set_remaining--;
int ptlrpc_queue_wait(struct ptlrpc_request *req)
{
- char str[PTL_NALFMT_SIZE];
int rc = 0;
int brc;
struct l_wait_info lwi;
/* for distributed debugging */
req->rq_reqmsg->status = current->pid;
LASSERT(imp->imp_obd != NULL);
- CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
- "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
+ CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc "
+ "%s:%s:%d:"LPU64":%s:%d\n", current->comm,
imp->imp_obd->obd_uuid.uuid,
req->rq_reqmsg->status, req->rq_xid,
- imp->imp_connection->c_peer.peer_ni->pni_name,
- ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
+ libcfs_nid2str(imp->imp_connection->c_peer.nid),
req->rq_reqmsg->opc);
/* Mark phase here for a little debug help */
l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
DEBUG_REQ(D_NET, req, "-- done sleeping");
- CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
- "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
+ CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:opc "
+ "%s:%s:%d:"LPU64":%s:%d\n", current->comm,
imp->imp_obd->obd_uuid.uuid,
req->rq_reqmsg->status, req->rq_xid,
- imp->imp_connection->c_peer.peer_ni->pni_name,
- ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
+ libcfs_nid2str(imp->imp_connection->c_peer.nid),
req->rq_reqmsg->opc);
spin_lock_irqsave(&imp->imp_lock, flags);
void ptlrpc_dump_connections(void)
{
- char str[PTL_NALFMT_SIZE];
struct list_head *tmp;
struct ptlrpc_connection *c;
ENTRY;
list_for_each(tmp, &conn_list) {
c = list_entry(tmp, struct ptlrpc_connection, c_link);
- CERROR("Connection %p/%s has refcount %d (nid=%s on %s)\n",
+ CERROR("Connection %p/%s has refcount %d (nid=%s)\n",
c, c->c_remote_uuid.uuid, atomic_read(&c->c_refcount),
- ptlrpc_peernid2str(&c->c_peer, str),
- c->c_peer.peer_ni->pni_name);
+ libcfs_nid2str(c->c_peer.nid));
}
EXIT;
}
-struct ptlrpc_connection *ptlrpc_get_connection(struct ptlrpc_peer *peer,
+struct ptlrpc_connection *ptlrpc_get_connection(ptl_process_id_t peer,
struct obd_uuid *uuid)
{
- char str[PTL_NALFMT_SIZE];
struct list_head *tmp, *pos;
struct ptlrpc_connection *c;
ENTRY;
-
- CDEBUG(D_INFO, "peer is %s on %s\n",
- ptlrpc_id2str(peer, str), peer->peer_ni->pni_name);
+ CDEBUG(D_INFO, "peer is %s\n", libcfs_id2str(peer));
spin_lock(&conn_lock);
list_for_each(tmp, &conn_list) {
c = list_entry(tmp, struct ptlrpc_connection, c_link);
- if (memcmp(peer, &c->c_peer, sizeof(*peer)) == 0 &&
- peer->peer_ni == c->c_peer.peer_ni) {
+ if (memcmp(&peer, &c->c_peer, sizeof(peer)) == 0) {
ptlrpc_connection_addref(c);
GOTO(out, c);
}
list_for_each_safe(tmp, pos, &conn_unused_list) {
c = list_entry(tmp, struct ptlrpc_connection, c_link);
- if (memcmp(peer, &c->c_peer, sizeof(*peer)) == 0 &&
- peer->peer_ni == c->c_peer.peer_ni) {
+ if (memcmp(&peer, &c->c_peer, sizeof(peer)) == 0) {
ptlrpc_connection_addref(c);
list_del(&c->c_link);
list_add(&c->c_link, &conn_list);
if (uuid && uuid->uuid) /* XXX ???? */
obd_str2uuid(&c->c_remote_uuid, uuid->uuid);
atomic_set(&c->c_refcount, 0);
- memcpy(&c->c_peer, peer, sizeof(c->c_peer));
+ memcpy(&c->c_peer, &peer, sizeof(c->c_peer));
ptlrpc_connection_addref(c);
int ptlrpc_put_connection(struct ptlrpc_connection *c)
{
- char str[PTL_NALFMT_SIZE];
int rc = 0;
ENTRY;
RETURN(0);
}
- CDEBUG (D_INFO, "connection=%p refcount %d to %s on %s\n",
+ CDEBUG (D_INFO, "connection=%p refcount %d to %s\n",
c, atomic_read(&c->c_refcount) - 1,
- ptlrpc_peernid2str(&c->c_peer, str),
- c->c_peer.peer_ni->pni_name);
+ libcfs_nid2str(c->c_peer.nid));
if (atomic_dec_and_test(&c->c_refcount)) {
spin_lock(&conn_lock);
struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *c)
{
- char str[PTL_NALFMT_SIZE];
ENTRY;
atomic_inc(&c->c_refcount);
- CDEBUG (D_INFO, "connection=%p refcount %d to %s on %s\n",
+ CDEBUG (D_INFO, "connection=%p refcount %d to %s\n",
c, atomic_read(&c->c_refcount),
- ptlrpc_peernid2str(&c->c_peer, str),
- c->c_peer.peer_ni->pni_name);
+ libcfs_nid2str(c->c_peer.nid));
RETURN(c);
}
void ptlrpc_cleanup_connection(void)
{
- char str[PTL_NALFMT_SIZE];
struct list_head *tmp, *pos;
struct ptlrpc_connection *c;
}
list_for_each_safe(tmp, pos, &conn_list) {
c = list_entry(tmp, struct ptlrpc_connection, c_link);
- CERROR("Connection %p/%s has refcount %d (nid=%s on %s)\n",
+ CERROR("Connection %p/%s has refcount %d (nid=%s)\n",
c, c->c_remote_uuid.uuid, atomic_read(&c->c_refcount),
- ptlrpc_peernid2str(&c->c_peer, str),
- c->c_peer.peer_ni->pni_name);
+ libcfs_nid2str(c->c_peer.nid));
list_del(&c->c_link);
OBD_FREE(c, sizeof(*c));
}
static void cray_portals_callback(ptl_event_t *ev);
#endif
-
-struct ptlrpc_ni ptlrpc_interfaces[8];
-int ptlrpc_ninterfaces;
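+/* A single network interface and event queue are now shared by all of
+ * ptlrpc, replacing the per-NAL ptlrpc_interfaces[] array. */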
+ptl_handle_ni_t ptlrpc_ni_h;
+ptl_handle_eq_t ptlrpc_eq_h;
/*
* Client's outgoing request callback
{
struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
- struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
- struct ptlrpc_service *service = srv_ni->sni_service;
+ struct ptlrpc_service *service = rqbd->rqbd_service;
struct ptlrpc_request *req;
- char str[PTL_NALFMT_SIZE];
unsigned long flags;
ENTRY;
CERROR("Can't allocate incoming request descriptor: "
"Dropping %s RPC from %s\n",
service->srv_name,
- portals_id2str(srv_ni->sni_ni->pni_number,
- ev->initiator, str));
+ libcfs_id2str(ev->initiator));
return;
}
}
ev->ni_fail_type == PTL_NI_OK)
req->rq_reqlen = ev->mlength;
do_gettimeofday(&req->rq_arrival_time);
- req->rq_peer.peer_id = ev->initiator;
- req->rq_peer.peer_ni = rqbd->rqbd_srv_ni->sni_ni;
- ptlrpc_id2str(&req->rq_peer, req->rq_peerstr);
+ req->rq_peer = ev->initiator;
req->rq_rqbd = rqbd;
req->rq_phase = RQ_PHASE_NEW;
list_add_tail(&req->rq_history_list, &service->srv_request_history);
if (ev->unlinked) {
- srv_ni->sni_nrqbd_receiving--;
+ service->srv_nrqbd_receiving--;
if (ev->type != PTL_EVENT_UNLINK &&
- srv_ni->sni_nrqbd_receiving == 0) {
- /* This service is off-air on this interface because
- * all its request buffers are busy. Portals will
- * start dropping incoming requests until more buffers
- * get posted. NB don't moan if it's because we're
- * tearing down the service. */
- CWARN("All %s %s request buffers busy\n",
- service->srv_name, srv_ni->sni_ni->pni_name);
+ service->srv_nrqbd_receiving == 0) {
+ /* This service is off-air because all its request
+ * buffers are busy. Portals will start dropping
+ * incoming requests until more buffers get posted.
+ * NB don't moan if it's because we're tearing down the
+ * service. */
+ CWARN("All %s request buffers busy\n",
+ service->srv_name);
}
/* req takes over the network's ref on rqbd */
} else {
{
struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
struct ptlrpc_reply_state *rs = cbid->cbid_arg;
- struct ptlrpc_srv_ni *sni = rs->rs_srv_ni;
- struct ptlrpc_service *svc = sni->sni_service;
+ struct ptlrpc_service *svc = rs->rs_service;
unsigned long flags;
ENTRY;
callback (ev);
}
-int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, struct ptlrpc_peer *peer)
+int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, ptl_process_id_t *peer)
{
- struct ptlrpc_ni *pni;
- __u32 peer_nal;
- ptl_nid_t peer_nid;
- int i;
- char str[PTL_NALFMT_SIZE];
- int rc;
-
- ENTRY;
-
- rc = lustre_uuid_to_peer (uuid->uuid, &peer_nal, &peer_nid);
-
- if (rc != 0)
- RETURN (rc);
-
- for (i = 0; i < ptlrpc_ninterfaces; i++) {
- pni = &ptlrpc_interfaces[i];
-
- if (pni->pni_number == peer_nal) {
- peer->peer_id.nid = peer_nid;
- peer->peer_id.pid = LUSTRE_SRV_PTL_PID;
- peer->peer_ni = pni;
- RETURN(0);
- }
- }
-
- CERROR("Can't find ptlrpc interface for NAL %x, NID %s\n",
- peer_nal, portals_nid2str(peer_nal, peer_nid, str));
- return (-ENOENT);
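+ /* every Lustre server listens on the well-known
+ * LUSTRE_SRV_PTL_PID, so only the NID needs looking up */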
+ peer->pid = LUSTRE_SRV_PTL_PID;
+ return lustre_uuid_to_peer (uuid->uuid, &peer->nid);
}
-void ptlrpc_ni_fini(struct ptlrpc_ni *pni)
+void ptlrpc_ni_fini(void)
{
wait_queue_head_t waitq;
struct l_wait_info lwi;
* replies */
for (retries = 0;; retries++) {
- rc = PtlEQFree(pni->pni_eq_h);
+ rc = PtlEQFree(ptlrpc_eq_h);
switch (rc) {
default:
LBUG();
case PTL_OK:
- PtlNIFini(pni->pni_ni_h);
+ PtlNIFini(ptlrpc_ni_h);
return;
case PTL_EQ_IN_USE:
if (retries != 0)
- CWARN("Event queue for %s still busy\n",
- pni->pni_name);
+ CWARN("Event queue still busy\n");
/* Wait for a bit */
init_waitqueue_head(&waitq);
return pid;
}
-int ptlrpc_ni_init(int number, char *name, struct ptlrpc_ni *pni)
+int ptlrpc_ni_init(void)
{
int rc;
char str[20];
- ptl_handle_ni_t nih;
ptl_pid_t pid;
pid = ptl_get_pid();
-
+
/* We're not passing any limits yet... */
- rc = PtlNIInit(number, pid, NULL, NULL, &nih);
+ rc = PtlNIInit(PTL_IFACE_DEFAULT, pid, NULL, NULL, &ptlrpc_ni_h);
if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
- CDEBUG (D_NET, "Can't init network interface %s: %d\n",
- name, rc);
+ CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
return (-ENOENT);
}
CDEBUG(D_NET, "My pid is: %x\n", ptl_get_pid());
- PtlSnprintHandle(str, sizeof(str), nih);
- CDEBUG (D_NET, "init %x %s: %s\n", number, name, str);
-
- pni->pni_name = name;
- pni->pni_number = number;
- pni->pni_ni_h = nih;
-
- pni->pni_eq_h = PTL_INVALID_HANDLE;
+ PtlSnprintHandle(str, sizeof(str), ptlrpc_ni_h);
+ CDEBUG (D_NET, "ptlrpc_ni_h: %s\n", str);
/* CAVEAT EMPTOR: how we process portals events is _radically_
* different depending on... */
/* kernel portals calls our master callback when events are added to
* the event queue. In fact lustre never pulls events off this queue,
* so it's only sized for some debug history. */
- rc = PtlEQAlloc(pni->pni_ni_h, 1024, ptlrpc_master_callback,
- &pni->pni_eq_h);
+ rc = PtlEQAlloc(ptlrpc_ni_h, 1024, ptlrpc_master_callback,
+ &ptlrpc_eq_h);
#else
/* liblustre calls the master callback when it removes events from the
* event queue. The event queue has to be big enough not to drop
/* cray portals implements a non-standard callback to notify us there
* are buffered events even when the app is not doing a filesystem
* call. */
- rc = PtlEQAlloc(pni->pni_ni_h, 10240, cray_portals_callback,
- &pni->pni_eq_h);
+ rc = PtlEQAlloc(ptlrpc_ni_h, 10240, cray_portals_callback,
+ &ptlrpc_eq_h);
# else
- rc = PtlEQAlloc(pni->pni_ni_h, 10240, PTL_EQ_HANDLER_NONE,
- &pni->pni_eq_h);
+ rc = PtlEQAlloc(ptlrpc_ni_h, 10240, PTL_EQ_HANDLER_NONE,
+ &ptlrpc_eq_h);
# endif
#endif
- if (rc != PTL_OK)
- GOTO (fail, rc = -ENOMEM);
-
- return (0);
- fail:
- CERROR ("Failed to initialise network interface %s: %d\n",
- name, rc);
+ if (rc == PTL_OK)
+ return 0;
+
+ CERROR ("Failed to allocate event queue: %d\n", rc);
+ PtlNIFini(ptlrpc_ni_h);
- /* OK to do complete teardown since we invalidated the handles above */
- ptlrpc_ni_fini (pni);
- return (rc);
+ return (-ENOMEM);
}
#ifndef __KERNEL__
int i;
ENTRY;
- rc = PtlEQPoll(&ptlrpc_interfaces[0].pni_eq_h, 1, timeout * 1000,
- &ev, &i);
+ rc = PtlEQPoll(&ptlrpc_eq_h, 1, timeout * 1000, &ev, &i);
if (rc == PTL_EQ_EMPTY)
RETURN(0);
#endif
#endif /* __KERNEL__ */
-int ptlrpc_default_nal(void)
-{
- if (ptlrpc_ninterfaces == 0)
- return (-ENOENT);
-
- return (ptlrpc_interfaces[0].pni_number);
-}
-
int ptlrpc_init_portals(void)
{
- /* Add new portals network interfaces here.
- * Order is irrelevent! */
- static struct {
- int number;
- char *name;
- } ptl_nis[] = {
-#if !CRAY_PORTALS
- {QSWNAL, "qswnal"},
- {SOCKNAL, "socknal"},
- {GMNAL, "gmnal"},
- {OPENIBNAL, "openibnal"},
- {IIBNAL, "iibnal"},
- {VIBNAL, "vibnal"},
- {TCPNAL, "tcpnal"},
- {LONAL, "lonal"},
- {RANAL, "ranal"},
-#else
- {CRAY_KB_ERNAL, "cray_kb_ernal"},
-#endif
- };
- int rc;
- int i;
-
- LASSERT(ptlrpc_ninterfaces == 0);
-
- for (i = 0; i < sizeof (ptl_nis) / sizeof (ptl_nis[0]); i++) {
- LASSERT(ptlrpc_ninterfaces < (sizeof(ptlrpc_interfaces) /
- sizeof(ptlrpc_interfaces[0])));
-
- rc = ptlrpc_ni_init(ptl_nis[i].number, ptl_nis[i].name,
- &ptlrpc_interfaces[ptlrpc_ninterfaces]);
- if (rc == 0)
- ptlrpc_ninterfaces++;
- }
+ int rc = ptlrpc_ni_init();
- if (ptlrpc_ninterfaces == 0) {
- CERROR("network initialisation failed: is a NAL module "
- "loaded?\n");
+ if (rc != 0) {
+ CERROR("network initialisation failed\n");
return -EIO;
}
#ifndef __KERNEL__
#ifndef __KERNEL__
liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
- while (ptlrpc_ninterfaces > 0)
- ptlrpc_ni_fini (&ptlrpc_interfaces[--ptlrpc_ninterfaces]);
+ ptlrpc_ni_fini();
}
spin_lock_irqsave(&imp->imp_lock, flags);
if (imp->imp_state == LUSTRE_IMP_FULL) {
- char nidbuf[PTL_NALFMT_SIZE];
char *target_start;
int target_len;
"lost; in progress operations using this "
"service will %s.\n",
target_len, target_start,
- ptlrpc_peernid2str(&imp->imp_connection->c_peer,
- nidbuf),
+ libcfs_nid2str(imp->imp_connection->c_peer.nid),
imp->imp_replayable
? "wait for recovery to complete"
: "fail");
}
if (imp->imp_state == LUSTRE_IMP_RECOVER) {
- char nidbuf[PTL_NALFMT_SIZE];
+ char *nidstr;
CDEBUG(D_HA, "reconnected to %s@%s\n",
imp->imp_target_uuid.uuid,
deuuidify(imp->imp_target_uuid.uuid, NULL,
&target_start, &target_len);
- ptlrpc_peernid2str(&imp->imp_connection->c_peer,
- nidbuf);
+ nidstr = libcfs_nid2str(imp->imp_connection->c_peer.nid);
LCONSOLE_INFO("Connection restored to service %.*s using nid "
- "%s.\n",
- target_len, target_start, nidbuf);
+ "%s.\n", target_len, target_start, nidstr);
CWARN("%s: connection restored to %s@%s\n",
imp->imp_obd->obd_name,
* must be just as careful as the service's request
* parser. Currently I only print stuff here I know is OK
* to look at coz it was set up in request_in_callback()!!! */
- seq_printf(s, LPD64":%s:%s:"LPD64":%d:%s ",
- req->rq_history_seq,
- req->rq_peer.peer_ni->pni_name, req->rq_peerstr,
+ seq_printf(s, LPD64":%s:"LPD64":%d:%s ",
+ req->rq_history_seq, libcfs_id2str(req->rq_peer),
req->rq_xid, req->rq_reqlen,ptlrpc_rqphase2str(req));
if (svc->srv_request_history_print_fn == NULL)
{
int rc;
ptl_md_t md;
- char str[PTL_NALFMT_SIZE];
ENTRY;
LASSERT (portal != 0);
LASSERT (conn != NULL);
- CDEBUG (D_INFO, "conn=%p ni %s id %s on %s\n",
- conn, conn->c_peer.peer_ni->pni_name,
- ptlrpc_id2str(&conn->c_peer, str),
- conn->c_peer.peer_ni->pni_name);
+ CDEBUG (D_INFO, "conn=%p id %s\n", conn, libcfs_id2str(conn->c_peer));
md.start = base;
md.length = len;
md.threshold = (ack == PTL_ACK_REQ) ? 2 : 1;
md.options = PTLRPC_MD_OPTIONS;
md.user_ptr = cbid;
- md.eq_handle = conn->c_peer.peer_ni->pni_eq_h;
+ md.eq_handle = ptlrpc_eq_h;
if (ack == PTL_ACK_REQ &&
OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE)) {
obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
}
- rc = PtlMDBind (conn->c_peer.peer_ni->pni_ni_h, md,
- PTL_UNLINK, mdh);
+ rc = PtlMDBind (ptlrpc_ni_h, md, PTL_UNLINK, mdh);
if (rc != PTL_OK) {
CERROR ("PtlMDBind failed: %d\n", rc);
LASSERT (rc == PTL_NO_SPACE);
CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n",
len, portal, xid);
- rc = PtlPut (*mdh, ack, conn->c_peer.peer_id, portal, 0, xid, 0, 0);
+ rc = PtlPut (*mdh, ack, conn->c_peer, portal, 0, xid, 0, 0);
if (rc != PTL_OK) {
int rc2;
/* We're going to get an UNLINK event when I unlink below,
* which will complete just like any other failed send, so
* I fall through and return success here! */
CERROR("PtlPut(%s, %d, "LPD64") failed: %d\n",
- ptlrpc_id2str(&conn->c_peer, str),
- portal, xid, rc);
+ libcfs_id2str(conn->c_peer), portal, xid, rc);
rc2 = PtlMDUnlink(*mdh);
LASSERTF(rc2 == PTL_OK, "rc2 = %d\n", rc2);
}
{
int rc;
int rc2;
- struct ptlrpc_peer *peer;
+ ptl_process_id_t peer;
ptl_md_t md;
__u64 xid;
- char str[PTL_NALFMT_SIZE];
ENTRY;
if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_PUT_NET))
LASSERT (desc->bd_type == BULK_PUT_SOURCE ||
desc->bd_type == BULK_GET_SINK);
desc->bd_success = 0;
- peer = &desc->bd_export->exp_connection->c_peer;
+ peer = desc->bd_export->exp_connection->c_peer;
md.user_ptr = &desc->bd_cbid;
- md.eq_handle = peer->peer_ni->pni_eq_h;
+ md.eq_handle = ptlrpc_eq_h;
md.threshold = 2; /* SENT and ACK/REPLY */
md.options = PTLRPC_MD_OPTIONS;
ptlrpc_fill_bulk_md(&md, desc);
/* NB total length may be 0 for a read past EOF, so we send a 0
* length bulk, since the client expects a bulk event. */
- rc = PtlMDBind(peer->peer_ni->pni_ni_h, md,
- PTL_UNLINK, &desc->bd_md_h);
+ rc = PtlMDBind(ptlrpc_ni_h, md, PTL_UNLINK, &desc->bd_md_h);
if (rc != PTL_OK) {
CERROR("PtlMDBind failed: %d\n", rc);
LASSERT (rc == PTL_NO_SPACE);
/* Client's bulk and reply matchbits are the same */
xid = desc->bd_req->rq_xid;
- CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d on %s "
- "nid %s pid %d xid "LPX64"\n", desc->bd_iov_count,
- desc->bd_nob, desc->bd_portal, peer->peer_ni->pni_name,
- ptlrpc_id2str(peer, str), peer->peer_id.pid, xid);
+ CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d "
+ "id %s xid "LPX64"\n", desc->bd_iov_count,
+ desc->bd_nob, desc->bd_portal, libcfs_id2str(peer), xid);
/* Network is about to get at the memory */
desc->bd_network_rw = 1;
if (desc->bd_type == BULK_PUT_SOURCE)
- rc = PtlPut (desc->bd_md_h, PTL_ACK_REQ, peer->peer_id,
+ rc = PtlPut (desc->bd_md_h, PTL_ACK_REQ, peer,
desc->bd_portal, 0, xid, 0, 0);
else
- rc = PtlGet (desc->bd_md_h, peer->peer_id,
+ rc = PtlGet (desc->bd_md_h, peer,
desc->bd_portal, 0, xid, 0);
if (rc != PTL_OK) {
* event this creates will signal completion with failure,
* so we return SUCCESS here! */
CERROR("Transfer(%s, %d, "LPX64") failed: %d\n",
- ptlrpc_id2str(peer, str),
- desc->bd_portal, xid, rc);
+ libcfs_id2str(peer), desc->bd_portal, xid, rc);
rc2 = PtlMDUnlink(desc->bd_md_h);
LASSERT (rc2 == PTL_OK);
}
int ptlrpc_register_bulk (struct ptlrpc_request *req)
{
struct ptlrpc_bulk_desc *desc = req->rq_bulk;
- struct ptlrpc_peer *peer;
+ ptl_process_id_t peer;
int rc;
int rc2;
ptl_handle_me_t me_h;
desc->bd_success = 0;
- peer = &desc->bd_import->imp_connection->c_peer;
+ peer = desc->bd_import->imp_connection->c_peer;
md.user_ptr = &desc->bd_cbid;
- md.eq_handle = peer->peer_ni->pni_eq_h;
+ md.eq_handle = ptlrpc_eq_h;
md.threshold = 1; /* PUT or GET */
md.options = PTLRPC_MD_OPTIONS |
((desc->bd_type == BULK_GET_SOURCE) ?
desc->bd_registered = 1;
desc->bd_last_xid = req->rq_xid;
- rc = PtlMEAttach(peer->peer_ni->pni_ni_h, desc->bd_portal,
- desc->bd_import->imp_connection->c_peer.peer_id,
+ rc = PtlMEAttach(ptlrpc_ni_h, desc->bd_portal, peer,
req->rq_xid, 0, PTL_UNLINK, PTL_INS_AFTER, &me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
}
CDEBUG(D_NET, "Setup bulk %s buffers: %u pages %u bytes, xid "LPX64", "
- "portal %u on %s\n",
+ "portal %u\n",
desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
desc->bd_iov_count, desc->bd_nob,
- req->rq_xid, desc->bd_portal, peer->peer_ni->pni_name);
+ req->rq_xid, desc->bd_portal);
RETURN(0);
}
int ptlrpc_send_reply (struct ptlrpc_request *req, int may_be_difficult)
{
- struct ptlrpc_service *svc = req->rq_rqbd->rqbd_srv_ni->sni_service;
+ struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
struct ptlrpc_reply_state *rs = req->rq_reply_state;
struct ptlrpc_connection *conn;
int rc;
req->rq_repmsg->opc = req->rq_reqmsg->opc;
if (req->rq_export == NULL)
- conn = ptlrpc_get_connection(&req->rq_peer, NULL);
+ conn = ptlrpc_get_connection(req->rq_peer, NULL);
else
conn = ptlrpc_connection_addref(req->rq_export->exp_connection);
if (request->rq_repmsg == NULL)
GOTO(cleanup_bulk, rc = -ENOMEM);
- rc = PtlMEAttach(connection->c_peer.peer_ni->pni_ni_h,
+ rc = PtlMEAttach(ptlrpc_ni_h,
request->rq_reply_portal, /* XXX FIXME bug 249 */
- connection->c_peer.peer_id, request->rq_xid, 0,
+ connection->c_peer, request->rq_xid, 0,
PTL_UNLINK, PTL_INS_AFTER, &reply_me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
reply_md.threshold = 1;
reply_md.options = PTLRPC_MD_OPTIONS | PTL_MD_OP_PUT;
reply_md.user_ptr = &request->rq_reply_cbid;
- reply_md.eq_handle = connection->c_peer.peer_ni->pni_eq_h;
+ reply_md.eq_handle = ptlrpc_eq_h;
rc = PtlMDAttach(reply_me_h, reply_md, PTL_UNLINK,
&request->rq_reply_md_h);
}
CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64
- ", portal %u on %s\n",
+ ", portal %u\n",
request->rq_replen, request->rq_xid,
- request->rq_reply_portal,
- connection->c_peer.peer_ni->pni_name);
+ request->rq_reply_portal);
ptlrpc_request_addref(request); /* +1 ref for the SENT callback */
int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
{
- struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
- struct ptlrpc_service *service = srv_ni->sni_service;
+ struct ptlrpc_service *service = rqbd->rqbd_service;
static ptl_process_id_t match_id = {PTL_NID_ANY, PTL_PID_ANY};
int rc;
ptl_md_t md;
ptl_handle_me_t me_h;
- CDEBUG(D_NET, "PtlMEAttach: portal %d on %s\n",
- service->srv_req_portal, srv_ni->sni_ni->pni_name);
+ CDEBUG(D_NET, "PtlMEAttach: portal %d\n",
+ service->srv_req_portal);
if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_RQBD))
return (-ENOMEM);
- rc = PtlMEAttach(srv_ni->sni_ni->pni_ni_h, service->srv_req_portal,
+ rc = PtlMEAttach(ptlrpc_ni_h, service->srv_req_portal,
match_id, 0, ~0, PTL_UNLINK, PTL_INS_AFTER, &me_h);
if (rc != PTL_OK) {
CERROR("PtlMEAttach failed: %d\n", rc);
md.threshold = PTL_MD_THRESH_INF;
md.options = PTLRPC_MD_OPTIONS | PTL_MD_OP_PUT | PTL_MD_MAX_SIZE;
md.user_ptr = &rqbd->rqbd_cbid;
- md.eq_handle = srv_ni->sni_ni->pni_eq_h;
+ md.eq_handle = ptlrpc_eq_h;
rc = PtlMDAttach(me_h, md, PTL_UNLINK, &rqbd->rqbd_md_h);
if (rc == PTL_OK)
atomic_set(&rs->rs_refcount, 1); /* 1 ref for rq_reply_state */
rs->rs_cb_id.cbid_fn = reply_out_callback;
rs->rs_cb_id.cbid_arg = rs;
- rs->rs_srv_ni = req->rq_rqbd->rqbd_srv_ni;
+ rs->rs_service = req->rq_rqbd->rqbd_service;
rs->rs_size = size;
INIT_LIST_HEAD(&rs->rs_exp_list);
INIT_LIST_HEAD(&rs->rs_obd_list);
}
struct ptlrpc_request_buffer_desc *
-ptlrpc_alloc_rqbd (struct ptlrpc_srv_ni *srv_ni)
+ptlrpc_alloc_rqbd (struct ptlrpc_service *svc)
{
- struct ptlrpc_service *svc = srv_ni->sni_service;
unsigned long flags;
struct ptlrpc_request_buffer_desc *rqbd;
if (rqbd == NULL)
return (NULL);
- rqbd->rqbd_srv_ni = srv_ni;
+ rqbd->rqbd_service = svc;
rqbd->rqbd_refcount = 0;
rqbd->rqbd_cbid.cbid_fn = request_in_callback;
rqbd->rqbd_cbid.cbid_arg = rqbd;
void
ptlrpc_free_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
{
- struct ptlrpc_srv_ni *sni = rqbd->rqbd_srv_ni;
- struct ptlrpc_service *svc = sni->sni_service;
+ struct ptlrpc_service *svc = rqbd->rqbd_service;
unsigned long flags;
LASSERT (rqbd->rqbd_refcount == 0);
}
int
-ptlrpc_grow_req_bufs(struct ptlrpc_srv_ni *srv_ni)
+ptlrpc_grow_req_bufs(struct ptlrpc_service *svc)
{
- struct ptlrpc_service *svc = srv_ni->sni_service;
struct ptlrpc_request_buffer_desc *rqbd;
int i;
CDEBUG(D_RPCTRACE, "%s: allocate %d new %d-byte reqbufs (%d/%d left)\n",
svc->srv_name, svc->srv_nbuf_per_group, svc->srv_buf_size,
- srv_ni->sni_nrqbd_receiving, svc->srv_nbufs);
+ svc->srv_nrqbd_receiving, svc->srv_nbufs);
for (i = 0; i < svc->srv_nbuf_per_group; i++) {
- rqbd = ptlrpc_alloc_rqbd(srv_ni);
+ rqbd = ptlrpc_alloc_rqbd(svc);
if (rqbd == NULL) {
- CERROR ("%s/%s: Can't allocate request buffer\n",
- svc->srv_name, srv_ni->sni_ni->pni_name);
+ CERROR ("%s: Can't allocate request buffer\n",
+ svc->srv_name);
return (-ENOMEM);
}
void
ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
{
- struct ptlrpc_service *svc = rs->rs_srv_ni->sni_service;
+ struct ptlrpc_service *svc = rs->rs_service;
#ifdef CONFIG_SMP
LASSERT (spin_is_locked (&svc->srv_lock));
LASSERT (rs->rs_difficult);
if (rs->rs_transno <= obd->obd_last_committed) {
- struct ptlrpc_service *svc = rs->rs_srv_ni->sni_service;
+ struct ptlrpc_service *svc = rs->rs_service;
spin_lock (&svc->srv_lock);
list_del_init (&rs->rs_obd_list);
static int
ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
{
- struct ptlrpc_srv_ni *srv_ni;
struct ptlrpc_request_buffer_desc *rqbd;
unsigned long flags;
int rc;
list_del (&rqbd->rqbd_list);
/* assume we will post successfully */
- srv_ni = rqbd->rqbd_srv_ni;
- srv_ni->sni_nrqbd_receiving++;
- list_add (&rqbd->rqbd_list, &srv_ni->sni_active_rqbds);
+ svc->srv_nrqbd_receiving++;
+ list_add (&rqbd->rqbd_list, &svc->srv_active_rqbds);
spin_unlock_irqrestore(&svc->srv_lock, flags);
spin_lock_irqsave(&svc->srv_lock, flags);
- srv_ni->sni_nrqbd_receiving--;
+ svc->srv_nrqbd_receiving--;
list_del(&rqbd->rqbd_list);
list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
- if (srv_ni->sni_nrqbd_receiving == 0) {
+ if (svc->srv_nrqbd_receiving == 0) {
- /* This service is off-air on this interface because all
- * its request buffers are busy. Portals will have started
- * dropping incoming requests until more buffers get
- * posted */
+ /* This service is off-air because all its request buffers
+ * are busy. Portals will have started dropping incoming
+ * requests until more buffers get posted */
- CERROR("All %s %s request buffers busy\n",
- svc->srv_name, srv_ni->sni_ni->pni_name);
+ CERROR("All %s request buffers busy\n", svc->srv_name);
}
spin_unlock_irqrestore (&svc->srv_lock, flags);
struct proc_dir_entry *proc_entry,
svcreq_printfn_t svcreq_printfn)
{
- int i;
- int rc;
- int ssize;
- struct ptlrpc_service *service;
- struct ptlrpc_srv_ni *srv_ni;
+ int rc;
+ struct ptlrpc_service *service;
ENTRY;
- LASSERT (ptlrpc_ninterfaces > 0);
LASSERT (nbufs > 0);
LASSERT (bufsize >= max_req_size);
- ssize = offsetof (struct ptlrpc_service,
- srv_interfaces[ptlrpc_ninterfaces]);
- OBD_ALLOC(service, ssize);
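+ /* the service is now fixed-size: no trailing per-interface array */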
+ OBD_ALLOC(service, sizeof(*service));
if (service == NULL)
RETURN(NULL);
+ /* First initialise enough for early teardown */
+
service->srv_name = name;
spin_lock_init(&service->srv_lock);
INIT_LIST_HEAD(&service->srv_threads);
INIT_LIST_HEAD(&service->srv_request_queue);
INIT_LIST_HEAD(&service->srv_idle_rqbds);
+ INIT_LIST_HEAD(&service->srv_active_rqbds);
INIT_LIST_HEAD(&service->srv_history_rqbds);
INIT_LIST_HEAD(&service->srv_request_history);
+ INIT_LIST_HEAD(&service->srv_active_replies);
INIT_LIST_HEAD(&service->srv_reply_queue);
- /* First initialise enough for early teardown */
- for (i = 0; i < ptlrpc_ninterfaces; i++) {
- srv_ni = &service->srv_interfaces[i];
-
- srv_ni->sni_service = service;
- srv_ni->sni_ni = &ptlrpc_interfaces[i];
- INIT_LIST_HEAD(&srv_ni->sni_active_rqbds);
- INIT_LIST_HEAD(&srv_ni->sni_active_replies);
- }
-
spin_lock (&ptlrpc_all_services_lock);
list_add (&service->srv_list, &ptlrpc_all_services);
spin_unlock (&ptlrpc_all_services_lock);
- /* Now allocate the request buffers, assuming all interfaces require
- * the same number. */
- for (i = 0; i < ptlrpc_ninterfaces; i++) {
- srv_ni = &service->srv_interfaces[i];
- CDEBUG (D_NET, "%s: initialising interface %s\n", name,
- srv_ni->sni_ni->pni_name);
-
- rc = ptlrpc_grow_req_bufs(srv_ni);
- /* We shouldn't be under memory pressure at startup, so
- * fail if we can't post all our buffers at this time. */
- if (rc != 0)
- GOTO(failed, NULL);
- }
+ /* Now allocate the request buffers */
+ rc = ptlrpc_grow_req_bufs(service);
+ /* We shouldn't be under memory pressure at startup, so
+ * fail if we can't post all our buffers at this time. */
+ if (rc != 0)
+ GOTO(failed, NULL);
if (proc_entry != NULL)
ptlrpc_lprocfs_register_service(proc_entry, service);
- CDEBUG(D_NET, "%s: Started on %d interfaces, listening on portal %d\n",
- service->srv_name, ptlrpc_ninterfaces, service->srv_req_portal);
+ CDEBUG(D_NET, "%s: Started, listening on portal %d\n",
+ service->srv_name, service->srv_req_portal);
RETURN(service);
failed:
ptlrpc_server_free_request(struct ptlrpc_request *req)
{
struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
- struct ptlrpc_srv_ni *srv_ni = rqbd->rqbd_srv_ni;
- struct ptlrpc_service *svc = srv_ni->sni_service;
+ struct ptlrpc_service *svc = rqbd->rqbd_service;
unsigned long flags;
int refcount;
struct list_head *tmp;
if (rc != 0) {
CERROR ("error unpacking request: ptl %d from %s"
" xid "LPU64"\n", svc->srv_req_portal,
- request->rq_peerstr, request->rq_xid);
+ libcfs_id2str(request->rq_peer), request->rq_xid);
goto out;
}
rc = -EINVAL;
if (request->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
CERROR("wrong packet type received (type=%u) from %s\n",
- request->rq_reqmsg->type, request->rq_peerstr);
+ request->rq_reqmsg->type,
+ libcfs_id2str(request->rq_peer));
goto out;
}
if (timediff / 1000000 > (long)obd_timeout) {
CERROR("Dropping timed-out opc %d request from %s"
": %ld seconds old\n", request->rq_reqmsg->opc,
- request->rq_peerstr,
+ libcfs_id2str(request->rq_peer),
timediff / 1000000);
goto out;
}
request->rq_phase = RQ_PHASE_INTERPRET;
- CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:ni:nid:opc "
- "%s:%s+%d:%d:"LPU64":%s:%s:%d\n", current->comm,
+ CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc "
+ "%s:%s+%d:%d:"LPU64":%s:%d\n", current->comm,
(request->rq_export ?
(char *)request->rq_export->exp_client_uuid.uuid : "0"),
(request->rq_export ?
atomic_read(&request->rq_export->exp_refcount) : -99),
request->rq_reqmsg->status, request->rq_xid,
- request->rq_peer.peer_ni->pni_name,
- request->rq_peerstr,
+ libcfs_id2str(request->rq_peer),
request->rq_reqmsg->opc);
rc = svc->srv_handler(request);
request->rq_phase = RQ_PHASE_COMPLETE;
- CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:ni:nid:opc "
- "%s:%s+%d:%d:"LPU64":%s:%s:%d\n", current->comm,
+ CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
+ "%s:%s+%d:%d:"LPU64":%s:%d\n", current->comm,
(request->rq_export ?
(char *)request->rq_export->exp_client_uuid.uuid : "0"),
(request->rq_export ?
atomic_read(&request->rq_export->exp_refcount) : -99),
request->rq_reqmsg->status, request->rq_xid,
- request->rq_peer.peer_ni->pni_name,
- request->rq_peerstr,
+ libcfs_id2str(request->rq_peer),
request->rq_reqmsg->opc);
put_conn:
CDEBUG((timediff / 1000000 > (long)obd_timeout) ? D_ERROR : D_HA,
"request "LPU64" opc %u from %s processed in %ldus "
"(%ldus total)\n", request->rq_xid, request->rq_reqmsg->opc,
- request->rq_peerstr,
+ libcfs_id2str(request->rq_peer),
timediff, timeval_sub(&work_end, &request->rq_arrival_time));
if (svc->srv_stats != NULL) {
struct obd_device *obd;
int nlocks;
int been_handled;
- char str[PTL_NALFMT_SIZE];
ENTRY;
spin_lock_irqsave (&svc->srv_lock, flags);
rs,
rs->rs_xid, rs->rs_transno,
rs->rs_msg.opc,
- ptlrpc_peernid2str(&exp->exp_connection->c_peer, str));
+ libcfs_nid2str(exp->exp_connection->c_peer.nid));
}
if ((!been_handled && rs->rs_on_net) ||
}
static void
-ptlrpc_check_rqbd_pools(struct ptlrpc_service *svc)
+ptlrpc_check_rqbd_pool(struct ptlrpc_service *svc)
{
- struct ptlrpc_srv_ni *sni;
- int i;
- int avail = 0;
- int low_water = svc->srv_nbuf_per_group/2;
+ int avail = svc->srv_nrqbd_receiving;
+ int low_water = svc->srv_nbuf_per_group/2;
- for (i = 0; i < ptlrpc_ninterfaces; i++) {
- sni = &svc->srv_interfaces[i];
+ /* NB I'm not locking; just looking. */
- avail += sni->sni_nrqbd_receiving;
- /* NB I'm not locking; just looking. */
+ /* CAVEAT EMPTOR: We might be allocating buffers here because we've
+ * allowed the request history to grow out of control. We could put a
+ * sanity check on that here and cull some history if we need the
+ * space. */
- /* CAVEAT EMPTOR: We might be allocating buffers here
- * because we've allowed the request history to grow out of
- * control. We could put a sanity check on that here and
- * cull some history if we need the space. */
-
- if (sni->sni_nrqbd_receiving <= low_water)
- ptlrpc_grow_req_bufs(sni);
- }
+ if (avail <= low_water)
+ ptlrpc_grow_req_bufs(svc);
lprocfs_counter_add(svc->srv_stats, PTLRPC_REQBUF_AVAIL_CNTR, avail);
}
lc_watchdog_touch(watchdog);
- ptlrpc_check_rqbd_pools(svc);
+ ptlrpc_check_rqbd_pool(svc);
if (!list_empty (&svc->srv_reply_queue))
ptlrpc_server_handle_reply (svc);
int ptlrpc_unregister_service(struct ptlrpc_service *service)
{
- int i;
int rc;
unsigned long flags;
- struct ptlrpc_srv_ni *srv_ni;
struct l_wait_info lwi;
struct list_head *tmp;
* freed */
service->srv_max_history_rqbds = 0;
- for (i = 0; i < ptlrpc_ninterfaces; i++) {
- srv_ni = &service->srv_interfaces[i];
- CDEBUG(D_NET, "%s: tearing down interface %s\n",
- service->srv_name, srv_ni->sni_ni->pni_name);
-
- /* Unlink all the request buffers. This forces a 'final'
- * event with its 'unlink' flag set for each posted rqbd */
- list_for_each(tmp, &srv_ni->sni_active_rqbds) {
- struct ptlrpc_request_buffer_desc *rqbd =
- list_entry(tmp, struct ptlrpc_request_buffer_desc,
- rqbd_list);
+ CDEBUG(D_NET, "%s: tearing down\n", service->srv_name);
- rc = PtlMDUnlink(rqbd->rqbd_md_h);
- LASSERT (rc == PTL_OK || rc == PTL_MD_INVALID);
- }
+ /* Unlink all the request buffers. This forces a 'final' event with
+ * its 'unlink' flag set for each posted rqbd */
+ list_for_each(tmp, &service->srv_active_rqbds) {
+ struct ptlrpc_request_buffer_desc *rqbd =
+ list_entry(tmp, struct ptlrpc_request_buffer_desc,
+ rqbd_list);
- /* Wait for the network to release any buffers it's
- * currently filling */
- for (;;) {
- spin_lock_irqsave(&service->srv_lock, flags);
- rc = srv_ni->sni_nrqbd_receiving;
- spin_unlock_irqrestore(&service->srv_lock, flags);
-
- if (rc == 0)
- break;
-
- /* Network access will complete in finite time but
- * the HUGE timeout lets us CWARN for visibility of
- * sluggish NALs */
- lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
- rc = l_wait_event(service->srv_waitq,
- srv_ni->sni_nrqbd_receiving == 0,
- &lwi);
- if (rc == -ETIMEDOUT)
- CWARN("Waiting for request buffers on "
- "service %s on interface %s ",
- service->srv_name, srv_ni->sni_ni->pni_name);
- }
+ rc = PtlMDUnlink(rqbd->rqbd_md_h);
+ LASSERT (rc == PTL_OK || rc == PTL_MD_INVALID);
+ }
- /* schedule all outstanding replies to terminate them */
+ /* Wait for the network to release any buffers it's currently
+ * filling */
+ for (;;) {
spin_lock_irqsave(&service->srv_lock, flags);
- while (!list_empty(&srv_ni->sni_active_replies)) {
- struct ptlrpc_reply_state *rs =
- list_entry(srv_ni->sni_active_replies.next,
- struct ptlrpc_reply_state,
- rs_list);
- ptlrpc_schedule_difficult_reply(rs);
- }
+ rc = service->srv_nrqbd_receiving;
spin_unlock_irqrestore(&service->srv_lock, flags);
+
+ if (rc == 0)
+ break;
+
+ /* Network access will complete in finite time but the HUGE
+ * timeout lets us CWARN for visibility of sluggish NALs */
+ lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
+ rc = l_wait_event(service->srv_waitq,
+ service->srv_nrqbd_receiving == 0,
+ &lwi);
+ if (rc == -ETIMEDOUT)
+ CWARN("Service %s waiting for request buffers\n",
+ service->srv_name);
}
+ /* schedule all outstanding replies to terminate them */
+ spin_lock_irqsave(&service->srv_lock, flags);
+ while (!list_empty(&service->srv_active_replies)) {
+ struct ptlrpc_reply_state *rs =
+ list_entry(service->srv_active_replies.next,
+ struct ptlrpc_reply_state, rs_list);
+ ptlrpc_schedule_difficult_reply(rs);
+ }
+ spin_unlock_irqrestore(&service->srv_lock, flags);
+
/* purge the request queue. NB No new replies (rqbds all unlinked)
* and no service threads, so I'm the only thread noodling the
* request queue now */
LASSERT(service->srv_n_queued_reqs == 0);
LASSERT(service->srv_n_active_reqs == 0);
LASSERT(service->srv_n_history_rqbds == 0);
-
- for (i = 0; i < ptlrpc_ninterfaces; i++) {
- srv_ni = &service->srv_interfaces[i];
- LASSERT(list_empty(&srv_ni->sni_active_rqbds));
- }
+ LASSERT(list_empty(&service->srv_active_rqbds));
/* Now free all the request buffers since nothing references them
* any more... */
CWARN("Unexpectedly long timeout %p\n", service);
}
- OBD_FREE(service,
- offsetof(struct ptlrpc_service,
- srv_interfaces[ptlrpc_ninterfaces]));
+ OBD_FREE(service, sizeof(*service));
return 0;
}
"Omitting the count means indefinitely, 0 means restore, "
"otherwise fail 'count' messages.\n"
"usage: fail nid|_all_ [count]"},
- {"loopback", jt_ptl_loopback, 0, "print loopback state\n"
- "With arg enable/disable\n"
- "usage: loopback [on|off]"},
/* Device selection commands */
{"=== device selection ===", jt_noop, 0, "device selection"},
case SOCKNAL:
/* We need to do this before the mount is started if routing */
system("/sbin/modprobe ksocknal");
- case TCPNAL:
case OPENIBNAL:
case IIBNAL:
case VIBNAL:
break;
case SOCKNAL:
- case TCPNAL:
case OPENIBNAL:
case VIBNAL:
case RANAL:
}
static
-int do_add_uuid(char * func, char *uuid, ptl_nid_t nid, int nal)
+int do_add_uuid(char * func, char *uuid, ptl_nid_t nid)
{
char tmp[64];
int rc;
lcfg = lustre_cfg_new(LCFG_ADD_UUID, &bufs);
lcfg->lcfg_nid = nid;
- lcfg->lcfg_nal = nal;
#if 0
- fprintf(stderr, "adding\tnal: %d\tnid: %d\tuuid: %s\n",
- lcfg->lcfg_nid, lcfg->lcfg_nal, uuid);
+ fprintf(stderr, "adding\tnid: %d\tuuid: %s\n",
+ lcfg->lcfg_nid, uuid);
#endif
rc = lcfg_ioctl(func, OBD_DEV_ID, lcfg);
lustre_cfg_free(lcfg);
int jt_lcfg_add_uuid(int argc, char **argv)
{
ptl_nid_t nid = 0;
- int nal;
- if (argc != 4) {
+ if (argc != 3) {
return CMD_HELP;
return (-1);
}
- nal = ptl_name2nal(argv[3]);
-
- if (nal <= 0) {
- fprintf (stderr, "Can't parse NAL %s\n", argv[3]);
- return -1;
- }
-
- return do_add_uuid(argv[0], argv[1], nid, nal);
+ return do_add_uuid(argv[0], argv[1], nid);
}
-int obd_add_uuid(char *uuid, ptl_nid_t nid, int nal)
+int obd_add_uuid(char *uuid, ptl_nid_t nid)
{
- return do_add_uuid("obd_add_uuid", uuid, nid, nal);
+ return do_add_uuid("obd_add_uuid", uuid, nid);
}
int jt_lcfg_del_uuid(int argc, char **argv)
int jt_lcfg_add_conn(int argc, char **argv);
int jt_lcfg_del_conn(int argc, char **argv);
-int obd_add_uuid(char *uuid, ptl_nid_t nid, int nal);
+int obd_add_uuid(char *uuid, ptl_nid_t nid);
#endif