1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/ptlrpc/import.c
38 * Author: Mike Shaver <shaver@clusterfs.com>
41 #define DEBUG_SUBSYSTEM S_RPC
43 # include <liblustre.h>
46 #include <obd_support.h>
47 #include <lustre_ha.h>
48 #include <lustre_net.h>
49 #include <lustre_import.h>
50 #include <lustre_export.h>
52 #include <obd_class.h>
54 #include "ptlrpc_internal.h"
/* NOTE(review): this extract is heavily elided (the embedded source line
 * numbers jump), so several definitions below are visibly incomplete;
 * comments describe only what the visible code shows. */
/* Cookie carried across the asynchronous connect RPC via rq_async_args:
 * filled in ptlrpc_connect_import() and read back in
 * ptlrpc_connect_interpret(). */
56 struct ptlrpc_connect_async_args {
/* peer's last committed transno captured before the (re)connect */
57 __u64 pcaa_peer_committed;
/* non-zero when this is the import's very first connect attempt */
58 int pcaa_initial_connect;
/* Record @state as the import's current state and log it in the bounded
 * imp_state_hist ring together with a timestamp.  Callers take imp_lock
 * around this (see IMPORT_SET_STATE / IMPORT_SET_STATE_NOLOCK below). */
61 static void __import_set_state(struct obd_import *imp,
62 enum lustre_imp_state state)
64 imp->imp_state = state;
65 imp->imp_state_hist[imp->imp_state_hist_idx].ish_state = state;
66 imp->imp_state_hist[imp->imp_state_hist_idx].ish_time =
67 cfs_time_current_sec();
/* advance ring index modulo history size (divisor elided in this extract) */
68 imp->imp_state_hist_idx = (imp->imp_state_hist_idx + 1) %
72 /* A CLOSED import should remain so. */
/* Change the import state (with a D_HA trace of old -> new) unless the
 * import is already CLOSED.  "NOLOCK": caller must hold imp->imp_lock. */
73 #define IMPORT_SET_STATE_NOLOCK(imp, state) \
75 if (imp->imp_state != LUSTRE_IMP_CLOSED) { \
76 CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n", \
77 imp, obd2cli_tgt(imp->imp_obd), \
78 ptlrpc_import_state_name(imp->imp_state), \
79 ptlrpc_import_state_name(state)); \
80 __import_set_state(imp, state); \
/* Locking wrapper: take imp_lock around IMPORT_SET_STATE_NOLOCK(). */
84 #define IMPORT_SET_STATE(imp, state) \
86 spin_lock(&imp->imp_lock); \
87 IMPORT_SET_STATE_NOLOCK(imp, state); \
88 spin_unlock(&imp->imp_lock); \
/* Forward declarations for the connect-completion / recovery path below. */
92 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
94 int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
96 /* Only this function is allowed to change the import state when it is
97 * CLOSED. I would rather refcount the import and free it after
98 * disconnection like we do with exports. To do that, the client_obd
99 * will need to save the peer info somewhere other than in the import,
/* Reset the import to LUSTRE_IMP_NEW under imp_lock, bumping the
 * generation so stale requests from the previous incarnation are
 * distinguishable. */
101 int ptlrpc_init_import(struct obd_import *imp)
103 spin_lock(&imp->imp_lock);
105 imp->imp_generation++;
106 imp->imp_state = LUSTRE_IMP_NEW;
108 spin_unlock(&imp->imp_lock);
112 EXPORT_SYMBOL(ptlrpc_init_import);
114 #define UUID_STR "_UUID"
/* Compute a display span inside @uuid with an optional @prefix stripped
 * from the front and a trailing "_UUID" suffix excluded from the length.
 * No copy is made: *uuid_start points into @uuid and *uuid_len bounds it. */
115 static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
/* skip the prefix only when @uuid actually begins with it */
118 *uuid_start = !prefix || strncmp(uuid, prefix, strlen(prefix))
119 ? uuid : uuid + strlen(prefix);
121 *uuid_len = strlen(*uuid_start);
/* too short to even hold the suffix -- leave the length as-is */
123 if (*uuid_len < strlen(UUID_STR))
126 if (!strncmp(*uuid_start + *uuid_len - strlen(UUID_STR),
127 UUID_STR, strlen(UUID_STR)))
128 *uuid_len -= strlen(UUID_STR);
131 /* Returns true if import was FULL, false if import was already not
133 * @imp - import to be disconnected
134 * @conn_cnt - connection count (epoch) of the request that timed out
135 * and caused the disconnection. In some cases, multiple
136 * inflight requests can fail to a single target (e.g. OST
137 * bulk requests) and if one has already caused a reconnection
138 * (increasing the import->conn_cnt) the older failure should
139 * not also cause a reconnection. If zero it forces a reconnect.
141 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
145 spin_lock(&imp->imp_lock);
/* only transition FULL -> DISCON, and only if this failure belongs to
 * the current connection epoch (or the caller forced it with 0) */
147 if (imp->imp_state == LUSTRE_IMP_FULL &&
148 (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
152 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
153 &target_start, &target_len);
/* replayable imports will recover; non-replayable ones will fail */
154 if (imp->imp_replayable) {
155 LCONSOLE_WARN("%s: Connection to service %.*s via nid "
156 "%s was lost; in progress operations using this "
157 "service will wait for recovery to complete.\n",
158 imp->imp_obd->obd_name, target_len, target_start,
159 libcfs_nid2str(imp->imp_connection->c_peer.nid));
161 LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
162 "%.*s via nid %s was lost; in progress "
163 "operations using this service will fail.\n",
164 imp->imp_obd->obd_name, target_len, target_start,
165 libcfs_nid2str(imp->imp_connection->c_peer.nid));
167 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
168 spin_unlock(&imp->imp_lock);
170 if (obd_dump_on_timeout)
171 libcfs_debug_dumplog();
/* notify the OBD layer so upper layers can react to the disconnect */
173 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
/* else: already disconnected/reconnected by a newer failure -- just log */
176 spin_unlock(&imp->imp_lock);
177 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
178 imp->imp_client->cli_name, imp,
179 (imp->imp_state == LUSTRE_IMP_FULL &&
180 imp->imp_conn_cnt > conn_cnt) ?
181 "reconnected" : "not connected", imp->imp_conn_cnt,
182 conn_cnt, ptlrpc_import_state_name(imp->imp_state));
188 /* Must be called with imp_lock held! */
/* Mark the import invalid, bump the generation, then drop imp_lock and
 * abort all inflight requests.  Note: the lock is RELEASED here on behalf
 * of the caller. */
189 static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
192 LASSERT_SPIN_LOCKED(&imp->imp_lock);
194 CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
195 imp->imp_invalid = 1;
196 imp->imp_generation++;
197 spin_unlock(&imp->imp_lock);
/* abort outside the lock; callbacks may take it themselves */
199 ptlrpc_abort_inflight(imp);
200 obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
204 * This acts as a barrier; all existing requests are rejected, and
205 * no new requests will be accepted until the import is valid again.
/* Locked entry point: takes imp_lock and delegates to the
 * deactivate-and-unlock helper above. */
207 void ptlrpc_deactivate_import(struct obd_import *imp)
209 spin_lock(&imp->imp_lock);
210 ptlrpc_deactivate_and_unlock_import(imp);
/* Return the remaining deadline for one inflight request relative to @now
 * (return type/ENTRY elided in this extract).  Requests that are not in a
 * phase that can still time out contribute nothing. */
214 ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
/* only RPC-in-flight (not waiting), BULK, or NEW phases are considered */
218 if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
219 (req->rq_phase == RQ_PHASE_BULK) ||
220 (req->rq_phase == RQ_PHASE_NEW)))
223 if (req->rq_timedout)
226 if (req->rq_phase == RQ_PHASE_NEW)
229 dl = req->rq_deadline;
/* Compute the longest remaining deadline over all requests on the
 * import's sending list, under imp_lock.  Used by
 * ptlrpc_invalidate_import() to size its wait. */
237 static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
239 time_t now = cfs_time_current_sec();
240 struct list_head *tmp, *n;
241 struct ptlrpc_request *req;
242 unsigned int timeout = 0;
244 spin_lock(&imp->imp_lock);
245 list_for_each_safe(tmp, n, &imp->imp_sending_list) {
246 req = list_entry(tmp, struct ptlrpc_request, rq_list);
247 timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
249 spin_unlock(&imp->imp_lock);
254 * This function will invalidate the import, if necessary, then block
255 * for all the RPC completions, and finally notify the obd to
256 * invalidate its state (ie cancel locks, clear pending requests,
259 void ptlrpc_invalidate_import(struct obd_import *imp)
261 struct list_head *tmp, *n;
262 struct ptlrpc_request *req;
263 struct l_wait_info lwi;
264 unsigned int timeout;
/* track concurrent invalidations; ptlrpc_reconnect_import() waits on this */
267 atomic_inc(&imp->imp_inval_count);
270 * If this is an invalid MGC connection, then don't bother
271 * waiting for imp_inflight to drop to 0.
273 if (imp->imp_invalid && imp->imp_recon_bk &&!imp->imp_obd->obd_no_recov)
276 if (!imp->imp_invalid || imp->imp_obd->obd_no_recov)
277 ptlrpc_deactivate_import(imp);
279 LASSERT(imp->imp_invalid);
281 /* Wait forever until inflight == 0. We really can't do it another
282 * way because in some cases we need to wait for very long reply
283 * unlink. We can't do anything before that because there is really
284 * no guarantee that some rdma transfer is not in progress right now. */
286 /* Calculate max timeout for waiting on rpcs to error
287 * out. Use obd_timeout if calculated value is smaller
289 if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
290 timeout = ptlrpc_inflight_timeout(imp);
291 timeout += timeout / 3;
294 timeout = obd_timeout;
296 /* decrease the interval to increase race condition */
300 CDEBUG(D_RPCTRACE,"Sleeping %d sec for inflight to error out\n",
303 /* Wait for all requests to error out and call completion
304 * callbacks. Cap it at obd_timeout -- these should all
305 * have been locally cancelled by ptlrpc_abort_inflight. */
306 lwi = LWI_TIMEOUT_INTERVAL(
307 cfs_timeout_cap(cfs_time_seconds(timeout)),
308 (timeout > 1)?cfs_time_seconds(1):cfs_time_seconds(1)/2,
310 rc = l_wait_event(imp->imp_recovery_waitq,
311 (atomic_read(&imp->imp_inflight) == 0), &lwi);
/* wait failed or timed out: diagnose what is still holding inflight */
313 const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
315 CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
316 cli_tgt, rc, atomic_read(&imp->imp_inflight));
318 spin_lock(&imp->imp_lock);
319 if (atomic_read(&imp->imp_inflight) == 0) {
320 int count = atomic_read(&imp->imp_unregistering);
322 /* We know that "unregistering" rpcs only can
323 * survive in sending or delaying lists (they
324 * maybe waiting for long reply unlink in
325 * sluggish nets). Let's check this. If there
326 * is no inflight and unregistering != 0, this
328 LASSERTF(count == 0, "Some RPCs are still "
329 "unregistering: %d\n", count);
331 /* Let's save one loop as soon as inflight have
332 * dropped to zero. No new inflights possible at
/* dump any requests stuck on the sending/delayed lists for debugging */
336 list_for_each_safe(tmp, n,
337 &imp->imp_sending_list) {
338 req = list_entry(tmp,
339 struct ptlrpc_request,
341 DEBUG_REQ(D_ERROR, req,
342 "still on sending list");
344 list_for_each_safe(tmp, n,
345 &imp->imp_delayed_list) {
346 req = list_entry(tmp,
347 struct ptlrpc_request,
349 DEBUG_REQ(D_ERROR, req,
350 "still on delayed list");
353 CERROR("%s: RPCs in \"%s\" phase found (%d). "
354 "Network is sluggish? Waiting them "
355 "to error out.\n", cli_tgt,
356 ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
357 atomic_read(&imp->imp_unregistering));
359 spin_unlock(&imp->imp_lock);
363 /* Let's additionally check that no new rpcs added to import in
364 * "invalidate" state. */
365 LASSERT(atomic_read(&imp->imp_inflight) == 0);
368 obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
/* release our invalidation ref and wake anyone waiting for it to finish */
370 atomic_dec(&imp->imp_inval_count);
371 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
374 /* unset imp_invalid */
/* Clear the invalid flag under imp_lock and tell the OBD layer the
 * import is active again. */
375 void ptlrpc_activate_import(struct obd_import *imp)
377 struct obd_device *obd = imp->imp_obd;
379 spin_lock(&imp->imp_lock);
380 imp->imp_invalid = 0;
381 spin_unlock(&imp->imp_lock);
383 obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
/* Handle a failed request on this import: move it to DISCON (if it was
 * FULL for this connection epoch), then either deactivate a
 * non-replayable import or kick the pinger to drive reconnection. */
386 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
390 LASSERT(!imp->imp_dlm_fake);
392 if (ptlrpc_set_import_discon(imp, conn_cnt)) {
393 if (!imp->imp_replayable) {
394 CDEBUG(D_HA, "import %s@%s for %s not replayable, "
395 "auto-deactivating\n",
396 obd2cli_tgt(imp->imp_obd),
397 imp->imp_connection->c_remote_uuid.uuid,
398 imp->imp_obd->obd_name);
399 ptlrpc_deactivate_import(imp);
402 CDEBUG(D_HA, "%s: waking up pinger\n",
403 obd2cli_tgt(imp->imp_obd));
/* ask the pinger to verify/recover this import promptly */
405 spin_lock(&imp->imp_lock);
406 imp->imp_force_verify = 1;
407 spin_unlock(&imp->imp_lock);
409 ptlrpc_pinger_wake_up();
/* Force a full disconnect/invalidate/reconnect cycle on the import.
 * Per the comment below, currently only used by the MGC, so the import
 * is forced recoverable. */
414 int ptlrpc_reconnect_import(struct obd_import *imp)
417 ptlrpc_set_import_discon(imp, 0);
418 /* Force a new connect attempt */
419 ptlrpc_invalidate_import(imp);
420 /* Do a fresh connect next time by zeroing the handle */
421 ptlrpc_disconnect_import(imp, 1);
422 /* Wait for all invalidate calls to finish */
423 if (atomic_read(&imp->imp_inval_count) > 0) {
425 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
426 rc = l_wait_event(imp->imp_recovery_waitq,
427 (atomic_read(&imp->imp_inval_count) == 0),
430 CERROR("Interrupted, inval=%d\n",
431 atomic_read(&imp->imp_inval_count));
435 * Allow reconnect attempts. Note: Currently, the function is
436 * only called by MGC. So assume this is a recoverable import,
437 * and force import to be recoverable. fix this if you need to
440 imp->imp_obd->obd_no_recov = 0;
441 /* Remove 'invalid' flag */
442 ptlrpc_activate_import(imp);
443 /* Attempt a new connect */
444 ptlrpc_recover_import(imp, NULL);
448 EXPORT_SYMBOL(ptlrpc_reconnect_import);
/* Pick the next connection to try from imp_conn_list (failover NIDs),
 * preferring one not yet attempted since the last successful connect,
 * otherwise the least-recently tried.  Updates imp_connection and the
 * DLM export's connection to the selection, all under imp_lock. */
450 static int import_select_connection(struct obd_import *imp)
452 struct obd_import_conn *imp_conn = NULL, *conn;
453 struct obd_export *dlmexp;
457 spin_lock(&imp->imp_lock);
459 if (list_empty(&imp->imp_conn_list)) {
460 CERROR("%s: no connections available\n",
461 imp->imp_obd->obd_name);
462 spin_unlock(&imp->imp_lock);
466 list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
467 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
468 imp->imp_obd->obd_name,
469 libcfs_nid2str(conn->oic_conn->c_peer.nid),
470 conn->oic_last_attempt);
472 /* Don't thrash connections */
473 if (cfs_time_before_64(cfs_time_current_64(),
474 conn->oic_last_attempt +
475 cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
479 /* If we have not tried this connection since the
480 the last successful attempt, go with this one */
481 if ((conn->oic_last_attempt == 0) ||
482 cfs_time_beforeq_64(conn->oic_last_attempt,
483 imp->imp_last_success_conn)) {
489 /* If all of the connections have already been tried
490 since the last successful connection; just choose the
491 least recently used */
494 else if (cfs_time_before_64(conn->oic_last_attempt,
495 imp_conn->oic_last_attempt))
499 /* if not found, simply choose the current one */
500 if (!imp_conn || imp->imp_force_reconnect) {
501 LASSERT(imp->imp_conn_current);
502 imp_conn = imp->imp_conn_current;
505 LASSERT(imp_conn->oic_conn);
507 /* If we've tried everything, and we're back to the beginning of the
508 list, increase our timeout and try again. It will be reset when
509 we do finally connect. (FIXME: really we should wait for all network
510 state associated with the last connection attempt to drain before
511 trying to reconnect on it.) */
512 if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
513 !imp->imp_recon_bk /* not retrying */) {
/* bump adaptive net latency, capped at CONNECTION_SWITCH_MAX */
514 if (at_get(&imp->imp_at.iat_net_latency) <
515 CONNECTION_SWITCH_MAX) {
516 at_measured(&imp->imp_at.iat_net_latency,
517 MIN(at_get(&imp->imp_at.iat_net_latency) +
518 CONNECTION_SWITCH_INC,
519 CONNECTION_SWITCH_MAX));
521 LASSERT(imp_conn->oic_last_attempt);
522 CWARN("%s: tried all connections, increasing latency to %ds\n",
523 imp->imp_obd->obd_name,
524 at_get(&imp->imp_at.iat_net_latency));
527 imp_conn->oic_last_attempt = cfs_time_current_64();
529 /* switch connection, don't mind if it's same as the current one */
530 if (imp->imp_connection)
531 ptlrpc_connection_put(imp->imp_connection);
532 imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
/* keep the DLM export's connection in sync with the import's */
534 dlmexp = class_conn2export(&imp->imp_dlm_handle);
535 LASSERT(dlmexp != NULL);
536 if (dlmexp->exp_connection)
537 ptlrpc_connection_put(dlmexp->exp_connection);
538 dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
539 class_export_put(dlmexp);
541 if (imp->imp_conn_current != imp_conn) {
542 if (imp->imp_conn_current)
543 CDEBUG(D_HA, "Changing connection for %s to %s/%s\n",
544 imp->imp_obd->obd_name, imp_conn->oic_uuid.uuid,
545 libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
546 imp->imp_conn_current = imp_conn;
549 CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
550 imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
551 libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
553 spin_unlock(&imp->imp_lock);
559 * must be called under imp lock
/* Fetch the transno of the first request on the replay list into
 * *transno; complains if a replayable request carries transno 0. */
561 static int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
563 struct ptlrpc_request *req;
564 struct list_head *tmp;
566 if (list_empty(&imp->imp_replay_list))
568 tmp = imp->imp_replay_list.next;
569 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
570 *transno = req->rq_transno;
571 if (req->rq_transno == 0) {
572 DEBUG_REQ(D_ERROR, req, "zero transno in replay");
/* Build and queue (via ptlrpcd) an asynchronous MDS/OST connect request
 * for @imp, moving the import to CONNECTING.  @new_uuid, if given,
 * selects a preferred failover target.  Completion is handled by
 * ptlrpc_connect_interpret(). */
579 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
581 struct obd_device *obd = imp->imp_obd;
583 int initial_connect = 0;
585 __u64 committed_before_reconnect = 0;
586 struct ptlrpc_request *request;
/* connect request buffers: ptlrpc body, target uuid, client uuid,
 * dlm handle, connect data */
587 __u32 size[] = { sizeof(struct ptlrpc_body),
588 sizeof(imp->imp_obd->u.cli.cl_target_uuid),
589 sizeof(obd->obd_uuid),
590 sizeof(imp->imp_dlm_handle),
591 sizeof(imp->imp_connect_data) };
592 char *tmp[] = { NULL,
593 obd2cli_tgt(imp->imp_obd),
595 (char *)&imp->imp_dlm_handle,
596 (char *)&imp->imp_connect_data };
597 struct ptlrpc_connect_async_args *aa;
/* reject connect attempts from states that must not reconnect */
600 spin_lock(&imp->imp_lock);
601 if (imp->imp_state == LUSTRE_IMP_CLOSED) {
602 spin_unlock(&imp->imp_lock);
603 CERROR("can't connect to a closed import\n");
605 } else if (imp->imp_state == LUSTRE_IMP_FULL) {
606 spin_unlock(&imp->imp_lock);
607 CERROR("already connected\n");
609 } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
610 spin_unlock(&imp->imp_lock);
611 CERROR("already connecting\n");
615 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
618 imp->imp_resend_replay = 0;
/* an unused remote handle means this is the first-ever connect */
620 if (!lustre_handle_is_used(&imp->imp_remote_handle))
623 committed_before_reconnect = imp->imp_peer_committed_transno;
625 set_transno = ptlrpc_first_transno(imp,
626 &imp->imp_connect_data.ocd_transno);
628 spin_unlock(&imp->imp_lock);
631 struct obd_uuid uuid;
633 obd_str2uuid(&uuid, new_uuid);
634 rc = import_set_conn_priority(imp, &uuid);
639 rc = import_select_connection(imp);
643 /* last in connection list */
644 if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
645 if (imp->imp_initial_recov_bk && initial_connect) {
646 CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
647 imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
648 /* Don't retry if connect fails */
650 obd_set_info_async(obd->obd_self_export,
651 sizeof(KEY_INIT_RECOV),
653 sizeof(rc), &rc, NULL);
655 if (imp->imp_recon_bk) {
656 CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
657 imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
658 spin_lock(&imp->imp_lock);
659 imp->imp_last_recon = 1;
660 spin_unlock(&imp->imp_lock);
664 /* Reset connect flags to the originally requested flags, in case
665 * the server is updated on-the-fly we will get the new features. */
666 imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
667 imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
669 rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
670 &obd->obd_uuid, &imp->imp_connect_data, NULL);
674 request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
677 GOTO(out, rc = -ENOMEM);
679 /* Report the rpc service time to the server so that it knows how long
680 * to wait for clients to join recovery */
681 lustre_msg_set_service_time(request->rq_reqmsg,
682 at_timeout2est(request->rq_timeout));
684 /* The amount of time we give the server to process the connect req.
685 * import_select_connection will increase the net latency on
686 * repeated reconnect attempts to cover slow networks.
687 * We override/ignore the server rpc completion estimate here,
688 * which may be large if this is a reconnect attempt */
689 request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
690 lustre_msg_set_timeout(request->rq_reqmsg, request->rq_timeout);
693 lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
695 if (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V1)
696 lustre_msg_add_op_flags(request->rq_reqmsg,
697 MSG_CONNECT_NEXT_VER);
699 request->rq_no_resend = request->rq_no_delay = 1;
700 request->rq_send_state = LUSTRE_IMP_CONNECTING;
701 /* Allow a slightly larger reply for future growth compatibility */
702 size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
704 ptlrpc_req_set_repsize(request, 2, size);
705 request->rq_interpret_reply = ptlrpc_connect_interpret;
/* stash per-connect state in the request's async args */
707 CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
708 aa = ptlrpc_req_async_args(request);
709 memset(aa, 0, sizeof *aa);
711 aa->pcaa_peer_committed = committed_before_reconnect;
712 aa->pcaa_initial_connect = initial_connect;
713 if (aa->pcaa_initial_connect) {
714 spin_lock(&imp->imp_lock);
715 imp->imp_replayable = 1;
716 spin_unlock(&imp->imp_lock);
717 lustre_msg_add_op_flags(request->rq_reqmsg,
718 MSG_CONNECT_INITIAL);
722 lustre_msg_add_op_flags(request->rq_reqmsg,
723 MSG_CONNECT_TRANSNO);
725 DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
726 aa->pcaa_initial_connect ? "initial " : "re",
/* hand off to ptlrpcd; reply handled asynchronously */
728 ptlrpcd_add_req(request);
/* error path (label elided in this extract): fall back to DISCON */
732 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
737 EXPORT_SYMBOL(ptlrpc_connect_import);
/* After a failed connect, schedule an early ping only if the next
 * candidate connection targets a different NID than the current one
 * (avoids defeating connection throttling -- see bug 14774 note below). */
739 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
742 struct obd_import_conn *imp_conn;
748 spin_lock(&imp->imp_lock);
749 if (list_empty(&imp->imp_conn_list))
/* last entry == least recently attempted candidate */
753 imp_conn = list_entry(imp->imp_conn_list.prev,
754 struct obd_import_conn,
757 /* XXX: When the failover node is the primary node, it is possible
758 * to have two identical connections in imp_conn_list. We must
759 * compare not conn's pointers but NIDs, otherwise we can defeat
760 * connection throttling. (See bug 14774.) */
761 if (imp->imp_conn_current->oic_conn->c_peer.nid !=
762 imp_conn->oic_conn->c_peer.nid) {
763 ptlrpc_ping_import_soon(imp);
768 /* liblustre has no pinger thead, so we wakup pinger anyway */
772 spin_unlock(&imp->imp_lock);
775 ptlrpc_pinger_wake_up();
/* True when the connect failed because the target export is busy and the
 * same target should simply be retried (no new connection selection). */
780 static int ptlrpc_busy_reconnect(int rc)
782 return (rc == -EBUSY) || (rc == -EAGAIN);
/* Completion callback for the connect RPC sent by ptlrpc_connect_import().
 * Decides the next import state (FULL / REPLAY / REPLAY_LOCKS / RECOVER /
 * EVICTED / DISCON) from the reply's MSG_CONNECT_* flags and processes the
 * server's obd_connect_data (feature flags, checksums, brw size, AT). */
785 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
788 struct ptlrpc_connect_async_args *aa = data;
789 struct obd_import *imp = request->rq_import;
790 struct client_obd *cli = &imp->imp_obd->u.cli;
791 struct lustre_handle old_hdl;
792 __u64 old_connect_flags;
796 spin_lock(&imp->imp_lock);
797 if (imp->imp_state == LUSTRE_IMP_CLOSED) {
798 spin_unlock(&imp->imp_lock);
803 /* if this reconnect to busy export - not need select new target
805 imp->imp_force_reconnect = ptlrpc_busy_reconnect(rc);
806 spin_unlock(&imp->imp_lock);
810 LASSERT(imp->imp_conn_current);
812 msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
814 /* All imports are pingable */
815 imp->imp_pingable = 1;
816 imp->imp_force_reconnect = 0;
/* --- first-ever connect: adopt the server handle and go FULL (or wait
 * for recovery) --- */
818 if (aa->pcaa_initial_connect) {
819 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
820 imp->imp_replayable = 1;
821 spin_unlock(&imp->imp_lock);
822 CDEBUG(D_HA, "connected to replayable target: %s\n",
823 obd2cli_tgt(imp->imp_obd));
825 imp->imp_replayable = 0;
826 spin_unlock(&imp->imp_lock);
/* negotiate message format: V2 if the server advertised it */
829 if ((request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V1 &&
830 msg_flags & MSG_CONNECT_NEXT_VER) ||
831 request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2) {
832 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
833 CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
834 obd2cli_tgt(imp->imp_obd));
836 CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
837 obd2cli_tgt(imp->imp_obd));
840 imp->imp_remote_handle =
841 *lustre_msg_get_handle(request->rq_repmsg);
843 /* Initial connects are allowed for clients with non-random
844 * uuids when servers are in recovery. Simply signal the
845 * servers replay is complete and wait in REPLAY_WAIT. */
846 if (msg_flags & MSG_CONNECT_RECOVERING) {
847 CDEBUG(D_HA, "connect to %s during recovery\n",
848 obd2cli_tgt(imp->imp_obd));
849 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
851 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
852 ptlrpc_activate_import(imp);
854 GOTO(finish, rc = 0);
856 spin_unlock(&imp->imp_lock);
859 /* Determine what recovery state to move the import to. */
/* --- reconnect to an export the server still remembers --- */
860 if (MSG_CONNECT_RECONNECT & msg_flags) {
861 memset(&old_hdl, 0, sizeof(old_hdl));
862 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
864 CERROR("%s@%s didn't like our handle "LPX64
865 ", failed\n", obd2cli_tgt(imp->imp_obd),
866 imp->imp_connection->c_remote_uuid.uuid,
867 imp->imp_dlm_handle.cookie);
868 GOTO(out, rc = -ENOTCONN);
871 if (memcmp(&imp->imp_remote_handle,
872 lustre_msg_get_handle(request->rq_repmsg),
873 sizeof(imp->imp_remote_handle))) {
874 int level = msg_flags & MSG_CONNECT_RECOVERING ?
877 /* Bug 16611/14775: if server handle have changed,
878 * that means some sort of disconnection happened.
879 * If the server is not in recovery, that also means it
880 * already erased all of our state because of previous
881 * eviction. If it is in recovery - we are safe to
882 * participate since we can reestablish all of our state
883 * with server again */
884 CDEBUG(level,"%s@%s changed server handle from "
885 LPX64" to "LPX64"%s\n",
886 obd2cli_tgt(imp->imp_obd),
887 imp->imp_connection->c_remote_uuid.uuid,
888 imp->imp_remote_handle.cookie,
889 lustre_msg_get_handle(request->rq_repmsg)->
891 (MSG_CONNECT_RECOVERING & msg_flags) ?
892 " but is still in recovery" : "");
894 imp->imp_remote_handle =
895 *lustre_msg_get_handle(request->rq_repmsg);
897 if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
898 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
899 GOTO(finish, rc = 0);
903 CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
904 obd2cli_tgt(imp->imp_obd),
905 imp->imp_connection->c_remote_uuid.uuid);
908 if (imp->imp_invalid) {
909 CDEBUG(D_HA, "%s: reconnected but import is invalid; "
910 "marking evicted\n", imp->imp_obd->obd_name);
911 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
912 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
913 CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
914 imp->imp_obd->obd_name,
915 obd2cli_tgt(imp->imp_obd));
/* replay must be resent from the start on this new connection */
917 spin_lock(&imp->imp_lock);
918 imp->imp_resend_replay = 1;
919 /* VBR: delayed connection */
920 if (MSG_CONNECT_DELAYED & msg_flags) {
921 imp->imp_delayed_recovery = 1;
922 imp->imp_no_lock_replay = 1;
924 spin_unlock(&imp->imp_lock);
926 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
928 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
/* --- server in recovery but no RECONNECT flag: full replay --- */
930 } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
931 LASSERT(imp->imp_replayable);
932 imp->imp_remote_handle =
933 *lustre_msg_get_handle(request->rq_repmsg);
934 imp->imp_last_replay_transno = 0;
935 /* VBR: delayed connection */
936 if (MSG_CONNECT_DELAYED & msg_flags) {
937 spin_lock(&imp->imp_lock);
938 imp->imp_delayed_recovery = 1;
939 imp->imp_no_lock_replay = 1;
940 spin_unlock(&imp->imp_lock);
942 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
/* --- neither reconnect nor recovering: we have been evicted --- */
944 DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
945 "flags reconnect/recovering not set: %x)",msg_flags);
946 imp->imp_remote_handle =
947 *lustre_msg_get_handle(request->rq_repmsg);
948 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
951 /* Sanity checks for a reconnected import. */
952 if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
953 CERROR("imp_replayable flag does not match server "
954 "after reconnect. We should LBUG right here.\n");
/* server's committed transno must never move backwards */
957 if (lustre_msg_get_last_committed(request->rq_repmsg) > 0 &&
958 lustre_msg_get_last_committed(request->rq_repmsg) <
959 aa->pcaa_peer_committed) {
960 CERROR("%s went back in time (transno "LPD64
961 " was previously committed, server now claims "LPD64
962 ")! See https://bugzilla.lustre.org/show_bug.cgi?"
964 obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
965 lustre_msg_get_last_committed(request->rq_repmsg));
969 rc = ptlrpc_import_recovery_state_machine(imp);
971 if (rc == -ENOTCONN) {
972 CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
973 "invalidating and reconnecting\n",
974 obd2cli_tgt(imp->imp_obd),
975 imp->imp_connection->c_remote_uuid.uuid);
976 ptlrpc_connect_import(imp, NULL);
/* --- "finish" section (label elided): process the server's
 * obd_connect_data from the reply --- */
980 struct obd_connect_data *ocd;
981 struct obd_export *exp;
983 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
984 lustre_swab_connect);
/* move the successful connection to the list head and remember when
 * it last succeeded, for import_select_connection() */
985 spin_lock(&imp->imp_lock);
986 list_del(&imp->imp_conn_current->oic_item);
987 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
988 imp->imp_last_success_conn =
989 imp->imp_conn_current->oic_last_attempt;
992 spin_unlock(&imp->imp_lock);
993 CERROR("Wrong connect data from server\n");
998 imp->imp_connect_data = *ocd;
1000 exp = class_conn2export(&imp->imp_dlm_handle);
1001 spin_unlock(&imp->imp_lock);
1003 /* check that server granted subset of flags we asked for. */
1004 LASSERTF((ocd->ocd_connect_flags &
1005 imp->imp_connect_flags_orig) ==
1006 ocd->ocd_connect_flags, LPX64" != "LPX64,
1007 imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
1010 /* This could happen if export is cleaned during the
1012 CERROR("Missing export for %s\n",
1013 imp->imp_obd->obd_name);
1014 GOTO(out, rc = -ENODEV);
1016 old_connect_flags = exp->exp_connect_flags;
1017 exp->exp_connect_flags = ocd->ocd_connect_flags;
1018 imp->imp_obd->obd_self_export->exp_connect_flags =
1019 ocd->ocd_connect_flags;
1020 class_export_put(exp);
1022 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
1024 if (!ocd->ocd_ibits_known &&
1025 ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
1026 CERROR("Inodebits aware server returned zero compatible"
/* warn when client/server versions are far apart */
1029 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1030 (ocd->ocd_version > LUSTRE_VERSION_CODE +
1031 LUSTRE_VERSION_OFFSET_WARN ||
1032 ocd->ocd_version < LUSTRE_VERSION_CODE -
1033 LUSTRE_VERSION_OFFSET_WARN)) {
1034 /* Sigh, some compilers do not like #ifdef in the middle
1035 of macro arguments */
1038 "older. Consider upgrading this client";
1041 "older. Consider recompiling this application";
1043 const char *newer = "newer than client version";
1045 LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
1046 "is much %s (%s)\n",
1047 obd2cli_tgt(imp->imp_obd),
1048 OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1049 OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1050 OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1051 OBD_OCD_VERSION_FIX(ocd->ocd_version),
1052 ocd->ocd_version > LUSTRE_VERSION_CODE ?
1053 newer : older, LUSTRE_VERSION_STRING);
/* negotiate the bulk checksum algorithm from the server's mask */
1056 if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
1057 /* We sent to the server ocd_cksum_types with bits set
1058 * for algorithms we understand. The server masked off
1059 * the checksum types it doesn't support */
1060 if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
1061 LCONSOLE_WARN("The negotiation of the checksum "
1062 "alogrithm to use with server %s "
1063 "failed (%x/%x), disabling "
1065 obd2cli_tgt(imp->imp_obd),
1066 ocd->ocd_cksum_types,
1068 cli->cl_checksum = 0;
1069 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1070 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1072 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
1074 if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
1075 cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
1076 else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
1077 cli->cl_cksum_type = OBD_CKSUM_ADLER;
1079 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1082 /* The server does not support OBD_CONNECT_CKSUM.
1083 * Enforce CRC32 for backward compatibility*/
1084 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1085 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1088 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
1089 cli->cl_max_pages_per_rpc =
1090 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
1093 /* Reset ns_connect_flags only for initial connect. It might be
1094 * changed in while using FS and if we reset it in reconnect
1095 * this leads to lossing user settings done before such as
1096 * disable lru_resize, etc. */
1097 if (old_connect_flags != exp->exp_connect_flags ||
1098 aa->pcaa_initial_connect) {
1099 CDEBUG(D_HA, "%s: Resetting ns_connect_flags to server "
1100 "flags: "LPX64"\n", imp->imp_obd->obd_name,
1101 ocd->ocd_connect_flags);
1102 imp->imp_obd->obd_namespace->ns_connect_flags =
1103 ocd->ocd_connect_flags;
1104 imp->imp_obd->obd_namespace->ns_orig_connect_flags =
1105 ocd->ocd_connect_flags;
1108 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
1109 (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
1110 /* We need a per-message support flag, because
1111 a. we don't know if the incoming connect reply
1112 supports AT or not (in reply_in_callback)
1114 b. failovered server means export and flags are gone
1115 (in ptlrpc_send_reply).
1116 Can only be set when we know AT is supported at
1118 imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
1120 imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
1122 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
1123 (cli->cl_max_pages_per_rpc > 0));
/* --- error path ("out" section, label elided): back to DISCON,
 * possibly deactivating or closing the import --- */
1128 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
1129 spin_lock(&imp->imp_lock);
1130 if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
1131 (request->rq_import_generation == imp->imp_generation))
1132 ptlrpc_deactivate_and_unlock_import(imp);
1134 spin_unlock(&imp->imp_lock);
1136 if (imp->imp_recon_bk && imp->imp_last_recon) {
1137 /* Give up trying to reconnect */
1138 imp->imp_obd->obd_no_recov = 1;
1139 ptlrpc_deactivate_import(imp);
/* -EPROTO: version-incompatible client refused by the server */
1142 if (rc == -EPROTO) {
1143 struct obd_connect_data *ocd;
1144 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
1146 lustre_swab_connect);
1148 (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1149 (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
1150 /* Actually servers are only supposed to refuse
1151 connection from liblustre clients, so we should
1152 never see this from VFS context */
1153 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
1155 " refused connection from this client "
1156 "with an incompatible version (%s). "
1157 "Client must be recompiled\n",
1158 obd2cli_tgt(imp->imp_obd),
1159 OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1160 OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1161 OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1162 OBD_OCD_VERSION_FIX(ocd->ocd_version),
1163 LUSTRE_VERSION_STRING);
1164 ptlrpc_deactivate_import(imp);
1165 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
1170 ptlrpc_maybe_ping_import_soon(imp);
1172 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
1173 obd2cli_tgt(imp->imp_obd),
1174 (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
1177 spin_lock(&imp->imp_lock);
1178 imp->imp_last_recon = 0;
1179 spin_unlock(&imp->imp_lock);
/* wake anyone waiting on recovery progress regardless of outcome */
1181 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
/* Reply handler for the final "LAST_REPLAY" OBD_PING sent by
 * signal_completed_replay() (which sets this as rq_interpret_reply).
 * Drops the in-flight replay counter, then either advances the recovery
 * state machine on clean completion, or reconnects on version-recovery
 * failure / RPC error.
 * NOTE(review): several source lines are elided in this excerpt; braces and
 * parts of the control flow are missing as shown. */
static int completed_replay_interpret(struct ptlrpc_request *req,
                                      void * data, int rc)
        /* one fewer replay RPC outstanding on this import */
        atomic_dec(&req->rq_import->imp_replay_inflight);
        if (req->rq_status == 0 &&
            !req->rq_import->imp_vbr_failed) {
                /* clean completion: drive recovery to its next state */
                ptlrpc_import_recovery_state_machine(req->rq_import);
                if (req->rq_import->imp_vbr_failed) {
                        /* version-based recovery failed: clear the flag
                         * under imp_lock, then reconnect below */
                        "%s: version recovery fails, reconnecting\n",
                        req->rq_import->imp_obd->obd_name);
                        spin_lock(&req->rq_import->imp_lock);
                        req->rq_import->imp_vbr_failed = 0;
                        spin_unlock(&req->rq_import->imp_lock);
                /* RPC-level error: log and restart the connect sequence */
                CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
                       req->rq_import->imp_obd->obd_name,
                ptlrpc_connect_import(req->rq_import, NULL);
/* Tell the server that client-side replay has finished: queue a final
 * OBD_PING tagged with MSG_LOCK_REPLAY_DONE | MSG_REQ_REPLAY_DONE, sendable
 * in the REPLAY_WAIT state.  The reply is handled by
 * completed_replay_interpret().
 * NOTE(review): lines are elided in this excerpt (e.g. the request-NULL
 * guard around the atomic_dec, the return paths). */
static int signal_completed_replay(struct obd_import *imp)
        struct ptlrpc_request *req;
        /* no replay RPCs may still be in flight when we signal completion */
        LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
        atomic_inc(&imp->imp_replay_inflight);
        req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
        /* undo the inflight count -- presumably on prep failure; the guard
         * line is elided in this excerpt */
        atomic_dec(&imp->imp_replay_inflight);
        ptlrpc_req_set_repsize(req, 1, NULL);
        req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
        lustre_msg_add_flags(req->rq_reqmsg,
                             MSG_LOCK_REPLAY_DONE |
                             MSG_REQ_REPLAY_DONE |
        if (imp->imp_delayed_recovery)
                lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
        req->rq_interpret_reply = completed_replay_interpret;
        /* give the final replay-done ping extra time */
        req->rq_timeout *= 3;
        ptlrpcd_add_req(req);
/* Kernel thread that invalidates an evicted import outside the caller's
 * context, then moves it to RECOVER and re-runs the recovery state machine.
 * Spawned from ptlrpc_import_recovery_state_machine() holding an extra
 * import reference (class_import_get), which is dropped here. */
static int ptlrpc_invalidate_import_thread(void *data)
        struct obd_import *imp = data;
        cfs_daemonize_ctxt("ll_imp_inval");
        CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
               imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
               imp->imp_connection->c_remote_uuid.uuid);
        ptlrpc_invalidate_import(imp);
        if (obd_dump_on_eviction) {
                /* optional debugging aid: dump the debug log on eviction */
                CERROR("dump the log upon eviction\n");
                libcfs_debug_dumplog();
        IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
        ptlrpc_import_recovery_state_machine(imp);
        /* drop the reference taken by the spawner */
        class_import_put(imp);
/* Drive a client import through the recovery states:
 *   EVICTED -> (invalidate, possibly in a separate thread) -> RECOVER
 *   REPLAY -> REPLAY_LOCKS -> REPLAY_WAIT -> RECOVER -> FULL
 * Each "if (imp->imp_state == ...)" section performs that state's work and
 * advances imp_state via IMPORT_SET_STATE().
 * NOTE(review): declarations, GOTO/return statements and some closing
 * braces are elided in this excerpt. */
int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
        if (imp->imp_state == LUSTRE_IMP_EVICTED) {
                /* split the target name from its UUID decoration for the
                 * console message below */
                deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
                          &target_start, &target_len);
                /* Don't care about MGC eviction */
                if (strcmp(imp->imp_obd->obd_type->typ_name,
                           LUSTRE_MGC_NAME) != 0) {
                        LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
                                           "%.*s; in progress operations using "
                                           "this service will fail.\n",
                                           target_len, target_start);
                CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
                       obd2cli_tgt(imp->imp_obd),
                       imp->imp_connection->c_remote_uuid.uuid);
                /* bug 17802: XXX client_disconnect_export vs connect request
                 * race.  If the client is evicted at this point, the
                 * invalidate thread could otherwise run without a reference
                 * to the import, and the import could be freed concurrently;
                 * take a reference before spawning the thread. */
                class_import_get(imp);
                rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
                                       CLONE_VM | CLONE_FILES);
                /* thread creation failed: drop the extra reference here */
                class_import_put(imp);
                CERROR("error starting invalidate thread: %d\n", rc);
                /* fall back to invalidating synchronously */
                ptlrpc_invalidate_import(imp);
                IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
        if (imp->imp_state == LUSTRE_IMP_REPLAY) {
                CDEBUG(D_HA, "replay requested by %s\n",
                       obd2cli_tgt(imp->imp_obd));
                /* kick off the next batch of replay requests */
                rc = ptlrpc_replay_next(imp, &inflight);
                if (inflight == 0 &&
                    atomic_read(&imp->imp_replay_inflight) == 0) {
                        /* request replay done: move on to lock replay */
                        IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
                        rc = ldlm_replay_locks(imp);
        if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
                if (atomic_read(&imp->imp_replay_inflight) == 0) {
                        IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
                        /* tell the server replay is complete */
                        rc = signal_completed_replay(imp);
        if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
                if (atomic_read(&imp->imp_replay_inflight) == 0) {
                        IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
        if (imp->imp_state == LUSTRE_IMP_RECOVER) {
                CDEBUG(D_HA, "reconnected to %s@%s\n",
                       obd2cli_tgt(imp->imp_obd),
                       imp->imp_connection->c_remote_uuid.uuid);
                /* resend requests that were delayed during recovery */
                rc = ptlrpc_resend(imp);
                IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
                ptlrpc_activate_import(imp);
                deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
                          &target_start, &target_len);
                LCONSOLE_INFO("%s: Connection restored to service %.*s "
                              "using nid %s.\n", imp->imp_obd->obd_name,
                              target_len, target_start,
                              libcfs_nid2str(imp->imp_connection->c_peer.nid));
        if (imp->imp_state == LUSTRE_IMP_FULL) {
                /* recovery finished: wake everyone waiting on the import */
                cfs_waitq_broadcast(&imp->imp_recovery_waitq);
                ptlrpc_wake_delayed(imp);
/* Callback passed to LWI_TIMEOUT_INTR() in ptlrpc_disconnect_import();
 * presumably invoked when the interruptible wait times out so the waiter
 * goes back to sleep -- TODO confirm, the function body is elided in this
 * excerpt. */
static int back_to_sleep(void *unused)
/* Cleanly disconnect an import from its server: wait for any recovery in
 * progress (unless forced), send the matching *_DISCONNECT RPC, then mark
 * the import DISCON or CLOSED (per 'noclose') and zero the remote handle.
 * NOTE(review): guard conditions, labels and the switch default are elided
 * in this excerpt. */
int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
        struct ptlrpc_request *req;
        /* forced cleanup (obd_force): don't wait for recovery below */
        int nowait = imp->imp_obd->obd_force;
        GOTO(set_state, rc);
        /* map the connect opcode to the matching disconnect opcode */
        switch (imp->imp_connect_op) {
        case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
        case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
        case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
                CERROR("don't know how to disconnect from %s (connect_op %d)\n",
                       obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
        if (ptlrpc_import_in_recovery(imp)) {
                /* give recovery a bounded chance to finish first */
                struct l_wait_info lwi;
                cfs_duration_t timeout;
                timeout = cfs_time_seconds(obd_timeout);
                        /* use the adaptive service estimate for this
                         * import's request portal instead */
                        int idx = import_at_get_index(imp,
                                imp->imp_client->cli_request_portal);
                        timeout = cfs_time_seconds(
                                at_get(&imp->imp_at.iat_service_estimate[idx]));
                lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout),
                                       back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
                rc = l_wait_event(imp->imp_recovery_waitq,
                                  !ptlrpc_import_in_recovery(imp), &lwi);
        spin_lock(&imp->imp_lock);
        if (imp->imp_state != LUSTRE_IMP_FULL)
        spin_unlock(&imp->imp_lock);
        req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
        /* We are disconnecting, do not retry a failed DISCONNECT rpc if
         * it fails. We can get through the above with a down server
         * if the client doesn't know the server is gone yet. */
        req->rq_no_resend = 1;
        /* We want client umounts to happen quickly, no matter the
           server state (the continuation of this comment is elided). */
        req->rq_timeout = min_t(int, req->rq_timeout,
                                INITIAL_CONNECT_TIMEOUT);
        /* ... but we always want liblustre clients to nicely
           disconnect, so only use the adaptive value. */
                req->rq_timeout = obd_timeout / 3;
        /* DISCONNECT is sendable while CONNECTING */
        IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
        req->rq_send_state = LUSTRE_IMP_CONNECTING;
        ptlrpc_req_set_repsize(req, 1, NULL);
        rc = ptlrpc_queue_wait(req);
        ptlrpc_req_finished(req);
        spin_lock(&imp->imp_lock);
        IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
        IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
        /* forget the server-side connection handle */
        memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
        /* Try all connections in the future - bz 12758 */
        imp->imp_last_recon = 0;
        spin_unlock(&imp->imp_lock);
/* Sets maximal number of RPCs possible originating from other side of this
   import (server) to us and number of async RPC replies that we are not waiting
   for -- passed straight to LNetSetAsync() for this import's peer.
   NOTE(review): the original comment is truncated in this excerpt. */
void ptlrpc_import_setasync(struct obd_import *imp, int count)
        LNetSetAsync(imp->imp_connection->c_peer, count);
/* Final teardown of an import: mark it CLOSED under imp_lock, bump
 * imp_generation (so requests queued against the old generation can be
 * distinguished -- see the rq_import_generation checks elsewhere in this
 * file), then abort all in-flight requests. */
void ptlrpc_cleanup_imp(struct obd_import *imp)
        spin_lock(&imp->imp_lock);
        IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
        imp->imp_generation++;
        spin_unlock(&imp->imp_lock);
        ptlrpc_abort_inflight(imp);
/* Adaptive Timeout utils */
extern unsigned int at_min, at_max, at_history;

/* Bin into timeslices using AT_BINS bins.
   This gives us a max of the last binlimit*AT_BINS secs without the storage,
   but still smoothing out a return to normalcy from a slow response.
   (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
/* Record a newly measured value 'val' into 'at': fold it into the binned
   history, recompute at_current as the max over the bins, and clamp to
   [at_min, at_max].  Per the comment near the end, the old at_current is
   reported if it changed, else 0.
   NOTE(review): some braces and the return statement are elided in this
   excerpt. */
int at_measured(struct adaptive_timeout *at, unsigned int val)
        unsigned int old = at->at_current;
        time_t now = cfs_time_current_sec();
        /* seconds covered by one history bin (at least 1) */
        time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
        CDEBUG(D_OTHER, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
               val, at, now - at->at_binstart, at->at_current,
               at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
        /* 0's don't count, because we never want our timeout to
           drop to 0, and because 0 could mean an error */
        spin_lock(&at->at_lock);
        if (unlikely(at->at_binstart == 0)) {
                /* Special case to remove default from history */
                at->at_current = val;
                at->at_worst_ever = val;
                at->at_worst_time = now;
                at->at_hist[0] = val;
                at->at_binstart = now;
        } else if (now - at->at_binstart < binlimit ) {
                /* still inside the current bin: just track the max */
                at->at_hist[0] = max(val, at->at_hist[0]);
                at->at_current = max(val, at->at_current);
                /* (else branch, opening elided) one or more new bins have
                 * started since at_binstart */
                unsigned int maxv = val;
                /* move bins over */
                shift = (now - at->at_binstart) / binlimit;
                for(i = AT_BINS - 1; i >= 0; i--) {
                        at->at_hist[i] = at->at_hist[i - shift];
                        maxv = max(maxv, at->at_hist[i]);
                at->at_hist[0] = val;
                at->at_current = maxv;
                at->at_binstart += shift * binlimit;
        if (at->at_current > at->at_worst_ever) {
                /* track the worst value ever observed and when */
                at->at_worst_ever = at->at_current;
                at->at_worst_time = now;
        if (at->at_flags & AT_FLG_NOHIST)
                /* Only keep last reported val; keeping the rest of the history
                   is pointless in this mode (continuation elided) */
                at->at_current = val;
        /* clamp to the configured global bounds */
        at->at_current = min(at->at_current, at_max);
        at->at_current = max(at->at_current, at_min);
        if (at->at_current != old)
                CDEBUG(D_OTHER, "AT %p change: old=%u new=%u delta=%d "
                       "(val=%u) hist %u %u %u %u\n", at,
                       old, at->at_current, at->at_current - old, val,
                       at->at_hist[0], at->at_hist[1], at->at_hist[2],
        /* if we changed, report the old value */
        old = (at->at_current != old) ? old : 0;
        spin_unlock(&at->at_lock);
/* Find the imp_at index for a given portal; assign if space available */
/* The first scan is lockless: in the visible code slots are only ever
   assigned (under imp_lock) and never cleared, so a hit found without the
   lock is stable.  On a miss, rescan from the first free slot under
   imp_lock and claim it (iat_portal[i] == 0 marks "unused").
   NOTE(review): the break/return lines inside the loops are elided in this
   excerpt. */
int import_at_get_index(struct obd_import *imp, int portal)
        struct imp_at *at = &imp->imp_at;
        /* unlocked fast path: existing entry (or first hole) */
        for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
                if (at->iat_portal[i] == portal)
                if (at->iat_portal[i] == 0)
        /* Not found in list, add it under a lock */
        spin_lock(&imp->imp_lock);
        /* Check unused under lock */
        for (; i < IMP_AT_MAX_PORTALS; i++) {
                if (at->iat_portal[i] == portal)
                if (at->iat_portal[i] == 0)
        /* Not enough portals? */
        LASSERT(i < IMP_AT_MAX_PORTALS);
        at->iat_portal[i] = portal;
        spin_unlock(&imp->imp_lock);