Whamcloud - gitweb
b=11706
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  */
25
26 #define DEBUG_SUBSYSTEM S_RPC
27 #ifndef __KERNEL__
28 # include <liblustre.h>
29 #endif
30
31 #include <obd_support.h>
32 #include <lustre_ha.h>
33 #include <lustre_net.h>
34 #include <lustre_import.h>
35 #include <lustre_export.h>
36 #include <obd.h>
37 #include <obd_class.h>
38
39 #include "ptlrpc_internal.h"
40
/* Per-request arguments of a connect RPC.  ptlrpc_connect_import() fills
 * them into the request's rq_async_args area and
 * ptlrpc_connect_interpret() reads them back through its data argument. */
struct ptlrpc_connect_async_args {
         /* imp_peer_committed_transno sampled before a reconnect (left at 0
          * on an initial connect); the reply's last committed transno is
          * checked against it. */
         __u64 pcaa_peer_committed;
        /* non-zero when the import's remote handle was not yet in use,
         * i.e. this is the first connect rather than a reconnect */
        int pcaa_initial_connect;
};
45
/*
 * Move @imp to @state and log the transition, unless the import is already
 * CLOSED: a CLOSED import should remain so.
 *
 * No locking is done here; callers in this file invoke it with
 * imp->imp_lock held (IMPORT_SET_STATE is the wrapper that takes the lock).
 *
 * Both parameters are parenthesised so that any pointer/value expression
 * can be passed.  They are still evaluated more than once, so do not pass
 * expressions with side effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), obd2cli_tgt((imp)->imp_obd),                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while (0)
57
/*
 * Locked variant of IMPORT_SET_STATE_NOLOCK: takes imp->imp_lock around the
 * state change.  Must not be used while imp->imp_lock is already held.
 *
 * The parameters are parenthesised here and when forwarded, so arbitrary
 * expressions may be passed; they are evaluated more than once.
 */
#define IMPORT_SET_STATE(imp, state)                    \
do {                                                    \
        spin_lock(&(imp)->imp_lock);                    \
        IMPORT_SET_STATE_NOLOCK((imp), (state));        \
        spin_unlock(&(imp)->imp_lock);                  \
} while (0)
64
65
/* Reply handler installed on connect requests by ptlrpc_connect_import();
 * defined further down in this file. */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                                    void * data, int rc);
/* Called from ptlrpc_connect_interpret() once the new import state has
 * been chosen. */
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
69
70 /* Only this function is allowed to change the import state when it is
71  * CLOSED. I would rather refcount the import and free it after
72  * disconnection like we do with exports. To do that, the client_obd
73  * will need to save the peer info somewhere other than in the import,
74  * though. */
75 int ptlrpc_init_import(struct obd_import *imp)
76 {
77         spin_lock(&imp->imp_lock);
78
79         imp->imp_generation++;
80         imp->imp_state =  LUSTRE_IMP_NEW;
81
82         spin_unlock(&imp->imp_lock);
83
84         return 0;
85 }
86 EXPORT_SYMBOL(ptlrpc_init_import);
87
#define UUID_STR "_UUID"
/*
 * Find the printable part of @uuid without copying or modifying it.
 *
 * @uuid       - NUL-terminated string to examine
 * @prefix     - optional leading text to skip; ignored when NULL or when
 *               @uuid does not start with it
 * @uuid_start - out: points into @uuid, just past the prefix if it matched
 * @uuid_len   - out: number of characters from *uuid_start, not counting a
 *               trailing "_UUID" if one is present
 */
static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
                      int *uuid_len)
{
        const size_t suffix_len = strlen(UUID_STR);
        char *start = uuid;
        int len;

        if (prefix != NULL && strncmp(uuid, prefix, strlen(prefix)) == 0)
                start = uuid + strlen(prefix);

        len = strlen(start);

        /* drop the suffix only when the string is long enough to hold it */
        if ((size_t)len >= suffix_len &&
            strncmp(start + len - suffix_len, UUID_STR, suffix_len) == 0)
                len -= suffix_len;

        *uuid_start = start;
        *uuid_len = len;
}
104
105 /* Returns true if import was FULL, false if import was already not
106  * connected.
107  * @imp - import to be disconnected
108  * @conn_cnt - connection count (epoch) of the request that timed out
109  *             and caused the disconnection.  In some cases, multiple
110  *             inflight requests can fail to a single target (e.g. OST
111  *             bulk requests) and if one has already caused a reconnection
112  *             (increasing the import->conn_cnt) the older failure should
113  *             not also cause a reconnection.  If zero it forces a reconnect.
114  */
115 int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
116 {
117         int rc = 0;
118
119         spin_lock(&imp->imp_lock);
120
121         if (imp->imp_state == LUSTRE_IMP_FULL &&
122             (conn_cnt == 0 || conn_cnt == imp->imp_conn_cnt)) {
123                 char *target_start;
124                 int   target_len;
125
126                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
127                           &target_start, &target_len);
128                 if (imp->imp_replayable) {
129                         LCONSOLE_WARN("%s: Connection to service %.*s via nid "
130                                "%s was lost; in progress operations using this "
131                                "service will wait for recovery to complete.\n",
132                                imp->imp_obd->obd_name, target_len, target_start,
133                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
134                 } else {
135                         LCONSOLE_ERROR_MSG(0x166, "%s: Connection to service "
136                                "%.*s via nid %s was lost; in progress "
137                                "operations using this service will fail.\n",
138                                imp->imp_obd->obd_name, target_len, target_start, 
139                                libcfs_nid2str(imp->imp_connection->c_peer.nid));
140                 }
141                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
142                 spin_unlock(&imp->imp_lock);
143     
144                 if (obd_dump_on_timeout)
145                         libcfs_debug_dumplog();
146
147                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
148                 rc = 1;
149         } else {
150                 spin_unlock(&imp->imp_lock);
151                 CDEBUG(D_HA, "%s: import %p already %s (conn %u, was %u): %s\n",
152                        imp->imp_client->cli_name, imp,
153                        (imp->imp_state == LUSTRE_IMP_FULL &&
154                         imp->imp_conn_cnt > conn_cnt) ?
155                        "reconnected" : "not connected", imp->imp_conn_cnt,
156                        conn_cnt, ptlrpc_import_state_name(imp->imp_state));
157         }
158
159         return rc;
160 }
161
162 /*
163  * This acts as a barrier; all existing requests are rejected, and
164  * no new requests will be accepted until the import is valid again.
165  */
166 void ptlrpc_deactivate_import(struct obd_import *imp)
167 {
168         ENTRY;
169
170         spin_lock(&imp->imp_lock);
171         CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
172         imp->imp_invalid = 1;
173         imp->imp_generation++;
174         spin_unlock(&imp->imp_lock);
175
176         ptlrpc_abort_inflight(imp);
177         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
178 }
179
180 /*
181  * This function will invalidate the import, if necessary, then block
182  * for all the RPC completions, and finally notify the obd to
183  * invalidate its state (ie cancel locks, clear pending requests,
184  * etc).
185  */
186 void ptlrpc_invalidate_import(struct obd_import *imp)
187 {
188         struct l_wait_info lwi;
189         int rc;
190
191         if (!imp->imp_invalid)
192                 ptlrpc_deactivate_import(imp);
193
194         LASSERT(imp->imp_invalid);
195
196         /* wait for all requests to error out and call completion callbacks */
197         lwi = LWI_TIMEOUT_INTERVAL(cfs_timeout_cap(cfs_time_seconds(obd_timeout)), 
198                                    HZ, NULL, NULL);
199         rc = l_wait_event(imp->imp_recovery_waitq,
200                           (atomic_read(&imp->imp_inflight) == 0),
201                           &lwi);
202
203         if (rc)
204                 CDEBUG(D_HA, "%s: rc = %d waiting for callback (%d != 0)\n",
205                        obd2cli_tgt(imp->imp_obd), rc,
206                        atomic_read(&imp->imp_inflight));
207
208         obd_import_event(imp->imp_obd, imp, IMP_EVENT_INVALIDATE);
209 }
210
211 /* unset imp_invalid */
212 void ptlrpc_activate_import(struct obd_import *imp)
213 {
214         struct obd_device *obd = imp->imp_obd;
215
216         spin_lock(&imp->imp_lock);
217         imp->imp_invalid = 0;
218         spin_unlock(&imp->imp_lock);
219
220         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
221 }
222
223 void ptlrpc_fail_import(struct obd_import *imp, __u32 conn_cnt)
224 {
225         ENTRY;
226
227         LASSERT(!imp->imp_dlm_fake);
228
229         if (ptlrpc_set_import_discon(imp, conn_cnt)) {
230                 if (!imp->imp_replayable) {
231                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
232                                "auto-deactivating\n",
233                                obd2cli_tgt(imp->imp_obd),
234                                imp->imp_connection->c_remote_uuid.uuid,
235                                imp->imp_obd->obd_name);
236                         ptlrpc_deactivate_import(imp);
237                 }
238
239                 CDEBUG(D_HA, "%s: waking up pinger\n",
240                        obd2cli_tgt(imp->imp_obd));
241
242                 spin_lock(&imp->imp_lock);
243                 imp->imp_force_verify = 1;
244                 spin_unlock(&imp->imp_lock);
245
246                 ptlrpc_pinger_wake_up();
247         }
248         EXIT;
249 }
250
251 static int import_select_connection(struct obd_import *imp)
252 {
253         struct obd_import_conn *imp_conn = NULL, *conn;
254         struct obd_export *dlmexp;
255         ENTRY;
256
257         spin_lock(&imp->imp_lock);
258
259         if (list_empty(&imp->imp_conn_list)) {
260                 CERROR("%s: no connections available\n",
261                         imp->imp_obd->obd_name);
262                 spin_unlock(&imp->imp_lock);
263                 RETURN(-EINVAL);
264         }
265
266         list_for_each_entry(conn, &imp->imp_conn_list, oic_item) {
267                 CDEBUG(D_HA, "%s: connect to NID %s last attempt "LPU64"\n",
268                        imp->imp_obd->obd_name,
269                        libcfs_nid2str(conn->oic_conn->c_peer.nid),
270                        conn->oic_last_attempt);
271                 /* Throttle the reconnect rate to once per RECONNECT_INTERVAL */
272                 if (cfs_time_before_64(conn->oic_last_attempt + 
273                                        RECONNECT_INTERVAL * HZ,
274                                        cfs_time_current_64())) {
275                         /* If we have never tried this connection since the
276                            the last successful attempt, go with this one */
277                         if (cfs_time_before_64(conn->oic_last_attempt,
278                                                imp->imp_last_success_conn)) {
279                                 imp_conn = conn;
280                                 break;
281                         }
282
283                         /* Both of these connections have already been tried
284                            since the last successful connection; just choose the
285                            least recently used */
286                         if (!imp_conn)
287                                 imp_conn = conn;
288                         else if (cfs_time_before_64(conn->oic_last_attempt,
289                                                     imp_conn->oic_last_attempt))
290                                 imp_conn = conn;
291                 } else {
292                         /* Exceptionally unlikely case caused by the node
293                          * booting and attempting to mount lustre faster than
294                          * than RECONNECT_INTERVAL seconds. */
295                         if (unlikely(conn->oic_last_attempt == 0)) {
296                                 imp_conn = conn;
297                                 break;
298                         }
299                 }
300         }
301
302         /* if not found, simply choose the current one */
303         if (!imp_conn) {
304                 LASSERT(imp->imp_conn_current);
305                 imp_conn = imp->imp_conn_current;
306         }
307         LASSERT(imp_conn->oic_conn);
308
309         imp_conn->oic_last_attempt = cfs_time_current_64();
310
311         /* switch connection, don't mind if it's same as the current one */
312         if (imp->imp_connection)
313                 ptlrpc_put_connection(imp->imp_connection);
314         imp->imp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
315
316         dlmexp =  class_conn2export(&imp->imp_dlm_handle);
317         LASSERT(dlmexp != NULL);
318         if (dlmexp->exp_connection)
319                 ptlrpc_put_connection(dlmexp->exp_connection);
320         dlmexp->exp_connection = ptlrpc_connection_addref(imp_conn->oic_conn);
321         class_export_put(dlmexp);
322
323         if (imp->imp_conn_current != imp_conn) {
324                 if (imp->imp_conn_current)
325                         LCONSOLE_INFO("Changing connection for %s to %s/%s\n",
326                                       imp->imp_obd->obd_name,
327                                       imp_conn->oic_uuid.uuid,
328                                       libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
329                 imp->imp_conn_current = imp_conn;
330         }
331
332         CDEBUG(D_HA, "%s: import %p using connection %s/%s\n",
333                imp->imp_obd->obd_name, imp, imp_conn->oic_uuid.uuid,
334                libcfs_nid2str(imp_conn->oic_conn->c_peer.nid));
335
336         spin_unlock(&imp->imp_lock);
337
338         RETURN(0);
339 }
340
341 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
342 {
343         struct obd_device *obd = imp->imp_obd;
344         int initial_connect = 0;
345         int rc;
346         __u64 committed_before_reconnect = 0;
347         struct ptlrpc_request *request;
348         int size[] = { sizeof(struct ptlrpc_body),
349                        sizeof(imp->imp_obd->u.cli.cl_target_uuid),
350                        sizeof(obd->obd_uuid),
351                        sizeof(imp->imp_dlm_handle),
352                        sizeof(imp->imp_connect_data) };
353         char *tmp[] = { NULL,
354                         obd2cli_tgt(imp->imp_obd),
355                         obd->obd_uuid.uuid,
356                         (char *)&imp->imp_dlm_handle,
357                         (char *)&imp->imp_connect_data };
358         struct ptlrpc_connect_async_args *aa;
359
360         ENTRY;
361         spin_lock(&imp->imp_lock);
362         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
363                 spin_unlock(&imp->imp_lock);
364                 CERROR("can't connect to a closed import\n");
365                 RETURN(-EINVAL);
366         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
367                 spin_unlock(&imp->imp_lock);
368                 CERROR("already connected\n");
369                 RETURN(0);
370         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
371                 spin_unlock(&imp->imp_lock);
372                 CERROR("already connecting\n");
373                 RETURN(-EALREADY);
374         }
375
376         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
377
378         imp->imp_conn_cnt++;
379         imp->imp_resend_replay = 0;
380
381         if (!lustre_handle_is_used(&imp->imp_remote_handle))
382                 initial_connect = 1;
383         else
384                 committed_before_reconnect = imp->imp_peer_committed_transno;
385
386         spin_unlock(&imp->imp_lock);
387
388         if (new_uuid) {
389                 struct obd_uuid uuid;
390
391                 obd_str2uuid(&uuid, new_uuid);
392                 rc = import_set_conn_priority(imp, &uuid);
393                 if (rc)
394                         GOTO(out, rc);
395         }
396
397         rc = import_select_connection(imp);
398         if (rc)
399                 GOTO(out, rc);
400
401         /* last in connection list */
402         if (imp->imp_conn_current->oic_item.next == &imp->imp_conn_list) {
403                 if (imp->imp_initial_recov_bk && initial_connect) {
404                         CDEBUG(D_HA, "Last connection attempt (%d) for %s\n",
405                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
406                         /* Don't retry if connect fails */
407                         rc = 0;
408                         obd_set_info_async(obd->obd_self_export,
409                                            strlen(KEY_INIT_RECOV),
410                                            KEY_INIT_RECOV,
411                                            sizeof(rc), &rc, NULL);
412                 }
413                 if (imp->imp_recon_bk) {
414                         CDEBUG(D_HA, "Last reconnection attempt (%d) for %s\n",
415                                imp->imp_conn_cnt, obd2cli_tgt(imp->imp_obd));
416                         spin_lock(&imp->imp_lock);
417                         imp->imp_last_recon = 1;
418                         spin_unlock(&imp->imp_lock);
419                 }
420         }
421
422         /* Reset connect flags to the originally requested flags, in case
423          * the server is updated on-the-fly we will get the new features. */
424         imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
425         rc = obd_reconnect(imp->imp_obd->obd_self_export, obd,
426                            &obd->obd_uuid, &imp->imp_connect_data);
427         if (rc)
428                 GOTO(out, rc);
429
430         request = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, imp->imp_connect_op,
431                                   5, size, tmp);
432         if (!request)
433                 GOTO(out, rc = -ENOMEM);
434
435 #ifndef __KERNEL__
436         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
437 #endif
438         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_NEXT_VER);
439
440         request->rq_send_state = LUSTRE_IMP_CONNECTING;
441         /* Allow a slightly larger reply for future growth compatibility */
442         size[REPLY_REC_OFF] = sizeof(struct obd_connect_data) +
443                               16 * sizeof(__u64);
444         ptlrpc_req_set_repsize(request, 2, size);
445         request->rq_interpret_reply = ptlrpc_connect_interpret;
446
447         CLASSERT(sizeof (*aa) <= sizeof (request->rq_async_args));
448         aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
449         memset(aa, 0, sizeof *aa);
450
451         aa->pcaa_peer_committed = committed_before_reconnect;
452         aa->pcaa_initial_connect = initial_connect;
453
454         if (aa->pcaa_initial_connect) {
455                 spin_lock(&imp->imp_lock);
456                 imp->imp_replayable = 1;
457                 spin_unlock(&imp->imp_lock);
458                 /* On an initial connect, we don't know which one of a
459                    failover server pair is up.  Don't wait long. */
460 #ifdef CRAY_XT3
461                 request->rq_timeout = max((int)(obd_timeout / 2), 5);
462 #else
463                 request->rq_timeout = max((int)(obd_timeout / 20), 5);
464 #endif
465         }
466
467         DEBUG_REQ(D_RPCTRACE, request, "(re)connect request");
468         ptlrpcd_add_req(request);
469         rc = 0;
470 out:
471         if (rc != 0) {
472                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
473         }
474
475         RETURN(rc);
476 }
477 EXPORT_SYMBOL(ptlrpc_connect_import);
478
479 static void ptlrpc_maybe_ping_import_soon(struct obd_import *imp)
480 {
481 #ifdef __KERNEL__
482         struct obd_import_conn *imp_conn;
483 #endif
484         int wake_pinger = 0;
485
486         ENTRY;
487
488         spin_lock(&imp->imp_lock);
489         if (list_empty(&imp->imp_conn_list))
490                 GOTO(unlock, 0);
491
492 #ifdef __KERNEL__
493         imp_conn = list_entry(imp->imp_conn_list.prev,
494                               struct obd_import_conn,
495                               oic_item);
496
497         if (imp->imp_conn_current != imp_conn) {
498                 ptlrpc_ping_import_soon(imp);
499                 wake_pinger = 1;
500         }
501
502 #else
503         /* liblustre has no pinger thead, so we wakup pinger anyway */
504         wake_pinger = 1;
505 #endif 
506  unlock:
507         spin_unlock(&imp->imp_lock);
508
509         if (wake_pinger)
510                 ptlrpc_pinger_wake_up();
511
512         EXIT;
513 }
514
515 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
516                                     void * data, int rc)
517 {
518         struct ptlrpc_connect_async_args *aa = data;
519         struct obd_import *imp = request->rq_import;
520         struct client_obd *cli = &imp->imp_obd->u.cli;
521         struct lustre_handle old_hdl;
522         int msg_flags;
523         ENTRY;
524
525         spin_lock(&imp->imp_lock);
526         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
527                 spin_unlock(&imp->imp_lock);
528                 RETURN(0);
529         }
530         spin_unlock(&imp->imp_lock);
531
532         if (rc)
533                 GOTO(out, rc);
534
535         LASSERT(imp->imp_conn_current);
536
537         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
538
539         /* All imports are pingable */
540         spin_lock(&imp->imp_lock);
541         imp->imp_pingable = 1;
542
543         if (aa->pcaa_initial_connect) {
544                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
545                         imp->imp_replayable = 1;
546                         spin_unlock(&imp->imp_lock);
547                         CDEBUG(D_HA, "connected to replayable target: %s\n",
548                                obd2cli_tgt(imp->imp_obd));
549                 } else {
550                         imp->imp_replayable = 0;
551                         spin_unlock(&imp->imp_lock);
552                 }
553
554                 if (msg_flags & MSG_CONNECT_NEXT_VER) {
555                         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
556                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v2\n",
557                                obd2cli_tgt(imp->imp_obd));
558                 } else {
559                         CDEBUG(D_RPCTRACE, "connect to %s with lustre_msg_v1\n",
560                                obd2cli_tgt(imp->imp_obd));
561                 }
562
563                 imp->imp_remote_handle =
564                                 *lustre_msg_get_handle(request->rq_repmsg);
565
566                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
567                 GOTO(finish, rc = 0);
568         } else {
569                 spin_unlock(&imp->imp_lock);
570         }
571
572         /* Determine what recovery state to move the import to. */
573         if (MSG_CONNECT_RECONNECT & msg_flags) {
574                 memset(&old_hdl, 0, sizeof(old_hdl));
575                 if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
576                             sizeof (old_hdl))) {
577                         CERROR("%s@%s didn't like our handle "LPX64
578                                ", failed\n", obd2cli_tgt(imp->imp_obd),
579                                imp->imp_connection->c_remote_uuid.uuid,
580                                imp->imp_dlm_handle.cookie);
581                         GOTO(out, rc = -ENOTCONN);
582                 }
583
584                 if (memcmp(&imp->imp_remote_handle,
585                            lustre_msg_get_handle(request->rq_repmsg),
586                            sizeof(imp->imp_remote_handle))) {
587                         int level = D_ERROR;
588                         /* Old MGC can reconnect to a restarted MGS */
589                         if (strcmp(imp->imp_obd->obd_type->typ_name,
590                                    LUSTRE_MGC_NAME) == 0) {
591                                 level = D_CONFIG;
592                         }
593                         CDEBUG(level, 
594                                "%s@%s changed handle from "LPX64" to "LPX64
595                                "; copying, but this may foreshadow disaster\n",
596                                obd2cli_tgt(imp->imp_obd),
597                                imp->imp_connection->c_remote_uuid.uuid,
598                                imp->imp_remote_handle.cookie,
599                                lustre_msg_get_handle(request->rq_repmsg)->
600                                         cookie);
601                         imp->imp_remote_handle =
602                                      *lustre_msg_get_handle(request->rq_repmsg);
603                 } else {
604                         CDEBUG(D_HA, "reconnected to %s@%s after partition\n",
605                                obd2cli_tgt(imp->imp_obd),
606                                imp->imp_connection->c_remote_uuid.uuid);
607                 }
608
609                 if (imp->imp_invalid) {
610                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
611                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
612                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
613                                imp->imp_obd->obd_name,
614                                obd2cli_tgt(imp->imp_obd));
615
616                         spin_lock(&imp->imp_lock);
617                         imp->imp_resend_replay = 1;
618                         spin_unlock(&imp->imp_lock);
619
620                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
621                 } else {
622                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
623                 }
624         } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
625                 LASSERT(imp->imp_replayable);
626                 imp->imp_remote_handle =
627                                 *lustre_msg_get_handle(request->rq_repmsg);
628                 imp->imp_last_replay_transno = 0;
629                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
630         } else {
631                 imp->imp_remote_handle =
632                                 *lustre_msg_get_handle(request->rq_repmsg);
633                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
634         }
635
636         /* Sanity checks for a reconnected import. */
637         if (!(imp->imp_replayable) != !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
638                 CERROR("imp_replayable flag does not match server "
639                        "after reconnect. We should LBUG right here.\n");
640         }
641
642         if (lustre_msg_get_last_committed(request->rq_repmsg) <
643             aa->pcaa_peer_committed) {
644                 CERROR("%s went back in time (transno "LPD64
645                        " was previously committed, server now claims "LPD64
646                        ")!  See https://bugzilla.clusterfs.com/"
647                        "long_list.cgi?buglist=9646\n",
648                        obd2cli_tgt(imp->imp_obd), aa->pcaa_peer_committed,
649                        lustre_msg_get_last_committed(request->rq_repmsg));
650         }
651
652 finish:
653         rc = ptlrpc_import_recovery_state_machine(imp);
654         if (rc != 0) {
655                 if (rc == -ENOTCONN) {
656                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
657                                "invalidating and reconnecting\n",
658                                obd2cli_tgt(imp->imp_obd),
659                                imp->imp_connection->c_remote_uuid.uuid);
660                         ptlrpc_connect_import(imp, NULL);
661                         RETURN(0);
662                 }
663         } else {
664                 struct obd_connect_data *ocd;
665                 struct obd_export *exp;
666
667                 ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
668                                          lustre_swab_connect);
669
670                 spin_lock(&imp->imp_lock);
671                 list_del(&imp->imp_conn_current->oic_item);
672                 list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
673                 imp->imp_last_success_conn =
674                         imp->imp_conn_current->oic_last_attempt;
675
676                 if (ocd == NULL) {
677                         spin_unlock(&imp->imp_lock);
678                         CERROR("Wrong connect data from server\n");
679                         rc = -EPROTO;
680                         GOTO(out, rc);
681                 }
682
683                 imp->imp_connect_data = *ocd;
684
685                 exp = class_conn2export(&imp->imp_dlm_handle);
686                 spin_unlock(&imp->imp_lock);
687
688                 /* check that server granted subset of flags we asked for. */
689                 LASSERTF((ocd->ocd_connect_flags &
690                           imp->imp_connect_flags_orig) ==
691                          ocd->ocd_connect_flags, LPX64" != "LPX64,
692                          imp->imp_connect_flags_orig, ocd->ocd_connect_flags);
693
694                 if (!exp) {
695                         /* This could happen if export is cleaned during the 
696                            connect attempt */
697                         CERROR("Missing export for %s\n", 
698                                imp->imp_obd->obd_name);
699                         GOTO(out, rc = -ENODEV);
700                 }
701                 exp->exp_connect_flags = ocd->ocd_connect_flags;
702                 class_export_put(exp);
703
704                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
705
706                 if (!ocd->ocd_ibits_known &&
707                     ocd->ocd_connect_flags & OBD_CONNECT_IBITS)
708                         CERROR("Inodebits aware server returned zero compatible"
709                                " bits?\n");
710
711                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
712                     (ocd->ocd_version > LUSTRE_VERSION_CODE +
713                                         LUSTRE_VERSION_OFFSET_WARN ||
714                      ocd->ocd_version < LUSTRE_VERSION_CODE -
715                                         LUSTRE_VERSION_OFFSET_WARN)) {
716                         /* Sigh, some compilers do not like #ifdef in the middle
717                            of macro arguments */
718 #ifdef __KERNEL__
719                         const char *older =
720                                 "older.  Consider upgrading this client";
721 #else
722                         const char *older =
723                                 "older.  Consider recompiling this application";
724 #endif
725                         const char *newer = "newer than client version";
726
727                         LCONSOLE_WARN("Server %s version (%d.%d.%d.%d) "
728                                       "is much %s (%s)\n",
729                                       obd2cli_tgt(imp->imp_obd),
730                                       OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
731                                       OBD_OCD_VERSION_MINOR(ocd->ocd_version),
732                                       OBD_OCD_VERSION_PATCH(ocd->ocd_version),
733                                       OBD_OCD_VERSION_FIX(ocd->ocd_version),
734                                       ocd->ocd_version > LUSTRE_VERSION_CODE ?
735                                       newer : older, LUSTRE_VERSION_STRING);
736                 }
737
738                 if (ocd->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
739                         cli->cl_max_pages_per_rpc = 
740                                 ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
741                 }
742
743                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
744                         (cli->cl_max_pages_per_rpc > 0));
745         }
746
747  out:
748         if (rc != 0) {
749                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
750                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov)
751                         ptlrpc_deactivate_import(imp);
752
753                 if (imp->imp_recon_bk && imp->imp_last_recon) {
754                         /* Give up trying to reconnect */
755                         imp->imp_obd->obd_no_recov = 1;
756                         ptlrpc_deactivate_import(imp);
757                 }
758
759                 if (rc == -EPROTO) {
760                         struct obd_connect_data *ocd;
761                         ocd = lustre_swab_repbuf(request, REPLY_REC_OFF,
762                                                  sizeof *ocd,
763                                                  lustre_swab_connect);
764                         if (ocd &&
765                             (ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
766                             (ocd->ocd_version != LUSTRE_VERSION_CODE)) {
767                            /* Actually servers are only supposed to refuse
768                               connection from liblustre clients, so we should
769                               never see this from VFS context */
770                                 LCONSOLE_ERROR_MSG(0x16a, "Server %s version "
771                                         "(%d.%d.%d.%d)"
772                                         " refused connection from this client "
773                                         "with an incompatible version (%s).  "
774                                         "Client must be recompiled\n",
775                                         obd2cli_tgt(imp->imp_obd),
776                                         OBD_OCD_VERSION_MAJOR(ocd->ocd_version),
777                                         OBD_OCD_VERSION_MINOR(ocd->ocd_version),
778                                         OBD_OCD_VERSION_PATCH(ocd->ocd_version),
779                                         OBD_OCD_VERSION_FIX(ocd->ocd_version),
780                                         LUSTRE_VERSION_STRING);
781                                 ptlrpc_deactivate_import(imp);
782                                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CLOSED);
783                         }
784                         RETURN(-EPROTO);
785                 }
786
787                 ptlrpc_maybe_ping_import_soon(imp);
788
789                 CDEBUG(D_HA, "recovery of %s on %s failed (%d)\n",
790                        obd2cli_tgt(imp->imp_obd),
791                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
792         }
793         
794         spin_lock(&imp->imp_lock);
795         imp->imp_last_recon = 0;
796         spin_unlock(&imp->imp_lock);
797
798         cfs_waitq_signal(&imp->imp_recovery_waitq);
799         RETURN(rc);
800 }
801
802 static int completed_replay_interpret(struct ptlrpc_request *req,
803                                     void * data, int rc)
804 {
805         ENTRY;
806         atomic_dec(&req->rq_import->imp_replay_inflight);
807         if (req->rq_status == 0) {
808                 ptlrpc_import_recovery_state_machine(req->rq_import);
809         } else {
810                 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
811                        "reconnecting\n",
812                        req->rq_import->imp_obd->obd_name, req->rq_status);
813                 ptlrpc_connect_import(req->rq_import, NULL);
814         }
815
816         RETURN(0);
817 }
818
819 static int signal_completed_replay(struct obd_import *imp)
820 {
821         struct ptlrpc_request *req;
822         ENTRY;
823
824         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
825         atomic_inc(&imp->imp_replay_inflight);
826
827         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
828         if (!req) {
829                 atomic_dec(&imp->imp_replay_inflight);
830                 RETURN(-ENOMEM);
831         }
832
833         ptlrpc_req_set_repsize(req, 1, NULL);
834         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
835         lustre_msg_add_flags(req->rq_reqmsg, MSG_LAST_REPLAY);
836         req->rq_timeout *= 3;
837         req->rq_interpret_reply = completed_replay_interpret;
838
839         ptlrpcd_add_req(req);
840         RETURN(0);
841 }
842
843 #ifdef __KERNEL__
844 static int ptlrpc_invalidate_import_thread(void *data)
845 {
846         struct obd_import *imp = data;
847
848         ENTRY;
849
850         ptlrpc_daemonize("ll_imp_inval");
851         
852         CDEBUG(D_HA, "thread invalidate import %s to %s@%s\n",
853                imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd),
854                imp->imp_connection->c_remote_uuid.uuid);
855
856         ptlrpc_invalidate_import(imp);
857
858         if (obd_dump_on_eviction) {
859                 CERROR("dump the log upon eviction\n");
860                 libcfs_debug_dumplog();
861         }
862
863         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
864         ptlrpc_import_recovery_state_machine(imp);
865
866         RETURN(0);
867 }
868 #endif
869
870 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
871 {
872         int rc = 0;
873         int inflight;
874         char *target_start;
875         int target_len;
876
877         ENTRY;
878         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
879                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
880                           &target_start, &target_len);
881                 /* Don't care about MGC eviction */
882                 if (strcmp(imp->imp_obd->obd_type->typ_name,
883                            LUSTRE_MGC_NAME) != 0) {
884                         LCONSOLE_ERROR_MSG(0x167, "This client was evicted by "
885                                            "%.*s; in progress operations using "
886                                            "this service will fail.\n",
887                                            target_len, target_start);
888                 }
889                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
890                        obd2cli_tgt(imp->imp_obd),
891                        imp->imp_connection->c_remote_uuid.uuid);
892
893 #ifdef __KERNEL__
894                 rc = cfs_kernel_thread(ptlrpc_invalidate_import_thread, imp,
895                                    CLONE_VM | CLONE_FILES);
896                 if (rc < 0)
897                         CERROR("error starting invalidate thread: %d\n", rc);
898                 else
899                         rc = 0;
900                 RETURN(rc);
901 #else
902                 ptlrpc_invalidate_import(imp);
903
904                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
905 #endif
906         }
907
908         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
909                 CDEBUG(D_HA, "replay requested by %s\n",
910                        obd2cli_tgt(imp->imp_obd));
911                 rc = ptlrpc_replay_next(imp, &inflight);
912                 if (inflight == 0 &&
913                     atomic_read(&imp->imp_replay_inflight) == 0) {
914                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
915                         rc = ldlm_replay_locks(imp);
916                         if (rc)
917                                 GOTO(out, rc);
918                 }
919                 rc = 0;
920         }
921
922         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
923                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
924                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
925                         rc = signal_completed_replay(imp);
926                         if (rc)
927                                 GOTO(out, rc);
928                 }
929
930         }
931
932         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
933                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
934                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
935                 }
936         }
937
938         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
939                 CDEBUG(D_HA, "reconnected to %s@%s\n",
940                        obd2cli_tgt(imp->imp_obd),
941                        imp->imp_connection->c_remote_uuid.uuid);
942
943                 rc = ptlrpc_resend(imp);
944                 if (rc)
945                         GOTO(out, rc);
946                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
947                 ptlrpc_activate_import(imp);
948
949                 deuuidify(obd2cli_tgt(imp->imp_obd), NULL,
950                           &target_start, &target_len);
951                 LCONSOLE_INFO("%s: Connection restored to service %.*s "
952                               "using nid %s.\n", imp->imp_obd->obd_name,
953                               target_len, target_start,
954                               libcfs_nid2str(imp->imp_connection->c_peer.nid));
955         }
956
957         if (imp->imp_state == LUSTRE_IMP_FULL) {
958                 cfs_waitq_signal(&imp->imp_recovery_waitq);
959                 ptlrpc_wake_delayed(imp);
960         }
961
962  out:
963         RETURN(rc);
964 }
965
/*
 * Timeout callback handed to LWI_TIMEOUT_INTR() by
 * ptlrpc_disconnect_import().  Ignores its argument and always returns 0.
 *
 * NOTE(review): presumably a 0 return tells l_wait_event() to resume
 * waiting after the timeout fires -- confirm against its definition.
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
970
971 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
972 {
973         struct ptlrpc_request *req;
974         int rq_opc, rc = 0;
975         ENTRY;
976
977         switch (imp->imp_connect_op) {
978         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
979         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
980         case MGS_CONNECT: rq_opc = MGS_DISCONNECT; break;
981         default:
982                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
983                        obd2cli_tgt(imp->imp_obd), imp->imp_connect_op);
984                 RETURN(-EINVAL);
985         }
986
987         if (ptlrpc_import_in_recovery(imp)) {
988                 struct l_wait_info lwi;
989                 cfs_duration_t timeout = cfs_time_seconds(obd_timeout);
990
991                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout), 
992                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
993                 rc = l_wait_event(imp->imp_recovery_waitq,
994                                   !ptlrpc_import_in_recovery(imp), &lwi);
995
996         }
997
998         spin_lock(&imp->imp_lock);
999         if (imp->imp_state != LUSTRE_IMP_FULL)
1000                 GOTO(out, 0);
1001
1002         spin_unlock(&imp->imp_lock);
1003
1004         req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, rq_opc, 1, NULL, NULL);
1005         if (req) {
1006                 /* We are disconnecting, do not retry a failed DISCONNECT rpc if
1007                  * it fails.  We can get through the above with a down server
1008                  * if the client doesn't know the server is gone yet. */
1009                 req->rq_no_resend = 1;
1010 #ifdef CRAY_XT3
1011                 req->rq_timeout = obd_timeout / 3;
1012 #else
1013                 req->rq_timeout = 5;
1014 #endif
1015                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
1016                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
1017                 ptlrpc_req_set_repsize(req, 1, NULL);
1018                 rc = ptlrpc_queue_wait(req);
1019                 ptlrpc_req_finished(req);
1020         }
1021
1022         spin_lock(&imp->imp_lock);
1023 out:
1024         if (noclose) 
1025                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
1026         else
1027                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
1028         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
1029         spin_unlock(&imp->imp_lock);
1030
1031         RETURN(rc);
1032 }
1033
1034 /* Sets maximal number of RPCs possible originating from other side of this
1035    import (server) to us and number of async RPC replies that we are not waiting
1036    for arriving */
1037 void ptlrpc_import_setasync(struct obd_import *imp, int count)
1038 {
1039         LNetSetAsync(imp->imp_connection->c_peer, count);
1040 }
1041