Whamcloud - gitweb
b=14230
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of the Lustre file system, http://www.lustre.org
14  *   Lustre is a trademark of Cluster File Systems, Inc.
15  *
16  *   You may have signed or agreed to another license before downloading
17  *   this software.  If so, you are bound by the terms and conditions
18  *   of that agreement, and the following does not apply to you.  See the
19  *   LICENSE file included with this distribution for more information.
20  *
21  *   If you did not agree to a different license, then this copy of Lustre
22  *   is open source software; you can redistribute it and/or modify it
23  *   under the terms of version 2 of the GNU General Public License as
24  *   published by the Free Software Foundation.
25  *
26  *   In either case, Lustre is distributed in the hope that it will be
27  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
28  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
29  *   license text for more details.
30  */
31
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <lustre_mds.h>
35 #include <linux/module.h>
36 #include <linux/init.h>
37 #include <linux/random.h>
38 #include <linux/fs.h>
39 #include <linux/jbd.h>
40 #include <linux/smp_lock.h>
41 #include <linux/buffer_head.h>
42 #include <linux/workqueue.h>
43 #include <linux/mount.h>
44
45 #include <linux/lustre_acl.h>
46 #include <obd_class.h>
47 #include <lustre_dlm.h>
48 #include <obd_lov.h>
49 #include <lustre_fsfilt.h>
50 #include <lprocfs_status.h>
51 #include <lustre_commit_confd.h>
52 #include <lustre_quota.h>
53 #include <lustre_disk.h>
54 #include <lustre_param.h>
55
56 #include "mds_internal.h"
57
58 __u32 mds_max_ost_index=0xFFFF;
59 CFS_MODULE_PARM(mds_max_ost_index, "i", int, 0444,
60                 "maximal OST index");
61
62 /* Look up an entry by inode number. */
63 /* this function ONLY returns valid dget'd dentries with an initialized inode
64    or errors */
65 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
66                               struct vfsmount **mnt)
67 {
68         char fid_name[32];
69         unsigned long ino = fid->id;
70         __u32 generation = fid->generation;
71         struct inode *inode;
72         struct dentry *result;
73
74         if (ino == 0)
75                 RETURN(ERR_PTR(-ESTALE));
76
77         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
78
79         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
80                ino, generation, mds->mds_obt.obt_sb);
81
82         /* under ext3 this is neither supposed to return bad inodes
83            nor NULL inodes. */
84         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
85         if (IS_ERR(result))
86                 RETURN(result);
87
88         inode = result->d_inode;
89         if (!inode)
90                 RETURN(ERR_PTR(-ENOENT));
91
92         if (inode->i_generation == 0 || inode->i_nlink == 0) {
93                 LCONSOLE_WARN("Found inode with zero generation or link -- this"
94                               " may indicate disk corruption (inode: %lu/%u, "
95                               "link %lu, count %d)\n", inode->i_ino,
96                               inode->i_generation,(unsigned long)inode->i_nlink,
97                               atomic_read(&inode->i_count));
98                 dput(result);
99                 RETURN(ERR_PTR(-ENOENT));
100         }
101
102         if (generation && inode->i_generation != generation) {
103                 /* we didn't find the right inode.. */
104                 CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, "
105                        "count: %d, generation %u/%u\n", inode->i_ino,
106                        (unsigned long)inode->i_nlink,
107                        atomic_read(&inode->i_count), inode->i_generation,
108                        generation);
109                 dput(result);
110                 RETURN(ERR_PTR(-ENOENT));
111         }
112
113         if (mnt) {
114                 *mnt = mds->mds_vfsmnt;
115                 mntget(*mnt);
116         }
117
118         RETURN(result);
119 }
120
121 static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg)
122 {
123         int rc = 0;
124         ENTRY;
125
126         if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
127                 class_uuid_t uuid;
128
129                 ll_generate_random_uuid(uuid);
130                 class_uuid_unparse(uuid, &mds->mds_lov_uuid);
131
132                 OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
133                 if (mds->mds_profile == NULL)
134                         RETURN(-ENOMEM);
135
136                 strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
137                         LUSTRE_CFG_BUFLEN(lcfg, 3));
138         }
139         RETURN(rc);
140 }
141
142 static int mds_lov_clean(struct obd_device *obd)
143 {
144         struct mds_obd *mds = &obd->u.mds;
145         struct obd_device *osc = mds->mds_osc_obd;
146         ENTRY;
147
148         if (mds->mds_profile) {
149                 class_del_profile(mds->mds_profile);
150                 OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1);
151                 mds->mds_profile = NULL;
152         }
153
154         /* There better be a lov */
155         if (!osc)
156                 RETURN(0);
157         if (IS_ERR(osc))
158                 RETURN(PTR_ERR(osc));
159
160         obd_register_observer(osc, NULL);
161
162         /* Give lov our same shutdown flags */
163         osc->obd_force = obd->obd_force;
164         osc->obd_fail = obd->obd_fail;
165
166         /* Cleanup the lov */
167         obd_disconnect(mds->mds_osc_exp);
168         class_manual_cleanup(osc);
169         mds->mds_osc_exp = NULL;
170
171         RETURN(0);
172 }
173
174 static int mds_postsetup(struct obd_device *obd)
175 {
176         struct mds_obd *mds = &obd->u.mds;
177         int rc = 0;
178         ENTRY;
179
180         rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
181                         &llog_lvfs_ops);
182         if (rc)
183                 RETURN(rc);
184
185         rc = llog_setup(obd, &obd->obd_olg, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
186                         &llog_lvfs_ops);
187         if (rc)
188                 RETURN(rc);
189
190         if (mds->mds_profile) {
191                 struct lustre_profile *lprof;
192                 /* The profile defines which osc and mdc to connect to, for a
193                    client.  We reuse that here to figure out the name of the
194                    lov to use (and ignore lprof->lp_md).
195                    The profile was set in the config log with
196                    LCFG_MOUNTOPT profilenm oscnm mdcnm */
197                 lprof = class_get_profile(mds->mds_profile);
198                 if (lprof == NULL) {
199                         CERROR("No profile found: %s\n", mds->mds_profile);
200                         GOTO(err_cleanup, rc = -ENOENT);
201                 }
202                 rc = mds_lov_connect(obd, lprof->lp_dt);
203                 if (rc)
204                         GOTO(err_cleanup, rc);
205         }
206
207         RETURN(rc);
208
209 err_cleanup:
210         mds_lov_clean(obd);
211         llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
212         llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
213         RETURN(rc);
214 }
215
216 int mds_postrecov(struct obd_device *obd)
217 {
218         int rc = 0;
219         ENTRY;
220
221         if (obd->obd_fail)
222                 RETURN(0);
223
224         LASSERT(!obd->obd_recovering);
225         LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
226
227         /* clean PENDING dir */
228 #if 0
229         if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
230                 rc = mds_cleanup_pending(obd);
231                 if (rc < 0)
232                         GOTO(out, rc);
233 #endif
234         /* FIXME Does target_finish_recovery really need this to block? */
235         /* Notify the LOV, which will in turn call mds_notify for each tgt */
236         /* This means that we have to hack obd_notify to think we're obd_set_up
237            during mds_lov_connect. */
238         obd_notify(obd->u.mds.mds_osc_obd, NULL,
239                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
240                    OBD_NOTIFY_SYNC, NULL);
241
242         /* quota recovery */
243         lquota_recovery(mds_quota_interface_ref, obd);
244
245         RETURN(rc);
246 }
247
248 /* We need to be able to stop an mds_lov_synchronize */
249 static int mds_lov_early_clean(struct obd_device *obd)
250 {
251         struct mds_obd *mds = &obd->u.mds;
252         struct obd_device *osc = mds->mds_osc_obd;
253
254         if (!osc || (!obd->obd_force && !obd->obd_fail))
255                 return(0);
256
257         CDEBUG(D_HA, "abort inflight\n");
258         return (obd_precleanup(osc, OBD_CLEANUP_EARLY));
259 }
260
261 static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
262 {
263         int rc = 0;
264         ENTRY;
265
266         switch (stage) {
267         case OBD_CLEANUP_EARLY:
268                 break;
269         case OBD_CLEANUP_EXPORTS:
270                 /*XXX Use this for mdd mds cleanup, so comment out
271                  *this target_cleanup_recovery for this tmp MDD MDS
272                  *Wangdi*/
273                 if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
274                         target_cleanup_recovery(obd);
275                 mds_lov_early_clean(obd);
276                 break;
277         case OBD_CLEANUP_SELF_EXP:
278                 mds_lov_disconnect(obd);
279                 mds_lov_clean(obd);
280                 llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
281                 llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
282                 rc = obd_llog_finish(obd, 0);
283                 break;
284         case OBD_CLEANUP_OBD:
285                 break;
286         }
287         RETURN(rc);
288 }
289
290 static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
291                                           void *data)
292 {
293         struct obd_device *obd = data;
294         struct ll_fid fid;
295         fid.id = id;
296         fid.generation = gen;
297         return mds_fid2dentry(&obd->u.mds, &fid, NULL);
298 }
299
300
301 struct lvfs_callback_ops mds_lvfs_ops = {
302         l_fid2dentry:     mds_lvfs_fid2dentry,
303 };
304
305 quota_interface_t *mds_quota_interface_ref;
306 extern quota_interface_t mds_quota_interface;
307
308 static void mds_init_ctxt(struct obd_device *obd, struct vfsmount *mnt)
309 {
310         struct mds_obd *mds = &obd->u.mds;
311
312         mds->mds_vfsmnt = mnt;
313         /* why not mnt->mnt_sb instead of mnt->mnt_root->d_inode->i_sb? */
314         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
315
316         fsfilt_setup(obd, obd->u.obt.obt_sb);
317
318         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
319         obd->obd_lvfs_ctxt.pwdmnt = mnt;
320         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
321         obd->obd_lvfs_ctxt.fs = get_ds();
322         obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops;
323         return;
324 }
325
326 /*mds still need lov setup here*/
327 static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
328 {
329         struct mds_obd *mds = &obd->u.mds;
330         struct lvfs_run_ctxt saved;
331         const char     *dev;
332         struct vfsmount *mnt;
333         struct lustre_sb_info *lsi;
334         struct lustre_mount_info *lmi;
335         struct dentry  *dentry;
336         int rc = 0;
337         ENTRY;
338
339         CDEBUG(D_INFO, "obd %s setup \n", obd->obd_name);
340         if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
341                 RETURN(0);
342
343         if (lcfg->lcfg_bufcount < 5) {
344                 CERROR("invalid arg for setup %s\n", MDD_OBD_NAME);
345                 RETURN(-EINVAL);
346         }
347         dev = lustre_cfg_string(lcfg, 4);
348         lmi = server_get_mount(dev);
349         LASSERT(lmi != NULL);
350
351         lsi = s2lsi(lmi->lmi_sb);
352         mnt = lmi->lmi_mnt;
353         /* FIXME: MDD LOV initialize objects.
354          * we need only lmi here but not get mount
355          * OSD did mount already, so put mount back
356          */
357         atomic_dec(&lsi->lsi_mounts);
358         mntput(mnt);
359
360         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
361         mds_init_ctxt(obd, mnt);
362
363         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
364         dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1);
365         if (IS_ERR(dentry)) {
366                 rc = PTR_ERR(dentry);
367                 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
368                 GOTO(err_putfs, rc);
369         }
370         mds->mds_objects_dir = dentry;
371
372         dentry = lookup_one_len("__iopen__", current->fs->pwd,
373                                 strlen("__iopen__"));
374         if (IS_ERR(dentry)) {
375                 rc = PTR_ERR(dentry);
376                 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
377                 GOTO(err_objects, rc);
378         }
379
380         mds->mds_fid_de = dentry;
381         if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
382                 rc = -ENOENT;
383                 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
384                 GOTO(err_fid, rc);
385         }
386         rc = mds_lov_init_objids(obd);
387         if (rc != 0) {
388                CERROR("cannot init lov objid rc = %d\n", rc);
389                GOTO(err_fid, rc );
390         }
391
392         rc = mds_lov_presetup(mds, lcfg);
393         if (rc < 0)
394                 GOTO(err_objects, rc);
395
396         /* Don't wait for mds_postrecov trying to clear orphans */
397         obd->obd_async_recov = 1;
398         rc = mds_postsetup(obd);
399         /* Bug 11557 - allow async abort_recov start
400            FIXME can remove most of this obd_async_recov plumbing
401         obd->obd_async_recov = 0;
402         */
403
404         if (rc)
405                 GOTO(err_objects, rc);
406
407         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
408         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
409
410 err_pop:
411         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
412         RETURN(rc);
413 err_fid:
414         dput(mds->mds_fid_de);
415 err_objects:
416         dput(mds->mds_objects_dir);
417 err_putfs:
418         fsfilt_put_ops(obd->obd_fsops);
419         goto err_pop;
420 }
421
422 static int mds_cmd_cleanup(struct obd_device *obd)
423 {
424         struct mds_obd *mds = &obd->u.mds;
425         struct lvfs_run_ctxt saved;
426         int rc = 0;
427         ENTRY;
428
429         if (obd->obd_fail)
430                 LCONSOLE_WARN("%s: shutting down for failover; client state "
431                               "will be preserved.\n", obd->obd_name);
432
433         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
434
435         mds_lov_destroy_objids(obd);
436
437         if (mds->mds_objects_dir != NULL) {
438                 l_dput(mds->mds_objects_dir);
439                 mds->mds_objects_dir = NULL;
440         }
441
442         shrink_dcache_parent(mds->mds_fid_de);
443         dput(mds->mds_fid_de);
444         LL_DQUOT_OFF(obd->u.obt.obt_sb);
445         fsfilt_put_ops(obd->obd_fsops);
446
447         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
448         RETURN(rc);
449 }
450
451 #if 0
452 static int mds_cmd_health_check(struct obd_device *obd)
453 {
454         return 0;
455 }
456 #endif
457 static struct obd_ops mds_cmd_obd_ops = {
458         .o_owner           = THIS_MODULE,
459         .o_setup           = mds_cmd_setup,
460         .o_cleanup         = mds_cmd_cleanup,
461         .o_precleanup      = mds_precleanup,
462         .o_create          = mds_obd_create,
463         .o_destroy         = mds_obd_destroy,
464         .o_llog_init       = mds_llog_init,
465         .o_llog_finish     = mds_llog_finish,
466         .o_notify          = mds_notify,
467         .o_postrecov       = mds_postrecov,
468         //   .o_health_check    = mds_cmd_health_check,
469 };
470
471 static int __init mds_cmd_init(void)
472 {
473         struct lprocfs_static_vars lvars;
474
475         lprocfs_mds_init_vars(&lvars);
476         class_register_type(&mds_cmd_obd_ops, NULL, lvars.module_vars,
477                             LUSTRE_MDS_NAME, NULL);
478
479         return 0;
480 }
481
482 static void /*__exit*/ mds_cmd_exit(void)
483 {
484         class_unregister_type(LUSTRE_MDS_NAME);
485 }
486
487 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
488 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
489 MODULE_LICENSE("GPL");
490
491 module_init(mds_cmd_init);
492 module_exit(mds_cmd_exit);