Whamcloud - gitweb
Branch b1_4_mountconf
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29
30 #ifdef __KERNEL__
31 #include <asm/div64.h>
32 #else
33 #include <liblustre.h>
34 #endif
35
36 #include <linux/obd_class.h>
37 #include <linux/obd_lov.h>
38 #include <linux/lustre_idl.h>
39
40 #include "lov_internal.h"
41
42 static void lov_init_set(struct lov_request_set *set)
43 {
44         set->set_count = 0;
45         set->set_completes = 0;
46         set->set_success = 0;
47         INIT_LIST_HEAD(&set->set_list);
48         atomic_set(&set->set_refcount, 1);
49 }
50
51 static void lov_finish_set(struct lov_request_set *set)
52 {
53         struct list_head *pos, *n;
54         ENTRY;
55
56         LASSERT(set);
57         list_for_each_safe(pos, n, &set->set_list) {
58                 struct lov_request *req = list_entry(pos, struct lov_request,
59                                                      rq_link);
60                 list_del_init(&req->rq_link);
61
62                 if (req->rq_oa)
63                         obdo_free(req->rq_oa);
64                 if (req->rq_md)
65                         OBD_FREE(req->rq_md, req->rq_buflen);
66                 OBD_FREE(req, sizeof(*req));
67         }
68
69         if (set->set_pga) {
70                 int len = set->set_oabufs * sizeof(*set->set_pga);
71                 OBD_FREE(set->set_pga, len);
72         }
73         if (set->set_lockh)
74                 lov_llh_put(set->set_lockh);
75
76         OBD_FREE(set, sizeof(*set));
77         EXIT;
78 }
79
80 static void lov_update_set(struct lov_request_set *set,
81                            struct lov_request *req, int rc)
82 {
83         req->rq_complete = 1;
84         req->rq_rc = rc;
85
86         set->set_completes++;
87         if (rc == 0)
88                 set->set_success++;
89 }
90
91 int lov_update_common_set(struct lov_request_set *set,
92                           struct lov_request *req, int rc)
93 {
94         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
95         ENTRY;
96
97         lov_update_set(set, req, rc);
98
99         /* grace error on inactive ost */
100         if (rc && !lov->tgts[req->rq_idx].active)
101                 rc = 0;
102
103         /* FIXME in raid1 regime, should return 0 */
104         RETURN(rc);
105 }
106
107 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
108 {
109         list_add_tail(&req->rq_link, &set->set_list);
110         set->set_count++;
111 }
112
113 int lov_update_enqueue_set(struct lov_request_set *set,
114                            struct lov_request *req, int rc, int flags)
115 {
116         struct lustre_handle *lov_lockhp;
117         struct lov_oinfo *loi;
118         ENTRY;
119
120         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
121         loi = &set->set_md->lsm_oinfo[req->rq_stripe];
122
123         /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
124          * It belongs in the OSC, except that the OSC doesn't have
125          * access to the real LOI -- it gets a copy, that we created
126          * above, and that copy can be arbitrarily out of date.
127          *
128          * The LOV API is due for a serious rewriting anyways, and this
129          * can be addressed then. */
130         if (rc == ELDLM_OK) {
131                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
132                 __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
133
134                 LASSERT(lock != NULL);
135                 loi->loi_rss = tmp;
136                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
137                 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
138                 /* Extend KMS up to the end of this lock and no further
139                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
140                 if (tmp > lock->l_policy_data.l_extent.end)
141                         tmp = lock->l_policy_data.l_extent.end + 1;
142                 if (tmp >= loi->loi_kms) {
143                         LDLM_DEBUG(lock, "lock acquired, setting rss="
144                                    LPU64", kms="LPU64, loi->loi_rss, tmp);
145                         loi->loi_kms = tmp;
146                         loi->loi_kms_valid = 1;
147                 } else {
148                         LDLM_DEBUG(lock, "lock acquired, setting rss="
149                                    LPU64"; leaving kms="LPU64", end="LPU64,
150                                    loi->loi_rss, loi->loi_kms,
151                                    lock->l_policy_data.l_extent.end);
152                 }
153                 ldlm_lock_allow_match(lock);
154                 LDLM_LOCK_PUT(lock);
155         } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
156                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
157                 loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
158                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
159                 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
160                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
161                        " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
162                 rc = ELDLM_OK;
163         } else {
164                 struct obd_export *exp = set->set_exp;
165                 struct lov_obd *lov = &exp->exp_obd->u.lov;
166
167                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
168                 if (lov->tgts[req->rq_idx].active) {
169                         CERROR("error: enqueue objid "LPX64" subobj "
170                                 LPX64" on OST idx %d: rc = %d\n",
171                                 set->set_md->lsm_object_id, loi->loi_id,
172                                 loi->loi_ost_idx, rc);
173                 } else {
174                         rc = ELDLM_OK;
175                 }
176         }
177         lov_update_set(set, req, rc);
178         RETURN(rc);
179 }
180
181 static int enqueue_done(struct lov_request_set *set, __u32 mode)
182 {
183         struct lov_request *req;
184         struct lustre_handle *lov_lockhp = NULL;
185         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
186         int rc = 0;
187         ENTRY;
188
189         LASSERT(set->set_completes);
190         /* enqueue/match success, just return */
191         if (set->set_completes == set->set_success)
192                 RETURN(0);
193
194         /* cancel enqueued/matched locks */
195         list_for_each_entry(req, &set->set_list, rq_link) {
196                 if (!req->rq_complete || req->rq_rc)
197                         continue;
198
199                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
200                 LASSERT(lov_lockhp);
201                 if (lov_lockhp->cookie == 0)
202                         continue;
203
204                 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
205                                 mode, lov_lockhp);
206                 if (rc && lov->tgts[req->rq_idx].active)
207                         CERROR("cancelling obdjid "LPX64" on OST "
208                                "idx %d error: rc = %d\n",
209                                req->rq_md->lsm_object_id, req->rq_idx, rc);
210         }
211         lov_llh_put(set->set_lockh);
212         RETURN(rc);
213 }
214
215 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
216 {
217         int rc = 0;
218         ENTRY;
219
220         if (set == NULL)
221                 RETURN(0);
222         LASSERT(set->set_exp);
223         if (set->set_completes)
224                 rc = enqueue_done(set, mode);
225         else
226                 lov_llh_put(set->set_lockh);
227
228         if (atomic_dec_and_test(&set->set_refcount))
229                 lov_finish_set(set);
230
231         RETURN(rc);
232 }
233
234 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
235                          ldlm_policy_data_t *policy, __u32 mode,
236                          struct lustre_handle *lockh,
237                          struct lov_request_set **reqset)
238 {
239         struct lov_obd *lov = &exp->exp_obd->u.lov;
240         struct lov_request_set *set;
241         int i, rc = 0;
242         struct lov_oinfo *loi;
243         ENTRY;
244
245         OBD_ALLOC(set, sizeof(*set));
246         if (set == NULL)
247                 RETURN(-ENOMEM);
248         lov_init_set(set);
249
250         set->set_exp = exp;
251         set->set_md = lsm;
252         set->set_lockh = lov_llh_new(lsm);
253         if (set->set_lockh == NULL)
254                 GOTO(out_set, rc = -ENOMEM);
255         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
256
257         loi = lsm->lsm_oinfo;
258         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
259                 struct lov_request *req;
260                 obd_off start, end;
261
262                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
263                                            policy->l_extent.end, &start, &end))
264                         continue;
265
266                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
267                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
268                         continue;
269                 }
270
271                 OBD_ALLOC(req, sizeof(*req));
272                 if (req == NULL)
273                         GOTO(out_set, rc = -ENOMEM);
274
275                 req->rq_buflen = sizeof(*req->rq_md) +
276                         sizeof(struct lov_oinfo);
277                 OBD_ALLOC(req->rq_md, req->rq_buflen);
278                 if (req->rq_md == NULL)
279                         GOTO(out_set, rc = -ENOMEM);
280
281                 req->rq_extent.start = start;
282                 req->rq_extent.end = end;
283                 req->rq_extent.gid = policy->l_extent.gid;
284
285                 req->rq_idx = loi->loi_ost_idx;
286                 req->rq_stripe = i;
287
288                 /* XXX LOV STACKING: submd should be from the subobj */
289                 req->rq_md->lsm_object_id = loi->loi_id;
290                 req->rq_md->lsm_stripe_count = 0;
291                 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
292                 req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
293                 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
294                 req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
295                 req->rq_md->lsm_oinfo->loi_mtime = loi->loi_mtime;
296
297                 lov_set_add_req(req, set);
298         }
299         if (!set->set_count)
300                 GOTO(out_set, rc = -EIO);
301         *reqset = set;
302         RETURN(0);
303 out_set:
304         lov_fini_enqueue_set(set, mode);
305         RETURN(rc);
306 }
307
308 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
309                          int rc)
310 {
311         int ret = rc;
312         ENTRY;
313
314         if (rc == 1)
315                 ret = 0;
316         lov_update_set(set, req, ret);
317         RETURN(rc);
318 }
319
320 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
321 {
322         int rc = 0;
323         ENTRY;
324
325         if (set == NULL)
326                 RETURN(0);
327         LASSERT(set->set_exp);
328         if (set->set_completes) {
329                 if (set->set_count == set->set_success &&
330                     flags & LDLM_FL_TEST_LOCK)
331                         lov_llh_put(set->set_lockh);
332                 rc = enqueue_done(set, mode);
333         } else {
334                 lov_llh_put(set->set_lockh);
335         }
336
337         if (atomic_dec_and_test(&set->set_refcount))
338                 lov_finish_set(set);
339
340         RETURN(rc);
341 }
342
343 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
344                        ldlm_policy_data_t *policy, __u32 mode,
345                        struct lustre_handle *lockh,
346                        struct lov_request_set **reqset)
347 {
348         struct lov_obd *lov = &exp->exp_obd->u.lov;
349         struct lov_request_set *set;
350         int i, rc = 0;
351         struct lov_oinfo *loi;
352         ENTRY;
353
354         OBD_ALLOC(set, sizeof(*set));
355         if (set == NULL)
356                 RETURN(-ENOMEM);
357         lov_init_set(set);
358
359         set->set_exp = exp;
360         set->set_md = lsm;
361         set->set_lockh = lov_llh_new(lsm);
362         if (set->set_lockh == NULL)
363                 GOTO(out_set, rc = -ENOMEM);
364         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
365
366         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
367                 struct lov_request *req;
368                 obd_off start, end;
369
370                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
371                                            policy->l_extent.end, &start, &end))
372                         continue;
373
374                 /* FIXME raid1 should grace this error */
375                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
376                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
377                         GOTO(out_set, rc = -EIO);
378                 }
379
380                 OBD_ALLOC(req, sizeof(*req));
381                 if (req == NULL)
382                         GOTO(out_set, rc = -ENOMEM);
383
384                 req->rq_buflen = sizeof(*req->rq_md);
385                 OBD_ALLOC(req->rq_md, req->rq_buflen);
386                 if (req->rq_md == NULL)
387                         GOTO(out_set, rc = -ENOMEM);
388
389                 req->rq_extent.start = start;
390                 req->rq_extent.end = end;
391                 req->rq_extent.gid = policy->l_extent.gid;
392
393                 req->rq_idx = loi->loi_ost_idx;
394                 req->rq_stripe = i;
395
396                 /* XXX LOV STACKING: submd should be from the subobj */
397                 req->rq_md->lsm_object_id = loi->loi_id;
398                 req->rq_md->lsm_stripe_count = 0;
399
400                 lov_set_add_req(req, set);
401         }
402         if (!set->set_count)
403                 GOTO(out_set, rc = -EIO);
404         *reqset = set;
405         RETURN(rc);
406 out_set:
407         lov_fini_match_set(set, mode, 0);
408         RETURN(rc);
409 }
410
411 int lov_fini_cancel_set(struct lov_request_set *set)
412 {
413         int rc = 0;
414         ENTRY;
415
416         LASSERT(set->set_exp);
417         if (set == NULL)
418                 RETURN(0);
419
420         if (set->set_lockh)
421                 lov_llh_put(set->set_lockh);
422
423         if (atomic_dec_and_test(&set->set_refcount))
424                 lov_finish_set(set);
425
426         RETURN(rc);
427 }
428
429 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
430                         __u32 mode, struct lustre_handle *lockh,
431                         struct lov_request_set **reqset)
432 {
433         struct lov_request_set *set;
434         int i, rc = 0;
435         struct lov_oinfo *loi;
436         ENTRY;
437
438         OBD_ALLOC(set, sizeof(*set));
439         if (set == NULL)
440                 RETURN(-ENOMEM);
441         lov_init_set(set);
442
443         set->set_exp = exp;
444         set->set_md = lsm;
445         set->set_lockh = lov_handle2llh(lockh);
446         if (set->set_lockh == NULL) {
447                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
448                 GOTO(out_set, rc = -EINVAL);
449         }
450         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
451
452         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
453                 struct lov_request *req;
454                 struct lustre_handle *lov_lockhp;
455
456                 lov_lockhp = set->set_lockh->llh_handles + i;
457                 if (lov_lockhp->cookie == 0) {
458                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
459                                loi->loi_ost_idx, loi->loi_id);
460                         continue;
461                 }
462
463                 OBD_ALLOC(req, sizeof(*req));
464                 if (req == NULL)
465                         GOTO(out_set, rc = -ENOMEM);
466
467                 req->rq_buflen = sizeof(*req->rq_md);
468                 OBD_ALLOC(req->rq_md, req->rq_buflen);
469                 if (req->rq_md == NULL)
470                         GOTO(out_set, rc = -ENOMEM);
471
472                 req->rq_idx = loi->loi_ost_idx;
473                 req->rq_stripe = i;
474
475                 /* XXX LOV STACKING: submd should be from the subobj */
476                 req->rq_md->lsm_object_id = loi->loi_id;
477                 req->rq_md->lsm_stripe_count = 0;
478
479                 lov_set_add_req(req, set);
480         }
481         if (!set->set_count)
482                 GOTO(out_set, rc = -EIO);
483         *reqset = set;
484         RETURN(rc);
485 out_set:
486         lov_fini_cancel_set(set);
487         RETURN(rc);
488 }
489
490 static int create_done(struct obd_export *exp, struct lov_request_set *set,
491                        struct lov_stripe_md **lsmp)
492 {
493         struct lov_obd *lov = &exp->exp_obd->u.lov;
494         struct obd_trans_info *oti = set->set_oti;
495         struct obdo *src_oa = set->set_oa;
496         struct lov_request *req;
497         struct obdo *ret_oa = NULL;
498         int attrset = 0, rc = 0;
499         ENTRY;
500
501         LASSERT(set->set_completes);
502
503         /* try alloc objects on other osts if osc_create fails for
504          * exceptions: RPC failure, ENOSPC, etc */
505         if (set->set_count != set->set_success) {
506                 list_for_each_entry (req, &set->set_list, rq_link) {
507                         if (req->rq_rc == 0)
508                                 continue;
509                         
510                         set->set_completes--;
511                         req->rq_complete = 0;
512                         
513                         rc = qos_remedy_create(set, req);
514                         lov_update_create_set(set, req, rc);
515
516                         if (rc)
517                                 break;
518                 }
519         }
520
521         /* no successful creates */
522         if (set->set_success == 0)
523                 GOTO(cleanup, rc);
524         
525         /* If there was an explicit stripe set, fail.  Otherwise, we
526          * got some objects and that's not bad. */
527         if (set->set_count != set->set_success) {
528                 if (*lsmp)
529                         GOTO(cleanup, rc);
530                 set->set_count = set->set_success;
531                 qos_shrink_lsm(set);
532         }
533
534         ret_oa = obdo_alloc();
535         if (ret_oa == NULL)
536                 GOTO(cleanup, rc = -ENOMEM);
537
538         list_for_each_entry(req, &set->set_list, rq_link) {
539                 if (!req->rq_complete || req->rq_rc)
540                         continue;
541                 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
542                                 set->set_md, req->rq_stripe, &attrset);
543         }
544         if (src_oa->o_valid & OBD_MD_FLSIZE &&
545             ret_oa->o_size != src_oa->o_size) {
546                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
547                        src_oa->o_size, ret_oa->o_size);
548                 LBUG();
549         }
550         ret_oa->o_id = src_oa->o_id;
551         memcpy(src_oa, ret_oa, sizeof(*src_oa));
552         obdo_free(ret_oa);
553
554         *lsmp = set->set_md;
555         GOTO(done, rc = 0);
556
557 cleanup:
558         list_for_each_entry(req, &set->set_list, rq_link) {
559                 struct obd_export *sub_exp;
560                 int err = 0;
561
562                 if (!req->rq_complete || req->rq_rc)
563                         continue;
564
565                 sub_exp = lov->tgts[req->rq_idx].ltd_exp;
566                 err = obd_destroy(sub_exp, req->rq_oa, NULL, oti);
567                 if (err)
568                         CERROR("Failed to uncreate objid "LPX64" subobj "
569                                LPX64" on OST idx %d: rc = %d\n",
570                                set->set_oa->o_id, req->rq_oa->o_id,
571                                req->rq_idx, rc);
572         }
573         if (*lsmp == NULL)
574                 obd_free_memmd(exp, &set->set_md);
575 done:
576         if (oti && set->set_cookies) {
577                 oti->oti_logcookies = set->set_cookies;
578                 if (!set->set_cookie_sent) {
579                         oti_free_cookies(oti);
580                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
581                 } else {
582                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
583                 }
584         }
585         RETURN(rc);
586 }
587
588 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
589 {
590         int rc = 0;
591         ENTRY;
592
593         LASSERT(set->set_exp);
594         if (set == NULL)
595                 RETURN(0);
596         if (set->set_completes) {
597                 rc = create_done(set->set_exp, set, lsmp);
598                 /* FIXME update qos data here */
599         }
600
601         if (atomic_dec_and_test(&set->set_refcount))
602                 lov_finish_set(set);
603
604         RETURN(rc);
605 }
606
607 int lov_update_create_set(struct lov_request_set *set,
608                           struct lov_request *req, int rc)
609 {
610         struct obd_trans_info *oti = set->set_oti;
611         struct lov_stripe_md *lsm = set->set_md;
612         struct lov_oinfo *loi;
613         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
614         ENTRY;
615
616         req->rq_stripe = set->set_success;
617         loi = &lsm->lsm_oinfo[req->rq_stripe];
618
619         if (rc && lov->tgts[req->rq_idx].active) {
620                 CERROR("error creating fid "LPX64" sub-object"
621                        " on OST idx %d/%d: rc = %d\n",
622                        set->set_oa->o_id, req->rq_idx,
623                        lsm->lsm_stripe_count, rc);
624                 if (rc > 0) {
625                         CERROR("obd_create returned invalid err %d\n", rc);
626                         rc = -EIO;
627                 }
628         }
629         lov_update_set(set, req, rc);
630         if (rc)
631                 RETURN(rc);
632
633         if (oti && oti->oti_objid)
634                 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
635
636         loi->loi_id = req->rq_oa->o_id;
637         loi->loi_ost_idx = req->rq_idx;
638         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
639                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
640         loi_init(loi);
641
642         if (set->set_cookies)
643                 ++oti->oti_logcookies;
644         if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
645                 set->set_cookie_sent++;
646
647         RETURN(0);
648 }
649
650 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **lsmp,
651                         struct obdo *src_oa, struct obd_trans_info *oti,
652                         struct lov_request_set **reqset)
653 {
654         struct lov_obd *lov = &exp->exp_obd->u.lov;
655         struct lov_request_set *set;
656         int rc = 0, newea = 0;
657         ENTRY;
658
659         OBD_ALLOC(set, sizeof(*set));
660         if (set == NULL)
661                 RETURN(-ENOMEM);
662         lov_init_set(set);
663
664         set->set_exp = exp;
665         set->set_md = *lsmp;
666         set->set_oa = src_oa;
667         set->set_oti = oti;
668
669         if (set->set_md == NULL) {
670                 int stripes, stripe_cnt;
671                 stripe_cnt = lov_get_stripecnt(lov, 0);
672
673                 /* If the MDS file was truncated up to some size, stripe over
674                  * enough OSTs to allow the file to be created at that size. */
675                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
676                         stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
677                         do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
678
679                         if (stripes > lov->desc.ld_active_tgt_count)
680                                 GOTO(out_set, rc = -EFBIG);
681                         if (stripes < stripe_cnt)
682                                 stripes = stripe_cnt;
683                 } else {
684                         stripes = stripe_cnt;
685                 }
686
687                 rc = lov_alloc_memmd(&set->set_md, stripes,
688                                      lov->desc.ld_pattern ?
689                                      lov->desc.ld_pattern : LOV_PATTERN_RAID0);
690                 if (rc < 0)
691                         goto out_set;
692                 newea = 1;
693         }
694
695         rc = qos_prep_create(lov, set, newea);
696         if (rc)
697                 goto out_lsm;
698
699         if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
700                 oti_alloc_cookies(oti, set->set_count);
701                 if (!oti->oti_logcookies)
702                         goto out_lsm;
703                 set->set_cookies = oti->oti_logcookies;
704         }
705         *reqset = set;
706         RETURN(rc);
707
708 out_lsm:
709         if (*lsmp == NULL)
710                 obd_free_memmd(exp, &set->set_md);
711 out_set:
712         lov_fini_create_set(set, lsmp);
713         RETURN(rc);
714 }
715
716 static int common_attr_done(struct lov_request_set *set)
717 {
718         struct list_head *pos;
719         struct lov_request *req;
720         struct obdo *tmp_oa;
721         int rc = 0, attrset = 0;
722         ENTRY;
723
724         if (set->set_oa == NULL)
725                 RETURN(0);
726
727         if (!set->set_success)
728                 RETURN(-EIO);
729
730         tmp_oa = obdo_alloc();
731         if (tmp_oa == NULL)
732                 GOTO(out, rc = -ENOMEM);
733
734         list_for_each (pos, &set->set_list) {
735                 req = list_entry(pos, struct lov_request, rq_link);
736
737                 if (!req->rq_complete || req->rq_rc)
738                         continue;
739                 if (req->rq_oa->o_valid == 0)   /* inactive stripe */
740                         continue;
741                 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
742                                 set->set_md, req->rq_stripe, &attrset);
743         }
744         if (!attrset) {
745                 CERROR("No stripes had valid attrs\n");
746                 rc = -EIO;
747         }
748         tmp_oa->o_id = set->set_oa->o_id;
749         memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
750 out:
751         if (tmp_oa)
752                 obdo_free(tmp_oa);
753         RETURN(rc);
754
755 }
756
757 static int brw_done(struct lov_request_set *set)
758 {
759         struct lov_stripe_md *lsm = set->set_md;
760         struct lov_oinfo     *loi = NULL;
761         struct list_head *pos;
762         struct lov_request *req;
763         ENTRY;
764
765         list_for_each (pos, &set->set_list) {
766                 req = list_entry(pos, struct lov_request, rq_link);
767
768                 if (!req->rq_complete || req->rq_rc)
769                         continue;
770
771                 loi = &lsm->lsm_oinfo[req->rq_stripe];
772
773                 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
774                         loi->loi_blocks = req->rq_oa->o_blocks;
775         }
776
777         RETURN(0);
778 }
779
780 int lov_fini_brw_set(struct lov_request_set *set)
781 {
782         int rc = 0;
783         ENTRY;
784
785         LASSERT(set->set_exp);
786         if (set == NULL)
787                 RETURN(0);
788         if (set->set_completes) {
789                 rc = brw_done(set);
790                 /* FIXME update qos data here */
791         }
792         if (atomic_dec_and_test(&set->set_refcount))
793                 lov_finish_set(set);
794
795         RETURN(rc);
796 }
797
798 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
799                      struct lov_stripe_md *lsm, obd_count oa_bufs,
800                      struct brw_page *pga, struct obd_trans_info *oti,
801                      struct lov_request_set **reqset)
802 {
803         struct {
804                 obd_count       index;
805                 obd_count       count;
806                 obd_count       off;
807         } *info = NULL;
808         struct lov_request_set *set;
809         struct lov_oinfo *loi = NULL;
810         struct lov_obd *lov = &exp->exp_obd->u.lov;
811         int rc = 0, i, shift;
812         ENTRY;
813
814         OBD_ALLOC(set, sizeof(*set));
815         if (set == NULL)
816                 RETURN(-ENOMEM);
817         lov_init_set(set);
818
819         set->set_exp = exp;
820         set->set_md = lsm;
821         set->set_oa = src_oa;
822         set->set_oti = oti;
823         set->set_oabufs = oa_bufs;
824         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
825         if (!set->set_pga)
826                 GOTO(out, rc = -ENOMEM);
827
828         OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
829         if (!info)
830                 GOTO(out, rc = -ENOMEM);
831
832         /* calculate the page count for each stripe */
833         for (i = 0; i < oa_bufs; i++) {
834                 int stripe = lov_stripe_number(lsm, pga[i].off);
835                 info[stripe].count++;
836         }
837
838         /* alloc and initialize lov request */
839         shift = 0;
840         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++){
841                 struct lov_request *req;
842
843                 if (info[i].count == 0)
844                         continue;
845
846                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
847                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
848                         GOTO(out, rc = -EIO);
849                 }
850
851                 OBD_ALLOC(req, sizeof(*req));
852                 if (req == NULL)
853                         GOTO(out, rc = -ENOMEM);
854
855                 req->rq_oa = obdo_alloc();
856                 if (req->rq_oa == NULL)
857                         GOTO(out, rc = -ENOMEM);
858
859                 if (src_oa)
860                         memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
861                 req->rq_oa->o_id = loi->loi_id;
862
863                 req->rq_buflen = sizeof(*req->rq_md);
864                 OBD_ALLOC(req->rq_md, req->rq_buflen);
865                 if (req->rq_md == NULL)
866                         GOTO(out, rc = -ENOMEM);
867
868                 req->rq_idx = loi->loi_ost_idx;
869                 req->rq_stripe = i;
870
871                 /* XXX LOV STACKING */
872                 req->rq_md->lsm_object_id = loi->loi_id;
873                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
874                 req->rq_oabufs = info[i].count;
875                 req->rq_pgaidx = shift;
876                 shift += req->rq_oabufs;
877
878                 /* remember the index for sort brw_page array */
879                 info[i].index = req->rq_pgaidx;
880
881                 lov_set_add_req(req, set);
882         }
883         if (!set->set_count)
884                 GOTO(out, rc = -EIO);
885
886         /* rotate & sort the brw_page array */
887         for (i = 0; i < oa_bufs; i++) {
888                 int stripe = lov_stripe_number(lsm, pga[i].off);
889
890                 shift = info[stripe].index + info[stripe].off;
891                 LASSERT(shift < oa_bufs);
892                 set->set_pga[shift] = pga[i];
893                 lov_stripe_offset(lsm, pga[i].off, stripe,
894                                   &set->set_pga[shift].off);
895                 info[stripe].off++;
896         }
897 out:
898         if (info)
899                 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
900
901         if (rc == 0)
902                 *reqset = set;
903         else
904                 lov_fini_brw_set(set);
905
906         RETURN(rc);
907 }
908
909 int lov_fini_getattr_set(struct lov_request_set *set)
910 {
911         int rc = 0;
912         ENTRY;
913
914         LASSERT(set->set_exp);
915         if (set == NULL)
916                 RETURN(0);
917         if (set->set_completes)
918                 rc = common_attr_done(set);
919
920         if (atomic_dec_and_test(&set->set_refcount))
921                 lov_finish_set(set);
922
923         RETURN(rc);
924 }
925
926 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
927                          struct lov_stripe_md *lsm,
928                          struct lov_request_set **reqset)
929 {
930         struct lov_request_set *set;
931         struct lov_oinfo *loi = NULL;
932         struct lov_obd *lov = &exp->exp_obd->u.lov;
933         int rc = 0, i;
934         ENTRY;
935
936         OBD_ALLOC(set, sizeof(*set));
937         if (set == NULL)
938                 RETURN(-ENOMEM);
939         lov_init_set(set);
940
941         set->set_exp = exp;
942         set->set_md = lsm;
943         set->set_oa = src_oa;
944
945         loi = lsm->lsm_oinfo;
946         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
947                 struct lov_request *req;
948
949                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
950                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
951                         continue;
952                 }
953
954                 OBD_ALLOC(req, sizeof(*req));
955                 if (req == NULL)
956                         GOTO(out_set, rc = -ENOMEM);
957
958                 req->rq_stripe = i;
959                 req->rq_idx = loi->loi_ost_idx;
960
961                 req->rq_oa = obdo_alloc();
962                 if (req->rq_oa == NULL)
963                         GOTO(out_set, rc = -ENOMEM);
964                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
965                 req->rq_oa->o_id = loi->loi_id;
966
967                 lov_set_add_req(req, set);
968         }
969         if (!set->set_count)
970                 GOTO(out_set, rc = -EIO);
971         *reqset = set;
972         RETURN(rc);
973 out_set:
974         lov_fini_getattr_set(set);
975         RETURN(rc);
976 }
977
978 int lov_fini_destroy_set(struct lov_request_set *set)
979 {
980         ENTRY;
981
982         LASSERT(set->set_exp);
983         if (set == NULL)
984                 RETURN(0);
985         if (set->set_completes) {
986                 /* FIXME update qos data here */
987         }
988
989         if (atomic_dec_and_test(&set->set_refcount))
990                 lov_finish_set(set);
991
992         RETURN(0);
993 }
994
995 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
996                          struct lov_stripe_md *lsm,
997                          struct obd_trans_info *oti,
998                          struct lov_request_set **reqset)
999 {
1000         struct lov_request_set *set;
1001         struct lov_oinfo *loi = NULL;
1002         struct lov_obd *lov = &exp->exp_obd->u.lov;
1003         int rc = 0, cookie_set = 0, i;
1004         ENTRY;
1005
1006         OBD_ALLOC(set, sizeof(*set));
1007         if (set == NULL)
1008                 RETURN(-ENOMEM);
1009         lov_init_set(set);
1010
1011         set->set_exp = exp;
1012         set->set_md = lsm;
1013         set->set_oa = src_oa;
1014         set->set_oti = oti;
1015         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1016                 set->set_cookies = oti->oti_logcookies;
1017
1018         loi = lsm->lsm_oinfo;
1019         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1020                 struct lov_request *req;
1021
1022                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1023                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1024                         continue;
1025                 }
1026
1027                 OBD_ALLOC(req, sizeof(*req));
1028                 if (req == NULL)
1029                         GOTO(out_set, rc = -ENOMEM);
1030
1031                 req->rq_stripe = i;
1032                 req->rq_idx = loi->loi_ost_idx;
1033
1034                 req->rq_oa = obdo_alloc();
1035                 if (req->rq_oa == NULL)
1036                         GOTO(out_set, rc = -ENOMEM);
1037                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1038                 req->rq_oa->o_id = loi->loi_id;
1039
1040                 /* Setup the first request's cookie position */
1041                 if (!cookie_set && set->set_cookies) {
1042                         oti->oti_logcookies = set->set_cookies + i;
1043                         cookie_set = 1;
1044                 }
1045                 lov_set_add_req(req, set);
1046         }
1047         if (!set->set_count)
1048                 GOTO(out_set, rc = -EIO);
1049         *reqset = set;
1050         RETURN(rc);
1051 out_set:
1052         lov_fini_destroy_set(set);
1053         RETURN(rc);
1054 }
1055
1056 int lov_fini_setattr_set(struct lov_request_set *set)
1057 {
1058         int rc = 0;
1059         ENTRY;
1060
1061         LASSERT(set->set_exp);
1062         if (set == NULL)
1063                 RETURN(0);
1064         if (set->set_completes) {
1065                 rc = common_attr_done(set);
1066                 /* FIXME update qos data here */
1067         }
1068
1069         if (atomic_dec_and_test(&set->set_refcount))
1070                 lov_finish_set(set);
1071         RETURN(rc);
1072 }
1073
1074 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1075                          struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1076                          struct lov_request_set **reqset)
1077 {
1078         struct lov_request_set *set;
1079         struct lov_oinfo *loi = NULL;
1080         struct lov_obd *lov = &exp->exp_obd->u.lov;
1081         int rc = 0, i;
1082         ENTRY;
1083
1084         OBD_ALLOC(set, sizeof(*set));
1085         if (set == NULL)
1086                 RETURN(-ENOMEM);
1087         lov_init_set(set);
1088
1089         set->set_exp = exp;
1090         set->set_md = lsm;
1091         set->set_oa = src_oa;
1092
1093         loi = lsm->lsm_oinfo;
1094         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1095                 struct lov_request *req;
1096
1097                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1098                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1099                         continue;
1100                 }
1101
1102                 OBD_ALLOC(req, sizeof(*req));
1103                 if (req == NULL)
1104                         GOTO(out_set, rc = -ENOMEM);
1105                 req->rq_stripe = i;
1106                 req->rq_idx = loi->loi_ost_idx;
1107
1108                 req->rq_oa = obdo_alloc();
1109                 if (req->rq_oa == NULL)
1110                         GOTO(out_set, rc = -ENOMEM);
1111                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1112                 req->rq_oa->o_id = loi->loi_id;
1113
1114                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1115                         if (lov_stripe_offset(lsm, src_oa->o_size, i,
1116                                               &req->rq_oa->o_size) < 0 &&
1117                             req->rq_oa->o_size)
1118                                 req->rq_oa->o_size--;
1119                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1120                                i, req->rq_oa->o_size, src_oa->o_size);
1121                 }
1122                 lov_set_add_req(req, set);
1123         }
1124         if (!set->set_count)
1125                 GOTO(out_set, rc = -EIO);
1126         *reqset = set;
1127         RETURN(rc);
1128 out_set:
1129         lov_fini_setattr_set(set);
1130         RETURN(rc);
1131 }
1132
1133 int lov_update_setattr_set(struct lov_request_set *set,
1134                            struct lov_request *req, int rc)
1135 {
1136         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1137         ENTRY;
1138
1139         lov_update_set(set, req, rc);
1140
1141         /* grace error on inactive ost */
1142         if (rc && !lov->tgts[req->rq_idx].active)
1143                 rc = 0;
1144
1145         /* FIXME: LOV STACKING update loi data should be done by OSC *
1146          * when this is gone we can go back to using lov_update_common_set() */
1147         if (rc == 0 && req->rq_oa->o_valid & OBD_MD_FLMTIME)
1148                 set->set_md->lsm_oinfo[req->rq_stripe].loi_mtime =
1149                         req->rq_oa->o_mtime;
1150         /* ditto loi_atime, loi_ctime when available */
1151
1152         RETURN(rc);
1153 }
1154
1155 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1156                          int rc)
1157 {
1158         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1159         ENTRY;
1160
1161         lov_update_set(set, req, rc);
1162         if (rc && !lov->tgts[req->rq_idx].active)
1163                 rc = 0;
1164         /* FIXME in raid1 regime, should return 0 */
1165         RETURN(rc);
1166 }
1167
1168 int lov_fini_punch_set(struct lov_request_set *set)
1169 {
1170         int rc = 0;
1171         ENTRY;
1172
1173         LASSERT(set->set_exp);
1174         if (set == NULL)
1175                 RETURN(0);
1176         if (set->set_completes) {
1177                 if (!set->set_success)
1178                         rc = -EIO;
1179                 /* FIXME update qos data here */
1180         }
1181
1182         if (atomic_dec_and_test(&set->set_refcount))
1183                 lov_finish_set(set);
1184
1185         RETURN(rc);
1186 }
1187
1188 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1189                        struct lov_stripe_md *lsm, obd_off start,
1190                        obd_off end, struct obd_trans_info *oti,
1191                        struct lov_request_set **reqset)
1192 {
1193         struct lov_request_set *set;
1194         struct lov_oinfo *loi = NULL;
1195         struct lov_obd *lov = &exp->exp_obd->u.lov;
1196         int rc = 0, i;
1197         ENTRY;
1198
1199         OBD_ALLOC(set, sizeof(*set));
1200         if (set == NULL)
1201                 RETURN(-ENOMEM);
1202         lov_init_set(set);
1203
1204         set->set_exp = exp;
1205         set->set_md = lsm;
1206         set->set_oa = src_oa;
1207
1208         loi = lsm->lsm_oinfo;
1209         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1210                 struct lov_request *req;
1211                 obd_off rs, re;
1212
1213                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1214                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1215                         continue;
1216                 }
1217
1218                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1219                         continue;
1220
1221                 OBD_ALLOC(req, sizeof(*req));
1222                 if (req == NULL)
1223                         GOTO(out_set, rc = -ENOMEM);
1224                 req->rq_stripe = i;
1225                 req->rq_idx = loi->loi_ost_idx;
1226
1227                 req->rq_oa = obdo_alloc();
1228                 if (req->rq_oa == NULL)
1229                         GOTO(out_set, rc = -ENOMEM);
1230                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1231                 req->rq_oa->o_id = loi->loi_id;
1232
1233                 req->rq_extent.start = rs;
1234                 req->rq_extent.end = re;
1235                 req->rq_extent.gid = -1;
1236
1237                 lov_set_add_req(req, set);
1238         }
1239         if (!set->set_count)
1240                 GOTO(out_set, rc = -EIO);
1241         *reqset = set;
1242         RETURN(rc);
1243 out_set:
1244         lov_fini_punch_set(set);
1245         RETURN(rc);
1246 }
1247
1248 int lov_fini_sync_set(struct lov_request_set *set)
1249 {
1250         int rc = 0;
1251         ENTRY;
1252
1253         LASSERT(set->set_exp);
1254         if (set == NULL)
1255                 RETURN(0);
1256         if (set->set_completes) {
1257                 if (!set->set_success)
1258                         rc = -EIO;
1259                 /* FIXME update qos data here */
1260         }
1261
1262         if (atomic_dec_and_test(&set->set_refcount))
1263                 lov_finish_set(set);
1264
1265         RETURN(rc);
1266 }
1267
1268 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1269                       struct lov_stripe_md *lsm, obd_off start,
1270                       obd_off end, struct lov_request_set **reqset)
1271 {
1272         struct lov_request_set *set;
1273         struct lov_oinfo *loi = NULL;
1274         struct lov_obd *lov = &exp->exp_obd->u.lov;
1275         int rc = 0, i;
1276         ENTRY;
1277
1278         OBD_ALLOC(set, sizeof(*set));
1279         if (set == NULL)
1280                 RETURN(-ENOMEM);
1281         lov_init_set(set);
1282
1283         set->set_exp = exp;
1284         set->set_md = lsm;
1285         set->set_oa = src_oa;
1286
1287         loi = lsm->lsm_oinfo;
1288         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1289                 struct lov_request *req;
1290                 obd_off rs, re;
1291
1292                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1293                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1294                         continue;
1295                 }
1296
1297                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1298                         continue;
1299
1300                 OBD_ALLOC(req, sizeof(*req));
1301                 if (req == NULL)
1302                         GOTO(out_set, rc = -ENOMEM);
1303                 req->rq_stripe = i;
1304                 req->rq_idx = loi->loi_ost_idx;
1305
1306                 req->rq_oa = obdo_alloc();
1307                 if (req->rq_oa == NULL)
1308                         GOTO(out_set, rc = -ENOMEM);
1309                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1310                 req->rq_oa->o_id = loi->loi_id;
1311
1312                 req->rq_extent.start = rs;
1313                 req->rq_extent.end = re;
1314                 req->rq_extent.gid = -1;
1315
1316                 lov_set_add_req(req, set);
1317         }
1318         if (!set->set_count)
1319                 GOTO(out_set, rc = -EIO);
1320         *reqset = set;
1321         RETURN(rc);
1322 out_set:
1323         lov_fini_sync_set(set);
1324         RETURN(rc);
1325 }