# See the file COPYING in this distribution
DOCS = OBD-HOWTO.sgml OLVM.txt figs notes.txt obdtrace_demo.txt
-doc_DATA = $(DOCS) OBD-HOWTO.html OBD-HOWTO.txt
+# doc_DATA = $(DOCS) OBD-HOWTO.html OBD-HOWTO.txt
CLEANFILES = OBD-HOWTO.html OBD-HOWTO.txt
EXTRA_DIST = $(DOCS)
if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
CERROR("lustre_ost: wrong packet type sent %d\n",
NTOH__u32(hdr->type));
+ BUG();
rc = -EINVAL;
goto out;
}
}
if (sigismember(&(current->pending.signal), SIGKILL) ||
+ sigismember(&(current->pending.signal), SIGSTOP) ||
sigismember(&(current->pending.signal), SIGINT)) {
req->rq_flags = PTL_RPC_INTR;
EXIT;
if (req->rq_flags != PTL_RPC_REPLY) {
CERROR("Unknown reason for wakeup\n");
- BUG();
+ /* XXX Phil - I end up here when I kill obdctl */
+ ptlrpc_abort(req);
+ //BUG();
EXIT;
return -EINTR;
}
* kmalloc()'ed memory and inserted at the ring tail.
*/
+ spin_lock(&service->srv_lock);
service->srv_ref_count[service->srv_md_active]++;
CDEBUG(D_INODE, "event offset %d buf size %d\n",
if (rc != PTL_OK) {
CERROR("PtlMEUnlink failed - DROPPING soon: %d\n", rc);
BUG();
+ spin_unlock(&service->srv_lock);
return rc;
}
service->srv_ring_length);
}
+ spin_unlock(&service->srv_lock);
if (ev->type == PTL_EVENT_PUT) {
wake_up(&service->srv_waitq);
} else {
int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
{
ptl_process_id_t local_id;
+ struct ptlreq_hdr *hdr;
int rc;
char *repbuf;
ENTRY;
+ hdr = (struct ptlreq_hdr *)request->rq_reqbuf;
+ if (NTOH__u32(hdr->type) != OST_TYPE_REQ) {
+ CERROR("lustre_ost: wrong packet type sent %d\n",
+ NTOH__u32(hdr->type));
+ BUG();
+ }
if (request->rq_replen == 0) {
CERROR("request->rq_replen is 0!\n");
EXIT;
* it finishes processing an event. This ensures the ref count is
* decremented and that the rpc ring buffer cycles properly.
*/
-int ptl_received_rpc(struct ptlrpc_service *service) {
+int ptl_received_rpc(struct ptlrpc_service *service)
+{
int rc, index;
+ spin_lock(&service->srv_lock);
index = service->srv_md_active;
CDEBUG(D_INFO, "MD index=%d Ref Count=%d\n", index,
service->srv_ref_count[index]);
if (rc != PTL_OK) {
CERROR("PtlMEInsert failed: %d\n", rc);
BUG();
+ spin_unlock(&service->srv_lock);
return rc;
}
/* XXX cleanup */
CERROR("PtlMDAttach failed: %d\n", rc);
BUG();
+ spin_unlock(&service->srv_lock);
return rc;
}
NEXT_INDEX(index, service->srv_ring_length);
}
+ spin_unlock(&service->srv_lock);
return 0;
}
static int ptlrpc_check_event(struct ptlrpc_service *svc)
{
+ int rc = 0;
+
+ spin_lock(&svc->srv_lock);
if (sigismember(&(current->pending.signal), SIGKILL) ||
+ sigismember(&(current->pending.signal), SIGSTOP) ||
+ sigismember(&(current->pending.signal), SIGCONT) ||
sigismember(&(current->pending.signal), SIGINT)) {
svc->srv_flags |= SVC_KILLED;
EXIT;
- return 1;
+ rc = 1;
+ goto out;
}
if ( svc->srv_flags & SVC_STOPPING ) {
EXIT;
- return 1;
+ rc = 1;
+ goto out;
}
if (svc->srv_flags & SVC_EVENT)
BUG();
if ( svc->srv_eq_h ) {
- int rc;
- rc = PtlEQGet(svc->srv_eq_h, &svc->srv_ev);
+ int err;
+ err = PtlEQGet(svc->srv_eq_h, &svc->srv_ev);
- if (rc == PTL_OK) {
+ if (err == PTL_OK) {
svc->srv_flags |= SVC_EVENT;
EXIT;
- return 1;
+ rc = 1;
+ goto out;
}
- if (rc != PTL_EQ_EMPTY) {
+ if (err != PTL_EQ_EMPTY) {
CDEBUG(D_NET, "BUG: PtlEQGet returned %d\n", rc);
BUG();
}
EXIT;
- return 0;
+ rc = 0;
+ goto out;
}
if (!list_empty(&svc->srv_reqs)) {
svc->srv_flags |= SVC_LIST;
EXIT;
- return 1;
+ rc = 1;
+ goto out;
}
EXIT;
- return 0;
+ out:
+ spin_unlock(&svc->srv_lock);
+ return rc;
}
struct ptlrpc_service *
while (1) {
wait_event(svc->srv_waitq, ptlrpc_check_event(svc));
+ spin_lock(&svc->srv_lock);
if (svc->srv_flags & SVC_SIGNAL) {
EXIT;
+ spin_unlock(&svc->srv_lock);
break;
}
if (svc->srv_flags & SVC_STOPPING) {
EXIT;
+ spin_unlock(&svc->srv_lock);
break;
}
/* FIXME: this NI should be the incoming NI.
* We don't know how to find that from here. */
request.rq_peer.peer_ni = svc->srv_self.peer_ni;
+ svc->srv_flags &= ~SVC_EVENT;
+
+ spin_unlock(&svc->srv_lock);
rc = svc->srv_handler(obddev, svc, &request);
ptl_received_rpc(svc);
- svc->srv_flags &= ~SVC_EVENT;
continue;
}
struct ptlrpc_request *request;
svc->srv_flags = SVC_RUNNING;
- spin_lock(&svc->srv_lock);
request = list_entry(svc->srv_reqs.next,
struct ptlrpc_request,
rq_list);
continue;
}
CERROR("unknown break in service");
+ spin_unlock(&svc->srv_lock);
break;
}
return -ENOMEM;
}
+
/* Insert additional ME's to the ring */
if (i > 0) {
rc = PtlMEInsert(service->srv_me_h[i-1],
quit
EOF
-echo 8191 > /proc/sys/portals/debug
+echo 0xffffffff > /proc/sys/portals/debug
$OBDCTL <<EOF
device 0