1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
54 #include "mds_internal.h"
// Debug helper: logs the dirty-page bitmap and the object id pages of an MDS
// device at D_INFO level.
//   label - caller-supplied tag printed in the first debug line
//   obd   - device whose u.mds state is dumped
// A NULL bitmap or a NULL page array is reported through CERROR.
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
58 struct mds_obd *mds = &obd->u.mds;
61 CDEBUG(D_INFO, "dump from %s\n", label);
62 if (mds->mds_lov_page_dirty == NULL) {
63 CERROR("NULL bitmap!\n");
// print the bitmap one long-sized word per line
67 for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
68 CDEBUG(D_INFO, "%u - %lx\n", i,
69 mds->mds_lov_page_dirty->data[i]);
71 if (mds->mds_lov_page_array == NULL) {
72 CERROR("not init page array!\n");
// walk every slot of the page array, then every object id slot of a page
76 for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
77 obd_id *data = mds->mds_lov_page_array[i];
82 for(j=0; j < OBJID_PER_PAGE(); j++) {
85 CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
// Sets up the object id bookkeeping of an MDS device: allocates the dirty-page
// bitmap and the page pointer array (both sized by MDS_LOV_OBJID_PAGES_COUNT),
// then opens, creating it if needed, the LOV_OBJID file and stores it in
// mds_lov_objid_filp.
//   obd - device whose u.mds state is initialised
// Visible failure codes: -ENOMEM when the page array cannot be allocated,
// PTR_ERR(file) when the open fails, -ENOENT when LOV_OBJID is not a regular
// file. The cleanup tail closes the file, frees the page array and frees the
// bitmap; the labels separating those steps are not visible in this listing.
93 int mds_lov_init_objids(struct obd_device *obd)
95 struct mds_obd *mds = &obd->u.mds;
96 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
// compile-time check: a page must hold a whole number of obd_id entries
101 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
103 mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
104 if (mds->mds_lov_page_dirty == NULL)
108 OBD_ALLOC(mds->mds_lov_page_array, size);
109 if (mds->mds_lov_page_array == NULL)
110 GOTO(err_free_bitmap, rc = -ENOMEM);
112 /* open and test the lov objid file */
113 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
116 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
117 GOTO(err_free, rc = PTR_ERR(file));
119 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
120 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
121 file->f_dentry->d_inode->i_mode);
122 GOTO(err_open, rc = -ENOENT);
124 mds->mds_lov_objid_filp = file;
128 if (filp_close((struct file *)file, 0))
129 CERROR("can't close %s after error\n", LOV_OBJID);
131 OBD_FREE(mds->mds_lov_page_array, size);
133 FREE_BITMAP(mds->mds_lov_page_dirty);
// Tears down the object id bookkeeping of an MDS device: frees the pages held
// in mds_lov_page_array and the array itself, closes the LOV_OBJID file
// (clearing mds_lov_objid_filp; the CERROR reports a close failure) and frees
// the dirty-page bitmap.
//   obd - device whose u.mds state is released
138 void mds_lov_destroy_objids(struct obd_device *obd)
140 struct mds_obd *mds = &obd->u.mds;
144 if (mds->mds_lov_page_array != NULL) {
145 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
146 obd_id *data = mds->mds_lov_page_array[i];
148 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
// the array size must match the one computed in mds_lov_init_objids()
150 OBD_FREE(mds->mds_lov_page_array,
151 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
154 if (mds->mds_lov_objid_filp) {
155 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
156 mds->mds_lov_objid_filp = NULL;
158 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
161 FREE_BITMAP(mds->mds_lov_page_dirty);
166 * There are currently two ways to learn the OST count and the maximum OST index:
167 * first - after an OST has connected to the MDS and the sync process has finished;
168 * second - from the lmm during recovery, in case the MDS does not have configs
169 * and the OST isn't registered in the MGS.
171 * \param mds pointer to mds structure
172 * \param index maximum ost index
174 * \retval -ENOMEM if there is not enough memory for a new page
175 * \retval 0 if the update succeeded
// Makes room in the object id table for OST `index` and refreshes the derived
// limits. The table is addressed as page = index / OBJID_PER_PAGE() and
// off = index % OBJID_PER_PAGE().
// Visible steps:
//  - a page of MDS_LOV_ALLOC_SIZE bytes is allocated and stored in
//    mds_lov_page_array[page] (the guard around the allocation is not visible;
//    presumably it runs only when the slot is still NULL);
//  - when index exceeds mds_lov_objid_max_index, the last page, last index and
//    max index fields are moved forward;
//  - when the slot for this OST holds 0, mds_lov_objid_count is incremented and
//    mds_max_mdsize / mds_max_cookiesize are recomputed for
//    min(LOV_MAX_STRIPE_COUNT, mds_lov_objid_count) stripes.
// NOTE(review): no visible check that page is below MDS_LOV_OBJID_PAGES_COUNT
// before mds_lov_page_array is indexed - confirm callers bound the index.
177 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
179 __u32 page = index / OBJID_PER_PAGE();
180 __u32 off = index % OBJID_PER_PAGE();
181 obd_id *data = mds->mds_lov_page_array[page];
184 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
188 mds->mds_lov_page_array[page] = data;
191 if (index > mds->mds_lov_objid_max_index) {
192 mds->mds_lov_objid_lastpage = page;
193 mds->mds_lov_objid_lastidx = off;
194 mds->mds_lov_objid_max_index = index;
197 /* workaround - New target not in objids file; increase mdsize */
198 /* ld_tgt_count is used as the max index everywhere, despite its name. */
199 if (data[off] == 0) {
203 mds->mds_lov_objid_count++;
204 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
205 mds->mds_lov_objid_count);
// both limits are sized for the V3 EA layout
207 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
208 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
210 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
211 " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
212 mds->mds_max_cookiesize);
// Makes sure the object id table has a slot for every OST referenced by a
// striping descriptor. Exported with EXPORT_SYMBOL.
//   obd - MDS device; its obd_dev_sem is held while the table is updated
//   lmm - LOV EA in little-endian form, V1 or V3 layout chosen by lmm_magic;
//         per the comment below it is NULL for a file created without objects
// For each stripe the l_ost_idx value is passed to mds_lov_update_max_ost().
// An unrecognised magic is reported with CERROR. The return statements are
// not visible in this listing.
219 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
221 struct lov_ost_data_v1 *data;
226 /* if we create file without objects - lmm is NULL */
230 switch (le32_to_cpu(lmm->lmm_magic)) {
232 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
233 data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
236 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
237 data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
240 CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
245 mutex_down(&obd->obd_dev_sem);
246 for (j = 0; j < count; j++) {
247 __u32 i = le32_to_cpu(data[j].l_ost_idx);
248 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
253 mutex_up(&obd->obd_dev_sem);
257 EXPORT_SYMBOL(mds_lov_prepare_objids);
260 * Write an llog orphan record about a lost OST object.
261 * A special lsm is allocated with a single stripe; the caller should deallocate it
// Logs an orphan record for a run of lost precreated objects on one OST.
//   obd     - MDS device
//   lsmp    - in/out striping descriptor; obd_alloc_memmd() is called on the
//             MDS osc export to obtain one (the guard is not visible; the
//             caller comments that it is allocated if NULL)
//   stripes - out: receives the original lsm_stripe_count before it is
//             forced to 1, so the caller can restore it
//   id      - first lost object id, stored in stripe 0
//   count   - number of lost objects, passed to mds_log_op_orphan()
//   idx     - OST index, stored in stripe 0
// The result of mds_log_op_orphan() is kept in rc; the return statement is
// not visible in this listing.
264 static int mds_log_lost_precreated(struct obd_device *obd,
265 struct lov_stripe_md **lsmp, int *stripes,
266 obd_id id, obd_count count, int idx)
268 struct lov_stripe_md *lsm = *lsmp;
273 rc = obd_alloc_memmd(obd->u.mds.mds_osc_exp, &lsm);
276 /* need only one stripe, save old value */
277 *stripes = lsm->lsm_stripe_count;
278 lsm->lsm_stripe_count = 1;
// describe the lost range through the single remaining stripe
282 lsm->lsm_oinfo[0]->loi_id = id;
283 lsm->lsm_oinfo[0]->loi_gr = mdt_to_obd_objgrp(obd->u.mds.mds_id);
284 lsm->lsm_oinfo[0]->loi_ost_idx = idx;
286 rc = mds_log_op_orphan(obd, lsm, count);
// Records the newest object id seen per OST from a striping descriptor.
// Exported with EXPORT_SYMBOL.
//   obd - MDS device
//   lmm - LOV EA in little-endian form, V1 or V3 layout chosen by lmm_magic;
//         per the comment below it is NULL for a file created without objects
// For each stripe whose object id is greater than the stored value for that
// OST, the gap (id - stored - 1) is computed; a positive gap while
// obd_recovering is set is logged and handed to mds_log_lost_precreated().
// cfs_bitmap_set() marks the page dirty. The store of the new id into the
// table is not visible in this listing. At the end the stripe count of the
// lsm is restored and the lsm is released through obd_free_memmd() (the guard
// for a lsm that was never allocated is not visible).
// NOTE(review): the gap is a difference of 64-bit ids stored in an int and
// printed with %u - confirm very large gaps cannot occur or are acceptable.
290 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
292 struct mds_obd *mds = &obd->u.mds;
294 struct lov_ost_data_v1 *obj;
295 struct lov_stripe_md *lsm = NULL;
300 /* if we create file without objects - lmm is NULL */
304 switch (le32_to_cpu(lmm->lmm_magic)) {
306 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
307 obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
310 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
311 obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
314 CERROR("Unknow lmm type %X !\n",
315 le32_to_cpu(lmm->lmm_magic));
319 for (j = 0; j < count; j++) {
// i is the OST index; page and idx locate its slot in the table
320 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
321 obd_id id = le64_to_cpu(obj[j].l_object_id);
322 __u32 page = i / OBJID_PER_PAGE();
323 __u32 idx = i % OBJID_PER_PAGE();
326 data = mds->mds_lov_page_array[page];
328 CDEBUG(D_INODE,"update last object for ost %u"
329 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
330 if (id > data[idx]) {
331 int lost = id - data[idx] - 1;
332 /* we might have lost precreated objects due to VBR */
333 if (lost > 0 && obd->obd_recovering) {
334 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
335 if (!obd->obd_version_recov)
336 CERROR("Unexpected gap in objids\n");
337 /* lsm is allocated if NULL */
338 mds_log_lost_precreated(obd, &lsm, &stripes,
339 data[idx]+1, lost, i);
342 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
346 /* restore stripes number */
347 lsm->lsm_stripe_count = stripes;
348 obd_free_memmd(mds->mds_osc_exp, &lsm);
353 EXPORT_SYMBOL(mds_lov_update_objids);
// Accounts for object ids read from the LOV_OBJID file and refreshes the
// maximum EA and cookie sizes.
//   mds  - MDS state being updated
//   data - array of object ids to scan
//   The rest of the parameter list is not visible in this listing; the loop
//   bound count is presumably the trailing parameter.
// mds_lov_objid_count is incremented inside the loop (the per-entry condition
// is not visible). The visible statements also set mds_max_mdsize and
// mds_max_cookiesize for min(LOV_MAX_STRIPE_COUNT, mds_lov_objid_count)
// stripes, sized for the V3 EA layout.
355 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
361 for (i = 0; i < count; i++) {
365 mds->mds_lov_objid_count++;
368 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
369 mds->mds_lov_objid_count);
371 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
372 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
374 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
375 "%d/%d\n", stripes, mds->mds_max_mdsize,
376 mds->mds_max_cookiesize);
// Loads the LOV_OBJID file into the in-memory page array.
//   obd - MDS device whose mds_lov_objid_filp is read
// The page count is derived from the inode size as
// size / (OBJID_PER_PAGE() * sizeof(obd_id)) + 1. For each page the slot is
// asserted to be empty, a buffer of MDS_LOV_ALLOC_SIZE bytes is allocated and
// filled by fsfilt_read_record(), and mds_lov_update_from_read() is called.
// Afterwards mds_lov_objid_lastpage and mds_lov_objid_lastidx are set from the
// loop index and the number of ids read, and the table is dumped for debug.
// Visible failure codes: -ENOMEM on allocation failure, -EIO when
// mds_lov_update_from_read() fails; a read error is logged with CERROR.
// NOTE(review): count accumulates over all pages but is passed together with
// the pointer to the current page only - confirm this is intended when the
// file spans more than one page.
382 static int mds_lov_read_objids(struct obd_device *obd)
384 struct mds_obd *mds = &obd->u.mds;
386 int i, rc = 0, count = 0, page = 0;
390 /* Read everything in the file, even if our current lov desc
391 has fewer targets. Old targets not in the lov descriptor
392 during mds setup may still have valid objids. */
393 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
397 page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
398 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
399 for (i = 0; i < page; i++) {
// remember the offset before the read to learn how many bytes were consumed
401 loff_t off_old = off;
403 LASSERT(mds->mds_lov_page_array[i] == NULL);
404 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
405 if (mds->mds_lov_page_array[i] == NULL)
406 GOTO(out, rc = -ENOMEM);
408 data = mds->mds_lov_page_array[i];
410 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
411 OBJID_PER_PAGE()*sizeof(obd_id), &off);
413 CERROR("Error reading objids %d\n", rc);
417 count += (off - off_old) / sizeof(obd_id);
418 if (mds_lov_update_from_read(mds, data, count)) {
419 CERROR("Can't update mds data\n");
420 GOTO(out, rc = -EIO);
426 mds->mds_lov_objid_lastpage = i;
427 mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
429 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
430 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
432 mds_lov_dump_objids("read",obd);
// Writes dirty object id pages back to the LOV_OBJID file. Exported with
// EXPORT_SYMBOL.
//   obd - MDS device
// After a debug dump, each page whose bit is set in mds_lov_page_dirty is
// written through fsfilt_write_record() at byte offset i * page size, and
// cfs_bitmap_clear() is called for it (whether only on success is not
// visible). The page equal to mds_lov_objid_lastpage is written only up to
// mds_lov_objid_lastidx inclusive. The action taken for an empty bitmap and
// the handling of rc are not visible in this listing.
437 int mds_lov_write_objids(struct obd_device *obd)
439 struct mds_obd *mds = &obd->u.mds;
443 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
446 mds_lov_dump_objids("write", obd);
448 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
449 obd_id *data = mds->mds_lov_page_array[i];
450 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
451 loff_t off = i * size;
453 LASSERT(data != NULL);
455 /* check for partially filled last page */
456 if (i == mds->mds_lov_objid_lastpage)
457 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
459 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
460 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
464 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
471 EXPORT_SYMBOL(mds_lov_write_objids);
// Fetches the last object id of one OST from the osc into the in-memory table.
//   obd - MDS device
//   The rest of the parameter list is not visible in this listing; idx, the
//   OST index used below, is presumably the trailing parameter.
// The slot is located as page = idx / OBJID_PER_PAGE() and
// off = idx % OBJID_PER_PAGE(). obd_get_info() with KEY_LAST_ID is asked to
// fill the slot through lastid.data, and the page is marked dirty. The
// conditions guarding the query and the dirty marking are not visible.
473 static int mds_lov_get_objid(struct obd_device * obd,
476 struct mds_obd *mds = &obd->u.mds;
483 page = idx / OBJID_PER_PAGE();
484 off = idx % OBJID_PER_PAGE();
485 data = mds->mds_lov_page_array[page];
487 /* We never read this lastid; ask the osc */
488 struct obd_id_info lastid;
489 __u32 size = sizeof(lastid);
// the osc writes its answer straight into the table slot
492 lastid.data = &data[off];
493 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
494 KEY_LAST_ID, &size, &lastid, NULL);
498 /* workaround for clean filter */
502 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
504 CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
505 idx, data, page, off, data[off]);
// Issues an orphan-cleanup create request to the osc export of the MDS.
//   mds      - MDS state; mds_lov_page_array must already be set up
//   ost_uuid - when not NULL, stored in oti.oti_ost_uuid for the request
// The request is an obdo flagged OBD_FL_DELORPHAN, carrying the object group
// derived from mds_id, sent through obd_create() with an empty lsm pointer.
// The result is kept in rc; the return statement is not visible in this
// listing.
510 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
513 struct obdo oa = { 0 };
514 struct obd_trans_info oti = {0};
515 struct lov_stripe_md *empty_ea = NULL;
518 LASSERT(mds->mds_lov_page_array != NULL);
520 /* This create will in fact either create or destroy: If the OST is
521 * missing objects below this ID, they will be created. If it finds
522 * objects above this ID, they will be removed. */
523 memset(&oa, 0, sizeof(oa));
524 oa.o_flags = OBD_FL_DELORPHAN;
525 oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
526 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
527 if (ost_uuid != NULL)
528 oti.oti_ost_uuid = ost_uuid;
530 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
// Sends the next object id for one OST to the osc with KEY_NEXT_ID.
//   obd - MDS device; must not be in recovery (asserted)
//   idx - OST index
//   id  - pointer to the object id to announce
// idx and id are presumably copied into info before the call; those
// assignments are not visible in this listing. A failure is logged with
// CERROR.
536 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
538 struct mds_obd *mds = &obd->u.mds;
540 struct obd_id_info info;
543 LASSERT(!obd->obd_recovering);
547 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
548 KEY_NEXT_ID, sizeof(info), &info, NULL);
550 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
// Refreshes the cached LOV descriptor of the MDS and grows the object id
// table for a target.
//   obd  - MDS device
//   idx  - index of the target, passed to mds_lov_update_max_ost() and to
//          obd_llog_init()
//   uuid - target uuid, used in the debug message
//   ev   - notify event forwarded to the upcall
// Visible steps: a temporary lov descriptor is allocated and fetched with
// obd_get_info(KEY_LOVDESC), copied into mds_lov_desc, the table is updated
// under obd_dev_sem, the llogs are reinitialised, the upcall owner (when set)
// is notified, and the temporary descriptor is freed. Error checks between
// the steps are not visible in this listing.
556 /* Update the lov desc for a new size lov. */
557 static int mds_lov_update_desc(struct obd_device *obd, int idx,
558 struct obd_uuid *uuid, enum obd_notify_event ev)
560 struct mds_obd *mds = &obd->u.mds;
562 __u32 valsize = sizeof(mds->mds_lov_desc);
566 OBD_ALLOC(ld, sizeof(*ld));
570 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
575 /* Don't change the mds_lov_desc until the objids size matches the
577 mds->mds_lov_desc = *ld;
578 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
579 mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
581 mutex_down(&obd->obd_dev_sem);
582 rc = mds_lov_update_max_ost(mds, idx);
583 mutex_up(&obd->obd_dev_sem);
587 /* If we added a target we have to reconnect the llogs */
588 /* We only _need_ to do this at first add (idx), or the first time
589 after recovery. However, it should now be safe to call anytime. */
590 rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
594 /*XXX this notifies the MDD until lov handling use old mds code */
595 if (obd->obd_upcall.onu_owner) {
596 LASSERT(obd->obd_upcall.onu_upcall != NULL);
597 rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
598 obd->obd_upcall.onu_owner,
599 &mds->mds_mount_count);
602 OBD_FREE(ld, sizeof(*ld));
// Brings the MDS view of one target up to date and announces the next object
// id to it.
//   obd     - MDS device
//   watched - target device; its cl_target_uuid is passed on
//   idx     - index of the target
//   ev      - notify event forwarded to mds_lov_update_desc()
// Visible steps: the lov descriptor is refreshed, an idx at or beyond
// ld_tgt_count is rejected with -EINVAL, the last id is obtained through
// mds_lov_get_objid(), and the stored id is sent with
// mds_lov_set_one_nextid(). The action taken when obd_recovering is set and
// the other error checks are not visible in this listing.
606 /* Inform MDS about new/updated target */
607 static int mds_lov_update_mds(struct obd_device *obd,
608 struct obd_device *watched,
609 __u32 idx, enum obd_notify_event ev)
611 struct mds_obd *mds = &obd->u.mds;
619 /* Don't let anyone else mess with mds_lov_objids now */
620 rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid, ev);
624 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
625 idx, obd->obd_recovering, obd->obd_async_recov,
626 mds->mds_lov_desc.ld_tgt_count);
628 /* idx is set as data from lov_notify. */
629 if (obd->obd_recovering)
632 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
633 CERROR("index %d > count %d!\n", idx,
634 mds->mds_lov_desc.ld_tgt_count);
635 GOTO(out, rc = -EINVAL);
638 rc = mds_lov_get_objid(obd, idx);
// locate the table slot that holds the last id of this target
642 page = idx / OBJID_PER_PAGE();
643 off = idx % OBJID_PER_PAGE();
644 data = mds->mds_lov_page_array[page];
646 /* We have read this lastid from disk; tell the osc.
647 Don't call this during recovery. */
648 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
650 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
651 /* Don't abort the rest of the sync */
654 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
// Connects the MDS to the LOV device named lov_name.
//   obd      - MDS device
//   lov_name - name looked up with class_name2obd()
// Visible steps: an error pointer cached in mds_osc_obd is returned as its
// error code; the LOV is looked up (ERR_PTR(-ENOTCONN) is cached when it is
// not found); the object id file is read under obd_dev_sem; the MDS registers
// as observer of the LOV; the llogs are initialised; connect data with the
// flag set below is built and passed to obd_connect(). On connect failure,
// and in the error tail, the error is cached in mds_osc_obd as an ERR_PTR;
// the tail also clears mds_osc_exp. The action taken when mds_osc_obd is
// already set and the checks of rc between the steps are not visible in this
// listing.
661 /* update the LOV-OSC knowledge of the last used object id's */
662 int mds_lov_connect(struct obd_device *obd, char * lov_name)
664 struct mds_obd *mds = &obd->u.mds;
665 struct obd_connect_data *data;
669 if (IS_ERR(mds->mds_osc_obd))
670 RETURN(PTR_ERR(mds->mds_osc_obd));
672 if (mds->mds_osc_obd)
675 mds->mds_osc_obd = class_name2obd(lov_name);
676 if (!mds->mds_osc_obd) {
677 CERROR("MDS cannot locate LOV %s\n", lov_name);
678 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
682 mutex_down(&obd->obd_dev_sem);
683 rc = mds_lov_read_objids(obd);
684 mutex_up(&obd->obd_dev_sem);
686 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
690 rc = obd_register_observer(mds->mds_osc_obd, obd);
692 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
697 /* try init too early */
698 rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
702 mds->mds_osc_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
704 OBD_ALLOC(data, sizeof(*data));
706 GOTO(err_exit, rc = -ENOMEM);
708 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
709 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
710 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID |
711 OBD_CONNECT_BRW_SIZE | OBD_CONNECT_CKSUM |
712 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT |
713 OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN |
715 #ifdef HAVE_LRU_RESIZE_SUPPORT
716 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
718 data->ocd_version = LUSTRE_VERSION_CODE;
719 data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
720 /* send max bytes per rpc */
721 data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
722 /* send the list of supported checksum types */
723 data->ocd_cksum_types = OBD_CKSUM_ALL;
724 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
725 rc = obd_connect(NULL, &mds->mds_osc_exp, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
726 OBD_FREE(data, sizeof(*data));
728 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
729 mds->mds_osc_obd = ERR_PTR(rc);
733 /* I want to see a callback happen when the OBD moves to a
734 * "For General Use" state, and that's when we'll call
735 * set_nextid(). The class driver can help us here, because
736 * it can use the obd_recovering flag to determine when the
737 * the OBD is full available. */
738 /* MDD device will care about that
739 if (!obd->obd_recovering)
740 rc = mds_postrecov(obd);
745 mds->mds_osc_exp = NULL;
746 mds->mds_osc_obd = ERR_PTR(rc);
// Detaches the MDS from its LOV device ahead of cleanup.
//   obd - MDS device
// When mds_osc_obd is not an error pointer and an export exists, the MDS
// unregisters as observer of the LOV and sets obd_force on the LOV device,
// for the reason given in the comment below. The return statement is not
// visible in this listing.
750 int mds_lov_disconnect(struct obd_device *obd)
752 struct mds_obd *mds = &obd->u.mds;
756 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
757 obd_register_observer(mds->mds_osc_obd, NULL);
759 /* The actual disconnect of the mds_lov will be called from
760 * class_disconnect_exports from mds_lov_clean. So we have to
761 * ensure that class_cleanup doesn't fail due to the extra ref
762 * we're holding now. The mechanism to do that already exists -
763 * the obd_force flag. We'll drop the final ref to the
764 * mds_osc_exp in mds_cleanup. */
765 mds->mds_osc_obd->obd_force = 1;
// Argument bundle handed to the synchronize routine, either directly or as
// the data pointer of a kernel thread.
// NOTE(review): __mds_lov_synchronize reads mlsi_obd->u.mds, so mlsi_obd is
// used as the MDS device even though its field comment names the lov device.
771 struct mds_lov_sync_info {
772 struct obd_device *mlsi_obd; /* the lov device to sync */
773 struct obd_device *mlsi_watched; /* target osc */
774 __u32 mlsi_index; /* index of target */
775 enum obd_notify_event mlsi_ev; /* event type */
// Sends the capability keys of the MDS to a target through the osc export.
//   mds  - MDS state holding mds_capa_keys
//   uuid - target uuid, placed in the info structure of each request
// Both entries of mds_capa_keys (indexes 0 and 1) are sent with
// obd_set_info_async(KEY_CAPA_KEY); a failure is logged with DEBUG_CAPA_KEY
// at D_ERROR. The action taken when mds_capa_keys is NULL and the return
// statements are not visible in this listing.
778 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
780 struct mds_capa_info info = { .uuid = uuid };
781 struct lustre_capa_key *key;
786 if (!mds->mds_capa_keys)
789 for (i = 0; i < 2; i++) {
790 key = &mds->mds_capa_keys[i];
791 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
794 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
795 KEY_CAPA_KEY, sizeof(info), &info, NULL);
797 DEBUG_CAPA_KEY(D_ERROR, key,
798 "propagate failed (rc = %d) for", rc);
// Synchronizes the MDS with one target.
//   data - pointer to a struct mds_lov_sync_info describing the MDS device,
//          the watched target, its index and the event
// Visible sequence, run with mds_notify_lock held for reading:
//   1. bail out with -ENODEV when the MDS is stopping or failing;
//   2. mds_lov_update_mds() refreshes descriptor and object ids;
//   3. KEY_MDS_CONN is sent with the object group of this MDS;
//   4. capability keys are propagated;
//   5. the LLOG_MDS_OST_ORIG_CTXT llog context is connected;
//   6. orphans are cleared on the target;
//   7. with HAVE_QUOTA_SUPPORT, the upcall owner gets OBD_NOTIFY_QUOTA.
// After the lock is dropped, the failure path logs the error and notifies
// the LOV with OBD_NOTIFY_INACTIVE unless one of the devices is stopping.
// The obd reference named mds_lov_synchronize is dropped at the end. The rc
// checks between the steps and the release of the sync info are not visible
// in this listing.
806 /* We only sync one osc at a time, so that we don't have to hold
807 any kind of lock on the whole mds_lov_desc, which may change
808 (grow) as a result of mds_lov_add_ost. This also avoids any
809 kind of mismatch between the lov_desc and the mds_lov_desc,
810 which are not in lock-step during lov_add_obd */
811 static int __mds_lov_synchronize(void *data)
813 struct mds_lov_sync_info *mlsi = data;
814 struct obd_device *obd = mlsi->mlsi_obd;
815 struct obd_device *watched = mlsi->mlsi_watched;
816 struct mds_obd *mds = &obd->u.mds;
817 struct obd_uuid *uuid;
818 __u32 idx = mlsi->mlsi_index;
819 enum obd_notify_event ev = mlsi->mlsi_ev;
820 struct mds_group_info mgi;
821 struct llog_ctxt *ctxt;
829 uuid = &watched->u.cli.cl_target_uuid;
832 down_read(&mds->mds_notify_lock);
833 if (obd->obd_stopping || obd->obd_fail)
834 GOTO(out, rc = -ENODEV);
836 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
837 rc = mds_lov_update_mds(obd, watched, idx, ev);
839 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
842 mgi.group = mdt_to_obd_objgrp(mds->mds_id);
845 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
846 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
849 /* propagate capability keys */
850 rc = mds_propagate_capa_keys(mds, uuid);
854 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
856 GOTO(out, rc = -ENODEV);
858 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
859 rc = llog_connect(ctxt, NULL, NULL, uuid);
862 CERROR("%s failed at llog_origin_connect: %d\n",
863 obd_uuid2str(uuid), rc);
867 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
868 obd->obd_name, obd_uuid2str(uuid));
869 rc = mds_lov_clear_orphans(mds, uuid);
871 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
872 obd_uuid2str(uuid), rc);
876 #ifdef HAVE_QUOTA_SUPPORT
877 if (obd->obd_upcall.onu_owner) {
879 * This is a hack for mds_notify->mdd_notify. When the mds obd
880 * in mdd is removed, This hack should be removed.
882 LASSERT(obd->obd_upcall.onu_upcall != NULL);
883 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
884 obd->obd_upcall.onu_owner,NULL);
889 up_read(&mds->mds_notify_lock);
891 /* Deactivate it for safety */
892 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
894 if (!obd->obd_stopping && mds->mds_osc_obd &&
895 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
896 obd_notify(mds->mds_osc_obd, watched,
897 OBD_NOTIFY_INACTIVE, NULL);
900 class_decref(obd, "mds_lov_synchronize", obd);
// Thread entry point for a background sync: builds a thread name of the form
// ll_sync_NN from the target index, passes it to cfs_daemonize_ctxt(), then
// runs __mds_lov_synchronize() and returns its result.
//   data - pointer to a struct mds_lov_sync_info
904 int mds_lov_synchronize(void *data)
906 struct mds_lov_sync_info *mlsi = data;
909 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
910 cfs_daemonize_ctxt(name);
912 RETURN(__mds_lov_synchronize(data));
// Starts synchronization of the MDS with one target.
//   obd     - MDS device
//   watched - target device
//   data    - pointer to a __u32 holding the target index
//   ev      - notify event; OBD_NOTIFY_SYNC runs the sync in the calling
//             context, any other event starts a kernel thread
// A struct mds_lov_sync_info is allocated and filled, and an obd reference
// named mds_lov_synchronize is taken for the reason given in the comment
// below. If the thread cannot be started the reference is dropped again.
// The handling of an allocation failure, the assignment of mlsi_ev and the
// return statements are not visible in this listing.
915 int mds_lov_start_synchronize(struct obd_device *obd,
916 struct obd_device *watched,
917 void *data, enum obd_notify_event ev)
919 struct mds_lov_sync_info *mlsi;
921 struct obd_uuid *uuid;
925 uuid = &watched->u.cli.cl_target_uuid;
927 OBD_ALLOC(mlsi, sizeof(*mlsi));
932 mlsi->mlsi_obd = obd;
933 mlsi->mlsi_watched = watched;
934 mlsi->mlsi_index = *(__u32 *)data;
937 /* Although class_export_get(obd->obd_self_export) would lock
938 the MDS in place, since it's only a self-export
939 it doesn't lock the LOV in place. The LOV can be disconnected
940 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
941 Simply taking an export ref on the LOV doesn't help, because it's
942 still disconnected. Taking an obd reference insures that we don't
943 disconnect the LOV. This of course means a cleanup won't
944 finish for as long as the sync is blocking. */
945 class_incref(obd, "mds_lov_synchronize", obd);
947 if (ev != OBD_NOTIFY_SYNC) {
948 /* Synchronize in the background */
949 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
950 CLONE_VM | CLONE_FILES);
952 CERROR("%s: error starting mds_lov_synchronize: %d\n",
954 class_decref(obd, "mds_lov_synchronize", obd);
956 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
957 "thread=%d\n", obd->obd_name,
958 mlsi->mlsi_index, rc);
962 rc = __mds_lov_synchronize((void *)mlsi);
// Notification handler of the MDS for events about a watched device.
//   obd     - MDS device
//   watched - device the event is about; must be of type LUSTRE_OSC_NAME,
//             otherwise the event is reported with CERROR
//   ev      - event; the visible case labels are OBD_NOTIFY_ACTIVE,
//             OBD_NOTIFY_SYNC and OBD_NOTIFY_SYNC_NONBLOCK
//   data    - pointer to a __u32 target index for those events
// While obd_recovering is set only the lov descriptor is updated through
// mds_lov_update_desc(). mds_lov_start_synchronize() is also called in the
// visible lines; presumably that is the path taken outside recovery. The
// handling of other events and the return statements are not visible in this
// listing.
968 int mds_notify(struct obd_device *obd, struct obd_device *watched,
969 enum obd_notify_event ev, void *data)
974 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
977 /* We only handle these: */
978 case OBD_NOTIFY_ACTIVE:
979 /* lov want one or more _active_ targets for work */
980 /* activate event should be pass lov idx as argument */
981 case OBD_NOTIFY_SYNC:
982 case OBD_NOTIFY_SYNC_NONBLOCK:
983 /* sync event should be pass lov idx as argument */
989 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
990 CERROR("unexpected notification of %s %s!\n",
991 watched->obd_type->typ_name, watched->obd_name);
995 if (obd->obd_recovering) {
996 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
998 obd_uuid2str(&watched->u.cli.cl_target_uuid));
999 /* We still have to fix the lov descriptor for ost's added
1000 after the mdt in the config log. They didn't make it into
1002 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1003 &watched->u.cli.cl_target_uuid, ev);
1005 rc = mds_lov_start_synchronize(obd, watched, data, ev);