Whamcloud - gitweb
merge b_devel into HEAD. Includes:
[fs/lustre-release.git] / lustre / mds / mds_fs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mds/mds_fs.c
5  *  Lustre Metadata Server (MDS) filesystem interface code
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #define EXPORT_SYMTAB
27 #define DEBUG_SUBSYSTEM S_MDS
28
29 #include <linux/module.h>
30 #include <linux/kmod.h>
31 #include <linux/version.h>
32 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
33 #include <linux/mount.h>
34 #endif
35 #include <linux/lustre_mds.h>
36 #include <linux/obd_class.h>
37 #include <linux/obd_support.h>
38 #include <linux/lustre_lib.h>
39 #include <linux/lustre_fsfilt.h>
40
41 /* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
42 #define MDS_MAX_CLIENTS (PAGE_SIZE * 8)
43 #define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / sizeof(unsigned long))
44
45 static unsigned long last_rcvd_slots[MDS_MAX_CLIENT_WORDS];
46
47 #define LAST_RCVD "last_rcvd"
48
49 /* Add client data to the MDS.  We use a bitmap to locate a free space
50  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
51  * Otherwise, we have just read the data from the last_rcvd file and
52  * we know its offset.
53  */
54 int mds_client_add(struct mds_obd *mds, struct mds_export_data *med, int cl_off)
55 {
56         int new_client = (cl_off == -1);
57
58         /* the bitmap operations can handle cl_off > sizeof(long) * 8, so
59          * there's no need for extra complication here
60          */
61         if (new_client) {
62                 cl_off = find_first_zero_bit(last_rcvd_slots, MDS_MAX_CLIENTS);
63         repeat:
64                 if (cl_off >= MDS_MAX_CLIENTS) {
65                         CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
66                         return -ENOMEM;
67                 }
68                 if (test_and_set_bit(cl_off, last_rcvd_slots)) {
69                         CERROR("MDS client %d: found bit is set in bitmap\n",
70                                cl_off);
71                         cl_off = find_next_zero_bit(last_rcvd_slots,
72                                                     MDS_MAX_CLIENTS, cl_off);
73                         goto repeat;
74                 }
75         } else {
76                 if (test_and_set_bit(cl_off, last_rcvd_slots)) {
77                         CERROR("MDS client %d: bit already set in bitmap!!\n",
78                                cl_off);
79                         LBUG();
80                 }
81         }
82
83         CDEBUG(D_INFO, "client at offset %d with UUID '%s' added\n",
84                cl_off, med->med_mcd->mcd_uuid);
85
86         med->med_off = cl_off;
87
88         if (new_client) {
89                 struct obd_run_ctxt saved;
90                 loff_t off = MDS_LR_CLIENT + (cl_off * MDS_LR_SIZE);
91                 ssize_t written;
92
93                 push_ctxt(&saved, &mds->mds_ctxt, NULL);
94                 written = lustre_fwrite(mds->mds_rcvd_filp,
95                                         (char *)med->med_mcd,
96                                         sizeof(*med->med_mcd), &off);
97                 pop_ctxt(&saved, &mds->mds_ctxt, NULL);
98
99                 if (written != sizeof(*med->med_mcd)) {
100                         if (written < 0)
101                                 RETURN(written);
102                         RETURN(-EIO);
103                 }
104                 CDEBUG(D_INFO, "wrote client mcd at off %u (len %u)\n",
105                        MDS_LR_CLIENT + (cl_off * MDS_LR_SIZE),
106                        (unsigned int)sizeof(*med->med_mcd));
107         }
108         return 0;
109 }
110
111 int mds_client_free(struct obd_export *exp)
112 {
113         struct mds_export_data *med = &exp->exp_mds_data;
114         struct mds_obd *mds = &exp->exp_obd->u.mds;
115         struct mds_client_data zero_mcd;
116         struct obd_run_ctxt saved;
117         int written;
118         loff_t off;
119
120         if (!med->med_mcd)
121                 RETURN(0);
122
123         off = MDS_LR_CLIENT + (med->med_off * MDS_LR_SIZE);
124
125         CDEBUG(D_INFO, "freeing client at offset %u (%lld)with UUID '%s'\n",
126                med->med_off, off, med->med_mcd->mcd_uuid);
127
128         if (!test_and_clear_bit(med->med_off, last_rcvd_slots)) {
129                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
130                        med->med_off);
131                 LBUG();
132         }
133
134         memset(&zero_mcd, 0, sizeof zero_mcd);
135         push_ctxt(&saved, &mds->mds_ctxt, NULL);
136         written = lustre_fwrite(mds->mds_rcvd_filp, (const char *)&zero_mcd,
137                                 sizeof(zero_mcd), &off);
138         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
139
140         if (written != sizeof(zero_mcd)) {
141                 CERROR("error zeroing out client %s off %d in %s: %d\n",
142                        med->med_mcd->mcd_uuid, med->med_off, LAST_RCVD,
143                        written);
144         } else {
145                 CDEBUG(D_INFO, "zeroed out disconnecting client %s at off %d\n",
146                        med->med_mcd->mcd_uuid, med->med_off);
147         }
148
149         OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
150
151         return 0;
152 }
153
154 static int mds_server_free_data(struct mds_obd *mds)
155 {
156         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
157         mds->mds_server_data = NULL;
158
159         return 0;
160 }
161
162 static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
163 {
164         struct mds_obd *mds = &obddev->u.mds;
165         struct mds_server_data *msd;
166         struct mds_client_data *mcd = NULL;
167         loff_t off = 0;
168         int cl_off;
169         unsigned long last_rcvd_size = f->f_dentry->d_inode->i_size;
170         __u64 last_transno = 0;
171         __u64 last_mount;
172         int rc = 0;
173  
174         LASSERT(sizeof(struct mds_client_data) == MDS_LR_SIZE);
175         LASSERT(sizeof(struct mds_server_data) <= MDS_LR_CLIENT);
176
177         OBD_ALLOC(msd, sizeof(*msd));
178         if (!msd)
179                 RETURN(-ENOMEM);
180         rc = lustre_fread(f, (char *)msd, sizeof(*msd), &off);
181
182         mds->mds_server_data = msd;
183         if (rc == 0) {
184                 CERROR("empty MDS %s, new MDS?\n", LAST_RCVD);
185                 RETURN(0);
186         }
187
188         if (rc != sizeof(*msd)) {
189                 CERROR("error reading MDS %s: rc = %d\n", LAST_RCVD, rc);
190                 if (rc > 0)
191                         rc = -EIO;
192                 GOTO(err_msd, rc);
193         }
194
195         CDEBUG(D_INODE, "last_rcvd has size %lu (msd + %lu clients)\n",
196                last_rcvd_size, (last_rcvd_size - MDS_LR_CLIENT)/MDS_LR_SIZE);
197
198         /*
199          * When we do a clean MDS shutdown, we save the last_transno into
200          * the header.
201          */
202         last_transno = le64_to_cpu(msd->msd_last_transno);
203         mds->mds_last_transno = last_transno;
204         CDEBUG(D_INODE, "got "LPU64" for server last_rcvd value\n",
205                last_transno);
206
207         last_mount = le64_to_cpu(msd->msd_mount_count);
208         mds->mds_mount_count = last_mount;
209         CDEBUG(D_INODE, "got "LPU64" for server last_mount value\n",last_mount);
210
211         /* off is adjusted by lustre_fread, so we don't adjust it in the loop */
212         for (off = MDS_LR_CLIENT, cl_off = 0; off < last_rcvd_size; cl_off++) {
213                 int mount_age;
214
215                 if (!mcd) {
216                         OBD_ALLOC(mcd, sizeof(*mcd));
217                         if (!mcd)
218                                 GOTO(err_msd, rc = -ENOMEM);
219                 }
220
221                 rc = lustre_fread(f, (char *)mcd, sizeof(*mcd), &off);
222                 if (rc != sizeof(*mcd)) {
223                         CERROR("error reading MDS %s offset %d: rc = %d\n",
224                                LAST_RCVD, cl_off, rc);
225                         if (rc > 0) /* XXX fatal error or just abort reading? */
226                                 rc = -EIO;
227                         break;
228                 }
229
230                 if (mcd->mcd_uuid[0] == '\0') {
231                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
232                                cl_off);
233                         continue;
234                 }
235
236                 last_transno = le64_to_cpu(mcd->mcd_last_transno);
237
238                 /* These exports are cleaned up by mds_disconnect(), so they
239                  * need to be set up like real exports as mds_connect() does.
240                  */
241                 mount_age = last_mount - le64_to_cpu(mcd->mcd_mount_count);
242                 if (mount_age < MDS_MOUNT_RECOV) {
243                         struct obd_export *exp = class_new_export(obddev);
244                         struct mds_export_data *med;
245
246                         if (!exp) {
247                                 rc = -ENOMEM;
248                                 break;
249                         }
250
251                         memcpy(&exp->exp_client_uuid.uuid, mcd->mcd_uuid,
252                                sizeof exp->exp_client_uuid.uuid);
253                         med = &exp->exp_mds_data;
254                         med->med_mcd = mcd;
255                         mds_client_add(mds, med, cl_off);
256                         /* create helper if export init gets more complex */
257                         INIT_LIST_HEAD(&med->med_open_head);
258                         spin_lock_init(&med->med_open_lock);
259
260                         mcd = NULL;
261                         obddev->obd_recoverable_clients++;
262                 } else {
263                         CDEBUG(D_INFO,
264                                "discarded client %d, UUID '%s', count %Ld\n",
265                                cl_off, mcd->mcd_uuid,
266                                (long long)le64_to_cpu(mcd->mcd_mount_count));
267                 }
268
269                 CDEBUG(D_OTHER, "client at offset %d has last_rcvd = %Lu\n",
270                        cl_off, (unsigned long long)last_transno);
271
272                 if (last_transno > mds->mds_last_transno)
273                         mds->mds_last_transno = last_transno;
274         }
275
276         obddev->obd_last_committed = mds->mds_last_transno;
277         if (obddev->obd_recoverable_clients) {
278                 CERROR("RECOVERY: %d recoverable clients, last_transno "
279                        LPU64"\n",
280                        obddev->obd_recoverable_clients, mds->mds_last_transno);
281                 obddev->obd_next_recovery_transno = obddev->obd_last_committed
282                         + 1;
283                 obddev->obd_flags |= OBD_RECOVERING;
284         }
285
286         if (mcd)
287                 OBD_FREE(mcd, sizeof(*mcd));
288
289         return 0;
290
291 err_msd:
292         mds_server_free_data(mds);
293         return rc;
294 }
295
296 static int mds_fs_prep(struct obd_device *obddev)
297 {
298         struct mds_obd *mds = &obddev->u.mds;
299         struct obd_run_ctxt saved;
300         struct dentry *dentry;
301         struct file *f;
302         int rc;
303
304         push_ctxt(&saved, &mds->mds_ctxt, NULL);
305         dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755);
306         if (IS_ERR(dentry)) {
307                 rc = PTR_ERR(dentry);
308                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
309                 GOTO(err_pop, rc);
310         }
311
312         mds->mds_rootfid.id = dentry->d_inode->i_ino;
313         mds->mds_rootfid.generation = dentry->d_inode->i_generation;
314         mds->mds_rootfid.f_type = S_IFDIR;
315
316         dput(dentry);
317
318         dentry = simple_mkdir(current->fs->pwd, "FH", 0700);
319         if (IS_ERR(dentry)) {
320                 rc = PTR_ERR(dentry);
321                 CERROR("cannot create FH directory: rc = %d\n", rc);
322                 GOTO(err_pop, rc);
323         }
324         /* XXX probably want to hold on to this later... */
325         dput(dentry);
326
327         f = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
328         if (IS_ERR(f)) {
329                 rc = PTR_ERR(f);
330                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
331                 GOTO(err_pop, rc = PTR_ERR(f));
332         }
333         if (!S_ISREG(f->f_dentry->d_inode->i_mode)) {
334                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
335                        f->f_dentry->d_inode->i_mode);
336                 GOTO(err_filp, rc = -ENOENT);
337         }
338
339         rc = fsfilt_journal_data(obddev, f);
340         if (rc) {
341                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
342                 GOTO(err_filp, rc);
343         }
344
345         rc = mds_read_last_rcvd(obddev, f);
346         if (rc) {
347                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
348                 GOTO(err_client, rc);
349         }
350         mds->mds_rcvd_filp = f;
351 err_pop:
352         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
353
354         return rc;
355
356 err_client:
357         class_disconnect_all(obddev);
358 err_filp:
359         if (filp_close(f, 0))
360                 CERROR("can't close %s after error\n", LAST_RCVD);
361         goto err_pop;
362 }
363
364 int mds_fs_setup(struct obd_device *obddev, struct vfsmount *mnt)
365 {
366         struct mds_obd *mds = &obddev->u.mds;
367         ENTRY;
368
369         mds->mds_vfsmnt = mnt;
370
371         OBD_SET_CTXT_MAGIC(&mds->mds_ctxt);
372         mds->mds_ctxt.pwdmnt = mnt;
373         mds->mds_ctxt.pwd = mnt->mnt_root;
374         mds->mds_ctxt.fs = get_ds();
375
376         RETURN(mds_fs_prep(obddev));
377 }
378
379 int mds_fs_cleanup(struct obd_device *obddev)
380 {
381         struct mds_obd *mds = &obddev->u.mds;
382         struct obd_run_ctxt saved;
383         int rc = 0;
384
385         class_disconnect_all(obddev); /* this cleans up client info too */
386         mds_server_free_data(mds);
387
388         push_ctxt(&saved, &mds->mds_ctxt, NULL);
389         if (mds->mds_rcvd_filp) {
390                 rc = filp_close(mds->mds_rcvd_filp, 0);
391                 mds->mds_rcvd_filp = NULL;
392
393                 if (rc)
394                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
395         }
396         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
397
398         return rc;
399 }