Whamcloud - gitweb
LU-1406 ofd: add connect functions
[fs/lustre-release.git] / lustre / ofd / ofd_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ofd/ofd_obd.c
37  *
38  * Author: Andreas Dilger <adilger@whamcloud.com>
39  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
40  * Author: Mike Pershin <tappro@whamcloud.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_FILTER
44
45 #include "ofd_internal.h"
46 #include <obd_cksum.h>
47
48 static int ofd_parse_connect_data(const struct lu_env *env,
49                                   struct obd_export *exp,
50                                   struct obd_connect_data *data)
51 {
52         struct ofd_device *ofd = ofd_exp(exp);
53
54         if (!data)
55                 RETURN(0);
56
57         CDEBUG(D_RPCTRACE, "%s: cli %s/%p ocd_connect_flags: "LPX64
58                " ocd_version: %x ocd_grant: %d ocd_index: %u\n",
59                exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
60                data->ocd_connect_flags, data->ocd_version,
61                data->ocd_grant, data->ocd_index);
62
63         data->ocd_connect_flags &= OST_CONNECT_SUPPORTED;
64         exp->exp_connect_flags = data->ocd_connect_flags;
65         data->ocd_version = LUSTRE_VERSION_CODE;
66
67         /* Kindly make sure the SKIP_ORPHAN flag is from MDS. */
68         if (data->ocd_connect_flags & OBD_CONNECT_MDS)
69                 CDEBUG(D_HA, "%s: Received MDS connection for group %u\n",
70                        exp->exp_obd->obd_name, data->ocd_group);
71         else if (data->ocd_connect_flags & OBD_CONNECT_SKIP_ORPHAN)
72                 RETURN(-EPROTO);
73
74         if (data->ocd_connect_flags & OBD_CONNECT_INDEX) {
75                 struct lr_server_data *lsd = &ofd->ofd_lut.lut_lsd;
76                 int                    index = lsd->lsd_ost_index;
77
78                 if (!(lsd->lsd_feature_compat & OBD_COMPAT_OST)) {
79                         /* this will only happen on the first connect */
80                         lsd->lsd_ost_index = data->ocd_index;
81                         lsd->lsd_feature_compat |= OBD_COMPAT_OST;
82                         /* sync is not needed here as lut_client_add will
83                          * set exp_need_sync flag */
84                         lut_server_data_update(env, &ofd->ofd_lut, 0);
85                 } else if (index != data->ocd_index) {
86                         LCONSOLE_ERROR_MSG(0x136, "Connection from %s to index"
87                                            " %u doesn't match actual OST index"
88                                            " %u in last_rcvd file, bad "
89                                            "configuration?\n",
90                                            obd_export_nid2str(exp), index,
91                                            data->ocd_index);
92                         RETURN(-EBADF);
93                 }
94         }
95
96         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_SIZE)) {
97                 data->ocd_brw_size = 65536;
98         } else if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
99                 data->ocd_brw_size = min(data->ocd_brw_size,
100                               (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT));
101                 if (data->ocd_brw_size == 0) {
102                         CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64
103                                " ocd_version: %x ocd_grant: %d ocd_index: %u "
104                                "ocd_brw_size is unexpectedly zero, "
105                                "network data corruption?"
106                                "Refusing connection of this client\n",
107                                exp->exp_obd->obd_name,
108                                exp->exp_client_uuid.uuid,
109                                exp, data->ocd_connect_flags, data->ocd_version,
110                                data->ocd_grant, data->ocd_index);
111                         RETURN(-EPROTO);
112                 }
113         }
114
115         if (data->ocd_connect_flags & OBD_CONNECT_CKSUM) {
116                 __u32 cksum_types = data->ocd_cksum_types;
117
118                 /* The client set in ocd_cksum_types the checksum types it
119                  * supports. We have to mask off the algorithms that we don't
120                  * support */
121                 data->ocd_cksum_types &= cksum_types_supported();
122
123                 if (unlikely(data->ocd_cksum_types == 0)) {
124                         CERROR("%s: Connect with checksum support but no "
125                                "ocd_cksum_types is set\n",
126                                exp->exp_obd->obd_name);
127                         RETURN(-EPROTO);
128                 }
129
130                 CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return "
131                        "%x\n", exp->exp_obd->obd_name, obd_export_nid2str(exp),
132                        cksum_types, data->ocd_cksum_types);
133         } else {
134                 /* This client does not support OBD_CONNECT_CKSUM
135                  * fall back to CRC32 */
136                 CDEBUG(D_RPCTRACE, "%s: cli %s does not support "
137                        "OBD_CONNECT_CKSUM, CRC32 will be used\n",
138                        exp->exp_obd->obd_name, obd_export_nid2str(exp));
139         }
140
141         if (data->ocd_connect_flags & OBD_CONNECT_MAXBYTES)
142                 data->ocd_maxbytes = ofd->ofd_dt_conf.ddp_maxbytes;
143
144         RETURN(0);
145 }
146
147 static int ofd_obd_reconnect(const struct lu_env *env, struct obd_export *exp,
148                              struct obd_device *obd, struct obd_uuid *cluuid,
149                              struct obd_connect_data *data, void *localdata)
150 {
151         int rc;
152
153         ENTRY;
154
155         if (exp == NULL || obd == NULL || cluuid == NULL)
156                 RETURN(-EINVAL);
157
158         rc = lu_env_refill((struct lu_env *)env);
159         if (rc != 0) {
160                 CERROR("Failure to refill session: '%d'\n", rc);
161                 RETURN(rc);
162         }
163
164         ofd_info_init(env, exp);
165         rc = ofd_parse_connect_data(env, exp, data);
166
167         RETURN(rc);
168 }
169
170 static int ofd_obd_connect(const struct lu_env *env, struct obd_export **_exp,
171                            struct obd_device *obd, struct obd_uuid *cluuid,
172                            struct obd_connect_data *data, void *localdata)
173 {
174         struct obd_export       *exp;
175         struct ofd_device       *ofd;
176         struct lustre_handle     conn = { 0 };
177         int                      rc, group;
178
179         ENTRY;
180
181         if (_exp == NULL || obd == NULL || cluuid == NULL)
182                 RETURN(-EINVAL);
183
184         ofd = ofd_dev(obd->obd_lu_dev);
185
186         rc = class_connect(&conn, obd, cluuid);
187         if (rc)
188                 RETURN(rc);
189
190         exp = class_conn2export(&conn);
191         LASSERT(exp != NULL);
192
193         rc = lu_env_refill((struct lu_env *)env);
194         if (rc != 0) {
195                 CERROR("Failure to refill session: '%d'\n", rc);
196                 GOTO(out, rc);
197         }
198
199         ofd_info_init(env, exp);
200
201         rc = ofd_parse_connect_data(env, exp, data);
202         if (rc)
203                 GOTO(out, rc);
204
205         ofd_export_stats_init(ofd, exp, localdata);
206         group = data->ocd_group;
207         if (obd->obd_replayable) {
208                 struct tg_export_data *ted = &exp->exp_target_data;
209
210                 memcpy(ted->ted_lcd->lcd_uuid, cluuid,
211                        sizeof(ted->ted_lcd->lcd_uuid));
212                 rc = lut_client_new(env, exp);
213                 if (rc != 0)
214                         GOTO(out, rc);
215         }
216         if (group == 0)
217                 GOTO(out, rc = 0);
218
219         /* init new group */
220         if (group > ofd->ofd_max_group) {
221                 ofd->ofd_max_group = group;
222                 rc = ofd_group_load(env, ofd, group);
223         }
224 out:
225         if (rc != 0) {
226                 class_disconnect(exp);
227                 *_exp = NULL;
228         } else {
229                 *_exp = exp;
230         }
231         RETURN(rc);
232 }
233
234 static int ofd_obd_disconnect(struct obd_export *exp)
235 {
236         struct lu_env   env;
237         int             rc;
238
239         ENTRY;
240
241         LASSERT(exp);
242         class_export_get(exp);
243
244         rc = server_disconnect_export(exp);
245
246         rc = lu_env_init(&env, LCT_DT_THREAD);
247         if (rc)
248                 RETURN(rc);
249
250         /* Do not erase record for recoverable client. */
251         if (exp->exp_obd->obd_replayable &&
252             (!exp->exp_obd->obd_fail || exp->exp_failed))
253                 lut_client_del(&env, exp);
254         lu_env_fini(&env);
255
256         class_export_put(exp);
257         RETURN(rc);
258 }
259
260 static int ofd_init_export(struct obd_export *exp)
261 {
262         int rc;
263
264         cfs_spin_lock_init(&exp->exp_filter_data.fed_lock);
265         CFS_INIT_LIST_HEAD(&exp->exp_filter_data.fed_mod_list);
266         cfs_spin_lock(&exp->exp_lock);
267         exp->exp_connecting = 1;
268         cfs_spin_unlock(&exp->exp_lock);
269
270         /* self-export doesn't need client data and ldlm initialization */
271         if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
272                                      &exp->exp_client_uuid)))
273                 return 0;
274
275         rc = lut_client_alloc(exp);
276         if (rc == 0)
277                 ldlm_init_export(exp);
278         if (rc)
279                 CERROR("%s: Can't initialize export: rc %d\n",
280                        exp->exp_obd->obd_name, rc);
281         return rc;
282 }
283
284 static int ofd_destroy_export(struct obd_export *exp)
285 {
286         if (exp->exp_filter_data.fed_pending)
287                 CERROR("%s: cli %s/%p has %lu pending on destroyed export"
288                        "\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
289                        exp, exp->exp_filter_data.fed_pending);
290
291         target_destroy_export(exp);
292
293         if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
294                                      &exp->exp_client_uuid)))
295                 return 0;
296
297         ldlm_destroy_export(exp);
298         lut_client_free(exp);
299
300         LASSERT(cfs_list_empty(&exp->exp_filter_data.fed_mod_list));
301         return 0;
302 }
303
304 int ofd_obd_postrecov(struct obd_device *obd)
305 {
306         struct lu_env            env;
307         struct lu_device        *ldev = obd->obd_lu_dev;
308         int                      rc;
309
310         ENTRY;
311
312         rc = lu_env_init(&env, LCT_DT_THREAD);
313         if (rc)
314                 RETURN(rc);
315         ofd_info_init(&env, obd->obd_self_export);
316
317         rc = ldev->ld_ops->ldo_recovery_complete(&env, ldev);
318         lu_env_fini(&env);
319         RETURN(rc);
320 }
321
322 static int ofd_obd_notify(struct obd_device *obd, struct obd_device *unused,
323                           enum obd_notify_event ev, void *data)
324 {
325         switch (ev) {
326         case OBD_NOTIFY_CONFIG:
327                 LASSERT(obd->obd_no_conn);
328                 cfs_spin_lock(&obd->obd_dev_lock);
329                 obd->obd_no_conn = 0;
330                 cfs_spin_unlock(&obd->obd_dev_lock);
331                 break;
332         default:
333                 CDEBUG(D_INFO, "%s: Unhandled notification %#x\n",
334                        obd->obd_name, ev);
335         }
336         return 0;
337 }
338
339 struct obd_ops ofd_obd_ops = {
340         .o_owner                = THIS_MODULE,
341         .o_connect              = ofd_obd_connect,
342         .o_reconnect            = ofd_obd_reconnect,
343         .o_disconnect           = ofd_obd_disconnect,
344         .o_init_export          = ofd_init_export,
345         .o_destroy_export       = ofd_destroy_export,
346         .o_postrecov            = ofd_obd_postrecov,
347         .o_notify               = ofd_obd_notify,
348 };