Whamcloud - gitweb
b=2306
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifdef __KERNEL__
25 # include <linux/config.h>
26 # include <linux/module.h>
27 # include <linux/kmod.h>
28 #else
29 # include <liblustre.h>
30 #endif
31
32 #include <linux/obd_support.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_net.h>
35 #include <linux/lustre_import.h>
36 #include <linux/lustre_export.h>
37 #include <linux/obd.h>
38 #include <linux/obd_class.h>
39
40 #include "ptlrpc_internal.h"
41
42 struct ptlrpc_connect_async_args {
43          __u64 pcaa_peer_committed;
44         int pcaa_initial_connect;
45         int pcaa_was_invalid;
46 };
47
/* Change the import state without touching imp_lock; the callers in this
 * file invoke it with imp_lock already held.  A CLOSED import should remain
 * so, hence the change is skipped when the import is CLOSED.
 *
 * Both arguments are parenthesized so that expressions (e.g. &obj) can be
 * passed safely.  They are still evaluated more than once, so avoid
 * arguments with side effects. */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), (imp)->imp_target_uuid.uuid,                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while(0)
59
/* Locked variant of IMPORT_SET_STATE_NOLOCK: takes imp_lock (saving and
 * restoring the interrupt flags) around the state change.  The import
 * argument is parenthesized so that expressions such as &obj expand
 * correctly. */
#define IMPORT_SET_STATE(imp, state)                      \
do {                                                      \
        unsigned long flags;                              \
                                                          \
        spin_lock_irqsave(&(imp)->imp_lock, flags);       \
        IMPORT_SET_STATE_NOLOCK(imp, state);              \
        spin_unlock_irqrestore(&(imp)->imp_lock, flags);  \
} while(0)
68
69
/* Forward declarations: both functions are referenced before their
 * definitions further down in this file. */
static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                                    void *data, int rc);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
73
74 /* Only this function is allowed to change the import state when it is
75  * CLOSED. I would rather refcount the import and free it after
76  * disconnection like we do with exports. To do that, the client_obd
77  * will need to save the peer info somewhere other than in the import,
78  * though. */
79 int ptlrpc_init_import(struct obd_import *imp)
80 {
81         unsigned long flags;
82         
83         spin_lock_irqsave(&imp->imp_lock, flags);
84
85         imp->imp_generation++;
86         imp->imp_state =  LUSTRE_IMP_NEW;
87
88         spin_unlock_irqrestore(&imp->imp_lock, flags);
89
90         return 0;
91 }
92
93 /* Returns true if import was FULL, false if import was already not
94  * connected.
95  */
96 int ptlrpc_set_import_discon(struct obd_import *imp)
97 {
98         unsigned long flags;
99         int rc = 0;
100         
101         spin_lock_irqsave(&imp->imp_lock, flags);
102
103         if (imp->imp_state == LUSTRE_IMP_FULL) {
104                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
105                 spin_unlock_irqrestore(&imp->imp_lock, flags); 
106                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_DISCON);
107                 rc = 1;
108         } else {
109                 spin_unlock_irqrestore(&imp->imp_lock, flags);
110                 CDEBUG(D_HA, "%p %s: import already not connected: %s\n",
111                        imp,imp->imp_client->cli_name, 
112                        ptlrpc_import_state_name(imp->imp_state));
113         }
114
115         return rc;
116 }
117
118 void ptlrpc_invalidate_import(struct obd_import *imp)
119 {
120         struct obd_device *obd = imp->imp_obd;
121         unsigned long flags;
122         ENTRY;
123
124         spin_lock_irqsave(&imp->imp_lock, flags);
125         /* This is a bit of a hack, but invalidating replayable
126          * imports makes a temporary reconnect failure into a much more
127          * ugly -- and hard to remedy -- situation. */
128         if (!imp->imp_replayable) {
129                 CDEBUG(D_HA, "setting import %s INVALID\n",
130                        imp->imp_target_uuid.uuid);
131                 imp->imp_invalid = 1;
132         }
133         imp->imp_generation++;
134         spin_unlock_irqrestore(&imp->imp_lock, flags);
135
136         ptlrpc_abort_inflight(imp);
137         obd_import_event(obd, imp, IMP_EVENT_INVALIDATE);
138 }
139
140 void ptlrpc_validate_import(struct obd_import *imp)
141 {
142         struct obd_device *obd = imp->imp_obd;
143         unsigned long flags;
144
145         spin_lock_irqsave(&imp->imp_lock, flags);
146         imp->imp_invalid = 0;
147         spin_unlock_irqrestore(&imp->imp_lock, flags);
148
149         obd_import_event(obd, imp, IMP_EVENT_ACTIVE);
150 }
151
152 void ptlrpc_fail_import(struct obd_import *imp, int generation)
153 {
154         ENTRY;
155
156         LASSERT (!imp->imp_dlm_fake);
157
158         if (ptlrpc_set_import_discon(imp)) {
159                 unsigned long flags;
160
161                 if (!imp->imp_replayable) {
162                         CDEBUG(D_HA, "import %s@%s for %s not replayable, "
163                                "auto-deactivating\n",
164                                imp->imp_target_uuid.uuid,
165                                imp->imp_connection->c_remote_uuid.uuid,
166                                imp->imp_obd->obd_name);
167                         ptlrpc_invalidate_import(imp);
168                 }
169                 
170                 CDEBUG(D_HA, "%s: waking up pinger\n", 
171                        imp->imp_target_uuid.uuid);
172                 
173                 spin_lock_irqsave(&imp->imp_lock, flags);
174                 imp->imp_force_verify = 1;
175                 spin_unlock_irqrestore(&imp->imp_lock, flags);
176                 
177                 ptlrpc_pinger_wake_up();
178                 
179         }
180         EXIT;
181 }
182
183 int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
184 {
185         struct obd_device *obd = imp->imp_obd;
186         int initial_connect = 0;
187         int rc;
188         __u64 committed_before_reconnect = 0;
189         int was_invalid = 0;
190         struct ptlrpc_request *request;
191         int size[] = {sizeof(imp->imp_target_uuid),
192                                  sizeof(obd->obd_uuid),
193                                  sizeof(imp->imp_dlm_handle)};
194         char *tmp[] = {imp->imp_target_uuid.uuid,
195                        obd->obd_uuid.uuid,
196                        (char *)&imp->imp_dlm_handle};
197         struct ptlrpc_connect_async_args *aa;
198         unsigned long flags;
199
200         spin_lock_irqsave(&imp->imp_lock, flags);
201         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
202                 spin_unlock_irqrestore(&imp->imp_lock, flags);
203                 CERROR("can't connect to a closed import\n");
204                 RETURN(-EINVAL);
205         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
206                 spin_unlock_irqrestore(&imp->imp_lock, flags);
207                 CERROR("already connected\n");
208                 RETURN(0);
209         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
210                 spin_unlock_irqrestore(&imp->imp_lock, flags);
211                 CERROR("already connecting\n");
212                 RETURN(-EALREADY);
213         }
214
215         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
216
217         imp->imp_conn_cnt++; 
218         imp->imp_last_replay_transno = 0;
219
220         if (imp->imp_remote_handle.cookie == 0) {
221                 initial_connect = 1;
222         } else {
223                 committed_before_reconnect = imp->imp_peer_committed_transno;;
224
225         }
226
227         if (imp->imp_invalid) {
228                 imp->imp_invalid = 0;
229                 was_invalid = 1;
230         }
231
232         spin_unlock_irqrestore(&imp->imp_lock, flags);
233
234         if (new_uuid) {
235                 struct ptlrpc_connection *conn;
236                 struct obd_uuid uuid;
237                 struct obd_export *dlmexp;
238
239                 obd_str2uuid(&uuid, new_uuid);
240
241                 conn = ptlrpc_uuid_to_connection(&uuid);
242                 if (!conn)
243                         GOTO(out, rc = -ENOENT);
244
245                 CDEBUG(D_HA, "switching import %s/%s from %s to %s\n",
246                        imp->imp_target_uuid.uuid, imp->imp_obd->obd_name,
247                        imp->imp_connection->c_remote_uuid.uuid,
248                        conn->c_remote_uuid.uuid);
249
250                 /* Switch the import's connection and the DLM export's
251                  * connection (which are almost certainly the same, but we
252                  * keep distinct refs just to make things clearer. I think. */
253                 if (imp->imp_connection)
254                         ptlrpc_put_connection(imp->imp_connection);
255                 /* We hand off the ref from ptlrpc_get_connection. */
256                 imp->imp_connection = conn;
257
258                 dlmexp = class_conn2export(&imp->imp_dlm_handle);
259                 
260                 LASSERT(dlmexp != NULL);
261
262                 if (dlmexp->exp_connection)
263                         ptlrpc_put_connection(dlmexp->exp_connection);
264                 dlmexp->exp_connection = ptlrpc_connection_addref(conn);
265                 class_export_put(dlmexp);
266
267         }
268
269         request = ptlrpc_prep_req(imp, imp->imp_connect_op, 3, size, tmp);
270         if (!request)
271                 GOTO(out, rc = -ENOMEM);
272
273 #ifndef __KERNEL__
274         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
275 #endif
276
277         request->rq_send_state = LUSTRE_IMP_CONNECTING;
278         request->rq_replen = lustre_msg_size(0, NULL);
279         request->rq_interpret_reply = ptlrpc_connect_interpret;
280
281         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
282         aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
283         memset(aa, 0, sizeof *aa);
284
285         aa->pcaa_peer_committed = committed_before_reconnect;
286         aa->pcaa_initial_connect = initial_connect;
287         aa->pcaa_was_invalid = was_invalid;
288
289         if (aa->pcaa_initial_connect)
290                 imp->imp_replayable = 1;
291
292         ptlrpcd_add_req(request);
293         rc = 0;
294 out:
295         if (rc != 0) {
296                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
297         }
298
299         RETURN(rc);
300 }
301
302 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
303                                     void * data, int rc)
304 {
305         struct ptlrpc_connect_async_args *aa = data;
306         struct obd_import *imp = request->rq_import;
307         struct lustre_handle old_hdl;
308         unsigned long flags;
309         int msg_flags;
310         ENTRY;
311         
312         spin_lock_irqsave(&imp->imp_lock, flags);
313         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
314                 spin_unlock_irqrestore(&imp->imp_lock, flags);
315                 RETURN(0);
316         }
317         spin_unlock_irqrestore(&imp->imp_lock, flags);
318
319         if (rc)
320                 GOTO(out, rc);
321
322         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
323
324         if (aa->pcaa_initial_connect) {
325                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
326                         CDEBUG(D_HA, "connected to replayable target: %s\n",
327                                imp->imp_target_uuid.uuid);
328                         imp->imp_pingable = imp->imp_replayable = 1;
329                 } else {
330                         imp->imp_replayable = 0;
331                 }
332                 imp->imp_remote_handle = request->rq_repmsg->handle;
333                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
334                 GOTO(finish, rc = 0);
335         }
336
337         /* Determine what recovery state to move the import to. */
338         if (MSG_CONNECT_RECONNECT & msg_flags) {
339                 memset(&old_hdl, 0, sizeof(old_hdl));
340                 if (!memcmp(&old_hdl, &request->rq_repmsg->handle,
341                             sizeof (old_hdl))) {
342                         CERROR("%s@%s didn't like our handle "LPX64
343                                ", failed\n", imp->imp_target_uuid.uuid,
344                                imp->imp_connection->c_remote_uuid.uuid,
345                                imp->imp_dlm_handle.cookie);
346                         GOTO(out, rc = -ENOTCONN);
347                 }
348
349                 if (memcmp(&imp->imp_remote_handle, &request->rq_repmsg->handle,
350                            sizeof(imp->imp_remote_handle))) {
351                         CERROR("%s@%s changed handle from "LPX64" to "LPX64
352                                "; copying, but this may foreshadow disaster\n",
353                                imp->imp_target_uuid.uuid,
354                                imp->imp_connection->c_remote_uuid.uuid,
355                                imp->imp_remote_handle.cookie,
356                                request->rq_repmsg->handle.cookie);
357                         imp->imp_remote_handle = request->rq_repmsg->handle;
358                 } else {
359                         CERROR("reconnected to %s@%s after partition\n",
360                                imp->imp_target_uuid.uuid, 
361                                imp->imp_connection->c_remote_uuid.uuid);
362                 }
363                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
364         } 
365         else if (MSG_CONNECT_RECOVERING & msg_flags) {
366                 LASSERT(imp->imp_replayable);
367                 imp->imp_state = LUSTRE_IMP_RECOVER;
368                 imp->imp_remote_handle = request->rq_repmsg->handle;
369                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
370         } 
371         else {
372                 imp->imp_remote_handle = request->rq_repmsg->handle;
373                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
374         }
375         
376         /* Sanity checks for a reconnected import. */
377         if (!(imp->imp_replayable) != 
378              !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
379                 CERROR("imp_replayable flag does not match server "
380                        "after reconnect. We should LBUG right here.\n");
381         }
382
383         if (request->rq_repmsg->last_committed < aa->pcaa_peer_committed) {
384                 CERROR("%s went back in time (transno "LPD64
385                        " was previously committed, server now claims "LPD64
386                        ")! is shared storage not coherent?\n",
387                        imp->imp_target_uuid.uuid,
388                        aa->pcaa_peer_committed,
389                        request->rq_repmsg->last_committed);
390         }
391
392 finish:
393         rc = ptlrpc_import_recovery_state_machine(imp);
394         if (rc != 0) {
395                 if (aa->pcaa_was_invalid)
396                         ptlrpc_invalidate_import(imp);
397
398                 if (rc == -ENOTCONN) {
399                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
400                                "invalidating and reconnecting\n",
401                                imp->imp_target_uuid.uuid,
402                                imp->imp_connection->c_remote_uuid.uuid);
403                         ptlrpc_connect_import(imp, NULL);
404                         RETURN(0);
405                 } 
406         }
407  out:
408         if (rc != 0) {
409                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
410                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov) {
411                         ptlrpc_invalidate_import(imp);
412                 }
413                 CDEBUG(D_ERROR, "recovery of %s on %s failed (%d)\n",
414                        imp->imp_target_uuid.uuid,
415                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
416         }
417
418         wake_up(&imp->imp_recovery_waitq);
419         RETURN(rc);
420 }
421
422 static int completed_replay_interpret(struct ptlrpc_request *req,
423                                     void * data, int rc)
424 {
425         atomic_dec(&req->rq_import->imp_replay_inflight);
426         ptlrpc_import_recovery_state_machine(req->rq_import);
427         RETURN(0);
428 }
429
430 static int signal_completed_replay(struct obd_import *imp)
431  {
432         struct ptlrpc_request *req;
433         ENTRY;
434
435         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
436         atomic_inc(&imp->imp_replay_inflight);
437
438         req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL, NULL);
439         if (!req)
440                 RETURN(-ENOMEM);
441
442         req->rq_replen = lustre_msg_size(0, NULL);
443         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
444         req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
445         req->rq_timeout *= 3; 
446         req->rq_interpret_reply = completed_replay_interpret;
447
448         ptlrpcd_add_req(req);
449         RETURN(0);
450 }
451
452
453 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
454 {
455         int rc = 0;
456         int inflight;
457
458         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
459                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
460                        imp->imp_target_uuid.uuid,
461                        imp->imp_connection->c_remote_uuid.uuid);
462                 ptlrpc_invalidate_import(imp);
463                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
464         } 
465         
466         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
467                 CDEBUG(D_HA, "replay requested by %s\n",
468                        imp->imp_target_uuid.uuid);
469                 rc = ptlrpc_replay_next(imp, &inflight);
470                 if (inflight == 0 && 
471                     atomic_read(&imp->imp_replay_inflight) == 0) {
472                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
473                         rc = ldlm_replay_locks(imp);
474                         if (rc)
475                                 GOTO(out, rc);
476                 }
477                 rc = 0;
478         }
479
480         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
481                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
482                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
483                         rc = signal_completed_replay(imp);
484                         if (rc)
485                                 GOTO(out, rc);
486                 }
487
488         }
489
490         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
491                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
492                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
493                 }
494         }
495
496         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
497                 CDEBUG(D_HA, "reconnected to %s@%s\n",
498                        imp->imp_target_uuid.uuid,
499                        imp->imp_connection->c_remote_uuid.uuid);
500
501                 rc = ptlrpc_resend(imp);
502                 if (rc)
503                         GOTO(out, rc);
504                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
505                 ptlrpc_validate_import(imp);
506         } 
507
508         if (imp->imp_state == LUSTRE_IMP_FULL) {
509                 wake_up(&imp->imp_recovery_waitq);
510                 ptlrpc_wake_delayed(imp);
511         }
512
513  out:
514         RETURN(rc);
515 }
516
/* Timeout callback passed to LWI_TIMEOUT_INTR in
 * ptlrpc_disconnect_import(); ignores its argument and always
 * returns 0. */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
521
522 int ptlrpc_disconnect_import(struct obd_import *imp)
523 {
524         struct ptlrpc_request *request;
525         int rq_opc;
526         int rc = 0;
527         unsigned long flags;
528         ENTRY;
529
530         switch (imp->imp_connect_op) {
531         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
532         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
533         case MGMT_CONNECT:rq_opc = MGMT_DISCONNECT;break;
534         default:
535                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
536                        imp->imp_target_uuid.uuid, imp->imp_connect_op);
537                 RETURN(-EINVAL);
538         }
539
540
541         if (ptlrpc_import_in_recovery(imp)) {
542                 struct l_wait_info lwi;
543                 lwi = LWI_TIMEOUT_INTR(MAX(obd_timeout * HZ, 1), back_to_sleep, 
544                                        NULL, NULL);
545                 rc = l_wait_event(imp->imp_recovery_waitq, 
546                                   !ptlrpc_import_in_recovery(imp), &lwi);
547
548         }
549
550         spin_lock_irqsave(&imp->imp_lock, flags);
551         if (imp->imp_state != LUSTRE_IMP_FULL) {
552                 GOTO(out, 0);
553         }
554         spin_unlock_irqrestore(&imp->imp_lock, flags);
555
556         request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
557         if (request) {
558                 /* For non-replayable connections, don't attempt
559                    reconnect if this fails */
560                 if (!imp->imp_replayable) {
561                         request->rq_no_resend = 1;
562                         IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
563                         request->rq_send_state =  LUSTRE_IMP_CONNECTING;
564                 }
565                 request->rq_replen = lustre_msg_size(0, NULL);
566                 rc = ptlrpc_queue_wait(request);
567                 ptlrpc_req_finished(request);
568         }
569
570         spin_lock_irqsave(&imp->imp_lock, flags);
571 out:
572         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
573         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
574         spin_unlock_irqrestore(&imp->imp_lock, flags);
575
576         RETURN(rc);
577 }
578