Whamcloud - gitweb
Write out MDS last_rcvd when first created, don't wait for a client connect.
[fs/lustre-release.git] / lustre / mds / mds_fs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mds/mds_fs.c
5  *  Lustre Metadata Server (MDS) filesystem interface code
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Andreas Dilger <adilger@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #define EXPORT_SYMTAB
27 #define DEBUG_SUBSYSTEM S_MDS
28
29 #include <linux/module.h>
30 #include <linux/kmod.h>
31 #include <linux/version.h>
32 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
33 #include <linux/mount.h>
34 #endif
35 #include <linux/lustre_mds.h>
36 #include <linux/obd_class.h>
37 #include <linux/obd_support.h>
38 #include <linux/lustre_lib.h>
39 #include <linux/lustre_fsfilt.h>
40 #include <portals/list.h>
41
42 #include "mds_internal.h"
43
/* Upper bound on the number of client slots tracked in last_rcvd.  The limit
 * is arbitrary; it is chosen so that a bitmap with one bit per client fits
 * in exactly one page (PAGE_SIZE * 8 bits, i.e. 32k clients with 4kB pages).
 */
#define MDS_MAX_CLIENTS (PAGE_SIZE * 8)

/* Number of unsigned longs needed to hold MDS_MAX_CLIENTS bits.  Every word
 * carries 8 * sizeof(unsigned long) bits; dividing by sizeof() alone would
 * size the bitmap at eight pages instead of the intended one.
 */
#define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / (8 * sizeof(unsigned long)))

/* Name of the file holding the server record and the per-client records. */
#define LAST_RCVD "last_rcvd"
49
50 /* Add client data to the MDS.  We use a bitmap to locate a free space
51  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
52  * Otherwise, we have just read the data from the last_rcvd file and
53  * we know its offset.
54  */
55 int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
56                    struct mds_export_data *med, int cl_idx)
57 {
58         unsigned long *bitmap = mds->mds_client_bitmap;
59         int new_client = (cl_idx == -1);
60
61         LASSERT(bitmap != NULL);
62
63         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
64         if (!strcmp(med->med_mcd->mcd_uuid, "OBD_CLASS_UUID"))
65                 RETURN(0);
66
67         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
68          * there's no need for extra complication here
69          */
70         if (new_client) {
71                 cl_idx = find_first_zero_bit(bitmap, MDS_MAX_CLIENTS);
72         repeat:
73                 if (cl_idx >= MDS_MAX_CLIENTS) {
74                         CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
75                         return -ENOMEM;
76                 }
77                 if (test_and_set_bit(cl_idx, bitmap)) {
78                         CERROR("MDS client %d: found bit is set in bitmap\n",
79                                cl_idx);
80                         cl_idx = find_next_zero_bit(bitmap, MDS_MAX_CLIENTS,
81                                                     cl_idx);
82                         goto repeat;
83                 }
84         } else {
85                 if (test_and_set_bit(cl_idx, bitmap)) {
86                         CERROR("MDS client %d: bit already set in bitmap!!\n",
87                                cl_idx);
88                         LBUG();
89                 }
90         }
91
92         CDEBUG(D_INFO, "client at index %d with UUID '%s' added\n",
93                cl_idx, med->med_mcd->mcd_uuid);
94
95         med->med_idx = cl_idx;
96         med->med_off = MDS_LR_CLIENT_START + (cl_idx * MDS_LR_CLIENT_SIZE);
97
98         if (new_client) {
99                 struct obd_run_ctxt saved;
100                 loff_t off = med->med_off;
101                 ssize_t written;
102                 void *handle;
103
104                 push_ctxt(&saved, &mds->mds_ctxt, NULL);
105                 /* We need to start a transaction here first, to avoid a
106                  * possible ordering deadlock on last_rcvd->i_sem and the
107                  * journal lock. In most places we start the journal handle
108                  * first (because we do compound transactions), and then
109                  * later do the write into last_rcvd, which gets i_sem.
110                  *
111                  * Without this transaction, clients connecting at the same
112                  * time other MDS operations are ongoing get last_rcvd->i_sem
113                  * first (in generic_file_write()) and start the journal
114                  * transaction afterwards, and can deadlock with other ops.
115                  *
116                  * We use FSFILT_OP_SETATTR because it is smallest, but all
117                  * ops include enough space for the last_rcvd update so we
118                  * could use any of them, or maybe an FSFILT_OP_NONE is best?
119                  */
120                 handle = fsfilt_start(obd,mds->mds_rcvd_filp->f_dentry->d_inode,
121                                       FSFILT_OP_SETATTR, NULL);
122                 if (IS_ERR(handle)) {
123                         written = PTR_ERR(handle);
124                         CERROR("unable to start transaction: rc %d\n",
125                                (int)written);
126                 } else {
127                         written = fsfilt_write_record(obd, mds->mds_rcvd_filp,
128                                                       med->med_mcd,
129                                                       sizeof(*med->med_mcd),
130                                                       &off);
131                         fsfilt_commit(obd,mds->mds_rcvd_filp->f_dentry->d_inode,
132                                       handle, 0);
133                 }
134                 pop_ctxt(&saved, &mds->mds_ctxt, NULL);
135
136                 if (written != sizeof(*med->med_mcd)) {
137                         if (written < 0)
138                                 RETURN(written);
139                         RETURN(-EIO);
140                 }
141                 CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
142                        med->med_idx, med->med_off,
143                        (unsigned int)sizeof(*med->med_mcd));
144         }
145         return 0;
146 }
147
148 int mds_client_free(struct obd_export *exp)
149 {
150         struct mds_export_data *med = &exp->exp_mds_data;
151         struct mds_obd *mds = &exp->exp_obd->u.mds;
152         struct obd_device *obd = exp->exp_obd;
153         struct mds_client_data zero_mcd;
154         struct obd_run_ctxt saved;
155         int written;
156         unsigned long *bitmap = mds->mds_client_bitmap;
157
158         LASSERT(bitmap);
159         if (!med->med_mcd)
160                 RETURN(0);
161
162         /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
163         if (!strcmp(med->med_mcd->mcd_uuid, "OBD_CLASS_UUID"))
164                 GOTO(free_and_out, 0);
165
166         CDEBUG(D_INFO, "freeing client at index %u (%lld)with UUID '%s'\n",
167                med->med_idx, med->med_off, med->med_mcd->mcd_uuid);
168
169         if (!test_and_clear_bit(med->med_idx, bitmap)) {
170                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
171                        med->med_idx);
172                 LBUG();
173         }
174
175         memset(&zero_mcd, 0, sizeof zero_mcd);
176         push_ctxt(&saved, &mds->mds_ctxt, NULL);
177         written = fsfilt_write_record(obd, mds->mds_rcvd_filp, &zero_mcd,
178                                       sizeof(zero_mcd), &med->med_off);
179         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
180
181         if (written != sizeof(zero_mcd)) {
182                 CERROR("error zeroing out client %s index %d in %s: %d\n",
183                        med->med_mcd->mcd_uuid, med->med_idx, LAST_RCVD,
184                        written);
185         } else {
186                 CDEBUG(D_INFO, "zeroed out disconnecting client %s at off %d\n",
187                        med->med_mcd->mcd_uuid, med->med_idx);
188         }
189
190  free_and_out:
191         OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
192
193         return 0;
194 }
195
196 static int mds_server_free_data(struct mds_obd *mds)
197 {
198         OBD_FREE(mds->mds_client_bitmap,
199                  MDS_MAX_CLIENT_WORDS * sizeof(unsigned long));
200         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
201         mds->mds_server_data = NULL;
202
203         return 0;
204 }
205
206 static int mds_read_last_rcvd(struct obd_device *obd, struct file *file)
207 {
208         struct mds_obd *mds = &obd->u.mds;
209         struct mds_server_data *msd;
210         struct mds_client_data *mcd = NULL;
211         loff_t off = 0;
212         int cl_idx;
213         unsigned long last_rcvd_size = file->f_dentry->d_inode->i_size;
214         __u64 last_transno = 0;
215         __u64 mount_count;
216         int rc = 0;
217
218         LASSERT(sizeof(struct mds_client_data) == MDS_LR_CLIENT_SIZE);
219         LASSERT(sizeof(struct mds_server_data) <= MDS_LR_SERVER_SIZE);
220
221         OBD_ALLOC(msd, sizeof(*msd));
222         if (!msd)
223                 RETURN(-ENOMEM);
224
225         OBD_ALLOC(mds->mds_client_bitmap,
226                   MDS_MAX_CLIENT_WORDS * sizeof(unsigned long));
227         if (!mds->mds_client_bitmap) {
228                 OBD_FREE(msd, sizeof(*msd));
229                 RETURN(-ENOMEM);
230         }
231
232         mds->mds_server_data = msd;
233
234         if (last_rcvd_size == 0) {
235                 int written;
236                 CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD);
237                 memcpy(msd->msd_uuid, obd->obd_uuid.uuid,sizeof(msd->msd_uuid));
238                 msd->msd_server_size = cpu_to_le32(MDS_LR_SERVER_SIZE);
239                 msd->msd_client_start = cpu_to_le32(MDS_LR_CLIENT_START);
240                 msd->msd_client_size = cpu_to_le16(MDS_LR_CLIENT_SIZE);
241                 written = fsfilt_write_record(obd, file, msd, sizeof(*msd),
242                                               &off);
243
244                 if (written == sizeof(*msd))
245                         RETURN(0);
246                 CERROR("%s: error writing new MSD: %d\n", obd->obd_name,
247                        written);
248                 GOTO(err_msd, rc = (written < 0 ? written : -EIO));
249         }
250
251         rc = fsfilt_read_record(obd, file, msd, sizeof(*msd), &off);
252
253         if (rc != sizeof(*msd)) {
254                 CERROR("error reading MDS %s: rc = %d\n", LAST_RCVD,rc);
255                 if (rc > 0)
256                         rc = -EIO;
257                 GOTO(err_msd, rc);
258         }
259         if (!msd->msd_server_size)
260                 msd->msd_server_size = cpu_to_le32(MDS_LR_SERVER_SIZE);
261         if (!msd->msd_client_start)
262                 msd->msd_client_start = cpu_to_le32(MDS_LR_CLIENT_START);
263         if (!msd->msd_client_size)
264                 msd->msd_client_size = cpu_to_le16(MDS_LR_CLIENT_SIZE);
265
266         if (msd->msd_feature_incompat) {
267                 CERROR("unsupported incompat feature %x\n",
268                        le32_to_cpu(msd->msd_feature_incompat));
269                 GOTO(err_msd, rc = -EINVAL);
270         }
271         if (msd->msd_feature_rocompat) {
272                 CERROR("unsupported read-only feature %x\n",
273                        le32_to_cpu(msd->msd_feature_rocompat));
274                 /* Do something like remount filesystem read-only */
275                 GOTO(err_msd, rc = -EINVAL);
276         }
277
278         last_transno = le64_to_cpu(msd->msd_last_transno);
279         mds->mds_last_transno = last_transno;
280
281         mount_count = le64_to_cpu(msd->msd_mount_count);
282         mds->mds_mount_count = mount_count;
283
284         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
285                obd->obd_name, last_transno);
286         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
287                obd->obd_name, mount_count);
288         CDEBUG(D_INODE, "%s: server data size: %u\n",
289                obd->obd_name, le32_to_cpu(msd->msd_server_size));
290         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
291                obd->obd_name, le32_to_cpu(msd->msd_client_start));
292         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
293                obd->obd_name, le32_to_cpu(msd->msd_client_size));
294         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
295                obd->obd_name, last_rcvd_size);
296         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
297                (last_rcvd_size - MDS_LR_CLIENT_START) / MDS_LR_CLIENT_SIZE);
298
299         /* When we do a clean FILTER shutdown, we save the last_transno into
300          * the header.  If we find clients with higher last_transno values
301          * then those clients may need recovery done. */
302         for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
303                 __u64 last_transno;
304                 int mount_age;
305
306                 if (!mcd) {
307                         OBD_ALLOC(mcd, sizeof(*mcd));
308                         if (!mcd)
309                                 GOTO(err_msd, rc = -ENOMEM);
310                 }
311
312                 /* Don't assume off is incremented properly, in case
313                  * sizeof(fsd) isn't the same as fsd->fsd_client_size.
314                  */
315                 off = le32_to_cpu(msd->msd_client_start) +
316                         cl_idx * le16_to_cpu(msd->msd_client_size);
317                 rc = fsfilt_read_record(obd, file, mcd, sizeof(*mcd), &off);
318                 if (rc != sizeof(*mcd)) {
319                         CERROR("error reading MDS %s offset %d: rc = %d\n",
320                                LAST_RCVD, cl_idx, rc);
321                         if (rc > 0) /* XXX fatal error or just abort reading? */
322                                 rc = -EIO;
323                         break;
324                 }
325
326                 if (mcd->mcd_uuid[0] == '\0') {
327                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
328                                cl_idx);
329                         continue;
330                 }
331
332                 last_transno = le64_to_cpu(mcd->mcd_last_transno);
333
334                 /* These exports are cleaned up by mds_disconnect(), so they
335                  * need to be set up like real exports as mds_connect() does.
336                  */
337                 mount_age = mount_count - le64_to_cpu(mcd->mcd_mount_count);
338                 if (mount_age < MDS_MOUNT_RECOV) {
339                         struct obd_export *exp = class_new_export(obd);
340                         struct mds_export_data *med;
341                         CERROR("RCVRNG CLIENT uuid: %s off: %d lr: "LPU64
342                                "srv lr: "LPU64" mnt: "LPU64" last mount: "LPU64
343                                "\n", mcd->mcd_uuid, cl_idx,
344                                last_transno, le64_to_cpu(msd->msd_last_transno),
345                                le64_to_cpu(mcd->mcd_mount_count), mount_count);
346
347                         if (!exp) {
348                                 rc = -ENOMEM;
349                                 break;
350                         }
351
352                         memcpy(&exp->exp_client_uuid.uuid, mcd->mcd_uuid,
353                                sizeof exp->exp_client_uuid.uuid);
354                         med = &exp->exp_mds_data;
355                         med->med_mcd = mcd;
356                         mds_client_add(obd, mds, med, cl_idx);
357                         /* create helper if export init gets more complex */
358                         INIT_LIST_HEAD(&med->med_open_head);
359                         spin_lock_init(&med->med_open_lock);
360
361                         mcd = NULL;
362                         obd->obd_recoverable_clients++;
363                         class_export_put(exp);
364                 } else {
365                         CDEBUG(D_INFO, "discarded client %d, UUID '%s', count "
366                                LPU64"\n", cl_idx, mcd->mcd_uuid,
367                                le64_to_cpu(mcd->mcd_mount_count));
368                 }
369
370                 CDEBUG(D_OTHER, "client at offset %d has last_transno = "
371                        LPU64"\n", cl_idx, last_transno);
372
373                 if (last_transno > mds->mds_last_transno)
374                         mds->mds_last_transno = last_transno;
375         }
376
377         obd->obd_last_committed = mds->mds_last_transno;
378         if (obd->obd_recoverable_clients) {
379                 CERROR("RECOVERY: %d recoverable clients, last_transno "
380                        LPU64"\n",
381                        obd->obd_recoverable_clients, mds->mds_last_transno);
382                 obd->obd_next_recovery_transno = obd->obd_last_committed
383                         + 1;
384                 obd->obd_recovering = 1;
385         }
386
387         if (mcd)
388                 OBD_FREE(mcd, sizeof(*mcd));
389
390         return 0;
391
392 err_msd:
393         mds_server_free_data(mds);
394         return rc;
395 }
396
397 static int mds_fs_prep(struct obd_device *obd)
398 {
399         struct mds_obd *mds = &obd->u.mds;
400         struct obd_run_ctxt saved;
401         struct dentry *dentry;
402         struct file *file;
403         int rc;
404
405         push_ctxt(&saved, &mds->mds_ctxt, NULL);
406         dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755);
407         if (IS_ERR(dentry)) {
408                 rc = PTR_ERR(dentry);
409                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
410                 GOTO(err_pop, rc);
411         }
412
413         mds->mds_rootfid.id = dentry->d_inode->i_ino;
414         mds->mds_rootfid.generation = dentry->d_inode->i_generation;
415         mds->mds_rootfid.f_type = S_IFDIR;
416
417         dput(dentry);
418
419         dentry = lookup_one_len("__iopen__", current->fs->pwd,
420                                 strlen("__iopen__"));
421         if (IS_ERR(dentry) || !dentry->d_inode) {
422                 rc = (IS_ERR(dentry)) ? PTR_ERR(dentry): -ENOENT;
423                 CERROR("cannot open iopen FH directory: rc = %d\n", rc);
424                 GOTO(err_pop, rc);
425         }
426         mds->mds_fid_de = dentry;
427
428         dentry = simple_mkdir(current->fs->pwd, "PENDING", 0777);
429         if (IS_ERR(dentry)) {
430                 rc = PTR_ERR(dentry);
431                 CERROR("cannot create PENDING directory: rc = %d\n", rc);
432                 GOTO(err_fid, rc);
433         }
434         mds->mds_pending_dir = dentry;
435
436         dentry = simple_mkdir(current->fs->pwd, "LOGS", 0700);
437         if (IS_ERR(dentry)) {
438                 rc = PTR_ERR(dentry);
439                 CERROR("cannot create LOGS directory: rc = %d\n", rc);
440                 GOTO(err_pending, rc);
441         }
442         mds->mds_logs_dir = dentry;
443
444         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
445         if (IS_ERR(file)) {
446                 rc = PTR_ERR(file);
447                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
448
449                 GOTO(err_logs, rc = PTR_ERR(file));
450         }
451         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
452                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
453                        file->f_dentry->d_inode->i_mode);
454                 GOTO(err_filp, rc = -ENOENT);
455         }
456
457         rc = fsfilt_journal_data(obd, file);
458         if (rc) {
459                 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
460                 GOTO(err_filp, rc);
461         }
462
463         rc = mds_read_last_rcvd(obd, file);
464         if (rc) {
465                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
466                 GOTO(err_client, rc);
467         }
468         mds->mds_rcvd_filp = file;
469 #ifdef I_SKIP_PDFLUSH
470         /*
471          * we need this to protect from deadlock
472          * pdflush vs. lustre_fwrite()
473          */
474         file->f_dentry->d_inode->i_flags |= I_SKIP_PDFLUSH;
475 #endif
476 err_pop:
477         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
478
479         return rc;
480
481 err_client:
482         class_disconnect_exports(obd, 0);
483 err_filp:
484         if (filp_close(file, 0))
485                 CERROR("can't close %s after error\n", LAST_RCVD);
486 err_logs:
487         dput(mds->mds_logs_dir);
488 err_pending:
489         dput(mds->mds_pending_dir);
490 err_fid:
491         dput(mds->mds_fid_de);
492         goto err_pop;
493 }
494
495 int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
496 {
497         struct mds_obd *mds = &obd->u.mds;
498         ENTRY;
499
500         mds->mds_vfsmnt = mnt;
501
502         OBD_SET_CTXT_MAGIC(&mds->mds_ctxt);
503         mds->mds_ctxt.pwdmnt = mnt;
504         mds->mds_ctxt.pwd = mnt->mnt_root;
505         mds->mds_ctxt.fs = get_ds();
506         RETURN(mds_fs_prep(obd));
507 }
508
509 int mds_fs_cleanup(struct obd_device *obd, int flags)
510 {
511         struct mds_obd *mds = &obd->u.mds;
512         struct obd_run_ctxt saved;
513         int rc = 0;
514
515         if (flags & OBD_OPT_FAILOVER)
516                 CERROR("%s: shutting down for failover; client state will"
517                        " be preserved.\n", obd->obd_name);
518
519         class_disconnect_exports(obd, flags); /* cleans up client info too */
520         mds_server_free_data(mds);
521
522         push_ctxt(&saved, &mds->mds_ctxt, NULL);
523         if (mds->mds_rcvd_filp) {
524                 rc = filp_close(mds->mds_rcvd_filp, 0);
525                 mds->mds_rcvd_filp = NULL;
526                 if (rc)
527                         CERROR("%s file won't close, rc=%d\n", LAST_RCVD, rc);
528         }
529         if (mds->mds_logs_dir) {
530                 l_dput(mds->mds_logs_dir);
531                 mds->mds_logs_dir = NULL;
532         }
533         if (mds->mds_pending_dir) {
534                 l_dput(mds->mds_pending_dir);
535                 mds->mds_pending_dir = NULL;
536         }
537         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
538         shrink_dcache_parent(mds->mds_fid_de);
539         dput(mds->mds_fid_de);
540
541         return rc;
542 }
543
544 /* This is a callback from the llog_* functions.
545  * Assumes caller has already pushed us into the kernel context. */
546 int mds_log_close(struct llog_handle *cathandle, struct llog_handle *loghandle)
547 {
548         struct llog_object_hdr *llh = loghandle->lgh_hdr;
549         struct mds_obd *mds = &cathandle->lgh_obd->u.mds;
550         struct dentry *dchild = NULL;
551         int rc;
552         ENTRY;
553
554         /* If we are going to delete this log, grab a ref before we close
555          * it so we don't have to immediately do another lookup.
556          */
557         if (llh->llh_hdr.lth_type != LLOG_CATALOG_MAGIC && llh->llh_count == 0){
558                 CDEBUG(D_INODE, "deleting log file "LPX64":%x\n",
559                        loghandle->lgh_cookie.lgc_lgl.lgl_oid,
560                        loghandle->lgh_cookie.lgc_lgl.lgl_ogen);
561                 down(&mds->mds_logs_dir->d_inode->i_sem);
562                 dchild = dget(loghandle->lgh_file->f_dentry);
563                 llog_delete_log(cathandle, loghandle);
564         } else {
565                 CDEBUG(D_INODE, "closing log file "LPX64":%x\n",
566                        loghandle->lgh_cookie.lgc_lgl.lgl_oid,
567                        loghandle->lgh_cookie.lgc_lgl.lgl_ogen);
568         }
569
570         rc = filp_close(loghandle->lgh_file, 0);
571
572         llog_free_handle(loghandle); /* also removes loghandle from list */
573
574         if (dchild) {
575                 int err = vfs_unlink(mds->mds_logs_dir->d_inode, dchild);
576                 if (err) {
577                         CERROR("error unlinking empty log %*s: rc %d\n",
578                                dchild->d_name.len, dchild->d_name.name, err);
579                         if (!rc)
580                                 rc = err;
581                 }
582                 l_dput(dchild);
583                 up(&mds->mds_logs_dir->d_inode->i_sem);
584         }
585         RETURN(rc);
586 }
587
588 /* This is a callback from the llog_* functions.
589  * Assumes caller has already pushed us into the kernel context. */
590 struct llog_handle *mds_log_open(struct obd_device *obd,
591                                  struct llog_cookie *logcookie)
592 {
593         struct ll_fid fid = { .id = logcookie->lgc_lgl.lgl_oid,
594                               .generation = logcookie->lgc_lgl.lgl_ogen,
595                               .f_type = S_IFREG };
596         struct llog_handle *loghandle;
597         struct dentry *dchild;
598         int rc;
599         ENTRY;
600
601         loghandle = llog_alloc_handle();
602         if (loghandle == NULL)
603                 RETURN(ERR_PTR(-ENOMEM));
604
605         down(&obd->u.mds.mds_logs_dir->d_inode->i_sem);
606         dchild = mds_fid2dentry(&obd->u.mds, &fid, NULL);
607         up(&obd->u.mds.mds_logs_dir->d_inode->i_sem);
608         if (IS_ERR(dchild)) {
609                 rc = PTR_ERR(dchild);
610                 CERROR("error looking up log file "LPX64":%x: rc %d\n",
611                        fid.id, fid.generation, rc);
612                 GOTO(out, rc);
613         }
614
615         if (dchild->d_inode == NULL) {
616                 rc = -ENOENT;
617                 CERROR("nonexistent log file "LPX64":%x: rc %d\n",
618                        fid.id, fid.generation, rc);
619                 GOTO(out_put, rc);
620         }
621
622         /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
623         mntget(obd->u.mds.mds_vfsmnt);
624         loghandle->lgh_file = dentry_open(dchild, obd->u.mds.mds_vfsmnt,
625                                           O_RDWR | O_LARGEFILE);
626         if (IS_ERR(loghandle->lgh_file)) {
627                 rc = PTR_ERR(loghandle->lgh_file);
628                 CERROR("error opening logfile "LPX64":%x: rc %d\n",
629                        fid.id, fid.generation, rc);
630                 GOTO(out, rc);
631         }
632         memcpy(&loghandle->lgh_cookie, logcookie, sizeof(*logcookie));
633         loghandle->lgh_log_create = mds_log_create;
634         loghandle->lgh_log_open = mds_log_open;
635         loghandle->lgh_log_close = mds_log_close;
636         loghandle->lgh_obd = obd;
637
638         RETURN(loghandle);
639
640 out_put:
641         l_dput(dchild);
642 out:
643         llog_free_handle(loghandle);
644         return ERR_PTR(rc);
645 }
646
647 /* This is a callback from the llog_* functions.
648  * Assumes caller has already pushed us into the kernel context. */
649 struct llog_handle *mds_log_create(struct obd_device *obd)
650 {
651         char logbuf[24], *logname; /* logSSSSSSSSSS.count */
652         struct llog_handle *loghandle;
653         int rc, open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
654         ENTRY;
655
656         loghandle = llog_alloc_handle();
657         if (!loghandle)
658                 RETURN(ERR_PTR(-ENOMEM));
659
660 retry:
661         if (!obd->u.mds.mds_catalog) {
662                 logname = "LOGS/catalog";
663         } else {
664                 sprintf(logbuf, "LOGS/log%lu.%u\n",
665                         CURRENT_SECONDS, obd->u.mds.mds_catalog->lgh_index++);
666                 open_flags |= O_EXCL;
667                 logname = logbuf;
668         }
669         loghandle->lgh_file = filp_open(logname, open_flags, 0644);
670         if (IS_ERR(loghandle->lgh_file)) {
671                 rc = PTR_ERR(loghandle->lgh_file);
672                 if (rc == -EEXIST) {
673                         CDEBUG(D_HA, "collision in logfile %s creation\n",
674                                logname);
675                         obd->u.mds.mds_catalog->lgh_index++;
676                         goto retry;
677                 }
678                 CERROR("error opening/creating %s: rc %d\n", logname, rc);
679                 GOTO(out_handle, rc);
680         }
681
682         loghandle->lgh_cookie.lgc_lgl.lgl_oid =
683                 loghandle->lgh_file->f_dentry->d_inode->i_ino;
684         loghandle->lgh_cookie.lgc_lgl.lgl_ogen =
685                 loghandle->lgh_file->f_dentry->d_inode->i_generation;
686         loghandle->lgh_log_create = mds_log_create;
687         loghandle->lgh_log_open = mds_log_open;
688         loghandle->lgh_log_close = mds_log_close;
689         loghandle->lgh_obd = obd;
690
691         RETURN(loghandle);
692
693 out_handle:
694         llog_free_handle(loghandle);
695         return ERR_PTR(rc);
696 }
697
698 struct llog_handle *mds_get_catalog(struct obd_device *obd)
699 {
700         struct mds_server_data *msd = obd->u.mds.mds_server_data;
701         struct obd_run_ctxt saved;
702         struct llog_handle *cathandle = NULL;
703         int rc = 0;
704         ENTRY;
705
706         push_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
707
708         if (msd->msd_catalog_oid) {
709                 struct llog_cookie catcookie;
710
711                 catcookie.lgc_lgl.lgl_oid = le64_to_cpu(msd->msd_catalog_oid);
712                 catcookie.lgc_lgl.lgl_ogen = le32_to_cpu(msd->msd_catalog_ogen);
713                 cathandle = mds_log_open(obd, &catcookie);
714                 if (IS_ERR(cathandle)) {
715                         CERROR("error opening catalog "LPX64":%x: rc %d\n",
716                                catcookie.lgc_lgl.lgl_oid,
717                                catcookie.lgc_lgl.lgl_ogen,
718                                (int)PTR_ERR(cathandle));
719                         msd->msd_catalog_oid = 0;
720                         msd->msd_catalog_ogen = 0;
721                 }
722                 /* ORPHANS FIXME: compare catalog UUID to msd_peeruuid */
723         }
724
725         if (!msd->msd_catalog_oid) {
726                 struct llog_logid *lgl;
727
728                 cathandle = mds_log_create(obd);
729                 if (IS_ERR(cathandle)) {
730                         CERROR("error creating new catalog: rc %d\n",
731                                (int)PTR_ERR(cathandle));
732                         GOTO(out, cathandle);
733                 }
734                 lgl = &cathandle->lgh_cookie.lgc_lgl;
735                 msd->msd_catalog_oid = cpu_to_le64(lgl->lgl_oid);
736                 msd->msd_catalog_ogen = cpu_to_le32(lgl->lgl_ogen);
737                 rc = mds_update_server_data(obd);
738                 if (rc) {
739                         CERROR("error writing new catalog to disk: rc %d\n",rc);
740                         GOTO(out_handle, rc);
741                 }
742         }
743
744         rc = llog_init_catalog(cathandle, &obd->u.mds.mds_osc_uuid);
745
746 out:
747         pop_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
748         RETURN(cathandle);
749
750 out_handle:
751         mds_log_close(cathandle, cathandle);
752         cathandle = ERR_PTR(rc);
753         goto out;
754
755 }
756
757 void mds_put_catalog(struct llog_handle *cathandle)
758 {
759         struct llog_handle *loghandle, *n;
760         int rc;
761         ENTRY;
762
763         list_for_each_entry_safe(loghandle, n, &cathandle->lgh_list, lgh_list)
764                 mds_log_close(cathandle, loghandle);
765
766         rc = filp_close(cathandle->lgh_file, 0);
767         if (rc)
768                 CERROR("error closing catalog: rc %d\n", rc);
769
770         llog_free_handle(cathandle);
771         EXIT;
772 }