/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * lnet/selftest/rpc.c
 *
 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
 *   Author: Isaac Huang <isaac@clusterfs.com>
 *
 */

#define DEBUG_SUBSYSTEM S_LNET

#include "selftest.h"


#define SRPC_PEER_HASH_SIZE       101  /* # peer lists */

typedef enum {
        SRPC_STATE_NONE,
        SRPC_STATE_NI_INIT,
        SRPC_STATE_EQ_INIT,
        SRPC_STATE_WI_INIT,
        SRPC_STATE_RUNNING,
        SRPC_STATE_STOPPING,
} srpc_state_t;

struct smoketest_rpc {
        spinlock_t        rpc_glock;     /* global lock */
        srpc_service_t   *rpc_services[SRPC_SERVICE_MAX_ID + 1];
        struct list_head *rpc_peers;     /* hash table of known peers */
        lnet_handle_eq_t  rpc_lnet_eq;   /* _the_ LNet event queue */
        srpc_state_t      rpc_state;
        srpc_counters_t   rpc_counters;
        __u64             rpc_matchbits; /* matchbits counter */
} srpc_data;

static int srpc_peer_credits = 16;
CFS_MODULE_PARM(srpc_peer_credits, "i", int, 0444,
                "# in-flight RPCs per peer (16 by default)");

/* forward declaration */
int srpc_handle_rpc (swi_workitem_t *wi);

void srpc_get_counters (srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        *cnt = srpc_data.rpc_counters;
        spin_unlock(&srpc_data.rpc_glock);
}

void srpc_set_counters (const srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters = *cnt;
        spin_unlock(&srpc_data.rpc_glock);
}
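
/* A minimal usage sketch (hypothetical helper, not part of the original
 * module): both accessors copy the whole counter block under rpc_glock,
 * so a caller can reset or snapshot the statistics atomically. */
#if 0
static void
srpc_reset_counters_example (void)
{
        srpc_counters_t cnt;

        memset(&cnt, 0, sizeof(cnt));
        srpc_set_counters(&cnt);   /* zero every counter in one shot */
        srpc_get_counters(&cnt);   /* read back a consistent snapshot */
}
#endif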

void
srpc_add_bulk_page (srpc_bulk_t *bk, cfs_page_t *pg, int i)
{
        LASSERT (i >= 0 && i < bk->bk_niov);

#ifdef __KERNEL__
        bk->bk_iovs[i].kiov_offset = 0;
        bk->bk_iovs[i].kiov_page   = pg;
        bk->bk_iovs[i].kiov_len    = CFS_PAGE_SIZE;
#else
        LASSERT (bk->bk_pages != NULL);

        bk->bk_pages[i] = pg;
        bk->bk_iovs[i].iov_len  = CFS_PAGE_SIZE;
        bk->bk_iovs[i].iov_base = cfs_page_address(pg);
#endif
        return;
}

void
srpc_free_bulk (srpc_bulk_t *bk)
{
        int         i;
        cfs_page_t *pg;

        LASSERT (bk != NULL);
#ifndef __KERNEL__
        LASSERT (bk->bk_pages != NULL);
#endif

        for (i = 0; i < bk->bk_niov; i++) {
#ifdef __KERNEL__
                pg = bk->bk_iovs[i].kiov_page;
#else
                pg = bk->bk_pages[i];
#endif
                if (pg == NULL) break;

                cfs_free_page(pg);
        }

#ifndef __KERNEL__
        LIBCFS_FREE(bk->bk_pages, sizeof(cfs_page_t *) * bk->bk_niov);
#endif
        LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[bk->bk_niov]));
        return;
}

srpc_bulk_t *
srpc_alloc_bulk (int npages, int sink)
{
        srpc_bulk_t  *bk;
        cfs_page_t  **pages;
        int           i;

        LASSERT (npages > 0 && npages <= LNET_MAX_IOV);

        LIBCFS_ALLOC(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
        if (bk == NULL) {
                CERROR ("Can't allocate descriptor for %d pages\n", npages);
                return NULL;
        }

        memset(bk, 0, offsetof(srpc_bulk_t, bk_iovs[npages]));
        bk->bk_sink = sink;
        bk->bk_niov = npages;
        bk->bk_len  = npages * CFS_PAGE_SIZE;
#ifndef __KERNEL__
        LIBCFS_ALLOC(pages, sizeof(cfs_page_t *) * npages);
        if (pages == NULL) {
                LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[npages]));
                CERROR ("Can't allocate page array for %d pages\n", npages);
                return NULL;
        }

        memset(pages, 0, sizeof(cfs_page_t *) * npages);
        bk->bk_pages = pages;
#else
        UNUSED (pages);
#endif

        for (i = 0; i < npages; i++) {
                cfs_page_t *pg = cfs_alloc_page(CFS_ALLOC_STD);

                if (pg == NULL) {
                        CERROR ("Can't allocate page %d of %d\n", i, npages);
                        srpc_free_bulk(bk);
                        return NULL;
                }

                srpc_add_bulk_page(bk, pg, i);
        }

        return bk;
}
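
/* Usage sketch (hypothetical caller, sizes illustrative): every
 * successful srpc_alloc_bulk() is paired with srpc_free_bulk(); on a
 * partial page-allocation failure the allocator already frees the
 * descriptor and any pages obtained so far by itself. */
#if 0
static srpc_bulk_t *
srpc_bulk_example (void)
{
        srpc_bulk_t *bk = srpc_alloc_bulk(2, 1); /* 2 pages, data sink */

        if (bk == NULL)
                return NULL;  /* nothing to clean up here */

        /* ... hand bk to srpc_prepare_bulk() or srpc_do_bulk() ... */
        return bk;            /* owner eventually calls srpc_free_bulk(bk) */
}
#endif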


static inline struct list_head *
srpc_nid2peerlist (lnet_nid_t nid)
{
        unsigned int hash = ((unsigned int)nid) % SRPC_PEER_HASH_SIZE;

        return &srpc_data.rpc_peers[hash];
}

static inline srpc_peer_t *
srpc_create_peer (lnet_nid_t nid)
{
        srpc_peer_t *peer;

        LASSERT (nid != LNET_NID_ANY);

        LIBCFS_ALLOC(peer, sizeof(srpc_peer_t));
        if (peer == NULL) {
                CERROR ("Failed to allocate peer structure for %s\n",
                        libcfs_nid2str(nid));
                return NULL;
        }

        memset(peer, 0, sizeof(srpc_peer_t));
        peer->stp_nid     = nid;
        peer->stp_credits = srpc_peer_credits;

        spin_lock_init(&peer->stp_lock);
        CFS_INIT_LIST_HEAD(&peer->stp_rpcq);
        CFS_INIT_LIST_HEAD(&peer->stp_ctl_rpcq);
        return peer;
}

srpc_peer_t *
srpc_find_peer_locked (lnet_nid_t nid)
{
        struct list_head *peer_list = srpc_nid2peerlist(nid);
        srpc_peer_t      *peer;

        LASSERT (nid != LNET_NID_ANY);

        list_for_each_entry (peer, peer_list, stp_list) {
                if (peer->stp_nid == nid)
                        return peer;
        }

        return NULL;
}

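/* NB the new peer is allocated with rpc_glock dropped, then the hash is
 * re-checked with the lock re-taken: if another thread inserted the same
 * nid in the meantime, the spare allocation is freed and the existing
 * peer is returned. */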
static srpc_peer_t *
srpc_nid2peer (lnet_nid_t nid)
{
        srpc_peer_t *peer;
        srpc_peer_t *new_peer;

        spin_lock(&srpc_data.rpc_glock);
        peer = srpc_find_peer_locked(nid);
        spin_unlock(&srpc_data.rpc_glock);

        if (peer != NULL)
                return peer;

        new_peer = srpc_create_peer(nid);

        spin_lock(&srpc_data.rpc_glock);

        peer = srpc_find_peer_locked(nid);
        if (peer != NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                if (new_peer != NULL)
                        LIBCFS_FREE(new_peer, sizeof(srpc_peer_t));

                return peer;
        }

        if (new_peer == NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                return NULL;
        }

        list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid));
        spin_unlock(&srpc_data.rpc_glock);
        return new_peer;
}

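/* NB rpc_matchbits is seeded in srpc_startup() with the current time
 * shifted into the top bits, so the IDs handed out by srpc_next_id()
 * below stay unique across a restart of the selftest module. */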
static inline __u64
srpc_next_id (void)
{
        __u64 id;

        spin_lock(&srpc_data.rpc_glock);
        id = srpc_data.rpc_matchbits++;
        spin_unlock(&srpc_data.rpc_glock);
        return id;
}

void
srpc_init_server_rpc (srpc_server_rpc_t *rpc,
                      srpc_service_t *sv, srpc_buffer_t *buffer)
{
        memset(rpc, 0, sizeof(*rpc));
        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc);

        rpc->srpc_ev.ev_fired = 1; /* no event expected now */

        rpc->srpc_service  = sv;
        rpc->srpc_reqstbuf = buffer;
        rpc->srpc_peer     = buffer->buf_peer;
        rpc->srpc_self     = buffer->buf_self;
        rpc->srpc_replymdh = LNET_INVALID_HANDLE;
}

int
srpc_add_service (srpc_service_t *sv)
{
        int                id = sv->sv_id;
        int                i;
        srpc_server_rpc_t *rpc;

        LASSERT (sv->sv_concur > 0);
        LASSERT (0 <= id && id <= SRPC_SERVICE_MAX_ID);

        spin_lock(&srpc_data.rpc_glock);

        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        if (srpc_data.rpc_services[id] != NULL) {
                spin_unlock(&srpc_data.rpc_glock);
                return -EBUSY;
        }

        srpc_data.rpc_services[id] = sv;
        spin_unlock(&srpc_data.rpc_glock);

        sv->sv_nprune       = 0;
        sv->sv_nposted_msg  = 0;
        sv->sv_shuttingdown = 0;
        spin_lock_init(&sv->sv_lock);
        CFS_INIT_LIST_HEAD(&sv->sv_free_rpcq);
        CFS_INIT_LIST_HEAD(&sv->sv_active_rpcq);
        CFS_INIT_LIST_HEAD(&sv->sv_posted_msgq);
        CFS_INIT_LIST_HEAD(&sv->sv_blocked_msgq);

        sv->sv_ev.ev_data = sv;
        sv->sv_ev.ev_type = SRPC_REQUEST_RCVD;

        for (i = 0; i < sv->sv_concur; i++) {
                LIBCFS_ALLOC(rpc, sizeof(*rpc));
                if (rpc == NULL) goto enomem;

                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
        }

        CDEBUG (D_NET, "Adding service: id %d, name %s, concurrency %d\n",
                id, sv->sv_name, sv->sv_concur);
        return 0;

enomem:
        while (!list_empty(&sv->sv_free_rpcq)) {
                rpc = list_entry(sv->sv_free_rpcq.next,
                                 srpc_server_rpc_t, srpc_list);
                list_del(&rpc->srpc_list);
                LIBCFS_FREE(rpc, sizeof(*rpc));
        }

        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_services[id] = NULL;
        spin_unlock(&srpc_data.rpc_glock);
        return -ENOMEM;
}

int
srpc_remove_service (srpc_service_t *sv)
{
        int id = sv->sv_id;

        spin_lock(&srpc_data.rpc_glock);

        if (srpc_data.rpc_services[id] != sv) {
                spin_unlock(&srpc_data.rpc_glock);
                return -ENOENT;
        }

        srpc_data.rpc_services[id] = NULL;
        spin_unlock(&srpc_data.rpc_glock);
        return 0;
}
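
/* Registration sketch (hypothetical service; the id, name, and handler
 * are illustrative, not defined in this file): a user fills in a
 * srpc_service_t and registers it, and srpc_add_service() preallocates
 * sv_concur server-side RPC descriptors for it. */
#if 0
static int
srpc_register_example_service (srpc_service_t *sv,
                               int (*handler)(srpc_server_rpc_t *))
{
        sv->sv_id      = 11;          /* any unused id <= SRPC_SERVICE_MAX_ID */
        sv->sv_name    = "example";
        sv->sv_concur  = 8;           /* up to 8 RPCs served concurrently */
        sv->sv_handler = handler;

        return srpc_add_service(sv);  /* -EBUSY if the id is already taken */
}
#endif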

int
srpc_post_passive_rdma(int portal, __u64 matchbits, void *buf,
                       int len, int options, lnet_process_id_t peer,
                       lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int              rc;
        lnet_md_t        md;
        lnet_handle_me_t meh;

        rc = LNetMEAttach(portal, peer, matchbits, 0,
                          LNET_UNLINK, LNET_INS_AFTER, &meh);
        if (rc != 0) {
                CERROR ("LNetMEAttach failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);
                return -ENOMEM;
        }

        md.threshold = 1;
        md.user_ptr  = ev;
        md.start     = buf;
        md.length    = len;
        md.options   = options;
        md.eq_handle = srpc_data.rpc_lnet_eq;

        rc = LNetMDAttach(meh, md, LNET_UNLINK, mdh);
        if (rc != 0) {
                CERROR ("LNetMDAttach failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);

                rc = LNetMEUnlink(meh);
                LASSERT (rc == 0);
                return -ENOMEM;
        }

        CDEBUG (D_NET,
                "Posted passive RDMA: peer %s, portal %d, matchbits "LPX64"\n",
                libcfs_id2str(peer), portal, matchbits);
        return 0;
}

int
srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
                      int options, lnet_process_id_t peer, lnet_nid_t self,
                      lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int       rc;
        lnet_md_t md;

        md.user_ptr  = ev;
        md.start     = buf;
        md.length    = len;
        md.eq_handle = srpc_data.rpc_lnet_eq;
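        /* NB a GET MD must survive two events (the SEND plus the incoming
         * REPLY), hence threshold 2; a PUT posted with LNET_NOACK_REQ
         * below generates only a single SEND event. */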
        md.threshold = ((options & LNET_MD_OP_GET) != 0) ? 2 : 1;
        md.options   = options & ~(LNET_MD_OP_PUT | LNET_MD_OP_GET);

        rc = LNetMDBind(md, LNET_UNLINK, mdh);
        if (rc != 0) {
                CERROR ("LNetMDBind failed: %d\n", rc);
                LASSERT (rc == -ENOMEM);
                return -ENOMEM;
        }

        /* This is kind of an abuse of the LNET_MD_OP_{PUT,GET} options:
         * they're only meaningful for MDs attached to an ME (i.e. passive
         * buffers), so here they just select which operation to issue. */
        if ((options & LNET_MD_OP_PUT) != 0) {
                rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer,
                             portal, matchbits, 0, 0);
        } else {
                LASSERT ((options & LNET_MD_OP_GET) != 0);

                rc = LNetGet(self, *mdh, peer, portal, matchbits, 0);
        }

        if (rc != 0) {
                CERROR ("LNet%s(%s, %d, "LPD64") failed: %d\n",
                        ((options & LNET_MD_OP_PUT) != 0) ? "Put" : "Get",
                        libcfs_id2str(peer), portal, matchbits, rc);

                /* The forthcoming unlink event will complete this operation
                 * with failure, so fall through and return success here.
                 */
                rc = LNetMDUnlink(*mdh);
                LASSERT (rc == 0);
        } else {
                CDEBUG (D_NET,
                        "Posted active RDMA: peer %s, portal %u, matchbits "LPX64"\n",
                        libcfs_id2str(peer), portal, matchbits);
        }
        return 0;
}

int
srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,
                        int len, lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int rc;
        int portal;

        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                portal = SRPC_REQUEST_PORTAL;
        else
                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;

        rc = srpc_post_active_rdma(portal, service, buf, len,
                                   LNET_MD_OP_PUT, peer,
                                   LNET_NID_ANY, mdh, ev);
        return rc;
}
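
/* NB like srpc_post_active_rqtbuf() above, the passive variant below
 * picks the portal by service id: framework services
 * (id <= SRPC_FRAMEWORK_SERVICE_MAX_ID) use the lazy
 * SRPC_FRAMEWORK_REQUEST_PORTAL set up in srpc_startup(), while all
 * test services share SRPC_REQUEST_PORTAL. */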

int
srpc_post_passive_rqtbuf(int service, void *buf, int len,
                         lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int               rc;
        int               portal;
        lnet_process_id_t any = {.nid = LNET_NID_ANY,
                                 .pid = LNET_PID_ANY};

        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                portal = SRPC_REQUEST_PORTAL;
        else
                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;

        rc = srpc_post_passive_rdma(portal, service, buf, len,
                                    LNET_MD_OP_PUT, any, mdh, ev);
        return rc;
}

int
srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
{
        srpc_msg_t *msg = &buf->buf_msg;
        int         rc;

        LASSERT (!sv->sv_shuttingdown);

        buf->buf_mdh = LNET_INVALID_HANDLE;
        list_add(&buf->buf_list, &sv->sv_posted_msgq);
        sv->sv_nposted_msg++;
        spin_unlock(&sv->sv_lock);

        rc = srpc_post_passive_rqtbuf(sv->sv_id, msg, sizeof(*msg),
                                      &buf->buf_mdh, &sv->sv_ev);

        /* At this point, an RPC (new or delayed) may have arrived in
         * msg and its event handler has been called. So we must add
         * buf to sv_posted_msgq _before_ dropping sv_lock */

        spin_lock(&sv->sv_lock);

        if (rc == 0) {
                if (sv->sv_shuttingdown) {
                        spin_unlock(&sv->sv_lock);

                        /* srpc_shutdown_service might have tried to unlink me
                         * when my buf_mdh was still invalid */
                        LNetMDUnlink(buf->buf_mdh);

                        spin_lock(&sv->sv_lock);
                }
                return 0;
        }

        sv->sv_nposted_msg--;
        if (sv->sv_shuttingdown) return rc;

        list_del(&buf->buf_list);

        spin_unlock(&sv->sv_lock);
        LIBCFS_FREE(buf, sizeof(*buf));
        spin_lock(&sv->sv_lock);
        return rc;
}

int
srpc_service_add_buffers (srpc_service_t *sv, int nbuffer)
{
        int                rc;
        int                posted;
        srpc_buffer_t     *buf;

        LASSERTF (nbuffer > 0,
                  "nbuffer must be positive: %d\n", nbuffer);

        for (posted = 0; posted < nbuffer; posted++) {
                LIBCFS_ALLOC(buf, sizeof(*buf));
                if (buf == NULL) break;

                spin_lock(&sv->sv_lock);
                rc = srpc_service_post_buffer(sv, buf);
                spin_unlock(&sv->sv_lock);

                if (rc != 0) break;
        }

        return posted;
}

void
srpc_service_remove_buffers (srpc_service_t *sv, int nbuffer)
{
        LASSERTF (nbuffer > 0,
                  "nbuffer must be positive: %d\n", nbuffer);

        spin_lock(&sv->sv_lock);

        LASSERT (sv->sv_nprune >= 0);
        LASSERT (!sv->sv_shuttingdown);

        sv->sv_nprune += nbuffer;

        spin_unlock(&sv->sv_lock);
        return;
}

/* returns 1 if sv has finished, otherwise 0 */
int
srpc_finish_service (srpc_service_t *sv)
{
        srpc_server_rpc_t *rpc;
        srpc_buffer_t     *buf;

        spin_lock(&sv->sv_lock);

        LASSERT (sv->sv_shuttingdown); /* srpc_shutdown_service called */

        if (sv->sv_nposted_msg != 0 || !list_empty(&sv->sv_active_rpcq)) {
                CDEBUG (D_NET,
                        "waiting for %d posted buffers to unlink and "
                        "in-flight RPCs to die.\n",
                        sv->sv_nposted_msg);

                if (!list_empty(&sv->sv_active_rpcq)) {
                        rpc = list_entry(sv->sv_active_rpcq.next,
                                         srpc_server_rpc_t, srpc_list);
                        CDEBUG (D_NETERROR,
                                "Active RPC on shutdown: sv %s, peer %s, "
                                "wi %s scheduled %d running %d, "
                                "ev fired %d type %d status %d lnet %d\n",
                                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                                swi_state2str(rpc->srpc_wi.wi_state),
                                rpc->srpc_wi.wi_scheduled,
                                rpc->srpc_wi.wi_running,
                                rpc->srpc_ev.ev_fired,
                                rpc->srpc_ev.ev_type,
                                rpc->srpc_ev.ev_status,
                                rpc->srpc_ev.ev_lnet);
                }

                spin_unlock(&sv->sv_lock);
                return 0;
        }

        spin_unlock(&sv->sv_lock); /* no lock needed from now on */

        for (;;) {
                struct list_head *q;

                if (!list_empty(&sv->sv_posted_msgq))
                        q = &sv->sv_posted_msgq;
                else if (!list_empty(&sv->sv_blocked_msgq))
                        q = &sv->sv_blocked_msgq;
                else
                        break;

                buf = list_entry(q->next, srpc_buffer_t, buf_list);
                list_del(&buf->buf_list);

                LIBCFS_FREE(buf, sizeof(*buf));
        }

        while (!list_empty(&sv->sv_free_rpcq)) {
                rpc = list_entry(sv->sv_free_rpcq.next,
                                 srpc_server_rpc_t, srpc_list);
                list_del(&rpc->srpc_list);
                LIBCFS_FREE(rpc, sizeof(*rpc));
        }

        return 1;
}

/* called with sv->sv_lock held */
void
srpc_service_recycle_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
{
        if (sv->sv_shuttingdown) goto free;

        if (sv->sv_nprune == 0) {
                if (srpc_service_post_buffer(sv, buf) != 0)
                        CWARN ("Failed to post %s buffer\n", sv->sv_name);
                return;
        }

        sv->sv_nprune--;
free:
        spin_unlock(&sv->sv_lock);
        LIBCFS_FREE(buf, sizeof(*buf));
        spin_lock(&sv->sv_lock);
}

void
srpc_shutdown_service (srpc_service_t *sv)
{
        srpc_server_rpc_t *rpc;
        srpc_buffer_t     *buf;

        spin_lock(&sv->sv_lock);

        CDEBUG (D_NET, "Shutting down service: id %d, name %s\n",
                sv->sv_id, sv->sv_name);

        sv->sv_shuttingdown = 1; /* i.e. no new active RPC */

        /* schedule in-flight RPCs to notice the shutdown */
        list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {
                swi_schedule_workitem(&rpc->srpc_wi);
        }

        spin_unlock(&sv->sv_lock);

        /* OK to traverse sv_posted_msgq without lock, since no one
         * touches sv_posted_msgq now */
        list_for_each_entry (buf, &sv->sv_posted_msgq, buf_list)
                LNetMDUnlink(buf->buf_mdh);

        return;
}

int
srpc_send_request (srpc_client_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->crpc_reqstev;
        int           rc;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REQUEST_SENT;

        rc = srpc_post_active_rqtbuf(rpc->crpc_dest, rpc->crpc_service,
                                     &rpc->crpc_reqstmsg, sizeof(srpc_msg_t),
                                     &rpc->crpc_reqstmdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

int
srpc_prepare_reply (srpc_client_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->crpc_replyev;
        __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.rpyid;
        int           rc;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REPLY_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
                                    &rpc->crpc_replymsg, sizeof(srpc_msg_t),
                                    LNET_MD_OP_PUT, rpc->crpc_dest,
                                    &rpc->crpc_replymdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

int
srpc_prepare_bulk (srpc_client_rpc_t *rpc)
{
        srpc_bulk_t  *bk = &rpc->crpc_bulk;
        srpc_event_t *ev = &rpc->crpc_bulkev;
        __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.bulkid;
        int           rc;
        int           opt;

        LASSERT (bk->bk_niov <= LNET_MAX_IOV);

        if (bk->bk_niov == 0) return 0; /* nothing to do */

        opt = bk->bk_sink ? LNET_MD_OP_PUT : LNET_MD_OP_GET;
#ifdef __KERNEL__
        opt |= LNET_MD_KIOV;
#else
        opt |= LNET_MD_IOVEC;
#endif

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_BULK_REQ_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
                                    &bk->bk_iovs[0], bk->bk_niov, opt,
                                    rpc->crpc_dest, &bk->bk_mdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

int
srpc_do_bulk (srpc_server_rpc_t *rpc)
{
        srpc_event_t  *ev = &rpc->srpc_ev;
        srpc_bulk_t   *bk = rpc->srpc_bulk;
        __u64          id = rpc->srpc_reqstbuf->buf_msg.msg_body.reqst.bulkid;
        int            rc;
        int            opt;

        LASSERT (bk != NULL);

        opt = bk->bk_sink ? LNET_MD_OP_GET : LNET_MD_OP_PUT;
#ifdef __KERNEL__
        opt |= LNET_MD_KIOV;
#else
        opt |= LNET_MD_IOVEC;
#endif

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = bk->bk_sink ? SRPC_BULK_GET_RPLD : SRPC_BULK_PUT_SENT;

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, id,
                                   &bk->bk_iovs[0], bk->bk_niov, opt,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &bk->bk_mdh, ev);
        if (rc != 0)
                ev->ev_fired = 1;  /* no more event expected */
        return rc;
}

/* called with srpc_service_t::sv_lock held */
inline void
srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
{
        srpc_service_t *sv = rpc->srpc_service;

        if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                swi_schedule_workitem(&rpc->srpc_wi);
        else    /* framework RPCs are handled one by one */
                swi_schedule_serial_workitem(&rpc->srpc_wi);

        return;
}

/* only called from srpc_handle_rpc */
void
srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
{
        srpc_service_t *sv = rpc->srpc_service;
        srpc_buffer_t  *buffer;

        LASSERT (status != 0 || rpc->srpc_wi.wi_state == SWI_STATE_DONE);

        rpc->srpc_status = status;

        CDEBUG (status == 0 ? D_NET : D_NETERROR,
                "Server RPC done: service %s, peer %s, status %s:%d\n",
                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                swi_state2str(rpc->srpc_wi.wi_state), status);

        if (status != 0) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_dropped++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        if (rpc->srpc_done != NULL)
                (*rpc->srpc_done) (rpc);
        LASSERT (rpc->srpc_bulk == NULL);

        spin_lock(&sv->sv_lock);

        if (rpc->srpc_reqstbuf != NULL) {
                /* NB srpc_service_recycle_buffer might drop sv_lock, but
                 * sv can't go away because sv_active_rpcq is not empty */
                srpc_service_recycle_buffer(sv, rpc->srpc_reqstbuf);
                rpc->srpc_reqstbuf = NULL;
        }

        list_del(&rpc->srpc_list); /* from sv->sv_active_rpcq */

        /*
         * No one can schedule me now since:
         * - I'm not on sv_active_rpcq.
         * - all LNet events have been fired.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT (rpc->srpc_ev.ev_fired);
        swi_kill_workitem(&rpc->srpc_wi);

        if (!sv->sv_shuttingdown && !list_empty(&sv->sv_blocked_msgq)) {
                buffer = list_entry(sv->sv_blocked_msgq.next,
                                    srpc_buffer_t, buf_list);
                list_del(&buffer->buf_list);

                srpc_init_server_rpc(rpc, sv, buffer);
                list_add_tail(&rpc->srpc_list, &sv->sv_active_rpcq);
                srpc_schedule_server_rpc(rpc);
        } else {
                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
        }

        spin_unlock(&sv->sv_lock);
        return;
}

/* handles an incoming RPC */
int
srpc_handle_rpc (swi_workitem_t *wi)
{
        srpc_server_rpc_t *rpc = wi->wi_data;
        srpc_service_t    *sv = rpc->srpc_service;
        srpc_event_t      *ev = &rpc->srpc_ev;
        int                rc = 0;

        LASSERT (wi == &rpc->srpc_wi);

        spin_lock(&sv->sv_lock);

        if (sv->sv_shuttingdown) {
                spin_unlock(&sv->sv_lock);

                if (rpc->srpc_bulk != NULL)
                        LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
                LNetMDUnlink(rpc->srpc_replymdh);

                if (ev->ev_fired) { /* no more event, OK to finish */
                        srpc_server_rpc_done(rpc, -ESHUTDOWN);
                        return 1;
                }
                return 0;
        }

        spin_unlock(&sv->sv_lock);

        switch (wi->wi_state) {
        default:
                LBUG ();
        case SWI_STATE_NEWBORN: {
                srpc_msg_t           *msg;
                srpc_generic_reply_t *reply;

                msg = &rpc->srpc_reqstbuf->buf_msg;
                reply = &rpc->srpc_replymsg.msg_body.reply;

                if (msg->msg_version != SRPC_MSG_VERSION &&
                    msg->msg_version != __swab32(SRPC_MSG_VERSION)) {
                        CWARN ("Version mismatch: %u, %u expected, from %s\n",
                               msg->msg_version, SRPC_MSG_VERSION,
                               libcfs_id2str(rpc->srpc_peer));
                        reply->status = EPROTO;
                } else {
                        reply->status = 0;
                        rc = (*sv->sv_handler) (rpc);
                        LASSERT (reply->status == 0 || !rpc->srpc_bulk);
                }

                if (rc != 0) {
                        srpc_server_rpc_done(rpc, rc);
                        return 1;
                }

                wi->wi_state = SWI_STATE_BULK_STARTED;

                if (rpc->srpc_bulk != NULL) {
                        rc = srpc_do_bulk(rpc);
                        if (rc == 0)
                                return 0; /* wait for bulk */

                        LASSERT (ev->ev_fired);
                        ev->ev_status = rc;
                }
        }
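        /* fall through if the bulk failed to start, or there was none */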
        case SWI_STATE_BULK_STARTED:
                /* we cannot LASSERT ev_fired right here because it
                 * may be set only upon an event with unlinked==1 */

                if (rpc->srpc_bulk != NULL) {
                        rc = ev->ev_status;

                        if (sv->sv_bulk_ready != NULL)
                                rc = (*sv->sv_bulk_ready) (rpc, rc);

                        if (rc != 0) {
                                if (ev->ev_fired) {
                                        srpc_server_rpc_done(rpc, rc);
                                        return 1;
                                }

                                rpc->srpc_status = rc;
                                wi->wi_state     = SWI_STATE_BULK_ERRORED;
                                LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
                                return 0; /* wait for UNLINK event  */
                        }
                }

                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);

                wi->wi_state = SWI_STATE_REPLY_SUBMITTED;
                rc = srpc_send_reply(rpc);
                if (rc == 0)
                        return 0; /* wait for reply */
                srpc_server_rpc_done(rpc, rc);
                return 1;

        case SWI_STATE_REPLY_SUBMITTED:
                LASSERT (ev->ev_fired);

                wi->wi_state = SWI_STATE_DONE;
                srpc_server_rpc_done(rpc, ev->ev_status);
                return 1;

        case SWI_STATE_BULK_ERRORED:
                LASSERT (rpc->srpc_bulk != NULL && ev->ev_fired);
                LASSERT (rpc->srpc_status != 0);

                srpc_server_rpc_done(rpc, rpc->srpc_status);
                return 1;
        }

        return 0;
}

void
srpc_client_rpc_expired (void *data)
{
        srpc_client_rpc_t *rpc = data;

        CWARN ("Client RPC expired: service %d, peer %s, timeout %d.\n",
               rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
               rpc->crpc_timeout);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_timeout = 0;
        srpc_abort_rpc(rpc, -ETIMEDOUT);

        spin_unlock(&rpc->crpc_lock);

        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters.rpcs_expired++;
        spin_unlock(&srpc_data.rpc_glock);
        return;
}

inline void
srpc_add_client_rpc_timer (srpc_client_rpc_t *rpc)
{
        stt_timer_t *timer = &rpc->crpc_timer;

        if (rpc->crpc_timeout == 0) return;

        CFS_INIT_LIST_HEAD(&timer->stt_list);
        timer->stt_data    = rpc;
        timer->stt_func    = srpc_client_rpc_expired;
        timer->stt_expires = cfs_time_add(rpc->crpc_timeout,
                                          cfs_time_current_sec());
        stt_add_timer(timer);
        return;
}

/*
 * Called with rpc->crpc_lock held.
 *
 * Upon exit the RPC expiry timer is not queued and the handler is not
 * running on any CPU. */
void
srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
{
        /* timer not planted or already exploded */
        if (rpc->crpc_timeout == 0) return;

        /* timer successfully defused */
        if (stt_del_timer(&rpc->crpc_timer)) return;

#ifdef __KERNEL__
        /* timer detonated, wait for it to explode */
        while (rpc->crpc_timeout != 0) {
                spin_unlock(&rpc->crpc_lock);

                cfs_schedule();

                spin_lock(&rpc->crpc_lock);
        }
#else
        LBUG(); /* impossible in single-threaded runtime */
#endif
        return;
}

void
srpc_check_sends (srpc_peer_t *peer, int credits)
{
        struct list_head  *q;
        srpc_client_rpc_t *rpc;

        LASSERT (credits >= 0);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        spin_lock(&peer->stp_lock);
        peer->stp_credits += credits;

        while (peer->stp_credits) {
                if (!list_empty(&peer->stp_ctl_rpcq))
                        q = &peer->stp_ctl_rpcq;
                else if (!list_empty(&peer->stp_rpcq))
                        q = &peer->stp_rpcq;
                else
                        break;

                peer->stp_credits--;

                rpc = list_entry(q->next, srpc_client_rpc_t, crpc_privl);
                list_del_init(&rpc->crpc_privl);
                srpc_client_rpc_decref(rpc);  /* --ref for peer->*rpcq */

                swi_schedule_workitem(&rpc->crpc_wi);
        }

        spin_unlock(&peer->stp_lock);
        return;
}

void
srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
{
        swi_workitem_t *wi = &rpc->crpc_wi;
        srpc_peer_t    *peer = rpc->crpc_peer;

        LASSERT (status != 0 || wi->wi_state == SWI_STATE_DONE);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_closed = 1;
        if (rpc->crpc_status == 0)
                rpc->crpc_status = status;

        srpc_del_client_rpc_timer(rpc);

        CDEBUG ((status == 0) ? D_NET : D_NETERROR,
                "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                swi_state2str(wi->wi_state), rpc->crpc_aborted, status);

        /*
         * No one can schedule me now since:
         * - RPC timer has been defused.
         * - all LNet events have been fired.
         * - crpc_closed has been set, preventing srpc_abort_rpc from
         *   scheduling me.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT (!srpc_event_pending(rpc));
        swi_kill_workitem(wi);

        spin_unlock(&rpc->crpc_lock);

        (*rpc->crpc_done) (rpc);

        if (peer != NULL)
                srpc_check_sends(peer, 1);
        return;
}

/* sends an outgoing RPC */
int
srpc_send_rpc (swi_workitem_t *wi)
{
        int                rc = 0;
        srpc_client_rpc_t *rpc = wi->wi_data;
        srpc_msg_t        *reply = &rpc->crpc_replymsg;
        int                do_bulk = rpc->crpc_bulk.bk_niov > 0;

        LASSERT (rpc != NULL);
        LASSERT (wi == &rpc->crpc_wi);

        spin_lock(&rpc->crpc_lock);

        if (rpc->crpc_aborted) {
                spin_unlock(&rpc->crpc_lock);
                goto abort;
        }

        spin_unlock(&rpc->crpc_lock);

        switch (wi->wi_state) {
        default:
                LBUG ();
        case SWI_STATE_NEWBORN:
                LASSERT (!srpc_event_pending(rpc));

                rc = srpc_prepare_reply(rpc);
                if (rc != 0) {
                        srpc_client_rpc_done(rpc, rc);
                        return 1;
                }

                rc = srpc_prepare_bulk(rpc);
                if (rc != 0) break;

                wi->wi_state = SWI_STATE_REQUEST_SUBMITTED;
                rc = srpc_send_request(rpc);
                break;

        case SWI_STATE_REQUEST_SUBMITTED:
                /* CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
                 * order; however, they're processed in a strict order:
                 * rqt, rpy, and bulk. */
                if (!rpc->crpc_reqstev.ev_fired) break;

                rc = rpc->crpc_reqstev.ev_status;
                if (rc != 0) break;

                wi->wi_state = SWI_STATE_REQUEST_SENT;
                /* perhaps more events, fall thru */
        case SWI_STATE_REQUEST_SENT: {
                srpc_msg_type_t type = srpc_service2reply(rpc->crpc_service);

                if (!rpc->crpc_replyev.ev_fired) break;

                rc = rpc->crpc_replyev.ev_status;
                if (rc != 0) break;

                if ((reply->msg_type != type &&
                     reply->msg_type != __swab32(type)) ||
                    (reply->msg_magic != SRPC_MSG_MAGIC &&
                     reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CWARN ("Bad message from %s: type %u (%d expected),"
                               " magic %u (%d expected).\n",
                               libcfs_id2str(rpc->crpc_dest),
                               reply->msg_type, type,
                               reply->msg_magic, SRPC_MSG_MAGIC);
                        rc = -EBADMSG;
                        break;
                }

                if (do_bulk && reply->msg_body.reply.status != 0) {
                        CWARN ("Remote error %d at %s, unlink bulk buffer in "
                               "case peer didn't initiate bulk transfer\n",
                               reply->msg_body.reply.status,
                               libcfs_id2str(rpc->crpc_dest));
                        LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
                }

                wi->wi_state = SWI_STATE_REPLY_RECEIVED;
        }
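        /* fall through: the reply checked out, wait for bulk to complete */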
        case SWI_STATE_REPLY_RECEIVED:
                if (do_bulk && !rpc->crpc_bulkev.ev_fired) break;

                rc = do_bulk ? rpc->crpc_bulkev.ev_status : 0;

                /* Bulk buffer was unlinked due to remote error. Clear error
                 * since reply buffer still contains valid data.
                 * NB rpc->crpc_done shouldn't look into bulk data in case of
                 * remote error. */
                if (do_bulk && rpc->crpc_bulkev.ev_lnet == LNET_EVENT_UNLINK &&
                    rpc->crpc_status == 0 && reply->msg_body.reply.status != 0)
                        rc = 0;

                wi->wi_state = SWI_STATE_DONE;
                srpc_client_rpc_done(rpc, rc);
                return 1;
        }

        if (rc != 0) {
                spin_lock(&rpc->crpc_lock);
                srpc_abort_rpc(rpc, rc);
                spin_unlock(&rpc->crpc_lock);
        }

abort:
        if (rpc->crpc_aborted) {
                LNetMDUnlink(rpc->crpc_reqstmdh);
                LNetMDUnlink(rpc->crpc_replymdh);
                LNetMDUnlink(rpc->crpc_bulk.bk_mdh);

                if (!srpc_event_pending(rpc)) {
                        srpc_client_rpc_done(rpc, -EINTR);
                        return 1;
                }
        }
        return 0;
}

srpc_client_rpc_t *
srpc_create_client_rpc (lnet_process_id_t peer, int service,
                        int nbulkiov, int bulklen,
                        void (*rpc_done)(srpc_client_rpc_t *),
                        void (*rpc_fini)(srpc_client_rpc_t *), void *priv)
{
        srpc_client_rpc_t *rpc;

        LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t,
                                   crpc_bulk.bk_iovs[nbulkiov]));
        if (rpc == NULL)
                return NULL;

        srpc_init_client_rpc(rpc, peer, service, nbulkiov,
                             bulklen, rpc_done, rpc_fini, priv);
        return rpc;
}
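
/* Client-side sketch (hypothetical caller; the callbacks and the choice
 * of SRPC_SERVICE_DEBUG are illustrative, and the exact callback
 * contract is defined by srpc_init_client_rpc elsewhere): create an RPC
 * with no bulk and post it. srpc_post_rpc() expects crpc_lock held. */
#if 0
static void
srpc_example_done (srpc_client_rpc_t *rpc)
{
        /* rpc->crpc_status holds the final result here */
}

static void
srpc_example_fini (srpc_client_rpc_t *rpc)
{
        /* last reference dropped; release any private state here */
}

static int
srpc_example_ping (lnet_process_id_t peer)
{
        srpc_client_rpc_t *rpc;

        rpc = srpc_create_client_rpc(peer, SRPC_SERVICE_DEBUG, 0, 0,
                                     srpc_example_done, srpc_example_fini,
                                     NULL);
        if (rpc == NULL)
                return -ENOMEM;

        spin_lock(&rpc->crpc_lock);
        srpc_post_rpc(rpc);
        spin_unlock(&rpc->crpc_lock);
        return 0;
}
#endif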

/* called with rpc->crpc_lock held */
static inline void
srpc_queue_rpc (srpc_peer_t *peer, srpc_client_rpc_t *rpc)
{
        int service = rpc->crpc_service;

        LASSERT (peer->stp_nid == rpc->crpc_dest.nid);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        rpc->crpc_peer = peer;

        spin_lock(&peer->stp_lock);

        /* Framework RPCs that alter session state shall take precedence
         * over test RPCs and framework query RPCs */
        if (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID &&
            service != SRPC_SERVICE_DEBUG &&
            service != SRPC_SERVICE_QUERY_STAT)
                list_add_tail(&rpc->crpc_privl, &peer->stp_ctl_rpcq);
        else
                list_add_tail(&rpc->crpc_privl, &peer->stp_rpcq);

        srpc_client_rpc_addref(rpc); /* ++ref for peer->*rpcq */
        spin_unlock(&peer->stp_lock);
        return;
}

/* called with rpc->crpc_lock held */
void
srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
{
        srpc_peer_t *peer = rpc->crpc_peer;

        LASSERT (why != 0);

        if (rpc->crpc_aborted || /* already aborted */
            rpc->crpc_closed)    /* callback imminent */
                return;

        CDEBUG (D_NET,
                "Aborting RPC: service %d, peer %s, state %s, why %d\n",
                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                swi_state2str(rpc->crpc_wi.wi_state), why);

        rpc->crpc_aborted = 1;
        rpc->crpc_status  = why;

        if (peer != NULL) {
                spin_lock(&peer->stp_lock);

                if (!list_empty(&rpc->crpc_privl)) { /* still queued */
                        list_del_init(&rpc->crpc_privl);
                        srpc_client_rpc_decref(rpc); /* --ref for peer->*rpcq */
                        rpc->crpc_peer = NULL;       /* no credit taken */
                }

                spin_unlock(&peer->stp_lock);
        }

        swi_schedule_workitem(&rpc->crpc_wi);
        return;
}

/* called with rpc->crpc_lock held */
void
srpc_post_rpc (srpc_client_rpc_t *rpc)
{
        srpc_peer_t *peer;

        LASSERT (!rpc->crpc_aborted);
        LASSERT (rpc->crpc_peer == NULL);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
        LASSERT ((rpc->crpc_bulk.bk_len & ~CFS_PAGE_MASK) == 0);

        CDEBUG (D_NET, "Posting RPC: peer %s, service %d, timeout %d\n",
                libcfs_id2str(rpc->crpc_dest), rpc->crpc_service,
                rpc->crpc_timeout);

        srpc_add_client_rpc_timer(rpc);

        peer = srpc_nid2peer(rpc->crpc_dest.nid);
        if (peer == NULL) {
                srpc_abort_rpc(rpc, -ENOMEM);
                return;
        }

        srpc_queue_rpc(peer, rpc);

        spin_unlock(&rpc->crpc_lock);
        srpc_check_sends(peer, 0);
        spin_lock(&rpc->crpc_lock);
        return;
}


int
srpc_send_reply (srpc_server_rpc_t *rpc)
{
        srpc_event_t   *ev = &rpc->srpc_ev;
        srpc_msg_t     *msg = &rpc->srpc_replymsg;
        srpc_buffer_t  *buffer = rpc->srpc_reqstbuf;
        srpc_service_t *sv = rpc->srpc_service;
        __u64           rpyid;
        int             rc;

        LASSERT (buffer != NULL);
        rpyid = buffer->buf_msg.msg_body.reqst.rpyid;

        spin_lock(&sv->sv_lock);

        if (!sv->sv_shuttingdown &&
            sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID) {
                /* Repost buffer before replying since test client
                 * might send me another RPC once it gets the reply */
                if (srpc_service_post_buffer(sv, buffer) != 0)
                        CWARN ("Failed to repost %s buffer\n", sv->sv_name);
                rpc->srpc_reqstbuf = NULL;
        }

        spin_unlock(&sv->sv_lock);

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REPLY_SENT;

        msg->msg_magic   = SRPC_MSG_MAGIC;
        msg->msg_version = SRPC_MSG_VERSION;
        msg->msg_type    = srpc_service2reply(sv->sv_id);

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, rpyid, msg,
                                   sizeof(*msg), LNET_MD_OP_PUT,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &rpc->srpc_replymdh, ev);
        if (rc != 0)
                ev->ev_fired = 1;  /* no more event expected */
        return rc;
}

/* when in kernel always called with LNET_LOCK() held, and in thread context */
void
srpc_lnet_ev_handler (lnet_event_t *ev)
{
        srpc_event_t      *rpcev = ev->md.user_ptr;
        srpc_client_rpc_t *crpc;
        srpc_server_rpc_t *srpc;
        srpc_buffer_t     *buffer;
        srpc_service_t    *sv;
        srpc_msg_t        *msg;
        srpc_msg_type_t    type;
        int                fired_flag = 1;

        LASSERT (!in_interrupt());

        if (ev->status != 0) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.errors++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        rpcev->ev_lnet = ev->type;

        switch (rpcev->ev_type) {
        default:
                LBUG ();
        case SRPC_REQUEST_SENT:
                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);
                        srpc_data.rpc_counters.rpcs_sent++;
                        spin_unlock(&srpc_data.rpc_glock);
                }
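                /* fall through: a sent request is handled like the other
                 * client-side events below */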
        case SRPC_REPLY_RCVD:
        case SRPC_BULK_REQ_RCVD:
                crpc = rpcev->ev_data;

                LASSERT (rpcev == &crpc->crpc_reqstev ||
                         rpcev == &crpc->crpc_replyev ||
                         rpcev == &crpc->crpc_bulkev);

                spin_lock(&crpc->crpc_lock);

                LASSERT (rpcev->ev_fired == 0);
                rpcev->ev_fired  = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                -EINTR : ev->status;
                swi_schedule_workitem(&crpc->crpc_wi);

                spin_unlock(&crpc->crpc_lock);
                break;

        case SRPC_REQUEST_RCVD:
                sv = rpcev->ev_data;

                LASSERT (rpcev == &sv->sv_ev);

                spin_lock(&sv->sv_lock);

                LASSERT (ev->unlinked);
                LASSERT (ev->type == LNET_EVENT_PUT ||
                         ev->type == LNET_EVENT_UNLINK);
                LASSERT (ev->type != LNET_EVENT_UNLINK ||
                         sv->sv_shuttingdown);

                buffer = container_of(ev->md.start, srpc_buffer_t, buf_msg);
                buffer->buf_peer = ev->initiator;
                buffer->buf_self = ev->target.nid;

                sv->sv_nposted_msg--;
                LASSERT (sv->sv_nposted_msg >= 0);

                if (sv->sv_shuttingdown) {
                        /* Leave buffer on sv->sv_posted_msgq since
                         * srpc_finish_service needs to traverse it. */
                        spin_unlock(&sv->sv_lock);
                        break;
                }

                list_del(&buffer->buf_list); /* from sv->sv_posted_msgq */
                msg = &buffer->buf_msg;
                type = srpc_service2request(sv->sv_id);

                if (ev->status != 0 || ev->mlength != sizeof(*msg) ||
                    (msg->msg_type != type &&
                     msg->msg_type != __swab32(type)) ||
                    (msg->msg_magic != SRPC_MSG_MAGIC &&
                     msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CERROR ("Dropping RPC (%s) from %s: "
                                "status %d mlength %d type %u magic %u.\n",
                                sv->sv_name, libcfs_id2str(ev->initiator),
                                ev->status, ev->mlength,
                                msg->msg_type, msg->msg_magic);

                        /* NB srpc_service_recycle_buffer might drop sv_lock;
                         * bump sv_nposted_msg as an implicit reference to
                         * keep sv from disappearing under me */
                        sv->sv_nposted_msg++;
                        srpc_service_recycle_buffer(sv, buffer);
                        sv->sv_nposted_msg--;
                        spin_unlock(&sv->sv_lock);

                        if (ev->status == 0) { /* status!=0 counted already */
                                spin_lock(&srpc_data.rpc_glock);
                                srpc_data.rpc_counters.errors++;
                                spin_unlock(&srpc_data.rpc_glock);
                        }
                        break;
                }

                if (!list_empty(&sv->sv_free_rpcq)) {
                        srpc = list_entry(sv->sv_free_rpcq.next,
                                          srpc_server_rpc_t, srpc_list);
                        list_del(&srpc->srpc_list);

                        srpc_init_server_rpc(srpc, sv, buffer);
                        list_add_tail(&srpc->srpc_list, &sv->sv_active_rpcq);
                        srpc_schedule_server_rpc(srpc);
                } else {
                        list_add_tail(&buffer->buf_list, &sv->sv_blocked_msgq);
                }

                spin_unlock(&sv->sv_lock);

                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_rcvd++;
                spin_unlock(&srpc_data.rpc_glock);
                break;

        case SRPC_BULK_GET_RPLD:
                LASSERT (ev->type == LNET_EVENT_SEND ||
                         ev->type == LNET_EVENT_REPLY ||
                         ev->type == LNET_EVENT_UNLINK);

                if (ev->type == LNET_EVENT_SEND && !ev->unlinked) {
                        if (ev->status == 0)
                                break; /* wait for the final LNET_EVENT_REPLY */
                        else
                                fired_flag = 0; /* LNET_EVENT_REPLY may arrive
                                                   (optimized GET case) */
                }
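                /* fall through: account the transferred bulk bytes */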
        case SRPC_BULK_PUT_SENT:
                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);

                        if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
                                srpc_data.rpc_counters.bulk_get += ev->mlength;
                        else
                                srpc_data.rpc_counters.bulk_put += ev->mlength;

                        spin_unlock(&srpc_data.rpc_glock);
                }
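                /* fall through: reschedule the server RPC as for a sent
                 * reply */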
        case SRPC_REPLY_SENT:
                srpc = rpcev->ev_data;
                sv = srpc->srpc_service;

                LASSERT (rpcev == &srpc->srpc_ev);

                spin_lock(&sv->sv_lock);
                if (fired_flag)
                        rpcev->ev_fired  = 1;

                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                -EINTR : ev->status;

                srpc_schedule_server_rpc(srpc);
                spin_unlock(&sv->sv_lock);
                break;
        }

        return;
}

#ifndef __KERNEL__

int
srpc_check_event (int timeout)
{
        lnet_event_t ev;
        int          rc;
        int          i;

        rc = LNetEQPoll(&srpc_data.rpc_lnet_eq, 1,
                        timeout * 1000, &ev, &i);
        if (rc == 0) return 0;

        LASSERT (rc == -EOVERFLOW || rc == 1);

        /* We can't afford to miss any events... */
        if (rc == -EOVERFLOW) {
                CERROR ("Dropped an event!!!\n");
                abort();
        }

        srpc_lnet_ev_handler(&ev);
        return 1;
}

#endif

int
srpc_startup (void)
{
        int i;
        int rc;

#ifndef __KERNEL__
        char *s;

        s = getenv("SRPC_PEER_CREDITS");
        srpc_peer_credits = (s != NULL) ? atoi(s) : srpc_peer_credits;
#endif

        if (srpc_peer_credits <= 0) {
                CERROR("Peer credits must be positive: %d\n", srpc_peer_credits);
                return -EINVAL;
        }

        memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
        spin_lock_init(&srpc_data.rpc_glock);

        /* 1 second pause to avoid timestamp reuse */
        cfs_pause(cfs_time_seconds(1));
        srpc_data.rpc_matchbits = ((__u64) cfs_time_current_sec()) << 48;

        srpc_data.rpc_state = SRPC_STATE_NONE;

        LIBCFS_ALLOC(srpc_data.rpc_peers,
                     sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
        if (srpc_data.rpc_peers == NULL) {
                CERROR ("Failed to alloc peer hash.\n");
                return -ENOMEM;
        }

        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++)
                CFS_INIT_LIST_HEAD(&srpc_data.rpc_peers[i]);

#ifdef __KERNEL__
        rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
#else
        if (the_lnet.ln_server_mode_flag)
                rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
        else
                rc = LNetNIInit(getpid() | LNET_PID_USERFLAG);
#endif
        if (rc < 0) {
                CERROR ("LNetNIInit() has failed: %d\n", rc);
                LIBCFS_FREE(srpc_data.rpc_peers,
                            sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
                return rc;
        }

        srpc_data.rpc_state = SRPC_STATE_NI_INIT;

        srpc_data.rpc_lnet_eq = LNET_EQ_NONE;
#ifdef __KERNEL__
        rc = LNetEQAlloc(16, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
#else
        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &srpc_data.rpc_lnet_eq);
#endif
        if (rc != 0) {
                CERROR("LNetEQAlloc() has failed: %d\n", rc);
                goto bail;
        }

        rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
        LASSERT (rc == 0);

        srpc_data.rpc_state = SRPC_STATE_EQ_INIT;

        rc = swi_startup();
        if (rc != 0)
                goto bail;

        srpc_data.rpc_state = SRPC_STATE_WI_INIT;

        rc = stt_startup();

bail:
        if (rc != 0)
                srpc_shutdown();
        else
                srpc_data.rpc_state = SRPC_STATE_RUNNING;

        return rc;
}

void
srpc_shutdown (void)
{
        int i;
        int rc;
        int state;

        state = srpc_data.rpc_state;
        srpc_data.rpc_state = SRPC_STATE_STOPPING;

        switch (state) {
        default:
                LBUG ();
        case SRPC_STATE_RUNNING:
                spin_lock(&srpc_data.rpc_glock);

                for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
                        srpc_service_t *sv = srpc_data.rpc_services[i];

                        LASSERTF (sv == NULL,
                                  "service not empty: id %d, name %s\n",
                                  i, sv->sv_name);
                }

                spin_unlock(&srpc_data.rpc_glock);

                stt_shutdown();

        case SRPC_STATE_WI_INIT:
                swi_shutdown();

        case SRPC_STATE_EQ_INIT:
                rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
                LASSERT (rc == 0);
                rc = LNetEQFree(srpc_data.rpc_lnet_eq);
                LASSERT (rc == 0); /* the EQ should have no user by now */

        case SRPC_STATE_NI_INIT:
                LNetNIFini();
                break;
        }

        /* srpc_peer_t's are kept in hash until shutdown */
        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++) {
                srpc_peer_t *peer;

                while (!list_empty(&srpc_data.rpc_peers[i])) {
                        peer = list_entry(srpc_data.rpc_peers[i].next,
                                          srpc_peer_t, stp_list);
                        list_del(&peer->stp_list);

                        LASSERT (list_empty(&peer->stp_rpcq));
                        LASSERT (list_empty(&peer->stp_ctl_rpcq));
                        LASSERT (peer->stp_credits == srpc_peer_credits);

                        LIBCFS_FREE(peer, sizeof(srpc_peer_t));
                }
        }

        LIBCFS_FREE(srpc_data.rpc_peers,
                    sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
        return;
}