Whamcloud - gitweb
LU-1347 build: remove the vim/emacs modelines
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
53
54 #include "mds_internal.h"
55
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 {
58         struct mds_obd *mds = &obd->u.mds;
59         unsigned int i=0, j;
60
61         if ((libcfs_debug & D_INFO) == 0)
62                 return;
63
64         CDEBUG(D_INFO, "dump from %s\n", label);
65         if (mds->mds_lov_page_dirty == NULL) {
66                 CERROR("NULL bitmap!\n");
67                 GOTO(skip_bitmap, i);
68         }
69
70         for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
71                 CDEBUG(D_INFO, "%u - %lx\n", i,
72                        mds->mds_lov_page_dirty->data[i]);
73 skip_bitmap:
74         if (mds->mds_lov_page_array == NULL) {
75                 CERROR("not init page array!\n");
76                 GOTO(skip_array, i);
77
78         }
79         for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
80                 obd_id *data = mds->mds_lov_page_array[i];
81
82                 if (data == NULL)
83                         continue;
84
85                 for(j=0; j < OBJID_PER_PAGE(); j++) {
86                         if (data[j] == 0)
87                                 continue;
88                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
89                                i, j, data[j]);
90                 }
91         }
92 skip_array:
93         EXIT;
94 }
95
96 int mds_lov_init_objids(struct obd_device *obd)
97 {
98         struct mds_obd *mds = &obd->u.mds;
99         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
100         struct file *file;
101         int rc;
102         ENTRY;
103
104         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
105
106         mds->mds_lov_page_dirty =
107                 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
108         if (mds->mds_lov_page_dirty == NULL)
109                 RETURN(-ENOMEM);
110
111
112         OBD_ALLOC(mds->mds_lov_page_array, size);
113         if (mds->mds_lov_page_array == NULL)
114                 GOTO(err_free_bitmap, rc = -ENOMEM);
115
116         /* open and test the lov objd file */
117         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
118         if (IS_ERR(file)) {
119                 rc = PTR_ERR(file);
120                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
121                 GOTO(err_free, rc = PTR_ERR(file));
122         }
123         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
124                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
125                        file->f_dentry->d_inode->i_mode);
126                 GOTO(err_open, rc = -ENOENT);
127         }
128         mds->mds_lov_objid_filp = file;
129
130         RETURN (0);
131 err_open:
132         if (filp_close((struct file *)file, 0))
133                 CERROR("can't close %s after error\n", LOV_OBJID);
134 err_free:
135         OBD_FREE(mds->mds_lov_page_array, size);
136 err_free_bitmap:
137         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
138
139         RETURN(rc);
140 }
141
142 void mds_lov_destroy_objids(struct obd_device *obd)
143 {
144         struct mds_obd *mds = &obd->u.mds;
145         int i, rc;
146         ENTRY;
147
148         if (mds->mds_lov_page_array != NULL) {
149                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
150                         obd_id *data = mds->mds_lov_page_array[i];
151                         if (data != NULL)
152                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
153                 }
154                 OBD_FREE(mds->mds_lov_page_array,
155                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
156         }
157
158         if (mds->mds_lov_objid_filp) {
159                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
160                 mds->mds_lov_objid_filp = NULL;
161                 if (rc)
162                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
163         }
164
165         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
166         EXIT;
167 }
168
169 /**
170  * currently exist two ways for know about ost count and max ost index.
171  * first - after ost is connected to mds and sync process finished
172  * second - get from lmm in recovery process, in case when mds not have configs,
173  * and ost isn't registered in mgs.
174  *
175  * \param mds pointer to mds structure
176  * \param index maxium ost index
177  *
178  * \retval -ENOMEM is not hame memory for new page
179  * \retval 0 is update passed
180  */
181 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
182 {
183         __u32 page = index / OBJID_PER_PAGE();
184         __u32 off = index % OBJID_PER_PAGE();
185         obd_id *data =  mds->mds_lov_page_array[page];
186
187         if (data == NULL) {
188                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
189                 if (data == NULL)
190                         RETURN(-ENOMEM);
191
192                 mds->mds_lov_page_array[page] = data;
193         }
194
195         if (index > mds->mds_lov_objid_max_index) {
196                 mds->mds_lov_objid_lastpage = page;
197                 mds->mds_lov_objid_lastidx = off;
198                 mds->mds_lov_objid_max_index = index;
199         }
200
201         /* workaround - New target not in objids file; increase mdsize */
202         /* ld_tgt_count is used as the max index everywhere, despite its name. */
203         if (data[off] == 0) {
204                 __u32 max_easize;
205                 __u32 stripes;
206
207                 max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
208                 data[off] = 1;
209                 mds->mds_lov_objid_count++;
210                 stripes = min(lov_mds_md_stripecnt(max_easize, LOV_MAGIC_V3),
211                               mds->mds_lov_objid_count);
212
213                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
214                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
215
216                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
217                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
218                        mds->mds_max_cookiesize);
219         }
220
221         EXIT;
222         return 0;
223 }
224
225 static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
226 {
227         __u32 page = index / OBJID_PER_PAGE();
228         __u32 off = index % OBJID_PER_PAGE();
229         obd_id *data =  mds->mds_lov_page_array[page];
230
231         return (data[off] > 0);
232 }
233
234 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
235 {
236         struct lov_ost_data_v1 *data;
237         __u16 count;
238         int rc = 0;
239         __u32 j;
240
241         /* if we create file without objects - lmm is NULL */
242         if (lmm == NULL)
243                 return 0;
244
245         switch (le32_to_cpu(lmm->lmm_magic)) {
246                 case LOV_MAGIC_V1:
247                         count = le16_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
248                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
249                         break;
250                 case LOV_MAGIC_V3:
251                         count = le16_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
252                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
253                         break;
254                 default:
255                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
256                         RETURN(-EINVAL);
257         }
258
259
260         cfs_mutex_lock(&obd->obd_dev_mutex);
261         for (j = 0; j < count; j++) {
262                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
263                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
264                         rc = -ENOMEM;
265                         break;
266                 }
267         }
268         cfs_mutex_unlock(&obd->obd_dev_mutex);
269
270         RETURN(rc);
271 }
272 EXPORT_SYMBOL(mds_lov_prepare_objids);
273
274 /*
275  * write llog orphan record about lost ost object,
276  * Special lsm is allocated with single stripe, caller should deallocated it
277  * after use
278  */
279 static int mds_log_lost_precreated(struct obd_device *obd,
280                                    struct lov_stripe_md **lsmp, __u16 *stripes,
281                                    obd_id id, obd_count count, int idx)
282 {
283         struct lov_stripe_md *lsm = *lsmp;
284         int rc;
285         ENTRY;
286
287         if (*lsmp == NULL) {
288                 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
289                 if (rc < 0)
290                         RETURN(rc);
291                 /* need only one stripe, save old value */
292                 *stripes = lsm->lsm_stripe_count;
293                 lsm->lsm_stripe_count = 1;
294                 *lsmp = lsm;
295         }
296
297         lsm->lsm_oinfo[0]->loi_id = id;
298         lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
299         lsm->lsm_oinfo[0]->loi_ost_idx = idx;
300
301         rc = mds_log_op_orphan(obd, lsm, count);
302         RETURN(rc);
303 }
304
305 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
306 {
307         struct mds_obd *mds = &obd->u.mds;
308         int j;
309         struct lov_ost_data_v1 *obj;
310         struct lov_stripe_md *lsm = NULL;
311         __u16 stripes = 0;
312         int count;
313         ENTRY;
314
315         /* if we create file without objects - lmm is NULL */
316         if (lmm == NULL)
317                 return;
318
319         switch (le32_to_cpu(lmm->lmm_magic)) {
320                 case LOV_MAGIC_V1:
321                         count = le16_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
322                         obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
323                         break;
324                 case LOV_MAGIC_V3:
325                         count = le16_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
326                         obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
327                         break;
328                 default:
329                         CERROR("Unknow lmm type %X !\n",
330                                le32_to_cpu(lmm->lmm_magic));
331                         return;
332         }
333
334         for (j = 0; j < count; j++) {
335                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
336                 obd_id id = le64_to_cpu(obj[j].l_object_id);
337                 __u32 page = i / OBJID_PER_PAGE();
338                 __u32 idx = i % OBJID_PER_PAGE();
339                 obd_id *data;
340
341                 data = mds->mds_lov_page_array[page];
342
343                 CDEBUG(D_INODE,"update last object for ost %u"
344                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
345                 if (id > data[idx]) {
346                         int lost = id - data[idx] - 1;
347                         /* we might have lost precreated objects due to VBR */
348                         if (lost > 0 && obd->obd_recovering) {
349                                 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
350                                 if (!obd->obd_version_recov)
351                                         CERROR("Unexpected gap in objids\n");
352                                 /* lsm is allocated if NULL */
353                                 mds_log_lost_precreated(obd, &lsm, &stripes,
354                                                         data[idx]+1, lost, i);
355                         }
356                         data[idx] = id;
357                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
358                 }
359         }
360         if (lsm) {
361                 /* restore stripes number */
362                 lsm->lsm_stripe_count = stripes;
363                 obd_free_memmd(mds->mds_lov_exp, &lsm);
364         }
365         EXIT;
366         return;
367 }
368 EXPORT_SYMBOL(mds_lov_update_objids);
369
370 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
371                                     __u32 count)
372 {
373         __u32 max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
374         __u32 i, stripes;
375
376         for (i = 0; i < count; i++) {
377                 if (data[i] == 0)
378                         continue;
379
380                 mds->mds_lov_objid_count++;
381         }
382
383         stripes = min(lov_mds_md_stripecnt(max_easize, LOV_MAGIC_V3),
384                          mds->mds_lov_objid_count);
385
386         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
387         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
388
389         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
390                "%d/%d\n", stripes, mds->mds_max_mdsize,
391                mds->mds_max_cookiesize);
392
393         EXIT;
394         return 0;
395 }
396
397 static int mds_lov_read_objids(struct obd_device *obd)
398 {
399         struct mds_obd *mds = &obd->u.mds;
400         loff_t off = 0;
401         int i, rc = 0, count = 0, page = 0;
402         unsigned long size;
403         ENTRY;
404
405         /* Read everything in the file, even if our current lov desc
406            has fewer targets. Old targets not in the lov descriptor
407            during mds setup may still have valid objids. */
408         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
409         if (size == 0)
410                 RETURN(0);
411
412         page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
413         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
414         for (i = 0; i < page; i++) {
415                 obd_id *data;
416                 loff_t off_old = off;
417
418                 LASSERT(mds->mds_lov_page_array[i] == NULL);
419                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
420                 if (mds->mds_lov_page_array[i] == NULL)
421                         GOTO(out, rc = -ENOMEM);
422
423                 data = mds->mds_lov_page_array[i];
424
425                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
426                                         MDS_LOV_ALLOC_SIZE, &off);
427                 if (rc < 0) {
428                         CERROR("Error reading objids %d\n", rc);
429                         GOTO(out, rc);
430                 }
431                 if (off == off_old) /* hole is read */
432                         off += MDS_LOV_ALLOC_SIZE;
433
434                 count = (off - off_old) / sizeof(obd_id);
435                 if (mds_lov_update_from_read(mds, data, count)) {
436                         CERROR("Can't update mds data\n");
437                         GOTO(out, rc = -EIO);
438                 }
439         }
440         mds->mds_lov_objid_lastpage = page - 1;
441         mds->mds_lov_objid_lastidx = count - 1;
442
443         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
444                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
445 out:
446         mds_lov_dump_objids("read",obd);
447
448         RETURN(rc);
449 }
450
451 int mds_lov_write_objids(struct obd_device *obd)
452 {
453         struct mds_obd *mds = &obd->u.mds;
454         int i = 0, rc = 0;
455         ENTRY;
456
457         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
458                 RETURN(0);
459
460         mds_lov_dump_objids("write", obd);
461
462         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
463                 obd_id *data =  mds->mds_lov_page_array[i];
464                 unsigned int size = MDS_LOV_ALLOC_SIZE;
465                 loff_t off = i * size;
466
467                 LASSERT(data != NULL);
468
469                 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
470                         continue;
471
472                 /* check for particaly filled last page */
473                 if (i == mds->mds_lov_objid_lastpage)
474                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
475
476                 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
477                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
478                                          size, &off, 0);
479                 if (rc < 0) {
480                         cfs_bitmap_set(mds->mds_lov_page_dirty, i);
481                         break;
482                 }
483         }
484         if (rc >= 0)
485                 rc = 0;
486
487         RETURN(rc);
488 }
489 EXPORT_SYMBOL(mds_lov_write_objids);
490
491 static int mds_lov_get_objid(struct obd_device * obd,
492                              obd_id idx)
493 {
494         struct mds_obd *mds = &obd->u.mds;
495         struct obd_export *lov_exp = mds->mds_lov_exp;
496         unsigned int page;
497         unsigned int off;
498         obd_id *data;
499         __u32 size;
500         int rc = 0;
501         ENTRY;
502
503         page = idx / OBJID_PER_PAGE();
504         off = idx % OBJID_PER_PAGE();
505         data = mds->mds_lov_page_array[page];
506
507         if (data[off] < 2) {
508                 /* We never read this lastid; ask the osc */
509                 struct obd_id_info lastid;
510
511                 size = sizeof(lastid);
512                 lastid.idx = idx;
513                 lastid.data = &data[off];
514                 rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
515                                   &size, &lastid, NULL);
516                 if (rc)
517                         GOTO(out, rc);
518
519                 /* workaround for clean filter */
520                 if (data[off] == 0)
521                         data[off] = 1;
522
523                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
524         }
525         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
526                idx, data, page, off, data[off]);
527 out:
528         RETURN(rc);
529 }
530
531 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
532 {
533         int rc;
534         struct obdo oa = { 0 };
535         struct obd_trans_info oti = {0};
536         struct lov_stripe_md  *empty_ea = NULL;
537         ENTRY;
538
539         LASSERT(mds->mds_lov_page_array != NULL);
540
541         /* This create will in fact either create or destroy:  If the OST is
542          * missing objects below this ID, they will be created.  If it finds
543          * objects above this ID, they will be removed. */
544         memset(&oa, 0, sizeof(oa));
545         oa.o_flags = OBD_FL_DELORPHAN;
546         oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
547         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
548         if (ost_uuid != NULL)
549                 oti.oti_ost_uuid = ost_uuid;
550
551         rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
552
553         RETURN(rc);
554 }
555
556 /* for one target */
557 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
558 {
559         struct mds_obd *mds = &obd->u.mds;
560         int rc;
561         struct obd_id_info info;
562         ENTRY;
563
564         LASSERT(!obd->obd_recovering);
565
566         info.idx = idx;
567         info.data = id;
568         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
569                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
570         if (rc)
571                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
572                         obd->obd_name, rc);
573
574         RETURN(rc);
575 }
576
577 /* Update the lov desc for a new size lov. */
578 static int mds_lov_update_desc(struct obd_device *obd, int idx,
579                                struct obd_uuid *uuid)
580 {
581         struct mds_obd *mds = &obd->u.mds;
582         struct lov_desc *ld;
583         __u32 valsize = sizeof(mds->mds_lov_desc);
584         int rc = 0;
585         ENTRY;
586
587         OBD_ALLOC(ld, sizeof(*ld));
588         if (!ld)
589                 RETURN(-ENOMEM);
590
591         rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
592                           &valsize, ld, NULL);
593         if (rc)
594                 GOTO(out, rc);
595
596         /* Don't change the mds_lov_desc until the objids size matches the
597            count (paranoia) */
598         mds->mds_lov_desc = *ld;
599         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
600                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
601
602         cfs_mutex_lock(&obd->obd_dev_mutex);
603         rc = mds_lov_update_max_ost(mds, idx);
604         cfs_mutex_unlock(&obd->obd_dev_mutex);
605         if (rc != 0)
606                 GOTO(out, rc );
607
608         /* If we added a target we have to reconnect the llogs */
609         /* We only _need_ to do this at first add (idx), or the first time
610            after recovery.  However, it should now be safe to call anytime. */
611         rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
612         if (rc)
613                 GOTO(out, rc);
614
615 out:
616         OBD_FREE(ld, sizeof(*ld));
617         RETURN(rc);
618 }
619
620 /* Inform MDS about new/updated target */
621 static int mds_lov_update_mds(struct obd_device *obd,
622                               struct obd_device *watched,
623                               __u32 idx)
624 {
625         struct mds_obd *mds = &obd->u.mds;
626         int rc = 0;
627         int page;
628         int off;
629         obd_id *data;
630         ENTRY;
631
632         LASSERT(mds_lov_objinit(mds, idx));
633
634         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
635                idx, obd->obd_recovering, obd->obd_async_recov,
636                mds->mds_lov_desc.ld_tgt_count);
637
638         /* idx is set as data from lov_notify. */
639         if (obd->obd_recovering)
640                 GOTO(out, rc);
641
642         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
643                 CERROR("index %d > count %d!\n", idx,
644                        mds->mds_lov_desc.ld_tgt_count);
645                 GOTO(out, rc = -EINVAL);
646         }
647
648         rc = mds_lov_get_objid(obd, idx);
649         if (rc)
650                 GOTO(out, rc);
651
652         page = idx / OBJID_PER_PAGE();
653         off = idx % OBJID_PER_PAGE();
654         data = mds->mds_lov_page_array[page];
655
656         /* We have read this lastid from disk; tell the osc.
657            Don't call this during recovery. */
658         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
659         if (rc) {
660                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
661                 /* Don't abort the rest of the sync */
662                 rc = 0;
663         } else {
664                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
665                         data[off], idx, rc);
666         }
667 out:
668         RETURN(rc);
669 }
670
671 /* update the LOV-OSC knowledge of the last used object id's */
672 int mds_lov_connect(struct obd_device *obd, char * lov_name)
673 {
674         struct mds_obd *mds = &obd->u.mds;
675         struct obd_connect_data *data;
676         int rc;
677         ENTRY;
678
679         if (IS_ERR(mds->mds_lov_obd))
680                 RETURN(PTR_ERR(mds->mds_lov_obd));
681
682         if (mds->mds_lov_obd)
683                 RETURN(0);
684
685         mds->mds_lov_obd = class_name2obd(lov_name);
686         if (!mds->mds_lov_obd) {
687                 CERROR("MDS cannot locate LOV %s\n", lov_name);
688                 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
689                 RETURN(-ENOTCONN);
690         }
691
692         cfs_mutex_lock(&obd->obd_dev_mutex);
693         rc = mds_lov_read_objids(obd);
694         cfs_mutex_unlock(&obd->obd_dev_mutex);
695         if (rc) {
696                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
697                 GOTO(err_exit, rc);
698         }
699
700         rc = obd_register_observer(mds->mds_lov_obd, obd);
701         if (rc) {
702                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
703                        lov_name, rc);
704                 GOTO(err_exit, rc);
705         }
706
707         /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
708          * targets */
709         obd_notify(mds->mds_lov_obd, NULL, OBD_NOTIFY_CREATE, NULL);
710
711         mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
712
713         OBD_ALLOC(data, sizeof(*data));
714         if (data == NULL)
715                 GOTO(err_exit, rc = -ENOMEM);
716
717         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
718                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
719                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FULL20  |
720                                   OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT      |
721                                   OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN   |
722                                   OBD_CONNECT_SOM | OBD_CONNECT_MAX_EASIZE;
723 #ifdef HAVE_LRU_RESIZE_SUPPORT
724         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
725 #endif
726         data->ocd_version = LUSTRE_VERSION_CODE;
727         data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
728         data->ocd_max_easize = mds->mds_obt.obt_osd_properties.osd_max_ea_size;
729
730         /* send max bytes per rpc */
731         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
732         /* send the list of supported checksum types */
733         data->ocd_cksum_types = cksum_types_supported();
734         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
735         rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
736         OBD_FREE(data, sizeof(*data));
737         if (rc) {
738                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
739                 mds->mds_lov_obd = ERR_PTR(rc);
740                 RETURN(rc);
741         }
742
743         /* I want to see a callback happen when the OBD moves to a
744          * "For General Use" state, and that's when we'll call
745          * set_nextid().  The class driver can help us here, because
746          * it can use the obd_recovering flag to determine when the
747          * the OBD is full available. */
748         /* MDD device will care about that
749         if (!obd->obd_recovering)
750                 rc = mds_postrecov(obd);
751          */
752         RETURN(rc);
753
754 err_exit:
755         mds->mds_lov_exp = NULL;
756         mds->mds_lov_obd = ERR_PTR(rc);
757         RETURN(rc);
758 }
759
760 int mds_lov_disconnect(struct obd_device *obd)
761 {
762         struct mds_obd *mds = &obd->u.mds;
763         int rc = 0;
764         ENTRY;
765
766         if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
767                 obd_register_observer(mds->mds_lov_obd, NULL);
768
769                 /* The actual disconnect of the mds_lov will be called from
770                  * class_disconnect_exports from mds_lov_clean. So we have to
771                  * ensure that class_cleanup doesn't fail due to the extra ref
772                  * we're holding now. The mechanism to do that already exists -
773                  * the obd_force flag. We'll drop the final ref to the
774                  * mds_lov_exp in mds_cleanup. */
775                 mds->mds_lov_obd->obd_force = 1;
776         }
777
778         RETURN(rc);
779 }
780
781 struct mds_lov_sync_info {
782         struct obd_device    *mlsi_obd;     /* the lov device to sync */
783         struct obd_device    *mlsi_watched; /* target osc */
784         __u32                 mlsi_index;   /* index of target */
785 };
786
787 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
788 {
789         struct mds_capa_info    info = { .uuid = uuid };
790         struct lustre_capa_key *key;
791         int i, rc = 0;
792
793         ENTRY;
794
795         if (!mds->mds_capa_keys)
796                 RETURN(0);
797
798         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
799         for (i = 0; i < 2; i++) {
800                 key = &mds->mds_capa_keys[i];
801                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
802
803                 info.capa = key;
804                 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY),
805                                         KEY_CAPA_KEY, sizeof(info), &info, NULL);
806                 if (rc) {
807                         DEBUG_CAPA_KEY(D_ERROR, key,
808                                        "propagate failed (rc = %d) for", rc);
809                         RETURN(rc);
810                 }
811         }
812
813         RETURN(0);
814 }
815
816 /* We only sync one osc at a time, so that we don't have to hold
817    any kind of lock on the whole mds_lov_desc, which may change
818    (grow) as a result of mds_lov_add_ost.  This also avoids any
819    kind of mismatch between the lov_desc and the mds_lov_desc,
820    which are not in lock-step during lov_add_obd */
821 static int __mds_lov_synchronize(void *data)
822 {
823         struct mds_lov_sync_info *mlsi = data;
824         struct obd_device *obd = mlsi->mlsi_obd;
825         struct obd_device *watched = mlsi->mlsi_watched;
826         struct mds_obd *mds = &obd->u.mds;
827         struct obd_uuid *uuid;
828         __u32  idx = mlsi->mlsi_index;
829         struct mds_group_info mgi;
830         struct llog_ctxt *ctxt;
831         int rc = 0;
832         ENTRY;
833
834         OBD_FREE_PTR(mlsi);
835
836         LASSERT(obd);
837         LASSERT(watched);
838         uuid = &watched->u.cli.cl_target_uuid;
839         LASSERT(uuid);
840
841         cfs_down_read(&mds->mds_notify_lock);
842         if (obd->obd_stopping || obd->obd_fail)
843                 GOTO(out, rc = -ENODEV);
844
845         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
846         rc = mds_lov_update_mds(obd, watched, idx);
847         if (rc != 0) {
848                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
849                 GOTO(out, rc);
850         }
851         mgi.group = mdt_to_obd_objseq(mds->mds_id);
852         mgi.uuid = uuid;
853
854         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
855                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
856         if (rc != 0)
857                 GOTO(out, rc);
858         /* propagate capability keys */
859         rc = mds_propagate_capa_keys(mds, uuid);
860         if (rc)
861                 GOTO(out, rc);
862
863         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
864         if (!ctxt)
865                 GOTO(out, rc = -ENODEV);
866
867         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
868         rc = llog_connect(ctxt, NULL, NULL, uuid);
869         llog_ctxt_put(ctxt);
870         if (rc != 0) {
871                 CERROR("%s failed at llog_origin_connect: %d\n",
872                        obd_uuid2str(uuid), rc);
873                 GOTO(out, rc);
874         }
875
876         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
877               obd->obd_name, obd_uuid2str(uuid));
878
879         rc = mds_lov_clear_orphans(mds, uuid);
880         if (rc != 0) {
881                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
882                        obd_uuid2str(uuid), rc);
883                 GOTO(out, rc);
884         }
885
886 #ifdef HAVE_QUOTA_SUPPORT
887         if (obd->obd_upcall.onu_owner) {
888                 /*
889                  * This is a hack for mds_notify->mdd_notify. When the mds obd
890                  * in mdd is removed, This hack should be removed.
891                  */
892                 LASSERT(obd->obd_upcall.onu_upcall != NULL);
893                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
894                                                 obd->obd_upcall.onu_owner,NULL);
895         }
896 #endif
897         EXIT;
898 out:
899         cfs_up_read(&mds->mds_notify_lock);
900         if (rc) {
901                 /* Deactivate it for safety */
902                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
903                        rc);
904                 if (!obd->obd_stopping && mds->mds_lov_obd &&
905                     !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
906                         obd_notify(mds->mds_lov_obd, watched,
907                                    OBD_NOTIFY_INACTIVE, NULL);
908         }
909
910         class_decref(obd, "mds_lov_synchronize", obd);
911         return rc;
912 }
913
914 int mds_lov_synchronize(void *data)
915 {
916         struct mds_lov_sync_info *mlsi = data;
917         char name[20];
918
919         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
920         cfs_daemonize_ctxt(name);
921
922         RETURN(__mds_lov_synchronize(data));
923 }
924
925 int mds_lov_start_synchronize(struct obd_device *obd,
926                               struct obd_device *watched,
927                               void *data, enum obd_notify_event ev)
928 {
929         struct mds_lov_sync_info *mlsi;
930         int rc;
931         struct obd_uuid *uuid;
932         ENTRY;
933
934         LASSERT(watched);
935         uuid = &watched->u.cli.cl_target_uuid;
936
937         OBD_ALLOC(mlsi, sizeof(*mlsi));
938         if (mlsi == NULL)
939                 RETURN(-ENOMEM);
940
941         LASSERT(data);
942         mlsi->mlsi_obd = obd;
943         mlsi->mlsi_watched = watched;
944         mlsi->mlsi_index = *(__u32 *)data;
945
946         /* Although class_export_get(obd->obd_self_export) would lock
947            the MDS in place, since it's only a self-export
948            it doesn't lock the LOV in place.  The LOV can be disconnected
949            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
950            Simply taking an export ref on the LOV doesn't help, because it's
951            still disconnected. Taking an obd reference insures that we don't
952            disconnect the LOV.  This of course means a cleanup won't
953            finish for as long as the sync is blocking. */
954         class_incref(obd, "mds_lov_synchronize", obd);
955
956         if (ev != OBD_NOTIFY_SYNC) {
957                 /* Synchronize in the background */
958                 rc = cfs_create_thread(mds_lov_synchronize, mlsi,
959                                        CFS_DAEMON_FLAGS);
960                 if (rc < 0) {
961                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
962                                obd->obd_name, rc);
963                         class_decref(obd, "mds_lov_synchronize", obd);
964                 } else {
965                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
966                                "thread=%d\n", obd->obd_name,
967                                mlsi->mlsi_index, rc);
968                         rc = 0;
969                 }
970         } else {
971                 rc = __mds_lov_synchronize((void *)mlsi);
972         }
973
974         RETURN(rc);
975 }
976
977 int mds_notify(struct obd_device *obd, struct obd_device *watched,
978                enum obd_notify_event ev, void *data)
979 {
980         struct mds_obd *mds = &obd->u.mds;
981         int rc = 0;
982         ENTRY;
983
984         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
985
986         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
987                 CERROR("unexpected notification of %s %s!\n",
988                        watched->obd_type->typ_name, watched->obd_name);
989                 RETURN(-EINVAL);
990         }
991
992         /*XXX this notifies the MDD until lov handling use old mds code
993          * must non block!
994          */
995         if (obd->obd_upcall.onu_owner) {
996                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
997                  rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
998                                                  obd->obd_upcall.onu_owner,
999                                                  &mds->mds_obt.obt_mount_count);
1000         }
1001
1002         switch (ev) {
1003         /* We only handle these: */
1004         case OBD_NOTIFY_CREATE:
1005                 CDEBUG(D_CONFIG, "%s: add target %s\n", obd->obd_name,
1006                        obd_uuid2str(&watched->u.cli.cl_target_uuid));
1007                 /* We still have to fix the lov descriptor for ost's */
1008                 LASSERT(data);
1009                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1010                                           &watched->u.cli.cl_target_uuid);
1011                 RETURN(rc);
1012         case OBD_NOTIFY_ACTIVE:
1013                 /* lov want one or more _active_ targets for work */
1014                 /* activate event should be pass lov idx as argument */
1015         case OBD_NOTIFY_SYNC:
1016         case OBD_NOTIFY_SYNC_NONBLOCK:
1017                 /* sync event should be pass lov idx as argument */
1018                 break;
1019         default:
1020                 RETURN(0);
1021         }
1022
1023         if (obd->obd_recovering) {
1024                 CDEBUG(D_CONFIG, "%s: Is in recovery, "
1025                        "not resetting orphans on %s\n",
1026                        obd->obd_name,
1027                        obd_uuid2str(&watched->u.cli.cl_target_uuid));
1028                 /* We still have to fix the lov descriptor for ost's added
1029                    after the mdt in the config log.  They didn't make it into
1030                    mds_lov_connect. */
1031                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1032                                          &watched->u.cli.cl_target_uuid);
1033         } else {
1034                 rc = mds_lov_start_synchronize(obd, watched, data, ev);
1035         }
1036         RETURN(rc);
1037 }