Whamcloud - gitweb
b=12860
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
38 #include <obd_lov.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
41
42 #include "mds_internal.h"
43
44 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
45 {
46         struct mds_obd *mds = &obd->u.mds;
47         int i;
48         ENTRY;
49
50         lock_kernel();
51         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
52                 if (ids[i] > (mds->mds_lov_objids)[i]) {
53                         (mds->mds_lov_objids)[i] = ids[i];
54                         mds->mds_lov_objids_dirty = 1;
55                 }
56         unlock_kernel();
57         EXIT;
58 }
59 EXPORT_SYMBOL(mds_lov_update_objids);
60
61 static int mds_lov_read_objids(struct obd_device *obd)
62 {
63         struct mds_obd *mds = &obd->u.mds;
64         obd_id *ids;
65         loff_t off = 0;
66         int i, rc, size;
67         ENTRY;
68
69         LASSERT(!mds->mds_lov_objids_size);
70         LASSERT(!mds->mds_lov_objids_dirty);
71
72         /* Read everything in the file, even if our current lov desc
73            has fewer targets. Old targets not in the lov descriptor
74            during mds setup may still have valid objids. */
75         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
76         if (size == 0)
77                 RETURN(0);
78
79         OBD_ALLOC(ids, size);
80         if (ids == NULL)
81                 RETURN(-ENOMEM);
82         mds->mds_lov_objids = ids;
83         mds->mds_lov_objids_size = size;
84
85         rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
86         if (rc < 0) {
87                 CERROR("Error reading objids %d\n", rc);
88                 RETURN(rc);
89         }
90
91         mds->mds_lov_objids_in_file = size / sizeof(*ids);
92
93         for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
94                 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
95                        mds->mds_lov_objids[i], i);
96         }
97         RETURN(0);
98 }
99
100 int mds_lov_write_objids(struct obd_device *obd)
101 {
102         struct mds_obd *mds = &obd->u.mds;
103         loff_t off = 0;
104         int i, rc, tgts;
105         ENTRY;
106
107         if (!mds->mds_lov_objids_dirty)
108                 RETURN(0);
109
110         tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
111
112         if (!tgts)
113                 RETURN(0);
114
115         for (i = 0; i < tgts; i++)
116                 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
117                        mds->mds_lov_objids[i], i);
118
119         rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
120                                  mds->mds_lov_objids, tgts * sizeof(obd_id),
121                                  &off, 0);
122         if (rc >= 0) {
123                 mds->mds_lov_objids_dirty = 0;
124                 rc = 0;
125         }
126
127         RETURN(rc);
128 }
129 EXPORT_SYMBOL(mds_lov_write_objids);
130
131 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
132 {
133         int rc;
134         struct obdo oa;
135         struct obd_trans_info oti = {0};
136         struct lov_stripe_md  *empty_ea = NULL;
137         ENTRY;
138
139         LASSERT(mds->mds_lov_objids != NULL);
140
141         /* This create will in fact either create or destroy:  If the OST is
142          * missing objects below this ID, they will be created.  If it finds
143          * objects above this ID, they will be removed. */
144         memset(&oa, 0, sizeof(oa));
145         oa.o_flags = OBD_FL_DELORPHAN;
146         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
147         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
148         if (ost_uuid != NULL) {
149                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
150                 oa.o_valid |= OBD_MD_FLINLINE;
151         }
152         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
153
154         RETURN(rc);
155 }
156
157 /* update the LOV-OSC knowledge of the last used object id's */
158 int mds_lov_set_nextid(struct obd_device *obd)
159 {
160         struct mds_obd *mds = &obd->u.mds;
161         int rc;
162         ENTRY;
163
164         LASSERT(!obd->obd_recovering);
165         LASSERT(mds->mds_lov_objids != NULL);
166      
167         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
168         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
169
170         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
171                                 KEY_NEXT_ID,
172                                 mds->mds_lov_desc.ld_tgt_count *
173                                 sizeof(*mds->mds_lov_objids),
174                                 mds->mds_lov_objids, NULL);
175
176         if (rc)
177                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
178                         obd->obd_name, rc);
179
180         RETURN(rc);
181 }
182
183 /* Update the lov desc for a new size lov. */
184 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
185 {
186         struct mds_obd *mds = &obd->u.mds;
187         struct lov_desc *ld;
188         __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
189         int rc = 0;
190         ENTRY;
191
192         OBD_ALLOC(ld, sizeof(*ld));
193         if (!ld)
194                 RETURN(-ENOMEM);
195
196         rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
197                           &valsize, ld);
198         if (rc)
199                 GOTO(out, rc);
200
201         /* The size of the LOV target table may have increased. */
202         size = ld->ld_tgt_count * sizeof(obd_id);
203         if ((mds->mds_lov_objids_size == 0) ||
204             (size > mds->mds_lov_objids_size)) {
205                 obd_id *ids;
206
207                 /* add room by powers of 2 */
208                 size = 1;
209                 while (size < ld->ld_tgt_count)
210                         size = size << 1;
211                 size = size * sizeof(obd_id);
212
213                 OBD_ALLOC(ids, size);
214                 if (ids == NULL)
215                         GOTO(out, rc = -ENOMEM);
216                 memset(ids, 0, size);
217                 if (mds->mds_lov_objids_size) {
218                         obd_id *old_ids = mds->mds_lov_objids;
219                         memcpy(ids, mds->mds_lov_objids,
220                                mds->mds_lov_objids_size);
221                         mds->mds_lov_objids = ids;
222                         OBD_FREE(old_ids, mds->mds_lov_objids_size);
223                 }
224                 mds->mds_lov_objids = ids;
225                 mds->mds_lov_objids_size = size;
226         }
227
228         /* Don't change the mds_lov_desc until the objids size matches the
229            count (paranoia) */
230         mds->mds_lov_desc = *ld;
231         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
232                mds->mds_lov_desc.ld_tgt_count);
233
234         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
235                         max(mds->mds_lov_desc.ld_tgt_count,
236                             mds->mds_lov_objids_in_file));
237         mds->mds_max_mdsize = lov_mds_md_size(stripes);
238         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
239         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
240                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
241                stripes);
242
243         /* If we added a target we have to reconnect the llogs */
244         /* We only _need_ to do this at first add (idx), or the first time
245            after recovery.  However, it should now be safe to call anytime. */
246         llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
247
248         /*XXX this notifies the MDD until lov handling use old mds code */
249         if (obd->obd_upcall.onu_owner) {
250                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
251                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
252                                                  obd->obd_upcall.onu_owner);
253         }
254 out:
255         OBD_FREE(ld, sizeof(*ld));
256         RETURN(rc);
257 }
258
259
260 #define MDSLOV_NO_INDEX -1
261
262 /* Inform MDS about new/updated target */
263 static int mds_lov_update_mds(struct obd_device *obd,
264                               struct obd_device *watched,
265                               __u32 idx, struct obd_uuid *uuid)
266 {
267         struct mds_obd *mds = &obd->u.mds;
268         int old_count;
269         int rc = 0;
270         ENTRY;
271
272         /* Don't let anyone else mess with mds_lov_objids now */
273         mutex_down(&obd->obd_dev_sem);
274
275         old_count = mds->mds_lov_desc.ld_tgt_count;
276         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
277         if (rc) 
278                 GOTO(out, rc);
279
280         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
281                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
282                mds->mds_lov_desc.ld_tgt_count);
283
284         /* idx is set as data from lov_notify. */
285         if (idx == MDSLOV_NO_INDEX || obd->obd_recovering) 
286                 GOTO(out, rc);
287                 
288         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
289                 CERROR("index %d > count %d!\n", idx, 
290                        mds->mds_lov_desc.ld_tgt_count);
291                 GOTO(out, rc = -EINVAL);
292         }
293
294         if (idx >= mds->mds_lov_objids_in_file) {
295                 /* We never read this lastid; ask the osc */
296                 obd_id lastid;
297                 __u32 size = sizeof(lastid);
298                 rc = obd_get_info(watched->obd_self_export, strlen("last_id"),
299                                   "last_id", &size, &lastid);
300                 if (rc)
301                         GOTO(out, rc);
302                 mds->mds_lov_objids[idx] = lastid;
303                 mds->mds_lov_objids_dirty = 1;
304                 mds_lov_write_objids(obd);
305         } else {
306                 /* We have read this lastid from disk; tell the osc.
307                    Don't call this during recovery. */
308                 rc = mds_lov_set_nextid(obd);
309                 if (rc) {
310                         CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
311                         /* Don't abort the rest of the sync */
312                         rc = 0;
313                 }
314         }
315
316         CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
317                mds->mds_lov_objids[idx], idx, rc);
318 out:
319         mutex_up(&obd->obd_dev_sem);
320         RETURN(rc);
321 }
322
323 /* update the LOV-OSC knowledge of the last used object id's */
324 int mds_lov_connect(struct obd_device *obd, char * lov_name)
325 {
326         struct mds_obd *mds = &obd->u.mds;
327         struct lustre_handle conn = {0,};
328         struct obd_connect_data *data;
329         int rc, i;
330         ENTRY;
331
332         if (IS_ERR(mds->mds_osc_obd))
333                 RETURN(PTR_ERR(mds->mds_osc_obd));
334
335         if (mds->mds_osc_obd)
336                 RETURN(0);
337
338         mds->mds_osc_obd = class_name2obd(lov_name);
339         if (!mds->mds_osc_obd) {
340                 CERROR("MDS cannot locate LOV %s\n", lov_name);
341                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
342                 RETURN(-ENOTCONN);
343         }
344
345         OBD_ALLOC(data, sizeof(*data));
346         if (data == NULL)
347                 RETURN(-ENOMEM);
348         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
349                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
350                                   OBD_CONNECT_OSS_CAPA;
351 #ifdef HAVE_LRU_RESIZE_SUPPORT
352         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
353 #endif
354         data->ocd_version = LUSTRE_VERSION_CODE;
355         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
356         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
357         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
358         OBD_FREE(data, sizeof(*data));
359         if (rc) {
360                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
361                 mds->mds_osc_obd = ERR_PTR(rc);
362                 RETURN(rc);
363         }
364         mds->mds_osc_exp = class_conn2export(&conn);
365
366         rc = obd_register_observer(mds->mds_osc_obd, obd);
367         if (rc) {
368                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
369                        lov_name, rc);
370                 GOTO(err_discon, rc);
371         }
372
373         /* Deny new client connections until we are sure we have some OSTs */
374         obd->obd_no_conn = 1;
375
376         mutex_down(&obd->obd_dev_sem);
377         rc = mds_lov_read_objids(obd);
378         if (rc) {
379                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
380                 GOTO(err_reg, rc);
381         }
382
383         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
384         if (rc)
385                 GOTO(err_reg, rc);
386
387         /* tgt_count may be 0! */
388         rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
389         if (rc) {
390                 CERROR("failed to initialize catalog %d\n", rc);
391                 GOTO(err_reg, rc);
392         }
393
394         /* If we're mounting this code for the first time on an existing FS,
395          * we need to populate the objids array from the real OST values */
396         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
397                 int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
398                 rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
399                                   "last_id", &size, mds->mds_lov_objids);
400                 if (!rc) {
401                         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
402                                 CWARN("got last object "LPU64" from OST %d\n",
403                                       mds->mds_lov_objids[i], i);
404                         mds->mds_lov_objids_dirty = 1;
405                         rc = mds_lov_write_objids(obd);
406                         if (rc)
407                                 CERROR("got last objids from OSTs, but error "
408                                        "writing objids file: %d\n", rc);
409                 }
410         }
411         mutex_up(&obd->obd_dev_sem);
412
413         /* I want to see a callback happen when the OBD moves to a
414          * "For General Use" state, and that's when we'll call
415          * set_nextid().  The class driver can help us here, because
416          * it can use the obd_recovering flag to determine when the
417          * the OBD is full available. */
418         /* MDD device will care about that
419         if (!obd->obd_recovering)
420                 rc = mds_postrecov(obd);
421          */
422         RETURN(rc);
423
424 err_reg:
425         mutex_up(&obd->obd_dev_sem);
426         obd_register_observer(mds->mds_osc_obd, NULL);
427 err_discon:
428         obd_disconnect(mds->mds_osc_exp);
429         mds->mds_osc_exp = NULL;
430         mds->mds_osc_obd = ERR_PTR(rc);
431         RETURN(rc);
432 }
433
434 int mds_lov_disconnect(struct obd_device *obd)
435 {
436         struct mds_obd *mds = &obd->u.mds;
437         int rc = 0;
438         ENTRY;
439
440         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
441                 obd_register_observer(mds->mds_osc_obd, NULL);
442
443                 /* The actual disconnect of the mds_lov will be called from
444                  * class_disconnect_exports from mds_lov_clean. So we have to
445                  * ensure that class_cleanup doesn't fail due to the extra ref
446                  * we're holding now. The mechanism to do that already exists -
447                  * the obd_force flag. We'll drop the final ref to the
448                  * mds_osc_exp in mds_cleanup. */
449                 mds->mds_osc_obd->obd_force = 1;
450         }
451
452         RETURN(rc);
453 }
454
455 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
456                   void *karg, void *uarg)
457 {
458         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
459         struct obd_device *obd = exp->exp_obd;
460         struct mds_obd *mds = &obd->u.mds;
461         struct obd_ioctl_data *data = karg;
462         struct lvfs_run_ctxt saved;
463         int rc = 0;
464
465         ENTRY;
466         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
467
468         switch (cmd) {
469         case OBD_IOC_RECORD: {
470                 char *name = data->ioc_inlbuf1;
471                 if (mds->mds_cfg_llh)
472                         RETURN(-EBUSY);
473
474                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
475                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
476                                  &mds->mds_cfg_llh, NULL, name);
477                 if (rc == 0)
478                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
479                                          &cfg_uuid);
480                 else
481                         mds->mds_cfg_llh = NULL;
482                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
483
484                 RETURN(rc);
485         }
486
487         case OBD_IOC_ENDRECORD: {
488                 if (!mds->mds_cfg_llh)
489                         RETURN(-EBADF);
490
491                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
492                 rc = llog_close(mds->mds_cfg_llh);
493                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
494
495                 mds->mds_cfg_llh = NULL;
496                 RETURN(rc);
497         }
498
499         case OBD_IOC_CLEAR_LOG: {
500                 char *name = data->ioc_inlbuf1;
501                 if (mds->mds_cfg_llh)
502                         RETURN(-EBUSY);
503
504                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
505                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
506                                  &mds->mds_cfg_llh, NULL, name);
507                 if (rc == 0) {
508                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
509                                          NULL);
510
511                         rc = llog_destroy(mds->mds_cfg_llh);
512                         llog_free_handle(mds->mds_cfg_llh);
513                 }
514                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
515
516                 mds->mds_cfg_llh = NULL;
517                 RETURN(rc);
518         }
519
520         case OBD_IOC_DORECORD: {
521                 char *cfg_buf;
522                 struct llog_rec_hdr rec;
523                 if (!mds->mds_cfg_llh)
524                         RETURN(-EBADF);
525
526                 rec.lrh_len = llog_data_len(data->ioc_plen1);
527
528                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
529                         rec.lrh_type = OBD_CFG_REC;
530                 } else {
531                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
532                         RETURN(-EINVAL);
533                 }
534
535                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
536                 if (cfg_buf == NULL)
537                         RETURN(-EINVAL);
538                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
539                 if (rc) {
540                         OBD_FREE(cfg_buf, data->ioc_plen1);
541                         RETURN(rc);
542                 }
543
544                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
545                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
546                                     cfg_buf, -1);
547                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
548
549                 OBD_FREE(cfg_buf, data->ioc_plen1);
550                 RETURN(rc);
551         }
552
553         case OBD_IOC_PARSE: {
554                 struct llog_ctxt *ctxt =
555                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
556                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
557                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
558                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
559                 if (rc)
560                         RETURN(rc);
561
562                 RETURN(rc);
563         }
564
565         case OBD_IOC_DUMP_LOG: {
566                 struct llog_ctxt *ctxt =
567                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
568                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
569                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
570                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
571                 if (rc)
572                         RETURN(rc);
573
574                 RETURN(rc);
575         }
576
577         case OBD_IOC_SYNC: {
578                 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
579                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
580                 RETURN(rc);
581         }
582
583         case OBD_IOC_SET_READONLY: {
584                 void *handle;
585                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
586                 BDEVNAME_DECLARE_STORAGE(tmp);
587                 CERROR("*** setting device %s read-only ***\n",
588                        ll_bdevname(obd->u.obt.obt_sb, tmp));
589
590                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
591                 if (!IS_ERR(handle))
592                         rc = fsfilt_commit(obd, inode, handle, 1);
593
594                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
595                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
596
597                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
598                 RETURN(0);
599         }
600
601         case OBD_IOC_CATLOGLIST: {
602                 int count = mds->mds_lov_desc.ld_tgt_count;
603                 rc = llog_catalog_list(obd, count, data);
604                 RETURN(rc);
605
606         }
607         case OBD_IOC_LLOG_CHECK:
608         case OBD_IOC_LLOG_CANCEL:
609         case OBD_IOC_LLOG_REMOVE: {
610                 struct llog_ctxt *ctxt =
611                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
612                 int rc2;
613                 __u32 group;
614
615                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
616                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
617                 rc = llog_ioctl(ctxt, cmd, data);
618                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
619                 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
620                 group = FILTER_GROUP_MDS0 + mds->mds_id;
621                 rc2 = obd_set_info_async(mds->mds_osc_exp,
622                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
623                                          sizeof(group), &group, NULL);
624                 if (!rc)
625                         rc = rc2;
626                 RETURN(rc);
627         }
628         case OBD_IOC_LLOG_INFO:
629         case OBD_IOC_LLOG_PRINT: {
630                 struct llog_ctxt *ctxt =
631                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
632
633                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
634                 rc = llog_ioctl(ctxt, cmd, data);
635                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
636
637                 RETURN(rc);
638         }
639
640         case OBD_IOC_ABORT_RECOVERY:
641                 CERROR("aborting recovery for device %s\n", obd->obd_name);
642                 target_stop_recovery_thread(obd);
643                 RETURN(0);
644
645         default:
646                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
647                 RETURN(-EINVAL);
648         }
649         RETURN(0);
650
651 }
652
653 /* Collect the preconditions we need to allow client connects */
654 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
655 {
656         if (flag & CONFIG_LOG)
657                 obd->u.mds.mds_fl_cfglog = 1;
658         if (flag & CONFIG_SYNC)
659                 obd->u.mds.mds_fl_synced = 1;
660         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
661                 /* Open for clients */
662                 obd->obd_no_conn = 0;
663 }
664
665 struct mds_lov_sync_info {
666         struct obd_device *mlsi_obd;     /* the lov device to sync */
667         struct obd_device *mlsi_watched; /* target osc */
668         __u32              mlsi_index;   /* index of target */
669 };
670
671 static int mds_propagate_capa_keys(struct mds_obd *mds)
672 {
673         struct lustre_capa_key *key;
674         int i, rc = 0;
675
676         ENTRY;
677
678         if (!mds->mds_capa_keys)
679                 RETURN(0);
680
681         for (i = 0; i < 2; i++) {
682                 key = &mds->mds_capa_keys[i];
683                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
684
685                 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
686                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
687                 if (rc) {
688                         DEBUG_CAPA_KEY(D_ERROR, key,
689                                        "propagate failed (rc = %d) for", rc);
690                         RETURN(rc);
691                 }
692         }
693
694         RETURN(0);
695 }
696
697 /* We only sync one osc at a time, so that we don't have to hold
698    any kind of lock on the whole mds_lov_desc, which may change
699    (grow) as a result of mds_lov_add_ost.  This also avoids any
700    kind of mismatch between the lov_desc and the mds_lov_desc,
701    which are not in lock-step during lov_add_obd */
702 static int __mds_lov_synchronize(void *data)
703 {
704         struct mds_lov_sync_info *mlsi = data;
705         struct obd_device *obd = mlsi->mlsi_obd;
706         struct obd_device *watched = mlsi->mlsi_watched;
707         struct mds_obd *mds = &obd->u.mds;
708         struct obd_uuid *uuid;
709         __u32  idx = mlsi->mlsi_index;
710         struct mds_group_info mgi;
711         int rc = 0;
712         ENTRY;
713
714         OBD_FREE(mlsi, sizeof(*mlsi));
715
716         LASSERT(obd);
717         LASSERT(watched);
718         uuid = &watched->u.cli.cl_target_uuid;
719         LASSERT(uuid);
720
721         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
722
723         rc = mds_lov_update_mds(obd, watched, idx, uuid);
724         if (rc != 0) {
725                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
726                 GOTO(out, rc);
727         }
728         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
729         mgi.uuid = uuid;
730
731         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
732                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
733         if (rc != 0)
734                 GOTO(out, rc);
735
736         /* propagate capability keys */
737         rc = mds_propagate_capa_keys(mds);
738         if (rc)
739                 GOTO(out, rc);
740
741         rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
742                           mds->mds_lov_desc.ld_tgt_count,
743                           NULL, NULL, uuid);
744
745         if (rc != 0) {
746                 CERROR("%s failed at llog_origin_connect: %d\n",
747                        obd_uuid2str(uuid), rc);
748                 GOTO(out, rc);
749         }
750
751         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
752               obd->obd_name, obd_uuid2str(uuid));
753         /*
754          * FIXME: this obd_stopping was useless, 
755          * since obd in mdt layer was set
756          */
757         if (obd->obd_stopping)
758                 GOTO(out, rc = -ENODEV);
759
760         rc = mds_lov_clear_orphans(mds, uuid);
761         if (rc != 0) {
762                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
763                        obd_uuid2str(uuid), rc);
764                 GOTO(out, rc);
765         }
766         
767         if (obd->obd_upcall.onu_owner) {
768                 /*
769                  * This is a hack for mds_notify->mdd_notify. When the mds obd
770                  * in mdd is removed, This hack should be removed.
771                  */
772                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
773                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
774                                                  obd->obd_upcall.onu_owner);
775         }
776         EXIT;
777 out:
778         if (rc) {
779                 /* Deactivate it for safety */
780                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
781                        rc);
782                 if (!obd->obd_stopping && mds->mds_osc_obd &&
783                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping) 
784                         obd_notify(mds->mds_osc_obd, watched,
785                                    OBD_NOTIFY_INACTIVE, NULL);
786         }
787
788         class_decref(obd);
789         return rc;
790 }
791
792 int mds_lov_synchronize(void *data)
793 {
794         struct mds_lov_sync_info *mlsi = data;
795         char name[20];
796
797         if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
798                 /* There is still a watched target,
799                 but we don't know its index */
800                 sprintf(name, "ll_sync_tgt");
801         else
802                 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
803         ptlrpc_daemonize(name);
804
805         RETURN(__mds_lov_synchronize(data));
806 }
807
808 int mds_lov_start_synchronize(struct obd_device *obd,
809                               struct obd_device *watched,
810                               void *data, int nonblock)
811 {
812         struct mds_lov_sync_info *mlsi;
813         int rc;
814
815         ENTRY;
816
817         LASSERT(watched);
818
819         OBD_ALLOC(mlsi, sizeof(*mlsi));
820         if (mlsi == NULL)
821                 RETURN(-ENOMEM);
822
823         mlsi->mlsi_obd = obd;
824         mlsi->mlsi_watched = watched;
825         if (data)
826                 mlsi->mlsi_index = *(__u32 *)data;
827         else
828                 mlsi->mlsi_index = MDSLOV_NO_INDEX;
829
830         /* Although class_export_get(obd->obd_self_export) would lock
831            the MDS in place, since it's only a self-export
832            it doesn't lock the LOV in place.  The LOV can be disconnected
833            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
834            Simply taking an export ref on the LOV doesn't help, because it's
835            still disconnected. Taking an obd reference insures that we don't
836            disconnect the LOV.  This of course means a cleanup won't
837            finish for as long as the sync is blocking. */
838         class_incref(obd);
839
840         if (nonblock) {
841                 /* Synchronize in the background */
842                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
843                                        CLONE_VM | CLONE_FILES);
844                 if (rc < 0) {
845                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
846                                obd->obd_name, rc);
847                         class_decref(obd);
848                 } else {
849                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
850                                "thread=%d\n", obd->obd_name,
851                                mlsi->mlsi_index, rc);
852                         rc = 0;
853                 }
854         } else {
855                 rc = __mds_lov_synchronize((void *)mlsi);
856         }
857
858         RETURN(rc);
859 }
860
861 int mds_notify(struct obd_device *obd, struct obd_device *watched,
862                enum obd_notify_event ev, void *data)
863 {
864         int rc = 0;
865         ENTRY;
866
867         switch (ev) {
868         /* We only handle these: */
869         case OBD_NOTIFY_ACTIVE:
870         case OBD_NOTIFY_SYNC:
871         case OBD_NOTIFY_SYNC_NONBLOCK:
872                 break;
873         case OBD_NOTIFY_CONFIG:
874                 mds_allow_cli(obd, (unsigned int)data);
875         default:
876                 RETURN(0);
877         }
878
879         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
880         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
881                 CERROR("unexpected notification of %s %s!\n",
882                        watched->obd_type->typ_name, watched->obd_name);
883                 RETURN(-EINVAL);
884         }
885
886         if (obd->obd_recovering) {
887                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
888                       obd->obd_name,
889                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
890                 /* We still have to fix the lov descriptor for ost's added
891                    after the mdt in the config log.  They didn't make it into
892                    mds_lov_connect. */
893                 mutex_down(&obd->obd_dev_sem);
894                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
895                 if (rc) {
896                         mutex_up(&obd->obd_dev_sem);
897                         RETURN(rc);
898                 }
899                 /* We should update init llog here too for replay unlink and 
900                  * possiable llog init race when recovery complete */
901                 llog_cat_initialize(obd, NULL, 
902                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
903                                     &watched->u.cli.cl_target_uuid);
904                 mutex_up(&obd->obd_dev_sem);
905                 mds_allow_cli(obd, CONFIG_SYNC);
906                 RETURN(rc);
907         }
908
909         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
910         rc = mds_lov_start_synchronize(obd, watched, data,
911                                        !(ev == OBD_NOTIFY_SYNC));
912
913         lquota_recovery(mds_quota_interface_ref, obd);
914
915         RETURN(rc);
916 }
917
918 /* Convert the on-disk LOV EA structre.
919  * We always try to convert from an old LOV EA format to the common in-memory
920  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
921  * then convert back to the new on-disk format and save it back to disk
922  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
923  * to convert it each time this inode is accessed.
924  *
925  * This function is a bit interesting in the error handling.  We can safely
926  * ship the old lmm to the client in case of failure, since it uses the same
927  * obd_unpackmd() code and can do the conversion if the MDS fails for some
928  * reason.  We will not delete the old lmm data until we have written the
929  * new format lmm data in fsfilt_set_md(). */
930 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
931                        struct lov_mds_md *lmm, int lmm_size)
932 {
933         struct lov_stripe_md *lsm = NULL;
934         void *handle;
935         int rc, err;
936         ENTRY;
937
938         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
939             le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
940                 RETURN(0);
941
942         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
943                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
944                LOV_MAGIC);
945
946         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
947         if (rc < 0)
948                 GOTO(conv_end, rc);
949
950         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
951         if (rc < 0)
952                 GOTO(conv_free, rc);
953         lmm_size = rc;
954
955         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
956         if (IS_ERR(handle)) {
957                 rc = PTR_ERR(handle);
958                 GOTO(conv_free, rc);
959         }
960
961         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
962
963         err = fsfilt_commit(obd, inode, handle, 0);
964         if (!rc)
965                 rc = err ? err : lmm_size;
966         GOTO(conv_free, rc);
967 conv_free:
968         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
969 conv_end:
970         return rc;
971 }
972
973 void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
974                          struct lov_desc *desc)
975 {
976         int i;
977         for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
978                 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
979                         le64_to_cpu(lmm->lmm_objects[i].l_object_id);
980         }
981 }
982 EXPORT_SYMBOL(mds_objids_from_lmm);