Whamcloud - gitweb
b=23161 Update max ea and init llog early
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
53
54 #include "mds_internal.h"
55
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 {
58         struct mds_obd *mds = &obd->u.mds;
59         unsigned int i=0, j;
60
61         CDEBUG(D_INFO, "dump from %s\n", label);
62         if (mds->mds_lov_page_dirty == NULL) {
63                 CERROR("NULL bitmap!\n");
64                 GOTO(skip_bitmap, i);
65         }
66
67         for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
68                 CDEBUG(D_INFO, "%u - %lx\n", i,
69                        mds->mds_lov_page_dirty->data[i]);
70 skip_bitmap:
71         if (mds->mds_lov_page_array == NULL) {
72                 CERROR("not init page array!\n");
73                 GOTO(skip_array, i);
74
75         }
76         for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
77                 obd_id *data = mds->mds_lov_page_array[i];
78
79                 if (data == NULL)
80                         continue;
81
82                 for(j=0; j < OBJID_PER_PAGE(); j++) {
83                         if (data[j] == 0)
84                                 continue;
85                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
86                                i, j, data[j]);
87                 }
88         }
89 skip_array:
90         EXIT;
91 }
92
93 int mds_lov_init_objids(struct obd_device *obd)
94 {
95         struct mds_obd *mds = &obd->u.mds;
96         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
97         struct file *file;
98         int rc;
99         ENTRY;
100
101         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
102
103         mds->mds_lov_page_dirty =
104                 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
105         if (mds->mds_lov_page_dirty == NULL)
106                 RETURN(-ENOMEM);
107
108
109         OBD_ALLOC(mds->mds_lov_page_array, size);
110         if (mds->mds_lov_page_array == NULL)
111                 GOTO(err_free_bitmap, rc = -ENOMEM);
112
113         /* open and test the lov objd file */
114         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
115         if (IS_ERR(file)) {
116                 rc = PTR_ERR(file);
117                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
118                 GOTO(err_free, rc = PTR_ERR(file));
119         }
120         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
121                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
122                        file->f_dentry->d_inode->i_mode);
123                 GOTO(err_open, rc = -ENOENT);
124         }
125         mds->mds_lov_objid_filp = file;
126
127         RETURN (0);
128 err_open:
129         if (filp_close((struct file *)file, 0))
130                 CERROR("can't close %s after error\n", LOV_OBJID);
131 err_free:
132         OBD_FREE(mds->mds_lov_page_array, size);
133 err_free_bitmap:
134         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
135
136         RETURN(rc);
137 }
138
139 void mds_lov_destroy_objids(struct obd_device *obd)
140 {
141         struct mds_obd *mds = &obd->u.mds;
142         int i, rc;
143         ENTRY;
144
145         if (mds->mds_lov_page_array != NULL) {
146                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
147                         obd_id *data = mds->mds_lov_page_array[i];
148                         if (data != NULL)
149                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
150                 }
151                 OBD_FREE(mds->mds_lov_page_array,
152                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
153         }
154
155         if (mds->mds_lov_objid_filp) {
156                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
157                 mds->mds_lov_objid_filp = NULL;
158                 if (rc)
159                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
160         }
161
162         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
163         EXIT;
164 }
165
166 /**
167  * currently exist two ways for know about ost count and max ost index.
168  * first - after ost is connected to mds and sync process finished
169  * second - get from lmm in recovery process, in case when mds not have configs,
170  * and ost isn't registered in mgs.
171  *
172  * \param mds pointer to mds structure
173  * \param index maxium ost index
174  *
175  * \retval -ENOMEM is not hame memory for new page
176  * \retval 0 is update passed
177  */
178 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
179 {
180         __u32 page = index / OBJID_PER_PAGE();
181         __u32 off = index % OBJID_PER_PAGE();
182         obd_id *data =  mds->mds_lov_page_array[page];
183
184         if (data == NULL) {
185                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
186                 if (data == NULL)
187                         RETURN(-ENOMEM);
188
189                 mds->mds_lov_page_array[page] = data;
190         }
191
192         if (index > mds->mds_lov_objid_max_index) {
193                 mds->mds_lov_objid_lastpage = page;
194                 mds->mds_lov_objid_lastidx = off;
195                 mds->mds_lov_objid_max_index = index;
196         }
197
198         /* workaround - New target not in objids file; increase mdsize */
199         /* ld_tgt_count is used as the max index everywhere, despite its name. */
200         if (data[off] == 0) {
201                 __u32 stripes;
202
203                 data[off] = 1;
204                 mds->mds_lov_objid_count++;
205                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
206                                 mds->mds_lov_objid_count);
207
208                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
209                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
210
211                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
212                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
213                        mds->mds_max_cookiesize);
214         }
215
216         EXIT;
217         return 0;
218 }
219
220 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
221 {
222         __u32 page = index / OBJID_PER_PAGE();
223         __u32 off = index % OBJID_PER_PAGE();
224         obd_id *data =  mds->mds_lov_page_array[page];
225
226         return (data[off] > 0);
227 }
228
229 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
230 {
231         struct lov_ost_data_v1 *data;
232         __u32 count;
233         int rc = 0;
234         __u32 j;
235
236         /* if we create file without objects - lmm is NULL */
237         if (lmm == NULL)
238                 return 0;
239
240         switch (le32_to_cpu(lmm->lmm_magic)) {
241                 case LOV_MAGIC_V1:
242                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
243                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
244                         break;
245                 case LOV_MAGIC_V3:
246                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
247                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
248                         break;
249                 default:
250                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
251                         RETURN(-EINVAL);
252         }
253
254
255         cfs_mutex_down(&obd->obd_dev_sem);
256         for (j = 0; j < count; j++) {
257                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
258                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
259                         rc = -ENOMEM;
260                         break;
261                 }
262         }
263         cfs_mutex_up(&obd->obd_dev_sem);
264
265         RETURN(rc);
266 }
267 EXPORT_SYMBOL(mds_lov_prepare_objids);
268
269 /*
270  * write llog orphan record about lost ost object,
271  * Special lsm is allocated with single stripe, caller should deallocated it
272  * after use
273  */
274 static int mds_log_lost_precreated(struct obd_device *obd,
275                                    struct lov_stripe_md **lsmp, int *stripes,
276                                    obd_id id, obd_count count, int idx)
277 {
278         struct lov_stripe_md *lsm = *lsmp;
279         int rc;
280         ENTRY;
281
282         if (*lsmp == NULL) {
283                 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
284                 if (rc < 0)
285                         RETURN(rc);
286                 /* need only one stripe, save old value */
287                 *stripes = lsm->lsm_stripe_count;
288                 lsm->lsm_stripe_count = 1;
289                 *lsmp = lsm;
290         }
291
292         lsm->lsm_oinfo[0]->loi_id = id;
293         lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
294         lsm->lsm_oinfo[0]->loi_ost_idx = idx;
295
296         rc = mds_log_op_orphan(obd, lsm, count);
297         RETURN(rc);
298 }
299
300 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
301 {
302         struct mds_obd *mds = &obd->u.mds;
303         int j;
304         struct lov_ost_data_v1 *obj;
305         struct lov_stripe_md *lsm = NULL;
306         int stripes = 0;
307         int count;
308         ENTRY;
309
310         /* if we create file without objects - lmm is NULL */
311         if (lmm == NULL)
312                 return;
313
314         switch (le32_to_cpu(lmm->lmm_magic)) {
315                 case LOV_MAGIC_V1:
316                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
317                         obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
318                         break;
319                 case LOV_MAGIC_V3:
320                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
321                         obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
322                         break;
323                 default:
324                         CERROR("Unknow lmm type %X !\n",
325                                le32_to_cpu(lmm->lmm_magic));
326                         return;
327         }
328
329         for (j = 0; j < count; j++) {
330                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
331                 obd_id id = le64_to_cpu(obj[j].l_object_id);
332                 __u32 page = i / OBJID_PER_PAGE();
333                 __u32 idx = i % OBJID_PER_PAGE();
334                 obd_id *data;
335
336                 data = mds->mds_lov_page_array[page];
337
338                 CDEBUG(D_INODE,"update last object for ost %u"
339                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
340                 if (id > data[idx]) {
341                         int lost = id - data[idx] - 1;
342                         /* we might have lost precreated objects due to VBR */
343                         if (lost > 0 && obd->obd_recovering) {
344                                 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
345                                 if (!obd->obd_version_recov)
346                                         CERROR("Unexpected gap in objids\n");
347                                 /* lsm is allocated if NULL */
348                                 mds_log_lost_precreated(obd, &lsm, &stripes,
349                                                         data[idx]+1, lost, i);
350                         }
351                         data[idx] = id;
352                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
353                 }
354         }
355         if (lsm) {
356                 /* restore stripes number */
357                 lsm->lsm_stripe_count = stripes;
358                 obd_free_memmd(mds->mds_lov_exp, &lsm);
359         }
360         EXIT;
361         return;
362 }
363 EXPORT_SYMBOL(mds_lov_update_objids);
364
365 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
366                                     __u32 count)
367 {
368         __u32 i;
369         __u32 stripes;
370
371         for (i = 0; i < count; i++) {
372                 if (data[i] == 0)
373                         continue;
374
375                 mds->mds_lov_objid_count++;
376         }
377
378         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
379                          mds->mds_lov_objid_count);
380
381         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
382         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
383
384         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
385                "%d/%d\n", stripes, mds->mds_max_mdsize,
386                mds->mds_max_cookiesize);
387
388         EXIT;
389         return 0;
390 }
391
392 static int mds_lov_read_objids(struct obd_device *obd)
393 {
394         struct mds_obd *mds = &obd->u.mds;
395         loff_t off = 0;
396         int i, rc = 0, count = 0, page = 0;
397         unsigned long size;
398         ENTRY;
399
400         /* Read everything in the file, even if our current lov desc
401            has fewer targets. Old targets not in the lov descriptor
402            during mds setup may still have valid objids. */
403         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
404         if (size == 0)
405                 RETURN(0);
406
407         page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
408         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
409         for (i = 0; i < page; i++) {
410                 obd_id *data;
411                 loff_t off_old = off;
412
413                 LASSERT(mds->mds_lov_page_array[i] == NULL);
414                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
415                 if (mds->mds_lov_page_array[i] == NULL)
416                         GOTO(out, rc = -ENOMEM);
417
418                 data = mds->mds_lov_page_array[i];
419
420                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
421                                         MDS_LOV_ALLOC_SIZE, &off);
422                 if (rc < 0) {
423                         CERROR("Error reading objids %d\n", rc);
424                         GOTO(out, rc);
425                 }
426                 if (off == off_old) /* hole is read */
427                         off += MDS_LOV_ALLOC_SIZE;
428
429                 count = (off - off_old) / sizeof(obd_id);
430                 if (mds_lov_update_from_read(mds, data, count)) {
431                         CERROR("Can't update mds data\n");
432                         GOTO(out, rc = -EIO);
433                 }
434         }
435         mds->mds_lov_objid_lastpage = page - 1;
436         mds->mds_lov_objid_lastidx = count - 1;
437
438         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
439                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
440 out:
441         mds_lov_dump_objids("read",obd);
442
443         RETURN(rc);
444 }
445
446 int mds_lov_write_objids(struct obd_device *obd)
447 {
448         struct mds_obd *mds = &obd->u.mds;
449         int i = 0, rc = 0;
450         ENTRY;
451
452         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
453                 RETURN(0);
454
455         mds_lov_dump_objids("write", obd);
456
457         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
458                 obd_id *data =  mds->mds_lov_page_array[i];
459                 unsigned int size = MDS_LOV_ALLOC_SIZE;
460                 loff_t off = i * size;
461
462                 LASSERT(data != NULL);
463
464                 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
465                         continue;
466
467                 /* check for particaly filled last page */
468                 if (i == mds->mds_lov_objid_lastpage)
469                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
470
471                 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
472                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
473                                          size, &off, 0);
474                 if (rc < 0) {
475                         cfs_bitmap_set(mds->mds_lov_page_dirty, i);
476                         break;
477                 }
478         }
479         if (rc >= 0)
480                 rc = 0;
481
482         RETURN(rc);
483 }
484 EXPORT_SYMBOL(mds_lov_write_objids);
485
486 static int mds_lov_get_objid(struct obd_device * obd,
487                              obd_id idx)
488 {
489         struct mds_obd *mds = &obd->u.mds;
490         struct obd_export *lov_exp = mds->mds_lov_exp;
491         unsigned int page;
492         unsigned int off;
493         obd_id *data;
494         __u32 size;
495         int rc = 0;
496         ENTRY;
497
498         page = idx / OBJID_PER_PAGE();
499         off = idx % OBJID_PER_PAGE();
500         data = mds->mds_lov_page_array[page];
501
502         if (data[off] < 2) {
503                 /* We never read this lastid; ask the osc */
504                 struct obd_id_info lastid;
505
506                 size = sizeof(lastid);
507                 lastid.idx = idx;
508                 lastid.data = &data[off];
509                 rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
510                                   &size, &lastid, NULL);
511                 if (rc)
512                         GOTO(out, rc);
513
514                 /* workaround for clean filter */
515                 if (data[off] == 0)
516                         data[off] = 1;
517
518                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
519         }
520         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
521                idx, data, page, off, data[off]);
522 out:
523         RETURN(rc);
524 }
525
526 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
527 {
528         int rc;
529         struct obdo oa = { 0 };
530         struct obd_trans_info oti = {0};
531         struct lov_stripe_md  *empty_ea = NULL;
532         ENTRY;
533
534         LASSERT(mds->mds_lov_page_array != NULL);
535
536         /* This create will in fact either create or destroy:  If the OST is
537          * missing objects below this ID, they will be created.  If it finds
538          * objects above this ID, they will be removed. */
539         memset(&oa, 0, sizeof(oa));
540         oa.o_flags = OBD_FL_DELORPHAN;
541         oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
542         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
543         if (ost_uuid != NULL)
544                 oti.oti_ost_uuid = ost_uuid;
545
546         rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
547
548         RETURN(rc);
549 }
550
551 /* for one target */
552 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
553 {
554         struct mds_obd *mds = &obd->u.mds;
555         int rc;
556         struct obd_id_info info;
557         ENTRY;
558
559         LASSERT(!obd->obd_recovering);
560
561         info.idx = idx;
562         info.data = id;
563         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
564                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
565         if (rc)
566                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
567                         obd->obd_name, rc);
568
569         RETURN(rc);
570 }
571
572 /* Update the lov desc for a new size lov. */
573 static int mds_lov_update_desc(struct obd_device *obd, int idx,
574                                struct obd_uuid *uuid)
575 {
576         struct mds_obd *mds = &obd->u.mds;
577         struct lov_desc *ld;
578         __u32 valsize = sizeof(mds->mds_lov_desc);
579         int rc = 0;
580         ENTRY;
581
582         OBD_ALLOC(ld, sizeof(*ld));
583         if (!ld)
584                 RETURN(-ENOMEM);
585
586         rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
587                           &valsize, ld, NULL);
588         if (rc)
589                 GOTO(out, rc);
590
591         /* Don't change the mds_lov_desc until the objids size matches the
592            count (paranoia) */
593         mds->mds_lov_desc = *ld;
594         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
595                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
596
597         cfs_mutex_down(&obd->obd_dev_sem);
598         rc = mds_lov_update_max_ost(mds, idx);
599         cfs_mutex_up(&obd->obd_dev_sem);
600         if (rc != 0)
601                 GOTO(out, rc );
602
603         /* If we added a target we have to reconnect the llogs */
604         /* We only _need_ to do this at first add (idx), or the first time
605            after recovery.  However, it should now be safe to call anytime. */
606         rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
607         if (rc)
608                 GOTO(out, rc);
609
610 out:
611         OBD_FREE(ld, sizeof(*ld));
612         RETURN(rc);
613 }
614
615 /* Inform MDS about new/updated target */
616 static int mds_lov_update_mds(struct obd_device *obd,
617                               struct obd_device *watched,
618                               __u32 idx)
619 {
620         struct mds_obd *mds = &obd->u.mds;
621         int rc = 0;
622         int page;
623         int off;
624         obd_id *data;
625         ENTRY;
626
627         LASSERT(mds_lov_objinit(mds, idx));
628
629         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
630                idx, obd->obd_recovering, obd->obd_async_recov,
631                mds->mds_lov_desc.ld_tgt_count);
632
633         /* idx is set as data from lov_notify. */
634         if (obd->obd_recovering)
635                 GOTO(out, rc);
636
637         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
638                 CERROR("index %d > count %d!\n", idx,
639                        mds->mds_lov_desc.ld_tgt_count);
640                 GOTO(out, rc = -EINVAL);
641         }
642
643         rc = mds_lov_get_objid(obd, idx);
644         if (rc)
645                 GOTO(out, rc);
646
647         page = idx / OBJID_PER_PAGE();
648         off = idx % OBJID_PER_PAGE();
649         data = mds->mds_lov_page_array[page];
650
651         /* We have read this lastid from disk; tell the osc.
652            Don't call this during recovery. */
653         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
654         if (rc) {
655                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
656                 /* Don't abort the rest of the sync */
657                 rc = 0;
658         } else {
659                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
660                         data[off], idx, rc);
661         }
662 out:
663         RETURN(rc);
664 }
665
666 /* update the LOV-OSC knowledge of the last used object id's */
667 int mds_lov_connect(struct obd_device *obd, char * lov_name)
668 {
669         struct mds_obd *mds = &obd->u.mds;
670         struct obd_connect_data *data;
671         int rc;
672         ENTRY;
673
674         if (IS_ERR(mds->mds_lov_obd))
675                 RETURN(PTR_ERR(mds->mds_lov_obd));
676
677         if (mds->mds_lov_obd)
678                 RETURN(0);
679
680         mds->mds_lov_obd = class_name2obd(lov_name);
681         if (!mds->mds_lov_obd) {
682                 CERROR("MDS cannot locate LOV %s\n", lov_name);
683                 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
684                 RETURN(-ENOTCONN);
685         }
686
687         cfs_mutex_down(&obd->obd_dev_sem);
688         rc = mds_lov_read_objids(obd);
689         cfs_mutex_up(&obd->obd_dev_sem);
690         if (rc) {
691                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
692                 GOTO(err_exit, rc);
693         }
694
695         rc = obd_register_observer(mds->mds_lov_obd, obd);
696         if (rc) {
697                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
698                        lov_name, rc);
699                 GOTO(err_exit, rc);
700         }
701
702         /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
703          * targets */
704         obd_notify(mds->mds_lov_obd, NULL, OBD_NOTIFY_CREATE, NULL);
705
706         mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
707
708         OBD_ALLOC(data, sizeof(*data));
709         if (data == NULL)
710                 GOTO(err_exit, rc = -ENOMEM);
711
712         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
713                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
714                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
715                                   OBD_CONNECT_BRW_SIZE  | OBD_CONNECT_CKSUM   |
716                                   OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT      |
717                                   OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN   |
718                                   OBD_CONNECT_SOM | OBD_CONNECT_FULL20;
719 #ifdef HAVE_LRU_RESIZE_SUPPORT
720         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
721 #endif
722         data->ocd_version = LUSTRE_VERSION_CODE;
723         data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
724         /* send max bytes per rpc */
725         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
726         /* send the list of supported checksum types */
727         data->ocd_cksum_types = OBD_CKSUM_ALL;
728         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
729         rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
730         OBD_FREE(data, sizeof(*data));
731         if (rc) {
732                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
733                 mds->mds_lov_obd = ERR_PTR(rc);
734                 RETURN(rc);
735         }
736
737         /* I want to see a callback happen when the OBD moves to a
738          * "For General Use" state, and that's when we'll call
739          * set_nextid().  The class driver can help us here, because
740          * it can use the obd_recovering flag to determine when the
741          * the OBD is full available. */
742         /* MDD device will care about that
743         if (!obd->obd_recovering)
744                 rc = mds_postrecov(obd);
745          */
746         RETURN(rc);
747
748 err_exit:
749         mds->mds_lov_exp = NULL;
750         mds->mds_lov_obd = ERR_PTR(rc);
751         RETURN(rc);
752 }
753
754 int mds_lov_disconnect(struct obd_device *obd)
755 {
756         struct mds_obd *mds = &obd->u.mds;
757         int rc = 0;
758         ENTRY;
759
760         if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
761                 obd_register_observer(mds->mds_lov_obd, NULL);
762
763                 /* The actual disconnect of the mds_lov will be called from
764                  * class_disconnect_exports from mds_lov_clean. So we have to
765                  * ensure that class_cleanup doesn't fail due to the extra ref
766                  * we're holding now. The mechanism to do that already exists -
767                  * the obd_force flag. We'll drop the final ref to the
768                  * mds_lov_exp in mds_cleanup. */
769                 mds->mds_lov_obd->obd_force = 1;
770         }
771
772         RETURN(rc);
773 }
774
775 struct mds_lov_sync_info {
776         struct obd_device    *mlsi_obd;     /* the lov device to sync */
777         struct obd_device    *mlsi_watched; /* target osc */
778         __u32                 mlsi_index;   /* index of target */
779 };
780
781 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
782 {
783         struct mds_capa_info    info = { .uuid = uuid };
784         struct lustre_capa_key *key;
785         int i, rc = 0;
786
787         ENTRY;
788
789         if (!mds->mds_capa_keys)
790                 RETURN(0);
791
792         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
793         for (i = 0; i < 2; i++) {
794                 key = &mds->mds_capa_keys[i];
795                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
796
797                 info.capa = key;
798                 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY),
799                                         KEY_CAPA_KEY, sizeof(info), &info, NULL);
800                 if (rc) {
801                         DEBUG_CAPA_KEY(D_ERROR, key,
802                                        "propagate failed (rc = %d) for", rc);
803                         RETURN(rc);
804                 }
805         }
806
807         RETURN(0);
808 }
809
810 /* We only sync one osc at a time, so that we don't have to hold
811    any kind of lock on the whole mds_lov_desc, which may change
812    (grow) as a result of mds_lov_add_ost.  This also avoids any
813    kind of mismatch between the lov_desc and the mds_lov_desc,
814    which are not in lock-step during lov_add_obd */
815 static int __mds_lov_synchronize(void *data)
816 {
817         struct mds_lov_sync_info *mlsi = data;
818         struct obd_device *obd = mlsi->mlsi_obd;
819         struct obd_device *watched = mlsi->mlsi_watched;
820         struct mds_obd *mds = &obd->u.mds;
821         struct obd_uuid *uuid;
822         __u32  idx = mlsi->mlsi_index;
823         struct mds_group_info mgi;
824         struct llog_ctxt *ctxt;
825         int rc = 0;
826         ENTRY;
827
828         OBD_FREE_PTR(mlsi);
829
830         LASSERT(obd);
831         LASSERT(watched);
832         uuid = &watched->u.cli.cl_target_uuid;
833         LASSERT(uuid);
834
835         cfs_down_read(&mds->mds_notify_lock);
836         if (obd->obd_stopping || obd->obd_fail)
837                 GOTO(out, rc = -ENODEV);
838
839         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
840         rc = mds_lov_update_mds(obd, watched, idx);
841         if (rc != 0) {
842                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
843                 GOTO(out, rc);
844         }
845         mgi.group = mdt_to_obd_objseq(mds->mds_id);
846         mgi.uuid = uuid;
847
848         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
849                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
850         if (rc != 0)
851                 GOTO(out, rc);
852         /* propagate capability keys */
853         rc = mds_propagate_capa_keys(mds, uuid);
854         if (rc)
855                 GOTO(out, rc);
856
857         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
858         if (!ctxt)
859                 GOTO(out, rc = -ENODEV);
860
861         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
862         rc = llog_connect(ctxt, NULL, NULL, uuid);
863         llog_ctxt_put(ctxt);
864         if (rc != 0) {
865                 CERROR("%s failed at llog_origin_connect: %d\n",
866                        obd_uuid2str(uuid), rc);
867                 GOTO(out, rc);
868         }
869
870         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
871               obd->obd_name, obd_uuid2str(uuid));
872
873         rc = mds_lov_clear_orphans(mds, uuid);
874         if (rc != 0) {
875                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
876                        obd_uuid2str(uuid), rc);
877                 GOTO(out, rc);
878         }
879
880 #ifdef HAVE_QUOTA_SUPPORT
881         if (obd->obd_upcall.onu_owner) {
882                 /*
883                  * This is a hack for mds_notify->mdd_notify. When the mds obd
884                  * in mdd is removed, This hack should be removed.
885                  */
886                 LASSERT(obd->obd_upcall.onu_upcall != NULL);
887                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
888                                                 obd->obd_upcall.onu_owner,NULL);
889         }
890 #endif
891         EXIT;
892 out:
893         cfs_up_read(&mds->mds_notify_lock);
894         if (rc) {
895                 /* Deactivate it for safety */
896                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
897                        rc);
898                 if (!obd->obd_stopping && mds->mds_lov_obd &&
899                     !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
900                         obd_notify(mds->mds_lov_obd, watched,
901                                    OBD_NOTIFY_INACTIVE, NULL);
902         }
903
904         class_decref(obd, "mds_lov_synchronize", obd);
905         return rc;
906 }
907
908 int mds_lov_synchronize(void *data)
909 {
910         struct mds_lov_sync_info *mlsi = data;
911         char name[20];
912
913         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
914         cfs_daemonize_ctxt(name);
915
916         RETURN(__mds_lov_synchronize(data));
917 }
918
919 int mds_lov_start_synchronize(struct obd_device *obd,
920                               struct obd_device *watched,
921                               void *data, enum obd_notify_event ev)
922 {
923         struct mds_lov_sync_info *mlsi;
924         int rc;
925         struct obd_uuid *uuid;
926         ENTRY;
927
928         LASSERT(watched);
929         uuid = &watched->u.cli.cl_target_uuid;
930
931         OBD_ALLOC(mlsi, sizeof(*mlsi));
932         if (mlsi == NULL)
933                 RETURN(-ENOMEM);
934
935         LASSERT(data);
936         mlsi->mlsi_obd = obd;
937         mlsi->mlsi_watched = watched;
938         mlsi->mlsi_index = *(__u32 *)data;
939
940         /* Although class_export_get(obd->obd_self_export) would lock
941            the MDS in place, since it's only a self-export
942            it doesn't lock the LOV in place.  The LOV can be disconnected
943            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
944            Simply taking an export ref on the LOV doesn't help, because it's
945            still disconnected. Taking an obd reference insures that we don't
946            disconnect the LOV.  This of course means a cleanup won't
947            finish for as long as the sync is blocking. */
948         class_incref(obd, "mds_lov_synchronize", obd);
949
950         if (ev != OBD_NOTIFY_SYNC) {
951                 /* Synchronize in the background */
952                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
953                                        CLONE_VM | CLONE_FILES);
954                 if (rc < 0) {
955                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
956                                obd->obd_name, rc);
957                         class_decref(obd, "mds_lov_synchronize", obd);
958                 } else {
959                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
960                                "thread=%d\n", obd->obd_name,
961                                mlsi->mlsi_index, rc);
962                         rc = 0;
963                 }
964         } else {
965                 rc = __mds_lov_synchronize((void *)mlsi);
966         }
967
968         RETURN(rc);
969 }
970
971 int mds_notify(struct obd_device *obd, struct obd_device *watched,
972                enum obd_notify_event ev, void *data)
973 {
974         struct mds_obd *mds = &obd->u.mds;
975         int rc = 0;
976         ENTRY;
977
978         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
979
980         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
981                 CERROR("unexpected notification of %s %s!\n",
982                        watched->obd_type->typ_name, watched->obd_name);
983                 RETURN(-EINVAL);
984         }
985
986         /*XXX this notifies the MDD until lov handling use old mds code
987          * must non block!
988          */
989         if (obd->obd_upcall.onu_owner) {
990                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
991                  rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
992                                                  obd->obd_upcall.onu_owner,
993                                                  &mds->mds_obt.obt_mount_count);
994         }
995
996         switch (ev) {
997         /* We only handle these: */
998         case OBD_NOTIFY_CREATE:
999                 CWARN("MDS %s: add target %s\n",obd->obd_name,
1000                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1001                 /* We still have to fix the lov descriptor for ost's */
1002                 LASSERT(data);
1003                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1004                                           &watched->u.cli.cl_target_uuid);
1005                 RETURN(rc);
1006         case OBD_NOTIFY_ACTIVE:
1007                 /* lov want one or more _active_ targets for work */
1008                 /* activate event should be pass lov idx as argument */
1009         case OBD_NOTIFY_SYNC:
1010         case OBD_NOTIFY_SYNC_NONBLOCK:
1011                 /* sync event should be pass lov idx as argument */
1012                 break;
1013         default:
1014                 RETURN(0);
1015         }
1016
1017         if (obd->obd_recovering) {
1018                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1019                       obd->obd_name,
1020                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1021                 /* We still have to fix the lov descriptor for ost's added
1022                    after the mdt in the config log.  They didn't make it into
1023                    mds_lov_connect. */
1024                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1025                                          &watched->u.cli.cl_target_uuid);
1026         } else {
1027                 rc = mds_lov_start_synchronize(obd, watched, data, ev);
1028         }
1029         RETURN(rc);
1030 }