X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lnet%2Flnet%2Flib-msg.c;h=4d17c7de15bcb8e95db077d8939631543cdb4807;hb=cd7e28f9cee1a4888af825c1cb80e2111634d6eb;hp=9363251054ae662971c22d04df3919ef43d8337a;hpb=c065f52531e335044388b2759712eeecbb1e78e9;p=fs%2Flustre-release.git

diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c
index 9363251..4d17c7d 100644
--- a/lnet/lnet/lib-msg.c
+++ b/lnet/lnet/lib-msg.c
@@ -5,9 +5,8 @@
  * Message decoding, parsing and finalizing routines
  *
  * Copyright (c) 2001-2003 Cluster File Systems, Inc.
- * Copyright (c) 2001-2002 Sandia National Laboratories
  *
- * This file is part of Lustre, http://www.sf.net/projects/lustre/
+ * This file is part of Lustre, http://www.lustre.org
  *
  * Lustre is free software; you can redistribute it and/or
  * modify it under the terms of version 2 of the GNU General Public
@@ -32,133 +31,116 @@
 
 #include 
 
-int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg)
+void
+lib_enq_event_locked (lib_nal_t *nal, void *private,
+                      lib_eq_t *eq, ptl_event_t *ev)
+{
+        ptl_event_t  *eq_slot;
+
+        /* Allocate the next queue slot */
+        ev->link = ev->sequence = eq->eq_enq_seq++;
+        /* NB we don't support START events yet and we don't create a separate
+         * UNLINK event unless an explicit unlink succeeds, so the link
+         * sequence is pretty useless */
+
+        /* We don't support different uid/jids yet */
+        ev->uid = 0;
+        ev->jid = 0;
+
+        /* size must be a power of 2 to handle sequence # overflow */
+        LASSERT (eq->eq_size != 0 &&
+                 eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
+        eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
+
+        /* There is no race since both event consumers and event producers
+         * take the LIB_LOCK(), so we don't screw around with memory
+         * barriers, setting the sequence number last or weird structure
+         * layout assertions. */
+        *eq_slot = *ev;
+
+        /* Call the callback handler (if any) */
+        if (eq->eq_callback != NULL)
+                eq->eq_callback (eq_slot);
+
+        /* Wake anyone sleeping for an event (see lib-eq.c) */
+#ifdef __KERNEL__
+        if (waitqueue_active(&nal->libnal_ni.ni_waitq))
+                wake_up_all(&nal->libnal_ni.ni_waitq);
+#else
+        pthread_cond_broadcast(&nal->libnal_ni.ni_cond);
+#endif
+}
+
+void
+lib_finalize (lib_nal_t *nal, void *private, lib_msg_t *msg, ptl_err_t status)
 {
         lib_md_t     *md;
-        lib_eq_t     *eq;
-        int           rc;
+        int           unlink;
         unsigned long flags;
-
-        /* ni went down while processing this message */
-        if (nal->ni.up == 0) {
-                return -1;
-        }
+        int           rc;
+        ptl_hdr_t     ack;
 
         if (msg == NULL)
-                return 0;
+                return;
 
-        rc = 0;
-        if (msg->send_ack) {
-                ptl_hdr_t ack;
+        /* Only send an ACK if the PUT completed successfully */
+        if (status == PTL_OK &&
+            !ptl_is_wire_handle_none(&msg->ack_wmd)) {
 
-                LASSERT (!ptl_is_wire_handle_none (&msg->ack_wmd));
+                LASSERT(msg->ev.type == PTL_EVENT_PUT_END);
 
                 memset (&ack, 0, sizeof (ack));
-                ack.type     = HTON__u32 (PTL_MSG_ACK);
-                ack.dest_nid = HTON__u64 (msg->nid);
-                ack.src_nid  = HTON__u64 (nal->ni.nid);
-                ack.dest_pid = HTON__u32 (msg->pid);
-                ack.src_pid  = HTON__u32 (nal->ni.pid);
-                PTL_HDR_LENGTH(&ack) = 0;
+                ack.type     = cpu_to_le32(PTL_MSG_ACK);
+                ack.dest_nid = cpu_to_le64(msg->ev.initiator.nid);
+                ack.dest_pid = cpu_to_le32(msg->ev.initiator.pid);
+                ack.src_nid  = cpu_to_le64(nal->libnal_ni.ni_pid.nid);
+                ack.src_pid  = cpu_to_le32(nal->libnal_ni.ni_pid.pid);
+                ack.payload_length = 0;
 
                 ack.msg.ack.dst_wmd = msg->ack_wmd;
                 ack.msg.ack.match_bits = msg->ev.match_bits;
-                ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength);
+                ack.msg.ack.mlength = cpu_to_le32(msg->ev.mlength);
 
                 rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
-                               msg->nid, msg->pid, NULL, 0, 0);
-                /* If this send fails, there's nothing else to clean up */
+                               msg->ev.initiator.nid, msg->ev.initiator.pid,
+                               NULL, 0, 0);
+                if (rc != PTL_OK) {
+                        /* send failed: there's nothing else to clean up. */
+                        CERROR("Error %d sending ACK to "LPX64"\n",
+                               rc, msg->ev.initiator.nid);
+                }
         }
 
         md = msg->md;
-        LASSERT (md->pending > 0);  /* I've not dropped my ref yet */
-        eq = md->eq;
-
-        state_lock(nal, &flags);
-
-        if (eq != NULL) {
-                ptl_event_t  *ev = &msg->ev;
-                ptl_event_t  *eq_slot;
-
-                /* I have to hold the lock while I bump the sequence number
-                 * and copy the event into the queue.  If not, and I was
-                 * interrupted after bumping the sequence number, other
-                 * events could fill the queue, including the slot I just
-                 * allocated to this event.  On resuming, I would overwrite
-                 * a more 'recent' event with old event state, and
-                 * processes taking events off the queue would not detect
-                 * overflow correctly.
-                 */
-
-                ev->sequence = eq->sequence++;/* Allocate the next queue slot */
-
-                /* size must be a power of 2 to handle a wrapped sequence # */
-                LASSERT (eq->size != 0 &&
-                         eq->size == LOWEST_BIT_SET (eq->size));
-                eq_slot = eq->base + (ev->sequence & (eq->size - 1));
-
-                /* Invalidate unlinked_me unless this is the last
-                 * event for an auto-unlinked MD.  Note that if md was
-                 * auto-unlinked, md->pending can only decrease
-                 */
-                if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 || /* not auto-unlinked */
-                    md->pending != 1)                   /* not last ref */
-                        ev->unlinked_me = PTL_HANDLE_NONE;
-
-                /* Copy the event into the allocated slot, ensuring all the
-                 * rest of the event's contents have been copied _before_
-                 * the sequence number gets updated.  A processes 'getting'
-                 * an event waits on the next queue slot's sequence to be
-                 * 'new'.  When it is, _all_ other event fields had better
-                 * be consistent.  I assert 'sequence' is the last member,
-                 * so I only need a 2 stage copy.
-                 */
-                LASSERT(sizeof (ptl_event_t) ==
-                        offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
-
-                rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
-                                    offsetof (ptl_event_t, sequence));
-                LASSERT (rc == 0);
-#ifdef __KERNEL__
-                barrier();
-#endif
-                /* Updating the sequence number is what makes the event 'new' */
+        LIB_LOCK(nal, flags);
 
-                /* cb_write is not necessarily atomic, so this could
-                   cause a race with PtlEQGet */
-                rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
-                                   (void *)&ev->sequence,sizeof (ev->sequence));
-                LASSERT (rc == 0);
+        /* Now it's safe to drop my caller's ref */
+        md->pending--;
+        LASSERT (md->pending >= 0);
 
-#ifdef __KERNEL__
-                barrier();
-#endif
+        /* Should I unlink this MD? */
+        if (md->pending != 0)                   /* other refs */
+                unlink = 0;
+        else if ((md->md_flags & PTL_MD_FLAG_ZOMBIE) != 0)
+                unlink = 1;
+        else if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINK) == 0)
+                unlink = 0;
+        else
+                unlink = lib_md_exhausted(md);
 
-                /* I must also ensure that (a) callbacks are made in the
-                 * same order as the events land in the queue, and (b) the
-                 * callback occurs before the event can be removed from the
-                 * queue, so I can't drop the lock during the callback. */
-                if (nal->cb_callback != NULL)
-                        nal->cb_callback(nal, private, eq, ev);
-                else if (eq->event_callback != NULL)
-                        (void)((eq->event_callback) (ev));
-        }
+        msg->ev.ni_fail_type = status;
+        msg->ev.unlinked = unlink;
 
-        LASSERT ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 ||
-                 (md->md_flags & PTL_MD_FLAG_UNLINK) != 0);
+        if (md->eq != NULL)
+                lib_enq_event_locked(nal, private, md->eq, &msg->ev);
 
-        md->pending--;
-        if (md->pending == 0 && /* no more outstanding operations on this md */
-            (md->threshold == 0 || /* done its business */
-             (md->md_flags & PTL_MD_FLAG_UNLINK) != 0)) /* marked for death */
+        if (unlink)
                 lib_md_unlink(nal, md);
 
         list_del (&msg->msg_list);
-        nal->ni.counters.msgs_alloc--;
+        nal->libnal_ni.ni_counters.msgs_alloc--;
         lib_msg_free(nal, msg);
 
-        state_unlock(nal, &flags);
-
-        return rc;
+        LIB_UNLOCK(nal, flags);
 }
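
A note on the queue indexing in the patch: the LASSERT that eq_size equals
LOWEST_BIT_SET(eq_size) forces the event queue size to be a power of 2, so
lib_enq_event_locked() can map the ever-increasing enqueue sequence number
onto a slot with a single mask, and the mapping stays correct even when the
sequence counter wraps (2^N sequence values divide evenly among 2^M slots,
whereas a non-power-of-2 modulus would mis-map at the wrap point).  Below is
a minimal standalone sketch of that technique; the names (demo_eq_t,
demo_enq) and the int payload standing in for ptl_event_t are hypothetical,
not part of the patch.

/* Standalone sketch (hypothetical names, not part of the patch) of the
 * power-of-2 slot mapping used by lib_enq_event_locked(). */
#include <assert.h>
#include <stdio.h>

#define DEMO_EQ_SIZE 8                          /* must be a power of 2 */

typedef struct {
        unsigned long eq_enq_seq;               /* next sequence to allocate */
        int           eq_events[DEMO_EQ_SIZE];  /* stands in for ptl_event_t */
} demo_eq_t;

static void demo_enq(demo_eq_t *eq, int ev)
{
        /* allocate the next slot; the counter increases forever */
        unsigned long seq = eq->eq_enq_seq++;

        /* exactly one bit set <=> power of 2 (the LOWEST_BIT_SET check) */
        assert((DEMO_EQ_SIZE & (DEMO_EQ_SIZE - 1)) == 0);

        /* mask with (size - 1): stays valid even after the counter wraps */
        eq->eq_events[seq & (DEMO_EQ_SIZE - 1)] = ev;
}

int main(void)
{
        demo_eq_t eq = { 0, { 0 } };
        int       i;

        for (i = 0; i < 20; i++)
                demo_enq(&eq, i);

        /* only the most recent DEMO_EQ_SIZE events survive; a consumer
         * spots overwritten slots by checking the stored sequence number */
        for (i = 0; i < DEMO_EQ_SIZE; i++)
                printf("slot %d: event %d\n", i, eq.eq_events[i]);
        return 0;
}

In the real queue each slot holds a whole ptl_event_t, and producers and
consumers both take LIB_LOCK(), which is why (per the patch's comment) no
memory barriers or two-stage sequence-last copies are needed any more.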
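
The reworked unlink decision in lib_finalize() is also worth isolating: an
MD is unlinked only on its last pending reference, either because an
explicit unlink already ran (the zombie flag) or because it auto-unlinks and
is exhausted.  A hedged sketch of that predicate follows; the flag values,
types and the exhaustion test are illustrative stand-ins, not the tree's
actual definitions.

/* Hypothetical sketch of the "Should I unlink this MD?" chain above;
 * flag bits, types and the exhaustion test are illustrative only. */
#define DEMO_MD_FLAG_ZOMBIE      (1 << 0)  /* explicit unlink already done */
#define DEMO_MD_FLAG_AUTO_UNLINK (1 << 1)  /* unlink when exhausted */

typedef struct {
        int pending;            /* operations still referencing this MD */
        int md_flags;
        int threshold;          /* operations left before the MD is spent */
} demo_md_t;

static int demo_md_exhausted(const demo_md_t *md)
{
        /* stand-in for lib_md_exhausted(): no capacity remaining */
        return md->threshold == 0;
}

static int demo_should_unlink(const demo_md_t *md)
{
        if (md->pending != 0)                             /* other refs */
                return 0;
        if ((md->md_flags & DEMO_MD_FLAG_ZOMBIE) != 0)    /* marked for death */
                return 1;
        if ((md->md_flags & DEMO_MD_FLAG_AUTO_UNLINK) == 0)
                return 0;
        return demo_md_exhausted(md);
}

Note the ordering: the reference count is checked first, so an MD with
outstanding operations is never unlinked regardless of its flags.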