1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
42 #include "mds_internal.h"
/*
 * Debug helper: log (at D_INFO) the words of the dirty-page bitmap
 * mds_lov_page_dirty and the object ids stored in every page of
 * mds_lov_page_array.  "label" only tags the output.  A NULL bitmap or a
 * NULL page array is reported through CERROR.
 */
44 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
46 struct mds_obd *mds = &obd->u.mds;
49 CDEBUG(D_INFO, "dump from %s\n", label);
50 if (mds->mds_lov_page_dirty == NULL) {
51 CERROR("NULL bitmap!\n");
/* NOTE(review): the "+ 1" also prints data[size / BITS_PER_LONG]; if size
 * is an exact multiple of BITS_PER_LONG that word may lie past the bitmap
 * storage - confirm against the bitmap allocator. */
55 for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
56 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
58 if (mds->mds_lov_page_array == NULL) {
59 CERROR("not init page array!\n");
/* Walk every page slot of the array and each objid slot within the page. */
63 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
64 obd_id *data = mds->mds_lov_page_array[i];
69 for(j=0; j < OBJID_PER_PAGE(); j++) {
72 CDEBUG(D_INFO,"objid page %u idx %u - %llu ",i,j,data[j]);
/*
 * Set up the in-memory bookkeeping for the LOV_OBJID file:
 *  - allocate the dirty bitmap (MDS_LOV_OBJID_PAGES_COUNT bits),
 *  - allocate the array of MDS_LOV_OBJID_PAGES_COUNT page pointers,
 *  - open (creating it if needed) the LOV_OBJID file and check that it is
 *    a regular file, then remember it in mds_lov_objid_filp.
 * On failure the error labels close the file, free the page array and free
 * the bitmap; -ENOMEM / -ENOENT / the filp_open() error are the codes
 * visible here.
 */
79 int mds_lov_init_objids(struct obd_device *obd)
81 struct mds_obd *mds = &obd->u.mds;
82 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* Compile-time check: a page allocation holds a whole number of obd_id. */
87 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
89 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
90 if (mds->mds_lov_page_dirty == NULL)
94 OBD_ALLOC(mds->mds_lov_page_array, size);
95 if (mds->mds_lov_page_array == NULL)
96 GOTO(err_free_bitmap, rc = -ENOMEM);
98 /* open and test the lov objd file */
99 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
102 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
103 GOTO(err_free, rc = PTR_ERR(file));
105 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
106 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
107 file->f_dentry->d_inode->i_mode);
108 GOTO(err_open, rc = -ENOENT);
110 mds->mds_lov_objid_filp = file;
/* Error unwinding: close the file, then free the array, then the bitmap. */
114 if (filp_close((struct file *)file, 0))
115 CERROR("can't close %s after error\n", LOV_OBJID);
117 OBD_FREE(mds->mds_lov_page_array, size);
119 FREE_BITMAP(mds->mds_lov_page_dirty);
123 EXPORT_SYMBOL(mds_lov_init_objids);
/*
 * Tear down what mds_lov_init_objids() and the objid readers set up:
 * free each objid page and the page-pointer array, close the LOV_OBJID
 * file (clearing mds_lov_objid_filp) and free the dirty bitmap.
 * A failing filp_close() is only reported with CERROR.
 */
125 void mds_lov_destroy_objids(struct obd_device *obd)
127 struct mds_obd *mds = &obd->u.mds;
131 if (mds->mds_lov_page_array != NULL) {
132 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
133 obd_id *data = mds->mds_lov_page_array[i];
135 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
137 OBD_FREE(mds->mds_lov_page_array,
138 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
141 if (mds->mds_lov_objid_filp) {
142 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
/* Drop the reference before reporting, so the pointer is never reused. */
143 mds->mds_lov_objid_filp = NULL;
145 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
148 FREE_BITMAP(mds->mds_lov_page_dirty);
151 EXPORT_SYMBOL(mds_lov_destroy_objids);
/*
 * Record the object ids carried by a striping descriptor.
 * For every stripe in "lmm" the OST index and object id are converted from
 * little-endian, the matching slot (page = index / OBJID_PER_PAGE(),
 * slot = index % OBJID_PER_PAGE()) is looked up, and when the new id is
 * larger than the recorded one the page is marked dirty.
 * NOTE(review): in the lines visible here the OST index is not range
 * checked and the page pointer is not tested for NULL before data[idx] is
 * read - confirm that callers guarantee both.
 */
153 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
155 struct mds_obd *mds = &obd->u.mds;
159 /* if we create file without objects - lmm is NULL */
163 for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
164 int i = le32_to_cpu(lmm->lmm_objects[j].l_ost_idx);
165 obd_id id = le64_to_cpu(lmm->lmm_objects[j].l_object_id);
166 int page = i / OBJID_PER_PAGE();
167 int idx = i % OBJID_PER_PAGE();
168 obd_id *data = mds->mds_lov_page_array[page];
170 CDEBUG(D_INODE,"update last object for ost %d - new %llu"
171 " old %llu\n", i, id, data[idx]);
/* Only a larger id counts as news; the page then needs writing back. */
172 if (id > data[idx]) {
174 bitmap_set(mds->mds_lov_page_dirty, page);
179 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Load the whole LOV_OBJID file into freshly allocated objid pages.
 * The number of pages is derived from the file size; each page slot must
 * still be empty (LASSERT) before it is allocated and filled with
 * fsfilt_read_record().  The number of ids read is accumulated from the
 * file-offset advance and stored in mds_lov_objid_count, together with the
 * page/slot position of the end of data (mds_lov_objid_lastpage /
 * mds_lov_objid_lastidx).  -ENOMEM is the allocation failure code visible
 * here.
 */
181 static int mds_lov_read_objids(struct obd_device *obd)
183 struct mds_obd *mds = &obd->u.mds;
185 int i, rc, count = 0, page = 0;
189 /* Read everything in the file, even if our current lov desc
190 has fewer targets. Old targets not in the lov descriptor
191 during mds setup may still have valid objids. */
192 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* "+1" covers a trailing, partially filled page. */
196 page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
197 CDEBUG(D_INFO, "file size %d pages %d\n", size, page);
198 for(i=0; i < page; i++) {
199 obd_id *data = mds->mds_lov_page_array[i];
200 loff_t off_old = off;
202 LASSERT(data == NULL);
203 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
205 GOTO(out, rc = -ENOMEM);
207 mds->mds_lov_page_array[i] = data;
209 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
210 OBJID_PER_PAGE()*sizeof(obd_id), &off);
212 CERROR("Error reading objids %d\n", rc);
/* Bytes consumed by this read, rounded up to whole obd_id entries. */
218 count += (off-off_old+sizeof(obd_id)-1)/sizeof(obd_id);
220 mds->mds_lov_objid_count = count;
221 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
222 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
223 CDEBUG(D_INFO, "Read %u objid\n", count);
225 mds_lov_dump_objids("read",obd);
/*
 * Write every dirty objid page back to the LOV_OBJID file.
 * Nothing is written when the dirty bitmap is empty.  Each dirty page "i"
 * is written at file offset i * page-size; for the last page only
 * mds_lov_objid_lastidx entries are written.  The page's dirty bit is
 * cleared afterwards (the condition guarding that is on lines not visible
 * here).
 */
230 int mds_lov_write_objids(struct obd_device *obd)
232 struct mds_obd *mds = &obd->u.mds;
236 if (bitmap_check_empty(mds->mds_lov_page_dirty))
239 mds_lov_dump_objids("write", obd);
241 foreach_bit(mds->mds_lov_page_dirty, i) {
242 obd_id *data = mds->mds_lov_page_array[i];
243 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
244 loff_t off = i * size;
/* A dirty bit without a backing page would be a bookkeeping bug. */
246 LASSERT(data != NULL);
248 /* check for particaly filled last page */
249 if (i == mds->mds_lov_objid_lastpage) {
250 size = mds->mds_lov_objid_lastidx * sizeof(obd_id);
253 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
257 bitmap_clear(mds->mds_lov_page_dirty, i);
264 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Make sure the last-object-id slot for target "idx" is populated.
 * The slot lives at page idx / OBJID_PER_PAGE(), offset
 * idx % OBJID_PER_PAGE(); the page is allocated on demand (-ENOMEM on
 * failure).  When the slot is still 0 the id is fetched through
 * obd_get_info(export, "last_id") directly into the slot.  If idx is
 * beyond the recorded objid count, the count and last page/slot markers
 * are advanced, and the page is marked dirty.
 */
266 static int mds_lov_get_objid(struct obd_device * obd, struct obd_export *export,
269 struct mds_obd *mds = &obd->u.mds;
276 page = idx / OBJID_PER_PAGE();
277 off = idx % OBJID_PER_PAGE();
278 data = mds->mds_lov_page_array[page];
280 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
282 GOTO(out, rc = -ENOMEM);
284 mds->mds_lov_page_array[page] = data;
/* Trace through the Lustre debug log rather than a raw, level-less printk. */
287 CDEBUG(D_INFO, "get %d - %p - %d/%d\n", idx, data, page, off);
288 if (data[off] == 0) {
289 /* We never read this lastid; ask the osc */
290 struct obd_id_info lastid;
291 __u32 size = sizeof(lastid);
/* The reply is stored straight into the in-memory slot. */
294 lastid.data = &data[off];
295 rc = obd_get_info(export, sizeof("last_id"),
296 "last_id", &size, &lastid);
300 if (idx > mds->mds_lov_objid_count) {
301 mds->mds_lov_objid_count = idx;
302 mds->mds_lov_objid_lastpage = page;
303 mds->mds_lov_objid_lastidx = off;
305 bitmap_set(mds->mds_lov_page_dirty, page);
/*
 * Ask the OSTs to reconcile orphan objects.
 * Builds an obdo flagged OBD_FL_DELORPHAN for this MDS's group
 * (FILTER_GROUP_MDS0 + mds_id) and issues obd_create() on mds_osc_exp.
 * When "ost_uuid" is non-NULL it is copied into o_inline and
 * OBD_MD_FLINLINE is set, so the request names that target.
 */
311 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
315 struct obd_trans_info oti = {0};
316 struct lov_stripe_md *empty_ea = NULL;
319 LASSERT(mds->mds_lov_page_array != NULL);
321 /* This create will in fact either create or destroy: If the OST is
322 * missing objects below this ID, they will be created. If it finds
323 * objects above this ID, they will be removed. */
324 memset(&oa, 0, sizeof(oa));
325 oa.o_flags = OBD_FL_DELORPHAN;
326 oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
327 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
328 if (ost_uuid != NULL) {
329 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
330 oa.o_valid |= OBD_MD_FLINLINE;
332 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
336 /* update the LOV-OSC knowledge of the last used object id's */
337 /* for all targets */
338 /* Do we really need this? All OSCs should already pass through __mds_lov_synchronize
340 #define MDS_LOV_SETID_COUNT (CFS_PAGE_SIZE / sizeof(struct obd_id_info))
/*
 * Push the recorded object ids of all LOV targets to the LOV/OSC layer
 * with KEY_NEXT_ID.  Targets are processed in batches of at most
 * MDS_LOV_SETID_COUNT entries, staged in one CFS_PAGE_SIZE buffer of
 * struct obd_id_info; each entry points at the target's slot in the objid
 * pages.  Must not run during recovery and requires obd_dev_sem to be held
 * (both asserted).
 *
 * The batch is handed over as the array itself with the size of the "j"
 * entries that were filled.  The previous "sizeof(info), &info" described
 * the local pointer variable instead of the staged entries.
 */
342 int mds_lov_set_nextid(struct obd_device *obd)
344 struct mds_obd *mds = &obd->u.mds;
345 int i = 0, j, rc = 0;
346 struct obd_id_info *info;
349 LASSERT(!obd->obd_recovering);
351 /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
352 LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
354 OBD_ALLOC(info, CFS_PAGE_SIZE);
358 while(i < mds->mds_lov_desc.ld_tgt_count) {
359 for(j=0; j < MDS_LOV_SETID_COUNT; i++, j++) {
360 int page = i / OBJID_PER_PAGE();
361 int idx = i % OBJID_PER_PAGE();
362 obd_id *data = mds->mds_lov_page_array[page];
364 if (i == mds->mds_lov_desc.ld_tgt_count)
368 info[j].data = &data[idx];
371 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
372 KEY_NEXT_ID, j * sizeof(*info), info, NULL);
374 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
379 OBD_FREE(info, CFS_PAGE_SIZE);
/*
 * Push the object id of a single target to the LOV/OSC layer with
 * KEY_NEXT_ID, using one on-stack struct obd_id_info (filled from "idx"
 * and "id" on lines not visible here - presumably info.idx / info.data).
 * Must not run during recovery and requires obd_dev_sem to be held (both
 * asserted).  A failure is reported with CERROR.
 */
386 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
388 struct mds_obd *mds = &obd->u.mds;
390 struct obd_id_info info;
393 LASSERT(!obd->obd_recovering);
395 /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
396 LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
401 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
402 KEY_NEXT_ID, sizeof(info), &info, NULL);
404 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
410 /* Update the lov desc for a new size lov. */
/*
 * Fetch the current LOV descriptor from export "lov" (KEY_LOVDESC) into a
 * temporary buffer, make sure the objid page that covers the new target
 * count exists (-ENOMEM if it cannot be allocated), then install the
 * descriptor in mds_lov_desc and recompute mds_max_mdsize and
 * mds_max_cookiesize for min(LOV_MAX_STRIPE_COUNT, ld_tgt_count) stripes.
 * Afterwards the llog catalogs are (re)initialized for the new target
 * count and, if an upcall owner is registered, it is notified.
 *
 * The max-size debug message lists its values in the order the arguments
 * are passed (mdsize, cookiesize, then the remaining argument, which is on
 * a line not visible here - presumably the stripe count).  The old text
 * labelled the first value as the stripe count.
 */
411 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
413 struct mds_obd *mds = &obd->u.mds;
415 __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
419 OBD_ALLOC(ld, sizeof(*ld));
423 rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
428 /* The size of the LOV target table may have increased. */
429 page = ld->ld_tgt_count / OBJID_PER_PAGE();
430 if (mds->mds_lov_page_array[page] == NULL) {
433 OBD_ALLOC(ids, MDS_LOV_ALLOC_SIZE);
435 GOTO(out, rc = -ENOMEM);
437 mds->mds_lov_page_array[page] = ids;
440 /* Don't change the mds_lov_desc until the objids size matches the
442 mds->mds_lov_desc = *ld;
443 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
444 mds->mds_lov_desc.ld_tgt_count);
446 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
447 mds->mds_lov_desc.ld_tgt_count);
449 mds->mds_max_mdsize = lov_mds_md_size(stripes);
450 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
451 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize: %d/%d "
452 "for %d stripes\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
455 /* If we added a target we have to reconnect the llogs */
456 /* We only _need_ to do this at first add (idx), or the first time
457 after recovery. However, it should now be safe to call anytime. */
458 rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
460 /*XXX this notifies the MDD until lov handling use old mds code */
461 if (obd->obd_upcall.onu_owner) {
462 LASSERT(obd->obd_upcall.onu_upcall != NULL);
463 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
464 obd->obd_upcall.onu_owner);
467 OBD_FREE(ld, sizeof(*ld));
/* Marker used when the index of a watched target is not known. */
472 #define MDSLOV_NO_INDEX -1
474 /* Inform MDS about new/updated target */
/*
 * Runs under obd_dev_sem.  First refreshes mds_lov_desc through
 * mds_lov_update_desc().  When "idx" is MDSLOV_NO_INDEX or the device is
 * recovering nothing more is done with the index; an index at or beyond
 * ld_tgt_count yields -EINVAL.  Otherwise the target's objid slot is
 * inspected: an empty (0) slot is filled via mds_lov_get_objid() using the
 * watched device's self export, a non-empty one is pushed to the OSC with
 * mds_lov_set_one_nextid().
 * NOTE(review): "data" is dereferenced without a NULL test in the lines
 * visible here - confirm the page for "idx" is always allocated by
 * mds_lov_update_desc().
 */
475 static int mds_lov_update_mds(struct obd_device *obd,
476 struct obd_device *watched,
477 __u32 idx, struct obd_uuid *uuid)
479 struct mds_obd *mds = &obd->u.mds;
488 /* Don't let anyone else mess with mds_lov_objids now */
489 mutex_down(&obd->obd_dev_sem);
491 old_count = mds->mds_lov_desc.ld_tgt_count;
492 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
496 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
497 idx, obd->obd_recovering, obd->obd_async_recov, old_count,
498 mds->mds_lov_desc.ld_tgt_count);
500 /* idx is set as data from lov_notify. */
501 if (idx == MDSLOV_NO_INDEX || obd->obd_recovering)
504 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
505 CERROR("index %d > count %d!\n", idx,
506 mds->mds_lov_desc.ld_tgt_count);
507 GOTO(out, rc = -EINVAL);
510 page = idx / OBJID_PER_PAGE();
511 off = idx % OBJID_PER_PAGE();
512 data = mds->mds_lov_page_array[page];
513 CDEBUG(D_CONFIG, "idx %d - %p - %d/%d\n", idx, data, page, off);
/* 0 means "no id known yet for this target". */
514 if (data[off] == 0) {
515 rc = mds_lov_get_objid(obd, watched->obd_self_export, idx);
517 /* We have read this lastid from disk; tell the osc.
518 Don't call this during recovery. */
519 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
521 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
522 /* Don't abort the rest of the sync */
527 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
530 mutex_up(&obd->obd_dev_sem);
534 /* Connect the MDS to the LOV named lov_name and load the objid state. */
/*
 * Steps visible here:
 *  - a previously stored error in mds_osc_obd is returned as is; an
 *    already set mds_osc_obd short-circuits the call;
 *  - the LOV device is looked up by name (an ERR_PTR(-ENOTCONN) is stored
 *    when it is missing);
 *  - obd_connect() is issued with connect flags, version and the MDS
 *    group, and the resulting export is kept in mds_osc_exp;
 *  - the MDS registers itself as observer of the LOV;
 *  - client connections are denied (obd_no_conn = 1) until OSTs are known;
 *  - under obd_dev_sem the objid file is read, the LOV descriptor is
 *    refreshed and missing objids are fetched and written back.
 * The error path unregisters the observer, disconnects the export and
 * stores ERR_PTR(rc) in mds_osc_obd.
 */
535 int mds_lov_connect(struct obd_device *obd, char * lov_name)
537 struct mds_obd *mds = &obd->u.mds;
538 struct lustre_handle conn = {0,};
539 struct obd_connect_data *data;
543 if (IS_ERR(mds->mds_osc_obd))
544 RETURN(PTR_ERR(mds->mds_osc_obd));
546 if (mds->mds_osc_obd)
549 mds->mds_osc_obd = class_name2obd(lov_name);
550 if (!mds->mds_osc_obd) {
551 CERROR("MDS cannot locate LOV %s\n", lov_name);
552 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
556 OBD_ALLOC(data, sizeof(*data));
559 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
560 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
561 OBD_CONNECT_OSS_CAPA;
562 #ifdef HAVE_LRU_RESIZE_SUPPORT
563 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
565 data->ocd_version = LUSTRE_VERSION_CODE;
566 data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0;
567 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
568 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
/* The connect data is only needed for the call itself. */
569 OBD_FREE(data, sizeof(*data));
571 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
572 mds->mds_osc_obd = ERR_PTR(rc);
575 mds->mds_osc_exp = class_conn2export(&conn);
577 rc = obd_register_observer(mds->mds_osc_obd, obd);
579 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
581 GOTO(err_discon, rc);
584 /* Deny new client connections until we are sure we have some OSTs */
585 obd->obd_no_conn = 1;
587 mutex_down(&obd->obd_dev_sem);
588 rc = mds_lov_read_objids(obd);
590 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
594 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
598 /* If we're mounting this code for the first time on an existing FS,
599 * we need to populate the objids array from the real OST values */
/* NOTE(review): the loop below uses "<=", so it also queries index
 * ld_tgt_count, one past the last target index accepted elsewhere
 * (mds_lov_update_mds rejects idx >= ld_tgt_count) - confirm intended. */
600 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
601 __u32 i = mds->mds_lov_objid_count;
602 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
603 rc = mds_lov_get_objid(obd, mds->mds_osc_exp, i);
608 rc = mds_lov_write_objids(obd);
610 CERROR("got last objids from OSTs, but error "
611 "in update objids file: %d\n", rc);
613 mutex_up(&obd->obd_dev_sem);
615 /* I want to see a callback happen when the OBD moves to a
616 * "For General Use" state, and that's when we'll call
617 * set_nextid(). The class driver can help us here, because
618 * it can use the obd_recovering flag to determine when the
619 * the OBD is full available. */
620 /* MDD device will care about that
621 if (!obd->obd_recovering)
622 rc = mds_postrecov(obd);
627 mutex_up(&obd->obd_dev_sem);
628 obd_register_observer(mds->mds_osc_obd, NULL);
630 obd_disconnect(mds->mds_osc_exp);
631 mds->mds_osc_exp = NULL;
632 mds->mds_osc_obd = ERR_PTR(rc);
/*
 * Detach the MDS from its LOV: when a valid LOV device and export are
 * present, the observer registration is removed and obd_force is set on
 * the LOV device so that its cleanup is not blocked by the reference the
 * MDS still holds (see the comment in the body).
 */
636 int mds_lov_disconnect(struct obd_device *obd)
638 struct mds_obd *mds = &obd->u.mds;
642 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
643 obd_register_observer(mds->mds_osc_obd, NULL);
645 /* The actual disconnect of the mds_lov will be called from
646 * class_disconnect_exports from mds_lov_clean. So we have to
647 * ensure that class_cleanup doesn't fail due to the extra ref
648 * we're holding now. The mechanism to do that already exists -
649 * the obd_force flag. We'll drop the final ref to the
650 * mds_osc_exp in mds_cleanup. */
651 mds->mds_osc_obd->obd_force = 1;
/*
 * ioctl dispatcher for the MDS device behind export "exp".
 * "karg" is interpreted as a struct obd_ioctl_data.  The commands visible
 * here cover: recording a configuration llog (RECORD / DORECORD /
 * ENDRECORD / CLEAR_LOG, all built around mds_cfg_llh), parsing and
 * dumping a config llog, syncing the backing filesystem, switching the
 * device read-only, listing llog catalogs, generic llog commands, and
 * aborting recovery.  Unknown commands are logged at D_INFO.
 * Filesystem-touching work is bracketed by push_ctxt()/pop_ctxt().
 */
657 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
658 void *karg, void *uarg)
660 static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
661 struct obd_device *obd = exp->exp_obd;
662 struct mds_obd *mds = &obd->u.mds;
663 struct obd_ioctl_data *data = karg;
664 struct lvfs_run_ctxt saved;
668 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
/* Start recording: create the named llog and keep its handle. */
671 case OBD_IOC_RECORD: {
672 char *name = data->ioc_inlbuf1;
673 if (mds->mds_cfg_llh)
676 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
677 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
678 &mds->mds_cfg_llh, NULL, name);
680 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
683 mds->mds_cfg_llh = NULL;
684 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Stop recording: close the handle opened by OBD_IOC_RECORD. */
689 case OBD_IOC_ENDRECORD: {
690 if (!mds->mds_cfg_llh)
693 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
694 rc = llog_close(mds->mds_cfg_llh);
695 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
697 mds->mds_cfg_llh = NULL;
/* Destroy the named llog; not allowed while a recording is open. */
701 case OBD_IOC_CLEAR_LOG: {
702 char *name = data->ioc_inlbuf1;
703 if (mds->mds_cfg_llh)
706 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
707 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
708 &mds->mds_cfg_llh, NULL, name);
710 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
713 rc = llog_destroy(mds->mds_cfg_llh);
714 llog_free_handle(mds->mds_cfg_llh);
716 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
718 mds->mds_cfg_llh = NULL;
/* Append one record, copied from user space, to the open recording. */
722 case OBD_IOC_DORECORD: {
724 struct llog_rec_hdr rec;
725 if (!mds->mds_cfg_llh)
728 rec.lrh_len = llog_data_len(data->ioc_plen1);
730 if (data->ioc_type == LUSTRE_CFG_TYPE) {
731 rec.lrh_type = OBD_CFG_REC;
733 CERROR("unknown cfg record type:%d \n", data->ioc_type);
737 OBD_ALLOC(cfg_buf, data->ioc_plen1);
740 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
742 OBD_FREE(cfg_buf, data->ioc_plen1);
746 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
747 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
749 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
751 OBD_FREE(cfg_buf, data->ioc_plen1);
/* Replay the config llog named in ioc_inlbuf1. */
755 case OBD_IOC_PARSE: {
756 struct llog_ctxt *ctxt =
757 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
758 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
759 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
760 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Dump the config llog named in ioc_inlbuf1. */
767 case OBD_IOC_DUMP_LOG: {
768 struct llog_ctxt *ctxt =
769 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
770 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
771 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
772 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Sync the backing filesystem (the case label is on a line not shown). */
780 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
781 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
/* Commit a transaction, sync, then mark the device read-only. */
785 case OBD_IOC_SET_READONLY: {
787 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
788 BDEVNAME_DECLARE_STORAGE(tmp);
789 CERROR("*** setting device %s read-only ***\n",
790 ll_bdevname(obd->u.obt.obt_sb, tmp));
792 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
794 rc = fsfilt_commit(obd, inode, handle, 1);
796 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
797 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
799 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
803 case OBD_IOC_CATLOGLIST: {
804 int count = mds->mds_lov_desc.ld_tgt_count;
805 rc = llog_catalog_list(obd, count, data);
/* llog maintenance: the llogs are finished before the command runs and
 * re-initialized afterwards, then the MDS group is re-announced. */
809 case OBD_IOC_LLOG_CHECK:
810 case OBD_IOC_LLOG_CANCEL:
811 case OBD_IOC_LLOG_REMOVE: {
812 struct llog_ctxt *ctxt =
813 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
817 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
818 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
819 rc = llog_ioctl(ctxt, cmd, data);
820 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
821 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
822 group = FILTER_GROUP_MDS0 + mds->mds_id;
823 rc2 = obd_set_info_async(mds->mds_osc_exp,
824 strlen(KEY_MDS_CONN), KEY_MDS_CONN,
825 sizeof(group), &group, NULL);
/* Read-only llog queries. */
830 case OBD_IOC_LLOG_INFO:
831 case OBD_IOC_LLOG_PRINT: {
832 struct llog_ctxt *ctxt =
833 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
835 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
836 rc = llog_ioctl(ctxt, cmd, data);
837 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
842 case OBD_IOC_ABORT_RECOVERY:
843 CERROR("aborting recovery for device %s\n", obd->obd_name);
844 target_stop_recovery_thread(obd);
848 CDEBUG(D_INFO, "unknown command %x\n", cmd);
855 /* Collect the preconditions we need to allow client connects */
/*
 * "flag" is a bit mask: CONFIG_LOG sets mds_fl_cfglog, CONFIG_SYNC sets
 * mds_fl_synced.  Client connections are re-enabled (obd_no_conn = 0) as
 * soon as mds_fl_cfglog is set; the mds_fl_synced requirement is
 * deliberately commented out in the condition (bz11778).
 */
856 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
858 if (flag & CONFIG_LOG)
859 obd->u.mds.mds_fl_cfglog = 1;
860 if (flag & CONFIG_SYNC)
861 obd->u.mds.mds_fl_synced = 1;
862 if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
863 /* Open for clients */
864 obd->obd_no_conn = 0;
/* Arguments handed to the synchronize routine; freed by __mds_lov_synchronize(). */
867 struct mds_lov_sync_info {
868 struct obd_device *mlsi_obd; /* the MDS device running the sync (its u.mds is used) */
869 struct obd_device *mlsi_watched; /* target osc */
870 __u32 mlsi_index; /* index of target, or MDSLOV_NO_INDEX */
/*
 * Send the two capability keys held in mds_capa_keys[0..1] to the
 * LOV/OSC layer with KEY_CAPA_KEY.  Nothing is sent when mds_capa_keys is
 * not set.  A failed send is logged through DEBUG_CAPA_KEY(D_ERROR, ...).
 */
873 static int mds_propagate_capa_keys(struct mds_obd *mds)
875 struct lustre_capa_key *key;
880 if (!mds->mds_capa_keys)
883 for (i = 0; i < 2; i++) {
884 key = &mds->mds_capa_keys[i];
885 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
887 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
888 KEY_CAPA_KEY, sizeof(*key), key, NULL);
890 DEBUG_CAPA_KEY(D_ERROR, key,
891 "propagate failed (rc = %d) for", rc);
899 /* We only sync one osc at a time, so that we don't have to hold
900 any kind of lock on the whole mds_lov_desc, which may change
901 (grow) as a result of mds_lov_add_ost. This also avoids any
902 kind of mismatch between the lov_desc and the mds_lov_desc,
903 which are not in lock-step during lov_add_obd */
/*
 * "data" is a struct mds_lov_sync_info; its fields are copied to locals
 * and the structure is freed right away.  The sequence visible here:
 *  1. mds_lov_update_mds() for the watched target,
 *  2. announce the MDS group with KEY_MDS_CONN,
 *  3. propagate the capability keys,
 *  4. llog_connect() on the MDS->OST origin context,
 *  5. unless the device is stopping (-ENODEV), clear orphans on the
 *     target,
 *  6. notify the upcall owner, if any.
 * On failure the target is reported and, unless one of the devices is
 * stopping, the LOV is notified with OBD_NOTIFY_INACTIVE.
 */
904 static int __mds_lov_synchronize(void *data)
906 struct mds_lov_sync_info *mlsi = data;
907 struct obd_device *obd = mlsi->mlsi_obd;
908 struct obd_device *watched = mlsi->mlsi_watched;
909 struct mds_obd *mds = &obd->u.mds;
910 struct obd_uuid *uuid;
911 __u32 idx = mlsi->mlsi_index;
912 struct mds_group_info mgi;
/* Everything needed was copied above; mlsi must not be used after this. */
916 OBD_FREE(mlsi, sizeof(*mlsi));
920 uuid = &watched->u.cli.cl_target_uuid;
923 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
925 rc = mds_lov_update_mds(obd, watched, idx, uuid);
927 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
930 mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
933 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
934 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
938 /* propagate capability keys */
939 rc = mds_propagate_capa_keys(mds);
943 rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
944 mds->mds_lov_desc.ld_tgt_count,
948 CERROR("%s failed at llog_origin_connect: %d\n",
949 obd_uuid2str(uuid), rc);
953 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
954 obd->obd_name, obd_uuid2str(uuid));
956 * FIXME: this obd_stopping was useless,
957 * since obd in mdt layer was set
959 if (obd->obd_stopping)
960 GOTO(out, rc = -ENODEV);
962 rc = mds_lov_clear_orphans(mds, uuid);
964 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
965 obd_uuid2str(uuid), rc);
969 if (obd->obd_upcall.onu_owner) {
971 * This is a hack for mds_notify->mdd_notify. When the mds obd
972 * in mdd is removed, This hack should be removed.
974 LASSERT(obd->obd_upcall.onu_upcall != NULL);
975 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
976 obd->obd_upcall.onu_owner);
981 /* Deactivate it for safety */
982 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
984 if (!obd->obd_stopping && mds->mds_osc_obd &&
985 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
986 obd_notify(mds->mds_osc_obd, watched,
987 OBD_NOTIFY_INACTIVE, NULL);
/*
 * Thread entry point for a background sync.  "data" is a
 * struct mds_lov_sync_info.  The thread is named "ll_sync_tgt" when the
 * target index is unknown (MDSLOV_NO_INDEX), otherwise "ll_sync_NN" with
 * the index, then daemonized, and the real work is delegated to
 * __mds_lov_synchronize(), whose result is returned.
 * Both names are written with a bounded snprintf() against sizeof(name).
 */
994 int mds_lov_synchronize(void *data)
996 struct mds_lov_sync_info *mlsi = data;
999 if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
1000 /* There is still a watched target,
1001 but we don't know its index */
1002 snprintf(name, sizeof(name), "ll_sync_tgt");
1004 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
1005 ptlrpc_daemonize(name);
1007 RETURN(__mds_lov_synchronize(data));
/*
 * Package a sync request for target "watched" and run it.
 * A struct mds_lov_sync_info is allocated and filled with the MDS device,
 * the watched device and the target index (read from "data" as a __u32,
 * or MDSLOV_NO_INDEX otherwise; the test selecting between the two is on
 * a line not visible here).  The work is then either started in a kernel
 * thread running mds_lov_synchronize() or executed synchronously through
 * __mds_lov_synchronize() - presumably selected by "nonblock"; the
 * branching line is not visible here.
 */
1010 int mds_lov_start_synchronize(struct obd_device *obd,
1011 struct obd_device *watched,
1012 void *data, int nonblock)
1014 struct mds_lov_sync_info *mlsi;
1021 OBD_ALLOC(mlsi, sizeof(*mlsi));
1025 mlsi->mlsi_obd = obd;
1026 mlsi->mlsi_watched = watched;
1028 mlsi->mlsi_index = *(__u32 *)data;
1030 mlsi->mlsi_index = MDSLOV_NO_INDEX;
1032 /* Although class_export_get(obd->obd_self_export) would lock
1033 the MDS in place, since it's only a self-export
1034 it doesn't lock the LOV in place. The LOV can be disconnected
1035 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
1036 Simply taking an export ref on the LOV doesn't help, because it's
1037 still disconnected. Taking an obd reference insures that we don't
1038 disconnect the LOV. This of course means a cleanup won't
1039 finish for as long as the sync is blocking. */
1043 /* Synchronize in the background */
1044 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
1045 CLONE_VM | CLONE_FILES);
1047 CERROR("%s: error starting mds_lov_synchronize: %d\n",
1051 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
1052 "thread=%d\n", obd->obd_name,
1053 mlsi->mlsi_index, rc);
/* Synchronous variant: run the sync in the caller's context. */
1057 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Observer callback of the MDS.
 * Handles OBD_NOTIFY_ACTIVE / SYNC / SYNC_NONBLOCK for OSC devices and
 * OBD_NOTIFY_CONFIG, where "data" carries a flag mask (not a pointer to
 * one) that is forwarded to mds_allow_cli().  The mask is extracted by
 * converting through unsigned long, so the pointer is not cast directly
 * to a narrower integer type.
 * A notification from a device that is not an OSC is reported as an
 * error.  While the MDS is recovering, only the LOV descriptor and the
 * llog catalogs are refreshed (under obd_dev_sem) and CONFIG_SYNC is
 * recorded; otherwise a synchronize is started (blocking only for
 * OBD_NOTIFY_SYNC) and quota recovery is kicked off.
 */
1063 int mds_notify(struct obd_device *obd, struct obd_device *watched,
1064 enum obd_notify_event ev, void *data)
1070 /* We only handle these: */
1071 case OBD_NOTIFY_ACTIVE:
1072 case OBD_NOTIFY_SYNC:
1073 case OBD_NOTIFY_SYNC_NONBLOCK:
1075 case OBD_NOTIFY_CONFIG:
1076 mds_allow_cli(obd, (unsigned int)(unsigned long)data);
1081 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
1082 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1083 CERROR("unexpected notification of %s %s!\n",
1084 watched->obd_type->typ_name, watched->obd_name);
1088 if (obd->obd_recovering) {
1089 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1091 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1092 /* We still have to fix the lov descriptor for ost's added
1093 after the mdt in the config log. They didn't make it into
1095 mutex_down(&obd->obd_dev_sem);
1096 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
1098 mutex_up(&obd->obd_dev_sem);
1101 /* We should update init llog here too for replay unlink and
1102 * possiable llog init race when recovery complete */
1103 llog_cat_initialize(obd, NULL,
1104 obd->u.mds.mds_lov_desc.ld_tgt_count,
1105 &watched->u.cli.cl_target_uuid);
1106 mutex_up(&obd->obd_dev_sem);
1107 mds_allow_cli(obd, CONFIG_SYNC);
1111 LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
1112 rc = mds_lov_start_synchronize(obd, watched, data,
1113 !(ev == OBD_NOTIFY_SYNC));
1115 lquota_recovery(mds_quota_interface_ref, obd);
1120 /* Convert the on-disk LOV EA structure.
1121 * We always try to convert from an old LOV EA format to the common in-memory
1122 * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1123 * then convert back to the new on-disk format and save it back to disk
1124 * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1125 * to convert it each time this inode is accessed.
1127 * This function is a bit interesting in the error handling. We can safely
1128 * ship the old lmm to the client in case of failure, since it uses the same
1129 * obd_unpackmd() code and can do the conversion if the MDS fails for some
1130 * reason. We will not delete the old lmm data until we have written the
1131 * new format lmm data in fsfilt_set_md(). */
/*
 * An EA whose magic is already LOV_MAGIC or LOV_MAGIC_JOIN needs no
 * conversion.  The magic is byte-swapped first and compared afterwards;
 * the previous code passed the result of the comparison to le32_to_cpu(),
 * so the LOV_MAGIC_JOIN test compared the raw on-disk value.
 * After a successful set_md + commit the result is the commit error, if
 * any, otherwise lmm_size.
 */
1132 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1133 struct lov_mds_md *lmm, int lmm_size)
1135 struct lov_stripe_md *lsm = NULL;
1140 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
1141 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
1144 CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1145 inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
1148 rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
1152 rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
1154 GOTO(conv_free, rc);
1157 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1158 if (IS_ERR(handle)) {
1159 rc = PTR_ERR(handle);
1160 GOTO(conv_free, rc);
1163 rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
1165 err = fsfilt_commit(obd, inode, handle, 0);
1167 rc = err ? err : lmm_size;
1168 GOTO(conv_free, rc);
1170 obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);