Whamcloud - gitweb
b=15516
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of the Lustre file system, http://www.lustre.org
14  *   Lustre is a trademark of Cluster File Systems, Inc.
15  *
16  *   You may have signed or agreed to another license before downloading
17  *   this software.  If so, you are bound by the terms and conditions
18  *   of that agreement, and the following does not apply to you.  See the
19  *   LICENSE file included with this distribution for more information.
20  *
21  *   If you did not agree to a different license, then this copy of Lustre
22  *   is open source software; you can redistribute it and/or modify it
23  *   under the terms of version 2 of the GNU General Public License as
24  *   published by the Free Software Foundation.
25  *
26  *   In either case, Lustre is distributed in the hope that it will be
27  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
28  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
29  *   license text for more details.
30  */
31
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <lustre_mds.h>
35 #include <linux/module.h>
36 #include <linux/init.h>
37 #include <linux/random.h>
38 #include <linux/fs.h>
39 #include <linux/jbd.h>
40 #include <linux/smp_lock.h>
41 #include <linux/buffer_head.h>
42 #include <linux/workqueue.h>
43 #include <linux/mount.h>
44
45 #include <linux/lustre_acl.h>
46 #include <obd_class.h>
47 #include <lustre_dlm.h>
48 #include <obd_lov.h>
49 #include <lustre_fsfilt.h>
50 #include <lprocfs_status.h>
51 #include <lustre_commit_confd.h>
52 #include <lustre_quota.h>
53 #include <lustre_disk.h>
54 #include <lustre_param.h>
55
56 #include "mds_internal.h"
57
58 __u32 mds_max_ost_index=0xFFFF;
59 CFS_MODULE_PARM(mds_max_ost_index, "i", int, 0444,
60                 "maximal OST index");
61
62 /* Look up an entry by inode number. */
63 /* this function ONLY returns valid dget'd dentries with an initialized inode
64    or errors */
65 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
66                               struct vfsmount **mnt)
67 {
68         char fid_name[32];
69         unsigned long ino = fid->id;
70         __u32 generation = fid->generation;
71         struct inode *inode;
72         struct dentry *result;
73
74         if (ino == 0)
75                 RETURN(ERR_PTR(-ESTALE));
76
77         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
78
79         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
80                ino, generation, mds->mds_obt.obt_sb);
81
82         /* under ext3 this is neither supposed to return bad inodes
83            nor NULL inodes. */
84         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
85         if (IS_ERR(result))
86                 RETURN(result);
87
88         inode = result->d_inode;
89         if (!inode)
90                 RETURN(ERR_PTR(-ENOENT));
91
92         if (inode->i_generation == 0 || inode->i_nlink == 0) {
93                 LCONSOLE_WARN("Found inode with zero generation or link -- this"
94                               " may indicate disk corruption (inode: %lu/%u, "
95                               "link %lu, count %d)\n", inode->i_ino,
96                               inode->i_generation,(unsigned long)inode->i_nlink,
97                               atomic_read(&inode->i_count));
98                 dput(result);
99                 RETURN(ERR_PTR(-ENOENT));
100         }
101
102         if (generation && inode->i_generation != generation) {
103                 /* we didn't find the right inode.. */
104                 CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, "
105                        "count: %d, generation %u/%u\n", inode->i_ino,
106                        (unsigned long)inode->i_nlink,
107                        atomic_read(&inode->i_count), inode->i_generation,
108                        generation);
109                 dput(result);
110                 RETURN(ERR_PTR(-ENOENT));
111         }
112
113         if (mnt) {
114                 *mnt = mds->mds_vfsmnt;
115                 mntget(*mnt);
116         }
117
118         RETURN(result);
119 }
120
121 static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg)
122 {
123         int rc = 0;
124         ENTRY;
125
126         if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
127                 class_uuid_t uuid;
128
129                 ll_generate_random_uuid(uuid);
130                 class_uuid_unparse(uuid, &mds->mds_lov_uuid);
131
132                 OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
133                 if (mds->mds_profile == NULL)
134                         RETURN(-ENOMEM);
135
136                 strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
137                         LUSTRE_CFG_BUFLEN(lcfg, 3));
138         }
139         RETURN(rc);
140 }
141
142 static int mds_lov_clean(struct obd_device *obd)
143 {
144         struct mds_obd *mds = &obd->u.mds;
145         struct obd_device *osc = mds->mds_osc_obd;
146         ENTRY;
147
148         if (mds->mds_profile) {
149                 class_del_profile(mds->mds_profile);
150                 OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1);
151                 mds->mds_profile = NULL;
152         }
153
154         /* There better be a lov */
155         if (!osc)
156                 RETURN(0);
157         if (IS_ERR(osc))
158                 RETURN(PTR_ERR(osc));
159
160         obd_register_observer(osc, NULL);
161
162         /* Give lov our same shutdown flags */
163         osc->obd_force = obd->obd_force;
164         osc->obd_fail = obd->obd_fail;
165
166         /* Cleanup the lov */
167         obd_disconnect(mds->mds_osc_exp);
168         class_manual_cleanup(osc);
169
170         RETURN(0);
171 }
172
173 static int mds_postsetup(struct obd_device *obd)
174 {
175         struct mds_obd *mds = &obd->u.mds;
176         int rc = 0;
177         ENTRY;
178
179         rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
180                         &llog_lvfs_ops);
181         if (rc)
182                 RETURN(rc);
183
184         rc = llog_setup(obd, &obd->obd_olg, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
185                         &llog_lvfs_ops);
186         if (rc)
187                 RETURN(rc);
188
189         if (mds->mds_profile) {
190                 struct lustre_profile *lprof;
191                 /* The profile defines which osc and mdc to connect to, for a
192                    client.  We reuse that here to figure out the name of the
193                    lov to use (and ignore lprof->lp_md).
194                    The profile was set in the config log with
195                    LCFG_MOUNTOPT profilenm oscnm mdcnm */
196                 lprof = class_get_profile(mds->mds_profile);
197                 if (lprof == NULL) {
198                         CERROR("No profile found: %s\n", mds->mds_profile);
199                         GOTO(err_cleanup, rc = -ENOENT);
200                 }
201                 rc = mds_lov_connect(obd, lprof->lp_dt);
202                 if (rc)
203                         GOTO(err_cleanup, rc);
204         }
205
206         RETURN(rc);
207
208 err_cleanup:
209         mds_lov_clean(obd);
210         llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
211         llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
212         RETURN(rc);
213 }
214
215 int mds_postrecov(struct obd_device *obd)
216 {
217         int rc = 0;
218         ENTRY;
219
220         if (obd->obd_fail)
221                 RETURN(0);
222
223         LASSERT(!obd->obd_recovering);
224         LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
225
226         /* clean PENDING dir */
227 #if 0
228         if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
229                 rc = mds_cleanup_pending(obd);
230                 if (rc < 0)
231                         GOTO(out, rc);
232 #endif
233         /* FIXME Does target_finish_recovery really need this to block? */
234         /* Notify the LOV, which will in turn call mds_notify for each tgt */
235         /* This means that we have to hack obd_notify to think we're obd_set_up
236            during mds_lov_connect. */
237         obd_notify(obd->u.mds.mds_osc_obd, NULL,
238                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
239                    OBD_NOTIFY_SYNC, NULL);
240
241         /* quota recovery */
242         lquota_recovery(mds_quota_interface_ref, obd);
243
244         RETURN(rc);
245 }
246
247 /* We need to be able to stop an mds_lov_synchronize */
248 static int mds_lov_early_clean(struct obd_device *obd)
249 {
250         struct mds_obd *mds = &obd->u.mds;
251         struct obd_device *osc = mds->mds_osc_obd;
252
253         if (!osc || (!obd->obd_force && !obd->obd_fail))
254                 return(0);
255
256         CDEBUG(D_HA, "abort inflight\n");
257         return (obd_precleanup(osc, OBD_CLEANUP_EARLY));
258 }
259
260 static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
261 {
262         int rc = 0;
263         struct mds_obd *mds = &obd->u.mds;
264         ENTRY;
265
266         switch (stage) {
267         case OBD_CLEANUP_EARLY:
268                 break;
269         case OBD_CLEANUP_EXPORTS:
270                 mds_lov_early_clean(obd);
271                 down_write(&mds->mds_notify_lock);
272                 mds_lov_disconnect(obd);
273                 mds_lov_clean(obd);
274                 llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
275                 llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
276                 rc = obd_llog_finish(obd, 0);
277                 mds->mds_osc_exp = NULL;
278                 up_write(&mds->mds_notify_lock);
279                 break;
280         }
281         RETURN(rc);
282 }
283
284 static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
285                                           void *data)
286 {
287         struct obd_device *obd = data;
288         struct ll_fid fid;
289         fid.id = id;
290         fid.generation = gen;
291         return mds_fid2dentry(&obd->u.mds, &fid, NULL);
292 }
293
294
295 struct lvfs_callback_ops mds_lvfs_ops = {
296         l_fid2dentry:     mds_lvfs_fid2dentry,
297 };
298
299 quota_interface_t *mds_quota_interface_ref;
300 extern quota_interface_t mds_quota_interface;
301
302 static void mds_init_ctxt(struct obd_device *obd, struct vfsmount *mnt)
303 {
304         struct mds_obd *mds = &obd->u.mds;
305
306         mds->mds_vfsmnt = mnt;
307         /* why not mnt->mnt_sb instead of mnt->mnt_root->d_inode->i_sb? */
308         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
309
310         fsfilt_setup(obd, obd->u.obt.obt_sb);
311
312         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
313         obd->obd_lvfs_ctxt.pwdmnt = mnt;
314         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
315         obd->obd_lvfs_ctxt.fs = get_ds();
316         obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops;
317         return;
318 }
319
320 /*mds still need lov setup here*/
321 static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
322 {
323         struct mds_obd *mds = &obd->u.mds;
324         struct lvfs_run_ctxt saved;
325         const char     *dev;
326         struct vfsmount *mnt;
327         struct lustre_sb_info *lsi;
328         struct lustre_mount_info *lmi;
329         struct dentry  *dentry;
330         int rc = 0;
331         ENTRY;
332
333         CDEBUG(D_INFO, "obd %s setup \n", obd->obd_name);
334         if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
335                 RETURN(0);
336
337         if (lcfg->lcfg_bufcount < 5) {
338                 CERROR("invalid arg for setup %s\n", MDD_OBD_NAME);
339                 RETURN(-EINVAL);
340         }
341         dev = lustre_cfg_string(lcfg, 4);
342         lmi = server_get_mount(dev);
343         LASSERT(lmi != NULL);
344
345         lsi = s2lsi(lmi->lmi_sb);
346         mnt = lmi->lmi_mnt;
347         /* FIXME: MDD LOV initialize objects.
348          * we need only lmi here but not get mount
349          * OSD did mount already, so put mount back
350          */
351         atomic_dec(&lsi->lsi_mounts);
352         mntput(mnt);
353         init_rwsem(&mds->mds_notify_lock);
354
355         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
356         mds_init_ctxt(obd, mnt);
357
358         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
359         dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1);
360         if (IS_ERR(dentry)) {
361                 rc = PTR_ERR(dentry);
362                 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
363                 GOTO(err_putfs, rc);
364         }
365         mds->mds_objects_dir = dentry;
366
367         dentry = lookup_one_len("__iopen__", current->fs->pwd,
368                                 strlen("__iopen__"));
369         if (IS_ERR(dentry)) {
370                 rc = PTR_ERR(dentry);
371                 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
372                 GOTO(err_objects, rc);
373         }
374
375         mds->mds_fid_de = dentry;
376         if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
377                 rc = -ENOENT;
378                 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
379                 GOTO(err_fid, rc);
380         }
381         rc = mds_lov_init_objids(obd);
382         if (rc != 0) {
383                CERROR("cannot init lov objid rc = %d\n", rc);
384                GOTO(err_fid, rc );
385         }
386
387         rc = mds_lov_presetup(mds, lcfg);
388         if (rc < 0)
389                 GOTO(err_objects, rc);
390
391         /* Don't wait for mds_postrecov trying to clear orphans */
392         obd->obd_async_recov = 1;
393         rc = mds_postsetup(obd);
394         /* Bug 11557 - allow async abort_recov start
395            FIXME can remove most of this obd_async_recov plumbing
396         obd->obd_async_recov = 0;
397         */
398
399         if (rc)
400                 GOTO(err_objects, rc);
401
402         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
403         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
404
405 err_pop:
406         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
407         RETURN(rc);
408 err_fid:
409         dput(mds->mds_fid_de);
410 err_objects:
411         dput(mds->mds_objects_dir);
412 err_putfs:
413         fsfilt_put_ops(obd->obd_fsops);
414         goto err_pop;
415 }
416
417 static int mds_cmd_cleanup(struct obd_device *obd)
418 {
419         struct mds_obd *mds = &obd->u.mds;
420         struct lvfs_run_ctxt saved;
421         int rc = 0;
422         ENTRY;
423
424         mds->mds_osc_exp = NULL;
425
426         if (obd->obd_fail)
427                 LCONSOLE_WARN("%s: shutting down for failover; client state "
428                               "will be preserved.\n", obd->obd_name);
429
430         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
431
432         mds_lov_destroy_objids(obd);
433
434         if (mds->mds_objects_dir != NULL) {
435                 l_dput(mds->mds_objects_dir);
436                 mds->mds_objects_dir = NULL;
437         }
438
439         shrink_dcache_parent(mds->mds_fid_de);
440         dput(mds->mds_fid_de);
441         LL_DQUOT_OFF(obd->u.obt.obt_sb);
442         fsfilt_put_ops(obd->obd_fsops);
443
444         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
445         RETURN(rc);
446 }
447
448 #if 0
449 static int mds_cmd_health_check(struct obd_device *obd)
450 {
451         return 0;
452 }
453 #endif
454 static struct obd_ops mds_cmd_obd_ops = {
455         .o_owner           = THIS_MODULE,
456         .o_setup           = mds_cmd_setup,
457         .o_cleanup         = mds_cmd_cleanup,
458         .o_precleanup      = mds_precleanup,
459         .o_create          = mds_obd_create,
460         .o_destroy         = mds_obd_destroy,
461         .o_llog_init       = mds_llog_init,
462         .o_llog_finish     = mds_llog_finish,
463         .o_notify          = mds_notify,
464         .o_postrecov       = mds_postrecov,
465         //   .o_health_check    = mds_cmd_health_check,
466 };
467
468 static int __init mds_cmd_init(void)
469 {
470         struct lprocfs_static_vars lvars;
471
472         lprocfs_mds_init_vars(&lvars);
473         class_register_type(&mds_cmd_obd_ops, NULL, lvars.module_vars,
474                             LUSTRE_MDS_NAME, NULL);
475
476         return 0;
477 }
478
479 static void /*__exit*/ mds_cmd_exit(void)
480 {
481         class_unregister_type(LUSTRE_MDS_NAME);
482 }
483
484 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
485 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
486 MODULE_LICENSE("GPL");
487
488 module_init(mds_cmd_init);
489 module_exit(mds_cmd_exit);