Whamcloud - gitweb
Merge b_md into HEAD
[fs/lustre-release.git] / lustre / mds / mds_fs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mds/mds_fs.c
5  *  Lustre Metadata Server (MDS) filesystem interface code
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #define EXPORT_SYMTAB
27 #define DEBUG_SUBSYSTEM S_MDS
28
29 #include <linux/module.h>
30 #include <linux/kmod.h>
31 #include <linux/lustre_mds.h>
32 #include <linux/obd_class.h>
33 #include <linux/obd_support.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_fsfilt.h>
36
37 /* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
38 #define MDS_MAX_CLIENTS (PAGE_SIZE * 8)
39 #define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / sizeof(unsigned long))
40
41 static unsigned long last_rcvd_slots[MDS_MAX_CLIENT_WORDS];
42
43 #define LAST_RCVD "last_rcvd"
44
45 /* Add client data to the MDS.  We use a bitmap to locate a free space
46  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
47  * Otherwise, we have just read the data from the last_rcvd file and
48  * we know its offset.
49  */
50 int mds_client_add(struct mds_obd *mds, struct mds_export_data *med, int cl_off)
51 {
52         int new_client = (cl_off == -1);
53
54         /* the bitmap operations can handle cl_off > sizeof(long) * 8, so
55          * there's no need for extra complication here
56          */
57         if (new_client) {
58                 cl_off = find_first_zero_bit(last_rcvd_slots, MDS_MAX_CLIENTS);
59         repeat:
60                 if (cl_off >= MDS_MAX_CLIENTS) {
61                         CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
62                         return -ENOMEM;
63                 }
64                 if (test_and_set_bit(cl_off, last_rcvd_slots)) {
65                         CERROR("MDS client %d: found bit is set in bitmap\n",
66                                cl_off);
67                         cl_off = find_next_zero_bit(last_rcvd_slots,
68                                                     MDS_MAX_CLIENTS, cl_off);
69                         goto repeat;
70                 }
71         } else {
72                 if (test_and_set_bit(cl_off, last_rcvd_slots)) {
73                         CERROR("MDS client %d: bit already set in bitmap!!\n",
74                                cl_off);
75                         LBUG();
76                 }
77         }
78
79         CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
80                cl_off, med->med_mcd->mcd_uuid);
81
82         med->med_off = cl_off;
83
84         if (new_client) {
85                 struct obd_run_ctxt saved;
86                 loff_t off = MDS_LR_CLIENT + (cl_off * MDS_LR_SIZE);
87                 ssize_t written;
88
89                 push_ctxt(&saved, &mds->mds_ctxt, NULL);
90                 written = lustre_fwrite(mds->mds_rcvd_filp,
91                                         (char *)med->med_mcd,
92                                         sizeof(*med->med_mcd), &off);
93                 pop_ctxt(&saved, &mds->mds_ctxt, NULL);
94
95                 if (written != sizeof(*med->med_mcd)) {
96                         if (written < 0)
97                                 RETURN(written);
98                         RETURN(-EIO);
99                 }
100                 CDEBUG(D_INFO, "wrote client mcd at off %u (len %u)\n",
101                        MDS_LR_CLIENT + (cl_off * MDS_LR_SIZE),
102                        (unsigned int)sizeof(*med->med_mcd));
103         }
104         return 0;
105 }
106
107 int mds_client_free(struct obd_export *exp)
108 {
109         struct mds_export_data *med = &exp->exp_mds_data;
110         struct mds_obd *mds = &exp->exp_obd->u.mds;
111         struct mds_client_data zero_mcd;
112         struct obd_run_ctxt saved;
113         int written;
114         loff_t off;
115
116         if (!med->med_mcd)
117                 RETURN(0);
118
119         off = MDS_LR_CLIENT + (med->med_off * MDS_LR_SIZE);
120
121         CDEBUG(D_INFO, "freeing client at offset %u (%lld)with UUID '%s'\n",
122                med->med_off, off, med->med_mcd->mcd_uuid);
123
124         if (!test_and_clear_bit(med->med_off, last_rcvd_slots)) {
125                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
126                        med->med_off);
127                 LBUG();
128         }
129
130         memset(&zero_mcd, 0, sizeof zero_mcd);
131         push_ctxt(&saved, &mds->mds_ctxt, NULL);
132         written = lustre_fwrite(mds->mds_rcvd_filp, (const char *)&zero_mcd,
133                                 sizeof(zero_mcd), &off);
134         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
135
136         if (written != sizeof(zero_mcd)) {
137                 CERROR("error zeroing out client %s off %d in %s: %d\n",
138                        med->med_mcd->mcd_uuid, med->med_off, LAST_RCVD,
139                        written);
140         } else {
141                 CDEBUG(D_INFO, "zeroed out disconnecting client %s at off %d\n",
142                        med->med_mcd->mcd_uuid, med->med_off);
143         }
144
145         if (med->med_last_reply) {
146                 OBD_FREE(med->med_last_reply, med->med_last_replen);
147                 med->med_last_reply = NULL;
148         }
149         OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
150
151         return 0;
152 }
153
154 static int mds_server_free_data(struct mds_obd *mds)
155 {
156         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
157         mds->mds_server_data = NULL;
158
159         return 0;
160 }
161
162 static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
163 {
164         struct mds_obd *mds = &obddev->u.mds;
165         struct mds_server_data *msd;
166         struct mds_client_data *mcd = NULL;
167         loff_t off = 0;
168         int cl_off;
169         unsigned long last_rcvd_size = f->f_dentry->d_inode->i_size;
170         __u64 last_rcvd = 0;
171         __u64 last_mount;
172         int rc = 0;
173
174         OBD_ALLOC(msd, sizeof(*msd));
175         if (!msd)
176                 RETURN(-ENOMEM);
177         rc = lustre_fread(f, (char *)msd, sizeof(*msd), &off);
178
179         mds->mds_server_data = msd;
180         if (rc == 0) {
181                 CERROR("empty MDS %s, new MDS?\n", LAST_RCVD);
182                 RETURN(0);
183         }
184
185         if (rc != sizeof(*msd)) {
186                 CERROR("error reading MDS %s: rc = %d\n", LAST_RCVD, rc);
187                 if (rc > 0)
188                         rc = -EIO;
189                 GOTO(err_msd, rc);
190         }
191
192         CDEBUG(D_INODE, "last_rcvd has size %lu (msd + %lu clients)\n",
193                last_rcvd_size, (last_rcvd_size - MDS_LR_CLIENT)/MDS_LR_SIZE);
194
195         /*
196          * When we do a clean MDS shutdown, we save the last_rcvd into
197          * the header.  If we find clients with higher last_rcvd values
198          * then those clients may need recovery done.
199          */
200         last_rcvd = le64_to_cpu(msd->msd_last_rcvd);
201         mds->mds_last_rcvd = last_rcvd;
202         CDEBUG(D_INODE, "got "LPU64" for server last_rcvd value\n", last_rcvd);
203
204         last_mount = le64_to_cpu(msd->msd_mount_count);
205         mds->mds_mount_count = last_mount;
206         CDEBUG(D_INODE, "got "LPU64" for server last_mount value\n",last_mount);
207
208         /* off is adjusted by lustre_fread, so we don't adjust it in the loop */
209         for (off = MDS_LR_CLIENT, cl_off = 0; off < last_rcvd_size; cl_off++) {
210                 int mount_age;
211
212                 if (!mcd) {
213                         OBD_ALLOC(mcd, sizeof(*mcd));
214                         if (!mcd)
215                                 GOTO(err_msd, rc = -ENOMEM);
216                 }
217
218                 rc = lustre_fread(f, (char *)mcd, sizeof(*mcd), &off);
219                 if (rc != sizeof(*mcd)) {
220                         CERROR("error reading MDS %s offset %d: rc = %d\n",
221                                LAST_RCVD, cl_off, rc);
222                         if (rc > 0) /* XXX fatal error or just abort reading? */
223                                 rc = -EIO;
224                         break;
225                 }
226
227                 if (mcd->mcd_uuid[0] == '\0') {
228                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
229                                cl_off);
230                         continue;
231                 }
232
233                 last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
234
235                 /* These exports are cleaned up by mds_disconnect(), so they
236                  * need to be set up like real exports as mds_connect() does.
237                  */
238                 mount_age = last_mount - le64_to_cpu(mcd->mcd_mount_count);
239                 if (mount_age < MDS_MOUNT_RECOV) {
240                         struct obd_export *exp = class_new_export(obddev);
241                         struct mds_export_data *med;
242
243                         if (!exp) {
244                                 rc = -ENOMEM;
245                                 break;
246                         }
247
248                         memcpy(&exp->exp_client_uuid.uuid, mcd->mcd_uuid,
249                                sizeof exp->exp_client_uuid.uuid);
250                         med = &exp->exp_mds_data;
251                         med->med_mcd = mcd;
252                         mds_client_add(mds, med, cl_off);
253                         /* create helper if export init gets more complex */
254                         INIT_LIST_HEAD(&med->med_open_head);
255                         spin_lock_init(&med->med_open_lock);
256
257                         mcd = NULL;
258                         mds->mds_recoverable_clients++;
259                 } else {
260                         CDEBUG(D_INFO,
261                                "discarded client %d, UUID '%s', count %Ld\n",
262                                cl_off, mcd->mcd_uuid,
263                                (long long)le64_to_cpu(mcd->mcd_mount_count));
264                 }
265
266                 CDEBUG(D_OTHER, "client at offset %d has last_rcvd = %Lu\n",
267                        cl_off, (unsigned long long)last_rcvd);
268
269                 if (last_rcvd > mds->mds_last_rcvd)
270                         mds->mds_last_rcvd = last_rcvd;
271         }
272
273         obddev->obd_last_committed = mds->mds_last_rcvd;
274         if (mds->mds_recoverable_clients) {
275                 CERROR("RECOVERY: %d recoverable clients, last_rcvd "LPU64"\n",
276                        mds->mds_recoverable_clients, mds->mds_last_rcvd);
277                 mds->mds_next_recovery_transno = obddev->obd_last_committed + 1;
278                 obddev->obd_flags |= OBD_RECOVERING;
279         }
280
281         if (mcd)
282                 OBD_FREE(mcd, sizeof(*mcd));
283
284         return 0;
285
286 err_msd:
287         mds_server_free_data(mds);
288         return rc;
289 }
290
291 static int mds_fs_prep(struct obd_device *obddev)
292 {
293         struct mds_obd *mds = &obddev->u.mds;
294         struct obd_run_ctxt saved;
295         struct dentry *dentry;
296         struct file *f;
297         int rc;
298
299         push_ctxt(&saved, &mds->mds_ctxt, NULL);
300         dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755);
301         if (IS_ERR(dentry)) {
302                 rc = PTR_ERR(dentry);
303                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
304                 GOTO(err_pop, rc);
305         }
306
307         mds->mds_rootfid.id = dentry->d_inode->i_ino;
308         mds->mds_rootfid.generation = dentry->d_inode->i_generation;
309         mds->mds_rootfid.f_type = S_IFDIR;
310
311         dput(dentry);
312
313         dentry = simple_mkdir(current->fs->pwd, "FH", 0700);
314         if (IS_ERR(dentry)) {
315                 rc = PTR_ERR(dentry);
316                 CERROR("cannot create FH directory: rc = %d\n", rc);
317                 GOTO(err_pop, rc);
318         }
319         /* XXX probably want to hold on to this later... */
320         dput(dentry);
321
322         f = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
323         if (IS_ERR(f)) {
324                 rc = PTR_ERR(f);
325                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
326                 GOTO(err_pop, rc = PTR_ERR(f));
327         }
328         if (!S_ISREG(f->f_dentry->d_inode->i_mode)) {
329                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
330                        f->f_dentry->d_inode->i_mode);
331                 GOTO(err_filp, rc = -ENOENT);
332         }
333
334         rc = fsfilt_journal_data(obddev, f);
335         if (rc) {
336                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
337                 GOTO(err_filp, rc);
338         }
339
340         rc = mds_read_last_rcvd(obddev, f);
341         if (rc) {
342                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
343                 GOTO(err_client, rc);
344         }
345         mds->mds_rcvd_filp = f;
346 err_pop:
347         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
348
349         return rc;
350
351 err_client:
352         class_disconnect_all(obddev);
353 err_filp:
354         if (filp_close(f, 0))
355                 CERROR("can't close %s after error\n", LAST_RCVD);
356         goto err_pop;
357 }
358
359 int mds_fs_setup(struct obd_device *obddev, struct vfsmount *mnt)
360 {
361         struct mds_obd *mds = &obddev->u.mds;
362         ENTRY;
363
364         mds->mds_vfsmnt = mnt;
365
366         OBD_SET_CTXT_MAGIC(&mds->mds_ctxt);
367         mds->mds_ctxt.pwdmnt = mnt;
368         mds->mds_ctxt.pwd = mnt->mnt_root;
369         mds->mds_ctxt.fs = get_ds();
370
371         RETURN(mds_fs_prep(obddev));
372 }
373
374 int mds_fs_cleanup(struct obd_device *obddev)
375 {
376         struct mds_obd *mds = &obddev->u.mds;
377         struct obd_run_ctxt saved;
378         int rc = 0;
379
380         class_disconnect_all(obddev); /* this cleans up client info too */
381         mds_server_free_data(mds);
382
383         push_ctxt(&saved, &mds->mds_ctxt, NULL);
384         if (mds->mds_rcvd_filp) {
385                 rc = filp_close(mds->mds_rcvd_filp, 0);
386                 mds->mds_rcvd_filp = NULL;
387
388                 if (rc)
389                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
390         }
391         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
392
393         return rc;
394 }