1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (MDS) filesystem interface code
7 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * This file is part of Lustre, http://www.lustre.org.
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 #define DEBUG_SUBSYSTEM S_MDS
29 #include <linux/module.h>
30 #include <linux/kmod.h>
31 #include <linux/version.h>
32 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
33 #include <linux/mount.h>
35 #include <linux/lustre_mds.h>
36 #include <linux/obd_class.h>
37 #include <linux/obd_support.h>
38 #include <linux/lustre_lib.h>
39 #include <linux/lustre_fsfilt.h>
40 #include <portals/list.h>
42 #include "mds_internal.h"
44 /* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
45 #define MDS_MAX_CLIENTS (PAGE_SIZE * 8)
46 #define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / sizeof(unsigned long))
48 #define LAST_RCVD "last_rcvd"
50 /* Add client data to the MDS. We use a bitmap to locate a free space
51 * in the last_rcvd file if cl_off is -1 (i.e. a new client).
52 * Otherwise, we have just read the data from the last_rcvd file and
55 int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
56 struct mds_export_data *med, int cl_idx)
58 unsigned long *bitmap = mds->mds_client_bitmap;
59 int new_client = (cl_idx == -1);
61 LASSERT(bitmap != NULL);
63 /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
64 if (!strcmp(med->med_mcd->mcd_uuid, "OBD_CLASS_UUID"))
67 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
68 * there's no need for extra complication here
71 cl_idx = find_first_zero_bit(bitmap, MDS_MAX_CLIENTS);
73 if (cl_idx >= MDS_MAX_CLIENTS) {
74 CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
77 if (test_and_set_bit(cl_idx, bitmap)) {
78 CERROR("MDS client %d: found bit is set in bitmap\n",
80 cl_idx = find_next_zero_bit(bitmap, MDS_MAX_CLIENTS,
85 if (test_and_set_bit(cl_idx, bitmap)) {
86 CERROR("MDS client %d: bit already set in bitmap!!\n",
92 CDEBUG(D_INFO, "client at index %d with UUID '%s' added\n",
93 cl_idx, med->med_mcd->mcd_uuid);
95 med->med_idx = cl_idx;
96 med->med_off = MDS_LR_CLIENT_START + (cl_idx * MDS_LR_CLIENT_SIZE);
99 struct obd_run_ctxt saved;
100 loff_t off = med->med_off;
104 push_ctxt(&saved, &mds->mds_ctxt, NULL);
105 /* We need to start a transaction here first, to avoid a
106 * possible ordering deadlock on last_rcvd->i_sem and the
107 * journal lock. In most places we start the journal handle
108 * first (because we do compound transactions), and then
109 * later do the write into last_rcvd, which gets i_sem.
111 * Without this transaction, clients connecting at the same
112 * time other MDS operations are ongoing get last_rcvd->i_sem
113 * first (in generic_file_write()) and start the journal
114 * transaction afterwards, and can deadlock with other ops.
116 * We use FSFILT_OP_SETATTR because it is smallest, but all
117 * ops include enough space for the last_rcvd update so we
118 * could use any of them, or maybe an FSFILT_OP_NONE is best?
120 handle = fsfilt_start(obd,mds->mds_rcvd_filp->f_dentry->d_inode,
121 FSFILT_OP_SETATTR, NULL);
122 if (IS_ERR(handle)) {
123 written = PTR_ERR(handle);
124 CERROR("unable to start transaction: rc %d\n",
127 written = fsfilt_write_record(obd, mds->mds_rcvd_filp,
129 sizeof(*med->med_mcd),
131 fsfilt_commit(obd,mds->mds_rcvd_filp->f_dentry->d_inode,
134 pop_ctxt(&saved, &mds->mds_ctxt, NULL);
136 if (written != sizeof(*med->med_mcd)) {
141 CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
142 med->med_idx, med->med_off,
143 (unsigned int)sizeof(*med->med_mcd));
148 int mds_client_free(struct obd_export *exp)
150 struct mds_export_data *med = &exp->exp_mds_data;
151 struct mds_obd *mds = &exp->exp_obd->u.mds;
152 struct obd_device *obd = exp->exp_obd;
153 struct mds_client_data zero_mcd;
154 struct obd_run_ctxt saved;
156 unsigned long *bitmap = mds->mds_client_bitmap;
162 /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
163 if (!strcmp(med->med_mcd->mcd_uuid, "OBD_CLASS_UUID"))
164 GOTO(free_and_out, 0);
166 CDEBUG(D_INFO, "freeing client at index %u (%lld)with UUID '%s'\n",
167 med->med_idx, med->med_off, med->med_mcd->mcd_uuid);
169 if (!test_and_clear_bit(med->med_idx, bitmap)) {
170 CERROR("MDS client %u: bit already clear in bitmap!!\n",
175 memset(&zero_mcd, 0, sizeof zero_mcd);
176 push_ctxt(&saved, &mds->mds_ctxt, NULL);
177 written = fsfilt_write_record(obd, mds->mds_rcvd_filp, &zero_mcd,
178 sizeof(zero_mcd), &med->med_off);
179 pop_ctxt(&saved, &mds->mds_ctxt, NULL);
181 if (written != sizeof(zero_mcd)) {
182 CERROR("error zeroing out client %s index %d in %s: %d\n",
183 med->med_mcd->mcd_uuid, med->med_idx, LAST_RCVD,
186 CDEBUG(D_INFO, "zeroed out disconnecting client %s at off %d\n",
187 med->med_mcd->mcd_uuid, med->med_idx);
191 OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
196 static int mds_server_free_data(struct mds_obd *mds)
198 OBD_FREE(mds->mds_client_bitmap,
199 MDS_MAX_CLIENT_WORDS * sizeof(unsigned long));
200 OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
201 mds->mds_server_data = NULL;
206 static int mds_read_last_rcvd(struct obd_device *obd, struct file *file)
208 struct mds_obd *mds = &obd->u.mds;
209 struct mds_server_data *msd;
210 struct mds_client_data *mcd = NULL;
213 unsigned long last_rcvd_size = file->f_dentry->d_inode->i_size;
214 __u64 last_transno = 0;
218 LASSERT(sizeof(struct mds_client_data) == MDS_LR_CLIENT_SIZE);
219 LASSERT(sizeof(struct mds_server_data) <= MDS_LR_SERVER_SIZE);
221 OBD_ALLOC(msd, sizeof(*msd));
225 OBD_ALLOC(mds->mds_client_bitmap,
226 MDS_MAX_CLIENT_WORDS * sizeof(unsigned long));
227 if (!mds->mds_client_bitmap) {
228 OBD_FREE(msd, sizeof(*msd));
232 mds->mds_server_data = msd;
234 if (last_rcvd_size == 0) {
236 CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD);
237 memcpy(msd->msd_uuid, obd->obd_uuid.uuid,sizeof(msd->msd_uuid));
238 msd->msd_server_size = cpu_to_le32(MDS_LR_SERVER_SIZE);
239 msd->msd_client_start = cpu_to_le32(MDS_LR_CLIENT_START);
240 msd->msd_client_size = cpu_to_le16(MDS_LR_CLIENT_SIZE);
241 written = fsfilt_write_record(obd, file, msd, sizeof(*msd),
244 if (written == sizeof(*msd))
246 CERROR("%s: error writing new MSD: %d\n", obd->obd_name,
248 GOTO(err_msd, rc = (written < 0 ? written : -EIO));
251 rc = fsfilt_read_record(obd, file, msd, sizeof(*msd), &off);
253 if (rc != sizeof(*msd)) {
254 CERROR("error reading MDS %s: rc = %d\n", LAST_RCVD,rc);
259 if (!msd->msd_server_size)
260 msd->msd_server_size = cpu_to_le32(MDS_LR_SERVER_SIZE);
261 if (!msd->msd_client_start)
262 msd->msd_client_start = cpu_to_le32(MDS_LR_CLIENT_START);
263 if (!msd->msd_client_size)
264 msd->msd_client_size = cpu_to_le16(MDS_LR_CLIENT_SIZE);
266 if (msd->msd_feature_incompat) {
267 CERROR("unsupported incompat feature %x\n",
268 le32_to_cpu(msd->msd_feature_incompat));
269 GOTO(err_msd, rc = -EINVAL);
271 if (msd->msd_feature_rocompat) {
272 CERROR("unsupported read-only feature %x\n",
273 le32_to_cpu(msd->msd_feature_rocompat));
274 /* Do something like remount filesystem read-only */
275 GOTO(err_msd, rc = -EINVAL);
278 last_transno = le64_to_cpu(msd->msd_last_transno);
279 mds->mds_last_transno = last_transno;
281 mount_count = le64_to_cpu(msd->msd_mount_count);
282 mds->mds_mount_count = mount_count;
284 CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
285 obd->obd_name, last_transno);
286 CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
287 obd->obd_name, mount_count);
288 CDEBUG(D_INODE, "%s: server data size: %u\n",
289 obd->obd_name, le32_to_cpu(msd->msd_server_size));
290 CDEBUG(D_INODE, "%s: per-client data start: %u\n",
291 obd->obd_name, le32_to_cpu(msd->msd_client_start));
292 CDEBUG(D_INODE, "%s: per-client data size: %u\n",
293 obd->obd_name, le32_to_cpu(msd->msd_client_size));
294 CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
295 obd->obd_name, last_rcvd_size);
296 CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
297 (last_rcvd_size - MDS_LR_CLIENT_START) / MDS_LR_CLIENT_SIZE);
299 /* When we do a clean FILTER shutdown, we save the last_transno into
300 * the header. If we find clients with higher last_transno values
301 * then those clients may need recovery done. */
302 for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
307 OBD_ALLOC(mcd, sizeof(*mcd));
309 GOTO(err_msd, rc = -ENOMEM);
312 /* Don't assume off is incremented properly, in case
313 * sizeof(fsd) isn't the same as fsd->fsd_client_size.
315 off = le32_to_cpu(msd->msd_client_start) +
316 cl_idx * le16_to_cpu(msd->msd_client_size);
317 rc = fsfilt_read_record(obd, file, mcd, sizeof(*mcd), &off);
318 if (rc != sizeof(*mcd)) {
319 CERROR("error reading MDS %s offset %d: rc = %d\n",
320 LAST_RCVD, cl_idx, rc);
321 if (rc > 0) /* XXX fatal error or just abort reading? */
326 if (mcd->mcd_uuid[0] == '\0') {
327 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
332 last_transno = le64_to_cpu(mcd->mcd_last_transno);
334 /* These exports are cleaned up by mds_disconnect(), so they
335 * need to be set up like real exports as mds_connect() does.
337 mount_age = mount_count - le64_to_cpu(mcd->mcd_mount_count);
338 if (mount_age < MDS_MOUNT_RECOV) {
339 struct obd_export *exp = class_new_export(obd);
340 struct mds_export_data *med;
341 CERROR("RCVRNG CLIENT uuid: %s off: %d lr: "LPU64
342 "srv lr: "LPU64" mnt: "LPU64" last mount: "LPU64
343 "\n", mcd->mcd_uuid, cl_idx,
344 last_transno, le64_to_cpu(msd->msd_last_transno),
345 le64_to_cpu(mcd->mcd_mount_count), mount_count);
352 memcpy(&exp->exp_client_uuid.uuid, mcd->mcd_uuid,
353 sizeof exp->exp_client_uuid.uuid);
354 med = &exp->exp_mds_data;
356 mds_client_add(obd, mds, med, cl_idx);
357 /* create helper if export init gets more complex */
358 INIT_LIST_HEAD(&med->med_open_head);
359 spin_lock_init(&med->med_open_lock);
362 obd->obd_recoverable_clients++;
363 class_export_put(exp);
365 CDEBUG(D_INFO, "discarded client %d, UUID '%s', count "
366 LPU64"\n", cl_idx, mcd->mcd_uuid,
367 le64_to_cpu(mcd->mcd_mount_count));
370 CDEBUG(D_OTHER, "client at offset %d has last_transno = "
371 LPU64"\n", cl_idx, last_transno);
373 if (last_transno > mds->mds_last_transno)
374 mds->mds_last_transno = last_transno;
377 obd->obd_last_committed = mds->mds_last_transno;
378 if (obd->obd_recoverable_clients) {
379 CERROR("RECOVERY: %d recoverable clients, last_transno "
381 obd->obd_recoverable_clients, mds->mds_last_transno);
382 obd->obd_next_recovery_transno = obd->obd_last_committed
384 obd->obd_recovering = 1;
388 OBD_FREE(mcd, sizeof(*mcd));
393 mds_server_free_data(mds);
397 static int mds_fs_prep(struct obd_device *obd)
399 struct mds_obd *mds = &obd->u.mds;
400 struct obd_run_ctxt saved;
401 struct dentry *dentry;
405 push_ctxt(&saved, &mds->mds_ctxt, NULL);
406 dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755);
407 if (IS_ERR(dentry)) {
408 rc = PTR_ERR(dentry);
409 CERROR("cannot create ROOT directory: rc = %d\n", rc);
413 mds->mds_rootfid.id = dentry->d_inode->i_ino;
414 mds->mds_rootfid.generation = dentry->d_inode->i_generation;
415 mds->mds_rootfid.f_type = S_IFDIR;
419 dentry = lookup_one_len("__iopen__", current->fs->pwd,
420 strlen("__iopen__"));
421 if (IS_ERR(dentry) || !dentry->d_inode) {
422 rc = (IS_ERR(dentry)) ? PTR_ERR(dentry): -ENOENT;
423 CERROR("cannot open iopen FH directory: rc = %d\n", rc);
426 mds->mds_fid_de = dentry;
428 dentry = simple_mkdir(current->fs->pwd, "PENDING", 0777);
429 if (IS_ERR(dentry)) {
430 rc = PTR_ERR(dentry);
431 CERROR("cannot create PENDING directory: rc = %d\n", rc);
434 mds->mds_pending_dir = dentry;
436 dentry = simple_mkdir(current->fs->pwd, "LOGS", 0700);
437 if (IS_ERR(dentry)) {
438 rc = PTR_ERR(dentry);
439 CERROR("cannot create LOGS directory: rc = %d\n", rc);
440 GOTO(err_pending, rc);
442 mds->mds_logs_dir = dentry;
444 file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
447 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
449 GOTO(err_logs, rc = PTR_ERR(file));
451 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
452 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
453 file->f_dentry->d_inode->i_mode);
454 GOTO(err_filp, rc = -ENOENT);
457 rc = fsfilt_journal_data(obd, file);
459 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
463 rc = mds_read_last_rcvd(obd, file);
465 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
466 GOTO(err_client, rc);
468 mds->mds_rcvd_filp = file;
469 #ifdef I_SKIP_PDFLUSH
471 * we need this to protect from deadlock
472 * pdflush vs. lustre_fwrite()
474 file->f_dentry->d_inode->i_flags |= I_SKIP_PDFLUSH;
477 pop_ctxt(&saved, &mds->mds_ctxt, NULL);
482 class_disconnect_exports(obd, 0);
484 if (filp_close(file, 0))
485 CERROR("can't close %s after error\n", LAST_RCVD);
487 dput(mds->mds_logs_dir);
489 dput(mds->mds_pending_dir);
491 dput(mds->mds_fid_de);
495 int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
497 struct mds_obd *mds = &obd->u.mds;
500 mds->mds_vfsmnt = mnt;
502 OBD_SET_CTXT_MAGIC(&mds->mds_ctxt);
503 mds->mds_ctxt.pwdmnt = mnt;
504 mds->mds_ctxt.pwd = mnt->mnt_root;
505 mds->mds_ctxt.fs = get_ds();
506 RETURN(mds_fs_prep(obd));
509 int mds_fs_cleanup(struct obd_device *obd, int flags)
511 struct mds_obd *mds = &obd->u.mds;
512 struct obd_run_ctxt saved;
515 if (flags & OBD_OPT_FAILOVER)
516 CERROR("%s: shutting down for failover; client state will"
517 " be preserved.\n", obd->obd_name);
519 class_disconnect_exports(obd, flags); /* cleans up client info too */
520 mds_server_free_data(mds);
522 push_ctxt(&saved, &mds->mds_ctxt, NULL);
523 if (mds->mds_rcvd_filp) {
524 rc = filp_close(mds->mds_rcvd_filp, 0);
525 mds->mds_rcvd_filp = NULL;
527 CERROR("%s file won't close, rc=%d\n", LAST_RCVD, rc);
529 if (mds->mds_logs_dir) {
530 l_dput(mds->mds_logs_dir);
531 mds->mds_logs_dir = NULL;
533 if (mds->mds_pending_dir) {
534 l_dput(mds->mds_pending_dir);
535 mds->mds_pending_dir = NULL;
537 pop_ctxt(&saved, &mds->mds_ctxt, NULL);
538 shrink_dcache_parent(mds->mds_fid_de);
539 dput(mds->mds_fid_de);
544 /* This is a callback from the llog_* functions.
545 * Assumes caller has already pushed us into the kernel context. */
546 int mds_log_close(struct llog_handle *cathandle, struct llog_handle *loghandle)
548 struct llog_object_hdr *llh = loghandle->lgh_hdr;
549 struct mds_obd *mds = &cathandle->lgh_obd->u.mds;
550 struct dentry *dchild = NULL;
554 /* If we are going to delete this log, grab a ref before we close
555 * it so we don't have to immediately do another lookup.
557 if (llh->llh_hdr.lth_type != LLOG_CATALOG_MAGIC && llh->llh_count == 0){
558 CDEBUG(D_INODE, "deleting log file "LPX64":%x\n",
559 loghandle->lgh_cookie.lgc_lgl.lgl_oid,
560 loghandle->lgh_cookie.lgc_lgl.lgl_ogen);
561 down(&mds->mds_logs_dir->d_inode->i_sem);
562 dchild = dget(loghandle->lgh_file->f_dentry);
563 llog_delete_log(cathandle, loghandle);
565 CDEBUG(D_INODE, "closing log file "LPX64":%x\n",
566 loghandle->lgh_cookie.lgc_lgl.lgl_oid,
567 loghandle->lgh_cookie.lgc_lgl.lgl_ogen);
570 rc = filp_close(loghandle->lgh_file, 0);
572 llog_free_handle(loghandle); /* also removes loghandle from list */
575 int err = vfs_unlink(mds->mds_logs_dir->d_inode, dchild);
577 CERROR("error unlinking empty log %*s: rc %d\n",
578 dchild->d_name.len, dchild->d_name.name, err);
583 up(&mds->mds_logs_dir->d_inode->i_sem);
588 /* This is a callback from the llog_* functions.
589 * Assumes caller has already pushed us into the kernel context. */
590 struct llog_handle *mds_log_open(struct obd_device *obd,
591 struct llog_cookie *logcookie)
593 struct ll_fid fid = { .id = logcookie->lgc_lgl.lgl_oid,
594 .generation = logcookie->lgc_lgl.lgl_ogen,
596 struct llog_handle *loghandle;
597 struct dentry *dchild;
601 loghandle = llog_alloc_handle();
602 if (loghandle == NULL)
603 RETURN(ERR_PTR(-ENOMEM));
605 down(&obd->u.mds.mds_logs_dir->d_inode->i_sem);
606 dchild = mds_fid2dentry(&obd->u.mds, &fid, NULL);
607 up(&obd->u.mds.mds_logs_dir->d_inode->i_sem);
608 if (IS_ERR(dchild)) {
609 rc = PTR_ERR(dchild);
610 CERROR("error looking up log file "LPX64":%x: rc %d\n",
611 fid.id, fid.generation, rc);
615 if (dchild->d_inode == NULL) {
617 CERROR("nonexistent log file "LPX64":%x: rc %d\n",
618 fid.id, fid.generation, rc);
622 /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
623 mntget(obd->u.mds.mds_vfsmnt);
624 loghandle->lgh_file = dentry_open(dchild, obd->u.mds.mds_vfsmnt,
625 O_RDWR | O_LARGEFILE);
626 if (IS_ERR(loghandle->lgh_file)) {
627 rc = PTR_ERR(loghandle->lgh_file);
628 CERROR("error opening logfile "LPX64":%x: rc %d\n",
629 fid.id, fid.generation, rc);
632 memcpy(&loghandle->lgh_cookie, logcookie, sizeof(*logcookie));
633 loghandle->lgh_log_create = mds_log_create;
634 loghandle->lgh_log_open = mds_log_open;
635 loghandle->lgh_log_close = mds_log_close;
636 loghandle->lgh_obd = obd;
643 llog_free_handle(loghandle);
647 /* This is a callback from the llog_* functions.
648 * Assumes caller has already pushed us into the kernel context. */
649 struct llog_handle *mds_log_create(struct obd_device *obd)
651 char logbuf[24], *logname; /* logSSSSSSSSSS.count */
652 struct llog_handle *loghandle;
653 int rc, open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
656 loghandle = llog_alloc_handle();
658 RETURN(ERR_PTR(-ENOMEM));
661 if (!obd->u.mds.mds_catalog) {
662 logname = "LOGS/catalog";
664 sprintf(logbuf, "LOGS/log%lu.%u\n",
665 CURRENT_SECONDS, obd->u.mds.mds_catalog->lgh_index++);
666 open_flags |= O_EXCL;
669 loghandle->lgh_file = filp_open(logname, open_flags, 0644);
670 if (IS_ERR(loghandle->lgh_file)) {
671 rc = PTR_ERR(loghandle->lgh_file);
673 CDEBUG(D_HA, "collision in logfile %s creation\n",
675 obd->u.mds.mds_catalog->lgh_index++;
678 CERROR("error opening/creating %s: rc %d\n", logname, rc);
679 GOTO(out_handle, rc);
682 loghandle->lgh_cookie.lgc_lgl.lgl_oid =
683 loghandle->lgh_file->f_dentry->d_inode->i_ino;
684 loghandle->lgh_cookie.lgc_lgl.lgl_ogen =
685 loghandle->lgh_file->f_dentry->d_inode->i_generation;
686 loghandle->lgh_log_create = mds_log_create;
687 loghandle->lgh_log_open = mds_log_open;
688 loghandle->lgh_log_close = mds_log_close;
689 loghandle->lgh_obd = obd;
694 llog_free_handle(loghandle);
698 struct llog_handle *mds_get_catalog(struct obd_device *obd)
700 struct mds_server_data *msd = obd->u.mds.mds_server_data;
701 struct obd_run_ctxt saved;
702 struct llog_handle *cathandle = NULL;
706 push_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
708 if (msd->msd_catalog_oid) {
709 struct llog_cookie catcookie;
711 catcookie.lgc_lgl.lgl_oid = le64_to_cpu(msd->msd_catalog_oid);
712 catcookie.lgc_lgl.lgl_ogen = le32_to_cpu(msd->msd_catalog_ogen);
713 cathandle = mds_log_open(obd, &catcookie);
714 if (IS_ERR(cathandle)) {
715 CERROR("error opening catalog "LPX64":%x: rc %d\n",
716 catcookie.lgc_lgl.lgl_oid,
717 catcookie.lgc_lgl.lgl_ogen,
718 (int)PTR_ERR(cathandle));
719 msd->msd_catalog_oid = 0;
720 msd->msd_catalog_ogen = 0;
722 /* ORPHANS FIXME: compare catalog UUID to msd_peeruuid */
725 if (!msd->msd_catalog_oid) {
726 struct llog_logid *lgl;
728 cathandle = mds_log_create(obd);
729 if (IS_ERR(cathandle)) {
730 CERROR("error creating new catalog: rc %d\n",
731 (int)PTR_ERR(cathandle));
732 GOTO(out, cathandle);
734 lgl = &cathandle->lgh_cookie.lgc_lgl;
735 msd->msd_catalog_oid = cpu_to_le64(lgl->lgl_oid);
736 msd->msd_catalog_ogen = cpu_to_le32(lgl->lgl_ogen);
737 rc = mds_update_server_data(obd);
739 CERROR("error writing new catalog to disk: rc %d\n",rc);
740 GOTO(out_handle, rc);
744 rc = llog_init_catalog(cathandle, &obd->u.mds.mds_osc_uuid);
747 pop_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
751 mds_log_close(cathandle, cathandle);
752 cathandle = ERR_PTR(rc);
757 void mds_put_catalog(struct llog_handle *cathandle)
759 struct llog_handle *loghandle, *n;
763 list_for_each_entry_safe(loghandle, n, &cathandle->lgh_list, lgh_list)
764 mds_log_close(cathandle, loghandle);
766 rc = filp_close(cathandle->lgh_file, 0);
768 CERROR("error closing catalog: rc %d\n", rc);
770 llog_free_handle(cathandle);