1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
42 #include "mds_internal.h"
/* Raise the MDS's cached per-target last object ids.
 * For every index below mds_lov_desc.ld_tgt_count, an entry of ids[] that is
 * larger than the stored value replaces it; stored values are never lowered
 * here.  obd is the MDS device, ids the candidate ids indexed by target. */
44 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
46 struct mds_obd *mds = &obd->u.mds;
51 for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
52 if (ids[i] > (mds->mds_lov_objids)[i]) {
53 (mds->mds_lov_objids)[i] = ids[i];
/* mark the table as modified; mds_lov_write_objids() tests this flag */
54 mds->mds_lov_objids_dirty = 1;
/* Load the saved last-object-id table from mds_lov_objid_filp into memory.
 * Asserts that no table has been sized yet and that nothing is dirty, then
 * reads the file's full size, records the buffer and its byte size in the
 * mds, and sets mds_lov_objids_in_file to the number of ids read.
 * NOTE(review): the allocation of 'ids' and the return statements are not
 * visible in this listing. */
60 static int mds_lov_read_objids(struct obd_device *obd)
62 struct mds_obd *mds = &obd->u.mds;
68 LASSERT(!mds->mds_lov_objids_size);
69 LASSERT(!mds->mds_lov_objids_dirty);
71 /* Read everything in the file, even if our current lov desc
72 has fewer targets. Old targets not in the lov descriptor
73 during mds setup may still have valid objids. */
74 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* publish the buffer and its size in bytes before filling it */
81 mds->mds_lov_objids = ids;
82 mds->mds_lov_objids_size = size;
84 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
86 CERROR("Error reading objids %d\n", rc);
/* number of ids the file contained (bytes / size of one id) */
90 mds->mds_lov_objids_in_file = size / sizeof(*ids);
92 for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
93 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
94 mds->mds_lov_objids[i], i);
/* Write the in-memory last-object-id table back to mds_lov_objid_filp.
 * The dirty flag is tested first.  The number of entries written is the
 * larger of the current target count and the count originally read from the
 * file.  The dirty flag is reset after the write call; any condition
 * guarding that reset is not visible in this listing. */
99 int mds_lov_write_objids(struct obd_device *obd)
101 struct mds_obd *mds = &obd->u.mds;
106 if (!mds->mds_lov_objids_dirty)
/* cover both the targets in the descriptor and those only known from disk */
109 tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
114 for (i = 0; i < tgts; i++)
115 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
116 mds->mds_lov_objids[i], i);
118 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
119 mds->mds_lov_objids, tgts * sizeof(obd_id),
122 mds->mds_lov_objids_dirty = 0;
/* Ask the OSTs behind mds_osc_exp to reconcile their objects with the MDS.
 * Sends an obd_create() whose request object (oa) is zeroed except for the
 * OBD_FL_DELORPHAN flag.  When ost_uuid is non-NULL it is copied into
 * oa.o_inline and OBD_MD_FLINLINE is set; NULL leaves the inline field
 * unset.  Requires mds_lov_objids to be set (asserted). */
129 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
133 struct obd_trans_info oti = {0};
134 struct lov_stripe_md *empty_ea = NULL;
137 LASSERT(mds->mds_lov_objids != NULL);
139 /* This create will in fact either create or destroy: If the OST is
140 * missing objects below this ID, they will be created. If it finds
141 * objects above this ID, they will be removed. */
142 memset(&oa, 0, sizeof(oa));
143 oa.o_valid = OBD_MD_FLFLAGS;
144 oa.o_flags = OBD_FL_DELORPHAN;
/* optionally carry the target's uuid in the inline area of the request */
145 if (ost_uuid != NULL) {
146 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
147 oa.o_valid |= OBD_MD_FLINLINE;
149 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
154 /* update the LOV-OSC knowledge of the last used object id's */
/* Sends the first ld_tgt_count entries of mds_lov_objids to mds_osc_exp
 * under the KEY_NEXT_ID key.  Asserts that the device is not recovering,
 * that the id table exists, and that obd_dev_sem is held.  The condition
 * guarding the error message is not visible in this listing. */
155 int mds_lov_set_nextid(struct obd_device *obd)
157 struct mds_obd *mds = &obd->u.mds;
161 LASSERT(!obd->obd_recovering);
162 LASSERT(mds->mds_lov_objids != NULL);
164 /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
165 LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
/* value length is one id per target in the current descriptor */
167 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
169 mds->mds_lov_desc.ld_tgt_count *
170 sizeof(*mds->mds_lov_objids),
171 mds->mds_lov_objids, NULL);
174 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
180 /* Update the lov desc for a new size lov. */
/* Queries the LOV export for KEY_LOVDESC into a temporary buffer (ld),
 * grows the mds_lov_objids table when the new target count no longer fits,
 * copies *ld into mds->mds_lov_desc, recomputes mds_max_mdsize and
 * mds_max_cookiesize from the stripe count, and calls llog_cat_initialize()
 * for the new target count.  The temporary buffer is freed at the end.
 * NOTE(review): the "updated max_mdsize/max_cookiesize for %d stripes"
 * message names the stripe count first, but its first visible argument is
 * mds_max_mdsize - confirm the argument order. */
181 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
183 struct mds_obd *mds = &obd->u.mds;
185 __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
189 OBD_ALLOC(ld, sizeof(*ld));
193 rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
198 /* The size of the LOV target table may have increased. */
199 size = ld->ld_tgt_count * sizeof(obd_id);
/* (re)allocate when there is no table yet or the needed bytes exceed it */
200 if ((mds->mds_lov_objids_size == 0) ||
201 (size > mds->mds_lov_objids_size)) {
204 /* add room by powers of 2 */
206 while (size < ld->ld_tgt_count)
/* from here on 'size' is a byte count again */
208 size = size * sizeof(obd_id);
210 OBD_ALLOC(ids, size);
212 GOTO(out, rc = -ENOMEM);
213 memset(ids, 0, size);
/* keep the ids already known, then release the old table */
214 if (mds->mds_lov_objids_size) {
215 obd_id *old_ids = mds->mds_lov_objids;
216 memcpy(ids, mds->mds_lov_objids,
217 mds->mds_lov_objids_size);
218 mds->mds_lov_objids = ids;
219 OBD_FREE(old_ids, mds->mds_lov_objids_size);
221 mds->mds_lov_objids = ids;
222 mds->mds_lov_objids_size = size;
225 /* Don't change the mds_lov_desc until the objids size matches the
227 mds->mds_lov_desc = *ld;
228 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
229 mds->mds_lov_desc.ld_tgt_count);
231 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
232 max(mds->mds_lov_desc.ld_tgt_count,
233 mds->mds_lov_objids_in_file));
234 mds->mds_max_mdsize = lov_mds_md_size(stripes);
235 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
236 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
237 "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
240 /* If we added a target we have to reconnect the llogs */
241 /* We only _need_ to do this at first add (idx), or the first time
242 after recovery. However, it should now be safe to call anytime. */
243 llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
/* release the temporary descriptor buffer */
246 OBD_FREE(ld, sizeof(*ld));
/* Sentinel meaning "target index unknown".  It is stored in and compared
 * against __u32 index fields in this file, so it acts as the all-ones
 * unsigned value there. */
251 #define MDSLOV_NO_INDEX -1
253 /* Inform MDS about new/updated target */
/* Runs with obd_dev_sem held from start to finish.  Refreshes the cached
 * lov descriptor, then reconciles the last object id for target idx:
 *  - idx == MDSLOV_NO_INDEX or the device is recovering: tested first; the
 *    action taken is not visible in this listing;
 *  - idx beyond the new target count: fails with -EINVAL;
 *  - idx not covered by the on-disk id file: the id is fetched from the
 *    watched device ("last_id"), stored, and the file is rewritten;
 *  - otherwise (branch structure not visible, per the comment below): the
 *    stored ids are pushed out with mds_lov_set_nextid().
 * obd is the MDS device, watched the target device being reported. */
254 static int mds_lov_update_mds(struct obd_device *obd,
255 struct obd_device *watched,
256 __u32 idx, struct obd_uuid *uuid)
258 struct mds_obd *mds = &obd->u.mds;
263 /* Don't let anyone else mess with mds_lov_objids now */
264 mutex_down(&obd->obd_dev_sem);
/* remembered only for the debug message below */
266 old_count = mds->mds_lov_desc.ld_tgt_count;
267 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
271 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
272 idx, obd->obd_recovering, obd->obd_async_recov, old_count,
273 mds->mds_lov_desc.ld_tgt_count);
275 /* idx is set as data from lov_notify. */
276 if (idx == MDSLOV_NO_INDEX || obd->obd_recovering)
279 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
280 CERROR("index %d > count %d!\n", idx,
281 mds->mds_lov_desc.ld_tgt_count);
282 GOTO(out, rc = -EINVAL);
285 if (idx >= mds->mds_lov_objids_in_file) {
286 /* We never read this lastid; ask the osc */
288 __u32 size = sizeof(lastid);
289 rc = obd_get_info(watched->obd_self_export, strlen("last_id"),
290 "last_id", &size, &lastid);
/* record the fetched id and persist the table right away */
293 mds->mds_lov_objids[idx] = lastid;
294 mds->mds_lov_objids_dirty = 1;
295 mds_lov_write_objids(obd);
297 /* We have read this lastid from disk; tell the osc.
298 Don't call this during recovery. */
299 rc = mds_lov_set_nextid(obd);
301 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
302 /* Don't abort the rest of the sync */
307 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
308 mds->mds_lov_objids[idx], idx, rc);
310 mutex_up(&obd->obd_dev_sem);
314 /* Connect the MDS to the LOV named lov_name and set up the objid state. */
/* Steps visible here: look the LOV device up by name, connect to it with a
 * fixed set of connect flags, register the MDS as its observer, block new
 * client connections, then (under obd_dev_sem) read the objid file, refresh
 * the lov descriptor, initialize the llog catalogs and, when the descriptor
 * lists more targets than the file covered, fetch "last_id" values from the
 * LOV and write them out.  If the device is not recovering, mds_postrecov()
 * is called afterwards.
 * mds_osc_obd doubles as a status cache: a failed attempt stores an ERR_PTR
 * there, and a later call returns that error immediately. */
315 int mds_lov_connect(struct obd_device *obd, char * lov_name)
317 struct mds_obd *mds = &obd->u.mds;
318 struct lustre_handle conn = {0,};
319 struct obd_connect_data *data;
/* an earlier attempt failed: report the cached error */
323 if (IS_ERR(mds->mds_osc_obd))
324 RETURN(PTR_ERR(mds->mds_osc_obd));
/* a device is already recorded; the action taken is not visible here */
326 if (mds->mds_osc_obd)
329 mds->mds_osc_obd = class_name2obd(lov_name);
330 if (!mds->mds_osc_obd) {
331 CERROR("MDS cannot locate LOV %s\n", lov_name);
332 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
/* connect data is only needed for the duration of obd_connect() */
336 OBD_ALLOC(data, sizeof(*data));
339 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
340 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 | OBD_CONNECT_AT;
341 #ifdef HAVE_LRU_RESIZE_SUPPORT
342 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
344 data->ocd_version = LUSTRE_VERSION_CODE;
345 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
346 rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data);
347 OBD_FREE(data, sizeof(*data));
349 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
350 mds->mds_osc_obd = ERR_PTR(rc);
353 mds->mds_osc_exp = class_conn2export(&conn);
355 rc = obd_register_observer(mds->mds_osc_obd, obd);
357 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
359 GOTO(err_discon, rc);
362 /* Deny new client connections until we are sure we have some OSTs */
363 obd->obd_no_conn = 1;
365 mutex_down(&obd->obd_dev_sem);
366 rc = mds_lov_read_objids(obd);
368 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
372 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
376 /* tgt_count may be 0! */
377 rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
379 CERROR("failed to initialize catalog %d\n", rc);
383 /* If we're mounting this code for the first time on an existing FS,
384 * we need to populate the objids array from the real OST values */
385 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
386 int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
387 rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
388 "last_id", &size, mds->mds_lov_objids);
390 for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
391 CWARN("got last object "LPU64" from OST %d\n",
392 mds->mds_lov_objids[i], i);
/* force the freshly fetched ids out to the objid file */
393 mds->mds_lov_objids_dirty = 1;
394 rc = mds_lov_write_objids(obd);
396 CERROR("got last objids from OSTs, but error "
397 "writing objids file: %d\n", rc);
400 mutex_up(&obd->obd_dev_sem);
402 /* I want to see a callback happen when the OBD moves to a
403 * "For General Use" state, and that's when we'll call
404 * set_nextid(). The class driver can help us here, because
405 * it can use the obd_recovering flag to determine when the
406 * OBD is fully available. */
407 if (!obd->obd_recovering)
408 rc = mds_postrecov(obd);
/* error unwinding: drop the semaphore, unregister the observer, disconnect,
 * and cache the failure in mds_osc_obd (the labels are not visible here) */
412 mutex_up(&obd->obd_dev_sem);
413 obd_register_observer(mds->mds_osc_obd, NULL);
415 obd_disconnect(mds->mds_osc_exp);
416 mds->mds_osc_exp = NULL;
417 mds->mds_osc_obd = ERR_PTR(rc);
/* Detach the MDS from its LOV without dropping the export yet.
 * Only acts when mds_osc_obd is a real device (not a cached error pointer)
 * and an export exists: the observer registration is removed and obd_force
 * is set on the LOV device, for the reason given in the comment below. */
421 int mds_lov_disconnect(struct obd_device *obd)
423 struct mds_obd *mds = &obd->u.mds;
427 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
428 obd_register_observer(mds->mds_osc_obd, NULL);
430 /* The actual disconnect of the mds_lov will be called from
431 * class_disconnect_exports from mds_lov_clean. So we have to
432 * ensure that class_cleanup doesn't fail due to the extra ref
433 * we're holding now. The mechanism to do that already exists -
434 * the obd_force flag. We'll drop the final ref to the
435 * mds_osc_exp in mds_cleanup. */
436 mds->mds_osc_obd->obd_force = 1;
/* ioctl dispatcher for the MDS device behind exp.
 * karg is treated as a struct obd_ioctl_data; cmd selects one of the cases
 * below.  Most cases bracket their work with push_ctxt()/pop_ctxt() on an
 * lvfs context.  mds_cfg_llh holds the configuration log handle that is
 * open between OBD_IOC_RECORD and OBD_IOC_ENDRECORD.
 * NOTE(review): the switch statement, several case labels and the return
 * statements are not visible in this listing. */
442 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
443 void *karg, void *uarg)
445 static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
446 struct obd_device *obd = exp->exp_obd;
447 struct mds_obd *mds = &obd->u.mds;
448 struct obd_ioctl_data *data = karg;
449 struct lvfs_run_ctxt saved;
453 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
/* Start recording: create the config log named by ioc_inlbuf1 and keep its
 * handle in mds_cfg_llh.  A handle that is already set is tested first. */
456 case OBD_IOC_RECORD: {
457 char *name = data->ioc_inlbuf1;
458 struct llog_ctxt *ctxt;
460 if (mds->mds_cfg_llh)
463 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
464 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
465 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
468 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
471 mds->mds_cfg_llh = NULL;
472 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Stop recording: close the open config log handle and forget it. */
477 case OBD_IOC_ENDRECORD: {
478 if (!mds->mds_cfg_llh)
481 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
482 rc = llog_close(mds->mds_cfg_llh);
483 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
485 mds->mds_cfg_llh = NULL;
/* Clear a log: open the log named by ioc_inlbuf1, destroy it and free the
 * handle.  mds_cfg_llh is used as scratch and reset to NULL afterwards. */
489 case OBD_IOC_CLEAR_LOG: {
490 char *name = data->ioc_inlbuf1;
491 struct llog_ctxt *ctxt;
492 if (mds->mds_cfg_llh)
495 ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
496 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
497 rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name);
500 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
503 rc = llog_destroy(mds->mds_cfg_llh);
504 llog_free_handle(mds->mds_cfg_llh);
506 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
508 mds->mds_cfg_llh = NULL;
/* Append one record to the open config log.  The payload of ioc_plen1
 * bytes is copied in from the user buffer ioc_pbuf1; only ioc_type ==
 * LUSTRE_CFG_TYPE is accepted. */
512 case OBD_IOC_DORECORD: {
514 struct llog_rec_hdr rec;
515 if (!mds->mds_cfg_llh)
518 rec.lrh_len = llog_data_len(data->ioc_plen1);
520 if (data->ioc_type == LUSTRE_CFG_TYPE) {
521 rec.lrh_type = OBD_CFG_REC;
523 CERROR("unknown cfg record type:%d \n", data->ioc_type);
527 OBD_ALLOC(cfg_buf, data->ioc_plen1);
530 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
/* failure path of the copy: release the buffer before bailing out */
532 OBD_FREE(cfg_buf, data->ioc_plen1);
536 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
537 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
539 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
541 OBD_FREE(cfg_buf, data->ioc_plen1);
/* Parse (apply) the config log named by ioc_inlbuf1. */
545 case OBD_IOC_PARSE: {
546 struct llog_ctxt *ctxt =
547 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
548 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
549 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
550 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Dump the config log named by ioc_inlbuf1. */
558 case OBD_IOC_DUMP_LOG: {
559 struct llog_ctxt *ctxt =
560 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
561 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
562 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
563 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Sync the backing filesystem (the case label is not visible here). */
572 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
573 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
/* Make the device read-only: run a start/commit pair on the root inode,
 * sync the filesystem, then call lvfs_set_rdonly(). */
577 case OBD_IOC_SET_READONLY: {
579 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
580 BDEVNAME_DECLARE_STORAGE(tmp);
581 LCONSOLE_WARN("*** setting obd %s device '%s' read-only ***\n",
582 obd->obd_name, ll_bdevname(obd->u.obt.obt_sb, tmp));
584 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
586 rc = fsfilt_commit(obd, inode, handle, 1);
588 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
589 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
591 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
/* List the llog catalogs, one per target in the current descriptor. */
595 case OBD_IOC_CATLOGLIST: {
596 int count = mds->mds_lov_desc.ld_tgt_count;
597 rc = llog_catalog_list(obd, count, data);
/* Log commands that run between obd_llog_finish() and a following
 * llog_cat_initialize(); a KEY_MDS_CONN set_info is issued afterwards. */
601 case OBD_IOC_LLOG_CHECK:
602 case OBD_IOC_LLOG_CANCEL:
603 case OBD_IOC_LLOG_REMOVE: {
604 struct llog_ctxt *ctxt =
605 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
608 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
609 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
610 rc = llog_ioctl(ctxt, cmd, data);
611 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
612 llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
614 rc2 = obd_set_info_async(mds->mds_osc_exp,
615 strlen(KEY_MDS_CONN), KEY_MDS_CONN,
/* Log inspection commands: passed straight to llog_ioctl(), using the
 * lvfs context of the device behind the log context's export. */
621 case OBD_IOC_LLOG_INFO:
622 case OBD_IOC_LLOG_PRINT: {
623 struct llog_ctxt *ctxt =
624 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
626 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
627 rc = llog_ioctl(ctxt, cmd, data);
628 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
/* Abort recovery on this device. */
634 case OBD_IOC_ABORT_RECOVERY:
635 CERROR("aborting recovery for device %s\n", obd->obd_name);
636 target_abort_recovery(obd);
/* Anything else is only logged here; the label and return value are not
 * visible in this listing. */
640 CDEBUG(D_INFO, "unknown command %x\n", cmd);
647 /* Collect the preconditions we need to allow client connects */
/* Each call latches the condition(s) named in flag (CONFIG_LOG and/or
 * CONFIG_SYNC) into the mds.  Once both have been seen, obd_no_conn is
 * cleared.  The latches are never reset here. */
648 static void mds_allow_cli(struct obd_device *obd, unsigned long flag)
650 if (flag & CONFIG_LOG)
651 obd->u.mds.mds_fl_cfglog = 1;
652 if (flag & CONFIG_SYNC)
653 obd->u.mds.mds_fl_synced = 1;
654 if (obd->u.mds.mds_fl_cfglog && obd->u.mds.mds_fl_synced)
655 /* Open for clients */
656 obd->obd_no_conn = 0;
/* Arguments handed to the synchronize routines.  Allocated by
 * mds_lov_start_synchronize() and freed by __mds_lov_synchronize(). */
659 struct mds_lov_sync_info {
660 struct obd_device *mlsi_obd; /* MDS device doing the sync (used as obd->u.mds) */
661 struct obd_device *mlsi_watched; /* target osc */
662 __u32 mlsi_index; /* index of target, or MDSLOV_NO_INDEX */
665 /* We only sync one osc at a time, so that we don't have to hold
666 any kind of lock on the whole mds_lov_desc, which may change
667 (grow) as a result of mds_lov_add_ost. This also avoids any
668 kind of mismatch between the lov_desc and the mds_lov_desc,
669 which are not in lock-step during lov_add_obd */
/* Synchronize the MDS with one target.
 * data is a struct mds_lov_sync_info; its fields are copied out and the
 * structure is freed here.  Visible sequence: mds_lov_update_mds(), a
 * KEY_MDS_CONN set_info on the LOV export, llog_connect() on the
 * LLOG_MDS_OST_ORIG_CTXT context, then mds_lov_clear_orphans().  The tail
 * either deactivates the watched target via obd_notify() or calls
 * mds_allow_cli(CONFIG_SYNC); the condition selecting between the two is
 * not visible in this listing. */
670 static int __mds_lov_synchronize(void *data)
672 struct mds_lov_sync_info *mlsi = data;
673 struct obd_device *obd = mlsi->mlsi_obd;
674 struct obd_device *watched = mlsi->mlsi_watched;
675 struct mds_obd *mds = &obd->u.mds;
676 struct obd_uuid *uuid;
677 __u32 idx = mlsi->mlsi_index;
678 struct llog_ctxt *ctxt;
/* everything needed has been copied into locals above */
682 OBD_FREE(mlsi, sizeof(*mlsi));
686 uuid = &watched->u.cli.cl_target_uuid;
689 OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
691 rc = mds_lov_update_mds(obd, watched, idx, uuid);
693 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
697 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
698 KEY_MDS_CONN, 0, uuid, NULL);
702 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
706 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
708 rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
713 CERROR("%s failed at llog_origin_connect: %d\n",
714 obd_uuid2str(uuid), rc);
718 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
719 obd->obd_name, obd_uuid2str(uuid));
/* do not start orphan cleanup on a device that is shutting down */
721 if (obd->obd_stopping)
722 GOTO(out, rc = -ENODEV);
724 rc = mds_lov_clear_orphans(mds, uuid);
726 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
727 obd_uuid2str(uuid), rc);
734 /* Deactivate it for safety */
735 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
/* only notify while the MDS, the LOV and the watched device are all still
 * running */
737 if (!obd->obd_stopping && mds->mds_osc_obd &&
738 !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
739 obd_notify(mds->mds_osc_obd, watched,
740 OBD_NOTIFY_INACTIVE, NULL);
742 /* We've successfully synced at least 1 OST and are ready
743 to handle client requests */
744 mds_allow_cli(obd, CONFIG_SYNC);
/* Thread entry point for a background sync (started from
 * mds_lov_start_synchronize()).  Builds a thread name from the target
 * index, or a fixed name when the index is MDSLOV_NO_INDEX, daemonizes
 * under that name and then runs __mds_lov_synchronize() on the same data. */
751 int mds_lov_synchronize(void *data)
753 struct mds_lov_sync_info *mlsi = data;
756 if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
757 /* There is still a watched target,
758 but we don't know its index */
759 sprintf(name, "ll_sync_tgt");
761 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
762 ptlrpc_daemonize(name);
764 RETURN(__mds_lov_synchronize(data));
/* Package a sync request and run it.
 * Allocates a struct mds_lov_sync_info holding obd, watched and the target
 * index, then either spawns a kernel thread running mds_lov_synchronize()
 * or calls __mds_lov_synchronize() directly.  The index is read from
 * *(__u32 *)data or set to MDSLOV_NO_INDEX.  The tests selecting the index
 * source and the blocking mode are not visible in this listing; presumably
 * they check data and nonblock respectively. */
767 int mds_lov_start_synchronize(struct obd_device *obd,
768 struct obd_device *watched,
769 void *data, int nonblock)
771 struct mds_lov_sync_info *mlsi;
778 OBD_ALLOC(mlsi, sizeof(*mlsi));
782 mlsi->mlsi_obd = obd;
783 mlsi->mlsi_watched = watched;
785 mlsi->mlsi_index = *(__u32 *)data;
787 mlsi->mlsi_index = MDSLOV_NO_INDEX;
789 /* Although class_export_get(obd->obd_self_export) would lock
790 the MDS in place, since it's only a self-export
791 it doesn't lock the LOV in place. The LOV can be disconnected
792 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
793 Simply taking an export ref on the LOV doesn't help, because it's
794 still disconnected. Taking an obd reference insures that we don't
795 disconnect the LOV. This of course means a cleanup won't
796 finish for as long as the sync is blocking. */
800 /* Synchronize in the background */
801 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
802 CLONE_VM | CLONE_FILES);
804 CERROR("%s: error starting mds_lov_synchronize: %d\n",
/* rc is logged as the thread id in this message */
808 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
809 "thread=%d\n", obd->obd_name,
810 mlsi->mlsi_index, rc);
/* synchronous variant: run the sync in the caller's context */
814 rc = __mds_lov_synchronize((void *)mlsi);
/* Observer callback for events on a watched device.
 * Handles OBD_NOTIFY_ACTIVE, OBD_NOTIFY_SYNC and OBD_NOTIFY_SYNC_NONBLOCK
 * as sync requests, and OBD_NOTIFY_CONFIG by passing data (as flag bits) to
 * mds_allow_cli().  For sync requests the watched device must be of type
 * LUSTRE_OSC_NAME.  While the MDS is recovering only the lov descriptor is
 * refreshed under obd_dev_sem and CONFIG_SYNC is latched.  Otherwise
 * mds_lov_start_synchronize() is called, non-blocking unless the event is
 * OBD_NOTIFY_SYNC, followed by lquota_recovery().
 * NOTE(review): the switch statement and return statements are not visible
 * in this listing. */
820 int mds_notify(struct obd_device *obd, struct obd_device *watched,
821 enum obd_notify_event ev, void *data)
827 /* We only handle these: */
828 case OBD_NOTIFY_ACTIVE:
829 case OBD_NOTIFY_SYNC:
830 case OBD_NOTIFY_SYNC_NONBLOCK:
832 case OBD_NOTIFY_CONFIG:
833 mds_allow_cli(obd, (unsigned long)data);
838 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
/* sync requests are only expected from OSC devices */
840 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
841 CERROR("unexpected notification of %s %s!\n",
842 watched->obd_type->typ_name, watched->obd_name);
846 if (obd->obd_recovering) {
847 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
849 obd_uuid2str(&watched->u.cli.cl_target_uuid));
850 /* We still have to fix the lov descriptor for ost's added
851 after the mdt in the config log. They didn't make it into
853 mutex_down(&obd->obd_dev_sem);
854 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
855 mutex_up(&obd->obd_dev_sem);
856 mds_allow_cli(obd, CONFIG_SYNC);
860 LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
861 rc = mds_lov_start_synchronize(obd, watched, data,
862 !(ev == OBD_NOTIFY_SYNC));
864 lquota_recovery(mds_quota_interface_ref, obd);
869 /* Convert the on-disk LOV EA structure.
870 * We always try to convert from an old LOV EA format to the common in-memory
871 * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
872 * then convert back to the new on-disk format and save it back to disk
873 * (obd_packmd() only ever saves to the new on-disk format) so we don't have
874 * to convert it each time this inode is accessed.
876 * This function is a bit interesting in the error handling. We can safely
877 * ship the old lmm to the client in case of failure, since it uses the same
878 * obd_unpackmd() code and can do the conversion if the MDS fails for some
879 * reason. We will not delete the old lmm data until we have written the
880 * new format lmm data in fsfilt_set_md(). */
/* obd is the MDS device, inode the file whose "lov" EA is rewritten, and
 * lmm/lmm_size the on-disk EA currently held by the caller.  The EA is
 * unpacked and repacked through mds_osc_exp, then stored with
 * fsfilt_set_md() inside a fsfilt_start()/fsfilt_commit() pair.  A commit
 * error takes precedence in the result; otherwise lmm_size is reported.
 * The in-memory lsm is released with obd_free_memmd(). */
881 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
882 struct lov_mds_md *lmm, int lmm_size)
884 struct lov_stripe_md *lsm = NULL;
/* Test for a magic that is already current.  lmm_magic is stored
 * little-endian, so each comparison must byte-swap the field first and
 * compare afterwards.  The statement run on a match is not visible in this
 * listing. */
889 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
890 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
893 CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
894 inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
/* old on-disk format -> in-memory lsm */
897 rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
/* in-memory lsm -> current on-disk format */
901 rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
906 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
907 if (IS_ERR(handle)) {
908 rc = PTR_ERR(handle);
912 rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
914 err = fsfilt_commit(obd, inode, handle, 0);
916 rc = err ? err : lmm_size;
919 obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
/* Scatter the object ids of a LOV EA into ids[].
 * For each of the lmm_stripe_count stripes, the stripe's object id is
 * stored at the position given by that stripe's OST index; both fields are
 * converted from little-endian first.
 * NOTE(review): l_ost_idx is used as an array index with no bounds check
 * visible here, and 'desc' is not referenced in the visible lines - confirm
 * that callers size ids[] for every index the EA can contain. */
924 void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
925 struct lov_desc *desc)
928 for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
929 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
930 le64_to_cpu(lmm->lmm_objects[i].l_object_id);