#endif
#include <obd_class.h>
#include <lustre_net.h>
+#include <lustre_sec.h>
#include "ptlrpc_internal.h"
lnet_handle_eq_t ptlrpc_eq_h;
-/*
+/*
* Client's outgoing request callback
*/
void request_out_callback(lnet_event_t *ev)
DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
"type %d, status %d", ev->type, ev->status);
+ sptlrpc_request_out_callback(req);
+
if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) {
/* Failed send: make it seem like the reply timed out, just
LASSERT (ev->type == LNET_EVENT_PUT ||
ev->type == LNET_EVENT_UNLINK);
LASSERT (ev->unlinked);
- LASSERT (ev->md.start == req->rq_repmsg);
+ LASSERT (ev->md.start == req->rq_repbuf);
LASSERT (ev->offset == 0);
- LASSERT (ev->mlength <= req->rq_replen);
-
+ LASSERT (ev->mlength <= req->rq_repbuf_len);
+
DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
"type %d, status %d", ev->type, ev->status);
EXIT;
}
-/*
+/*
* Client's bulk has been written/read
*/
void client_bulk_callback (lnet_event_t *ev)
struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
ENTRY;
- LASSERT ((desc->bd_type == BULK_PUT_SINK &&
+ LASSERT ((desc->bd_type == BULK_PUT_SINK &&
ev->type == LNET_EVENT_PUT) ||
(desc->bd_type == BULK_GET_SOURCE &&
ev->type == LNET_EVENT_GET) ||
LASSERT (ev->unlinked);
CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
- "event type %d, status %d, desc %p\n",
+ "event type %d, status %d, desc %p\n",
ev->type, ev->status, desc);
spin_lock(&desc->bd_lock);
desc->bd_sender = ev->sender;
}
+ sptlrpc_enc_pool_put_pages(desc);
+
/* NB don't unlock till after wakeup; desc can disappear under us
* otherwise */
ptlrpc_wake_client_req(desc->bd_req);
EXIT;
}
-/*
+/*
* Server's incoming request callback
*/
void request_in_callback(lnet_event_t *ev)
rqbd->rqbd_buffer + service->srv_buf_size);
CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
- "event type %d, status %d, service %s\n",
+ "event type %d, status %d, service %s\n",
ev->type, ev->status, service->srv_name);
if (ev->unlinked) {
if (req == NULL) {
CERROR("Can't allocate incoming request descriptor: "
"Dropping %s RPC from %s\n",
- service->srv_name,
+ service->srv_name,
libcfs_id2str(ev->initiator));
return;
}
* flags are reset and scalars are zero. We only set the message
* size to non-zero if this was a successful receive. */
req->rq_xid = ev->match_bits;
- req->rq_reqmsg = ev->md.start + ev->offset;
+ req->rq_reqbuf = ev->md.start + ev->offset;
if (ev->type == LNET_EVENT_PUT && ev->status == 0)
- req->rq_reqlen = ev->mlength;
+ req->rq_reqdata_len = ev->mlength;
do_gettimeofday(&req->rq_arrival_time);
req->rq_peer = ev->initiator;
req->rq_self = ev->target.nid;
req->rq_uid = ev->uid;
#endif
+ CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer));
+
spin_lock(&service->srv_lock);
req->rq_history_seq = service->srv_request_seq++;
ev->type == LNET_EVENT_REPLY));
CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
- "event type %d, status %d, desc %p\n",
+ "event type %d, status %d, desc %p\n",
ev->type, ev->status, desc);
spin_lock(&desc->bd_lock);
-
+
if ((ev->type == LNET_EVENT_ACK ||
ev->type == LNET_EVENT_REPLY) &&
ev->status == 0) {
callback == request_in_callback ||
callback == reply_out_callback ||
callback == server_bulk_callback);
-
+
callback (ev);
}
-int ptlrpc_uuid_to_peer (struct obd_uuid *uuid,
+int ptlrpc_uuid_to_peer (struct obd_uuid *uuid,
lnet_process_id_t *peer, lnet_nid_t *self)
{
int best_dist = 0;
rc = 0;
break;
}
-
+
LASSERT (order >= 0);
if (rc < 0 ||
dist < best_dist ||
}
CDEBUG(D_NET,"%s->%s\n", uuid->uuid, libcfs_id2str(*peer));
- if (rc != 0)
+ if (rc != 0)
CERROR("No NID found for %s\n", uuid->uuid);
return rc;
}
struct l_wait_info lwi;
int rc;
int retries;
-
+
/* Wait for the event queue to become idle since there may still be
* messages in flight with pending events (i.e. the fire-and-forget
* messages == client requests and "non-difficult" server
case 0:
LNetNIFini();
return;
-
+
case -EBUSY:
if (retries != 0)
CWARN("Event queue still busy\n");
-
+
/* Wait for a bit */
cfs_waitq_init(&waitq);
lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL);
#endif
return pid;
}
-
+
int ptlrpc_ni_init(void)
{
int rc;
int (*fn)(void *arg), void *arg)
{
struct liblustre_wait_callback *llwc;
-
+
OBD_ALLOC(llwc, sizeof(*llwc));
LASSERT (llwc != NULL);
-
+
llwc->llwc_name = name;
llwc->llwc_fn = fn;
llwc->llwc_arg = arg;
list_add_tail(&llwc->llwc_list, callback_list);
-
+
return (llwc);
}
liblustre_deregister_waitidle_callback (void *opaque)
{
struct liblustre_wait_callback *llwc = opaque;
-
+
list_del(&llwc->llwc_list);
OBD_FREE(llwc, sizeof(*llwc));
}
rc = LNetEQPoll(&ptlrpc_eq_h, 1, timeout * 1000, &ev, &i);
if (rc == 0)
RETURN(0);
-
+
LASSERT (rc == -EOVERFLOW || rc == 1);
-
+
/* liblustre: no asynch callback so we can't affort to miss any
* events... */
if (rc == -EOVERFLOW) {
CERROR ("Dropped an event!!!\n");
abort();
}
-
+
ptlrpc_master_callback (&ev);
RETURN(1);
}
/* Give all registered callbacks a bite at the cherry */
list_for_each(tmp, &liblustre_wait_callbacks) {
- llwc = list_entry(tmp, struct liblustre_wait_callback,
+ llwc = list_entry(tmp, struct liblustre_wait_callback,
llwc_list);
-
+
if (llwc->llwc_fn(llwc->llwc_arg))
found_something = 1;
}
return -EIO;
}
#ifndef __KERNEL__
- liblustre_services_callback =
+ liblustre_services_callback =
liblustre_register_wait_callback("liblustre_check_services",
- &liblustre_check_services, NULL);
+ &liblustre_check_services,
+ NULL);
#endif
rc = ptlrpcd_addref();
if (rc == 0)