Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29
30 #ifdef __KERNEL__
31 #include <libcfs/libcfs.h>
32 #else
33 #include <liblustre.h>
34 #endif
35
36 #include <obd_class.h>
37 #include <obd_lov.h>
38 #include <lustre/lustre_idl.h>
39
40 #include "lov_internal.h"
41
42 static void lov_init_set(struct lov_request_set *set)
43 {
44         set->set_count = 0;
45         set->set_completes = 0;
46         set->set_success = 0;
47         set->set_cookies = 0;
48         CFS_INIT_LIST_HEAD(&set->set_list);
49         atomic_set(&set->set_refcount, 1);
50 }
51
52 static void lov_finish_set(struct lov_request_set *set)
53 {
54         struct list_head *pos, *n;
55         ENTRY;
56
57         LASSERT(set);
58         list_for_each_safe(pos, n, &set->set_list) {
59                 struct lov_request *req = list_entry(pos, struct lov_request,
60                                                      rq_link);
61                 list_del_init(&req->rq_link);
62
63                 if (req->rq_oi.oi_oa)
64                         OBDO_FREE(req->rq_oi.oi_oa);
65                 if (req->rq_oi.oi_md)
66                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
67                 if (req->rq_oi.oi_osfs)
68                         OBD_FREE(req->rq_oi.oi_osfs,
69                                  sizeof(*req->rq_oi.oi_osfs));
70                 OBD_FREE(req, sizeof(*req));
71         }
72
73         if (set->set_pga) {
74                 int len = set->set_oabufs * sizeof(*set->set_pga);
75                 OBD_FREE(set->set_pga, len);
76         }
77         if (set->set_lockh)
78                 lov_llh_put(set->set_lockh);
79
80         OBD_FREE(set, sizeof(*set));
81         EXIT;
82 }
83
84 void lov_update_set(struct lov_request_set *set,
85                     struct lov_request *req, int rc)
86 {
87         req->rq_complete = 1;
88         req->rq_rc = rc;
89
90         set->set_completes++;
91         if (rc == 0)
92                 set->set_success++;
93 }
94
95 int lov_update_common_set(struct lov_request_set *set,
96                           struct lov_request *req, int rc)
97 {
98         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
99         ENTRY;
100
101         lov_update_set(set, req, rc);
102
103         /* grace error on inactive ost */
104         if (rc && !(lov->lov_tgts[req->rq_idx] && 
105                     lov->lov_tgts[req->rq_idx]->ltd_active))
106                 rc = 0;
107
108         /* FIXME in raid1 regime, should return 0 */
109         RETURN(rc);
110 }
111
112 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
113 {
114         list_add_tail(&req->rq_link, &set->set_list);
115         set->set_count++;
116 }
117
118 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
119 {
120         struct lov_request_set *set = req->rq_rqset;
121         struct lustre_handle *lov_lockhp;
122         struct lov_oinfo *loi;
123         ENTRY;
124
125         LASSERT(set != NULL);
126         LASSERT(set->set_oi != NULL);
127
128         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
129         loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
130
131         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
132          * and that copy can be arbitrarily out of date.
133          *
134          * The LOV API is due for a serious rewriting anyways, and this
135          * can be addressed then. */
136
137         if (rc == ELDLM_OK) {
138                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
139                 __u64 tmp;
140
141                 LASSERT(lock != NULL);
142                 lov_stripe_lock(set->set_oi->oi_md);
143                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
144                 tmp = loi->loi_lvb.lvb_size;
145                 /* Extend KMS up to the end of this lock and no further
146                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
147                 if (tmp > lock->l_policy_data.l_extent.end)
148                         tmp = lock->l_policy_data.l_extent.end + 1;
149                 if (tmp >= loi->loi_kms) {
150                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
151                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
152                         loi->loi_kms = tmp;
153                         loi->loi_kms_valid = 1;
154                 } else {
155                         LDLM_DEBUG(lock, "lock acquired, setting rss="
156                                    LPU64"; leaving kms="LPU64", end="LPU64,
157                                    loi->loi_lvb.lvb_size, loi->loi_kms,
158                                    lock->l_policy_data.l_extent.end);
159                 }
160                 lov_stripe_unlock(set->set_oi->oi_md);
161                 ldlm_lock_allow_match(lock);
162                 LDLM_LOCK_PUT(lock);
163         } else if ((rc == ELDLM_LOCK_ABORTED) &&
164                    (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
165                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
166                 lov_stripe_lock(set->set_oi->oi_md);
167                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
168                 lov_stripe_unlock(set->set_oi->oi_md);
169                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
170                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
171                 rc = ELDLM_OK;
172         } else {
173                 struct obd_export *exp = set->set_exp;
174                 struct lov_obd *lov = &exp->exp_obd->u.lov;
175
176                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
177                 if (lov->lov_tgts[req->rq_idx] &&
178                     lov->lov_tgts[req->rq_idx]->ltd_active) {
179                         if (rc != -EINTR)
180                                 CERROR("enqueue objid "LPX64" subobj "
181                                        LPX64" on OST idx %d: rc %d\n",
182                                        set->set_oi->oi_md->lsm_object_id,
183                                        loi->loi_id, loi->loi_ost_idx, rc);
184                 } else {
185                         rc = ELDLM_OK;
186                 }
187         }
188         lov_update_set(set, req, rc);
189         RETURN(rc);
190 }
191
192 /* The callback for osc_enqueue that updates lov info for every OSC request. */
193 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
194 {
195         struct ldlm_enqueue_info *einfo;
196         struct lov_request *lovreq;
197
198         lovreq = container_of(oinfo, struct lov_request, rq_oi);
199         einfo = lovreq->rq_rqset->set_ei;
200         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
201 }
202
203 static int enqueue_done(struct lov_request_set *set, __u32 mode)
204 {
205         struct lov_request *req;
206         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
207         int rc = 0;
208         ENTRY;
209
210         /* enqueue/match success, just return */
211         if (set->set_completes && set->set_completes == set->set_success)
212                 RETURN(0);
213
214         /* cancel enqueued/matched locks */
215         list_for_each_entry(req, &set->set_list, rq_link) {
216                 struct lustre_handle *lov_lockhp;
217
218                 if (!req->rq_complete || req->rq_rc)
219                         continue;
220
221                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
222                 LASSERT(lov_lockhp);
223                 if (!lustre_handle_is_used(lov_lockhp))
224                         continue;
225
226                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
227                                 req->rq_oi.oi_md, mode, lov_lockhp);
228                 if (rc && lov->lov_tgts[req->rq_idx] &&
229                     lov->lov_tgts[req->rq_idx]->ltd_active)
230                         CERROR("cancelling obdjid "LPX64" on OST "
231                                "idx %d error: rc = %d\n",
232                                req->rq_oi.oi_md->lsm_object_id,
233                                req->rq_idx, rc);
234         }
235         if (set->set_lockh)
236                 lov_llh_put(set->set_lockh);
237         RETURN(rc);
238 }
239
240 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
241                          struct ptlrpc_request_set *rqset)
242 {
243         int ret = 0;
244         ENTRY;
245
246         if (set == NULL)
247                 RETURN(0);
248         LASSERT(set->set_exp);
249         /* Do enqueue_done only for sync requests and if any request
250          * succeeded. */
251         if (!rqset) {
252                 if (rc)
253                         set->set_completes = 0;
254                 ret = enqueue_done(set, mode);
255         } else if (set->set_lockh)
256                 lov_llh_put(set->set_lockh);
257
258         if (atomic_dec_and_test(&set->set_refcount))
259                 lov_finish_set(set);
260
261         RETURN(rc ? rc : ret);
262 }
263
264 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
265                          struct ldlm_enqueue_info *einfo,
266                          struct lov_request_set **reqset)
267 {
268         struct lov_obd *lov = &exp->exp_obd->u.lov;
269         struct lov_request_set *set;
270         int i, rc = 0;
271         struct lov_oinfo *loi;
272         ENTRY;
273
274         OBD_ALLOC(set, sizeof(*set));
275         if (set == NULL)
276                 RETURN(-ENOMEM);
277         lov_init_set(set);
278
279         set->set_exp = exp;
280         set->set_oi = oinfo;
281         set->set_ei = einfo;
282         set->set_lockh = lov_llh_new(oinfo->oi_md);
283         if (set->set_lockh == NULL)
284                 GOTO(out_set, rc = -ENOMEM);
285         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
286
287         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
288                 struct lov_request *req;
289                 obd_off start, end;
290
291                 loi = oinfo->oi_md->lsm_oinfo[i];
292                 if (!lov_stripe_intersects(oinfo->oi_md, i,
293                                            oinfo->oi_policy.l_extent.start,
294                                            oinfo->oi_policy.l_extent.end,
295                                            &start, &end))
296                         continue;
297
298                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
299                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
300                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
301                         continue;
302                 }
303
304                 OBD_ALLOC(req, sizeof(*req));
305                 if (req == NULL)
306                         GOTO(out_set, rc = -ENOMEM);
307
308                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
309                         sizeof(struct lov_oinfo *) +
310                         sizeof(struct lov_oinfo);
311                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
312                 if (req->rq_oi.oi_md == NULL) {
313                         OBD_FREE(req, sizeof(*req));
314                         GOTO(out_set, rc = -ENOMEM);
315                 }
316                 req->rq_oi.oi_md->lsm_oinfo[0] =
317                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
318                         sizeof(struct lov_oinfo *);
319
320
321                 req->rq_rqset = set;
322                 /* Set lov request specific parameters. */
323                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
324                 req->rq_oi.oi_cb_up = cb_update_enqueue;
325                 req->rq_oi.oi_flags = oinfo->oi_flags;
326
327                 LASSERT(req->rq_oi.oi_lockh);
328
329                 req->rq_oi.oi_policy.l_extent.gid =
330                         oinfo->oi_policy.l_extent.gid;
331                 req->rq_oi.oi_policy.l_extent.start = start;
332                 req->rq_oi.oi_policy.l_extent.end = end;
333
334                 req->rq_idx = loi->loi_ost_idx;
335                 req->rq_stripe = i;
336
337                 /* XXX LOV STACKING: submd should be from the subobj */
338                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
339                 req->rq_oi.oi_md->lsm_stripe_count = 0;
340                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
341                         loi->loi_kms_valid;
342                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
343                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
344
345                 lov_set_add_req(req, set);
346         }
347         if (!set->set_count)
348                 GOTO(out_set, rc = -EIO);
349         *reqset = set;
350         RETURN(0);
351 out_set:
352         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
353         RETURN(rc);
354 }
355
356 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
357                          int rc)
358 {
359         int ret = rc;
360         ENTRY;
361
362         if (rc == 1)
363                 ret = 0;
364         else if (rc == 0)
365                 ret = 1;
366         lov_update_set(set, req, ret);
367         RETURN(rc);
368 }
369
370 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
371 {
372         int rc = 0;
373         ENTRY;
374
375         if (set == NULL)
376                 RETURN(0);
377         LASSERT(set->set_exp);
378         rc = enqueue_done(set, mode);
379         if ((set->set_count == set->set_success) &&
380             (flags & LDLM_FL_TEST_LOCK))
381                 lov_llh_put(set->set_lockh);
382
383         if (atomic_dec_and_test(&set->set_refcount))
384                 lov_finish_set(set);
385
386         RETURN(rc);
387 }
388
389 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
390                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
391                        __u32 mode, struct lustre_handle *lockh,
392                        struct lov_request_set **reqset)
393 {
394         struct lov_obd *lov = &exp->exp_obd->u.lov;
395         struct lov_request_set *set;
396         int i, rc = 0;
397         struct lov_oinfo *loi;
398         ENTRY;
399
400         OBD_ALLOC(set, sizeof(*set));
401         if (set == NULL)
402                 RETURN(-ENOMEM);
403         lov_init_set(set);
404
405         set->set_exp = exp;
406         set->set_oi = oinfo;
407         set->set_oi->oi_md = lsm;
408         set->set_lockh = lov_llh_new(lsm);
409         if (set->set_lockh == NULL)
410                 GOTO(out_set, rc = -ENOMEM);
411         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
412
413         for (i = 0; i < lsm->lsm_stripe_count; i++){
414                 struct lov_request *req;
415                 obd_off start, end;
416
417                 loi = lsm->lsm_oinfo[i];
418                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
419                                            policy->l_extent.end, &start, &end))
420                         continue;
421
422                 /* FIXME raid1 should grace this error */
423                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
424                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
425                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
426                         GOTO(out_set, rc = -EIO);
427                 }
428
429                 OBD_ALLOC(req, sizeof(*req));
430                 if (req == NULL)
431                         GOTO(out_set, rc = -ENOMEM);
432
433                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
434                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
435                 if (req->rq_oi.oi_md == NULL) {
436                         OBD_FREE(req, sizeof(*req));
437                         GOTO(out_set, rc = -ENOMEM);
438                 }
439
440                 req->rq_oi.oi_policy.l_extent.start = start;
441                 req->rq_oi.oi_policy.l_extent.end = end;
442                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
443
444                 req->rq_idx = loi->loi_ost_idx;
445                 req->rq_stripe = i;
446
447                 /* XXX LOV STACKING: submd should be from the subobj */
448                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
449                 req->rq_oi.oi_md->lsm_stripe_count = 0;
450
451                 lov_set_add_req(req, set);
452         }
453         if (!set->set_count)
454                 GOTO(out_set, rc = -EIO);
455         *reqset = set;
456         RETURN(rc);
457 out_set:
458         lov_fini_match_set(set, mode, 0);
459         RETURN(rc);
460 }
461
462 int lov_fini_cancel_set(struct lov_request_set *set)
463 {
464         int rc = 0;
465         ENTRY;
466
467         if (set == NULL)
468                 RETURN(0);
469
470         LASSERT(set->set_exp);
471         if (set->set_lockh)
472                 lov_llh_put(set->set_lockh);
473
474         if (atomic_dec_and_test(&set->set_refcount))
475                 lov_finish_set(set);
476
477         RETURN(rc);
478 }
479
480 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
481                         struct lov_stripe_md *lsm, __u32 mode,
482                         struct lustre_handle *lockh,
483                         struct lov_request_set **reqset)
484 {
485         struct lov_request_set *set;
486         int i, rc = 0;
487         struct lov_oinfo *loi;
488         ENTRY;
489
490         OBD_ALLOC(set, sizeof(*set));
491         if (set == NULL)
492                 RETURN(-ENOMEM);
493         lov_init_set(set);
494
495         set->set_exp = exp;
496         set->set_oi = oinfo;
497         set->set_oi->oi_md = lsm;
498         set->set_lockh = lov_handle2llh(lockh);
499         if (set->set_lockh == NULL) {
500                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
501                 GOTO(out_set, rc = -EINVAL);
502         }
503         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
504
505         for (i = 0; i < lsm->lsm_stripe_count; i++){
506                 struct lov_request *req;
507                 struct lustre_handle *lov_lockhp;
508
509                 loi = lsm->lsm_oinfo[i];
510                 lov_lockhp = set->set_lockh->llh_handles + i;
511                 if (!lustre_handle_is_used(lov_lockhp)) {
512                         CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",
513                                loi->loi_ost_idx, loi->loi_id);
514                         continue;
515                 }
516
517                 OBD_ALLOC(req, sizeof(*req));
518                 if (req == NULL)
519                         GOTO(out_set, rc = -ENOMEM);
520
521                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
522                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
523                 if (req->rq_oi.oi_md == NULL) {
524                         OBD_FREE(req, sizeof(*req));
525                         GOTO(out_set, rc = -ENOMEM);
526                 }
527
528                 req->rq_idx = loi->loi_ost_idx;
529                 req->rq_stripe = i;
530
531                 /* XXX LOV STACKING: submd should be from the subobj */
532                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
533                 req->rq_oi.oi_md->lsm_stripe_count = 0;
534
535                 lov_set_add_req(req, set);
536         }
537         if (!set->set_count)
538                 GOTO(out_set, rc = -EIO);
539         *reqset = set;
540         RETURN(rc);
541 out_set:
542         lov_fini_cancel_set(set);
543         RETURN(rc);
544 }
545
546 static int create_done(struct obd_export *exp, struct lov_request_set *set,
547                        struct lov_stripe_md **lsmp)
548 {
549         struct lov_obd *lov = &exp->exp_obd->u.lov;
550         struct obd_trans_info *oti = set->set_oti;
551         struct obdo *src_oa = set->set_oi->oi_oa;
552         struct lov_request *req;
553         struct obdo *ret_oa = NULL;
554         int attrset = 0, rc = 0;
555         ENTRY;
556
557         LASSERT(set->set_completes);
558
559         /* try alloc objects on other osts if osc_create fails for
560          * exceptions: RPC failure, ENOSPC, etc */
561         if (set->set_count != set->set_success) {
562                 list_for_each_entry (req, &set->set_list, rq_link) {
563                         if (req->rq_rc == 0)
564                                 continue;
565
566                         set->set_completes--;
567                         req->rq_complete = 0;
568
569                         rc = qos_remedy_create(set, req);
570                         lov_update_create_set(set, req, rc);
571
572                         if (rc)
573                                 break;
574                 }
575         }
576
577         /* no successful creates */
578         if (set->set_success == 0)
579                 GOTO(cleanup, rc);
580
581         /* If there was an explicit stripe set, fail.  Otherwise, we
582          * got some objects and that's not bad. */
583         if (set->set_count != set->set_success) {
584                 if (*lsmp)
585                         GOTO(cleanup, rc);
586                 set->set_count = set->set_success;
587                 qos_shrink_lsm(set);
588         }
589
590         OBDO_ALLOC(ret_oa);
591         if (ret_oa == NULL)
592                 GOTO(cleanup, rc = -ENOMEM);
593
594         list_for_each_entry(req, &set->set_list, rq_link) {
595                 if (!req->rq_complete || req->rq_rc)
596                         continue;
597                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
598                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
599                                 req->rq_stripe, &attrset);
600         }
601         if (src_oa->o_valid & OBD_MD_FLSIZE &&
602             ret_oa->o_size != src_oa->o_size) {
603                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
604                        src_oa->o_size, ret_oa->o_size);
605                 LBUG();
606         }
607         ret_oa->o_id = src_oa->o_id;
608         memcpy(src_oa, ret_oa, sizeof(*src_oa));
609         OBDO_FREE(ret_oa);
610
611         *lsmp = set->set_oi->oi_md;
612         GOTO(done, rc = 0);
613
614 cleanup:
615         list_for_each_entry(req, &set->set_list, rq_link) {
616                 struct obd_export *sub_exp;
617                 int err = 0;
618
619                 if (!req->rq_complete || req->rq_rc)
620                         continue;
621
622                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
623                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
624                 if (err)
625                         CERROR("Failed to uncreate objid "LPX64" subobj "
626                                LPX64" on OST idx %d: rc = %d\n",
627                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
628                                req->rq_idx, rc);
629         }
630         if (*lsmp == NULL)
631                 obd_free_memmd(exp, &set->set_oi->oi_md);
632 done:
633         if (oti && set->set_cookies) {
634                 oti->oti_logcookies = set->set_cookies;
635                 if (!set->set_cookie_sent) {
636                         oti_free_cookies(oti);
637                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
638                 } else {
639                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
640                 }
641         }
642         RETURN(rc);
643 }
644
645 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
646 {
647         int rc = 0;
648         ENTRY;
649
650         if (set == NULL)
651                 RETURN(0);
652         LASSERT(set->set_exp);
653         if (set->set_completes)
654                 rc = create_done(set->set_exp, set, lsmp);
655
656         if (atomic_dec_and_test(&set->set_refcount))
657                 lov_finish_set(set);
658
659         RETURN(rc);
660 }
661
662 int lov_update_create_set(struct lov_request_set *set,
663                           struct lov_request *req, int rc)
664 {
665         struct obd_trans_info *oti = set->set_oti;
666         struct lov_stripe_md *lsm = set->set_oi->oi_md;
667         struct lov_oinfo *loi;
668         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
669         ENTRY;
670
671         req->rq_stripe = set->set_success;
672         loi = lsm->lsm_oinfo[req->rq_stripe];
673
674         if (rc && lov->lov_tgts[req->rq_idx] &&
675             lov->lov_tgts[req->rq_idx]->ltd_active) {
676                 CERROR("error creating fid "LPX64" sub-object"
677                        " on OST idx %d/%d: rc = %d\n",
678                        set->set_oi->oi_oa->o_id, req->rq_idx,
679                        lsm->lsm_stripe_count, rc);
680                 if (rc > 0) {
681                         CERROR("obd_create returned invalid err %d\n", rc);
682                         rc = -EIO;
683                 }
684         }
685         lov_update_set(set, req, rc);
686         if (rc)
687                 RETURN(rc);
688
689         if (oti && oti->oti_objid)
690                 oti->oti_objid[req->rq_idx] = req->rq_oi.oi_oa->o_id;
691
692         loi->loi_id = req->rq_oi.oi_oa->o_id;
693         loi->loi_ost_idx = req->rq_idx;
694         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
695                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
696         loi_init(loi);
697
698         if (oti && set->set_cookies)
699                 ++oti->oti_logcookies;
700         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
701                 set->set_cookie_sent++;
702
703         RETURN(0);
704 }
705
706 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
707                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
708                         struct obd_trans_info *oti,
709                         struct lov_request_set **reqset)
710 {
711         struct lov_request_set *set;
712         int rc = 0;
713         ENTRY;
714
715         OBD_ALLOC(set, sizeof(*set));
716         if (set == NULL)
717                 RETURN(-ENOMEM);
718         lov_init_set(set);
719
720         set->set_exp = exp;
721         set->set_oi = oinfo;
722         set->set_oi->oi_md = *lsmp;
723         set->set_oi->oi_oa = src_oa;
724         set->set_oti = oti;
725
726         rc = qos_prep_create(exp, set);
727         if (rc)
728                 lov_fini_create_set(set, lsmp);
729         else
730                 *reqset = set;
731         RETURN(rc);
732 }
733
734 static int common_attr_done(struct lov_request_set *set)
735 {
736         struct list_head *pos;
737         struct lov_request *req;
738         struct obdo *tmp_oa;
739         int rc = 0, attrset = 0;
740         ENTRY;
741
742         LASSERT(set->set_oi != NULL);
743
744         if (set->set_oi->oi_oa == NULL)
745                 RETURN(0);
746
747         if (!set->set_success)
748                 RETURN(-EIO);
749
750         OBDO_ALLOC(tmp_oa);
751         if (tmp_oa == NULL)
752                 GOTO(out, rc = -ENOMEM);
753
754         list_for_each (pos, &set->set_list) {
755                 req = list_entry(pos, struct lov_request, rq_link);
756
757                 if (!req->rq_complete || req->rq_rc)
758                         continue;
759                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
760                         continue;
761                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
762                                 req->rq_oi.oi_oa->o_valid,
763                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
764         }
765         if (!attrset) {
766                 CERROR("No stripes had valid attrs\n");
767                 rc = -EIO;
768         }
769         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
770         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
771 out:
772         if (tmp_oa)
773                 OBDO_FREE(tmp_oa);
774         RETURN(rc);
775
776 }
777
778 static int brw_done(struct lov_request_set *set)
779 {
780         struct lov_stripe_md *lsm = set->set_oi->oi_md;
781         struct lov_oinfo     *loi = NULL;
782         struct list_head *pos;
783         struct lov_request *req;
784         ENTRY;
785
786         list_for_each (pos, &set->set_list) {
787                 req = list_entry(pos, struct lov_request, rq_link);
788
789                 if (!req->rq_complete || req->rq_rc)
790                         continue;
791
792                 loi = lsm->lsm_oinfo[req->rq_stripe];
793
794                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
795                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
796         }
797
798         RETURN(0);
799 }
800
801 int lov_fini_brw_set(struct lov_request_set *set)
802 {
803         int rc = 0;
804         ENTRY;
805
806         if (set == NULL)
807                 RETURN(0);
808         LASSERT(set->set_exp);
809         if (set->set_completes) {
810                 rc = brw_done(set);
811                 /* FIXME update qos data here */
812         }
813         if (atomic_dec_and_test(&set->set_refcount))
814                 lov_finish_set(set);
815
816         RETURN(rc);
817 }
818
819 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
820                      obd_count oa_bufs, struct brw_page *pga,
821                      struct obd_trans_info *oti,
822                      struct lov_request_set **reqset)
823 {
824         struct {
825                 obd_count       index;
826                 obd_count       count;
827                 obd_count       off;
828         } *info = NULL;
829         struct lov_request_set *set;
830         struct lov_oinfo *loi = NULL;
831         struct lov_obd *lov = &exp->exp_obd->u.lov;
832         int rc = 0, i, shift;
833         ENTRY;
834
835         OBD_ALLOC(set, sizeof(*set));
836         if (set == NULL)
837                 RETURN(-ENOMEM);
838         lov_init_set(set);
839
840         set->set_exp = exp;
841         set->set_oti = oti;
842         set->set_oi = oinfo;
843         set->set_oabufs = oa_bufs;
844         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
845         if (!set->set_pga)
846                 GOTO(out, rc = -ENOMEM);
847
848         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
849         if (!info)
850                 GOTO(out, rc = -ENOMEM);
851
852         /* calculate the page count for each stripe */
853         for (i = 0; i < oa_bufs; i++) {
854                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
855                 info[stripe].count++;
856         }
857
858         /* alloc and initialize lov request */
859         shift = 0;
860         for (i = 0 ; i < oinfo->oi_md->lsm_stripe_count; i++){
861                 struct lov_request *req;
862
863                 if (info[i].count == 0)
864                         continue;
865
866                 loi = oinfo->oi_md->lsm_oinfo[i];
867                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
868                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
869                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
870                         GOTO(out, rc = -EIO);
871                 }
872
873                 OBD_ALLOC(req, sizeof(*req));
874                 if (req == NULL)
875                         GOTO(out, rc = -ENOMEM);
876
877                 OBDO_ALLOC(req->rq_oi.oi_oa);
878                 if (req->rq_oi.oi_oa == NULL) {
879                         OBD_FREE(req, sizeof(*req));
880                         GOTO(out, rc = -ENOMEM);
881                 }
882
883                 if (oinfo->oi_oa) {
884                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
885                                sizeof(*req->rq_oi.oi_oa));
886                 }
887                 req->rq_oi.oi_oa->o_id = loi->loi_id;
888                 req->rq_oi.oi_oa->o_stripe_idx = i;
889
890                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
891                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
892                 if (req->rq_oi.oi_md == NULL) {
893                         OBDO_FREE(req->rq_oi.oi_oa);
894                         OBD_FREE(req, sizeof(*req));
895                         GOTO(out, rc = -ENOMEM);
896                 }
897
898                 req->rq_idx = loi->loi_ost_idx;
899                 req->rq_stripe = i;
900
901                 /* XXX LOV STACKING */
902                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
903                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
904                 req->rq_oabufs = info[i].count;
905                 req->rq_pgaidx = shift;
906                 shift += req->rq_oabufs;
907
908                 /* remember the index for sort brw_page array */
909                 info[i].index = req->rq_pgaidx;
910
911                 lov_set_add_req(req, set);
912         }
913         if (!set->set_count)
914                 GOTO(out, rc = -EIO);
915
916         /* rotate & sort the brw_page array */
917         for (i = 0; i < oa_bufs; i++) {
918                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
919
920                 shift = info[stripe].index + info[stripe].off;
921                 LASSERT(shift < oa_bufs);
922                 set->set_pga[shift] = pga[i];
923                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
924                                   &set->set_pga[shift].off);
925                 info[stripe].off++;
926         }
927 out:
928         if (info)
929                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
930
931         if (rc == 0)
932                 *reqset = set;
933         else
934                 lov_fini_brw_set(set);
935
936         RETURN(rc);
937 }
938
939 int lov_fini_getattr_set(struct lov_request_set *set)
940 {
941         int rc = 0;
942         ENTRY;
943
944         if (set == NULL)
945                 RETURN(0);
946         LASSERT(set->set_exp);
947         if (set->set_completes)
948                 rc = common_attr_done(set);
949
950         if (atomic_dec_and_test(&set->set_refcount))
951                 lov_finish_set(set);
952
953         RETURN(rc);
954 }
955
956 /* The callback for osc_getattr_async that finilizes a request info when a
957  * response is recieved. */
958 static int cb_getattr_update(struct obd_info *oinfo, int rc)
959 {
960         struct lov_request *lovreq;
961         lovreq = container_of(oinfo, struct lov_request, rq_oi);
962         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
963 }
964
965 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
966                          struct lov_request_set **reqset)
967 {
968         struct lov_request_set *set;
969         struct lov_oinfo *loi = NULL;
970         struct lov_obd *lov = &exp->exp_obd->u.lov;
971         int rc = 0, i;
972         ENTRY;
973
974         OBD_ALLOC(set, sizeof(*set));
975         if (set == NULL)
976                 RETURN(-ENOMEM);
977         lov_init_set(set);
978
979         set->set_exp = exp;
980         set->set_oi = oinfo;
981
982         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
983                 struct lov_request *req;
984
985                 loi = oinfo->oi_md->lsm_oinfo[i];
986                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
987                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
988                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
989                         continue;
990                 }
991
992                 OBD_ALLOC(req, sizeof(*req));
993                 if (req == NULL)
994                         GOTO(out_set, rc = -ENOMEM);
995
996                 req->rq_stripe = i;
997                 req->rq_idx = loi->loi_ost_idx;
998
999                 OBDO_ALLOC(req->rq_oi.oi_oa);
1000                 if (req->rq_oi.oi_oa == NULL) {
1001                         OBD_FREE(req, sizeof(*req));
1002                         GOTO(out_set, rc = -ENOMEM);
1003                 }
1004                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1005                        sizeof(*req->rq_oi.oi_oa));
1006                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1007                 req->rq_oi.oi_cb_up = cb_getattr_update;
1008
1009                 lov_set_add_req(req, set);
1010         }
1011         if (!set->set_count)
1012                 GOTO(out_set, rc = -EIO);
1013         *reqset = set;
1014         RETURN(rc);
1015 out_set:
1016         lov_fini_getattr_set(set);
1017         RETURN(rc);
1018 }
1019
1020 int lov_fini_destroy_set(struct lov_request_set *set)
1021 {
1022         ENTRY;
1023
1024         if (set == NULL)
1025                 RETURN(0);
1026         LASSERT(set->set_exp);
1027         if (set->set_completes) {
1028                 /* FIXME update qos data here */
1029         }
1030
1031         if (atomic_dec_and_test(&set->set_refcount))
1032                 lov_finish_set(set);
1033
1034         RETURN(0);
1035 }
1036
1037 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1038                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1039                          struct obd_trans_info *oti,
1040                          struct lov_request_set **reqset)
1041 {
1042         struct lov_request_set *set;
1043         struct lov_oinfo *loi = NULL;
1044         struct lov_obd *lov = &exp->exp_obd->u.lov;
1045         int rc = 0, i;
1046         ENTRY;
1047
1048         OBD_ALLOC(set, sizeof(*set));
1049         if (set == NULL)
1050                 RETURN(-ENOMEM);
1051         lov_init_set(set);
1052
1053         set->set_exp = exp;
1054         set->set_oi = oinfo;
1055         set->set_oi->oi_md = lsm;
1056         set->set_oi->oi_oa = src_oa;
1057         set->set_oti = oti;
1058         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1059                 set->set_cookies = oti->oti_logcookies;
1060
1061         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1062                 struct lov_request *req;
1063
1064                 loi = lsm->lsm_oinfo[i];
1065                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
1066                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1067                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1068                         continue;
1069                 }
1070
1071                 OBD_ALLOC(req, sizeof(*req));
1072                 if (req == NULL)
1073                         GOTO(out_set, rc = -ENOMEM);
1074
1075                 req->rq_stripe = i;
1076                 req->rq_idx = loi->loi_ost_idx;
1077
1078                 OBDO_ALLOC(req->rq_oi.oi_oa);
1079                 if (req->rq_oi.oi_oa == NULL) {
1080                         OBD_FREE(req, sizeof(*req));
1081                         GOTO(out_set, rc = -ENOMEM);
1082                 }
1083                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1084                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1085                 lov_set_add_req(req, set);
1086         }
1087         if (!set->set_count)
1088                 GOTO(out_set, rc = -EIO);
1089         *reqset = set;
1090         RETURN(rc);
1091 out_set:
1092         lov_fini_destroy_set(set);
1093         RETURN(rc);
1094 }
1095
1096 int lov_fini_setattr_set(struct lov_request_set *set)
1097 {
1098         int rc = 0;
1099         ENTRY;
1100
1101         if (set == NULL)
1102                 RETURN(0);
1103         LASSERT(set->set_exp);
1104         if (set->set_completes) {
1105                 rc = common_attr_done(set);
1106                 /* FIXME update qos data here */
1107         }
1108
1109         if (atomic_dec_and_test(&set->set_refcount))
1110                 lov_finish_set(set);
1111         RETURN(rc);
1112 }
1113
1114 int lov_update_setattr_set(struct lov_request_set *set,
1115                            struct lov_request *req, int rc)
1116 {
1117         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1118         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1119         ENTRY;
1120
1121         lov_update_set(set, req, rc);
1122
1123         /* grace error on inactive ost */
1124         if (rc && !(lov->lov_tgts[req->rq_idx] && 
1125                     lov->lov_tgts[req->rq_idx]->ltd_active))
1126                 rc = 0;
1127
1128         if (rc == 0) {
1129                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1130                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1131                                 req->rq_oi.oi_oa->o_ctime;
1132                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1133                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1134                                 req->rq_oi.oi_oa->o_mtime;
1135                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1136                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1137                                 req->rq_oi.oi_oa->o_atime;
1138         }
1139
1140         RETURN(rc);
1141 }
1142
1143 /* The callback for osc_setattr_async that finilizes a request info when a
1144  * response is recieved. */
1145 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1146 {
1147         struct lov_request *lovreq;
1148         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1149         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1150 }
1151
1152 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1153                          struct obd_trans_info *oti,
1154                          struct lov_request_set **reqset)
1155 {
1156         struct lov_request_set *set;
1157         struct lov_oinfo *loi = NULL;
1158         struct lov_obd *lov = &exp->exp_obd->u.lov;
1159         int rc = 0, i;
1160         ENTRY;
1161
1162         OBD_ALLOC(set, sizeof(*set));
1163         if (set == NULL)
1164                 RETURN(-ENOMEM);
1165         lov_init_set(set);
1166
1167         set->set_exp = exp;
1168         set->set_oti = oti;
1169         set->set_oi = oinfo;
1170         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1171                 set->set_cookies = oti->oti_logcookies;
1172
1173         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1174                 struct lov_request *req;
1175
1176                 loi = oinfo->oi_md->lsm_oinfo[i];
1177                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1178                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1179                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1180                         continue;
1181                 }
1182
1183                 OBD_ALLOC(req, sizeof(*req));
1184                 if (req == NULL)
1185                         GOTO(out_set, rc = -ENOMEM);
1186                 req->rq_stripe = i;
1187                 req->rq_idx = loi->loi_ost_idx;
1188
1189                 OBDO_ALLOC(req->rq_oi.oi_oa);
1190                 if (req->rq_oi.oi_oa == NULL) {
1191                         OBD_FREE(req, sizeof(*req));
1192                         GOTO(out_set, rc = -ENOMEM);
1193                 }
1194                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1195                        sizeof(*req->rq_oi.oi_oa));
1196                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1197                 req->rq_oi.oi_oa->o_stripe_idx = i;
1198                 req->rq_oi.oi_cb_up = cb_setattr_update;
1199                 req->rq_rqset = set;
1200
1201                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1202                         int off = lov_stripe_offset(oinfo->oi_md,
1203                                                     oinfo->oi_oa->o_size, i,
1204                                                     &req->rq_oi.oi_oa->o_size);
1205
1206                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1207                                 req->rq_oi.oi_oa->o_size--;
1208
1209                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1210                                i, req->rq_oi.oi_oa->o_size,
1211                                oinfo->oi_oa->o_size);
1212                 }
1213                 lov_set_add_req(req, set);
1214         }
1215         if (!set->set_count)
1216                 GOTO(out_set, rc = -EIO);
1217         *reqset = set;
1218         RETURN(rc);
1219 out_set:
1220         lov_fini_setattr_set(set);
1221         RETURN(rc);
1222 }
1223
1224 int lov_fini_punch_set(struct lov_request_set *set)
1225 {
1226         int rc = 0;
1227         ENTRY;
1228
1229         if (set == NULL)
1230                 RETURN(0);
1231         LASSERT(set->set_exp);
1232         if (set->set_completes) {
1233                 rc = -EIO;
1234                 /* FIXME update qos data here */
1235                 if (set->set_success)
1236                         rc = common_attr_done(set);
1237         }
1238
1239         if (atomic_dec_and_test(&set->set_refcount))
1240                 lov_finish_set(set);
1241
1242         RETURN(rc);
1243 }
1244
1245 int lov_update_punch_set(struct lov_request_set *set,
1246                            struct lov_request *req, int rc)
1247 {
1248         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1249         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1250         ENTRY;
1251
1252         lov_update_set(set, req, rc);
1253
1254         /* grace error on inactive ost */
1255         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1256                 rc = 0;
1257
1258         if (rc == 0) {
1259                 lov_stripe_lock(lsm);
1260                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1261                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1262                                 req->rq_oi.oi_oa->o_blocks;
1263                 }
1264
1265                 /* Do we need to update lvb_size here? It needn't because
1266                  * it have been done in ll_truncate(). -jay */
1267                 lov_stripe_unlock(lsm);
1268         }
1269
1270         RETURN(rc);
1271 }
1272
1273 /* The callback for osc_punch that finilizes a request info when a response
1274  * is recieved. */
1275 static int cb_update_punch(struct obd_info *oinfo, int rc)
1276 {
1277         struct lov_request *lovreq;
1278         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1279         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1280 }
1281
1282 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1283                        struct obd_trans_info *oti,
1284                        struct lov_request_set **reqset)
1285 {
1286         struct lov_request_set *set;
1287         struct lov_oinfo *loi = NULL;
1288         struct lov_obd *lov = &exp->exp_obd->u.lov;
1289         int rc = 0, i;
1290         ENTRY;
1291
1292         OBD_ALLOC(set, sizeof(*set));
1293         if (set == NULL)
1294                 RETURN(-ENOMEM);
1295         lov_init_set(set);
1296
1297         set->set_oi = oinfo;
1298         set->set_exp = exp;
1299
1300         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1301                 struct lov_request *req;
1302                 obd_off rs, re;
1303
1304                 loi = oinfo->oi_md->lsm_oinfo[i];
1305                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1306                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1307                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1308                         continue;
1309                 }
1310
1311                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1312                                            oinfo->oi_policy.l_extent.start,
1313                                            oinfo->oi_policy.l_extent.end,
1314                                            &rs, &re))
1315                         continue;
1316
1317                 OBD_ALLOC(req, sizeof(*req));
1318                 if (req == NULL)
1319                         GOTO(out_set, rc = -ENOMEM);
1320                 req->rq_stripe = i;
1321                 req->rq_idx = loi->loi_ost_idx;
1322
1323                 OBDO_ALLOC(req->rq_oi.oi_oa);
1324                 if (req->rq_oi.oi_oa == NULL) {
1325                         OBD_FREE(req, sizeof(*req));
1326                         GOTO(out_set, rc = -ENOMEM);
1327                 }
1328                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1329                        sizeof(*req->rq_oi.oi_oa));
1330                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1331                 req->rq_oi.oi_oa->o_stripe_idx = i;
1332                 req->rq_oi.oi_cb_up = cb_update_punch;
1333                 req->rq_rqset = set;
1334
1335                 req->rq_oi.oi_policy.l_extent.start = rs;
1336                 req->rq_oi.oi_policy.l_extent.end = re;
1337                 req->rq_oi.oi_policy.l_extent.gid = -1;
1338
1339                 lov_set_add_req(req, set);
1340         }
1341         if (!set->set_count)
1342                 GOTO(out_set, rc = -EIO);
1343         *reqset = set;
1344         RETURN(rc);
1345 out_set:
1346         lov_fini_punch_set(set);
1347         RETURN(rc);
1348 }
1349
1350 int lov_fini_sync_set(struct lov_request_set *set)
1351 {
1352         int rc = 0;
1353         ENTRY;
1354
1355         if (set == NULL)
1356                 RETURN(0);
1357         LASSERT(set->set_exp);
1358         if (set->set_completes) {
1359                 if (!set->set_success)
1360                         rc = -EIO;
1361                 /* FIXME update qos data here */
1362         }
1363
1364         if (atomic_dec_and_test(&set->set_refcount))
1365                 lov_finish_set(set);
1366
1367         RETURN(rc);
1368 }
1369
1370 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1371                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1372                       obd_off start, obd_off end,
1373                       struct lov_request_set **reqset)
1374 {
1375         struct lov_request_set *set;
1376         struct lov_oinfo *loi = NULL;
1377         struct lov_obd *lov = &exp->exp_obd->u.lov;
1378         int rc = 0, i;
1379         ENTRY;
1380
1381         OBD_ALLOC(set, sizeof(*set));
1382         if (set == NULL)
1383                 RETURN(-ENOMEM);
1384         lov_init_set(set);
1385
1386         set->set_exp = exp;
1387         set->set_oi = oinfo;
1388         set->set_oi->oi_md = lsm;
1389         set->set_oi->oi_oa = src_oa;
1390
1391         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1392                 struct lov_request *req;
1393                 obd_off rs, re;
1394
1395                 loi = lsm->lsm_oinfo[i];
1396                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1397                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1398                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1399                         continue;
1400                 }
1401
1402                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1403                         continue;
1404
1405                 OBD_ALLOC(req, sizeof(*req));
1406                 if (req == NULL)
1407                         GOTO(out_set, rc = -ENOMEM);
1408                 req->rq_stripe = i;
1409                 req->rq_idx = loi->loi_ost_idx;
1410
1411                 OBDO_ALLOC(req->rq_oi.oi_oa);
1412                 if (req->rq_oi.oi_oa == NULL) {
1413                         OBD_FREE(req, sizeof(*req));
1414                         GOTO(out_set, rc = -ENOMEM);
1415                 }
1416                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1417                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1418                 req->rq_oi.oi_oa->o_stripe_idx = i;
1419
1420                 req->rq_oi.oi_policy.l_extent.start = rs;
1421                 req->rq_oi.oi_policy.l_extent.end = re;
1422                 req->rq_oi.oi_policy.l_extent.gid = -1;
1423
1424                 lov_set_add_req(req, set);
1425         }
1426         if (!set->set_count)
1427                 GOTO(out_set, rc = -EIO);
1428         *reqset = set;
1429         RETURN(rc);
1430 out_set:
1431         lov_fini_sync_set(set);
1432         RETURN(rc);
1433 }
1434
1435 #define LOV_U64_MAX ((__u64)~0ULL)
1436 #define LOV_SUM_MAX(tot, add)                                           \
1437         do {                                                            \
1438                 if ((tot) + (add) < (tot))                              \
1439                         (tot) = LOV_U64_MAX;                            \
1440                 else                                                    \
1441                         (tot) += (add);                                 \
1442         } while(0)
1443
1444 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1445 {
1446         ENTRY;
1447
1448         if (success) {
1449                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1450
1451                 if (osfs->os_files != LOV_U64_MAX)
1452                         do_div(osfs->os_files, expected_stripes);
1453                 if (osfs->os_ffree != LOV_U64_MAX)
1454                         do_div(osfs->os_ffree, expected_stripes);
1455
1456                 spin_lock(&obd->obd_osfs_lock);
1457                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1458                 obd->obd_osfs_age = get_jiffies_64();
1459                 spin_unlock(&obd->obd_osfs_lock);
1460                 RETURN(0);
1461         }
1462
1463         RETURN(-EIO);
1464 }
1465
1466 int lov_fini_statfs_set(struct lov_request_set *set)
1467 {
1468         int rc = 0;
1469         ENTRY;
1470
1471         if (set == NULL)
1472                 RETURN(0);
1473
1474         if (set->set_completes) {
1475                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1476                                      set->set_success);
1477         }
1478
1479         if (atomic_dec_and_test(&set->set_refcount))
1480                 lov_finish_set(set);
1481
1482         RETURN(rc);
1483 }
1484
1485 void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1486                        struct obd_statfs *lov_sfs, int success)
1487 {
1488         spin_lock(&obd->obd_osfs_lock);
1489         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1490         obd->obd_osfs_age = get_jiffies_64();
1491         spin_unlock(&obd->obd_osfs_lock);
1492
1493         if (success == 0) {
1494                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1495         } else {
1496 #ifdef MIN_DF
1497                 /* Sandia requested that df (and so, statfs) only
1498                    returned minimal available space on
1499                    a single OST, so people would be able to
1500                    write this much data guaranteed. */
1501                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1502                         /* Presumably if new bavail is smaller,
1503                            new bfree is bigger as well */
1504                         osfs->os_bfree = lov_sfs->os_bfree;
1505                         osfs->os_bavail = lov_sfs->os_bavail;
1506                 }
1507 #else
1508                 osfs->os_bfree += lov_sfs->os_bfree;
1509                 osfs->os_bavail += lov_sfs->os_bavail;
1510 #endif
1511                 osfs->os_blocks += lov_sfs->os_blocks;
1512                 /* XXX not sure about this one - depends on policy.
1513                  *   - could be minimum if we always stripe on all OBDs
1514                  *     (but that would be wrong for any other policy,
1515                  *     if one of the OBDs has no more objects left)
1516                  *   - could be sum if we stripe whole objects
1517                  *   - could be average, just to give a nice number
1518                  *
1519                  * To give a "reasonable" (if not wholly accurate)
1520                  * number, we divide the total number of free objects
1521                  * by expected stripe count (watch out for overflow).
1522                  */
1523                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1524                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1525         }
1526 }
1527
1528 /* The callback for osc_statfs_async that finilizes a request info when a
1529  * response is recieved. */
1530 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1531 {
1532         struct lov_request *lovreq;
1533         struct obd_statfs *osfs, *lov_sfs;
1534         struct obd_device *obd;
1535         struct lov_obd *lov;
1536         int success;
1537         ENTRY;
1538
1539         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1540         lov = &lovreq->rq_rqset->set_obd->u.lov;
1541         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1542
1543         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1544         lov_sfs = oinfo->oi_osfs;
1545
1546         success = lovreq->rq_rqset->set_success;
1547
1548         /* XXX: the same is done in lov_update_common_set, however
1549            lovset->set_exp is not initialized. */
1550         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1551         if (rc) {
1552                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1553                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1554                         rc = 0;
1555                 RETURN(rc);
1556         }
1557
1558         lov_update_statfs(obd, osfs, lov_sfs, success);
1559         qos_update(lov);
1560
1561         RETURN(0);
1562 }
1563
1564 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1565                         struct lov_request_set **reqset)
1566 {
1567         struct lov_request_set *set;
1568         struct lov_obd *lov = &obd->u.lov;
1569         int rc = 0, i;
1570         ENTRY;
1571
1572         OBD_ALLOC(set, sizeof(*set));
1573         if (set == NULL)
1574                 RETURN(-ENOMEM);
1575         lov_init_set(set);
1576
1577         set->set_obd = obd;
1578         set->set_oi = oinfo;
1579
1580         /* We only get block data from the OBD */
1581         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1582                 struct lov_request *req;
1583
1584                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
1585                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1586                         continue;
1587                 }
1588
1589                 OBD_ALLOC(req, sizeof(*req));
1590                 if (req == NULL)
1591                         GOTO(out_set, rc = -ENOMEM);
1592
1593                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1594                 if (req->rq_oi.oi_osfs == NULL) {
1595                         OBD_FREE(req, sizeof(*req));
1596                         GOTO(out_set, rc = -ENOMEM);
1597                 }
1598
1599                 req->rq_idx = i;
1600                 req->rq_oi.oi_cb_up = cb_statfs_update;
1601                 req->rq_rqset = set;
1602
1603                 lov_set_add_req(req, set);
1604         }
1605         if (!set->set_count)
1606                 GOTO(out_set, rc = -EIO);
1607         *reqset = set;
1608         RETURN(rc);
1609 out_set:
1610         lov_fini_statfs_set(set);
1611         RETURN(rc);
1612 }