/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 * This file is part of the Lustre file system, http://www.lustre.org
 * Lustre is a trademark of Cluster File Systems, Inc.
 *
 * You may have signed or agreed to another license before downloading
 * this software. If so, you are bound by the terms and conditions
 * of that agreement, and the following does not apply to you. See the
 * LICENSE file included with this distribution for more information.
 *
 * If you did not agree to a different license, then this copy of Lustre
 * is open source software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In either case, Lustre is distributed in the hope that it will be
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * license text for more details.
 */

#define DEBUG_SUBSYSTEM S_RPC

#ifndef __KERNEL__
# include <liblustre.h>
#else
# include <linux/kernel.h>
#endif

#include <obd_class.h>
#include <lustre_net.h>
#include <lustre_sec.h>
#include "ptlrpc_internal.h"
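
/* Handle for the single LNet event queue used by all ptlrpc messaging.
 * Every event delivered on it is dispatched by ptlrpc_master_callback()
 * below, via the ptlrpc_cb_id stored in each MD's user_ptr. */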
lnet_handle_eq_t   ptlrpc_eq_h;

/*
 * Client's outgoing request callback
 */
void request_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        sptlrpc_request_out_callback(req);

        if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) {
                /* Failed send: make it seem like the reply timed out, just
                 * like failing sends in client.c does currently... */
                spin_lock(&req->rq_lock);
                req->rq_net_err = 1;
                spin_unlock(&req->rq_lock);

                ptlrpc_wake_client_req(req);
        }

        /* Drop the reference held for the send; this may free the request */
        ptlrpc_req_finished(req);
}

/*
 * Client's incoming reply callback
 */
void reply_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id   *cbid = ev->md.user_ptr;
        struct ptlrpc_request *req = cbid->cbid_arg;

        DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                  "type %d, status %d", ev->type, ev->status);

        LASSERT (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->md.start == req->rq_repbuf);
        LASSERT (ev->mlength <= req->rq_repbuf_len);
        /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
           for adaptive timeouts' early reply. */
        LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);

        spin_lock(&req->rq_lock);

        req->rq_receiving_reply = 0;

        if (ev->type == LNET_EVENT_UNLINK) {
                /* No further network callbacks will arrive for this buffer */
                req->rq_must_unlink = 0;
                DEBUG_REQ(D_RPCTRACE, req, "unlink");
        } else if ((ev->offset == 0) &&
                   ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT))) {
                /* Early reply */
                DEBUG_REQ(D_ADAPTTO, req,
                          "Early reply received: mlen=%u offset=%d replen=%d "
                          "replied=%d unlinked=%d", ev->mlength, ev->offset,
                          req->rq_replen, req->rq_replied, ev->unlinked);

                req->rq_early_count++; /* number received, client side */
                if (req->rq_replied) {
                        /* If we already got the real reply, then we need to
                         * check if lnet_finalize() unlinked the md. In that
                         * case, there will be no further callback of type
                         * LNET_EVENT_UNLINK, so note the unlink here. */
                        if (ev->unlinked) {
                                req->rq_must_unlink = 0;
                                DEBUG_REQ(D_RPCTRACE, req, "unlinked in reply");
                        }
                } else {
                        req->rq_reply_off = ev->offset;
                        req->rq_nob_received = ev->mlength;
                        /* And we're still receiving */
                        req->rq_receiving_reply = 1;
                }
        } else {
                /* Real reply */
                req->rq_replied = 1;
                req->rq_reply_off = ev->offset;
                req->rq_nob_received = ev->mlength;
                /* LNetMDUnlink can't be called under the LNET_LOCK,
                   so we must unlink in ptlrpc_unregister_reply */
                DEBUG_REQ(D_INFO, req,
                          "reply in flags=%x mlen=%u offset=%d replen=%d",
                          lustre_msg_get_flags(req->rq_reqmsg),
                          ev->mlength, ev->offset, req->rq_replen);
        }

        req->rq_import->imp_last_reply_time = cfs_time_current_sec();

        /* NB don't unlock till after wakeup; req can disappear under us
         * since we don't have our own ref */
        ptlrpc_wake_client_req(req);
        spin_unlock(&req->rq_lock);
}

/*
 * Client's bulk has been written/read
 */
void client_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;

        LASSERT ((desc->bd_type == BULK_PUT_SINK &&
                  ev->type == LNET_EVENT_PUT) ||
                 (desc->bd_type == BULK_GET_SOURCE &&
                  ev->type == LNET_EVENT_GET) ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT (ev->unlinked);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        LASSERT(desc->bd_network_rw);
        desc->bd_network_rw = 0;

        if (ev->type != LNET_EVENT_UNLINK && ev->status == 0) {
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        /* release pages held by the bulk security layer, if any */
        sptlrpc_enc_pool_put_pages(desc);

        /* NB don't unlock till after wakeup; desc can disappear under us
         * since we don't have our own ref */
        ptlrpc_wake_client_req(desc->bd_req);

        spin_unlock(&desc->bd_lock);
}

/*
 * Server's incoming request callback
 */
void request_in_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id               *cbid = ev->md.user_ptr;
        struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
        struct ptlrpc_service             *service = rqbd->rqbd_service;
        struct ptlrpc_request             *req;

        LASSERT (ev->type == LNET_EVENT_PUT ||
                 ev->type == LNET_EVENT_UNLINK);
        LASSERT ((char *)ev->md.start >= rqbd->rqbd_buffer);
        LASSERT ((char *)ev->md.start + ev->offset + ev->mlength <=
                 rqbd->rqbd_buffer + service->srv_buf_size);

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, service %s\n",
               ev->type, ev->status, service->srv_name);

        if (ev->unlinked) {
                /* If this is the last request message to fit in the
                 * request buffer we can use the request object embedded in
                 * rqbd.  Note that if we failed to allocate a request,
                 * we'd have to re-post the rqbd, which we can't do in this
                 * context. */
                req = &rqbd->rqbd_req;
                memset(req, 0, sizeof (*req));
        } else {
                LASSERT (ev->type == LNET_EVENT_PUT);
                if (ev->status != 0) {
                        /* We moaned above already... */
                        return;
                }
                OBD_ALLOC_GFP(req, sizeof(*req), CFS_ALLOC_ATOMIC_TRY);
                if (req == NULL) {
                        CERROR("Can't allocate incoming request descriptor: "
                               "Dropping %s RPC from %s\n",
                               service->srv_name,
                               libcfs_id2str(ev->initiator));
                        return;
                }
        }

        /* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,
         * flags are reset and scalars are zero.  We only set the message
         * size to non-zero if this was a successful receive. */
        req->rq_xid = ev->match_bits;
        req->rq_reqbuf = ev->md.start + ev->offset;
        if (ev->type == LNET_EVENT_PUT && ev->status == 0)
                req->rq_reqdata_len = ev->mlength;
        do_gettimeofday(&req->rq_arrival_time);
        req->rq_peer = ev->initiator;
        req->rq_self = ev->target.nid;
        req->rq_rqbd = rqbd;
        req->rq_phase = RQ_PHASE_NEW;
#if CRAY_XT3
        req->rq_uid = ev->uid;
#endif
        spin_lock_init(&req->rq_lock);
        CFS_INIT_LIST_HEAD(&req->rq_timed_list);
        atomic_set(&req->rq_refcount, 1);
        if (ev->type == LNET_EVENT_PUT)
                DEBUG_REQ(D_RPCTRACE, req, "incoming req");

        CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer));

        spin_lock(&service->srv_lock);

        req->rq_history_seq = service->srv_request_seq++;
        list_add_tail(&req->rq_history_list, &service->srv_request_history);

        if (ev->unlinked) {
                service->srv_nrqbd_receiving--;
                CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
                       service->srv_nrqbd_receiving);

                /* Normally, don't complain about 0 buffers posted; LNET won't
                 * drop incoming reqs since we set the portal lazy */
                if (test_req_buffer_pressure &&
                    ev->type != LNET_EVENT_UNLINK &&
                    service->srv_nrqbd_receiving == 0)
                        CWARN("All %s request buffers busy\n",
                              service->srv_name);

                /* req takes over the network's ref on rqbd */
        } else {
                /* req takes a ref on rqbd */
                rqbd->rqbd_refcount++;
        }

        list_add_tail(&req->rq_list, &service->srv_req_in_queue);
        service->srv_n_queued_reqs++;

        /* NB everything can disappear under us once the request
         * has been queued and we unlock, so do the wake now... */
        cfs_waitq_signal(&service->srv_waitq);

        spin_unlock(&service->srv_lock);
}

/*
 * Server's outgoing reply callback
 */
void reply_out_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id       *cbid = ev->md.user_ptr;
        struct ptlrpc_reply_state *rs = cbid->cbid_arg;
        struct ptlrpc_service     *svc = rs->rs_service;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_ACK ||
                 ev->type == LNET_EVENT_UNLINK);

        if (!rs->rs_difficult) {
                /* 'Easy' replies have no further processing so I drop the
                 * net's ref on 'rs' */
                LASSERT (ev->unlinked);
                ptlrpc_rs_decref(rs);
                atomic_dec (&svc->srv_outstanding_replies);
                return;
        }

        LASSERT (rs->rs_on_net);

        if (ev->unlinked) {
                /* Last network callback.  The net's ref on 'rs' stays put
                 * until ptlrpc_server_handle_reply() is done with it */
                spin_lock(&svc->srv_lock);
                rs->rs_on_net = 0;
                ptlrpc_schedule_difficult_reply (rs);
                spin_unlock(&svc->srv_lock);
        }
}

/*
 * Server's bulk completion callback
 */
void server_bulk_callback (lnet_event_t *ev)
{
        struct ptlrpc_cb_id     *cbid = ev->md.user_ptr;
        struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;

        LASSERT (ev->type == LNET_EVENT_SEND ||
                 ev->type == LNET_EVENT_UNLINK ||
                 (desc->bd_type == BULK_PUT_SOURCE &&
                  ev->type == LNET_EVENT_ACK) ||
                 (desc->bd_type == BULK_GET_SINK &&
                  ev->type == LNET_EVENT_REPLY));

        CDEBUG((ev->status == 0) ? D_NET : D_ERROR,
               "event type %d, status %d, desc %p\n",
               ev->type, ev->status, desc);

        spin_lock(&desc->bd_lock);

        if ((ev->type == LNET_EVENT_ACK ||
             ev->type == LNET_EVENT_REPLY) &&
            ev->status == 0) {
                /* We heard back from the peer, so even if we get this
                 * before the SENT event (oh yes we can), we know we
                 * read/wrote the peer buffer and how much... */
                desc->bd_success = 1;
                desc->bd_nob_transferred = ev->mlength;
                desc->bd_sender = ev->sender;
        }

        if (ev->unlinked) {
                /* This is the last callback no matter what... */
                desc->bd_network_rw = 0;
                cfs_waitq_signal(&desc->bd_waitq);
        }

        spin_unlock(&desc->bd_lock);
}
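
/*
 * Every MD that ptlrpc attaches stores a struct ptlrpc_cb_id in its
 * user_ptr, pairing a callback function with its argument.  This master
 * callback is the only handler LNet invokes for our event queue; it just
 * dispatches each event to the per-message callback recorded there.  The
 * wiring is done wherever the MD is built, roughly like the illustrative
 * sketch below (see how rq_req_cbid/rq_reply_cbid are set up in client.c):
 *
 *      req->rq_reply_cbid.cbid_fn  = reply_in_callback;
 *      req->rq_reply_cbid.cbid_arg = req;
 *      ...
 *      md.user_ptr = &req->rq_reply_cbid;
 */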
static void ptlrpc_master_callback(lnet_event_t *ev)
{
        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
        void (*callback)(lnet_event_t *ev) = cbid->cbid_fn;

        /* Honestly, it's best to find out early. */
        LASSERT (cbid->cbid_arg != LP_POISON);
        LASSERT (callback == request_out_callback ||
                 callback == reply_in_callback ||
                 callback == client_bulk_callback ||
                 callback == request_in_callback ||
                 callback == reply_out_callback ||
                 callback == server_bulk_callback);

        callback (ev);
}
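
/*
 * Resolve a Lustre UUID to the LNet peer we should send to.  Of the NIDs
 * the UUID maps to, pick the one LNetDist() reports as closest (lowest
 * distance, then lowest order) and return the matching local NID in *self.
 */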
int ptlrpc_uuid_to_peer (struct obd_uuid *uuid,
                         lnet_process_id_t *peer, lnet_nid_t *self)
{
        int               best_dist = 0;
        __u32             best_order = 0;
        int               count = 0;
        int               rc = -ENOENT;
        int               portals_compatibility;
        int               dist;
        __u32             order;
        lnet_nid_t        dst_nid;
        lnet_nid_t        src_nid;

        portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL);

        peer->pid = LUSTRE_SRV_LNET_PID;

        /* Choose the matching UUID that's closest */
        while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {
                dist = LNetDist(dst_nid, &src_nid, &order);
                if (dist < 0)
                        continue;

                if (dist == 0) {                /* local! use loopback LND */
                        peer->nid = *self = LNET_MKNID(LNET_MKNET(LOLND, 0), 0);
                        rc = 0;
                        break;
                }

                if (rc < 0 ||
                    dist < best_dist ||
                    (dist == best_dist && order < best_order)) {
                        best_dist = dist;
                        best_order = order;

                        if (portals_compatibility > 1) {
                                /* Strong portals compatibility: Zero the nid's
                                 * NET, so if I'm reading new config logs, or
                                 * getting configured by (new) lconf I can
                                 * still talk to old servers. */
                                dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid));
                                src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid));
                        }

                        peer->nid = dst_nid;
                        *self = src_nid;
                        rc = 0;
                }
        }

        CDEBUG(D_NET,"%s->%s\n", uuid->uuid, libcfs_id2str(*peer));
        if (rc != 0)
                CERROR("No NID found for %s\n", uuid->uuid);
        return rc;
}

void ptlrpc_ni_fini(void)
{
        cfs_waitq_t        waitq;
        struct l_wait_info lwi;
        int                rc;
        int                retries;

        /* Wait for the event queue to become idle since there may still be
         * messages in flight with pending events (i.e. the fire-and-forget
         * messages == client requests and "non-difficult" server
         * replies) */
        for (retries = 0;; retries++) {
                rc = LNetEQFree(ptlrpc_eq_h);
                switch (rc) {
                default:
                        LBUG();

                case 0:
                        /* EQ is gone; the network interface can go too */
                        LNetNIFini();
                        return;

                case -EBUSY:
                        if (retries != 0)
                                CWARN("Event queue still busy\n");

                        /* Wait for a bit and retry */
                        cfs_waitq_init(&waitq);
                        lwi = LWI_TIMEOUT(cfs_time_seconds(2), NULL, NULL);
                        l_wait_event(waitq, 0, &lwi);
                        break;
                }
        }
        /* notreached */
}

lnet_pid_t ptl_get_pid(void)
{
        lnet_pid_t        pid;

#ifndef __KERNEL__
        pid = getpid();
#else
        pid = LUSTRE_SRV_LNET_PID;
#endif
        return pid;
}

int ptlrpc_ni_init(void)
{
        int              rc;
        lnet_pid_t       pid;

        pid = ptl_get_pid();
        CDEBUG(D_NET, "My pid is: %x\n", pid);

        /* We're not passing any limits yet... */
        rc = LNetNIInit(pid);
        if (rc < 0) {
                CDEBUG (D_NET, "Can't init network interface: %d\n", rc);
                return (-ENOENT);
        }

        /* CAVEAT EMPTOR: how we process portals events is _radically_
         * different depending on... */
#ifdef __KERNEL__
        /* kernel portals calls our master callback when events are added to
         * the event queue.  In fact lustre never pulls events off this queue,
         * so it's only sized for some debug history. */
        rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h);
#else
        /* liblustre calls the master callback when it removes events from the
         * event queue.  The event queue has to be big enough not to drop
         * anything */
        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &ptlrpc_eq_h);
#endif
        if (rc == 0)
                return 0;

        CERROR ("Failed to allocate event queue: %d\n", rc);
        LNetNIFini();

        return (-ENOMEM);
}

#ifndef __KERNEL__
CFS_LIST_HEAD(liblustre_wait_callbacks);
CFS_LIST_HEAD(liblustre_idle_callbacks);
void *liblustre_services_callback;

void *
liblustre_register_waitidle_callback (struct list_head *callback_list,
                                      const char *name,
                                      int (*fn)(void *arg), void *arg)
{
        struct liblustre_wait_callback *llwc;

        OBD_ALLOC(llwc, sizeof(*llwc));
        LASSERT (llwc != NULL);

        llwc->llwc_name = name;
        llwc->llwc_fn = fn;
        llwc->llwc_arg = arg;
        list_add_tail(&llwc->llwc_list, callback_list);

        return (llwc);
}

void
liblustre_deregister_waitidle_callback (void *opaque)
{
        struct liblustre_wait_callback *llwc = opaque;

        list_del(&llwc->llwc_list);
        OBD_FREE(llwc, sizeof(*llwc));
}

void *
liblustre_register_wait_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_wait_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_wait_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

void *
liblustre_register_idle_callback (const char *name,
                                  int (*fn)(void *arg), void *arg)
{
        return liblustre_register_waitidle_callback(&liblustre_idle_callbacks,
                                                    name, fn, arg);
}

void
liblustre_deregister_idle_callback (void *opaque)
{
        liblustre_deregister_waitidle_callback(opaque);
}

int
liblustre_check_events (int timeout)
{
        lnet_event_t ev;
        int          rc;
        int          i;

        rc = LNetEQPoll(&ptlrpc_eq_h, 1, timeout * 1000, &ev, &i);
        if (rc == 0)
                return 0;

        LASSERT (rc == -EOVERFLOW || rc == 1);

        /* liblustre: no asynch callback so we can't afford to miss any
         * events... */
        if (rc == -EOVERFLOW) {
                CERROR ("Dropped an event!!!\n");
                LBUG();
        }

        ptlrpc_master_callback (&ev);
        return 1;
}

int liblustre_waiting = 0;

int
liblustre_wait_event (int timeout)
{
        struct list_head               *tmp;
        struct liblustre_wait_callback *llwc;
        int                             found_something = 0;

        /* single threaded recursion check... */
        liblustre_waiting = 1;

        for (;;) {
                /* Deal with all pending events */
                while (liblustre_check_events(0))
                        found_something = 1;

                /* Give all registered callbacks a bite at the cherry */
                list_for_each(tmp, &liblustre_wait_callbacks) {
                        llwc = list_entry(tmp, struct liblustre_wait_callback,
                                          llwc_list);

                        if (llwc->llwc_fn(llwc->llwc_arg))
                                found_something = 1;
                }

                if (found_something || timeout == 0)
                        break;

                /* Nothing so far, but I'm allowed to block... */
                found_something = liblustre_check_events(timeout);
                if (!found_something)           /* still nothing */
                        break;                  /* I timed out */
        }

        liblustre_waiting = 0;

        return found_something;
}

void
liblustre_wait_idle(void)
{
        static int recursed = 0;

        struct list_head               *tmp;
        struct liblustre_wait_callback *llwc;
        int                             idle = 0;

        LASSERT(!recursed);
        recursed = 1;

        do {
                liblustre_wait_event(0);

                idle = 1;

                list_for_each(tmp, &liblustre_idle_callbacks) {
                        llwc = list_entry(tmp, struct liblustre_wait_callback,
                                          llwc_list);

                        if (!llwc->llwc_fn(llwc->llwc_arg)) {
                                idle = 0;
                                break;
                        }
                }
        } while (!idle);

        recursed = 0;
}

#endif /* __KERNEL__ */
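
/*
 * Module-level entry points: ptlrpc_init_portals() brings up the LNet
 * interface and event queue, registers liblustre's service poller when
 * built for userspace, and starts ptlrpcd via ptlrpcd_addref();
 * ptlrpc_exit_portals() tears this down again.
 */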
int ptlrpc_init_portals(void)
{
        int   rc = ptlrpc_ni_init();

        if (rc != 0) {
                CERROR("network initialisation failed\n");
                return -EIO;
        }
#ifndef __KERNEL__
        liblustre_services_callback =
                liblustre_register_wait_callback("liblustre_check_services",
                                                 &liblustre_check_services,
                                                 NULL);
#endif
        rc = ptlrpcd_addref();
        if (rc == 0)
                return 0;

        CERROR("rpcd initialisation failed\n");
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpc_ni_fini();
        return rc;
}

void ptlrpc_exit_portals(void)
{
#ifndef __KERNEL__
        liblustre_deregister_wait_callback(liblustre_services_callback);
#endif
        ptlrpcd_decref();
        ptlrpc_ni_fini();
}