Whamcloud - gitweb
merge b_devel into HEAD, which will become 0.7.3
[fs/lustre-release.git] / lustre / mds / mds_fs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mds/mds_fs.c
5  *  Lustre Metadata Server (MDS) filesystem interface code
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #define EXPORT_SYMTAB
27 #define DEBUG_SUBSYSTEM S_MDS
28
29 #include <linux/module.h>
30 #include <linux/kmod.h>
31 #include <linux/version.h>
32 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
33 #include <linux/mount.h>
34 #endif
35 #include <linux/lustre_mds.h>
36 #include <linux/obd_class.h>
37 #include <linux/obd_support.h>
38 #include <linux/lustre_lib.h>
39 #include <linux/lustre_fsfilt.h>
40 #include <portals/list.h>
41
42 #include "mds_internal.h"
43
/* This limit is arbitrary, but for now we fit it in 1 page (32k clients
 * with 4kB pages).  The client bitmap holds one bit per client, so
 * MDS_MAX_CLIENTS is a count of bits and the number of unsigned longs
 * needed to back it is that bit count divided by the bits in one long. */
#define MDS_MAX_CLIENTS (PAGE_SIZE * 8)
#define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / (sizeof(unsigned long) * 8))

/* Name of the file holding the server header and per-client records. */
#define LAST_RCVD "last_rcvd"
49
50 /* Add client data to the MDS.  We use a bitmap to locate a free space
51  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
52  * Otherwise, we have just read the data from the last_rcvd file and
53  * we know its offset.
54  */
55 int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
56                    struct mds_export_data *med, int cl_idx)
57 {
58         unsigned long *bitmap = mds->mds_client_bitmap;
59         int new_client = (cl_idx == -1);
60
61         LASSERT(bitmap != NULL);
62
63         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
64         if (!strcmp(med->med_mcd->mcd_uuid, "OBD_CLASS_UUID"))
65                 RETURN(0);
66
67         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
68          * there's no need for extra complication here
69          */
70         if (new_client) {
71                 cl_idx = find_first_zero_bit(bitmap, MDS_MAX_CLIENTS);
72         repeat:
73                 if (cl_idx >= MDS_MAX_CLIENTS) {
74                         CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
75                         return -ENOMEM;
76                 }
77                 if (test_and_set_bit(cl_idx, bitmap)) {
78                         CERROR("MDS client %d: found bit is set in bitmap\n",
79                                cl_idx);
80                         cl_idx = find_next_zero_bit(bitmap, MDS_MAX_CLIENTS,
81                                                     cl_idx);
82                         goto repeat;
83                 }
84         } else {
85                 if (test_and_set_bit(cl_idx, bitmap)) {
86                         CERROR("MDS client %d: bit already set in bitmap!!\n",
87                                cl_idx);
88                         LBUG();
89                 }
90         }
91
92         CDEBUG(D_INFO, "client at index %d with UUID '%s' added\n",
93                cl_idx, med->med_mcd->mcd_uuid);
94
95         med->med_idx = cl_idx;
96         med->med_off = MDS_LR_CLIENT_START + (cl_idx * MDS_LR_CLIENT_SIZE);
97
98         if (new_client) {
99                 struct obd_run_ctxt saved;
100                 loff_t off = med->med_off;
101                 ssize_t written;
102                 void *handle;
103
104                 push_ctxt(&saved, &mds->mds_ctxt, NULL);
105                 /* We need to start a transaction here first, to avoid a
106                  * possible ordering deadlock on last_rcvd->i_sem and the
107                  * journal lock. In most places we start the journal handle
108                  * first (because we do compound transactions), and then
109                  * later do the write into last_rcvd, which gets i_sem.
110                  *
111                  * Without this transaction, clients connecting at the same
112                  * time other MDS operations are ongoing get last_rcvd->i_sem
113                  * first (in generic_file_write()) and start the journal
114                  * transaction afterwards, and can deadlock with other ops.
115                  *
116                  * We use FSFILT_OP_SETATTR because it is smallest, but all
117                  * ops include enough space for the last_rcvd update so we
118                  * could use any of them, or maybe an FSFILT_OP_NONE is best?
119                  */
120                 handle = fsfilt_start(obd,mds->mds_rcvd_filp->f_dentry->d_inode,
121                                       FSFILT_OP_SETATTR, NULL);
122                 if (IS_ERR(handle)) {
123                         written = PTR_ERR(handle);
124                         CERROR("unable to start transaction: rc %d\n",
125                                (int)written);
126                 } else {
127                         written = fsfilt_write_record(obd, mds->mds_rcvd_filp,
128                                                       (char *)med->med_mcd,
129                                                       sizeof(*med->med_mcd),
130                                                       &off);
131                         fsfilt_commit(obd,mds->mds_rcvd_filp->f_dentry->d_inode,
132                                       handle, 0);
133                 }
134                 pop_ctxt(&saved, &mds->mds_ctxt, NULL);
135
136                 if (written != sizeof(*med->med_mcd)) {
137                         if (written < 0)
138                                 RETURN(written);
139                         RETURN(-EIO);
140                 }
141                 CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
142                        med->med_idx, med->med_off,
143                        (unsigned int)sizeof(*med->med_mcd));
144         }
145         return 0;
146 }
147
148 int mds_client_free(struct obd_export *exp)
149 {
150         struct mds_export_data *med = &exp->exp_mds_data;
151         struct mds_obd *mds = &exp->exp_obd->u.mds;
152         struct obd_device *obd = exp->exp_obd;
153         struct mds_client_data zero_mcd;
154         struct obd_run_ctxt saved;
155         int written;
156         unsigned long *bitmap = mds->mds_client_bitmap;
157
158         LASSERT(bitmap);
159         if (!med->med_mcd)
160                 RETURN(0);
161
162         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
163         if (!strcmp(med->med_mcd->mcd_uuid, "OBD_CLASS_UUID"))
164                 GOTO(free_and_out, 0);
165
166         CDEBUG(D_INFO, "freeing client at index %u (%lld)with UUID '%s'\n",
167                med->med_idx, med->med_off, med->med_mcd->mcd_uuid);
168
169         if (!test_and_clear_bit(med->med_idx, bitmap)) {
170                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
171                        med->med_idx);
172                 LBUG();
173         }
174
175         memset(&zero_mcd, 0, sizeof zero_mcd);
176         push_ctxt(&saved, &mds->mds_ctxt, NULL);
177         written = fsfilt_write_record(obd, mds->mds_rcvd_filp,
178                                       (char *)&zero_mcd, sizeof(zero_mcd),
179                                       &med->med_off);
180         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
181
182         if (written != sizeof(zero_mcd)) {
183                 CERROR("error zeroing out client %s index %d in %s: %d\n",
184                        med->med_mcd->mcd_uuid, med->med_idx, LAST_RCVD,
185                        written);
186         } else {
187                 CDEBUG(D_INFO, "zeroed out disconnecting client %s at off %d\n",
188                        med->med_mcd->mcd_uuid, med->med_idx);
189         }
190
191  free_and_out:
192         OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
193
194         return 0;
195 }
196
197 static int mds_server_free_data(struct mds_obd *mds)
198 {
199         OBD_FREE(mds->mds_client_bitmap,
200                  MDS_MAX_CLIENT_WORDS * sizeof(unsigned long));
201         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
202         mds->mds_server_data = NULL;
203
204         return 0;
205 }
206
207 static int mds_read_last_rcvd(struct obd_device *obd, struct file *file)
208 {
209         struct mds_obd *mds = &obd->u.mds;
210         struct mds_server_data *msd;
211         struct mds_client_data *mcd = NULL;
212         loff_t off = 0;
213         int cl_idx;
214         unsigned long last_rcvd_size = file->f_dentry->d_inode->i_size;
215         __u64 last_transno = 0;
216         __u64 mount_count;
217         int rc = 0;
218
219         LASSERT(sizeof(struct mds_client_data) == MDS_LR_CLIENT_SIZE);
220         LASSERT(sizeof(struct mds_server_data) <= MDS_LR_SERVER_SIZE);
221
222         OBD_ALLOC(msd, sizeof(*msd));
223         if (!msd)
224                 RETURN(-ENOMEM);
225
226         OBD_ALLOC(mds->mds_client_bitmap,
227                   MDS_MAX_CLIENT_WORDS * sizeof(unsigned long));
228         if (!mds->mds_client_bitmap) {
229                 OBD_FREE(msd, sizeof(*msd));
230                 RETURN(-ENOMEM);
231         }
232
233         mds->mds_server_data = msd;
234
235         if (last_rcvd_size == 0) {
236                 CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD);
237                 memcpy(msd->msd_uuid, obd->obd_uuid.uuid,sizeof(msd->msd_uuid));
238                 msd->msd_server_size = cpu_to_le32(MDS_LR_SERVER_SIZE);
239                 msd->msd_client_start = cpu_to_le32(MDS_LR_CLIENT_START);
240                 msd->msd_client_size = cpu_to_le16(MDS_LR_CLIENT_SIZE);
241
242                 RETURN(0);
243         }
244
245         rc = fsfilt_read_record(obd, file, (char *)msd, sizeof(*msd), &off);
246
247         if (rc != sizeof(*msd)) {
248                 CERROR("error reading MDS %s: rc = %d\n", LAST_RCVD,rc);
249                 if (rc > 0)
250                         rc = -EIO;
251                 GOTO(err_msd, rc);
252         }
253         if (!msd->msd_server_size)
254                 msd->msd_server_size = cpu_to_le32(MDS_LR_SERVER_SIZE);
255         if (!msd->msd_client_start)
256                 msd->msd_client_start = cpu_to_le32(MDS_LR_CLIENT_START);
257         if (!msd->msd_client_size)
258                 msd->msd_client_size = cpu_to_le16(MDS_LR_CLIENT_SIZE);
259
260         if (msd->msd_feature_incompat) {
261                 CERROR("unsupported incompat feature %x\n",
262                        le32_to_cpu(msd->msd_feature_incompat));
263                 GOTO(err_msd, rc = -EINVAL);
264         }
265         if (msd->msd_feature_rocompat) {
266                 CERROR("unsupported read-only feature %x\n",
267                        le32_to_cpu(msd->msd_feature_rocompat));
268                 /* Do something like remount filesystem read-only */
269                 GOTO(err_msd, rc = -EINVAL);
270         }
271
272         last_transno = le64_to_cpu(msd->msd_last_transno);
273         mds->mds_last_transno = last_transno;
274
275         mount_count = le64_to_cpu(msd->msd_mount_count);
276         mds->mds_mount_count = mount_count;
277
278         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
279                obd->obd_name, last_transno);
280         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
281                obd->obd_name, mount_count);
282         CDEBUG(D_INODE, "%s: server data size: %u\n",
283                obd->obd_name, le32_to_cpu(msd->msd_server_size));
284         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
285                obd->obd_name, le32_to_cpu(msd->msd_client_start));
286         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
287                obd->obd_name, le32_to_cpu(msd->msd_client_size));
288         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
289                obd->obd_name, last_rcvd_size);
290         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
291                (last_rcvd_size - MDS_LR_CLIENT_START) / MDS_LR_CLIENT_SIZE);
292
293         /* When we do a clean FILTER shutdown, we save the last_transno into
294          * the header.  If we find clients with higher last_transno values
295          * then those clients may need recovery done. */
296         for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
297                 __u64 last_transno;
298                 int mount_age;
299
300                 if (!mcd) {
301                         OBD_ALLOC(mcd, sizeof(*mcd));
302                         if (!mcd)
303                                 GOTO(err_msd, rc = -ENOMEM);
304                 }
305
306                 /* Don't assume off is incremented properly, in case
307                  * sizeof(fsd) isn't the same as fsd->fsd_client_size.
308                  */
309                 off = le32_to_cpu(msd->msd_client_start) +
310                         cl_idx * le16_to_cpu(msd->msd_client_size);
311                 rc = fsfilt_read_record(obd, file, (char *)mcd,
312                                         sizeof(*mcd), &off);
313                 if (rc != sizeof(*mcd)) {
314                         CERROR("error reading MDS %s offset %d: rc = %d\n",
315                                LAST_RCVD, cl_idx, rc);
316                         if (rc > 0) /* XXX fatal error or just abort reading? */
317                                 rc = -EIO;
318                         break;
319                 }
320
321                 if (mcd->mcd_uuid[0] == '\0') {
322                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
323                                cl_idx);
324                         continue;
325                 }
326
327                 last_transno = le64_to_cpu(mcd->mcd_last_transno);
328
329                 /* These exports are cleaned up by mds_disconnect(), so they
330                  * need to be set up like real exports as mds_connect() does.
331                  */
332                 mount_age = mount_count - le64_to_cpu(mcd->mcd_mount_count);
333                 if (mount_age < MDS_MOUNT_RECOV) {
334                         struct obd_export *exp = class_new_export(obd);
335                         struct mds_export_data *med;
336                         CERROR("RCVRNG CLIENT uuid: %s off: %d lr: "LPU64
337                                "srv lr: "LPU64" mnt: "LPU64" last mount: "LPU64
338                                "\n", mcd->mcd_uuid, cl_idx,
339                                last_transno, le64_to_cpu(msd->msd_last_transno),
340                                le64_to_cpu(mcd->mcd_mount_count), mount_count);
341
342                         if (!exp) {
343                                 rc = -ENOMEM;
344                                 break;
345                         }
346
347                         memcpy(&exp->exp_client_uuid.uuid, mcd->mcd_uuid,
348                                sizeof exp->exp_client_uuid.uuid);
349                         med = &exp->exp_mds_data;
350                         med->med_mcd = mcd;
351                         mds_client_add(obd, mds, med, cl_idx);
352                         /* create helper if export init gets more complex */
353                         INIT_LIST_HEAD(&med->med_open_head);
354                         spin_lock_init(&med->med_open_lock);
355
356                         mcd = NULL;
357                         obd->obd_recoverable_clients++;
358                         class_export_put(exp);
359                 } else {
360                         CDEBUG(D_INFO, "discarded client %d, UUID '%s', count "
361                                LPU64"\n", cl_idx, mcd->mcd_uuid,
362                                le64_to_cpu(mcd->mcd_mount_count));
363                 }
364
365                 CDEBUG(D_OTHER, "client at offset %d has last_transno = "
366                        LPU64"\n", cl_idx, last_transno);
367
368                 if (last_transno > mds->mds_last_transno)
369                         mds->mds_last_transno = last_transno;
370         }
371
372         obd->obd_last_committed = mds->mds_last_transno;
373         if (obd->obd_recoverable_clients) {
374                 CERROR("RECOVERY: %d recoverable clients, last_transno "
375                        LPU64"\n",
376                        obd->obd_recoverable_clients, mds->mds_last_transno);
377                 obd->obd_next_recovery_transno = obd->obd_last_committed
378                         + 1;
379                 obd->obd_recovering = 1;
380         }
381
382         if (mcd)
383                 OBD_FREE(mcd, sizeof(*mcd));
384
385         return 0;
386
387 err_msd:
388         mds_server_free_data(mds);
389         return rc;
390 }
391
392 static int mds_fs_prep(struct obd_device *obd)
393 {
394         struct mds_obd *mds = &obd->u.mds;
395         struct obd_run_ctxt saved;
396         struct dentry *dentry;
397         struct file *file;
398         int rc;
399
400         push_ctxt(&saved, &mds->mds_ctxt, NULL);
401         dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755);
402         if (IS_ERR(dentry)) {
403                 rc = PTR_ERR(dentry);
404                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
405                 GOTO(err_pop, rc);
406         }
407
408         mds->mds_rootfid.id = dentry->d_inode->i_ino;
409         mds->mds_rootfid.generation = dentry->d_inode->i_generation;
410         mds->mds_rootfid.f_type = S_IFDIR;
411
412         dput(dentry);
413
414         dentry = lookup_one_len("__iopen__", current->fs->pwd,
415                                 strlen("__iopen__"));
416         if (IS_ERR(dentry) || !dentry->d_inode) {
417                 rc = (IS_ERR(dentry)) ? PTR_ERR(dentry): -ENOENT;
418                 CERROR("cannot open iopen FH directory: rc = %d\n", rc);
419                 GOTO(err_pop, rc);
420         }
421         mds->mds_fid_de = dentry;
422
423         dentry = simple_mkdir(current->fs->pwd, "PENDING", 0777);
424         if (IS_ERR(dentry)) {
425                 rc = PTR_ERR(dentry);
426                 CERROR("cannot create PENDING directory: rc = %d\n", rc);
427                 GOTO(err_fid, rc);
428         }
429         mds->mds_pending_dir = dentry;
430
431         dentry = simple_mkdir(current->fs->pwd, "LOGS", 0700);
432         if (IS_ERR(dentry)) {
433                 rc = PTR_ERR(dentry);
434                 CERROR("cannot create LOGS directory: rc = %d\n", rc);
435                 GOTO(err_pending, rc);
436         }
437         mds->mds_logs_dir = dentry;
438
439         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
440         if (IS_ERR(file)) {
441                 rc = PTR_ERR(file);
442                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
443
444                 GOTO(err_logs, rc = PTR_ERR(file));
445         }
446         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
447                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
448                        file->f_dentry->d_inode->i_mode);
449                 GOTO(err_filp, rc = -ENOENT);
450         }
451
452         rc = fsfilt_journal_data(obd, file);
453         if (rc) {
454                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
455                 GOTO(err_filp, rc);
456         }
457
458         rc = mds_read_last_rcvd(obd, file);
459         if (rc) {
460                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
461                 GOTO(err_client, rc);
462         }
463         mds->mds_rcvd_filp = file;
464 #ifdef I_SKIP_PDFLUSH
465         /*
466          * we need this to protect from deadlock
467          * pdflush vs. lustre_fwrite()
468          */
469         file->f_dentry->d_inode->i_flags |= I_SKIP_PDFLUSH;
470 #endif
471 err_pop:
472         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
473
474         return rc;
475
476 err_client:
477         class_disconnect_exports(obd, 0);
478 err_filp:
479         if (filp_close(file, 0))
480                 CERROR("can't close %s after error\n", LAST_RCVD);
481 err_logs:
482         dput(mds->mds_logs_dir);
483 err_pending:
484         dput(mds->mds_pending_dir);
485 err_fid:
486         dput(mds->mds_fid_de);
487         goto err_pop;
488 }
489
490 int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
491 {
492         struct mds_obd *mds = &obd->u.mds;
493         ENTRY;
494
495         mds->mds_vfsmnt = mnt;
496
497         OBD_SET_CTXT_MAGIC(&mds->mds_ctxt);
498         mds->mds_ctxt.pwdmnt = mnt;
499         mds->mds_ctxt.pwd = mnt->mnt_root;
500         mds->mds_ctxt.fs = get_ds();
501         RETURN(mds_fs_prep(obd));
502 }
503
504 int mds_fs_cleanup(struct obd_device *obd, int flags)
505 {
506         struct mds_obd *mds = &obd->u.mds;
507         struct obd_run_ctxt saved;
508         int rc = 0;
509
510         if (flags & OBD_OPT_FAILOVER)
511                 CERROR("%s: shutting down for failover; client state will"
512                        " be preserved.\n", obd->obd_name);
513
514         class_disconnect_exports(obd, flags); /* cleans up client info too */
515         mds_server_free_data(mds);
516
517         push_ctxt(&saved, &mds->mds_ctxt, NULL);
518         if (mds->mds_rcvd_filp) {
519                 rc = filp_close(mds->mds_rcvd_filp, 0);
520                 mds->mds_rcvd_filp = NULL;
521                 if (rc)
522                         CERROR("%s file won't close, rc=%d\n", LAST_RCVD, rc);
523         }
524         if (mds->mds_logs_dir) {
525                 l_dput(mds->mds_logs_dir);
526                 mds->mds_logs_dir = NULL;
527         }
528         if (mds->mds_pending_dir) {
529                 l_dput(mds->mds_pending_dir);
530                 mds->mds_pending_dir = NULL;
531         }
532         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
533         shrink_dcache_parent(mds->mds_fid_de);
534         dput(mds->mds_fid_de);
535
536         return rc;
537 }
538
539 /* This is a callback from the llog_* functions.
540  * Assumes caller has already pushed us into the kernel context. */
541 int mds_log_close(struct llog_handle *cathandle, struct llog_handle *loghandle)
542 {
543         struct llog_object_hdr *llh = loghandle->lgh_hdr;
544         struct mds_obd *mds = &cathandle->lgh_obd->u.mds;
545         struct dentry *dchild = NULL;
546         int rc;
547         ENTRY;
548
549         /* If we are going to delete this log, grab a ref before we close
550          * it so we don't have to immediately do another lookup.
551          */
552         if (llh->llh_hdr.lth_type != LLOG_CATALOG_MAGIC && llh->llh_count == 0){
553                 CDEBUG(D_INODE, "deleting log file "LPX64":%x\n",
554                        loghandle->lgh_cookie.lgc_lgl.lgl_oid,
555                        loghandle->lgh_cookie.lgc_lgl.lgl_ogen);
556                 down(&mds->mds_logs_dir->d_inode->i_sem);
557                 dchild = dget(loghandle->lgh_file->f_dentry);
558                 llog_delete_log(cathandle, loghandle);
559         } else {
560                 CDEBUG(D_INODE, "closing log file "LPX64":%x\n",
561                        loghandle->lgh_cookie.lgc_lgl.lgl_oid,
562                        loghandle->lgh_cookie.lgc_lgl.lgl_ogen);
563         }
564
565         rc = filp_close(loghandle->lgh_file, 0);
566
567         llog_free_handle(loghandle); /* also removes loghandle from list */
568
569         if (dchild) {
570                 int err = vfs_unlink(mds->mds_logs_dir->d_inode, dchild);
571                 if (err) {
572                         CERROR("error unlinking empty log %*s: rc %d\n",
573                                dchild->d_name.len, dchild->d_name.name, err);
574                         if (!rc)
575                                 rc = err;
576                 }
577                 l_dput(dchild);
578                 up(&mds->mds_logs_dir->d_inode->i_sem);
579         }
580         RETURN(rc);
581 }
582
583 /* This is a callback from the llog_* functions.
584  * Assumes caller has already pushed us into the kernel context. */
585 struct llog_handle *mds_log_open(struct obd_device *obd,
586                                  struct llog_cookie *logcookie)
587 {
588         struct ll_fid fid = { .id = logcookie->lgc_lgl.lgl_oid,
589                               .generation = logcookie->lgc_lgl.lgl_ogen,
590                               .f_type = S_IFREG };
591         struct llog_handle *loghandle;
592         struct dentry *dchild;
593         int rc;
594         ENTRY;
595
596         loghandle = llog_alloc_handle();
597         if (loghandle == NULL)
598                 RETURN(ERR_PTR(-ENOMEM));
599
600         down(&obd->u.mds.mds_logs_dir->d_inode->i_sem);
601         dchild = mds_fid2dentry(&obd->u.mds, &fid, NULL);
602         up(&obd->u.mds.mds_logs_dir->d_inode->i_sem);
603         if (IS_ERR(dchild)) {
604                 rc = PTR_ERR(dchild);
605                 CERROR("error looking up log file "LPX64":%x: rc %d\n",
606                        fid.id, fid.generation, rc);
607                 GOTO(out, rc);
608         }
609
610         if (dchild->d_inode == NULL) {
611                 rc = -ENOENT;
612                 CERROR("nonexistent log file "LPX64":%x: rc %d\n",
613                        fid.id, fid.generation, rc);
614                 GOTO(out_put, rc);
615         }
616
617         /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
618         mntget(obd->u.mds.mds_vfsmnt);
619         loghandle->lgh_file = dentry_open(dchild, obd->u.mds.mds_vfsmnt,
620                                           O_RDWR | O_LARGEFILE);
621         if (IS_ERR(loghandle->lgh_file)) {
622                 rc = PTR_ERR(loghandle->lgh_file);
623                 CERROR("error opening logfile "LPX64":%x: rc %d\n",
624                        fid.id, fid.generation, rc);
625                 GOTO(out, rc);
626         }
627         memcpy(&loghandle->lgh_cookie, logcookie, sizeof(*logcookie));
628         loghandle->lgh_log_create = mds_log_create;
629         loghandle->lgh_log_open = mds_log_open;
630         loghandle->lgh_log_close = mds_log_close;
631         loghandle->lgh_obd = obd;
632
633         RETURN(loghandle);
634
635 out_put:
636         l_dput(dchild);
637 out:
638         llog_free_handle(loghandle);
639         return ERR_PTR(rc);
640 }
641
642 /* This is a callback from the llog_* functions.
643  * Assumes caller has already pushed us into the kernel context. */
644 struct llog_handle *mds_log_create(struct obd_device *obd)
645 {
646         char logbuf[24], *logname; /* logSSSSSSSSSS.count */
647         struct llog_handle *loghandle;
648         int rc, open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
649         ENTRY;
650
651         loghandle = llog_alloc_handle();
652         if (!loghandle)
653                 RETURN(ERR_PTR(-ENOMEM));
654
655 retry:
656         if (!obd->u.mds.mds_catalog) {
657                 logname = "LOGS/catalog";
658         } else {
659                 sprintf(logbuf, "LOGS/log%lu.%u\n",
660                         CURRENT_SECONDS, obd->u.mds.mds_catalog->lgh_index++);
661                 open_flags |= O_EXCL;
662                 logname = logbuf;
663         }
664         loghandle->lgh_file = filp_open(logname, open_flags, 0644);
665         if (IS_ERR(loghandle->lgh_file)) {
666                 rc = PTR_ERR(loghandle->lgh_file);
667                 if (rc == -EEXIST) {
668                         CDEBUG(D_HA, "collision in logfile %s creation\n",
669                                logname);
670                         obd->u.mds.mds_catalog->lgh_index++;
671                         goto retry;
672                 }
673                 CERROR("error opening/creating %s: rc %d\n", logname, rc);
674                 GOTO(out_handle, rc);
675         }
676
677         loghandle->lgh_cookie.lgc_lgl.lgl_oid =
678                 loghandle->lgh_file->f_dentry->d_inode->i_ino;
679         loghandle->lgh_cookie.lgc_lgl.lgl_ogen =
680                 loghandle->lgh_file->f_dentry->d_inode->i_generation;
681         loghandle->lgh_log_create = mds_log_create;
682         loghandle->lgh_log_open = mds_log_open;
683         loghandle->lgh_log_close = mds_log_close;
684         loghandle->lgh_obd = obd;
685
686         RETURN(loghandle);
687
688 out_handle:
689         llog_free_handle(loghandle);
690         return ERR_PTR(rc);
691 }
692
693 struct llog_handle *mds_get_catalog(struct obd_device *obd)
694 {
695         struct mds_server_data *msd = obd->u.mds.mds_server_data;
696         struct obd_run_ctxt saved;
697         struct llog_handle *cathandle = NULL;
698         int rc = 0;
699         ENTRY;
700
701         push_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
702
703         if (msd->msd_catalog_oid) {
704                 struct llog_cookie catcookie;
705
706                 catcookie.lgc_lgl.lgl_oid = le64_to_cpu(msd->msd_catalog_oid);
707                 catcookie.lgc_lgl.lgl_ogen = le32_to_cpu(msd->msd_catalog_ogen);
708                 cathandle = mds_log_open(obd, &catcookie);
709                 if (IS_ERR(cathandle)) {
710                         CERROR("error opening catalog "LPX64":%x: rc %d\n",
711                                catcookie.lgc_lgl.lgl_oid,
712                                catcookie.lgc_lgl.lgl_ogen,
713                                (int)PTR_ERR(cathandle));
714                         msd->msd_catalog_oid = 0;
715                         msd->msd_catalog_ogen = 0;
716                 }
717                 /* ORPHANS FIXME: compare catalog UUID to msd_peeruuid */
718         }
719
720         if (!msd->msd_catalog_oid) {
721                 struct llog_logid *lgl;
722
723                 cathandle = mds_log_create(obd);
724                 if (IS_ERR(cathandle)) {
725                         CERROR("error creating new catalog: rc %d\n",
726                                (int)PTR_ERR(cathandle));
727                         GOTO(out, cathandle);
728                 }
729                 lgl = &cathandle->lgh_cookie.lgc_lgl;
730                 msd->msd_catalog_oid = cpu_to_le64(lgl->lgl_oid);
731                 msd->msd_catalog_ogen = cpu_to_le32(lgl->lgl_ogen);
732                 rc = mds_update_server_data(obd);
733                 if (rc) {
734                         CERROR("error writing new catalog to disk: rc %d\n",rc);
735                         GOTO(out_handle, rc);
736                 }
737         }
738
739         rc = llog_init_catalog(cathandle, &obd->u.mds.mds_osc_uuid);
740
741 out:
742         pop_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
743         RETURN(cathandle);
744
745 out_handle:
746         mds_log_close(cathandle, cathandle);
747         cathandle = ERR_PTR(rc);
748         goto out;
749
750 }
751
752 void mds_put_catalog(struct llog_handle *cathandle)
753 {
754         struct llog_handle *loghandle, *n;
755         int rc;
756         ENTRY;
757
758         list_for_each_entry_safe(loghandle, n, &cathandle->lgh_list, lgh_list)
759                 mds_log_close(cathandle, loghandle);
760
761         rc = filp_close(cathandle->lgh_file, 0);
762         if (rc)
763                 CERROR("error closing catalog: rc %d\n", rc);
764
765         llog_free_handle(cathandle);
766         EXIT;
767 }