Whamcloud - gitweb
refine locking for avoid write wrong info into lov_objid file and some races.
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
38 #include <obd_lov.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
41
42 #include "mds_internal.h"
43
44 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
45 {
46         struct mds_obd *mds = &obd->u.mds;
47         int i;
48         int dirty = 0;
49         ENTRY;
50
51         down_read(&mds->mds_lov_objids_sem);
52
53         spin_lock(&mds->mds_lov_objids_lock);
54         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++) {
55                 if (ids[i] > (mds->mds_lov_objids)[i]) {
56                         (mds->mds_lov_objids)[i] = ids[i];
57                         dirty = 1;
58                 }
59         }
60         mds->mds_lov_objids_dirty = dirty;
61
62         spin_unlock(&mds->mds_lov_objids_lock);
63
64         up_read(&mds->mds_lov_objids_sem);
65
66         EXIT;
67 }
68 EXPORT_SYMBOL(mds_lov_update_objids);
69
70 static int mds_lov_read_objids(struct obd_device *obd)
71 {
72         struct mds_obd *mds = &obd->u.mds;
73         obd_id *ids;
74         loff_t off = 0;
75         int i, rc, size;
76         ENTRY;
77
78         LASSERT(!mds->mds_lov_objids_count);
79         LASSERT(!mds->mds_lov_objids_dirty);
80
81         /* Read everything in the file, even if our current lov desc
82            has fewer targets. Old targets not in the lov descriptor
83            during mds setup may still have valid objids. */
84         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
85         if (size == 0)
86                 GOTO(out, rc = 0);
87
88         OBD_ALLOC(ids, size);
89         if (ids == NULL)
90                 GOTO(out, rc = -ENOMEM);
91
92         rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
93         if (rc < 0) {
94                 OBD_FREE(ids, size);
95                 CERROR("Error reading objids %d\n", rc);
96                 GOTO(out, rc);
97         }
98         mds->mds_lov_objids = ids;
99         mds->mds_lov_objids_count = size / sizeof(*ids);
100
101         for (i = 0; i < mds->mds_lov_objids_count; i++) {
102                 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
103                        mds->mds_lov_objids[i], i);
104         }
105
106 out:
107         RETURN(rc);
108 }
109 /* must held mds_lov_objids_sem */
110 int mds_lov_write_objids(struct obd_device *obd)
111 {
112         struct mds_obd *mds = &obd->u.mds;
113         loff_t off = 0;
114         int i, rc, tgts;
115         ENTRY;
116         
117         spin_lock(&mds->mds_lov_objids_lock);
118         if (!mds->mds_lov_objids_dirty) {
119                 spin_unlock(&mds->mds_lov_objids_lock);
120                 RETURN(0);
121         }
122         mds->mds_lov_objids_dirty = 0;
123         spin_unlock(&mds->mds_lov_objids_lock);
124
125         tgts = mds->mds_lov_objids_count;
126         if (!tgts)
127                 RETURN(0);
128
129         for (i = 0; i < tgts; i++)
130                 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
131                        mds->mds_lov_objids[i], i);
132
133         rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
134                                  mds->mds_lov_objids, tgts * sizeof(obd_id),
135                                  &off, 0);
136         if (rc >= 0) {
137                 rc = 0;
138                 cfs_waitq_signal(&mds->mds_lov_objids_wait);
139         } else {
140                 mds->mds_lov_objids_dirty = 1;
141         }
142
143         RETURN(rc);
144 }
145 EXPORT_SYMBOL(mds_lov_write_objids);
146
147 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
148 {
149         int rc;
150         struct obdo oa;
151         struct obd_trans_info oti = {0};
152         struct lov_stripe_md  *empty_ea = NULL;
153         ENTRY;
154
155         LASSERT(mds->mds_lov_objids != NULL);
156
157         /* This create will in fact either create or destroy:  If the OST is
158          * missing objects below this ID, they will be created.  If it finds
159          * objects above this ID, they will be removed. */
160         memset(&oa, 0, sizeof(oa));
161         oa.o_flags = OBD_FL_DELORPHAN;
162         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
163         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
164         if (ost_uuid != NULL) {
165                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
166                 oa.o_valid |= OBD_MD_FLINLINE;
167         }
168         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
169
170         RETURN(rc);
171 }
172
173 /* update the LOV-OSC knowledge of the last used object id's */
174 int mds_lov_set_nextid(struct obd_device *obd)
175 {
176         struct mds_obd *mds = &obd->u.mds;
177         int rc;
178         ENTRY;
179
180         LASSERT(!obd->obd_recovering);
181         if (mds->mds_lov_objids_count == 0)
182                 RETURN(0);
183
184         LASSERT(mds->mds_lov_objids != NULL);
185
186        /* mds->mds_lov_objids_sem must be held so mds_lov_objids doesn't change 
187         * we need use mds->mds_lov_desc.ld_tgt_count because in recovery not all
188         * target can be connected at start time */
189         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
190                                 KEY_NEXT_ID,
191                                 mds->mds_lov_desc.ld_tgt_count *
192                                 sizeof(*mds->mds_lov_objids),
193                                 mds->mds_lov_objids, NULL);
194
195         if (rc)
196                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
197                         obd->obd_name, rc);
198
199         RETURN(rc);
200 }
201
202 /* Update the lov desc for a new size lov. */
203 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
204 {
205         struct mds_obd *mds = &obd->u.mds;
206         struct lov_desc *ld;
207         __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
208         int rc = 0;
209         ENTRY;
210
211         OBD_ALLOC(ld, sizeof(*ld));
212         if (!ld)
213                 RETURN(-ENOMEM);
214
215         rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
216                           &valsize, ld);
217         if (rc)
218                 GOTO(out, rc);
219
220         /* The size of the LOV target table may have increased. */
221         if (ld->ld_tgt_count > mds->mds_lov_objids_count) {
222                 obd_id *ids;
223
224                 size = ld->ld_tgt_count * sizeof(obd_id);
225
226                 OBD_ALLOC(ids, size);
227                 if (ids == NULL)
228                         GOTO(out, rc = -ENOMEM);
229                 memset(ids, 0, size);
230
231                 /* write lock is enough for protect from access to
232                  * old pointer in mds_lov_update_objids and dirty == 0
233                  * is enough for protect from access in write_objids */
234                 if (mds->mds_lov_objids) {
235                         struct l_wait_info lwi = { 0 };
236                         obd_id *old_ids = mds->mds_lov_objids;
237
238                         memcpy(ids, mds->mds_lov_objids,
239                                mds_lov_objids_size(mds));
240
241                         l_wait_event(mds->mds_lov_objids_wait,
242                                      mds->mds_lov_objids_dirty == 0, &lwi);
243
244                         OBD_FREE(old_ids, mds_lov_objids_size(mds));
245                 }
246                 mds->mds_lov_objids = ids;
247                 mds->mds_lov_objids_count = ld->ld_tgt_count;
248         }
249
250         /* Don't change the mds_lov_desc until the objids size matches the
251            count (paranoia) */
252         mds->mds_lov_desc = *ld;
253         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
254                mds->mds_lov_desc.ld_tgt_count);
255
256         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
257                         max(mds->mds_lov_desc.ld_tgt_count,
258                             mds->mds_lov_objids_count));
259         mds->mds_max_mdsize = lov_mds_md_size(stripes);
260         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
261         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
262                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
263                stripes);
264
265         /* If we added a target we have to reconnect the llogs */
266         /* We only _need_ to do this at first add (idx), or the first time
267            after recovery.  However, it should now be safe to call anytime. */
268         mutex_down(&obd->obd_dev_sem);
269         rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
270         mutex_up(&obd->obd_dev_sem);
271
272         /*XXX this notifies the MDD until lov handling use old mds code */
273         if (obd->obd_upcall.onu_owner) {
274                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
275                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
276                                                  obd->obd_upcall.onu_owner);
277         }
278 out:
279         OBD_FREE(ld, sizeof(*ld));
280         RETURN(rc);
281 }
282
283
284 #define MDSLOV_NO_INDEX -1
285
286 /* Inform MDS about new/updated target */
287 static int mds_lov_update_mds(struct obd_device *obd,
288                               struct obd_device *watched,
289                               __u32 idx, struct obd_uuid *uuid)
290 {
291         struct mds_obd *mds = &obd->u.mds;
292         int old_count;
293         int rc = 0;
294         obd_id lastid;
295         __u32 size = sizeof(lastid);
296
297         ENTRY;
298
299         old_count = mds->mds_lov_desc.ld_tgt_count;
300         down_write(&mds->mds_lov_objids_sem);
301         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
302         downgrade_write(&mds->mds_lov_objids_sem);
303         if (rc)
304                 GOTO(out, rc);
305
306         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
307                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
308                mds->mds_lov_desc.ld_tgt_count);
309
310         /* idx is set as data from lov_notify. */
311         if (idx == MDSLOV_NO_INDEX || obd->obd_recovering)
312                 GOTO(out, rc);
313
314         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
315                 CERROR("index %d > count %d!\n", idx,
316                        mds->mds_lov_desc.ld_tgt_count);
317                  GOTO(out, rc = -EINVAL);
318         }
319         rc = obd_get_info(watched->obd_self_export, sizeof("last_id"),
320                           "last_id", &size, &lastid);
321         if (rc)
322                 GOTO(out, rc);
323
324         spin_lock(&mds->mds_lov_objids_lock);
325         if (mds->mds_lov_objids[idx] == 0 || mds->mds_lov_objids[idx] > lastid) {
326                 /* last id not init or corrupted - use data from osc */
327                 mds->mds_lov_objids[idx] = lastid;
328                 mds->mds_lov_objids_dirty = 1;
329                 spin_unlock(&mds->mds_lov_objids_lock);
330                 /* not need write immediately, mark for write for avoid
331                  * lock inverse */
332         } else {
333                 spin_unlock(&mds->mds_lov_objids_lock);
334                 /* We have read this lastid from disk; tell the osc.
335                    Don't call this during recovery. */
336                 rc = mds_lov_set_nextid(obd);
337         }
338
339         CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
340                mds->mds_lov_objids[idx], idx);
341         
342 out:
343         up_read(&mds->mds_lov_objids_sem);
344         RETURN(rc);
345 }
346
347 int mds_update_objids_from_lastid(struct obd_device *obd)
348 {
349         struct mds_obd *mds = &obd->u.mds;
350         int size;
351         int rc, i;
352
353         if (mds->mds_lov_objids_count < mds->mds_lov_desc.ld_tgt_count) {
354                 obd_id *ids;
355
356                 size = mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id);
357                 OBD_ALLOC(ids, size);
358                 if (ids == NULL)
359                         GOTO(out, rc = -ENOMEM);
360
361                 OBD_FREE(mds->mds_lov_objids, mds_lov_objids_size(mds));
362                 mds->mds_lov_objids = ids;
363                 mds->mds_lov_objids_count = size / sizeof(obd_id);
364         }
365
366         size = mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id);
367         rc = obd_get_info(mds->mds_osc_exp, sizeof("last_id"),
368                           "last_id", &size, mds->mds_lov_objids);
369         if (!rc) {
370                 for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
371                         CWARN("got last object "LPU64" from OST %d\n",
372                               mds->mds_lov_objids[i], i);
373                 mds->mds_lov_objids_dirty = 1;
374                 rc = mds_lov_write_objids(obd);
375                 if (rc)
376                         CERROR("got last objids from OSTs, but error "
377                                "writing objids file: %d\n", rc);
378         }
379 out:
380         RETURN(rc);
381 }
382
383
384 /* update the LOV-OSC knowledge of the last used object id's */
385 int mds_lov_connect(struct obd_device *obd, char * lov_name)
386 {
387         struct mds_obd *mds = &obd->u.mds;
388         struct lustre_handle conn = {0,};
389         struct obd_connect_data *data;
390         int rc;
391         ENTRY;
392
393         if (IS_ERR(mds->mds_osc_obd))
394                 RETURN(PTR_ERR(mds->mds_osc_obd));
395
396         if (mds->mds_osc_obd)
397                 RETURN(0);
398
399         mds->mds_osc_obd = class_name2obd(lov_name);
400         if (!mds->mds_osc_obd) {
401                 CERROR("MDS cannot locate LOV %s\n", lov_name);
402                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
403                 RETURN(-ENOTCONN);
404         }
405
406         OBD_ALLOC(data, sizeof(*data));
407         if (data == NULL)
408                 RETURN(-ENOMEM);
409         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
410                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
411                                   OBD_CONNECT_OSS_CAPA;
412 #ifdef HAVE_LRU_RESIZE_SUPPORT
413         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
414 #endif
415         data->ocd_version = LUSTRE_VERSION_CODE;
416         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
417         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
418         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
419         OBD_FREE(data, sizeof(*data));
420         if (rc) {
421                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
422                 mds->mds_osc_obd = ERR_PTR(rc);
423                 RETURN(rc);
424         }
425         mds->mds_osc_exp = class_conn2export(&conn);
426
427         rc = obd_register_observer(mds->mds_osc_obd, obd);
428         if (rc) {
429                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
430                        lov_name, rc);
431                 GOTO(err_discon, rc);
432         }
433
434         /* Deny new client connections until we are sure we have some OSTs */
435         obd->obd_no_conn = 1;
436
437         down_write(&mds->mds_lov_objids_sem);
438         rc = mds_lov_read_objids(obd);
439         if (rc) {
440                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
441                 GOTO(err_reg, rc);
442         }
443
444         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
445         if (rc)
446                 GOTO(err_reg, rc);
447
448         /* If we're mounting this code for the first time on an existing FS,
449          * we need to populate the objids array from the real OST values */
450         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_count) {
451                 mds_update_objids_from_lastid(obd);
452         }
453         up_write(&mds->mds_lov_objids_sem);
454
455         /* I want to see a callback happen when the OBD moves to a
456          * "For General Use" state, and that's when we'll call
457          * set_nextid().  The class driver can help us here, because
458          * it can use the obd_recovering flag to determine when the
459          * the OBD is full available. */
460         /* MDD device will care about that
461         if (!obd->obd_recovering)
462                 rc = mds_postrecov(obd);
463          */
464         RETURN(rc);
465
466 err_reg:
467         up_write(&mds->mds_lov_objids_sem);
468         obd_register_observer(mds->mds_osc_obd, NULL);
469 err_discon:
470         obd_disconnect(mds->mds_osc_exp);
471         mds->mds_osc_exp = NULL;
472         mds->mds_osc_obd = ERR_PTR(rc);
473         RETURN(rc);
474 }
475
476 int mds_lov_disconnect(struct obd_device *obd)
477 {
478         struct mds_obd *mds = &obd->u.mds;
479         int rc = 0;
480         ENTRY;
481
482         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
483                 obd_register_observer(mds->mds_osc_obd, NULL);
484
485                 /* The actual disconnect of the mds_lov will be called from
486                  * class_disconnect_exports from mds_lov_clean. So we have to
487                  * ensure that class_cleanup doesn't fail due to the extra ref
488                  * we're holding now. The mechanism to do that already exists -
489                  * the obd_force flag. We'll drop the final ref to the
490                  * mds_osc_exp in mds_cleanup. */
491                 mds->mds_osc_obd->obd_force = 1;
492         }
493
494         RETURN(rc);
495 }
496
497 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
498                   void *karg, void *uarg)
499 {
500         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
501         struct obd_device *obd = exp->exp_obd;
502         struct mds_obd *mds = &obd->u.mds;
503         struct obd_ioctl_data *data = karg;
504         struct lvfs_run_ctxt saved;
505         int rc = 0;
506
507         ENTRY;
508         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
509
510         switch (cmd) {
511         case OBD_IOC_RECORD: {
512                 char *name = data->ioc_inlbuf1;
513                 if (mds->mds_cfg_llh)
514                         RETURN(-EBUSY);
515
516                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
517                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
518                                  &mds->mds_cfg_llh, NULL, name);
519                 if (rc == 0)
520                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
521                                          &cfg_uuid);
522                 else
523                         mds->mds_cfg_llh = NULL;
524                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
525
526                 RETURN(rc);
527         }
528
529         case OBD_IOC_ENDRECORD: {
530                 if (!mds->mds_cfg_llh)
531                         RETURN(-EBADF);
532
533                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
534                 rc = llog_close(mds->mds_cfg_llh);
535                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
536
537                 mds->mds_cfg_llh = NULL;
538                 RETURN(rc);
539         }
540
541         case OBD_IOC_CLEAR_LOG: {
542                 char *name = data->ioc_inlbuf1;
543                 if (mds->mds_cfg_llh)
544                         RETURN(-EBUSY);
545
546                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
547                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
548                                  &mds->mds_cfg_llh, NULL, name);
549                 if (rc == 0) {
550                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
551                                          NULL);
552
553                         rc = llog_destroy(mds->mds_cfg_llh);
554                         llog_free_handle(mds->mds_cfg_llh);
555                 }
556                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
557
558                 mds->mds_cfg_llh = NULL;
559                 RETURN(rc);
560         }
561
562         case OBD_IOC_DORECORD: {
563                 char *cfg_buf;
564                 struct llog_rec_hdr rec;
565                 if (!mds->mds_cfg_llh)
566                         RETURN(-EBADF);
567
568                 rec.lrh_len = llog_data_len(data->ioc_plen1);
569
570                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
571                         rec.lrh_type = OBD_CFG_REC;
572                 } else {
573                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
574                         RETURN(-EINVAL);
575                 }
576
577                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
578                 if (cfg_buf == NULL)
579                         RETURN(-EINVAL);
580                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
581                 if (rc) {
582                         OBD_FREE(cfg_buf, data->ioc_plen1);
583                         RETURN(rc);
584                 }
585
586                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
587                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
588                                     cfg_buf, -1);
589                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
590
591                 OBD_FREE(cfg_buf, data->ioc_plen1);
592                 RETURN(rc);
593         }
594
595         case OBD_IOC_PARSE: {
596                 struct llog_ctxt *ctxt =
597                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
598                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
599                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
600                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
601                 if (rc)
602                         RETURN(rc);
603
604                 RETURN(rc);
605         }
606
607         case OBD_IOC_DUMP_LOG: {
608                 struct llog_ctxt *ctxt =
609                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
610                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
611                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
612                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
613                 if (rc)
614                         RETURN(rc);
615
616                 RETURN(rc);
617         }
618
619         case OBD_IOC_SYNC: {
620                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
621                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
622                 RETURN(rc);
623         }
624
625         case OBD_IOC_SET_READONLY: {
626                 void *handle;
627                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
628                 BDEVNAME_DECLARE_STORAGE(tmp);
629                 CERROR("*** setting device %s read-only ***\n",
630                        ll_bdevname(obd->u.obt.obt_sb, tmp));
631
632                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
633                 if (!IS_ERR(handle))
634                         rc = fsfilt_commit(obd, inode, handle, 1);
635
636                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
637                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
638
639                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
640                 RETURN(0);
641         }
642
643         case OBD_IOC_CATLOGLIST: {
644                 int count = mds->mds_lov_desc.ld_tgt_count;
645                 rc = llog_catalog_list(obd, count, data);
646                 RETURN(rc);
647
648         }
649         case OBD_IOC_LLOG_CHECK:
650         case OBD_IOC_LLOG_CANCEL:
651         case OBD_IOC_LLOG_REMOVE: {
652                 struct llog_ctxt *ctxt =
653                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
654                 int rc2;
655                 __u32 group;
656
657                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
658                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
659                 rc = llog_ioctl(ctxt, cmd, data);
660                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
661                 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
662                 group = FILTER_GROUP_MDS0 + mds->mds_id;
663                 rc2 = obd_set_info_async(mds->mds_osc_exp,
664                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
665                                          sizeof(group), &group, NULL);
666                 if (!rc)
667                         rc = rc2;
668                 RETURN(rc);
669         }
670         case OBD_IOC_LLOG_INFO:
671         case OBD_IOC_LLOG_PRINT: {
672                 struct llog_ctxt *ctxt =
673                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
674
675                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
676                 rc = llog_ioctl(ctxt, cmd, data);
677                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
678
679                 RETURN(rc);
680         }
681
682         case OBD_IOC_ABORT_RECOVERY:
683                 CERROR("aborting recovery for device %s\n", obd->obd_name);
684                 target_stop_recovery_thread(obd);
685                 RETURN(0);
686
687         default:
688                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
689                 RETURN(-EINVAL);
690         }
691         RETURN(0);
692
693 }
694
695 struct mds_lov_sync_info {
696         struct obd_device *mlsi_obd;     /* the lov device to sync */
697         struct obd_device *mlsi_watched; /* target osc */
698         __u32              mlsi_index;   /* index of target */
699 };
700
701 static int mds_propagate_capa_keys(struct mds_obd *mds)
702 {
703         struct lustre_capa_key *key;
704         int i, rc = 0;
705
706         ENTRY;
707
708         if (!mds->mds_capa_keys)
709                 RETURN(0);
710
711         for (i = 0; i < 2; i++) {
712                 key = &mds->mds_capa_keys[i];
713                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
714
715                 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
716                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
717                 if (rc) {
718                         DEBUG_CAPA_KEY(D_ERROR, key,
719                                        "propagate failed (rc = %d) for", rc);
720                         RETURN(rc);
721                 }
722         }
723
724         RETURN(0);
725 }
726
727 /* We only sync one osc at a time, so that we don't have to hold
728    any kind of lock on the whole mds_lov_desc, which may change
729    (grow) as a result of mds_lov_add_ost.  This also avoids any
730    kind of mismatch between the lov_desc and the mds_lov_desc,
731    which are not in lock-step during lov_add_obd */
732 static int __mds_lov_synchronize(void *data)
733 {
734         struct mds_lov_sync_info *mlsi = data;
735         struct obd_device *obd = mlsi->mlsi_obd;
736         struct obd_device *watched = mlsi->mlsi_watched;
737         struct mds_obd *mds = &obd->u.mds;
738         struct obd_uuid *uuid;
739         __u32  idx = mlsi->mlsi_index;
740         struct mds_group_info mgi;
741         int rc = 0;
742         ENTRY;
743
744         OBD_FREE(mlsi, sizeof(*mlsi));
745
746         LASSERT(obd);
747         LASSERT(watched);
748         uuid = &watched->u.cli.cl_target_uuid;
749         LASSERT(uuid);
750
751         rc = mds_lov_update_mds(obd, watched, idx, uuid);
752         if (rc != 0)
753                 GOTO(out, rc);
754         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
755         mgi.uuid = uuid;
756         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
757                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
758         if (rc != 0)
759                 GOTO(out, rc);
760
761         /* propagate capability keys */
762         rc = mds_propagate_capa_keys(mds);
763         if (rc)
764                 GOTO(out, rc);
765
766         rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
767                           mds->mds_lov_desc.ld_tgt_count,
768                           NULL, NULL, uuid);
769
770         if (rc != 0) {
771                 CERROR("%s: failed at llog_origin_connect: %d\n",
772                        obd->obd_name, rc);
773                 GOTO(out, rc);
774         }
775
776         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
777               obd->obd_name, obd_uuid2str(uuid));
778         /*
779          * FIXME: this obd_stopping was useless, 
780          * since obd in mdt layer was set
781          */
782         if (obd->obd_stopping)
783                 GOTO(out, rc = -ENODEV);
784
785         rc = mds_lov_clear_orphans(mds, uuid);
786         if (rc != 0) {
787                 CERROR("%s: failed at mds_lov_clear_orphans: %d\n",
788                        obd->obd_name, rc);
789                 GOTO(out, rc);
790         }
791         
792         if (obd->obd_upcall.onu_owner) {
793                 /*
794                  * This is a hack for mds_notify->mdd_notify. When the mds obd
795                  * in mdd is removed, This hack should be removed.
796                  */
797                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
798                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
799                                                  obd->obd_upcall.onu_owner);
800         }
801         EXIT;
802 out:
803         class_decref(obd);
804         return rc;
805 }
806
807 int mds_lov_synchronize(void *data)
808 {
809         struct mds_lov_sync_info *mlsi = data;
810         char name[20];
811
812         if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
813                 /* There is still a watched target,
814                 but we don't know its index */
815                 sprintf(name, "ll_sync_tgt");
816         else
817                 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
818         ptlrpc_daemonize(name);
819
820         RETURN(__mds_lov_synchronize(data));
821 }
822
823 int mds_lov_start_synchronize(struct obd_device *obd,
824                               struct obd_device *watched,
825                               void *data, int nonblock)
826 {
827         struct mds_lov_sync_info *mlsi;
828         int rc;
829
830         ENTRY;
831
832         LASSERT(watched);
833
834         OBD_ALLOC(mlsi, sizeof(*mlsi));
835         if (mlsi == NULL)
836                 RETURN(-ENOMEM);
837
838         mlsi->mlsi_obd = obd;
839         mlsi->mlsi_watched = watched;
840         if (data)
841                 mlsi->mlsi_index = *(__u32 *)data;
842         else
843                 mlsi->mlsi_index = MDSLOV_NO_INDEX;
844
845         /* Although class_export_get(obd->obd_self_export) would lock
846            the MDS in place, since it's only a self-export
847            it doesn't lock the LOV in place.  The LOV can be disconnected
848            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
849            Simply taking an export ref on the LOV doesn't help, because it's
850            still disconnected. Taking an obd reference insures that we don't
851            disconnect the LOV.  This of course means a cleanup won't
852            finish for as long as the sync is blocking. */
853         class_incref(obd);
854
855         if (nonblock) {
856                 /* Synchronize in the background */
857                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
858                                        CLONE_VM | CLONE_FILES);
859                 if (rc < 0) {
860                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
861                                obd->obd_name, rc);
862                         class_decref(obd);
863                 } else {
864                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
865                                "thread=%d\n", obd->obd_name,
866                                mlsi->mlsi_index, rc);
867                         rc = 0;
868                 }
869         } else {
870                 rc = __mds_lov_synchronize((void *)mlsi);
871         }
872
873         RETURN(rc);
874 }
875
876 int mds_notify(struct obd_device *obd, struct obd_device *watched,
877                enum obd_notify_event ev, void *data)
878 {
879         struct mds_obd *mds = &obd->u.mds;
880         int rc = 0;
881         ENTRY;
882
883         switch (ev) {
884         /* We only handle these: */
885         case OBD_NOTIFY_ACTIVE:
886         case OBD_NOTIFY_SYNC:
887         case OBD_NOTIFY_SYNC_NONBLOCK:
888                 break;
889         case OBD_NOTIFY_CONFIG:
890                 /* Open for clients */
891                 obd->obd_no_conn = 0;
892         default:
893                 RETURN(0);
894         }
895
896         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
897         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
898                 CERROR("unexpected notification of %s %s!\n",
899                        watched->obd_type->typ_name, watched->obd_name);
900                 RETURN(-EINVAL);
901         }
902
903         if (obd->obd_recovering) {
904                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
905                       obd->obd_name,
906                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
907                 /* We still have to fix the lov descriptor for ost's added
908                    after the mdt in the config log.  They didn't make it into
909                    mds_lov_connect. */
910                 down_write(&mds->mds_lov_objids_sem);
911                 rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
912                 up_write(&mds->mds_lov_objids_sem);
913                 if (rc)
914                         RETURN(rc);
915                 /* We should update init llog here too for replay unlink and 
916                  * possiable llog init race when recovery complete */
917                 mutex_down(&obd->obd_dev_sem);
918                 llog_cat_initialize(obd, NULL, 
919                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
920                                     &watched->u.cli.cl_target_uuid);
921                 mutex_up(&obd->obd_dev_sem);
922                 RETURN(rc);
923         }
924
925         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
926         rc = mds_lov_start_synchronize(obd, watched, data,
927                                        !(ev == OBD_NOTIFY_SYNC));
928
929         lquota_recovery(mds_quota_interface_ref, obd);
930
931         RETURN(rc);
932 }
933
934 /* Convert the on-disk LOV EA structre.
935  * We always try to convert from an old LOV EA format to the common in-memory
936  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
937  * then convert back to the new on-disk format and save it back to disk
938  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
939  * to convert it each time this inode is accessed.
940  *
941  * This function is a bit interesting in the error handling.  We can safely
942  * ship the old lmm to the client in case of failure, since it uses the same
943  * obd_unpackmd() code and can do the conversion if the MDS fails for some
944  * reason.  We will not delete the old lmm data until we have written the
945  * new format lmm data in fsfilt_set_md(). */
946 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
947                        struct lov_mds_md *lmm, int lmm_size)
948 {
949         struct lov_stripe_md *lsm = NULL;
950         void *handle;
951         int rc, err;
952         ENTRY;
953
954         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
955             le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
956                 RETURN(0);
957
958         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
959                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
960                LOV_MAGIC);
961
962         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
963         if (rc < 0)
964                 GOTO(conv_end, rc);
965
966         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
967         if (rc < 0)
968                 GOTO(conv_free, rc);
969         lmm_size = rc;
970
971         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
972         if (IS_ERR(handle)) {
973                 rc = PTR_ERR(handle);
974                 GOTO(conv_free, rc);
975         }
976
977         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
978
979         err = fsfilt_commit(obd, inode, handle, 0);
980         if (!rc)
981                 rc = err ? err : lmm_size;
982         GOTO(conv_free, rc);
983 conv_free:
984         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
985 conv_end:
986         return rc;
987 }
988
989 void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
990                          struct lov_desc *desc)
991 {
992         int i;
993         for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
994                 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
995                         le64_to_cpu(lmm->lmm_objects[i].l_object_id);
996         }
997 }
998 EXPORT_SYMBOL(mds_objids_from_lmm);