Whamcloud - gitweb
b=4424 Reserve obd_connect_data.ocd_max_easize field
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/import.c
37  *
38  * Author: Mike Shaver <shaver@clusterfs.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_RPC
42 #ifndef __KERNEL__
43 # include <liblustre.h>
44 #endif
45
46 #include <obd_support.h>
47 #include <lustre_ha.h>
48 #include <lustre_net.h>
49 #include <lustre_import.h>
50 #include <lustre_export.h>
51 #include <obd.h>
52 #include <obd_class.h>
53
54 #include "ptlrpc_internal.h"
55
56 struct ptlrpc_connect_async_args {
57          __u64 pcaa_peer_committed;
58         int pcaa_initial_connect;
59 };
60
61 static void __import_set_state(struct obd_import *imp,
62                                enum lustre_imp_state state)
63 {
64         imp->imp_state = state;
65         imp->imp_state_hist[imp->imp_state_hist_idx].ish_state = state;
66         imp->imp_state_hist[imp->imp_state_hist_idx].ish_time =
67                 cfs_time_current_sec();
68         imp->imp_state_hist_idx = (imp->imp_state_hist_idx + 1) %
69                 IMP_STATE_HIST_LEN;
70 }
71
/*
 * Move @imp to @state and log the transition, unless the import is
 * already CLOSED: a CLOSED import should remain so.
 *
 * No lock is taken here; IMPORT_SET_STATE is the variant that wraps this
 * in imp_lock. Arguments are parenthesized so that any pointer/enum
 * expression may be passed, but they are still evaluated more than once
 * and therefore must be free of side effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), obd2cli_tgt((imp)->imp_obd),                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               __import_set_state((imp), (state));                             \
        }                                                                      \
} while (0)
83
/*
 * Locked variant of IMPORT_SET_STATE_NOLOCK: takes imp_lock around the
 * state change. @imp is parenthesized so that an arbitrary pointer
 * expression can be passed; it is evaluated more than once.
 */
#define IMPORT_SET_STATE(imp, state)                    \
do {                                                    \
        spin_lock(&(imp)->imp_lock);                    \
        IMPORT_SET_STATE_NOLOCK((imp), (state));        \
        spin_unlock(&(imp)->imp_lock);                  \
} while (0)
90
91
/* Forward declarations for functions referenced before their definition. */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                                    void *data, int rc);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
95
96 /* Only this function is allowed to change the import state when it is
97  * CLOSED. I would rather refcount the import and free it after
98  * disconnection like we do with exports. To do that, the client_obd
99  * will need to save the peer info somewhere other than in the import,
100  * though. */
101 int ptlrpc_init_import(struct obd_import *imp)
102 {
103         spin_lock(&imp->imp_lock);
104
105         imp->imp_generation++;
106         imp->imp_state =  LUSTRE_IMP_NEW;
107
108         spin_unlock(&imp->imp_lock);
109
110         return 0;
111 }
112 EXPORT_SYMBOL(ptlrpc_init_import);
113
#define UUID_STR "_UUID"

/*
 * Find the printable core of a UUID string without modifying it.
 *
 * @uuid       - NUL-terminated string to examine
 * @prefix     - optional leading text to skip; may be NULL. It is skipped
 *               only when @uuid actually starts with it.
 * @uuid_start - out: pointer into @uuid just past the skipped prefix
 *               (or @uuid itself when nothing was skipped)
 * @uuid_len   - out: number of characters at *@uuid_start, not counting a
 *               trailing "_UUID" suffix if one is present
 *
 * The result is meant to be printed with "%.*s".
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        /* length of the suffix, known at compile time */
        const size_t suffix_len = sizeof(UUID_STR) - 1;
        size_t prefix_len = prefix != NULL ? strlen(prefix) : 0;
        size_t len;

        if (prefix_len != 0 && strncmp(uuid, prefix, prefix_len) == 0)
                uuid += prefix_len;

        /* all length arithmetic is done in size_t to avoid comparing a
         * signed int against strlen()'s unsigned result */
        len = strlen(uuid);
        if (len >= suffix_len &&
            strncmp(uuid + len - suffix_len, UUID_STR, suffix_len) == 0)
                len -= suffix_len;

        *uuid_start = uuid;
        *uuid_len = (int)len;
}
130
131 /* Returns true if import was FULL, false if import was already not
132  * connected.
133  * @imp - import to be disconnected
134  * @conn_cnt - connection count (epoch) of the request that timed out
135  *             and caused the disconnection.  In some cases, multiple
136  *             inflight requests can fail to a single target (e.g. OST
137  *             bulk requests) and if one has already caused a reconnection
138  *             (increasing the import->conn_cnt) the older failure should
139  *             not also cause a reconnection.  If zero it forces a reconnect.
140  */
141 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
142 {
143         int rc = 0;
144
145         spin_lock(&imp->imp_lock);
146
147         if (imp->imp_state == LUSTRE_IMP_FULL &&
148             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
149                 char *target_start;
150                 int   target_len;
151
152                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
153                           &target_start, &target_len);
154                 if (imp->imp_replayable) {
155                         LCONSOLE_WARN("%s: Connection to service %.*s via nid "
156                                "%s was lost; in progress operations using this "
157                                "service will wait for recovery to complete.\n",
158                                imp->imp_obd->obd_name, target_len, target_start,
159                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
160                 } else {
161                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
162                                "%.*s via nid %s was lost; in progress "
163                                "operations using this service will fail.\n",
164                                imp->imp_obd->obd_name, target_len, target_start,
165                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
166                 }
167                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
168                 spin_unlock(&imp->imp_lock);
169
170                 if (obd_dump_on_timeout)
171                         libcfs_debug_dumplog();
172
173                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
174                 rc = 1;
175         } else {
176                 spin_unlock(&imp->imp_lock);
177                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
178                        imp->imp_client->cli_name, imp,
179                        (imp->imp_state == LUSTRE_IMP_FULL &&
180                         imp->imp_conn_cnt > conn_cnt) ?
181                        "reconnected" : "not connected", imp->imp_conn_cnt,
182                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
183         }
184
185         return rc;
186 }
187
188 /* Must be called with imp_lock held! */
189 static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
190 {
191         ENTRY;
192         LASSERT_SPIN_LOCKED(&imp->imp_lock);
193
194         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
195         imp->imp_invalid = 1;
196         imp->imp_generation++;
197         spin_unlock(&imp->imp_lock);
198
199         ptlrpc_abort_inflight(imp);
200         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
201 }
202
203 /*
204  * This acts as a barrier; all existing requests are rejected, and
205  * no new requests will be accepted until the import is valid again.
206  */
207 void ptlrpc_deactivate_import(struct obd_import *imp)
208 {
209         spin_lock(&imp->imp_lock);
210         ptlrpc_deactivate_and_unlock_import(imp);
211 }
212
213 static unsigned int
214 ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
215 {
216         long dl;
217
218         if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
219               (req->rq_phase == RQ_PHASE_BULK) ||
220               (req->rq_phase == RQ_PHASE_NEW)))
221                 return 0;
222
223         if (req->rq_timedout)
224                 return 0;
225
226         if (req->rq_phase == RQ_PHASE_NEW)
227                 dl = req->rq_sent;
228         else
229                 dl = req->rq_deadline;
230
231         if (dl <= now)
232                 return 0;
233
234         return dl - now;
235 }
236
237 static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
238 {
239         time_t now = cfs_time_current_sec();
240         struct list_head *tmp, *n;
241         struct ptlrpc_request *req;
242         unsigned int timeout = 0;
243
244         spin_lock(&imp->imp_lock);
245         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
246                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
247                 timeout = max(ptlrpc_inflight_deadline(req, now), timeout);
248         }
249         spin_unlock(&imp->imp_lock);
250         return timeout;
251 }
252
253 /*
254  * This function will invalidate the import, if necessary, then block
255  * for all the RPC completions, and finally notify the obd to
256  * invalidate its state (ie cancel locks, clear pending requests,
257  * etc).
258  */
259 void ptlrpc_invalidate_import(struct obd_import *imp)
260 {
261         struct list_head *tmp, *n;
262         struct ptlrpc_request *req;
263         struct l_wait_info lwi;
264         unsigned int timeout;
265         int rc;
266
267         atomic_inc(&imp->imp_inval_count);
268
269         /*
270          * If this is an invalid MGC connection, then don't bother
271          * waiting for imp_inflight to drop to 0.
272          */
273         if (imp->imp_invalid && imp->imp_recon_bk &&!imp->imp_obd->obd_no_recov)
274                 goto out;
275
276         if (!imp->imp_invalid || imp->imp_obd->obd_no_recov)
277                 ptlrpc_deactivate_import(imp);
278
279         LASSERT(imp->imp_invalid);
280
281         /* Wait forever until inflight == 0. We really can't do it another
282          * way because in some cases we need to wait for very long reply
283          * unlink. We can't do anything before that because there is really
284          * no guarantee that some rdma transfer is not in progress right now. */
285         do {
286                 /* Calculate max timeout for waiting on rpcs to error
287                  * out. Use obd_timeout if calculated value is smaller
288                  * than it. */
289                 if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
290                         timeout = ptlrpc_inflight_timeout(imp);
291                         timeout += timeout / 3;
292
293                         if (timeout == 0)
294                                 timeout = obd_timeout;
295                 } else {
296                         /* decrease the interval to increase race condition */
297                         timeout = 1;
298                 }
299
300                 CDEBUG(D_RPCTRACE,"Sleeping %d sec for inflight to error out\n",
301                        timeout);
302
303                 /* Wait for all requests to error out and call completion
304                  * callbacks. Cap it at obd_timeout -- these should all
305                  * have been locally cancelled by ptlrpc_abort_inflight. */
306                 lwi = LWI_TIMEOUT_INTERVAL(
307                         cfs_timeout_cap(cfs_time_seconds(timeout)),
308                         (timeout > 1)?cfs_time_seconds(1):cfs_time_seconds(1)/2,
309                         NULL, NULL);
310                 rc = l_wait_event(imp->imp_recovery_waitq,
311                                 (atomic_read(&imp->imp_inflight) == 0), &lwi);
312                 if (rc) {
313                         const char *cli_tgt = obd2cli_tgt(imp->imp_obd);
314
315                         CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
316                                cli_tgt, rc, atomic_read(&imp->imp_inflight));
317
318                         spin_lock(&imp->imp_lock);
319                         if (atomic_read(&imp->imp_inflight) == 0) {
320                                 int count = atomic_read(&imp->imp_unregistering);
321
322                                 /* We know that "unregistering" rpcs only can
323                                  * survive in sending or delaying lists (they
324                                  * maybe waiting for long reply unlink in
325                                  * sluggish nets). Let's check this. If there
326                                  * is no inflight and unregistering != 0, this
327                                  * is bug. */
328                                 LASSERTF(count == 0, "Some RPCs are still "
329                                          "unregistering: %d\n", count);
330
331                                 /* Let's save one loop as soon as inflight have
332                                  * dropped to zero. No new inflights possible at
333                                  * this point. */
334                                 rc = 0;
335                         } else {
336                                 list_for_each_safe(tmp, n,
337                                                    &imp->imp_sending_list) {
338                                         req = list_entry(tmp,
339                                                          struct ptlrpc_request,
340                                                          rq_list);
341                                         DEBUG_REQ(D_ERROR, req,
342                                                   "still on sending list");
343                                 }
344                                 list_for_each_safe(tmp, n,
345                                                    &imp->imp_delayed_list) {
346                                         req = list_entry(tmp,
347                                                          struct ptlrpc_request,
348                                                          rq_list);
349                                         DEBUG_REQ(D_ERROR, req,
350                                                   "still on delayed list");
351                                 }
352
353                                 CERROR("%s: RPCs in \"%s\" phase found (%d). "
354                                        "Network is sluggish? Waiting them "
355                                        "to error out.\n", cli_tgt,
356                                        ptlrpc_phase2str(RQ_PHASE_UNREGISTERING),
357                                        atomic_read(&imp->imp_unregistering));
358                         }
359                         spin_unlock(&imp->imp_lock);
360                 }
361         } while (rc != 0);
362
363         /* Let's additionally check that no new rpcs added to import in
364          * "invalidate" state. */
365         LASSERT(atomic_read(&imp->imp_inflight) == 0);
366
367 out:
368         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
369
370         atomic_dec(&imp->imp_inval_count);
371         cfs_waitq_broadcast(&imp->imp_recovery_waitq);
372 }
373
374 /* unset imp_invalid */
375 void ptlrpc_activate_import(struct obd_import *imp)
376 {
377         struct obd_device *obd = imp->imp_obd;
378
379         spin_lock(&imp->imp_lock);
380         imp->imp_invalid = 0;
381         spin_unlock(&imp->imp_lock);
382
383         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
384 }
385
386 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
387 {
388         ENTRY;
389
390         LASSERT(!imp->imp_dlm_fake);
391
392         if (ptlrpc_set_import_discon(imp, conn_cnt)) {
393                 if (!imp->imp_replayable) {
394                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
395                                "auto-deactivating\n",
396                                obd2cli_tgt(imp->imp_obd),
397                                imp->imp_connection->c_remote_uuid.uuid,
398                                imp->imp_obd->obd_name);
399                         ptlrpc_deactivate_import(imp);
400                 }
401
402                 CDEBUG(D_HA, "%s: waking up pinger\n",
403                        obd2cli_tgt(imp->imp_obd));
404
405                 spin_lock(&imp->imp_lock);
406                 imp->imp_force_verify = 1;
407                 spin_unlock(&imp->imp_lock);
408
409                 ptlrpc_pinger_wake_up();
410         }
411         EXIT;
412 }
413
414 int ptlrpc_reconnect_import(struct obd_import *imp)
415 {
416
417         ptlrpc_set_import_discon(imp, 0);
418         /* Force a new connect attempt */
419         ptlrpc_invalidate_import(imp);
420         /* Do a fresh connect next time by zeroing the handle */
421         ptlrpc_disconnect_import(imp, 1);
422         /* Wait for all invalidate calls to finish */
423         if (atomic_read(&imp->imp_inval_count) > 0) {
424                 int rc;
425                 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
426                 rc = l_wait_event(imp->imp_recovery_waitq,
427                                   (atomic_read(&imp->imp_inval_count) == 0),
428                                   &lwi);
429                 if (rc)
430                         CERROR("Interrupted, inval=%d\n",
431                                atomic_read(&imp->imp_inval_count));
432         }
433
434         /*
435          * Allow reconnect attempts. Note: Currently, the function is
436          * only called by MGC. So assume this is a recoverable import,
437          * and force import to be recoverable. fix this if you need to
438          */
439
440         imp->imp_obd->obd_no_recov = 0;
441         /* Remove 'invalid' flag */
442         ptlrpc_activate_import(imp);
443         /* Attempt a new connect */
444         ptlrpc_recover_import(imp, NULL);
445         return 0;
446 }
447
448 EXPORT_SYMBOL(ptlrpc_reconnect_import);
449
450 static int import_select_connection(struct obd_import *imp)
451 {
452         struct obd_import_conn *imp_conn = NULL, *conn;
453         struct obd_export *dlmexp;
454         int tried_all = 1;
455         ENTRY;
456
457         spin_lock(&imp->imp_lock);
458
459         if (list_empty(&imp->imp_conn_list)) {
460                 CERROR("%s: no connections available\n",
461                         imp->imp_obd->obd_name);
462                 spin_unlock(&imp->imp_lock);
463                 RETURN(-EINVAL);
464         }
465
466         list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
467                 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
468                        imp->imp_obd->obd_name,
469                        libcfs_nid2str(conn->oic_conn->c_peer.nid),
470                        conn->oic_last_attempt);
471
472                 /* If we have not tried this connection since the
473                    the last successful attempt, go with this one */
474                 if ((conn->oic_last_attempt == 0) ||
475                     cfs_time_beforeq_64(conn->oic_last_attempt,
476                                        imp->imp_last_success_conn)) {
477                         imp_conn = conn;
478                         tried_all = 0;
479                         break;
480                 }
481
482                 /* If all of the connections have already been tried
483                    since the last successful connection; just choose the
484                    least recently used */
485                 if (!imp_conn)
486                         imp_conn = conn;
487                 else if (cfs_time_before_64(conn->oic_last_attempt,
488                                             imp_conn->oic_last_attempt))
489                         imp_conn = conn;
490         }
491
492         /* if not found, simply choose the current one */
493         if (!imp_conn || imp->imp_force_reconnect) {
494                 LASSERT(imp->imp_conn_current);
495                 imp_conn = imp->imp_conn_current;
496                 tried_all = 0;
497         }
498         LASSERT(imp_conn->oic_conn);
499
500         /* If we've tried everything, and we're back to the beginning of the
501            list, increase our timeout and try again. It will be reset when
502            we do finally connect. (FIXME: really we should wait for all network
503            state associated with the last connection attempt to drain before
504            trying to reconnect on it.) */
505         if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
506             !imp->imp_recon_bk /* not retrying */) {
507                 if (at_get(&imp->imp_at.iat_net_latency) <
508                     CONNECTION_SWITCH_MAX) {
509                         at_measured(&imp->imp_at.iat_net_latency,
510                                     MIN(at_get(&imp->imp_at.iat_net_latency) +
511                                     CONNECTION_SWITCH_INC,
512                                     CONNECTION_SWITCH_MAX));
513                 }
514                 LASSERT(imp_conn->oic_last_attempt);
515                 CWARN("%s: tried all connections, increasing latency to %ds\n",
516                       imp->imp_obd->obd_name,
517                       at_get(&imp->imp_at.iat_net_latency));
518         }
519
520         imp_conn->oic_last_attempt = cfs_time_current_64();
521
522         /* switch connection, don't mind if it's same as the current one */
523         if (imp->imp_connection)
524                 ptlrpc_connection_put(imp->imp_connection);
525         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
526
527         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
528         LASSERT(dlmexp != NULL);
529         if (dlmexp->exp_connection)
530                 ptlrpc_connection_put(dlmexp->exp_connection);
531         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
532         class_export_put(dlmexp);
533
534         if (imp->imp_conn_current != imp_conn) {
535                 if (imp->imp_conn_current)
536                         CDEBUG(D_HA, "Changing connection for %s to %s/%s\n",
537                                imp->imp_obd->obd_name, imp_conn->oic_uuid.uuid,
538                                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
539                 imp->imp_conn_current = imp_conn;
540         }
541
542         CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
543                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
544                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
545
546         spin_unlock(&imp->imp_lock);
547
548         RETURN(0);
549 }
550
551 /**
552  * must be called under imp lock
553  */
554 static int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
555 {
556         struct ptlrpc_request *req;
557         struct list_head *tmp;
558
559         if (list_empty(&imp->imp_replay_list))
560                 return 0;
561         tmp = imp->imp_replay_list.next;
562         req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
563         *transno = req->rq_transno;
564         if (req->rq_transno == 0) {
565                 DEBUG_REQ(D_ERROR, req, "zero transno in replay");
566                 LBUG();
567         }
568
569         return 1;
570 }
571
572 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
573 {
574         struct obd_device *obd = imp->imp_obd;
575         int set_transno = 0;
576         int initial_connect = 0;
577         int rc;
578         __u64 committed_before_reconnect = 0;
579         struct ptlrpc_request *request;
580         __u32 size[] = { sizeof(struct ptlrpc_body),
581                        sizeof(imp->imp_obd->u.cli.cl_target_uuid),
582                        sizeof(obd->obd_uuid),
583                        sizeof(imp->imp_dlm_handle),
584                        sizeof(imp->imp_connect_data) };
585         char *tmp[] = { NULL,
586                         obd2cli_tgt(imp->imp_obd),
587                         obd->obd_uuid.uuid,
588                         (char *)&imp->imp_dlm_handle,
589                         (char *)&imp->imp_connect_data };
590         struct ptlrpc_connect_async_args *aa;
591
592         ENTRY;
593         spin_lock(&imp->imp_lock);
594         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
595                 spin_unlock(&imp->imp_lock);
596                 CERROR("can't connect to a closed import\n");
597                 RETURN(-EINVAL);
598         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
599                 spin_unlock(&imp->imp_lock);
600                 CERROR("already connected\n");
601                 RETURN(0);
602         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
603                 spin_unlock(&imp->imp_lock);
604                 CERROR("already connecting\n");
605                 RETURN(-EALREADY);
606         }
607
608         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
609
610         imp->imp_conn_cnt++;
611         imp->imp_resend_replay = 0;
612
613         if (!lustre_handle_is_used(&imp->imp_remote_handle))
614                 initial_connect = 1;
615         else
616                 committed_before_reconnect = imp->imp_peer_committed_transno;
617
618         set_transno = ptlrpc_first_transno(imp,
619                                            &imp->imp_connect_data.ocd_transno);
620
621         spin_unlock(&imp->imp_lock);
622
623         if (new_uuid) {
624                 struct obd_uuid uuid;
625
626                 obd_str2uuid(&uuid, new_uuid);
627                 rc = import_set_conn_priority(imp, &uuid);
628                 if (rc)
629                         GOTO(out, rc);
630         }
631
632         rc = import_select_connection(imp);
633         if (rc)
634                 GOTO(out, rc);
635
636         /* last in connection list */
637         if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
638                 if (imp->imp_initial_recov_bk && initial_connect) {
639                         CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
640                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
641                         /* Don't retry if connect fails */
642                         rc = 0;
643                         obd_set_info_async(obd->obd_self_export,
644                                            sizeof(KEY_INIT_RECOV),
645                                            KEY_INIT_RECOV,
646                                            sizeof(rc), &rc, NULL);
647                 }
648                 if (imp->imp_recon_bk) {
649                         CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
650                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
651                         spin_lock(&imp->imp_lock);
652                         imp->imp_last_recon = 1;
653                         spin_unlock(&imp->imp_lock);
654                 }
655         }
656
657         /* Reset connect flags to the originally requested flags, in case
658          * the server is updated on-the-fly we will get the new features. */
659         imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
660         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
661
662         rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
663                            &obd->obd_uuid, &imp->imp_connect_data, NULL);
664         if (rc)
665                 GOTO(out, rc);
666
667         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
668                                   5, size, tmp);
669         if (!request)
670                 GOTO(out, rc = -ENOMEM);
671
672         /* Report the rpc service time to the server so that it knows how long
673          * to wait for clients to join recovery */
674         lustre_msg_set_service_time(request->rq_reqmsg,
675                                     at_timeout2est(request->rq_timeout));
676
677         /* The amount of time we give the server to process the connect req.
678          * import_select_connection will increase the net latency on
679          * repeated reconnect attempts to cover slow networks.
680          * We override/ignore the server rpc completion estimate here,
681          * which may be large if this is a reconnect attempt */
682         request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
683         lustre_msg_set_timeout(request->rq_reqmsg, request->rq_timeout);
684
685 #ifndef __KERNEL__
686         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
687 #endif
688         if (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V1)
689                 lustre_msg_add_op_flags(request->rq_reqmsg,
690                                         MSG_CONNECT_NEXT_VER);
691
692         request->rq_no_resend = request->rq_no_delay = 1;
693         request->rq_send_state = LUSTRE_IMP_CONNECTING;
694         /* Allow a slightly larger reply for future growth compatibility */
695         size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
696                               16 * sizeof(__u64);
697         ptlrpc_req_set_repsize(request, 2, size);
698         request->rq_interpret_reply = ptlrpc_connect_interpret;
699
700         CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args));
701         aa = ptlrpc_req_async_args(request);
702         memset(aa, 0, sizeof *aa);
703
704         aa->pcaa_peer_committed = committed_before_reconnect;
705         aa->pcaa_initial_connect = initial_connect;
706         if (aa->pcaa_initial_connect) {
707                 spin_lock(&imp->imp_lock);
708                 imp->imp_replayable = 1;
709                 spin_unlock(&imp->imp_lock);
710                 lustre_msg_add_op_flags(request->rq_reqmsg,
711                                         MSG_CONNECT_INITIAL);
712         }
713
714         if (set_transno)
715                 lustre_msg_add_op_flags(request->rq_reqmsg,
716                                         MSG_CONNECT_TRANSNO);
717
718         DEBUG_REQ(D_RPCTRACE, request, "%sconnect request %d",
719                   aa->pcaa_initial_connect ? "initial " : "re",
720                   imp->imp_conn_cnt);
721         ptlrpcd_add_req(request);
722         rc = 0;
723 out:
724         if (rc != 0) {
725                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
726         }
727
728         RETURN(rc);
729 }
730 EXPORT_SYMBOL(ptlrpc_connect_import);
731
/*
 * Wake the pinger in non-kernel (liblustre) builds, which have no pinger
 * thread of their own. In kernel builds this does nothing: the pinger
 * takes care of issuing the next reconnect request. @imp is unused.
 */
static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
{
#ifndef __KERNEL__
        ptlrpc_pinger_wake_up();
#endif
}
742
/*
 * Return 1 when @rc is -EBUSY or -EAGAIN, 0 for any other value.
 */
static int ptlrpc_busy_reconnect(int rc)
{
        switch (rc) {
        case -EBUSY:
        case -EAGAIN:
                return 1;
        default:
                return 0;
        }
}
747
748 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
749                                     void * data, int rc)
750 {
751         struct ptlrpc_connect_async_args *aa = data;
752         struct obd_import *imp = request->rq_import;
753         struct client_obd *cli = &imp->imp_obd->u.cli;
754         struct lustre_handle old_hdl;
755         __u64 old_connect_flags;
756         int msg_flags;
757         ENTRY;
758
759         spin_lock(&imp->imp_lock);
760         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
761                 spin_unlock(&imp->imp_lock);
762                 RETURN(0);
763         }
764
765         if (rc) {
766                 /* if this reconnect to busy export - not need select new target
767                  * for connecting*/
768                 imp->imp_force_reconnect = ptlrpc_busy_reconnect(rc);
769                 spin_unlock(&imp->imp_lock);
770                 GOTO(out, rc);
771         }
772
773         LASSERT(imp->imp_conn_current);
774
775         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
776
777         /* All imports are pingable */
778         imp->imp_pingable = 1;
779         imp->imp_force_reconnect = 0;
780
781         if (aa->pcaa_initial_connect) {
782                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
783                         imp->imp_replayable = 1;
784                         spin_unlock(&imp->imp_lock);
785                         CDEBUG(D_HA, "connected to replayable target: %s\n",
786                                obd2cli_tgt(imp->imp_obd));
787                 } else {
788                         imp->imp_replayable = 0;
789                         spin_unlock(&imp->imp_lock);
790                 }
791
792                 if ((request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V1 &&
793                      msg_flags & MSG_CONNECT_NEXT_VER) ||
794                     request->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2) {
795                         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
796                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
797                                obd2cli_tgt(imp->imp_obd));
798                 } else {
799                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
800                                obd2cli_tgt(imp->imp_obd));
801                 }
802
803                 imp->imp_remote_handle =
804                                 *lustre_msg_get_handle(request->rq_repmsg);
805
806                 /* Initial connects are allowed for clients with non-random
807                  * uuids when servers are in recovery.  Simply signal the
808                  * servers replay is complete and wait in REPLAY_WAIT. */
809                 if (msg_flags & MSG_CONNECT_RECOVERING) {
810                         CDEBUG(D_HA, "connect to %s during recovery\n",
811                                obd2cli_tgt(imp->imp_obd));
812                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
813                 } else {
814                         IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
815                         ptlrpc_activate_import(imp);
816                 }
817                 GOTO(finish, rc = 0);
818         } else {
819                 spin_unlock(&imp->imp_lock);
820         }
821
822         /* Determine what recovery state to move the import to. */
823         if (MSG_CONNECT_RECONNECT & msg_flags) {
824                 memset(&old_hdl, 0, sizeof(old_hdl));
825                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
826                             sizeof (old_hdl))) {
827                         CERROR("%s@%s didn't like our handle "LPX64
828                                ", failed\n", obd2cli_tgt(imp->imp_obd),
829                                imp->imp_connection->c_remote_uuid.uuid,
830                                imp->imp_dlm_handle.cookie);
831                         GOTO(out, rc = -ENOTCONN);
832                 }
833
834                 if (memcmp(&imp->imp_remote_handle,
835                            lustre_msg_get_handle(request->rq_repmsg),
836                            sizeof(imp->imp_remote_handle))) {
837                         int level = msg_flags & MSG_CONNECT_RECOVERING ?
838                                 D_HA : D_WARNING;
839
840                         /* Bug 16611/14775: if server handle have changed,
841                          * that means some sort of disconnection happened.
842                          * If the server is not in recovery, that also means it
843                          * already erased all of our state because of previous
844                          * eviction. If it is in recovery - we are safe to
845                          * participate since we can reestablish all of our state
846                          * with server again */
847                         CDEBUG(level,"%s@%s changed server handle from "
848                                      LPX64" to "LPX64"%s\n",
849                                      obd2cli_tgt(imp->imp_obd),
850                                      imp->imp_connection->c_remote_uuid.uuid,
851                                      imp->imp_remote_handle.cookie,
852                                      lustre_msg_get_handle(request->rq_repmsg)->
853                                                                         cookie,
854                                      (MSG_CONNECT_RECOVERING & msg_flags) ?
855                                          " but is still in recovery" : "");
856
857                         imp->imp_remote_handle =
858                                      *lustre_msg_get_handle(request->rq_repmsg);
859
860                         if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
861                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
862                                 GOTO(finish, rc = 0);
863                         }
864
865                 } else {
866                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
867                                obd2cli_tgt(imp->imp_obd),
868                                imp->imp_connection->c_remote_uuid.uuid);
869                 }
870
871                 if (imp->imp_invalid) {
872                         CDEBUG(D_HA, "%s: reconnected but import is invalid; "
873                                "marking evicted\n", imp->imp_obd->obd_name);
874                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
875                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
876                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
877                                imp->imp_obd->obd_name,
878                                obd2cli_tgt(imp->imp_obd));
879
880                         spin_lock(&imp->imp_lock);
881                         imp->imp_resend_replay = 1;
882                         /* VBR: delayed connection */
883                         if (MSG_CONNECT_DELAYED & msg_flags) {
884                                 imp->imp_delayed_recovery = 1;
885                                 imp->imp_no_lock_replay = 1;
886                         }
887                         spin_unlock(&imp->imp_lock);
888
889                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
890                 } else {
891                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
892                 }
893         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
894                 LASSERT(imp->imp_replayable);
895                 imp->imp_remote_handle =
896                                 *lustre_msg_get_handle(request->rq_repmsg);
897                 imp->imp_last_replay_transno = 0;
898                 /* VBR: delayed connection */
899                 if (MSG_CONNECT_DELAYED & msg_flags) {
900                         spin_lock(&imp->imp_lock);
901                         imp->imp_delayed_recovery = 1;
902                         imp->imp_no_lock_replay = 1;
903                         spin_unlock(&imp->imp_lock);
904                 }
905                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
906         } else {
907                 DEBUG_REQ(D_HA, request, "evicting (not initial connect and "
908                           "flags reconnect/recovering not set: %x)",msg_flags);
909                 imp->imp_remote_handle =
910                                 *lustre_msg_get_handle(request->rq_repmsg);
911                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
912         }
913
914         /* Sanity checks for a reconnected import. */
915         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
916                 CERROR("imp_replayable flag does not match server "
917                        "after reconnect. We should LBUG right here.\n");
918         }
919
920         if (lustre_msg_get_last_committed(request->rq_repmsg) > 0 &&
921             lustre_msg_get_last_committed(request->rq_repmsg) <
922             aa->pcaa_peer_committed) {
923                 CERROR("%s went back in time (transno "LPD64
924                        " was previously committed, server now claims "LPD64
925                        ")!  See https://bugzilla.lustre.org/show_bug.cgi?"
926                        "id=9646\n",
927                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
928                        lustre_msg_get_last_committed(request->rq_repmsg));
929         }
930
931 finish:
932         rc = ptlrpc_import_recovery_state_machine(imp);
933         if (rc != 0) {
934                 if (rc == -ENOTCONN) {
935                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
936                                "invalidating and reconnecting\n",
937                                obd2cli_tgt(imp->imp_obd),
938                                imp->imp_connection->c_remote_uuid.uuid);
939                         ptlrpc_connect_import(imp, NULL);
940                         RETURN(0);
941                 }
942         } else {
943                 struct obd_connect_data *ocd;
944                 struct obd_export *exp;
945
946                 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
947                                          lustre_swab_connect);
948                 spin_lock(&imp->imp_lock);
949                 list_del(&imp->imp_conn_current->oic_item);
950                 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
951                 imp->imp_last_success_conn =
952                         imp->imp_conn_current->oic_last_attempt;
953
954                 if (ocd == NULL) {
955                         spin_unlock(&imp->imp_lock);
956                         CERROR("Wrong connect data from server\n");
957                         rc = -EPROTO;
958                         GOTO(out, rc);
959                 }
960
961                 imp->imp_connect_data = *ocd;
962
963                 exp = class_conn2export(&imp->imp_dlm_handle);
964                 spin_unlock(&imp->imp_lock);
965
966                 /* check that server granted subset of flags we asked for. */
967                 LASSERTF((ocd->ocd_connect_flags &
968                           imp->imp_connect_flags_orig) ==
969                          ocd->ocd_connect_flags, LPX64" != "LPX64,
970                          imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
971
972                 if (!exp) {
973                         /* This could happen if export is cleaned during the
974                            connect attempt */
975                         CERROR("Missing export for %s\n",
976                                imp->imp_obd->obd_name);
977                         GOTO(out, rc = -ENODEV);
978                 }
979                 old_connect_flags = exp->exp_connect_flags;
980                 exp->exp_connect_flags = ocd->ocd_connect_flags;
981                 imp->imp_obd->obd_self_export->exp_connect_flags =
982                         ocd->ocd_connect_flags;
983                 class_export_put(exp);
984
985                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
986
987                 if (!ocd->ocd_ibits_known &&
988                     ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
989                         CERROR("Inodebits aware server returned zero compatible"
990                                " bits?\n");
991
992                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
993                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
994                                         LUSTRE_VERSION_OFFSET_WARN ||
995                      ocd->ocd_version < LUSTRE_VERSION_CODE -
996                                         LUSTRE_VERSION_OFFSET_WARN)) {
997                         /* Sigh, some compilers do not like #ifdef in the middle
998                            of macro arguments */
999 #ifdef __KERNEL__
1000                         const char *older =
1001                                 "older.  Consider upgrading this client";
1002 #else
1003                         const char *older =
1004                                 "older.  Consider recompiling this application";
1005 #endif
1006                         const char *newer = "newer than client version";
1007
1008                         LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
1009                                       "is much %s (%s)\n",
1010                                       obd2cli_tgt(imp->imp_obd),
1011                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1012                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1013                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1014                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
1015                                       ocd->ocd_version > LUSTRE_VERSION_CODE ?
1016                                       newer : older, LUSTRE_VERSION_STRING);
1017                 }
1018
1019                 if (ocd->ocd_connect_flags & OBD_CONNECT_CKSUM) {
1020                         /* We sent to the server ocd_cksum_types with bits set
1021                          * for algorithms we understand. The server masked off
1022                          * the checksum types it doesn't support */
1023                         if ((ocd->ocd_cksum_types & OBD_CKSUM_ALL) == 0) {
1024                                 LCONSOLE_WARN("The negotiation of the checksum "
1025                                               "alogrithm to use with server %s "
1026                                               "failed (%x/%x), disabling "
1027                                               "checksums\n",
1028                                               obd2cli_tgt(imp->imp_obd),
1029                                               ocd->ocd_cksum_types,
1030                                               OBD_CKSUM_ALL);
1031                                 cli->cl_checksum = 0;
1032                                 cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1033                                 cli->cl_cksum_type = OBD_CKSUM_CRC32;
1034                         } else {
1035                                 cli->cl_supp_cksum_types = ocd->ocd_cksum_types;
1036
1037                                 if (ocd->ocd_cksum_types & OSC_DEFAULT_CKSUM)
1038                                         cli->cl_cksum_type = OSC_DEFAULT_CKSUM;
1039                                 else if (ocd->ocd_cksum_types & OBD_CKSUM_ADLER)
1040                                         cli->cl_cksum_type = OBD_CKSUM_ADLER;
1041                                 else
1042                                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1043                         }
1044                 } else {
1045                         /* The server does not support OBD_CONNECT_CKSUM.
1046                          * Enforce CRC32 for backward compatibility*/
1047                         cli->cl_supp_cksum_types = OBD_CKSUM_CRC32;
1048                         cli->cl_cksum_type = OBD_CKSUM_CRC32;
1049                 }
1050
1051                 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
1052                         cli->cl_max_pages_per_rpc =
1053                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
1054                 }
1055
1056                 /* Reset ns_connect_flags only for initial connect. It might be
1057                  * changed in while using FS and if we reset it in reconnect
1058                  * this leads to lossing user settings done before such as
1059                  * disable lru_resize, etc. */
1060                 if (old_connect_flags != exp->exp_connect_flags ||
1061                     aa->pcaa_initial_connect) {
1062                         CDEBUG(D_HA, "%s: Resetting ns_connect_flags to server "
1063                                "flags: "LPX64"\n", imp->imp_obd->obd_name,
1064                                ocd->ocd_connect_flags);
1065                         imp->imp_obd->obd_namespace->ns_connect_flags =
1066                                 ocd->ocd_connect_flags;
1067                         imp->imp_obd->obd_namespace->ns_orig_connect_flags =
1068                                 ocd->ocd_connect_flags;
1069                 }
1070
1071                 if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
1072                     (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
1073                         /* We need a per-message support flag, because
1074                            a. we don't know if the incoming connect reply
1075                               supports AT or not (in reply_in_callback)
1076                               until we unpack it.
1077                            b. failovered server means export and flags are gone
1078                               (in ptlrpc_send_reply).
1079                            Can only be set when we know AT is supported at
1080                            both ends */
1081                         imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
1082                 else
1083                         imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
1084
1085                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
1086                         (cli->cl_max_pages_per_rpc > 0));
1087         }
1088
1089  out:
1090         if (rc != 0) {
1091                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
1092                 spin_lock(&imp->imp_lock);
1093                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov &&
1094                     (request->rq_import_generation == imp->imp_generation))
1095                         ptlrpc_deactivate_and_unlock_import(imp);
1096                 else
1097                         spin_unlock(&imp->imp_lock);
1098
1099                 if (imp->imp_recon_bk && imp->imp_last_recon) {
1100                         /* Give up trying to reconnect */
1101                         imp->imp_obd->obd_no_recov = 1;
1102                         ptlrpc_deactivate_import(imp);
1103                 }
1104
1105                 if (rc == -EPROTO) {
1106                         struct obd_connect_data *ocd;
1107                         ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
1108                                                  sizeof *ocd,
1109                                                  lustre_swab_connect);
1110                         if (ocd &&
1111                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
1112                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
1113                            /* Actually servers are only supposed to refuse
1114                               connection from liblustre clients, so we should
1115                               never see this from VFS context */
1116                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
1117                                         "(%d.%d.%d.%d)"
1118                                         " refused connection from this client "
1119                                         "with an incompatible version (%s).  "
1120                                         "Client must be recompiled\n",
1121                                         obd2cli_tgt(imp->imp_obd),
1122                                         OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
1123                                         OBD_OCD_VERSION_MINOR(ocd->ocd_version),
1124                                         OBD_OCD_VERSION_PATCH(ocd->ocd_version),
1125                                         OBD_OCD_VERSION_FIX(ocd->ocd_version),
1126                                         LUSTRE_VERSION_STRING);
1127                                 ptlrpc_deactivate_import(imp);
1128                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
1129                         }
1130                         RETURN(-EPROTO);
1131                 }
1132
1133                 ptlrpc_maybe_ping_import_soon(imp);
1134
1135                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
1136                        obd2cli_tgt(imp->imp_obd),
1137                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
1138         }
1139
1140         spin_lock(&imp->imp_lock);
1141         imp->imp_last_recon = 0;
1142         spin_unlock(&imp->imp_lock);
1143
1144         cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1145         RETURN(rc);
1146 }
1147
1148 static int completed_replay_interpret(struct ptlrpc_request *req,
1149                                       void * data, int rc)
1150 {
1151         ENTRY;
1152         atomic_dec(&req->rq_import->imp_replay_inflight);
1153         if (req->rq_status == 0 &&
1154             !req->rq_import->imp_vbr_failed) {
1155                 ptlrpc_import_recovery_state_machine(req->rq_import);
1156         } else {
1157                 if (req->rq_import->imp_vbr_failed) {
1158                         CDEBUG(D_WARNING,
1159                                "%s: version recovery fails, reconnecting\n",
1160                                req->rq_import->imp_obd->obd_name);
1161                 } else {
1162                         CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
1163                                      "reconnecting\n",
1164                                req->rq_import->imp_obd->obd_name,
1165                                req->rq_status);
1166                 }
1167                 ptlrpc_connect_import(req->rq_import, NULL);
1168         }
1169         RETURN(0);
1170 }
1171
1172 static int signal_completed_replay(struct obd_import *imp)
1173 {
1174         struct ptlrpc_request *req;
1175         ENTRY;
1176
1177         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
1178         atomic_inc(&imp->imp_replay_inflight);
1179
1180         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
1181         if (!req) {
1182                 atomic_dec(&imp->imp_replay_inflight);
1183                 RETURN(-ENOMEM);
1184         }
1185
1186         ptlrpc_req_set_repsize(req, 1, NULL);
1187         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
1188         lustre_msg_add_flags(req->rq_reqmsg,
1189                              MSG_LOCK_REPLAY_DONE |
1190                              MSG_REQ_REPLAY_DONE |
1191                              MSG_LAST_REPLAY);
1192
1193         if (imp->imp_delayed_recovery)
1194                 lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
1195         req->rq_interpret_reply = completed_replay_interpret;
1196
1197         if (AT_OFF)
1198                 req->rq_timeout *= 3;
1199
1200         ptlrpcd_add_req(req);
1201         RETURN(0);
1202 }
1203
#ifdef __KERNEL__
/*
 * Body of the "ll_imp_inval" kernel thread started by
 * ptlrpc_import_recovery_state_machine() after an eviction.
 *
 * Invalidates the import passed in \a data, optionally dumps the debug
 * log, moves the import to LUSTRE_IMP_RECOVER and re-runs the recovery
 * state machine.  The import reference taken by the thread's creator is
 * released before returning.  Always returns 0.
 */
static int ptlrpc_invalidate_import_thread(void *data)
{
        struct obd_import *evicted = data;

        ENTRY;

        cfs_daemonize_ctxt("ll_imp_inval");

        CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
               evicted->imp_obd->obd_name, obd2cli_tgt(evicted->imp_obd),
               evicted->imp_connection->c_remote_uuid.uuid);

        ptlrpc_invalidate_import(evicted);

        if (obd_dump_on_eviction) {
                CERROR("dump the log upon eviction\n");
                libcfs_debug_dumplog();
        }

        IMPORT_SET_STATE(evicted, LUSTRE_IMP_RECOVER);
        ptlrpc_import_recovery_state_machine(evicted);

        /* drop the reference held on behalf of this thread */
        class_import_put(evicted);
        RETURN(0);
}
#endif
1231
1232 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
1233 {
1234         int rc = 0;
1235         int inflight;
1236         char *target_start;
1237         int target_len;
1238
1239         ENTRY;
1240         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
1241                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1242                           &target_start, &target_len);
1243                 /* Don't care about MGC eviction */
1244                 if (strcmp(imp->imp_obd->obd_type->typ_name,
1245                            LUSTRE_MGC_NAME) != 0) {
1246                         LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
1247                                            "%.*s; in progress operations using "
1248                                            "this service will fail.\n",
1249                                            target_len, target_start);
1250                 }
1251                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
1252                        obd2cli_tgt(imp->imp_obd),
1253                        imp->imp_connection->c_remote_uuid.uuid);
1254                 /* reset vbr_failed flag upon eviction */
1255                 spin_lock(&imp->imp_lock);
1256                 imp->imp_vbr_failed = 0;
1257                 spin_unlock(&imp->imp_lock);
1258
1259 #ifdef __KERNEL__
1260                 /* bug 17802:  XXX client_disconnect_export vs connect request
1261                  * race. if client will evicted at this time, we start
1262                  * invalidate thread without referece to import and import can
1263                  * be freed at same time. */
1264                 class_import_get(imp);
1265                 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
1266                                    CLONE_VM | CLONE_FILES);
1267                 if (rc < 0) {
1268                         class_import_put(imp);
1269                         CERROR("error starting invalidate thread: %d\n", rc);
1270                 } else {
1271                         rc = 0;
1272                 }
1273                 RETURN(rc);
1274 #else
1275                 ptlrpc_invalidate_import(imp);
1276
1277                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1278 #endif
1279         }
1280
1281         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
1282                 CDEBUG(D_HA, "replay requested by %s\n",
1283                        obd2cli_tgt(imp->imp_obd));
1284                 rc = ptlrpc_replay_next(imp, &inflight);
1285                 if (inflight == 0 &&
1286                     atomic_read(&imp->imp_replay_inflight) == 0) {
1287                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
1288                         rc = ldlm_replay_locks(imp);
1289                         if (rc)
1290                                 GOTO(out, rc);
1291                 }
1292                 rc = 0;
1293         }
1294
1295         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
1296                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1297                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
1298                         rc = signal_completed_replay(imp);
1299                         if (rc)
1300                                 GOTO(out, rc);
1301                 }
1302
1303         }
1304
1305         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
1306                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
1307                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
1308                 }
1309         }
1310
1311         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
1312                 CDEBUG(D_HA, "reconnected to %s@%s\n",
1313                        obd2cli_tgt(imp->imp_obd),
1314                        imp->imp_connection->c_remote_uuid.uuid);
1315
1316                 rc = ptlrpc_resend(imp);
1317                 if (rc)
1318                         GOTO(out, rc);
1319                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
1320                 ptlrpc_activate_import(imp);
1321
1322                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
1323                           &target_start, &target_len);
1324                 LCONSOLE_INFO("%s: Connection restored to service %.*s "
1325                               "using nid %s.\n", imp->imp_obd->obd_name,
1326                               target_len, target_start,
1327                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
1328         }
1329
1330         if (imp->imp_state == LUSTRE_IMP_FULL) {
1331                 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1332                 ptlrpc_wake_delayed(imp);
1333         }
1334
1335  out:
1336         RETURN(rc);
1337 }
1338
1339 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
1340 {
1341         struct ptlrpc_request *req;
1342         int rq_opc, rc = 0;
1343         int nowait = imp->imp_obd->obd_force;
1344         ENTRY;
1345
1346         if (nowait)
1347                 GOTO(set_state, rc);
1348
1349         switch (imp->imp_connect_op) {
1350         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
1351         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
1352         case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
1353         default:
1354                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
1355                        obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
1356                 RETURN(-EINVAL);
1357         }
1358
1359         if (ptlrpc_import_in_recovery(imp)) {
1360                 struct l_wait_info lwi;
1361                 cfs_duration_t timeout;
1362
1363                 if (AT_OFF) {
1364                         timeout = cfs_time_seconds(obd_timeout);
1365                 } else {
1366                         int idx = import_at_get_index(imp,
1367                                 imp->imp_client->cli_request_portal);
1368                         timeout = cfs_time_seconds(
1369                                 at_get(&imp->imp_at.iat_service_estimate[idx]));
1370                 }
1371                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout),
1372                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
1373                 rc = l_wait_event(imp->imp_recovery_waitq,
1374                                   !ptlrpc_import_in_recovery(imp), &lwi);
1375         }
1376
1377         spin_lock(&imp->imp_lock);
1378         if (imp->imp_state != LUSTRE_IMP_FULL)
1379                 GOTO(out, 0);
1380
1381         spin_unlock(&imp->imp_lock);
1382
1383         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1384         if (req) {
1385                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1386                  * it fails.  We can get through the above with a down server
1387                  * if the client doesn't know the server is gone yet. */
1388                 req->rq_no_resend = 1;
1389
1390 #ifndef CRAY_XT3
1391                 /* We want client umounts to happen quickly, no matter the
1392                    server state... */
1393                 req->rq_timeout = min_t(int, req->rq_timeout,
1394                                         INITIAL_CONNECT_TIMEOUT);
1395 #else
1396                 /* ... but we always want liblustre clients to nicely
1397                    disconnect, so only use the adaptive value. */
1398                 if (AT_OFF)
1399                         req->rq_timeout = obd_timeout / 3;
1400 #endif
1401
1402                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1403                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1404                 ptlrpc_req_set_repsize(req, 1, NULL);
1405                 rc = ptlrpc_queue_wait(req);
1406                 ptlrpc_req_finished(req);
1407         }
1408
1409 set_state:
1410         spin_lock(&imp->imp_lock);
1411 out:
1412         if (noclose)
1413                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1414         else
1415                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1416         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1417         /* Try all connections in the future - bz 12758 */
1418         imp->imp_last_recon = 0;
1419         spin_unlock(&imp->imp_lock);
1420
1421         RETURN(rc);
1422 }
1423
1424 /* Sets maximal number of RPCs possible originating from other side of this
1425    import (server) to us and number of async RPC replies that we are not waiting
1426    for arriving */
1427 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1428 {
1429         LNetSetAsync(imp->imp_connection->c_peer, count);
1430 }
1431
1432 void ptlrpc_evict_imp(struct obd_import *imp)
1433 {
1434         ENTRY;
1435         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
1436         EXIT;
1437 }
1438
1439 void ptlrpc_cleanup_imp(struct obd_import *imp)
1440 {
1441         ENTRY;
1442
1443         spin_lock(&imp->imp_lock);
1444         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1445         imp->imp_generation++;
1446         spin_unlock(&imp->imp_lock);
1447         ptlrpc_abort_inflight(imp);
1448
1449         EXIT;
1450 }
1451
1452 /* Adaptive Timeout utils */
1453 extern unsigned int at_min, at_max, at_history;
1454
1455 /* Bin into timeslices using AT_BINS bins.
1456    This gives us a max of the last binlimit*AT_BINS secs without the storage,
1457    but still smoothing out a return to normalcy from a slow response.
1458    (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
1459 int at_measured(struct adaptive_timeout *at, unsigned int val)
1460 {
1461         unsigned int old = at->at_current;
1462         time_t now = cfs_time_current_sec();
1463         time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
1464
1465         LASSERT(at);
1466         CDEBUG(D_OTHER, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
1467                val, at, now - at->at_binstart, at->at_current,
1468                at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
1469
1470         if (val == 0)
1471                 /* 0's don't count, because we never want our timeout to
1472                    drop to 0, and because 0 could mean an error */
1473                 return 0;
1474
1475         spin_lock(&at->at_lock);
1476
1477         if (unlikely(at->at_binstart == 0)) {
1478                 /* Special case to remove default from history */
1479                 at->at_current = val;
1480                 at->at_worst_ever = val;
1481                 at->at_worst_time = now;
1482                 at->at_hist[0] = val;
1483                 at->at_binstart = now;
1484         } else if (now - at->at_binstart < binlimit ) {
1485                 /* in bin 0 */
1486                 at->at_hist[0] = max(val, at->at_hist[0]);
1487                 at->at_current = max(val, at->at_current);
1488         } else {
1489                 int i, shift;
1490                 unsigned int maxv = val;
1491                 /* move bins over */
1492                 shift = (now - at->at_binstart) / binlimit;
1493                 LASSERT(shift > 0);
1494                 for(i = AT_BINS - 1; i >= 0; i--) {
1495                         if (i >= shift) {
1496                                 at->at_hist[i] = at->at_hist[i - shift];
1497                                 maxv = max(maxv, at->at_hist[i]);
1498                         } else {
1499                                 at->at_hist[i] = 0;
1500                         }
1501                 }
1502                 at->at_hist[0] = val;
1503                 at->at_current = maxv;
1504                 at->at_binstart += shift * binlimit;
1505         }
1506
1507         if (at->at_current > at->at_worst_ever) {
1508                 at->at_worst_ever = at->at_current;
1509                 at->at_worst_time = now;
1510         }
1511
1512         if (at->at_flags & AT_FLG_NOHIST)
1513                 /* Only keep last reported val; keeping the rest of the history
1514                    for proc only */
1515                 at->at_current = val;
1516
1517         if (at_max > 0)
1518                 at->at_current =  min(at->at_current, at_max);
1519         at->at_current =  max(at->at_current, at_min);
1520
1521         if (at->at_current != old)
1522                 CDEBUG(D_OTHER, "AT %p change: old=%u new=%u delta=%d "
1523                        "(val=%u) hist %u %u %u %u\n", at,
1524                        old, at->at_current, at->at_current - old, val,
1525                        at->at_hist[0], at->at_hist[1], at->at_hist[2],
1526                        at->at_hist[3]);
1527
1528         /* if we changed, report the old value */
1529         old = (at->at_current != old) ? old : 0;
1530
1531         spin_unlock(&at->at_lock);
1532         return old;
1533 }
1534
1535 /* Find the imp_at index for a given portal; assign if space available */
1536 int import_at_get_index(struct obd_import *imp, int portal)
1537 {
1538         struct imp_at *at = &imp->imp_at;
1539         int i;
1540
1541         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1542                 if (at->iat_portal[i] == portal)
1543                         return i;
1544                 if (at->iat_portal[i] == 0)
1545                         /* unused */
1546                         break;
1547         }
1548
1549         /* Not found in list, add it under a lock */
1550         spin_lock(&imp->imp_lock);
1551
1552         /* Check unused under lock */
1553         for (; i < IMP_AT_MAX_PORTALS; i++) {
1554                 if (at->iat_portal[i] == portal)
1555                         goto out;
1556                 if (at->iat_portal[i] == 0)
1557                         /* unused */
1558                         break;
1559         }
1560
1561         /* Not enough portals? */
1562         LASSERT(i < IMP_AT_MAX_PORTALS);
1563
1564         at->iat_portal[i] = portal;
1565 out:
1566         spin_unlock(&imp->imp_lock);
1567         return i;
1568 }