Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lnet / lnet / lib-ptl.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /* Copyright (c) 2012, 2017, Intel Corporation. */
4
5 /* This file is part of Lustre, http://www.lustre.org/
6  *
7  * portal & match routines
8  *
9  * Author: liang@whamcloud.com
10  */
11
12 #define DEBUG_SUBSYSTEM S_LNET
13
14 #include <lnet/lib-lnet.h>
15
16 /* NB: add /proc interfaces in upcoming patches */
17 int portal_rotor = LNET_PTL_ROTOR_HASH_RT;
18 module_param(portal_rotor, int, 0644);
19 MODULE_PARM_DESC(portal_rotor, "redirect PUTs to different cpu-partitions");
20
/* Determine (and latch) the match semantics of portal @index.
 *
 * A portal starts out "unset" and takes its type from the first ME
 * attached to it: unique (no ignore bits, fully-specified NID and PID,
 * so matching can hash on peer identity) or wildcard.
 *
 * Returns 1 if an attach with (@mbits, @ignore_bits, @match_id) is
 * compatible with the portal's (possibly just-latched) type, 0 on a
 * unique/wildcard conflict.
 */
static int
lnet_ptl_match_type(unsigned int index, struct lnet_processid *match_id,
		    __u64 mbits, __u64 ignore_bits)
{
	struct lnet_portal	*ptl = the_lnet.ln_portals[index];
	int			unique;

	/* "unique" == requester pinned both NID and PID and uses no
	 * ignore bits */
	unique = (ignore_bits == 0 &&
		  !LNET_NID_IS_ANY(&match_id->nid) &&
		  match_id->pid != LNET_PID_ANY);

	/* a portal can never be both unique and wildcard */
	LASSERT(!lnet_ptl_is_unique(ptl) || !lnet_ptl_is_wildcard(ptl));

	/* prefer to check w/o any lock */
	if (likely(lnet_ptl_is_unique(ptl) || lnet_ptl_is_wildcard(ptl)))
		goto match;

	/* unset, new portal */
	lnet_ptl_lock(ptl);
	/* check again with lock (another thread may have typed it since
	 * the lockless check above) */
	if (unlikely(lnet_ptl_is_unique(ptl) || lnet_ptl_is_wildcard(ptl))) {
		lnet_ptl_unlock(ptl);
		goto match;
	}

	/* still not set: this attach decides the portal's type */
	if (unique)
		lnet_ptl_setopt(ptl, LNET_PTL_MATCH_UNIQUE);
	else
		lnet_ptl_setopt(ptl, LNET_PTL_MATCH_WILDCARD);

	lnet_ptl_unlock(ptl);

	return 1;

 match:
	/* reject when the request's uniqueness disagrees with the
	 * portal's established type */
	if ((lnet_ptl_is_unique(ptl) && !unique) ||
	    (lnet_ptl_is_wildcard(ptl) && unique))
		return 0;
	return 1;
}
62
/* Mark the match table of @cpt as active and insert @cpt into the
 * portal's sorted list of active CPTs (ptl_mt_maps[], ascending order,
 * maintained by a single insertion-sort pass from the tail).
 */
static void
lnet_ptl_enable_mt(struct lnet_portal *ptl, int cpt)
{
	struct lnet_match_table *mtable = ptl->ptl_mtables[cpt];
	int			i;

	/* with hold of both lnet_res_lock(cpt) and lnet_ptl_lock */
	LASSERT(lnet_ptl_is_wildcard(ptl));

	mtable->mt_enabled = 1;

	/* tentatively append @cpt, then bubble it down to its sorted slot */
	ptl->ptl_mt_maps[ptl->ptl_mt_nmaps] = cpt;
	for (i = ptl->ptl_mt_nmaps - 1; i >= 0; i--) {
		/* @cpt must not already be in the active list */
		LASSERT(ptl->ptl_mt_maps[i] != cpt);
		if (ptl->ptl_mt_maps[i] < cpt)
			break;

		/* swap to order */
		ptl->ptl_mt_maps[i + 1] = ptl->ptl_mt_maps[i];
		ptl->ptl_mt_maps[i] = cpt;
	}

	ptl->ptl_mt_nmaps++;
}
87
/* Mark the match table of @cpt as inactive and remove @cpt from the
 * portal's sorted active list, shifting later entries left. The only
 * match table of a single-CPT configuration is never disabled.
 */
static void
lnet_ptl_disable_mt(struct lnet_portal *ptl, int cpt)
{
	struct lnet_match_table *mtable = ptl->ptl_mtables[cpt];
	int			i;

	/* with hold of both lnet_res_lock(cpt) and lnet_ptl_lock */
	LASSERT(lnet_ptl_is_wildcard(ptl));

	if (LNET_CPT_NUMBER == 1)
		return; /* never disable the only match-table */

	mtable->mt_enabled = 0;

	LASSERT(ptl->ptl_mt_nmaps > 0 &&
		ptl->ptl_mt_nmaps <= LNET_CPT_NUMBER);

	/* remove it from mt_maps: entries at/after @cpt's sorted position
	 * are overwritten by their successor */
	ptl->ptl_mt_nmaps--;
	for (i = 0; i < ptl->ptl_mt_nmaps; i++) {
		if (ptl->ptl_mt_maps[i] >= cpt) /* overwrite it */
			ptl->ptl_mt_maps[i] = ptl->ptl_mt_maps[i + 1];
	}
}
112
/* Try to match one incoming message against one MD.
 *
 * Returns a bitmask: LNET_MATCHMD_NONE (no match), LNET_MATCHMD_OK
 * (matched and committed to this ME/MD), or LNET_MATCHMD_DROP (matched
 * but the payload cannot be accepted), optionally OR-ed with
 * LNET_MATCHMD_EXHAUSTED when the MD can take no further messages.
 */
static int
lnet_try_match_md(struct lnet_libmd *md,
		  struct lnet_match_info *info, struct lnet_msg *msg)
{
	/* ALWAYS called holding the lnet_res_lock, and can't lnet_res_unlock;
	 * lnet_match_blocked_msg() relies on this to avoid races */
	unsigned int	offset;
	unsigned int	mlength;
	struct lnet_me	*me = md->md_me;

	/* MD exhausted */
	if (lnet_md_exhausted(md))
		return LNET_MATCHMD_NONE | LNET_MATCHMD_EXHAUSTED;

	/* mismatched MD op */
	if ((md->md_options & info->mi_opc) == 0)
		return LNET_MATCHMD_NONE;

	/* mismatched ME matchbits? (bits not covered by ignore_bits must
	 * agree exactly) */
	if (((me->me_match_bits ^ info->mi_mbits) & ~me->me_ignore_bits) != 0)
		return LNET_MATCHMD_NONE;

	/* mismatched PID? */
	if (me->me_match_id.pid != LNET_PID_ANY &&
	    me->me_match_id.pid != info->mi_id.pid)
		return LNET_MATCHMD_NONE;

	/* try to accept match based on bits only
	 * (the CFS_FAIL_CHECK arm lets fault-injection tests force this
	 * NID-mismatch path) */
	if ((!LNET_NID_IS_ANY(&me->me_match_id.nid) &&
	     !nid_same(&me->me_match_id.nid, &info->mi_id.nid)) ||
	    (!LNET_NID_IS_ANY(&me->me_match_id.nid) &&
	     CFS_FAIL_CHECK(CFS_FAIL_MATCH_MD_NID))) {
		struct lnet_peer *lp_me, *lp_peer;

		/* check if ME NID matches another NID of same peer */
		lp_me = lnet_find_peer(&me->me_match_id.nid);
		lp_peer = lnet_find_peer(&info->mi_id.nid);

		if (lp_me && lp_peer && (lp_me == lp_peer)) {
			/* Shouldn't happen, but better than dropping
			 * message entirely. Print warning so we know
			 * it happens, and something needs to be fixed.
			 */
			CWARN("message from %s matched %llu with NID mismatch %s accepted (same peer %pK)\n",
			      libcfs_idstr(&info->mi_id),
			      info->mi_mbits,
			      libcfs_nidstr(&me->me_match_id.nid),
			      lp_me);
			lnet_peer_decref_locked(lp_me);
			lnet_peer_decref_locked(lp_peer);
		} else {
			CWARN("message from %s matched %llu with NID mismatch %s rejected (different peer %pK != %pK)\n",
				libcfs_idstr(&info->mi_id),
				info->mi_mbits,
				libcfs_nidstr(&me->me_match_id.nid),
				lp_me, lp_peer);
			if (lp_me)
				lnet_peer_decref_locked(lp_me);
			if (lp_peer)
				lnet_peer_decref_locked(lp_peer);

			return LNET_MATCHMD_NONE;
		}
	}

	/* Hurrah! This _is_ a match; check it out... */

	/* offset is local unless the remote manages it */
	if ((md->md_options & LNET_MD_MANAGE_REMOTE) == 0)
		offset = md->md_offset;
	else
		offset = info->mi_roffset;

	if ((md->md_options & LNET_MD_MAX_SIZE) != 0) {
		mlength = md->md_max_size;
		LASSERT(md->md_offset + mlength <= md->md_length);
	} else {
		mlength = md->md_length - offset;
	}

	if (info->mi_rlength <= mlength) {	/* fits in allowed space */
		mlength = info->mi_rlength;
	} else if ((md->md_options & LNET_MD_TRUNCATE) == 0) {
		/* this packet _really_ is too big */
		CERROR("Matching packet from %s, match %llu"
		       " length %d too big: %d left, %d allowed\n",
		       libcfs_idstr(&info->mi_id), info->mi_mbits,
		       info->mi_rlength, md->md_length - offset, mlength);

		return LNET_MATCHMD_DROP;
	}

	/* Commit to this ME/MD */
	CDEBUG(D_NET, "Incoming %s index %x from %s of "
	       "length %d/%d into md %#llx [%d] + %d\n",
	       (info->mi_opc == LNET_MD_OP_PUT) ? "put" : "get",
	       info->mi_portal, libcfs_idstr(&info->mi_id), mlength,
	       info->mi_rlength, md->md_lh.lh_cookie, md->md_niov, offset);

	lnet_msg_attach_md(msg, md, offset, mlength);
	/* consume the matched bytes from the MD */
	md->md_offset = offset + mlength;

	if (!lnet_md_exhausted(md))
		return LNET_MATCHMD_OK;

	/* Auto-unlink NOW, so the ME gets unlinked if required.
	 * We bumped md->md_refcount above so the MD just gets flagged
	 * for unlink when it is finalized. */
	if ((md->md_flags & LNET_MD_FLAG_AUTO_UNLINK) != 0)
		lnet_md_unlink(md);

	return LNET_MATCHMD_OK | LNET_MATCHMD_EXHAUSTED;
}
225
226 static struct lnet_match_table *
227 lnet_match2mt(struct lnet_portal *ptl, struct lnet_processid *id, __u64 mbits)
228 {
229         if (LNET_CPT_NUMBER == 1)
230                 return ptl->ptl_mtables[0]; /* the only one */
231
232         /* if it's a unique portal, return match-table hashed by NID */
233         return lnet_ptl_is_unique(ptl) ?
234                ptl->ptl_mtables[lnet_nid2cpt(&id->nid, NULL)] : NULL;
235 }
236
237 struct lnet_match_table *
238 lnet_mt_of_attach(unsigned int index, struct lnet_processid *id,
239                   __u64 mbits, __u64 ignore_bits, enum lnet_ins_pos pos)
240 {
241         struct lnet_portal      *ptl;
242         struct lnet_match_table *mtable;
243
244         /* NB: called w/o lock */
245         LASSERT(index < the_lnet.ln_nportals);
246
247         if (!lnet_ptl_match_type(index, id, mbits, ignore_bits))
248                 return NULL;
249
250         ptl = the_lnet.ln_portals[index];
251
252         mtable = lnet_match2mt(ptl, id, mbits);
253         if (mtable != NULL) /* unique portal or only one match-table */
254                 return mtable;
255
256         /* it's a wildcard portal */
257         switch (pos) {
258         default:
259                 return NULL;
260         case LNET_INS_BEFORE:
261         case LNET_INS_AFTER:
262                 /* posted by no affinity thread, always hash to specific
263                  * match-table to avoid buffer stealing which is heavy */
264                 return ptl->ptl_mtables[ptl->ptl_index % LNET_CPT_NUMBER];
265         case LNET_INS_LOCAL:
266                 /* posted by cpu-affinity thread */
267                 return ptl->ptl_mtables[lnet_cpt_current()];
268         }
269 }
270
/* Select the match table to search for an incoming message.
 *
 * Unique portals and single-CPT setups are fully determined by
 * lnet_match2mt(). For a wildcard portal the choice follows the
 * portal_rotor policy: prefer the current CPT's table when enabled,
 * otherwise round-robin over CPTs (routed messages may hash to
 * info->mi_cpt under LNET_PTL_ROTOR_HASH_RT), falling back to any
 * currently-active table to avoid heavy buffer stealing.
 */
static struct lnet_match_table *
lnet_mt_of_match(struct lnet_match_info *info, struct lnet_msg *msg)
{
	struct lnet_match_table *mtable;
	struct lnet_portal	*ptl;
	unsigned int		nmaps;
	unsigned int		rotor;
	unsigned int		cpt;
	bool			routed;

	/* NB: called w/o lock */
	LASSERT(info->mi_portal < the_lnet.ln_nportals);
	ptl = the_lnet.ln_portals[info->mi_portal];

	LASSERT(lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl));

	mtable = lnet_match2mt(ptl, &info->mi_id, info->mi_mbits);
	if (mtable != NULL)
		return mtable;

	/* it's a wildcard portal; "routed" == src and dest are on
	 * different networks */
	routed = LNET_NID_NET(&msg->msg_hdr.src_nid) !=
		 LNET_NID_NET(&msg->msg_hdr.dest_nid);

	if (portal_rotor == LNET_PTL_ROTOR_OFF ||
	    (portal_rotor != LNET_PTL_ROTOR_ON && !routed)) {
		cpt = lnet_cpt_current();
		if (ptl->ptl_mtables[cpt]->mt_enabled)
			return ptl->ptl_mtables[cpt];
	}

	rotor = ptl->ptl_rotor++; /* get round-robin factor */
	if (portal_rotor == LNET_PTL_ROTOR_HASH_RT && routed)
		cpt = info->mi_cpt;
	else
		cpt = rotor % LNET_CPT_NUMBER;

	if (!ptl->ptl_mtables[cpt]->mt_enabled) {
		/* is there any active entry for this portal? */
		nmaps = ptl->ptl_mt_nmaps;
		/* map to an active mtable to avoid heavy "stealing" */
		if (nmaps != 0) {
			/* NB: there is possibility that ptl_mt_maps is being
			 * changed because we are not under protection of
			 * lnet_ptl_lock, but it shouldn't hurt anything */
			cpt = ptl->ptl_mt_maps[rotor % nmaps];
		}
	}

	return ptl->ptl_mtables[cpt];
}
322
323 static int
324 lnet_mt_test_exhausted(struct lnet_match_table *mtable, int pos)
325 {
326         __u64   *bmap;
327         int     i;
328
329         if (!lnet_ptl_is_wildcard(the_lnet.ln_portals[mtable->mt_portal]))
330                 return 0;
331
332         if (pos < 0) { /* check all bits */
333                 for (i = 0; i < LNET_MT_EXHAUSTED_BMAP; i++) {
334                         if (mtable->mt_exhausted[i] != (__u64)(-1))
335                                 return 0;
336                 }
337                 return 1;
338         }
339
340         LASSERT(pos <= LNET_MT_HASH_IGNORE);
341         /* mtable::mt_mhash[pos] is marked as exhausted or not */
342         bmap = &mtable->mt_exhausted[pos >> LNET_MT_BITS_U64];
343         pos &= (1 << LNET_MT_BITS_U64) - 1;
344
345         return ((*bmap) & (1ULL << pos)) != 0;
346 }
347
348 static void
349 lnet_mt_set_exhausted(struct lnet_match_table *mtable, int pos, int exhausted)
350 {
351         __u64   *bmap;
352
353         LASSERT(lnet_ptl_is_wildcard(the_lnet.ln_portals[mtable->mt_portal]));
354         LASSERT(pos <= LNET_MT_HASH_IGNORE);
355
356         /* set mtable::mt_mhash[pos] as exhausted/non-exhausted */
357         bmap = &mtable->mt_exhausted[pos >> LNET_MT_BITS_U64];
358         pos &= (1 << LNET_MT_BITS_U64) - 1;
359
360         if (!exhausted)
361                 *bmap &= ~(1ULL << pos);
362         else
363                 *bmap |= 1ULL << pos;
364 }
365
366 struct list_head *
367 lnet_mt_match_head(struct lnet_match_table *mtable,
368                    struct lnet_processid *id, __u64 mbits)
369 {
370         struct lnet_portal *ptl = the_lnet.ln_portals[mtable->mt_portal];
371
372         if (lnet_ptl_is_wildcard(ptl)) {
373                 return &mtable->mt_mhash[mbits & LNET_MT_HASH_MASK];
374         } else {
375                 unsigned long hash = mbits + nidhash(&id->nid) + id->pid;
376
377                 LASSERT(lnet_ptl_is_unique(ptl));
378                 hash = cfs_hash_long(hash, LNET_MT_HASH_BITS);
379                 return &mtable->mt_mhash[hash & LNET_MT_HASH_MASK];
380         }
381 }
382
/* Walk the MEs of @mtable looking for one whose MD accepts @msg.
 *
 * MEs with ignore bits (the extra LNET_MT_HASH_IGNORE bucket) are
 * scanned first if any exist, then the hash bucket selected by the
 * exact match bits. Returns the lnet_try_match_md() result of the
 * first finishing match (EXHAUSTED bit stripped), or
 * LNET_MATCHMD_DROP / LNET_MATCHMD_NONE with the EXHAUSTED bit set
 * when a wildcard portal's lists can accept nothing further.
 */
int
lnet_mt_match_md(struct lnet_match_table *mtable,
		 struct lnet_match_info *info, struct lnet_msg *msg)
{
	struct list_head	*head;
	struct lnet_me		*me;
	struct lnet_me		*tmp;
	int			exhausted = 0;
	int			rc;

	/* any ME with ignore bits? */
	if (!list_empty(&mtable->mt_mhash[LNET_MT_HASH_IGNORE]))
		head = &mtable->mt_mhash[LNET_MT_HASH_IGNORE];
	else
		head = lnet_mt_match_head(mtable, &info->mi_id,
					  info->mi_mbits);
 again:
	/* NB: only wildcard portal needs to return LNET_MATCHMD_EXHAUSTED */
	if (lnet_ptl_is_wildcard(the_lnet.ln_portals[mtable->mt_portal]))
		exhausted = LNET_MATCHMD_EXHAUSTED;

	list_for_each_entry_safe(me, tmp, head, me_list) {
		/* ME attached but MD not attached yet */
		if (me->me_md == NULL)
			continue;

		LASSERT(me == me->me_md->md_me);

		rc = lnet_try_match_md(me->me_md, info, msg);
		if ((rc & LNET_MATCHMD_EXHAUSTED) == 0)
			exhausted = 0; /* mlist is not empty */

		if ((rc & LNET_MATCHMD_FINISH) != 0) {
			/* don't return EXHAUSTED bit because we don't know
			 * whether the mlist is empty or not */
			return rc & ~LNET_MATCHMD_EXHAUSTED;
		}
	}

	if (exhausted == LNET_MATCHMD_EXHAUSTED) { /* @head is exhausted */
		/* record it in the bitmap; the table as a whole is only
		 * exhausted when every bucket is */
		lnet_mt_set_exhausted(mtable, head - mtable->mt_mhash, 1);
		if (!lnet_mt_test_exhausted(mtable, -1))
			exhausted = 0;
	}

	if (exhausted == 0 && head == &mtable->mt_mhash[LNET_MT_HASH_IGNORE]) {
		head = lnet_mt_match_head(mtable, &info->mi_id,
					  info->mi_mbits);
		goto again; /* re-check MEs w/o ignore-bits */
	}

	/* no match: GETs are always dropped, PUTs only survive on a
	 * lazy portal */
	if (info->mi_opc == LNET_MD_OP_GET ||
	    !lnet_ptl_is_lazy(the_lnet.ln_portals[info->mi_portal]))
		return LNET_MATCHMD_DROP | exhausted;

	return LNET_MATCHMD_NONE | exhausted;
}
440
/* Handle a message arriving before any ME has typed the portal.
 *
 * Returns 0 when the portal is already wildcard or unique (normal
 * matching should proceed), LNET_MATCHMD_NONE when the message was
 * queued on a lazy portal's delayed list, or LNET_MATCHMD_DROP.
 */
static int
lnet_ptl_match_early(struct lnet_portal *ptl, struct lnet_msg *msg)
{
	int	rc;

	/* message arrived before any buffer posting on this portal,
	 * simply delay or drop this message */
	if (likely(lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl)))
		return 0;

	lnet_ptl_lock(ptl);
	/* check it again with hold of lock */
	if (lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl)) {
		lnet_ptl_unlock(ptl);
		return 0;
	}

	if (lnet_ptl_is_lazy(ptl)) {
		/* only delay when the receive path asked for it */
		if (msg->msg_rx_ready_delay) {
			msg->msg_rx_delayed = 1;
			list_add_tail(&msg->msg_list,
				      &ptl->ptl_msg_delayed);
		}
		rc = LNET_MATCHMD_NONE;
	} else {
		rc = LNET_MATCHMD_DROP;
	}

	lnet_ptl_unlock(ptl);
	return rc;
}
472
/* Try to satisfy @msg by stealing a buffer from other CPTs' match
 * tables, delaying (lazy portal) or dropping it when nothing can be
 * stolen. The message sits on ptl_msg_stealing while this runs, so
 * lnet_ptl_attach_md() on another thread may match it concurrently.
 */
static int
lnet_ptl_match_delay(struct lnet_portal *ptl,
		     struct lnet_match_info *info, struct lnet_msg *msg)
{
	int	first = ptl->ptl_mt_maps[0]; /* read w/o lock */
	int	rc = 0;
	int	i;

	/*
	 * Steal buffer from other CPTs, and delay msg if nothing to
	 * steal.  This function is more expensive than a regular
	 * match, but we don't expect it can happen a lot. The return
	 * code contains one of LNET_MATCHMD_OK, LNET_MATCHMD_DROP, or
	 * LNET_MATCHMD_NONE.
	 */
	LASSERT(lnet_ptl_is_wildcard(ptl));

	for (i = 0; i < LNET_CPT_NUMBER; i++) {
		struct lnet_match_table *mtable;
		int			cpt;

		cpt = (first + i) % LNET_CPT_NUMBER;
		mtable = ptl->ptl_mtables[cpt];
		/* skip disabled tables, but always take the first pass
		 * (to enqueue on the stealing list) and the last (to
		 * reach the terminating branch below) */
		if (i != 0 && i != LNET_CPT_NUMBER - 1 && !mtable->mt_enabled)
			continue;

		lnet_res_lock(cpt);
		lnet_ptl_lock(ptl);

		if (i == 0) {
			/* The first try, add to stealing list. */
			list_add_tail(&msg->msg_list,
				      &ptl->ptl_msg_stealing);
		}

		if (!list_empty(&msg->msg_list)) {
			/* On stealing list. */
			rc = lnet_mt_match_md(mtable, info, msg);

			if ((rc & LNET_MATCHMD_EXHAUSTED) != 0 &&
			    mtable->mt_enabled)
				lnet_ptl_disable_mt(ptl, cpt);

			if ((rc & LNET_MATCHMD_FINISH) != 0) {
				/* Match found, remove from stealing list. */
				list_del_init(&msg->msg_list);
			} else if (i == LNET_CPT_NUMBER - 1 || /* (1) */
				   ptl->ptl_mt_nmaps == 0 ||   /* (2) */
				   (ptl->ptl_mt_nmaps == 1 &&  /* (3) */
				    ptl->ptl_mt_maps[0] == cpt)) {
				/*
				 * No match found, and this is either
				 * (1) the last cpt to check, or
				 * (2) there is no active cpt, or
				 * (3) this is the only active cpt.
				 * There is nothing to steal: delay or
				 * drop the message.
				 */
				list_del_init(&msg->msg_list);

				if (lnet_ptl_is_lazy(ptl)) {
					msg->msg_rx_delayed = 1;
					list_add_tail(&msg->msg_list,
						      &ptl->ptl_msg_delayed);
					rc = LNET_MATCHMD_NONE;
				} else {
					rc = LNET_MATCHMD_DROP;
				}
			} else {
				/* Do another iteration. */
				rc = 0;
			}
		} else {
			/*
			 * No longer on stealing list: another thread
			 * matched the message in lnet_ptl_attach_md().
			 * We are now expected to handle the message.
			 */
			rc = msg->msg_md == NULL ?
				LNET_MATCHMD_DROP : LNET_MATCHMD_OK;
		}

		lnet_ptl_unlock(ptl);
		lnet_res_unlock(cpt);

		/*
		 * Note that test (1) above ensures that we always
		 * exit the loop through this break statement.
		 *
		 * LNET_MATCHMD_NONE means msg was added to the
		 * delayed queue, and we may no longer reference it
		 * after lnet_ptl_unlock() and lnet_res_unlock().
		 */
		if (rc & (LNET_MATCHMD_FINISH | LNET_MATCHMD_NONE))
			break;
	}

	return rc;
}
572
/* Match an incoming PUT/GET described by @info against the MEs of its
 * target portal.
 *
 * Returns LNET_MATCHMD_OK (msg attached to an MD), LNET_MATCHMD_DROP,
 * or LNET_MATCHMD_NONE (msg was queued on a lazy portal's delayed
 * list and must no longer be referenced). The internal EXHAUSTED bit
 * is masked out before returning.
 */
int
lnet_ptl_match_md(struct lnet_match_info *info, struct lnet_msg *msg)
{
	struct lnet_match_table	*mtable;
	struct lnet_portal	*ptl;
	int			rc;

	CDEBUG(D_NET,
	       "Request from %s of length %d into portal %d MB=%#llx\n",
	       libcfs_idstr(&info->mi_id),
	       info->mi_rlength, info->mi_portal, info->mi_mbits);

	if (info->mi_portal >= the_lnet.ln_nportals) {
		CERROR("Invalid portal %d not in [0-%d]\n",
		       info->mi_portal, the_lnet.ln_nportals);
		return LNET_MATCHMD_DROP;
	}

	ptl = the_lnet.ln_portals[info->mi_portal];
	rc = lnet_ptl_match_early(ptl, msg);
	if (rc != 0) /* matched or delayed early message */
		return rc;

	mtable = lnet_mt_of_match(info, msg);
	lnet_res_lock(mtable->mt_cpt);

	/* drop everything once shutdown has begun */
	if (the_lnet.ln_state != LNET_STATE_RUNNING) {
		rc = LNET_MATCHMD_DROP;
		goto out1;
	}

	rc = lnet_mt_match_md(mtable, info, msg);
	if ((rc & LNET_MATCHMD_EXHAUSTED) != 0 && mtable->mt_enabled) {
		/* this table can accept nothing more; deactivate it */
		lnet_ptl_lock(ptl);
		lnet_ptl_disable_mt(ptl, mtable->mt_cpt);
		lnet_ptl_unlock(ptl);
	}

	if ((rc & LNET_MATCHMD_FINISH) != 0)	/* matched or dropping */
		goto out1;

	if (!msg->msg_rx_ready_delay)
		goto out1;

	LASSERT(lnet_ptl_is_lazy(ptl));
	LASSERT(!msg->msg_rx_delayed);

	/* NB: we don't expect "delay" can happen a lot */
	if (lnet_ptl_is_unique(ptl) || LNET_CPT_NUMBER == 1) {
		/* no other table could hold a match: just delay */
		lnet_ptl_lock(ptl);

		msg->msg_rx_delayed = 1;
		list_add_tail(&msg->msg_list, &ptl->ptl_msg_delayed);

		lnet_ptl_unlock(ptl);
		lnet_res_unlock(mtable->mt_cpt);
		rc = LNET_MATCHMD_NONE;
	} else  {
		/* wildcard multi-CPT: try stealing from other tables */
		lnet_res_unlock(mtable->mt_cpt);
		rc = lnet_ptl_match_delay(ptl, info, msg);
	}

	/* LNET_MATCHMD_NONE means msg was added to the delay queue */
	if (rc & LNET_MATCHMD_NONE) {
		CDEBUG(D_NET,
		       "Delaying %s from %s ptl %d MB %#llx off %d len %d\n",
		       info->mi_opc == LNET_MD_OP_PUT ? "PUT" : "GET",
		       libcfs_idstr(&info->mi_id), info->mi_portal,
		       info->mi_mbits, info->mi_roffset, info->mi_rlength);
	}
	goto out0;
 out1:
	lnet_res_unlock(mtable->mt_cpt);
 out0:
	/* EXHAUSTED bit is only meaningful for internal functions */
	return rc & ~LNET_MATCHMD_EXHAUSTED;
}
650
651 void
652 lnet_ptl_detach_md(struct lnet_me *me, struct lnet_libmd *md)
653 {
654         LASSERT(me->me_md == md && md->md_me == me);
655
656         me->me_md = NULL;
657         md->md_me = NULL;
658 }
659
/* called with lnet_res_lock held */
/* Attach a brand new MD to its ME and immediately try to satisfy
 * messages parked on the portal: first those on ptl_msg_stealing
 * (another thread is actively matching them and will see them removed
 * from the list), then the delayed PUTs on ptl_msg_delayed.
 *
 * Matched messages are moved to @matches, unacceptable ones to @drops;
 * the caller completes them after dropping the lock. Also clears the
 * exhaustion bit of the ME's hash bucket and re-enables the match
 * table if the MD still has room.
 */
void
lnet_ptl_attach_md(struct lnet_me *me, struct lnet_libmd *md,
		   struct list_head *matches, struct list_head *drops)
{
	struct lnet_portal *ptl = the_lnet.ln_portals[me->me_portal];
	struct lnet_match_table *mtable;
	struct list_head *head;
	struct lnet_msg *tmp;
	struct lnet_msg *msg;
	int exhausted = 0;
	int cpt;

	LASSERT(md->md_refcount == 0); /* a brand new MD */

	me->me_md = md;
	md->md_me = me;

	cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie);
	mtable = ptl->ptl_mtables[cpt];

	/* fast path: nothing is waiting and the bucket is not marked
	 * exhausted, so there is no bookkeeping to do */
	if (list_empty(&ptl->ptl_msg_stealing) &&
	    list_empty(&ptl->ptl_msg_delayed) &&
	    !lnet_mt_test_exhausted(mtable, me->me_pos))
		return;

	lnet_ptl_lock(ptl);
	head = &ptl->ptl_msg_stealing;
 again:
	list_for_each_entry_safe(msg, tmp, head, msg_list) {
		struct lnet_match_info	info;
		struct lnet_hdr		*hdr;
		int			rc;

		LASSERT(msg->msg_rx_delayed || head == &ptl->ptl_msg_stealing);

		/* rebuild the match info from the message header;
		 * only PUTs can be waiting here */
		hdr   = &msg->msg_hdr;
		/* Multi-Rail: Primary peer NID */
		info.mi_id.nid	= msg->msg_initiator;
		info.mi_id.pid	= hdr->src_pid;
		info.mi_opc	= LNET_MD_OP_PUT;
		info.mi_portal	= hdr->msg.put.ptl_index;
		info.mi_rlength	= hdr->payload_length;
		info.mi_roffset	= hdr->msg.put.offset;
		info.mi_mbits	= hdr->msg.put.match_bits;

		rc = lnet_try_match_md(md, &info, msg);

		exhausted = (rc & LNET_MATCHMD_EXHAUSTED) != 0;
		if ((rc & LNET_MATCHMD_NONE) != 0) {
			if (exhausted)
				break;
			continue;
		}

		/* Hurrah! This _is_ a match */
		LASSERT((rc & LNET_MATCHMD_FINISH) != 0);
		list_del_init(&msg->msg_list);

		if (head == &ptl->ptl_msg_stealing) {
			if (exhausted)
				break;
			/* stealing thread will handle the message */
			continue;
		}

		if ((rc & LNET_MATCHMD_OK) != 0) {
			list_add_tail(&msg->msg_list, matches);

			CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
			       "match %llu offset %d length %d.\n",
			       libcfs_idstr(&info.mi_id),
			       info.mi_portal, info.mi_mbits,
			       info.mi_roffset, info.mi_rlength);
		} else {
			list_add_tail(&msg->msg_list, drops);
		}

		if (exhausted)
			break;
	}

	/* MD still has room: fall through to the delayed-PUT list */
	if (!exhausted && head == &ptl->ptl_msg_stealing) {
		head = &ptl->ptl_msg_delayed;
		goto again;
	}

	if (lnet_ptl_is_wildcard(ptl) && !exhausted) {
		lnet_mt_set_exhausted(mtable, me->me_pos, 0);
		if (!mtable->mt_enabled)
			lnet_ptl_enable_mt(ptl, cpt);
	}

	lnet_ptl_unlock(ptl);
}
755
/* Tear down all match tables of @ptl and free their hash arrays.
 * Any ME still attached at this point is a shutdown leak: it is
 * logged and reclaimed here. Safe to call on an uninitialized or
 * partially-initialized portal (used by lnet_ptl_setup()'s failure
 * path).
 */
static void
lnet_ptl_cleanup(struct lnet_portal *ptl)
{
	struct lnet_match_table	*mtable;
	int			i;

	if (ptl->ptl_mtables == NULL) /* uninitialized portal */
		return;

	/* no message may still be parked on the portal */
	LASSERT(list_empty(&ptl->ptl_msg_delayed));
	LASSERT(list_empty(&ptl->ptl_msg_stealing));
	cfs_percpt_for_each(mtable, i, ptl->ptl_mtables) {
		struct list_head *mhash;
		struct lnet_me	 *me;
		int		  j;

		if (mtable->mt_mhash == NULL) /* uninitialized match-table */
			continue;

		mhash = mtable->mt_mhash;
		/* cleanup ME */
		for (j = 0; j < LNET_MT_HASH_SIZE + 1; j++) {
			while ((me = list_first_entry_or_null(&mhash[j],
							      struct lnet_me,
							      me_list)) != NULL) {
				CERROR("Active ME %p on exit\n", me);
				list_del(&me->me_list);
				LIBCFS_FREE_PRE(me, sizeof(*me), "slab-freed");
				kmem_cache_free(lnet_mes_cachep, me);
			}
		}
		/* the extra entry is for MEs with ignore bits */
		CFS_FREE_PTR_ARRAY(mhash, LNET_MT_HASH_SIZE + 1);
	}

	cfs_percpt_free(ptl->ptl_mtables);
	ptl->ptl_mtables = NULL;
}
794
/* Allocate and initialize the per-CPT match tables of portal @index.
 *
 * Every table gets a hash of LNET_MT_HASH_SIZE + 1 list heads (the
 * extra bucket holds MEs with ignore bits) and an all-ones exhaustion
 * bitmap. Returns 0 on success or -ENOMEM; a partially-built portal
 * is undone via lnet_ptl_cleanup().
 */
static int
lnet_ptl_setup(struct lnet_portal *ptl, int index)
{
	struct lnet_match_table	*mtable;
	struct list_head	*mhash;
	int			i;
	int			j;

	ptl->ptl_mtables = cfs_percpt_alloc(lnet_cpt_table(),
					    sizeof(struct lnet_match_table));
	if (ptl->ptl_mtables == NULL) {
		CERROR("Failed to create match table for portal %d\n", index);
		return -ENOMEM;
	}

	ptl->ptl_index = index;
	INIT_LIST_HEAD(&ptl->ptl_msg_delayed);
	INIT_LIST_HEAD(&ptl->ptl_msg_stealing);
	spin_lock_init(&ptl->ptl_lock);
	cfs_percpt_for_each(mtable, i, ptl->ptl_mtables) {
		/* the extra entry is for MEs with ignore bits */
		LIBCFS_CPT_ALLOC(mhash, lnet_cpt_table(), i,
				 sizeof(*mhash) * (LNET_MT_HASH_SIZE + 1));
		if (mhash == NULL) {
			CERROR("Failed to create match hash for portal %d\n",
			       index);
			goto failed;
		}

		/* start fully "exhausted" until MDs are attached */
		memset(&mtable->mt_exhausted[0], -1,
		       sizeof(mtable->mt_exhausted[0]) *
		       LNET_MT_EXHAUSTED_BMAP);
		mtable->mt_mhash = mhash;
		for (j = 0; j < LNET_MT_HASH_SIZE + 1; j++)
			INIT_LIST_HEAD(&mhash[j]);

		mtable->mt_portal = index;
		mtable->mt_cpt = i;
	}

	return 0;
 failed:
	lnet_ptl_cleanup(ptl);
	return -ENOMEM;
}
840
841 #define PORTAL_SIZE (offsetof(struct lnet_portal, ptl_mt_maps[LNET_CPT_NUMBER]))
842 void
843 lnet_portals_destroy(void)
844 {
845         int     i;
846
847         if (the_lnet.ln_portals == NULL)
848                 return;
849
850         for (i = 0; i < the_lnet.ln_nportals; i++)
851                 if (the_lnet.ln_portals[i]) {
852                         lnet_ptl_cleanup(the_lnet.ln_portals[i]);
853                         LIBCFS_FREE(the_lnet.ln_portals[i], PORTAL_SIZE);
854                 }
855
856         CFS_FREE_PTR_ARRAY(the_lnet.ln_portals, the_lnet.ln_nportals);
857         the_lnet.ln_portals = NULL;
858 }
859
860 int
861 lnet_portals_create(void)
862 {
863         int     i;
864
865         the_lnet.ln_nportals = MAX_PORTALS;
866         CFS_ALLOC_PTR_ARRAY(the_lnet.ln_portals, the_lnet.ln_nportals);
867         if (the_lnet.ln_portals == NULL) {
868                 CERROR("Failed to allocate portals table\n");
869                 return -ENOMEM;
870         }
871
872         for (i = 0; i < the_lnet.ln_nportals; i++) {
873                 LIBCFS_ALLOC(the_lnet.ln_portals[i], PORTAL_SIZE);
874                 if (!the_lnet.ln_portals[i] ||
875                     lnet_ptl_setup(the_lnet.ln_portals[i], i)) {
876                         lnet_portals_destroy();
877                         return -ENOMEM;
878                 }
879         }
880
881         return 0;
882 }
883
884 /**
885  * Turn on the lazy portal attribute. Use with caution!
886  *
887  * This portal attribute only affects incoming PUT requests to the portal,
888  * and is off by default. By default, if there's no matching MD for an
889  * incoming PUT request, it is simply dropped. With the lazy attribute on,
890  * such requests are queued indefinitely until either a matching MD is
891  * posted to the portal or the lazy attribute is turned off.
892  *
893  * It would prevent dropped requests, however it should be regarded as the
894  * last line of defense - i.e. users must keep a close watch on active
895  * buffers on a lazy portal and once it becomes too low post more buffers as
896  * soon as possible. This is because delayed requests usually have detrimental
897  * effects on underlying network connections. A few delayed requests often
898  * suffice to bring an underlying connection to a complete halt, due to flow
899  * control mechanisms.
900  *
901  * There's also a DOS attack risk. If users don't post match-all MDs on a
902  * lazy portal, a malicious peer can easily stop a service by sending some
903  * PUT requests with match bits that won't match any MD. A routed server is
904  * especially vulnerable since the connections to its neighbor routers are
905  * shared among all clients.
906  *
907  * \param portal Index of the portal to enable the lazy attribute on.
908  *
909  * \retval 0       On success.
910  * \retval -EINVAL If \a portal is not a valid index.
911  */
912 int
913 LNetSetLazyPortal(int portal)
914 {
915         struct lnet_portal *ptl;
916
917         if (portal < 0 || portal >= the_lnet.ln_nportals)
918                 return -EINVAL;
919
920         CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
921         ptl = the_lnet.ln_portals[portal];
922
923         lnet_res_lock(LNET_LOCK_EX);
924         lnet_ptl_lock(ptl);
925
926         lnet_ptl_setopt(ptl, LNET_PTL_LAZY);
927
928         lnet_ptl_unlock(ptl);
929         lnet_res_unlock(LNET_LOCK_EX);
930
931         return 0;
932 }
933 EXPORT_SYMBOL(LNetSetLazyPortal);
934
/*
 * Drop delayed messages on a lazy portal, and optionally clear its lazy
 * attribute.
 *
 * If @ni is NULL: log, atomically move ALL delayed messages off the
 * portal, and clear LNET_PTL_LAZY.  If @ni is non-NULL: only messages
 * whose tx or rx interface is @ni are collected and the lazy flag is
 * left set (used when a single NI is being removed).
 *
 * @reason is the human-readable string passed on with each dropped
 * message.
 *
 * Returns 0 on success (including when the portal was not lazy), or
 * -EINVAL for an out-of-range portal index.
 */
int
lnet_clear_lazy_portal(struct lnet_ni *ni, int portal, char *reason)
{
	struct lnet_portal	*ptl;
	LIST_HEAD(zombies);

	if (portal < 0 || portal >= the_lnet.ln_nportals)
		return -EINVAL;

	ptl = the_lnet.ln_portals[portal];

	/* lock order: resource lock first, then the portal lock */
	lnet_res_lock(LNET_LOCK_EX);
	lnet_ptl_lock(ptl);

	if (!lnet_ptl_is_lazy(ptl)) {
		/* nothing queued and nothing to clear */
		lnet_ptl_unlock(ptl);
		lnet_res_unlock(LNET_LOCK_EX);
		return 0;
	}

	if (ni != NULL) {
		struct lnet_msg *msg, *tmp;

		/* grab all messages which are on the NI passed in */
		list_for_each_entry_safe(msg, tmp, &ptl->ptl_msg_delayed,
					 msg_list) {
			if (msg->msg_txni == ni || msg->msg_rxni == ni)
				list_move(&msg->msg_list, &zombies);
		}
	} else {
		if (the_lnet.ln_state != LNET_STATE_RUNNING)
			CWARN("Active lazy portal %d on exit\n", portal);
		else
			CDEBUG(D_NET, "clearing portal %d lazy\n", portal);

		/* grab all the blocked messages atomically */
		list_splice_init(&ptl->ptl_msg_delayed, &zombies);

		lnet_ptl_unsetopt(ptl, LNET_PTL_LAZY);
	}

	lnet_ptl_unlock(ptl);
	lnet_res_unlock(LNET_LOCK_EX);

	/* drop the collected messages outside both locks */
	lnet_drop_delayed_msg_list(&zombies, reason);

	return 0;
}
983
984 /**
985  * Turn off the lazy portal attribute. Delayed requests on the portal,
986  * if any, will be all dropped when this function returns.
987  *
988  * \param portal Index of the portal to disable the lazy attribute on.
989  *
990  * \retval 0       On success.
991  * \retval -EINVAL If \a portal is not a valid index.
992  */
993 int
994 LNetClearLazyPortal(int portal)
995 {
996         return lnet_clear_lazy_portal(NULL, portal,
997                                       "Clearing lazy portal attr");
998 }
999 EXPORT_SYMBOL(LNetClearLazyPortal);