1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/ptlrpc/import.c
38 * Author: Mike Shaver <shaver@clusterfs.com>
41 #define DEBUG_SUBSYSTEM S_RPC
43 # include <liblustre.h>
46 #include <obd_support.h>
47 #include <lustre_ha.h>
48 #include <lustre_net.h>
49 #include <lustre_import.h>
50 #include <lustre_export.h>
52 #include <obd_class.h>
54 #include "ptlrpc_internal.h"
/*
 * Per-request connect state.  ptlrpc_connect_import() stores it in the
 * request's async args and ptlrpc_connect_interpret() reads it back.
 */
56 struct ptlrpc_connect_async_args {
/* committed transno captured by ptlrpc_connect_import(); the interpret
 * callback compares it with the server's reported last_committed */
57 __u64 pcaa_peer_committed;
/* non-zero when the request is the import's initial connect */
58 int pcaa_initial_connect;
/*
 * Store the new state in imp_state and record it, stamped with
 * cfs_time_current_sec(), in the imp_state_hist entry selected by
 * imp_state_hist_idx; the index is then advanced with a modulo.
 * There is no locking and no CLOSED check at this level -- those are
 * provided by the IMPORT_SET_STATE* macros.
 */
61 static void __import_set_state(struct obd_import *imp,
62 enum lustre_imp_state state)
64 imp->imp_state = state;
65 imp->imp_state_hist[imp->imp_state_hist_idx].ish_state = state;
66 imp->imp_state_hist[imp->imp_state_hist_idx].ish_time =
67 cfs_time_current_sec();
68 imp->imp_state_hist_idx = (imp->imp_state_hist_idx + 1) %
/*
 * IMPORT_SET_STATE_NOLOCK - switch imp to state unless it is already CLOSED.
 * The transition is logged at D_HA and recorded via __import_set_state().
 * The macro takes no lock itself; IMPORT_SET_STATE is the variant that
 * runs it with imp_lock held.
 */
72 /* A CLOSED import should remain so. */
73 #define IMPORT_SET_STATE_NOLOCK(imp, state) \
75 if (imp->imp_state != LUSTRE_IMP_CLOSED) { \
76 CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n", \
77 imp, obd2cli_tgt(imp->imp_obd), \
78 ptlrpc_import_state_name(imp->imp_state), \
79 ptlrpc_import_state_name(state)); \
80 __import_set_state(imp, state); \
/* Locked variant: runs IMPORT_SET_STATE_NOLOCK() with imp_lock held. */
84 #define IMPORT_SET_STATE(imp, state) \
86 spin_lock(&imp->imp_lock); \
87 IMPORT_SET_STATE_NOLOCK(imp, state); \
88 spin_unlock(&imp->imp_lock); \
/* Forward declarations of functions that are used further down in this file. */
92 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
94 int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
/*
 * ptlrpc_init_import - (re)initialise an import's connection state.
 *
 * Under imp_lock, bumps imp_generation and assigns LUSTRE_IMP_NEW to
 * imp_state directly.  The direct assignment bypasses the
 * IMPORT_SET_STATE* macros, which refuse to change a CLOSED import.
 */
96 /* Only this function is allowed to change the import state when it is
97 * CLOSED. I would rather refcount the import and free it after
98 * disconnection like we do with exports. To do that, the client_obd
99 * will need to save the peer info somewhere other than in the import,
101 int ptlrpc_init_import(struct obd_import *imp)
103 spin_lock(&imp->imp_lock);
105 imp->imp_generation++;
106 imp->imp_state = LUSTRE_IMP_NEW;
108 spin_unlock(&imp->imp_lock);
112 EXPORT_SYMBOL(ptlrpc_init_import);
/*
 * deuuidify - locate the printable part of a uuid string without copying it.
 *
 * *uuid_start is set to uuid, or to the character just past prefix when
 * prefix is non-NULL and uuid begins with it.  *uuid_len is set to the
 * length of the string at *uuid_start and is reduced by strlen(UUID_STR)
 * when that string ends in UUID_STR.  The length is compared against
 * strlen(UUID_STR) before the suffix test.  The input string is not
 * modified; callers here print the result with "%.*s".
 */
114 #define UUID_STR "_UUID"
115 static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
118 *uuid_start = !prefix || strncmp(uuid, prefix, strlen(prefix))
119 ? uuid : uuid + strlen(prefix);
121 *uuid_len = strlen(*uuid_start);
123 if (*uuid_len < strlen(UUID_STR))
126 if (!strncmp(*uuid_start + *uuid_len - strlen(UUID_STR),
127 UUID_STR, strlen(UUID_STR)))
128 *uuid_len -= strlen(UUID_STR);
/*
 * ptlrpc_set_import_discon - mark a FULL import as disconnected.
 *
 * With imp_lock held: when the import is FULL and conn_cnt is either 0 or
 * equal to imp_conn_cnt, a console message is printed (a warning for
 * replayable imports, an error otherwise), the state is switched to DISCON,
 * the lock is dropped, the debug log is dumped if obd_dump_on_timeout is
 * set, and IMP_EVENT_DISCON is raised.  In every other case the lock is
 * dropped and only a D_HA message describing the current state is logged.
 */
131 /* Returns true if import was FULL, false if import was already not
133 * @imp - import to be disconnected
134 * @conn_cnt - connection count (epoch) of the request that timed out
135 * and caused the disconnection. In some cases, multiple
136 * inflight requests can fail to a single target (e.g. OST
137 * bulk requests) and if one has already caused a reconnection
138 * (increasing the import->conn_cnt) the older failure should
139 * not also cause a reconnection. If zero it forces a reconnect.
141 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
145 spin_lock(&imp->imp_lock);
147 if (imp->imp_state == LUSTRE_IMP_FULL &&
148 (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
152 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
153 &target_start, &target_len);
154 if (imp->imp_replayable) {
155 LCONSOLE_WARN("%s: Connection to service %.*s via nid "
156 "%s was lost; in progress operations using this "
157 "service will wait for recovery to complete.\n",
158 imp->imp_obd->obd_name, target_len, target_start,
159 libcfs_nid2str(imp->imp_connection->c_peer.nid));
161 LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
162 "%.*s via nid %s was lost; in progress "
163 "operations using this service will fail.\n",
164 imp->imp_obd->obd_name, target_len, target_start,
165 libcfs_nid2str(imp->imp_connection->c_peer.nid));
167 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
168 spin_unlock(&imp->imp_lock);
170 if (obd_dump_on_timeout)
171 libcfs_debug_dumplog();
173 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
176 spin_unlock(&imp->imp_lock);
177 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
178 imp->imp_client->cli_name, imp,
179 (imp->imp_state == LUSTRE_IMP_FULL &&
180 imp->imp_conn_cnt > conn_cnt) ?
181 "reconnected" : "not connected", imp->imp_conn_cnt,
182 conn_cnt, ptlrpc_import_state_name(imp->imp_state));
/*
 * Mark the import invalid and release the caller's imp_lock.
 *
 * Sets imp_invalid and bumps imp_generation while the lock is still held,
 * then drops the lock before calling ptlrpc_abort_inflight() and raising
 * IMP_EVENT_INACTIVE.  The caller must not unlock imp_lock again.
 */
188 /* Must be called with imp_lock held! */
189 static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
192 LASSERT_SPIN_LOCKED(&imp->imp_lock);
194 CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
195 imp->imp_invalid = 1;
196 imp->imp_generation++;
197 spin_unlock(&imp->imp_lock);
199 ptlrpc_abort_inflight(imp);
200 obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
/*
 * Locking wrapper: takes imp_lock and hands it to
 * ptlrpc_deactivate_and_unlock_import(), which releases it.
 */
204 * This acts as a barrier; all existing requests are rejected, and
205 * no new requests will be accepted until the import is valid again.
207 void ptlrpc_deactivate_import(struct obd_import *imp)
209 spin_lock(&imp->imp_lock);
210 ptlrpc_deactivate_and_unlock_import(imp);
/*
 * Per-request helper for ptlrpc_inflight_timeout(), which folds the results
 * together with max().
 *
 * The request is examined in three steps: whether it is in the RPC phase
 * (and not rq_waiting), the BULK phase or the NEW phase; whether it has
 * already timed out (rq_timedout); and whether it is still in the NEW
 * phase.  For the remaining case the value is derived from rq_deadline.
 */
214 ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
218 if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
219 (req->rq_phase == RQ_PHASE_BULK) ||
220 (req->rq_phase == RQ_PHASE_NEW)))
223 if (req->rq_timedout)
226 if (req->rq_phase == RQ_PHASE_NEW)
229 dl = req->rq_deadline;
/*
 * Walk imp_sending_list with imp_lock held and keep the largest
 * ptlrpc_inflight_deadline() value seen, starting from 0.  The current time
 * is sampled once, before the walk, and passed to every call.
 */
237 static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
239 time_t now = cfs_time_current_sec();
240 struct list_head *tmp, *n;
241 struct ptlrpc_request *req;
242 unsigned int timeout = 0;
244 spin_lock(&imp->imp_lock);
245 list_for_each_safe(tmp, n, &imp->imp_sending_list) {
246 req = list_entry(tmp, struct ptlrpc_request, rq_list);
247 timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
249 spin_unlock(&imp->imp_lock);
/*
 * ptlrpc_invalidate_import - invalidate an import and wait for its RPCs.
 *
 * imp_inval_count is raised for the duration of the call.  The import is
 * deactivated when it is not yet invalid or when obd_no_recov is set.  The
 * function then waits on imp_recovery_waitq for imp_inflight to reach zero,
 * using a timeout taken from ptlrpc_inflight_timeout() (plus a third) or
 * obd_timeout.  When a wait fails, the requests still on imp_sending_list
 * and imp_delayed_list are logged, or -- if nothing is in flight -- it is
 * asserted that no request is still unregistering.  Finally
 * IMP_EVENT_INVALIDATE is raised, imp_inval_count is dropped and waiters on
 * imp_recovery_waitq are woken.
 */
254 * This function will invalidate the import, if necessary, then block
255 * for all the RPC completions, and finally notify the obd to
256 * invalidate its state (ie cancel locks, clear pending requests,
259 void ptlrpc_invalidate_import(struct obd_import *imp)
261 struct list_head *tmp, *n;
262 struct ptlrpc_request *req;
263 struct l_wait_info lwi;
264 unsigned int timeout;
267 atomic_inc(&imp->imp_inval_count);
270 * If this is an invalid MGC connection, then don't bother
271 * waiting for imp_inflight to drop to 0.
273 if (imp->imp_invalid && imp->imp_recon_bk &&!imp->imp_obd->obd_no_recov)
276 if (!imp->imp_invalid || imp->imp_obd->obd_no_recov)
277 ptlrpc_deactivate_import(imp);
279 LASSERT(imp->imp_invalid);
281 /* Wait forever until inflight == 0. We really can't do it another
282 * way because in some cases we need to wait for very long reply
283 * unlink. We can't do anything before that because there is really
284 * no guarantee that some rdma transfer is not in progress right now. */
286 /* Calculate max timeout for waiting on rpcs to error
287 * out. Use obd_timeout if calculated value is smaller
289 if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
290 timeout = ptlrpc_inflight_timeout(imp);
291 timeout += timeout / 3;
294 timeout = obd_timeout;
296 /* decrease the interval to increase race condition */
300 CDEBUG(D_RPCTRACE,"Sleeping %d sec for inflight to error out\n",
303 /* Wait for all requests to error out and call completion
304 * callbacks. Cap it at obd_timeout -- these should all
305 * have been locally cancelled by ptlrpc_abort_inflight. */
306 lwi = LWI_TIMEOUT_INTERVAL(
307 cfs_timeout_cap(cfs_time_seconds(timeout)),
308 (timeout > 1)?cfs_time_seconds(1):cfs_time_seconds(1)/2,
310 rc = l_wait_event(imp->imp_recovery_waitq,
311 (atomic_read(&imp->imp_inflight) == 0), &lwi);
313 const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
315 CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
316 cli_tgt, rc, atomic_read(&imp->imp_inflight));
318 spin_lock(&imp->imp_lock);
319 if (atomic_read(&imp->imp_inflight) == 0) {
320 int count = atomic_read(&imp->imp_unregistering);
322 /* We know that "unregistering" rpcs only can
323 * survive in sending or delaying lists (they
324 * maybe waiting for long reply unlink in
325 * sluggish nets). Let's check this. If there
326 * is no inflight and unregistering != 0, this
328 LASSERTF(count == 0, "Some RPCs are still "
329 "unregistering: %d\n", count);
331 /* Let's save one loop as soon as inflight have
332 * dropped to zero. No new inflights possible at
336 list_for_each_safe(tmp, n,
337 &imp->imp_sending_list) {
338 req = list_entry(tmp,
339 struct ptlrpc_request,
341 DEBUG_REQ(D_ERROR, req,
342 "still on sending list");
344 list_for_each_safe(tmp, n,
345 &imp->imp_delayed_list) {
346 req = list_entry(tmp,
347 struct ptlrpc_request,
349 DEBUG_REQ(D_ERROR, req,
350 "still on delayed list");
353 CERROR("%s: RPCs in \"%s\" phase found (%d). "
354 "Network is sluggish? Waiting them "
355 "to error out.\n", cli_tgt,
356 ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
357 atomic_read(&imp->imp_unregistering));
359 spin_unlock(&imp->imp_lock);
363 /* Let's additionally check that no new rpcs added to import in
364 * "invalidate" state. */
365 LASSERT(atomic_read(&imp->imp_inflight) == 0);
368 obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
370 atomic_dec(&imp->imp_inval_count);
371 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
/*
 * Counterpart of ptlrpc_deactivate_import(): clears imp_invalid with
 * imp_lock held, then raises IMP_EVENT_ACTIVE on the import's obd after the
 * lock has been released.
 */
374 /* unset imp_invalid */
375 void ptlrpc_activate_import(struct obd_import *imp)
377 struct obd_device *obd = imp->imp_obd;
379 spin_lock(&imp->imp_lock);
380 imp->imp_invalid = 0;
381 spin_unlock(&imp->imp_lock);
383 obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
/*
 * ptlrpc_fail_import - react to a failed connection on a non-fake import.
 *
 * conn_cnt is passed through to ptlrpc_set_import_discon().  When that call
 * reports that it disconnected the import, a non-replayable import is
 * deactivated.  The function also sets imp_force_verify under imp_lock and
 * wakes the pinger.  Asserts that imp_dlm_fake is not set.
 */
386 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
390 LASSERT(!imp->imp_dlm_fake);
392 if (ptlrpc_set_import_discon(imp, conn_cnt)) {
393 if (!imp->imp_replayable) {
394 CDEBUG(D_HA, "import %s@%s for %s not replayable, "
395 "auto-deactivating\n",
396 obd2cli_tgt(imp->imp_obd),
397 imp->imp_connection->c_remote_uuid.uuid,
398 imp->imp_obd->obd_name);
399 ptlrpc_deactivate_import(imp);
402 CDEBUG(D_HA, "%s: waking up pinger\n",
403 obd2cli_tgt(imp->imp_obd));
405 spin_lock(&imp->imp_lock);
406 imp->imp_force_verify = 1;
407 spin_unlock(&imp->imp_lock);
409 ptlrpc_pinger_wake_up();
/*
 * ptlrpc_reconnect_import - tear the connection down and start a new one.
 *
 * Sequence: force the import to DISCON (conn_cnt 0), invalidate it,
 * disconnect it, wait on imp_recovery_waitq until imp_inval_count is zero
 * (the wait can be interrupted by a signal, which is logged), clear
 * obd_no_recov, re-activate the import and call ptlrpc_recover_import().
 */
414 int ptlrpc_reconnect_import(struct obd_import *imp)
417 ptlrpc_set_import_discon(imp, 0);
418 /* Force a new connect attempt */
419 ptlrpc_invalidate_import(imp);
420 /* Do a fresh connect next time by zeroing the handle */
421 ptlrpc_disconnect_import(imp, 1);
422 /* Wait for all invalidate calls to finish */
423 if (atomic_read(&imp->imp_inval_count) > 0) {
425 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
426 rc = l_wait_event(imp->imp_recovery_waitq,
427 (atomic_read(&imp->imp_inval_count) == 0),
430 CERROR("Interrupted, inval=%d\n",
431 atomic_read(&imp->imp_inval_count));
435 * Allow reconnect attempts. Note: Currently, the function is
436 * only called by MGC. So assume this is a recoverable import,
437 * and force import to be recoverable. fix this if you need to
440 imp->imp_obd->obd_no_recov = 0;
441 /* Remove 'invalid' flag */
442 ptlrpc_activate_import(imp);
443 /* Attempt a new connect */
444 ptlrpc_recover_import(imp, NULL);
448 EXPORT_SYMBOL(ptlrpc_reconnect_import);
/*
 * import_select_connection - choose the connection for the next attempt.
 *
 * Runs with imp_lock held from start to finish.  The candidates on
 * imp_conn_list are compared by oic_last_attempt: one that was never tried,
 * or not tried since imp_last_success_conn, is preferred; otherwise an
 * entry with an older oic_last_attempt is taken.  If nothing was chosen, or
 * imp_force_reconnect is set, imp_conn_current is kept.
 *
 * When every connection has been tried, the choice is the head of the list
 * and imp_recon_bk is not set, the import's net latency estimate is raised
 * by CONNECTION_SWITCH_INC, capped at CONNECTION_SWITCH_MAX.
 *
 * The chosen connection gets a fresh oic_last_attempt stamp and replaces
 * both imp_connection and the DLM export's exp_connection (references are
 * dropped and taken accordingly); imp_conn_current is updated to match.
 */
450 static int import_select_connection(struct obd_import *imp)
452 struct obd_import_conn *imp_conn = NULL, *conn;
453 struct obd_export *dlmexp;
457 spin_lock(&imp->imp_lock);
459 if (list_empty(&imp->imp_conn_list)) {
460 CERROR("%s: no connections available\n",
461 imp->imp_obd->obd_name);
462 spin_unlock(&imp->imp_lock);
466 list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
467 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
468 imp->imp_obd->obd_name,
469 libcfs_nid2str(conn->oic_conn->c_peer.nid),
470 conn->oic_last_attempt);
472 /* If we have not tried this connection since the
473 last successful attempt, go with this one */
474 if ((conn->oic_last_attempt == 0) ||
475 cfs_time_beforeq_64(conn->oic_last_attempt,
476 imp->imp_last_success_conn)) {
482 /* If all of the connections have already been tried
483 since the last successful connection; just choose the
484 least recently used */
487 else if (cfs_time_before_64(conn->oic_last_attempt,
488 imp_conn->oic_last_attempt))
492 /* if not found, simply choose the current one */
493 if (!imp_conn || imp->imp_force_reconnect) {
494 LASSERT(imp->imp_conn_current);
495 imp_conn = imp->imp_conn_current;
498 LASSERT(imp_conn->oic_conn);
500 /* If we've tried everything, and we're back to the beginning of the
501 list, increase our timeout and try again. It will be reset when
502 we do finally connect. (FIXME: really we should wait for all network
503 state associated with the last connection attempt to drain before
504 trying to reconnect on it.) */
505 if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
506 !imp->imp_recon_bk /* not retrying */) {
507 if (at_get(&imp->imp_at.iat_net_latency) <
508 CONNECTION_SWITCH_MAX) {
509 at_measured(&imp->imp_at.iat_net_latency,
510 MIN(at_get(&imp->imp_at.iat_net_latency) +
511 CONNECTION_SWITCH_INC,
512 CONNECTION_SWITCH_MAX));
514 LASSERT(imp_conn->oic_last_attempt);
515 CWARN("%s: tried all connections, increasing latency to %ds\n",
516 imp->imp_obd->obd_name,
517 at_get(&imp->imp_at.iat_net_latency));
520 imp_conn->oic_last_attempt = cfs_time_current_64();
522 /* switch connection, don't mind if it's same as the current one */
523 if (imp->imp_connection)
524 ptlrpc_connection_put(imp->imp_connection);
525 imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
527 dlmexp = class_conn2export(&imp->imp_dlm_handle);
528 LASSERT(dlmexp != NULL);
529 if (dlmexp->exp_connection)
530 ptlrpc_connection_put(dlmexp->exp_connection);
531 dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
532 class_export_put(dlmexp);
534 if (imp->imp_conn_current != imp_conn) {
535 if (imp->imp_conn_current)
536 CDEBUG(D_HA, "Changing connection for %s to %s/%s\n",
537 imp->imp_obd->obd_name, imp_conn->oic_uuid.uuid,
538 libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
539 imp->imp_conn_current = imp_conn;
542 CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
543 imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
544 libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
546 spin_unlock(&imp->imp_lock);
/*
 * ptlrpc_first_transno - report the transno at the head of the replay list.
 *
 * When imp_replay_list is not empty, *transno receives rq_transno of its
 * first request; a zero transno there is logged as an error.  The caller in
 * this file, ptlrpc_connect_import(), stores the return value in
 * set_transno and passes &imp_connect_data.ocd_transno as the output.
 */
552 * must be called under imp lock
554 static int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
556 struct ptlrpc_request *req;
557 struct list_head *tmp;
559 if (list_empty(&imp->imp_replay_list))
561 tmp = imp->imp_replay_list.next;
562 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
563 *transno = req->rq_transno;
564 if (req->rq_transno == 0) {
565 DEBUG_REQ(D_ERROR, req, "zero transno in replay");
/*
 * ptlrpc_connect_import - build and queue an asynchronous connect request.
 *
 * @imp      import to connect
 * @new_uuid when given, converted with obd_str2uuid() and passed to
 *           import_set_conn_priority() before a connection is selected
 *
 * With imp_lock held, a CLOSED, FULL or CONNECTING import is rejected with
 * an error message; otherwise the state becomes CONNECTING,
 * imp_resend_replay is cleared and the first replay transno is fetched into
 * imp_connect_data.ocd_transno.  After import_select_connection() the
 * connect flags are reset to imp_connect_flags_orig, obd_reconnect() is
 * called and the request is prepared with imp_connect_op.
 *
 * The request uses INITIAL_CONNECT_TIMEOUT, is marked no-resend/no-delay,
 * is sent in state CONNECTING, carries its context in a
 * ptlrpc_connect_async_args and is completed by ptlrpc_connect_interpret().
 * It is handed to ptlrpcd_add_req().  The state is put back to DISCON near
 * the end of the function (error path).
 */
572 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
574 struct obd_device *obd = imp->imp_obd;
576 int initial_connect = 0;
578 __u64 committed_before_reconnect = 0;
579 struct ptlrpc_request *request;
580 __u32 size[] = { sizeof(struct ptlrpc_body),
581 sizeof(imp->imp_obd->u.cli.cl_target_uuid),
582 sizeof(obd->obd_uuid),
583 sizeof(imp->imp_dlm_handle),
584 sizeof(imp->imp_connect_data) };
585 char *tmp[] = { NULL,
586 obd2cli_tgt(imp->imp_obd),
588 (char *)&imp->imp_dlm_handle,
589 (char *)&imp->imp_connect_data };
590 struct ptlrpc_connect_async_args *aa;
593 spin_lock(&imp->imp_lock);
594 if (imp->imp_state == LUSTRE_IMP_CLOSED) {
595 spin_unlock(&imp->imp_lock);
596 CERROR("can't connect to a closed import\n");
598 } else if (imp->imp_state == LUSTRE_IMP_FULL) {
599 spin_unlock(&imp->imp_lock);
600 CERROR("already connected\n");
602 } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
603 spin_unlock(&imp->imp_lock);
604 CERROR("already connecting\n");
608 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
611 imp->imp_resend_replay = 0;
613 if (!lustre_handle_is_used(&imp->imp_remote_handle))
616 committed_before_reconnect = imp->imp_peer_committed_transno;
618 set_transno = ptlrpc_first_transno(imp,
619 &imp->imp_connect_data.ocd_transno);
621 spin_unlock(&imp->imp_lock);
624 struct obd_uuid uuid;
626 obd_str2uuid(&uuid, new_uuid);
627 rc = import_set_conn_priority(imp, &uuid);
632 rc = import_select_connection(imp);
636 /* last in connection list */
637 if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
638 if (imp->imp_initial_recov_bk && initial_connect) {
639 CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
640 imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
641 /* Don't retry if connect fails */
643 obd_set_info_async(obd->obd_self_export,
644 sizeof(KEY_INIT_RECOV),
646 sizeof(rc), &rc, NULL);
648 if (imp->imp_recon_bk) {
649 CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
650 imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
651 spin_lock(&imp->imp_lock);
652 imp->imp_last_recon = 1;
653 spin_unlock(&imp->imp_lock);
657 /* Reset connect flags to the originally requested flags, in case
658 * the server is updated on-the-fly we will get the new features. */
659 imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
660 imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
662 rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
663 &obd->obd_uuid, &imp->imp_connect_data, NULL);
667 request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
670 GOTO(out, rc = -ENOMEM);
672 /* Report the rpc service time to the server so that it knows how long
673 * to wait for clients to join recovery */
674 lustre_msg_set_service_time(request->rq_reqmsg,
675 at_timeout2est(request->rq_timeout));
677 /* The amount of time we give the server to process the connect req.
678 * import_select_connection will increase the net latency on
679 * repeated reconnect attempts to cover slow networks.
680 * We override/ignore the server rpc completion estimate here,
681 * which may be large if this is a reconnect attempt */
682 request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
683 lustre_msg_set_timeout(request->rq_reqmsg, request->rq_timeout);
686 lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
688 if (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V1)
689 lustre_msg_add_op_flags(request->rq_reqmsg,
690 MSG_CONNECT_NEXT_VER);
692 request->rq_no_resend = request->rq_no_delay = 1;
693 request->rq_send_state = LUSTRE_IMP_CONNECTING;
694 /* Allow a slightly larger reply for future growth compatibility */
695 size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
697 ptlrpc_req_set_repsize(request, 2, size);
698 request->rq_interpret_reply = ptlrpc_connect_interpret;
700 CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
701 aa = ptlrpc_req_async_args(request);
702 memset(aa, 0, sizeof *aa);
704 aa->pcaa_peer_committed = committed_before_reconnect;
705 aa->pcaa_initial_connect = initial_connect;
706 if (aa->pcaa_initial_connect) {
707 spin_lock(&imp->imp_lock);
708 imp->imp_replayable = 1;
709 spin_unlock(&imp->imp_lock);
710 lustre_msg_add_op_flags(request->rq_reqmsg,
711 MSG_CONNECT_INITIAL);
715 lustre_msg_add_op_flags(request->rq_reqmsg,
716 MSG_CONNECT_TRANSNO);
718 DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
719 aa->pcaa_initial_connect ? "initial " : "re",
721 ptlrpcd_add_req(request);
725 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
730 EXPORT_SYMBOL(ptlrpc_connect_import);
/*
 * Called from the failure path of ptlrpc_connect_interpret() to get the
 * next reconnect attempt scheduled.
 */
732 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
735 /* the pinger takes care of issuing the next reconnect request */
738 /* liblustre has no pinger thread, so we wake up the pinger anyway */
739 ptlrpc_pinger_wake_up();
/*
 * Returns non-zero when rc is -EBUSY or -EAGAIN.  ptlrpc_connect_interpret()
 * stores the result in imp_force_reconnect.
 */
743 static int ptlrpc_busy_reconnect(int rc)
745 return (rc == -EBUSY) || (rc == -EAGAIN);
/*
 * ptlrpc_connect_interpret - reply callback for the connect request queued
 * by ptlrpc_connect_import(); data is the request's
 * ptlrpc_connect_async_args.
 *
 * Outline of the code below:
 *  - a CLOSED import is left alone; imp_force_reconnect is set from
 *    ptlrpc_busy_reconnect(rc);
 *  - initial connect: imp_replayable and imp_msg_magic are taken from the
 *    reply, imp_remote_handle is stored, and the import goes to
 *    REPLAY_LOCKS (server recovering) or FULL (and is activated);
 *  - reconnect: from the reply's MSG_CONNECT_* flags and a comparison of
 *    the returned handle with imp_remote_handle the import is moved to
 *    EVICTED, REPLAY or RECOVER, then the recovery state machine is run;
 *    -ENOTCONN from it triggers a new ptlrpc_connect_import();
 *  - on success the current connection is moved to the head of
 *    imp_conn_list, the returned obd_connect_data is stored and applied
 *    (export connect flags, version warning, checksum types, brw size,
 *    namespace connect flags, MSGHDR_AT_SUPPORT);
 *  - on failure the import returns to DISCON and may be deactivated; an
 *    -EPROTO reply with a mismatching version closes the import; otherwise
 *    the pinger is asked to retry.
 * imp_last_recon is cleared and imp_recovery_waitq is woken at the end.
 */
748 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
751 struct ptlrpc_connect_async_args *aa = data;
752 struct obd_import *imp = request->rq_import;
753 struct client_obd *cli = &imp->imp_obd->u.cli;
754 struct lustre_handle old_hdl;
755 __u64 old_connect_flags;
759 spin_lock(&imp->imp_lock);
760 if (imp->imp_state == LUSTRE_IMP_CLOSED) {
761 spin_unlock(&imp->imp_lock);
766 /* if this reconnect to busy export - not need select new target
768 imp->imp_force_reconnect = ptlrpc_busy_reconnect(rc);
769 spin_unlock(&imp->imp_lock);
773 LASSERT(imp->imp_conn_current);
775 msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
777 /* All imports are pingable */
778 imp->imp_pingable = 1;
779 imp->imp_force_reconnect = 0;
781 if (aa->pcaa_initial_connect) {
782 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
783 imp->imp_replayable = 1;
784 spin_unlock(&imp->imp_lock);
785 CDEBUG(D_HA, "connected to replayable target: %s\n",
786 obd2cli_tgt(imp->imp_obd));
788 imp->imp_replayable = 0;
789 spin_unlock(&imp->imp_lock);
792 if ((request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V1 &&
793 msg_flags & MSG_CONNECT_NEXT_VER) ||
794 request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2) {
795 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
796 CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
797 obd2cli_tgt(imp->imp_obd));
799 CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
800 obd2cli_tgt(imp->imp_obd));
803 imp->imp_remote_handle =
804 *lustre_msg_get_handle(request->rq_repmsg);
806 /* Initial connects are allowed for clients with non-random
807 * uuids when servers are in recovery. Simply signal the
808 * servers replay is complete and wait in REPLAY_WAIT. */
809 if (msg_flags & MSG_CONNECT_RECOVERING) {
810 CDEBUG(D_HA, "connect to %s during recovery\n",
811 obd2cli_tgt(imp->imp_obd));
812 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
814 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
815 ptlrpc_activate_import(imp);
817 GOTO(finish, rc = 0);
819 spin_unlock(&imp->imp_lock);
822 /* Determine what recovery state to move the import to. */
823 if (MSG_CONNECT_RECONNECT & msg_flags) {
824 memset(&old_hdl, 0, sizeof(old_hdl));
825 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
827 CERROR("%s@%s didn't like our handle "LPX64
828 ", failed\n", obd2cli_tgt(imp->imp_obd),
829 imp->imp_connection->c_remote_uuid.uuid,
830 imp->imp_dlm_handle.cookie);
831 GOTO(out, rc = -ENOTCONN);
834 if (memcmp(&imp->imp_remote_handle,
835 lustre_msg_get_handle(request->rq_repmsg),
836 sizeof(imp->imp_remote_handle))) {
837 int level = msg_flags & MSG_CONNECT_RECOVERING ?
840 /* Bug 16611/14775: if server handle have changed,
841 * that means some sort of disconnection happened.
842 * If the server is not in recovery, that also means it
843 * already erased all of our state because of previous
844 * eviction. If it is in recovery - we are safe to
845 * participate since we can reestablish all of our state
846 * with server again */
847 CDEBUG(level,"%s@%s changed server handle from "
848 LPX64" to "LPX64"%s\n",
849 obd2cli_tgt(imp->imp_obd),
850 imp->imp_connection->c_remote_uuid.uuid,
851 imp->imp_remote_handle.cookie,
852 lustre_msg_get_handle(request->rq_repmsg)->
854 (MSG_CONNECT_RECOVERING & msg_flags) ?
855 " but is still in recovery" : "");
857 imp->imp_remote_handle =
858 *lustre_msg_get_handle(request->rq_repmsg);
860 if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
861 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
862 GOTO(finish, rc = 0);
866 CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
867 obd2cli_tgt(imp->imp_obd),
868 imp->imp_connection->c_remote_uuid.uuid);
871 if (imp->imp_invalid) {
872 CDEBUG(D_HA, "%s: reconnected but import is invalid; "
873 "marking evicted\n", imp->imp_obd->obd_name);
874 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
875 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
876 CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
877 imp->imp_obd->obd_name,
878 obd2cli_tgt(imp->imp_obd));
880 spin_lock(&imp->imp_lock);
881 imp->imp_resend_replay = 1;
882 /* VBR: delayed connection */
883 if (MSG_CONNECT_DELAYED & msg_flags) {
884 imp->imp_delayed_recovery = 1;
885 imp->imp_no_lock_replay = 1;
887 spin_unlock(&imp->imp_lock);
889 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
891 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
893 } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
894 LASSERT(imp->imp_replayable);
895 imp->imp_remote_handle =
896 *lustre_msg_get_handle(request->rq_repmsg);
897 imp->imp_last_replay_transno = 0;
898 /* VBR: delayed connection */
899 if (MSG_CONNECT_DELAYED & msg_flags) {
900 spin_lock(&imp->imp_lock);
901 imp->imp_delayed_recovery = 1;
902 imp->imp_no_lock_replay = 1;
903 spin_unlock(&imp->imp_lock);
905 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
907 DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
908 "flags reconnect/recovering not set: %x)",msg_flags);
909 imp->imp_remote_handle =
910 *lustre_msg_get_handle(request->rq_repmsg);
911 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
914 /* Sanity checks for a reconnected import. */
915 if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
916 CERROR("imp_replayable flag does not match server "
917 "after reconnect. We should LBUG right here.\n");
920 if (lustre_msg_get_last_committed(request->rq_repmsg) > 0 &&
921 lustre_msg_get_last_committed(request->rq_repmsg) <
922 aa->pcaa_peer_committed) {
923 CERROR("%s went back in time (transno "LPD64
924 " was previously committed, server now claims "LPD64
925 ")! See https://bugzilla.lustre.org/show_bug.cgi?"
927 obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
928 lustre_msg_get_last_committed(request->rq_repmsg));
932 rc = ptlrpc_import_recovery_state_machine(imp);
934 if (rc == -ENOTCONN) {
935 CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
936 "invalidating and reconnecting\n",
937 obd2cli_tgt(imp->imp_obd),
938 imp->imp_connection->c_remote_uuid.uuid);
939 ptlrpc_connect_import(imp, NULL);
943 struct obd_connect_data *ocd;
944 struct obd_export *exp;
946 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
947 lustre_swab_connect);
948 spin_lock(&imp->imp_lock);
949 list_del(&imp->imp_conn_current->oic_item);
950 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
951 imp->imp_last_success_conn =
952 imp->imp_conn_current->oic_last_attempt;
955 spin_unlock(&imp->imp_lock);
956 CERROR("Wrong connect data from server\n");
961 imp->imp_connect_data = *ocd;
963 exp = class_conn2export(&imp->imp_dlm_handle);
964 spin_unlock(&imp->imp_lock);
966 /* check that server granted subset of flags we asked for. */
967 LASSERTF((ocd->ocd_connect_flags &
968 imp->imp_connect_flags_orig) ==
969 ocd->ocd_connect_flags, LPX64" != "LPX64,
970 imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
973 /* This could happen if export is cleaned during the
975 CERROR("Missing export for %s\n",
976 imp->imp_obd->obd_name);
977 GOTO(out, rc = -ENODEV);
979 old_connect_flags = exp->exp_connect_flags;
980 exp->exp_connect_flags = ocd->ocd_connect_flags;
981 imp->imp_obd->obd_self_export->exp_connect_flags =
982 ocd->ocd_connect_flags;
983 class_export_put(exp);
985 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
987 if (!ocd->ocd_ibits_known &&
988 ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
989 CERROR("Inodebits aware server returned zero compatible"
992 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
993 (ocd->ocd_version > LUSTRE_VERSION_CODE +
994 LUSTRE_VERSION_OFFSET_WARN ||
995 ocd->ocd_version < LUSTRE_VERSION_CODE -
996 LUSTRE_VERSION_OFFSET_WARN)) {
997 /* Sigh, some compilers do not like #ifdef in the middle
998 of macro arguments */
1001 "older. Consider upgrading this client";
1004 "older. Consider recompiling this application";
1006 const char *newer = "newer than client version";
1008 LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
1009 "is much %s (%s)\n",
1010 obd2cli_tgt(imp->imp_obd),
1011 OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1012 OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1013 OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1014 OBD_OCD_VERSION_FIX(ocd->ocd_version),
1015 ocd->ocd_version > LUSTRE_VERSION_CODE ?
1016 newer : older, LUSTRE_VERSION_STRING);
1019 if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
1020 /* We sent to the server ocd_cksum_types with bits set
1021 * for algorithms we understand. The server masked off
1022 * the checksum types it doesn't support */
1023 if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
1024 LCONSOLE_WARN("The negotiation of the checksum "
1025 "alogrithm to use with server %s "
1026 "failed (%x/%x), disabling "
1028 obd2cli_tgt(imp->imp_obd),
1029 ocd->ocd_cksum_types,
1031 cli->cl_checksum = 0;
1032 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1033 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1035 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
1037 if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
1038 cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
1039 else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
1040 cli->cl_cksum_type = OBD_CKSUM_ADLER;
1042 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1045 /* The server does not support OBD_CONNECT_CKSUM.
1046 * Enforce CRC32 for backward compatibility*/
1047 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1048 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1051 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
1052 cli->cl_max_pages_per_rpc =
1053 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
1056 /* Reset ns_connect_flags only for initial connect. It might be
1057 * changed while using FS and if we reset it in reconnect
1058 * this leads to losing user settings done before such as
1059 * disable lru_resize, etc. */
1060 if (old_connect_flags != exp->exp_connect_flags ||
1061 aa->pcaa_initial_connect) {
1062 CDEBUG(D_HA, "%s: Resetting ns_connect_flags to server "
1063 "flags: "LPX64"\n", imp->imp_obd->obd_name,
1064 ocd->ocd_connect_flags);
1065 imp->imp_obd->obd_namespace->ns_connect_flags =
1066 ocd->ocd_connect_flags;
1067 imp->imp_obd->obd_namespace->ns_orig_connect_flags =
1068 ocd->ocd_connect_flags;
1071 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
1072 (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
1073 /* We need a per-message support flag, because
1074 a. we don't know if the incoming connect reply
1075 supports AT or not (in reply_in_callback)
1077 b. failovered server means export and flags are gone
1078 (in ptlrpc_send_reply).
1079 Can only be set when we know AT is supported at
1081 imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
1083 imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
1085 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
1086 (cli->cl_max_pages_per_rpc > 0));
1091 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
1092 spin_lock(&imp->imp_lock);
1093 if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
1094 (request->rq_import_generation == imp->imp_generation))
1095 ptlrpc_deactivate_and_unlock_import(imp);
1097 spin_unlock(&imp->imp_lock);
1099 if (imp->imp_recon_bk && imp->imp_last_recon) {
1100 /* Give up trying to reconnect */
1101 imp->imp_obd->obd_no_recov = 1;
1102 ptlrpc_deactivate_import(imp);
1105 if (rc == -EPROTO) {
1106 struct obd_connect_data *ocd;
1107 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
1109 lustre_swab_connect);
1111 (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1112 (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
1113 /* Actually servers are only supposed to refuse
1114 connection from liblustre clients, so we should
1115 never see this from VFS context */
1116 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
1118 " refused connection from this client "
1119 "with an incompatible version (%s). "
1120 "Client must be recompiled\n",
1121 obd2cli_tgt(imp->imp_obd),
1122 OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1123 OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1124 OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1125 OBD_OCD_VERSION_FIX(ocd->ocd_version),
1126 LUSTRE_VERSION_STRING);
1127 ptlrpc_deactivate_import(imp);
1128 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
1133 ptlrpc_maybe_ping_import_soon(imp);
1135 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
1136 obd2cli_tgt(imp->imp_obd),
1137 (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
1140 spin_lock(&imp->imp_lock);
1141 imp->imp_last_recon = 0;
1142 spin_unlock(&imp->imp_lock);
1144 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
/*
 * Reply callback for the replay-completion ping; signal_completed_replay()
 * installs it as rq_interpret_reply.
 *
 * It first drops the imp_replay_inflight count taken when the ping was
 * queued.  When the request status is 0 and imp_vbr_failed is not set, the
 * recovery state machine is run again.  The remaining lines log the reason
 * (version recovery failure or a LAST_REPLAY error) and call
 * ptlrpc_connect_import() to reconnect.
 */
1148 static int completed_replay_interpret(struct ptlrpc_request *req,
1149 void * data, int rc)
/* Balance the atomic_inc() done in signal_completed_replay(). */
1152 atomic_dec(&req->rq_import->imp_replay_inflight);
/* Success path: continue recovery from the import's current state. */
1153 if (req->rq_status == 0 &&
1154 !req->rq_import->imp_vbr_failed) {
1155 ptlrpc_import_recovery_state_machine(req->rq_import);
/* Failure path: report why, then start a new connect on this import. */
1157 if (req->rq_import->imp_vbr_failed) {
1159 "%s: version recovery fails, reconnecting\n",
1160 req->rq_import->imp_obd->obd_name);
1162 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
1164 req->rq_import->imp_obd->obd_name,
1167 ptlrpc_connect_import(req->rq_import, NULL);
/*
 * Queue the ping that carries the "replay done" flags for this import.
 *
 * An OBD_PING request is prepared on imp, flagged with at least
 * MSG_LOCK_REPLAY_DONE and MSG_REQ_REPLAY_DONE (plus MSG_DELAY_REPLAY when
 * imp_delayed_recovery is set), marked sendable while the import is in
 * LUSTRE_IMP_REPLAY_WAIT, and handed to ptlrpcd.  The reply is processed by
 * completed_replay_interpret().
 */
1172 static int signal_completed_replay(struct obd_import *imp)
1174 struct ptlrpc_request *req;
/* No replay may be in flight here; the ping itself is then counted as one.
 * completed_replay_interpret() drops the count when the reply arrives. */
1177 LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
1178 atomic_inc(&imp->imp_replay_inflight);
1180 req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
/* NOTE(review): presumably the request-allocation failure path, which
 * undoes the in-flight count taken above -- confirm. */
1182 atomic_dec(&imp->imp_replay_inflight);
1186 ptlrpc_req_set_repsize(req, 1, NULL);
/* Allow the request to be sent while the import is in REPLAY_WAIT. */
1187 req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
1188 lustre_msg_add_flags(req->rq_reqmsg,
1189 MSG_LOCK_REPLAY_DONE |
1190 MSG_REQ_REPLAY_DONE |
1193 if (imp->imp_delayed_recovery)
1194 lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
1195 req->rq_interpret_reply = completed_replay_interpret;
/* This request is given three times its prepared timeout. */
1198 req->rq_timeout *= 3;
/* Asynchronous send: ptlrpcd owns the request from here. */
1200 ptlrpcd_add_req(req);
/*
 * Thread body started by ptlrpc_import_recovery_state_machine() (through
 * cfs_kernel_thread()) when the import has been evicted.
 *
 * data is the struct obd_import to invalidate.  The starter takes a
 * reference with class_import_get() before creating the thread; that
 * reference is released here with class_import_put() once the import has
 * been invalidated, moved to LUSTRE_IMP_RECOVER and the recovery state
 * machine has been run again.
 */
1205 static int ptlrpc_invalidate_import_thread(void *data)
1207 struct obd_import *imp = data;
1211 cfs_daemonize_ctxt("ll_imp_inval");
1213 CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
1214 imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
1215 imp->imp_connection->c_remote_uuid.uuid);
1217 ptlrpc_invalidate_import(imp);
/* Optionally dump the debug log, controlled by obd_dump_on_eviction. */
1219 if (obd_dump_on_eviction) {
1220 CERROR("dump the log upon eviction\n");
1221 libcfs_debug_dumplog();
/* Invalidation is done: resume recovery from the RECOVER state. */
1224 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1225 ptlrpc_import_recovery_state_machine(imp);
/* Drop the reference taken by the code that started this thread. */
1227 class_import_put(imp);
/*
 * Advance recovery of an import according to imp->imp_state.
 *
 * The state checks below are evaluated one after another, so a single call
 * can move the import through several states (unless an exit path that is
 * not visible here leaves early):
 *
 *   EVICTED      - report the eviction, clear imp_vbr_failed and invalidate
 *                  the import (in a separate thread that takes its own
 *                  import reference).
 *   REPLAY       - replay the next request via ptlrpc_replay_next(); once
 *                  nothing is in flight, switch to REPLAY_LOCKS and call
 *                  ldlm_replay_locks().
 *   REPLAY_LOCKS - once imp_replay_inflight is 0, switch to REPLAY_WAIT and
 *                  call signal_completed_replay().
 *   REPLAY_WAIT  - once imp_replay_inflight is 0, switch to RECOVER.
 *   RECOVER      - resend via ptlrpc_resend(), switch to FULL, activate the
 *                  import and print the "Connection restored" message.
 *   FULL         - wake waiters on imp_recovery_waitq and delayed requests.
 */
1232 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1240 if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1241 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1242 &target_start, &target_len);
1243 /* Don't care about MGC eviction */
1244 if (strcmp(imp->imp_obd->obd_type->typ_name,
1245 LUSTRE_MGC_NAME) != 0) {
1246 LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
1247 "%.*s; in progress operations using "
1248 "this service will fail.\n",
1249 target_len, target_start);
1251 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1252 obd2cli_tgt(imp->imp_obd),
1253 imp->imp_connection->c_remote_uuid.uuid);
1254 /* reset vbr_failed flag upon eviction */
1255 spin_lock(&imp->imp_lock);
1256 imp->imp_vbr_failed = 0;
1257 spin_unlock(&imp->imp_lock);
1260 /* bug 17802: XXX client_disconnect_export vs connect request
1261 * race. If the client is evicted at this time, the invalidate
1262 * thread would start without a reference to the import, and the
1263 * import could be freed at the same time. */
1264 class_import_get(imp);
1265 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
1266 CLONE_VM | CLONE_FILES);
/* The thread normally drops the reference itself; the put below is paired
 * with the thread-start error message, so it covers the failed-start case. */
1268 class_import_put(imp);
1269 CERROR("error starting invalidate thread: %d\n", rc);
/* Direct (same-context) invalidation followed by the move to RECOVER.
 * NOTE(review): how this path is selected versus the thread above is not
 * visible in these lines -- confirm. */
1275 ptlrpc_invalidate_import(imp);
1277 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1281 if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1282 CDEBUG(D_HA, "replay requested by %s\n",
1283 obd2cli_tgt(imp->imp_obd));
1284 rc = ptlrpc_replay_next(imp, &inflight);
/* Nothing was queued by this call and nothing is outstanding: request
 * replay is finished, move on to lock replay. */
1285 if (inflight == 0 &&
1286 atomic_read(&imp->imp_replay_inflight) == 0) {
1287 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1288 rc = ldlm_replay_locks(imp);
1295 if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1296 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1297 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
1298 rc = signal_completed_replay(imp);
1305 if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1306 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1307 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1311 if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1312 CDEBUG(D_HA, "reconnected to %s@%s\n",
1313 obd2cli_tgt(imp->imp_obd),
1314 imp->imp_connection->c_remote_uuid.uuid);
1316 rc = ptlrpc_resend(imp);
1319 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1320 ptlrpc_activate_import(imp);
1322 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1323 &target_start, &target_len);
1324 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1325 "using nid %s.\n", imp->imp_obd->obd_name,
1326 target_len, target_start,
1327 libcfs_nid2str(imp->imp_connection->c_peer.nid));
/* Fully recovered: release anyone sleeping on the recovery waitq and any
 * requests that were delayed while the import was not FULL. */
1330 if (imp->imp_state == LUSTRE_IMP_FULL) {
1331 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1332 ptlrpc_wake_delayed(imp);
/*
 * Disconnect an import from its target.
 *
 * The DISCONNECT opcode is chosen to match the opcode the import connected
 * with.  If the import is in recovery, the caller first waits (bounded by a
 * timeout) for recovery to end.  A DISCONNECT request is then sent
 * synchronously with resend disabled, and finally, under imp_lock, the
 * import state is set to DISCON or CLOSED, the remote handle is cleared and
 * imp_last_recon is reset.
 *
 * NOTE(review): noclose presumably selects DISCON instead of CLOSED as the
 * final state; the selecting condition is not visible in these lines.
 */
1339 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1341 struct ptlrpc_request *req;
/* obd_force is sampled up front; it is stored as "nowait". */
1343 int nowait = imp->imp_obd->obd_force;
1347 GOTO(set_state, rc);
/* Map the connect opcode used by this import to its DISCONNECT opcode. */
1349 switch (imp->imp_connect_op) {
1350 case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1351 case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1352 case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1354 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1355 obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
/* Recovery in progress: wait for it to finish.  The wait is bounded either
 * by obd_timeout or by the adaptive service estimate recorded for the
 * client's request portal. */
1359 if (ptlrpc_import_in_recovery(imp)) {
1360 struct l_wait_info lwi;
1361 cfs_duration_t timeout;
1364 timeout = cfs_time_seconds(obd_timeout);
1366 int idx = import_at_get_index(imp,
1367 imp->imp_client->cli_request_portal);
1368 timeout = cfs_time_seconds(
1369 at_get(&imp->imp_at.iat_service_estimate[idx]));
1371 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout),
1372 back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1373 rc = l_wait_event(imp->imp_recovery_waitq,
1374 !ptlrpc_import_in_recovery(imp), &lwi);
/* The import state is examined under imp_lock before a request is built.
 * NOTE(review): the action taken when the state is not FULL is not visible
 * here -- presumably the RPC is skipped. */
1377 spin_lock(&imp->imp_lock);
1378 if (imp->imp_state != LUSTRE_IMP_FULL)
1381 spin_unlock(&imp->imp_lock);
1383 req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1385 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1386 * it fails. We can get through the above with a down server
1387 * if the client doesn't know the server is gone yet. */
1388 req->rq_no_resend = 1;
1391 /* We want client umounts to happen quickly, no matter the
1393 req->rq_timeout = min_t(int, req->rq_timeout,
1394 INITIAL_CONNECT_TIMEOUT);
1396 /* ... but we always want liblustre clients to nicely
1397 disconnect, so only use the adaptive value. */
1399 req->rq_timeout = obd_timeout / 3;
/* The import is put in CONNECTING and the request is marked sendable in
 * that state, then sent synchronously and released. */
1402 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1403 req->rq_send_state = LUSTRE_IMP_CONNECTING;
1404 ptlrpc_req_set_repsize(req, 1, NULL);
1405 rc = ptlrpc_queue_wait(req);
1406 ptlrpc_req_finished(req);
/* Final bookkeeping under imp_lock: set the resulting state, forget the
 * server-side handle and allow every connection to be tried again. */
1410 spin_lock(&imp->imp_lock);
1413 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1415 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1416 memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1417 /* Try all connections in the future - bz 12758 */
1418 imp->imp_last_recon = 0;
1419 spin_unlock(&imp->imp_lock);
/* Thin wrapper: forwards count to LNetSetAsync() for the peer of this
 * import's connection (imp->imp_connection->c_peer). */
1424 /* Sets maximal number of RPCs possible originating from other side of this
1425 import (server) to us and number of async RPC replies that we are not waiting
1427 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1429 LNetSetAsync(imp->imp_connection->c_peer, count);
/*
 * Shut an import down: under imp_lock the state is set to LUSTRE_IMP_CLOSED
 * and imp_generation is incremented; after the lock is released the
 * in-flight requests are aborted with ptlrpc_abort_inflight().
 */
1432 void ptlrpc_cleanup_imp(struct obd_import *imp)
1436 spin_lock(&imp->imp_lock);
1437 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1438 imp->imp_generation++;
1439 spin_unlock(&imp->imp_lock);
1440 ptlrpc_abort_inflight(imp);
1445 /* Adaptive Timeout utils */
/* Adaptive-timeout tunables defined elsewhere.  In at_measured(), at_min
 * and at_max clamp at_current, and at_history / AT_BINS gives the width of
 * one history bin in seconds. */
1446 extern unsigned int at_min, at_max, at_history;
1448 /* Bin into timeslices using AT_BINS bins.
1449 This gives us a max of the last binlimit*AT_BINS secs without the storage,
1450 but still smoothing out a return to normalcy from a slow response.
1451 (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
/*
 * Record a new measurement val in the adaptive timeout at.
 *
 * at_hist[0] is the bin for the current time slice; at_current tracks the
 * maximum over the history and is finally clamped to [at_min, at_max]
 * (or set to val alone when AT_FLG_NOHIST is set).  at_worst_ever and
 * at_worst_time remember the largest at_current seen.  All updates are made
 * under at->at_lock.
 *
 * The value computed at the end is the previous at_current if the estimate
 * changed, and 0 otherwise; NOTE(review): presumably that is what the
 * function returns -- the return statement is not visible here.
 */
1452 int at_measured(struct adaptive_timeout *at, unsigned int val)
1454 unsigned int old = at->at_current;
1455 time_t now = cfs_time_current_sec();
/* Width of one history bin in seconds; never less than 1. */
1456 time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1459 CDEBUG(D_OTHER, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
1460 val, at, now - at->at_binstart, at->at_current,
1461 at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1464 /* 0's don't count, because we never want our timeout to
1465 drop to 0, and because 0 could mean an error */
1468 spin_lock(&at->at_lock);
/* at_binstart == 0 marks an estimate that has not been measured yet. */
1470 if (unlikely(at->at_binstart == 0)) {
1471 /* Special case to remove default from history */
1472 at->at_current = val;
1473 at->at_worst_ever = val;
1474 at->at_worst_time = now;
1475 at->at_hist[0] = val;
1476 at->at_binstart = now;
/* Still inside the current bin: only ever raise the recorded maxima. */
1477 } else if (now - at->at_binstart < binlimit ) {
1479 at->at_hist[0] = max(val, at->at_hist[0]);
1480 at->at_current = max(val, at->at_current);
/* One or more bin widths have elapsed: age the history by that many bins,
 * start a new current bin with val and recompute the maximum. */
1483 unsigned int maxv = val;
1484 /* move bins over */
1485 shift = (now - at->at_binstart) / binlimit;
1487 for(i = AT_BINS - 1; i >= 0; i--) {
1489 at->at_hist[i] = at->at_hist[i - shift];
1490 maxv = max(maxv, at->at_hist[i]);
1495 at->at_hist[0] = val;
1496 at->at_current = maxv;
1497 at->at_binstart += shift * binlimit;
/* Track the largest estimate ever seen and when it happened. */
1500 if (at->at_current > at->at_worst_ever) {
1501 at->at_worst_ever = at->at_current;
1502 at->at_worst_time = now;
1505 if (at->at_flags & AT_FLG_NOHIST)
1506 /* Only keep last reported val; keeping the rest of the history
1508 at->at_current = val;
1511 at->at_current = min(at->at_current, at_max);
1512 at->at_current = max(at->at_current, at_min);
1514 if (at->at_current != old)
1515 CDEBUG(D_OTHER, "AT %p change: old=%u new=%u delta=%d "
1516 "(val=%u) hist %u %u %u %u\n", at,
1517 old, at->at_current, at->at_current - old, val,
1518 at->at_hist[0], at->at_hist[1], at->at_hist[2],
1521 /* if we changed, report the old value */
1522 old = (at->at_current != old) ? old : 0;
1524 spin_unlock(&at->at_lock);
1528 /* Find the imp_at index for a given portal; assign if space available */
/*
 * imp->imp_at.iat_portal[] maps slot index -> portal number, with 0 marking
 * an unused slot.  The table is first scanned without imp_lock; if the
 * portal is not found, the scan resumes from the same index under imp_lock
 * and the first free slot is claimed for the portal.  Running out of slots
 * trips the LASSERT.
 *
 * NOTE(review): presumably the slot index i is returned; the return
 * statements are not visible in these lines.
 */
1529 int import_at_get_index(struct obd_import *imp, int portal)
1531 struct imp_at *at = &imp->imp_at;
/* First pass, without the lock: stop at a match or at the first unused slot. */
1534 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1535 if (at->iat_portal[i] == portal)
1537 if (at->iat_portal[i] == 0)
1542 /* Not found in list, add it under a lock */
1543 spin_lock(&imp->imp_lock);
/* Re-check from index i onward: the table may have changed before the lock
 * was taken, so the portal or a later free slot may now be found. */
1545 /* Check unused under lock */
1546 for (; i < IMP_AT_MAX_PORTALS; i++) {
1547 if (at->iat_portal[i] == portal)
1549 if (at->iat_portal[i] == 0)
1554 /* Not enough portals? */
1555 LASSERT(i < IMP_AT_MAX_PORTALS);
/* Claim slot i for this portal. */
1557 at->iat_portal[i] = portal;
1559 spin_unlock(&imp->imp_lock);