1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
42 #include "mds_internal.h"
/*
 * Debug helper: log, at D_INFO level, the words of the dirty-page bitmap
 * and the objid values stored in mds->mds_lov_page_array for the MDS
 * behind @obd.  @label is printed first so the caller can be identified
 * (this file passes "read" and "write").
 *
 * An error is logged when the bitmap pointer or the page array pointer is
 * NULL.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps), so the local declarations, the early returns and any
 * per-entry skip conditions are not visible here.
 */
44 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
46 struct mds_obd *mds = &obd->u.mds;
49 CDEBUG(D_INFO, "dump from %s\n", label);
50 if (mds->mds_lov_page_dirty == NULL) {
51 CERROR("NULL bitmap!\n");
/* Prints size/BITS_PER_LONG + 1 words of the bitmap.
 * NOTE(review): when size is an exact multiple of BITS_PER_LONG this reads
 * one word more than the bits require; confirm the bitmap allocation
 * covers it. */
55 for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
56 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
58 if (mds->mds_lov_page_array == NULL) {
59 CERROR("not init page array!\n");
/* Walk every page slot and every objid index within a page. */
63 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
64 obd_id *data = mds->mds_lov_page_array[i];
69 for(j=0; j < OBJID_PER_PAGE(); j++) {
72 CDEBUG(D_INFO,"objid page %u idx %u - %llu ",i,j,data[j]);
/*
 * Set up the in-memory objid bookkeeping for the MDS behind @obd and open
 * the LOV_OBJID file.
 *
 * Visible steps: allocate the dirty-page bitmap with
 * MDS_LOV_OBJID_PAGES_COUNT bits, allocate the array of
 * MDS_LOV_OBJID_PAGES_COUNT page pointers, open LOV_OBJID with
 * O_RDWR | O_CREAT (mode 0644), check that it is a regular file and store
 * the file pointer in mds->mds_lov_objid_filp.
 *
 * Error codes visible here: -ENOMEM when the page array cannot be
 * allocated, PTR_ERR(file) when the open fails, -ENOENT when the file is
 * not a regular file.  The cleanup tail closes the file, frees the page
 * array and frees the bitmap, in that order.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps), so the declarations, goto labels and return
 * statements are not shown.
 */
79 int mds_lov_init_objids(struct obd_device *obd)
81 struct mds_obd *mds = &obd->u.mds;
82 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* Assert that MDS_LOV_ALLOC_SIZE is a whole multiple of sizeof(obd_id). */
87 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
89 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
90 if (mds->mds_lov_page_dirty == NULL)
94 OBD_ALLOC(mds->mds_lov_page_array, size);
95 if (mds->mds_lov_page_array == NULL)
96 GOTO(err_free_bitmap, rc = -ENOMEM);
98 /* open and test the lov objd file */
99 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
/* NOTE(review): rc is printed by the CERROR below, but the only visible
 * assignment of PTR_ERR(file) to rc is in the GOTO on the following line;
 * confirm that an omitted line sets rc before the message is logged. */
102 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
103 GOTO(err_free, rc = PTR_ERR(file));
105 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
106 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
107 file->f_dentry->d_inode->i_mode);
108 GOTO(err_open, rc = -ENOENT);
110 mds->mds_lov_objid_filp = file;
/* Error unwinding: close the file, then free the array, then the bitmap. */
114 if (filp_close((struct file *)file, 0))
115 CERROR("can't close %s after error\n", LOV_OBJID);
117 OBD_FREE(mds->mds_lov_page_array, size);
119 FREE_BITMAP(mds->mds_lov_page_dirty);
123 EXPORT_SYMBOL(mds_lov_init_objids);
/*
 * Release everything mds_lov_init_objids() and later page allocations
 * attached to the MDS behind @obd: each objid page (MDS_LOV_ALLOC_SIZE
 * bytes), the page pointer array, the open LOV_OBJID file (the pointer is
 * reset to NULL after filp_close()) and finally the dirty-page bitmap.
 *
 * A close failure is only logged; the function returns void.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps); in particular any NULL check around the per-page
 * OBD_FREE and the condition guarding the CERROR are not visible.
 */
125 void mds_lov_destroy_objids(struct obd_device *obd)
127 struct mds_obd *mds = &obd->u.mds;
131 if (mds->mds_lov_page_array != NULL) {
132 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
133 obd_id *data = mds->mds_lov_page_array[i];
135 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
/* The array size here matches the one used by mds_lov_init_objids(). */
137 OBD_FREE(mds->mds_lov_page_array,
138 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
141 if (mds->mds_lov_objid_filp) {
142 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
143 mds->mds_lov_objid_filp = NULL;
145 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
148 FREE_BITMAP(mds->mds_lov_page_dirty);
151 EXPORT_SYMBOL(mds_lov_destroy_objids);
/*
 * Record the object ids referenced by the striping descriptor @lmm in the
 * per-OST objid table of the MDS behind @obd.
 *
 * For every stripe (lmm_stripe_count entries) the OST index and object id
 * are converted from little-endian, the table slot is located as
 * page = index / OBJID_PER_PAGE(), idx = index % OBJID_PER_PAGE(), and
 * when the new id is greater than the stored one the page is marked dirty
 * in mds_lov_page_dirty.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps).  The NULL-lmm early return announced by the comment
 * below and the store of the new id into the slot are presumably on
 * omitted lines; confirm against the full source.
 * NOTE(review): no NULL check of the page pointer is visible before
 * data[idx] is read.
 */
153 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
155 struct mds_obd *mds = &obd->u.mds;
159 /* if we create file without objects - lmm is NULL */
163 for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
164 int i = le32_to_cpu(lmm->lmm_objects[j].l_ost_idx);
165 obd_id id = le64_to_cpu(lmm->lmm_objects[j].l_object_id);
166 int page = i / OBJID_PER_PAGE();
167 int idx = i % OBJID_PER_PAGE();
168 obd_id *data = mds->mds_lov_page_array[page];
170 CDEBUG(D_INODE,"update last object for ost %d - new %llu"
171 " old %llu\n", i, id, data[idx]);
/* Only a larger id than the one already recorded dirties the page. */
172 if (id > data[idx]) {
174 bitmap_set(mds->mds_lov_page_dirty, page);
179 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Load the contents of the LOV_OBJID file (mds->mds_lov_objid_filp) into
 * the page array of the MDS behind @obd.
 *
 * The number of pages to read is derived from the inode size as
 * size / (OBJID_PER_PAGE() * sizeof(obd_id)) + 1.  Each page slot is
 * asserted to be empty, allocated with MDS_LOV_ALLOC_SIZE bytes, stored in
 * the array and filled through fsfilt_read_record().  The running count of
 * ids is then stored in mds_lov_objid_count and split into
 * mds_lov_objid_lastpage / mds_lov_objid_lastidx, and the table is dumped
 * for debugging.
 *
 * -ENOMEM is returned through the "out" path when a page allocation fails.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps); the declarations of size/off, the checks following
 * the allocation and the read, and the return statement are not visible.
 * NOTE(review): because of the "+1", a file whose size is an exact multiple
 * of the page payload still gets one further page allocated; confirm this
 * is intended.
 */
181 static int mds_lov_read_objids(struct obd_device *obd)
183 struct mds_obd *mds = &obd->u.mds;
185 int i, rc, count = 0, page = 0;
189 /* Read everything in the file, even if our current lov desc
190 has fewer targets. Old targets not in the lov descriptor
191 during mds setup may still have valid objids. */
192 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
196 page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
197 CDEBUG(D_INFO, "file size %d pages %d\n", size, page);
198 for(i=0; i < page; i++) {
199 obd_id *data = mds->mds_lov_page_array[i];
200 loff_t off_old = off;
202 LASSERT(data == NULL);
203 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
205 GOTO(out, rc = -ENOMEM);
207 mds->mds_lov_page_array[i] = data;
209 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
210 OBJID_PER_PAGE()*sizeof(obd_id), &off);
212 CERROR("Error reading objids %d\n", rc);
/* Offset delta of this read, rounded up to whole obd_id entries
 * (presumably fsfilt_read_record() advances off by the bytes read). */
218 count += (off-off_old+sizeof(obd_id)-1)/sizeof(obd_id);
220 mds->mds_lov_objid_count = count;
221 mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
222 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
223 CDEBUG(D_INFO, "Read %u objid\n", count);
225 mds_lov_dump_objids("read",obd);
/*
 * Write the dirty objid pages of the MDS behind @obd back to the LOV_OBJID
 * file.
 *
 * The dirty bitmap is checked for emptiness first.  For every set bit i the
 * page mds_lov_page_array[i] (asserted non-NULL) is written with
 * fsfilt_write_record() at file offset i * page-payload-size, and the bit
 * is cleared afterwards.  For the page equal to mds_lov_objid_lastpage only
 * mds_lov_objid_lastidx entries are written.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps); the return taken when the bitmap is empty, the
 * remaining arguments of fsfilt_write_record(), the handling of its return
 * code and the final return are not visible.
 */
230 int mds_lov_write_objids(struct obd_device *obd)
232 struct mds_obd *mds = &obd->u.mds;
236 if (bitmap_check_empty(mds->mds_lov_page_dirty))
239 mds_lov_dump_objids("write", obd);
241 foreach_bit(mds->mds_lov_page_dirty, i) {
242 obd_id *data = mds->mds_lov_page_array[i];
243 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
/* The offset is computed from the full page size, before size is shortened
 * for the last page below. */
244 loff_t off = i * size;
246 LASSERT(data != NULL);
248 /* check for particaly filled last page */
249 if (i == mds->mds_lov_objid_lastpage) {
250 size = mds->mds_lov_objid_lastidx * sizeof(obd_id);
253 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
257 bitmap_clear(mds->mds_lov_page_dirty, i);
264 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Make sure the objid table of the MDS behind @obd has a value for target
 * index idx, asking the device reached through @export when the slot is
 * still zero.
 *
 * The slot is located as page = idx / OBJID_PER_PAGE(),
 * off = idx % OBJID_PER_PAGE().  A page of MDS_LOV_ALLOC_SIZE bytes is
 * allocated and installed in the page array (-ENOMEM on failure).  When
 * data[off] is 0 the "last_id" key is queried with obd_get_info(), passing
 * an obd_id_info whose data pointer refers to the slot itself.  When idx is
 * greater than mds_lov_objid_count the count / lastpage / lastidx fields
 * are advanced, and the page is marked dirty.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps); the rest of the parameter list, the condition under
 * which the page is allocated, the other obd_id_info fields, the error
 * check after obd_get_info() and the exact nesting of the bookkeeping
 * update are not visible.
 */
266 static int mds_lov_get_objid(struct obd_device * obd, struct obd_export *export,
269 struct mds_obd *mds = &obd->u.mds;
276 page = idx / OBJID_PER_PAGE();
277 off = idx % OBJID_PER_PAGE();
278 data = mds->mds_lov_page_array[page];
280 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
282 GOTO(out, rc = -ENOMEM);
284 mds->mds_lov_page_array[page] = data;
287 if (data[off] == 0) {
288 /* We never read this lastid; ask the osc */
289 struct obd_id_info lastid;
290 __u32 size = sizeof(lastid);
293 lastid.data = &data[off];
294 rc = obd_get_info(export, sizeof("last_id"),
295 "last_id", &size, &lastid);
/* NOTE(review): the count field is set to the index itself, not idx + 1;
 * confirm this matches how mds_lov_objid_count is used elsewhere. */
299 if (idx > mds->mds_lov_objid_count) {
300 mds->mds_lov_objid_count = idx;
301 mds->mds_lov_objid_lastpage = page;
302 mds->mds_lov_objid_lastidx = off;
304 bitmap_set(mds->mds_lov_page_dirty, page);
/*
 * Issue an orphan-cleanup create request through mds->mds_osc_exp.
 *
 * A zeroed obdo is prepared with o_flags = OBD_FL_DELORPHAN and
 * o_gr = FILTER_GROUP_MDS0 + mds->mds_id (o_valid advertises flags and
 * group).  When @ost_uuid is not NULL it is copied into o_inline and
 * OBD_MD_FLINLINE is added to o_valid.  The request is sent with
 * obd_create() using an empty stripe md pointer and a zeroed
 * obd_trans_info.
 *
 * The page array is asserted to be initialised before the call.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps); the declaration of oa and rc, any cleanup after
 * obd_create() and the return statement are not visible.
 */
310 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
314 struct obd_trans_info oti = {0};
315 struct lov_stripe_md *empty_ea = NULL;
318 LASSERT(mds->mds_lov_page_array != NULL);
320 /* This create will in fact either create or destroy: If the OST is
321 * missing objects below this ID, they will be created. If it finds
322 * objects above this ID, they will be removed. */
323 memset(&oa, 0, sizeof(oa));
324 oa.o_flags = OBD_FL_DELORPHAN;
325 oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
326 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
/* A specific OST is named by copying its uuid into the inline area. */
327 if (ost_uuid != NULL) {
328 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
329 oa.o_valid |= OBD_MD_FLINLINE;
331 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
335 /* update the LOV-OSC knowledge of the last used object id's */
336 /* for all targets */
337 /* Do we really need this? All OSCs should be passed via __mds_lov_synchronize
/* Number of struct obd_id_info entries that fit in one CFS_PAGE_SIZE buffer. */
339 #define MDS_LOV_SETID_COUNT (CFS_PAGE_SIZE / sizeof(struct obd_id_info))
/*
 * Send the recorded object ids of all targets (0 .. ld_tgt_count - 1 of
 * mds->mds_lov_desc) to mds->mds_osc_exp under the KEY_NEXT_ID key.
 *
 * A CFS_PAGE_SIZE buffer of obd_id_info entries is allocated; the targets
 * are processed in batches of at most MDS_LOV_SETID_COUNT, each entry's
 * data pointer referring to the target's slot in the objid page array, and
 * every batch is followed by an obd_set_info_async() call.  The buffer is
 * freed at the end.
 *
 * Preconditions asserted here: the device is not recovering and
 * obd->obd_dev_sem is held.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps); the allocation check, the remaining obd_id_info
 * fields, the break out of the inner loop, the error check around the
 * CERROR and the return statement are not visible.
 * NOTE(review): info is a pointer, so sizeof(info) is the size of a pointer
 * and &info is the address of that pointer rather than of the filled
 * buffer; confirm this is what the KEY_NEXT_ID handler expects.
 */
341 int mds_lov_set_nextid(struct obd_device *obd)
343 struct mds_obd *mds = &obd->u.mds;
344 int i = 0, j, rc = 0;
345 struct obd_id_info *info;
348 LASSERT(!obd->obd_recovering);
350 /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
351 LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
353 OBD_ALLOC(info, CFS_PAGE_SIZE);
/* i runs over all targets; j indexes the current batch within info[]. */
357 while(i < mds->mds_lov_desc.ld_tgt_count) {
358 for(j=0; j < MDS_LOV_SETID_COUNT; i++, j++) {
359 int page = i / OBJID_PER_PAGE();
360 int idx = i % OBJID_PER_PAGE();
361 obd_id *data = mds->mds_lov_page_array[page];
363 if (i == mds->mds_lov_desc.ld_tgt_count)
367 info[j].data = &data[idx];
370 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
371 KEY_NEXT_ID, sizeof(info), &info, NULL);
373 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
378 OBD_FREE(info, CFS_PAGE_SIZE);
/*
 * Single-target variant of mds_lov_set_nextid(): send one obd_id_info for
 * target index @idx to mds->mds_osc_exp under the KEY_NEXT_ID key.
 *
 * Preconditions asserted here: the device is not recovering and
 * obd->obd_dev_sem is held.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps); the lines that fill info (presumably from @idx and
 * @id), the error check around the CERROR and the return statement are not
 * visible.
 */
385 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
387 struct mds_obd *mds = &obd->u.mds;
389 struct obd_id_info info;
392 LASSERT(!obd->obd_recovering);
394 /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
395 LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
/* info is a struct here, so sizeof(info) / &info describe the record itself. */
400 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
401 KEY_NEXT_ID, sizeof(info), &info, NULL);
403 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
/*
 * Refresh mds->mds_lov_desc from the LOV reached through @lov and update
 * everything derived from the target count.
 *
 * Visible steps: allocate a temporary descriptor, fetch KEY_LOVDESC with
 * obd_get_info(), make sure the objid page that holds index ld_tgt_count
 * exists (allocating MDS_LOV_ALLOC_SIZE bytes, -ENOMEM on failure), copy
 * the descriptor into mds->mds_lov_desc, recompute mds_max_mdsize and
 * mds_max_cookiesize for min(LOV_MAX_STRIPE_COUNT, ld_tgt_count) stripes,
 * call llog_cat_initialize() for the new target count, invoke the
 * registered upcall when obd_upcall.onu_owner is set, and free the
 * temporary descriptor.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps, and one comment below has lost its terminator); the
 * declarations, the checks after the allocations and after obd_get_info(),
 * the "out" label and the return statement are not visible.
 * NOTE(review): no bound check of the computed page index against
 * MDS_LOV_OBJID_PAGES_COUNT is visible.
 * NOTE(review): in the "updated max_mdsize/max_cookiesize" debug message
 * the first %d is labelled as the stripe count, but the first visible
 * arguments are mds_max_mdsize and mds_max_cookiesize; the argument order
 * looks mismatched with the format string. Confirm against the full source.
 */
409 /* Update the lov desc for a new size lov. */
410 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
412 struct mds_obd *mds = &obd->u.mds;
414 __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
418 OBD_ALLOC(ld, sizeof(*ld));
422 rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
427 /* The size of the LOV target table may have increased. */
428 page = ld->ld_tgt_count / OBJID_PER_PAGE();
429 if (mds->mds_lov_page_array[page] == NULL) {
432 OBD_ALLOC(ids, MDS_LOV_ALLOC_SIZE);
434 GOTO(out, rc = -ENOMEM);
436 mds->mds_lov_page_array[page] = ids;
439 /* Don't change the mds_lov_desc until the objids size matches the
441 mds->mds_lov_desc = *ld;
442 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
443 mds->mds_lov_desc.ld_tgt_count);
445 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
446 mds->mds_lov_desc.ld_tgt_count);
448 mds->mds_max_mdsize = lov_mds_md_size(stripes);
449 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
450 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
451 "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
454 /* If we added a target we have to reconnect the llogs */
455 /* We only _need_ to do this at first add (idx), or the first time
456 after recovery. However, it should now be safe to call anytime. */
457 rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
459 /*XXX this notifies the MDD until lov handling use old mds code */
460 if (obd->obd_upcall.onu_owner) {
461 LASSERT(obd->obd_upcall.onu_upcall != NULL);
462 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
463 obd->obd_upcall.onu_owner);
466 OBD_FREE(ld, sizeof(*ld));
/* Sentinel stored in an index field when the target index is unknown. */
471 #define MDSLOV_NO_INDEX -1
/*
 * Bring the MDS state up to date for target @idx after a notification
 * about @watched.
 *
 * Under obd->obd_dev_sem the lov descriptor is refreshed with
 * mds_lov_update_desc().  Processing stops early when @idx equals
 * MDSLOV_NO_INDEX or the device is recovering.  An index that is not below
 * ld_tgt_count yields -EINVAL.  Otherwise the objid slot of @idx is looked
 * up: when it is still 0 the id is fetched with mds_lov_get_objid() through
 * watched->obd_self_export; in the other branch the id known here is
 * pushed to the target with mds_lov_set_one_nextid().  A failure to set the
 * next id is logged.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps); the declarations, the error checks after
 * mds_lov_update_desc(), the "else" separating the two branches, the "out"
 * label and the return statement are not visible.
 * NOTE(review): no NULL check of the page pointer is visible before
 * data[off] is read.
 */
473 /* Inform MDS about new/updated target */
474 static int mds_lov_update_mds(struct obd_device *obd,
475 struct obd_device *watched,
476 __u32 idx, struct obd_uuid *uuid)
478 struct mds_obd *mds = &obd->u.mds;
487 /* Don't let anyone else mess with mds_lov_objids now */
488 mutex_down(&obd->obd_dev_sem);
490 old_count = mds->mds_lov_desc.ld_tgt_count;
491 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
495 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
496 idx, obd->obd_recovering, obd->obd_async_recov, old_count,
497 mds->mds_lov_desc.ld_tgt_count);
499 /* idx is set as data from lov_notify. */
/* idx is a __u32, so the comparison with MDSLOV_NO_INDEX (-1) relies on the
 * usual conversion of -1 to the largest unsigned value. */
500 if (idx == MDSLOV_NO_INDEX || obd->obd_recovering)
503 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
504 CERROR("index %d > count %d!\n", idx,
505 mds->mds_lov_desc.ld_tgt_count);
506 GOTO(out, rc = -EINVAL);
509 page = idx / OBJID_PER_PAGE();
510 off = idx % OBJID_PER_PAGE();
511 data = mds->mds_lov_page_array[page];
512 CDEBUG(D_CONFIG, "idx %d - %p - %d/%d\n", idx, data, page, off);
513 if (data[off] == 0) {
514 rc = mds_lov_get_objid(obd, watched->obd_self_export, idx);
516 /* We have read this lastid from disk; tell the osc.
517 Don't call this during recovery. */
518 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
520 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
521 /* Don't abort the rest of the sync */
526 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
529 mutex_up(&obd->obd_dev_sem);
/*
 * Connect the MDS behind @obd to the LOV device named @lov_name and load
 * the objid table.
 *
 * Visible steps: return the stored error when mds_osc_obd already holds an
 * ERR_PTR; look the LOV up by name (ERR_PTR(-ENOTCONN) is stored when it
 * is not found); connect with the flags set below, ocd_version =
 * LUSTRE_VERSION_CODE and ocd_group = mds_id + FILTER_GROUP_MDS0; convert
 * the connection handle into mds_osc_exp; register @obd as observer of the
 * LOV; set obd_no_conn to refuse client connections; then, under
 * obd->obd_dev_sem, read the objid file, refresh the lov descriptor, fetch
 * missing ids from the targets when the descriptor has more targets than
 * the file provided, and write the objid file back.
 *
 * The error tail releases the semaphore, unregisters the observer,
 * disconnects mds_osc_exp and stores ERR_PTR(rc) in mds_osc_obd.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps, and a comment near the end has lost its terminator);
 * declarations, most error checks, the goto labels and the return
 * statements are not visible.
 * NOTE(review): the loop that fetches missing ids uses an inclusive bound
 * (i <= ld_tgt_count), i.e. it also asks for index ld_tgt_count itself;
 * confirm whether that extra index is intended.
 * NOTE(review): the one-line comment directly above the function name
 * mentions updating last used object ids and does not obviously describe a
 * connect routine; it may be a leftover.
 */
533 /* update the LOV-OSC knowledge of the last used object id's */
534 int mds_lov_connect(struct obd_device *obd, char * lov_name)
536 struct mds_obd *mds = &obd->u.mds;
537 struct lustre_handle conn = {0,};
538 struct obd_connect_data *data;
542 if (IS_ERR(mds->mds_osc_obd))
543 RETURN(PTR_ERR(mds->mds_osc_obd));
545 if (mds->mds_osc_obd)
548 mds->mds_osc_obd = class_name2obd(lov_name);
549 if (!mds->mds_osc_obd) {
550 CERROR("MDS cannot locate LOV %s\n", lov_name);
551 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
555 OBD_ALLOC(data, sizeof(*data));
558 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
559 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
560 OBD_CONNECT_OSS_CAPA;
561 #ifdef HAVE_LRU_RESIZE_SUPPORT
562 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
564 data->ocd_version = LUSTRE_VERSION_CODE;
565 data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0;
566 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
567 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
568 OBD_FREE(data, sizeof(*data));
570 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
571 mds->mds_osc_obd = ERR_PTR(rc);
574 mds->mds_osc_exp = class_conn2export(&conn);
576 rc = obd_register_observer(mds->mds_osc_obd, obd);
578 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
580 GOTO(err_discon, rc);
583 /* Deny new client connections until we are sure we have some OSTs */
584 obd->obd_no_conn = 1;
586 mutex_down(&obd->obd_dev_sem);
587 rc = mds_lov_read_objids(obd);
589 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
593 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
597 /* If we're mounting this code for the first time on an existing FS,
598 * we need to populate the objids array from the real OST values */
599 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
600 __u32 i = mds->mds_lov_objid_count;
601 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
602 rc = mds_lov_get_objid(obd, mds->mds_osc_exp, i);
607 rc = mds_lov_write_objids(obd);
609 CERROR("got last objids from OSTs, but error "
610 "in update objids file: %d\n", rc);
612 mutex_up(&obd->obd_dev_sem);
614 /* I want to see a callback happen when the OBD moves to a
615 * "For General Use" state, and that's when we'll call
616 * set_nextid(). The class driver can help us here, because
617 * it can use the obd_recovering flag to determine when the
618 * the OBD is full available. */
619 /* MDD device will care about that
620 if (!obd->obd_recovering)
621 rc = mds_postrecov(obd);
626 mutex_up(&obd->obd_dev_sem);
627 obd_register_observer(mds->mds_osc_obd, NULL);
629 obd_disconnect(mds->mds_osc_exp);
630 mds->mds_osc_exp = NULL;
631 mds->mds_osc_obd = ERR_PTR(rc);
/*
 * Detach the MDS behind @obd from its LOV without dropping the export.
 *
 * When mds_osc_obd is not an error pointer and mds_osc_exp is set, the
 * observer registration on the LOV is removed and obd_force is set on the
 * LOV device; the existing comment below explains why the export itself is
 * released elsewhere.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps); the declarations and the return statement are not
 * visible.
 */
635 int mds_lov_disconnect(struct obd_device *obd)
637 struct mds_obd *mds = &obd->u.mds;
641 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
642 obd_register_observer(mds->mds_osc_obd, NULL);
644 /* The actual disconnect of the mds_lov will be called from
645 * class_disconnect_exports from mds_lov_clean. So we have to
646 * ensure that class_cleanup doesn't fail due to the extra ref
647 * we're holding now. The mechanism to do that already exists -
648 * the obd_force flag. We'll drop the final ref to the
649 * mds_osc_exp in mds_cleanup. */
650 mds->mds_osc_obd->obd_force = 1;
/*
 * ioctl dispatcher for the MDS device behind @exp.
 *
 * @cmd selects the operation, @karg is interpreted as a struct
 * obd_ioctl_data, and the device is taken from exp->exp_obd.  The visible
 * branches handle: config-log recording (OBD_IOC_RECORD, OBD_IOC_ENDRECORD,
 * OBD_IOC_DORECORD), clearing a config log (OBD_IOC_CLEAR_LOG), parsing and
 * dumping a config log (OBD_IOC_PARSE, OBD_IOC_DUMP_LOG), a filesystem
 * sync, switching the backing device read-only (OBD_IOC_SET_READONLY),
 * listing catalogs (OBD_IOC_CATLOGLIST), llog maintenance and inspection
 * (OBD_IOC_LLOG_CHECK / _CANCEL / _REMOVE / _INFO / _PRINT) and aborting
 * recovery (OBD_IOC_ABORT_RECOVERY).  Unknown commands are logged.
 *
 * Operations that touch the llog or the backing filesystem run between
 * push_ctxt() and pop_ctxt() on an lvfs context.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps); the switch statement itself, the rc declarations, the
 * return codes of the individual branches, several error checks and at
 * least one case label are not visible.
 */
656 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
657 void *karg, void *uarg)
659 static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
660 struct obd_device *obd = exp->exp_obd;
661 struct mds_obd *mds = &obd->u.mds;
662 struct obd_ioctl_data *data = karg;
663 struct lvfs_run_ctxt saved;
667 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
/* Start recording: create the config llog named by ioc_inlbuf1 and
 * initialise it as a plain log; mds_cfg_llh holds the open handle. */
670 case OBD_IOC_RECORD: {
671 char *name = data->ioc_inlbuf1;
672 if (mds->mds_cfg_llh)
675 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
676 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
677 &mds->mds_cfg_llh, NULL, name);
679 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
682 mds->mds_cfg_llh = NULL;
683 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Stop recording: close the handle opened by OBD_IOC_RECORD. */
688 case OBD_IOC_ENDRECORD: {
689 if (!mds->mds_cfg_llh)
692 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
693 rc = llog_close(mds->mds_cfg_llh);
694 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
696 mds->mds_cfg_llh = NULL;
/* Clear a config log: open it, destroy it and free the handle. */
700 case OBD_IOC_CLEAR_LOG: {
701 char *name = data->ioc_inlbuf1;
702 if (mds->mds_cfg_llh)
705 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
706 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
707 &mds->mds_cfg_llh, NULL, name);
709 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
712 rc = llog_destroy(mds->mds_cfg_llh);
713 llog_free_handle(mds->mds_cfg_llh);
715 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
717 mds->mds_cfg_llh = NULL;
/* Append one record: copy ioc_plen1 bytes from the user buffer ioc_pbuf1
 * and write them to the open config log as an OBD_CFG_REC record. */
721 case OBD_IOC_DORECORD: {
723 struct llog_rec_hdr rec;
724 if (!mds->mds_cfg_llh)
727 rec.lrh_len = llog_data_len(data->ioc_plen1);
729 if (data->ioc_type == LUSTRE_CFG_TYPE) {
730 rec.lrh_type = OBD_CFG_REC;
732 CERROR("unknown cfg record type:%d \n", data->ioc_type);
736 OBD_ALLOC(cfg_buf, data->ioc_plen1);
739 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
741 OBD_FREE(cfg_buf, data->ioc_plen1);
745 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
746 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
748 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
750 OBD_FREE(cfg_buf, data->ioc_plen1);
/* Parse the config log named by ioc_inlbuf1. */
754 case OBD_IOC_PARSE: {
755 struct llog_ctxt *ctxt =
756 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
757 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
758 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
759 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Dump the config log named by ioc_inlbuf1. */
766 case OBD_IOC_DUMP_LOG: {
767 struct llog_ctxt *ctxt =
768 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
769 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
770 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
771 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Sync the backing filesystem (the case label of this branch is on an
 * omitted line of the listing). */
779 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
780 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
/* Commit a transaction on the root inode, sync, then mark the device
 * read-only. */
784 case OBD_IOC_SET_READONLY: {
786 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
787 BDEVNAME_DECLARE_STORAGE(tmp);
788 CERROR("*** setting device %s read-only ***\n",
789 ll_bdevname(obd->u.obt.obt_sb, tmp));
791 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
793 rc = fsfilt_commit(obd, inode, handle, 1);
795 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
796 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
798 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
/* List the llog catalogs for the current number of targets. */
802 case OBD_IOC_CATLOGLIST: {
803 int count = mds->mds_lov_desc.ld_tgt_count;
804 rc = llog_catalog_list(obd, count, data);
/* llog maintenance: the llogs are shut down, the request is forwarded to
 * llog_ioctl(), the catalogs are re-initialised and the MDS connection
 * key is re-sent to the targets.
 * NOTE(review): the key length is passed as strlen(KEY_MDS_CONN) here,
 * while __mds_lov_synchronize() passes sizeof(KEY_MDS_CONN) for the same
 * key; confirm which length the receiver expects. */
808 case OBD_IOC_LLOG_CHECK:
809 case OBD_IOC_LLOG_CANCEL:
810 case OBD_IOC_LLOG_REMOVE: {
811 struct llog_ctxt *ctxt =
812 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
816 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
817 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
818 rc = llog_ioctl(ctxt, cmd, data);
819 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
820 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
821 group = FILTER_GROUP_MDS0 + mds->mds_id;
822 rc2 = obd_set_info_async(mds->mds_osc_exp,
823 strlen(KEY_MDS_CONN), KEY_MDS_CONN,
824 sizeof(group), &group, NULL);
/* llog inspection: forwarded to llog_ioctl() unchanged. */
829 case OBD_IOC_LLOG_INFO:
830 case OBD_IOC_LLOG_PRINT: {
831 struct llog_ctxt *ctxt =
832 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
834 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
835 rc = llog_ioctl(ctxt, cmd, data);
836 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
841 case OBD_IOC_ABORT_RECOVERY:
842 CERROR("aborting recovery for device %s\n", obd->obd_name);
843 target_stop_recovery_thread(obd);
847 CDEBUG(D_INFO, "unknown command %x\n", cmd);
/*
 * Record which start-up preconditions have been met and open the device to
 * clients once the required ones are satisfied.
 *
 * @flag is a bit mask: CONFIG_LOG sets mds_fl_cfglog, CONFIG_SYNC sets
 * mds_fl_synced.  obd_no_conn is cleared as soon as mds_fl_cfglog is set;
 * the additional mds_fl_synced requirement is commented out in the
 * condition (tagged bz11778).
 */
854 /* Collect the preconditions we need to allow client connects */
855 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
857 if (flag & CONFIG_LOG)
858 obd->u.mds.mds_fl_cfglog = 1;
859 if (flag & CONFIG_SYNC)
860 obd->u.mds.mds_fl_synced = 1;
861 if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
862 /* Open for clients */
863 obd->obd_no_conn = 0;
/*
 * Argument bundle handed to mds_lov_synchronize() /
 * __mds_lov_synchronize().  It is allocated by
 * mds_lov_start_synchronize() and freed by __mds_lov_synchronize().
 *
 * NOTE(review): __mds_lov_synchronize() reads mlsi_obd->u.mds, i.e. it
 * treats mlsi_obd as the MDS device, although the field comment calls it
 * the lov device.
 * mlsi_index is MDSLOV_NO_INDEX when the target index is unknown.
 */
866 struct mds_lov_sync_info {
867 struct obd_device *mlsi_obd; /* the lov device to sync */
868 struct obd_device *mlsi_watched; /* target osc */
869 __u32 mlsi_index; /* index of target */
/*
 * Send the two capability keys stored in mds->mds_capa_keys to
 * mds->mds_osc_exp, one obd_set_info_async() call per key under
 * KEY_CAPA_KEY.  Nothing is sent when mds_capa_keys is NULL.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps); the declarations, the value returned when there are
 * no keys, the condition guarding the failure message and the final return
 * are not visible.
 */
872 static int mds_propagate_capa_keys(struct mds_obd *mds)
874 struct lustre_capa_key *key;
879 if (!mds->mds_capa_keys)
/* Exactly two key slots are propagated. */
882 for (i = 0; i < 2; i++) {
883 key = &mds->mds_capa_keys[i];
884 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
886 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
887 KEY_CAPA_KEY, sizeof(*key), key, NULL);
889 DEBUG_CAPA_KEY(D_ERROR, key,
890 "propagate failed (rc = %d) for", rc);
/*
 * Synchronise the MDS with one target.
 *
 * @data is a struct mds_lov_sync_info; its fields are copied into locals
 * and the structure is freed at the start, so only the locals are used
 * afterwards.  Visible sequence: mds_lov_update_mds() for the target,
 * announce the MDS group to the targets with KEY_MDS_CONN, propagate the
 * capability keys, llog_connect() on the LLOG_MDS_OST_ORIG_CTXT context,
 * print a console message, give up with -ENODEV when the device is
 * stopping, clear orphans on the target with mds_lov_clear_orphans() and
 * finally invoke the registered upcall when obd_upcall.onu_owner is set.
 *
 * On failure the error is logged and, unless one of the involved devices
 * is stopping, the LOV is notified with OBD_NOTIFY_INACTIVE for the
 * watched target.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps, and two comments below have lost their delimiters);
 * the rc checks between the steps, the remaining llog_connect() arguments,
 * the "out" label and the return statement are not visible.
 */
898 /* We only sync one osc at a time, so that we don't have to hold
899 any kind of lock on the whole mds_lov_desc, which may change
900 (grow) as a result of mds_lov_add_ost. This also avoids any
901 kind of mismatch between the lov_desc and the mds_lov_desc,
902 which are not in lock-step during lov_add_obd */
903 static int __mds_lov_synchronize(void *data)
905 struct mds_lov_sync_info *mlsi = data;
906 struct obd_device *obd = mlsi->mlsi_obd;
907 struct obd_device *watched = mlsi->mlsi_watched;
908 struct mds_obd *mds = &obd->u.mds;
909 struct obd_uuid *uuid;
910 __u32 idx = mlsi->mlsi_index;
911 struct mds_group_info mgi;
915 OBD_FREE(mlsi, sizeof(*mlsi));
919 uuid = &watched->u.cli.cl_target_uuid;
922 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
924 rc = mds_lov_update_mds(obd, watched, idx, uuid);
926 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
929 mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
932 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
933 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
937 /* propagate capability keys */
938 rc = mds_propagate_capa_keys(mds);
942 rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
943 mds->mds_lov_desc.ld_tgt_count,
947 CERROR("%s failed at llog_origin_connect: %d\n",
948 obd_uuid2str(uuid), rc);
952 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
953 obd->obd_name, obd_uuid2str(uuid));
955 * FIXME: this obd_stopping was useless,
956 * since obd in mdt layer was set
958 if (obd->obd_stopping)
959 GOTO(out, rc = -ENODEV);
961 rc = mds_lov_clear_orphans(mds, uuid);
963 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
964 obd_uuid2str(uuid), rc);
968 if (obd->obd_upcall.onu_owner) {
970 * This is a hack for mds_notify->mdd_notify. When the mds obd
971 * in mdd is removed, This hack should be removed.
973 LASSERT(obd->obd_upcall.onu_upcall != NULL);
974 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
975 obd->obd_upcall.onu_owner);
980 /* Deactivate it for safety */
981 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
983 if (!obd->obd_stopping && mds->mds_osc_obd &&
984 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
985 obd_notify(mds->mds_osc_obd, watched,
986 OBD_NOTIFY_INACTIVE, NULL);
/*
 * Thread entry point used for background synchronisation: choose a thread
 * name ("ll_sync_tgt" when the target index is MDSLOV_NO_INDEX, otherwise
 * "ll_sync_" followed by the two-digit index), daemonize under that name
 * and run __mds_lov_synchronize() on @data.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps); the declaration of name (and therefore its size) and
 * the "else" between the two naming branches are not visible.
 * NOTE(review): the first branch uses sprintf() while the second uses
 * snprintf() with sizeof(name); confirm the buffer holds "ll_sync_tgt".
 */
993 int mds_lov_synchronize(void *data)
995 struct mds_lov_sync_info *mlsi = data;
998 if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
999 /* There is still a watched target,
1000 but we don't know its index */
1001 sprintf(name, "ll_sync_tgt");
1003 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
1004 ptlrpc_daemonize(name);
1006 RETURN(__mds_lov_synchronize(data));
/*
 * Start synchronising the MDS @obd with the target @watched.
 *
 * A struct mds_lov_sync_info is allocated and filled with @obd, @watched
 * and the target index, which is read from @data as a __u32 or set to
 * MDSLOV_NO_INDEX.  The work is then either handed to a kernel thread
 * running mds_lov_synchronize() (created with CLONE_VM | CLONE_FILES) or
 * done synchronously by calling __mds_lov_synchronize() directly.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps); the allocation check, the condition selecting the
 * index source (presumably whether @data is NULL), the test of @nonblock
 * that selects the background path, the reference taking announced by the
 * long comment below and the return statement are not visible.
 */
1009 int mds_lov_start_synchronize(struct obd_device *obd,
1010 struct obd_device *watched,
1011 void *data, int nonblock)
1013 struct mds_lov_sync_info *mlsi;
1020 OBD_ALLOC(mlsi, sizeof(*mlsi));
1024 mlsi->mlsi_obd = obd;
1025 mlsi->mlsi_watched = watched;
1027 mlsi->mlsi_index = *(__u32 *)data;
1029 mlsi->mlsi_index = MDSLOV_NO_INDEX;
1031 /* Although class_export_get(obd->obd_self_export) would lock
1032 the MDS in place, since it's only a self-export
1033 it doesn't lock the LOV in place. The LOV can be disconnected
1034 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
1035 Simply taking an export ref on the LOV doesn't help, because it's
1036 still disconnected. Taking an obd reference insures that we don't
1037 disconnect the LOV. This of course means a cleanup won't
1038 finish for as long as the sync is blocking. */
1042 /* Synchronize in the background */
1043 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
1044 CLONE_VM | CLONE_FILES);
1046 CERROR("%s: error starting mds_lov_synchronize: %d\n",
1050 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
1051 "thread=%d\n", obd->obd_name,
1052 mlsi->mlsi_index, rc);
1056 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Observer callback of the MDS @obd for events about the device @watched.
 *
 * Handled events: OBD_NOTIFY_ACTIVE, OBD_NOTIFY_SYNC and
 * OBD_NOTIFY_SYNC_NONBLOCK lead to a target synchronisation;
 * OBD_NOTIFY_CONFIG passes @data, cast to unsigned int, to
 * mds_allow_cli().  A notification from a device whose type name is not
 * LUSTRE_OSC_NAME is reported as unexpected.
 *
 * While the device is recovering no orphan reset is started; instead the
 * lov descriptor is refreshed under obd->obd_dev_sem, the llog catalogs
 * are re-initialised for the watched target and CONFIG_SYNC is recorded
 * through mds_allow_cli().  Otherwise mds_lov_start_synchronize() is
 * called (blocking only for OBD_NOTIFY_SYNC) followed by
 * lquota_recovery().
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps, and one comment below has lost its terminator); the
 * switch statement, the default branch, the rc checks and the return
 * statements are not visible.
 * NOTE(review): casting the @data pointer to unsigned int drops the upper
 * bits on platforms with 64-bit pointers; confirm callers only pass small
 * flag values.
 */
1062 int mds_notify(struct obd_device *obd, struct obd_device *watched,
1063 enum obd_notify_event ev, void *data)
1069 /* We only handle these: */
1070 case OBD_NOTIFY_ACTIVE:
1071 case OBD_NOTIFY_SYNC:
1072 case OBD_NOTIFY_SYNC_NONBLOCK:
1074 case OBD_NOTIFY_CONFIG:
1075 mds_allow_cli(obd, (unsigned int)data);
1080 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
1081 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1082 CERROR("unexpected notification of %s %s!\n",
1083 watched->obd_type->typ_name, watched->obd_name);
1087 if (obd->obd_recovering) {
1088 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1090 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1091 /* We still have to fix the lov descriptor for ost's added
1092 after the mdt in the config log. They didn't make it into
1094 mutex_down(&obd->obd_dev_sem);
1095 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
1097 mutex_up(&obd->obd_dev_sem);
1100 /* We should update init llog here too for replay unlink and
1101 * possiable llog init race when recovery complete */
1102 llog_cat_initialize(obd, NULL,
1103 obd->u.mds.mds_lov_desc.ld_tgt_count,
1104 &watched->u.cli.cl_target_uuid);
1105 mutex_up(&obd->obd_dev_sem);
1106 mds_allow_cli(obd, CONFIG_SYNC);
1110 LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
1111 rc = mds_lov_start_synchronize(obd, watched, data,
1112 !(ev == OBD_NOTIFY_SYNC));
1114 lquota_recovery(mds_quota_interface_ref, obd);
1119 /* Convert the on-disk LOV EA structure.
1120 * We always try to convert from an old LOV EA format to the common in-memory
1121 * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1122 * then convert back to the new on-disk format and save it back to disk
1123 * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1124 * to convert it each time this inode is accessed.
1126 * This function is a bit interesting in the error handling. We can safely
1127 * ship the old lmm to the client in case of failure, since it uses the same
1128 * obd_unpackmd() code and can do the conversion if the MDS fails for some
1129 * reason. We will not delete the old lmm data until we have written the
1130 * new format lmm data in fsfilt_set_md(). */
/*
 * @obd:      MDS device; its mds_osc_exp export is used for unpack/pack.
 * @inode:    inode whose "lov" EA is rewritten.
 * @lmm:      on-disk striping descriptor read from the inode (little-endian).
 * @lmm_size: size of @lmm in bytes.
 *
 * Visible results: PTR_ERR(handle) when the transaction cannot be started;
 * otherwise the result is derived from fsfilt_set_md() and fsfilt_commit(),
 * with the commit error or lmm_size being selected by
 * "rc = err ? err : lmm_size".  The in-memory lsm is released with
 * obd_free_memmd() on the conv_free path.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers have gaps); the declarations, the return taken when the EA is
 * already in a current format, the rc checks after unpack/pack, the labels
 * and the final return are not visible.
 */
1131 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1132 struct lov_mds_md *lmm, int lmm_size)
1134 struct lov_stripe_md *lsm = NULL;
/* The magic is stored little-endian: convert it first, then compare.  The
 * second test previously byte-swapped the result of comparing the raw
 * magic with LOV_MAGIC_JOIN, which is wrong on big-endian hosts. */
1139 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
1140 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
1143 CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1144 inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
1147 rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
1151 rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
1153 GOTO(conv_free, rc);
1156 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1157 if (IS_ERR(handle)) {
1158 rc = PTR_ERR(handle);
1159 GOTO(conv_free, rc);
1162 rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
/* The transaction is committed regardless of the set_md result. */
1164 err = fsfilt_commit(obd, inode, handle, 0);
1166 rc = err ? err : lmm_size;
1167 GOTO(conv_free, rc);
1169 obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);