Whamcloud - gitweb
b=2262
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
38 #include <obd_lov.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
41
42 #include "mds_internal.h"
43
44 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
45 {
46         struct mds_obd *mds = &obd->u.mds;
47         int i;
48         ENTRY;
49
50         lock_kernel();
51         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
52                 if (ids[i] > (mds->mds_lov_objids)[i]) {
53                         (mds->mds_lov_objids)[i] = ids[i];
54                         mds->mds_lov_objids_dirty = 1;
55                 }
56         unlock_kernel();
57         EXIT;
58 }
59 EXPORT_SYMBOL(mds_lov_update_objids);
60
61 static int mds_lov_read_objids(struct obd_device *obd)
62 {
63         struct mds_obd *mds = &obd->u.mds;
64         obd_id *ids;
65         loff_t off = 0;
66         int i, rc, size;
67         ENTRY;
68
69         LASSERT(!mds->mds_lov_objids_size);
70         LASSERT(!mds->mds_lov_objids_dirty);
71
72         /* Read everything in the file, even if our current lov desc
73            has fewer targets. Old targets not in the lov descriptor
74            during mds setup may still have valid objids. */
75         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
76         if (size == 0)
77                 RETURN(0);
78
79         OBD_ALLOC(ids, size);
80         if (ids == NULL)
81                 RETURN(-ENOMEM);
82         mds->mds_lov_objids = ids;
83         mds->mds_lov_objids_size = size;
84
85         rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
86         if (rc < 0) {
87                 CERROR("Error reading objids %d\n", rc);
88                 RETURN(rc);
89         }
90
91         mds->mds_lov_objids_in_file = size / sizeof(*ids);
92
93         for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
94                 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
95                        mds->mds_lov_objids[i], i);
96         }
97         RETURN(0);
98 }
99
100 int mds_lov_write_objids(struct obd_device *obd)
101 {
102         struct mds_obd *mds = &obd->u.mds;
103         loff_t off = 0;
104         int i, rc, tgts;
105         ENTRY;
106
107         if (!mds->mds_lov_objids_dirty)
108                 RETURN(0);
109
110         tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
111
112         if (!tgts)
113                 RETURN(0);
114
115         for (i = 0; i < tgts; i++)
116                 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
117                        mds->mds_lov_objids[i], i);
118
119         rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
120                                  mds->mds_lov_objids, tgts * sizeof(obd_id),
121                                  &off, 0);
122         if (rc >= 0) {
123                 mds->mds_lov_objids_dirty = 0;
124                 rc = 0;
125         }
126
127         RETURN(rc);
128 }
129 EXPORT_SYMBOL(mds_lov_write_objids);
130
131 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
132 {
133         int rc;
134         struct obdo oa;
135         struct obd_trans_info oti = {0};
136         struct lov_stripe_md  *empty_ea = NULL;
137         ENTRY;
138
139         LASSERT(mds->mds_lov_objids != NULL);
140
141         /* This create will in fact either create or destroy:  If the OST is
142          * missing objects below this ID, they will be created.  If it finds
143          * objects above this ID, they will be removed. */
144         memset(&oa, 0, sizeof(oa));
145         oa.o_flags = OBD_FL_DELORPHAN;
146         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
147         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
148         if (ost_uuid != NULL) {
149                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
150                 oa.o_valid |= OBD_MD_FLINLINE;
151         }
152         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
153
154         RETURN(rc);
155 }
156
157 /* update the LOV-OSC knowledge of the last used object id's */
158 int mds_lov_set_nextid(struct obd_device *obd)
159 {
160         struct mds_obd *mds = &obd->u.mds;
161         int rc;
162         ENTRY;
163
164         LASSERT(!obd->obd_recovering);
165
166         LASSERT(mds->mds_lov_objids != NULL);
167
168         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
169                                 KEY_NEXT_ID,
170                                 mds->mds_lov_desc.ld_tgt_count *
171                                 sizeof(*mds->mds_lov_objids),
172                                 mds->mds_lov_objids, NULL);
173
174         if (rc)
175                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
176                         obd->obd_name, rc);
177
178         RETURN(rc);
179 }
180
181 /* Update the lov desc for a new size lov. */
182 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
183 {
184         struct mds_obd *mds = &obd->u.mds;
185         struct lov_desc *ld;
186         __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
187         int rc = 0;
188         ENTRY;
189
190         OBD_ALLOC(ld, sizeof(*ld));
191         if (!ld)
192                 RETURN(-ENOMEM);
193
194         rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
195                           &valsize, ld);
196         if (rc)
197                 GOTO(out, rc);
198
199         /* The size of the LOV target table may have increased. */
200         size = ld->ld_tgt_count * sizeof(obd_id);
201         if ((mds->mds_lov_objids_size == 0) ||
202             (size > mds->mds_lov_objids_size)) {
203                 obd_id *ids;
204
205                 /* add room by powers of 2 */
206                 size = 1;
207                 while (size < ld->ld_tgt_count)
208                         size = size << 1;
209                 size = size * sizeof(obd_id);
210
211                 OBD_ALLOC(ids, size);
212                 if (ids == NULL)
213                         GOTO(out, rc = -ENOMEM);
214                 memset(ids, 0, size);
215                 if (mds->mds_lov_objids_size) {
216                         obd_id *old_ids = mds->mds_lov_objids;
217                         memcpy(ids, mds->mds_lov_objids,
218                                mds->mds_lov_objids_size);
219                         mds->mds_lov_objids = ids;
220                         OBD_FREE(old_ids, mds->mds_lov_objids_size);
221                 }
222                 mds->mds_lov_objids = ids;
223                 mds->mds_lov_objids_size = size;
224         }
225
226         /* Don't change the mds_lov_desc until the objids size matches the
227            count (paranoia) */
228         mds->mds_lov_desc = *ld;
229         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
230                mds->mds_lov_desc.ld_tgt_count);
231
232         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
233                         max(mds->mds_lov_desc.ld_tgt_count,
234                             mds->mds_lov_objids_in_file));
235         mds->mds_max_mdsize = lov_mds_md_size(stripes);
236         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
237         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
238                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
239                stripes);
240
241         /* If we added a target we have to reconnect the llogs */
242         /* We only _need_ to do this at first add (idx), or the first time
243            after recovery.  However, it should now be safe to call anytime. */
244         mutex_down(&obd->obd_dev_sem);
245         llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
246         mutex_up(&obd->obd_dev_sem);
247
248         /*XXX this notifies the MDD until lov handling use old mds code */
249         if (obd->obd_upcall.onu_owner) {
250                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
251                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
252                                                  obd->obd_upcall.onu_owner);
253         }
254 out:
255         OBD_FREE(ld, sizeof(*ld));
256         RETURN(rc);
257 }
258
259
260 #define MDSLOV_NO_INDEX -1
261
262 /* Inform MDS about new/updated target */
263 static int mds_lov_update_mds(struct obd_device *obd,
264                               struct obd_device *watched,
265                               __u32 idx, struct obd_uuid *uuid)
266 {
267         struct mds_obd *mds = &obd->u.mds;
268         int old_count;
269         int rc = 0;
270         ENTRY;
271
272         old_count = mds->mds_lov_desc.ld_tgt_count;
273         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
274         if (rc)
275                 RETURN(rc);
276
277         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
278                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
279                mds->mds_lov_desc.ld_tgt_count);
280
281         /* idx is set as data from lov_notify. */
282         if (idx != MDSLOV_NO_INDEX && !obd->obd_recovering) {
283                 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
284                         CERROR("index %d > count %d!\n", idx,
285                                mds->mds_lov_desc.ld_tgt_count);
286                         RETURN(-EINVAL);
287                 }
288
289                 if (idx >= mds->mds_lov_objids_in_file) {
290                         /* We never read this lastid; ask the osc */
291                         obd_id lastid;
292                         __u32 size = sizeof(lastid);
293                         rc = obd_get_info(watched->obd_self_export,
294                                           strlen("last_id"),
295                                           "last_id", &size, &lastid);
296                         if (rc)
297                                 RETURN(rc);
298                         mds->mds_lov_objids[idx] = lastid;
299                         mds->mds_lov_objids_dirty = 1;
300                         mds_lov_write_objids(obd);
301                 } else {
302                         /* We have read this lastid from disk; tell the osc.
303                            Don't call this during recovery. */
304                         rc = mds_lov_set_nextid(obd);
305                 }
306
307                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
308                       mds->mds_lov_objids[idx], idx);
309         }
310
311         RETURN(rc);
312 }
313
314 /* update the LOV-OSC knowledge of the last used object id's */
315 int mds_lov_connect(struct obd_device *obd, char * lov_name)
316 {
317         struct mds_obd *mds = &obd->u.mds;
318         struct lustre_handle conn = {0,};
319         struct obd_connect_data *data;
320         int rc, i;
321         ENTRY;
322
323         if (IS_ERR(mds->mds_osc_obd))
324                 RETURN(PTR_ERR(mds->mds_osc_obd));
325
326         if (mds->mds_osc_obd)
327                 RETURN(0);
328
329         mds->mds_osc_obd = class_name2obd(lov_name);
330         if (!mds->mds_osc_obd) {
331                 CERROR("MDS cannot locate LOV %s\n", lov_name);
332                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
333                 RETURN(-ENOTCONN);
334         }
335
336         OBD_ALLOC(data, sizeof(*data));
337         if (data == NULL)
338                 RETURN(-ENOMEM);
339         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
340                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
341                                   OBD_CONNECT_OSS_CAPA;
342 #ifdef HAVE_LRU_RESIZE_SUPPORT
343         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
344 #endif
345         data->ocd_version = LUSTRE_VERSION_CODE;
346         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
347         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
348         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
349         OBD_FREE(data, sizeof(*data));
350         if (rc) {
351                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
352                 mds->mds_osc_obd = ERR_PTR(rc);
353                 RETURN(rc);
354         }
355         mds->mds_osc_exp = class_conn2export(&conn);
356
357         rc = obd_register_observer(mds->mds_osc_obd, obd);
358         if (rc) {
359                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
360                        lov_name, rc);
361                 GOTO(err_discon, rc);
362         }
363
364         /* Deny new client connections until we are sure we have some OSTs */
365         obd->obd_no_conn = 1;
366
367         rc = mds_lov_read_objids(obd);
368         if (rc) {
369                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
370                 GOTO(err_reg, rc);
371         }
372
373         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
374         if (rc)
375                 GOTO(err_reg, rc);
376
377         /* tgt_count may be 0! */
378         rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
379         if (rc) {
380                 CERROR("failed to initialize catalog %d\n", rc);
381                 GOTO(err_reg, rc);
382         }
383
384         /* If we're mounting this code for the first time on an existing FS,
385          * we need to populate the objids array from the real OST values */
386         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
387                 int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
388                 rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
389                                   "last_id", &size, mds->mds_lov_objids);
390                 if (!rc) {
391                         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
392                                 CWARN("got last object "LPU64" from OST %d\n",
393                                       mds->mds_lov_objids[i], i);
394                         mds->mds_lov_objids_dirty = 1;
395                         rc = mds_lov_write_objids(obd);
396                         if (rc)
397                                 CERROR("got last objids from OSTs, but error "
398                                        "writing objids file: %d\n", rc);
399                 }
400         }
401
402         /* I want to see a callback happen when the OBD moves to a
403          * "For General Use" state, and that's when we'll call
404          * set_nextid().  The class driver can help us here, because
405          * it can use the obd_recovering flag to determine when the
406          * the OBD is full available. */
407         /* MDD device will care about that
408         if (!obd->obd_recovering)
409                 rc = mds_postrecov(obd);
410          */
411         RETURN(rc);
412
413 err_reg:
414         obd_register_observer(mds->mds_osc_obd, NULL);
415 err_discon:
416         obd_disconnect(mds->mds_osc_exp);
417         mds->mds_osc_exp = NULL;
418         mds->mds_osc_obd = ERR_PTR(rc);
419         RETURN(rc);
420 }
421
422 int mds_lov_disconnect(struct obd_device *obd)
423 {
424         struct mds_obd *mds = &obd->u.mds;
425         int rc = 0;
426         ENTRY;
427
428         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
429                 obd_register_observer(mds->mds_osc_obd, NULL);
430
431                 /* The actual disconnect of the mds_lov will be called from
432                  * class_disconnect_exports from mds_lov_clean. So we have to
433                  * ensure that class_cleanup doesn't fail due to the extra ref
434                  * we're holding now. The mechanism to do that already exists -
435                  * the obd_force flag. We'll drop the final ref to the
436                  * mds_osc_exp in mds_cleanup. */
437                 mds->mds_osc_obd->obd_force = 1;
438         }
439
440         RETURN(rc);
441 }
442
443 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
444                   void *karg, void *uarg)
445 {
446         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
447         struct obd_device *obd = exp->exp_obd;
448         struct mds_obd *mds = &obd->u.mds;
449         struct obd_ioctl_data *data = karg;
450         struct lvfs_run_ctxt saved;
451         int rc = 0;
452
453         ENTRY;
454         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
455
456         switch (cmd) {
457         case OBD_IOC_RECORD: {
458                 char *name = data->ioc_inlbuf1;
459                 if (mds->mds_cfg_llh)
460                         RETURN(-EBUSY);
461
462                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
463                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
464                                  &mds->mds_cfg_llh, NULL, name);
465                 if (rc == 0)
466                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
467                                          &cfg_uuid);
468                 else
469                         mds->mds_cfg_llh = NULL;
470                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
471
472                 RETURN(rc);
473         }
474
475         case OBD_IOC_ENDRECORD: {
476                 if (!mds->mds_cfg_llh)
477                         RETURN(-EBADF);
478
479                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
480                 rc = llog_close(mds->mds_cfg_llh);
481                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
482
483                 mds->mds_cfg_llh = NULL;
484                 RETURN(rc);
485         }
486
487         case OBD_IOC_CLEAR_LOG: {
488                 char *name = data->ioc_inlbuf1;
489                 if (mds->mds_cfg_llh)
490                         RETURN(-EBUSY);
491
492                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
493                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
494                                  &mds->mds_cfg_llh, NULL, name);
495                 if (rc == 0) {
496                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
497                                          NULL);
498
499                         rc = llog_destroy(mds->mds_cfg_llh);
500                         llog_free_handle(mds->mds_cfg_llh);
501                 }
502                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
503
504                 mds->mds_cfg_llh = NULL;
505                 RETURN(rc);
506         }
507
508         case OBD_IOC_DORECORD: {
509                 char *cfg_buf;
510                 struct llog_rec_hdr rec;
511                 if (!mds->mds_cfg_llh)
512                         RETURN(-EBADF);
513
514                 rec.lrh_len = llog_data_len(data->ioc_plen1);
515
516                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
517                         rec.lrh_type = OBD_CFG_REC;
518                 } else {
519                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
520                         RETURN(-EINVAL);
521                 }
522
523                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
524                 if (cfg_buf == NULL)
525                         RETURN(-EINVAL);
526                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
527                 if (rc) {
528                         OBD_FREE(cfg_buf, data->ioc_plen1);
529                         RETURN(rc);
530                 }
531
532                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
533                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
534                                     cfg_buf, -1);
535                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
536
537                 OBD_FREE(cfg_buf, data->ioc_plen1);
538                 RETURN(rc);
539         }
540
541         case OBD_IOC_PARSE: {
542                 struct llog_ctxt *ctxt =
543                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
544                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
545                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
546                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
547                 if (rc)
548                         RETURN(rc);
549
550                 RETURN(rc);
551         }
552
553         case OBD_IOC_DUMP_LOG: {
554                 struct llog_ctxt *ctxt =
555                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
556                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
557                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
558                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
559                 if (rc)
560                         RETURN(rc);
561
562                 RETURN(rc);
563         }
564
565         case OBD_IOC_SYNC: {
566                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
567                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
568                 RETURN(rc);
569         }
570
571         case OBD_IOC_SET_READONLY: {
572                 void *handle;
573                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
574                 BDEVNAME_DECLARE_STORAGE(tmp);
575                 CERROR("*** setting device %s read-only ***\n",
576                        ll_bdevname(obd->u.obt.obt_sb, tmp));
577
578                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
579                 if (!IS_ERR(handle))
580                         rc = fsfilt_commit(obd, inode, handle, 1);
581
582                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
583                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
584
585                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
586                 RETURN(0);
587         }
588
589         case OBD_IOC_CATLOGLIST: {
590                 int count = mds->mds_lov_desc.ld_tgt_count;
591                 rc = llog_catalog_list(obd, count, data);
592                 RETURN(rc);
593
594         }
595         case OBD_IOC_LLOG_CHECK:
596         case OBD_IOC_LLOG_CANCEL:
597         case OBD_IOC_LLOG_REMOVE: {
598                 struct llog_ctxt *ctxt =
599                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
600                 int rc2;
601                 __u32 group;
602
603                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
604                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
605                 rc = llog_ioctl(ctxt, cmd, data);
606                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
607                 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
608                 group = FILTER_GROUP_MDS0 + mds->mds_id;
609                 rc2 = obd_set_info_async(mds->mds_osc_exp,
610                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
611                                          sizeof(group), &group, NULL);
612                 if (!rc)
613                         rc = rc2;
614                 RETURN(rc);
615         }
616         case OBD_IOC_LLOG_INFO:
617         case OBD_IOC_LLOG_PRINT: {
618                 struct llog_ctxt *ctxt =
619                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
620
621                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
622                 rc = llog_ioctl(ctxt, cmd, data);
623                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
624
625                 RETURN(rc);
626         }
627
628         case OBD_IOC_ABORT_RECOVERY:
629                 CERROR("aborting recovery for device %s\n", obd->obd_name);
630                 target_stop_recovery_thread(obd);
631                 RETURN(0);
632
633         default:
634                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
635                 RETURN(-EINVAL);
636         }
637         RETURN(0);
638
639 }
640
641 struct mds_lov_sync_info {
642         struct obd_device *mlsi_obd;     /* the lov device to sync */
643         struct obd_device *mlsi_watched; /* target osc */
644         __u32              mlsi_index;   /* index of target */
645 };
646
647 static int mds_propagate_capa_keys(struct mds_obd *mds)
648 {
649         struct lustre_capa_key *key;
650         int i, rc = 0;
651
652         ENTRY;
653
654         if (!mds->mds_capa_keys)
655                 RETURN(0);
656
657         for (i = 0; i < 2; i++) {
658                 key = &mds->mds_capa_keys[i];
659                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
660
661                 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
662                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
663                 if (rc) {
664                         DEBUG_CAPA_KEY(D_ERROR, key,
665                                        "propagate failed (rc = %d) for", rc);
666                         RETURN(rc);
667                 }
668         }
669
670         RETURN(0);
671 }
672
673 /* We only sync one osc at a time, so that we don't have to hold
674    any kind of lock on the whole mds_lov_desc, which may change
675    (grow) as a result of mds_lov_add_ost.  This also avoids any
676    kind of mismatch between the lov_desc and the mds_lov_desc,
677    which are not in lock-step during lov_add_obd */
678 static int __mds_lov_synchronize(void *data)
679 {
680         struct mds_lov_sync_info *mlsi = data;
681         struct obd_device *obd = mlsi->mlsi_obd;
682         struct obd_device *watched = mlsi->mlsi_watched;
683         struct mds_obd *mds = &obd->u.mds;
684         struct obd_uuid *uuid;
685         __u32  idx = mlsi->mlsi_index;
686         struct mds_group_info mgi;
687         int rc = 0;
688         ENTRY;
689
690         OBD_FREE(mlsi, sizeof(*mlsi));
691
692         LASSERT(obd);
693         LASSERT(watched);
694         uuid = &watched->u.cli.cl_target_uuid;
695         LASSERT(uuid);
696
697         rc = mds_lov_update_mds(obd, watched, idx, uuid);
698         if (rc != 0)
699                 GOTO(out, rc);
700         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
701         mgi.uuid = uuid;
702         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
703                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
704         if (rc != 0)
705                 GOTO(out, rc);
706
707         /* propagate capability keys */
708         rc = mds_propagate_capa_keys(mds);
709         if (rc)
710                 GOTO(out, rc);
711
712         rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
713                           mds->mds_lov_desc.ld_tgt_count,
714                           NULL, NULL, uuid);
715
716         if (rc != 0) {
717                 CERROR("%s: failed at llog_origin_connect: %d\n",
718                        obd->obd_name, rc);
719                 GOTO(out, rc);
720         }
721
722         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
723               obd->obd_name, obd_uuid2str(uuid));
724         /*
725          * FIXME: this obd_stopping was useless, 
726          * since obd in mdt layer was set
727          */
728         if (obd->obd_stopping)
729                 GOTO(out, rc = -ENODEV);
730
731         rc = mds_lov_clear_orphans(mds, uuid);
732         if (rc != 0) {
733                 CERROR("%s: failed at mds_lov_clear_orphans: %d\n",
734                        obd->obd_name, rc);
735                 GOTO(out, rc);
736         }
737         
738         if (obd->obd_upcall.onu_owner) {
739                 /*
740                  * This is a hack for mds_notify->mdd_notify. When the mds obd
741                  * in mdd is removed, This hack should be removed.
742                  */
743                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
744                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
745                                                  obd->obd_upcall.onu_owner);
746         }
747         EXIT;
748 out:
749         class_decref(obd);
750         return rc;
751 }
752
753 int mds_lov_synchronize(void *data)
754 {
755         struct mds_lov_sync_info *mlsi = data;
756         char name[20];
757
758         if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
759                 /* There is still a watched target,
760                 but we don't know its index */
761                 sprintf(name, "ll_sync_tgt");
762         else
763                 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
764         ptlrpc_daemonize(name);
765
766         RETURN(__mds_lov_synchronize(data));
767 }
768
769 int mds_lov_start_synchronize(struct obd_device *obd,
770                               struct obd_device *watched,
771                               void *data, int nonblock)
772 {
773         struct mds_lov_sync_info *mlsi;
774         int rc;
775
776         ENTRY;
777
778         LASSERT(watched);
779
780         OBD_ALLOC(mlsi, sizeof(*mlsi));
781         if (mlsi == NULL)
782                 RETURN(-ENOMEM);
783
784         mlsi->mlsi_obd = obd;
785         mlsi->mlsi_watched = watched;
786         if (data)
787                 mlsi->mlsi_index = *(__u32 *)data;
788         else
789                 mlsi->mlsi_index = MDSLOV_NO_INDEX;
790
791         /* Although class_export_get(obd->obd_self_export) would lock
792            the MDS in place, since it's only a self-export
793            it doesn't lock the LOV in place.  The LOV can be disconnected
794            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
795            Simply taking an export ref on the LOV doesn't help, because it's
796            still disconnected. Taking an obd reference insures that we don't
797            disconnect the LOV.  This of course means a cleanup won't
798            finish for as long as the sync is blocking. */
799         class_incref(obd);
800
801         if (nonblock) {
802                 /* Synchronize in the background */
803                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
804                                        CLONE_VM | CLONE_FILES);
805                 if (rc < 0) {
806                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
807                                obd->obd_name, rc);
808                         class_decref(obd);
809                 } else {
810                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
811                                "thread=%d\n", obd->obd_name,
812                                mlsi->mlsi_index, rc);
813                         rc = 0;
814                 }
815         } else {
816                 rc = __mds_lov_synchronize((void *)mlsi);
817         }
818
819         RETURN(rc);
820 }
821
822 int mds_notify(struct obd_device *obd, struct obd_device *watched,
823                enum obd_notify_event ev, void *data)
824 {
825         int rc = 0;
826         ENTRY;
827
828         switch (ev) {
829         /* We only handle these: */
830         case OBD_NOTIFY_ACTIVE:
831         case OBD_NOTIFY_SYNC:
832         case OBD_NOTIFY_SYNC_NONBLOCK:
833                 break;
834         case OBD_NOTIFY_CONFIG:
835                 /* Open for clients */
836                 obd->obd_no_conn = 0;
837         default:
838                 RETURN(0);
839         }
840
841         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
842         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
843                 CERROR("unexpected notification of %s %s!\n",
844                        watched->obd_type->typ_name, watched->obd_name);
845                 RETURN(-EINVAL);
846         }
847
848         if (obd->obd_recovering) {
849                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
850                       obd->obd_name,
851                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
852                 /* We still have to fix the lov descriptor for ost's added
853                    after the mdt in the config log.  They didn't make it into
854                    mds_lov_connect. */
855                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
856                 if (rc)
857                         RETURN(rc);
858                 /* We should update init llog here too for replay unlink and 
859                  * possiable llog init race when recovery complete */
860                 mutex_down(&obd->obd_dev_sem);
861                 llog_cat_initialize(obd, NULL, 
862                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
863                                     &watched->u.cli.cl_target_uuid);
864                 mutex_up(&obd->obd_dev_sem);
865                 RETURN(rc);
866         }
867
868         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
869         rc = mds_lov_start_synchronize(obd, watched, data,
870                                        !(ev == OBD_NOTIFY_SYNC));
871
872         lquota_recovery(mds_quota_interface_ref, obd);
873
874         RETURN(rc);
875 }
876
877 /* Convert the on-disk LOV EA structre.
878  * We always try to convert from an old LOV EA format to the common in-memory
879  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
880  * then convert back to the new on-disk format and save it back to disk
881  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
882  * to convert it each time this inode is accessed.
883  *
884  * This function is a bit interesting in the error handling.  We can safely
885  * ship the old lmm to the client in case of failure, since it uses the same
886  * obd_unpackmd() code and can do the conversion if the MDS fails for some
887  * reason.  We will not delete the old lmm data until we have written the
888  * new format lmm data in fsfilt_set_md(). */
889 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
890                        struct lov_mds_md *lmm, int lmm_size)
891 {
892         struct lov_stripe_md *lsm = NULL;
893         void *handle;
894         int rc, err;
895         ENTRY;
896
897         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
898             le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
899                 RETURN(0);
900
901         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
902                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
903                LOV_MAGIC);
904
905         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
906         if (rc < 0)
907                 GOTO(conv_end, rc);
908
909         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
910         if (rc < 0)
911                 GOTO(conv_free, rc);
912         lmm_size = rc;
913
914         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
915         if (IS_ERR(handle)) {
916                 rc = PTR_ERR(handle);
917                 GOTO(conv_free, rc);
918         }
919
920         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
921
922         err = fsfilt_commit(obd, inode, handle, 0);
923         if (!rc)
924                 rc = err ? err : lmm_size;
925         GOTO(conv_free, rc);
926 conv_free:
927         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
928 conv_end:
929         return rc;
930 }
931
932 void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
933                          struct lov_desc *desc)
934 {
935         int i;
936         for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
937                 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
938                         le64_to_cpu(lmm->lmm_objects[i].l_object_id);
939         }
940 }
941 EXPORT_SYMBOL(mds_objids_from_lmm);