Whamcloud - gitweb
LU-812 kernel: AUTOCONF_INCLUDED removed
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
53
54 #include "mds_internal.h"
55
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 {
58         struct mds_obd *mds = &obd->u.mds;
59         unsigned int i=0, j;
60
61         if ((libcfs_debug & D_INFO) == 0)
62                 return;
63
64         CDEBUG(D_INFO, "dump from %s\n", label);
65         if (mds->mds_lov_page_dirty == NULL) {
66                 CERROR("NULL bitmap!\n");
67                 GOTO(skip_bitmap, i);
68         }
69
70         for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
71                 CDEBUG(D_INFO, "%u - %lx\n", i,
72                        mds->mds_lov_page_dirty->data[i]);
73 skip_bitmap:
74         if (mds->mds_lov_page_array == NULL) {
75                 CERROR("not init page array!\n");
76                 GOTO(skip_array, i);
77
78         }
79         for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
80                 obd_id *data = mds->mds_lov_page_array[i];
81
82                 if (data == NULL)
83                         continue;
84
85                 for(j=0; j < OBJID_PER_PAGE(); j++) {
86                         if (data[j] == 0)
87                                 continue;
88                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
89                                i, j, data[j]);
90                 }
91         }
92 skip_array:
93         EXIT;
94 }
95
96 int mds_lov_init_objids(struct obd_device *obd)
97 {
98         struct mds_obd *mds = &obd->u.mds;
99         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
100         struct file *file;
101         int rc;
102         ENTRY;
103
104         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
105
106         mds->mds_lov_page_dirty =
107                 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
108         if (mds->mds_lov_page_dirty == NULL)
109                 RETURN(-ENOMEM);
110
111
112         OBD_ALLOC(mds->mds_lov_page_array, size);
113         if (mds->mds_lov_page_array == NULL)
114                 GOTO(err_free_bitmap, rc = -ENOMEM);
115
116         /* open and test the lov objd file */
117         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
118         if (IS_ERR(file)) {
119                 rc = PTR_ERR(file);
120                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
121                 GOTO(err_free, rc = PTR_ERR(file));
122         }
123         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
124                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
125                        file->f_dentry->d_inode->i_mode);
126                 GOTO(err_open, rc = -ENOENT);
127         }
128         mds->mds_lov_objid_filp = file;
129
130         RETURN (0);
131 err_open:
132         if (filp_close((struct file *)file, 0))
133                 CERROR("can't close %s after error\n", LOV_OBJID);
134 err_free:
135         OBD_FREE(mds->mds_lov_page_array, size);
136 err_free_bitmap:
137         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
138
139         RETURN(rc);
140 }
141
142 void mds_lov_destroy_objids(struct obd_device *obd)
143 {
144         struct mds_obd *mds = &obd->u.mds;
145         int i, rc;
146         ENTRY;
147
148         if (mds->mds_lov_page_array != NULL) {
149                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
150                         obd_id *data = mds->mds_lov_page_array[i];
151                         if (data != NULL)
152                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
153                 }
154                 OBD_FREE(mds->mds_lov_page_array,
155                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
156         }
157
158         if (mds->mds_lov_objid_filp) {
159                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
160                 mds->mds_lov_objid_filp = NULL;
161                 if (rc)
162                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
163         }
164
165         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
166         EXIT;
167 }
168
169 /**
170  * currently exist two ways for know about ost count and max ost index.
171  * first - after ost is connected to mds and sync process finished
172  * second - get from lmm in recovery process, in case when mds not have configs,
173  * and ost isn't registered in mgs.
174  *
175  * \param mds pointer to mds structure
176  * \param index maxium ost index
177  *
178  * \retval -ENOMEM is not hame memory for new page
179  * \retval 0 is update passed
180  */
181 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
182 {
183         __u32 page = index / OBJID_PER_PAGE();
184         __u32 off = index % OBJID_PER_PAGE();
185         obd_id *data =  mds->mds_lov_page_array[page];
186
187         if (data == NULL) {
188                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
189                 if (data == NULL)
190                         RETURN(-ENOMEM);
191
192                 mds->mds_lov_page_array[page] = data;
193         }
194
195         if (index > mds->mds_lov_objid_max_index) {
196                 mds->mds_lov_objid_lastpage = page;
197                 mds->mds_lov_objid_lastidx = off;
198                 mds->mds_lov_objid_max_index = index;
199         }
200
201         /* workaround - New target not in objids file; increase mdsize */
202         /* ld_tgt_count is used as the max index everywhere, despite its name. */
203         if (data[off] == 0) {
204                 __u32 max_easize;
205                 __u32 stripes;
206
207                 max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
208                 data[off] = 1;
209                 mds->mds_lov_objid_count++;
210                 stripes = min(lov_mds_md_stripecnt(max_easize, LOV_MAGIC_V3),
211                               mds->mds_lov_objid_count);
212
213                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
214                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
215
216                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
217                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
218                        mds->mds_max_cookiesize);
219         }
220
221         EXIT;
222         return 0;
223 }
224
225 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
226 {
227         __u32 page = index / OBJID_PER_PAGE();
228         __u32 off = index % OBJID_PER_PAGE();
229         obd_id *data =  mds->mds_lov_page_array[page];
230
231         return (data[off] > 0);
232 }
233
234 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
235 {
236         struct lov_ost_data_v1 *data;
237         __u16 count;
238         int rc = 0;
239         __u32 j;
240
241         /* if we create file without objects - lmm is NULL */
242         if (lmm == NULL)
243                 return 0;
244
245         switch (le32_to_cpu(lmm->lmm_magic)) {
246                 case LOV_MAGIC_V1:
247                         count = le16_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
248                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
249                         break;
250                 case LOV_MAGIC_V3:
251                         count = le16_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
252                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
253                         break;
254                 default:
255                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
256                         RETURN(-EINVAL);
257         }
258
259
260         cfs_mutex_lock(&obd->obd_dev_mutex);
261         for (j = 0; j < count; j++) {
262                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
263                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
264                         rc = -ENOMEM;
265                         break;
266                 }
267         }
268         cfs_mutex_unlock(&obd->obd_dev_mutex);
269
270         RETURN(rc);
271 }
272 EXPORT_SYMBOL(mds_lov_prepare_objids);
273
274 /*
275  * write llog orphan record about lost ost object,
276  * Special lsm is allocated with single stripe, caller should deallocated it
277  * after use
278  */
279 static int mds_log_lost_precreated(struct obd_device *obd,
280                                    struct lov_stripe_md **lsmp, __u16 *stripes,
281                                    obd_id id, obd_count count, int idx)
282 {
283         struct lov_stripe_md *lsm = *lsmp;
284         int rc;
285         ENTRY;
286
287         if (*lsmp == NULL) {
288                 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
289                 if (rc < 0)
290                         RETURN(rc);
291                 /* need only one stripe, save old value */
292                 *stripes = lsm->lsm_stripe_count;
293                 lsm->lsm_stripe_count = 1;
294                 *lsmp = lsm;
295         }
296
297         lsm->lsm_oinfo[0]->loi_id = id;
298         lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
299         lsm->lsm_oinfo[0]->loi_ost_idx = idx;
300
301         rc = mds_log_op_orphan(obd, lsm, count);
302         RETURN(rc);
303 }
304
305 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
306 {
307         struct mds_obd *mds = &obd->u.mds;
308         int j;
309         struct lov_ost_data_v1 *obj;
310         struct lov_stripe_md *lsm = NULL;
311         __u16 stripes = 0;
312         int count;
313         ENTRY;
314
315         /* if we create file without objects - lmm is NULL */
316         if (lmm == NULL)
317                 return;
318
319         switch (le32_to_cpu(lmm->lmm_magic)) {
320                 case LOV_MAGIC_V1:
321                         count = le16_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
322                         obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
323                         break;
324                 case LOV_MAGIC_V3:
325                         count = le16_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
326                         obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
327                         break;
328                 default:
329                         CERROR("Unknow lmm type %X !\n",
330                                le32_to_cpu(lmm->lmm_magic));
331                         return;
332         }
333
334         for (j = 0; j < count; j++) {
335                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
336                 obd_id id = le64_to_cpu(obj[j].l_object_id);
337                 __u32 page = i / OBJID_PER_PAGE();
338                 __u32 idx = i % OBJID_PER_PAGE();
339                 obd_id *data;
340
341                 data = mds->mds_lov_page_array[page];
342
343                 CDEBUG(D_INODE,"update last object for ost %u"
344                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
345                 if (id > data[idx]) {
346                         int lost = id - data[idx] - 1;
347                         /* we might have lost precreated objects due to VBR */
348                         if (lost > 0 && obd->obd_recovering) {
349                                 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
350                                 if (!obd->obd_version_recov)
351                                         CERROR("Unexpected gap in objids\n");
352                                 /* lsm is allocated if NULL */
353                                 mds_log_lost_precreated(obd, &lsm, &stripes,
354                                                         data[idx]+1, lost, i);
355                         }
356                         data[idx] = id;
357                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
358                 }
359         }
360         if (lsm) {
361                 /* restore stripes number */
362                 lsm->lsm_stripe_count = stripes;
363                 obd_free_memmd(mds->mds_lov_exp, &lsm);
364         }
365         EXIT;
366         return;
367 }
368 EXPORT_SYMBOL(mds_lov_update_objids);
369
370 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
371                                     __u32 count)
372 {
373         __u32 max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
374         __u32 i, stripes;
375
376         for (i = 0; i < count; i++) {
377                 if (data[i] == 0)
378                         continue;
379
380                 mds->mds_lov_objid_count++;
381         }
382
383         stripes = min(lov_mds_md_stripecnt(max_easize, LOV_MAGIC_V3),
384                          mds->mds_lov_objid_count);
385
386         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
387         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
388
389         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
390                "%d/%d\n", stripes, mds->mds_max_mdsize,
391                mds->mds_max_cookiesize);
392
393         EXIT;
394         return 0;
395 }
396
397 static int mds_lov_read_objids(struct obd_device *obd)
398 {
399         struct mds_obd *mds = &obd->u.mds;
400         loff_t off = 0;
401         int i, rc = 0, count = 0, page = 0;
402         unsigned long size;
403         ENTRY;
404
405         /* Read everything in the file, even if our current lov desc
406            has fewer targets. Old targets not in the lov descriptor
407            during mds setup may still have valid objids. */
408         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
409         if (size == 0)
410                 RETURN(0);
411
412         page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
413         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
414         for (i = 0; i < page; i++) {
415                 obd_id *data;
416                 loff_t off_old = off;
417
418                 LASSERT(mds->mds_lov_page_array[i] == NULL);
419                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
420                 if (mds->mds_lov_page_array[i] == NULL)
421                         GOTO(out, rc = -ENOMEM);
422
423                 data = mds->mds_lov_page_array[i];
424
425                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
426                                         MDS_LOV_ALLOC_SIZE, &off);
427                 if (rc < 0) {
428                         CERROR("Error reading objids %d\n", rc);
429                         GOTO(out, rc);
430                 }
431                 if (off == off_old) /* hole is read */
432                         off += MDS_LOV_ALLOC_SIZE;
433
434                 count = (off - off_old) / sizeof(obd_id);
435                 if (mds_lov_update_from_read(mds, data, count)) {
436                         CERROR("Can't update mds data\n");
437                         GOTO(out, rc = -EIO);
438                 }
439         }
440         mds->mds_lov_objid_lastpage = page - 1;
441         mds->mds_lov_objid_lastidx = count - 1;
442
443         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
444                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
445 out:
446         mds_lov_dump_objids("read",obd);
447
448         RETURN(rc);
449 }
450
451 int mds_lov_write_objids(struct obd_device *obd)
452 {
453         struct mds_obd *mds = &obd->u.mds;
454         int i = 0, rc = 0;
455         ENTRY;
456
457         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
458                 RETURN(0);
459
460         mds_lov_dump_objids("write", obd);
461
462         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
463                 obd_id *data =  mds->mds_lov_page_array[i];
464                 unsigned int size = MDS_LOV_ALLOC_SIZE;
465                 loff_t off = i * size;
466
467                 LASSERT(data != NULL);
468
469                 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
470                         continue;
471
472                 /* check for particaly filled last page */
473                 if (i == mds->mds_lov_objid_lastpage)
474                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
475
476                 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
477                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
478                                          size, &off, 0);
479                 if (rc < 0) {
480                         cfs_bitmap_set(mds->mds_lov_page_dirty, i);
481                         break;
482                 }
483         }
484         if (rc >= 0)
485                 rc = 0;
486
487         RETURN(rc);
488 }
489 EXPORT_SYMBOL(mds_lov_write_objids);
490
491 static int mds_lov_get_objid(struct obd_device * obd,
492                              obd_id idx)
493 {
494         struct mds_obd *mds = &obd->u.mds;
495         struct obd_export *lov_exp = mds->mds_lov_exp;
496         unsigned int page;
497         unsigned int off;
498         obd_id *data;
499         __u32 size;
500         int rc = 0;
501         ENTRY;
502
503         page = idx / OBJID_PER_PAGE();
504         off = idx % OBJID_PER_PAGE();
505         data = mds->mds_lov_page_array[page];
506
507         if (data[off] < 2) {
508                 /* We never read this lastid; ask the osc */
509                 struct obd_id_info lastid;
510
511                 size = sizeof(lastid);
512                 lastid.idx = idx;
513                 lastid.data = &data[off];
514                 rc = obd_get_info(NULL, lov_exp, sizeof(KEY_LAST_ID),
515                                   KEY_LAST_ID, &size, &lastid, NULL);
516                 if (rc)
517                         GOTO(out, rc);
518
519                 /* workaround for clean filter */
520                 if (data[off] == 0)
521                         data[off] = 1;
522
523                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
524         }
525         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
526                idx, data, page, off, data[off]);
527 out:
528         RETURN(rc);
529 }
530
531 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
532 {
533         int rc;
534         struct obdo oa = { 0 };
535         struct obd_trans_info oti = {0};
536         struct lov_stripe_md  *empty_ea = NULL;
537         ENTRY;
538
539         LASSERT(mds->mds_lov_page_array != NULL);
540
541         /* This create will in fact either create or destroy:  If the OST is
542          * missing objects below this ID, they will be created.  If it finds
543          * objects above this ID, they will be removed. */
544         memset(&oa, 0, sizeof(oa));
545         oa.o_flags = OBD_FL_DELORPHAN;
546         oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
547         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
548         if (ost_uuid != NULL)
549                 oti.oti_ost_uuid = ost_uuid;
550
551         rc = obd_create(NULL, mds->mds_lov_exp, &oa, &empty_ea, &oti);
552
553         RETURN(rc);
554 }
555
556 /* for one target */
557 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
558 {
559         struct mds_obd *mds = &obd->u.mds;
560         int rc;
561         struct obd_id_info info;
562         ENTRY;
563
564         LASSERT(!obd->obd_recovering);
565
566         info.idx = idx;
567         info.data = id;
568         rc = obd_set_info_async(NULL, mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
569                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
570         if (rc)
571                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
572                         obd->obd_name, rc);
573
574         RETURN(rc);
575 }
576
577 /* Update the lov desc for a new size lov. */
578 static int mds_lov_update_desc(struct obd_device *obd, int idx,
579                                struct obd_uuid *uuid)
580 {
581         struct mds_obd *mds = &obd->u.mds;
582         struct lov_desc *ld;
583         __u32 valsize = sizeof(mds->mds_lov_desc);
584         int rc = 0;
585         ENTRY;
586
587         OBD_ALLOC(ld, sizeof(*ld));
588         if (!ld)
589                 RETURN(-ENOMEM);
590
591         rc = obd_get_info(NULL, mds->mds_lov_exp, sizeof(KEY_LOVDESC),
592                           KEY_LOVDESC, &valsize, ld, NULL);
593         if (rc)
594                 GOTO(out, rc);
595
596         /* Don't change the mds_lov_desc until the objids size matches the
597            count (paranoia) */
598         mds->mds_lov_desc = *ld;
599         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
600                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
601
602         cfs_mutex_lock(&obd->obd_dev_mutex);
603         rc = mds_lov_update_max_ost(mds, idx);
604         cfs_mutex_unlock(&obd->obd_dev_mutex);
605         if (rc != 0)
606                 GOTO(out, rc );
607
608         /* If we added a target we have to reconnect the llogs */
609         /* We only _need_ to do this at first add (idx), or the first time
610            after recovery.  However, it should now be safe to call anytime. */
611         rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
612         if (rc)
613                 GOTO(out, rc);
614
615 out:
616         OBD_FREE(ld, sizeof(*ld));
617         RETURN(rc);
618 }
619
620 /* Inform MDS about new/updated target */
621 static int mds_lov_update_mds(struct obd_device *obd,
622                               struct obd_device *watched,
623                               __u32 idx)
624 {
625         struct mds_obd *mds = &obd->u.mds;
626         int rc = 0;
627         int page;
628         int off;
629         obd_id *data;
630         ENTRY;
631
632         LASSERT(mds_lov_objinit(mds, idx));
633
634         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
635                idx, obd->obd_recovering, obd->obd_async_recov,
636                mds->mds_lov_desc.ld_tgt_count);
637
638         /* idx is set as data from lov_notify. */
639         if (obd->obd_recovering)
640                 GOTO(out, rc);
641
642         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
643                 CERROR("index %d > count %d!\n", idx,
644                        mds->mds_lov_desc.ld_tgt_count);
645                 GOTO(out, rc = -EINVAL);
646         }
647
648         rc = mds_lov_get_objid(obd, idx);
649         if (rc)
650                 GOTO(out, rc);
651
652         page = idx / OBJID_PER_PAGE();
653         off = idx % OBJID_PER_PAGE();
654         data = mds->mds_lov_page_array[page];
655
656         /* We have read this lastid from disk; tell the osc.
657            Don't call this during recovery. */
658         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
659         if (rc) {
660                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
661                 /* Don't abort the rest of the sync */
662                 rc = 0;
663         } else {
664                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
665                         data[off], idx, rc);
666         }
667 out:
668         RETURN(rc);
669 }
670
671 /* update the LOV-OSC knowledge of the last used object id's */
672 int mds_lov_connect(struct obd_device *obd, char * lov_name)
673 {
674         struct mds_obd *mds = &obd->u.mds;
675         struct obd_connect_data *data;
676         int rc;
677         ENTRY;
678
679         if (IS_ERR(mds->mds_lov_obd))
680                 RETURN(PTR_ERR(mds->mds_lov_obd));
681
682         if (mds->mds_lov_obd)
683                 RETURN(0);
684
685         mds->mds_lov_obd = class_name2obd(lov_name);
686         if (!mds->mds_lov_obd) {
687                 CERROR("MDS cannot locate LOV %s\n", lov_name);
688                 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
689                 RETURN(-ENOTCONN);
690         }
691
692         cfs_mutex_lock(&obd->obd_dev_mutex);
693         rc = mds_lov_read_objids(obd);
694         cfs_mutex_unlock(&obd->obd_dev_mutex);
695         if (rc) {
696                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
697                 GOTO(err_exit, rc);
698         }
699
700         rc = obd_register_observer(mds->mds_lov_obd, obd);
701         if (rc) {
702                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
703                        lov_name, rc);
704                 GOTO(err_exit, rc);
705         }
706
707         /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
708          * targets */
709         obd_notify(mds->mds_lov_obd, NULL, OBD_NOTIFY_CREATE, NULL);
710
711         mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
712
713         OBD_ALLOC(data, sizeof(*data));
714         if (data == NULL)
715                 GOTO(err_exit, rc = -ENOMEM);
716
717         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
718                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
719                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FULL20  |
720                                   OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT      |
721                                   OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN   |
722                                   OBD_CONNECT_SOM | OBD_CONNECT_MAX_EASIZE;
723 #ifdef HAVE_LRU_RESIZE_SUPPORT
724         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
725 #endif
726         data->ocd_version = LUSTRE_VERSION_CODE;
727         data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
728         data->ocd_max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
729
730         /* send max bytes per rpc */
731         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
732         /* send the list of supported checksum types */
733         data->ocd_cksum_types = cksum_types_supported_client();
734         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
735         rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
736         OBD_FREE(data, sizeof(*data));
737         if (rc) {
738                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
739                 mds->mds_lov_obd = ERR_PTR(rc);
740                 RETURN(rc);
741         }
742
743         /* I want to see a callback happen when the OBD moves to a
744          * "For General Use" state, and that's when we'll call
745          * set_nextid().  The class driver can help us here, because
746          * it can use the obd_recovering flag to determine when the
747          * the OBD is full available. */
748         /* MDD device will care about that
749         if (!obd->obd_recovering)
750                 rc = mds_postrecov(obd);
751          */
752         RETURN(rc);
753
754 err_exit:
755         mds->mds_lov_exp = NULL;
756         mds->mds_lov_obd = ERR_PTR(rc);
757         RETURN(rc);
758 }
759
760 int mds_lov_disconnect(struct obd_device *obd)
761 {
762         struct mds_obd *mds = &obd->u.mds;
763         int rc = 0;
764         ENTRY;
765
766         if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
767                 obd_register_observer(mds->mds_lov_obd, NULL);
768
769                 /* The actual disconnect of the mds_lov will be called from
770                  * class_disconnect_exports from mds_lov_clean. So we have to
771                  * ensure that class_cleanup doesn't fail due to the extra ref
772                  * we're holding now. The mechanism to do that already exists -
773                  * the obd_force flag. We'll drop the final ref to the
774                  * mds_lov_exp in mds_cleanup. */
775                 mds->mds_lov_obd->obd_force = 1;
776         }
777
778         RETURN(rc);
779 }
780
781 struct mds_lov_sync_info {
782         struct obd_device    *mlsi_obd;     /* the lov device to sync */
783         struct obd_device    *mlsi_watched; /* target osc */
784         __u32                 mlsi_index;   /* index of target */
785 };
786
787 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
788 {
789         struct mds_capa_info    info = { .uuid = uuid };
790         struct lustre_capa_key *key;
791         int i, rc = 0;
792
793         ENTRY;
794
795         if (!mds->mds_capa_keys)
796                 RETURN(0);
797
798         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
799         for (i = 0; i < 2; i++) {
800                 key = &mds->mds_capa_keys[i];
801                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
802
803                 info.capa = key;
804                 rc = obd_set_info_async(NULL, mds->mds_lov_exp,
805                                         sizeof(KEY_CAPA_KEY), KEY_CAPA_KEY,
806                                         sizeof(info), &info, NULL);
807                 if (rc) {
808                         DEBUG_CAPA_KEY(D_ERROR, key,
809                                        "propagate failed (rc = %d) for", rc);
810                         RETURN(rc);
811                 }
812         }
813
814         RETURN(0);
815 }
816
817 /* We only sync one osc at a time, so that we don't have to hold
818    any kind of lock on the whole mds_lov_desc, which may change
819    (grow) as a result of mds_lov_add_ost.  This also avoids any
820    kind of mismatch between the lov_desc and the mds_lov_desc,
821    which are not in lock-step during lov_add_obd */
822 static int __mds_lov_synchronize(void *data)
823 {
824         struct mds_lov_sync_info *mlsi = data;
825         struct obd_device *obd = mlsi->mlsi_obd;
826         struct obd_device *watched = mlsi->mlsi_watched;
827         struct mds_obd *mds = &obd->u.mds;
828         struct obd_uuid *uuid;
829         __u32  idx = mlsi->mlsi_index;
830         struct mds_group_info mgi;
831         struct llog_ctxt *ctxt;
832         int rc = 0;
833         ENTRY;
834
835         OBD_FREE_PTR(mlsi);
836
837         LASSERT(obd);
838         LASSERT(watched);
839         uuid = &watched->u.cli.cl_target_uuid;
840         LASSERT(uuid);
841
842         cfs_down_read(&mds->mds_notify_lock);
843         if (obd->obd_stopping || obd->obd_fail)
844                 GOTO(out, rc = -ENODEV);
845
846         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
847         rc = mds_lov_update_mds(obd, watched, idx);
848         if (rc != 0) {
849                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
850                 GOTO(out, rc);
851         }
852         mgi.group = mdt_to_obd_objseq(mds->mds_id);
853         mgi.uuid = uuid;
854
855         rc = obd_set_info_async(NULL, mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
856                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
857         if (rc != 0)
858                 GOTO(out, rc);
859         /* propagate capability keys */
860         rc = mds_propagate_capa_keys(mds, uuid);
861         if (rc)
862                 GOTO(out, rc);
863
864         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
865         if (!ctxt)
866                 GOTO(out, rc = -ENODEV);
867
868         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
869         rc = llog_connect(ctxt, NULL, NULL, uuid);
870         llog_ctxt_put(ctxt);
871         if (rc != 0) {
872                 CERROR("%s failed at llog_origin_connect: %d\n",
873                        obd_uuid2str(uuid), rc);
874                 GOTO(out, rc);
875         }
876
877         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
878               obd->obd_name, obd_uuid2str(uuid));
879
880         rc = mds_lov_clear_orphans(mds, uuid);
881         if (rc != 0) {
882                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
883                        obd_uuid2str(uuid), rc);
884                 GOTO(out, rc);
885         }
886
887 #ifdef HAVE_QUOTA_SUPPORT
888         if (obd->obd_upcall.onu_owner) {
889                 /*
890                  * This is a hack for mds_notify->mdd_notify. When the mds obd
891                  * in mdd is removed, This hack should be removed.
892                  */
893                 LASSERT(obd->obd_upcall.onu_upcall != NULL);
894                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
895                                                 obd->obd_upcall.onu_owner,NULL);
896         }
897 #endif
898         EXIT;
899 out:
900         if (rc) {
901                 /* Deactivate it for safety */
902                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
903                        rc);
904                 if (!obd->obd_stopping && mds->mds_lov_obd &&
905                     !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
906                         obd_notify(mds->mds_lov_obd, watched,
907                                    OBD_NOTIFY_INACTIVE, NULL);
908         }
909         cfs_up_read(&mds->mds_notify_lock);
910
911         class_decref(obd, "mds_lov_synchronize", obd);
912         return rc;
913 }
914
915 int mds_lov_synchronize(void *data)
916 {
917         struct mds_lov_sync_info *mlsi = data;
918         char name[20];
919
920         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
921         cfs_daemonize_ctxt(name);
922
923         RETURN(__mds_lov_synchronize(data));
924 }
925
926 int mds_lov_start_synchronize(struct obd_device *obd,
927                               struct obd_device *watched,
928                               void *data, enum obd_notify_event ev)
929 {
930         struct mds_lov_sync_info *mlsi;
931         int rc;
932         struct obd_uuid *uuid;
933         ENTRY;
934
935         LASSERT(watched);
936         uuid = &watched->u.cli.cl_target_uuid;
937
938         OBD_ALLOC(mlsi, sizeof(*mlsi));
939         if (mlsi == NULL)
940                 RETURN(-ENOMEM);
941
942         LASSERT(data);
943         mlsi->mlsi_obd = obd;
944         mlsi->mlsi_watched = watched;
945         mlsi->mlsi_index = *(__u32 *)data;
946
947         /* Although class_export_get(obd->obd_self_export) would lock
948            the MDS in place, since it's only a self-export
949            it doesn't lock the LOV in place.  The LOV can be disconnected
950            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
951            Simply taking an export ref on the LOV doesn't help, because it's
952            still disconnected. Taking an obd reference insures that we don't
953            disconnect the LOV.  This of course means a cleanup won't
954            finish for as long as the sync is blocking. */
955         class_incref(obd, "mds_lov_synchronize", obd);
956
957         if (ev != OBD_NOTIFY_SYNC) {
958                 /* Synchronize in the background */
959                 rc = cfs_create_thread(mds_lov_synchronize, mlsi,
960                                        CFS_DAEMON_FLAGS);
961                 if (rc < 0) {
962                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
963                                obd->obd_name, rc);
964                         class_decref(obd, "mds_lov_synchronize", obd);
965                 } else {
966                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
967                                "thread=%d\n", obd->obd_name,
968                                mlsi->mlsi_index, rc);
969                         rc = 0;
970                 }
971         } else {
972                 rc = __mds_lov_synchronize((void *)mlsi);
973         }
974
975         RETURN(rc);
976 }
977
978 int mds_notify(struct obd_device *obd, struct obd_device *watched,
979                enum obd_notify_event ev, void *data)
980 {
981         struct mds_obd *mds = &obd->u.mds;
982         int rc = 0;
983         ENTRY;
984
985         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
986
987         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
988                 CERROR("unexpected notification of %s %s!\n",
989                        watched->obd_type->typ_name, watched->obd_name);
990                 RETURN(-EINVAL);
991         }
992
993         /*XXX this notifies the MDD until lov handling use old mds code
994          * must non block!
995          */
996         if (obd->obd_upcall.onu_owner) {
997                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
998                  rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
999                                                  obd->obd_upcall.onu_owner,
1000                                                  &mds->mds_obt.obt_mount_count);
1001         }
1002
1003         switch (ev) {
1004         /* We only handle these: */
1005         case OBD_NOTIFY_CREATE:
1006                 CDEBUG(D_CONFIG, "%s: add target %s\n", obd->obd_name,
1007                        obd_uuid2str(&watched->u.cli.cl_target_uuid));
1008                 /* We still have to fix the lov descriptor for ost's */
1009                 LASSERT(data);
1010                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1011                                           &watched->u.cli.cl_target_uuid);
1012                 RETURN(rc);
1013         case OBD_NOTIFY_ACTIVE:
1014                 /* lov want one or more _active_ targets for work */
1015                 /* activate event should be pass lov idx as argument */
1016         case OBD_NOTIFY_SYNC:
1017         case OBD_NOTIFY_SYNC_NONBLOCK:
1018                 /* sync event should be pass lov idx as argument */
1019                 break;
1020         default:
1021                 RETURN(0);
1022         }
1023
1024         if (obd->obd_recovering) {
1025                 CDEBUG(D_CONFIG, "%s: Is in recovery, "
1026                        "not resetting orphans on %s\n",
1027                        obd->obd_name,
1028                        obd_uuid2str(&watched->u.cli.cl_target_uuid));
1029                 /* We still have to fix the lov descriptor for ost's added
1030                    after the mdt in the config log.  They didn't make it into
1031                    mds_lov_connect. */
1032                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1033                                          &watched->u.cli.cl_target_uuid);
1034         } else {
1035                 rc = mds_lov_start_synchronize(obd, watched, data, ev);
1036         }
1037         RETURN(rc);
1038 }