Whamcloud - gitweb
1e99e1194d15e093257833c7b8fafade91bbc54d
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/handler.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Andreas Dilger <adilger@clusterfs.com>
40  * Author: Phil Schwan <phil@clusterfs.com>
41  * Author: Mike Shaver <shaver@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <lustre_mds.h>
47 #include <linux/module.h>
48 #include <linux/init.h>
49 #include <linux/random.h>
50 #include <linux/fs.h>
51 #include <linux/jbd.h>
52 #include <linux/smp_lock.h>
53 #include <linux/buffer_head.h>
54 #include <linux/workqueue.h>
55 #include <linux/mount.h>
56
57 #include <linux/lustre_acl.h>
58 #include <obd_class.h>
59 #include <lustre_dlm.h>
60 #include <obd_lov.h>
61 #include <lustre_fsfilt.h>
62 #include <lprocfs_status.h>
63 #include <lustre_commit_confd.h>
64 #include <lustre_quota.h>
65 #include <lustre_disk.h>
66 #include <lustre_param.h>
67
68 #include "mds_internal.h"
69
70 __u32 mds_max_ost_index=0xFFFF;
71 CFS_MODULE_PARM(mds_max_ost_index, "i", int, 0444,
72                 "maximal OST index");
73
74 /* Look up an entry by inode number. */
75 /* this function ONLY returns valid dget'd dentries with an initialized inode
76    or errors */
77 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
78                               struct vfsmount **mnt)
79 {
80         char fid_name[32];
81         unsigned long ino = fid->id;
82         __u32 generation = fid->generation;
83         struct inode *inode;
84         struct dentry *result;
85
86         if (ino == 0)
87                 RETURN(ERR_PTR(-ESTALE));
88
89         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
90
91         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
92                ino, generation, mds->mds_obt.obt_sb);
93
94         /* under ext3 this is neither supposed to return bad inodes
95            nor NULL inodes. */
96         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
97         if (IS_ERR(result))
98                 RETURN(result);
99
100         inode = result->d_inode;
101         if (!inode)
102                 RETURN(ERR_PTR(-ENOENT));
103
104         if (inode->i_generation == 0 || inode->i_nlink == 0) {
105                 LCONSOLE_WARN("Found inode with zero generation or link -- this"
106                               " may indicate disk corruption (inode: %lu/%u, "
107                               "link %lu, count %d)\n", inode->i_ino,
108                               inode->i_generation,(unsigned long)inode->i_nlink,
109                               atomic_read(&inode->i_count));
110                 dput(result);
111                 RETURN(ERR_PTR(-ENOENT));
112         }
113
114         if (generation && inode->i_generation != generation) {
115                 /* we didn't find the right inode.. */
116                 CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, "
117                        "count: %d, generation %u/%u\n", inode->i_ino,
118                        (unsigned long)inode->i_nlink,
119                        atomic_read(&inode->i_count), inode->i_generation,
120                        generation);
121                 dput(result);
122                 RETURN(ERR_PTR(-ENOENT));
123         }
124
125         if (mnt) {
126                 *mnt = mds->mds_vfsmnt;
127                 mntget(*mnt);
128         }
129
130         RETURN(result);
131 }
132
133 static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg)
134 {
135         int rc = 0;
136         ENTRY;
137
138         if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
139                 class_uuid_t uuid;
140
141                 ll_generate_random_uuid(uuid);
142                 class_uuid_unparse(uuid, &mds->mds_lov_uuid);
143
144                 OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
145                 if (mds->mds_profile == NULL)
146                         RETURN(-ENOMEM);
147
148                 strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
149                         LUSTRE_CFG_BUFLEN(lcfg, 3));
150         }
151         RETURN(rc);
152 }
153
154 static int mds_lov_clean(struct obd_device *obd)
155 {
156         struct mds_obd *mds = &obd->u.mds;
157         struct obd_device *osc = mds->mds_osc_obd;
158         ENTRY;
159
160         if (mds->mds_profile) {
161                 class_del_profile(mds->mds_profile);
162                 OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1);
163                 mds->mds_profile = NULL;
164         }
165
166         /* There better be a lov */
167         if (!osc)
168                 RETURN(0);
169         if (IS_ERR(osc))
170                 RETURN(PTR_ERR(osc));
171
172         obd_register_observer(osc, NULL);
173
174         /* Give lov our same shutdown flags */
175         osc->obd_force = obd->obd_force;
176         osc->obd_fail = obd->obd_fail;
177
178         /* Cleanup the lov */
179         obd_disconnect(mds->mds_osc_exp);
180         class_manual_cleanup(osc);
181
182         RETURN(0);
183 }
184
185 static int mds_postsetup(struct obd_device *obd)
186 {
187         struct mds_obd *mds = &obd->u.mds;
188         int rc = 0;
189         ENTRY;
190
191         rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
192                         &llog_lvfs_ops);
193         if (rc)
194                 RETURN(rc);
195
196         rc = llog_setup(obd, &obd->obd_olg, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
197                         &llog_lvfs_ops);
198         if (rc)
199                 RETURN(rc);
200
201         if (mds->mds_profile) {
202                 struct lustre_profile *lprof;
203                 /* The profile defines which osc and mdc to connect to, for a
204                    client.  We reuse that here to figure out the name of the
205                    lov to use (and ignore lprof->lp_md).
206                    The profile was set in the config log with
207                    LCFG_MOUNTOPT profilenm oscnm mdcnm */
208                 lprof = class_get_profile(mds->mds_profile);
209                 if (lprof == NULL) {
210                         CERROR("No profile found: %s\n", mds->mds_profile);
211                         GOTO(err_cleanup, rc = -ENOENT);
212                 }
213                 rc = mds_lov_connect(obd, lprof->lp_dt);
214                 if (rc)
215                         GOTO(err_cleanup, rc);
216         }
217
218         RETURN(rc);
219
220 err_cleanup:
221         mds_lov_clean(obd);
222         llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
223         llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
224         RETURN(rc);
225 }
226
227 int mds_postrecov(struct obd_device *obd)
228 {
229         int rc = 0;
230         ENTRY;
231
232         if (obd->obd_fail)
233                 RETURN(0);
234
235         LASSERT(!obd->obd_recovering);
236         LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
237
238         /* clean PENDING dir */
239 #if 0
240         if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
241                 rc = mds_cleanup_pending(obd);
242                 if (rc < 0)
243                         GOTO(out, rc);
244 #endif
245         /* FIXME Does target_finish_recovery really need this to block? */
246         /* Notify the LOV, which will in turn call mds_notify for each tgt */
247         /* This means that we have to hack obd_notify to think we're obd_set_up
248            during mds_lov_connect. */
249         obd_notify(obd->u.mds.mds_osc_obd, NULL,
250                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
251                    OBD_NOTIFY_SYNC, NULL);
252
253         /* quota recovery */
254         lquota_recovery(mds_quota_interface_ref, obd);
255
256         RETURN(rc);
257 }
258
259 /* We need to be able to stop an mds_lov_synchronize */
260 static int mds_lov_early_clean(struct obd_device *obd)
261 {
262         struct mds_obd *mds = &obd->u.mds;
263         struct obd_device *osc = mds->mds_osc_obd;
264
265         if (!osc || (!obd->obd_force && !obd->obd_fail))
266                 return(0);
267
268         CDEBUG(D_HA, "abort inflight\n");
269         return (obd_precleanup(osc, OBD_CLEANUP_EARLY));
270 }
271
272 static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
273 {
274         int rc = 0;
275         struct mds_obd *mds = &obd->u.mds;
276         ENTRY;
277
278         switch (stage) {
279         case OBD_CLEANUP_EARLY:
280                 break;
281         case OBD_CLEANUP_EXPORTS:
282                 mds_lov_early_clean(obd);
283                 down_write(&mds->mds_notify_lock);
284                 mds_lov_disconnect(obd);
285                 mds_lov_clean(obd);
286                 llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
287                 llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
288                 rc = obd_llog_finish(obd, 0);
289                 mds->mds_osc_exp = NULL;
290                 up_write(&mds->mds_notify_lock);
291                 break;
292         }
293         RETURN(rc);
294 }
295
296 static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
297                                           void *data)
298 {
299         struct obd_device *obd = data;
300         struct ll_fid fid;
301         fid.id = id;
302         fid.generation = gen;
303         return mds_fid2dentry(&obd->u.mds, &fid, NULL);
304 }
305
306
307 struct lvfs_callback_ops mds_lvfs_ops = {
308         l_fid2dentry:     mds_lvfs_fid2dentry,
309 };
310
311 quota_interface_t *mds_quota_interface_ref;
312 extern quota_interface_t mds_quota_interface;
313
314 static void mds_init_ctxt(struct obd_device *obd, struct vfsmount *mnt)
315 {
316         struct mds_obd *mds = &obd->u.mds;
317
318         mds->mds_vfsmnt = mnt;
319         /* why not mnt->mnt_sb instead of mnt->mnt_root->d_inode->i_sb? */
320         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
321
322         fsfilt_setup(obd, obd->u.obt.obt_sb);
323
324         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
325         obd->obd_lvfs_ctxt.pwdmnt = mnt;
326         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
327         obd->obd_lvfs_ctxt.fs = get_ds();
328         obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops;
329         return;
330 }
331
332 /*mds still need lov setup here*/
333 static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
334 {
335         struct mds_obd *mds = &obd->u.mds;
336         struct lvfs_run_ctxt saved;
337         const char     *dev;
338         struct vfsmount *mnt;
339         struct lustre_sb_info *lsi;
340         struct lustre_mount_info *lmi;
341         struct dentry  *dentry;
342         int rc = 0;
343         ENTRY;
344
345         CDEBUG(D_INFO, "obd %s setup \n", obd->obd_name);
346         if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
347                 RETURN(0);
348
349         if (lcfg->lcfg_bufcount < 5) {
350                 CERROR("invalid arg for setup %s\n", MDD_OBD_NAME);
351                 RETURN(-EINVAL);
352         }
353         dev = lustre_cfg_string(lcfg, 4);
354         lmi = server_get_mount(dev);
355         LASSERT(lmi != NULL);
356
357         lsi = s2lsi(lmi->lmi_sb);
358         mnt = lmi->lmi_mnt;
359         /* FIXME: MDD LOV initialize objects.
360          * we need only lmi here but not get mount
361          * OSD did mount already, so put mount back
362          */
363         atomic_dec(&lsi->lsi_mounts);
364         mntput(mnt);
365         init_rwsem(&mds->mds_notify_lock);
366
367         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
368         mds_init_ctxt(obd, mnt);
369
370         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
371         dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1);
372         if (IS_ERR(dentry)) {
373                 rc = PTR_ERR(dentry);
374                 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
375                 GOTO(err_putfs, rc);
376         }
377         mds->mds_objects_dir = dentry;
378
379         dentry = lookup_one_len("__iopen__", current->fs->pwd,
380                                 strlen("__iopen__"));
381         if (IS_ERR(dentry)) {
382                 rc = PTR_ERR(dentry);
383                 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
384                 GOTO(err_objects, rc);
385         }
386
387         mds->mds_fid_de = dentry;
388         if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
389                 rc = -ENOENT;
390                 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
391                 GOTO(err_fid, rc);
392         }
393         rc = mds_lov_init_objids(obd);
394         if (rc != 0) {
395                CERROR("cannot init lov objid rc = %d\n", rc);
396                GOTO(err_fid, rc );
397         }
398
399         rc = mds_lov_presetup(mds, lcfg);
400         if (rc < 0)
401                 GOTO(err_objects, rc);
402
403         /* Don't wait for mds_postrecov trying to clear orphans */
404         obd->obd_async_recov = 1;
405         rc = mds_postsetup(obd);
406         /* Bug 11557 - allow async abort_recov start
407            FIXME can remove most of this obd_async_recov plumbing
408         obd->obd_async_recov = 0;
409         */
410
411         if (rc)
412                 GOTO(err_objects, rc);
413
414         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
415         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
416
417 err_pop:
418         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
419         RETURN(rc);
420 err_fid:
421         dput(mds->mds_fid_de);
422 err_objects:
423         dput(mds->mds_objects_dir);
424 err_putfs:
425         fsfilt_put_ops(obd->obd_fsops);
426         goto err_pop;
427 }
428
429 static int mds_cmd_cleanup(struct obd_device *obd)
430 {
431         struct mds_obd *mds = &obd->u.mds;
432         struct lvfs_run_ctxt saved;
433         int rc = 0;
434         ENTRY;
435
436         mds->mds_osc_exp = NULL;
437
438         if (obd->obd_fail)
439                 LCONSOLE_WARN("%s: shutting down for failover; client state "
440                               "will be preserved.\n", obd->obd_name);
441
442         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
443
444         mds_lov_destroy_objids(obd);
445
446         if (mds->mds_objects_dir != NULL) {
447                 l_dput(mds->mds_objects_dir);
448                 mds->mds_objects_dir = NULL;
449         }
450
451         shrink_dcache_parent(mds->mds_fid_de);
452         dput(mds->mds_fid_de);
453         LL_DQUOT_OFF(obd->u.obt.obt_sb);
454         fsfilt_put_ops(obd->obd_fsops);
455
456         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
457         RETURN(rc);
458 }
459
460 #if 0
461 static int mds_cmd_health_check(struct obd_device *obd)
462 {
463         return 0;
464 }
465 #endif
466 static struct obd_ops mds_cmd_obd_ops = {
467         .o_owner           = THIS_MODULE,
468         .o_setup           = mds_cmd_setup,
469         .o_cleanup         = mds_cmd_cleanup,
470         .o_precleanup      = mds_precleanup,
471         .o_create          = mds_obd_create,
472         .o_destroy         = mds_obd_destroy,
473         .o_llog_init       = mds_llog_init,
474         .o_llog_finish     = mds_llog_finish,
475         .o_notify          = mds_notify,
476         .o_postrecov       = mds_postrecov,
477         //   .o_health_check    = mds_cmd_health_check,
478 };
479
480 static int __init mds_cmd_init(void)
481 {
482         struct lprocfs_static_vars lvars;
483
484         lprocfs_mds_init_vars(&lvars);
485         class_register_type(&mds_cmd_obd_ops, NULL, lvars.module_vars,
486                             LUSTRE_MDS_NAME, NULL);
487
488         return 0;
489 }
490
491 static void /*__exit*/ mds_cmd_exit(void)
492 {
493         class_unregister_type(LUSTRE_MDS_NAME);
494 }
495
496 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
497 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
498 MODULE_LICENSE("GPL");
499
500 module_init(mds_cmd_init);
501 module_exit(mds_cmd_exit);