Whamcloud - gitweb
Fixes some scability and access to not inited memory problems
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <lustre_mds.h>
36 #include <lustre/lustre_idl.h>
37 #include <obd_class.h>
38 #include <obd_lov.h>
39 #include <lustre_lib.h>
40 #include <lustre_fsfilt.h>
41
42 #include "mds_internal.h"
43
44 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
45 {
46         struct mds_obd *mds = &obd->u.mds;
47         unsigned int i=0, j;
48
49         CDEBUG(D_INFO, "dump from %s\n", label);
50         if (mds->mds_lov_page_dirty == NULL) {
51                 CERROR("NULL bitmap!\n");
52                 GOTO(skip_bitmap, i);
53         }
54
55         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
56                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
57 skip_bitmap:
58         if (mds->mds_lov_page_array == NULL) {
59                 CERROR("not init page array!\n");
60                 GOTO(skip_array, i);
61
62         }
63         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
64                 obd_id *data = mds->mds_lov_page_array[i];
65
66                 if (data == NULL)
67                         continue;
68
69                 for(j=0; j < OBJID_PER_PAGE(); j++) {
70                         if (data[j] == 0)
71                                 continue;
72                         CDEBUG(D_INFO,"objid page %u idx %u - %llu ",i,j,data[j]);
73                 }
74         }
75 skip_array:
76         EXIT;
77 }
78
79 int mds_lov_init_objids(struct obd_device *obd)
80 {
81         struct mds_obd *mds = &obd->u.mds;
82         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
83         struct file *file;
84         int rc;
85         ENTRY;
86
87         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
88
89         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
90         if (mds->mds_lov_page_dirty == NULL)
91                 RETURN(-ENOMEM);
92
93
94         OBD_ALLOC(mds->mds_lov_page_array, size);
95         if (mds->mds_lov_page_array == NULL)
96                 GOTO(err_free_bitmap, rc = -ENOMEM);
97
98         /* open and test the lov objd file */
99         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
100         if (IS_ERR(file)) {
101                 rc = PTR_ERR(file);
102                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
103                 GOTO(err_free, rc = PTR_ERR(file));
104         }
105         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
106                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
107                        file->f_dentry->d_inode->i_mode);
108                 GOTO(err_open, rc = -ENOENT);
109         }
110         mds->mds_lov_objid_filp = file;
111
112         RETURN (0);
113 err_open:
114         if (filp_close((struct file *)file, 0))
115                 CERROR("can't close %s after error\n", LOV_OBJID);
116 err_free:
117         OBD_FREE(mds->mds_lov_page_array, size);
118 err_free_bitmap:
119         FREE_BITMAP(mds->mds_lov_page_dirty);
120
121         RETURN(rc);
122 }
123 EXPORT_SYMBOL(mds_lov_init_objids);
124
125 void mds_lov_destroy_objids(struct obd_device *obd)
126 {
127         struct mds_obd *mds = &obd->u.mds;
128         int i, rc;
129         ENTRY;
130
131         if (mds->mds_lov_page_array != NULL) {
132                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
133                         obd_id *data = mds->mds_lov_page_array[i];
134                         if (data != NULL)
135                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
136                 }
137                 OBD_FREE(mds->mds_lov_page_array,
138                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
139         }
140
141         if (mds->mds_lov_objid_filp) {
142                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
143                 mds->mds_lov_objid_filp = NULL;
144                 if (rc)
145                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
146         }
147
148         FREE_BITMAP(mds->mds_lov_page_dirty);
149         EXIT;
150 }
151 EXPORT_SYMBOL(mds_lov_destroy_objids);
152
153 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
154 {
155         struct mds_obd *mds = &obd->u.mds;
156         int j;
157         ENTRY;
158
159         /* if we create file without objects - lmm is NULL */
160         if (lmm == NULL)
161                 return;
162
163         for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) {
164                 int i = le32_to_cpu(lmm->lmm_objects[j].l_ost_idx);
165                 obd_id id = le64_to_cpu(lmm->lmm_objects[j].l_object_id);
166                 int page = i / OBJID_PER_PAGE();
167                 int idx = i % OBJID_PER_PAGE();
168                 obd_id *data = mds->mds_lov_page_array[page];
169
170                 CDEBUG(D_INODE,"update last object for ost %d - new %llu"
171                                " old %llu\n", i, id, data[idx]);
172                 if (id > data[idx]) {
173                         data[idx] = id;
174                         bitmap_set(mds->mds_lov_page_dirty, page);
175                 }
176         }
177         EXIT;
178 }
179 EXPORT_SYMBOL(mds_lov_update_objids);
180
181 static int mds_lov_read_objids(struct obd_device *obd)
182 {
183         struct mds_obd *mds = &obd->u.mds;
184         loff_t off = 0;
185         int i, rc, count = 0, page = 0;
186         size_t size;
187         ENTRY;
188
189         /* Read everything in the file, even if our current lov desc
190            has fewer targets. Old targets not in the lov descriptor
191            during mds setup may still have valid objids. */
192         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
193         if (size == 0)
194                 RETURN(0);
195
196         page = (size/(OBJID_PER_PAGE()*sizeof(obd_id)))+1;
197         CDEBUG(D_INFO, "file size %d pages %d\n", size, page); 
198         for(i=0; i < page; i++) {
199                 obd_id *data =  mds->mds_lov_page_array[i];
200                 loff_t off_old = off;
201
202                 LASSERT(data == NULL);
203                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
204                 if (data == NULL)
205                         GOTO(out, rc = -ENOMEM);
206
207                 mds->mds_lov_page_array[i] = data;
208
209                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
210                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
211                 if (rc < 0) {
212                         CERROR("Error reading objids %d\n", rc);
213                         GOTO(out, rc);
214                 }
215                 if (off == off_old)
216                         break; // eof
217
218                 count += (off-off_old+sizeof(obd_id)-1)/sizeof(obd_id);
219         }
220         mds->mds_lov_objid_count = count;
221         mds->mds_lov_objid_lastpage = count / OBJID_PER_PAGE();
222         mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
223         CDEBUG(D_INFO, "Read %u objid\n", count);
224 out:
225         mds_lov_dump_objids("read",obd);
226
227         RETURN(0);
228 }
229
230 int mds_lov_write_objids(struct obd_device *obd)
231 {
232         struct mds_obd *mds = &obd->u.mds;
233         int i, rc = 0;
234         ENTRY;
235
236         if (bitmap_check_empty(mds->mds_lov_page_dirty))
237                 RETURN(0);
238
239         mds_lov_dump_objids("write", obd);
240
241         foreach_bit(mds->mds_lov_page_dirty, i) {
242                 obd_id *data =  mds->mds_lov_page_array[i];
243                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
244                 loff_t off = i * size;
245
246                 LASSERT(data != NULL);
247
248                 /* check for particaly filled last page */
249                 if (i == mds->mds_lov_objid_lastpage) {
250                         size = mds->mds_lov_objid_lastidx * sizeof(obd_id);
251                 }
252
253                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
254                                          size, &off, 0);
255                 if (rc < 0)
256                         break;
257                 bitmap_clear(mds->mds_lov_page_dirty, i);
258         }
259         if (rc >= 0)
260                 rc = 0;
261
262         RETURN(rc);
263 }
264 EXPORT_SYMBOL(mds_lov_write_objids);
265
266 static int mds_lov_get_objid(struct obd_device * obd, struct obd_export *export,
267                              __u32 idx)
268 {
269         struct mds_obd *mds = &obd->u.mds;
270         unsigned int page;
271         unsigned int off;
272         obd_id *data;
273         int rc = 0;
274         ENTRY;
275
276         page = idx / OBJID_PER_PAGE();
277         off = idx % OBJID_PER_PAGE();
278         data = mds->mds_lov_page_array[page];
279         if (data == NULL) {
280                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
281                 if (data == NULL)
282                         GOTO(out, rc = -ENOMEM);
283
284                 mds->mds_lov_page_array[page] = data;
285         }
286
287         printk("get %d - %p - %d/%d\n", idx, data, page, off);
288         if (data[off] == 0) {
289                 /* We never read this lastid; ask the osc */
290                 struct obd_id_info lastid;
291                 __u32 size = sizeof(lastid);
292
293                 lastid.idx = idx;
294                 lastid.data = &data[off];
295                 rc = obd_get_info(export, sizeof("last_id"),
296                                   "last_id", &size, &lastid);
297                 if (rc)
298                         GOTO(out, rc);
299
300                 if (idx > mds->mds_lov_objid_count) {
301                         mds->mds_lov_objid_count = idx;
302                         mds->mds_lov_objid_lastpage = page;
303                         mds->mds_lov_objid_lastidx = off;
304                 }
305                 bitmap_set(mds->mds_lov_page_dirty, page);
306         }
307 out:
308         RETURN(rc);
309 }
310
311 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
312 {
313         int rc;
314         struct obdo oa;
315         struct obd_trans_info oti = {0};
316         struct lov_stripe_md  *empty_ea = NULL;
317         ENTRY;
318
319         LASSERT(mds->mds_lov_page_array != NULL);
320
321         /* This create will in fact either create or destroy:  If the OST is
322          * missing objects below this ID, they will be created.  If it finds
323          * objects above this ID, they will be removed. */
324         memset(&oa, 0, sizeof(oa));
325         oa.o_flags = OBD_FL_DELORPHAN;
326         oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
327         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
328         if (ost_uuid != NULL) {
329                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
330                 oa.o_valid |= OBD_MD_FLINLINE;
331         }
332         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
333
334         RETURN(rc);
335 }
336 /* update the LOV-OSC knowledge of the last used object id's */
337 /* for all targets */
338 /* is we realy need this ? all osc's should be pass via __mds_lov_synchronize
339  * and call */
340 #define MDS_LOV_SETID_COUNT (CFS_PAGE_SIZE / sizeof(struct obd_id_info))
341
342 int mds_lov_set_nextid(struct obd_device *obd)
343 {
344         struct mds_obd *mds = &obd->u.mds;
345         int i = 0, j, rc = 0;
346         struct obd_id_info *info;
347         ENTRY;
348
349         LASSERT(!obd->obd_recovering);
350
351         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
352         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
353
354         OBD_ALLOC(info, CFS_PAGE_SIZE);
355         if (info == NULL)
356                 RETURN(-ENOMEM);
357
358         while(i < mds->mds_lov_desc.ld_tgt_count) {
359                 for(j=0; j < MDS_LOV_SETID_COUNT; i++, j++) {
360                         int page = i / OBJID_PER_PAGE();
361                         int idx = i % OBJID_PER_PAGE();
362                         obd_id *data = mds->mds_lov_page_array[page];
363
364                         if (i == mds->mds_lov_desc.ld_tgt_count)
365                                 break;
366
367                         info[j].idx = i;
368                         info[j].data = &data[idx];
369                 }
370
371                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
372                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
373                 if (rc) {
374                         CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
375                                  obd->obd_name, rc);
376                         break;
377                 }
378         }
379         OBD_FREE(info, CFS_PAGE_SIZE);
380
381         RETURN(rc);
382
383 }
384
385 /* for one target */
386 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
387 {
388         struct mds_obd *mds = &obd->u.mds;
389         int rc;
390         struct obd_id_info info;
391         ENTRY;
392
393         LASSERT(!obd->obd_recovering);
394
395         /* obd->obd_dev_sem must be held so mds_lov_objids doesn't change */
396         LASSERT_SEM_LOCKED(&obd->obd_dev_sem);
397
398         info.idx = idx;
399         info.data = id;
400
401         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
402                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
403         if (rc)
404                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
405                         obd->obd_name, rc);
406
407         RETURN(rc);
408 }
409
410 /* Update the lov desc for a new size lov. */
411 static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
412 {
413         struct mds_obd *mds = &obd->u.mds;
414         struct lov_desc *ld;
415         __u32 stripes, valsize = sizeof(mds->mds_lov_desc);
416         int page, rc = 0;
417         ENTRY;
418
419         OBD_ALLOC(ld, sizeof(*ld));
420         if (!ld)
421                 RETURN(-ENOMEM);
422
423         rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC,
424                           &valsize, ld);
425         if (rc)
426                 GOTO(out, rc);
427
428         /* The size of the LOV target table may have increased. */
429         page = ld->ld_tgt_count / OBJID_PER_PAGE();
430         if (mds->mds_lov_page_array[page] == NULL) {
431                 obd_id *ids;
432
433                 OBD_ALLOC(ids, MDS_LOV_ALLOC_SIZE);
434                 if (ids == NULL)
435                         GOTO(out, rc = -ENOMEM);
436
437                 mds->mds_lov_page_array[page] = ids;
438         }
439
440         /* Don't change the mds_lov_desc until the objids size matches the
441            count (paranoia) */
442         mds->mds_lov_desc = *ld;
443         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d\n",
444                mds->mds_lov_desc.ld_tgt_count);
445
446         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
447                                mds->mds_lov_desc.ld_tgt_count);
448
449         mds->mds_max_mdsize = lov_mds_md_size(stripes);
450         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
451         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
452                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
453                stripes);
454
455         /* If we added a target we have to reconnect the llogs */
456         /* We only _need_ to do this at first add (idx), or the first time
457            after recovery.  However, it should now be safe to call anytime. */
458         rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
459
460         /*XXX this notifies the MDD until lov handling use old mds code */
461         if (obd->obd_upcall.onu_owner) {
462                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
463                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
464                                                  obd->obd_upcall.onu_owner);
465         }
466 out:
467         OBD_FREE(ld, sizeof(*ld));
468         RETURN(rc);
469 }
470
471
472 #define MDSLOV_NO_INDEX -1
473
474 /* Inform MDS about new/updated target */
475 static int mds_lov_update_mds(struct obd_device *obd,
476                               struct obd_device *watched,
477                               __u32 idx, struct obd_uuid *uuid)
478 {
479         struct mds_obd *mds = &obd->u.mds;
480         __u32 old_count;
481         int rc = 0;
482         int page;
483         int off;
484         obd_id *data;
485
486         ENTRY;
487
488         /* Don't let anyone else mess with mds_lov_objids now */
489         mutex_down(&obd->obd_dev_sem);
490
491         old_count = mds->mds_lov_desc.ld_tgt_count;
492         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
493         if (rc)
494                 GOTO(out, rc);
495
496         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
497                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
498                mds->mds_lov_desc.ld_tgt_count);
499
500         /* idx is set as data from lov_notify. */
501         if (idx == MDSLOV_NO_INDEX || obd->obd_recovering)
502                 GOTO(out, rc);
503
504         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
505                 CERROR("index %d > count %d!\n", idx,
506                        mds->mds_lov_desc.ld_tgt_count);
507                 GOTO(out, rc = -EINVAL);
508         }
509
510         page = idx / OBJID_PER_PAGE();
511         off = idx % OBJID_PER_PAGE();
512         data = mds->mds_lov_page_array[page];
513         CDEBUG(D_CONFIG, "idx %d - %p - %d/%d\n", idx, data, page, off);
514         if (data[off] == 0) {
515                 rc = mds_lov_get_objid(obd, watched->obd_self_export, idx);
516         } else {
517                 /* We have read this lastid from disk; tell the osc.
518                    Don't call this during recovery. */
519                 rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
520                 if (rc) {
521                         CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
522                         /* Don't abort the rest of the sync */
523                         rc = 0;
524                 }
525         }
526
527         CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
528                data[off], idx, rc);
529 out:
530         mutex_up(&obd->obd_dev_sem);
531         RETURN(rc);
532 }
533
534 /* update the LOV-OSC knowledge of the last used object id's */
535 int mds_lov_connect(struct obd_device *obd, char * lov_name)
536 {
537         struct mds_obd *mds = &obd->u.mds;
538         struct lustre_handle conn = {0,};
539         struct obd_connect_data *data;
540         int rc;
541         ENTRY;
542
543         if (IS_ERR(mds->mds_osc_obd))
544                 RETURN(PTR_ERR(mds->mds_osc_obd));
545
546         if (mds->mds_osc_obd)
547                 RETURN(0);
548
549         mds->mds_osc_obd = class_name2obd(lov_name);
550         if (!mds->mds_osc_obd) {
551                 CERROR("MDS cannot locate LOV %s\n", lov_name);
552                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
553                 RETURN(-ENOTCONN);
554         }
555
556         OBD_ALLOC(data, sizeof(*data));
557         if (data == NULL)
558                 RETURN(-ENOMEM);
559         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
560                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
561                                   OBD_CONNECT_OSS_CAPA;
562 #ifdef HAVE_LRU_RESIZE_SUPPORT
563         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
564 #endif
565         data->ocd_version = LUSTRE_VERSION_CODE;
566         data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
567         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
568         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
569         OBD_FREE(data, sizeof(*data));
570         if (rc) {
571                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
572                 mds->mds_osc_obd = ERR_PTR(rc);
573                 RETURN(rc);
574         }
575         mds->mds_osc_exp = class_conn2export(&conn);
576
577         rc = obd_register_observer(mds->mds_osc_obd, obd);
578         if (rc) {
579                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
580                        lov_name, rc);
581                 GOTO(err_discon, rc);
582         }
583
584         /* Deny new client connections until we are sure we have some OSTs */
585         obd->obd_no_conn = 1;
586
587         mutex_down(&obd->obd_dev_sem);
588         rc = mds_lov_read_objids(obd);
589         if (rc) {
590                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
591                 GOTO(err_reg, rc);
592         }
593
594         rc = mds_lov_update_desc(obd, mds->mds_osc_exp);
595         if (rc)
596                 GOTO(err_reg, rc);
597
598         /* If we're mounting this code for the first time on an existing FS,
599          * we need to populate the objids array from the real OST values */
600         if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objid_count) {
601                 __u32 i = mds->mds_lov_objid_count;
602                 for(; i <= mds->mds_lov_desc.ld_tgt_count; i++) {
603                         rc = mds_lov_get_objid(obd, mds->mds_osc_exp, i);
604                         if (rc != 0)
605                                 break;
606                 }
607                 if (rc == 0)
608                         rc = mds_lov_write_objids(obd);
609                 if (rc)
610                         CERROR("got last objids from OSTs, but error "
611                                 "in update objids file: %d\n", rc);
612         }
613         mutex_up(&obd->obd_dev_sem);
614
615         /* I want to see a callback happen when the OBD moves to a
616          * "For General Use" state, and that's when we'll call
617          * set_nextid().  The class driver can help us here, because
618          * it can use the obd_recovering flag to determine when the
619          * the OBD is full available. */
620         /* MDD device will care about that
621         if (!obd->obd_recovering)
622                 rc = mds_postrecov(obd);
623          */
624         RETURN(rc);
625
626 err_reg:
627         mutex_up(&obd->obd_dev_sem);
628         obd_register_observer(mds->mds_osc_obd, NULL);
629 err_discon:
630         obd_disconnect(mds->mds_osc_exp);
631         mds->mds_osc_exp = NULL;
632         mds->mds_osc_obd = ERR_PTR(rc);
633         RETURN(rc);
634 }
635
636 int mds_lov_disconnect(struct obd_device *obd)
637 {
638         struct mds_obd *mds = &obd->u.mds;
639         int rc = 0;
640         ENTRY;
641
642         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
643                 obd_register_observer(mds->mds_osc_obd, NULL);
644
645                 /* The actual disconnect of the mds_lov will be called from
646                  * class_disconnect_exports from mds_lov_clean. So we have to
647                  * ensure that class_cleanup doesn't fail due to the extra ref
648                  * we're holding now. The mechanism to do that already exists -
649                  * the obd_force flag. We'll drop the final ref to the
650                  * mds_osc_exp in mds_cleanup. */
651                 mds->mds_osc_obd->obd_force = 1;
652         }
653
654         RETURN(rc);
655 }
656
657 int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
658                   void *karg, void *uarg)
659 {
660         static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" };
661         struct obd_device *obd = exp->exp_obd;
662         struct mds_obd *mds = &obd->u.mds;
663         struct obd_ioctl_data *data = karg;
664         struct lvfs_run_ctxt saved;
665         int rc = 0;
666
667         ENTRY;
668         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
669
670         switch (cmd) {
671         case OBD_IOC_RECORD: {
672                 char *name = data->ioc_inlbuf1;
673                 if (mds->mds_cfg_llh)
674                         RETURN(-EBUSY);
675
676                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
677                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
678                                  &mds->mds_cfg_llh, NULL, name);
679                 if (rc == 0)
680                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
681                                          &cfg_uuid);
682                 else
683                         mds->mds_cfg_llh = NULL;
684                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
685
686                 RETURN(rc);
687         }
688
689         case OBD_IOC_ENDRECORD: {
690                 if (!mds->mds_cfg_llh)
691                         RETURN(-EBADF);
692
693                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
694                 rc = llog_close(mds->mds_cfg_llh);
695                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
696
697                 mds->mds_cfg_llh = NULL;
698                 RETURN(rc);
699         }
700
701         case OBD_IOC_CLEAR_LOG: {
702                 char *name = data->ioc_inlbuf1;
703                 if (mds->mds_cfg_llh)
704                         RETURN(-EBUSY);
705
706                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
707                 rc = llog_create(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT),
708                                  &mds->mds_cfg_llh, NULL, name);
709                 if (rc == 0) {
710                         llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN,
711                                          NULL);
712
713                         rc = llog_destroy(mds->mds_cfg_llh);
714                         llog_free_handle(mds->mds_cfg_llh);
715                 }
716                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
717
718                 mds->mds_cfg_llh = NULL;
719                 RETURN(rc);
720         }
721
722         case OBD_IOC_DORECORD: {
723                 char *cfg_buf;
724                 struct llog_rec_hdr rec;
725                 if (!mds->mds_cfg_llh)
726                         RETURN(-EBADF);
727
728                 rec.lrh_len = llog_data_len(data->ioc_plen1);
729
730                 if (data->ioc_type == LUSTRE_CFG_TYPE) {
731                         rec.lrh_type = OBD_CFG_REC;
732                 } else {
733                         CERROR("unknown cfg record type:%d \n", data->ioc_type);
734                         RETURN(-EINVAL);
735                 }
736
737                 OBD_ALLOC(cfg_buf, data->ioc_plen1);
738                 if (cfg_buf == NULL)
739                         RETURN(-EINVAL);
740                 rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1);
741                 if (rc) {
742                         OBD_FREE(cfg_buf, data->ioc_plen1);
743                         RETURN(rc);
744                 }
745
746                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
747                 rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0,
748                                     cfg_buf, -1);
749                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
750
751                 OBD_FREE(cfg_buf, data->ioc_plen1);
752                 RETURN(rc);
753         }
754
755         case OBD_IOC_PARSE: {
756                 struct llog_ctxt *ctxt =
757                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
758                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
759                 rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL);
760                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
761                 if (rc)
762                         RETURN(rc);
763
764                 RETURN(rc);
765         }
766
767         case OBD_IOC_DUMP_LOG: {
768                 struct llog_ctxt *ctxt =
769                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
770                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
771                 rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL);
772                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
773                 if (rc)
774                         RETURN(rc);
775
776                 RETURN(rc);
777         }
778
779         case OBD_IOC_SYNC: {
780                 CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name);
781                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
782                 RETURN(rc);
783         }
784
785         case OBD_IOC_SET_READONLY: {
786                 void *handle;
787                 struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode;
788                 BDEVNAME_DECLARE_STORAGE(tmp);
789                 CERROR("*** setting device %s read-only ***\n",
790                        ll_bdevname(obd->u.obt.obt_sb, tmp));
791
792                 handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL);
793                 if (!IS_ERR(handle))
794                         rc = fsfilt_commit(obd, inode, handle, 1);
795
796                 CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name);
797                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
798
799                 lvfs_set_rdonly(obd, obd->u.obt.obt_sb);
800                 RETURN(0);
801         }
802
803         case OBD_IOC_CATLOGLIST: {
804                 int count = mds->mds_lov_desc.ld_tgt_count;
805                 rc = llog_catalog_list(obd, count, data);
806                 RETURN(rc);
807
808         }
809         case OBD_IOC_LLOG_CHECK:
810         case OBD_IOC_LLOG_CANCEL:
811         case OBD_IOC_LLOG_REMOVE: {
812                 struct llog_ctxt *ctxt =
813                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
814                 int rc2;
815                 __u32 group;
816
817                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
818                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
819                 rc = llog_ioctl(ctxt, cmd, data);
820                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
821                 llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
822                 group = FILTER_GROUP_MDS0 + mds->mds_id;
823                 rc2 = obd_set_info_async(mds->mds_osc_exp,
824                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
825                                          sizeof(group), &group, NULL);
826                 if (!rc)
827                         rc = rc2;
828                 RETURN(rc);
829         }
830         case OBD_IOC_LLOG_INFO:
831         case OBD_IOC_LLOG_PRINT: {
832                 struct llog_ctxt *ctxt =
833                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
834
835                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
836                 rc = llog_ioctl(ctxt, cmd, data);
837                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
838
839                 RETURN(rc);
840         }
841
842         case OBD_IOC_ABORT_RECOVERY:
843                 CERROR("aborting recovery for device %s\n", obd->obd_name);
844                 target_stop_recovery_thread(obd);
845                 RETURN(0);
846
847         default:
848                 CDEBUG(D_INFO, "unknown command %x\n", cmd);
849                 RETURN(-EINVAL);
850         }
851         RETURN(0);
852
853 }
854
855 /* Collect the preconditions we need to allow client connects */
856 static void mds_allow_cli(struct obd_device *obd, unsigned int flag)
857 {
858         if (flag & CONFIG_LOG)
859                 obd->u.mds.mds_fl_cfglog = 1;
860         if (flag & CONFIG_SYNC)
861                 obd->u.mds.mds_fl_synced = 1;
862         if (obd->u.mds.mds_fl_cfglog /* bz11778: && obd->u.mds.mds_fl_synced */)
863                 /* Open for clients */
864                 obd->obd_no_conn = 0;
865 }
866
867 struct mds_lov_sync_info {
868         struct obd_device *mlsi_obd;     /* the lov device to sync */
869         struct obd_device *mlsi_watched; /* target osc */
870         __u32              mlsi_index;   /* index of target */
871 };
872
873 static int mds_propagate_capa_keys(struct mds_obd *mds)
874 {
875         struct lustre_capa_key *key;
876         int i, rc = 0;
877
878         ENTRY;
879
880         if (!mds->mds_capa_keys)
881                 RETURN(0);
882
883         for (i = 0; i < 2; i++) {
884                 key = &mds->mds_capa_keys[i];
885                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
886
887                 rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
888                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
889                 if (rc) {
890                         DEBUG_CAPA_KEY(D_ERROR, key,
891                                        "propagate failed (rc = %d) for", rc);
892                         RETURN(rc);
893                 }
894         }
895
896         RETURN(0);
897 }
898
899 /* We only sync one osc at a time, so that we don't have to hold
900    any kind of lock on the whole mds_lov_desc, which may change
901    (grow) as a result of mds_lov_add_ost.  This also avoids any
902    kind of mismatch between the lov_desc and the mds_lov_desc,
903    which are not in lock-step during lov_add_obd */
904 static int __mds_lov_synchronize(void *data)
905 {
906         struct mds_lov_sync_info *mlsi = data;
907         struct obd_device *obd = mlsi->mlsi_obd;
908         struct obd_device *watched = mlsi->mlsi_watched;
909         struct mds_obd *mds = &obd->u.mds;
910         struct obd_uuid *uuid;
911         __u32  idx = mlsi->mlsi_index;
912         struct mds_group_info mgi;
913         int rc = 0;
914         ENTRY;
915
916         OBD_FREE(mlsi, sizeof(*mlsi));
917
918         LASSERT(obd);
919         LASSERT(watched);
920         uuid = &watched->u.cli.cl_target_uuid;
921         LASSERT(uuid);
922
923         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
924
925         rc = mds_lov_update_mds(obd, watched, idx, uuid);
926         if (rc != 0) {
927                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
928                 GOTO(out, rc);
929         }
930         mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
931         mgi.uuid = uuid;
932
933         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
934                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
935         if (rc != 0)
936                 GOTO(out, rc);
937
938         /* propagate capability keys */
939         rc = mds_propagate_capa_keys(mds);
940         if (rc)
941                 GOTO(out, rc);
942
943         rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
944                           mds->mds_lov_desc.ld_tgt_count,
945                           NULL, NULL, uuid);
946
947         if (rc != 0) {
948                 CERROR("%s failed at llog_origin_connect: %d\n",
949                        obd_uuid2str(uuid), rc);
950                 GOTO(out, rc);
951         }
952
953         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
954               obd->obd_name, obd_uuid2str(uuid));
955         /*
956          * FIXME: this obd_stopping was useless, 
957          * since obd in mdt layer was set
958          */
959         if (obd->obd_stopping)
960                 GOTO(out, rc = -ENODEV);
961
962         rc = mds_lov_clear_orphans(mds, uuid);
963         if (rc != 0) {
964                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
965                        obd_uuid2str(uuid), rc);
966                 GOTO(out, rc);
967         }
968
969         if (obd->obd_upcall.onu_owner) {
970                 /*
971                  * This is a hack for mds_notify->mdd_notify. When the mds obd
972                  * in mdd is removed, This hack should be removed.
973                  */
974                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
975                  rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
976                                                  obd->obd_upcall.onu_owner);
977         }
978         EXIT;
979 out:
980         if (rc) {
981                 /* Deactivate it for safety */
982                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
983                        rc);
984                 if (!obd->obd_stopping && mds->mds_osc_obd &&
985                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping) 
986                         obd_notify(mds->mds_osc_obd, watched,
987                                    OBD_NOTIFY_INACTIVE, NULL);
988         }
989
990         class_decref(obd);
991         return rc;
992 }
993
994 int mds_lov_synchronize(void *data)
995 {
996         struct mds_lov_sync_info *mlsi = data;
997         char name[20];
998
999         if (mlsi->mlsi_index == MDSLOV_NO_INDEX)
1000                 /* There is still a watched target,
1001                 but we don't know its index */
1002                 sprintf(name, "ll_sync_tgt");
1003         else
1004                 snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
1005         ptlrpc_daemonize(name);
1006
1007         RETURN(__mds_lov_synchronize(data));
1008 }
1009
1010 int mds_lov_start_synchronize(struct obd_device *obd,
1011                               struct obd_device *watched,
1012                               void *data, int nonblock)
1013 {
1014         struct mds_lov_sync_info *mlsi;
1015         int rc;
1016
1017         ENTRY;
1018
1019         LASSERT(watched);
1020
1021         OBD_ALLOC(mlsi, sizeof(*mlsi));
1022         if (mlsi == NULL)
1023                 RETURN(-ENOMEM);
1024
1025         mlsi->mlsi_obd = obd;
1026         mlsi->mlsi_watched = watched;
1027         if (data)
1028                 mlsi->mlsi_index = *(__u32 *)data;
1029         else
1030                 mlsi->mlsi_index = MDSLOV_NO_INDEX;
1031
1032         /* Although class_export_get(obd->obd_self_export) would lock
1033            the MDS in place, since it's only a self-export
1034            it doesn't lock the LOV in place.  The LOV can be disconnected
1035            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
1036            Simply taking an export ref on the LOV doesn't help, because it's
1037            still disconnected. Taking an obd reference insures that we don't
1038            disconnect the LOV.  This of course means a cleanup won't
1039            finish for as long as the sync is blocking. */
1040         class_incref(obd);
1041
1042         if (nonblock) {
1043                 /* Synchronize in the background */
1044                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
1045                                        CLONE_VM | CLONE_FILES);
1046                 if (rc < 0) {
1047                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
1048                                obd->obd_name, rc);
1049                         class_decref(obd);
1050                 } else {
1051                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
1052                                "thread=%d\n", obd->obd_name,
1053                                mlsi->mlsi_index, rc);
1054                         rc = 0;
1055                 }
1056         } else {
1057                 rc = __mds_lov_synchronize((void *)mlsi);
1058         }
1059
1060         RETURN(rc);
1061 }
1062
1063 int mds_notify(struct obd_device *obd, struct obd_device *watched,
1064                enum obd_notify_event ev, void *data)
1065 {
1066         int rc = 0;
1067         ENTRY;
1068
1069         switch (ev) {
1070         /* We only handle these: */
1071         case OBD_NOTIFY_ACTIVE:
1072         case OBD_NOTIFY_SYNC:
1073         case OBD_NOTIFY_SYNC_NONBLOCK:
1074                 break;
1075         case OBD_NOTIFY_CONFIG:
1076                 mds_allow_cli(obd, (unsigned int)data);
1077         default:
1078                 RETURN(0);
1079         }
1080
1081         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
1082         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1083                 CERROR("unexpected notification of %s %s!\n",
1084                        watched->obd_type->typ_name, watched->obd_name);
1085                 RETURN(-EINVAL);
1086         }
1087
1088         if (obd->obd_recovering) {
1089                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1090                       obd->obd_name,
1091                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1092                 /* We still have to fix the lov descriptor for ost's added
1093                    after the mdt in the config log.  They didn't make it into
1094                    mds_lov_connect. */
1095                 mutex_down(&obd->obd_dev_sem);
1096                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
1097                 if (rc) {
1098                         mutex_up(&obd->obd_dev_sem);
1099                         RETURN(rc);
1100                 }
1101                 /* We should update init llog here too for replay unlink and 
1102                  * possiable llog init race when recovery complete */
1103                 llog_cat_initialize(obd, NULL, 
1104                                     obd->u.mds.mds_lov_desc.ld_tgt_count,
1105                                     &watched->u.cli.cl_target_uuid);
1106                 mutex_up(&obd->obd_dev_sem);
1107                 mds_allow_cli(obd, CONFIG_SYNC);
1108                 RETURN(rc);
1109         }
1110
1111         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
1112         rc = mds_lov_start_synchronize(obd, watched, data,
1113                                        !(ev == OBD_NOTIFY_SYNC));
1114
1115         lquota_recovery(mds_quota_interface_ref, obd);
1116
1117         RETURN(rc);
1118 }
1119
1120 /* Convert the on-disk LOV EA structre.
1121  * We always try to convert from an old LOV EA format to the common in-memory
1122  * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and
1123  * then convert back to the new on-disk format and save it back to disk
1124  * (obd_packmd() only ever saves to the new on-disk format) so we don't have
1125  * to convert it each time this inode is accessed.
1126  *
1127  * This function is a bit interesting in the error handling.  We can safely
1128  * ship the old lmm to the client in case of failure, since it uses the same
1129  * obd_unpackmd() code and can do the conversion if the MDS fails for some
1130  * reason.  We will not delete the old lmm data until we have written the
1131  * new format lmm data in fsfilt_set_md(). */
1132 int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
1133                        struct lov_mds_md *lmm, int lmm_size)
1134 {
1135         struct lov_stripe_md *lsm = NULL;
1136         void *handle;
1137         int rc, err;
1138         ENTRY;
1139
1140         if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC ||
1141             le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN))
1142                 RETURN(0);
1143
1144         CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n",
1145                inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic),
1146                LOV_MAGIC);
1147
1148         rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size);
1149         if (rc < 0)
1150                 GOTO(conv_end, rc);
1151
1152         rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm);
1153         if (rc < 0)
1154                 GOTO(conv_free, rc);
1155         lmm_size = rc;
1156
1157         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
1158         if (IS_ERR(handle)) {
1159                 rc = PTR_ERR(handle);
1160                 GOTO(conv_free, rc);
1161         }
1162
1163         rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov");
1164
1165         err = fsfilt_commit(obd, inode, handle, 0);
1166         if (!rc)
1167                 rc = err ? err : lmm_size;
1168         GOTO(conv_free, rc);
1169 conv_free:
1170         obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm);
1171 conv_end:
1172         return rc;
1173 }