1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
42 #include "mds_internal.h"
/*
 * Debug helper: log, at D_INFO level and tagged with "label", the contents
 * of the MDS dirty-page bitmap followed by every object id held in the
 * cached objid pages.
 *
 * An error is logged instead when mds_lov_page_dirty or mds_lov_page_array
 * is NULL.
 *
 * NOTE(review): this listing omits some source lines (declarations, braces,
 * early returns), so the exact flow between the visible statements should be
 * confirmed against the complete file.
 */
44 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
46 struct mds_obd *mds = &obd->u.mds;
49 CDEBUG(D_INFO, "dump from %s\n", label);
50 if (mds->mds_lov_page_dirty == NULL) {
51 CERROR("NULL bitmap!\n");
/* one log line per long-sized word of the bitmap */
55 for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
56 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
58 if (mds->mds_lov_page_array == NULL) {
59 CERROR("not init page array!\n");
/* walk every page slot, then every objid slot inside that page */
63 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
64 obd_id *data = mds->mds_lov_page_array[i];
69 for(j=0; j < OBJID_PER_PAGE(); j++) {
72 CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
/*
 * Prepare the in-memory state used to track per-OST last object ids.
 *
 * Visible steps:
 *  - allocate the dirty-page bitmap with MDS_LOV_OBJID_PAGES_COUNT bits;
 *  - allocate the array of MDS_LOV_OBJID_PAGES_COUNT page pointers;
 *  - open LOV_OBJID read-write, creating it (mode 0644) when absent;
 *  - reject it with -ENOENT when it is not a regular file;
 *  - remember the open file in mds->mds_lov_objid_filp.
 *
 * Error codes set in the visible lines: -ENOMEM, PTR_ERR(file), -ENOENT.
 * The cleanup tail closes the file, frees the page array and frees the
 * bitmap; the labels that select how much of it runs are not shown in this
 * listing.
 */
79 int mds_lov_init_objids(struct obd_device *obd)
81 struct mds_obd *mds = &obd->u.mds;
82 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* compile-time check: a page allocation holds a whole number of obd_id */
87 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
89 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
90 if (mds->mds_lov_page_dirty == NULL)
94 OBD_ALLOC(mds->mds_lov_page_array, size);
95 if (mds->mds_lov_page_array == NULL)
96 GOTO(err_free_bitmap, rc = -ENOMEM);
98 /* open and test the lov objid file */
99 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
102 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
103 GOTO(err_free, rc = PTR_ERR(file));
105 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
106 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
107 file->f_dentry->d_inode->i_mode);
108 GOTO(err_open, rc = -ENOENT);
110 mds->mds_lov_objid_filp = file;
/* error unwinding: release resources in reverse order of acquisition */
114 if (filp_close((struct file *)file, 0))
115 CERROR("can't close %s after error\n", LOV_OBJID);
117 OBD_FREE(mds->mds_lov_page_array, size);
119 FREE_BITMAP(mds->mds_lov_page_dirty);
123 EXPORT_SYMBOL(mds_lov_init_objids);
/*
 * Release the objid tracking state held in obd->u.mds: frees the cached
 * objid pages (MDS_LOV_ALLOC_SIZE each) and the page pointer array, closes
 * the LOV_OBJID file and clears mds_lov_objid_filp, then frees the dirty
 * bitmap.
 *
 * A failure to close the file is only logged.
 */
125 void mds_lov_destroy_objids(struct obd_device *obd)
127 struct mds_obd *mds = &obd->u.mds;
131 if (mds->mds_lov_page_array != NULL) {
132 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
133 obd_id *data = mds->mds_lov_page_array[i];
135 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
/* the array size must match the allocation in mds_lov_init_objids() */
137 OBD_FREE(mds->mds_lov_page_array,
138 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
141 if (mds->mds_lov_objid_filp) {
142 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
143 mds->mds_lov_objid_filp = NULL;
145 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
148 FREE_BITMAP(mds->mds_lov_page_dirty);
151 EXPORT_SYMBOL(mds_lov_destroy_objids);
/*
 * Walk the stripes described by "lmm" and compare each stripe's object id
 * with the cached last id of its OST.
 *
 * For every stripe the OST index and object id are converted from
 * little-endian, the index is split into a page number and a slot within
 * that page, and when the stripe's id is greater than the cached value the
 * page is marked dirty in mds_lov_page_dirty.
 *
 * NOTE(review): the statement between the comparison and cfs_bitmap_set()
 * is not shown in this listing; presumably it stores the new id in the
 * cache - confirm against the complete file.
 */
153 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
155 struct mds_obd *mds = &obd->u.mds;
159 /* if we create file without objects - lmm is NULL */
163 for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
164 int i = le32_to_cpu(lmm->lmm_objects[j].l_ost_idx);
165 obd_id id = le64_to_cpu(lmm->lmm_objects[j].l_object_id);
/* OST index -> (page, slot) inside the cached objid pages */
166 int page = i / OBJID_PER_PAGE();
167 int idx = i % OBJID_PER_PAGE();
168 obd_id *data = mds->mds_lov_page_array[page];
170 CDEBUG(D_INODE,"update last object for ost %d - new %llu"
171 " old %llu\n", i, id, data[idx]);
172 if (id > data[idx]) {
174 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
179 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Load the object ids stored in the LOV_OBJID file into freshly allocated
 * pages of mds_lov_page_array.
 *
 * The page count is derived from the file size plus one, so at least one
 * page is always allocated. Each page slot is asserted to be empty before
 * allocation. The number of ids read is accumulated from the file offset
 * advance and then stored in mds_lov_objid_count, from which
 * mds_lov_objid_lastpage and mds_lov_objid_lastidx are computed.
 *
 * Visible error code: -ENOMEM when a page cannot be allocated; a read
 * failure is logged with its rc.
 */
181 static int mds_lov_read_objids(struct obd_device *obd)
183 struct mds_obd *mds = &obd->u.mds;
185 int i, rc, count = 0, page = 0;
189 /* Read everything in the file, even if our current lov desc
190 has fewer targets. Old targets not in the lov descriptor
191 during mds setup may still have valid objids. */
192 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
196 page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
197 CDEBUG(D_INFO, "file size %d pages %d\n", size, page);
198 for(i=0; i < page; i++) {
199 obd_id *data = mds->mds_lov_page_array[i];
/* remember the offset so the ids read in this pass can be counted */
200 loff_t off_old = off;
202 LASSERT(data == NULL);
203 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
205 GOTO(out, rc = -ENOMEM);
207 mds->mds_lov_page_array[i] = data;
209 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
210 OBJID_PER_PAGE()*sizeof(obd_id), &off);
212 CERROR("Error reading objids %d\n", rc);
218 count += (off-off_old)/sizeof(obd_id);
220 mds->mds_lov_objid_count = count;
223 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
224 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
226 CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
227 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
229 mds_lov_dump_objids("read",obd);
/*
 * Write every dirty objid page back to the LOV_OBJID file.
 *
 * The dirty bitmap is checked for emptiness first. Each set bit selects a
 * page, which is written at file offset (page index * full page size); the
 * page equal to mds_lov_objid_lastpage is written only up to and including
 * slot mds_lov_objid_lastidx. The bit is cleared after the write call.
 *
 * NOTE(review): the handling of the write's return code and the remaining
 * arguments of fsfilt_write_record() are not shown in this listing.
 */
234 int mds_lov_write_objids(struct obd_device *obd)
236 struct mds_obd *mds = &obd->u.mds;
240 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
243 mds_lov_dump_objids("write", obd);
245 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
246 obd_id *data = mds->mds_lov_page_array[i];
247 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
248 loff_t off = i * size;
/* a dirty bit must always refer to an allocated page */
250 LASSERT(data != NULL);
252 /* check for partially filled last page */
253 if (i == mds->mds_lov_objid_lastpage)
254 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
256 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
260 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
267 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Make sure the cached last object id for OST index "idx" is populated.
 *
 * The index is split into a page number and a slot. A page is allocated and
 * installed in mds_lov_page_array (presumably only when the slot is still
 * NULL - the condition is not shown in this listing). When the cached id is
 * 0 the value is requested from the OSC through obd_get_info(KEY_LAST_ID),
 * which is handed a pointer to the cache slot. When idx is greater than
 * mds_lov_objid_count, the count/lastpage/lastidx bookkeeping is advanced
 * to this index. The page is marked dirty.
 *
 * Visible error code: -ENOMEM.
 */
269 static int mds_lov_get_objid(struct obd_device * obd,
272 struct mds_obd *mds = &obd->u.mds;
279 page = idx / OBJID_PER_PAGE();
280 off = idx % OBJID_PER_PAGE();
281 data = mds->mds_lov_page_array[page];
283 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
285 GOTO(out, rc = -ENOMEM);
287 mds->mds_lov_page_array[page] = data;
290 if (data[off] == 0) {
291 /* We never read this lastid; ask the osc */
292 struct obd_id_info lastid;
293 __u32 size = sizeof(lastid);
/* the reply is written straight into the cache slot */
296 lastid.data = &data[off];
297 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
298 KEY_LAST_ID, &size, &lastid);
302 if (idx > mds->mds_lov_objid_count) {
303 mds->mds_lov_objid_count = idx;
304 mds->mds_lov_objid_lastpage = page;
305 mds->mds_lov_objid_lastidx = off;
307 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
/*
 * Ask the OST side to reconcile orphan objects by issuing an obd_create()
 * on mds->mds_osc_exp with the OBD_FL_DELORPHAN flag.
 *
 * The request carries the MDS group (FILTER_GROUP_MDS0 + mds_id). When
 * "ost_uuid" is non-NULL it is copied into the inline area of the obdo and
 * OBD_MD_FLINLINE is set in o_valid.
 *
 * Asserts that the objid page array has been initialised.
 */
313 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
317 struct obd_trans_info oti = {0};
318 struct lov_stripe_md *empty_ea = NULL;
321 LASSERT(mds->mds_lov_page_array != NULL);
323 /* This create will in fact either create or destroy: If the OST is
324 * missing objects below this ID, they will be created. If it finds
325 * objects above this ID, they will be removed. */
326 memset(&oa, 0, sizeof(oa));
327 oa.o_flags = OBD_FL_DELORPHAN;
328 oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
329 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
330 if (ost_uuid != NULL) {
331 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
332 oa.o_valid |= OBD_MD_FLINLINE;
334 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
/*
 * Send a KEY_NEXT_ID set_info request, carrying a struct obd_id_info, to
 * the OSC export of the MDS.
 *
 * Must not be called while the obd is recovering, and obd->obd_dev_sem must
 * be held; both are asserted. A failure is logged.
 *
 * NOTE(review): the lines that fill "info" from idx/id are not shown in
 * this listing.
 */
340 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
342 struct mds_obd *mds = &obd->u.mds;
344 struct obd_id_info info;
347 LASSERT(!obd->obd_recovering);
349 /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
350 LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
354 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
355 KEY_NEXT_ID, sizeof(info), &info, NULL);
357 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
/*
 * Query the LOV export with KEY_LOV_IDX for the target identified by
 * "ost_uuid".
 *
 * NOTE(review): valsize is initialised with sizeof(ost_uuid), which is the
 * size of a pointer rather than of struct obd_uuid - confirm that this is
 * the length the KEY_LOV_IDX handler expects.
 * NOTE(review): the remaining obd_get_info() arguments and the returned
 * value are not shown in this listing.
 */
363 static __u32 mds_lov_get_idx(struct obd_export *lov,
364 struct obd_uuid *ost_uuid)
367 int valsize = sizeof(ost_uuid);
369 rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
/*
 * Refresh mds->mds_lov_desc from the LOV and recompute the sizes derived
 * from it.
 *
 * Visible steps: a temporary lov descriptor is allocated and fetched with
 * obd_get_info(KEY_LOVDESC); it is copied into mds->mds_lov_desc; the
 * stripe count is capped at LOV_MAX_STRIPE_COUNT and used to set
 * mds_max_mdsize and mds_max_cookiesize; llog_cat_initialize() is called
 * for the new target count; when an upcall owner is registered, its
 * onu_upcall is invoked; finally the temporary descriptor is freed.
 *
 * NOTE(review): the CDEBUG format for max_mdsize/max_cookiesize names the
 * stripe count first, but the visible arguments begin with mds_max_mdsize;
 * check the argument order against the line omitted from this listing.
 */
376 /* Update the lov desc for a new size lov. */
377 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
379 struct mds_obd *mds = &obd->u.mds;
381 __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
385 OBD_ALLOC(ld, sizeof(*ld));
389 rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
394 /* Don't change the mds_lov_desc until the objids size matches the
396 mds->mds_lov_desc = *ld;
397 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
398 mds->mds_lov_desc.ld_tgt_count);
400 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
401 mds->mds_lov_desc.ld_tgt_count);
403 mds->mds_max_mdsize = lov_mds_md_size(stripes);
404 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
405 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
406 "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
409 /* If we added a target we have to reconnect the llogs */
410 /* We only _need_ to do this at first add (idx), or the first time
411 after recovery. However, it should now be safe to call anytime. */
412 rc = llog_cat_initialize(obd, &obd->obd_olg,
413 mds->mds_lov_desc.ld_tgt_count, NULL);
415 /*XXX this notifies the MDD until lov handling use old mds code */
416 if (obd->obd_upcall.onu_owner) {
417 LASSERT(obd->obd_upcall.onu_upcall != NULL);
418 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
419 obd->obd_upcall.onu_owner);
422 OBD_FREE(ld, sizeof(*ld));
/* NOTE(review): MDSLOV_NO_INDEX is not referenced in the visible lines of
 * this listing; presumably a sentinel for "no target index" - confirm. */
427 #define MDSLOV_NO_INDEX -1

/*
 * Bring the MDS view of target "idx" up to date under obd->obd_dev_sem.
 *
 * Visible steps: refresh the lov descriptor; log the state; reject an index
 * that is not below ld_tgt_count with -EINVAL; make sure the cached last id
 * for the index exists (mds_lov_get_objid); then push that id to the OSC
 * with mds_lov_set_one_nextid(). A failure of the latter is logged.
 *
 * NOTE(review): the action taken while obd_recovering is set is on a line
 * not shown in this listing.
 */
429 /* Inform MDS about new/updated target */
430 static int mds_lov_update_mds(struct obd_device *obd,
431 struct obd_device *watched,
434 struct mds_obd *mds = &obd->u.mds;
442 /* Don't let anyone else mess with mds_lov_objids now */
443 mutex_down(&obd->obd_dev_sem);
445 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
449 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
450 idx, obd->obd_recovering, obd->obd_async_recov,
451 mds->mds_lov_desc.ld_tgt_count);
453 /* idx is set as data from lov_notify. */
454 if (obd->obd_recovering)
457 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
458 CERROR("index %d > count %d!\n", idx,
459 mds->mds_lov_desc.ld_tgt_count);
460 GOTO(out, rc = -EINVAL);
463 rc = mds_lov_get_objid(obd, idx);
/* locate the cache slot that mds_lov_get_objid() has just ensured */
467 page = idx / OBJID_PER_PAGE();
468 off = idx % OBJID_PER_PAGE();
469 data = mds->mds_lov_page_array[page];
471 /* We have read this lastid from disk; tell the osc.
472 Don't call this during recovery. */
473 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
475 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
476 /* Don't abort the rest of the sync */
479 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
483 mutex_up(&obd->obd_dev_sem);
/*
 * Connect the MDS to the LOV named "lov_name" and initialise the state that
 * depends on it.
 *
 * Visible steps: return the stored error when mds_osc_obd already holds an
 * ERR_PTR; look the LOV up by name (storing ERR_PTR(-ENOTCONN) when it is
 * missing); build the connect data (flags, version, group) and call
 * obd_connect(); obtain the export; register this obd as observer of the
 * LOV; set obd_no_conn so clients are refused for now; then, under
 * obd_dev_sem, read the objid file, refresh the lov descriptor, initialise
 * the llog catalogs, fetch last ids for targets beyond the ones read from
 * disk and write the objid file back.
 *
 * The error tail unregisters the observer, disconnects the export and
 * stores ERR_PTR(rc) in mds_osc_obd.
 *
 * NOTE(review): the populate loop runs while i <= ld_tgt_count, so it also
 * queries index ld_tgt_count itself, whereas mds_lov_update_mds() rejects
 * idx >= ld_tgt_count - confirm the inclusive bound is intended.
 */
487 /* update the LOV-OSC knowledge of the last used object id's */
488 int mds_lov_connect(struct obd_device *obd, char * lov_name)
490 struct mds_obd *mds = &obd->u.mds;
491 struct lustre_handle conn = {0,};
492 struct obd_connect_data *data;
/* a previous attempt left its error code encoded in the pointer */
496 if (IS_ERR(mds->mds_osc_obd))
497 RETURN(PTR_ERR(mds->mds_osc_obd));
499 if (mds->mds_osc_obd)
502 mds->mds_osc_obd = class_name2obd(lov_name);
503 if (!mds->mds_osc_obd) {
504 CERROR("MDS cannot locate LOV %s\n", lov_name);
505 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
509 OBD_ALLOC(data, sizeof(*data));
512 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
513 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
514 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID;
515 #ifdef HAVE_LRU_RESIZE_SUPPORT
516 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
518 data->ocd_version = LUSTRE_VERSION_CODE;
519 data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0;
520 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
521 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
522 OBD_FREE(data, sizeof(*data));
524 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
525 mds->mds_osc_obd = ERR_PTR(rc);
528 mds->mds_osc_exp = class_conn2export(&conn);
530 rc = obd_register_observer(mds->mds_osc_obd, obd);
532 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
534 GOTO(err_discon, rc);
537 /* Deny new client connections until we are sure we have some OSTs */
538 obd->obd_no_conn = 1;
540 mutex_down(&obd->obd_dev_sem);
541 rc = mds_lov_read_objids(obd);
543 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
547 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
551 /* tgt_count may be 0! */
552 rc = llog_cat_initialize(obd, &obd->obd_olg,
553 mds->mds_lov_desc.ld_tgt_count, NULL);
555 CERROR("failed to initialize catalog %d\n", rc);
559 /* If we're mounting this code for the first time on an existing FS,
560 * we need to populate the objids array from the real OST values */
561 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
562 __u32 i = mds->mds_lov_objid_count;
563 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
564 rc = mds_lov_get_objid(obd, i);
569 rc = mds_lov_write_objids(obd);
571 CERROR("got last objids from OSTs, but error "
572 "in update objids file: %d\n", rc);
574 mutex_up(&obd->obd_dev_sem);
576 /* I want to see a callback happen when the OBD moves to a
577 * "For General Use" state, and that's when we'll call
578 * set_nextid(). The class driver can help us here, because
579 * it can use the obd_recovering flag to determine when
580 * the OBD is fully available. */
581 /* MDD device will care about that
582 if (!obd->obd_recovering)
583 rc = mds_postrecov(obd);
588 mutex_up(&obd->obd_dev_sem);
589 obd_register_observer(mds->mds_osc_obd, NULL);
591 obd_disconnect(mds->mds_osc_exp);
592 mds->mds_osc_exp = NULL;
593 mds->mds_osc_obd = ERR_PTR(rc);
/*
 * Detach the MDS from its LOV: when mds_osc_obd is not an error pointer and
 * an export exists, stop observing the LOV and set obd_force on it.
 *
 * The export itself is not dropped in the visible lines; see the comment
 * inside the function for where that happens.
 */
597 int mds_lov_disconnect(struct obd_device *obd)
599 struct mds_obd *mds = &obd->u.mds;
603 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
604 obd_register_observer(mds->mds_osc_obd, NULL);
606 /* The actual disconnect of the mds_lov will be called from
607 * class_disconnect_exports from mds_lov_clean. So we have to
608 * ensure that class_cleanup doesn't fail due to the extra ref
609 * we're holding now. The mechanism to do that already exists -
610 * the obd_force flag. We'll drop the final ref to the
611 * mds_osc_exp in mds_cleanup. */
612 mds->mds_osc_obd->obd_force = 1;
/*
 * ioctl dispatcher for the MDS obd device.
 *
 * "karg" is interpreted as a struct obd_ioctl_data; "cmd" selects one of
 * the cases below. Operations that touch the backing filesystem run between
 * push_ctxt()/pop_ctxt() on an lvfs context.
 *
 * NOTE(review): several case labels, break/RETURN statements and error
 * checks are not shown in this listing, so the per-case comments describe
 * only the visible calls.
 */
618 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
619 void *karg, void *uarg)
621 static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
622 struct obd_device *obd = exp->exp_obd;
623 struct mds_obd *mds = &obd->u.mds;
624 struct obd_ioctl_data *data = karg;
625 struct lvfs_run_ctxt saved;
629 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
/* start recording: create the config llog named by ioc_inlbuf1 and keep
 * its handle in mds_cfg_llh */
632 case OBD_IOC_RECORD: {
633 char *name = data->ioc_inlbuf1;
634 struct llog_ctxt *ctxt;
636 if (mds->mds_cfg_llh)
639 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
640 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
641 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
644 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
647 mds->mds_cfg_llh = NULL;
648 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* stop recording: close the handle opened by OBD_IOC_RECORD */
653 case OBD_IOC_ENDRECORD: {
654 if (!mds->mds_cfg_llh)
657 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
658 rc = llog_close(mds->mds_cfg_llh);
659 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
661 mds->mds_cfg_llh = NULL;
/* open the named config llog, destroy it and release the handle */
665 case OBD_IOC_CLEAR_LOG: {
666 char *name = data->ioc_inlbuf1;
667 struct llog_ctxt *ctxt;
668 if (mds->mds_cfg_llh)
671 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
672 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
673 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
676 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
679 rc = llog_destroy(mds->mds_cfg_llh);
680 llog_free_handle(mds->mds_cfg_llh);
682 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
684 mds->mds_cfg_llh = NULL;
/* append one record, copied from user space, to the open config llog */
688 case OBD_IOC_DORECORD: {
690 struct llog_rec_hdr rec;
691 if (!mds->mds_cfg_llh)
694 rec.lrh_len = llog_data_len(data->ioc_plen1);
696 if (data->ioc_type == LUSTRE_CFG_TYPE) {
697 rec.lrh_type = OBD_CFG_REC;
699 CERROR("unknown cfg record type:%d \n", data->ioc_type);
703 OBD_ALLOC(cfg_buf, data->ioc_plen1);
706 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
708 OBD_FREE(cfg_buf, data->ioc_plen1);
712 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
713 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
715 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
717 OBD_FREE(cfg_buf, data->ioc_plen1);
/* parse the config llog named by ioc_inlbuf1 */
721 case OBD_IOC_PARSE: {
722 struct llog_ctxt *ctxt =
723 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
724 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
725 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
726 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* dump the config llog named by ioc_inlbuf1 */
734 case OBD_IOC_DUMP_LOG: {
735 struct llog_ctxt *ctxt =
736 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
737 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
738 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
739 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* sync the backing filesystem (the case label is not shown here) */
748 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
749 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
/* run a transaction and a sync, then switch the device to read-only */
753 case OBD_IOC_SET_READONLY: {
755 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
756 BDEVNAME_DECLARE_STORAGE(tmp);
757 CERROR("*** setting device %s read-only ***\n",
758 ll_bdevname(obd->u.obt.obt_sb, tmp));
760 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
762 rc = fsfilt_commit(obd, inode, handle, 1);
764 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
765 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
767 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
/* list the llog catalogs, one per LOV target */
771 case OBD_IOC_CATLOGLIST: {
772 int count = mds->mds_lov_desc.ld_tgt_count;
773 rc = llog_catalog_list(obd, count, data);
/* llog maintenance: the llogs are finished before the operation and the
 * catalogs re-initialised afterwards, then KEY_MDS_CONN is re-sent.
 * NOTE(review): the key length here is strlen(KEY_MDS_CONN), while
 * __mds_lov_synchronize() passes sizeof(KEY_MDS_CONN) - confirm which
 * length the receiver expects. */
777 case OBD_IOC_LLOG_CHECK:
778 case OBD_IOC_LLOG_CANCEL:
779 case OBD_IOC_LLOG_REMOVE: {
780 struct llog_ctxt *ctxt =
781 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
785 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
786 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
787 rc = llog_ioctl(ctxt, cmd, data);
788 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
789 llog_cat_initialize(obd, &obd->obd_olg,
790 mds->mds_lov_desc.ld_tgt_count, NULL);
791 group = FILTER_GROUP_MDS0 + mds->mds_id;
793 rc2 = obd_set_info_async(mds->mds_osc_exp,
794 strlen(KEY_MDS_CONN), KEY_MDS_CONN,
795 sizeof(group), &group, NULL);
/* read-only llog queries are passed straight to llog_ioctl() */
800 case OBD_IOC_LLOG_INFO:
801 case OBD_IOC_LLOG_PRINT: {
802 struct llog_ctxt *ctxt =
803 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
805 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
806 rc = llog_ioctl(ctxt, cmd, data);
807 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
813 case OBD_IOC_ABORT_RECOVERY:
814 CERROR("aborting recovery for device %s\n", obd->obd_name);
815 target_stop_recovery_thread(obd);
/* any other command is only logged (the label is not shown here) */
819 CDEBUG(D_INFO, "unknown command %x\n", cmd);
/*
 * Record which client-connect preconditions have been met and open the MDS
 * for clients once the required ones hold.
 *
 * "flag" is a bit mask: CONFIG_LOG sets mds_fl_cfglog and CONFIG_SYNC sets
 * mds_fl_synced. obd_no_conn is cleared as soon as mds_fl_cfglog is set;
 * the mds_fl_synced requirement is commented out in the condition (see the
 * bz11778 marker).
 */
826 /* Collect the preconditions we need to allow client connects */
827 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
829 if (flag & CONFIG_LOG)
830 obd->u.mds.mds_fl_cfglog = 1;
831 if (flag & CONFIG_SYNC)
832 obd->u.mds.mds_fl_synced = 1;
833 if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
834 /* Open for clients */
835 obd->obd_no_conn = 0;
/*
 * Arguments handed to __mds_lov_synchronize(), either directly or through a
 * kernel thread. Allocated in mds_lov_start_synchronize() and freed by
 * __mds_lov_synchronize().
 */
838 struct mds_lov_sync_info {
839 struct obd_device *mlsi_obd; /* the MDS obd whose u.mds (and LOV export) is synced */
840 struct obd_device *mlsi_watched; /* target osc */
841 __u32 mlsi_index; /* index of target */
/*
 * Send both entries of mds->mds_capa_keys to the OSC export using
 * KEY_CAPA_KEY set_info requests.
 *
 * The capability key array is checked for NULL first. A failed send is
 * reported through DEBUG_CAPA_KEY at D_ERROR.
 *
 * NOTE(review): the returns for the NULL case and for a failed send are not
 * shown in this listing.
 */
844 static int mds_propagate_capa_keys(struct mds_obd *mds)
846 struct lustre_capa_key *key;
851 if (!mds->mds_capa_keys)
/* exactly two keys are propagated */
854 for (i = 0; i < 2; i++) {
855 key = &mds->mds_capa_keys[i];
856 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
858 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
859 KEY_CAPA_KEY, sizeof(*key), key, NULL);
861 DEBUG_CAPA_KEY(D_ERROR, key,
862 "propagate failed (rc = %d) for", rc);
/*
 * Synchronise the MDS with one OSC target.
 *
 * "data" is a struct mds_lov_sync_info; its fields are copied into locals
 * and the structure is freed here, so the caller must not touch it
 * afterwards.
 *
 * Visible sequence: update the MDS state for the target index
 * (mds_lov_update_mds); send KEY_MDS_CONN with the MDS group; propagate the
 * capability keys; connect the MDS->OST llog context; announce on the
 * console that the target is active; bail out with -ENODEV when the obd is
 * stopping; clear orphans on the target; notify the upcall owner, if any.
 *
 * In the failure tail the error is logged and, unless the MDS, the LOV or
 * the watched device is stopping, the LOV is notified that the target is
 * inactive.
 *
 * NOTE(review): the rc checks between the steps are not shown in this
 * listing.
 */
870 /* We only sync one osc at a time, so that we don't have to hold
871 any kind of lock on the whole mds_lov_desc, which may change
872 (grow) as a result of mds_lov_add_ost. This also avoids any
873 kind of mismatch between the lov_desc and the mds_lov_desc,
874 which are not in lock-step during lov_add_obd */
875 static int __mds_lov_synchronize(void *data)
877 struct mds_lov_sync_info *mlsi = data;
878 struct obd_device *obd = mlsi->mlsi_obd;
879 struct obd_device *watched = mlsi->mlsi_watched;
880 struct mds_obd *mds = &obd->u.mds;
881 struct obd_uuid *uuid;
882 __u32 idx = mlsi->mlsi_index;
883 struct mds_group_info mgi;
884 struct llog_ctxt *ctxt;
/* everything needed has been copied out of mlsi above */
888 OBD_FREE(mlsi, sizeof(*mlsi));
892 uuid = &watched->u.cli.cl_target_uuid;
895 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
897 rc = mds_lov_update_mds(obd, watched, idx);
899 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
903 mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
906 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
907 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
909 /* propagate capability keys */
910 rc = mds_propagate_capa_keys(mds);
914 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
918 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
920 rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
924 CERROR("%s failed at llog_origin_connect: %d\n",
925 obd_uuid2str(uuid), rc);
929 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
930 obd->obd_name, obd_uuid2str(uuid));
932 * FIXME: this obd_stopping was useless,
933 * since obd in mdt layer was set
935 if (obd->obd_stopping)
936 GOTO(out, rc = -ENODEV);
938 rc = mds_lov_clear_orphans(mds, uuid);
940 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
941 obd_uuid2str(uuid), rc);
945 if (obd->obd_upcall.onu_owner) {
947 * This is a hack for mds_notify->mdd_notify. When the mds obd
948 * in mdd is removed, This hack should be removed.
950 LASSERT(obd->obd_upcall.onu_upcall != NULL);
951 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
952 obd->obd_upcall.onu_owner);
957 /* Deactivate it for safety */
958 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
960 if (!obd->obd_stopping && mds->mds_osc_obd &&
961 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
962 obd_notify(mds->mds_osc_obd, watched,
963 OBD_NOTIFY_INACTIVE, NULL);
/*
 * Kernel-thread entry point for a background target sync.
 *
 * Names the thread "ll_sync_NN" after the target index, daemonizes it and
 * then runs __mds_lov_synchronize() on "data" (a struct mds_lov_sync_info),
 * returning its result.
 */
970 int mds_lov_synchronize(void *data)
972 struct mds_lov_sync_info *mlsi = data;
975 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
976 ptlrpc_daemonize(name);
978 RETURN(__mds_lov_synchronize(data));
/*
 * Start synchronising the MDS with the OSC "watched".
 *
 * A struct mds_lov_sync_info is allocated and filled with the MDS obd, the
 * watched device and the target index. The index is read from "data" (as a
 * __u32) or looked up with mds_lov_get_idx(); the condition choosing
 * between the two is not shown in this listing, presumably data != NULL.
 *
 * The sync then runs either in a new kernel thread
 * (mds_lov_synchronize) or synchronously in the caller
 * (__mds_lov_synchronize); presumably "nonblock" selects the thread - the
 * condition is not shown in this listing.
 */
981 int mds_lov_start_synchronize(struct obd_device *obd,
982 struct obd_device *watched,
983 void *data, int nonblock)
985 struct mds_lov_sync_info *mlsi;
987 struct mds_obd *mds = &obd->u.mds;
988 struct obd_uuid *uuid;
992 uuid = &watched->u.cli.cl_target_uuid;
994 OBD_ALLOC(mlsi, sizeof(*mlsi));
998 mlsi->mlsi_obd = obd;
999 mlsi->mlsi_watched = watched;
1001 mlsi->mlsi_index = *(__u32 *)data;
1003 mlsi->mlsi_index = mds_lov_get_idx(mds->mds_osc_exp, uuid);
1005 /* Although class_export_get(obd->obd_self_export) would lock
1006 the MDS in place, since it's only a self-export
1007 it doesn't lock the LOV in place. The LOV can be disconnected
1008 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
1009 Simply taking an export ref on the LOV doesn't help, because it's
1010 still disconnected. Taking an obd reference ensures that we don't
1011 disconnect the LOV. This of course means a cleanup won't
1012 finish for as long as the sync is blocking. */
1016 /* Synchronize in the background */
1017 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
1018 CLONE_VM | CLONE_FILES);
1020 CERROR("%s: error starting mds_lov_synchronize: %d\n",
1024 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
1025 "thread=%d\n", obd->obd_name,
1026 mlsi->mlsi_index, rc);
/* synchronous path: run the sync in the caller's context */
1030 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Notification callback of the MDS for events on "watched".
 *
 * Visible behaviour:
 *  - OBD_NOTIFY_CONFIG passes "data", cast to a flag mask, to
 *    mds_allow_cli();
 *  - OBD_NOTIFY_ACTIVE, OBD_NOTIFY_SYNC and OBD_NOTIFY_SYNC_NONBLOCK are
 *    handled by the code after the switch;
 *  - a watched device whose type is not LUSTRE_OSC_NAME is reported as
 *    unexpected;
 *  - during recovery only the lov descriptor and the llog catalogs are
 *    refreshed under obd_dev_sem, and CONFIG_SYNC is recorded;
 *  - otherwise a target sync is started, blocking only for
 *    OBD_NOTIFY_SYNC, followed by lquota_recovery().
 *
 * NOTE(review): "data" is a pointer that carries flag bits for
 * OBD_NOTIFY_CONFIG; the direct cast to unsigned int may truncate or warn
 * on LP64 builds - confirm.
 */
1036 int mds_notify(struct obd_device *obd, struct obd_device *watched,
1037 enum obd_notify_event ev, void *data)
1043 /* We only handle these: */
1044 case OBD_NOTIFY_ACTIVE:
1045 case OBD_NOTIFY_SYNC:
1046 case OBD_NOTIFY_SYNC_NONBLOCK:
1048 case OBD_NOTIFY_CONFIG:
1049 mds_allow_cli(obd, (unsigned int)data);
1054 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
1055 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1056 CERROR("unexpected notification of %s %s!\n",
1057 watched->obd_type->typ_name, watched->obd_name);
1061 if (obd->obd_recovering) {
1062 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1064 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1065 /* We still have to fix the lov descriptor for ost's added
1066 after the mdt in the config log. They didn't make it into
1068 mutex_down(&obd->obd_dev_sem);
1069 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
1071 mutex_up(&obd->obd_dev_sem);
1074 /* We should update init llog here too for replay unlink and
1075 * possible llog init race when recovery complete */
1076 llog_cat_initialize(obd, &obd->obd_olg,
1077 obd->u.mds.mds_lov_desc.ld_tgt_count,
1078 &watched->u.cli.cl_target_uuid);
1079 mutex_up(&obd->obd_dev_sem);
1080 mds_allow_cli(obd, CONFIG_SYNC);
1084 LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
1085 rc = mds_lov_start_synchronize(obd, watched, data,
1086 !(ev == OBD_NOTIFY_SYNC));
1088 lquota_recovery(mds_quota_interface_ref, obd);
1093 /* Convert the on-disk LOV EA structure.
1094 * We always try to convert from an old LOV EA format to the common in-memory
1095 * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1096 * then convert back to the new on-disk format and save it back to disk
1097 * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1098 * to convert it each time this inode is accessed.
1100 * This function is a bit interesting in the error handling. We can safely
1101 * ship the old lmm to the client in case of failure, since it uses the same
1102 * obd_unpackmd() code and can do the conversion if the MDS fails for some
1103 * reason. We will not delete the old lmm data until we have written the
1104 * new format lmm data in fsfilt_set_md(). */
/*
 * Parameters: "obd" is the MDS device, "inode" the file whose EA is
 * rewritten, "lmm"/"lmm_size" the on-disk EA as read by the caller.
 *
 * Result as far as the visible lines show: after the EA has been stored the
 * result is the fsfilt_commit() error when there is one, otherwise
 * lmm_size; a failing fsfilt_start() yields PTR_ERR(handle). The in-memory
 * lsm is released with obd_free_memmd() on the way out.
 *
 * NOTE(review): this listing omits some source lines (declarations, the
 * early return, rc checks, labels); they are intentionally left untouched.
 */
1105 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1106 struct lov_mds_md *lmm, int lmm_size)
1108 struct lov_stripe_md *lsm = NULL;
/* Nothing to convert when the EA already carries a current magic. The
 * byte swap has to be applied to lmm_magic itself: swapping the result of
 * the comparison would compare the raw little-endian value against a
 * CPU-order constant, which is wrong on big-endian hosts. */
1113 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
1114 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
1117 CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1118 inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
/* old on-disk format -> in-memory lsm -> new on-disk format */
1121 rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
1125 rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
1127 GOTO(conv_free, rc);
1130 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1131 if (IS_ERR(handle)) {
1132 rc = PTR_ERR(handle);
1133 GOTO(conv_free, rc);
1136 rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
/* the transaction is always committed, even when set_md failed */
1138 err = fsfilt_commit(obd, inode, handle, 0);
1140 rc = err ? err : lmm_size;
1141 GOTO(conv_free, rc);
1143 obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);