Whamcloud - gitweb
db3f531312bf69e94d8b7b6c956a779416c46296
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
38 #include <obd_lov.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
41
42 #include "mds_internal.h"
43
44 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
45 {
46         struct mds_obd *mds = &obd->u.mds;
47         unsigned int i=0, j;
48
49         CDEBUG(D_INFO, "dump from %s\n", label);
50         if (mds->mds_lov_page_dirty == NULL) {
51                 CERROR("NULL bitmap!\n");
52                 GOTO(skip_bitmap, i);
53         }
54
55         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
56                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
57 skip_bitmap:
58         if (mds->mds_lov_page_array == NULL) {
59                 CERROR("not init page array!\n");
60                 GOTO(skip_array, i);
61
62         }
63         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
64                 obd_id *data = mds->mds_lov_page_array[i];
65
66                 if (data == NULL)
67                         continue;
68
69                 for(j=0; j < OBJID_PER_PAGE(); j++) {
70                         if (data[j] == 0)
71                                 continue;
72                         CDEBUG(D_INFO,"objid page %u idx %u - %llu ",i,j,data[j]);
73                 }
74         }
75 skip_array:
76         EXIT;
77 }
78
79 int mds_lov_init_objids(struct obd_device *obd)
80 {
81         struct mds_obd *mds = &obd->u.mds;
82         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
83         struct file *file;
84         int rc;
85         ENTRY;
86
87         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
88
89         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
90         if (mds->mds_lov_page_dirty == NULL)
91                 RETURN(-ENOMEM);
92
93
94         OBD_ALLOC(mds->mds_lov_page_array, size);
95         if (mds->mds_lov_page_array == NULL)
96                 GOTO(err_free_bitmap, rc = -ENOMEM);
97
98         /* open and test the lov objd file */
99         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
100         if (IS_ERR(file)) {
101                 rc = PTR_ERR(file);
102                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
103                 GOTO(err_free, rc = PTR_ERR(file));
104         }
105         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
106                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
107                        file->f_dentry->d_inode->i_mode);
108                 GOTO(err_open, rc = -ENOENT);
109         }
110         mds->mds_lov_objid_filp = file;
111
112         RETURN (0);
113 err_open:
114         if (filp_close((struct file *)file, 0))
115                 CERROR("can't close %s after error\n", LOV_OBJID);
116 err_free:
117         OBD_FREE(mds->mds_lov_page_array, size);
118 err_free_bitmap:
119         FREE_BITMAP(mds->mds_lov_page_dirty);
120
121         RETURN(rc);
122 }
123 EXPORT_SYMBOL(mds_lov_init_objids);
124
125 void mds_lov_destroy_objids(struct obd_device *obd)
126 {
127         struct mds_obd *mds = &obd->u.mds;
128         int i, rc;
129         ENTRY;
130
131         if (mds->mds_lov_page_array != NULL) {
132                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
133                         obd_id *data = mds->mds_lov_page_array[i];
134                         if (data != NULL)
135                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
136                 }
137                 OBD_FREE(mds->mds_lov_page_array,
138                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
139         }
140
141         if (mds->mds_lov_objid_filp) {
142                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
143                 mds->mds_lov_objid_filp = NULL;
144                 if (rc)
145                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
146         }
147
148         FREE_BITMAP(mds->mds_lov_page_dirty);
149         EXIT;
150 }
151 EXPORT_SYMBOL(mds_lov_destroy_objids);
152
153 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
154 {
155         struct mds_obd *mds = &obd->u.mds;
156         int j;
157         ENTRY;
158
159         /* if we create file without objects - lmm is NULL */
160         if (lmm == NULL)
161                 return;
162
163         for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
164                 int i = le32_to_cpu(lmm->lmm_objects[j].l_ost_idx);
165                 obd_id id = le64_to_cpu(lmm->lmm_objects[j].l_object_id);
166                 int page = i / OBJID_PER_PAGE();
167                 int idx = i % OBJID_PER_PAGE();
168                 obd_id *data = mds->mds_lov_page_array[page];
169
170                 CDEBUG(D_INODE,"update last object for ost %d - new %llu"
171                                " old %llu\n", i, id, data[idx]);
172                 if (id > data[idx]) {
173                         data[idx] = id;
174                         bitmap_set(mds->mds_lov_page_dirty, page);
175                 }
176         }
177         EXIT;
178 }
179 EXPORT_SYMBOL(mds_lov_update_objids);
180
181 static int mds_lov_read_objids(struct obd_device *obd)
182 {
183         struct mds_obd *mds = &obd->u.mds;
184         loff_t off = 0;
185         int i, rc, count = 0, page = 0;
186         size_t size;
187         ENTRY;
188
189         /* Read everything in the file, even if our current lov desc
190            has fewer targets. Old targets not in the lov descriptor
191            during mds setup may still have valid objids. */
192         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
193         if (size == 0)
194                 RETURN(0);
195
196         page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
197         CDEBUG(D_INFO, "file size %d pages %d\n", size, page); 
198         for(i=0; i < page; i++) {
199                 obd_id *data =  mds->mds_lov_page_array[i];
200                 loff_t off_old = off;
201
202                 LASSERT(data == NULL);
203                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
204                 if (data == NULL)
205                         GOTO(out, rc = -ENOMEM);
206
207                 mds->mds_lov_page_array[i] = data;
208
209                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
210                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
211                 if (rc < 0) {
212                         CERROR("Error reading objids %d\n", rc);
213                         GOTO(out, rc);
214                 }
215                 if (off == off_old)
216                         break; // eof
217
218                 count += (off-off_old+sizeof(obd_id)-1)/sizeof(obd_id);
219         }
220         mds->mds_lov_objid_count = count;
221         mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
222         mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
223         CDEBUG(D_INFO, "Read %u objid\n", count);
224 out:
225         mds_lov_dump_objids("read",obd);
226
227         RETURN(0);
228 }
229
230 int mds_lov_write_objids(struct obd_device *obd)
231 {
232         struct mds_obd *mds = &obd->u.mds;
233         int i, rc = 0;
234         ENTRY;
235
236         if (bitmap_check_empty(mds->mds_lov_page_dirty))
237                 RETURN(0);
238
239         mds_lov_dump_objids("write", obd);
240
241         foreach_bit(mds->mds_lov_page_dirty, i) {
242                 obd_id *data =  mds->mds_lov_page_array[i];
243                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
244                 loff_t off = i * size;
245
246                 LASSERT(data != NULL);
247
248                 /* check for particaly filled last page */
249                 if (i == mds->mds_lov_objid_lastpage) {
250                         size = mds->mds_lov_objid_lastidx * sizeof(obd_id);
251                 }
252
253                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
254                                          size, &off, 0);
255                 if (rc < 0)
256                         break;
257                 bitmap_clear(mds->mds_lov_page_dirty, i);
258         }
259         if (rc >= 0)
260                 rc = 0;
261
262         RETURN(rc);
263 }
264 EXPORT_SYMBOL(mds_lov_write_objids);
265
266 static int mds_lov_get_objid(struct obd_device * obd, struct obd_export *export,
267                              __u32 idx)
268 {
269         struct mds_obd *mds = &obd->u.mds;
270         unsigned int page;
271         unsigned int off;
272         obd_id *data;
273         int rc = 0;
274         ENTRY;
275
276         page = idx / OBJID_PER_PAGE();
277         off = idx % OBJID_PER_PAGE();
278         data = mds->mds_lov_page_array[page];
279         if (data == NULL) {
280                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
281                 if (data == NULL)
282                         GOTO(out, rc = -ENOMEM);
283
284                 mds->mds_lov_page_array[page] = data;
285         }
286
287         if (data[off] == 0) {
288                 /* We never read this lastid; ask the osc */
289                 struct obd_id_info lastid;
290                 __u32 size = sizeof(lastid);
291
292                 lastid.idx = idx;
293                 lastid.data = &data[off];
294                 rc = obd_get_info(export, sizeof("last_id"),
295                                   "last_id", &size, &lastid);
296                 if (rc)
297                         GOTO(out, rc);
298
299                 if (idx > mds->mds_lov_objid_count) {
300                         mds->mds_lov_objid_count = idx;
301                         mds->mds_lov_objid_lastpage = page;
302                         mds->mds_lov_objid_lastidx = off;
303                 }
304                 bitmap_set(mds->mds_lov_page_dirty, page);
305         }
306 out:
307         RETURN(rc);
308 }
309
310 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
311 {
312         int rc;
313         struct obdo oa;
314         struct obd_trans_info oti = {0};
315         struct lov_stripe_md  *empty_ea = NULL;
316         ENTRY;
317
318         LASSERT(mds->mds_lov_page_array != NULL);
319
320         /* This create will in fact either create or destroy:  If the OST is
321          * missing objects below this ID, they will be created.  If it finds
322          * objects above this ID, they will be removed. */
323         memset(&oa, 0, sizeof(oa));
324         oa.o_flags = OBD_FL_DELORPHAN;
325         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
326         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
327         if (ost_uuid != NULL) {
328                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
329                 oa.o_valid |= OBD_MD_FLINLINE;
330         }
331         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
332
333         RETURN(rc);
334 }
335 /* update the LOV-OSC knowledge of the last used object id's */
336 /* for all targets */
337 /* is we realy need this ? all osc's should be pass via __mds_lov_synchronize
338  * and call */
339 #define MDS_LOV_SETID_COUNT (CFS_PAGE_SIZE / sizeof(struct obd_id_info))
340
341 int mds_lov_set_nextid(struct obd_device *obd)
342 {
343         struct mds_obd *mds = &obd->u.mds;
344         int i = 0, j, rc = 0;
345         struct obd_id_info *info;
346         ENTRY;
347
348         LASSERT(!obd->obd_recovering);
349
350         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
351         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
352
353         OBD_ALLOC(info, CFS_PAGE_SIZE);
354         if (info == NULL)
355                 RETURN(-ENOMEM);
356
357         while(i < mds->mds_lov_desc.ld_tgt_count) {
358                 for(j=0; j < MDS_LOV_SETID_COUNT; i++, j++) {
359                         int page = i / OBJID_PER_PAGE();
360                         int idx = i % OBJID_PER_PAGE();
361                         obd_id *data = mds->mds_lov_page_array[page];
362
363                         if (i == mds->mds_lov_desc.ld_tgt_count)
364                                 break;
365
366                         info[j].idx = i;
367                         info[j].data = &data[idx];
368                 }
369
370                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
371                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
372                 if (rc) {
373                         CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
374                                  obd->obd_name, rc);
375                         break;
376                 }
377         }
378         OBD_FREE(info, CFS_PAGE_SIZE);
379
380         RETURN(rc);
381
382 }
383
384 /* for one target */
385 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
386 {
387         struct mds_obd *mds = &obd->u.mds;
388         int rc;
389         struct obd_id_info info;
390         ENTRY;
391
392         LASSERT(!obd->obd_recovering);
393
394         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
395         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
396
397         info.idx = idx;
398         info.data = id;
399
400         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
401                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
402         if (rc)
403                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
404                         obd->obd_name, rc);
405
406         RETURN(rc);
407 }
408
409 /* Update the lov desc for a new size lov. */
410 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
411 {
412         struct mds_obd *mds = &obd->u.mds;
413         struct lov_desc *ld;
414         __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
415         int page, rc = 0;
416         ENTRY;
417
418         OBD_ALLOC(ld, sizeof(*ld));
419         if (!ld)
420                 RETURN(-ENOMEM);
421
422         rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
423                           &valsize, ld);
424         if (rc)
425                 GOTO(out, rc);
426
427         /* The size of the LOV target table may have increased. */
428         page = ld->ld_tgt_count / OBJID_PER_PAGE();
429         if (mds->mds_lov_page_array[page] == NULL) {
430                 obd_id *ids;
431
432                 OBD_ALLOC(ids, MDS_LOV_ALLOC_SIZE);
433                 if (ids == NULL)
434                         GOTO(out, rc = -ENOMEM);
435
436                 mds->mds_lov_page_array[page] = ids;
437         }
438
439         /* Don't change the mds_lov_desc until the objids size matches the
440            count (paranoia) */
441         mds->mds_lov_desc = *ld;
442         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
443                mds->mds_lov_desc.ld_tgt_count);
444
445         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
446                                mds->mds_lov_desc.ld_tgt_count);
447
448         mds->mds_max_mdsize = lov_mds_md_size(stripes);
449         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
450         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
451                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
452                stripes);
453
454         /* If we added a target we have to reconnect the llogs */
455         /* We only _need_ to do this at first add (idx), or the first time
456            after recovery.  However, it should now be safe to call anytime. */
457         rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
458
459         /*XXX this notifies the MDD until lov handling use old mds code */
460         if (obd->obd_upcall.onu_owner) {
461                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
462                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
463                                                  obd->obd_upcall.onu_owner);
464         }
465 out:
466         OBD_FREE(ld, sizeof(*ld));
467         RETURN(rc);
468 }
469
470
471 #define MDSLOV_NO_INDEX -1
472
473 /* Inform MDS about new/updated target */
474 static int mds_lov_update_mds(struct obd_device *obd,
475                               struct obd_device *watched,
476                               __u32 idx, struct obd_uuid *uuid)
477 {
478         struct mds_obd *mds = &obd->u.mds;
479         __u32 old_count;
480         int rc = 0;
481         int page;
482         int off;
483         obd_id *data;
484
485         ENTRY;
486
487         /* Don't let anyone else mess with mds_lov_objids now */
488         mutex_down(&obd->obd_dev_sem);
489
490         old_count = mds->mds_lov_desc.ld_tgt_count;
491         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
492         if (rc)
493                 GOTO(out, rc);
494
495         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
496                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
497                mds->mds_lov_desc.ld_tgt_count);
498
499         /* idx is set as data from lov_notify. */
500         if (idx == MDSLOV_NO_INDEX || obd->obd_recovering)
501                 GOTO(out, rc);
502
503         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
504                 CERROR("index %d > count %d!\n", idx,
505                        mds->mds_lov_desc.ld_tgt_count);
506                 GOTO(out, rc = -EINVAL);
507         }
508
509         page = idx / OBJID_PER_PAGE();
510         off = idx % OBJID_PER_PAGE();
511         data = mds->mds_lov_page_array[page];
512         CDEBUG(D_CONFIG, "idx %d - %p - %d/%d\n", idx, data, page, off);
513         if (data[off] == 0) {
514                 rc = mds_lov_get_objid(obd, watched->obd_self_export, idx);
515         } else {
516                 /* We have read this lastid from disk; tell the osc.
517                    Don't call this during recovery. */
518                 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
519                 if (rc) {
520                         CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
521                         /* Don't abort the rest of the sync */
522                         rc = 0;
523                 }
524         }
525
526         CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
527                data[off], idx, rc);
528 out:
529         mutex_up(&obd->obd_dev_sem);
530         RETURN(rc);
531 }
532
533 /* update the LOV-OSC knowledge of the last used object id's */
534 int mds_lov_connect(struct obd_device *obd, char * lov_name)
535 {
536         struct mds_obd *mds = &obd->u.mds;
537         struct lustre_handle conn = {0,};
538         struct obd_connect_data *data;
539         int rc;
540         ENTRY;
541
542         if (IS_ERR(mds->mds_osc_obd))
543                 RETURN(PTR_ERR(mds->mds_osc_obd));
544
545         if (mds->mds_osc_obd)
546                 RETURN(0);
547
548         mds->mds_osc_obd = class_name2obd(lov_name);
549         if (!mds->mds_osc_obd) {
550                 CERROR("MDS cannot locate LOV %s\n", lov_name);
551                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
552                 RETURN(-ENOTCONN);
553         }
554
555         OBD_ALLOC(data, sizeof(*data));
556         if (data == NULL)
557                 RETURN(-ENOMEM);
558         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
559                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
560                                   OBD_CONNECT_OSS_CAPA;
561 #ifdef HAVE_LRU_RESIZE_SUPPORT
562         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
563 #endif
564         data->ocd_version = LUSTRE_VERSION_CODE;
565         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
566         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
567         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
568         OBD_FREE(data, sizeof(*data));
569         if (rc) {
570                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
571                 mds->mds_osc_obd = ERR_PTR(rc);
572                 RETURN(rc);
573         }
574         mds->mds_osc_exp = class_conn2export(&conn);
575
576         rc = obd_register_observer(mds->mds_osc_obd, obd);
577         if (rc) {
578                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
579                        lov_name, rc);
580                 GOTO(err_discon, rc);
581         }
582
583         /* Deny new client connections until we are sure we have some OSTs */
584         obd->obd_no_conn = 1;
585
586         mutex_down(&obd->obd_dev_sem);
587         rc = mds_lov_read_objids(obd);
588         if (rc) {
589                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
590                 GOTO(err_reg, rc);
591         }
592
593         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
594         if (rc)
595                 GOTO(err_reg, rc);
596
597         /* If we're mounting this code for the first time on an existing FS,
598          * we need to populate the objids array from the real OST values */
599         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
600                 __u32 i = mds->mds_lov_objid_count;
601                 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
602                         rc = mds_lov_get_objid(obd, mds->mds_osc_exp, i);
603                         if (rc != 0)
604                                 break;
605                 }
606                 if (rc == 0)
607                         rc = mds_lov_write_objids(obd);
608                 if (rc)
609                         CERROR("got last objids from OSTs, but error "
610                                 "in update objids file: %d\n", rc);
611         }
612         mutex_up(&obd->obd_dev_sem);
613
614         /* I want to see a callback happen when the OBD moves to a
615          * "For General Use" state, and that's when we'll call
616          * set_nextid().  The class driver can help us here, because
617          * it can use the obd_recovering flag to determine when the
618          * the OBD is full available. */
619         /* MDD device will care about that
620         if (!obd->obd_recovering)
621                 rc = mds_postrecov(obd);
622          */
623         RETURN(rc);
624
625 err_reg:
626         mutex_up(&obd->obd_dev_sem);
627         obd_register_observer(mds->mds_osc_obd, NULL);
628 err_discon:
629         obd_disconnect(mds->mds_osc_exp);
630         mds->mds_osc_exp = NULL;
631         mds->mds_osc_obd = ERR_PTR(rc);
632         RETURN(rc);
633 }
634
635 int mds_lov_disconnect(struct obd_device *obd)
636 {
637         struct mds_obd *mds = &obd->u.mds;
638         int rc = 0;
639         ENTRY;
640
641         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
642                 obd_register_observer(mds->mds_osc_obd, NULL);
643
644                 /* The actual disconnect of the mds_lov will be called from
645                  * class_disconnect_exports from mds_lov_clean. So we have to
646                  * ensure that class_cleanup doesn't fail due to the extra ref
647                  * we're holding now. The mechanism to do that already exists -
648                  * the obd_force flag. We'll drop the final ref to the
649                  * mds_osc_exp in mds_cleanup. */
650                 mds->mds_osc_obd->obd_force = 1;
651         }
652
653         RETURN(rc);
654 }
655
656 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
657                   void *karg, void *uarg)
658 {
659         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
660         struct obd_device *obd = exp->exp_obd;
661         struct mds_obd *mds = &obd->u.mds;
662         struct obd_ioctl_data *data = karg;
663         struct lvfs_run_ctxt saved;
664         int rc = 0;
665
666         ENTRY;
667         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
668
669         switch (cmd) {
670         case OBD_IOC_RECORD: {
671                 char *name = data->ioc_inlbuf1;
672                 if (mds->mds_cfg_llh)
673                         RETURN(-EBUSY);
674
675                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
676                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
677                                  &mds->mds_cfg_llh, NULL, name);
678                 if (rc == 0)
679                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
680                                          &cfg_uuid);
681                 else
682                         mds->mds_cfg_llh = NULL;
683                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
684
685                 RETURN(rc);
686         }
687
688         case OBD_IOC_ENDRECORD: {
689                 if (!mds->mds_cfg_llh)
690                         RETURN(-EBADF);
691
692                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
693                 rc = llog_close(mds->mds_cfg_llh);
694                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
695
696                 mds->mds_cfg_llh = NULL;
697                 RETURN(rc);
698         }
699
700         case OBD_IOC_CLEAR_LOG: {
701                 char *name = data->ioc_inlbuf1;
702                 if (mds->mds_cfg_llh)
703                         RETURN(-EBUSY);
704
705                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
706                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
707                                  &mds->mds_cfg_llh, NULL, name);
708                 if (rc == 0) {
709                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
710                                          NULL);
711
712                         rc = llog_destroy(mds->mds_cfg_llh);
713                         llog_free_handle(mds->mds_cfg_llh);
714                 }
715                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
716
717                 mds->mds_cfg_llh = NULL;
718                 RETURN(rc);
719         }
720
721         case OBD_IOC_DORECORD: {
722                 char *cfg_buf;
723                 struct llog_rec_hdr rec;
724                 if (!mds->mds_cfg_llh)
725                         RETURN(-EBADF);
726
727                 rec.lrh_len = llog_data_len(data->ioc_plen1);
728
729                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
730                         rec.lrh_type = OBD_CFG_REC;
731                 } else {
732                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
733                         RETURN(-EINVAL);
734                 }
735
736                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
737                 if (cfg_buf == NULL)
738                         RETURN(-EINVAL);
739                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
740                 if (rc) {
741                         OBD_FREE(cfg_buf, data->ioc_plen1);
742                         RETURN(rc);
743                 }
744
745                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
746                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
747                                     cfg_buf, -1);
748                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
749
750                 OBD_FREE(cfg_buf, data->ioc_plen1);
751                 RETURN(rc);
752         }
753
754         case OBD_IOC_PARSE: {
755                 struct llog_ctxt *ctxt =
756                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
757                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
758                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
759                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
760                 if (rc)
761                         RETURN(rc);
762
763                 RETURN(rc);
764         }
765
766         case OBD_IOC_DUMP_LOG: {
767                 struct llog_ctxt *ctxt =
768                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
769                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
770                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
771                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
772                 if (rc)
773                         RETURN(rc);
774
775                 RETURN(rc);
776         }
777
778         case OBD_IOC_SYNC: {
779                 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
780                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
781                 RETURN(rc);
782         }
783
784         case OBD_IOC_SET_READONLY: {
785                 void *handle;
786                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
787                 BDEVNAME_DECLARE_STORAGE(tmp);
788                 CERROR("*** setting device %s read-only ***\n",
789                        ll_bdevname(obd->u.obt.obt_sb, tmp));
790
791                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
792                 if (!IS_ERR(handle))
793                         rc = fsfilt_commit(obd, inode, handle, 1);
794
795                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
796                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
797
798                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
799                 RETURN(0);
800         }
801
802         case OBD_IOC_CATLOGLIST: {
803                 int count = mds->mds_lov_desc.ld_tgt_count;
804                 rc = llog_catalog_list(obd, count, data);
805                 RETURN(rc);
806
807         }
808         case OBD_IOC_LLOG_CHECK:
809         case OBD_IOC_LLOG_CANCEL:
810         case OBD_IOC_LLOG_REMOVE: {
811                 struct llog_ctxt *ctxt =
812                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
813                 int rc2;
814                 __u32 group;
815
816                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
817                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
818                 rc = llog_ioctl(ctxt, cmd, data);
819                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
820                 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
821                 group = FILTER_GROUP_MDS0 + mds->mds_id;
822                 rc2 = obd_set_info_async(mds->mds_osc_exp,
823                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
824                                          sizeof(group), &group, NULL);
825                 if (!rc)
826                         rc = rc2;
827                 RETURN(rc);
828         }
829         case OBD_IOC_LLOG_INFO:
830         case OBD_IOC_LLOG_PRINT: {
831                 struct llog_ctxt *ctxt =
832                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
833
834                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
835                 rc = llog_ioctl(ctxt, cmd, data);
836                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
837
838                 RETURN(rc);
839         }
840
841         case OBD_IOC_ABORT_RECOVERY:
842                 CERROR("aborting recovery for device %s\n", obd->obd_name);
843                 target_stop_recovery_thread(obd);
844                 RETURN(0);
845
846         default:
847                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
848                 RETURN(-EINVAL);
849         }
850         RETURN(0);
851
852 }
853
854 /* Collect the preconditions we need to allow client connects */
855 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
856 {
857         if (flag & CONFIG_LOG)
858                 obd->u.mds.mds_fl_cfglog = 1;
859         if (flag & CONFIG_SYNC)
860                 obd->u.mds.mds_fl_synced = 1;
861         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
862                 /* Open for clients */
863                 obd->obd_no_conn = 0;
864 }
865
866 struct mds_lov_sync_info {
867         struct obd_device *mlsi_obd;     /* the lov device to sync */
868         struct obd_device *mlsi_watched; /* target osc */
869         __u32              mlsi_index;   /* index of target */
870 };
871
872 static int mds_propagate_capa_keys(struct mds_obd *mds)
873 {
874         struct lustre_capa_key *key;
875         int i, rc = 0;
876
877         ENTRY;
878
879         if (!mds->mds_capa_keys)
880                 RETURN(0);
881
882         for (i = 0; i < 2; i++) {
883                 key = &mds->mds_capa_keys[i];
884                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
885
886                 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
887                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
888                 if (rc) {
889                         DEBUG_CAPA_KEY(D_ERROR, key,
890                                        "propagate failed (rc = %d) for", rc);
891                         RETURN(rc);
892                 }
893         }
894
895         RETURN(0);
896 }
897
898 /* We only sync one osc at a time, so that we don't have to hold
899    any kind of lock on the whole mds_lov_desc, which may change
900    (grow) as a result of mds_lov_add_ost.  This also avoids any
901    kind of mismatch between the lov_desc and the mds_lov_desc,
902    which are not in lock-step during lov_add_obd */
903 static int __mds_lov_synchronize(void *data)
904 {
905         struct mds_lov_sync_info *mlsi = data;
906         struct obd_device *obd = mlsi->mlsi_obd;
907         struct obd_device *watched = mlsi->mlsi_watched;
908         struct mds_obd *mds = &obd->u.mds;
909         struct obd_uuid *uuid;
910         __u32  idx = mlsi->mlsi_index;
911         struct mds_group_info mgi;
912         int rc = 0;
913         ENTRY;
914
915         OBD_FREE(mlsi, sizeof(*mlsi));
916
917         LASSERT(obd);
918         LASSERT(watched);
919         uuid = &watched->u.cli.cl_target_uuid;
920         LASSERT(uuid);
921
922         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
923
924         rc = mds_lov_update_mds(obd, watched, idx, uuid);
925         if (rc != 0) {
926                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
927                 GOTO(out, rc);
928         }
929         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
930         mgi.uuid = uuid;
931
932         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
933                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
934         if (rc != 0)
935                 GOTO(out, rc);
936
937         /* propagate capability keys */
938         rc = mds_propagate_capa_keys(mds);
939         if (rc)
940                 GOTO(out, rc);
941
942         rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
943                           mds->mds_lov_desc.ld_tgt_count,
944                           NULL, NULL, uuid);
945
946         if (rc != 0) {
947                 CERROR("%s failed at llog_origin_connect: %d\n",
948                        obd_uuid2str(uuid), rc);
949                 GOTO(out, rc);
950         }
951
952         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
953               obd->obd_name, obd_uuid2str(uuid));
954         /*
955          * FIXME: this obd_stopping was useless, 
956          * since obd in mdt layer was set
957          */
958         if (obd->obd_stopping)
959                 GOTO(out, rc = -ENODEV);
960
961         rc = mds_lov_clear_orphans(mds, uuid);
962         if (rc != 0) {
963                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
964                        obd_uuid2str(uuid), rc);
965                 GOTO(out, rc);
966         }
967
968         if (obd->obd_upcall.onu_owner) {
969                 /*
970                  * This is a hack for mds_notify->mdd_notify. When the mds obd
971                  * in mdd is removed, This hack should be removed.
972                  */
973                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
974                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
975                                                  obd->obd_upcall.onu_owner);
976         }
977         EXIT;
978 out:
979         if (rc) {
980                 /* Deactivate it for safety */
981                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
982                        rc);
983                 if (!obd->obd_stopping && mds->mds_osc_obd &&
984                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping) 
985                         obd_notify(mds->mds_osc_obd, watched,
986                                    OBD_NOTIFY_INACTIVE, NULL);
987         }
988
989         class_decref(obd);
990         return rc;
991 }
992
993 int mds_lov_synchronize(void *data)
994 {
995         struct mds_lov_sync_info *mlsi = data;
996         char name[20];
997
998         if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
999                 /* There is still a watched target,
1000                 but we don't know its index */
1001                 sprintf(name, "ll_sync_tgt");
1002         else
1003                 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
1004         ptlrpc_daemonize(name);
1005
1006         RETURN(__mds_lov_synchronize(data));
1007 }
1008
1009 int mds_lov_start_synchronize(struct obd_device *obd,
1010                               struct obd_device *watched,
1011                               void *data, int nonblock)
1012 {
1013         struct mds_lov_sync_info *mlsi;
1014         int rc;
1015
1016         ENTRY;
1017
1018         LASSERT(watched);
1019
1020         OBD_ALLOC(mlsi, sizeof(*mlsi));
1021         if (mlsi == NULL)
1022                 RETURN(-ENOMEM);
1023
1024         mlsi->mlsi_obd = obd;
1025         mlsi->mlsi_watched = watched;
1026         if (data)
1027                 mlsi->mlsi_index = *(__u32 *)data;
1028         else
1029                 mlsi->mlsi_index = MDSLOV_NO_INDEX;
1030
1031         /* Although class_export_get(obd->obd_self_export) would lock
1032            the MDS in place, since it's only a self-export
1033            it doesn't lock the LOV in place.  The LOV can be disconnected
1034            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
1035            Simply taking an export ref on the LOV doesn't help, because it's
1036            still disconnected. Taking an obd reference insures that we don't
1037            disconnect the LOV.  This of course means a cleanup won't
1038            finish for as long as the sync is blocking. */
1039         class_incref(obd);
1040
1041         if (nonblock) {
1042                 /* Synchronize in the background */
1043                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
1044                                        CLONE_VM | CLONE_FILES);
1045                 if (rc < 0) {
1046                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
1047                                obd->obd_name, rc);
1048                         class_decref(obd);
1049                 } else {
1050                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
1051                                "thread=%d\n", obd->obd_name,
1052                                mlsi->mlsi_index, rc);
1053                         rc = 0;
1054                 }
1055         } else {
1056                 rc = __mds_lov_synchronize((void *)mlsi);
1057         }
1058
1059         RETURN(rc);
1060 }
1061
1062 int mds_notify(struct obd_device *obd, struct obd_device *watched,
1063                enum obd_notify_event ev, void *data)
1064 {
1065         int rc = 0;
1066         ENTRY;
1067
1068         switch (ev) {
1069         /* We only handle these: */
1070         case OBD_NOTIFY_ACTIVE:
1071         case OBD_NOTIFY_SYNC:
1072         case OBD_NOTIFY_SYNC_NONBLOCK:
1073                 break;
1074         case OBD_NOTIFY_CONFIG:
1075                 mds_allow_cli(obd, (unsigned int)data);
1076         default:
1077                 RETURN(0);
1078         }
1079
1080         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
1081         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1082                 CERROR("unexpected notification of %s %s!\n",
1083                        watched->obd_type->typ_name, watched->obd_name);
1084                 RETURN(-EINVAL);
1085         }
1086
1087         if (obd->obd_recovering) {
1088                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1089                       obd->obd_name,
1090                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1091                 /* We still have to fix the lov descriptor for ost's added
1092                    after the mdt in the config log.  They didn't make it into
1093                    mds_lov_connect. */
1094                 mutex_down(&obd->obd_dev_sem);
1095                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
1096                 if (rc) {
1097                         mutex_up(&obd->obd_dev_sem);
1098                         RETURN(rc);
1099                 }
1100                 /* We should update init llog here too for replay unlink and 
1101                  * possiable llog init race when recovery complete */
1102                 llog_cat_initialize(obd, NULL, 
1103                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
1104                                     &watched->u.cli.cl_target_uuid);
1105                 mutex_up(&obd->obd_dev_sem);
1106                 mds_allow_cli(obd, CONFIG_SYNC);
1107                 RETURN(rc);
1108         }
1109
1110         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
1111         rc = mds_lov_start_synchronize(obd, watched, data,
1112                                        !(ev == OBD_NOTIFY_SYNC));
1113
1114         lquota_recovery(mds_quota_interface_ref, obd);
1115
1116         RETURN(rc);
1117 }
1118
1119 /* Convert the on-disk LOV EA structre.
1120  * We always try to convert from an old LOV EA format to the common in-memory
1121  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1122  * then convert back to the new on-disk format and save it back to disk
1123  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1124  * to convert it each time this inode is accessed.
1125  *
1126  * This function is a bit interesting in the error handling.  We can safely
1127  * ship the old lmm to the client in case of failure, since it uses the same
1128  * obd_unpackmd() code and can do the conversion if the MDS fails for some
1129  * reason.  We will not delete the old lmm data until we have written the
1130  * new format lmm data in fsfilt_set_md(). */
1131 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1132                        struct lov_mds_md *lmm, int lmm_size)
1133 {
1134         struct lov_stripe_md *lsm = NULL;
1135         void *handle;
1136         int rc, err;
1137         ENTRY;
1138
1139         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
1140             le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
1141                 RETURN(0);
1142
1143         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1144                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
1145                LOV_MAGIC);
1146
1147         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
1148         if (rc < 0)
1149                 GOTO(conv_end, rc);
1150
1151         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
1152         if (rc < 0)
1153                 GOTO(conv_free, rc);
1154         lmm_size = rc;
1155
1156         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1157         if (IS_ERR(handle)) {
1158                 rc = PTR_ERR(handle);
1159                 GOTO(conv_free, rc);
1160         }
1161
1162         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
1163
1164         err = fsfilt_commit(obd, inode, handle, 0);
1165         if (!rc)
1166                 rc = err ? err : lmm_size;
1167         GOTO(conv_free, rc);
1168 conv_free:
1169         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
1170 conv_end:
1171         return rc;
1172 }