1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (MDS) filesystem interface code
7 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * This file is part of Lustre, http://www.lustre.org.
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 #define DEBUG_SUBSYSTEM S_MDS
29 #include <linux/module.h>
30 #include <linux/kmod.h>
31 #include <linux/version.h>
32 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
33 #include <linux/mount.h>
35 #include <linux/lustre_mds.h>
36 #include <linux/obd_class.h>
37 #include <linux/obd_support.h>
38 #include <linux/lustre_lib.h>
39 #include <linux/lustre_fsfilt.h>
40 #include <portals/list.h>
42 #include "mds_internal.h"
44 /* This limit is arbitrary, but for now we fit it in 1 page (32k clients) */
// MDS_MAX_CLIENTS is a count of bits, one per client slot; it is the bound
// handed to find_first_zero_bit() and find_next_zero_bit() in mds_client_add().
// MDS_MAX_CLIENT_WORDS is used only to size the bitmap: the allocation and the
// matching free both pass MDS_MAX_CLIENT_WORDS multiplied by
// sizeof(unsigned long).
// NOTE(review): the word count divides the bit count by sizeof(unsigned long),
// which is a byte count, not the number of bits in a long. The bitmap buffer is
// therefore MDS_MAX_CLIENTS bytes, eight times the MDS_MAX_CLIENTS bits that
// are searched, which does not match the "1 page" remark above. Confirm
// whether the slack is relied upon before shrinking it.
// LAST_RCVD is the file name opened by mds_fs_prep() and quoted in log messages.
45 #define MDS_MAX_CLIENTS (PAGE_SIZE * 8)
46 #define MDS_MAX_CLIENT_WORDS (MDS_MAX_CLIENTS / sizeof(unsigned long))
48 #define LAST_RCVD "last_rcvd"
50 /* Add client data to the MDS. We use a bitmap to locate a free space
51 * in the last_rcvd file if cl_off is -1 (i.e. a new client).
52 * Otherwise, we have just read the data from the last_rcvd file and
// Attach per-export client data to the MDS and give it a slot in last_rcvd.
//
//   obd, mds  the MDS device and its mds_obd state (client bitmap, run
//             context, last_rcvd file pointer).
//   med       export data for the client; med_idx and med_off are filled in.
//   cl_idx    slot index to use, or -1 to ask for a new slot (new_client).
//
// What the visible statements do:
//   - look for a free slot with find_first_zero_bit() over MDS_MAX_CLIENTS
//     bits and log "no room for clients" when the result is out of range;
//   - claim the slot with test_and_set_bit(); when the bit turns out to be
//     set already, log it and continue with find_next_zero_bit();
//   - a second test_and_set_bit() site logs "bit already set in bitmap!!";
//   - derive med_off from MDS_LR_CLIENT_START, cl_idx and MDS_LR_CLIENT_SIZE;
//   - write the client record med->med_mcd at that offset with
//     fsfilt_write_record(), bracketed by fsfilt_start() and fsfilt_commit()
//     and by push_ctxt() and pop_ctxt(); a result different from
//     sizeof(*med->med_mcd) is treated as a failed write.
//
// NOTE(review): this listing has lost its short lines (braces, returns, labels,
// trailing call arguments), so which statements are guarded by new_client and
// what the function returns on each path cannot be read from here. Confirm
// against the complete source.
// NOTE(review): the UUID is compared with the quoted text "OBD_CLASS_UUID". If
// a macro of that name was meant, the quotes prevent its expansion. Confirm.
55 int mds_client_add(struct obd_device *obd, struct mds_obd *mds,
56 struct mds_export_data *med, int cl_idx)
58 unsigned long *bitmap = mds->mds_client_bitmap;
59 int new_client = (cl_idx == -1);
61 LASSERT(bitmap != NULL);
63 /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
64 if (!strcmp(med->med_mcd->mcd_uuid, "OBD_CLASS_UUID"))
67 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
68 * there's no need for extra complication here
// First unused slot in the client bitmap.
71 cl_idx = find_first_zero_bit(bitmap, MDS_MAX_CLIENTS);
73 if (cl_idx >= MDS_MAX_CLIENTS) {
74 CERROR("no room for clients - fix MDS_MAX_CLIENTS\n");
77 if (test_and_set_bit(cl_idx, bitmap)) {
78 CERROR("MDS client %d: found bit is set in bitmap\n",
80 cl_idx = find_next_zero_bit(bitmap, MDS_MAX_CLIENTS,
85 if (test_and_set_bit(cl_idx, bitmap)) {
86 CERROR("MDS client %d: bit already set in bitmap!!\n",
92 CDEBUG(D_INFO, "client at index %d with UUID '%s' added\n",
93 cl_idx, med->med_mcd->mcd_uuid);
// Remember the slot and its byte offset inside last_rcvd.
95 med->med_idx = cl_idx;
96 med->med_off = MDS_LR_CLIENT_START + (cl_idx * MDS_LR_CLIENT_SIZE);
99 struct obd_run_ctxt saved;
100 loff_t off = med->med_off;
104 push_ctxt(&saved, &mds->mds_ctxt, NULL);
105 /* We need to start a transaction here first, to avoid a
106 * possible ordering deadlock on last_rcvd->i_sem and the
107 * journal lock. In most places we start the journal handle
108 * first (because we do compound transactions), and then
109 * later do the write into last_rcvd, which gets i_sem.
111 * Without this transaction, clients connecting at the same
112 * time other MDS operations are ongoing get last_rcvd->i_sem
113 * first (in generic_file_write()) and start the journal
114 * transaction afterwards, and can deadlock with other ops.
116 * We use FSFILT_OP_SETATTR because it is smallest, but all
117 * ops include enough space for the last_rcvd update so we
118 * could use any of them, or maybe an FSFILT_OP_NONE is best?
120 handle = fsfilt_start(obd,mds->mds_rcvd_filp->f_dentry->d_inode,
121 FSFILT_OP_SETATTR, NULL);
122 if (IS_ERR(handle)) {
123 written = PTR_ERR(handle);
124 CERROR("unable to start transaction: rc %d\n",
127 written = fsfilt_write_record(obd, mds->mds_rcvd_filp,
128 (char *)med->med_mcd,
129 sizeof(*med->med_mcd),
131 fsfilt_commit(obd,mds->mds_rcvd_filp->f_dentry->d_inode,
134 pop_ctxt(&saved, &mds->mds_ctxt, NULL);
136 if (written != sizeof(*med->med_mcd)) {
141 CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
142 med->med_idx, med->med_off,
143 (unsigned int)sizeof(*med->med_mcd));
// Give back the last_rcvd slot held by an export and free its client record.
//
//   exp  export whose exp_mds_data (med) identifies the client slot.
//
// What the visible statements do:
//   - a client whose UUID equals the quoted text "OBD_CLASS_UUID" jumps to
//     free_and_out with result 0, skipping the bitmap and disk work;
//   - the slot bit is cleared with test_and_clear_bit(); finding it already
//     clear is logged;
//   - a zero-filled struct mds_client_data is written with
//     fsfilt_write_record() between push_ctxt() and pop_ctxt(); a result
//     different from sizeof(zero_mcd) is logged as an error;
//   - med->med_mcd is released with OBD_FREE.
//
// NOTE(review): the "zeroed out disconnecting client" message says "off" but
// is passed med_idx, the slot index, not med_off.
// NOTE(review): no fsfilt_start() and fsfilt_commit() pair is visible around
// this write, although the comment inside mds_client_add() describes a lock
// ordering reason for having one. Lines are missing from this listing, so
// confirm against the complete source.
148 int mds_client_free(struct obd_export *exp)
150 struct mds_export_data *med = &exp->exp_mds_data;
151 struct mds_obd *mds = &exp->exp_obd->u.mds;
152 struct obd_device *obd = exp->exp_obd;
153 struct mds_client_data zero_mcd;
154 struct obd_run_ctxt saved;
156 unsigned long *bitmap = mds->mds_client_bitmap;
162 /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
163 if (!strcmp(med->med_mcd->mcd_uuid, "OBD_CLASS_UUID"))
164 GOTO(free_and_out, 0);
166 CDEBUG(D_INFO, "freeing client at index %u (%lld)with UUID '%s'\n",
167 med->med_idx, med->med_off, med->med_mcd->mcd_uuid);
169 if (!test_and_clear_bit(med->med_idx, bitmap)) {
170 CERROR("MDS client %u: bit already clear in bitmap!!\n",
// Overwrite the on-disk record with zeroes so the slot reads as unused.
175 memset(&zero_mcd, 0, sizeof zero_mcd);
176 push_ctxt(&saved, &mds->mds_ctxt, NULL);
177 written = fsfilt_write_record(obd, mds->mds_rcvd_filp,
178 (char *)&zero_mcd, sizeof(zero_mcd),
180 pop_ctxt(&saved, &mds->mds_ctxt, NULL);
182 if (written != sizeof(zero_mcd)) {
183 CERROR("error zeroing out client %s index %d in %s: %d\n",
184 med->med_mcd->mcd_uuid, med->med_idx, LAST_RCVD,
187 CDEBUG(D_INFO, "zeroed out disconnecting client %s at off %d\n",
188 med->med_mcd->mcd_uuid, med->med_idx);
192 OBD_FREE(med->med_mcd, sizeof(*med->med_mcd));
// Free the in-memory server state that mds_read_last_rcvd() allocates: the
// client bitmap and the mds_server_data buffer. The sizes handed to OBD_FREE
// are the same expressions used at allocation time. mds_server_data is reset
// to NULL afterwards.
// NOTE(review): no matching reset of mds_client_bitmap is visible in this
// listing. Confirm whether one exists on a line that was dropped.
197 static int mds_server_free_data(struct mds_obd *mds)
199 OBD_FREE(mds->mds_client_bitmap,
200 MDS_MAX_CLIENT_WORDS * sizeof(unsigned long));
201 OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
202 mds->mds_server_data = NULL;
// Load the server header and the per-client records from the last_rcvd file
// and rebuild in-memory state from them.
//
//   obd   the MDS device.
//   file  the opened last_rcvd file; its inode size decides between
//         initialising a new header and reading an existing one.
//
// What the visible statements do:
//   - assert that struct mds_client_data is exactly MDS_LR_CLIENT_SIZE and
//     that struct mds_server_data fits in MDS_LR_SERVER_SIZE;
//   - allocate the server data (msd) and the client bitmap and store both in
//     the mds state;
//   - for an empty file, fill the header from the device UUID and the
//     MDS_LR_* constants, stored little-endian;
//   - otherwise read the header with fsfilt_read_record() and substitute the
//     MDS_LR_* defaults for any size or start field that is zero;
//   - reject any incompat or rocompat feature bits with -EINVAL via err_msd;
//   - copy last_transno and mount_count from the header into the mds state;
//   - walk the client slots while off is below the file size: read each
//     record, skip slots whose UUID is empty, and for clients whose
//     mount_age is below MDS_MOUNT_RECOV create an export with
//     class_new_export(), copy the UUID into it, register it with
//     mds_client_add() at the same index and count it as recoverable; other
//     clients are logged as discarded;
//   - raise mds_last_transno to the largest client last_transno seen;
//   - set obd_last_committed, and when recoverable clients exist log the
//     fact and set obd_recovering;
//   - release the scratch record with OBD_FREE; a call to
//     mds_server_free_data() is also visible near the end.
//
// NOTE(review): this listing has lost its short lines, so the exact branch
// nesting, loop exits and return values cannot be read from here.
// NOTE(review): several messages say "offset" but are passed cl_idx, which is
// the slot index.
// NOTE(review): the "last_rcvd clients" debug value subtracts
// MDS_LR_CLIENT_START from an unsigned size; if that statement runs for a
// file shorter than the header the result wraps. Debug output only.
207 static int mds_read_last_rcvd(struct obd_device *obd, struct file *file)
209 struct mds_obd *mds = &obd->u.mds;
210 struct mds_server_data *msd;
211 struct mds_client_data *mcd = NULL;
214 unsigned long last_rcvd_size = file->f_dentry->d_inode->i_size;
215 __u64 last_transno = 0;
219 LASSERT(sizeof(struct mds_client_data) == MDS_LR_CLIENT_SIZE);
220 LASSERT(sizeof(struct mds_server_data) <= MDS_LR_SERVER_SIZE);
// Allocate the header buffer and the client bitmap; undo the first if the
// second fails.
222 OBD_ALLOC(msd, sizeof(*msd));
226 OBD_ALLOC(mds->mds_client_bitmap,
227 MDS_MAX_CLIENT_WORDS * sizeof(unsigned long));
228 if (!mds->mds_client_bitmap) {
229 OBD_FREE(msd, sizeof(*msd));
233 mds->mds_server_data = msd;
// Brand new file: build a header from compile-time defaults.
235 if (last_rcvd_size == 0) {
236 CWARN("%s: initializing new %s\n", obd->obd_name, LAST_RCVD);
237 memcpy(msd->msd_uuid, obd->obd_uuid.uuid,sizeof(msd->msd_uuid));
238 msd->msd_server_size = cpu_to_le32(MDS_LR_SERVER_SIZE);
239 msd->msd_client_start = cpu_to_le32(MDS_LR_CLIENT_START);
240 msd->msd_client_size = cpu_to_le16(MDS_LR_CLIENT_SIZE);
// Existing file: read the header and patch in defaults for zero fields.
245 rc = fsfilt_read_record(obd, file, (char *)msd, sizeof(*msd), &off);
247 if (rc != sizeof(*msd)) {
248 CERROR("error reading MDS %s: rc = %d\n", LAST_RCVD,rc);
253 if (!msd->msd_server_size)
254 msd->msd_server_size = cpu_to_le32(MDS_LR_SERVER_SIZE);
255 if (!msd->msd_client_start)
256 msd->msd_client_start = cpu_to_le32(MDS_LR_CLIENT_START);
257 if (!msd->msd_client_size)
258 msd->msd_client_size = cpu_to_le16(MDS_LR_CLIENT_SIZE);
// Any feature flag set in the header is treated as unsupported.
260 if (msd->msd_feature_incompat) {
261 CERROR("unsupported incompat feature %x\n",
262 le32_to_cpu(msd->msd_feature_incompat));
263 GOTO(err_msd, rc = -EINVAL);
265 if (msd->msd_feature_rocompat) {
266 CERROR("unsupported read-only feature %x\n",
267 le32_to_cpu(msd->msd_feature_rocompat));
268 /* Do something like remount filesystem read-only */
269 GOTO(err_msd, rc = -EINVAL);
272 last_transno = le64_to_cpu(msd->msd_last_transno);
273 mds->mds_last_transno = last_transno;
275 mount_count = le64_to_cpu(msd->msd_mount_count);
276 mds->mds_mount_count = mount_count;
278 CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
279 obd->obd_name, last_transno);
280 CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
281 obd->obd_name, mount_count);
282 CDEBUG(D_INODE, "%s: server data size: %u\n",
283 obd->obd_name, le32_to_cpu(msd->msd_server_size));
284 CDEBUG(D_INODE, "%s: per-client data start: %u\n",
285 obd->obd_name, le32_to_cpu(msd->msd_client_start));
286 CDEBUG(D_INODE, "%s: per-client data size: %u\n",
287 obd->obd_name, le32_to_cpu(msd->msd_client_size));
288 CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
289 obd->obd_name, last_rcvd_size);
290 CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
291 (last_rcvd_size - MDS_LR_CLIENT_START) / MDS_LR_CLIENT_SIZE);
// NOTE(review): the comment below says FILTER although this is the MDS
// reader; presumably carried over from similar code. Confirm.
293 /* When we do a clean FILTER shutdown, we save the last_transno into
294 * the header. If we find clients with higher last_transno values
295 * then those clients may need recovery done. */
296 for (cl_idx = 0; off < last_rcvd_size; cl_idx++) {
301 OBD_ALLOC(mcd, sizeof(*mcd));
303 GOTO(err_msd, rc = -ENOMEM);
// NOTE(review): the comment below names fsd, while the header variable used
// by the code here is msd.
306 /* Don't assume off is incremented properly, in case
307 * sizeof(fsd) isn't the same as fsd->fsd_client_size.
309 off = le32_to_cpu(msd->msd_client_start) +
310 cl_idx * le16_to_cpu(msd->msd_client_size);
311 rc = fsfilt_read_record(obd, file, (char *)mcd,
313 if (rc != sizeof(*mcd)) {
314 CERROR("error reading MDS %s offset %d: rc = %d\n",
315 LAST_RCVD, cl_idx, rc);
316 if (rc > 0) /* XXX fatal error or just abort reading? */
321 if (mcd->mcd_uuid[0] == '\0') {
322 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
327 last_transno = le64_to_cpu(mcd->mcd_last_transno);
329 /* These exports are cleaned up by mds_disconnect(), so they
330 * need to be set up like real exports as mds_connect() does.
// mount_age is how many mounts ago this client was last recorded.
332 mount_age = mount_count - le64_to_cpu(mcd->mcd_mount_count);
333 if (mount_age < MDS_MOUNT_RECOV) {
334 struct obd_export *exp = class_new_export(obd);
335 struct mds_export_data *med;
336 CERROR("RCVRNG CLIENT uuid: %s off: %d lr: "LPU64
337 "srv lr: "LPU64" mnt: "LPU64" last mount: "LPU64
338 "\n", mcd->mcd_uuid, cl_idx,
339 last_transno, le64_to_cpu(msd->msd_last_transno),
340 le64_to_cpu(mcd->mcd_mount_count), mount_count);
347 memcpy(&exp->exp_client_uuid.uuid, mcd->mcd_uuid,
348 sizeof exp->exp_client_uuid.uuid);
349 med = &exp->exp_mds_data;
351 mds_client_add(obd, mds, med, cl_idx);
352 /* create helper if export init gets more complex */
353 INIT_LIST_HEAD(&med->med_open_head);
354 spin_lock_init(&med->med_open_lock);
357 obd->obd_recoverable_clients++;
358 class_export_put(exp);
360 CDEBUG(D_INFO, "discarded client %d, UUID '%s', count "
361 LPU64"\n", cl_idx, mcd->mcd_uuid,
362 le64_to_cpu(mcd->mcd_mount_count));
365 CDEBUG(D_OTHER, "client at offset %d has last_transno = "
366 LPU64"\n", cl_idx, last_transno);
// Track the highest transaction number seen across all clients.
368 if (last_transno > mds->mds_last_transno)
369 mds->mds_last_transno = last_transno;
372 obd->obd_last_committed = mds->mds_last_transno;
373 if (obd->obd_recoverable_clients) {
374 CERROR("RECOVERY: %d recoverable clients, last_transno "
376 obd->obd_recoverable_clients, mds->mds_last_transno);
377 obd->obd_next_recovery_transno = obd->obd_last_committed
379 obd->obd_recovering = 1;
383 OBD_FREE(mcd, sizeof(*mcd));
388 mds_server_free_data(mds);
// Prepare the on-disk objects the MDS needs, working inside mds_ctxt
// (push_ctxt at the start, pop_ctxt before the tail of the function).
//
//   obd  the MDS device; results are stored in obd->u.mds.
//
// What the visible statements do, in order:
//   - simple_mkdir ROOT with mode 0755 and record its inode number and
//     generation as mds_rootfid, typed S_IFDIR;
//   - look up __iopen__, which must exist with an inode, and keep the dentry
//     as mds_fid_de;
//   - simple_mkdir PENDING with mode 0777, kept as mds_pending_dir;
//   - simple_mkdir LOGS with mode 0700, kept as mds_logs_dir;
//   - open or create the LAST_RCVD file with mode 0644, require it to be a
//     regular file (otherwise -ENOENT), call fsfilt_journal_data() on it,
//     load it with mds_read_last_rcvd(), then keep it as mds_rcvd_filp;
//   - where I_SKIP_PDFLUSH is defined, set that flag on the inode.
//
// The tail shows the unwinding calls in reverse order of acquisition:
// class_disconnect_exports(), filp_close(), then dput() of the LOGS, PENDING
// and __iopen__ dentries. The labels themselves and the returns fall on lines
// missing from this listing.
392 static int mds_fs_prep(struct obd_device *obd)
394 struct mds_obd *mds = &obd->u.mds;
395 struct obd_run_ctxt saved;
396 struct dentry *dentry;
400 push_ctxt(&saved, &mds->mds_ctxt, NULL);
401 dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755);
402 if (IS_ERR(dentry)) {
403 rc = PTR_ERR(dentry);
404 CERROR("cannot create ROOT directory: rc = %d\n", rc);
// The root fid handed out later is taken from the ROOT directory inode.
408 mds->mds_rootfid.id = dentry->d_inode->i_ino;
409 mds->mds_rootfid.generation = dentry->d_inode->i_generation;
410 mds->mds_rootfid.f_type = S_IFDIR;
414 dentry = lookup_one_len("__iopen__", current->fs->pwd,
415 strlen("__iopen__"));
416 if (IS_ERR(dentry) || !dentry->d_inode) {
417 rc = (IS_ERR(dentry)) ? PTR_ERR(dentry): -ENOENT;
418 CERROR("cannot open iopen FH directory: rc = %d\n", rc);
421 mds->mds_fid_de = dentry;
423 dentry = simple_mkdir(current->fs->pwd, "PENDING", 0777);
424 if (IS_ERR(dentry)) {
425 rc = PTR_ERR(dentry);
426 CERROR("cannot create PENDING directory: rc = %d\n", rc);
429 mds->mds_pending_dir = dentry;
431 dentry = simple_mkdir(current->fs->pwd, "LOGS", 0700);
432 if (IS_ERR(dentry)) {
433 rc = PTR_ERR(dentry);
434 CERROR("cannot create LOGS directory: rc = %d\n", rc);
435 GOTO(err_pending, rc);
437 mds->mds_logs_dir = dentry;
439 file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
442 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
444 GOTO(err_logs, rc = PTR_ERR(file));
446 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
447 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
448 file->f_dentry->d_inode->i_mode);
449 GOTO(err_filp, rc = -ENOENT);
452 rc = fsfilt_journal_data(obd, file);
454 CERROR("cannot journal data on %s: rc = %d\n", LAST_RCVD, rc);
458 rc = mds_read_last_rcvd(obd, file);
460 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
461 GOTO(err_client, rc);
463 mds->mds_rcvd_filp = file;
464 #ifdef I_SKIP_PDFLUSH
466 * we need this to protect from deadlock
467 * pdflush vs. lustre_fwrite()
469 file->f_dentry->d_inode->i_flags |= I_SKIP_PDFLUSH;
472 pop_ctxt(&saved, &mds->mds_ctxt, NULL);
// Error unwinding: each call below undoes one of the steps above.
477 class_disconnect_exports(obd, 0);
479 if (filp_close(file, 0))
480 CERROR("can't close %s after error\n", LAST_RCVD);
482 dput(mds->mds_logs_dir);
484 dput(mds->mds_pending_dir);
486 dput(mds->mds_fid_de);
// Bind the MDS to a mounted filesystem and prepare its on-disk objects.
//
//   obd  the MDS device.
//   mnt  the vfsmount to operate on; stored as mds_vfsmnt.
//
// Fills in the run context used by push_ctxt() elsewhere in this file: magic
// via OBD_SET_CTXT_MAGIC, pwdmnt set to mnt, pwd set to the mount root and
// fs set to get_ds(). The result of mds_fs_prep() is returned unchanged.
490 int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
492 struct mds_obd *mds = &obd->u.mds;
495 mds->mds_vfsmnt = mnt;
497 OBD_SET_CTXT_MAGIC(&mds->mds_ctxt);
498 mds->mds_ctxt.pwdmnt = mnt;
499 mds->mds_ctxt.pwd = mnt->mnt_root;
500 mds->mds_ctxt.fs = get_ds();
501 RETURN(mds_fs_prep(obd));
// Undo mds_fs_prep(): drop client exports and release the files and dentries
// held in the mds state.
//
//   obd    the MDS device.
//   flags  passed through to class_disconnect_exports(); when
//          OBD_OPT_FAILOVER is set a message notes that client state is kept.
//
// Order of the visible steps: disconnect exports, free server data with
// mds_server_free_data(), then inside push_ctxt() and pop_ctxt() close
// mds_rcvd_filp and drop mds_logs_dir and mds_pending_dir, each only when
// non-NULL and each reset to NULL afterwards. Finally the dcache below
// mds_fid_de is shrunk and that dentry is released.
// NOTE(review): unlike the other fields, mds_fid_de is used without a NULL
// check and is not visibly reset. Confirm that it is always valid here.
504 int mds_fs_cleanup(struct obd_device *obd, int flags)
506 struct mds_obd *mds = &obd->u.mds;
507 struct obd_run_ctxt saved;
510 if (flags & OBD_OPT_FAILOVER)
511 CERROR("%s: shutting down for failover; client state will"
512 " be preserved.\n", obd->obd_name);
514 class_disconnect_exports(obd, flags); /* cleans up client info too */
515 mds_server_free_data(mds);
517 push_ctxt(&saved, &mds->mds_ctxt, NULL);
518 if (mds->mds_rcvd_filp) {
519 rc = filp_close(mds->mds_rcvd_filp, 0);
520 mds->mds_rcvd_filp = NULL;
522 CERROR("%s file won't close, rc=%d\n", LAST_RCVD, rc);
524 if (mds->mds_logs_dir) {
525 l_dput(mds->mds_logs_dir);
526 mds->mds_logs_dir = NULL;
528 if (mds->mds_pending_dir) {
529 l_dput(mds->mds_pending_dir);
530 mds->mds_pending_dir = NULL;
532 pop_ctxt(&saved, &mds->mds_ctxt, NULL);
533 shrink_dcache_parent(mds->mds_fid_de);
534 dput(mds->mds_fid_de);
539 /* This is a callback from the llog_* functions.
540 * Assumes caller has already pushed us into the kernel context. */
// Close one log handle that belongs to a catalog, deleting the log file when
// it is empty.
//
//   cathandle  the catalog handle; its lgh_obd locates the mds state.
//   loghandle  the log to close; it is released with llog_free_handle().
//
// A log whose header type is not LLOG_CATALOG_MAGIC and whose llh_count is 0
// is deleted: i_sem of the LOGS directory inode is taken, an extra reference
// on the file dentry is kept in dchild, and llog_delete_log() is called. The
// file is then closed with filp_close(), the handle freed, the kept dentry
// removed with vfs_unlink() and i_sem released. Any other log is only closed
// and freed.
// NOTE(review): the unlink error message formats the name with %*s. The star
// there sets a minimum field width, not a length limit; a precision form
// such as %.*s would be needed to bound the read by d_name.len. Confirm
// whether the dentry name is always terminated.
541 int mds_log_close(struct llog_handle *cathandle, struct llog_handle *loghandle)
543 struct llog_object_hdr *llh = loghandle->lgh_hdr;
544 struct mds_obd *mds = &cathandle->lgh_obd->u.mds;
545 struct dentry *dchild = NULL;
549 /* If we are going to delete this log, grab a ref before we close
550 * it so we don't have to immediately do another lookup.
552 if (llh->llh_hdr.lth_type != LLOG_CATALOG_MAGIC && llh->llh_count == 0){
553 CDEBUG(D_INODE, "deleting log file "LPX64":%x\n",
554 loghandle->lgh_cookie.lgc_lgl.lgl_oid,
555 loghandle->lgh_cookie.lgc_lgl.lgl_ogen);
556 down(&mds->mds_logs_dir->d_inode->i_sem);
557 dchild = dget(loghandle->lgh_file->f_dentry);
558 llog_delete_log(cathandle, loghandle);
560 CDEBUG(D_INODE, "closing log file "LPX64":%x\n",
561 loghandle->lgh_cookie.lgc_lgl.lgl_oid,
562 loghandle->lgh_cookie.lgc_lgl.lgl_ogen);
565 rc = filp_close(loghandle->lgh_file, 0);
567 llog_free_handle(loghandle); /* also removes loghandle from list */
570 int err = vfs_unlink(mds->mds_logs_dir->d_inode, dchild);
572 CERROR("error unlinking empty log %*s: rc %d\n",
573 dchild->d_name.len, dchild->d_name.name, err);
578 up(&mds->mds_logs_dir->d_inode->i_sem);
583 /* This is a callback from the llog_* functions.
584 * Assumes caller has already pushed us into the kernel context. */
// Open an existing log file named by a cookie and wrap it in a log handle.
//
//   obd        the MDS device.
//   logcookie  carries the object id and generation of the log file.
//
// Steps visible here:
//   - allocate a handle; failure yields ERR_PTR(-ENOMEM);
//   - build a fid from the cookie and resolve it with mds_fid2dentry() while
//     holding i_sem of the LOGS directory inode;
//   - a lookup error, or a dentry without an inode, is logged;
//   - take a reference on mds_vfsmnt and open the dentry with dentry_open()
//     using O_RDWR and O_LARGEFILE; an open error is logged;
//   - copy the cookie into the handle and install mds_log_create,
//     mds_log_open and mds_log_close as callbacks, plus the obd pointer.
// A call to llog_free_handle() is visible at the end, presumably the error
// path; its label and the returns are on lines missing from this listing.
585 struct llog_handle *mds_log_open(struct obd_device *obd,
586 struct llog_cookie *logcookie)
588 struct ll_fid fid = { .id = logcookie->lgc_lgl.lgl_oid,
589 .generation = logcookie->lgc_lgl.lgl_ogen,
591 struct llog_handle *loghandle;
592 struct dentry *dchild;
596 loghandle = llog_alloc_handle();
597 if (loghandle == NULL)
598 RETURN(ERR_PTR(-ENOMEM));
// The fid lookup is done under the LOGS directory semaphore.
600 down(&obd->u.mds.mds_logs_dir->d_inode->i_sem);
601 dchild = mds_fid2dentry(&obd->u.mds, &fid, NULL);
602 up(&obd->u.mds.mds_logs_dir->d_inode->i_sem);
603 if (IS_ERR(dchild)) {
604 rc = PTR_ERR(dchild);
605 CERROR("error looking up log file "LPX64":%x: rc %d\n",
606 fid.id, fid.generation, rc);
610 if (dchild->d_inode == NULL) {
612 CERROR("nonexistent log file "LPX64":%x: rc %d\n",
613 fid.id, fid.generation, rc);
617 /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
618 mntget(obd->u.mds.mds_vfsmnt);
619 loghandle->lgh_file = dentry_open(dchild, obd->u.mds.mds_vfsmnt,
620 O_RDWR | O_LARGEFILE);
621 if (IS_ERR(loghandle->lgh_file)) {
622 rc = PTR_ERR(loghandle->lgh_file);
623 CERROR("error opening logfile "LPX64":%x: rc %d\n",
624 fid.id, fid.generation, rc);
627 memcpy(&loghandle->lgh_cookie, logcookie, sizeof(*logcookie));
628 loghandle->lgh_log_create = mds_log_create;
629 loghandle->lgh_log_open = mds_log_open;
630 loghandle->lgh_log_close = mds_log_close;
631 loghandle->lgh_obd = obd;
638 llog_free_handle(loghandle);
642 /* This is a callback from the llog_* functions.
643 * Assumes caller has already pushed us into the kernel context. */
// Create a log file under LOGS and wrap it in a log handle.
//
//   obd  the MDS device; obd->u.mds.mds_catalog decides the file name.
//
// Steps visible here:
//   - allocate a handle; ERR_PTR(-ENOMEM) is returned on the failure path;
//   - with no catalog yet the name is LOGS/catalog; otherwise a name is
//     formatted from CURRENT_SECONDS and the catalog lgh_index, which is
//     incremented as it is read, and O_EXCL is added to the open flags;
//   - open with filp_open() using mode 0644; a name collision is logged at
//     D_HA level and lgh_index is incremented again, other errors are logged
//     and jump to out_handle;
//   - on success the inode number and generation are stored in the handle
//     cookie and the mds_log_* callbacks and obd pointer are installed.
// The condition that detects a collision and whatever retry follows are on
// lines missing from this listing.
// NOTE(review): logbuf holds 24 bytes. The format produces the 8 characters
// of the LOGS/log prefix, the seconds value, a dot, the index, a newline and
// the terminator. With a 10 digit seconds value an index of four or more
// digits needs at least 25 bytes, so sprintf can write past the buffer.
// A bounded formatter and a larger buffer look necessary. Confirm.
// NOTE(review): the format string ends in a newline, which would become part
// of the file name if logbuf is what gets opened. Confirm that is intended.
644 struct llog_handle *mds_log_create(struct obd_device *obd)
646 char logbuf[24], *logname; /* logSSSSSSSSSS.count */
647 struct llog_handle *loghandle;
648 int rc, open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
651 loghandle = llog_alloc_handle();
653 RETURN(ERR_PTR(-ENOMEM));
656 if (!obd->u.mds.mds_catalog) {
657 logname = "LOGS/catalog";
659 sprintf(logbuf, "LOGS/log%lu.%u\n",
660 CURRENT_SECONDS, obd->u.mds.mds_catalog->lgh_index++);
661 open_flags |= O_EXCL;
664 loghandle->lgh_file = filp_open(logname, open_flags, 0644);
665 if (IS_ERR(loghandle->lgh_file)) {
666 rc = PTR_ERR(loghandle->lgh_file);
668 CDEBUG(D_HA, "collision in logfile %s creation\n",
670 obd->u.mds.mds_catalog->lgh_index++;
673 CERROR("error opening/creating %s: rc %d\n", logname, rc);
674 GOTO(out_handle, rc);
// Identify the new log by the inode number and generation of its file.
677 loghandle->lgh_cookie.lgc_lgl.lgl_oid =
678 loghandle->lgh_file->f_dentry->d_inode->i_ino;
679 loghandle->lgh_cookie.lgc_lgl.lgl_ogen =
680 loghandle->lgh_file->f_dentry->d_inode->i_generation;
681 loghandle->lgh_log_create = mds_log_create;
682 loghandle->lgh_log_open = mds_log_open;
683 loghandle->lgh_log_close = mds_log_close;
684 loghandle->lgh_obd = obd;
689 llog_free_handle(loghandle);
// Return a handle on the MDS log catalog, opening the one recorded in the
// server data or creating a new one. Runs between push_ctxt() and pop_ctxt()
// on the mds context.
//
//   obd  the MDS device; obd->u.mds.mds_server_data (msd) holds the
//        catalog object id and generation in little-endian form.
//
// Steps visible here:
//   - when msd_catalog_oid is non-zero, build a cookie from the stored id
//     and generation and call mds_log_open(); if that fails the error is
//     logged and both stored fields are cleared, so the creation branch
//     below runs;
//   - when msd_catalog_oid is zero, call mds_log_create(), store the new id
//     and generation in msd and persist them with mds_update_server_data();
//     a failed write jumps to out_handle;
//   - initialise the catalog with llog_init_catalog() and mds_osc_uuid;
//   - the tail closes the handle through mds_log_close() and replaces the
//     result with ERR_PTR(rc).
// NOTE(review): a failed creation does GOTO(out, cathandle), handing a pointer
// to the GOTO macro where other call sites pass an integer rc. Confirm the
// macro tolerates that.
693 struct llog_handle *mds_get_catalog(struct obd_device *obd)
695 struct mds_server_data *msd = obd->u.mds.mds_server_data;
696 struct obd_run_ctxt saved;
697 struct llog_handle *cathandle = NULL;
701 push_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
// Try the catalog recorded on disk first.
703 if (msd->msd_catalog_oid) {
704 struct llog_cookie catcookie;
706 catcookie.lgc_lgl.lgl_oid = le64_to_cpu(msd->msd_catalog_oid);
707 catcookie.lgc_lgl.lgl_ogen = le32_to_cpu(msd->msd_catalog_ogen);
708 cathandle = mds_log_open(obd, &catcookie);
709 if (IS_ERR(cathandle)) {
710 CERROR("error opening catalog "LPX64":%x: rc %d\n",
711 catcookie.lgc_lgl.lgl_oid,
712 catcookie.lgc_lgl.lgl_ogen,
713 (int)PTR_ERR(cathandle));
714 msd->msd_catalog_oid = 0;
715 msd->msd_catalog_ogen = 0;
717 /* ORPHANS FIXME: compare catalog UUID to msd_peeruuid */
// No usable catalog recorded: create one and persist its identity.
720 if (!msd->msd_catalog_oid) {
721 struct llog_logid *lgl;
723 cathandle = mds_log_create(obd);
724 if (IS_ERR(cathandle)) {
725 CERROR("error creating new catalog: rc %d\n",
726 (int)PTR_ERR(cathandle));
727 GOTO(out, cathandle);
729 lgl = &cathandle->lgh_cookie.lgc_lgl;
730 msd->msd_catalog_oid = cpu_to_le64(lgl->lgl_oid);
731 msd->msd_catalog_ogen = cpu_to_le32(lgl->lgl_ogen);
732 rc = mds_update_server_data(obd);
734 CERROR("error writing new catalog to disk: rc %d\n",rc);
735 GOTO(out_handle, rc);
739 rc = llog_init_catalog(cathandle, &obd->u.mds.mds_osc_uuid);
742 pop_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
746 mds_log_close(cathandle, cathandle);
747 cathandle = ERR_PTR(rc);
// Release a catalog handle obtained from mds_get_catalog().
//
//   cathandle  the catalog to release.
//
// Every log handle linked on cathandle->lgh_list is closed through
// mds_log_close(). The safe list iterator is used because closing a log
// frees its handle and removes it from the list. The catalog file is then
// closed with filp_close(), a failure being logged, and the catalog handle is
// freed with llog_free_handle().
// NOTE(review): the listing stops here; any statements after the final call
// are not visible.
752 void mds_put_catalog(struct llog_handle *cathandle)
754 struct llog_handle *loghandle, *n;
758 list_for_each_entry_safe(loghandle, n, &cathandle->lgh_list, lgh_list)
759 mds_log_close(cathandle, loghandle);
761 rc = filp_close(cathandle->lgh_file, 0);
763 CERROR("error closing catalog: rc %d\n", rc);
765 llog_free_handle(cathandle);