1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
42 #include "mds_internal.h"
/*
 * Debug helper: log the dirty-page bitmap and the contents of the objid
 * page array at D_INFO level, tagged with @label.
 *
 * A NULL mds_lov_page_dirty or a NULL mds_lov_page_array is reported with
 * CERROR.
 */
44 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
46 struct mds_obd *mds = &obd->u.mds;
49 CDEBUG(D_INFO, "dump from %s\n", label);
50 if (mds->mds_lov_page_dirty == NULL) {
51 CERROR("NULL bitmap!\n");
/* Print the bitmap one unsigned long word at a time. */
55 for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
56 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
58 if (mds->mds_lov_page_array == NULL) {
59 CERROR("not init page array!\n");
/* Walk every page slot and print the OBJID_PER_PAGE() ids it holds. */
63 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
64 obd_id *data = mds->mds_lov_page_array[i];
69 for(j=0; j < OBJID_PER_PAGE(); j++) {
72 CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
/*
 * Set up the state used to track per-OST object ids: allocate the
 * dirty-page bitmap and the page pointer array, then open (creating it if
 * needed) the LOV_OBJID file and remember it in mds_lov_objid_filp.
 *
 * Failure codes visible here: -ENOMEM when the page array cannot be
 * allocated, PTR_ERR(file) when the open fails and -ENOENT when LOV_OBJID
 * is not a regular file.  The cleanup code at the end closes the file,
 * frees the page array and frees the bitmap.
 */
79 int mds_lov_init_objids(struct obd_device *obd)
81 struct mds_obd *mds = &obd->u.mds;
82 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* An allocation unit must hold a whole number of obd_id entries. */
87 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
89 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
90 if (mds->mds_lov_page_dirty == NULL)
94 OBD_ALLOC(mds->mds_lov_page_array, size);
95 if (mds->mds_lov_page_array == NULL)
96 GOTO(err_free_bitmap, rc = -ENOMEM);
98 /* open and test the lov objid file */
99 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
102 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
103 GOTO(err_free, rc = PTR_ERR(file));
105 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
106 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
107 file->f_dentry->d_inode->i_mode);
108 GOTO(err_open, rc = -ENOENT);
110 mds->mds_lov_objid_filp = file;
/* Cleanup: close the file, then release the page array and the bitmap. */
114 if (filp_close((struct file *)file, 0))
115 CERROR("can't close %s after error\n", LOV_OBJID);
117 OBD_FREE(mds->mds_lov_page_array, size);
119 FREE_BITMAP(mds->mds_lov_page_dirty);
123 EXPORT_SYMBOL(mds_lov_init_objids);
/*
 * Tear down the objid tracking state: free the objid pages and the page
 * pointer array, close the LOV_OBJID file (clearing mds_lov_objid_filp)
 * and free the dirty bitmap.  The CERROR below prints the return code of
 * filp_close().
 */
125 void mds_lov_destroy_objids(struct obd_device *obd)
127 struct mds_obd *mds = &obd->u.mds;
131 if (mds->mds_lov_page_array != NULL) {
/* Release each page, then the array of page pointers itself. */
132 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
133 obd_id *data = mds->mds_lov_page_array[i];
135 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
137 OBD_FREE(mds->mds_lov_page_array,
138 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
141 if (mds->mds_lov_objid_filp) {
142 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
143 mds->mds_lov_objid_filp = NULL;
145 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
148 FREE_BITMAP(mds->mds_lov_page_dirty);
151 EXPORT_SYMBOL(mds_lov_destroy_objids);
/*
 * Compare the object ids carried in @lmm with the ids stored for their
 * OSTs.  For every stripe, the OST index selects a page
 * (index / OBJID_PER_PAGE()) and a slot (index % OBJID_PER_PAGE()) in
 * mds_lov_page_array; when the stripe's id is larger than the stored one
 * the page is flagged in mds_lov_page_dirty.
 *
 * The lmm fields are converted from little-endian with le32_to_cpu() and
 * le64_to_cpu() before use.
 */
153 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
155 struct mds_obd *mds = &obd->u.mds;
159 /* if we create file without objects - lmm is NULL */
163 for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
164 int i = le32_to_cpu(lmm->lmm_objects[j].l_ost_idx);
165 obd_id id = le64_to_cpu(lmm->lmm_objects[j].l_object_id);
166 int page = i / OBJID_PER_PAGE();
167 int idx = i % OBJID_PER_PAGE();
168 obd_id *data = mds->mds_lov_page_array[page];
170 CDEBUG(D_INODE,"update last object for ost %d - new %llu"
171 " old %llu\n", i, id, data[idx]);
/* Only a newer (larger) id makes the page dirty. */
172 if (id > data[idx]) {
174 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
179 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Load the saved objids from mds_lov_objid_filp into mds_lov_page_array.
 *
 * The number of pages is derived from the file size.  Each page slot is
 * asserted to be empty, allocated here and filled with
 * fsfilt_read_record().  Afterwards mds_lov_objid_count holds the number
 * of ids read, and mds_lov_objid_lastpage / mds_lov_objid_lastidx are that
 * count divided by, and modulo, OBJID_PER_PAGE().
 *
 * Takes the "out" path with -ENOMEM when a page cannot be allocated.
 */
181 static int mds_lov_read_objids(struct obd_device *obd)
183 struct mds_obd *mds = &obd->u.mds;
185 int i, rc, count = 0, page = 0;
189 /* Read everything in the file, even if our current lov desc
190 has fewer targets. Old targets not in the lov descriptor
191 during mds setup may still have valid objids. */
192 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
196 page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
197 CDEBUG(D_INFO, "file size %d pages %d\n", size, page);
198 for(i=0; i < page; i++) {
199 obd_id *data = mds->mds_lov_page_array[i];
200 loff_t off_old = off;
202 LASSERT(data == NULL);
203 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
205 GOTO(out, rc = -ENOMEM);
207 mds->mds_lov_page_array[i] = data;
209 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
210 OBJID_PER_PAGE()*sizeof(obd_id), &off);
212 CERROR("Error reading objids %d\n", rc);
/* The advance of the file offset tells how many ids this read produced. */
218 count += (off-off_old)/sizeof(obd_id);
220 mds->mds_lov_objid_count = count;
223 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
224 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
226 CDEBUG(D_INFO, "Read %u - %u %u objid\n", count,
227 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
229 mds_lov_dump_objids("read",obd);
/*
 * Write dirty objid pages to the file behind mds_lov_objid_filp.
 *
 * Every page whose bit is set in mds_lov_page_dirty is passed to
 * fsfilt_write_record() at file offset (page index * full page size), and
 * cfs_bitmap_clear() is called for that page.  The page at
 * mds_lov_objid_lastpage is written only up to and including slot
 * mds_lov_objid_lastidx.
 */
234 int mds_lov_write_objids(struct obd_device *obd)
236 struct mds_obd *mds = &obd->u.mds;
240 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
243 mds_lov_dump_objids("write", obd);
245 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
246 obd_id *data = mds->mds_lov_page_array[i];
247 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
/* The offset uses the full page size, before size is trimmed below. */
248 loff_t off = i * size;
250 LASSERT(data != NULL);
252 /* check for partially filled last page */
253 if (i == mds->mds_lov_objid_lastpage)
254 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
256 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
260 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
267 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Make sure the objid slot for an OST index is populated.
 *
 * The slot lives in page idx / OBJID_PER_PAGE() at offset
 * idx % OBJID_PER_PAGE(); the page is allocated here when needed.  A zero
 * slot means the last id was never read, so it is requested from @export
 * with obd_get_info(KEY_LAST_ID), with lastid.data pointing at the slot
 * itself.  When idx is beyond mds_lov_objid_count the count, last page
 * and last index bookkeeping is moved forward, and cfs_bitmap_set() marks
 * the page dirty.
 *
 * Takes the "out" path with -ENOMEM when a page cannot be allocated.
 */
269 static int mds_lov_get_objid(struct obd_device * obd, struct obd_export *export,
272 struct mds_obd *mds = &obd->u.mds;
279 page = idx / OBJID_PER_PAGE();
280 off = idx % OBJID_PER_PAGE();
281 data = mds->mds_lov_page_array[page];
283 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
285 GOTO(out, rc = -ENOMEM);
287 mds->mds_lov_page_array[page] = data;
290 if (data[off] == 0) {
291 /* We never read this lastid; ask the osc */
292 struct obd_id_info lastid;
293 __u32 size = sizeof(lastid);
296 lastid.data = &data[off];
297 rc = obd_get_info(export, sizeof(KEY_LAST_ID),
298 KEY_LAST_ID, &size, &lastid);
/* Extend the bookkeeping when this index lies past what was known. */
302 if (idx > mds->mds_lov_objid_count) {
303 mds->mds_lov_objid_count = idx;
304 mds->mds_lov_objid_lastpage = page;
305 mds->mds_lov_objid_lastidx = off;
307 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
/*
 * Issue an obd_create() flagged OBD_FL_DELORPHAN on mds_osc_exp for this
 * MDS's object group (FILTER_GROUP_MDS0 + mds_id).
 *
 * When @ost_uuid is not NULL it is copied into oa.o_inline and
 * OBD_MD_FLINLINE is added to o_valid (presumably limiting the request to
 * that OST -- confirm against the LOV create path).
 *
 * The page array must already be set up (asserted).
 */
313 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
317 struct obd_trans_info oti = {0};
318 struct lov_stripe_md *empty_ea = NULL;
321 LASSERT(mds->mds_lov_page_array != NULL);
323 /* This create will in fact either create or destroy: If the OST is
324 * missing objects below this ID, they will be created. If it finds
325 * objects above this ID, they will be removed. */
326 memset(&oa, 0, sizeof(oa));
327 oa.o_flags = OBD_FL_DELORPHAN;
328 oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
329 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
330 if (ost_uuid != NULL) {
331 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
332 oa.o_valid |= OBD_MD_FLINLINE;
334 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
/*
 * Send a KEY_NEXT_ID value (a struct obd_id_info) to mds_osc_exp with
 * obd_set_info_async().
 *
 * Preconditions, both asserted: the obd is not recovering and
 * obd->obd_dev_sem is held.  A CERROR line is present for reporting
 * failure.
 */
340 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
342 struct mds_obd *mds = &obd->u.mds;
344 struct obd_id_info info;
347 LASSERT(!obd->obd_recovering);
349 /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
350 LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
355 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
356 KEY_NEXT_ID, sizeof(info), &info, NULL);
358 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
/*
 * Query the LOV export for KEY_LOV_IDX on behalf of @ost_uuid.
 *
 * valsize is meant to describe the uuid value, so it is the size of the
 * structure ost_uuid points to, not the size of the pointer variable.
 */
364 static __u32 mds_lov_get_idx(struct obd_export *lov,
365 struct obd_uuid *ost_uuid)
368 int valsize = sizeof(*ost_uuid);
370 rc = obd_get_info(lov, sizeof(KEY_LOV_IDX), KEY_LOV_IDX,
377 /* Update the lov desc for a new size lov. */
/*
 * Fetches the current descriptor from @lov with obd_get_info(KEY_LOVDESC)
 * into a temporary, makes sure the objid page that covers ld_tgt_count
 * exists, then installs the descriptor in mds->mds_lov_desc and recomputes
 * mds_max_mdsize and mds_max_cookiesize for
 * min(LOV_MAX_STRIPE_COUNT, ld_tgt_count) stripes.  Finally the llogs are
 * re-initialized for the new target count and the registered upcall, if
 * any, is invoked.
 *
 * The CDEBUG format lists the two sizes first and the stripe count last so
 * that the placeholders line up with the argument order (the last argument
 * sits on a line not visible here -- presumably "stripes"; confirm).
 *
 * Takes the "out" path with -ENOMEM when the objid page cannot be
 * allocated; the temporary descriptor is freed with OBD_FREE at the end.
 */
378 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
380 struct mds_obd *mds = &obd->u.mds;
382 __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
386 OBD_ALLOC(ld, sizeof(*ld));
390 rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
395 /* The size of the LOV target table may have increased. */
396 page = ld->ld_tgt_count / OBJID_PER_PAGE();
397 if (mds->mds_lov_page_array[page] == NULL) {
400 OBD_ALLOC(ids, MDS_LOV_ALLOC_SIZE);
402 GOTO(out, rc = -ENOMEM);
404 mds->mds_lov_page_array[page] = ids;
407 /* Don't change the mds_lov_desc until the objids size matches the
409 mds->mds_lov_desc = *ld;
410 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
411 mds->mds_lov_desc.ld_tgt_count);
413 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
414 mds->mds_lov_desc.ld_tgt_count);
416 mds->mds_max_mdsize = lov_mds_md_size(stripes);
417 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
418 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize: %d/%d for "
419 "%d stripes\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
422 /* If we added a target we have to reconnect the llogs */
423 /* We only _need_ to do this at first add (idx), or the first time
424 after recovery. However, it should now be safe to call anytime. */
425 rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
427 /*XXX this notifies the MDD until lov handling use old mds code */
428 if (obd->obd_upcall.onu_owner) {
429 LASSERT(obd->obd_upcall.onu_upcall != NULL);
430 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
431 obd->obd_upcall.onu_owner);
434 OBD_FREE(ld, sizeof(*ld));
/* Index value meaning "unknown": the index is then requested from the LOV. */
439 #define MDSLOV_NO_INDEX -1
441 /* Inform MDS about new/updated target */
/*
 * Runs with obd->obd_dev_sem held from mutex_down() to mutex_up().
 * Refreshes the lov descriptor; then, with a check on obd_recovering in
 * between, resolves @idx (asking the LOV when it is MDSLOV_NO_INDEX) and
 * rejects an index that is not below ld_tgt_count with -EINVAL.  For a
 * zero objid slot the last id is fetched through mds_lov_get_objid(); the
 * stored id is pushed to the OSC with mds_lov_set_one_nextid().
 */
442 static int mds_lov_update_mds(struct obd_device *obd,
443 struct obd_device *watched,
444 __u32 idx, struct obd_uuid *uuid)
446 struct mds_obd *mds = &obd->u.mds;
455 /* Don't let anyone else mess with mds_lov_objids now */
456 mutex_down(&obd->obd_dev_sem);
/* Remember the old target count so the debug line can show old/new. */
458 old_count = mds->mds_lov_desc.ld_tgt_count;
459 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
463 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
464 idx, obd->obd_recovering, obd->obd_async_recov, old_count,
465 mds->mds_lov_desc.ld_tgt_count);
467 /* idx is set as data from lov_notify. */
468 if (obd->obd_recovering)
471 /* mds post recov not know about ost index - ask lov for it */
472 if (idx == MDSLOV_NO_INDEX)
473 idx = mds_lov_get_idx(mds->mds_osc_exp, uuid);
475 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
476 CERROR("index %d > count %d!\n", idx,
477 mds->mds_lov_desc.ld_tgt_count);
478 GOTO(out, rc = -EINVAL);
/* Locate the objid slot for this OST index. */
481 page = idx / OBJID_PER_PAGE();
482 off = idx % OBJID_PER_PAGE();
483 data = mds->mds_lov_page_array[page];
484 CDEBUG(D_CONFIG, "idx %d - %p - %d/%d\n", idx, data, page, off);
486 if (data[off] == 0) {
487 rc = mds_lov_get_objid(obd, watched->obd_self_export, idx);
489 /* We have read this lastid from disk; tell the osc.
490 Don't call this during recovery. */
491 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
493 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
494 /* Don't abort the rest of the sync */
498 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
501 mutex_up(&obd->obd_dev_sem);
505 /* update the LOV-OSC knowledge of the last used object id's */
/*
 * Connect the MDS to the LOV named lov_name and prime the objid state.
 *
 * An earlier failure is remembered as an ERR_PTR in mds_osc_obd and its
 * code is returned again.  Otherwise the LOV obd is looked up by name,
 * connected with the feature flags set below, and this obd registers as
 * its observer.  New client connections are then denied (obd_no_conn = 1)
 * and, under obd_dev_sem, the saved objids are read, the lov descriptor is
 * refreshed, ids are fetched for targets beyond mds_lov_objid_count and
 * the objids are written back.
 *
 * The error handling at the end unregisters the observer, disconnects the
 * export and stores ERR_PTR(rc) in mds_osc_obd.
 */
506 int mds_lov_connect(struct obd_device *obd, char * lov_name)
508 struct mds_obd *mds = &obd->u.mds;
509 struct lustre_handle conn = {0,};
510 struct obd_connect_data *data;
514 if (IS_ERR(mds->mds_osc_obd))
515 RETURN(PTR_ERR(mds->mds_osc_obd));
517 if (mds->mds_osc_obd)
520 mds->mds_osc_obd = class_name2obd(lov_name);
521 if (!mds->mds_osc_obd) {
522 CERROR("MDS cannot locate LOV %s\n", lov_name);
523 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
/* Connect data is only needed for the obd_connect() call and is freed
   right after it. */
527 OBD_ALLOC(data, sizeof(*data));
530 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
531 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
532 OBD_CONNECT_OSS_CAPA;
533 #ifdef HAVE_LRU_RESIZE_SUPPORT
534 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
536 data->ocd_version = LUSTRE_VERSION_CODE;
537 data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0;
538 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
539 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
540 OBD_FREE(data, sizeof(*data));
542 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
543 mds->mds_osc_obd = ERR_PTR(rc);
546 mds->mds_osc_exp = class_conn2export(&conn);
548 rc = obd_register_observer(mds->mds_osc_obd, obd);
550 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
552 GOTO(err_discon, rc);
555 /* Deny new client connections until we are sure we have some OSTs */
556 obd->obd_no_conn = 1;
558 mutex_down(&obd->obd_dev_sem);
559 rc = mds_lov_read_objids(obd);
561 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
565 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
569 /* If we're mounting this code for the first time on an existing FS,
570 * we need to populate the objids array from the real OST values */
571 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
572 __u32 i = mds->mds_lov_objid_count;
/* NOTE(review): the bound is inclusive (i <= ld_tgt_count), so index
   ld_tgt_count itself is queried as well -- confirm this is intended. */
573 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
574 rc = mds_lov_get_objid(obd, mds->mds_osc_exp, i);
579 rc = mds_lov_write_objids(obd);
581 CERROR("got last objids from OSTs, but error "
582 "in update objids file: %d\n", rc);
584 mutex_up(&obd->obd_dev_sem);
586 /* I want to see a callback happen when the OBD moves to a
587 * "For General Use" state, and that's when we'll call
588 * set_nextid(). The class driver can help us here, because
589 * it can use the obd_recovering flag to determine when the
590 * the OBD is full available. */
591 /* MDD device will care about that
592 if (!obd->obd_recovering)
593 rc = mds_postrecov(obd);
598 mutex_up(&obd->obd_dev_sem);
599 obd_register_observer(mds->mds_osc_obd, NULL);
601 obd_disconnect(mds->mds_osc_exp);
602 mds->mds_osc_exp = NULL;
603 mds->mds_osc_obd = ERR_PTR(rc);
/*
 * Stop observing the LOV and set obd_force on it.  This only happens when
 * mds_osc_obd is not an error pointer and mds_osc_exp is set; the export
 * itself is not disconnected here (see the comment in the body).
 */
607 int mds_lov_disconnect(struct obd_device *obd)
609 struct mds_obd *mds = &obd->u.mds;
613 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
614 obd_register_observer(mds->mds_osc_obd, NULL);
616 /* The actual disconnect of the mds_lov will be called from
617 * class_disconnect_exports from mds_lov_clean. So we have to
618 * ensure that class_cleanup doesn't fail due to the extra ref
619 * we're holding now. The mechanism to do that already exists -
620 * the obd_force flag. We'll drop the final ref to the
621 * mds_osc_exp in mds_cleanup. */
622 mds->mds_osc_obd->obd_force = 1;
/*
 * ioctl dispatcher for the MDS device behind @exp.  @karg is used as a
 * struct obd_ioctl_data.  Operations that touch the backing filesystem are
 * bracketed with push_ctxt() and pop_ctxt() on an lvfs context.
 *
 * Commands visible here: config-log recording (OBD_IOC_RECORD,
 * OBD_IOC_ENDRECORD, OBD_IOC_DORECORD, OBD_IOC_CLEAR_LOG), log parsing and
 * dumping (OBD_IOC_PARSE, OBD_IOC_DUMP_LOG), a filesystem sync,
 * OBD_IOC_SET_READONLY, OBD_IOC_CATLOGLIST, the OBD_IOC_LLOG_ family and
 * OBD_IOC_ABORT_RECOVERY.  An unknown command is logged at D_INFO.
 */
628 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
629 void *karg, void *uarg)
631 static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
632 struct obd_device *obd = exp->exp_obd;
633 struct mds_obd *mds = &obd->u.mds;
634 struct obd_ioctl_data *data = karg;
635 struct lvfs_run_ctxt saved;
639 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
/* Start recording: create the config llog named by ioc_inlbuf1 into
   mds_cfg_llh and initialize its handle. */
642 case OBD_IOC_RECORD: {
643 char *name = data->ioc_inlbuf1;
644 if (mds->mds_cfg_llh)
647 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
648 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
649 &mds->mds_cfg_llh, NULL, name);
651 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
654 mds->mds_cfg_llh = NULL;
655 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Stop recording: close the open config llog and forget the handle. */
660 case OBD_IOC_ENDRECORD: {
661 if (!mds->mds_cfg_llh)
664 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
665 rc = llog_close(mds->mds_cfg_llh);
666 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
668 mds->mds_cfg_llh = NULL;
/* Open the config llog named by ioc_inlbuf1, then destroy it and free the
   handle. */
672 case OBD_IOC_CLEAR_LOG: {
673 char *name = data->ioc_inlbuf1;
674 if (mds->mds_cfg_llh)
677 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
678 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
679 &mds->mds_cfg_llh, NULL, name);
681 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
684 rc = llog_destroy(mds->mds_cfg_llh);
685 llog_free_handle(mds->mds_cfg_llh);
687 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
689 mds->mds_cfg_llh = NULL;
/* Append one record to the open config llog.  The payload of ioc_plen1
   bytes is copied from user space (ioc_pbuf1) into a temporary buffer that
   is freed on both the copy-failure and the normal path. */
693 case OBD_IOC_DORECORD: {
695 struct llog_rec_hdr rec;
696 if (!mds->mds_cfg_llh)
699 rec.lrh_len = llog_data_len(data->ioc_plen1);
701 if (data->ioc_type == LUSTRE_CFG_TYPE) {
702 rec.lrh_type = OBD_CFG_REC;
704 CERROR("unknown cfg record type:%d \n", data->ioc_type);
708 OBD_ALLOC(cfg_buf, data->ioc_plen1);
711 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
713 OBD_FREE(cfg_buf, data->ioc_plen1);
717 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
718 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
720 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
722 OBD_FREE(cfg_buf, data->ioc_plen1);
/* Parse the config llog named by ioc_inlbuf1. */
726 case OBD_IOC_PARSE: {
727 struct llog_ctxt *ctxt =
728 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
729 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
730 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
731 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Dump the config llog named by ioc_inlbuf1. */
738 case OBD_IOC_DUMP_LOG: {
739 struct llog_ctxt *ctxt =
740 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
741 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
742 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
743 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Sync the backing filesystem (the case label is not visible here). */
751 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
752 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
/* Start and commit a transaction on the root inode, sync, then mark the
   backing device read-only with lvfs_set_rdonly(). */
756 case OBD_IOC_SET_READONLY: {
758 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
759 BDEVNAME_DECLARE_STORAGE(tmp);
760 CERROR("*** setting device %s read-only ***\n",
761 ll_bdevname(obd->u.obt.obt_sb, tmp));
763 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
765 rc = fsfilt_commit(obd, inode, handle, 1);
767 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
768 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
770 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
/* List llog catalogs for the current number of LOV targets. */
774 case OBD_IOC_CATLOGLIST: {
775 int count = mds->mds_lov_desc.ld_tgt_count;
776 rc = llog_catalog_list(obd, count, data);
/* llog maintenance: the llogs are finished before llog_ioctl() runs and
   re-initialized afterwards, then the MDS group is sent to the OSC again
   under KEY_MDS_CONN.
   NOTE(review): the key length here is strlen(KEY_MDS_CONN) while
   __mds_lov_synchronize() passes sizeof(KEY_MDS_CONN) -- confirm which
   form the receiver expects. */
780 case OBD_IOC_LLOG_CHECK:
781 case OBD_IOC_LLOG_CANCEL:
782 case OBD_IOC_LLOG_REMOVE: {
783 struct llog_ctxt *ctxt =
784 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
788 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
789 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
790 rc = llog_ioctl(ctxt, cmd, data);
791 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
792 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
793 group = FILTER_GROUP_MDS0 + mds->mds_id;
794 rc2 = obd_set_info_async(mds->mds_osc_exp,
795 strlen(KEY_MDS_CONN), KEY_MDS_CONN,
796 sizeof(group), &group, NULL);
/* Read-only llog queries: handed straight to llog_ioctl(). */
801 case OBD_IOC_LLOG_INFO:
802 case OBD_IOC_LLOG_PRINT: {
803 struct llog_ctxt *ctxt =
804 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
806 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
807 rc = llog_ioctl(ctxt, cmd, data);
808 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
/* Abort recovery by stopping the recovery thread. */
813 case OBD_IOC_ABORT_RECOVERY:
814 CERROR("aborting recovery for device %s\n", obd->obd_name);
815 target_stop_recovery_thread(obd);
819 CDEBUG(D_INFO, "unknown command %x\n", cmd);
826 /* Collect the preconditions we need to allow client connects */
/*
 * @flag is a mask tested against CONFIG_LOG and CONFIG_SYNC; each bit
 * latches the matching flag (mds_fl_cfglog, mds_fl_synced).  Client
 * connections are enabled (obd_no_conn = 0) as soon as mds_fl_cfglog is
 * set; the additional mds_fl_synced condition is commented out (bz11778).
 */
827 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
829 if (flag & CONFIG_LOG)
830 obd->u.mds.mds_fl_cfglog = 1;
831 if (flag & CONFIG_SYNC)
832 obd->u.mds.mds_fl_synced = 1;
833 if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
834 /* Open for clients */
835 obd->obd_no_conn = 0;
/*
 * Argument bundle for one target synchronization.  It is allocated and
 * filled in mds_lov_start_synchronize() and freed by
 * __mds_lov_synchronize() after the fields have been copied out.
 */
838 struct mds_lov_sync_info {
839 struct obd_device *mlsi_obd; /* the lov device to sync */
840 struct obd_device *mlsi_watched; /* target osc */
841 __u32 mlsi_index; /* index of target */
/*
 * Send the two capability keys, mds_capa_keys[0] and mds_capa_keys[1], to
 * mds_osc_exp with obd_set_info_async(KEY_CAPA_KEY).  mds_capa_keys is
 * checked for NULL first, and a failed propagation is logged with
 * DEBUG_CAPA_KEY at D_ERROR.
 */
844 static int mds_propagate_capa_keys(struct mds_obd *mds)
846 struct lustre_capa_key *key;
851 if (!mds->mds_capa_keys)
854 for (i = 0; i < 2; i++) {
855 key = &mds->mds_capa_keys[i];
856 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
858 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
859 KEY_CAPA_KEY, sizeof(*key), key, NULL);
861 DEBUG_CAPA_KEY(D_ERROR, key,
862 "propagate failed (rc = %d) for", rc);
870 /* We only sync one osc at a time, so that we don't have to hold
871 any kind of lock on the whole mds_lov_desc, which may change
872 (grow) as a result of mds_lov_add_ost. This also avoids any
873 kind of mismatch between the lov_desc and the mds_lov_desc,
874 which are not in lock-step during lov_add_obd */
/*
 * Worker body for one target.  @data is a struct mds_lov_sync_info; its
 * fields are copied into locals and the structure is freed here.
 *
 * Steps, in the order they appear: mds_lov_update_mds() for the target,
 * announce the MDS group to the OSC under KEY_MDS_CONN, propagate the
 * capability keys, llog_connect(), mds_lov_clear_orphans() (not attempted
 * when the obd is stopping: -ENODEV), and finally the registered upcall if
 * there is one.
 *
 * The failure handling at the end logs the error and reports the target
 * inactive through obd_notify(), unless the obd, the LOV obd or the
 * watched obd is stopping.
 */
875 static int __mds_lov_synchronize(void *data)
877 struct mds_lov_sync_info *mlsi = data;
878 struct obd_device *obd = mlsi->mlsi_obd;
879 struct obd_device *watched = mlsi->mlsi_watched;
880 struct mds_obd *mds = &obd->u.mds;
881 struct obd_uuid *uuid;
882 __u32 idx = mlsi->mlsi_index;
883 struct mds_group_info mgi;
/* Everything needed was copied into locals above. */
887 OBD_FREE(mlsi, sizeof(*mlsi));
891 uuid = &watched->u.cli.cl_target_uuid;
894 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
896 rc = mds_lov_update_mds(obd, watched, idx, uuid);
898 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
901 mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
904 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
905 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
909 /* propagate capability keys */
910 rc = mds_propagate_capa_keys(mds);
914 rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
915 mds->mds_lov_desc.ld_tgt_count,
919 CERROR("%s failed at llog_origin_connect: %d\n",
920 obd_uuid2str(uuid), rc);
924 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
925 obd->obd_name, obd_uuid2str(uuid));
927 * FIXME: this obd_stopping was useless,
928 * since obd in mdt layer was set
930 if (obd->obd_stopping)
931 GOTO(out, rc = -ENODEV);
933 rc = mds_lov_clear_orphans(mds, uuid);
935 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
936 obd_uuid2str(uuid), rc);
940 if (obd->obd_upcall.onu_owner) {
942 * This is a hack for mds_notify->mdd_notify. When the mds obd
943 * in mdd is removed, This hack should be removed.
945 LASSERT(obd->obd_upcall.onu_upcall != NULL);
946 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
947 obd->obd_upcall.onu_owner);
952 /* Deactivate it for safety */
953 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
955 if (!obd->obd_stopping && mds->mds_osc_obd &&
956 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
957 obd_notify(mds->mds_osc_obd, watched,
958 OBD_NOTIFY_INACTIVE, NULL);
/*
 * Thread entry point for a background synchronization.  Builds the thread
 * name ("ll_sync_tgt" when the target index is MDSLOV_NO_INDEX, otherwise
 * "ll_sync_" followed by the two-digit index), daemonizes under that name
 * and then runs __mds_lov_synchronize() on @data.
 *
 * Both branches format the name with snprintf() bounded by sizeof(name),
 * so the name buffer cannot be overrun.
 */
965 int mds_lov_synchronize(void *data)
967 struct mds_lov_sync_info *mlsi = data;
970 if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
971 /* There is still a watched target,
972 but we don't know its index */
973 snprintf(name, sizeof(name), "ll_sync_tgt");
975 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
976 ptlrpc_daemonize(name);
978 RETURN(__mds_lov_synchronize(data));
/*
 * Package @obd, @watched and the target index into a newly allocated
 * struct mds_lov_sync_info and run the synchronization, either in a new
 * kernel thread (mds_lov_synchronize) or directly in the caller through
 * __mds_lov_synchronize() -- presumably selected by @nonblock; the test
 * itself is not visible here.
 *
 * The index is read from @data as a __u32, or set to MDSLOV_NO_INDEX.
 */
981 int mds_lov_start_synchronize(struct obd_device *obd,
982 struct obd_device *watched,
983 void *data, int nonblock)
985 struct mds_lov_sync_info *mlsi;
992 OBD_ALLOC(mlsi, sizeof(*mlsi));
996 mlsi->mlsi_obd = obd;
997 mlsi->mlsi_watched = watched;
999 mlsi->mlsi_index = *(__u32 *)data;
1001 mlsi->mlsi_index = MDSLOV_NO_INDEX;
1003 /* Although class_export_get(obd->obd_self_export) would lock
1004 the MDS in place, since it's only a self-export
1005 it doesn't lock the LOV in place. The LOV can be disconnected
1006 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
1007 Simply taking an export ref on the LOV doesn't help, because it's
1008 still disconnected. Taking an obd reference insures that we don't
1009 disconnect the LOV. This of course means a cleanup won't
1010 finish for as long as the sync is blocking. */
1014 /* Synchronize in the background */
1015 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
1016 CLONE_VM | CLONE_FILES);
1018 CERROR("%s: error starting mds_lov_synchronize: %d\n",
1022 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
1023 "thread=%d\n", obd->obd_name,
1024 mlsi->mlsi_index, rc);
/* Synchronous variant: run the worker in the calling context. */
1028 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Observer callback for events on @watched.
 *
 * OBD_NOTIFY_CONFIG carries a flag mask in @data by value (not as a
 * pointer to it) and hands it to mds_allow_cli().  For the
 * ACTIVE/SYNC/SYNC_NONBLOCK events the watched device must be of type
 * LUSTRE_OSC_NAME; anything else is reported with CERROR.  While the obd
 * is recovering, only the lov descriptor and the llog catalogs are
 * refreshed and CONFIG_SYNC is recorded; otherwise a target
 * synchronization is started (blocking only for OBD_NOTIFY_SYNC),
 * followed by lquota_recovery().
 */
1034 int mds_notify(struct obd_device *obd, struct obd_device *watched,
1035 enum obd_notify_event ev, void *data)
1041 /* We only handle these: */
1042 case OBD_NOTIFY_ACTIVE:
1043 case OBD_NOTIFY_SYNC:
1044 case OBD_NOTIFY_SYNC_NONBLOCK:
1046 case OBD_NOTIFY_CONFIG:
/* Go through unsigned long first: casting a pointer straight to a narrower
   integer type is not portable to 64-bit builds. */
1047 mds_allow_cli(obd, (unsigned int)(unsigned long)data);
1052 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
1053 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1054 CERROR("unexpected notification of %s %s!\n",
1055 watched->obd_type->typ_name, watched->obd_name);
1059 if (obd->obd_recovering) {
1060 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1062 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1063 /* We still have to fix the lov descriptor for ost's added
1064 after the mdt in the config log. They didn't make it into
1066 mutex_down(&obd->obd_dev_sem);
1067 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
1069 mutex_up(&obd->obd_dev_sem);
1072 /* We should update init llog here too for replay unlink and
1073 * possiable llog init race when recovery complete */
1074 llog_cat_initialize(obd, NULL,
1075 obd->u.mds.mds_lov_desc.ld_tgt_count,
1076 &watched->u.cli.cl_target_uuid);
1077 mutex_up(&obd->obd_dev_sem);
1078 mds_allow_cli(obd, CONFIG_SYNC);
1082 LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
1083 rc = mds_lov_start_synchronize(obd, watched, data,
1084 !(ev == OBD_NOTIFY_SYNC));
1086 lquota_recovery(mds_quota_interface_ref, obd);
1091 /* Convert the on-disk LOV EA structure.
1092 * We always try to convert from an old LOV EA format to the common in-memory
1093 * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1094 * then convert back to the new on-disk format and save it back to disk
1095 * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1096 * to convert it each time this inode is accessed.
1098 * This function is a bit interesting in the error handling. We can safely
1099 * ship the old lmm to the client in case of failure, since it uses the same
1100 * obd_unpackmd() code and can do the conversion if the MDS fails for some
1101 * reason. We will not delete the old lmm data until we have written the
1102 * new format lmm data in fsfilt_set_md(). */
1103 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1104 struct lov_mds_md *lmm, int lmm_size)
1106 struct lov_stripe_md *lsm = NULL;
/* lmm_magic is stored little-endian, so it must be converted with
   le32_to_cpu() before it is compared with either magic constant. */
1111 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
1112 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
1115 CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1116 inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
1119 rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
1123 rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
1125 GOTO(conv_free, rc);
/* Store the repacked EA inside a SETATTR transaction. */
1128 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1129 if (IS_ERR(handle)) {
1130 rc = PTR_ERR(handle);
1131 GOTO(conv_free, rc);
1134 rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
1136 err = fsfilt_commit(obd, inode, handle, 0);
/* A commit error wins; otherwise the result is the EA size. */
1138 rc = err ? err : lmm_size;
1139 GOTO(conv_free, rc);
1141 obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);