Whamcloud - gitweb
Land b_smallfix onto HEAD (20040223_1817)
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifdef __KERNEL__
25 # include <linux/config.h>
26 # include <linux/module.h>
27 # include <linux/kmod.h>
28 #else
29 # include <liblustre.h>
30 #endif
31
32 #include <linux/obd_support.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_net.h>
35 #include <linux/lustre_import.h>
36 #include <linux/lustre_export.h>
37 #include <linux/obd.h>
38 #include <linux/obd_class.h>
39
40 #include "ptlrpc_internal.h"
41
42 struct ptlrpc_connect_async_args {
43          __u64 pcaa_peer_committed;
44         int pcaa_initial_connect;
45         int pcaa_was_invalid;
46 };
47
/*
 * Move @imp to @state and log the transition at D_HA level.
 *
 * A CLOSED import should remain so: when the import is already
 * LUSTRE_IMP_CLOSED the macro neither logs nor assigns.
 *
 * No locking is done here; IMPORT_SET_STATE is the variant that takes
 * imp->imp_lock around this macro.
 *
 * Both arguments are parenthesized so that expressions such as "&import"
 * or "cond ? a : b" expand correctly.  They are still evaluated more than
 * once, so they must not have side effects.
 */
#define IMPORT_SET_STATE_NOLOCK(imp, state)                                    \
do {                                                                           \
        if ((imp)->imp_state != LUSTRE_IMP_CLOSED) {                           \
               CDEBUG(D_HA, "%p %s: changing import state from %s to %s\n",    \
                      (imp), (imp)->imp_target_uuid.uuid,                      \
                      ptlrpc_import_state_name((imp)->imp_state),              \
                      ptlrpc_import_state_name(state));                        \
               (imp)->imp_state = (state);                                     \
        }                                                                      \
} while (0)
59
/*
 * Locked variant of IMPORT_SET_STATE_NOLOCK: takes imp->imp_lock with
 * interrupts saved/disabled, performs the state change, then releases the
 * lock.
 *
 * The saved-flags local has a macro-private name so that it cannot shadow
 * (and silently capture) a caller variable called "flags" that appears in
 * one of the arguments.  @imp is parenthesized so any pointer expression
 * may be passed.
 */
#define IMPORT_SET_STATE(imp, state)                                    \
do {                                                                    \
        unsigned long set_state_irqflags_;                              \
                                                                        \
        spin_lock_irqsave(&(imp)->imp_lock, set_state_irqflags_);       \
        IMPORT_SET_STATE_NOLOCK(imp, state);                            \
        spin_unlock_irqrestore(&(imp)->imp_lock, set_state_irqflags_);  \
} while (0)
68
69
70 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
71                                     void * data, int rc);
72 int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
73
74 /* Only this function is allowed to change the import state when it is
75  * CLOSED. I would rather refcount the import and free it after
76  * disconnection like we do with exports. To do that, the client_obd
77  * will need to save the peer info somewhere other than in the import,
78  * though. */
79 int ptlrpc_init_import(struct obd_import *imp)
80 {
81         unsigned long flags;
82         
83         spin_lock_irqsave(&imp->imp_lock, flags);
84
85         imp->imp_generation++;
86         imp->imp_state =  LUSTRE_IMP_NEW;
87
88         spin_unlock_irqrestore(&imp->imp_lock, flags);
89
90         return 0;
91 }
92
93 /* Returns true if import was FULL, false if import was already not
94  * connected.
95  */
96 int ptlrpc_set_import_discon(struct obd_import *imp)
97 {
98         unsigned long flags;
99         int rc = 0;
100         
101         spin_lock_irqsave(&imp->imp_lock, flags);
102
103         if (imp->imp_state == LUSTRE_IMP_FULL) {
104                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_DISCON);
105                 rc = 1;
106         } else {
107                 CDEBUG(D_HA, "%p %s: import already not connected: %s\n",
108                        imp,imp->imp_client->cli_name, 
109                        ptlrpc_import_state_name(imp->imp_state));
110         }
111         spin_unlock_irqrestore(&imp->imp_lock, flags);
112
113         return rc;
114 }
115
116 void ptlrpc_fail_import(struct obd_import *imp, int generation)
117 {
118         ENTRY;
119
120         LASSERT (!imp->imp_dlm_fake);
121
122         if (ptlrpc_set_import_discon(imp))
123                 ptlrpc_handle_failed_import(imp);
124
125         EXIT;
126 }
127
128 int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid)
129 {
130         struct obd_device *obd = imp->imp_obd;
131         int initial_connect = 0;
132         int rc;
133         __u64 committed_before_reconnect = 0;
134         int was_invalid = 0;
135         struct ptlrpc_request *request;
136         int size[] = {sizeof(imp->imp_target_uuid),
137                                  sizeof(obd->obd_uuid),
138                                  sizeof(imp->imp_dlm_handle)};
139         char *tmp[] = {imp->imp_target_uuid.uuid,
140                        obd->obd_uuid.uuid,
141                        (char *)&imp->imp_dlm_handle};
142         struct ptlrpc_connect_async_args *aa;
143         unsigned long flags;
144
145         spin_lock_irqsave(&imp->imp_lock, flags);
146         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
147                 spin_unlock_irqrestore(&imp->imp_lock, flags);
148                 CERROR("can't connect to a closed import\n");
149                 RETURN(-EINVAL);
150         } else if (imp->imp_state == LUSTRE_IMP_FULL) {
151                 spin_unlock_irqrestore(&imp->imp_lock, flags);
152                 CERROR("already connected\n");
153                 RETURN(0);
154         } else if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
155                 spin_unlock_irqrestore(&imp->imp_lock, flags);
156                 CERROR("already connecting\n");
157                 RETURN(-EALREADY);
158         }
159
160         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CONNECTING);
161
162         imp->imp_conn_cnt++; 
163         imp->imp_last_replay_transno = 0;
164
165         if (imp->imp_remote_handle.cookie == 0) {
166                 initial_connect = 1;
167         } else {
168                 committed_before_reconnect = imp->imp_peer_committed_transno;;
169
170         }
171
172         if (imp->imp_invalid) {
173                 imp->imp_invalid = 0;
174                 was_invalid = 1;
175         }
176
177         spin_unlock_irqrestore(&imp->imp_lock, flags);
178
179         if (new_uuid) {
180                 struct ptlrpc_connection *conn;
181                 struct obd_uuid uuid;
182                 struct obd_export *dlmexp;
183
184                 obd_str2uuid(&uuid, new_uuid);
185
186                 conn = ptlrpc_uuid_to_connection(&uuid);
187                 if (!conn)
188                         GOTO(out, rc = -ENOENT);
189
190                 CDEBUG(D_HA, "switching import %s/%s from %s to %s\n",
191                        imp->imp_target_uuid.uuid, imp->imp_obd->obd_name,
192                        imp->imp_connection->c_remote_uuid.uuid,
193                        conn->c_remote_uuid.uuid);
194
195                 /* Switch the import's connection and the DLM export's
196                  * connection (which are almost certainly the same, but we
197                  * keep distinct refs just to make things clearer. I think. */
198                 if (imp->imp_connection)
199                         ptlrpc_put_connection(imp->imp_connection);
200                 /* We hand off the ref from ptlrpc_get_connection. */
201                 imp->imp_connection = conn;
202
203                 dlmexp = class_conn2export(&imp->imp_dlm_handle);
204                 
205                 LASSERT(dlmexp != NULL);
206
207                 if (dlmexp->exp_connection)
208                         ptlrpc_put_connection(dlmexp->exp_connection);
209                 dlmexp->exp_connection = ptlrpc_connection_addref(conn);
210                 class_export_put(dlmexp);
211
212         }
213
214         request = ptlrpc_prep_req(imp, imp->imp_connect_op, 3, size, tmp);
215         if (!request)
216                 GOTO(out, rc = -ENOMEM);
217
218 #ifndef __KERNEL__
219         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_LIBCLIENT);
220 #endif
221
222         request->rq_send_state = LUSTRE_IMP_CONNECTING;
223         request->rq_replen = lustre_msg_size(0, NULL);
224         request->rq_interpret_reply = ptlrpc_connect_interpret;
225
226         LASSERT (sizeof (*aa) <= sizeof (request->rq_async_args));
227         aa = (struct ptlrpc_connect_async_args *)&request->rq_async_args;
228         memset(aa, 0, sizeof *aa);
229
230         aa->pcaa_peer_committed = committed_before_reconnect;
231         aa->pcaa_initial_connect = initial_connect;
232         aa->pcaa_was_invalid = was_invalid;
233
234         if (aa->pcaa_initial_connect)
235                 imp->imp_replayable = 1;
236
237         ptlrpcd_add_req(request);
238         rc = 0;
239 out:
240         if (rc != 0) {
241                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
242         }
243
244         RETURN(rc);
245 }
246
247 static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
248                                     void * data, int rc)
249 {
250         struct ptlrpc_connect_async_args *aa = data;
251         struct obd_import *imp = request->rq_import;
252         struct lustre_handle old_hdl;
253         unsigned long flags;
254         int msg_flags;
255         ENTRY;
256         
257         spin_lock_irqsave(&imp->imp_lock, flags);
258         if (imp->imp_state == LUSTRE_IMP_CLOSED) {
259                 spin_unlock_irqrestore(&imp->imp_lock, flags);
260                 RETURN(0);
261         }
262         spin_unlock_irqrestore(&imp->imp_lock, flags);
263
264         if (rc)
265                 GOTO(out, rc);
266
267         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
268
269         if (aa->pcaa_initial_connect) {
270                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
271                         CDEBUG(D_HA, "connected to replayable target: %s\n",
272                                imp->imp_target_uuid.uuid);
273                         imp->imp_replayable = 1;
274                         ptlrpc_pinger_add_import(imp);
275                 } else {
276                         imp->imp_replayable = 0;
277                 }
278                 imp->imp_remote_handle = request->rq_repmsg->handle;
279                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
280                 GOTO(finish, rc = 0);
281         }
282
283         /* Determine what recovery state to move the import to. */
284         if (MSG_CONNECT_RECONNECT & msg_flags) {
285                 memset(&old_hdl, 0, sizeof(old_hdl));
286                 if (!memcmp(&old_hdl, &request->rq_repmsg->handle,
287                             sizeof (old_hdl))) {
288                         CERROR("%s@%s didn't like our handle "LPX64
289                                ", failed\n", imp->imp_target_uuid.uuid,
290                                imp->imp_connection->c_remote_uuid.uuid,
291                                imp->imp_dlm_handle.cookie);
292                         GOTO(out, rc = -ENOTCONN);
293                 }
294
295                 if (memcmp(&imp->imp_remote_handle, &request->rq_repmsg->handle,
296                            sizeof(imp->imp_remote_handle))) {
297                         CERROR("%s@%s changed handle from "LPX64" to "LPX64
298                                "; copying, but this may foreshadow disaster\n",
299                                imp->imp_target_uuid.uuid,
300                                imp->imp_connection->c_remote_uuid.uuid,
301                                imp->imp_remote_handle.cookie,
302                                request->rq_repmsg->handle.cookie);
303                         imp->imp_remote_handle = request->rq_repmsg->handle;
304                 } else {
305                         CERROR("reconnected to %s@%s after partition\n",
306                                imp->imp_target_uuid.uuid, 
307                                imp->imp_connection->c_remote_uuid.uuid);
308                 }
309                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
310         } 
311         else if (MSG_CONNECT_RECOVERING & msg_flags) {
312                 LASSERT(imp->imp_replayable);
313                 imp->imp_state = LUSTRE_IMP_RECOVER;
314                 imp->imp_remote_handle = request->rq_repmsg->handle;
315                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
316         } 
317         else {
318                 imp->imp_remote_handle = request->rq_repmsg->handle;
319                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
320         }
321         
322         /* Sanity checks for a reconnected import. */
323         if (!(imp->imp_replayable) != 
324              !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
325                 CERROR("imp_replayable flag does not match server "
326                        "after reconnect. We should LBUG right here.\n");
327         }
328
329         if (request->rq_repmsg->last_committed < aa->pcaa_peer_committed) {
330                 CERROR("%s went back in time (transno "LPD64
331                        " was previously committed, server now claims "LPD64
332                        ")! is shared storage not coherent?\n",
333                        imp->imp_target_uuid.uuid,
334                        aa->pcaa_peer_committed,
335                        request->rq_repmsg->last_committed);
336         }
337
338 finish:
339         rc = ptlrpc_import_recovery_state_machine(imp);
340         if (rc != 0) {
341                 if (aa->pcaa_was_invalid) {
342                         ptlrpc_set_import_active(imp, 0);
343                 }                
344
345                 if (rc == -ENOTCONN) {
346                         CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery;"
347                                "invalidating and reconnecting\n",
348                                imp->imp_target_uuid.uuid,
349                                imp->imp_connection->c_remote_uuid.uuid);
350                         ptlrpc_connect_import(imp, NULL);
351                         RETURN(0);
352                 } 
353         }
354  out:
355         if (rc != 0) {
356                 IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
357                 if (aa->pcaa_initial_connect && !imp->imp_initial_recov) {
358                         ptlrpc_set_import_active(imp, 0);
359                         GOTO(norecov, rc);
360                 }
361                 CDEBUG(D_ERROR, 
362                        "recovery of %s on %s failed (%d); restarting\n",
363                        imp->imp_target_uuid.uuid,
364                        (char *)imp->imp_connection->c_remote_uuid.uuid, rc);
365                 ptlrpc_handle_failed_import(imp);
366         }
367
368 norecov:
369         wake_up(&imp->imp_recovery_waitq);
370         RETURN(rc);
371 }
372
373 static int completed_replay_interpret(struct ptlrpc_request *req,
374                                     void * data, int rc)
375 {
376         atomic_dec(&req->rq_import->imp_replay_inflight);
377         ptlrpc_import_recovery_state_machine(req->rq_import);
378         RETURN(0);
379 }
380
381 static int signal_completed_replay(struct obd_import *imp)
382  {
383         struct ptlrpc_request *req;
384         ENTRY;
385
386         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
387         atomic_inc(&imp->imp_replay_inflight);
388
389         req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL, NULL);
390         if (!req)
391                 RETURN(-ENOMEM);
392
393         req->rq_replen = lustre_msg_size(0, NULL);
394         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
395         req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
396         req->rq_timeout *= 3; 
397         req->rq_interpret_reply = completed_replay_interpret;
398
399         ptlrpcd_add_req(req);
400         RETURN(0);
401 }
402
403
404 int ptlrpc_import_recovery_state_machine(struct obd_import *imp)
405 {
406         int rc = 0;
407         int inflight;
408
409         if (imp->imp_state == LUSTRE_IMP_EVICTED) {
410                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
411                        imp->imp_target_uuid.uuid,
412                        imp->imp_connection->c_remote_uuid.uuid);
413                 ptlrpc_set_import_active(imp, 0);
414                 IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
415         } 
416         
417         if (imp->imp_state == LUSTRE_IMP_REPLAY) {
418                 CDEBUG(D_HA, "replay requested by %s\n",
419                        imp->imp_target_uuid.uuid);
420                 rc = ptlrpc_replay_next(imp, &inflight);
421                 if (inflight == 0 && 
422                     atomic_read(&imp->imp_replay_inflight) == 0) {
423                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
424                         rc = ldlm_replay_locks(imp);
425                         if (rc)
426                                 GOTO(out, rc);
427                 }
428                 rc = 0;
429         }
430
431         if (imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS) {
432                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
433                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_WAIT);
434                         rc = signal_completed_replay(imp);
435                         if (rc)
436                                 GOTO(out, rc);
437                 }
438
439         }
440
441         if (imp->imp_state == LUSTRE_IMP_REPLAY_WAIT) {
442                 if (atomic_read(&imp->imp_replay_inflight) == 0) {
443                         IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
444                 }
445         }
446
447         if (imp->imp_state == LUSTRE_IMP_RECOVER) {
448                 CDEBUG(D_HA, "reconnected to %s@%s\n",
449                        imp->imp_target_uuid.uuid,
450                        imp->imp_connection->c_remote_uuid.uuid);
451
452                 ptlrpc_set_import_active(imp, 1);
453                 rc = ptlrpc_resend(imp);
454                 if (rc)
455                         GOTO(out, rc);
456                 IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
457         } 
458
459         if (imp->imp_state == LUSTRE_IMP_FULL) {
460                 wake_up(&imp->imp_recovery_waitq);
461                 ptlrpc_wake_delayed(imp);
462         }
463
464  out:
465         RETURN(rc);
466 }
467
/*
 * Timeout callback passed to LWI_TIMEOUT_INTR() by
 * ptlrpc_disconnect_import().  It ignores its argument and always
 * returns 0.
 */
static int back_to_sleep(void *unused)
{
        (void)unused;

        return 0;
}
472
473 int ptlrpc_disconnect_import(struct obd_import *imp)
474 {
475         struct ptlrpc_request *request;
476         int rq_opc;
477         int rc = 0;
478         unsigned long flags;
479         ENTRY;
480
481         switch (imp->imp_connect_op) {
482         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
483         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
484         case MGMT_CONNECT:rq_opc = MGMT_DISCONNECT;break;
485         default:
486                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
487                        imp->imp_target_uuid.uuid, imp->imp_connect_op);
488                 RETURN(-EINVAL);
489         }
490
491
492         if (ptlrpc_import_in_recovery(imp)) {
493                 struct l_wait_info lwi;
494                 lwi = LWI_TIMEOUT_INTR(MAX(obd_timeout * HZ, 1), back_to_sleep, 
495                                        NULL, NULL);
496                 rc = l_wait_event(imp->imp_recovery_waitq, 
497                                   !ptlrpc_import_in_recovery(imp), &lwi);
498
499         }
500
501         spin_lock_irqsave(&imp->imp_lock, flags);
502         if (imp->imp_state != LUSTRE_IMP_FULL) {
503                 GOTO(out, 0);
504         }
505         spin_unlock_irqrestore(&imp->imp_lock, flags);
506
507         request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
508         if (request) {
509                 /* For non-replayable connections, don't attempt
510                    reconnect if this fails */
511                 if (!imp->imp_replayable) {
512                         IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
513                         request->rq_send_state =  LUSTRE_IMP_DISCON;
514                 }
515                 request->rq_replen = lustre_msg_size(0, NULL);
516                 rc = ptlrpc_queue_wait(request);
517                 ptlrpc_req_finished(request);
518         }
519
520         spin_lock_irqsave(&imp->imp_lock, flags);
521 out:
522         IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
523         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
524         spin_unlock_irqrestore(&imp->imp_lock, flags);
525
526         RETURN(rc);
527 }
528