Whamcloud - gitweb
land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
[fs/lustre-release.git] / lustre / mds / mds_fs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mds/mds_fs.c
5  *  Lustre Metadata Server (MDS) filesystem interface code
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #define EXPORT_SYMTAB
27 #define DEBUG_SUBSYSTEM S_MDS
28
29 #include <linux/module.h>
30 #include <linux/kmod.h>
31 #include <linux/version.h>
32 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
33 #include <linux/mount.h>
34 #endif
35 #include <linux/lustre_mds.h>
36 #include <linux/obd_class.h>
37 #include <linux/obd_support.h>
38 #include <linux/lustre_lib.h>
39 #include <linux/lustre_fsfilt.h>
40
41 /* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
42 #define MDS_MAX_CLIENTS (PAGE_SIZE * 8)
43 #define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / sizeof(unsigned long))
44
45 static unsigned long last_rcvd_slots[MDS_MAX_CLIENT_WORDS];
46
47 #define LAST_RCVD "last_rcvd"
48
49 /* Add client data to the MDS.  We use a bitmap to locate a free space
50  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
51  * Otherwise, we have just read the data from the last_rcvd file and
52  * we know its offset.
53  */
54 int mds_client_add(struct mds_obd *mds, struct mds_export_data *med, int cl_off)
55 {
56         int new_client = (cl_off == -1);
57
58         /* the bitmap operations can handle cl_off > sizeof(long) * 8, so
59          * there's no need for extra complication here
60          */
61         if (new_client) {
62                 cl_off = find_first_zero_bit(last_rcvd_slots, MDS_MAX_CLIENTS);
63         repeat:
64                 if (cl_off >= MDS_MAX_CLIENTS) {
65                         CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
66                         return -ENOMEM;
67                 }
68                 if (test_and_set_bit(cl_off, last_rcvd_slots)) {
69                         CERROR("MDS client %d: found bit is set in bitmap\n",
70                                cl_off);
71                         cl_off = find_next_zero_bit(last_rcvd_slots,
72                                                     MDS_MAX_CLIENTS, cl_off);
73                         goto repeat;
74                 }
75         } else {
76                 if (test_and_set_bit(cl_off, last_rcvd_slots)) {
77                         CERROR("MDS client %d: bit already set in bitmap!!\n",
78                                cl_off);
79                         LBUG();
80                 }
81         }
82
83         CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
84                cl_off, med->med_mcd->mcd_uuid);
85
86         med->med_off = cl_off;
87
88         if (new_client) {
89                 struct obd_run_ctxt saved;
90                 loff_t off = MDS_LR_CLIENT + (cl_off * MDS_LR_SIZE);
91                 ssize_t written;
92
93                 push_ctxt(&saved, &mds->mds_ctxt, NULL);
94                 written = lustre_fwrite(mds->mds_rcvd_filp,
95                                         (char *)med->med_mcd,
96                                         sizeof(*med->med_mcd), &off);
97                 pop_ctxt(&saved, &mds->mds_ctxt, NULL);
98
99                 if (written != sizeof(*med->med_mcd)) {
100                         if (written < 0)
101                                 RETURN(written);
102                         RETURN(-EIO);
103                 }
104                 CDEBUG(D_INFO, "wrote client mcd at off %u (len %u)\n",
105                        MDS_LR_CLIENT + (cl_off * MDS_LR_SIZE),
106                        (unsigned int)sizeof(*med->med_mcd));
107         }
108         return 0;
109 }
110
111 int mds_client_free(struct obd_export *exp)
112 {
113         struct mds_export_data *med = &exp->exp_mds_data;
114         struct mds_obd *mds = &exp->exp_obd->u.mds;
115         struct mds_client_data zero_mcd;
116         struct obd_run_ctxt saved;
117         int written;
118         loff_t off;
119
120         if (!med->med_mcd)
121                 RETURN(0);
122
123         off = MDS_LR_CLIENT + (med->med_off * MDS_LR_SIZE);
124
125         CDEBUG(D_INFO, "freeing client at offset %u (%lld)with UUID '%s'\n",
126                med->med_off, off, med->med_mcd->mcd_uuid);
127
128         if (!test_and_clear_bit(med->med_off, last_rcvd_slots)) {
129                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
130                        med->med_off);
131                 LBUG();
132         }
133
134         memset(&zero_mcd, 0, sizeof zero_mcd);
135         push_ctxt(&saved, &mds->mds_ctxt, NULL);
136         written = lustre_fwrite(mds->mds_rcvd_filp, (const char *)&zero_mcd,
137                                 sizeof(zero_mcd), &off);
138         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
139
140         if (written != sizeof(zero_mcd)) {
141                 CERROR("error zeroing out client %s off %d in %s: %d\n",
142                        med->med_mcd->mcd_uuid, med->med_off, LAST_RCVD,
143                        written);
144         } else {
145                 CDEBUG(D_INFO, "zeroed out disconnecting client %s at off %d\n",
146                        med->med_mcd->mcd_uuid, med->med_off);
147         }
148
149         OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
150
151         return 0;
152 }
153
154 static int mds_server_free_data(struct mds_obd *mds)
155 {
156         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
157         mds->mds_server_data = NULL;
158
159         return 0;
160 }
161
162 static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
163 {
164         struct mds_obd *mds = &obddev->u.mds;
165         struct mds_server_data *msd;
166         struct mds_client_data *mcd = NULL;
167         loff_t off = 0;
168         int cl_off;
169         unsigned long last_rcvd_size = f->f_dentry->d_inode->i_size;
170         __u64 last_transno = 0;
171         __u64 last_mount;
172         int rc = 0;
173
174         OBD_ALLOC(msd, sizeof(*msd));
175         if (!msd)
176                 RETURN(-ENOMEM);
177         rc = lustre_fread(f, (char *)msd, sizeof(*msd), &off);
178
179         mds->mds_server_data = msd;
180         if (rc == 0) {
181                 CERROR("empty MDS %s, new MDS?\n", LAST_RCVD);
182                 RETURN(0);
183         }
184
185         if (rc != sizeof(*msd)) {
186                 CERROR("error reading MDS %s: rc = %d\n", LAST_RCVD, rc);
187                 if (rc > 0)
188                         rc = -EIO;
189                 GOTO(err_msd, rc);
190         }
191
192         CDEBUG(D_INODE, "last_rcvd has size %lu (msd + %lu clients)\n",
193                last_rcvd_size, (last_rcvd_size - MDS_LR_CLIENT)/MDS_LR_SIZE);
194
195         /*
196          * When we do a clean MDS shutdown, we save the last_transno into
197          * the header.
198          */
199         last_transno = le64_to_cpu(msd->msd_last_transno);
200         mds->mds_last_transno = last_transno;
201         CDEBUG(D_INODE, "got "LPU64" for server last_rcvd value\n",
202                last_transno);
203
204         last_mount = le64_to_cpu(msd->msd_mount_count);
205         mds->mds_mount_count = last_mount;
206         CDEBUG(D_INODE, "got "LPU64" for server last_mount value\n",last_mount);
207
208         /* off is adjusted by lustre_fread, so we don't adjust it in the loop */
209         for (off = MDS_LR_CLIENT, cl_off = 0; off < last_rcvd_size; cl_off++) {
210                 int mount_age;
211
212                 if (!mcd) {
213                         OBD_ALLOC(mcd, sizeof(*mcd));
214                         if (!mcd)
215                                 GOTO(err_msd, rc = -ENOMEM);
216                 }
217
218                 rc = lustre_fread(f, (char *)mcd, sizeof(*mcd), &off);
219                 if (rc != sizeof(*mcd)) {
220                         CERROR("error reading MDS %s offset %d: rc = %d\n",
221                                LAST_RCVD, cl_off, rc);
222                         if (rc > 0) /* XXX fatal error or just abort reading? */
223                                 rc = -EIO;
224                         break;
225                 }
226
227                 if (mcd->mcd_uuid[0] == '\0') {
228                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
229                                cl_off);
230                         continue;
231                 }
232
233                 last_transno = le64_to_cpu(mcd->mcd_last_transno);
234
235                 /* These exports are cleaned up by mds_disconnect(), so they
236                  * need to be set up like real exports as mds_connect() does.
237                  */
238                 mount_age = last_mount - le64_to_cpu(mcd->mcd_mount_count);
239                 if (mount_age < MDS_MOUNT_RECOV) {
240                         struct obd_export *exp = class_new_export(obddev);
241                         struct mds_export_data *med;
242
243                         if (!exp) {
244                                 rc = -ENOMEM;
245                                 break;
246                         }
247
248                         memcpy(&exp->exp_client_uuid.uuid, mcd->mcd_uuid,
249                                sizeof exp->exp_client_uuid.uuid);
250                         med = &exp->exp_mds_data;
251                         med->med_mcd = mcd;
252                         mds_client_add(mds, med, cl_off);
253                         /* create helper if export init gets more complex */
254                         INIT_LIST_HEAD(&med->med_open_head);
255                         spin_lock_init(&med->med_open_lock);
256
257                         mcd = NULL;
258                         obddev->obd_recoverable_clients++;
259                 } else {
260                         CDEBUG(D_INFO,
261                                "discarded client %d, UUID '%s', count %Ld\n",
262                                cl_off, mcd->mcd_uuid,
263                                (long long)le64_to_cpu(mcd->mcd_mount_count));
264                 }
265
266                 CDEBUG(D_OTHER, "client at offset %d has last_rcvd = %Lu\n",
267                        cl_off, (unsigned long long)last_transno);
268
269                 if (last_transno > mds->mds_last_transno)
270                         mds->mds_last_transno = last_transno;
271         }
272
273         obddev->obd_last_committed = mds->mds_last_transno;
274         if (obddev->obd_recoverable_clients) {
275                 CERROR("RECOVERY: %d recoverable clients, last_transno "
276                        LPU64"\n",
277                        obddev->obd_recoverable_clients, mds->mds_last_transno);
278                 obddev->obd_next_recovery_transno = obddev->obd_last_committed
279                         + 1;
280                 obddev->obd_flags |= OBD_RECOVERING;
281         }
282
283         if (mcd)
284                 OBD_FREE(mcd, sizeof(*mcd));
285
286         return 0;
287
288 err_msd:
289         mds_server_free_data(mds);
290         return rc;
291 }
292
293 static int mds_fs_prep(struct obd_device *obddev)
294 {
295         struct mds_obd *mds = &obddev->u.mds;
296         struct obd_run_ctxt saved;
297         struct dentry *dentry;
298         struct file *f;
299         int rc;
300
301         push_ctxt(&saved, &mds->mds_ctxt, NULL);
302         dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755);
303         if (IS_ERR(dentry)) {
304                 rc = PTR_ERR(dentry);
305                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
306                 GOTO(err_pop, rc);
307         }
308
309         mds->mds_rootfid.id = dentry->d_inode->i_ino;
310         mds->mds_rootfid.generation = dentry->d_inode->i_generation;
311         mds->mds_rootfid.f_type = S_IFDIR;
312
313         dput(dentry);
314
315         dentry = simple_mkdir(current->fs->pwd, "FH", 0700);
316         if (IS_ERR(dentry)) {
317                 rc = PTR_ERR(dentry);
318                 CERROR("cannot create FH directory: rc = %d\n", rc);
319                 GOTO(err_pop, rc);
320         }
321         /* XXX probably want to hold on to this later... */
322         dput(dentry);
323
324         f = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
325         if (IS_ERR(f)) {
326                 rc = PTR_ERR(f);
327                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
328                 GOTO(err_pop, rc = PTR_ERR(f));
329         }
330         if (!S_ISREG(f->f_dentry->d_inode->i_mode)) {
331                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
332                        f->f_dentry->d_inode->i_mode);
333                 GOTO(err_filp, rc = -ENOENT);
334         }
335
336         rc = fsfilt_journal_data(obddev, f);
337         if (rc) {
338                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
339                 GOTO(err_filp, rc);
340         }
341
342         rc = mds_read_last_rcvd(obddev, f);
343         if (rc) {
344                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
345                 GOTO(err_client, rc);
346         }
347         mds->mds_rcvd_filp = f;
348 err_pop:
349         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
350
351         return rc;
352
353 err_client:
354         class_disconnect_all(obddev);
355 err_filp:
356         if (filp_close(f, 0))
357                 CERROR("can't close %s after error\n", LAST_RCVD);
358         goto err_pop;
359 }
360
361 int mds_fs_setup(struct obd_device *obddev, struct vfsmount *mnt)
362 {
363         struct mds_obd *mds = &obddev->u.mds;
364         ENTRY;
365
366         mds->mds_vfsmnt = mnt;
367
368         OBD_SET_CTXT_MAGIC(&mds->mds_ctxt);
369         mds->mds_ctxt.pwdmnt = mnt;
370         mds->mds_ctxt.pwd = mnt->mnt_root;
371         mds->mds_ctxt.fs = get_ds();
372
373         RETURN(mds_fs_prep(obddev));
374 }
375
376 int mds_fs_cleanup(struct obd_device *obddev)
377 {
378         struct mds_obd *mds = &obddev->u.mds;
379         struct obd_run_ctxt saved;
380         int rc = 0;
381
382         class_disconnect_all(obddev); /* this cleans up client info too */
383         mds_server_free_data(mds);
384
385         push_ctxt(&saved, &mds->mds_ctxt, NULL);
386         if (mds->mds_rcvd_filp) {
387                 rc = filp_close(mds->mds_rcvd_filp, 0);
388                 mds->mds_rcvd_filp = NULL;
389
390                 if (rc)
391                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
392         }
393         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
394
395         return rc;
396 }