Whamcloud - gitweb
b=17447
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/handler.c
37  *
38  * Author: Peter Braam <braam@clusterfs.com>
39  * Author: Andreas Dilger <adilger@clusterfs.com>
40  * Author: Phil Schwan <phil@clusterfs.com>
41  * Author: Mike Shaver <shaver@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <lustre_mds.h>
47 #include <linux/module.h>
48 #include <linux/init.h>
49 #include <linux/random.h>
50 #include <linux/fs.h>
51 #include <linux/jbd.h>
52 #include <linux/smp_lock.h>
53 #include <linux/buffer_head.h>
54 #include <linux/workqueue.h>
55 #include <linux/mount.h>
56
57 #include <lustre_acl.h>
58 #include <obd_class.h>
59 #include <lustre_dlm.h>
60 #include <obd_lov.h>
61 #include <lustre_fsfilt.h>
62 #include <lprocfs_status.h>
63 #include <lustre_quota.h>
64 #include <lustre_disk.h>
65 #include <lustre_param.h>
66
67 #include "mds_internal.h"
68
69 __u32 mds_max_ost_index=0xFFFF;
70 CFS_MODULE_PARM(mds_max_ost_index, "i", int, 0444,
71                 "maximal OST index");
72
73 /* Look up an entry by inode number. */
74 /* this function ONLY returns valid dget'd dentries with an initialized inode
75    or errors */
76 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
77                               struct vfsmount **mnt)
78 {
79         char fid_name[32];
80         unsigned long ino = fid->id;
81         __u32 generation = fid->generation;
82         struct inode *inode;
83         struct dentry *result;
84
85         if (ino == 0)
86                 RETURN(ERR_PTR(-ESTALE));
87
88         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
89
90         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
91                ino, generation, mds->mds_obt.obt_sb);
92
93         /* under ext3 this is neither supposed to return bad inodes
94            nor NULL inodes. */
95         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
96         if (IS_ERR(result))
97                 RETURN(result);
98
99         inode = result->d_inode;
100         if (!inode)
101                 RETURN(ERR_PTR(-ENOENT));
102
103         if (inode->i_generation == 0 || inode->i_nlink == 0) {
104                 LCONSOLE_WARN("Found inode with zero generation or link -- this"
105                               " may indicate disk corruption (inode: %lu/%u, "
106                               "link %lu, count %d)\n", inode->i_ino,
107                               inode->i_generation,(unsigned long)inode->i_nlink,
108                               atomic_read(&inode->i_count));
109                 dput(result);
110                 RETURN(ERR_PTR(-ENOENT));
111         }
112
113         if (generation && inode->i_generation != generation) {
114                 /* we didn't find the right inode.. */
115                 CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, "
116                        "count: %d, generation %u/%u\n", inode->i_ino,
117                        (unsigned long)inode->i_nlink,
118                        atomic_read(&inode->i_count), inode->i_generation,
119                        generation);
120                 dput(result);
121                 RETURN(ERR_PTR(-ENOENT));
122         }
123
124         if (mnt) {
125                 *mnt = mds->mds_vfsmnt;
126                 mntget(*mnt);
127         }
128
129         RETURN(result);
130 }
131
132 static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg)
133 {
134         int rc = 0;
135         ENTRY;
136
137         if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
138                 class_uuid_t uuid;
139
140                 ll_generate_random_uuid(uuid);
141                 class_uuid_unparse(uuid, &mds->mds_lov_uuid);
142
143                 OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
144                 if (mds->mds_profile == NULL)
145                         RETURN(-ENOMEM);
146
147                 strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
148                         LUSTRE_CFG_BUFLEN(lcfg, 3));
149         }
150         RETURN(rc);
151 }
152
153 static int mds_lov_clean(struct obd_device *obd)
154 {
155         struct mds_obd *mds = &obd->u.mds;
156         struct obd_device *osc = mds->mds_osc_obd;
157         ENTRY;
158
159         if (mds->mds_profile) {
160                 class_del_profile(mds->mds_profile);
161                 OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1);
162                 mds->mds_profile = NULL;
163         }
164
165         /* There better be a lov */
166         if (!osc)
167                 RETURN(0);
168         if (IS_ERR(osc))
169                 RETURN(PTR_ERR(osc));
170
171         obd_register_observer(osc, NULL);
172
173         /* Give lov our same shutdown flags */
174         osc->obd_force = obd->obd_force;
175         osc->obd_fail = obd->obd_fail;
176
177         /* Cleanup the lov */
178         obd_disconnect(mds->mds_osc_exp);
179         class_manual_cleanup(osc);
180
181         RETURN(0);
182 }
183
184 static int mds_postsetup(struct obd_device *obd)
185 {
186         struct mds_obd *mds = &obd->u.mds;
187         struct llog_ctxt *ctxt;
188         int rc = 0;
189         ENTRY;
190
191         rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
192                         &llog_lvfs_ops);
193         if (rc)
194                 RETURN(rc);
195
196         rc = llog_setup(obd, &obd->obd_olg, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
197                         &llog_lvfs_ops);
198         if (rc)
199                 GOTO(err_llog, rc);
200
201         if (mds->mds_profile) {
202                 struct lustre_profile *lprof;
203                 /* The profile defines which osc and mdc to connect to, for a
204                    client.  We reuse that here to figure out the name of the
205                    lov to use (and ignore lprof->lp_md).
206                    The profile was set in the config log with
207                    LCFG_MOUNTOPT profilenm oscnm mdcnm */
208                 lprof = class_get_profile(mds->mds_profile);
209                 if (lprof == NULL) {
210                         CERROR("No profile found: %s\n", mds->mds_profile);
211                         GOTO(err_cleanup, rc = -ENOENT);
212                 }
213                 rc = mds_lov_connect(obd, lprof->lp_dt);
214                 if (rc)
215                         GOTO(err_cleanup, rc);
216         }
217
218         RETURN(rc);
219
220 err_cleanup:
221         mds_lov_clean(obd);
222         ctxt = llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT);
223         if (ctxt)
224                 llog_cleanup(ctxt);
225 err_llog:
226         ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
227         if (ctxt)
228                 llog_cleanup(ctxt);
229         return rc;
230 }
231
232 int mds_postrecov(struct obd_device *obd)
233 {
234         int rc = 0;
235         ENTRY;
236
237         if (obd->obd_fail)
238                 RETURN(0);
239
240         LASSERT(!obd->obd_recovering);
241         /* clean PENDING dir */
242 #if 0
243         if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
244                 rc = mds_cleanup_pending(obd);
245                 if (rc < 0)
246                         GOTO(out, rc);
247 #endif
248         /* FIXME Does target_finish_recovery really need this to block? */
249         /* Notify the LOV, which will in turn call mds_notify for each tgt */
250         /* This means that we have to hack obd_notify to think we're obd_set_up
251            during mds_lov_connect. */
252         obd_notify(obd->u.mds.mds_osc_obd, NULL,
253                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
254                    OBD_NOTIFY_SYNC, NULL);
255
256         /* quota recovery */
257         lquota_recovery(mds_quota_interface_ref, obd);
258
259         RETURN(rc);
260 }
261
262 /* We need to be able to stop an mds_lov_synchronize */
263 static int mds_lov_early_clean(struct obd_device *obd)
264 {
265         struct mds_obd *mds = &obd->u.mds;
266         struct obd_device *osc = mds->mds_osc_obd;
267
268         if (!osc || (!obd->obd_force && !obd->obd_fail))
269                 return(0);
270
271         CDEBUG(D_HA, "abort inflight\n");
272         return (obd_precleanup(osc, OBD_CLEANUP_EARLY));
273 }
274
275 static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
276 {
277         int rc = 0;
278         struct mds_obd *mds = &obd->u.mds;
279         ENTRY;
280
281         switch (stage) {
282         case OBD_CLEANUP_EARLY:
283                 break;
284         case OBD_CLEANUP_EXPORTS:
285                 mds_lov_early_clean(obd);
286                 down_write(&mds->mds_notify_lock);
287                 mds_lov_disconnect(obd);
288                 mds_lov_clean(obd);
289                 llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
290                 llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
291                 rc = obd_llog_finish(obd, 0);
292                 mds->mds_osc_exp = NULL;
293                 up_write(&mds->mds_notify_lock);
294                 break;
295         }
296         RETURN(rc);
297 }
298
299 static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
300                                           void *data)
301 {
302         struct obd_device *obd = data;
303         struct ll_fid fid;
304         fid.id = id;
305         fid.generation = gen;
306         return mds_fid2dentry(&obd->u.mds, &fid, NULL);
307 }
308
309
310 struct lvfs_callback_ops mds_lvfs_ops = {
311         l_fid2dentry:     mds_lvfs_fid2dentry,
312 };
313
314 quota_interface_t *mds_quota_interface_ref;
315 extern quota_interface_t mds_quota_interface;
316
317 static void mds_init_ctxt(struct obd_device *obd, struct vfsmount *mnt)
318 {
319         struct mds_obd *mds = &obd->u.mds;
320
321         mds->mds_vfsmnt = mnt;
322         /* why not mnt->mnt_sb instead of mnt->mnt_root->d_inode->i_sb? */
323         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
324
325         fsfilt_setup(obd, obd->u.obt.obt_sb);
326
327         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
328         obd->obd_lvfs_ctxt.pwdmnt = mnt;
329         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
330         obd->obd_lvfs_ctxt.fs = get_ds();
331         obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops;
332         return;
333 }
334
335 /*mds still need lov setup here*/
336 static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
337 {
338         struct mds_obd *mds = &obd->u.mds;
339         struct lvfs_run_ctxt saved;
340         const char     *dev;
341         struct vfsmount *mnt;
342         struct lustre_sb_info *lsi;
343         struct lustre_mount_info *lmi;
344         struct dentry  *dentry;
345         int rc = 0;
346         ENTRY;
347
348         CDEBUG(D_INFO, "obd %s setup \n", obd->obd_name);
349         if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
350                 RETURN(0);
351
352         if (lcfg->lcfg_bufcount < 5) {
353                 CERROR("invalid arg for setup %s\n", MDD_OBD_NAME);
354                 RETURN(-EINVAL);
355         }
356         dev = lustre_cfg_string(lcfg, 4);
357         lmi = server_get_mount(dev);
358         LASSERT(lmi != NULL);
359
360         lsi = s2lsi(lmi->lmi_sb);
361         mnt = lmi->lmi_mnt;
362         /* FIXME: MDD LOV initialize objects.
363          * we need only lmi here but not get mount
364          * OSD did mount already, so put mount back
365          */
366         atomic_dec(&lsi->lsi_mounts);
367         mntput(mnt);
368         init_rwsem(&mds->mds_notify_lock);
369
370         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
371         mds_init_ctxt(obd, mnt);
372
373         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
374         dentry = simple_mkdir(current->fs->pwd, mnt, "OBJECTS", 0777, 1);
375         if (IS_ERR(dentry)) {
376                 rc = PTR_ERR(dentry);
377                 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
378                 GOTO(err_putfs, rc);
379         }
380         mds->mds_objects_dir = dentry;
381
382         dentry = lookup_one_len("__iopen__", current->fs->pwd,
383                                 strlen("__iopen__"));
384         if (IS_ERR(dentry)) {
385                 rc = PTR_ERR(dentry);
386                 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
387                 GOTO(err_objects, rc);
388         }
389
390         mds->mds_fid_de = dentry;
391         if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
392                 rc = -ENOENT;
393                 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
394                 GOTO(err_fid, rc);
395         }
396         rc = mds_lov_init_objids(obd);
397         if (rc != 0) {
398                CERROR("cannot init lov objid rc = %d\n", rc);
399                GOTO(err_fid, rc );
400         }
401
402         rc = mds_lov_presetup(mds, lcfg);
403         if (rc < 0)
404                 GOTO(err_objects, rc);
405
406         /* Don't wait for mds_postrecov trying to clear orphans */
407         obd->obd_async_recov = 1;
408         rc = mds_postsetup(obd);
409         /* Bug 11557 - allow async abort_recov start
410            FIXME can remove most of this obd_async_recov plumbing
411         obd->obd_async_recov = 0;
412         */
413
414         if (rc)
415                 GOTO(err_objects, rc);
416
417         mds->mds_max_mdsize = sizeof(struct lov_mds_md_v3);
418         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
419
420 err_pop:
421         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
422         RETURN(rc);
423 err_fid:
424         dput(mds->mds_fid_de);
425 err_objects:
426         dput(mds->mds_objects_dir);
427 err_putfs:
428         fsfilt_put_ops(obd->obd_fsops);
429         goto err_pop;
430 }
431
432 static int mds_cmd_cleanup(struct obd_device *obd)
433 {
434         struct mds_obd *mds = &obd->u.mds;
435         struct lvfs_run_ctxt saved;
436         int rc = 0;
437         ENTRY;
438
439         mds->mds_osc_exp = NULL;
440
441         if (obd->obd_fail)
442                 LCONSOLE_WARN("%s: shutting down for failover; client state "
443                               "will be preserved.\n", obd->obd_name);
444
445         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
446
447         mds_lov_destroy_objids(obd);
448
449         if (mds->mds_objects_dir != NULL) {
450                 l_dput(mds->mds_objects_dir);
451                 mds->mds_objects_dir = NULL;
452         }
453
454         shrink_dcache_parent(mds->mds_fid_de);
455         dput(mds->mds_fid_de);
456         LL_DQUOT_OFF(obd->u.obt.obt_sb);
457         fsfilt_put_ops(obd->obd_fsops);
458
459         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
460         RETURN(rc);
461 }
462
463 #if 0
464 static int mds_cmd_health_check(struct obd_device *obd)
465 {
466         return 0;
467 }
468 #endif
469 static struct obd_ops mds_cmd_obd_ops = {
470         .o_owner           = THIS_MODULE,
471         .o_setup           = mds_cmd_setup,
472         .o_cleanup         = mds_cmd_cleanup,
473         .o_precleanup      = mds_precleanup,
474         .o_create          = mds_obd_create,
475         .o_destroy         = mds_obd_destroy,
476         .o_llog_init       = mds_llog_init,
477         .o_llog_finish     = mds_llog_finish,
478         .o_notify          = mds_notify,
479         .o_postrecov       = mds_postrecov,
480         //   .o_health_check    = mds_cmd_health_check,
481 };
482
483 static int __init mds_cmd_init(void)
484 {
485         struct lprocfs_static_vars lvars;
486
487         lprocfs_mds_init_vars(&lvars);
488         class_register_type(&mds_cmd_obd_ops, NULL, lvars.module_vars,
489                             LUSTRE_MDS_NAME, NULL);
490
491         return 0;
492 }
493
494 static void /*__exit*/ mds_cmd_exit(void)
495 {
496         class_unregister_type(LUSTRE_MDS_NAME);
497 }
498
499 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
500 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
501 MODULE_LICENSE("GPL");
502
503 module_init(mds_cmd_init);
504 module_exit(mds_cmd_exit);