Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lustre / ptlrpc / import.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *   Author: Mike Shaver <shaver@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifdef __KERNEL__
25 # include <linux/config.h>
26 # include <linux/module.h>
27 # include <linux/kmod.h>
28 #else
29 # include <liblustre.h>
30 #endif
31
32 #include <linux/obd_support.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_net.h>
35 #include <linux/lustre_import.h>
36 #include <linux/lustre_export.h>
37 #include <linux/obd.h>
38 #include <linux/obd_class.h>
39
40 #include "ptlrpc_internal.h"
41
42 /* should this take an imp_sem to ensure connect is single threaded? */
43 int ptlrpc_connect_import(struct obd_import *imp)
44 {
45         struct obd_device *obd = imp->imp_obd;
46         int msg_flags;
47         int initial_connect = 0;
48         int rc;
49         __u64 committed_before_reconnect = 0;
50         struct ptlrpc_request *request;
51         struct lustre_handle old_hdl;
52         int size[] = {sizeof(imp->imp_target_uuid),
53                                  sizeof(obd->obd_uuid),
54                                  sizeof(imp->imp_dlm_handle)};
55         char *tmp[] = {imp->imp_target_uuid.uuid,
56                        obd->obd_uuid.uuid,
57                        (char *)&imp->imp_dlm_handle};
58         unsigned long flags;
59
60         spin_lock_irqsave(&imp->imp_lock, flags);
61         if (imp->imp_state == LUSTRE_IMP_CONNECTING) {
62                 spin_unlock_irqrestore(&imp->imp_lock, flags);
63                 RETURN(-EALREADY);
64         } else {
65                 LASSERT(imp->imp_state == LUSTRE_IMP_DISCON);
66         }
67         CDEBUG(D_HA, "%s: new state: CONNECTING\n", 
68                imp->imp_client->cli_name);
69         imp->imp_state = LUSTRE_IMP_CONNECTING;
70         imp->imp_conn_cnt++; 
71         if (imp->imp_remote_handle.cookie == 0) {
72                 initial_connect = 1;
73         } else {
74                 committed_before_reconnect = imp->imp_peer_committed_transno;
75         }
76         spin_unlock_irqrestore(&imp->imp_lock, flags);
77
78         request = ptlrpc_prep_req(imp, imp->imp_connect_op, 3, size, tmp);
79         if (!request)
80                 GOTO(out, rc = -ENOMEM);
81
82         request->rq_send_state = LUSTRE_IMP_CONNECTING;
83         request->rq_replen = lustre_msg_size(0, NULL);
84
85         // lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_PEER);
86
87         rc = ptlrpc_queue_wait(request);
88         if (rc) {
89                 GOTO(free_req, rc);
90         }
91
92         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
93
94         if (initial_connect) {
95                 CDEBUG(D_HA, "%s: new state: FULL\n", 
96                        imp->imp_client->cli_name);
97                 if (msg_flags & MSG_CONNECT_REPLAYABLE)
98                         imp->imp_replayable = 1;
99                 imp->imp_remote_handle = request->rq_repmsg->handle;
100                 imp->imp_state = LUSTRE_IMP_FULL;
101                 GOTO(free_req, rc = 0);
102         }
103
104         /* Determine what recovery state to move the import to. */
105         if (MSG_CONNECT_RECONNECT & msg_flags) {
106                 memset(&old_hdl, 0, sizeof(old_hdl));
107                 if (!memcmp(&old_hdl, &request->rq_repmsg->handle,
108                             sizeof (old_hdl))) {
109                         CERROR("%s@%s didn't like our handle "LPX64
110                                ", failed\n", imp->imp_target_uuid.uuid,
111                                imp->imp_connection->c_remote_uuid.uuid,
112                                imp->imp_dlm_handle.cookie);
113                         GOTO(free_req, rc = -ENOTCONN);
114                 }
115
116                 if (memcmp(&imp->imp_remote_handle, &request->rq_repmsg->handle,
117                            sizeof(imp->imp_remote_handle))) {
118                         CERROR("%s@%s changed handle from "LPX64" to "LPX64
119                                "; copying, but this may foreshadow disaster\n",
120                                imp->imp_target_uuid.uuid,
121                                imp->imp_connection->c_remote_uuid.uuid,
122                                imp->imp_remote_handle.cookie,
123                                request->rq_repmsg->handle.cookie);
124                         imp->imp_remote_handle = request->rq_repmsg->handle;
125                 } else {
126                         CERROR("reconnected to %s@%s after partition\n",
127                                imp->imp_target_uuid.uuid, 
128                                imp->imp_connection->c_remote_uuid.uuid);
129                 }
130                 CDEBUG(D_HA, "%s: new state: RECOVER\n", 
131                        imp->imp_client->cli_name);
132                 imp->imp_state = LUSTRE_IMP_RECOVER;
133         } 
134         else if (MSG_CONNECT_RECOVERING & msg_flags) {
135                 CDEBUG(D_HA, "%s: new state: REPLAY\n", 
136                        imp->imp_client->cli_name);
137                 LASSERT(imp->imp_replayable);
138                 imp->imp_state = LUSTRE_IMP_RECOVER;
139                 imp->imp_remote_handle = request->rq_repmsg->handle;
140                 imp->imp_state = LUSTRE_IMP_REPLAY;
141         } 
142         else {
143                 CDEBUG(D_HA, "%s: new state: EVICTED\n", 
144                        imp->imp_client->cli_name);
145                 imp->imp_remote_handle = request->rq_repmsg->handle;
146                 imp->imp_state = LUSTRE_IMP_EVICTED;
147         }
148         
149         /* Sanity checks for a reconnected import. */
150         if (!(imp->imp_replayable) != 
151              !(msg_flags & MSG_CONNECT_REPLAYABLE)) {
152                 CERROR("imp_replayable flag does not match server "
153                        "after reconnect. We should LBUG right here.\n");
154         }
155
156         if (request->rq_repmsg->last_committed < committed_before_reconnect) {
157                 CERROR("%s went back in time (transno "LPD64
158                        " was previously committed, server now claims "LPD64
159                        ")! is shared storage not coherent?\n",
160                        imp->imp_target_uuid.uuid,
161                        committed_before_reconnect,
162                        request->rq_repmsg->last_committed);
163         }
164
165  free_req:
166         ptlrpc_req_finished(request);
167
168  out:
169         if (rc != 0)
170                 imp->imp_state = LUSTRE_IMP_DISCON;
171         RETURN(rc);
172 }
173
174
175
176 int ptlrpc_disconnect_import(struct obd_import *imp)
177 {
178         struct ptlrpc_request *request;
179         int rq_opc;
180         int rc = 0;
181         ENTRY;
182
183         switch (imp->imp_connect_op) {
184         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
185         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
186         case MGMT_CONNECT:rq_opc = MGMT_DISCONNECT;break;
187         default:
188                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
189                        imp->imp_target_uuid.uuid, imp->imp_connect_op);
190                 RETURN(-EINVAL);
191         }
192
193         request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
194         if (request) {
195                 /* For non-replayable connections, don't attempt
196                    reconnect if this fails */
197                 if (!imp->imp_obd->obd_replayable) {
198                         imp->imp_state = LUSTRE_IMP_DISCON;
199                         request->rq_send_state =  LUSTRE_IMP_DISCON;
200                 }
201                 request->rq_replen = lustre_msg_size(0, NULL);
202                 rc = ptlrpc_queue_wait(request);
203                 ptlrpc_req_finished(request);
204         }
205
206         imp->imp_state = LUSTRE_IMP_DISCON;
207         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
208         RETURN(rc);
209 }
210