1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/ptlrpc/import.c
38 * Author: Mike Shaver <shaver@clusterfs.com>
41 #define DEBUG_SUBSYSTEM S_RPC
43 # include <liblustre.h>
46 #include <obd_support.h>
47 #include <lustre_ha.h>
48 #include <lustre_net.h>
49 #include <lustre_import.h>
50 #include <lustre_export.h>
52 #include <obd_class.h>
54 #include "ptlrpc_internal.h"
/* State handed from ptlrpc_connect_import() to ptlrpc_connect_interpret()
 * through the connect request's async-args area. */
56 struct ptlrpc_connect_async_args {
/* value of committed_before_reconnect in ptlrpc_connect_import() (taken
 * from imp_peer_committed_transno, 0 by default); the reply's
 * last_committed is compared against it to spot a server that
 * "went back in time" */
57 __u64 pcaa_peer_committed;
/* copy of the sender's initial_connect flag */
58 int pcaa_initial_connect;
/*
 * Change the state of an import without taking imp_lock (the caller is
 * expected to hold it).
 *
 * A CLOSED import should remain so: the request is silently ignored when
 * the import is already LUSTRE_IMP_CLOSED.  Every accepted transition is
 * traced at D_HA.
 *
 * Both arguments are parenthesized at each use so that expressions can be
 * passed safely; note that each of them is still evaluated more than
 * once, so they must be free of side effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",   \
                      (imp), obd2cli_tgt((imp)->imp_obd),                     \
                      ptlrpc_import_state_name((imp)->imp_state),             \
                      ptlrpc_import_state_name(state));                       \
               (imp)->imp_state = (state);                                    \
        }                                                                      \
} while (0)
/*
 * Locked variant of IMPORT_SET_STATE_NOLOCK: takes imp_lock around the
 * state change.  Must not be used where imp_lock is already held.
 * The import argument is parenthesized so that an expression can be
 * passed; it is evaluated more than once.
 */
#define IMPORT_SET_STATE(imp, state)                                           \
do {                                                                           \
        spin_lock(&(imp)->imp_lock);                                           \
        IMPORT_SET_STATE_NOLOCK(imp, state);                                   \
        spin_unlock(&(imp)->imp_lock);                                         \
} while (0)
81 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
83 int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
85 /* Only this function is allowed to change the import state when it is
86 * CLOSED. I would rather refcount the import and free it after
87 * disconnection like we do with exports. To do that, the client_obd
88 * will need to save the peer info somewhere other than in the import,
/*
 * Put @imp into LUSTRE_IMP_NEW and bump imp_generation, both under
 * imp_lock.  The state is assigned directly rather than through
 * IMPORT_SET_STATE*, so this also takes an import out of CLOSED.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably 0 -- confirm.
 */
90 int ptlrpc_init_import(struct obd_import *imp)
92 spin_lock(&imp->imp_lock);
94 imp->imp_generation++;
95 imp->imp_state = LUSTRE_IMP_NEW;
97 spin_unlock(&imp->imp_lock);
101 EXPORT_SYMBOL(ptlrpc_init_import);
#define UUID_STR "_UUID"
/*
 * Locate the printable name inside a UUID string without copying or
 * modifying it.
 *
 * @uuid        NUL-terminated UUID string
 * @prefix      optional leading text to step over; may be NULL, and is
 *              only skipped when @uuid actually begins with it
 * @uuid_start  out: pointer into @uuid where the name begins
 * @uuid_len    out: length of the name, not counting a trailing "_UUID"
 *
 * The result is meant to be printed with "%.*s" since the suffix is left
 * in place in the buffer.
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        const size_t suffix_len = strlen(UUID_STR);
        char *name = uuid;
        int name_len;

        /* step over @prefix only when one is given and @uuid begins with it */
        if (prefix != NULL) {
                const size_t prefix_len = strlen(prefix);

                if (strncmp(uuid, prefix, prefix_len) == 0)
                        name += prefix_len;
        }

        name_len = strlen(name);

        /* a trailing "_UUID" is excluded from the length, never overwritten;
         * names shorter than the suffix cannot carry it */
        if ((size_t)name_len >= suffix_len &&
            strncmp(name + name_len - suffix_len, UUID_STR, suffix_len) == 0)
                name_len -= suffix_len;

        *uuid_start = name;
        *uuid_len = name_len;
}
120 /* Returns true if import was FULL, false if import was already not
122 * @imp - import to be disconnected
123 * @conn_cnt - connection count (epoch) of the request that timed out
124 * and caused the disconnection. In some cases, multiple
125 * inflight requests can fail to a single target (e.g. OST
126 * bulk requests) and if one has already caused a reconnection
127 * (increasing the import->conn_cnt) the older failure should
128 * not also cause a reconnection. If zero it forces a reconnect.
/*
 * Move a FULL import to LUSTRE_IMP_DISCON.
 *
 * The transition is made only when the import is currently FULL and
 * @conn_cnt is either 0 or equal to imp->imp_conn_cnt.  In that case a
 * console message is printed (a warning for replayable imports, an error
 * message otherwise), the debug log is dumped if obd_dump_on_timeout is
 * set, and IMP_EVENT_DISCON is delivered to the obd.  In the other case
 * only a D_HA debug line is emitted.
 *
 * NOTE(review): the local declarations, the else/brace lines and the
 * return statements are not visible in this listing.  Going by the
 * caller in ptlrpc_fail_import(), the result is presumably non-zero only
 * when the transition was made -- confirm.
 */
130 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
134 spin_lock(&imp->imp_lock);
136 if (imp->imp_state == LUSTRE_IMP_FULL &&
137 (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
/* strip the "_UUID" suffix from the target name for the console message */
141 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
142 &target_start, &target_len);
143 if (imp->imp_replayable) {
144 LCONSOLE_WARN("%s: Connection to service %.*s via nid "
145 "%s was lost; in progress operations using this "
146 "service will wait for recovery to complete.\n",
147 imp->imp_obd->obd_name, target_len, target_start,
148 libcfs_nid2str(imp->imp_connection->c_peer.nid));
150 LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
151 "%.*s via nid %s was lost; in progress "
152 "operations using this service will fail.\n",
153 imp->imp_obd->obd_name, target_len, target_start,
154 libcfs_nid2str(imp->imp_connection->c_peer.nid));
/* still under the imp_lock taken at the top of the function */
156 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
/* the lock is dropped before the log dump and the event callback */
157 spin_unlock(&imp->imp_lock);
159 if (obd_dump_on_timeout)
160 libcfs_debug_dumplog();
162 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
165 spin_unlock(&imp->imp_lock);
/* debug only: imp_state and imp_conn_cnt are read here after imp_lock
 * has been released */
166 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
167 imp->imp_client->cli_name, imp,
168 (imp->imp_state == LUSTRE_IMP_FULL &&
169 imp->imp_conn_cnt > conn_cnt) ?
170 "reconnected" : "not connected", imp->imp_conn_cnt,
171 conn_cnt, ptlrpc_import_state_name(imp->imp_state));
177 /* Must be called with imp_lock held! */
/*
 * Mark @imp invalid and bump its generation while the caller's imp_lock
 * is still held, then release that lock and, unlocked, abort the
 * inflight requests and deliver IMP_EVENT_INACTIVE to the obd.
 * The caller must enter with imp_lock held and must not unlock it again.
 */
178 static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
181 LASSERT_SPIN_LOCKED(&imp->imp_lock);
183 CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
184 imp->imp_invalid = 1;
185 imp->imp_generation++;
186 spin_unlock(&imp->imp_lock);
188 ptlrpc_abort_inflight(imp);
189 obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
193 * This acts as a barrier; all existing requests are rejected, and
194 * no new requests will be accepted until the import is valid again.
/*
 * Entry point for invalidating an import when imp_lock is not yet held:
 * takes imp_lock and hands off to ptlrpc_deactivate_and_unlock_import(),
 * which releases the lock itself.
 */
196 void ptlrpc_deactivate_import(struct obd_import *imp)
198 spin_lock(&imp->imp_lock);
199 ptlrpc_deactivate_and_unlock_import(imp);
/*
 * Per-request helper for ptlrpc_inflight_timeout().
 * Only requests that are in RQ_PHASE_RPC and not waiting, in
 * RQ_PHASE_BULK, or in RQ_PHASE_NEW pass the first test; timed-out
 * requests and RQ_PHASE_NEW requests get their own handling, and the
 * remaining case works from rq_deadline.
 * NOTE(review): the return-type line, the local declarations and every
 * return/else line are missing from this listing, so the values actually
 * returned cannot be confirmed here.
 */
203 ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
207 if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
208 (req->rq_phase == RQ_PHASE_BULK) ||
209 (req->rq_phase == RQ_PHASE_NEW)))
212 if (req->rq_timedout)
215 if (req->rq_phase == RQ_PHASE_NEW)
218 dl = req->rq_deadline;
/*
 * Scan imp_sending_list under imp_lock and keep the largest value that
 * ptlrpc_inflight_deadline() reports for any request on it, measured
 * against the current time in seconds.
 * NOTE(review): the return statement is elided in this listing;
 * presumably `timeout` is returned -- confirm.
 */
226 static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
228 time_t now = cfs_time_current_sec();
229 struct list_head *tmp, *n;
230 struct ptlrpc_request *req;
231 unsigned int timeout = 0;
233 spin_lock(&imp->imp_lock);
234 list_for_each_safe(tmp, n, &imp->imp_sending_list) {
235 req = list_entry(tmp, struct ptlrpc_request, rq_list);
236 timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
238 spin_unlock(&imp->imp_lock);
243 * This function will invalidate the import, if necessary, then block
244 * for all the RPC completions, and finally notify the obd to
245 * invalidate its state (ie cancel locks, clear pending requests,
/*
 * Invalidate @imp and wait for its inflight RPCs to drain.
 *
 * Visible sequence:
 *  - imp_inval_count is raised for the duration of the call;
 *  - the import is deactivated unless it is already invalid (an import
 *    whose obd has obd_no_recov set is deactivated regardless);
 *  - the function waits on imp_recovery_waitq for imp_inflight to reach
 *    0, with a timeout derived from ptlrpc_inflight_timeout() plus one
 *    third, and obd_timeout as the fallback value;
 *  - after an unsuccessful wait the requests still on the sending and
 *    delayed lists are logged, and imp_inflight is asserted to be 0
 *    unless some RPCs are still unregistering;
 *  - finally IMP_EVENT_INVALIDATE is delivered, imp_inval_count is
 *    dropped and imp_recovery_waitq is woken.
 *
 * NOTE(review): the loop construct, the goto/label lines and several
 * conditions are elided in this listing; the exact control flow between
 * the steps above must be checked against the full file.
 */
248 void ptlrpc_invalidate_import(struct obd_import *imp)
250 struct list_head *tmp, *n;
251 struct ptlrpc_request *req;
252 struct l_wait_info lwi;
253 unsigned int timeout;
/* lets ptlrpc_reconnect_import() wait until all invalidations finish */
256 atomic_inc(&imp->imp_inval_count);
259 * If this is an invalid MGC connection, then don't bother
260 * waiting for imp_inflight to drop to 0.
262 if (imp->imp_invalid && imp->imp_recon_bk &&!imp->imp_obd->obd_no_recov)
265 if (!imp->imp_invalid || imp->imp_obd->obd_no_recov)
266 ptlrpc_deactivate_import(imp);
268 LASSERT(imp->imp_invalid);
270 /* Wait forever until inflight == 0. We really can't do it another
271 * way because in some cases we need to wait for very long reply
272 * unlink. We can't do anything before that because there is really
273 * no guarantee that some rdma transfer is not in progress right now. */
275 /* Calculate max timeout for waiting on rpcs to error
276 * out. Use obd_timeout if calculated value is smaller
278 timeout = ptlrpc_inflight_timeout(imp);
/* add one third on top of the longest remaining request deadline */
279 timeout += timeout / 3;
282 timeout = obd_timeout;
284 CDEBUG(D_RPCTRACE,"Sleeping %d sec for inflight to error out\n",
287 /* Wait for all requests to error out and call completion
288 * callbacks. Cap it at obd_timeout -- these should all
289 * have been locally cancelled by ptlrpc_abort_inflight. */
290 lwi = LWI_TIMEOUT_INTERVAL(
291 cfs_timeout_cap(cfs_time_seconds(timeout)),
292 cfs_time_seconds(1), NULL, NULL);
293 rc = l_wait_event(imp->imp_recovery_waitq,
294 (atomic_read(&imp->imp_inflight) == 0), &lwi);
296 const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
298 CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
299 cli_tgt, rc, atomic_read(&imp->imp_inflight));
/* log whatever is still queued; both lists are walked under imp_lock */
301 spin_lock(&imp->imp_lock);
302 list_for_each_safe(tmp, n, &imp->imp_sending_list) {
303 req = list_entry(tmp, struct ptlrpc_request,
305 DEBUG_REQ(D_ERROR, req,"still on sending list");
307 list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
308 req = list_entry(tmp, struct ptlrpc_request,
310 DEBUG_REQ(D_ERROR, req,"still on delayed list");
313 if (atomic_read(&imp->imp_unregistering) == 0) {
314 /* We know that only "unregistering" rpcs may
315 * still survive in sending or delaying lists
316 * (They are waiting for long reply unlink in
317 * sluggish nets). Let's check this. If there
318 * is no unregistering and inflight != 0 this
320 LASSERT(atomic_read(&imp->imp_inflight) == 0);
322 /* Let's save one loop as soon as inflight have
323 * dropped to zero. No new inflights possible at
327 CERROR("%s: RPCs in \"%s\" phase found (%d). "
328 "Network is sluggish? Waiting them "
329 "to error out.\n", cli_tgt,
330 ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
331 atomic_read(&imp->imp_unregistering));
333 spin_unlock(&imp->imp_lock);
337 /* Let's additionally check that no new rpcs added to import in
338 * "invalidate" state. */
339 LASSERT(atomic_read(&imp->imp_inflight) == 0);
342 obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
/* release the invalidation count and wake anyone waiting on the import */
344 atomic_dec(&imp->imp_inval_count);
345 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
348 /* unset imp_invalid */
/*
 * Clear imp_invalid under imp_lock, then deliver IMP_EVENT_ACTIVE to the
 * owning obd with the lock released.
 */
349 void ptlrpc_activate_import(struct obd_import *imp)
/* imp_obd is sampled before the lock is taken and used after it is dropped */
351 struct obd_device *obd = imp->imp_obd;
353 spin_lock(&imp->imp_lock);
354 imp->imp_invalid = 0;
355 spin_unlock(&imp->imp_lock);
357 obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
/*
 * React to a failed connection on @imp.  @conn_cnt is passed through to
 * ptlrpc_set_import_discon(); when that call reports success, a
 * non-replayable import is deactivated, imp_force_verify is set under
 * imp_lock and the pinger is woken.
 * Must not be used on a DLM fake import (asserted).
 * NOTE(review): closing braces are elided, so whether the pinger wake-up
 * sits inside or outside the `if` cannot be confirmed from this listing.
 */
360 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
364 LASSERT(!imp->imp_dlm_fake);
366 if (ptlrpc_set_import_discon(imp, conn_cnt)) {
367 if (!imp->imp_replayable) {
368 CDEBUG(D_HA, "import %s@%s for %s not replayable, "
369 "auto-deactivating\n",
370 obd2cli_tgt(imp->imp_obd),
371 imp->imp_connection->c_remote_uuid.uuid,
372 imp->imp_obd->obd_name);
373 ptlrpc_deactivate_import(imp);
376 CDEBUG(D_HA, "%s: waking up pinger\n",
377 obd2cli_tgt(imp->imp_obd));
379 spin_lock(&imp->imp_lock);
380 imp->imp_force_verify = 1;
381 spin_unlock(&imp->imp_lock);
383 ptlrpc_pinger_wake_up();
/*
 * Force a full disconnect/reconnect cycle on @imp: mark it disconnected
 * (conn_cnt 0 forces this), invalidate it, disconnect, wait until no
 * ptlrpc_invalidate_import() call is still running, then clear
 * obd_no_recov, re-activate the import and start recovery.
 * The wait is interruptible; an interruption is only logged.
 * NOTE(review): the return statement is elided in this listing.
 */
388 int ptlrpc_reconnect_import(struct obd_import *imp)
391 ptlrpc_set_import_discon(imp, 0);
392 /* Force a new connect attempt */
393 ptlrpc_invalidate_import(imp);
394 /* Do a fresh connect next time by zeroing the handle */
395 ptlrpc_disconnect_import(imp, 1);
396 /* Wait for all invalidate calls to finish */
397 if (atomic_read(&imp->imp_inval_count) > 0) {
399 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
400 rc = l_wait_event(imp->imp_recovery_waitq,
401 (atomic_read(&imp->imp_inval_count) == 0),
404 CERROR("Interrupted, inval=%d\n",
405 atomic_read(&imp->imp_inval_count));
409 * Allow reconnect attempts. Note: Currently, the function is
410 * only called by MGC. So assume this is a recoverable import,
411 * and force import to be recoverable. fix this if you need to
414 imp->imp_obd->obd_no_recov = 0;
415 /* Remove 'invalid' flag */
416 ptlrpc_activate_import(imp);
417 /* Attempt a new connect */
418 ptlrpc_recover_import(imp, NULL);
422 EXPORT_SYMBOL(ptlrpc_reconnect_import);
/*
 * Pick the entry of imp_conn_list to use for the next connect attempt
 * and make it the import's active connection.  Everything visible runs
 * under imp_lock.
 *
 * Selection, as far as the visible conditions show:
 *  - entries attempted less than CONNECTION_SWITCH_MIN seconds ago are
 *    handled separately ("don't thrash connections");
 *  - an entry never attempted, or not attempted since
 *    imp_last_success_conn, is preferred;
 *  - otherwise the least recently attempted entry is tracked;
 *  - if nothing was chosen, imp_conn_current is reused.
 * The chosen connection replaces imp_connection and the DLM export's
 * exp_connection (old reference put, new one taken) and is recorded in
 * imp_conn_current.
 *
 * NOTE(review): the loop-control statements, the tried_all bookkeeping
 * and all return statements are elided in this listing.
 */
424 static int import_select_connection(struct obd_import *imp)
426 struct obd_import_conn *imp_conn = NULL, *conn;
427 struct obd_export *dlmexp;
431 spin_lock(&imp->imp_lock);
433 if (list_empty(&imp->imp_conn_list)) {
434 CERROR("%s: no connections available\n",
435 imp->imp_obd->obd_name);
436 spin_unlock(&imp->imp_lock);
440 list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
441 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
442 imp->imp_obd->obd_name,
443 libcfs_nid2str(conn->oic_conn->c_peer.nid),
444 conn->oic_last_attempt);
446 /* Don't thrash connections */
447 if (cfs_time_before_64(cfs_time_current_64(),
448 conn->oic_last_attempt +
449 cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
453 /* If we have not tried this connection since the
454 last successful attempt, go with this one */
455 if ((conn->oic_last_attempt == 0) ||
456 cfs_time_beforeq_64(conn->oic_last_attempt,
457 imp->imp_last_success_conn)) {
463 /* If all of the connections have already been tried
464 since the last successful connection; just choose the
465 least recently used */
468 else if (cfs_time_before_64(conn->oic_last_attempt,
469 imp_conn->oic_last_attempt))
473 /* if not found, simply choose the current one */
475 LASSERT(imp->imp_conn_current);
476 imp_conn = imp->imp_conn_current;
479 LASSERT(imp_conn->oic_conn);
481 /* If we've tried everything, and we're back to the beginning of the
482 list, increase our timeout and try again. It will be reset when
483 we do finally connect. (FIXME: really we should wait for all network
484 state associated with the last connection attempt to drain before
485 trying to reconnect on it.) */
486 if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
487 !imp->imp_recon_bk /* not retrying */) {
/* the net latency estimate grows by CONNECTION_SWITCH_INC per pass while
 * it is below CONNECTION_SWITCH_MAX */
488 if (at_get(&imp->imp_at.iat_net_latency) <
489 CONNECTION_SWITCH_MAX) {
490 at_add(&imp->imp_at.iat_net_latency,
491 at_get(&imp->imp_at.iat_net_latency) +
492 CONNECTION_SWITCH_INC);
494 LASSERT(imp_conn->oic_last_attempt);
495 CWARN("%s: tried all connections, increasing latency to %ds\n",
496 imp->imp_obd->obd_name,
497 at_get(&imp->imp_at.iat_net_latency));
/* stamp the attempt so the next selection can order entries by age */
500 imp_conn->oic_last_attempt = cfs_time_current_64();
502 /* switch connection, don't mind if it's same as the current one */
503 if (imp->imp_connection)
504 ptlrpc_connection_put(imp->imp_connection);
505 imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
/* keep the DLM export pointed at the same connection as the import */
507 dlmexp = class_conn2export(&imp->imp_dlm_handle);
508 LASSERT(dlmexp != NULL);
509 if (dlmexp->exp_connection)
510 ptlrpc_connection_put(dlmexp->exp_connection);
511 dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
512 class_export_put(dlmexp);
514 if (imp->imp_conn_current != imp_conn) {
515 if (imp->imp_conn_current)
516 LCONSOLE_INFO("Changing connection for %s to %s/%s\n",
517 imp->imp_obd->obd_name,
518 imp_conn->oic_uuid.uuid,
519 libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
520 imp->imp_conn_current = imp_conn;
523 CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
524 imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
525 libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
527 spin_unlock(&imp->imp_lock);
533 * must be called under imp lock
/*
 * Store in *@transno the rq_transno of the first request on
 * imp_replay_list.  A zero transno found there is logged as an error.
 * NOTE(review): the return statements (empty-list case, zero-transno
 * case and normal case) are elided in this listing; the caller in
 * ptlrpc_connect_import() stores the result in set_transno.
 */
535 static int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
537 struct ptlrpc_request *req;
538 struct list_head *tmp;
540 if (list_empty(&imp->imp_replay_list))
542 tmp = imp->imp_replay_list.next;
543 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
544 *transno = req->rq_transno;
545 if (req->rq_transno == 0) {
546 DEBUG_REQ(D_ERROR, req, "zero transno in replay");
/*
 * Start an asynchronous (re)connect of @imp.
 *
 * @new_uuid is converted to an obd_uuid and passed to
 * import_set_conn_priority() (the guarding condition is elided here).
 *
 * The call is refused with an error message when the import is CLOSED,
 * FULL or already CONNECTING.  Otherwise the import is switched to
 * CONNECTING, a connection is chosen with import_select_connection(),
 * a connect request is prepared and queued with ptlrpcd_add_req();
 * ptlrpc_connect_interpret() is installed as its reply callback.
 * The import is put back to DISCON on the visible failure path.
 *
 * NOTE(review): several locals, the return statements, the `out` label
 * and most else/brace lines are elided in this listing.
 */
553 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
555 struct obd_device *obd = imp->imp_obd;
557 int initial_connect = 0;
559 __u64 committed_before_reconnect = 0;
560 struct ptlrpc_request *request;
/* sizes and payloads of the connect request buffers, index for index */
561 __u32 size[] = { sizeof(struct ptlrpc_body),
562 sizeof(imp->imp_obd->u.cli.cl_target_uuid),
563 sizeof(obd->obd_uuid),
564 sizeof(imp->imp_dlm_handle),
565 sizeof(imp->imp_connect_data) };
566 char *tmp[] = { NULL,
567 obd2cli_tgt(imp->imp_obd),
569 (char *)&imp->imp_dlm_handle,
570 (char *)&imp->imp_connect_data };
571 struct ptlrpc_connect_async_args *aa;
/* state check and the switch to CONNECTING happen under one lock hold */
574 spin_lock(&imp->imp_lock);
575 if (imp->imp_state == LUSTRE_IMP_CLOSED) {
576 spin_unlock(&imp->imp_lock);
577 CERROR("can't connect to a closed import\n");
579 } else if (imp->imp_state == LUSTRE_IMP_FULL) {
580 spin_unlock(&imp->imp_lock);
581 CERROR("already connected\n");
583 } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
584 spin_unlock(&imp->imp_lock);
585 CERROR("already connecting\n");
589 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
592 imp->imp_resend_replay = 0;
/* NOTE(review): the branch bodies here are elided; presumably an unused
 * remote handle marks this as an initial connect -- confirm */
594 if (!lustre_handle_is_used(&imp->imp_remote_handle))
597 committed_before_reconnect = imp->imp_peer_committed_transno;
/* also fills ocd_transno in the connect data that is sent to the server */
599 set_transno = ptlrpc_first_transno(imp,
600 &imp->imp_connect_data.ocd_transno);
602 spin_unlock(&imp->imp_lock);
605 struct obd_uuid uuid;
607 obd_str2uuid(&uuid, new_uuid);
608 rc = import_set_conn_priority(imp, &uuid);
613 rc = import_select_connection(imp);
617 /* last in connection list */
618 if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
619 if (imp->imp_initial_recov_bk && initial_connect) {
620 CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
621 imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
622 /* Don't retry if connect fails */
624 obd_set_info_async(obd->obd_self_export,
625 sizeof(KEY_INIT_RECOV),
627 sizeof(rc), &rc, NULL);
629 if (imp->imp_recon_bk) {
630 CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
631 imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
632 spin_lock(&imp->imp_lock);
633 imp->imp_last_recon = 1;
634 spin_unlock(&imp->imp_lock);
638 /* Reset connect flags to the originally requested flags, in case
639 * the server is updated on-the-fly we will get the new features. */
640 imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
/* AT support is re-established by ptlrpc_connect_interpret() from the reply */
641 imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
643 rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
644 &obd->obd_uuid, &imp->imp_connect_data, NULL);
648 request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
651 GOTO(out, rc = -ENOMEM);
653 /* Report the rpc service time to the server so that it knows how long
654 * to wait for clients to join recovery */
655 lustre_msg_set_service_time(request->rq_reqmsg,
656 at_timeout2est(request->rq_timeout));
658 /* The amount of time we give the server to process the connect req.
659 * import_select_connection will increase the net latency on
660 * repeated reconnect attempts to cover slow networks.
661 * We override/ignore the server rpc completion estimate here,
662 * which may be large if this is a reconnect attempt */
663 request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
664 lustre_msg_set_timeout(request->rq_reqmsg, request->rq_timeout);
667 lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
669 if (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V1)
670 lustre_msg_add_op_flags(request->rq_reqmsg,
671 MSG_CONNECT_NEXT_VER);
/* the connect request is neither resent nor delayed, and is only sent
 * while the import is in CONNECTING state */
673 request->rq_no_resend = request->rq_no_delay = 1;
674 request->rq_send_state = LUSTRE_IMP_CONNECTING;
675 /* Allow a slightly larger reply for future growth compatibility */
676 size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
678 ptlrpc_req_set_repsize(request, 2, size);
679 request->rq_interpret_reply = ptlrpc_connect_interpret;
/* compile-time check that the async args fit in the request's storage */
681 CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
682 aa = ptlrpc_req_async_args(request);
683 memset(aa, 0, sizeof *aa);
685 aa->pcaa_peer_committed = committed_before_reconnect;
686 aa->pcaa_initial_connect = initial_connect;
687 if (aa->pcaa_initial_connect) {
/* assumed replayable until the reply says otherwise; see the
 * MSG_CONNECT_REPLAYABLE handling in ptlrpc_connect_interpret() */
688 spin_lock(&imp->imp_lock);
689 imp->imp_replayable = 1;
690 spin_unlock(&imp->imp_lock);
691 lustre_msg_add_op_flags(request->rq_reqmsg,
692 MSG_CONNECT_INITIAL);
696 lustre_msg_add_op_flags(request->rq_reqmsg,
697 MSG_CONNECT_TRANSNO);
699 DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
700 aa->pcaa_initial_connect ? "initial " : "re",
702 ptlrpcd_add_req(request);
/* failure path: leave CONNECTING so that a later attempt is not rejected
 * as "already connecting" */
706 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
711 EXPORT_SYMBOL(ptlrpc_connect_import);
/*
 * Called from the failure path of ptlrpc_connect_interpret().
 * Under imp_lock it takes the last entry of imp_conn_list and, when that
 * entry's NID differs from the NID of the current connection, calls
 * ptlrpc_ping_import_soon(); the pinger is woken after the lock is
 * released.
 * NOTE(review): preprocessor conditionals, flag bookkeeping, labels and
 * the condition guarding the final wake-up are elided in this listing.
 */
713 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
716 struct obd_import_conn *imp_conn;
722 spin_lock(&imp->imp_lock);
723 if (list_empty(&imp->imp_conn_list))
727 imp_conn = list_entry(imp->imp_conn_list.prev,
728 struct obd_import_conn,
731 /* XXX: When the failover node is the primary node, it is possible
732 * to have two identical connections in imp_conn_list. We must
733 * compare not conn's pointers but NIDs, otherwise we can defeat
734 * connection throttling. (See bug 14774.) */
735 if (imp->imp_conn_current->oic_conn->c_peer.nid !=
736 imp_conn->oic_conn->c_peer.nid) {
737 ptlrpc_ping_import_soon(imp);
742 /* liblustre has no pinger thread, so we wake up pinger anyway */
746 spin_unlock(&imp->imp_lock);
749 ptlrpc_pinger_wake_up();
/*
 * Reply callback for the connect RPC queued by ptlrpc_connect_import()
 * (installed there as rq_interpret_reply); @data is the request's
 * ptlrpc_connect_async_args.
 *
 * From the reply's op flags it picks the next import state:
 *  - initial connect: REPLAY_LOCKS if the server reports that it is
 *    recovering, otherwise FULL (and the import is activated);
 *  - reconnect (MSG_CONNECT_RECONNECT): EVICTED if the server handle
 *    changed outside recovery or the import is invalid, REPLAY if the
 *    server is recovering, otherwise RECOVER;
 *  - server recovering without the reconnect flag: REPLAY, with
 *    imp_last_replay_transno reset to 0;
 *  - anything else: EVICTED.
 * It then runs ptlrpc_import_recovery_state_machine(), absorbs the
 * obd_connect_data returned by the server (connect flags, version,
 * checksum types, BRW size, AT support), and on failure puts the import
 * back to DISCON and possibly deactivates it.  Waiters on
 * imp_recovery_waitq are woken at the end.
 *
 * NOTE(review): the second signature line, several locals, the goto
 * labels and most else/brace/return lines are elided in this listing, so
 * the exact branch nesting below must be checked against the full file.
 */
754 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
757 struct ptlrpc_connect_async_args *aa = data;
758 struct obd_import *imp = request->rq_import;
759 struct client_obd *cli = &imp->imp_obd->u.cli;
760 struct lustre_handle old_hdl;
761 __u64 old_connect_flags;
/* a CLOSED import is checked for first; the action taken is elided here */
765 spin_lock(&imp->imp_lock);
766 if (imp->imp_state == LUSTRE_IMP_CLOSED) {
767 spin_unlock(&imp->imp_lock);
770 spin_unlock(&imp->imp_lock);
775 LASSERT(imp->imp_conn_current);
777 msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
779 /* All imports are pingable */
780 spin_lock(&imp->imp_lock);
781 imp->imp_pingable = 1;
/* ---- initial connect ---- */
783 if (aa->pcaa_initial_connect) {
784 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
785 imp->imp_replayable = 1;
786 spin_unlock(&imp->imp_lock);
787 CDEBUG(D_HA, "connected to replayable target: %s\n",
788 obd2cli_tgt(imp->imp_obd));
790 imp->imp_replayable = 0;
791 spin_unlock(&imp->imp_lock);
/* switch to the v2 message format when the request already used it, or
 * when a v1 request was answered with MSG_CONNECT_NEXT_VER */
794 if ((request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V1 &&
795 msg_flags & MSG_CONNECT_NEXT_VER) ||
796 request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2) {
797 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
798 CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
799 obd2cli_tgt(imp->imp_obd));
801 CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
802 obd2cli_tgt(imp->imp_obd));
805 imp->imp_remote_handle =
806 *lustre_msg_get_handle(request->rq_repmsg);
808 /* Initial connects are allowed for clients with non-random
809 * uuids when servers are in recovery. Simply signal the
810 * servers replay is complete and wait in REPLAY_WAIT. */
811 if (msg_flags & MSG_CONNECT_RECOVERING) {
812 CDEBUG(D_HA, "connect to %s during recovery\n",
813 obd2cli_tgt(imp->imp_obd));
814 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
816 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
817 ptlrpc_activate_import(imp);
819 GOTO(finish, rc = 0);
821 spin_unlock(&imp->imp_lock);
/* ---- not an initial connect ---- */
824 /* Determine what recovery state to move the import to. */
825 if (MSG_CONNECT_RECONNECT & msg_flags) {
/* an all-zero handle in the reply is treated as a failed reconnect */
826 memset(&old_hdl, 0, sizeof(old_hdl));
827 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
829 CERROR("%s@%s didn't like our handle "LPX64
830 ", failed\n", obd2cli_tgt(imp->imp_obd),
831 imp->imp_connection->c_remote_uuid.uuid,
832 imp->imp_dlm_handle.cookie);
833 GOTO(out, rc = -ENOTCONN);
836 if (memcmp(&imp->imp_remote_handle,
837 lustre_msg_get_handle(request->rq_repmsg),
838 sizeof(imp->imp_remote_handle))) {
839 int level = msg_flags & MSG_CONNECT_RECOVERING ?
842 /* Bug 16611/14775: if server handle have changed,
843 * that means some sort of disconnection happened.
844 * If the server is not in recovery, that also means it
845 * already erased all of our state because of previous
846 * eviction. If it is in recovery - we are safe to
847 * participate since we can reestablish all of our state
848 * with server again */
849 CDEBUG(level,"%s@%s changed server handle from "
850 LPX64" to "LPX64"%s\n",
851 obd2cli_tgt(imp->imp_obd),
852 imp->imp_connection->c_remote_uuid.uuid,
853 imp->imp_remote_handle.cookie,
854 lustre_msg_get_handle(request->rq_repmsg)->
856 (MSG_CONNECT_RECOVERING & msg_flags) ?
857 " but is still in recovery" : "");
859 imp->imp_remote_handle =
860 *lustre_msg_get_handle(request->rq_repmsg);
862 if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
863 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
864 GOTO(finish, rc = 0);
868 CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
869 obd2cli_tgt(imp->imp_obd),
870 imp->imp_connection->c_remote_uuid.uuid);
873 if (imp->imp_invalid) {
874 CDEBUG(D_HA, "%s: reconnected but import is invalid; "
875 "marking evicted\n", imp->imp_obd->obd_name);
876 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
877 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
878 CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
879 imp->imp_obd->obd_name,
880 obd2cli_tgt(imp->imp_obd));
882 spin_lock(&imp->imp_lock);
883 imp->imp_resend_replay = 1;
884 /* VBR: delayed connection */
885 if (MSG_CONNECT_DELAYED & msg_flags) {
886 imp->imp_delayed_recovery = 1;
887 imp->imp_no_lock_replay = 1;
889 spin_unlock(&imp->imp_lock);
891 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
893 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
/* server is recovering but did not flag this as a reconnect: take the new
 * handle and replay from the beginning */
895 } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
896 LASSERT(imp->imp_replayable);
897 imp->imp_remote_handle =
898 *lustre_msg_get_handle(request->rq_repmsg);
899 imp->imp_last_replay_transno = 0;
900 /* VBR: delayed connection */
901 if (MSG_CONNECT_DELAYED & msg_flags) {
902 spin_lock(&imp->imp_lock);
903 imp->imp_delayed_recovery = 1;
904 imp->imp_no_lock_replay = 1;
905 spin_unlock(&imp->imp_lock);
907 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
909 DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
910 "flags reconnect/recovering not set: %x)",msg_flags);
911 imp->imp_remote_handle =
912 *lustre_msg_get_handle(request->rq_repmsg);
913 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
916 /* Sanity checks for a reconnected import. */
917 if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
918 CERROR("imp_replayable flag does not match server "
919 "after reconnect. We should LBUG right here.\n");
/* compare against the committed transno recorded when the request was sent */
922 if (lustre_msg_get_last_committed(request->rq_repmsg) <
923 aa->pcaa_peer_committed) {
924 CERROR("%s went back in time (transno "LPD64
925 " was previously committed, server now claims "LPD64
926 ")! See https://bugzilla.lustre.org/show_bug.cgi?"
928 obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
929 lustre_msg_get_last_committed(request->rq_repmsg));
/* NOTE(review): presumably the target of the GOTO(finish, ...) statements
 * above; the label line itself is elided */
933 rc = ptlrpc_import_recovery_state_machine(imp);
935 if (rc == -ENOTCONN) {
936 CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
937 "invalidating and reconnecting\n",
938 obd2cli_tgt(imp->imp_obd),
939 imp->imp_connection->c_remote_uuid.uuid);
940 ptlrpc_connect_import(imp, NULL);
/* ---- absorb the connect data returned by the server ---- */
944 struct obd_connect_data *ocd;
945 struct obd_export *exp;
947 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
948 lustre_swab_connect);
/* move the connection that just worked to the head of imp_conn_list and
 * remember when it was attempted */
949 spin_lock(&imp->imp_lock);
950 list_del(&imp->imp_conn_current->oic_item);
951 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
952 imp->imp_last_success_conn =
953 imp->imp_conn_current->oic_last_attempt;
956 spin_unlock(&imp->imp_lock);
957 CERROR("Wrong connect data from server\n");
962 imp->imp_connect_data = *ocd;
964 exp = class_conn2export(&imp->imp_dlm_handle);
965 spin_unlock(&imp->imp_lock);
967 /* check that server granted subset of flags we asked for. */
968 LASSERTF((ocd->ocd_connect_flags &
969 imp->imp_connect_flags_orig) ==
970 ocd->ocd_connect_flags, LPX64" != "LPX64,
971 imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
974 /* This could happen if export is cleaned during the
976 CERROR("Missing export for %s\n",
977 imp->imp_obd->obd_name);
978 GOTO(out, rc = -ENODEV);
/* the previous export flags are kept for the ns_connect_flags decision below */
980 old_connect_flags = exp->exp_connect_flags;
981 exp->exp_connect_flags = ocd->ocd_connect_flags;
982 imp->imp_obd->obd_self_export->exp_connect_flags =
983 ocd->ocd_connect_flags;
984 class_export_put(exp);
986 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
988 if (!ocd->ocd_ibits_known &&
989 ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
990 CERROR("Inodebits aware server returned zero compatible"
/* warn when the server version is more than LUSTRE_VERSION_OFFSET_WARN away
 * from the client's, in either direction */
993 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
994 (ocd->ocd_version > LUSTRE_VERSION_CODE +
995 LUSTRE_VERSION_OFFSET_WARN ||
996 ocd->ocd_version < LUSTRE_VERSION_CODE -
997 LUSTRE_VERSION_OFFSET_WARN)) {
998 /* Sigh, some compilers do not like #ifdef in the middle
999 of macro arguments */
1002 "older. Consider upgrading this client";
1005 "older. Consider recompiling this application";
1007 const char *newer = "newer than client version";
1009 LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
1010 "is much %s (%s)\n",
1011 obd2cli_tgt(imp->imp_obd),
1012 OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1013 OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1014 OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1015 OBD_OCD_VERSION_FIX(ocd->ocd_version),
1016 ocd->ocd_version > LUSTRE_VERSION_CODE ?
1017 newer : older, LUSTRE_VERSION_STRING);
1020 if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
1021 /* We sent to the server ocd_cksum_types with bits set
1022 * for algorithms we understand. The server masked off
1023 * the checksum types it doesn't support */
1024 if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
1025 LCONSOLE_WARN("The negotiation of the checksum "
1026 "alogrithm to use with server %s "
1027 "failed (%x/%x), disabling "
1029 obd2cli_tgt(imp->imp_obd),
1030 ocd->ocd_cksum_types,
1032 cli->cl_checksum = 0;
1033 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1034 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1036 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
/* preference order: OSC_DEFAULT_CKSUM, then ADLER, then CRC32 */
1038 if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
1039 cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
1040 else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
1041 cli->cl_cksum_type = OBD_CKSUM_ADLER;
1043 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1046 /* The server does not support OBD_CONNECT_CKSUM.
1047 * Enforce CRC32 for backward compatibility*/
1048 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1049 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1052 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
1053 cli->cl_max_pages_per_rpc =
1054 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
1057 /* Reset ns_connect_flags only for initial connect. It might be
1058 * changed in while using FS and if we reset it in reconnect
1059 * this leads to losing user settings done before such as
1060 * disable lru_resize, etc. */
1061 if (old_connect_flags != exp->exp_connect_flags ||
1062 aa->pcaa_initial_connect) {
1063 CDEBUG(D_HA, "%s: Resetting ns_connect_flags to server "
1064 "flags: "LPX64"\n", imp->imp_obd->obd_name,
1065 ocd->ocd_connect_flags);
1066 imp->imp_obd->obd_namespace->ns_connect_flags =
1067 ocd->ocd_connect_flags;
1068 imp->imp_obd->obd_namespace->ns_orig_connect_flags =
1069 ocd->ocd_connect_flags;
1072 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
1073 (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
1074 /* We need a per-message support flag, because
1075 a. we don't know if the incoming connect reply
1076 supports AT or not (in reply_in_callback)
1078 b. failovered server means export and flags are gone
1079 (in ptlrpc_send_reply).
1080 Can only be set when we know AT is supported at
1082 imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
1084 imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
1086 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
1087 (cli->cl_max_pages_per_rpc > 0));
/* ---- failure handling (the guarding condition is elided) ---- */
1092 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
1093 spin_lock(&imp->imp_lock);
/* a failed initial connect deactivates the import unless imp_initial_recov
 * is set or the import generation changed since the request was sent;
 * the helper releases imp_lock itself */
1094 if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
1095 (request->rq_import_generation == imp->imp_generation))
1096 ptlrpc_deactivate_and_unlock_import(imp);
1098 spin_unlock(&imp->imp_lock);
1100 if (imp->imp_recon_bk && imp->imp_last_recon) {
1101 /* Give up trying to reconnect */
1102 imp->imp_obd->obd_no_recov = 1;
1103 ptlrpc_deactivate_import(imp);
1106 if (rc == -EPROTO) {
1107 struct obd_connect_data *ocd;
1108 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
1110 lustre_swab_connect);
1112 (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1113 (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
1114 /* Actually servers are only supposed to refuse
1115 connection from liblustre clients, so we should
1116 never see this from VFS context */
1117 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
1119 " refused connection from this client "
1120 "with an incompatible version (%s). "
1121 "Client must be recompiled\n",
1122 obd2cli_tgt(imp->imp_obd),
1123 OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1124 OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1125 OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1126 OBD_OCD_VERSION_FIX(ocd->ocd_version),
1127 LUSTRE_VERSION_STRING);
/* version refusal is final: the import is deactivated and closed */
1128 ptlrpc_deactivate_import(imp);
1129 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
1134 ptlrpc_maybe_ping_import_soon(imp);
1136 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
1137 obd2cli_tgt(imp->imp_obd),
1138 (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
/* common tail: clear the "last reconnection attempt" mark and wake
 * everyone waiting on the import */
1141 spin_lock(&imp->imp_lock);
1142 imp->imp_last_recon = 0;
1143 spin_unlock(&imp->imp_lock);
1145 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
/*
 * Reply handler for the request queued by signal_completed_replay(), which
 * installs this function as rq_interpret_reply.
 *
 * Drops the imp_replay_inflight count that signal_completed_replay() took
 * before building the request.  If the request status is 0 and
 * imp_vbr_failed is clear, the recovery state machine is run for the
 * import.  The other visible statements clear imp_vbr_failed under
 * imp_lock, log a LAST_REPLAY error and call ptlrpc_connect_import();
 * presumably they make up the failure branch (reconnect instead of
 * continuing recovery) -- TODO confirm against the complete function.
 *
 * req:  request whose rq_import and rq_status are inspected.
 * data: not referenced by the visible statements.
 * rc:   not referenced by the visible statements.
 */
1149 static int completed_replay_interpret(struct ptlrpc_request *req,
1150 void * data, int rc)
     /* this request no longer counts as a replay in flight */
1153 atomic_dec(&req->rq_import->imp_replay_inflight);
1154 if (req->rq_status == 0 &&
1155 !req->rq_import->imp_vbr_failed) {
     /* clean reply: let the state machine take the next step */
1156 ptlrpc_import_recovery_state_machine(req->rq_import);
1158 if (req->rq_import->imp_vbr_failed) {
1160 "%s: version recovery fails, reconnecting\n",
1161 req->rq_import->imp_obd->obd_name);
     /* the flag is reset under imp_lock before reconnecting */
1162 spin_lock(&req->rq_import->imp_lock);
1163 req->rq_import->imp_vbr_failed = 0;
1164 spin_unlock(&req->rq_import->imp_lock);
1166 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
1168 req->rq_import->imp_obd->obd_name,
     /* NULL is passed as the second argument of ptlrpc_connect_import() */
1171 ptlrpc_connect_import(req->rq_import, NULL);
/*
 * Build an OBD_PING request on the import and hand it to ptlrpcd.
 *
 * The request message is flagged with MSG_LOCK_REPLAY_DONE and
 * MSG_REQ_REPLAY_DONE (the flag expression continues on a line that is
 * not visible here), and additionally with MSG_DELAY_REPLAY when
 * imp_delayed_recovery is set.  It is sent in state
 * LUSTRE_IMP_REPLAY_WAIT with three times the timeout it was prepared
 * with, and completed_replay_interpret() is installed as its reply
 * handler.
 *
 * Asserts that imp_replay_inflight is 0 on entry and then increments it;
 * completed_replay_interpret() performs the matching decrement.
 */
1176 static int signal_completed_replay(struct obd_import *imp)
1178 struct ptlrpc_request *req;
1181 LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
     /* account for the request about to be built */
1182 atomic_inc(&imp->imp_replay_inflight);
1184 req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
     /* undoes the increment above; presumably only on the path where
      * ptlrpc_prep_req() failed -- TODO confirm */
1186 atomic_dec(&imp->imp_replay_inflight);
1190 ptlrpc_req_set_repsize(req, 1, NULL);
1191 req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
1192 lustre_msg_add_flags(req->rq_reqmsg,
1193 MSG_LOCK_REPLAY_DONE |
1194 MSG_REQ_REPLAY_DONE |
1197 if (imp->imp_delayed_recovery)
1198 lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
     /* use three times the timeout the request was prepared with */
1199 req->rq_timeout *= 3;
1200 req->rq_interpret_reply = completed_replay_interpret;
1202 ptlrpcd_add_req(req);
/*
 * Thread body started by ptlrpc_import_recovery_state_machine() through
 * cfs_kernel_thread() when an import is found in LUSTRE_IMP_EVICTED.
 *
 * data is the struct obd_import to work on.  The starter takes a
 * reference with class_import_get() before creating the thread; this
 * function releases one with class_import_put() as its last visible
 * action.
 *
 * Steps visible here: name the thread "ll_imp_inval", invalidate the
 * import, sample imp_deactive under imp_lock, optionally dump the debug
 * log when obd_dump_on_eviction is set, move the import to
 * LUSTRE_IMP_RECOVER and re-enter the recovery state machine.
 */
1207 static int ptlrpc_invalidate_import_thread(void *data)
1209 struct obd_import *imp = data;
1214 ptlrpc_daemonize("ll_imp_inval");
1216 CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
1217 imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
1218 imp->imp_connection->c_remote_uuid.uuid);
1220 ptlrpc_invalidate_import(imp);
1222 /* is client_disconnect_export in flight ? */
1223 spin_lock(&imp->imp_lock);
     /* how 'disconnect' is used afterwards is on lines not visible here;
      * presumably it lets the thread skip the recovery steps below when a
      * disconnect is under way -- TODO confirm */
1224 disconnect = imp->imp_deactive;
1225 spin_unlock(&imp->imp_lock);
1229 if (obd_dump_on_eviction) {
1230 CERROR("dump the log upon eviction\n");
1231 libcfs_debug_dumplog();
1234 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1235 ptlrpc_import_recovery_state_machine(imp);
     /* pairs with the class_import_get() done by the code that started
      * this thread */
1238 class_import_put(imp);
/*
 * Advance an import through the client-side recovery states.
 *
 * imp_state is tested against each state in turn, so a single call can
 * carry the import through several consecutive states:
 *
 *   LUSTRE_IMP_EVICTED      - report the eviction (console message is
 *                             suppressed for MGC devices), invalidate the
 *                             import and set LUSTRE_IMP_RECOVER.
 *   LUSTRE_IMP_REPLAY       - call ptlrpc_replay_next(); once nothing is
 *                             in flight, set LUSTRE_IMP_REPLAY_LOCKS and
 *                             call ldlm_replay_locks().
 *   LUSTRE_IMP_REPLAY_LOCKS - once imp_replay_inflight is 0, set
 *                             LUSTRE_IMP_REPLAY_WAIT and call
 *                             signal_completed_replay().
 *   LUSTRE_IMP_REPLAY_WAIT  - once imp_replay_inflight is 0, set
 *                             LUSTRE_IMP_RECOVER.
 *   LUSTRE_IMP_RECOVER      - call ptlrpc_resend(), set LUSTRE_IMP_FULL,
 *                             activate the import and print the
 *                             "Connection restored" console message.
 *   LUSTRE_IMP_FULL         - wake waiters on imp_recovery_waitq and call
 *                             ptlrpc_wake_delayed().
 *
 * The handling of the rc values assigned below is on lines that are not
 * visible in this view.
 */
1243 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1251 if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1252 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1253 &target_start, &target_len);
1254 /* Don't care about MGC eviction */
1255 if (strcmp(imp->imp_obd->obd_type->typ_name,
1256 LUSTRE_MGC_NAME) != 0) {
1257 LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
1258 "%.*s; in progress operations using "
1259 "this service will fail.\n",
1260 target_len, target_start);
1262 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1263 obd2cli_tgt(imp->imp_obd),
1264 imp->imp_connection->c_remote_uuid.uuid);
1267 /* bug 17802: XXX client_disconnect_export vs connect request
1268 * race. If the client is evicted at this time, we would start the
1269 * invalidate thread without a reference to the import and the import
1270 * could be freed at the same time. */
1271 class_import_get(imp);
1272 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
1273 CLONE_VM | CLONE_FILES);
     /* drops the reference taken above; presumably only when the thread
      * could not be started -- TODO confirm */
1275 class_import_put(imp);
1276 CERROR("error starting invalidate thread: %d\n", rc);
     /* direct invalidation without a helper thread; presumably the
      * alternative build path to the thread start above -- TODO confirm */
1282 ptlrpc_invalidate_import(imp);
1284 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1288 if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1289 CDEBUG(D_HA, "replay requested by %s\n",
1290 obd2cli_tgt(imp->imp_obd));
1291 rc = ptlrpc_replay_next(imp, &inflight);
     /* move on to lock replay only when ptlrpc_replay_next() reported
      * nothing in flight and the import counter agrees */
1292 if (inflight == 0 &&
1293 atomic_read(&imp->imp_replay_inflight) == 0) {
1294 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1295 rc = ldlm_replay_locks(imp);
1302 if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1303 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1304 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
1305 rc = signal_completed_replay(imp);
1312 if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1313 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1314 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1318 if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1319 CDEBUG(D_HA, "reconnected to %s@%s\n",
1320 obd2cli_tgt(imp->imp_obd),
1321 imp->imp_connection->c_remote_uuid.uuid);
1323 rc = ptlrpc_resend(imp);
1326 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1327 ptlrpc_activate_import(imp);
1329 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1330 &target_start, &target_len);
1331 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1332 "using nid %s.\n", imp->imp_obd->obd_name,
1333 target_len, target_start,
1334 libcfs_nid2str(imp->imp_connection->c_peer.nid));
1337 if (imp->imp_state == LUSTRE_IMP_FULL) {
     /* ptlrpc_disconnect_import() sleeps on this wait queue */
1338 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1339 ptlrpc_wake_delayed(imp);
/*
 * Callback handed to LWI_TIMEOUT_INTR() by ptlrpc_disconnect_import().
 * The parameter is named 'unused'; the body is not visible in this view.
 */
1346 static int back_to_sleep(void *unused)
/*
 * Disconnect an import from its target.
 *
 * imp:     import to disconnect.
 * noclose: presumably selects between the two final states set below
 *          (LUSTRE_IMP_DISCON or LUSTRE_IMP_CLOSED); the test itself is on
 *          a line not visible here -- TODO confirm.
 *
 * Visible sequence:
 *  - nowait is taken from imp_obd->obd_force; there is a jump straight to
 *    the set_state label (its condition is not visible, presumably nowait).
 *  - The disconnect opcode is chosen to match imp_connect_op (OST, MDS or
 *    MGS); an unknown connect op is reported with CERROR.
 *  - If the import is in recovery, wait on imp_recovery_waitq until it no
 *    longer is, bounded by a timeout derived either from obd_timeout or
 *    from the adaptive service estimate of the client's request portal.
 *  - A request with the chosen opcode is prepared with rq_no_resend set,
 *    the import goes to LUSTRE_IMP_CONNECTING, and the request is sent
 *    synchronously with ptlrpc_queue_wait().
 *  - Under imp_lock the final state is set, imp_remote_handle is zeroed
 *    and imp_last_recon is cleared.
 */
1351 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1353 struct ptlrpc_request *req;
1355 int nowait = imp->imp_obd->obd_force;
1359 GOTO(set_state, rc);
     /* map the opcode used to connect onto its disconnect counterpart */
1361 switch (imp->imp_connect_op) {
1362 case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1363 case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1364 case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1366 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1367 obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
1371 if (ptlrpc_import_in_recovery(imp)) {
1372 struct l_wait_info lwi;
1373 cfs_duration_t timeout;
     /* two ways of computing the wait bound follow; which one applies is
      * decided on lines not visible here */
1376 timeout = cfs_time_seconds(obd_timeout);
1378 int idx = import_at_get_index(imp,
1379 imp->imp_client->cli_request_portal);
1380 timeout = cfs_time_seconds(
1381 at_get(&imp->imp_at.iat_service_estimate[idx]));
1383 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout),
1384 back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
     /* ptlrpc_import_recovery_state_machine() broadcasts on this queue */
1385 rc = l_wait_event(imp->imp_recovery_waitq,
1386 !ptlrpc_import_in_recovery(imp), &lwi);
1389 spin_lock(&imp->imp_lock);
1390 if (imp->imp_state != LUSTRE_IMP_FULL)
1393 spin_unlock(&imp->imp_lock);
1395 req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1397 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1398 * it fails. We can get through the above with a down server
1399 * if the client doesn't know the server is gone yet. */
1400 req->rq_no_resend = 1;
1403 /* We want client umounts to happen quickly, no matter the
1405 req->rq_timeout = min_t(int, req->rq_timeout,
1406 INITIAL_CONNECT_TIMEOUT);
1408 /* ... but we always want liblustre clients to nicely
1409 disconnect, so only use the adaptive value. */
1411 req->rq_timeout = obd_timeout / 3;
1414 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1415 req->rq_send_state = LUSTRE_IMP_CONNECTING;
1416 ptlrpc_req_set_repsize(req, 1, NULL);
     /* synchronous send; the request reference is dropped right after */
1417 rc = ptlrpc_queue_wait(req);
1418 ptlrpc_req_finished(req);
1422 spin_lock(&imp->imp_lock);
     /* the _NOLOCK variant is used because imp_lock is already held */
1425 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1427 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
     /* forget the handle the target gave us for this connection */
1428 memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1429 /* Try all connections in the future - bz 12758 */
1430 imp->imp_last_recon = 0;
1431 spin_unlock(&imp->imp_lock);
1436 /* Sets the maximal number of RPCs that may originate from the other side of
1437 this import (the server) to us, and the number of async RPC replies that we are not waiting for. */
/* Passes count to LNetSetAsync() for the peer of this import's connection
 * (imp->imp_connection->c_peer). */
1439 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1441 LNetSetAsync(imp->imp_connection->c_peer, count);
1445 /* Adaptive Timeout utils */
/* Tunables declared here and defined elsewhere.  Within this file,
 * at_history sets the width of one history bin in at_add()
 * (at_history / AT_BINS, but at least 1), and at_max / at_min are applied
 * to at_current in at_add() through min() and max() respectively. */
1446 extern unsigned int at_min, at_max, at_history;
1448 /* Bin into timeslices using AT_BINS bins.
1449 This gives us a max of the last binlimit*AT_BINS secs without the storage,
1450 but still smoothing out a return to normalcy from a slow response.
1451 (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
/*
 * at:  estimator to update.  at_current, at_hist[], at_binstart,
 *      at_worst_ever and at_worst_time are modified while at_lock is held;
 *      at_current is also read once, before the lock is taken, to remember
 *      the previous estimate in 'old'.
 * val: new sample.  Per the comment below, zero samples do not count.
 *
 * After the update 'old' holds the previous estimate if at_current
 * changed, and 0 otherwise; presumably that is the return value -- the
 * return statement is not visible in this view.
 */
1452 int at_add(struct adaptive_timeout *at, unsigned int val)
1454 unsigned int old = at->at_current;
1455 time_t now = cfs_time_current_sec();
     /* width of one history bin, never less than 1 */
1456 time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1459 CDEBUG(D_OTHER, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
1460 val, at, now - at->at_binstart, at->at_current,
1461 at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1464 /* 0's don't count, because we never want our timeout to
1465 drop to 0, and because 0 could mean an error */
1468 spin_lock(&at->at_lock);
1470 if (unlikely(at->at_binstart == 0)) {
1471 /* Special case to remove default from history */
1472 at->at_current = val;
1473 at->at_worst_ever = val;
1474 at->at_worst_time = now;
1475 at->at_hist[0] = val;
1476 at->at_binstart = now;
1477 } else if (now - at->at_binstart < binlimit ) {
     /* still inside the newest bin: only raise the recorded maxima */
1479 at->at_hist[0] = max(val, at->at_hist[0]);
1480 at->at_current = max(val, at->at_current);
1483 unsigned int maxv = val;
1484 /* move bins over */
1485 shift = (now - at->at_binstart) / binlimit;
     /* descending index: at_hist[i - shift] is read before that slot is
      * itself overwritten */
1487 for(i = AT_BINS - 1; i >= 0; i--) {
1489 at->at_hist[i] = at->at_hist[i - shift];
1490 maxv = max(maxv, at->at_hist[i]);
     /* the new sample opens bin 0; the estimate becomes the largest value
      * seen while shifting (maxv starts out as val) */
1495 at->at_hist[0] = val;
1496 at->at_current = maxv;
     /* advance the bin start by whole bins only */
1497 at->at_binstart += shift * binlimit;
1500 if (at->at_current > at->at_worst_ever) {
1501 at->at_worst_ever = at->at_current;
1502 at->at_worst_time = now;
1505 if (at->at_flags & AT_FLG_NOHIST)
1506 /* Only keep last reported val; keeping the rest of the history
1508 at->at_current = val;
1511 at->at_current = min(at->at_current, at_max);
1512 at->at_current = max(at->at_current, at_min);
1514 if (at->at_current != old)
1515 CDEBUG(D_OTHER, "AT %p change: old=%u new=%u delta=%d "
1516 "(val=%u) hist %u %u %u %u\n", at,
1517 old, at->at_current, at->at_current - old, val,
1518 at->at_hist[0], at->at_hist[1], at->at_hist[2],
1521 /* if we changed, report the old value */
1522 old = (at->at_current != old) ? old : 0;
1524 spin_unlock(&at->at_lock);
1528 /* Find the imp_at index for a given portal; assign if space available */
/*
 * imp:    import whose imp_at.iat_portal[] table is searched.
 * portal: portal number to look up.
 *
 * The table is first scanned without a lock, testing each entry for a
 * match and for 0.  The scan then continues from the same index with
 * imp_lock held, and portal is stored at index i.  What is done when an
 * entry matches or is 0 is on lines not visible here; presumably a match
 * yields its index and a 0 entry ends the scan as the first free slot
 * -- TODO confirm.  LASSERT fires if i is not below IMP_AT_MAX_PORTALS.
 */
1529 int import_at_get_index(struct obd_import *imp, int portal)
1531 struct imp_at *at = &imp->imp_at;
1534 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1535 if (at->iat_portal[i] == portal)
1537 if (at->iat_portal[i] == 0)
1542 /* Not found in list, add it under a lock */
1543 spin_lock(&imp->imp_lock);
1545 /* Check unused under lock */
     /* resumes at the index where the unlocked scan stopped */
1546 for (; i < IMP_AT_MAX_PORTALS; i++) {
1547 if (at->iat_portal[i] == portal)
1549 if (at->iat_portal[i] == 0)
1554 /* Not enough portals? */
1555 LASSERT(i < IMP_AT_MAX_PORTALS);
1557 at->iat_portal[i] = portal;
1559 spin_unlock(&imp->imp_lock);