1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) handling of striped file data
7 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_MDS
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
42 #include "mds_internal.h"
/*
 * Raise the MDS's cached per-target object ids to the values in @ids.
 *
 * For each of the first mds_lov_desc.ld_tgt_count targets, the cached
 * entry in mds_lov_objids is replaced only when the matching entry of
 * @ids is larger, so a cached id never moves backwards here.  Any
 * replacement sets mds_lov_objids_dirty, which is the flag
 * mds_lov_write_objids() tests before writing the table out.
 *
 * @obd: device whose u.mds state is updated
 * @ids: candidate ids indexed by target; ld_tgt_count entries are read
 */
44 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
46 struct mds_obd *mds = &obd->u.mds;
51 for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
52 if (ids[i] > (mds->mds_lov_objids)[i]) {
53 (mds->mds_lov_objids)[i] = ids[i];
54 mds->mds_lov_objids_dirty = 1;
59 EXPORT_SYMBOL(mds_lov_update_objids);
/*
 * Load the saved object-id table from mds_lov_objid_filp into memory.
 *
 * Must run before any table exists: both mds_lov_objids_size and
 * mds_lov_objids_dirty are asserted to be zero on entry.  The buffer is
 * sized from the file's inode size, and the number of ids found is
 * recorded in mds_lov_objids_in_file as size / sizeof(*ids).
 *
 * @obd: device whose u.mds state receives the table
 */
61 static int mds_lov_read_objids(struct obd_device *obd)
63 struct mds_obd *mds = &obd->u.mds;
69 LASSERT(!mds->mds_lov_objids_size);
70 LASSERT(!mds->mds_lov_objids_dirty);
72 /* Read everything in the file, even if our current lov desc
73 has fewer targets. Old targets not in the lov descriptor
74 during mds setup may still have valid objids. */
75 size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
/* The array and its byte size are installed in the mds before the read
 * below is issued.  NOTE(review): the handling of a failed read sits on
 * lines not visible here - confirm the table is left in a sane state. */
82 mds->mds_lov_objids = ids;
83 mds->mds_lov_objids_size = size;
85 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
87 CERROR("Error reading objids %d\n", rc);
/* Entry count actually present on disk; may differ from ld_tgt_count. */
91 mds->mds_lov_objids_in_file = size / sizeof(*ids);
93 for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
94 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
95 mds->mds_lov_objids[i], i);
/*
 * Write the in-memory object-id table to mds_lov_objid_filp.
 *
 * The write is only attempted when mds_lov_objids_dirty is set.  The
 * number of entries written is the larger of the current target count
 * and the count that was read from the file, so entries for targets
 * missing from the current descriptor are written back as well.
 *
 * @obd: device whose u.mds table is written
 */
100 int mds_lov_write_objids(struct obd_device *obd)
102 struct mds_obd *mds = &obd->u.mds;
/* Nothing changed since the last write (or read). */
107 if (!mds->mds_lov_objids_dirty)
110 tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
115 for (i = 0; i < tgts; i++)
116 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
117 mds->mds_lov_objids[i], i);
119 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
120 mds->mds_lov_objids, tgts * sizeof(obd_id),
/* NOTE(review): the condition guarding this reset is not visible in this
 * listing - presumably it only runs after a successful write; confirm. */
123 mds->mds_lov_objids_dirty = 0;
129 EXPORT_SYMBOL(mds_lov_write_objids);
/*
 * Issue an obd_create() flagged OBD_FL_DELORPHAN through mds_osc_exp.
 *
 * The request object is zeroed, tagged with this MDS's group
 * (FILTER_GROUP_MDS0 + mds_id) and sent with an empty stripe md.  The
 * object-id table must already exist (asserted).
 *
 * @mds:      MDS state supplying the export and group id
 * @ost_uuid: optional target uuid; when non-NULL it is copied into
 *            oa.o_inline and OBD_MD_FLINLINE is added to o_valid
 */
131 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
135 struct obd_trans_info oti = {0};
136 struct lov_stripe_md *empty_ea = NULL;
139 LASSERT(mds->mds_lov_objids != NULL);
141 /* This create will in fact either create or destroy: If the OST is
142 * missing objects below this ID, they will be created. If it finds
143 * objects above this ID, they will be removed. */
144 memset(&oa, 0, sizeof(oa));
145 oa.o_flags = OBD_FL_DELORPHAN;
146 oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
147 oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
/* NOTE(review): sizeof(*ost_uuid) bytes are copied into o_inline; the
 * size of o_inline is not visible here - confirm it is large enough. */
148 if (ost_uuid != NULL) {
149 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
150 oa.o_valid |= OBD_MD_FLINLINE;
152 rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
/*
 * Send the cached object ids to the LOV via obd_set_info_async().
 *
 * The value passed is the first ld_tgt_count entries of mds_lov_objids.
 * Callers must not invoke this while the device is recovering and must
 * have an object-id table in place; both conditions are asserted.  A
 * failure is reported with CERROR.
 *
 * @obd: device whose u.mds state supplies the export and the ids
 */
157 /* update the LOV-OSC knowledge of the last used object id's */
158 int mds_lov_set_nextid(struct obd_device *obd)
160 struct mds_obd *mds = &obd->u.mds;
164 LASSERT(!obd->obd_recovering);
166 LASSERT(mds->mds_lov_objids != NULL);
168 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
170 mds->mds_lov_desc.ld_tgt_count *
171 sizeof(*mds->mds_lov_objids),
172 mds->mds_lov_objids, NULL);
175 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
/*
 * Refresh mds_lov_desc from the LOV and resize dependent state.
 *
 * Visible steps, in order:
 *  - fetch the current descriptor from @lov with obd_get_info(KEY_LOVDESC)
 *    into a temporary allocation;
 *  - if the object-id table is empty or smaller than
 *    ld_tgt_count * sizeof(obd_id), allocate a larger zeroed table, copy
 *    the old contents into it and free the old table;
 *  - store the new descriptor in mds_lov_desc;
 *  - recompute mds_max_mdsize and mds_max_cookiesize for
 *    min(LOV_MAX_STRIPE_COUNT, max(ld_tgt_count, mds_lov_objids_in_file))
 *    stripes;
 *  - call llog_cat_initialize() while holding obd_dev_sem;
 *  - invoke obd_upcall.onu_upcall when an upcall owner is registered;
 *  - free the temporary descriptor.
 *
 * @obd: MDS device being updated
 * @lov: export the descriptor is read from
 */
181 /* Update the lov desc for a new size lov. */
182 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
184 struct mds_obd *mds = &obd->u.mds;
186 __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
/* Temporary copy of the descriptor; released at the end of the function. */
190 OBD_ALLOC(ld, sizeof(*ld));
194 rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
199 /* The size of the LOV target table may have increased. */
200 size = ld->ld_tgt_count * sizeof(obd_id);
201 if ((mds->mds_lov_objids_size == 0) ||
202 (size > mds->mds_lov_objids_size)) {
205 /* add room by powers of 2 */
207 while (size < ld->ld_tgt_count)
/* From here on 'size' is a byte count again, not an entry count. */
209 size = size * sizeof(obd_id);
211 OBD_ALLOC(ids, size);
213 GOTO(out, rc = -ENOMEM);
214 memset(ids, 0, size);
/* Carry the existing ids over; the new array is installed in the mds
 * before the old one is freed. */
215 if (mds->mds_lov_objids_size) {
216 obd_id *old_ids = mds->mds_lov_objids;
217 memcpy(ids, mds->mds_lov_objids,
218 mds->mds_lov_objids_size);
219 mds->mds_lov_objids = ids;
220 OBD_FREE(old_ids, mds->mds_lov_objids_size);
222 mds->mds_lov_objids = ids;
223 mds->mds_lov_objids_size = size;
226 /* Don't change the mds_lov_desc until the objids size matches the
228 mds->mds_lov_desc = *ld;
229 CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
230 mds->mds_lov_desc.ld_tgt_count);
232 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
233 max(mds->mds_lov_desc.ld_tgt_count,
234 mds->mds_lov_objids_in_file));
235 mds->mds_max_mdsize = lov_mds_md_size(stripes);
236 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
237 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
238 "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
241 /* If we added a target we have to reconnect the llogs */
242 /* We only _need_ to do this at first add (idx), or the first time
243 after recovery. However, it should now be safe to call anytime. */
/* The return value of llog_cat_initialize() is not captured here. */
244 mutex_down(&obd->obd_dev_sem);
245 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
246 mutex_up(&obd->obd_dev_sem);
248 /*XXX this notifies the MDD until lov handling use old mds code */
249 if (obd->obd_upcall.onu_owner) {
250 LASSERT(obd->obd_upcall.onu_upcall != NULL);
251 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
252 obd->obd_upcall.onu_owner);
255 OBD_FREE(ld, sizeof(*ld));
/* Sentinel meaning "target index unknown".  It is stored in and compared
 * against __u32 index values in this file, so the comparison relies on
 * -1 converting to the largest unsigned value. */
260 #define MDSLOV_NO_INDEX -1
/*
 * Bring the MDS's view of one target up to date.
 *
 * First refreshes the lov descriptor via mds_lov_update_desc().  Then,
 * when a real index was supplied and the device is not recovering:
 *  - an index at or beyond ld_tgt_count is reported with CERROR;
 *  - for an index not covered by the on-disk table the last id is
 *    fetched from the watched device's self export ("last_id"), stored
 *    in mds_lov_objids[idx] and the table is written out;
 *  - otherwise the ids already read from disk are pushed to the LOV
 *    with mds_lov_set_nextid().
 *
 * @obd:     MDS device
 * @watched: device queried for "last_id"
 * @idx:     target index, or MDSLOV_NO_INDEX
 * @uuid:    target uuid (not referenced on the lines visible here)
 */
262 /* Inform MDS about new/updated target */
263 static int mds_lov_update_mds(struct obd_device *obd,
264 struct obd_device *watched,
265 __u32 idx, struct obd_uuid *uuid)
267 struct mds_obd *mds = &obd->u.mds;
/* Remembered only for the debug message below. */
272 old_count = mds->mds_lov_desc.ld_tgt_count;
273 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
277 CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
278 idx, obd->obd_recovering, obd->obd_async_recov, old_count,
279 mds->mds_lov_desc.ld_tgt_count);
281 /* idx is set as data from lov_notify. */
282 if (idx != MDSLOV_NO_INDEX && !obd->obd_recovering) {
283 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
284 CERROR("index %d > count %d!\n", idx,
285 mds->mds_lov_desc.ld_tgt_count);
289 if (idx >= mds->mds_lov_objids_in_file) {
290 /* We never read this lastid; ask the osc */
292 __u32 size = sizeof(lastid);
293 rc = obd_get_info(watched->obd_self_export,
295 "last_id", &size, &lastid);
/* The result of mds_lov_write_objids() is not checked here. */
298 mds->mds_lov_objids[idx] = lastid;
299 mds->mds_lov_objids_dirty = 1;
300 mds_lov_write_objids(obd);
302 /* We have read this lastid from disk; tell the osc.
303 Don't call this during recovery. */
304 rc = mds_lov_set_nextid(obd);
307 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
308 mds->mds_lov_objids[idx], idx);
314 /* Connect the MDS to the LOV named lov_name and load object-id state. */
/*
 * Visible steps, in order:
 *  - return the stored error if an earlier attempt left an ERR_PTR in
 *    mds_osc_obd, and return early if a LOV device is already recorded;
 *  - look the LOV up by name; on failure store ERR_PTR(-ENOTCONN);
 *  - connect with version/index/request-portal/quota64/OSS-capa flags
 *    and this MDS's group (mds_id + FILTER_GROUP_MDS0);
 *  - register @obd as observer of the LOV;
 *  - set obd_no_conn so clients are refused for now;
 *  - read the on-disk object ids, refresh the lov descriptor and
 *    initialize the llog catalogs;
 *  - if the descriptor lists more targets than the file held, fetch
 *    "last_id" for all targets from the LOV and write the table out.
 *
 * On the error labels the observer is unregistered, the export is
 * disconnected and mds_osc_obd is replaced by ERR_PTR(rc).
 *
 * @obd:      MDS device
 * @lov_name: name passed to class_name2obd()
 */
315 int mds_lov_connect(struct obd_device *obd, char * lov_name)
317 struct mds_obd *mds = &obd->u.mds;
318 struct lustre_handle conn = {0,};
319 struct obd_connect_data *data;
/* A previous failure is sticky: its error code is returned again. */
323 if (IS_ERR(mds->mds_osc_obd))
324 RETURN(PTR_ERR(mds->mds_osc_obd));
326 if (mds->mds_osc_obd)
329 mds->mds_osc_obd = class_name2obd(lov_name);
330 if (!mds->mds_osc_obd) {
331 CERROR("MDS cannot locate LOV %s\n", lov_name);
332 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
/* Connect data is only needed for the obd_connect() call itself. */
336 OBD_ALLOC(data, sizeof(*data));
339 data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
340 OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
341 OBD_CONNECT_OSS_CAPA;
342 data->ocd_version = LUSTRE_VERSION_CODE;
343 data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0;
344 /* NB: lov_connect() needs to fill in .ocd_index for each OST */
345 rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
346 OBD_FREE(data, sizeof(*data));
348 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
349 mds->mds_osc_obd = ERR_PTR(rc);
352 mds->mds_osc_exp = class_conn2export(&conn);
354 rc = obd_register_observer(mds->mds_osc_obd, obd);
356 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
358 GOTO(err_discon, rc);
361 /* Deny new client connections until we are sure we have some OSTs */
362 obd->obd_no_conn = 1;
364 rc = mds_lov_read_objids(obd);
366 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
370 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
374 /* tgt_count may be 0! */
375 rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
377 CERROR("failed to initialize catalog %d\n", rc);
381 /* If we're mounting this code for the first time on an existing FS,
382 * we need to populate the objids array from the real OST values */
383 if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
/* The value length is passed by address to obd_get_info(); use __u32
 * like the other obd_get_info() callers in this file. */
384 __u32 size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
385 rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
386 "last_id", &size, mds->mds_lov_objids);
388 for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
389 CWARN("got last object "LPU64" from OST %d\n",
390 mds->mds_lov_objids[i], i);
391 mds->mds_lov_objids_dirty = 1;
392 rc = mds_lov_write_objids(obd);
394 CERROR("got last objids from OSTs, but error "
395 "writing objids file: %d\n", rc);
399 /* I want to see a callback happen when the OBD moves to a
400 * "For General Use" state, and that's when we'll call
401 * set_nextid(). The class driver can help us here, because
402 * it can use the obd_recovering flag to determine when the
403 * OBD is fully available. */
404 /* MDD device will care about that
405 if (!obd->obd_recovering)
406 rc = mds_postrecov(obd);
411 obd_register_observer(mds->mds_osc_obd, NULL);
413 obd_disconnect(mds->mds_osc_exp);
414 mds->mds_osc_exp = NULL;
415 mds->mds_osc_obd = ERR_PTR(rc);
/*
 * Begin detaching the MDS from its LOV.
 *
 * Only acts when mds_osc_obd is not an error pointer and an export is
 * present: the MDS is unregistered as observer of the LOV and obd_force
 * is set on the LOV device.  No disconnect is issued on the lines
 * visible here; the comment below explains where that happens.
 *
 * @obd: MDS device
 */
419 int mds_lov_disconnect(struct obd_device *obd)
421 struct mds_obd *mds = &obd->u.mds;
425 if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
426 obd_register_observer(mds->mds_osc_obd, NULL);
428 /* The actual disconnect of the mds_lov will be called from
429 * class_disconnect_exports from mds_lov_clean. So we have to
430 * ensure that class_cleanup doesn't fail due to the extra ref
431 * we're holding now. The mechanism to do that already exists -
432 * the obd_force flag. We'll drop the final ref to the
433 * mds_osc_exp in mds_cleanup. */
434 mds->mds_osc_obd->obd_force = 1;
/*
 * ioctl dispatcher for the MDS device.
 *
 * @cmd:  ioctl command; selects one of the cases below
 * @exp:  export the ioctl arrived on; its exp_obd is the MDS device
 * @len:  not referenced on the lines visible here
 * @karg: kernel copy of the request, used as struct obd_ioctl_data
 * @uarg: not referenced on the lines visible here
 *
 * Most cases that touch the backing filesystem or llogs bracket the work
 * with push_ctxt()/pop_ctxt() on an lvfs context.  Several case labels
 * and all return statements fall on lines missing from this listing.
 */
440 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
441 void *karg, void *uarg)
443 static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
444 struct obd_device *obd = exp->exp_obd;
445 struct mds_obd *mds = &obd->u.mds;
446 struct obd_ioctl_data *data = karg;
447 struct lvfs_run_ctxt saved;
451 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
/* Start recording: create the config llog named by ioc_inlbuf1, keep its
 * handle in mds_cfg_llh and initialize it as a plain log.  The handle
 * pointer is tested first; the action taken when one is already open is
 * not visible here. */
454 case OBD_IOC_RECORD: {
455 char *name = data->ioc_inlbuf1;
456 if (mds->mds_cfg_llh)
459 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
460 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
461 &mds->mds_cfg_llh, NULL, name);
463 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
466 mds->mds_cfg_llh = NULL;
467 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Stop recording: close the open config llog and forget the handle. */
472 case OBD_IOC_ENDRECORD: {
473 if (!mds->mds_cfg_llh)
476 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
477 rc = llog_close(mds->mds_cfg_llh);
478 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
480 mds->mds_cfg_llh = NULL;
/* Remove a config llog: open it by name, then destroy it and free the
 * handle.  mds_cfg_llh is used as scratch and reset to NULL afterwards. */
484 case OBD_IOC_CLEAR_LOG: {
485 char *name = data->ioc_inlbuf1;
486 if (mds->mds_cfg_llh)
489 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
490 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
491 &mds->mds_cfg_llh, NULL, name);
493 llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
496 rc = llog_destroy(mds->mds_cfg_llh);
497 llog_free_handle(mds->mds_cfg_llh);
499 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
501 mds->mds_cfg_llh = NULL;
/* Append one record to the open config llog.  The payload of ioc_plen1
 * bytes is copied from user space (ioc_pbuf1) into a temporary buffer;
 * only LUSTRE_CFG_TYPE records are accepted. */
505 case OBD_IOC_DORECORD: {
507 struct llog_rec_hdr rec;
508 if (!mds->mds_cfg_llh)
511 rec.lrh_len = llog_data_len(data->ioc_plen1);
513 if (data->ioc_type == LUSTRE_CFG_TYPE) {
514 rec.lrh_type = OBD_CFG_REC;
516 CERROR("unknown cfg record type:%d \n", data->ioc_type);
520 OBD_ALLOC(cfg_buf, data->ioc_plen1);
/* NOTE(review): copy_from_user() reports the number of bytes left
 * uncopied rather than an errno - confirm the hidden error path does not
 * return that count to the caller as-is. */
523 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
525 OBD_FREE(cfg_buf, data->ioc_plen1);
529 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
530 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
532 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
534 OBD_FREE(cfg_buf, data->ioc_plen1);
/* Parse the config llog named by ioc_inlbuf1. */
538 case OBD_IOC_PARSE: {
539 struct llog_ctxt *ctxt =
540 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
541 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
542 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
543 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Dump the config llog named by ioc_inlbuf1. */
550 case OBD_IOC_DUMP_LOG: {
551 struct llog_ctxt *ctxt =
552 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
553 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
554 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
555 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Sync the backing filesystem.  The case label for this branch is on a
 * line missing from this listing. */
563 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
564 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
/* Make the device read-only: run a start/commit pair on the root inode,
 * sync the filesystem, then call lvfs_set_rdonly(). */
568 case OBD_IOC_SET_READONLY: {
570 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
571 BDEVNAME_DECLARE_STORAGE(tmp);
572 CERROR("*** setting device %s read-only ***\n",
573 ll_bdevname(obd->u.obt.obt_sb, tmp));
575 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
577 rc = fsfilt_commit(obd, inode, handle, 1);
579 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
580 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
582 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
/* List llog catalogs, one per target in the current lov descriptor. */
586 case OBD_IOC_CATLOGLIST: {
587 int count = mds->mds_lov_desc.ld_tgt_count;
588 rc = llog_catalog_list(obd, count, data);
/* llog maintenance commands: the llogs are finished before llog_ioctl()
 * runs and re-initialized afterwards, then KEY_MDS_CONN is re-sent with
 * this MDS's group; that last result is kept separately in rc2. */
592 case OBD_IOC_LLOG_CHECK:
593 case OBD_IOC_LLOG_CANCEL:
594 case OBD_IOC_LLOG_REMOVE: {
595 struct llog_ctxt *ctxt =
596 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
600 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
601 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
602 rc = llog_ioctl(ctxt, cmd, data);
603 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
604 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
605 group = FILTER_GROUP_MDS0 + mds->mds_id;
606 rc2 = obd_set_info_async(mds->mds_osc_exp,
607 strlen(KEY_MDS_CONN), KEY_MDS_CONN,
608 sizeof(group), &group, NULL);
/* Read-only llog queries, run in the lvfs context of the llog's export. */
613 case OBD_IOC_LLOG_INFO:
614 case OBD_IOC_LLOG_PRINT: {
615 struct llog_ctxt *ctxt =
616 llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
618 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
619 rc = llog_ioctl(ctxt, cmd, data);
620 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
625 case OBD_IOC_ABORT_RECOVERY:
626 CERROR("aborting recovery for device %s\n", obd->obd_name);
627 target_stop_recovery_thread(obd);
/* Any other command is only logged on the line visible here. */
631 CDEBUG(D_INFO, "unknown command %x\n", cmd);
/* Arguments for one synchronize run.  An instance is allocated by
 * mds_lov_start_synchronize() and freed by __mds_lov_synchronize() once
 * its fields have been copied into locals. */
638 struct mds_lov_sync_info {
639 struct obd_device *mlsi_obd; /* the lov device to sync */
640 struct obd_device *mlsi_watched; /* target osc */
641 __u32 mlsi_index; /* index of target */
/*
 * Send both entries of mds_capa_keys to the LOV via
 * obd_set_info_async(KEY_CAPA_KEY).
 *
 * When mds_capa_keys is not set the keys are not sent (the early-exit
 * statement itself is on a line missing from this listing).  A failed
 * send is logged with DEBUG_CAPA_KEY at D_ERROR.
 *
 * @mds: MDS state supplying the keys and the export
 */
644 static int mds_propagate_capa_keys(struct mds_obd *mds)
646 struct lustre_capa_key *key;
651 if (!mds->mds_capa_keys)
/* Exactly two key slots are walked. */
654 for (i = 0; i < 2; i++) {
655 key = &mds->mds_capa_keys[i];
656 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
658 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
659 KEY_CAPA_KEY, sizeof(*key), key, NULL);
661 DEBUG_CAPA_KEY(D_ERROR, key,
662 "propagate failed (rc = %d) for", rc);
/*
 * Synchronize the MDS with one watched target.
 *
 * @data is a struct mds_lov_sync_info; its fields are copied into locals
 * and the structure is freed right away, so it must not be touched by
 * anyone after this function has started.
 *
 * Visible steps, in order: update the MDS's view of the target
 * (mds_lov_update_mds), send KEY_MDS_CONN with this MDS's group,
 * propagate the capability keys, connect the MDS->OST llog context,
 * bail out with -ENODEV if the device is stopping, clear orphans on the
 * target, and finally invoke the registered upcall if there is one.
 */
670 /* We only sync one osc at a time, so that we don't have to hold
671 any kind of lock on the whole mds_lov_desc, which may change
672 (grow) as a result of mds_lov_add_ost. This also avoids any
673 kind of mismatch between the lov_desc and the mds_lov_desc,
674 which are not in lock-step during lov_add_obd */
675 static int __mds_lov_synchronize(void *data)
677 struct mds_lov_sync_info *mlsi = data;
678 struct obd_device *obd = mlsi->mlsi_obd;
679 struct obd_device *watched = mlsi->mlsi_watched;
680 struct mds_obd *mds = &obd->u.mds;
681 struct obd_uuid *uuid;
682 __u32 idx = mlsi->mlsi_index;
683 struct mds_group_info mgi;
/* Everything needed was copied out in the declarations above. */
687 OBD_FREE(mlsi, sizeof(*mlsi));
691 uuid = &watched->u.cli.cl_target_uuid;
694 rc = mds_lov_update_mds(obd, watched, idx, uuid);
697 mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
699 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
700 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
704 /* propagate capability keys */
705 rc = mds_propagate_capa_keys(mds);
709 rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
710 mds->mds_lov_desc.ld_tgt_count,
714 CERROR("%s: failed at llog_origin_connect: %d\n",
719 LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
720 obd->obd_name, obd_uuid2str(uuid));
722 * FIXME: this obd_stopping was useless,
723 * since obd in mdt layer was set
725 if (obd->obd_stopping)
726 GOTO(out, rc = -ENODEV);
728 rc = mds_lov_clear_orphans(mds, uuid);
730 CERROR("%s: failed at mds_lov_clear_orphans: %d\n",
735 if (obd->obd_upcall.onu_owner) {
737 * This is a hack for mds_notify->mdd_notify. When the mds obd
738 * in mdd is removed, This hack should be removed.
740 LASSERT(obd->obd_upcall.onu_upcall != NULL);
741 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
742 obd->obd_upcall.onu_owner);
/*
 * Thread entry point for a background synchronize run.
 *
 * Builds a thread name - "ll_sync_tgt" when the target index is unknown,
 * otherwise "ll_sync_NN" from the index - hands it to ptlrpc_daemonize()
 * and then runs __mds_lov_synchronize() on the same argument.
 *
 * @data: struct mds_lov_sync_info describing the target
 *
 * Returns whatever __mds_lov_synchronize() returns.
 */
750 int mds_lov_synchronize(void *data)
752 struct mds_lov_sync_info *mlsi = data;
755 if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
756 /* There is still a watched target,
757 but we don't know its index */
/* Bounded like the indexed branch so neither form can overrun name. */
758 snprintf(name, sizeof(name), "ll_sync_tgt");
760 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
761 ptlrpc_daemonize(name);
763 RETURN(__mds_lov_synchronize(data));
/*
 * Package a synchronize request and run it, either in a new kernel
 * thread or directly in the caller.
 *
 * A struct mds_lov_sync_info is allocated and filled with @obd, @watched
 * and a target index taken from *(__u32 *)data, or MDSLOV_NO_INDEX.
 * Ownership of that structure passes to __mds_lov_synchronize(), which
 * frees it.
 *
 * @obd:      MDS device
 * @watched:  target device
 * @data:     optional pointer to a __u32 target index
 * @nonblock: presumably selects the background-thread path - the branch
 *            conditions are on lines missing from this listing; confirm
 */
766 int mds_lov_start_synchronize(struct obd_device *obd,
767 struct obd_device *watched,
768 void *data, int nonblock)
770 struct mds_lov_sync_info *mlsi;
777 OBD_ALLOC(mlsi, sizeof(*mlsi));
781 mlsi->mlsi_obd = obd;
782 mlsi->mlsi_watched = watched;
784 mlsi->mlsi_index = *(__u32 *)data;
786 mlsi->mlsi_index = MDSLOV_NO_INDEX;
788 /* Although class_export_get(obd->obd_self_export) would lock
789 the MDS in place, since it's only a self-export
790 it doesn't lock the LOV in place. The LOV can be disconnected
791 during MDS precleanup, leaving nothing for __mds_lov_synchronize.
792 Simply taking an export ref on the LOV doesn't help, because it's
793 still disconnected. Taking an obd reference insures that we don't
794 disconnect the LOV. This of course means a cleanup won't
795 finish for as long as the sync is blocking. */
799 /* Synchronize in the background */
800 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
801 CLONE_VM | CLONE_FILES);
803 CERROR("%s: error starting mds_lov_synchronize: %d\n",
/* NOTE(review): mlsi is read here after the thread has been started, yet
 * __mds_lov_synchronize() frees mlsi early in that thread - this looks
 * like a possible use-after-free; confirm and consider logging a copy of
 * the index taken before the thread is created. */
807 CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
808 "thread=%d\n", obd->obd_name,
809 mlsi->mlsi_index, rc);
/* Direct call in the caller's context. */
813 rc = __mds_lov_synchronize((void *)mlsi);
/*
 * Observer callback invoked for events on a watched device.
 *
 * OBD_NOTIFY_ACTIVE, OBD_NOTIFY_SYNC and OBD_NOTIFY_SYNC_NONBLOCK lead to
 * the target handling below; OBD_NOTIFY_CONFIG clears obd_no_conn so
 * clients may connect.  The watched device must be of type
 * LUSTRE_OSC_NAME, otherwise an error is logged.
 *
 * While @obd is recovering only the lov descriptor and the llog catalogs
 * are refreshed.  Otherwise a synchronize run is started; it runs in the
 * caller only for OBD_NOTIFY_SYNC and is requested as non-blocking for
 * the other events.  lquota_recovery() is called afterwards.
 *
 * @obd:     MDS device (the observer)
 * @watched: device the event is about
 * @ev:      event type
 * @data:    passed through to mds_lov_start_synchronize()
 */
819 int mds_notify(struct obd_device *obd, struct obd_device *watched,
820 enum obd_notify_event ev, void *data)
826 /* We only handle these: */
827 case OBD_NOTIFY_ACTIVE:
828 case OBD_NOTIFY_SYNC:
829 case OBD_NOTIFY_SYNC_NONBLOCK:
831 case OBD_NOTIFY_CONFIG:
832 /* Open for clients */
833 obd->obd_no_conn = 0;
838 CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
839 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
840 CERROR("unexpected notification of %s %s!\n",
841 watched->obd_type->typ_name, watched->obd_name);
/* Recovery in progress: orphans are not reset for this target now. */
845 if (obd->obd_recovering) {
846 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
848 obd_uuid2str(&watched->u.cli.cl_target_uuid));
849 /* We still have to fix the lov descriptor for ost's added
850 after the mdt in the config log. They didn't make it into
852 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
855 /* We should update init llog here too for replay unlink and
856 * possiable llog init race when recovery complete */
857 mutex_down(&obd->obd_dev_sem);
858 llog_cat_initialize(obd, NULL,
859 obd->u.mds.mds_lov_desc.ld_tgt_count,
860 &watched->u.cli.cl_target_uuid);
861 mutex_up(&obd->obd_dev_sem);
/* Not recovering: the MDS->OST llog context must already exist. */
865 LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
866 rc = mds_lov_start_synchronize(obd, watched, data,
867 !(ev == OBD_NOTIFY_SYNC));
869 lquota_recovery(mds_quota_interface_ref, obd);
874 /* Convert the on-disk LOV EA structure.
875 * We always try to convert from an old LOV EA format to the common in-memory
876 * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
877 * then convert back to the new on-disk format and save it back to disk
878 * (obd_packmd() only ever saves to the new on-disk format) so we don't have
879 * to convert it each time this inode is accessed.
881 * This function is a bit interesting in the error handling. We can safely
882 * ship the old lmm to the client in case of failure, since it uses the same
883 * obd_unpackmd() code and can do the conversion if the MDS fails for some
884 * reason. We will not delete the old lmm data until we have written the
885 * new format lmm data in fsfilt_set_md(). */
/*
 * @obd:      MDS device; its mds_osc_exp is used for unpack/pack
 * @inode:    inode whose "lov" EA is rewritten
 * @lmm:      on-disk (little-endian) LOV EA as read from the inode
 * @lmm_size: size of @lmm in bytes
 *
 * No conversion is attempted when the EA magic is already LOV_MAGIC or
 * LOV_MAGIC_JOIN.  After the EA has been written, the result is the
 * commit error if there is one, otherwise lmm_size.
 */
886 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
887 struct lov_mds_md *lmm, int lmm_size)
889 struct lov_stripe_md *lsm = NULL;
/* lmm_magic is stored little-endian: convert it to CPU order first and
 * compare afterwards.  Byte-swapping the result of the comparison
 * instead would test the raw on-disk value on big-endian hosts. */
894 if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
895 le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_JOIN)
898 CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
899 inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
/* Old on-disk format -> in-memory lsm -> current on-disk format. */
902 rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
906 rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
911 handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
912 if (IS_ERR(handle)) {
913 rc = PTR_ERR(handle);
917 rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
/* The transaction is committed regardless of the set_md result. */
919 err = fsfilt_commit(obd, inode, handle, 0);
921 rc = err ? err : lmm_size;
924 obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
/*
 * Scatter the object ids of a striped file into a per-target id array.
 *
 * For each of the lmm_stripe_count stripe entries in @lmm, the entry's
 * object id is stored at ids[l_ost_idx].  All fields are converted from
 * little-endian before use.
 *
 * @ids:  destination array indexed by target index
 * @lmm:  on-disk LOV EA to read stripe entries from
 * @desc: not referenced on the lines visible here
 *
 * NOTE(review): l_ost_idx comes from on-disk data and is used as an
 * index without a bounds check on the visible lines - confirm callers
 * size @ids to cover every index that can appear.
 */
929 void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
930 struct lov_desc *desc)
933 for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
934 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
935 le64_to_cpu(lmm->lmm_objects[i].l_object_id);
938 EXPORT_SYMBOL(mds_objids_from_lmm);