Whamcloud - gitweb
- lost #define changes from 1_6
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
38 #include <obd_lov.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
41
42 #include "mds_internal.h"
43
44 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
45 {
46         struct mds_obd *mds = &obd->u.mds;
47         int i;
48         int dirty = 0;
49         ENTRY;
50
51         down_read(&mds->mds_lov_objids_sem);
52
53         spin_lock(&mds->mds_lov_objids_lock);
54         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++) {
55                 if (ids[i] > (mds->mds_lov_objids)[i]) {
56                         (mds->mds_lov_objids)[i] = ids[i];
57                         dirty = 1;
58                 }
59         }
60         mds->mds_lov_objids_dirty = dirty;
61
62         spin_unlock(&mds->mds_lov_objids_lock);
63
64         up_read(&mds->mds_lov_objids_sem);
65
66         EXIT;
67 }
68 EXPORT_SYMBOL(mds_lov_update_objids);
69
70 static int mds_lov_read_objids(struct obd_device *obd)
71 {
72         struct mds_obd *mds = &obd->u.mds;
73         obd_id *ids;
74         loff_t off = 0;
75         int i, rc, size;
76         ENTRY;
77
78         LASSERT(!mds->mds_lov_objids_count);
79         LASSERT(!mds->mds_lov_objids_dirty);
80
81         /* Read everything in the file, even if our current lov desc
82            has fewer targets. Old targets not in the lov descriptor
83            during mds setup may still have valid objids. */
84         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
85         if (size == 0)
86                 GOTO(out, rc = 0);
87
88         OBD_ALLOC(ids, size);
89         if (ids == NULL)
90                 GOTO(out, rc = -ENOMEM);
91
92         rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
93         if (rc < 0) {
94                 OBD_FREE(ids, size);
95                 CERROR("Error reading objids %d\n", rc);
96                 GOTO(out, rc);
97         }
98         mds->mds_lov_objids = ids;
99         mds->mds_lov_objids_count = size / sizeof(*ids);
100
101         for (i = 0; i < mds->mds_lov_objids_count; i++) {
102                 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
103                        mds->mds_lov_objids[i], i);
104         }
105
106 out:
107         RETURN(rc);
108 }
109 /* must held mds_lov_objids_sem */
110 int mds_lov_write_objids(struct obd_device *obd)
111 {
112         struct mds_obd *mds = &obd->u.mds;
113         loff_t off = 0;
114         int i, rc, tgts;
115         ENTRY;
116         
117         spin_lock(&mds->mds_lov_objids_lock);
118         if (!mds->mds_lov_objids_dirty) {
119                 spin_unlock(&mds->mds_lov_objids_lock);
120                 RETURN(0);
121         }
122         mds->mds_lov_objids_dirty = 0;
123         spin_unlock(&mds->mds_lov_objids_lock);
124
125         tgts = mds->mds_lov_objids_count;
126         if (!tgts)
127                 RETURN(0);
128
129         for (i = 0; i < tgts; i++)
130                 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
131                        mds->mds_lov_objids[i], i);
132
133         rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
134                                  mds->mds_lov_objids, tgts * sizeof(obd_id),
135                                  &off, 0);
136         if (rc >= 0) {
137                 rc = 0;
138                 cfs_waitq_signal(&mds->mds_lov_objids_wait);
139         } else {
140                 mds->mds_lov_objids_dirty = 1;
141         }
142
143         RETURN(rc);
144 }
145 EXPORT_SYMBOL(mds_lov_write_objids);
146
147 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
148 {
149         int rc;
150         struct obdo oa;
151         struct obd_trans_info oti = {0};
152         struct lov_stripe_md  *empty_ea = NULL;
153         ENTRY;
154
155         LASSERT(mds->mds_lov_objids != NULL);
156
157         /* This create will in fact either create or destroy:  If the OST is
158          * missing objects below this ID, they will be created.  If it finds
159          * objects above this ID, they will be removed. */
160         memset(&oa, 0, sizeof(oa));
161         oa.o_flags = OBD_FL_DELORPHAN;
162         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
163         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
164         if (ost_uuid != NULL) {
165                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
166                 oa.o_valid |= OBD_MD_FLINLINE;
167         }
168         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
169
170         RETURN(rc);
171 }
172
173 /* update the LOV-OSC knowledge of the last used object id's */
174 int mds_lov_set_nextid(struct obd_device *obd)
175 {
176         struct mds_obd *mds = &obd->u.mds;
177         int rc;
178         ENTRY;
179
180         LASSERT(!obd->obd_recovering);
181         if (mds->mds_lov_objids_count == 0)
182                 RETURN(0);
183
184         LASSERT(mds->mds_lov_objids != NULL);
185
186        /* mds->mds_lov_objids_sem must be held so mds_lov_objids doesn't change 
187         * we need use mds->mds_lov_desc.ld_tgt_count because in recovery not all
188         * target can be connected at start time */
189         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
190                                 KEY_NEXT_ID,
191                                 mds->mds_lov_desc.ld_tgt_count *
192                                 sizeof(*mds->mds_lov_objids),
193                                 mds->mds_lov_objids, NULL);
194
195         if (rc)
196                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
197                         obd->obd_name, rc);
198
199         RETURN(rc);
200 }
201
202 /* Update the lov desc for a new size lov. */
203 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
204 {
205         struct mds_obd *mds = &obd->u.mds;
206         struct lov_desc *ld;
207         __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
208         int rc = 0;
209         ENTRY;
210
211         OBD_ALLOC(ld, sizeof(*ld));
212         if (!ld)
213                 RETURN(-ENOMEM);
214
215         rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
216                           &valsize, ld);
217         if (rc)
218                 GOTO(out, rc);
219
220         /* The size of the LOV target table may have increased. */
221         if (ld->ld_tgt_count > mds->mds_lov_objids_count) {
222                 obd_id *ids;
223
224                 size = ld->ld_tgt_count * sizeof(obd_id);
225
226                 OBD_ALLOC(ids, size);
227                 if (ids == NULL)
228                         GOTO(out, rc = -ENOMEM);
229                 memset(ids, 0, size);
230
231                 /* write lock is enough for protect from access to
232                  * old pointer in mds_lov_update_objids and dirty == 0
233                  * is enough for protect from access in write_objids */
234                 if (mds->mds_lov_objids) {
235                         struct l_wait_info lwi = { 0 };
236                         obd_id *old_ids = mds->mds_lov_objids;
237
238                         memcpy(ids, mds->mds_lov_objids,
239                                mds_lov_objids_size(mds));
240
241                         l_wait_event(mds->mds_lov_objids_wait,
242                                      mds->mds_lov_objids_dirty == 0, &lwi);
243
244                         OBD_FREE(old_ids, mds_lov_objids_size(mds));
245                 }
246                 mds->mds_lov_objids = ids;
247                 mds->mds_lov_objids_count = ld->ld_tgt_count;
248         }
249
250         /* Don't change the mds_lov_desc until the objids size matches the
251            count (paranoia) */
252         mds->mds_lov_desc = *ld;
253         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
254                mds->mds_lov_desc.ld_tgt_count);
255
256         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
257                         max(mds->mds_lov_desc.ld_tgt_count,
258                             mds->mds_lov_objids_count));
259         mds->mds_max_mdsize = lov_mds_md_size(stripes);
260         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
261         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
262                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
263                stripes);
264
265         /* If we added a target we have to reconnect the llogs */
266         /* We only _need_ to do this at first add (idx), or the first time
267            after recovery.  However, it should now be safe to call anytime. */
268         mutex_down(&obd->obd_dev_sem);
269         rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
270         mutex_up(&obd->obd_dev_sem);
271
272         /*XXX this notifies the MDD until lov handling use old mds code */
273         if (obd->obd_upcall.onu_owner) {
274                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
275                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
276                                                  obd->obd_upcall.onu_owner);
277         }
278 out:
279         OBD_FREE(ld, sizeof(*ld));
280         RETURN(rc);
281 }
282
283
284 #define MDSLOV_NO_INDEX -1
285
286 /* Inform MDS about new/updated target */
287 static int mds_lov_update_mds(struct obd_device *obd,
288                               struct obd_device *watched,
289                               __u32 idx, struct obd_uuid *uuid)
290 {
291         struct mds_obd *mds = &obd->u.mds;
292         int old_count;
293         int rc = 0;
294         obd_id lastid;
295         __u32 size = sizeof(lastid);
296
297         ENTRY;
298
299         old_count = mds->mds_lov_desc.ld_tgt_count;
300         down_write(&mds->mds_lov_objids_sem);
301         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
302         downgrade_write(&mds->mds_lov_objids_sem);
303         if (rc)
304                 GOTO(out, rc);
305
306         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
307                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
308                mds->mds_lov_desc.ld_tgt_count);
309
310         /* idx is set as data from lov_notify. */
311         if (idx == MDSLOV_NO_INDEX || obd->obd_recovering)
312                 GOTO(out, rc);
313
314         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
315                 CERROR("index %d > count %d!\n", idx,
316                        mds->mds_lov_desc.ld_tgt_count);
317                  GOTO(out, rc = -EINVAL);
318         }
319         rc = obd_get_info(watched->obd_self_export, sizeof("last_id"),
320                           "last_id", &size, &lastid);
321         if (rc)
322                 GOTO(out, rc);
323
324         spin_lock(&mds->mds_lov_objids_lock);
325         if (mds->mds_lov_objids[idx] == 0 || mds->mds_lov_objids[idx] > lastid) {
326                 /* last id not init or corrupted - use data from osc */
327                 mds->mds_lov_objids[idx] = lastid;
328                 mds->mds_lov_objids_dirty = 1;
329                 spin_unlock(&mds->mds_lov_objids_lock);
330                 /* not need write immediately, mark for write for avoid
331                  * lock inverse */
332         } else {
333                 spin_unlock(&mds->mds_lov_objids_lock);
334                 /* We have read this lastid from disk; tell the osc.
335                    Don't call this during recovery. */
336                 rc = mds_lov_set_nextid(obd);
337         }
338
339         CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
340                mds->mds_lov_objids[idx], idx);
341         
342 out:
343         up_read(&mds->mds_lov_objids_sem);
344         RETURN(rc);
345 }
346
347 int mds_update_objids_from_lastid(struct obd_device *obd)
348 {
349         struct mds_obd *mds = &obd->u.mds;
350         int size;
351         int rc, i;
352
353         if (mds->mds_lov_objids_count < mds->mds_lov_desc.ld_tgt_count) {
354                 obd_id *ids;
355
356                 size = mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id);
357                 OBD_ALLOC(ids, size);
358                 if (ids == NULL)
359                         GOTO(out, rc = -ENOMEM);
360
361                 OBD_FREE(mds->mds_lov_objids, mds_lov_objids_size(mds));
362                 mds->mds_lov_objids = ids;
363                 mds->mds_lov_objids_count = size / sizeof(obd_id);
364         }
365
366         size = mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id);
367         rc = obd_get_info(mds->mds_osc_exp, sizeof("last_id"),
368                           "last_id", &size, mds->mds_lov_objids);
369         if (!rc) {
370                 for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
371                         CWARN("got last object "LPU64" from OST %d\n",
372                               mds->mds_lov_objids[i], i);
373                 mds->mds_lov_objids_dirty = 1;
374                 rc = mds_lov_write_objids(obd);
375                 if (rc)
376                         CERROR("got last objids from OSTs, but error "
377                                "writing objids file: %d\n", rc);
378         }
379 out:
380         RETURN(rc);
381 }
382
383
384 /* update the LOV-OSC knowledge of the last used object id's */
385 int mds_lov_connect(struct obd_device *obd, char * lov_name)
386 {
387         struct mds_obd *mds = &obd->u.mds;
388         struct lustre_handle conn = {0,};
389         struct obd_connect_data *data;
390         int rc;
391         ENTRY;
392
393         if (IS_ERR(mds->mds_osc_obd))
394                 RETURN(PTR_ERR(mds->mds_osc_obd));
395
396         if (mds->mds_osc_obd)
397                 RETURN(0);
398
399         mds->mds_osc_obd = class_name2obd(lov_name);
400         if (!mds->mds_osc_obd) {
401                 CERROR("MDS cannot locate LOV %s\n", lov_name);
402                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
403                 RETURN(-ENOTCONN);
404         }
405
406         OBD_ALLOC(data, sizeof(*data));
407         if (data == NULL)
408                 RETURN(-ENOMEM);
409         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
410                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
411                                   OBD_CONNECT_OSS_CAPA;
412 #ifdef HAVE_LRU_RESIZE_SUPPORT
413         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
414 #endif
415         data->ocd_version = LUSTRE_VERSION_CODE;
416         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
417         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
418         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
419         OBD_FREE(data, sizeof(*data));
420         if (rc) {
421                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
422                 mds->mds_osc_obd = ERR_PTR(rc);
423                 RETURN(rc);
424         }
425         mds->mds_osc_exp = class_conn2export(&conn);
426
427         rc = obd_register_observer(mds->mds_osc_obd, obd);
428         if (rc) {
429                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
430                        lov_name, rc);
431                 GOTO(err_discon, rc);
432         }
433
434         /* Deny new client connections until we are sure we have some OSTs */
435         obd->obd_no_conn = 1;
436
437         down_write(&mds->mds_lov_objids_sem);
438         rc = mds_lov_read_objids(obd);
439         if (rc) {
440                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
441                 GOTO(err_reg, rc);
442         }
443
444         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
445         if (rc)
446                 GOTO(err_reg, rc);
447
448         /* If we're mounting this code for the first time on an existing FS,
449          * we need to populate the objids array from the real OST values */
450         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_count) {
451                 mds_update_objids_from_lastid(obd);
452         }
453         up_write(&mds->mds_lov_objids_sem);
454
455         /* I want to see a callback happen when the OBD moves to a
456          * "For General Use" state, and that's when we'll call
457          * set_nextid().  The class driver can help us here, because
458          * it can use the obd_recovering flag to determine when the
459          * the OBD is full available. */
460         /* MDD device will care about that
461         if (!obd->obd_recovering)
462                 rc = mds_postrecov(obd);
463          */
464         RETURN(rc);
465
466 err_reg:
467         up_write(&mds->mds_lov_objids_sem);
468         obd_register_observer(mds->mds_osc_obd, NULL);
469 err_discon:
470         obd_disconnect(mds->mds_osc_exp);
471         mds->mds_osc_exp = NULL;
472         mds->mds_osc_obd = ERR_PTR(rc);
473         RETURN(rc);
474 }
475
476 int mds_lov_disconnect(struct obd_device *obd)
477 {
478         struct mds_obd *mds = &obd->u.mds;
479         int rc = 0;
480         ENTRY;
481
482         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
483                 obd_register_observer(mds->mds_osc_obd, NULL);
484
485                 /* The actual disconnect of the mds_lov will be called from
486                  * class_disconnect_exports from mds_lov_clean. So we have to
487                  * ensure that class_cleanup doesn't fail due to the extra ref
488                  * we're holding now. The mechanism to do that already exists -
489                  * the obd_force flag. We'll drop the final ref to the
490                  * mds_osc_exp in mds_cleanup. */
491                 mds->mds_osc_obd->obd_force = 1;
492         }
493
494         RETURN(rc);
495 }
496
497 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
498                   void *karg, void *uarg)
499 {
500         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
501         struct obd_device *obd = exp->exp_obd;
502         struct mds_obd *mds = &obd->u.mds;
503         struct obd_ioctl_data *data = karg;
504         struct lvfs_run_ctxt saved;
505         int rc = 0;
506
507         ENTRY;
508         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
509
510         switch (cmd) {
511         case OBD_IOC_RECORD: {
512                 char *name = data->ioc_inlbuf1;
513                 if (mds->mds_cfg_llh)
514                         RETURN(-EBUSY);
515
516                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
517                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
518                                  &mds->mds_cfg_llh, NULL, name);
519                 if (rc == 0)
520                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
521                                          &cfg_uuid);
522                 else
523                         mds->mds_cfg_llh = NULL;
524                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
525
526                 RETURN(rc);
527         }
528
529         case OBD_IOC_ENDRECORD: {
530                 if (!mds->mds_cfg_llh)
531                         RETURN(-EBADF);
532
533                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
534                 rc = llog_close(mds->mds_cfg_llh);
535                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
536
537                 mds->mds_cfg_llh = NULL;
538                 RETURN(rc);
539         }
540
541         case OBD_IOC_CLEAR_LOG: {
542                 char *name = data->ioc_inlbuf1;
543                 if (mds->mds_cfg_llh)
544                         RETURN(-EBUSY);
545
546                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
547                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
548                                  &mds->mds_cfg_llh, NULL, name);
549                 if (rc == 0) {
550                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
551                                          NULL);
552
553                         rc = llog_destroy(mds->mds_cfg_llh);
554                         llog_free_handle(mds->mds_cfg_llh);
555                 }
556                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
557
558                 mds->mds_cfg_llh = NULL;
559                 RETURN(rc);
560         }
561
562         case OBD_IOC_DORECORD: {
563                 char *cfg_buf;
564                 struct llog_rec_hdr rec;
565                 if (!mds->mds_cfg_llh)
566                         RETURN(-EBADF);
567
568                 rec.lrh_len = llog_data_len(data->ioc_plen1);
569
570                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
571                         rec.lrh_type = OBD_CFG_REC;
572                 } else {
573                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
574                         RETURN(-EINVAL);
575                 }
576
577                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
578                 if (cfg_buf == NULL)
579                         RETURN(-EINVAL);
580                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
581                 if (rc) {
582                         OBD_FREE(cfg_buf, data->ioc_plen1);
583                         RETURN(rc);
584                 }
585
586                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
587                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
588                                     cfg_buf, -1);
589                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
590
591                 OBD_FREE(cfg_buf, data->ioc_plen1);
592                 RETURN(rc);
593         }
594
595         case OBD_IOC_PARSE: {
596                 struct llog_ctxt *ctxt =
597                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
598                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
599                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
600                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
601                 if (rc)
602                         RETURN(rc);
603
604                 RETURN(rc);
605         }
606
607         case OBD_IOC_DUMP_LOG: {
608                 struct llog_ctxt *ctxt =
609                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
610                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
611                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
612                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
613                 if (rc)
614                         RETURN(rc);
615
616                 RETURN(rc);
617         }
618
619         case OBD_IOC_SYNC: {
620                 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
621                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
622                 RETURN(rc);
623         }
624
625         case OBD_IOC_SET_READONLY: {
626                 void *handle;
627                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
628                 BDEVNAME_DECLARE_STORAGE(tmp);
629                 CERROR("*** setting device %s read-only ***\n",
630                        ll_bdevname(obd->u.obt.obt_sb, tmp));
631
632                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
633                 if (!IS_ERR(handle))
634                         rc = fsfilt_commit(obd, inode, handle, 1);
635
636                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
637                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
638
639                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
640                 RETURN(0);
641         }
642
643         case OBD_IOC_CATLOGLIST: {
644                 int count = mds->mds_lov_desc.ld_tgt_count;
645                 rc = llog_catalog_list(obd, count, data);
646                 RETURN(rc);
647
648         }
649         case OBD_IOC_LLOG_CHECK:
650         case OBD_IOC_LLOG_CANCEL:
651         case OBD_IOC_LLOG_REMOVE: {
652                 struct llog_ctxt *ctxt =
653                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
654                 int rc2;
655                 __u32 group;
656
657                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
658                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
659                 rc = llog_ioctl(ctxt, cmd, data);
660                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
661                 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
662                 group = FILTER_GROUP_MDS0 + mds->mds_id;
663                 rc2 = obd_set_info_async(mds->mds_osc_exp,
664                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
665                                          sizeof(group), &group, NULL);
666                 if (!rc)
667                         rc = rc2;
668                 RETURN(rc);
669         }
670         case OBD_IOC_LLOG_INFO:
671         case OBD_IOC_LLOG_PRINT: {
672                 struct llog_ctxt *ctxt =
673                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
674
675                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
676                 rc = llog_ioctl(ctxt, cmd, data);
677                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
678
679                 RETURN(rc);
680         }
681
682         case OBD_IOC_ABORT_RECOVERY:
683                 CERROR("aborting recovery for device %s\n", obd->obd_name);
684                 target_stop_recovery_thread(obd);
685                 RETURN(0);
686
687         default:
688                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
689                 RETURN(-EINVAL);
690         }
691         RETURN(0);
692
693 }
694
695 /* Collect the preconditions we need to allow client connects */
696 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
697 {
698         if (flag & CONFIG_LOG)
699                 obd->u.mds.mds_fl_cfglog = 1;
700         if (flag & CONFIG_SYNC)
701                 obd->u.mds.mds_fl_synced = 1;
702         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
703                 /* Open for clients */
704                 obd->obd_no_conn = 0;
705 }
706
707 struct mds_lov_sync_info {
708         struct obd_device *mlsi_obd;     /* the lov device to sync */
709         struct obd_device *mlsi_watched; /* target osc */
710         __u32              mlsi_index;   /* index of target */
711 };
712
713 static int mds_propagate_capa_keys(struct mds_obd *mds)
714 {
715         struct lustre_capa_key *key;
716         int i, rc = 0;
717
718         ENTRY;
719
720         if (!mds->mds_capa_keys)
721                 RETURN(0);
722
723         for (i = 0; i < 2; i++) {
724                 key = &mds->mds_capa_keys[i];
725                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
726
727                 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
728                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
729                 if (rc) {
730                         DEBUG_CAPA_KEY(D_ERROR, key,
731                                        "propagate failed (rc = %d) for", rc);
732                         RETURN(rc);
733                 }
734         }
735
736         RETURN(0);
737 }
738
739 /* We only sync one osc at a time, so that we don't have to hold
740    any kind of lock on the whole mds_lov_desc, which may change
741    (grow) as a result of mds_lov_add_ost.  This also avoids any
742    kind of mismatch between the lov_desc and the mds_lov_desc,
743    which are not in lock-step during lov_add_obd */
744 static int __mds_lov_synchronize(void *data)
745 {
746         struct mds_lov_sync_info *mlsi = data;
747         struct obd_device *obd = mlsi->mlsi_obd;
748         struct obd_device *watched = mlsi->mlsi_watched;
749         struct mds_obd *mds = &obd->u.mds;
750         struct obd_uuid *uuid;
751         __u32  idx = mlsi->mlsi_index;
752         struct mds_group_info mgi;
753         int rc = 0;
754         ENTRY;
755
756         OBD_FREE(mlsi, sizeof(*mlsi));
757
758         LASSERT(obd);
759         LASSERT(watched);
760         uuid = &watched->u.cli.cl_target_uuid;
761         LASSERT(uuid);
762
763         rc = mds_lov_update_mds(obd, watched, idx, uuid);
764         if (rc != 0) {
765                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
766                 GOTO(out, rc);
767         }
768         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
769         mgi.uuid = uuid;
770
771         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
772                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
773         if (rc != 0)
774                 GOTO(out, rc);
775
776         /* propagate capability keys */
777         rc = mds_propagate_capa_keys(mds);
778         if (rc)
779                 GOTO(out, rc);
780
781         rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
782                           mds->mds_lov_desc.ld_tgt_count,
783                           NULL, NULL, uuid);
784
785         if (rc != 0) {
786                 CERROR("%s: failed at llog_origin_connect: %d\n",
787                        obd->obd_name, rc);
788                 GOTO(out, rc);
789         }
790
791         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
792               obd->obd_name, obd_uuid2str(uuid));
793         /*
794          * FIXME: this obd_stopping was useless, 
795          * since obd in mdt layer was set
796          */
797         if (obd->obd_stopping)
798                 GOTO(out, rc = -ENODEV);
799
800         rc = mds_lov_clear_orphans(mds, uuid);
801         if (rc != 0) {
802                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
803                        obd_uuid2str(uuid), rc);
804                 GOTO(out, rc);
805         }
806         
807         if (obd->obd_upcall.onu_owner) {
808                 /*
809                  * This is a hack for mds_notify->mdd_notify. When the mds obd
810                  * in mdd is removed, This hack should be removed.
811                  */
812                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
813                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
814                                                  obd->obd_upcall.onu_owner);
815         }
816         EXIT;
817 out:
818         class_decref(obd);
819         return rc;
820 }
821
822 int mds_lov_synchronize(void *data)
823 {
824         struct mds_lov_sync_info *mlsi = data;
825         char name[20];
826
827         if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
828                 /* There is still a watched target,
829                 but we don't know its index */
830                 sprintf(name, "ll_sync_tgt");
831         else
832                 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
833         ptlrpc_daemonize(name);
834
835         RETURN(__mds_lov_synchronize(data));
836 }
837
838 int mds_lov_start_synchronize(struct obd_device *obd,
839                               struct obd_device *watched,
840                               void *data, int nonblock)
841 {
842         struct mds_lov_sync_info *mlsi;
843         int rc;
844
845         ENTRY;
846
847         LASSERT(watched);
848
849         OBD_ALLOC(mlsi, sizeof(*mlsi));
850         if (mlsi == NULL)
851                 RETURN(-ENOMEM);
852
853         mlsi->mlsi_obd = obd;
854         mlsi->mlsi_watched = watched;
855         if (data)
856                 mlsi->mlsi_index = *(__u32 *)data;
857         else
858                 mlsi->mlsi_index = MDSLOV_NO_INDEX;
859
860         /* Although class_export_get(obd->obd_self_export) would lock
861            the MDS in place, since it's only a self-export
862            it doesn't lock the LOV in place.  The LOV can be disconnected
863            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
864            Simply taking an export ref on the LOV doesn't help, because it's
865            still disconnected. Taking an obd reference insures that we don't
866            disconnect the LOV.  This of course means a cleanup won't
867            finish for as long as the sync is blocking. */
868         class_incref(obd);
869
870         if (nonblock) {
871                 /* Synchronize in the background */
872                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
873                                        CLONE_VM | CLONE_FILES);
874                 if (rc < 0) {
875                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
876                                obd->obd_name, rc);
877                         class_decref(obd);
878                 } else {
879                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
880                                "thread=%d\n", obd->obd_name,
881                                mlsi->mlsi_index, rc);
882                         rc = 0;
883                 }
884         } else {
885                 rc = __mds_lov_synchronize((void *)mlsi);
886         }
887
888         RETURN(rc);
889 }
890
891 int mds_notify(struct obd_device *obd, struct obd_device *watched,
892                enum obd_notify_event ev, void *data)
893 {
894         struct mds_obd *mds = &obd->u.mds;
895         int rc = 0;
896         ENTRY;
897
898         switch (ev) {
899         /* We only handle these: */
900         case OBD_NOTIFY_ACTIVE:
901         case OBD_NOTIFY_SYNC:
902         case OBD_NOTIFY_SYNC_NONBLOCK:
903                 break;
904         case OBD_NOTIFY_CONFIG:
905                 mds_allow_cli(obd, (unsigned int)data);
906         default:
907                 RETURN(0);
908         }
909
910         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
911         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
912                 CERROR("unexpected notification of %s %s!\n",
913                        watched->obd_type->typ_name, watched->obd_name);
914                 RETURN(-EINVAL);
915         }
916
917         if (obd->obd_recovering) {
918                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
919                       obd->obd_name,
920                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
921                 /* We still have to fix the lov descriptor for ost's added
922                    after the mdt in the config log.  They didn't make it into
923                    mds_lov_connect. */
924                 down_write(&mds->mds_lov_objids_sem);
925                 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
926                 up_write(&mds->mds_lov_objids_sem);
927                 if (rc)
928                         RETURN(rc);
929                 /* We should update init llog here too for replay unlink and 
930                  * possiable llog init race when recovery complete */
931                 mutex_down(&obd->obd_dev_sem);
932                 llog_cat_initialize(obd, NULL, 
933                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
934                                     &watched->u.cli.cl_target_uuid);
935                 mutex_up(&obd->obd_dev_sem);
936                 mds_allow_cli(obd, CONFIG_SYNC);
937                 RETURN(rc);
938         }
939
940         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
941         rc = mds_lov_start_synchronize(obd, watched, data,
942                                        !(ev == OBD_NOTIFY_SYNC));
943
944         lquota_recovery(mds_quota_interface_ref, obd);
945
946         RETURN(rc);
947 }
948
949 /* Convert the on-disk LOV EA structre.
950  * We always try to convert from an old LOV EA format to the common in-memory
951  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
952  * then convert back to the new on-disk format and save it back to disk
953  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
954  * to convert it each time this inode is accessed.
955  *
956  * This function is a bit interesting in the error handling.  We can safely
957  * ship the old lmm to the client in case of failure, since it uses the same
958  * obd_unpackmd() code and can do the conversion if the MDS fails for some
959  * reason.  We will not delete the old lmm data until we have written the
960  * new format lmm data in fsfilt_set_md(). */
961 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
962                        struct lov_mds_md *lmm, int lmm_size)
963 {
964         struct lov_stripe_md *lsm = NULL;
965         void *handle;
966         int rc, err;
967         ENTRY;
968
969         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
970             le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
971                 RETURN(0);
972
973         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
974                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
975                LOV_MAGIC);
976
977         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
978         if (rc < 0)
979                 GOTO(conv_end, rc);
980
981         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
982         if (rc < 0)
983                 GOTO(conv_free, rc);
984         lmm_size = rc;
985
986         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
987         if (IS_ERR(handle)) {
988                 rc = PTR_ERR(handle);
989                 GOTO(conv_free, rc);
990         }
991
992         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
993
994         err = fsfilt_commit(obd, inode, handle, 0);
995         if (!rc)
996                 rc = err ? err : lmm_size;
997         GOTO(conv_free, rc);
998 conv_free:
999         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
1000 conv_end:
1001         return rc;
1002 }
1003
1004 void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
1005                          struct lov_desc *desc)
1006 {
1007         int i;
1008         for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
1009                 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
1010                         le64_to_cpu(lmm->lmm_objects[i].l_object_id);
1011         }
1012 }
1013 EXPORT_SYMBOL(mds_objids_from_lmm);