Whamcloud - gitweb
b=11778
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
38 #include <obd_lov.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
41
42 #include "mds_internal.h"
43
44 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
45 {
46         struct mds_obd *mds = &obd->u.mds;
47         int i;
48         ENTRY;
49
50         lock_kernel();
51         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
52                 if (ids[i] > (mds->mds_lov_objids)[i]) {
53                         (mds->mds_lov_objids)[i] = ids[i];
54                         mds->mds_lov_objids_dirty = 1;
55                 }
56         unlock_kernel();
57         EXIT;
58 }
59
60 static int mds_lov_read_objids(struct obd_device *obd)
61 {
62         struct mds_obd *mds = &obd->u.mds;
63         obd_id *ids;
64         loff_t off = 0;
65         int i, rc, size;
66         ENTRY;
67
68         LASSERT(!mds->mds_lov_objids_size);
69         LASSERT(!mds->mds_lov_objids_dirty);
70
71         /* Read everything in the file, even if our current lov desc
72            has fewer targets. Old targets not in the lov descriptor
73            during mds setup may still have valid objids. */
74         size = mds->mds_lov_objid_filp->f_dentry->d_inode->i_size;
75         if (size == 0)
76                 RETURN(0);
77
78         OBD_ALLOC(ids, size);
79         if (ids == NULL)
80                 RETURN(-ENOMEM);
81         mds->mds_lov_objids = ids;
82         mds->mds_lov_objids_size = size;
83
84         rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
85         if (rc < 0) {
86                 CERROR("Error reading objids %d\n", rc);
87                 RETURN(rc);
88         }
89
90         mds->mds_lov_objids_in_file = size / sizeof(*ids);
91
92         for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
93                 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
94                        mds->mds_lov_objids[i], i);
95         }
96         RETURN(0);
97 }
98
99 int mds_lov_write_objids(struct obd_device *obd)
100 {
101         struct mds_obd *mds = &obd->u.mds;
102         loff_t off = 0;
103         int i, rc, tgts;
104         ENTRY;
105
106         if (!mds->mds_lov_objids_dirty)
107                 RETURN(0);
108
109         tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
110
111         if (!tgts)
112                 RETURN(0);
113
114         for (i = 0; i < tgts; i++)
115                 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
116                        mds->mds_lov_objids[i], i);
117
118         rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
119                                  mds->mds_lov_objids, tgts * sizeof(obd_id),
120                                  &off, 0);
121         if (rc >= 0) {
122                 mds->mds_lov_objids_dirty = 0;
123                 rc = 0;
124         }
125
126         RETURN(rc);
127 }
128
129 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
130 {
131         int rc;
132         struct obdo oa;
133         struct obd_trans_info oti = {0};
134         struct lov_stripe_md  *empty_ea = NULL;
135         ENTRY;
136
137         LASSERT(mds->mds_lov_objids != NULL);
138
139         /* This create will in fact either create or destroy:  If the OST is
140          * missing objects below this ID, they will be created.  If it finds
141          * objects above this ID, they will be removed. */
142         memset(&oa, 0, sizeof(oa));
143         oa.o_valid = OBD_MD_FLFLAGS;
144         oa.o_flags = OBD_FL_DELORPHAN;
145         if (ost_uuid != NULL) {
146                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
147                 oa.o_valid |= OBD_MD_FLINLINE;
148         }
149         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
150
151         RETURN(rc);
152 }
153
154 /* update the LOV-OSC knowledge of the last used object id's */
155 int mds_lov_set_nextid(struct obd_device *obd)
156 {
157         struct mds_obd *mds = &obd->u.mds;
158         int rc;
159         ENTRY;
160
161         LASSERT(!obd->obd_recovering);
162
163         LASSERT(mds->mds_lov_objids != NULL);
164
165         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
166                                 KEY_NEXT_ID,
167                                 mds->mds_lov_desc.ld_tgt_count *
168                                 sizeof(*mds->mds_lov_objids),
169                                 mds->mds_lov_objids, NULL);
170
171         if (rc)
172                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
173                         obd->obd_name, rc);
174
175         RETURN(rc);
176 }
177
178 /* Update the lov desc for a new size lov. */
179 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
180 {
181         struct mds_obd *mds = &obd->u.mds;
182         struct lov_desc *ld;
183         __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
184         int rc = 0;
185         ENTRY;
186
187         OBD_ALLOC(ld, sizeof(*ld));
188         if (!ld)
189                 RETURN(-ENOMEM);
190
191         rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
192                           &valsize, ld);
193         if (rc)
194                 GOTO(out, rc);
195
196         /* The size of the LOV target table may have increased. */
197         size = ld->ld_tgt_count * sizeof(obd_id);
198         if ((mds->mds_lov_objids_size == 0) ||
199             (size > mds->mds_lov_objids_size)) {
200                 obd_id *ids;
201
202                 /* add room by powers of 2 */
203                 size = 1;
204                 while (size < ld->ld_tgt_count)
205                         size = size << 1;
206                 size = size * sizeof(obd_id);
207
208                 OBD_ALLOC(ids, size);
209                 if (ids == NULL)
210                         GOTO(out, rc = -ENOMEM);
211                 memset(ids, 0, size);
212                 if (mds->mds_lov_objids_size) {
213                         obd_id *old_ids = mds->mds_lov_objids;
214                         memcpy(ids, mds->mds_lov_objids,
215                                mds->mds_lov_objids_size);
216                         mds->mds_lov_objids = ids;
217                         OBD_FREE(old_ids, mds->mds_lov_objids_size);
218                 }
219                 mds->mds_lov_objids = ids;
220                 mds->mds_lov_objids_size = size;
221         }
222
223         /* Don't change the mds_lov_desc until the objids size matches the
224            count (paranoia) */
225         mds->mds_lov_desc = *ld;
226         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
227                mds->mds_lov_desc.ld_tgt_count);
228
229         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
230                         max(mds->mds_lov_desc.ld_tgt_count,
231                             mds->mds_lov_objids_in_file));
232         mds->mds_max_mdsize = lov_mds_md_size(stripes);
233         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
234         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
235                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
236                stripes);
237
238 out:
239         OBD_FREE(ld, sizeof(*ld));
240         RETURN(rc);
241 }
242
243
244 #define MDSLOV_NO_INDEX -1
245
246 /* Inform MDS about new/updated target */
247 static int mds_lov_update_mds(struct obd_device *obd,
248                               struct obd_device *watched,
249                               __u32 idx, struct obd_uuid *uuid)
250 {
251         struct mds_obd *mds = &obd->u.mds;
252         int old_count;
253         int rc = 0;
254         ENTRY;
255
256         old_count = mds->mds_lov_desc.ld_tgt_count;
257         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
258         if (rc)
259                 RETURN(rc);
260
261         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
262                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
263                mds->mds_lov_desc.ld_tgt_count);
264
265         /* idx is set as data from lov_notify. */
266         if (idx != MDSLOV_NO_INDEX && !obd->obd_recovering) {
267                 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
268                         CERROR("index %d > count %d!\n", idx,
269                                mds->mds_lov_desc.ld_tgt_count);
270                         RETURN(-EINVAL);
271                 }
272
273                 if (idx >= mds->mds_lov_objids_in_file) {
274                         /* We never read this lastid; ask the osc */
275                         obd_id lastid;
276                         __u32 size = sizeof(lastid);
277                         rc = obd_get_info(watched->obd_self_export,
278                                           strlen("last_id"),
279                                           "last_id", &size, &lastid);
280                         if (rc)
281                                 RETURN(rc);
282                         mds->mds_lov_objids[idx] = lastid;
283                         mds->mds_lov_objids_dirty = 1;
284                         mds_lov_write_objids(obd);
285                 } else {
286                         /* We have read this lastid from disk; tell the osc.
287                            Don't call this during recovery. */
288                         rc = mds_lov_set_nextid(obd);
289                 }
290
291                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
292                       mds->mds_lov_objids[idx], idx);
293         }
294
295         /* If we added a target we have to reconnect the llogs */
296         /* We only _need_ to do this at first add (idx), or the first time
297            after recovery.  However, it should now be safe to call anytime. */
298         CDEBUG(D_CONFIG, "reset llogs idx=%d\n", idx);
299         mutex_down(&obd->obd_dev_sem);
300         llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, uuid);
301         mutex_up(&obd->obd_dev_sem);
302
303         RETURN(rc);
304 }
305
306 /* update the LOV-OSC knowledge of the last used object id's */
307 int mds_lov_connect(struct obd_device *obd, char * lov_name)
308 {
309         struct mds_obd *mds = &obd->u.mds;
310         struct lustre_handle conn = {0,};
311         struct obd_connect_data *data;
312         int rc, i;
313         ENTRY;
314
315         if (IS_ERR(mds->mds_osc_obd))
316                 RETURN(PTR_ERR(mds->mds_osc_obd));
317
318         if (mds->mds_osc_obd)
319                 RETURN(0);
320
321         mds->mds_osc_obd = class_name2obd(lov_name);
322         if (!mds->mds_osc_obd) {
323                 CERROR("MDS cannot locate LOV %s\n", lov_name);
324                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
325                 RETURN(-ENOTCONN);
326         }
327
328         OBD_ALLOC(data, sizeof(*data));
329         if (data == NULL)
330                 RETURN(-ENOMEM);
331         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
332                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64;
333         data->ocd_version = LUSTRE_VERSION_CODE;
334         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
335         rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data);
336         OBD_FREE(data, sizeof(*data));
337         if (rc) {
338                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
339                 mds->mds_osc_obd = ERR_PTR(rc);
340                 RETURN(rc);
341         }
342         mds->mds_osc_exp = class_conn2export(&conn);
343
344         rc = obd_register_observer(mds->mds_osc_obd, obd);
345         if (rc) {
346                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
347                        lov_name, rc);
348                 GOTO(err_discon, rc);
349         }
350         
351         /* Deny new client connections until we are sure we have some OSTs */
352         obd->obd_no_conn = 1;
353
354         rc = mds_lov_read_objids(obd);
355         if (rc) {
356                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
357                 GOTO(err_reg, rc);
358         }
359
360         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
361         if (rc)
362                 GOTO(err_reg, rc);
363
364         /* tgt_count may be 0! */
365         rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
366         if (rc) {
367                 CERROR("failed to initialize catalog %d\n", rc);
368                 GOTO(err_reg, rc);
369         }
370
371         /* If we're mounting this code for the first time on an existing FS,
372          * we need to populate the objids array from the real OST values */
373         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
374                 int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
375                 rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
376                                   "last_id", &size, mds->mds_lov_objids);
377                 if (!rc) {
378                         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
379                                 CWARN("got last object "LPU64" from OST %d\n",
380                                       mds->mds_lov_objids[i], i);
381                         mds->mds_lov_objids_dirty = 1;
382                         rc = mds_lov_write_objids(obd);
383                         if (rc)
384                                 CERROR("got last objids from OSTs, but error "
385                                        "writing objids file: %d\n", rc);
386                 }
387         }
388
389         /* I want to see a callback happen when the OBD moves to a
390          * "For General Use" state, and that's when we'll call
391          * set_nextid().  The class driver can help us here, because
392          * it can use the obd_recovering flag to determine when the
393          * the OBD is full available. */
394         if (!obd->obd_recovering)
395                 rc = mds_postrecov(obd);
396         RETURN(rc);
397
398 err_reg:
399         obd_register_observer(mds->mds_osc_obd, NULL);
400 err_discon:
401         obd_disconnect(mds->mds_osc_exp);
402         mds->mds_osc_exp = NULL;
403         mds->mds_osc_obd = ERR_PTR(rc);
404         RETURN(rc);
405 }
406
407 int mds_lov_disconnect(struct obd_device *obd)
408 {
409         struct mds_obd *mds = &obd->u.mds;
410         int rc = 0;
411         ENTRY;
412
413         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
414                 obd_register_observer(mds->mds_osc_obd, NULL);
415
416                 /* The actual disconnect of the mds_lov will be called from
417                  * class_disconnect_exports from mds_lov_clean. So we have to
418                  * ensure that class_cleanup doesn't fail due to the extra ref
419                  * we're holding now. The mechanism to do that already exists -
420                  * the obd_force flag. We'll drop the final ref to the
421                  * mds_osc_exp in mds_cleanup. */
422                 mds->mds_osc_obd->obd_force = 1;
423         }
424
425         RETURN(rc);
426 }
427
428 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
429                   void *karg, void *uarg)
430 {
431         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
432         struct obd_device *obd = exp->exp_obd;
433         struct mds_obd *mds = &obd->u.mds;
434         struct obd_ioctl_data *data = karg;
435         struct lvfs_run_ctxt saved;
436         int rc = 0;
437
438         ENTRY;
439         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
440
441         switch (cmd) {
442         case OBD_IOC_RECORD: {
443                 char *name = data->ioc_inlbuf1;
444                 if (mds->mds_cfg_llh)
445                         RETURN(-EBUSY);
446
447                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
448                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
449                                  &mds->mds_cfg_llh, NULL, name);
450                 if (rc == 0)
451                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
452                                          &cfg_uuid);
453                 else
454                         mds->mds_cfg_llh = NULL;
455                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
456
457                 RETURN(rc);
458         }
459
460         case OBD_IOC_ENDRECORD: {
461                 if (!mds->mds_cfg_llh)
462                         RETURN(-EBADF);
463
464                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
465                 rc = llog_close(mds->mds_cfg_llh);
466                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
467
468                 mds->mds_cfg_llh = NULL;
469                 RETURN(rc);
470         }
471
472         case OBD_IOC_CLEAR_LOG: {
473                 char *name = data->ioc_inlbuf1;
474                 if (mds->mds_cfg_llh)
475                         RETURN(-EBUSY);
476
477                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
478                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
479                                  &mds->mds_cfg_llh, NULL, name);
480                 if (rc == 0) {
481                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
482                                          NULL);
483
484                         rc = llog_destroy(mds->mds_cfg_llh);
485                         llog_free_handle(mds->mds_cfg_llh);
486                 }
487                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
488
489                 mds->mds_cfg_llh = NULL;
490                 RETURN(rc);
491         }
492
493         case OBD_IOC_DORECORD: {
494                 char *cfg_buf;
495                 struct llog_rec_hdr rec;
496                 if (!mds->mds_cfg_llh)
497                         RETURN(-EBADF);
498
499                 rec.lrh_len = llog_data_len(data->ioc_plen1);
500
501                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
502                         rec.lrh_type = OBD_CFG_REC;
503                 } else {
504                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
505                         RETURN(-EINVAL);
506                 }
507
508                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
509                 if (cfg_buf == NULL)
510                         RETURN(-EINVAL);
511                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
512                 if (rc) {
513                         OBD_FREE(cfg_buf, data->ioc_plen1);
514                         RETURN(rc);
515                 }
516
517                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
518                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
519                                     cfg_buf, -1);
520                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
521
522                 OBD_FREE(cfg_buf, data->ioc_plen1);
523                 RETURN(rc);
524         }
525
526         case OBD_IOC_PARSE: {
527                 struct llog_ctxt *ctxt =
528                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
529                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
530                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
531                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
532                 if (rc)
533                         RETURN(rc);
534
535                 RETURN(rc);
536         }
537
538         case OBD_IOC_DUMP_LOG: {
539                 struct llog_ctxt *ctxt =
540                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
541                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
542                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
543                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
544                 if (rc)
545                         RETURN(rc);
546
547                 RETURN(rc);
548         }
549
550         case OBD_IOC_SYNC: {
551                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
552                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
553                 RETURN(rc);
554         }
555
556         case OBD_IOC_SET_READONLY: {
557                 void *handle;
558                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
559                 BDEVNAME_DECLARE_STORAGE(tmp);
560                 CERROR("*** setting device %s read-only ***\n",
561                        ll_bdevname(obd->u.obt.obt_sb, tmp));
562
563                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
564                 if (!IS_ERR(handle))
565                         rc = fsfilt_commit(obd, inode, handle, 1);
566
567                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
568                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
569
570                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
571                 RETURN(0);
572         }
573
574         case OBD_IOC_CATLOGLIST: {
575                 int count = mds->mds_lov_desc.ld_tgt_count;
576                 rc = llog_catalog_list(obd, count, data);
577                 RETURN(rc);
578
579         }
580         case OBD_IOC_LLOG_CHECK:
581         case OBD_IOC_LLOG_CANCEL:
582         case OBD_IOC_LLOG_REMOVE: {
583                 struct llog_ctxt *ctxt =
584                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
585                 int rc2;
586
587                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
588                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
589                 rc = llog_ioctl(ctxt, cmd, data);
590                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
591                 llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
592                 rc2 = obd_set_info_async(mds->mds_osc_exp,
593                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
594                                          0, NULL, NULL);
595                 if (!rc)
596                         rc = rc2;
597                 RETURN(rc);
598         }
599         case OBD_IOC_LLOG_INFO:
600         case OBD_IOC_LLOG_PRINT: {
601                 struct llog_ctxt *ctxt =
602                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
603
604                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
605                 rc = llog_ioctl(ctxt, cmd, data);
606                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
607
608                 RETURN(rc);
609         }
610
611         case OBD_IOC_ABORT_RECOVERY:
612                 CERROR("aborting recovery for device %s\n", obd->obd_name);
613                 target_abort_recovery(obd);
614                 RETURN(0);
615
616         default:
617                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
618                 RETURN(-EINVAL);
619         }
620         RETURN(0);
621
622 }
623
624 struct mds_lov_sync_info {
625         struct obd_device *mlsi_obd;     /* the lov device to sync */
626         struct obd_device *mlsi_watched; /* target osc */
627         __u32              mlsi_index;   /* index of target */
628 };
629
630 /* We only sync one osc at a time, so that we don't have to hold
631    any kind of lock on the whole mds_lov_desc, which may change
632    (grow) as a result of mds_lov_add_ost.  This also avoids any
633    kind of mismatch between the lov_desc and the mds_lov_desc,
634    which are not in lock-step during lov_add_obd */
635 static int __mds_lov_synchronize(void *data)
636 {
637         struct mds_lov_sync_info *mlsi = data;
638         struct obd_device *obd = mlsi->mlsi_obd;
639         struct obd_device *watched = mlsi->mlsi_watched;
640         struct mds_obd *mds = &obd->u.mds;
641         struct obd_uuid *uuid;
642         __u32  idx = mlsi->mlsi_index;
643         int rc = 0;
644         ENTRY;
645
646         OBD_FREE(mlsi, sizeof(*mlsi));
647
648         LASSERT(obd);
649         LASSERT(watched);
650         uuid = &watched->u.cli.cl_target_uuid;
651         LASSERT(uuid);
652
653         rc = mds_lov_update_mds(obd, watched, idx, uuid);
654         if (rc != 0)
655                 GOTO(out, rc);
656
657         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
658                                 KEY_MDS_CONN, 0, uuid, NULL);
659         if (rc != 0)
660                 GOTO(out, rc);
661
662         rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
663                           mds->mds_lov_desc.ld_tgt_count,
664                           NULL, NULL, uuid);
665
666         if (rc != 0) {
667                 CERROR("%s: failed at llog_origin_connect: %d\n",
668                        obd->obd_name, rc);
669                 GOTO(out, rc);
670         }
671
672         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
673               obd->obd_name, obd_uuid2str(uuid));
674
675         if (obd->obd_stopping)
676                 GOTO(out, rc = -ENODEV);
677
678         rc = mds_lov_clear_orphans(mds, uuid);
679         if (rc != 0) {
680                 CERROR("%s: failed at mds_lov_clear_orphans: %d\n",
681                        obd->obd_name, rc);
682                 GOTO(out, rc);
683         }
684
685         EXIT;
686 out:
687         class_decref(obd);
688         return rc;
689 }
690
691 int mds_lov_synchronize(void *data)
692 {
693         struct mds_lov_sync_info *mlsi = data;
694         char name[20];
695
696         if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
697                 /* There is still a watched target,
698                 but we don't know its index */
699                 sprintf(name, "ll_sync_tgt");
700         else
701                 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
702         ptlrpc_daemonize(name);
703
704         RETURN(__mds_lov_synchronize(data));
705 }
706
707 int mds_lov_start_synchronize(struct obd_device *obd,
708                               struct obd_device *watched,
709                               void *data, int nonblock)
710 {
711         struct mds_lov_sync_info *mlsi;
712         int rc;
713
714         ENTRY;
715
716         LASSERT(watched);
717
718         OBD_ALLOC(mlsi, sizeof(*mlsi));
719         if (mlsi == NULL)
720                 RETURN(-ENOMEM);
721
722         mlsi->mlsi_obd = obd;
723         mlsi->mlsi_watched = watched;
724         if (data)
725                 mlsi->mlsi_index = *(__u32 *)data;
726         else
727                 mlsi->mlsi_index = MDSLOV_NO_INDEX;
728
729         /* Although class_export_get(obd->obd_self_export) would lock
730            the MDS in place, since it's only a self-export
731            it doesn't lock the LOV in place.  The LOV can be disconnected
732            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
733            Simply taking an export ref on the LOV doesn't help, because it's
734            still disconnected. Taking an obd reference insures that we don't
735            disconnect the LOV.  This of course means a cleanup won't
736            finish for as long as the sync is blocking. */
737         class_incref(obd);
738
739         if (nonblock) {
740                 /* Synchronize in the background */
741                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
742                                        CLONE_VM | CLONE_FILES);
743                 if (rc < 0) {
744                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
745                                obd->obd_name, rc);
746                         class_decref(obd);
747                 } else {
748                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
749                                "thread=%d\n", obd->obd_name,
750                                mlsi->mlsi_index, rc);
751                         rc = 0;
752                 }
753         } else {
754                 rc = __mds_lov_synchronize((void *)mlsi);
755         }
756
757         RETURN(rc);
758 }
759
760 int mds_notify(struct obd_device *obd, struct obd_device *watched,
761                enum obd_notify_event ev, void *data)
762 {
763         int rc = 0;
764         ENTRY;
765
766         switch (ev) {
767         /* We only handle these: */
768         case OBD_NOTIFY_ACTIVE:
769         case OBD_NOTIFY_SYNC:
770         case OBD_NOTIFY_SYNC_NONBLOCK:
771                 break;
772         case OBD_NOTIFY_CONFIG:
773                 /* Open for clients */
774                 obd->obd_no_conn = 0;
775         default:
776                 RETURN(0);
777         }
778
779         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
780
781         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
782                 CERROR("unexpected notification of %s %s!\n",
783                        watched->obd_type->typ_name, watched->obd_name);
784                 RETURN(-EINVAL);
785         }
786
787         if (obd->obd_recovering) {
788                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
789                       obd->obd_name,
790                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
791                 /* We still have to fix the lov descriptor for ost's added
792                    after the mdt in the config log.  They didn't make it into
793                    mds_lov_connect. */
794                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
795                 RETURN(rc);
796         }
797
798         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
799         rc = mds_lov_start_synchronize(obd, watched, data,
800                                        !(ev == OBD_NOTIFY_SYNC));
801
802         lquota_recovery(mds_quota_interface_ref, obd);
803
804         RETURN(rc);
805 }
806
807 /* Convert the on-disk LOV EA structre.
808  * We always try to convert from an old LOV EA format to the common in-memory
809  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
810  * then convert back to the new on-disk format and save it back to disk
811  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
812  * to convert it each time this inode is accessed.
813  *
814  * This function is a bit interesting in the error handling.  We can safely
815  * ship the old lmm to the client in case of failure, since it uses the same
816  * obd_unpackmd() code and can do the conversion if the MDS fails for some
817  * reason.  We will not delete the old lmm data until we have written the
818  * new format lmm data in fsfilt_set_md(). */
819 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
820                        struct lov_mds_md *lmm, int lmm_size)
821 {
822         struct lov_stripe_md *lsm = NULL;
823         void *handle;
824         int rc, err;
825         ENTRY;
826
827         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
828             le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
829                 RETURN(0);
830
831         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
832                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
833                LOV_MAGIC);
834
835         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
836         if (rc < 0)
837                 GOTO(conv_end, rc);
838
839         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
840         if (rc < 0)
841                 GOTO(conv_free, rc);
842         lmm_size = rc;
843
844         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
845         if (IS_ERR(handle)) {
846                 rc = PTR_ERR(handle);
847                 GOTO(conv_free, rc);
848         }
849
850         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
851
852         err = fsfilt_commit(obd, inode, handle, 0);
853         if (!rc)
854                 rc = err ? err : lmm_size;
855         GOTO(conv_free, rc);
856 conv_free:
857         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
858 conv_end:
859         return rc;
860 }
861
862 void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
863                          struct lov_desc *desc)
864 {
865         int i;
866         for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
867                 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
868                         le64_to_cpu(lmm->lmm_objects[i].l_object_id);
869         }
870 }
871