Whamcloud - gitweb
land b1_5 onto HEAD
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
38 #include <obd_lov.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
41
42 #include "mds_internal.h"
43
44 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
45 {
46         struct mds_obd *mds = &obd->u.mds;
47         int i;
48         ENTRY;
49
50         lock_kernel();
51         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
52                 if (ids[i] > (mds->mds_lov_objids)[i]) {
53                         (mds->mds_lov_objids)[i] = ids[i];
54                         mds->mds_lov_objids_dirty = 1;
55                 }
56         unlock_kernel();
57         EXIT;
58 }
59
60 static int mds_lov_read_objids(struct obd_device *obd)
61 {
62         struct mds_obd *mds = &obd->u.mds;
63         obd_id *ids;
64         loff_t off = 0;
65         int i, rc, size;
66         ENTRY;
67
68         LASSERT(!mds->mds_lov_objids_size);
69         LASSERT(!mds->mds_lov_objids_dirty);
70
71         /* Read everything in the file, even if our current lov desc
72            has fewer targets. Old targets not in the lov descriptor
73            during mds setup may still have valid objids. */
74         size = mds->mds_lov_objid_filp->f_dentry->d_inode->i_size;
75         if (size == 0)
76                 RETURN(0);
77
78         OBD_ALLOC(ids, size);
79         if (ids == NULL)
80                 RETURN(-ENOMEM);
81         mds->mds_lov_objids = ids;
82         mds->mds_lov_objids_size = size;
83
84         rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
85         if (rc < 0) {
86                 CERROR("Error reading objids %d\n", rc);
87                 RETURN(rc);
88         }
89
90         mds->mds_lov_objids_in_file = size / sizeof(*ids);
91
92         for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
93                 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
94                        mds->mds_lov_objids[i], i);
95         }
96         RETURN(0);
97 }
98
99 int mds_lov_write_objids(struct obd_device *obd)
100 {
101         struct mds_obd *mds = &obd->u.mds;
102         loff_t off = 0;
103         int i, rc, tgts;
104         ENTRY;
105
106         if (!mds->mds_lov_objids_dirty)
107                 RETURN(0);
108
109         tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
110
111         if (!tgts)
112                 RETURN(0);
113
114         for (i = 0; i < tgts; i++)
115                 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
116                        mds->mds_lov_objids[i], i);
117
118         rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
119                                  mds->mds_lov_objids, tgts * sizeof(obd_id),
120                                  &off, 0);
121         if (rc >= 0) {
122                 mds->mds_lov_objids_dirty = 0;
123                 rc = 0;
124         }
125
126         RETURN(rc);
127 }
128
129 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
130 {
131         int rc;
132         struct obdo oa;
133         struct obd_trans_info oti = {0};
134         struct lov_stripe_md  *empty_ea = NULL;
135         ENTRY;
136
137         LASSERT(mds->mds_lov_objids != NULL);
138
139         /* This create will in fact either create or destroy:  If the OST is
140          * missing objects below this ID, they will be created.  If it finds
141          * objects above this ID, they will be removed. */
142         memset(&oa, 0, sizeof(oa));
143         oa.o_valid = OBD_MD_FLFLAGS;
144         oa.o_flags = OBD_FL_DELORPHAN;
145         if (ost_uuid != NULL) {
146                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
147                 oa.o_valid |= OBD_MD_FLINLINE;
148         }
149         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
150
151         RETURN(rc);
152 }
153
154 /* update the LOV-OSC knowledge of the last used object id's */
155 int mds_lov_set_nextid(struct obd_device *obd)
156 {
157         struct mds_obd *mds = &obd->u.mds;
158         int rc;
159         ENTRY;
160
161         LASSERT(!obd->obd_recovering);
162
163         LASSERT(mds->mds_lov_objids != NULL);
164
165         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
166                                 KEY_NEXT_ID,
167                                 mds->mds_lov_desc.ld_tgt_count *
168                                 sizeof(*mds->mds_lov_objids),
169                                 mds->mds_lov_objids, NULL);
170
171         if (rc)
172                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
173                         obd->obd_name, rc);
174
175         RETURN(rc);
176 }
177
178 /* Update the lov desc for a new size lov. */
179 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
180 {
181         struct mds_obd *mds = &obd->u.mds;
182         struct lov_desc *ld;
183         __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
184         int rc = 0;
185         ENTRY;
186
187         OBD_ALLOC(ld, sizeof(*ld));
188         if (!ld)
189                 RETURN(-ENOMEM);
190
191         rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
192                           &valsize, ld);
193         if (rc)
194                 GOTO(out, rc);
195
196         /* The size of the LOV target table may have increased. */
197         size = ld->ld_tgt_count * sizeof(obd_id);
198         if ((mds->mds_lov_objids_size == 0) ||
199             (size > mds->mds_lov_objids_size)) {
200                 obd_id *ids;
201
202                 /* add room by powers of 2 */
203                 size = 1;
204                 while (size < ld->ld_tgt_count)
205                         size = size << 1;
206                 size = size * sizeof(obd_id);
207
208                 OBD_ALLOC(ids, size);
209                 if (ids == NULL)
210                         GOTO(out, rc = -ENOMEM);
211                 memset(ids, 0, size);
212                 if (mds->mds_lov_objids_size) {
213                         obd_id *old_ids = mds->mds_lov_objids;
214                         memcpy(ids, mds->mds_lov_objids,
215                                mds->mds_lov_objids_size);
216                         mds->mds_lov_objids = ids;
217                         OBD_FREE(old_ids, mds->mds_lov_objids_size);
218                 }
219                 mds->mds_lov_objids = ids;
220                 mds->mds_lov_objids_size = size;
221         }
222
223         /* Don't change the mds_lov_desc until the objids size matches the
224            count (paranoia) */
225         mds->mds_lov_desc = *ld;
226         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
227                mds->mds_lov_desc.ld_tgt_count);
228
229         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
230                         max(mds->mds_lov_desc.ld_tgt_count,
231                             mds->mds_lov_objids_in_file));
232         mds->mds_max_mdsize = lov_mds_md_size(stripes);
233         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
234         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
235                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
236                stripes);
237
238 out:
239         OBD_FREE(ld, sizeof(*ld));
240         RETURN(rc);
241 }
242
243
244 #define MDSLOV_NO_INDEX -1
245
246 /* Inform MDS about new/updated target */
247 static int mds_lov_update_mds(struct obd_device *obd,
248                               struct obd_device *watched,
249                               __u32 idx, struct obd_uuid *uuid)
250 {
251         struct mds_obd *mds = &obd->u.mds;
252         int old_count;
253         int rc = 0;
254         ENTRY;
255
256         old_count = mds->mds_lov_desc.ld_tgt_count;
257         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
258         if (rc)
259                 RETURN(rc);
260
261         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
262                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
263                mds->mds_lov_desc.ld_tgt_count);
264
265         /* idx is set as data from lov_notify. */
266         if (idx != MDSLOV_NO_INDEX && !obd->obd_recovering) {
267                 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
268                         CERROR("index %d > count %d!\n", idx,
269                                mds->mds_lov_desc.ld_tgt_count);
270                         RETURN(-EINVAL);
271                 }
272
273                 if (idx >= mds->mds_lov_objids_in_file) {
274                         /* We never read this lastid; ask the osc */
275                         obd_id lastid;
276                         __u32 size = sizeof(lastid);
277                         rc = obd_get_info(watched->obd_self_export,
278                                           strlen("last_id"),
279                                           "last_id", &size, &lastid);
280                         if (rc)
281                                 RETURN(rc);
282                         mds->mds_lov_objids[idx] = lastid;
283                         mds->mds_lov_objids_dirty = 1;
284                         mds_lov_write_objids(obd);
285                 } else {
286                         /* We have read this lastid from disk; tell the osc.
287                            Don't call this during recovery. */
288                         rc = mds_lov_set_nextid(obd);
289                 }
290
291                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
292                       mds->mds_lov_objids[idx], idx);
293         }
294
295         /* If we added a target we have to reconnect the llogs */
296         /* We only _need_ to do this at first add (idx), or the first time
297            after recovery.  However, it should now be safe to call anytime. */
298         CDEBUG(D_CONFIG, "reset llogs idx=%d\n", idx);
299         mutex_down(&obd->obd_dev_sem);
300         llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, uuid);
301         mutex_up(&obd->obd_dev_sem);
302
303         RETURN(rc);
304 }
305
306 /* update the LOV-OSC knowledge of the last used object id's */
307 int mds_lov_connect(struct obd_device *obd, char * lov_name)
308 {
309         struct mds_obd *mds = &obd->u.mds;
310         struct lustre_handle conn = {0,};
311         struct obd_connect_data *data;
312         int rc, i;
313         ENTRY;
314
315         if (IS_ERR(mds->mds_osc_obd))
316                 RETURN(PTR_ERR(mds->mds_osc_obd));
317
318         if (mds->mds_osc_obd)
319                 RETURN(0);
320
321         mds->mds_osc_obd = class_name2obd(lov_name);
322         if (!mds->mds_osc_obd) {
323                 CERROR("MDS cannot locate LOV %s\n", lov_name);
324                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
325                 RETURN(-ENOTCONN);
326         }
327
328         OBD_ALLOC(data, sizeof(*data));
329         if (data == NULL)
330                 RETURN(-ENOMEM);
331         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
332                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64;
333         data->ocd_version = LUSTRE_VERSION_CODE;
334         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
335         rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data);
336         OBD_FREE(data, sizeof(*data));
337         if (rc) {
338                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
339                 mds->mds_osc_obd = ERR_PTR(rc);
340                 RETURN(rc);
341         }
342         mds->mds_osc_exp = class_conn2export(&conn);
343
344         rc = obd_register_observer(mds->mds_osc_obd, obd);
345         if (rc) {
346                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
347                        lov_name, rc);
348                 GOTO(err_discon, rc);
349         }
350
351         rc = mds_lov_read_objids(obd);
352         if (rc) {
353                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
354                 GOTO(err_reg, rc);
355         }
356
357         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
358         if (rc)
359                 GOTO(err_reg, rc);
360
361         /* tgt_count may be 0! */
362         rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
363         if (rc) {
364                 CERROR("failed to initialize catalog %d\n", rc);
365                 GOTO(err_reg, rc);
366         }
367
368         /* If we're mounting this code for the first time on an existing FS,
369          * we need to populate the objids array from the real OST values */
370         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
371                 int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
372                 rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
373                                   "last_id", &size, mds->mds_lov_objids);
374                 if (!rc) {
375                         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
376                                 CWARN("got last object "LPU64" from OST %d\n",
377                                       mds->mds_lov_objids[i], i);
378                         mds->mds_lov_objids_dirty = 1;
379                         rc = mds_lov_write_objids(obd);
380                         if (rc)
381                                 CERROR("got last objids from OSTs, but error "
382                                        "writing objids file: %d\n", rc);
383                 }
384         }
385
386         /* I want to see a callback happen when the OBD moves to a
387          * "For General Use" state, and that's when we'll call
388          * set_nextid().  The class driver can help us here, because
389          * it can use the obd_recovering flag to determine when the
390          * the OBD is full available. */
391         if (!obd->obd_recovering)
392                 rc = mds_postrecov(obd);
393         RETURN(rc);
394
395 err_reg:
396         obd_register_observer(mds->mds_osc_obd, NULL);
397 err_discon:
398         obd_disconnect(mds->mds_osc_exp);
399         mds->mds_osc_exp = NULL;
400         mds->mds_osc_obd = ERR_PTR(rc);
401         RETURN(rc);
402 }
403
404 int mds_lov_disconnect(struct obd_device *obd)
405 {
406         struct mds_obd *mds = &obd->u.mds;
407         int rc = 0;
408         ENTRY;
409
410         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
411                 obd_register_observer(mds->mds_osc_obd, NULL);
412
413                 /* The actual disconnect of the mds_lov will be called from
414                  * class_disconnect_exports from mds_lov_clean. So we have to
415                  * ensure that class_cleanup doesn't fail due to the extra ref
416                  * we're holding now. The mechanism to do that already exists -
417                  * the obd_force flag. We'll drop the final ref to the
418                  * mds_osc_exp in mds_cleanup. */
419                 mds->mds_osc_obd->obd_force = 1;
420         }
421
422         RETURN(rc);
423 }
424
425 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
426                   void *karg, void *uarg)
427 {
428         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
429         struct obd_device *obd = exp->exp_obd;
430         struct mds_obd *mds = &obd->u.mds;
431         struct obd_ioctl_data *data = karg;
432         struct lvfs_run_ctxt saved;
433         int rc = 0;
434
435         ENTRY;
436         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
437
438         switch (cmd) {
439         case OBD_IOC_RECORD: {
440                 char *name = data->ioc_inlbuf1;
441                 if (mds->mds_cfg_llh)
442                         RETURN(-EBUSY);
443
444                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
445                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
446                                  &mds->mds_cfg_llh, NULL, name);
447                 if (rc == 0)
448                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
449                                          &cfg_uuid);
450                 else
451                         mds->mds_cfg_llh = NULL;
452                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
453
454                 RETURN(rc);
455         }
456
457         case OBD_IOC_ENDRECORD: {
458                 if (!mds->mds_cfg_llh)
459                         RETURN(-EBADF);
460
461                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
462                 rc = llog_close(mds->mds_cfg_llh);
463                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
464
465                 mds->mds_cfg_llh = NULL;
466                 RETURN(rc);
467         }
468
469         case OBD_IOC_CLEAR_LOG: {
470                 char *name = data->ioc_inlbuf1;
471                 if (mds->mds_cfg_llh)
472                         RETURN(-EBUSY);
473
474                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
475                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
476                                  &mds->mds_cfg_llh, NULL, name);
477                 if (rc == 0) {
478                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
479                                          NULL);
480
481                         rc = llog_destroy(mds->mds_cfg_llh);
482                         llog_free_handle(mds->mds_cfg_llh);
483                 }
484                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
485
486                 mds->mds_cfg_llh = NULL;
487                 RETURN(rc);
488         }
489
490         case OBD_IOC_DORECORD: {
491                 char *cfg_buf;
492                 struct llog_rec_hdr rec;
493                 if (!mds->mds_cfg_llh)
494                         RETURN(-EBADF);
495
496                 rec.lrh_len = llog_data_len(data->ioc_plen1);
497
498                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
499                         rec.lrh_type = OBD_CFG_REC;
500                 } else {
501                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
502                         RETURN(-EINVAL);
503                 }
504
505                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
506                 if (cfg_buf == NULL)
507                         RETURN(-EINVAL);
508                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
509                 if (rc) {
510                         OBD_FREE(cfg_buf, data->ioc_plen1);
511                         RETURN(rc);
512                 }
513
514                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
515                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
516                                     cfg_buf, -1);
517                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
518
519                 OBD_FREE(cfg_buf, data->ioc_plen1);
520                 RETURN(rc);
521         }
522
523         case OBD_IOC_PARSE: {
524                 struct llog_ctxt *ctxt =
525                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
526                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
527                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
528                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
529                 if (rc)
530                         RETURN(rc);
531
532                 RETURN(rc);
533         }
534
535         case OBD_IOC_DUMP_LOG: {
536                 struct llog_ctxt *ctxt =
537                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
538                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
539                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
540                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
541                 if (rc)
542                         RETURN(rc);
543
544                 RETURN(rc);
545         }
546
547         case OBD_IOC_SYNC: {
548                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
549                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
550                 RETURN(rc);
551         }
552
553         case OBD_IOC_SET_READONLY: {
554                 void *handle;
555                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
556                 BDEVNAME_DECLARE_STORAGE(tmp);
557                 CERROR("*** setting device %s read-only ***\n",
558                        ll_bdevname(obd->u.obt.obt_sb, tmp));
559
560                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
561                 if (!IS_ERR(handle))
562                         rc = fsfilt_commit(obd, inode, handle, 1);
563
564                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
565                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
566
567                 lvfs_set_rdonly(lvfs_sbdev(obd->u.obt.obt_sb));
568                 RETURN(0);
569         }
570
571         case OBD_IOC_CATLOGLIST: {
572                 int count = mds->mds_lov_desc.ld_tgt_count;
573                 rc = llog_catalog_list(obd, count, data);
574                 RETURN(rc);
575
576         }
577         case OBD_IOC_LLOG_CHECK:
578         case OBD_IOC_LLOG_CANCEL:
579         case OBD_IOC_LLOG_REMOVE: {
580                 struct llog_ctxt *ctxt =
581                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
582                 int rc2;
583
584                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
585                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
586                 rc = llog_ioctl(ctxt, cmd, data);
587                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
588                 llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
589                 rc2 = obd_set_info_async(mds->mds_osc_exp,
590                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
591                                          0, NULL, NULL);
592                 if (!rc)
593                         rc = rc2;
594                 RETURN(rc);
595         }
596         case OBD_IOC_LLOG_INFO:
597         case OBD_IOC_LLOG_PRINT: {
598                 struct llog_ctxt *ctxt =
599                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
600
601                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
602                 rc = llog_ioctl(ctxt, cmd, data);
603                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
604
605                 RETURN(rc);
606         }
607
608         case OBD_IOC_ABORT_RECOVERY:
609                 CERROR("aborting recovery for device %s\n", obd->obd_name);
610                 target_abort_recovery(obd);
611                 RETURN(0);
612
613         default:
614                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
615                 RETURN(-EINVAL);
616         }
617         RETURN(0);
618
619 }
620
621 struct mds_lov_sync_info {
622         struct obd_device *mlsi_obd;     /* the lov device to sync */
623         struct obd_device *mlsi_watched; /* target osc */
624         __u32              mlsi_index;   /* index of target */
625 };
626
627 /* We only sync one osc at a time, so that we don't have to hold
628    any kind of lock on the whole mds_lov_desc, which may change
629    (grow) as a result of mds_lov_add_ost.  This also avoids any
630    kind of mismatch between the lov_desc and the mds_lov_desc,
631    which are not in lock-step during lov_add_obd */
632 static int __mds_lov_synchronize(void *data)
633 {
634         struct mds_lov_sync_info *mlsi = data;
635         struct obd_device *obd = mlsi->mlsi_obd;
636         struct obd_device *watched = mlsi->mlsi_watched;
637         struct mds_obd *mds = &obd->u.mds;
638         struct obd_uuid *uuid;
639         __u32  idx = mlsi->mlsi_index;
640         int rc = 0;
641         ENTRY;
642
643         OBD_FREE(mlsi, sizeof(*mlsi));
644
645         LASSERT(obd);
646         LASSERT(watched);
647         uuid = &watched->u.cli.cl_target_uuid;
648         LASSERT(uuid);
649
650         rc = mds_lov_update_mds(obd, watched, idx, uuid);
651         if (rc != 0)
652                 GOTO(out, rc);
653
654         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
655                                 KEY_MDS_CONN, 0, uuid, NULL);
656         if (rc != 0)
657                 GOTO(out, rc);
658
659         rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
660                           mds->mds_lov_desc.ld_tgt_count,
661                           NULL, NULL, uuid);
662
663         if (rc != 0) {
664                 CERROR("%s: failed at llog_origin_connect: %d\n",
665                        obd->obd_name, rc);
666                 GOTO(out, rc);
667         }
668
669         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
670               obd->obd_name, obd_uuid2str(uuid));
671
672         if (obd->obd_stopping)
673                 GOTO(out, rc = -ENODEV);
674
675         rc = mds_lov_clear_orphans(mds, uuid);
676         if (rc != 0) {
677                 CERROR("%s: failed at mds_lov_clear_orphans: %d\n",
678                        obd->obd_name, rc);
679                 GOTO(out, rc);
680         }
681
682         EXIT;
683 out:
684         class_decref(obd);
685         return rc;
686 }
687
688 int mds_lov_synchronize(void *data)
689 {
690         struct mds_lov_sync_info *mlsi = data;
691         char name[20];
692
693         if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
694                 /* There is still a watched target,
695                 but we don't know its index */
696                 sprintf(name, "ll_sync_tgt");
697         else
698                 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
699         ptlrpc_daemonize(name);
700
701         RETURN(__mds_lov_synchronize(data));
702 }
703
704 int mds_lov_start_synchronize(struct obd_device *obd,
705                               struct obd_device *watched,
706                               void *data, int nonblock)
707 {
708         struct mds_lov_sync_info *mlsi;
709         int rc;
710
711         ENTRY;
712
713         LASSERT(watched);
714
715         OBD_ALLOC(mlsi, sizeof(*mlsi));
716         if (mlsi == NULL)
717                 RETURN(-ENOMEM);
718
719         mlsi->mlsi_obd = obd;
720         mlsi->mlsi_watched = watched;
721         if (data)
722                 mlsi->mlsi_index = *(__u32 *)data;
723         else
724                 mlsi->mlsi_index = MDSLOV_NO_INDEX;
725
726         /* Although class_export_get(obd->obd_self_export) would lock
727            the MDS in place, since it's only a self-export
728            it doesn't lock the LOV in place.  The LOV can be disconnected
729            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
730            Simply taking an export ref on the LOV doesn't help, because it's
731            still disconnected. Taking an obd reference insures that we don't
732            disconnect the LOV.  This of course means a cleanup won't
733            finish for as long as the sync is blocking. */
734         class_incref(obd);
735
736         if (nonblock) {
737                 /* Synchronize in the background */
738                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
739                                        CLONE_VM | CLONE_FILES);
740                 if (rc < 0) {
741                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
742                                obd->obd_name, rc);
743                         class_decref(obd);
744                 } else {
745                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
746                                "thread=%d\n", obd->obd_name,
747                                mlsi->mlsi_index, rc);
748                         rc = 0;
749                 }
750         } else {
751                 rc = __mds_lov_synchronize((void *)mlsi);
752         }
753
754         RETURN(rc);
755 }
756
757 int mds_notify(struct obd_device *obd, struct obd_device *watched,
758                enum obd_notify_event ev, void *data)
759 {
760         int rc = 0;
761         ENTRY;
762
763         switch (ev) {
764         /* We only handle these: */
765         case OBD_NOTIFY_ACTIVE:
766         case OBD_NOTIFY_SYNC:
767         case OBD_NOTIFY_SYNC_NONBLOCK:
768                 break;
769         default:
770                 RETURN(0);
771         }
772
773         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
774
775         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
776                 CERROR("unexpected notification of %s %s!\n",
777                        watched->obd_type->typ_name, watched->obd_name);
778                 RETURN(-EINVAL);
779         }
780
781         if (obd->obd_recovering) {
782                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
783                       obd->obd_name,
784                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
785                 /* We still have to fix the lov descriptor for ost's added
786                    after the mdt in the config log.  They didn't make it into
787                    mds_lov_connect. */
788                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
789                 RETURN(rc);
790         }
791
792         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
793         rc = mds_lov_start_synchronize(obd, watched, data,
794                                        !(ev == OBD_NOTIFY_SYNC));
795
796         lquota_recovery(mds_quota_interface_ref, obd);
797
798         RETURN(rc);
799 }
800
801 /* Convert the on-disk LOV EA structre.
802  * We always try to convert from an old LOV EA format to the common in-memory
803  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
804  * then convert back to the new on-disk format and save it back to disk
805  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
806  * to convert it each time this inode is accessed.
807  *
808  * This function is a bit interesting in the error handling.  We can safely
809  * ship the old lmm to the client in case of failure, since it uses the same
810  * obd_unpackmd() code and can do the conversion if the MDS fails for some
811  * reason.  We will not delete the old lmm data until we have written the
812  * new format lmm data in fsfilt_set_md(). */
813 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
814                        struct lov_mds_md *lmm, int lmm_size)
815 {
816         struct lov_stripe_md *lsm = NULL;
817         void *handle;
818         int rc, err;
819         ENTRY;
820
821         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
822             le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
823                 RETURN(0);
824
825         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
826                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
827                LOV_MAGIC);
828
829         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
830         if (rc < 0)
831                 GOTO(conv_end, rc);
832
833         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
834         if (rc < 0)
835                 GOTO(conv_free, rc);
836         lmm_size = rc;
837
838         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
839         if (IS_ERR(handle)) {
840                 rc = PTR_ERR(handle);
841                 GOTO(conv_free, rc);
842         }
843
844         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
845
846         err = fsfilt_commit(obd, inode, handle, 0);
847         if (!rc)
848                 rc = err ? err : lmm_size;
849         GOTO(conv_free, rc);
850 conv_free:
851         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
852 conv_end:
853         return rc;
854 }
855
856 void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
857                          struct lov_desc *desc)
858 {
859         int i;
860         for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
861                 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
862                         le64_to_cpu(lmm->lmm_objects[i].l_object_id);
863         }
864 }
865