1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mds/mds_lov.c
38 * Lustre Metadata Server (mds) handling of striped file data
40 * Author: Peter Braam <braam@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
54 #include "mds_internal.h"
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
58 struct mds_obd *mds = &obd->u.mds;
61 CDEBUG(D_INFO, "dump from %s\n", label);
62 if (mds->mds_lov_page_dirty == NULL) {
63 CERROR("NULL bitmap!\n");
67 for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
68 CDEBUG(D_INFO, "%u - %lx\n", i,
69 mds->mds_lov_page_dirty->data[i]);
71 if (mds->mds_lov_page_array == NULL) {
72 CERROR("not init page array!\n");
76 for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
77 obd_id *data = mds->mds_lov_page_array[i];
82 for(j=0; j < OBJID_PER_PAGE(); j++) {
85 CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
93 int mds_lov_init_objids(struct obd_device *obd)
95 struct mds_obd *mds = &obd->u.mds;
96 int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
101 CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
103 mds->mds_lov_page_dirty =
104 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
105 if (mds->mds_lov_page_dirty == NULL)
109 OBD_ALLOC(mds->mds_lov_page_array, size);
110 if (mds->mds_lov_page_array == NULL)
111 GOTO(err_free_bitmap, rc = -ENOMEM);
113 /* open and test the lov objid file */
114 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
117 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
118 GOTO(err_free, rc = PTR_ERR(file));
120 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
121 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
122 file->f_dentry->d_inode->i_mode);
123 GOTO(err_open, rc = -ENOENT);
125 mds->mds_lov_objid_filp = file;
129 if (filp_close((struct file *)file, 0))
130 CERROR("can't close %s after error\n", LOV_OBJID);
132 OBD_FREE(mds->mds_lov_page_array, size);
134 CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
139 void mds_lov_destroy_objids(struct obd_device *obd)
141 struct mds_obd *mds = &obd->u.mds;
145 if (mds->mds_lov_page_array != NULL) {
146 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
147 obd_id *data = mds->mds_lov_page_array[i];
149 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
151 OBD_FREE(mds->mds_lov_page_array,
152 MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
155 if (mds->mds_lov_objid_filp) {
156 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
157 mds->mds_lov_objid_filp = NULL;
159 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
162 CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
167 * There are currently two ways to learn the ost count and max ost index:
168 * first - after an ost is connected to the mds and the sync process has finished;
169 * second - from the lmm during recovery, in case the mds does not have configs
170 * and the ost isn't registered in the mgs.
172 * \param mds pointer to mds structure
173 * \param index maximum ost index
175 * \retval -ENOMEM if there is not enough memory for a new page
176 * \retval 0 if the update succeeded
178 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
180 __u32 page = index / OBJID_PER_PAGE();
181 __u32 off = index % OBJID_PER_PAGE();
182 obd_id *data = mds->mds_lov_page_array[page];
185 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
189 mds->mds_lov_page_array[page] = data;
192 if (index > mds->mds_lov_objid_max_index) {
193 mds->mds_lov_objid_lastpage = page;
194 mds->mds_lov_objid_lastidx = off;
195 mds->mds_lov_objid_max_index = index;
198 /* workaround - New target not in objids file; increase mdsize */
199 /* ld_tgt_count is used as the max index everywhere, despite its name. */
200 if (data[off] == 0) {
204 mds->mds_lov_objid_count++;
205 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
206 mds->mds_lov_objid_count);
208 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
209 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
211 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
212 " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
213 mds->mds_max_cookiesize);
220 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
222 __u32 page = index / OBJID_PER_PAGE();
223 __u32 off = index % OBJID_PER_PAGE();
224 obd_id *data = mds->mds_lov_page_array[page];
226 return (data[off] > 0);
229 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
231 struct lov_ost_data_v1 *data;
236 /* if we create file without objects - lmm is NULL */
240 switch (le32_to_cpu(lmm->lmm_magic)) {
242 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
243 data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
246 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
247 data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
250 CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
255 cfs_mutex_down(&obd->obd_dev_sem);
256 for (j = 0; j < count; j++) {
257 __u32 i = le32_to_cpu(data[j].l_ost_idx);
258 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
263 cfs_mutex_up(&obd->obd_dev_sem);
267 EXPORT_SYMBOL(mds_lov_prepare_objids);
270 * Write an llog orphan record about a lost ost object.
271 * A special lsm with a single stripe is allocated; caller should deallocate it
274 static int mds_log_lost_precreated(struct obd_device *obd,
275 struct lov_stripe_md **lsmp, int *stripes,
276 obd_id id, obd_count count, int idx)
278 struct lov_stripe_md *lsm = *lsmp;
283 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
286 /* need only one stripe, save old value */
287 *stripes = lsm->lsm_stripe_count;
288 lsm->lsm_stripe_count = 1;
292 lsm->lsm_oinfo[0]->loi_id = id;
293 lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
294 lsm->lsm_oinfo[0]->loi_ost_idx = idx;
296 rc = mds_log_op_orphan(obd, lsm, count);
300 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
302 struct mds_obd *mds = &obd->u.mds;
304 struct lov_ost_data_v1 *obj;
305 struct lov_stripe_md *lsm = NULL;
310 /* if we create file without objects - lmm is NULL */
314 switch (le32_to_cpu(lmm->lmm_magic)) {
316 count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
317 obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
320 count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
321 obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
324 CERROR("Unknow lmm type %X !\n",
325 le32_to_cpu(lmm->lmm_magic));
329 for (j = 0; j < count; j++) {
330 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
331 obd_id id = le64_to_cpu(obj[j].l_object_id);
332 __u32 page = i / OBJID_PER_PAGE();
333 __u32 idx = i % OBJID_PER_PAGE();
336 data = mds->mds_lov_page_array[page];
338 CDEBUG(D_INODE,"update last object for ost %u"
339 " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
340 if (id > data[idx]) {
341 int lost = id - data[idx] - 1;
342 /* we might have lost precreated objects due to VBR */
343 if (lost > 0 && obd->obd_recovering) {
344 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
345 if (!obd->obd_version_recov)
346 CERROR("Unexpected gap in objids\n");
347 /* lsm is allocated if NULL */
348 mds_log_lost_precreated(obd, &lsm, &stripes,
349 data[idx]+1, lost, i);
352 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
356 /* restore stripes number */
357 lsm->lsm_stripe_count = stripes;
358 obd_free_memmd(mds->mds_lov_exp, &lsm);
363 EXPORT_SYMBOL(mds_lov_update_objids);
365 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
371 for (i = 0; i < count; i++) {
375 mds->mds_lov_objid_count++;
378 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
379 mds->mds_lov_objid_count);
381 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
382 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
384 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
385 "%d/%d\n", stripes, mds->mds_max_mdsize,
386 mds->mds_max_cookiesize);
392 static int mds_lov_read_objids(struct obd_device *obd)
394 struct mds_obd *mds = &obd->u.mds;
396 int i, rc = 0, count = 0, page = 0;
400 /* Read everything in the file, even if our current lov desc
401 has fewer targets. Old targets not in the lov descriptor
402 during mds setup may still have valid objids. */
403 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
407 page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
408 CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
409 for (i = 0; i < page; i++) {
411 loff_t off_old = off;
413 LASSERT(mds->mds_lov_page_array[i] == NULL);
414 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
415 if (mds->mds_lov_page_array[i] == NULL)
416 GOTO(out, rc = -ENOMEM);
418 data = mds->mds_lov_page_array[i];
420 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
421 MDS_LOV_ALLOC_SIZE, &off);
423 CERROR("Error reading objids %d\n", rc);
426 if (off == off_old) /* hole is read */
427 off += MDS_LOV_ALLOC_SIZE;
429 count = (off - off_old) / sizeof(obd_id);
430 if (mds_lov_update_from_read(mds, data, count)) {
431 CERROR("Can't update mds data\n");
432 GOTO(out, rc = -EIO);
435 mds->mds_lov_objid_lastpage = page - 1;
436 mds->mds_lov_objid_lastidx = count - 1;
438 CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
439 mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
441 mds_lov_dump_objids("read",obd);
446 int mds_lov_write_objids(struct obd_device *obd)
448 struct mds_obd *mds = &obd->u.mds;
452 if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
455 mds_lov_dump_objids("write", obd);
457 cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
458 obd_id *data = mds->mds_lov_page_array[i];
459 unsigned int size = MDS_LOV_ALLOC_SIZE;
460 loff_t off = i * size;
462 LASSERT(data != NULL);
464 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
467 /* check for partially filled last page */
468 if (i == mds->mds_lov_objid_lastpage)
469 size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
471 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
472 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
475 cfs_bitmap_set(mds->mds_lov_page_dirty, i);
484 EXPORT_SYMBOL(mds_lov_write_objids);
486 static int mds_lov_get_objid(struct obd_device * obd,
489 struct mds_obd *mds = &obd->u.mds;
490 struct obd_export *lov_exp = mds->mds_lov_exp;
498 page = idx / OBJID_PER_PAGE();
499 off = idx % OBJID_PER_PAGE();
500 data = mds->mds_lov_page_array[page];
503 /* We never read this lastid; ask the osc */
504 struct obd_id_info lastid;
506 size = sizeof(lastid);
508 lastid.data = &data[off];
509 rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
510 &size, &lastid, NULL);
514 /* workaround for clean filter */
518 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
520 CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
521 idx, data, page, off, data[off]);
526 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
529 struct obdo oa = { 0 };
530 struct obd_trans_info oti = {0};
531 struct lov_stripe_md *empty_ea = NULL;
534 LASSERT(mds->mds_lov_page_array != NULL);
536 /* This create will in fact either create or destroy: If the OST is
537 * missing objects below this ID, they will be created. If it finds
538 * objects above this ID, they will be removed. */
539 memset(&oa, 0, sizeof(oa));
540 oa.o_flags = OBD_FL_DELORPHAN;
541 oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
542 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
543 if (ost_uuid != NULL)
544 oti.oti_ost_uuid = ost_uuid;
546 rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
552 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
554 struct mds_obd *mds = &obd->u.mds;
556 struct obd_id_info info;
559 LASSERT(!obd->obd_recovering);
563 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
564 KEY_NEXT_ID, sizeof(info), &info, NULL);
566 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
572 /* Update the lov desc for a new size lov. */
573 static int mds_lov_update_desc(struct obd_device *obd, int idx,
574 struct obd_uuid *uuid)
576 struct mds_obd *mds = &obd->u.mds;
578 __u32 valsize = sizeof(mds->mds_lov_desc);
582 OBD_ALLOC(ld, sizeof(*ld));
586 rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
591 /* Don't change the mds_lov_desc until the objids size matches the
593 mds->mds_lov_desc = *ld;
594 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
595 mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
597 cfs_mutex_down(&obd->obd_dev_sem);
598 rc = mds_lov_update_max_ost(mds, idx);
599 cfs_mutex_up(&obd->obd_dev_sem);
603 /* If we added a target we have to reconnect the llogs */
604 /* We only _need_ to do this at first add (idx), or the first time
605 after recovery. However, it should now be safe to call anytime. */
606 rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
611 OBD_FREE(ld, sizeof(*ld));
615 /* Inform MDS about new/updated target */
616 static int mds_lov_update_mds(struct obd_device *obd,
617 struct obd_device *watched,
620 struct mds_obd *mds = &obd->u.mds;
627 LASSERT(mds_lov_objinit(mds, idx));
629 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
630 idx, obd->obd_recovering, obd->obd_async_recov,
631 mds->mds_lov_desc.ld_tgt_count);
633 /* idx is set as data from lov_notify. */
634 if (obd->obd_recovering)
637 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
638 CERROR("index %d > count %d!\n", idx,
639 mds->mds_lov_desc.ld_tgt_count);
640 GOTO(out, rc = -EINVAL);
643 rc = mds_lov_get_objid(obd, idx);
647 page = idx / OBJID_PER_PAGE();
648 off = idx % OBJID_PER_PAGE();
649 data = mds->mds_lov_page_array[page];
651 /* We have read this lastid from disk; tell the osc.
652 Don't call this during recovery. */
653 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
655 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
656 /* Don't abort the rest of the sync */
659 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
666 /* update the LOV-OSC knowledge of the last used object id's */
667 int mds_lov_connect(struct obd_device *obd, char * lov_name)
669 struct mds_obd *mds = &obd->u.mds;
670 struct obd_connect_data *data;
674 if (IS_ERR(mds->mds_lov_obd))
675 RETURN(PTR_ERR(mds->mds_lov_obd));
677 if (mds->mds_lov_obd)
680 mds->mds_lov_obd = class_name2obd(lov_name);
681 if (!mds->mds_lov_obd) {
682 CERROR("MDS cannot locate LOV %s\n", lov_name);
683 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
687 cfs_mutex_down(&obd->obd_dev_sem);
688 rc = mds_lov_read_objids(obd);
689 cfs_mutex_up(&obd->obd_dev_sem);
691 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
695 rc = obd_register_observer(mds->mds_lov_obd, obd);
697 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
702 /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
704 obd_notify(mds->mds_lov_obd, NULL, OBD_NOTIFY_CREATE, NULL);
706 mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
708 OBD_ALLOC(data, sizeof(*data));
710 GOTO(err_exit, rc = -ENOMEM);
712 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
713 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
714 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_FID |
715 OBD_CONNECT_BRW_SIZE | OBD_CONNECT_CKSUM |
716 OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT |
717 OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN |
718 OBD_CONNECT_SOM | OBD_CONNECT_FULL20;
719 #ifdef HAVE_LRU_RESIZE_SUPPORT
720 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
722 data->ocd_version = LUSTRE_VERSION_CODE;
723 data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
724 /* send max bytes per rpc */
725 data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
726 /* send the list of supported checksum types */
727 data->ocd_cksum_types = OBD_CKSUM_ALL;
728 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
729 rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
730 OBD_FREE(data, sizeof(*data));
732 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
733 mds->mds_lov_obd = ERR_PTR(rc);
737 /* I want to see a callback happen when the OBD moves to a
738 * "For General Use" state, and that's when we'll call
739 * set_nextid(). The class driver can help us here, because
740 * it can use the obd_recovering flag to determine when
741 * the OBD is fully available. */
742 /* MDD device will care about that
743 if (!obd->obd_recovering)
744 rc = mds_postrecov(obd);
749 mds->mds_lov_exp = NULL;
750 mds->mds_lov_obd = ERR_PTR(rc);
754 int mds_lov_disconnect(struct obd_device *obd)
756 struct mds_obd *mds = &obd->u.mds;
760 if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
761 obd_register_observer(mds->mds_lov_obd, NULL);
763 /* The actual disconnect of the mds_lov will be called from
764 * class_disconnect_exports from mds_lov_clean. So we have to
765 * ensure that class_cleanup doesn't fail due to the extra ref
766 * we're holding now. The mechanism to do that already exists -
767 * the obd_force flag. We'll drop the final ref to the
768 * mds_lov_exp in mds_cleanup. */
769 mds->mds_lov_obd->obd_force = 1;
775 struct mds_lov_sync_info {
776 struct obd_device *mlsi_obd; /* the lov device to sync */
777 struct obd_device *mlsi_watched; /* target osc */
778 __u32 mlsi_index; /* index of target */
781 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
783 struct mds_capa_info info = { .uuid = uuid };
784 struct lustre_capa_key *key;
789 if (!mds->mds_capa_keys)
792 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
793 for (i = 0; i < 2; i++) {
794 key = &mds->mds_capa_keys[i];
795 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
798 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY),
799 KEY_CAPA_KEY, sizeof(info), &info, NULL);
801 DEBUG_CAPA_KEY(D_ERROR, key,
802 "propagate failed (rc = %d) for", rc);
810 /* We only sync one osc at a time, so that we don't have to hold
811 any kind of lock on the whole mds_lov_desc, which may change
812 (grow) as a result of mds_lov_add_ost. This also avoids any
813 kind of mismatch between the lov_desc and the mds_lov_desc,
814 which are not in lock-step during lov_add_obd */
815 static int __mds_lov_synchronize(void *data)
817 struct mds_lov_sync_info *mlsi = data;
818 struct obd_device *obd = mlsi->mlsi_obd;
819 struct obd_device *watched = mlsi->mlsi_watched;
820 struct mds_obd *mds = &obd->u.mds;
821 struct obd_uuid *uuid;
822 __u32 idx = mlsi->mlsi_index;
823 struct mds_group_info mgi;
824 struct llog_ctxt *ctxt;
832 uuid = &watched->u.cli.cl_target_uuid;
835 cfs_down_read(&mds->mds_notify_lock);
836 if (obd->obd_stopping || obd->obd_fail)
837 GOTO(out, rc = -ENODEV);
839 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
840 rc = mds_lov_update_mds(obd, watched, idx);
842 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
845 mgi.group = mdt_to_obd_objseq(mds->mds_id);
848 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
849 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
852 /* propagate capability keys */
853 rc = mds_propagate_capa_keys(mds, uuid);
857 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
859 GOTO(out, rc = -ENODEV);
861 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
862 rc = llog_connect(ctxt, NULL, NULL, uuid);
865 CERROR("%s failed at llog_origin_connect: %d\n",
866 obd_uuid2str(uuid), rc);
870 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
871 obd->obd_name, obd_uuid2str(uuid));
873 rc = mds_lov_clear_orphans(mds, uuid);
875 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
876 obd_uuid2str(uuid), rc);
880 #ifdef HAVE_QUOTA_SUPPORT
881 if (obd->obd_upcall.onu_owner) {
883 * This is a hack for mds_notify->mdd_notify. When the mds obd
884 * in mdd is removed, this hack should be removed.
886 LASSERT(obd->obd_upcall.onu_upcall != NULL);
887 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
888 obd->obd_upcall.onu_owner,NULL);
893 cfs_up_read(&mds->mds_notify_lock);
895 /* Deactivate it for safety */
896 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
898 if (!obd->obd_stopping && mds->mds_lov_obd &&
899 !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
900 obd_notify(mds->mds_lov_obd, watched,
901 OBD_NOTIFY_INACTIVE, NULL);
904 class_decref(obd, "mds_lov_synchronize", obd);
908 int mds_lov_synchronize(void *data)
910 struct mds_lov_sync_info *mlsi = data;
913 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
914 cfs_daemonize_ctxt(name);
916 RETURN(__mds_lov_synchronize(data));
919 int mds_lov_start_synchronize(struct obd_device *obd,
920 struct obd_device *watched,
921 void *data, enum obd_notify_event ev)
923 struct mds_lov_sync_info *mlsi;
925 struct obd_uuid *uuid;
929 uuid = &watched->u.cli.cl_target_uuid;
931 OBD_ALLOC(mlsi, sizeof(*mlsi));
936 mlsi->mlsi_obd = obd;
937 mlsi->mlsi_watched = watched;
938 mlsi->mlsi_index = *(__u32 *)data;
940 /* Although class_export_get(obd->obd_self_export) would lock
941 the MDS in place, since it's only a self-export
942 it doesn't lock the LOV in place. The LOV can be disconnected
943 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
944 Simply taking an export ref on the LOV doesn't help, because it's
945 still disconnected. Taking an obd reference ensures that we don't
946 disconnect the LOV. This of course means a cleanup won't
947 finish for as long as the sync is blocking. */
948 class_incref(obd, "mds_lov_synchronize", obd);
950 if (ev != OBD_NOTIFY_SYNC) {
951 /* Synchronize in the background */
952 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
953 CLONE_VM | CLONE_FILES);
955 CERROR("%s: error starting mds_lov_synchronize: %d\n",
957 class_decref(obd, "mds_lov_synchronize", obd);
959 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
960 "thread=%d\n", obd->obd_name,
961 mlsi->mlsi_index, rc);
965 rc = __mds_lov_synchronize((void *)mlsi);
971 int mds_notify(struct obd_device *obd, struct obd_device *watched,
972 enum obd_notify_event ev, void *data)
974 struct mds_obd *mds = &obd->u.mds;
978 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
980 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
981 CERROR("unexpected notification of %s %s!\n",
982 watched->obd_type->typ_name, watched->obd_name);
986 /*XXX this notifies the MDD until lov handling use old mds code
989 if (obd->obd_upcall.onu_owner) {
990 LASSERT(obd->obd_upcall.onu_upcall != NULL);
991 rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
992 obd->obd_upcall.onu_owner,
993 &mds->mds_obt.obt_mount_count);
997 /* We only handle these: */
998 case OBD_NOTIFY_CREATE:
999 CWARN("MDS %s: add target %s\n",obd->obd_name,
1000 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1001 /* We still have to fix the lov descriptor for ost's */
1003 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1004 &watched->u.cli.cl_target_uuid);
1006 case OBD_NOTIFY_ACTIVE:
1007 /* lov wants one or more _active_ targets for work */
1008 /* the activate event should pass the lov idx as argument */
1009 case OBD_NOTIFY_SYNC:
1010 case OBD_NOTIFY_SYNC_NONBLOCK:
1011 /* the sync event should pass the lov idx as argument */
1017 if (obd->obd_recovering) {
1018 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1020 obd_uuid2str(&watched->u.cli.cl_target_uuid));
1021 /* We still have to fix the lov descriptor for ost's added
1022 after the mdt in the config log. They didn't make it into
1024 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1025 &watched->u.cli.cl_target_uuid);
1027 rc = mds_lov_start_synchronize(obd, watched, data, ev);