Whamcloud - gitweb
- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
[fs/lustre-release.git] / lustre / mds / mds_fs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mds/mds_fs.c
5  *  Lustre Metadata Server (MDS) filesystem interface code
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #define EXPORT_SYMTAB
27 #define DEBUG_SUBSYSTEM S_MDS
28
29 #include <linux/module.h>
30 #include <linux/kmod.h>
31 #include <linux/version.h>
32 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
33 #include <linux/mount.h>
34 #endif
35 #include <linux/lustre_mds.h>
36 #include <linux/obd_class.h>
37 #include <linux/obd_support.h>
38 #include <linux/lustre_lib.h>
39 #include <linux/lustre_fsfilt.h>
40
/* Upper bound on the number of client slots tracked in the client bitmap.
 * This limit is arbitrary, but for now one bit per client is made to fit
 * in 1 page (PAGE_SIZE * 8 bits, i.e. 32k clients with 4k pages). */
#define MDS_MAX_CLIENTS (PAGE_SIZE * 8)
/* Number of unsigned longs needed to hold MDS_MAX_CLIENTS bits.  A word
 * holds 8 * sizeof(unsigned long) bits, so divide by the bit width of a
 * word; dividing by its byte width would size the bitmap 8x too large. */
#define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / (8 * sizeof(unsigned long)))

/* Name of the file holding the server data header and the client records */
#define LAST_RCVD "last_rcvd"
46
47 /* Add client data to the MDS.  We use a bitmap to locate a free space
48  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
49  * Otherwise, we have just read the data from the last_rcvd file and
50  * we know its offset.
51  */
52 int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
53                    struct mds_export_data *med, int cl_off)
54 {
55         unsigned long *bitmap = mds->mds_client_bitmap;
56         int new_client = (cl_off == -1);
57
58         LASSERT(bitmap != NULL);
59
60         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
61         if (!strcmp(med->med_mcd->mcd_uuid, "OBD_CLASS_UUID"))
62                 RETURN(0);
63
64         /* the bitmap operations can handle cl_off > sizeof(long) * 8, so
65          * there's no need for extra complication here
66          */
67         if (new_client) {
68                 cl_off = find_first_zero_bit(bitmap, MDS_MAX_CLIENTS);
69         repeat:
70                 if (cl_off >= MDS_MAX_CLIENTS) {
71                         CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
72                         return -ENOMEM;
73                 }
74                 if (test_and_set_bit(cl_off, bitmap)) {
75                         CERROR("MDS client %d: found bit is set in bitmap\n",
76                                cl_off);
77                         cl_off = find_next_zero_bit(bitmap, MDS_MAX_CLIENTS,
78                                                     cl_off);
79                         goto repeat;
80                 }
81         } else {
82                 if (test_and_set_bit(cl_off, bitmap)) {
83                         CERROR("MDS client %d: bit already set in bitmap!!\n",
84                                cl_off);
85                         LBUG();
86                 }
87         }
88
89         CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
90                cl_off, med->med_mcd->mcd_uuid);
91
92         med->med_off = cl_off;
93
94         if (new_client) {
95                 struct obd_run_ctxt saved;
96                 loff_t off = MDS_LR_CLIENT + (cl_off * MDS_LR_SIZE);
97                 ssize_t written;
98                 void *handle;
99
100                 push_ctxt(&saved, &mds->mds_ctxt, NULL);
101                 /* We need to start a transaction here first, to avoid a
102                  * possible ordering deadlock on last_rcvd->i_sem and the
103                  * journal lock. In most places we start the journal handle
104                  * first (because we do compound transactions), and then
105                  * later do the write into last_rcvd, which gets i_sem.
106                  *
107                  * Without this transaction, clients connecting at the same
108                  * time other MDS operations are ongoing get last_rcvd->i_sem
109                  * first (in generic_file_write()) and start the journal
110                  * transaction afterwards, and can deadlock with other ops.
111                  *
112                  * We use FSFILT_OP_SETATTR because it is smallest, but all
113                  * ops include enough space for the last_rcvd update so we
114                  * could use any of them, or maybe an FSFILT_OP_NONE is best?
115                  */
116                 handle = fsfilt_start(obd,mds->mds_rcvd_filp->f_dentry->d_inode,
117                                       FSFILT_OP_SETATTR);
118                 if (IS_ERR(handle)) {
119                         written = PTR_ERR(handle);
120                         CERROR("unable to start transaction: rc %d\n",
121                                (int)written);
122                 } else {
123                         written = lustre_fwrite(mds->mds_rcvd_filp,med->med_mcd,
124                                                 sizeof(*med->med_mcd), &off);
125                         fsfilt_commit(obd,mds->mds_rcvd_filp->f_dentry->d_inode,
126                                       handle, 0);
127                 }
128                 pop_ctxt(&saved, &mds->mds_ctxt, NULL);
129
130                 if (written != sizeof(*med->med_mcd)) {
131                         if (written < 0)
132                                 RETURN(written);
133                         RETURN(-EIO);
134                 }
135                 CDEBUG(D_INFO, "wrote client mcd at off %u (len %u)\n",
136                        MDS_LR_CLIENT + (cl_off * MDS_LR_SIZE),
137                        (unsigned int)sizeof(*med->med_mcd));
138         }
139         return 0;
140 }
141
142 int mds_client_free(struct obd_export *exp)
143 {
144         struct mds_export_data *med = &exp->exp_mds_data;
145         struct mds_obd *mds = &exp->exp_obd->u.mds;
146         struct mds_client_data zero_mcd;
147         struct obd_run_ctxt saved;
148         int written;
149         unsigned long *bitmap = mds->mds_client_bitmap;
150         loff_t off;
151
152         LASSERT(bitmap);
153         if (!med->med_mcd)
154                 RETURN(0);
155
156         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
157         if (!strcmp(med->med_mcd->mcd_uuid, "OBD_CLASS_UUID"))
158                 GOTO(free_and_out, 0);
159
160         off = MDS_LR_CLIENT + (med->med_off * MDS_LR_SIZE);
161
162         CDEBUG(D_INFO, "freeing client at offset %u (%lld)with UUID '%s'\n",
163                med->med_off, off, med->med_mcd->mcd_uuid);
164
165         if (!test_and_clear_bit(med->med_off, bitmap)) {
166                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
167                        med->med_off);
168                 LBUG();
169         }
170
171         memset(&zero_mcd, 0, sizeof zero_mcd);
172         push_ctxt(&saved, &mds->mds_ctxt, NULL);
173         written = lustre_fwrite(mds->mds_rcvd_filp, (const char *)&zero_mcd,
174                                 sizeof(zero_mcd), &off);
175         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
176
177         if (written != sizeof(zero_mcd)) {
178                 CERROR("error zeroing out client %s off %d in %s: %d\n",
179                        med->med_mcd->mcd_uuid, med->med_off, LAST_RCVD,
180                        written);
181         } else {
182                 CDEBUG(D_INFO, "zeroed out disconnecting client %s at off %d\n",
183                        med->med_mcd->mcd_uuid, med->med_off);
184         }
185
186  free_and_out:
187         OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
188
189         return 0;
190 }
191
192 static int mds_server_free_data(struct mds_obd *mds)
193 {
194         OBD_FREE(mds->mds_client_bitmap,
195                  MDS_MAX_CLIENT_WORDS * sizeof(unsigned long));
196         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
197         mds->mds_server_data = NULL;
198
199         return 0;
200 }
201
202 static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
203 {
204         struct mds_obd *mds = &obddev->u.mds;
205         struct mds_server_data *msd;
206         struct mds_client_data *mcd = NULL;
207         loff_t off = 0;
208         int cl_off;
209         unsigned long last_rcvd_size = f->f_dentry->d_inode->i_size;
210         __u64 last_transno = 0;
211         __u64 last_mount;
212         int rc = 0;
213
214         LASSERT(sizeof(struct mds_client_data) == MDS_LR_SIZE);
215         LASSERT(sizeof(struct mds_server_data) <= MDS_LR_CLIENT);
216
217         OBD_ALLOC(msd, sizeof(*msd));
218         if (!msd)
219                 RETURN(-ENOMEM);
220
221         OBD_ALLOC(mds->mds_client_bitmap,
222                   MDS_MAX_CLIENT_WORDS * sizeof(unsigned long));
223         if (!mds->mds_client_bitmap) {
224                 OBD_FREE(msd, sizeof(*msd));
225                 RETURN(-ENOMEM);
226         }
227
228         rc = lustre_fread(f, (char *)msd, sizeof(*msd), &off);
229
230         mds->mds_server_data = msd;
231         if (rc == 0) {
232                 CERROR("%s: empty MDS %s, new MDS?\n", obddev->obd_name,
233                        LAST_RCVD);
234                 RETURN(0);
235         }
236
237         if (rc != sizeof(*msd)) {
238                 CERROR("error reading MDS %s: rc = %d\n", LAST_RCVD, rc);
239                 if (rc > 0)
240                         rc = -EIO;
241                 GOTO(err_msd, rc);
242         }
243
244         CDEBUG(D_INODE, "last_rcvd has size %lu (msd + %lu clients)\n",
245                last_rcvd_size, (last_rcvd_size - MDS_LR_CLIENT)/MDS_LR_SIZE);
246
247         /*
248          * When we do a clean MDS shutdown, we save the last_transno into
249          * the header.
250          */
251         last_transno = le64_to_cpu(msd->msd_last_transno);
252         mds->mds_last_transno = last_transno;
253         CDEBUG(D_INODE, "got "LPU64" for server last_rcvd value\n",
254                last_transno);
255
256         last_mount = le64_to_cpu(msd->msd_mount_count);
257         mds->mds_mount_count = last_mount;
258         CDEBUG(D_INODE, "got "LPU64" for server last_mount value\n",last_mount);
259
260         /* off is adjusted by lustre_fread, so we don't adjust it in the loop */
261         for (off = MDS_LR_CLIENT, cl_off = 0; off < last_rcvd_size; cl_off++) {
262                 int mount_age;
263
264                 if (!mcd) {
265                         OBD_ALLOC(mcd, sizeof(*mcd));
266                         if (!mcd)
267                                 GOTO(err_msd, rc = -ENOMEM);
268                 }
269
270                 rc = lustre_fread(f, (char *)mcd, sizeof(*mcd), &off);
271                 if (rc != sizeof(*mcd)) {
272                         CERROR("error reading MDS %s offset %d: rc = %d\n",
273                                LAST_RCVD, cl_off, rc);
274                         if (rc > 0) /* XXX fatal error or just abort reading? */
275                                 rc = -EIO;
276                         break;
277                 }
278
279                 if (mcd->mcd_uuid[0] == '\0') {
280                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
281                                cl_off);
282                         continue;
283                 }
284
285                 last_transno = le64_to_cpu(mcd->mcd_last_transno);
286
287                 /* These exports are cleaned up by mds_disconnect(), so they
288                  * need to be set up like real exports as mds_connect() does.
289                  */
290                 mount_age = last_mount - le64_to_cpu(mcd->mcd_mount_count);
291                 if (mount_age < MDS_MOUNT_RECOV) {
292                         struct obd_export *exp = class_new_export(obddev);
293                         struct mds_export_data *med;
294
295                         if (!exp) {
296                                 rc = -ENOMEM;
297                                 break;
298                         }
299
300                         memcpy(&exp->exp_client_uuid.uuid, mcd->mcd_uuid,
301                                sizeof exp->exp_client_uuid.uuid);
302                         med = &exp->exp_mds_data;
303                         med->med_mcd = mcd;
304                         mds_client_add(obddev, mds, med, cl_off);
305                         /* create helper if export init gets more complex */
306                         INIT_LIST_HEAD(&med->med_open_head);
307                         spin_lock_init(&med->med_open_lock);
308
309                         mcd = NULL;
310                         obddev->obd_recoverable_clients++;
311                         class_export_put(exp);
312                 } else {
313                         CDEBUG(D_INFO, "discarded client %d, UUID '%s', count "
314                                LPU64"\n", cl_off, mcd->mcd_uuid,
315                                le64_to_cpu(mcd->mcd_mount_count));
316                 }
317
318                 CDEBUG(D_OTHER, "client at offset %d has last_transno = %Lu\n",
319                        cl_off, (unsigned long long)last_transno);
320
321                 if (last_transno > mds->mds_last_transno)
322                         mds->mds_last_transno = last_transno;
323         }
324
325         obddev->obd_last_committed = mds->mds_last_transno;
326         if (obddev->obd_recoverable_clients) {
327                 CERROR("RECOVERY: %d recoverable clients, last_transno "
328                        LPU64"\n",
329                        obddev->obd_recoverable_clients, mds->mds_last_transno);
330                 obddev->obd_next_recovery_transno = obddev->obd_last_committed
331                         + 1;
332                 obddev->obd_recovering = 1;
333         }
334
335         if (mcd)
336                 OBD_FREE(mcd, sizeof(*mcd));
337
338         return 0;
339
340 err_msd:
341         mds_server_free_data(mds);
342         return rc;
343 }
344
345 static int mds_fs_prep(struct obd_device *obddev)
346 {
347         struct mds_obd *mds = &obddev->u.mds;
348         struct obd_run_ctxt saved;
349         struct dentry *dentry;
350         struct file *f;
351         int rc;
352
353         push_ctxt(&saved, &mds->mds_ctxt, NULL);
354         dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755);
355         if (IS_ERR(dentry)) {
356                 rc = PTR_ERR(dentry);
357                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
358                 GOTO(err_pop, rc);
359         }
360
361         mds->mds_rootfid.id = dentry->d_inode->i_ino;
362         mds->mds_rootfid.generation = dentry->d_inode->i_generation;
363         mds->mds_rootfid.f_type = S_IFDIR;
364
365         dput(dentry);
366
367         dentry = lookup_one_len("__iopen__", current->fs->pwd,
368                                 strlen("__iopen__"));
369         if (IS_ERR(dentry) || !dentry->d_inode) {
370                 rc = (IS_ERR(dentry)) ? PTR_ERR(dentry): -ENOENT;
371                 CERROR("cannot open iopen FH directory: rc = %d\n", rc);
372                 GOTO(err_pop, rc);
373         }
374         mds->mds_fid_de = dentry;
375
376         f = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
377         if (IS_ERR(f)) {
378                 rc = PTR_ERR(f);
379                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
380                 GOTO(err_pop, rc = PTR_ERR(f));
381         }
382         if (!S_ISREG(f->f_dentry->d_inode->i_mode)) {
383                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
384                        f->f_dentry->d_inode->i_mode);
385                 GOTO(err_filp, rc = -ENOENT);
386         }
387
388         rc = fsfilt_journal_data(obddev, f);
389         if (rc) {
390                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
391                 GOTO(err_filp, rc);
392         }
393
394         rc = mds_read_last_rcvd(obddev, f);
395         if (rc) {
396                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
397                 GOTO(err_client, rc);
398         }
399         mds->mds_rcvd_filp = f;
400 err_pop:
401         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
402
403         return rc;
404
405 err_client:
406         class_disconnect_exports(obddev, 0);
407 err_filp:
408         if (filp_close(f, 0))
409                 CERROR("can't close %s after error\n", LAST_RCVD);
410         goto err_pop;
411 }
412
413 int mds_fs_setup(struct obd_device *obddev, struct vfsmount *mnt)
414 {
415         struct mds_obd *mds = &obddev->u.mds;
416         ENTRY;
417
418         mds->mds_vfsmnt = mnt;
419
420         OBD_SET_CTXT_MAGIC(&mds->mds_ctxt);
421         mds->mds_ctxt.pwdmnt = mnt;
422         mds->mds_ctxt.pwd = mnt->mnt_root;
423         mds->mds_ctxt.fs = get_ds();
424         RETURN(mds_fs_prep(obddev));
425 }
426
427 int mds_fs_cleanup(struct obd_device *obddev, int failover)
428 {
429         struct mds_obd *mds = &obddev->u.mds;
430         struct obd_run_ctxt saved;
431         int rc = 0;
432
433         if (failover)
434                 CERROR("%s: shutting down for failover; client state will"
435                        " be preserved.\n", obddev->obd_name);
436
437         class_disconnect_exports(obddev, failover); /* this cleans up client
438                                                    info too */
439         mds_server_free_data(mds);
440
441         push_ctxt(&saved, &mds->mds_ctxt, NULL);
442         if (mds->mds_rcvd_filp) {
443                 rc = filp_close(mds->mds_rcvd_filp, 0);
444                 mds->mds_rcvd_filp = NULL;
445                 if (rc)
446                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
447         }
448         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
449         shrink_dcache_parent(mds->mds_fid_de);
450         dput(mds->mds_fid_de);
451
452         return rc;
453 }