Whamcloud - gitweb
not send LOV EA under replay, we can't know about they size at this
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52
53 #include "mds_internal.h"
54
55 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
56 {
57         struct mds_obd *mds = &obd->u.mds;
58         unsigned int i=0, j;
59
60         CDEBUG(D_INFO, "dump from %s\n", label);
61         if (mds->mds_lov_page_dirty == NULL) {
62                 CERROR("NULL bitmap!\n");
63                 GOTO(skip_bitmap, i);
64         }
65
66         for(i=0;i<((mds->mds_lov_page_dirty->size/BITS_PER_LONG)+1);i++)
67                 CDEBUG(D_INFO, "%u - %lx\n", i, mds->mds_lov_page_dirty->data[i]);
68 skip_bitmap:
69         if (mds->mds_lov_page_array == NULL) {
70                 CERROR("not init page array!\n");
71                 GOTO(skip_array, i);
72
73         }
74         for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
75                 obd_id *data = mds->mds_lov_page_array[i];
76
77                 if (data == NULL)
78                         continue;
79
80                 for(j=0; j < OBJID_PER_PAGE(); j++) {
81                         if (data[j] == 0)
82                                 continue;
83                         CDEBUG(D_INFO,"objid page %u idx %u - %llu \n", i,j,data[j]);
84                 }
85         }
86 skip_array:
87         EXIT;
88 }
89
90 int mds_lov_init_objids(struct obd_device *obd)
91 {
92         struct mds_obd *mds = &obd->u.mds;
93         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
94         struct file *file;
95         int rc;
96         ENTRY;
97
98         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
99
100         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
101         if (mds->mds_lov_page_dirty == NULL)
102                 RETURN(-ENOMEM);
103
104
105         OBD_ALLOC(mds->mds_lov_page_array, size);
106         if (mds->mds_lov_page_array == NULL)
107                 GOTO(err_free_bitmap, rc = -ENOMEM);
108
109         /* open and test the lov objd file */
110         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
111         if (IS_ERR(file)) {
112                 rc = PTR_ERR(file);
113                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
114                 GOTO(err_free, rc = PTR_ERR(file));
115         }
116         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
117                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
118                        file->f_dentry->d_inode->i_mode);
119                 GOTO(err_open, rc = -ENOENT);
120         }
121         mds->mds_lov_objid_filp = file;
122
123         RETURN (0);
124 err_open:
125         if (filp_close((struct file *)file, 0))
126                 CERROR("can't close %s after error\n", LOV_OBJID);
127 err_free:
128         OBD_FREE(mds->mds_lov_page_array, size);
129 err_free_bitmap:
130         FREE_BITMAP(mds->mds_lov_page_dirty);
131
132         RETURN(rc);
133 }
134
135 void mds_lov_destroy_objids(struct obd_device *obd)
136 {
137         struct mds_obd *mds = &obd->u.mds;
138         int i, rc;
139         ENTRY;
140
141         if (mds->mds_lov_page_array != NULL) {
142                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
143                         obd_id *data = mds->mds_lov_page_array[i];
144                         if (data != NULL)
145                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
146                 }
147                 OBD_FREE(mds->mds_lov_page_array,
148                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
149         }
150
151         if (mds->mds_lov_objid_filp) {
152                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
153                 mds->mds_lov_objid_filp = NULL;
154                 if (rc)
155                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
156         }
157
158         FREE_BITMAP(mds->mds_lov_page_dirty);
159         EXIT;
160 }
161
162 /**
163  * currently exist two ways for know about ost count and max ost index.
164  * first - after ost is connected to mds and sync process finished
165  * second - get from lmm in recovery process, in case when mds not have configs,
166  * and ost isn't registered in mgs.
167  *
168  * \param mds pointer to mds structure
169  * \param index maxium ost index
170  *
171  * \retval -ENOMEM is not hame memory for new page
172  * \retval 0 is update passed
173  */
174 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
175 {
176         __u32 page = index / OBJID_PER_PAGE();
177         __u32 off = index % OBJID_PER_PAGE();
178         obd_id *data =  mds->mds_lov_page_array[page];
179
180         if (data == NULL) {
181                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
182                 if (data == NULL)
183                         RETURN(-ENOMEM);
184
185                 mds->mds_lov_page_array[page] = data;
186         }
187
188         if (index > mds->mds_lov_objid_max_index) {
189                 mds->mds_lov_objid_lastpage = page;
190                 mds->mds_lov_objid_lastidx = off;
191                 mds->mds_lov_objid_max_index = index;
192         }
193
194         /* workaround - New target not in objids file; increase mdsize */
195         /* ld_tgt_count is used as the max index everywhere, despite its name. */
196         if (data[off] == 0) {
197                 __u32 stripes;
198
199                 data[off] = 1;
200                 mds->mds_lov_objid_count++;
201                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
202                                 mds->mds_lov_objid_count);
203
204                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
205                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
206
207                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
208                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
209                        mds->mds_max_cookiesize);
210         }
211
212         EXIT;
213         return 0;
214 }
215
216 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
217 {
218         struct lov_ost_data_v1 *data;
219         __u32 count;
220         int rc = 0;
221         __u32 j;
222
223         /* if we create file without objects - lmm is NULL */
224         if (lmm == NULL)
225                 return 0;
226
227         switch (le32_to_cpu(lmm->lmm_magic)) {
228                 case LOV_MAGIC_V1:
229                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
230                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
231                         break;
232                 case LOV_MAGIC_V3:
233                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
234                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
235                         break;
236                 default:
237                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
238                         RETURN(-EINVAL);
239         }
240
241
242         mutex_down(&obd->obd_dev_sem);
243         for (j = 0; j < count; j++) {
244                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
245                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
246                         rc = -ENOMEM;
247                         break;
248                 }
249         }
250         mutex_up(&obd->obd_dev_sem);
251
252         RETURN(rc);
253 }
254 EXPORT_SYMBOL(mds_lov_prepare_objids);
255
256 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
257 {
258         struct mds_obd *mds = &obd->u.mds;
259         int j;
260         struct lov_ost_data_v1 *obj;
261         int count;
262         ENTRY;
263
264         /* if we create file without objects - lmm is NULL */
265         if (lmm == NULL)
266                 return;
267
268         switch (le32_to_cpu(lmm->lmm_magic)) {
269                 case LOV_MAGIC_V1:
270                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
271                         obj = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
272                         break;
273                 case LOV_MAGIC_V3:
274                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
275                         obj = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
276                         break;
277                 default:
278                         CERROR("Unknow lmm type %X !\n", le32_to_cpu(lmm->lmm_magic));
279                         return;
280         }
281
282         for (j = 0; j < count; j++) {
283                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
284                 obd_id id = le64_to_cpu(obj[j].l_object_id);
285                 __u32 page = i / OBJID_PER_PAGE();
286                 __u32 idx = i % OBJID_PER_PAGE();
287                 obd_id *data;
288
289                 data = mds->mds_lov_page_array[page];
290
291                 CDEBUG(D_INODE,"update last object for ost %u"
292                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
293                 if (id > data[idx]) {
294                         data[idx] = id;
295                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
296                 }
297         }
298         EXIT;
299         return;
300 }
301 EXPORT_SYMBOL(mds_lov_update_objids);
302
303
304 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
305                                     __u32 count)
306 {
307         __u32 i;
308         __u32 stripes;
309
310         for(i = 0; i < count; i++) {
311                 if (data[i] == 0)
312                         continue;
313
314                 mds->mds_lov_objid_count++;
315         }
316
317         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
318                          mds->mds_lov_objid_count);
319
320         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
321         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
322
323         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
324                "%d/%d\n", stripes, mds->mds_max_mdsize, mds->mds_max_cookiesize);
325
326         EXIT;
327         return 0;
328 }
329
330 static int mds_lov_read_objids(struct obd_device *obd)
331 {
332         struct mds_obd *mds = &obd->u.mds;
333         loff_t off = 0;
334         int i, rc, count = 0, page = 0;
335         unsigned long size;
336         ENTRY;
337
338         /* Read everything in the file, even if our current lov desc
339            has fewer targets. Old targets not in the lov descriptor
340            during mds setup may still have valid objids. */
341         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
342         if (size == 0)
343                 RETURN(0);
344
345         page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
346         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
347
348         for (i = 0; i < page; i++) {
349                 loff_t off_old = off;
350
351                 LASSERT(mds->mds_lov_page_array[i] == NULL);
352                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
353                 if (mds->mds_lov_page_array[i] == NULL)
354                         GOTO(out, rc = -ENOMEM);
355
356                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, mds->mds_lov_page_array[i],
357                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
358                 if (rc < 0) {
359                         CERROR("Error reading objids %d\n", rc);
360                         GOTO(out, rc);
361                 }
362
363                 count += (off - off_old)/sizeof(obd_id);
364                 if (mds_lov_update_from_read(mds, mds->mds_lov_page_array[i], count)) {
365                          CERROR("Can't update mds data\n");
366                          GOTO(out, rc = -EIO);
367                  }
368
369                 if (off == off_old)
370                         break; // eof
371         }
372         mds->mds_lov_objid_lastpage = i;
373         mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
374
375         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
376                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
377 out:
378         mds_lov_dump_objids("read",obd);
379
380         RETURN(0);
381 }
382
383 int mds_lov_write_objids(struct obd_device *obd)
384 {
385         struct mds_obd *mds = &obd->u.mds;
386         int i, rc = 0;
387         ENTRY;
388
389         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
390                 RETURN(0);
391
392         mds_lov_dump_objids("write", obd);
393
394         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
395                 obd_id *data =  mds->mds_lov_page_array[i];
396                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
397                 loff_t off = i * size;
398
399                 LASSERT(data != NULL);
400
401                 /* check for particaly filled last page */
402                 if (i == mds->mds_lov_objid_lastpage)
403                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
404
405                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
406                                          size, &off, 0);
407                 if (rc < 0)
408                         break;
409                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
410         }
411         if (rc >= 0)
412                 rc = 0;
413
414         RETURN(rc);
415 }
416 EXPORT_SYMBOL(mds_lov_write_objids);
417
418 static int mds_lov_get_objid(struct obd_device * obd,
419                              obd_id idx)
420 {
421         struct mds_obd *mds = &obd->u.mds;
422         unsigned int page;
423         unsigned int off;
424         obd_id *data;
425         int rc = 0;
426         ENTRY;
427
428         page = idx / OBJID_PER_PAGE();
429         off = idx % OBJID_PER_PAGE();
430         data = mds->mds_lov_page_array[page];
431         if (data[off] == 0) {
432                 /* We never read this lastid; ask the osc */
433                 struct obd_id_info lastid;
434                 __u32 size = sizeof(lastid);
435
436                 lastid.idx = idx;
437                 lastid.data = &data[off];
438                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
439                                   KEY_LAST_ID, &size, &lastid, NULL);
440                 if (rc)
441                         GOTO(out, rc);
442
443                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
444         }
445 out:
446         RETURN(rc);
447 }
448
449 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
450 {
451         int rc;
452         struct obdo oa;
453         struct obd_trans_info oti = {0};
454         struct lov_stripe_md  *empty_ea = NULL;
455         ENTRY;
456
457         LASSERT(mds->mds_lov_page_array != NULL);
458
459         /* This create will in fact either create or destroy:  If the OST is
460          * missing objects below this ID, they will be created.  If it finds
461          * objects above this ID, they will be removed. */
462         memset(&oa, 0, sizeof(oa));
463         oa.o_flags = OBD_FL_DELORPHAN;
464         oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
465         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
466         if (ost_uuid != NULL)
467                 oti.oti_ost_uuid = ost_uuid;
468         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
469
470         RETURN(rc);
471 }
472
473 /* for one target */
474 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
475 {
476         struct mds_obd *mds = &obd->u.mds;
477         int rc;
478         struct obd_id_info info;
479         ENTRY;
480
481         LASSERT(!obd->obd_recovering);
482
483         info.idx = idx;
484         info.data = id;
485         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
486                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
487         if (rc)
488                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
489                         obd->obd_name, rc);
490
491         RETURN(rc);
492 }
493
494 /* Update the lov desc for a new size lov. */
495 static int mds_lov_update_desc(struct obd_device *obd, int idx,
496                                struct obd_uuid *uuid)
497 {
498         struct mds_obd *mds = &obd->u.mds;
499         struct lov_desc *ld;
500         __u32 valsize = sizeof(mds->mds_lov_desc);
501         int rc = 0;
502         ENTRY;
503
504         OBD_ALLOC(ld, sizeof(*ld));
505         if (!ld)
506                 RETURN(-ENOMEM);
507
508         rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
509                           &valsize, ld, NULL);
510         if (rc)
511                 GOTO(out, rc);
512
513         /* Don't change the mds_lov_desc until the objids size matches the
514            count (paranoia) */
515         mds->mds_lov_desc = *ld;
516         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
517                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
518
519         mutex_down(&obd->obd_dev_sem);
520         rc = mds_lov_update_max_ost(mds, idx);
521         mutex_up(&obd->obd_dev_sem);
522         if (rc != 0)
523                 GOTO(out, rc );
524
525
526         /* If we added a target we have to reconnect the llogs */
527         /* We only _need_ to do this at first add (idx), or the first time
528            after recovery.  However, it should now be safe to call anytime. */
529         rc = llog_cat_initialize(obd, &obd->obd_olg, idx, uuid);
530         if (rc)
531                 GOTO(out, rc);
532
533         /*XXX this notifies the MDD until lov handling use old mds code */
534         if (obd->obd_upcall.onu_owner) {
535                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
536                  rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
537                                                  obd->obd_upcall.onu_owner);
538         }
539 out:
540         OBD_FREE(ld, sizeof(*ld));
541         RETURN(rc);
542 }
543
544 /* Inform MDS about new/updated target */
545 static int mds_lov_update_mds(struct obd_device *obd,
546                               struct obd_device *watched,
547                               __u32 idx)
548 {
549         struct mds_obd *mds = &obd->u.mds;
550         int rc = 0;
551         int page;
552         int off;
553         obd_id *data;
554
555         ENTRY;
556
557         /* Don't let anyone else mess with mds_lov_objids now */
558         rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid);
559         if (rc)
560                 GOTO(out, rc);
561
562         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
563                idx, obd->obd_recovering, obd->obd_async_recov,
564                mds->mds_lov_desc.ld_tgt_count);
565
566         /* idx is set as data from lov_notify. */
567         if (obd->obd_recovering)
568                 GOTO(out, rc);
569
570         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
571                 CERROR("index %d > count %d!\n", idx,
572                        mds->mds_lov_desc.ld_tgt_count);
573                 GOTO(out, rc = -EINVAL);
574         }
575
576         rc = mds_lov_get_objid(obd, idx);
577         if (rc)
578                 GOTO(out, rc);
579
580         page = idx / OBJID_PER_PAGE();
581         off = idx % OBJID_PER_PAGE();
582         data = mds->mds_lov_page_array[page];
583
584         /* We have read this lastid from disk; tell the osc.
585            Don't call this during recovery. */
586         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
587         if (rc) {
588                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
589                 /* Don't abort the rest of the sync */
590                 rc = 0;
591         } else {
592                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
593                         data[off], idx, rc);
594         }
595 out:
596         RETURN(rc);
597 }
598
599 /* update the LOV-OSC knowledge of the last used object id's */
600 int mds_lov_connect(struct obd_device *obd, char * lov_name)
601 {
602         struct mds_obd *mds = &obd->u.mds;
603         struct lustre_handle conn = {0,};
604         struct obd_connect_data *data;
605         int rc;
606         ENTRY;
607
608         if (IS_ERR(mds->mds_osc_obd))
609                 RETURN(PTR_ERR(mds->mds_osc_obd));
610
611         if (mds->mds_osc_obd)
612                 RETURN(0);
613
614         mds->mds_osc_obd = class_name2obd(lov_name);
615         if (!mds->mds_osc_obd) {
616                 CERROR("MDS cannot locate LOV %s\n", lov_name);
617                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
618                 RETURN(-ENOTCONN);
619         }
620
621         mutex_down(&obd->obd_dev_sem);
622         rc = mds_lov_read_objids(obd);
623         mutex_up(&obd->obd_dev_sem);
624         if (rc) {
625                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
626                 GOTO(err_exit, rc);
627         }
628
629         rc = obd_register_observer(mds->mds_osc_obd, obd);
630         if (rc) {
631                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
632                        lov_name, rc);
633                 GOTO(err_exit, rc);
634         }
635
636         OBD_ALLOC(data, sizeof(*data));
637         if (data == NULL)
638                 RETURN(-ENOMEM);
639         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
640                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
641                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
642                                   OBD_CONNECT_AT | OBD_CONNECT_CHANGE_QS;
643 #ifdef HAVE_LRU_RESIZE_SUPPORT
644         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
645 #endif
646         data->ocd_version = LUSTRE_VERSION_CODE;
647         data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
648         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
649         rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
650         OBD_FREE(data, sizeof(*data));
651         if (rc) {
652                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
653                 mds->mds_osc_obd = ERR_PTR(rc);
654                 RETURN(rc);
655         }
656         mds->mds_osc_exp = class_conn2export(&conn);
657
658         /* I want to see a callback happen when the OBD moves to a
659          * "For General Use" state, and that's when we'll call
660          * set_nextid().  The class driver can help us here, because
661          * it can use the obd_recovering flag to determine when the
662          * the OBD is full available. */
663         /* MDD device will care about that
664         if (!obd->obd_recovering)
665                 rc = mds_postrecov(obd);
666          */
667         RETURN(rc);
668
669 err_exit:
670         mds->mds_osc_exp = NULL;
671         mds->mds_osc_obd = ERR_PTR(rc);
672         RETURN(rc);
673 }
674
675 int mds_lov_disconnect(struct obd_device *obd)
676 {
677         struct mds_obd *mds = &obd->u.mds;
678         int rc = 0;
679         ENTRY;
680
681         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
682                 obd_register_observer(mds->mds_osc_obd, NULL);
683
684                 /* The actual disconnect of the mds_lov will be called from
685                  * class_disconnect_exports from mds_lov_clean. So we have to
686                  * ensure that class_cleanup doesn't fail due to the extra ref
687                  * we're holding now. The mechanism to do that already exists -
688                  * the obd_force flag. We'll drop the final ref to the
689                  * mds_osc_exp in mds_cleanup. */
690                 mds->mds_osc_obd->obd_force = 1;
691         }
692
693         RETURN(rc);
694 }
695
696 struct mds_lov_sync_info {
697         struct obd_device *mlsi_obd;     /* the lov device to sync */
698         struct obd_device *mlsi_watched; /* target osc */
699         __u32              mlsi_index;   /* index of target */
700 };
701
702 static int mds_propagate_capa_keys(struct mds_obd *mds)
703 {
704         struct lustre_capa_key *key;
705         int i, rc = 0;
706
707         ENTRY;
708
709         if (!mds->mds_capa_keys)
710                 RETURN(0);
711
712         for (i = 0; i < 2; i++) {
713                 key = &mds->mds_capa_keys[i];
714                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
715
716                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
717                                         KEY_CAPA_KEY, sizeof(*key), key, NULL);
718                 if (rc) {
719                         DEBUG_CAPA_KEY(D_ERROR, key,
720                                        "propagate failed (rc = %d) for", rc);
721                         RETURN(rc);
722                 }
723         }
724
725         RETURN(0);
726 }
727
728 /* We only sync one osc at a time, so that we don't have to hold
729    any kind of lock on the whole mds_lov_desc, which may change
730    (grow) as a result of mds_lov_add_ost.  This also avoids any
731    kind of mismatch between the lov_desc and the mds_lov_desc,
732    which are not in lock-step during lov_add_obd */
733 static int __mds_lov_synchronize(void *data)
734 {
735         struct mds_lov_sync_info *mlsi = data;
736         struct obd_device *obd = mlsi->mlsi_obd;
737         struct obd_device *watched = mlsi->mlsi_watched;
738         struct mds_obd *mds = &obd->u.mds;
739         struct obd_uuid *uuid;
740         __u32  idx = mlsi->mlsi_index;
741         struct mds_group_info mgi;
742         struct llog_ctxt *ctxt;
743         int rc = 0;
744         ENTRY;
745
746         OBD_FREE_PTR(mlsi);
747
748         LASSERT(obd);
749         LASSERT(watched);
750         uuid = &watched->u.cli.cl_target_uuid;
751         LASSERT(uuid);
752
753         down_read(&mds->mds_notify_lock);
754         if (obd->obd_stopping || obd->obd_fail)
755                 GOTO(out, rc = -ENODEV);
756
757         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
758         rc = mds_lov_update_mds(obd, watched, idx);
759         if (rc != 0) {
760                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
761                 GOTO(out, rc);
762         }
763         mgi.group = mdt_to_obd_objgrp(mds->mds_id);
764         mgi.uuid = uuid;
765
766         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
767                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
768         if (rc != 0)
769                 GOTO(out, rc);
770         /* propagate capability keys */
771         rc = mds_propagate_capa_keys(mds);
772         if (rc)
773                 GOTO(out, rc);
774
775         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
776         if (!ctxt)
777                 GOTO(out, rc = -ENODEV);
778
779         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
780         rc = llog_connect(ctxt, NULL, NULL, uuid);
781         llog_ctxt_put(ctxt);
782         if (rc != 0) {
783                 CERROR("%s failed at llog_origin_connect: %d\n",
784                        obd_uuid2str(uuid), rc);
785                 GOTO(out, rc);
786         }
787
788         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
789               obd->obd_name, obd_uuid2str(uuid));
790         rc = mds_lov_clear_orphans(mds, uuid);
791         if (rc != 0) {
792                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
793                        obd_uuid2str(uuid), rc);
794                 GOTO(out, rc);
795         }
796
797         if (obd->obd_upcall.onu_owner) {
798                 /*
799                  * This is a hack for mds_notify->mdd_notify. When the mds obd
800                  * in mdd is removed, This hack should be removed.
801                  */
802                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
803                  rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_ACTIVE,
804                                                  obd->obd_upcall.onu_owner);
805         }
806         EXIT;
807 out:
808         up_read(&mds->mds_notify_lock);
809         if (rc) {
810                 /* Deactivate it for safety */
811                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
812                        rc);
813                 if (!obd->obd_stopping && mds->mds_osc_obd &&
814                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
815                         obd_notify(mds->mds_osc_obd, watched,
816                                    OBD_NOTIFY_INACTIVE, NULL);
817         }
818
819         class_decref(obd, "mds_lov_synchronize", obd);
820         return rc;
821 }
822
823 int mds_lov_synchronize(void *data)
824 {
825         struct mds_lov_sync_info *mlsi = data;
826         char name[20];
827
828         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
829         ptlrpc_daemonize(name);
830
831         RETURN(__mds_lov_synchronize(data));
832 }
833
834 int mds_lov_start_synchronize(struct obd_device *obd,
835                               struct obd_device *watched,
836                               void *data, int nonblock)
837 {
838         struct mds_lov_sync_info *mlsi;
839         int rc;
840         struct obd_uuid *uuid;
841         ENTRY;
842
843         LASSERT(watched);
844         uuid = &watched->u.cli.cl_target_uuid;
845
846         OBD_ALLOC(mlsi, sizeof(*mlsi));
847         if (mlsi == NULL)
848                 RETURN(-ENOMEM);
849
850         LASSERT(data);
851         mlsi->mlsi_obd = obd;
852         mlsi->mlsi_watched = watched;
853         mlsi->mlsi_index = *(__u32 *)data;
854
855         /* Although class_export_get(obd->obd_self_export) would lock
856            the MDS in place, since it's only a self-export
857            it doesn't lock the LOV in place.  The LOV can be disconnected
858            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
859            Simply taking an export ref on the LOV doesn't help, because it's
860            still disconnected. Taking an obd reference insures that we don't
861            disconnect the LOV.  This of course means a cleanup won't
862            finish for as long as the sync is blocking. */
863         class_incref(obd, "mds_lov_synchronize", obd);
864
865         if (nonblock) {
866                 /* Synchronize in the background */
867                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
868                                        CLONE_VM | CLONE_FILES);
869                 if (rc < 0) {
870                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
871                                obd->obd_name, rc);
872                         class_decref(obd, "mds_lov_synchronize", obd);
873                 } else {
874                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
875                                "thread=%d\n", obd->obd_name,
876                                mlsi->mlsi_index, rc);
877                         rc = 0;
878                 }
879         } else {
880                 rc = __mds_lov_synchronize((void *)mlsi);
881         }
882
883         RETURN(rc);
884 }
885
886 int mds_notify(struct obd_device *obd, struct obd_device *watched,
887                enum obd_notify_event ev, void *data)
888 {
889         int rc = 0;
890         ENTRY;
891
892         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
893
894         switch (ev) {
895         /* We only handle these: */
896         case OBD_NOTIFY_ACTIVE:
897                 /* lov want one or more _active_ targets for work */
898                 /* activate event should be pass lov idx as argument */
899         case OBD_NOTIFY_SYNC:
900         case OBD_NOTIFY_SYNC_NONBLOCK:
901                 /* sync event should be pass lov idx as argument */
902                 break;
903         case OBD_NOTIFY_CONFIG:
904         default:
905                 RETURN(0);
906         }
907
908         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
909                 CERROR("unexpected notification of %s %s!\n",
910                        watched->obd_type->typ_name, watched->obd_name);
911                 RETURN(-EINVAL);
912         }
913
914         if (obd->obd_recovering) {
915                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
916                       obd->obd_name,
917                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
918                 /* We still have to fix the lov descriptor for ost's added
919                    after the mdt in the config log.  They didn't make it into
920                    mds_lov_connect. */
921                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
922                                         &watched->u.cli.cl_target_uuid);
923                 RETURN(rc);
924         }
925
926         rc = mds_lov_start_synchronize(obd, watched, data,
927                                        !(ev == OBD_NOTIFY_SYNC));
928
929         RETURN(rc);
930 }