Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
38 #include <obd_lov.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
41
42 #include "mds_internal.h"
43
44 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
45 {
46         struct mds_obd *mds = &obd->u.mds;
47         int i;
48         ENTRY;
49
50         lock_kernel();
51         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
52                 if (ids[i] > (mds->mds_lov_objids)[i]) {
53                         (mds->mds_lov_objids)[i] = ids[i];
54                         mds->mds_lov_objids_dirty = 1;
55                 }
56         unlock_kernel();
57         EXIT;
58 }
59 EXPORT_SYMBOL(mds_lov_update_objids);
60
61 static int mds_lov_read_objids(struct obd_device *obd)
62 {
63         struct mds_obd *mds = &obd->u.mds;
64         obd_id *ids;
65         loff_t off = 0;
66         int i, rc, size;
67         ENTRY;
68
69         LASSERT(!mds->mds_lov_objids_size);
70         LASSERT(!mds->mds_lov_objids_dirty);
71
72         /* Read everything in the file, even if our current lov desc
73            has fewer targets. Old targets not in the lov descriptor
74            during mds setup may still have valid objids. */
75         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
76         if (size == 0)
77                 RETURN(0);
78
79         OBD_ALLOC(ids, size);
80         if (ids == NULL)
81                 RETURN(-ENOMEM);
82         mds->mds_lov_objids = ids;
83         mds->mds_lov_objids_size = size;
84
85         rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
86         if (rc < 0) {
87                 CERROR("Error reading objids %d\n", rc);
88                 RETURN(rc);
89         }
90
91         mds->mds_lov_objids_in_file = size / sizeof(*ids);
92
93         for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
94                 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
95                        mds->mds_lov_objids[i], i);
96         }
97         RETURN(0);
98 }
99
100 int mds_lov_write_objids(struct obd_device *obd)
101 {
102         struct mds_obd *mds = &obd->u.mds;
103         loff_t off = 0;
104         int i, rc, tgts;
105         ENTRY;
106
107         if (!mds->mds_lov_objids_dirty)
108                 RETURN(0);
109
110         tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
111
112         if (!tgts)
113                 RETURN(0);
114
115         for (i = 0; i < tgts; i++)
116                 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
117                        mds->mds_lov_objids[i], i);
118
119         rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
120                                  mds->mds_lov_objids, tgts * sizeof(obd_id),
121                                  &off, 0);
122         if (rc >= 0) {
123                 mds->mds_lov_objids_dirty = 0;
124                 rc = 0;
125         }
126
127         RETURN(rc);
128 }
129 EXPORT_SYMBOL(mds_lov_write_objids);
130
131 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
132 {
133         int rc;
134         struct obdo oa;
135         struct obd_trans_info oti = {0};
136         struct lov_stripe_md  *empty_ea = NULL;
137         ENTRY;
138
139         LASSERT(mds->mds_lov_objids != NULL);
140
141         /* This create will in fact either create or destroy:  If the OST is
142          * missing objects below this ID, they will be created.  If it finds
143          * objects above this ID, they will be removed. */
144         memset(&oa, 0, sizeof(oa));
145         oa.o_flags = OBD_FL_DELORPHAN;
146         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
147         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
148         if (ost_uuid != NULL) {
149                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
150                 oa.o_valid |= OBD_MD_FLINLINE;
151         }
152         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
153
154         RETURN(rc);
155 }
156
157 /* update the LOV-OSC knowledge of the last used object id's */
158 int mds_lov_set_nextid(struct obd_device *obd)
159 {
160         struct mds_obd *mds = &obd->u.mds;
161         int rc;
162         ENTRY;
163
164         LASSERT(!obd->obd_recovering);
165
166         LASSERT(mds->mds_lov_objids != NULL);
167
168         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
169                                 KEY_NEXT_ID,
170                                 mds->mds_lov_desc.ld_tgt_count *
171                                 sizeof(*mds->mds_lov_objids),
172                                 mds->mds_lov_objids, NULL);
173
174         if (rc)
175                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
176                         obd->obd_name, rc);
177
178         RETURN(rc);
179 }
180
181 /* Update the lov desc for a new size lov. */
182 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
183 {
184         struct mds_obd *mds = &obd->u.mds;
185         struct lov_desc *ld;
186         __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
187         int rc = 0;
188         ENTRY;
189
190         OBD_ALLOC(ld, sizeof(*ld));
191         if (!ld)
192                 RETURN(-ENOMEM);
193
194         rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
195                           &valsize, ld);
196         if (rc)
197                 GOTO(out, rc);
198
199         /* The size of the LOV target table may have increased. */
200         size = ld->ld_tgt_count * sizeof(obd_id);
201         if ((mds->mds_lov_objids_size == 0) ||
202             (size > mds->mds_lov_objids_size)) {
203                 obd_id *ids;
204
205                 /* add room by powers of 2 */
206                 size = 1;
207                 while (size < ld->ld_tgt_count)
208                         size = size << 1;
209                 size = size * sizeof(obd_id);
210
211                 OBD_ALLOC(ids, size);
212                 if (ids == NULL)
213                         GOTO(out, rc = -ENOMEM);
214                 memset(ids, 0, size);
215                 if (mds->mds_lov_objids_size) {
216                         obd_id *old_ids = mds->mds_lov_objids;
217                         memcpy(ids, mds->mds_lov_objids,
218                                mds->mds_lov_objids_size);
219                         mds->mds_lov_objids = ids;
220                         OBD_FREE(old_ids, mds->mds_lov_objids_size);
221                 }
222                 mds->mds_lov_objids = ids;
223                 mds->mds_lov_objids_size = size;
224         }
225
226         /* Don't change the mds_lov_desc until the objids size matches the
227            count (paranoia) */
228         mds->mds_lov_desc = *ld;
229         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
230                mds->mds_lov_desc.ld_tgt_count);
231
232         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
233                         max(mds->mds_lov_desc.ld_tgt_count,
234                             mds->mds_lov_objids_in_file));
235         mds->mds_max_mdsize = lov_mds_md_size(stripes);
236         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
237         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
238                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
239                stripes);
240
241         /* If we added a target we have to reconnect the llogs */
242         /* We only _need_ to do this at first add (idx), or the first time
243            after recovery.  However, it should now be safe to call anytime. */
244         mutex_down(&obd->obd_dev_sem);
245         llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
246         mutex_up(&obd->obd_dev_sem);
247
248         /*XXX this notifies the MDD until lov handling use old mds code */
249         if (obd->obd_upcall.onu_owner) {
250                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
251                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
252                                                  obd->obd_upcall.onu_owner);
253         }
254 out:
255         OBD_FREE(ld, sizeof(*ld));
256         RETURN(rc);
257 }
258
259
260 #define MDSLOV_NO_INDEX -1
261
262 /* Inform MDS about new/updated target */
263 static int mds_lov_update_mds(struct obd_device *obd,
264                               struct obd_device *watched,
265                               __u32 idx, struct obd_uuid *uuid)
266 {
267         struct mds_obd *mds = &obd->u.mds;
268         int old_count;
269         int rc = 0;
270         ENTRY;
271
272         old_count = mds->mds_lov_desc.ld_tgt_count;
273         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
274         if (rc)
275                 RETURN(rc);
276
277         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
278                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
279                mds->mds_lov_desc.ld_tgt_count);
280
281         /* idx is set as data from lov_notify. */
282         if (idx != MDSLOV_NO_INDEX && !obd->obd_recovering) {
283                 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
284                         CERROR("index %d > count %d!\n", idx,
285                                mds->mds_lov_desc.ld_tgt_count);
286                         RETURN(-EINVAL);
287                 }
288
289                 if (idx >= mds->mds_lov_objids_in_file) {
290                         /* We never read this lastid; ask the osc */
291                         obd_id lastid;
292                         __u32 size = sizeof(lastid);
293                         rc = obd_get_info(watched->obd_self_export,
294                                           strlen("last_id"),
295                                           "last_id", &size, &lastid);
296                         if (rc)
297                                 RETURN(rc);
298                         mds->mds_lov_objids[idx] = lastid;
299                         mds->mds_lov_objids_dirty = 1;
300                         mds_lov_write_objids(obd);
301                 } else {
302                         /* We have read this lastid from disk; tell the osc.
303                            Don't call this during recovery. */
304                         rc = mds_lov_set_nextid(obd);
305                 }
306
307                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
308                       mds->mds_lov_objids[idx], idx);
309         }
310
311         RETURN(rc);
312 }
313
314 /* update the LOV-OSC knowledge of the last used object id's */
315 int mds_lov_connect(struct obd_device *obd, char * lov_name)
316 {
317         struct mds_obd *mds = &obd->u.mds;
318         struct lustre_handle conn = {0,};
319         struct obd_connect_data *data;
320         int rc, i;
321         ENTRY;
322
323         if (IS_ERR(mds->mds_osc_obd))
324                 RETURN(PTR_ERR(mds->mds_osc_obd));
325
326         if (mds->mds_osc_obd)
327                 RETURN(0);
328
329         mds->mds_osc_obd = class_name2obd(lov_name);
330         if (!mds->mds_osc_obd) {
331                 CERROR("MDS cannot locate LOV %s\n", lov_name);
332                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
333                 RETURN(-ENOTCONN);
334         }
335
336         OBD_ALLOC(data, sizeof(*data));
337         if (data == NULL)
338                 RETURN(-ENOMEM);
339         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
340                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
341                                   OBD_CONNECT_OSS_CAPA;
342 #ifdef HAVE_LRU_RESIZE_SUPPORT
343         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
344 #endif
345         data->ocd_version = LUSTRE_VERSION_CODE;
346         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
347         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
348         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
349         OBD_FREE(data, sizeof(*data));
350         if (rc) {
351                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
352                 mds->mds_osc_obd = ERR_PTR(rc);
353                 RETURN(rc);
354         }
355         mds->mds_osc_exp = class_conn2export(&conn);
356
357         rc = obd_register_observer(mds->mds_osc_obd, obd);
358         if (rc) {
359                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
360                        lov_name, rc);
361                 GOTO(err_discon, rc);
362         }
363
364         /* Deny new client connections until we are sure we have some OSTs */
365         obd->obd_no_conn = 1;
366
367         rc = mds_lov_read_objids(obd);
368         if (rc) {
369                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
370                 GOTO(err_reg, rc);
371         }
372
373         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
374         if (rc)
375                 GOTO(err_reg, rc);
376
377         /* tgt_count may be 0! */
378         rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
379         if (rc) {
380                 CERROR("failed to initialize catalog %d\n", rc);
381                 GOTO(err_reg, rc);
382         }
383
384         /* If we're mounting this code for the first time on an existing FS,
385          * we need to populate the objids array from the real OST values */
386         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
387                 int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
388                 rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
389                                   "last_id", &size, mds->mds_lov_objids);
390                 if (!rc) {
391                         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
392                                 CWARN("got last object "LPU64" from OST %d\n",
393                                       mds->mds_lov_objids[i], i);
394                         mds->mds_lov_objids_dirty = 1;
395                         rc = mds_lov_write_objids(obd);
396                         if (rc)
397                                 CERROR("got last objids from OSTs, but error "
398                                        "writing objids file: %d\n", rc);
399                 }
400         }
401
402         /* I want to see a callback happen when the OBD moves to a
403          * "For General Use" state, and that's when we'll call
404          * set_nextid().  The class driver can help us here, because
405          * it can use the obd_recovering flag to determine when the
406          * the OBD is full available. */
407         /* MDD device will care about that
408         if (!obd->obd_recovering)
409                 rc = mds_postrecov(obd);
410          */
411         RETURN(rc);
412
413 err_reg:
414         obd_register_observer(mds->mds_osc_obd, NULL);
415 err_discon:
416         obd_disconnect(mds->mds_osc_exp);
417         mds->mds_osc_exp = NULL;
418         mds->mds_osc_obd = ERR_PTR(rc);
419         RETURN(rc);
420 }
421
422 int mds_lov_disconnect(struct obd_device *obd)
423 {
424         struct mds_obd *mds = &obd->u.mds;
425         int rc = 0;
426         ENTRY;
427
428         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
429                 obd_register_observer(mds->mds_osc_obd, NULL);
430
431                 /* The actual disconnect of the mds_lov will be called from
432                  * class_disconnect_exports from mds_lov_clean. So we have to
433                  * ensure that class_cleanup doesn't fail due to the extra ref
434                  * we're holding now. The mechanism to do that already exists -
435                  * the obd_force flag. We'll drop the final ref to the
436                  * mds_osc_exp in mds_cleanup. */
437                 mds->mds_osc_obd->obd_force = 1;
438         }
439
440         RETURN(rc);
441 }
442
443 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
444                   void *karg, void *uarg)
445 {
446         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
447         struct obd_device *obd = exp->exp_obd;
448         struct mds_obd *mds = &obd->u.mds;
449         struct obd_ioctl_data *data = karg;
450         struct lvfs_run_ctxt saved;
451         int rc = 0;
452
453         ENTRY;
454         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
455
456         switch (cmd) {
457         case OBD_IOC_RECORD: {
458                 char *name = data->ioc_inlbuf1;
459                 if (mds->mds_cfg_llh)
460                         RETURN(-EBUSY);
461
462                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
463                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
464                                  &mds->mds_cfg_llh, NULL, name);
465                 if (rc == 0)
466                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
467                                          &cfg_uuid);
468                 else
469                         mds->mds_cfg_llh = NULL;
470                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
471
472                 RETURN(rc);
473         }
474
475         case OBD_IOC_ENDRECORD: {
476                 if (!mds->mds_cfg_llh)
477                         RETURN(-EBADF);
478
479                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
480                 rc = llog_close(mds->mds_cfg_llh);
481                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
482
483                 mds->mds_cfg_llh = NULL;
484                 RETURN(rc);
485         }
486
487         case OBD_IOC_CLEAR_LOG: {
488                 char *name = data->ioc_inlbuf1;
489                 if (mds->mds_cfg_llh)
490                         RETURN(-EBUSY);
491
492                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
493                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
494                                  &mds->mds_cfg_llh, NULL, name);
495                 if (rc == 0) {
496                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
497                                          NULL);
498
499                         rc = llog_destroy(mds->mds_cfg_llh);
500                         llog_free_handle(mds->mds_cfg_llh);
501                 }
502                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
503
504                 mds->mds_cfg_llh = NULL;
505                 RETURN(rc);
506         }
507
508         case OBD_IOC_DORECORD: {
509                 char *cfg_buf;
510                 struct llog_rec_hdr rec;
511                 if (!mds->mds_cfg_llh)
512                         RETURN(-EBADF);
513
514                 rec.lrh_len = llog_data_len(data->ioc_plen1);
515
516                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
517                         rec.lrh_type = OBD_CFG_REC;
518                 } else {
519                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
520                         RETURN(-EINVAL);
521                 }
522
523                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
524                 if (cfg_buf == NULL)
525                         RETURN(-EINVAL);
526                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
527                 if (rc) {
528                         OBD_FREE(cfg_buf, data->ioc_plen1);
529                         RETURN(rc);
530                 }
531
532                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
533                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
534                                     cfg_buf, -1);
535                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
536
537                 OBD_FREE(cfg_buf, data->ioc_plen1);
538                 RETURN(rc);
539         }
540
541         case OBD_IOC_PARSE: {
542                 struct llog_ctxt *ctxt =
543                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
544                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
545                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
546                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
547                 if (rc)
548                         RETURN(rc);
549
550                 RETURN(rc);
551         }
552
553         case OBD_IOC_DUMP_LOG: {
554                 struct llog_ctxt *ctxt =
555                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
556                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
557                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
558                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
559                 if (rc)
560                         RETURN(rc);
561
562                 RETURN(rc);
563         }
564
565         case OBD_IOC_SYNC: {
566                 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
567                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
568                 RETURN(rc);
569         }
570
571         case OBD_IOC_SET_READONLY: {
572                 void *handle;
573                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
574                 BDEVNAME_DECLARE_STORAGE(tmp);
575                 CERROR("*** setting device %s read-only ***\n",
576                        ll_bdevname(obd->u.obt.obt_sb, tmp));
577
578                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
579                 if (!IS_ERR(handle))
580                         rc = fsfilt_commit(obd, inode, handle, 1);
581
582                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
583                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
584
585                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
586                 RETURN(0);
587         }
588
589         case OBD_IOC_CATLOGLIST: {
590                 int count = mds->mds_lov_desc.ld_tgt_count;
591                 rc = llog_catalog_list(obd, count, data);
592                 RETURN(rc);
593
594         }
595         case OBD_IOC_LLOG_CHECK:
596         case OBD_IOC_LLOG_CANCEL:
597         case OBD_IOC_LLOG_REMOVE: {
598                 struct llog_ctxt *ctxt =
599                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
600                 int rc2;
601                 __u32 group;
602
603                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
604                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
605                 rc = llog_ioctl(ctxt, cmd, data);
606                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
607                 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
608                 group = FILTER_GROUP_MDS0 + mds->mds_id;
609                 rc2 = obd_set_info_async(mds->mds_osc_exp,
610                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
611                                          sizeof(group), &group, NULL);
612                 if (!rc)
613                         rc = rc2;
614                 RETURN(rc);
615         }
616         case OBD_IOC_LLOG_INFO:
617         case OBD_IOC_LLOG_PRINT: {
618                 struct llog_ctxt *ctxt =
619                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
620
621                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
622                 rc = llog_ioctl(ctxt, cmd, data);
623                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
624
625                 RETURN(rc);
626         }
627
628         case OBD_IOC_ABORT_RECOVERY:
629                 CERROR("aborting recovery for device %s\n", obd->obd_name);
630                 target_stop_recovery_thread(obd);
631                 RETURN(0);
632
633         default:
634                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
635                 RETURN(-EINVAL);
636         }
637         RETURN(0);
638
639 }
640
641 /* Collect the preconditions we need to allow client connects */
642 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
643 {
644         if (flag & CONFIG_LOG)
645                 obd->u.mds.mds_fl_cfglog = 1;
646         if (flag & CONFIG_SYNC)
647                 obd->u.mds.mds_fl_synced = 1;
648         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
649                 /* Open for clients */
650                 obd->obd_no_conn = 0;
651 }
652
653 struct mds_lov_sync_info {
654         struct obd_device *mlsi_obd;     /* the lov device to sync */
655         struct obd_device *mlsi_watched; /* target osc */
656         __u32              mlsi_index;   /* index of target */
657 };
658
659 static int mds_propagate_capa_keys(struct mds_obd *mds)
660 {
661         struct lustre_capa_key *key;
662         int i, rc = 0;
663
664         ENTRY;
665
666         if (!mds->mds_capa_keys)
667                 RETURN(0);
668
669         for (i = 0; i < 2; i++) {
670                 key = &mds->mds_capa_keys[i];
671                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
672
673                 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
674                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
675                 if (rc) {
676                         DEBUG_CAPA_KEY(D_ERROR, key,
677                                        "propagate failed (rc = %d) for", rc);
678                         RETURN(rc);
679                 }
680         }
681
682         RETURN(0);
683 }
684
685 /* We only sync one osc at a time, so that we don't have to hold
686    any kind of lock on the whole mds_lov_desc, which may change
687    (grow) as a result of mds_lov_add_ost.  This also avoids any
688    kind of mismatch between the lov_desc and the mds_lov_desc,
689    which are not in lock-step during lov_add_obd */
690 static int __mds_lov_synchronize(void *data)
691 {
692         struct mds_lov_sync_info *mlsi = data;
693         struct obd_device *obd = mlsi->mlsi_obd;
694         struct obd_device *watched = mlsi->mlsi_watched;
695         struct mds_obd *mds = &obd->u.mds;
696         struct obd_uuid *uuid;
697         __u32  idx = mlsi->mlsi_index;
698         struct mds_group_info mgi;
699         int rc = 0;
700         ENTRY;
701
702         OBD_FREE(mlsi, sizeof(*mlsi));
703
704         LASSERT(obd);
705         LASSERT(watched);
706         uuid = &watched->u.cli.cl_target_uuid;
707         LASSERT(uuid);
708
709         rc = mds_lov_update_mds(obd, watched, idx, uuid);
710         if (rc != 0) {
711                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
712                 GOTO(out, rc);
713         }
714         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
715         mgi.uuid = uuid;
716
717         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
718                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
719         if (rc != 0)
720                 GOTO(out, rc);
721
722         /* propagate capability keys */
723         rc = mds_propagate_capa_keys(mds);
724         if (rc)
725                 GOTO(out, rc);
726
727         rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
728                           mds->mds_lov_desc.ld_tgt_count,
729                           NULL, NULL, uuid);
730
731         if (rc != 0) {
732                 CERROR("%s: failed at llog_origin_connect: %d\n",
733                        obd->obd_name, rc);
734                 GOTO(out, rc);
735         }
736
737         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
738               obd->obd_name, obd_uuid2str(uuid));
739         /*
740          * FIXME: this obd_stopping was useless, 
741          * since obd in mdt layer was set
742          */
743         if (obd->obd_stopping)
744                 GOTO(out, rc = -ENODEV);
745
746         rc = mds_lov_clear_orphans(mds, uuid);
747         if (rc != 0) {
748                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
749                        obd_uuid2str(uuid), rc);
750                 GOTO(out, rc);
751         }
752         
753         if (obd->obd_upcall.onu_owner) {
754                 /*
755                  * This is a hack for mds_notify->mdd_notify. When the mds obd
756                  * in mdd is removed, This hack should be removed.
757                  */
758                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
759                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
760                                                  obd->obd_upcall.onu_owner);
761         }
762         EXIT;
763 out:
764         class_decref(obd);
765         return rc;
766 }
767
768 int mds_lov_synchronize(void *data)
769 {
770         struct mds_lov_sync_info *mlsi = data;
771         char name[20];
772
773         if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
774                 /* There is still a watched target,
775                 but we don't know its index */
776                 sprintf(name, "ll_sync_tgt");
777         else
778                 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
779         ptlrpc_daemonize(name);
780
781         RETURN(__mds_lov_synchronize(data));
782 }
783
784 int mds_lov_start_synchronize(struct obd_device *obd,
785                               struct obd_device *watched,
786                               void *data, int nonblock)
787 {
788         struct mds_lov_sync_info *mlsi;
789         int rc;
790
791         ENTRY;
792
793         LASSERT(watched);
794
795         OBD_ALLOC(mlsi, sizeof(*mlsi));
796         if (mlsi == NULL)
797                 RETURN(-ENOMEM);
798
799         mlsi->mlsi_obd = obd;
800         mlsi->mlsi_watched = watched;
801         if (data)
802                 mlsi->mlsi_index = *(__u32 *)data;
803         else
804                 mlsi->mlsi_index = MDSLOV_NO_INDEX;
805
806         /* Although class_export_get(obd->obd_self_export) would lock
807            the MDS in place, since it's only a self-export
808            it doesn't lock the LOV in place.  The LOV can be disconnected
809            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
810            Simply taking an export ref on the LOV doesn't help, because it's
811            still disconnected. Taking an obd reference insures that we don't
812            disconnect the LOV.  This of course means a cleanup won't
813            finish for as long as the sync is blocking. */
814         class_incref(obd);
815
816         if (nonblock) {
817                 /* Synchronize in the background */
818                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
819                                        CLONE_VM | CLONE_FILES);
820                 if (rc < 0) {
821                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
822                                obd->obd_name, rc);
823                         class_decref(obd);
824                 } else {
825                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
826                                "thread=%d\n", obd->obd_name,
827                                mlsi->mlsi_index, rc);
828                         rc = 0;
829                 }
830         } else {
831                 rc = __mds_lov_synchronize((void *)mlsi);
832         }
833
834         RETURN(rc);
835 }
836
837 int mds_notify(struct obd_device *obd, struct obd_device *watched,
838                enum obd_notify_event ev, void *data)
839 {
840         int rc = 0;
841         ENTRY;
842
843         switch (ev) {
844         /* We only handle these: */
845         case OBD_NOTIFY_ACTIVE:
846         case OBD_NOTIFY_SYNC:
847         case OBD_NOTIFY_SYNC_NONBLOCK:
848                 break;
849         case OBD_NOTIFY_CONFIG:
850                 mds_allow_cli(obd, (unsigned int)data);
851         default:
852                 RETURN(0);
853         }
854
855         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
856         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
857                 CERROR("unexpected notification of %s %s!\n",
858                        watched->obd_type->typ_name, watched->obd_name);
859                 RETURN(-EINVAL);
860         }
861
862         if (obd->obd_recovering) {
863                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
864                       obd->obd_name,
865                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
866                 /* We still have to fix the lov descriptor for ost's added
867                    after the mdt in the config log.  They didn't make it into
868                    mds_lov_connect. */
869                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
870                 if (rc)
871                         RETURN(rc);
872                 /* We should update init llog here too for replay unlink and 
873                  * possiable llog init race when recovery complete */
874                 mutex_down(&obd->obd_dev_sem);
875                 llog_cat_initialize(obd, NULL, 
876                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
877                                     &watched->u.cli.cl_target_uuid);
878                 mutex_up(&obd->obd_dev_sem);
879                 mds_allow_cli(obd, CONFIG_SYNC);
880                 RETURN(rc);
881         }
882
883         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
884         rc = mds_lov_start_synchronize(obd, watched, data,
885                                        !(ev == OBD_NOTIFY_SYNC));
886
887         lquota_recovery(mds_quota_interface_ref, obd);
888
889         RETURN(rc);
890 }
891
892 /* Convert the on-disk LOV EA structre.
893  * We always try to convert from an old LOV EA format to the common in-memory
894  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
895  * then convert back to the new on-disk format and save it back to disk
896  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
897  * to convert it each time this inode is accessed.
898  *
899  * This function is a bit interesting in the error handling.  We can safely
900  * ship the old lmm to the client in case of failure, since it uses the same
901  * obd_unpackmd() code and can do the conversion if the MDS fails for some
902  * reason.  We will not delete the old lmm data until we have written the
903  * new format lmm data in fsfilt_set_md(). */
904 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
905                        struct lov_mds_md *lmm, int lmm_size)
906 {
907         struct lov_stripe_md *lsm = NULL;
908         void *handle;
909         int rc, err;
910         ENTRY;
911
912         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
913             le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
914                 RETURN(0);
915
916         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
917                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
918                LOV_MAGIC);
919
920         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
921         if (rc < 0)
922                 GOTO(conv_end, rc);
923
924         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
925         if (rc < 0)
926                 GOTO(conv_free, rc);
927         lmm_size = rc;
928
929         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
930         if (IS_ERR(handle)) {
931                 rc = PTR_ERR(handle);
932                 GOTO(conv_free, rc);
933         }
934
935         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
936
937         err = fsfilt_commit(obd, inode, handle, 0);
938         if (!rc)
939                 rc = err ? err : lmm_size;
940         GOTO(conv_free, rc);
941 conv_free:
942         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
943 conv_end:
944         return rc;
945 }
946
947 void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
948                          struct lov_desc *desc)
949 {
950         int i;
951         for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
952                 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
953                         le64_to_cpu(lmm->lmm_objects[i].l_object_id);
954         }
955 }
956 EXPORT_SYMBOL(mds_objids_from_lmm);