1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/ptlrpc/import.c
38 * Author: Mike Shaver <shaver@clusterfs.com>
41 #define DEBUG_SUBSYSTEM S_RPC
43 # include <liblustre.h>
46 #include <obd_support.h>
47 #include <lustre_ha.h>
48 #include <lustre_net.h>
49 #include <lustre_import.h>
50 #include <lustre_export.h>
52 #include <obd_class.h>
54 #include "ptlrpc_internal.h"
56 struct ptlrpc_connect_async_args {
57 __u64 pcaa_peer_committed;
58 int pcaa_initial_connect;
61 static void __import_set_state(struct obd_import *imp,
62 enum lustre_imp_state state)
64 imp->imp_state = state;
65 imp->imp_state_hist[imp->imp_state_hist_idx].ish_state = state;
66 imp->imp_state_hist[imp->imp_state_hist_idx].ish_time =
67 cfs_time_current_sec();
68 imp->imp_state_hist_idx = (imp->imp_state_hist_idx + 1) %
/* A CLOSED import should remain so.  Caller must hold imp_lock.
 * The do { } while(0) wrapper makes the macro safe as a single
 * statement (e.g. in an unbraced if). */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if (imp->imp_state != LUSTRE_IMP_CLOSED) {                             \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      imp, obd2cli_tgt(imp->imp_obd),                          \
                      ptlrpc_import_state_name(imp->imp_state),                \
                      ptlrpc_import_state_name(state));                        \
               __import_set_state(imp, state);                                 \
        }                                                                      \
} while(0)
/* Locking variant: takes imp_lock around the state change. */
#define IMPORT_SET_STATE(imp, state)                    \
do {                                                    \
        spin_lock(&imp->imp_lock);                      \
        IMPORT_SET_STATE_NOLOCK(imp, state);            \
        spin_unlock(&imp->imp_lock);                    \
} while(0)
92 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
94 int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
96 /* Only this function is allowed to change the import state when it is
97 * CLOSED. I would rather refcount the import and free it after
98 * disconnection like we do with exports. To do that, the client_obd
99 * will need to save the peer info somewhere other than in the import,
101 int ptlrpc_init_import(struct obd_import *imp)
103 spin_lock(&imp->imp_lock);
105 imp->imp_generation++;
106 imp->imp_state = LUSTRE_IMP_NEW;
108 spin_unlock(&imp->imp_lock);
112 EXPORT_SYMBOL(ptlrpc_init_import);
#define UUID_STR "_UUID"
/*
 * Locate the "interesting" part of a uuid string without modifying it:
 * *uuid_start points just past @prefix when @prefix is non-NULL and
 * matches the front of @uuid (otherwise at @uuid itself), and
 * *uuid_len is the length of that portion minus a trailing "_UUID"
 * suffix, if present.
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        *uuid_start = !prefix || strncmp(uuid, prefix, strlen(prefix))
                ? uuid : uuid + strlen(prefix);

        *uuid_len = strlen(*uuid_start);

        /* too short to even hold the "_UUID" suffix */
        if (*uuid_len < strlen(UUID_STR))
                return;

        if (!strncmp(*uuid_start + *uuid_len - strlen(UUID_STR),
                    UUID_STR, strlen(UUID_STR)))
                *uuid_len -= strlen(UUID_STR);
}
/* Returns true if import was FULL, false if import was already not
 * connected.
 * @imp - import to be disconnected
 * @conn_cnt - connection count (epoch) of the request that timed out
 *             and caused the disconnection. In some cases, multiple
 *             inflight requests can fail to a single target (e.g. OST
 *             bulk requests) and if one has already caused a reconnection
 *             (increasing the import->conn_cnt) the older failure should
 *             not also cause a reconnection. If zero it forces a reconnect.
 *
 * NOTE(review): this extract is missing several original lines of this
 * function (opening brace, local declarations, and the if/else joins);
 * the text below is not compilable as shown — compare with upstream
 * before editing.
 */
int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
        spin_lock(&imp->imp_lock);

        /* only a FULL import matching the failing epoch (or a forced
         * disconnect, conn_cnt == 0) is moved to DISCON */
        if (imp->imp_state == LUSTRE_IMP_FULL &&
            (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {

                deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
                          &target_start, &target_len);
                if (imp->imp_replayable) {
                        LCONSOLE_WARN("%s: Connection to service %.*s via nid "
                               "%s was lost; in progress operations using this "
                               "service will wait for recovery to complete.\n",
                               imp->imp_obd->obd_name, target_len, target_start,
                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
                        /* NOTE(review): the else branch joining these two
                         * console messages is among the dropped lines */
                        LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
                               "%.*s via nid %s was lost; in progress "
                               "operations using this service will fail.\n",
                               imp->imp_obd->obd_name, target_len, target_start,
                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
                IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
                spin_unlock(&imp->imp_lock);

                if (obd_dump_on_timeout)
                        libcfs_debug_dumplog();

                obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);

                /* already disconnected / reconnected by someone else */
                spin_unlock(&imp->imp_lock);
                CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
                       imp->imp_client->cli_name, imp,
                       (imp->imp_state == LUSTRE_IMP_FULL &&
                        imp->imp_conn_cnt > conn_cnt) ?
                       "reconnected" : "not connected", imp->imp_conn_cnt,
                       conn_cnt, ptlrpc_import_state_name(imp->imp_state));
188 /* Must be called with imp_lock held! */
189 static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
192 LASSERT_SPIN_LOCKED(&imp->imp_lock);
194 CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
195 imp->imp_invalid = 1;
196 imp->imp_generation++;
197 spin_unlock(&imp->imp_lock);
199 ptlrpc_abort_inflight(imp);
200 obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
204 * This acts as a barrier; all existing requests are rejected, and
205 * no new requests will be accepted until the import is valid again.
207 void ptlrpc_deactivate_import(struct obd_import *imp)
209 spin_lock(&imp->imp_lock);
210 ptlrpc_deactivate_and_unlock_import(imp);
/*
 * How long (seconds from @now) until @req can still be considered
 * in flight, or 0 when it is not in a phase that counts, or has
 * already timed out.
 *
 * NOTE(review): the original return-type line preceding this
 * signature and parts of the body (the returns, and the branch that
 * sets dl for RQ_PHASE_NEW requests) are missing from this extract.
 */
ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
        /* only in-flight RPCs (not re-queued), bulk transfers, and
         * new requests contribute a deadline */
        if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
              (req->rq_phase == RQ_PHASE_BULK) ||
              (req->rq_phase == RQ_PHASE_NEW)))

        if (req->rq_timedout)

        if (req->rq_phase == RQ_PHASE_NEW)

        dl = req->rq_deadline;
237 static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
239 time_t now = cfs_time_current_sec();
240 struct list_head *tmp, *n;
241 struct ptlrpc_request *req;
242 unsigned int timeout = 0;
244 spin_lock(&imp->imp_lock);
245 list_for_each_safe(tmp, n, &imp->imp_sending_list) {
246 req = list_entry(tmp, struct ptlrpc_request, rq_list);
247 timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
249 spin_unlock(&imp->imp_lock);
/*
 * This function will invalidate the import, if necessary, then block
 * for all the RPC completions, and finally notify the obd to
 * invalidate its state (ie cancel locks, clear pending requests,
 * etc).
 *
 * NOTE(review): a number of original lines (opening/closing braces,
 * early-exit gotos, loop and branch closings) are missing from this
 * extract; the text below is not compilable as shown.
 */
void ptlrpc_invalidate_import(struct obd_import *imp)
        struct list_head *tmp, *n;
        struct ptlrpc_request *req;
        struct l_wait_info lwi;
        unsigned int timeout;

        atomic_inc(&imp->imp_inval_count);

        /*
         * If this is an invalid MGC connection, then don't bother
         * waiting for imp_inflight to drop to 0.
         */
        if (imp->imp_invalid && imp->imp_recon_bk &&!imp->imp_obd->obd_no_recov)

        if (!imp->imp_invalid || imp->imp_obd->obd_no_recov)
                ptlrpc_deactivate_import(imp);

        LASSERT(imp->imp_invalid);

        /* Wait forever until inflight == 0. We really can't do it another
         * way because in some cases we need to wait for very long reply
         * unlink. We can't do anything before that because there is really
         * no guarantee that some rdma transfer is not in progress right now. */

        /* Calculate max timeout for waiting on rpcs to error
         * out. Use obd_timeout if calculated value is smaller
         * than it (NOTE(review): that fallback comparison is among
         * the dropped lines). */
        timeout = ptlrpc_inflight_timeout(imp);
        timeout += timeout / 3;

                timeout = obd_timeout;

        CDEBUG(D_RPCTRACE,"Sleeping %d sec for inflight to error out\n",

        /* Wait for all requests to error out and call completion
         * callbacks. Cap it at obd_timeout -- these should all
         * have been locally cancelled by ptlrpc_abort_inflight. */
        lwi = LWI_TIMEOUT_INTERVAL(
                cfs_timeout_cap(cfs_time_seconds(timeout)),
                cfs_time_seconds(1), NULL, NULL);
        rc = l_wait_event(imp->imp_recovery_waitq,
                          (atomic_read(&imp->imp_inflight) == 0), &lwi);

                const char *cli_tgt = obd2cli_tgt(imp->imp_obd);

                CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
                       cli_tgt, rc, atomic_read(&imp->imp_inflight));

                spin_lock(&imp->imp_lock);
                list_for_each_safe(tmp, n, &imp->imp_sending_list) {
                        req = list_entry(tmp, struct ptlrpc_request,
                        DEBUG_REQ(D_ERROR, req,"still on sending list");

                list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
                        req = list_entry(tmp, struct ptlrpc_request,
                        DEBUG_REQ(D_ERROR, req,"still on delayed list");

                if (atomic_read(&imp->imp_unregistering) == 0) {
                        /* We know that only "unregistering" rpcs may
                         * still survive in sending or delaying lists
                         * (They are waiting for long reply unlink in
                         * sluggish nets). Let's check this. If there
                         * is no unregistering and inflight != 0 this
                         * is bug. */
                        LASSERT(atomic_read(&imp->imp_inflight) == 0);

                        /* Let's save one loop as soon as inflight have
                         * dropped to zero. No new inflights possible at
                         * this point. */

                        CERROR("%s: RPCs in \"%s\" phase found (%d). "
                               "Network is sluggish? Waiting them "
                               "to error out.\n", cli_tgt,
                               ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
                               atomic_read(&imp->imp_unregistering));

                spin_unlock(&imp->imp_lock);

        /* Let's additionally check that no new rpcs added to import in
         * "invalidate" state. */
        LASSERT(atomic_read(&imp->imp_inflight) == 0);

        obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);

        atomic_dec(&imp->imp_inval_count);
        cfs_waitq_broadcast(&imp->imp_recovery_waitq);
359 /* unset imp_invalid */
360 void ptlrpc_activate_import(struct obd_import *imp)
362 struct obd_device *obd = imp->imp_obd;
364 spin_lock(&imp->imp_lock);
365 imp->imp_invalid = 0;
366 spin_unlock(&imp->imp_lock);
368 obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
371 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
375 LASSERT(!imp->imp_dlm_fake);
377 if (ptlrpc_set_import_discon(imp, conn_cnt)) {
378 if (!imp->imp_replayable) {
379 CDEBUG(D_HA, "import %s@%s for %s not replayable, "
380 "auto-deactivating\n",
381 obd2cli_tgt(imp->imp_obd),
382 imp->imp_connection->c_remote_uuid.uuid,
383 imp->imp_obd->obd_name);
384 ptlrpc_deactivate_import(imp);
387 CDEBUG(D_HA, "%s: waking up pinger\n",
388 obd2cli_tgt(imp->imp_obd));
390 spin_lock(&imp->imp_lock);
391 imp->imp_force_verify = 1;
392 spin_unlock(&imp->imp_lock);
394 ptlrpc_pinger_wake_up();
399 int ptlrpc_reconnect_import(struct obd_import *imp)
402 ptlrpc_set_import_discon(imp, 0);
403 /* Force a new connect attempt */
404 ptlrpc_invalidate_import(imp);
405 /* Do a fresh connect next time by zeroing the handle */
406 ptlrpc_disconnect_import(imp, 1);
407 /* Wait for all invalidate calls to finish */
408 if (atomic_read(&imp->imp_inval_count) > 0) {
410 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
411 rc = l_wait_event(imp->imp_recovery_waitq,
412 (atomic_read(&imp->imp_inval_count) == 0),
415 CERROR("Interrupted, inval=%d\n",
416 atomic_read(&imp->imp_inval_count));
420 * Allow reconnect attempts. Note: Currently, the function is
421 * only called by MGC. So assume this is a recoverable import,
422 * and force import to be recoverable. fix this if you need to
425 imp->imp_obd->obd_no_recov = 0;
426 /* Remove 'invalid' flag */
427 ptlrpc_activate_import(imp);
428 /* Attempt a new connect */
429 ptlrpc_recover_import(imp, NULL);
433 EXPORT_SYMBOL(ptlrpc_reconnect_import);
/*
 * Pick which connection on imp_conn_list to (re)connect through,
 * honouring a minimum switch interval and preferring connections not
 * tried since the last successful one; falls back to the least
 * recently used, or the current connection when forced.
 *
 * NOTE(review): several original lines (opening brace, local
 * declarations such as tried_all, loop/branch closings and the
 * RETURN) are missing from this extract; not compilable as shown.
 */
static int import_select_connection(struct obd_import *imp)
        struct obd_import_conn *imp_conn = NULL, *conn;
        struct obd_export *dlmexp;

        spin_lock(&imp->imp_lock);

        if (list_empty(&imp->imp_conn_list)) {
                CERROR("%s: no connections available\n",
                       imp->imp_obd->obd_name);
                spin_unlock(&imp->imp_lock);

        list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
                CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
                       imp->imp_obd->obd_name,
                       libcfs_nid2str(conn->oic_conn->c_peer.nid),
                       conn->oic_last_attempt);

                /* Don't thrash connections */
                if (cfs_time_before_64(cfs_time_current_64(),
                                     conn->oic_last_attempt +
                                     cfs_time_seconds(CONNECTION_SWITCH_MIN))) {

                /* If we have not tried this connection since the
                   the last successful attempt, go with this one */
                if ((conn->oic_last_attempt == 0) ||
                    cfs_time_beforeq_64(conn->oic_last_attempt,
                                       imp->imp_last_success_conn)) {

                /* If all of the connections have already been tried
                   since the last successful connection; just choose the
                   least recently used */
                else if (cfs_time_before_64(conn->oic_last_attempt,
                                            imp_conn->oic_last_attempt))

        /* if not found, simply choose the current one */
        if (!imp_conn || imp->imp_force_reconnect) {
                LASSERT(imp->imp_conn_current);
                imp_conn = imp->imp_conn_current;

        LASSERT(imp_conn->oic_conn);

        /* If we've tried everything, and we're back to the beginning of the
           list, increase our timeout and try again. It will be reset when
           we do finally connect. (FIXME: really we should wait for all network
           state associated with the last connection attempt to drain before
           trying to reconnect on it.) */
        if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
            !imp->imp_recon_bk /* not retrying */) {
                if (at_get(&imp->imp_at.iat_net_latency) <
                    CONNECTION_SWITCH_MAX) {
                        at_add(&imp->imp_at.iat_net_latency,
                               MIN(at_get(&imp->imp_at.iat_net_latency) +
                               CONNECTION_SWITCH_INC, CONNECTION_SWITCH_MAX));
                LASSERT(imp_conn->oic_last_attempt);
                CWARN("%s: tried all connections, increasing latency to %ds\n",
                      imp->imp_obd->obd_name,
                      at_get(&imp->imp_at.iat_net_latency));

        imp_conn->oic_last_attempt = cfs_time_current_64();

        /* switch connection, don't mind if it's same as the current one */
        if (imp->imp_connection)
                ptlrpc_connection_put(imp->imp_connection);
        imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);

        /* mirror the chosen connection onto the DLM export */
        dlmexp = class_conn2export(&imp->imp_dlm_handle);
        LASSERT(dlmexp != NULL);
        if (dlmexp->exp_connection)
                ptlrpc_connection_put(dlmexp->exp_connection);
        dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
        class_export_put(dlmexp);

        if (imp->imp_conn_current != imp_conn) {
                if (imp->imp_conn_current)
                        CDEBUG(D_HA, "Changing connection for %s to %s/%s\n",
                               imp->imp_obd->obd_name, imp_conn->oic_uuid.uuid,
                               libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
                imp->imp_conn_current = imp_conn;

        CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
               imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
               libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));

        spin_unlock(&imp->imp_lock);
543 * must be called under imp lock
545 static int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
547 struct ptlrpc_request *req;
548 struct list_head *tmp;
550 if (list_empty(&imp->imp_replay_list))
552 tmp = imp->imp_replay_list.next;
553 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
554 *transno = req->rq_transno;
555 if (req->rq_transno == 0) {
556 DEBUG_REQ(D_ERROR, req, "zero transno in replay");
/*
 * Build and queue the (re)connect RPC for @imp: validate the current
 * state, move to CONNECTING, select a connection, prepare the connect
 * request and hand it to ptlrpcd with ptlrpc_connect_interpret() as
 * the reply callback.  @new_uuid, when non-NULL, is promoted to the
 * preferred connection first.
 *
 * NOTE(review): many original lines (opening brace, declarations of
 * rc/set_transno, GOTO/return joins, error paths) are missing from
 * this extract; not compilable as shown.
 */
int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
        struct obd_device *obd = imp->imp_obd;
        int initial_connect = 0;
        __u64 committed_before_reconnect = 0;
        struct ptlrpc_request *request;
        /* request buffer layout: ptlrpc_body, target uuid, client uuid,
         * dlm handle, connect data */
        __u32 size[] = { sizeof(struct ptlrpc_body),
                       sizeof(imp->imp_obd->u.cli.cl_target_uuid),
                       sizeof(obd->obd_uuid),
                       sizeof(imp->imp_dlm_handle),
                       sizeof(imp->imp_connect_data) };
        char *tmp[] = { NULL,
                        obd2cli_tgt(imp->imp_obd),
                        (char *)&imp->imp_dlm_handle,
                        (char *)&imp->imp_connect_data };
        struct ptlrpc_connect_async_args *aa;

        /* refuse to connect in states where it makes no sense */
        spin_lock(&imp->imp_lock);
        if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                spin_unlock(&imp->imp_lock);
                CERROR("can't connect to a closed import\n");
        } else if (imp->imp_state == LUSTRE_IMP_FULL) {
                spin_unlock(&imp->imp_lock);
                CERROR("already connected\n");
        } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
                spin_unlock(&imp->imp_lock);
                CERROR("already connecting\n");

        IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);

        imp->imp_resend_replay = 0;

        /* an unused remote handle means this is the very first connect */
        if (!lustre_handle_is_used(&imp->imp_remote_handle))

                committed_before_reconnect = imp->imp_peer_committed_transno;

        set_transno = ptlrpc_first_transno(imp,
                                           &imp->imp_connect_data.ocd_transno);

        spin_unlock(&imp->imp_lock);

                struct obd_uuid uuid;

                obd_str2uuid(&uuid, new_uuid);
                rc = import_set_conn_priority(imp, &uuid);

        rc = import_select_connection(imp);

        /* last in connection list */
        if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
                if (imp->imp_initial_recov_bk && initial_connect) {
                        CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
                               imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
                        /* Don't retry if connect fails */
                        obd_set_info_async(obd->obd_self_export,
                                           sizeof(KEY_INIT_RECOV),
                                           sizeof(rc), &rc, NULL);

                if (imp->imp_recon_bk) {
                        CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
                               imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
                        spin_lock(&imp->imp_lock);
                        imp->imp_last_recon = 1;
                        spin_unlock(&imp->imp_lock);

        /* Reset connect flags to the originally requested flags, in case
         * the server is updated on-the-fly we will get the new features. */
        imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
        imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;

        rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
                           &obd->obd_uuid, &imp->imp_connect_data, NULL);

        request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
                GOTO(out, rc = -ENOMEM);

        /* Report the rpc service time to the server so that it knows how long
         * to wait for clients to join recovery */
        lustre_msg_set_service_time(request->rq_reqmsg,
                                    at_timeout2est(request->rq_timeout));

        /* The amount of time we give the server to process the connect req.
         * import_select_connection will increase the net latency on
         * repeated reconnect attempts to cover slow networks.
         * We override/ignore the server rpc completion estimate here,
         * which may be large if this is a reconnect attempt */
        request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
        lustre_msg_set_timeout(request->rq_reqmsg, request->rq_timeout);

        lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);

        if (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V1)
                lustre_msg_add_op_flags(request->rq_reqmsg,
                                        MSG_CONNECT_NEXT_VER);

        request->rq_no_resend = request->rq_no_delay = 1;
        request->rq_send_state = LUSTRE_IMP_CONNECTING;
        /* Allow a slightly larger reply for future growth compatibility */
        size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
        ptlrpc_req_set_repsize(request, 2, size);
        request->rq_interpret_reply = ptlrpc_connect_interpret;

        CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
        aa = ptlrpc_req_async_args(request);
        memset(aa, 0, sizeof *aa);

        aa->pcaa_peer_committed = committed_before_reconnect;
        aa->pcaa_initial_connect = initial_connect;
        if (aa->pcaa_initial_connect) {
                spin_lock(&imp->imp_lock);
                imp->imp_replayable = 1;
                spin_unlock(&imp->imp_lock);
                lustre_msg_add_op_flags(request->rq_reqmsg,
                                        MSG_CONNECT_INITIAL);

                lustre_msg_add_op_flags(request->rq_reqmsg,
                                        MSG_CONNECT_TRANSNO);

        DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
                  aa->pcaa_initial_connect ? "initial " : "re",
        ptlrpcd_add_req(request);

        /* on failure, fall back to DISCON so the pinger retries */
        IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
EXPORT_SYMBOL(ptlrpc_connect_import);
/*
 * After a failed connect, schedule an early ping (and wake the
 * pinger) when there is an alternative connection worth trying.
 *
 * NOTE(review): the original #ifdef __KERNEL__/#else structure, the
 * wake_pinger local, the goto label and closing braces are missing
 * from this extract; not compilable as shown.
 */
static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
        struct obd_import_conn *imp_conn;

        spin_lock(&imp->imp_lock);
        if (list_empty(&imp->imp_conn_list))

        imp_conn = list_entry(imp->imp_conn_list.prev,
                              struct obd_import_conn,
                              oic_item);

        /* XXX: When the failover node is the primary node, it is possible
         * to have two identical connections in imp_conn_list. We must
         * compare not conn's pointers but NIDs, otherwise we can defeat
         * connection throttling. (See bug 14774.) */
        if (imp->imp_conn_current->oic_conn->c_peer.nid !=
            imp_conn->oic_conn->c_peer.nid) {
                ptlrpc_ping_import_soon(imp);

        /* liblustre has no pinger thread, so we wake up the pinger anyway */

        spin_unlock(&imp->imp_lock);

        ptlrpc_pinger_wake_up();
/*
 * A connect failure of -EBUSY or -EAGAIN means the target is merely
 * busy (e.g. still in recovery), so the caller should retry the same
 * connection rather than selecting a new one.  Returns non-zero for
 * those two errors, zero otherwise.
 */
static int ptlrpc_busy_reconnect(int rc)
{
        return (rc == -EBUSY) || (rc == -EAGAIN);
}
/*
 * Reply callback for the connect RPC queued by ptlrpc_connect_import():
 * decides the next import state (FULL, REPLAY, REPLAY_LOCKS, RECOVER,
 * EVICTED, DISCON ...) from the server's reply flags and handle, then
 * processes the returned obd_connect_data (connect flags, checksum
 * negotiation, brw size, adaptive-timeout support).
 *
 * NOTE(review): this extract is missing many original lines of this
 * large function (parameter tail "void *data, int rc", braces, else
 * joins, GOTO/label lines, several string-literal continuations);
 * the text below is not compilable as shown.
 */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
        struct ptlrpc_connect_async_args *aa = data;
        struct obd_import *imp = request->rq_import;
        struct client_obd *cli = &imp->imp_obd->u.cli;
        struct lustre_handle old_hdl;
        __u64 old_connect_flags;

        spin_lock(&imp->imp_lock);
        if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                spin_unlock(&imp->imp_lock);

        /* if this reconnect to busy export - not need select new target
         * for connecting (NOTE(review): comment continuation dropped) */
        if (ptlrpc_busy_reconnect(rc))
                imp->imp_force_reconnect = 1;
        spin_unlock(&imp->imp_lock);

        LASSERT(imp->imp_conn_current);

        msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);

        /* All imports are pingable */
        imp->imp_pingable = 1;
        imp->imp_force_reconnect = 0;

        if (aa->pcaa_initial_connect) {
                if (msg_flags & MSG_CONNECT_REPLAYABLE) {
                        imp->imp_replayable = 1;
                        spin_unlock(&imp->imp_lock);
                        CDEBUG(D_HA, "connected to replayable target: %s\n",
                               obd2cli_tgt(imp->imp_obd));
                        imp->imp_replayable = 0;
                        spin_unlock(&imp->imp_lock);

                /* pick message format: v2 when server advertises it */
                if ((request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V1 &&
                     msg_flags & MSG_CONNECT_NEXT_VER) ||
                    request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2) {
                        imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
                        CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
                               obd2cli_tgt(imp->imp_obd));
                        CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
                               obd2cli_tgt(imp->imp_obd));

                imp->imp_remote_handle =
                                *lustre_msg_get_handle(request->rq_repmsg);

                /* Initial connects are allowed for clients with non-random
                 * uuids when servers are in recovery. Simply signal the
                 * servers replay is complete and wait in REPLAY_WAIT. */
                if (msg_flags & MSG_CONNECT_RECOVERING) {
                        CDEBUG(D_HA, "connect to %s during recovery\n",
                               obd2cli_tgt(imp->imp_obd));
                        IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
                        IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
                        ptlrpc_activate_import(imp);

                GOTO(finish, rc = 0);

        spin_unlock(&imp->imp_lock);

        /* Determine what recovery state to move the import to. */
        if (MSG_CONNECT_RECONNECT & msg_flags) {
                memset(&old_hdl, 0, sizeof(old_hdl));
                if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
                        CERROR("%s@%s didn't like our handle "LPX64
                               ", failed\n", obd2cli_tgt(imp->imp_obd),
                               imp->imp_connection->c_remote_uuid.uuid,
                               imp->imp_dlm_handle.cookie);
                        GOTO(out, rc = -ENOTCONN);

                if (memcmp(&imp->imp_remote_handle,
                           lustre_msg_get_handle(request->rq_repmsg),
                           sizeof(imp->imp_remote_handle))) {
                        int level = msg_flags & MSG_CONNECT_RECOVERING ?

                        /* Bug 16611/14775: if server handle have changed,
                         * that means some sort of disconnection happened.
                         * If the server is not in recovery, that also means it
                         * already erased all of our state because of previous
                         * eviction. If it is in recovery - we are safe to
                         * participate since we can reestablish all of our state
                         * with server again */
                        CDEBUG(level,"%s@%s changed server handle from "
                               LPX64" to "LPX64"%s\n",
                               obd2cli_tgt(imp->imp_obd),
                               imp->imp_connection->c_remote_uuid.uuid,
                               imp->imp_remote_handle.cookie,
                               lustre_msg_get_handle(request->rq_repmsg)->
                               (MSG_CONNECT_RECOVERING & msg_flags) ?
                               " but is still in recovery" : "");

                        imp->imp_remote_handle =
                                     *lustre_msg_get_handle(request->rq_repmsg);

                        if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
                                IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
                                GOTO(finish, rc = 0);

                        CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
                               obd2cli_tgt(imp->imp_obd),
                               imp->imp_connection->c_remote_uuid.uuid);

                if (imp->imp_invalid) {
                        CDEBUG(D_HA, "%s: reconnected but import is invalid; "
                               "marking evicted\n", imp->imp_obd->obd_name);
                        IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
                } else if (MSG_CONNECT_RECOVERING & msg_flags) {
                        CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
                               imp->imp_obd->obd_name,
                               obd2cli_tgt(imp->imp_obd));

                        spin_lock(&imp->imp_lock);
                        imp->imp_resend_replay = 1;
                        /* VBR: delayed connection */
                        if (MSG_CONNECT_DELAYED & msg_flags) {
                                imp->imp_delayed_recovery = 1;
                                imp->imp_no_lock_replay = 1;

                        spin_unlock(&imp->imp_lock);

                        IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
                        IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);

        } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
                LASSERT(imp->imp_replayable);
                imp->imp_remote_handle =
                                *lustre_msg_get_handle(request->rq_repmsg);
                imp->imp_last_replay_transno = 0;
                /* VBR: delayed connection */
                if (MSG_CONNECT_DELAYED & msg_flags) {
                        spin_lock(&imp->imp_lock);
                        imp->imp_delayed_recovery = 1;
                        imp->imp_no_lock_replay = 1;
                        spin_unlock(&imp->imp_lock);

                IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);

                DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
                          "flags reconnect/recovering not set: %x)",msg_flags);
                imp->imp_remote_handle =
                                *lustre_msg_get_handle(request->rq_repmsg);
                IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);

        /* Sanity checks for a reconnected import. */
        if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
                CERROR("imp_replayable flag does not match server "
                       "after reconnect. We should LBUG right here.\n");

        if (lustre_msg_get_last_committed(request->rq_repmsg) <
            aa->pcaa_peer_committed) {
                CERROR("%s went back in time (transno "LPD64
                       " was previously committed, server now claims "LPD64
                       ")! See https://bugzilla.lustre.org/show_bug.cgi?"
                       obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
                       lustre_msg_get_last_committed(request->rq_repmsg));

        rc = ptlrpc_import_recovery_state_machine(imp);

        if (rc == -ENOTCONN) {
                CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
                       "invalidating and reconnecting\n",
                       obd2cli_tgt(imp->imp_obd),
                       imp->imp_connection->c_remote_uuid.uuid);
                ptlrpc_connect_import(imp, NULL);

        /* success path: process the returned connect data */
                struct obd_connect_data *ocd;
                struct obd_export *exp;

                ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
                                         lustre_swab_connect);
                spin_lock(&imp->imp_lock);
                /* move the successful connection to the head of the list */
                list_del(&imp->imp_conn_current->oic_item);
                list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
                imp->imp_last_success_conn =
                        imp->imp_conn_current->oic_last_attempt;

                        spin_unlock(&imp->imp_lock);
                        CERROR("Wrong connect data from server\n");

                imp->imp_connect_data = *ocd;

                exp = class_conn2export(&imp->imp_dlm_handle);
                spin_unlock(&imp->imp_lock);

                /* check that server granted subset of flags we asked for. */
                LASSERTF((ocd->ocd_connect_flags &
                          imp->imp_connect_flags_orig) ==
                         ocd->ocd_connect_flags, LPX64" != "LPX64,
                         imp->imp_connect_flags_orig, ocd->ocd_connect_flags);

                        /* This could happen if export is cleaned during the
                         * connect attempt */
                        CERROR("Missing export for %s\n",
                               imp->imp_obd->obd_name);
                        GOTO(out, rc = -ENODEV);

                old_connect_flags = exp->exp_connect_flags;
                exp->exp_connect_flags = ocd->ocd_connect_flags;
                imp->imp_obd->obd_self_export->exp_connect_flags =
                        ocd->ocd_connect_flags;
                class_export_put(exp);

                obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);

                if (!ocd->ocd_ibits_known &&
                    ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
                        CERROR("Inodebits aware server returned zero compatible"

                if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
                    (ocd->ocd_version > LUSTRE_VERSION_CODE +
                                        LUSTRE_VERSION_OFFSET_WARN ||
                     ocd->ocd_version < LUSTRE_VERSION_CODE -
                                        LUSTRE_VERSION_OFFSET_WARN)) {
                        /* Sigh, some compilers do not like #ifdef in the middle
                           of macro arguments */
                                "older. Consider upgrading this client";
                                "older. Consider recompiling this application";
                        const char *newer = "newer than client version";

                        LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
                                      "is much %s (%s)\n",
                                      obd2cli_tgt(imp->imp_obd),
                                      OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
                                      OBD_OCD_VERSION_MINOR(ocd->ocd_version),
                                      OBD_OCD_VERSION_PATCH(ocd->ocd_version),
                                      OBD_OCD_VERSION_FIX(ocd->ocd_version),
                                      ocd->ocd_version > LUSTRE_VERSION_CODE ?
                                      newer : older, LUSTRE_VERSION_STRING);

                if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
                        /* We sent to the server ocd_cksum_types with bits set
                         * for algorithms we understand. The server masked off
                         * the checksum types it doesn't support */
                        if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
                                LCONSOLE_WARN("The negotiation of the checksum "
                                              "alogrithm to use with server %s "
                                              "failed (%x/%x), disabling "
                                              obd2cli_tgt(imp->imp_obd),
                                              ocd->ocd_cksum_types,
                                cli->cl_checksum = 0;
                                cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
                                cli->cl_cksum_type = OBD_CKSUM_CRC32;
                                cli->cl_supp_cksum_types = ocd->ocd_cksum_types;

                        if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
                                cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
                        else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
                                cli->cl_cksum_type = OBD_CKSUM_ADLER;
                                cli->cl_cksum_type = OBD_CKSUM_CRC32;

                        /* The server does not support OBD_CONNECT_CKSUM.
                         * Enforce CRC32 for backward compatibility*/
                        cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
                        cli->cl_cksum_type = OBD_CKSUM_CRC32;

                if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
                        cli->cl_max_pages_per_rpc =
                                ocd->ocd_brw_size >> CFS_PAGE_SHIFT;

                /* Reset ns_connect_flags only for initial connect. It might be
                 * changed in while using FS and if we reset it in reconnect
                 * this leads to lossing user settings done before such as
                 * disable lru_resize, etc. */
                if (old_connect_flags != exp->exp_connect_flags ||
                    aa->pcaa_initial_connect) {
                        CDEBUG(D_HA, "%s: Resetting ns_connect_flags to server "
                               "flags: "LPX64"\n", imp->imp_obd->obd_name,
                               ocd->ocd_connect_flags);
                        imp->imp_obd->obd_namespace->ns_connect_flags =
                                ocd->ocd_connect_flags;
                        imp->imp_obd->obd_namespace->ns_orig_connect_flags =
                                ocd->ocd_connect_flags;

                if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
                    (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
                        /* We need a per-message support flag, because
                           a. we don't know if the incoming connect reply
                              supports AT or not (in reply_in_callback)
                              until we unpack it.
                           b. failovered server means export and flags are gone
                              (in ptlrpc_send_reply).
                           Can only be set when we know AT is supported at
                           both ends */
                        imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
                        imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;

                LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
                        (cli->cl_max_pages_per_rpc > 0));

        /* error path: fall back to DISCON and possibly deactivate */
        IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
        spin_lock(&imp->imp_lock);
        if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
            (request->rq_import_generation == imp->imp_generation))
                /* drops imp_lock */
                ptlrpc_deactivate_and_unlock_import(imp);

                spin_unlock(&imp->imp_lock);

        if (imp->imp_recon_bk && imp->imp_last_recon) {
                /* Give up trying to reconnect */
                imp->imp_obd->obd_no_recov = 1;
                ptlrpc_deactivate_import(imp);

        if (rc == -EPROTO) {
                struct obd_connect_data *ocd;
                ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
                                         lustre_swab_connect);
                    (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
                    (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
                        /* Actually servers are only supposed to refuse
                           connection from liblustre clients, so we should
                           never see this from VFS context */
                        LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
                                " refused connection from this client "
                                "with an incompatible version (%s). "
                                "Client must be recompiled\n",
                                obd2cli_tgt(imp->imp_obd),
                                OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
                                OBD_OCD_VERSION_MINOR(ocd->ocd_version),
                                OBD_OCD_VERSION_PATCH(ocd->ocd_version),
                                OBD_OCD_VERSION_FIX(ocd->ocd_version),
                                LUSTRE_VERSION_STRING);
                        ptlrpc_deactivate_import(imp);
                        IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);

        ptlrpc_maybe_ping_import_soon(imp);

        CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
               obd2cli_tgt(imp->imp_obd),
               (char *)imp->imp_connection->c_remote_uuid.uuid, rc);

        spin_lock(&imp->imp_lock);
        imp->imp_last_recon = 0;
        spin_unlock(&imp->imp_lock);

        cfs_waitq_broadcast(&imp->imp_recovery_waitq);
/* Reply interpreter for the final "replay completed" ping sent by
 * signal_completed_replay().  Drops the replay-inflight refcount taken by
 * the sender, then either advances the recovery state machine (clean
 * reply) or forces a full reconnect (VBR failure or RPC error).
 * NOTE(review): several lines are elided in this excerpt. */
1169 static int completed_replay_interpret(struct ptlrpc_request *req,
1170 void * data, int rc)
/* Balance the atomic_inc() done in signal_completed_replay(). */
1173 atomic_dec(&req->rq_import->imp_replay_inflight);
/* Clean reply and no version-based-recovery failure: keep recovering. */
1174 if (req->rq_status == 0 &&
1175 !req->rq_import->imp_vbr_failed) {
1176 ptlrpc_import_recovery_state_machine(req->rq_import);
/* Version-based recovery failed: clear the flag under imp_lock and
 * fall through to reconnect from scratch. */
1178 if (req->rq_import->imp_vbr_failed) {
1180 "%s: version recovery fails, reconnecting\n",
1181 req->rq_import->imp_obd->obd_name);
1182 spin_lock(&req->rq_import->imp_lock);
1183 req->rq_import->imp_vbr_failed = 0;
1184 spin_unlock(&req->rq_import->imp_lock);
1186 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
1188 req->rq_import->imp_obd->obd_name,
/* Error paths end up here: re-establish the connection. */
1191 ptlrpc_connect_import(req->rq_import, NULL);
/* Tell the server that this client has finished both request replay and
 * lock replay, by sending an OBD_PING tagged with the REPLAY_DONE flags.
 * The reply is handled asynchronously by completed_replay_interpret().
 * NOTE(review): several lines are elided in this excerpt. */
1196 static int signal_completed_replay(struct obd_import *imp)
1198 struct ptlrpc_request *req;
/* No replay RPCs may still be in flight when we declare replay done. */
1201 LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
/* Account for the ping below; dropped in completed_replay_interpret(). */
1202 atomic_inc(&imp->imp_replay_inflight);
1204 req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
/* Allocation failed: undo the inflight count taken above. */
1206 atomic_dec(&imp->imp_replay_inflight);
1210 ptlrpc_req_set_repsize(req, 1, NULL);
1211 req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
/* Mark this ping as the end-of-replay barrier. */
1212 lustre_msg_add_flags(req->rq_reqmsg,
1213 MSG_LOCK_REPLAY_DONE |
1214 MSG_REQ_REPLAY_DONE |
1217 if (imp->imp_delayed_recovery)
1218 lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
/* Give the final replay ping extra time — presumably the server may be
 * busy finishing recovery; TODO confirm rationale. */
1219 req->rq_timeout *= 3;
1220 req->rq_interpret_reply = completed_replay_interpret;
/* Hand off to ptlrpcd for asynchronous sending. */
1222 ptlrpcd_add_req(req);
/* Kernel-thread entry point: invalidate an (evicted) import outside the
 * caller's context, optionally dump the debug log, then push the import
 * into LUSTRE_IMP_RECOVER and re-run the recovery state machine.
 * Drops the import reference taken by the thread's creator. */
1227 static int ptlrpc_invalidate_import_thread(void *data)
1229 struct obd_import *imp = data;
1233 cfs_daemonize_ctxt("ll_imp_inval");
1235 CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
1236 imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
1237 imp->imp_connection->c_remote_uuid.uuid);
1239 ptlrpc_invalidate_import(imp);
/* Optional diagnostic dump when evicted, controlled by a module knob. */
1241 if (obd_dump_on_eviction) {
1242 CERROR("dump the log upon eviction\n");
1243 libcfs_debug_dumplog();
1246 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1247 ptlrpc_import_recovery_state_machine(imp);
/* Balances the class_import_get() done before spawning this thread. */
1249 class_import_put(imp);
/* Drive an import through its recovery states.  Visible transitions:
 * EVICTED -> invalidate (possibly in a separate thread) -> RECOVER;
 * REPLAY -> REPLAY_LOCKS (when no replays remain in flight);
 * REPLAY_LOCKS -> REPLAY_WAIT; REPLAY_WAIT -> RECOVER;
 * RECOVER -> resend delayed requests -> FULL (import active again).
 * NOTE(review): several lines are elided in this excerpt. */
1254 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1262 if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1263 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1264 &target_start, &target_len);
1265 /* Don't care about MGC eviction */
1266 if (strcmp(imp->imp_obd->obd_type->typ_name,
1267 LUSTRE_MGC_NAME) != 0) {
1268 LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
1269 "%.*s; in progress operations using "
1270 "this service will fail.\n",
1271 target_len, target_start);
1273 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1274 obd2cli_tgt(imp->imp_obd),
1275 imp->imp_connection->c_remote_uuid.uuid);
1278 /* bug 17802: XXX client_disconnect_export vs connect request
1279 * race. if client will evicted at this time, we start
1280 * invalidate thread without referece to import and import can
1281 * be freed at same time. */
/* Hold a reference across the async invalidate thread; the thread
 * drops it via class_import_put() when done. */
1282 class_import_get(imp);
1283 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
1284 CLONE_VM | CLONE_FILES);
/* Thread creation failed: drop the reference and fall back to
 * invalidating synchronously below. */
1286 class_import_put(imp);
1287 CERROR("error starting invalidate thread: %d\n", rc);
1293 ptlrpc_invalidate_import(imp);
1295 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1299 if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1300 CDEBUG(D_HA, "replay requested by %s\n",
1301 obd2cli_tgt(imp->imp_obd));
/* Resend the next batch of committed requests to the server. */
1302 rc = ptlrpc_replay_next(imp, &inflight);
1303 if (inflight == 0 &&
1304 atomic_read(&imp->imp_replay_inflight) == 0) {
1305 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1306 rc = ldlm_replay_locks(imp);
1313 if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1314 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1315 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
/* All lock replays done: send the REPLAY_DONE barrier ping. */
1316 rc = signal_completed_replay(imp);
1323 if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1324 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1325 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1329 if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1330 CDEBUG(D_HA, "reconnected to %s@%s\n",
1331 obd2cli_tgt(imp->imp_obd),
1332 imp->imp_connection->c_remote_uuid.uuid);
/* Resend requests that were queued while the import was down. */
1334 rc = ptlrpc_resend(imp);
1337 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1338 ptlrpc_activate_import(imp);
1340 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1341 &target_start, &target_len);
1342 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1343 "using nid %s.\n", imp->imp_obd->obd_name,
1344 target_len, target_start,
1345 libcfs_nid2str(imp->imp_connection->c_peer.nid));
1348 if (imp->imp_state == LUSTRE_IMP_FULL) {
/* Recovery finished: wake everyone waiting on the import. */
1349 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1350 ptlrpc_wake_delayed(imp);
/* Wakeup callback passed to LWI_TIMEOUT_INTR() in
 * ptlrpc_disconnect_import() below.
 * NOTE(review): function body is elided in this excerpt. */
1357 static int back_to_sleep(void *unused)
/* Cleanly disconnect an import from its target.  Maps the original
 * connect opcode to the matching DISCONNECT opcode, optionally waits for
 * in-progress recovery to finish, sends a single non-resent DISCONNECT
 * RPC, then moves the import to DISCON (noclose) or CLOSED.
 * @noclose: if set, leave the import reusable (DISCON) instead of CLOSED.
 * NOTE(review): several lines are elided in this excerpt. */
1362 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1364 struct ptlrpc_request *req;
/* Forced cleanup (obd_force): don't wait around for recovery. */
1366 int nowait = imp->imp_obd->obd_force;
1370 GOTO(set_state, rc);
/* Pick the DISCONNECT opcode matching how we originally connected. */
1372 switch (imp->imp_connect_op) {
1373 case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1374 case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1375 case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1377 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1378 obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
/* If recovery is running, wait (bounded) for it to settle first. */
1382 if (ptlrpc_import_in_recovery(imp)) {
1383 struct l_wait_info lwi;
1384 cfs_duration_t timeout;
1387 timeout = cfs_time_seconds(obd_timeout);
/* Prefer the adaptive-timeout service estimate for this portal
 * when available. */
1389 int idx = import_at_get_index(imp,
1390 imp->imp_client->cli_request_portal);
1391 timeout = cfs_time_seconds(
1392 at_get(&imp->imp_at.iat_service_estimate[idx]));
1394 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout),
1395 back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1396 rc = l_wait_event(imp->imp_recovery_waitq,
1397 !ptlrpc_import_in_recovery(imp), &lwi);
1400 spin_lock(&imp->imp_lock);
1401 if (imp->imp_state != LUSTRE_IMP_FULL)
1404 spin_unlock(&imp->imp_lock);
1406 req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1408 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1409 * it fails. We can get through the above with a down server
1410 * if the client doesn't know the server is gone yet. */
1411 req->rq_no_resend = 1;
1414 /* We want client umounts to happen quickly, no matter the
1416 req->rq_timeout = min_t(int, req->rq_timeout,
1417 INITIAL_CONNECT_TIMEOUT);
1419 /* ... but we always want liblustre clients to nicely
1420 disconnect, so only use the adaptive value. */
1422 req->rq_timeout = obd_timeout / 3;
/* DISCONNECT is only sendable in CONNECTING state. */
1425 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1426 req->rq_send_state = LUSTRE_IMP_CONNECTING;
1427 ptlrpc_req_set_repsize(req, 1, NULL);
1428 rc = ptlrpc_queue_wait(req);
1429 ptlrpc_req_finished(req);
1433 spin_lock(&imp->imp_lock);
1436 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1438 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
/* Forget the server handle so a later connect starts fresh. */
1439 memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1440 /* Try all connections in the future - bz 12758 */
1441 imp->imp_last_recon = 0;
1442 spin_unlock(&imp->imp_lock);
1447 /* Sets maximal number of RPCs possible originating from other side of this
1448 import (server) to us and number of async RPC replies that we are not waiting
/* Thin wrapper: forwards the count to LNet for this import's peer. */
1450 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1452 LNetSetAsync(imp->imp_connection->c_peer, count);
/* Final teardown of an import: mark it CLOSED, bump the generation so
 * stale requests are recognizable, and abort everything still in flight. */
1455 void ptlrpc_cleanup_imp(struct obd_import *imp)
1459 spin_lock(&imp->imp_lock);
1460 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
/* New generation invalidates requests tagged with the old one. */
1461 imp->imp_generation++;
1462 spin_unlock(&imp->imp_lock);
1463 ptlrpc_abort_inflight(imp);
1468 /* Adaptive Timeout utils */
1469 extern unsigned int at_min, at_max, at_history;
1471 /* Bin into timeslices using AT_BINS bins.
1472 This gives us a max of the last binlimit*AT_BINS secs without the storage,
1473 but still smoothing out a return to normalcy from a slow response.
1474 (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
/* Feed one latency sample @val (seconds) into adaptive timeout @at.
 * Maintains a rolling maximum over AT_BINS time bins of width
 * at_history/AT_BINS seconds each; at_current is the max over the live
 * bins, clamped to [at_min, at_max].  Returns the previous at_current if
 * the value changed, else 0 (see the final assignment to 'old').
 * NOTE(review): several lines are elided in this excerpt. */
1475 int at_add(struct adaptive_timeout *at, unsigned int val)
1477 unsigned int old = at->at_current;
1478 time_t now = cfs_time_current_sec();
/* Width of one history bin, at least 1 second. */
1479 time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1482 CDEBUG(D_OTHER, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
1483 val, at, now - at->at_binstart, at->at_current,
1484 at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1487 /* 0's don't count, because we never want our timeout to
1488 drop to 0, and because 0 could mean an error */
1491 spin_lock(&at->at_lock);
1493 if (unlikely(at->at_binstart == 0)) {
1494 /* Special case to remove default from history */
1495 at->at_current = val;
1496 at->at_worst_ever = val;
1497 at->at_worst_time = now;
1498 at->at_hist[0] = val;
1499 at->at_binstart = now;
1500 } else if (now - at->at_binstart < binlimit ) {
/* Still inside the current bin: just take the max. */
1502 at->at_hist[0] = max(val, at->at_hist[0]);
1503 at->at_current = max(val, at->at_current);
1506 unsigned int maxv = val;
1507 /* move bins over */
1508 shift = (now - at->at_binstart) / binlimit;
/* Slide history down by 'shift' bins, tracking the surviving max. */
1510 for(i = AT_BINS - 1; i >= 0; i--) {
1512 at->at_hist[i] = at->at_hist[i - shift];
1513 maxv = max(maxv, at->at_hist[i]);
1518 at->at_hist[0] = val;
1519 at->at_current = maxv;
1520 at->at_binstart += shift * binlimit;
1523 if (at->at_current > at->at_worst_ever) {
1524 at->at_worst_ever = at->at_current;
1525 at->at_worst_time = now;
1528 if (at->at_flags & AT_FLG_NOHIST)
1529 /* Only keep last reported val; keeping the rest of the history
1531 at->at_current = val;
/* Clamp to the configured adaptive-timeout bounds. */
1534 at->at_current = min(at->at_current, at_max);
1535 at->at_current = max(at->at_current, at_min);
1537 if (at->at_current != old)
1538 CDEBUG(D_OTHER, "AT %p change: old=%u new=%u delta=%d "
1539 "(val=%u) hist %u %u %u %u\n", at,
1540 old, at->at_current, at->at_current - old, val,
1541 at->at_hist[0], at->at_hist[1], at->at_hist[2],
1544 /* if we changed, report the old value */
1545 old = (at->at_current != old) ? old : 0;
1547 spin_unlock(&at->at_lock);
1551 /* Find the imp_at index for a given portal; assign if space available */
1552 int import_at_get_index(struct obd_import *imp, int portal)
1554 struct imp_at *at = &imp->imp_at;
1557 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1558 if (at->iat_portal[i] == portal)
1560 if (at->iat_portal[i] == 0)
1565 /* Not found in list, add it under a lock */
1566 spin_lock(&imp->imp_lock);
1568 /* Check unused under lock */
1569 for (; i < IMP_AT_MAX_PORTALS; i++) {
1570 if (at->iat_portal[i] == portal)
1572 if (at->iat_portal[i] == 0)
1577 /* Not enough portals? */
1578 LASSERT(i < IMP_AT_MAX_PORTALS);
1580 at->iat_portal[i] = portal;
1582 spin_unlock(&imp->imp_lock);