Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52
53 #include "mds_internal.h"
54
55 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
56 {
57         struct mds_obd *mds = &obd->u.mds;
58         unsigned int i=0, j;
59
60         CDEBUG(D_INFO, "dump from %s\n", label);
61         if (mds->mds_lov_page_dirty == NULL) {
62                 CERROR("NULL bitmap!\n");
63                 GOTO(skip_bitmap, i);
64         }
65
66         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
67                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
68 skip_bitmap:
69         if (mds->mds_lov_page_array == NULL) {
70                 CERROR("not init page array!\n");
71                 GOTO(skip_array, i);
72
73         }
74         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
75                 obd_id *data = mds->mds_lov_page_array[i];
76
77                 if (data == NULL)
78                         continue;
79
80                 for(j=0; j < OBJID_PER_PAGE(); j++) {
81                         if (data[j] == 0)
82                                 continue;
83                         CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
84                 }
85         }
86 skip_array:
87         EXIT;
88 }
89
90 int mds_lov_init_objids(struct obd_device *obd)
91 {
92         struct mds_obd *mds = &obd->u.mds;
93         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
94         struct file *file;
95         int rc;
96         ENTRY;
97
98         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
99
100         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
101         if (mds->mds_lov_page_dirty == NULL)
102                 RETURN(-ENOMEM);
103
104
105         OBD_ALLOC(mds->mds_lov_page_array, size);
106         if (mds->mds_lov_page_array == NULL)
107                 GOTO(err_free_bitmap, rc = -ENOMEM);
108
109         /* open and test the lov objd file */
110         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
111         if (IS_ERR(file)) {
112                 rc = PTR_ERR(file);
113                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
114                 GOTO(err_free, rc = PTR_ERR(file));
115         }
116         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
117                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
118                        file->f_dentry->d_inode->i_mode);
119                 GOTO(err_open, rc = -ENOENT);
120         }
121         mds->mds_lov_objid_filp = file;
122
123         RETURN (0);
124 err_open:
125         if (filp_close((struct file *)file, 0))
126                 CERROR("can't close %s after error\n", LOV_OBJID);
127 err_free:
128         OBD_FREE(mds->mds_lov_page_array, size);
129 err_free_bitmap:
130         FREE_BITMAP(mds->mds_lov_page_dirty);
131
132         RETURN(rc);
133 }
134
135 void mds_lov_destroy_objids(struct obd_device *obd)
136 {
137         struct mds_obd *mds = &obd->u.mds;
138         int i, rc;
139         ENTRY;
140
141         if (mds->mds_lov_page_array != NULL) {
142                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
143                         obd_id *data = mds->mds_lov_page_array[i];
144                         if (data != NULL)
145                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
146                 }
147                 OBD_FREE(mds->mds_lov_page_array,
148                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
149         }
150
151         if (mds->mds_lov_objid_filp) {
152                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
153                 mds->mds_lov_objid_filp = NULL;
154                 if (rc)
155                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
156         }
157
158         FREE_BITMAP(mds->mds_lov_page_dirty);
159         EXIT;
160 }
161
162 /**
163  * currently exist two ways for know about ost count and max ost index.
164  * first - after ost is connected to mds and sync process finished
165  * second - get from lmm in recovery process, in case when mds not have configs,
166  * and ost isn't registered in mgs.
167  *
168  * \param mds pointer to mds structure
169  * \param index maxium ost index
170  *
171  * \retval -ENOMEM is not hame memory for new page
172  * \retval 0 is update passed
173  */
174 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
175 {
176         __u32 page = index / OBJID_PER_PAGE();
177         __u32 off = index % OBJID_PER_PAGE();
178         obd_id *data =  mds->mds_lov_page_array[page];
179
180         if (data == NULL) {
181                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
182                 if (data == NULL)
183                         RETURN(-ENOMEM);
184
185                 mds->mds_lov_page_array[page] = data;
186         }
187
188         if (index > mds->mds_lov_objid_max_index) {
189                 mds->mds_lov_objid_lastpage = page;
190                 mds->mds_lov_objid_lastidx = off;
191                 mds->mds_lov_objid_max_index = index;
192         }
193
194         /* workaround - New target not in objids file; increase mdsize */
195         /* ld_tgt_count is used as the max index everywhere, despite its name. */
196         if (data[off] == 0) {
197                 __u32 stripes;
198
199                 data[off] = 1;
200                 mds->mds_lov_objid_count++;
201                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
202                                 mds->mds_lov_objid_count);
203
204                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
205                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
206
207                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
208                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
209                        mds->mds_max_cookiesize);
210         }
211
212         EXIT;
213         return 0;
214 }
215
216 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
217 {
218         struct lov_ost_data_v1 *data;
219         __u32 count;
220         int rc = 0;
221         __u32 j;
222
223         /* if we create file without objects - lmm is NULL */
224         if (lmm == NULL)
225                 return 0;
226
227         switch (le32_to_cpu(lmm->lmm_magic)) {
228                 case LOV_MAGIC_V1:
229                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
230                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
231                         break;
232                 case LOV_MAGIC_V3:
233                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
234                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
235                         break;
236                 default:
237                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
238                         RETURN(-EINVAL);
239         }
240
241
242         mutex_down(&obd->obd_dev_sem);
243         for (j = 0; j < count; j++) {
244                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
245                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
246                         rc = -ENOMEM;
247                         break;
248                 }
249         }
250         mutex_up(&obd->obd_dev_sem);
251
252         RETURN(rc);
253 }
254 EXPORT_SYMBOL(mds_lov_prepare_objids);
255
256 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
257 {
258         struct mds_obd *mds = &obd->u.mds;
259         int j;
260         struct lov_ost_data_v1 *obj;
261         int count;
262         ENTRY;
263
264         /* if we create file without objects - lmm is NULL */
265         if (lmm == NULL)
266                 return;
267
268         switch (le32_to_cpu(lmm->lmm_magic)) {
269                 case LOV_MAGIC_V1:
270                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
271                         obj = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
272                         break;
273                 case LOV_MAGIC_V3:
274                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
275                         obj = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
276                         break;
277                 default:
278                         CERROR("Unknow lmm type %X !\n", le32_to_cpu(lmm->lmm_magic));
279                         return;
280         }
281
282         for (j = 0; j < count; j++) {
283                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
284                 obd_id id = le64_to_cpu(obj[j].l_object_id);
285                 __u32 page = i / OBJID_PER_PAGE();
286                 __u32 idx = i % OBJID_PER_PAGE();
287                 obd_id *data;
288
289                 data = mds->mds_lov_page_array[page];
290
291                 CDEBUG(D_INODE,"update last object for ost %u"
292                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
293                 if (id > data[idx]) {
294                         data[idx] = id;
295                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
296                 }
297         }
298         EXIT;
299         return;
300 }
301 EXPORT_SYMBOL(mds_lov_update_objids);
302
303
304 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
305                                     __u32 count)
306 {
307         __u32 i;
308         __u32 stripes;
309
310         for(i = 0; i < count; i++) {
311                 if (data[i] == 0)
312                         continue;
313
314                 mds->mds_lov_objid_count++;
315         }
316
317         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
318                          mds->mds_lov_objid_count);
319
320         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
321         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
322
323         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
324                "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
325
326         EXIT;
327         return 0;
328 }
329
330 static int mds_lov_read_objids(struct obd_device *obd)
331 {
332         struct mds_obd *mds = &obd->u.mds;
333         loff_t off = 0;
334         int i, rc, count = 0, page = 0;
335         unsigned long size;
336         ENTRY;
337
338         /* Read everything in the file, even if our current lov desc
339            has fewer targets. Old targets not in the lov descriptor
340            during mds setup may still have valid objids. */
341         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
342         if (size == 0)
343                 RETURN(0);
344
345         page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
346         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
347
348         for (i = 0; i < page; i++) {
349                 loff_t off_old = off;
350
351                 LASSERT(mds->mds_lov_page_array[i] == NULL);
352                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
353                 if (mds->mds_lov_page_array[i] == NULL)
354                         GOTO(out, rc = -ENOMEM);
355
356                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, mds->mds_lov_page_array[i],
357                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
358                 if (rc < 0) {
359                         CERROR("Error reading objids %d\n", rc);
360                         GOTO(out, rc);
361                 }
362
363                 count += (off - off_old)/sizeof(obd_id);
364                 if (mds_lov_update_from_read(mds, mds->mds_lov_page_array[i], count)) {
365                          CERROR("Can't update mds data\n");
366                          GOTO(out, rc = -EIO);
367                  }
368
369                 if (off == off_old)
370                         break; // eof
371         }
372         mds->mds_lov_objid_lastpage = i;
373         mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
374
375         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
376                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
377 out:
378         mds_lov_dump_objids("read",obd);
379
380         RETURN(0);
381 }
382
383 int mds_lov_write_objids(struct obd_device *obd)
384 {
385         struct mds_obd *mds = &obd->u.mds;
386         int i, rc = 0;
387         ENTRY;
388
389         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
390                 RETURN(0);
391
392         mds_lov_dump_objids("write", obd);
393
394         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
395                 obd_id *data =  mds->mds_lov_page_array[i];
396                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
397                 loff_t off = i * size;
398
399                 LASSERT(data != NULL);
400
401                 /* check for particaly filled last page */
402                 if (i == mds->mds_lov_objid_lastpage)
403                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
404
405                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
406                                          size, &off, 0);
407                 if (rc < 0)
408                         break;
409                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
410         }
411         if (rc >= 0)
412                 rc = 0;
413
414         RETURN(rc);
415 }
416 EXPORT_SYMBOL(mds_lov_write_objids);
417
418 static int mds_lov_get_objid(struct obd_device * obd,
419                              obd_id idx)
420 {
421         struct mds_obd *mds = &obd->u.mds;
422         unsigned int page;
423         unsigned int off;
424         obd_id *data;
425         int rc = 0;
426         ENTRY;
427
428         page = idx / OBJID_PER_PAGE();
429         off = idx % OBJID_PER_PAGE();
430         data = mds->mds_lov_page_array[page];
431         if (data[off] == 0) {
432                 /* We never read this lastid; ask the osc */
433                 struct obd_id_info lastid;
434                 __u32 size = sizeof(lastid);
435
436                 lastid.idx = idx;
437                 lastid.data = &data[off];
438                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
439                                   KEY_LAST_ID, &size, &lastid, NULL);
440                 if (rc)
441                         GOTO(out, rc);
442
443                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
444         }
445 out:
446         RETURN(rc);
447 }
448
449 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
450 {
451         int rc;
452         struct obdo oa;
453         struct obd_trans_info oti = {0};
454         struct lov_stripe_md  *empty_ea = NULL;
455         ENTRY;
456
457         LASSERT(mds->mds_lov_page_array != NULL);
458
459         /* This create will in fact either create or destroy:  If the OST is
460          * missing objects below this ID, they will be created.  If it finds
461          * objects above this ID, they will be removed. */
462         memset(&oa, 0, sizeof(oa));
463         oa.o_flags = OBD_FL_DELORPHAN;
464         oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
465         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
466         if (ost_uuid != NULL)
467                 oti.oti_ost_uuid = ost_uuid;
468         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
469
470         RETURN(rc);
471 }
472
473 /* for one target */
474 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
475 {
476         struct mds_obd *mds = &obd->u.mds;
477         int rc;
478         struct obd_id_info info;
479         ENTRY;
480
481         LASSERT(!obd->obd_recovering);
482
483         info.idx = idx;
484         info.data = id;
485         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
486                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
487         if (rc)
488                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
489                         obd->obd_name, rc);
490
491         RETURN(rc);
492 }
493
494 /* Update the lov desc for a new size lov. */
495 static int mds_lov_update_desc(struct obd_device *obd, int idx,
496                                struct obd_uuid *uuid)
497 {
498         struct mds_obd *mds = &obd->u.mds;
499         struct lov_desc *ld;
500         __u32 valsize = sizeof(mds->mds_lov_desc);
501         int rc = 0;
502         ENTRY;
503
504         OBD_ALLOC(ld, sizeof(*ld));
505         if (!ld)
506                 RETURN(-ENOMEM);
507
508         rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
509                           &valsize, ld, NULL);
510         if (rc)
511                 GOTO(out, rc);
512
513         /* Don't change the mds_lov_desc until the objids size matches the
514            count (paranoia) */
515         mds->mds_lov_desc = *ld;
516         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
517                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
518
519         mutex_down(&obd->obd_dev_sem);
520         rc = mds_lov_update_max_ost(mds, idx);
521         mutex_up(&obd->obd_dev_sem);
522         if (rc != 0)
523                 GOTO(out, rc );
524
525
526         /* If we added a target we have to reconnect the llogs */
527         /* We only _need_ to do this at first add (idx), or the first time
528            after recovery.  However, it should now be safe to call anytime. */
529         rc = llog_cat_initialize(obd, &obd->obd_olg, idx, uuid);
530         if (rc)
531                 GOTO(out, rc);
532
533         /*XXX this notifies the MDD until lov handling use old mds code */
534         if (obd->obd_upcall.onu_owner) {
535                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
536                  rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
537                                                  obd->obd_upcall.onu_owner);
538         }
539 out:
540         OBD_FREE(ld, sizeof(*ld));
541         RETURN(rc);
542 }
543
544 /* Inform MDS about new/updated target */
545 static int mds_lov_update_mds(struct obd_device *obd,
546                               struct obd_device *watched,
547                               __u32 idx)
548 {
549         struct mds_obd *mds = &obd->u.mds;
550         int rc = 0;
551         int page;
552         int off;
553         obd_id *data;
554
555         ENTRY;
556
557         /* Don't let anyone else mess with mds_lov_objids now */
558         rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid);
559         if (rc)
560                 GOTO(out, rc);
561
562         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
563                idx, obd->obd_recovering, obd->obd_async_recov,
564                mds->mds_lov_desc.ld_tgt_count);
565
566         /* idx is set as data from lov_notify. */
567         if (obd->obd_recovering)
568                 GOTO(out, rc);
569
570         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
571                 CERROR("index %d > count %d!\n", idx,
572                        mds->mds_lov_desc.ld_tgt_count);
573                 GOTO(out, rc = -EINVAL);
574         }
575
576         rc = mds_lov_get_objid(obd, idx);
577         if (rc)
578                 GOTO(out, rc);
579
580         page = idx / OBJID_PER_PAGE();
581         off = idx % OBJID_PER_PAGE();
582         data = mds->mds_lov_page_array[page];
583
584         /* We have read this lastid from disk; tell the osc.
585            Don't call this during recovery. */
586         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
587         if (rc) {
588                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
589                 /* Don't abort the rest of the sync */
590                 rc = 0;
591         } else {
592                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
593                         data[off], idx, rc);
594         }
595 out:
596         RETURN(rc);
597 }
598
599 /* update the LOV-OSC knowledge of the last used object id's */
600 int mds_lov_connect(struct obd_device *obd, char * lov_name)
601 {
602         struct mds_obd *mds = &obd->u.mds;
603         struct lustre_handle conn = {0,};
604         struct obd_connect_data *data;
605         int rc;
606         ENTRY;
607
608         if (IS_ERR(mds->mds_osc_obd))
609                 RETURN(PTR_ERR(mds->mds_osc_obd));
610
611         if (mds->mds_osc_obd)
612                 RETURN(0);
613
614         mds->mds_osc_obd = class_name2obd(lov_name);
615         if (!mds->mds_osc_obd) {
616                 CERROR("MDS cannot locate LOV %s\n", lov_name);
617                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
618                 RETURN(-ENOTCONN);
619         }
620
621         mutex_down(&obd->obd_dev_sem);
622         rc = mds_lov_read_objids(obd);
623         mutex_up(&obd->obd_dev_sem);
624         if (rc) {
625                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
626                 GOTO(err_exit, rc);
627         }
628
629         rc = obd_register_observer(mds->mds_osc_obd, obd);
630         if (rc) {
631                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
632                        lov_name, rc);
633                 GOTO(err_exit, rc);
634         }
635
636         mds->mds_osc_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
637
638         OBD_ALLOC(data, sizeof(*data));
639         if (data == NULL)
640                 RETURN(-ENOMEM);
641         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
642                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
643                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
644                                   OBD_CONNECT_AT | OBD_CONNECT_CHANGE_QS;
645 #ifdef HAVE_LRU_RESIZE_SUPPORT
646         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
647 #endif
648         data->ocd_version = LUSTRE_VERSION_CODE;
649         data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
650         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
651         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
652         OBD_FREE(data, sizeof(*data));
653         if (rc) {
654                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
655                 mds->mds_osc_obd = ERR_PTR(rc);
656                 RETURN(rc);
657         }
658         mds->mds_osc_exp = class_conn2export(&conn);
659
660         /* I want to see a callback happen when the OBD moves to a
661          * "For General Use" state, and that's when we'll call
662          * set_nextid().  The class driver can help us here, because
663          * it can use the obd_recovering flag to determine when the
664          * the OBD is full available. */
665         /* MDD device will care about that
666         if (!obd->obd_recovering)
667                 rc = mds_postrecov(obd);
668          */
669         RETURN(rc);
670
671 err_exit:
672         mds->mds_osc_exp = NULL;
673         mds->mds_osc_obd = ERR_PTR(rc);
674         RETURN(rc);
675 }
676
677 int mds_lov_disconnect(struct obd_device *obd)
678 {
679         struct mds_obd *mds = &obd->u.mds;
680         int rc = 0;
681         ENTRY;
682
683         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
684                 obd_register_observer(mds->mds_osc_obd, NULL);
685
686                 /* The actual disconnect of the mds_lov will be called from
687                  * class_disconnect_exports from mds_lov_clean. So we have to
688                  * ensure that class_cleanup doesn't fail due to the extra ref
689                  * we're holding now. The mechanism to do that already exists -
690                  * the obd_force flag. We'll drop the final ref to the
691                  * mds_osc_exp in mds_cleanup. */
692                 mds->mds_osc_obd->obd_force = 1;
693         }
694
695         RETURN(rc);
696 }
697
698 struct mds_lov_sync_info {
699         struct obd_device *mlsi_obd;     /* the lov device to sync */
700         struct obd_device *mlsi_watched; /* target osc */
701         __u32              mlsi_index;   /* index of target */
702 };
703
704 static int mds_propagate_capa_keys(struct mds_obd *mds)
705 {
706         struct lustre_capa_key *key;
707         int i, rc = 0;
708
709         ENTRY;
710
711         if (!mds->mds_capa_keys)
712                 RETURN(0);
713
714         for (i = 0; i < 2; i++) {
715                 key = &mds->mds_capa_keys[i];
716                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
717
718                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
719                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
720                 if (rc) {
721                         DEBUG_CAPA_KEY(D_ERROR, key,
722                                        "propagate failed (rc = %d) for", rc);
723                         RETURN(rc);
724                 }
725         }
726
727         RETURN(0);
728 }
729
730 /* We only sync one osc at a time, so that we don't have to hold
731    any kind of lock on the whole mds_lov_desc, which may change
732    (grow) as a result of mds_lov_add_ost.  This also avoids any
733    kind of mismatch between the lov_desc and the mds_lov_desc,
734    which are not in lock-step during lov_add_obd */
735 static int __mds_lov_synchronize(void *data)
736 {
737         struct mds_lov_sync_info *mlsi = data;
738         struct obd_device *obd = mlsi->mlsi_obd;
739         struct obd_device *watched = mlsi->mlsi_watched;
740         struct mds_obd *mds = &obd->u.mds;
741         struct obd_uuid *uuid;
742         __u32  idx = mlsi->mlsi_index;
743         struct mds_group_info mgi;
744         struct llog_ctxt *ctxt;
745         int rc = 0;
746         ENTRY;
747
748         OBD_FREE_PTR(mlsi);
749
750         LASSERT(obd);
751         LASSERT(watched);
752         uuid = &watched->u.cli.cl_target_uuid;
753         LASSERT(uuid);
754
755         down_read(&mds->mds_notify_lock);
756         if (obd->obd_stopping || obd->obd_fail)
757                 GOTO(out, rc = -ENODEV);
758
759         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
760         rc = mds_lov_update_mds(obd, watched, idx);
761         if (rc != 0) {
762                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
763                 GOTO(out, rc);
764         }
765         mgi.group = mdt_to_obd_objgrp(mds->mds_id);
766         mgi.uuid = uuid;
767
768         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
769                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
770         if (rc != 0)
771                 GOTO(out, rc);
772         /* propagate capability keys */
773         rc = mds_propagate_capa_keys(mds);
774         if (rc)
775                 GOTO(out, rc);
776
777         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
778         if (!ctxt)
779                 GOTO(out, rc = -ENODEV);
780
781         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
782         rc = llog_connect(ctxt, NULL, NULL, uuid);
783         llog_ctxt_put(ctxt);
784         if (rc != 0) {
785                 CERROR("%s failed at llog_origin_connect: %d\n",
786                        obd_uuid2str(uuid), rc);
787                 GOTO(out, rc);
788         }
789
790         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
791               obd->obd_name, obd_uuid2str(uuid));
792         rc = mds_lov_clear_orphans(mds, uuid);
793         if (rc != 0) {
794                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
795                        obd_uuid2str(uuid), rc);
796                 GOTO(out, rc);
797         }
798
799         if (obd->obd_upcall.onu_owner) {
800                 /*
801                  * This is a hack for mds_notify->mdd_notify. When the mds obd
802                  * in mdd is removed, This hack should be removed.
803                  */
804                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
805                  rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
806                                                  obd->obd_upcall.onu_owner);
807         }
808         EXIT;
809 out:
810         up_read(&mds->mds_notify_lock);
811         if (rc) {
812                 /* Deactivate it for safety */
813                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
814                        rc);
815                 if (!obd->obd_stopping && mds->mds_osc_obd &&
816                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
817                         obd_notify(mds->mds_osc_obd, watched,
818                                    OBD_NOTIFY_INACTIVE, NULL);
819         }
820
821         class_decref(obd, "mds_lov_synchronize", obd);
822         return rc;
823 }
824
825 int mds_lov_synchronize(void *data)
826 {
827         struct mds_lov_sync_info *mlsi = data;
828         char name[20];
829
830         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
831         ptlrpc_daemonize(name);
832
833         RETURN(__mds_lov_synchronize(data));
834 }
835
836 int mds_lov_start_synchronize(struct obd_device *obd,
837                               struct obd_device *watched,
838                               void *data, int nonblock)
839 {
840         struct mds_lov_sync_info *mlsi;
841         int rc;
842         struct obd_uuid *uuid;
843         ENTRY;
844
845         LASSERT(watched);
846         uuid = &watched->u.cli.cl_target_uuid;
847
848         OBD_ALLOC(mlsi, sizeof(*mlsi));
849         if (mlsi == NULL)
850                 RETURN(-ENOMEM);
851
852         LASSERT(data);
853         mlsi->mlsi_obd = obd;
854         mlsi->mlsi_watched = watched;
855         mlsi->mlsi_index = *(__u32 *)data;
856
857         /* Although class_export_get(obd->obd_self_export) would lock
858            the MDS in place, since it's only a self-export
859            it doesn't lock the LOV in place.  The LOV can be disconnected
860            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
861            Simply taking an export ref on the LOV doesn't help, because it's
862            still disconnected. Taking an obd reference insures that we don't
863            disconnect the LOV.  This of course means a cleanup won't
864            finish for as long as the sync is blocking. */
865         class_incref(obd, "mds_lov_synchronize", obd);
866
867         if (nonblock) {
868                 /* Synchronize in the background */
869                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
870                                        CLONE_VM | CLONE_FILES);
871                 if (rc < 0) {
872                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
873                                obd->obd_name, rc);
874                         class_decref(obd, "mds_lov_synchronize", obd);
875                 } else {
876                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
877                                "thread=%d\n", obd->obd_name,
878                                mlsi->mlsi_index, rc);
879                         rc = 0;
880                 }
881         } else {
882                 rc = __mds_lov_synchronize((void *)mlsi);
883         }
884
885         RETURN(rc);
886 }
887
888 int mds_notify(struct obd_device *obd, struct obd_device *watched,
889                enum obd_notify_event ev, void *data)
890 {
891         int rc = 0;
892         ENTRY;
893
894         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
895
896         switch (ev) {
897         /* We only handle these: */
898         case OBD_NOTIFY_ACTIVE:
899                 /* lov want one or more _active_ targets for work */
900                 /* activate event should be pass lov idx as argument */
901         case OBD_NOTIFY_SYNC:
902         case OBD_NOTIFY_SYNC_NONBLOCK:
903                 /* sync event should be pass lov idx as argument */
904                 break;
905         case OBD_NOTIFY_CONFIG:
906         default:
907                 RETURN(0);
908         }
909
910         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
911                 CERROR("unexpected notification of %s %s!\n",
912                        watched->obd_type->typ_name, watched->obd_name);
913                 RETURN(-EINVAL);
914         }
915
916         if (obd->obd_recovering) {
917                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
918                       obd->obd_name,
919                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
920                 /* We still have to fix the lov descriptor for ost's added
921                    after the mdt in the config log.  They didn't make it into
922                    mds_lov_connect. */
923                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
924                                         &watched->u.cli.cl_target_uuid);
925                 RETURN(rc);
926         }
927
928         rc = mds_lov_start_synchronize(obd, watched, data,
929                                        !(ev == OBD_NOTIFY_SYNC));
930
931         RETURN(rc);
932 }