1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
42 #include "mds_internal.h"
/*
 * Debug helper: dump the dirty-page bitmap and the objid pages of @obd to
 * the D_INFO debug log, tagged with @label.
 *
 * NOTE(review): some lines of this function are not visible in this listing;
 * the comments below describe only the statements that are shown.
 */
44 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
46 struct mds_obd *mds = &obd->u.mds;
49 CDEBUG(D_INFO, "dump from %s\n", label);
/* The dirty bitmap must exist before anything can be dumped. */
50 if (mds->mds_lov_page_dirty == NULL) {
51 CERROR("NULL bitmap!\n");
/* Log the bitmap one unsigned long at a time.
 * NOTE(review): the "+1" visits one extra word when size is an exact
 * multiple of BITS_PER_LONG - confirm the bitmap allocation covers it. */
55 for(i=0; i < ((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1) ;i++)
56 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
/* The page pointer array is checked separately from the bitmap. */
58 if (mds->mds_lov_page_array == NULL) {
59 CERROR("not init page array!\n");
/* Walk every page slot, then every objid slot within that page. */
63 for(i=0; i < MDS_LOV_OBJID_PAGES_COUNT ;i++) {
64 obd_id *data = mds->mds_lov_page_array[i];
69 for(j=0; j < OBJID_PER_PAGE(); j++) {
72 CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
/*
 * Set up the objid bookkeeping for @obd: allocate the dirty-page bitmap and
 * the page pointer array, then open (creating it if needed) the LOV_OBJID
 * file and remember it in mds_lov_objid_filp.
 *
 * Failures jump to the error labels, which release the file, the page array
 * and the bitmap. -ENOMEM is used for the page array allocation failure and
 * -ENOENT when LOV_OBJID is not a regular file.
 * NOTE(review): the labels and return statements are not visible in this
 * listing.
 */
80 int mds_lov_init_objids(struct obd_device *obd)
82 struct mds_obd *mds = &obd->u.mds;
/* Byte size of the page pointer array: one pointer per objid page. */
83 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* An allocation unit must hold a whole number of obd_id entries. */
88 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
/* One dirty bit per objid page. */
90 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
91 if (mds->mds_lov_page_dirty == NULL)
95 OBD_ALLOC(mds->mds_lov_page_array, size);
96 if (mds->mds_lov_page_array == NULL)
97 GOTO(err_free_bitmap, rc = -ENOMEM);
99 /* open and test the lov objid file */
100 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
103 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
104 GOTO(err_free, rc = PTR_ERR(file));
/* Anything other than a regular file is rejected. */
106 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
107 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
108 file->f_dentry->d_inode->i_mode);
109 GOTO(err_open, rc = -ENOENT);
/* Keep the open file for later objid reads and writes. */
111 mds->mds_lov_objid_filp = file;
/* Error unwinding: close the file, then free the array, then the bitmap. */
115 if (filp_close((struct file *)file, 0))
116 CERROR("can't close %s after error\n", LOV_OBJID);
118 OBD_FREE(mds->mds_lov_page_array, size);
120 FREE_BITMAP(mds->mds_lov_page_dirty);
124 EXPORT_SYMBOL(mds_lov_init_objids);
/*
 * Release the objid bookkeeping of @obd: the objid pages, the page pointer
 * array, the open LOV_OBJID file and the dirty-page bitmap.
 *
 * NOTE(review): some lines of this function are not visible in this listing
 * (for example any per-page NULL check before the page is freed).
 */
126 void mds_lov_destroy_objids(struct obd_device *obd)
128 struct mds_obd *mds = &obd->u.mds;
/* Free the objid pages first, then the pointer array that holds them. */
132 if (mds->mds_lov_page_array != NULL) {
133 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
134 obd_id *data = mds->mds_lov_page_array[i];
136 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
138 OBD_FREE(mds->mds_lov_page_array,
139 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
/* Close the objid file; the pointer is cleared whether or not the close
 * succeeded, and a failed close is only logged. */
142 if (mds->mds_lov_objid_filp) {
143 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
144 mds->mds_lov_objid_filp = NULL;
146 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
149 FREE_BITMAP(mds->mds_lov_page_dirty);
152 EXPORT_SYMBOL(mds_lov_destroy_objids);
/*
 * Walk the stripes described by @lmm and compare each stripe's object id
 * with the value cached for its OST index; a page whose cached value is
 * smaller than the new id is marked dirty.
 *
 * The lmm fields are converted from little-endian before use.
 * NOTE(review): the statement that stores the new id into the slot is not
 * visible in this listing, and no NULL check on the page is shown.
 */
154 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
156 struct mds_obd *mds = &obd->u.mds;
160 /* if we create file without objects - lmm is NULL */
164 for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
/* i is the OST index of this stripe, id the object id on that OST. */
165 int i = le32_to_cpu(lmm->lmm_objects[j].l_ost_idx);
166 obd_id id = le64_to_cpu(lmm->lmm_objects[j].l_object_id);
/* Locate the page and the slot within the page for OST index i. */
167 int page = i / OBJID_PER_PAGE();
168 int idx = i % OBJID_PER_PAGE();
169 obd_id *data = mds->mds_lov_page_array[page];
171 CDEBUG(D_INODE,"update last object for ost %d"
172 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
/* Only a larger id counts as an update; the page then needs writing. */
173 if (id > data[idx]) {
175 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
180 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Load the contents of the objid file into freshly allocated objid pages
 * and derive mds_lov_objid_count, mds_lov_objid_lastpage and
 * mds_lov_objid_lastidx from the number of entries read.
 *
 * Jumps to the out label with -ENOMEM when a page cannot be allocated.
 * NOTE(review): the declarations of size and off, the early-exit checks and
 * the return statements are not visible in this listing.
 */
182 static int mds_lov_read_objids(struct obd_device *obd)
184 struct mds_obd *mds = &obd->u.mds;
186 int i, rc = 0, count = 0, page = 0;
190 /* Read everything in the file, even if our current lov desc
191 has fewer targets. Old targets not in the lov descriptor
192 during mds setup may still have valid objids. */
193 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* Number of pages needed for the file, rounded up by always adding one. */
197 page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
198 CDEBUG(D_INFO, "file size %d pages %d\n", size, page);
199 for(i=0; i < page; i++) {
200 obd_id *data = mds->mds_lov_page_array[i];
/* Remember the offset before the read to count how much was consumed. */
201 loff_t off_old = off;
/* Each page slot is expected to be unused at this point. */
203 LASSERT(data == NULL);
204 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
206 GOTO(out, rc = -ENOMEM);
208 mds->mds_lov_page_array[i] = data;
210 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
211 OBJID_PER_PAGE()*sizeof(obd_id), &off);
213 CERROR("Error reading objids %d\n", rc);
/* The offset advance, in obd_id units, is the number of entries read. */
219 count += (off-off_old)/sizeof(obd_id);
221 mds->mds_lov_objid_count = count;
/* Page and slot position that correspond to the entry count. */
224 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
225 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
227 CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
228 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
230 mds_lov_dump_objids("read",obd);
/*
 * Write every objid page that is marked dirty back to the objid file and
 * clear its dirty bit.
 *
 * Page i is written at file offset i * (OBJID_PER_PAGE() * sizeof(obd_id)).
 * NOTE(review): the early return for an empty bitmap, the error handling
 * around the write and the return statements are not visible in this
 * listing.
 */
235 int mds_lov_write_objids(struct obd_device *obd)
237 struct mds_obd *mds = &obd->u.mds;
/* No dirty page means there is nothing to write. */
241 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
244 mds_lov_dump_objids("write", obd);
246 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
247 obd_id *data = mds->mds_lov_page_array[i];
/* Default to a full page of entries at the page's own offset. */
248 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
249 loff_t off = i * size;
/* A dirty bit without a backing page would be a bookkeeping error. */
251 LASSERT(data != NULL);
253 /* check for partially filled last page */
254 if (i == mds->mds_lov_objid_lastpage)
255 size = (mds->mds_lov_objid_lastidx + 1) * sizeof(obd_id);
257 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
261 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
268 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Make sure the objid slot for index idx is populated: allocate its page if
 * needed, ask the OSC export for the last id when the slot is still zero,
 * extend the recorded count/last page/last slot when idx is beyond the
 * current count, and mark the page dirty.
 *
 * Jumps to the out label with -ENOMEM when the page cannot be allocated.
 * NOTE(review): the idx parameter, the local declarations, the condition
 * guarding the page allocation and the return statements are not visible in
 * this listing.
 */
270 static int mds_lov_get_objid(struct obd_device * obd,
273 struct mds_obd *mds = &obd->u.mds;
/* Split idx into a page number and a slot offset inside that page. */
280 page = idx / OBJID_PER_PAGE();
281 off = idx % OBJID_PER_PAGE();
282 data = mds->mds_lov_page_array[page];
284 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
286 GOTO(out, rc = -ENOMEM);
288 mds->mds_lov_page_array[page] = data;
291 if (data[off] == 0) {
292 /* We never read this lastid; ask the osc */
293 struct obd_id_info lastid;
294 __u32 size = sizeof(lastid);
/* The answer is stored straight into the slot through lastid.data. */
297 lastid.data = &data[off];
298 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
299 KEY_LAST_ID, &size, &lastid);
/* Track the highest index seen and where its slot lives. */
303 if (idx > mds->mds_lov_objid_count) {
304 mds->mds_lov_objid_count = idx;
305 mds->mds_lov_objid_lastpage = page;
306 mds->mds_lov_objid_lastidx = off;
308 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
310 CDEBUG(D_INFO, "idx %d - %p - %d/%d - "LPU64"\n",
311 idx, data, page, off, data[off]);
/*
 * Issue an obd_create() flagged OBD_FL_DELORPHAN on the OSC export of @mds.
 *
 * @ost_uuid: when not NULL it is copied into the inline area of the request
 *            and OBD_MD_FLINLINE is set; when NULL the inline area is left
 *            zeroed.
 *
 * NOTE(review): the declaration of oa and the handling of the obd_create()
 * result are not visible in this listing.
 */
316 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
320 struct obd_trans_info oti = {0};
321 struct lov_stripe_md *empty_ea = NULL;
/* The objid pages must have been initialised before orphans are cleared. */
324 LASSERT(mds->mds_lov_page_array != NULL);
326 /* This create will in fact either create or destroy: If the OST is
327 * missing objects below this ID, they will be created. If it finds
328 * objects above this ID, they will be removed. */
329 memset(&oa, 0, sizeof(oa));
330 oa.o_flags = OBD_FL_DELORPHAN;
331 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
332 if (ost_uuid != NULL) {
333 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
334 oa.o_valid |= OBD_MD_FLINLINE;
336 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
/*
 * Send a KEY_NEXT_ID set_info request to the OSC export of @obd.
 *
 * Must not be called while @obd is recovering, and the caller must hold
 * obd->obd_dev_sem; both conditions are asserted.
 * NOTE(review): the lines that fill info from @idx and @id, and the return
 * statement, are not visible in this listing.
 */
342 static int mds_lov_set_one_nextid(struct obd_device * obd, __u32 idx, obd_id *id)
344 struct mds_obd *mds = &obd->u.mds;
346 struct obd_id_info info;
349 LASSERT(!obd->obd_recovering);
351 /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
352 LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
357 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
358 KEY_NEXT_ID, sizeof(info), &info, NULL);
360 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
/*
 * Query the @lov export with KEY_LOV_IDX for the target named by @ost_uuid.
 *
 * NOTE(review): the remaining obd_get_info() arguments and the return
 * statement are not visible in this listing.
 */
366 static __u32 mds_lov_get_idx(struct obd_export *lov,
367 struct obd_uuid *ost_uuid)
/* NOTE(review): sizeof(ost_uuid) is the size of the pointer, not of
 * struct obd_uuid; sizeof(*ost_uuid) may have been intended - confirm
 * how the callee uses the value length. */
370 int valsize = sizeof(ost_uuid);
372 rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
379 /* Update the lov desc for a new size lov. */
/*
 * Fetches a lov descriptor from @lov with KEY_LOVDESC into a temporary
 * buffer, copies it into mds->mds_lov_desc, recomputes mds_max_mdsize and
 * mds_max_cookiesize from the target count (capped at
 * LOV_MAX_STRIPE_COUNT), and re-runs llog_cat_initialize() for the new
 * target count. The temporary buffer is freed before returning.
 *
 * NOTE(review): allocation-failure handling, the remaining obd_get_info()
 * arguments and the return statements are not visible in this listing.
 */
380 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
382 struct mds_obd *mds = &obd->u.mds;
384 __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
388 OBD_ALLOC(ld, sizeof(*ld));
392 rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
397 /* Don't change the mds_lov_desc until the objids size matches the
399 mds->mds_lov_desc = *ld;
400 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
401 mds->mds_lov_desc.ld_tgt_count);
403 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
404 mds->mds_lov_desc.ld_tgt_count);
406 mds->mds_max_mdsize = lov_mds_md_size(stripes);
407 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
408 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
409 "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
412 /* If we added a target we have to reconnect the llogs */
413 /* We only _need_ to do this at first add (idx), or the first time
414 after recovery. However, it should now be safe to call anytime. */
415 rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
418 OBD_FREE(ld, sizeof(*ld));
423 /* Inform MDS about new/updated target */
/*
 * Under obd->obd_dev_sem: refresh the lov descriptor, validate idx against
 * the new target count, make sure the objid slot for idx is populated, and
 * pass the slot's value to the OSC through mds_lov_set_one_nextid().
 *
 * An idx at or beyond the target count jumps to out with -EINVAL.
 * NOTE(review): the idx parameter, the local declarations, the action taken
 * when the device is recovering and the return statements are not visible
 * in this listing.
 */
424 static int mds_lov_update_mds(struct obd_device *obd,
425 struct obd_device *watched,
428 struct mds_obd *mds = &obd->u.mds;
437 /* Don't let anyone else mess with mds_lov_objids now */
438 mutex_down(&obd->obd_dev_sem);
/* Keep the previous target count for the debug message below. */
440 old_count = mds->mds_lov_desc.ld_tgt_count;
441 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
445 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
446 idx, obd->obd_recovering, obd->obd_async_recov, old_count,
447 mds->mds_lov_desc.ld_tgt_count);
449 /* idx is set as data from lov_notify. */
450 if (obd->obd_recovering)
453 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
454 CERROR("index %d > count %d!\n", idx,
455 mds->mds_lov_desc.ld_tgt_count);
456 GOTO(out, rc = -EINVAL);
459 rc = mds_lov_get_objid(obd, idx);
461 CERROR("Failed to get objid - %d\n", rc);
/* Locate the slot that mds_lov_get_objid() just populated. */
465 page = idx / OBJID_PER_PAGE();
466 off = idx % OBJID_PER_PAGE();
467 data = mds->mds_lov_page_array[page];
468 /* We have read this lastid from disk; tell the osc.
469 Don't call this during recovery. */
470 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
472 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
473 /* Don't abort the rest of the sync */
477 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
480 mutex_up(&obd->obd_dev_sem);
484 /* update the LOV-OSC knowledge of the last used object id's */
/*
 * Connect @obd to the LOV device named @lov_name and prepare the objid
 * state: look the LOV up, connect with the flags listed below, register
 * @obd as the LOV's observer, block client connections, read the objid
 * file, refresh the lov descriptor, fetch last ids for targets beyond the
 * ones read from the file, write the objid file, and finally run
 * mds_postrecov() when the device is not recovering.
 *
 * A previously stored error pointer in mds_osc_obd is returned as-is. On
 * connect failure, and on the later error path, the error code is stored in
 * mds_osc_obd as an ERR_PTR.
 * NOTE(review): several condition lines, labels and return statements are
 * not visible in this listing.
 */
485 int mds_lov_connect(struct obd_device *obd, char * lov_name)
487 struct mds_obd *mds = &obd->u.mds;
488 struct lustre_handle conn = {0,};
489 struct obd_connect_data *data;
/* An earlier failure left an error pointer behind; report it again. */
493 if (IS_ERR(mds->mds_osc_obd))
494 RETURN(PTR_ERR(mds->mds_osc_obd));
496 if (mds->mds_osc_obd)
499 mds->mds_osc_obd = class_name2obd(lov_name);
500 if (!mds->mds_osc_obd) {
501 CERROR("MDS cannot locate LOV %s\n", lov_name);
502 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
506 OBD_ALLOC(data, sizeof(*data));
509 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
510 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 | OBD_CONNECT_AT;
511 #ifdef HAVE_LRU_RESIZE_SUPPORT
512 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
514 data->ocd_version = LUSTRE_VERSION_CODE;
515 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
516 rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
/* The connect data is only needed for the call itself. */
517 OBD_FREE(data, sizeof(*data));
519 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
520 mds->mds_osc_obd = ERR_PTR(rc);
523 mds->mds_osc_exp = class_conn2export(&conn);
525 rc = obd_register_observer(mds->mds_osc_obd, obd);
527 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
529 GOTO(err_discon, rc);
532 /* Deny new client connections until we are sure we have some OSTs */
533 obd->obd_no_conn = 1;
/* The objid state is read and updated under obd_dev_sem. */
535 mutex_down(&obd->obd_dev_sem);
536 rc = mds_lov_read_objids(obd);
538 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
542 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
546 /* If we're mounting this code for the first time on an existing FS,
547 * we need to populate the objids array from the real OST values */
548 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
549 __u32 i = mds->mds_lov_objid_count;
/* NOTE(review): "<=" also visits index ld_tgt_count itself - confirm
 * that the inclusive upper bound is intended. */
550 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
551 rc = mds_lov_get_objid(obd, i);
556 rc = mds_lov_write_objids(obd);
558 CERROR("got last objids from OSTs, but error "
559 "in update objids file: %d\n", rc);
562 mutex_up(&obd->obd_dev_sem);
564 /* I want to see a callback happen when the OBD moves to a
565 * "For General Use" state, and that's when we'll call
566 * set_nextid(). The class driver can help us here, because
567 * it can use the obd_recovering flag to determine when
568 * the OBD is fully available. */
569 if (!obd->obd_recovering)
570 rc = mds_postrecov(obd);
/* Error unwinding: drop the semaphore, unregister the observer,
 * disconnect the export and record the error in mds_osc_obd. */
574 mutex_up(&obd->obd_dev_sem);
575 obd_register_observer(mds->mds_osc_obd, NULL);
577 obd_disconnect(mds->mds_osc_exp);
578 mds->mds_osc_exp = NULL;
579 mds->mds_osc_obd = ERR_PTR(rc);
/*
 * Begin detaching @obd from its LOV: when a valid LOV device and export are
 * present, unregister @obd as the LOV's observer and set obd_force on the
 * LOV device.
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
583 int mds_lov_disconnect(struct obd_device *obd)
585 struct mds_obd *mds = &obd->u.mds;
/* mds_osc_obd may hold an ERR_PTR from a failed connect; skip that case. */
589 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
590 obd_register_observer(mds->mds_osc_obd, NULL);
592 /* The actual disconnect of the mds_lov will be called from
593 * class_disconnect_exports from mds_lov_clean. So we have to
594 * ensure that class_cleanup doesn't fail due to the extra ref
595 * we're holding now. The mechanism to do that already exists -
596 * the obd_force flag. We'll drop the final ref to the
597 * mds_osc_exp in mds_cleanup. */
598 mds->mds_osc_obd->obd_force = 1;
/*
 * ioctl dispatcher for the MDS device behind @exp.
 *
 * @cmd:  the ioctl command; selects one of the cases below
 * @karg: interpreted as a struct obd_ioctl_data
 *
 * The visible cases cover config-log recording (RECORD, ENDRECORD,
 * DORECORD, CLEAR_LOG), config-log parsing and dumping, filesystem sync,
 * switching the device read-only, catalog listing, llog maintenance and
 * aborting recovery. Unrecognised commands are logged at D_INFO.
 *
 * NOTE(review): some case labels, the early-exit lines after the guard
 * conditions and all return statements are not visible in this listing, so
 * return values are not documented here.
 */
604 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
605 void *karg, void *uarg)
607 static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
608 struct obd_device *obd = exp->exp_obd;
609 struct mds_obd *mds = &obd->u.mds;
610 struct obd_ioctl_data *data = karg;
611 struct lvfs_run_ctxt saved;
615 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
/* Start recording: create the config llog named by ioc_inlbuf1 and keep
 * its handle in mds_cfg_llh. The guard checks for a handle already open. */
618 case OBD_IOC_RECORD: {
619 char *name = data->ioc_inlbuf1;
620 struct llog_ctxt *ctxt;
622 if (mds->mds_cfg_llh)
625 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
626 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
627 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
630 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
633 mds->mds_cfg_llh = NULL;
634 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Stop recording: close the open config llog and forget the handle. */
639 case OBD_IOC_ENDRECORD: {
640 if (!mds->mds_cfg_llh)
643 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
644 rc = llog_close(mds->mds_cfg_llh);
645 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
647 mds->mds_cfg_llh = NULL;
/* Clear a log: open the llog named by ioc_inlbuf1, destroy it and free
 * the handle. */
651 case OBD_IOC_CLEAR_LOG: {
652 char *name = data->ioc_inlbuf1;
653 struct llog_ctxt *ctxt;
654 if (mds->mds_cfg_llh)
657 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
658 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
659 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
662 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
665 rc = llog_destroy(mds->mds_cfg_llh);
666 llog_free_handle(mds->mds_cfg_llh);
668 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
670 mds->mds_cfg_llh = NULL;
/* Append one record: copy ioc_plen1 bytes from the user buffer
 * ioc_pbuf1 and write them to the open config llog. Only records of
 * type LUSTRE_CFG_TYPE are accepted. */
674 case OBD_IOC_DORECORD: {
676 struct llog_rec_hdr rec;
677 if (!mds->mds_cfg_llh)
680 rec.lrh_len = llog_data_len(data->ioc_plen1);
682 if (data->ioc_type == LUSTRE_CFG_TYPE) {
683 rec.lrh_type = OBD_CFG_REC;
685 CERROR("unknown cfg record type:%d \n", data->ioc_type);
689 OBD_ALLOC(cfg_buf, data->ioc_plen1);
692 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
/* This free sits on the copy_from_user() failure path (condition line
 * not visible). */
694 OBD_FREE(cfg_buf, data->ioc_plen1);
698 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
699 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
701 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
703 OBD_FREE(cfg_buf, data->ioc_plen1);
/* Parse the config llog named by ioc_inlbuf1. */
707 case OBD_IOC_PARSE: {
708 struct llog_ctxt *ctxt =
709 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
710 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
711 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
712 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Dump the config llog named by ioc_inlbuf1. */
720 case OBD_IOC_DUMP_LOG: {
721 struct llog_ctxt *ctxt =
722 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
723 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
724 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
725 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Sync the backing filesystem (case label not visible in this listing). */
734 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
735 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
/* Switch the device read-only: start and commit a transaction on the
 * root inode, sync the filesystem, then mark it read-only. */
739 case OBD_IOC_SET_READONLY: {
741 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
742 BDEVNAME_DECLARE_STORAGE(tmp);
743 LCONSOLE_WARN("*** setting obd %s device '%s' read-only ***\n",
744 obd->obd_name, ll_bdevname(obd->u.obt.obt_sb, tmp));
746 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
748 rc = fsfilt_commit(obd, inode, handle, 1);
750 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
751 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
753 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
/* List llog catalogs, sized by the current target count. */
757 case OBD_IOC_CATLOGLIST: {
758 int count = mds->mds_lov_desc.ld_tgt_count;
759 rc = llog_catalog_list(obd, count, data);
/* llog maintenance commands: the llogs are finished before the ioctl
 * and re-initialised afterwards, then KEY_MDS_CONN is sent to the OSC
 * export. */
763 case OBD_IOC_LLOG_CHECK:
764 case OBD_IOC_LLOG_CANCEL:
765 case OBD_IOC_LLOG_REMOVE: {
766 struct llog_ctxt *ctxt =
767 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
770 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
771 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
772 rc = llog_ioctl(ctxt, cmd, data);
773 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
774 llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
776 rc2 = obd_set_info_async(mds->mds_osc_exp,
777 sizeof(KEY_MDS_CONN), KEY_MDS_CONN,
/* Read-only llog queries: forwarded to llog_ioctl() unchanged. */
783 case OBD_IOC_LLOG_INFO:
784 case OBD_IOC_LLOG_PRINT: {
785 struct llog_ctxt *ctxt =
786 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
788 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
789 rc = llog_ioctl(ctxt, cmd, data);
790 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
796 case OBD_IOC_ABORT_RECOVERY:
797 CERROR("aborting recovery for device %s\n", obd->obd_name);
798 target_abort_recovery(obd);
802 CDEBUG(D_INFO, "unknown command %x\n", cmd);
809 /* Collect the preconditions we need to allow client connects */
/*
 * @flag is a bit mask: CONFIG_LOG sets mds_fl_cfglog and CONFIG_SYNC sets
 * mds_fl_synced. Once mds_fl_cfglog is set, obd_no_conn is cleared so that
 * clients may connect; the mds_fl_synced requirement is commented out in
 * the condition below.
 */
810 static void mds_allow_cli(struct obd_device *obd, unsigned long flag)
812 if (flag & CONFIG_LOG)
813 obd->u.mds.mds_fl_cfglog = 1;
814 if (flag & CONFIG_SYNC)
815 obd->u.mds.mds_fl_synced = 1;
816 if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
817 /* Open for clients */
818 obd->obd_no_conn = 0;
/*
 * Arguments for one synchronize run. The structure is allocated by
 * mds_lov_start_synchronize() and freed by __mds_lov_synchronize().
 */
821 struct mds_lov_sync_info {
822 struct obd_device *mlsi_obd; /* the lov device to sync */
823 struct obd_device *mlsi_watched; /* target osc */
824 __u32 mlsi_index; /* index of target */
827 /* We only sync one osc at a time, so that we don't have to hold
828 any kind of lock on the whole mds_lov_desc, which may change
829 (grow) as a result of mds_lov_add_ost. This also avoids any
830 kind of mismatch between the lov_desc and the mds_lov_desc,
831 which are not in lock-step during lov_add_obd */
/*
 * Synchronize one target. @data is a struct mds_lov_sync_info, which is
 * freed here once its fields have been copied to locals.
 *
 * Visible steps, in order: update the MDS view of the target, send
 * KEY_MDS_CONN to the OSC export, connect the MDS-OST llog context, and
 * clear orphans on the target. If the device is stopping before the orphan
 * step, the function jumps to out with -ENODEV.
 * On failure the target is reported inactive through obd_notify(), unless
 * one of the involved devices is stopping; on success client connections
 * are allowed through mds_allow_cli().
 *
 * NOTE(review): the conditions that select the failure and success branches
 * and the return statements are not visible in this listing.
 */
832 static int __mds_lov_synchronize(void *data)
834 struct mds_lov_sync_info *mlsi = data;
835 struct obd_device *obd = mlsi->mlsi_obd;
836 struct obd_device *watched = mlsi->mlsi_watched;
837 struct mds_obd *mds = &obd->u.mds;
838 struct obd_uuid *uuid;
839 __u32 idx = mlsi->mlsi_index;
840 struct llog_ctxt *ctxt;
/* Everything needed has been copied out of mlsi above. */
844 OBD_FREE(mlsi, sizeof(*mlsi));
848 uuid = &watched->u.cli.cl_target_uuid;
851 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
853 rc = mds_lov_update_mds(obd, watched, idx);
855 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
859 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
860 KEY_MDS_CONN, 0, uuid, NULL);
864 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
868 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
870 rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
875 CERROR("%s failed at llog_origin_connect: %d\n",
876 obd_uuid2str(uuid), rc);
880 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
881 obd->obd_name, obd_uuid2str(uuid));
/* Do not start orphan cleanup on a device that is shutting down. */
883 if (obd->obd_stopping)
884 GOTO(out, rc = -ENODEV);
886 rc = mds_lov_clear_orphans(mds, uuid);
888 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
889 obd_uuid2str(uuid), rc);
896 /* Deactivate it for safety */
897 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
899 if (!obd->obd_stopping && mds->mds_osc_obd &&
900 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
901 obd_notify(mds->mds_osc_obd, watched,
902 OBD_NOTIFY_INACTIVE, NULL);
904 /* We've successfully synced at least 1 OST and are ready
905 to handle client requests */
906 mds_allow_cli(obd, CONFIG_SYNC);
/*
 * Thread entry point for a background synchronize: names the thread
 * "ll_sync_<index>" after the target index in @data, daemonizes it, and
 * returns the result of __mds_lov_synchronize(@data).
 *
 * NOTE(review): the declaration of name is not visible in this listing.
 */
913 int mds_lov_synchronize(void *data)
915 struct mds_lov_sync_info *mlsi = data;
918 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
919 ptlrpc_daemonize(name);
921 RETURN(__mds_lov_synchronize(data));
/*
 * Package a synchronize request for @watched and run it either in a new
 * kernel thread or inline in the caller.
 *
 * @data: when used, points at a __u32 target index; otherwise the index is
 *        looked up from the target uuid with mds_lov_get_idx()
 * @nonblock: NOTE(review): presumably selects the background thread over
 *        the inline call - the branch condition is not visible in this
 *        listing
 *
 * NOTE(review): allocation-failure handling, the reference-taking call that
 * the long comment below refers to, and the return statements are not
 * visible in this listing.
 */
924 int mds_lov_start_synchronize(struct obd_device *obd,
925 struct obd_device *watched,
926 void *data, int nonblock)
928 struct mds_lov_sync_info *mlsi;
929 struct mds_obd *mds = &obd->u.mds;
931 struct obd_uuid *uuid;
935 uuid = &watched->u.cli.cl_target_uuid;
/* The sync info is freed by __mds_lov_synchronize(), not here. */
937 OBD_ALLOC(mlsi, sizeof(*mlsi));
941 mlsi->mlsi_obd = obd;
942 mlsi->mlsi_watched = watched;
944 mlsi->mlsi_index = *(__u32 *)data;
946 mlsi->mlsi_index = mds_lov_get_idx(mds->mds_osc_exp, uuid);
948 /* Although class_export_get(obd->obd_self_export) would lock
949 the MDS in place, since it's only a self-export
950 it doesn't lock the LOV in place. The LOV can be disconnected
951 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
952 Simply taking an export ref on the LOV doesn't help, because it's
953 still disconnected. Taking an obd reference ensures that we don't
954 disconnect the LOV. This of course means a cleanup won't
955 finish for as long as the sync is blocking. */
959 /* Synchronize in the background */
960 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
961 CLONE_VM | CLONE_FILES);
963 CERROR("%s: error starting mds_lov_synchronize: %d\n",
/* On success rc holds the value logged as "thread" below. */
967 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
968 "thread=%d\n", obd->obd_name,
969 mlsi->mlsi_index, rc);
/* Inline path: run the synchronize in the caller's context. */
973 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Notification handler for events on @watched.
 *
 * OBD_NOTIFY_ACTIVE, OBD_NOTIFY_SYNC and OBD_NOTIFY_SYNC_NONBLOCK are
 * handled by the code after the switch. OBD_NOTIFY_CONFIG passes @data, as
 * a flag mask, to mds_allow_cli().
 * For the sync events, @watched must be an OSC device. While @obd is
 * recovering only the lov descriptor is refreshed (under obd_dev_sem) and
 * client connections are allowed. Otherwise a synchronize is started, which
 * is non-blocking for every event except OBD_NOTIFY_SYNC, followed by quota
 * recovery.
 *
 * NOTE(review): the switch statement, the default case and the return
 * statements are not visible in this listing.
 */
979 int mds_notify(struct obd_device *obd, struct obd_device *watched,
980 enum obd_notify_event ev, void *data)
986 /* We only handle these: */
987 case OBD_NOTIFY_ACTIVE:
988 case OBD_NOTIFY_SYNC:
989 case OBD_NOTIFY_SYNC_NONBLOCK:
991 case OBD_NOTIFY_CONFIG:
992 mds_allow_cli(obd, (unsigned long)data);
997 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
/* Only notifications from OSC devices are expected past this point. */
999 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1000 CERROR("unexpected notification of %s %s!\n",
1001 watched->obd_type->typ_name, watched->obd_name);
1005 if (obd->obd_recovering) {
1006 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1008 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1009 /* We still have to fix the lov descriptor for ost's added
1010 after the mdt in the config log. They didn't make it into
1012 mutex_down(&obd->obd_dev_sem);
1013 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
1014 mutex_up(&obd->obd_dev_sem);
1015 mds_allow_cli(obd, CONFIG_SYNC);
1019 LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
1020 rc = mds_lov_start_synchronize(obd, watched, data,
1021 !(ev == OBD_NOTIFY_SYNC));
1023 lquota_recovery(mds_quota_interface_ref, obd);
/*
 * Fill @lmm with a default LOV EA taken from the lov descriptor of @obd:
 * magic LOV_MAGIC_V1, the default object group, and the descriptor's
 * pattern, default stripe size and default stripe count.
 *
 * *size is set to sizeof(struct lov_mds_md), and the same value is returned.
 * NOTE(review): the size parameter declaration and any early-exit checks
 * are not visible in this listing. The fields are stored without an
 * explicit byte-order conversion - confirm that this is intended.
 */
1028 int mds_get_default_md(struct obd_device *obd, struct lov_mds_md *lmm,
1031 struct lov_desc *ldesc;
1034 ldesc = &obd->u.mds.mds_lov_desc;
1035 LASSERT(ldesc != NULL);
1040 lmm->lmm_magic = LOV_MAGIC_V1;
1041 lmm->lmm_object_gr = LOV_OBJECT_GROUP_DEFAULT;
1042 lmm->lmm_pattern = ldesc->ld_pattern;
1043 lmm->lmm_stripe_size = ldesc->ld_default_stripe_size;
1044 lmm->lmm_stripe_count = ldesc->ld_default_stripe_count;
1045 *size = sizeof(struct lov_mds_md);
1047 RETURN(sizeof(struct lov_mds_md));
1050 /* Convert the on-disk LOV EA structure.
1051 * We always try to convert from an old LOV EA format to the common in-memory
1052 * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1053 * then convert back to the new on-disk format and save it back to disk
1054 * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1055 * to convert it each time this inode is accessed.
1057 * This function is a bit interesting in the error handling. We can safely
1058 * ship the old lmm to the client in case of failure, since it uses the same
1059 * obd_unpackmd() code and can do the conversion if the MDS fails for some
1060 * reason. We will not delete the old lmm data until we have written the
1061 * new format lmm data in fsfilt_set_md(). */
/*
 * @obd:      MDS device; its mds_osc_exp is used to unpack and repack the EA
 * @inode:    inode whose "lov" EA is rewritten
 * @lmm:      the EA as currently stored, with little-endian fields
 * @lmm_size: size of @lmm in bytes
 *
 * An EA whose magic is already LOV_MAGIC or LOV_MAGIC_JOIN is in the current
 * format and is not converted. After the rewrite is committed, the result is
 * the commit error if there was one, otherwise lmm_size.
 * NOTE(review): the local declarations, the early return for the current
 * format, some error checks, the conv_free label and the final return are
 * not visible in this listing.
 */
1062 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1063 struct lov_mds_md *lmm, int lmm_size)
1065 struct lov_stripe_md *lsm = NULL;
/* The magic is stored little-endian, so it must be byte-swapped BEFORE it
 * is compared. The second test used to swap the result of the comparison
 * instead, which compared the raw field against the host-order constant. */
1070 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
1071 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
1074 CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1075 inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
/* Old on-disk format -> in-memory lsm -> new on-disk format. */
1078 rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
1082 rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
1084 GOTO(conv_free, rc);
/* Write the converted EA back inside its own transaction. */
1087 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1088 if (IS_ERR(handle)) {
1089 rc = PTR_ERR(handle);
1090 GOTO(conv_free, rc);
1093 rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
1095 err = fsfilt_commit(obd, inode, handle, 0);
1097 rc = err ? err : lmm_size;
1098 GOTO(conv_free, rc);
/* Common exit: release the in-memory stripe metadata. */
1100 obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);