Whamcloud - gitweb
0ef7084dcf6bf2d2bce152c9b6fd3a74728fdeb0
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
38 #include <obd_lov.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
41
42 #include "mds_internal.h"
43
44 void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
45 {
46         struct mds_obd *mds = &obd->u.mds;
47         int i;
48         ENTRY;
49
50         lock_kernel();
51         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
52                 if (ids[i] > (mds->mds_lov_objids)[i]) {
53                         (mds->mds_lov_objids)[i] = ids[i];
54                         mds->mds_lov_objids_dirty = 1;
55                 }
56         unlock_kernel();
57         EXIT;
58 }
59 EXPORT_SYMBOL(mds_lov_update_objids);
60
61 static int mds_lov_read_objids(struct obd_device *obd)
62 {
63         struct mds_obd *mds = &obd->u.mds;
64         obd_id *ids;
65         loff_t off = 0;
66         int i, rc, size;
67         ENTRY;
68
69         LASSERT(!mds->mds_lov_objids_size);
70         LASSERT(!mds->mds_lov_objids_dirty);
71
72         /* Read everything in the file, even if our current lov desc
73            has fewer targets. Old targets not in the lov descriptor
74            during mds setup may still have valid objids. */
75         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
76         if (size == 0)
77                 RETURN(0);
78
79         OBD_ALLOC(ids, size);
80         if (ids == NULL)
81                 RETURN(-ENOMEM);
82         mds->mds_lov_objids = ids;
83         mds->mds_lov_objids_size = size;
84
85         rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off);
86         if (rc < 0) {
87                 CERROR("Error reading objids %d\n", rc);
88                 RETURN(rc);
89         }
90
91         mds->mds_lov_objids_in_file = size / sizeof(*ids);
92
93         for (i = 0; i < mds->mds_lov_objids_in_file; i++) {
94                 CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
95                        mds->mds_lov_objids[i], i);
96         }
97         RETURN(0);
98 }
99
100 int mds_lov_write_objids(struct obd_device *obd)
101 {
102         struct mds_obd *mds = &obd->u.mds;
103         loff_t off = 0;
104         int i, rc, tgts;
105         ENTRY;
106
107         if (!mds->mds_lov_objids_dirty)
108                 RETURN(0);
109
110         tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file);
111
112         if (!tgts)
113                 RETURN(0);
114
115         for (i = 0; i < tgts; i++)
116                 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
117                        mds->mds_lov_objids[i], i);
118
119         rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp,
120                                  mds->mds_lov_objids, tgts * sizeof(obd_id),
121                                  &off, 0);
122         if (rc >= 0) {
123                 mds->mds_lov_objids_dirty = 0;
124                 rc = 0;
125         }
126
127         RETURN(rc);
128 }
129 EXPORT_SYMBOL(mds_lov_write_objids);
130
131 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
132 {
133         int rc;
134         struct obdo oa;
135         struct obd_trans_info oti = {0};
136         struct lov_stripe_md  *empty_ea = NULL;
137         ENTRY;
138
139         LASSERT(mds->mds_lov_objids != NULL);
140
141         /* This create will in fact either create or destroy:  If the OST is
142          * missing objects below this ID, they will be created.  If it finds
143          * objects above this ID, they will be removed. */
144         memset(&oa, 0, sizeof(oa));
145         oa.o_flags = OBD_FL_DELORPHAN;
146         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
147         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
148         if (ost_uuid != NULL) {
149                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
150                 oa.o_valid |= OBD_MD_FLINLINE;
151         }
152         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
153
154         RETURN(rc);
155 }
156
157 /* update the LOV-OSC knowledge of the last used object id's */
158 int mds_lov_set_nextid(struct obd_device *obd)
159 {
160         struct mds_obd *mds = &obd->u.mds;
161         int rc;
162         ENTRY;
163
164         LASSERT(!obd->obd_recovering);
165
166         LASSERT(mds->mds_lov_objids != NULL);
167
168         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID),
169                                 KEY_NEXT_ID,
170                                 mds->mds_lov_desc.ld_tgt_count *
171                                 sizeof(*mds->mds_lov_objids),
172                                 mds->mds_lov_objids, NULL);
173
174         if (rc)
175                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
176                         obd->obd_name, rc);
177
178         RETURN(rc);
179 }
180
181 /* Update the lov desc for a new size lov. */
182 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
183 {
184         struct mds_obd *mds = &obd->u.mds;
185         struct lov_desc *ld;
186         __u32 size, stripes, valsize = sizeof(mds->mds_lov_desc);
187         int rc = 0;
188         ENTRY;
189
190         OBD_ALLOC(ld, sizeof(*ld));
191         if (!ld)
192                 RETURN(-ENOMEM);
193
194         rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC,
195                           &valsize, ld);
196         if (rc)
197                 GOTO(out, rc);
198
199         /* The size of the LOV target table may have increased. */
200         size = ld->ld_tgt_count * sizeof(obd_id);
201         if ((mds->mds_lov_objids_size == 0) ||
202             (size > mds->mds_lov_objids_size)) {
203                 obd_id *ids;
204
205                 /* add room by powers of 2 */
206                 size = 1;
207                 while (size < ld->ld_tgt_count)
208                         size = size << 1;
209                 size = size * sizeof(obd_id);
210
211                 OBD_ALLOC(ids, size);
212                 if (ids == NULL)
213                         GOTO(out, rc = -ENOMEM);
214                 memset(ids, 0, size);
215                 if (mds->mds_lov_objids_size) {
216                         obd_id *old_ids = mds->mds_lov_objids;
217                         memcpy(ids, mds->mds_lov_objids,
218                                mds->mds_lov_objids_size);
219                         mds->mds_lov_objids = ids;
220                         OBD_FREE(old_ids, mds->mds_lov_objids_size);
221                 }
222                 mds->mds_lov_objids = ids;
223                 mds->mds_lov_objids_size = size;
224         }
225
226         /* Don't change the mds_lov_desc until the objids size matches the
227            count (paranoia) */
228         mds->mds_lov_desc = *ld;
229         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
230                mds->mds_lov_desc.ld_tgt_count);
231
232         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
233                         max(mds->mds_lov_desc.ld_tgt_count,
234                             mds->mds_lov_objids_in_file));
235         mds->mds_max_mdsize = lov_mds_md_size(stripes);
236         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
237         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
238                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
239                stripes);
240
241         /* If we added a target we have to reconnect the llogs */
242         /* We only _need_ to do this at first add (idx), or the first time
243            after recovery.  However, it should now be safe to call anytime. */
244         mutex_down(&obd->obd_dev_sem);
245         llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
246         mutex_up(&obd->obd_dev_sem);
247
248         /*XXX this notifies the MDD until lov handling use old mds code */
249         if (obd->obd_upcall.onu_owner) {
250                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
251                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
252                                                  obd->obd_upcall.onu_owner);
253         }
254 out:
255         OBD_FREE(ld, sizeof(*ld));
256         RETURN(rc);
257 }
258
259
260 #define MDSLOV_NO_INDEX -1
261
262 /* Inform MDS about new/updated target */
263 static int mds_lov_update_mds(struct obd_device *obd,
264                               struct obd_device *watched,
265                               __u32 idx, struct obd_uuid *uuid)
266 {
267         struct mds_obd *mds = &obd->u.mds;
268         int old_count;
269         int rc = 0;
270         ENTRY;
271
272         old_count = mds->mds_lov_desc.ld_tgt_count;
273         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
274         if (rc)
275                 RETURN(rc);
276
277         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
278                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
279                mds->mds_lov_desc.ld_tgt_count);
280
281         /* idx is set as data from lov_notify. */
282         if (idx != MDSLOV_NO_INDEX && !obd->obd_recovering) {
283                 if (idx >= mds->mds_lov_desc.ld_tgt_count) {
284                         CERROR("index %d > count %d!\n", idx,
285                                mds->mds_lov_desc.ld_tgt_count);
286                         RETURN(-EINVAL);
287                 }
288
289                 if (idx >= mds->mds_lov_objids_in_file) {
290                         /* We never read this lastid; ask the osc */
291                         obd_id lastid;
292                         __u32 size = sizeof(lastid);
293                         rc = obd_get_info(watched->obd_self_export,
294                                           strlen("last_id"),
295                                           "last_id", &size, &lastid);
296                         if (rc)
297                                 RETURN(rc);
298                         mds->mds_lov_objids[idx] = lastid;
299                         mds->mds_lov_objids_dirty = 1;
300                         mds_lov_write_objids(obd);
301                 } else {
302                         /* We have read this lastid from disk; tell the osc.
303                            Don't call this during recovery. */
304                         rc = mds_lov_set_nextid(obd);
305                 }
306
307                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n",
308                       mds->mds_lov_objids[idx], idx);
309         }
310
311         RETURN(rc);
312 }
313
314 /* update the LOV-OSC knowledge of the last used object id's */
315 int mds_lov_connect(struct obd_device *obd, char * lov_name)
316 {
317         struct mds_obd *mds = &obd->u.mds;
318         struct lustre_handle conn = {0,};
319         struct obd_connect_data *data;
320         int rc, i;
321         ENTRY;
322
323         if (IS_ERR(mds->mds_osc_obd))
324                 RETURN(PTR_ERR(mds->mds_osc_obd));
325
326         if (mds->mds_osc_obd)
327                 RETURN(0);
328
329         mds->mds_osc_obd = class_name2obd(lov_name);
330         if (!mds->mds_osc_obd) {
331                 CERROR("MDS cannot locate LOV %s\n", lov_name);
332                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
333                 RETURN(-ENOTCONN);
334         }
335
336         OBD_ALLOC(data, sizeof(*data));
337         if (data == NULL)
338                 RETURN(-ENOMEM);
339         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
340                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
341                                   OBD_CONNECT_OSS_CAPA;
342         data->ocd_version = LUSTRE_VERSION_CODE;
343         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
344         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
345         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
346         OBD_FREE(data, sizeof(*data));
347         if (rc) {
348                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
349                 mds->mds_osc_obd = ERR_PTR(rc);
350                 RETURN(rc);
351         }
352         mds->mds_osc_exp = class_conn2export(&conn);
353
354         rc = obd_register_observer(mds->mds_osc_obd, obd);
355         if (rc) {
356                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
357                        lov_name, rc);
358                 GOTO(err_discon, rc);
359         }
360
361         /* Deny new client connections until we are sure we have some OSTs */
362         obd->obd_no_conn = 1;
363
364         rc = mds_lov_read_objids(obd);
365         if (rc) {
366                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
367                 GOTO(err_reg, rc);
368         }
369
370         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
371         if (rc)
372                 GOTO(err_reg, rc);
373
374         /* tgt_count may be 0! */
375         rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
376         if (rc) {
377                 CERROR("failed to initialize catalog %d\n", rc);
378                 GOTO(err_reg, rc);
379         }
380
381         /* If we're mounting this code for the first time on an existing FS,
382          * we need to populate the objids array from the real OST values */
383         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) {
384                 int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count;
385                 rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"),
386                                   "last_id", &size, mds->mds_lov_objids);
387                 if (!rc) {
388                         for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++)
389                                 CWARN("got last object "LPU64" from OST %d\n",
390                                       mds->mds_lov_objids[i], i);
391                         mds->mds_lov_objids_dirty = 1;
392                         rc = mds_lov_write_objids(obd);
393                         if (rc)
394                                 CERROR("got last objids from OSTs, but error "
395                                        "writing objids file: %d\n", rc);
396                 }
397         }
398
399         /* I want to see a callback happen when the OBD moves to a
400          * "For General Use" state, and that's when we'll call
401          * set_nextid().  The class driver can help us here, because
402          * it can use the obd_recovering flag to determine when the
403          * the OBD is full available. */
404         /* MDD device will care about that
405         if (!obd->obd_recovering)
406                 rc = mds_postrecov(obd);
407          */
408         RETURN(rc);
409
410 err_reg:
411         obd_register_observer(mds->mds_osc_obd, NULL);
412 err_discon:
413         obd_disconnect(mds->mds_osc_exp);
414         mds->mds_osc_exp = NULL;
415         mds->mds_osc_obd = ERR_PTR(rc);
416         RETURN(rc);
417 }
418
419 int mds_lov_disconnect(struct obd_device *obd)
420 {
421         struct mds_obd *mds = &obd->u.mds;
422         int rc = 0;
423         ENTRY;
424
425         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
426                 obd_register_observer(mds->mds_osc_obd, NULL);
427
428                 /* The actual disconnect of the mds_lov will be called from
429                  * class_disconnect_exports from mds_lov_clean. So we have to
430                  * ensure that class_cleanup doesn't fail due to the extra ref
431                  * we're holding now. The mechanism to do that already exists -
432                  * the obd_force flag. We'll drop the final ref to the
433                  * mds_osc_exp in mds_cleanup. */
434                 mds->mds_osc_obd->obd_force = 1;
435         }
436
437         RETURN(rc);
438 }
439
440 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
441                   void *karg, void *uarg)
442 {
443         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
444         struct obd_device *obd = exp->exp_obd;
445         struct mds_obd *mds = &obd->u.mds;
446         struct obd_ioctl_data *data = karg;
447         struct lvfs_run_ctxt saved;
448         int rc = 0;
449
450         ENTRY;
451         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
452
453         switch (cmd) {
454         case OBD_IOC_RECORD: {
455                 char *name = data->ioc_inlbuf1;
456                 if (mds->mds_cfg_llh)
457                         RETURN(-EBUSY);
458
459                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
460                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
461                                  &mds->mds_cfg_llh, NULL, name);
462                 if (rc == 0)
463                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
464                                          &cfg_uuid);
465                 else
466                         mds->mds_cfg_llh = NULL;
467                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
468
469                 RETURN(rc);
470         }
471
472         case OBD_IOC_ENDRECORD: {
473                 if (!mds->mds_cfg_llh)
474                         RETURN(-EBADF);
475
476                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
477                 rc = llog_close(mds->mds_cfg_llh);
478                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
479
480                 mds->mds_cfg_llh = NULL;
481                 RETURN(rc);
482         }
483
484         case OBD_IOC_CLEAR_LOG: {
485                 char *name = data->ioc_inlbuf1;
486                 if (mds->mds_cfg_llh)
487                         RETURN(-EBUSY);
488
489                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
490                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
491                                  &mds->mds_cfg_llh, NULL, name);
492                 if (rc == 0) {
493                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
494                                          NULL);
495
496                         rc = llog_destroy(mds->mds_cfg_llh);
497                         llog_free_handle(mds->mds_cfg_llh);
498                 }
499                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
500
501                 mds->mds_cfg_llh = NULL;
502                 RETURN(rc);
503         }
504
505         case OBD_IOC_DORECORD: {
506                 char *cfg_buf;
507                 struct llog_rec_hdr rec;
508                 if (!mds->mds_cfg_llh)
509                         RETURN(-EBADF);
510
511                 rec.lrh_len = llog_data_len(data->ioc_plen1);
512
513                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
514                         rec.lrh_type = OBD_CFG_REC;
515                 } else {
516                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
517                         RETURN(-EINVAL);
518                 }
519
520                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
521                 if (cfg_buf == NULL)
522                         RETURN(-EINVAL);
523                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
524                 if (rc) {
525                         OBD_FREE(cfg_buf, data->ioc_plen1);
526                         RETURN(rc);
527                 }
528
529                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
530                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
531                                     cfg_buf, -1);
532                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
533
534                 OBD_FREE(cfg_buf, data->ioc_plen1);
535                 RETURN(rc);
536         }
537
538         case OBD_IOC_PARSE: {
539                 struct llog_ctxt *ctxt =
540                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
541                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
542                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
543                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
544                 if (rc)
545                         RETURN(rc);
546
547                 RETURN(rc);
548         }
549
550         case OBD_IOC_DUMP_LOG: {
551                 struct llog_ctxt *ctxt =
552                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
553                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
554                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
555                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
556                 if (rc)
557                         RETURN(rc);
558
559                 RETURN(rc);
560         }
561
562         case OBD_IOC_SYNC: {
563                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
564                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
565                 RETURN(rc);
566         }
567
568         case OBD_IOC_SET_READONLY: {
569                 void *handle;
570                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
571                 BDEVNAME_DECLARE_STORAGE(tmp);
572                 CERROR("*** setting device %s read-only ***\n",
573                        ll_bdevname(obd->u.obt.obt_sb, tmp));
574
575                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
576                 if (!IS_ERR(handle))
577                         rc = fsfilt_commit(obd, inode, handle, 1);
578
579                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
580                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
581
582                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
583                 RETURN(0);
584         }
585
586         case OBD_IOC_CATLOGLIST: {
587                 int count = mds->mds_lov_desc.ld_tgt_count;
588                 rc = llog_catalog_list(obd, count, data);
589                 RETURN(rc);
590
591         }
592         case OBD_IOC_LLOG_CHECK:
593         case OBD_IOC_LLOG_CANCEL:
594         case OBD_IOC_LLOG_REMOVE: {
595                 struct llog_ctxt *ctxt =
596                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
597                 int rc2;
598                 __u32 group;
599
600                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
601                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
602                 rc = llog_ioctl(ctxt, cmd, data);
603                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
604                 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
605                 group = FILTER_GROUP_MDS0 + mds->mds_id;
606                 rc2 = obd_set_info_async(mds->mds_osc_exp,
607                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
608                                          sizeof(group), &group, NULL);
609                 if (!rc)
610                         rc = rc2;
611                 RETURN(rc);
612         }
613         case OBD_IOC_LLOG_INFO:
614         case OBD_IOC_LLOG_PRINT: {
615                 struct llog_ctxt *ctxt =
616                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
617
618                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
619                 rc = llog_ioctl(ctxt, cmd, data);
620                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
621
622                 RETURN(rc);
623         }
624
625         case OBD_IOC_ABORT_RECOVERY:
626                 CERROR("aborting recovery for device %s\n", obd->obd_name);
627                 target_stop_recovery_thread(obd);
628                 RETURN(0);
629
630         default:
631                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
632                 RETURN(-EINVAL);
633         }
634         RETURN(0);
635
636 }
637
638 struct mds_lov_sync_info {
639         struct obd_device *mlsi_obd;     /* the lov device to sync */
640         struct obd_device *mlsi_watched; /* target osc */
641         __u32              mlsi_index;   /* index of target */
642 };
643
644 static int mds_propagate_capa_keys(struct mds_obd *mds)
645 {
646         struct lustre_capa_key *key;
647         int i, rc = 0;
648
649         ENTRY;
650
651         if (!mds->mds_capa_keys)
652                 RETURN(0);
653
654         for (i = 0; i < 2; i++) {
655                 key = &mds->mds_capa_keys[i];
656                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
657
658                 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
659                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
660                 if (rc) {
661                         DEBUG_CAPA_KEY(D_ERROR, key,
662                                        "propagate failed (rc = %d) for", rc);
663                         RETURN(rc);
664                 }
665         }
666
667         RETURN(0);
668 }
669
670 /* We only sync one osc at a time, so that we don't have to hold
671    any kind of lock on the whole mds_lov_desc, which may change
672    (grow) as a result of mds_lov_add_ost.  This also avoids any
673    kind of mismatch between the lov_desc and the mds_lov_desc,
674    which are not in lock-step during lov_add_obd */
675 static int __mds_lov_synchronize(void *data)
676 {
677         struct mds_lov_sync_info *mlsi = data;
678         struct obd_device *obd = mlsi->mlsi_obd;
679         struct obd_device *watched = mlsi->mlsi_watched;
680         struct mds_obd *mds = &obd->u.mds;
681         struct obd_uuid *uuid;
682         __u32  idx = mlsi->mlsi_index;
683         struct mds_group_info mgi;
684         int rc = 0;
685         ENTRY;
686
687         OBD_FREE(mlsi, sizeof(*mlsi));
688
689         LASSERT(obd);
690         LASSERT(watched);
691         uuid = &watched->u.cli.cl_target_uuid;
692         LASSERT(uuid);
693
694         rc = mds_lov_update_mds(obd, watched, idx, uuid);
695         if (rc != 0)
696                 GOTO(out, rc);
697         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
698         mgi.uuid = uuid;
699         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
700                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
701         if (rc != 0)
702                 GOTO(out, rc);
703
704         /* propagate capability keys */
705         rc = mds_propagate_capa_keys(mds);
706         if (rc)
707                 GOTO(out, rc);
708
709         rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
710                           mds->mds_lov_desc.ld_tgt_count,
711                           NULL, NULL, uuid);
712
713         if (rc != 0) {
714                 CERROR("%s: failed at llog_origin_connect: %d\n",
715                        obd->obd_name, rc);
716                 GOTO(out, rc);
717         }
718
719         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
720               obd->obd_name, obd_uuid2str(uuid));
721         /*
722          * FIXME: this obd_stopping was useless, 
723          * since obd in mdt layer was set
724          */
725         if (obd->obd_stopping)
726                 GOTO(out, rc = -ENODEV);
727
728         rc = mds_lov_clear_orphans(mds, uuid);
729         if (rc != 0) {
730                 CERROR("%s: failed at mds_lov_clear_orphans: %d\n",
731                        obd->obd_name, rc);
732                 GOTO(out, rc);
733         }
734         
735         if (obd->obd_upcall.onu_owner) {
736                 /*
737                  * This is a hack for mds_notify->mdd_notify. When the mds obd
738                  * in mdd is removed, This hack should be removed.
739                  */
740                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
741                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
742                                                  obd->obd_upcall.onu_owner);
743         }
744         EXIT;
745 out:
746         class_decref(obd);
747         return rc;
748 }
749
750 int mds_lov_synchronize(void *data)
751 {
752         struct mds_lov_sync_info *mlsi = data;
753         char name[20];
754
755         if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
756                 /* There is still a watched target,
757                 but we don't know its index */
758                 sprintf(name, "ll_sync_tgt");
759         else
760                 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
761         ptlrpc_daemonize(name);
762
763         RETURN(__mds_lov_synchronize(data));
764 }
765
766 int mds_lov_start_synchronize(struct obd_device *obd,
767                               struct obd_device *watched,
768                               void *data, int nonblock)
769 {
770         struct mds_lov_sync_info *mlsi;
771         int rc;
772
773         ENTRY;
774
775         LASSERT(watched);
776
777         OBD_ALLOC(mlsi, sizeof(*mlsi));
778         if (mlsi == NULL)
779                 RETURN(-ENOMEM);
780
781         mlsi->mlsi_obd = obd;
782         mlsi->mlsi_watched = watched;
783         if (data)
784                 mlsi->mlsi_index = *(__u32 *)data;
785         else
786                 mlsi->mlsi_index = MDSLOV_NO_INDEX;
787
788         /* Although class_export_get(obd->obd_self_export) would lock
789            the MDS in place, since it's only a self-export
790            it doesn't lock the LOV in place.  The LOV can be disconnected
791            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
792            Simply taking an export ref on the LOV doesn't help, because it's
793            still disconnected. Taking an obd reference insures that we don't
794            disconnect the LOV.  This of course means a cleanup won't
795            finish for as long as the sync is blocking. */
796         class_incref(obd);
797
798         if (nonblock) {
799                 /* Synchronize in the background */
800                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
801                                        CLONE_VM | CLONE_FILES);
802                 if (rc < 0) {
803                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
804                                obd->obd_name, rc);
805                         class_decref(obd);
806                 } else {
807                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
808                                "thread=%d\n", obd->obd_name,
809                                mlsi->mlsi_index, rc);
810                         rc = 0;
811                 }
812         } else {
813                 rc = __mds_lov_synchronize((void *)mlsi);
814         }
815
816         RETURN(rc);
817 }
818
819 int mds_notify(struct obd_device *obd, struct obd_device *watched,
820                enum obd_notify_event ev, void *data)
821 {
822         int rc = 0;
823         ENTRY;
824
825         switch (ev) {
826         /* We only handle these: */
827         case OBD_NOTIFY_ACTIVE:
828         case OBD_NOTIFY_SYNC:
829         case OBD_NOTIFY_SYNC_NONBLOCK:
830                 break;
831         case OBD_NOTIFY_CONFIG:
832                 /* Open for clients */
833                 obd->obd_no_conn = 0;
834         default:
835                 RETURN(0);
836         }
837
838         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
839         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
840                 CERROR("unexpected notification of %s %s!\n",
841                        watched->obd_type->typ_name, watched->obd_name);
842                 RETURN(-EINVAL);
843         }
844
845         if (obd->obd_recovering) {
846                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
847                       obd->obd_name,
848                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
849                 /* We still have to fix the lov descriptor for ost's added
850                    after the mdt in the config log.  They didn't make it into
851                    mds_lov_connect. */
852                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
853                 if (rc)
854                         RETURN(rc);
855                 /* We should update init llog here too for replay unlink and 
856                  * possiable llog init race when recovery complete */
857                 mutex_down(&obd->obd_dev_sem);
858                 llog_cat_initialize(obd, NULL, 
859                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
860                                     &watched->u.cli.cl_target_uuid);
861                 mutex_up(&obd->obd_dev_sem);
862                 RETURN(rc);
863         }
864
865         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
866         rc = mds_lov_start_synchronize(obd, watched, data,
867                                        !(ev == OBD_NOTIFY_SYNC));
868
869         lquota_recovery(mds_quota_interface_ref, obd);
870
871         RETURN(rc);
872 }
873
874 /* Convert the on-disk LOV EA structre.
875  * We always try to convert from an old LOV EA format to the common in-memory
876  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
877  * then convert back to the new on-disk format and save it back to disk
878  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
879  * to convert it each time this inode is accessed.
880  *
881  * This function is a bit interesting in the error handling.  We can safely
882  * ship the old lmm to the client in case of failure, since it uses the same
883  * obd_unpackmd() code and can do the conversion if the MDS fails for some
884  * reason.  We will not delete the old lmm data until we have written the
885  * new format lmm data in fsfilt_set_md(). */
886 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
887                        struct lov_mds_md *lmm, int lmm_size)
888 {
889         struct lov_stripe_md *lsm = NULL;
890         void *handle;
891         int rc, err;
892         ENTRY;
893
894         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
895             le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
896                 RETURN(0);
897
898         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
899                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
900                LOV_MAGIC);
901
902         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
903         if (rc < 0)
904                 GOTO(conv_end, rc);
905
906         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
907         if (rc < 0)
908                 GOTO(conv_free, rc);
909         lmm_size = rc;
910
911         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
912         if (IS_ERR(handle)) {
913                 rc = PTR_ERR(handle);
914                 GOTO(conv_free, rc);
915         }
916
917         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
918
919         err = fsfilt_commit(obd, inode, handle, 0);
920         if (!rc)
921                 rc = err ? err : lmm_size;
922         GOTO(conv_free, rc);
923 conv_free:
924         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
925 conv_end:
926         return rc;
927 }
928
929 void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
930                          struct lov_desc *desc)
931 {
932         int i;
933         for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
934                 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
935                         le64_to_cpu(lmm->lmm_objects[i].l_object_id);
936         }
937 }
938 EXPORT_SYMBOL(mds_objids_from_lmm);