Whamcloud - gitweb
e57a1e2d61b8b4a83e273b7c8205e8198df7bec8
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
53
54 #include "mds_internal.h"
55
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 {
58         struct mds_obd *mds = &obd->u.mds;
59         unsigned int i=0, j;
60
61         CDEBUG(D_INFO, "dump from %s\n", label);
62         if (mds->mds_lov_page_dirty == NULL) {
63                 CERROR("NULL bitmap!\n");
64                 GOTO(skip_bitmap, i);
65         }
66
67         for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
68                 CDEBUG(D_INFO, "%u - %lx\n", i,
69                        mds->mds_lov_page_dirty->data[i]);
70 skip_bitmap:
71         if (mds->mds_lov_page_array == NULL) {
72                 CERROR("not init page array!\n");
73                 GOTO(skip_array, i);
74
75         }
76         for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
77                 obd_id *data = mds->mds_lov_page_array[i];
78
79                 if (data == NULL)
80                         continue;
81
82                 for(j=0; j < OBJID_PER_PAGE(); j++) {
83                         if (data[j] == 0)
84                                 continue;
85                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
86                                i, j, data[j]);
87                 }
88         }
89 skip_array:
90         EXIT;
91 }
92
93 int mds_lov_init_objids(struct obd_device *obd)
94 {
95         struct mds_obd *mds = &obd->u.mds;
96         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
97         struct file *file;
98         int rc;
99         ENTRY;
100
101         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
102
103         mds->mds_lov_page_dirty = ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
104         if (mds->mds_lov_page_dirty == NULL)
105                 RETURN(-ENOMEM);
106
107
108         OBD_ALLOC(mds->mds_lov_page_array, size);
109         if (mds->mds_lov_page_array == NULL)
110                 GOTO(err_free_bitmap, rc = -ENOMEM);
111
112         /* open and test the lov objd file */
113         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
114         if (IS_ERR(file)) {
115                 rc = PTR_ERR(file);
116                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
117                 GOTO(err_free, rc = PTR_ERR(file));
118         }
119         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
120                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
121                        file->f_dentry->d_inode->i_mode);
122                 GOTO(err_open, rc = -ENOENT);
123         }
124         mds->mds_lov_objid_filp = file;
125
126         RETURN (0);
127 err_open:
128         if (filp_close((struct file *)file, 0))
129                 CERROR("can't close %s after error\n", LOV_OBJID);
130 err_free:
131         OBD_FREE(mds->mds_lov_page_array, size);
132 err_free_bitmap:
133         FREE_BITMAP(mds->mds_lov_page_dirty);
134
135         RETURN(rc);
136 }
137
138 void mds_lov_destroy_objids(struct obd_device *obd)
139 {
140         struct mds_obd *mds = &obd->u.mds;
141         int i, rc;
142         ENTRY;
143
144         if (mds->mds_lov_page_array != NULL) {
145                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
146                         obd_id *data = mds->mds_lov_page_array[i];
147                         if (data != NULL)
148                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
149                 }
150                 OBD_FREE(mds->mds_lov_page_array,
151                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
152         }
153
154         if (mds->mds_lov_objid_filp) {
155                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
156                 mds->mds_lov_objid_filp = NULL;
157                 if (rc)
158                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
159         }
160
161         FREE_BITMAP(mds->mds_lov_page_dirty);
162         EXIT;
163 }
164
165 /**
166  * currently exist two ways for know about ost count and max ost index.
167  * first - after ost is connected to mds and sync process finished
168  * second - get from lmm in recovery process, in case when mds not have configs,
169  * and ost isn't registered in mgs.
170  *
171  * \param mds pointer to mds structure
172  * \param index maxium ost index
173  *
174  * \retval -ENOMEM is not hame memory for new page
175  * \retval 0 is update passed
176  */
177 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
178 {
179         __u32 page = index / OBJID_PER_PAGE();
180         __u32 off = index % OBJID_PER_PAGE();
181         obd_id *data =  mds->mds_lov_page_array[page];
182
183         if (data == NULL) {
184                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
185                 if (data == NULL)
186                         RETURN(-ENOMEM);
187
188                 mds->mds_lov_page_array[page] = data;
189         }
190
191         if (index > mds->mds_lov_objid_max_index) {
192                 mds->mds_lov_objid_lastpage = page;
193                 mds->mds_lov_objid_lastidx = off;
194                 mds->mds_lov_objid_max_index = index;
195         }
196
197         /* workaround - New target not in objids file; increase mdsize */
198         /* ld_tgt_count is used as the max index everywhere, despite its name. */
199         if (data[off] == 0) {
200                 __u32 stripes;
201
202                 data[off] = 1;
203                 mds->mds_lov_objid_count++;
204                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
205                                 mds->mds_lov_objid_count);
206
207                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
208                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
209
210                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
211                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
212                        mds->mds_max_cookiesize);
213         }
214
215         EXIT;
216         return 0;
217 }
218
219 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
220 {
221         struct lov_ost_data_v1 *data;
222         __u32 count;
223         int rc = 0;
224         __u32 j;
225
226         /* if we create file without objects - lmm is NULL */
227         if (lmm == NULL)
228                 return 0;
229
230         switch (le32_to_cpu(lmm->lmm_magic)) {
231                 case LOV_MAGIC_V1:
232                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
233                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
234                         break;
235                 case LOV_MAGIC_V3:
236                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
237                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
238                         break;
239                 default:
240                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
241                         RETURN(-EINVAL);
242         }
243
244
245         mutex_down(&obd->obd_dev_sem);
246         for (j = 0; j < count; j++) {
247                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
248                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
249                         rc = -ENOMEM;
250                         break;
251                 }
252         }
253         mutex_up(&obd->obd_dev_sem);
254
255         RETURN(rc);
256 }
257 EXPORT_SYMBOL(mds_lov_prepare_objids);
258
259 /*
260  * write llog orphan record about lost ost object,
261  * Special lsm is allocated with single stripe, caller should deallocated it
262  * after use
263  */
264 static int mds_log_lost_precreated(struct obd_device *obd,
265                                    struct lov_stripe_md **lsmp, int *stripes,
266                                    obd_id id, obd_count count, int idx)
267 {
268         struct lov_stripe_md *lsm = *lsmp;
269         int rc;
270         ENTRY;
271
272         if (*lsmp == NULL) {
273                 rc = obd_alloc_memmd(obd->u.mds.mds_osc_exp, &lsm);
274                 if (rc < 0)
275                         RETURN(rc);
276                 /* need only one stripe, save old value */
277                 *stripes = lsm->lsm_stripe_count;
278                 lsm->lsm_stripe_count = 1;
279                 *lsmp = lsm;
280         }
281
282         lsm->lsm_oinfo[0]->loi_id = id;
283         lsm->lsm_oinfo[0]->loi_gr = mdt_to_obd_objgrp(obd->u.mds.mds_id);
284         lsm->lsm_oinfo[0]->loi_ost_idx = idx;
285
286         rc = mds_log_op_orphan(obd, lsm, count);
287         RETURN(rc);
288 }
289
290 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
291 {
292         struct mds_obd *mds = &obd->u.mds;
293         int j;
294         struct lov_ost_data_v1 *obj;
295         struct lov_stripe_md *lsm = NULL;
296         int stripes = 0;
297         int count;
298         ENTRY;
299
300         /* if we create file without objects - lmm is NULL */
301         if (lmm == NULL)
302                 return;
303
304         switch (le32_to_cpu(lmm->lmm_magic)) {
305                 case LOV_MAGIC_V1:
306                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
307                         obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
308                         break;
309                 case LOV_MAGIC_V3:
310                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
311                         obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
312                         break;
313                 default:
314                         CERROR("Unknow lmm type %X !\n",
315                                le32_to_cpu(lmm->lmm_magic));
316                         return;
317         }
318
319         for (j = 0; j < count; j++) {
320                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
321                 obd_id id = le64_to_cpu(obj[j].l_object_id);
322                 __u32 page = i / OBJID_PER_PAGE();
323                 __u32 idx = i % OBJID_PER_PAGE();
324                 obd_id *data;
325
326                 data = mds->mds_lov_page_array[page];
327
328                 CDEBUG(D_INODE,"update last object for ost %u"
329                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
330                 if (id > data[idx]) {
331                         int lost = id - data[idx] - 1;
332                         /* we might have lost precreated objects due to VBR */
333                         if (lost > 0 && obd->obd_recovering) {
334                                 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
335                                 if (!obd->obd_version_recov)
336                                         CERROR("Unexpected gap in objids\n");
337                                 /* lsm is allocated if NULL */
338                                 mds_log_lost_precreated(obd, &lsm, &stripes,
339                                                         data[idx]+1, lost, i);
340                         }
341                         data[idx] = id;
342                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
343                 }
344         }
345         if (lsm) {
346                 /* restore stripes number */
347                 lsm->lsm_stripe_count = stripes;
348                 obd_free_memmd(mds->mds_osc_exp, &lsm);
349         }
350         EXIT;
351         return;
352 }
353 EXPORT_SYMBOL(mds_lov_update_objids);
354
355 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
356                                     __u32 count)
357 {
358         __u32 i;
359         __u32 stripes;
360
361         for (i = 0; i < count; i++) {
362                 if (data[i] == 0)
363                         continue;
364
365                 mds->mds_lov_objid_count++;
366         }
367
368         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
369                          mds->mds_lov_objid_count);
370
371         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
372         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
373
374         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
375                "%d/%d\n", stripes, mds->mds_max_mdsize,
376                mds->mds_max_cookiesize);
377
378         EXIT;
379         return 0;
380 }
381
382 static int mds_lov_read_objids(struct obd_device *obd)
383 {
384         struct mds_obd *mds = &obd->u.mds;
385         loff_t off = 0;
386         int i, rc = 0, count = 0, page = 0;
387         unsigned long size;
388         ENTRY;
389
390         /* Read everything in the file, even if our current lov desc
391            has fewer targets. Old targets not in the lov descriptor
392            during mds setup may still have valid objids. */
393         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
394         if (size == 0)
395                 RETURN(0);
396
397         page = (size / (OBJID_PER_PAGE() * sizeof(obd_id))) + 1;
398         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
399         for (i = 0; i < page; i++) {
400                 obd_id *data;
401                 loff_t off_old = off;
402
403                 LASSERT(mds->mds_lov_page_array[i] == NULL);
404                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
405                 if (mds->mds_lov_page_array[i] == NULL)
406                         GOTO(out, rc = -ENOMEM);
407
408                 data = mds->mds_lov_page_array[i];
409
410                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
411                                         OBJID_PER_PAGE()*sizeof(obd_id), &off);
412                 if (rc < 0) {
413                         CERROR("Error reading objids %d\n", rc);
414                         GOTO(out, rc);
415                 }
416
417                 count += (off - off_old) / sizeof(obd_id);
418                 if (mds_lov_update_from_read(mds, data, count)) {
419                         CERROR("Can't update mds data\n");
420                         GOTO(out, rc = -EIO);
421                 }
422
423                 if (off == off_old)
424                         break; /* eof */
425         }
426         mds->mds_lov_objid_lastpage = i;
427         mds->mds_lov_objid_lastidx = count % OBJID_PER_PAGE();
428
429         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
430                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
431 out:
432         mds_lov_dump_objids("read",obd);
433
434         RETURN(rc);
435 }
436
437 int mds_lov_write_objids(struct obd_device *obd)
438 {
439         struct mds_obd *mds = &obd->u.mds;
440         int i = 0, rc = 0;
441         ENTRY;
442
443         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
444                 RETURN(0);
445
446         mds_lov_dump_objids("write", obd);
447
448         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
449                 obd_id *data =  mds->mds_lov_page_array[i];
450                 unsigned int size = OBJID_PER_PAGE()*sizeof(obd_id);
451                 loff_t off = i * size;
452
453                 LASSERT(data != NULL);
454
455                 /* check for particaly filled last page */
456                 if (i == mds->mds_lov_objid_lastpage)
457                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
458
459                 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
460                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
461                                          size, &off, 0);
462                 if (rc < 0)
463                         break;
464                 cfs_bitmap_clear(mds->mds_lov_page_dirty, i);
465         }
466         if (rc >= 0)
467                 rc = 0;
468
469         RETURN(rc);
470 }
471 EXPORT_SYMBOL(mds_lov_write_objids);
472
473 static int mds_lov_get_objid(struct obd_device * obd,
474                              obd_id idx)
475 {
476         struct mds_obd *mds = &obd->u.mds;
477         unsigned int page;
478         unsigned int off;
479         obd_id *data;
480         int rc = 0;
481         ENTRY;
482
483         page = idx / OBJID_PER_PAGE();
484         off = idx % OBJID_PER_PAGE();
485         data = mds->mds_lov_page_array[page];
486         if (data[off] < 2) {
487                 /* We never read this lastid; ask the osc */
488                 struct obd_id_info lastid;
489                 __u32 size = sizeof(lastid);
490
491                 lastid.idx = idx;
492                 lastid.data = &data[off];
493                 rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LAST_ID),
494                                   KEY_LAST_ID, &size, &lastid, NULL);
495                 if (rc)
496                         GOTO(out, rc);
497
498                 /* workaround for clean filter */
499                 if (data[off] == 0)
500                         data[off] = 1;
501
502                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
503         }
504         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
505                idx, data, page, off, data[off]);
506 out:
507         RETURN(rc);
508 }
509
510 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
511 {
512         int rc;
513         struct obdo oa = { 0 };
514         struct obd_trans_info oti = {0};
515         struct lov_stripe_md  *empty_ea = NULL;
516         ENTRY;
517
518         LASSERT(mds->mds_lov_page_array != NULL);
519
520         /* This create will in fact either create or destroy:  If the OST is
521          * missing objects below this ID, they will be created.  If it finds
522          * objects above this ID, they will be removed. */
523         memset(&oa, 0, sizeof(oa));
524         oa.o_flags = OBD_FL_DELORPHAN;
525         oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
526         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
527         if (ost_uuid != NULL)
528                 oti.oti_ost_uuid = ost_uuid;
529
530         rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
531
532         RETURN(rc);
533 }
534
535 /* for one target */
536 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
537 {
538         struct mds_obd *mds = &obd->u.mds;
539         int rc;
540         struct obd_id_info info;
541         ENTRY;
542
543         LASSERT(!obd->obd_recovering);
544
545         info.idx = idx;
546         info.data = id;
547         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID),
548                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
549         if (rc)
550                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
551                         obd->obd_name, rc);
552
553         RETURN(rc);
554 }
555
556 /* Update the lov desc for a new size lov. */
557 static int mds_lov_update_desc(struct obd_device *obd, int idx,
558                                struct obd_uuid *uuid, enum obd_notify_event ev)
559 {
560         struct mds_obd *mds = &obd->u.mds;
561         struct lov_desc *ld;
562         __u32 valsize = sizeof(mds->mds_lov_desc);
563         int rc = 0;
564         ENTRY;
565
566         OBD_ALLOC(ld, sizeof(*ld));
567         if (!ld)
568                 RETURN(-ENOMEM);
569
570         rc = obd_get_info(mds->mds_osc_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
571                           &valsize, ld, NULL);
572         if (rc)
573                 GOTO(out, rc);
574
575         /* Don't change the mds_lov_desc until the objids size matches the
576            count (paranoia) */
577         mds->mds_lov_desc = *ld;
578         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
579                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
580
581         mutex_down(&obd->obd_dev_sem);
582         rc = mds_lov_update_max_ost(mds, idx);
583         mutex_up(&obd->obd_dev_sem);
584         if (rc != 0)
585                 GOTO(out, rc );
586
587         /* If we added a target we have to reconnect the llogs */
588         /* We only _need_ to do this at first add (idx), or the first time
589            after recovery.  However, it should now be safe to call anytime. */
590         rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
591         if (rc)
592                 GOTO(out, rc);
593
594         /*XXX this notifies the MDD until lov handling use old mds code */
595         if (obd->obd_upcall.onu_owner) {
596                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
597                  rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
598                                                  obd->obd_upcall.onu_owner,
599                                                  &mds->mds_mount_count);
600         }
601 out:
602         OBD_FREE(ld, sizeof(*ld));
603         RETURN(rc);
604 }
605
606 /* Inform MDS about new/updated target */
607 static int mds_lov_update_mds(struct obd_device *obd,
608                               struct obd_device *watched,
609                               __u32 idx, enum obd_notify_event ev)
610 {
611         struct mds_obd *mds = &obd->u.mds;
612         int rc = 0;
613         int page;
614         int off;
615         obd_id *data;
616
617         ENTRY;
618
619         /* Don't let anyone else mess with mds_lov_objids now */
620         rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid, ev);
621         if (rc)
622                 GOTO(out, rc);
623
624         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
625                idx, obd->obd_recovering, obd->obd_async_recov,
626                mds->mds_lov_desc.ld_tgt_count);
627
628         /* idx is set as data from lov_notify. */
629         if (obd->obd_recovering)
630                 GOTO(out, rc);
631
632         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
633                 CERROR("index %d > count %d!\n", idx,
634                        mds->mds_lov_desc.ld_tgt_count);
635                 GOTO(out, rc = -EINVAL);
636         }
637
638         rc = mds_lov_get_objid(obd, idx);
639         if (rc)
640                 GOTO(out, rc);
641
642         page = idx / OBJID_PER_PAGE();
643         off = idx % OBJID_PER_PAGE();
644         data = mds->mds_lov_page_array[page];
645
646         /* We have read this lastid from disk; tell the osc.
647            Don't call this during recovery. */
648         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
649         if (rc) {
650                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
651                 /* Don't abort the rest of the sync */
652                 rc = 0;
653         } else {
654                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
655                         data[off], idx, rc);
656         }
657 out:
658         RETURN(rc);
659 }
660
661 /* update the LOV-OSC knowledge of the last used object id's */
662 int mds_lov_connect(struct obd_device *obd, char * lov_name)
663 {
664         struct mds_obd *mds = &obd->u.mds;
665         struct obd_connect_data *data;
666         int rc;
667         ENTRY;
668
669         if (IS_ERR(mds->mds_osc_obd))
670                 RETURN(PTR_ERR(mds->mds_osc_obd));
671
672         if (mds->mds_osc_obd)
673                 RETURN(0);
674
675         mds->mds_osc_obd = class_name2obd(lov_name);
676         if (!mds->mds_osc_obd) {
677                 CERROR("MDS cannot locate LOV %s\n", lov_name);
678                 mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
679                 RETURN(-ENOTCONN);
680         }
681
682         mutex_down(&obd->obd_dev_sem);
683         rc = mds_lov_read_objids(obd);
684         mutex_up(&obd->obd_dev_sem);
685         if (rc) {
686                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
687                 GOTO(err_exit, rc);
688         }
689
690         rc = obd_register_observer(mds->mds_osc_obd, obd);
691         if (rc) {
692                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
693                        lov_name, rc);
694                 GOTO(err_exit, rc);
695         }
696
697         /* try init too early */
698         rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
699         if (rc)
700                 GOTO(err_exit, rc);
701
702         mds->mds_osc_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
703
704         OBD_ALLOC(data, sizeof(*data));
705         if (data == NULL)
706                 GOTO(err_exit, rc = -ENOMEM);
707
708         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
709                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
710                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
711                                   OBD_CONNECT_BRW_SIZE  | OBD_CONNECT_CKSUM   |
712                                   OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT      |
713                                   OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN   |
714                                   OBD_CONNECT_SOM;
715 #ifdef HAVE_LRU_RESIZE_SUPPORT
716         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
717 #endif
718         data->ocd_version = LUSTRE_VERSION_CODE;
719         data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
720         /* send max bytes per rpc */
721         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
722         /* send the list of supported checksum types */
723         data->ocd_cksum_types = OBD_CKSUM_ALL;
724         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
725         rc = obd_connect(NULL, &mds->mds_osc_exp, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
726         OBD_FREE(data, sizeof(*data));
727         if (rc) {
728                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
729                 mds->mds_osc_obd = ERR_PTR(rc);
730                 RETURN(rc);
731         }
732
733         /* I want to see a callback happen when the OBD moves to a
734          * "For General Use" state, and that's when we'll call
735          * set_nextid().  The class driver can help us here, because
736          * it can use the obd_recovering flag to determine when the
737          * the OBD is full available. */
738         /* MDD device will care about that
739         if (!obd->obd_recovering)
740                 rc = mds_postrecov(obd);
741          */
742         RETURN(rc);
743
744 err_exit:
745         mds->mds_osc_exp = NULL;
746         mds->mds_osc_obd = ERR_PTR(rc);
747         RETURN(rc);
748 }
749
750 int mds_lov_disconnect(struct obd_device *obd)
751 {
752         struct mds_obd *mds = &obd->u.mds;
753         int rc = 0;
754         ENTRY;
755
756         if (!IS_ERR(mds->mds_osc_obd) && mds->mds_osc_exp != NULL) {
757                 obd_register_observer(mds->mds_osc_obd, NULL);
758
759                 /* The actual disconnect of the mds_lov will be called from
760                  * class_disconnect_exports from mds_lov_clean. So we have to
761                  * ensure that class_cleanup doesn't fail due to the extra ref
762                  * we're holding now. The mechanism to do that already exists -
763                  * the obd_force flag. We'll drop the final ref to the
764                  * mds_osc_exp in mds_cleanup. */
765                 mds->mds_osc_obd->obd_force = 1;
766         }
767
768         RETURN(rc);
769 }
770
771 struct mds_lov_sync_info {
772         struct obd_device    *mlsi_obd;     /* the lov device to sync */
773         struct obd_device    *mlsi_watched; /* target osc */
774         __u32                 mlsi_index;   /* index of target */
775         enum obd_notify_event mlsi_ev;      /* event type */
776 };
777
778 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
779 {
780         struct mds_capa_info    info = { .uuid = uuid };
781         struct lustre_capa_key *key;
782         int i, rc = 0;
783
784         ENTRY;
785
786         if (!mds->mds_capa_keys)
787                 RETURN(0);
788
789         for (i = 0; i < 2; i++) {
790                 key = &mds->mds_capa_keys[i];
791                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
792
793                 info.capa = key;
794                 rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_CAPA_KEY),
795                                         KEY_CAPA_KEY, sizeof(info), &info, NULL);
796                 if (rc) {
797                         DEBUG_CAPA_KEY(D_ERROR, key,
798                                        "propagate failed (rc = %d) for", rc);
799                         RETURN(rc);
800                 }
801         }
802
803         RETURN(0);
804 }
805
806 /* We only sync one osc at a time, so that we don't have to hold
807    any kind of lock on the whole mds_lov_desc, which may change
808    (grow) as a result of mds_lov_add_ost.  This also avoids any
809    kind of mismatch between the lov_desc and the mds_lov_desc,
810    which are not in lock-step during lov_add_obd */
811 static int __mds_lov_synchronize(void *data)
812 {
813         struct mds_lov_sync_info *mlsi = data;
814         struct obd_device *obd = mlsi->mlsi_obd;
815         struct obd_device *watched = mlsi->mlsi_watched;
816         struct mds_obd *mds = &obd->u.mds;
817         struct obd_uuid *uuid;
818         __u32  idx = mlsi->mlsi_index;
819         enum obd_notify_event ev = mlsi->mlsi_ev;
820         struct mds_group_info mgi;
821         struct llog_ctxt *ctxt;
822         int rc = 0;
823         ENTRY;
824
825         OBD_FREE_PTR(mlsi);
826
827         LASSERT(obd);
828         LASSERT(watched);
829         uuid = &watched->u.cli.cl_target_uuid;
830         LASSERT(uuid);
831
832         down_read(&mds->mds_notify_lock);
833         if (obd->obd_stopping || obd->obd_fail)
834                 GOTO(out, rc = -ENODEV);
835
836         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
837         rc = mds_lov_update_mds(obd, watched, idx, ev);
838         if (rc != 0) {
839                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
840                 GOTO(out, rc);
841         }
842         mgi.group = mdt_to_obd_objgrp(mds->mds_id);
843         mgi.uuid = uuid;
844
845         rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
846                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
847         if (rc != 0)
848                 GOTO(out, rc);
849         /* propagate capability keys */
850         rc = mds_propagate_capa_keys(mds, uuid);
851         if (rc)
852                 GOTO(out, rc);
853
854         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
855         if (!ctxt)
856                 GOTO(out, rc = -ENODEV);
857
858         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
859         rc = llog_connect(ctxt, NULL, NULL, uuid);
860         llog_ctxt_put(ctxt);
861         if (rc != 0) {
862                 CERROR("%s failed at llog_origin_connect: %d\n",
863                        obd_uuid2str(uuid), rc);
864                 GOTO(out, rc);
865         }
866
867         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
868               obd->obd_name, obd_uuid2str(uuid));
869         rc = mds_lov_clear_orphans(mds, uuid);
870         if (rc != 0) {
871                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
872                        obd_uuid2str(uuid), rc);
873                 GOTO(out, rc);
874         }
875
876 #ifdef HAVE_QUOTA_SUPPORT
877         if (obd->obd_upcall.onu_owner) { 
878                 /*
879                  * This is a hack for mds_notify->mdd_notify. When the mds obd
880                  * in mdd is removed, This hack should be removed.
881                  */
882                 LASSERT(obd->obd_upcall.onu_upcall != NULL);
883                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
884                                                 obd->obd_upcall.onu_owner,NULL);
885         }
886 #endif
887         EXIT;
888 out:
889         up_read(&mds->mds_notify_lock);
890         if (rc) {
891                 /* Deactivate it for safety */
892                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
893                        rc);
894                 if (!obd->obd_stopping && mds->mds_osc_obd &&
895                     !mds->mds_osc_obd->obd_stopping && !watched->obd_stopping)
896                         obd_notify(mds->mds_osc_obd, watched,
897                                    OBD_NOTIFY_INACTIVE, NULL);
898         }
899
900         class_decref(obd, "mds_lov_synchronize", obd);
901         return rc;
902 }
903
904 int mds_lov_synchronize(void *data)
905 {
906         struct mds_lov_sync_info *mlsi = data;
907         char name[20];
908
909         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
910         ptlrpc_daemonize(name);
911
912         RETURN(__mds_lov_synchronize(data));
913 }
914
915 int mds_lov_start_synchronize(struct obd_device *obd,
916                               struct obd_device *watched,
917                               void *data, enum obd_notify_event ev)
918 {
919         struct mds_lov_sync_info *mlsi;
920         int rc;
921         struct obd_uuid *uuid;
922         ENTRY;
923
924         LASSERT(watched);
925         uuid = &watched->u.cli.cl_target_uuid;
926
927         OBD_ALLOC(mlsi, sizeof(*mlsi));
928         if (mlsi == NULL)
929                 RETURN(-ENOMEM);
930
931         LASSERT(data);
932         mlsi->mlsi_obd = obd;
933         mlsi->mlsi_watched = watched;
934         mlsi->mlsi_index = *(__u32 *)data;
935         mlsi->mlsi_ev = ev;
936
937         /* Although class_export_get(obd->obd_self_export) would lock
938            the MDS in place, since it's only a self-export
939            it doesn't lock the LOV in place.  The LOV can be disconnected
940            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
941            Simply taking an export ref on the LOV doesn't help, because it's
942            still disconnected. Taking an obd reference insures that we don't
943            disconnect the LOV.  This of course means a cleanup won't
944            finish for as long as the sync is blocking. */
945         class_incref(obd, "mds_lov_synchronize", obd);
946
947         if (ev != OBD_NOTIFY_SYNC) {
948                 /* Synchronize in the background */
949                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
950                                        CLONE_VM | CLONE_FILES);
951                 if (rc < 0) {
952                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
953                                obd->obd_name, rc);
954                         class_decref(obd, "mds_lov_synchronize", obd);
955                 } else {
956                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
957                                "thread=%d\n", obd->obd_name,
958                                mlsi->mlsi_index, rc);
959                         rc = 0;
960                 }
961         } else {
962                 rc = __mds_lov_synchronize((void *)mlsi);
963         }
964
965         RETURN(rc);
966 }
967
968 int mds_notify(struct obd_device *obd, struct obd_device *watched,
969                enum obd_notify_event ev, void *data)
970 {
971         int rc = 0;
972         ENTRY;
973
974         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
975
976         switch (ev) {
977         /* We only handle these: */
978         case OBD_NOTIFY_ACTIVE:
979                 /* lov want one or more _active_ targets for work */
980                 /* activate event should be pass lov idx as argument */
981         case OBD_NOTIFY_SYNC:
982         case OBD_NOTIFY_SYNC_NONBLOCK:
983                 /* sync event should be pass lov idx as argument */
984                 break;
985         default:
986                 RETURN(0);
987         }
988
989         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
990                 CERROR("unexpected notification of %s %s!\n",
991                        watched->obd_type->typ_name, watched->obd_name);
992                 RETURN(-EINVAL);
993         }
994
995         if (obd->obd_recovering) {
996                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
997                       obd->obd_name,
998                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
999                 /* We still have to fix the lov descriptor for ost's added
1000                    after the mdt in the config log.  They didn't make it into
1001                    mds_lov_connect. */
1002                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1003                                          &watched->u.cli.cl_target_uuid, ev);
1004         } else {
1005                 rc = mds_lov_start_synchronize(obd, watched, data, ev);
1006         }
1007         RETURN(rc);
1008 }