1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
54 #include "mds_internal.h"
/*
 * Debug helper: log, at D_INFO level, the words of the dirty-page bitmap
 * and the objid page entries kept in obd->u.mds. "label" is printed first
 * so that this dump can be told apart from others in the log. An error is
 * logged when the bitmap pointer or the page array pointer is NULL.
 */
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
58 struct mds_obd *mds = &obd->u.mds;
61 CDEBUG(D_INFO, "dump from %s\n", label);
62 if (mds->mds_lov_page_dirty == NULL) {
63 CERROR("NULL bitmap!\n");
/* one log line per bitmap word; size / BITS_PER_LONG + 1 words are printed */
67 for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
68 CDEBUG(D_INFO, "%u - %lx\n", i,
69 mds->mds_lov_page_dirty->data[i]);
71 if (mds->mds_lov_page_array == NULL) {
72 CERROR("not init page array!\n");
/* walk every page slot, then every objid entry inside that page */
76 for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
77 obd_id *data = mds->mds_lov_page_array[i];
82 for(j=0; j < OBJID_PER_PAGE(); j++) {
85 CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
/*
 * Set up the objid bookkeeping kept in obd->u.mds:
 *  - allocate the dirty-page bitmap for MDS_LOV_OBJID_PAGES_COUNT pages,
 *  - allocate the array of MDS_LOV_OBJID_PAGES_COUNT page pointers,
 *  - open (creating it if needed) the LOV_OBJID file, check that it is a
 *    regular file and remember it in mds_lov_objid_filp.
 * The failure paths visible here set rc to -ENOMEM, PTR_ERR(file) or
 * -ENOENT; the statements at the end of the function close the file and
 * free the page array and the bitmap.
 */
93 int mds_lov_init_objids(struct obd_device *obd)
95 struct mds_obd *mds = &obd->u.mds;
/* byte size of the page-pointer array; reused when freeing it on error */
96 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
/* MDS_LOV_ALLOC_SIZE must be a whole multiple of sizeof(obd_id) */
101 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
103 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
104 if (mds->mds_lov_page_dirty == NULL)
108 OBD_ALLOC(mds->mds_lov_page_array, size);
109 if (mds->mds_lov_page_array == NULL)
110 GOTO(err_free_bitmap, rc = -ENOMEM);
112 /* open and test the lov objd file */
113 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
116 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
117 GOTO(err_free, rc = PTR_ERR(file));
/* the objid store has to be a regular file */
119 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
120 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
121 file->f_dentry->d_inode->i_mode);
122 GOTO(err_open, rc = -ENOENT);
124 mds->mds_lov_objid_filp = file;
/* cleanup statements: close the file, then free the array and the bitmap */
128 if (filp_close((struct file *)file, 0))
129 CERROR("can't close %s after error\n", LOV_OBJID);
131 OBD_FREE(mds->mds_lov_page_array, size);
133 FREE_BITMAP(mds->mds_lov_page_dirty);
/*
 * Tear down the objid bookkeeping of obd->u.mds: free the objid pages and
 * the page-pointer array (when the array exists), close the LOV_OBJID file
 * if one is open, and free the dirty-page bitmap.
 */
138 void mds_lov_destroy_objids(struct obd_device *obd)
140 struct mds_obd *mds = &obd->u.mds;
144 if (mds->mds_lov_page_array != NULL) {
/* release each page buffer, then the array that pointed to them */
145 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
146 obd_id *data = mds->mds_lov_page_array[i];
148 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
150 OBD_FREE(mds->mds_lov_page_array,
151 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
154 if (mds->mds_lov_objid_filp) {
/* the file pointer is cleared right after the close call */
155 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
156 mds->mds_lov_objid_filp = NULL;
158 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
161 FREE_BITMAP(mds->mds_lov_page_dirty);
166 * There are currently two ways to learn the ost count and the max ost index:
167 * first - after an ost is connected to the mds and the sync process has finished;
168 * second - from the lmm during recovery, in the case when the mds has no configs
169 * and the ost isn't registered in the mgs.
171 * \param mds pointer to mds structure
172 * \param index maximum ost index
174 * \retval -ENOMEM if there is not enough memory for a new page
175 * \retval 0 if the update succeeded
/*
 * Account for OST index "index" in the objid tables of "mds": locate its
 * page/offset slot, allocate and store a page buffer for it, keep track of
 * the highest index seen so far, and - when the slot holds 0 - bump the
 * objid count and recompute mds_max_mdsize / mds_max_cookiesize.
 */
177 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
/* which page the index lives in, and which entry inside that page */
179 __u32 page = index / OBJID_PER_PAGE();
180 __u32 off = index % OBJID_PER_PAGE();
181 obd_id *data = mds->mds_lov_page_array[page];
/* allocate a page buffer and store it in the page array */
184 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
188 mds->mds_lov_page_array[page] = data;
/* remember the highest index seen and the position of its slot */
191 if (index > mds->mds_lov_objid_max_index) {
192 mds->mds_lov_objid_lastpage = page;
193 mds->mds_lov_objid_lastidx = off;
194 mds->mds_lov_objid_max_index = index;
197 /* workaround - New target not in objids file; increase mdsize */
198 /* ld_tgt_count is used as the max index everywhere, despite its name. */
199 if (data[off] == 0) {
203 mds->mds_lov_objid_count++;
/* stripe count used for sizing is capped at LOV_MAX_STRIPE_COUNT */
204 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
205 mds->mds_lov_objid_count);
207 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
208 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
210 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
211 " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
212 mds->mds_max_cookiesize);
/*
 * For every stripe described by "lmm" (v1 or v3 layout, selected by
 * lmm_magic), call mds_lov_update_max_ost() with the stripe's OST index.
 * The loop runs with obd->obd_dev_sem held. An lmm with any other magic is
 * reported with CERROR.
 */
219 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
221 struct lov_ost_data_v1 *data;
226 /* if we create file without objects - lmm is NULL */
/* pick the stripe count and the object array for the on-disk layout version */
230 switch (le32_to_cpu(lmm->lmm_magic)) {
232 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
233 data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
236 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
237 data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
240 CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
245 mutex_down(&obd->obd_dev_sem);
246 for (j = 0; j < count; j++) {
/* l_ost_idx is stored little-endian in the lmm */
247 __u32 i = le32_to_cpu(data[j].l_ost_idx);
248 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
253 mutex_up(&obd->obd_dev_sem);
257 EXPORT_SYMBOL(mds_lov_prepare_objids);
/*
 * Walk the stripe objects of "lmm" (v1 or v3 layout, selected by
 * lmm_magic) and compare each object id with the id stored for the same
 * OST index in the objid pages. When the new id is greater than the stored
 * one, the page is flagged in the dirty bitmap. An lmm with any other
 * magic is reported with CERROR.
 */
259 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
261 struct mds_obd *mds = &obd->u.mds;
263 struct lov_ost_data_v1 *obj;
267 /* if we create file without objects - lmm is NULL */
/* pick the stripe count and the object array for the on-disk layout version */
271 switch (le32_to_cpu(lmm->lmm_magic)) {
273 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
274 obj = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
277 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
278 obj = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
281 CERROR("Unknow lmm type %X !\n", le32_to_cpu(lmm->lmm_magic));
285 for (j = 0; j < count; j++) {
/* i: OST index, id: object id on that OST; page/idx: slot of that OST */
286 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
287 obd_id id = le64_to_cpu(obj[j].l_object_id);
288 __u32 page = i / OBJID_PER_PAGE();
289 __u32 idx = i % OBJID_PER_PAGE();
292 data = mds->mds_lov_page_array[page];
294 CDEBUG(D_INODE,"update last object for ost %u"
295 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
296 if (id > data[idx]) {
298 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
304 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Post-process objids that were read into "data": loop over "count"
 * entries, incrementing mds_lov_objid_count inside the loop, then recompute
 * mds_max_mdsize and mds_max_cookiesize from the resulting count (capped at
 * LOV_MAX_STRIPE_COUNT).
 * NOTE(review): the rest of the parameter list and the per-entry condition
 * are not visible in this excerpt.
 */
306 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
312 for(i = 0; i < count; i++) {
316 mds->mds_lov_objid_count++;
319 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
320 mds->mds_lov_objid_count);
322 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
323 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
325 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
326 "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
/*
 * Load the objid file (mds_lov_objid_filp) into freshly allocated pages of
 * mds_lov_page_array. The number of pages is derived from the file size;
 * each page is read with fsfilt_read_record() and handed to
 * mds_lov_update_from_read(). Afterwards mds_lov_objid_lastpage and
 * mds_lov_objid_lastidx are set and the tables are dumped to the debug log.
 * Visible failure paths set rc to -ENOMEM (page allocation) or -EIO
 * (mds_lov_update_from_read() failure).
 */
332 static int mds_lov_read_objids(struct obd_device *obd)
334 struct mds_obd *mds = &obd->u.mds;
336 int i, rc = 0, count = 0, page = 0;
340 /* Read everything in the file, even if our current lov desc
341 has fewer targets. Old targets not in the lov descriptor
342 during mds setup may still have valid objids. */
343 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* number of pages to read: full pages in the file, plus one */
347 page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
348 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
349 for (i = 0; i < page; i++) {
/* file offset before this read, used to compute how much was read */
351 loff_t off_old = off;
/* every page slot is expected to be empty before it is filled here */
353 LASSERT(mds->mds_lov_page_array[i] == NULL);
354 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
355 if (mds->mds_lov_page_array[i] == NULL)
356 GOTO(out, rc = -ENOMEM);
358 data = mds->mds_lov_page_array[i];
360 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
361 OBJID_PER_PAGE()*sizeof(obd_id), &off);
363 CERROR("Error reading objids %d\n", rc);
/* NOTE(review): count is accumulated with += over all pages, yet it is
 * passed below together with this page's data pointer, and
 * mds_lov_update_from_read() loops i < count over that pointer - on pages
 * after the first this looks like it can run past the page. Confirm
 * whether a per-page count was intended. */
367 count += (off - off_old) / sizeof(obd_id);
368 if (mds_lov_update_from_read(mds, data, count)) {
369 CERROR("Can't update mds data\n");
370 GOTO(out, rc = -EIO);
/* record where the loaded data ends: last page and index within it */
376 mds->mds_lov_objid_lastpage = i;
377 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
379 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
380 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
382 mds_lov_dump_objids("read",obd);
/*
 * Write the dirty objid pages to the objid file (mds_lov_objid_filp).
 * Only pages whose bit is set in mds_lov_page_dirty are written, each at
 * file offset page_index * page_size_in_bytes; the page recorded as
 * mds_lov_objid_lastpage is written only up to mds_lov_objid_lastidx.
 * The dirty bit of a page is cleared in the loop after the write call.
 */
387 int mds_lov_write_objids(struct obd_device *obd)
389 struct mds_obd *mds = &obd->u.mds;
/* check whether any page is dirty at all */
393 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
396 mds_lov_dump_objids("write", obd);
398 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
399 obd_id *data = mds->mds_lov_page_array[i];
/* default: write a full page; off is the page's position in the file */
400 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
401 loff_t off = i * size;
403 LASSERT(data != NULL);
405 /* check for partially filled last page */
406 if (i == mds->mds_lov_objid_lastpage)
407 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
409 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
410 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
414 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
421 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Look up the objid slot for OST index "idx" and, for a slot that was
 * never read, ask the osc for the last id via obd_get_info(KEY_LAST_ID),
 * storing the answer directly into the slot (lastid.data points at it).
 * The page holding the slot is flagged dirty and the result is logged.
 * NOTE(review): the rest of the parameter list and the conditions guarding
 * these steps are not visible in this excerpt.
 */
423 static int mds_lov_get_objid(struct obd_device * obd,
426 struct mds_obd *mds = &obd->u.mds;
/* which page the index lives in, and which entry inside that page */
433 page = idx / OBJID_PER_PAGE();
434 off = idx % OBJID_PER_PAGE();
435 data = mds->mds_lov_page_array[page];
437 /* We never read this lastid; ask the osc */
438 struct obd_id_info lastid;
439 __u32 size = sizeof(lastid);
/* let the reply be written straight into the objid slot */
442 lastid.data = &data[off];
443 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
444 KEY_LAST_ID, &size, &lastid, NULL);
448 /* workaround for clean filter */
452 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
454 CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
455 idx, data, page, off, data[off]);
/*
 * Issue an obd_create() on mds_osc_exp with OBD_FL_DELORPHAN set in the
 * obdo flags and the object group derived from mds->mds_id. When
 * "ost_uuid" is not NULL it is passed along in oti.oti_ost_uuid.
 * The objid page array must already be allocated (asserted).
 */
460 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
463 struct obdo oa = { 0 };
464 struct obd_trans_info oti = {0};
465 struct lov_stripe_md *empty_ea = NULL;
468 LASSERT(mds->mds_lov_page_array != NULL);
470 /* This create will in fact either create or destroy: If the OST is
471 * missing objects below this ID, they will be created. If it finds
472 * objects above this ID, they will be removed. */
473 memset(&oa, 0, sizeof(oa));
474 oa.o_flags = OBD_FL_DELORPHAN;
475 oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
/* o_valid names the obdo fields filled in above: flags and group */
476 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
477 if (ost_uuid != NULL)
478 oti.oti_ost_uuid = ost_uuid;
480 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
/*
 * Send KEY_NEXT_ID to mds_osc_exp through obd_set_info_async(), passing a
 * struct obd_id_info as the value, and log an error on failure.
 * Must not be called while obd is recovering (asserted).
 * NOTE(review): the lines that fill in "info" from idx/id are not visible
 * in this excerpt.
 */
486 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
488 struct mds_obd *mds = &obd->u.mds;
490 struct obd_id_info info;
493 LASSERT(!obd->obd_recovering);
497 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
498 KEY_NEXT_ID, sizeof(info), &info, NULL);
500 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
/*
 * Refresh mds->mds_lov_desc and the objid tables for target "idx":
 *  - fetch the lov descriptor with obd_get_info(KEY_LOVDESC) into a
 *    temporary buffer and copy it into mds->mds_lov_desc,
 *  - call mds_lov_update_max_ost() for idx with obd_dev_sem held,
 *  - call llog_cat_initialize() for idx/uuid,
 *  - when an upcall owner is registered, forward event "ev" through
 *    obd->obd_upcall.onu_upcall.
 * The temporary descriptor buffer is freed at the end.
 */
506 /* Update the lov desc for a new size lov. */
507 static int mds_lov_update_desc(struct obd_device *obd, int idx,
508 struct obd_uuid *uuid, enum obd_notify_event ev)
510 struct mds_obd *mds = &obd->u.mds;
512 __u32 valsize = sizeof(mds->mds_lov_desc);
/* temporary buffer for the descriptor returned by obd_get_info() */
516 OBD_ALLOC(ld, sizeof(*ld));
520 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
525 /* Don't change the mds_lov_desc until the objids size matches the
527 mds->mds_lov_desc = *ld;
528 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
529 mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
531 mutex_down(&obd->obd_dev_sem);
532 rc = mds_lov_update_max_ost(mds, idx);
533 mutex_up(&obd->obd_dev_sem);
537 /* If we added a target we have to reconnect the llogs */
538 /* We only _need_ to do this at first add (idx), or the first time
539 after recovery. However, it should now be safe to call anytime. */
540 rc = llog_cat_initialize(obd, &obd->obd_olg, idx, uuid);
544 /*XXX this notifies the MDD until lov handling use old mds code */
545 if (obd->obd_upcall.onu_owner) {
546 LASSERT(obd->obd_upcall.onu_upcall != NULL);
547 rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
548 obd->obd_upcall.onu_owner,
549 &mds->mds_mount_count);
/* release the temporary descriptor buffer */
552 OBD_FREE(ld, sizeof(*ld));
/*
 * Bring the MDS view of target "idx" up to date: update the lov descriptor
 * through mds_lov_update_desc(), validate idx against ld_tgt_count
 * (-EINVAL when idx >= count), fetch the objid with mds_lov_get_objid()
 * and push the stored id to the osc with mds_lov_set_one_nextid().
 * obd->obd_recovering is tested before the index check; the statement
 * taken in that case is not visible in this excerpt.
 */
556 /* Inform MDS about new/updated target */
557 static int mds_lov_update_mds(struct obd_device *obd,
558 struct obd_device *watched,
559 __u32 idx, enum obd_notify_event ev)
561 struct mds_obd *mds = &obd->u.mds;
569 /* Don't let anyone else mess with mds_lov_objids now */
570 rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid, ev);
574 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
575 idx, obd->obd_recovering, obd->obd_async_recov,
576 mds->mds_lov_desc.ld_tgt_count);
578 /* idx is set as data from lov_notify. */
579 if (obd->obd_recovering)
/* idx must be below the target count of the refreshed descriptor */
582 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
583 CERROR("index %d > count %d!\n", idx,
584 mds->mds_lov_desc.ld_tgt_count);
585 GOTO(out, rc = -EINVAL);
588 rc = mds_lov_get_objid(obd, idx);
/* locate the objid slot of this target */
592 page = idx / OBJID_PER_PAGE();
593 off = idx % OBJID_PER_PAGE();
594 data = mds->mds_lov_page_array[page];
596 /* We have read this lastid from disk; tell the osc.
597 Don't call this during recovery. */
598 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
600 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
601 /* Don't abort the rest of the sync */
604 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
/*
 * Connect the MDS device "obd" to the LOV device named "lov_name":
 *  - look the LOV up with class_name2obd(),
 *  - read the stored objids (mds_lov_read_objids(), under obd_dev_sem),
 *  - register obd as observer of the LOV,
 *  - build an obd_connect_data (flags, version, group, max brw size,
 *    checksum types) and call obd_connect(), storing the export in
 *    mds_osc_exp.
 * Failures are remembered by storing an ERR_PTR in mds->mds_osc_obd; a
 * call that finds such a value returns PTR_ERR() of it right away.
 */
611 /* update the LOV-OSC knowledge of the last used object id's */
612 int mds_lov_connect(struct obd_device *obd, char * lov_name)
614 struct mds_obd *mds = &obd->u.mds;
615 struct obd_connect_data *data;
/* an earlier attempt failed: report the error it left behind */
619 if (IS_ERR(mds->mds_osc_obd))
620 RETURN(PTR_ERR(mds->mds_osc_obd));
622 if (mds->mds_osc_obd)
625 mds->mds_osc_obd = class_name2obd(lov_name);
626 if (!mds->mds_osc_obd) {
627 CERROR("MDS cannot locate LOV %s\n", lov_name);
628 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
632 mutex_down(&obd->obd_dev_sem);
633 rc = mds_lov_read_objids(obd);
634 mutex_up(&obd->obd_dev_sem);
636 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
640 rc = obd_register_observer(mds->mds_osc_obd, obd);
642 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
647 mds->mds_osc_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
/* connect data is only needed for the obd_connect() call and freed after it */
649 OBD_ALLOC(data, sizeof(*data));
652 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
653 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
654 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID |
655 OBD_CONNECT_BRW_SIZE | OBD_CONNECT_CKSUM |
656 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT |
657 OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN |
659 #ifdef HAVE_LRU_RESIZE_SUPPORT
660 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
662 data->ocd_version = LUSTRE_VERSION_CODE;
663 data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
664 /* send max bytes per rpc */
665 data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
666 /* send the list of supported checksum types */
667 data->ocd_cksum_types = OBD_CKSUM_ALL;
668 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
669 rc = obd_connect(NULL, &mds->mds_osc_exp, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
670 OBD_FREE(data, sizeof(*data));
672 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
673 mds->mds_osc_obd = ERR_PTR(rc);
677 /* I want to see a callback happen when the OBD moves to a
678 * "For General Use" state, and that's when we'll call
679 * set_nextid(). The class driver can help us here, because
680 * it can use the obd_recovering flag to determine when the
681 * the OBD is full available. */
682 /* MDD device will care about that
683 if (!obd->obd_recovering)
684 rc = mds_postrecov(obd);
689 mds->mds_osc_exp = NULL;
690 mds->mds_osc_obd = ERR_PTR(rc);
/*
 * Prepare the MDS for disconnecting from its LOV: when mds_osc_obd is not
 * an error pointer and an export exists, unregister obd as observer of the
 * LOV (observer set to NULL) and set obd_force on the LOV device.
 */
694 int mds_lov_disconnect(struct obd_device *obd)
696 struct mds_obd *mds = &obd->u.mds;
700 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
701 obd_register_observer(mds->mds_osc_obd, NULL);
703 /* The actual disconnect of the mds_lov will be called from
704 * class_disconnect_exports from mds_lov_clean. So we have to
705 * ensure that class_cleanup doesn't fail due to the extra ref
706 * we're holding now. The mechanism to do that already exists -
707 * the obd_force flag. We'll drop the final ref to the
708 * mds_osc_exp in mds_cleanup. */
709 mds->mds_osc_obd->obd_force = 1;
/*
 * Argument bundle for one synchronize request. It is allocated and filled
 * in by mds_lov_start_synchronize() and read by mds_lov_synchronize() and
 * __mds_lov_synchronize().
 */
715 struct mds_lov_sync_info {
716 struct obd_device *mlsi_obd; /* the lov device to sync */
717 struct obd_device *mlsi_watched; /* target osc */
718 __u32 mlsi_index; /* index of target */
719 enum obd_notify_event mlsi_ev; /* event type */
/*
 * Send the two capability keys held in mds->mds_capa_keys (indices 0 and
 * 1) to mds_osc_exp with obd_set_info_async(KEY_CAPA_KEY). The value
 * passed is a struct mds_capa_info whose uuid field is "uuid". A failed
 * send is logged through DEBUG_CAPA_KEY at D_ERROR level.
 * mds->mds_capa_keys is tested for NULL before the loop.
 */
722 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
724 struct mds_capa_info info = { .uuid = uuid };
725 struct lustre_capa_key *key;
730 if (!mds->mds_capa_keys)
/* exactly two key slots are propagated */
733 for (i = 0; i < 2; i++) {
734 key = &mds->mds_capa_keys[i];
735 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
738 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
739 KEY_CAPA_KEY, sizeof(info), &info, NULL);
741 DEBUG_CAPA_KEY(D_ERROR, key,
742 "propagate failed (rc = %d) for", rc);
/*
 * Synchronize the MDS with one target described by the mds_lov_sync_info
 * passed in "data". With mds_notify_lock held for reading, the visible
 * steps are:
 *  - set rc = -ENODEV and jump to "out" when obd is stopping or failing,
 *  - mds_lov_update_mds() for the target index,
 *  - obd_set_info_async(KEY_MDS_CONN) with the MDS object group,
 *  - mds_propagate_capa_keys(),
 *  - llog_connect() on the LLOG_MDS_OST_ORIG_CTXT context,
 *  - mds_lov_clear_orphans(),
 *  - with HAVE_QUOTA_SUPPORT, an OBD_NOTIFY_QUOTA upcall when an upcall
 *    owner is registered.
 * After the lock is released, the failure handling logs "sync failed" and
 * notifies the LOV with OBD_NOTIFY_INACTIVE unless one of the devices
 * involved is stopping. Finally the obd reference named
 * "mds_lov_synchronize" is dropped with class_decref().
 */
750 /* We only sync one osc at a time, so that we don't have to hold
751 any kind of lock on the whole mds_lov_desc, which may change
752 (grow) as a result of mds_lov_add_ost. This also avoids any
753 kind of mismatch between the lov_desc and the mds_lov_desc,
754 which are not in lock-step during lov_add_obd */
755 static int __mds_lov_synchronize(void *data)
757 struct mds_lov_sync_info *mlsi = data;
758 struct obd_device *obd = mlsi->mlsi_obd;
759 struct obd_device *watched = mlsi->mlsi_watched;
760 struct mds_obd *mds = &obd->u.mds;
761 struct obd_uuid *uuid;
762 __u32 idx = mlsi->mlsi_index;
763 enum obd_notify_event ev = mlsi->mlsi_ev;
764 struct mds_group_info mgi;
765 struct llog_ctxt *ctxt;
773 uuid = &watched->u.cli.cl_target_uuid;
776 down_read(&mds->mds_notify_lock);
777 if (obd->obd_stopping || obd->obd_fail)
778 GOTO(out, rc = -ENODEV);
780 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
781 rc = mds_lov_update_mds(obd, watched, idx, ev);
783 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
/* tell the target which object group this MDS uses */
786 mgi.group = mdt_to_obd_objgrp(mds->mds_id);
789 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
790 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
793 /* propagate capability keys */
794 rc = mds_propagate_capa_keys(mds, uuid);
798 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
800 GOTO(out, rc = -ENODEV);
802 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
803 rc = llog_connect(ctxt, NULL, NULL, uuid);
806 CERROR("%s failed at llog_origin_connect: %d\n",
807 obd_uuid2str(uuid), rc);
811 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
812 obd->obd_name, obd_uuid2str(uuid));
813 rc = mds_lov_clear_orphans(mds, uuid);
815 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
816 obd_uuid2str(uuid), rc);
820 #ifdef HAVE_QUOTA_SUPPORT
821 if (obd->obd_upcall.onu_owner) {
823 * This is a hack for mds_notify->mdd_notify. When the mds obd
824 * in mdd is removed, This hack should be removed.
826 LASSERT(obd->obd_upcall.onu_upcall != NULL);
827 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
828 obd->obd_upcall.onu_owner,NULL);
833 up_read(&mds->mds_notify_lock);
835 /* Deactivate it for safety */
836 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
/* only notify when neither the MDS, the LOV nor the target is stopping */
838 if (!obd->obd_stopping && mds->mds_osc_obd &&
839 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
840 obd_notify(mds->mds_osc_obd, watched,
841 OBD_NOTIFY_INACTIVE, NULL);
/* drops the reference named "mds_lov_synchronize" */
844 class_decref(obd, "mds_lov_synchronize", obd);
/*
 * Thread entry point for a background sync: names the thread
 * "ll_sync_<index>" (two-digit target index), daemonizes it with
 * ptlrpc_daemonize() and then runs __mds_lov_synchronize() on "data",
 * returning its result.
 */
848 int mds_lov_synchronize(void *data)
850 struct mds_lov_sync_info *mlsi = data;
853 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
854 ptlrpc_daemonize(name);
856 RETURN(__mds_lov_synchronize(data));
/*
 * Start a synchronization of "obd" with the target "watched". "data" is
 * read as a pointer to the __u32 target index. An mds_lov_sync_info is
 * allocated and filled in, an obd reference named "mds_lov_synchronize" is
 * taken, and then:
 *  - for events other than OBD_NOTIFY_SYNC a kernel thread running
 *    mds_lov_synchronize() is started; the error path visible here logs
 *    the failure and drops the reference again,
 *  - __mds_lov_synchronize() is also called directly on the same info
 *    (presumably the OBD_NOTIFY_SYNC branch; the branch structure is not
 *    fully visible in this excerpt).
 */
859 int mds_lov_start_synchronize(struct obd_device *obd,
860 struct obd_device *watched,
861 void *data, enum obd_notify_event ev)
863 struct mds_lov_sync_info *mlsi;
865 struct obd_uuid *uuid;
869 uuid = &watched->u.cli.cl_target_uuid;
871 OBD_ALLOC(mlsi, sizeof(*mlsi));
876 mlsi->mlsi_obd = obd;
877 mlsi->mlsi_watched = watched;
878 mlsi->mlsi_index = *(__u32 *)data;
881 /* Although class_export_get(obd->obd_self_export) would lock
882 the MDS in place, since it's only a self-export
883 it doesn't lock the LOV in place. The LOV can be disconnected
884 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
885 Simply taking an export ref on the LOV doesn't help, because it's
886 still disconnected. Taking an obd reference insures that we don't
887 disconnect the LOV. This of course means a cleanup won't
888 finish for as long as the sync is blocking. */
889 class_incref(obd, "mds_lov_synchronize", obd);
891 if (ev != OBD_NOTIFY_SYNC) {
892 /* Synchronize in the background */
893 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
894 CLONE_VM | CLONE_FILES);
896 CERROR("%s: error starting mds_lov_synchronize: %d\n",
/* undo the class_incref() taken above */
898 class_decref(obd, "mds_lov_synchronize", obd);
900 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
901 "thread=%d\n", obd->obd_name,
902 mlsi->mlsi_index, rc);
906 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Notification handler of the MDS for events about the device "watched".
 * The case labels visible here are OBD_NOTIFY_ACTIVE, OBD_NOTIFY_SYNC and
 * OBD_NOTIFY_SYNC_NONBLOCK; per the comments below, "data" carries the lov
 * index for them. A watched device whose type name is not LUSTRE_OSC_NAME
 * is reported with CERROR. While obd is recovering only the lov descriptor
 * is updated (mds_lov_update_desc()); mds_lov_start_synchronize() is also
 * called with the same arguments (presumably on the non-recovering path).
 * NOTE(review): the end of this function is not visible in this excerpt.
 */
912 int mds_notify(struct obd_device *obd, struct obd_device *watched,
913 enum obd_notify_event ev, void *data)
918 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
921 /* We only handle these: */
922 case OBD_NOTIFY_ACTIVE:
923 /* lov want one or more _active_ targets for work */
924 /* activate event should be pass lov idx as argument */
925 case OBD_NOTIFY_SYNC:
926 case OBD_NOTIFY_SYNC_NONBLOCK:
927 /* sync event should be pass lov idx as argument */
/* only notifications about osc devices are expected here */
933 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
934 CERROR("unexpected notification of %s %s!\n",
935 watched->obd_type->typ_name, watched->obd_name);
939 if (obd->obd_recovering) {
940 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
942 obd_uuid2str(&watched->u.cli.cl_target_uuid));
943 /* We still have to fix the lov descriptor for ost's added
944 after the mdt in the config log. They didn't make it into
946 rc = mds_lov_update_desc(obd, *(__u32 *)data,
947 &watched->u.cli.cl_target_uuid, ev);
949 rc = mds_lov_start_synchronize(obd, watched, data, ev);