Whamcloud - gitweb
Add separate file for MDS filesystem interaction routines.
[fs/lustre-release.git] / lustre / mds / mds_fs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_fs.c
5  *
6  *  Lustre Metadata Server (MDS) filesystem interface code
7  *
8  *  Copyright (C) 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Andreas Dilger <adilger@clusterfs.com>
14  *
15  */
16
17 #define EXPORT_SYMTAB
18 #define DEBUG_SUBSYSTEM S_MDS
19
20 #include <linux/module.h>
21 #include <linux/kmod.h>
22 #include <linux/lustre_mds.h>
23
24 LIST_HEAD(mds_fs_types);
25
26 struct mds_fs_type {
27         struct list_head                 mft_list;
28         struct mds_fs_operations        *mft_ops;
29         char                            *mft_name;
30 };
31
32 /* This will be a hash table at some point. */
33 static int mds_init_client_data(struct mds_obd *mds)
34 {
35         INIT_LIST_HEAD(&mds->mds_client_info);
36         return 0;
37 }
38
39 #define MDS_MAX_CLIENTS 1024
40 #define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / sizeof(unsigned long))
41
42 static unsigned long last_rcvd_slots[MDS_MAX_CLIENT_WORDS];
43
44 /* Add client data to the MDS.  The in-memory storage will be a hash at some
45  * point.  We use a bitmap to locate a free space in the last_rcvd file if
46  * cl_off is -1 (i.e. a new client).  Otherwise, we have just read the data
47  * from the last_rcvd file and we know its offset.
48  */
49 int mds_client_add(struct mds_obd *mds, struct mds_client_data *mcd, int cl_off)
50 {
51         struct mds_client_info *mci;
52
53         OBD_ALLOC(mci, sizeof(*mci));
54         if (!mci) {
55                 CERROR("no memory for MDS client info\n");
56                 RETURN(-ENOMEM);
57         }
58         INIT_LIST_HEAD(&mci->mci_open_head);
59
60         CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
61                cl_off, mcd->mcd_uuid);
62
63         if (cl_off == -1) {
64                 unsigned long *word;
65                 int bit;
66
67         repeat:
68                 word = last_rcvd_slots;
69                 while(*word == ~0UL)
70                         ++word;
71                 if (word - last_rcvd_slots >= MDS_MAX_CLIENT_WORDS) {
72                         CERROR("no room in client MDS bitmap - fix code\n");
73                         return -ENOMEM;
74                 }
75                 bit = ffz(*word);
76                 if (test_and_set_bit(bit, word)) {
77                         CERROR("found bit %d set for word %d - fix code\n",
78                                bit, word - last_rcvd_slots);
79                         goto repeat;
80                 }
81                 cl_off = word - last_rcvd_slots + bit;
82         } else {
83                 if (test_and_set_bit(cl_off, last_rcvd_slots)) {
84                         CERROR("bit %d already set in bitmap - bad bad\n",
85                                cl_off);
86                         LBUG();
87                 }
88         }
89
90         mci->mci_mcd = mcd;
91         mci->mci_off = cl_off;
92
93         /* For now we just put the clients in a list, not a hashed list */
94         list_add_tail(&mci->mci_list, &mds->mds_client_info);
95
96         mds->mds_client_count++;
97
98         return 0;
99 }
100
101 void mds_client_del(struct mds_obd *mds, struct mds_client_info *mci)
102 {
103         unsigned long *word;
104         int bit;
105
106         word = last_rcvd_slots + mci->mci_off / sizeof(unsigned long);
107         bit = mci->mci_off % sizeof(unsigned long);
108
109         if (!test_and_clear_bit(bit, word)) {
110                 CERROR("bit %d already clear in word %d - bad bad\n",
111                        bit, word - last_rcvd_slots);
112                 LBUG();
113         }
114
115         --mds->mds_client_count;
116         list_del(&mci->mci_list);
117         OBD_FREE(mci->mci_mcd, sizeof(*mci->mci_mcd));
118         OBD_FREE(mci, sizeof (*mci));
119 }
120
121 static int mds_client_free_all(struct mds_obd *mds)
122 {
123         struct list_head *p, *n;
124
125         list_for_each_safe(p, n, &mds->mds_client_info) {
126                 struct mds_client_info *mci;
127
128                 mci = list_entry(p, struct mds_client_info, mci_list);
129                 mds_client_del(mds, mci);
130         }
131
132         return 0;
133 }
134
135 static int mds_server_free_data(struct mds_obd *mds)
136 {
137         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
138         mds->mds_server_data = NULL;
139
140         return 0;
141 }
142
143 #define LAST_RCVD "last_rcvd"
144
145 static int mds_read_last_rcvd(struct mds_obd *mds, struct file *f)
146 {
147         struct mds_server_data *msd;
148         struct mds_client_data *mcd = NULL;
149         loff_t fsize = f->f_dentry->d_inode->i_size;
150         loff_t off = 0;
151         int cl_off;
152         __u64 last_rcvd = 0;
153         __u64 last_mount;
154         int rc = 0;
155
156         OBD_ALLOC(msd, sizeof(*msd));
157         if (!msd)
158                 RETURN(-ENOMEM);
159         rc = lustre_fread(f, (char *)msd, sizeof(*msd), &off);
160
161         mds->mds_server_data = msd;
162         if (rc == 0) {
163                 CERROR("empty MDS %s, new MDS?\n", LAST_RCVD);
164                 RETURN(0);
165         } else if (rc != sizeof(*msd)) {
166                 CERROR("error reading MDS %s: rc = %d\n", LAST_RCVD, rc);
167                 if (rc > 0) {
168                         rc = -EIO;
169                 }
170                 GOTO(err_msd, rc);
171         }
172
173         /*
174          * When we do a clean MDS shutdown, we save the last_rcvd into
175          * the header.  If we find clients with higher last_rcvd values
176          * then those clients may need recovery done.
177          */
178         last_rcvd = le64_to_cpu(msd->msd_last_rcvd);
179         mds->mds_last_rcvd = last_rcvd;
180         CDEBUG(D_INODE, "got %Lu for server last_rcvd value\n",
181                (unsigned long long)last_rcvd);
182
183         last_mount = le64_to_cpu(msd->msd_mount_count);
184         mds->mds_mount_count = last_mount;
185         CDEBUG(D_INODE, "got %Lu for server last_mount value\n",
186                (unsigned long long)last_mount);
187
188         for (off = MDS_LR_CLIENT, cl_off = 0, rc = sizeof(*mcd);
189              off <= fsize - sizeof(*mcd) && rc == sizeof(*mcd);
190              off = MDS_LR_CLIENT + ++cl_off * MDS_LR_SIZE) {
191                 if (!mcd)
192                         OBD_ALLOC(mcd, sizeof(*mcd));
193                 if (!mcd)
194                         GOTO(err_msd, rc = -ENOMEM);
195
196                 rc = lustre_fread(f, (char *)mcd, sizeof(*mcd), &off);
197                 if (rc != sizeof(*mcd)) {
198                         CERROR("error reading MDS %s offset %d: rc = %d\n",
199                                LAST_RCVD, cl_off, rc);
200                         if (rc > 0)
201                                 rc = -EIO;
202                         break;
203                 }
204
205                 last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
206                 last_mount = le64_to_cpu(mcd->mcd_mount_count);
207
208                 if (last_rcvd &&
209                     last_mount - mcd->mcd_mount_count < MDS_MOUNT_RECOV) {
210                         rc = mds_client_add(mds, mcd, cl_off);
211                         if (rc) {
212                                 rc = 0;
213                                 break;
214                         }
215                         mcd = NULL;
216                 } else {
217                         CDEBUG(D_INFO,
218                                "client at offset %d with UUID '%s' ignored\n",
219                                cl_off, mcd->mcd_uuid);
220                 }
221
222                 if (last_rcvd > mds->mds_last_rcvd) {
223                         CDEBUG(D_OTHER,
224                                "client at offset %d has last_rcvd = %Lu\n",
225                                cl_off, (unsigned long long)last_rcvd);
226                         mds->mds_last_rcvd = last_rcvd;
227                 }
228         }
229         CDEBUG(D_INODE, "got %Lu for highest last_rcvd value, %d clients\n",
230                (unsigned long long)mds->mds_last_rcvd, mds->mds_client_count);
231
232         /* After recovery, there can be no local uncommitted transactions */
233         mds->mds_last_committed = mds->mds_last_rcvd;
234
235         return 0;
236
237 err_msd:
238         mds_server_free_data(mds);
239         return rc;
240 }
241
242 static int mds_fs_prep(struct mds_obd *mds)
243 {
244         struct obd_run_ctxt saved;
245         struct dentry *dentry;
246         struct file *f;
247         int rc;
248
249         push_ctxt(&saved, &mds->mds_ctxt);
250         dentry = simple_mkdir(current->fs->pwd, "ROOT", 0700);
251         if (IS_ERR(dentry)) {
252                 rc = PTR_ERR(dentry);
253                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
254                 GOTO(err_pop, rc);
255         }
256         /* XXX probably want to hold on to this later... */
257         dput(dentry);
258         f = filp_open("ROOT", O_RDONLY, 0);
259         if (IS_ERR(f)) {
260                 rc = PTR_ERR(f);
261                 CERROR("cannot open ROOT: rc = %d\n", rc);
262                 LBUG();
263                 GOTO(err_pop, rc);
264         }
265
266         mds->mds_rootfid.id = f->f_dentry->d_inode->i_ino;
267         mds->mds_rootfid.generation = f->f_dentry->d_inode->i_generation;
268         mds->mds_rootfid.f_type = S_IFDIR;
269
270         rc = filp_close(f, 0);
271         if (rc) {
272                 CERROR("cannot close ROOT: rc = %d\n", rc);
273                 LBUG();
274         }
275
276         dentry = simple_mkdir(current->fs->pwd, "FH", 0700);
277         if (IS_ERR(dentry)) {
278                 rc = PTR_ERR(dentry);
279                 CERROR("cannot create FH directory: rc = %d\n", rc);
280                 GOTO(err_pop, rc);
281         }
282         /* XXX probably want to hold on to this later... */
283         dput(dentry);
284
285         rc = mds_init_client_data(mds);
286         if (rc)
287                 GOTO(err_pop, rc);
288
289         f = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
290         if (IS_ERR(f)) {
291                 rc = PTR_ERR(f);
292                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
293                 GOTO(err_pop, rc = PTR_ERR(f));
294         }
295         if (!S_ISREG(f->f_dentry->d_inode->i_mode)) {
296                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
297                        f->f_dentry->d_inode->i_mode);
298                 GOTO(err_pop, rc = -ENOENT);
299         }
300
301         rc = mds_fs_journal_data(mds, f);
302         if (rc) {
303                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
304                 GOTO(err_filp, rc);
305         }
306
307         rc = mds_read_last_rcvd(mds, f);
308         if (rc) {
309                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
310                 GOTO(err_client, rc);
311         }
312         mds->mds_rcvd_filp = f;
313         pop_ctxt(&saved);
314
315         RETURN(0);
316
317 err_client:
318         mds_client_free_all(mds);
319 err_filp:
320         if (filp_close(f, 0))
321                 CERROR("can't close %s after error\n", LAST_RCVD);
322 err_pop:
323         pop_ctxt(&saved);
324
325         return rc;
326 }
327
328 static struct mds_fs_operations *mds_search_fs_type(const char *name)
329 {
330         struct list_head *p;
331         struct mds_fs_type *type;
332
333         /* lock mds_fs_types list */
334         list_for_each(p, &mds_fs_types) {
335                 type = list_entry(p, struct mds_fs_type, mft_list);
336                 if (!strcmp(type->mft_name, name)) {
337                         /* unlock mds_fs_types list */
338                         return type->mft_ops;
339                 }
340         }
341         /* unlock mds_fs_types list */
342         return NULL;
343 }
344
345 int mds_register_fs_type(struct mds_fs_operations *ops, const char *name)
346 {
347         struct mds_fs_operations *found;
348         struct mds_fs_type *type;
349
350         if ((found = mds_search_fs_type(name))) {
351                 if (found != ops) {
352                         CERROR("different operations for type %s\n", name);
353                         RETURN(-EEXIST);
354                 }
355                 return 0;
356         }
357         OBD_ALLOC(type, sizeof(*type));
358         if (!type)
359                 RETURN(-ENOMEM);
360
361         INIT_LIST_HEAD(&type->mft_list);
362         type->mft_ops = ops;
363         type->mft_name = strdup(name);
364         if (!type->mft_name) {
365                 OBD_FREE(type, sizeof(*type));
366                 RETURN(-ENOMEM);
367         }
368         MOD_INC_USE_COUNT;
369         list_add(&type->mft_list, &mds_fs_types);
370
371         return 0;
372 }
373
374 void mds_unregister_fs_type(const char *name)
375 {
376         struct list_head *p;
377
378         /* lock mds_fs_types list */
379         list_for_each(p, &mds_fs_types) {
380                 struct mds_fs_type *type;
381
382                 type = list_entry(p, struct mds_fs_type, mft_list);
383                 if (!strcmp(type->mft_name, name)) {
384                         list_del(p);
385                         kfree(type->mft_name);
386                         OBD_FREE(type, sizeof(*type));
387                         MOD_DEC_USE_COUNT;
388                         break;
389                 }
390         }
391         /* unlock mds_fs_types list */
392 }
393
394 int mds_fs_setup(struct mds_obd *mds, struct vfsmount *mnt)
395 {
396         struct mds_fs_operations *fs_ops;
397         int rc;
398
399         if (!(fs_ops = mds_search_fs_type(mds->mds_fstype))) {
400                 char name[32];
401
402                 snprintf(name, sizeof(name) - 1, "mds_%s", mds->mds_fstype);
403                 name[sizeof(name) - 1] = '\0';
404
405                 if ((rc = request_module(name))) {
406                         fs_ops = mds_search_fs_type(mds->mds_fstype);
407                         CDEBUG(D_INFO, "Loaded module '%s'\n", name);
408                         if (!fs_ops)
409                                 rc = -ENOENT;
410                 }
411
412                 if (rc) {
413                         CERROR("Can't find MDS fs interface '%s'\n", name);
414                         RETURN(rc);
415                 }
416         }
417
418         mds->mds_fsops = fs_ops;
419         mds->mds_vfsmnt = mnt;
420         mds->mds_ctxt.pwdmnt = mnt;
421         mds->mds_ctxt.pwd = mnt->mnt_root;
422         mds->mds_ctxt.fs = KERNEL_DS;
423
424         /*
425          * Replace the client filesystem delete_inode method with our own,
426          * so that we can clear the object ID before the inode is deleted.
427          * The fs_delete_inode method will call cl_delete_inode for us.
428          * We need to do this for the MDS superblock only, hence we install
429          * a modified copy of the original superblock method table.
430          *
431          * We still assume that there is only a single MDS client filesystem
432          * type, as we don't have access to the mds struct in delete_inode
433          * and store the client delete_inode method in a global table.  This
434          * will only become a problem if/when multiple MDSs are running on a
435          * single host with different underlying filesystems.
436          */
437         OBD_ALLOC(mds->mds_sop, sizeof(*mds->mds_sop));
438         if (!mds->mds_sop)
439                 RETURN(-ENOMEM);
440
441         memcpy(mds->mds_sop, mds->mds_sb->s_op, sizeof(*mds->mds_sop));
442         mds->mds_fsops->cl_delete_inode = mds->mds_sop->delete_inode;
443         mds->mds_sop->delete_inode = mds->mds_fsops->fs_delete_inode;
444         mds->mds_sb->s_op = mds->mds_sop;
445
446         rc = mds_fs_prep(mds);
447
448         if (rc)
449                 OBD_FREE(mds->mds_sop, sizeof(*mds->mds_sop));
450
451         return rc;
452 }
453
454 void mds_fs_cleanup(struct mds_obd *mds)
455 {
456         mds_client_free_all(mds);
457         mds_server_free_data(mds);
458
459         OBD_FREE(mds->mds_sop, sizeof(*mds->mds_sop));
460 }
461
462 EXPORT_SYMBOL(mds_register_fs_type);
463 EXPORT_SYMBOL(mds_unregister_fs_type);