Whamcloud - gitweb
move handling CATALOGS file at osc layer and forbid access to llog
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
53
54 #include "mds_internal.h"
55
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 {
58         struct mds_obd *mds = &obd->u.mds;
59         unsigned int i=0, j;
60
61         CDEBUG(D_INFO, "dump from %s\n", label);
62         if (mds->mds_lov_page_dirty == NULL) {
63                 CERROR("NULL bitmap!\n");
64                 GOTO(skip_bitmap, i);
65         }
66
67         for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
68                 CDEBUG(D_INFO, "%u - %lx\n", i,
69                        mds->mds_lov_page_dirty->data[i]);
70 skip_bitmap:
71         if (mds->mds_lov_page_array == NULL) {
72                 CERROR("not init page array!\n");
73                 GOTO(skip_array, i);
74
75         }
76         for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
77                 obd_id *data = mds->mds_lov_page_array[i];
78
79                 if (data == NULL)
80                         continue;
81
82                 for(j=0; j < OBJID_PER_PAGE(); j++) {
83                         if (data[j] == 0)
84                                 continue;
85                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
86                                i, j, data[j]);
87                 }
88         }
89 skip_array:
90         EXIT;
91 }
92
93 int mds_lov_init_objids(struct obd_device *obd)
94 {
95         struct mds_obd *mds = &obd->u.mds;
96         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
97         struct file *file;
98         int rc;
99         ENTRY;
100
101         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
102
103         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
104         if (mds->mds_lov_page_dirty == NULL)
105                 RETURN(-ENOMEM);
106
107
108         OBD_ALLOC(mds->mds_lov_page_array, size);
109         if (mds->mds_lov_page_array == NULL)
110                 GOTO(err_free_bitmap, rc = -ENOMEM);
111
112         /* open and test the lov objd file */
113         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
114         if (IS_ERR(file)) {
115                 rc = PTR_ERR(file);
116                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
117                 GOTO(err_free, rc = PTR_ERR(file));
118         }
119         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
120                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
121                        file->f_dentry->d_inode->i_mode);
122                 GOTO(err_open, rc = -ENOENT);
123         }
124         mds->mds_lov_objid_filp = file;
125
126         RETURN (0);
127 err_open:
128         if (filp_close((struct file *)file, 0))
129                 CERROR("can't close %s after error\n", LOV_OBJID);
130 err_free:
131         OBD_FREE(mds->mds_lov_page_array, size);
132 err_free_bitmap:
133         FREE_BITMAP(mds->mds_lov_page_dirty);
134
135         RETURN(rc);
136 }
137
138 void mds_lov_destroy_objids(struct obd_device *obd)
139 {
140         struct mds_obd *mds = &obd->u.mds;
141         int i, rc;
142         ENTRY;
143
144         if (mds->mds_lov_page_array != NULL) {
145                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
146                         obd_id *data = mds->mds_lov_page_array[i];
147                         if (data != NULL)
148                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
149                 }
150                 OBD_FREE(mds->mds_lov_page_array,
151                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
152         }
153
154         if (mds->mds_lov_objid_filp) {
155                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
156                 mds->mds_lov_objid_filp = NULL;
157                 if (rc)
158                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
159         }
160
161         FREE_BITMAP(mds->mds_lov_page_dirty);
162         EXIT;
163 }
164
165 /**
166  * currently exist two ways for know about ost count and max ost index.
167  * first - after ost is connected to mds and sync process finished
168  * second - get from lmm in recovery process, in case when mds not have configs,
169  * and ost isn't registered in mgs.
170  *
171  * \param mds pointer to mds structure
172  * \param index maxium ost index
173  *
174  * \retval -ENOMEM is not hame memory for new page
175  * \retval 0 is update passed
176  */
177 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
178 {
179         __u32 page = index / OBJID_PER_PAGE();
180         __u32 off = index % OBJID_PER_PAGE();
181         obd_id *data =  mds->mds_lov_page_array[page];
182
183         if (data == NULL) {
184                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
185                 if (data == NULL)
186                         RETURN(-ENOMEM);
187
188                 mds->mds_lov_page_array[page] = data;
189         }
190
191         if (index > mds->mds_lov_objid_max_index) {
192                 mds->mds_lov_objid_lastpage = page;
193                 mds->mds_lov_objid_lastidx = off;
194                 mds->mds_lov_objid_max_index = index;
195         }
196
197         /* workaround - New target not in objids file; increase mdsize */
198         /* ld_tgt_count is used as the max index everywhere, despite its name. */
199         if (data[off] == 0) {
200                 __u32 stripes;
201
202                 data[off] = 1;
203                 mds->mds_lov_objid_count++;
204                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
205                                 mds->mds_lov_objid_count);
206
207                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
208                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
209
210                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
211                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
212                        mds->mds_max_cookiesize);
213         }
214
215         EXIT;
216         return 0;
217 }
218
219 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
220 {
221         struct lov_ost_data_v1 *data;
222         __u32 count;
223         int rc = 0;
224         __u32 j;
225
226         /* if we create file without objects - lmm is NULL */
227         if (lmm == NULL)
228                 return 0;
229
230         switch (le32_to_cpu(lmm->lmm_magic)) {
231                 case LOV_MAGIC_V1:
232                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
233                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
234                         break;
235                 case LOV_MAGIC_V3:
236                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
237                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
238                         break;
239                 default:
240                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
241                         RETURN(-EINVAL);
242         }
243
244
245         mutex_down(&obd->obd_dev_sem);
246         for (j = 0; j < count; j++) {
247                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
248                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
249                         rc = -ENOMEM;
250                         break;
251                 }
252         }
253         mutex_up(&obd->obd_dev_sem);
254
255         RETURN(rc);
256 }
257 EXPORT_SYMBOL(mds_lov_prepare_objids);
258
259 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
260 {
261         struct mds_obd *mds = &obd->u.mds;
262         int j;
263         struct lov_ost_data_v1 *obj;
264         int count;
265         ENTRY;
266
267         /* if we create file without objects - lmm is NULL */
268         if (lmm == NULL)
269                 return;
270
271         switch (le32_to_cpu(lmm->lmm_magic)) {
272                 case LOV_MAGIC_V1:
273                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
274                         obj = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
275                         break;
276                 case LOV_MAGIC_V3:
277                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
278                         obj = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
279                         break;
280                 default:
281                         CERROR("Unknow lmm type %X !\n", le32_to_cpu(lmm->lmm_magic));
282                         return;
283         }
284
285         for (j = 0; j < count; j++) {
286                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
287                 obd_id id = le64_to_cpu(obj[j].l_object_id);
288                 __u32 page = i / OBJID_PER_PAGE();
289                 __u32 idx = i % OBJID_PER_PAGE();
290                 obd_id *data;
291
292                 data = mds->mds_lov_page_array[page];
293
294                 CDEBUG(D_INODE,"update last object for ost %u"
295                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
296                 if (id > data[idx]) {
297                         data[idx] = id;
298                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
299                 }
300         }
301         EXIT;
302         return;
303 }
304 EXPORT_SYMBOL(mds_lov_update_objids);
305
306 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
307                                     __u32 count)
308 {
309         __u32 i;
310         __u32 stripes;
311
312         for(i = 0; i < count; i++) {
313                 if (data[i] == 0)
314                         continue;
315
316                 mds->mds_lov_objid_count++;
317         }
318
319         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
320                          mds->mds_lov_objid_count);
321
322         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
323         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
324
325         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
326                "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
327
328         EXIT;
329         return 0;
330 }
331
332 static int mds_lov_read_objids(struct obd_device *obd)
333 {
334         struct mds_obd *mds = &obd->u.mds;
335         loff_t off = 0;
336         int i, rc = 0, count = 0, page = 0;
337         unsigned long size;
338         ENTRY;
339
340         /* Read everything in the file, even if our current lov desc
341            has fewer targets. Old targets not in the lov descriptor
342            during mds setup may still have valid objids. */
343         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
344         if (size == 0)
345                 RETURN(0);
346
347         page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
348         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
349         for (i = 0; i < page; i++) {
350                 obd_id *data;
351                 loff_t off_old = off;
352
353                 LASSERT(mds->mds_lov_page_array[i] == NULL);
354                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
355                 if (mds->mds_lov_page_array[i] == NULL)
356                         GOTO(out, rc = -ENOMEM);
357
358                 data = mds->mds_lov_page_array[i];
359
360                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
361                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
362                 if (rc < 0) {
363                         CERROR("Error reading objids %d\n", rc);
364                         GOTO(out, rc);
365                 }
366
367                 count += (off - off_old) / sizeof(obd_id);
368                 if (mds_lov_update_from_read(mds, data, count)) {
369                         CERROR("Can't update mds data\n");
370                         GOTO(out, rc = -EIO);
371                 }
372
373                 if (off == off_old)
374                         break; /* eof */
375         }
376         mds->mds_lov_objid_lastpage = i;
377         mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
378
379         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
380                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
381 out:
382         mds_lov_dump_objids("read",obd);
383
384         RETURN(rc);
385 }
386
387 int mds_lov_write_objids(struct obd_device *obd)
388 {
389         struct mds_obd *mds = &obd->u.mds;
390         int i = 0, rc = 0;
391         ENTRY;
392
393         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
394                 RETURN(0);
395
396         mds_lov_dump_objids("write", obd);
397
398         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
399                 obd_id *data =  mds->mds_lov_page_array[i];
400                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
401                 loff_t off = i * size;
402
403                 LASSERT(data != NULL);
404
405                 /* check for particaly filled last page */
406                 if (i == mds->mds_lov_objid_lastpage)
407                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
408
409                 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
410                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
411                                          size, &off, 0);
412                 if (rc < 0)
413                         break;
414                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
415         }
416         if (rc >= 0)
417                 rc = 0;
418
419         RETURN(rc);
420 }
421 EXPORT_SYMBOL(mds_lov_write_objids);
422
423 static int mds_lov_get_objid(struct obd_device * obd,
424                              obd_id idx)
425 {
426         struct mds_obd *mds = &obd->u.mds;
427         unsigned int page;
428         unsigned int off;
429         obd_id *data;
430         int rc = 0;
431         ENTRY;
432
433         page = idx / OBJID_PER_PAGE();
434         off = idx % OBJID_PER_PAGE();
435         data = mds->mds_lov_page_array[page];
436         if (data[off] < 2) {
437                 /* We never read this lastid; ask the osc */
438                 struct obd_id_info lastid;
439                 __u32 size = sizeof(lastid);
440
441                 lastid.idx = idx;
442                 lastid.data = &data[off];
443                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
444                                   KEY_LAST_ID, &size, &lastid, NULL);
445                 if (rc)
446                         GOTO(out, rc);
447
448                 /* workaround for clean filter */
449                 if (data[off] == 0)
450                         data[off] = 1;
451
452                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
453         }
454         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
455                idx, data, page, off, data[off]);
456 out:
457         RETURN(rc);
458 }
459
460 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
461 {
462         int rc;
463         struct obdo oa = { 0 };
464         struct obd_trans_info oti = {0};
465         struct lov_stripe_md  *empty_ea = NULL;
466         ENTRY;
467
468         LASSERT(mds->mds_lov_page_array != NULL);
469
470         /* This create will in fact either create or destroy:  If the OST is
471          * missing objects below this ID, they will be created.  If it finds
472          * objects above this ID, they will be removed. */
473         memset(&oa, 0, sizeof(oa));
474         oa.o_flags = OBD_FL_DELORPHAN;
475         oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
476         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
477         if (ost_uuid != NULL)
478                 oti.oti_ost_uuid = ost_uuid;
479
480         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
481
482         RETURN(rc);
483 }
484
485 /* for one target */
486 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
487 {
488         struct mds_obd *mds = &obd->u.mds;
489         int rc;
490         struct obd_id_info info;
491         ENTRY;
492
493         LASSERT(!obd->obd_recovering);
494
495         info.idx = idx;
496         info.data = id;
497         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
498                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
499         if (rc)
500                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
501                         obd->obd_name, rc);
502
503         RETURN(rc);
504 }
505
506 /* Update the lov desc for a new size lov. */
507 static int mds_lov_update_desc(struct obd_device *obd, int idx,
508                                struct obd_uuid *uuid, enum obd_notify_event ev)
509 {
510         struct mds_obd *mds = &obd->u.mds;
511         struct lov_desc *ld;
512         __u32 valsize = sizeof(mds->mds_lov_desc);
513         int rc = 0;
514         ENTRY;
515
516         OBD_ALLOC(ld, sizeof(*ld));
517         if (!ld)
518                 RETURN(-ENOMEM);
519
520         rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
521                           &valsize, ld, NULL);
522         if (rc)
523                 GOTO(out, rc);
524
525         /* Don't change the mds_lov_desc until the objids size matches the
526            count (paranoia) */
527         mds->mds_lov_desc = *ld;
528         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
529                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
530
531         mutex_down(&obd->obd_dev_sem);
532         rc = mds_lov_update_max_ost(mds, idx);
533         mutex_up(&obd->obd_dev_sem);
534         if (rc != 0)
535                 GOTO(out, rc );
536
537         /* If we added a target we have to reconnect the llogs */
538         /* We only _need_ to do this at first add (idx), or the first time
539            after recovery.  However, it should now be safe to call anytime. */
540         rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
541         if (rc)
542                 GOTO(out, rc);
543
544         /*XXX this notifies the MDD until lov handling use old mds code */
545         if (obd->obd_upcall.onu_owner) {
546                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
547                  rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
548                                                  obd->obd_upcall.onu_owner,
549                                                  &mds->mds_mount_count);
550         }
551 out:
552         OBD_FREE(ld, sizeof(*ld));
553         RETURN(rc);
554 }
555
556 /* Inform MDS about new/updated target */
557 static int mds_lov_update_mds(struct obd_device *obd,
558                               struct obd_device *watched,
559                               __u32 idx, enum obd_notify_event ev)
560 {
561         struct mds_obd *mds = &obd->u.mds;
562         int rc = 0;
563         int page;
564         int off;
565         obd_id *data;
566
567         ENTRY;
568
569         /* Don't let anyone else mess with mds_lov_objids now */
570         rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid, ev);
571         if (rc)
572                 GOTO(out, rc);
573
574         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
575                idx, obd->obd_recovering, obd->obd_async_recov,
576                mds->mds_lov_desc.ld_tgt_count);
577
578         /* idx is set as data from lov_notify. */
579         if (obd->obd_recovering)
580                 GOTO(out, rc);
581
582         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
583                 CERROR("index %d > count %d!\n", idx,
584                        mds->mds_lov_desc.ld_tgt_count);
585                 GOTO(out, rc = -EINVAL);
586         }
587
588         rc = mds_lov_get_objid(obd, idx);
589         if (rc)
590                 GOTO(out, rc);
591
592         page = idx / OBJID_PER_PAGE();
593         off = idx % OBJID_PER_PAGE();
594         data = mds->mds_lov_page_array[page];
595
596         /* We have read this lastid from disk; tell the osc.
597            Don't call this during recovery. */
598         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
599         if (rc) {
600                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
601                 /* Don't abort the rest of the sync */
602                 rc = 0;
603         } else {
604                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
605                         data[off], idx, rc);
606         }
607 out:
608         RETURN(rc);
609 }
610
611 /* update the LOV-OSC knowledge of the last used object id's */
612 int mds_lov_connect(struct obd_device *obd, char * lov_name)
613 {
614         struct mds_obd *mds = &obd->u.mds;
615         struct obd_connect_data *data;
616         int rc;
617         ENTRY;
618
619         if (IS_ERR(mds->mds_osc_obd))
620                 RETURN(PTR_ERR(mds->mds_osc_obd));
621
622         if (mds->mds_osc_obd)
623                 RETURN(0);
624
625         mds->mds_osc_obd = class_name2obd(lov_name);
626         if (!mds->mds_osc_obd) {
627                 CERROR("MDS cannot locate LOV %s\n", lov_name);
628                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
629                 RETURN(-ENOTCONN);
630         }
631
632         mutex_down(&obd->obd_dev_sem);
633         rc = mds_lov_read_objids(obd);
634         mutex_up(&obd->obd_dev_sem);
635         if (rc) {
636                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
637                 GOTO(err_exit, rc);
638         }
639
640         rc = obd_register_observer(mds->mds_osc_obd, obd);
641         if (rc) {
642                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
643                        lov_name, rc);
644                 GOTO(err_exit, rc);
645         }
646
647         /* try init too early */
648         rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
649         if (rc)
650                 GOTO(err_exit, rc);
651
652         mds->mds_osc_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
653
654         OBD_ALLOC(data, sizeof(*data));
655         if (data == NULL)
656                 GOTO(err_exit, rc = -ENOMEM);
657
658         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
659                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
660                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
661                                   OBD_CONNECT_BRW_SIZE  | OBD_CONNECT_CKSUM   |
662                                   OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT      |
663                                   OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN   |
664                                   OBD_CONNECT_SOM;
665 #ifdef HAVE_LRU_RESIZE_SUPPORT
666         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
667 #endif
668         data->ocd_version = LUSTRE_VERSION_CODE;
669         data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
670         /* send max bytes per rpc */
671         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
672         /* send the list of supported checksum types */
673         data->ocd_cksum_types = OBD_CKSUM_ALL;
674         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
675         rc = obd_connect(NULL, &mds->mds_osc_exp, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
676         OBD_FREE(data, sizeof(*data));
677         if (rc) {
678                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
679                 mds->mds_osc_obd = ERR_PTR(rc);
680                 RETURN(rc);
681         }
682
683         /* I want to see a callback happen when the OBD moves to a
684          * "For General Use" state, and that's when we'll call
685          * set_nextid().  The class driver can help us here, because
686          * it can use the obd_recovering flag to determine when the
687          * the OBD is full available. */
688         /* MDD device will care about that
689         if (!obd->obd_recovering)
690                 rc = mds_postrecov(obd);
691          */
692         RETURN(rc);
693
694 err_exit:
695         mds->mds_osc_exp = NULL;
696         mds->mds_osc_obd = ERR_PTR(rc);
697         RETURN(rc);
698 }
699
700 int mds_lov_disconnect(struct obd_device *obd)
701 {
702         struct mds_obd *mds = &obd->u.mds;
703         int rc = 0;
704         ENTRY;
705
706         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
707                 obd_register_observer(mds->mds_osc_obd, NULL);
708
709                 /* The actual disconnect of the mds_lov will be called from
710                  * class_disconnect_exports from mds_lov_clean. So we have to
711                  * ensure that class_cleanup doesn't fail due to the extra ref
712                  * we're holding now. The mechanism to do that already exists -
713                  * the obd_force flag. We'll drop the final ref to the
714                  * mds_osc_exp in mds_cleanup. */
715                 mds->mds_osc_obd->obd_force = 1;
716         }
717
718         RETURN(rc);
719 }
720
721 struct mds_lov_sync_info {
722         struct obd_device    *mlsi_obd;     /* the lov device to sync */
723         struct obd_device    *mlsi_watched; /* target osc */
724         __u32                 mlsi_index;   /* index of target */
725         enum obd_notify_event mlsi_ev;      /* event type */
726 };
727
728 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
729 {
730         struct mds_capa_info    info = { .uuid = uuid };
731         struct lustre_capa_key *key;
732         int i, rc = 0;
733
734         ENTRY;
735
736         if (!mds->mds_capa_keys)
737                 RETURN(0);
738
739         for (i = 0; i < 2; i++) {
740                 key = &mds->mds_capa_keys[i];
741                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
742
743                 info.capa = key;
744                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
745                                         KEY_CAPA_KEY, sizeof(info), &info, NULL);
746                 if (rc) {
747                         DEBUG_CAPA_KEY(D_ERROR, key,
748                                        "propagate failed (rc = %d) for", rc);
749                         RETURN(rc);
750                 }
751         }
752
753         RETURN(0);
754 }
755
756 /* We only sync one osc at a time, so that we don't have to hold
757    any kind of lock on the whole mds_lov_desc, which may change
758    (grow) as a result of mds_lov_add_ost.  This also avoids any
759    kind of mismatch between the lov_desc and the mds_lov_desc,
760    which are not in lock-step during lov_add_obd */
761 static int __mds_lov_synchronize(void *data)
762 {
763         struct mds_lov_sync_info *mlsi = data;
764         struct obd_device *obd = mlsi->mlsi_obd;
765         struct obd_device *watched = mlsi->mlsi_watched;
766         struct mds_obd *mds = &obd->u.mds;
767         struct obd_uuid *uuid;
768         __u32  idx = mlsi->mlsi_index;
769         enum obd_notify_event ev = mlsi->mlsi_ev;
770         struct mds_group_info mgi;
771         struct llog_ctxt *ctxt;
772         int rc = 0;
773         ENTRY;
774
775         OBD_FREE_PTR(mlsi);
776
777         LASSERT(obd);
778         LASSERT(watched);
779         uuid = &watched->u.cli.cl_target_uuid;
780         LASSERT(uuid);
781
782         down_read(&mds->mds_notify_lock);
783         if (obd->obd_stopping || obd->obd_fail)
784                 GOTO(out, rc = -ENODEV);
785
786         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
787         rc = mds_lov_update_mds(obd, watched, idx, ev);
788         if (rc != 0) {
789                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
790                 GOTO(out, rc);
791         }
792         mgi.group = mdt_to_obd_objgrp(mds->mds_id);
793         mgi.uuid = uuid;
794
795         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
796                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
797         if (rc != 0)
798                 GOTO(out, rc);
799         /* propagate capability keys */
800         rc = mds_propagate_capa_keys(mds, uuid);
801         if (rc)
802                 GOTO(out, rc);
803
804         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
805         if (!ctxt)
806                 GOTO(out, rc = -ENODEV);
807
808         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
809         rc = llog_connect(ctxt, NULL, NULL, uuid);
810         llog_ctxt_put(ctxt);
811         if (rc != 0) {
812                 CERROR("%s failed at llog_origin_connect: %d\n",
813                        obd_uuid2str(uuid), rc);
814                 GOTO(out, rc);
815         }
816
817         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
818               obd->obd_name, obd_uuid2str(uuid));
819         rc = mds_lov_clear_orphans(mds, uuid);
820         if (rc != 0) {
821                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
822                        obd_uuid2str(uuid), rc);
823                 GOTO(out, rc);
824         }
825
826 #ifdef HAVE_QUOTA_SUPPORT
827         if (obd->obd_upcall.onu_owner) { 
828                 /*
829                  * This is a hack for mds_notify->mdd_notify. When the mds obd
830                  * in mdd is removed, This hack should be removed.
831                  */
832                 LASSERT(obd->obd_upcall.onu_upcall != NULL);
833                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
834                                                 obd->obd_upcall.onu_owner,NULL);
835         }
836 #endif
837         EXIT;
838 out:
839         up_read(&mds->mds_notify_lock);
840         if (rc) {
841                 /* Deactivate it for safety */
842                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
843                        rc);
844                 if (!obd->obd_stopping && mds->mds_osc_obd &&
845                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
846                         obd_notify(mds->mds_osc_obd, watched,
847                                    OBD_NOTIFY_INACTIVE, NULL);
848         }
849
850         class_decref(obd, "mds_lov_synchronize", obd);
851         return rc;
852 }
853
854 int mds_lov_synchronize(void *data)
855 {
856         struct mds_lov_sync_info *mlsi = data;
857         char name[20];
858
859         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
860         ptlrpc_daemonize(name);
861
862         RETURN(__mds_lov_synchronize(data));
863 }
864
865 int mds_lov_start_synchronize(struct obd_device *obd,
866                               struct obd_device *watched,
867                               void *data, enum obd_notify_event ev)
868 {
869         struct mds_lov_sync_info *mlsi;
870         int rc;
871         struct obd_uuid *uuid;
872         ENTRY;
873
874         LASSERT(watched);
875         uuid = &watched->u.cli.cl_target_uuid;
876
877         OBD_ALLOC(mlsi, sizeof(*mlsi));
878         if (mlsi == NULL)
879                 RETURN(-ENOMEM);
880
881         LASSERT(data);
882         mlsi->mlsi_obd = obd;
883         mlsi->mlsi_watched = watched;
884         mlsi->mlsi_index = *(__u32 *)data;
885         mlsi->mlsi_ev = ev;
886
887         /* Although class_export_get(obd->obd_self_export) would lock
888            the MDS in place, since it's only a self-export
889            it doesn't lock the LOV in place.  The LOV can be disconnected
890            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
891            Simply taking an export ref on the LOV doesn't help, because it's
892            still disconnected. Taking an obd reference insures that we don't
893            disconnect the LOV.  This of course means a cleanup won't
894            finish for as long as the sync is blocking. */
895         class_incref(obd, "mds_lov_synchronize", obd);
896
897         if (ev != OBD_NOTIFY_SYNC) {
898                 /* Synchronize in the background */
899                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
900                                        CLONE_VM | CLONE_FILES);
901                 if (rc < 0) {
902                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
903                                obd->obd_name, rc);
904                         class_decref(obd, "mds_lov_synchronize", obd);
905                 } else {
906                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
907                                "thread=%d\n", obd->obd_name,
908                                mlsi->mlsi_index, rc);
909                         rc = 0;
910                 }
911         } else {
912                 rc = __mds_lov_synchronize((void *)mlsi);
913         }
914
915         RETURN(rc);
916 }
917
918 int mds_notify(struct obd_device *obd, struct obd_device *watched,
919                enum obd_notify_event ev, void *data)
920 {
921         int rc = 0;
922         ENTRY;
923
924         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
925
926         switch (ev) {
927         /* We only handle these: */
928         case OBD_NOTIFY_ACTIVE:
929                 /* lov want one or more _active_ targets for work */
930                 /* activate event should be pass lov idx as argument */
931         case OBD_NOTIFY_SYNC:
932         case OBD_NOTIFY_SYNC_NONBLOCK:
933                 /* sync event should be pass lov idx as argument */
934                 break;
935         default:
936                 RETURN(0);
937         }
938
939         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
940                 CERROR("unexpected notification of %s %s!\n",
941                        watched->obd_type->typ_name, watched->obd_name);
942                 RETURN(-EINVAL);
943         }
944
945         if (obd->obd_recovering) {
946                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
947                       obd->obd_name,
948                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
949                 /* We still have to fix the lov descriptor for ost's added
950                    after the mdt in the config log.  They didn't make it into
951                    mds_lov_connect. */
952                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
953                                          &watched->u.cli.cl_target_uuid, ev);
954         } else {
955                 rc = mds_lov_start_synchronize(obd, watched, data, ev);
956         }
957         RETURN(rc);
958 }