/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/ptllnd/ptllnd_rx_buf.c
 *
 * Author: PJ Kirner <pjkirner@clusterfs.com>
 */

#include "ptllnd.h"

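/* Zero the pool descriptor and initialise its spinlock and buffer list. */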
void
kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp)
{
        memset(rxbp, 0, sizeof(*rxbp));
        spin_lock_init(&rxbp->rxbp_lock);
        CFS_INIT_LIST_HEAD(&rxbp->rxbp_list);
}

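/* Free one idle rx buffer and remove it from its pool.  The caller must
 * guarantee it has no outstanding references, no attached MD and is not
 * posted (asserted below). */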
void
kptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb)
{
        kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;

        LASSERT(rxb->rxb_refcount == 0);
        LASSERT(PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
        LASSERT(!rxb->rxb_posted);
        LASSERT(rxb->rxb_idle);

        cfs_list_del(&rxb->rxb_list);
        rxbp->rxbp_count--;

        LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size());
        LIBCFS_FREE(rxb, sizeof(*rxb));
}

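/* Reserve pool capacity for 'count' more messages, growing the pool by
 * allocating and posting extra rx buffers until the reservation fits.
 * Returns 0 on success, -ESHUTDOWN if the pool is being shut down or
 * -ENOMEM if a buffer can't be allocated. */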
int
kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
{
        int               bufsize;
        int               msgs_per_buffer;
        int               rc;
        kptl_rx_buffer_t *rxb;
        char             *buffer;
        unsigned long     flags;

        bufsize = kptllnd_rx_buffer_size();
        msgs_per_buffer = bufsize / (*kptllnd_tunables.kptl_max_msg_size);

        CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count);

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        for (;;) {
                if (rxbp->rxbp_shutdown) {
                        rc = -ESHUTDOWN;
                        break;
                }

                if (rxbp->rxbp_reserved + count <=
                    rxbp->rxbp_count * msgs_per_buffer) {
                        rc = 0;
                        break;
                }

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                LIBCFS_ALLOC(rxb, sizeof(*rxb));
                LIBCFS_ALLOC(buffer, bufsize);

                if (rxb == NULL || buffer == NULL) {
                        CERROR("Failed to allocate rx buffer\n");

                        if (rxb != NULL)
                                LIBCFS_FREE(rxb, sizeof(*rxb));
                        if (buffer != NULL)
                                LIBCFS_FREE(buffer, bufsize);

                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                        rc = -ENOMEM;
                        break;
                }

                memset(rxb, 0, sizeof(*rxb));

                rxb->rxb_eventarg.eva_type = PTLLND_EVENTARG_TYPE_BUF;
                rxb->rxb_refcount = 0;
                rxb->rxb_pool = rxbp;
                rxb->rxb_idle = 0;
                rxb->rxb_posted = 0;
                rxb->rxb_buffer = buffer;
                rxb->rxb_mdh = PTL_INVALID_HANDLE;

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);

                if (rxbp->rxbp_shutdown) {
                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                        LIBCFS_FREE(rxb, sizeof(*rxb));
                        LIBCFS_FREE(buffer, bufsize);

                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                        rc = -ESHUTDOWN;
                        break;
                }

                cfs_list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
                rxbp->rxbp_count++;

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                kptllnd_rx_buffer_post(rxb);

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        }

        if (rc == 0)
                rxbp->rxbp_reserved += count;

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

        return rc;
}

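/* Return 'count' message reservations taken by
 * kptllnd_rx_buffer_pool_reserve().  Buffers themselves are only freed
 * when the pool is finalized. */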
void
kptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp,
                                 int count)
{
        unsigned long flags;

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count);
        rxbp->rxbp_reserved -= count;

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}

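/* Tear down the pool: flag shutdown, unlink every buffer's MD and poll
 * once a second until all buffers have gone idle and been destroyed.
 * See the CAVEAT EMPTOR below for the races this must tolerate. */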
void
kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
{
        kptl_rx_buffer_t       *rxb;
        int                     rc;
        int                     i;
        unsigned long           flags;
        cfs_list_t             *tmp;
        cfs_list_t             *nxt;
        ptl_handle_md_t         mdh;

        /* CAVEAT EMPTOR: I'm racing with everything here!!!
         *
         * Buffers can still be posted after I set rxbp_shutdown because I
         * can't hold rxbp_lock while I'm posting them.
         *
         * Calling PtlMDUnlink() here races with auto-unlinks; i.e. a buffer's
         * MD handle could become invalid under me.  I am vulnerable to portals
         * re-using handles (i.e. make the same handle valid again, but for a
         * different MD) from when the MD is actually unlinked, to when the
         * event callback tells me it has been unlinked. */

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        rxbp->rxbp_shutdown = 1;

        for (i = 0; ; i++) {
                cfs_list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
                        rxb = cfs_list_entry (tmp, kptl_rx_buffer_t, rxb_list);

                        if (rxb->rxb_idle) {
                                spin_unlock_irqrestore(&rxbp->rxbp_lock,
                                                       flags);
                                kptllnd_rx_buffer_destroy(rxb);
                                spin_lock_irqsave(&rxbp->rxbp_lock,
                                                  flags);
                                continue;
                        }

                        mdh = rxb->rxb_mdh;
                        if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE))
                                continue;

                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                        rc = PtlMDUnlink(mdh);

                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
                        /* callback clears rxb_mdh and drops net's ref
                         * (which causes repost, but since I set
                         * shutdown, it will just set the buffer
                         * idle) */
#else
                        if (rc == PTL_OK) {
                                rxb->rxb_posted = 0;
                                rxb->rxb_mdh = PTL_INVALID_HANDLE;
                                kptllnd_rx_buffer_decref_locked(rxb);
                        }
#endif
                }

                if (cfs_list_empty(&rxbp->rxbp_list))
                        break;

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

                /* Wait a bit for references to be dropped */
                CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                       "Waiting for %d Busy RX Buffers\n",
                       rxbp->rxbp_count);

                cfs_pause(cfs_time_seconds(1));

                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        }

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}

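/* Attach an ME/MD for this buffer and post it to receive incoming PUTs.
 * If the pool is shutting down the buffer is just marked idle; on a
 * Portals error the net's ref is dropped again via
 * kptllnd_rx_buffer_decref_locked(). */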
void
kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
{
        int                     rc;
        ptl_md_t                md;
        ptl_handle_me_t         meh;
        ptl_handle_md_t         mdh;
        ptl_process_id_t        any;
        kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
        unsigned long           flags;

        LASSERT (!cfs_in_interrupt());
        LASSERT (rxb->rxb_refcount == 0);
        LASSERT (!rxb->rxb_idle);
        LASSERT (!rxb->rxb_posted);
        LASSERT (PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));

        any.nid = PTL_NID_ANY;
        any.pid = PTL_PID_ANY;

        spin_lock_irqsave(&rxbp->rxbp_lock, flags);

        if (rxbp->rxbp_shutdown) {
                rxb->rxb_idle = 1;
                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                return;
        }

        rxb->rxb_refcount = 1;                  /* net's ref */
        rxb->rxb_posted = 1;                    /* I'm posting */

        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);

        rc = PtlMEAttach(kptllnd_data.kptl_nih,
                         *kptllnd_tunables.kptl_portal,
                         any,
                         LNET_MSG_MATCHBITS,
                         0, /* all matchbits are valid - ignore none */
                         PTL_UNLINK,
                         PTL_INS_AFTER,
                         &meh);
        if (rc != PTL_OK) {
                CERROR("PtlMEAttach rxb failed %s(%d)\n",
                       kptllnd_errtype2str(rc), rc);
                goto failed;
        }

        /*
         * Setup MD
         */
        md.start = rxb->rxb_buffer;
        md.length = kptllnd_rx_buffer_size();
        md.threshold = PTL_MD_THRESH_INF;
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE |
                     PTL_MD_MAX_SIZE |
                     PTL_MD_LOCAL_ALIGN8;
        md.user_ptr = &rxb->rxb_eventarg;
        md.max_size = *kptllnd_tunables.kptl_max_msg_size;
        md.eq_handle = kptllnd_data.kptl_eqh;

        rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh);
        if (rc == PTL_OK) {
                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                if (rxb->rxb_posted)            /* Not auto-unlinked yet!!! */
                        rxb->rxb_mdh = mdh;
                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                return;
        }

        CERROR("PtlMDAttach rxb failed %s(%d)\n",
               kptllnd_errtype2str(rc), rc);
        rc = PtlMEUnlink(meh);
        LASSERT(rc == PTL_OK);

 failed:
        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
        rxb->rxb_posted = 0;
        /* XXX this will just try again immediately */
        kptllnd_rx_buffer_decref_locked(rxb);
        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}

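/* Allocate and zero an rx descriptor from the rx slab; returns NULL on
 * allocation failure (or when failure simulation is enabled). */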
kptl_rx_t *
kptllnd_rx_alloc(void)
{
        kptl_rx_t* rx;

        if (IS_SIMULATION_ENABLED(FAIL_RX_ALLOC)) {
                CERROR ("FAIL_RX_ALLOC SIMULATION triggered\n");
                return NULL;
        }

        rx = kmem_cache_alloc(kptllnd_data.kptl_rx_cache, GFP_ATOMIC);
        if (rx == NULL) {
                CERROR("Failed to allocate rx\n");
                return NULL;
        }

        memset(rx, 0, sizeof(*rx));
        return rx;
}

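/* Finish with an rx: release its buffer, optionally return a credit to
 * the peer, poke kptllnd_peer_check_sends() in case credits need to be
 * returned, then free the descriptor. */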
void
kptllnd_rx_done(kptl_rx_t *rx, int post_credit)
{
        kptl_rx_buffer_t *rxb = rx->rx_rxb;
        kptl_peer_t      *peer = rx->rx_peer;
        unsigned long     flags;

        LASSERT (post_credit == PTLLND_POSTRX_NO_CREDIT ||
                 post_credit == PTLLND_POSTRX_PEER_CREDIT);

        CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer);

        if (rxb != NULL)
                kptllnd_rx_buffer_decref(rxb);

        if (peer != NULL) {
                /* Update credits (after I've decref-ed the buffer) */
                spin_lock_irqsave(&peer->peer_lock, flags);

                if (post_credit == PTLLND_POSTRX_PEER_CREDIT)
                        peer->peer_outstanding_credits++;

                LASSERT (peer->peer_outstanding_credits +
                         peer->peer_sent_credits <=
                         *kptllnd_tunables.kptl_peertxcredits);

                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       rx);

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* I might have to send back credits */
                kptllnd_peer_check_sends(peer);
                kptllnd_peer_decref(peer);
        }

        kmem_cache_free(kptllnd_data.kptl_rx_cache, rx);
}

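/* Portals event callback for rx buffers.  A successful PUT_END wraps the
 * incoming message in a kptl_rx_t (copying it out if it landed
 * misaligned) and queues it for the scheduler; an unlink drops the net's
 * ref on the buffer. */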
void
kptllnd_rx_buffer_callback (ptl_event_t *ev)
{
        kptl_eventarg_t        *eva = ev->md.user_ptr;
        kptl_rx_buffer_t       *rxb = kptllnd_eventarg2obj(eva);
        kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
        kptl_rx_t              *rx;
        int                     unlinked;
        unsigned long           flags;

#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
        unlinked = ev->unlinked;
#else
        unlinked = ev->type == PTL_EVENT_UNLINK;
#endif

        CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
               kptllnd_ptlid2str(ev->initiator),
               kptllnd_evtype2str(ev->type), ev->type, rxb,
               kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,
               unlinked);

        LASSERT (!rxb->rxb_idle);
        LASSERT (ev->md.start == rxb->rxb_buffer);
        LASSERT (ev->offset + ev->mlength <=
                 PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
        LASSERT (ev->type == PTL_EVENT_PUT_END ||
                 ev->type == PTL_EVENT_UNLINK);
        LASSERT (ev->type == PTL_EVENT_UNLINK ||
                 ev->match_bits == LNET_MSG_MATCHBITS);

        if (ev->ni_fail_type != PTL_NI_OK) {
                CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
                       kptllnd_ptlid2str(ev->initiator),
                       kptllnd_evtype2str(ev->type), ev->type, rxb,
                       kptllnd_errtype2str(ev->ni_fail_type),
                       ev->ni_fail_type, unlinked);
        } else if (ev->type == PTL_EVENT_PUT_END &&
                   !rxbp->rxbp_shutdown) {

                /* rxbp_shutdown sampled without locking!  I only treat it as a
                 * hint since shutdown can start while rx's are queued on
                 * kptl_sched_rxq. */
#if (PTL_MD_LOCAL_ALIGN8 == 0)
                /* Portals can't force message alignment - someone sending an
                 * odd-length message will misalign subsequent messages and
                 * force the fixup below...  */
                if ((ev->mlength & 7) != 0)
                        CWARN("Message from %s has odd length "LPU64": "
                              "probable version incompatibility\n",
                              kptllnd_ptlid2str(ev->initiator),
                              (__u64)ev->mlength);
#endif
                rx = kptllnd_rx_alloc();
                if (rx == NULL) {
                        CERROR("Message from %s dropped: ENOMEM",
                               kptllnd_ptlid2str(ev->initiator));
                } else {
                        if ((ev->offset & 7) == 0) {
                                kptllnd_rx_buffer_addref(rxb);
                                rx->rx_rxb = rxb;
                                rx->rx_nob = ev->mlength;
                                rx->rx_msg = (kptl_msg_t *)
                                             (rxb->rxb_buffer + ev->offset);
                        } else {
#if (PTL_MD_LOCAL_ALIGN8 == 0)
                                /* Portals can't force alignment - copy into
                                 * rx_space (avoiding overflow) to fix */
                                int maxlen = *kptllnd_tunables.kptl_max_msg_size;

                                rx->rx_rxb = NULL;
                                rx->rx_nob = MIN(maxlen, ev->mlength);
                                rx->rx_msg = (kptl_msg_t *)rx->rx_space;
                                memcpy(rx->rx_msg, rxb->rxb_buffer + ev->offset,
                                       rx->rx_nob);
#else
                                /* Portals should have forced the alignment */
                                LBUG();
#endif
                        }

                        rx->rx_initiator = ev->initiator;
                        rx->rx_treceived = jiffies;
                        /* Queue for attention */
                        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
                                          flags);

                        cfs_list_add_tail(&rx->rx_list,
                                          &kptllnd_data.kptl_sched_rxq);
                        cfs_waitq_signal(&kptllnd_data.kptl_sched_waitq);

                        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
                                               flags);
                }
        }

        if (unlinked) {
                spin_lock_irqsave(&rxbp->rxbp_lock, flags);

                rxb->rxb_posted = 0;
                rxb->rxb_mdh = PTL_INVALID_HANDLE;
                kptllnd_rx_buffer_decref_locked(rxb);

                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
        }
}

void
kptllnd_nak (ptl_process_id_t dest)
{
        /* Fire-and-forget a stub message that will let the peer know my
         * protocol magic/version and make her drop/refresh any peer state she
         * might have with me. */
        ptl_md_t md = {
                .start        = kptllnd_data.kptl_nak_msg,
                .length       = kptllnd_data.kptl_nak_msg->ptlm_nob,
                .threshold    = 1,
                .options      = 0,
                .user_ptr     = NULL,
                .eq_handle    = PTL_EQ_NONE};
        ptl_handle_md_t   mdh;
        int               rc;

        rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
        if (rc != PTL_OK) {
                CWARN("Can't NAK %s: bind failed %s(%d)\n",
                      kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
                return;
        }

        rc = PtlPut(mdh, PTL_NOACK_REQ, dest,
                    *kptllnd_tunables.kptl_portal, 0,
                    LNET_MSG_MATCHBITS, 0, 0);
        if (rc != PTL_OK) {
                CWARN("Can't NAK %s: put failed %s(%d)\n",
                      kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
        }
}

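/* Look up the kptl_net_t whose NI owns 'nid' and take a reference on it;
 * returns NULL if no such network is configured. */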
kptl_net_t *
kptllnd_find_net (lnet_nid_t nid)
{
        kptl_net_t *net;

        read_lock(&kptllnd_data.kptl_net_rw_lock);
        cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
                LASSERT (!net->net_shutdown);

                if (net->net_ni->ni_nid == nid) {
                        kptllnd_net_addref(net);
                        read_unlock(&kptllnd_data.kptl_net_rw_lock);
                        return net;
                }
        }
        read_unlock(&kptllnd_data.kptl_net_rw_lock);

        return NULL;
}

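/* Parse a received message: check magic/version and source/destination
 * ids, look up (or, for HELLO, create) the peer, do credit accounting,
 * then hand IMMEDIATE/PUT/GET payloads to LNet via lnet_parse().
 * Protocol errors close the peer. */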
void
kptllnd_rx_parse(kptl_rx_t *rx)
{
        kptl_msg_t             *msg = rx->rx_msg;
        int                     rc = 0;
        int                     post_credit = PTLLND_POSTRX_PEER_CREDIT;
        kptl_net_t             *net = NULL;
        kptl_peer_t            *peer;
        cfs_list_t              txs;
        unsigned long           flags;
        lnet_process_id_t       srcid;

        LASSERT (!cfs_in_interrupt());
        LASSERT (rx->rx_peer == NULL);

        CFS_INIT_LIST_HEAD(&txs);

        if ((rx->rx_nob >= 4 &&
             (msg->ptlm_magic == LNET_PROTO_MAGIC ||
              msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) ||
            (rx->rx_nob >= 6 &&
             ((msg->ptlm_magic == PTLLND_MSG_MAGIC &&
               msg->ptlm_version != PTLLND_MSG_VERSION) ||
              (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC) &&
               msg->ptlm_version != __swab16(PTLLND_MSG_VERSION))))) {
                /* NAK incompatible versions
                 * See other LNDs for how to handle this if/when ptllnd begins
                 * to allow different versions to co-exist */
                CERROR("Bad version: got %04x expected %04x from %s\n",
                       (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?
                               msg->ptlm_version : __swab16(msg->ptlm_version)),
                       PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));
                /* NB backward compatibility */
                kptllnd_nak(rx->rx_initiator);
                goto rx_done;
        }

        rc = kptllnd_msg_unpack(msg, rx->rx_nob);
        if (rc != 0) {
                CERROR ("Error %d unpacking rx from %s\n",
                        rc, kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        srcid.nid = msg->ptlm_srcnid;
        srcid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n",
               libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),
               msg->ptlm_credits, rx, rx->rx_rxb,
               jiffies - rx->rx_treceived,
               cfs_duration_sec(jiffies - rx->rx_treceived));

        if (kptllnd_lnet2ptlnid(srcid.nid) != rx->rx_initiator.nid) {
                CERROR("Bad source nid %s from %s\n",
                       libcfs_id2str(srcid),
                       kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
                peer = kptllnd_id2peer(srcid);
                if (peer == NULL)
                        goto rx_done;

                CWARN("NAK from %s (%d:%s)\n",
                      libcfs_id2str(srcid), peer->peer_state,
                      kptllnd_ptlid2str(rx->rx_initiator));

                /* NB can't nuke new peer - bug 17546 comment 31 */
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        CDEBUG(D_NET, "Stale NAK from %s(%s): WAITING_HELLO\n",
                               libcfs_id2str(srcid),
                               kptllnd_ptlid2str(rx->rx_initiator));
                        kptllnd_peer_decref(peer);
                        goto rx_done;
                }

                rc = -EPROTO;
                goto failed;
        }

        net = kptllnd_find_net(msg->ptlm_dstnid);
        if (net == NULL || msg->ptlm_dstpid != the_lnet.ln_pid) {
                CERROR("Bad dstid %s from %s\n",
                       libcfs_id2str((lnet_process_id_t) {
                               .nid = msg->ptlm_dstnid,
                               .pid = msg->ptlm_dstpid}),
                       kptllnd_ptlid2str(rx->rx_initiator));
                goto rx_done;
        }

        if (LNET_NIDNET(srcid.nid) != LNET_NIDNET(net->net_ni->ni_nid)) {
                lnet_nid_t nid = LNET_MKNID(LNET_NIDNET(net->net_ni->ni_nid),
                                            LNET_NIDADDR(srcid.nid));
                CERROR("Bad source nid %s from %s, %s expected.\n",
                       libcfs_id2str(srcid),
                       kptllnd_ptlid2str(rx->rx_initiator),
                       libcfs_nid2str(nid));
                goto rx_done;
        }

        if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
                peer = kptllnd_peer_handle_hello(net, rx->rx_initiator, msg);
                if (peer == NULL)
                        goto rx_done;
        } else {
                peer = kptllnd_id2peer(srcid);
                if (peer == NULL) {
                        CWARN("NAK %s: no connection, %s must reconnect\n",
                              kptllnd_msgtype2str(msg->ptlm_type),
                              libcfs_id2str(srcid));
                        /* NAK to make the peer reconnect */
                        kptllnd_nak(rx->rx_initiator);
                        goto rx_done;
                }

                /* Ignore any messages for a previous incarnation of me */
                if (msg->ptlm_dststamp < peer->peer_myincarnation) {
                        kptllnd_peer_decref(peer);
                        goto rx_done;
                }

                if (msg->ptlm_dststamp != peer->peer_myincarnation) {
                        CERROR("%s: Unexpected dststamp "LPX64" "
                               "("LPX64" expected)\n",
                               libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
                               peer->peer_myincarnation);
                        rc = -EPROTO;
                        goto failed;
                }

                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* recoverable error - restart txs */
                        spin_lock_irqsave(&peer->peer_lock, flags);
                        kptllnd_cancel_txlist(&peer->peer_sendq, &txs);
                        spin_unlock_irqrestore(&peer->peer_lock, flags);

                        CWARN("NAK %s: Unexpected %s message\n",
                              libcfs_id2str(srcid),
                              kptllnd_msgtype2str(msg->ptlm_type));
                        kptllnd_nak(rx->rx_initiator);
                        rc = -EPROTO;
                        goto failed;
                }

                if (msg->ptlm_srcstamp != peer->peer_incarnation) {
                        CERROR("%s: Unexpected srcstamp "LPX64" "
                               "("LPX64" expected)\n",
                               libcfs_id2str(srcid),
                               msg->ptlm_srcstamp,
                               peer->peer_incarnation);
                        rc = -EPROTO;
                        goto failed;
                }
        }

        LASSERTF (LNET_NIDADDR(msg->ptlm_srcnid) ==
                  LNET_NIDADDR(peer->peer_id.nid), "m %s p %s\n",
                  libcfs_nid2str(msg->ptlm_srcnid),
                  libcfs_nid2str(peer->peer_id.nid));
        LASSERTF (msg->ptlm_srcpid == peer->peer_id.pid, "m %u p %u\n",
                  msg->ptlm_srcpid, peer->peer_id.pid);

        spin_lock_irqsave(&peer->peer_lock, flags);

        /* Check peer only sends when I've sent her credits */
        if (peer->peer_sent_credits == 0) {
                int  c = peer->peer_credits;
                int oc = peer->peer_outstanding_credits;
                int sc = peer->peer_sent_credits;

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                CERROR("%s: buffer overrun [%d/%d+%d]\n",
                       libcfs_id2str(peer->peer_id), c, sc, oc);
                rc = -EPROTO;
                goto failed;
        }
        peer->peer_sent_credits--;

        /* No check for credit overflow - the peer may post new
         * buffers after the startup handshake. */
        peer->peer_credits += msg->ptlm_credits;

        /* This ensures the credit taken by NOOP can be returned */
        if (msg->ptlm_type == PTLLND_MSG_TYPE_NOOP) {
                peer->peer_outstanding_credits++;
                post_credit = PTLLND_POSTRX_NO_CREDIT;
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* See if something can go out now that credits have come in */
        if (msg->ptlm_credits != 0)
                kptllnd_peer_check_sends(peer);

        /* ptllnd-level protocol correct - rx takes my ref on peer and increments
         * peer_outstanding_credits when it completes */
        rx->rx_peer = peer;
        kptllnd_peer_alive(peer);

        switch (msg->ptlm_type) {
        default:
                /* already checked by kptllnd_msg_unpack() */
                LBUG();

        case PTLLND_MSG_TYPE_HELLO:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO\n");
                goto rx_done;

        case PTLLND_MSG_TYPE_NOOP:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP\n");
                goto rx_done;

        case PTLLND_MSG_TYPE_IMMEDIATE:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
                rc = lnet_parse(net->net_ni,
                                &msg->ptlm_u.immediate.kptlim_hdr,
                                msg->ptlm_srcnid,
                                rx, 0);
                if (rc >= 0) {                  /* kptllnd_recv owns 'rx' now */
                        kptllnd_net_decref(net);
                        return;
                }
                goto failed;

        case PTLLND_MSG_TYPE_PUT:
        case PTLLND_MSG_TYPE_GET:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
                        msg->ptlm_type == PTLLND_MSG_TYPE_PUT ?
                        "PUT" : "GET");

                /* checked in kptllnd_msg_unpack() */
                LASSERT (msg->ptlm_u.rdma.kptlrm_matchbits >=
                         PTL_RESERVED_MATCHBITS);

                /* Update last match bits seen */
                spin_lock_irqsave(&peer->peer_lock, flags);

                if (msg->ptlm_u.rdma.kptlrm_matchbits >
                    rx->rx_peer->peer_last_matchbits_seen)
                        rx->rx_peer->peer_last_matchbits_seen =
                                msg->ptlm_u.rdma.kptlrm_matchbits;

                spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);

                rc = lnet_parse(net->net_ni,
                                &msg->ptlm_u.rdma.kptlrm_hdr,
                                msg->ptlm_srcnid,
                                rx, 1);
                if (rc >= 0) {                  /* kptllnd_recv owns 'rx' now */
                        kptllnd_net_decref(net);
                        return;
                }
                goto failed;
        }

 failed:
        LASSERT (rc != 0);
        kptllnd_peer_close(peer, rc);
        if (rx->rx_peer == NULL)                /* drop ref on peer */
                kptllnd_peer_decref(peer);      /* unless rx_done will */
        if (!cfs_list_empty(&txs)) {
                LASSERT (net != NULL);
                kptllnd_restart_txs(net, srcid, &txs);
        }
 rx_done:
        if (net != NULL)
                kptllnd_net_decref(net);
        kptllnd_rx_done(rx, post_credit);
}